hadoop OpenFileCtxCache 源码

  • 2022-10-20
  • 浏览 (179)

haddop OpenFileCtxCache 代码

文件路径:/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtxCache.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.nfs.nfs3;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys;
import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration;
import org.apache.hadoop.nfs.nfs3.FileHandle;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.Time;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;

/**
 * A cache saves OpenFileCtx objects for different users. Each cache entry is
 * used to maintain the writing context for a single file.
 */
class OpenFileCtxCache {
  private static final Logger LOG =
      LoggerFactory.getLogger(OpenFileCtxCache.class);
  // Insert and delete with openFileMap are synced
  private final ConcurrentMap<FileHandle, OpenFileCtx> openFileMap = Maps
      .newConcurrentMap();

  private final int maxStreams;
  private final long streamTimeout;
  private final StreamMonitor streamMonitor;

  OpenFileCtxCache(NfsConfiguration config, long streamTimeout) {
    maxStreams = config.getInt(NfsConfigKeys.DFS_NFS_MAX_OPEN_FILES_KEY,
        NfsConfigKeys.DFS_NFS_MAX_OPEN_FILES_DEFAULT);
    LOG.info("Maximum open streams is " + maxStreams);
    this.streamTimeout = streamTimeout;
    streamMonitor = new StreamMonitor();
  }

  /**
   * The entry to be evicted is based on the following rules:<br>
   * 1. if the OpenFileCtx has any pending task, it will not be chosen.<br>
   * 2. if there is inactive OpenFileCtx, the first found one is to evict. <br>
   * 3. For OpenFileCtx entries don't belong to group 1 or 2, the idlest one 
   * is select. If it's idle longer than OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT, it
   * will be evicted. Otherwise, the whole eviction request is failed.
   */
  @VisibleForTesting
  Entry<FileHandle, OpenFileCtx> getEntryToEvict() {
    Iterator<Entry<FileHandle, OpenFileCtx>> it = openFileMap.entrySet()
        .iterator();
    if (LOG.isTraceEnabled()) {
      LOG.trace("openFileMap size:" + size());
    }

    Entry<FileHandle, OpenFileCtx> idlest = null;
    
    while (it.hasNext()) {
      Entry<FileHandle, OpenFileCtx> pairs = it.next();
      OpenFileCtx ctx = pairs.getValue();
      if (!ctx.getActiveState()) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Got one inactive stream: " + ctx);
        }
        return pairs;
      }
      if (ctx.hasPendingWork()) {
        // Always skip files with pending work.
        continue;
      }
      if (idlest == null) {
        idlest = pairs;
      } else {
        if (ctx.getLastAccessTime() < idlest.getValue().getLastAccessTime()) {
          idlest = pairs;
        }
      }
    }

    if (idlest == null) {
      LOG.warn("No eviction candidate. All streams have pending work.");
      return null;
    } else {
      long idleTime = Time.monotonicNow()
          - idlest.getValue().getLastAccessTime();
      if (idleTime < NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("idlest stream's idle time:" + idleTime);
        }
        LOG.warn("All opened streams are busy, can't remove any from cache.");
        return null;
      } else {
        return idlest;
      }
    }
  }

  boolean put(FileHandle h, OpenFileCtx context) {
    OpenFileCtx toEvict = null;
    synchronized (this) {
      Preconditions.checkState(size() <= this.maxStreams,
          "stream cache size " + size() + "  is larger than maximum" + this
              .maxStreams);
      if (size() == this.maxStreams) {
        Entry<FileHandle, OpenFileCtx> pairs = getEntryToEvict();
        if (pairs ==null) {
          return false;
        } else {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Evict stream ctx: " + pairs.getValue());
          }
          toEvict = openFileMap.remove(pairs.getKey());
          Preconditions.checkState(toEvict == pairs.getValue(),
              "The deleted entry is not the same as odlest found.");
        }
      }
      openFileMap.put(h, context);
    }
    
    // Cleanup the old stream outside the lock
    if (toEvict != null) {
      toEvict.cleanup();
    }
    return true;
  }

  @VisibleForTesting
  void scan(long streamTimeout) {
    ArrayList<OpenFileCtx> ctxToRemove = new ArrayList<OpenFileCtx>();
    Iterator<Entry<FileHandle, OpenFileCtx>> it = openFileMap.entrySet()
        .iterator();
    if (LOG.isTraceEnabled()) {
      LOG.trace("openFileMap size:" + size());
    }

    while (it.hasNext()) {
      Entry<FileHandle, OpenFileCtx> pairs = it.next();
      FileHandle handle = pairs.getKey();
      OpenFileCtx ctx = pairs.getValue();
      if (!ctx.streamCleanup(handle, streamTimeout)) {
        continue;
      }

      // Check it again inside lock before removing
      synchronized (this) {
        OpenFileCtx ctx2 = openFileMap.get(handle);
        if (ctx2 != null) {
          if (ctx2.streamCleanup(handle, streamTimeout)) {
            openFileMap.remove(handle);
            if (LOG.isDebugEnabled()) {
              LOG.debug("After remove stream " + handle.dumpFileHandle()
                  + ", the stream number:" + size());
            }
            ctxToRemove.add(ctx2);
          }
        }
      }
    }

    // Invoke the cleanup outside the lock
    for (OpenFileCtx ofc : ctxToRemove) {
      ofc.cleanup();
    }
  }

  OpenFileCtx get(FileHandle key) {
    return openFileMap.get(key);
  }

  int size() {
    return openFileMap.size();
  }

  void start() {
    streamMonitor.start();
  }

  // Evict all entries
  void cleanAll() {
    ArrayList<OpenFileCtx> cleanedContext = new ArrayList<OpenFileCtx>();
    synchronized (this) {
      Iterator<Entry<FileHandle, OpenFileCtx>> it = openFileMap.entrySet()
          .iterator();
      if (LOG.isTraceEnabled()) {
        LOG.trace("openFileMap size:" + size());
      }

      while (it.hasNext()) {
        Entry<FileHandle, OpenFileCtx> pairs = it.next();
        OpenFileCtx ctx = pairs.getValue();
        it.remove();
        cleanedContext.add(ctx);
      }
    }

    // Invoke the cleanup outside the lock
    for (OpenFileCtx ofc : cleanedContext) {
      ofc.cleanup();
    }
  }

  void shutdown() {
    // stop the dump thread
    if (streamMonitor.isAlive()) {
      streamMonitor.shouldRun(false);
      streamMonitor.interrupt();
      try {
        streamMonitor.join(3000);
      } catch (InterruptedException ignored) {
      }
    }
    
    cleanAll();
  }

  /**
   * StreamMonitor wakes up periodically to find and closes idle streams.
   */
  class StreamMonitor extends Daemon {
    private final static int rotation = 5 * 1000; // 5 seconds
    private long lastWakeupTime = 0;
    private boolean shouldRun = true;
    
    void shouldRun(boolean shouldRun) {
      this.shouldRun = shouldRun;
    }
    
    @Override
    public void run() {
      while (shouldRun) {
        scan(streamTimeout);

        // Check if it can sleep
        try {
          long workedTime = Time.monotonicNow() - lastWakeupTime;
          if (workedTime < rotation) {
            if (LOG.isTraceEnabled()) {
              LOG.trace("StreamMonitor can still have a sleep:"
                  + ((rotation - workedTime) / 1000));
            }
            Thread.sleep(rotation - workedTime);
          }
          lastWakeupTime = Time.monotonicNow();

        } catch (InterruptedException e) {
          LOG.info("StreamMonitor got interrupted");
          return;
        }
      }
    }
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AsyncDataService 源码

hadoop DFSClientCache 源码

hadoop Nfs3 源码

hadoop Nfs3HttpServer 源码

hadoop Nfs3Metrics 源码

hadoop Nfs3Utils 源码

hadoop OffsetRange 源码

hadoop OpenFileCtx 源码

hadoop PrivilegedNfsGatewayStarter 源码

hadoop RpcProgramNfs3 源码

0  赞