hadoop ConnectionManager 源码

  • 2022-10-20
  • 浏览 (175)

hadoop ConnectionManager 代码

文件路径:/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.federation.router;

import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;
import org.eclipse.jetty.util.ajax.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Implements a pool of connections for the {@link Router} to be able to open
 * many connections to many Namenodes.
 */
public class ConnectionManager {

  /** Logger shared by the manager and its nested helper classes. */
  private static final Logger LOG =
      LoggerFactory.getLogger(ConnectionManager.class);

  /** Configuration for the connection manager, pool and sockets. */
  private final Configuration conf;

  /** Min number of connections per user + nn. */
  private final int minSize = 1;
  /** Max number of connections per user + nn. */
  private final int maxSize;
  /** Min ratio of active connections per user + nn. */
  private final float minActiveRatio;

  /**
   * How long (in milliseconds) a pool for a particular user + nn may stay
   * inactive before the cleanup task closes and removes it.
   */
  private final long poolCleanupPeriodMs;
  /**
   * How long (in milliseconds) a pool may stay inactive before the cleanup
   * task starts closing connections inside it.
   */
  private final long connectionCleanupPeriodMs;

  /**
   * Map of connection pools, one pool per pool identifier (built from the
   * user, the Namenode address and the protocol).
   */
  private final Map<ConnectionPoolId, ConnectionPool> pools;
  /** Lock for accessing pools. */
  private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
  /** Read side of the lock, taken when looking up or iterating the pools. */
  private final Lock readLock = readWriteLock.readLock();
  /** Write side of the lock, taken when adding or removing pools. */
  private final Lock writeLock = readWriteLock.writeLock();

  /** Queue for creating new connections. */
  private final BlockingQueue<ConnectionPool> creatorQueue;
  /**
   * Global federated namespace context for router.
   */
  private final RouterStateIdContext routerStateIdContext;
  /**
   * Map from connection pool ID to namespace. Updated together with
   * {@link #pools} under the write lock.
   */
  private final Map<ConnectionPoolId, String> connectionPoolToNamespaceMap;
  /** Max size of queue for creating new connections. */
  private final int creatorQueueMaxSize;

  /** Create new connections asynchronously. */
  private final ConnectionCreator creator;
  /** Periodic executor (single thread) to remove stale connection pools. */
  private final ScheduledThreadPoolExecutor cleaner =
      new ScheduledThreadPoolExecutor(1);

  /**
   * If the connection manager is running. Declared volatile because it is
   * written by {@link #start()} and {@link #close()} and read by
   * {@link #getConnection} without holding any lock, so the writes must be
   * visible to threads that only read the flag.
   */
  private volatile boolean running = false;

  /**
   * Creates a proxy client connection pool manager that uses a new
   * {@link RouterStateIdContext} built from the same configuration.
   *
   * @param config Configuration for the connections.
   */
  public ConnectionManager(Configuration config) {
    this(config, new RouterStateIdContext(config));
  }

  /**
   * Creates a proxy client connection pool manager.
   *
   * @param config Configuration for the connections.
   */
  public ConnectionManager(Configuration config, RouterStateIdContext routerStateIdContext) {
    this.conf = config;
    this.routerStateIdContext = routerStateIdContext;
    this.connectionPoolToNamespaceMap = new HashMap<>();
    // Configure minimum, maximum and active connection pools
    this.maxSize = this.conf.getInt(
        RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE,
        RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE_DEFAULT);
    this.minActiveRatio = this.conf.getFloat(
        RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_MIN_ACTIVE_RATIO,
        RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_MIN_ACTIVE_RATIO_DEFAULT);

    // Map with the connections indexed by UGI and Namenode
    this.pools = new HashMap<>();

    // Create connections in a thread asynchronously
    this.creatorQueueMaxSize = this.conf.getInt(
        RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_CREATOR_QUEUE_SIZE,
        RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_CREATOR_QUEUE_SIZE_DEFAULT
        );
    this.creatorQueue = new ArrayBlockingQueue<>(this.creatorQueueMaxSize);
    this.creator = new ConnectionCreator(this.creatorQueue);
    this.creator.setDaemon(true);

    // Cleanup periods
    this.poolCleanupPeriodMs = this.conf.getLong(
        RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_CLEAN,
        RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_CLEAN_DEFAULT);
    LOG.info("Cleaning connection pools every {} seconds",
        TimeUnit.MILLISECONDS.toSeconds(this.poolCleanupPeriodMs));
    this.connectionCleanupPeriodMs = this.conf.getLong(
        RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS,
        RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS_DEFAULT);
    LOG.info("Cleaning connections every {} seconds",
        TimeUnit.MILLISECONDS.toSeconds(this.connectionCleanupPeriodMs));
  }

  /**
   * Start the connection manager: launches the asynchronous connection
   * creator, schedules the periodic cleanup and flags the manager as running.
   */
  public void start() {
    // Background thread that opens the connections requested via the queue
    this.creator.start();

    // One periodic task covers both stale pools and stale connections, so it
    // runs at the shorter of the two configured periods
    final long periodMs =
        Math.min(poolCleanupPeriodMs, connectionCleanupPeriodMs);
    final long periodSeconds = TimeUnit.MILLISECONDS.toSeconds(periodMs);
    LOG.info("Cleaning every {} seconds", periodSeconds);
    final Runnable cleanupTask = new CleanupTask();
    this.cleaner.scheduleAtFixedRate(
        cleanupTask, 0, periodMs, TimeUnit.MILLISECONDS);

    // From here on getConnection() serves requests
    this.running = true;
  }

  /**
   * Stop the connection manager: shuts down the creator thread and the
   * cleanup executor, then closes and forgets every pool and removes the
   * state tracked for the namespaces those pools belonged to.
   */
  public void close() {
    // Stop the background workers and refuse new requests
    this.creator.shutdown();
    this.cleaner.shutdown();
    this.running = false;

    writeLock.lock();
    try {
      // Close every pool before dropping the references to them
      this.pools.values().forEach(pool -> pool.close());
      this.pools.clear();

      // Drop the namespace state recorded for each pool
      connectionPoolToNamespaceMap.values().forEach(
          nsID -> routerStateIdContext.removeNamespaceStateId(nsID));
      connectionPoolToNamespaceMap.clear();
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Fetches the next available proxy client in the pool. Each client connection
   * is reserved for a single user and cannot be reused until free.
   *
   * The pool for the user, Namenode and protocol is created on first use. The
   * client state id carried by the current call is pushed into the pool's
   * alignment context on every invocation, not only when the pool is created.
   *
   * @param ugi User group information.
   * @param nnAddress Namenode address for the connection.
   * @param protocol Protocol for the connection.
   * @param nsId Nameservice identity.
   * @return Proxy client to connect to nnId as UGI; {@code null} if the
   *         manager is not running, the pool has no connection to offer, or
   *         the connection obtained is already closed.
   * @throws IOException If the connection cannot be obtained.
   */
  public ConnectionContext getConnection(UserGroupInformation ugi,
      String nnAddress, Class<?> protocol, String nsId) throws IOException {
    // Check if the manager is shutdown
    if (!this.running) {
      LOG.error(
          "Cannot get a connection to {} because the manager isn't running",
          nnAddress);
      return null;
    }

    // Try to get the pool if created
    ConnectionPoolId connectionId =
        new ConnectionPoolId(ugi, nnAddress, protocol);
    ConnectionPool pool = null;
    readLock.lock();
    try {
      pool = this.pools.get(connectionId);
    } finally {
      readLock.unlock();
    }

    // Create the pool if not created before
    if (pool == null) {
      writeLock.lock();
      try {
        // Check again: another thread may have created it while we waited
        pool = this.pools.get(connectionId);
        if (pool == null) {
          pool = new ConnectionPool(
              this.conf, nnAddress, ugi, this.minSize, this.maxSize,
              this.minActiveRatio, protocol,
              new PoolAlignmentContext(this.routerStateIdContext, nsId));
          this.pools.put(connectionId, pool);
          this.connectionPoolToNamespaceMap.put(connectionId, nsId);
        }
      } finally {
        writeLock.unlock();
      }
    }

    // Advance the state id for every call. Doing this only when the pool
    // lookup missed would ignore the state id of all calls that reuse an
    // existing pool.
    // NOTE(review): assumes advanceClientStateId() may be called concurrently
    // without the manager lock - confirm against PoolAlignmentContext.
    long clientStateId = RouterStateIdContext.getClientStateIdFromCurrentCall(nsId);
    pool.getPoolAlignmentContext().advanceClientStateId(clientStateId);

    ConnectionContext conn = pool.getConnection();

    // Add a new connection to the pool if it wasn't usable
    if (conn == null || !conn.isUsable()) {
      if (!this.creatorQueue.offer(pool)) {
        LOG.error("Cannot add more than {} connections at the same time",
            this.creatorQueueMaxSize);
      }
    }

    // Never hand a closed connection back to the caller
    if (conn != null && conn.isClosed()) {
      LOG.error("We got a closed connection from {}", pool);
      conn = null;
    }

    return conn;
  }

  /**
   * Get the number of connection pools currently registered.
   *
   * @return Number of connection pools.
   */
  public int getNumConnectionPools() {
    final int poolCount;
    readLock.lock();
    try {
      poolCount = this.pools.size();
    } finally {
      readLock.unlock();
    }
    return poolCount;
  }

  /**
   * Get number of open connections, summed over all the pools.
   *
   * @return Number of open connections.
   */
  public int getNumConnections() {
    readLock.lock();
    try {
      return this.pools.values().stream()
          .mapToInt(pool -> pool.getNumConnections())
          .sum();
    } finally {
      readLock.unlock();
    }
  }

  /**
   * Get number of active connections, summed over all the pools.
   *
   * @return Number of active connections.
   */
  public int getNumActiveConnections() {
    readLock.lock();
    try {
      int activeCount = 0;
      for (ConnectionPool current : pools.values()) {
        activeCount += current.getNumActiveConnections();
      }
      return activeCount;
    } finally {
      readLock.unlock();
    }
  }

  /**
   * Get number of idle connections, summed over all the pools.
   *
   * @return Number of idle connections.
   */
  public int getNumIdleConnections() {
    readLock.lock();
    try {
      int idleCount = 0;
      for (ConnectionPool current : pools.values()) {
        idleCount += current.getNumIdleConnections();
      }
      return idleCount;
    } finally {
      readLock.unlock();
    }
  }

  /**
   * Get number of recently active connections, summed over all the pools.
   *
   * @return Number of recently active connections.
   */
  public int getNumActiveConnectionsRecently() {
    readLock.lock();
    try {
      return this.pools.values().stream()
          .mapToInt(pool -> pool.getNumActiveConnectionsRecently())
          .sum();
    } finally {
      readLock.unlock();
    }
  }

  /**
   * Get the number of connections to be created, i.e. the number of requests
   * currently waiting in the creator queue.
   *
   * @return Number of connections to be created.
   */
  public int getNumCreatingConnections() {
    final int pendingRequests = creatorQueue.size();
    return pendingRequests;
  }

  /**
   * Get a JSON representation of the connection pools: one entry per pool,
   * keyed by the string form of the pool identifier.
   *
   * @return JSON representation of all the connection pools.
   */
  public String getJSON() {
    // Sorted map so the entries come out ordered by pool identifier
    final Map<String, String> poolsById = new TreeMap<>();
    readLock.lock();
    try {
      this.pools.forEach(
          (poolId, pool) -> poolsById.put(poolId.toString(), pool.getJSON()));
    } finally {
      readLock.unlock();
    }
    return JSON.toString(poolsById);
  }

  /**
   * Exposes the internal map of connection pools for tests. The live map is
   * returned (not a copy) and no lock is taken here.
   *
   * @return Map of connection pools indexed by pool identifier.
   */
  @VisibleForTesting
  Map<ConnectionPoolId, ConnectionPool> getPools() {
    return this.pools;
  }

  /**
   * Clean the unused connections for this pool. Nothing happens unless the
   * pool holds more connections than its minimum size and is either inactive
   * for longer than the connection cleanup period or below its minimum ratio
   * of recently active connections.
   *
   * @param pool Connection pool to cleanup.
   */
  @VisibleForTesting
  void cleanup(ConnectionPool pool) {
    // A pool at (or below) its minimum size is left untouched
    if (pool.getNumConnections() <= pool.getMinSize()) {
      return;
    }

    long idleMs = Time.now() - pool.getLastActiveTime();
    int poolSize = pool.getNumConnections();
    // "Active" is too transient to be useful: handlers hold a connection
    // only briefly, so that count is nearly always low. "Recently active"
    // covers a longer time window and reflects the real usage of the pool.
    int recentlyActive = pool.getNumActiveConnectionsRecently();
    float ratio = pool.getMinActiveRatio();

    boolean idleTooLong = idleMs > connectionCleanupPeriodMs;
    boolean underused = recentlyActive < ratio * poolSize;
    if (!idleTooLong && !underused) {
      return;
    }

    // Close as many connections as possible in one pass, at least one
    int surplus = (int)(ratio * poolSize) - recentlyActive;
    int toClose = Math.max(1, surplus);
    List<ConnectionContext> removed = pool.removeConnections(toClose);
    for (ConnectionContext context : removed) {
      context.close();
    }
    LOG.debug("Removed connection {} used {} seconds ago. " +
            "Pool has {}/{} connections", pool.getConnectionPoolId(),
        TimeUnit.MILLISECONDS.toSeconds(idleMs),
        pool.getNumConnections(), pool.getMaxSize());
  }

  /**
   * Removes stale connections not accessed recently from the pool. This is
   * invoked periodically.
   */
  private class CleanupTask implements Runnable {

    @Override
    public void run() {
      long currentTime = Time.now();
      List<ConnectionPoolId> toRemove = new LinkedList<>();

      // Look for stale pools
      readLock.lock();
      try {
        for (Entry<ConnectionPoolId, ConnectionPool> entry : pools.entrySet()) {
          ConnectionPool pool = entry.getValue();
          long lastTimeActive = pool.getLastActiveTime();
          boolean isStale =
              currentTime > (lastTimeActive + poolCleanupPeriodMs);
          if (lastTimeActive > 0 && isStale) {
            // Remove this pool
            LOG.debug("Closing and removing stale pool {}", pool);
            pool.close();
            ConnectionPoolId poolId = entry.getKey();
            toRemove.add(poolId);
          } else {
            // Keep this pool but clean connections inside
            LOG.debug("Cleaning up {}", pool);
            cleanup(pool);
          }
        }
      } finally {
        readLock.unlock();
      }

      // Remove stale pools
      if (!toRemove.isEmpty()) {
        writeLock.lock();
        try {
          for (ConnectionPoolId poolId : toRemove) {
            pools.remove(poolId);
            String nsID = connectionPoolToNamespaceMap.get(poolId);
            connectionPoolToNamespaceMap.remove(poolId);
            if (!connectionPoolToNamespaceMap.values().contains(nsID)) {
              routerStateIdContext.removeNamespaceStateId(nsID);
            }
          }
        } finally {
          writeLock.unlock();
        }
      }
    }
  }

  /**
   * Thread that creates connections asynchronously. It takes pools from the
   * queue and, when the pool is below its maximum size and enough of its
   * connections were recently active, opens one more connection for it.
   */
  static class ConnectionCreator extends Thread {
    /**
     * If the creator is running. Volatile because {@link #shutdown()} sets it
     * from a thread other than the one executing {@link #run()}.
     */
    private volatile boolean running = true;
    /** Queue to push work to. */
    private final BlockingQueue<ConnectionPool> queue;

    /**
     * Creates the thread; it does not start until {@link #start()} is called.
     *
     * @param blockingQueue Queue with the pools that need a new connection.
     */
    ConnectionCreator(BlockingQueue<ConnectionPool> blockingQueue) {
      super("Connection creator");
      this.queue = blockingQueue;
    }

    @Override
    public void run() {
      while (this.running) {
        try {
          // Blocks until some pool asks for a new connection
          ConnectionPool pool = this.queue.take();
          try {
            // Use one snapshot of the size for both checks
            int total = pool.getNumConnections();
            int active = pool.getNumActiveConnectionsRecently();
            float poolMinActiveRatio = pool.getMinActiveRatio();
            if (total < pool.getMaxSize() &&
                active >= poolMinActiveRatio * total) {
              ConnectionContext conn = pool.newConnection();
              pool.addConnection(conn);
            } else {
              LOG.debug("Cannot add more than {} connections to {}",
                  pool.getMaxSize(), pool);
            }
          } catch (IOException e) {
            // A failed connection attempt must not stop the creator
            LOG.error("Cannot create a new connection", e);
          }
        } catch (InterruptedException e) {
          // An interrupt is the signal to stop: leave the loop
          LOG.error("The connection creator was interrupted");
          this.running = false;
        } catch (Throwable e) {
          LOG.error("Fatal error caught by connection creator ", e);
        }
      }
    }

    /**
     * Stop this connection creator: clears the running flag and interrupts
     * the thread so a blocked {@code take()} returns.
     */
    public void shutdown() {
      this.running = false;
      this.interrupt();
    }
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop ConnectionContext 源码

hadoop ConnectionNullException 源码

hadoop ConnectionPool 源码

hadoop ConnectionPoolId 源码

hadoop DFSRouter 源码

hadoop ErasureCoding 源码

hadoop FederationConnectionId 源码

hadoop FederationUtil 源码

hadoop IsRouterActiveServlet 源码

hadoop MountTableRefresherService 源码

0  赞