kafka PartitionReassignmentRevert 源码
kafka PartitionReassignmentRevert 代码
文件路径:/metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentRevert.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.controller;
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.Replicas;
import java.util.ArrayList;
import java.util.Set;
import java.util.List;
import java.util.Objects;
/**
 * Computes the replica list and ISR that a partition should have once an in-progress
 * reassignment is reverted.
 *
 * <p>Every replica listed in {@code addingReplicas} is dropped from both the replica
 * list and the ISR; all other replicas are kept in their original order. If that leaves
 * the ISR empty, the first remaining replica is placed into the ISR and the result is
 * flagged as {@link #unclean() unclean}.
 */
class PartitionReassignmentRevert {
    private final List<Integer> replicas;
    private final List<Integer> isr;
    private final boolean unclean;

    /**
     * Builds the reverted replica list and ISR from the given registration.
     *
     * @param registration the partition registration whose {@code replicas}, {@code isr}
     *                     and {@code addingReplicas} arrays are read
     * @throws InvalidReplicaAssignmentException if no replica remains after removing
     *                                           the adding replicas
     */
    PartitionReassignmentRevert(PartitionRegistration registration) {
        // Figure out the replica list and ISR that we will have after reverting the
        // reassignment. In general, we want to take out any replica that the reassignment
        // was adding, but keep the ones the reassignment was removing. (But see the
        // special case below.)
        Set<Integer> adding = Replicas.toSet(registration.addingReplicas);
        this.replicas = new ArrayList<>(registration.replicas.length);
        this.isr = new ArrayList<>(registration.isr.length);
        for (int replica : registration.isr) {
            if (!adding.contains(replica)) {
                this.isr.add(replica);
            }
        }
        for (int replica : registration.replicas) {
            if (!adding.contains(replica)) {
                this.replicas.add(replica);
            }
        }
        if (this.isr.isEmpty()) {
            // In the special case that all the replicas that are in the ISR are also
            // contained in addingReplicas, we choose the first remaining replica and add
            // it to the ISR. This is considered an unclean leader election. Therefore,
            // calling code must check that unclean leader election is enabled before
            // accepting the new ISR.
            if (this.replicas.isEmpty()) {
                // This should not be reachable, since it would require a partition
                // starting with an empty replica set prior to the reassignment we are
                // trying to revert.
                throw new InvalidReplicaAssignmentException("Invalid replica " +
                    "assignment: addingReplicas contains all replicas.");
            }
            this.isr.add(this.replicas.get(0));
            this.unclean = true;
        } else {
            this.unclean = false;
        }
    }

    /**
     * @return the replicas that remain after the revert (the internal list, not a copy)
     */
    List<Integer> replicas() {
        return replicas;
    }

    /**
     * @return the ISR after the revert (the internal list, not a copy)
     */
    List<Integer> isr() {
        return isr;
    }

    /**
     * @return true if the ISR had to be seeded with the first remaining replica because
     *         every ISR member was an adding replica
     */
    boolean unclean() {
        return unclean;
    }

    @Override
    public int hashCode() {
        // Hash over all three state fields so it stays consistent with equals().
        return Objects.hash(replicas, isr, unclean);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof PartitionReassignmentRevert)) return false;
        PartitionReassignmentRevert other = (PartitionReassignmentRevert) o;
        // The unclean flag is part of the outcome: two reverts with identical lists can
        // still differ in whether the ISR was forced.
        return replicas.equals(other.replicas) &&
            isr.equals(other.isr) &&
            unclean == other.unclean;
    }

    @Override
    public String toString() {
        return "PartitionReassignmentRevert(" +
            "replicas=" + replicas + ", " +
            "isr=" + isr + ", " +
            "unclean=" + unclean + ")";
    }
}
相关信息
相关文章
kafka BrokerHeartbeatManager 源码
kafka ClientQuotaControlManager 源码
kafka ClusterControlManager 源码
kafka ConfigurationControlManager 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦