hadoop DAGAMSimulator 源码

  • 2022-10-20
  • 浏览 (190)

hadoop DAGAMSimulator 代码

文件路径:/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/DAGAMSimulator.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */


package org.apache.hadoop.yarn.sls.appmaster;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.sls.AMDefinition;
import org.apache.hadoop.yarn.sls.SLSRunner;
import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

/**
 * AMSimulator that simulates a DAG application: it requests a container for
 * each task once that task's request delay (measured from the AM start time)
 * has elapsed, and it finishes when all the tasks are completed.
 * Vocabulary Used:
 * <dl>
 * <dt>Pending</dt><dd>requests which are NOT yet sent to RM.</dd>
 * <dt>Scheduled</dt>
 * <dd>requests which are sent to RM but not yet assigned.</dd>
 * <dt>Assigned</dt><dd>requests which are assigned to a container.</dd>
 * <dt>Completed</dt>
 * <dd>requests whose container has completed successfully.</dd>
 * </dl>
 * Containers are requested based on the request delay.
 */

@InterfaceAudience.Private
@InterfaceStability.Unstable
public class DAGAMSimulator extends AMSimulator {

  /** Priority attached to every task container request sent to the RM. */
  private static final int PRIORITY = 20;

  /** Tasks that have not been requested from the RM yet. */
  private final List<ContainerSimulator> pendingContainers =
      new LinkedList<>();

  /**
   * Tasks requested from the RM and waiting for an allocation; allocations
   * are matched against this list in FIFO order.
   */
  private final List<ContainerSimulator> scheduledContainers =
      new LinkedList<>();

  /** Tasks currently running, keyed by the id of the container they run in. */
  private final Map<ContainerId, ContainerSimulator> assignedContainers =
      new HashMap<>();

  /** Tasks whose container finished with a SUCCESS exit status. */
  private final List<ContainerSimulator> completedContainers =
      new LinkedList<>();

  /** Every task container taken from the AM definition in {@link #init}. */
  private final List<ContainerSimulator> allContainers =
      new LinkedList<>();

  /** Set once the application is done; stops further container requests. */
  private boolean isFinished = false;

  /**
   * Wall-clock time (ms) recorded in {@link #firstStep()}; task request
   * delays are measured relative to it.
   */
  private long amStartTime;

  private static final Logger LOG =
      LoggerFactory.getLogger(DAGAMSimulator.class);

  /**
   * Initializes the simulator and queues every task container of the AM
   * definition as pending.
   *
   * @param amDef definition providing the task containers to simulate
   * @param rm the resource manager the simulated AM talks to
   * @param slsRunner the owning SLS runner
   * @param tracked whether the application is tracked
   * @param baselineTimeMS baseline time of the simulation
   * @param heartbeatInterval AM heartbeat interval
   * @param appIdToAMSim registry of application id to simulator
   */
  @SuppressWarnings("checkstyle:parameternumber")
  public void init(AMDefinition amDef, ResourceManager rm, SLSRunner slsRunner,
      boolean tracked, long baselineTimeMS, long heartbeatInterval,
      Map<ApplicationId, AMSimulator> appIdToAMSim) {
    super.init(amDef, rm, slsRunner, tracked, baselineTimeMS, heartbeatInterval,
        appIdToAMSim);
    super.amtype = "dag";

    allContainers.addAll(amDef.getTaskContainers());
    pendingContainers.addAll(amDef.getTaskContainers());
    totalContainers = allContainers.size();

    LOG.info("Added new job with {} containers", allContainers.size());
  }

  /**
   * Runs the superclass first step and records the AM start time used as
   * the origin for task request delays.
   */
  @Override
  public void firstStep() throws Exception {
    super.firstStep();
    amStartTime = System.currentTimeMillis();
  }

  /**
   * Reservations are not supported by the DAG AM, so the reservation request
   * is always cleared.
   */
  @Override
  public void initReservation(ReservationId reservationId,
      long deadline, long now) {
    // DAG AM doesn't support reservation
    setReservationRequest(null);
  }

  /**
   * Called when an AM container has been launched. The task bookkeeping is
   * reset through {@link #restart()} before the superclass records the new
   * master container. A {@code null} container is ignored.
   */
  @Override
  public synchronized void notifyAMContainerLaunched(Container masterContainer)
      throws Exception {
    if (null != masterContainer) {
      restart();
      super.notifyAMContainerLaunched(masterContainer);
    }
  }

  /**
   * Drains the response queue. The method performs three steps:
   * <ol>
   * <li>It accounts for completed containers. Successful task containers are
   * marked completed, and killed task containers are re-queued as pending.</li>
   * <li>It detects when all tasks have finished.</li>
   * <li>It launches newly allocated containers for scheduled tasks, in FIFO
   * order.</li>
   * </ol>
   */
  @Override
  protected void processResponseQueue() throws Exception {
    while (!responseQueue.isEmpty()) {
      AllocateResponse response = responseQueue.take();

      // check completed containers
      for (ContainerStatus cs : response.getCompletedContainersStatuses()) {
        ContainerId containerId = cs.getContainerId();
        if (cs.getExitStatus() == ContainerExitStatus.SUCCESS) {
          if (assignedContainers.containsKey(containerId)) {
            LOG.debug("Application {} has one container finished ({}).",
                appId, containerId);
            ContainerSimulator containerSimulator =
                assignedContainers.remove(containerId);
            finishedContainers++;
            completedContainers.add(containerSimulator);
          } else if (isAMContainer(containerId)) {
            // am container released event
            isFinished = true;
            LOG.info("Application {} goes to finish.", appId);
          }
          // NOTE(review): this runs for every successful status once the
          // threshold is reached, so lastStep() may be invoked more than
          // once; kept as-is to preserve existing behavior.
          if (finishedContainers >= totalContainers) {
            lastStep();
          }
        } else {
          // container to be killed
          if (assignedContainers.containsKey(containerId)) {
            LOG.error("Application {} has one container killed ({}).", appId,
                containerId);
            // The task did not complete; request it again later.
            pendingContainers.add(assignedContainers.remove(containerId));
          } else if (isAMContainer(containerId)) {
            LOG.error("Application {}'s AM is "
                + "going to be killed. Waiting for rescheduling...", appId);
          }
        }
      }

      // check finished
      if (isAMContainerRunning &&
          (finishedContainers >= totalContainers)) {
        isAMContainerRunning = false;
        LOG.info("Application {} sends out event to clean up"
            + " its AM container.", appId);
        isFinished = true;
        break;
      }

      // check allocated containers
      for (Container container : response.getAllocatedContainers()) {
        // Allocations arriving with nothing scheduled are ignored.
        if (!scheduledContainers.isEmpty()) {
          ContainerSimulator cs = scheduledContainers.remove(0);
          LOG.debug("Application {} starts to launch a container ({}).",
              appId, container.getId());
          assignedContainers.put(container.getId(), cs);
          se.getNmMap().get(container.getNodeId())
              .addNewContainer(container, cs.getLifeTime(), appId);
          getRanNodes().add(container.getNodeId());
        }
      }
    }
  }

  /**
   * Returns whether {@code containerId} is the id of the current AM
   * container. It returns {@code false} while no AM container is known, for
   * example right after {@link #restart()}.
   */
  private boolean isAMContainer(ContainerId containerId) {
    return amContainer != null && amContainer.getId().equals(containerId);
  }

  /**
   * Sends one allocate request to the RM and queues the response.
   *
   * <p>The request asks for every pending task whose request delay has
   * elapsed. Those tasks move from pending to scheduled. The request also
   * reports progress as the fraction of finished tasks. Nothing is sent once
   * the application is finished.
   */
  @Override
  protected void sendContainerRequest() throws Exception {
    if (isFinished) {
      return;
    }
    // send out request
    List<ResourceRequest> ask = null;
    if (finishedContainers != totalContainers
        && !pendingContainers.isEmpty()) {
      List<ContainerSimulator> toBeScheduled =
          getToBeScheduledContainers(pendingContainers, amStartTime);
      if (!toBeScheduled.isEmpty()) {
        ask = packageRequests(toBeScheduled, PRIORITY);
        LOG.info("Application {} sends out request for {} containers.",
            appId, toBeScheduled.size());
        scheduledContainers.addAll(toBeScheduled);
        pendingContainers.removeAll(toBeScheduled);
      }
    }

    if (ask == null) {
      // An empty ask still acts as a heartbeat and fetches allocations.
      ask = new ArrayList<>();
    }

    final AllocateRequest request = createAllocateRequest(ask);
    if (totalContainers == 0) {
      request.setProgress(1.0f);
    } else {
      request.setProgress((float) finishedContainers / totalContainers);
    }

    UserGroupInformation ugi =
        UserGroupInformation.createRemoteUser(appAttemptId.toString());
    Token<AMRMTokenIdentifier> token = rm.getRMContext().getRMApps()
        .get(appAttemptId.getApplicationId())
        .getRMAppAttempt(appAttemptId).getAMRMToken();
    ugi.addTokenIdentifier(token.decodeIdentifier());
    AllocateResponse response = ugi.doAs(
        (PrivilegedExceptionAction<AllocateResponse>) () -> rm
            .getApplicationMasterService().allocate(request));
    if (response != null) {
      responseQueue.put(response);
    }
  }

  /**
   * Selects the containers whose request time has arrived.
   *
   * @param containers candidate containers (not modified)
   * @param startTime origin (ms) that each container's request delay is
   *        added to
   * @return a new list holding, in input order, every container whose
   *         {@code requestDelay + startTime} is at or before the current time
   */
  @VisibleForTesting
  public List<ContainerSimulator> getToBeScheduledContainers(
      List<ContainerSimulator> containers, long startTime) {
    // Take one snapshot of the clock so every container in this pass is
    // judged against the same instant.
    final long now = System.currentTimeMillis();
    List<ContainerSimulator> toBeScheduled = new LinkedList<>();
    for (ContainerSimulator cs : containers) {
      // only request for the container if it is time to request
      if (cs.getRequestDelay() + startTime <= now) {
        toBeScheduled.add(cs);
      }
    }
    return toBeScheduled;
  }

  /** Records the simulation end time once the application is finished. */
  @Override
  protected void checkStop() {
    if (isFinished) {
      super.setEndTime(System.currentTimeMillis());
    }
  }

  /** Runs the superclass last step, then clears all task bookkeeping. */
  @Override
  public void lastStep() throws Exception {
    super.lastStep();

    //clear data structures.
    allContainers.clear();
    pendingContainers.clear();
    scheduledContainers.clear();
    assignedContainers.clear();
    completedContainers.clear();
  }

  /**
   * Resets the run state when an AM container is (re)launched, for example
   * after the previous AM container was killed.
   *
   * <p>Every task that has not completed becomes pending again. Requests and
   * assignments from the previous attempt are dropped. Otherwise a stale
   * scheduled entry could be matched to a new allocation while the same task
   * is also pending, and the task would be launched and counted twice.
   */
  private void restart() {
    isFinished = false;
    scheduledContainers.clear();
    assignedContainers.clear();
    pendingContainers.clear();
    pendingContainers.addAll(allContainers);
    pendingContainers.removeAll(completedContainers);
    amContainer = null;
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AMSimulator 源码

hadoop MRAMSimulator 源码

hadoop StreamAMSimulator 源码

hadoop package-info 源码

0  赞