hadoop FileIoProvider 源码

  • 2022-10-20
  • 浏览 (210)

haddop FileIoProvider 代码

文件路径:/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hdfs.server.datanode;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.HardLink;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.io.nativeio.NativeIOException;
import org.apache.hadoop.net.SocketOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.File;
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.Flushable;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.file.CopyOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

import static org.apache.hadoop.hdfs.server.datanode.FileIoProvider.OPERATION.*;

/**
 * This class abstracts out various file IO operations performed by the
 * DataNode and invokes profiling (for collecting stats) and fault injection
 * (for testing) event hooks before and after each file IO.
 *
 * Behavior can be injected into these events by enabling the
 * profiling and/or fault injection event hooks through
 * {@link DFSConfigKeys#DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY}
 * and {@link DFSConfigKeys#DFS_DATANODE_ENABLE_FILEIO_FAULT_INJECTION_KEY}.
 * These event hooks are disabled by default.
 *
 * Most functions accept an optional {@link FsVolumeSpi} parameter for
 * instrumentation/logging.
 *
 * Some methods may look redundant, especially the multiple variations of
 * move/rename/list. They exist to retain behavior compatibility for existing
 * code.
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class FileIoProvider {
  public static final Logger LOG = LoggerFactory.getLogger(
      FileIoProvider.class);

  private final ProfilingFileIoEvents profilingEventHook;
  private final FaultInjectorFileIoEvents faultInjectorEventHook;
  private final DataNode datanode;

  private static final int LEN_INT = 4;

  /**
   * @param conf  Configuration object. May be null. When null,
   *              the event handlers are no-ops.
   * @param datanode datanode that owns this FileIoProvider. Used for
   *               IO error based volume checker callback
   */
  public FileIoProvider(@Nullable Configuration conf,
                        final DataNode datanode) {
    profilingEventHook = new ProfilingFileIoEvents(conf);
    faultInjectorEventHook = new FaultInjectorFileIoEvents(conf);
    this.datanode = datanode;
  }

  /**
   * Lists the types of file system operations. Passed to the
   * IO hooks so implementations can choose behavior based on
   * specific operations.
   */
  public enum OPERATION {
    OPEN,
    EXISTS,
    LIST,
    DELETE,
    MOVE,
    MKDIRS,
    TRANSFER,
    SYNC,
    FADVISE,
    READ,
    WRITE,
    FLUSH,
    NATIVE_COPY
  }

  /**
   * See {@link Flushable#flush()}.
   *
   * @param  volume target volume. null if unavailable.
   * @throws IOException
   */
  public void flush(
      @Nullable FsVolumeSpi volume, Flushable f) throws IOException {
    final long begin = profilingEventHook.beforeFileIo(volume, FLUSH, 0);
    try {
      faultInjectorEventHook.beforeFileIo(volume, FLUSH, 0);
      f.flush();
      profilingEventHook.afterFileIo(volume, FLUSH, begin, 0);
    } catch (Exception e) {
      onFailure(volume, begin);
      throw e;
    }
  }

  /**
   * Sync the given {@link FileOutputStream}.
   *
   * @param  volume target volume. null if unavailable.
   * @throws IOException
   */
  public void sync(
      @Nullable FsVolumeSpi volume, FileOutputStream fos) throws IOException {
    final long begin = profilingEventHook.beforeFileIo(volume, SYNC, 0);
    try {
      faultInjectorEventHook.beforeFileIo(volume, SYNC, 0);
      IOUtils.fsync(fos.getChannel(), false);
      profilingEventHook.afterFileIo(volume, SYNC, begin, 0);
    } catch (Exception e) {
      onFailure(volume, begin);
      throw e;
    }
  }

  /**
   * Sync the given directory changes to durable device.
   * @throws IOException
   */
  public void dirSync(@Nullable FsVolumeSpi volume, File dir)
      throws IOException {
    final long begin = profilingEventHook.beforeFileIo(volume, SYNC, 0);
    try {
      faultInjectorEventHook.beforeFileIo(volume, SYNC, 0);
      IOUtils.fsync(dir);
      profilingEventHook.afterFileIo(volume, SYNC, begin, 0);
    } catch (Exception e) {
      onFailure(volume, begin);
      throw e;
    }
  }

  /**
   * Call sync_file_range on the given file descriptor.
   *
   * @param  volume target volume. null if unavailable.
   */
  public void syncFileRange(
      @Nullable FsVolumeSpi volume, FileDescriptor outFd,
      long offset, long numBytes, int flags) throws NativeIOException {
    final long begin = profilingEventHook.beforeFileIo(volume, SYNC, 0);
    try {
      faultInjectorEventHook.beforeFileIo(volume, SYNC, 0);
      NativeIO.POSIX.syncFileRangeIfPossible(outFd, offset, numBytes, flags);
      profilingEventHook.afterFileIo(volume, SYNC, begin, 0);
    } catch (Exception e) {
      onFailure(volume, begin);
      throw e;
    }
  }

  /**
   * Call posix_fadvise on the given file descriptor.
   *
   * @param  volume target volume. null if unavailable.
   */
  public void posixFadvise(
      @Nullable FsVolumeSpi volume, String identifier, FileDescriptor outFd,
      long offset, long length, int flags) throws NativeIOException {
    final long begin = profilingEventHook.beforeMetadataOp(volume, FADVISE);
    try {
      faultInjectorEventHook.beforeMetadataOp(volume, FADVISE);
      NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
          identifier, outFd, offset, length, flags);
      profilingEventHook.afterMetadataOp(volume, FADVISE, begin);
    } catch (Exception e) {
      onFailure(volume, begin);
      throw e;
    }
  }

  /**
   * Delete a file.
   * @param volume  target volume. null if unavailable.
   * @param f  File to delete.
   * @return  true if the file was successfully deleted.
   */
  public boolean delete(@Nullable FsVolumeSpi volume, File f) {
    final long begin = profilingEventHook.beforeMetadataOp(volume, DELETE);
    try {
      faultInjectorEventHook.beforeMetadataOp(volume, DELETE);
      boolean deleted = f.delete();
      profilingEventHook.afterMetadataOp(volume, DELETE, begin);
      return deleted;
    } catch (Exception e) {
      onFailure(volume, begin);
      throw e;
    }
  }

  /**
   * Delete a file, first checking to see if it exists.
   * @param volume  target volume. null if unavailable.
   * @param f  File to delete
   * @return  true if the file was successfully deleted or if it never
   *          existed.
   */
  public boolean deleteWithExistsCheck(@Nullable FsVolumeSpi volume, File f) {
    final long begin = profilingEventHook.beforeMetadataOp(volume, DELETE);
    try {
      faultInjectorEventHook.beforeMetadataOp(volume, DELETE);
      boolean deleted = !f.exists() || f.delete();
      profilingEventHook.afterMetadataOp(volume, DELETE, begin);
      if (!deleted) {
        LOG.warn("Failed to delete file {}", f);
      }
      return deleted;
    } catch (Exception e) {
      onFailure(volume, begin);
      throw e;
    }
  }

  /**
   * Transfer data from a FileChannel to a SocketOutputStream.
   *
   * @param volume  target volume. null if unavailable.
   * @param sockOut  SocketOutputStream to write the data.
   * @param fileCh  FileChannel from which to read data.
   * @param position  position within the channel where the transfer begins.
   * @param count  number of bytes to transfer.
   * @param waitTime  returns the nanoseconds spent waiting for the socket
   *                  to become writable.
   * @param transferTime  returns the nanoseconds spent transferring data.
   * @throws IOException
   */
  public void transferToSocketFully(
      @Nullable FsVolumeSpi volume, SocketOutputStream sockOut,
      FileChannel fileCh, long position, int count,
      LongWritable waitTime, LongWritable transferTime) throws IOException {
    final long begin = profilingEventHook.beforeFileIo(volume, TRANSFER, count);
    try {
      faultInjectorEventHook.beforeFileIo(volume, TRANSFER, count);
      sockOut.transferToFully(fileCh, position, count,
          waitTime, transferTime);
      profilingEventHook.afterFileIo(volume, TRANSFER, begin, count);
    } catch (Exception e) {
      String em = e.getMessage();
      if (em != null) {
        if (!em.startsWith("Broken pipe")
            && !em.startsWith("Connection reset")) {
          onFailure(volume, begin);
        }
      } else {
        onFailure(volume, begin);
      }
      throw e;
    }
  }

  /**
   * Create a file.
   * @param volume  target volume. null if unavailable.
   * @param f  File to be created.
   * @return  true if the file does not exist and was successfully created.
   *          false if the file already exists.
   * @throws IOException
   */
  public boolean createFile(
      @Nullable FsVolumeSpi volume, File f) throws IOException {
    final long begin = profilingEventHook.beforeMetadataOp(volume, OPEN);
    try {
      faultInjectorEventHook.beforeMetadataOp(volume, OPEN);
      boolean created = f.createNewFile();
      profilingEventHook.afterMetadataOp(volume, OPEN, begin);
      return created;
    } catch (Exception e) {
      onFailure(volume, begin);
      throw e;
    }
  }

  /**
   * Create a FileInputStream using
   * {@link FileInputStream#FileInputStream(File)}.
   *
   * Wraps the created input stream to intercept read calls
   * before delegating to the wrapped stream.
   *
   * @param volume  target volume. null if unavailable.
   * @param f  File object.
   * @return  FileInputStream to the given file.
   * @throws  FileNotFoundException
   */
  public FileInputStream getFileInputStream(
      @Nullable FsVolumeSpi volume, File f) throws FileNotFoundException {
    final long begin = profilingEventHook.beforeMetadataOp(volume, OPEN);
    FileInputStream fis = null;
    try {
      faultInjectorEventHook.beforeMetadataOp(volume, OPEN);
      fis = new WrappedFileInputStream(volume, f);
      profilingEventHook.afterMetadataOp(volume, OPEN, begin);
      return fis;
    } catch(Exception e) {
      IOUtils.closeStream(fis);
      onFailure(volume, begin);
      throw e;
    }
  }

  /**
   * Create a FileOutputStream using
   * {@link FileOutputStream#FileOutputStream(File, boolean)}.
   *
   * Wraps the created output stream to intercept write calls
   * before delegating to the wrapped stream.
   *
   * @param volume  target volume. null if unavailable.
   * @param f  File object.
   * @param append  if true, then bytes will be written to the end of the
   *                file rather than the beginning.
   * @return  FileOutputStream to the given file object.
   * @throws FileNotFoundException
   */
  public FileOutputStream getFileOutputStream(
      @Nullable FsVolumeSpi volume, File f,
      boolean append) throws FileNotFoundException {
    final long begin = profilingEventHook.beforeMetadataOp(volume, OPEN);
    FileOutputStream fos = null;
    try {
      faultInjectorEventHook.beforeMetadataOp(volume, OPEN);
      fos = new WrappedFileOutputStream(volume, f, append);
      profilingEventHook.afterMetadataOp(volume, OPEN, begin);
      return fos;
    } catch(Exception e) {
      IOUtils.closeStream(fos);
      onFailure(volume, begin);
      throw e;
    }
  }

  /**
   * Create a FileOutputStream using
   * {@link FileOutputStream#FileOutputStream(File, boolean)}.
   *
   * Wraps the created output stream to intercept write calls
   * before delegating to the wrapped stream.
   *
   * @param volume  target volume. null if unavailable.
   * @param f  File object.
   * @return  FileOutputStream to the given file object.
   * @throws  FileNotFoundException
   */
  public FileOutputStream getFileOutputStream(
      @Nullable FsVolumeSpi volume, File f) throws FileNotFoundException {
    return getFileOutputStream(volume, f, false);
  }

  /**
   * Create a FileOutputStream using
   * {@link FileOutputStream#FileOutputStream(FileDescriptor)}.
   *
   * Wraps the created output stream to intercept write calls
   * before delegating to the wrapped stream.
   *
   * @param volume  target volume. null if unavailable.
   * @param fd  File descriptor object.
   * @return  FileOutputStream to the given file object.
   */
  public FileOutputStream getFileOutputStream(
      @Nullable FsVolumeSpi volume, FileDescriptor fd) {
    return new WrappedFileOutputStream(volume, fd);
  }

  /**
   * Create a FileInputStream using
   * {@link NativeIO#getShareDeleteFileDescriptor}.
   * Wraps the created input stream to intercept input calls
   * before delegating to the wrapped stream.
   *
   * @param volume  target volume. null if unavailable.
   * @param f  File object.
   * @param offset  the offset position, measured in bytes from the
   *                beginning of the file, at which to set the file
   *                pointer.
   * @return FileOutputStream to the given file object.
   * @throws FileNotFoundException
   */
  public FileInputStream getShareDeleteFileInputStream(
      @Nullable FsVolumeSpi volume, File f,
      long offset) throws IOException {
    final long begin = profilingEventHook.beforeMetadataOp(volume, OPEN);
    FileInputStream fis = null;
    try {
      faultInjectorEventHook.beforeMetadataOp(volume, OPEN);
      fis = new WrappedFileInputStream(volume,
          NativeIO.getShareDeleteFileDescriptor(f, offset));
      profilingEventHook.afterMetadataOp(volume, OPEN, begin);
      return fis;
    } catch(Exception e) {
      IOUtils.closeStream(fis);
      onFailure(volume, begin);
      throw e;
    }
  }

  /**
   * Create a FileInputStream using
   * {@link FileInputStream#FileInputStream(File)} and position
   * it at the given offset.
   *
   * Wraps the created input stream to intercept read calls
   * before delegating to the wrapped stream.
   *
   * @param volume  target volume. null if unavailable.
   * @param f  File object.
   * @param offset  the offset position, measured in bytes from the
   *                beginning of the file, at which to set the file
   *                pointer.
   * @throws FileNotFoundException
   */
  public FileInputStream openAndSeek(
      @Nullable FsVolumeSpi volume, File f, long offset) throws IOException {
    final long begin = profilingEventHook.beforeMetadataOp(volume, OPEN);
    FileInputStream fis = null;
    try {
      faultInjectorEventHook.beforeMetadataOp(volume, OPEN);
      fis = new WrappedFileInputStream(volume,
          FsDatasetUtil.openAndSeek(f, offset));
      profilingEventHook.afterMetadataOp(volume, OPEN, begin);
      return fis;
    } catch(Exception e) {
      IOUtils.closeStream(fis);
      onFailure(volume, begin);
      throw e;
    }
  }

  /**
   * Create a RandomAccessFile using
   * {@link RandomAccessFile#RandomAccessFile(File, String)}.
   *
   * Wraps the created input stream to intercept IO calls
   * before delegating to the wrapped RandomAccessFile.
   *
   * @param volume  target volume. null if unavailable.
   * @param f  File object.
   * @param mode  See {@link RandomAccessFile} for a description
   *              of the mode string.
   * @return RandomAccessFile representing the given file.
   * @throws FileNotFoundException
   */
  public RandomAccessFile getRandomAccessFile(
      @Nullable FsVolumeSpi volume, File f,
      String mode) throws FileNotFoundException {
    final long begin = profilingEventHook.beforeMetadataOp(volume, OPEN);
    RandomAccessFile raf = null;
    try {
      faultInjectorEventHook.beforeMetadataOp(volume, OPEN);
      raf = new WrappedRandomAccessFile(volume, f, mode);
      profilingEventHook.afterMetadataOp(volume, OPEN, begin);
      return raf;
    } catch(Exception e) {
      IOUtils.closeStream(raf);
      onFailure(volume, begin);
      throw e;
    }
  }

  /**
   * Delete the given directory using {@link FileUtil#fullyDelete(File)}.
   *
   * @param volume  target volume. null if unavailable.
   * @param dir  directory to be deleted.
   * @return true on success false on failure.
   */
  public boolean fullyDelete(@Nullable FsVolumeSpi volume, File dir) {
    final long begin = profilingEventHook.beforeMetadataOp(volume, DELETE);
    try {
      faultInjectorEventHook.beforeMetadataOp(volume, DELETE);
      boolean deleted = FileUtil.fullyDelete(dir);
      LOG.trace("Deletion of dir {} {}", dir, deleted ? "succeeded" : "failed");
      profilingEventHook.afterMetadataOp(volume, DELETE, begin);
      return deleted;
    } catch(Exception e) {
      onFailure(volume, begin);
      throw e;
    }
  }

  /**
   * Move the src file to the target using
   * {@link FileUtil#replaceFile(File, File)}.
   *
   * @param volume  target volume. null if unavailable.
   * @param src  source path.
   * @param target  target path.
   * @throws IOException
   */
  public void replaceFile(
      @Nullable FsVolumeSpi volume, File src, File target) throws IOException {
    final long begin = profilingEventHook.beforeMetadataOp(volume, MOVE);
    try {
      faultInjectorEventHook.beforeMetadataOp(volume, MOVE);
      FileUtil.replaceFile(src, target);
      profilingEventHook.afterMetadataOp(volume, MOVE, begin);
    } catch(Exception e) {
      onFailure(volume, begin);
      throw e;
    }
  }

  /**
   * Move the src file to the target using
   * {@link Storage#rename(File, File)}.
   *
   * @param volume  target volume. null if unavailable.
   * @param src  source path.
   * @param target  target path.
   * @throws IOException
   */
  public void rename(
      @Nullable FsVolumeSpi volume, File src, File target)
      throws IOException {
    final long begin = profilingEventHook.beforeMetadataOp(volume, MOVE);
    try {
      faultInjectorEventHook.beforeMetadataOp(volume, MOVE);
      Storage.rename(src, target);
      profilingEventHook.afterMetadataOp(volume, MOVE, begin);
    } catch(Exception e) {
      onFailure(volume, begin);
      throw e;
    }
  }

  /**
   * Move the src file to the target using
   * {@link FileUtils#moveFile(File, File)}.
   *
   * @param volume  target volume. null if unavailable.
   * @param src  source path.
   * @param target  target path.
   * @throws IOException
   */
  public void moveFile(
      @Nullable FsVolumeSpi volume, File src, File target)
      throws IOException {
    final long begin = profilingEventHook.beforeMetadataOp(volume, MOVE);
    try {
      faultInjectorEventHook.beforeMetadataOp(volume, MOVE);
      FileUtils.moveFile(src, target);
      profilingEventHook.afterMetadataOp(volume, MOVE, begin);
    } catch(Exception e) {
      onFailure(volume, begin);
      throw e;
    }
  }

  /**
   * Move the src file to the target using
   * {@link Files#move(Path, Path, CopyOption...)}.
   *
   * @param volume  target volume. null if unavailable.
   * @param src  source path.
   * @param target  target path.
   * @param options  See {@link Files#move} for a description
   *                of the options.
   * @throws IOException
   */
  public void move(
      @Nullable FsVolumeSpi volume, Path src, Path target,
      CopyOption... options) throws IOException {
    final long begin = profilingEventHook.beforeMetadataOp(volume, MOVE);
    try {
      faultInjectorEventHook.beforeMetadataOp(volume, MOVE);
      Files.move(src, target, options);
      profilingEventHook.afterMetadataOp(volume, MOVE, begin);
    } catch(Exception e) {
      onFailure(volume, begin);
      throw e;
    }
  }

  /**
   * See {@link Storage#nativeCopyFileUnbuffered(File, File, boolean)}.
   *
   * @param volume  target volume. null if unavailable.
   * @param src  an existing file to copy, must not be {@code null}
   * @param target  the new file, must not be {@code null}
   * @param preserveFileDate  true if the file date of the copy
   *                         should be the same as the original
   * @throws IOException
   */
  public void nativeCopyFileUnbuffered(
      @Nullable FsVolumeSpi volume, File src, File target,
      boolean preserveFileDate) throws IOException {
    final long length = src.length();
    final long begin = profilingEventHook.beforeFileIo(volume, NATIVE_COPY,
        length);
    try {
      faultInjectorEventHook.beforeFileIo(volume, NATIVE_COPY, length);
      Storage.nativeCopyFileUnbuffered(src, target, preserveFileDate);
      profilingEventHook.afterFileIo(volume, NATIVE_COPY, begin, length);
    } catch(Exception e) {
      onFailure(volume, begin);
      throw e;
    }
  }

  /**
   * See {@link File#mkdirs()}.
   *
   * @param volume target volume. null if unavailable.
   * @param dir  directory to be created.
   * @return  true only if the directory was created. false if
   *          the directory already exists.
   * @throws IOException if a directory with the given name does
   *                     not exist and could not be created.
   */
  public boolean mkdirs(
      @Nullable FsVolumeSpi volume, File dir) throws IOException {
    final long begin = profilingEventHook.beforeMetadataOp(volume, MKDIRS);
    boolean created = false;
    boolean isDirectory;
    try {
      faultInjectorEventHook.beforeMetadataOp(volume, MKDIRS);
      created = dir.mkdirs();
      isDirectory = !created && dir.isDirectory();
      profilingEventHook.afterMetadataOp(volume, MKDIRS, begin);
    } catch(Exception e) {
      onFailure(volume, begin);
      throw e;
    }

    if (!created && !isDirectory) {
      throw new IOException("Mkdirs failed to create " + dir);
    }
    return created;
  }

  /**
   * Create the target directory using {@link File#mkdirs()} only if
   * it doesn't exist already.
   *
   * @param volume  target volume. null if unavailable.
   * @param dir  directory to be created.
   * @throws IOException  if the directory could not created
   */
  public void mkdirsWithExistsCheck(
      @Nullable FsVolumeSpi volume, File dir) throws IOException {
    final long begin = profilingEventHook.beforeMetadataOp(volume, MKDIRS);
    boolean succeeded = false;
    try {
      faultInjectorEventHook.beforeMetadataOp(volume, MKDIRS);
      succeeded = dir.isDirectory() || dir.mkdirs();
      profilingEventHook.afterMetadataOp(volume, MKDIRS, begin);
    } catch(Exception e) {
      onFailure(volume, begin);
      throw e;
    }

    if (!succeeded) {
      throw new IOException("Mkdirs failed to create " + dir);
    }
  }

  /**
   * Get a listing of the given directory using
   * {@link FileUtil#listFiles(File)}.
   *
   * @param volume  target volume. null if unavailable.
   * @param dir  Directory to be listed.
   * @return  array of file objects representing the directory entries.
   * @throws IOException
   */
  public File[] listFiles(
      @Nullable FsVolumeSpi volume, File dir) throws IOException {
    final long begin = profilingEventHook.beforeMetadataOp(volume, LIST);
    try {
      faultInjectorEventHook.beforeMetadataOp(volume, LIST);
      File[] children = FileUtil.listFiles(dir);
      profilingEventHook.afterMetadataOp(volume, LIST, begin);
      return children;
    } catch(Exception e) {
      onFailure(volume, begin);
      throw e;
    }
  }

  /**
   * Get a listing of the given directory using
   * {@link FileUtil#listFiles(File)}.
   *
   * @param volume  target volume. null if unavailable.
   * @param   dir directory to be listed.
   * @return  array of strings representing the directory entries.
   * @throws IOException
   */
  public String[] list(
      @Nullable FsVolumeSpi volume, File dir) throws IOException {
    final long begin = profilingEventHook.beforeMetadataOp(volume, LIST);
    try {
      faultInjectorEventHook.beforeMetadataOp(volume, LIST);
      String[] children = FileUtil.list(dir);
      profilingEventHook.afterMetadataOp(volume, LIST, begin);
      return children;
    } catch(Exception e) {
      onFailure(volume, begin);
      throw e;
    }
  }

  /**
   * Get a listing of the given directory using
   * {@link IOUtils#listDirectory(File, FilenameFilter)}.
   *
   * @param volume target volume. null if unavailable.
   * @param dir Directory to list.
   * @param filter {@link FilenameFilter} to filter the directory entries.
   * @throws IOException
   */
  public List<String> listDirectory(
      @Nullable FsVolumeSpi volume, File dir,
      FilenameFilter filter) throws IOException {
    final long begin = profilingEventHook.beforeMetadataOp(volume, LIST);
    try {
      faultInjectorEventHook.beforeMetadataOp(volume, LIST);
      List<String> children = IOUtils.listDirectory(dir, filter);
      profilingEventHook.afterMetadataOp(volume, LIST, begin);
      return children;
    } catch(Exception e) {
      onFailure(volume, begin);
      throw e;
    }
  }

  /**
   * Retrieves the number of links to the specified file.
   *
   * @param volume target volume. null if unavailable.
   * @param f file whose link count is being queried.
   * @return number of hard-links to the given file, including the
   *         given path itself.
   * @throws IOException
   */
  public int getHardLinkCount(
      @Nullable FsVolumeSpi volume, File f) throws IOException {
    final long begin = profilingEventHook.beforeMetadataOp(volume, LIST);
    try {
      faultInjectorEventHook.beforeMetadataOp(volume, LIST);
      int count = HardLink.getLinkCount(f);
      profilingEventHook.afterMetadataOp(volume, LIST, begin);
      return count;
    } catch(Exception e) {
      onFailure(volume, begin);
      throw e;
    }
  }

  /**
   * Check for file existence using {@link File#exists()}.
   *
   * @param volume target volume. null if unavailable.
   * @param f file object.
   * @return true if the file exists.
   */
  public boolean exists(@Nullable FsVolumeSpi volume, File f) {
    final long begin = profilingEventHook.beforeMetadataOp(volume, EXISTS);
    try {
      faultInjectorEventHook.beforeMetadataOp(volume, EXISTS);
      boolean exists = f.exists();
      profilingEventHook.afterMetadataOp(volume, EXISTS, begin);
      return exists;
    } catch(Exception e) {
      onFailure(volume, begin);
      throw e;
    }
  }

  /**
   * A thin wrapper over {@link FileInputStream} that allows
   * instrumenting disk IO.
   */
  private final class WrappedFileInputStream extends FileInputStream {
    private @Nullable final FsVolumeSpi volume;

    /**
     * {@inheritDoc}.
     */
    private WrappedFileInputStream(@Nullable FsVolumeSpi volume, File f)
        throws FileNotFoundException {
      super(f);
      this.volume = volume;
    }

    /**
     * {@inheritDoc}.
     */
    private WrappedFileInputStream(
        @Nullable FsVolumeSpi volume, FileDescriptor fd) {
      super(fd);
      this.volume = volume;
    }

    /**
     * {@inheritDoc}.
     */
    @Override
    public int read() throws IOException {
      final long begin = profilingEventHook.beforeFileIo(volume, READ, LEN_INT);
      try {
        faultInjectorEventHook.beforeFileIo(volume, READ, LEN_INT);
        int b = super.read();
        profilingEventHook.afterFileIo(volume, READ, begin, LEN_INT);
        return b;
      } catch(Exception e) {
        onFailure(volume, begin);
        throw e;
      }
    }

    /**
     * {@inheritDoc}.
     */
    @Override
    public int read(@Nonnull byte[] b) throws IOException {
      final long begin = profilingEventHook.beforeFileIo(volume, READ, b
          .length);
      try {
        faultInjectorEventHook.beforeFileIo(volume, READ, b.length);
        int numBytesRead = super.read(b);
        profilingEventHook.afterFileIo(volume, READ, begin, numBytesRead);
        return numBytesRead;
      } catch(Exception e) {
        onFailure(volume, begin);
        throw e;
      }
    }

    /**
     * {@inheritDoc}.
     */
    @Override
    public int read(@Nonnull byte[] b, int off, int len) throws IOException {
      final long begin = profilingEventHook.beforeFileIo(volume, READ, len);
      try {
        faultInjectorEventHook.beforeFileIo(volume, READ, len);
        int numBytesRead = super.read(b, off, len);
        profilingEventHook.afterFileIo(volume, READ, begin, numBytesRead);
        return numBytesRead;
      } catch(Exception e) {
        onFailure(volume, begin);
        throw e;
      }
    }
  }

  /**
   * A thin wrapper over {@link FileOutputStream} that allows
   * instrumenting disk IO.
   */
  private final class WrappedFileOutputStream extends FileOutputStream {
    private @Nullable final FsVolumeSpi volume;

    /**
     * {@inheritDoc}.
     */
    private WrappedFileOutputStream(
        @Nullable FsVolumeSpi volume, File f,
        boolean append) throws FileNotFoundException {
      super(f, append);
      this.volume = volume;
    }

    /**
     * {@inheritDoc}.
     */
    private WrappedFileOutputStream(
        @Nullable FsVolumeSpi volume, FileDescriptor fd) {
      super(fd);
      this.volume = volume;
    }

    /**
     * {@inheritDoc}.
     */
    @Override
    public void write(int b) throws IOException {
      final long begin = profilingEventHook.beforeFileIo(volume, WRITE,
          LEN_INT);
      try {
        faultInjectorEventHook.beforeFileIo(volume, WRITE, LEN_INT);
        super.write(b);
        profilingEventHook.afterFileIo(volume, WRITE, begin, LEN_INT);
      } catch(Exception e) {
        onFailure(volume, begin);
        throw e;
      }
    }

    /**
     * {@inheritDoc}.
     */
    @Override
    public void write(@Nonnull byte[] b) throws IOException {
      final long begin = profilingEventHook.beforeFileIo(volume, WRITE, b
          .length);
      try {
        faultInjectorEventHook.beforeFileIo(volume, WRITE, b.length);
        super.write(b);
        profilingEventHook.afterFileIo(volume, WRITE, begin, b.length);
      } catch(Exception e) {
        onFailure(volume, begin);
        throw e;
      }
    }

    /**
     * {@inheritDoc}.
     */
    @Override
    public void write(@Nonnull byte[] b, int off, int len) throws IOException {
      final long begin = profilingEventHook.beforeFileIo(volume, WRITE, len);
      try {
        faultInjectorEventHook.beforeFileIo(volume, WRITE, len);
        super.write(b, off, len);
        profilingEventHook.afterFileIo(volume, WRITE, begin, len);
      } catch(Exception e) {
        onFailure(volume, begin);
        throw e;
      }
    }
  }

  /**
   * A thin wrapper over {@link FileInputStream} that allows
   * instrumenting IO.
   */
  private final class WrappedRandomAccessFile extends RandomAccessFile {
    private @Nullable final FsVolumeSpi volume;

    public WrappedRandomAccessFile(
        @Nullable FsVolumeSpi volume, File f, String mode)
        throws FileNotFoundException {
      super(f, mode);
      this.volume = volume;
    }

    @Override
    public int read() throws IOException {
      final long begin = profilingEventHook.beforeFileIo(volume, READ, LEN_INT);
      try {
        faultInjectorEventHook.beforeFileIo(volume, READ, LEN_INT);
        int b = super.read();
        profilingEventHook.afterFileIo(volume, READ, begin, LEN_INT);
        return b;
      } catch(Exception e) {
        onFailure(volume, begin);
        throw e;
      }
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
      final long begin = profilingEventHook.beforeFileIo(volume, READ, len);
      try {
        faultInjectorEventHook.beforeFileIo(volume, READ, len);
        int numBytesRead = super.read(b, off, len);
        profilingEventHook.afterFileIo(volume, READ, begin, numBytesRead);
        return numBytesRead;
      } catch(Exception e) {
        onFailure(volume, begin);
        throw e;
      }
    }

    @Override
    public int read(byte[] b) throws IOException {
      final long begin = profilingEventHook.beforeFileIo(volume, READ, b
          .length);
      try {
        faultInjectorEventHook.beforeFileIo(volume, READ, b.length);
        int numBytesRead = super.read(b);
        profilingEventHook.afterFileIo(volume, READ, begin, numBytesRead);
        return numBytesRead;
      } catch(Exception e) {
        onFailure(volume, begin);
        throw e;
      }
    }

    @Override
    public void write(int b) throws IOException {
      final long begin = profilingEventHook.beforeFileIo(volume, WRITE,
          LEN_INT);
      try {
        faultInjectorEventHook.beforeFileIo(volume, WRITE, LEN_INT);
        super.write(b);
        profilingEventHook.afterFileIo(volume, WRITE, begin, LEN_INT);
      } catch(Exception e) {
        onFailure(volume, begin);
        throw e;
      }
    }

    @Override
    public void write(@Nonnull byte[] b) throws IOException {
      final long begin = profilingEventHook.beforeFileIo(volume, WRITE, b
          .length);
      try {
        faultInjectorEventHook.beforeFileIo(volume, WRITE, b.length);
        super.write(b);
        profilingEventHook.afterFileIo(volume, WRITE, begin, b.length);
      } catch(Exception e) {
        onFailure(volume, begin);
        throw e;
      }
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
      final long begin = profilingEventHook.beforeFileIo(volume, WRITE, len);
      try {
        faultInjectorEventHook.beforeFileIo(volume, WRITE, len);
        super.write(b, off, len);
        profilingEventHook.afterFileIo(volume, WRITE, begin, len);
      } catch(Exception e) {
        onFailure(volume, begin);
        throw e;
      }
    }
  }

  private void onFailure(@Nullable FsVolumeSpi volume, long begin) {
    if (datanode != null && volume != null) {
      datanode.checkDiskErrorAsync(volume);
    }
    profilingEventHook.onFailure(volume, begin);
  }

  public ProfilingFileIoEvents getProfilingEventHook() {
    return profilingEventHook;
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop BPOfferService 源码

hadoop BPServiceActor 源码

hadoop BPServiceActorAction 源码

hadoop BPServiceActorActionException 源码

hadoop BlockChecksumHelper 源码

hadoop BlockPoolManager 源码

hadoop BlockPoolSliceStorage 源码

hadoop BlockReceiver 源码

hadoop BlockRecoveryWorker 源码

hadoop BlockScanner 源码

0  赞