hadoop InstrumentedLock 源码

  • 2022-10-20
  • 浏览 (279)

haddop InstrumentedLock 代码

文件路径:/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedLock.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.util;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

import org.apache.hadoop.classification.VisibleForTesting;
import org.slf4j.Logger;

/**
 * This is a debugging class that can be used by callers to track
 * whether a specific lock is being held for too long and periodically
 * log a warning and stack trace, if so.
 *
 * The logged warnings are throttled so that logs are not spammed.
 *
 * A new instance of InstrumentedLock can be created for each object
 * that needs to be instrumented.
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class InstrumentedLock implements Lock {

  private final Lock lock;
  private final Logger logger;
  private final String name;
  private final Timer clock;

  /** Minimum gap between two lock warnings. */
  private final long minLoggingGap;
  /** Threshold for detecting long lock held time. */
  private final long lockWarningThreshold;

  // Tracking counters for lock statistics.
  private volatile long lockAcquireTimestamp;
  private final AtomicLong lastHoldLogTimestamp;
  private final AtomicLong lastWaitLogTimestamp;
  private final SuppressedStats holdStats = new SuppressedStats();
  private final SuppressedStats waitStats = new SuppressedStats();

  /**
   * Create a instrumented lock instance which logs a warning message
   * when lock held time is above given threshold.
   *
   * @param name the identifier of the lock object
   * @param logger this class does not have its own logger, will log to the
   *               given logger instead
   * @param minLoggingGapMs  the minimum time gap between two log messages,
   *                         this is to avoid spamming to many logs
   * @param lockWarningThresholdMs the time threshold to view lock held
   *                               time as being "too long"
   */
  public InstrumentedLock(String name, Logger logger, long minLoggingGapMs,
                          long lockWarningThresholdMs) {
    this(name, logger, new ReentrantLock(),
        minLoggingGapMs, lockWarningThresholdMs);
  }

  public InstrumentedLock(String name, Logger logger, Lock lock,
      long minLoggingGapMs, long lockWarningThresholdMs) {
    this(name, logger, lock,
        minLoggingGapMs, lockWarningThresholdMs, new Timer());
  }

  @VisibleForTesting
  InstrumentedLock(String name, Logger logger, Lock lock,
      long minLoggingGapMs, long lockWarningThresholdMs, Timer clock) {
    this.name = name;
    this.lock = lock;
    this.clock = clock;
    this.logger = logger;
    minLoggingGap = minLoggingGapMs;
    lockWarningThreshold = lockWarningThresholdMs;
    lastHoldLogTimestamp = new AtomicLong(
      clock.monotonicNow() - Math.max(minLoggingGap, lockWarningThreshold));
    lastWaitLogTimestamp = new AtomicLong(lastHoldLogTimestamp.get());
  }

  @Override
  public void lock() {
    long waitStart = clock.monotonicNow();
    lock.lock();
    check(waitStart, clock.monotonicNow(), false);
    startLockTiming();
  }

  @Override
  public void lockInterruptibly() throws InterruptedException {
    long waitStart = clock.monotonicNow();
    lock.lockInterruptibly();
    check(waitStart, clock.monotonicNow(), false);
    startLockTiming();
  }

  @Override
  public boolean tryLock() {
    if (lock.tryLock()) {
      startLockTiming();
      return true;
    }
    return false;
  }

  @Override
  public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
    long waitStart = clock.monotonicNow();
    boolean retval = false;
    if (lock.tryLock(time, unit)) {
      startLockTiming();
      retval = true;
    }
    check(waitStart, clock.monotonicNow(), false);
    return retval;
  }

  @Override
  public void unlock() {
    long localLockReleaseTime = clock.monotonicNow();
    long localLockAcquireTime = lockAcquireTimestamp;
    lock.unlock();
    check(localLockAcquireTime, localLockReleaseTime, true);
  }

  @Override
  public Condition newCondition() {
    return lock.newCondition();
  }

  @VisibleForTesting
  void logWarning(long lockHeldTime, SuppressedSnapshot stats) {
    logger.warn(String.format("Lock held time above threshold(%d ms): " +
        "lock identifier: %s " +
        "lockHeldTimeMs=%d ms. Suppressed %d lock warnings. " +
        "Longest suppressed LockHeldTimeMs=%d. " +
        "The stack trace is: %s" ,
        lockWarningThreshold, name, lockHeldTime, stats.getSuppressedCount(),
        stats.getMaxSuppressedWait(),
        StringUtils.getStackTrace(Thread.currentThread())));
  }

  @VisibleForTesting
  void logWaitWarning(long lockWaitTime, SuppressedSnapshot stats) {
    logger.warn(String.format("Waited above threshold(%d ms) to acquire lock: " +
        "lock identifier: %s " +
        "waitTimeMs=%d ms. Suppressed %d lock wait warnings. " +
        "Longest suppressed WaitTimeMs=%d. " +
        "The stack trace is: %s", lockWarningThreshold, name, lockWaitTime,
        stats.getSuppressedCount(), stats.getMaxSuppressedWait(),
        StringUtils.getStackTrace(Thread.currentThread())));
  }

  /**
   * Starts timing for the instrumented lock.
   */
  protected void startLockTiming() {
    lockAcquireTimestamp = clock.monotonicNow();
  }

  /**
   * Log a warning if the lock was held for too long.
   *
   * Should be invoked by the caller immediately AFTER releasing the lock.
   *
   * @param acquireTime  - timestamp just after acquiring the lock.
   * @param releaseTime - timestamp just before releasing the lock.
   * @param checkLockHeld checkLockHeld.
   */
  protected void check(long acquireTime, long releaseTime,
       boolean checkLockHeld) {
    if (!logger.isWarnEnabled()) {
      return;
    }

    final long lockHeldTime = releaseTime - acquireTime;
    if (lockWarningThreshold - lockHeldTime < 0) {
      AtomicLong lastLogTime;
      SuppressedStats stats;
      if (checkLockHeld) {
        lastLogTime = lastHoldLogTimestamp;
        stats = holdStats;
      } else {
        lastLogTime = lastWaitLogTimestamp;
        stats = waitStats;
      }
      long now;
      long localLastLogTs;
      do {
        now = clock.monotonicNow();
        localLastLogTs = lastLogTime.get();
        long deltaSinceLastLog = now - localLastLogTs;
        // check should print log or not
        if (deltaSinceLastLog - minLoggingGap < 0) {
          stats.incrementSuppressed(lockHeldTime);
          return;
        }
      } while (!lastLogTime.compareAndSet(localLastLogTs, now));
      SuppressedSnapshot statsSnapshot = stats.snapshot();
      if (checkLockHeld) {
        logWarning(lockHeldTime, statsSnapshot);
      } else {
        logWaitWarning(lockHeldTime, statsSnapshot);
      }
    }
  }

  protected Lock getLock() {
    return lock;
  }

  protected Timer getTimer() {
    return clock;
  }

  /**
   * Internal class to track statistics about suppressed log messages in an
   * atomic way.
   */
  private static class SuppressedStats {
    private long suppressedCount = 0;
    private long maxSuppressedWait = 0;

    /**
     * Increments the suppressed counter and increases the max wait time if the
     * passed wait is greater than the current maxSuppressedWait.
     * @param wait The wait time for this suppressed message
     */
    synchronized public void incrementSuppressed(long wait) {
      suppressedCount++;
      if (wait > maxSuppressedWait) {
        maxSuppressedWait = wait;
      }
    }

    /**
     * Captures the current value of the counts into a SuppressedSnapshot object
     * and resets the values to zero.
     *
     * @return SuppressedSnapshot containing the current value of the counters
     */
    synchronized public SuppressedSnapshot snapshot() {
      SuppressedSnapshot snap =
          new SuppressedSnapshot(suppressedCount, maxSuppressedWait);
      suppressedCount = 0;
      maxSuppressedWait = 0;
      return snap;
    }
  }

  /**
   * Immutable class to capture a snapshot of suppressed log message stats.
   */
  protected static class SuppressedSnapshot {
    private long suppressedCount = 0;
    private long maxSuppressedWait = 0;

    public SuppressedSnapshot(long suppressedCount, long maxWait) {
      this.suppressedCount = suppressedCount;
      this.maxSuppressedWait = maxWait;
    }

    public long getMaxSuppressedWait() {
      return maxSuppressedWait;
    }

    public long getSuppressedCount() {
      return suppressedCount;
    }
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop ApplicationClassLoader 源码

hadoop AsyncDiskService 源码

hadoop AutoCloseableLock 源码

hadoop BasicDiskValidator 源码

hadoop BlockingThreadPoolExecutorService 源码

hadoop CacheableIPList 源码

hadoop ChunkedArrayList 源码

hadoop ClassUtil 源码

hadoop Classpath 源码

hadoop CleanerUtil 源码

0  赞