hadoop RawLocalFileSystem 源码

  • 2022-10-20
  • 浏览 (487)

hadoop RawLocalFileSystem 代码


/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs;

import org.apache.hadoop.classification.VisibleForTesting;

import java.io.BufferedOutputStream;
import java.io.DataOutput;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.FileDescriptor;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.BasicFileAttributeView;
import java.nio.file.attribute.FileTime;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Locale;
import java.util.Optional;
import java.util.StringTokenizer;
import java.util.concurrent.atomic.AtomicLong;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.IntFunction;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.impl.StoreImplementationUtils;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
import org.apache.hadoop.fs.statistics.IOStatisticsContext;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.BufferedIOStatisticsOutputStream;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;

import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
import static org.apache.hadoop.fs.VectoredReadUtils.sortRanges;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_EXCEPTIONS;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_SEEK_OPERATIONS;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_SKIP_BYTES;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_SKIP_OPERATIONS;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_WRITE_BYTES;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_WRITE_EXCEPTIONS;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;

/** Implement the FileSystem API for the raw local filesystem. */
public class RawLocalFileSystem extends FileSystem {
  // URI returned by getUri() for this filesystem.
  static final URI NAME = URI.create("file:///");
  // Directory against which relative paths are resolved (makeAbsolute, pathToFile).
  private Path workingDir;
  // Assigned in initialize(); handed to DeprecatedRawLocalFileStatus instances.
  private long defaultBlockSize;
  // Temporary workaround for HADOOP-9652.
  // While true, listStatus() builds DeprecatedRawLocalFileStatus for plain files.
  private static boolean useDeprecatedFileStatus = true;

  public static void useStatIfAvailable() {
    useDeprecatedFileStatus = !Stat.isAvailable();
  /**
   * Create the filesystem with its working directory set to the value of
   * {@link #getInitialWorkingDirectory()}.
   */
  public RawLocalFileSystem() {
    workingDir = getInitialWorkingDirectory();
  }
  private Path makeAbsolute(Path f) {
    if (f.isAbsolute()) {
      return f;
    } else {
      return new Path(workingDir, f);
  /**
   * Convert a path to a File.
   * @param path the path.
   * @return file.
   */
  public File pathToFile(Path path) {
    if (!path.isAbsolute()) {
      path = new Path(getWorkingDirectory(), path);
    return new File(path.toUri().getPath());

  /** @return the constant URI of this filesystem, {@code NAME}. */
  public URI getUri() {
    return NAME;
  }
  public void initialize(URI uri, Configuration conf) throws IOException {
    super.initialize(uri, conf);
    defaultBlockSize = getDefaultBlockSize(new Path(uri));
  /** For open()'s FSInputStream. */
  class LocalFSFileInputStream extends FSInputStream implements
      HasFileDescriptor, IOStatisticsSource, StreamCapabilities {
    // Underlying stream; opened in the constructor.
    private FileInputStream fis;
    // Local file this stream reads; also used to open the async channel.
    private final File name;
    // Offset tracked by seek()/read()/skip() and returned by getPos().
    private long position;
    // Opened on demand by getAsyncChannel(); stays null until then.
    private AsynchronousFileChannel asyncChannel = null;

    /** Minimal set of counters. */
    private final IOStatisticsStore ioStatistics = iostatisticsStore()

    /** Reference to the bytes read counter for slightly faster counting. */
    private final AtomicLong bytesRead;

    /** Thread level IOStatistics aggregator to update in close(). */
    private final IOStatisticsAggregator

    /**
     * Open the local file behind {@code f} for reading.
     * @param f path to open; converted with pathToFile().
     * @throws IOException if the file cannot be opened.
     */
    public LocalFSFileInputStream(Path f) throws IOException {
      name = pathToFile(f);
      fis = new FileInputStream(name);
      // NOTE(review): the two statements below stop at "(" and "="; their
      // continuation lines are not present in this copy and must be restored.
      bytesRead = ioStatistics.getCounterReference(
      ioStatisticsAggregator =
    /**
     * Move the tracked position to {@code pos}.
     * @param pos target offset; negative values are rejected with EOFException.
     * @throws IOException on a negative offset.
     */
    public void seek(long pos) throws IOException {
      if (pos < 0) {
        // NOTE(review): the exception argument and the end of this "if" are
        // not visible; the block looks truncated.
        throw new EOFException(
      this.position = pos;
    public long getPos() throws IOException {
      return this.position;
    public boolean seekToNewSource(long targetPos) throws IOException {
      return false;
    /** Just forward to the fis. */
    public int available() throws IOException {
      // Delegates straight to the wrapped FileInputStream.
      return fis.available();
    }
    /** @return false; mark/reset is not offered by this stream. */
    public boolean markSupported() {
      return false;
    }

    // NOTE(review): only the try/finally skeleton and the asyncChannel null
    // check are visible; the statements that actually close the resources
    // are missing from this copy.
    public void close() throws IOException {
      try {
        if (asyncChannel != null) {
      } finally {

    /**
     * Read one byte from the underlying FileInputStream.
     * An IOException raised by the read is rethrown wrapped in an FSError.
     */
    public int read() throws IOException {
      try {
        int value = fis.read();
        // NOTE(review): the body of this "if" is not visible; presumably it
        // advances the position/statistics on a successful read — confirm.
        if (value >= 0) {
        return value;
      } catch (IOException e) {                 // unexpected exception
        throw new FSError(e);                   // assume native fs error
    public int read(byte[] b, int off, int len) throws IOException {
      // parameter check
      validatePositionedReadArgs(position, b, off, len);
      try {
        int value = fis.read(b, off, len);
        if (value > 0) {
          this.position += value;
        return value;
      } catch (IOException e) {                 // unexpected exception
        throw new FSError(e);                   // assume native fs error
    public int read(long position, byte[] b, int off, int len)
      throws IOException {
      // parameter check
      validatePositionedReadArgs(position, b, off, len);
      if (len == 0) {
        return 0;

      ByteBuffer bb = ByteBuffer.wrap(b, off, len);
      try {
        int value = fis.getChannel().read(bb, position);
        if (value > 0) {
          ioStatistics.incrementCounter(STREAM_READ_BYTES, value);
        return value;
      } catch (IOException e) {
        throw new FSError(e);
    public long skip(long n) throws IOException {
      long value = fis.skip(n);
      if (value > 0) {
        this.position += value;
        ioStatistics.incrementCounter(STREAM_READ_SKIP_BYTES, value);
      return value;

    public FileDescriptor getFileDescriptor() throws IOException {
      return fis.getFD();

    public boolean hasCapability(String capability) {
      // a bit inefficient, but intended to make it easier to add
      // new capabilities.
      switch (capability.toLowerCase(Locale.ENGLISH)) {
      case StreamCapabilities.IOSTATISTICS:
      case StreamCapabilities.IOSTATISTICS_CONTEXT:
      case StreamCapabilities.VECTOREDIO:
        return true;
        return false;

    public IOStatistics getIOStatistics() {
      return ioStatistics;

    /**
     * Return the AsynchronousFileChannel for this file, opening it when the
     * field is still null.
     */
    AsynchronousFileChannel getAsyncChannel() throws IOException {
      // NOTE(review): asyncChannel is tested outside the synchronized block
      // and the field is not volatile; check whether the missing lines
      // re-test it under the lock.
      if (asyncChannel == null) {
        synchronized (this) {
          // NOTE(review): the argument list of open(...) is cut off here.
          asyncChannel = AsynchronousFileChannel.open(name.toPath(),
      return asyncChannel;

    /**
     * Start asynchronous reads for a set of ranges.
     * The ranges are sorted, each is given a fresh CompletableFuture, and one
     * channel read per range is issued with a shared AsyncHandler.
     * @param ranges ranges to read.
     * @param allocate supplies a buffer for a requested length.
     */
    public void readVectored(List<? extends FileRange> ranges,
                             IntFunction<ByteBuffer> allocate) throws IOException {

      List<? extends FileRange> sortedRanges = Arrays.asList(sortRanges(ranges));
      // Set up all of the futures, so that we can use them if things fail
      for(FileRange range: sortedRanges) {
        range.setData(new CompletableFuture<>());
      try {
        AsynchronousFileChannel channel = getAsyncChannel();
        ByteBuffer[] buffers = new ByteBuffer[sortedRanges.size()];
        AsyncHandler asyncHandler = new AsyncHandler(channel, sortedRanges, buffers);
        for(int i = 0; i < sortedRanges.size(); ++i) {
          FileRange range = sortedRanges.get(i);
          buffers[i] = allocate.apply(range.getLength());
          // the range index is the attachment, so the handler can find its
          // range and buffer again
          channel.read(buffers[i], range.getOffset(), i, asyncHandler);
      } catch (IOException ioe) {
        LOG.debug("Exception occurred during vectored read ", ioe);
        // NOTE(review): the body of this loop is not visible; presumably it
        // fails each range's future with ioe — confirm against upstream.
        for(FileRange range: sortedRanges) {

  /**
   * A CompletionHandler that implements readFully and translates back
   * into the form of CompletionHandler that our users expect.
   */
  // Shared handler for all ranges of one vectored read; the Integer
  // attachment is the index of the range/buffer being filled.
  static class AsyncHandler implements CompletionHandler<Integer, Integer> {
    private final AsynchronousFileChannel channel;
    private final List<? extends FileRange> ranges;
    private final ByteBuffer[] buffers;

    AsyncHandler(AsynchronousFileChannel channel,
                 List<? extends FileRange> ranges,
                 ByteBuffer[] buffers) {
      this.channel = channel;
      this.ranges = ranges;
      this.buffers = buffers;

    // Called when one channel read finishes: -1 is turned into an EOF
    // failure, a partially filled buffer triggers a follow-up read.
    public void completed(Integer result, Integer r) {
      FileRange range = ranges.get(r);
      ByteBuffer buffer = buffers[r];
      if (result == -1) {
        failed(new EOFException("Read past End of File"), r);
      } else {
        if (buffer.remaining() > 0) {
          // issue a read for the rest of the buffer
          // QQ: What if this fails? It has the same handler.
          channel.read(buffer, range.getOffset() + buffer.position(), r, this);
        } else {
          // QQ: Why  is this required? I think because we don't want the
          // user to read data beyond limit.
          // NOTE(review): the statements of this branch are not visible in
          // this copy.

    // NOTE(review): only the debug log is visible; any completion of the
    // range's future with exc is not shown here.
    public void failed(Throwable exc, Integer r) {
      LOG.debug("Failed while reading range {} ", r, exc);

  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
    return new FSDataInputStream(new BufferedFSInputStream(
        new LocalFSFileInputStream(f), bufferSize));

  public FSDataInputStream open(PathHandle fd, int bufferSize)
      throws IOException {
    if (!(fd instanceof LocalFileSystemPathHandle)) {
      fd = new LocalFileSystemPathHandle(fd.bytes());
    LocalFileSystemPathHandle id = (LocalFileSystemPathHandle) fd;
    id.verify(getFileStatus(new Path(id.getPath())));
    return new FSDataInputStream(new BufferedFSInputStream(
        new LocalFSFileInputStream(new Path(id.getPath())), bufferSize));

  /** For create()'s FSOutputStream. */
  final class LocalFSFileOutputStream extends OutputStream implements
      IOStatisticsSource, StreamCapabilities, Syncable {
    private FileOutputStream fos;

    /** Minimal set of counters. */
    private final IOStatisticsStore ioStatistics = iostatisticsStore()

    /** Thread level IOStatistics aggregator to update in close(). */
    private final IOStatisticsAggregator

    /**
     * Open the local file for writing, optionally with a permission.
     * @param f path to write.
     * @param append true to append instead of overwrite.
     * @param permission permission to apply, or null; when creating
     *        (not appending) a null permission is replaced by the file default.
     */
    private LocalFSFileOutputStream(Path f, boolean append,
        FsPermission permission) throws IOException {
      File file = pathToFile(f);
      // store the aggregator before attempting any IO.
      // NOTE(review): the right-hand side of this assignment is missing in
      // this copy.
      ioStatisticsAggregator =

      if (!append && permission == null) {
        permission = FsPermission.getFileDefault();
      if (permission == null) {
        this.fos = new FileOutputStream(file, append);
      } else {
        // the configured umask is applied before the permission is used
        permission = permission.applyUMask(FsPermission.getUMask(getConf()));
        if (Shell.WINDOWS && NativeIO.isAvailable()) {
          this.fos = NativeIO.Windows.createFileOutputStreamWithMode(file,
              append, permission.toShort());
        } else {
          this.fos = new FileOutputStream(file, append);
          boolean success = false;
          try {
            setPermission(f, permission);
            success = true;
          } finally {
            // close the just-opened stream if setting the permission failed
            if (!success) {
              IOUtils.cleanupWithLogger(LOG, this.fos);

    /** Close the fos; update the IOStatisticsContext. */
    // NOTE(review): both the try and the finally body are empty in this
    // copy; the statements that close the stream are missing.
    public void close() throws IOException {
      try {
      } finally {

    public void flush() throws IOException {
      // Delegates straight to the wrapped FileOutputStream.
      fos.flush();
    }
    public void write(byte[] b, int off, int len) throws IOException {
      try {
        fos.write(b, off, len);
        ioStatistics.incrementCounter(STREAM_WRITE_BYTES, len);
      } catch (IOException e) {                // unexpected exception
        throw new FSError(e);                  // assume native fs error
    // NOTE(review): the try body is empty in this copy; the single-byte
    // write itself is missing. Only the IOException-to-FSError wrapping
    // is visible.
    public void write(int b) throws IOException {
      try {
      } catch (IOException e) {              // unexpected exception
        throw new FSError(e);                // assume native fs error

    // NOTE(review): no body is visible for hflush() in this copy.
    public void hflush() throws IOException {

    /**
     * HSync calls sync on the file descriptor after a local flush() call.
     * @throws IOException failure
     */
    // NOTE(review): no body is visible for hsync() in this copy.
    public void hsync() throws IOException {

    public boolean hasCapability(String capability) {
      // a bit inefficient, but intended to make it easier to add
      // new capabilities.
      switch (capability.toLowerCase(Locale.ENGLISH)) {
      case StreamCapabilities.IOSTATISTICS:
      case StreamCapabilities.IOSTATISTICS_CONTEXT:
        return true;
        return StoreImplementationUtils.isProbeForSyncable(capability);

    public IOStatistics getIOStatistics() {
      return ioStatistics;

  /**
   * Open an existing file for appending.
   * @param f file to append to; a directory is rejected with IOException.
   * @param bufferSize size of the BufferedOutputStream.
   * @param progress not used in the visible code.
   */
  public FSDataOutputStream append(Path f, int bufferSize,
      Progressable progress) throws IOException {
    FileStatus status = getFileStatus(f);
    if (status.isDirectory()) {
      // NOTE(review): "diretory" in the message below is a typo for "directory".
      throw new IOException("Cannot append to a diretory (=" + f + " )");
    // NOTE(review): the argument list ends at "statistics," — the remaining
    // argument(s) are missing from this copy.
    return new FSDataOutputStream(new BufferedOutputStream(
        createOutputStreamWithMode(f, true, null), bufferSize), statistics,

  public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize,
    short replication, long blockSize, Progressable progress)
    throws IOException {
    return create(f, overwrite, true, bufferSize, replication, blockSize,
        progress, null);

  /**
   * Shared implementation behind the public create variants.
   * Rejects an existing file unless overwrite is set and fails if the parent
   * directory cannot be created.
   */
  private FSDataOutputStream create(Path f, boolean overwrite,
      boolean createParent, int bufferSize, short replication, long blockSize,
      Progressable progress, FsPermission permission) throws IOException {
    if (exists(f) && !overwrite) {
      throw new FileAlreadyExistsException("File already exists: " + f);
    Path parent = f.getParent();
    // NOTE(review): createParent is not consulted in the visible lines;
    // mkdirs(parent) is attempted whenever a parent exists — confirm.
    if (parent != null && !mkdirs(parent)) {
      throw new IOException("Mkdirs failed to create " + parent.toString());
    // NOTE(review): the constructor call ends at a trailing comma; the
    // remaining argument(s) are missing from this copy.
    return new FSDataOutputStream(new BufferedIOStatisticsOutputStream(
        createOutputStreamWithMode(f, false, permission), bufferSize, true),
  protected OutputStream createOutputStream(Path f, boolean append) 
      throws IOException {
    return createOutputStreamWithMode(f, append, null);

  protected OutputStream createOutputStreamWithMode(Path f, boolean append,
      FsPermission permission) throws IOException {
    return new LocalFSFileOutputStream(f, append, permission);
  /**
   * Create a file without creating parent directories (flag-based variant).
   * An existing file is rejected unless CreateFlag.OVERWRITE is present.
   */
  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
      EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize,
      Progressable progress) throws IOException {
    if (exists(f) && !flags.contains(CreateFlag.OVERWRITE)) {
      throw new FileAlreadyExistsException("File already exists: " + f);
    // NOTE(review): the constructor call ends at a trailing comma; the
    // remaining argument(s) are missing from this copy.
    return new FSDataOutputStream(new BufferedIOStatisticsOutputStream(
        createOutputStreamWithMode(f, false, permission), bufferSize, true),

  public FSDataOutputStream create(Path f, FsPermission permission,
    boolean overwrite, int bufferSize, short replication, long blockSize,
    Progressable progress) throws IOException {

    FSDataOutputStream out = create(f, overwrite, true, bufferSize, replication,
        blockSize, progress, permission);
    return out;

  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
      boolean overwrite,
      int bufferSize, short replication, long blockSize,
      Progressable progress) throws IOException {
    FSDataOutputStream out = create(f, overwrite, false, bufferSize, replication,
        blockSize, progress, permission);
    return out;

  public void concat(final Path trg, final Path [] psrcs) throws IOException {
    final int bufferSize = 4096;
    try(FSDataOutputStream out = create(trg)) {
      for (Path src : psrcs) {
        try(FSDataInputStream in = open(src)) {
          IOUtils.copyBytes(in, out, bufferSize, false);

  public boolean rename(Path src, Path dst) throws IOException {
    // Attempt rename using Java API.
    File srcFile = pathToFile(src);
    File dstFile = pathToFile(dst);
    if (srcFile.renameTo(dstFile)) {
      return true;

    // Else try POSIX style rename on Windows only
    if (Shell.WINDOWS &&
        handleEmptyDstDirectoryOnWindows(src, srcFile, dst, dstFile)) {
      return true;

    // The fallback behavior accomplishes the rename by a full copy.
    if (LOG.isDebugEnabled()) {
      LOG.debug("Falling through to a copy of " + src + " to " + dst);
    return FileUtil.copy(this, src, this, dst, true, getConf());

  public final boolean handleEmptyDstDirectoryOnWindows(Path src, File srcFile,
      Path dst, File dstFile) throws IOException {

    // Enforce POSIX rename behavior that a source directory replaces an
    // existing destination if the destination is an empty directory. On most
    // platforms, this is already handled by the Java API call above. Some
    // platforms (notably Windows) do not provide this behavior, so the Java API
    // call renameTo(dstFile) fails. Delete destination and attempt rename
    // again.
    try {
      FileStatus sdst = this.getFileStatus(dst);
      String[] dstFileList = dstFile.list();
      if (dstFileList != null) {
        if (sdst.isDirectory() && dstFileList.length == 0) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Deleting empty destination and renaming " + src +
                " to " + dst);
          if (this.delete(dst, false) && srcFile.renameTo(dstFile)) {
            return true;
    } catch (FileNotFoundException ignored) {
    return false;

  /**
   * Truncate a file to {@code newLength}.
   * Directories are rejected with IOException; a length larger than the
   * current one is rejected with IllegalArgumentException.
   * @return true (the only return visible in this copy).
   */
  public boolean truncate(Path f, final long newLength) throws IOException {
    FileStatus status = getFileStatus(f);
    if(status == null) {
      throw new FileNotFoundException("File " + f + " not found");
    if(status.isDirectory()) {
      throw new IOException("Cannot truncate a directory (=" + f + ")");
    long oldLength = status.getLen();
    if(newLength > oldLength) {
      throw new IllegalArgumentException(
          "Cannot truncate to a larger file size. Current size: " + oldLength +
          ", truncate size: " + newLength + ".");
    try (FileOutputStream out = new FileOutputStream(pathToFile(f), true)) {
      // NOTE(review): the inner try body is empty in this copy; the call
      // that actually shortens the file is missing.
      try {
      } catch(IOException e) {
        throw new FSError(e);
    return true;
  /**
   * Delete the given path to a file or directory.
   * @param p the path to delete
   * @param recursive to delete sub-directories
   * @return true if the file or directory and all its contents were deleted
   * @throws IOException if p is non-empty and recursive is false
   */
  public boolean delete(Path p, boolean recursive) throws IOException {
    File f = pathToFile(p);
    if (!f.exists()) {
      //no path, return false "nothing to delete"
      return false;
    if (f.isFile()) {
      return f.delete();
    } else if (!recursive && f.isDirectory() && 
        (FileUtil.listFiles(f).length != 0)) {
      throw new IOException("Directory " + f.toString() + " is not empty");
    return FileUtil.fullyDelete(f);
  /**
   * {@inheritDoc}
   * (<b>Note</b>: Returned list is not sorted in any given order,
   * due to reliance on Java's {@link File#list()} API.)
   */
  public FileStatus[] listStatus(Path f) throws IOException {
    File localf = pathToFile(f);
    FileStatus[] results;

    if (!localf.exists()) {
      throw new FileNotFoundException("File " + f + " does not exist");

    if (localf.isDirectory()) {
      String[] names = FileUtil.list(localf);
      results = new FileStatus[names.length];
      // j counts the entries actually stored in results
      int j = 0;
      for (int i = 0; i < names.length; i++) {
        try {
          // Assemble the path using the Path 3 arg constructor to make sure
          // paths with colon are properly resolved on Linux
          // NOTE(review): the expression below is cut off, and no increment
          // of j is visible in this copy.
          results[j] = getFileStatus(new Path(f, new Path(null, null,
        } catch (FileNotFoundException e) {
          // ignore the files not found since the dir list may have
          // changed since the names[] list was generated.
      // every name resolved: return the array as is, otherwise trim it to j
      if (j == names.length) {
        return results;
      return Arrays.copyOf(results, j);

    // plain file: a single-element array
    if (!useDeprecatedFileStatus) {
      return new FileStatus[] { getFileStatus(f) };
    return new FileStatus[] {
        new DeprecatedRawLocalFileStatus(localf,
        defaultBlockSize, this) };

  public boolean exists(Path f) throws IOException {
    return pathToFile(f).exists();
  protected boolean mkOneDir(File p2f) throws IOException {
    return mkOneDirWithMode(new Path(p2f.getAbsolutePath()), p2f, null);

  /**
   * Create a single directory and apply a permission to it.
   * @param p directory as a Path (used for setPermission).
   * @param p2f directory as a File (used for creation).
   * @param permission requested permission; null selects the directory
   *        default. The configured umask is applied in either case.
   * @return true if the directory was created.
   */
  protected boolean mkOneDirWithMode(Path p, File p2f, FsPermission permission)
      throws IOException {
    if (permission == null) {
      permission = FsPermission.getDirDefault();
    permission = permission.applyUMask(FsPermission.getUMask(getConf()));
    if (Shell.WINDOWS && NativeIO.isAvailable()) {
      try {
        NativeIO.Windows.createDirectoryWithMode(p2f, permission.toShort());
        return true;
      } catch (IOException e) {
        if (LOG.isDebugEnabled()) {
          // NOTE(review): the head of the logging call that takes these
          // arguments is missing in this copy.
              "NativeIO.createDirectoryWithMode error, path = %s, mode = %o",
              p2f, permission.toShort()), e);
        return false;
    } else {
      boolean b = p2f.mkdir();
      // the permission is only set when mkdir actually created the directory
      if (b) {
        setPermission(p, permission);
      return b;

  /**
   * Creates the specified directory hierarchy. Does not
   * treat existence as an error.
   */
  public boolean mkdirs(Path f) throws IOException {
    return mkdirsWithOptionalPermission(f, null);

  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
    return mkdirsWithOptionalPermission(f, permission);

  private boolean mkdirsWithOptionalPermission(Path f, FsPermission permission)
      throws IOException {
    if(f == null) {
      throw new IllegalArgumentException("mkdirs path arg is null");
    Path parent = f.getParent();
    File p2f = pathToFile(f);
    File parent2f = null;
    if(parent != null) {
      parent2f = pathToFile(parent);
      if(parent2f != null && parent2f.exists() && !parent2f.isDirectory()) {
        throw new ParentNotDirectoryException("Parent path is not a directory: "
            + parent);
    if (p2f.exists() && !p2f.isDirectory()) {
      throw new FileAlreadyExistsException("Destination exists" +
              " and is not a directory: " + p2f.getCanonicalPath());
    return (parent == null || parent2f.exists() || mkdirs(parent)) &&
      (mkOneDirWithMode(f, p2f, permission) || p2f.isDirectory());
  public Path getHomeDirectory() {
    return this.makeQualified(new Path(System.getProperty("user.home")));

  /** Set the working directory to the given directory. */
  public void setWorkingDirectory(Path newDir) {
    workingDir = makeAbsolute(newDir);
  public Path getWorkingDirectory() {
    return workingDir;
  protected Path getInitialWorkingDirectory() {
    return this.makeQualified(new Path(System.getProperty("user.dir")));

  /**
   * Report space figures for the partition holding {@code p}.
   * @param p path to inspect; null is treated as "/".
   */
  public FsStatus getStatus(Path p) throws IOException {
    File partition = pathToFile(p == null ? new Path("/") : p);
    //File provides getUsableSpace() and getFreeSpace()
    //File provides no API to obtain used space, assume used = total - free
    // NOTE(review): the argument list ends at a trailing comma; the last
    // argument is missing from this copy.
    return new FsStatus(partition.getTotalSpace(), 
      partition.getTotalSpace() - partition.getFreeSpace(),
  // In the case of the local filesystem, we can just rename the file.
  public void moveFromLocalFile(Path src, Path dst) throws IOException {
    rename(src, dst);
  // We can write output directly to the final location
  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
    throws IOException {
    return fsOutputFile;
  // It's in the right place - nothing to do.
  public void completeLocalOutput(Path fsWorkingFile, Path tmpLocalFile)
    throws IOException {
  // NOTE(review): no body is visible for close() in this copy; check
  // whether a statement (e.g. a superclass call) was lost.
  public void close() throws IOException {
  public String toString() {
    return "LocalFS";
  public FileStatus getFileStatus(Path f) throws IOException {
    return getFileLinkStatusInternal(f, true);

  private FileStatus deprecatedGetFileStatus(Path f) throws IOException {
    File path = pathToFile(f);
    if (path.exists()) {
      return new DeprecatedRawLocalFileStatus(pathToFile(f),
          defaultBlockSize, this);
    } else {
      throw new FileNotFoundException("File " + f + " does not exist");

  static class DeprecatedRawLocalFileStatus extends FileStatus {
    /* We can add extra fields here. It breaks at least CopyFiles.FilePair().
     * We recognize if the information is already loaded by checking if
     * owner.equals("").
     */
    private boolean isPermissionLoaded() {
      return !super.getOwner().isEmpty(); 

    /**
     * Read the last access time of a file through java.nio.file.Files.
     * @throws FileNotFoundException when the file no longer exists
     *         (translated from NoSuchFileException).
     */
    private static long getLastAccessTime(File f) throws IOException {
      long accessTime;
      try {
        // NOTE(review): the readAttributes(...) call is cut off after its
        // first argument in this copy.
        accessTime = Files.readAttributes(f.toPath(),
      } catch (NoSuchFileException e) {
        throw new FileNotFoundException("File " + f + " does not exist");
      return accessTime;

    // Builds the status from java.io.File attributes; the three nulls leave
    // permission, owner and group to be loaded later.
    // NOTE(review): the makeQualified(...) argument list is cut off in this
    // copy.
    DeprecatedRawLocalFileStatus(File f, long defaultBlockSize, FileSystem fs)
      throws IOException {
      super(f.length(), f.isDirectory(), 1, defaultBlockSize,
          f.lastModified(), getLastAccessTime(f),
          null, null, null,
          new Path(f.getPath()).makeQualified(fs.getUri(),
    public FsPermission getPermission() {
      // NOTE(review): the body of this "if" is missing in this copy;
      // presumably it triggers loadPermissionInfo() — confirm.
      if (!isPermissionLoaded()) {
      return super.getPermission();

    public String getOwner() {
      // NOTE(review): the body of this "if" is missing in this copy;
      // presumably it triggers loadPermissionInfo() — confirm.
      if (!isPermissionLoaded()) {
      return super.getOwner();

    public String getGroup() {
      // NOTE(review): the body of this "if" is missing in this copy;
      // presumably it triggers loadPermissionInfo() — confirm.
      if (!isPermissionLoaded()) {
      return super.getGroup();

    /**
     * Load file permission information (UNIX symbol rwxrwxrwx, sticky bit info).
     * To improve performance, give priority to native stat() call. First try get
     * permission information by using native JNI call then fall back to use non
     * native (ProcessBuilder) call in case native lib is not loaded or native
     * call is not successful
     */
    private synchronized void loadPermissionInfo() {
      if (!isPermissionLoaded() && NativeIO.isAvailable()) {
        // NOTE(review): the try body (the native load) is missing in this
        // copy; a failure there is only logged at debug level.
        try {
        } catch (IOException ex) {
          LOG.debug("Native call failed", ex);

      // NOTE(review): the body of this "if" (the non-native fallback) is
      // missing in this copy.
      if (!isPermissionLoaded()) {

    /// loads permissions, owner, and group from `ls -ld`
    // NOTE(review): several lines of this method are missing in this copy
    // (command arguments, the substring bound, the setters, the else branch
    // and the exception message tail); the comments below mark each gap.
    void loadPermissionInfoByNonNativeIO() {
      IOException e = null;
      try {
        // gap: remaining execCommand arguments
        String output = FileUtil.execCommand(new File(getPath().toUri()),
        StringTokenizer t =
            new StringTokenizer(output, Shell.TOKEN_SEPARATOR_REGEX);
        //expected format
        //-rw-------    1 username groupname ...
        String permission = t.nextToken();
        if (permission.length() > FsPermission.MAX_PERMISSION_LENGTH) {
          //files with ACLs might have a '+'
          // gap: end index of the substring
          permission = permission.substring(0,

        String owner = t.nextToken();
        String group = t.nextToken();
        // If on windows domain, token format is DOMAIN\\user and we want to
        // extract only the user name
        // same as to the group name
        if (Shell.WINDOWS) {
          owner = removeDomain(owner);
          group = removeDomain(group);
        // gap: nothing visible stores permission/owner/group
      } catch (Shell.ExitCodeException ioe) {
        // only exit codes other than 1 are recorded as errors here
        if (ioe.getExitCode() != 1) {
          e = ioe;
        } else {
          // gap: handling for exit code 1
      } catch (IOException ioe) {
        e = ioe;
      } finally {
        if (e != null) {
          // gap: tail of the message expression
          throw new RuntimeException("Error while running command to get " +
                                     "file permissions : " + 

    // In Windows, domain name is added.
    // For example, given machine name (domain name) dname, user name i, then
    // the result for user is dname\\i and for group is dname\\None. So we need
    // remove domain name as follows:
    // DOMAIN\\user => user, DOMAIN\\group => group
    private String removeDomain(String str) {
      int index = str.indexOf("\\");
      if (index != -1) {
        str = str.substring(index + 1);
      return str;

    // loads permissions, owner, and group from `ls -ld`
    // but use JNI to more efficiently get file mode (permission, owner, group)
    // by calling file stat() in *nix or some similar calls in Windows
    void loadPermissionInfoByNativeIO() throws IOException {
      Path path = getPath();
      String pathName = path.toUri().getPath();
      // remove leading slash for Windows path
      if (Shell.WINDOWS && pathName.startsWith("/")) {
        pathName = pathName.substring(1);
      try {
        NativeIO.POSIX.Stat stat = NativeIO.POSIX.getStat(pathName);
        // NOTE(review): owner and group are read but never stored in the
        // visible lines; the statements that use them look lost.
        String owner = stat.getOwner();
        String group = stat.getGroup();
        int mode = stat.getMode();
        setPermission(new FsPermission(mode));
      } catch (IOException e) {
        // NOTE(review): only a rethrow is visible here; check whether other
        // statements of this handler were lost.
        throw e;

    // NOTE(review): both the "if" body and the rest of the method are
    // missing in this copy.
    public void write(DataOutput out) throws IOException {
      if (!isPermissionLoaded()) {

  /** Use the command chown to set owner. */
  public void setOwner(Path p, String username, String groupname)
    throws IOException {
    FileUtil.setOwner(pathToFile(p), username, groupname);

  /** Use the command chmod to set permission. */
  public void setPermission(Path p, FsPermission permission)
    throws IOException {
    // NOTE(review): the native branch is empty in this copy; the call that
    // sets the permission through NativeIO is missing.
    if (NativeIO.isAvailable()) {
    } else {
      // shell fallback: the permission is rendered as a 4-digit octal string
      String perm = String.format("%04o", permission.toShort());
      Shell.execCommand(Shell.getSetPermissionCommand(perm, false,
        FileUtil.makeShellPath(pathToFile(p), true)));
  /**
   * Sets the {@link Path}'s last modified time and last access time to
   * the given valid times.
   * @param mtime the modification time to set (only if no less than zero).
   * @param atime the access time to set (only if no less than zero).
   * @throws IOException if setting the times fails.
   */
  public void setTimes(Path p, long mtime, long atime) throws IOException {
    try {
      BasicFileAttributeView view = Files.getFileAttributeView(
          pathToFile(p).toPath(), BasicFileAttributeView.class);
      FileTime fmtime = (mtime >= 0) ? FileTime.fromMillis(mtime) : null;
      FileTime fatime = (atime >= 0) ? FileTime.fromMillis(atime) : null;
      view.setTimes(fmtime, fatime, null);
    } catch (NoSuchFileException e) {
      throw new FileNotFoundException("File " + p + " does not exist");

  /**
   * Hook to implement support for {@link PathHandle} operations.
   * @param stat Referent in the target FileSystem
   * @param opts Constraints that determine the validity of the
   *            {@link PathHandle} reference.
   */
  protected PathHandle createPathHandle(FileStatus stat,
      Options.HandleOpt... opts) {
    if (stat.isDirectory() || stat.isSymlink()) {
      throw new IllegalArgumentException("PathHandle only available for files");
    String authority = stat.getPath().toUri().getAuthority();
    // NOTE(review): the authority is compared with the literal "file://";
    // confirm this is the intended value.
    if (authority != null && !authority.equals("file://")) {
      throw new IllegalArgumentException("Wrong FileSystem: " + stat.getPath());
    // NOTE(review): the two getOpt(...) statements below have no terminator
    // in this copy; a trailing call on each looks lost.
    Options.HandleOpt.Data data =
        Options.HandleOpt.getOpt(Options.HandleOpt.Data.class, opts)
    Options.HandleOpt.Location loc =
        Options.HandleOpt.getOpt(Options.HandleOpt.Location.class, opts)
    if (loc.allowChange()) {
      throw new UnsupportedOperationException("Tracking file movement in " +
          "basic FileSystem is not supported");
    final Path p = stat.getPath();
    // the modification time is recorded only when data changes are not allowed
    final Optional<Long> mtime = !data.allowChange()
        ? Optional.of(stat.getModificationTime())
        : Optional.empty();
    return new LocalFileSystemPathHandle(p.toString(), mtime);

  public boolean supportsSymlinks() {
    return true;

  /**
   * Creates the symbolic link {@code link} pointing at {@code target}.
   *
   * NOTE(review): this copy is missing the body of the {@code createParent}
   * branch, the second argument of the {@code FileUtil.symLink} call, and
   * several closing braces; restore them from the upstream file.
   *
   * @param target the path the link refers to; its scheme must be absent or
   *          {@code file}.
   * @param link the path of the link to create.
   * @param createParent guards a branch whose body is missing here; presumably
   *          it creates the parent directory of {@code link} -- TODO confirm.
   * @throws UnsupportedOperationException if symlinks are not enabled.
   * @throws IOException if {@code target} has a non-{@code file} scheme or if
   *           {@code FileUtil.symLink} returns a non-zero result.
   */
  public void createSymlink(Path target, Path link, boolean createParent)
      throws IOException {
    if (!FileSystem.areSymlinksEnabled()) {
      throw new UnsupportedOperationException("Symlinks not supported");
    // A target without a scheme is accepted; any scheme other than "file" is not.
    final String targetScheme = target.toUri().getScheme();
    if (targetScheme != null && !"file".equals(targetScheme)) {
      throw new IOException("Unable to create symlink to non-local file "+
                            "system: "+target.toString());
    // NOTE(review): the body of this branch is missing in this copy.
    if (createParent) {

    // NB: Use createSymbolicLink in java.nio.file.Path once available
    // NOTE(review): the call below is cut off after its first argument.
    int result = FileUtil.symLink(target.toString(),
    // A non-zero result from FileUtil.symLink is reported as an IOException.
    if (result != 0) {
      throw new IOException("Error " + result + " creating symlink " +
          link + " to " + target);

  /**
   * Return a FileStatus representing the given path. If the path refers
   * to a symlink return a FileStatus representing the link rather than
   * the object the link refers to.
   */
  public FileStatus getFileLinkStatus(final Path f) throws IOException {
    FileStatus fi = getFileLinkStatusInternal(f, false);
    // getFileLinkStatus is supposed to return a symlink with a
    // qualified path
    if (fi.isSymlink()) {
      Path targetQual = FSLinkResolver.qualifySymlinkTarget(this.getUri(),
          fi.getPath(), fi.getSymlink());
    return fi;

  /**
   * Public {@link FileStatus} methods delegate to this function, which in turn
   * either calls the new {@link Stat} based implementation or the deprecated
   * methods based on platform support.
   *
   * @param f Path to stat
   * @param dereference whether to dereference the final path component if a
   *          symlink
   * @return FileStatus of f
   * @throws IOException if the underlying status lookup fails
   */
  private FileStatus getFileLinkStatusInternal(final Path f,
      boolean dereference) throws IOException {
    if (!useDeprecatedFileStatus) {
      return getNativeFileLinkStatus(f, dereference);
    } else if (dereference) {
      return deprecatedGetFileStatus(f);
    } else {
      return deprecatedGetFileLinkStatusInternal(f);

  /**
   * Deprecated. Remains for legacy support. Should be removed when {@link Stat}
   * gains support for Windows and other operating systems.
   */
  private FileStatus deprecatedGetFileLinkStatusInternal(final Path f)
      throws IOException {
    // NOTE(review): this copy is missing several closing braces, most of the
    // arguments of the first FileStatus constructor call, and the terminator
    // of the block comment inside the catch clause; restore them from the
    // upstream file before compiling.

    // Read the link target up front; the checks below treat an empty string
    // as "f is not a symlink".
    String target = FileUtil.readLink(new File(f.toString()));

    try {
      FileStatus fs = getFileStatus(f);
      // If f refers to a regular file or directory
      if (target.isEmpty()) {
        return fs;
      // Otherwise f refers to a symlink
      return new FileStatus(fs.getLen(),
          new Path(target),
    } catch (FileNotFoundException e) {
      /* The exists method in the File class returns false for dangling
       * links so we can get a FileNotFoundException for links that exist.
       * It's also possible that we raced with a delete of the link. Use
       * the readBasicFileAttributes method in java.nio.file.attributes
       * when available.
      if (!target.isEmpty()) {
        return new FileStatus(0, false, 0, 0, 0, 0, FsPermission.getDefault(),
            "", "", new Path(target), f);
      // f refers to a file or directory that does not exist
      throw e;
  /**
   * Calls out to platform's native stat(1) implementation to get file metadata
   * (permissions, user, group, atime, mtime, etc). This works around the lack
   * of lstat(2) in Java 6.
   *
   * Currently, the {@link Stat} class used to do this only supports Linux
   * and FreeBSD, so the old {@link #deprecatedGetFileLinkStatusInternal(Path)}
   * implementation (deprecated) remains until further OS support is added.
   *
   * @param f File to stat
   * @param dereference whether to dereference symlinks
   * @return FileStatus of f
   * @throws IOException if the stat call fails
   */
  private FileStatus getNativeFileLinkStatus(final Path f,
      boolean dereference) throws IOException {
    Stat stat = new Stat(f, defaultBlockSize, dereference, this);
    FileStatus status = stat.getFileStatus();
    return status;

  public Path getLinkTarget(Path f) throws IOException {
    FileStatus fi = getFileLinkStatusInternal(f, false);
    // return an unqualified symlink target
    return fi.getSymlink();

  public boolean hasPathCapability(final Path path, final String capability)
      throws IOException {
    switch (validatePathCapabilityArgs(makeQualified(path), capability)) {
    case CommonPathCapabilities.FS_APPEND:
    case CommonPathCapabilities.FS_CONCAT:
    case CommonPathCapabilities.FS_PATHHANDLES:
    case CommonPathCapabilities.FS_PERMISSIONS:
    case CommonPathCapabilities.FS_TRUNCATE:
      return true;
    case CommonPathCapabilities.FS_SYMLINKS:
      return FileSystem.areSymlinksEnabled();
      return super.hasPathCapability(path, capability);

  static void setUseDeprecatedFileStatus(boolean useDeprecatedFileStatus) {
    RawLocalFileSystem.useDeprecatedFileStatus = useDeprecatedFileStatus;


hadoop 源码目录


hadoop Abortable 源码

hadoop AbstractFileSystem 源码

hadoop AvroFSInput 源码

hadoop BBPartHandle 源码

hadoop BBUploadHandle 源码

hadoop BatchListingOperations 源码

hadoop BatchedRemoteIterator 源码

hadoop BlockLocation 源码

hadoop BlockStoragePolicySpi 源码

hadoop BufferedFSInputStream 源码

0  赞