hadoop DirectoryScanner source code

  • 2022-10-20

hadoop DirectoryScanner code

File path: /hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.datanode;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.collect.ArrayListMultimap;
import org.apache.hadoop.thirdparty.com.google.common.collect.ListMultimap;

/**
 * Periodically scans the data directories for block and block metadata files.
 * Reconciles the differences with block information maintained in the dataset.
 */
@InterfaceAudience.Private
public class DirectoryScanner implements Runnable {
  private static final Logger LOG =
      LoggerFactory.getLogger(DirectoryScanner.class);

  private static final int DEFAULT_MAP_SIZE = 32768;
  private final int reconcileBlocksBatchSize;
  private final long reconcileBlocksBatchInterval;
  private final FsDatasetSpi<?> dataset;
  private final ExecutorService reportCompileThreadPool;
  private final ScheduledExecutorService masterThread;
  private final long scanPeriodMsecs;
  private final long throttleLimitMsPerSec;
  private final AtomicBoolean shouldRun = new AtomicBoolean();

  private boolean retainDiffs = false;

  /**
   * Total combined wall clock time (in milliseconds) spent by the report
   * compiler threads executing. Used for testing purposes.
   */
  @VisibleForTesting
  final AtomicLong timeRunningMs = new AtomicLong(0L);

  /**
   * Total combined wall clock time (in milliseconds) spent by the report
   * compiler threads blocked by the throttle. Used for testing purposes.
   */
  @VisibleForTesting
  final AtomicLong timeWaitingMs = new AtomicLong(0L);

  /**
   * The complete list of block differences indexed by block pool ID.
   */
  @VisibleForTesting
  final BlockPoolReport diffs = new BlockPoolReport();

  /**
   * Statistics about the block differences in each blockpool, indexed by block
   * pool ID.
   */
  @VisibleForTesting
  final Map<String, Stats> stats;

  /**
   * Allow retaining diffs for unit test and analysis. Defaults to false (off).
   *
   * @param b whether to retain diffs
   */
  @VisibleForTesting
  public void setRetainDiffs(boolean b) {
    retainDiffs = b;
  }

  /**
   * Stats tracked for reporting and testing, per blockpool.
   */
  @VisibleForTesting
  static class Stats {
    final String bpid;
    long totalBlocks = 0;
    long missingMetaFile = 0;
    long missingBlockFile = 0;
    long missingMemoryBlocks = 0;
    long mismatchBlocks = 0;
    long duplicateBlocks = 0;

    /**
     * Create a new Stats object for the given blockpool ID.
     *
     * @param bpid blockpool ID
     */
    public Stats(String bpid) {
      this.bpid = bpid;
    }

    @Override
    public String toString() {
      return "BlockPool " + bpid + " Total blocks: " + totalBlocks
          + ", missing metadata files: " + missingMetaFile
          + ", missing block files: " + missingBlockFile
          + ", missing blocks in memory: " + missingMemoryBlocks
          + ", mismatched blocks: " + mismatchBlocks
          + ", duplicated blocks: " + duplicateBlocks;
    }
  }

  /**
   * Helper class for compiling block info reports from report compiler threads.
   * Contains a volume, a set of block pool IDs, and a collection of ScanInfo
   * objects. If a block pool exists but has no ScanInfo objects associated with
   * it, there will be no mapping for that particular block pool.
   */
  @VisibleForTesting
  public static class ScanInfoVolumeReport {

    @SuppressWarnings("unused")
    private static final long serialVersionUID = 1L;

    private final FsVolumeSpi volume;

    private final BlockPoolReport blockPoolReport;

    /**
     * Create a new info list.
     *
     * @param volume the volume being scanned
     */
    ScanInfoVolumeReport(final FsVolumeSpi volume) {
      this.volume = volume;
      this.blockPoolReport = new BlockPoolReport();
    }

    /**
     * Create a new info list initialized with the given block pools.
     *
     * @param volume the volume being scanned
     * @param blockPools list of known block pools
     */
    ScanInfoVolumeReport(final FsVolumeSpi volume,
        final Collection<String> blockPools) {
      this.volume = volume;
      this.blockPoolReport = new BlockPoolReport(blockPools);
    }

    public void addAll(final String bpid,
        final Collection<ScanInfo> scanInfos) {
      this.blockPoolReport.addAll(bpid, scanInfos);
    }

    public Set<String> getBlockPoolIds() {
      return this.blockPoolReport.getBlockPoolIds();
    }

    public List<ScanInfo> getScanInfo(final String bpid) {
      return this.blockPoolReport.getScanInfo(bpid);
    }

    public FsVolumeSpi getVolume() {
      return volume;
    }

    @Override
    public String toString() {
      return "ScanInfoVolumeReport [volume=" + volume + ", blockPoolReport="
          + blockPoolReport + "]";
    }
  }

  /**
   * Helper class for compiling block info reports per block pool.
   */
  @VisibleForTesting
  public static class BlockPoolReport {

    @SuppressWarnings("unused")
    private static final long serialVersionUID = 1L;

    private final Set<String> blockPools;

    private final ListMultimap<String, ScanInfo> map;

    /**
     * Create a block pool report.
     */
    BlockPoolReport() {
      this.blockPools = new HashSet<>(2);
      this.map = ArrayListMultimap.create(2, DEFAULT_MAP_SIZE);
    }

    /**
     * Create a new block pool report initialized with the given block pools.
     *
     * @param blockPools initial list of known block pools
     */
    BlockPoolReport(final Collection<String> blockPools) {
      this.blockPools = new HashSet<>(blockPools);
      this.map = ArrayListMultimap.create(blockPools.size(), DEFAULT_MAP_SIZE);
    }

    public void addAll(final String bpid,
        final Collection<ScanInfo> scanInfos) {
      this.blockPools.add(bpid);
      this.map.putAll(bpid, scanInfos);
    }

    public void sortBlocks() {
      for (final String bpid : this.map.keySet()) {
        final List<ScanInfo> list = this.map.get(bpid);
        // Sort array based on blockId
        Collections.sort(list);
      }
    }

    public Set<String> getBlockPoolIds() {
      return Collections.unmodifiableSet(this.blockPools);
    }

    public List<ScanInfo> getScanInfo(final String bpid) {
      return this.map.get(bpid);
    }

    public Collection<Map.Entry<String, ScanInfo>> getEntries() {
      return Collections.unmodifiableCollection(this.map.entries());
    }

    public void clear() {
      this.map.clear();
      this.blockPools.clear();
    }

    @Override
    public String toString() {
      return "BlockPoolReport [blockPools=" + blockPools + ", map=" + map + "]";
    }
  }

  /**
   * Create a new directory scanner, but don't start it running yet.
   *
   * @param dataset the dataset to scan
   * @param conf the Configuration object
   */
  public DirectoryScanner(FsDatasetSpi<?> dataset, Configuration conf) {
    this.dataset = dataset;
    this.stats = new HashMap<>(DEFAULT_MAP_SIZE);
    int interval = (int) conf.getTimeDuration(
        DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY,
        DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT,
        TimeUnit.SECONDS);

    scanPeriodMsecs = TimeUnit.SECONDS.toMillis(interval);

    int throttle = conf.getInt(
        DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
        DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT);

    if (throttle >= TimeUnit.SECONDS.toMillis(1)) {
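      // A limit of 1000 ms/sec or more can never throttle (the thread would
      // be allowed to run for the entire second), so treat it as a
      // misconfiguration and fall back to the default. Values <= 0 disable
      // throttling entirely (see ReportCompiler#throttle()).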
      LOG.warn(
          "{} set to value above 1000 ms/sec. Assuming default value of {}",
          DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
          DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT);
      throttle =
          DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT;
    }

    throttleLimitMsPerSec = throttle;

    int threads =
        conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,
            DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_DEFAULT);

    reportCompileThreadPool =
        Executors.newFixedThreadPool(threads, new Daemon.DaemonFactory());

    masterThread =
        new ScheduledThreadPoolExecutor(1, new Daemon.DaemonFactory());

    int reconcileBatchSize =
        conf.getInt(DFSConfigKeys.
                DFS_DATANODE_RECONCILE_BLOCKS_BATCH_SIZE,
            DFSConfigKeys.
                DFS_DATANODE_RECONCILE_BLOCKS_BATCH_SIZE_DEFAULT);

    if (reconcileBatchSize <= 0) {
      LOG.warn("Invalid value configured for " +
              "dfs.datanode.reconcile.blocks.batch.size, " +
              "should be greater than 0, Using default.");
      reconcileBatchSize =
          DFSConfigKeys.
              DFS_DATANODE_RECONCILE_BLOCKS_BATCH_SIZE_DEFAULT;
    }

    reconcileBlocksBatchSize = reconcileBatchSize;

    long reconcileBatchInterval =
        conf.getTimeDuration(DFSConfigKeys.
                DFS_DATANODE_RECONCILE_BLOCKS_BATCH_INTERVAL,
            DFSConfigKeys.
                DFS_DATANODE_RECONCILE_BLOCKS_BATCH_INTERVAL_DEFAULT,
            TimeUnit.MILLISECONDS);

    if (reconcileBatchInterval <= 0) {
      LOG.warn("Invalid value configured for " +
              "dfs.datanode.reconcile.blocks.batch.interval, " +
              "should be greater than 0, Using default.");
      reconcileBatchInterval =
          DFSConfigKeys.
              DFS_DATANODE_RECONCILE_BLOCKS_BATCH_INTERVAL_DEFAULT;
    }

    reconcileBlocksBatchInterval = reconcileBatchInterval;
  }

  /**
   * Start the scanner. The scanner will run every
   * {@link DFSConfigKeys#DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY} seconds.
   */
  @VisibleForTesting
  public void start() {
    shouldRun.set(true);
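    // Delay the first scan by a random offset within one scan period so that
    // DataNodes restarted together do not all begin scanning at once.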
    long firstScanTime = ThreadLocalRandom.current().nextLong(scanPeriodMsecs);

    LOG.info(
        "Periodic Directory Tree Verification scan starting in {}ms with interval of {}ms and throttle limit of {}ms/s",
        firstScanTime, scanPeriodMsecs, throttleLimitMsPerSec);

    masterThread.scheduleAtFixedRate(this, firstScanTime, scanPeriodMsecs,
        TimeUnit.MILLISECONDS);
  }

  /**
   * Return whether the scanner has been started.
   *
   * @return whether the scanner has been started
   */
  @VisibleForTesting
  boolean getRunStatus() {
    return shouldRun.get();
  }

  /**
   * Clear the current cache of diffs and statistics.
   */
  private void clear() {
    synchronized (diffs) {
      diffs.clear();
    }
    stats.clear();
  }

  /**
   * Main program loop for DirectoryScanner. Runs {@link #reconcile()} and
   * handles any exceptions.
   */
  @Override
  public void run() {
    if (!shouldRun.get()) {
      // shutdown has been activated
      LOG.warn(
          "This cycle terminating immediately because 'shouldRun' has been deactivated");
      return;
    }
    try {
      reconcile();
    } catch (Exception e) {
      // Log and continue - allows Executor to run again next cycle
      LOG.error(
          "Exception during DirectoryScanner execution - will continue next cycle",
          e);
    } catch (Error er) {
      // Non-recoverable error - re-throw after logging the problem
      LOG.error(
          "System Error during DirectoryScanner execution - permanently terminating periodic scanner",
          er);
      throw er;
    }
  }

  /**
   * Stops the directory scanner. This method will wait for 1 minute for the
   * main thread to exit and an additional 1 minute for the report compilation
   * threads to exit. If a thread does not exit in that time period, it is left
   * running, and an error is logged.
   */
  void shutdown() {
    LOG.info("Shutdown has been called");
    if (!shouldRun.getAndSet(false)) {
      LOG.warn("Shutdown has been called, but periodic scanner not started");
    }
    if (masterThread != null) {
      masterThread.shutdown();
    }
    if (reportCompileThreadPool != null) {
      reportCompileThreadPool.shutdownNow();
    }
    if (masterThread != null) {
      try {
        masterThread.awaitTermination(1, TimeUnit.MINUTES);
      } catch (InterruptedException e) {
        LOG.error(
            "interrupted while waiting for masterThread to " + "terminate", e);
      }
    }
    if (reportCompileThreadPool != null) {
      try {
        reportCompileThreadPool.awaitTermination(1, TimeUnit.MINUTES);
      } catch (InterruptedException e) {
        LOG.error("interrupted while waiting for reportCompileThreadPool to "
            + "terminate", e);
      }
    }
    if (!retainDiffs) {
      clear();
    }
  }

  /**
   * Reconcile differences between disk and in-memory blocks
   */
  @VisibleForTesting
  public void reconcile() throws IOException {
    LOG.debug("reconcile start DirectoryScanning");
    scan();

    // HDFS-14476: run checkAndUpdate in batches to avoid holding the lock
    // for too long.
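    // After every reconcileBlocksBatchSize calls to checkAndUpdate, sleep for
    // reconcileBlocksBatchInterval ms so that other dataset operations can
    // make progress between batches.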
    int loopCount = 0;
    synchronized (diffs) {
      for (final Map.Entry<String, ScanInfo> entry : diffs.getEntries()) {
        dataset.checkAndUpdate(entry.getKey(), entry.getValue());

        if (loopCount % reconcileBlocksBatchSize == 0) {
          try {
            Thread.sleep(reconcileBlocksBatchInterval);
          } catch (InterruptedException e) {
            // do nothing
          }
        }
        loopCount++;
      }
    }

    if (!retainDiffs) {
      clear();
    }
  }

  /**
   * Scan for the differences between disk and in-memory blocks. Scans only
   * the "finalized blocks" lists of both disk and memory.
   */
  private void scan() {
    BlockPoolReport blockPoolReport = new BlockPoolReport();

    clear();

    Collection<ScanInfoVolumeReport> volumeReports = getVolumeReports();
    for (ScanInfoVolumeReport volumeReport : volumeReports) {
      for (String blockPoolId : volumeReport.getBlockPoolIds()) {
        List<ScanInfo> scanInfos = volumeReport.getScanInfo(blockPoolId);
        blockPoolReport.addAll(blockPoolId, scanInfos);
      }
    }

    // Pre-sort the reports outside of the lock
    blockPoolReport.sortBlocks();

    for (final String bpid : blockPoolReport.getBlockPoolIds()) {
      List<ScanInfo> blockpoolReport = blockPoolReport.getScanInfo(bpid);

      Stats statsRecord = new Stats(bpid);
      stats.put(bpid, statsRecord);
      Collection<ScanInfo> diffRecord = new ArrayList<>();

      statsRecord.totalBlocks = blockpoolReport.size();
      final List<ReplicaInfo> bl = dataset.getFinalizedBlocks(bpid);
      Collections.sort(bl); // Sort based on blockId

      int d = 0; // index for blockpoolReport
      int m = 0; // index for memReport
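      // Merge the two lists, both sorted by blockId, with two cursors:
      // entries present only on disk are recorded as missing from memory,
      // entries present only in memory are recorded as missing from disk, and
      // entries present in both are checked for file, generation stamp, and
      // length mismatches.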
      while (m < bl.size() && d < blockpoolReport.size()) {
        ReplicaInfo memBlock = bl.get(m);
        ScanInfo info = blockpoolReport.get(d);
        if (info.getBlockId() < memBlock.getBlockId()) {
          if (!dataset.isDeletingBlock(bpid, info.getBlockId())) {
            // Block is missing in memory
            statsRecord.missingMemoryBlocks++;
            addDifference(diffRecord, statsRecord, info);
          }
          d++;
          continue;
        }
        if (info.getBlockId() > memBlock.getBlockId()) {
          // Block is missing on the disk
          addDifference(diffRecord, statsRecord, memBlock.getBlockId(),
              info.getVolume());
          m++;
          continue;
        }

        // Block and meta files must both be regular files
        boolean isRegular = FileUtil.isRegularFile(info.getBlockFile(), false) &&
                FileUtil.isRegularFile(info.getMetaFile(), false);
        if (!isRegular) {
          statsRecord.mismatchBlocks++;
          addDifference(diffRecord, statsRecord, info);
        } else {
          // Block file and/or metadata file exists on the disk
          // Block exists in memory
          if (info.getBlockFile() == null) {
            // Block metadata file exists and block file is missing
            addDifference(diffRecord, statsRecord, info);
          } else if (info.getGenStamp() != memBlock.getGenerationStamp()
                  || info.getBlockLength() != memBlock.getNumBytes()) {
            // Block metadata file is missing or has wrong generation stamp,
            // or block file length is different than expected
            statsRecord.mismatchBlocks++;
            addDifference(diffRecord, statsRecord, info);
          } else if (memBlock.compareWith(info) != 0) {
            // volumeMap record and on-disk files do not match.
            statsRecord.duplicateBlocks++;
            addDifference(diffRecord, statsRecord, info);
          }
        }
        d++;

        if (d < blockpoolReport.size()) {
          // There may be multiple on-disk records for the same block, do not
          // increment the memory record pointer if so.
          ScanInfo nextInfo = blockpoolReport.get(d);
          if (nextInfo.getBlockId() != info.getBlockId()) {
            ++m;
          }
        } else {
          ++m;
        }
      }
      while (m < bl.size()) {
        ReplicaInfo current = bl.get(m++);
        addDifference(diffRecord, statsRecord, current.getBlockId(),
            current.getVolume());
      }
      while (d < blockpoolReport.size()) {
        if (!dataset.isDeletingBlock(bpid,
            blockpoolReport.get(d).getBlockId())) {
          statsRecord.missingMemoryBlocks++;
          addDifference(diffRecord, statsRecord, blockpoolReport.get(d));
        }
        d++;
      }
      synchronized (diffs) {
        diffs.addAll(bpid, diffRecord);
      }
      LOG.info("Scan Results: {}", statsRecord);
    }
  }

  /**
   * Add the ScanInfo object to the list of differences and adjust the stats
   * accordingly. This method is called when a block is found on the disk, but
   * the in-memory block is missing or does not match the block on the disk.
   *
   * @param diffRecord the collection to which to add the info
   * @param statsRecord the stats to update
   * @param info the differing info
   */
  private void addDifference(Collection<ScanInfo> diffRecord, Stats statsRecord,
      ScanInfo info) {
    statsRecord.missingMetaFile += info.getMetaFile() == null ? 1 : 0;
    statsRecord.missingBlockFile += info.getBlockFile() == null ? 1 : 0;
    diffRecord.add(info);
  }

  /**
   * Add a new ScanInfo object to the collection of differences and adjust the
   * stats accordingly. This method is called when a block is not found on the
   * disk.
   *
   * @param diffRecord the collection to which to add the info
   * @param statsRecord the stats to update
   * @param blockId the id of the missing block
   * @param vol the volume that contains the missing block
   */
  private void addDifference(Collection<ScanInfo> diffRecord, Stats statsRecord,
      long blockId, FsVolumeSpi vol) {
    statsRecord.missingBlockFile++;
    statsRecord.missingMetaFile++;
    diffRecord.add(new ScanInfo(blockId, null, null, null, vol));
  }

  /**
   * Get the lists of blocks on the disks in the data set.
   */
  @VisibleForTesting
  public Collection<ScanInfoVolumeReport> getVolumeReports() {
    List<ScanInfoVolumeReport> volReports = new ArrayList<>();
    List<Future<ScanInfoVolumeReport>> compilersInProgress = new ArrayList<>();

    // First get list of data directories
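    // Each volume is scanned by its own ReportCompiler task, so volumes are
    // compiled in parallel on the reportCompileThreadPool.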
    try (FsDatasetSpi.FsVolumeReferences volumes =
        dataset.getFsVolumeReferences()) {

      for (final FsVolumeSpi volume : volumes) {
        // Disable scanning PROVIDED volumes to keep overhead low
        if (volume.getStorageType() != StorageType.PROVIDED) {
          ReportCompiler reportCompiler = new ReportCompiler(volume);
          Future<ScanInfoVolumeReport> result =
              reportCompileThreadPool.submit(reportCompiler);
          compilersInProgress.add(result);
        }
      }

      for (Future<ScanInfoVolumeReport> future : compilersInProgress) {
        try {
          final ScanInfoVolumeReport result = future.get();
          if (!CollectionUtils.addIgnoreNull(volReports, result)) {
            // This compiler thread was interrupted; give up on this run
            volReports.clear();
            break;
          }
        } catch (Exception ex) {
          LOG.warn("Error compiling report. Continuing.", ex);
        }
      }
    } catch (IOException e) {
      LOG.error("Unexpected IOException by closing FsVolumeReference", e);
    }

    return volReports;
  }

  /**
   * The ReportCompiler class encapsulates the process of searching a datanode's
   * disks for block information. It operates by performing a DFS of the volume
   * to discover block information.
   *
   * When the ReportCompiler discovers block information, it creates a new
   * ScanInfo object for it and adds that object to its report list. The report
   * list is returned by the {@link #call()} method.
   */
  public class ReportCompiler implements Callable<ScanInfoVolumeReport> {
    private final FsVolumeSpi volume;
    // Variable for tracking time spent running for throttling purposes
    private final StopWatch throttleTimer = new StopWatch();
    // Variable for tracking time spent running and waiting for testing
    // purposes
    private final StopWatch perfTimer = new StopWatch();

    /**
     * Create a report compiler for the given volume.
     *
     * @param volume the target volume
     */
    public ReportCompiler(FsVolumeSpi volume) {
      this.volume = volume;
    }

    /**
     * Run this report compiler thread.
     *
     * @return the block info report list
     * @throws IOException if the block pool is not found
     */
    @Override
    public ScanInfoVolumeReport call() throws IOException {
      String[] bpList = volume.getBlockPoolList();
      ScanInfoVolumeReport result =
          new ScanInfoVolumeReport(volume, Arrays.asList(bpList));
      perfTimer.start();
      throttleTimer.start();
      for (String bpid : bpList) {
        List<ScanInfo> report = new ArrayList<>(DEFAULT_MAP_SIZE);

        perfTimer.reset().start();
        throttleTimer.reset().start();

        try {
          // ScanInfos are added directly to 'report' list
          volume.compileReport(bpid, report, this);
          result.addAll(bpid, report);
        } catch (InterruptedException ex) {
          // Exit quickly and flag the scanner to do the same
          result = null;
          break;
        }
      }
      LOG.trace("Scanner volume report: {}", result);
      return result;
    }

    /**
     * Called by the thread before each potential disk scan so that a pause can
     * be optionally inserted to limit the number of scans per second. The limit
     * is controlled by
     * {@link DFSConfigKeys#DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY}.
     */
    public void throttle() throws InterruptedException {
      accumulateTimeRunning();

      if (throttleLimitMsPerSec > 0L) {
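        // Once the thread has accumulated at least throttleLimitMsPerSec ms
        // of running time, sleep for the remainder of the second plus any
        // overage. Example: with a 500 ms/sec limit, a thread that has run
        // for 600 ms sleeps (1000 - 500) + (600 - 500) = 600 ms, keeping the
        // duty cycle at roughly 500 ms of scanning per wall-clock second.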
        final long runningTime = throttleTimer.now(TimeUnit.MILLISECONDS);
        if (runningTime >= throttleLimitMsPerSec) {
          final long sleepTime;
          if (runningTime >= 1000L) {
            LOG.warn("Unable to throttle within the second. Blocking for 1s.");
            sleepTime = 1000L;
          } else {
            // Sleep for the expected time plus any time processing ran over
            final long overTime = runningTime - throttleLimitMsPerSec;
            sleepTime = (1000L - throttleLimitMsPerSec) + overTime;
          }
          Thread.sleep(sleepTime);
          throttleTimer.reset().start();
        }
        accumulateTimeWaiting();
      }
    }

    /**
     * Helper method to measure time running.
     */
    private void accumulateTimeRunning() {
      timeRunningMs.getAndAdd(perfTimer.now(TimeUnit.MILLISECONDS));
      perfTimer.reset().start();
    }

    /**
     * Helper method to measure time waiting.
     */
    private void accumulateTimeWaiting() {
      timeWaitingMs.getAndAdd(perfTimer.now(TimeUnit.MILLISECONDS));
      perfTimer.reset().start();
    }
  }
}
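
For context, here is a minimal lifecycle sketch (hypothetical, not part of the Hadoop source). It assumes an FsDatasetSpi instance is already available, and the helper class is placed in the org.apache.hadoop.hdfs.server.datanode package so that the package-private shutdown() method is reachable; in a real DataNode this wiring happens internally when the DataNode initializes its dataset.

package org.apache.hadoop.hdfs.server.datanode;

import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;

/** Hypothetical helper illustrating the DirectoryScanner lifecycle. */
final class DirectoryScannerLifecycleSketch {

  /** Build, configure, and start a scanner for the given dataset. */
  static DirectoryScanner startScanner(FsDatasetSpi<?> dataset) {
    Configuration conf = new Configuration();
    // Scan every 6 hours; the key is read via getTimeDuration, so any
    // time unit may be used when setting it.
    conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY,
        6, TimeUnit.HOURS);
    // Limit each report compiler thread to ~500 ms of scanning per second.
    conf.setInt(
        DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
        500);

    DirectoryScanner scanner = new DirectoryScanner(dataset, conf);
    scanner.start(); // first scan fires at a random offset within one period
    return scanner;
  }

  /** Stop the scanner; waits up to a minute for each thread pool to exit. */
  static void stopScanner(DirectoryScanner scanner) {
    scanner.shutdown();
  }
}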

Related information

hadoop source code directory

Related articles

hadoop BPOfferService source code

hadoop BPServiceActor source code

hadoop BPServiceActorAction source code

hadoop BPServiceActorActionException source code

hadoop BlockChecksumHelper source code

hadoop BlockPoolManager source code

hadoop BlockPoolSliceStorage source code

hadoop BlockReceiver source code

hadoop BlockRecoveryWorker source code

hadoop BlockScanner source code
