hadoop ManifestCommitterConfig source code

  • 2022-10-20

hadoop ManifestCommitterConfig code

File path: /hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterConfig.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.output.committer.manifest;

import java.io.IOException;
import java.util.Objects;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.StageConfig;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.StageEventCallbacks;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.util.functional.CloseableTaskPoolSubmitter;

import static org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.*;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.buildJobUUID;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.getAppAttemptId;

/**
 * The configuration for the committer as built up from the job configuration
 * and data passed down from the committer factory.
 * Isolated for ease of dev/test
 */
public final class ManifestCommitterConfig implements IOStatisticsSource {

  /**
   * Final destination of work.
   * This is <i>unqualified</i>.
   */
  private final Path destinationDir;

  /**
   * Role: used in log/text messages.
   */
  private final String role;

  /**
   * This is the directory for all intermediate work: where the output
   * format will write data.
   * Will be null if built from a job context.
   */
  private final Path taskAttemptDir;

  /** Configuration of the job. */
  private final Configuration conf;

  /** The job context. For a task, this can be cast to a TaskContext. */
  private final JobContext jobContext;

  /** Should a job marker be created? */
  private final boolean createJobMarker;

  /**
   * Job ID or UUID, without any attempt suffix.
   * This is expected/required to be unique, though
   * Spark has had "issues" there until recently
   * with lack of uniqueness of generated MR Job IDs.
   */
  private final String jobUniqueId;

  /**
   * Where did the job Unique ID come from?
   */
  private final String jobUniqueIdSource;

  /**
   * Number of this attempt; starts at zero.
   */
  private final int jobAttemptNumber;

  /**
   * Job ID + AttemptID.
   */
  private final String jobAttemptId;

  /**
   * Task ID: used as the filename of the manifest.
   * Will be "" if built from a job context.
   */
  private final String taskId;

  /**
   * Task attempt ID. Determines the working
   * directory for task attempts to write data into,
   * and for the task committer to scan.
   * Will be "" if built from a job context.
   */
  private final String taskAttemptId;

  /** Any progressable for progress callbacks. */
  private final Progressable progressable;

  /**
   * IOStatistics to update.
   */
  private final IOStatisticsStore iostatistics;


  /** Should the output be validated after the commit? */
  private final boolean validateOutput;

  /**
   * Attempt directory management.
   */
  private final ManifestCommitterSupport.AttemptDirectories dirs;

  /**
   * Callback when a stage is entered.
   */
  private final StageEventCallbacks stageEventCallbacks;

  /**
   * Name for logging.
   */
  private final String name;

  /**
   * Delete target paths on commit? Stricter, but
   * higher IO cost.
   */
  private final boolean deleteTargetPaths;

  /**
   * Constructor.
   * @param outputPath destination path of the job.
   * @param role role for log messages.
   * @param context job/task context
   * @param iostatistics IO Statistics
   * @param stageEventCallbacks stage event callbacks.
   */
  ManifestCommitterConfig(
      final Path outputPath,
      final String role,
      final JobContext context,
      final IOStatisticsStore iostatistics,
      final StageEventCallbacks stageEventCallbacks) {
    this.role = role;
    this.jobContext = context;
    this.conf = context.getConfiguration();
    this.destinationDir = outputPath;
    this.iostatistics = iostatistics;
    this.stageEventCallbacks = stageEventCallbacks;

    Pair<String, String> pair = buildJobUUID(conf, context.getJobID());
    this.jobUniqueId = pair.getLeft();
    this.jobUniqueIdSource = pair.getRight();
    this.jobAttemptNumber = getAppAttemptId(context);
    this.jobAttemptId = this.jobUniqueId + "_" + jobAttemptNumber;

    // build directories
    this.dirs = new ManifestCommitterSupport.AttemptDirectories(outputPath,
        this.jobUniqueId, jobAttemptNumber);

    // read in configuration options
    this.createJobMarker = conf.getBoolean(
        SUCCESSFUL_JOB_OUTPUT_DIR_MARKER,
        DEFAULT_CREATE_SUCCESSFUL_JOB_DIR_MARKER);
    this.validateOutput = conf.getBoolean(
        OPT_VALIDATE_OUTPUT,
        OPT_VALIDATE_OUTPUT_DEFAULT);
    this.deleteTargetPaths = conf.getBoolean(
        OPT_DELETE_TARGET_FILES,
        OPT_DELETE_TARGET_FILES_DEFAULT);

    // if constructed with a task attempt, build the task ID and path.
    if (context instanceof TaskAttemptContext) {
      // it's a task
      final TaskAttemptContext tac = (TaskAttemptContext) context;
      TaskAttemptID taskAttempt = Objects.requireNonNull(
          tac.getTaskAttemptID());
      taskAttemptId = taskAttempt.toString();
      taskId = taskAttempt.getTaskID().toString();
      // Task attempt dir; must be different across instances
      taskAttemptDir = dirs.getTaskAttemptPath(taskAttemptId);
      // the context is also the progress callback.
      progressable = tac;
      name = String.format(InternalConstants.NAME_FORMAT_TASK_ATTEMPT, taskAttemptId);

    } else {
      // it's a job
      taskId = "";
      taskAttemptId = "";
      taskAttemptDir = null;
      progressable = null;
      name = String.format(InternalConstants.NAME_FORMAT_JOB_ATTEMPT, jobAttemptId);
    }
  }

  @Override
  public String toString() {
    return "ManifestCommitterConfig{" +
        "name=" + name +
        ", destinationDir=" + destinationDir +
        ", role='" + role + '\'' +
        ", taskAttemptDir=" + taskAttemptDir +
        ", createJobMarker=" + createJobMarker +
        ", jobUniqueId='" + jobUniqueId + '\'' +
        ", jobUniqueIdSource='" + jobUniqueIdSource + '\'' +
        ", jobAttemptNumber=" + jobAttemptNumber +
        ", jobAttemptId='" + jobAttemptId + '\'' +
        ", taskId='" + taskId + '\'' +
        ", taskAttemptId='" + taskAttemptId + '\'' +
        '}';
  }

  /**
   * Get the destination filesystem.
   * @return destination FS.
   * @throws IOException Problems binding to the destination FS.
   */
  FileSystem getDestinationFileSystem() throws IOException {
    return FileSystem.get(destinationDir.toUri(), conf);
  }

  /**
   * Create the stage config from the committer
   * configuration.
   * This does not bind the store operations
   * or processors.
   * @return a stage config with configuration options passed in.
   */
  StageConfig createStageConfig() {
    StageConfig stageConfig = new StageConfig();
    stageConfig
        .withIOStatistics(iostatistics)
        .withJobAttemptNumber(jobAttemptNumber)
        .withJobDirectories(dirs)
        .withJobId(jobUniqueId)
        .withJobIdSource(jobUniqueIdSource)
        .withName(name)
        .withProgressable(progressable)
        .withStageEventCallbacks(stageEventCallbacks)
        .withTaskAttemptDir(taskAttemptDir)
        .withTaskAttemptId(taskAttemptId)
        .withTaskId(taskId)
        .withDeleteTargetPaths(deleteTargetPaths);

    return stageConfig;
  }

  public Path getDestinationDir() {
    return destinationDir;
  }

  public String getRole() {
    return role;
  }

  public Path getTaskAttemptDir() {
    return taskAttemptDir;
  }

  public Path getJobAttemptDir() {
    return dirs.getJobAttemptDir();
  }

  public Path getTaskManifestDir() {
    return dirs.getTaskManifestDir();
  }

  public Configuration getConf() {
    return conf;
  }

  public JobContext getJobContext() {
    return jobContext;
  }

  public boolean getCreateJobMarker() {
    return createJobMarker;
  }

  public String getJobAttemptId() {
    return jobAttemptId;
  }

  public String getTaskAttemptId() {
    return taskAttemptId;
  }

  public String getTaskId() {
    return taskId;
  }

  public String getJobUniqueId() {
    return jobUniqueId;
  }

  public boolean getValidateOutput() {
    return validateOutput;
  }

  public String getName() {
    return name;
  }

  @Override
  public IOStatisticsStore getIOStatistics() {
    return iostatistics;
  }

  /**
   * Create a new submitter task pool from the
   * {@link ManifestCommitterConstants#OPT_IO_PROCESSORS}
   * settings.
   * @return a new thread pool.
   */
  public CloseableTaskPoolSubmitter createSubmitter() {
    return createSubmitter(
        OPT_IO_PROCESSORS, OPT_IO_PROCESSORS_DEFAULT);
  }

  /**
   * Create a new submitter task pool.
   * @param key config key with pool size.
   * @param defVal default value.
   * @return a new task pool.
   */
  public CloseableTaskPoolSubmitter createSubmitter(String key, int defVal) {
    int numThreads = conf.getInt(key, defVal);
    if (numThreads <= 0) {
      // ignore the setting if it is too invalid.
      numThreads = defVal;
    }
    return createCloseableTaskSubmitter(numThreads, getJobAttemptId());
  }

  /**
   * Create a new submitter task pool.
   *
   * @param numThreads thread count.
   * @param jobAttemptId job ID
   * @return a new task pool.
   */
  public static CloseableTaskPoolSubmitter createCloseableTaskSubmitter(
      final int numThreads,
      final String jobAttemptId) {
    return new CloseableTaskPoolSubmitter(
        HadoopExecutors.newFixedThreadPool(numThreads,
            new ThreadFactoryBuilder()
                .setDaemon(true)
                .setNameFormat("manifest-committer-" + jobAttemptId + "-%d")
                .build()));
  }

}

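To show how the options read by this class are typically supplied, here is a minimal, self-contained sketch (not part of the Hadoop tree) that sets the same configuration keys via the constants imported in the source above, then builds an IO thread pool through the public static helper createCloseableTaskSubmitter. The class name ManifestCommitterConfigExample, the chosen values, the placeholder attempt ID "job_0001_1", and the assumption that CloseableTaskPoolSubmitter accepts a Runnable through submit() are illustrative only.

package org.apache.hadoop.mapreduce.lib.output.committer.manifest;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.functional.CloseableTaskPoolSubmitter;

import static org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.*;

/** Illustrative sketch only; not part of the Hadoop source tree. */
public final class ManifestCommitterConfigExample {

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // Options read in the ManifestCommitterConfig constructor.
    conf.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true);  // createJobMarker
    conf.setBoolean(OPT_VALIDATE_OUTPUT, true);               // validateOutput
    conf.setBoolean(OPT_DELETE_TARGET_FILES, false);          // deleteTargetPaths

    // Pool size read by createSubmitter(); values <= 0 fall back to the default.
    conf.setInt(OPT_IO_PROCESSORS, 16);

    // The public static helper wraps a daemon fixed-size pool in a
    // CloseableTaskPoolSubmitter; try-with-resources shuts it down afterwards.
    // "job_0001_1" is just a placeholder attempt ID used for thread naming.
    try (CloseableTaskPoolSubmitter pool =
             ManifestCommitterConfig.createCloseableTaskSubmitter(4, "job_0001_1")) {
      pool.submit(() ->
          System.out.println("running in " + Thread.currentThread().getName()));
    }
  }
}

In the committer itself these values are consumed through createStageConfig() and createSubmitter() shown above rather than being set by hand like this.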
Related information

hadoop source code directory

Related articles

hadoop ManifestCommitter source code

hadoop ManifestCommitterConstants source code

hadoop ManifestCommitterFactory source code

hadoop ManifestCommitterStatisticNames source code

hadoop package-info source code
