hadoop ConnectionContext 源码

  • 2022-10-20
  • 浏览 (200)

haddop ConnectionContext 代码

文件路径:/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.federation.router;

import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Context to track a connection in a {@link ConnectionPool}. When a client uses
 * a connection, it increments a counter to mark it as active. Once the client
 * is done with the connection, it decreases the counter. It also takes care of
 * closing the connection once is not active.
 *
 * The protocols currently used are:
 * <ul>
 * <li>{@link org.apache.hadoop.hdfs.protocol.ClientProtocol}
 * <li>{@link org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol}
 * </ul>
 */
public class ConnectionContext {

  private static final Logger LOG =
      LoggerFactory.getLogger(ConnectionContext.class);

  /** Client for the connection. */
  private final ProxyAndInfo<?> client;
  /** How many threads are using this connection. */
  private int numThreads = 0;
  /** If the connection is closed. */
  private boolean closed = false;
  /** Last timestamp the connection was active. */
  private long lastActiveTs = 0;
  /** The connection's active status would expire after this window. */
  private final static long ACTIVE_WINDOW_TIME = TimeUnit.SECONDS.toMillis(30);
  /** The maximum number of requests that this connection can handle concurrently. **/
  private final int maxConcurrencyPerConn;

  public ConnectionContext(ProxyAndInfo<?> connection, Configuration conf) {
    this.client = connection;
    this.maxConcurrencyPerConn = conf.getInt(
        RBFConfigKeys.DFS_ROUTER_MAX_CONCURRENCY_PER_CONNECTION_KEY,
        RBFConfigKeys.DFS_ROUTER_MAX_CONCURRENCY_PER_CONNECTION_DEFAULT);
  }

  /**
   * Check if the connection is active.
   *
   * @return True if the connection is active.
   */
  public synchronized boolean isActive() {
    return this.numThreads > 0;
  }

  /**
   * Check if the connection is/was active recently.
   *
   * @return True if the connection is active or
   * was active in the past period of time.
   */
  public synchronized boolean isActiveRecently() {
    return Time.monotonicNow() - this.lastActiveTs <= ACTIVE_WINDOW_TIME;
  }

  /**
   * Check if the connection is closed.
   *
   * @return If the connection is closed.
   */
  public synchronized boolean isClosed() {
    return this.closed;
  }

  /**
   * Check if the connection can be used. It checks if the connection is used by
   * another thread or already closed.
   *
   * @return True if the connection can be used.
   */
  public synchronized boolean isUsable() {
    return hasAvailableConcurrency() && !isClosed();
  }

  /**
   * Return true if this connection context still has available concurrency,
   * else return false.
   */
  private synchronized boolean hasAvailableConcurrency() {
    return this.numThreads < maxConcurrencyPerConn;
  }

  /**
   *  Check if the connection is idle. It checks if the connection is not used
   *  by another thread.
   * @return True if the connection is not used by another thread.
   */
  public synchronized boolean isIdle() {
    return !isActive() && !isClosed();
  }

  /**
   * Get the connection client.
   *
   * @return Connection client.
   */
  public synchronized ProxyAndInfo<?> getClient() {
    this.numThreads++;
    this.lastActiveTs = Time.monotonicNow();
    return this.client;
  }

  /**
   * Release this connection.
   */
  public synchronized void release() {
    if (this.numThreads > 0) {
      this.numThreads--;
    }
  }

  /**
   * Close a connection. Only idle connections can be closed since
   * the RPC proxy would be shut down immediately.
   *
   * @param force whether the connection should be closed anyway.
   */
  public synchronized void close(boolean force) {
    if (!force && this.numThreads > 0) {
      // this is an erroneous case, but we have to close the connection
      // anyway since there will be connection leak if we don't do so
      // the connection has been moved out of the pool
      LOG.error("Active connection with {} handlers will be closed",
          this.numThreads);
    }
    this.closed = true;
    Object proxy = this.client.getProxy();
    // Nobody should be using this anymore, so it should close right away
    RPC.stopProxy(proxy);
  }

  public synchronized void close() {
    close(false);
  }

  @Override
  public String toString() {
    InetSocketAddress addr = this.client.getAddress();
    Object proxy = this.client.getProxy();
    Class<?> clazz = proxy.getClass();

    StringBuilder sb = new StringBuilder();
    sb.append(clazz.getSimpleName())
        .append("@")
        .append(addr)
        .append("x")
        .append(numThreads);
    if (closed) {
      sb.append("[CLOSED]");
    }
    return sb.toString();
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop ConnectionManager 源码

hadoop ConnectionNullException 源码

hadoop ConnectionPool 源码

hadoop ConnectionPoolId 源码

hadoop DFSRouter 源码

hadoop ErasureCoding 源码

hadoop FederationConnectionId 源码

hadoop FederationUtil 源码

hadoop IsRouterActiveServlet 源码

hadoop MountTableRefresherService 源码

0  赞