hadoop RouterRpcClient 源码

  • 2022-10-20
  • 浏览 (240)

hadoop RouterRpcClient 代码

文件路径:/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hdfs.server.federation.router;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_SEPARATOR_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_SEPARATOR_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_IP_PROXY_USERS;
import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;

import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.SnapshotException;
import org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessPolicyController;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.namenode.ha.ReadOnly;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.Server.Call;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.net.ConnectTimeoutException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.eclipse.jetty.util.ajax.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * A client proxy for Router to NN communication using the NN ClientProtocol.
 * <p>
 * Provides routers to invoke remote ClientProtocol methods and handle
 * retries/failover.
 * <ul>
 * <li>invokeSingle Make a single request to a single namespace
 * <li>invokeSequential Make a sequential series of requests to multiple
 * ordered namespaces until a condition is met.
 * <li>invokeConcurrent Make concurrent requests to multiple namespaces and
 * return all of the results.
 * </ul>
 * Also maintains a cached pool of connections to NNs. Connections are managed
 * by the ConnectionManager and are unique to each user + NN. The size of the
 * connection pool can be configured. Larger pools allow for more simultaneous
 * requests to a single NN from a single user.
 */
public class RouterRpcClient {

  /** Logger shared by all instances of this class. */
  private static final Logger LOG =
      LoggerFactory.getLogger(RouterRpcClient.class);


  /** Router using this RPC client. */
  private final Router router;

  /** Interface to identify the active NN for a nameservice or blockpool ID. */
  private final ActiveNamenodeResolver namenodeResolver;

  /** Connection pool to the Namenodes per user for performance. */
  private final ConnectionManager connectionManager;
  /** Service to run asynchronous calls. */
  private final ThreadPoolExecutor executorService;
  /** Retry policy for router -> NN communication. */
  private final RetryPolicy retryPolicy;
  /** Optional perf monitor. May be null; every use is null-checked. */
  private final RouterRpcMonitor rpcMonitor;
  /** Field separator of CallerContext. */
  private final String contextFieldSeparator;
  /** Observer read enabled. Default for all nameservices. */
  private final boolean observerReadEnabledDefault;
  /** Nameservice specific overrides of the default setting for enabling observer reads. */
  private HashSet<String> observerReadEnabledOverrides = new HashSet<>();

  /**
   * Pattern to parse a stack trace line. The capture groups are, in order:
   * declaring class, method name, file name and line number.
   */
  private static final Pattern STACK_TRACE_PATTERN =
      Pattern.compile("\\tat (.*)\\.(.*)\\((.*):(\\d*)\\)");

  /** Fairness manager to control handlers assigned per NS. */
  private volatile RouterRpcFairnessPolicyController routerRpcFairnessPolicyController;
  /** Rejected-permit counters keyed by nameservice; exposed as JSON. */
  private Map<String, LongAdder> rejectedPermitsPerNs = new ConcurrentHashMap<>();
  /** Accepted-permit counters keyed by nameservice; exposed as JSON. */
  private Map<String, LongAdder> acceptedPermitsPerNs = new ConcurrentHashMap<>();

  /**
   * True when at least one IP proxy user is configured. When set, connections
   * to the NN are opened as a proxy user even if security is disabled.
   */
  private final boolean enableProxyUser;

  /**
   * Create a router RPC client to manage remote procedure calls to NNs.
   * <p>
   * Construction has side effects: it starts the connection manager and
   * creates the fairness policy controller and the async caller thread pool.
   * Call {@link #shutdown()} to release them.
   *
   * @param conf Hdfs Configuration.
   * @param router A router using this RPC client.
   * @param resolver A NN resolver to determine the currently active NN in HA.
   * @param monitor Optional performance monitor.
   * @param routerStateIdContext State ID context handed to the connection
   *                             manager.
   */
  public RouterRpcClient(Configuration conf, Router router,
      ActiveNamenodeResolver resolver, RouterRpcMonitor monitor,
      RouterStateIdContext routerStateIdContext) {
    this.router = router;

    this.namenodeResolver = resolver;

    // The connection manager works on a client-side copy of the configuration
    // so that the router's own configuration is left untouched.
    Configuration clientConf = getClientConfiguration(conf);
    this.contextFieldSeparator =
        clientConf.get(HADOOP_CALLER_CONTEXT_SEPARATOR_KEY,
            HADOOP_CALLER_CONTEXT_SEPARATOR_DEFAULT);
    this.connectionManager = new ConnectionManager(clientConf, routerStateIdContext);
    this.connectionManager.start();
    this.routerRpcFairnessPolicyController =
        FederationUtil.newFairnessPolicyController(conf);

    // Fixed-size pool (core == max) used for asynchronous calls.
    int numThreads = conf.getInt(
        RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE,
        RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE_DEFAULT);
    ThreadFactory threadFactory = new ThreadFactoryBuilder()
        .setNameFormat("RPC Router Client-%d")
        .build();
    BlockingQueue<Runnable> workQueue;
    if (conf.getBoolean(
        RBFConfigKeys.DFS_ROUTER_CLIENT_REJECT_OVERLOAD,
        RBFConfigKeys.DFS_ROUTER_CLIENT_REJECT_OVERLOAD_DEFAULT)) {
      // Bounded queue: once it is full the executor rejects further tasks.
      workQueue = new ArrayBlockingQueue<>(numThreads);
    } else {
      // Unbounded queue: tasks wait for a free thread instead of being rejected.
      workQueue = new LinkedBlockingQueue<>();
    }
    this.executorService = new ThreadPoolExecutor(numThreads, numThreads,
        0L, TimeUnit.MILLISECONDS, workQueue, threadFactory);

    this.rpcMonitor = monitor;

    // Retry/failover policy for router -> NN calls.
    int maxFailoverAttempts = conf.getInt(
        HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_KEY,
        HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_DEFAULT);
    int maxRetryAttempts = conf.getInt(
        RBFConfigKeys.DFS_ROUTER_CLIENT_MAX_ATTEMPTS,
        RBFConfigKeys.DFS_ROUTER_CLIENT_MAX_ATTEMPTS_DEFAULT);
    int failoverSleepBaseMillis = conf.getInt(
        HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_KEY,
        HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_DEFAULT);
    int failoverSleepMaxMillis = conf.getInt(
        HdfsClientConfigKeys.Failover.SLEEPTIME_MAX_KEY,
        HdfsClientConfigKeys.Failover.SLEEPTIME_MAX_DEFAULT);
    this.retryPolicy = RetryPolicies.failoverOnNetworkException(
        RetryPolicies.TRY_ONCE_THEN_FAIL, maxFailoverAttempts, maxRetryAttempts,
        failoverSleepBaseMillis, failoverSleepMaxMillis);
    // Proxy users are enabled as soon as one IP proxy user is configured.
    String[] ipProxyUsers = conf.getStrings(DFS_NAMENODE_IP_PROXY_USERS);
    this.enableProxyUser = ipProxyUsers != null && ipProxyUsers.length > 0;
    this.observerReadEnabledDefault = conf.getBoolean(
        RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY,
        RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_VALUE);
    String[] observerReadOverrides = conf.getStrings(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_OVERRIDES);
    if (observerReadOverrides != null) {
      observerReadEnabledOverrides.addAll(Arrays.asList(observerReadOverrides));
    }
    if (this.observerReadEnabledDefault) {
      LOG.info("Observer read is enabled for router.");
    }
  }

  /**
   * Derive the configuration used by the underlying RPC client from the
   * Router configuration. The input is copied, then the Router-specific
   * retry and connect-timeout settings are mapped onto the generic IPC client
   * keys. Negative values mean "leave the IPC default alone".
   *
   * @param conf Input (Router) configuration; not modified.
   * @return Configuration for the RPC client.
   */
  private Configuration getClientConfiguration(final Configuration conf) {
    final Configuration rpcConf = new Configuration(conf);

    // Retries on socket timeouts.
    final int timeoutRetries = conf.getInt(
        RBFConfigKeys.DFS_ROUTER_CLIENT_MAX_RETRIES_TIME_OUT,
        RBFConfigKeys.DFS_ROUTER_CLIENT_MAX_RETRIES_TIME_OUT_DEFAULT);
    if (timeoutRetries >= 0) {
      rpcConf.setInt(
          IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
          timeoutRetries);
    }

    // Connect timeout, expressed in milliseconds.
    final long connectTimeoutMs = conf.getTimeDuration(
        RBFConfigKeys.DFS_ROUTER_CLIENT_CONNECT_TIMEOUT,
        RBFConfigKeys.DFS_ROUTER_CLIENT_CONNECT_TIMEOUT_DEFAULT,
        TimeUnit.MILLISECONDS);
    if (connectTimeoutMs >= 0) {
      rpcConf.setLong(IPC_CLIENT_CONNECT_TIMEOUT_KEY, connectTimeoutMs);
    }

    return rpcConf;
  }

  /**
   * Expose the resolver this client uses to locate active namenodes.
   *
   * @return Active namenode resolver.
   */
  public ActiveNamenodeResolver getNamenodeResolver() {
    return namenodeResolver;
  }

  /**
   * Shut down the client: close the connection manager, stop the async
   * caller pool and shut down the fairness policy controller, in that order.
   */
  public void shutdown() {
    // 1. Connections to the namenodes.
    if (connectionManager != null) {
      connectionManager.close();
    }
    // 2. Worker threads used for asynchronous calls.
    if (executorService != null) {
      executorService.shutdownNow();
    }
    // 3. Fairness controller.
    if (routerRpcFairnessPolicyController != null) {
      routerRpcFairnessPolicyController.shutdown();
    }
  }

  /**
   * Total number of connections between the router and the NNs, as reported
   * by the connection manager.
   *
   * @return Number of namenode clients.
   */
  public int getNumConnections() {
    return connectionManager.getNumConnections();
  }

  /**
   * Total number of active connections between the router and the NNs, as
   * reported by the connection manager.
   *
   * @return Number of active namenode clients.
   */
  public int getNumActiveConnections() {
    return connectionManager.getNumActiveConnections();
  }

  /**
   * Total number of idle connections between the router and the NNs, as
   * reported by the connection manager.
   *
   * @return Number of idle namenode clients.
   */
  public int getNumIdleConnections() {
    return connectionManager.getNumIdleConnections();
  }

  /**
   * Total number of recently active connections between the router and the
   * NNs, as reported by the connection manager.
   *
   * @return Number of recently active namenode clients.
   */
  public int getNumActiveConnectionsRecently() {
    return connectionManager.getNumActiveConnectionsRecently();
  }

  /**
   * Total number of open connection pools to the NNs. Each connection pool
   * represents one user + one NN.
   *
   * @return Number of connection pools.
   */
  public int getNumConnectionPools() {
    return connectionManager.getNumConnectionPools();
  }

  /**
   * Number of connections between the router and the NNs that are still
   * being created.
   *
   * @return Number of connections waiting to be created.
   */
  public int getNumCreatingConnections() {
    return connectionManager.getNumCreatingConnections();
  }

  /**
   * JSON view of the connection pools, delegated to the connection manager.
   *
   * @return String representation of the JSON.
   */
  public String getJSON() {
    return connectionManager.getJSON();
  }

  /**
   * JSON view of the async caller thread pool. The object has three entries,
   * in this order: {@code active} (threads running a task), {@code total}
   * (current pool size) and {@code max} (maximum pool size).
   *
   * @return String representation of the JSON.
   */
  public String getAsyncCallerPoolJson() {
    // LinkedHashMap keeps the keys in insertion order in the output.
    final Map<String, Integer> poolStats = new LinkedHashMap<>();
    final int activeThreads = executorService.getActiveCount();
    poolStats.put("active", activeThreads);
    final int currentThreads = executorService.getPoolSize();
    poolStats.put("total", currentThreads);
    final int maxThreads = executorService.getMaximumPoolSize();
    poolStats.put("max", maxThreads);
    return JSON.toString(poolStats);
  }

  /**
   * JSON view of the rejected-permit counters, keyed by nameservice.
   *
   * @return String representation of the rejected permits for each nameservice.
   */
  public String getRejectedPermitsPerNsJSON() {
    final Map<String, LongAdder> rejected = rejectedPermitsPerNs;
    return JSON.toString(rejected);
  }

  /**
   * JSON view of the accepted-permit counters, keyed by nameservice.
   *
   * @return String representation of the accepted permits for each nameservice.
   */
  public String getAcceptedPermitsPerNsJSON() {
    final Map<String, LongAdder> accepted = acceptedPermitsPerNs;
    return JSON.toString(accepted);
  }
  /**
   * Obtain a pooled connection (ClientProtocol proxy) to a NameNode for the
   * given user. The ConnectionManager caches one pool per user + NN, which
   * avoids the high cost of building a proxy wrapper for every request; that
   * cache holds one entry per user per namenode and may need pruning.
   * <p>
   * Any failure while obtaining the connection is logged and reported to the
   * caller as a {@link ConnectionNullException}.
   *
   * @param ugi User group information.
   * @param nsId Nameservice identifier.
   * @param rpcAddress RPC server address of the NN.
   * @param proto Protocol of the connection.
   * @return ConnectionContext containing a ClientProtocol proxy client for the
   *         NN + current user.
   * @throws IOException If we cannot get a connection to the NameNode.
   */
  private ConnectionContext getConnection(UserGroupInformation ugi, String nsId,
      String rpcAddress, Class<?> proto) throws IOException {
    ConnectionContext context = null;
    try {
      // TODO Add tokens from the federated UGI
      final boolean viaProxyUser =
          UserGroupInformation.isSecurityEnabled() || this.enableProxyUser;
      UserGroupInformation effectiveUgi = ugi;
      if (viaProxyUser) {
        // The router's login user impersonates the calling user.
        final UserGroupInformation loginUser =
            UserGroupInformation.getLoginUser();
        effectiveUgi = UserGroupInformation.createProxyUser(
            ugi.getUserName(), loginUser);
      }
      context = this.connectionManager.getConnection(
          effectiveUgi, rpcAddress, proto, nsId);
      LOG.debug("User {} NN {} is using connection {}",
          ugi.getUserName(), rpcAddress, context);
    } catch (Exception ex) {
      LOG.error("Cannot open NN client to address: {}", rpcAddress, ex);
    }

    if (context != null) {
      return context;
    }
    throw new ConnectionNullException("Cannot get a connection to "
        + rpcAddress);
  }

  /**
   * Normalize any exception into an IOException.
   * <ul>
   * <li>Not an IOException: wrapped in a new IOException.
   * <li>RemoteException: unwrapped.
   * <li>Any other IOException: returned unchanged.
   * </ul>
   *
   * @param e Exception to convert into an exception.
   * @return Created IO exception.
   */
  private static IOException toIOException(Exception e) {
    if (!(e instanceof IOException)) {
      return new IOException(e);
    }
    if (e instanceof RemoteException) {
      final RemoteException remote = (RemoteException) e;
      return remote.unwrapRemoteException();
    }
    return (IOException) e;
  }

  /**
   * Decide whether the RPC call should be retried.
   * <p>
   * When the nameservice has no active namenode, exactly one retry is
   * allowed; after that a NoNamenodesAvailableException is raised. Otherwise
   * the decision is delegated to the configured retry policy.
   *
   * @param ioe IOException reported.
   * @param retryCount Number of retries.
   * @param nsId Nameservice ID.
   * @return Retry decision.
   * @throws NoNamenodesAvailableException Exception that the retry policy
   *         generates for no available namenodes.
   * @throws IOException If the retry policy itself fails.
   */
  private RetryDecision shouldRetry(final IOException ioe, final int retryCount,
      final String nsId) throws IOException {
    if (isClusterUnAvailable(nsId)) {
      // Cluster unavailable: only the very first attempt may be retried.
      if (retryCount != 0) {
        throw new NoNamenodesAvailableException(nsId, ioe);
      }
      return RetryDecision.RETRY;
    }

    try {
      return this.retryPolicy.shouldRetry(ioe, retryCount, 0, true).action;
    } catch (Exception ex) {
      LOG.error("Re-throwing API exception, no more retries", ex);
      throw toIOException(ex);
    }
  }

  /**
   * Invokes a method against the ClientProtocol proxy server. If a standby
   * exception is generated by the call to the client, retries using the
   * alternate server.
   *
   * Re-throws exceptions generated by the remote RPC call as either
   * RemoteException or IOException.
   *
   * The namenodes are tried in the given order. A namenode is skipped or
   * abandoned in favour of the next one when it is an observer and observers
   * are not (or no longer) to be used, when it answers with a
   * StandbyException, or when it is unavailable. Every other failure ends the
   * call immediately.
   *
   * @param ugi User group information.
   * @param namenodes A prioritized list of namenodes within the same
   *                  nameservice.
   * @param useObserver Whether to use observer namenodes.
   * @param protocol Protocol class used to obtain the connection.
   * @param method Remote ClientProtocol method to invoke.
   * @param params Variable list of parameters matching the method.
   * @return The result of invoking the method.
   * @throws ConnectException If it cannot connect to any Namenode.
   * @throws StandbyException If all Namenodes are in Standby.
   * @throws IOException If it cannot invoke the method.
   */
  @VisibleForTesting
  public Object invokeMethod(
      final UserGroupInformation ugi,
      final List<? extends FederationNamenodeContext> namenodes,
      boolean useObserver,
      final Class<?> protocol, final Method method, final Object... params)
          throws ConnectException, StandbyException, IOException {

    if (namenodes == null || namenodes.isEmpty()) {
      throw new IOException("No namenodes to invoke " + method.getName() +
          " with params " + Arrays.deepToString(params) + " from "
          + router.getRouterId());
    }

    // Propagate the real client's identity to the NN via the caller context.
    addClientInfoToCallerContext();

    Object ret = null;
    if (rpcMonitor != null) {
      rpcMonitor.proxyOp();
    }
    // Set once a non-observer namenode was found in standby or unreachable;
    // a later success then updates the resolver's view of the active NN.
    boolean failover = false;
    boolean shouldUseObserver = useObserver;
    // Failures per namenode, in the order tried, for the final error report.
    Map<FederationNamenodeContext, IOException> ioes = new LinkedHashMap<>();
    for (FederationNamenodeContext namenode : namenodes) {
      if (!shouldUseObserver && (namenode.getState() == FederationNamenodeServiceState.OBSERVER)) {
        continue;
      }
      ConnectionContext connection = null;
      String nsId = namenode.getNameserviceId();
      String rpcAddress = namenode.getRpcAddress();
      try {
        connection = this.getConnection(ugi, nsId, rpcAddress, protocol);
        ProxyAndInfo<?> client = connection.getClient();
        final Object proxy = client.getProxy();

        ret = invoke(nsId, 0, method, proxy, params);
        if (failover &&
            FederationNamenodeServiceState.OBSERVER != namenode.getState()) {
          // Success on alternate server, update
          InetSocketAddress address = client.getAddress();
          namenodeResolver.updateActiveNamenode(nsId, address);
        }
        if (this.rpcMonitor != null) {
          this.rpcMonitor.proxyOpComplete(true, nsId, namenode.getState());
        }
        if (this.router.getRouterClientMetrics() != null) {
          this.router.getRouterClientMetrics().incInvokedMethod(method);
        }
        return ret;
      } catch (IOException ioe) {
        ioes.put(namenode, ioe);
        if (ioe instanceof ObserverRetryOnActiveException) {
          // Stop using observers for the remaining namenodes of this call.
          LOG.info("Encountered ObserverRetryOnActiveException from {}."
                  + " Retry active namenode directly.", namenode);
          shouldUseObserver = false;
        } else if (ioe instanceof StandbyException) {
          // Fail over indicated by retry policy and/or NN
          if (this.rpcMonitor != null) {
            this.rpcMonitor.proxyOpFailureStandby(nsId);
          }
          failover = true;
        } else if (isUnavailableException(ioe)) {
          if (this.rpcMonitor != null) {
            this.rpcMonitor.proxyOpFailureCommunicate(nsId);
          }
          // An unreachable observer is reported to the resolver as
          // unavailable; any other unreachable NN triggers a failover.
          if (FederationNamenodeServiceState.OBSERVER == namenode.getState()) {
            namenodeResolver.updateUnavailableNamenode(nsId,
                NetUtils.createSocketAddr(namenode.getRpcAddress()));
          } else {
            failover = true;
          }
        } else if (ioe instanceof RemoteException) {
          // The NN answered, so the proxy operation is reported as completed
          // successfully even though the answer is an error.
          if (this.rpcMonitor != null) {
            this.rpcMonitor.proxyOpComplete(true, nsId, namenode.getState());
          }
          RemoteException re = (RemoteException) ioe;
          ioe = re.unwrapRemoteException();
          ioe = getCleanException(ioe);
          // RemoteException returned by NN
          throw ioe;
        } else if (ioe instanceof ConnectionNullException) {
          if (this.rpcMonitor != null) {
            this.rpcMonitor.proxyOpFailureCommunicate(nsId);
          }
          LOG.error("Get connection for {} {} error: {}", nsId, rpcAddress,
              ioe.getMessage());
          // Throw StandbyException so that client can retry
          StandbyException se = new StandbyException(ioe.getMessage());
          se.initCause(ioe);
          throw se;
        } else if (ioe instanceof NoNamenodesAvailableException) {
          if (this.rpcMonitor != null) {
            this.rpcMonitor.proxyOpNoNamenodes(nsId);
          }
          LOG.error("Cannot get available namenode for {} {} error: {}",
              nsId, rpcAddress, ioe.getMessage());
          // Throw RetriableException so that client can retry
          throw new RetriableException(ioe);
        } else {
          // Other communication error, this is a failure
          // Communication retries are handled by the retry policy
          if (this.rpcMonitor != null) {
            this.rpcMonitor.proxyOpFailureCommunicate(nsId);
            this.rpcMonitor.proxyOpComplete(false, nsId, namenode.getState());
          }
          throw ioe;
        }
      } finally {
        // Always hand the connection back, on success and on failure.
        if (connection != null) {
          connection.release();
        }
      }
    }
    if (this.rpcMonitor != null) {
      this.rpcMonitor.proxyOpComplete(false, null, null);
    }

    // All namenodes were unavailable or in standby
    String msg = "No namenode available to invoke " + method.getName() + " " +
        Arrays.deepToString(params) + " in " + namenodes + " from " +
        router.getRouterId();
    LOG.error(msg);
    // Count of namenodes that could not be reached. StandbyException is
    // tested first below, so standby namenodes are not counted here even
    // though isUnavailableException() also accepts StandbyException.
    int exConnect = 0;
    for (Entry<FederationNamenodeContext, IOException> entry :
        ioes.entrySet()) {
      FederationNamenodeContext namenode = entry.getKey();
      String nnKey = namenode.getNamenodeKey();
      String addr = namenode.getRpcAddress();
      IOException ioe = entry.getValue();
      if (ioe instanceof StandbyException) {
        LOG.error("{} at {} is in Standby: {}",
            nnKey, addr, ioe.getMessage());
      } else if (isUnavailableException(ioe)) {
        exConnect++;
        LOG.error("{} at {} cannot be reached: {}",
            nnKey, addr, ioe.getMessage());
      } else {
        LOG.error("{} at {} error: \"{}\"", nnKey, addr, ioe.getMessage());
      }
    }
    // Only when every recorded failure was a connectivity problem is a
    // ConnectException reported; otherwise the caller sees a StandbyException.
    if (exConnect == ioes.size()) {
      throw new ConnectException(msg);
    } else {
      throw new StandbyException(msg);
    }
  }

  /**
   * For tracking some information about the actual client.
   * It adds trace info "clientIp:ip", "clientPort:port",
   * "clientId:id" and "clientCallId:callId"
   * in the caller context, removing the old values if they were
   * already present.
   * <p>
   * The signature of the original caller context, if any, is carried over.
   * Fields of the original context are re-appended after the client info:
   * key/value fields only when their key is not already present, bare fields
   * unconditionally.
   */
  private void addClientInfoToCallerContext() {
    CallerContext ctx = CallerContext.getCurrent();
    String origContext = ctx == null ? null : ctx.getContext();
    byte[] origSignature = ctx == null ? null : ctx.getSignature();
    CallerContext.Builder builder =
        new CallerContext.Builder("", contextFieldSeparator)
            .append(CallerContext.CLIENT_IP_STR, Server.getRemoteAddress())
            .append(CallerContext.CLIENT_PORT_STR,
                Integer.toString(Server.getRemotePort()))
            .append(CallerContext.CLIENT_ID_STR,
                StringUtils.byteToHexString(Server.getClientId()))
            .append(CallerContext.CLIENT_CALL_ID_STR,
                Integer.toString(Server.getCallId()))
            .setSignature(origSignature);
    // Append the original caller context
    if (origContext != null) {
      // String.split() takes a regular expression, but the separator is a
      // literal, configurable string. Quote it so that separators such as
      // "|", "." or "$" split the context where intended.
      final String separatorRegex = Pattern.quote(contextFieldSeparator);
      for (String part : origContext.split(separatorRegex)) {
        String[] keyValue =
            part.split(CallerContext.Builder.KEY_VALUE_SEPARATOR, 2);
        if (keyValue.length == 2) {
          builder.appendIfAbsent(keyValue[0], keyValue[1]);
        } else if (keyValue.length == 1) {
          builder.append(keyValue[0]);
        }
      }
    }
    CallerContext.setCurrent(builder.build());
  }

  /**
   * Reflectively invoke a method on the given proxy object and translate the
   * outcome of the retry policy into a retry, a failover signal or an error.
   * <ul>
   * <li>RETRY: the call is repeated with an incremented retry count.
   * <li>FAILOVER_AND_RETRY: standby/unavailable exceptions are re-thrown
   * as-is; anything else is turned into a StandbyException so that the caller
   * fails over.
   * <li>Any other decision: the IOException is re-thrown.
   * </ul>
   * Reflection errors (illegal access or arguments) are logged and yield
   * {@code null}. A failure of the target method that is not an IOException
   * is wrapped in an IOException.
   *
   * @param nsId Identifier for the namespace
   * @param retryCount Current retry times
   * @param method Method to invoke
   * @param obj Target object for the method
   * @param params Variable parameters
   * @return Response from the remote server
   * @throws IOException If the remote call failed and is not retried.
   */
  private Object invoke(String nsId, int retryCount, final Method method,
      final Object obj, final Object... params) throws IOException {
    try {
      return method.invoke(obj, params);
    } catch (IllegalAccessException | IllegalArgumentException e) {
      LOG.error("Unexpected exception while proxying API", e);
      return null;
    } catch (InvocationTargetException e) {
      final Throwable cause = e.getCause();
      if (!(cause instanceof IOException)) {
        throw new IOException(e);
      }
      final IOException failure = (IOException) cause;

      // Ask the retry policy what to do with this failure.
      final RetryDecision decision = shouldRetry(failure, retryCount, nsId);

      if (decision == RetryDecision.RETRY) {
        if (this.rpcMonitor != null) {
          this.rpcMonitor.proxyOpRetries();
        }
        return invoke(nsId, retryCount + 1, method, obj, params);
      }

      if (decision == RetryDecision.FAILOVER_AND_RETRY) {
        // The caller looks for standby/unavailable exceptions to fail over.
        if (failure instanceof StandbyException
            || isUnavailableException(failure)) {
          throw failure;
        }
        throw new StandbyException(failure.getMessage());
      }

      throw failure;
    }
  }

  /**
   * Tell whether an exception indicates an unavailable subcluster: a connect
   * timeout, an EOF, a socket error, a standby namenode, or a
   * RetriableException caused by NoNamenodesAvailableException.
   *
   * @param ioe IOException to check.
   * @return If the exception comes from an unavailable subcluster.
   */
  public static boolean isUnavailableException(IOException ioe) {
    final boolean directlyUnavailable =
        ioe instanceof ConnectTimeoutException
            || ioe instanceof EOFException
            || ioe instanceof SocketException
            || ioe instanceof StandbyException;
    if (directlyUnavailable) {
      return true;
    }
    return ioe instanceof RetriableException
        && ioe.getCause() instanceof NoNamenodesAvailableException;
  }

  /**
   * Check if the cluster of the given nameservice id is unavailable, i.e.
   * the resolver knows of no namenode in ACTIVE state for it.
   *
   * @param nsId nameservice ID.
   * @return True if no active namenode is known for the nameservice, false
   *         as soon as one active namenode is found.
   * @throws IOException If the namenodes cannot be resolved.
   */
  private boolean isClusterUnAvailable(String nsId) throws IOException {
    final List<? extends FederationNamenodeContext> candidates =
        this.namenodeResolver.getNamenodesForNameserviceId(nsId, false);
    if (candidates == null) {
      return true;
    }

    for (final FederationNamenodeContext candidate : candidates) {
      // A single active NN is enough to consider the cluster available.
      if (FederationNamenodeServiceState.ACTIVE == candidate.getState()) {
        return false;
      }
    }
    return true;
  }

  /**
   * Get a clean copy of the exception. Sometimes the exceptions returned by the
   * server contain the full stack trace in the message.
   * <p>
   * If the message spans several lines, only the first line is kept as the
   * message and the remaining lines are parsed as stack trace elements, which
   * then replace the stack trace of the copy. The cause is carried over.
   * <p>
   * The copy has the same class as the input and is created through its
   * {@code (String)} constructor (or as a RemoteException with the same class
   * name). If that is not possible the input exception itself is returned,
   * with its stack trace replaced. A null message is left as it is.
   *
   * @param ioe Exception to clean up.
   * @return Copy of the original exception with a clean message.
   */
  private static IOException getCleanException(IOException ioe) {
    String msg = ioe.getMessage();
    final Throwable cause = ioe.getCause();
    StackTraceElement[] stackTrace = ioe.getStackTrace();

    // Clean the message by removing the stack trace. A null message has
    // nothing to clean and must not be dereferenced.
    if (msg != null && msg.indexOf('\n') > 0) {
      final String[] msgSplit = msg.split("\n");
      msg = msgSplit[0];

      // Parse stack trace from the message
      final List<StackTraceElement> elements = new ArrayList<>();
      for (int i = 1; i < msgSplit.length; i++) {
        final Matcher matcher = STACK_TRACE_PATTERN.matcher(msgSplit[i]);
        if (!matcher.find()) {
          continue;
        }
        int lineNumber;
        try {
          lineNumber = Integer.parseInt(matcher.group(4));
        } catch (NumberFormatException nfe) {
          // The pattern allows an empty or oversized number; a negative
          // value marks the line number as unavailable.
          lineNumber = -1;
        }
        elements.add(new StackTraceElement(
            matcher.group(1), matcher.group(2), matcher.group(3),
            lineNumber));
      }
      stackTrace = elements.toArray(new StackTraceElement[0]);
    }

    // Create the new output exception
    IOException ret;
    if (ioe instanceof RemoteException) {
      final RemoteException re = (RemoteException) ioe;
      ret = new RemoteException(re.getClassName(), msg);
    } else {
      // Try the simple constructor and initialize the fields
      final Class<? extends IOException> ioeClass = ioe.getClass();
      try {
        final Constructor<? extends IOException> constructor =
            ioeClass.getDeclaredConstructor(String.class);
        ret = constructor.newInstance(msg);
      } catch (ReflectiveOperationException e) {
        // If there are errors, just use the input one
        LOG.error("Could not create exception {}", ioeClass.getSimpleName(), e);
        ret = ioe;
      }
    }

    // The input exception already carries its cause; calling initCause() on
    // it again would throw IllegalStateException when a cause was set.
    if (ret != ioe) {
      ret.initCause(cause);
    }
    ret.setStackTrace(stackTrace);

    return ret;
  }

  /**
   * Invokes a ClientProtocol method. Determines the target nameservice via a
   * provided block.
   *
   * Re-throws exceptions generated by the remote RPC call as either
   * RemoteException or IOException.
   *
   * @param block Block used to determine appropriate nameservice.
   * @param method The remote method and parameters to invoke.
   * @return The result of invoking the method.
   * @throws IOException If the invoke generated an error.
   */
  public Object invokeSingle(final ExtendedBlock block, RemoteMethod method)
      throws IOException {
    // Only the block pool id of the block is needed; the block-pool based
    // entry point performs the actual invocation.
    return invokeSingleBlockPool(block.getBlockPoolId(), method);
  }

  /**
   * Invokes a ClientProtocol method. Determines the target nameservice using
   * the block pool id.
   *
   * Re-throws exceptions generated by the remote RPC call as either
   * RemoteException or IOException.
   *
   * @param bpId Block pool identifier.
   * @param method The remote method and parameters to invoke.
   * @return The result of invoking the method.
   * @throws IOException If the invoke generated an error.
   */
  public Object invokeSingleBlockPool(final String bpId, RemoteMethod method)
      throws IOException {
    // Translate the block pool id into its nameservice id and delegate to
    // the nameservice based invocation.
    return invokeSingle(getNameserviceForBlockPoolId(bpId), method);
  }

  /**
   * Invokes a ClientProtocol method against the specified namespace.
   *
   * Re-throws exceptions generated by the remote RPC call as either
   * RemoteException or IOException.
   *
   * @param nsId Target namespace for the method.
   * @param method The remote method and parameters to invoke.
   * @return The result of invoking the method.
   * @throws IOException If the invoke generated an error.
   */
  public Object invokeSingle(final String nsId, RemoteMethod method)
      throws IOException {
    final UserGroupInformation caller = RouterRpcServer.getRemoteUser();
    final RouterRpcFairnessPolicyController permitController =
        getRouterRpcFairnessPolicyController();

    // The permit is held for the whole invocation and always given back.
    acquirePermit(nsId, caller, method, permitController);
    try {
      final boolean observerRead =
          isObserverReadEligible(nsId, method.getMethod());
      final List<? extends FederationNamenodeContext> orderedNamenodes =
          getOrderedNamenodes(nsId, observerRead);
      // Location for this nameservice with "/" as both paths; it is only
      // used to resolve the parameters of the remote method.
      final RemoteLocationContext rootLocation =
          new RemoteLocation(nsId, "/", "/");
      return invokeMethod(caller, orderedNamenodes, observerRead,
          method.getProtocol(), method.getMethod(),
          method.getParams(rootLocation));
    } finally {
      releasePermit(nsId, caller, method, permitController);
    }
  }

  /**
   * Invokes a remote method against the specified namespace.
   *
   * Re-throws exceptions generated by the remote RPC call as either
   * RemoteException or IOException.
   *
   * @param <T> The type of the remote method return.
   * @param nsId Target namespace for the method.
   * @param method The remote method and parameters to invoke.
   * @param clazz Class for the return type.
   * @return The result of invoking the method.
   * @throws IOException If the invoke generated an error.
   */
  public <T> T invokeSingle(final String nsId, RemoteMethod method,
      Class<T> clazz) throws IOException {
    // clazz only fixes the compile-time type; the result is cast without
    // any runtime check against it.
    final Object untyped = invokeSingle(nsId, method);
    @SuppressWarnings("unchecked")
    final T typed = (T) untyped;
    return typed;
  }

  /**
   * Invokes a remote method against the specified extendedBlock.
   *
   * Re-throws exceptions generated by the remote RPC call as either
   * RemoteException or IOException.
   *
   * @param <T> The type of the remote method return.
   * @param extendedBlock Target extendedBlock for the method.
   * @param method The remote method and parameters to invoke.
   * @param clazz Class for the return type.
   * @return The result of invoking the method.
   * @throws IOException If the invoke generated an error.
   */
  public <T> T invokeSingle(final ExtendedBlock extendedBlock,
      RemoteMethod method, Class<T> clazz) throws IOException {
    // Find the nameservice that owns the block pool of this block.
    final String blockPoolId = extendedBlock.getBlockPoolId();
    final String nameserviceId = getNameserviceForBlockPoolId(blockPoolId);
    // clazz only fixes the compile-time type; no runtime check is done.
    final Object untyped = invokeSingle(nameserviceId, method);
    @SuppressWarnings("unchecked")
    final T typed = (T) untyped;
    return typed;
  }

  /**
   * Invokes a single proxy call for a single location.
   *
   * Re-throws exceptions generated by the remote RPC call as either
   * RemoteException or IOException.
   *
   * @param location RemoteLocation to invoke.
   * @param remoteMethod The remote method and parameters to invoke.
   * @return The result of invoking the method if successful.
   * @throws IOException If the invoke generated an error.
   */
  public <T> T invokeSingle(final RemoteLocationContext location,
      RemoteMethod remoteMethod, Class<T> clazz) throws IOException {
    // Reuse the sequential invocation with a list holding just this location.
    final List<RemoteLocationContext> onlyLocation =
        Collections.singletonList(location);
    final Object untyped = invokeSequential(onlyLocation, remoteMethod);
    @SuppressWarnings("unchecked")
    final T typed = (T) untyped;
    return typed;
  }

  /**
   * Invokes sequential proxy calls to different locations. Continues to invoke
   * calls until a call returns without throwing a remote exception.
   *
   * @param locations List of locations/nameservices to call sequentially, in list order.
   * @param remoteMethod The remote method and parameters to invoke.
   * @return The result of the first successful call, or if no calls are
   *         successful, the result of the first RPC call executed.
   * @throws IOException if the success condition is not met and one of the RPC
   *           calls generated a remote exception.
   */
  public <T> T invokeSequential(
      final List<? extends RemoteLocationContext> locations,
      final RemoteMethod remoteMethod) throws IOException {
    // No expected class and no expected value: both checks are skipped.
    final Class<T> anyResultClass = null;
    final Object anyResultValue = null;
    return invokeSequential(
        locations, remoteMethod, anyResultClass, anyResultValue);
  }

  /**
   * Invokes sequential proxy calls to different locations. Continues to invoke
   * calls until the success condition is met, or until all locations have been
   * attempted.
   *
   * The success condition may be specified by:
   * <ul>
   * <li>An expected result class
   * <li>An expected result value
   * </ul>
   *
   * If no expected result class/values are specified, the success condition is
   * a call that does not throw a remote exception.
   *
   * @param <T> The type of the remote method return.
   * @param locations List of locations/nameservices to call sequentially, in list order.
   * @param remoteMethod The remote method and parameters to invoke.
   * @param expectedResultClass In order to be considered a positive result, the
   *          return type must be of this class.
   * @param expectedResultValue In order to be considered a positive result, the
   *          return value must equal the value of this object.
   * @return The result of the first successful call, or if no calls are
   *         successful, the result of the first RPC call executed.
   * @throws IOException if the success condition is not met, return the first
   *                     remote exception generated.
   */
  public <T> T invokeSequential(
      final List<? extends RemoteLocationContext> locations,
      final RemoteMethod remoteMethod, Class<T> expectedResultClass,
      Object expectedResultValue) throws IOException {
    // Delegate to the variant that also reports the invoked location and
    // keep only the value it carries.
    final RemoteResult<?, ?> outcome = invokeSequential(
        remoteMethod, locations, expectedResultClass, expectedResultValue);
    @SuppressWarnings("unchecked")
    final T value = (T) outcome.getResult();
    return value;
  }

  /**
   * Invokes sequential proxy calls to different locations. Continues to invoke
   * calls until the success condition is met, or until all locations have been
   * attempted.
   *
   * The success condition may be specified by:
   * <ul>
   * <li>An expected result class
   * <li>An expected result value
   * </ul>
   *
   * If no expected result class/values are specified, the success condition is
   * a call that does not throw a remote exception.
   *
   * This returns RemoteResult, which contains the invoked location as well
   * as the result.
   *
   * @param <R> The type of the remote location.
   * @param <T> The type of the remote method return.
   * @param remoteMethod The remote method and parameters to invoke.
   * @param locations List of locations/nameservices to call sequentially, in list order.
   * @param expectedResultClass In order to be considered a positive result, the
   *          return type must be of this class.
   * @param expectedResultValue In order to be considered a positive result, the
   *          return value must equal the value of this object.
   * @return The result of the first successful call, or if no calls are
   *         successful, the result of the first RPC call executed, along with
   *         the invoked location in form of RemoteResult.
   * @throws IOException if the success condition is not met, return the first
   *                     remote exception generated.
   */
  public <R extends RemoteLocationContext, T> RemoteResult invokeSequential(
      final RemoteMethod remoteMethod, final List<R> locations,
      Class<T> expectedResultClass, Object expectedResultValue)
      throws IOException {

    RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
    final UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
    final Method m = remoteMethod.getMethod();
    // Exceptions are collected per location; one of them is re-thrown at the
    // end only if no location produced an acceptable result.
    List<IOException> thrownExceptions = new ArrayList<>();
    Object firstResult = null;
    // Invoke in priority order
    for (final RemoteLocationContext loc : locations) {
      String ns = loc.getNameserviceId();
      boolean isObserverRead = isObserverReadEligible(ns, m);
      List<? extends FederationNamenodeContext> namenodes =
          getOrderedNamenodes(ns, isObserverRead);
      // Acquire the permit right before the try block: anything that fails
      // between acquiring and entering the try would never reach the
      // finally clause and the permit would be leaked.
      acquirePermit(ns, ugi, remoteMethod, controller);
      try {
        Class<?> proto = remoteMethod.getProtocol();
        Object[] params = remoteMethod.getParams(loc);
        Object result = invokeMethod(
            ugi, namenodes, isObserverRead, proto, m, params);
        // Check if the result is what we expected
        if (isExpectedClass(expectedResultClass, result) &&
            isExpectedValue(expectedResultValue, result)) {
          // Valid result, stop here
          @SuppressWarnings("unchecked") R location = (R) loc;
          @SuppressWarnings("unchecked") T ret = (T) result;
          return new RemoteResult<>(location, ret);
        }
        // Remember the first non-matching result as the fallback return
        if (firstResult == null) {
          firstResult = result;
        }
      } catch (IOException ioe) {
        // Localize the exception
        ioe = processException(ioe, loc);

        // Record it and move on
        thrownExceptions.add(ioe);
      } catch (Exception e) {
        // Unusual error, ClientProtocol calls always use IOException (or
        // RemoteException). Re-wrap in IOException for compatibility with
        // ClientProtocol.
        LOG.error("Unexpected exception {} proxying {} to {}",
            e.getClass(), m.getName(), ns, e);
        IOException ioe = new IOException(
            "Unexpected exception proxying API " + e.getMessage(), e);
        thrownExceptions.add(ioe);
      } finally {
        releasePermit(ns, ugi, remoteMethod, controller);
      }
    }

    if (!thrownExceptions.isEmpty()) {
      // An unavailable subcluster may be the actual cause
      // We cannot surface other exceptions (e.g., FileNotFoundException)
      for (IOException ioe : thrownExceptions) {
        if (isUnavailableException(ioe)) {
          throw ioe;
        }
      }

      // re-throw the first exception thrown for compatibility
      throw thrownExceptions.get(0);
    }
    // Return the first result, whether it is the value or not
    @SuppressWarnings("unchecked") T ret = (T) firstResult;
    return new RemoteResult<>(locations.get(0), ret);
  }

  /**
   * Exception messages might contain local subcluster paths. This method
   * generates a new exception with the proper message.
   * @param ioe Original IOException.
   * @param loc Location we are processing.
   * @return Exception processed for federation.
   */
  private IOException processException(
      IOException ioe, RemoteLocationContext loc) {

    // Only these three exception types get their message rewritten; any
    // other exception is handed back untouched.
    final boolean isRemote = ioe instanceof RemoteException;
    final boolean isNotFound = ioe instanceof FileNotFoundException;
    final boolean isSnapshot = ioe instanceof SnapshotException;
    if (!isRemote && !isNotFound && !isSnapshot) {
      return ioe;
    }

    final String federatedMsg = processExceptionMsg(
        ioe.getMessage(), loc.getDest(), loc.getSrc());

    // Build a fresh exception of the matching type with the new message
    final IOException localized;
    if (isRemote) {
      localized = new RemoteException(
          ((RemoteException) ioe).getClassName(), federatedMsg);
    } else if (isNotFound) {
      localized = new FileNotFoundException(federatedMsg);
    } else {
      localized = new SnapshotException(federatedMsg);
    }

    // Keep the stack trace of the original exception
    localized.setStackTrace(ioe.getStackTrace());
    return localized;
  }

  /**
   * Process a subcluster message and make it federated.
   * @param msg Original exception message.
   * @param dst Path to look for in the message (the subcluster path).
   * @param src Path that replaces it in the message (the federated path).
   * @return Message processed for federation.
   */
  @VisibleForTesting
  static String processExceptionMsg(
      final String msg, final String dst, final String src) {
    // Nothing to rewrite without a message, when both paths are the same,
    // or when one of the paths is not absolute.
    if (msg == null || dst.equals(src)
        || !dst.startsWith("/") || !src.startsWith("/")) {
      return msg;
    }

    // Paths are matched literally: they may contain characters that are
    // special in regular expressions or in regex replacement strings.
    String newMsg = replaceFirstLiteral(msg, dst, src);

    // If the full path is not in the message, retry with shorter prefixes
    // obtained by dropping the same number of trailing characters from both
    // paths. Stop while both prefixes still keep at least their leading "/":
    // an empty prefix would prepend text to the message or delete from it.
    int minLen = Math.min(dst.length(), src.length());
    for (int i = 0; newMsg.equals(msg) && i < minLen - 1; i++) {
      // Check if we can replace sub folders
      String dst1 = dst.substring(0, dst.length() - 1 - i);
      String src1 = src.substring(0, src.length() - 1 - i);
      newMsg = replaceFirstLiteral(msg, dst1, src1);
    }

    return newMsg;
  }

  /**
   * Replace the first literal occurrence of a string.
   * @param text Text to search in.
   * @param target Exact text to look for (not a regular expression).
   * @param replacement Text inserted verbatim instead of the target.
   * @return Text with the first occurrence replaced, or the same text if the
   *         target does not occur.
   */
  private static String replaceFirstLiteral(
      final String text, final String target, final String replacement) {
    int start = text.indexOf(target);
    if (start < 0) {
      return text;
    }
    return text.substring(0, start) + replacement
        + text.substring(start + target.length());
  }

  /**
   * Checks if a result matches the required result class.
   *
   * @param expectedClass Required result class, null to skip the check.
   * @param clazz The result to check.
   * @return True if the result is an instance of the required class or if the
   *         expected class is null.
   */
  private static boolean isExpectedClass(Class<?> expectedClass, Object clazz) {
    // A null expected class disables the check. Class.isInstance already
    // answers false for a null candidate, so no separate null test is needed.
    return expectedClass == null || expectedClass.isInstance(clazz);
  }

  /**
   * Checks if a result matches the expected value.
   *
   * @param expectedValue The expected value, null to skip the check.
   * @param value The result to check.
   * @return True if the result is equals to the expected value or if the
   *         expected value is null.
   */
  private static boolean isExpectedValue(Object expectedValue, Object value) {
    // A null expected value disables the check
    if (expectedValue == null) {
      return true;
    }
    // A missing result never matches a concrete expectation
    return value != null && value.equals(expectedValue);
  }

  /**
   * Invoke method in all locations and return success if any succeeds.
   *
   * @param <T> The type of the remote location.
   * @param locations List of remote locations to call concurrently.
   * @param method The remote method and parameters to invoke.
   * @return If the call succeeds in any location.
   * @throws IOException If any of the calls return an exception.
   */
  public <T extends RemoteLocationContext> boolean invokeAll(
      final Collection<T> locations, final RemoteMethod method)
      throws IOException {
    // Not every location has to answer and standby namenodes are not used
    final boolean requireResponse = false;
    final boolean standby = false;
    final Map<T, Boolean> outcomes = invokeConcurrent(
        locations, method, requireResponse, standby, Boolean.class);
    // Success as soon as one location reported true
    return outcomes.containsValue(Boolean.TRUE);
  }

  /**
   * Invoke multiple concurrent proxy calls to different clients. Returns an
   * array of results.
   *
   * Re-throws exceptions generated by the remote RPC call as either
   * RemoteException or IOException.
   *
   * @param <T> The type of the remote location.
   * @param <R> The type of the remote method return.
   * @param locations List of remote locations to call concurrently.
   * @param method The remote method and parameters to invoke.
   * @throws IOException If all the calls throw an exception.
   */
  public <T extends RemoteLocationContext, R> void invokeConcurrent(
      final Collection<T> locations, final RemoteMethod method)
          throws IOException {
    // The per-location results are not needed by the caller
    final Class<Void> noResult = void.class;
    invokeConcurrent(locations, method, noResult);
  }

  /**
   * Invoke multiple concurrent proxy calls to different clients. Returns an
   * array of results.
   *
   * Re-throws exceptions generated by the remote RPC call as either
   * RemoteException or IOException.
   *
   * @param <T> The type of the remote location.
   * @param <R> The type of the remote method return.
   * @param locations List of remote locations to call concurrently.
   * @param method The remote method and parameters to invoke.
   * @return Result of invoking the method per subcluster: nsId to result.
   * @throws IOException If all the calls throw an exception.
   */
  public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(
      final Collection<T> locations, final RemoteMethod method, Class<R> clazz)
          throws IOException {
    // Defaults: a response from every location is not required and the
    // standby namenodes are not contacted.
    final boolean requireResponse = false;
    final boolean standby = false;
    return invokeConcurrent(
        locations, method, requireResponse, standby, clazz);
  }

  /**
   * Invoke multiple concurrent proxy calls to different clients. Returns an
   * array of results.
   *
   * Re-throws exceptions generated by the remote RPC call as either
   * RemoteException or IOException.
   *
   * @param <T> The type of the remote location.
   * @param <R> The type of the remote method return.
   * @param locations List of remote locations to call concurrently.
   * @param method The remote method and parameters to invoke.
   * @param requireResponse If true an exception will be thrown if all calls do
   *          not complete. If false exceptions are ignored and all data results
   *          successfully received are returned.
   * @param standby If the requests should go to the standby namenodes too.
   * @throws IOException If all the calls throw an exception.
   */
  public <T extends RemoteLocationContext, R> void invokeConcurrent(
      final Collection<T> locations, final RemoteMethod method,
      boolean requireResponse, boolean standby) throws IOException {
    // The per-location results are not needed by the caller
    final Class<Void> noResult = void.class;
    invokeConcurrent(locations, method, requireResponse, standby, noResult);
  }

  /**
   * Invokes multiple concurrent proxy calls to different clients. Returns an
   * array of results.
   *
   * Re-throws exceptions generated by the remote RPC call as either
   * RemoteException or IOException.
   *
   * @param <T> The type of the remote location.
   * @param <R> The type of the remote method return.
   * @param locations List of remote locations to call concurrently.
   * @param method The remote method and parameters to invoke.
   * @param requireResponse If true an exception will be thrown if all calls do
   *          not complete. If false exceptions are ignored and all data results
   *          successfully received are returned.
   * @param standby If the requests should go to the standby namenodes too.
   * @param clazz Type of the remote return type.
   * @return Result of invoking the method per subcluster: nsId to result.
   * @throws IOException If requiredResponse=true and any of the calls throw an
   *           exception.
   */
  public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(
      final Collection<T> locations, final RemoteMethod method,
      boolean requireResponse, boolean standby, Class<R> clazz)
          throws IOException {
    // A negative timeout is passed on, i.e. no per-call timeout is requested
    final long noTimeoutMs = -1;
    return invokeConcurrent(
        locations, method, requireResponse, standby, noTimeoutMs, clazz);
  }

  /**
   * Invokes multiple concurrent proxy calls to different clients. Returns an
   * array of results.
   *
   * Re-throws exceptions generated by the remote RPC call as either
   * RemoteException or IOException.
   *
   * @param <T> The type of the remote location.
   * @param <R> The type of the remote method return.
   * @param locations List of remote locations to call concurrently.
   * @param method The remote method and parameters to invoke.
   * @param requireResponse If true an exception will be thrown if all calls do
   *          not complete. If false exceptions are ignored and all data results
   *          successfully received are returned.
   * @param standby If the requests should go to the standby namenodes too.
   * @param timeOutMs Timeout for each individual call.
   * @param clazz Type of the remote return type.
   * @return Result of invoking the method per subcluster: nsId to result.
   * @throws IOException If requiredResponse=true and any of the calls throw an
   *           exception.
   */
  public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(
      final Collection<T> locations, final RemoteMethod method,
      boolean requireResponse, boolean standby, long timeOutMs, Class<R> clazz)
          throws IOException {
    final List<RemoteResult<T, R>> results = invokeConcurrent(
        locations, method, standby, timeOutMs, clazz);

    // Go over the results and exceptions
    final Map<T, R> ret = new TreeMap<>();
    final List<IOException> thrownExceptions = new ArrayList<>();
    IOException firstUnavailableException = null;
    for (final RemoteResult<T, R> result : results) {
      if (result.hasException()) {
        IOException ioe = result.getException();
        thrownExceptions.add(ioe);
        // Track the first unavailable exception to throw it with priority.
        // Later matches must not overwrite it, otherwise the last one
        // would be reported instead of the first.
        if (firstUnavailableException == null
            && isUnavailableException(ioe)) {
          firstUnavailableException = ioe;
        }
      }
      if (result.hasResult()) {
        ret.put(result.getLocation(), result.getResult());
      }
    }

    // Throw if there were failures and either a response from all servers
    // is required or there are no results at all
    if (!thrownExceptions.isEmpty() && (requireResponse || ret.isEmpty())) {
      // Unavailable exceptions take precedence over the first failure
      if (firstUnavailableException != null) {
        throw firstUnavailableException;
      }
      throw thrownExceptions.get(0);
    }

    return ret;
  }

  /**
   * Invokes multiple concurrent proxy calls to different clients. Returns an
   * array of results.
   *
   * Re-throws exceptions generated by the remote RPC call as either
   * RemoteException or IOException.
   *
   * @param <T> The type of the remote location.
   * @param <R> The type of the remote method return
   * @param locations List of remote locations to call concurrently.
   * @param method The remote method and parameters to invoke.
   * @param standby If the requests should go to the standby namenodes too.
   * @param timeOutMs Timeout for each individual call.
   * @param clazz Type of the remote return type.
   * @return Result of invoking the method per subcluster (list of results).
   *         This includes the exception for each remote location.
   * @throws IOException If there are errors invoking the method.
   */
  @SuppressWarnings("unchecked")
  public <T extends RemoteLocationContext, R> List<RemoteResult<T, R>>
      invokeConcurrent(final Collection<T> locations,
          final RemoteMethod method, boolean standby, long timeOutMs,
          Class<R> clazz) throws IOException {

    final UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
    final Method m = method.getMethod();

    if (locations.isEmpty()) {
      throw new IOException("No remote locations available");
    } else if (locations.size() == 1 && timeOutMs <= 0) {
      // Shortcut, just one call
      T location = locations.iterator().next();
      String ns = location.getNameserviceId();
      boolean isObserverRead = isObserverReadEligible(ns, m);
      final List<? extends FederationNamenodeContext> namenodes =
          getOrderedNamenodes(ns, isObserverRead);
      // Acquire the permit right before the try block: anything that fails
      // between acquiring and entering the try would never reach the
      // finally clause and the permit would be leaked.
      RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
      acquirePermit(ns, ugi, method, controller);
      try {
        Class<?> proto = method.getProtocol();
        Object[] paramList = method.getParams(location);
        R result = (R) invokeMethod(
            ugi, namenodes, isObserverRead, proto, m, paramList);
        RemoteResult<T, R> remoteResult = new RemoteResult<>(location, result);
        return Collections.singletonList(remoteResult);
      } catch (IOException ioe) {
        // Localize the exception
        throw processException(ioe, location);
      } finally {
        releasePermit(ns, ugi, method, controller);
      }
    }

    // orderedLocations.get(i) is the location served by callables.get(i)
    List<T> orderedLocations = new ArrayList<>();
    List<Callable<Object>> callables = new ArrayList<>();
    // transfer originCall & callerContext to worker threads of executor.
    final Call originCall = Server.getCurCall().get();
    final CallerContext originContext = CallerContext.getCurrent();
    for (final T location : locations) {
      String nsId = location.getNameserviceId();
      boolean isObserverRead = isObserverReadEligible(nsId, m);
      final List<? extends FederationNamenodeContext> namenodes =
          getOrderedNamenodes(nsId, isObserverRead);
      final Class<?> proto = method.getProtocol();
      final Object[] paramList = method.getParams(location);
      if (standby) {
        // Call the objectGetter to all NNs (including standby)
        for (final FederationNamenodeContext nn : namenodes) {
          String nnId = nn.getNamenodeId();
          final List<FederationNamenodeContext> nnList =
              Collections.singletonList(nn);
          T nnLocation = location;
          if (location instanceof RemoteLocation) {
            nnLocation = (T)new RemoteLocation(nsId, nnId, location.getDest());
          }
          orderedLocations.add(nnLocation);
          callables.add(
              () -> {
                transferThreadLocalContext(originCall, originContext);
                return invokeMethod(
                    ugi, nnList, isObserverRead, proto, m, paramList);
              });
        }
      } else {
        // Call the objectGetter in order of nameservices in the NS list
        orderedLocations.add(location);
        callables.add(
            () -> {
              transferThreadLocalContext(originCall, originContext);
              return invokeMethod(
                  ugi, namenodes, isObserverRead, proto, m, paramList);
            });
      }
    }

    if (rpcMonitor != null) {
      rpcMonitor.proxyOp();
    }
    if (this.router.getRouterClientMetrics() != null) {
      this.router.getRouterClientMetrics().incInvokedConcurrent(m);
    }

    // A single permit on the CONCURRENT_NS key covers the whole fan-out
    RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
    acquirePermit(CONCURRENT_NS, ugi, method, controller);
    try {
      List<Future<Object>> futures = null;
      if (timeOutMs > 0) {
        futures = executorService.invokeAll(
            callables, timeOutMs, TimeUnit.MILLISECONDS);
      } else {
        futures = executorService.invokeAll(callables);
      }
      List<RemoteResult<T, R>> results = new ArrayList<>();
      for (int i=0; i<futures.size(); i++) {
        T location = orderedLocations.get(i);
        try {
          Future<Object> future = futures.get(i);
          R result = (R) future.get();
          results.add(new RemoteResult<>(location, result));
        } catch (CancellationException ce) {
          // Reported here as a timeout of the call to this location
          String msg = "Invocation to \"" + location + "\" for \""
              + method.getMethodName() + "\" timed out";
          LOG.error(msg);
          IOException ioe = new SubClusterTimeoutException(msg);
          results.add(new RemoteResult<>(location, ioe));
        } catch (ExecutionException ex) {
          Throwable cause = ex.getCause();
          LOG.debug("Cannot execute {} in {}: {}",
              m.getName(), location, cause.getMessage());

          // Convert into IOException if needed
          IOException ioe = null;
          if (cause instanceof IOException) {
            ioe = (IOException) cause;
          } else {
            ioe = new IOException("Unhandled exception while proxying API " +
                m.getName() + ": " + cause.getMessage(), cause);
          }

          // Store the exceptions
          results.add(new RemoteResult<>(location, ioe));
        }
      }

      return results;
    } catch (RejectedExecutionException e) {
      if (rpcMonitor != null) {
        rpcMonitor.proxyOpFailureClientOverloaded();
      }
      int active = executorService.getActiveCount();
      int total = executorService.getMaximumPoolSize();
      String msg = "Not enough client threads " + active + "/" + total;
      LOG.error(msg);
      throw new StandbyException(
          "Router " + router.getRouterId() + " is overloaded: " + msg);
    } catch (InterruptedException ex) {
      LOG.error("Unexpected error while invoking API: {}", ex.getMessage());
      throw new IOException(
          "Unexpected error while invoking API " + ex.getMessage(), ex);
    } finally {
      releasePermit(CONCURRENT_NS, ugi, method, controller);
    }
  }

  /**
   * Transfer origin thread local context which is necessary to current
   * worker thread when invoking method concurrently by executor service.
   *
   * @param originCall origin Call required for getting remote client ip.
   * @param originContext origin CallerContext which should be transferred
   *                      to server side.
   */
  private void transferThreadLocalContext(
      final Call originCall, final CallerContext originContext) {
    // Install the call of the originating thread as the current call of
    // the thread executing this method.
    Server.getCurCall().set(originCall);
    // Do the same for the caller context.
    CallerContext.setCurrent(originContext);
  }

  /**
   * Get a prioritized list of NNs that share the same block pool ID (in the
   * same namespace). NNs that are reported as ACTIVE will be first in the list.
   *
   * @param bpId The blockpool ID for the namespace.
   * @return A prioritized list of NNs to use for communication.
   * @throws IOException If a NN cannot be located for the block pool ID.
   */
  private List<? extends FederationNamenodeContext> getNamenodesForBlockPoolId(
      final String bpId) throws IOException {

    final List<? extends FederationNamenodeContext> resolved =
        namenodeResolver.getNamenodesForBlockPoolId(bpId);

    // A null or empty answer from the resolver is reported as an error
    final boolean found = resolved != null && !resolved.isEmpty();
    if (found) {
      return resolved;
    }
    throw new IOException("Cannot locate a registered namenode for " + bpId +
        " from " + router.getRouterId());
  }

  /**
   * Get the nameservice identifier for a block pool.
   *
   * @param bpId Identifier of the block pool.
   * @return Nameservice identifier.
   * @throws IOException If a NN cannot be located for the block pool ID.
   */
  private String getNameserviceForBlockPoolId(final String bpId)
      throws IOException {
    // The lookup never returns an empty list (it throws instead), so the
    // first entry is always present; its nameservice id is the answer.
    return getNamenodesForBlockPoolId(bpId).get(0).getNameserviceId();
  }

  /**
   * Acquire permit to continue processing the request for specific nsId.
   *
   * @param nsId Identifier of the nameservice.
   * @param ugi UserGroupInformation associated with the user.
   * @param m Remote method that needs to be invoked.
   * @param controller fairness policy controller to acquire permit from
   * @throws IOException If permit could not be acquired for the nsId.
   */
  private void acquirePermit(final String nsId, final UserGroupInformation ugi,
      final RemoteMethod m, RouterRpcFairnessPolicyController controller)
      throws IOException {
    // Without a controller there is nothing to acquire
    if (controller == null) {
      return;
    }

    // Granted: just account for it
    if (controller.acquirePermit(nsId)) {
      incrAcceptedPermitForNs(nsId);
      return;
    }

    // Denied: update the metrics and throw StandbyException so that
    // clients could fail over and try another router.
    if (rpcMonitor != null) {
      rpcMonitor.getRPCMetrics().incrProxyOpPermitRejected();
    }
    incrRejectedPermitForNs(nsId);
    LOG.debug("Permit denied for ugi: {} for method: {}",
        ugi, m.getMethodName());
    throw new StandbyException(
        "Router " + router.getRouterId() +
            " is overloaded for NS: " + nsId);
  }

  /**
   * Give back the permit held for a nsId once processing against the
   * downstream nameservice is completed.
   *
   * <p>Does nothing when no controller is supplied.
   *
   * @param nsId Identifier of the nameservice the permit was acquired for.
   * @param ugi UserGroupInformation associated with the user.
   * @param m Remote method that needs to be invoked.
   * @param controller fairness policy controller to release permit from
   */
  private void releasePermit(final String nsId, final UserGroupInformation ugi,
      final RemoteMethod m, RouterRpcFairnessPolicyController controller) {
    if (controller == null) {
      return;
    }
    controller.releasePermit(nsId);
    LOG.trace("Permit released for ugi: {} for method: {}", ugi,
        m.getMethodName());
  }

  /**
   * Get the fairness policy controller currently held by this client.
   *
   * @return The current controller. NOTE(review): other methods in this class
   *         null-check this field, so callers should presumably expect null.
   */
  public RouterRpcFairnessPolicyController
      getRouterRpcFairnessPolicyController() {
    return this.routerRpcFairnessPolicyController;
  }

  /**
   * Add one to the rejected-permit counter of a nameservice, creating the
   * counter the first time the nameservice is seen.
   *
   * @param ns Nameservice identifier.
   */
  private void incrRejectedPermitForNs(String ns) {
    final LongAdder rejected =
        rejectedPermitsPerNs.computeIfAbsent(ns, unused -> new LongAdder());
    rejected.increment();
  }

  /**
   * Get the number of permits rejected so far for a nameservice.
   *
   * @param ns Nameservice identifier.
   * @return Current value of the rejected-permit counter, or 0 if no
   *         rejection has been recorded for the nameservice.
   */
  public Long getRejectedPermitForNs(String ns) {
    // Single lookup instead of containsKey() followed by get(): the pair is
    // a non-atomic check-then-act and hashes the key twice.
    final LongAdder rejected = rejectedPermitsPerNs.get(ns);
    return rejected == null ? 0L : rejected.longValue();
  }

  /**
   * Add one to the accepted-permit counter of a nameservice, creating the
   * counter the first time the nameservice is seen.
   *
   * @param ns Nameservice identifier.
   */
  private void incrAcceptedPermitForNs(String ns) {
    final LongAdder accepted =
        acceptedPermitsPerNs.computeIfAbsent(ns, unused -> new LongAdder());
    accepted.increment();
  }

  /**
   * Get the number of permits accepted so far for a nameservice.
   *
   * @param ns Nameservice identifier.
   * @return Current value of the accepted-permit counter, or 0 if no
   *         acceptance has been recorded for the nameservice.
   */
  public Long getAcceptedPermitForNs(String ns) {
    // Single lookup instead of containsKey() followed by get(): the pair is
    // a non-atomic check-then-act and hashes the key twice.
    final LongAdder accepted = acceptedPermitsPerNs.get(ns);
    return accepted == null ? 0L : accepted.longValue();
  }

  /**
   * Try to replace the fairness policy controller with one built from the
   * given configuration and report which controller is in effect afterwards.
   *
   * <p>If building the new controller throws a RuntimeException, the failure
   * is logged and the current controller is kept. If a new controller is
   * obtained, the current one (if any) is shut down before the new one is
   * installed.
   *
   * @param conf Configuration
   * @return Class name of the new controller if the refresh succeeded, else
   *         the class name of the controller that is still in place.
   */
  public synchronized String refreshFairnessPolicyController(Configuration conf) {
    // Stays null when creation fails, which is handled like "nothing to install".
    RouterRpcFairnessPolicyController replacement = null;
    try {
      replacement = FederationUtil.newFairnessPolicyController(conf);
    } catch (RuntimeException e) {
      LOG.error("Failed to create router fairness policy controller", e);
    }

    if (replacement != null) {
      final RouterRpcFairnessPolicyController previous = routerRpcFairnessPolicyController;
      if (previous != null) {
        previous.shutdown();
      }
      routerRpcFairnessPolicyController = replacement;
    }
    return getCurrentFairnessPolicyControllerClassName();
  }

  /**
   * Get the canonical class name of the fairness policy controller in use.
   *
   * @return Canonical class name of the current controller, or null when no
   *         controller is set.
   */
  private String getCurrentFairnessPolicyControllerClassName() {
    final RouterRpcFairnessPolicyController current = routerRpcFairnessPolicyController;
    return current == null ? null : current.getClass().getCanonicalName();
  }

  /**
   * Get a prioritized list of NNs that share the same nameservice ID (in the
   * same namespace).
   * In observer read case, OBSERVER NNs will be first in the list.
   * Otherwise, ACTIVE NNs will be first in the list.
   *
   * <p>The observer ordering is only requested from the resolver when the
   * client state ID of the current call for this nameservice is greater than
   * {@code Long.MIN_VALUE}; otherwise the resolver is asked for the
   * non-observer ordering regardless of {@code isObserverRead}.
   *
   * @param nsId The nameservice ID for the namespace.
   * @param isObserverRead Read on observer namenode.
   * @return A prioritized list of NNs to use for communication.
   * @throws IOException If a NN cannot be located for the nameservice ID.
   */
  private List<? extends FederationNamenodeContext> getOrderedNamenodes(String nsId,
      boolean isObserverRead) throws IOException {
    // NOTE(review): Long.MIN_VALUE presumably marks "no state ID sent by the
    // client" - confirm against RouterStateIdContext.
    final boolean hasClientStateId =
        RouterStateIdContext.getClientStateIdFromCurrentCall(nsId) > Long.MIN_VALUE;
    final boolean observerFirst = hasClientStateId && isObserverRead;

    final List<? extends FederationNamenodeContext> ordered =
        namenodeResolver.getNamenodesForNameserviceId(nsId, observerFirst);
    if (ordered != null && !ordered.isEmpty()) {
      return ordered;
    }
    throw new IOException("Cannot locate a registered namenode for " + nsId +
        " from " + router.getRouterId());
  }

  /**
   * Decide whether a call to a nameservice may be served as an observer read.
   *
   * <p>Observer reads count as enabled for the nameservice when the default
   * setting and its membership in the overrides differ, i.e. an override
   * entry flips the default. The method must additionally be a read call as
   * determined by {@code isReadCall}.
   *
   * @param nsId Nameservice identifier.
   * @param method Method being invoked.
   * @return true if observer reads are enabled for the nameservice and the
   *         method is a read call.
   */
  private boolean isObserverReadEligible(String nsId, Method method) {
    final boolean enabledForNs =
        observerReadEnabledDefault != observerReadEnabledOverrides.contains(nsId);
    if (!enabledForNs) {
      return false;
    }
    return isReadCall(method);
  }

  /**
   * Check if a method is read-only.
   *
   * <p>A method qualifies when it carries the {@code ReadOnly} annotation and
   * that annotation's {@code activeOnly} flag is false.
   *
   * @param method Method to inspect.
   * @return whether the 'method' is a read-only operation.
   */
  private static boolean isReadCall(Method method) {
    final ReadOnly readOnly = method.getAnnotation(ReadOnly.class);
    return readOnly != null && !readOnly.activeOnly();
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop ConnectionContext 源码

hadoop ConnectionManager 源码

hadoop ConnectionNullException 源码

hadoop ConnectionPool 源码

hadoop ConnectionPoolId 源码

hadoop DFSRouter 源码

hadoop ErasureCoding 源码

hadoop FederationConnectionId 源码

hadoop FederationUtil 源码

hadoop IsRouterActiveServlet 源码

0  赞