hadoop DataStorage source code

  • 2022-10-20

hadoop DataStorage code

File path: /hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
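
DataStorage represents the datanode-level view of HDFS storage: it tracks the volume-level StorageDirectory objects, maps each block pool ID to its BlockPoolSliceStorage, drives the upgrade / rollback / finalize transitions of a storage directory, and implements the 'trash' mechanism that preserves block files for possible rollback during rolling upgrades.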

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hdfs.server.datanode;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.HardLink;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.Lists;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.collect.ComparisonChain;
import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;

/** 
 * Data storage information file.
 * <p>
 * @see Storage
 */
@InterfaceAudience.Private
public class DataStorage extends Storage {

  public final static String BLOCK_SUBDIR_PREFIX = "subdir";
  final static String STORAGE_DIR_DETACHED = "detach";
  public final static String STORAGE_DIR_RBW = "rbw";
  public final static String STORAGE_DIR_FINALIZED = "finalized";
  public final static String STORAGE_DIR_LAZY_PERSIST = "lazypersist";
  public final static String STORAGE_DIR_TMP = "tmp";

  /**
   * Set of bpids for which 'trash' is currently enabled.
   * When trash is enabled block files are moved under a separate
   * 'trash' folder instead of being deleted right away. This can
   * be useful during rolling upgrades, for example.
   * The set is backed by a concurrent HashMap.
   *
   * Even if trash is enabled, it is not used if a layout upgrade
   * is in progress for a storage directory i.e. if the previous
   * directory exists.
   */
  private Set<String> trashEnabledBpids;

  /**
   * Datanode UUID that this storage is currently attached to. This
   *  is the same as the legacy StorageID for datanodes that were
   *  upgraded from a pre-UUID version. For compatibility with prior
   *  versions of Datanodes we cannot make this field a UUID.
   */
  private volatile String datanodeUuid = null;
  
  // Maps block pool IDs to block pool storage
  private final Map<String, BlockPoolSliceStorage> bpStorageMap
      = Collections.synchronizedMap(new HashMap<String, BlockPoolSliceStorage>());


  DataStorage() {
    super(NodeType.DATA_NODE);
    trashEnabledBpids = Collections.newSetFromMap(
        new ConcurrentHashMap<String, Boolean>());
  }
  
  public BlockPoolSliceStorage getBPStorage(String bpid) {
    return bpStorageMap.get(bpid);
  }
  
  public DataStorage(StorageInfo storageInfo) {
    super(storageInfo);
  }

  public String getDatanodeUuid() {
    return datanodeUuid;
  }

  public void setDatanodeUuid(String newDatanodeUuid) {
    this.datanodeUuid = newDatanodeUuid;
  }

  private static boolean createStorageID(StorageDirectory sd, int lv,
      Configuration conf) {
    // Clusters previously upgraded from layout versions earlier than
    // ADD_DATANODE_AND_STORAGE_UUIDS failed to correctly generate a
    // new storage ID. We check for that and fix it now.
    final boolean haveValidStorageId = DataNodeLayoutVersion.supports(
        LayoutVersion.Feature.ADD_DATANODE_AND_STORAGE_UUIDS, lv)
        && DatanodeStorage.isValidStorageId(sd.getStorageUuid());
    return createStorageID(sd, !haveValidStorageId, conf);
  }

  /**
   * Create an ID for this storage.
   * @return true if a new storage ID was generated.
   */
  public static boolean createStorageID(
      StorageDirectory sd, boolean regenerateStorageIds, Configuration conf) {
    final String oldStorageID = sd.getStorageUuid();
    if (sd.getStorageLocation() != null &&
        sd.getStorageLocation().getStorageType() == StorageType.PROVIDED) {
      // Only one provided storage id is supported.
      // TODO support multiple provided storage ids
      sd.setStorageUuid(conf.get(DFSConfigKeys.DFS_PROVIDER_STORAGEUUID,
          DFSConfigKeys.DFS_PROVIDER_STORAGEUUID_DEFAULT));
      return false;
    }
    if (oldStorageID == null || regenerateStorageIds) {
      sd.setStorageUuid(DatanodeStorage.generateUuid());
      LOG.info("Generated new storageID {} for directory {} {}", sd
              .getStorageUuid(), sd.getRoot(),
          (oldStorageID == null ? "" : (" to replace " + oldStorageID)));
      return true;
    }
    return false;
  }

  /**
   * Enable trash for the specified block pool storage. Even if trash is
   * enabled by the caller, it is superseded by the 'previous' directory
   * if a layout upgrade is in progress.
   */
  public void enableTrash(String bpid) {
    if (trashEnabledBpids.add(bpid)) {
      getBPStorage(bpid).stopTrashCleaner();
      LOG.info("Enabled trash for bpid {}",  bpid);
    }
  }

  public void clearTrash(String bpid) {
    if (trashEnabledBpids.contains(bpid)) {
      getBPStorage(bpid).clearTrash();
      trashEnabledBpids.remove(bpid);
      LOG.info("Cleared trash for bpid {}", bpid);
    }
  }

  public boolean trashEnabled(String bpid) {
    return trashEnabledBpids.contains(bpid);
  }

  public void setRollingUpgradeMarker(String bpid) throws IOException {
    getBPStorage(bpid).setRollingUpgradeMarkers(getStorageDirs());
  }

  public void clearRollingUpgradeMarker(String bpid) throws IOException {
    getBPStorage(bpid).clearRollingUpgradeMarkers(getStorageDirs());
  }

  /**
   * If rolling upgrades are in progress then do not delete block files
   * immediately. Instead we move the block files to an intermediate
   * 'trash' directory. If there is a subsequent rollback, then the block
   * files will be restored from trash.
   *
   * @return trash directory if rolling upgrade is in progress, null
   *         otherwise.
   */
  public String getTrashDirectoryForReplica(String bpid, ReplicaInfo info) {
    if (trashEnabledBpids.contains(bpid)) {
      return getBPStorage(bpid).getTrashDirectory(info);
    }
    return null;
  }

  /**
   * VolumeBuilder holds the metadata (e.g., the storage directories) of the
   * prepared volume returned from
   * {@link #prepareVolume(DataNode, StorageLocation, List)}.
   * Call {@link VolumeBuilder#build()} to add the metadata to
   * {@link DataStorage} so that this prepared volume can become active.
   */
  @InterfaceAudience.Private
  @InterfaceStability.Unstable
  static public class VolumeBuilder {
    private DataStorage storage;
    /** Volume level storage directory. */
    private StorageDirectory sd;
    /** Mapping from block pool ID to an array of storage directories. */
    private Map<String, List<StorageDirectory>> bpStorageDirMap =
        Maps.newHashMap();

    @VisibleForTesting
    public VolumeBuilder(DataStorage storage, StorageDirectory sd) {
      this.storage = storage;
      this.sd = sd;
    }

    public final StorageDirectory getStorageDirectory() {
      return this.sd;
    }

    private void addBpStorageDirectories(String bpid,
        List<StorageDirectory> dirs) {
      bpStorageDirMap.put(bpid, dirs);
    }

    /**
     * Add loaded metadata of a data volume to {@link DataStorage}.
     */
    public void build() {
      assert this.sd != null;
      synchronized (storage) {
        for (Map.Entry<String, List<StorageDirectory>> e :
            bpStorageDirMap.entrySet()) {
          final String bpid = e.getKey();
          BlockPoolSliceStorage bpStorage = this.storage.bpStorageMap.get(bpid);
          assert bpStorage != null;
          for (StorageDirectory bpSd : e.getValue()) {
            bpStorage.addStorageDir(bpSd);
          }
        }
        storage.addStorageDir(sd);
      }
    }
  }

  private StorageDirectory loadStorageDirectory(DataNode datanode,
      NamespaceInfo nsInfo, StorageLocation location, StartupOption startOpt,
      List<Callable<StorageDirectory>> callables) throws IOException {
    StorageDirectory sd = new StorageDirectory(null, false, location);
    try {
      StorageState curState = sd.analyzeStorage(startOpt, this, true);
      // sd is locked but not opened
      switch (curState) {
      case NORMAL:
        break;
      case NON_EXISTENT:
        LOG.info("Storage directory with location {} does not exist", location);
        throw new IOException("Storage directory with location " + location
            + " does not exist");
      case NOT_FORMATTED: // format
        LOG.info("Storage directory with location {} is not formatted for "
            + "namespace {}. Formatting...", location, nsInfo.getNamespaceID());
        format(sd, nsInfo, datanode.getDatanodeUuid(), datanode.getConf());
        break;
      default:  // recovery part is common
        sd.doRecover(curState);
      }

      // 2. Do transitions
      // Each storage directory is treated individually.
      // During startup some of them can upgrade or roll back
      // while others could be up-to-date for the regular startup.
      if (!doTransition(sd, nsInfo, startOpt, callables, datanode.getConf())) {

        // 3. Update successfully loaded storage.
        setServiceLayoutVersion(getServiceLayoutVersion());
        writeProperties(sd);
      }

      return sd;
    } catch (IOException ioe) {
      sd.unlock();
      throw ioe;
    }
  }

  /**
   * Prepare a storage directory. It creates a builder which can be used to add
   * to the volume. If the volume cannot be added, it is OK to discard the
   * builder later.
   *
   * @param datanode DataNode object.
   * @param location the StorageLocation for the storage directory.
   * @param nsInfos an array of namespace infos.
   * @return a VolumeBuilder that holds the metadata of this storage directory
   * and can be added to DataStorage later.
   * @throws IOException if it encounters I/O errors.
   *
   * Note that if an IOException is thrown, the state of DataStorage is not
   * modified.
   */
  public VolumeBuilder prepareVolume(DataNode datanode,
      StorageLocation location, List<NamespaceInfo> nsInfos)
          throws IOException {
    if (containsStorageDir(location)) {
      final String errorMessage = "Storage directory is in use.";
      LOG.warn(errorMessage);
      throw new IOException(errorMessage);
    }

    StorageDirectory sd = loadStorageDirectory(
        datanode, nsInfos.get(0), location, StartupOption.HOTSWAP, null);
    VolumeBuilder builder =
        new VolumeBuilder(this, sd);
    for (NamespaceInfo nsInfo : nsInfos) {
      location.makeBlockPoolDir(nsInfo.getBlockPoolID(), null);

      final BlockPoolSliceStorage bpStorage = getBlockPoolSliceStorage(nsInfo);
      final List<StorageDirectory> dirs = bpStorage.loadBpStorageDirectories(
          nsInfo, location, StartupOption.HOTSWAP, null, datanode.getConf());
      builder.addBpStorageDirectories(nsInfo.getBlockPoolID(), dirs);
    }
    return builder;
  }

  static int getParallelVolumeLoadThreadsNum(int dataDirs, Configuration conf) {
    final String key
        = DFSConfigKeys.DFS_DATANODE_PARALLEL_VOLUME_LOAD_THREADS_NUM_KEY;
    final int n = conf.getInt(key, dataDirs);
    if (n < 1) {
      throw new HadoopIllegalArgumentException(key + " = " + n + " < 1");
    }
    final int min = Math.min(n, dataDirs);
    LOG.info("Using {} threads to upgrade data directories ({}={}, "
        + "dataDirs={})", min, key, n, dataDirs);
    return min;
  }

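  /**
   * Pairs a storage location with the Future of an upgrade task submitted to
   * the executor, so that failures can be logged per location.
   */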
  static class UpgradeTask {
    private final StorageLocation dataDir;
    private final Future<StorageDirectory> future;

    UpgradeTask(StorageLocation dataDir, Future<StorageDirectory> future) {
      this.dataDir = dataDir;
      this.future = future;
    }
  }

  /**
   * Add a list of volumes to be managed by DataStorage. If the volume is empty,
   * format it, otherwise recover it from previous transitions if required.
   *
   * @param datanode the reference to DataNode.
   * @param nsInfo namespace information
   * @param dataDirs array of data storage directories
   * @param startOpt startup option
   * @return a list of successfully loaded storage directories.
   */
  @VisibleForTesting
  synchronized List<StorageDirectory> addStorageLocations(DataNode datanode,
      NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
      StartupOption startOpt) throws IOException {
    final int numThreads = getParallelVolumeLoadThreadsNum(
        dataDirs.size(), datanode.getConf());
    final ExecutorService executor = Executors.newFixedThreadPool(numThreads);
    try {
      final List<StorageLocation> successLocations = loadDataStorage(
          datanode, nsInfo, dataDirs, startOpt, executor);

      if (successLocations.isEmpty()) {
        return Lists.newArrayList();
      }

      return loadBlockPoolSliceStorage(
          datanode, nsInfo, successLocations, startOpt, executor);
    } finally {
      executor.shutdown();
    }
  }

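  /**
   * Load the datanode-level storage for each location: directories that need
   * no upgrade are added synchronously, while upgrade work is submitted to
   * the executor and collected afterwards. Returns the locations that loaded
   * successfully.
   */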
  private List<StorageLocation> loadDataStorage(DataNode datanode,
      NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
      StartupOption startOpt, ExecutorService executor) throws IOException {
    final List<StorageLocation> success = Lists.newArrayList();
    final List<UpgradeTask> tasks = Lists.newArrayList();
    for (StorageLocation dataDir : dataDirs) {
      if (!containsStorageDir(dataDir)) {
        try {
          // It first ensures the datanode level format is completed.
          final List<Callable<StorageDirectory>> callables
              = Lists.newArrayList();
          final StorageDirectory sd = loadStorageDirectory(
              datanode, nsInfo, dataDir, startOpt, callables);
          if (callables.isEmpty()) {
            addStorageDir(sd);
            success.add(dataDir);
          } else {
            for(Callable<StorageDirectory> c : callables) {
              tasks.add(new UpgradeTask(dataDir, executor.submit(c)));
            }
          }
        } catch (IOException e) {
          LOG.warn("Failed to add storage directory {}", dataDir, e);
        }
      } else {
        LOG.info("Storage directory {} has already been used.", dataDir);
        success.add(dataDir);
      }
    }

    if (!tasks.isEmpty()) {
      LOG.info("loadDataStorage: {} upgrade tasks", tasks.size());
      for(UpgradeTask t : tasks) {
        try {
          addStorageDir(t.future.get());
          success.add(t.dataDir);
        } catch (ExecutionException e) {
          LOG.warn("Failed to upgrade storage directory {}", t.dataDir, e);
        } catch (InterruptedException e) {
          throw DFSUtilClient.toInterruptedIOException("Task interrupted", e);
        }
      }
    }

    return success;
  }

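  /**
   * Load the block-pool-level storage under each successfully loaded data
   * directory, again deferring any upgrade work to the executor. Returns the
   * block pool storage directories that loaded successfully.
   */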
  private List<StorageDirectory> loadBlockPoolSliceStorage(DataNode datanode,
      NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
      StartupOption startOpt, ExecutorService executor) throws IOException {
    final String bpid = nsInfo.getBlockPoolID();
    final BlockPoolSliceStorage bpStorage = getBlockPoolSliceStorage(nsInfo);
    Map<StorageLocation, List<Callable<StorageDirectory>>> upgradeCallableMap =
        new HashMap<>();
    final List<StorageDirectory> success = Lists.newArrayList();
    final List<UpgradeTask> tasks = Lists.newArrayList();
    for (StorageLocation dataDir : dataDirs) {
      dataDir.makeBlockPoolDir(bpid, null);
      try {
        final List<Callable<StorageDirectory>> sdCallables =
            Lists.newArrayList();
        final List<StorageDirectory> dirs = bpStorage.recoverTransitionRead(
            nsInfo, dataDir, startOpt, sdCallables, datanode.getConf());
        if (sdCallables.isEmpty()) {
          for(StorageDirectory sd : dirs) {
            success.add(sd);
          }
        } else {
          upgradeCallableMap.put(dataDir, sdCallables);
        }
      } catch (IOException e) {
        LOG.warn("Failed to add storage directory {} for block pool {}",
            dataDir, bpid, e);
      }
    }

    for (Map.Entry<StorageLocation, List<Callable<StorageDirectory>>> entry :
        upgradeCallableMap.entrySet()) {
      for(Callable<StorageDirectory> c : entry.getValue()) {
        tasks.add(new UpgradeTask(entry.getKey(), executor.submit(c)));
      }
    }

    if (!tasks.isEmpty()) {
      LOG.info("loadBlockPoolSliceStorage: {} upgrade tasks", tasks.size());
      for(UpgradeTask t : tasks) {
        try {
          success.add(t.future.get());
        } catch (ExecutionException e) {
          LOG.warn("Failed to upgrade storage directory {} for block pool {}",
              t.dataDir, bpid, e);
        } catch (InterruptedException e) {
          throw DFSUtilClient.toInterruptedIOException("Task interrupted", e);
        }
      }
    }

    return success;
  }

  /**
   * Remove storage dirs from DataStorage. All storage dirs are removed even
   * when an IOException is thrown.
   *
   * @param storageLocations a set of storage directories to be removed.
   * @throws IOException if I/O error when unlocking storage directory.
   */
  synchronized void removeVolumes(
      final Collection<StorageLocation> storageLocations)
      throws IOException {
    if (storageLocations.isEmpty()) {
      return;
    }

    StringBuilder errorMsgBuilder = new StringBuilder();
    for (Iterator<StorageDirectory> it = getStorageDirs().iterator();
         it.hasNext(); ) {
      StorageDirectory sd = it.next();
      StorageLocation sdLocation = sd.getStorageLocation();
      if (storageLocations.contains(sdLocation)) {
        // Remove the block pool level storage first.
        for (Map.Entry<String, BlockPoolSliceStorage> entry :
            this.bpStorageMap.entrySet()) {
          String bpid = entry.getKey();
          BlockPoolSliceStorage bpsStorage = entry.getValue();
          File bpRoot =
              BlockPoolSliceStorage.getBpRoot(bpid, sd.getCurrentDir());
          bpsStorage.remove(bpRoot.getAbsoluteFile());
        }

        getStorageDirs().remove(sd);
        try {
          sd.unlock();
        } catch (IOException e) {
          LOG.warn("I/O error attempting to unlock storage directory {}.",
              sd.getRoot(), e);
          errorMsgBuilder.append(String.format("Failed to remove %s: %s%n",
              sd.getRoot(), e.getMessage()));
        }
      }
    }
    if (errorMsgBuilder.length() > 0) {
      throw new IOException(errorMsgBuilder.toString());
    }
  }

  /**
   * Analyze storage directories for a specific block pool.
   * Recover from previous transitions if required.
   * Perform fs state transition if necessary depending on the namespace info.
   * Read storage info.
   * <br>
   * This method should be synchronized between multiple DN threads.  Only the
   * first DN thread does DN level storage dir recoverTransitionRead.
   *
   * @param datanode DataNode
   * @param nsInfo Namespace info of namenode corresponding to the block pool
   * @param dataDirs Storage directories
   * @param startOpt startup option
   * @throws IOException on error
   */
  void recoverTransitionRead(DataNode datanode, NamespaceInfo nsInfo,
      Collection<StorageLocation> dataDirs, StartupOption startOpt) throws IOException {
    if (addStorageLocations(datanode, nsInfo, dataDirs, startOpt).isEmpty()) {
      throw new IOException("All specified directories have failed to load.");
    }
  }

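  /**
   * Format a fresh storage directory: clear it, initialize the storage info
   * fields from the namespace info (cTime starts at 0), assign a storage ID
   * if needed, and write the initial VERSION file.
   */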
  void format(StorageDirectory sd, NamespaceInfo nsInfo,
              String newDatanodeUuid, Configuration conf) throws IOException {
    sd.clearDirectory(); // create directory
    this.layoutVersion = DataNodeLayoutVersion.getCurrentLayoutVersion();
    this.clusterID = nsInfo.getClusterID();
    this.namespaceID = nsInfo.getNamespaceID();
    this.cTime = 0;
    setDatanodeUuid(newDatanodeUuid);

    createStorageID(sd, false, conf);
    writeProperties(sd);
  }

  /*
   * Set ClusterID, StorageID, StorageType, CTime into
   * DataStorage VERSION file.
   * Always called just before writing the properties to
   * the VERSION file.
  */
  @Override
  protected void setPropertiesFromFields(Properties props, 
                           StorageDirectory sd 
                           ) throws IOException {
    props.setProperty("storageType", storageType.toString());
    props.setProperty("clusterID", clusterID);
    props.setProperty("cTime", String.valueOf(cTime));
    props.setProperty("layoutVersion", String.valueOf(layoutVersion));
    props.setProperty("storageID", sd.getStorageUuid());

    String datanodeUuid = getDatanodeUuid();
    if (datanodeUuid != null) {
      props.setProperty("datanodeUuid", datanodeUuid);
    }

    // Set NamespaceID in version before federation
    if (!DataNodeLayoutVersion.supports(
        LayoutVersion.Feature.FEDERATION, layoutVersion)) {
      props.setProperty("namespaceID", String.valueOf(namespaceID));
    }
  }

  /*
   * Read ClusterID, StorageID, StorageType, CTime from 
   * DataStorage VERSION file and verify them.
   * Always called just after reading the properties from the VERSION file.
   */
  @Override
  protected void setFieldsFromProperties(Properties props, StorageDirectory sd)
      throws IOException {
    setFieldsFromProperties(props, sd, false, 0);
  }

  private void setFieldsFromProperties(Properties props, StorageDirectory sd,
      boolean overrideLayoutVersion, int toLayoutVersion) throws IOException {
    if (props == null) {
      return;
    }
    if (overrideLayoutVersion) {
      this.layoutVersion = toLayoutVersion;
    } else {
      setLayoutVersion(props, sd);
    }
    setcTime(props, sd);
    checkStorageType(props, sd);
    setClusterId(props, layoutVersion, sd);
    
    // Read NamespaceID in version before federation
    if (!DataNodeLayoutVersion.supports(
        LayoutVersion.Feature.FEDERATION, layoutVersion)) {
      setNamespaceID(props, sd);
    }
    

    // Validate the storage id; note that it may be empty.
    String ssid = props.getProperty("storageID");
    if (ssid == null) {
      throw new InconsistentFSStateException(sd.getRoot(), "file "
          + STORAGE_FILE_VERSION + " is invalid.");
    }
    String sid = sd.getStorageUuid();
    if (!(sid == null || sid.equals("") ||
          ssid.equals("") || sid.equals(ssid))) {
      throw new InconsistentFSStateException(sd.getRoot(),
          "has incompatible storage Id.");
    }

    if (sid == null) { // update id only if it was null
      sd.setStorageUuid(ssid);
    }

    // Update the datanode UUID if present.
    if (props.getProperty("datanodeUuid") != null) {
      String dnUuid = props.getProperty("datanodeUuid");

      if (getDatanodeUuid() == null) {
        setDatanodeUuid(dnUuid);
      } else if (getDatanodeUuid().compareTo(dnUuid) != 0) {
        throw new InconsistentFSStateException(sd.getRoot(),
            "Root " + sd.getRoot() + ": DatanodeUuid=" + dnUuid +
            ", does not match " + getDatanodeUuid() + " from other" +
            " StorageDirectory.");
      }
    }
  }

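  /**
   * A layout is pre-upgradable if the legacy top-level 'storage' file is
   * still present and records a layout version at or above
   * LAST_PRE_UPGRADE_LAYOUT_VERSION; the file is briefly locked while the
   * version is read.
   */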
  @Override
  public boolean isPreUpgradableLayout(StorageDirectory sd) throws IOException {
    File oldF = new File(sd.getRoot(), "storage");
    if (!oldF.exists()) {
      return false;
    }
    // check the layout version inside the storage file
    // Lock and Read old storage file
    try (RandomAccessFile oldFile = new RandomAccessFile(oldF, "rws");
      FileLock oldLock = oldFile.getChannel().tryLock()) {
      if (null == oldLock) {
        LOG.error("Unable to acquire file lock on path {}", oldF);
        throw new OverlappingFileLockException();
      }
      oldFile.seek(0);
      int oldVersion = oldFile.readInt();
      if (oldVersion < LAST_PRE_UPGRADE_LAYOUT_VERSION) {
        return false;
      }
    }
    return true;
  }
  
  /** Read VERSION file for rollback */
  void readProperties(StorageDirectory sd, int rollbackLayoutVersion)
      throws IOException {
    Properties props = readPropertiesFile(sd.getVersionFile());
    setFieldsFromProperties(props, sd, true, rollbackLayoutVersion);
  }

  /**
   * Analyze whether a transition of the fs state is required
   * and perform it if necessary.
   * 
   * Rollback if the rollback startup option was specified.
   * Upgrade if this.LV > LAYOUT_VERSION
   * Regular startup if this.LV = LAYOUT_VERSION
   * 
   * @param sd  storage directory
   * @param nsInfo  namespace info
   * @param startOpt  startup option
   * @return true if the new properties have been written.
   */
  private boolean doTransition(StorageDirectory sd, NamespaceInfo nsInfo,
      StartupOption startOpt, List<Callable<StorageDirectory>> callables,
      Configuration conf) throws IOException {
    if (sd.getStorageLocation().getStorageType() == StorageType.PROVIDED) {
      createStorageID(sd, layoutVersion, conf);
      return false; // regular start up for PROVIDED storage directories
    }
    if (startOpt == StartupOption.ROLLBACK) {
      doRollback(sd, nsInfo); // rollback if applicable
    }
    readProperties(sd);
    checkVersionUpgradable(this.layoutVersion);
    assert this.layoutVersion >=
        DataNodeLayoutVersion.getCurrentLayoutVersion() :
        "Future version is not allowed";
    
    boolean federationSupported = 
      DataNodeLayoutVersion.supports(
          LayoutVersion.Feature.FEDERATION, layoutVersion);
    // For pre-federation version - validate the namespaceID
    if (!federationSupported &&
        getNamespaceID() != nsInfo.getNamespaceID()) {
      throw new IOException("Incompatible namespaceIDs in "
          + sd.getRoot().getCanonicalPath() + ": namenode namespaceID = "
          + nsInfo.getNamespaceID() + "; datanode namespaceID = "
          + getNamespaceID());
    }
    
    // For version that supports federation, validate clusterID
    if (federationSupported
        && !getClusterID().equals(nsInfo.getClusterID())) {
      throw new IOException("Incompatible clusterIDs in "
          + sd.getRoot().getCanonicalPath() + ": namenode clusterID = "
          + nsInfo.getClusterID() + "; datanode clusterID = " + getClusterID());
    }

    // regular start up.
    if (this.layoutVersion == DataNodeLayoutVersion.getCurrentLayoutVersion()) {
      createStorageID(sd, layoutVersion, conf);
      return false; // need to write properties
    }

    // do upgrade
    if (this.layoutVersion > DataNodeLayoutVersion.getCurrentLayoutVersion()) {
      if (federationSupported) {
        // If the existing on-disk layout version supports federation,
        // simply update the properties.
        upgradeProperties(sd, conf);
      } else {
        doUpgradePreFederation(sd, nsInfo, callables, conf);
      }
      return true; // doUpgrade has already written the properties
    }
    
    // layoutVersion < DATANODE_LAYOUT_VERSION. I.e. stored layout version is newer
    // than the version supported by datanode. This should have been caught
    // in readProperties(), even if rollback was not carried out or somehow
    // failed.
    throw new IOException("BUG: The stored LV = " + this.getLayoutVersion()
        + " is newer than the supported LV = "
        + DataNodeLayoutVersion.getCurrentLayoutVersion());
  }

  /**
   * Upgrade from a pre-federation layout.
   * Move current storage into a backup directory,
   * and hardlink all its blocks into the new current directory.
   * 
   * Upgrade from pre-0.22 to 0.22 or later release, e.g. 0.19/0.20 => 0.22/0.23
   * <ul>
   * <li> If <SD>/previous exists then delete it </li>
   * <li> Rename <SD>/current to <SD>/previous.tmp </li>
   * <li> Create new <SD>/current/<bpid>/current directory </li>
   * <ul>
   * <li> Hard links for block files are created from <SD>/previous.tmp 
   * to <SD>/current/<bpid>/current </li>
   * <li> Saves new version file in <SD>/current/<bpid>/current directory </li>
   * </ul>
   * <li> Rename <SD>/previous.tmp to <SD>/previous </li>
   * </ul>
   * 
   * There should be only ONE namenode in the cluster for first 
   * time upgrade to 0.22
   * @param sd  storage directory
   */
  void doUpgradePreFederation(final StorageDirectory sd,
      final NamespaceInfo nsInfo,
      final List<Callable<StorageDirectory>> callables,
      final Configuration conf) throws IOException {
    final int oldLV = getLayoutVersion();
    LOG.info("Upgrading storage directory {}.\n old LV = {}; old CTime = {}"
            + ".\n new LV = {}; new CTime = {}", sd.getRoot(), oldLV,
        this.getCTime(), DataNodeLayoutVersion.getCurrentLayoutVersion(),
        nsInfo.getCTime());
    
    final File curDir = sd.getCurrentDir();
    final File prevDir = sd.getPreviousDir();
    final File bbwDir = new File(sd.getRoot(), Storage.STORAGE_1_BBW);

    assert curDir.exists() : "Data node current directory must exist.";
    // Cleanup directory "detach"
    cleanupDetachDir(new File(curDir, STORAGE_DIR_DETACHED));
    
    // 1. delete <SD>/previous dir before upgrading
    if (prevDir.exists())
      deleteDir(prevDir);
    // get previous.tmp directory, <SD>/previous.tmp
    final File tmpDir = sd.getPreviousTmp();
    assert !tmpDir.exists() : 
      "Data node previous.tmp directory must not exist.";
    
    // 2. Rename <SD>/current to <SD>/previous.tmp
    rename(curDir, tmpDir);
    
    // 3.1. Format BP
    File curBpDir = BlockPoolSliceStorage.getBpRoot(nsInfo.getBlockPoolID(), curDir);
    BlockPoolSliceStorage bpStorage = getBlockPoolSliceStorage(nsInfo);
    bpStorage.format(curDir, nsInfo);

    final File toDir = new File(curBpDir, STORAGE_DIR_CURRENT);
    if (callables == null) {
      doUpgrade(sd, nsInfo, prevDir, tmpDir, bbwDir, toDir, oldLV, conf);
    } else {
      callables.add(new Callable<StorageDirectory>() {
        @Override
        public StorageDirectory call() throws Exception {
          doUpgrade(sd, nsInfo, prevDir, tmpDir, bbwDir, toDir, oldLV, conf);
          return sd;
        }
      });
    }
  }

  private void doUpgrade(final StorageDirectory sd,
      final NamespaceInfo nsInfo, final File prevDir,
      final File tmpDir, final File bbwDir, final File toDir, final int oldLV,
      Configuration conf) throws IOException {
    // 3.2. Link block files from <SD>/previous.tmp to <SD>/current
    linkAllBlocks(tmpDir, bbwDir, toDir, oldLV, conf);

    // 4. Write version file under <SD>/current
    clusterID = nsInfo.getClusterID();
    upgradeProperties(sd, conf);
    
    // 5. Rename <SD>/previous.tmp to <SD>/previous
    rename(tmpDir, prevDir);
    LOG.info("Upgrade of {} is complete", sd.getRoot());
  }

  void upgradeProperties(StorageDirectory sd, Configuration conf)
      throws IOException {
    createStorageID(sd, layoutVersion, conf);
    LOG.info("Updating layout version from {} to {} for storage {}",
        layoutVersion, DataNodeLayoutVersion.getCurrentLayoutVersion(),
        sd.getRoot());
    layoutVersion = DataNodeLayoutVersion.getCurrentLayoutVersion();
    writeProperties(sd);
  }

  /**
   * Cleanup the detachDir. 
   * 
   * If the directory is not empty report an error; 
   * Otherwise remove the directory.
   * 
   * @param detachDir detach directory
   * @throws IOException if the directory is not empty or it can not be removed
   */
  private void cleanupDetachDir(File detachDir) throws IOException {
    if (!DataNodeLayoutVersion.supports(
        LayoutVersion.Feature.APPEND_RBW_DIR, layoutVersion) &&
        detachDir.exists() && detachDir.isDirectory() ) {
      
        if (FileUtil.list(detachDir).length != 0 ) {
          throw new IOException("Detached directory " + detachDir +
              " is not empty. Please manually move each file under this " +
              "directory to the finalized directory if the finalized " +
              "directory tree does not have the file.");
        } else if (!detachDir.delete()) {
          throw new IOException("Cannot remove directory " + detachDir);
        }
    }
  }
  
  /** 
   * Roll back to a snapshot in the previous directory by moving it to the
   * current directory.
   * Rollback procedure:
   * <br>
   * If previous directory exists:
   * <ol>
   * <li> Rename current to removed.tmp </li>
   * <li> Rename previous to current </li>
   * <li> Remove removed.tmp </li>
   * </ol>
   * 
   * If previous directory does not exist and the current version supports
   * federation, perform a simple rollback of layout version. This does not
   * involve saving/restoration of actual data.
   */
  void doRollback( StorageDirectory sd,
                   NamespaceInfo nsInfo
                   ) throws IOException {
    File prevDir = sd.getPreviousDir();
    // This is a regular startup or a post-federation rollback
    if (!prevDir.exists()) {
      if (DataNodeLayoutVersion.supports(LayoutVersion.Feature.FEDERATION,
          DataNodeLayoutVersion.getCurrentLayoutVersion())) {
        readProperties(sd, DataNodeLayoutVersion.getCurrentLayoutVersion());
        writeProperties(sd);
        LOG.info("Layout version rolled back to {} for storage {}",
            DataNodeLayoutVersion.getCurrentLayoutVersion(), sd.getRoot());
      }
      return;
    }
    DataStorage prevInfo = new DataStorage();
    prevInfo.readPreviousVersionProperties(sd);

    // We allow rollback to a state, which is either consistent with
    // the namespace state or can be further upgraded to it.
    if (!(prevInfo.getLayoutVersion() >=
        DataNodeLayoutVersion.getCurrentLayoutVersion()
        && prevInfo.getCTime() <= nsInfo.getCTime())) {  // cannot rollback
      throw new InconsistentFSStateException(sd.getRoot(),
          "Cannot rollback to a newer state.\nDatanode previous state: LV = "
              + prevInfo.getLayoutVersion() + " CTime = " + prevInfo.getCTime()
              + " is newer than the namespace state: LV = "
              + DataNodeLayoutVersion.getCurrentLayoutVersion() + " CTime = "
              + nsInfo.getCTime());
    }
    LOG.info("Rolling back storage directory {}.\n   target LV = {}; target "
            + "CTime = {}", sd.getRoot(),
        DataNodeLayoutVersion.getCurrentLayoutVersion(), nsInfo.getCTime());
    File tmpDir = sd.getRemovedTmp();
    assert !tmpDir.exists() : "removed.tmp directory must not exist.";
    // rename current to tmp
    File curDir = sd.getCurrentDir();
    assert curDir.exists() : "Current directory must exist.";
    rename(curDir, tmpDir);
    // rename previous to current
    rename(prevDir, curDir);
    // delete tmp dir
    deleteDir(tmpDir);
    LOG.info("Rollback of {} is complete", sd.getRoot());
  }
  
  /**
   * Finalize procedure deletes an existing snapshot.
   * <ol>
   * <li>Rename previous to finalized.tmp directory</li>
   * <li>Fully delete the finalized.tmp directory</li>
   * </ol>
   * 
   * Do nothing, if previous directory does not exist
   */
  void doFinalize(StorageDirectory sd) throws IOException {
    File prevDir = sd.getPreviousDir();
    if (!prevDir.exists())
      return; // already discarded
    
    final String dataDirPath = sd.getRoot().getCanonicalPath();
    LOG.info("Finalizing upgrade for storage directory {}.\n   cur LV = {}; "
        + "cur CTime = {}", dataDirPath, this.getLayoutVersion(), this
        .getCTime());
    assert sd.getCurrentDir().exists() : "Current directory must exist.";
    final File tmpDir = sd.getFinalizedTmp();//finalized.tmp directory
    final File bbwDir = new File(sd.getRoot(), Storage.STORAGE_1_BBW);
    // 1. rename previous to finalized.tmp
    rename(prevDir, tmpDir);

    // 2. delete finalized.tmp dir in a separate thread
    // Also delete the blocksBeingWritten from HDFS 1.x and earlier, if
    // it exists.
    new Daemon(new Runnable() {
        @Override
        public void run() {
          try {
            deleteDir(tmpDir);
            if (bbwDir.exists()) {
              deleteDir(bbwDir);
            }
          } catch(IOException ex) {
            LOG.error("Finalize upgrade for " + dataDirPath + " failed", ex);
          }
          LOG.info("Finalize upgrade for " + dataDirPath + " is complete");
        }
        @Override
        public String toString() { return "Finalize " + dataDirPath; }
      }).start();
  }
  
  
  /*
   * Finalize the upgrade for a block pool
   * This also empties trash created during rolling upgrade and disables
   * trash functionality.
   */
  void finalizeUpgrade(String bpID) throws IOException {
    // To handle finalizing a snapshot taken at datanode level while
    // upgrading to federation, if datanode level snapshot previous exists, 
    // then finalize it. Else finalize the corresponding BP.
    for (StorageDirectory sd : getStorageDirs()) {
      File prevDir = sd.getPreviousDir();
      if (prevDir != null && prevDir.exists()) {
        // data node level storage finalize
        doFinalize(sd);
      } else {
        // block pool storage finalize using specific bpID
        BlockPoolSliceStorage bpStorage = bpStorageMap.get(bpID);
        bpStorage.doFinalize(sd.getCurrentDir());
      }
    }
  }

  /**
   * Hardlink all finalized and RBW blocks in fromDir to toDir
   *
   * @param fromDir      The directory where the 'from' snapshot is stored
   * @param fromBbwDir   In HDFS 1.x, the directory where blocks
   *                     that are under construction are stored.
   * @param toDir        The current data directory
   *
   * @throws IOException If error occurs during hardlink
   */
  private static void linkAllBlocks(File fromDir, File fromBbwDir, File toDir,
      int diskLayoutVersion, Configuration conf) throws IOException {
    HardLink hardLink = new HardLink();
    // do the link
    if (DataNodeLayoutVersion.supports(
        LayoutVersion.Feature.APPEND_RBW_DIR, diskLayoutVersion)) {
      // hardlink finalized blocks in tmpDir/finalized
      linkBlocks(fromDir, toDir, STORAGE_DIR_FINALIZED,
          diskLayoutVersion, hardLink, conf);
      // hardlink rbw blocks in tmpDir/rbw
      linkBlocks(fromDir, toDir, STORAGE_DIR_RBW,
          diskLayoutVersion, hardLink, conf);
    } else { // pre-RBW version
      // hardlink finalized blocks in tmpDir
      linkBlocks(fromDir, new File(toDir, STORAGE_DIR_FINALIZED),
          diskLayoutVersion, hardLink, conf);
      if (fromBbwDir.exists()) {
        /*
         * We need to put the 'blocksBeingWritten' from HDFS 1.x into the rbw
         * directory.  It's a little messy, because the blocksBeingWritten was
         * NOT underneath the 'current' directory in those releases.  See
         * HDFS-3731 for details.
         */
        linkBlocks(fromBbwDir, new File(toDir, STORAGE_DIR_RBW),
            diskLayoutVersion, hardLink, conf);
      }
    }
    LOG.info("Linked blocks from {} to {}. {}", fromDir, toDir, hardLink
        .linkStats.report());
  }

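  /**
   * One pending hard link: the source directory, the destination directory,
   * and the block file name they share.
   */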
  private static class LinkArgs {
    private File srcDir;
    private File dstDir;
    private String blockFile;

    LinkArgs(File srcDir, File dstDir, String blockFile) {
      this.srcDir = srcDir;
      this.dstDir = dstDir;
      this.blockFile = blockFile;
    }

    public File src() {
      return new File(srcDir, blockFile);
    }

    public File dst() {
      return new File(dstDir, blockFile);
    }

    public String blockFile() {
      return blockFile;
    }
  }

  static void linkBlocks(File fromDir, File toDir, String subdir, int oldLV,
      HardLink hl, Configuration conf) throws IOException {
    linkBlocks(new File(fromDir, subdir), new File(toDir, subdir),
        oldLV, hl, conf);
  }

  private static void linkBlocks(File from, File to, int oldLV,
      HardLink hl, Configuration conf) throws IOException {
    LOG.info("Start linking block files from {} to {}", from, to);
    boolean upgradeToIdBasedLayout = false;
    // If we are upgrading from a version older than the one where we introduced
    // block ID-based layout (32x32) AND we're working with the finalized
    // directory, we'll need to upgrade from the old layout to the new one. The
    // upgrade path from pre-blockid based layouts (>-56) and blockid based
    // 256x256 layouts (-56) is fortunately the same.
    if (oldLV > DataNodeLayoutVersion.Feature.BLOCKID_BASED_LAYOUT_32_by_32
        .getInfo().getLayoutVersion()
        && to.getName().equals(STORAGE_DIR_FINALIZED)) {
      upgradeToIdBasedLayout = true;
    }

    final ArrayList<LinkArgs> idBasedLayoutSingleLinks = Lists.newArrayList();
    final Map<File, File> pathCache = new HashMap<>();
    linkBlocksHelper(from, to, hl, upgradeToIdBasedLayout, to,
        idBasedLayoutSingleLinks, pathCache);

    // Detect and remove duplicate entries.
    final ArrayList<LinkArgs> duplicates =
        findDuplicateEntries(idBasedLayoutSingleLinks);
    if (!duplicates.isEmpty()) {
      LOG.error("There are {} duplicate block " +
          "entries within the same volume.", duplicates.size());
      removeDuplicateEntries(idBasedLayoutSingleLinks, duplicates);
    }

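    // Fan the remaining single links out to a fixed pool of worker threads.
    // Each task takes a contiguous chunk of size / numLinkWorkers + 1 links,
    // which covers the whole list even when the division is not exact.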
    final int numLinkWorkers = conf.getInt(
        DFSConfigKeys.DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS_KEY,
        DFSConfigKeys.DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS);
    ExecutorService linkWorkers = Executors.newFixedThreadPool(numLinkWorkers);
    final int step = idBasedLayoutSingleLinks.size() / numLinkWorkers + 1;
    List<Future<Void>> futures = Lists.newArrayList();
    for (int i = 0; i < idBasedLayoutSingleLinks.size(); i += step) {
      final int iCopy = i;
      futures.add(linkWorkers.submit(new Callable<Void>() {
        @Override
        public Void call() throws IOException {
          int upperBound = Math.min(iCopy + step,
              idBasedLayoutSingleLinks.size());
          for (int j = iCopy; j < upperBound; j++) {
            LinkArgs cur = idBasedLayoutSingleLinks.get(j);
            HardLink.createHardLink(cur.src(), cur.dst());
          }
          return null;
        }
      }));
    }
    linkWorkers.shutdown();
    for (Future<Void> f : futures) {
      try {
        f.get();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IOException(e);
      } catch (ExecutionException e) {
        throw new IOException(e);
      }
    }
  }

  /**
   * Find duplicate entries within an array of LinkArgs.
   * Duplicate entries are entries with the same last path component.
   */
  static ArrayList<LinkArgs> findDuplicateEntries(ArrayList<LinkArgs> all) {
    // Find duplicates by sorting the list by the final path component.
    Collections.sort(all, new Comparator<LinkArgs>() {
      /**
       * Compare two LinkArgs objects, such that objects with the same
       * terminal source path components are grouped together.
       */
      @Override
      public int compare(LinkArgs a, LinkArgs b) {
        return ComparisonChain.start().
            compare(a.blockFile(), b.blockFile()).
            compare(a.src(), b.src()).
            compare(a.dst(), b.dst()).
            result();
      }
    });
    final ArrayList<LinkArgs> duplicates = Lists.newArrayList();
    Long prevBlockId = null;
    boolean prevWasMeta = false;
    boolean addedPrev = false;
    for (int i = 0; i < all.size(); i++) {
      LinkArgs args = all.get(i);
      long blockId = Block.getBlockId(args.blockFile());
      boolean isMeta = Block.isMetaFilename(args.blockFile());
      if ((prevBlockId == null) ||
          (prevBlockId.longValue() != blockId)) {
        prevBlockId = blockId;
        addedPrev = false;
      } else if (isMeta == prevWasMeta) {
        // If we saw another file for the same block ID previously,
        // and it had the same meta-ness as this file, we have a
        // duplicate.
        duplicates.add(args);
        if (!addedPrev) {
          duplicates.add(all.get(i - 1));
        }
        addedPrev = true;
      } else {
        addedPrev = false;
      }
      prevWasMeta = isMeta;
    }
    return duplicates;
  }

  /**
   * Remove duplicate entries from the list.
   * We do this by choosing:
   * 1. the entries with the highest genstamp (this takes priority),
   * 2. the entries with the longest block files,
   * 3. arbitrarily, if neither #1 nor #2 gives a clear winner.
   *
   * Block and metadata files form a pair: if you take a metadata file from
   * one subdirectory, you must also take the block file from that
   * subdirectory.
   */
  private static void removeDuplicateEntries(ArrayList<LinkArgs> all,
                                             ArrayList<LinkArgs> duplicates) {
    // Maps blockId -> metadata file with highest genstamp
    TreeMap<Long, List<LinkArgs>> highestGenstamps =
        new TreeMap<Long, List<LinkArgs>>();
    for (LinkArgs duplicate : duplicates) {
      if (!Block.isMetaFilename(duplicate.blockFile())) {
        continue;
      }
      long blockId = Block.getBlockId(duplicate.blockFile());
      List<LinkArgs> prevHighest = highestGenstamps.get(blockId);
      if (prevHighest == null) {
        List<LinkArgs> highest = new LinkedList<LinkArgs>();
        highest.add(duplicate);
        highestGenstamps.put(blockId, highest);
        continue;
      }
      long prevGenstamp =
          Block.getGenerationStamp(prevHighest.get(0).blockFile());
      long genstamp = Block.getGenerationStamp(duplicate.blockFile());
      if (genstamp < prevGenstamp) {
        continue;
      }
      if (genstamp > prevGenstamp) {
        prevHighest.clear();
      }
      prevHighest.add(duplicate);
    }

    // Remove data / metadata entries that don't have the highest genstamp
    // from the duplicates list.
    for (Iterator<LinkArgs> iter = duplicates.iterator(); iter.hasNext(); ) {
      LinkArgs duplicate = iter.next();
      long blockId = Block.getBlockId(duplicate.blockFile());
      List<LinkArgs> highest = highestGenstamps.get(blockId);
      if (highest != null) {
        boolean found = false;
        for (LinkArgs high : highest) {
          if (high.src().getParent().equals(duplicate.src().getParent())) {
            found = true;
            break;
          }
        }
        if (!found) {
          LOG.warn("Unexpectedly low genstamp on {}.",
              duplicate.src().getAbsolutePath());
          iter.remove();
        }
      }
    }

    // Find the longest block files
    // We let the "last guy win" here, since we're only interested in
    // preserving one block file / metadata file pair.
    TreeMap<Long, LinkArgs> longestBlockFiles = new TreeMap<Long, LinkArgs>();
    for (LinkArgs duplicate : duplicates) {
      if (Block.isMetaFilename(duplicate.blockFile())) {
        continue;
      }
      long blockId = Block.getBlockId(duplicate.blockFile());
      LinkArgs prevLongest = longestBlockFiles.get(blockId);
      if (prevLongest == null) {
        longestBlockFiles.put(blockId, duplicate);
        continue;
      }
      long blockLength = duplicate.src().length();
      long prevBlockLength = prevLongest.src().length();
      if (blockLength < prevBlockLength) {
        LOG.warn("Unexpectedly short length on {}.",
            duplicate.src().getAbsolutePath());
        continue;
      }
      if (blockLength > prevBlockLength) {
        LOG.warn("Unexpectedly short length on {}.",
            prevLongest.src().getAbsolutePath());
      }
      longestBlockFiles.put(blockId, duplicate);
    }

    // Remove data / metadata entries that aren't the longest, or weren't
    // arbitrarily selected by us.
    for (Iterator<LinkArgs> iter = all.iterator(); iter.hasNext(); ) {
      LinkArgs args = iter.next();
      long blockId = Block.getBlockId(args.blockFile());
      LinkArgs bestDuplicate = longestBlockFiles.get(blockId);
      if (bestDuplicate == null) {
        continue; // file has no duplicates
      }
      if (!bestDuplicate.src().getParent().equals(args.src().getParent())) {
        LOG.warn("Discarding {}.", args.src().getAbsolutePath());
        iter.remove();
      }
    }
  }

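  /**
   * Recursively walk the 'from' tree. Block files are either queued for
   * relocation into the ID-based layout (when upgrading) or hard-linked in
   * bulk under their original names; 'subdir*' children are recursed into.
   */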
  static void linkBlocksHelper(File from, File to, HardLink hl,
      boolean upgradeToIdBasedLayout, File blockRoot,
      List<LinkArgs> idBasedLayoutSingleLinks, Map<File, File> pathCache)
      throws IOException {
    if (!from.exists()) {
      return;
    }
    if (!from.isDirectory()) {
      HardLink.createHardLink(from, to);
      hl.linkStats.countSingleLinks++;
      return;
    }
    // from is a directory
    hl.linkStats.countDirs++;
    
    String[] blockNames = from.list(new java.io.FilenameFilter() {
      @Override
      public boolean accept(File dir, String name) {
        return name.startsWith(Block.BLOCK_FILE_PREFIX);
      }
    });

    // If we are upgrading to block ID-based layout, we don't want to recreate
    // any subdirs from the source that contain blocks, since we have a new
    // directory structure
    if (!upgradeToIdBasedLayout || !to.getName().startsWith(
        BLOCK_SUBDIR_PREFIX)) {
      if (!to.mkdirs())
        throw new IOException("Cannot create directory " + to);
    }

    // Block files just need hard links with the same file names
    // but a different directory
    if (blockNames.length > 0) {
      if (upgradeToIdBasedLayout) {
        for (String blockName : blockNames) {
          long blockId = Block.getBlockId(blockName);
          File blockLocation = DatanodeUtil.idToBlockDir(blockRoot, blockId);
          if (!blockLocation.exists()) {
            if (!blockLocation.mkdirs()) {
              throw new IOException("Failed to mkdirs " + blockLocation);
            }
          }
          /**
           * The destination path is 32x32, so 1024 distinct paths. Therefore
           * we cache the destination path and reuse the same File object on
           * potentially thousands of blocks located on this volume.
           * This method is called recursively so the cache is passed through
           * each recursive call. There is one cache per volume, and it is only
           * accessed by a single thread so no locking is needed.
           */
          File cachedDest = pathCache
              .computeIfAbsent(blockLocation, k -> blockLocation);
          idBasedLayoutSingleLinks.add(new LinkArgs(from,
              cachedDest, blockName));
          hl.linkStats.countSingleLinks++;
        }
      } else {
        HardLink.createHardLinkMult(from, blockNames, to);
        hl.linkStats.countMultLinks++;
        hl.linkStats.countFilesMultLinks += blockNames.length;
      }
    } else {
      hl.linkStats.countEmptyDirs++;
    }
    
    // Now take care of the rest of the files and subdirectories
    String[] otherNames = from.list(new java.io.FilenameFilter() {
        @Override
        public boolean accept(File dir, String name) {
          return name.startsWith(BLOCK_SUBDIR_PREFIX);
        }
      });

    if (otherNames != null) {
      for (int i = 0; i < otherNames.length; i++) {
        linkBlocksHelper(new File(from, otherNames[i]),
            new File(to, otherNames[i]), hl, upgradeToIdBasedLayout,
            blockRoot, idBasedLayoutSingleLinks, pathCache);
      }
    }
  }

  /**
   * Get the BlockPoolSliceStorage from {@link #bpStorageMap}.
   * If the object is not found, create a new object and put it in the map.
   */
  synchronized BlockPoolSliceStorage getBlockPoolSliceStorage(
      final NamespaceInfo nsInfo) {
    final String bpid = nsInfo.getBlockPoolID();
    BlockPoolSliceStorage bpStorage = bpStorageMap.get(bpid);
    if (bpStorage == null) {
      bpStorage = new BlockPoolSliceStorage(nsInfo.getNamespaceID(), bpid,
            nsInfo.getCTime(), nsInfo.getClusterID());
      bpStorageMap.put(bpid, bpStorage);
    }
    return bpStorage;
  }

  synchronized void removeBlockPoolStorage(String bpId) {
    bpStorageMap.remove(bpId);
  }

  /**
   * Prefer {@link FileIoProvider#fullyDelete}.
   * @param dir the directory to delete.
   * @return true if the directory and its contents were fully deleted.
   */
  @Deprecated
  public static boolean fullyDelete(final File dir) {
    boolean result = FileUtil.fullyDelete(dir);
    return result;
  }
}

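The parallel hard-linking in linkBlocks splits the pending link list into contiguous chunks of size / numLinkWorkers + 1 entries, one chunk per submitted task. The standalone sketch below (illustrative only, not part of Hadoop; the class name and values are made up) mirrors that arithmetic to show the chunks cover every link exactly once:

public class LinkChunkingSketch {
  public static void main(String[] args) {
    int size = 10;                          // pending hard links
    int numLinkWorkers = 4;                 // worker thread count
    int step = size / numLinkWorkers + 1;   // same formula as linkBlocks
    for (int i = 0; i < size; i += step) {
      int upperBound = Math.min(i + step, size);  // clamp the final chunk
      System.out.println("task handles links [" + i + ", " + upperBound + ")");
    }
  }
}

With size = 10 and numLinkWorkers = 4 this prints four half-open ranges, [0, 3), [3, 6), [6, 9) and [9, 10), so every link index is handled exactly once and no task runs past the end of the list.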