hadoop ShortCircuitCache 源码

  • 2022-10-20
  • 浏览 (183)

haddop ShortCircuitCache 代码


 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *     http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * See the License for the specific language governing permissions and
 * limitations under the License.
package org.apache.hadoop.hdfs.shortcircuit;

import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.SocketException;
import java.nio.MappedByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.collections.map.LinkedMap;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
import org.apache.hadoop.hdfs.net.DomainPeer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
import org.apache.hadoop.hdfs.util.IOUtilsClient;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.DomainSocketWatcher;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Waitable;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

 * The ShortCircuitCache tracks things which the client needs to access
 * HDFS block files via short-circuit.
 * These things include: memory-mapped regions, file descriptors, and shared
 * memory areas for communicating with the DataNode.
public class ShortCircuitCache implements Closeable {
  public static final Logger LOG = LoggerFactory.getLogger(

   * Expiry thread which makes sure that the file descriptors get closed
   * after a while.
  private class CacheCleaner implements Runnable, Closeable {
    private ScheduledFuture<?> future;

     * Run the CacheCleaner thread.
     * Whenever a thread requests a ShortCircuitReplica object, we will make
     * sure it gets one.  That ShortCircuitReplica object can then be re-used
     * when another thread requests a ShortCircuitReplica object for the same
     * block.  So in that sense, there is no maximum size to the cache.
     * However, when a ShortCircuitReplica object is unreferenced by the
     * thread(s) that are using it, it becomes evictable.  There are two
     * separate eviction lists-- one for mmaped objects, and another for
     * non-mmaped objects.  We do this in order to avoid having the regular
     * files kick the mmaped files out of the cache too quickly.  Reusing
     * an already-existing mmap gives a huge performance boost, since the
     * page table entries don't have to be re-populated.  Both the mmap
     * and non-mmap evictable lists have maximum sizes and maximum lifespans.
    public void run() {
      try {
        if (ShortCircuitCache.this.closed) return;
        long curMs = Time.monotonicNow();

        LOG.debug("{}: cache cleaner running at {}", this, curMs);

        int numDemoted = demoteOldEvictableMmaped(curMs);
        int numPurged = 0;
        Long evictionTimeNs;
        while (!evictable.isEmpty()) {
          Object eldestKey = evictable.firstKey();
          evictionTimeNs = (Long)eldestKey;
          long evictionTimeMs =
              TimeUnit.MILLISECONDS.convert(evictionTimeNs, TimeUnit.NANOSECONDS);
          if (evictionTimeMs + maxNonMmappedEvictableLifespanMs >= curMs) break;
          ShortCircuitReplica replica = (ShortCircuitReplica)evictable.get(
          if (LOG.isTraceEnabled()) {
            LOG.trace("CacheCleaner: purging " + replica + ": " +

        LOG.debug("{}: finishing cache cleaner run started at {}. Demoted {} "
                + "mmapped replicas; purged {} replicas.",
            this, curMs, numDemoted, numPurged);
      } finally {

    public void close() throws IOException {
      if (future != null) {

    public void setFuture(ScheduledFuture<?> future) {
      this.future = future;

     * Get the rate at which this cleaner thread should be scheduled.
     * We do this by taking the minimum expiration time and dividing by 4.
     * @return the rate in milliseconds at which this thread should be
     *         scheduled.
    public long getRateInMs() {
      long minLifespanMs =
      long sampleTimeMs = minLifespanMs / 4;
      return (sampleTimeMs < 1) ? 1 : sampleTimeMs;

   * A task which asks the DataNode to release a short-circuit shared memory
   * slot.  If successful, this will tell the DataNode to stop monitoring
   * changes to the mlock status of the replica associated with the slot.
   * It will also allow us (the client) to re-use this slot for another
   * replica.  If we can't communicate with the DataNode for some reason,
   * we tear down the shared memory segment to avoid being in an inconsistent
   * state.
  private class SlotReleaser implements Runnable {
     * The slot that we need to release.
    private final Slot slot;

    SlotReleaser(Slot slot) {
      this.slot = slot;

    public void run() {
      if (slot == null) {
      LOG.trace("{}: about to release {}", ShortCircuitCache.this, slot);
      final DfsClientShm shm = (DfsClientShm)slot.getShm();
      final DomainSocket shmSock = shm.getPeer().getDomainSocket();
      final String path = shmSock.getPath();
      DomainSocket domainSocket = pathToDomainSocket.get(path);
      DataOutputStream out = null;
      boolean success = false;
      int retries = 2;
      try {
        while (retries > 0) {
          try {
            if (domainSocket == null || !domainSocket.isOpen()) {
              domainSocket = DomainSocket.connect(path);
              // we are running in single thread mode, no protection needed for
              // pathToDomainSocket
              pathToDomainSocket.put(path, domainSocket);

            out = new DataOutputStream(
                new BufferedOutputStream(domainSocket.getOutputStream()));
            new Sender(out).releaseShortCircuitFds(slot.getSlotId());
            DataInputStream in =
                new DataInputStream(domainSocket.getInputStream());
            ReleaseShortCircuitAccessResponseProto resp =
            if (resp.getStatus() != Status.SUCCESS) {
              String error = resp.hasError() ? resp.getError() : "(unknown)";
              throw new IOException(resp.getStatus().toString() + ": " + error);

            LOG.trace("{}: released {}", this, slot);
            success = true;

          } catch (SocketException se) {
            // the domain socket on datanode may be timed out, we retry once
            if (domainSocket != null) {
              domainSocket = null;
            if (retries == 0) {
              throw new SocketException("Create domain socket failed");
        } // end of while block
      } catch (IOException e) {
        LOG.warn(ShortCircuitCache.this + ": failed to release "
            + "short-circuit shared memory slot " + slot + " by sending "
            + "ReleaseShortCircuitAccessRequestProto to " + path
            + ".  Closing shared memory segment. "
            + "DataNode may have been stopped or restarted", e);
      } finally {
        if (success) {
        } else {
          IOUtilsClient.cleanupWithLogger(LOG, domainSocket, out);
    } // end of run()

  public interface ShortCircuitReplicaCreator {
     * Attempt to create a ShortCircuitReplica object.
     * This callback will be made without holding any locks.
     * @return a non-null ShortCircuitReplicaInfo object.
    ShortCircuitReplicaInfo createShortCircuitReplicaInfo();

   * Lock protecting the cache.
  private final ReentrantLock lock = new ReentrantLock();

   * The executor service that runs the cacheCleaner.
  private final ScheduledThreadPoolExecutor cleanerExecutor
      = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().

   * The executor service that runs the cacheCleaner.
  private final ScheduledThreadPoolExecutor releaserExecutor
      = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().

   * A map containing all ShortCircuitReplicaInfo objects, organized by Key.
   * ShortCircuitReplicaInfo objects may contain a replica, or an InvalidToken
   * exception.
  private final HashMap<ExtendedBlockId, Waitable<ShortCircuitReplicaInfo>>
      replicaInfoMap = new HashMap<>();

   * The CacheCleaner.  We don't create this and schedule it until it becomes
   * necessary.
  private CacheCleaner cacheCleaner;

   * LinkedMap of evictable elements.
   * Maps (unique) insertion time in nanoseconds to the element.
  private final LinkedMap evictable = new LinkedMap();

   * Maximum total size of the cache, including both mmapped and
   * no$-mmapped elements.
  private int maxTotalSize;

   * Non-mmaped elements older than this will be closed.
  private long maxNonMmappedEvictableLifespanMs;

   * LinkedMap of mmaped evictable elements.
   * Maps (unique) insertion time in nanoseconds to the element.
  private final LinkedMap evictableMmapped = new LinkedMap();

   * Maximum number of mmaped evictable elements.
  private int maxEvictableMmapedSize;

   * Mmaped elements older than this will be closed.
  private final long maxEvictableMmapedLifespanMs;

   * The minimum number of milliseconds we'll wait after an unsuccessful
   * mmap attempt before trying again.
  private final long mmapRetryTimeoutMs;

   * How long we will keep replicas in the cache before declaring them
   * to be stale.
  private final long staleThresholdMs;

   * True if the ShortCircuitCache is closed.
  private boolean closed = false;

   * Number of existing mmaps associated with this cache.
  private int outstandingMmapCount = 0;

   * Manages short-circuit shared memory segments for the client.
  private final DfsClientShmManager shmManager;

   * A map contains all DomainSockets used in SlotReleaser. Keys are the domain socket
   * paths of short-circuit shared memory segments.
  private Map<String, DomainSocket> pathToDomainSocket = new HashMap<>();

  public static ShortCircuitCache fromConf(ShortCircuitConf conf) {
    return new ShortCircuitCache(

  public ShortCircuitCache(int maxTotalSize, long maxNonMmappedEvictableLifespanMs,
      int maxEvictableMmapedSize, long maxEvictableMmapedLifespanMs,
      long mmapRetryTimeoutMs, long staleThresholdMs, int shmInterruptCheckMs) {
    Preconditions.checkArgument(maxTotalSize >= 0);
    this.maxTotalSize = maxTotalSize;
    Preconditions.checkArgument(maxNonMmappedEvictableLifespanMs >= 0);
    this.maxNonMmappedEvictableLifespanMs = maxNonMmappedEvictableLifespanMs;
    Preconditions.checkArgument(maxEvictableMmapedSize >= 0);
    this.maxEvictableMmapedSize = maxEvictableMmapedSize;
    Preconditions.checkArgument(maxEvictableMmapedLifespanMs >= 0);
    this.maxEvictableMmapedLifespanMs = maxEvictableMmapedLifespanMs;
    this.mmapRetryTimeoutMs = mmapRetryTimeoutMs;
    this.staleThresholdMs = staleThresholdMs;
    DfsClientShmManager shmManager = null;
    if ((shmInterruptCheckMs > 0) &&
        (DomainSocketWatcher.getLoadingFailureReason() == null)) {
      try {
        shmManager = new DfsClientShmManager(shmInterruptCheckMs);
      } catch (IOException e) {
        LOG.error("failed to create ShortCircuitShmManager", e);
    this.shmManager = shmManager;

  public long getStaleThresholdMs() {
    return staleThresholdMs;

  public void setMaxTotalSize(int maxTotalSize) {
    this.maxTotalSize = maxTotalSize;

   * Increment the reference count of a replica, and remove it from any free
   * list it may be in.
   * You must hold the cache lock while calling this function.
   * @param replica      The replica we're removing.
  private void ref(ShortCircuitReplica replica) {
    try {
      Preconditions.checkArgument(replica.refCount > 0,
          "can't ref %s because its refCount reached %d", replica,
      Long evictableTimeNs = replica.getEvictableTimeNs();
      if (evictableTimeNs != null) {
        String removedFrom = removeEvictable(replica);
        if (LOG.isTraceEnabled()) {
          LOG.trace(this + ": " + removedFrom +
              " no longer contains " + replica + ".  refCount " +
              (replica.refCount - 1) + " -> " + replica.refCount +

      } else if (LOG.isTraceEnabled()) {
        LOG.trace(this + ": replica  refCount " +
            (replica.refCount - 1) + " -> " + replica.refCount +
    } finally {

   * Unreference a replica.
   * You must hold the cache lock while calling this function.
   * @param replica   The replica being unreferenced.
  void unref(ShortCircuitReplica replica) {
    try {
      // If the replica is stale or unusable, but we haven't purged it yet,
      // let's do that.  It would be a shame to evict a non-stale replica so
      // that we could put a stale or unusable one into the cache.
      if (!replica.purged) {
        String purgeReason = null;
        if (!replica.getDataStream().getChannel().isOpen()) {
          purgeReason = "purging replica because its data channel is closed.";
        } else if (!replica.getMetaStream().getChannel().isOpen()) {
          purgeReason = "purging replica because its meta channel is closed.";
        } else if (replica.isStale()) {
          purgeReason = "purging replica because it is stale.";
        if (purgeReason != null) {
          LOG.debug("{}: {}", this, purgeReason);
      String addedString = "";
      boolean shouldTrimEvictionMaps = false;
      int newRefCount = --replica.refCount;
      if (newRefCount == 0) {
        // Close replica, since there are no remaining references to it.
            "Replica %s reached a refCount of 0 without being purged", replica);
      } else if (newRefCount == 1) {
        Preconditions.checkState(null == replica.getEvictableTimeNs(),
            "Replica %s had a refCount higher than 1, " +
                "but was still evictable (evictableTimeNs = %d)",
            replica, replica.getEvictableTimeNs());
        if (!replica.purged) {
          // Add the replica to the end of an eviction list.
          // Eviction lists are sorted by time.
          if (replica.hasMmap()) {
            insertEvictable(System.nanoTime(), replica, evictableMmapped);
            addedString = "added to evictableMmapped, ";
          } else {
            insertEvictable(System.nanoTime(), replica, evictable);
            addedString = "added to evictable, ";
          shouldTrimEvictionMaps = true;
      } else {
        Preconditions.checkArgument(replica.refCount >= 0,
            "replica's refCount went negative (refCount = %d" +
                " for %s)", replica.refCount, replica);
      if (LOG.isTraceEnabled()) {
        LOG.trace(this + ": unref replica " + replica +
            ": " + addedString + " refCount " +
            (newRefCount + 1) + " -> " + newRefCount +
      if (shouldTrimEvictionMaps) {
    } finally {

   * Demote old evictable mmaps into the regular eviction map.
   * You must hold the cache lock while calling this function.
   * @param now   Current time in monotonic milliseconds.
   * @return      Number of replicas demoted.
  private int demoteOldEvictableMmaped(long now) {
    int numDemoted = 0;
    boolean needMoreSpace = false;
    Long evictionTimeNs;

    while (!evictableMmapped.isEmpty()) {
      Object eldestKey = evictableMmapped.firstKey();
      evictionTimeNs = (Long)eldestKey;
      long evictionTimeMs =
          TimeUnit.MILLISECONDS.convert(evictionTimeNs, TimeUnit.NANOSECONDS);
      if (evictionTimeMs + maxEvictableMmapedLifespanMs >= now) {
        if (evictableMmapped.size() < maxEvictableMmapedSize) {
        needMoreSpace = true;
      ShortCircuitReplica replica = (ShortCircuitReplica)evictableMmapped.get(
      if (LOG.isTraceEnabled()) {
        String rationale = needMoreSpace ? "because we need more space" :
            "because it's too old";
        LOG.trace("demoteOldEvictable: demoting " + replica + ": " +
            rationale + ": " +
      removeEvictable(replica, evictableMmapped);
      insertEvictable(evictionTimeNs, replica, evictable);
    return numDemoted;

   * Trim the eviction lists.
  private void trimEvictionMaps() {
    long now = Time.monotonicNow();

    while (evictable.size() + evictableMmapped.size() > maxTotalSize) {
      ShortCircuitReplica replica;
      if (evictable.isEmpty()) {
        replica = (ShortCircuitReplica) evictableMmapped
      } else {
        replica = (ShortCircuitReplica) evictable.get(evictable.firstKey());

      if (LOG.isTraceEnabled()) {
        LOG.trace(this + ": trimEvictionMaps is purging " + replica +

   * Munmap a replica, updating outstandingMmapCount.
   * @param replica  The replica to munmap.
  private void munmap(ShortCircuitReplica replica) {

   * Remove a replica from an evictable map.
   * @param replica   The replica to remove.
   * @return          The map it was removed from.
  private String removeEvictable(ShortCircuitReplica replica) {
    if (replica.hasMmap()) {
      removeEvictable(replica, evictableMmapped);
      return "evictableMmapped";
    } else {
      removeEvictable(replica, evictable);
      return "evictable";

   * Remove a replica from an evictable map.
   * @param replica   The replica to remove.
   * @param map       The map to remove it from.
  private void removeEvictable(ShortCircuitReplica replica,
      LinkedMap map) {
    Long evictableTimeNs = replica.getEvictableTimeNs();
    ShortCircuitReplica removed = (ShortCircuitReplica)map.remove(
    Preconditions.checkState(removed == replica,
        "failed to make %s unevictable", replica);

   * Insert a replica into an evictable map.
   * If an element already exists with this eviction time, we add a nanosecond
   * to it until we find an unused key.
   * @param evictionTimeNs   The eviction time in absolute nanoseconds.
   * @param replica          The replica to insert.
   * @param map              The map to insert it into.
  private void insertEvictable(Long evictionTimeNs,
      ShortCircuitReplica replica, LinkedMap map) {
    while (map.containsKey(evictionTimeNs)) {
    Preconditions.checkState(null == replica.getEvictableTimeNs());
    map.put(evictionTimeNs, replica);

   * Purge a replica from the cache.
   * This doesn't necessarily close the replica, since there may be
   * outstanding references to it.  However, it does mean the cache won't
   * hand it out to anyone after this.
   * You must hold the cache lock while calling this function.
   * @param replica   The replica being removed.
  private void purge(ShortCircuitReplica replica) {
    boolean removedFromInfoMap = false;
    String evictionMapName = null;
    replica.purged = true;
    Waitable<ShortCircuitReplicaInfo> val = replicaInfoMap.get(replica.key);
    if (val != null) {
      ShortCircuitReplicaInfo info = val.getVal();
      if ((info != null) && (info.getReplica() == replica)) {
        removedFromInfoMap = true;
    Long evictableTimeNs = replica.getEvictableTimeNs();
    if (evictableTimeNs != null) {
      evictionMapName = removeEvictable(replica);
    if (LOG.isTraceEnabled()) {
      StringBuilder builder = new StringBuilder();
      builder.append(this).append(": ").append(": purged ").
          append(replica).append(" from the cache.");
      if (removedFromInfoMap) {
        builder.append("  Removed from the replicaInfoMap.");
      if (evictionMapName != null) {
        builder.append("  Removed from ").append(evictionMapName);

  static final int FETCH_OR_CREATE_RETRY_TIMES = 3;
   * Fetch or create a replica.
   * You must hold the cache lock while calling this function.
   * @param key          Key to use for lookup.
   * @param creator      Replica creator callback.  Will be called without
   *                     the cache lock being held.
   * @return             Null if no replica could be found or created.
   *                     The replica, otherwise.
  public ShortCircuitReplicaInfo fetchOrCreate(ExtendedBlockId key,
      ShortCircuitReplicaCreator creator) {
    Waitable<ShortCircuitReplicaInfo> newWaitable;
    try {
      ShortCircuitReplicaInfo info = null;
      for (int i = 0; i < FETCH_OR_CREATE_RETRY_TIMES; i++){
        if (closed) {
          LOG.trace("{}: can't fethchOrCreate {} because the cache is closed.",
              this, key);
          return null;
        Waitable<ShortCircuitReplicaInfo> waitable = replicaInfoMap.get(key);
        if (waitable != null) {
          try {
            info = fetch(key, waitable);
          } catch (RetriableException e) {
            LOG.debug("{}: retrying {}", this, e.getMessage());
      if (info != null) return info;
      // We need to load the replica ourselves.
      newWaitable = new Waitable<>(lock.newCondition());
      replicaInfoMap.put(key, newWaitable);
    } finally {
    return create(key, creator, newWaitable);

   * Fetch an existing ReplicaInfo object.
   * @param key       The key that we're using.
   * @param waitable  The waitable object to wait on.
   * @return          The existing ReplicaInfo object, or null if there is
   *                  none.
   * @throws RetriableException   If the caller needs to retry.
  @VisibleForTesting // ONLY for testing
  protected ShortCircuitReplicaInfo fetch(ExtendedBlockId key,
      Waitable<ShortCircuitReplicaInfo> waitable) throws RetriableException {
    // Another thread is already in the process of loading this
    // ShortCircuitReplica.  So we simply wait for it to complete.
    ShortCircuitReplicaInfo info;
    try {
      LOG.trace("{}: found waitable for {}", this, key);
      info = waitable.await();
    } catch (InterruptedException e) {
      LOG.info(this + ": interrupted while waiting for " + key);
      throw new RetriableException("interrupted");
    if (info.getInvalidTokenException() != null) {
      LOG.info(this + ": could not get " + key + " due to InvalidToken " +
          "exception.", info.getInvalidTokenException());
      return info;
    ShortCircuitReplica replica = info.getReplica();
    if (replica == null) {
      LOG.warn(this + ": failed to get " + key);
      return info;
    if (replica.purged) {
      // Ignore replicas that have already been purged from the cache.
      throw new RetriableException("Ignoring purged replica " +
          replica + ".  Retrying.");
    // Check if the replica is stale before using it.
    // If it is, purge it and retry.
    if (replica.isStale()) {
      LOG.info(this + ": got stale replica " + replica + ".  Removing " +
          "this replica from the replicaInfoMap and retrying.");
      // Remove the cache's reference to the replica.  This may or may not
      // trigger a close.
      throw new RetriableException("ignoring stale replica " + replica);
    return info;

  private ShortCircuitReplicaInfo create(ExtendedBlockId key,
      ShortCircuitReplicaCreator creator,
      Waitable<ShortCircuitReplicaInfo> newWaitable) {
    // Handle loading a new replica.
    ShortCircuitReplicaInfo info = null;
    try {
      LOG.trace("{}: loading {}", this, key);
      info = creator.createShortCircuitReplicaInfo();
    } catch (RuntimeException e) {
      LOG.warn(this + ": failed to load " + key, e);
    if (info == null) info = new ShortCircuitReplicaInfo();
    try {
      if (info.getReplica() != null) {
        // On success, make sure the cache cleaner thread is running.
        LOG.trace("{}: successfully loaded {}", this, info.getReplica());
        // Note: new ShortCircuitReplicas start with a refCount of 2,
        // indicating that both this cache and whoever requested the
        // creation of the replica hold a reference.  So we don't need
        // to increment the reference count here.
      } else {
        // On failure, remove the waitable from the replicaInfoMap.
        Waitable<ShortCircuitReplicaInfo> waitableInMap = replicaInfoMap.get(key);
        if (waitableInMap == newWaitable) replicaInfoMap.remove(key);
        if (info.getInvalidTokenException() != null) {
          LOG.info(this + ": could not load " + key + " due to InvalidToken " +
              "exception.", info.getInvalidTokenException());
        } else {
          LOG.warn(this + ": failed to load " + key);
    } finally {
    return info;

  private void startCacheCleanerThreadIfNeeded() {
    if (cacheCleaner == null) {
      cacheCleaner = new CacheCleaner();
      long rateMs = cacheCleaner.getRateInMs();
      ScheduledFuture<?> future =
          cleanerExecutor.scheduleAtFixedRate(cacheCleaner, rateMs, rateMs,
      LOG.debug("{}: starting cache cleaner thread which will run every {} ms",
          this, rateMs);

  ClientMmap getOrCreateClientMmap(ShortCircuitReplica replica,
      boolean anchored) {
    Condition newCond;
    try {
      while (replica.mmapData != null) {
        if (replica.mmapData instanceof MappedByteBuffer) {
          MappedByteBuffer mmap = (MappedByteBuffer)replica.mmapData;
          return new ClientMmap(replica, mmap, anchored);
        } else if (replica.mmapData instanceof Long) {
          long lastAttemptTimeMs = (Long)replica.mmapData;
          long delta = Time.monotonicNow() - lastAttemptTimeMs;
          if (delta < mmapRetryTimeoutMs) {
            LOG.trace("{}: can't create client mmap for {} because we failed to"
                + " create one just {}ms ago.", this, replica, delta);
            return null;
          LOG.trace("{}: retrying client mmap for {}, {} ms after the previous "
              + "failure.", this, replica, delta);
        } else if (replica.mmapData instanceof Condition) {
          Condition cond = (Condition)replica.mmapData;
        } else {
          Preconditions.checkState(false, "invalid mmapData type %s",
      newCond = lock.newCondition();
      replica.mmapData = newCond;
    } finally {
    MappedByteBuffer map = replica.loadMmapInternal();
    try {
      if (map == null) {
        replica.mmapData = Time.monotonicNow();
        return null;
      } else {
        replica.mmapData = map;
        return new ClientMmap(replica, map, anchored);
    } finally {

   * Close the cache and free all associated resources.
  public void close() {
    try {
      if (closed) return;
      closed = true;
      LOG.info(this + ": closing");
      maxNonMmappedEvictableLifespanMs = 0;
      maxEvictableMmapedSize = 0;
      // Close and join cacheCleaner thread.
      IOUtilsClient.cleanupWithLogger(LOG, cacheCleaner);
      // Purge all replicas.
      while (!evictable.isEmpty()) {
        Object eldestKey = evictable.firstKey();
        purge((ShortCircuitReplica) evictable.get(eldestKey));
      while (!evictableMmapped.isEmpty()) {
        Object eldestKey = evictableMmapped.firstKey();
        purge((ShortCircuitReplica) evictableMmapped.get(eldestKey));
    } finally {

    // wait for existing tasks to terminate
    try {
      if (!releaserExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
        LOG.error("Forcing SlotReleaserThreadPool to shutdown!");
    } catch (InterruptedException e) {
      LOG.error("Interrupted while waiting for SlotReleaserThreadPool "
          + "to terminate", e);

    // wait for existing tasks to terminate
    try {
      if (!cleanerExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
        LOG.error("Forcing CleanerThreadPool to shutdown!");
    } catch (InterruptedException e) {
      LOG.error("Interrupted while waiting for CleanerThreadPool "
          + "to terminate", e);
    IOUtilsClient.cleanupWithLogger(LOG, shmManager);

  @VisibleForTesting // ONLY for testing
  public interface CacheVisitor {
    void visit(int numOutstandingMmaps,
        Map<ExtendedBlockId, ShortCircuitReplica> replicas,
        Map<ExtendedBlockId, InvalidToken> failedLoads,
        LinkedMap evictable,
        LinkedMap evictableMmapped);

  @VisibleForTesting // ONLY for testing
  public void accept(CacheVisitor visitor) {
    try {
      Map<ExtendedBlockId, ShortCircuitReplica> replicas = new HashMap<>();
      Map<ExtendedBlockId, InvalidToken> failedLoads = new HashMap<>();
      for (Entry<ExtendedBlockId, Waitable<ShortCircuitReplicaInfo>> entry :
          replicaInfoMap.entrySet()) {
        Waitable<ShortCircuitReplicaInfo> waitable = entry.getValue();
        if (waitable.hasVal()) {
          if (waitable.getVal().getReplica() != null) {
            replicas.put(entry.getKey(), waitable.getVal().getReplica());
          } else {
            // The exception may be null here, indicating a failed load that
            // isn't the result of an invalid block token.
      LOG.debug("visiting {} with outstandingMmapCount={}, replicas={}, "
              + "failedLoads={}, evictable={}, evictableMmapped={}",
          visitor.getClass().getName(), outstandingMmapCount, replicas,
          failedLoads, evictable, evictableMmapped);
      visitor.visit(outstandingMmapCount, replicas, failedLoads,
          evictable, evictableMmapped);
    } finally {

  public String toString() {
    return "ShortCircuitCache(0x" +
        Integer.toHexString(System.identityHashCode(this)) + ")";

   * Allocate a new shared memory slot.
   * @param datanode       The datanode to allocate a shm slot with.
   * @param peer           A peer connected to the datanode.
   * @param usedPeer       Will be set to true if we use up the provided peer.
   * @param blockId        The block id and block pool id of the block we're
   *                         allocating this slot for.
   * @param clientName     The name of the DFSClient allocating the shared
   *                         memory.
   * @return               Null if short-circuit shared memory is disabled;
   *                         a short-circuit memory slot otherwise.
   * @throws IOException   An exception if there was an error talking to
   *                         the datanode.
  public Slot allocShmSlot(DatanodeInfo datanode,
      DomainPeer peer, MutableBoolean usedPeer,
      ExtendedBlockId blockId, String clientName) throws IOException {
    if (shmManager != null) {
      return shmManager.allocSlot(datanode, peer, usedPeer,
          blockId, clientName);
    } else {
      return null;

   * Free a slot immediately.
   * ONLY use this if the DataNode is not yet aware of the slot.
   * @param slot           The slot to free.
  public void freeSlot(Slot slot) {
    Preconditions.checkState(shmManager != null);

   * Schedule a shared memory slot to be released.
   * @param slot           The slot to release.
  public void scheduleSlotReleaser(Slot slot) {
    if (slot == null) {
    Preconditions.checkState(shmManager != null);
    releaserExecutor.execute(new SlotReleaser(slot));

  public DfsClientShmManager getDfsClientShmManager() {
    return shmManager;

   * Can be used in testing to verify whether a read went through SCR, after
   * the read is done and before the stream is closed.
  public int getReplicaInfoMapSize() {
    return replicaInfoMap.size();


hadoop 源码目录


hadoop ClientMmap 源码

hadoop DfsClientShm 源码

hadoop DfsClientShmManager 源码

hadoop DomainSocketFactory 源码

hadoop ShortCircuitReplica 源码

hadoop ShortCircuitReplicaInfo 源码

hadoop ShortCircuitShm 源码

0  赞