hadoop ExponentiallySmoothedTaskRuntimeEstimator 源码

  • 2022-10-20
  • 浏览 (196)

hadoop ExponentiallySmoothedTaskRuntimeEstimator 代码

文件路径:/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.mapreduce.v2.app.speculate;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;

/**
 * This estimator exponentially smooths the rate of progress versus wallclock
 * time.  Conceivably we could write an estimator that smooths time per
 * unit progress, and get different results.
 */
public class ExponentiallySmoothedTaskRuntimeEstimator extends StartEndTimesBase {

  // Latest smoothed estimate per attempt.  The AtomicReference indirection
  // allows lock-free CAS updates from concurrent status-report threads.
  private final ConcurrentMap<TaskAttemptId, AtomicReference<EstimateVector>> estimates
      = new ConcurrentHashMap<TaskAttemptId, AtomicReference<EstimateVector>>();

  private SmoothedValue smoothedValue;

  // Smoothing time constant in milliseconds; larger values give more weight
  // to history.  Configured in contextualize().
  private long lambda;

  public enum SmoothedValue {
    RATE, TIME_PER_UNIT_PROGRESS
  }

  ExponentiallySmoothedTaskRuntimeEstimator
      (long lambda, SmoothedValue smoothedValue) {
    super();
    this.smoothedValue = smoothedValue;
    this.lambda = lambda;
  }

  public ExponentiallySmoothedTaskRuntimeEstimator() {
    super();
  }

  // Immutable snapshot of the smoothed estimate for one attempt.
  private class EstimateVector {
    // Smoothed rate (or time-per-unit-progress).  A negative value is the
    // "no reading yet" sentinel installed by incorporateReading().
    final double value;
    // Progress fraction observed at the most recent reading.
    final float basedOnProgress;
    // Wallclock time (ms) of the most recent reading.
    final long atTime;

    EstimateVector(double value, float basedOnProgress, long atTime) {
      this.value = value;
      this.basedOnProgress = basedOnProgress;
      this.atTime = atTime;
    }

    /**
     * Folds a new progress reading into the smoothed estimate, returning a
     * new immutable vector.  Readings that are stale (time not advancing) or
     * regressive (progress going backwards) are ignored.
     */
    EstimateVector incorporate(float newProgress, long newAtTime) {
      if (newAtTime <= atTime || newProgress < basedOnProgress) {
        return this;
      }

      // Exponential-decay weight for the previous value.  The exponent must
      // be NEGATIVE so the weight lies in (0, 1]: with a positive exponent,
      // exp(dt/lambda) exceeds 1 and the new reading would get a negative
      // weight, driving the estimate away from the data instead of toward it.
      // A sentinel (negative) old value gets zero weight.
      double oldWeighting
          = value < 0.0
              ? 0.0 : Math.exp(-((double) (newAtTime - atTime)) / lambda);

      double newRead = (newProgress - basedOnProgress) / (newAtTime - atTime);

      if (smoothedValue == SmoothedValue.TIME_PER_UNIT_PROGRESS) {
        newRead = 1.0 / newRead;
      }

      return new EstimateVector
          (value * oldWeighting + newRead * (1.0 - oldWeighting),
           newProgress, newAtTime);
    }
  }

  /**
   * Records one progress reading for an attempt.
   *
   * The first call for an attempt installs the map entry and a sentinel
   * vector (value = -1.0); that first reading is consumed by initialization
   * rather than incorporated, since a rate needs two observations.  Lost CAS
   * races retry via tail recursion (at most a few frames deep in practice).
   */
  private void incorporateReading
      (TaskAttemptId attemptID, float newProgress, long newTime) {
    //TODO: Refactor this method, it seems more complicated than necessary.
    AtomicReference<EstimateVector> vectorRef = estimates.get(attemptID);

    if (vectorRef == null) {
      // No entry yet: publish an empty slot, then retry so all callers go
      // through the same initialization path below.
      estimates.putIfAbsent(attemptID, new AtomicReference<EstimateVector>(null));
      incorporateReading(attemptID, newProgress, newTime);
      return;
    }

    EstimateVector oldVector = vectorRef.get();

    if (oldVector == null) {
      // Install the sentinel.  On success this reading served as the
      // initialization trigger and is intentionally not incorporated.
      if (vectorRef.compareAndSet(null,
             new EstimateVector(-1.0, 0.0F, Long.MIN_VALUE))) {
        return;
      }

      // Another thread initialized first; retry against its vector.
      incorporateReading(attemptID, newProgress, newTime);
      return;
    }

    // Standard CAS loop: re-read and re-incorporate until we win the race.
    while (!vectorRef.compareAndSet
            (oldVector, oldVector.incorporate(newProgress, newTime))) {
      oldVector = vectorRef.get();
    }
  }

  /** Returns the current estimate vector for the attempt, or null if none. */
  private EstimateVector getEstimateVector(TaskAttemptId attemptID) {
    AtomicReference<EstimateVector> vectorRef = estimates.get(attemptID);

    if (vectorRef == null) {
      return null;
    }

    return vectorRef.get();
  }

  @Override
  public void contextualize(Configuration conf, AppContext context) {
    super.contextualize(conf, context);

    lambda
        = conf.getLong(MRJobConfig.MR_AM_TASK_ESTIMATOR_SMOOTH_LAMBDA_MS,
            MRJobConfig.DEFAULT_MR_AM_TASK_ESTIMATOR_SMOOTH_LAMBDA_MS);
    smoothedValue
        = conf.getBoolean(MRJobConfig.MR_AM_TASK_ESTIMATOR_EXPONENTIAL_RATE_ENABLE, true)
            ? SmoothedValue.RATE : SmoothedValue.TIME_PER_UNIT_PROGRESS;
  }

  /**
   * Estimates total runtime (ms) of the attempt: elapsed time so far plus
   * remaining progress divided by the smoothed rate.  Returns -1 when no
   * usable estimate exists (unknown attempt, no readings, or only the
   * initialization sentinel recorded so far).
   */
  @Override
  public long estimatedRuntime(TaskAttemptId id) {
    Long startTime = startTimes.get(id);

    if (startTime == null) {
      return -1L;
    }

    EstimateVector vector = getEstimateVector(id);

    if (vector == null) {
      return -1L;
    }

    long sunkTime = vector.atTime - startTime;

    double value = vector.value;
    float progress = vector.basedOnProgress;

    // A non-positive value is either no data (exact 0) or the -1.0 sentinel
    // from initialization; in both cases no meaningful estimate exists.
    // (This also avoids using the sentinel's atTime == Long.MIN_VALUE, which
    // would make sunkTime above garbage.)
    if (value <= 0.0) {
      return -1L;
    }

    double rate = smoothedValue == SmoothedValue.RATE ? value : 1.0 / value;

    if (rate == 0.0) {
      return -1L;
    }

    double remainingTime = (1.0 - progress) / rate;

    return sunkTime + (long)remainingTime;
  }

  // Variance is not tracked by this estimator.
  @Override
  public long runtimeEstimateVariance(TaskAttemptId id) {
    return -1L;
  }

  @Override
  public void updateAttempt(TaskAttemptStatus status, long timestamp) {
    super.updateAttempt(status, timestamp);
    TaskAttemptId attemptID = status.id;

    float progress = status.progress;

    incorporateReading(attemptID, progress, timestamp);
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop DataStatistics 源码

hadoop DefaultSpeculator 源码

hadoop LegacyTaskRuntimeEstimator 源码

hadoop NullTaskRuntimesEngine 源码

hadoop SimpleExponentialTaskRuntimeEstimator 源码

hadoop Speculator 源码

hadoop SpeculatorEvent 源码

hadoop StartEndTimesBase 源码

hadoop TaskRuntimeEstimator 源码

hadoop TaskSpeculationPredicate 源码

0  赞