spring-kafka DeadLetterPublishingRecoverer 代码


package org.springframework.kafka.listener;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;

import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.KafkaUtils;
import org.springframework.kafka.support.SendResult;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.kafka.support.serializer.SerializationUtils;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;

/**
 * A {@link ConsumerRecordRecoverer} that publishes a failed record to a dead-letter
 * topic.
 *
 * @author Gary Russell
 * @author Tomaz Fernandes
 * @since 2.2
 */
public class DeadLetterPublishingRecoverer extends ExceptionClassifier implements ConsumerAwareRecordRecoverer {

	/** Default headers function: contributes no additional headers (always returns {@code null}). */
	private static final BiFunction<ConsumerRecord<?, ?>, Exception, Headers> DEFAULT_HEADERS_FUNCTION =
			(rec, ex) -> null;

	/** Logger created for the runtime class of this instance. */
	protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); // NOSONAR

	/** Default resolver: the failed record's topic with ".DLT" appended, same partition. */
	private static final BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>
		DEFAULT_DESTINATION_RESOLVER = (cr, e) -> new TopicPartition(cr.topic() + ".DLT", cr.partition());

	/** Seconds; used for the default partition-info timeout and the default timeout buffer. */
	private static final long FIVE = 5L;

	/** Seconds; used for the default/fallback send-result wait. */
	private static final long THIRTY = 30L;

	/** Header names in use; obtained from the overridable {@link #getHeaderNames()} at field initialization. */
	private final HeaderNames headerNames = getHeaderNames();

	/** Whether publishing runs in a transaction. */
	private final boolean transactional;

	/** Maps a failed record and its exception to the destination topic/partition. */
	private final BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver;

	/** Chooses the template used to send a given outgoing record. */
	private final Function<ProducerRecord<?, ?>, KafkaOperations<?, ?>> templateResolver;

	/** The set of headers that will be added; initially every {@link HeaderNames.HeadersToAdd}. */
	private final EnumSet<HeaderNames.HeadersToAdd> whichHeaders = EnumSet.allOf(HeaderNames.HeadersToAdd.class);

	/** When true, serialized deserialization-exception headers are kept on the published record. */
	private boolean retainExceptionHeader;

	/** Supplies additional headers for the published record. */
	private BiFunction<ConsumerRecord<?, ?>, Exception, Headers> headersFunction = DEFAULT_HEADERS_FUNCTION;

	/** When true (and a consumer is available), the resolved partition is verified before sending. */
	private boolean verifyPartition = true;

	/** Timeout passed to {@code Consumer.partitionsFor} during partition verification. */
	private Duration partitionInfoTimeout = Duration.ofSeconds(FIVE);

	/** Configured wait for the send result; see {@link #determineSendTimeout(KafkaOperations)}. */
	private Duration waitForSendResultTimeout = Duration.ofSeconds(THIRTY);

	/** When false, an "original" header is only added if no header of that name is present yet. */
	private boolean appendOriginalHeaders = true;

	/** When true, publishing waits for the send result and fails if it is an error. */
	private boolean failIfSendResultIsError = true;

	/** When true, a null destination from the resolver causes an exception. */
	private boolean throwIfNoDestinationReturned = false;

	/** Milliseconds handed to {@code KafkaUtils.determineSendTimeout} as the buffer. */
	private long timeoutBuffer = Duration.ofSeconds(FIVE).toMillis();

	/** When true, existing exception headers are removed before the current ones are added. */
	private boolean stripPreviousExceptionHeaders = true;

	/** When true, not-retryable exceptions routed back to the same topic are not republished. */
	private boolean skipSameTopicFatalExceptions = true;

	/** Strategy that writes the exception headers; defaults to this class's own implementation. */
	private ExceptionHeadersCreator exceptionHeadersCreator = this::addExceptionInfoHeaders;

	 * Create an instance with the provided template and a default destination resolving
	 * function that returns a TopicPartition based on the original topic (appended with ".DLT")
	 * from the failed record, and the same partition as the failed record. Therefore the
	 * dead-letter topic must have at least as many partitions as the original topic.
	 * @param template the {@link KafkaOperations} to use for publishing.
	public DeadLetterPublishingRecoverer(KafkaOperations<? extends Object, ? extends Object> template) {

	 * Create an instance with the provided template and destination resolving function,
	 * that receives the failed consumer record and the exception and returns a
	 * {@link TopicPartition}. If the partition in the {@link TopicPartition} is less than
	 * 0, no partition is set when publishing to the topic.
	 * @param template the {@link KafkaOperations} to use for publishing.
	 * @param destinationResolver the resolving function.
	public DeadLetterPublishingRecoverer(KafkaOperations<? extends Object, ? extends Object> template,
			BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {
		this(Collections.singletonMap(Object.class, template), destinationResolver);

	 * Create an instance with the provided templates and a default destination resolving
	 * function that returns a TopicPartition based on the original topic (appended with
	 * ".DLT") from the failed record, and the same partition as the failed record.
	 * Therefore the dead-letter topic must have at least as many partitions as the
	 * original topic. The templates map keys are classes and the value the corresponding
	 * template to use for objects (producer record values) of that type. A
	 * {@link java.util.LinkedHashMap} is recommended when there is more than one
	 * template, to ensure the map is traversed in order. To send records with a null
	 * value, add a template with the {@link Void} class as a key; otherwise the first
	 * template from the map values iterator will be used.
	 * @param templates the {@link KafkaOperations}s to use for publishing.
	public DeadLetterPublishingRecoverer(Map<Class<?>, KafkaOperations<? extends Object, ? extends Object>> templates) {

	 * Create an instance with the provided templates and destination resolving function,
	 * that receives the failed consumer record and the exception and returns a
	 * {@link TopicPartition}. If the partition in the {@link TopicPartition} is less than
	 * 0, no partition is set when publishing to the topic. The templates map keys are
	 * classes and the value the corresponding template to use for objects (producer
	 * record values) of that type. A {@link java.util.LinkedHashMap} is recommended when
	 * there is more than one template, to ensure the map is traversed in order. To send
	 * records with a null value, add a template with the {@link Void} class as a key;
	 * otherwise the first template from the map values iterator will be used.
	 * @param templates the {@link KafkaOperations}s to use for publishing.
	 * @param destinationResolver the resolving function.
	public DeadLetterPublishingRecoverer(Map<Class<?>, KafkaOperations<? extends Object, ? extends Object>> templates,
			BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {

		Assert.isTrue(!ObjectUtils.isEmpty(templates), "At least one template is required");
		Assert.notNull(destinationResolver, "The destinationResolver cannot be null");
		KafkaOperations<?, ?> firstTemplate = templates.values().iterator().next();
		this.templateResolver = templates.size() == 1
				? producerRecord -> firstTemplate
				: producerRecord -> findTemplateForValue(producerRecord.value(), templates);
		this.transactional = firstTemplate.isTransactional();
		Boolean tx = this.transactional;
			.map(t -> t.isTransactional())
			.allMatch(t -> t.equals(tx)), "All templates must have the same setting for transactional");
		this.destinationResolver = destinationResolver;

	/**
	 * Create an instance with a template resolving function that receives the outgoing
	 * producer record and returns a {@link KafkaOperations}, and a flag on whether or not
	 * the publishing from this instance will be transactional. Also receives a
	 * destination resolving function that receives the failed consumer record and the
	 * exception and returns a {@link TopicPartition}. If the partition in the
	 * {@link TopicPartition} is less than 0, no partition is set when publishing to the
	 * topic.
	 * @param templateResolver the function that resolves the {@link KafkaOperations} to use for publishing.
	 * @param transactional whether or not publishing by this instance should be transactional
	 * @param destinationResolver the resolving function.
	 * @since 2.7
	 */
	public DeadLetterPublishingRecoverer(Function<ProducerRecord<?, ?>, KafkaOperations<?, ?>> templateResolver,
										boolean transactional,
										BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {

		Assert.notNull(templateResolver, "The templateResolver cannot be null");
		Assert.notNull(destinationResolver, "The destinationResolver cannot be null");
		this.transactional = transactional;
		this.destinationResolver = destinationResolver;
		this.templateResolver = templateResolver;
	}

	 * Set to true to retain a Java serialized {@link DeserializationException} header. By
	 * default, such headers are removed from the published record, unless both key and
	 * value deserialization exceptions occur, in which case, the DLT_* headers are
	 * created from the value exception and the key exception header is retained.
	 * @param retainExceptionHeader true to retain the
	 * @since 2.5
	public void setRetainExceptionHeader(boolean retainExceptionHeader) {
		this.retainExceptionHeader = retainExceptionHeader;

	 * Set a function which will be called to obtain additional headers to add to the
	 * published record.
	 * @param headersFunction the headers function.
	 * @since 2.5.4
	 * @see #addHeadersFunction(BiFunction)
	public void setHeadersFunction(BiFunction<ConsumerRecord<?, ?>, Exception, Headers> headersFunction) {
		Assert.notNull(headersFunction, "'headersFunction' cannot be null");
		if (!this.headersFunction.equals(DEFAULT_HEADERS_FUNCTION)) {
			this.logger.warn(() -> "Replacing custom headers function: " + this.headersFunction
					+ ", consider using addHeadersFunction() if you need multiple functions");
		this.headersFunction = headersFunction;

	 * Set to false to disable partition verification. When true, verify that the
	 * partition returned by the resolver actually exists. If not, set the
	 * {@link ProducerRecord#partition()} to null, allowing the producer to determine the
	 * destination partition.
	 * @param verifyPartition false to disable.
	 * @since 2.7
	 * @see #setPartitionInfoTimeout(Duration)
	public void setVerifyPartition(boolean verifyPartition) {
		this.verifyPartition = verifyPartition;

	 * Time to wait for partition information when verifying. Default is 5 seconds.
	 * @param partitionInfoTimeout the timeout.
	 * @since 2.7
	 * @see #setVerifyPartition(boolean)
	public void setPartitionInfoTimeout(Duration partitionInfoTimeout) {
		Assert.notNull(partitionInfoTimeout, "'partitionInfoTimeout' cannot be null");
		this.partitionInfoTimeout = partitionInfoTimeout;

	 * Set to false if you don't want to append the current "original" headers (topic,
	 * partition etc.) if they are already present. When false, only the first "original"
	 * headers are retained.
	 * @param appendOriginalHeaders set to false not to replace.
	 * @since 2.7.9
	public void setAppendOriginalHeaders(boolean appendOriginalHeaders) {
		this.appendOriginalHeaders = appendOriginalHeaders;

	 * Set to true to throw an exception if the destination resolver function returns
	 * a null TopicPartition.
	 * @param throwIfNoDestinationReturned true to enable.
	 * @since 2.7
	public void setThrowIfNoDestinationReturned(boolean throwIfNoDestinationReturned) {
		this.throwIfNoDestinationReturned = throwIfNoDestinationReturned;

	 * Set to true to enable waiting for the send result and throw an exception if it fails.
	 * It will wait for the milliseconds specified in waitForSendResultTimeout for the result.
	 * @param failIfSendResultIsError true to enable.
	 * @since 2.7
	 * @see #setWaitForSendResultTimeout(Duration)
	public void setFailIfSendResultIsError(boolean failIfSendResultIsError) {
		this.failIfSendResultIsError = failIfSendResultIsError;

	 * If true, wait for the send result and throw an exception if it fails.
	 * It will wait for the milliseconds specified in waitForSendResultTimeout for the result.
	 * @return true to wait.
	 * @since 2.7.14
	 * @see #setWaitForSendResultTimeout(Duration)
	protected boolean isFailIfSendResultIsError() {
		return this.failIfSendResultIsError;

	 * Set the minimum time to wait for message sending. Default is the producer
	 * configuration {@code delivery.timeout.ms} plus the {@link #setTimeoutBuffer(long)}.
	 * @param waitForSendResultTimeout the timeout.
	 * @since 2.7
	 * @see #setFailIfSendResultIsError(boolean)
	 * @see #setTimeoutBuffer(long)
	public void setWaitForSendResultTimeout(Duration waitForSendResultTimeout) {
		this.waitForSendResultTimeout = waitForSendResultTimeout;

	 * Set the number of milliseconds to add to the producer configuration
	 * {@code delivery.timeout.ms} property to avoid timing out before the Kafka producer.
	 * Default 5000.
	 * @param buffer the buffer.
	 * @since 2.7
	 * @see #setWaitForSendResultTimeout(Duration)
	public void setTimeoutBuffer(long buffer) {
		this.timeoutBuffer = buffer;

	 * The number of milliseconds to add to the producer configuration
	 * {@code delivery.timeout.ms} property to avoid timing out before the Kafka producer.
	 * @return the buffer.
	 * @since 2.7.14
	protected long getTimeoutBuffer() {
		return this.timeoutBuffer;

	 * Set to false to retain previous exception headers as well as headers for the
	 * current exception. Default is true, which means only the current headers are
	 * retained; setting it to false this can cause a growth in record size when a record
	 * is republished many times.
	 * @param stripPreviousExceptionHeaders false to retain all.
	 * @since 2.7.9
	public void setStripPreviousExceptionHeaders(boolean stripPreviousExceptionHeaders) {
		this.stripPreviousExceptionHeaders = stripPreviousExceptionHeaders;

	 * Set to false if you want to forward the record to the same topic even though
	 * the exception is fatal by this class' classification, e.g. to handle this scenario
	 * in a different layer.
	 * @param skipSameTopicFatalExceptions false to forward in this scenario.
	public void setSkipSameTopicFatalExceptions(boolean skipSameTopicFatalExceptions) {
		this.skipSameTopicFatalExceptions = skipSameTopicFatalExceptions;

	 * Set a {@link ExceptionHeadersCreator} implementation to completely take over
	 * setting the exception headers in the output record. Disables all headers that are
	 * set by default.
	 * @param headersCreator the creator.
	 * @since 2.8.4
	public void setExceptionHeadersCreator(ExceptionHeadersCreator headersCreator) {
		Assert.notNull(headersCreator, "'headersCreator' cannot be null");
		this.exceptionHeadersCreator = headersCreator;

	 * True if publishing should run in a transaction.
	 * @return true for transactional.
	 * @since 2.7.14
	protected boolean isTransactional() {
		return this.transactional;

	 * Clear the header inclusion bit for the header name.
	 * @param headers the headers to clear.
	 * @since 2.8.4
	public void excludeHeader(HeaderNames.HeadersToAdd... headers) {
		Assert.notNull(headers, "'headers' cannot be null");
		Assert.noNullElements(headers, "'headers' cannot include null elements");
		for (HeaderNames.HeadersToAdd header : headers) {

	 * Set the header inclusion bit for the header name.
	 * @param headers the headers to set.
	 * @since 2.8.4
	public void includeHeader(HeaderNames.HeadersToAdd... headers) {
		Assert.notNull(headers, "'headers' cannot be null");
		Assert.noNullElements(headers, "'headers' cannot include null elements");
		for (HeaderNames.HeadersToAdd header : headers) {

	 * Add a function which will be called to obtain additional headers to add to the
	 * published record. Functions are called in the order that they are added, and after
	 * any function passed into {@link #setHeadersFunction(BiFunction)}.
	 * @param headersFunction the headers function.
	 * @since 2.8.4
	 * @see #setHeadersFunction(BiFunction)
	public void addHeadersFunction(BiFunction<ConsumerRecord<?, ?>, Exception, Headers> headersFunction) {
		Assert.notNull(headersFunction, "'headersFunction' cannot be null");
		if (this.headersFunction.equals(DEFAULT_HEADERS_FUNCTION)) {
			this.headersFunction = headersFunction;
		else {
			BiFunction<ConsumerRecord<?, ?>, Exception, Headers> toCompose = this.headersFunction;
			this.headersFunction = (rec, ex) -> {
				Headers headers1 = toCompose.apply(rec, ex);
				if (headers1 == null) {
					headers1 = new RecordHeaders();
				Headers headers2 = headersFunction.apply(rec, ex);
				try {
					if (headers2 != null) {
				catch (IllegalStateException isex) {
					headers1 = new RecordHeaders(headers1);
					headers2.forEach(headers1::add); // NOSONAR, never null here
				return headers1;

	public void accept(ConsumerRecord<?, ?> record, @Nullable Consumer<?, ?> consumer, Exception exception) {
		TopicPartition tp = this.destinationResolver.apply(record, exception);
		if (tp == null) {
			maybeThrow(record, exception);
			this.logger.debug(() -> "Recovery of " + KafkaUtils.format(record)
					+ " skipped because destination resolver returned null");
		if (this.skipSameTopicFatalExceptions
				&& tp.topic().equals(record.topic())
				&& !getClassifier().classify(exception)) {
			this.logger.error("Recovery of " + KafkaUtils.format(record)
					+ " skipped because not retryable exception " + exception.toString()
					+ " and the destination resolver routed back to the same topic");
		if (consumer != null && this.verifyPartition) {
			tp = checkPartition(tp, consumer);
		DeserializationException vDeserEx = ListenerUtils.getExceptionFromHeader(record,
				SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, this.logger);
		DeserializationException kDeserEx = ListenerUtils.getExceptionFromHeader(record,
				SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER, this.logger);
		Headers headers = new RecordHeaders(record.headers().toArray());
		addAndEnhanceHeaders(record, exception, vDeserEx, kDeserEx, headers);
		ProducerRecord<Object, Object> outRecord = createProducerRecord(record, tp, headers,
				kDeserEx == null ? null : kDeserEx.getData(), vDeserEx == null ? null : vDeserEx.getData());
		KafkaOperations<Object, Object> kafkaTemplate =
				(KafkaOperations<Object, Object>) this.templateResolver.apply(outRecord);
		sendOrThrow(outRecord, kafkaTemplate, record);

	private void addAndEnhanceHeaders(ConsumerRecord<?, ?> record, Exception exception,
			@Nullable DeserializationException vDeserEx, @Nullable DeserializationException kDeserEx, Headers headers) {

		if (kDeserEx != null) {
			if (!this.retainExceptionHeader) {
			this.exceptionHeadersCreator.create(headers, kDeserEx, true, this.headerNames);
		if (vDeserEx != null) {
			if (!this.retainExceptionHeader) {
			this.exceptionHeadersCreator.create(headers, vDeserEx, false, this.headerNames);
		if (kDeserEx == null && vDeserEx == null) {
			this.exceptionHeadersCreator.create(headers, exception, false, this.headerNames);
		enhanceHeaders(headers, record, exception); // NOSONAR headers are never null

	private void sendOrThrow(ProducerRecord<Object, Object> outRecord,
			@Nullable KafkaOperations<Object, Object> kafkaTemplate, ConsumerRecord<?, ?> inRecord) {

		if (kafkaTemplate != null) {
			send(outRecord, kafkaTemplate, inRecord);
		else {
			throw new IllegalArgumentException("No kafka template returned for record " + outRecord);

	private void maybeThrow(ConsumerRecord<?, ?> record, Exception exception) {
		String message = String.format("No destination returned for record %s and exception %s. " +
				"failIfNoDestinationReturned: %s", KafkaUtils.format(record), exception,
		if (this.throwIfNoDestinationReturned) {
			throw new IllegalArgumentException(message);

	 * Send the record.
	 * @param outRecord the record.
	 * @param kafkaTemplate the template.
	 * @param inRecord the consumer record.
	 * @since 2.7
	protected void send(ProducerRecord<Object, Object> outRecord, KafkaOperations<Object, Object> kafkaTemplate,
			ConsumerRecord<?, ?> inRecord) {

		if (this.transactional && !kafkaTemplate.inTransaction() && !kafkaTemplate.isAllowNonTransactional()) {
			kafkaTemplate.executeInTransaction(t -> {
				publish(outRecord, t, inRecord);
				return null;
		else {
			publish(outRecord, kafkaTemplate, inRecord);

	private TopicPartition checkPartition(TopicPartition tp, Consumer<?, ?> consumer) {
		if (tp.partition() < 0) {
			return tp;
		try {
			List<PartitionInfo> partitions = consumer.partitionsFor(tp.topic(), this.partitionInfoTimeout);
			if (partitions == null) {
				this.logger.debug(() -> "Could not obtain partition info for " + tp.topic());
				return tp;
			boolean anyMatch = partitions.stream().anyMatch(pi -> pi.partition() == tp.partition());
			if (!anyMatch) {
				this.logger.warn(() -> "Destination resolver returned non-existent partition " + tp
						+ ", KafkaProducer will determine partition to use for this topic");
				return new TopicPartition(tp.topic(), -1);
			return tp;
		catch (Exception ex) {
			this.logger.debug(ex, () -> "Could not obtain partition info for " + tp.topic());
			return tp;

	private KafkaOperations<Object, Object> findTemplateForValue(@Nullable Object value,
			Map<Class<?>, KafkaOperations<?, ?>> templates) {

		if (value == null) {
			KafkaOperations<?, ?> operations = templates.get(Void.class);
			if (operations == null) {
				return (KafkaOperations<Object, Object>) templates.values().iterator().next();
			else {
				return (KafkaOperations<Object, Object>) operations;
		Optional<Class<?>> key = templates.keySet()
			.filter((k) -> k.isAssignableFrom(value.getClass()))
		if (key.isPresent()) {
			return (KafkaOperations<Object, Object>) templates.get(key.get());
		this.logger.warn(() -> "Failed to find a template for " + value.getClass() + " attempting to use the last entry");
		return (KafkaOperations<Object, Object>) templates.values()
				.reduce((first,  second) -> second)

	 * Subclasses can override this method to customize the producer record to send to the
	 * DLQ. The default implementation simply copies the key and value from the consumer
	 * record and adds the headers. The timestamp is not set (the original timestamp is in
	 * one of the headers). IMPORTANT: if the partition in the {@link TopicPartition} is
	 * less than 0, it must be set to null in the {@link ProducerRecord}.
	 * @param record the failed record
	 * @param topicPartition the {@link TopicPartition} returned by the destination
	 * resolver.
	 * @param headers the headers - original record headers plus DLT headers.
	 * @param key the key to use instead of the consumer record key.
	 * @param value the value to use instead of the consumer record value.
	 * @return the producer record to send.
	 * @see KafkaHeaders
	protected ProducerRecord<Object, Object> createProducerRecord(ConsumerRecord<?, ?> record,
			TopicPartition topicPartition, Headers headers, @Nullable byte[] key, @Nullable byte[] value) {

		return new ProducerRecord<>(topicPartition.topic(),
				topicPartition.partition() < 0 ? null : topicPartition.partition(),
				key != null ? key : record.key(),
				value != null ? value : record.value(), headers);

	 * Override this if you want more than just logging of the send result.
	 * @param outRecord the record to send.
	 * @param kafkaTemplate the template.
	 * @param inRecord the consumer record.
	 * @since 2.2.5
	protected void publish(ProducerRecord<Object, Object> outRecord, KafkaOperations<Object, Object> kafkaTemplate,
			ConsumerRecord<?, ?> inRecord) {

		CompletableFuture<SendResult<Object, Object>> sendResult = null;
		try {
			sendResult = kafkaTemplate.send(outRecord);
			sendResult.whenComplete((result, ex) -> {
				if (ex == null) {
					this.logger.debug(() -> "Successful dead-letter publication: "
							+ KafkaUtils.format(inRecord) + " to " + result.getRecordMetadata());
				else {
					this.logger.error(ex, () -> pubFailMessage(outRecord, inRecord));
		catch (Exception e) {
			this.logger.error(e, () -> pubFailMessage(outRecord, inRecord));
		if (this.failIfSendResultIsError) {
			verifySendResult(kafkaTemplate, outRecord, sendResult, inRecord);

	 * Wait for the send future to complete.
	 * @param kafkaTemplate the template used to send the record.
	 * @param outRecord the record.
	 * @param sendResult the future.
	 * @param inRecord the original consumer record.
	protected void verifySendResult(KafkaOperations<Object, Object> kafkaTemplate,
			ProducerRecord<Object, Object> outRecord,
			@Nullable CompletableFuture<SendResult<Object, Object>> sendResult, ConsumerRecord<?, ?> inRecord) {

		Duration sendTimeout = determineSendTimeout(kafkaTemplate);
		if (sendResult == null) {
			throw new KafkaException(pubFailMessage(outRecord, inRecord));
		try {
			sendResult.get(sendTimeout.toMillis(), TimeUnit.MILLISECONDS);
		catch (InterruptedException e) {
			throw new KafkaException(pubFailMessage(outRecord, inRecord), e);
		catch (ExecutionException | TimeoutException e) {
			throw new KafkaException(pubFailMessage(outRecord, inRecord), e);

	private String pubFailMessage(ProducerRecord<Object, Object> outRecord, ConsumerRecord<?, ?> inRecord) {
		return "Dead-letter publication to "
				+ outRecord.topic() + " failed for: " + KafkaUtils.format(inRecord);

	 * Determine the send timeout based on the template's producer factory and
	 * {@link #setWaitForSendResultTimeout(Duration)}.
	 * @param template the template.
	 * @return the timeout.
	 * @since 2.7.14
	protected Duration determineSendTimeout(KafkaOperations<?, ?> template) {
		ProducerFactory<? extends Object, ? extends Object> producerFactory = template.getProducerFactory();
		if (producerFactory != null) { // NOSONAR - will only occur in mock tests
			Map<String, Object> props = producerFactory.getConfigurationProperties();
			if (props != null) { // NOSONAR - will only occur in mock tests
				return KafkaUtils.determineSendTimeout(props, this.timeoutBuffer,
		return Duration.ofSeconds(THIRTY);

	private void enhanceHeaders(Headers kafkaHeaders, ConsumerRecord<?, ?> record, Exception exception) {
		maybeAddOriginalHeaders(kafkaHeaders, record, exception);
		Headers headers = this.headersFunction.apply(record, exception);
		if (headers != null) {

	private void maybeAddOriginalHeaders(Headers kafkaHeaders, ConsumerRecord<?, ?> record, Exception ex) {
		maybeAddHeader(kafkaHeaders, this.headerNames.original.topicHeader,
				() -> record.topic().getBytes(StandardCharsets.UTF_8), HeaderNames.HeadersToAdd.TOPIC);
		maybeAddHeader(kafkaHeaders, this.headerNames.original.partitionHeader,
				() -> ByteBuffer.allocate(Integer.BYTES).putInt(record.partition()).array(),
		maybeAddHeader(kafkaHeaders, this.headerNames.original.offsetHeader,
				() -> ByteBuffer.allocate(Long.BYTES).putLong(record.offset()).array(),
		maybeAddHeader(kafkaHeaders, this.headerNames.original.timestampHeader,
				() -> ByteBuffer.allocate(Long.BYTES).putLong(record.timestamp()).array(), HeaderNames.HeadersToAdd.TS);
		maybeAddHeader(kafkaHeaders, this.headerNames.original.timestampTypeHeader,
				() -> record.timestampType().toString().getBytes(StandardCharsets.UTF_8),
		if (ex instanceof ListenerExecutionFailedException) {
			String consumerGroup = ((ListenerExecutionFailedException) ex).getGroupId();
			if (consumerGroup != null) {
				maybeAddHeader(kafkaHeaders, this.headerNames.original.consumerGroup,
						() -> consumerGroup.getBytes(StandardCharsets.UTF_8), HeaderNames.HeadersToAdd.GROUP);

	private void maybeAddHeader(Headers kafkaHeaders, String header, Supplier<byte[]> valueSupplier,
			HeaderNames.HeadersToAdd hta) {

		if (this.whichHeaders.contains(hta)
				&& (this.appendOriginalHeaders || kafkaHeaders.lastHeader(header) == null)) {
			kafkaHeaders.add(header, valueSupplier.get());

	private void addExceptionInfoHeaders(Headers kafkaHeaders, Exception exception, boolean isKey,
			HeaderNames names) {

				isKey ? names.exceptionInfo.keyExceptionFqcn : names.exceptionInfo.exceptionFqcn,
				() -> exception.getClass().getName().getBytes(StandardCharsets.UTF_8),
		if (exception.getCause() != null) {
					() -> exception.getCause().getClass().getName().getBytes(StandardCharsets.UTF_8),
		String message = exception.getMessage();
		if (message != null) {
					isKey ? names.exceptionInfo.keyExceptionMessage : names.exceptionInfo.exceptionMessage,
					() -> exception.getMessage().getBytes(StandardCharsets.UTF_8),
				isKey ? names.exceptionInfo.keyExceptionStacktrace : names.exceptionInfo.exceptionStacktrace,
				() -> getStackTraceAsString(exception).getBytes(StandardCharsets.UTF_8),

	private void appendOrReplace(Headers headers, String header, Supplier<byte[]> valueSupplier,
			HeaderNames.HeadersToAdd hta) {

		if (this.whichHeaders.contains(hta)) {
			if (this.stripPreviousExceptionHeaders) {
			headers.add(header, valueSupplier.get());

	private String getStackTraceAsString(Throwable cause) {
		StringWriter stringWriter = new StringWriter();
		PrintWriter printWriter = new PrintWriter(stringWriter, true);
		return stringWriter.getBuffer().toString();

	/**
	 * Override this if you want different header names to be used
	 * in the sent record.
	 * @return the header names.
	 * @since 2.7
	 */
	protected HeaderNames getHeaderNames() {
		// NOTE(review): the builder call chain that should follow HeaderNames.Builder is not
		// present here, so this statement is incomplete as written; the Builder API is not
		// visible in this section - restore the chain from the original file.
		return HeaderNames.Builder

	/**
	 * Container class for the name of the headers that will
	 * be added to the produced record.
	 * @since 2.7
	 */
	public static class HeaderNames {

		 * Bits representing which headers to add.
		 * @since 2.8.4
		public enum HeadersToAdd {

			 * The offset of the failed record.

			 * The timestamp of the failed record.

			 * The timestamp type of the failed record.

			 * The original topic of the failed record.

			 * The partition from which the failed record was received.

			 * The consumer group that received the failed record.

			 * The exception class name.

			 * The exception cause class name.

			 * The exception message.

			 * The exception stack trace.


		/** Names of the headers that describe the original (failed) record. */
		private final HeaderNames.Original original;

		/** Names of the headers that describe the exception. */
		private final ExceptionInfo exceptionInfo;

		/**
		 * Create an instance from the two groups of header names.
		 * @param original the names of the original-record headers.
		 * @param exceptionInfo the names of the exception headers.
		 */
		HeaderNames(HeaderNames.Original original, ExceptionInfo exceptionInfo) {
			this.original = original;
			this.exceptionInfo = exceptionInfo;
		}

		 * The header names for the original record headers.
		 * @return the original.
		 * @since 2.8.4
		public HeaderNames.Original getOriginal() {
			return this.original;

		 * The header names for the exception headers.
		 * @return the exceptionInfo
		 * @since 2.8.4
		public ExceptionInfo getExceptionInfo() {
			return this.exceptionInfo;

		 * Header names for original record property headers.
		 * @since 2.8.4
		public static class Original {

			final String offsetHeader; // NOSONAR

			final String timestampHeader; // NOSONAR

			final String timestampTypeHeader; // NOSONAR

			final String topicHeader; // NOSONAR

			final String partitionHeader; // NOSONAR

			final String consumerGroup; // NOSONAR

			Original(String offsetHeader,
					String timestampHeader,
					String timestampTypeHeader,
					String topicHeader,
					String partitionHeader,
					String consumerGroup) {
				this.offsetHeader = offsetHeader;
				this.timestampHeader = timestampHeader;
				this.timestampTypeHeader = timestampTypeHeader;
				this.topicHeader = topicHeader;
				this.partitionHeader = partitionHeader;
				this.consumerGroup = consumerGroup;

			 * The header name for the offset.
			 * @return the offsetHeader.
			public String getOffsetHeader() {
				return this.offsetHeader;

			 * The header name for the timestamp.
			 * @return the timestampHeader.
			public String getTimestampHeader() {
				return this.timestampHeader;

			 * The header name for the timestamp type.
			 * @return the timestampTypeHeader.
			public String getTimestampTypeHeader() {
				return this.timestampTypeHeader;

			 * The header name for the topic.
			 * @return the topicHeader.
			public String getTopicHeader() {
				return this.topicHeader;

			 * The header name for the partition.
			 * @return the partitionHeader
			public String getPartitionHeader() {
				return this.partitionHeader;

			 * The header name for the consumer group.
			 * @return the consumerGroup
			public String getConsumerGroup() {
				return this.consumerGroup;


		 * Header names for exception headers.
		 * @since 2.8.4
		public static class ExceptionInfo {

			final String keyExceptionFqcn; // NOSONAR

			final String exceptionFqcn; // NOSONAR

			final String exceptionCauseFqcn; // NOSONAR

			final String keyExceptionMessage; // NOSONAR

			final String exceptionMessage; // NOSONAR

			final String keyExceptionStacktrace; // NOSONAR

			final String exceptionStacktrace; // NOSONAR

			ExceptionInfo(String keyExceptionFqcn,
					String exceptionFqcn,
					String exceptionCauseFqcn,
					String keyExceptionMessage,
					String exceptionMessage,
					String keyExceptionStacktrace,
					String exceptionStacktrace) {
				this.keyExceptionFqcn = keyExceptionFqcn;
				this.exceptionFqcn = exceptionFqcn;
				this.exceptionCauseFqcn = exceptionCauseFqcn;
				this.keyExceptionMessage = keyExceptionMessage;
				this.exceptionMessage = exceptionMessage;
				this.keyExceptionStacktrace = keyExceptionStacktrace;
				this.exceptionStacktrace = exceptionStacktrace;

			 * The header name for the key exception class.
			 * @return the keyExceptionFqcn.
			public String getKeyExceptionFqcn() {
				return this.keyExceptionFqcn;

			 * The header name for the value exception class.
			 * @return the exceptionFqcn.
			public String getExceptionFqcn() {
				return this.exceptionFqcn;

			 * The header name for the exception cause.
			 * @return the exceptionCauseFqcn.
			public String getExceptionCauseFqcn() {
				return this.exceptionCauseFqcn;

			 * The header name for the key exception message.
			 * @return the keyExceptionMessage.
			public String getKeyExceptionMessage() {
				return this.keyExceptionMessage;

			 * The header name for the exception message.
			 * @return the exceptionMessage.
			public String getExceptionMessage() {
				return this.exceptionMessage;

			 * The header name for the key exception stack trace.
			 * @return the keyExceptionStacktrace
			public String getKeyExceptionStacktrace() {
				return this.keyExceptionStacktrace;

			 * The header name for the exception stack trace.
			 * @return the exceptionStacktrace
			public String getExceptionStacktrace() {
				return this.exceptionStacktrace;


		 * Provides a convenient API for creating
		 * {@link DeadLetterPublishingRecoverer.HeaderNames}.
		 * @author Tomaz Fernandes
		 * @since 2.7
		 * @see HeaderNames
		public static class Builder {

			private final Original original = new Original();

			private final ExceptionInfo exceptionInfo = new ExceptionInfo();

			public static Builder.Original original() {
				return new Builder().original;

			 * Headers for data relative to the original record.
			 * @author Tomaz Fernandes
			 * @since 2.7
			public class Original {

				private String offsetHeader;

				private String timestampHeader;

				private String timestampTypeHeader;

				private String topicHeader;

				private String partitionHeader;

				private String consumerGroupHeader;

				 * Sets the name of the header that will be used to store the offset
				 * of the original record.
				 * @param offsetHeader the offset header name.
				 * @return the Original builder instance
				 * @since 2.7
				public Builder.Original offsetHeader(String offsetHeader) {
					this.offsetHeader = offsetHeader;
					return this;

				 * Sets the name of the header that will be used to store the timestamp
				 * of the original record.
				 * @param timestampHeader the timestamp header name.
				 * @return the Original builder instance
				 * @since 2.7
				public Builder.Original timestampHeader(String timestampHeader) {
					this.timestampHeader = timestampHeader;
					return this;

				 * Sets the name of the header that will be used to store the timestampType
				 * of the original record.
				 * @param timestampTypeHeader the timestampType header name.
				 * @return the Original builder instance
				 * @since 2.7
				public Builder.Original timestampTypeHeader(String timestampTypeHeader) {
					this.timestampTypeHeader = timestampTypeHeader;
					return this;

				 * Sets the name of the header that will be used to store the topic
				 * of the original record.
				 * @param topicHeader the topic header name.
				 * @return the Original builder instance
				 * @since 2.7
				public Builder.Original topicHeader(String topicHeader) {
					this.topicHeader = topicHeader;
					return this;

				 * Sets the name of the header that will be used to store the partition
				 * of the original record.
				 * @param partitionHeader the partition header name.
				 * @return the Original builder instance
				 * @since 2.7
				public Builder.Original partitionHeader(String partitionHeader) {
					this.partitionHeader = partitionHeader;
					return this;

				 * Sets the name of the header that will be used to store the consumer
				 * group that failed to consume the original record.
				 * @param consumerGroupHeader the consumer group header name.
				 * @return the Original builder instance
				 * @since 2.8
				public Builder.Original consumerGroupHeader(String consumerGroupHeader) {
					this.consumerGroupHeader = consumerGroupHeader;
					return this;

				 * Returns the exception builder.
				 * @return the exception builder.
				 * @since 2.7
				public ExceptionInfo exception() {
					return Builder.this.exceptionInfo;

				 * Builds the Original header names, asserting that none of them is null.
				 * @return the Original header names.
				 * @since 2.7
				private DeadLetterPublishingRecoverer.HeaderNames.Original build() {
					Assert.notNull(this.offsetHeader, "offsetHeader cannot be null");
					Assert.notNull(this.timestampHeader, "timestampHeader cannot be null");
					Assert.notNull(this.timestampTypeHeader, "timestampTypeHeader cannot be null");
					Assert.notNull(this.topicHeader, "topicHeader cannot be null");
					Assert.notNull(this.partitionHeader, "partitionHeader cannot be null");
					Assert.notNull(this.consumerGroupHeader, "consumerGroupHeader cannot be null");
					return new DeadLetterPublishingRecoverer.HeaderNames.Original(this.offsetHeader,

			 * Headers for data relative to the exception thrown.
			 * @author Tomaz Fernandes
			 * @since 2.7
			public class ExceptionInfo {

				private String keyExceptionFqcn;

				private String exceptionFqcn;

				private String exceptionCauseFqcn;

				private String keyExceptionMessage;

				private String exceptionMessage;

				private String keyExceptionStacktrace;

				private String exceptionStacktrace;

				 * Sets the name of the header that will be used to store the keyExceptionFqcn
				 * of the original record.
				 * @param keyExceptionFqcn the keyExceptionFqcn header name.
				 * @return the Exception builder instance
				 * @since 2.7
				public ExceptionInfo keyExceptionFqcn(String keyExceptionFqcn) {
					this.keyExceptionFqcn = keyExceptionFqcn;
					return this;

				 * Sets the name of the header that will be used to store the exceptionFqcn
				 * of the original record.
				 * @param exceptionFqcn the exceptionFqcn header name.
				 * @return the Exception builder instance
				 * @since 2.7
				public ExceptionInfo exceptionFqcn(String exceptionFqcn) {
					this.exceptionFqcn = exceptionFqcn;
					return this;

				 * Sets the name of the header that will be used to store the exceptionCauseFqcn
				 * of the original record.
				 * @param exceptionCauseFqcn the exceptionFqcn header name.
				 * @return the Exception builder instance
				 * @since 2.8
				public ExceptionInfo exceptionCauseFqcn(String exceptionCauseFqcn) {
					this.exceptionCauseFqcn = exceptionCauseFqcn;
					return this;
				 * Sets the name of the header that will be used to store the keyExceptionMessage
				 * of the original record.
				 * @param keyExceptionMessage the keyExceptionMessage header name.
				 * @return the Exception builder instance
				 * @since 2.7
				public ExceptionInfo keyExceptionMessage(String keyExceptionMessage) {
					this.keyExceptionMessage = keyExceptionMessage;
					return this;

				 * Sets the name of the header that will be used to store the exceptionMessage
				 * of the original record.
				 * @param exceptionMessage the exceptionMessage header name.
				 * @return the Exception builder instance
				 * @since 2.7
				public ExceptionInfo exceptionMessage(String exceptionMessage) {
					this.exceptionMessage = exceptionMessage;
					return this;

				 * Sets the name of the header that will be used to store the
				 * keyExceptionStacktrace of the original record.
				 * @param keyExceptionStacktrace the keyExceptionStacktrace header name.
				 * @return the Exception builder instance
				 * @since 2.7
				public ExceptionInfo keyExceptionStacktrace(String keyExceptionStacktrace) {
					this.keyExceptionStacktrace = keyExceptionStacktrace;
					return this;

				 * Sets the name of the header that will be used to store the
				 * exceptionStacktrace of the original record.
				 * @param exceptionStacktrace the exceptionStacktrace header name.
				 * @return the Exception builder instance
				 * @since 2.7
				public ExceptionInfo exceptionStacktrace(String exceptionStacktrace) {
					this.exceptionStacktrace = exceptionStacktrace;
					return this;

				 * Builds the Header Names, asserting that none of them is null.
				 * @return the HeaderNames instance.
				 * @since 2.7
				public DeadLetterPublishingRecoverer.HeaderNames build() {
					Assert.notNull(this.keyExceptionFqcn, "keyExceptionFqcn header cannot be null");
					Assert.notNull(this.exceptionFqcn, "exceptionFqcn header cannot be null");
					Assert.notNull(this.exceptionCauseFqcn, "exceptionCauseFqcn header cannot be null");
					Assert.notNull(this.keyExceptionMessage, "keyExceptionMessage header cannot be null");
					Assert.notNull(this.exceptionMessage, "exceptionMessage header cannot be null");
					Assert.notNull(this.keyExceptionStacktrace, "keyExceptionStacktrace header cannot be null");
					Assert.notNull(this.exceptionStacktrace, "exceptionStacktrace header cannot be null");
					return new DeadLetterPublishingRecoverer.HeaderNames(Builder.this.original.build(),
							new HeaderNames.ExceptionInfo(this.keyExceptionFqcn,


	 * Use this to provide a custom implementation to take complete control over exception
	 * header creation for the output record.
	 * @since 2.8.4
	public interface ExceptionHeadersCreator {

		 * Create exception headers.
		 * @param kafkaHeaders the {@link Headers} to add the header(s) to.
		 * @param exception The exception.
		 * @param isKey whether the exception is for a key or value.
		 * @param headerNames the heaader names to use.
		void create(Headers kafkaHeaders, Exception exception, boolean isKey, HeaderNames headerNames);




