spring-batch SimpleChunkProvider 源码
spring-batch SimpleChunkProvider 代码
文件路径:/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/SimpleChunkProvider.java
/*
* Copyright 2006-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.core.step.item;
import java.util.List;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepListener;
import org.springframework.batch.core.listener.MulticasterBatchListener;
import org.springframework.batch.core.observability.BatchMetrics;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.repeat.RepeatCallback;
import org.springframework.batch.repeat.RepeatContext;
import org.springframework.batch.repeat.RepeatOperations;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.lang.Nullable;
/**
* Simple implementation of the ChunkProvider interface that does basic chunk providing
* from an {@link ItemReader}.
*
* @author Dave Syer
* @author Michael Minella
* @author Mahmoud Ben Hassine
* @see ChunkOrientedTasklet
*/
public class SimpleChunkProvider<I> implements ChunkProvider<I> {
protected final Log logger = LogFactory.getLog(getClass());
protected final ItemReader<? extends I> itemReader;
private final MulticasterBatchListener<I, ?> listener = new MulticasterBatchListener<>();
private final RepeatOperations repeatOperations;
public SimpleChunkProvider(ItemReader<? extends I> itemReader, RepeatOperations repeatOperations) {
this.itemReader = itemReader;
this.repeatOperations = repeatOperations;
}
/**
* Register some {@link StepListener}s with the handler. Each will get the callbacks
* in the order specified at the correct stage.
* @param listeners list of {@link StepListener}s.
*/
public void setListeners(List<? extends StepListener> listeners) {
for (StepListener listener : listeners) {
registerListener(listener);
}
}
/**
* Register a listener for callbacks at the appropriate stages in a process.
* @param listener a {@link StepListener}
*/
public void registerListener(StepListener listener) {
this.listener.register(listener);
}
/**
* @return the listener
*/
protected MulticasterBatchListener<I, ?> getListener() {
return listener;
}
/**
* Surrounds the read call with listener callbacks.
* @return the item or {@code null} if the data source is exhausted
* @throws Exception is thrown if error occurs during read.
*/
@Nullable
protected final I doRead() throws Exception {
try {
listener.beforeRead();
I item = itemReader.read();
if (item != null) {
listener.afterRead(item);
}
return item;
}
catch (Exception e) {
if (logger.isDebugEnabled()) {
logger.debug(e.getMessage() + " : " + e.getClass().getName());
}
listener.onReadError(e);
throw e;
}
}
@Override
public Chunk<I> provide(final StepContribution contribution) throws Exception {
final Chunk<I> inputs = new Chunk<>();
repeatOperations.iterate(new RepeatCallback() {
@Override
public RepeatStatus doInIteration(final RepeatContext context) throws Exception {
I item = null;
Timer.Sample sample = Timer.start(Metrics.globalRegistry);
String status = BatchMetrics.STATUS_SUCCESS;
try {
item = read(contribution, inputs);
}
catch (SkipOverflowException e) {
// read() tells us about an excess of skips by throwing an
// exception
status = BatchMetrics.STATUS_FAILURE;
return RepeatStatus.FINISHED;
}
finally {
stopTimer(sample, contribution.getStepExecution(), status);
}
if (item == null) {
inputs.setEnd();
return RepeatStatus.FINISHED;
}
inputs.add(item);
contribution.incrementReadCount();
return RepeatStatus.CONTINUABLE;
}
});
return inputs;
}
private void stopTimer(Timer.Sample sample, StepExecution stepExecution, String status) {
String fullyQualifiedMetricName = BatchMetrics.METRICS_PREFIX + "item.read";
sample.stop(BatchMetrics.createTimer("item.read", "Item reading duration",
Tag.of(fullyQualifiedMetricName + ".job.name",
stepExecution.getJobExecution().getJobInstance().getJobName()),
Tag.of(fullyQualifiedMetricName + ".step.name", stepExecution.getStepName()),
Tag.of(fullyQualifiedMetricName + ".status", status)));
}
@Override
public void postProcess(StepContribution contribution, Chunk<I> chunk) {
// do nothing
}
/**
* Delegates to {@link #doRead()}. Subclasses can add additional behaviour (e.g.
* exception handling).
* @param contribution the current step execution contribution
* @param chunk the current chunk
* @return a new item for processing or {@code null} if the data source is exhausted
* @throws SkipOverflowException if specifically the chunk is accumulating too much
* data (e.g. skips) and it wants to force a commit.
* @throws Exception if there is a generic issue
*/
@Nullable
protected I read(StepContribution contribution, Chunk<I> chunk) throws SkipOverflowException, Exception {
return doRead();
}
}
相关信息
相关文章
spring-batch BatchRetryTemplate 源码
spring-batch ChunkOrientedTasklet 源码
spring-batch ChunkProcessor 源码
spring-batch DefaultItemFailureHandler 源码
spring-batch FaultTolerantChunkProcessor 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦