// Source path: trogdor/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.common;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeTopicsOptions;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.errors.NotEnoughReplicasException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.requests.CreateTopicsRequest;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.regex.Pattern;
/**
* Utilities for Trogdor TaskWorkers.
*/
/**
 * Utilities for Trogdor TaskWorkers.
 *
 * <p>All members are static; the class is not instantiable.
 */
public final class WorkerUtils {

    /** How long a single admin request may take before timing out, in ms. */
    private static final int ADMIN_REQUEST_TIMEOUT = 25000;

    /** Overall deadline for the retrying topic-creation loop, in ms. */
    private static final int CREATE_TOPICS_CALL_TIMEOUT = 180000;

    /** Maximum number of topics sent in one createTopics request. */
    private static final int MAX_CREATE_TOPICS_BATCH_SIZE = 10;

    private WorkerUtils() {
        // Utility class: prevent instantiation.
    }

    /**
     * Handle an exception in a TaskWorker: log it, complete the worker's done
     * future with a short description, and rethrow wrapped in KafkaException.
     *
     * @param log        The logger to use.
     * @param what       The component that had the exception.
     * @param exception  The exception.
     * @param doneFuture The TaskWorker's doneFuture.
     * @throws KafkaException A wrapped version of the exception.
     */
    public static void abort(Logger log, String what, Throwable exception,
                             KafkaFutureImpl<String> doneFuture) throws KafkaException {
        log.warn("{} caught an exception", what, exception);
        // Prefer the message; fall back to the class name when there is none,
        // so the done future always carries something human-readable.
        if (exception.getMessage() == null || exception.getMessage().isEmpty()) {
            doneFuture.complete(exception.getClass().getCanonicalName());
        } else {
            doneFuture.complete(exception.getMessage());
        }
        throw new KafkaException(exception);
    }

    /**
     * Convert a rate expressed per second to a rate expressed per the given period.
     *
     * @param perSec   The per-second rate.
     * @param periodMs The new period to use.
     * @return The rate per period. This will never be less than 1.
     */
    public static int perSecToPerPeriod(float perSec, long periodMs) {
        float period = ((float) periodMs) / 1000.0f;
        float perPeriod = perSec * period;
        // Clamp so callers always get at least one event per period.
        perPeriod = Math.max(1.0f, perPeriod);
        return (int) perPeriod;
    }

    /**
     * Adds all properties from commonConf and then from clientConf to given 'props' (in
     * that order, over-writing properties with the same keys).
     *
     * @param props      Properties object that may contain zero or more properties.
     * @param commonConf Map with common client properties.
     * @param clientConf Map with client properties; takes precedence over commonConf.
     */
    public static void addConfigsToProperties(
            Properties props, Map<String, String> commonConf, Map<String, String> clientConf) {
        for (Map.Entry<String, String> commonEntry : commonConf.entrySet()) {
            props.setProperty(commonEntry.getKey(), commonEntry.getValue());
        }
        // Applied second so client-specific settings win over common ones.
        for (Map.Entry<String, String> entry : clientConf.entrySet()) {
            props.setProperty(entry.getKey(), entry.getValue());
        }
    }

    /**
     * Create some Kafka topics.
     *
     * @param log              The logger to use.
     * @param bootstrapServers The bootstrap server list.
     * @param commonClientConf Common client config.
     * @param adminClientConf  AdminClient config. This config has precedence over fields in
     *                         common client config.
     * @param topics           Maps topic names to partition assignments.
     * @param failOnExisting   If true, the method will throw TopicExistsException if one or
     *                         more topics already exist. Otherwise, the existing topics are
     *                         verified for number of partitions. In this case, if number of
     *                         partitions of an existing topic does not match the requested
     *                         number of partitions, the method throws RuntimeException.
     */
    public static void createTopics(
            Logger log, String bootstrapServers, Map<String, String> commonClientConf,
            Map<String, String> adminClientConf,
            Map<String, NewTopic> topics, boolean failOnExisting) throws Throwable {
        // This method wraps the call to createTopics() that takes an admin client, so that
        // we can unit test the functionality with MockAdminClient. The exception is caught
        // and re-thrown so that the admin client is closed when the method returns.
        try (Admin adminClient
                 = createAdminClient(bootstrapServers, commonClientConf, adminClientConf)) {
            createTopics(log, adminClient, topics, failOnExisting);
        } catch (Exception e) {
            log.warn("Failed to create or verify topics {}", topics, e);
            throw e;
        }
    }

    /**
     * The actual create topics functionality is separated into this method and called from the
     * above method to be able to unit test with mock adminClient.
     *
     * @throws TopicExistsException if the specified topic already exists.
     * @throws UnknownTopicOrPartitionException if topic creation was issued but failed to
     *         verify if it was created.
     * @throws Throwable if creation of one or more topics fails (except for the cases above).
     */
    static void createTopics(
            Logger log, Admin adminClient,
            Map<String, NewTopic> topics, boolean failOnExisting) throws Throwable {
        if (topics.isEmpty()) {
            log.warn("Request to create topics has an empty topic list.");
            return;
        }
        Collection<String> topicsExists = createTopics(log, adminClient, topics.values());
        if (!topicsExists.isEmpty()) {
            if (failOnExisting) {
                log.warn("Topic(s) {} already exist.", topicsExists);
                throw new TopicExistsException("One or more topics already exist.");
            } else {
                // 3 attempts, 2500 ms apart, to let just-created topics become describable.
                verifyTopics(log, adminClient, topicsExists, topics, 3, 2500);
            }
        }
    }

    /**
     * Creates Kafka topics and returns a list of topics that already exist.
     *
     * @param log         The logger to use.
     * @param adminClient AdminClient.
     * @param topics      List of topics to create.
     * @return Collection of topic names that already exist.
     * @throws Throwable if creation of one or more topics fails (except for topic exists case).
     */
    private static Collection<String> createTopics(Logger log, Admin adminClient,
                                                   Collection<NewTopic> topics) throws Throwable {
        long startMs = Time.SYSTEM.milliseconds();
        int tries = 0;
        List<String> existingTopics = new ArrayList<>();

        Map<String, NewTopic> newTopics = new HashMap<>();
        for (NewTopic newTopic : topics) {
            newTopics.put(newTopic.name(), newTopic);
        }
        List<String> topicsToCreate = new ArrayList<>(newTopics.keySet());
        while (true) {
            log.info("Attempting to create {} topics (try {})...", topicsToCreate.size(), ++tries);
            Map<String, Future<Void>> creations = new HashMap<>();
            // Issue the creations in batches of at most MAX_CREATE_TOPICS_BATCH_SIZE.
            while (!topicsToCreate.isEmpty()) {
                List<NewTopic> newTopicsBatch = new ArrayList<>();
                for (int i = 0; (i < MAX_CREATE_TOPICS_BATCH_SIZE) &&
                        !topicsToCreate.isEmpty(); i++) {
                    String topicName = topicsToCreate.remove(0);
                    newTopicsBatch.add(newTopics.get(topicName));
                }
                creations.putAll(adminClient.createTopics(newTopicsBatch).values());
            }
            // We retry cases where the topic creation failed with a
            // timeout. This is a workaround for KAFKA-6368.
            for (Map.Entry<String, Future<Void>> entry : creations.entrySet()) {
                String topicName = entry.getKey();
                Future<Void> future = entry.getValue();
                try {
                    future.get();
                    log.debug("Successfully created {}.", topicName);
                } catch (ExecutionException e) {
                    // Catch only ExecutionException: its cause is never null. (Catching bare
                    // Exception here would also trap InterruptedException, whose null cause
                    // would make the else-branch execute `throw null`.) InterruptedException
                    // now propagates via the declared `throws Throwable`.
                    if ((e.getCause() instanceof TimeoutException)
                            || (e.getCause() instanceof NotEnoughReplicasException)) {
                        log.warn("Attempt to create topic `{}` failed: {}", topicName,
                            e.getCause().getMessage());
                        topicsToCreate.add(topicName);
                    } else if (e.getCause() instanceof TopicExistsException) {
                        log.info("Topic {} already exists.", topicName);
                        existingTopics.add(topicName);
                    } else {
                        log.warn("Failed to create {}", topicName, e.getCause());
                        throw e.getCause();
                    }
                }
            }
            if (topicsToCreate.isEmpty()) {
                break;
            }
            if (Time.SYSTEM.milliseconds() > startMs + CREATE_TOPICS_CALL_TIMEOUT) {
                // Fixed message spacing (was: `... + "after " + ...`, which ran the last
                // topic name into the word "after").
                String str = "Unable to create topic(s): " +
                    String.join(", ", topicsToCreate) + " after " + tries + " attempt(s)";
                log.warn(str);
                throw new TimeoutException(str);
            }
        }
        return existingTopics;
    }

    /**
     * Verifies that topics in 'topicsToVerify' list have the same number of partitions as
     * described in 'topicsInfo'.
     *
     * @param log            The logger to use.
     * @param adminClient    AdminClient.
     * @param topicsToVerify List of topics to verify.
     * @param topicsInfo     Map of topic name to topic description, which includes topics in
     *                       'topicsToVerify' list.
     * @param retryCount     The number of times to retry the fetching of the topics.
     * @param retryBackoffMs The amount of time, in milliseconds, to wait in between retries.
     * @throws UnknownTopicOrPartitionException If at least one topic contained in 'topicsInfo'
     *         does not exist after retrying.
     * @throws RuntimeException If one or more topics have different number of partitions than
     *         described in 'topicsInfo'.
     */
    static void verifyTopics(
            Logger log, Admin adminClient,
            Collection<String> topicsToVerify, Map<String, NewTopic> topicsInfo,
            int retryCount, long retryBackoffMs) throws Throwable {
        Map<String, TopicDescription> topicDescriptionMap = topicDescriptions(topicsToVerify, adminClient,
            retryCount, retryBackoffMs);
        for (TopicDescription desc : topicDescriptionMap.values()) {
            // The map will always contain the topic since all topics in 'topicsExists' are in
            // the given 'topics' map.
            int partitions = topicsInfo.get(desc.name()).numPartitions();
            // NO_NUM_PARTITIONS means the caller did not request a specific partition count,
            // so there is nothing to verify for that topic.
            if (partitions != CreateTopicsRequest.NO_NUM_PARTITIONS && desc.partitions().size() != partitions) {
                // Fixed message: previously two literals were joined as
                // `"requested " + " number"`, emitting a double space.
                String str = "Topic '" + desc.name() + "' exists, but has "
                    + desc.partitions().size() + " partitions, while requested"
                    + " number of partitions is " + partitions;
                log.warn(str);
                throw new RuntimeException(str);
            }
        }
    }

    /**
     * Describes the given topics, retrying on UnknownTopicOrPartitionException to allow for
     * metadata propagation of freshly created topics.
     *
     * @throws UnknownTopicOrPartitionException if the topics are still unknown after all retries.
     * @throws ExecutionException for any other describe failure.
     * @throws InterruptedException if interrupted while backing off between retries.
     */
    private static Map<String, TopicDescription> topicDescriptions(Collection<String> topicsToVerify,
                                                                   Admin adminClient,
                                                                   int retryCount, long retryBackoffMs)
        throws ExecutionException, InterruptedException {
        UnknownTopicOrPartitionException lastException = null;
        for (int i = 0; i < retryCount; i++) {
            try {
                DescribeTopicsResult topicsResult = adminClient.describeTopics(
                    topicsToVerify, new DescribeTopicsOptions().timeoutMs(ADMIN_REQUEST_TIMEOUT));
                return topicsResult.allTopicNames().get();
            } catch (ExecutionException exception) {
                if (exception.getCause() instanceof UnknownTopicOrPartitionException) {
                    lastException = (UnknownTopicOrPartitionException) exception.getCause();
                    Thread.sleep(retryBackoffMs);
                } else {
                    throw exception;
                }
            }
        }
        throw lastException;
    }

    /**
     * Returns list of existing, not internal, topics/partitions that match given pattern and
     * where partitions are in range [startPartition, endPartition].
     *
     * @param adminClient    AdminClient.
     * @param topicRegex     Topic regular expression to match.
     * @param startPartition Lowest partition id to include (inclusive).
     * @param endPartition   Highest partition id to include (inclusive).
     * @return List of topic partitions.
     * @throws Throwable If failed to get list of existing topics.
     */
    static Collection<TopicPartition> getMatchingTopicPartitions(
            Admin adminClient, String topicRegex, int startPartition, int endPartition)
            throws Throwable {
        final Pattern topicNamePattern = Pattern.compile(topicRegex);

        // First get the list of matching topics.
        List<String> matchedTopics = new ArrayList<>();
        ListTopicsResult res = adminClient.listTopics(
            new ListTopicsOptions().timeoutMs(ADMIN_REQUEST_TIMEOUT));
        Map<String, TopicListing> topicListingMap = res.namesToListings().get();
        for (Map.Entry<String, TopicListing> topicListingEntry : topicListingMap.entrySet()) {
            if (!topicListingEntry.getValue().isInternal()
                    && topicNamePattern.matcher(topicListingEntry.getKey()).matches()) {
                matchedTopics.add(topicListingEntry.getKey());
            }
        }

        // Then build the list of topic/partitions within the requested partition range.
        List<TopicPartition> out = new ArrayList<>();
        DescribeTopicsResult topicsResult = adminClient.describeTopics(
            matchedTopics, new DescribeTopicsOptions().timeoutMs(ADMIN_REQUEST_TIMEOUT));
        Map<String, TopicDescription> topicDescriptionMap = topicsResult.allTopicNames().get();
        for (TopicDescription desc : topicDescriptionMap.values()) {
            List<TopicPartitionInfo> partitions = desc.partitions();
            for (TopicPartitionInfo info : partitions) {
                if ((info.partition() >= startPartition) && (info.partition() <= endPartition)) {
                    out.add(new TopicPartition(desc.name(), info.partition()));
                }
            }
        }
        return out;
    }

    /**
     * Builds an Admin client for the given bootstrap servers, layering the default request
     * timeout, then common client config, then admin client config (later entries win).
     */
    private static Admin createAdminClient(
            String bootstrapServers,
            Map<String, String> commonClientConf, Map<String, String> adminClientConf) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, ADMIN_REQUEST_TIMEOUT);
        // First add common client config, and then admin client config to properties, possibly
        // over-writing default or common properties.
        addConfigsToProperties(props, commonClientConf, adminClientConf);
        return Admin.create(props);
    }
}