hadoop LocalReplica source code

  • 2022-10-20

hadoop LocalReplica code

File path: /hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.datanode;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.DataChecksum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.VisibleForTesting;

/**
 * This class is used for all replicas which are on local storage media
 * and hence, are backed by files.
 */
abstract public class LocalReplica extends ReplicaInfo {

  /**
   * Base directory containing numerically-identified sub directories and
   * possibly blocks.
   */
  private File baseDir;

  /**
   * Whether or not this replica's parent directory includes subdirs, in which
   * case we can generate them based on the replica's block ID
   */
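  // With the hashed layout a replica lives two levels below baseDir,
  // e.g. <baseDir>/subdir17/subdir25/blk_...; the two subdir numbers are
  // derived from bits of the block ID by DatanodeUtil.idToBlockDir.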
  private boolean hasSubdirs;

  /**
   * Interned base-directory File objects, shared across replicas so that
   * a DataNode holding many replicas does not keep a distinct File (and
   * its backing path string) for every block.
   */
  private static final Map<String, File> internedBaseDirs = new HashMap<String, File>();

  static final Logger LOG = LoggerFactory.getLogger(LocalReplica.class);

  /**
   * Constructor
   * @param block a block
   * @param vol volume where replica is located
   * @param dir directory path where block and meta files are located
   */
  LocalReplica(Block block, FsVolumeSpi vol, File dir) {
    this(block.getBlockId(), block.getNumBytes(),
        block.getGenerationStamp(), vol, dir);
  }

  /**
   * Constructor
   * @param blockId block id
   * @param len replica length
   * @param genStamp replica generation stamp
   * @param vol volume where replica is located
   * @param dir directory path where block and meta files are located
   */
  LocalReplica(long blockId, long len, long genStamp,
      FsVolumeSpi vol, File dir) {
    super(vol, blockId, len, genStamp);
    setDirInternal(dir);
  }

  /**
   * Copy constructor.
   * @param from the source replica
   */
  LocalReplica(LocalReplica from) {
    this(from, from.getVolume(), from.getDir());
  }

  /**
   * Get the full path of this replica's data file.
   * @return the full path of this replica's data file
   */
  @VisibleForTesting
  public File getBlockFile() {
    return new File(getDir(), getBlockName());
  }

  /**
   * Get the full path of this replica's meta file.
   * @return the full path of this replica's meta file
   */
  @VisibleForTesting
  public File getMetaFile() {
    return new File(getDir(),
        DatanodeUtil.getMetaName(getBlockName(), getGenerationStamp()));
  }

  /**
   * Return the parent directory path where this replica is located.
   * @return the parent directory path where this replica is located
   */
  protected File getDir() {
    return hasSubdirs ? DatanodeUtil.idToBlockDir(baseDir,
        getBlockId()) : baseDir;
  }

  /**
   * Set the parent directory where this replica is located.
   * @param dir the parent directory where the replica is located
   */
  private void setDirInternal(File dir) {
    if (dir == null) {
      baseDir = null;
      return;
    }

    ReplicaDirInfo dirInfo = parseBaseDir(dir, getBlockId());
    this.hasSubdirs = dirInfo.hasSubidrs;

    synchronized (internedBaseDirs) {
      if (!internedBaseDirs.containsKey(dirInfo.baseDirPath)) {
        // Create a new String path of this file and make a brand new File object
        // to guarantee we drop the reference to the underlying char[] storage.
        File baseDir = new File(dirInfo.baseDirPath);
        internedBaseDirs.put(dirInfo.baseDirPath, baseDir);
      }
      this.baseDir = internedBaseDirs.get(dirInfo.baseDirPath);
    }
  }

  @VisibleForTesting
  public static class ReplicaDirInfo {
    public String baseDirPath;
    public boolean hasSubidrs;

    public ReplicaDirInfo (String baseDirPath, boolean hasSubidrs) {
      this.baseDirPath = baseDirPath;
      this.hasSubidrs = hasSubidrs;
    }
  }

  @VisibleForTesting
  public static ReplicaDirInfo parseBaseDir(File dir, long blockId) {
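    // Worked example: for dir = <finalized>/subdir17/subdir25 and a block
    // ID that hashes to those subdirs, the loop below climbs up to
    // <finalized>; idToBlockDir(<finalized>, blockId) then reproduces dir,
    // so the base directory is <finalized> and hasSubdirs is true.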
    File currentDir = dir;
    boolean hasSubdirs = false;
    while (currentDir.getName().startsWith(DataStorage.BLOCK_SUBDIR_PREFIX)) {
      hasSubdirs = true;
      currentDir = currentDir.getParentFile();
    }
    if (hasSubdirs) {
      // set baseDir to currentDir if it matches id(idToBlockDir).
      File idToBlockDir = DatanodeUtil.idToBlockDir(currentDir, blockId);
      if (idToBlockDir.equals(dir)) {
        return new ReplicaDirInfo(currentDir.getAbsolutePath(), true);
      }
    }
    return new ReplicaDirInfo(dir.getAbsolutePath(), false);
  }

  /**
   * Copy specified file into a temporary file. Then rename the
   * temporary file to the original name. This will cause any
   * hardlinks to the original file to be removed. The temporary
   * files are created in the same directory. The temporary files will
   * be recovered (especially on Windows) on datanode restart.
   */
  private void breakHardlinks(File file, Block b) throws IOException {
    final FileIoProvider fileIoProvider = getFileIoProvider();
    final File tmpFile = DatanodeUtil.createFileWithExistsCheck(
        getVolume(), b, DatanodeUtil.getUnlinkTmpFile(file), fileIoProvider);
    try {
      try (FileInputStream in = fileIoProvider.getFileInputStream(
          getVolume(), file)) {
        try (FileOutputStream out = fileIoProvider.getFileOutputStream(
            getVolume(), tmpFile)) {
          IOUtils.copyBytes(in, out, 16 * 1024);
        }
      }
      if (file.length() != tmpFile.length()) {
        throw new IOException("Copy of file " + file + " size " + file.length()
            + " into file " + tmpFile + " resulted in a size of "
            + tmpFile.length());
      }
      fileIoProvider.replaceFile(getVolume(), tmpFile, file);
    } catch (IOException e) {
      if (!fileIoProvider.delete(getVolume(), tmpFile)) {
        DataNode.LOG.info("detachFile failed to delete temporary file " +
                          tmpFile);
      }
      throw e;
    }
  }

  /**
   * This function "breaks hardlinks" to the current replica file.
   *
   * When doing a DataNode upgrade, we create a bunch of hardlinks to each block
   * file.  This cleverly ensures that both the old and the new storage
   * directories can contain the same block file, without using additional space
   * for the data.
   *
   * However, when we want to append to the replica file, we need to "break" the
   * hardlink to ensure that the old snapshot continues to contain the old data
   * length.  If we failed to do that, we could roll back to the previous/
   * directory during a downgrade, and find that the block contents were longer
   * than they were at the time of upgrade.
   *
   * @return true only if data was copied.
   * @throws IOException
   */
  public boolean breakHardLinksIfNeeded() throws IOException {
    final File file = getBlockFile();
    final FileIoProvider fileIoProvider = getFileIoProvider();
    if (file == null || getVolume() == null) {
      throw new IOException("detachBlock:Block not found. " + this);
    }
    File meta = getMetaFile();

    int linkCount = fileIoProvider.getHardLinkCount(getVolume(), file);
    if (linkCount > 1) {
      DataNode.LOG.info("Breaking hardlink for " + linkCount + "x-linked " +
          "block " + this);
      breakHardlinks(file, this);
    }
    if (fileIoProvider.getHardLinkCount(getVolume(), meta) > 1) {
      breakHardlinks(meta, this);
    }
    return true;
  }

  @Override
  public URI getBlockURI() {
    return getBlockFile().toURI();
  }

  @Override
  public InputStream getDataInputStream(long seekOffset) throws IOException {
    return getDataInputStream(getBlockFile(), seekOffset);
  }

  @Override
  public OutputStream getDataOutputStream(boolean append) throws IOException {
    return getFileIoProvider().getFileOutputStream(
        getVolume(), getBlockFile(), append);
  }

  @Override
  public boolean blockDataExists() {
    return getFileIoProvider().exists(getVolume(), getBlockFile());
  }

  @Override
  public boolean deleteBlockData() {
    return getFileIoProvider().fullyDelete(getVolume(), getBlockFile());
  }

  @Override
  public long getBlockDataLength() {
    return getBlockFile().length();
  }

  @Override
  public URI getMetadataURI() {
    return getMetaFile().toURI();
  }

  @Override
  public LengthInputStream getMetadataInputStream(long offset)
      throws IOException {
    final File meta = getMetaFile();
    return new LengthInputStream(
        getFileIoProvider().openAndSeek(getVolume(), meta, offset),
        meta.length());
  }

  @Override
  public OutputStream getMetadataOutputStream(boolean append)
      throws IOException {
    return new FileOutputStream(getMetaFile(), append);
  }

  @Override
  public boolean metadataExists() {
    return getFileIoProvider().exists(getVolume(), getMetaFile());
  }

  @Override
  public boolean deleteMetadata() {
    return getFileIoProvider().fullyDelete(getVolume(), getMetaFile());
  }

  @Override
  public long getMetadataLength() {
    return getMetaFile().length();
  }

  @Override
  public boolean renameMeta(URI destURI) throws IOException {
    return renameFile(getMetaFile(), new File(destURI));
  }

  @Override
  public boolean renameData(URI destURI) throws IOException {
    return renameFile(getBlockFile(), new File(destURI));
  }

  private boolean renameFile(File srcfile, File destfile) throws IOException {
    try {
      getFileIoProvider().rename(getVolume(), srcfile, destfile);
      return true;
    } catch (IOException e) {
      throw new IOException("Failed to move block file for " + this
          + " from " + srcfile + " to " + destfile.getAbsolutePath(), e);
    }
  }

  @Override
  public void updateWithReplica(StorageLocation replicaLocation) {
    // for local replicas, the replica location is assumed to be a file.
    File diskFile = null;
    try {
      diskFile = new File(replicaLocation.getUri());
    } catch (IllegalArgumentException e) {
      diskFile = null;
    }

    if (null == diskFile) {
      setDirInternal(null);
    } else {
      setDirInternal(diskFile.getParentFile());
    }
  }

  @Override
  public boolean getPinning(LocalFileSystem localFS) throws IOException {
    return getPinning(localFS, new Path(getBlockFile().getAbsolutePath()));
  }

  @Override
  public void setPinning(LocalFileSystem localFS) throws IOException {
    File f = getBlockFile();
    Path p = new Path(f.getAbsolutePath());
    setPinning(localFS, p);
  }

  @Override
  public void bumpReplicaGS(long newGS) throws IOException {
    long oldGS = getGenerationStamp();
    final File oldmeta = getMetaFile();
    setGenerationStamp(newGS);
    final File newmeta = getMetaFile();

    // rename meta file to new GS
    if (LOG.isDebugEnabled()) {
      LOG.debug("Renaming " + oldmeta + " to " + newmeta);
    }
    try {
      // calling renameMeta on the ReplicaInfo doesn't work here
      getFileIoProvider().rename(getVolume(), oldmeta, newmeta);
    } catch (IOException e) {
      setGenerationStamp(oldGS); // restore old GS
      throw new IOException("Block " + this + " reopen failed. " +
                            " Unable to move meta file  " + oldmeta +
                            " to " + newmeta, e);
    }
  }

  @Override
  public void truncateBlock(long newLength) throws IOException {
    truncateBlock(getVolume(), getBlockFile(), getMetaFile(),
        getNumBytes(), newLength, getFileIoProvider());
  }

  @Override
  public int compareWith(ScanInfo info) {
    return info.getBlockFile().compareTo(getBlockFile());
  }

  @Override
  public void copyMetadata(URI destination) throws IOException {
    //for local replicas, we assume the destination URI is file
    getFileIoProvider().nativeCopyFileUnbuffered(
        getVolume(), getMetaFile(), new File(destination), true);
  }

  @Override
  public void copyBlockdata(URI destination) throws IOException {
    //for local replicas, we assume the destination URI is file
    getFileIoProvider().nativeCopyFileUnbuffered(
        getVolume(), getBlockFile(), new File(destination), true);
  }

  /**
   * Get input stream for a local file and optionally seek to the offset.
   * @param f path to the file
   * @param seekOffset offset to seek
   * @return input stream for read
   * @throws IOException
   */
  private FileInputStream getDataInputStream(File f, long seekOffset)
      throws IOException {
    FileInputStream fis;
    final FileIoProvider fileIoProvider = getFileIoProvider();
    if (NativeIO.isAvailable()) {
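      // A share-delete open lets the block file be deleted or renamed
      // while this reader still holds it open; this mainly matters on
      // Windows, where an ordinary open would block the deletion.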
      fis = fileIoProvider.getShareDeleteFileInputStream(
          getVolume(), f, seekOffset);
    } else {
      try {
        fis = fileIoProvider.openAndSeek(getVolume(), f, seekOffset);
      } catch (FileNotFoundException fnfe) {
        throw new IOException("Expected block file at " + f +
            " does not exist.");
      }
    }
    return fis;
  }

  /**
   *  Get pin status of a file by checking the sticky bit.
   * @param localFS local file system
   * @param path path to be checked
   * @return true if the file is pinned with sticky bit
   * @throws IOException
   */
  public boolean getPinning(LocalFileSystem localFS, Path path) throws
      IOException {
    boolean stickyBit =
        localFS.getFileStatus(path).getPermission().getStickyBit();
    return stickyBit;
  }

  /**
   * Set sticky bit on path to pin file.
   * @param localFS local file system
   * @param path path to be pinned with sticky bit
   * @throws IOException
   */
  public void setPinning(LocalFileSystem localFS, Path path) throws
      IOException {
    FsPermission oldPermission = localFS.getFileStatus(path).getPermission();
    FsPermission permission = new FsPermission(oldPermission.getUserAction(),
        oldPermission.getGroupAction(), oldPermission.getOtherAction(), true);
    localFS.setPermission(path, permission);
  }

  public static void truncateBlock(
      FsVolumeSpi volume, File blockFile, File metaFile,
      long oldlen, long newlen, FileIoProvider fileIoProvider)
      throws IOException {
    LOG.info("truncateBlock: blockFile=" + blockFile
        + ", metaFile=" + metaFile
        + ", oldlen=" + oldlen
        + ", newlen=" + newlen);

    if (newlen == oldlen) {
      return;
    }
    if (newlen > oldlen) {
      throw new IOException("Cannot truncate block to from oldlen (=" + oldlen
          + ") to newlen (=" + newlen + ")");
    }

    // fis is closed by BlockMetadataHeader.readHeader.
    final FileInputStream fis = fileIoProvider.getFileInputStream(
        volume, metaFile);
    DataChecksum dcs = BlockMetadataHeader.readHeader(fis).getChecksum();
    int checksumsize = dcs.getChecksumSize();
    int bpc = dcs.getBytesPerChecksum();
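    // n is the number of checksum chunks left after truncation, i.e.
    // ceil(newlen / bpc) in integer arithmetic. For example, with
    // bpc = 512 and newlen = 1300: n = 3, lastchunkoffset = 1024 and
    // lastchunksize = 276; the checksum of that final partial chunk is
    // recomputed below and re-written at the end of the meta file.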
    long n = (newlen - 1)/bpc + 1;
    long newmetalen = BlockMetadataHeader.getHeaderSize() + n*checksumsize;
    long lastchunkoffset = (n - 1)*bpc;
    int lastchunksize = (int)(newlen - lastchunkoffset);
    byte[] b = new byte[Math.max(lastchunksize, checksumsize)];

    try (RandomAccessFile blockRAF = fileIoProvider.getRandomAccessFile(
        volume, blockFile, "rw")) {
      //truncate blockFile
      blockRAF.setLength(newlen);

      //read last chunk
      blockRAF.seek(lastchunkoffset);
      blockRAF.readFully(b, 0, lastchunksize);
    }

    //compute checksum
    dcs.update(b, 0, lastchunksize);
    dcs.writeValue(b, 0, false);

    //update metaFile
    try (RandomAccessFile metaRAF = fileIoProvider.getRandomAccessFile(
        volume, metaFile, "rw")) {
      metaRAF.setLength(newmetalen);
      metaRAF.seek(newmetalen - checksumsize);
      metaRAF.write(b, 0, checksumsize);
    }
  }

  /**
   * Sync the parent directory changes to durable device.
   * @throws IOException
   */
  public void fsyncDirectory() throws IOException {
    File dir = getDir();
    try {
      getFileIoProvider().dirSync(getVolume(), getDir());
    } catch (IOException e) {
      throw new IOException("Failed to sync " + dir, e);
    }
  }
}
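
To see the directory parsing in action, here is a minimal sketch (not part of the Hadoop source; it assumes hadoop-hdfs is on the classpath, and the block ID and path are made up for illustration). DatanodeUtil.idToBlockDir builds the two-level subdir path for a block ID, and parseBaseDir recovers the base directory from it. Note that the public field really is spelled hasSubidrs in the upstream source:

import java.io.File;

import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.LocalReplica;
import org.apache.hadoop.hdfs.server.datanode.LocalReplica.ReplicaDirInfo;

public class ParseBaseDirDemo {
  public static void main(String[] args) {
    // Made-up block ID and storage path, purely for illustration.
    long blockId = 1073741825L;
    File finalized = new File("/data/dn/current/BP-1/current/finalized");

    // idToBlockDir appends two "subdir" levels derived from bits of the ID.
    File blockDir = DatanodeUtil.idToBlockDir(finalized, blockId);
    System.out.println("block dir : " + blockDir);

    // parseBaseDir strips the subdir levels back off and reports that the
    // hashed layout is in use.
    ReplicaDirInfo info = LocalReplica.parseBaseDir(blockDir, blockId);
    System.out.println("base dir  : " + info.baseDirPath);
    System.out.println("hasSubdirs: " + info.hasSubidrs);
  }
}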
