hadoop ConnectionPool source code

  • 2022-10-20
  • Views (197)

hadoop ConnectionPool code

File path: /hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.federation.router;

import java.io.IOException;
import java.lang.reflect.Constructor;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import javax.net.SocketFactory;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryUtils;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.RefreshUserMappingsProtocol;
import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolClientSideTranslatorPB;
import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolPB;
import org.apache.hadoop.tools.GetUserMappingsProtocol;
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolClientSideTranslatorPB;
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB;
import org.apache.hadoop.util.Time;
import org.eclipse.jetty.util.ajax.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Maintains a pool of connections for each User (including tokens) + NN. The
 * RPC client maintains a single socket, to achieve throughput similar to a NN,
 * each request is multiplexed across multiple sockets/connections from a
 * pool.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ConnectionPool {

  private static final Logger LOG =
      LoggerFactory.getLogger(ConnectionPool.class);

  /** Configuration settings for the connection pool. */
  private final Configuration conf;

  /** Identifier for this connection pool. */
  private final ConnectionPoolId connectionPoolId;
  /** Namenode this pool connects to. */
  private final String namenodeAddress;
  /** User for this connections. */
  private final UserGroupInformation ugi;
  /** Class of the protocol. */
  private final Class<?> protocol;

  /** Pool of connections. We mimic a COW array. */
  private volatile List<ConnectionContext> connections = new ArrayList<>();
  /** Connection index for round-robin. */
  private final AtomicInteger clientIndex = new AtomicInteger(0);
  /** Underlying socket index. **/
  private final AtomicInteger socketIndex = new AtomicInteger(0);

  /** Min number of connections per user. */
  private final int minSize;
  /** Max number of connections per user. */
  private final int maxSize;
  /** Min ratio of active connections per user. */
  private final float minActiveRatio;

  /** The last time a connection was active. */
  private volatile long lastActiveTime = 0;

  /** Enable using multiple physical socket or not. **/
  private final boolean enableMultiSocket;
  /** StateID alignment context. */
  private final PoolAlignmentContext alignmentContext;

  /** Map for the protocols and their protobuf implementations. */
  private final static Map<Class<?>, ProtoImpl> PROTO_MAP = new HashMap<>();
  static {
    PROTO_MAP.put(ClientProtocol.class,
        new ProtoImpl(ClientNamenodeProtocolPB.class,
            ClientNamenodeProtocolTranslatorPB.class));
    PROTO_MAP.put(NamenodeProtocol.class, new ProtoImpl(
        NamenodeProtocolPB.class, NamenodeProtocolTranslatorPB.class));
    PROTO_MAP.put(RefreshUserMappingsProtocol.class,
        new ProtoImpl(RefreshUserMappingsProtocolPB.class,
            RefreshUserMappingsProtocolClientSideTranslatorPB.class));
    PROTO_MAP.put(GetUserMappingsProtocol.class,
        new ProtoImpl(GetUserMappingsProtocolPB.class,
            GetUserMappingsProtocolClientSideTranslatorPB.class));
  }

  /** Class to store the protocol implementation. */
  private static class ProtoImpl {
    private final Class<?> protoPb;
    private final Class<?> protoClientPb;

    ProtoImpl(Class<?> pPb, Class<?> pClientPb) {
      this.protoPb = pPb;
      this.protoClientPb = pClientPb;
    }
  }

  protected ConnectionPool(Configuration config, String address,
      UserGroupInformation user, int minPoolSize, int maxPoolSize,
      float minActiveRatio, Class<?> proto, PoolAlignmentContext alignmentContext)
      throws IOException {

    this.conf = config;

    // Connection pool target
    this.ugi = user;
    this.namenodeAddress = address;
    this.protocol = proto;
    this.connectionPoolId =
        new ConnectionPoolId(this.ugi, this.namenodeAddress, this.protocol);

    // Set configuration parameters for the pool
    this.minSize = minPoolSize;
    this.maxSize = maxPoolSize;
    this.minActiveRatio = minActiveRatio;
    this.enableMultiSocket = conf.getBoolean(
        RBFConfigKeys.DFS_ROUTER_NAMENODE_ENABLE_MULTIPLE_SOCKET_KEY,
        RBFConfigKeys.DFS_ROUTER_NAMENODE_ENABLE_MULTIPLE_SOCKET_DEFAULT);

    this.alignmentContext = alignmentContext;

    // Add minimum connections to the pool
    for (int i = 0; i < this.minSize; i++) {
      ConnectionContext newConnection = newConnection();
      this.connections.add(newConnection);
    }
    LOG.debug("Created connection pool \"{}\" with {} connections",
        this.connectionPoolId, this.minSize);
  }

  /**
   * Get the maximum number of connections allowed in this pool.
   *
   * @return Maximum number of connections.
   */
  protected int getMaxSize() {
    return this.maxSize;
  }

  /**
   * Get the minimum number of connections in this pool.
   *
   * @return Minimum number of connections.
   */
  protected int getMinSize() {
    return this.minSize;
  }

  /**
   * Get the minimum ratio of active connections in this pool.
   *
   * @return Minimum ratio of active connections.
   */
  protected float getMinActiveRatio() {
    return this.minActiveRatio;
  }

  /**
   * Get the connection pool identifier.
   *
   * @return Connection pool identifier.
   */
  protected ConnectionPoolId getConnectionPoolId() {
    return this.connectionPoolId;
  }

  /**
   * Get the clientIndex used to calculate index for lookup.
   * @return Client index.
   */
  @VisibleForTesting
  public AtomicInteger getClientIndex() {
    return this.clientIndex;
  }

  /**
   * Get the alignment context for this pool.
   * @return Alignment context
   */
  public PoolAlignmentContext getPoolAlignmentContext() {
    return this.alignmentContext;
  }

  /**
   * Return the next connection round-robin.
   *
   * @return Connection context.
   */
  protected ConnectionContext getConnection() {
    this.lastActiveTime = Time.now();
    List<ConnectionContext> tmpConnections = this.connections;
    for (ConnectionContext tmpConnection : tmpConnections) {
      if (tmpConnection != null && tmpConnection.isUsable()) {
        return tmpConnection;
      }
    }

    ConnectionContext conn = null;
    // We return a connection even if it's busy
    int size = tmpConnections.size();
    if (size > 0) {
      // Get a connection from the pool following round-robin
      // Inc and mask off sign bit, lookup index should be non-negative int
      int threadIndex = this.clientIndex.getAndIncrement() & 0x7FFFFFFF;
      conn = tmpConnections.get(threadIndex % size);
    }
    return conn;
  }

  /**
   * Add a connection to the current pool. It uses a Copy-On-Write approach.
   *
   * @param conn New connection to add to the pool.
   */
  public synchronized void addConnection(ConnectionContext conn) {
    List<ConnectionContext> tmpConnections = new ArrayList<>(this.connections);
    tmpConnections.add(conn);
    this.connections = tmpConnections;

    this.lastActiveTime = Time.now();
  }

  /**
   * Remove connections from the current pool.
   *
   * @param num Number of connections to remove.
   * @return Removed connections.
   */
  public synchronized List<ConnectionContext> removeConnections(int num) {
    List<ConnectionContext> removed = new LinkedList<>();
    if (this.connections.size() > this.minSize) {
      int targetCount = Math.min(num, this.connections.size() - this.minSize);
      // Remove and close targetCount of connections
      List<ConnectionContext> tmpConnections = new ArrayList<>();
      for (ConnectionContext conn : this.connections) {
        // Only pick idle connections to close
        if (removed.size() < targetCount && conn.isIdle()) {
          removed.add(conn);
        } else {
          tmpConnections.add(conn);
        }
      }
      this.connections = tmpConnections;
    }
    LOG.debug("Expected to remove {} connection and actually removed {} connections",
        num, removed.size());
    return removed;
  }

  /**
   * Close the connection pool.
   */
  protected synchronized void close() {
    long timeSinceLastActive = TimeUnit.MILLISECONDS.toSeconds(
        Time.now() - getLastActiveTime());
    LOG.debug("Shutting down connection pool \"{}\" used {} seconds ago",
        this.connectionPoolId, timeSinceLastActive);

    for (ConnectionContext connection : this.connections) {
      connection.close(true);
    }
    this.connections.clear();
  }

  /**
   * Number of connections in the pool.
   *
   * @return Number of connections.
   */
  protected int getNumConnections() {
    return this.connections.size();
  }

  /**
   * Number of active connections in the pool.
   *
   * @return Number of active connections.
   */
  protected int getNumActiveConnections() {
    int ret = 0;
    List<ConnectionContext> tmpConnections = this.connections;
    for (ConnectionContext conn : tmpConnections) {
      if (conn.isActive()) {
        ret++;
      }
    }
    return ret;
  }

  /**
   * Number of usable i.e. no active thread connections.
   *
   * @return Number of idle connections
   */
  protected int getNumIdleConnections() {
    int ret = 0;
    List<ConnectionContext> tmpConnections = this.connections;
    for (ConnectionContext conn : tmpConnections) {
      if (conn.isIdle()) {
        ret++;
      }
    }
    return ret;
  }

  /**
   * Number of active connections recently in the pool.
   *
   * @return Number of active connections recently.
   */
  protected int getNumActiveConnectionsRecently() {
    int ret = 0;
    List<ConnectionContext> tmpConnections = this.connections;
    for (ConnectionContext conn : tmpConnections) {
      if (conn.isActiveRecently()) {
        ret++;
      }
    }
    return ret;
  }

  /**
   * Get the last time the connection pool was used.
   *
   * @return Last time the connection pool was used.
   */
  protected long getLastActiveTime() {
    return this.lastActiveTime;
  }

  @Override
  public String toString() {
    return this.connectionPoolId.toString();
  }

  /**
   * JSON representation of the connection pool.
   *
   * @return String representation of the JSON.
   */
  public String getJSON() {
    final Map<String, String> info = new LinkedHashMap<>();
    info.put("active", Integer.toString(getNumActiveConnections()));
    info.put("recent_active",
        Integer.toString(getNumActiveConnectionsRecently()));
    info.put("idle", Integer.toString(getNumIdleConnections()));
    info.put("total", Integer.toString(getNumConnections()));
    if (LOG.isDebugEnabled()) {
      List<ConnectionContext> tmpConnections = this.connections;
      for (int i=0; i<tmpConnections.size(); i++) {
        ConnectionContext connection = tmpConnections.get(i);
        info.put(i + " active", Boolean.toString(connection.isActive()));
        info.put(i + " recent_active",
            Integer.toString(getNumActiveConnectionsRecently()));
        info.put(i + " idle", Boolean.toString(connection.isUsable()));
        info.put(i + " closed", Boolean.toString(connection.isClosed()));
      }
    }
    return JSON.toString(info);
  }

  /**
   * Create a new proxy wrapper for a client NN connection.
   * @return Proxy for the target ClientProtocol that contains the user's
   *         security context.
   * @throws IOException If it cannot get a new connection.
   */
  public ConnectionContext newConnection() throws IOException {
    return newConnection(this.conf, this.namenodeAddress,
        this.ugi, this.protocol, this.enableMultiSocket,
        this.socketIndex.incrementAndGet(), alignmentContext);
  }

  /**
   * Creates a proxy wrapper for a client NN connection. Each proxy contains
   * context for a single user/security context. To maximize throughput it is
   * recommended to use multiple connection per user+server, allowing multiple
   * writes and reads to be dispatched in parallel.
   * @param <T> Input type T.
   *
   * @param conf Configuration for the connection.
   * @param nnAddress Address of server supporting the ClientProtocol.
   * @param ugi User context.
   * @param proto Interface of the protocol.
   * @param enableMultiSocket Enable multiple socket or not.
   * @param alignmentContext Client alignment context.
   * @return proto for the target ClientProtocol that contains the user's
   *         security context.
   * @throws IOException If it cannot be created.
   */
  protected static <T> ConnectionContext newConnection(Configuration conf,
      String nnAddress, UserGroupInformation ugi, Class<T> proto,
      boolean enableMultiSocket, int socketIndex,
      AlignmentContext alignmentContext) throws IOException {
    if (!PROTO_MAP.containsKey(proto)) {
      String msg = "Unsupported protocol for connection to NameNode: "
          + ((proto != null) ? proto.getName() : "null");
      LOG.error(msg);
      throw new IllegalStateException(msg);
    }
    ProtoImpl classes = PROTO_MAP.get(proto);
    RPC.setProtocolEngine(conf, classes.protoPb, ProtobufRpcEngine2.class);

    final RetryPolicy defaultPolicy = RetryUtils.getDefaultRetryPolicy(conf,
        HdfsClientConfigKeys.Retry.POLICY_ENABLED_KEY,
        HdfsClientConfigKeys.Retry.POLICY_ENABLED_DEFAULT,
        HdfsClientConfigKeys.Retry.POLICY_SPEC_KEY,
        HdfsClientConfigKeys.Retry.POLICY_SPEC_DEFAULT,
        HdfsConstants.SAFEMODE_EXCEPTION_CLASS_NAME);

    SocketFactory factory = SocketFactory.getDefault();
    if (UserGroupInformation.isSecurityEnabled()) {
      SaslRpcServer.init(conf);
    }
    InetSocketAddress socket = NetUtils.createSocketAddr(nnAddress);
    final long version = RPC.getProtocolVersion(classes.protoPb);
    Object proxy;
    if (enableMultiSocket) {
      FederationConnectionId connectionId = new FederationConnectionId(
          socket, classes.protoPb, ugi, RPC.getRpcTimeout(conf),
          defaultPolicy, conf, socketIndex);
      proxy = RPC.getProtocolProxy(classes.protoPb, version, connectionId,
          conf, factory, alignmentContext).getProxy();
    } else {
      proxy = RPC.getProtocolProxy(classes.protoPb, version, socket, ugi,
          conf, factory, RPC.getRpcTimeout(conf), defaultPolicy, null,
          alignmentContext).getProxy();
    }

    T client = newProtoClient(proto, classes, proxy);
    Text dtService = SecurityUtil.buildTokenService(socket);

    ProxyAndInfo<T> clientProxy = new ProxyAndInfo<T>(client, dtService, socket);
    return new ConnectionContext(clientProxy, conf);
  }

  private static <T> T newProtoClient(Class<T> proto, ProtoImpl classes,
      Object proxy) {
    try {
      Constructor<?> constructor =
          classes.protoClientPb.getConstructor(classes.protoPb);
      Object o = constructor.newInstance(proxy);
      if (proto.isAssignableFrom(o.getClass())) {
        @SuppressWarnings("unchecked")
        T client = (T) o;
        return client;
      }
    } catch (Exception e) {
      LOG.error(e.getMessage());
    }
    return null;
  }
}
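
The notable part of this class is that getConnection() stays lock-free: the pool keeps its connections in a volatile list that is only ever replaced wholesale (a copy-on-write approach), prefers a connection that is currently usable, and otherwise falls back to a round-robin pick driven by an AtomicInteger whose sign bit is masked off so the index never goes negative. Below is a minimal, self-contained sketch of that same pattern; SimplePool and FakeConnection are illustrative stand-ins written for this article and are not part of the Hadoop codebase.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified mirror of ConnectionPool's copy-on-write list and round-robin
// index. SimplePool and FakeConnection are illustrative only.
public class SimplePool {

  // Illustrative stand-in for ConnectionContext (not part of Hadoop).
  static class FakeConnection {
    private final int id;
    private volatile boolean busy;

    FakeConnection(int id) {
      this.id = id;
    }

    boolean isUsable() {
      return !busy;
    }

    void markBusy(boolean b) {
      busy = b;
    }

    @Override
    public String toString() {
      return "conn-" + id;
    }
  }

  // Readers snapshot this volatile reference; writers publish a fresh copy.
  private volatile List<FakeConnection> connections = new ArrayList<>();
  private final AtomicInteger clientIndex = new AtomicInteger(0);

  // Mirrors addConnection(): copy the current list, append, publish the copy.
  synchronized void add(FakeConnection conn) {
    List<FakeConnection> copy = new ArrayList<>(connections);
    copy.add(conn);
    connections = copy;
  }

  // Mirrors getConnection(): prefer a usable connection; otherwise fall back
  // to round-robin over the snapshot, even if the chosen one is busy.
  FakeConnection get() {
    List<FakeConnection> snapshot = connections;
    for (FakeConnection conn : snapshot) {
      if (conn.isUsable()) {
        return conn;
      }
    }
    int size = snapshot.size();
    if (size == 0) {
      return null;
    }
    // Mask off the sign bit so the index stays non-negative after overflow.
    int index = clientIndex.getAndIncrement() & 0x7FFFFFFF;
    return snapshot.get(index % size);
  }

  public static void main(String[] args) {
    SimplePool pool = new SimplePool();
    for (int i = 0; i < 3; i++) {
      pool.add(new FakeConnection(i));
    }
    FakeConnection first = pool.get();  // conn-0, the first usable one
    first.markBusy(true);
    System.out.println(pool.get());     // prints conn-1, since conn-0 is busy
  }
}

The real pool applies the same idea to ConnectionContext objects that wrap RPC proxies; growing the pool via newConnection()/addConnection() and trimming it via removeConnections() is driven from outside this class (see the ConnectionManager article in the related list below).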

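On the monitoring side, getJSON() flattens the counters above into a JSON string using Jetty's JSON helper. With debug logging disabled the result is a flat object along the following lines; the counts here are made up for illustration, and the per-connection entries (keys such as "0 active") are appended only when debug logging is enabled:

{"active":"1","recent_active":"1","idle":"2","total":"3"}

Note that the values are strings because the code calls Integer.toString() before putting them into the map.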
Related information

hadoop source code directory

Related articles

hadoop ConnectionContext source code

hadoop ConnectionManager source code

hadoop ConnectionNullException source code

hadoop ConnectionPoolId source code

hadoop DFSRouter source code

hadoop ErasureCoding source code

hadoop FederationConnectionId source code

hadoop FederationUtil source code

hadoop IsRouterActiveServlet source code

hadoop MountTableRefresherService source code
