kafka StreamsPartitionAssignor source code

  • 2022-10-20

kafka StreamsPartitionAssignor code

File path: /streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.MissingSourceTopicException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentListener;
import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.apache.kafka.streams.processor.internals.assignment.ClientState;
import org.apache.kafka.streams.processor.internals.assignment.CopartitionedTopicsEnforcer;
import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
import org.apache.kafka.streams.state.HostInfo;
import org.slf4j.Logger;

import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static java.util.UUID.randomUUID;

import static org.apache.kafka.common.utils.Utils.filterMap;
import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchCommittedOffsets;
import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsetsFuture;
import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.EARLIEST_PROBEABLE_VERSION;
import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.LATEST_SUPPORTED_VERSION;
import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.UNKNOWN;
import static org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.UNKNOWN_OFFSET_SUM;

public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Configurable {

    private Logger log;
    private String logPrefix;

    private static class AssignedPartition implements Comparable<AssignedPartition> {

        private final TaskId taskId;
        private final TopicPartition partition;

        AssignedPartition(final TaskId taskId, final TopicPartition partition) {
            this.taskId = taskId;
            this.partition = partition;
        }

        @Override
        public int compareTo(final AssignedPartition that) {
            return PARTITION_COMPARATOR.compare(partition, that.partition);
        }

        @Override
        public boolean equals(final Object o) {
            if (!(o instanceof AssignedPartition)) {
                return false;
            }
            final AssignedPartition other = (AssignedPartition) o;
            return compareTo(other) == 0;
        }

        @Override
        public int hashCode() {
            // Only partition is important for compareTo, equals and hashCode.
            return partition.hashCode();
        }
    }

    private static class ClientMetadata {

        private final HostInfo hostInfo;
        private final ClientState state;
        private final SortedSet<String> consumers;

        ClientMetadata(final String endPoint, final Map<String, String> clientTags) {

            // get the host info, or null if no endpoint is configured (i.e. endPoint == null)
            hostInfo = HostInfo.buildFromEndpoint(endPoint);

            // initialize the consumer memberIds
            consumers = new TreeSet<>();

            // initialize the client state with client tags
            state = new ClientState(clientTags);
        }

        void addConsumer(final String consumerMemberId, final List<TopicPartition> ownedPartitions) {
            consumers.add(consumerMemberId);
            state.incrementCapacity();
            state.addOwnedPartitions(ownedPartitions, consumerMemberId);
        }

        void addPreviousTasksAndOffsetSums(final String consumerId, final Map<TaskId, Long> taskOffsetSums) {
            state.addPreviousTasksAndOffsetSums(consumerId, taskOffsetSums);
        }

        @Override
        public String toString() {
            return "ClientMetadata{" +
                "hostInfo=" + hostInfo +
                ", consumers=" + consumers +
                ", state=" + state +
                '}';
        }
    }

    // keep track of any future consumers in a "dummy" Client since we can't decipher their subscription
    private static final UUID FUTURE_ID = randomUUID();

    protected static final Comparator<TopicPartition> PARTITION_COMPARATOR =
        Comparator.comparing(TopicPartition::topic).thenComparingInt(TopicPartition::partition);

    private String userEndPoint;
    private AssignmentConfigs assignmentConfigs;

    // for the main consumer, we need to use a supplier to break a cyclic setup dependency
    private Supplier<Consumer<byte[], byte[]>> mainConsumerSupplier;
    private Admin adminClient;
    private TaskManager taskManager;
    private StreamsMetadataState streamsMetadataState;
    private PartitionGrouper partitionGrouper;
    private AtomicInteger assignmentErrorCode;
    private AtomicLong nextScheduledRebalanceMs;
    private Queue<StreamsException> nonFatalExceptionsToHandle;
    private Time time;

    protected int usedSubscriptionMetadataVersion = LATEST_SUPPORTED_VERSION;

    private InternalTopicManager internalTopicManager;
    private CopartitionedTopicsEnforcer copartitionedTopicsEnforcer;
    private RebalanceProtocol rebalanceProtocol;
    private AssignmentListener assignmentListener;

    private Supplier<TaskAssignor> taskAssignorSupplier;
    private byte uniqueField;
    private Map<String, String> clientTags;

    /**
     * We need the PartitionAssignor and its StreamThread to be mutually accessible, since the former needs the
     * latter's cached metadata while sending subscriptions, and the latter needs the former's returned assignment
     * when adding tasks.
     *
     * @throws KafkaException if the stream thread is not specified
     */
    @Override
    public void configure(final Map<String, ?> configs) {
        final AssignorConfiguration assignorConfiguration = new AssignorConfiguration(configs);

        logPrefix = assignorConfiguration.logPrefix();
        log = new LogContext(logPrefix).logger(getClass());
        usedSubscriptionMetadataVersion = assignorConfiguration.configuredMetadataVersion(usedSubscriptionMetadataVersion);

        final ReferenceContainer referenceContainer = assignorConfiguration.referenceContainer();
        mainConsumerSupplier = () -> Objects.requireNonNull(referenceContainer.mainConsumer, "Main consumer was not specified");
        adminClient = Objects.requireNonNull(referenceContainer.adminClient, "Admin client was not specified");
        taskManager = Objects.requireNonNull(referenceContainer.taskManager, "TaskManager was not specified");
        streamsMetadataState = Objects.requireNonNull(referenceContainer.streamsMetadataState, "StreamsMetadataState was not specified");
        assignmentErrorCode = referenceContainer.assignmentErrorCode;
        nextScheduledRebalanceMs = referenceContainer.nextScheduledRebalanceMs;
        nonFatalExceptionsToHandle = referenceContainer.nonFatalExceptionsToHandle;
        time = Objects.requireNonNull(referenceContainer.time, "Time was not specified");
        assignmentConfigs = assignorConfiguration.assignmentConfigs();
        partitionGrouper = new PartitionGrouper();
        userEndPoint = assignorConfiguration.userEndPoint();
        internalTopicManager = assignorConfiguration.internalTopicManager();
        copartitionedTopicsEnforcer = assignorConfiguration.copartitionedTopicsEnforcer();
        rebalanceProtocol = assignorConfiguration.rebalanceProtocol();
        taskAssignorSupplier = assignorConfiguration::taskAssignor;
        assignmentListener = assignorConfiguration.assignmentListener();
        uniqueField = 0;
        clientTags = referenceContainer.clientTags;
    }

    @Override
    public String name() {
        return "stream";
    }

    @Override
    public List<RebalanceProtocol> supportedProtocols() {
        final List<RebalanceProtocol> supportedProtocols = new ArrayList<>();
        supportedProtocols.add(RebalanceProtocol.EAGER);
        if (rebalanceProtocol == RebalanceProtocol.COOPERATIVE) {
            supportedProtocols.add(rebalanceProtocol);
        }
        return supportedProtocols;
    }
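    // Reading of the above (not an extra guarantee): EAGER is always advertised so that a group whose
    // members are configured differently can still agree on a common protocol; COOPERATIVE is only
    // added when the configured rebalance protocol (e.g. after a completed cooperative upgrade) allows it.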

    @Override
    public ByteBuffer subscriptionUserData(final Set<String> topics) {
        // Adds the following information to subscription
        // 1. Client UUID (a unique id assigned to an instance of KafkaStreams)
        // 2. Map from task id to its overall lag
        // 3. Unique Field to ensure a rebalance when a thread rejoins by forcing the user data to be different
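        //
        // For illustration only (hypothetical values), the encoded payload corresponds to:
        //   SubscriptionInfo(version=N, latestSupportedVersion=N, processId=<instance UUID>,
        //                    userEndPoint="host:port", taskOffsetSums={0_0=42, 0_1=17},
        //                    uniqueField=<bumped counter>, errorCode=0, clientTags={...})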

        handleRebalanceStart(topics);
        uniqueField++;

        final Set<String> currentNamedTopologies = taskManager.topologyMetadata().namedTopologiesView();

        // If using NamedTopologies, filter out any that are no longer recognized/have been removed
        final Map<TaskId, Long> taskOffsetSums = taskManager.topologyMetadata().hasNamedTopologies() ?
            filterMap(taskManager.getTaskOffsetSums(), t -> currentNamedTopologies.contains(t.getKey().topologyName())) :
            taskManager.getTaskOffsetSums();

        return new SubscriptionInfo(
            usedSubscriptionMetadataVersion,
            LATEST_SUPPORTED_VERSION,
            taskManager.processId(),
            userEndPoint,
            taskOffsetSums,
            uniqueField,
            assignmentErrorCode.get(),
            clientTags
        ).encode();
    }

    private Map<String, Assignment> errorAssignment(final Map<UUID, ClientMetadata> clientsMetadata,
                                                    final int errorCode) {
        final Map<String, Assignment> assignment = new HashMap<>();
        for (final ClientMetadata clientMetadata : clientsMetadata.values()) {
            for (final String consumerId : clientMetadata.consumers) {
                assignment.put(consumerId, new Assignment(
                    Collections.emptyList(),
                    new AssignmentInfo(LATEST_SUPPORTED_VERSION,
                        Collections.emptyList(),
                        Collections.emptyMap(),
                        Collections.emptyMap(),
                        Collections.emptyMap(),
                        errorCode).encode()
                ));
            }
        }
        return assignment;
    }

    /*
     * This assigns tasks to consumer clients in the following steps.
     *
     * 0. decode the subscriptions to assemble the metadata for each client and check for version probing
     *
     * 1. check all repartition source topics and use internal topic manager to make sure
     *    they have been created with the right number of partitions. Also verify and/or create
     *    any changelog topics with the correct number of partitions.
     *
     * 2. use the partition grouper to generate tasks along with their assigned partitions, then use
     *    the configured TaskAssignor to construct the mapping of tasks to clients.
     *
     * 3. construct the global mapping of host to partitions to enable query routing.
     *
     * 4. within each client, assign tasks to its consumers (i.e. the StreamThreads).
     */
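    // (In the code below: Step One corresponds to prepareRepartitionTopics(), Step Two to
    // partitionGrouper.partitionGroups() plus assignTasksToClients(), Step Three to
    // populatePartitionsByHostMaps(), and Step Four to computeNewAssignment().)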
    @Override
    public GroupAssignment assign(final Cluster metadata, final GroupSubscription groupSubscription) {
        final Map<String, Subscription> subscriptions = groupSubscription.groupSubscription();

        // ---------------- Step Zero ---------------- //

        // construct the client metadata from the decoded subscription info

        final Map<UUID, ClientMetadata> clientMetadataMap = new HashMap<>();
        final Set<TopicPartition> allOwnedPartitions = new HashSet<>();

        int minReceivedMetadataVersion = LATEST_SUPPORTED_VERSION;
        int minSupportedMetadataVersion = LATEST_SUPPORTED_VERSION;

        boolean shutdownRequested = false;
        boolean assignmentErrorFound = false;
        int futureMetadataVersion = UNKNOWN;
        for (final Map.Entry<String, Subscription> entry : subscriptions.entrySet()) {
            final String consumerId = entry.getKey();
            final Subscription subscription = entry.getValue();
            final SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData());
            final int usedVersion = info.version();
            if (info.errorCode() == AssignorError.SHUTDOWN_REQUESTED.code()) {
                shutdownRequested = true;
            }

            minReceivedMetadataVersion = updateMinReceivedVersion(usedVersion, minReceivedMetadataVersion);
            minSupportedMetadataVersion = updateMinSupportedVersion(info.latestSupportedVersion(), minSupportedMetadataVersion);

            final UUID processId;
            if (usedVersion > LATEST_SUPPORTED_VERSION) {
                futureMetadataVersion = usedVersion;
                processId = FUTURE_ID;
                if (!clientMetadataMap.containsKey(FUTURE_ID)) {
                    clientMetadataMap.put(FUTURE_ID, new ClientMetadata(null, Collections.emptyMap()));
                }
            } else {
                processId = info.processId();
            }

            ClientMetadata clientMetadata = clientMetadataMap.get(processId);

            // create the new client metadata if necessary
            if (clientMetadata == null) {
                clientMetadata = new ClientMetadata(info.userEndPoint(), info.clientTags());
                clientMetadataMap.put(info.processId(), clientMetadata);
            }

            // add the consumer and any info in its subscription to the client
            clientMetadata.addConsumer(consumerId, subscription.ownedPartitions());
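            // Size check below: if the union grows by less than the number of partitions this member
            // claims to own, some partition was reported as owned by more than one consumer.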
            final int prevSize = allOwnedPartitions.size();
            allOwnedPartitions.addAll(subscription.ownedPartitions());
            if (allOwnedPartitions.size() < prevSize + subscription.ownedPartitions().size()) {
                assignmentErrorFound = true;
            }
            clientMetadata.addPreviousTasksAndOffsetSums(consumerId, info.taskOffsetSums());
        }

        if (assignmentErrorFound) {
            log.warn("The previous assignment contains a partition more than once. " +
                "\t Mapping: {}", subscriptions);
        }

        try {
            final boolean versionProbing =
                checkMetadataVersions(minReceivedMetadataVersion, minSupportedMetadataVersion, futureMetadataVersion);

            log.debug("Constructed client metadata {} from the member subscriptions.", clientMetadataMap);

            // ---------------- Step One ---------------- //

            if (shutdownRequested) {
                return new GroupAssignment(errorAssignment(clientMetadataMap, AssignorError.SHUTDOWN_REQUESTED.code()));
            }

            // parse the topology to determine the repartition source topics,
            // making sure they are created with the number of partitions set to
            // the maximum partition count across the depending sub-topologies' source topics
            final RepartitionTopics repartitionTopics = prepareRepartitionTopics(metadata);
            final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions = repartitionTopics.topicPartitionsInfo();

            final Cluster fullMetadata = metadata.withPartitions(allRepartitionTopicPartitions);
            log.debug("Created repartition topics {} from the parsed topology.", allRepartitionTopicPartitions.values());

            // ---------------- Step Two ---------------- //

            // construct the assignment of tasks to clients

            final Map<Subtopology, TopicsInfo> topicGroups =
                taskManager.topologyMetadata().subtopologyTopicsInfoMapExcluding(repartitionTopics.topologiesWithMissingInputTopics());

            final Set<String> allSourceTopics = new HashSet<>();
            final Map<Subtopology, Set<String>> sourceTopicsByGroup = new HashMap<>();
            for (final Map.Entry<Subtopology, TopicsInfo> entry : topicGroups.entrySet()) {
                allSourceTopics.addAll(entry.getValue().sourceTopics);
                sourceTopicsByGroup.put(entry.getKey(), entry.getValue().sourceTopics);
            }

            // get the tasks as partition groups from the partition grouper
            final Map<TaskId, Set<TopicPartition>> partitionsForTask =
                partitionGrouper.partitionGroups(sourceTopicsByGroup, fullMetadata);

            final Set<TaskId> statefulTasks = new HashSet<>();

            final boolean probingRebalanceNeeded = assignTasksToClients(fullMetadata, allSourceTopics, topicGroups, clientMetadataMap, partitionsForTask, statefulTasks);

            // ---------------- Step Three ---------------- //

            // construct the global partition assignment per host map

            final Map<HostInfo, Set<TopicPartition>> partitionsByHost = new HashMap<>();
            final Map<HostInfo, Set<TopicPartition>> standbyPartitionsByHost = new HashMap<>();
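            // The host maps are only encoded into AssignmentInfo from metadata version 2 onwards,
            // so they stay empty while any member is still on an older version.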
            if (minReceivedMetadataVersion >= 2) {
                populatePartitionsByHostMaps(partitionsByHost, standbyPartitionsByHost, partitionsForTask, clientMetadataMap);
            }
            streamsMetadataState.onChange(partitionsByHost, standbyPartitionsByHost, fullMetadata);

            // ---------------- Step Four ---------------- //

            // compute the assignment of tasks to threads within each client and build the final group assignment

            final Map<String, Assignment> assignment = computeNewAssignment(
                statefulTasks,
                clientMetadataMap,
                partitionsForTask,
                partitionsByHost,
                standbyPartitionsByHost,
                allOwnedPartitions,
                minReceivedMetadataVersion,
                minSupportedMetadataVersion,
                versionProbing,
                probingRebalanceNeeded
            );

            return new GroupAssignment(assignment);
        } catch (final MissingSourceTopicException e) {
            log.error("Caught an error in the task assignment. Returning an error assignment.", e);
            return new GroupAssignment(
                errorAssignment(clientMetadataMap, AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code())
            );
        } catch (final TaskAssignmentException e) {
            log.error("Caught an error in the task assignment. Returning an error assignment.", e);
            return new GroupAssignment(
                errorAssignment(clientMetadataMap, AssignorError.ASSIGNMENT_ERROR.code())
            );
        }
    }

    /**
     * Verify the subscription versions are within the expected bounds and check for version probing.
     *
     * @return whether this was a version probing rebalance
     */
    private boolean checkMetadataVersions(final int minReceivedMetadataVersion,
                                          final int minSupportedMetadataVersion,
                                          final int futureMetadataVersion) {
        final boolean versionProbing;

        if (futureMetadataVersion == UNKNOWN) {
            versionProbing = false;
        } else if (minReceivedMetadataVersion >= EARLIEST_PROBEABLE_VERSION) {
            versionProbing = true;
            log.info("Received a future (version probing) subscription (version: {})."
                         + " Sending assignment back (with supported version {}).",
                futureMetadataVersion,
                minSupportedMetadataVersion);

        } else {
            throw new TaskAssignmentException(
                "Received a future (version probing) subscription (version: " + futureMetadataVersion
                    + ") and an incompatible pre Kafka 2.0 subscription (version: " + minReceivedMetadataVersion
                    + ") at the same time."
            );
        }

        if (minReceivedMetadataVersion < LATEST_SUPPORTED_VERSION) {
            log.info("Downgrade metadata to version {}. Latest supported version is {}.",
                minReceivedMetadataVersion,
                LATEST_SUPPORTED_VERSION);
        }
        if (minSupportedMetadataVersion < LATEST_SUPPORTED_VERSION) {
            log.info("Downgrade latest supported metadata to version {}. Latest supported version is {}.",
                minSupportedMetadataVersion,
                LATEST_SUPPORTED_VERSION);
        }
        return versionProbing;
    }
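    // Version-probing walkthrough for checkMetadataVersions() (hypothetical versions): if one member
    // subscribes with version 12 while this assignor supports at most version 11, the leader flags a
    // probing rebalance and replies with version 11 so the newer member can downgrade its next
    // subscription; mixing a future version with a subscription below EARLIEST_PROBEABLE_VERSION
    // cannot be resolved and fails the assignment.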

    /**
     * Computes and assembles all repartition topic metadata then creates the topics if necessary. Also verifies
     * that all user input topics of each topology have been created ahead of time. If any such source topics are
     * missing from a NamedTopology, the assignor will skip distributing its tasks until they have been created
     * and invoke the exception handler (without killing the thread) once for each topology to alert the user of
     * the missing topics.
     * <p>
     * For regular applications without named topologies, the assignor will instead send a shutdown signal to
     * all clients so the user can identify and resolve the problem.
     *
     * @return application metadata such as partition info of repartition topics, missing external topics, etc
     */
    private RepartitionTopics prepareRepartitionTopics(final Cluster metadata) {
        final RepartitionTopics repartitionTopics = new RepartitionTopics(
            taskManager.topologyMetadata(),
            internalTopicManager,
            copartitionedTopicsEnforcer,
            metadata,
            logPrefix
        );
        repartitionTopics.setup();
        final boolean isMissingInputTopics = !repartitionTopics.missingSourceTopicExceptions().isEmpty();
        if (isMissingInputTopics) {
            if (!taskManager.topologyMetadata().hasNamedTopologies()) {
                throw new MissingSourceTopicException("Missing source topics.");
            } else {
                nonFatalExceptionsToHandle.addAll(repartitionTopics.missingSourceTopicExceptions());
            }
        }
        return repartitionTopics;
    }


    /**
     * Populates the taskForPartition and tasksForTopicGroup maps, and checks that partitions are assigned to exactly
     * one task.
     *
     * @param taskForPartition a map from partition to the corresponding task. Populated here.
     * @param tasksForTopicGroup a map from the topicGroupId to the set of corresponding tasks. Populated here.
     * @param allSourceTopics a set of all source topics in the topology
     * @param partitionsForTask a map from task to the set of input partitions
     * @param fullMetadata the cluster metadata
     */
    private void populateTasksForMaps(final Map<TopicPartition, TaskId> taskForPartition,
                                      final Map<Subtopology, Set<TaskId>> tasksForTopicGroup,
                                      final Set<String> allSourceTopics,
                                      final Map<TaskId, Set<TopicPartition>> partitionsForTask,
                                      final Cluster fullMetadata) {
        // check if all partitions are assigned, and there are no duplicates of partitions in multiple tasks
        final Set<TopicPartition> allAssignedPartitions = new HashSet<>();
        for (final Map.Entry<TaskId, Set<TopicPartition>> entry : partitionsForTask.entrySet()) {
            final TaskId id = entry.getKey();
            final Set<TopicPartition> partitions = entry.getValue();

            for (final TopicPartition partition : partitions) {
                taskForPartition.put(partition, id);
                if (allAssignedPartitions.contains(partition)) {
                    log.warn("Partition {} is assigned to more than one task: {}", partition, partitionsForTask);
                }
            }
            allAssignedPartitions.addAll(partitions);

            tasksForTopicGroup.computeIfAbsent(new Subtopology(id.subtopology(), id.topologyName()), k -> new HashSet<>()).add(id);
        }

        checkAllPartitions(allSourceTopics, partitionsForTask, allAssignedPartitions, fullMetadata);
    }

    // Logs a warning if any partitions are not assigned to a task, or a task has no assigned partitions
    private void checkAllPartitions(final Set<String> allSourceTopics,
                                    final Map<TaskId, Set<TopicPartition>> partitionsForTask,
                                    final Set<TopicPartition> allAssignedPartitions,
                                    final Cluster fullMetadata) {
        for (final String topic : allSourceTopics) {
            final List<PartitionInfo> partitionInfoList = fullMetadata.partitionsForTopic(topic);
            if (partitionInfoList.isEmpty()) {
                log.warn("No partitions found for topic {}", topic);
            } else {
                for (final PartitionInfo partitionInfo : partitionInfoList) {
                    final TopicPartition partition = new TopicPartition(partitionInfo.topic(),
                        partitionInfo.partition());
                    if (!allAssignedPartitions.contains(partition)) {
                        log.warn("Partition {} is not assigned to any tasks: {}"
                                     + " A possible cause of a partition not getting assigned"
                                     + " is that another topic defined in the topology has not been"
                                     + " created when starting your streams application,"
                                     + " resulting in no tasks being created for this topology at all.", partition,
                            partitionsForTask);
                    }
                }
            }
        }
    }

    /**
     * Assigns a set of tasks to each client (Streams instance) using the configured task assignor, and also
     * populates the stateful tasks that have been assigned to the clients
     * @return true if a probing rebalance should be triggered
     */
    private boolean assignTasksToClients(final Cluster fullMetadata,
                                         final Set<String> allSourceTopics,
                                         final Map<Subtopology, TopicsInfo> topicGroups,
                                         final Map<UUID, ClientMetadata> clientMetadataMap,
                                         final Map<TaskId, Set<TopicPartition>> partitionsForTask,
                                         final Set<TaskId> statefulTasks) {
        if (!statefulTasks.isEmpty()) {
            throw new TaskAssignmentException("The stateful tasks should not be populated before assigning tasks to clients");
        }

        final Map<TopicPartition, TaskId> taskForPartition = new HashMap<>();
        final Map<Subtopology, Set<TaskId>> tasksForTopicGroup = new HashMap<>();
        populateTasksForMaps(taskForPartition, tasksForTopicGroup, allSourceTopics, partitionsForTask, fullMetadata);

        final ChangelogTopics changelogTopics = new ChangelogTopics(
            internalTopicManager,
            topicGroups,
            tasksForTopicGroup,
            logPrefix
        );
        changelogTopics.setup();

        final Map<UUID, ClientState> clientStates = new HashMap<>();
        final boolean lagComputationSuccessful =
            populateClientStatesMap(clientStates, clientMetadataMap, taskForPartition, changelogTopics);

        log.info("All members participating in this rebalance: \n{}.",
                 clientStates.entrySet().stream()
                     .map(entry -> entry.getKey() + ": " + entry.getValue().consumers())
                     .collect(Collectors.joining(Utils.NL)));

        final Set<TaskId> allTasks = partitionsForTask.keySet();
        statefulTasks.addAll(changelogTopics.statefulTaskIds());

        log.debug("Assigning tasks {} including stateful {} to clients {} with number of replicas {}",
            allTasks, statefulTasks, clientStates, numStandbyReplicas());

        final TaskAssignor taskAssignor = createTaskAssignor(lagComputationSuccessful);

        final boolean probingRebalanceNeeded = taskAssignor.assign(clientStates,
                                                                   allTasks,
                                                                   statefulTasks,
                                                                   assignmentConfigs);

        log.info("Assigned tasks {} including stateful {} to clients as: \n{}.",
                allTasks, statefulTasks, clientStates.entrySet().stream()
                        .map(entry -> entry.getKey() + "=" + entry.getValue().currentAssignment())
                        .collect(Collectors.joining(Utils.NL)));

        return probingRebalanceNeeded;
    }

    private TaskAssignor createTaskAssignor(final boolean lagComputationSuccessful) {
        final TaskAssignor taskAssignor = taskAssignorSupplier.get();
        if (taskAssignor instanceof StickyTaskAssignor) {
            // special case: to preserve pre-existing behavior, we invoke the StickyTaskAssignor
            // whether or not lag computation failed.
            return taskAssignor;
        } else if (lagComputationSuccessful) {
            return taskAssignor;
        } else {
            log.info("Failed to fetch end offsets for changelogs, will return previous assignment to clients and "
                         + "trigger another rebalance to retry.");
            return new FallbackPriorTaskAssignor();
        }
    }
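    // Note: which assignor the supplier yields is decided by AssignorConfiguration; in recent
    // releases the default is the HighAvailabilityTaskAssignor, with the FallbackPriorTaskAssignor
    // substituted above whenever the changelog end offsets could not be fetched.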

    /**
     * Builds a map from client to state, and readies each ClientState for assignment by adding any missing prev tasks
     * and computing the per-task overall lag based on the fetched end offsets for each changelog.
     *
     * @param clientStates a map from each client to its state, including offset lags. Populated by this method.
     * @param clientMetadataMap a map from each client to its full metadata
     * @param taskForPartition map from topic partition to its corresponding task
     * @param changelogTopics object that manages changelog topics
     *
     * @return whether we were able to successfully fetch the changelog end offsets and compute each client's lag
     */
    private boolean populateClientStatesMap(final Map<UUID, ClientState> clientStates,
                                            final Map<UUID, ClientMetadata> clientMetadataMap,
                                            final Map<TopicPartition, TaskId> taskForPartition,
                                            final ChangelogTopics changelogTopics) {
        boolean fetchEndOffsetsSuccessful;
        Map<TaskId, Long> allTaskEndOffsetSums;
        try {
            // Make the listOffsets request first so it can fetch the offsets for non-source changelogs
            // asynchronously while we use the blocking Consumer#committed call to fetch source-changelog offsets
            final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> endOffsetsFuture =
                fetchEndOffsetsFuture(changelogTopics.preExistingNonSourceTopicBasedPartitions(), adminClient);

            final Map<TopicPartition, Long> sourceChangelogEndOffsets =
                fetchCommittedOffsets(changelogTopics.preExistingSourceTopicBasedPartitions(), mainConsumerSupplier.get());

            final Map<TopicPartition, ListOffsetsResultInfo> endOffsets = ClientUtils.getEndOffsets(endOffsetsFuture);

            allTaskEndOffsetSums = computeEndOffsetSumsByTask(
                endOffsets,
                sourceChangelogEndOffsets,
                changelogTopics
            );
            fetchEndOffsetsSuccessful = true;
        } catch (final StreamsException | TimeoutException e) {
            allTaskEndOffsetSums = changelogTopics.statefulTaskIds().stream().collect(Collectors.toMap(t -> t, t -> UNKNOWN_OFFSET_SUM));
            fetchEndOffsetsSuccessful = false;
        }
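        // When lag computation fails, every stateful task is marked with the UNKNOWN_OFFSET_SUM
        // sentinel and createTaskAssignor() will fall back to returning the previous assignment
        // and triggering a retry, rather than assigning based on unknown lags.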

        for (final Map.Entry<UUID, ClientMetadata> entry : clientMetadataMap.entrySet()) {
            final UUID uuid = entry.getKey();
            final ClientState state = entry.getValue().state;
            state.initializePrevTasks(taskForPartition, taskManager.topologyMetadata().hasNamedTopologies());

            state.computeTaskLags(uuid, allTaskEndOffsetSums);
            clientStates.put(uuid, state);
        }

        return fetchEndOffsetsSuccessful;
    }

    /**
     * @param endOffsets the listOffsets result from the adminClient
     * @param sourceChangelogEndOffsets the end (committed) offsets of optimized source changelogs
     * @param changelogTopics object that manages changelog topics
     *
     * @return Map from stateful task to its total end offset summed across all changelog partitions
     */
    private Map<TaskId, Long> computeEndOffsetSumsByTask(final Map<TopicPartition, ListOffsetsResultInfo> endOffsets,
                                                         final Map<TopicPartition, Long> sourceChangelogEndOffsets,
                                                         final ChangelogTopics changelogTopics) {

        final Map<TaskId, Long> taskEndOffsetSums = new HashMap<>();
        for (final TaskId taskId : changelogTopics.statefulTaskIds()) {
            taskEndOffsetSums.put(taskId, 0L);
            for (final TopicPartition changelogPartition : changelogTopics.preExistingPartitionsFor(taskId)) {
                final long changelogPartitionEndOffset;
                if (sourceChangelogEndOffsets.containsKey(changelogPartition)) {
                    changelogPartitionEndOffset = sourceChangelogEndOffsets.get(changelogPartition);
                } else if (endOffsets.containsKey(changelogPartition)) {
                    changelogPartitionEndOffset = endOffsets.get(changelogPartition).offset();
                } else {
                    log.debug("Fetched offsets did not contain the changelog {} of task {}", changelogPartition, taskId);
                    throw new IllegalStateException("Could not get end offset for " + changelogPartition);
                }
                final long newEndOffsetSum = taskEndOffsetSums.get(taskId) + changelogPartitionEndOffset;
                if (newEndOffsetSum < 0) {
                    taskEndOffsetSums.put(taskId, Long.MAX_VALUE);
                    break;
                } else {
                    taskEndOffsetSums.put(taskId, newEndOffsetSum);
                }
            }
        }
        return taskEndOffsetSums;
    }
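    // Worked example for computeEndOffsetSumsByTask (hypothetical offsets): a task whose two
    // changelog partitions end at offsets 100 and 250 gets an offset sum of 350; a sum that would
    // overflow is clamped to Long.MAX_VALUE instead of wrapping negative, since it is only used as
    // a relative measure of how much state a client already holds.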

    /**
     * Populates the global partitionsByHost and standbyPartitionsByHost maps that are sent to each member
     *
     * @param partitionsByHost a map from host to the set of partitions hosted there. Populated here.
     * @param standbyPartitionsByHost a map from host to the set of standby partitions hosted there. Populated here.
     * @param partitionsForTask a map from task to its set of assigned partitions
     * @param clientMetadataMap a map from client to its metadata and state
     */
    private void populatePartitionsByHostMaps(final Map<HostInfo, Set<TopicPartition>> partitionsByHost,
                                              final Map<HostInfo, Set<TopicPartition>> standbyPartitionsByHost,
                                              final Map<TaskId, Set<TopicPartition>> partitionsForTask,
                                              final Map<UUID, ClientMetadata> clientMetadataMap) {
        for (final Map.Entry<UUID, ClientMetadata> entry : clientMetadataMap.entrySet()) {
            final HostInfo hostInfo = entry.getValue().hostInfo;

            // if application server is configured, also include host state map
            if (hostInfo != null) {
                final Set<TopicPartition> topicPartitions = new HashSet<>();
                final Set<TopicPartition> standbyPartitions = new HashSet<>();
                final ClientState state = entry.getValue().state;

                for (final TaskId id : state.activeTasks()) {
                    topicPartitions.addAll(partitionsForTask.get(id));
                }

                for (final TaskId id : state.standbyTasks()) {
                    standbyPartitions.addAll(partitionsForTask.get(id));
                }

                partitionsByHost.put(hostInfo, topicPartitions);
                standbyPartitionsByHost.put(hostInfo, standbyPartitions);
            }
        }
    }

    /**
     * Computes the assignment of tasks to threads within each client and assembles the final assignment to send out.
     *
     * @return the final assignment for each StreamThread consumer
     */
    private Map<String, Assignment> computeNewAssignment(final Set<TaskId> statefulTasks,
                                                         final Map<UUID, ClientMetadata> clientsMetadata,
                                                         final Map<TaskId, Set<TopicPartition>> partitionsForTask,
                                                         final Map<HostInfo, Set<TopicPartition>> partitionsByHostState,
                                                         final Map<HostInfo, Set<TopicPartition>> standbyPartitionsByHost,
                                                         final Set<TopicPartition> allOwnedPartitions,
                                                         final int minUserMetadataVersion,
                                                         final int minSupportedMetadataVersion,
                                                         final boolean versionProbing,
                                                         final boolean shouldTriggerProbingRebalance) {
        boolean rebalanceRequired = shouldTriggerProbingRebalance || versionProbing;
        final Map<String, Assignment> assignment = new HashMap<>();

        // within the client, distribute tasks to its owned consumers
        for (final Map.Entry<UUID, ClientMetadata> clientEntry : clientsMetadata.entrySet()) {
            final UUID clientId = clientEntry.getKey();
            final ClientMetadata clientMetadata = clientEntry.getValue();
            final ClientState state = clientMetadata.state;
            final SortedSet<String> consumers = clientMetadata.consumers;
            final Map<String, Integer> threadTaskCounts = new HashMap<>();

            final Map<String, List<TaskId>> activeTaskStatefulAssignment = assignTasksToThreads(
                state.statefulActiveTasks(),
                true,
                consumers,
                state,
                threadTaskCounts
            );

            final Map<String, List<TaskId>> standbyTaskAssignment = assignTasksToThreads(
                state.standbyTasks(),
                true,
                consumers,
                state,
                threadTaskCounts
            );

            final Map<String, List<TaskId>> activeTaskStatelessAssignment = assignTasksToThreads(
                state.statelessActiveTasks(),
                false,
                consumers,
                state,
                threadTaskCounts
            );

            // Combine activeTaskStatefulAssignment and activeTaskStatelessAssignment together into
            // activeTaskAssignment (which aliases activeTaskStatefulAssignment)
            final Map<String, List<TaskId>> activeTaskAssignment = activeTaskStatefulAssignment;
            for (final Map.Entry<String, List<TaskId>> threadEntry : activeTaskStatelessAssignment.entrySet()) {
                activeTaskAssignment.get(threadEntry.getKey()).addAll(threadEntry.getValue());
            }

            // Arbitrarily choose the leader's client to be responsible for triggering the probing rebalance.
            // Note that once we pick the first consumer within the process to trigger the probing rebalance,
            // the other consumers will not be set to trigger it any more.
            final boolean encodeNextProbingRebalanceTime = shouldTriggerProbingRebalance && clientId.equals(taskManager.processId());

            final boolean tasksRevoked = addClientAssignments(
                statefulTasks,
                assignment,
                clientMetadata,
                partitionsForTask,
                partitionsByHostState,
                standbyPartitionsByHost,
                allOwnedPartitions,
                activeTaskAssignment,
                standbyTaskAssignment,
                minUserMetadataVersion,
                minSupportedMetadataVersion,
                encodeNextProbingRebalanceTime
            );

            if (tasksRevoked || encodeNextProbingRebalanceTime) {
                rebalanceRequired = true;
                log.debug("Requested client {} to schedule a followup rebalance", clientId);
            }

            log.info("Client {} per-consumer assignment:\n" +
                "\tprev owned active {}\n" +
                "\tprev owned standby {}\n" +
                "\tassigned active {}\n" +
                "\trevoking active {}\n" +
                "\tassigned standby {}\n",
                clientId,
                clientMetadata.state.prevOwnedActiveTasksByConsumer(),
                clientMetadata.state.prevOwnedStandbyByConsumer(),
                clientMetadata.state.assignedActiveTasksByConsumer(),
                clientMetadata.state.revokingActiveTasksByConsumer(),
                clientMetadata.state.assignedStandbyTasksByConsumer());
        }

        if (rebalanceRequired) {
            assignmentListener.onAssignmentComplete(false);
            log.info("Finished unstable assignment of tasks, a followup rebalance will be scheduled.");
        } else {
            assignmentListener.onAssignmentComplete(true);
            log.info("Finished stable assignment of tasks, no followup rebalances required.");
        }

        return assignment;
    }

    /**
     * Adds the encoded assignment for each StreamThread consumer in the client to the overall assignment map
     * @return true if a followup rebalance will be required due to revoked tasks
     */
    private boolean addClientAssignments(final Set<TaskId> statefulTasks,
                                         final Map<String, Assignment> assignment,
                                         final ClientMetadata clientMetadata,
                                         final Map<TaskId, Set<TopicPartition>> partitionsForTask,
                                         final Map<HostInfo, Set<TopicPartition>> partitionsByHostState,
                                         final Map<HostInfo, Set<TopicPartition>> standbyPartitionsByHost,
                                         final Set<TopicPartition> allOwnedPartitions,
                                         final Map<String, List<TaskId>> activeTaskAssignments,
                                         final Map<String, List<TaskId>> standbyTaskAssignments,
                                         final int minUserMetadataVersion,
                                         final int minSupportedMetadataVersion,
                                         final boolean probingRebalanceNeeded) {
        boolean followupRebalanceRequiredForRevokedTasks = false;

        // We only want to encode a scheduled probing rebalance for a single member in this client
        boolean shouldEncodeProbingRebalance = probingRebalanceNeeded;

        // Loop through the consumers and build their assignment
        for (final String consumer : clientMetadata.consumers) {
            final List<TaskId> activeTasksForConsumer = activeTaskAssignments.get(consumer);

            // These will be filled in by populateActiveTaskAndPartitionsLists below
            final List<TopicPartition> activePartitionsList = new ArrayList<>();
            final List<TaskId> assignedActiveList = new ArrayList<>();

            final Set<TaskId> activeTasksRemovedPendingRevokation = populateActiveTaskAndPartitionsLists(
                activePartitionsList,
                assignedActiveList,
                consumer,
                clientMetadata.state,
                activeTasksForConsumer,
                partitionsForTask,
                allOwnedPartitions
            );

            final Map<TaskId, Set<TopicPartition>> standbyTaskMap = buildStandbyTaskMap(
                    consumer,
                    standbyTaskAssignments.get(consumer),
                    activeTasksRemovedPendingRevokation,
                    statefulTasks,
                    partitionsForTask,
                    clientMetadata.state
                );

            final AssignmentInfo info = new AssignmentInfo(
                minUserMetadataVersion,
                minSupportedMetadataVersion,
                assignedActiveList,
                standbyTaskMap,
                partitionsByHostState,
                standbyPartitionsByHost,
                AssignorError.NONE.code()
            );

            if (!activeTasksRemovedPendingRevokation.isEmpty()) {
                // TODO: once KAFKA-10078 is resolved we can leave it to the client to trigger this rebalance
                log.info("Requesting followup rebalance be scheduled immediately by {} due to tasks changing ownership.", consumer);
                info.setNextRebalanceTime(0L);
                followupRebalanceRequiredForRevokedTasks = true;
                // Don't bother to schedule a probing rebalance if an immediate one is already scheduled
                shouldEncodeProbingRebalance = false;
            } else if (shouldEncodeProbingRebalance) {
                final long nextRebalanceTimeMs = time.milliseconds() + probingRebalanceIntervalMs();
                log.info("Requesting followup rebalance be scheduled by {} at {} ms to probe for caught-up replica tasks.",
                        consumer, nextRebalanceTimeMs);
                info.setNextRebalanceTime(nextRebalanceTimeMs);
                shouldEncodeProbingRebalance = false;
            }

            assignment.put(
                consumer,
                new Assignment(
                    activePartitionsList,
                    info.encode()
                )
            );
        }
        return followupRebalanceRequiredForRevokedTasks;
    }

    /**
     * Populates the lists of active tasks and active task partitions for the consumer with a 1:1 mapping between them
     * such that the nth task corresponds to the nth partition in the list. This means tasks with multiple partitions
     * will be repeated in the list.
     */
    private Set<TaskId> populateActiveTaskAndPartitionsLists(final List<TopicPartition> activePartitionsList,
                                                             final List<TaskId> assignedActiveList,
                                                             final String consumer,
                                                             final ClientState clientState,
                                                             final List<TaskId> activeTasksForConsumer,
                                                             final Map<TaskId, Set<TopicPartition>> partitionsForTask,
                                                             final Set<TopicPartition> allOwnedPartitions) {
        final List<AssignedPartition> assignedPartitions = new ArrayList<>();
        final Set<TaskId> removedActiveTasks = new TreeSet<>();

        for (final TaskId taskId : activeTasksForConsumer) {
            // Populate the consumer for assigned tasks without considering revocation;
            // this is for debugging purposes only
            clientState.assignActiveToConsumer(taskId, consumer);

            final List<AssignedPartition> assignedPartitionsForTask = new ArrayList<>();
            for (final TopicPartition partition : partitionsForTask.get(taskId)) {
                final String oldOwner = clientState.previousOwnerForPartition(partition);
                final boolean newPartitionForConsumer = oldOwner == null || !oldOwner.equals(consumer);

                // If the partition is new to this consumer but is still owned by another, remove from the assignment
                // until it has been revoked and can safely be reassigned according to the COOPERATIVE protocol
                if (newPartitionForConsumer && allOwnedPartitions.contains(partition)) {
                    log.info(
                        "Removing task {} from {} active assignment until it is safely revoked in followup rebalance",
                        taskId,
                        consumer
                    );
                    removedActiveTasks.add(taskId);

                    clientState.revokeActiveFromConsumer(taskId, consumer);

                    // Clear the assigned partitions list for this task if any partition can not safely be assigned,
                    // so as not to encode a partial task
                    assignedPartitionsForTask.clear();

                    // This has no effect on the assignment, as we'll never consult the ClientState again, but
                    // it does perform a useful assertion that the task was actually assigned.
                    clientState.unassignActive(taskId);
                    break;
                } else {
                    assignedPartitionsForTask.add(new AssignedPartition(taskId, partition));
                }
            }
            // assignedPartitionsForTask will either contain all partitions for the task or be empty, so just add all
            assignedPartitions.addAll(assignedPartitionsForTask);
        }

        // Add one copy of a task for each corresponding partition, so the receiver can determine the task <-> tp mapping
        Collections.sort(assignedPartitions);
        for (final AssignedPartition partition : assignedPartitions) {
            assignedActiveList.add(partition.taskId);
            activePartitionsList.add(partition.partition);
        }
        return removedActiveTasks;
    }

    /**
     * @return map from task id to its assigned partitions for all standby tasks
     */
    private Map<TaskId, Set<TopicPartition>> buildStandbyTaskMap(final String consumer,
                                                                 final Iterable<TaskId> standbyTasks,
                                                                 final Iterable<TaskId> revokedTasks,
                                                                 final Set<TaskId> allStatefulTasks,
                                                                 final Map<TaskId, Set<TopicPartition>> partitionsForTask,
                                                                 final ClientState clientState) {
        final Map<TaskId, Set<TopicPartition>> standbyTaskMap = new HashMap<>();

        for (final TaskId task : standbyTasks) {
            clientState.assignStandbyToConsumer(task, consumer);
            standbyTaskMap.put(task, partitionsForTask.get(task));
        }

        for (final TaskId task : revokedTasks) {
            if (allStatefulTasks.contains(task)) {
                log.info("Adding removed stateful active task {} as a standby for {} before it is revoked in followup rebalance",
                        task, consumer);

                // This has no effect on the assignment, as we'll never consult the ClientState again, but
                // it does perform a useful assertion that it's legal to assign this task as a standby to this instance
                clientState.assignStandbyToConsumer(task, consumer);
                clientState.assignStandby(task);

                standbyTaskMap.put(task, partitionsForTask.get(task));
            }
        }
        return standbyTaskMap;
    }

    /**
     * Generate an assignment that tries to preserve thread-level stickiness for stateful tasks without violating
     * balance. The tasks are balanced across threads. Stateful tasks without previous owners will be interleaved by
     * group id to spread subtopologies across threads and further balance the workload.
     * Stateless tasks are simply spread across threads without taking into account previous ownership.
     * threadLoad is a map that keeps track of task load per thread across multiple calls so active and standby
     * tasks are evenly distributed
     */
    static Map<String, List<TaskId>> assignTasksToThreads(final Collection<TaskId> tasksToAssign,
                                                          final boolean isStateful,
                                                          final SortedSet<String> consumers,
                                                          final ClientState state,
                                                          final Map<String, Integer> threadLoad) {
        final Map<String, List<TaskId>> assignment = new HashMap<>();
        for (final String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }

        final int totalTasks = threadLoad.values().stream().reduce(tasksToAssign.size(), Integer::sum);

        final int minTasksPerThread = (int) Math.floor(((double) totalTasks) / consumers.size());
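        // e.g. (hypothetical numbers) 7 total tasks over 3 consumers gives minTasksPerThread =
        // floor(7 / 3) = 2: the sticky pass below first tops each thread up to 2 tasks, and the
        // remaining task is then interleaved over the consumers still at minimum capacity.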
        final PriorityQueue<TaskId> unassignedTasks = new PriorityQueue<>(tasksToAssign);

        final Queue<String> consumersToFill = new LinkedList<>();
        // keep track of tasks that we have to skip during the first pass in case we can reassign them later
        // using a tree-map to make sure the iteration ordering over keys is preserved
        final Map<TaskId, String> unassignedTaskToPreviousOwner = new TreeMap<>();

        if (!unassignedTasks.isEmpty()) {
            // First assign tasks to previous owner, up to the min expected tasks/thread if these are stateful
            for (final String consumer : consumers) {
                final List<TaskId> threadAssignment = assignment.get(consumer);
                // The number of tasks we have to assign here to hit minTasksPerThread
                final int tasksTargetCount = minTasksPerThread - threadLoad.getOrDefault(consumer, 0);

                if (isStateful) {
                    for (final TaskId task : state.prevTasksByLag(consumer)) {
                        if (unassignedTasks.contains(task)) {
                            if (threadAssignment.size() < tasksTargetCount) {
                                threadAssignment.add(task);
                                unassignedTasks.remove(task);
                            } else {
                                unassignedTaskToPreviousOwner.put(task, consumer);
                            }
                        }
                    }
                }

                if (threadAssignment.size() < tasksTargetCount) {
                    consumersToFill.offer(consumer);
                }
            }

            // Next interleave remaining unassigned tasks amongst unfilled consumers
            while (!consumersToFill.isEmpty()) {
                final TaskId task = unassignedTasks.poll();
                if (task != null) {
                    final String consumer = consumersToFill.poll();
                    final List<TaskId> threadAssignment = assignment.get(consumer);
                    threadAssignment.add(task);
                    final int threadTaskCount = threadAssignment.size() + threadLoad.getOrDefault(consumer, 0);
                    if (threadTaskCount < minTasksPerThread) {
                        consumersToFill.offer(consumer);
                    }
                } else {
                    throw new TaskAssignmentException("Ran out of unassigned stateful tasks but some members were not at capacity");
                }
            }

            // At this point all consumers are at the min or min + 1 capacity.
            // The min + 1 case can occur for standbys when there are fewer standbys than consumers and, after
            // assigning the active tasks, some consumers already have min + 1 tasks assigned.
            // The tasks still remaining should now be distributed over the consumers that are still at min capacity.
            if (!unassignedTasks.isEmpty()) {
                for (final String consumer : consumers) {
                    final int taskCount = assignment.get(consumer).size() + threadLoad.getOrDefault(consumer, 0);
                    if (taskCount == minTasksPerThread) {
                        consumersToFill.add(consumer);
                    }
                }

                // Go over the tasks we skipped earlier and assign them to their previous owner when possible
                for (final Map.Entry<TaskId, String> taskEntry : unassignedTaskToPreviousOwner.entrySet()) {
                    final TaskId task = taskEntry.getKey();
                    final String consumer = taskEntry.getValue();
                    if (consumersToFill.contains(consumer) && unassignedTasks.contains(task)) {
                        assignment.get(consumer).add(task);
                        unassignedTasks.remove(task);
                        // Remove this consumer since we know it is now at minCapacity + 1
                        consumersToFill.remove(consumer);
                    }
                }

                // Now just distribute the remaining unassigned stateful tasks over the consumers still at min capacity
                for (final TaskId task : unassignedTasks) {
                    final String consumer = consumersToFill.poll();
                    final List<TaskId> threadAssignment = assignment.get(consumer);
                    threadAssignment.add(task);
                }
            }
        }
        // Update threadLoad
        for (final Map.Entry<String, List<TaskId>> taskEntry : assignment.entrySet()) {
            final String consumer = taskEntry.getKey();
            final int totalCount = threadLoad.getOrDefault(consumer, 0) + taskEntry.getValue().size();
            threadLoad.put(consumer, totalCount);
        }

        return assignment;
    }
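
    // Example trace (stateful, empty threadLoad): consumers {c1, c2}, tasks {0_0, 0_1, 1_0, 1_1},
    // so minTasksPerThread = 2. If c1 previously owned 0_0 and 1_0, the first pass pins both to c1
    // via prevTasksByLag; the interleaving pass then hands 0_1 and 1_1 to c2, preserving stickiness
    // without violating balance.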

    private void validateMetadataVersions(final int receivedAssignmentMetadataVersion,
                                          final int latestCommonlySupportedVersion) {

        if (receivedAssignmentMetadataVersion > usedSubscriptionMetadataVersion) {
            log.error("Leader sent back an assignment with version {} which was greater than our used version {}",
                receivedAssignmentMetadataVersion, usedSubscriptionMetadataVersion);
            throw new TaskAssignmentException(
                "Sent a version " + usedSubscriptionMetadataVersion
                    + " subscription but got an assignment with higher version "
                    + receivedAssignmentMetadataVersion + "."
            );
        }

        if (latestCommonlySupportedVersion > LATEST_SUPPORTED_VERSION) {
            log.error("Leader sent back assignment with commonly supported version {} that is greater than our "
                + "actual latest supported version {}", latestCommonlySupportedVersion, LATEST_SUPPORTED_VERSION);
            throw new TaskAssignmentException("Can't upgrade to metadata version greater than we support");
        }
    }

    // Returns true if the subscription version was changed, indicating version probing and the need to rebalance again
    protected boolean maybeUpdateSubscriptionVersion(final int receivedAssignmentMetadataVersion,
                                                     final int latestCommonlySupportedVersion) {
        if (receivedAssignmentMetadataVersion >= EARLIEST_PROBEABLE_VERSION) {
            // If the latest commonly supported version is now greater than our used version, this indicates we have just
            // completed the rolling upgrade and can now update our subscription version for the final rebalance
            if (latestCommonlySupportedVersion > usedSubscriptionMetadataVersion) {
                log.info(
                    "Sent a version {} subscription and group's latest commonly supported version is {} (successful "
                        +
                        "version probing and end of rolling upgrade). Upgrading subscription metadata version to " +
                        "{} for next rebalance.",
                    usedSubscriptionMetadataVersion,
                    latestCommonlySupportedVersion,
                    latestCommonlySupportedVersion
                );
                usedSubscriptionMetadataVersion = latestCommonlySupportedVersion;
                return true;
            }

            // If we received a lower version than we sent, someone else in the group still hasn't upgraded. We
            // should downgrade our subscription until everyone is on the latest version
            if (receivedAssignmentMetadataVersion < usedSubscriptionMetadataVersion) {
                log.info(
                    "Sent a version {} subscription and got version {} assignment back (successful version probing). "
                        +
                        "Downgrade subscription metadata to commonly supported version {} and trigger new rebalance.",
                    usedSubscriptionMetadataVersion,
                    receivedAssignmentMetadataVersion,
                    latestCommonlySupportedVersion
                );
                usedSubscriptionMetadataVersion = latestCommonlySupportedVersion;
                return true;
            }
        } else {
            log.debug("Received an assignment version {} that is less than the earliest version that allows version " +
                "probing {}. If this is not during a rolling upgrade from version 2.0 or below, this is an error.",
                receivedAssignmentMetadataVersion, EARLIEST_PROBEABLE_VERSION);
        }

        return false;
    }
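
    // Example of the probing flow this method implements: a bounced member sends a version 11
    // subscription while the group leader is still on version 10, so the leader returns a version 10
    // assignment with commonly supported version 10; the member downgrades its subscription to 10 and
    // triggers a new rebalance. Once every member has been bounced, the commonly supported version
    // exceeds the used one and the member upgrades back for one final rebalance.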

    @Override
    public void onAssignment(final Assignment assignment, final ConsumerGroupMetadata metadata) {
        final List<TopicPartition> partitions = new ArrayList<>(assignment.partitions());
        partitions.sort(PARTITION_COMPARATOR);

        final AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
        if (info.errCode() != AssignorError.NONE.code()) {
            // set flag to shutdown streams app
            assignmentErrorCode.set(info.errCode());
            return;
        }
        /*
         * latestCommonlySupportedVersion belongs to [usedSubscriptionMetadataVersion, LATEST_SUPPORTED_VERSION]
         * receivedAssignmentMetadataVersion belongs to [EARLIEST_PROBEABLE_VERSION, usedSubscriptionMetadataVersion]
         *
         * usedSubscriptionMetadataVersion will be downgraded to receivedAssignmentMetadataVersion during a rolling
         * bounce upgrade with version probing.
         *
         * usedSubscriptionMetadataVersion will be upgraded to latestCommonlySupportedVersion when all members have
         * been bounced and it is safe to use the latest version.
         */
        final int receivedAssignmentMetadataVersion = info.version();
        final int latestCommonlySupportedVersion = info.commonlySupportedVersion();

        validateMetadataVersions(receivedAssignmentMetadataVersion, latestCommonlySupportedVersion);

        // version 1 field
        final Map<TaskId, Set<TopicPartition>> activeTasks;
        // version 2 fields
        final Map<TopicPartition, PartitionInfo> topicToPartitionInfo;
        final Map<HostInfo, Set<TopicPartition>> partitionsByHost;
        final Map<HostInfo, Set<TopicPartition>> standbyPartitionsByHost;
        final long encodedNextScheduledRebalanceMs;

        switch (receivedAssignmentMetadataVersion) {
            case 1:
                validateActiveTaskEncoding(partitions, info, logPrefix);

                activeTasks = getActiveTasks(partitions, info);
                partitionsByHost = Collections.emptyMap();
                standbyPartitionsByHost = Collections.emptyMap();
                topicToPartitionInfo = Collections.emptyMap();
                encodedNextScheduledRebalanceMs = Long.MAX_VALUE;
                break;
            case 2:
            case 3:
            case 4:
            case 5:
                validateActiveTaskEncoding(partitions, info, logPrefix);

                activeTasks = getActiveTasks(partitions, info);
                partitionsByHost = info.partitionsByHost();
                standbyPartitionsByHost = Collections.emptyMap();
                topicToPartitionInfo = getTopicPartitionInfo(partitionsByHost);
                encodedNextScheduledRebalanceMs = Long.MAX_VALUE;
                break;
            case 6:
                validateActiveTaskEncoding(partitions, info, logPrefix);

                activeTasks = getActiveTasks(partitions, info);
                partitionsByHost = info.partitionsByHost();
                standbyPartitionsByHost = info.standbyPartitionByHost();
                topicToPartitionInfo = getTopicPartitionInfo(partitionsByHost);
                encodedNextScheduledRebalanceMs = Long.MAX_VALUE;
                break;
            case 7:
            case 8:
            case 9:
            case 10:
            case 11:
                validateActiveTaskEncoding(partitions, info, logPrefix);

                activeTasks = getActiveTasks(partitions, info);
                partitionsByHost = info.partitionsByHost();
                standbyPartitionsByHost = info.standbyPartitionByHost();
                topicToPartitionInfo = getTopicPartitionInfo(partitionsByHost);
                encodedNextScheduledRebalanceMs = info.nextRebalanceMs();
                break;
            default:
                throw new IllegalStateException(
                    "This code should never be reached."
                        + " Please file a bug report at https://issues.apache.org/jira/projects/KAFKA/"
                );
        }

        maybeScheduleFollowupRebalance(
            encodedNextScheduledRebalanceMs,
            receivedAssignmentMetadataVersion,
            latestCommonlySupportedVersion,
            partitionsByHost.keySet()
        );

        final Cluster fakeCluster = Cluster.empty().withPartitions(topicToPartitionInfo);
        streamsMetadataState.onChange(partitionsByHost, standbyPartitionsByHost, fakeCluster);

        // we do not capture any exceptions here but just let them be thrown from consumer.poll directly,
        // since when the stream thread catches one it either closes all tasks as dirty or closes the thread
        taskManager.handleAssignment(activeTasks, info.standbyTasks());
    }

    private void maybeScheduleFollowupRebalance(final long encodedNextScheduledRebalanceMs,
                                                final int receivedAssignmentMetadataVersion,
                                                final int latestCommonlySupportedVersion,
                                                final Set<HostInfo> groupHostInfo) {
        if (maybeUpdateSubscriptionVersion(receivedAssignmentMetadataVersion, latestCommonlySupportedVersion)) {
            log.info("Requested to schedule immediate rebalance due to version probing.");
            nextScheduledRebalanceMs.set(0L);
        } else if (!verifyHostInfo(groupHostInfo)) {
            log.info("Requested to schedule immediate rebalance to update group with new host endpoint = {}.", userEndPoint);
            nextScheduledRebalanceMs.set(0L);
        } else if (encodedNextScheduledRebalanceMs == 0L) {
            log.info("Requested to schedule immediate rebalance for new tasks to be safely revoked from current owner.");
            nextScheduledRebalanceMs.set(0L);
        } else if (encodedNextScheduledRebalanceMs < Long.MAX_VALUE) {
            log.info(
                "Requested to schedule next probing rebalance at {} to try for a more balanced assignment.",
                Instant.ofEpochMilli(encodedNextScheduledRebalanceMs) // The Instant#toString format is more readable.
            );
            nextScheduledRebalanceMs.set(encodedNextScheduledRebalanceMs);
        } else {
            log.info("No followup rebalance was requested, resetting the rebalance schedule.");
            nextScheduledRebalanceMs.set(Long.MAX_VALUE);
        }
    }

    /**
     * Verify that this client's host info was included in the map returned in the assignment, and trigger a
     * rebalance if not. This may be necessary when using static membership, as a rejoining client will be handed
     * back its original assignment to avoid an unnecessary rebalance. If the client's endpoint has changed, we need
     * to force a rebalance for the other members in the group to get the updated host info for this client.
     *
     * @param groupHostInfo the HostInfo of all clients in the group
     * @return false if the current host info does not match that in the group assignment
     */
    private boolean verifyHostInfo(final Set<HostInfo> groupHostInfo) {
        if (userEndPoint != null && !groupHostInfo.isEmpty()) {
            final HostInfo myHostInfo = HostInfo.buildFromEndpoint(userEndPoint);

            return groupHostInfo.contains(myHostInfo);
        } else {
            return true;
        }
    }

    // protected for upgrade test
    protected static Map<TaskId, Set<TopicPartition>> getActiveTasks(final List<TopicPartition> partitions, final AssignmentInfo info) {
        final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
        for (int i = 0; i < partitions.size(); i++) {
            final TopicPartition partition = partitions.get(i);
            final TaskId id = info.activeTasks().get(i);
            activeTasks.computeIfAbsent(id, k1 -> new HashSet<>()).add(partition);
        }
        return activeTasks;
    }
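
    // Example: with sorted partitions [t1-0, t2-0, t3-0] and info.activeTasks() = [0_0, 0_0, 1_0],
    // the index-wise pairing above yields 0_0 -> {t1-0, t2-0} and 1_0 -> {t3-0};
    // validateActiveTaskEncoding has already checked that both lists have the same length.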

    static Map<TopicPartition, PartitionInfo> getTopicPartitionInfo(final Map<HostInfo, Set<TopicPartition>> partitionsByHost) {
        final Map<TopicPartition, PartitionInfo> topicToPartitionInfo = new HashMap<>();
        for (final Set<TopicPartition> value : partitionsByHost.values()) {
            for (final TopicPartition topicPartition : value) {
                topicToPartitionInfo.put(
                    topicPartition,
                    new PartitionInfo(
                        topicPartition.topic(),
                        topicPartition.partition(),
                        null,
                        new Node[0],
                        new Node[0]
                    )
                );
            }
        }
        return topicToPartitionInfo;
    }
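
    // Note: the PartitionInfo values above deliberately carry no leader (null) and empty replica
    // arrays; the map is only used to build the metadata-only Cluster handed to
    // streamsMetadataState.onChange in onAssignment, which just needs the set of existing
    // topic-partitions.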

    private static void validateActiveTaskEncoding(final List<TopicPartition> partitions, final AssignmentInfo info, final String logPrefix) {
        // the number of assigned partitions should be the same as the number of active taskIds,
        // which can contain duplicates if one task has more than one assigned partition
        if (partitions.size() != info.activeTasks().size()) {
            throw new TaskAssignmentException(
                String.format(
                    "%sNumber of assigned partitions %d is not equal to "
                        + "the number of active taskIds %d, assignmentInfo=%s",
                    logPrefix, partitions.size(),
                    info.activeTasks().size(), info.toString()
                )
            );
        }
    }

    private int updateMinReceivedVersion(final int usedVersion, final int minReceivedMetadataVersion) {
        return Math.min(usedVersion, minReceivedMetadataVersion);
    }

    private int updateMinSupportedVersion(final int supportedVersion, final int minSupportedMetadataVersion) {
        if (supportedVersion < minSupportedMetadataVersion) {
            log.debug("Downgrade the current minimum supported version {} to the smaller seen supported version {}",
                minSupportedMetadataVersion, supportedVersion);
            return supportedVersion;
        } else {
            log.debug("Current minimum supported version remains at {}, last seen supported version was {}",
                minSupportedMetadataVersion, supportedVersion);
            return minSupportedMetadataVersion;
        }
    }

    // the following functions are for test only
    void setInternalTopicManager(final InternalTopicManager internalTopicManager) {
        this.internalTopicManager = internalTopicManager;
    }

    RebalanceProtocol rebalanceProtocol() {
        return rebalanceProtocol;
    }

    protected String userEndPoint() {
        return userEndPoint;
    }

    protected TaskManager taskManager() {
        return taskManager;
    }

    protected byte uniqueField() {
        return uniqueField;
    }

    protected Map<String, String> clientTags() {
        return clientTags;
    }

    protected void handleRebalanceStart(final Set<String> topics) {
        taskManager.handleRebalanceStart(topics);
    }

    long acceptableRecoveryLag() {
        return assignmentConfigs.acceptableRecoveryLag;
    }

    int maxWarmupReplicas() {
        return assignmentConfigs.maxWarmupReplicas;
    }

    int numStandbyReplicas() {
        return assignmentConfigs.numStandbyReplicas;
    }

    long probingRebalanceIntervalMs() {
        return assignmentConfigs.probingRebalanceIntervalMs;
    }

}
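
To make the interleaving logic of assignTasksToThreads easier to follow outside the full assignor, here is a minimal, self-contained sketch that mirrors its "fill to min capacity, then spill leftovers" passes using only JDK types. InterleaveSketch is a hypothetical name and task ids are modeled as plain strings; the real method additionally honors previous ownership via ClientState#prevTasksByLag.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

public class InterleaveSketch {
    public static void main(final String[] args) {
        final SortedSet<String> consumers = new TreeSet<>(Arrays.asList("c1", "c2", "c3"));
        // Natural string ordering stands in for TaskId's comparator, which sorts by
        // subtopology then partition and thereby interleaves subtopologies across threads.
        final Queue<String> unassigned = new PriorityQueue<>(Arrays.asList("0_0", "0_1", "1_0", "1_1", "2_0"));

        final Map<String, List<String>> assignment = new TreeMap<>();
        consumers.forEach(c -> assignment.put(c, new ArrayList<>()));

        final int minTasksPerThread = unassigned.size() / consumers.size(); // 5 / 3 = 1

        // Pass 1: round-robin tasks until every consumer reaches min capacity.
        final Queue<String> consumersToFill = new LinkedList<>(consumers);
        while (!consumersToFill.isEmpty() && !unassigned.isEmpty()) {
            final String consumer = consumersToFill.poll();
            assignment.get(consumer).add(unassigned.poll());
            if (assignment.get(consumer).size() < minTasksPerThread) {
                consumersToFill.offer(consumer);
            }
        }

        // Pass 2: spread the leftovers, one each, over consumers still at min capacity.
        final Queue<String> atMinCapacity = new LinkedList<>();
        for (final String consumer : consumers) {
            if (assignment.get(consumer).size() == minTasksPerThread) {
                atMinCapacity.offer(consumer);
            }
        }
        while (!unassigned.isEmpty()) {
            assignment.get(atMinCapacity.poll()).add(unassigned.poll());
        }

        // Prints {c1=[0_0, 1_1], c2=[0_1, 2_0], c3=[1_0]}: no consumer differs by more than one task.
        System.out.println(assignment);
    }
}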

Related information

kafka source code directory

Related articles

kafka AbstractProcessorContext source code

kafka AbstractReadOnlyDecorator source code

kafka AbstractReadWriteDecorator source code

kafka AbstractTask source code

kafka ActiveTaskCreator source code

kafka ChangelogReader source code

kafka ChangelogRecordDeserializationHelper source code

kafka ChangelogRegister source code

kafka ChangelogTopics source code

kafka ClientUtils source code
