hadoop TaskRunner 源码

  • 2022-10-20
  • 浏览 (191)

hadoop TaskRunner 代码

文件路径:/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/TaskRunner.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.sls.scheduler;

import java.text.MessageFormat;
import java.util.Queue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs {@link Task}s on a fixed-size thread pool whose work queue is a
 * {@link DelayQueue}, so each task becomes runnable only once its next
 * scheduled time has passed.
 *
 * <p>Tasks scheduled before {@link #start()} are held in a temporary queue
 * and re-scheduled, relative to the start time, when the runner starts.
 */
@Private
@Unstable
public class TaskRunner {
  private static final Logger LOG = LoggerFactory.getLogger(TaskRunner.class);

  /**
   * A repeating unit of work. Each invocation of {@link #run()} executes
   * exactly one of {@link #firstStep()}, {@link #middleStep()} or
   * {@link #lastStep()}, depending on where the next scheduled run lies
   * between the task's start and end times.
   */
  @Private
  @Unstable
  public abstract static class Task implements Runnable, Delayed {
    // Offsets in milliseconds as passed to init(); they are turned into
    // absolute times by timeRebase().
    private long start;
    private long end;
    // Absolute time in milliseconds of the next run; compared against
    // System.currentTimeMillis() in getDelay().
    private long nextRun;
    // Absolute start/end times in milliseconds, set by timeRebase().
    private long startTime;
    private long endTime;
    // Milliseconds added to nextRun after the first and every middle step.
    private long repeatInterval;
    // Queue this task re-adds itself to after a step; set by the runner.
    private Queue<Task> queue;

    public Task(){}

    /**
     * Configures the schedule of this task. All values are in milliseconds;
     * {@code startTime} and {@code endTime} are offsets from the time the
     * task is (re)based by the runner.
     *
     * @param startTime offset of the first run
     * @param endTime offset of the last run, not smaller than
     *          {@code startTime}
     * @param repeatInterval distance between consecutive runs, at least 1;
     *          must evenly divide {@code endTime - startTime}
     * @throws IllegalArgumentException if any of the constraints above is
     *           violated
     */
    public void init(long startTime, long endTime, long repeatInterval) {
      if (endTime - startTime < 0) {
        throw new IllegalArgumentException(MessageFormat.format(
          "endTime[{0}] cannot be smaller than startTime[{1}]", endTime, 
          startTime));
      }
      // Reject 0 as well as negatives: the message already promises
      // "less than 1", and a zero interval would make the modulo below
      // throw ArithmeticException instead of IllegalArgumentException.
      if (repeatInterval < 1) {
        throw new IllegalArgumentException(MessageFormat.format(
          "repeatInterval[{0}] cannot be less than 1", repeatInterval));
      }
      if ((endTime - startTime) % repeatInterval != 0) {
        throw new IllegalArgumentException(MessageFormat.format(
          "Invalid parameters: (endTime[{0}] - startTime[{1}]) " +
                  "% repeatInterval[{2}] != 0",
          endTime, startTime, repeatInterval));        
      }
      start = startTime;
      end = endTime;
      this.repeatInterval = repeatInterval;
    }

    /**
     * Converts the configured offsets into absolute times relative to
     * {@code now} and resets the next run to the start time.
     */
    private void timeRebase(long now) {
      startTime = now + start;
      endTime = now + end;
      this.nextRun = startTime;
    }

    /**
     * Configures a task that only executes {@link #firstStep()}.
     *
     * @param startTime offset of the single run, in milliseconds
     */
    public void init(long startTime) {
      // start == end with an interval of 1: after firstStep() the next run
      // is already past the end time, so the task is not re-queued.
      init(startTime, startTime, 1);
    }

    private void setQueue(Queue<Task> queue) {
      this.queue = queue;
    }

    /**
     * Executes the step that corresponds to the current {@code nextRun} and
     * re-queues this task when further runs remain. Exceptions thrown by a
     * step are logged and handed to the default uncaught exception handler,
     * if one is installed.
     */
    @Override
    public final void run() {
      try {
        if (nextRun == startTime) {
          firstStep();
          nextRun += repeatInterval;
          if (nextRun <= endTime) {
            queue.add(this);          
          }
        } else if (nextRun < endTime) {
          middleStep();
          nextRun += repeatInterval;
          queue.add(this);
        } else {
          lastStep();
        }
      } catch (Exception e) {
        LOG.error("Caught exception while executing Taskrunner", e);
        // The default handler is null unless the application installed one;
        // calling through it unguarded would raise an NPE from this catch.
        Thread.UncaughtExceptionHandler handler =
            Thread.getDefaultUncaughtExceptionHandler();
        if (handler != null) {
          handler.uncaughtException(Thread.currentThread(), e);
        }
      }
    }

    /**
     * @return the time remaining until the next run, converted to
     *         {@code unit}; zero or negative once the run is due
     */
    @Override
    public long getDelay(TimeUnit unit) {
      return unit.convert(nextRun - System.currentTimeMillis(),
        TimeUnit.MILLISECONDS);
    }

    /**
     * Orders tasks by their next run time.
     *
     * @throws IllegalArgumentException if {@code o} is not a {@code Task}
     */
    @Override
    public int compareTo(Delayed o) {
      if (!(o instanceof Task)) {
        throw new IllegalArgumentException("Parameter must be a Task instance");
      }
      Task other = (Task) o;
      // Long.compare avoids the overflow of subtracting two longs and the
      // detour through double that Math.signum would take.
      return Long.compare(nextRun, other.nextRun);
    }


    /** Invoked on the run whose scheduled time equals the start time. */
    public abstract void firstStep() throws Exception;

    /** Invoked on runs strictly between the start and end times. */
    public abstract void middleStep() throws Exception;

    /** Invoked on a later run that is scheduled at or after the end time. */
    public abstract void lastStep() throws Exception;

    /**
     * Overrides the absolute end time that {@link #run()} compares the next
     * run against. The value is stored as given; it is not re-validated
     * against the repeat interval.
     *
     * @param et new end time in milliseconds, on the same time base as the
     *          rebased start time
     */
    public void setEndTime(long et) {
      endTime = et;
    }
  }

  // Raw on purpose: the same instance is handed to ThreadPoolExecutor as
  // its BlockingQueue<Runnable> and holds Task elements, which are both
  // Runnable and Delayed.
  private DelayQueue queue;
  private int threadPoolSize;
  private ThreadPoolExecutor executor;
  private long startTimeMS = 0;
  
  public TaskRunner() {
    queue = new DelayQueue();
  }

  /**
   * Sets the number of threads the executor is created with in
   * {@link #start()}.
   */
  public void setQueueSize(int threadPoolSize) {
    this.threadPoolSize = threadPoolSize;
  }

  /**
   * Creates the executor, starts all of its threads and re-schedules every
   * task queued so far relative to the start time.
   *
   * @throws IllegalStateException if a previous executor has not terminated
   */
  @SuppressWarnings("unchecked")
  public void start() {
    if (executor != null && !executor.isTerminated()) {
      throw new IllegalStateException("Executor already running");
    }
    DelayQueue preStartQueue = queue;

    queue = new DelayQueue();
    executor = new ThreadPoolExecutor(threadPoolSize, threadPoolSize, 0,
      TimeUnit.MILLISECONDS, queue);
    // Tasks are added to the work queue directly rather than submitted via
    // execute(), so the worker threads are started up front.
    executor.prestartAllCoreThreads();

    startTimeMS = System.currentTimeMillis();
    for (Object d : preStartQueue) {
      schedule((Task) d, startTimeMS);
    }
  }
  
  /**
   * Shuts the executor down and waits up to 20 seconds for it to terminate.
   * Does nothing if the runner was never started.
   *
   * @throws InterruptedException if interrupted while waiting
   */
  public void stop() throws InterruptedException {
    if (executor == null) {
      // Never started: there is nothing to shut down.
      return;
    }
    executor.shutdownNow();
    executor.awaitTermination(20, TimeUnit.SECONDS);
  }

  /** Rebases the task on {@code timeNow} and adds it to the current queue. */
  @SuppressWarnings("unchecked")
  private void schedule(Task task, long timeNow) {
    task.timeRebase(timeNow);
    task.setQueue(queue);
    queue.add(task);
  }

  /**
   * Schedules a task with its offsets taken relative to the current time.
   *
   * @param task the task to schedule; must already be initialised
   */
  public void schedule(Task task) {
    schedule(task, System.currentTimeMillis());
  }

  /**
   * @return the time in milliseconds recorded by the last call to
   *         {@link #start()}, or 0 if the runner was never started
   */
  public long getStartTimeMS() {
    return this.startTimeMS;
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop CapacitySchedulerMetrics 源码

hadoop ContainerSimulator 源码

hadoop FairSchedulerMetrics 源码

hadoop FifoSchedulerMetrics 源码

hadoop NodeUpdateSchedulerEventWrapper 源码

hadoop RMNodeWrapper 源码

hadoop SLSCapacityScheduler 源码

hadoop SLSFairScheduler 源码

hadoop SLSSchedulerCommons 源码

hadoop SchedulerMetrics 源码

0  赞