kafka StreamTask 源码

  • 2022-10-20
  • 浏览 (216)

kafka StreamTask 代码

文件路径:/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.apache.kafka.streams.TopologyConfig.TaskConfig;
import org.apache.kafka.streams.state.internals.ThreadCache;

import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

import static java.util.Collections.singleton;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;

/**
 * A StreamTask is associated with a {@link PartitionGroup}, and is assigned to a StreamThread for processing.
 */
public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, Task {

    // clock handed to the latency-measuring helper when a record is processed
    private final Time time;
    // consumer whose partitions this task pauses/resumes and whose position is queried when committing
    private final Consumer<byte[], byte[]> mainConsumer;

    // we want to abstract eos logic out of StreamTask, however
    // there's still an optimization that requires this info to be
    // leaked into this class, which is to checkpoint after committing if EOS is not enabled.
    private final boolean eosEnabled;

    // queue size at which, after processing a record, consumption of that record's partition is resumed
    private final int maxBufferedSize;
    // buffered records of all input partitions; source of the next record to process
    private final PartitionGroup partitionGroup;
    // flushed when preparing a commit and closed together with the task
    private final RecordCollector recordCollector;
    // reusable holder filled by the partition group with the partition/queue/node of the record being processed
    private final PartitionGroup.RecordInfo recordInfo;
    // offset of the most recently processed record per partition
    private final Map<TopicPartition, Long> consumedOffsets;
    // seeded with -1 for every input partition in the constructor
    private final Map<TopicPartition, Long> committedOffsets;
    // seeded with -1 for every input partition in the constructor
    private final Map<TopicPartition, Long> highWatermark;
    // partitions registered via addPartitionsForOffsetReset()
    private final Set<TopicPartition> resetOffsetsForPartitions;
    private final PunctuationQueue streamTimePunctuationQueue;
    private final PunctuationQueue systemTimePunctuationQueue;
    private final StreamsMetricsImpl streamsMetrics;

    // processing time accumulated by recordProcessBatchTime() and reset whenever the process ratio is recorded
    private long processTimeMs = 0L;

    private final Sensor closeTaskSensor;
    private final Sensor processRatioSensor;
    private final Sensor processLatencySensor;
    private final Sensor punctuateLatencySensor;
    private final Sensor bufferedRecordsSensor;
    // end-to-end latency sensors keyed by node name; populated for terminal and source nodes
    private final Map<String, Sensor> e2eLatencySensors = new HashMap<>();

    private final RecordQueueCreator recordQueueCreator;

    @SuppressWarnings("rawtypes")
    protected final InternalProcessorContext processorContext;

    // record currently being processed; null when no record is in flight
    private StampedRecord record;
    // set once a record has been processed; reset by clearCommitStatuses()
    private boolean commitNeeded = false;
    // reset by clearCommitStatuses()
    private boolean commitRequested = false;
    // set to eosEnabled when a commit is prepared; while true the task is not processable
    private boolean hasPendingTxCommit = false;
    // wall-clock time at which isProcessable() first reported "not ready"; empty while the task is ready
    private Optional<Long> timeCurrentIdlingStarted;

    /**
     * Creates an active stream task in its initial state.
     *
     * <p>The constructor switches the given processor context to active mode, creates the task-level
     * and node-level sensors, builds one record queue per input partition and registers the topology's
     * global state stores with the state manager.
     *
     * @param id                task id
     * @param inputPartitions   input partitions assigned to this task
     * @param topology          processor topology executed by this task
     * @param mainConsumer      consumer used for pausing/resuming partitions and looking up lag and positions
     * @param config            task config supplying the EOS flag, buffer size, idle time, timestamp extractor
     *                          and deserialization exception handler
     * @param streamsMetrics    metrics registry in which the sensors are created
     * @param stateDirectory    state directory passed on to the parent class
     * @param cache             thread cache handed to the processor context
     * @param time              clock used for latency measurements
     * @param stateMgr          state manager of this task
     * @param recordCollector   record collector of this task
     * @param processorContext  processor context to be transitioned to active
     * @param logContext        log context used by the partition group
     */
    @SuppressWarnings("rawtypes")
    public StreamTask(final TaskId id,
                      final Set<TopicPartition> inputPartitions,
                      final ProcessorTopology topology,
                      final Consumer<byte[], byte[]> mainConsumer,
                      final TaskConfig config,
                      final StreamsMetricsImpl streamsMetrics,
                      final StateDirectory stateDirectory,
                      final ThreadCache cache,
                      final Time time,
                      final ProcessorStateManager stateMgr,
                      final RecordCollector recordCollector,
                      final InternalProcessorContext processorContext,
                      final LogContext logContext) {
        super(
            id,
            topology,
            stateDirectory,
            stateMgr,
            inputPartitions,
            config,
            "task",
            StreamTask.class
        );
        this.mainConsumer = mainConsumer;

        this.processorContext = processorContext;
        processorContext.transitionToActive(this, recordCollector, cache);

        this.time = time;
        this.recordCollector = recordCollector;
        this.eosEnabled = config.eosEnabled;

        // the name of the constructing thread is used as the thread id of all sensors created below
        final String threadId = Thread.currentThread().getName();
        this.streamsMetrics = streamsMetrics;
        closeTaskSensor = ThreadMetrics.closeTaskSensor(threadId, streamsMetrics);
        final String taskId = id.toString();
        processRatioSensor = TaskMetrics.activeProcessRatioSensor(threadId, taskId, streamsMetrics);
        processLatencySensor = TaskMetrics.processLatencySensor(threadId, taskId, streamsMetrics);
        punctuateLatencySensor = TaskMetrics.punctuateSensor(threadId, taskId, streamsMetrics);
        bufferedRecordsSensor = TaskMetrics.activeBufferedRecordsSensor(threadId, taskId, streamsMetrics);

        // end-to-end latency is tracked for every terminal node ...
        for (final String terminalNodeName : topology.terminalNodes()) {
            e2eLatencySensors.put(
                terminalNodeName,
                ProcessorNodeMetrics.e2ELatencySensor(threadId, taskId, terminalNodeName, streamsMetrics)
            );
        }

        // ... and for every source node
        for (final ProcessorNode<?, ?, ?, ?> sourceNode : topology.sources()) {
            final String sourceNodeName = sourceNode.name();
            e2eLatencySensors.put(
                sourceNodeName,
                ProcessorNodeMetrics.e2ELatencySensor(threadId, taskId, sourceNodeName, streamsMetrics)
            );
        }

        streamTimePunctuationQueue = new PunctuationQueue();
        systemTimePunctuationQueue = new PunctuationQueue();
        maxBufferedSize = config.maxBufferedSize;

        // initialize the consumed and committed offset cache
        consumedOffsets = new HashMap<>();
        resetOffsetsForPartitions = new HashSet<>();

        // must be assigned before createPartitionQueues() is invoked below
        recordQueueCreator = new RecordQueueCreator(this.logContext, config.timestampExtractor, config.deserializationExceptionHandler);

        recordInfo = new PartitionGroup.RecordInfo();

        final Sensor enforcedProcessingSensor;
        enforcedProcessingSensor = TaskMetrics.enforcedProcessingSensor(threadId, taskId, streamsMetrics);
        final long maxTaskIdleMs = config.maxTaskIdleMs;
        partitionGroup = new PartitionGroup(
            logContext,
            createPartitionQueues(),
            mainConsumer::currentLag,
            TaskMetrics.recordLatenessSensor(threadId, taskId, streamsMetrics),
            enforcedProcessingSensor,
            maxTaskIdleMs
        );

        stateMgr.registerGlobalStateStores(topology.globalStateStores());
        committedOffsets = new HashMap<>();
        highWatermark = new HashMap<>();
        // -1 is the initial value stored for every input partition
        for (final TopicPartition topicPartition: inputPartitions) {
            committedOffsets.put(topicPartition, -1L);
            highWatermark.put(topicPartition, -1L);
        }
        timeCurrentIdlingStarted = Optional.empty();
    }

    /**
     * Builds one record queue per assigned input partition by delegating to the
     * {@code recordQueueCreator}.
     *
     * @return a new mutable map from each input partition to its freshly created queue
     */
    private Map<TopicPartition, RecordQueue> createPartitionQueues() {
        final Map<TopicPartition, RecordQueue> queuesByPartition = new HashMap<>();
        inputPartitions().forEach(
            topicPartition -> queuesByPartition.put(topicPartition, recordQueueCreator.createQueue(topicPartition))
        );
        return queuesByPartition;
    }

    /**
     * Reports that this task is an active task.
     *
     * @return always {@code true}
     */
    @Override
    public boolean isActive() {
        return true;
    }

    /**
     * Initializes the task if it is still in {@code CREATED} state; in any other state this is a no-op.
     * Initialization sets up the record collector, registers the state stores and moves the task
     * to {@code RESTORING}.
     *
     * @throws LockException    could happen when multi-threads within the single instance, could retry
     * @throws TimeoutException if initializing record collector timed out
     * @throws StreamsException fatal error, should close the thread
     */
    @Override
    public void initializeIfNeeded() {
        if (state() != State.CREATED) {
            return;
        }

        recordCollector.initialize();
        StateManagerUtil.registerStateStores(log, logPrefix, topology, stateMgr, stateDirectory, processorContext);

        // start from an empty snapshot so that a checkpoint is written right after loading:
        // without EOS the checkpoint file is not deleted after loading, and with EOS no checkpoint
        // is written while running anyway
        offsetSnapshotSinceLastFlush = Collections.emptyMap();

        transitionTo(State.RESTORING);
        log.info("Initialized");
    }

    /**
     * Pauses the given partitions on the main consumer and remembers them in
     * {@code resetOffsetsForPartitions}.
     *
     * @param partitionsForOffsetReset partitions to pause and to record for an offset reset
     */
    public void addPartitionsForOffsetReset(final Set<TopicPartition> partitionsForOffsetReset) {
        mainConsumer.pause(partitionsForOffsetReset);
        resetOffsetsForPartitions.addAll(partitionsForOffsetReset);
    }

    /**
     * Finishes restoration of a {@code RESTORING} task and moves it to {@code RUNNING}.
     * Calling this on a task that is already {@code RUNNING} does nothing.
     *
     * @param offsetResetter callback handed to the offset reset / metadata initialization step
     * @throws TimeoutException      if fetching committed offsets timed out
     * @throws IllegalStateException if the task is {@code CREATED}, {@code SUSPENDED} or {@code CLOSED}
     */
    @Override
    public void completeRestoration(final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
        switch (state()) {
            case CREATED:
            case SUSPENDED:
            case CLOSED:
                throw new IllegalStateException("Illegal state " + state() + " while completing restoration for active task " + id);

            case RUNNING:
                // restoration was completed earlier
                return;

            case RESTORING:
                resetOffsetsIfNeededAndInitializeMetadata(offsetResetter);
                initializeTopology();
                processorContext.initialize();

                transitionTo(State.RUNNING);
                log.info("Restored and ready to run");
                return;

            default:
                throw new IllegalStateException("Unknown state " + state() + " while completing restoration for active task " + id);
        }
    }

    /**
     * Suspends the task. A {@code RUNNING} task additionally closes its topology and drops its
     * buffered records; an already {@code SUSPENDED} task is left untouched.
     *
     * @throws IllegalStateException if the task is {@code CLOSED}
     */
    @Override
    public void suspend() {
        switch (state()) {
            case CLOSED:
                throw new IllegalStateException("Illegal state " + state() + " while suspending active task " + id);

            case SUSPENDED:
                log.info("Skip suspending since state is {}", state());
                break;

            case CREATED:
            case RESTORING:
                transitToSuspend();
                break;

            case RUNNING:
                // the finally block guarantees the transition to SUSPENDED even if user code
                // throws in `Processor#close()`
                try {
                    closeTopology();

                    // buffered records are dropped here because after resuming the consumer
                    // re-fetches them starting from the committed position
                    partitionGroup.clear();
                } finally {
                    transitToSuspend();
                }
                break;

            default:
                throw new IllegalStateException("Unknown state " + state() + " while suspending active task " + id);
        }
    }

    /**
     * Closes every processor node of the topology.
     *
     * <p>{@code close()} is invoked on each node even if an earlier node failed with a
     * {@link RuntimeException}. After all nodes have been visited, the most recently caught
     * exception is rethrown; any exceptions caught before it are attached to it as suppressed
     * exceptions so that no failure is lost.
     *
     * @throws RuntimeException the last exception thrown by a node's {@code close()}, if any
     */
    @SuppressWarnings("unchecked")
    private void closeTopology() {
        log.trace("Closing processor topology");

        RuntimeException exception = null;
        for (final ProcessorNode<?, ?, ?, ?> node : topology.processors()) {
            processorContext.setCurrentNode(node);
            try {
                node.close();
            } catch (final RuntimeException e) {
                // keep earlier failures reachable instead of silently overwriting them;
                // the identity check avoids self-suppression, which Throwable rejects
                if (exception != null && exception != e) {
                    e.addSuppressed(exception);
                }
                exception = e;
            } finally {
                processorContext.setCurrentNode(null);
            }
        }

        if (exception != null) {
            throw exception;
        }
    }

    /**
     * Resumes the task. Only a {@code SUSPENDED} task changes state (to {@code RESTORING});
     * tasks in {@code CREATED}, {@code RUNNING} or {@code RESTORING} are left as they are.
     * In all non-failing cases the idling start time is reset.
     *
     * @throws IllegalStateException if the task is {@code CLOSED}
     */
    @Override
    public void resume() {
        switch (state()) {
            case CLOSED:
                throw new IllegalStateException("Illegal state " + state() + " while resuming active task " + id);

            case SUSPENDED:
                // only the state changes here: for the inner modules suspended and restoring
                // are not actually any different

                // the checkpoint file is deleted before moving to RESTORING (KAFKA-10362);
                // a failure to delete it is logged and does not abort the resume
                try {
                    stateMgr.deleteCheckPointFileIfEOSEnabled();
                    log.debug("Deleted check point file upon resuming with EOS enabled");
                } catch (final IOException ioe) {
                    log.error("Encountered error while deleting the checkpoint file due to this exception", ioe);
                }

                transitionTo(State.RESTORING);
                log.info("Resumed to restoring state");
                break;

            case CREATED:
            case RUNNING:
            case RESTORING:
                // nothing to do, the task simply continues in its current state
                log.trace("Skip resuming since state is {}", state());
                break;

            default:
                throw new IllegalStateException("Unknown state " + state() + " while resuming active task " + id);
        }

        timeCurrentIdlingStarted = Optional.empty();
    }

    /**
     * Prepares the task for a commit. If a commit is needed, the store caches and the record
     * collector are flushed and the committable offsets are returned; otherwise nothing is
     * flushed and an empty map is returned.
     *
     * @throws StreamsException fatal error that should cause the thread to die
     * @throws TaskMigratedException recoverable error that would cause the task to be removed
     * @throws IllegalStateException if the task is {@code CLOSED}
     * @return offsets that should be committed for this task
     */
    @Override
    public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
        switch (state()) {
            case CLOSED:
                throw new IllegalStateException("Illegal state " + state() + " while preparing active task " + id + " for committing");

            case CREATED:
            case RESTORING:
            case RUNNING:
            case SUSPENDED:
                // commitNeeded only tells whether RUNNING was reached and new data was processed, i.e.
                // whether the record collector has to be flushed; the state manager should always be
                // flushed, since either there is newly restored data or the flush is a no-op
                if (!commitNeeded) {
                    log.debug("Skipped preparing {} task for commit since there is nothing to commit", state());
                    return Collections.emptyMap();
                }

                // the store caches go first: flushing them may process cached records and thereby
                // produce further records that the record collector must send out
                //
                // TODO: this should be removed after we decouple caching with emitting
                stateMgr.flushCache();
                recordCollector.flush();
                hasPendingTxCommit = eosEnabled;

                log.debug("Prepared {} task for committing", state());
                return committableOffsetsAndMetadata();

            default:
                throw new IllegalStateException("Unknown state " + state() + " while preparing active task " + id + " for committing");
        }
    }

    /**
     * Determines the offset to commit for a partition: the offset of the head record still
     * buffered for it, or, when nothing is buffered, the main consumer's current position.
     *
     * @param partition the partition to look up
     * @return the offset to commit for {@code partition}
     * @throws IllegalStateException if the position lookup times out, which is treated as a bug
     * @throws StreamsException      if the position lookup fails with any other {@link KafkaException}
     */
    private Long findOffset(final TopicPartition partition) {
        final Long bufferedHeadOffset = partitionGroup.headRecordOffset(partition);
        if (bufferedHeadOffset != null) {
            return bufferedHeadOffset;
        }

        try {
            return mainConsumer.position(partition);
        } catch (final TimeoutException error) {
            // data for this partition has already been processed, so the consumer should hold a
            // valid local position and `consumer.position()` should return without blocking;
            // a timeout therefore points to a bug and is surfaced as a fatal `IllegalStateException`
            throw new IllegalStateException(error);
        } catch (final KafkaException fatal) {
            throw new StreamsException(fatal);
        }
    }

    /**
     * Computes the offsets and metadata to commit for the task's current state.
     *
     * <p>{@code CREATED} and {@code RESTORING} tasks have nothing to commit. For {@code RUNNING}
     * and {@code SUSPENDED} tasks, each partition to commit is mapped to its offset (see
     * {@code findOffset}) together with encoded metadata consisting of the partition time and
     * the processor metadata.
     *
     * @return the committable offsets, possibly empty
     * @throws IllegalStateException if the task is {@code CLOSED} or in an unknown state
     */
    private Map<TopicPartition, OffsetAndMetadata> committableOffsetsAndMetadata() {
        final Map<TopicPartition, OffsetAndMetadata> committableOffsets;

        switch (state()) {
            case CREATED:
            case RESTORING:
                committableOffsets = Collections.emptyMap();

                break;

            case RUNNING:
            case SUSPENDED:
                final Map<TopicPartition, Long> partitionTimes = extractPartitionTimes();

                // If there's processor metadata to be committed. We need to commit them to all
                // input partitions
                final Set<TopicPartition> partitionsNeedCommit = processorContext.getProcessorMetadata().needsCommit() ?
                    inputPartitions() : consumedOffsets.keySet();
                committableOffsets = new HashMap<>(partitionsNeedCommit.size());

                for (final TopicPartition partition : partitionsNeedCommit) {
                    final Long offset = findOffset(partition);
                    // NOTE(review): unboxing assumes every partition to commit is known to the
                    // partition group; a missing entry would raise a NullPointerException here
                    final long partitionTime = partitionTimes.get(partition);
                    committableOffsets.put(partition, new OffsetAndMetadata(offset,
                        new TopicPartitionMetadata(partitionTime, processorContext.getProcessorMetadata()).encode()));
                }
                break;

            case CLOSED:
                throw new IllegalStateException("Illegal state " + state() + " while getting committable offsets for active task " + id);

            default:
                // the message names this operation, matching the CLOSED branch above
                throw new IllegalStateException("Unknown state " + state() + " while getting committable offsets for active task " + id);
        }

        return committableOffsets;
    }

    /**
     * Finalizes a commit: possibly writes a checkpoint, depending on the task state, and then
     * resets all commit-related flags.
     *
     * @param enforceCheckpoint whether a checkpoint is requested regardless of other conditions
     * @throws IllegalStateException if the task is {@code CLOSED}
     */
    @Override
    public void postCommit(final boolean enforceCheckpoint) {
        switch (state()) {
            case CLOSED:
                throw new IllegalStateException("Illegal state " + state() + " while post committing active task " + id);

            case CREATED:
                // never checkpoint a CREATED task: doing so could overwrite an existing
                // checkpoint with empty uninitialized offsets
                log.debug("Skipped writing checkpoint for {} task", state());
                break;

            case RUNNING:
                // while running, a checkpoint is only attempted without EOS or when enforced
                if (!eosEnabled || enforceCheckpoint) {
                    maybeCheckpoint(enforceCheckpoint);
                }
                log.debug("Finalized commit for {} task with eos {} enforce checkpoint {}", state(), eosEnabled, enforceCheckpoint);
                break;

            case RESTORING:
            case SUSPENDED:
                maybeCheckpoint(enforceCheckpoint);
                log.debug("Finalized commit for {} task with enforce checkpoint {}", state(), enforceCheckpoint);
                break;

            default:
                throw new IllegalStateException("Unknown state " + state() + " while post committing active task " + id);
        }

        clearCommitStatuses();
    }

    /**
     * Resets all commit-related flags of this task and marks the processor metadata as not
     * needing a commit.
     */
    private void clearCommitStatuses() {
        commitNeeded = false;
        commitRequested = false;
        hasPendingTxCommit = false;
        processorContext.getProcessorMetadata().setNeedsCommit(false);
    }

    /**
     * Snapshots the current partition timestamp of every partition in the partition group.
     *
     * @return a new mutable map from partition to its partition timestamp
     */
    private Map<TopicPartition, Long> extractPartitionTimes() {
        final Map<TopicPartition, Long> timestampsByPartition = new HashMap<>();
        partitionGroup.partitions().forEach(
            topicPartition -> timestampsByPartition.put(topicPartition, partitionGroup.partitionTimestamp(topicPartition))
        );
        return timestampsByPartition;
    }

    /**
     * Closes the task cleanly: verifies that no uncommitted data is pending, removes the task's
     * sensors, resets the commit flags and then closes with {@code clean = true}.
     *
     * @throws TaskMigratedException if a commit is still needed (see {@code validateClean})
     */
    @Override
    public void closeClean() {
        validateClean();
        removeAllSensors();
        clearCommitStatuses();
        close(true);
        log.info("Closed clean");
    }

    /**
     * Closes the task dirty: removes the task's sensors, resets the commit flags and then closes
     * with {@code clean = false}. Unlike {@code closeClean()}, pending uncommitted data is not checked.
     */
    @Override
    public void closeDirty() {
        removeAllSensors();
        clearCommitStatuses();
        close(false);
        log.info("Closed dirty");
    }

    /**
     * Replaces the task's input partitions, updates the partition group accordingly (creating
     * queues through the {@code recordQueueCreator}) and flags the processor metadata as needing a commit.
     *
     * @param topicPartitions                the new set of input partitions
     * @param allTopologyNodesToSourceTopics node-to-source-topics mapping forwarded to the parent class
     */
    @Override
    public void updateInputPartitions(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> allTopologyNodesToSourceTopics) {
        super.updateInputPartitions(topicPartitions, allTopologyNodesToSourceTopics);
        partitionGroup.updatePartitions(topicPartitions, recordQueueCreator::createQueue);
        processorContext.getProcessorMetadata().setNeedsCommit(true);
    }

    /**
     * Prepares a {@code SUSPENDED} task for recycling: recycles the state manager, closes the
     * partition group and the record collector, and transitions the task to {@code CLOSED}.
     *
     * <p>Note that the clean-validation, sensor removal and commit-flag reset run before the
     * state is checked.
     *
     * @throws TaskMigratedException if a commit is still needed (see {@code validateClean})
     * @throws IllegalStateException if the task is in any state other than {@code SUSPENDED}
     */
    @Override
    public void prepareRecycle() {
        validateClean();
        removeAllSensors();
        clearCommitStatuses();
        switch (state()) {
            case SUSPENDED:
                stateMgr.recycle();
                partitionGroup.close();
                recordCollector.closeClean();

                break;

            case CREATED:
            case RESTORING:
            case RUNNING:
            case CLOSED:
                throw new IllegalStateException("Illegal state " + state() + " while recycling active task " + id);
            default:
                throw new IllegalStateException("Unknown state " + state() + " while recycling active task " + id);
        }

        closeTaskSensor.record();
        transitionTo(State.CLOSED);

        log.info("Closed and recycled state");
    }

    /**
     * Refreshes the changelog offsets in the state manager when a commit is needed or a
     * checkpoint is enforced, then delegates the checkpoint decision to the parent class.
     *
     * The following exceptions maybe thrown from the state manager flushing call
     *
     * @param enforceCheckpoint whether a checkpoint is requested regardless of other conditions
     * @throws TaskMigratedException recoverable error sending changelog records that would cause the task to be removed
     * @throws StreamsException fatal error when flushing the state store, for example sending changelog records failed
     *                          or flushing state store get IO errors; such error should cause the thread to die
     */
    @Override
    public void maybeCheckpoint(final boolean enforceCheckpoint) {
        // commitNeeded indicates we may have processed some records since last commit
        // and hence we need to refresh checkpointable offsets regardless whether we should checkpoint or not
        if (commitNeeded || enforceCheckpoint) {
            stateMgr.updateChangelogOffsets(checkpointableOffsets());
        }

        super.maybeCheckpoint(enforceCheckpoint);
    }

    /**
     * Ensures the task has no pending uncommitted data before it is closed clean or recycled.
     *
     * @throws TaskMigratedException if a commit is still needed, which forces a dirty close instead
     */
    private void validateClean() {
        if (!commitNeeded) {
            return;
        }

        // A commit may have failed during handleRevocation and been "forgotten" by the time
        // closeClean is attempted in handleAssignment; throwing here makes the TaskManager closeDirty
        log.debug("Tried to close clean but there was pending uncommitted data, this means we failed to"
                      + " commit and should close as dirty instead");
        throw new TaskMigratedException("Tried to close dirty task as clean");
    }

    /**
     * Removes all task-level sensors of this task, plus the node-level sensors of every node
     * that has an end-to-end latency sensor, using the current thread's name as thread id.
     */
    private void removeAllSensors() {
        final String threadName = Thread.currentThread().getName();
        final String taskName = id.toString();

        streamsMetrics.removeAllTaskLevelSensors(threadName, taskName);
        e2eLatencySensors.keySet().forEach(
            nodeName -> streamsMetrics.removeAllNodeLevelSensors(threadName, taskName, nodeName)
        );
    }

    /**
     * Closes a {@code SUSPENDED} task and transitions it to {@code CLOSED}; a task that is
     * already {@code CLOSED} is skipped.
     *
     * You must commit a task and checkpoint the state manager before closing as this will release the state dir lock
     *
     * @param clean whether this is a clean close; forwarded to {@code TaskManager.executeAndMaybeSwallow}
     *              for each close step and used to pick the record collector's clean or dirty close
     * @throws IllegalStateException if the task is {@code CREATED}, {@code RESTORING} or {@code RUNNING}
     */
    private void close(final boolean clean) {
        switch (state()) {
            case SUSPENDED:
                TaskManager.executeAndMaybeSwallow(
                    clean,
                    partitionGroup::close,
                    "partition group close",
                    log
                );

                // close the state manager first (it is idempotent) and the record collector second:
                // if the latter throws and the task is subsequently re-closed dirty, the state manager
                // would be closed a second time
                TaskManager.executeAndMaybeSwallow(
                    clean,
                    () -> StateManagerUtil.closeStateManager(
                        log,
                        logPrefix,
                        clean,
                        eosEnabled,
                        stateMgr,
                        stateDirectory,
                        TaskType.ACTIVE
                    ),
                    "state manager close",
                    log);

                TaskManager.executeAndMaybeSwallow(
                    clean,
                    clean ? recordCollector::closeClean : recordCollector::closeDirty,
                    "record collector close",
                    log
                );

                break;

            case CLOSED:
                log.trace("Skip closing since state is {}", state());
                return;

            case CREATED:
            case RESTORING:
            case RUNNING:
                throw new IllegalStateException("Illegal state " + state() + " while closing active task " + id);

            default:
                throw new IllegalStateException("Unknown state " + state() + " while closing active task " + id);
        }

        // drop any in-flight record before marking the task as closed
        record = null;
        closeTaskSensor.record();

        transitionTo(State.CLOSED);
    }

    /**
     * Tells whether the task may process a record right now. A {@code CLOSED} task and a task
     * with a pending transactional commit are never processable; otherwise the decision is
     * delegated to the partition group. As a side effect, the time at which the task started
     * idling is tracked: it is set when the task first becomes not ready and cleared once it is ready.
     *
     * @param wallClockTime the current wall-clock time
     * @return {@code true} if the task can process a record
     */
    public boolean isProcessable(final long wallClockTime) {
        if (state() == State.CLOSED) {
            // a task is only closing / closed when 1) task manager is closing, 2) a rebalance is undergoing;
            // either way it is enough to log and move on without notifying the thread, because the consumer
            // will soon stop returning records for this task
            log.info("Stream task {} is already in {} state, skip processing it.", id(), state());
            return false;
        }

        // a pending TX commit must be retried before any further record is processed,
        // even if the record queue holds available data
        if (hasPendingTxCommit) {
            return false;
        }

        final boolean ready = partitionGroup.readyToProcess(wallClockTime);
        if (ready) {
            timeCurrentIdlingStarted = Optional.empty();
        } else if (!timeCurrentIdlingStarted.isPresent()) {
            timeCurrentIdlingStarted = Optional.of(wallClockTime);
        }
        return ready;
    }

    /**
     * Process one record.
     *
     * <p>If no record is currently in flight, the next record is fetched from the partition group,
     * provided the task is processable. Corrupted records are skipped but still count as consumed.
     * After the record has been handled, its offset is stored as consumed, a commit is flagged as
     * needed, and the partition is resumed on the main consumer if its queue size has dropped to
     * the buffer threshold.
     *
     * <p>On a {@link TimeoutException} without EOS the in-flight record is kept so that the same
     * record is picked up again by the next call; in every other failure case the record is dropped.
     *
     * @param wallClockTime the current wall-clock time
     * @return true if this method processes a record, false if it does not process a record.
     * @throws TaskMigratedException  if the task producer got fenced (EOS only)
     * @throws TimeoutException       if processing timed out and EOS is disabled
     * @throws TaskCorruptedException if processing timed out and EOS is enabled
     * @throws StreamsException       for any other failure; non-Streams runtime exceptions are wrapped
     *                                with the task, processor and record coordinates in the message
     */
    @SuppressWarnings("unchecked")
    public boolean process(final long wallClockTime) {
        if (record == null) {
            if (!isProcessable(wallClockTime)) {
                return false;
            }

            // get the next record to process
            record = partitionGroup.nextRecord(recordInfo, wallClockTime);

            // if there is no record to process, return immediately
            if (record == null) {
                return false;
            }
        }

        try {
            final TopicPartition partition = recordInfo.partition();

            if (!(record instanceof CorruptedRecord)) {
                doProcess(wallClockTime);
            }

            // update the consumed offset map after processing is done
            consumedOffsets.put(partition, record.offset());
            commitNeeded = true;

            // after processing this record, if its partition queue's buffered size has been
            // decreased to the threshold, we can then resume the consumption on this partition
            if (recordInfo.queue().size() == maxBufferedSize) {
                mainConsumer.resume(singleton(partition));
            }

            record = null;
        } catch (final TimeoutException timeoutException) {
            if (!eosEnabled) {
                throw timeoutException;
            } else {
                record = null;
                throw new TaskCorruptedException(Collections.singleton(id));
            }
        } catch (final StreamsException exception) {
            record = null;
            throw exception;
        } catch (final RuntimeException e) {
            // the failure may have happened while no processor node was set (e.g. for a skipped
            // corrupted record); resolve the name null-safely so that building the error message
            // cannot itself throw and mask the original exception
            final String processorName =
                processorContext.currentNode() == null ? null : processorContext.currentNode().name();
            final StreamsException error = new StreamsException(
                String.format(
                    "Exception caught in process. taskId=%s, processor=%s, topic=%s, partition=%d, offset=%d, stacktrace=%s",
                    id(),
                    processorName,
                    record.topic(),
                    record.partition(),
                    record.offset(),
                    getStacktraceString(e)
                ),
                e
            );
            record = null;

            throw error;
        } finally {
            processorContext.setCurrentNode(null);
        }

        return true;
    }

    /**
     * Hands the in-flight {@code record} to the node referenced by {@code recordInfo}.
     *
     * <p>A record context is built from the record's timestamp, offset, partition, topic and headers
     * and installed on the processor context; the end-to-end latency is possibly recorded for the
     * node; and the node's {@code process} call is wrapped in the latency-measuring helper using the
     * process latency sensor.
     *
     * @param wallClockTime the current wall-clock time
     */
    @SuppressWarnings("unchecked")
    private void doProcess(final long wallClockTime) {
        // process the record by passing to the source node of the topology
        final ProcessorNode<Object, Object, Object, Object> currNode = (ProcessorNode<Object, Object, Object, Object>) recordInfo.node();
        log.trace("Start processing one record [{}]", record);

        final ProcessorRecordContext recordContext = new ProcessorRecordContext(
            record.timestamp,
            record.offset(),
            record.partition(),
            record.topic(),
            record.headers()
        );
        updateProcessorContext(currNode, wallClockTime, recordContext);

        maybeRecordE2ELatency(record.timestamp, wallClockTime, currNode.name());
        // timestamp and headers are read back from the processor context, which was just updated above
        final Record<Object, Object> toProcess = new Record<>(
            record.key(),
            record.value(),
            processorContext.timestamp(),
            processorContext.headers()
        );
        maybeMeasureLatency(() -> currNode.process(toProcess), time, processLatencySensor);

        log.trace("Completed processing one record [{}]", record);
    }

    /**
     * Adds the duration of one processing batch to the processing time accumulated by this task.
     *
     * @param processBatchTime the duration to add to {@code processTimeMs}
     */
    @Override
    public void recordProcessBatchTime(final long processBatchTime) {
        this.processTimeMs = this.processTimeMs + processBatchTime;
    }

    /**
     * Records the number of buffered records and this task's share of the overall processing time,
     * then resets the accumulated processing time to zero.
     *
     * @param allTaskProcessMs the divisor for the ratio, i.e. the processing time of all tasks
     * @param now              the timestamp passed to the ratio sensor
     */
    @Override
    public void recordProcessTimeRatioAndBufferSize(final long allTaskProcessMs, final long now) {
        final int buffered = partitionGroup.numBuffered();
        bufferedRecordsSensor.record(buffered);

        final double processRatio = (double) processTimeMs / allTaskProcessMs;
        processRatioSensor.record(processRatio, now);

        // start accumulating from scratch for the next measurement window
        processTimeMs = 0L;
    }

    /**
     * Renders the stack trace of the given exception into a string.
     *
     * @param e the exception whose stack trace should be rendered
     * @return the stack trace text, or {@code null} if an {@link IOException} was caught
     *         before the text could be captured
     */
    private String getStacktraceString(final RuntimeException e) {
        String rendered = null;
        try (final StringWriter buffer = new StringWriter();
             final PrintWriter sink = new PrintWriter(buffer)) {
            e.printStackTrace(sink);
            // capture the text before the writers are closed
            rendered = buffer.toString();
        } catch (final IOException closeFailure) {
            log.error("Encountered error extracting stacktrace from this exception", closeFailure);
        }
        return rendered;
    }

    /**
     * Invokes one punctuation callback on behalf of the given processor node.
     * <p>
     * The processor context is set to the node together with a placeholder record context that only
     * carries the punctuation timestamp; the current node is always cleared again afterwards.
     *
     * @param node       the processor node the punctuation belongs to
     * @param timestamp  the punctuation timestamp (either system time or event time)
     * @param type       the punctuation type, used for trace logging
     * @param punctuator the callback to invoke
     * @throws IllegalStateException if the current node is not null
     * @throws TaskMigratedException if the task producer got fenced (EOS only)
     * @throws StreamsException      if the callback throws; a {@code StreamsException} is rethrown as is,
     *                               any other {@code RuntimeException} is wrapped
     */
    @SuppressWarnings("unchecked")
    @Override
    public void punctuate(final ProcessorNode<?, ?, ?, ?> node,
                          final long timestamp,
                          final PunctuationType type,
                          final Punctuator punctuator) {
        if (processorContext.currentNode() != null) {
            final String message = String.format("%sCurrent node is not null", logPrefix);
            throw new IllegalStateException(message);
        }

        // only the timestamp is meaningful while punctuating; everything else is a dummy value:
        // offset -1, partition -1, no topic and empty headers
        final ProcessorRecordContext placeholderContext =
            new ProcessorRecordContext(timestamp, -1L, -1, null, new RecordHeaders());
        updateProcessorContext(node, time.milliseconds(), placeholderContext);

        if (log.isTraceEnabled()) {
            log.trace("Punctuating processor {} with timestamp {} and punctuation type {}", node.name(), timestamp, type);
        }

        try {
            maybeMeasureLatency(() -> node.punctuate(timestamp, punctuator), time, punctuateLatencySensor);
        } catch (final StreamsException alreadyWrapped) {
            throw alreadyWrapped;
        } catch (final RuntimeException unexpected) {
            final String message =
                String.format("%sException caught while punctuating processor '%s'", logPrefix, node.name());
            throw new StreamsException(message, unexpected);
        } finally {
            processorContext.setCurrentNode(null);
        }
    }

    /**
     * Points the processor context at the given node and record context and stores the given
     * wall-clock time as the context's system time.
     *
     * @param currNode      the node to set as the context's current node
     * @param wallClockTime the value to set as the context's system time in milliseconds
     * @param recordContext the record context to install on the processor context
     */
    @SuppressWarnings("unchecked")
    private void updateProcessorContext(final ProcessorNode<?, ?, ?, ?> currNode,
                                        final long wallClockTime,
                                        final ProcessorRecordContext recordContext) {
        processorContext.setRecordContext(recordContext);
        processorContext.setCurrentNode(currNode);
        processorContext.setSystemTimeMs(wallClockTime);
    }

    /**
     * Collects every offset that can be checkpointed (written + consumed) for the state manager.
     * Currently only changelog topic offsets need to be checkpointed.
     * <p>
     * Offsets reported by the record collector take precedence; a consumed offset is only added
     * for partitions the record collector has no entry for.
     *
     * @return a new map holding the merged offsets
     */
    private Map<TopicPartition, Long> checkpointableOffsets() {
        final Map<TopicPartition, Long> merged = new HashMap<>(recordCollector.offsets());
        consumedOffsets.forEach(merged::putIfAbsent);

        log.debug("Checkpointable offsets {}", merged);

        return merged;
    }

    /**
     * Fetches the committed offsets of all input partitions, resolves pending offset resets, and
     * initializes partition time and processor metadata from the committed offset metadata.
     * <p>
     * For each partition in {@code resetOffsetsForPartitions} that has a committed offset, the main
     * consumer seeks to that offset and the partition is removed from the pending set. The partitions
     * still pending afterwards are handed to {@code offsetResetter}, and the pending set is cleared.
     *
     * @param offsetResetter callback that receives the partitions with no committed offset to seek to
     * @throws TimeoutException if fetching the committed offsets timed out; rethrown after logging so
     *                          that the caller can trigger {@code task.timeout.ms}
     * @throws StreamsException if any other {@link KafkaException} is raised
     */
    private void resetOffsetsIfNeededAndInitializeMetadata(final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
        try {
            final Map<TopicPartition, OffsetAndMetadata> offsetsAndMetadata = mainConsumer.committed(inputPartitions());

            for (final Map.Entry<TopicPartition, OffsetAndMetadata> committedEntry : offsetsAndMetadata.entrySet()) {
                if (resetOffsetsForPartitions.contains(committedEntry.getKey())) {
                    final OffsetAndMetadata offsetAndMetadata = committedEntry.getValue();
                    // a null value means nothing was committed for this partition; leave it for the resetter
                    if (offsetAndMetadata != null) {
                        mainConsumer.seek(committedEntry.getKey(), offsetAndMetadata);
                        resetOffsetsForPartitions.remove(committedEntry.getKey());
                    }
                }
            }

            offsetResetter.accept(resetOffsetsForPartitions);
            resetOffsetsForPartitions.clear();

            // only entries that actually carry committed metadata are passed on
            initializeTaskTimeAndProcessorMetadata(offsetsAndMetadata.entrySet().stream()
                .filter(e -> e.getValue() != null)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
            );
        } catch (final TimeoutException timeoutException) {
            // log the exception that was actually encountered (previously the clock object was logged here)
            log.warn(
                "Encountered {} while trying to fetch committed offsets, will retry initializing the metadata in the next loop." +
                    "\nConsider overwriting consumer config {} to a larger value to avoid timeout errors",
                timeoutException.toString(),
                ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);

            // re-throw to trigger `task.timeout.ms`
            throw timeoutException;
        } catch (final KafkaException e) {
            throw new StreamsException(String.format("task [%s] Failed to initialize offsets for %s", id, inputPartitions()), e);
        }
    }

    /**
     * Restores partition time and processor metadata from committed offset metadata.
     * <p>
     * For every entry with non-null metadata, the metadata string is decoded, the partition time of
     * that partition is set to the decoded timestamp, and the decoded processor metadata is merged
     * into a fresh {@code ProcessorMetadata} that is finally installed on the processor context.
     * Input partitions that are absent from the given map are only reported at debug level.
     *
     * @param offsetsAndMetadata committed offsets and metadata keyed by partition
     */
    private void initializeTaskTimeAndProcessorMetadata(final Map<TopicPartition, OffsetAndMetadata> offsetsAndMetadata) {
        final ProcessorMetadata mergedMetadata = new ProcessorMetadata();

        for (final Map.Entry<TopicPartition, OffsetAndMetadata> committed : offsetsAndMetadata.entrySet()) {
            final TopicPartition partition = committed.getKey();
            final OffsetAndMetadata offsetAndMetadata = committed.getValue();

            if (offsetAndMetadata == null) {
                log.debug("No committed timestamp was found in metadata for partition {}", partition);
                continue;
            }

            final TopicPartitionMetadata decoded = TopicPartitionMetadata.decode(offsetAndMetadata.metadata());
            final long committedTimestamp = decoded.partitionTime();
            partitionGroup.setPartitionTime(partition, committedTimestamp);
            log.debug("A committed timestamp was detected: setting the partition time of partition {}"
                + " to {} in stream task {}", partition, committedTimestamp, id);

            mergedMetadata.update(decoded.processorMetadata());
        }
        processorContext.setProcessorMetadata(mergedMetadata);

        // whatever input partition did not show up in the committed map has no committed offset at all
        final Set<TopicPartition> withoutCommit = new HashSet<>(inputPartitions());
        withoutCommit.removeAll(offsetsAndMetadata.keySet());
        withoutCommit.forEach(partition ->
            log.debug("No committed offset for partition {}, therefore no timestamp can be found for this partition", partition));
    }

    /**
     * Computes the offsets up to which repartition topics consumed by this task may be purged.
     *
     * @return a new map with, for every consumed partition whose topic the topology reports as a
     *         repartition topic, the consumed offset plus one
     */
    @Override
    public Map<TopicPartition, Long> purgeableOffsets() {
        final Map<TopicPartition, Long> purgeable = new HashMap<>();
        consumedOffsets.forEach((partition, consumedOffset) -> {
            if (topology.isRepartitionTopic(partition.topic())) {
                purgeable.put(partition, consumedOffset + 1);
            }
        });

        return purgeable;
    }

    /**
     * Initializes the task by initializing every processor node of the topology.
     * <p>
     * Each node is set as the context's current node for the duration of its {@code init} call and
     * the current node is cleared afterwards, even if {@code init} throws.
     */
    @SuppressWarnings("unchecked")
    private void initializeTopology() {
        log.trace("Initializing processor nodes of the topology");
        for (final ProcessorNode<?, ?, ?, ?> processorNode : topology.processors()) {
            processorContext.setCurrentNode(processorNode);
            try {
                processorNode.init(processorContext);
            } finally {
                processorContext.setCurrentNode(null);
            }
        }
    }

    /**
     * Adds records to queues. If a record has an invalid (i.e., negative) timestamp, the record is skipped
     * and not added to the queue for processing.
     * <p>
     * Once the queue of the partition has grown beyond {@code maxBufferedSize}, consumption of that
     * partition is paused on the main consumer.
     *
     * @param partition the partition
     * @param records   the records
     */
    @Override
    public void addRecords(final TopicPartition partition, final Iterable<ConsumerRecord<byte[], byte[]>> records) {
        final int queueSizeAfterAdd = partitionGroup.addRawRecords(partition, records);

        if (log.isTraceEnabled()) {
            log.trace("Added records into the buffered queue of partition {}, new queue size is {}", partition, queueSizeAfterAdd);
        }

        // stop fetching for this partition as soon as its buffer exceeds the configured threshold
        final boolean bufferExceeded = queueSizeAfterAdd > maxBufferedSize;
        if (bufferExceeded) {
            mainConsumer.pause(singleton(partition));
        }
    }

    /**
     * Schedules a punctuation for the processor.
     * <p>
     * The first punctuation time depends on the type: {@code STREAM_TIME} is aligned to 0 so that it
     * fires as soon as there is data, {@code WALL_CLOCK_TIME} is aligned to now and first fires after
     * one interval has elapsed.
     *
     * @param interval   the interval in milliseconds
     * @param type       the punctuation type
     * @param punctuator the callback to schedule
     * @return a handle for the scheduled punctuation
     * @throws IllegalStateException    if the current node is null
     * @throws IllegalArgumentException if the punctuation type is not recognized
     */
    public Cancellable schedule(final long interval, final PunctuationType type, final Punctuator punctuator) {
        final long firstPunctuationTime;
        switch (type) {
            case STREAM_TIME:
                // align punctuation to 0L, punctuate as soon as we have data
                firstPunctuationTime = 0L;
                break;
            case WALL_CLOCK_TIME:
                // align punctuation to now, punctuate after interval has elapsed
                firstPunctuationTime = time.milliseconds() + interval;
                break;
            default:
                throw new IllegalArgumentException("Unrecognized PunctuationType: " + type);
        }
        return schedule(firstPunctuationTime, interval, type, punctuator);
    }

    /**
     * Schedules a punctuation for the processor that is currently set on the processor context.
     *
     * @param startTime  time of the first punctuation
     * @param interval   the interval in milliseconds
     * @param type       the punctuation type, which selects the queue the schedule is added to
     * @param punctuator the callback to schedule
     * @return a handle for the scheduled punctuation
     * @throws IllegalStateException    if the current node is null
     * @throws IllegalArgumentException if the punctuation type is not recognized
     */
    private Cancellable schedule(final long startTime, final long interval, final PunctuationType type, final Punctuator punctuator) {
        if (processorContext.currentNode() == null) {
            final String message = String.format("%sCurrent node is null", logPrefix);
            throw new IllegalStateException(message);
        }

        final PunctuationSchedule punctuationSchedule =
            new PunctuationSchedule(processorContext.currentNode(), startTime, interval, punctuator);

        switch (type) {
            case STREAM_TIME:
                // data driven: first fires once stream-time is known and >= startTime;
                // stream-time is known when we have received at least one record from each input topic
                return streamTimePunctuationQueue.schedule(punctuationSchedule);
            case WALL_CLOCK_TIME:
                // wall-clock driven: first fires when now >= startTime
                return systemTimePunctuationQueue.schedule(punctuationSchedule);
            default:
                throw new IllegalArgumentException("Unrecognized PunctuationType: " + type);
        }
    }

    /**
     * Possibly trigger registered stream-time punctuation functions if
     * current partition group timestamp has reached the defined stamp.
     * Note, this is only called in the presence of new records.
     *
     * @return {@code true} if at least one punctuation was triggered, in which case a commit is
     *         marked as needed; {@code false} otherwise, including when stream time is still unknown
     * @throws TaskMigratedException if the task producer got fenced (EOS only)
     */
    public boolean maybePunctuateStreamTime() {
        final long currentStreamTime = partitionGroup.streamTime();

        // not enough data has been accumulated yet to reason about stream time
        if (currentStreamTime == RecordQueue.UNKNOWN) {
            return false;
        }

        final boolean anyPunctuated =
            streamTimePunctuationQueue.mayPunctuate(currentStreamTime, PunctuationType.STREAM_TIME, this);
        if (anyPunctuated) {
            commitNeeded = true;
        }
        return anyPunctuated;
    }

    /**
     * Possibly trigger registered system-time punctuation functions if
     * current system timestamp has reached the defined stamp.
     * Note, this is called irrespective of the presence of new records.
     *
     * @return {@code true} if at least one punctuation was triggered, in which case a commit is
     *         marked as needed; {@code false} otherwise
     * @throws TaskMigratedException if the task producer got fenced (EOS only)
     */
    public boolean maybePunctuateSystemTime() {
        final long nowMs = time.milliseconds();
        final boolean anyPunctuated =
            systemTimePunctuationQueue.mayPunctuate(nowMs, PunctuationType.WALL_CLOCK_TIME, this);

        if (anyPunctuated) {
            commitNeeded = true;
        }
        return anyPunctuated;
    }

    /**
     * Records the end-to-end latency of a record on the sensor registered for the given node,
     * provided that sensor should record and has metrics.
     *
     * @param recordTimestamp the timestamp of the record
     * @param now             the current time; the recorded latency is {@code now - recordTimestamp}
     * @param nodeName        the name of the node whose sensor is looked up
     * @throws IllegalStateException if no sensor is registered for the node
     */
    void maybeRecordE2ELatency(final long recordTimestamp, final long now, final String nodeName) {
        final Sensor sensor = e2eLatencySensors.get(nodeName);
        if (sensor == null) {
            throw new IllegalStateException("Requested to record e2e latency but could not find sensor for node " + nodeName);
        }

        final boolean recordingActive = sensor.shouldRecord() && sensor.hasMetrics();
        if (recordingActive) {
            sensor.record(now - recordTimestamp, now);
        }
    }

    /**
     * Flags that a commit of the current task's state has been requested.
     */
    void requestCommit() {
        this.commitRequested = true;
    }

    /**
     * Tells whether a commit of the current state has been requested.
     *
     * @return the current value of the commit-requested flag
     */
    @Override
    public boolean commitRequested() {
        return this.commitRequested;
    }

    /**
     * Exposes the processor context used by this task.
     *
     * @return the processor context held by this task
     */
    @SuppressWarnings("rawtypes")
    public InternalProcessorContext processorContext() {
        return this.processorContext;
    }

    /**
     * Produces a string representation containing useful information about a Task,
     * rendered without any indentation. This is useful in debugging scenarios.
     *
     * @return A string representation of the StreamTask instance.
     */
    @Override
    public String toString() {
        final String noIndent = "";
        return toString(noIndent);
    }

    /**
     * Produces a string representation containing useful information about a Task starting with the given indent.
     * This is useful in debugging scenarios.
     * <p>
     * The output lists the task id, then the topology (if present) and finally the assigned input
     * partitions (if any), separated by {@code ", "}.
     *
     * @param indent the prefix put in front of each section
     * @return A string representation of the Task instance.
     */
    public String toString(final String indent) {
        final StringBuilder description = new StringBuilder()
            .append(indent)
            .append("TaskId: ")
            .append(id)
            .append("\n");

        // topology section
        if (topology != null) {
            description.append(indent).append(topology.toString(indent + "\t"));
        }

        // assigned partitions section
        final Set<TopicPartition> assigned = inputPartitions();
        if (assigned != null && !assigned.isEmpty()) {
            description.append(indent).append("Partitions [");
            boolean needsSeparator = false;
            for (final TopicPartition topicPartition : assigned) {
                if (needsSeparator) {
                    description.append(", ");
                }
                description.append(topicPartition);
                needsSeparator = true;
            }
            description.append("]\n");
        }
        return description.toString();
    }

    /**
     * Tells whether this task has anything to commit.
     * <p>
     * If the flag is not already set, the consumer position of every consumed partition is compared
     * with the recorded consumed offset. A position more than one ahead means the consumer skipped
     * over non-data records such as control markers; in that case the consumed offset is advanced
     * to {@code position - 1} and a commit is marked as needed.
     *
     * @return {@code true} if a commit is needed
     * @throws IllegalStateException if fetching the consumer position times out
     * @throws StreamsException      if fetching the consumer position fails with another {@link KafkaException}
     */
    @Override
    public boolean commitNeeded() {
        if (commitNeeded) {
            return true;
        }

        for (final Map.Entry<TopicPartition, Long> consumed : consumedOffsets.entrySet()) {
            final TopicPartition partition = consumed.getKey();
            try {
                final long position = mainConsumer.position(partition);

                // the consumer position is the "next" record to fetch, i.e. normally consumed offset + 1;
                // a larger gap means control records were silently skipped by the consumer
                if (position > consumed.getValue() + 1) {
                    commitNeeded = true;
                    consumed.setValue(position - 1);
                }
            } catch (final TimeoutException timeout) {
                // `consumer.position()` should never block here: data was processed for this partition,
                // so the consumer should have a valid local position that it can return immediately;
                // a timeout therefore indicates a bug and is rethrown as fatal `IllegalStateException`
                throw new IllegalStateException(timeout);
            } catch (final KafkaException fatal) {
                throw new StreamsException(fatal);
            }
        }

        return commitNeeded;
    }

    /**
     * Reports the changelog offsets of this task.
     *
     * @return while the task is {@code RUNNING}, a map from every changelog partition to the
     *         {@code Task.LATEST_OFFSET} sentinel, indicating that the task should be at the end of
     *         the changelog; otherwise an unmodifiable view of the state manager's changelog offsets
     */
    @Override
    public Map<TopicPartition, Long> changelogOffsets() {
        if (state() != State.RUNNING) {
            return Collections.unmodifiableMap(stateMgr.changelogOffsets());
        }

        return changelogPartitions()
            .stream()
            .collect(Collectors.toMap(Function.identity(), tp -> Task.LATEST_OFFSET));
    }

    /**
     * Exposes the committed offsets tracked by this task.
     *
     * @return an unmodifiable view of the committed offsets keyed by partition
     */
    @Override
    public Map<TopicPartition, Long> committedOffsets() {
        final Map<TopicPartition, Long> readOnlyView = Collections.unmodifiableMap(committedOffsets);
        return readOnlyView;
    }

    /**
     * Exposes the end offsets (high watermarks) tracked by this task.
     *
     * @return an unmodifiable view of the high watermarks keyed by partition
     */
    @Override
    public Map<TopicPartition, Long> highWaterMark() {
        final Map<TopicPartition, Long> readOnlyView = Collections.unmodifiableMap(highWatermark);
        return readOnlyView;
    }

    /**
     * Moves the task into the {@code SUSPENDED} state, logging the state it came from, and
     * remembers the current system time as the moment idling started.
     */
    private void transitToSuspend() {
        final State previousState = state();
        log.info("Suspended from {}", previousState);

        transitionTo(State.SUSPENDED);

        final long idlingStartedMs = System.currentTimeMillis();
        timeCurrentIdlingStarted = Optional.of(idlingStartedMs);
    }

    /**
     * Reports when the current idling period started.
     *
     * @return the stored idling start time, as currently held by this task
     */
    @Override
    public Optional<Long> timeCurrentIdlingStarted() {
        return this.timeCurrentIdlingStarted;
    }

    /**
     * Stores the committed offset of the given partition, replacing any previous value.
     *
     * @param topicPartition the partition the offset belongs to
     * @param offset         the committed offset to store
     */
    public void updateCommittedOffsets(final TopicPartition topicPartition, final Long offset) {
        this.committedOffsets.put(topicPartition, offset);
    }

    /**
     * Stores the end offset (high watermark) of the given partition, replacing any previous value.
     *
     * @param topicPartition the partition the offset belongs to
     * @param offset         the end offset to store
     */
    public void updateEndOffsets(final TopicPartition topicPartition, final Long offset) {
        this.highWatermark.put(topicPartition, offset);
    }

    /**
     * Tells whether this task currently has buffered records.
     *
     * @return {@code true} if the number of buffered records is greater than zero
     */
    public boolean hasRecordsQueued() {
        final int buffered = numBuffered();
        return buffered > 0;
    }

    /**
     * Exposes the record collector used by this task.
     *
     * @return the record collector held by this task
     */
    RecordCollector recordCollector() {
        return this.recordCollector;
    }

    // below are visible for testing only

    /**
     * Reports how many records the partition group currently has buffered.
     *
     * @return the number of buffered records
     */
    int numBuffered() {
        return this.partitionGroup.numBuffered();
    }

    /**
     * Reports the stream time of the partition group.
     *
     * @return the current stream time as reported by the partition group
     */
    long streamTime() {
        return this.partitionGroup.streamTime();
    }

    /**
     * Builds {@link RecordQueue} instances for input partitions of the enclosing task, using the
     * task's topology and processor context together with the defaults supplied at construction.
     */
    private class RecordQueueCreator {
        private final LogContext logContext;
        private final TimestampExtractor defaultTimestampExtractor;
        private final DeserializationExceptionHandler defaultDeserializationExceptionHandler;

        /**
         * @param logContext                             the log context handed to every created queue
         * @param defaultTimestampExtractor              the extractor used when a source node defines none
         * @param defaultDeserializationExceptionHandler the handler handed to every created queue
         */
        private RecordQueueCreator(final LogContext logContext,
                                   final TimestampExtractor defaultTimestampExtractor,
                                   final DeserializationExceptionHandler defaultDeserializationExceptionHandler) {
            this.logContext = logContext;
            this.defaultTimestampExtractor = defaultTimestampExtractor;
            this.defaultDeserializationExceptionHandler = defaultDeserializationExceptionHandler;
        }

        /**
         * Creates the record queue for the given partition.
         *
         * @param partition the input partition to create a queue for
         * @return a new queue bound to the source node registered for the partition's topic
         * @throws TopologyException if the topology has no source node for the partition's topic
         */
        public RecordQueue createQueue(final TopicPartition partition) {
            final String topic = partition.topic();
            final SourceNode<?, ?> sourceNode = topology.source(topic);
            if (sourceNode == null) {
                throw new TopologyException(
                        "Topic " + partition.topic() + " is unknown to the topology. " +
                                "This may happen if different KafkaStreams instances of the same application execute different Topologies. " +
                                "Note that Topologies are only identical if all operators are added in the same order."
                );
            }

            // an extractor configured on the source node wins over the default one
            final TimestampExtractor nodeExtractor = sourceNode.getTimestampExtractor();
            final TimestampExtractor effectiveExtractor;
            if (nodeExtractor != null) {
                effectiveExtractor = nodeExtractor;
            } else {
                effectiveExtractor = defaultTimestampExtractor;
            }

            return new RecordQueue(
                    partition,
                    sourceNode,
                    effectiveExtractor,
                    defaultDeserializationExceptionHandler,
                    processorContext,
                    logContext
            );
        }
    }
}

相关信息

kafka 源码目录

相关文章

kafka AbstractProcessorContext 源码

kafka AbstractReadOnlyDecorator 源码

kafka AbstractReadWriteDecorator 源码

kafka AbstractTask 源码

kafka ActiveTaskCreator 源码

kafka ChangelogReader 源码

kafka ChangelogRecordDeserializationHelper 源码

kafka ChangelogRegister 源码

kafka ChangelogTopics 源码

kafka ClientUtils 源码

0  赞