kafka VerifiableConsumer source code

  • 2022-10-20

kafka VerifiableConsumer code

File path: /tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.tools;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.module.SimpleModule;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.MutuallyExclusiveGroup;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.io.PrintStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

import static net.sourceforge.argparse4j.impl.Arguments.store;
import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;

/**
 * Command line consumer designed for system testing. It outputs consumer events to STDOUT as JSON
 * formatted objects. The "name" field in each JSON event identifies the event type. The following
 * events are currently supported:
 *
 * <ul>
 * <li>partitions_revoked: outputs the partitions revoked through {@link ConsumerRebalanceListener#onPartitionsRevoked(Collection)}.
 *     See {@link org.apache.kafka.tools.VerifiableConsumer.PartitionsRevoked}</li>
 * <li>partitions_assigned: outputs the partitions assigned through {@link ConsumerRebalanceListener#onPartitionsAssigned(Collection)}
 *     See {@link org.apache.kafka.tools.VerifiableConsumer.PartitionsAssigned}.</li>
 * <li>records_consumed: contains a summary of records consumed in a single call to {@link KafkaConsumer#poll(Duration)}.
 *     See {@link org.apache.kafka.tools.VerifiableConsumer.RecordsConsumed}.</li>
 * <li>record_data: contains the key, value, and offset of an individual consumed record (only included if verbose
 *     output is enabled). See {@link org.apache.kafka.tools.VerifiableConsumer.RecordData}.</li>
 * <li>offsets_committed: The result of every offset commit (only included if auto-commit is not enabled).
 *     See {@link org.apache.kafka.tools.VerifiableConsumer.OffsetsCommitted}</li>
 * <li>shutdown_complete: emitted after the consumer returns from {@link KafkaConsumer#close()}.
 *     See {@link org.apache.kafka.tools.VerifiableConsumer.ShutdownComplete}.</li>
 * </ul>
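 *
 * <p>For illustration, a records_consumed event printed by this tool looks like the following
 * single line (the values here are hypothetical; the field names come from
 * {@link org.apache.kafka.tools.VerifiableConsumer.RecordsConsumed} and its nested partition
 * summaries):
 * <pre>
 * {"timestamp":1666238400000,"name":"records_consumed","count":100,"partitions":[{"topic":"test","partition":0,"count":100,"minOffset":0,"maxOffset":99}]}
 * </pre>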
 */
public class VerifiableConsumer implements Closeable, OffsetCommitCallback, ConsumerRebalanceListener {

    private static final Logger log = LoggerFactory.getLogger(VerifiableConsumer.class);

    private final ObjectMapper mapper = new ObjectMapper();
    private final PrintStream out;
    private final KafkaConsumer<String, String> consumer;
    private final String topic;
    private final boolean useAutoCommit;
    private final boolean useAsyncCommit;
    private final boolean verbose;
    private final int maxMessages;
    private int consumedMessages = 0;

    private CountDownLatch shutdownLatch = new CountDownLatch(1);

    public VerifiableConsumer(KafkaConsumer<String, String> consumer,
                              PrintStream out,
                              String topic,
                              int maxMessages,
                              boolean useAutoCommit,
                              boolean useAsyncCommit,
                              boolean verbose) {
        this.consumer = consumer;
        this.out = out;
        this.topic = topic;
        this.maxMessages = maxMessages;
        this.useAutoCommit = useAutoCommit;
        this.useAsyncCommit = useAsyncCommit;
        this.verbose = verbose;
        addKafkaSerializerModule();
    }

    private void addKafkaSerializerModule() {
        SimpleModule kafka = new SimpleModule();
        kafka.addSerializer(TopicPartition.class, new JsonSerializer<TopicPartition>() {
            @Override
            public void serialize(TopicPartition tp, JsonGenerator gen, SerializerProvider serializers) throws IOException {
                gen.writeStartObject();
                gen.writeObjectField("topic", tp.topic());
                gen.writeObjectField("partition", tp.partition());
                gen.writeEndObject();
            }
        });
        mapper.registerModule(kafka);
    }

    private boolean hasMessageLimit() {
        return maxMessages >= 0;
    }

    private boolean isFinished() {
        return hasMessageLimit() && consumedMessages >= maxMessages;
    }

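    /**
     * Summarizes the batch returned by one call to poll(): for each partition, records the count
     * and the min/max offsets (truncating the batch if it would exceed maxMessages), optionally
     * prints a record_data event per record in verbose mode, and returns the offsets to commit,
     * i.e. the last consumed offset + 1 for each partition.
     */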
    private Map<TopicPartition, OffsetAndMetadata> onRecordsReceived(ConsumerRecords<String, String> records) {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

        List<RecordSetSummary> summaries = new ArrayList<>();
        for (TopicPartition tp : records.partitions()) {
            List<ConsumerRecord<String, String>> partitionRecords = records.records(tp);

            if (hasMessageLimit() && consumedMessages + partitionRecords.size() > maxMessages)
                partitionRecords = partitionRecords.subList(0, maxMessages - consumedMessages);

            if (partitionRecords.isEmpty())
                continue;

            long minOffset = partitionRecords.get(0).offset();
            long maxOffset = partitionRecords.get(partitionRecords.size() - 1).offset();

            offsets.put(tp, new OffsetAndMetadata(maxOffset + 1));
            summaries.add(new RecordSetSummary(tp.topic(), tp.partition(),
                    partitionRecords.size(), minOffset, maxOffset));

            if (verbose) {
                for (ConsumerRecord<String, String> record : partitionRecords) {
                    printJson(new RecordData(record));
                }
            }

            consumedMessages += partitionRecords.size();
            if (isFinished())
                break;
        }

        printJson(new RecordsConsumed(records.count(), summaries));
        return offsets;
    }

    @Override
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
        List<CommitData> committedOffsets = new ArrayList<>();
        for (Map.Entry<TopicPartition, OffsetAndMetadata> offsetEntry : offsets.entrySet()) {
            TopicPartition tp = offsetEntry.getKey();
            committedOffsets.add(new CommitData(tp.topic(), tp.partition(), offsetEntry.getValue().offset()));
        }

        boolean success = true;
        String error = null;
        if (exception != null) {
            success = false;
            error = exception.getMessage();
        }
        printJson(new OffsetsCommitted(committedOffsets, error, success));
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        printJson(new PartitionsAssigned(partitions));
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        printJson(new PartitionsRevoked(partitions));
    }

    private void printJson(Object data) {
        try {
            out.println(mapper.writeValueAsString(data));
        } catch (JsonProcessingException e) {
            out.println("Bad data can't be written as json: " + e.getMessage());
        }
    }

    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
        try {
            consumer.commitSync(offsets);
            onComplete(offsets, null);
        } catch (WakeupException e) {
            // we only call wakeup() once to close the consumer, so this recursion should be safe
            commitSync(offsets);
            throw e;
        } catch (FencedInstanceIdException e) {
            throw e;
        } catch (Exception e) {
            onComplete(offsets, e);
        }
    }

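    /**
     * Main loop: subscribes to the topic with this instance as the rebalance listener, then polls
     * until the message limit (if any) is reached. When auto-commit is disabled, the offsets
     * returned by onRecordsReceived are committed after every poll, either asynchronously or
     * synchronously. The WakeupException raised by close() ends the loop; the consumer is then
     * closed and a shutdown_complete event is printed.
     */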
    public void run() {
        try {
            printJson(new StartupComplete());
            consumer.subscribe(Collections.singletonList(topic), this);

            while (!isFinished()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
                Map<TopicPartition, OffsetAndMetadata> offsets = onRecordsReceived(records);

                if (!useAutoCommit) {
                    if (useAsyncCommit)
                        consumer.commitAsync(offsets, this);
                    else
                        commitSync(offsets);
                }
            }
        } catch (WakeupException e) {
            // ignore, we are closing
            log.trace("Caught WakeupException because consumer is shutdown, ignore and terminate.", e);
        } catch (Throwable t) {
            // Log the error so it goes to the service log and not stdout
            log.error("Error during processing, terminating consumer process: ", t);
        } finally {
            consumer.close();
            printJson(new ShutdownComplete());
            shutdownLatch.countDown();
        }
    }

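    /**
     * Initiates shutdown from another thread (the shutdown hook registered in main): wakeup()
     * makes the poll() blocked in run() throw WakeupException, and shutdownLatch is awaited until
     * run() has finished closing the consumer. The wait ignores interruption, but the interrupt
     * status is restored before returning.
     */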
    public void close() {
        boolean interrupted = false;
        try {
            consumer.wakeup();
            while (true) {
                try {
                    shutdownLatch.await();
                    return;
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted)
                Thread.currentThread().interrupt();
        }
    }

    @JsonPropertyOrder({ "timestamp", "name" })
    private static abstract class ConsumerEvent {
        private final long timestamp = System.currentTimeMillis();

        @JsonProperty
        public abstract String name();

        @JsonProperty
        public long timestamp() {
            return timestamp;
        }
    }

    private static class StartupComplete extends ConsumerEvent {

        @Override
        public String name() {
            return "startup_complete";
        }
    }

    private static class ShutdownComplete extends ConsumerEvent {

        @Override
        public String name() {
            return "shutdown_complete";
        }
    }

    private static class PartitionsRevoked extends ConsumerEvent {
        private final Collection<TopicPartition> partitions;

        public PartitionsRevoked(Collection<TopicPartition> partitions) {
            this.partitions = partitions;
        }

        @JsonProperty
        public Collection<TopicPartition> partitions() {
            return partitions;
        }

        @Override
        public String name() {
            return "partitions_revoked";
        }
    }

    private static class PartitionsAssigned extends ConsumerEvent {
        private final Collection<TopicPartition> partitions;

        public PartitionsAssigned(Collection<TopicPartition> partitions) {
            this.partitions = partitions;
        }

        @JsonProperty
        public Collection<TopicPartition> partitions() {
            return partitions;
        }

        @Override
        public String name() {
            return "partitions_assigned";
        }
    }

    public static class RecordsConsumed extends ConsumerEvent {
        private final long count;
        private final List<RecordSetSummary> partitionSummaries;

        public RecordsConsumed(long count, List<RecordSetSummary> partitionSummaries) {
            this.count = count;
            this.partitionSummaries = partitionSummaries;
        }

        @Override
        public String name() {
            return "records_consumed";
        }

        @JsonProperty
        public long count() {
            return count;
        }

        @JsonProperty
        public List<RecordSetSummary> partitions() {
            return partitionSummaries;
        }
    }

    @JsonPropertyOrder({ "timestamp", "name", "key", "value", "topic", "partition", "offset" })
    public static class RecordData extends ConsumerEvent {

        private final ConsumerRecord<String, String> record;

        public RecordData(ConsumerRecord<String, String> record) {
            this.record = record;
        }

        @Override
        public String name() {
            return "record_data";
        }

        @JsonProperty
        public String topic() {
            return record.topic();
        }

        @JsonProperty
        public int partition() {
            return record.partition();
        }

        @JsonProperty
        public String key() {
            return record.key();
        }

        @JsonProperty
        public String value() {
            return record.value();
        }

        @JsonProperty
        public long offset() {
            return record.offset();
        }

    }

    private static class PartitionData {
        private final String topic;
        private final int partition;

        public PartitionData(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @JsonProperty
        public String topic() {
            return topic;
        }

        @JsonProperty
        public int partition() {
            return partition;
        }
    }

    private static class OffsetsCommitted extends ConsumerEvent {

        private final List<CommitData> offsets;
        private final String error;
        private final boolean success;

        public OffsetsCommitted(List<CommitData> offsets, String error, boolean success) {
            this.offsets = offsets;
            this.error = error;
            this.success = success;
        }

        @Override
        public String name() {
            return "offsets_committed";
        }

        @JsonProperty
        public List<CommitData> offsets() {
            return offsets;
        }

        @JsonProperty
        @JsonInclude(JsonInclude.Include.NON_NULL)
        public String error() {
            return error;
        }

        @JsonProperty
        public boolean success() {
            return success;
        }

    }

    private static class CommitData extends PartitionData {
        private final long offset;

        public CommitData(String topic, int partition, long offset) {
            super(topic, partition);
            this.offset = offset;
        }

        @JsonProperty
        public long offset() {
            return offset;
        }
    }

    private static class RecordSetSummary extends PartitionData {
        private final long count;
        private final long minOffset;
        private final long maxOffset;

        public RecordSetSummary(String topic, int partition, long count, long minOffset, long maxOffset) {
            super(topic, partition);
            this.count = count;
            this.minOffset = minOffset;
            this.maxOffset = maxOffset;
        }

        @JsonProperty
        public long count() {
            return count;
        }

        @JsonProperty
        public long minOffset() {
            return minOffset;
        }

        @JsonProperty
        public long maxOffset() {
            return maxOffset;
        }

    }

    private static ArgumentParser argParser() {
        ArgumentParser parser = ArgumentParsers
                .newArgumentParser("verifiable-consumer")
                .defaultHelp(true)
                .description("This tool consumes messages from a specific topic and emits consumer events (e.g. group rebalances, received messages, and offsets committed) as JSON objects to STDOUT.");
        MutuallyExclusiveGroup connectionGroup = parser.addMutuallyExclusiveGroup("Connection Group")
                .description("Group of arguments for connection to brokers")
                .required(true);
        connectionGroup.addArgument("--bootstrap-server")
                .action(store())
                .required(false)
                .type(String.class)
                .metavar("HOST1:PORT1[,HOST2:PORT2[...]]")
                .dest("bootstrapServer")
                .help("REQUIRED unless --broker-list(deprecated) is specified. The server(s) to connect to. Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...");
        connectionGroup.addArgument("--broker-list")
                .action(store())
                .required(false)
                .type(String.class)
                .metavar("HOST1:PORT1[,HOST2:PORT2[...]]")
                .dest("brokerList")
                .help("DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified.  Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...");

        parser.addArgument("--topic")
                .action(store())
                .required(true)
                .type(String.class)
                .metavar("TOPIC")
                .help("Consumes messages from this topic.");

        parser.addArgument("--group-id")
                .action(store())
                .required(true)
                .type(String.class)
                .metavar("GROUP_ID")
                .dest("groupId")
                .help("The groupId shared among members of the consumer group");

        parser.addArgument("--group-instance-id")
                .action(store())
                .required(false)
                .type(String.class)
                .metavar("GROUP_INSTANCE_ID")
                .dest("groupInstanceId")
                .help("A unique identifier of the consumer instance");

        parser.addArgument("--max-messages")
                .action(store())
                .required(false)
                .type(Integer.class)
                .setDefault(-1)
                .metavar("MAX-MESSAGES")
                .dest("maxMessages")
                .help("Consume this many messages. If -1 (the default), the consumer will consume until the process is killed externally");

        parser.addArgument("--session-timeout")
                .action(store())
                .required(false)
                .setDefault(30000)
                .type(Integer.class)
                .metavar("TIMEOUT_MS")
                .dest("sessionTimeout")
                .help("Set the consumer's session timeout");

        parser.addArgument("--verbose")
                .action(storeTrue())
                .type(Boolean.class)
                .metavar("VERBOSE")
                .help("Enable to log individual consumed records");

        parser.addArgument("--enable-autocommit")
                .action(storeTrue())
                .type(Boolean.class)
                .metavar("ENABLE-AUTOCOMMIT")
                .dest("useAutoCommit")
                .help("Enable offset auto-commit on consumer");

        parser.addArgument("--reset-policy")
                .action(store())
                .required(false)
                .setDefault("earliest")
                .type(String.class)
                .dest("resetPolicy")
                .help("Set reset policy (must be either 'earliest', 'latest', or 'none'");

        parser.addArgument("--assignment-strategy")
                .action(store())
                .required(false)
                .setDefault(RangeAssignor.class.getName())
                .type(String.class)
                .dest("assignmentStrategy")
                .help("Set assignment strategy (e.g. " + RoundRobinAssignor.class.getName() + ")");

        parser.addArgument("--consumer.config")
                .action(store())
                .required(false)
                .type(String.class)
                .metavar("CONFIG_FILE")
                .help("Consumer config properties file (config options shared with command line parameters will be overridden).");

        return parser;
    }

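    // An illustrative invocation accepted by this parser (broker address, topic, and group name
    // are placeholders); the tool is normally launched via bin/kafka-verifiable-consumer.sh:
    //
    //   verifiable-consumer --bootstrap-server localhost:9092 --topic test-topic \
    //       --group-id test-group --max-messages 1000 --verbose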
    public static VerifiableConsumer createFromArgs(ArgumentParser parser, String[] args) throws ArgumentParserException {
        Namespace res = parser.parseArgs(args);

        boolean useAutoCommit = res.getBoolean("useAutoCommit");
        String configFile = res.getString("consumer.config");
        String brokerHostandPort = null;

        Properties consumerProps = new Properties();
        if (configFile != null) {
            try {
                consumerProps.putAll(Utils.loadProps(configFile));
            } catch (IOException e) {
                throw new ArgumentParserException(e.getMessage(), parser);
            }
        }

        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, res.getString("groupId"));

        String groupInstanceId = res.getString("groupInstanceId");
        if (groupInstanceId != null) {
            consumerProps.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId);
        }

        if (res.get("bootstrapServer") != null) {
            brokerHostandPort = res.getString("bootstrapServer");
        } else if (res.getString("brokerList") != null) {
            brokerHostandPort = res.getString("brokerList");
        } else {
            parser.printHelp();
            // Can't use `Exit.exit` here because it didn't exist until 0.11.0.0.
            System.exit(0);
        }
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerHostandPort);

        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, useAutoCommit);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, res.getString("resetPolicy"));
        consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(res.getInt("sessionTimeout")));
        consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, res.getString("assignmentStrategy"));

        StringDeserializer deserializer = new StringDeserializer();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps, deserializer, deserializer);

        String topic = res.getString("topic");
        int maxMessages = res.getInt("maxMessages");
        boolean verbose = res.getBoolean("verbose");

        return new VerifiableConsumer(
                consumer,
                System.out,
                topic,
                maxMessages,
                useAutoCommit,
                false,
                verbose);
    }

    public static void main(String[] args) {
        ArgumentParser parser = argParser();
        if (args.length == 0) {
            parser.printHelp();
            // Can't use `Exit.exit` here because it didn't exist until 0.11.0.0.
            System.exit(0);
        }
        try {
            final VerifiableConsumer consumer = createFromArgs(parser, args);
            // Can't use `Exit.addShutdownHook` here because it didn't exist until 2.5.0.
            Runtime.getRuntime().addShutdownHook(new Thread(consumer::close, "verifiable-consumer-shutdown-hook"));

            consumer.run();
        } catch (ArgumentParserException e) {
            parser.handleError(e);
            // Can't use `Exit.exit` here because it didn't exist until 0.11.0.0.
            System.exit(1);
        }
    }

}
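
Reading the event stream

Each event is one JSON object per line on STDOUT, so a test harness can consume the stream with any JSON parser. The sketch below is not part of the Kafka code base (the class name is hypothetical); it uses the same Jackson library the tool itself depends on to dispatch on the "name" field of each event:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

public class VerifiableConsumerEventReader {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(System.in, StandardCharsets.UTF_8))) {
            String line;
            while ((line = in.readLine()) != null) {
                JsonNode event = mapper.readTree(line);
                // path() returns a missing node (never null) if the field is absent
                switch (event.path("name").asText()) {
                    case "records_consumed":
                        // Total record count across all partitions for one poll() call
                        System.out.println("consumed " + event.path("count").asLong() + " records");
                        break;
                    case "offsets_committed":
                        System.out.println("commit success=" + event.path("success").asBoolean());
                        break;
                    case "shutdown_complete":
                        return; // the consumer has closed; no further events will arrive
                    default:
                        // startup_complete, partitions_assigned, partitions_revoked, record_data
                        break;
                }
            }
        }
    }
}

Piping the tool's STDOUT into a reader like this is essentially how Kafka's own system tests consume its output when they assert on rebalances, consumed counts, and committed offsets.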
