kafka MessageGenerator 源码

  • 2022-10-20
  • 浏览 (214)

kafka MessageGenerator 代码

文件路径:/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kafka.message;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.Namespace;

import java.io.BufferedWriter;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;

import static net.sourceforge.argparse4j.impl.Arguments.store;

/**
 * The Kafka message generator.
 */
public final class MessageGenerator {
    // ---- File names and suffixes -------------------------------------------------------------

    // Suffix of the message specification files read from the input directory.
    static final String JSON_SUFFIX = ".json";

    // Glob used by processDirectories to select the specification files in the input directory.
    static final String JSON_GLOB = "*" + JSON_SUFFIX;

    // Suffix appended by processDirectories to each message class generator's output name.
    static final String JAVA_SUFFIX = ".java";

    // Java file names. Not referenced in this file; presumably the output names of the
    // type class generators -- confirm against those generators.
    static final String API_MESSAGE_TYPE_JAVA = "ApiMessageType.java";

    static final String API_SCOPE_JAVA = "ApiScope.java";

    static final String METADATA_RECORD_TYPE_JAVA = "MetadataRecordType.java";

    static final String METADATA_JSON_CONVERTERS_JAVA = "MetadataJsonConverters.java";

    // ---- Fully-qualified class names ---------------------------------------------------------
    // None of the *_CLASS constants below are referenced in this file; presumably the
    // generator classes use them when emitting imports in generated code -- confirm at
    // the usage sites.

    static final String API_MESSAGE_CLASS = "org.apache.kafka.common.protocol.ApiMessage";

    static final String MESSAGE_CLASS = "org.apache.kafka.common.protocol.Message";

    static final String MESSAGE_UTIL_CLASS = "org.apache.kafka.common.protocol.MessageUtil";

    static final String READABLE_CLASS = "org.apache.kafka.common.protocol.Readable";

    static final String WRITABLE_CLASS = "org.apache.kafka.common.protocol.Writable";

    static final String ARRAYS_CLASS = "java.util.Arrays";

    static final String OBJECTS_CLASS = "java.util.Objects";

    static final String LIST_CLASS = "java.util.List";

    static final String ARRAYLIST_CLASS = "java.util.ArrayList";

    static final String IMPLICIT_LINKED_HASH_COLLECTION_CLASS =
        "org.apache.kafka.common.utils.ImplicitLinkedHashCollection";

    static final String IMPLICIT_LINKED_HASH_MULTI_COLLECTION_CLASS =
        "org.apache.kafka.common.utils.ImplicitLinkedHashMultiCollection";

    static final String UNSUPPORTED_VERSION_EXCEPTION_CLASS =
        "org.apache.kafka.common.errors.UnsupportedVersionException";

    static final String ITERATOR_CLASS = "java.util.Iterator";

    static final String ENUM_SET_CLASS = "java.util.EnumSet";

    static final String TYPE_CLASS = "org.apache.kafka.common.protocol.types.Type";

    static final String FIELD_CLASS = "org.apache.kafka.common.protocol.types.Field";

    static final String SCHEMA_CLASS = "org.apache.kafka.common.protocol.types.Schema";

    static final String ARRAYOF_CLASS = "org.apache.kafka.common.protocol.types.ArrayOf";

    static final String COMPACT_ARRAYOF_CLASS = "org.apache.kafka.common.protocol.types.CompactArrayOf";

    static final String BYTES_CLASS = "org.apache.kafka.common.utils.Bytes";

    static final String UUID_CLASS = "org.apache.kafka.common.Uuid";

    static final String BASE_RECORDS_CLASS = "org.apache.kafka.common.record.BaseRecords";

    static final String MEMORY_RECORDS_CLASS = "org.apache.kafka.common.record.MemoryRecords";

    // Name suffixes (not class names). Not referenced in this file; presumably used to
    // recognise or strip request/response message names -- confirm at the usage sites.
    static final String REQUEST_SUFFIX = "Request";

    static final String RESPONSE_SUFFIX = "Response";

    static final String BYTE_UTILS_CLASS = "org.apache.kafka.common.utils.ByteUtils";

    static final String STANDARD_CHARSETS = "java.nio.charset.StandardCharsets";

    static final String TAGGED_FIELDS_SECTION_CLASS = "org.apache.kafka.common.protocol.types.Field.TaggedFieldsSection";

    static final String OBJECT_SERIALIZATION_CACHE_CLASS = "org.apache.kafka.common.protocol.ObjectSerializationCache";

    static final String MESSAGE_SIZE_ACCUMULATOR_CLASS = "org.apache.kafka.common.protocol.MessageSizeAccumulator";

    static final String RAW_TAGGED_FIELD_CLASS = "org.apache.kafka.common.protocol.types.RawTaggedField";

    static final String RAW_TAGGED_FIELD_WRITER_CLASS = "org.apache.kafka.common.protocol.types.RawTaggedFieldWriter";

    static final String TREE_MAP_CLASS = "java.util.TreeMap";

    static final String BYTE_BUFFER_CLASS = "java.nio.ByteBuffer";

    static final String NAVIGABLE_MAP_CLASS = "java.util.NavigableMap";

    static final String MAP_ENTRY_CLASS = "java.util.Map.Entry";

    // Jackson tree-model class names.
    static final String JSON_NODE_CLASS = "com.fasterxml.jackson.databind.JsonNode";

    static final String OBJECT_NODE_CLASS = "com.fasterxml.jackson.databind.node.ObjectNode";

    static final String JSON_NODE_FACTORY_CLASS = "com.fasterxml.jackson.databind.node.JsonNodeFactory";

    static final String BOOLEAN_NODE_CLASS = "com.fasterxml.jackson.databind.node.BooleanNode";

    static final String SHORT_NODE_CLASS = "com.fasterxml.jackson.databind.node.ShortNode";

    static final String INT_NODE_CLASS = "com.fasterxml.jackson.databind.node.IntNode";

    static final String LONG_NODE_CLASS = "com.fasterxml.jackson.databind.node.LongNode";

    static final String TEXT_NODE_CLASS = "com.fasterxml.jackson.databind.node.TextNode";

    static final String BINARY_NODE_CLASS = "com.fasterxml.jackson.databind.node.BinaryNode";

    static final String NULL_NODE_CLASS = "com.fasterxml.jackson.databind.node.NullNode";

    static final String ARRAY_NODE_CLASS = "com.fasterxml.jackson.databind.node.ArrayNode";

    static final String DOUBLE_NODE_CLASS = "com.fasterxml.jackson.databind.node.DoubleNode";

    // ---- Numeric limits ----------------------------------------------------------------------

    // 2^32 - 1; held in a long because it does not fit in a Java int.
    static final long UNSIGNED_INT_MAX = 4294967295L;

    // 2^16 - 1.
    static final int UNSIGNED_SHORT_MAX = 65535;

    /**
     * The shared Jackson mapper used for JSON objects; processDirectories uses it to
     * deserialize message specification files.
     */
    static final ObjectMapper JSON_SERDE;

    static {
        // Configure a local instance first, then publish it through the final field.
        ObjectMapper mapper = new ObjectMapper();

        // Serialization side.
        mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
        mapper.setSerializationInclusion(JsonInclude.Include.NON_EMPTY);

        // Deserialization side.
        mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, true);
        mapper.configure(DeserializationFeature.FAIL_ON_TRAILING_TOKENS, true);

        // Parser side: comments are permitted inside the JSON input.
        mapper.configure(JsonParser.Feature.ALLOW_COMMENTS, true);

        JSON_SERDE = mapper;
    }

    /**
     * Instantiates one type class generator for each requested generator name.
     *
     * @param packageName the package name passed to every generator constructor
     * @param types       the generator names to instantiate; may be {@code null}
     * @return an empty list when {@code types} is {@code null}, otherwise the new
     *         generators in the same order as the supplied names
     * @throws RuntimeException if a name does not match any known type class generator
     */
    private static List<TypeClassGenerator> createTypeClassGenerators(String packageName,
                                                                      List<String> types) {
        if (types == null) {
            return Collections.emptyList();
        }
        List<TypeClassGenerator> result = new ArrayList<>();
        for (String generatorName : types) {
            TypeClassGenerator created;
            switch (generatorName) {
                case "ApiMessageTypeGenerator":
                    created = new ApiMessageTypeGenerator(packageName);
                    break;
                case "MetadataRecordTypeGenerator":
                    created = new MetadataRecordTypeGenerator(packageName);
                    break;
                case "MetadataJsonConvertersGenerator":
                    created = new MetadataJsonConvertersGenerator(packageName);
                    break;
                default:
                    throw new RuntimeException("Unknown type class generator type '" + generatorName + "'");
            }
            result.add(created);
        }
        return result;
    }

    /**
     * Instantiates one message class generator for each requested generator name.
     *
     * @param packageName the package name passed to every generator constructor
     * @param types       the generator names to instantiate; may be {@code null}
     * @return an empty list when {@code types} is {@code null}, otherwise the new
     *         generators in the same order as the supplied names
     * @throws RuntimeException if a name does not match any known message class generator
     */
    private static List<MessageClassGenerator> createMessageClassGenerators(String packageName,
                                                                            List<String> types) {
        if (types == null) {
            return Collections.emptyList();
        }
        List<MessageClassGenerator> result = new ArrayList<>();
        for (String generatorName : types) {
            MessageClassGenerator created;
            switch (generatorName) {
                case "MessageDataGenerator":
                    created = new MessageDataGenerator(packageName);
                    break;
                case "JsonConverterGenerator":
                    created = new JsonConverterGenerator(packageName);
                    break;
                default:
                    throw new RuntimeException("Unknown message class generator type '" + generatorName + "'");
            }
            result.add(created);
        }
        return result;
    }

    /**
     * Generates Java source files from every message specification in a directory.
     *
     * <p>The steps are, in order:
     * <ol>
     *   <li>create the output directory (and any missing parents);</li>
     *   <li>for every file in {@code inputDir} matching {@link #JSON_GLOB}, parse it into a
     *       {@code MessageSpec}, run a fresh set of message class generators on it, and
     *       register the spec with every type class generator;</li>
     *   <li>write one file per type class generator;</li>
     *   <li>delete every entry of the output directory whose name was not written by this
     *       run;</li>
     *   <li>print the number of processed specification files to standard output.</li>
     * </ol>
     *
     * @param packageName                the package name passed to every generator
     * @param outputDir                  the directory that receives the generated files
     * @param inputDir                   the directory containing the specification files
     * @param typeClassGeneratorTypes    names of the type class generators to run; may be
     *                                   {@code null} for none
     * @param messageClassGeneratorTypes names of the message class generators to run; may be
     *                                   {@code null} for none
     * @throws RuntimeException if reading or generating from a specification file fails (the
     *                          message names the offending file and the cause is preserved),
     *                          or if a generator name is unknown
     * @throws Exception        if any other step fails
     */
    public static void processDirectories(String packageName,
                                          String outputDir,
                                          String inputDir,
                                          List<String> typeClassGeneratorTypes,
                                          List<String> messageClassGeneratorTypes) throws Exception {
        Files.createDirectories(Paths.get(outputDir));
        int numProcessed = 0;

        List<TypeClassGenerator> typeClassGenerators =
                createTypeClassGenerators(packageName, typeClassGeneratorTypes);
        // Names of every file written by this run; anything else in outputDir is stale.
        HashSet<String> outputFileNames = new HashSet<>();
        try (DirectoryStream<Path> directoryStream = Files
                .newDirectoryStream(Paths.get(inputDir), JSON_GLOB)) {
            for (Path inputPath : directoryStream) {
                try {
                    MessageSpec spec = JSON_SERDE.
                        readValue(inputPath.toFile(), MessageSpec.class);
                    writeMessageClasses(spec, packageName, outputDir,
                        messageClassGeneratorTypes, outputFileNames);
                    numProcessed++;
                    for (TypeClassGenerator typeClassGenerator : typeClassGenerators) {
                        typeClassGenerator.registerMessageType(spec);
                    }
                } catch (Exception e) {
                    // Attach the file name so a bad specification is easy to locate.
                    throw new RuntimeException("Exception while processing " + inputPath, e);
                }
            }
        }
        writeTypeClasses(typeClassGenerators, outputDir, outputFileNames);
        deleteStaleOutputs(outputDir, outputFileNames);
        System.out.printf("MessageGenerator: processed %d Kafka message JSON files(s).%n", numProcessed);
    }

    /**
     * Runs a freshly created set of message class generators on one specification, writing
     * one file per generator and recording each written file name in {@code outputFileNames}.
     */
    private static void writeMessageClasses(MessageSpec spec,
                                            String packageName,
                                            String outputDir,
                                            List<String> messageClassGeneratorTypes,
                                            HashSet<String> outputFileNames) throws Exception {
        // New generator instances are created for every specification, as before.
        List<MessageClassGenerator> generators =
            createMessageClassGenerators(packageName, messageClassGeneratorTypes);
        for (MessageClassGenerator generator : generators) {
            String name = generator.outputName(spec) + JAVA_SUFFIX;
            outputFileNames.add(name);
            Path outputPath = Paths.get(outputDir, name);
            try (BufferedWriter writer = Files.newBufferedWriter(outputPath)) {
                generator.generateAndWrite(spec, writer);
            }
        }
    }

    /**
     * Writes the output file of every type class generator and records each written file
     * name in {@code outputFileNames}.
     */
    private static void writeTypeClasses(List<TypeClassGenerator> typeClassGenerators,
                                         String outputDir,
                                         HashSet<String> outputFileNames) throws Exception {
        for (TypeClassGenerator typeClassGenerator : typeClassGenerators) {
            outputFileNames.add(typeClassGenerator.outputName());
            Path factoryOutputPath = Paths.get(outputDir, typeClassGenerator.outputName());
            try (BufferedWriter writer = Files.newBufferedWriter(factoryOutputPath)) {
                typeClassGenerator.generateAndWrite(writer);
            }
        }
    }

    /**
     * Deletes every entry of {@code outputDir} whose file name is not in
     * {@code outputFileNames}.
     */
    private static void deleteStaleOutputs(String outputDir,
                                           HashSet<String> outputFileNames) throws Exception {
        try (DirectoryStream<Path> directoryStream = Files.
                newDirectoryStream(Paths.get(outputDir))) {
            for (Path outputPath : directoryStream) {
                Path fileName = outputPath.getFileName();
                if (fileName != null && !outputFileNames.contains(fileName.toString())) {
                    Files.delete(outputPath);
                }
            }
        }
    }

    /**
     * Returns the given string with its first character converted to upper case using
     * {@link Locale#ENGLISH}; the rest of the string is left untouched.
     *
     * @param string the string to convert; must not be {@code null}
     * @return the converted string, or {@code string} itself when it is empty
     */
    static String capitalizeFirst(String string) {
        if (!string.isEmpty()) {
            String head = string.substring(0, 1);
            String tail = string.substring(1);
            return head.toUpperCase(Locale.ENGLISH).concat(tail);
        }
        return string;
    }

    /**
     * Returns the given string with its first character converted to lower case using
     * {@link Locale#ENGLISH}; the rest of the string is left untouched.
     *
     * @param string the string to convert; must not be {@code null}
     * @return the converted string, or {@code string} itself when it is empty
     */
    static String lowerCaseFirst(String string) {
        if (!string.isEmpty()) {
            String head = string.substring(0, 1);
            String tail = string.substring(1);
            return head.toLowerCase(Locale.ENGLISH).concat(tail);
        }
        return string;
    }

    /**
     * Tells whether the first character of the given string is an upper-case character
     * according to {@link Character#isUpperCase(char)}.
     *
     * @param string the string to inspect; must not be {@code null}
     * @return {@code true} if the string is non-empty and starts with an upper-case
     *         character, {@code false} otherwise
     */
    static boolean firstIsCapitalized(String string) {
        return !string.isEmpty() && Character.isUpperCase(string.charAt(0));
    }

    /**
     * Converts a camel-case string to snake case.
     *
     * <p>Every upper-case character is replaced by its lower-case form. An underscore is
     * inserted in front of it only when the preceding character was not upper case, so a
     * run of capitals (and a leading capital) produces no underscores of its own.
     *
     * @param string the string to convert; must not be {@code null}
     * @return the snake-case form of {@code string}
     */
    static String toSnakeCase(String string) {
        StringBuilder out = new StringBuilder();
        // False at the start so that a leading capital is not prefixed with '_'.
        boolean previousWasLower = false;
        for (char ch : string.toCharArray()) {
            boolean upper = Character.isUpperCase(ch);
            if (upper && previousWasLower) {
                out.append('_');
            }
            out.append(upper ? Character.toLowerCase(ch) : ch);
            previousWasLower = !upper;
        }
        return out.toString();
    }

    /**
     * Removes a required suffix from the end of a string.
     *
     * @param str    the string to shorten; must not be {@code null}
     * @param suffix the suffix that {@code str} is expected to end with
     * @return {@code str} without the trailing {@code suffix}
     * @throws RuntimeException if {@code str} does not end with {@code suffix}
     */
    static String stripSuffix(String str, String suffix) {
        if (!str.endsWith(suffix)) {
            throw new RuntimeException("String " + str + " does not end with the " +
                "expected suffix " + suffix);
        }
        int keptLength = str.length() - suffix.length();
        return str.substring(0, keptLength);
    }

    /**
     * Computes how many bytes the unsigned variable-length encoding of an integer occupies.
     *
     * <p>The value is treated as an unsigned 32-bit quantity and consumed seven bits at a
     * time, so the result is always between 1 and 5; negative inputs yield 5.
     *
     * @param value the integer to measure
     * @return the number of bytes in the encoding
     */
    static int sizeOfUnsignedVarint(int value) {
        int bytes = 1;
        // One extra byte for every further group of seven bits holding a set bit.
        for (int remaining = value >>> 7; remaining != 0; remaining >>>= 7) {
            bytes++;
        }
        return bytes;
    }

    /**
     * Command-line entry point: parses the arguments and hands them to
     * {@link #processDirectories}.
     *
     * <p>{@code --package}, {@code --output} and {@code --input} are required;
     * {@code --typeclass-generators} and {@code --message-class-generators} each accept one
     * or more generator names and may be omitted. Argument errors are handled by
     * {@code parseArgsOrFail}.
     *
     * @param args the raw command-line arguments
     * @throws Exception if {@code processDirectories} fails
     */
    public static void main(String[] args) throws Exception {
        ArgumentParser argumentParser = ArgumentParsers
            .newArgumentParser("message-generator")
            .defaultHelp(true)
            .description("The Kafka message generator");

        // Required single-valued options.
        argumentParser.addArgument("--package", "-p")
            .action(store())
            .required(true)
            .metavar("PACKAGE")
            .help("The java package to use in generated files.");
        argumentParser.addArgument("--output", "-o")
            .action(store())
            .required(true)
            .metavar("OUTPUT")
            .help("The output directory to create.");
        argumentParser.addArgument("--input", "-i")
            .action(store())
            .required(true)
            .metavar("INPUT")
            .help("The input directory to use.");

        // Optional multi-valued options; each takes at least one value when present.
        argumentParser.addArgument("--typeclass-generators", "-t")
            .nargs("+")
            .action(store())
            .metavar("TYPECLASS_GENERATORS")
            .help("The type class generators to use, if any.");
        argumentParser.addArgument("--message-class-generators", "-m")
            .nargs("+")
            .action(store())
            .metavar("MESSAGE_CLASS_GENERATORS")
            .help("The message class generators to use.");

        Namespace parsed = argumentParser.parseArgsOrFail(args);
        String packageName = parsed.getString("package");
        String outputDir = parsed.getString("output");
        String inputDir = parsed.getString("input");
        List<String> typeClassGeneratorTypes = parsed.getList("typeclass_generators");
        List<String> messageClassGeneratorTypes = parsed.getList("message_class_generators");
        processDirectories(packageName, outputDir, inputDir,
            typeClassGeneratorTypes, messageClassGeneratorTypes);
    }
}

相关信息

kafka 源码目录

相关文章

kafka ApiMessageTypeGenerator 源码

kafka ClauseGenerator 源码

kafka CodeBuffer 源码

kafka EntityType 源码

kafka FieldSpec 源码

kafka FieldType 源码

kafka HeaderGenerator 源码

kafka IsNullConditional 源码

kafka JsonConverterGenerator 源码

kafka MessageClassGenerator 源码

0  赞