kafka StreamsMetadataState 源码

  • 2022-10-20
  • 浏览 (216)

kafka StreamsMetadataState 代码

文件路径:/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.Comparator;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.StreamsMetadata;
import org.apache.kafka.streams.state.internals.StreamsMetadataImpl;

import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/**
 * Provides access to the {@link StreamsMetadata} in a KafkaStreams application. This can be used
 * to discover the locations of {@link org.apache.kafka.streams.processor.StateStore}s
 * in a KafkaStreams application
 */
public class StreamsMetadataState {
    private final Logger log;
    public static final HostInfo UNKNOWN_HOST = HostInfo.unavailable();
    private final TopologyMetadata topologyMetadata;
    private final Set<String> globalStores;
    private final HostInfo thisHost;
    private List<StreamsMetadata> allMetadata = Collections.emptyList();
    private Cluster clusterMetadata;
    private final AtomicReference<StreamsMetadata> localMetadata = new AtomicReference<>(null);

    public StreamsMetadataState(final TopologyMetadata topologyMetadata,
                                final HostInfo thisHost,
                                final LogContext logContext) {
        this.topologyMetadata = topologyMetadata;
        this.globalStores = this.topologyMetadata.globalStateStores().keySet();
        this.thisHost = thisHost;
        this.log = logContext.logger(getClass());
    }

    @Override
    public String toString() {
        return toString("");
    }

    public String toString(final String indent) {
        final StringBuilder builder = new StringBuilder();
        builder.append(indent).append("GlobalMetadata: ").append(allMetadata).append("\n");
        builder.append(indent).append("GlobalStores: ").append(globalStores).append("\n");
        builder.append(indent).append("My HostInfo: ").append(thisHost).append("\n");
        builder.append(indent).append(clusterMetadata).append("\n");

        return builder.toString();
    }

    /**
     * Get the {@link StreamsMetadata}s for the local instance in a {@link KafkaStreams application}
     *
     * @return the {@link StreamsMetadata}s for the local instance in a {@link KafkaStreams} application
     */
    public StreamsMetadata getLocalMetadata() {
        return localMetadata.get();
    }

    /**
     * Find all of the {@link StreamsMetadata}s in a
     * {@link KafkaStreams application}
     *
     * @return all the {@link StreamsMetadata}s in a {@link KafkaStreams} application
     */
    public Collection<StreamsMetadata> getAllMetadata() {
        return Collections.unmodifiableList(allMetadata);
    }

    /**
     * Find all of the {@link StreamsMetadata}s for a given storeName
     *
     * @param storeName the storeName to find metadata for
     * @return A collection of {@link StreamsMetadata} that have the provided storeName
     */
    public synchronized Collection<StreamsMetadata> getAllMetadataForStore(final String storeName) {
        Objects.requireNonNull(storeName, "storeName cannot be null");
        if (topologyMetadata.hasNamedTopologies()) {
            throw new IllegalArgumentException("Cannot invoke the getAllMetadataForStore(storeName) method when"
                                                   + "using named topologies, please use the overload that accepts"
                                                   + "a topologyName parameter to identify the correct store");
        }

        if (!isInitialized()) {
            return Collections.emptyList();
        }

        if (globalStores.contains(storeName)) {
            return allMetadata;
        }

        final Collection<String> sourceTopics = topologyMetadata.sourceTopicsForStore(storeName, null);
        if (sourceTopics.isEmpty()) {
            return Collections.emptyList();
        }

        final ArrayList<StreamsMetadata> results = new ArrayList<>();
        for (final StreamsMetadata metadata : allMetadata) {
            if (metadata.stateStoreNames().contains(storeName) || metadata.standbyStateStoreNames().contains(storeName)) {
                results.add(metadata);
            }
        }
        return results;
    }

    /**
     * Find all of the {@link StreamsMetadata}s for a given storeName in the given topology
     *
     * @param storeName the storeName to find metadata for
     * @param topologyName the storeName to find metadata for
     * @return A collection of {@link StreamsMetadata} that have the provided storeName
     */
    public synchronized Collection<StreamsMetadata> getAllMetadataForStore(final String storeName, final String topologyName) {
        Objects.requireNonNull(storeName, "storeName cannot be null");
        Objects.requireNonNull(topologyName, "topologyName cannot be null");

        if (!isInitialized()) {
            return Collections.emptyList();
        }

        final Collection<String> sourceTopics = topologyMetadata.sourceTopicsForStore(storeName, topologyName);
        if (sourceTopics.isEmpty()) {
            return Collections.emptyList();
        }

        final ArrayList<StreamsMetadata> results = new ArrayList<>();
        for (final StreamsMetadata metadata : allMetadata) {
            final String metadataTopologyName = ((StreamsMetadataImpl) metadata).topologyName();
            if (metadataTopologyName != null && metadataTopologyName.equals(topologyName)
                && metadata.stateStoreNames().contains(storeName) || metadata.standbyStateStoreNames().contains(storeName)) {
                results.add(metadata);
            }
        }
        return results;
    }

    public synchronized Collection<StreamsMetadata> getAllMetadataForTopology(final String topologyName) {
        Objects.requireNonNull(topologyName, "topologyName cannot be null");

        if (!isInitialized()) {
            return Collections.emptyList();
        }

        final ArrayList<StreamsMetadata> results = new ArrayList<>();
        for (final StreamsMetadata metadata : allMetadata) {
            final String metadataTopologyName = ((StreamsMetadataImpl) metadata).topologyName();
            if (metadataTopologyName != null && metadataTopologyName.equals(topologyName)) {
                results.add(metadata);
            }
        }
        return results;
    }

    /**
     * Find the {@link KeyQueryMetadata}s for a given storeName and key. This method will use the
     * {@link DefaultStreamPartitioner} to locate the store. If a custom partitioner has been used
     * please use {@link StreamsMetadataState#getKeyQueryMetadataForKey(String, Object, StreamPartitioner)} instead.
     *
     * Note: the key may not exist in the {@link org.apache.kafka.streams.processor.StateStore},
     * this method provides a way of finding which {@link KeyQueryMetadata} it would exist on.
     *
     * @param storeName     Name of the store
     * @param key           Key to use
     * @param keySerializer Serializer for the key
     * @param <K>           key type
     * @return The {@link KeyQueryMetadata} for the storeName and key or {@link KeyQueryMetadata#NOT_AVAILABLE}
     * if streams is (re-)initializing or {@code null} if the corresponding topic cannot be found,
     * or null if no matching metadata could be found.
     */
    public synchronized <K> KeyQueryMetadata getKeyQueryMetadataForKey(final String storeName,
                                                                       final K key,
                                                                       final Serializer<K> keySerializer) {
        Objects.requireNonNull(keySerializer, "keySerializer can't be null");
        if (topologyMetadata.hasNamedTopologies()) {
            throw new IllegalArgumentException("Cannot invoke the getKeyQueryMetadataForKey(storeName, key, keySerializer)"
                                                   + "method when using named topologies, please use the overload that"
                                                   + "accepts a topologyName parameter to identify the correct store");
        }
        return getKeyQueryMetadataForKey(storeName,
                                         key,
                                         new DefaultStreamPartitioner<>(keySerializer));
    }

    /**
     * See {@link StreamsMetadataState#getKeyQueryMetadataForKey(String, Object, Serializer)}
     */
    public synchronized <K> KeyQueryMetadata getKeyQueryMetadataForKey(final String storeName,
                                                                       final K key,
                                                                       final Serializer<K> keySerializer,
                                                                       final String topologyName) {
        Objects.requireNonNull(keySerializer, "keySerializer can't be null");
        return getKeyQueryMetadataForKey(storeName,
                                         key,
                                         new DefaultStreamPartitioner<>(keySerializer),
                                         topologyName);
    }

    /**
     * Find the {@link KeyQueryMetadata}s for a given storeName and key
     *
     * Note: the key may not exist in the {@link StateStore},this method provides a way of finding which
     * {@link StreamsMetadata} it would exist on.
     *
     * @param storeName   Name of the store
     * @param key         Key to use
     * @param partitioner partitioner to use to find correct partition for key
     * @param <K>         key type
     * @return The {@link KeyQueryMetadata} for the storeName and key or {@link KeyQueryMetadata#NOT_AVAILABLE}
     * if streams is (re-)initializing, or {@code null} if no matching metadata could be found.
     */
    public synchronized <K> KeyQueryMetadata getKeyQueryMetadataForKey(final String storeName,
                                                                       final K key,
                                                                       final StreamPartitioner<? super K, ?> partitioner) {
        Objects.requireNonNull(storeName, "storeName can't be null");
        Objects.requireNonNull(key, "key can't be null");
        Objects.requireNonNull(partitioner, "partitioner can't be null");
        if (topologyMetadata.hasNamedTopologies()) {
            throw new IllegalArgumentException("Cannot invoke the getKeyQueryMetadataForKey(storeName, key, partitioner)"
                                                   + "method when using named topologies, please use the overload that"
                                                   + "accepts a topologyName parameter to identify the correct store");
        }

        if (!isInitialized()) {
            return KeyQueryMetadata.NOT_AVAILABLE;
        }

        if (globalStores.contains(storeName)) {
            // global stores are on every node. if we don't have the host info
            // for this host then just pick the first metadata
            if (thisHost.equals(UNKNOWN_HOST)) {
                return new KeyQueryMetadata(allMetadata.get(0).hostInfo(), Collections.emptySet(), -1);
            }
            return new KeyQueryMetadata(localMetadata.get().hostInfo(), Collections.emptySet(), -1);
        }

        final SourceTopicsInfo sourceTopicsInfo = getSourceTopicsInfo(storeName);
        if (sourceTopicsInfo == null) {
            return null;
        }
        return getKeyQueryMetadataForKey(storeName, key, partitioner, sourceTopicsInfo);
    }

    /**
     * See {@link StreamsMetadataState#getKeyQueryMetadataForKey(String, Object, StreamPartitioner)}
     */
    public synchronized <K> KeyQueryMetadata getKeyQueryMetadataForKey(final String storeName,
                                                                       final K key,
                                                                       final StreamPartitioner<? super K, ?> partitioner,
                                                                       final String topologyName) {
        Objects.requireNonNull(storeName, "storeName can't be null");
        Objects.requireNonNull(key, "key can't be null");
        Objects.requireNonNull(partitioner, "partitioner can't be null");
        Objects.requireNonNull(topologyName, "topologyName can't be null");


        if (!isInitialized()) {
            return KeyQueryMetadata.NOT_AVAILABLE;
        }

        final SourceTopicsInfo sourceTopicsInfo = getSourceTopicsInfo(storeName, topologyName);
        if (sourceTopicsInfo == null) {
            return null;
        }
        return getKeyQueryMetadataForKey(storeName, key, partitioner, sourceTopicsInfo, topologyName);
    }

    /**
     * Respond to changes to the HostInfo -> TopicPartition mapping. Will rebuild the
     * metadata
     *
     * @param activePartitionHostMap  the current mapping of {@link HostInfo} -> {@link TopicPartition}s for active partitions
     * @param standbyPartitionHostMap the current mapping of {@link HostInfo} -> {@link TopicPartition}s for standby partitions
     * @param clusterMetadata         the current clusterMetadata {@link Cluster}
     */
    synchronized void onChange(final Map<HostInfo, Set<TopicPartition>> activePartitionHostMap,
                               final Map<HostInfo, Set<TopicPartition>> standbyPartitionHostMap,
                               final Cluster clusterMetadata) {
        this.clusterMetadata = clusterMetadata;
        rebuildMetadata(activePartitionHostMap, standbyPartitionHostMap);
    }

    private boolean hasPartitionsForAnyTopics(final List<String> topicNames, final Set<TopicPartition> partitionForHost) {
        for (final TopicPartition topicPartition : partitionForHost) {
            if (topicNames.contains(topicPartition.topic())) {
                return true;
            }
        }
        return false;
    }

    private Set<String> getStoresOnHost(final Map<String, List<String>> storeToSourceTopics,
                                        final Set<TopicPartition> sourceTopicPartitions) {
        final Set<String> storesOnHost = new HashSet<>();
        for (final Map.Entry<String, List<String>> storeTopicEntry : storeToSourceTopics.entrySet()) {
            final List<String> topicsForStore = storeTopicEntry.getValue();
            if (hasPartitionsForAnyTopics(topicsForStore, sourceTopicPartitions)) {
                storesOnHost.add(storeTopicEntry.getKey());
            }
        }
        return storesOnHost;
    }

    private void rebuildMetadata(final Map<HostInfo, Set<TopicPartition>> activePartitionHostMap,
                                 final Map<HostInfo, Set<TopicPartition>> standbyPartitionHostMap) {
        if (activePartitionHostMap.isEmpty() && standbyPartitionHostMap.isEmpty()) {
            allMetadata = Collections.emptyList();
            localMetadata.set(new StreamsMetadataImpl(
                thisHost,
                Collections.emptySet(),
                Collections.emptySet(),
                Collections.emptySet(),
                Collections.emptySet()
            ));
            return;
        }

        allMetadata = topologyMetadata.hasNamedTopologies() ?
            rebuildMetadataForNamedTopologies(activePartitionHostMap, standbyPartitionHostMap) :
            rebuildMetadataForSingleTopology(activePartitionHostMap, standbyPartitionHostMap);
    }

    private List<StreamsMetadata> rebuildMetadataForNamedTopologies(final Map<HostInfo, Set<TopicPartition>> activePartitionHostMap,
                                                                    final Map<HostInfo, Set<TopicPartition>> standbyPartitionHostMap) {
        final List<StreamsMetadata> rebuiltMetadata = new ArrayList<>();
        Stream.concat(activePartitionHostMap.keySet().stream(), standbyPartitionHostMap.keySet().stream())
            .distinct()
            .sorted(Comparator.comparing(HostInfo::host).thenComparingInt(HostInfo::port))
            .forEach(hostInfo -> {
                for (final String topologyName : topologyMetadata.namedTopologiesView()) {
                    final Map<String, List<String>> storeToSourceTopics =
                        topologyMetadata.stateStoreNameToSourceTopicsForTopology(topologyName);

                    final Set<TopicPartition> activePartitionsOnHost = new HashSet<>();
                    final Set<String> activeStoresOnHost = new HashSet<>();
                    if (activePartitionHostMap.containsKey(hostInfo)) {
                        // filter out partitions for topics that are not connected to this topology
                        activePartitionsOnHost.addAll(
                            activePartitionHostMap.get(hostInfo).stream()
                                .filter(tp -> topologyMetadata.fullSourceTopicNamesForTopology(topologyName).contains(tp.topic()))
                                .collect(Collectors.toSet())
                        );
                        activeStoresOnHost.addAll(getStoresOnHost(storeToSourceTopics, activePartitionsOnHost));
                    }

                    // TODO KAFKA-13281: when we add support for global stores with named topologies we will
                    //  need to add the global stores to the activeStoresOnHost set

                    final Set<TopicPartition> standbyPartitionsOnHost = new HashSet<>();
                    final Set<String> standbyStoresOnHost = new HashSet<>();
                    if (standbyPartitionHostMap.containsKey(hostInfo)) {
                        standbyPartitionsOnHost.addAll(
                            standbyPartitionHostMap.get(hostInfo).stream()
                                .filter(tp -> topologyMetadata.fullSourceTopicNamesForTopology(topologyName).contains(tp.topic()))
                                .collect(Collectors.toSet()));
                        standbyStoresOnHost.addAll(getStoresOnHost(storeToSourceTopics, standbyPartitionsOnHost));
                    }

                    final StreamsMetadata metadata = new StreamsMetadataImpl(
                        hostInfo,
                        activeStoresOnHost,
                        activePartitionsOnHost,
                        standbyStoresOnHost,
                        standbyPartitionsOnHost,
                        topologyName
                    );

                    rebuiltMetadata.add(metadata);
                    if (hostInfo.equals(thisHost)) {
                        localMetadata.set(metadata);
                    }
                }

                // Construct metadata across all topologies on this host for the `localMetadata` field
                final Map<String, List<String>> storeToSourceTopics = topologyMetadata.stateStoreNameToSourceTopics();
                final Set<TopicPartition> localActivePartitions = activePartitionHostMap.get(thisHost);
                final Set<TopicPartition> localStandbyPartitions = standbyPartitionHostMap.get(thisHost);

                localMetadata.set(
                    new StreamsMetadataImpl(thisHost,
                                            getStoresOnHost(storeToSourceTopics, localActivePartitions),
                                            localActivePartitions,
                                            getStoresOnHost(storeToSourceTopics, localStandbyPartitions),
                                            localStandbyPartitions)
                );
            });
        return rebuiltMetadata;
    }

    private List<StreamsMetadata> rebuildMetadataForSingleTopology(final Map<HostInfo, Set<TopicPartition>> activePartitionHostMap,
                                                                   final Map<HostInfo, Set<TopicPartition>> standbyPartitionHostMap) {
        final List<StreamsMetadata> rebuiltMetadata = new ArrayList<>();
        final Map<String, List<String>> storeToSourceTopics = topologyMetadata.stateStoreNameToSourceTopics();
        Stream.concat(activePartitionHostMap.keySet().stream(), standbyPartitionHostMap.keySet().stream())
            .distinct()
            .sorted(Comparator.comparing(HostInfo::host).thenComparingInt(HostInfo::port))
            .forEach(hostInfo -> {
                final Set<TopicPartition> activePartitionsOnHost = new HashSet<>();
                final Set<String> activeStoresOnHost = new HashSet<>();
                if (activePartitionHostMap.containsKey(hostInfo)) {
                    activePartitionsOnHost.addAll(activePartitionHostMap.get(hostInfo));
                    activeStoresOnHost.addAll(getStoresOnHost(storeToSourceTopics, activePartitionsOnHost));
                }
                activeStoresOnHost.addAll(globalStores);

                final Set<TopicPartition> standbyPartitionsOnHost = new HashSet<>();
                final Set<String> standbyStoresOnHost = new HashSet<>();
                if (standbyPartitionHostMap.containsKey(hostInfo)) {
                    standbyPartitionsOnHost.addAll(standbyPartitionHostMap.get(hostInfo));
                    standbyStoresOnHost.addAll(getStoresOnHost(storeToSourceTopics, standbyPartitionsOnHost));
                }

                final StreamsMetadata metadata = new StreamsMetadataImpl(
                    hostInfo,
                    activeStoresOnHost,
                    activePartitionsOnHost,
                    standbyStoresOnHost,
                    standbyPartitionsOnHost
                );

                rebuiltMetadata.add(metadata);
                if (hostInfo.equals(thisHost)) {
                    localMetadata.set(metadata);
                }
            });
        return rebuiltMetadata;
    }

    private <K> KeyQueryMetadata getKeyQueryMetadataForKey(final String storeName,
                                                           final K key,
                                                           final StreamPartitioner<? super K, ?> partitioner,
                                                           final SourceTopicsInfo sourceTopicsInfo) {

        final Integer partition = partitioner.partition(sourceTopicsInfo.topicWithMostPartitions, key, null, sourceTopicsInfo.maxPartitions);
        final Set<TopicPartition> matchingPartitions = new HashSet<>();
        for (final String sourceTopic : sourceTopicsInfo.sourceTopics) {
            matchingPartitions.add(new TopicPartition(sourceTopic, partition));
        }

        HostInfo activeHost = UNKNOWN_HOST;
        final Set<HostInfo> standbyHosts = new HashSet<>();
        for (final StreamsMetadata streamsMetadata : allMetadata) {
            final Set<String> activeStateStoreNames = streamsMetadata.stateStoreNames();
            final Set<TopicPartition> topicPartitions = new HashSet<>(streamsMetadata.topicPartitions());
            final Set<String> standbyStateStoreNames = streamsMetadata.standbyStateStoreNames();
            final Set<TopicPartition> standbyTopicPartitions = new HashSet<>(streamsMetadata.standbyTopicPartitions());

            topicPartitions.retainAll(matchingPartitions);
            if (activeStateStoreNames.contains(storeName) && !topicPartitions.isEmpty()) {
                activeHost = streamsMetadata.hostInfo();
            }

            standbyTopicPartitions.retainAll(matchingPartitions);
            if (standbyStateStoreNames.contains(storeName) && !standbyTopicPartitions.isEmpty()) {
                standbyHosts.add(streamsMetadata.hostInfo());
            }
        }

        return new KeyQueryMetadata(activeHost, standbyHosts, partition);
    }

    private <K> KeyQueryMetadata getKeyQueryMetadataForKey(final String storeName,
                                                           final K key,
                                                           final StreamPartitioner<? super K, ?> partitioner,
                                                           final SourceTopicsInfo sourceTopicsInfo,
                                                           final String topologyName) {
        Objects.requireNonNull(topologyName, "topology name must not be null");
        final Integer partition = partitioner.partition(sourceTopicsInfo.topicWithMostPartitions, key, null, sourceTopicsInfo.maxPartitions);
        final Set<TopicPartition> matchingPartitions = new HashSet<>();
        for (final String sourceTopic : sourceTopicsInfo.sourceTopics) {
            matchingPartitions.add(new TopicPartition(sourceTopic, partition));
        }

        HostInfo activeHost = UNKNOWN_HOST;
        final Set<HostInfo> standbyHosts = new HashSet<>();
        for (final StreamsMetadata streamsMetadata : allMetadata) {
            final String metadataTopologyName = ((StreamsMetadataImpl) streamsMetadata).topologyName();
            if (metadataTopologyName != null && metadataTopologyName.equals(topologyName)) {

                final Set<String> activeStateStoreNames = streamsMetadata.stateStoreNames();
                final Set<TopicPartition> topicPartitions = new HashSet<>(streamsMetadata.topicPartitions());
                final Set<String> standbyStateStoreNames = streamsMetadata.standbyStateStoreNames();
                final Set<TopicPartition> standbyTopicPartitions = new HashSet<>(streamsMetadata.standbyTopicPartitions());

                topicPartitions.retainAll(matchingPartitions);
                if (activeStateStoreNames.contains(storeName) && !topicPartitions.isEmpty()) {
                    activeHost = streamsMetadata.hostInfo();
                }

                standbyTopicPartitions.retainAll(matchingPartitions);
                if (standbyStateStoreNames.contains(storeName) && !standbyTopicPartitions.isEmpty()) {
                    standbyHosts.add(streamsMetadata.hostInfo());
                }
            }
        }

        return new KeyQueryMetadata(activeHost, standbyHosts, partition);
    }

    private SourceTopicsInfo getSourceTopicsInfo(final String storeName) {
        return getSourceTopicsInfo(storeName, null);
    }

    private SourceTopicsInfo getSourceTopicsInfo(final String storeName, final String topologyName) {
        final List<String> sourceTopics = new ArrayList<>(topologyMetadata.sourceTopicsForStore(storeName, topologyName));

        if (sourceTopics.isEmpty()) {
            return null;
        }
        return new SourceTopicsInfo(sourceTopics);
    }

    private boolean isInitialized() {
        return clusterMetadata != null && !clusterMetadata.topics().isEmpty() && localMetadata.get() != null;
    }

    public String getStoreForChangelogTopic(final String topicName) {
        return topologyMetadata.getStoreForChangelogTopic(topicName);
    }

    private class SourceTopicsInfo {
        private final List<String> sourceTopics;
        private int maxPartitions;
        private String topicWithMostPartitions;

        private SourceTopicsInfo(final List<String> sourceTopics) {
            this.sourceTopics = sourceTopics;
            for (final String topic : sourceTopics) {
                final List<PartitionInfo> partitions = clusterMetadata.partitionsForTopic(topic);
                if (partitions.size() > maxPartitions) {
                    maxPartitions = partitions.size();
                    topicWithMostPartitions = partitions.get(0).topic();
                }
            }
        }
    }
}

相关信息

kafka 源码目录

相关文章

kafka AbstractProcessorContext 源码

kafka AbstractReadOnlyDecorator 源码

kafka AbstractReadWriteDecorator 源码

kafka AbstractTask 源码

kafka ActiveTaskCreator 源码

kafka ChangelogReader 源码

kafka ChangelogRecordDeserializationHelper 源码

kafka ChangelogRegister 源码

kafka ChangelogTopics 源码

kafka ClientUtils 源码

0  赞