hadoop ClientDistributedCacheManager 源码

  • 2022-10-20
  • 浏览 (153)

hadoop ClientDistributedCacheManager 代码

文件路径:/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/ClientDistributedCacheManager.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapreduce.filecache;

import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.security.Credentials;

/**
 * Manages internal configuration of the cache by the client for job submission.
 *
 * <p>All methods are static. For the cache archives and cache files listed in
 * a job configuration, this class records their sizes, modification
 * timestamps and public/private visibilities back into the configuration as
 * comma-separated lists (in the same order as the cache entries). It also
 * collects the cache paths so delegation tokens can be obtained for them via
 * {@link TokenCache}.
 */
@InterfaceAudience.Private
public class ClientDistributedCacheManager {

  /**
   * Determines timestamps of files to be cached, and stores those
   * in the configuration. Determines the visibilities of the distributed cache
   * files and archives. The visibility of a cache path is "public" if the leaf
   * component has READ permissions for others, and the parent subdirs have
   * EXECUTE permissions for others.
   *
   * <p>This is an internal method! A fresh, empty file-status cache is used
   * for the duration of this call.
   *
   * @param job Configuration of a job; updated in place
   * @throws IOException if there is a problem with the underlying filesystem
   */
  public static void determineTimestampsAndCacheVisibilities(Configuration job)
      throws IOException {
    Map<URI, FileStatus> statCache = new HashMap<URI, FileStatus>();
    determineTimestampsAndCacheVisibilities(job, statCache);
  }

  /**
   * See ClientDistributedCacheManager#determineTimestampsAndCacheVisibilities(
   * Configuration).
   *
   * @param job Configuration of a job
   * @param statCache A map containing cached file status objects; consulted
   *     first and populated with any status fetched from the file system
   * @throws IOException if there is a problem with the underlying filesystem
   */
  public static void determineTimestampsAndCacheVisibilities(Configuration job,
      Map<URI, FileStatus> statCache) throws IOException {
    determineTimestamps(job, statCache);
    determineCacheVisibilities(job, statCache);
  }

  /**
   * Determines the sizes and timestamps of the archives and files to be
   * cached, and stores those in the configuration. This is intended to be
   * used internally by JobClient after all cache files have been added.
   *
   * <p>Archive sizes are stored under {@link MRJobConfig#CACHE_ARCHIVES_SIZES}
   * and archive timestamps under {@link MRJobConfig#CACHE_ARCHIVES_TIMESTAMPS};
   * file sizes under {@link MRJobConfig#CACHE_FILES_SIZES} and file timestamps
   * under {@link MRJobConfig#CACHE_FILE_TIMESTAMPS}. Each value is a
   * comma-separated list in the same order as the cache entries. When there
   * are no archives (or no files), the corresponding keys are left untouched.
   *
   * <p>This is an internal method!
   *
   * @param job Configuration of a job.
   * @param statCache A map containing cached file status objects; consulted
   *     first and populated with any status fetched from the file system
   * @throws IOException if there is a problem with the underlying filesystem
   */
  public static void determineTimestamps(Configuration job,
      Map<URI, FileStatus> statCache) throws IOException {
    URI[] tarchives = JobContextImpl.getCacheArchives(job);
    // An empty array is treated like "no archives" instead of failing on [0].
    if (tarchives != null && tarchives.length > 0) {
      String[] sizesAndTimestamps =
          joinSizesAndTimestamps(job, tarchives, statCache);
      job.set(MRJobConfig.CACHE_ARCHIVES_SIZES, sizesAndTimestamps[0]);
      setArchiveTimestamps(job, sizesAndTimestamps[1]);
    }

    URI[] tfiles = JobContextImpl.getCacheFiles(job);
    if (tfiles != null && tfiles.length > 0) {
      String[] sizesAndTimestamps =
          joinSizesAndTimestamps(job, tfiles, statCache);
      job.set(MRJobConfig.CACHE_FILES_SIZES, sizesAndTimestamps[0]);
      setFileTimestamps(job, sizesAndTimestamps[1]);
    }
  }

  /**
   * Looks up the status of every URI, in order, and joins their lengths and
   * modification times into two comma-separated lists.
   *
   * @param job Configuration used to resolve each URI's file system
   * @param uris non-empty array of cache URIs
   * @param statCache cache of file statuses, consulted and populated
   * @return a two-element array: {@code [0]} the sizes, {@code [1]} the
   *     modification timestamps
   * @throws IOException if a file status cannot be obtained
   */
  private static String[] joinSizesAndTimestamps(Configuration job,
      URI[] uris, Map<URI, FileStatus> statCache) throws IOException {
    StringBuilder sizes = new StringBuilder();
    StringBuilder timestamps = new StringBuilder();
    for (int i = 0; i < uris.length; i++) {
      FileStatus status = getFileStatus(job, uris[i], statCache);
      if (i > 0) {
        sizes.append(',');
        timestamps.append(',');
      }
      sizes.append(status.getLen());
      timestamps.append(status.getModificationTime());
    }
    return new String[] {sizes.toString(), timestamps.toString()};
  }

  /**
   * For each archive or cache file, obtains the corresponding delegation
   * token by passing all cache paths (archives first, then files) to
   * {@link TokenCache#obtainTokensForNamenodes}.
   *
   * @param job Configuration of a job listing the cache archives and files
   * @param credentials credentials that receive the obtained tokens
   * @throws IOException if obtaining the tokens fails
   */
  public static void getDelegationTokens(Configuration job,
      Credentials credentials) throws IOException {
    URI[] tarchives = JobContextImpl.getCacheArchives(job);
    URI[] tfiles = JobContextImpl.getCacheFiles(job);

    int numArchives = (tarchives == null) ? 0 : tarchives.length;
    int numFiles = (tfiles == null) ? 0 : tfiles.length;
    Path[] ps = new Path[numArchives + numFiles];

    for (int i = 0; i < numArchives; i++) {
      ps[i] = new Path(tarchives[i].toString());
    }
    // Files are placed directly after the archives.
    for (int j = 0; j < numFiles; j++) {
      ps[numArchives + j] = new Path(tfiles[j].toString());
    }

    TokenCache.obtainTokensForNamenodes(credentials, ps, job);
  }

  /**
   * Determines the visibilities of the distributed cache files and
   * archives. The visibility of a cache path is "public" if the leaf component
   * has READ permissions for others, and the parent subdirs have
   * EXECUTE permissions for others. The results are stored as comma-separated
   * lists of booleans; when there are no archives (or no files), the
   * corresponding key is left untouched.
   *
   * @param job Configuration of a job; updated in place
   * @param statCache A map containing cached file status objects; consulted
   *     first and populated with any status fetched from the file system
   * @throws IOException if there is a problem with the underlying filesystem
   */
  public static void determineCacheVisibilities(Configuration job,
      Map<URI, FileStatus> statCache) throws IOException {
    URI[] tarchives = JobContextImpl.getCacheArchives(job);
    // An empty array is treated like "no archives" instead of failing on [0].
    if (tarchives != null && tarchives.length > 0) {
      setArchiveVisibilities(job, joinVisibilities(job, tarchives, statCache));
    }
    URI[] tfiles = JobContextImpl.getCacheFiles(job);
    if (tfiles != null && tfiles.length > 0) {
      setFileVisibilities(job, joinVisibilities(job, tfiles, statCache));
    }
  }

  /**
   * Evaluates {@link #isPublic} for every URI, in order, and joins the
   * results into a comma-separated list of {@code true}/{@code false}.
   *
   * @param job Configuration used to resolve each URI's file system
   * @param uris non-empty array of cache URIs
   * @param statCache cache of file statuses, consulted and populated
   * @return the comma-separated visibility flags
   * @throws IOException if a file status cannot be obtained
   */
  private static String joinVisibilities(Configuration job, URI[] uris,
      Map<URI, FileStatus> statCache) throws IOException {
    StringBuilder visibilities = new StringBuilder();
    for (int i = 0; i < uris.length; i++) {
      if (i > 0) {
        visibilities.append(',');
      }
      visibilities.append(isPublic(job, uris[i], statCache));
    }
    return visibilities.toString();
  }

  /**
   * Stores the public/private visibility flags of the archives to be
   * localized under {@link MRJobConfig#CACHE_ARCHIVES_VISIBILITIES}.
   *
   * @param conf Configuration which stores the visibilities
   * @param booleans comma separated list of booleans (true - public)
   * The order should be the same as the order in which the archives are added.
   */
  static void setArchiveVisibilities(Configuration conf, String booleans) {
    conf.set(MRJobConfig.CACHE_ARCHIVES_VISIBILITIES, booleans);
  }

  /**
   * Stores the public/private visibility flags of the files to be localized
   * under {@link MRJobConfig#CACHE_FILE_VISIBILITIES}.
   *
   * @param conf Configuration which stores the visibilities
   * @param booleans comma separated list of booleans (true - public)
   * The order should be the same as the order in which the files are added.
   */
  static void setFileVisibilities(Configuration conf, String booleans) {
    conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, booleans);
  }

  /**
   * Stores the timestamps of the archives to be localized under
   * {@link MRJobConfig#CACHE_ARCHIVES_TIMESTAMPS}.
   *
   * @param conf Configuration which stores the timestamps
   * @param timestamps comma separated list of timestamps of archives.
   * The order should be the same as the order in which the archives are added.
   */
  static void setArchiveTimestamps(Configuration conf, String timestamps) {
    conf.set(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS, timestamps);
  }

  /**
   * Stores the timestamps of the files to be localized under
   * {@link MRJobConfig#CACHE_FILE_TIMESTAMPS}.
   *
   * @param conf Configuration which stores the timestamps
   * @param timestamps comma separated list of timestamps of files.
   * The order should be the same as the order in which the files are added.
   */
  static void setFileTimestamps(Configuration conf, String timestamps) {
    conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, timestamps);
  }

  /**
   * Gets the file status for the given URI.  If the URI is in the cache,
   * returns it.  Otherwise, fetches it and adds it to the cache.
   * The file system is resolved from the URI and configuration.
   */
  private static FileStatus getFileStatus(Configuration job, URI uri,
      Map<URI, FileStatus> statCache) throws IOException {
    FileSystem fileSystem = FileSystem.get(uri, job);
    return getFileStatus(fileSystem, uri, statCache);
  }

  /**
   * Returns a boolean to denote whether a cache file is visible to all(public)
   * or not. A path whose last component is the distributed-cache wildcard
   * only needs its ancestors to be executable by others; any other path must
   * also be readable by others.
   *
   * @param conf the configuration
   * @param uri the URI to test
   * @param statCache A map containing cached file status objects
   * @return true if the path in the uri is visible to all, false otherwise
   * @throws IOException thrown if a file system operation fails
   */
  static boolean isPublic(Configuration conf, URI uri,
      Map<URI, FileStatus> statCache) throws IOException {
    boolean isPublic = true;
    FileSystem fs = FileSystem.get(uri, conf);
    Path current = new Path(uri.getPath());
    current = fs.makeQualified(current);

    // If we're looking at a wildcarded path, we only need to check that the
    // ancestors allow execution.  Otherwise, look for read permissions in
    // addition to the ancestors' permissions.
    if (!current.getName().equals(DistributedCache.WILDCARD)) {
      isPublic = checkPermissionOfOther(fs, current, FsAction.READ, statCache);
    }

    return isPublic &&
        ancestorsHaveExecutePermissions(fs, current.getParent(), statCache);
  }

  /**
   * Returns true if the specified path and all of its ancestors have the
   * 'execute' permission set for all users (i.e. that other users can
   * traverse the directory hierarchy to the given path).
   *
   * @param fs file system holding the path
   * @param path first directory to check; {@code null} yields {@code true}
   * @param statCache A map containing cached file status objects
   * @return false as soon as one directory lacks EXECUTE for others
   * @throws IOException if a file status cannot be obtained
   */
  static boolean ancestorsHaveExecutePermissions(FileSystem fs, Path path,
      Map<URI, FileStatus> statCache) throws IOException {
    Path current = path;
    while (current != null) {
      //the subdirs in the path should have execute permissions for others
      if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE, statCache)) {
        return false;
      }
      current = current.getParent();
    }
    return true;
  }

  /**
   * Checks for a given path whether the Other permissions on it
   * imply the permission in the passed FsAction.
   *
   * <p>Encrypted files are always treated as private. This stance has two
   * important side effects.  The first is that the encrypted files will be
   * downloaded as the job owner instead of the YARN user, which is required
   * for the KMS ACLs to work as expected.  Second, it prevents a file with
   * world readable permissions that is stored in an encryption zone from
   * being localized as a publicly shared file with world readable
   * permissions.
   *
   * @param fs file system holding the path
   * @param path path to check
   * @param action permission that Other must imply
   * @param statCache A map containing cached file status objects
   * @return true if the path is not encrypted and its Other permissions
   *     imply {@code action}, false otherwise
   * @throws IOException if the file status cannot be obtained
   */
  private static boolean checkPermissionOfOther(FileSystem fs, Path path,
      FsAction action, Map<URI, FileStatus> statCache) throws IOException {
    FileStatus status = getFileStatus(fs, path.toUri(), statCache);
    return !status.isEncrypted()
        && status.getPermission().getOtherAction().implies(action);
  }

  /**
   * Returns the file status for the given URI, using {@code statCache} when
   * possible. If the last path component is the distributed-cache wildcard,
   * the status of the parent directory is looked up (and cached under the
   * parent's URI) instead.
   */
  private static FileStatus getFileStatus(FileSystem fs, URI uri,
      Map<URI, FileStatus> statCache) throws IOException {
    Path path = new Path(uri);

    if (path.getName().equals(DistributedCache.WILDCARD)) {
      path = path.getParent();
      uri = path.toUri();
    }

    FileStatus stat = statCache.get(uri);

    if (stat == null) {
      stat = fs.getFileStatus(path);
      statCache.put(uri, stat);
    }

    return stat;
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop DistributedCache 源码

hadoop package-info 源码

0  赞