hadoop BlockPoolSliceStorage 源码

  • 2022-10-20
  • 浏览 (238)

hadoop BlockPoolSliceStorage 代码

文件路径:/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hdfs.server.datanode;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.HardLink;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.Lists;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.Preconditions;

/**
 * Manages storage for the set of BlockPoolSlices which share a particular
 * block pool id, on this DataNode.
 *
 * This class supports the following functionality:
 * <ul>
 * <li> Formatting a new block pool storage</li>
 * <li> Recovering a storage state to a consistent state (if possible)</li>
 * <li> Taking a snapshot of the block pool during upgrade</li>
 * <li> Rolling back a block pool to a previous snapshot</li>
 * <li> Finalizing block storage by deletion of a snapshot</li>
 * </ul>
 *
 * @see Storage
 */
@InterfaceAudience.Private
public class BlockPoolSliceStorage extends Storage {
  /** Name of the trash directory located directly under a block pool root. */
  static final String TRASH_ROOT_DIR = "trash";

  /**
   * A marker file that is created on each root directory if a rolling upgrade
   * is in progress. The NN does not inform the DN when a rolling upgrade is
   * finalized. All the DN can infer is whether or not a rolling upgrade is
   * currently in progress. When the rolling upgrade is not in progress:
   *   1. If the marker file is present, then a rolling upgrade just completed.
   *      If a 'previous' directory exists, it can be deleted now.
   *   2. If the marker file is absent, then a regular upgrade may be in
   *      progress. Do not delete the 'previous' directory.
   */
  static final String ROLLING_UPGRADE_MARKER_FILE = "RollingUpgradeInProgress";

  /**
   * Regex fragment matching a block pool id path component surrounded by file
   * separators, i.e. {@literal /BP-<digits>-<dotted quad>-<digits>/}.
   */
  private static final String BLOCK_POOL_ID_PATTERN_BASE =
      Pattern.quote(File.separator) +
      "BP-\\d+-\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}-\\d+" +
      Pattern.quote(File.separator);

  /** Groups: (1) everything before the block pool id, (2) the id, (3) rest. */
  private static final Pattern BLOCK_POOL_PATH_PATTERN = Pattern.compile(
      "^(.*)(" + BLOCK_POOL_ID_PATTERN_BASE + ")(.*)$");

  /**
   * Same as {@link #BLOCK_POOL_PATH_PATTERN} but requires the path component
   * after the block pool id to start with the 'current' directory name, which
   * is captured as group 3; group 4 is the remainder.
   */
  private static final Pattern BLOCK_POOL_CURRENT_PATH_PATTERN = Pattern.compile(
      "^(.*)(" + BLOCK_POOL_ID_PATTERN_BASE + ")(" + STORAGE_DIR_CURRENT + ")(.*)$");

  /**
   * Same as {@link #BLOCK_POOL_CURRENT_PATH_PATTERN} but for paths located
   * under the trash directory instead of 'current'.
   */
  private static final Pattern BLOCK_POOL_TRASH_PATH_PATTERN = Pattern.compile(
      "^(.*)(" + BLOCK_POOL_ID_PATTERN_BASE + ")(" + TRASH_ROOT_DIR + ")(.*)$");

  private String blockpoolID = ""; // id of the blockpool
  /** Background thread started by {@link #clearTrash()}; null until then. */
  private Daemon trashCleaner;

  /**
   * Create a block pool slice storage from existing storage information.
   *
   * @param storageInfo storage information passed to the {@link Storage}
   *                    constructor
   * @param bpid block pool id
   */
  public BlockPoolSliceStorage(StorageInfo storageInfo, String bpid) {
    super(storageInfo);
    blockpoolID = bpid;
  }

  /**
   * These sets are used as an optimization to avoid one filesystem operation
   * per storage on each heartbeat response. They hold the string form of the
   * block pool roots that are known to have / not to have the rolling upgrade
   * marker file.
   *
   * They are initialized here so that they are never null, whichever
   * constructor is used first. Note that they are static, and that two of the
   * constructors replace them with fresh empty sets.
   */
  private static Set<String> storagesWithRollingUpgradeMarker =
      newConcurrentStringSet();
  private static Set<String> storagesWithoutRollingUpgradeMarker =
      newConcurrentStringSet();

  /**
   * Create a block pool slice storage for a DataNode from explicit values.
   * Also resets the static rolling upgrade marker caches.
   *
   * @param namespaceID namespace id
   * @param bpID block pool id
   * @param cTime creation time
   * @param clusterId cluster id
   */
  BlockPoolSliceStorage(int namespaceID, String bpID, long cTime,
      String clusterId) {
    super(NodeType.DATA_NODE);
    this.namespaceID = namespaceID;
    this.blockpoolID = bpID;
    this.cTime = cTime;
    this.clusterID = clusterId;
    resetRollingUpgradeMarkerCaches();
  }

  /**
   * Create an empty DataNode block pool slice storage; used by
   * {@link #doRollback} to read the VERSION file of the previous directory.
   * Also resets the static rolling upgrade marker caches.
   */
  private BlockPoolSliceStorage() {
    super(NodeType.DATA_NODE);
    resetRollingUpgradeMarkerCaches();
  }

  /** @return a new, empty, concurrent set of strings. */
  private static Set<String> newConcurrentStringSet() {
    return Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
  }

  /** Replace both rolling upgrade marker caches with fresh empty sets. */
  private static void resetRollingUpgradeMarkerCaches() {
    storagesWithRollingUpgradeMarker = newConcurrentStringSet();
    storagesWithoutRollingUpgradeMarker = newConcurrentStringSet();
  }

  /**
   * Add a storage directory to this storage.
   * Expose visibility for VolumeBuilder#commit().
   *
   * @param sd the storage directory to add
   */
  public void addStorageDir(StorageDirectory sd) {
    super.addStorageDir(sd);
  }

  /**
   * Load one storage directory. Recover from previous transitions if required.
   * The storage directory is unlocked again if loading fails with an
   * IOException.
   *
   * @param nsInfo  namespace information
   * @param location  the root path of the storage directory
   * @param startOpt  startup option
   * @param callables list of callable storage directory
   * @param conf configuration
   * @return the loaded storage directory
   * @throws IOException if the directory does not exist, cannot be recovered
   *                     or transitioned, or its CTime differs from the
   *                     namenode CTime
   */
  private StorageDirectory loadStorageDirectory(NamespaceInfo nsInfo,
      StorageLocation location, StartupOption startOpt,
      List<Callable<StorageDirectory>> callables, Configuration conf)
          throws IOException {
    StorageDirectory sd = new StorageDirectory(
        nsInfo.getBlockPoolID(), null, true, location);
    try {
      // 1. Analyze the directory state and format/recover if needed.
      StorageState curState = sd.analyzeStorage(startOpt, this, true);
      // sd is locked but not opened
      switch (curState) {
      case NORMAL:
        break;
      case NON_EXISTENT:
        LOG.info("Block pool storage directory for location {} and block pool"
            + " id {} does not exist", location, nsInfo.getBlockPoolID());
        throw new IOException("Storage directory for location " + location +
            " and block pool id " + nsInfo.getBlockPoolID() +
            " does not exist");
      case NOT_FORMATTED: // format
        LOG.info("Block pool storage directory for location {} and block pool"
                + " id {} is not formatted. Formatting ...", location,
            nsInfo.getBlockPoolID());
        format(sd, nsInfo);
        break;
      default:  // recovery part is common
        sd.doRecover(curState);
      }

      // 2. Do transitions
      // Each storage directory is treated individually.
      // During startup some of them can upgrade or roll back
      // while others could be up-to-date for the regular startup.
      if (!doTransition(sd, nsInfo, startOpt, callables, conf)) {

        // 3. Check CTime and update successfully loaded storage.
        // (Skipped when doTransition already wrote the new properties.)
        if (getCTime() != nsInfo.getCTime()) {
          throw new IOException("Datanode CTime (=" + getCTime()
              + ") is not equal to namenode CTime (=" + nsInfo.getCTime() + ")");
        }
        setServiceLayoutVersion(getServiceLayoutVersion());
        writeProperties(sd);
      }

      return sd;
    } catch (IOException ioe) {
      sd.unlock();
      throw ioe;
    }
  }

  /**
   * Analyze and load storage directories. Recover from previous transitions if
   * required.
   *
   * The block pool storages are either all analyzed or none of them is loaded.
   * Therefore, a failure on loading any block pool storage results a faulty
   * data volume.
   *
   * @param nsInfo namespace information
   * @param location storage directories of block pool
   * @param startOpt startup option
   * @param callables list of callable storage directory
   * @param conf configuration
   * @return a list of loaded block pool directories.
   * @throws IOException on error, including an attempt to load a location
   *                     that is already part of this storage
   */
  List<StorageDirectory> loadBpStorageDirectories(NamespaceInfo nsInfo,
      StorageLocation location, StartupOption startOpt,
      List<Callable<StorageDirectory>> callables, Configuration conf)
          throws IOException {
    List<StorageDirectory> succeedDirs = Lists.newArrayList();
    try {
      if (containsStorageDir(location, nsInfo.getBlockPoolID())) {
        throw new IOException(
            "BlockPoolSliceStorage.recoverTransitionRead: " +
                "attempt to load an used block storage: " + location);
      }
      final StorageDirectory sd = loadStorageDirectory(
          nsInfo, location, startOpt, callables, conf);
      succeedDirs.add(sd);
    } catch (IOException e) {
      LOG.warn("Failed to analyze storage directories for block pool {}",
          nsInfo.getBlockPoolID(), e);
      throw e;
    }
    return succeedDirs;
  }

  /**
   * Analyze storage directories. Recover from previous transitions if required.
   * Every successfully loaded directory is added to this storage.
   *
   * The block pool storages are either all analyzed or none of them is loaded.
   * Therefore, a failure on loading any block pool storage results a faulty
   * data volume.
   *
   * @param nsInfo namespace information
   * @param location storage directories of block pool
   * @param startOpt startup option
   * @param callables list of callable storage directory
   * @param conf configuration
   * @return the list of loaded block pool directories
   * @throws IOException on error
   */
  List<StorageDirectory> recoverTransitionRead(NamespaceInfo nsInfo,
      StorageLocation location, StartupOption startOpt,
      List<Callable<StorageDirectory>> callables, Configuration conf)
          throws IOException {
    LOG.info("Analyzing storage directories for bpid {}", nsInfo
        .getBlockPoolID());
    final List<StorageDirectory> loaded = loadBpStorageDirectories(
        nsInfo, location, startOpt, callables, conf);
    for (StorageDirectory sd : loaded) {
      addStorageDir(sd);
    }
    return loaded;
  }

  /**
   * Format a block pool slice storage.
   * @param dnCurDir DataStorage current directory
   * @param nsInfo the name space info
   * @throws IOException Signals that an I/O exception has occurred.
   */
  void format(File dnCurDir, NamespaceInfo nsInfo) throws IOException {
    File curBpDir = getBpRoot(nsInfo.getBlockPoolID(), dnCurDir);
    StorageDirectory bpSdir = new StorageDirectory(curBpDir);
    format(bpSdir, nsInfo);
  }

  /**
   * Format a block pool slice storage: clear the directory, take the layout
   * version of the running software and the cTime, namespace id and block
   * pool id of the namenode, and write them to the VERSION file.
   *
   * @param bpSdir the block pool storage
   * @param nsInfo the name space info
   * @throws IOException Signals that an I/O exception has occurred.
   */
  private void format(StorageDirectory bpSdir, NamespaceInfo nsInfo) throws IOException {
    LOG.info("Formatting block pool {} directory {}", blockpoolID, bpSdir
        .getCurrentDir());
    bpSdir.clearDirectory(); // create directory
    this.layoutVersion = DataNodeLayoutVersion.getCurrentLayoutVersion();
    this.cTime = nsInfo.getCTime();
    this.namespaceID = nsInfo.getNamespaceID();
    this.blockpoolID = nsInfo.getBlockPoolID();
    writeProperties(bpSdir);
  }

  /**
   * Remove block pool level storage directory. Only the first storage
   * directory whose absolute root equals the given path is removed.
   *
   * @param absPathToRemove the absolute path of the root for the block pool
   *                        level storage to remove.
   */
  void remove(File absPathToRemove) {
    Preconditions.checkArgument(absPathToRemove.isAbsolute());
    LOG.info("Removing block level storage: {}", absPathToRemove);
    for (Iterator<StorageDirectory> it = getStorageDirs().iterator();
         it.hasNext(); ) {
      StorageDirectory sd = it.next();
      if (sd.getRoot().getAbsoluteFile().equals(absPathToRemove)) {
        // The iteration stops right after the removal, so the iterator is
        // never used again on the modified list.
        getStorageDirs().remove(sd);
        break;
      }
    }
  }

  /**
   * Set layoutVersion, namespaceID, blockpoolID and cTime into block pool
   * storage VERSION file
   */
  @Override
  protected void setPropertiesFromFields(Properties props, StorageDirectory sd)
      throws IOException {
    props.setProperty("layoutVersion", String.valueOf(layoutVersion));
    props.setProperty("namespaceID", String.valueOf(namespaceID));
    props.setProperty("blockpoolID", blockpoolID);
    props.setProperty("cTime", String.valueOf(cTime));
  }

  /**
   * Validate and set block pool ID.
   *
   * @param storage storage root, used for error reporting only
   * @param bpid block pool id read from the VERSION file
   * @throws InconsistentFSStateException if bpid is null or empty, or if a
   *         block pool id is already set and differs from bpid
   */
  private void setBlockPoolID(File storage, String bpid)
      throws InconsistentFSStateException {
    if (bpid == null || bpid.equals("")) {
      throw new InconsistentFSStateException(storage, "file "
          + STORAGE_FILE_VERSION + " is invalid.");
    }

    if (!blockpoolID.equals("") && !blockpoolID.equals(bpid)) {
      throw new InconsistentFSStateException(storage,
          "Unexpected blockpoolID " + bpid + ". Expected " + blockpoolID);
    }
    blockpoolID = bpid;
  }

  /**
   * Read layoutVersion, namespaceID, cTime and blockpoolID from the
   * properties of a block pool storage VERSION file.
   */
  @Override
  protected void setFieldsFromProperties(Properties props, StorageDirectory sd)
      throws IOException {
    setLayoutVersion(props, sd);
    setNamespaceID(props, sd);
    setcTime(props, sd);

    String sbpid = props.getProperty("blockpoolID");
    setBlockPoolID(sd.getRoot(), sbpid);
  }

  /**
   * Analyze whether a transition of the BP state is required and
   * perform it if necessary.
   * <br>
   * Rollback if:
   * previousLV &gt;= LAYOUT_VERSION &amp;&amp; prevCTime &lt;= namenode.cTime.
   * Upgrade if:
   * this.LV &gt; LAYOUT_VERSION || this.cTime &lt; namenode.cTime
   * Regular startup if:
   * this.LV = LAYOUT_VERSION &amp;&amp; this.cTime = namenode.cTime
   *
   * @param sd storage directory {@literal <SD>/current/<bpid>}
   * @param nsInfo namespace info
   * @param startOpt startup option
   * @param callables list of callable storage directory
   * @param conf configuration
   * @return true if the new properties has been written.
   * @throws IOException if the namespace id or block pool id does not match
   *                     the namenode, or the datanode state is newer than the
   *                     namespace state
   */
  private boolean doTransition(StorageDirectory sd, NamespaceInfo nsInfo,
      StartupOption startOpt, List<Callable<StorageDirectory>> callables,
      Configuration conf) throws IOException {
    if (sd.getStorageLocation().getStorageType() == StorageType.PROVIDED) {
      return false; // regular startup for PROVIDED storage directories
    }
    if (startOpt == StartupOption.ROLLBACK && sd.getPreviousDir().exists()) {
      Preconditions.checkState(!getTrashRootDir(sd).exists(),
          sd.getPreviousDir() + " and " + getTrashRootDir(sd) + " should not " +
          " both be present.");
      doRollback(sd, nsInfo); // rollback if applicable
    } else if (startOpt == StartupOption.ROLLBACK &&
        !sd.getPreviousDir().exists()) {
      // Restore all the files in the trash. The restored files are retained
      // during rolling upgrade rollback. They are deleted during rolling
      // upgrade downgrade.
      int restored = restoreBlockFilesFromTrash(getTrashRootDir(sd));
      LOG.info("Restored {} block files from trash.", restored);
    }
    readProperties(sd);
    checkVersionUpgradable(this.layoutVersion);
    assert this.layoutVersion >= DataNodeLayoutVersion.getCurrentLayoutVersion()
       : "Future version is not allowed";
    if (getNamespaceID() != nsInfo.getNamespaceID()) {
      throw new IOException("Incompatible namespaceIDs in "
          + sd.getRoot().getCanonicalPath() + ": namenode namespaceID = "
          + nsInfo.getNamespaceID() + "; datanode namespaceID = "
          + getNamespaceID());
    }
    if (!blockpoolID.equals(nsInfo.getBlockPoolID())) {
      throw new IOException("Incompatible blockpoolIDs in "
          + sd.getRoot().getCanonicalPath() + ": namenode blockpoolID = "
          + nsInfo.getBlockPoolID() + "; datanode blockpoolID = "
          + blockpoolID);
    }
    if (this.layoutVersion == DataNodeLayoutVersion.getCurrentLayoutVersion()
        && this.cTime == nsInfo.getCTime()) {
      return false; // regular startup
    }
    if (this.layoutVersion > DataNodeLayoutVersion.getCurrentLayoutVersion()) {
      int restored = restoreBlockFilesFromTrash(getTrashRootDir(sd));
      LOG.info("Restored {} block files from trash " +
          "before the layout upgrade. These blocks will be moved to " +
          "the previous directory during the upgrade", restored);
    }
    if (this.layoutVersion > DataNodeLayoutVersion.getCurrentLayoutVersion()
        || this.cTime < nsInfo.getCTime()) {
      doUpgrade(sd, nsInfo, callables, conf); // upgrade
      return true;
    }
    // layoutVersion == LAYOUT_VERSION && this.cTime > nsInfo.cTime
    // must shutdown
    throw new IOException("Datanode state: LV = " + this.getLayoutVersion()
        + " CTime = " + this.getCTime()
        + " is newer than the namespace state: LV = "
        + nsInfo.getLayoutVersion() + " CTime = " + nsInfo.getCTime());
  }

  /**
   * Upgrade to any release after 0.22 (0.22 included) release
   * e.g. 0.22 =&gt; 0.23
   * Upgrade procedure is as follows:
   * <ol>
   * <li>If {@literal <SD>/previous} exists then delete it</li>
   * <li>If {@literal <SD>/current/<bpid>/previous} exists then delete it</li>
   * <li>Rename {@literal <SD>/current/<bpid>/current} to
   * {@literal <SD>/current/<bpid>/previous.tmp}</li>
   * <li>Create new {@literal <SD>/current/<bpid>/current} directory</li>
   * <li>Hard links for block files are created from previous.tmp to current</li>
   * <li>Save new version file in current directory</li>
   * <li>Rename previous.tmp to previous</li>
   * </ol>
   * The steps after the rename to previous.tmp are executed immediately when
   * {@code callables} is null; otherwise they are wrapped in a
   * {@link Callable} that is appended to {@code callables} for the caller to
   * run.
   *
   * @param bpSd storage directory {@literal <SD>/current/<bpid>}
   * @param nsInfo Namespace Info from the namenode
   * @param callables list that receives the deferred upgrade task, or null to
   *                  upgrade synchronously
   * @param conf configuration
   * @throws IOException on error
   */
  private void doUpgrade(final StorageDirectory bpSd,
      final NamespaceInfo nsInfo,
      final List<Callable<StorageDirectory>> callables,
      final Configuration conf) throws IOException {
    // Upgrading is applicable only to release with federation or after
    if (!DataNodeLayoutVersion.supports(
        LayoutVersion.Feature.FEDERATION, layoutVersion)) {
      return;
    }
    // no upgrades for storage directories that are PROVIDED
    if (bpSd.getRoot() == null) {
      return;
    }
    final int oldLV = getLayoutVersion();
    LOG.info("Upgrading block pool storage directory {}.\n   old LV = {}; old"
        + " CTime = {}.\n   new LV = {}; new CTime = {}",
        bpSd.getRoot(), oldLV, this.getCTime(),
        DataNodeLayoutVersion.getCurrentLayoutVersion(), nsInfo.getCTime());
    // get <SD>/previous directory
    String dnRoot = getDataNodeStorageRoot(bpSd.getRoot().getCanonicalPath());
    StorageDirectory dnSdStorage = new StorageDirectory(new File(dnRoot));
    File dnPrevDir = dnSdStorage.getPreviousDir();

    // If <SD>/previous directory exists delete it
    if (dnPrevDir.exists()) {
      deleteDir(dnPrevDir);
    }
    final File bpCurDir = bpSd.getCurrentDir();
    final File bpPrevDir = bpSd.getPreviousDir();
    assert bpCurDir.exists() : "BP level current directory must exist.";
    cleanupDetachDir(new File(bpCurDir, DataStorage.STORAGE_DIR_DETACHED));

    // 1. Delete <SD>/current/<bpid>/previous dir before upgrading
    if (bpPrevDir.exists()) {
      deleteDir(bpPrevDir);
    }
    final File bpTmpDir = bpSd.getPreviousTmp();
    assert !bpTmpDir.exists() : "previous.tmp directory must not exist.";

    // 2. Rename <SD>/current/<bpid>/current to
    //    <SD>/current/<bpid>/previous.tmp
    rename(bpCurDir, bpTmpDir);

    final String name = "block pool " + blockpoolID + " at " + bpSd.getRoot();
    if (callables == null) {
      doUpgrade(name, bpSd, nsInfo, bpPrevDir, bpTmpDir, bpCurDir, oldLV, conf);
    } else {
      callables.add(new Callable<StorageDirectory>() {
        @Override
        public StorageDirectory call() throws Exception {
          doUpgrade(name, bpSd, nsInfo, bpPrevDir, bpTmpDir, bpCurDir, oldLV,
              conf);
          return bpSd;
        }
      });
    }
  }

  /**
   * Second half of the upgrade: hard link the blocks from previous.tmp into a
   * new current directory, write the new VERSION file and rename previous.tmp
   * to previous.
   *
   * @param name human readable name of the upgraded storage, for logging
   * @param bpSd storage directory {@literal <SD>/current/<bpid>}
   * @param nsInfo Namespace Info from the namenode
   * @param bpPrevDir block pool level previous directory
   * @param bpTmpDir block pool level previous.tmp directory
   * @param bpCurDir block pool level current directory
   * @param oldLV layout version of the storage before the upgrade
   * @param conf configuration
   * @throws IOException on error
   */
  private void doUpgrade(String name, final StorageDirectory bpSd,
      NamespaceInfo nsInfo, final File bpPrevDir, final File bpTmpDir,
      final File bpCurDir, final int oldLV, Configuration conf)
          throws IOException {
    // 3. Create new <SD>/current with block files hardlinks and VERSION
    linkAllBlocks(bpTmpDir, bpCurDir, oldLV, conf);
    this.layoutVersion = DataNodeLayoutVersion.getCurrentLayoutVersion();
    assert this.namespaceID == nsInfo.getNamespaceID()
        : "Data-node and name-node namespace IDs must be the same.";
    this.cTime = nsInfo.getCTime();
    writeProperties(bpSd);

    // 4.rename <SD>/current/<bpid>/previous.tmp to
    // <SD>/current/<bpid>/previous
    rename(bpTmpDir, bpPrevDir);
    LOG.info("Upgrade of {} is complete", name);
  }

  /**
   * Cleanup the detachDir.
   *
   * If the directory is not empty report an error; Otherwise remove the
   * directory. Nothing is done when the current layout version supports the
   * APPEND_RBW_DIR feature or when detachDir is not an existing directory.
   *
   * @param detachDir detach directory
   * @throws IOException if the directory is not empty or it can not be removed
   */
  private void cleanupDetachDir(File detachDir) throws IOException {
    if (!DataNodeLayoutVersion.supports(
        LayoutVersion.Feature.APPEND_RBW_DIR, layoutVersion)
        && detachDir.exists() && detachDir.isDirectory()) {

      if (FileUtil.list(detachDir).length != 0) {
        throw new IOException("Detached directory " + detachDir
            + " is not empty. Please manually move each file under this "
            + "directory to the finalized directory if the finalized "
            + "directory tree does not have the file.");
      } else if (!detachDir.delete()) {
        throw new IOException("Cannot remove directory " + detachDir);
      }
    }
  }

  /**
   * Restore all files from the trash directory to their corresponding
   * locations under current/, recursing into subdirectories, and delete the
   * trash directory afterwards.
   *
   * @param trashRoot trash directory (or one of its subdirectories)
   * @return the number of files moved back; 0 if trashRoot does not exist or
   *         cannot be listed
   * @throws IOException if a restore directory cannot be created or a file
   *                     cannot be renamed
   */
  private int restoreBlockFilesFromTrash(File trashRoot)
      throws  IOException {
    int filesRestored = 0;
    File[] children = trashRoot.exists() ? trashRoot.listFiles() : null;
    if (children == null) {
      return 0;
    }

    // All plain files of one trash directory share the same restore
    // directory, so it is resolved (and created) lazily, once.
    File restoreDirectory = null;
    for (File child : children) {
      if (child.isDirectory()) {
        // Recurse to process subdirectories.
        filesRestored += restoreBlockFilesFromTrash(child);
        continue;
      }

      if (restoreDirectory == null) {
        restoreDirectory = new File(getRestoreDirectory(child));
        if (!restoreDirectory.exists() && !restoreDirectory.mkdirs()) {
          throw new IOException("Failed to create directory " + restoreDirectory);
        }
      }

      final File newChild = new File(restoreDirectory, child.getName());

      if (newChild.exists() && newChild.length() >= child.length()) {
        // Failsafe - we should not hit this case but let's make sure
        // we never overwrite a newer version of a block file with an
        // older version.
        LOG.info("Not overwriting {} with smaller file from " +
            "trash directory. This message can be safely ignored.", newChild);
      } else if (!child.renameTo(newChild)) {
        throw new IOException("Failed to rename " + child + " to " + newChild);
      } else {
        ++filesRestored;
      }
    }
    FileUtil.fullyDelete(trashRoot);
    return filesRestored;
  }

  /**
   * Roll back to old snapshot at the block pool level
   * If previous directory exists:
   * <ol>
   * <li>Rename {@literal <SD>/current/<bpid>/current} to removed.tmp</li>
   * <li>Rename {@literal <SD>/current/<bpid>/previous} to current</li>
   * <li>Remove removed.tmp</li>
   * </ol>
   *
   * Do nothing if previous directory does not exist.
   *
   * @param bpSd Block pool storage directory at
   *             {@literal <SD>/current/<bpid>}
   * @param nsInfo Namespace Info from the namenode
   * @throws IOException if the previous state is newer than the namespace
   *                     state, or on I/O error
   */
  void doRollback(StorageDirectory bpSd, NamespaceInfo nsInfo)
      throws IOException {
    File prevDir = bpSd.getPreviousDir();
    // regular startup if previous dir does not exist
    if (prevDir == null || !prevDir.exists()) {
      return;
    }
    // read attributes out of the VERSION file of previous directory
    BlockPoolSliceStorage prevInfo = new BlockPoolSliceStorage();
    prevInfo.readPreviousVersionProperties(bpSd);

    // We allow rollback to a state, which is either consistent with
    // the namespace state or can be further upgraded to it.
    // In another word, we can only roll back when ( storedLV >= software LV)
    // && ( DN.previousCTime <= NN.ctime)
    if (!(prevInfo.getLayoutVersion() >=
        DataNodeLayoutVersion.getCurrentLayoutVersion() &&
        prevInfo.getCTime() <= nsInfo.getCTime())) { // cannot rollback
      throw new InconsistentFSStateException(bpSd.getRoot(),
          "Cannot rollback to a newer state.\nDatanode previous state: LV = "
              + prevInfo.getLayoutVersion() + " CTime = " + prevInfo.getCTime()
              + " is newer than the namespace state: LV = "
              + DataNodeLayoutVersion.getCurrentLayoutVersion() + " CTime = "
              + nsInfo.getCTime());
    }

    LOG.info("Rolling back storage directory {}.\n   target LV = {}; target "
            + "CTime = {}", bpSd.getRoot(), nsInfo.getLayoutVersion(),
        nsInfo.getCTime());
    File tmpDir = bpSd.getRemovedTmp();
    assert !tmpDir.exists() : "removed.tmp directory must not exist.";
    // 1. rename current to tmp
    File curDir = bpSd.getCurrentDir();
    assert curDir.exists() : "Current directory must exist.";
    rename(curDir, tmpDir);

    // 2. rename previous to current
    rename(prevDir, curDir);

    // 3. delete removed.tmp dir
    deleteDir(tmpDir);
    LOG.info("Rollback of {} is complete", bpSd.getRoot());
  }

  /**
   * Finalize the block pool storage by deleting {@literal <BP>/previous}
   * directory that holds the snapshot. The directory is first renamed to
   * finalized.tmp and then deleted on a separate daemon thread, so this
   * method returns before the deletion is done.
   *
   * @param dnCurDir DataStorage current directory; nothing is done if null
   * @throws IOException on error while renaming the previous directory
   */
  void doFinalize(File dnCurDir) throws IOException {
    if (dnCurDir == null) {
      return; //we do nothing if the directory is null
    }
    File bpRoot = getBpRoot(blockpoolID, dnCurDir);
    StorageDirectory bpSd = new StorageDirectory(bpRoot);
    // block pool level previous directory
    File prevDir = bpSd.getPreviousDir();
    if (!prevDir.exists()) {
      return; // already finalized
    }
    final String dataDirPath = bpSd.getRoot().getCanonicalPath();
    LOG.info("Finalizing upgrade for storage directory {}.\n   cur LV = {}; "
            + "cur CTime = {}", dataDirPath, this.getLayoutVersion(),
        this.getCTime());
    assert bpSd.getCurrentDir().exists() : "Current directory must exist.";

    // rename previous to finalized.tmp
    final File tmpDir = bpSd.getFinalizedTmp();
    rename(prevDir, tmpDir);

    // delete finalized.tmp dir in a separate thread
    new Daemon(new Runnable() {
      @Override
      public void run() {
        try {
          deleteDir(tmpDir);
          // Report completion only when the deletion actually succeeded.
          LOG.info("Finalize upgrade for {} is complete.", dataDirPath);
        } catch (IOException ex) {
          LOG.error("Finalize upgrade for {} failed.", dataDirPath, ex);
        }
      }

      @Override
      public String toString() {
        return "Finalize " + dataDirPath;
      }
    }).start();
  }

  /**
   * Hardlink all finalized and RBW blocks in fromDir to toDir
   *
   * @param fromDir directory where the snapshot is stored
   * @param toDir the current data directory
   * @param diskLayoutVersion layout version of the storage being linked from
   * @param conf configuration
   * @throws IOException if error occurs during hardlink
   */
  private static void linkAllBlocks(File fromDir, File toDir,
      int diskLayoutVersion, Configuration conf) throws IOException {
    // do the link
    // hardlink finalized blocks in tmpDir
    HardLink hardLink = new HardLink();
    DataStorage.linkBlocks(fromDir, toDir, DataStorage.STORAGE_DIR_FINALIZED,
        diskLayoutVersion, hardLink, conf);
    DataStorage.linkBlocks(fromDir, toDir, DataStorage.STORAGE_DIR_RBW,
        diskLayoutVersion, hardLink, conf);
    LOG.info("Linked blocks from {} to {}. {}", fromDir, toDir,
        hardLink.linkStats.report());
  }

  /**
   * gets the data node storage directory based on block pool storage
   *
   * @param bpRoot path of a block pool storage root
   * @return the part of bpRoot preceding the block pool id component, or
   *         bpRoot itself if it does not contain a block pool id component
   */
  private static String getDataNodeStorageRoot(String bpRoot) {
    Matcher matcher = BLOCK_POOL_PATH_PATTERN.matcher(bpRoot);
    if (matcher.matches()) {
      // return the data node root directory
      return matcher.group(1);
    }
    return bpRoot;
  }

  @Override
  public String toString() {
    return super.toString() + ";bpid=" + blockpoolID;
  }

  /**
   * Get a block pool storage root based on data node storage root
   * @param bpID block pool ID
   * @param dnCurDir data node storage root directory
   * @return root directory for block pool storage
   */
  public static File getBpRoot(String bpID, File dnCurDir) {
    return new File(dnCurDir, bpID);
  }

  /** @return always false for block pool slice storage. */
  @Override
  public boolean isPreUpgradableLayout(StorageDirectory sd) throws IOException {
    return false;
  }

  /** @return the trash directory located directly under the root of sd. */
  private File getTrashRootDir(StorageDirectory sd) {
    return new File(sd.getRoot(), TRASH_ROOT_DIR);
  }

  /**
   * Determine whether we can use trash for the given blockFile. Trash
   * is disallowed if a 'previous' directory exists for the
   * storage directory containing the block.
   *
   * @param blockFile block file located under a block pool current directory
   * @return true if no 'previous' directory exists for the block pool
   */
  @VisibleForTesting
  public boolean isTrashAllowed(File blockFile) {
    Matcher matcher = BLOCK_POOL_CURRENT_PATH_PATTERN.matcher(blockFile.getParent());
    String previousDir = matcher.replaceFirst("$1$2" + STORAGE_DIR_PREVIOUS);
    return !(new File(previousDir)).exists();
  }

  /**
   * Get a target subdirectory under trash/ for a given block file that is being
   * deleted.
   *
   * The subdirectory structure under trash/ mirrors that under current/ to keep
   * implicit memory of where the files are to be restored (if necessary).
   *
   * @param info replica whose block file is being deleted
   * @return the trash directory for a given block file that is being deleted,
   *         or null if trash is not allowed for the block file or the block
   *         URI of the replica cannot be converted to a file.
   */
  public String getTrashDirectory(ReplicaInfo info) {

    URI blockURI = info.getBlockURI();
    try {
      File blockFile = new File(blockURI);
      return getTrashDirectory(blockFile);
    } catch (IllegalArgumentException e) {
      LOG.warn("Failed to get block file for replica {}", info, e);
    }

    return null;
  }

  /**
   * Map the parent directory of a block file under current/ to the matching
   * directory under trash/.
   *
   * @param blockFile block file that is being deleted
   * @return the trash directory, or null if trash is not allowed
   */
  private String getTrashDirectory(File blockFile) {
    if (isTrashAllowed(blockFile)) {
      Matcher matcher = BLOCK_POOL_CURRENT_PATH_PATTERN.matcher(blockFile.getParent());
      String trashDirectory = matcher.replaceFirst("$1$2" + TRASH_ROOT_DIR + "$4");
      return trashDirectory;
    }
    return null;
  }

  /**
   * Get a target subdirectory under current/ for a given block file that is
   * being restored from trash.
   *
   * The subdirectory structure under trash/ mirrors that under current/ to keep
   * implicit memory of where the files are to be restored.
   * @param blockFile  block file that is being restored from trash.
   * @return the target directory to restore a previously deleted block file.
   */
  @VisibleForTesting
  String getRestoreDirectory(File blockFile) {
    Matcher matcher = BLOCK_POOL_TRASH_PATH_PATTERN.matcher(blockFile.getParent());
    String restoreDirectory = matcher.replaceFirst("$1$2" + STORAGE_DIR_CURRENT + "$4");
    LOG.info("Restoring {} to {}", blockFile, restoreDirectory);
    return restoreDirectory;
  }

  /**
   * Delete all files and directories in the trash directories. The deletion
   * runs on a daemon thread; a previously started cleaner thread is
   * interrupted first. Storage directories that have both a trash and a
   * 'previous' directory are reported as an error and skipped.
   */
  public void clearTrash() {
    final List<File> trashRoots = new ArrayList<>();
    for (StorageDirectory sd : getStorageDirs()) {
      File trashRoot = getTrashRootDir(sd);
      if (trashRoot.exists() && sd.getPreviousDir().exists()) {
        LOG.error("Trash and PreviousDir shouldn't both exist for storage "
            + "directory {}", sd);
        assert false;
      } else {
        trashRoots.add(trashRoot);
      }
    }

    stopTrashCleaner();
    trashCleaner = new Daemon(new Runnable() {
      @Override
      public void run() {
        for (File trashRoot : trashRoots) {
          FileUtil.fullyDelete(trashRoot);
          LOG.info("Cleared trash for storage directory {}", trashRoot);
        }
      }

      @Override
      public String toString() {
        return "clearTrash() for " + blockpoolID;
      }
    });
    trashCleaner.start();
  }

  /** Interrupt the trash cleaner thread, if one has been started. */
  public void stopTrashCleaner() {
    if (trashCleaner != null) {
      trashCleaner.interrupt();
    }
  }

  /** trash is enabled if at least one storage directory contains trash root */
  @VisibleForTesting
  public boolean trashEnabled() {
    for (StorageDirectory sd : getStorageDirs()) {
      if (getTrashRootDir(sd).exists()) {
        return true;
      }
    }
    return false;
  }

  /**
   * Create a rolling upgrade marker file for each BP storage root, if it
   * does not exist already. Storage directories without a current directory
   * are skipped.
   *
   * @param dnStorageDirs DataNode level storage directories whose block pool
   *                      roots should carry the marker
   * @throws IOException if a marker file cannot be created
   */
  public void setRollingUpgradeMarkers(List<StorageDirectory> dnStorageDirs)
      throws IOException {
    for (StorageDirectory sd : dnStorageDirs) {
      if (sd.getCurrentDir() == null) {
        // Skip only this directory (same as clearRollingUpgradeMarkers);
        // the remaining directories still need their marker.
        continue;
      }
      File bpRoot = getBpRoot(blockpoolID, sd.getCurrentDir());
      File markerFile = new File(bpRoot, ROLLING_UPGRADE_MARKER_FILE);
      if (!storagesWithRollingUpgradeMarker.contains(bpRoot.toString())) {
        if (!markerFile.exists() && markerFile.createNewFile()) {
          LOG.info("Created {}", markerFile);
        } else {
          LOG.info("{} already exists.", markerFile);
        }
        storagesWithRollingUpgradeMarker.add(bpRoot.toString());
        storagesWithoutRollingUpgradeMarker.remove(bpRoot.toString());
      }
    }
  }

  /**
   * Check whether the rolling upgrade marker file exists for each BP storage
   * root. If it does exist, then the marker file is cleared and more
   * importantly the layout upgrade is finalized. Storage directories without
   * a current directory are skipped.
   *
   * @param dnStorageDirs DataNode level storage directories whose block pool
   *                      roots should be checked
   * @throws IOException if finalizing the upgrade fails
   */
  public void clearRollingUpgradeMarkers(List<StorageDirectory> dnStorageDirs)
      throws IOException {
    for (StorageDirectory sd : dnStorageDirs) {
      if (sd.getCurrentDir() == null) {
        continue;
      }
      File bpRoot = getBpRoot(blockpoolID, sd.getCurrentDir());
      File markerFile = new File(bpRoot, ROLLING_UPGRADE_MARKER_FILE);
      if (!storagesWithoutRollingUpgradeMarker.contains(bpRoot.toString())) {
        if (markerFile.exists()) {
          LOG.info("Deleting {}", markerFile);
          doFinalize(sd.getCurrentDir());
          if (!markerFile.delete()) {
            LOG.warn("Failed to delete {}", markerFile);
          }
        }
        storagesWithoutRollingUpgradeMarker.add(bpRoot.toString());
        storagesWithRollingUpgradeMarker.remove(bpRoot.toString());
      }
    }
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop BPOfferService 源码

hadoop BPServiceActor 源码

hadoop BPServiceActorAction 源码

hadoop BPServiceActorActionException 源码

hadoop BlockChecksumHelper 源码

hadoop BlockPoolManager 源码

hadoop BlockReceiver 源码

hadoop BlockRecoveryWorker 源码

hadoop BlockScanner 源码

hadoop BlockSender 源码

0  赞