Restarting steps & persisted state as part of the same JobInstance after a COMPLETED execution #4609
Replies: 2 comments
-
Are you restarting the same job instance? Do you have an identifying job parameter? Moreover, are you using a persistent job repository in your example? Or are you using an in-memory database and restarting the same job manually after the first attempt? If you provide an example, I can try to debug the case and help you more efficiently.
-
Re-reading the documentation, I think the confusion was on my side: what I was trying to do sounds like something that's not really intended, namely to have a
But, to answer your questions: no, I don't think I was restarting the same job instance in the sense the documentation describes (though it was the same instance I was trying to start again), and yes, I did use identifiers at some point (though not for anything related to restartability). While I might end up abandoning this approach and sticking to more out-of-the-box functionality, here's how I solved it in the end. I've also attached a sample project (please chip in if you think there's a better way of doing it), but the gist of it is:
package com.example;

import org.springframework.batch.core.*;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.support.AbstractItemStreamItemReader;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

import java.util.List;

@Configuration
@EnableConfigurationProperties(Config.Properties.class)
public class Config {

    @ConfigurationProperties("com.example")
    public record Properties(List<String> stream) { }

    @Bean
    public Job restartableJob(
        final Properties properties,
        final JobRepository jobRepository,
        final PlatformTransactionManager platformTransactionManager
    ) {
        final String KEY = "state.next";
        return new JobBuilder("restartable-job", jobRepository)
            .start(new StepBuilder("consume", jobRepository)
                .allowStartIfComplete(true)
                .<String, String>chunk(100, platformTransactionManager)
                .reader(new AbstractItemStreamItemReader<>() {
                    private int next = 0;

                    @Override
                    public void open(ExecutionContext executionContext) {
                        // if this is not the first execution, start reading from where we left off
                        if (executionContext.containsKey(KEY)) {
                            next = executionContext.getInt(KEY);
                        }
                    }

                    @Override
                    public String read() {
                        if (next >= properties.stream().size()) {
                            return null;
                        }
                        return properties.stream().get(next++);
                    }

                    @Override
                    public void update(ExecutionContext executionContext) {
                        // remember where we left off
                        executionContext.putInt(KEY, next);
                    }
                })
                .writer(chunk -> System.out.println(chunk.getItems()))
                .listener(new StepExecutionListener() {
                    @Override
                    public void beforeStep(StepExecution stepExecution) {
                        // see if we have some state from previous executions and restore it
                        stepExecution.getExecutionContext().put(
                            KEY,
                            stepExecution.getJobExecution().getExecutionContext().get(KEY)
                        );
                    }

                    @Override
                    public ExitStatus afterStep(StepExecution stepExecution) {
                        // Move the relevant state to the job's context so that we can reload it on subsequent
                        // executions of this step, as out-of-the-box, such executions (run after a `COMPLETED`
                        // one) are started with just the job context.
                        stepExecution.getJobExecution().getExecutionContext().put(
                            KEY,
                            stepExecution.getExecutionContext().get(KEY)
                        );
                        // returning null leaves the step's exit status unchanged
                        return null;
                    }
                })
                .build())
            .listener(new JobExecutionListener() {
                @Override
                public void afterJob(JobExecution jobExecution) {
                    if (jobExecution.getStatus().equals(BatchStatus.COMPLETED)) {
                        // allow this job instance to start again at a later point
                        jobExecution.setStatus(BatchStatus.STOPPED);
                    }
                }
            })
            .build();
    }
}
For what it's worth, while it might be obvious to others, the fact that a job with a
-
What is the expected contract for the contents of the ExecutionContext that's passed in to ItemStreams (e.g. an ItemReader) after a restart of the same JobInstance, when the previous execution was COMPLETED?

The documentation states that
but defining a job like:
does not behave like that, as the ExecutionContext in the 2nd execution of the job is a new one, not containing an entry under the KEY (it's basically starting the reading from the beginning).

I can see that SimpleStepHandler defines a step "restart" as
and I'm wondering if that's intentionally excluding the step.isAllowStartIfComplete() flag, or it's just an omission, as SimpleStepHandler#shouldStart does take it into account as part of checking whether stepStatus == BatchStatus.COMPLETED.

Shouldn't the "restart" of a step be defined instead as
Thanks!
spring-batch version: 5.1.1
spring-boot version: 3.2.5
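
To make the question about the "restart" definition easier to discuss, here is a minimal plain-Java sketch of the two rules being compared. It is a model only, not Spring Batch code: `RestartSketch`, `LastStep`, `isRestartCurrent`, `isRestartProposed` and `contextFor` are hypothetical names, plain maps stand in for `ExecutionContext`, and both predicates are assumptions reconstructed from the description above (the current rule ignoring `allowStartIfComplete`, the proposed one honouring it).

```java
import java.util.HashMap;
import java.util.Map;

public class RestartSketch {

    // Hypothetical stand-in for BatchStatus; only the values needed here.
    public enum Status { COMPLETED, STOPPED, FAILED }

    // Hypothetical stand-in for the last StepExecution of the same JobInstance.
    public record LastStep(Status status, Map<String, Object> context) { }

    // ASSUMPTION: the current rule, as described in the question, treats a step
    // as a restart only if its previous execution did not complete.
    public static boolean isRestartCurrent(LastStep last) {
        return last != null && last.status() != Status.COMPLETED;
    }

    // ASSUMPTION: the proposed rule also treats a COMPLETED step as a restart
    // when the step is flagged with allowStartIfComplete.
    public static boolean isRestartProposed(LastStep last, boolean allowStartIfComplete) {
        return last != null
            && (last.status() != Status.COMPLETED || allowStartIfComplete);
    }

    // On a restart the step sees its previous context; otherwise it starts
    // from a copy of the job-level context only.
    public static Map<String, Object> contextFor(
        boolean restart, LastStep last, Map<String, Object> jobContext
    ) {
        return restart ? new HashMap<>(last.context()) : new HashMap<>(jobContext);
    }

    public static void main(String[] args) {
        LastStep last = new LastStep(Status.COMPLETED, Map.of("state.next", 3));
        Map<String, Object> jobContext = Map.of();

        Map<String, Object> current =
            contextFor(isRestartCurrent(last), last, jobContext);
        Map<String, Object> proposed =
            contextFor(isRestartProposed(last, true), last, jobContext);

        System.out.println("current rule keeps state.next:  " + current.containsKey("state.next"));
        System.out.println("proposed rule keeps state.next: " + proposed.containsKey("state.next"));
    }
}
```

Under these assumptions, the first rule hands the reader a context without `state.next` after a `COMPLETED` execution (so reading starts from the beginning, which matches what I observed), while the second one would carry the saved position over.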