kafka StateDirectory source code

  • 2022-10-20

kafka StateDirectory code

File path: /streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
import static org.apache.kafka.streams.processor.internals.StateManagerUtil.parseTaskDirectoryName;

/**
 * Manages the directories where the state of Tasks owned by a {@link StreamThread} are
 * stored. Handles creation/locking/unlocking/cleaning of the Task Directories. This class is not
 * thread-safe.
 */
public class StateDirectory {

    private static final Pattern TASK_DIR_PATH_NAME = Pattern.compile("\\d+_\\d+");
    private static final Pattern NAMED_TOPOLOGY_DIR_PATH_NAME = Pattern.compile("__.+__"); // named topology dirs follow '__Topology-Name__'
    private static final Logger log = LoggerFactory.getLogger(StateDirectory.class);
    static final String LOCK_FILE_NAME = ".lock";

    /* The process file is used to persist the process id across restarts.
     * For compatibility reasons you should only ever add fields to the json schema
     */
    static final String PROCESS_FILE_NAME = "kafka-streams-process-metadata";

    @JsonIgnoreProperties(ignoreUnknown = true)
    static class StateDirectoryProcessFile {
        @JsonProperty
        private final UUID processId;

        // required by jackson -- do not remove, your IDE may be warning that this is unused but it's lying to you
        public StateDirectoryProcessFile() {
            this.processId = null;
        }

        StateDirectoryProcessFile(final UUID processId) {
            this.processId = processId;
        }
    }

    private final Object taskDirCreationLock = new Object();
    private final Time time;
    private final String appId;
    private final File stateDir;
    private final boolean hasPersistentStores;
    private final boolean hasNamedTopologies;

    private final HashMap<TaskId, Thread> lockedTasksToOwner = new HashMap<>();

    private FileChannel stateDirLockChannel;
    private FileLock stateDirLock;

    /**
     * Ensures that the state base directory as well as the application's sub-directory are created.
     *
     * @param config              streams application configuration to read the root state directory path
     * @param time                system timer used to execute periodic cleanup procedure
     * @param hasPersistentStores only when the application's topology does have stores persisted on local file
     *                            system, we would go ahead and auto-create the corresponding application / task / store
     *                            directories whenever necessary; otherwise no directories would be created.
     * @param hasNamedTopologies  whether this application is composed of independent named topologies
     *
     * @throws ProcessorStateException if the base state directory or application state directory does not exist
     *                                 and could not be created when hasPersistentStores is enabled.
     */
    public StateDirectory(final StreamsConfig config, final Time time, final boolean hasPersistentStores, final boolean hasNamedTopologies) {
        this.time = time;
        this.hasPersistentStores = hasPersistentStores;
        this.hasNamedTopologies = hasNamedTopologies;
        this.appId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
        final String stateDirName = config.getString(StreamsConfig.STATE_DIR_CONFIG);
        final File baseDir = new File(stateDirName);
        stateDir = new File(baseDir, appId);

        if (this.hasPersistentStores) {
            if (!baseDir.exists() && !baseDir.mkdirs()) {
                throw new ProcessorStateException(
                    String.format("base state directory [%s] doesn't exist and couldn't be created", stateDirName));
            }
            if (!stateDir.exists() && !stateDir.mkdir()) {
                throw new ProcessorStateException(
                    String.format("state directory [%s] doesn't exist and couldn't be created", stateDir.getPath()));
            } else if (stateDir.exists() && !stateDir.isDirectory()) {
                throw new ProcessorStateException(
                    String.format("state directory [%s] can't be created as there is an existing file with the same name", stateDir.getPath()));
            }

            if (stateDirName.startsWith(System.getProperty("java.io.tmpdir"))) {
                log.warn("Using an OS temp directory in the state.dir property can cause failures with writing" +
                    " the checkpoint file due to the fact that this directory can be cleared by the OS." +
                    " Resolved state.dir: [" + stateDirName + "]");

            }
            // change the dir permission to "rwxr-x---" to avoid world readable
            configurePermissions(baseDir);
            configurePermissions(stateDir);
        }
    }

    private void configurePermissions(final File file) {
        final Path path = file.toPath();
        if (path.getFileSystem().supportedFileAttributeViews().contains("posix")) {
            final Set<PosixFilePermission> perms = PosixFilePermissions.fromString("rwxr-x---");
            try {
                Files.setPosixFilePermissions(path, perms);
            } catch (final IOException e) {
                log.error("Error changing permissions for the directory {} ", path, e);
            }
        } else {
            boolean set = file.setReadable(true, true);
            set &= file.setWritable(true, true);
            set &= file.setExecutable(true, true);
            if (!set) {
                log.error("Failed to change permissions for the directory {}", file);
            }
        }
    }

    /**
     * @return true if the state directory was successfully locked
     */
    private boolean lockStateDirectory() {
        final File lockFile = new File(stateDir, LOCK_FILE_NAME);
        try {
            stateDirLockChannel = FileChannel.open(lockFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
            stateDirLock = tryLock(stateDirLockChannel);
        } catch (final IOException e) {
            log.error("Unable to lock the state directory due to unexpected exception", e);
            throw new ProcessorStateException(String.format("Failed to lock the state directory [%s] during startup",
                stateDir.getAbsolutePath()), e);
        }
        return stateDirLock != null;
    }

    public UUID initializeProcessId() {
        if (!hasPersistentStores) {
            return UUID.randomUUID();
        }

        if (!lockStateDirectory()) {
            log.error("Unable to obtain lock as state directory is already locked by another process");
            throw new StreamsException(String.format("Unable to initialize state, this can happen if multiple instances of " +
                                           "Kafka Streams are running in the same state directory " +
                                           "(current state directory is [%s]", stateDir.getAbsolutePath()));
        }

        final File processFile = new File(stateDir, PROCESS_FILE_NAME);
        final ObjectMapper mapper = new ObjectMapper();

        try {
            if (processFile.exists()) {
                try {
                    final StateDirectoryProcessFile processFileData = mapper.readValue(processFile, StateDirectoryProcessFile.class);
                    log.info("Reading UUID from process file: {}", processFileData.processId);
                    if (processFileData.processId != null) {
                        return processFileData.processId;
                    }
                } catch (final Exception e) {
                    log.warn("Failed to read json process file", e);
                }
            }

            final StateDirectoryProcessFile processFileData = new StateDirectoryProcessFile(UUID.randomUUID());
            log.info("No process id found on disk, got fresh process id {}", processFileData.processId);

            mapper.writeValue(processFile, processFileData);
            return processFileData.processId;
        } catch (final IOException e) {
            log.error("Unable to read/write process file due to unexpected exception", e);
            throw new ProcessorStateException(e);
        }
    }

    /**
     * Get or create the directory for the provided {@link TaskId}.
     * @return directory for the {@link TaskId}
     * @throws ProcessorStateException if the task directory does not exist and could not be created
     */
    public File getOrCreateDirectoryForTask(final TaskId taskId) {
        final File taskParentDir = getTaskDirectoryParentName(taskId);
        final File taskDir = new File(taskParentDir, StateManagerUtil.toTaskDirString(taskId));
        if (hasPersistentStores) {
            if (!taskDir.exists()) {
                synchronized (taskDirCreationLock) {
                    // to avoid a race condition, we need to check again if the directory does not exist:
                    // otherwise, two threads might pass the outer `if` (and enter the `then` block),
                    // one blocks on `synchronized` while the other creates the directory,
                    // and the blocking one fails when trying to create it after it's unblocked
                    if (!taskParentDir.exists() && !taskParentDir.mkdir()) {
                        throw new ProcessorStateException(
                            String.format("Parent [%s] of task directory [%s] doesn't exist and couldn't be created",
                                taskParentDir.getPath(), taskDir.getPath()));
                    }
                    if (!taskDir.exists() && !taskDir.mkdir()) {
                        throw new ProcessorStateException(
                            String.format("task directory [%s] doesn't exist and couldn't be created", taskDir.getPath()));
                    }
                }
            } else if (!taskDir.isDirectory()) {
                throw new ProcessorStateException(
                    String.format("state directory [%s] can't be created as there is an existing file with the same name", taskDir.getPath()));
            }
        }
        return taskDir;
    }

    private File getTaskDirectoryParentName(final TaskId taskId) {
        final String namedTopology = taskId.topologyName();
        if (namedTopology != null) {
            if (!hasNamedTopologies) {
                throw new IllegalStateException("Tried to lookup taskId with named topology, but StateDirectory thinks hasNamedTopologies = false");
            }
            return new File(stateDir, getNamedTopologyDirName(namedTopology));
        } else {
            return stateDir;
        }
    }

    private String getNamedTopologyDirName(final String topologyName) {
        return "__" + topologyName + "__";
    }

    /**
     * @return The File handle for the checkpoint in the given task's directory
     */
    File checkpointFileFor(final TaskId taskId) {
        return new File(getOrCreateDirectoryForTask(taskId), StateManagerUtil.CHECKPOINT_FILE_NAME);
    }

    /**
     * Decide if the directory of the task is empty or not
     */
    boolean directoryForTaskIsEmpty(final TaskId taskId) {
        final File taskDir = getOrCreateDirectoryForTask(taskId);

        return taskDirIsEmpty(taskDir);
    }

    private boolean taskDirIsEmpty(final File taskDir) {
        final File[] storeDirs = taskDir.listFiles(pathname ->
                !pathname.getName().equals(CHECKPOINT_FILE_NAME));

        boolean taskDirEmpty = true;

        // if the task is stateless, storeDirs would be null
        if (storeDirs != null && storeDirs.length > 0) {
            for (final File file : storeDirs) {
                // We removed the task directory locking but some upgrading applications may still have old lock files on disk,
                // we just lazily delete those in this method since it's the only thing that would be affected by these
                if (file.getName().endsWith(LOCK_FILE_NAME)) {
                    if (!file.delete()) {
                        // If we hit an error deleting this just ignore it and move on, we'll retry again at some point
                        log.warn("Error encountered deleting lock file in {}", taskDir);
                    }
                } else {
                    // If it's not a lock file then the directory is not empty,
                    // but finish up the loop in case there's a lock file left to delete
                    log.trace("TaskDir {} was not empty, found {}", taskDir, file);
                    taskDirEmpty = false;
                }
            }
        }
        return taskDirEmpty;
    }

    /**
     * Get or create the directory for the global stores.
     * @return directory for the global stores
     * @throws ProcessorStateException if the global store directory does not exists and could not be created
     */
    File globalStateDir() {
        final File dir = new File(stateDir, "global");
        if (hasPersistentStores) {
            if (!dir.exists() && !dir.mkdir()) {
                throw new ProcessorStateException(
                    String.format("global state directory [%s] doesn't exist and couldn't be created", dir.getPath()));
            } else if (dir.exists() && !dir.isDirectory()) {
                throw new ProcessorStateException(
                    String.format("global state directory [%s] can't be created as there is an existing file with the same name", dir.getPath()));
            }
        }
        return dir;
    }

    private String logPrefix() {
        return String.format("stream-thread [%s]", Thread.currentThread().getName());
    }

    /**
     * Get the lock for the {@link TaskId}s directory if it is available
     * @param taskId task id
     * @return true if successful
     */
    synchronized boolean lock(final TaskId taskId) {
        if (!hasPersistentStores) {
            return true;
        }

        final Thread lockOwner = lockedTasksToOwner.get(taskId);
        if (lockOwner != null) {
            if (lockOwner.equals(Thread.currentThread())) {
                log.trace("{} Found cached state dir lock for task {}", logPrefix(), taskId);
                // we already own the lock
                return true;
            } else {
                // another thread owns the lock
                return false;
            }
        } else if (!stateDir.exists()) {
            log.error("Tried to lock task directory for {} but the state directory does not exist", taskId);
            throw new IllegalStateException("The state directory has been deleted");
        } else {
            lockedTasksToOwner.put(taskId, Thread.currentThread());
            // make sure the task directory actually exists, and create it if not
            getOrCreateDirectoryForTask(taskId);
            return true;
        }
    }

    /**
     * Unlock the state directory for the given {@link TaskId}.
     */
    synchronized void unlock(final TaskId taskId) {
        final Thread lockOwner = lockedTasksToOwner.get(taskId);
        if (lockOwner != null && lockOwner.equals(Thread.currentThread())) {
            lockedTasksToOwner.remove(taskId);
            log.debug("{} Released state dir lock for task {}", logPrefix(), taskId);
        }
    }

    public void close() {
        if (hasPersistentStores) {
            try {
                stateDirLock.release();
                stateDirLockChannel.close();

                stateDirLock = null;
                stateDirLockChannel = null;
            } catch (final IOException e) {
                log.error("Unexpected exception while unlocking the state dir", e);
                throw new StreamsException(String.format("Failed to release the lock on the state directory [%s]", stateDir.getAbsolutePath()), e);
            }

            // all threads should be stopped and cleaned up by now, so none should remain holding a lock
            if (!lockedTasksToOwner.isEmpty()) {
                log.error("Some task directories still locked while closing state, this indicates unclean shutdown: {}", lockedTasksToOwner);
            }
        }
    }

    public synchronized void clean() {
        try {
            cleanStateAndTaskDirectoriesCalledByUser();
        } catch (final Exception e) {
            throw new StreamsException(e);
        }

        try {
            if (stateDir.exists()) {
                Utils.delete(globalStateDir().getAbsoluteFile());
            }
        } catch (final IOException exception) {
            log.error(
                String.format("%s Failed to delete global state directory of %s due to an unexpected exception",
                    logPrefix(), appId),
                exception
            );
            throw new StreamsException(exception);
        }

        try {
            if (hasPersistentStores && stateDir.exists() && !stateDir.delete()) {
                log.warn(
                    String.format("%s Failed to delete state store directory of %s for it is not empty",
                        logPrefix(), stateDir.getAbsolutePath())
                );
            }
        } catch (final SecurityException exception) {
            log.error(
                String.format("%s Failed to delete state store directory of %s due to an unexpected exception",
                    logPrefix(), stateDir.getAbsolutePath()),
                exception
            );
            throw new StreamsException(exception);
        }
    }

    /**
     * Remove the directories for any {@link TaskId}s that are no-longer
     * owned by this {@link StreamThread} and aren't locked by either
     * another process or another {@link StreamThread}
     * @param cleanupDelayMs only remove directories if they haven't been modified for at least
     *                       this amount of time (milliseconds)
     */
    public synchronized void cleanRemovedTasks(final long cleanupDelayMs) {
        try {
            cleanRemovedTasksCalledByCleanerThread(cleanupDelayMs);
        } catch (final Exception cannotHappen) {
            throw new IllegalStateException("Should have swallowed exception.", cannotHappen);
        }
    }

    private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) {
        for (final TaskDirectory taskDir : listAllTaskDirectories()) {
            final String dirName = taskDir.file().getName();
            final TaskId id = parseTaskDirectoryName(dirName, taskDir.namedTopology());
            if (!lockedTasksToOwner.containsKey(id)) {
                try {
                    if (lock(id)) {
                        final long now = time.milliseconds();
                        final long lastModifiedMs = taskDir.file().lastModified();
                        if (now - cleanupDelayMs > lastModifiedMs) {
                            log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms).",
                                logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs);
                            Utils.delete(taskDir.file());
                        }
                    }
                } catch (final IOException exception) {
                    log.warn(
                        String.format("%s Swallowed the following exception during deletion of obsolete state directory %s for task %s:",
                            logPrefix(), dirName, id),
                        exception
                    );
                } finally {
                    unlock(id);
                }
            }
        }
        // Ok to ignore returned exception as it should be swallowed
        maybeCleanEmptyNamedTopologyDirs(true);
    }

    /**
     * Cleans up any leftover named topology directories that are empty, if any exist
     * @param logExceptionAsWarn if true, an exception will be logged as a warning
     *                       if false, an exception will be logged as error
     * @return the first IOException to be encountered
     */
    private IOException maybeCleanEmptyNamedTopologyDirs(final boolean logExceptionAsWarn) {
        if (!hasNamedTopologies) {
            return null;
        }

        final AtomicReference<IOException> firstException = new AtomicReference<>(null);
        final File[] namedTopologyDirs = stateDir.listFiles(pathname ->
                pathname.isDirectory() && NAMED_TOPOLOGY_DIR_PATH_NAME.matcher(pathname.getName()).matches()
        );
        if (namedTopologyDirs != null) {
            for (final File namedTopologyDir : namedTopologyDirs) {
                final File[] contents = namedTopologyDir.listFiles();
                if (contents != null && contents.length == 0) {
                    try {
                        Utils.delete(namedTopologyDir);
                    } catch (final IOException exception) {
                        if (logExceptionAsWarn) {
                            log.warn(
                                String.format("%sSwallowed the following exception during deletion of named topology directory %s",
                                    logPrefix(), namedTopologyDir.getName()),
                                exception
                            );
                        } else {
                            log.error(
                                String.format("%s Failed to delete named topology directory %s with exception:",
                                    logPrefix(), namedTopologyDir.getName()),
                                exception
                            );
                        }
                        firstException.compareAndSet(null, exception);
                    }
                }
            }
        }
        return firstException.get();
    }

    /**
     * Clears out any local state found for the given NamedTopology after it was removed
     *
     * @throws StreamsException if cleanup failed
     */
    public void clearLocalStateForNamedTopology(final String topologyName) {
        final File namedTopologyDir = new File(stateDir, getNamedTopologyDirName(topologyName));
        if (!namedTopologyDir.exists() || !namedTopologyDir.isDirectory()) {
            log.debug("Tried to clear out the local state for NamedTopology {} but none was found", topologyName);
        }
        try {
            Utils.delete(namedTopologyDir);
        } catch (final IOException e) {
            log.error("Hit an unexpected error while clearing local state for topology " + topologyName, e);
            throw new StreamsException("Unable to delete state for the named topology " + topologyName,
                                       e, new TaskId(-1, -1, topologyName)); // use dummy taskid to report source topology for this error
        }
    }

    private void cleanStateAndTaskDirectoriesCalledByUser() throws Exception {
        if (!lockedTasksToOwner.isEmpty()) {
            log.warn("Found some still-locked task directories when user requested to cleaning up the state, "
                + "since Streams is not running any more these will be ignored to complete the cleanup");
        }
        final AtomicReference<Exception> firstException = new AtomicReference<>();
        for (final TaskDirectory taskDir : listAllTaskDirectories()) {
            final String dirName = taskDir.file().getName();
            final TaskId id = parseTaskDirectoryName(dirName, taskDir.namedTopology());
            try {
                log.info("{} Deleting task directory {} for {} as user calling cleanup.",
                    logPrefix(), dirName, id);

                if (lockedTasksToOwner.containsKey(id)) {
                    log.warn("{} Task {} in state directory {} was still locked by {}",
                        logPrefix(), dirName, id, lockedTasksToOwner.get(id));
                }
                Utils.delete(taskDir.file());
            } catch (final IOException exception) {
                log.error(
                    String.format("%s Failed to delete task directory %s for %s with exception:",
                        logPrefix(), dirName, id),
                    exception
                );
                firstException.compareAndSet(null, exception);
            }
        }

        firstException.compareAndSet(null, maybeCleanEmptyNamedTopologyDirs(false));

        final Exception exception = firstException.get();
        if (exception != null) {
            throw exception;
        }
    }

    /**
     * List all of the task directories that are non-empty
     * @return The list of all the non-empty local directories for stream tasks
     */
    List<TaskDirectory> listNonEmptyTaskDirectories() {
        return listTaskDirectories(pathname -> {
            if (!pathname.isDirectory() || !TASK_DIR_PATH_NAME.matcher(pathname.getName()).matches()) {
                return false;
            } else {
                return !taskDirIsEmpty(pathname);
            }
        });
    }

    /**
     * List all of the task directories along with their parent directory if they belong to a named topology
     * @return The list of all the existing local directories for stream tasks
     */
    List<TaskDirectory> listAllTaskDirectories() {
        return listTaskDirectories(pathname -> pathname.isDirectory() && TASK_DIR_PATH_NAME.matcher(pathname.getName()).matches());
    }

    private List<TaskDirectory> listTaskDirectories(final FileFilter filter) {
        final List<TaskDirectory> taskDirectories = new ArrayList<>();
        if (hasPersistentStores && stateDir.exists()) {
            if (hasNamedTopologies) {
                for (final File namedTopologyDir : listNamedTopologyDirs()) {
                    final String namedTopology = parseNamedTopologyFromDirectory(namedTopologyDir.getName());
                    final File[] taskDirs = namedTopologyDir.listFiles(filter);
                    if (taskDirs != null) {
                        taskDirectories.addAll(Arrays.stream(taskDirs)
                            .map(f -> new TaskDirectory(f, namedTopology)).collect(Collectors.toList()));
                    }
                }
            } else {
                final File[] taskDirs =
                    stateDir.listFiles(filter);
                if (taskDirs != null) {
                    taskDirectories.addAll(Arrays.stream(taskDirs)
                                               .map(f -> new TaskDirectory(f, null)).collect(Collectors.toList()));
                }
            }
        }

        return taskDirectories;
    }

    private List<File> listNamedTopologyDirs() {
        final File[] namedTopologyDirectories = stateDir.listFiles(f -> f.getName().startsWith("__") &&  f.getName().endsWith("__"));
        return namedTopologyDirectories != null ? Arrays.asList(namedTopologyDirectories) : Collections.emptyList();
    }

    private String parseNamedTopologyFromDirectory(final String dirName) {
        return dirName.substring(2, dirName.length() - 2);
    }

    private FileLock tryLock(final FileChannel channel) throws IOException {
        try {
            return channel.tryLock();
        } catch (final OverlappingFileLockException e) {
            return null;
        }
    }

    public static class TaskDirectory {
        private final File file;
        private final String namedTopology; // may be null if hasNamedTopologies = false

        TaskDirectory(final File file, final String namedTopology) {
            this.file = file;
            this.namedTopology = namedTopology;
        }

        public File file() {
            return file;
        }

        public String namedTopology() {
            return namedTopology;
        }

        @Override
        public boolean equals(final Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            final TaskDirectory that = (TaskDirectory) o;
            return file.equals(that.file) &&
                Objects.equals(namedTopology, that.namedTopology);
        }

        @Override
        public int hashCode() {
            return Objects.hash(file, namedTopology);
        }
    }

}
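
To show how this class is driven from the outside, below is a minimal usage sketch exercising only the public methods listed above. It is illustrative, not part of the Kafka code base: the class name StateDirectoryDemo, the application id state-dir-demo and the state.dir path are made-up assumptions. The package-private lock()/unlock() methods are called internally by the stream threads and are therefore not used here.

// Minimal usage sketch (illustrative only, not part of StateDirectory.java).
// Assumes kafka-streams is on the classpath; names below are invented for the demo.
import java.io.File;
import java.util.Properties;
import java.util.UUID;

import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.StateDirectory;

public class StateDirectoryDemo {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "state-dir-demo");     // becomes the application sub-directory name
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // required by StreamsConfig, unused here
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams-demo"); // a tmpdir path triggers the warning logged in the constructor above

        // hasPersistentStores = true: directories are created and the state dir is protected by the .lock file
        final StateDirectory stateDirectory =
            new StateDirectory(new StreamsConfig(props), Time.SYSTEM, true, false);

        // Acquires the .lock file and reads or creates kafka-streams-process-metadata
        final UUID processId = stateDirectory.initializeProcessId();
        System.out.println("process id: " + processId);

        // Creates <state.dir>/<application.id>/0_0 on demand and returns its File handle
        final File taskDir = stateDirectory.getOrCreateDirectoryForTask(new TaskId(0, 0));
        System.out.println("task directory: " + taskDir.getAbsolutePath());

        // Deletes unlocked task directories untouched for at least ten minutes,
        // then releases the state directory lock
        stateDirectory.cleanRemovedTasks(10 * 60 * 1000L);
        stateDirectory.close();
    }
}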

Related information

kafka source code directory

Related articles

kafka AbstractProcessorContext source code

kafka AbstractReadOnlyDecorator source code

kafka AbstractReadWriteDecorator source code

kafka AbstractTask source code

kafka ActiveTaskCreator source code

kafka ChangelogReader source code

kafka ChangelogRecordDeserializationHelper source code

kafka ChangelogRegister source code

kafka ChangelogTopics source code

kafka ClientUtils source code
