hadoop AllocateResponse 源码

  • 2022-10-20
  • 浏览 (189)

haddop AllocateResponse 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.api.protocolrecords;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.records.CollectorInfo;
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.EnhancedHeadroom;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.UpdateContainerError;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.util.Records;

/**
 * The response sent by the <code>ResourceManager</code> the
 * <code>ApplicationMaster</code> during resource negotiation.
 * <p>
 * The response, includes:
 * <ul>
 *   <li>Response ID to track duplicate responses.</li>
 *   <li>
 *     An AMCommand sent by ResourceManager to let the
 *     {@code ApplicationMaster} take some actions (resync, shutdown etc.).
 *   </li>
 *   <li>A list of newly allocated {@link Container}.</li>
 *   <li>A list of completed {@link Container}s' statuses.</li>
 *   <li>
 *     The available headroom for resources in the cluster for the
 *     application.
 *   </li>
 *   <li>A list of nodes whose status has been updated.</li>
 *   <li>The number of available nodes in a cluster.</li>
 *   <li>A description of resources requested back by the cluster</li>
 *   <li>AMRMToken, if AMRMToken has been rolled over</li>
 *   <li>
 *     A list of {@link Container} representing the containers
 *     whose resource has been increased.
 *   </li>
 *   <li>
 *     A list of {@link Container} representing the containers
 *     whose resource has been decreased.
 *   </li>
 * </ul>
 * 
 * @see ApplicationMasterProtocol#allocate(AllocateRequest)
 */
@Public
@Stable
public abstract class AllocateResponse {

  @Public
  @Stable
  public static AllocateResponse newInstance(int responseId,
      List<ContainerStatus> completedContainers,
      List<Container> allocatedContainers, List<NodeReport> updatedNodes,
      Resource availResources, AMCommand command, int numClusterNodes,
      PreemptionMessage preempt, List<NMToken> nmTokens) {
    return AllocateResponse.newBuilder().numClusterNodes(numClusterNodes)
        .responseId(responseId)
        .completedContainersStatuses(completedContainers)
        .allocatedContainers(allocatedContainers).updatedNodes(updatedNodes)
        .availableResources(availResources).amCommand(command)
        .preemptionMessage(preempt).nmTokens(nmTokens).build();
  }

  @Private
  @Unstable
  public static AllocateResponse newInstance(int responseId,
      List<ContainerStatus> completedContainers,
      List<Container> allocatedContainers, List<NodeReport> updatedNodes,
      Resource availResources, AMCommand command, int numClusterNodes,
      PreemptionMessage preempt, List<NMToken> nmTokens,
      CollectorInfo collectorInfo) {
    return AllocateResponse.newBuilder().numClusterNodes(numClusterNodes)
        .responseId(responseId)
        .completedContainersStatuses(completedContainers)
        .allocatedContainers(allocatedContainers).updatedNodes(updatedNodes)
        .availableResources(availResources).amCommand(command)
        .preemptionMessage(preempt).nmTokens(nmTokens)
        .collectorInfo(collectorInfo).build();
  }

  @Private
  @Unstable
  public static AllocateResponse newInstance(int responseId,
      List<ContainerStatus> completedContainers,
      List<Container> allocatedContainers, List<NodeReport> updatedNodes,
      Resource availResources, AMCommand command, int numClusterNodes,
      PreemptionMessage preempt, List<NMToken> nmTokens, Token amRMToken,
      List<UpdatedContainer> updatedContainers) {
    return AllocateResponse.newBuilder().numClusterNodes(numClusterNodes)
        .responseId(responseId)
        .completedContainersStatuses(completedContainers)
        .allocatedContainers(allocatedContainers).updatedNodes(updatedNodes)
        .availableResources(availResources).amCommand(command)
        .preemptionMessage(preempt).nmTokens(nmTokens)
        .updatedContainers(updatedContainers).amRmToken(amRMToken).build();
  }

  @Public
  @Unstable
  public static AllocateResponse newInstance(int responseId,
      List<ContainerStatus> completedContainers,
      List<Container> allocatedContainers, List<NodeReport> updatedNodes,
      Resource availResources, AMCommand command, int numClusterNodes,
      PreemptionMessage preempt, List<NMToken> nmTokens, Token amRMToken,
      List<UpdatedContainer> updatedContainers, CollectorInfo collectorInfo) {
    return AllocateResponse.newBuilder().numClusterNodes(numClusterNodes)
        .responseId(responseId)
        .completedContainersStatuses(completedContainers)
        .allocatedContainers(allocatedContainers).updatedNodes(updatedNodes)
        .availableResources(availResources).amCommand(command)
        .preemptionMessage(preempt).nmTokens(nmTokens)
        .updatedContainers(updatedContainers).amRmToken(amRMToken)
        .collectorInfo(collectorInfo).build();
  }

  @Private
  @Unstable
  public static AllocateResponse newInstance(int responseId,
      List<ContainerStatus> completedContainers,
      List<Container> allocatedContainers, List<NodeReport> updatedNodes,
      Resource availResources, AMCommand command, int numClusterNodes,
      PreemptionMessage preempt, List<NMToken> nmTokens, Token amRMToken,
      List<UpdatedContainer> updatedContainers, CollectorInfo collectorInfo,
      EnhancedHeadroom enhancedHeadroom) {
    AllocateResponse response =
        newInstance(responseId, completedContainers, allocatedContainers,
            updatedNodes, availResources, command, numClusterNodes, preempt,
            nmTokens, amRMToken, updatedContainers, collectorInfo);
    response.setEnhancedHeadroom(enhancedHeadroom);
    return response;
  }

  /**
   * If the <code>ResourceManager</code> needs the
   * <code>ApplicationMaster</code> to take some action then it will send an
   * AMCommand to the <code>ApplicationMaster</code>. See <code>AMCommand</code> 
   * for details on commands and actions for them.
   * @return <code>AMCommand</code> if the <code>ApplicationMaster</code> should
   *         take action, <code>null</code> otherwise
   * @see AMCommand
   */
  @Public
  @Stable
  public abstract AMCommand getAMCommand();

  @Private
  @Unstable
  public abstract void setAMCommand(AMCommand command);

  /**
   * Get the <em>last response id</em>.
   * @return <em>last response id</em>
   */
  @Public
  @Stable
  public abstract int getResponseId();

  @Private
  @Unstable
  public abstract void setResponseId(int responseId);

  /**
   * Get the list of <em>newly allocated</em> <code>Container</code> by the
   * <code>ResourceManager</code>.
   * @return list of <em>newly allocated</em> <code>Container</code>
   */
  @Public
  @Stable
  public abstract List<Container> getAllocatedContainers();

  /**
   * Set the list of <em>newly allocated</em> <code>Container</code> by the
   * <code>ResourceManager</code>.
   * @param containers list of <em>newly allocated</em> <code>Container</code>
   */
  @Private
  @Unstable
  public abstract void setAllocatedContainers(List<Container> containers);

  /**
   * Get the <em>available headroom</em> for resources in the cluster for the
   * application.
   * @return limit of available headroom for resources in the cluster for the
   * application
   */
  @Public
  @Stable
  public abstract Resource getAvailableResources();

  @Private
  @Unstable
  public abstract void setAvailableResources(Resource limit);

  /**
   * Get the list of <em>completed containers' statuses</em>.
   * @return the list of <em>completed containers' statuses</em>
   */
  @Public
  @Stable
  public abstract List<ContainerStatus> getCompletedContainersStatuses();

  @Private
  @Unstable
  public abstract void setCompletedContainersStatuses(List<ContainerStatus> containers);

  /**
   * Get the list of <em>updated <code>NodeReport</code>s</em>. Updates could
   * be changes in health, availability etc of the nodes.
   * @return The delta of updated nodes since the last response
   */
  @Public
  @Stable
  public abstract  List<NodeReport> getUpdatedNodes();

  @Private
  @Unstable
  public abstract void setUpdatedNodes(final List<NodeReport> updatedNodes);

  /**
   * Get the number of hosts available on the cluster.
   * @return the available host count.
   */
  @Public
  @Stable
  public abstract int getNumClusterNodes();
  
  @Private
  @Unstable
  public abstract void setNumClusterNodes(int numNodes);

  /**
   * Get the description of containers owned by the AM, but requested back by
   * the cluster. Note that the RM may have an inconsistent view of the
   * resources owned by the AM. These messages are advisory, and the AM may
   * elect to ignore them.
   * <p>
   * The message is a snapshot of the resources the RM wants back from the AM.
   * While demand persists, the RM will repeat its request; applications should
   * not interpret each message as a request for <em>additional</em>
   * resources on top of previous messages. Resources requested consistently
   * over some duration may be forcibly killed by the RM.
   *
   * @return A specification of the resources to reclaim from this AM.
   */
  @Public
  @Evolving
  public abstract PreemptionMessage getPreemptionMessage();

  @Private
  @Unstable
  public abstract void setPreemptionMessage(PreemptionMessage request);

  /**
   * Get the list of NMTokens required for communicating with NM. New NMTokens
   * issued only if
   * <p>
   * 1) AM is receiving first container on underlying NodeManager.<br>
   * OR<br>
   * 2) NMToken master key rolled over in ResourceManager and AM is getting new
   * container on the same underlying NodeManager.
   * <p>
   * AM will receive one NMToken per NM irrespective of the number of containers
   * issued on same NM. AM is expected to store these tokens until issued a
   * new token for the same NM.
   * @return list of NMTokens required for communicating with NM
   */
  @Public
  @Stable
  public abstract List<NMToken> getNMTokens();

  @Private
  @Unstable
  public abstract void setNMTokens(List<NMToken> nmTokens);
  
  /**
   * Get the list of newly updated containers by
   * <code>ResourceManager</code>.
   * @return list of newly increased containers
   */
  @Public
  @Unstable
  public abstract List<UpdatedContainer> getUpdatedContainers();

  /**
   * Set the list of newly updated containers by
   * <code>ResourceManager</code>.
   *
   * @param updatedContainers List of Updated Containers.
   */
  @Private
  @Unstable
  public abstract void setUpdatedContainers(
      List<UpdatedContainer> updatedContainers);

  /**
   * The AMRMToken that belong to this attempt
   *
   * @return The AMRMToken that belong to this attempt
   */
  @Public
  @Unstable
  public abstract Token getAMRMToken();

  @Private
  @Unstable
  public abstract void setAMRMToken(Token amRMToken);

  /**
   * Priority of the application
   *
   * @return get application priority
   */
  @Public
  @Unstable
  public abstract Priority getApplicationPriority();

  @Private
  @Unstable
  public abstract void setApplicationPriority(Priority priority);

  /**
   * The data associated with the collector that belongs to this app. Contains
   * address and token alongwith identification information.
   *
   * @return The data of collector that belong to this attempt
   */
  @Public
  @Unstable
  public abstract CollectorInfo getCollectorInfo();

  @Private
  @Unstable
  public abstract void setCollectorInfo(CollectorInfo info);

  /**
   * Get the list of container update errors to inform the
   * Application Master about the container updates that could not be
   * satisfied due to error.
   *
   * @return List of Update Container Errors.
   */
  @Public
  @Unstable
  public List<UpdateContainerError> getUpdateErrors() {
    return new ArrayList<>();
  }

  /**
   * Set the list of container update errors to inform the
   * Application Master about the container updates that could not be
   * satisfied due to error.
   * @param updateErrors list of <code>UpdateContainerError</code> for
   *                       containers updates requests that were in error
   */
  @Public
  @Unstable
  public void setUpdateErrors(List<UpdateContainerError> updateErrors) {
  }

  /**
   * Get the list of running containers as viewed by
   * <code>ResourceManager</code> from previous application attempts which
   * have not been reported to the Application Master yet.
   * <br>
   * These containers were recovered by the RM after the application master
   * had already registered. This may happen after RM restart when some NMs get
   * delayed in connecting to the RM and reporting the active containers.
   * Since they were not reported in the registration
   * response, they are reported in the response to the AM heartbeat.
   *
   * @return the list of running containers as viewed by
   *         <code>ResourceManager</code> from previous application attempts.
   */
  @Public
  @Unstable
  public abstract List<Container> getContainersFromPreviousAttempts();

  /**
   * Set the list of running containers as viewed by
   * <code>ResourceManager</code> from previous application attempts which have
   * not been reported to the Application Master yet.
   * <br>
   * These containers were recovered by the RM after the application master
   * had already registered. This may happen after RM restart when some NMs get
   * delayed in connecting to the RM and reporting the active containers.
   * Since they were not reported in the registration
   * response, they are reported in the response to the AM heartbeat.
   *
   * @param containersFromPreviousAttempt
   *          the list of running containers as viewed by
   *          <code>ResourceManager</code> from previous application attempts.
   */
  @Private
  @Unstable
  public abstract void setContainersFromPreviousAttempts(
      List<Container> containersFromPreviousAttempt);

  /**
   * Get a list of all SchedulingRequests that the RM has rejected between
   * this allocate call and the previous one.
   * @return List of RejectedSchedulingRequests.
   */
  @Public
  @Unstable
  public List<RejectedSchedulingRequest> getRejectedSchedulingRequests() {
    return Collections.emptyList();
  }

  /**
   * Add a list of rejected SchedulingRequests to the AllocateResponse.
   * @param rejectedRequests List of Rejected Scheduling Requests.
   */
  @Private
  @Unstable
  public void setRejectedSchedulingRequests(
      List<RejectedSchedulingRequest> rejectedRequests) {
  }

  @Private
  @Unstable
  public static AllocateResponseBuilder newBuilder() {
    return new AllocateResponseBuilder();
  }

  @Public
  @Unstable
  public abstract EnhancedHeadroom getEnhancedHeadroom();

  @Private
  @Unstable
  public abstract void setEnhancedHeadroom(EnhancedHeadroom enhancedHeadroom);

  /**
   * Class to construct instances of {@link AllocateResponse} with specific
   * options.
   */
  @Private
  @Unstable
  public static final class AllocateResponseBuilder {
    private AllocateResponse allocateResponse =
        Records.newRecord(AllocateResponse.class);

    private AllocateResponseBuilder() {
      allocateResponse.setApplicationPriority(Priority.newInstance(0));
    }

    /**
     * Set the <code>amCommand</code> of the response.
     * @see AllocateResponse#setAMCommand(AMCommand)
     * @param amCommand <code>amCommand</code> of the response
     * @return {@link AllocateResponseBuilder}
     */
    @Private
    @Unstable
    public AllocateResponseBuilder amCommand(AMCommand amCommand) {
      allocateResponse.setAMCommand(amCommand);
      return this;
    }

    /**
     * Set the <code>responseId</code> of the response.
     * @see AllocateResponse#setResponseId(int)
     * @param responseId <code>responseId</code> of the response
     * @return {@link AllocateResponseBuilder}
     */
    @Private
    @Unstable
    public AllocateResponseBuilder responseId(int responseId) {
      allocateResponse.setResponseId(responseId);
      return this;
    }

    /**
     * Set the <code>allocatedContainers</code> of the response.
     * @see AllocateResponse#setAllocatedContainers(List)
     * @param allocatedContainers
     *     <code>allocatedContainers</code> of the response
     * @return {@link AllocateResponseBuilder}
     */
    @Private
    @Unstable
    public AllocateResponseBuilder allocatedContainers(
        List<Container> allocatedContainers) {
      allocateResponse.setAllocatedContainers(allocatedContainers);
      return this;
    }

    /**
     * Set the <code>availableResources</code> of the response.
     * @see AllocateResponse#setAvailableResources(Resource)
     * @param availableResources
     *     <code>availableResources</code> of the response
     * @return {@link AllocateResponseBuilder}
     */
    @Private
    @Unstable
    public AllocateResponseBuilder availableResources(
        Resource availableResources) {
      allocateResponse.setAvailableResources(availableResources);
      return this;
    }

    /**
     * Set the <code>completedContainersStatuses</code> of the response.
     * @see AllocateResponse#setCompletedContainersStatuses(List)
     * @param completedContainersStatuses
     *     <code>completedContainersStatuses</code> of the response
     * @return {@link AllocateResponseBuilder}
     */
    @Private
    @Unstable
    public AllocateResponseBuilder completedContainersStatuses(
        List<ContainerStatus> completedContainersStatuses) {
      allocateResponse
          .setCompletedContainersStatuses(completedContainersStatuses);
      return this;
    }

    /**
     * Set the <code>updatedNodes</code> of the response.
     * @see AllocateResponse#setUpdatedNodes(List)
     * @param updatedNodes <code>updatedNodes</code> of the response
     * @return {@link AllocateResponseBuilder}
     */
    @Private
    @Unstable
    public AllocateResponseBuilder updatedNodes(
        List<NodeReport> updatedNodes) {
      allocateResponse.setUpdatedNodes(updatedNodes);
      return this;
    }

    /**
     * Set the <code>numClusterNodes</code> of the response.
     * @see AllocateResponse#setNumClusterNodes(int)
     * @param numClusterNodes <code>numClusterNodes</code> of the response
     * @return {@link AllocateResponseBuilder}
     */
    @Private
    @Unstable
    public AllocateResponseBuilder numClusterNodes(int numClusterNodes) {
      allocateResponse.setNumClusterNodes(numClusterNodes);
      return this;
    }

    /**
     * Set the <code>preemptionMessage</code> of the response.
     * @see AllocateResponse#setPreemptionMessage(PreemptionMessage)
     * @param preemptionMessage <code>preemptionMessage</code> of the response
     * @return {@link AllocateResponseBuilder}
     */
    @Private
    @Unstable
    public AllocateResponseBuilder preemptionMessage(
        PreemptionMessage preemptionMessage) {
      allocateResponse.setPreemptionMessage(preemptionMessage);
      return this;
    }

    /**
     * Set the <code>nmTokens</code> of the response.
     * @see AllocateResponse#setNMTokens(List)
     * @param nmTokens <code>nmTokens</code> of the response
     * @return {@link AllocateResponseBuilder}
     */
    @Private
    @Unstable
    public AllocateResponseBuilder nmTokens(List<NMToken> nmTokens) {
      allocateResponse.setNMTokens(nmTokens);
      return this;
    }

    /**
     * Set the <code>updatedContainers</code> of the response.
     * @see AllocateResponse#setUpdatedContainers(List)
     * @param updatedContainers <code>updatedContainers</code> of the response
     * @return {@link AllocateResponseBuilder}
     */
    @Private
    @Unstable
    public AllocateResponseBuilder updatedContainers(
        List<UpdatedContainer> updatedContainers) {
      allocateResponse.setUpdatedContainers(updatedContainers);
      return this;
    }

    /**
     * Set the <code>amRmToken</code> of the response.
     * @see AllocateResponse#setAMRMToken(Token)
     * @param amRmToken <code>amRmToken</code> of the response
     * @return {@link AllocateResponseBuilder}
     */
    @Private
    @Unstable
    public AllocateResponseBuilder amRmToken(Token amRmToken) {
      allocateResponse.setAMRMToken(amRmToken);
      return this;
    }

    /**
     * Set the <code>applicationPriority</code> of the response.
     * @see AllocateResponse#setApplicationPriority(Priority)
     * @param applicationPriority
     *     <code>applicationPriority</code> of the response
     * @return {@link AllocateResponseBuilder}
     */
    @Private
    @Unstable
    public AllocateResponseBuilder applicationPriority(
        Priority applicationPriority) {
      allocateResponse.setApplicationPriority(applicationPriority);
      return this;
    }

    /**
     * Set the <code>collectorInfo</code> of the response.
     * @see AllocateResponse#setCollectorInfo(CollectorInfo)
     * @param collectorInfo <code>collectorInfo</code> of the response which
     *    contains collector address, RM id, version and collector token.
     * @return {@link AllocateResponseBuilder}
     */
    @Private
    @Unstable
    public AllocateResponseBuilder collectorInfo(
        CollectorInfo collectorInfo) {
      allocateResponse.setCollectorInfo(collectorInfo);
      return this;
    }

    /**
     * Set the <code>updateErrors</code> of the response.
     * @see AllocateResponse#setUpdateErrors(List)
     * @param updateErrors <code>updateErrors</code> of the response
     * @return {@link AllocateResponseBuilder}
     */
    @Private
    @Unstable
    public AllocateResponseBuilder updateErrors(
        List<UpdateContainerError> updateErrors) {
      allocateResponse.setUpdateErrors(updateErrors);
      return this;
    }

    /**
     * Set the <code>containersFromPreviousAttempt</code> of the response.
     * @see AllocateResponse#setContainersFromPreviousAttempts(List)
     * @param containersFromPreviousAttempt
     *     <code>containersFromPreviousAttempt</code> of the response
     * @return {@link AllocateResponseBuilder}
     */
    @Private
    @Unstable
    public AllocateResponseBuilder containersFromPreviousAttempt(
        List<Container> containersFromPreviousAttempt) {
      allocateResponse.setContainersFromPreviousAttempts(
          containersFromPreviousAttempt);
      return this;
    }

    @Public
    @Unstable
    public EnhancedHeadroom getEnhancedHeadroom() {
      return allocateResponse.getEnhancedHeadroom();
    }

    @Private
    @Unstable
    public void setEnhancedHeadroom(EnhancedHeadroom enhancedHeadroom){
      allocateResponse.setEnhancedHeadroom(enhancedHeadroom);
    }

    /**
     * Return generated {@link AllocateResponse} object.
     * @return {@link AllocateResponse}
     */
    @Private
    @Unstable
    public AllocateResponse build() {
      return allocateResponse;
    }
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AllocateRequest 源码

hadoop ApplicationsRequestScope 源码

hadoop CancelDelegationTokenRequest 源码

hadoop CancelDelegationTokenResponse 源码

hadoop CommitResponse 源码

hadoop ContainerUpdateRequest 源码

hadoop ContainerUpdateResponse 源码

hadoop FailApplicationAttemptRequest 源码

hadoop FailApplicationAttemptResponse 源码

hadoop FinishApplicationMasterRequest 源码

0  赞