spring-batch FaultTolerantStepBuilder 源码
spring-batch FaultTolerantStepBuilder 代码
文件路径:/spring-batch-core/src/main/java/org/springframework/batch/core/step/builder/FaultTolerantStepBuilder.java
/*
* Copyright 2006-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.core.step.builder;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.springframework.batch.core.ChunkListener;
import org.springframework.batch.core.JobInterruptedException;
import org.springframework.batch.core.SkipListener;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.StepListener;
import org.springframework.batch.core.annotation.OnSkipInProcess;
import org.springframework.batch.core.annotation.OnSkipInRead;
import org.springframework.batch.core.annotation.OnSkipInWrite;
import org.springframework.batch.core.listener.StepListenerFactoryBean;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.FatalStepExecutionException;
import org.springframework.batch.core.step.item.BatchRetryTemplate;
import org.springframework.batch.core.step.item.ChunkMonitor;
import org.springframework.batch.core.step.item.ChunkOrientedTasklet;
import org.springframework.batch.core.step.item.ChunkProcessor;
import org.springframework.batch.core.step.item.ChunkProvider;
import org.springframework.batch.core.step.item.FaultTolerantChunkProcessor;
import org.springframework.batch.core.step.item.FaultTolerantChunkProvider;
import org.springframework.batch.core.step.item.ForceRollbackForWriteSkipException;
import org.springframework.batch.core.step.item.KeyGenerator;
import org.springframework.batch.core.step.item.SimpleRetryExceptionHandler;
import org.springframework.batch.core.step.skip.CompositeSkipPolicy;
import org.springframework.batch.core.step.skip.ExceptionClassifierSkipPolicy;
import org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy;
import org.springframework.batch.core.step.skip.NeverSkipItemSkipPolicy;
import org.springframework.batch.core.step.skip.NonSkippableReadException;
import org.springframework.batch.core.step.skip.SkipLimitExceededException;
import org.springframework.batch.core.step.skip.SkipListenerFailedException;
import org.springframework.batch.core.step.skip.SkipPolicy;
import org.springframework.batch.core.step.skip.SkipPolicyFailedException;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.core.step.tasklet.TaskletStep;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.repeat.RepeatOperations;
import org.springframework.batch.repeat.support.RepeatTemplate;
import org.springframework.batch.support.ReflectionUtils;
import org.springframework.beans.factory.BeanCreationException;
import org.springframework.classify.BinaryExceptionClassifier;
import org.springframework.classify.Classifier;
import org.springframework.classify.SubclassClassifier;
import org.springframework.retry.ExhaustedRetryException;
import org.springframework.retry.RetryException;
import org.springframework.retry.RetryListener;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.backoff.BackOffPolicy;
import org.springframework.retry.policy.CompositeRetryPolicy;
import org.springframework.retry.policy.ExceptionClassifierRetryPolicy;
import org.springframework.retry.policy.NeverRetryPolicy;
import org.springframework.retry.policy.RetryContextCache;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.transaction.TransactionException;
import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
import org.springframework.transaction.interceptor.TransactionAttribute;
import org.springframework.util.Assert;
/**
* A step builder for fully fault tolerant chunk-oriented item processing steps. Extends
* {@link SimpleStepBuilder} with additional properties for retry and skip of failed
* items.
*
* @author Dave Syer
* @author Chris Schaefer
* @author Michael Minella
* @author Mahmoud Ben Hassine
* @since 2.2
*/
public class FaultTolerantStepBuilder<I, O> extends SimpleStepBuilder<I, O> {
private ChunkMonitor chunkMonitor = new ChunkMonitor();
private boolean streamIsReader;
private int retryLimit = 0;
private BackOffPolicy backOffPolicy;
private Set<RetryListener> retryListeners = new LinkedHashSet<>();
private RetryPolicy retryPolicy;
private RetryContextCache retryContextCache;
private KeyGenerator keyGenerator;
private Collection<Class<? extends Throwable>> noRollbackExceptionClasses = new LinkedHashSet<>();
private Map<Class<? extends Throwable>, Boolean> skippableExceptionClasses = new HashMap<>();
private Collection<Class<? extends Throwable>> nonSkippableExceptionClasses = new HashSet<>();
private Map<Class<? extends Throwable>, Boolean> retryableExceptionClasses = new HashMap<>();
private Collection<Class<? extends Throwable>> nonRetryableExceptionClasses = new HashSet<>();
private Set<SkipListener<? super I, ? super O>> skipListeners = new LinkedHashSet<>();
private int skipLimit = 0;
private SkipPolicy skipPolicy;
private boolean processorTransactional = true;
/**
* Create a new builder initialized with any properties in the parent. The parent is
* copied, so it can be re-used.
* @param parent a parent helper containing common step properties
*/
public FaultTolerantStepBuilder(StepBuilderHelper<?> parent) {
super(parent);
}
/**
* Create a new builder initialized with any properties in the parent. The parent is
* copied, so it can be re-used.
* @param parent a parent helper containing common step properties
*/
protected FaultTolerantStepBuilder(SimpleStepBuilder<I, O> parent) {
super(parent);
}
@Override
public TaskletStep build() {
registerStepListenerAsSkipListener();
return super.build();
}
@SuppressWarnings("unchecked")
protected void registerStepListenerAsSkipListener() {
for (StepExecutionListener stepExecutionListener : properties.getStepExecutionListeners()) {
if (stepExecutionListener instanceof SkipListener) {
listener((SkipListener<I, O>) stepExecutionListener);
}
}
for (ChunkListener chunkListener : this.chunkListeners) {
if (chunkListener instanceof SkipListener) {
listener((SkipListener<I, O>) chunkListener);
}
}
}
/**
* Create a new chunk oriented tasklet with reader, writer and processor as provided.
*
* @see org.springframework.batch.core.step.builder.SimpleStepBuilder#createTasklet()
*/
@Override
protected Tasklet createTasklet() {
Assert.state(getReader() != null, "ItemReader must be provided");
Assert.state(getWriter() != null, "ItemWriter must be provided");
addSpecialExceptions();
registerSkipListeners();
ChunkProvider<I> chunkProvider = createChunkProvider();
ChunkProcessor<I> chunkProcessor = createChunkProcessor();
ChunkOrientedTasklet<I> tasklet = new ChunkOrientedTasklet<>(chunkProvider, chunkProcessor);
tasklet.setBuffering(!isReaderTransactionalQueue());
return tasklet;
}
/**
* Registers objects using the annotation based listener configuration.
* @param listener the object that has a method configured with listener annotation
* @return this for fluent chaining
*/
@Override
public FaultTolerantStepBuilder<I, O> listener(Object listener) {
super.listener(listener);
Set<Method> skipListenerMethods = new HashSet<>();
skipListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), OnSkipInRead.class));
skipListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), OnSkipInProcess.class));
skipListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), OnSkipInWrite.class));
if (skipListenerMethods.size() > 0) {
StepListenerFactoryBean factory = new StepListenerFactoryBean();
factory.setDelegate(listener);
skipListeners.add((SkipListener) factory.getObject());
}
return this;
}
/**
* Register a skip listener.
* @param listener the listener to register
* @return this for fluent chaining
*/
public FaultTolerantStepBuilder<I, O> listener(SkipListener<? super I, ? super O> listener) {
skipListeners.add(listener);
return this;
}
@Override
public FaultTolerantStepBuilder<I, O> listener(ChunkListener listener) {
super.listener(new TerminateOnExceptionChunkListenerDelegate(listener));
return this;
}
@Override
public SimpleStepBuilder<I, O> transactionAttribute(TransactionAttribute transactionAttribute) {
return super.transactionAttribute(getTransactionAttribute(transactionAttribute));
}
/**
* Register a retry listener.
* @param listener the listener to register
* @return this for fluent chaining
*/
public FaultTolerantStepBuilder<I, O> listener(RetryListener listener) {
retryListeners.add(listener);
return this;
}
/**
* Sets the key generator for identifying retried items. Retry across transaction
* boundaries requires items to be identified when they are encountered again. The
* default strategy is to use the items themselves, relying on their own
* implementation to ensure that they can be identified. Often a key generator is not
* necessary as long as the items have reliable hash code and equals implementations,
* or the reader is not transactional (the default) and the item processor either is
* itself not transactional (not the default) or does not create new items.
* @param keyGenerator a key generator for the stateful retry
* @return this for fluent chaining
*/
public FaultTolerantStepBuilder<I, O> keyGenerator(KeyGenerator keyGenerator) {
this.keyGenerator = keyGenerator;
return this;
}
/**
* The maximum number of times to try a failed item. Zero and one both translate to
* try only once and do not retry. Ignored if an explicit {@link #retryPolicy} is set.
* @param retryLimit the retry limit (default 0)
* @return this for fluent chaining
*/
public FaultTolerantStepBuilder<I, O> retryLimit(int retryLimit) {
this.retryLimit = retryLimit;
return this;
}
/**
* Provide an explicit retry policy instead of using the {@link #retryLimit(int)} and
* retryable exceptions provided elsewhere. Can be used to retry different exceptions
* a different number of times, for instance.
* @param retryPolicy a retry policy
* @return this for fluent chaining
*/
public FaultTolerantStepBuilder<I, O> retryPolicy(RetryPolicy retryPolicy) {
this.retryPolicy = retryPolicy;
return this;
}
/**
* Provide a backoff policy to prevent items being retried immediately (e.g. in case
* the failure was caused by a remote resource failure that might take some time to be
* resolved). Ignored if an explicit {@link #retryPolicy} is set.
* @param backOffPolicy the back off policy to use (default no backoff)
* @return this for fluent chaining
*/
public FaultTolerantStepBuilder<I, O> backOffPolicy(BackOffPolicy backOffPolicy) {
this.backOffPolicy = backOffPolicy;
return this;
}
/**
* Provide an explicit retry context cache. Retry is stateful across transactions in
* the case of failures in item processing or writing, so some information about the
* context for subsequent retries has to be stored.
* @param retryContextCache cache for retry contexts in between transactions (default
* to standard in-memory implementation)
* @return this for fluent chaining
*/
public FaultTolerantStepBuilder<I, O> retryContextCache(RetryContextCache retryContextCache) {
this.retryContextCache = retryContextCache;
return this;
}
/**
* Sets the maximum number of failed items to skip before the step fails. Ignored if
* an explicit {@link #skipPolicy(SkipPolicy)} is provided.
* @param skipLimit the skip limit to set
* @return this for fluent chaining
*/
public FaultTolerantStepBuilder<I, O> skipLimit(int skipLimit) {
this.skipLimit = skipLimit;
return this;
}
/**
* Explicitly prevent certain exceptions (and subclasses) from being skipped.
* @param type the non-skippable exception
* @return this for fluent chaining
*/
public FaultTolerantStepBuilder<I, O> noSkip(Class<? extends Throwable> type) {
skippableExceptionClasses.put(type, false);
return this;
}
/**
* Explicitly request certain exceptions (and subclasses) to be skipped. These
* exceptions (and their subclasses) might be thrown during any phase of the chunk
* processing (read, process, write) but separate counts are made of skips on read,
* process and write inside the step execution.
* @param type the exception type.
* @return this for fluent chaining
*/
public FaultTolerantStepBuilder<I, O> skip(Class<? extends Throwable> type) {
skippableExceptionClasses.put(type, true);
return this;
}
/**
* Provide an explicit policy for managing skips. A skip policy determines which
* exceptions are skippable and how many times.
* @param skipPolicy the skip policy
* @return this for fluent chaining
*/
public FaultTolerantStepBuilder<I, O> skipPolicy(SkipPolicy skipPolicy) {
this.skipPolicy = skipPolicy;
return this;
}
/**
* Mark this exception as ignorable during item read or processing operations.
* Processing continues with no additional callbacks (use skips instead if you need to
* be notified). Ignored during write because there is no guarantee of skip and retry
* without rollback.
* @param type the exception to mark as no rollback
* @return this for fluent chaining
*/
public FaultTolerantStepBuilder<I, O> noRollback(Class<? extends Throwable> type) {
noRollbackExceptionClasses.add(type);
return this;
}
/**
* Explicitly ask for an exception (and subclasses) to be excluded from retry.
* @param type the exception to exclude from retry
* @return this for fluent chaining
*/
public FaultTolerantStepBuilder<I, O> noRetry(Class<? extends Throwable> type) {
retryableExceptionClasses.put(type, false);
return this;
}
/**
* Explicitly ask for an exception (and subclasses) to be retried.
* @param type the exception to retry
* @return this for fluent chaining
*/
public FaultTolerantStepBuilder<I, O> retry(Class<? extends Throwable> type) {
retryableExceptionClasses.put(type, true);
return this;
}
/**
* Mark the item processor as non-transactional (default is the opposite). If this
* flag is set the results of item processing are cached across transactions in
* between retries and during skip processing, otherwise the processor will be called
* in every transaction.
* @return this for fluent chaining
*/
public FaultTolerantStepBuilder<I, O> processorNonTransactional() {
this.processorTransactional = false;
return this;
}
@Override
public SimpleStepBuilder<I, O> stream(ItemStream stream) {
if (stream instanceof ItemReader<?>) {
if (!streamIsReader) {
streamIsReader = true;
super.stream(chunkMonitor);
}
// In cases where multiple nested item readers are registered,
// they all want to get the open() and close() callbacks.
chunkMonitor.registerItemStream(stream);
}
else {
super.stream(stream);
}
return this;
}
/**
* Override parent method to prevent creation of a new FaultTolerantStepBuilder
*/
@Override
public FaultTolerantStepBuilder<I, O> faultTolerant() {
return this;
}
protected ChunkProvider<I> createChunkProvider() {
SkipPolicy readSkipPolicy = createSkipPolicy();
readSkipPolicy = getFatalExceptionAwareProxy(readSkipPolicy);
FaultTolerantChunkProvider<I> chunkProvider = new FaultTolerantChunkProvider<>(getReader(),
createChunkOperations());
chunkProvider.setMaxSkipsOnRead(Math.max(getChunkSize(), FaultTolerantChunkProvider.DEFAULT_MAX_SKIPS_ON_READ));
chunkProvider.setSkipPolicy(readSkipPolicy);
chunkProvider.setRollbackClassifier(getRollbackClassifier());
ArrayList<StepListener> listeners = new ArrayList<>(getItemListeners());
listeners.addAll(skipListeners);
chunkProvider.setListeners(listeners);
return chunkProvider;
}
protected ChunkProcessor<I> createChunkProcessor() {
BatchRetryTemplate batchRetryTemplate = createRetryOperations();
FaultTolerantChunkProcessor<I, O> chunkProcessor = new FaultTolerantChunkProcessor<>(getProcessor(),
getWriter(), batchRetryTemplate);
chunkProcessor.setBuffering(!isReaderTransactionalQueue());
chunkProcessor.setProcessorTransactional(processorTransactional);
SkipPolicy writeSkipPolicy = createSkipPolicy();
writeSkipPolicy = getFatalExceptionAwareProxy(writeSkipPolicy);
chunkProcessor.setWriteSkipPolicy(writeSkipPolicy);
chunkProcessor.setProcessSkipPolicy(writeSkipPolicy);
chunkProcessor.setRollbackClassifier(getRollbackClassifier());
chunkProcessor.setKeyGenerator(keyGenerator);
detectStreamInReader();
ArrayList<StepListener> listeners = new ArrayList<>(getItemListeners());
listeners.addAll(skipListeners);
chunkProcessor.setListeners(listeners);
chunkProcessor.setChunkMonitor(chunkMonitor);
return chunkProcessor;
}
@SuppressWarnings("unchecked")
private void addSpecialExceptions() {
addNonSkippableExceptionIfMissing(SkipLimitExceededException.class, NonSkippableReadException.class,
SkipListenerFailedException.class, SkipPolicyFailedException.class, RetryException.class,
JobInterruptedException.class, Error.class, BeanCreationException.class);
addNonRetryableExceptionIfMissing(SkipLimitExceededException.class, NonSkippableReadException.class,
TransactionException.class, FatalStepExecutionException.class, SkipListenerFailedException.class,
SkipPolicyFailedException.class, RetryException.class, JobInterruptedException.class, Error.class,
BeanCreationException.class);
}
protected void detectStreamInReader() {
if (streamIsReader) {
if (!concurrent()) {
chunkMonitor.setItemReader(getReader());
}
else {
logger.warn("Asynchronous TaskExecutor detected with ItemStream reader. This is probably an error, "
+ "and may lead to incorrect restart data being stored.");
}
}
}
/**
* Register explicitly set item listeners and auto-register reader, processor and
* writer if applicable
*/
private void registerSkipListeners() {
// auto-register reader, processor and writer
for (Object itemHandler : new Object[] { getReader(), getWriter(), getProcessor() }) {
if (StepListenerFactoryBean.isListener(itemHandler)) {
StepListener listener = StepListenerFactoryBean.getListener(itemHandler);
if (listener instanceof SkipListener<?, ?>) {
@SuppressWarnings("unchecked")
SkipListener<? super I, ? super O> skipListener = (SkipListener<? super I, ? super O>) listener;
skipListeners.add(skipListener);
}
}
}
}
/**
* Convenience method to get an exception classifier based on the provided transaction
* attributes.
* @return an exception classifier: maps to true if an exception should cause rollback
*/
protected Classifier<Throwable, Boolean> getRollbackClassifier() {
Classifier<Throwable, Boolean> classifier = new BinaryExceptionClassifier(noRollbackExceptionClasses, false);
// Try to avoid pathological cases where we cannot force a rollback
// (should be pretty uncommon):
if (!classifier.classify(new ForceRollbackForWriteSkipException("test", new RuntimeException()))
|| !classifier.classify(new ExhaustedRetryException("test"))) {
final Classifier<Throwable, Boolean> binary = classifier;
Collection<Class<? extends Throwable>> types = new HashSet<>();
types.add(ForceRollbackForWriteSkipException.class);
types.add(ExhaustedRetryException.class);
final Classifier<Throwable, Boolean> panic = new BinaryExceptionClassifier(types, true);
classifier = (Classifier<Throwable, Boolean>) classifiable -> {
// Rollback if either the user's list or our own applies
return panic.classify(classifiable) || binary.classify(classifiable);
};
}
return classifier;
}
@SuppressWarnings("serial")
private TransactionAttribute getTransactionAttribute(TransactionAttribute attribute) {
final Classifier<Throwable, Boolean> classifier = getRollbackClassifier();
return new DefaultTransactionAttribute(attribute) {
@Override
public boolean rollbackOn(Throwable ex) {
return classifier.classify(ex);
}
};
}
protected SkipPolicy createSkipPolicy() {
SkipPolicy skipPolicy = this.skipPolicy;
Map<Class<? extends Throwable>, Boolean> map = new HashMap<>(skippableExceptionClasses);
map.put(ForceRollbackForWriteSkipException.class, true);
LimitCheckingItemSkipPolicy limitCheckingItemSkipPolicy = new LimitCheckingItemSkipPolicy(skipLimit, map);
if (skipPolicy == null) {
Assert.state(!(skippableExceptionClasses.isEmpty() && skipLimit > 0),
"If a skip limit is provided then skippable exceptions must also be specified");
skipPolicy = limitCheckingItemSkipPolicy;
}
else if (limitCheckingItemSkipPolicy != null) {
skipPolicy = new CompositeSkipPolicy(new SkipPolicy[] { skipPolicy, limitCheckingItemSkipPolicy });
}
return skipPolicy;
}
/**
* @return fully configured retry template for item processing phase.
*/
protected BatchRetryTemplate createRetryOperations() {
RetryPolicy retryPolicy = this.retryPolicy;
SimpleRetryPolicy simpleRetryPolicy = null;
Map<Class<? extends Throwable>, Boolean> map = new HashMap<>(retryableExceptionClasses);
map.put(ForceRollbackForWriteSkipException.class, true);
simpleRetryPolicy = new SimpleRetryPolicy(retryLimit, map);
if (retryPolicy == null) {
Assert.state(!(retryableExceptionClasses.isEmpty() && retryLimit > 0),
"If a retry limit is provided then retryable exceptions must also be specified");
retryPolicy = simpleRetryPolicy;
}
else if ((!retryableExceptionClasses.isEmpty() && retryLimit > 0)) {
CompositeRetryPolicy compositeRetryPolicy = new CompositeRetryPolicy();
compositeRetryPolicy.setPolicies(new RetryPolicy[] { retryPolicy, simpleRetryPolicy });
retryPolicy = compositeRetryPolicy;
}
RetryPolicy retryPolicyWrapper = getFatalExceptionAwareProxy(retryPolicy);
BatchRetryTemplate batchRetryTemplate = new BatchRetryTemplate();
if (backOffPolicy != null) {
batchRetryTemplate.setBackOffPolicy(backOffPolicy);
}
batchRetryTemplate.setRetryPolicy(retryPolicyWrapper);
// Coordinate the retry policy with the exception handler:
RepeatOperations stepOperations = getStepOperations();
if (stepOperations instanceof RepeatTemplate) {
SimpleRetryExceptionHandler exceptionHandler = new SimpleRetryExceptionHandler(retryPolicyWrapper,
getExceptionHandler(), nonRetryableExceptionClasses);
((RepeatTemplate) stepOperations).setExceptionHandler(exceptionHandler);
}
if (retryContextCache != null) {
batchRetryTemplate.setRetryContextCache(retryContextCache);
}
if (retryListeners != null) {
batchRetryTemplate.setListeners(retryListeners.toArray(new RetryListener[0]));
}
return batchRetryTemplate;
}
protected ChunkMonitor getChunkMonitor() {
return this.chunkMonitor;
}
protected Set<SkipListener<? super I, ? super O>> getSkipListeners() {
return skipListeners;
}
/**
* Wrap the provided {@link org.springframework.retry.RetryPolicy} so that it never
* retries explicitly non-retryable exceptions.
*/
private RetryPolicy getFatalExceptionAwareProxy(RetryPolicy retryPolicy) {
NeverRetryPolicy neverRetryPolicy = new NeverRetryPolicy();
Map<Class<? extends Throwable>, RetryPolicy> map = new HashMap<>();
for (Class<? extends Throwable> fatal : nonRetryableExceptionClasses) {
map.put(fatal, neverRetryPolicy);
}
SubclassClassifier<Throwable, RetryPolicy> classifier = new SubclassClassifier<>(retryPolicy);
classifier.setTypeMap(map);
ExceptionClassifierRetryPolicy retryPolicyWrapper = new ExceptionClassifierRetryPolicy();
retryPolicyWrapper.setExceptionClassifier(classifier);
return retryPolicyWrapper;
}
/**
* Wrap a {@link SkipPolicy} and make it consistent with known fatal exceptions.
* @param skipPolicy an existing skip policy
* @return a skip policy that will not skip fatal exceptions
*/
protected SkipPolicy getFatalExceptionAwareProxy(SkipPolicy skipPolicy) {
NeverSkipItemSkipPolicy neverSkipPolicy = new NeverSkipItemSkipPolicy();
Map<Class<? extends Throwable>, SkipPolicy> map = new HashMap<>();
for (Class<? extends Throwable> fatal : nonSkippableExceptionClasses) {
map.put(fatal, neverSkipPolicy);
}
SubclassClassifier<Throwable, SkipPolicy> classifier = new SubclassClassifier<>(skipPolicy);
classifier.setTypeMap(map);
ExceptionClassifierSkipPolicy skipPolicyWrapper = new ExceptionClassifierSkipPolicy();
skipPolicyWrapper.setExceptionClassifier(classifier);
return skipPolicyWrapper;
}
@SuppressWarnings("unchecked")
private void addNonSkippableExceptionIfMissing(Class<? extends Throwable>... cls) {
List<Class<? extends Throwable>> exceptions = new ArrayList<>();
for (Class<? extends Throwable> exceptionClass : nonSkippableExceptionClasses) {
exceptions.add(exceptionClass);
}
for (Class<? extends Throwable> fatal : cls) {
if (!exceptions.contains(fatal)) {
exceptions.add(fatal);
}
}
nonSkippableExceptionClasses = exceptions;
}
@SuppressWarnings("unchecked")
private void addNonRetryableExceptionIfMissing(Class<? extends Throwable>... cls) {
List<Class<? extends Throwable>> exceptions = new ArrayList<>();
for (Class<? extends Throwable> exceptionClass : nonRetryableExceptionClasses) {
exceptions.add(exceptionClass);
}
for (Class<? extends Throwable> fatal : cls) {
if (!exceptions.contains(fatal)) {
exceptions.add(fatal);
}
}
nonRetryableExceptionClasses = exceptions;
}
/**
* ChunkListener that wraps exceptions thrown from the ChunkListener in
* {@link FatalStepExecutionException} to force termination of StepExecution
*
* ChunkListeners shoulnd't throw exceptions and expect continued processing, they
* must be handled in the implementation or the step will terminate
*
*/
private class TerminateOnExceptionChunkListenerDelegate implements ChunkListener {
private ChunkListener chunkListener;
TerminateOnExceptionChunkListenerDelegate(ChunkListener chunkListener) {
this.chunkListener = chunkListener;
}
@Override
public void beforeChunk(ChunkContext context) {
try {
chunkListener.beforeChunk(context);
}
catch (Throwable t) {
throw new FatalStepExecutionException("ChunkListener threw exception, rethrowing as fatal", t);
}
}
@Override
public void afterChunk(ChunkContext context) {
try {
chunkListener.afterChunk(context);
}
catch (Throwable t) {
throw new FatalStepExecutionException("ChunkListener threw exception, rethrowing as fatal", t);
}
}
@Override
public void afterChunkError(ChunkContext context) {
try {
chunkListener.afterChunkError(context);
}
catch (Throwable t) {
throw new FatalStepExecutionException("ChunkListener threw exception, rethrowing as fatal", t);
}
}
@Override
public int hashCode() {
return chunkListener.hashCode();
}
@SuppressWarnings("unchecked")
@Override
public boolean equals(Object obj) {
if (obj instanceof FaultTolerantStepBuilder.TerminateOnExceptionChunkListenerDelegate) {
// unwrap the ChunkListener
obj = ((TerminateOnExceptionChunkListenerDelegate) obj).chunkListener;
}
return chunkListener.equals(obj);
}
}
}
相关信息
相关文章
spring-batch AbstractTaskletStepBuilder 源码
spring-batch FlowStepBuilder 源码
spring-batch JobStepBuilder 源码
spring-batch PartitionStepBuilder 源码
spring-batch SimpleStepBuilder 源码
spring-batch StepBuilderException 源码
spring-batch StepBuilderHelper 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦