hadoop RetryInvocationHandler 源码

  • 2022-10-20
  • 浏览 (450)

hadoop RetryInvocationHandler 代码


/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.io.retry;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.retry.FailoverProxyProvider.ProxyInfo;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
import org.apache.hadoop.ipc.*;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;

/**
 * A {@link RpcInvocationHandler} which supports client-side retry.
 */
public class RetryInvocationHandler<T> implements RpcInvocationHandler {
  public static final Logger LOG = LoggerFactory.getLogger(

  public static final ThreadLocal<Boolean> SET_CALL_ID_FOR_TEST =
      ThreadLocal.withInitial(() -> true);

  static class Call {
    private final Method method;
    private final Object[] args;
    private final boolean isRpc;
    private final int callId;
    private final Counters counters = new Counters();

    private final RetryPolicy retryPolicy;
    private final RetryInvocationHandler<?> retryInvocationHandler;

    private RetryInfo retryInfo;

    Call(Method method, Object[] args, boolean isRpc, int callId,
         RetryInvocationHandler<?> retryInvocationHandler) {
      this.method = method;
      this.args = args;
      this.isRpc = isRpc;
      this.callId = callId;

      this.retryPolicy = retryInvocationHandler.getRetryPolicy(method);
      this.retryInvocationHandler = retryInvocationHandler;

    int getCallId() {
      return callId;

    Counters getCounters() {
      return counters;

    synchronized Long getWaitTime(final long now) {
      return retryInfo == null? null: retryInfo.retryTime - now;

    /** Invoke the call once without retrying. */
    synchronized CallReturn invokeOnce() {
      try {
        if (retryInfo != null) {
          return processWaitTimeAndRetryInfo();

        // The number of times this invocation handler has ever been failed over
        // before this method invocation attempt. Used to prevent concurrent
        // failed method invocations from triggering multiple failover attempts.
        final long failoverCount = retryInvocationHandler.getFailoverCount();
        try {
          return invoke();
        } catch (Exception e) {
          if (LOG.isTraceEnabled()) {
            LOG.trace(toString(), e);
          if (Thread.currentThread().isInterrupted()) {
            // If interrupted, do not retry.
            throw e;

          retryInfo = retryInvocationHandler.handleException(
              method, callId, retryPolicy, counters, failoverCount, e);
          return processWaitTimeAndRetryInfo();
      } catch(Throwable t) {
        return new CallReturn(t);

     * It first processes the wait time, if there is any,
     * and then invokes {@link #processRetryInfo()}.
     * If the wait time is positive, it either sleeps for synchronous calls
     * or immediately returns for asynchronous calls.
     * @return {@link CallReturn#RETRY} if the retryInfo is processed;
     *         otherwise, return {@link CallReturn#WAIT_RETRY}.
    CallReturn processWaitTimeAndRetryInfo() throws InterruptedIOException {
      final Long waitTime = getWaitTime(Time.monotonicNow());
      LOG.trace("#{} processRetryInfo: retryInfo={}, waitTime={}",
          callId, retryInfo, waitTime);
      if (waitTime != null && waitTime > 0) {
        try {
        } catch (InterruptedException e) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Interrupted while waiting to retry", e);
          InterruptedIOException intIOE = new InterruptedIOException(
              "Retry interrupted");
          throw intIOE;
      return CallReturn.RETRY;

    synchronized void processRetryInfo() {
      if (retryInfo.isFailover()) {
            retryInfo.expectedFailoverCount, method, callId);
      retryInfo = null;

    CallReturn invoke() throws Throwable {
      return new CallReturn(invokeMethod());

    Object invokeMethod() throws Throwable {
      if (isRpc && SET_CALL_ID_FOR_TEST.get()) {
        Client.setCallIdAndRetryCount(callId, counters.retries,
      return retryInvocationHandler.invokeMethod(method, args);

    public String toString() {
      return getClass().getSimpleName() + "#" + callId + ": "
          + method.getDeclaringClass().getSimpleName() + "." + method.getName()
          + "(" + (args == null || args.length == 0? "": Arrays.toString(args))
          +  ")";

  static class Counters {
    /** Counter for retries. */
    private int retries;
    /** Counter for method invocation has been failed over. */
    private int failovers;

    boolean isZeros() {
      return retries == 0 && failovers == 0;

  private static class ProxyDescriptor<T> {
    private final FailoverProxyProvider<T> fpp;
    /** Count the associated proxy provider has ever been failed over. */
    private long failoverCount = 0;

    private ProxyInfo<T> proxyInfo;

    ProxyDescriptor(FailoverProxyProvider<T> fpp) {
      this.fpp = fpp;
      this.proxyInfo = fpp.getProxy();

    synchronized ProxyInfo<T> getProxyInfo() {
      return proxyInfo;

    synchronized T getProxy() {
      return proxyInfo.proxy;

    synchronized long getFailoverCount() {
      return failoverCount;

    synchronized void failover(long expectedFailoverCount, Method method,
                               int callId) {
      // Make sure that concurrent failed invocations only cause a single
      // actual failover.
      if (failoverCount == expectedFailoverCount) {
      } else {
        LOG.warn("A failover has occurred since the start of call #" + callId
            + " " + proxyInfo.getString(method.getName()));
      proxyInfo = fpp.getProxy();

    boolean idempotentOrAtMostOnce(Method method) throws NoSuchMethodException {
      final Method m = fpp.getInterface()
          .getMethod(method.getName(), method.getParameterTypes());
      return m.isAnnotationPresent(Idempotent.class)
          || m.isAnnotationPresent(AtMostOnce.class);

    void close() throws IOException {

  private static class RetryInfo {
    private final long retryTime;
    private final long delay;
    private final RetryAction action;
    private final long expectedFailoverCount;
    private final Exception failException;

    RetryInfo(long delay, RetryAction action, long expectedFailoverCount,
        Exception failException) {
      this.delay = delay;
      this.retryTime = Time.monotonicNow() + delay;
      this.action = action;
      this.expectedFailoverCount = expectedFailoverCount;
      this.failException = failException;

    boolean isFailover() {
      return action != null
          && action.action ==  RetryAction.RetryDecision.FAILOVER_AND_RETRY;

    boolean isFail() {
      return action != null
          && action.action ==  RetryAction.RetryDecision.FAIL;

    Exception getFailException() {
      return failException;

    static RetryInfo newRetryInfo(RetryPolicy policy, Exception e,
        Counters counters, boolean idempotentOrAtMostOnce,
        long expectedFailoverCount) throws Exception {
      RetryAction max = null;
      long maxRetryDelay = 0;
      Exception ex = null;

      final Iterable<Exception> exceptions = e instanceof MultiException ?
          ((MultiException) e).getExceptions().values()
          : Collections.singletonList(e);
      for (Exception exception : exceptions) {
        final RetryAction a = policy.shouldRetry(exception,
            counters.retries, counters.failovers, idempotentOrAtMostOnce);
        if (a.action != RetryAction.RetryDecision.FAIL) {
          // must be a retry or failover
          if (a.delayMillis > maxRetryDelay) {
            maxRetryDelay = a.delayMillis;

        if (max == null || max.action.compareTo(a.action) < 0) {
          max = a;
          if (a.action == RetryAction.RetryDecision.FAIL) {
            ex = exception;

      return new RetryInfo(maxRetryDelay, max, expectedFailoverCount, ex);

    public String toString() {
      return "RetryInfo{" +
              "retryTime=" + retryTime +
              ", delay=" + delay +
              ", action=" + action +
              ", expectedFailoverCount=" + expectedFailoverCount +
              ", failException=" + failException +

  private final ProxyDescriptor<T> proxyDescriptor;

  private volatile boolean hasSuccessfulCall = false;

  private HashSet<String> failedAtLeastOnce = new HashSet<>();

  private final RetryPolicy defaultPolicy;
  private final Map<String,RetryPolicy> methodNameToPolicyMap;

  private final AsyncCallHandler asyncCallHandler = new AsyncCallHandler();

  protected RetryInvocationHandler(FailoverProxyProvider<T> proxyProvider,
      RetryPolicy retryPolicy) {
    this(proxyProvider, retryPolicy, Collections.<String, RetryPolicy>emptyMap());

  protected RetryInvocationHandler(FailoverProxyProvider<T> proxyProvider,
      RetryPolicy defaultPolicy,
      Map<String, RetryPolicy> methodNameToPolicyMap) {
    this.proxyDescriptor = new ProxyDescriptor<>(proxyProvider);
    this.defaultPolicy = defaultPolicy;
    this.methodNameToPolicyMap = methodNameToPolicyMap;

  private RetryPolicy getRetryPolicy(Method method) {
    final RetryPolicy policy = methodNameToPolicyMap.get(method.getName());
    return policy != null? policy: defaultPolicy;

  private long getFailoverCount() {
    return proxyDescriptor.getFailoverCount();

  private Call newCall(Method method, Object[] args, boolean isRpc,
                       int callId) {
    if (Client.isAsynchronousMode()) {
      return asyncCallHandler.newAsyncCall(method, args, isRpc, callId, this);
    } else {
      return new Call(method, args, isRpc, callId, this);

  public Object invoke(Object proxy, Method method, Object[] args)
      throws Throwable {
    final boolean isRpc = isRpcInvocation(proxyDescriptor.getProxy());
    final int callId = isRpc? Client.nextCallId(): RpcConstants.INVALID_CALL_ID;

    final Call call = newCall(method, args, isRpc, callId);
    while (true) {
      final CallReturn c = call.invokeOnce();
      final CallReturn.State state = c.getState();
      if (state == CallReturn.State.ASYNC_INVOKED) {
        return null; // return null for async calls
      } else if (c.getState() != CallReturn.State.RETRY) {
        return c.getReturnValue();

  private RetryInfo handleException(final Method method, final int callId,
      final RetryPolicy policy, final Counters counters,
      final long expectFailoverCount, final Exception e) throws Exception {
    final RetryInfo retryInfo = RetryInfo.newRetryInfo(policy, e,
        counters, proxyDescriptor.idempotentOrAtMostOnce(method),
    if (retryInfo.isFail()) {
      // fail.
      if (retryInfo.action.reason != null) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Exception while invoking call #" + callId + " "
              + proxyDescriptor.getProxyInfo().getString(method.getName())
              + ". Not retrying because " + retryInfo.action.reason, e);
      throw retryInfo.getFailException();

    log(method, retryInfo.isFailover(), counters.failovers, counters.retries, retryInfo.delay, e);
    return retryInfo;

  private void log(final Method method, final boolean isFailover, final int failovers,
      final int retries, final long delay, final Exception ex) {
    boolean info = true;
    // If this is the first failover to this proxy, skip logging at INFO level
    if (!failedAtLeastOnce.contains(proxyDescriptor.getProxyInfo().toString()))

      // If successful calls were made to this proxy, log info even for first
      // failover
      info = hasSuccessfulCall || asyncCallHandler.hasSuccessfulCall();
      if (!info && !LOG.isDebugEnabled()) {

    final StringBuilder b = new StringBuilder()
        .append(", while invoking ")
    if (failovers > 0) {
      b.append(" after ").append(failovers).append(" failover attempts");
    b.append(isFailover? ". Trying to failover ": ". Retrying ");
    b.append(delay > 0? "after sleeping for " + delay + "ms.": "immediately.");
    b.append(" Current retry count: ").append(retries).append(".");

    if (info) {
    } else {
      LOG.debug(b.toString(), ex);

  protected Object invokeMethod(Method method, Object[] args) throws Throwable {
    try {
      if (!method.isAccessible()) {
      final Object r = method.invoke(proxyDescriptor.getProxy(), args);
      hasSuccessfulCall = true;
      return r;
    } catch (InvocationTargetException e) {
      throw e.getCause();

  static boolean isRpcInvocation(Object proxy) {
    if (proxy instanceof ProtocolTranslator) {
      proxy = ((ProtocolTranslator) proxy).getUnderlyingProxyObject();
    if (!Proxy.isProxyClass(proxy.getClass())) {
      return false;
    final InvocationHandler ih = Proxy.getInvocationHandler(proxy);
    return ih instanceof RpcInvocationHandler;

  public void close() throws IOException {

  @Override //RpcInvocationHandler
  public ConnectionId getConnectionId() {
    return RPC.getConnectionIdForProxy(proxyDescriptor.getProxy());

  public FailoverProxyProvider<T> getProxyProvider() {
    return proxyDescriptor.fpp;


hadoop 源码目录


hadoop AsyncCallHandler 源码

hadoop AtMostOnce 源码

hadoop CallReturn 源码

hadoop DefaultFailoverProxyProvider 源码

hadoop FailoverProxyProvider 源码

hadoop Idempotent 源码

hadoop LossyRetryInvocationHandler 源码

hadoop MultiException 源码

hadoop RetryPolicies 源码

hadoop RetryPolicy 源码

0  赞