kafka LeaderState 源码
kafka LeaderState 代码
文件路径:/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.raft;
import org.apache.kafka.common.message.DescribeQuorumResponseData;
import org.apache.kafka.common.message.LeaderChangeMessage;
import org.apache.kafka.common.message.LeaderChangeMessage.Voter;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.ControlRecordUtils;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.raft.internals.BatchAccumulator;
import org.slf4j.Logger;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
/**
 * Epoch state held by the local node while it is the elected leader of the quorum.
 *
 * <p>In the context of LeaderState, an acknowledged voter means one who has acknowledged the current leader by either
 * responding to a `BeginQuorumEpoch` request from the leader or by beginning to send `Fetch` requests.
 * More specifically, the set of unacknowledged voters are targets for BeginQuorumEpoch requests from the leader until
 * they acknowledge the leader.
 *
 * <p>Besides acknowledgement tracking, this class records the replication progress of every voter and of observers
 * that fetch from the leader, and derives the high watermark from the voters' end offsets.
 */
public class LeaderState<T> implements EpochState {
    /** Observers whose last fetch is at least this many milliseconds old are dropped by {@link #describeQuorum(long)}. */
    static final long OBSERVER_SESSION_TIMEOUT_MS = 300_000L;

    private final int localId;
    private final int epoch;
    // The high watermark is only ever advanced to offsets strictly greater than this value.
    private final long epochStartOffset;

    private Optional<LogOffsetMetadata> highWatermark;
    // Replication state of every voter. The local leader is expected to be one of them.
    private final Map<Integer, ReplicaState> voterStates = new HashMap<>();
    // Replication state of non-voters that have fetched from this leader.
    private final Map<Integer, ReplicaState> observerStates = new HashMap<>();
    // Voters supplied at construction as granting voters; written into the LeaderChangeMessage.
    private final Set<Integer> grantingVoters = new HashSet<>();
    private final Logger log;
    private final BatchAccumulator<T> accumulator;

    // This is volatile because resignation can be requested from an external thread.
    private volatile boolean resignRequested = false;

    /**
     * Creates the leader state for a newly elected leader.
     *
     * @param localId          id of the local node (the leader)
     * @param epoch            the leader epoch
     * @param epochStartOffset start offset of this epoch; the high watermark must move strictly past it
     * @param voters           ids of all voters in the quorum
     * @param grantingVoters   ids of the voters recorded as having granted this leadership
     * @param accumulator      accumulator used for leader appends; must be non-null
     * @param logContext       log context used to create this state's logger
     * @throws NullPointerException if {@code accumulator} is null
     */
    protected LeaderState(
        int localId,
        int epoch,
        long epochStartOffset,
        Set<Integer> voters,
        Set<Integer> grantingVoters,
        BatchAccumulator<T> accumulator,
        LogContext logContext
    ) {
        this.localId = localId;
        this.epoch = epoch;
        this.epochStartOffset = epochStartOffset;
        this.highWatermark = Optional.empty();

        for (int voterId : voters) {
            // The leader has trivially acknowledged itself; every other voter starts unacknowledged.
            boolean hasAcknowledgedLeader = voterId == localId;
            this.voterStates.put(voterId, new ReplicaState(voterId, hasAcknowledgedLeader));
        }
        this.grantingVoters.addAll(grantingVoters);

        this.log = logContext.logger(LeaderState.class);
        this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null");
    }

    /** Returns the accumulator this leader appends records through. */
    public BatchAccumulator<T> accumulator() {
        return this.accumulator;
    }

    /** Converts a set of voter ids into {@link Voter} entries for a {@link LeaderChangeMessage}. */
    private static List<Voter> convertToVoters(Set<Integer> voterIds) {
        return voterIds.stream()
            .map(follower -> new Voter().setVoterId(follower))
            .collect(Collectors.toList());
    }

    /**
     * Appends a {@link LeaderChangeMessage} naming this node as leader, listing all voters and the
     * granting voters, and then calls {@code forceDrain()} on the accumulator.
     *
     * @param currentTimeMs current time in milliseconds, passed through to the accumulator
     */
    public void appendLeaderChangeMessage(long currentTimeMs) {
        List<Voter> voters = convertToVoters(voterStates.keySet());
        List<Voter> grantingVoters = convertToVoters(this.grantingVoters());

        LeaderChangeMessage leaderChangeMessage = new LeaderChangeMessage()
            .setVersion(ControlRecordUtils.LEADER_CHANGE_CURRENT_VERSION)
            .setLeaderId(this.election().leaderId())
            .setVoters(voters)
            .setGrantingVoters(grantingVoters);

        accumulator.appendLeaderChangeMessage(leaderChangeMessage, currentTimeMs);
        accumulator.forceDrain();
    }

    /** Returns true once {@link #requestResign()} has been called. Safe to call from any thread. */
    public boolean isResignRequested() {
        return resignRequested;
    }

    /** Flags that this leader should resign. May be called from a thread other than the one driving this state. */
    public void requestResign() {
        this.resignRequested = true;
    }

    @Override
    public Optional<LogOffsetMetadata> highWatermark() {
        return highWatermark;
    }

    @Override
    public ElectionState election() {
        return ElectionState.withElectedLeader(epoch, localId, voterStates.keySet());
    }

    @Override
    public int epoch() {
        return epoch;
    }

    /**
     * Returns the voters recorded as having granted this leadership.
     *
     * @return a read-only view of the granting voters; mutating it throws {@link UnsupportedOperationException}
     */
    public Set<Integer> grantingVoters() {
        // Do not hand out the internal mutable set: callers must not be able to alter leader state.
        return Collections.unmodifiableSet(this.grantingVoters);
    }

    public int localId() {
        return localId;
    }

    /**
     * Returns the voters that have not yet acknowledged this leader. These are the targets
     * for {@code BeginQuorumEpoch} requests.
     *
     * @return a new mutable set of voter ids
     */
    public Set<Integer> nonAcknowledgingVoters() {
        Set<Integer> nonAcknowledging = new HashSet<>();
        for (ReplicaState state : voterStates.values()) {
            if (!state.hasAcknowledgedLeader)
                nonAcknowledging.add(state.nodeId);
        }
        return nonAcknowledging;
    }

    /**
     * Recomputes the high watermark from the voters' end offsets.
     *
     * <p>The candidate value is the end offset found at index {@code voters / 2} of the voters sorted by
     * descending end offset, i.e. the largest offset replicated to a majority (the leader counts).
     * It is adopted only if it exceeds {@code epochStartOffset}, and then only if no high watermark is set yet,
     * or if it is larger than the current one, or if it is at the same offset with different metadata.
     *
     * @return true if the high watermark was changed
     */
    private boolean maybeUpdateHighWatermark() {
        // Find the largest offset which is replicated to a majority of replicas (the leader counts)
        List<ReplicaState> followersByDescendingFetchOffset = followersByDescendingFetchOffset();
        int indexOfHw = voterStates.size() / 2;
        Optional<LogOffsetMetadata> highWatermarkUpdateOpt = followersByDescendingFetchOffset.get(indexOfHw).endOffset;
        if (!highWatermarkUpdateOpt.isPresent()) {
            // The majority position has no known end offset yet.
            return false;
        }

        LogOffsetMetadata highWatermarkUpdateMetadata = highWatermarkUpdateOpt.get();
        long highWatermarkUpdateOffset = highWatermarkUpdateMetadata.offset;

        // The KRaft protocol requires an extra condition on commitment after a leader
        // election. The leader must commit one record from its own epoch before it is
        // allowed to expose records from any previous epoch. This guarantees that its
        // log will contain the largest record (in terms of epoch/offset) in any log
        // which ensures that any future leader will have replicated this record as well
        // as all records from previous epochs that the current leader has committed.
        if (highWatermarkUpdateOffset <= epochStartOffset) {
            return false;
        }

        if (highWatermark.isPresent()) {
            LogOffsetMetadata currentHighWatermarkMetadata = highWatermark.get();
            if (highWatermarkUpdateOffset < currentHighWatermarkMetadata.offset) {
                // The high watermark must never go backwards; a regression means a voter lost data.
                log.error("The latest computed high watermark {} is smaller than the current " +
                        "value {}, which suggests that one of the voters has lost committed data. " +
                        "Full voter replication state: {}", highWatermarkUpdateOffset,
                    currentHighWatermarkMetadata.offset, voterStates.values());
                return false;
            }

            boolean offsetAdvanced = highWatermarkUpdateOffset > currentHighWatermarkMetadata.offset;
            // Same offset but different metadata still counts as an update.
            boolean metadataChanged = highWatermarkUpdateOffset == currentHighWatermarkMetadata.offset
                && !highWatermarkUpdateMetadata.metadata.equals(currentHighWatermarkMetadata.metadata);
            if (!offsetAdvanced && !metadataChanged) {
                return false;
            }
        }

        highWatermark = highWatermarkUpdateOpt;
        logHighWatermarkUpdate(
            highWatermarkUpdateMetadata,
            indexOfHw,
            followersByDescendingFetchOffset
        );
        return true;
    }

    /** Logs a high watermark change at TRACE level together with the inputs used to compute it. */
    private void logHighWatermarkUpdate(
        LogOffsetMetadata newHighWatermark,
        int indexOfHw,
        List<ReplicaState> followersByDescendingFetchOffset
    ) {
        log.trace(
            "High watermark set to {} based on indexOfHw {} and voters {}",
            newHighWatermark,
            indexOfHw,
            followersByDescendingFetchOffset
        );
    }

    /**
     * Update the local replica state.
     *
     * @param endOffsetMetadata updated log end offset of local replica
     * @return true if the high watermark is updated as a result of this call
     * @throws IllegalStateException if the new end offset is smaller than the previously recorded one
     */
    public boolean updateLocalState(
        LogOffsetMetadata endOffsetMetadata
    ) {
        ReplicaState state = getOrCreateReplicaState(localId);
        state.endOffset.ifPresent(currentEndOffset -> {
            if (currentEndOffset.offset > endOffsetMetadata.offset) {
                throw new IllegalStateException("Detected non-monotonic update of local " +
                    "end offset: " + currentEndOffset.offset + " -> " + endOffsetMetadata.offset);
            }
        });
        state.updateLeaderState(endOffsetMetadata);
        return maybeUpdateHighWatermark();
    }

    /**
     * Update the replica state in terms of fetch time and log end offsets.
     *
     * <p>Unknown non-voter ids are registered as observers. Only fetches from voters can move the
     * high watermark.
     *
     * @param replicaId replica id; negative ids are ignored
     * @param currentTimeMs current time in milliseconds
     * @param fetchOffsetMetadata new log offset and metadata
     * @return true if the high watermark is updated as a result of this call
     * @throws IllegalStateException if {@code replicaId} equals the local leader id
     */
    public boolean updateReplicaState(
        int replicaId,
        long currentTimeMs,
        LogOffsetMetadata fetchOffsetMetadata
    ) {
        // Ignore fetches from negative replica id, as it indicates
        // the fetch is from non-replica. For example, a consumer.
        if (replicaId < 0) {
            return false;
        } else if (replicaId == localId) {
            throw new IllegalStateException("Remote replica ID " + replicaId + " matches the local leader ID");
        }

        ReplicaState state = getOrCreateReplicaState(replicaId);

        // A follower's fetch offset going backwards is tolerated (only warned about), unlike the leader's own.
        state.endOffset.ifPresent(currentEndOffset -> {
            if (currentEndOffset.offset > fetchOffsetMetadata.offset) {
                log.warn("Detected non-monotonic update of fetch offset from nodeId {}: {} -> {}",
                    state.nodeId, currentEndOffset.offset, fetchOffsetMetadata.offset);
            }
        });

        Optional<LogOffsetMetadata> leaderEndOffsetOpt =
            voterStates.get(localId).endOffset;

        state.updateFollowerState(
            currentTimeMs,
            fetchOffsetMetadata,
            leaderEndOffsetOpt
        );

        return isVoter(state.nodeId) && maybeUpdateHighWatermark();
    }

    /** Returns the ids of all voters except the leader, ordered by descending end offset. */
    public List<Integer> nonLeaderVotersByDescendingFetchOffset() {
        return followersByDescendingFetchOffset().stream()
            .filter(state -> state.nodeId != localId)
            .map(state -> state.nodeId)
            .collect(Collectors.toList());
    }

    /**
     * Returns the states of all voters (including the leader) sorted by {@link ReplicaState#compareTo}:
     * descending end offset, with voters that have no known end offset last.
     */
    private List<ReplicaState> followersByDescendingFetchOffset() {
        // One defensive copy is enough; Collections.sort is stable, as is Stream.sorted on this input.
        List<ReplicaState> sorted = new ArrayList<>(this.voterStates.values());
        Collections.sort(sorted);
        return sorted;
    }

    /**
     * Marks a voter as having acknowledged this leader.
     *
     * @throws IllegalArgumentException if {@code remoteNodeId} is not a voter
     */
    public void addAcknowledgementFrom(int remoteNodeId) {
        ReplicaState voterState = ensureValidVoter(remoteNodeId);
        voterState.hasAcknowledgedLeader = true;
    }

    /** Returns the state of the given voter, or throws {@link IllegalArgumentException} if it is not a voter. */
    private ReplicaState ensureValidVoter(int remoteNodeId) {
        ReplicaState state = voterStates.get(remoteNodeId);
        if (state == null)
            throw new IllegalArgumentException("Unexpected acknowledgement from non-voter " + remoteNodeId);
        return state;
    }

    public long epochStartOffset() {
        return epochStartOffset;
    }

    /**
     * Returns the voter state for {@code remoteNodeId} if it is a voter; otherwise returns its observer
     * state, creating one if this is the first time the node is seen.
     */
    private ReplicaState getOrCreateReplicaState(int remoteNodeId) {
        ReplicaState state = voterStates.get(remoteNodeId);
        if (state != null) {
            return state;
        }
        // computeIfAbsent only allocates a new state when the observer is actually new.
        return observerStates.computeIfAbsent(remoteNodeId, nodeId -> new ReplicaState(nodeId, false));
    }

    /**
     * Builds the DescribeQuorum partition data for this leader.
     *
     * <p>Side effect: observers that have been inactive for at least {@link #OBSERVER_SESSION_TIMEOUT_MS}
     * are removed before the response is built.
     *
     * @param currentTimeMs current time in milliseconds
     * @return partition data containing leader id, epoch, high watermark (-1 if unknown), voters and observers
     */
    public DescribeQuorumResponseData.PartitionData describeQuorum(long currentTimeMs) {
        clearInactiveObservers(currentTimeMs);

        return new DescribeQuorumResponseData.PartitionData()
            .setErrorCode(Errors.NONE.code())
            .setLeaderId(localId)
            .setLeaderEpoch(epoch)
            .setHighWatermark(highWatermark().map(offsetMetadata -> offsetMetadata.offset).orElse(-1L))
            .setCurrentVoters(describeReplicaStates(voterStates, currentTimeMs))
            .setObservers(describeReplicaStates(observerStates, currentTimeMs));
    }

    /** Converts every replica state in {@code state} into its DescribeQuorum representation. */
    private List<DescribeQuorumResponseData.ReplicaState> describeReplicaStates(
        Map<Integer, ReplicaState> state,
        long currentTimeMs
    ) {
        return state.values().stream()
            .map(replicaState -> describeReplicaState(replicaState, currentTimeMs))
            .collect(Collectors.toList());
    }

    /**
     * Converts a single replica state into its DescribeQuorum representation. The end offset is
     * reported as -1 when it is unknown.
     */
    private DescribeQuorumResponseData.ReplicaState describeReplicaState(
        ReplicaState replicaState,
        long currentTimeMs
    ) {
        final long lastCaughtUpTimestamp;
        final long lastFetchTimestamp;
        if (replicaState.nodeId == localId) {
            // The leader does not fetch from itself; report it as caught up and fetching "now".
            lastCaughtUpTimestamp = currentTimeMs;
            lastFetchTimestamp = currentTimeMs;
        } else {
            lastCaughtUpTimestamp = replicaState.lastCaughtUpTimestamp;
            lastFetchTimestamp = replicaState.lastFetchTimestamp;
        }
        return new DescribeQuorumResponseData.ReplicaState()
            .setReplicaId(replicaState.nodeId)
            .setLogEndOffset(replicaState.endOffset.map(md -> md.offset).orElse(-1L))
            .setLastCaughtUpTimestamp(lastCaughtUpTimestamp)
            .setLastFetchTimestamp(lastFetchTimestamp);
    }

    /** Removes observers whose last fetch is at least {@link #OBSERVER_SESSION_TIMEOUT_MS} older than {@code currentTimeMs}. */
    private void clearInactiveObservers(final long currentTimeMs) {
        observerStates.entrySet().removeIf(integerReplicaStateEntry ->
            currentTimeMs - integerReplicaStateEntry.getValue().lastFetchTimestamp >= OBSERVER_SESSION_TIMEOUT_MS
        );
    }

    private boolean isVoter(int remoteNodeId) {
        return voterStates.containsKey(remoteNodeId);
    }

    /** Mutable replication progress of a single replica (voter or observer) as seen by the leader. */
    private static class ReplicaState implements Comparable<ReplicaState> {
        final int nodeId;
        // Last reported end offset; empty until the replica reports one.
        Optional<LogOffsetMetadata> endOffset;
        // -1 until the first fetch is recorded.
        long lastFetchTimestamp;
        // Leader end offset observed at the previous fetch; -1 until then.
        long lastFetchLeaderLogEndOffset;
        // -1 until the replica is first seen caught up.
        long lastCaughtUpTimestamp;
        boolean hasAcknowledgedLeader;

        public ReplicaState(int nodeId, boolean hasAcknowledgedLeader) {
            this.nodeId = nodeId;
            this.endOffset = Optional.empty();
            this.lastFetchTimestamp = -1;
            this.lastFetchLeaderLogEndOffset = -1;
            this.lastCaughtUpTimestamp = -1;
            this.hasAcknowledgedLeader = hasAcknowledgedLeader;
        }

        /** Records a new end offset for the leader's own replica. */
        void updateLeaderState(
            LogOffsetMetadata endOffsetMetadata
        ) {
            // For the leader, we only update the end offset. The remaining fields
            // (such as the caught up time) are determined implicitly.
            this.endOffset = Optional.of(endOffsetMetadata);
        }

        /**
         * Records a fetch from a follower or observer. This updates its end offset, fetch time and
         * caught-up time, and marks it as having acknowledged the leader.
         */
        void updateFollowerState(
            long currentTimeMs,
            LogOffsetMetadata fetchOffsetMetadata,
            Optional<LogOffsetMetadata> leaderEndOffsetOpt
        ) {
            // Update the `lastCaughtUpTimestamp` before we update the `lastFetchTimestamp`.
            // This allows us to use the previous value for `lastFetchTimestamp` if the
            // follower was able to catch up to `lastFetchLeaderLogEndOffset` on this fetch.
            leaderEndOffsetOpt.ifPresent(leaderEndOffset -> {
                if (fetchOffsetMetadata.offset >= leaderEndOffset.offset) {
                    lastCaughtUpTimestamp = Math.max(lastCaughtUpTimestamp, currentTimeMs);
                } else if (lastFetchLeaderLogEndOffset > 0
                    && fetchOffsetMetadata.offset >= lastFetchLeaderLogEndOffset) {
                    lastCaughtUpTimestamp = Math.max(lastCaughtUpTimestamp, lastFetchTimestamp);
                }
                lastFetchLeaderLogEndOffset = leaderEndOffset.offset;
            });

            lastFetchTimestamp = Math.max(lastFetchTimestamp, currentTimeMs);
            endOffset = Optional.of(fetchOffsetMetadata);
            hasAcknowledgedLeader = true;
        }

        /**
         * Orders replicas by descending end offset, placing replicas without an end offset last.
         * Replicas whose {@code endOffset} values are equal are ordered by ascending node id.
         *
         * <p>NOTE(review): two replicas at the same offset whose {@code endOffset} values are not equal
         * (depending on LogOffsetMetadata#equals, e.g. differing metadata) compare as 0. Their relative
         * order is then left to the stable sort's input order.
         */
        @Override
        public int compareTo(ReplicaState that) {
            if (this.endOffset.equals(that.endOffset))
                return Integer.compare(this.nodeId, that.nodeId);
            else if (!this.endOffset.isPresent())
                return 1;
            else if (!that.endOffset.isPresent())
                return -1;
            else
                return Long.compare(that.endOffset.get().offset, this.endOffset.get().offset);
        }

        @Override
        public String toString() {
            return String.format(
                "ReplicaState(nodeId=%d, endOffset=%s, lastFetchTimestamp=%s, " +
                    "lastCaughtUpTimestamp=%s, hasAcknowledgedLeader=%s)",
                nodeId,
                endOffset,
                lastFetchTimestamp,
                lastCaughtUpTimestamp,
                hasAcknowledgedLeader
            );
        }
    }

    /** A leader never grants votes in its own epoch; the request is logged and rejected. */
    @Override
    public boolean canGrantVote(int candidateId, boolean isLogUpToDate) {
        log.debug("Rejecting vote request from candidate {} since we are already leader in epoch {}",
            candidateId, epoch);
        return false;
    }

    @Override
    public String toString() {
        return String.format(
            "Leader(localId=%d, epoch=%d, epochStartOffset=%d, highWatermark=%s, voterStates=%s)",
            localId,
            epoch,
            epochStartOffset,
            highWatermark,
            voterStates
        );
    }

    @Override
    public String name() {
        return "Leader";
    }

    /** Closes the underlying accumulator. */
    @Override
    public void close() {
        accumulator.close();
    }
}
相关信息
相关文章
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦