kafka ReplicationControlManager 源码

  • 2022-10-20
  • 浏览 (177)

kafka ReplicationControlManager 代码

文件路径:/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kafka.controller;

import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
import org.apache.kafka.common.errors.InvalidPartitionsException;
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.NoReassignmentInProgressException;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicIdException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.message.AlterPartitionRequestData;
import org.apache.kafka.common.message.AlterPartitionResponseData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignablePartition;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignableTopic;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignableTopicResponse;
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsAssignment;
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic;
import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreateableTopicConfigCollection;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
import org.apache.kafka.common.message.ElectLeadersRequestData;
import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions;
import org.apache.kafka.common.message.ElectLeadersResponseData;
import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData.ListPartitionReassignmentsTopics;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingPartitionReassignment;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.FenceBrokerRecord;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange;
import org.apache.kafka.metadata.KafkaConfigSchema;
import org.apache.kafka.metadata.LeaderRecoveryState;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.Replicas;
import org.apache.kafka.metadata.placement.ClusterDescriber;
import org.apache.kafka.metadata.placement.PlacementSpec;
import org.apache.kafka.metadata.placement.UsableBroker;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.policy.CreateTopicPolicy;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineHashSet;
import org.apache.kafka.timeline.TimelineInteger;
import org.slf4j.Logger;

import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map.Entry;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_RECORD;
import static org.apache.kafka.common.metadata.MetadataRecordType.REMOVE_TOPIC_RECORD;
import static org.apache.kafka.common.metadata.MetadataRecordType.TOPIC_RECORD;
import static org.apache.kafka.common.protocol.Errors.FENCED_LEADER_EPOCH;
import static org.apache.kafka.common.protocol.Errors.INELIGIBLE_REPLICA;
import static org.apache.kafka.common.protocol.Errors.INVALID_REQUEST;
import static org.apache.kafka.common.protocol.Errors.INVALID_UPDATE_VERSION;
import static org.apache.kafka.common.protocol.Errors.NEW_LEADER_ELECTED;
import static org.apache.kafka.common.protocol.Errors.NONE;
import static org.apache.kafka.common.protocol.Errors.NOT_CONTROLLER;
import static org.apache.kafka.common.protocol.Errors.NO_REASSIGNMENT_IN_PROGRESS;
import static org.apache.kafka.common.protocol.Errors.OPERATION_NOT_ATTEMPTED;
import static org.apache.kafka.common.protocol.Errors.TOPIC_AUTHORIZATION_FAILED;
import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_ID;
import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_OR_PARTITION;
import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;


/**
 * The ReplicationControlManager is the part of the controller which deals with topics
 * and partitions. It is responsible for managing the in-sync replica set and leader
 * of each partition, as well as administrative tasks like creating or deleting topics.
 */
public class ReplicationControlManager {
    static final int MAX_ELECTIONS_PER_IMBALANCE = 1_000;

    /**
     * Fluent builder for {@link ReplicationControlManager}.
     *
     * The configuration control manager, the cluster control manager and the controller
     * metrics must be supplied explicitly. The log context, snapshot registry and feature
     * control manager are filled in by {@link #build()} when they were not set.
     */
    static class Builder {
        private SnapshotRegistry snapshotRegistry = null;
        private LogContext logContext = null;
        private short defaultReplicationFactor = (short) 3;
        private int defaultNumPartitions = 1;
        private int maxElectionsPerImbalance = MAX_ELECTIONS_PER_IMBALANCE;
        private ConfigurationControlManager configurationControl = null;
        private ClusterControlManager clusterControl = null;
        private ControllerMetrics controllerMetrics = null;
        private Optional<CreateTopicPolicy> createTopicPolicy = Optional.empty();
        private FeatureControlManager featureControl = null;

        /** Sets the snapshot registry; if unset, the configuration control manager's registry is used. */
        Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
            this.snapshotRegistry = snapshotRegistry;
            return this;
        }

        /** Sets the log context; if unset, a fresh LogContext is created at build time. */
        Builder setLogContext(LogContext logContext) {
            this.logContext = logContext;
            return this;
        }

        /** Sets the default replication factor (3 unless overridden). */
        Builder setDefaultReplicationFactor(short defaultReplicationFactor) {
            this.defaultReplicationFactor = defaultReplicationFactor;
            return this;
        }

        /** Sets the default number of partitions (1 unless overridden). */
        Builder setDefaultNumPartitions(int defaultNumPartitions) {
            this.defaultNumPartitions = defaultNumPartitions;
            return this;
        }

        /** Sets the maximum number of leader elections per balancing operation. */
        Builder setMaxElectionsPerImbalance(int maxElectionsPerImbalance) {
            this.maxElectionsPerImbalance = maxElectionsPerImbalance;
            return this;
        }

        /** Sets the configuration control manager (required). */
        Builder setConfigurationControl(ConfigurationControlManager configurationControl) {
            this.configurationControl = configurationControl;
            return this;
        }

        /** Sets the cluster control manager (required). */
        Builder setClusterControl(ClusterControlManager clusterControl) {
            this.clusterControl = clusterControl;
            return this;
        }

        /** Sets the controller metrics (required). */
        Builder setControllerMetrics(ControllerMetrics controllerMetrics) {
            this.controllerMetrics = controllerMetrics;
            return this;
        }

        /** Sets the optional create-topic policy (empty unless overridden). */
        Builder setCreateTopicPolicy(Optional<CreateTopicPolicy> createTopicPolicy) {
            this.createTopicPolicy = createTopicPolicy;
            return this;
        }

        /** Sets the feature control manager; if unset, one is constructed at build time. */
        public Builder setFeatureControl(FeatureControlManager featureControl) {
            this.featureControl = featureControl;
            return this;
        }

        /**
         * Validates the required collaborators, fills in defaults for the optional ones and
         * constructs the manager.
         *
         * @return a new ReplicationControlManager
         * @throws IllegalStateException if the configuration control manager, the cluster
         *                               control manager or the metrics were not set
         */
        ReplicationControlManager build() {
            // Required collaborators are checked one at a time, in a fixed order.
            if (configurationControl == null) {
                throw new IllegalStateException("Configuration control must be set before building");
            }
            if (clusterControl == null) {
                throw new IllegalStateException("Cluster controller must be set before building");
            }
            if (controllerMetrics == null) {
                throw new IllegalStateException("Metrics must be set before building");
            }

            // Optional collaborators fall back to defaults.
            if (logContext == null) {
                logContext = new LogContext();
            }
            if (snapshotRegistry == null) {
                snapshotRegistry = configurationControl.snapshotRegistry();
            }
            if (featureControl == null) {
                featureControl = new FeatureControlManager.Builder().
                    setLogContext(logContext).
                    setSnapshotRegistry(snapshotRegistry).
                    setQuorumFeatures(new QuorumFeatures(0, new ApiVersions(),
                        QuorumFeatures.defaultFeatureMap(),
                        Collections.singletonList(0))).
                    setMetadataVersion(MetadataVersion.latest()).
                    build();
            }

            return new ReplicationControlManager(
                snapshotRegistry,
                logContext,
                defaultReplicationFactor,
                defaultNumPartitions,
                maxElectionsPerImbalance,
                configurationControl,
                clusterControl,
                controllerMetrics,
                createTopicPolicy,
                featureControl);
        }
    }

    /**
     * A ClusterDescriber that answers from the enclosing manager's cluster control manager.
     */
    class KRaftClusterDescriber implements ClusterDescriber {
        /**
         * @return the iterator of usable brokers supplied by the cluster control manager
         */
        @Override
        public Iterator<UsableBroker> usableBrokers() {
            Iterator<UsableBroker> brokers = clusterControl.usableBrokers();
            return brokers;
        }
    }

    /**
     * Per-topic state held by the controller: the topic name, its ID, and a timeline map
     * from partition index to that partition's registration.
     */
    static class TopicControlInfo {
        private final String name;
        private final Uuid id;
        private final TimelineHashMap<Integer, PartitionRegistration> parts;

        /**
         * Creates the info for a topic with an initially empty partition map.
         *
         * @param name              the topic name
         * @param snapshotRegistry  the registry the partition map is created with
         * @param id                the topic ID
         */
        TopicControlInfo(String name, SnapshotRegistry snapshotRegistry, Uuid id) {
            this.parts = new TimelineHashMap<>(snapshotRegistry, 0);
            this.id = id;
            this.name = name;
        }

        /** @return the topic name */
        public String name() {
            return this.name;
        }

        /** @return the topic ID */
        public Uuid topicId() {
            return this.id;
        }
    }

    /**
     * Flatten a CreateableTopicConfigCollection into a read-only name-to-value map.
     *
     * @param collection the configs supplied with a topic creation request
     * @return an unmodifiable map with one entry per config name; when a name occurs more
     *         than once, the value visited last wins
     */
    static Map<String, String> translateCreationConfigs(CreateableTopicConfigCollection collection) {
        Map<String, String> configsByName = new HashMap<>();
        collection.forEach(entry -> {
            configsByName.put(entry.name(), entry.value());
        });
        return Collections.unmodifiableMap(configsByName);
    }

    private final SnapshotRegistry snapshotRegistry;
    private final Logger log;

    /**
     * The KIP-464 default replication factor that is used if a CreateTopics request does
     * not specify one.
     */
    private final short defaultReplicationFactor;

    /**
     * The KIP-464 default number of partitions that is used if a CreateTopics request does
     * not specify a number of partitions.
     */
    private final int defaultNumPartitions;

    /**
     * Maximum number of leader elections to perform during one partition leader balancing operation.
     */
    private final int maxElectionsPerImbalance;

    /**
     * A count of the total number of partitions in the cluster.
     */
    private final TimelineInteger globalPartitionCount;

    /**
     * A reference to the controller's configuration control manager.
     */
    private final ConfigurationControlManager configurationControl;

    /**
     * A reference to the controller's cluster control manager.
     */
    private final ClusterControlManager clusterControl;

    /**
     * A reference to the controller's metrics registry.
     */
    private final ControllerMetrics controllerMetrics;

    /**
     * The policy to use to validate that topic assignments are valid, if one is present.
     */
    private final Optional<CreateTopicPolicy> createTopicPolicy;

    /**
     * The feature control manager.
     */
    private final FeatureControlManager featureControl;

    /**
     * Maps topic names to topic UUIDs.
     */
    private final TimelineHashMap<String, Uuid> topicsByName;

    /**
     * We try to prevent topics from being created if their names would collide with
     * existing topics when periods in the topic name are replaced with underscores.
     * The reason for this is that some per-topic metrics do replace periods with
     * underscores, and would therefore be ambiguous otherwise.
     *
     * This map is from normalized topic name to a set of topic names. So if we had two
     * topics named foo.bar and foo_bar this map would contain
     * a mapping from foo_bar to a set containing foo.bar and foo_bar.
     *
     * Since we reject topic creations that would collide, under normal conditions the
     * sets in this map should only have a size of 1. However, if the cluster was
     * upgraded from a version prior to KAFKA-13743, it may be possible to have more
     * values here, since colliding topic names will be "grandfathered in."
     */
    private final TimelineHashMap<String, TimelineHashSet<String>> topicsWithCollisionChars;

    /**
     * Maps topic UUIDs to structures containing topic information, including partitions.
     */
    private final TimelineHashMap<Uuid, TopicControlInfo> topics;

    /**
     * A map of broker IDs to the partitions that the broker is in the ISR for.
     */
    private final BrokersToIsrs brokersToIsrs;

    /**
     * A map from topic IDs to the partitions in the topic which are reassigning.
     */
    private final TimelineHashMap<Uuid, int[]> reassigningTopics;

    /**
     * The set of topic partitions for which the leader is not the preferred leader.
     */
    private final TimelineHashSet<TopicIdPartition> imbalancedPartitions;

    /**
     * A ClusterDescriber which supplies cluster information to our ReplicaPlacer.
     */
    final KRaftClusterDescriber clusterDescriber = new KRaftClusterDescriber();

    /**
     * Creates the manager. Instances are obtained through the Builder, which validates and
     * defaults the arguments before calling this constructor.
     */
    private ReplicationControlManager(
        SnapshotRegistry snapshotRegistry,
        LogContext logContext,
        short defaultReplicationFactor,
        int defaultNumPartitions,
        int maxElectionsPerImbalance,
        ConfigurationControlManager configurationControl,
        ClusterControlManager clusterControl,
        ControllerMetrics controllerMetrics,
        Optional<CreateTopicPolicy> createTopicPolicy,
        FeatureControlManager featureControl
    ) {
        // Infrastructure.
        this.snapshotRegistry = snapshotRegistry;
        this.log = logContext.logger(ReplicationControlManager.class);

        // Static settings.
        this.defaultReplicationFactor = defaultReplicationFactor;
        this.defaultNumPartitions = defaultNumPartitions;
        this.maxElectionsPerImbalance = maxElectionsPerImbalance;

        // Collaborating managers and policies.
        this.configurationControl = configurationControl;
        this.clusterControl = clusterControl;
        this.controllerMetrics = controllerMetrics;
        this.createTopicPolicy = createTopicPolicy;
        this.featureControl = featureControl;

        // Timeline-backed state, all created against the same snapshot registry. The
        // creation order is kept stable on purpose.
        this.globalPartitionCount = new TimelineInteger(snapshotRegistry);
        this.topicsByName = new TimelineHashMap<>(snapshotRegistry, 0);
        this.topicsWithCollisionChars = new TimelineHashMap<>(snapshotRegistry, 0);
        this.topics = new TimelineHashMap<>(snapshotRegistry, 0);
        this.brokersToIsrs = new BrokersToIsrs(snapshotRegistry);
        this.reassigningTopics = new TimelineHashMap<>(snapshotRegistry, 0);
        this.imbalancedPartitions = new TimelineHashSet<>(snapshotRegistry, 0);
    }

    /**
     * Replays a TopicRecord: registers the topic under its name and ID, tracks it in the
     * collision map when its name contains collision characters, and refreshes the global
     * topic count metric.
     *
     * @param record the topic record to apply
     */
    public void replay(TopicRecord record) {
        String topicName = record.name();
        Uuid topicId = record.topicId();

        topicsByName.put(topicName, topicId);

        if (Topic.hasCollisionChars(topicName)) {
            // Group the name with every other name that normalizes to the same string.
            String normalized = Topic.unifyCollisionChars(topicName);
            TimelineHashSet<String> sameNormalizedName = topicsWithCollisionChars.get(normalized);
            if (sameNormalizedName == null) {
                sameNormalizedName = new TimelineHashSet<>(snapshotRegistry, 1);
                topicsWithCollisionChars.put(normalized, sameNormalizedName);
            }
            sameNormalizedName.add(topicName);
        }

        TopicControlInfo info = new TopicControlInfo(topicName, snapshotRegistry, topicId);
        topics.put(topicId, info);
        controllerMetrics.setGlobalTopicsCount(topics.size());
        log.info("Created topic {} with topic ID {}.", topicName, topicId);
    }

    /**
     * Replays a PartitionRecord. If the partition is not yet known it is created and the
     * global partition count is incremented; if it is known and the registration differs,
     * the stored registration is replaced. In both cases the ISR index, the reassignment
     * index, the imbalanced-partition set and the related metrics are brought up to date.
     *
     * @param record the partition record to apply
     * @throws RuntimeException if the record refers to a topic ID that is not known
     */
    public void replay(PartitionRecord record) {
        Uuid topicId = record.topicId();
        int partitionId = record.partitionId();

        TopicControlInfo topicInfo = topics.get(topicId);
        if (topicInfo == null) {
            throw new RuntimeException("Tried to create partition " + topicId +
                ":" + partitionId + ", but no topic with that ID was found.");
        }

        PartitionRegistration updated = new PartitionRegistration(record);
        PartitionRegistration existing = topicInfo.parts.get(partitionId);
        String description = topicInfo.name + "-" + partitionId +
            " with topic ID " + topicId;

        if (existing == null) {
            // Brand-new partition.
            log.info("Created partition {} and {}.", description, updated);
            topicInfo.parts.put(partitionId, updated);
            brokersToIsrs.update(topicId, partitionId, null,
                updated.isr, NO_LEADER, updated.leader);
            globalPartitionCount.increment();
            controllerMetrics.setGlobalPartitionCount(globalPartitionCount.get());
            updateReassigningTopicsIfNeeded(topicId, partitionId,
                false, updated.isReassigning());
        } else if (!updated.equals(existing)) {
            // Known partition whose registration changed.
            updated.maybeLogPartitionChange(log, description, existing);
            topicInfo.parts.put(partitionId, updated);
            brokersToIsrs.update(topicId, partitionId, existing.isr,
                updated.isr, existing.leader, updated.leader);
            updateReassigningTopicsIfNeeded(topicId, partitionId,
                existing.isReassigning(), updated.isReassigning());
        }

        // A partition is tracked as imbalanced exactly when it lacks its preferred leader.
        TopicIdPartition topicIdPartition = new TopicIdPartition(topicId, partitionId);
        if (updated.hasPreferredLeader()) {
            imbalancedPartitions.remove(topicIdPartition);
        } else {
            imbalancedPartitions.add(topicIdPartition);
        }

        controllerMetrics.setOfflinePartitionCount(brokersToIsrs.offlinePartitionCount());
        controllerMetrics.setPreferredReplicaImbalanceCount(imbalancedPartitions.size());
    }

    /**
     * Keeps the reassigningTopics index in step with a partition's reassignment state.
     *
     * @param topicId         the topic the partition belongs to
     * @param partitionId     the partition index
     * @param wasReassigning  whether the partition was reassigning before the change
     * @param isReassigning   whether the partition is reassigning after the change
     */
    private void updateReassigningTopicsIfNeeded(Uuid topicId, int partitionId,
                                                 boolean wasReassigning, boolean isReassigning) {
        if (wasReassigning == isReassigning) {
            // No transition, nothing to record.
            return;
        }
        int[] current = reassigningTopics.getOrDefault(topicId, Replicas.NONE);
        if (isReassigning) {
            // The partition just started reassigning.
            reassigningTopics.put(topicId, Replicas.copyWith(current, partitionId));
            return;
        }
        // The partition just stopped reassigning; drop the topic entry once it is empty.
        int[] remaining = Replicas.copyWithout(current, partitionId);
        if (remaining.length != 0) {
            reassigningTopics.put(topicId, remaining);
        } else {
            reassigningTopics.remove(topicId);
        }
    }

    /**
     * Replays a PartitionChangeRecord by merging it into the existing registration of the
     * partition, then updating the reassignment index, the ISR index, the
     * imbalanced-partition set and the related metrics.
     *
     * @param record the partition change record to apply
     * @throws RuntimeException if the topic ID or the partition index named by the record
     *                          is not known
     */
    public void replay(PartitionChangeRecord record) {
        TopicControlInfo topicInfo = topics.get(record.topicId());
        if (topicInfo == null) {
            // This record changes an existing partition; it never creates one.
            throw new RuntimeException("Tried to change partition " + record.topicId() +
                ":" + record.partitionId() + ", but no topic with that ID was found.");
        }
        PartitionRegistration prevPartitionInfo = topicInfo.parts.get(record.partitionId());
        if (prevPartitionInfo == null) {
            throw new RuntimeException("Tried to change partition " + record.topicId() +
                ":" + record.partitionId() + ", but no partition with that id was found.");
        }
        PartitionRegistration newPartitionInfo = prevPartitionInfo.merge(record);
        updateReassigningTopicsIfNeeded(record.topicId(), record.partitionId(),
                prevPartitionInfo.isReassigning(), newPartitionInfo.isReassigning());
        topicInfo.parts.put(record.partitionId(), newPartitionInfo);
        brokersToIsrs.update(record.topicId(), record.partitionId(),
            prevPartitionInfo.isr, newPartitionInfo.isr, prevPartitionInfo.leader,
            newPartitionInfo.leader);
        String topicPart = topicInfo.name + "-" + record.partitionId() + " with topic ID " +
            record.topicId();
        newPartitionInfo.maybeLogPartitionChange(log, topicPart, prevPartitionInfo);

        // A partition is tracked as imbalanced exactly when it lacks its preferred leader.
        TopicIdPartition topicIdPartition =
            new TopicIdPartition(record.topicId(), record.partitionId());
        if (newPartitionInfo.hasPreferredLeader()) {
            imbalancedPartitions.remove(topicIdPartition);
        } else {
            imbalancedPartitions.add(topicIdPartition);
        }

        controllerMetrics.setOfflinePartitionCount(brokersToIsrs.offlinePartitionCount());
        controllerMetrics.setPreferredReplicaImbalanceCount(imbalancedPartitions.size());

        // Changes that touch the adding/removing replica lists are logged at INFO; all other
        // changes only at TRACE.
        if (record.removingReplicas() != null || record.addingReplicas() != null) {
            log.info("Replayed partition assignment change {} for topic {}", record, topicInfo.name);
        } else if (log.isTraceEnabled()) {
            log.trace("Replayed partition change {} for topic {}", record, topicInfo.name);
        }
    }

    /**
     * Replays a RemoveTopicRecord: drops the topic from every index kept by this manager
     * (by ID, by name, collision map, reassigning topics, ISR index, imbalanced partitions),
     * deletes its configs, and refreshes the topic/partition metrics.
     *
     * @param record the remove-topic record to apply
     * @throws UnknownTopicIdException if no topic with the record's ID is known
     */
    public void replay(RemoveTopicRecord record) {
        // Remove this topic from the topics map and the topicsByName map.
        TopicControlInfo topic = topics.remove(record.topicId());
        if (topic == null) {
            throw new UnknownTopicIdException("Can't find topic with ID " + record.topicId() +
                " to remove.");
        }
        topicsByName.remove(topic.name);
        if (Topic.hasCollisionChars(topic.name)) {
            String normalizedName = Topic.unifyCollisionChars(topic.name);
            TimelineHashSet<String> colliding = topicsWithCollisionChars.get(normalizedName);
            if (colliding != null) {
                colliding.remove(topic.name);
                if (colliding.isEmpty()) {
                    // The map is keyed by the normalized name, so the empty set must be
                    // removed under that key, not under the raw topic name.
                    topicsWithCollisionChars.remove(normalizedName);
                }
            }
        }
        reassigningTopics.remove(record.topicId());

        // Delete the configurations associated with this topic.
        configurationControl.deleteTopicConfigs(topic.name);

        for (Map.Entry<Integer, PartitionRegistration> entry : topic.parts.entrySet()) {
            int partitionId = entry.getKey();
            PartitionRegistration partition = entry.getValue();

            // Remove the entries for this topic in brokersToIsrs.
            for (int brokerId : partition.isr) {
                brokersToIsrs.removeTopicEntryForBroker(topic.id, brokerId);
            }

            imbalancedPartitions.remove(new TopicIdPartition(record.topicId(), partitionId));

            globalPartitionCount.decrement();
        }
        // Also drop the topic's entry stored under the NO_LEADER key.
        brokersToIsrs.removeTopicEntryForBroker(topic.id, NO_LEADER);

        controllerMetrics.setGlobalTopicsCount(topics.size());
        controllerMetrics.setGlobalPartitionCount(globalPartitionCount.get());
        controllerMetrics.setOfflinePartitionCount(brokersToIsrs.offlinePartitionCount());
        controllerMetrics.setPreferredReplicaImbalanceCount(imbalancedPartitions.size());
        log.info("Removed topic {} with ID {}.", topic.name, record.topicId());
    }

    /**
     * Handles a CreateTopics request: validates names, rejects topics that already exist,
     * computes the config records, attempts to create each remaining topic, and assembles
     * a per-topic response.
     *
     * @param request      the CreateTopics request
     * @param describable  topic names for which the creation step is told that the caller
     *                     is authorized to have configs returned
     * @return the response plus the records to commit; for a validate-only request the
     *         record list is empty
     */
    ControllerResult<CreateTopicsResponseData> createTopics(
        CreateTopicsRequestData request,
        Set<String> describable
    ) {
        Map<String, ApiError> topicErrors = new HashMap<>();
        List<ApiMessageAndVersion> records = new ArrayList<>();

        // Step 1: name validation.
        validateNewTopicNames(topicErrors, request.topics(), topicsWithCollisionChars);

        // Step 2: flag every requested topic whose name is already taken.
        for (CreatableTopic requested : request.topics()) {
            if (topicsByName.containsKey(requested.name())) {
                topicErrors.put(requested.name(), new ApiError(Errors.TOPIC_ALREADY_EXISTS,
                    "Topic '" + requested.name() + "' already exists."));
            }
        }

        // Step 3: validate the requested configs and collect the resulting ConfigRecords.
        Map<ConfigResource, Map<String, Entry<OpType, String>>> configChanges =
            computeConfigChanges(topicErrors, request.topics());
        ControllerResult<Map<ConfigResource, ApiError>> configResult =
            configurationControl.incrementalAlterConfigs(configChanges, true);
        configResult.response().forEach((resource, outcome) -> {
            if (outcome.isFailure()) {
                topicErrors.put(resource.name(), outcome);
            }
        });
        records.addAll(configResult.records());

        // Step 4: create every topic that has not been rejected so far.
        Map<String, CreatableTopicResult> successes = new HashMap<>();
        for (CreatableTopic topic : request.topics()) {
            if (!topicErrors.containsKey(topic.name())) {
                ApiError outcome;
                try {
                    outcome = createTopic(topic, records, successes,
                        describable.contains(topic.name()));
                } catch (ApiException e) {
                    outcome = ApiError.fromThrowable(e);
                }
                if (outcome.isFailure()) {
                    topicErrors.put(topic.name(), outcome);
                }
            }
        }

        // Step 5: build the response and a one-line summary for the log.
        CreateTopicsResponseData data = new CreateTopicsResponseData();
        StringBuilder summary = new StringBuilder();
        for (CreatableTopic topic : request.topics()) {
            if (summary.length() > 0) {
                summary.append(", ");
            }
            summary.append(topic).append(": ");
            ApiError failure = topicErrors.get(topic.name());
            if (failure == null) {
                data.topics().add(successes.get(topic.name()));
                summary.append("SUCCESS");
            } else {
                data.topics().add(new CreatableTopicResult().
                    setName(topic.name()).
                    setErrorCode(failure.error().code()).
                    setErrorMessage(failure.message()));
                summary.append(failure.error()).append(" (").
                    append(failure.message()).append(")");
            }
        }

        String resultsText = summary.toString();
        if (request.validateOnly()) {
            // Validation only: report the outcome but commit nothing.
            log.info("Validate-only CreateTopics result(s): {}", resultsText);
            return ControllerResult.atomicOf(Collections.emptyList(), data);
        }
        log.info("CreateTopics result(s): {}", resultsText);
        return ControllerResult.atomicOf(records, data);
    }

    /**
     * Validate a single topic from a CreateTopics request and, on success, generate the
     * records needed to create it.
     *
     * Partitions come either from the manual assignments in the request or, when none
     * are given, from the cluster's replica placer. If a create-topic policy is
     * configured, it is consulted before any result or record is produced.
     *
     * @param topic                     The topic to create.
     * @param records                   The record list to append to. A TopicRecord and one
     *                                  record per partition are appended on success only.
     * @param successes                 Map from topic name to result. An entry is added on
     *                                  success only.
     * @param authorizedToReturnConfigs True if the effective configs, partition count and
     *                                  replication factor may be included in the result.
     * @return                          ApiError.NONE on success, or an error describing
     *                                  why the topic cannot be created.
     */
    private ApiError createTopic(CreatableTopic topic,
                                 List<ApiMessageAndVersion> records,
                                 Map<String, CreatableTopicResult> successes,
                                 boolean authorizedToReturnConfigs) {
        Map<String, String> creationConfigs = translateCreationConfigs(topic.configs());
        Map<Integer, PartitionRegistration> newParts = new HashMap<>();
        if (!topic.assignments().isEmpty()) {
            // Manual assignment: both the replication factor and the partition count
            // must be left at -1, since they are implied by the assignment itself.
            if (topic.replicationFactor() != -1) {
                return new ApiError(INVALID_REQUEST,
                    "A manual partition assignment was specified, but replication " +
                    "factor was not set to -1.");
            }
            if (topic.numPartitions() != -1) {
                return new ApiError(INVALID_REQUEST,
                    "A manual partition assignment was specified, but numPartitions " +
                        "was not set to -1.");
            }
            OptionalInt replicationFactor = OptionalInt.empty();
            for (CreatableReplicaAssignment assignment : topic.assignments()) {
                if (newParts.containsKey(assignment.partitionIndex())) {
                    return new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT,
                        "Found multiple manual partition assignments for partition " +
                            assignment.partitionIndex());
                }
                // NOTE(review): validateManualPartitionAssignment is defined elsewhere. It
                // receives the size of the previous assignment, presumably so that every
                // partition is required to have the same number of replicas -- confirm.
                // Any exception it throws is not caught in this method.
                validateManualPartitionAssignment(assignment.brokerIds(), replicationFactor);
                replicationFactor = OptionalInt.of(assignment.brokerIds().size());
                // The initial ISR is the subset of the assigned brokers that clusterControl
                // reports as active; the first of them becomes the leader.
                List<Integer> isr = assignment.brokerIds().stream().
                    filter(clusterControl::active).collect(Collectors.toList());
                if (isr.isEmpty()) {
                    return new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT,
                        "All brokers specified in the manual partition assignment for " +
                        "partition " + assignment.partitionIndex() + " are fenced or in controlled shutdown.");
                }
                newParts.put(assignment.partitionIndex(), new PartitionRegistration(
                    Replicas.toArray(assignment.brokerIds()), Replicas.toArray(isr),
                    Replicas.NONE, Replicas.NONE, isr.get(0), LeaderRecoveryState.RECOVERED, 0, 0));
            }
            // With N distinct partition indexes, each of 0..N-1 must be present;
            // otherwise there is a gap in the sequence.
            for (int i = 0; i < newParts.size(); i++) {
                if (!newParts.containsKey(i)) {
                    return new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT,
                            "partitions should be a consecutive 0-based integer sequence");
                }
            }
            // For manual assignments the policy sees the assignment map and no
            // partition count or replication factor.
            ApiError error = maybeCheckCreateTopicPolicy(() -> {
                Map<Integer, List<Integer>> assignments = new HashMap<>();
                newParts.entrySet().forEach(e -> assignments.put(e.getKey(),
                    Replicas.toList(e.getValue().replicas)));
                return new CreateTopicPolicy.RequestMetadata(
                    topic.name(), null, null, assignments, creationConfigs);
            });
            if (error.isFailure()) return error;
        } else if (topic.replicationFactor() < -1 || topic.replicationFactor() == 0) {
            return new ApiError(Errors.INVALID_REPLICATION_FACTOR,
                "Replication factor must be larger than 0, or -1 to use the default value.");
        } else if (topic.numPartitions() < -1 || topic.numPartitions() == 0) {
            return new ApiError(Errors.INVALID_PARTITIONS,
                "Number of partitions was set to an invalid non-positive value.");
        } else {
            // Automatic placement: -1 selects the configured default for either value.
            int numPartitions = topic.numPartitions() == -1 ?
                defaultNumPartitions : topic.numPartitions();
            short replicationFactor = topic.replicationFactor() == -1 ?
                defaultReplicationFactor : topic.replicationFactor();
            try {
                List<List<Integer>> partitions = clusterControl.replicaPlacer().place(new PlacementSpec(
                    0,
                    numPartitions,
                    replicationFactor
                ), clusterDescriber);
                for (int partitionId = 0; partitionId < partitions.size(); partitionId++) {
                    List<Integer> replicas = partitions.get(partitionId);
                    List<Integer> isr = replicas.stream().
                        filter(clusterControl::active).collect(Collectors.toList());
                    // If the ISR is empty, it means that all brokers are fenced or
                    // in controlled shutdown. To be consistent with the replica placer,
                    // we reject the create topic request with INVALID_REPLICATION_FACTOR.
                    if (isr.isEmpty()) {
                        return new ApiError(Errors.INVALID_REPLICATION_FACTOR,
                            "Unable to replicate the partition " + replicationFactor +
                                " time(s): All brokers are currently fenced or in controlled shutdown.");
                    }
                    newParts.put(partitionId,
                        new PartitionRegistration(
                            Replicas.toArray(replicas),
                            Replicas.toArray(isr),
                            Replicas.NONE,
                            Replicas.NONE,
                            isr.get(0),
                            LeaderRecoveryState.RECOVERED,
                            0,
                            0));
                }
            } catch (InvalidReplicationFactorException e) {
                return new ApiError(Errors.INVALID_REPLICATION_FACTOR,
                    "Unable to replicate the partition " + replicationFactor +
                        " time(s): " + e.getMessage());
            }
            // For automatic placement the policy sees the partition count and
            // replication factor and no assignment map.
            ApiError error = maybeCheckCreateTopicPolicy(() -> new CreateTopicPolicy.RequestMetadata(
                topic.name(), numPartitions, replicationFactor, null, creationConfigs));
            if (error.isFailure()) return error;
        }
        // All validation has passed; from here on the method only builds the result
        // and appends records.
        Uuid topicId = Uuid.randomUuid();
        CreatableTopicResult result = new CreatableTopicResult().
            setName(topic.name()).
            setTopicId(topicId).
            setErrorCode(NONE.code()).
            setErrorMessage(null);
        if (authorizedToReturnConfigs) {
            Map<String, ConfigEntry> effectiveConfig = configurationControl.
                computeEffectiveTopicConfigs(creationConfigs);
            // Emit the configs in sorted name order.
            List<String> configNames = new ArrayList<>(effectiveConfig.keySet());
            configNames.sort(String::compareTo);
            for (String configName : configNames) {
                ConfigEntry entry = effectiveConfig.get(configName);
                // The value of a sensitive config is replaced with null in the response.
                result.configs().add(new CreateTopicsResponseData.CreatableTopicConfigs().
                    setName(entry.name()).
                    setValue(entry.isSensitive() ? null : entry.value()).
                    setReadOnly(entry.isReadOnly()).
                    setConfigSource(KafkaConfigSchema.translateConfigSource(entry.source()).id()).
                    setIsSensitive(entry.isSensitive()));
            }
            result.setNumPartitions(newParts.size());
            // The reported replication factor is the replica count of whichever
            // partition the map iterator yields first.
            result.setReplicationFactor((short) newParts.values().iterator().next().replicas.length);
            result.setTopicConfigErrorCode(NONE.code());
        } else {
            result.setTopicConfigErrorCode(TOPIC_AUTHORIZATION_FAILED.code());
        }
        successes.put(topic.name(), result);
        // The TopicRecord is appended first, followed by one record per partition.
        records.add(new ApiMessageAndVersion(new TopicRecord().
            setName(topic.name()).
            setTopicId(topicId), TOPIC_RECORD.highestSupportedVersion()));
        for (Entry<Integer, PartitionRegistration> partEntry : newParts.entrySet()) {
            int partitionIndex = partEntry.getKey();
            PartitionRegistration info = partEntry.getValue();
            records.add(info.toRecord(topicId, partitionIndex));
        }
        return ApiError.NONE;
    }

    /**
     * Run the configured create-topic policy, if there is one, against the supplied
     * request metadata.
     *
     * The supplier is only invoked when a policy is present.
     *
     * @param supplier  Supplies the request metadata to validate.
     * @return          ApiError.NONE if no policy is configured or the policy accepts the
     *                  request; otherwise a POLICY_VIOLATION error carrying the message
     *                  of the PolicyViolationException.
     */
    private ApiError maybeCheckCreateTopicPolicy(Supplier<CreateTopicPolicy.RequestMetadata> supplier) {
        if (!createTopicPolicy.isPresent()) {
            return ApiError.NONE;
        }
        try {
            createTopicPolicy.get().validate(supplier.get());
            return ApiError.NONE;
        } catch (PolicyViolationException violation) {
            return new ApiError(Errors.POLICY_VIOLATION, violation.getMessage());
        }
    }

    /**
     * Check the names of the topics to be created and record an error for each bad one.
     *
     * Topics that already have an entry in topicErrors are skipped. For the others, the
     * name is validated with Topic.validate and, if it contains collision characters,
     * looked up in topicsWithCollisionChars by its unified form.
     *
     * @param topicErrors               Map from topic name to error; updated in place.
     * @param topics                    The topics from the creation request.
     * @param topicsWithCollisionChars  Map from unified name to the existing topic names
     *                                  that unify to it.
     */
    static void validateNewTopicNames(Map<String, ApiError> topicErrors,
                                      CreatableTopicCollection topics,
                                      Map<String, ? extends Set<String>> topicsWithCollisionChars) {
        for (CreatableTopic candidate : topics) {
            String name = candidate.name();
            if (topicErrors.containsKey(name)) {
                continue;
            }
            try {
                Topic.validate(name);
            } catch (InvalidTopicException invalid) {
                topicErrors.put(name,
                    new ApiError(Errors.INVALID_TOPIC_EXCEPTION, invalid.getMessage()));
            }
            // The collision check runs even if the name just failed validation, so a
            // collision error replaces the validation error when both apply.
            if (!Topic.hasCollisionChars(name)) {
                continue;
            }
            Set<String> existing = topicsWithCollisionChars.get(Topic.unifyCollisionChars(name));
            if (existing != null) {
                topicErrors.put(name, new ApiError(Errors.INVALID_TOPIC_EXCEPTION,
                    "Topic '" + name + "' collides with existing topic: " +
                        existing.iterator().next()));
            }
        }
    }

    /**
     * Translate the configs in a topic creation request into per-topic SET operations.
     *
     * Topics that already have an entry in topicErrors are skipped. A topic with any
     * null-valued config gets an INVALID_CONFIG error in topicErrors and contributes
     * no changes. A topic without configs contributes no entry.
     *
     * @param topicErrors   Map from topic name to error; updated in place.
     * @param topics        The topics from the creation request.
     * @return              Map from topic config resource to its config name -> (SET, value)
     *                      entries.
     */
    static Map<ConfigResource, Map<String, Entry<OpType, String>>>
            computeConfigChanges(Map<String, ApiError> topicErrors,
                                 CreatableTopicCollection topics) {
        Map<ConfigResource, Map<String, Entry<OpType, String>>> changesByResource = new HashMap<>();
        for (CreatableTopic topic : topics) {
            String topicName = topic.name();
            if (topicErrors.containsKey(topicName)) {
                continue;
            }
            Map<String, Entry<OpType, String>> sets = new HashMap<>();
            List<String> namesWithNullValue = new ArrayList<>();
            for (CreateTopicsRequestData.CreateableTopicConfig config : topic.configs()) {
                if (config.value() != null) {
                    sets.put(config.name(), new SimpleImmutableEntry<>(SET, config.value()));
                } else {
                    namesWithNullValue.add(config.name());
                }
            }
            if (!namesWithNullValue.isEmpty()) {
                topicErrors.put(topicName, new ApiError(Errors.INVALID_CONFIG,
                    "Null value not supported for topic configs: " + String.join(",", namesWithNullValue)));
                continue;
            }
            if (!sets.isEmpty()) {
                changesByResource.put(new ConfigResource(TOPIC, topicName), sets);
            }
        }
        return changesByResource;
    }

    /**
     * Look up the IDs of the named topics as of the given offset.
     *
     * @param offset    The offset passed to the topicsByName lookup.
     * @param names     The topic names to resolve. A null name yields an INVALID_REQUEST
     *                  entry stored under the null key.
     * @return          Map from each name to its topic ID, or to an
     *                  UNKNOWN_TOPIC_OR_PARTITION error if the lookup returned null.
     */
    Map<String, ResultOrError<Uuid>> findTopicIds(long offset, Collection<String> names) {
        Map<String, ResultOrError<Uuid>> results = new HashMap<>(names.size());
        for (String name : names) {
            if (name == null) {
                results.put(null, new ResultOrError<>(INVALID_REQUEST, "Invalid null topic name."));
                continue;
            }
            Uuid topicId = topicsByName.get(name, offset);
            if (topicId != null) {
                results.put(name, new ResultOrError<>(topicId));
            } else {
                results.put(name, new ResultOrError<>(
                    new ApiError(UNKNOWN_TOPIC_OR_PARTITION)));
            }
        }
        return results;
    }

    /**
     * Copy every topic name -> topic ID mapping visible at the given offset into a
     * new map.
     *
     * @param offset    The offset passed to the topicsByName size and entry-set lookups.
     * @return          A new HashMap holding the mappings.
     */
    Map<String, Uuid> findAllTopicIds(long offset) {
        HashMap<String, Uuid> idsByName = new HashMap<>(topicsByName.size(offset));
        for (Entry<String, Uuid> mapping : topicsByName.entrySet(offset)) {
            String topicName = mapping.getKey();
            Uuid topicId = mapping.getValue();
            idsByName.put(topicName, topicId);
        }
        return idsByName;
    }

    /**
     * Look up the names of the topics with the given IDs as of the given offset.
     *
     * @param offset    The offset passed to the topics lookup.
     * @param ids       The topic IDs to resolve. A null or zero ID yields an
     *                  INVALID_REQUEST entry.
     * @return          Map from each ID to its topic name, or to an UNKNOWN_TOPIC_ID
     *                  error if the lookup returned null.
     */
    Map<Uuid, ResultOrError<String>> findTopicNames(long offset, Collection<Uuid> ids) {
        Map<Uuid, ResultOrError<String>> results = new HashMap<>(ids.size());
        for (Uuid id : ids) {
            boolean invalidId = id == null || id.equals(Uuid.ZERO_UUID);
            if (invalidId) {
                results.put(id, new ResultOrError<>(new ApiError(INVALID_REQUEST,
                    "Attempt to find topic with invalid topicId " + id)));
                continue;
            }
            TopicControlInfo info = topics.get(id, offset);
            if (info != null) {
                results.put(id, new ResultOrError<>(info.name));
            } else {
                results.put(id, new ResultOrError<>(new ApiError(UNKNOWN_TOPIC_ID)));
            }
        }
        return results;
    }

    /**
     * Generate the records needed to delete the given topics.
     *
     * Each topic is handled independently: a failure for one ID is turned into an
     * ApiError for that ID and does not stop the others. Exceptions that are not
     * ApiExceptions are additionally logged at error level.
     *
     * @param ids   The IDs of the topics to delete.
     * @return      The generated records together with a per-ID result map.
     */
    ControllerResult<Map<Uuid, ApiError>> deleteTopics(Collection<Uuid> ids) {
        int expected = ids.size();
        Map<Uuid, ApiError> results = new HashMap<>(expected);
        List<ApiMessageAndVersion> records = new ArrayList<>(expected);
        for (Uuid topicId : ids) {
            ApiError outcome;
            try {
                deleteTopic(topicId, records);
                outcome = ApiError.NONE;
            } catch (ApiException e) {
                outcome = ApiError.fromThrowable(e);
            } catch (Exception e) {
                log.error("Unexpected deleteTopics error for {}", topicId, e);
                outcome = ApiError.fromThrowable(e);
            }
            results.put(topicId, outcome);
        }
        return ControllerResult.atomicOf(records, results);
    }

    /**
     * Append a RemoveTopicRecord for the given topic.
     *
     * @param id        The ID of the topic to delete.
     * @param records   The record list to append to.
     * @throws UnknownTopicIdException  If there is no topic with the given ID.
     */
    void deleteTopic(Uuid id, List<ApiMessageAndVersion> records) {
        if (topics.get(id) == null) {
            throw new UnknownTopicIdException(UNKNOWN_TOPIC_ID.message());
        }
        RemoveTopicRecord removal = new RemoveTopicRecord().setTopicId(id);
        records.add(new ApiMessageAndVersion(removal,
            REMOVE_TOPIC_RECORD.highestSupportedVersion()));
    }

    /**
     * Look up the registration of a partition. Visible for testing.
     *
     * @param topicId       The ID of the topic.
     * @param partitionId   The partition index within the topic.
     * @return              null if the topic is unknown; otherwise whatever the topic's
     *                      partition map holds for the index.
     */
    // VisibleForTesting
    PartitionRegistration getPartition(Uuid topicId, int partitionId) {
        TopicControlInfo info = topics.get(topicId);
        return info == null ? null : info.parts.get(partitionId);
    }

    /**
     * Look up the control info of a topic by ID. Visible for testing.
     *
     * @param topicId   The ID of the topic.
     * @return          The entry stored in the topics map for the ID.
     */
    // VisibleForTesting
    TopicControlInfo getTopic(Uuid topicId) {
        final TopicControlInfo info = topics.get(topicId);
        return info;
    }

    /**
     * Look up the ID of a topic by name.
     *
     * @param name  The topic name.
     * @return      The entry stored in the topicsByName map for the name.
     */
    Uuid getTopicId(String name) {
        final Uuid topicId = topicsByName.get(name);
        return topicId;
    }

    /**
     * Expose the BrokersToIsrs structure held by this manager. Visible for testing.
     *
     * @return  The manager's own instance, not a copy.
     */
    // VisibleForTesting
    BrokersToIsrs brokersToIsrs() {
        return this.brokersToIsrs;
    }

    /**
     * Take a snapshot of the imbalanced-partitions set. Visible for testing.
     *
     * @return  A new HashSet containing the current elements, so changes to the
     *          returned set do not affect the manager's own set.
     */
    // VisibleForTesting
    Set<TopicIdPartition> imbalancedPartitions() {
        Set<TopicIdPartition> snapshot = new HashSet<>(imbalancedPartitions);
        return snapshot;
    }

    /**
     * Handle an AlterPartition request from a broker.
     *
     * Every partition in the request is validated and processed independently, and
     * receives its own entry in the response. Partitions that pass validation are fed
     * through a PartitionChangeBuilder; if the builder produces a change record, the
     * record is appended to the result and the response reflects the partition state
     * after the change.
     *
     * @param context   The request context; used for the request API version.
     * @param request   The AlterPartition request data.
     * @return          The generated records and the response.
     */
    ControllerResult<AlterPartitionResponseData> alterPartition(
        ControllerRequestContext context,
        AlterPartitionRequestData request
    ) {
        short requestVersion = context.requestHeader().requestApiVersion();
        // NOTE(review): checkBrokerEpoch is defined elsewhere; presumably it throws if the
        // broker epoch in the request is stale -- confirm. Nothing is caught here.
        clusterControl.checkBrokerEpoch(request.brokerId(), request.brokerEpoch());
        AlterPartitionResponseData response = new AlterPartitionResponseData();
        List<ApiMessageAndVersion> records = new ArrayList<>();
        for (AlterPartitionRequestData.TopicData topicData : request.topics()) {
            AlterPartitionResponseData.TopicData responseTopicData =
                new AlterPartitionResponseData.TopicData().
                    setTopicName(topicData.topicName()).
                    setTopicId(topicData.topicId());
            response.topics().add(responseTopicData);

            // Request versions above 1 identify the topic by ID; older versions by name.
            Uuid topicId = requestVersion > 1 ? topicData.topicId() : topicsByName.get(topicData.topicName());
            if (topicId == null || topicId.equals(Uuid.ZERO_UUID) || !topics.containsKey(topicId)) {
                // Unknown topic: every partition listed for it gets the same error.
                Errors error = requestVersion > 1 ? UNKNOWN_TOPIC_ID : UNKNOWN_TOPIC_OR_PARTITION;
                for (AlterPartitionRequestData.PartitionData partitionData : topicData.partitions()) {
                    responseTopicData.partitions().add(new AlterPartitionResponseData.PartitionData().
                        setPartitionIndex(partitionData.partitionIndex()).
                        setErrorCode(error.code()));
                }
                log.info("Rejecting AlterPartition request for unknown topic ID {} or name {}.",
                    topicData.topicId(), topicData.topicName());
                continue;
            }

            TopicControlInfo topic = topics.get(topicId);
            for (AlterPartitionRequestData.PartitionData partitionData : topicData.partitions()) {
                int partitionId = partitionData.partitionIndex();
                PartitionRegistration partition = topic.parts.get(partitionId);

                Errors validationError = validateAlterPartitionData(
                    request.brokerId(),
                    topic,
                    partitionId,
                    partition,
                    context.requestHeader().requestApiVersion(),
                    partitionData);

                if (validationError != Errors.NONE) {
                    responseTopicData.partitions().add(
                        new AlterPartitionResponseData.PartitionData()
                            .setPartitionIndex(partitionId)
                            .setErrorCode(validationError.code())
                    );

                    continue;
                }

                PartitionChangeBuilder builder = new PartitionChangeBuilder(
                    partition,
                    topic.id,
                    partitionId,
                    clusterControl::active,
                    featureControl.metadataVersion().isLeaderRecoverySupported());
                if (configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name())) {
                    builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
                }
                builder.setTargetIsr(partitionData.newIsr());
                builder.setTargetLeaderRecoveryState(
                    LeaderRecoveryState.of(partitionData.leaderRecoveryState()));
                // The builder yields a record only when it has one to emit; otherwise
                // the response below reports the unchanged partition state.
                Optional<ApiMessageAndVersion> record = builder.build();
                if (record.isPresent()) {
                    records.add(record.get());
                    PartitionChangeRecord change = (PartitionChangeRecord) record.get().message();
                    // Apply the change to the local variable only, so that the response
                    // carries the post-change leader, ISR and epochs.
                    partition = partition.merge(change);
                    if (log.isDebugEnabled()) {
                        log.debug("Node {} has altered ISR for {}-{} to {}.",
                            request.brokerId(), topic.name, partitionId, change.isr());
                    }
                    if (change.leader() != request.brokerId() &&
                            change.leader() != NO_LEADER_CHANGE) {
                        // Normally, an AlterPartition request, which is made by the partition
                        // leader itself, is not allowed to modify the partition leader.
                        // However, if there is an ongoing partition reassignment and the
                        // ISR change completes it, then the leader may change as part of
                        // the changes made during reassignment cleanup.
                        //
                        // In this case, we report back NEW_LEADER_ELECTED to the leader
                        // which made the AlterPartition request. This lets it know that it must
                        // fetch new metadata before trying again. This return code is
                        // unusual because we both return an error and generate a new
                        // metadata record. We usually only do one or the other.
                        // FENCED_LEADER_EPOCH is used for request version below or equal to 1.
                        Errors error = requestVersion > 1 ? NEW_LEADER_ELECTED : FENCED_LEADER_EPOCH;
                        log.info("AlterPartition request from node {} for {}-{} completed " +
                            "the ongoing partition reassignment and triggered a " +
                            "leadership change. Returning {}.",
                            request.brokerId(), topic.name, partitionId, error);
                        responseTopicData.partitions().add(new AlterPartitionResponseData.PartitionData().
                            setPartitionIndex(partitionId).
                            setErrorCode(error.code()));
                        continue;
                    } else if (change.removingReplicas() != null ||
                            change.addingReplicas() != null) {
                        log.info("AlterPartition request from node {} for {}-{} completed " +
                            "the ongoing partition reassignment.", request.brokerId(),
                            topic.name, partitionId);
                    }
                }

                /* Setting the LeaderRecoveryState field is always safe because it will always be the
                 * same as the value set in the request. For version 0, that is always the default
                 * RECOVERED which is ignored when serializing to version 0. For any other version, the
                 * LeaderRecoveryState field is supported.
                 */
                responseTopicData.partitions().add(new AlterPartitionResponseData.PartitionData().
                    setPartitionIndex(partitionId).
                    setErrorCode(Errors.NONE.code()).
                    setLeaderId(partition.leader).
                    setIsr(Replicas.toList(partition.isr)).
                    setLeaderRecoveryState(partition.leaderRecoveryState.value()).
                    setLeaderEpoch(partition.leaderEpoch).
                    setPartitionEpoch(partition.partitionEpoch));
            }
        }

        return ControllerResult.of(records, response);
    }

    /**
     * Validate the partition information included in the alter partition request.
     *
     * The checks run in a fixed order and the first failure determines the error that
     * is returned. Every rejection is logged.
     *
     * @param brokerId id of the broker requesting the alter partition
     * @param topic current topic information stored by the replication manager
     * @param partitionId partition id being altered
     * @param partition current partition registration for the partition being altered;
     *                  may be null if the partition is unknown
     * @param requestApiVersion version of the AlterPartition request; selects the error
     *                          code returned when the new ISR has ineligible replicas
     * @param partitionData partition data from the alter partition request
     *
     * @return Errors.NONE for valid alter partition data; otherwise the validation error
     */
    private Errors validateAlterPartitionData(
        int brokerId,
        TopicControlInfo topic,
        int partitionId,
        PartitionRegistration partition,
        short requestApiVersion,
        AlterPartitionRequestData.PartitionData partitionData
    ) {
        if (partition == null) {
            log.info("Rejecting AlterPartition request for unknown partition {}-{}.",
                    topic.name, partitionId);

            return UNKNOWN_TOPIC_OR_PARTITION;
        }

        // If the partition leader has a higher leader/partition epoch, then it is likely
        // that this node is no longer the active controller. We return NOT_CONTROLLER in
        // this case to give the leader an opportunity to find the new controller.
        //
        // In the two log statements below, the epoch sent by the leader is reported as the
        // "current" value and the epoch held by this controller as the "local" value.
        if (partitionData.leaderEpoch() > partition.leaderEpoch) {
            log.debug("Rejecting AlterPartition request from node {} for {}-{} because " +
                    "the current leader epoch is {}, which is greater than the local value {}.",
                brokerId, topic.name, partitionId, partitionData.leaderEpoch(), partition.leaderEpoch);
            return NOT_CONTROLLER;
        }
        if (partitionData.partitionEpoch() > partition.partitionEpoch) {
            log.debug("Rejecting AlterPartition request from node {} for {}-{} because " +
                    "the current partition epoch is {}, which is greater than the local value {}.",
                brokerId, topic.name, partitionId, partitionData.partitionEpoch(), partition.partitionEpoch);
            return NOT_CONTROLLER;
        }
        if (partitionData.leaderEpoch() < partition.leaderEpoch) {
            log.debug("Rejecting AlterPartition request from node {} for {}-{} because " +
                    "the current leader epoch is {}, not {}.", brokerId, topic.name,
                    partitionId, partition.leaderEpoch, partitionData.leaderEpoch());

            return FENCED_LEADER_EPOCH;
        }
        // Only the current leader of the partition may alter it.
        if (brokerId != partition.leader) {
            log.info("Rejecting AlterPartition request from node {} for {}-{} because " +
                    "the current leader is {}.", brokerId, topic.name,
                    partitionId, partition.leader);

            return INVALID_REQUEST;
        }
        if (partitionData.partitionEpoch() < partition.partitionEpoch) {
            log.info("Rejecting AlterPartition request from node {} for {}-{} because " +
                    "the current partition epoch is {}, not {}.", brokerId,
                    topic.name, partitionId, partition.partitionEpoch,
                    partitionData.partitionEpoch());

            return INVALID_UPDATE_VERSION;
        }
        int[] newIsr = Replicas.toArray(partitionData.newIsr());
        if (!Replicas.validateIsr(partition.replicas, newIsr)) {
            log.error("Rejecting AlterPartition request from node {} for {}-{} because " +
                    "it specified an invalid ISR {}.", brokerId,
                    topic.name, partitionId, partitionData.newIsr());

            return INVALID_REQUEST;
        }
        if (!Replicas.contains(newIsr, partition.leader)) {
            // The ISR must always include the current leader.
            log.error("Rejecting AlterPartition request from node {} for {}-{} because " +
                    "it specified an invalid ISR {} that doesn't include itself.",
                    brokerId, topic.name, partitionId, partitionData.newIsr());

            return INVALID_REQUEST;
        }
        LeaderRecoveryState leaderRecoveryState = LeaderRecoveryState.of(partitionData.leaderRecoveryState());
        // While the leader is still recovering, the ISR may contain at most one replica.
        if (leaderRecoveryState == LeaderRecoveryState.RECOVERING && newIsr.length > 1) {
            log.info("Rejecting AlterPartition request from node {} for {}-{} because " +
                    "the ISR {} had more than one replica while the leader was still " +
                    "recovering from an unclean leader election {}.",
                    brokerId, topic.name, partitionId, partitionData.newIsr(),
                    leaderRecoveryState);

            return INVALID_REQUEST;
        }
        if (partition.leaderRecoveryState == LeaderRecoveryState.RECOVERED &&
                leaderRecoveryState == LeaderRecoveryState.RECOVERING) {
            log.info("Rejecting AlterPartition request from node {} for {}-{} because " +
                    "the leader recovery state cannot change from RECOVERED to RECOVERING.",
                    brokerId, topic.name, partitionId);

            return INVALID_REQUEST;
        }

        List<IneligibleReplica> ineligibleReplicas = ineligibleReplicasForIsr(newIsr);
        if (!ineligibleReplicas.isEmpty()) {
            log.info("Rejecting AlterPartition request from node {} for {}-{} because " +
                    "it specified ineligible replicas {} in the new ISR {}.",
                    brokerId, topic.name, partitionId, ineligibleReplicas, partitionData.newIsr());

            // INELIGIBLE_REPLICA is only returned to request versions above 1; older
            // versions get OPERATION_NOT_ATTEMPTED instead.
            return requestApiVersion > 1 ? INELIGIBLE_REPLICA : OPERATION_NOT_ATTEMPTED;
        }

        return Errors.NONE;
    }

    /**
     * Find the replicas that may not be part of an ISR.
     *
     * A replica is ineligible if it has no broker registration, if its registration is
     * in controlled shutdown, or if its registration is fenced. Only the first matching
     * reason, in that order, is reported for a replica.
     *
     * @param replicas  The replica ids to check.
     * @return          One entry per ineligible replica; empty if all are eligible.
     */
    private List<IneligibleReplica> ineligibleReplicasForIsr(int[] replicas) {
        List<IneligibleReplica> rejected = new ArrayList<>(0);
        for (int replicaId : replicas) {
            BrokerRegistration registration = clusterControl.registration(replicaId);
            String reason = null;
            if (registration == null) {
                reason = "not registered";
            } else if (registration.inControlledShutdown()) {
                reason = "shutting down";
            } else if (registration.fenced()) {
                reason = "fenced";
            }
            if (reason != null) {
                rejected.add(new IneligibleReplica(replicaId, reason));
            }
        }
        return rejected;
    }

    /**
     * Generate the appropriate records to handle a broker being fenced.
     *
     * The leader and ISR updates for the partitions that have this broker in their ISR
     * are generated first. They are followed by a single fencing record: a
     * BrokerRegistrationChangeRecord if the metadata version supports it, otherwise a
     * FenceBrokerRecord.
     *
     * @param brokerId      The broker id.
     * @param records       The record list to append to.
     * @throws RuntimeException If the broker has no registration.
     */
    void handleBrokerFenced(int brokerId, List<ApiMessageAndVersion> records) {
        BrokerRegistration registration = clusterControl.brokerRegistrations().get(brokerId);
        if (registration == null) {
            throw new RuntimeException("Can't find broker registration for broker " + brokerId);
        }
        generateLeaderAndIsrUpdates("handleBrokerFenced", brokerId, NO_LEADER, records,
            brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
        final ApiMessageAndVersion fencing;
        if (featureControl.metadataVersion().isBrokerRegistrationChangeRecordSupported()) {
            fencing = new ApiMessageAndVersion(
                new BrokerRegistrationChangeRecord().
                    setBrokerId(brokerId).
                    setBrokerEpoch(registration.epoch()).
                    setFenced(BrokerRegistrationFencingChange.FENCE.value()),
                (short) 0);
        } else {
            fencing = new ApiMessageAndVersion(
                new FenceBrokerRecord().
                    setId(brokerId).
                    setEpoch(registration.epoch()),
                (short) 0);
        }
        records.add(fencing);
    }

    /**
     * Generate the appropriate records to handle a broker being unregistered.
     *
     * The leader and ISR updates for the partitions that have this broker in their ISR
     * are generated first, followed by an UnregisterBrokerRecord.
     *
     * @param brokerId      The broker id.
     * @param brokerEpoch   The broker epoch.
     * @param records       The record list to append to.
     */
    void handleBrokerUnregistered(int brokerId, long brokerEpoch,
                                  List<ApiMessageAndVersion> records) {
        generateLeaderAndIsrUpdates("handleBrokerUnregistered", brokerId, NO_LEADER, records,
            brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
        UnregisterBrokerRecord unregistration = new UnregisterBrokerRecord().
            setBrokerId(brokerId).
            setBrokerEpoch(brokerEpoch);
        records.add(new ApiMessageAndVersion(unregistration, (short) 0));
    }

    /**
     * Generate the appropriate records to handle a broker becoming unfenced.
     *
     * An unfencing record is appended first: a BrokerRegistrationChangeRecord if the
     * metadata version supports it, otherwise an UnfenceBrokerRecord. After that, the
     * partitions that currently have no leader are examined with the newly unfenced
     * broker as the candidate to add.
     *
     * @param brokerId      The broker id.
     * @param brokerEpoch   The broker epoch.
     * @param records       The record list to append to.
     */
    void handleBrokerUnfenced(int brokerId, long brokerEpoch, List<ApiMessageAndVersion> records) {
        final ApiMessageAndVersion unfencing;
        if (featureControl.metadataVersion().isBrokerRegistrationChangeRecordSupported()) {
            unfencing = new ApiMessageAndVersion(
                new BrokerRegistrationChangeRecord().
                    setBrokerId(brokerId).
                    setBrokerEpoch(brokerEpoch).
                    setFenced(BrokerRegistrationFencingChange.UNFENCE.value()),
                (short) 0);
        } else {
            unfencing = new ApiMessageAndVersion(
                new UnfenceBrokerRecord().
                    setId(brokerId).
                    setEpoch(brokerEpoch),
                (short) 0);
        }
        records.add(unfencing);
        generateLeaderAndIsrUpdates("handleBrokerUnfenced", NO_LEADER, brokerId, records,
            brokersToIsrs.partitionsWithNoLeader());
    }

    /**
     * Generate the appropriate records to handle a broker starting a controlled shutdown.
     *
     * A BrokerRegistrationChangeRecord marking the broker as in controlled shutdown is
     * appended only if the metadata version supports that state and the broker is not
     * already in it. The leader and ISR updates for the partitions that have this
     * broker in their ISR are generated in either case.
     *
     * @param brokerId      The broker id.
     * @param brokerEpoch   The broker epoch.
     * @param records       The record list to append to.
     */
    void handleBrokerInControlledShutdown(int brokerId, long brokerEpoch, List<ApiMessageAndVersion> records) {
        boolean stateSupported =
            featureControl.metadataVersion().isInControlledShutdownStateSupported();
        if (stateSupported && !clusterControl.inControlledShutdown(brokerId)) {
            BrokerRegistrationChangeRecord shutdownChange = new BrokerRegistrationChangeRecord().
                setBrokerId(brokerId).
                setBrokerEpoch(brokerEpoch).
                setInControlledShutdown(BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value());
            records.add(new ApiMessageAndVersion(shutdownChange, (short) 1));
        }
        String context = "enterControlledShutdown[" + brokerId + "]";
        generateLeaderAndIsrUpdates(context,
            brokerId, NO_LEADER, records, brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
    }

    /**
     * Process an ElectLeaders request.
     *
     * When the request names specific topic partitions, an election is attempted for
     * each of them and every outcome is reported. When the partition list is null, an
     * election is attempted for every partition of every known topic, and partitions
     * which report ELECTION_NOT_NEEDED are left out of the response.
     *
     * @param request   The election request.
     * @return          The generated records together with the per-partition results.
     */
    ControllerResult<ElectLeadersResponseData> electLeaders(ElectLeadersRequestData request) {
        ElectionType type = electionType(request.electionType());
        List<ApiMessageAndVersion> records = new ArrayList<>();
        ElectLeadersResponseData response = new ElectLeadersResponseData();

        if (request.topicPartitions() != null) {
            // The caller listed the partitions explicitly: report every outcome.
            for (TopicPartitions requestedTopic : request.topicPartitions()) {
                ReplicaElectionResult topicResult =
                    new ReplicaElectionResult().setTopic(requestedTopic.topic());
                response.replicaElectionResults().add(topicResult);
                for (int partitionId : requestedTopic.partitions()) {
                    ApiError outcome =
                        electLeader(requestedTopic.topic(), partitionId, type, records);
                    PartitionResult partitionResult = new PartitionResult().
                        setPartitionId(partitionId).
                        setErrorCode(outcome.error().code()).
                        setErrorMessage(outcome.message());
                    topicResult.partitionResult().add(partitionResult);
                }
            }
            return ControllerResult.of(records, response);
        }

        // A null partition list means "try every partition". This part of the wire
        // protocol is questionable (the results may not fit into a single RPC, for
        // example) and should probably be removed, but it has to be implemented for
        // compatibility with the old controller.
        for (Entry<String, Uuid> nameAndId : topicsByName.entrySet()) {
            String topicName = nameAndId.getKey();
            ReplicaElectionResult topicResult = new ReplicaElectionResult().setTopic(topicName);
            response.replicaElectionResults().add(topicResult);
            TopicControlInfo topic = topics.get(nameAndId.getValue());
            if (topic == null) {
                continue;
            }
            for (int partitionId : topic.parts.keySet()) {
                ApiError outcome = electLeader(topicName, partitionId, type, records);
                // Partitions which already have the desired leader are not reported
                // when electing leaders for all partitions.
                if (outcome.error() == Errors.ELECTION_NOT_NEEDED) {
                    continue;
                }
                PartitionResult partitionResult = new PartitionResult().
                    setPartitionId(partitionId).
                    setErrorCode(outcome.error().code()).
                    setErrorMessage(outcome.message());
                topicResult.partitionResult().add(partitionResult);
            }
        }
        return ControllerResult.of(records, response);
    }

    /**
     * Translate the election type byte from the request into an ElectionType.
     *
     * @param electionType  The raw election type value.
     * @return              The matching ElectionType.
     * @throws InvalidRequestException  If ElectionType.valueOf rejects the value.
     */
    private static ElectionType electionType(byte electionType) {
        ElectionType resolved;
        try {
            resolved = ElectionType.valueOf(electionType);
        } catch (IllegalArgumentException unknownType) {
            String message = "Unknown election type " + (int) electionType;
            throw new InvalidRequestException(message);
        }
        return resolved;
    }

    /**
     * Attempt a leader election for a single partition.
     *
     * @param topic         The topic name.
     * @param partitionId   The partition id.
     * @param electionType  The kind of election that was requested.
     * @param records       The record list to append the resulting change to.
     * @return              ApiError.NONE if a change record was appended; otherwise
     *                      an error explaining why no election took place.
     */
    ApiError electLeader(String topic, int partitionId, ElectionType electionType,
                         List<ApiMessageAndVersion> records) {
        Uuid topicId = topicsByName.get(topic);
        if (topicId == null) {
            return new ApiError(UNKNOWN_TOPIC_OR_PARTITION, "No such topic as " + topic);
        }
        TopicControlInfo topicInfo = topics.get(topicId);
        if (topicInfo == null) {
            return new ApiError(UNKNOWN_TOPIC_OR_PARTITION, "No such topic id as " + topicId);
        }
        PartitionRegistration partition = topicInfo.parts.get(partitionId);
        if (partition == null) {
            return new ApiError(UNKNOWN_TOPIC_OR_PARTITION,
                "No such partition as " + topic + "-" + partitionId);
        }

        boolean preferredRequested = electionType == ElectionType.PREFERRED;
        boolean uncleanRequested = electionType == ElectionType.UNCLEAN;

        // Nothing to do if the partition is already in the state the election would
        // produce.
        if ((preferredRequested && partition.hasPreferredLeader())
            || (uncleanRequested && partition.hasLeader())) {
            return new ApiError(Errors.ELECTION_NOT_NEEDED);
        }

        PartitionChangeBuilder.Election election = uncleanRequested
            ? PartitionChangeBuilder.Election.UNCLEAN
            : PartitionChangeBuilder.Election.PREFERRED;
        PartitionChangeBuilder builder = new PartitionChangeBuilder(partition,
            topicId,
            partitionId,
            clusterControl::active,
            featureControl.metadataVersion().isLeaderRecoverySupported());
        builder.setElection(election);

        Optional<ApiMessageAndVersion> change = builder.build();
        if (change.isPresent()) {
            records.add(change.get());
            return ApiError.NONE;
        }
        // The builder produced no change, so no suitable leader could be chosen.
        return new ApiError(preferredRequested
            ? Errors.PREFERRED_LEADER_NOT_AVAILABLE
            : Errors.ELIGIBLE_LEADERS_NOT_AVAILABLE);
    }

    /**
     * Process a broker heartbeat.
     *
     * The broker epoch is checked first. The heartbeat manager then computes the
     * broker's next state; if it differs from the current state, the records for that
     * transition are generated. Finally the heartbeat is recorded and a reply
     * describing the next state is built.
     *
     * @param request                       The heartbeat request.
     * @param registerBrokerRecordOffset    The offset of the broker's registration
     *                                      record; the broker counts as caught up once
     *                                      its metadata offset has reached it.
     * @return                              The generated records and the reply.
     */
    ControllerResult<BrokerHeartbeatReply> processBrokerHeartbeat(
                BrokerHeartbeatRequestData request, long registerBrokerRecordOffset) {
        final int brokerId = request.brokerId();
        final long brokerEpoch = request.brokerEpoch();
        clusterControl.checkBrokerEpoch(brokerId, brokerEpoch);

        BrokerHeartbeatManager heartbeats = clusterControl.heartbeatManager();
        BrokerControlStates transition = heartbeats.calculateNextBrokerState(brokerId,
            request, registerBrokerRecordOffset, () -> brokersToIsrs.hasLeaderships(brokerId));

        List<ApiMessageAndVersion> records = new ArrayList<>();
        if (transition.current() != transition.next()) {
            switch (transition.next()) {
                case FENCED:
                case SHUTDOWN_NOW:
                    // Both of these transitions are handled by fencing the broker.
                    handleBrokerFenced(brokerId, records);
                    break;
                case UNFENCED:
                    handleBrokerUnfenced(brokerId, brokerEpoch, records);
                    break;
                case CONTROLLED_SHUTDOWN:
                    handleBrokerInControlledShutdown(brokerId, brokerEpoch, records);
                    break;
            }
        }

        long metadataOffset = request.currentMetadataOffset();
        boolean fencedNext = transition.next().fenced();
        heartbeats.touch(brokerId, fencedNext, metadataOffset);

        boolean isCaughtUp = metadataOffset >= registerBrokerRecordOffset;
        BrokerHeartbeatReply reply = new BrokerHeartbeatReply(isCaughtUp,
                fencedNext,
                transition.next().inControlledShutdown(),
                transition.next().shouldShutDown());
        return ControllerResult.of(records, reply);
    }

    /**
     * Generate the records for unregistering a broker.
     *
     * @param brokerId  The id of the broker to unregister.
     * @return          The generated records; the response is always null.
     * @throws BrokerIdNotRegisteredException   If no registration exists for the id.
     */
    public ControllerResult<Void> unregisterBroker(int brokerId) {
        BrokerRegistration registration = clusterControl.brokerRegistrations().get(brokerId);
        if (registration != null) {
            List<ApiMessageAndVersion> records = new ArrayList<>();
            handleBrokerUnregistered(brokerId, registration.epoch(), records);
            return ControllerResult.of(records, null);
        }
        throw new BrokerIdNotRegisteredException("Broker ID " + brokerId +
            " is not currently registered");
    }

    /**
     * Fence at most one broker which the heartbeat manager reports as stale.
     *
     * @return  The fencing records, if any; the response is always null.
     */
    ControllerResult<Void> maybeFenceOneStaleBroker() {
        BrokerHeartbeatManager heartbeats = clusterControl.heartbeatManager();
        List<ApiMessageAndVersion> records = new ArrayList<>();
        Optional<Integer> staleBroker = heartbeats.findOneStaleBroker();
        if (staleBroker.isPresent()) {
            // Several brokers may go stale at once, but they are fenced one at a
            // time so that the effect of each fencing is visible to the system
            // before the next broker is processed.
            int staleId = staleBroker.get();
            log.info("Fencing broker {} because its session has timed out.", staleId);
            handleBrokerFenced(staleId, records);
            heartbeats.fence(staleId);
        }
        return ControllerResult.of(records, null);
    }

    /**
     * @return  True if the set of imbalanced partitions is not empty.
     */
    boolean arePartitionLeadersImbalanced() {
        boolean nothingImbalanced = imbalancedPartitions.isEmpty();
        return !nothingImbalanced;
    }

    /**
     * Try to elect the preferred replica as leader for the partitions tracked as
     * imbalanced, generating at most maxElectionsPerImbalance records per call.
     *
     * The response() of the returned object is true when this method stopped early
     * because that limit was reached, in which case more preferred replicas may still
     * be electable and the quorum controller should reschedule this operation
     * immediately.
     *
     * @return All of the election records, and whether the operation should be rescheduled
     */
    ControllerResult<Boolean> maybeBalancePartitionLeaders() {
        List<ApiMessageAndVersion> records = new ArrayList<>();
        boolean moreWorkPending = false;

        for (TopicIdPartition candidate : imbalancedPartitions) {
            if (records.size() >= maxElectionsPerImbalance) {
                // Limit reached: stop here and ask to be rescheduled.
                moreWorkPending = true;
                break;
            }

            Uuid topicId = candidate.topicId();
            int partitionId = candidate.partitionId();

            TopicControlInfo topic = topics.get(topicId);
            if (topic == null) {
                log.error("Skipping unknown imbalanced topic {}", candidate);
                continue;
            }
            PartitionRegistration partition = topic.parts.get(partitionId);
            if (partition == null) {
                log.error("Skipping unknown imbalanced partition {}", candidate);
                continue;
            }

            // Ask the builder for a preferred leader election; it yields no record
            // when there is nothing to change.
            PartitionChangeBuilder builder = new PartitionChangeBuilder(
                partition,
                topicId,
                partitionId,
                clusterControl::active,
                featureControl.metadataVersion().isLeaderRecoverySupported()
            );
            builder.setElection(PartitionChangeBuilder.Election.PREFERRED);
            Optional<ApiMessageAndVersion> change = builder.build();
            if (change.isPresent()) {
                records.add(change.get());
            }
        }

        return ControllerResult.of(records, moreWorkPending);
    }

    /**
     * Process a batch of create-partitions requests, one per topic.
     *
     * A failure for one topic is reported in that topic's result and does not prevent
     * the remaining topics from being processed.
     *
     * @param topics    The topics to add partitions to.
     * @return          The generated records (as an atomic batch) and one result per
     *                  requested topic.
     */
    ControllerResult<List<CreatePartitionsTopicResult>>
            createPartitions(List<CreatePartitionsTopic> topics) {
        List<CreatePartitionsTopicResult> results = new ArrayList<>();
        List<ApiMessageAndVersion> records = new ArrayList<>();
        for (CreatePartitionsTopic topic : topics) {
            ApiError outcome;
            try {
                createPartitions(topic, records);
                outcome = ApiError.NONE;
            } catch (Exception e) {
                // ApiExceptions are expected failures; anything else is logged.
                if (!(e instanceof ApiException)) {
                    log.error("Unexpected createPartitions error for {}", topic, e);
                }
                outcome = ApiError.fromThrowable(e);
            }
            CreatePartitionsTopicResult result = new CreatePartitionsTopicResult().
                setName(topic.name()).
                setErrorCode(outcome.error().code()).
                setErrorMessage(outcome.message());
            results.add(result);
        }
        return ControllerResult.atomicOf(records, results);
    }

    /**
     * Generate the PartitionRecords needed to grow a single topic to the requested
     * partition count.
     *
     * The replication factor of the new partitions is taken from an existing partition
     * of the topic. If the request carries manual assignments, each one is validated
     * and used as the replica list; otherwise the replica placer chooses the replicas.
     * The initial ISR of every new partition is the subset of its replicas which is
     * currently active, and the first ISR member becomes the leader.
     *
     * @param topic     The request for one topic: name, target count and optional
     *                  manual assignments.
     * @param records   The record list to append to.
     *
     * @throws UnknownTopicOrPartitionException     If the topic cannot be found.
     * @throws InvalidPartitionsException           If the requested count is not an increase.
     * @throws InvalidReplicaAssignmentException    If the manual assignments are invalid.
     * @throws InvalidReplicationFactorException    If a new partition has no active replica.
     * @throws UnknownServerException               If the topic's existing state is unusable.
     */
    void createPartitions(CreatePartitionsTopic topic,
                          List<ApiMessageAndVersion> records) {
        Uuid topicId = topicsByName.get(topic.name());
        if (topicId == null) {
            throw new UnknownTopicOrPartitionException();
        }
        TopicControlInfo topicInfo = topics.get(topicId);
        if (topicInfo == null) {
            throw new UnknownTopicOrPartitionException();
        }
        // The requested count is the new total, and it must be a strict increase.
        if (topic.count() == topicInfo.parts.size()) {
            throw new InvalidPartitionsException("Topic already has " +
                topicInfo.parts.size() + " partition(s).");
        } else if (topic.count() < topicInfo.parts.size()) {
            throw new InvalidPartitionsException("The topic " + topic.name() + " currently " +
                "has " + topicInfo.parts.size() + " partition(s); " + topic.count() +
                " would not be an increase.");
        }
        int additional = topic.count() - topicInfo.parts.size();
        if (topic.assignments() != null) {
            // Manual assignments must cover exactly the partitions being added.
            if (topic.assignments().size() != additional) {
                throw new InvalidReplicaAssignmentException("Attempted to add " + additional +
                    " additional partition(s), but only " + topic.assignments().size() +
                    " assignment(s) were specified.");
            }
        }
        // Use an existing partition to determine the topic's replication factor.
        Iterator<PartitionRegistration> iterator = topicInfo.parts.values().iterator();
        if (!iterator.hasNext()) {
            throw new UnknownServerException("Invalid state: topic " + topic.name() +
                " appears to have no partitions.");
        }
        PartitionRegistration partitionInfo = iterator.next();
        if (partitionInfo.replicas.length > Short.MAX_VALUE) {
            throw new UnknownServerException("Invalid replication factor " +
                partitionInfo.replicas.length + ": expected a number equal to or less than " +
                Short.MAX_VALUE);
        }
        short replicationFactor = (short) partitionInfo.replicas.length;
        // New partition ids continue after the existing ones.
        int startPartitionId = topicInfo.parts.size();

        List<List<Integer>> placements;
        List<List<Integer>> isrs;
        if (topic.assignments() != null) {
            placements = new ArrayList<>();
            isrs = new ArrayList<>();
            for (int i = 0; i < topic.assignments().size(); i++) {
                CreatePartitionsAssignment assignment = topic.assignments().get(i);
                validateManualPartitionAssignment(assignment.brokerIds(),
                    OptionalInt.of(replicationFactor));
                placements.add(assignment.brokerIds());
                // Only active brokers may be part of the initial ISR.
                List<Integer> isr = assignment.brokerIds().stream().
                    filter(clusterControl::active).collect(Collectors.toList());
                if (isr.isEmpty()) {
                    throw new InvalidReplicaAssignmentException(
                        "All brokers specified in the manual partition assignment for " +
                            "partition " + (startPartitionId + i) + " are fenced or in controlled shutdown.");
                }
                isrs.add(isr);
            }
        } else {
            placements = clusterControl.replicaPlacer().place(new PlacementSpec(
                startPartitionId,
                additional,
                replicationFactor
            ), clusterDescriber);
            isrs = placements;
        }
        int partitionId = startPartitionId;
        for (int i = 0; i < placements.size(); i++) {
            List<Integer> replicas = placements.get(i);
            List<Integer> isr = isrs.get(i).stream().
                filter(clusterControl::active).collect(Collectors.toList());
            // If the ISR is empty, it means that all brokers are fenced or
            // in controlled shutdown. To be consistent with the replica placer,
            // we reject the create partitions request with INVALID_REPLICATION_FACTOR.
            if (isr.isEmpty()) {
                throw new InvalidReplicationFactorException(
                    "Unable to replicate the partition " + replicationFactor +
                        " time(s): All brokers are currently fenced or in controlled shutdown.");
            }
            records.add(new ApiMessageAndVersion(new PartitionRecord().
                setPartitionId(partitionId).
                setTopicId(topicId).
                setReplicas(replicas).
                setIsr(isr).
                setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()).
                setRemovingReplicas(Collections.emptyList()).
                setAddingReplicas(Collections.emptyList()).
                setLeader(isr.get(0)).
                setLeaderEpoch(0).
                setPartitionEpoch(0), PARTITION_RECORD.highestSupportedVersion()));
            partitionId++;
        }
    }

    /**
     * Check that a manually specified replica list is usable.
     *
     * The list must not be empty, every broker in it must be registered, no broker
     * may appear more than once, and, if a replication factor is given, the list must
     * contain exactly that many replicas.
     *
     * @param assignment            The broker ids proposed as replicas.
     * @param replicationFactor     The required number of replicas, or empty if any
     *                              number is acceptable.
     * @throws InvalidReplicaAssignmentException    If any of the checks fails.
     */
    void validateManualPartitionAssignment(List<Integer> assignment,
                                           OptionalInt replicationFactor) {
        if (assignment.isEmpty()) {
            throw new InvalidReplicaAssignmentException("The manual partition " +
                "assignment includes an empty replica list.");
        }
        // Work on a sorted copy so that duplicates end up next to each other.
        List<Integer> ordered = new ArrayList<>(assignment);
        Collections.sort(ordered);
        for (int i = 0; i < ordered.size(); i++) {
            Integer current = ordered.get(i);
            if (!clusterControl.brokerRegistrations().containsKey(current)) {
                throw new InvalidReplicaAssignmentException("The manual partition " +
                    "assignment includes broker " + current + ", but no such broker is " +
                    "registered.");
            }
            if (i > 0 && current.equals(ordered.get(i - 1))) {
                throw new InvalidReplicaAssignmentException("The manual partition " +
                    "assignment includes the broker " + ordered.get(i - 1) + " more than " +
                    "once.");
            }
        }
        if (!replicationFactor.isPresent()) {
            return;
        }
        int expected = replicationFactor.getAsInt();
        if (ordered.size() != expected) {
            throw new InvalidReplicaAssignmentException("The manual partition " +
                "assignment includes a partition with " + ordered.size() +
                " replica(s), but this is not consistent with previous " +
                "partitions, which have " + expected + " replica(s).");
        }
    }

    /**
     * Walk over a sequence of partitions and append the ISR and/or leader changes
     * which turn out to be necessary.
     *
     * @param context           A human-readable context string used in log4j logging.
     * @param brokerToRemove    NO_LEADER if no broker is being removed; the ID of the
     *                          broker to remove from the ISR and leadership, otherwise.
     * @param brokerToAdd       NO_LEADER if no broker is being added; the ID of the
     *                          broker which is now eligible to be a leader, otherwise.
     * @param records           A list of records which we will append to.
     * @param iterator          The iterator containing the partitions to examine.
     */
    void generateLeaderAndIsrUpdates(String context,
                                     int brokerToRemove,
                                     int brokerToAdd,
                                     List<ApiMessageAndVersion> records,
                                     Iterator<TopicIdPartition> iterator) {
        final int firstNewRecord = records.size();

        // A valid broker ID passed as brokerToAdd is accepted as a leader even if the
        // ClusterControlManager still shows it as fenced; that is the case while the
        // records for unfencing are being generated.
        //
        // A valid broker ID passed as brokerToRemove is never accepted as a leader.
        // It is also excluded from the target ISR below, but it has to be excluded
        // here as well, because an unclean leader election may pick a leader from
        // outside the ISR.
        Function<Integer, Boolean> isAcceptableLeader = replica -> {
            if (replica == brokerToRemove) {
                return false;
            }
            return replica == brokerToAdd || clusterControl.active(replica);
        };

        while (iterator.hasNext()) {
            TopicIdPartition topicIdPart = iterator.next();
            Uuid topicId = topicIdPart.topicId();
            int partitionId = topicIdPart.partitionId();

            TopicControlInfo topic = topics.get(topicId);
            if (topic == null) {
                throw new RuntimeException("Topic ID " + topicId +
                    " existed in isrMembers, but not in the topics map.");
            }
            PartitionRegistration partition = topic.parts.get(partitionId);
            if (partition == null) {
                throw new RuntimeException("Partition " + topicIdPart +
                    " existed in isrMembers, but not in the partitions map.");
            }

            PartitionChangeBuilder builder = new PartitionChangeBuilder(partition,
                topicId,
                partitionId,
                isAcceptableLeader,
                featureControl.metadataVersion().isLeaderRecoverySupported());
            if (configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name)) {
                builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
            }

            // With brokerToRemove == NO_LEADER the target ISR equals the current one,
            // so this is a no-op in that case.
            int[] remainingIsr = Replicas.copyWithout(partition.isr, brokerToRemove);
            builder.setTargetIsr(Replicas.toList(remainingIsr));

            Optional<ApiMessageAndVersion> change = builder.build();
            if (change.isPresent()) {
                records.add(change.get());
            }
        }

        int changed = records.size() - firstNewRecord;
        if (changed == 0) {
            return;
        }
        if (log.isDebugEnabled()) {
            // At debug level, name every partition whose record was appended above.
            String changedPartitions = records.subList(firstNewRecord, records.size()).stream().
                map((ApiMessageAndVersion appended) -> {
                    PartitionChangeRecord record = (PartitionChangeRecord) appended.message();
                    return topics.get(record.topicId()).name + "-" + record.partitionId();
                }).
                collect(Collectors.joining(", "));
            log.debug("{}: changing partition(s): {}", context, changedPartitions);
        } else if (log.isInfoEnabled()) {
            log.info("{}: changing {} partition(s)", context, changed);
        }
    }

    /**
     * Process a request to start, change or cancel partition reassignments.
     *
     * Each partition is handled on its own: a failure is logged and reported in that
     * partition's response entry, and processing continues with the next partition.
     *
     * @param request   The reassignment request.
     * @return          The generated records (as an atomic batch) and the response.
     */
    ControllerResult<AlterPartitionReassignmentsResponseData>
            alterPartitionReassignments(AlterPartitionReassignmentsRequestData request) {
        List<ApiMessageAndVersion> records = new ArrayList<>();
        AlterPartitionReassignmentsResponseData result =
                new AlterPartitionReassignmentsResponseData().setErrorMessage(null);
        int succeeded = 0;
        int attempted = 0;
        for (ReassignableTopic topic : request.topics()) {
            String topicName = topic.name();
            ReassignableTopicResponse topicResponse =
                new ReassignableTopicResponse().setName(topicName);
            for (ReassignablePartition partition : topic.partitions()) {
                int partitionIndex = partition.partitionIndex();
                ApiError error;
                try {
                    alterPartitionReassignment(topicName, partition, records);
                    error = ApiError.NONE;
                    succeeded++;
                } catch (Throwable e) {
                    log.info("Unable to alter partition reassignment for " +
                        topicName + ":" + partitionIndex + " because " +
                        "of an " + e.getClass().getSimpleName() + " error: " + e.getMessage());
                    error = ApiError.fromThrowable(e);
                }
                attempted++;
                ReassignablePartitionResponse partitionResponse =
                    new ReassignablePartitionResponse().
                        setPartitionIndex(partitionIndex).
                        setErrorCode(error.error().code()).
                        setErrorMessage(error.message());
                topicResponse.partitions().add(partitionResponse);
            }
            result.responses().add(topicResponse);
        }
        log.info("Successfully altered {} out of {} partition reassignment(s).",
            succeeded, attempted);
        return ControllerResult.atomicOf(records, result);
    }

    /**
     * Start, change or cancel the reassignment of a single partition.
     *
     * A null replica list in the target cancels the ongoing reassignment; any other
     * value requests a reassignment to those replicas.
     *
     * @param topicName     The name of the topic.
     * @param target        The partition index and the requested replicas.
     * @param records       The record list to append to.
     * @throws UnknownTopicOrPartitionException     If the topic or partition is unknown.
     */
    void alterPartitionReassignment(String topicName,
                                    ReassignablePartition target,
                                    List<ApiMessageAndVersion> records) {
        Uuid topicId = topicsByName.get(topicName);
        if (topicId == null) {
            throw new UnknownTopicOrPartitionException("Unable to find a topic " +
                "named " + topicName + ".");
        }
        TopicControlInfo topicInfo = topics.get(topicId);
        if (topicInfo == null) {
            throw new UnknownTopicOrPartitionException("Unable to find a topic " +
                "with ID " + topicId + ".");
        }
        int partitionIndex = target.partitionIndex();
        PartitionRegistration part = topicInfo.parts.get(partitionIndex);
        if (part == null) {
            throw new UnknownTopicOrPartitionException("Unable to find partition " +
                topicName + ":" + partitionIndex + ".");
        }
        TopicIdPartition tp = new TopicIdPartition(topicId, partitionIndex);
        Optional<ApiMessageAndVersion> change = target.replicas() == null
            ? cancelPartitionReassignment(topicName, tp, part)
            : changePartitionReassignment(tp, part, target);
        if (change.isPresent()) {
            records.add(change.get());
        }
    }

    /**
     * Build the change which reverts an ongoing partition reassignment.
     *
     * @param topicName     The name of the topic.
     * @param tp            The topic id and partition id.
     * @param part          The existing partition info.
     * @return              The record reverting the reassignment, or empty if the
     *                      builder finds nothing to change.
     * @throws NoReassignmentInProgressException    If the partition is not reassigning.
     * @throws InvalidReplicaAssignmentException    If the revert would be unclean while
     *                                              unclean leader election is disabled
     *                                              for the topic.
     */
    Optional<ApiMessageAndVersion> cancelPartitionReassignment(String topicName,
                                                               TopicIdPartition tp,
                                                               PartitionRegistration part) {
        if (!part.isReassigning()) {
            throw new NoReassignmentInProgressException(NO_REASSIGNMENT_IN_PROGRESS.message());
        }
        PartitionReassignmentRevert revert = new PartitionReassignmentRevert(part);
        if (revert.unclean() &&
                !configurationControl.uncleanLeaderElectionEnabledForTopic(topicName)) {
            throw new InvalidReplicaAssignmentException("Unable to revert partition " +
                "assignment for " + topicName + ":" + tp.partitionId() + " because " +
                "it would require an unclean leader election.");
        }
        PartitionChangeBuilder builder = new PartitionChangeBuilder(part,
            tp.topicId(),
            tp.partitionId(),
            clusterControl::active,
            featureControl.metadataVersion().isLeaderRecoverySupported());
        if (configurationControl.uncleanLeaderElectionEnabledForTopic(topicName)) {
            builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
        }
        // Restore the reverted replicas and ISR, and clear the reassignment markers.
        builder.setTargetIsr(revert.isr());
        builder.setTargetReplicas(revert.replicas());
        builder.setTargetRemoving(Collections.emptyList());
        builder.setTargetAdding(Collections.emptyList());
        return builder.build();
    }

    /**
     * Apply a given partition reassignment. A reassignment generally passes through
     * these stages:
     *
     * 1. A PartitionChangeRecord is issued which adds all the new replicas to the
     * partition's main replica list and sets removingReplicas and addingReplicas.
     *
     * 2. We wait until the partition's ISR contains all the new replicas. If there
     * are no new replicas, we wait until the ISR contains at least one replica that
     * is not being removed.
     *
     * 3. A second PartitionChangeRecord is issued which drops all removingReplicas
     * from the partition's main replica list and clears removingReplicas and
     * addingReplicas.
     *
     * The reassignment is complete once stage 3 is done.
     *
     * In some situations stages 1 and 2 can be skipped because the ISR already
     * allows moving straight to stage 3. A reassignment which only reorders the
     * existing replicas is one example; it can complete immediately.
     *
     * @param tp                The topic id and partition id.
     * @param part              The existing partition info.
     * @param target            The target partition info.
     *
     * @return                  The PartitionChangeRecord for the new partition
     *                          assignment, or empty if no change is needed.
     */
    Optional<ApiMessageAndVersion> changePartitionReassignment(TopicIdPartition tp,
                                                               PartitionRegistration part,
                                                               ReassignablePartition target) {
        // Reject target replica lists which are not a valid assignment.
        validateManualPartitionAssignment(target.replicas(), OptionalInt.empty());

        List<Integer> currentReplicas = Replicas.toList(part.replicas);
        PartitionReassignmentReplicas reassignment =
            new PartitionReassignmentReplicas(currentReplicas, target.replicas());
        List<Integer> merged = reassignment.merged();
        List<Integer> removing = reassignment.removing();
        List<Integer> adding = reassignment.adding();

        PartitionChangeBuilder builder = new PartitionChangeBuilder(part,
            tp.topicId(),
            tp.partitionId(),
            clusterControl::active,
            featureControl.metadataVersion().isLeaderRecoverySupported());
        // Only hand the builder the parts of the assignment which actually change.
        boolean replicasUnchanged = merged.equals(currentReplicas);
        if (!replicasUnchanged) {
            builder.setTargetReplicas(merged);
        }
        if (!removing.isEmpty()) {
            builder.setTargetRemoving(removing);
        }
        if (!adding.isEmpty()) {
            builder.setTargetAdding(adding);
        }
        return builder.build();
    }

    /**
     * List ongoing partition reassignments.
     *
     * @param topicList     The topics and partitions to report on, or null to report
     *                      on every topic tracked as reassigning. Topic names which
     *                      are not known are ignored.
     * @return              The response describing the ongoing reassignments.
     */
    ListPartitionReassignmentsResponseData listPartitionReassignments(
            List<ListPartitionReassignmentsTopics> topicList) {
        ListPartitionReassignmentsResponseData response =
            new ListPartitionReassignmentsResponseData().setErrorMessage(null);
        if (topicList != null) {
            // Only the requested topics.
            for (ListPartitionReassignmentsTopics requested : topicList) {
                Uuid topicId = topicsByName.get(requested.name());
                if (topicId == null) {
                    continue;
                }
                listReassigningTopic(response, topicId, requested.partitionIndexes());
            }
            return response;
        }
        // No filter given: every topic tracked as reassigning.
        for (Entry<Uuid, int[]> reassigning : reassigningTopics.entrySet()) {
            List<Integer> partitionIds = Replicas.toList(reassigning.getValue());
            listReassigningTopic(response, reassigning.getKey(), partitionIds);
        }
        return response;
    }

    /**
     * Add the ongoing reassignments of one topic to the response.
     *
     * Nothing is added if the topic is unknown or if none of the given partitions is
     * being reassigned.
     *
     * @param response      The response to add to.
     * @param topicId       The id of the topic.
     * @param partitionIds  The partitions to check.
     */
    private void listReassigningTopic(ListPartitionReassignmentsResponseData response,
                                      Uuid topicId,
                                      List<Integer> partitionIds) {
        TopicControlInfo topicInfo = topics.get(topicId);
        if (topicInfo == null) {
            return;
        }
        OngoingTopicReassignment ongoingTopic =
            new OngoingTopicReassignment().setName(topicInfo.name);
        for (int partitionId : partitionIds) {
            getOngoingPartitionReassignment(topicInfo, partitionId).
                ifPresent(reassignment -> ongoingTopic.partitions().add(reassignment));
        }
        if (ongoingTopic.partitions().isEmpty()) {
            return;
        }
        response.topics().add(ongoingTopic);
    }

    /**
     * Describe the ongoing reassignment of one partition.
     *
     * @param topicInfo     The topic which the partition belongs to.
     * @param partitionId   The partition id.
     * @return              The reassignment description, or empty if the partition is
     *                      unknown or is not being reassigned.
     */
    private Optional<OngoingPartitionReassignment>
            getOngoingPartitionReassignment(TopicControlInfo topicInfo, int partitionId) {
        PartitionRegistration partition = topicInfo.parts.get(partitionId);
        boolean reassigning = partition != null && partition.isReassigning();
        if (!reassigning) {
            return Optional.empty();
        }
        OngoingPartitionReassignment ongoing = new OngoingPartitionReassignment().
            setAddingReplicas(Replicas.toList(partition.addingReplicas)).
            setRemovingReplicas(Replicas.toList(partition.removingReplicas)).
            setPartitionIndex(partitionId).
            setReplicas(Replicas.toList(partition.replicas));
        return Optional.of(ongoing);
    }

    /**
     * Iterates over the topics as of a given epoch. Each call to next() yields the
     * records for one topic: a TopicRecord followed by one record per partition.
     */
    class ReplicationControlIterator implements Iterator<List<ApiMessageAndVersion>> {
        private final long epoch;
        private final Iterator<TopicControlInfo> iterator;

        /**
         * @param epoch     The epoch at which the topics and partitions are read.
         */
        ReplicationControlIterator(long epoch) {
            this.epoch = epoch;
            this.iterator = topics.values(epoch).iterator();
        }

        @Override
        public boolean hasNext() {
            return iterator.hasNext();
        }

        @Override
        public List<ApiMessageAndVersion> next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            TopicControlInfo topic = iterator.next();
            List<ApiMessageAndVersion> batch = new ArrayList<>();
            TopicRecord topicRecord = new TopicRecord().
                setName(topic.name).
                setTopicId(topic.id);
            batch.add(new ApiMessageAndVersion(topicRecord,
                TOPIC_RECORD.highestSupportedVersion()));
            for (Entry<Integer, PartitionRegistration> idAndPartition : topic.parts.entrySet(epoch)) {
                PartitionRegistration registration = idAndPartition.getValue();
                batch.add(registration.toRecord(topic.id, idAndPartition.getKey()));
            }
            return batch;
        }
    }

    /**
     * Create an iterator over the topic and partition records as of the given epoch.
     *
     * @param epoch     The epoch to read at.
     * @return          A new iterator.
     */
    ReplicationControlIterator iterator(long epoch) {
        ReplicationControlIterator recordIterator = new ReplicationControlIterator(epoch);
        return recordIterator;
    }

    /**
     * A replica id paired with a textual reason; toString() renders the pair as
     * "id (reason)".
     */
    private static final class IneligibleReplica {
        private final int replicaId;
        private final String reason;

        private IneligibleReplica(int replicaId, String reason) {
            this.replicaId = replicaId;
            this.reason = reason;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder();
            text.append(replicaId);
            text.append(" (").append(reason).append(")");
            return text.toString();
        }
    }
}

相关信息

kafka 源码目录

相关文章

kafka AclControlManager 源码

kafka BrokerControlState 源码

kafka BrokerControlStates 源码

kafka BrokerHeartbeatManager 源码

kafka BrokersToIsrs 源码

kafka ClientQuotaControlManager 源码

kafka ClusterControlManager 源码

kafka ConfigurationControlManager 源码

kafka ConfigurationValidator 源码

kafka Controller 源码

0  赞