hadoop ValidateRenamedFilesStage 源码

  • 2022-10-20
  • 浏览 (193)

hadoop ValidateRenamedFilesStage 代码

文件路径:/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/ValidateRenamedFilesStage.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.OutputValidationException;
import org.apache.hadoop.util.functional.TaskPool;

import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_VALIDATE_OUTPUT;
import static org.apache.hadoop.thirdparty.com.google.common.collect.Iterables.concat;

/**
 * Job stage which checks the renamed output against the task manifests:
 * every file entry listed in every manifest must exist at its destination
 * path as a file of the recorded length and, when the manifest entry
 * carries an etag, with that same etag.
 * The stage result is the list of all entries which passed validation.
 *
 * Its cost is one getFileStatus() call (parallelized) per file.
 * Raises a {@link OutputValidationException} on a validation failure.
 */
public class ValidateRenamedFilesStage extends
    AbstractJobOrTaskStage<
        List<TaskManifest>,
        List<FileEntry>> {

  private static final Logger LOG = LoggerFactory.getLogger(
      ValidateRenamedFilesStage.class);

  /**
   * Flag polled by each worker before it probes a file; when it is
   * true the worker returns without validating its entry.
   * Nothing in this class sets the flag.
   */
  private final AtomicBoolean halt = new AtomicBoolean();

  /**
   * Entries which have passed validation.
   * Reads and writes of this field after construction go through
   * synchronized code, as workers add to the list concurrently.
   */
  private List<FileEntry> filesCommitted = new ArrayList<>();

  public ValidateRenamedFilesStage(final StageConfig stageConfig) {
    super(false, stageConfig, OP_STAGE_JOB_VALIDATE_OUTPUT, true);
  }

  /**
   * Get the list of validated entries.
   * @return the live list; may be empty.
   */
  private synchronized List<FileEntry> getFilesCommitted() {
    return filesCommitted;
  }

  /**
   * Replace the list of validated entries with a new, empty list.
   * @param initialCapacity initial capacity of the new list.
   */
  private synchronized void resetFilesCommitted(final int initialCapacity) {
    filesCommitted = new ArrayList<>(initialCapacity);
  }

  /**
   * Record an entry as successfully validated.
   * @param entry entry to append to the list.
   */
  private synchronized void addFileCommitted(FileEntry entry) {
    filesCommitted.add(entry);
  }

  /**
   * Validate every file listed in the task manifests.
   * Each entry is probed with a getFileStatus() call on its destination
   * path; the destination must be a file whose length matches that in
   * the manifest entry. If the entry has an etag, the destination's
   * etag must match too.
   * If two tasks wrote the same file, or a file was changed after the
   * task was committed, a mismatch is detected provided the length
   * (or etag, when one is recorded) is now different.
   * @param taskManifests list of manifests.
   * @return list of files which passed validation.
   * @throws IOException IO problem or validation failure.
   */
  @Override
  protected List<FileEntry> executeStage(
      final List<TaskManifest> taskManifests)
      throws IOException {

    // start with a fresh list whose initial capacity is the manifest count.
    resetFilesCommitted(taskManifests.size());

    // one flat view over the file lists of all the manifests.
    final Iterable<FileEntry> allEntries = concat(
        taskManifests.stream()
            .map(TaskManifest::getFilesToCommit)
            .collect(Collectors.toList()));

    // probe each entry through the task pool, stopping on the first failure.
    TaskPool.foreach(allEntries)
        .executeWith(getIOProcessors())
        .stopOnFailure()
        .run(this::validateOneFile);

    return getFilesCommitted();
  }

  /**
   * Validate a single manifest entry against the destination filesystem.
   * On success the entry is added to the list of committed files.
   * @param entry entry to probe for
   * @throws IOException IO problem.
   * @throws OutputValidationException if the destination is missing,
   * is not a file, or has a different etag or length.
   */
  private void validateOneFile(FileEntry entry) throws IOException {
    updateAuditContext(OP_STAGE_JOB_VALIDATE_OUTPUT);

    if (halt.get()) {
      // asked to stop: skip this entry.
      return;
    }
    // report progress back
    progress();

    final Path renamedFrom = entry.getSourcePath();
    final Path target = entry.getDestPath();
    try {
      // raises an FNFE if there is nothing at the destination.
      final FileStatus targetStatus = getFileStatus(target);

      // anything other than a file is a failure.
      if (!targetStatus.isFile()) {
        throw new OutputValidationException(target,
            "Expected a file renamed from " + renamedFrom
                + "; found " + targetStatus);
      }
      final long expectedLength = entry.getSize();
      final long actualLength = targetStatus.getLen();

      // the etag is only compared when the manifest entry has one.
      final String expectedEtag = entry.getEtag();
      if (isNotBlank(expectedEtag)) {
        final String actualEtag =
            ManifestCommitterSupport.getEtag(targetStatus);
        if (!expectedEtag.equals(actualEtag)) {
          LOG.warn("Etag of dest file {}: {} does not match that of manifest entry {}",
              target, targetStatus, entry);
          throw new OutputValidationException(target,
              etagMismatchMessage(renamedFrom,
                  expectedEtag, expectedLength,
                  actualEtag, actualLength));
        }
      }

      // the length check comes after any etag validation.
      if (actualLength != expectedLength) {
        LOG.warn("Length of dest file {}: {} does not match that of manifest entry {}",
            target, targetStatus, entry);
        throw new OutputValidationException(target,
            lengthMismatchMessage(renamedFrom,
                expectedLength, actualLength));
      }

    } catch (FileNotFoundException e) {
      // nothing at the destination path.
      throw new OutputValidationException(target,
          "Expected a file, but it was not found", e);
    }
    addFileCommitted(entry);
  }

  /**
   * Build the error text for an etag mismatch.
   * @param renamedFrom source path of the entry
   * @param expectedEtag etag in the manifest entry
   * @param expectedLength length in the manifest entry
   * @param actualEtag etag of the destination file
   * @param actualLength length of the destination file
   * @return the formatted message.
   */
  private static String etagMismatchMessage(
      final Path renamedFrom,
      final String expectedEtag,
      final long expectedLength,
      final String actualEtag,
      final long actualLength) {
    return String.format(
        "Expected the file renamed from %s"
            + " with etag %s and length %s"
            + " but found a file with etag %s and length %d",
        renamedFrom,
        expectedEtag,
        expectedLength,
        actualEtag,
        actualLength);
  }

  /**
   * Build the error text for a length mismatch.
   * @param renamedFrom source path of the entry
   * @param expectedLength length in the manifest entry
   * @param actualLength length of the destination file
   * @return the formatted message.
   */
  private static String lengthMismatchMessage(
      final Path renamedFrom,
      final long expectedLength,
      final long actualLength) {
    return String.format(
        "Expected the file renamed from %s"
            + " with length %d"
            + " but found a file of length %d",
        renamedFrom,
        expectedLength,
        actualLength);
  }

}

相关信息

hadoop 源码目录

相关文章

hadoop AbortTaskStage 源码

hadoop AbstractJobOrTaskStage 源码

hadoop CleanupJobStage 源码

hadoop CommitJobStage 源码

hadoop CommitTaskStage 源码

hadoop CreateOutputDirectoriesStage 源码

hadoop JobOrTaskStage 源码

hadoop LoadManifestsStage 源码

hadoop RenameFilesStage 源码

hadoop SaveSuccessFileStage 源码

0  赞