hadoop DeletionService 源码

  • 2022-10-20
  • 浏览 (231)

haddop DeletionService 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.nodemanager;

import static java.util.concurrent.TimeUnit.SECONDS;

import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
import org.apache.hadoop.yarn.server.nodemanager.api.impl.pb.NMProtoUtils;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.recovery.DeletionTaskRecoveryInfo;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DeletionTask;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;

import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

public class DeletionService extends AbstractService {

  private static final Logger LOG =
       LoggerFactory.getLogger(DeletionService.class);

  private int debugDelay;
  private final ContainerExecutor containerExecutor;
  private final NMStateStoreService stateStore;
  private ScheduledThreadPoolExecutor sched;
  private AtomicInteger nextTaskId = new AtomicInteger(0);

  public DeletionService(ContainerExecutor exec) {
    this(exec, new NMNullStateStoreService());
  }

  public DeletionService(ContainerExecutor containerExecutor,
      NMStateStoreService stateStore) {
    super(DeletionService.class.getName());
    this.containerExecutor = containerExecutor;
    this.debugDelay = 0;
    this.stateStore = stateStore;
  }

  public int getDebugDelay() {
    return debugDelay;
  }

  public ContainerExecutor getContainerExecutor() {
    return containerExecutor;
  }

  public NMStateStoreService getStateStore() {
    return stateStore;
  }

  public void delete(DeletionTask deletionTask) {
    if (debugDelay != -1) {
      LOG.debug("Scheduling DeletionTask (delay {}) : {}", debugDelay,
          deletionTask);
      recordDeletionTaskInStateStore(deletionTask);
      sched.schedule(deletionTask, debugDelay, TimeUnit.SECONDS);
    }
  }

  private void recover(NMStateStoreService.RecoveredDeletionServiceState state)
      throws IOException {
    Map<Integer, DeletionTaskRecoveryInfo> idToInfoMap =
        new HashMap<Integer, DeletionTaskRecoveryInfo>();
    Set<Integer> successorTasks = new HashSet<Integer>();

    try (RecoveryIterator<DeletionServiceDeleteTaskProto> it =
             state.getIterator()) {
      while (it.hasNext()) {
        DeletionServiceDeleteTaskProto proto = it.next();
        DeletionTaskRecoveryInfo info =
            NMProtoUtils.convertProtoToDeletionTaskRecoveryInfo(proto, this);
        idToInfoMap.put(info.getTask().getTaskId(), info);
        nextTaskId.set(Math.max(nextTaskId.get(), info.getTask().getTaskId()));
        successorTasks.addAll(info.getSuccessorTaskIds());
      }
    }

    // restore the task dependencies and schedule the deletion tasks that
    // have no predecessors
    final long now = System.currentTimeMillis();
    for (DeletionTaskRecoveryInfo info : idToInfoMap.values()) {
      for (Integer successorId : info.getSuccessorTaskIds()){
        DeletionTaskRecoveryInfo successor = idToInfoMap.get(successorId);
        if (successor != null) {
          info.getTask().addDeletionTaskDependency(successor.getTask());
        } else {
          LOG.error("Unable to locate dependency task for deletion task "
              + info.getTask().getTaskId());
        }
      }
      if (!successorTasks.contains(info.getTask().getTaskId())) {
        long msecTilDeletion = info.getDeletionTimestamp() - now;
        sched.schedule(info.getTask(), msecTilDeletion, TimeUnit.MILLISECONDS);
      }
    }
  }

  private int generateTaskId() {
    // get the next ID but avoid an invalid ID
    int taskId = nextTaskId.incrementAndGet();
    while (taskId == DeletionTask.INVALID_TASK_ID) {
      taskId = nextTaskId.incrementAndGet();
    }
    return taskId;
  }

  private void recordDeletionTaskInStateStore(DeletionTask task) {
    if (!stateStore.canRecover()) {
      // optimize the case where we aren't really recording
      return;
    }
    if (task.getTaskId() != DeletionTask.INVALID_TASK_ID) {
      return;  // task already recorded
    }

    task.setTaskId(generateTaskId());

    // store successors first to ensure task IDs have been generated for them
    DeletionTask[] successors = task.getSuccessorTasks();
    for (DeletionTask successor : successors) {
      recordDeletionTaskInStateStore(successor);
    }

    try {
      stateStore.storeDeletionTask(task.getTaskId(),
          task.convertDeletionTaskToProto());
    } catch (IOException e) {
      LOG.error("Unable to store deletion task " + task.getTaskId(), e);
    }
  }

  @Override
  protected void serviceInit(Configuration conf) throws Exception {
    ThreadFactory tf = new ThreadFactoryBuilder()
        .setNameFormat("DeletionService #%d")
        .build();
    if (conf != null) {
      sched = new HadoopScheduledThreadPoolExecutor(
          conf.getInt(YarnConfiguration.NM_DELETE_THREAD_COUNT,
              YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT), tf);
      debugDelay = conf.getInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0);
    } else {
      sched = new HadoopScheduledThreadPoolExecutor(
          YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT, tf);
    }
    sched.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
    sched.setKeepAliveTime(60L, SECONDS);
    if (stateStore.canRecover()) {
      recover(stateStore.loadDeletionServiceState());
    }
    super.serviceInit(conf);
  }

  @Override
  public void serviceStop() throws Exception {
    if (sched != null) {
      sched.shutdown();
      boolean terminated = false;
      try {
        terminated = sched.awaitTermination(10, SECONDS);
      } catch (InterruptedException e) { }
      if (!terminated) {
        sched.shutdownNow();
      }
    }
    super.serviceStop();
  }

  /**
   * Determine if the service has completely stopped.
   * Used only by unit tests
   * @return true if service has completely stopped
   */
  @Private
  public boolean isTerminated() {
    return getServiceState() == STATE.STOPPED && sched.isTerminated();
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop CMgrCompletedAppsEvent 源码

hadoop CMgrCompletedContainersEvent 源码

hadoop CMgrSignalContainersEvent 源码

hadoop CMgrUpdateContainersEvent 源码

hadoop ContainerExecutor 源码

hadoop ContainerManagerEvent 源码

hadoop ContainerManagerEventType 源码

hadoop ContainerStateTransitionListener 源码

hadoop Context 源码

hadoop DefaultContainerExecutor 源码

0  赞