hadoop ZombieJob source code

  • 2022-10-20

hadoop ZombieJob code

File path: /hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/ZombieJob.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.tools.rumen;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.HashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TaskStatus.State;
import org.apache.hadoop.mapreduce.ID;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.tools.rumen.datatypes.*;
import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;

/**
 * {@link ZombieJob} is a layer above {@link LoggedJob} raw JSON objects.
 * 
 * Each {@link ZombieJob} object represents a job in the job history. Everything
 * that exists in the job history is returned faithfully and unchanged. For the
 * input splits of a nonexistent task, a nonexistent task attempt, or an
 * ill-formed task attempt, suitable objects are made up from statistical
 * sketches.
 */
@SuppressWarnings("deprecation")
public class ZombieJob implements JobStory {
  static final Logger LOG = LoggerFactory.getLogger(ZombieJob.class);
  private final LoggedJob job;
  private Map<TaskID, LoggedTask> loggedTaskMap;
  private Map<TaskAttemptID, LoggedTaskAttempt> loggedTaskAttemptMap;
  private final Random random;
  private InputSplit[] splits;
  private final ClusterStory cluster;
  private JobConf jobConf;

  private long seed;
  private long numRandomSeeds = 0;
  private boolean hasRandomSeed = false;

  private Map<LoggedDiscreteCDF, CDFRandomGenerator> interpolatorMap =
      new HashMap<LoggedDiscreteCDF, CDFRandomGenerator>();

  // TODO: Fix ZombieJob to initialize this correctly from observed data
  double rackLocalOverNodeLocal = 1.5;
  double rackRemoteOverNodeLocal = 3.0;
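  // These act as runtime scale factors relative to a node-local attempt:
  // scaleInfo() indexes { 1.0, rackLocalOverNodeLocal, rackRemoteOverNodeLocal }
  // by locality distance when rescaling a logged runtime.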

  /**
   * This constructor creates a {@link ZombieJob} with the same semantics as the
   * {@link LoggedJob} passed in as a parameter.
   * 
   * @param job
   *          The dead job this ZombieJob instance is based on.
   * @param cluster
   *          The cluster topology on which the dead job ran. This argument can
   *          be null if we do not have knowledge of the cluster topology.
   * @param seed
   *          Seed for the random number generator for filling in information
   *          not available from the ZombieJob.
   */
  public ZombieJob(LoggedJob job, ClusterStory cluster, long seed) {
    if (job == null) {
      throw new IllegalArgumentException("job is null");
    }
    this.job = job;
    this.cluster = cluster;
    random = new Random(seed);
    this.seed = seed;
    hasRandomSeed = true;
  }

  /**
   * This constructor creates a {@link ZombieJob} with the same semantics as the
   * {@link LoggedJob} passed in as a parameter.
   * 
   * @param job
   *          The dead job this ZombieJob instance is based on.
   * @param cluster
   *          The cluster topology on which the dead job ran. This argument can
   *          be null if we do not have knowledge of the cluster topology.
   */
  public ZombieJob(LoggedJob job, ClusterStory cluster) {
    this(job, cluster, System.nanoTime());
  }

  private static State convertState(Values status) {
    if (status == Values.SUCCESS) {
      return State.SUCCEEDED;
    } else if (status == Values.FAILED) {
      return State.FAILED;
    } else if (status == Values.KILLED) {
      return State.KILLED;
    } else {
      throw new IllegalArgumentException("unknown status " + status);
    }
  }

  @Override
  public synchronized JobConf getJobConf() {
    if (jobConf == null) {
      jobConf = new JobConf();
      
      // Add parameters from the configuration in the job trace
      //
      // The job configuration parameters, as seen in the jobconf file, are
      // added first so that the specialized values obtained from Rumen
      // override the generic job conf values.
      //
      for (Map.Entry<Object, Object> entry : job.getJobProperties().getValue().entrySet()) {
        jobConf.set(entry.getKey().toString(), entry.getValue().toString());
      }
      
      //TODO Eliminate parameters that are already copied from the job's 
      // configuration file.
      jobConf.setJobName(getName());
      jobConf.setUser(getUser());
      jobConf.setNumMapTasks(getNumberMaps());
      jobConf.setNumReduceTasks(getNumberReduces());
      jobConf.setQueueName(getQueueName());
    }
    return jobConf;
  }
  
  @Override
  public InputSplit[] getInputSplits() {
    if (splits == null) {
      List<InputSplit> splitsList = new ArrayList<InputSplit>();
      Path emptyPath = new Path("/");
      int totalHosts = 0; // used to determine the avg # of hosts per split.
      for (LoggedTask mapTask : job.getMapTasks()) {
        Pre21JobHistoryConstants.Values taskType = mapTask.getTaskType();
        if (taskType != Pre21JobHistoryConstants.Values.MAP) {
          LOG.warn("TaskType for a MapTask is not Map. task="
              + mapTask.getTaskID() + " type="
              + ((taskType == null) ? "null" : taskType.toString()));
          continue;
        }
        List<LoggedLocation> locations = mapTask.getPreferredLocations();
        List<String> hostList = new ArrayList<String>();
        if (locations != null) {
          for (LoggedLocation location : locations) {
            List<NodeName> layers = location.getLayers();
            if (layers.size() == 0) {
              LOG.warn("Bad location layer format for task "+mapTask.getTaskID());
              continue;
            }
            String host = layers.get(layers.size() - 1).getValue();
            if (host == null) {
              LOG.warn("Bad location layer format for task "+mapTask.getTaskID() + ": " + layers);
              continue;
            }
            hostList.add(host);
          }
        }
        String[] hosts = hostList.toArray(new String[hostList.size()]);
        totalHosts += hosts.length;
        long mapInputBytes = getTaskInfo(mapTask).getInputBytes();
        if (mapInputBytes < 0) {
          LOG.warn("InputBytes for task "+mapTask.getTaskID()+" is not defined.");
          mapInputBytes = 0;
        }
       
        splitsList.add(new FileSplit(emptyPath, 0, mapInputBytes, hosts));
      }

      // If not all map tasks are in the job trace, make up splits for the
      // missing map tasks.
      int totalMaps = job.getTotalMaps();
      if (totalMaps < splitsList.size()) {
        LOG.warn("TotalMaps for job " + job.getJobID()
            + " is less than the total number of map task descriptions ("
            + totalMaps + "<" + splitsList.size() + ").");
      }

      int avgHostPerSplit;
      if (splitsList.size() == 0) {
        avgHostPerSplit = 3;
      } else {
        avgHostPerSplit = totalHosts / splitsList.size();
        if (avgHostPerSplit == 0) {
          avgHostPerSplit = 3;
        }
      }

      for (int i = splitsList.size(); i < totalMaps; i++) {
        if (cluster == null) {
          splitsList.add(new FileSplit(emptyPath, 0, 0, new String[0]));
        } else {
          MachineNode[] mNodes = cluster.getRandomMachines(avgHostPerSplit,
                                                           random);
          String[] hosts = new String[mNodes.length];
          for (int j = 0; j < hosts.length; ++j) {
            hosts[j] = mNodes[j].getName();
          }
          // TODO: the size of a made-up split is set to 0 for now.
          splitsList.add(new FileSplit(emptyPath, 0, 0, hosts));
        }
      }

      splits = splitsList.toArray(new InputSplit[splitsList.size()]);
    }
    return splits;
  }

  @Override
  public String getName() {
    JobName jobName = job.getJobName();
    if (jobName == null || jobName.getValue() == null) {
      return "(name unknown)";
    } else {
      return jobName.getValue();
    }
  }

  @Override
  public JobID getJobID() {
    return getLoggedJob().getJobID();
  }

  private int sanitizeValue(int oldVal, int defaultVal, String name, JobID id) {
    if (oldVal == -1) {
      LOG.warn(name +" not defined for "+id);
      return defaultVal;
    }
    return oldVal;
  }
  
  @Override
  public int getNumberMaps() {
    return sanitizeValue(job.getTotalMaps(), 0, "NumberMaps", job.getJobID());
  }

  @Override
  public int getNumberReduces() {
    return sanitizeValue(job.getTotalReduces(), 0, "NumberReduces", job.getJobID());
  }

  @Override
  public Values getOutcome() {
    return job.getOutcome();
  }

  @Override
  public long getSubmissionTime() {
    return job.getSubmitTime() - job.getRelativeTime();
  }

  @Override
  public String getQueueName() {
    QueueName queue = job.getQueue();
    return (queue == null || queue.getValue() == null) 
           ? JobConf.DEFAULT_QUEUE_NAME 
           : queue.getValue();
  }
  
  /**
   * Get the number of map tasks that are actually logged in the trace.
   * @return The number of map tasks that are actually logged in the trace.
   */
  public int getNumLoggedMaps() {
    return job.getMapTasks().size();
  }


  /**
   * Get the number of reduce tasks that are actually logged in the trace.
   * @return The number of reduce tasks that are actually logged in the trace.
   */
  public int getNumLoggedReduces() {
    return job.getReduceTasks().size();
  }
  
  /**
   * Mask the job ID part in a {@link TaskID}.
   * 
   * @param taskId
   *          raw {@link TaskID} read from trace
   * @return masked {@link TaskID} with empty {@link JobID}.
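   * <p>Masking the job ID lets {@link #buildMaps()} key tasks by task type and
   * sequence number alone, so later lookups work without knowing the original
   * {@link JobID} from the trace.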
   */
  private TaskID maskTaskID(TaskID taskId) {
    JobID jobId = new JobID();
    TaskType taskType = taskId.getTaskType();
    return new TaskID(jobId, taskType, taskId.getId());
  }

  /**
   * Mask the job ID part in a {@link TaskAttemptID}.
   * 
   * @param attemptId
   *          raw {@link TaskAttemptID} read from trace
   * @return masked {@link TaskAttemptID} with empty {@link JobID}.
   */
  private TaskAttemptID maskAttemptID(TaskAttemptID attemptId) {
    JobID jobId = new JobID();
    TaskType taskType = attemptId.getTaskType();
    TaskID taskId = attemptId.getTaskID();
    return new TaskAttemptID(jobId.getJtIdentifier(), jobId.getId(), taskType,
        taskId.getId(), attemptId.getId());
  }

  private LoggedTask sanitizeLoggedTask(LoggedTask task) {
    if (task == null) {
      return null;
    }
    if (task.getTaskType() == null) {
      LOG.warn("Task " + task.getTaskID() + " has nulll TaskType");
      return null;
    }
    if (task.getTaskStatus() == null) {
      LOG.warn("Task " + task.getTaskID() + " has nulll TaskStatus");
      return null;
    }
    return task;
  }

  private LoggedTaskAttempt sanitizeLoggedTaskAttempt(LoggedTaskAttempt attempt) {
    if (attempt == null) {
      return null;
    }
    if (attempt.getResult() == null) {
      LOG.warn("TaskAttempt " + attempt.getResult() + " has nulll Result");
      return null;
    }

    return attempt;
  }

  /**
   * Build task mapping and task attempt mapping, to be later used to find
   * information of a particular {@link TaskID} or {@link TaskAttemptID}.
   */
  private synchronized void buildMaps() {
    if (loggedTaskMap == null) {
      loggedTaskMap = new HashMap<TaskID, LoggedTask>();
      loggedTaskAttemptMap = new HashMap<TaskAttemptID, LoggedTaskAttempt>();
      
      for (LoggedTask map : job.getMapTasks()) {
        map = sanitizeLoggedTask(map);
        if (map != null) {
          loggedTaskMap.put(maskTaskID(map.taskID), map);

          for (LoggedTaskAttempt mapAttempt : map.getAttempts()) {
            mapAttempt = sanitizeLoggedTaskAttempt(mapAttempt);
            if (mapAttempt != null) {
              TaskAttemptID id = mapAttempt.getAttemptID();
              loggedTaskAttemptMap.put(maskAttemptID(id), mapAttempt);
            }
          }
        }
      }
      for (LoggedTask reduce : job.getReduceTasks()) {
        reduce = sanitizeLoggedTask(reduce);
        if (reduce != null) {
          loggedTaskMap.put(maskTaskID(reduce.taskID), reduce);

          for (LoggedTaskAttempt reduceAttempt : reduce.getAttempts()) {
            reduceAttempt = sanitizeLoggedTaskAttempt(reduceAttempt);
            if (reduceAttempt != null) {
              TaskAttemptID id = reduceAttempt.getAttemptID();
              loggedTaskAttemptMap.put(maskAttemptID(id), reduceAttempt);
            }
          }
        }
      }

      // TODO: do not care about "other" tasks, "setup" or "clean"
    }
  }

  @Override
  public String getUser() {
    UserName retval = job.getUser();
    return (retval == null || retval.getValue() == null)
           ? "(unknown)"
           : retval.getValue();
  }

  /**
   * Get the underlying {@link LoggedJob} object read directly from the trace.
   * This is mainly for debugging.
   * 
   * @return the underlying {@link LoggedJob} object
   */
  public LoggedJob getLoggedJob() {
    return job;
  }

  /**
   * Get a {@link TaskAttemptInfo} with a {@link TaskAttemptID} associated with
   * taskType, taskNumber, and taskAttemptNumber. This function does not care
   * about locality, and follows this decision logic:
   * 1. Make up a {@link TaskAttemptInfo} if the task attempt is missing in the
   * trace.
   * 2. Make up a {@link TaskAttemptInfo} if the task attempt has a KILLED final
   * status in the trace.
   * 3. Otherwise (the final state is SUCCEEDED or FAILED), construct the
   * {@link TaskAttemptInfo} from the trace.
   */
  public TaskAttemptInfo getTaskAttemptInfo(TaskType taskType, int taskNumber,
      int taskAttemptNumber) {
    // does not care about locality. assume default locality is NODE_LOCAL.
    // But if both task and task attempt exist in trace, use logged locality.
    int locality = 0;
    LoggedTask loggedTask = getLoggedTask(taskType, taskNumber);
    if (loggedTask == null) {
      // TODO insert parameters
      TaskInfo taskInfo = new TaskInfo(0, 0, 0, 0, 0, 0);
      return makeUpTaskAttemptInfo(taskType, taskInfo, taskAttemptNumber,
          taskNumber, locality);
    }

    LoggedTaskAttempt loggedAttempt = getLoggedTaskAttempt(taskType,
        taskNumber, taskAttemptNumber);
    if (loggedAttempt == null) {
      // Task exists, but attempt is missing.
      TaskInfo taskInfo = getTaskInfo(loggedTask);
      return makeUpTaskAttemptInfo(taskType, taskInfo, taskAttemptNumber,
          taskNumber, locality);
    } else {
      // TODO should we handle killed attempts later?
      if (loggedAttempt.getResult() == Values.KILLED) {
        TaskInfo taskInfo = getTaskInfo(loggedTask);
        return makeUpTaskAttemptInfo(taskType, taskInfo, taskAttemptNumber,
            taskNumber, locality);
      } else {
        return getTaskAttemptInfo(loggedTask, loggedAttempt);
      }
    }
  }

  @Override
  public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) {
    return getTaskInfo(getLoggedTask(taskType, taskNumber));
  }

  /**
   * Get a {@link TaskAttemptInfo} with a {@link TaskAttemptID} associated with
   * taskType, taskNumber, and taskAttemptNumber. This function considers
   * locality, and follows this decision logic:
   * 1. Make up a {@link TaskAttemptInfo} if the task attempt is missing in the
   * trace.
   * 2. Make up a {@link TaskAttemptInfo} if the task attempt has a KILLED final
   * status in the trace.
   * 3. If the final state is FAILED, construct a {@link TaskAttemptInfo} from
   * the trace, without considering locality.
   * 4. If the final state is SUCCEEDED, construct a {@link TaskAttemptInfo}
   * from the trace, with the runtime scaled according to the locality in
   * simulation and the locality in the trace.
   */
  @Override
  public TaskAttemptInfo getMapTaskAttemptInfoAdjusted(int taskNumber,
      int taskAttemptNumber, int locality) {
    TaskType taskType = TaskType.MAP;
    LoggedTask loggedTask = getLoggedTask(taskType, taskNumber);
    if (loggedTask == null) {
      // TODO insert parameters
      TaskInfo taskInfo = new TaskInfo(0, 0, 0, 0, 0, 0);
      return makeUpTaskAttemptInfo(taskType, taskInfo, taskAttemptNumber,
          taskNumber, locality);
    }
    LoggedTaskAttempt loggedAttempt = getLoggedTaskAttempt(taskType,
        taskNumber, taskAttemptNumber);
    if (loggedAttempt == null) {
      // Task exists, but attempt is missing.
      TaskInfo taskInfo = getTaskInfo(loggedTask);
      return makeUpTaskAttemptInfo(taskType, taskInfo, taskAttemptNumber,
          taskNumber, locality);
    } else {
      // Task and TaskAttempt both exist.
      if (loggedAttempt.getResult() == Values.KILLED) {
        TaskInfo taskInfo = getTaskInfo(loggedTask);
        return makeUpTaskAttemptInfo(taskType, taskInfo, taskAttemptNumber,
            taskNumber, locality);
      } else if (loggedAttempt.getResult() == Values.FAILED) {
        /*
         * A FAILED attempt is not affected by locality; however, made-up FAILED
         * attempts ARE affected by locality, since statistics are present for
         * attempts of different localities.
         */
        return getTaskAttemptInfo(loggedTask, loggedAttempt);
      } else if (loggedAttempt.getResult() == Values.SUCCESS) {
        int loggedLocality = getLocality(loggedTask, loggedAttempt);
        if (locality == loggedLocality) {
          return getTaskAttemptInfo(loggedTask, loggedAttempt);
        } else {
          // attempt succeeded in trace. It is scheduled in simulation with
          // a different locality.
          return scaleInfo(loggedTask, loggedAttempt, locality, loggedLocality,
              rackLocalOverNodeLocal, rackRemoteOverNodeLocal);
        }
      } else {
        throw new IllegalArgumentException(
            "attempt result is not SUCCEEDED, FAILED or KILLED: "
                + loggedAttempt.getResult());
      }
    }
  }

  private long sanitizeTaskRuntime(long time, ID id) {
    if (time < 0) {
      LOG.warn("Negative running time for task "+id+": "+time);
      return 100L; // set default to 100ms.
    }
    return time;
  }
  
  @SuppressWarnings("hiding") 
  private TaskAttemptInfo scaleInfo(LoggedTask loggedTask,
      LoggedTaskAttempt loggedAttempt, int locality, int loggedLocality,
      double rackLocalOverNodeLocal, double rackRemoteOverNodeLocal) {
    TaskInfo taskInfo = getTaskInfo(loggedTask);
    double[] factors = new double[] { 1.0, rackLocalOverNodeLocal,
        rackRemoteOverNodeLocal };
    double scaleFactor = factors[locality] / factors[loggedLocality];
    State state = convertState(loggedAttempt.getResult());
    if (loggedTask.getTaskType() == Values.MAP) {
      long taskTime = 0;
      if (loggedAttempt.getStartTime() == 0) {
        taskTime = makeUpMapRuntime(state, locality);
      } else {
        taskTime = loggedAttempt.getFinishTime() - loggedAttempt.getStartTime();
      }
      taskTime = sanitizeTaskRuntime(taskTime, loggedAttempt.getAttemptID());
      taskTime *= scaleFactor;
      return new MapTaskAttemptInfo
        (state, taskInfo, taskTime, loggedAttempt.allSplitVectors());
    } else {
      throw new IllegalArgumentException("taskType can only be MAP: "
          + loggedTask.getTaskType());
    }
  }

  private int getLocality(LoggedTask loggedTask, LoggedTaskAttempt loggedAttempt) {
    int distance = cluster.getMaximumDistance();
    String rackHostName = loggedAttempt.getHostName().getValue();
    if (rackHostName == null) {
      return distance;
    }
    MachineNode mn = getMachineNode(rackHostName);
    if (mn == null) {
      return distance;
    }
    List<LoggedLocation> locations = loggedTask.getPreferredLocations();
    if (locations != null) {
      for (LoggedLocation location : locations) {
        List<NodeName> layers = location.getLayers();
        if ((layers == null) || (layers.isEmpty())) {
          continue;
        }
        String dataNodeName = layers.get(layers.size()-1).getValue();
        MachineNode dataNode = cluster.getMachineByName(dataNodeName);
        if (dataNode != null) {
          distance = Math.min(distance, cluster.distance(mn, dataNode));
        }
      }
    }
    return distance;
  }

  private MachineNode getMachineNode(String rackHostName) {
    ParsedHost parsedHost = ParsedHost.parse(rackHostName);
    String hostName = (parsedHost == null) ? rackHostName 
                                           : parsedHost.getNodeName();
    if (hostName == null) {
      return null;
    }
    return (cluster == null) ? null : cluster.getMachineByName(hostName);
  }

  private TaskAttemptInfo getTaskAttemptInfo(LoggedTask loggedTask,
      LoggedTaskAttempt loggedAttempt) {
    TaskInfo taskInfo = getTaskInfo(loggedTask);
    
    List<List<Integer>> allSplitVectors = loggedAttempt.allSplitVectors();

    State state = convertState(loggedAttempt.getResult());
    if (loggedTask.getTaskType() == Values.MAP) {
      long taskTime;
      if (loggedAttempt.getStartTime() == 0) {
        int locality = getLocality(loggedTask, loggedAttempt);
        taskTime = makeUpMapRuntime(state, locality);
      } else {
        taskTime = loggedAttempt.getFinishTime() - loggedAttempt.getStartTime();
      }
      taskTime = sanitizeTaskRuntime(taskTime, loggedAttempt.getAttemptID());
      return new MapTaskAttemptInfo(state, taskInfo, taskTime, allSplitVectors);
    } else if (loggedTask.getTaskType() == Values.REDUCE) {
      long startTime = loggedAttempt.getStartTime();
      long mergeDone = loggedAttempt.getSortFinished();
      long shuffleDone = loggedAttempt.getShuffleFinished();
      long finishTime = loggedAttempt.getFinishTime();
      if (startTime <= 0 || startTime >= finishTime) {
        // have seen startTime>finishTime.
        // haven't seen reduce task with startTime=0 ever. But if this happens,
        // make up a reduceTime with no shuffle/merge.
        long reduceTime = makeUpReduceRuntime(state);
        return new ReduceTaskAttemptInfo
          (state, taskInfo, 0, 0, reduceTime, allSplitVectors);
      } else {
        if (shuffleDone <= 0) {
          shuffleDone = startTime;
        }
        if (mergeDone <= 0) {
          mergeDone = finishTime;
        }
        long shuffleTime = shuffleDone - startTime;
        long mergeTime = mergeDone - shuffleDone;
        long reduceTime = finishTime - mergeDone;
        reduceTime = sanitizeTaskRuntime(reduceTime, loggedAttempt.getAttemptID());
        
        return new ReduceTaskAttemptInfo(state, taskInfo, shuffleTime,
            mergeTime, reduceTime, allSplitVectors);
      }
    } else {
      throw new IllegalArgumentException("taskType for "
          + loggedTask.getTaskID() + " is neither MAP nor REDUCE: "
          + loggedTask.getTaskType());
    }
  }

  private TaskInfo getTaskInfo(LoggedTask loggedTask) {
    if (loggedTask == null) {
      return new TaskInfo(0, 0, 0, 0, 0, 0);
    }
    List<LoggedTaskAttempt> attempts = loggedTask.getAttempts();

    long inputBytes = -1;
    long inputRecords = -1;
    long outputBytes = -1;
    long outputRecords = -1;
    long heapMegabytes = -1;
    ResourceUsageMetrics metrics = new ResourceUsageMetrics();

    Values type = loggedTask.getTaskType();
    if ((type != Values.MAP) && (type != Values.REDUCE)) {
      throw new IllegalArgumentException(
          "getTaskInfo only supports MAP or REDUCE tasks: " + type.toString()
              + " for task = " + loggedTask.getTaskID());
    }

    for (LoggedTaskAttempt attempt : attempts) {
      attempt = sanitizeLoggedTaskAttempt(attempt);
      // ignore bad attempts or unsuccessful attempts.
      if ((attempt == null) || (attempt.getResult() != Values.SUCCESS)) {
        continue;
      }

      if (type == Values.MAP) {
        inputBytes = attempt.getHdfsBytesRead();
        inputRecords = attempt.getMapInputRecords();
        outputBytes =
            (job.getTotalReduces() > 0) ? attempt.getMapOutputBytes() : attempt
                .getHdfsBytesWritten();
        outputRecords = attempt.getMapOutputRecords();
        heapMegabytes =
            (job.getJobMapMB() > 0) ? job.getJobMapMB() : job
                .getHeapMegabytes();
      } else {
        inputBytes = attempt.getReduceShuffleBytes();
        inputRecords = attempt.getReduceInputRecords();
        outputBytes = attempt.getHdfsBytesWritten();
        outputRecords = attempt.getReduceOutputRecords();
        heapMegabytes =
            (job.getJobReduceMB() > 0) ? job.getJobReduceMB() : job
                .getHeapMegabytes();
      }
      // set the resource usage metrics
      metrics = attempt.getResourceUsageMetrics();
      break;
    }

    // Note: vCores are hardcoded to 1, as they are not collected.
    TaskInfo taskInfo =
        new TaskInfo(inputBytes, (int) inputRecords, outputBytes,
            (int) outputRecords, (int) heapMegabytes, 1,
            metrics);
    return taskInfo;
  }

  private TaskAttemptID makeTaskAttemptID(TaskType taskType, int taskNumber,
      int taskAttemptNumber) {
    return new TaskAttemptID(new TaskID(job.getJobID(), taskType, taskNumber), 
                             taskAttemptNumber);
  }
  
  private TaskAttemptInfo makeUpTaskAttemptInfo(TaskType taskType, TaskInfo taskInfo,
      int taskAttemptNumber, int taskNumber, int locality) {
    if (taskType == TaskType.MAP) {
      State state = State.SUCCEEDED;
      long runtime = 0;

      // make up state
      state = makeUpState(taskAttemptNumber, job.getMapperTriesToSucceed());
      runtime = makeUpMapRuntime(state, locality);
      runtime = sanitizeTaskRuntime(runtime, makeTaskAttemptID(taskType,
                                               taskNumber, taskAttemptNumber));
      TaskAttemptInfo tai
        = new MapTaskAttemptInfo(state, taskInfo, runtime, null);
      return tai;
    } else if (taskType == TaskType.REDUCE) {
      State state = State.SUCCEEDED;
      long shuffleTime = 0;
      long sortTime = 0;
      long reduceTime = 0;

      // TODO make up state
      // state = makeUpState(taskAttemptNumber, job.getReducerTriesToSucceed());
      reduceTime = makeUpReduceRuntime(state);
      TaskAttemptInfo tai = new ReduceTaskAttemptInfo
        (state, taskInfo, shuffleTime, sortTime, reduceTime, null);
      return tai;
    }

    throw new IllegalArgumentException("taskType is neither MAP nor REDUCE: "
        + taskType);
  }

  private long makeUpReduceRuntime(State state) {
    long reduceTime = 0;
    for (int i = 0; i < 5; i++) {
      reduceTime = doMakeUpReduceRuntime(state);
      if (reduceTime >= 0) {
        return reduceTime;
      }
    }
    return 0;
  }

  private long doMakeUpReduceRuntime(State state) {
    long reduceTime;
    try {
      if (state == State.SUCCEEDED) {
        reduceTime = makeUpRuntime(job.getSuccessfulReduceAttemptCDF());
      } else if (state == State.FAILED) {
        reduceTime = makeUpRuntime(job.getFailedReduceAttemptCDF());
      } else {
        throw new IllegalArgumentException(
            "state is neither SUCCEEDED nor FAILED: " + state);
      }
      return reduceTime;
    } catch (NoValueToMakeUpRuntime e) {
      return 0;
    }
  }

  private long makeUpMapRuntime(State state, int locality) {
    long runtime;
    // make up runtime
    if (state == State.SUCCEEDED || state == State.FAILED) {
      List<LoggedDiscreteCDF> cdfList =
          state == State.SUCCEEDED ? job.getSuccessfulMapAttemptCDFs() : job
              .getFailedMapAttemptCDFs();
      // XXX MapCDFs is an ArrayList of 4 possible groups: distance=0, 1, 2, and
      // the last group is "distance cannot be determined". All pig jobs
      // would have only the 4th group, and pig tasks usually do not have
      // any locality, so this group should count as "distance=2".
      // However, setup/cleanup tasks are also counted in the 4th group;
      // those tasks are not meaningful for this statistic.
      if (cdfList == null) {
        runtime = -1;
        return runtime;
      }
      try {
        runtime = makeUpRuntime(cdfList.get(locality));
      } catch (NoValueToMakeUpRuntime e) {
        runtime = makeUpRuntime(cdfList);
      }
    } else {
      throw new IllegalArgumentException(
          "state is neither SUCCEEDED nor FAILED: " + state);
    }
    return runtime;
  }

  /**
   * Perform a weighted random selection on a list of CDFs, and produce a random
   * variable using the selected CDF.
   * 
   * @param mapAttemptCDFs
   *          A list of CDFs for the distribution of runtime for the 1st, 2nd,
   *          ... map attempts for the job.
   */
  private long makeUpRuntime(List<LoggedDiscreteCDF> mapAttemptCDFs) {
    int total = 0;
    if (mapAttemptCDFs == null) {
      return -1;
    }
    for (LoggedDiscreteCDF cdf : mapAttemptCDFs) {
      total += cdf.getNumberValues();
    }
    if (total == 0) {
      return -1;
    }
    int index = random.nextInt(total);
    for (LoggedDiscreteCDF cdf : mapAttemptCDFs) {
      if (index >= cdf.getNumberValues()) {
        index -= cdf.getNumberValues();
      } else {
        if (index < 0) {
          throw new IllegalStateException("application error");
        }
        return makeUpRuntime(cdf);
      }
    }
    throw new IllegalStateException("not possible to get here");
  }

  private long makeUpRuntime(LoggedDiscreteCDF loggedDiscreteCDF) {
    /*
     * We need this odd-looking code because if a seed exists we need to ensure
     * that only one interpolator is generated per LoggedDiscreteCDF, but if no
     * seed exists then the potentially lengthy process of making an
     * interpolator can happen outside the lock. makeUpRuntimeCore only locks
     * around the two hash map accesses.
     */
    if (hasRandomSeed) {
      synchronized (interpolatorMap) {
        return makeUpRuntimeCore(loggedDiscreteCDF);
      }
    }

    return makeUpRuntimeCore(loggedDiscreteCDF);
  }
  
  private synchronized long getNextRandomSeed() {
    numRandomSeeds++;
    return RandomSeedGenerator.getSeed("forZombieJob" + job.getJobID(),
                                       numRandomSeeds);
  }
   
  private long makeUpRuntimeCore(LoggedDiscreteCDF loggedDiscreteCDF) {
    CDFRandomGenerator interpolator;

    synchronized (interpolatorMap) {
      interpolator = interpolatorMap.get(loggedDiscreteCDF);
    }

    if (interpolator == null) {
      if (loggedDiscreteCDF.getNumberValues() == 0) {
        throw new NoValueToMakeUpRuntime("no value to use to make up runtime");
      }

      interpolator =
          hasRandomSeed ? new CDFPiecewiseLinearRandomGenerator(
              loggedDiscreteCDF, getNextRandomSeed())
              : new CDFPiecewiseLinearRandomGenerator(loggedDiscreteCDF);

      /*
       * It doesn't matter if we compute and store an interpolator twice,
       * because the two instances will be semantically identical and
       * stateless. The exception is when we're seeded, in which case we're
       * not stateless, but then this code is only ever called while
       * synchronized on interpolatorMap.
       */
      synchronized (interpolatorMap) {
        interpolatorMap.put(loggedDiscreteCDF, interpolator);
      }
    }

    return interpolator.randomValue();
  }

  static private class NoValueToMakeUpRuntime extends IllegalArgumentException {
    static final long serialVersionUID = 1L;

    NoValueToMakeUpRuntime() {
      super();
    }

    NoValueToMakeUpRuntime(String detailMessage) {
      super(detailMessage);
    }

    NoValueToMakeUpRuntime(String detailMessage, Throwable cause) {
      super(detailMessage, cause);
    }

    NoValueToMakeUpRuntime(Throwable cause) {
      super(cause);
    }
  }

  private State makeUpState(int taskAttemptNumber, double[] numAttempts) {
    // if numAttempts == null, we return FAILED.
    if (numAttempts == null) {
      return State.FAILED;
    }
    if (taskAttemptNumber >= numAttempts.length - 1) {
      // always succeed
      return State.SUCCEEDED;
    } else {
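      // Interpretation: numAttempts[i] carries the fraction of tasks that
      // succeed on attempt i. Given that this attempt was reached, the
      // conditional probability of succeeding now is
      // pSucceed / (pSucceed + pFail).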
      double pSucceed = numAttempts[taskAttemptNumber];
      double pFail = 0;
      for (int i = taskAttemptNumber + 1; i < numAttempts.length; i++) {
        pFail += numAttempts[i];
      }
      return (random.nextDouble() < pSucceed / (pSucceed + pFail)) ? State.SUCCEEDED
          : State.FAILED;
    }
  }

  private TaskID getMaskedTaskID(TaskType taskType, int taskNumber) {
    return new TaskID(new JobID(), taskType, taskNumber);
  }

  private LoggedTask getLoggedTask(TaskType taskType, int taskNumber) {
    buildMaps();
    return loggedTaskMap.get(getMaskedTaskID(taskType, taskNumber));
  }

  private LoggedTaskAttempt getLoggedTaskAttempt(TaskType taskType,
      int taskNumber, int taskAttemptNumber) {
    buildMaps();
    TaskAttemptID id =
        new TaskAttemptID(getMaskedTaskID(taskType, taskNumber),
            taskAttemptNumber);
    return loggedTaskAttemptMap.get(id);
  }

}
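To see how the class is typically driven, here is a minimal, hypothetical sketch (not part of the original file). It assumes a Rumen trace at file:///tmp/job-trace.json and uses ZombieJobProducer from the same package to iterate ZombieJob instances; passing a null cluster topology makes getInputSplits() fabricate splits with empty host lists, as in the code above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.rumen.ZombieJob;
import org.apache.hadoop.tools.rumen.ZombieJobProducer;

public class ZombieJobDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical trace path; a null ZombieCluster means ZombieJob will
    // fabricate splits with empty host lists (see getInputSplits()).
    ZombieJobProducer producer =
        new ZombieJobProducer(new Path("file:///tmp/job-trace.json"), null, conf);
    try {
      ZombieJob job;
      while ((job = producer.getNextJob()) != null) {
        System.out.println(job.getJobID() + " user=" + job.getUser()
            + " maps=" + job.getNumberMaps()
            + " reduces=" + job.getNumberReduces()
            + " splits=" + job.getInputSplits().length);
      }
    } finally {
      producer.close();
    }
  }
}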

Related information

hadoop source code directory

Related articles

hadoop AbstractClusterStory 源码

hadoop Anonymizer 源码

hadoop CDFPiecewiseLinearRandomGenerator 源码

hadoop CDFRandomGenerator 源码

hadoop ClusterStory 源码

hadoop ClusterTopologyReader 源码

hadoop CurrentJHParser 源码

hadoop DeepCompare 源码

hadoop DeepInequalityException 源码

hadoop DefaultInputDemuxer 源码
