hadoop OpportunisticContainerAllocatorAMService Source Code

  • 2022-10-20

hadoop OpportunisticContainerAllocatorAMService code

File path: /hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
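
Before diving into the listing, note from the class Javadoc below that this service only replaces the plain ApplicationMasterService when opportunistic scheduling is enabled for the cluster. The following is a minimal illustrative sketch of the two switches involved, written against YarnConfiguration constants; it is not part of the file below, and the chosen values are assumptions for demonstration only.

import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class OpportunisticSchedulingConfigSketch {
  public static void main(String[] args) {
    YarnConfiguration conf = new YarnConfiguration();
    // Centralized opportunistic allocation in the RM: with this flag on, the
    // ResourceManager starts OpportunisticContainerAllocatorAMService instead
    // of the plain ApplicationMasterService.
    conf.setBoolean(
        YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
    // Distributed scheduling is the other trigger; when enabled, getServer()
    // in the listing below also exposes the DistributedSchedulingAMProtocol.
    conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, false);

    System.out.println("distributed scheduling enabled: "
        + YarnConfiguration.isDistSchedulingEnabled(conf));
  }
}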

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.CentralizedOpportunisticContainerAllocator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext;
import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor;
import org.apache.hadoop.yarn.ams.ApplicationMasterServiceUtils;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords
    .FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol;
import org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl;

import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;

import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService;

import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeQueueLoadMonitor;


import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;

import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

/**
 * The OpportunisticContainerAllocatorAMService is started instead of the
 * ApplicationMasterService if opportunistic scheduling is enabled for the YARN
 * cluster (either centralized or distributed opportunistic scheduling).
 *
 * It extends the functionality of the ApplicationMasterService by servicing
 * clients (AMs and AMRMProxy request interceptors) that understand the
 * DistributedSchedulingProtocol.
 */
public class OpportunisticContainerAllocatorAMService
    extends ApplicationMasterService implements DistributedSchedulingAMProtocol,
    EventHandler<SchedulerEvent> {

  private static final Logger LOG =
      LoggerFactory.getLogger(OpportunisticContainerAllocatorAMService.class);

  private final NodeQueueLoadMonitor nodeMonitor;
  private final OpportunisticContainerAllocator oppContainerAllocator;

  private final int numNodes;

  private final long cacheRefreshInterval;
  private volatile List<RemoteNode> cachedNodes;
  private volatile long lastCacheUpdateTime;

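  /**
   * Processor added to the AMSProcessingChain. On registration it attaches an
   * OpportunisticContainerContext to the application attempt; on allocate it
   * satisfies the OPPORTUNISTIC part of the ask through the
   * OpportunisticContainerAllocator and forwards the GUARANTEED part to the
   * next processor in the chain.
   */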
  class OpportunisticAMSProcessor implements
      ApplicationMasterServiceProcessor {

    private ApplicationMasterServiceContext context;
    private ApplicationMasterServiceProcessor nextProcessor;

    private YarnScheduler getScheduler() {
      return ((RMContext)context).getScheduler();
    }

    @Override
    public void init(ApplicationMasterServiceContext amsContext,
        ApplicationMasterServiceProcessor next) {
      this.context = amsContext;
      // The AMSProcessingChain guarantees that 'next' is not null.
      this.nextProcessor = next;
    }

    @Override
    public void registerApplicationMaster(
        ApplicationAttemptId applicationAttemptId,
        RegisterApplicationMasterRequest request,
        RegisterApplicationMasterResponse response)
        throws IOException, YarnException {
      SchedulerApplicationAttempt appAttempt = ((AbstractYarnScheduler)
          getScheduler()).getApplicationAttempt(applicationAttemptId);
      if (appAttempt.getOpportunisticContainerContext() == null) {
        OpportunisticContainerContext opCtx =
            new OpportunisticContainerContext();
        opCtx.setContainerIdGenerator(new OpportunisticContainerAllocator
            .ContainerIdGenerator() {
          @Override
          public long generateContainerId() {
            return appAttempt.getAppSchedulingInfo().getNewContainerId();
          }
        });
        int tokenExpiryInterval = getConfig()
            .getInt(YarnConfiguration.RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS,
                YarnConfiguration.
                    DEFAULT_RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS);
        opCtx.updateAllocationParams(
            getScheduler().getMinimumResourceCapability(),
            getScheduler().getMaximumResourceCapability(),
            getScheduler().getMinimumResourceCapability(),
            tokenExpiryInterval);
        appAttempt.setOpportunisticContainerContext(opCtx);
      }
      nextProcessor.registerApplicationMaster(
          applicationAttemptId, request, response);
    }

    @Override
    public void allocate(ApplicationAttemptId appAttemptId,
        AllocateRequest request, AllocateResponse response)
        throws YarnException {
      // Partition requests to GUARANTEED and OPPORTUNISTIC.
      OpportunisticContainerAllocator.PartitionedResourceRequests
          partitionedAsks =
          oppContainerAllocator.partitionAskList(request.getAskList());

      // Allocate OPPORTUNISTIC containers.
      SchedulerApplicationAttempt appAttempt =
          ((AbstractYarnScheduler)rmContext.getScheduler())
              .getApplicationAttempt(appAttemptId);

      if (!appAttempt.getApplicationAttemptId().equals(appAttemptId)){
        LOG.error("Calling allocate on previous or removed or non "
            + "existent application attempt {}", appAttemptId);
        return;
      }

      OpportunisticContainerContext oppCtx =
          appAttempt.getOpportunisticContainerContext();
      oppCtx.updateNodeList(getLeastLoadedNodes());

      if (!partitionedAsks.getOpportunistic().isEmpty()) {
        String appPartition = appAttempt.getAppAMNodePartitionName();

        for (ResourceRequest req : partitionedAsks.getOpportunistic()) {
          if (null == req.getNodeLabelExpression()) {
            req.setNodeLabelExpression(appPartition);
          }
        }
      }

      List<Container> oppContainers =
          oppContainerAllocator.allocateContainers(
              request.getResourceBlacklistRequest(),
              partitionedAsks.getOpportunistic(), appAttemptId, oppCtx,
              ResourceManager.getClusterTimeStamp(), appAttempt.getUser());

      // Create RMContainers and update the NMTokens.
      if (!oppContainers.isEmpty()) {
        OpportunisticSchedulerMetrics schedulerMetrics =
            OpportunisticSchedulerMetrics.getMetrics();
        schedulerMetrics.incrAllocatedOppContainers(oppContainers.size());
        handleNewContainers(oppContainers, false);
        appAttempt.updateNMTokens(oppContainers);
        ApplicationMasterServiceUtils.addToAllocatedContainers(
            response, oppContainers);
      }

      // Allocate GUARANTEED containers.
      request.setAskList(partitionedAsks.getGuaranteed());
      nextProcessor.allocate(appAttemptId, request, response);
    }

    @Override
    public void finishApplicationMaster(
        ApplicationAttemptId applicationAttemptId,
        FinishApplicationMasterRequest request,
        FinishApplicationMasterResponse response) {
      nextProcessor.finishApplicationMaster(applicationAttemptId,
          request, response);
    }
  }

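  /**
   * Reads the container-queuing settings from the configuration (load
   * comparator, queue length or wait-time limits, std-dev factor, max
   * allocations per AM heartbeat), builds the NodeQueueLoadMonitor used to
   * pick the least loaded nodes, and creates the
   * CentralizedOpportunisticContainerAllocator that hands out OPPORTUNISTIC
   * containers.
   */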
  public OpportunisticContainerAllocatorAMService(RMContext rmContext,
      YarnScheduler scheduler) {
    super(OpportunisticContainerAllocatorAMService.class.getName(),
        rmContext, scheduler);
    int maxAllocationsPerAMHeartbeat = rmContext.getYarnConfiguration().getInt(
        YarnConfiguration.OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT,
        YarnConfiguration.
            DEFAULT_OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT);
    this.numNodes = rmContext.getYarnConfiguration().getInt(
        YarnConfiguration.OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED,
        YarnConfiguration.DEFAULT_OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED);
    long nodeSortInterval = rmContext.getYarnConfiguration().getLong(
        YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS,
        YarnConfiguration.
            DEFAULT_NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS);
    this.cacheRefreshInterval = nodeSortInterval;
    this.lastCacheUpdateTime = System.currentTimeMillis();
    NodeQueueLoadMonitor.LoadComparator comparator =
        NodeQueueLoadMonitor.LoadComparator.valueOf(
            rmContext.getYarnConfiguration().get(
                YarnConfiguration.NM_CONTAINER_QUEUING_LOAD_COMPARATOR,
                YarnConfiguration.
                    DEFAULT_NM_CONTAINER_QUEUING_LOAD_COMPARATOR));

    NodeQueueLoadMonitor topKSelector =
        new NodeQueueLoadMonitor(nodeSortInterval, comparator, numNodes);

    float sigma = rmContext.getYarnConfiguration()
        .getFloat(YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_STDEV,
            YarnConfiguration.DEFAULT_NM_CONTAINER_QUEUING_LIMIT_STDEV);

    int limitMin, limitMax;

    if (comparator == NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH ||
        comparator ==
            NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH_THEN_RESOURCES) {
      limitMin = rmContext.getYarnConfiguration()
          .getInt(YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH,
              YarnConfiguration.
                  DEFAULT_NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH);
      limitMax = rmContext.getYarnConfiguration()
          .getInt(YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH,
              YarnConfiguration.
                  DEFAULT_NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH);
    } else {
      limitMin = rmContext.getYarnConfiguration()
          .getInt(
              YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS,
              YarnConfiguration.
                  DEFAULT_NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS);
      limitMax = rmContext.getYarnConfiguration()
          .getInt(
              YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS,
              YarnConfiguration.
                  DEFAULT_NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS);
    }

    topKSelector.initThresholdCalculator(sigma, limitMin, limitMax);
    this.nodeMonitor = topKSelector;
    this.oppContainerAllocator =
        new CentralizedOpportunisticContainerAllocator(
            rmContext.getContainerTokenSecretManager(),
            maxAllocationsPerAMHeartbeat, nodeMonitor);
  }

  @Override
  public Server getServer(YarnRPC rpc, Configuration serverConf,
      InetSocketAddress addr, AMRMTokenSecretManager secretManager) {
    if (YarnConfiguration.isDistSchedulingEnabled(serverConf)) {
      Server server = rpc.getServer(DistributedSchedulingAMProtocol.class, this,
          addr, serverConf, secretManager,
          serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT,
              YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));
      // To support application running on NMs that DO NOT support
      // Dist Scheduling... The server multiplexes both the
      // ApplicationMasterProtocol as well as the DistributedSchedulingProtocol
      ((RPC.Server) server).addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
          ApplicationMasterProtocolPB.class,
          ApplicationMasterProtocolService.newReflectiveBlockingService(
              new ApplicationMasterProtocolPBServiceImpl(this)));
      return server;
    }
    return super.getServer(rpc, serverConf, addr, secretManager);
  }

  @Override
  protected List<ApplicationMasterServiceProcessor> getProcessorList(
      Configuration conf) {
    List<ApplicationMasterServiceProcessor> retVal =
        super.getProcessorList(conf);
    retVal.add(new OpportunisticAMSProcessor());
    return retVal;
  }

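  /**
   * DistributedSchedulingAMProtocol entry point: registers the AM through the
   * regular path and additionally returns the min/max container resources,
   * the container token expiry interval, the epoch-shifted container id start
   * and the current list of least loaded nodes.
   */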
  @Override
  public RegisterDistributedSchedulingAMResponse
      registerApplicationMasterForDistributedScheduling(
      RegisterApplicationMasterRequest request) throws YarnException,
      IOException {
    RegisterApplicationMasterResponse response =
        registerApplicationMaster(request);
    RegisterDistributedSchedulingAMResponse dsResp = recordFactory
        .newRecordInstance(RegisterDistributedSchedulingAMResponse.class);
    dsResp.setRegisterResponse(response);
    dsResp.setMinContainerResource(
        rmContext.getScheduler().getMinimumResourceCapability());
    dsResp.setMaxContainerResource(
        rmContext.getScheduler().getMaximumResourceCapability());
    dsResp.setContainerTokenExpiryInterval(
        getConfig().getInt(
            YarnConfiguration.RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS,
            YarnConfiguration.DEFAULT_RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS));
    dsResp.setContainerIdStart(
        this.rmContext.getEpoch() << ResourceManager.EPOCH_BIT_SHIFT);

    // Set nodes to be used for scheduling
    dsResp.setNodesForScheduling(getLeastLoadedNodes());
    return dsResp;
  }

  @Override
  public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
      DistributedSchedulingAllocateRequest request)
      throws YarnException, IOException {
    List<Container> distAllocContainers = request.getAllocatedContainers();
    handleNewContainers(distAllocContainers, true);
    AllocateResponse response = allocate(request.getAllocateRequest());
    DistributedSchedulingAllocateResponse dsResp = recordFactory
        .newRecordInstance(DistributedSchedulingAllocateResponse.class);
    dsResp.setAllocateResponse(response);
    dsResp.setNodesForScheduling(getLeastLoadedNodes());
    return dsResp;
  }

  private void handleNewContainers(List<Container> allocContainers,
      boolean isRemotelyAllocated) {
    for (Container container : allocContainers) {
      // Create RMContainer
      RMContainer rmContainer =
          SchedulerUtils.createOpportunisticRmContainer(
              rmContext, container, isRemotelyAllocated);
      if (rmContainer!=null) {
        rmContainer.handle(
            new RMContainerEvent(container.getId(),
                RMContainerEventType.ACQUIRED));
      }
    }
  }

  @Override
  protected void serviceStop() throws Exception {
    if (nodeMonitor != null) {
      nodeMonitor.stop();
    }
    super.serviceStop();
  }

  @Override
  public void handle(SchedulerEvent event) {
    switch (event.getType()) {
    case NODE_ADDED:
      if (!(event instanceof NodeAddedSchedulerEvent)) {
        throw new RuntimeException("Unexpected event type: " + event);
      }
      NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent) event;
      nodeMonitor.addNode(nodeAddedEvent.getContainerReports(),
          nodeAddedEvent.getAddedRMNode());
      break;
    case NODE_REMOVED:
      if (!(event instanceof NodeRemovedSchedulerEvent)) {
        throw new RuntimeException("Unexpected event type: " + event);
      }
      NodeRemovedSchedulerEvent nodeRemovedEvent =
          (NodeRemovedSchedulerEvent) event;
      nodeMonitor.removeNode(nodeRemovedEvent.getRemovedRMNode());
      break;
    case NODE_UPDATE:
      if (!(event instanceof NodeUpdateSchedulerEvent)) {
        throw new RuntimeException("Unexpected event type: " + event);
      }
      NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)
          event;
      nodeMonitor.updateNode(nodeUpdatedEvent.getRMNode());
      break;
    case NODE_RESOURCE_UPDATE:
      if (!(event instanceof NodeResourceUpdateSchedulerEvent)) {
        throw new RuntimeException("Unexpected event type: " + event);
      }
      NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent =
          (NodeResourceUpdateSchedulerEvent) event;
      nodeMonitor.updateNodeResource(nodeResourceUpdatedEvent.getRMNode(),
          nodeResourceUpdatedEvent.getResourceOption());
      break;

    // <-- IGNORED EVENTS : START -->
    case APP_ADDED:
      break;
    case APP_REMOVED:
      break;
    case APP_ATTEMPT_ADDED:
      break;
    case APP_ATTEMPT_REMOVED:
      break;
    case CONTAINER_EXPIRED:
      break;
    case NODE_LABELS_UPDATE:
      break;
    case RELEASE_CONTAINER:
      break;
    case NODE_ATTRIBUTES_UPDATE:
      break;
    case KILL_RESERVED_CONTAINER:
      break;
    case MARK_CONTAINER_FOR_PREEMPTION:
      break;
    case MARK_CONTAINER_FOR_KILLABLE:
      break;
    case MARK_CONTAINER_FOR_NONKILLABLE:
      break;
    case MANAGE_QUEUE:
      break;
    // <-- IGNORED EVENTS : END -->
    default:
      LOG.error("Unknown event arrived at" +
          "OpportunisticContainerAllocatorAMService: {}", event);
    }

  }

  QueueLimitCalculator getNodeManagerQueueLimitCalculator() {
    return nodeMonitor.getThresholdCalculator();
  }

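  /**
   * Returns the least loaded nodes reported by the NodeQueueLoadMonitor,
   * refreshing the cached snapshot at most once per cacheRefreshInterval.
   */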
  @VisibleForTesting
  synchronized List<RemoteNode> getLeastLoadedNodes() {
    long currTime = System.currentTimeMillis();
    if ((currTime - lastCacheUpdateTime > cacheRefreshInterval)
        || (cachedNodes == null)) {
      cachedNodes = convertToRemoteNodes(
          this.nodeMonitor.selectLeastLoadedNodes(this.numNodes));
      if (cachedNodes.size() > 0) {
        lastCacheUpdateTime = currTime;
      }
    }
    return cachedNodes;
  }

  private List<RemoteNode> convertToRemoteNodes(List<NodeId> nodeIds) {
    ArrayList<RemoteNode> retNodes = new ArrayList<>();
    for (NodeId nId : nodeIds) {
      RemoteNode remoteNode = convertToRemoteNode(nId);
      if (null != remoteNode) {
        retNodes.add(remoteNode);
      }
    }
    return retNodes;
  }

  private RemoteNode convertToRemoteNode(NodeId nodeId) {
    SchedulerNode node =
        ((AbstractYarnScheduler) rmContext.getScheduler()).getNode(nodeId);
    if (node != null) {
      RemoteNode rNode = RemoteNode.newInstance(nodeId, node.getHttpAddress());
      rNode.setRackName(node.getRackName());
      rNode.setNodePartition(node.getPartition());
      return rNode;
    }
    return null;
  }
}
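
To see where the OPPORTUNISTIC asks that partitionAskList() separates in allocate() come from, here is a small illustrative sketch of an AM-side request marked as OPPORTUNISTIC via an ExecutionTypeRequest. The resource size, priority and response id are arbitrary assumptions, not values taken from this file.

import java.util.Collections;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;

public class OpportunisticAskSketch {

  /** Builds an AllocateRequest carrying a single OPPORTUNISTIC ask. */
  public static AllocateRequest buildAsk() {
    // One 1 GB / 1 vcore container, anywhere in the cluster.
    ResourceRequest ask = ResourceRequest.newInstance(
        Priority.newInstance(1), ResourceRequest.ANY,
        Resource.newInstance(1024, 1), 1);
    // Marking the ask OPPORTUNISTIC is what routes it to the
    // OpportunisticContainerAllocator in OpportunisticAMSProcessor.allocate();
    // GUARANTEED asks fall through to the regular scheduler.
    ask.setExecutionTypeRequest(
        ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true));
    return AllocateRequest.newInstance(0, 0.0f,
        Collections.singletonList(ask),
        Collections.<ContainerId>emptyList(), null);
  }
}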

Related Information

hadoop source code directory

Related Articles

hadoop AMSProcessingChain source code

hadoop ActiveStandbyElectorBasedElectorService source code

hadoop AdminService source code

hadoop ApplicationMasterService source code

hadoop ClientRMService source code

hadoop ClusterMetrics source code

hadoop ClusterMonitor source code

hadoop CuratorBasedElectorService source code

hadoop DBManager source code

hadoop DecommissioningNodesWatcher source code
