kafka VerifiableLog4jAppender source code

  • 2022-10-20

kafka VerifiableLog4jAppender code

File path: /tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.tools;

import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Exit;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

import static net.sourceforge.argparse4j.impl.Arguments.store;

/**
 * Primarily intended for use with system testing, this appender produces a message
 * to Kafka on each "append" request. For example, this helps with end-to-end tests
 * of KafkaLog4jAppender.
 *
 * When used as a command-line tool, it appends increasing integers. It will produce a
 * fixed number of messages unless the default max-messages -1 is used, in which case
 * it appends indefinitely.
 */

public class VerifiableLog4jAppender {
    Logger logger = Logger.getLogger(VerifiableLog4jAppender.class);

    // If maxMessages < 0, log until the process is killed externally
    private long maxMessages = -1;

    // Hook to trigger logging thread to stop logging messages
    private volatile boolean stopLogging = false;

    /** Get the command-line argument parser. */
    private static ArgumentParser argParser() {
        ArgumentParser parser = ArgumentParsers
            .newArgumentParser("verifiable-log4j-appender")
            .defaultHelp(true)
            .description("This tool produces increasing integers to the specified topic using KafkaLog4jAppender.");

        parser.addArgument("--topic")
            .action(store())
            .required(true)
            .type(String.class)
            .metavar("TOPIC")
            .help("Produce messages to this topic.");

        parser.addArgument("--broker-list")
            .action(store())
            .required(true)
            .type(String.class)
            .metavar("HOST1:PORT1[,HOST2:PORT2[...]]")
            .dest("brokerList")
            .help("Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...");

        parser.addArgument("--max-messages")
            .action(store())
            .required(false)
            .setDefault(-1)
            .type(Integer.class)
            .metavar("MAX-MESSAGES")
            .dest("maxMessages")
            .help("Produce this many messages. If -1, produce messages until the process is killed externally.");

        parser.addArgument("--acks")
            .action(store())
            .required(false)
            .setDefault("-1")
            .type(String.class)
            .choices("0", "1", "-1")
            .metavar("ACKS")
            .help("Acks required on each produced message. See Kafka docs on request.required.acks for details.");

        parser.addArgument("--security-protocol")
            .action(store())
            .required(false)
            .setDefault("PLAINTEXT")
            .type(String.class)
            .choices("PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL")
            .metavar("SECURITY-PROTOCOL")
            .dest("securityProtocol")
            .help("Security protocol to be used while communicating with Kafka brokers.");

        parser.addArgument("--ssl-truststore-location")
            .action(store())
            .required(false)
            .type(String.class)
            .metavar("SSL-TRUSTSTORE-LOCATION")
            .dest("sslTruststoreLocation")
            .help("Location of SSL truststore to use.");

        parser.addArgument("--ssl-truststore-password")
            .action(store())
            .required(false)
            .type(String.class)
            .metavar("SSL-TRUSTSTORE-PASSWORD")
            .dest("sslTruststorePassword")
            .help("Password for SSL truststore to use.");

        parser.addArgument("--appender.config")
            .action(store())
            .required(false)
            .type(String.class)
            .metavar("CONFIG_FILE")
            .help("Log4jAppender config properties file.");

        parser.addArgument("--sasl-kerberos-service-name")
            .action(store())
            .required(false)
            .type(String.class)
            .metavar("SASL-KERBEROS-SERVICE-NAME")
            .dest("saslKerberosServiceName")
            .help("Name of sasl kerberos service.");

        parser.addArgument("--client-jaas-conf-path")
            .action(store())
            .required(false)
            .type(String.class)
            .metavar("CLIENT-JAAS-CONF-PATH")
            .dest("clientJaasConfPath")
            .help("Path of JAAS config file of Kafka client.");

        parser.addArgument("--kerb5-conf-path")
            .action(store())
            .required(false)
            .type(String.class)
            .metavar("KERB5-CONF-PATH")
            .dest("kerb5ConfPath")
            .help("Path of Kerb5 config file.");

        return parser;
    }

    /**
     * Read a properties file from the given path
     * @param filename The path of the file to read
     *
     * Note: this duplication of org.apache.kafka.common.utils.Utils.loadProps is unfortunate
     * but *intentional*. In order to use VerifiableProducer in compatibility and upgrade tests,
     * we use VerifiableProducer from the development tools package, and run it against 0.8.X.X kafka jars.
     * Since this method is not in Utils in the 0.8.X.X jars, we have to cheat a bit and duplicate.
     */
    public static Properties loadProps(String filename) throws IOException {
        Properties props = new Properties();
        try (InputStream propStream = Files.newInputStream(Paths.get(filename))) {
            props.load(propStream);
        }
        return props;
    }

    /** Construct a VerifiableLog4jAppender object from command-line arguments. */
    public static VerifiableLog4jAppender createFromArgs(String[] args) {
        ArgumentParser parser = argParser();
        VerifiableLog4jAppender producer = null;

        try {
            Namespace res = parser.parseArgs(args);

            int maxMessages = res.getInt("maxMessages");
            String topic = res.getString("topic");
            String configFile = res.getString("appender.config");

            Properties props = new Properties();
            props.setProperty("log4j.rootLogger", "INFO, KAFKA");
            props.setProperty("log4j.appender.KAFKA", "org.apache.kafka.log4jappender.KafkaLog4jAppender");
            props.setProperty("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout");
            props.setProperty("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n");
            props.setProperty("log4j.appender.KAFKA.BrokerList", res.getString("brokerList"));
            props.setProperty("log4j.appender.KAFKA.Topic", topic);
            props.setProperty("log4j.appender.KAFKA.RequiredNumAcks", res.getString("acks"));
            props.setProperty("log4j.appender.KAFKA.SyncSend", "true");
            final String securityProtocol = res.getString("securityProtocol");
            if (securityProtocol != null && !securityProtocol.equals(SecurityProtocol.PLAINTEXT.toString())) {
                props.setProperty("log4j.appender.KAFKA.SecurityProtocol", securityProtocol);
            }
            if (securityProtocol != null && securityProtocol.contains("SSL")) {
                props.setProperty("log4j.appender.KAFKA.SslTruststoreLocation", res.getString("sslTruststoreLocation"));
                props.setProperty("log4j.appender.KAFKA.SslTruststorePassword", res.getString("sslTruststorePassword"));
            }
            if (securityProtocol != null && securityProtocol.contains("SASL")) {
                props.setProperty("log4j.appender.KAFKA.SaslKerberosServiceName", res.getString("saslKerberosServiceName"));
                props.setProperty("log4j.appender.KAFKA.clientJaasConfPath", res.getString("clientJaasConfPath"));
                props.setProperty("log4j.appender.KAFKA.kerb5ConfPath", res.getString("kerb5ConfPath"));
            }
            props.setProperty("log4j.logger.kafka.log4j", "INFO, KAFKA");
            // Changing log level from INFO to WARN as a temporary workaround for KAFKA-6415. This is to
            // avoid deadlock in system tests when producer network thread appends to log while updating metadata.
            props.setProperty("log4j.logger.org.apache.kafka.clients.Metadata", "WARN, KAFKA");

            if (configFile != null) {
                try {
                    props.putAll(loadProps(configFile));
                } catch (IOException e) {
                    throw new ArgumentParserException(e.getMessage(), parser);
                }
            }

            producer = new VerifiableLog4jAppender(props, maxMessages);
        } catch (ArgumentParserException e) {
            if (args.length == 0) {
                parser.printHelp();
                Exit.exit(0);
            } else {
                parser.handleError(e);
                Exit.exit(1);
            }
        }

        return producer;
    }


    public VerifiableLog4jAppender(Properties props, int maxMessages) {
        this.maxMessages = maxMessages;
        PropertyConfigurator.configure(props);
    }

    public static void main(String[] args) {

        final VerifiableLog4jAppender appender = createFromArgs(args);
        boolean infinite = appender.maxMessages < 0;

        // Trigger main thread to stop producing messages when shutting down
        Exit.addShutdownHook("verifiable-log4j-appender-shutdown-hook", () -> appender.stopLogging = true);

        long maxMessages = infinite ? Long.MAX_VALUE : appender.maxMessages;
        for (long i = 0; i < maxMessages; i++) {
            if (appender.stopLogging) {
                break;
            }
            appender.append(String.format("%d", i));
        }
    }

    private void append(String msg) {
        logger.info(msg);
    }
}
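A minimal usage sketch (not part of the Kafka source tree): the only public entry points are main and createFromArgs shown above, so the tool can be driven programmatically by handing it the same flags it accepts on the command line. The class name, topic name, and broker address below are illustrative placeholders, and the sketch assumes kafka-tools and kafka-log4j-appender are on the classpath.

import org.apache.kafka.tools.VerifiableLog4jAppender;

// Hypothetical driver class, for illustration only.
public class VerifiableLog4jAppenderExample {
    public static void main(String[] args) {
        String[] toolArgs = {
            "--topic", "test-topic",            // placeholder topic
            "--broker-list", "localhost:9092",  // placeholder broker address
            "--max-messages", "10",             // log the integers 0..9, then stop
            "--acks", "1"
        };
        // Parses the arguments, configures the KAFKA log4j appender via
        // PropertyConfigurator, and logs increasing integers to the topic.
        VerifiableLog4jAppender.main(toolArgs);
    }
}

With --max-messages left at its default of -1, the loop in main runs until the shutdown hook sets stopLogging, so the process keeps logging until it is killed externally.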

Related information

kafka source code directory

Related articles

kafka ClientCompatibilityTest source code

kafka OAuthCompatibilityTool source code

kafka PrintVersionAndExitAction source code

kafka ProducerPerformance source code

kafka PushHttpMetricsReporter source code

kafka ThroughputThrottler source code

kafka TransactionalMessageCopier source code

kafka TransactionsCommand source code

kafka VerifiableConsumer source code

kafka VerifiableProducer source code
