spring-batch TaskletStep 源码
spring-batch TaskletStep 代码
文件路径:/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/TaskletStep.java
/*
* Copyright 2006-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.core.step.tasklet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.ChunkListener;
import org.springframework.batch.core.JobInterruptedException;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.listener.CompositeChunkListener;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.scope.context.StepContextRepeatCallback;
import org.springframework.batch.core.step.AbstractStep;
import org.springframework.batch.core.step.FatalStepExecutionException;
import org.springframework.batch.core.step.StepInterruptionPolicy;
import org.springframework.batch.core.step.ThreadStepInterruptionPolicy;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.CompositeItemStream;
import org.springframework.batch.repeat.RepeatContext;
import org.springframework.batch.repeat.RepeatOperations;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.batch.repeat.support.RepeatTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
import org.springframework.transaction.interceptor.TransactionAttribute;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.Assert;
import java.util.concurrent.Semaphore;
/**
* Simple implementation of executing the step as a call to a {@link Tasklet}, possibly
* repeated, and each call surrounded by a transaction. The structure is therefore that of
* a loop with transaction boundary inside the loop. The loop is controlled by the step
* operations ( {@link #setStepOperations(RepeatOperations)}).<br>
* <br>
*
* Clients can use interceptors in the step operations to intercept or listen to the
* iteration on a step-wide basis, for instance to get a callback when the step is
* complete. Those that want callbacks at the level of an individual tasks, can specify
* interceptors for the chunk operations.
*
* @author Dave Syer
* @author Lucas Ward
* @author Ben Hale
* @author Robert Kasanicky
* @author Michael Minella
* @author Will Schipp
* @author Mahmoud Ben Hassine
*/
@SuppressWarnings("serial")
public class TaskletStep extends AbstractStep {
private static final Log logger = LogFactory.getLog(TaskletStep.class);
private RepeatOperations stepOperations = new RepeatTemplate();
private CompositeChunkListener chunkListener = new CompositeChunkListener();
// default to checking current thread for interruption.
private StepInterruptionPolicy interruptionPolicy = new ThreadStepInterruptionPolicy();
private CompositeItemStream stream = new CompositeItemStream();
private PlatformTransactionManager transactionManager;
private TransactionAttribute transactionAttribute = new DefaultTransactionAttribute() {
@Override
public boolean rollbackOn(Throwable ex) {
return true;
}
};
private Tasklet tasklet;
public static final String TASKLET_TYPE_KEY = "batch.taskletType";
/**
* Default constructor.
*/
public TaskletStep() {
this(null);
}
/**
* @param name the name for the {@link TaskletStep}
*/
public TaskletStep(String name) {
super(name);
}
/*
* (non-Javadoc)
*
* @see org.springframework.batch.core.step.AbstractStep#afterPropertiesSet()
*/
@Override
public void afterPropertiesSet() throws Exception {
super.afterPropertiesSet();
Assert.state(transactionManager != null, "A transaction manager must be provided");
}
/**
* Public setter for the {@link PlatformTransactionManager}.
* @param transactionManager the transaction manager to set
*/
public void setTransactionManager(PlatformTransactionManager transactionManager) {
this.transactionManager = transactionManager;
}
/**
* Public setter for the {@link TransactionAttribute}.
* @param transactionAttribute the {@link TransactionAttribute} to set
*/
public void setTransactionAttribute(TransactionAttribute transactionAttribute) {
this.transactionAttribute = transactionAttribute;
}
/**
* Public setter for the {@link Tasklet}.
* @param tasklet the {@link Tasklet} to set
*/
public void setTasklet(Tasklet tasklet) {
this.tasklet = tasklet;
if (tasklet instanceof StepExecutionListener) {
registerStepExecutionListener((StepExecutionListener) tasklet);
}
}
/**
* Register a chunk listener for callbacks at the appropriate stages in a step
* execution.
* @param listener a {@link ChunkListener}
*/
public void registerChunkListener(ChunkListener listener) {
this.chunkListener.register(listener);
}
/**
* Register each of the objects as listeners.
* @param listeners an array of listener objects of known types.
*/
public void setChunkListeners(ChunkListener[] listeners) {
for (int i = 0; i < listeners.length; i++) {
registerChunkListener(listeners[i]);
}
}
/**
* Register each of the streams for callbacks at the appropriate time in the step. The
* {@link ItemReader} and {@link ItemWriter} are automatically registered, but it
* doesn't hurt to also register them here. Injected dependencies of the reader and
* writer are not automatically registered, so if you implement {@link ItemWriter}
* using delegation to another object which itself is a {@link ItemStream}, you need
* to register the delegate here.
* @param streams an array of {@link ItemStream} objects.
*/
public void setStreams(ItemStream[] streams) {
for (int i = 0; i < streams.length; i++) {
registerStream(streams[i]);
}
}
/**
* Register a single {@link ItemStream} for callbacks to the stream interface.
* @param stream instance of {@link ItemStream}
*/
public void registerStream(ItemStream stream) {
this.stream.register(stream);
}
/**
* The {@link RepeatOperations} to use for the outer loop of the batch processing.
* Should be set up by the caller through a factory. Defaults to a plain
* {@link RepeatTemplate}.
* @param stepOperations a {@link RepeatOperations} instance.
*/
public void setStepOperations(RepeatOperations stepOperations) {
this.stepOperations = stepOperations;
}
/**
* Setter for the {@link StepInterruptionPolicy}. The policy is used to check whether
* an external request has been made to interrupt the job execution.
* @param interruptionPolicy a {@link StepInterruptionPolicy}
*/
public void setInterruptionPolicy(StepInterruptionPolicy interruptionPolicy) {
this.interruptionPolicy = interruptionPolicy;
}
/**
* Process the step and update its context so that progress can be monitored by the
* caller. The step is broken down into chunks, each one executing in a transaction.
* The step and its execution and execution context are all given an up to date
* {@link BatchStatus}, and the {@link JobRepository} is used to store the result.
* Various reporting information are also added to the current context governing the
* step execution, which would normally be available to the caller through the step's
* {@link ExecutionContext}.<br>
* @throws JobInterruptedException if the step or a chunk is interrupted
* @throws RuntimeException if there is an exception during a chunk execution
*
*/
@Override
protected void doExecute(StepExecution stepExecution) throws Exception {
stepExecution.getExecutionContext().put(TASKLET_TYPE_KEY, tasklet.getClass().getName());
stepExecution.getExecutionContext().put(STEP_TYPE_KEY, this.getClass().getName());
stream.update(stepExecution.getExecutionContext());
getJobRepository().updateExecutionContext(stepExecution);
// Shared semaphore per step execution, so other step executions can run
// in parallel without needing the lock
final Semaphore semaphore = createSemaphore();
stepOperations.iterate(new StepContextRepeatCallback(stepExecution) {
@Override
public RepeatStatus doInChunkContext(RepeatContext repeatContext, ChunkContext chunkContext)
throws Exception {
StepExecution stepExecution = chunkContext.getStepContext().getStepExecution();
// Before starting a new transaction, check for
// interruption.
interruptionPolicy.checkInterrupted(stepExecution);
RepeatStatus result;
try {
result = new TransactionTemplate(transactionManager, transactionAttribute)
.execute(new ChunkTransactionCallback(chunkContext, semaphore));
}
catch (UncheckedTransactionException e) {
// Allow checked exceptions to be thrown inside callback
throw (Exception) e.getCause();
}
chunkListener.afterChunk(chunkContext);
// Check for interruption after transaction as well, so that
// the interrupted exception is correctly propagated up to
// caller
interruptionPolicy.checkInterrupted(stepExecution);
return result == null ? RepeatStatus.FINISHED : result;
}
});
}
/**
* Extension point mainly for test purposes so that the behaviour of the lock can be
* manipulated to simulate various pathologies.
* @return a semaphore for locking access to the JobRepository
*/
protected Semaphore createSemaphore() {
return new Semaphore(1);
}
@Override
protected void close(ExecutionContext ctx) throws Exception {
stream.close();
}
@Override
protected void open(ExecutionContext ctx) throws Exception {
stream.open(ctx);
}
/**
* retrieve the tasklet - helper method for JobOperator
* @return the {@link Tasklet} instance being executed within this step
*/
public Tasklet getTasklet() {
return tasklet;
}
/**
* A callback for the transactional work inside a chunk. Also detects failures in the
* transaction commit and rollback, only panicking if the transaction status is
* unknown (i.e. if a commit failure leads to a clean rollback then we assume the
* state is consistent).
*
* @author Dave Syer
*
*/
private class ChunkTransactionCallback implements TransactionSynchronization, TransactionCallback<RepeatStatus> {
private final StepExecution stepExecution;
private final ChunkContext chunkContext;
private boolean rolledBack = false;
private boolean stepExecutionUpdated = false;
private StepExecution oldVersion;
private boolean locked = false;
private final Semaphore semaphore;
public ChunkTransactionCallback(ChunkContext chunkContext, Semaphore semaphore) {
this.chunkContext = chunkContext;
this.stepExecution = chunkContext.getStepContext().getStepExecution();
this.semaphore = semaphore;
}
@Override
public void afterCompletion(int status) {
try {
if (status != TransactionSynchronization.STATUS_COMMITTED) {
if (stepExecutionUpdated) {
// Wah! the commit failed. We need to rescue the step
// execution data.
logger.info("Commit failed while step execution data was already updated. "
+ "Reverting to old version.");
copy(oldVersion, stepExecution);
if (status == TransactionSynchronization.STATUS_ROLLED_BACK) {
rollback(stepExecution);
}
}
chunkListener.afterChunkError(chunkContext);
}
if (status == TransactionSynchronization.STATUS_UNKNOWN) {
logger.error("Rolling back with transaction in unknown state");
rollback(stepExecution);
stepExecution.upgradeStatus(BatchStatus.UNKNOWN);
stepExecution.setTerminateOnly();
}
}
finally {
// Only release the lock if we acquired it, and release as late
// as possible
if (locked) {
semaphore.release();
}
locked = false;
}
}
@Override
public RepeatStatus doInTransaction(TransactionStatus status) {
TransactionSynchronizationManager.registerSynchronization(this);
RepeatStatus result = RepeatStatus.CONTINUABLE;
StepContribution contribution = stepExecution.createStepContribution();
chunkListener.beforeChunk(chunkContext);
// In case we need to push it back to its old value
// after a commit fails...
oldVersion = new StepExecution(stepExecution.getStepName(), stepExecution.getJobExecution());
copy(stepExecution, oldVersion);
try {
try {
try {
result = tasklet.execute(contribution, chunkContext);
if (result == null) {
result = RepeatStatus.FINISHED;
}
}
catch (Exception e) {
if (transactionAttribute.rollbackOn(e)) {
chunkContext.setAttribute(ChunkListener.ROLLBACK_EXCEPTION_KEY, e);
throw e;
}
}
}
finally {
// If the step operations are asynchronous then we need
// to synchronize changes to the step execution (at a
// minimum). Take the lock *before* changing the step
// execution.
try {
semaphore.acquire();
locked = true;
}
catch (InterruptedException e) {
logger.error("Thread interrupted while locking for repository update");
stepExecution.setStatus(BatchStatus.STOPPED);
stepExecution.setTerminateOnly();
Thread.currentThread().interrupt();
}
// Apply the contribution to the step
// even if unsuccessful
if (logger.isDebugEnabled()) {
logger.debug("Applying contribution: " + contribution);
}
stepExecution.apply(contribution);
}
stepExecutionUpdated = true;
stream.update(stepExecution.getExecutionContext());
try {
// Going to attempt a commit. If it fails this flag will
// stay false and we can use that later.
getJobRepository().updateExecutionContext(stepExecution);
stepExecution.incrementCommitCount();
if (logger.isDebugEnabled()) {
logger.debug("Saving step execution before commit: " + stepExecution);
}
getJobRepository().update(stepExecution);
}
catch (Exception e) {
// If we get to here there was a problem saving the step
// execution and we have to fail.
String msg = "JobRepository failure forcing rollback";
logger.error(msg, e);
throw new FatalStepExecutionException(msg, e);
}
}
catch (Error e) {
if (logger.isDebugEnabled()) {
logger.debug("Rollback for Error: " + e.getClass().getName() + ": " + e.getMessage());
}
rollback(stepExecution);
throw e;
}
catch (RuntimeException e) {
if (logger.isDebugEnabled()) {
logger.debug("Rollback for RuntimeException: " + e.getClass().getName() + ": " + e.getMessage());
}
rollback(stepExecution);
throw e;
}
catch (Exception e) {
if (logger.isDebugEnabled()) {
logger.debug("Rollback for Exception: " + e.getClass().getName() + ": " + e.getMessage());
}
rollback(stepExecution);
// Allow checked exceptions
throw new UncheckedTransactionException(e);
}
return result;
}
private void rollback(StepExecution stepExecution) {
if (!rolledBack) {
stepExecution.incrementRollbackCount();
rolledBack = true;
}
}
private void copy(final StepExecution source, final StepExecution target) {
target.setVersion(source.getVersion());
target.setWriteCount(source.getWriteCount());
target.setFilterCount(source.getFilterCount());
target.setCommitCount(source.getCommitCount());
target.setExecutionContext(new ExecutionContext(source.getExecutionContext()));
}
}
}
相关信息
相关文章
spring-batch CallableTaskletAdapter 源码
spring-batch ConfigurableSystemProcessExitCodeMapper 源码
spring-batch MethodInvokingTaskletAdapter 源码
spring-batch SimpleSystemProcessExitCodeMapper 源码
spring-batch StoppableTasklet 源码
spring-batch SystemCommandException 源码
spring-batch SystemCommandTasklet 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦