Kafka KafkaRaftClient source code

  • 2022-10-20

Kafka KafkaRaftClient code listing

File path: /raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
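
Before the file itself, here is a minimal usage sketch built only from the public methods declared below (register, initialize, leaderAndEpoch, nodeId). Constructing the client and implementing the listener callbacks are omitted, and Listener is assumed here to be the RaftClient.Listener callback interface; treat this as an illustration of the call sequence rather than the canonical way the class is wired up inside Kafka.

import java.util.OptionalInt;

import org.apache.kafka.raft.KafkaRaftClient;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.RaftClient;

public final class RaftClientUsageSketch {
    // Register a listener and initialize the client, then report the current leader.
    // Only methods declared in the file below are used; wiring up the constructor
    // dependencies (NetworkChannel, ReplicatedLog, RaftConfig, ...) is out of scope here.
    static void startAndInspect(KafkaRaftClient<String> client, RaftClient.Listener<String> listener) {
        client.register(listener);   // queued in pendingRegistrations and applied by the polling thread
        client.initialize();         // restores quorum state; a quorum with no remote voters becomes candidate immediately
        LeaderAndEpoch leaderAndEpoch = client.leaderAndEpoch();
        OptionalInt localId = client.nodeId();
        System.out.println("local id " + localId + ", leader and epoch " + leaderAndEpoch);
    }
}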

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.raft;

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
import org.apache.kafka.common.message.DescribeQuorumRequestData;
import org.apache.kafka.common.message.DescribeQuorumResponseData;
import org.apache.kafka.common.message.EndQuorumEpochRequestData;
import org.apache.kafka.common.message.EndQuorumEpochResponseData;
import org.apache.kafka.common.message.FetchRequestData;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.FetchSnapshotRequestData;
import org.apache.kafka.common.message.FetchSnapshotResponseData;
import org.apache.kafka.common.message.VoteRequestData;
import org.apache.kafka.common.message.VoteResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.UnalignedMemoryRecords;
import org.apache.kafka.common.record.UnalignedRecords;
import org.apache.kafka.common.requests.BeginQuorumEpochRequest;
import org.apache.kafka.common.requests.BeginQuorumEpochResponse;
import org.apache.kafka.common.requests.DescribeQuorumRequest;
import org.apache.kafka.common.requests.DescribeQuorumResponse;
import org.apache.kafka.common.requests.EndQuorumEpochRequest;
import org.apache.kafka.common.requests.EndQuorumEpochResponse;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.FetchSnapshotRequest;
import org.apache.kafka.common.requests.FetchSnapshotResponse;
import org.apache.kafka.common.requests.VoteRequest;
import org.apache.kafka.common.requests.VoteResponse;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.raft.RequestManager.ConnectionState;
import org.apache.kafka.raft.errors.NotLeaderException;
import org.apache.kafka.raft.internals.BatchAccumulator;
import org.apache.kafka.raft.internals.BatchMemoryPool;
import org.apache.kafka.raft.internals.BlockingMessageQueue;
import org.apache.kafka.raft.internals.CloseListener;
import org.apache.kafka.raft.internals.FuturePurgatory;
import org.apache.kafka.raft.internals.KafkaRaftMetrics;
import org.apache.kafka.raft.internals.MemoryBatchReader;
import org.apache.kafka.raft.internals.RecordsBatchReader;
import org.apache.kafka.raft.internals.ThresholdPurgatory;
import org.apache.kafka.server.common.serialization.RecordSerde;
import org.apache.kafka.snapshot.RawSnapshotReader;
import org.apache.kafka.snapshot.RawSnapshotWriter;
import org.apache.kafka.snapshot.RecordsSnapshotReader;
import org.apache.kafka.snapshot.RecordsSnapshotWriter;
import org.apache.kafka.snapshot.SnapshotWriter;
import org.apache.kafka.snapshot.SnapshotReader;
import org.slf4j.Logger;

import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.kafka.raft.RaftUtil.hasValidTopicPartition;

/**
 * This class implements a Kafkaesque version of the Raft protocol. Leader election
 * is more or less pure Raft, but replication is driven by replica fetching and we use Kafka's
 * log reconciliation protocol to truncate the log to a common point following each leader
 * election.
 *
 * Like Zookeeper, this protocol distinguishes between voters and observers. Voters are
 * the only ones who are eligible to handle protocol requests and they are the only ones
 * who take part in elections. The protocol does not yet support dynamic quorum changes.
 *
 * These are the APIs in this protocol:
 *
 * 1) {@link VoteRequestData}: Sent by valid voters when their election timeout expires and they
 *    become a candidate. This request includes the last offset in the log which electors use
 *    to tell whether or not to grant the vote.
 *
 * 2) {@link BeginQuorumEpochRequestData}: Sent by the leader of an epoch only to valid voters to
 *    assert its leadership of the new epoch. This request will be retried indefinitely for
 *    each voter until it acknowledges the request or a new election occurs.
 *
 *    This is not needed in usual Raft because the leader can use an empty data push
 *    to achieve the same purpose. The Kafka Raft implementation, however, is driven by
 *    fetch requests from followers, so there must be a way to find the new leader after
 *    an election has completed.
 *
 * 3) {@link EndQuorumEpochRequestData}: Sent by the leader of an epoch to valid voters in order to
 *    gracefully resign from the current epoch. This causes remaining voters to immediately
 *    begin a new election.
 *
 * 4) {@link FetchRequestData}: This is the same as the usual Fetch API in Kafka, but we add a snapshot
 *    check before responding, and we also piggyback some additional metadata on responses (i.e. current
 *    leader and epoch). Unlike partition replication, we also piggyback truncation detection on this API
 *    rather than through a separate truncation state.
 *
 * 5) {@link FetchSnapshotRequestData}: Sent by the follower to the epoch leader in order to fetch a snapshot.
 *    This happens when a FetchResponse includes a snapshot ID due to the follower's log end offset being less
 *    than the leader's log start offset. This API is similar to the Fetch API since the snapshot is stored
 *    as FileRecords, but we use {@link UnalignedRecords} in FetchSnapshotResponse because the records
 *    are not necessarily offset-aligned.
 */
public class KafkaRaftClient<T> implements RaftClient<T> {
    private static final int RETRY_BACKOFF_BASE_MS = 100;
    public static final int MAX_FETCH_WAIT_MS = 500;
    public static final int MAX_BATCH_SIZE_BYTES = 8 * 1024 * 1024;
    public static final int MAX_FETCH_SIZE_BYTES = MAX_BATCH_SIZE_BYTES;

    private final AtomicReference<GracefulShutdown> shutdown = new AtomicReference<>();
    private final Logger logger;
    private final Time time;
    private final int fetchMaxWaitMs;
    private final String clusterId;
    private final NetworkChannel channel;
    private final ReplicatedLog log;
    private final Random random;
    private final FuturePurgatory<Long> appendPurgatory;
    private final FuturePurgatory<Long> fetchPurgatory;
    private final RecordSerde<T> serde;
    private final MemoryPool memoryPool;
    private final RaftMessageQueue messageQueue;
    private final RaftConfig raftConfig;
    private final KafkaRaftMetrics kafkaRaftMetrics;
    private final QuorumState quorum;
    private final RequestManager requestManager;
    private final RaftMetadataLogCleanerManager snapshotCleaner;

    private final Map<Listener<T>, ListenerContext> listenerContexts = new IdentityHashMap<>();
    private final ConcurrentLinkedQueue<Registration<T>> pendingRegistrations = new ConcurrentLinkedQueue<>();

    /**
     * Create a new instance.
     *
     * Note that if the node ID is empty, then the client will behave as a
     * non-participating observer.
     */
    public KafkaRaftClient(
        RecordSerde<T> serde,
        NetworkChannel channel,
        ReplicatedLog log,
        QuorumStateStore quorumStateStore,
        Time time,
        Metrics metrics,
        ExpirationService expirationService,
        LogContext logContext,
        String clusterId,
        OptionalInt nodeId,
        RaftConfig raftConfig
    ) {
        this(serde,
            channel,
            new BlockingMessageQueue(),
            log,
            quorumStateStore,
            new BatchMemoryPool(5, MAX_BATCH_SIZE_BYTES),
            time,
            metrics,
            expirationService,
            MAX_FETCH_WAIT_MS,
            clusterId,
            nodeId,
            logContext,
            new Random(),
            raftConfig);
    }

    KafkaRaftClient(
        RecordSerde<T> serde,
        NetworkChannel channel,
        RaftMessageQueue messageQueue,
        ReplicatedLog log,
        QuorumStateStore quorumStateStore,
        MemoryPool memoryPool,
        Time time,
        Metrics metrics,
        ExpirationService expirationService,
        int fetchMaxWaitMs,
        String clusterId,
        OptionalInt nodeId,
        LogContext logContext,
        Random random,
        RaftConfig raftConfig
    ) {
        this.serde = serde;
        this.channel = channel;
        this.messageQueue = messageQueue;
        this.log = log;
        this.memoryPool = memoryPool;
        this.fetchPurgatory = new ThresholdPurgatory<>(expirationService);
        this.appendPurgatory = new ThresholdPurgatory<>(expirationService);
        this.time = time;
        this.clusterId = clusterId;
        this.fetchMaxWaitMs = fetchMaxWaitMs;
        this.logger = logContext.logger(KafkaRaftClient.class);
        this.random = random;
        this.raftConfig = raftConfig;
        this.snapshotCleaner = new RaftMetadataLogCleanerManager(logger, time, 60000, log::maybeClean);
        Set<Integer> quorumVoterIds = raftConfig.quorumVoterIds();
        this.requestManager = new RequestManager(quorumVoterIds, raftConfig.retryBackoffMs(),
            raftConfig.requestTimeoutMs(), random);
        this.quorum = new QuorumState(
            nodeId,
            quorumVoterIds,
            raftConfig.electionTimeoutMs(),
            raftConfig.fetchTimeoutMs(),
            quorumStateStore,
            time,
            logContext,
            random);
        this.kafkaRaftMetrics = new KafkaRaftMetrics(metrics, "raft", quorum);
        // All Raft voters are statically configured and known at startup
        // so there are no unknown voter connections. Report this metric as 0.
        kafkaRaftMetrics.updateNumUnknownVoterConnections(0);

        // Update the voter endpoints with what's in RaftConfig
        Map<Integer, RaftConfig.AddressSpec> voterAddresses = raftConfig.quorumVoterConnections();
        voterAddresses.entrySet().stream()
            .filter(e -> e.getValue() instanceof RaftConfig.InetAddressSpec)
            .forEach(e -> this.channel.updateEndpoint(e.getKey(), (RaftConfig.InetAddressSpec) e.getValue()));
    }

    private void updateFollowerHighWatermark(
        FollowerState state,
        OptionalLong highWatermarkOpt
    ) {
        highWatermarkOpt.ifPresent(highWatermark -> {
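            // The leader's advertised high watermark can be ahead of what this follower has
            // replicated locally, so it is clamped to the follower's own log end offset.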
            long newHighWatermark = Math.min(endOffset().offset(), highWatermark);
            if (state.updateHighWatermark(OptionalLong.of(newHighWatermark))) {
                logger.debug("Follower high watermark updated to {}", newHighWatermark);
                log.updateHighWatermark(new LogOffsetMetadata(newHighWatermark));
                updateListenersProgress(newHighWatermark);
            }
        });
    }

    private void updateLeaderEndOffsetAndTimestamp(
        LeaderState<T> state,
        long currentTimeMs
    ) {
        final LogOffsetMetadata endOffsetMetadata = log.endOffset();

        if (state.updateLocalState(endOffsetMetadata)) {
            onUpdateLeaderHighWatermark(state, currentTimeMs);
        }

        fetchPurgatory.maybeComplete(endOffsetMetadata.offset, currentTimeMs);
    }

    private void onUpdateLeaderHighWatermark(
        LeaderState<T> state,
        long currentTimeMs
    ) {
        state.highWatermark().ifPresent(highWatermark -> {
            logger.debug("Leader high watermark updated to {}", highWatermark);
            log.updateHighWatermark(highWatermark);

            // After updating the high watermark, we first clear the append
            // purgatory so that we have an opportunity to route the pending
            // records still held in memory directly to the listener
            appendPurgatory.maybeComplete(highWatermark.offset, currentTimeMs);

            // It is also possible that the high watermark is being updated
            // for the first time following the leader election, so we need
            // to give lagging listeners an opportunity to catch up as well
            updateListenersProgress(highWatermark.offset);
        });
    }

    private void updateListenersProgress(long highWatermark) {
        for (ListenerContext listenerContext : listenerContexts.values()) {
            listenerContext.nextExpectedOffset().ifPresent(nextExpectedOffset -> {
                if (nextExpectedOffset < log.startOffset() && nextExpectedOffset < highWatermark) {
                    SnapshotReader<T> snapshot = latestSnapshot().orElseThrow(() -> new IllegalStateException(
                        String.format(
                            "Snapshot expected since next offset of %s is %d, log start offset is %d and high-watermark is %d",
                            listenerContext.listenerName(),
                            nextExpectedOffset,
                            log.startOffset(),
                            highWatermark
                        )
                    ));
                    listenerContext.fireHandleSnapshot(snapshot);
                }
            });

            // Re-read the expected offset in case the snapshot had to be reloaded
            listenerContext.nextExpectedOffset().ifPresent(nextExpectedOffset -> {
                if (nextExpectedOffset < highWatermark) {
                    LogFetchInfo readInfo = log.read(nextExpectedOffset, Isolation.COMMITTED);
                    listenerContext.fireHandleCommit(nextExpectedOffset, readInfo.records);
                }
            });
        }
    }

    private Optional<SnapshotReader<T>> latestSnapshot() {
        return log.latestSnapshot().map(reader ->
            RecordsSnapshotReader.of(reader,
                serde,
                BufferSupplier.create(),
                MAX_BATCH_SIZE_BYTES,
                true /* Validate batch CRC*/
            )
        );
    }

    private void maybeFireHandleCommit(long baseOffset, int epoch, long appendTimestamp, int sizeInBytes, List<T> records) {
        for (ListenerContext listenerContext : listenerContexts.values()) {
            listenerContext.nextExpectedOffset().ifPresent(nextOffset -> {
                if (nextOffset == baseOffset) {
                    listenerContext.fireHandleCommit(baseOffset, epoch, appendTimestamp, sizeInBytes, records);
                }
            });
        }
    }

    private void maybeFireLeaderChange(LeaderState<T> state) {
        for (ListenerContext listenerContext : listenerContexts.values()) {
            listenerContext.maybeFireLeaderChange(quorum.leaderAndEpoch(), state.epochStartOffset());
        }
    }

    private void maybeFireLeaderChange() {
        for (ListenerContext listenerContext : listenerContexts.values()) {
            listenerContext.maybeFireLeaderChange(quorum.leaderAndEpoch());
        }
    }

    @Override
    public void initialize() {
        quorum.initialize(new OffsetAndEpoch(log.endOffset().offset, log.lastFetchedEpoch()));

        long currentTimeMs = time.milliseconds();
        if (quorum.isLeader()) {
            throw new IllegalStateException("Voter cannot initialize as a Leader");
        } else if (quorum.isCandidate()) {
            onBecomeCandidate(currentTimeMs);
        } else if (quorum.isFollower()) {
            onBecomeFollower(currentTimeMs);
        }

        // When there is only a single voter, become candidate immediately
        if (quorum.isVoter()
            && quorum.remoteVoters().isEmpty()
            && !quorum.isCandidate()) {

            transitionToCandidate(currentTimeMs);
        }
    }

    @Override
    public void register(Listener<T> listener) {
        pendingRegistrations.add(Registration.register(listener));
        wakeup();
    }

    @Override
    public void unregister(Listener<T> listener) {
        pendingRegistrations.add(Registration.unregister(listener));
        // No need to wakeup the polling thread. It is a removal so the updates can be
        // delayed until the polling thread wakes up for other reasons.
    }

    @Override
    public LeaderAndEpoch leaderAndEpoch() {
        return quorum.leaderAndEpoch();
    }

    @Override
    public OptionalInt nodeId() {
        return quorum.localId();
    }

    private OffsetAndEpoch endOffset() {
        return new OffsetAndEpoch(log.endOffset().offset, log.lastFetchedEpoch());
    }

    private void resetConnections() {
        requestManager.resetAll();
    }

    private void onBecomeLeader(long currentTimeMs) {
        long endOffset = log.endOffset().offset;

        BatchAccumulator<T> accumulator = new BatchAccumulator<>(
            quorum.epoch(),
            endOffset,
            raftConfig.appendLingerMs(),
            MAX_BATCH_SIZE_BYTES,
            memoryPool,
            time,
            CompressionType.NONE,
            serde
        );

        LeaderState<T> state = quorum.transitionToLeader(endOffset, accumulator);
        maybeFireLeaderChange(state);

        log.initializeLeaderEpoch(quorum.epoch());

        // The high watermark can only be advanced once we have written a record
        // from the new leader's epoch. Hence we write a control message immediately
        // to ensure there is no delay committing pending data.
        state.appendLeaderChangeMessage(currentTimeMs);

        resetConnections();
        kafkaRaftMetrics.maybeUpdateElectionLatency(currentTimeMs);
    }

    private void flushLeaderLog(LeaderState<T> state, long currentTimeMs) {
        // We update the end offset before flushing so that parked fetches can return sooner.
        updateLeaderEndOffsetAndTimestamp(state, currentTimeMs);
        log.flush(false);
    }

    private boolean maybeTransitionToLeader(CandidateState state, long currentTimeMs) {
        if (state.isVoteGranted()) {
            onBecomeLeader(currentTimeMs);
            return true;
        } else {
            return false;
        }
    }

    private void onBecomeCandidate(long currentTimeMs) {
        CandidateState state = quorum.candidateStateOrThrow();
        if (!maybeTransitionToLeader(state, currentTimeMs)) {
            resetConnections();
            kafkaRaftMetrics.updateElectionStartMs(currentTimeMs);
        }
    }

    private void transitionToCandidate(long currentTimeMs) {
        quorum.transitionToCandidate();
        maybeFireLeaderChange();
        onBecomeCandidate(currentTimeMs);
    }

    private void transitionToUnattached(int epoch) {
        quorum.transitionToUnattached(epoch);
        maybeFireLeaderChange();
        resetConnections();
    }

    private void transitionToResigned(List<Integer> preferredSuccessors) {
        fetchPurgatory.completeAllExceptionally(
            Errors.NOT_LEADER_OR_FOLLOWER.exception("Not handling request since this node is resigning"));
        quorum.transitionToResigned(preferredSuccessors);
        maybeFireLeaderChange();
        resetConnections();
    }

    private void transitionToVoted(int candidateId, int epoch) {
        quorum.transitionToVoted(epoch, candidateId);
        maybeFireLeaderChange();
        resetConnections();
    }

    private void onBecomeFollower(long currentTimeMs) {
        kafkaRaftMetrics.maybeUpdateElectionLatency(currentTimeMs);

        resetConnections();

        // After becoming a follower, we need to complete all pending fetches so that
        // they can be re-sent to the leader without waiting for their expirations
        fetchPurgatory.completeAllExceptionally(new NotLeaderOrFollowerException(
            "Cannot process the fetch request because the node is no longer the leader."));

        // Clearing the append purgatory should complete all futures exceptionally since this node is no longer the leader
        appendPurgatory.completeAllExceptionally(new NotLeaderOrFollowerException(
            "Failed to receive sufficient acknowledgments for this append before leader change."));
    }

    private void transitionToFollower(
        int epoch,
        int leaderId,
        long currentTimeMs
    ) {
        quorum.transitionToFollower(epoch, leaderId);
        maybeFireLeaderChange();
        onBecomeFollower(currentTimeMs);
    }

    private VoteResponseData buildVoteResponse(Errors partitionLevelError, boolean voteGranted) {
        return VoteResponse.singletonResponse(
            Errors.NONE,
            log.topicPartition(),
            partitionLevelError,
            quorum.epoch(),
            quorum.leaderIdOrSentinel(),
            voteGranted);
    }

    /**
     * Handle a Vote request. This API may return the following errors:
     *
     * - {@link Errors#INCONSISTENT_CLUSTER_ID} if a cluster id is present in the request
     *      but does not match this node's cluster id
     * - {@link Errors#BROKER_NOT_AVAILABLE} if this node is currently shutting down
     * - {@link Errors#FENCED_LEADER_EPOCH} if the epoch is smaller than this node's epoch
     * - {@link Errors#INCONSISTENT_VOTER_SET} if the request suggests inconsistent voter membership (e.g.
     *      if this node or the sender is not one of the current known voters)
     * - {@link Errors#INVALID_REQUEST} if the last epoch or offset are invalid
     */
    private VoteResponseData handleVoteRequest(
        RaftRequest.Inbound requestMetadata
    ) {
        VoteRequestData request = (VoteRequestData) requestMetadata.data;

        if (!hasValidClusterId(request.clusterId())) {
            return new VoteResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code());
        }

        if (!hasValidTopicPartition(request, log.topicPartition())) {
            // Until we support multi-raft, we treat individual topic partition mismatches as invalid requests
            return new VoteResponseData().setErrorCode(Errors.INVALID_REQUEST.code());
        }

        VoteRequestData.PartitionData partitionRequest =
            request.topics().get(0).partitions().get(0);

        int candidateId = partitionRequest.candidateId();
        int candidateEpoch = partitionRequest.candidateEpoch();

        int lastEpoch = partitionRequest.lastOffsetEpoch();
        long lastEpochEndOffset = partitionRequest.lastOffset();
        if (lastEpochEndOffset < 0 || lastEpoch < 0 || lastEpoch >= candidateEpoch) {
            return buildVoteResponse(Errors.INVALID_REQUEST, false);
        }

        Optional<Errors> errorOpt = validateVoterOnlyRequest(candidateId, candidateEpoch);
        if (errorOpt.isPresent()) {
            return buildVoteResponse(errorOpt.get(), false);
        }

        if (candidateEpoch > quorum.epoch()) {
            transitionToUnattached(candidateEpoch);
        }

        OffsetAndEpoch lastEpochEndOffsetAndEpoch = new OffsetAndEpoch(lastEpochEndOffset, lastEpoch);
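        // The boolean passed to canGrantVote says whether the candidate's log is at least as
        // up to date as the local log (compared by epoch and then by end offset), which is the
        // standard Raft election safety check.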
        boolean voteGranted = quorum.canGrantVote(candidateId, lastEpochEndOffsetAndEpoch.compareTo(endOffset()) >= 0);

        if (voteGranted && quorum.isUnattached()) {
            transitionToVoted(candidateId, candidateEpoch);
        }

        logger.info("Vote request {} with epoch {} is {}", request, candidateEpoch, voteGranted ? "granted" : "rejected");
        return buildVoteResponse(Errors.NONE, voteGranted);
    }

    private boolean handleVoteResponse(
        RaftResponse.Inbound responseMetadata,
        long currentTimeMs
    ) {
        int remoteNodeId = responseMetadata.sourceId();
        VoteResponseData response = (VoteResponseData) responseMetadata.data;
        Errors topLevelError = Errors.forCode(response.errorCode());
        if (topLevelError != Errors.NONE) {
            return handleTopLevelError(topLevelError, responseMetadata);
        }

        if (!hasValidTopicPartition(response, log.topicPartition())) {
            return false;
        }

        VoteResponseData.PartitionData partitionResponse =
            response.topics().get(0).partitions().get(0);

        Errors error = Errors.forCode(partitionResponse.errorCode());
        OptionalInt responseLeaderId = optionalLeaderId(partitionResponse.leaderId());
        int responseEpoch = partitionResponse.leaderEpoch();

        Optional<Boolean> handled = maybeHandleCommonResponse(
            error, responseLeaderId, responseEpoch, currentTimeMs);
        if (handled.isPresent()) {
            return handled.get();
        } else if (error == Errors.NONE) {
            if (quorum.isLeader()) {
                logger.debug("Ignoring vote response {} since we already became leader for epoch {}",
                    partitionResponse, quorum.epoch());
            } else if (quorum.isCandidate()) {
                CandidateState state = quorum.candidateStateOrThrow();
                if (partitionResponse.voteGranted()) {
                    state.recordGrantedVote(remoteNodeId);
                    maybeTransitionToLeader(state, currentTimeMs);
                } else {
                    state.recordRejectedVote(remoteNodeId);

                    // If our vote is rejected, we go immediately to the random backoff. This
                    // ensures that we are not stuck waiting for the election timeout when the
                    // vote has become gridlocked.
                    if (state.isVoteRejected() && !state.isBackingOff()) {
                        logger.info("Insufficient remaining votes to become leader (rejected by {}). " +
                            "We will backoff before retrying election again", state.rejectingVoters());

                        state.startBackingOff(
                            currentTimeMs,
                            binaryExponentialElectionBackoffMs(state.retries())
                        );
                    }
                }
            } else {
                logger.debug("Ignoring vote response {} since we are no longer a candidate in epoch {}",
                    partitionResponse, quorum.epoch());
            }
            return true;
        } else {
            return handleUnexpectedError(error, responseMetadata);
        }
    }

    private int binaryExponentialElectionBackoffMs(int retries) {
        if (retries <= 0) {
            throw new IllegalArgumentException("Retries " + retries + " should be larger than zero");
        }
        // Cap the exponent at 20 to avoid integer overflow in the shift below
        return Math.min(RETRY_BACKOFF_BASE_MS * random.nextInt(2 << Math.min(20, retries - 1)),
                raftConfig.electionBackoffMaxMs());
    }
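    // Worked example for the backoff above, assuming the electionBackoffMaxMs() cap does not apply:
    //   retries = 1  -> nextInt(2 << 0) picks from {0, 1}  -> backoff of 0 or 100 ms
    //   retries = 3  -> nextInt(2 << 2) picks from 0..7    -> backoff of 0..700 ms
    //   retries >= 21 -> the shift is capped at 20, so the range stops growing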

    private int strictExponentialElectionBackoffMs(int positionInSuccessors, int totalNumSuccessors) {
        if (positionInSuccessors <= 0 || positionInSuccessors >= totalNumSuccessors) {
            throw new IllegalArgumentException("Position " + positionInSuccessors + " should be larger than zero" +
                    " and smaller than total number of successors " + totalNumSuccessors);
        }

        int retryBackOffBaseMs = raftConfig.electionBackoffMaxMs() >> (totalNumSuccessors - 1);
        return Math.min(raftConfig.electionBackoffMaxMs(), retryBackOffBaseMs << (positionInSuccessors - 1));
    }
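    // Worked example for the strict backoff above, assuming electionBackoffMaxMs() = 1000 ms and
    // three preferred successors: base = 1000 >> (3 - 1) = 250 ms; position 1 -> 250 ms,
    // position 2 -> 500 ms, so successors earlier in the preferred list retry the election sooner.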

    private BeginQuorumEpochResponseData buildBeginQuorumEpochResponse(Errors partitionLevelError) {
        return BeginQuorumEpochResponse.singletonResponse(
            Errors.NONE,
            log.topicPartition(),
            partitionLevelError,
            quorum.epoch(),
            quorum.leaderIdOrSentinel());
    }

    /**
     * Handle a BeginEpoch request. This API may return the following errors:
     *
     * - {@link Errors#INCONSISTENT_CLUSTER_ID} if a cluster id is present in the request
     *      but does not match this node's cluster id
     * - {@link Errors#BROKER_NOT_AVAILABLE} if this node is currently shutting down
     * - {@link Errors#INCONSISTENT_VOTER_SET} if the request suggests inconsistent voter membership (e.g.
     *      if this node or the sender is not one of the current known voters)
     * - {@link Errors#FENCED_LEADER_EPOCH} if the epoch is smaller than this node's epoch
     */
    private BeginQuorumEpochResponseData handleBeginQuorumEpochRequest(
        RaftRequest.Inbound requestMetadata,
        long currentTimeMs
    ) {
        BeginQuorumEpochRequestData request = (BeginQuorumEpochRequestData) requestMetadata.data;

        if (!hasValidClusterId(request.clusterId())) {
            return new BeginQuorumEpochResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code());
        }

        if (!hasValidTopicPartition(request, log.topicPartition())) {
            // Until we support multi-raft, we treat topic partition mismatches as invalid requests
            return new BeginQuorumEpochResponseData().setErrorCode(Errors.INVALID_REQUEST.code());
        }

        BeginQuorumEpochRequestData.PartitionData partitionRequest =
            request.topics().get(0).partitions().get(0);

        int requestLeaderId = partitionRequest.leaderId();
        int requestEpoch = partitionRequest.leaderEpoch();

        Optional<Errors> errorOpt = validateVoterOnlyRequest(requestLeaderId, requestEpoch);
        if (errorOpt.isPresent()) {
            return buildBeginQuorumEpochResponse(errorOpt.get());
        }

        maybeTransition(OptionalInt.of(requestLeaderId), requestEpoch, currentTimeMs);
        return buildBeginQuorumEpochResponse(Errors.NONE);
    }

    private boolean handleBeginQuorumEpochResponse(
        RaftResponse.Inbound responseMetadata,
        long currentTimeMs
    ) {
        int remoteNodeId = responseMetadata.sourceId();
        BeginQuorumEpochResponseData response = (BeginQuorumEpochResponseData) responseMetadata.data;
        Errors topLevelError = Errors.forCode(response.errorCode());
        if (topLevelError != Errors.NONE) {
            return handleTopLevelError(topLevelError, responseMetadata);
        }

        if (!hasValidTopicPartition(response, log.topicPartition())) {
            return false;
        }

        BeginQuorumEpochResponseData.PartitionData partitionResponse =
            response.topics().get(0).partitions().get(0);

        Errors partitionError = Errors.forCode(partitionResponse.errorCode());
        OptionalInt responseLeaderId = optionalLeaderId(partitionResponse.leaderId());
        int responseEpoch = partitionResponse.leaderEpoch();

        Optional<Boolean> handled = maybeHandleCommonResponse(
            partitionError, responseLeaderId, responseEpoch, currentTimeMs);
        if (handled.isPresent()) {
            return handled.get();
        } else if (partitionError == Errors.NONE) {
            if (quorum.isLeader()) {
                LeaderState<T> state = quorum.leaderStateOrThrow();
                state.addAcknowledgementFrom(remoteNodeId);
            } else {
                logger.debug("Ignoring BeginQuorumEpoch response {} since " +
                    "this node is not the leader anymore", response);
            }
            return true;
        } else {
            return handleUnexpectedError(partitionError, responseMetadata);
        }
    }

    private EndQuorumEpochResponseData buildEndQuorumEpochResponse(Errors partitionLevelError) {
        return EndQuorumEpochResponse.singletonResponse(
            Errors.NONE,
            log.topicPartition(),
            partitionLevelError,
            quorum.epoch(),
            quorum.leaderIdOrSentinel());
    }

    /**
     * Handle an EndEpoch request. This API may return the following errors:
     *
     * - {@link Errors#INCONSISTENT_CLUSTER_ID} if a cluster id is present in the request
     *      but does not match this node's cluster id
     * - {@link Errors#BROKER_NOT_AVAILABLE} if this node is currently shutting down
     * - {@link Errors#INCONSISTENT_VOTER_SET} if the request suggests inconsistent voter membership (e.g.
     *      if this node or the sender is not one of the current known voters)
     * - {@link Errors#FENCED_LEADER_EPOCH} if the epoch is smaller than this node's epoch
     */
    private EndQuorumEpochResponseData handleEndQuorumEpochRequest(
        RaftRequest.Inbound requestMetadata,
        long currentTimeMs
    ) {
        EndQuorumEpochRequestData request = (EndQuorumEpochRequestData) requestMetadata.data;

        if (!hasValidClusterId(request.clusterId())) {
            return new EndQuorumEpochResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code());
        }

        if (!hasValidTopicPartition(request, log.topicPartition())) {
            // Until we support multi-raft, we treat topic partition mismatches as invalid requests
            return new EndQuorumEpochResponseData().setErrorCode(Errors.INVALID_REQUEST.code());
        }

        EndQuorumEpochRequestData.PartitionData partitionRequest =
            request.topics().get(0).partitions().get(0);

        int requestEpoch = partitionRequest.leaderEpoch();
        int requestLeaderId = partitionRequest.leaderId();

        Optional<Errors> errorOpt = validateVoterOnlyRequest(requestLeaderId, requestEpoch);
        if (errorOpt.isPresent()) {
            return buildEndQuorumEpochResponse(errorOpt.get());
        }
        maybeTransition(OptionalInt.of(requestLeaderId), requestEpoch, currentTimeMs);

        if (quorum.isFollower()) {
            FollowerState state = quorum.followerStateOrThrow();
            if (state.leaderId() == requestLeaderId) {
                List<Integer> preferredSuccessors = partitionRequest.preferredSuccessors();
                long electionBackoffMs = endEpochElectionBackoff(preferredSuccessors);
                logger.debug("Overriding follower fetch timeout to {} after receiving " +
                    "EndQuorumEpoch request from leader {} in epoch {}", electionBackoffMs,
                    requestLeaderId, requestEpoch);
                state.overrideFetchTimeout(currentTimeMs, electionBackoffMs);
            }
        }
        return buildEndQuorumEpochResponse(Errors.NONE);
    }

    private long endEpochElectionBackoff(List<Integer> preferredSuccessors) {
        // Based on the priority inside the preferred successors, choose the corresponding delayed
        // election backoff time based on strict exponential mechanism so that the most up-to-date
        // voter has a higher chance to be elected. If the node's priority is highest, become
        // candidate immediately instead of waiting for next poll.
        int position = preferredSuccessors.indexOf(quorum.localIdOrThrow());
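        // indexOf returns -1 when the local id is not listed, so "position <= 0" covers both the
        // highest-priority successor and a voter that was not named as a successor at all.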
        if (position <= 0) {
            return 0;
        } else {
            return strictExponentialElectionBackoffMs(position, preferredSuccessors.size());
        }
    }

    private boolean handleEndQuorumEpochResponse(
        RaftResponse.Inbound responseMetadata,
        long currentTimeMs
    ) {
        EndQuorumEpochResponseData response = (EndQuorumEpochResponseData) responseMetadata.data;
        Errors topLevelError = Errors.forCode(response.errorCode());
        if (topLevelError != Errors.NONE) {
            return handleTopLevelError(topLevelError, responseMetadata);
        }

        if (!hasValidTopicPartition(response, log.topicPartition())) {
            return false;
        }

        EndQuorumEpochResponseData.PartitionData partitionResponse =
            response.topics().get(0).partitions().get(0);

        Errors partitionError = Errors.forCode(partitionResponse.errorCode());
        OptionalInt responseLeaderId = optionalLeaderId(partitionResponse.leaderId());
        int responseEpoch = partitionResponse.leaderEpoch();

        Optional<Boolean> handled = maybeHandleCommonResponse(
            partitionError, responseLeaderId, responseEpoch, currentTimeMs);
        if (handled.isPresent()) {
            return handled.get();
        } else if (partitionError == Errors.NONE) {
            ResignedState resignedState = quorum.resignedStateOrThrow();
            resignedState.acknowledgeResignation(responseMetadata.sourceId());
            return true;
        } else {
            return handleUnexpectedError(partitionError, responseMetadata);
        }
    }

    private FetchResponseData buildFetchResponse(
        Errors error,
        Records records,
        ValidOffsetAndEpoch validOffsetAndEpoch,
        Optional<LogOffsetMetadata> highWatermark
    ) {
        return RaftUtil.singletonFetchResponse(log.topicPartition(), log.topicId(), Errors.NONE, partitionData -> {
            partitionData
                .setRecords(records)
                .setErrorCode(error.code())
                .setLogStartOffset(log.startOffset())
                .setHighWatermark(highWatermark
                    .map(offsetMetadata -> offsetMetadata.offset)
                    .orElse(-1L));

            partitionData.currentLeader()
                .setLeaderEpoch(quorum.epoch())
                .setLeaderId(quorum.leaderIdOrSentinel());

            switch (validOffsetAndEpoch.kind()) {
                case DIVERGING:
                    partitionData.divergingEpoch()
                        .setEpoch(validOffsetAndEpoch.offsetAndEpoch().epoch())
                        .setEndOffset(validOffsetAndEpoch.offsetAndEpoch().offset());
                    break;
                case SNAPSHOT:
                    partitionData.snapshotId()
                        .setEpoch(validOffsetAndEpoch.offsetAndEpoch().epoch())
                        .setEndOffset(validOffsetAndEpoch.offsetAndEpoch().offset());
                    break;
                default:
            }
        });
    }

    private FetchResponseData buildEmptyFetchResponse(
        Errors error,
        Optional<LogOffsetMetadata> highWatermark
    ) {
        return buildFetchResponse(
            error,
            MemoryRecords.EMPTY,
            ValidOffsetAndEpoch.valid(),
            highWatermark
        );
    }

    private boolean hasValidClusterId(String requestClusterId) {
        // We don't enforce the cluster id if it is not provided.
        if (requestClusterId == null) {
            return true;
        }
        return clusterId.equals(requestClusterId);
    }

    /**
     * Handle a Fetch request. The fetch offset and last fetched epoch are always
     * validated against the current log. In the case that they do not match, the response will
     * indicate the diverging offset/epoch. A follower is expected to truncate its log in this
     * case and resend the fetch.
     *
     * This API may return the following errors:
     *
     * - {@link Errors#INCONSISTENT_CLUSTER_ID} if a cluster id is present in the request
     *     but does not match this node's cluster id
     * - {@link Errors#BROKER_NOT_AVAILABLE} if this node is currently shutting down
     * - {@link Errors#FENCED_LEADER_EPOCH} if the epoch is smaller than this node's epoch
     * - {@link Errors#INVALID_REQUEST} if the request epoch is larger than the leader's current epoch
     *     or if either the fetch offset or the last fetched epoch is invalid
     */
    private CompletableFuture<FetchResponseData> handleFetchRequest(
        RaftRequest.Inbound requestMetadata,
        long currentTimeMs
    ) {
        FetchRequestData request = (FetchRequestData) requestMetadata.data;

        if (!hasValidClusterId(request.clusterId())) {
            return completedFuture(new FetchResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code()));
        }

        if (!hasValidTopicPartition(request, log.topicPartition(), log.topicId())) {
            // Until we support multi-raft, we treat topic partition mismatches as invalid requests
            return completedFuture(new FetchResponseData().setErrorCode(Errors.INVALID_REQUEST.code()));
        }
        // If the ID is valid, we can set the topic name.
        request.topics().get(0).setTopic(log.topicPartition().topic());

        FetchRequestData.FetchPartition fetchPartition = request.topics().get(0).partitions().get(0);
        if (request.maxWaitMs() < 0
            || fetchPartition.fetchOffset() < 0
            || fetchPartition.lastFetchedEpoch() < 0
            || fetchPartition.lastFetchedEpoch() > fetchPartition.currentLeaderEpoch()) {
            return completedFuture(buildEmptyFetchResponse(
                Errors.INVALID_REQUEST, Optional.empty()));
        }

        FetchResponseData response = tryCompleteFetchRequest(request.replicaId(), fetchPartition, currentTimeMs);
        FetchResponseData.PartitionData partitionResponse =
            response.responses().get(0).partitions().get(0);

        if (partitionResponse.errorCode() != Errors.NONE.code()
            || FetchResponse.recordsSize(partitionResponse) > 0
            || request.maxWaitMs() == 0) {
            return completedFuture(response);
        }

        CompletableFuture<Long> future = fetchPurgatory.await(
            fetchPartition.fetchOffset(),
            request.maxWaitMs());

        return future.handle((completionTimeMs, exception) -> {
            if (exception != null) {
                Throwable cause = exception instanceof ExecutionException ?
                    exception.getCause() : exception;

                // If the fetch timed out in purgatory, it means no new data is available,
                // and we will complete the fetch successfully. Otherwise, if there was
                // any other error, we need to return it.
                Errors error = Errors.forException(cause);
                if (error != Errors.REQUEST_TIMED_OUT) {
                    logger.debug("Failed to handle fetch from {} at {} due to {}",
                        request.replicaId(), fetchPartition.fetchOffset(), error);
                    return buildEmptyFetchResponse(error, Optional.empty());
                }
            }

            // FIXME: `completionTimeMs`, which can be null
            logger.trace("Completing delayed fetch from {} starting at offset {} at {}",
                request.replicaId(), fetchPartition.fetchOffset(), completionTimeMs);

            return tryCompleteFetchRequest(request.replicaId(), fetchPartition, time.milliseconds());
        });
    }

    private FetchResponseData tryCompleteFetchRequest(
        int replicaId,
        FetchRequestData.FetchPartition request,
        long currentTimeMs
    ) {
        try {
            Optional<Errors> errorOpt = validateLeaderOnlyRequest(request.currentLeaderEpoch());
            if (errorOpt.isPresent()) {
                return buildEmptyFetchResponse(errorOpt.get(), Optional.empty());
            }

            long fetchOffset = request.fetchOffset();
            int lastFetchedEpoch = request.lastFetchedEpoch();
            LeaderState<T> state = quorum.leaderStateOrThrow();
            ValidOffsetAndEpoch validOffsetAndEpoch = log.validateOffsetAndEpoch(fetchOffset, lastFetchedEpoch);

            final Records records;
            if (validOffsetAndEpoch.kind() == ValidOffsetAndEpoch.Kind.VALID) {
                LogFetchInfo info = log.read(fetchOffset, Isolation.UNCOMMITTED);

                if (state.updateReplicaState(replicaId, currentTimeMs, info.startOffsetMetadata)) {
                    onUpdateLeaderHighWatermark(state, currentTimeMs);
                }

                records = info.records;
            } else {
                records = MemoryRecords.EMPTY;
            }

            return buildFetchResponse(Errors.NONE, records, validOffsetAndEpoch, state.highWatermark());
        } catch (Exception e) {
            logger.error("Caught unexpected error in fetch completion of request {}", request, e);
            return buildEmptyFetchResponse(Errors.UNKNOWN_SERVER_ERROR, Optional.empty());
        }
    }

    private static OptionalInt optionalLeaderId(int leaderIdOrNil) {
        if (leaderIdOrNil < 0)
            return OptionalInt.empty();
        return OptionalInt.of(leaderIdOrNil);
    }

    private static String listenerName(Listener<?> listener) {
        return String.format("%s@%d", listener.getClass().getTypeName(), System.identityHashCode(listener));
    }

    private boolean handleFetchResponse(
        RaftResponse.Inbound responseMetadata,
        long currentTimeMs
    ) {
        FetchResponseData response = (FetchResponseData) responseMetadata.data;
        Errors topLevelError = Errors.forCode(response.errorCode());
        if (topLevelError != Errors.NONE) {
            return handleTopLevelError(topLevelError, responseMetadata);
        }

        if (!RaftUtil.hasValidTopicPartition(response, log.topicPartition(), log.topicId())) {
            return false;
        }
        // If the ID is valid, we can set the topic name.
        response.responses().get(0).setTopic(log.topicPartition().topic());

        FetchResponseData.PartitionData partitionResponse =
            response.responses().get(0).partitions().get(0);

        FetchResponseData.LeaderIdAndEpoch currentLeaderIdAndEpoch = partitionResponse.currentLeader();
        OptionalInt responseLeaderId = optionalLeaderId(currentLeaderIdAndEpoch.leaderId());
        int responseEpoch = currentLeaderIdAndEpoch.leaderEpoch();
        Errors error = Errors.forCode(partitionResponse.errorCode());

        Optional<Boolean> handled = maybeHandleCommonResponse(
            error, responseLeaderId, responseEpoch, currentTimeMs);
        if (handled.isPresent()) {
            return handled.get();
        }

        FollowerState state = quorum.followerStateOrThrow();
        if (error == Errors.NONE) {
            FetchResponseData.EpochEndOffset divergingEpoch = partitionResponse.divergingEpoch();
            if (divergingEpoch.epoch() >= 0) {
                // The leader is asking us to truncate before continuing
                final OffsetAndEpoch divergingOffsetAndEpoch = new OffsetAndEpoch(
                    divergingEpoch.endOffset(), divergingEpoch.epoch());

                state.highWatermark().ifPresent(highWatermark -> {
                    if (divergingOffsetAndEpoch.offset() < highWatermark.offset) {
                        throw new KafkaException("The leader requested truncation to offset " +
                            divergingOffsetAndEpoch.offset() + ", which is below the current high watermark" +
                            " " + highWatermark);
                    }
                });

                long truncationOffset = log.truncateToEndOffset(divergingOffsetAndEpoch);
                logger.info("Truncated to offset {} from Fetch response from leader {}", truncationOffset, quorum.leaderIdOrSentinel());
            } else if (partitionResponse.snapshotId().epoch() >= 0 ||
                       partitionResponse.snapshotId().endOffset() >= 0) {
                // The leader is asking us to fetch a snapshot

                if (partitionResponse.snapshotId().epoch() < 0) {
                    logger.error(
                        "The leader sent a snapshot id with a valid end offset {} but with an invalid epoch {}",
                        partitionResponse.snapshotId().endOffset(),
                        partitionResponse.snapshotId().epoch()
                    );
                    return false;
                } else if (partitionResponse.snapshotId().endOffset() < 0) {
                    logger.error(
                        "The leader sent a snapshot id with a valid epoch {} but with an invalid end offset {}",
                        partitionResponse.snapshotId().epoch(),
                        partitionResponse.snapshotId().endOffset()
                    );
                    return false;
                } else {
                    final OffsetAndEpoch snapshotId = new OffsetAndEpoch(
                        partitionResponse.snapshotId().endOffset(),
                        partitionResponse.snapshotId().epoch()
                    );

                    // Do not validate the snapshot id against the local replicated log
                    // since this snapshot is expected to reference offsets and epochs
                    // greater than the log end offset and high-watermark
                    state.setFetchingSnapshot(log.storeSnapshot(snapshotId));
                }
            } else {
                Records records = FetchResponse.recordsOrFail(partitionResponse);
                if (records.sizeInBytes() > 0) {
                    appendAsFollower(records);
                }

                OptionalLong highWatermark = partitionResponse.highWatermark() < 0 ?
                    OptionalLong.empty() : OptionalLong.of(partitionResponse.highWatermark());
                updateFollowerHighWatermark(state, highWatermark);
            }

            state.resetFetchTimeout(currentTimeMs);
            return true;
        } else {
            return handleUnexpectedError(error, responseMetadata);
        }
    }

    private void appendAsFollower(
        Records records
    ) {
        LogAppendInfo info = log.appendAsFollower(records);
        log.flush(false);

        OffsetAndEpoch endOffset = endOffset();
        kafkaRaftMetrics.updateFetchedRecords(info.lastOffset - info.firstOffset + 1);
        kafkaRaftMetrics.updateLogEnd(endOffset);
        logger.trace("Follower end offset updated to {} after append", endOffset);
    }

    private LogAppendInfo appendAsLeader(
        Records records
    ) {
        LogAppendInfo info = log.appendAsLeader(records, quorum.epoch());
        OffsetAndEpoch endOffset = endOffset();
        kafkaRaftMetrics.updateAppendRecords(info.lastOffset - info.firstOffset + 1);
        kafkaRaftMetrics.updateLogEnd(endOffset);
        logger.trace("Leader appended records at base offset {}, new end offset is {}", info.firstOffset, endOffset);
        return info;
    }

    private DescribeQuorumResponseData handleDescribeQuorumRequest(
        RaftRequest.Inbound requestMetadata,
        long currentTimeMs
    ) {
        DescribeQuorumRequestData describeQuorumRequestData = (DescribeQuorumRequestData) requestMetadata.data;
        if (!hasValidTopicPartition(describeQuorumRequestData, log.topicPartition())) {
            return DescribeQuorumRequest.getPartitionLevelErrorResponse(
                describeQuorumRequestData, Errors.UNKNOWN_TOPIC_OR_PARTITION);
        }

        if (!quorum.isLeader()) {
            return DescribeQuorumResponse.singletonErrorResponse(
                log.topicPartition(),
                Errors.NOT_LEADER_OR_FOLLOWER
            );
        }

        LeaderState<T> leaderState = quorum.leaderStateOrThrow();
        return DescribeQuorumResponse.singletonResponse(
            log.topicPartition(),
            leaderState.describeQuorum(currentTimeMs)
        );
    }

    /**
     * Handle a FetchSnapshot request, similar to the Fetch request but we use {@link UnalignedRecords}
     * in response because the records are not necessarily offset-aligned.
     *
     * This API may return the following errors:
     *
     * - {@link Errors#INCONSISTENT_CLUSTER_ID} if a cluster id is present in the request
     *     but does not match this node's cluster id
     * - {@link Errors#BROKER_NOT_AVAILABLE} if this node is currently shutting down
     * - {@link Errors#FENCED_LEADER_EPOCH} if the epoch is smaller than this node's epoch
     * - {@link Errors#INVALID_REQUEST} if the request epoch is larger than the leader's current epoch
     *     or if either the fetch offset or the last fetched epoch is invalid
     * - {@link Errors#SNAPSHOT_NOT_FOUND} if the requested snapshot id does not exist
     * - {@link Errors#POSITION_OUT_OF_RANGE} if the requested snapshot position is out of range
     */
    private FetchSnapshotResponseData handleFetchSnapshotRequest(
        RaftRequest.Inbound requestMetadata
    ) {
        FetchSnapshotRequestData data = (FetchSnapshotRequestData) requestMetadata.data;

        if (!hasValidClusterId(data.clusterId())) {
            return new FetchSnapshotResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code());
        }

        if (data.topics().size() != 1 && data.topics().get(0).partitions().size() != 1) {
            return FetchSnapshotResponse.withTopLevelError(Errors.INVALID_REQUEST);
        }

        Optional<FetchSnapshotRequestData.PartitionSnapshot> partitionSnapshotOpt = FetchSnapshotRequest
            .forTopicPartition(data, log.topicPartition());
        if (!partitionSnapshotOpt.isPresent()) {
            // The Raft client assumes that there is only one topic partition.
            TopicPartition unknownTopicPartition = new TopicPartition(
                data.topics().get(0).name(),
                data.topics().get(0).partitions().get(0).partition()
            );

            return FetchSnapshotResponse.singleton(
                unknownTopicPartition,
                responsePartitionSnapshot -> responsePartitionSnapshot
                    .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
            );
        }

        FetchSnapshotRequestData.PartitionSnapshot partitionSnapshot = partitionSnapshotOpt.get();
        Optional<Errors> leaderValidation = validateLeaderOnlyRequest(
                partitionSnapshot.currentLeaderEpoch()
        );
        if (leaderValidation.isPresent()) {
            return FetchSnapshotResponse.singleton(
                log.topicPartition(),
                responsePartitionSnapshot -> addQuorumLeader(responsePartitionSnapshot)
                    .setErrorCode(leaderValidation.get().code())
            );
        }

        OffsetAndEpoch snapshotId = new OffsetAndEpoch(
            partitionSnapshot.snapshotId().endOffset(),
            partitionSnapshot.snapshotId().epoch()
        );
        Optional<RawSnapshotReader> snapshotOpt = log.readSnapshot(snapshotId);
        if (!snapshotOpt.isPresent()) {
            return FetchSnapshotResponse.singleton(
                log.topicPartition(),
                responsePartitionSnapshot -> addQuorumLeader(responsePartitionSnapshot)
                    .setErrorCode(Errors.SNAPSHOT_NOT_FOUND.code())
            );
        }

        RawSnapshotReader snapshot = snapshotOpt.get();
        long snapshotSize = snapshot.sizeInBytes();
        if (partitionSnapshot.position() < 0 || partitionSnapshot.position() >= snapshotSize) {
            return FetchSnapshotResponse.singleton(
                log.topicPartition(),
                responsePartitionSnapshot -> addQuorumLeader(responsePartitionSnapshot)
                    .setErrorCode(Errors.POSITION_OUT_OF_RANGE.code())
            );
        }

        if (partitionSnapshot.position() > Integer.MAX_VALUE) {
            throw new IllegalStateException(
                String.format(
                    "Trying to fetch a snapshot with size (%d) and a position (%d) larger than %d",
                    snapshotSize,
                    partitionSnapshot.position(),
                    Integer.MAX_VALUE
                )
            );
        }

        int maxSnapshotSize;
        try {
            maxSnapshotSize = Math.toIntExact(snapshotSize);
        } catch (ArithmeticException e) {
            maxSnapshotSize = Integer.MAX_VALUE;
        }

        UnalignedRecords records = snapshot.slice(partitionSnapshot.position(), Math.min(data.maxBytes(), maxSnapshotSize));

        return FetchSnapshotResponse.singleton(
            log.topicPartition(),
            responsePartitionSnapshot -> {
                addQuorumLeader(responsePartitionSnapshot)
                    .snapshotId()
                    .setEndOffset(snapshotId.offset())
                    .setEpoch(snapshotId.epoch());

                return responsePartitionSnapshot
                    .setSize(snapshotSize)
                    .setPosition(partitionSnapshot.position())
                    .setUnalignedRecords(records);
            }
        );
    }

    private boolean handleFetchSnapshotResponse(
        RaftResponse.Inbound responseMetadata,
        long currentTimeMs
    ) {
        FetchSnapshotResponseData data = (FetchSnapshotResponseData) responseMetadata.data;
        Errors topLevelError = Errors.forCode(data.errorCode());
        if (topLevelError != Errors.NONE) {
            return handleTopLevelError(topLevelError, responseMetadata);
        }

        if (data.topics().size() != 1 && data.topics().get(0).partitions().size() != 1) {
            return false;
        }

        Optional<FetchSnapshotResponseData.PartitionSnapshot> partitionSnapshotOpt = FetchSnapshotResponse
            .forTopicPartition(data, log.topicPartition());
        if (!partitionSnapshotOpt.isPresent()) {
            return false;
        }

        FetchSnapshotResponseData.PartitionSnapshot partitionSnapshot = partitionSnapshotOpt.get();

        FetchSnapshotResponseData.LeaderIdAndEpoch currentLeaderIdAndEpoch = partitionSnapshot.currentLeader();
        OptionalInt responseLeaderId = optionalLeaderId(currentLeaderIdAndEpoch.leaderId());
        int responseEpoch = currentLeaderIdAndEpoch.leaderEpoch();
        Errors error = Errors.forCode(partitionSnapshot.errorCode());

        Optional<Boolean> handled = maybeHandleCommonResponse(
            error, responseLeaderId, responseEpoch, currentTimeMs);
        if (handled.isPresent()) {
            return handled.get();
        }

        FollowerState state = quorum.followerStateOrThrow();

        if (Errors.forCode(partitionSnapshot.errorCode()) == Errors.SNAPSHOT_NOT_FOUND ||
            partitionSnapshot.snapshotId().endOffset() < 0 ||
            partitionSnapshot.snapshotId().epoch() < 0) {

            /* The leader deleted the snapshot before the follower could download it. Start over by
             * resetting the fetching snapshot state and sending another fetch request.
             */
            logger.trace(
                "Leader doesn't know about snapshot id {}, returned error {} and snapshot id {}",
                state.fetchingSnapshot(),
                partitionSnapshot.errorCode(),
                partitionSnapshot.snapshotId()
            );
            state.setFetchingSnapshot(Optional.empty());
            state.resetFetchTimeout(currentTimeMs);
            return true;
        }

        OffsetAndEpoch snapshotId = new OffsetAndEpoch(
            partitionSnapshot.snapshotId().endOffset(),
            partitionSnapshot.snapshotId().epoch()
        );

        RawSnapshotWriter snapshot;
        if (state.fetchingSnapshot().isPresent()) {
            snapshot = state.fetchingSnapshot().get();
        } else {
            throw new IllegalStateException(
                String.format("Received unexpected fetch snapshot response: %s", partitionSnapshot)
            );
        }

        if (!snapshot.snapshotId().equals(snapshotId)) {
            throw new IllegalStateException(
                String.format(
                    "Received fetch snapshot response with an invalid id. Expected %s; Received %s",
                    snapshot.snapshotId(),
                    snapshotId
                )
            );
        }
        if (snapshot.sizeInBytes() != partitionSnapshot.position()) {
            throw new IllegalStateException(
                String.format(
                    "Received fetch snapshot response with an invalid position. Expected %d; Received %d",
                    snapshot.sizeInBytes(),
                    partitionSnapshot.position()
                )
            );
        }

        final UnalignedMemoryRecords records;
        if (partitionSnapshot.unalignedRecords() instanceof MemoryRecords) {
            records = new UnalignedMemoryRecords(((MemoryRecords) partitionSnapshot.unalignedRecords()).buffer());
        } else if (partitionSnapshot.unalignedRecords() instanceof UnalignedMemoryRecords) {
            records = (UnalignedMemoryRecords) partitionSnapshot.unalignedRecords();
        } else {
            throw new IllegalStateException(String.format("Received unexpected fetch snapshot response: %s", partitionSnapshot));
        }
        snapshot.append(records);

        if (snapshot.sizeInBytes() == partitionSnapshot.size()) {
            // Finished fetching the snapshot.
            snapshot.freeze();
            state.setFetchingSnapshot(Optional.empty());

            if (log.truncateToLatestSnapshot()) {
                updateFollowerHighWatermark(state, OptionalLong.of(log.highWatermark().offset));
            } else {
                throw new IllegalStateException(
                    String.format(
                        "Full log truncation expected but didn't happen. Snapshot of %s, log end offset %s, last fetched %d",
                        snapshot.snapshotId(),
                        log.endOffset(),
                        log.lastFetchedEpoch()
                    )
                );
            }
        }

        state.resetFetchTimeout(currentTimeMs);
        return true;
    }

    private boolean hasConsistentLeader(int epoch, OptionalInt leaderId) {
        // Only elected leaders are sent in the request/response header, so if we have an elected
        // leaderId, it should be consistent with what is in the message.
        if (leaderId.isPresent() && leaderId.getAsInt() == quorum.localIdOrSentinel()) {
            // The response indicates that we should be the leader, so we verify that is the case
            return quorum.isLeader();
        } else {
            return epoch != quorum.epoch()
                || !leaderId.isPresent()
                || !quorum.leaderId().isPresent()
                || leaderId.equals(quorum.leaderId());
        }
    }

    /**
     * Handle response errors that are common across request types.
     *
     * @param error Error from the received response
     * @param leaderId Optional leaderId from the response
     * @param epoch Epoch received from the response
     * @param currentTimeMs Current epoch time in milliseconds
     * @return Optional value indicating whether the error was handled here and the outcome of
     *    that handling. Specifically:
     *
     *    - Optional.empty means that the response was not handled here and the custom
     *        API handler should be applied
     *    - Optional.of(true) indicates that the response was successfully handled here and
     *        the request does not need to be retried
     *    - Optional.of(false) indicates that the response was handled here, but that the request
     *        will need to be retried
     */
    private Optional<Boolean> maybeHandleCommonResponse(
        Errors error,
        OptionalInt leaderId,
        int epoch,
        long currentTimeMs
    ) {
        if (epoch < quorum.epoch() || error == Errors.UNKNOWN_LEADER_EPOCH) {
            // We have a larger epoch, so the response is no longer relevant
            return Optional.of(true);
        } else if (epoch > quorum.epoch()
            || error == Errors.FENCED_LEADER_EPOCH
            || error == Errors.NOT_LEADER_OR_FOLLOWER) {

            // The response indicates that the request had a stale epoch, but we need
            // to validate the epoch from the response against our current state.
            maybeTransition(leaderId, epoch, currentTimeMs);
            return Optional.of(true);
        } else if (epoch == quorum.epoch()
            && leaderId.isPresent()
            && !quorum.hasLeader()) {

            // Since we are transitioning to Follower, we will only forward the
            // request to the handler if there is no error. Otherwise, we will let
            // the request be retried immediately (if needed) after the transition.
            // This handling allows an observer to discover the leader and append
            // to the log in the same Fetch request.
            transitionToFollower(epoch, leaderId.getAsInt(), currentTimeMs);
            if (error == Errors.NONE) {
                return Optional.empty();
            } else {
                return Optional.of(true);
            }
        } else if (error == Errors.BROKER_NOT_AVAILABLE) {
            return Optional.of(false);
        } else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL) {
            // For now we treat this as a fatal error. Once we have support for quorum
            // reassignment, this error could suggest that either we or the recipient of
            // the request just has stale voter information, which means we can retry
            // after backing off.
            throw new IllegalStateException("Received error indicating inconsistent voter sets");
        } else if (error == Errors.INVALID_REQUEST) {
            throw new IllegalStateException("Received unexpected invalid request error");
        }

        return Optional.empty();
    }
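
    // ------------------------------------------------------------------
    // [Editor's note, not part of the Apache Kafka source] A minimal sketch
    // of the calling convention described in the javadoc above, as applied
    // by each response handler (compare handleFetchSnapshotResponse):
    //
    //     Optional<Boolean> handled = maybeHandleCommonResponse(
    //         error, responseLeaderId, responseEpoch, currentTimeMs);
    //     if (handled.isPresent()) {
    //         return handled.get();   // true = done, false = retry later
    //     }
    //     // Optional.empty(): fall through to the API-specific handling
    // ------------------------------------------------------------------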

    private void maybeTransition(
        OptionalInt leaderId,
        int epoch,
        long currentTimeMs
    ) {
        if (!hasConsistentLeader(epoch, leaderId)) {
            throw new IllegalStateException("Received request or response with leader " + leaderId +
                " and epoch " + epoch + " which is inconsistent with current leader " +
                quorum.leaderId() + " and epoch " + quorum.epoch());
        } else if (epoch > quorum.epoch()) {
            if (leaderId.isPresent()) {
                transitionToFollower(epoch, leaderId.getAsInt(), currentTimeMs);
            } else {
                transitionToUnattached(epoch);
            }
        } else if (leaderId.isPresent() && !quorum.hasLeader()) {
            // The request or response indicates the leader of the current epoch,
            // which is currently unknown
            transitionToFollower(epoch, leaderId.getAsInt(), currentTimeMs);
        }
    }

    private boolean handleTopLevelError(Errors error, RaftResponse.Inbound response) {
        if (error == Errors.BROKER_NOT_AVAILABLE) {
            return false;
        } else if (error == Errors.CLUSTER_AUTHORIZATION_FAILED) {
            throw new ClusterAuthorizationException("Received cluster authorization error in response " + response);
        } else {
            return handleUnexpectedError(error, response);
        }
    }

    private boolean handleUnexpectedError(Errors error, RaftResponse.Inbound response) {
        logger.error("Unexpected error {} in {} response: {}",
            error, ApiKeys.forId(response.data.apiKey()), response);
        return false;
    }

    private void handleResponse(RaftResponse.Inbound response, long currentTimeMs) {
        // The response epoch matches the local epoch, so we can handle the response
        ApiKeys apiKey = ApiKeys.forId(response.data.apiKey());
        final boolean handledSuccessfully;

        switch (apiKey) {
            case FETCH:
                handledSuccessfully = handleFetchResponse(response, currentTimeMs);
                break;

            case VOTE:
                handledSuccessfully = handleVoteResponse(response, currentTimeMs);
                break;

            case BEGIN_QUORUM_EPOCH:
                handledSuccessfully = handleBeginQuorumEpochResponse(response, currentTimeMs);
                break;

            case END_QUORUM_EPOCH:
                handledSuccessfully = handleEndQuorumEpochResponse(response, currentTimeMs);
                break;

            case FETCH_SNAPSHOT:
                handledSuccessfully = handleFetchSnapshotResponse(response, currentTimeMs);
                break;

            default:
                throw new IllegalArgumentException("Received unexpected response type: " + apiKey);
        }

        ConnectionState connection = requestManager.getOrCreate(response.sourceId());
        if (handledSuccessfully) {
            connection.onResponseReceived(response.correlationId);
        } else {
            connection.onResponseError(response.correlationId, currentTimeMs);
        }
    }

    /**
     * Validate a request which is only valid between voters. If an error is
     * present in the returned value, it should be returned in the response.
     */
    private Optional<Errors> validateVoterOnlyRequest(int remoteNodeId, int requestEpoch) {
        if (requestEpoch < quorum.epoch()) {
            return Optional.of(Errors.FENCED_LEADER_EPOCH);
        } else if (remoteNodeId < 0) {
            return Optional.of(Errors.INVALID_REQUEST);
        } else if (quorum.isObserver() || !quorum.isVoter(remoteNodeId)) {
            return Optional.of(Errors.INCONSISTENT_VOTER_SET);
        } else {
            return Optional.empty();
        }
    }

    /**
     * Validate a request which is intended for the current quorum leader.
     * If an error is present in the returned value, it should be returned
     * in the response.
     */
    private Optional<Errors> validateLeaderOnlyRequest(int requestEpoch) {
        if (requestEpoch < quorum.epoch()) {
            return Optional.of(Errors.FENCED_LEADER_EPOCH);
        } else if (requestEpoch > quorum.epoch()) {
            return Optional.of(Errors.UNKNOWN_LEADER_EPOCH);
        } else if (!quorum.isLeader()) {
            // In general, non-leaders do not expect to receive requests
            // matching their own epoch, but it is possible when observers
            // are using the Fetch API to find the result of an election.
            return Optional.of(Errors.NOT_LEADER_OR_FOLLOWER);
        } else if (shutdown.get() != null) {
            return Optional.of(Errors.BROKER_NOT_AVAILABLE);
        } else {
            return Optional.empty();
        }
    }

    private void handleRequest(RaftRequest.Inbound request, long currentTimeMs) {
        ApiKeys apiKey = ApiKeys.forId(request.data.apiKey());
        final CompletableFuture<? extends ApiMessage> responseFuture;

        switch (apiKey) {
            case FETCH:
                responseFuture = handleFetchRequest(request, currentTimeMs);
                break;

            case VOTE:
                responseFuture = completedFuture(handleVoteRequest(request));
                break;

            case BEGIN_QUORUM_EPOCH:
                responseFuture = completedFuture(handleBeginQuorumEpochRequest(request, currentTimeMs));
                break;

            case END_QUORUM_EPOCH:
                responseFuture = completedFuture(handleEndQuorumEpochRequest(request, currentTimeMs));
                break;

            case DESCRIBE_QUORUM:
                responseFuture = completedFuture(handleDescribeQuorumRequest(request, currentTimeMs));
                break;

            case FETCH_SNAPSHOT:
                responseFuture = completedFuture(handleFetchSnapshotRequest(request));
                break;

            default:
                throw new IllegalArgumentException("Unexpected request type " + apiKey);
        }

        responseFuture.whenComplete((response, exception) -> {
            final ApiMessage message;
            if (response != null) {
                message = response;
            } else {
                message = RaftUtil.errorResponse(apiKey, Errors.forException(exception));
            }

            RaftResponse.Outbound responseMessage = new RaftResponse.Outbound(request.correlationId(), message);
            request.completion.complete(responseMessage);
            logger.trace("Sent response {} to inbound request {}", responseMessage, request);
        });
    }

    private void handleInboundMessage(RaftMessage message, long currentTimeMs) {
        logger.trace("Received inbound message {}", message);

        if (message instanceof RaftRequest.Inbound) {
            RaftRequest.Inbound request = (RaftRequest.Inbound) message;
            handleRequest(request, currentTimeMs);
        } else if (message instanceof RaftResponse.Inbound) {
            RaftResponse.Inbound response = (RaftResponse.Inbound) message;
            ConnectionState connection = requestManager.getOrCreate(response.sourceId());
            if (connection.isResponseExpected(response.correlationId)) {
                handleResponse(response, currentTimeMs);
            } else {
                logger.debug("Ignoring response {} since it is no longer needed", response);
            }
        } else {
            throw new IllegalArgumentException("Unexpected message " + message);
        }
    }

    /**
     * Attempt to send a request. Return the time to wait before the request can be retried.
     */
    private long maybeSendRequest(
        long currentTimeMs,
        int destinationId,
        Supplier<ApiMessage> requestSupplier
    )  {
        ConnectionState connection = requestManager.getOrCreate(destinationId);

        if (connection.isBackingOff(currentTimeMs)) {
            long remainingBackoffMs = connection.remainingBackoffMs(currentTimeMs);
            logger.debug("Connection for {} is backing off for {} ms", destinationId, remainingBackoffMs);
            return remainingBackoffMs;
        }

        if (connection.isReady(currentTimeMs)) {
            int correlationId = channel.newCorrelationId();
            ApiMessage request = requestSupplier.get();

            RaftRequest.Outbound requestMessage = new RaftRequest.Outbound(
                correlationId,
                request,
                destinationId,
                currentTimeMs
            );

            requestMessage.completion.whenComplete((response, exception) -> {
                if (exception != null) {
                    ApiKeys api = ApiKeys.forId(request.apiKey());
                    Errors error = Errors.forException(exception);
                    ApiMessage errorResponse = RaftUtil.errorResponse(api, error);

                    response = new RaftResponse.Inbound(
                        correlationId,
                        errorResponse,
                        destinationId
                    );
                }

                messageQueue.add(response);
            });

            channel.send(requestMessage);
            logger.trace("Sent outbound request: {}", requestMessage);
            connection.onRequestSent(correlationId, currentTimeMs);
            return Long.MAX_VALUE;
        }

        return connection.remainingRequestTimeMs(currentTimeMs);
    }

    private EndQuorumEpochRequestData buildEndQuorumEpochRequest(
        ResignedState state
    ) {
        return EndQuorumEpochRequest.singletonRequest(
            log.topicPartition(),
            clusterId,
            quorum.epoch(),
            quorum.localIdOrThrow(),
            state.preferredSuccessors()
        );
    }

    private long maybeSendRequests(
        long currentTimeMs,
        Set<Integer> destinationIds,
        Supplier<ApiMessage> requestSupplier
    ) {
        long minBackoffMs = Long.MAX_VALUE;
        for (Integer destinationId : destinationIds) {
            long backoffMs = maybeSendRequest(currentTimeMs, destinationId, requestSupplier);
            if (backoffMs < minBackoffMs) {
                minBackoffMs = backoffMs;
            }
        }
        return minBackoffMs;
    }

    private BeginQuorumEpochRequestData buildBeginQuorumEpochRequest() {
        return BeginQuorumEpochRequest.singletonRequest(
            log.topicPartition(),
            clusterId,
            quorum.epoch(),
            quorum.localIdOrThrow()
        );
    }

    private VoteRequestData buildVoteRequest() {
        OffsetAndEpoch endOffset = endOffset();
        return VoteRequest.singletonRequest(
            log.topicPartition(),
            clusterId,
            quorum.epoch(),
            quorum.localIdOrThrow(),
            endOffset.epoch(),
            endOffset.offset()
        );
    }

    private FetchRequestData buildFetchRequest() {
        FetchRequestData request = RaftUtil.singletonFetchRequest(log.topicPartition(), log.topicId(), fetchPartition -> {
            fetchPartition
                .setCurrentLeaderEpoch(quorum.epoch())
                .setLastFetchedEpoch(log.lastFetchedEpoch())
                .setFetchOffset(log.endOffset().offset);
        });
        return request
            .setMaxBytes(MAX_FETCH_SIZE_BYTES)
            .setMaxWaitMs(fetchMaxWaitMs)
            .setClusterId(clusterId)
            .setReplicaId(quorum.localIdOrSentinel());
    }

    private long maybeSendAnyVoterFetch(long currentTimeMs) {
        OptionalInt readyVoterIdOpt = requestManager.findReadyVoter(currentTimeMs);
        if (readyVoterIdOpt.isPresent()) {
            return maybeSendRequest(
                currentTimeMs,
                readyVoterIdOpt.getAsInt(),
                this::buildFetchRequest
            );
        } else {
            return requestManager.backoffBeforeAvailableVoter(currentTimeMs);
        }
    }

    private FetchSnapshotRequestData buildFetchSnapshotRequest(OffsetAndEpoch snapshotId, long snapshotSize) {
        FetchSnapshotRequestData.SnapshotId requestSnapshotId = new FetchSnapshotRequestData.SnapshotId()
            .setEpoch(snapshotId.epoch())
            .setEndOffset(snapshotId.offset());

        FetchSnapshotRequestData request = FetchSnapshotRequest.singleton(
            clusterId,
            log.topicPartition(),
            snapshotPartition -> {
                return snapshotPartition
                    .setCurrentLeaderEpoch(quorum.epoch())
                    .setSnapshotId(requestSnapshotId)
                    .setPosition(snapshotSize);
            }
        );

        return request.setReplicaId(quorum.localIdOrSentinel());
    }

    private FetchSnapshotResponseData.PartitionSnapshot addQuorumLeader(
        FetchSnapshotResponseData.PartitionSnapshot partitionSnapshot
    ) {
        partitionSnapshot.currentLeader()
            .setLeaderEpoch(quorum.epoch())
            .setLeaderId(quorum.leaderIdOrSentinel());

        return partitionSnapshot;
    }

    public boolean isRunning() {
        GracefulShutdown gracefulShutdown = shutdown.get();
        return gracefulShutdown == null || !gracefulShutdown.isFinished();
    }

    public boolean isShuttingDown() {
        GracefulShutdown gracefulShutdown = shutdown.get();
        return gracefulShutdown != null && !gracefulShutdown.isFinished();
    }

    private void appendBatch(
        LeaderState<T> state,
        BatchAccumulator.CompletedBatch<T> batch,
        long appendTimeMs
    ) {
        try {
            int epoch = state.epoch();
            LogAppendInfo info = appendAsLeader(batch.data);
            OffsetAndEpoch offsetAndEpoch = new OffsetAndEpoch(info.lastOffset, epoch);
            CompletableFuture<Long> future = appendPurgatory.await(
                offsetAndEpoch.offset() + 1, Integer.MAX_VALUE);

            future.whenComplete((commitTimeMs, exception) -> {
                if (exception != null) {
                    logger.debug("Failed to commit {} records at {}", batch.numRecords, offsetAndEpoch, exception);
                } else {
                    long elapsedTime = Math.max(0, commitTimeMs - appendTimeMs);
                    double elapsedTimePerRecord = (double) elapsedTime / batch.numRecords;
                    kafkaRaftMetrics.updateCommitLatency(elapsedTimePerRecord, appendTimeMs);
                    logger.debug("Completed commit of {} records at {}", batch.numRecords, offsetAndEpoch);
                    batch.records.ifPresent(records -> {
                        maybeFireHandleCommit(batch.baseOffset, epoch, batch.appendTimestamp(), batch.sizeInBytes(), records);
                    });
                }
            });
        } finally {
            batch.release();
        }
    }

    private long maybeAppendBatches(
        LeaderState<T> state,
        long currentTimeMs
    ) {
        long timeUntilDrain = state.accumulator().timeUntilDrain(currentTimeMs);
        if (timeUntilDrain <= 0) {
            List<BatchAccumulator.CompletedBatch<T>> batches = state.accumulator().drain();
            Iterator<BatchAccumulator.CompletedBatch<T>> iterator = batches.iterator();

            try {
                while (iterator.hasNext()) {
                    BatchAccumulator.CompletedBatch<T> batch = iterator.next();
                    appendBatch(state, batch, currentTimeMs);
                }
                flushLeaderLog(state, currentTimeMs);
            } finally {
                // Release and discard any batches which failed to be appended
                while (iterator.hasNext()) {
                    iterator.next().release();
                }
            }
        }
        return timeUntilDrain;
    }

    private long pollResigned(long currentTimeMs) {
        ResignedState state = quorum.resignedStateOrThrow();
        long endQuorumBackoffMs = maybeSendRequests(
            currentTimeMs,
            state.unackedVoters(),
            () -> buildEndQuorumEpochRequest(state)
        );

        GracefulShutdown shutdown = this.shutdown.get();
        final long stateTimeoutMs;
        if (shutdown != null) {
            // If we are shutting down, then we will remain in the resigned state
            // until either the shutdown expires or an election bumps the epoch
            stateTimeoutMs = shutdown.remainingTimeMs();
        } else if (state.hasElectionTimeoutExpired(currentTimeMs)) {
            transitionToCandidate(currentTimeMs);
            stateTimeoutMs = 0L;
        } else {
            stateTimeoutMs = state.remainingElectionTimeMs(currentTimeMs);
        }

        return Math.min(stateTimeoutMs, endQuorumBackoffMs);
    }

    private long pollLeader(long currentTimeMs) {
        LeaderState<T> state = quorum.leaderStateOrThrow();
        maybeFireLeaderChange(state);

        if (shutdown.get() != null || state.isResignRequested()) {
            transitionToResigned(state.nonLeaderVotersByDescendingFetchOffset());
            return 0L;
        }

        long timeUntilFlush = maybeAppendBatches(
            state,
            currentTimeMs
        );

        long timeUntilSend = maybeSendRequests(
            currentTimeMs,
            state.nonAcknowledgingVoters(),
            this::buildBeginQuorumEpochRequest
        );

        return Math.min(timeUntilFlush, timeUntilSend);
    }

    private long maybeSendVoteRequests(
        CandidateState state,
        long currentTimeMs
    ) {
        // Continue sending Vote requests as long as we still have a chance to win the election
        if (!state.isVoteRejected()) {
            return maybeSendRequests(
                currentTimeMs,
                state.unrecordedVoters(),
                this::buildVoteRequest
            );
        }
        return Long.MAX_VALUE;
    }

    private long pollCandidate(long currentTimeMs) {
        CandidateState state = quorum.candidateStateOrThrow();
        GracefulShutdown shutdown = this.shutdown.get();

        if (shutdown != null) {
            // If we happen to shutdown while we are a candidate, we will continue
            // with the current election until one of the following conditions is met:
            //  1) we are elected as leader (which allows us to resign)
            //  2) another leader is elected
            //  3) the shutdown timer expires
            long minRequestBackoffMs = maybeSendVoteRequests(state, currentTimeMs);
            return Math.min(shutdown.remainingTimeMs(), minRequestBackoffMs);
        } else if (state.isBackingOff()) {
            if (state.isBackoffComplete(currentTimeMs)) {
                logger.info("Re-elect as candidate after election backoff has completed");
                transitionToCandidate(currentTimeMs);
                return 0L;
            }
            return state.remainingBackoffMs(currentTimeMs);
        } else if (state.hasElectionTimeoutExpired(currentTimeMs)) {
            long backoffDurationMs = binaryExponentialElectionBackoffMs(state.retries());
            logger.debug("Election has timed out, backing off for {}ms before becoming a candidate again",
                backoffDurationMs);
            state.startBackingOff(currentTimeMs, backoffDurationMs);
            return backoffDurationMs;
        } else {
            long minRequestBackoffMs = maybeSendVoteRequests(state, currentTimeMs);
            return Math.min(minRequestBackoffMs, state.remainingElectionTimeMs(currentTimeMs));
        }
    }

    private long pollFollower(long currentTimeMs) {
        FollowerState state = quorum.followerStateOrThrow();
        if (quorum.isVoter()) {
            return pollFollowerAsVoter(state, currentTimeMs);
        } else {
            return pollFollowerAsObserver(state, currentTimeMs);
        }
    }

    private long pollFollowerAsVoter(FollowerState state, long currentTimeMs) {
        GracefulShutdown shutdown = this.shutdown.get();
        if (shutdown != null) {
            // If we are a follower, then we can shutdown immediately. We want to
            // skip the transition to candidate in any case.
            return 0;
        } else if (state.hasFetchTimeoutExpired(currentTimeMs)) {
            logger.info("Become candidate due to fetch timeout");
            transitionToCandidate(currentTimeMs);
            return 0L;
        } else {
            long backoffMs = maybeSendFetchOrFetchSnapshot(state, currentTimeMs);

            return Math.min(backoffMs, state.remainingFetchTimeMs(currentTimeMs));
        }
    }

    private long pollFollowerAsObserver(FollowerState state, long currentTimeMs) {
        if (state.hasFetchTimeoutExpired(currentTimeMs)) {
            return maybeSendAnyVoterFetch(currentTimeMs);
        } else {
            final long backoffMs;

            // If the current leader is backing off due to some failure or if the
            // request has timed out, then we attempt to send the Fetch to another
            // voter in order to discover if there has been a leader change.
            ConnectionState connection = requestManager.getOrCreate(state.leaderId());
            if (connection.hasRequestTimedOut(currentTimeMs)) {
                backoffMs = maybeSendAnyVoterFetch(currentTimeMs);
                connection.reset();
            } else if (connection.isBackingOff(currentTimeMs)) {
                backoffMs = maybeSendAnyVoterFetch(currentTimeMs);
            } else {
                backoffMs = maybeSendFetchOrFetchSnapshot(state, currentTimeMs);
            }

            return Math.min(backoffMs, state.remainingFetchTimeMs(currentTimeMs));
        }
    }

    private long maybeSendFetchOrFetchSnapshot(FollowerState state, long currentTimeMs) {
        final Supplier<ApiMessage> requestSupplier;

        if (state.fetchingSnapshot().isPresent()) {
            RawSnapshotWriter snapshot = state.fetchingSnapshot().get();
            long snapshotSize = snapshot.sizeInBytes();

            requestSupplier = () -> buildFetchSnapshotRequest(snapshot.snapshotId(), snapshotSize);
        } else {
            requestSupplier = this::buildFetchRequest;
        }

        return maybeSendRequest(currentTimeMs, state.leaderId(), requestSupplier);
    }

    private long pollVoted(long currentTimeMs) {
        VotedState state = quorum.votedStateOrThrow();
        GracefulShutdown shutdown = this.shutdown.get();

        if (shutdown != null) {
            // If shutting down, then remain in this state until either the
            // shutdown completes or an epoch bump forces another state transition
            return shutdown.remainingTimeMs();
        } else if (state.hasElectionTimeoutExpired(currentTimeMs)) {
            transitionToCandidate(currentTimeMs);
            return 0L;
        } else {
            return state.remainingElectionTimeMs(currentTimeMs);
        }
    }

    private long pollUnattached(long currentTimeMs) {
        UnattachedState state = quorum.unattachedStateOrThrow();
        if (quorum.isVoter()) {
            return pollUnattachedAsVoter(state, currentTimeMs);
        } else {
            return pollUnattachedAsObserver(state, currentTimeMs);
        }
    }

    private long pollUnattachedAsVoter(UnattachedState state, long currentTimeMs) {
        GracefulShutdown shutdown = this.shutdown.get();
        if (shutdown != null) {
            // If shutting down, then remain in this state until either the
            // shutdown completes or an epoch bump forces another state transition
            return shutdown.remainingTimeMs();
        } else if (state.hasElectionTimeoutExpired(currentTimeMs)) {
            transitionToCandidate(currentTimeMs);
            return 0L;
        } else {
            return state.remainingElectionTimeMs(currentTimeMs);
        }
    }

    private long pollUnattachedAsObserver(UnattachedState state, long currentTimeMs) {
        long fetchBackoffMs = maybeSendAnyVoterFetch(currentTimeMs);
        return Math.min(fetchBackoffMs, state.remainingElectionTimeMs(currentTimeMs));
    }

    private long pollCurrentState(long currentTimeMs) {
        if (quorum.isLeader()) {
            return pollLeader(currentTimeMs);
        } else if (quorum.isCandidate()) {
            return pollCandidate(currentTimeMs);
        } else if (quorum.isFollower()) {
            return pollFollower(currentTimeMs);
        } else if (quorum.isVoted()) {
            return pollVoted(currentTimeMs);
        } else if (quorum.isUnattached()) {
            return pollUnattached(currentTimeMs);
        } else if (quorum.isResigned()) {
            return pollResigned(currentTimeMs);
        } else {
            throw new IllegalStateException("Unexpected quorum state " + quorum);
        }
    }

    private void pollListeners() {
        // Apply all of the pending registrations
        while (true) {
            Registration<T> registration = pendingRegistrations.poll();
            if (registration == null) {
                break;
            }

            processRegistration(registration);
        }

        // Check listener progress to see if reads are expected
        quorum.highWatermark().ifPresent(highWatermarkMetadata -> {
            updateListenersProgress(highWatermarkMetadata.offset);
        });
    }

    private void processRegistration(Registration<T> registration) {
        Listener<T> listener = registration.listener();
        Registration.Ops ops = registration.ops();

        if (ops == Registration.Ops.REGISTER) {
            if (listenerContexts.putIfAbsent(listener, new ListenerContext(listener)) != null) {
                logger.error("Attempting to add a listener that already exists: {}", listenerName(listener));
            } else {
                logger.info("Registered the listener {}", listenerName(listener));
            }
        } else {
            if (listenerContexts.remove(listener) == null) {
                logger.error("Attempting to remove a listener that doesn't exists: {}", listenerName(listener));
            } else {
                logger.info("Unregistered the listener {}", listenerName(listener));
            }
        }
    }

    private boolean maybeCompleteShutdown(long currentTimeMs) {
        GracefulShutdown shutdown = this.shutdown.get();
        if (shutdown == null) {
            return false;
        }

        shutdown.update(currentTimeMs);
        if (shutdown.hasTimedOut()) {
            shutdown.failWithTimeout();
            return true;
        }

        if (quorum.isObserver()
            || quorum.remoteVoters().isEmpty()
            || quorum.hasRemoteLeader()) {

            shutdown.complete();
            return true;
        }

        return false;
    }

    /**
     * A simple timer based log cleaner
     */
    private static class RaftMetadataLogCleanerManager {
        private final Logger logger;
        private final Timer timer;
        private final long delayMs;
        private final Runnable cleaner;

        RaftMetadataLogCleanerManager(Logger logger, Time time, long delayMs, Runnable cleaner) {
            this.logger = logger;
            this.timer = time.timer(delayMs);
            this.delayMs = delayMs;
            this.cleaner = cleaner;
        }

        public long maybeClean(long currentTimeMs) {
            timer.update(currentTimeMs);
            if (timer.isExpired()) {
                try {
                    cleaner.run();
                } catch (Throwable t) {
                    logger.error("Had an error during log cleaning", t);
                }
                timer.reset(delayMs);
            }
            return timer.remainingMs();
        }
    }
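
    // ------------------------------------------------------------------
    // [Editor's note, not part of the Apache Kafka source] The cleaner is a
    // plain Runnable that maybeClean(...) fires at most once per delayMs; it
    // is driven from the poll loop (see snapshotCleaner.maybeClean in poll()
    // below). A hypothetical wiring, with the delay chosen for illustration:
    //
    //     RaftMetadataLogCleanerManager cleaner = new RaftMetadataLogCleanerManager(
    //         logger, time, 60_000, () -> { /* delete obsolete snapshots and segments */ });
    //     long remainingMs = cleaner.maybeClean(time.milliseconds());
    // ------------------------------------------------------------------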

    private void wakeup() {
        messageQueue.wakeup();
    }

    /**
     * Handle an inbound request. The response will be returned through
     * {@link RaftRequest.Inbound#completion}.
     *
     * @param request The inbound request
     */
    public void handle(RaftRequest.Inbound request) {
        messageQueue.add(Objects.requireNonNull(request));
    }
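
    // ------------------------------------------------------------------
    // [Editor's note, not part of the Apache Kafka source] A minimal sketch
    // of how a networking layer would use handle(): the request is only
    // enqueued here, and the response arrives asynchronously through
    // request.completion once the polling thread has processed it. The
    // send(...) call below is a hypothetical placeholder.
    //
    //     client.handle(request);
    //     request.completion.whenComplete((response, error) -> {
    //         if (error == null) {
    //             send(request.correlationId(), response);
    //         }
    //     });
    // ------------------------------------------------------------------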

    /**
     * Poll for new events. This allows the client to handle inbound
     * requests and send any needed outbound requests.
     */
    public void poll() {
        pollListeners();

        long currentTimeMs = time.milliseconds();
        if (maybeCompleteShutdown(currentTimeMs)) {
            return;
        }

        long pollStateTimeoutMs = pollCurrentState(currentTimeMs);
        long cleaningTimeoutMs = snapshotCleaner.maybeClean(currentTimeMs);
        long pollTimeoutMs = Math.min(pollStateTimeoutMs, cleaningTimeoutMs);

        kafkaRaftMetrics.updatePollStart(currentTimeMs);

        RaftMessage message = messageQueue.poll(pollTimeoutMs);

        currentTimeMs = time.milliseconds();
        kafkaRaftMetrics.updatePollEnd(currentTimeMs);

        if (message != null) {
            handleInboundMessage(message, currentTimeMs);
        }
    }
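
    // ------------------------------------------------------------------
    // [Editor's note, not part of the Apache Kafka source] poll() is meant
    // to be driven repeatedly by a dedicated IO thread; a minimal sketch of
    // such a loop (the shape below is illustrative only):
    //
    //     while (client.isRunning()) {
    //         client.poll();
    //     }
    // ------------------------------------------------------------------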

    @Override
    public long scheduleAppend(int epoch, List<T> records) {
        return append(epoch, records, false);
    }

    @Override
    public long scheduleAtomicAppend(int epoch, List<T> records) {
        return append(epoch, records, true);
    }

    private long append(int epoch, List<T> records, boolean isAtomic) {
        LeaderState<T> leaderState = quorum.<T>maybeLeaderState().orElseThrow(
            () -> new NotLeaderException("Append failed because the replication is not the current leader")
        );

        BatchAccumulator<T> accumulator = leaderState.accumulator();
        boolean isFirstAppend = accumulator.isEmpty();
        final long offset;
        if (isAtomic) {
            offset = accumulator.appendAtomic(epoch, records);
        } else {
            offset = accumulator.append(epoch, records);
        }

        // Wakeup the network channel if either this is the first append
        // or the accumulator is ready to drain now. Checking for the first
        // append ensures that we give the IO thread a chance to observe
        // the linger timeout so that it can schedule its own wakeup in case
        // there are no additional appends.
        if (isFirstAppend || accumulator.needsDrain(time.milliseconds())) {
            wakeup();
        }
        return offset;
    }
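
    // ------------------------------------------------------------------
    // [Editor's note, not part of the Apache Kafka source] A minimal sketch
    // of how a leader-side state machine schedules writes; the epoch passed
    // in is expected to match the current leader epoch, and the records are
    // only considered committed once handleCommit is later invoked on the
    // registered listener up to the returned offset.
    //
    //     int epoch = client.leaderAndEpoch().epoch();
    //     long lastOffset = client.scheduleAppend(epoch, records);  // records: the caller's List<T>
    // ------------------------------------------------------------------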

    @Override
    public CompletableFuture<Void> shutdown(int timeoutMs) {
        logger.info("Beginning graceful shutdown");
        CompletableFuture<Void> shutdownComplete = new CompletableFuture<>();
        shutdown.set(new GracefulShutdown(timeoutMs, shutdownComplete));
        wakeup();
        return shutdownComplete;
    }
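
    // ------------------------------------------------------------------
    // [Editor's note, not part of the Apache Kafka source] Graceful shutdown
    // is asynchronous: the returned future completes normally once shutdown
    // has finished, or exceptionally with a TimeoutException if the timeout
    // expires first. A minimal usage sketch:
    //
    //     client.shutdown(5000).whenComplete((ignored, error) -> {
    //         if (error != null) {
    //             // shutdown timed out or otherwise failed
    //         }
    //     });
    // ------------------------------------------------------------------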

    @Override
    public void resign(int epoch) {
        if (epoch < 0) {
            throw new IllegalArgumentException("Attempt to resign from an invalid negative epoch " + epoch);
        }

        if (!quorum.isVoter()) {
            throw new IllegalStateException("Attempt to resign by a non-voter");
        }

        LeaderAndEpoch leaderAndEpoch = leaderAndEpoch();
        int currentEpoch = leaderAndEpoch.epoch();

        if (epoch > currentEpoch) {
            throw new IllegalArgumentException("Attempt to resign from epoch " + epoch +
                " which is larger than the current epoch " + currentEpoch);
        } else if (epoch < currentEpoch) {
            // If the passed epoch is smaller than the current epoch, then it might mean
            // that the listener has not been notified about a leader change that already
            // took place. In this case, we consider the call as already fulfilled and
            // take no further action.
            logger.debug("Ignoring call to resign from epoch {} since it is smaller than the " +
                "current epoch {}", epoch, currentEpoch);
            return;
        } else if (!leaderAndEpoch.isLeader(quorum.localIdOrThrow())) {
            throw new IllegalArgumentException("Cannot resign from epoch " + epoch +
                " since we are not the leader");
        } else {
            // Note that if we transition to another state before we have a chance to
            // request resignation, then we consider the call fulfilled.
            Optional<LeaderState<Object>> leaderStateOpt = quorum.maybeLeaderState();
            if (!leaderStateOpt.isPresent()) {
                logger.debug("Ignoring call to resign from epoch {} since this node is " +
                    "no longer the leader", epoch);
                return;
            }

            LeaderState<Object> leaderState = leaderStateOpt.get();
            if (leaderState.epoch() != epoch) {
                logger.debug("Ignoring call to resign from epoch {} since it is smaller than the " +
                    "current epoch {}", epoch, leaderState.epoch());
            } else {
                logger.info("Received user request to resign from the current epoch {}", currentEpoch);
                leaderState.requestResign();
                wakeup();
            }
        }
    }

    @Override
    public Optional<SnapshotWriter<T>> createSnapshot(
        long committedOffset,
        int committedEpoch,
        long lastContainedLogTime
    ) {
        return RecordsSnapshotWriter.createWithHeader(
                () -> log.createNewSnapshot(new OffsetAndEpoch(committedOffset + 1, committedEpoch)),
                MAX_BATCH_SIZE_BYTES,
                memoryPool,
                time,
                lastContainedLogTime,
                CompressionType.NONE,
                serde
            );
    }
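
    // ------------------------------------------------------------------
    // [Editor's note, not part of the Apache Kafka source] A minimal sketch
    // of how a state machine could use the returned writer, assuming the
    // SnapshotWriter append/freeze/close API; an empty Optional means no
    // writer could be created for that snapshot id (e.g. it already exists).
    //
    //     client.createSnapshot(committedOffset, committedEpoch, timestamp)
    //         .ifPresent(writer -> {
    //             writer.append(recordBatch);   // a List<T>, may be called repeatedly
    //             writer.freeze();              // make the snapshot immutable
    //             writer.close();
    //         });
    // ------------------------------------------------------------------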

    @Override
    public void close() {
        log.flush(true);
        if (kafkaRaftMetrics != null) {
            kafkaRaftMetrics.close();
        }
    }

    QuorumState quorum() {
        return quorum;
    }

    public OptionalLong highWatermark() {
        if (quorum.highWatermark().isPresent()) {
            return OptionalLong.of(quorum.highWatermark().get().offset);
        } else {
            return OptionalLong.empty();
        }
    }

    private class GracefulShutdown {
        final Timer finishTimer;
        final CompletableFuture<Void> completeFuture;

        public GracefulShutdown(long shutdownTimeoutMs,
                                CompletableFuture<Void> completeFuture) {
            this.finishTimer = time.timer(shutdownTimeoutMs);
            this.completeFuture = completeFuture;
        }

        public void update(long currentTimeMs) {
            finishTimer.update(currentTimeMs);
        }

        public boolean hasTimedOut() {
            return finishTimer.isExpired();
        }

        public boolean isFinished() {
            return completeFuture.isDone();
        }

        public long remainingTimeMs() {
            return finishTimer.remainingMs();
        }

        public void failWithTimeout() {
            logger.warn("Graceful shutdown timed out after {}ms", finishTimer.timeoutMs());
            completeFuture.completeExceptionally(
                new TimeoutException("Timeout expired before graceful shutdown completed"));
        }

        public void complete() {
            logger.info("Graceful shutdown completed");
            completeFuture.complete(null);
        }
    }

    private static final class Registration<T> {
        private final Ops ops;
        private final Listener<T> listener;

        private Registration(Ops ops, Listener<T> listener) {
            this.ops = ops;
            this.listener = listener;
        }

        private Ops ops() {
            return ops;
        }

        private Listener<T> listener() {
            return listener;
        }

        private enum Ops {
            REGISTER, UNREGISTER
        }

        private static <T> Registration<T> register(Listener<T> listener) {
            return new Registration<>(Ops.REGISTER, listener);
        }

        private static <T> Registration<T> unregister(Listener<T> listener) {
            return new Registration<>(Ops.UNREGISTER, listener);
        }
    }

    private final class ListenerContext implements CloseListener<BatchReader<T>> {
        private final RaftClient.Listener<T> listener;
        // This field is used only by the Raft IO thread
        private LeaderAndEpoch lastFiredLeaderChange = LeaderAndEpoch.UNKNOWN;

        // These fields are visible to both the Raft IO thread and the listener
        // and are protected through synchronization on this ListenerContext instance
        private BatchReader<T> lastSent = null;
        private long nextOffset = 0;

        private ListenerContext(Listener<T> listener) {
            this.listener = listener;
        }

        /**
         * Get the last acked offset, which is one greater than the offset of the
         * last record which was acked by the state machine.
         */
        private synchronized long nextOffset() {
            return nextOffset;
        }

        /**
         * Get the next expected offset, which might be larger than the last acked
         * offset if there are inflight batches which have not been acked yet.
         * Note that when fetching from disk, we may not know the last offset of
         * inflight data until it has been processed by the state machine. In this case,
         * we delay sending additional data until the state machine has read to the
         * end and the last offset is determined.
         */
        private synchronized OptionalLong nextExpectedOffset() {
            if (lastSent != null) {
                OptionalLong lastSentOffset = lastSent.lastOffset();
                if (lastSentOffset.isPresent()) {
                    return OptionalLong.of(lastSentOffset.getAsLong() + 1);
                } else {
                    return OptionalLong.empty();
                }
            } else {
                return OptionalLong.of(nextOffset);
            }
        }

        /**
         * This API is used when the Listener needs to be notified of a new snapshot. This happens
         * when the context's next offset is less than the log start offset.
         */
        private void fireHandleSnapshot(SnapshotReader<T> reader) {
            synchronized (this) {
                nextOffset = reader.snapshotId().offset();
                lastSent = null;
            }

            logger.debug("Notifying listener {} of snapshot {}", listenerName(), reader.snapshotId());
            listener.handleSnapshot(reader);
        }

        /**
         * This API is used for committed records that have been received through
         * replication. In general, followers will write new data to disk before they
         * know whether it has been committed. Rather than retaining the uncommitted
         * data in memory, we let the state machine read the records from disk.
         */
        private void fireHandleCommit(long baseOffset, Records records) {
            fireHandleCommit(
                RecordsBatchReader.of(
                    baseOffset,
                    records,
                    serde,
                    BufferSupplier.create(),
                    MAX_BATCH_SIZE_BYTES,
                    this,
                    true /* Validate batch CRC*/
                )
            );
        }

        /**
         * This API is used for committed records originating from {@link #scheduleAppend(int, List)}
         * or {@link #scheduleAtomicAppend(int, List)} on this instance. In this case, we are able to
         * save the original record objects, which saves the need to read them back from disk. This is
         * a nice optimization for the leader which is typically doing more work than all of the
         * followers.
         */
        private void fireHandleCommit(
            long baseOffset,
            int epoch,
            long appendTimestamp,
            int sizeInBytes,
            List<T> records
        ) {
            Batch<T> batch = Batch.data(baseOffset, epoch, appendTimestamp, sizeInBytes, records);
            MemoryBatchReader<T> reader = MemoryBatchReader.of(Collections.singletonList(batch), this);
            fireHandleCommit(reader);
        }
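
        // --------------------------------------------------------------
        // [Editor's note, not part of the Apache Kafka source] Both
        // fireHandleCommit overloads above end up in the same listener
        // callback; a minimal listener sketch (class name, record type, and
        // apply() are hypothetical):
        //
        //     class MyStateMachine implements RaftClient.Listener<MyRecord> {
        //         @Override
        //         public void handleCommit(BatchReader<MyRecord> reader) {
        //             try {
        //                 while (reader.hasNext()) {
        //                     Batch<MyRecord> batch = reader.next();
        //                     batch.records().forEach(this::apply);
        //                 }
        //             } finally {
        //                 reader.close();   // releases the batch and advances nextOffset
        //             }
        //         }
        //         @Override
        //         public void handleSnapshot(SnapshotReader<MyRecord> reader) { /* rebuild state, then close */ }
        //         @Override
        //         public void handleLeaderChange(LeaderAndEpoch leaderAndEpoch) { /* track leadership */ }
        //     }
        // --------------------------------------------------------------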

        private String listenerName() {
            return KafkaRaftClient.listenerName(listener);
        }

        private void fireHandleCommit(BatchReader<T> reader) {
            synchronized (this) {
                this.lastSent = reader;
            }
            logger.debug(
                "Notifying listener {} of batch for baseOffset {} and lastOffset {}",
                listenerName(),
                reader.baseOffset(),
                reader.lastOffset()
            );
            listener.handleCommit(reader);
        }

        private void maybeFireLeaderChange(LeaderAndEpoch leaderAndEpoch) {
            if (shouldFireLeaderChange(leaderAndEpoch)) {
                lastFiredLeaderChange = leaderAndEpoch;
                logger.debug("Notifying listener {} of leader change {}", listenerName(), leaderAndEpoch);
                listener.handleLeaderChange(leaderAndEpoch);
            }
        }

        private boolean shouldFireLeaderChange(LeaderAndEpoch leaderAndEpoch) {
            if (leaderAndEpoch.equals(lastFiredLeaderChange)) {
                return false;
            } else if (leaderAndEpoch.epoch() > lastFiredLeaderChange.epoch()) {
                return true;
            } else {
                return leaderAndEpoch.leaderId().isPresent() &&
                    !lastFiredLeaderChange.leaderId().isPresent();
            }
        }

        private void maybeFireLeaderChange(LeaderAndEpoch leaderAndEpoch, long epochStartOffset) {
            // If this node is becoming the leader, then we can fire `handleClaim` as soon
            // as the listener has caught up to the start of the leader epoch. This guarantees
            // that the state machine has seen the full committed state before it becomes
            // leader and begins writing to the log.
            if (shouldFireLeaderChange(leaderAndEpoch) && nextOffset() >= epochStartOffset) {
                lastFiredLeaderChange = leaderAndEpoch;
                listener.handleLeaderChange(leaderAndEpoch);
            }
        }

        public synchronized void onClose(BatchReader<T> reader) {
            OptionalLong lastOffset = reader.lastOffset();

            if (lastOffset.isPresent()) {
                nextOffset = lastOffset.getAsLong() + 1;
            }

            if (lastSent == reader) {
                lastSent = null;
                wakeup();
            }
        }
    }
}

Related information

kafka source code directory

Related articles

kafka Batch source code

kafka BatchReader source code

kafka CandidateState source code

kafka ElectionState source code

kafka EpochState source code

kafka ExpirationService source code

kafka FileBasedStateStore source code

kafka FollowerState source code

kafka Isolation source code

kafka LeaderAndEpoch source code
