diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/batch/BatchAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/batch/BatchAutoConfiguration.java index d8da05a6c39..92708cedc75 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/batch/BatchAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/batch/BatchAutoConfiguration.java @@ -56,6 +56,7 @@ import org.springframework.util.StringUtils; * @author Dave Syer * @author EddĂș MelĂ©ndez * @author Kazuki Shimizu + * @author Mahmoud Ben Hassine */ @Configuration @ConditionalOnClass({ JobLauncher.class, DataSource.class, JdbcOperations.class }) @@ -88,9 +89,10 @@ public class BatchAutoConfiguration { @ConditionalOnMissingBean @ConditionalOnProperty(prefix = "spring.batch.job", name = "enabled", havingValue = "true", matchIfMissing = true) public JobLauncherCommandLineRunner jobLauncherCommandLineRunner( - JobLauncher jobLauncher, JobExplorer jobExplorer) { + JobLauncher jobLauncher, JobExplorer jobExplorer, + JobRepository jobRepository) { JobLauncherCommandLineRunner runner = new JobLauncherCommandLineRunner( - jobLauncher, jobExplorer); + jobLauncher, jobExplorer, jobRepository); String jobNames = this.properties.getJob().getNames(); if (StringUtils.hasText(jobNames)) { runner.setJobNames(jobNames); diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/batch/JobLauncherCommandLineRunner.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/batch/JobLauncherCommandLineRunner.java index d5edeac8436..8be9ff422ca 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/batch/JobLauncherCommandLineRunner.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/batch/JobLauncherCommandLineRunner.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2017 the original author or authors. + * Copyright 2012-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,14 +19,19 @@ package org.springframework.boot.autoconfigure.batch; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; import java.util.Properties; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.springframework.batch.core.BatchStatus; import org.springframework.batch.core.Job; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobExecutionException; +import org.springframework.batch.core.JobParameter; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.JobParametersBuilder; import org.springframework.batch.core.JobParametersInvalidException; @@ -39,12 +44,14 @@ import org.springframework.batch.core.launch.JobParametersNotFoundException; import org.springframework.batch.core.launch.NoSuchJobException; import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException; import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException; +import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.repository.JobRestartException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisherAware; import org.springframework.core.Ordered; +import org.springframework.util.Assert; import org.springframework.util.PatternMatchUtils; import org.springframework.util.StringUtils; @@ -55,6 +62,7 @@ import org.springframework.util.StringUtils; * * @author Dave Syer * @author Jean-Pierre Bergamin + * @author Mahmoud Ben Hassine */ public class JobLauncherCommandLineRunner implements CommandLineRunner, Ordered, ApplicationEventPublisherAware { @@ -69,11 +77,13 @@ public class JobLauncherCommandLineRunner private JobParametersConverter converter = new DefaultJobParametersConverter(); - private JobLauncher jobLauncher; + private final JobLauncher jobLauncher; - private JobRegistry jobRegistry; + private final JobExplorer jobExplorer; + + private final JobRepository jobRepository; - private JobExplorer jobExplorer; + private JobRegistry jobRegistry; private String jobNames; @@ -83,10 +93,38 @@ public class JobLauncherCommandLineRunner private ApplicationEventPublisher publisher; + /** + * Create a new {@link JobLauncherCommandLineRunner}. + * @param jobLauncher to launch jobs + * @param jobExplorer to check the job repository for previous executions + * @deprecated since 2.0.7 in favor of + * {@link #JobLauncherCommandLineRunner(JobLauncher, JobExplorer, JobRepository)}. A + * job repository is required to check if a job instance exists with the given + * parameters when running a job (which is not possible with the job explorer). + */ + @Deprecated public JobLauncherCommandLineRunner(JobLauncher jobLauncher, JobExplorer jobExplorer) { this.jobLauncher = jobLauncher; this.jobExplorer = jobExplorer; + this.jobRepository = null; + } + + /** + * Create a new {@link JobLauncherCommandLineRunner}. + * @param jobLauncher to launch jobs + * @param jobExplorer to check the job repository for previous executions + * @param jobRepository to check if a job instance exists with the given parameters + * when running a job + */ + public JobLauncherCommandLineRunner(JobLauncher jobLauncher, JobExplorer jobExplorer, + JobRepository jobRepository) { + Assert.notNull(jobLauncher, "JobLauncher must not be null"); + Assert.notNull(jobExplorer, "JobExplorer must not be null"); + Assert.notNull(jobRepository, "JobRepository must not be null"); + this.jobLauncher = jobLauncher; + this.jobExplorer = jobExplorer; + this.jobRepository = jobRepository; } public void setOrder(int order) { @@ -135,6 +173,20 @@ public class JobLauncherCommandLineRunner executeRegisteredJobs(jobParameters); } + private void executeLocalJobs(JobParameters jobParameters) + throws JobExecutionException { + for (Job job : this.jobs) { + if (StringUtils.hasText(this.jobNames)) { + String[] jobsToRun = this.jobNames.split(","); + if (!PatternMatchUtils.simpleMatch(jobsToRun, job.getName())) { + logger.debug("Skipped job: " + job.getName()); + continue; + } + } + execute(job, jobParameters); + } + } + private void executeRegisteredJobs(JobParameters jobParameters) throws JobExecutionException { if (this.jobRegistry != null && StringUtils.hasText(this.jobNames)) { @@ -158,26 +210,59 @@ public class JobLauncherCommandLineRunner throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException, JobParametersNotFoundException { - JobParameters nextParameters = new JobParametersBuilder(jobParameters, - this.jobExplorer).getNextJobParameters(job).toJobParameters(); - JobExecution execution = this.jobLauncher.run(job, nextParameters); + JobParameters parameters = getNextJobParameters(job, jobParameters); + JobExecution execution = this.jobLauncher.run(job, parameters); if (this.publisher != null) { this.publisher.publishEvent(new JobExecutionEvent(execution)); } } - private void executeLocalJobs(JobParameters jobParameters) - throws JobExecutionException { - for (Job job : this.jobs) { - if (StringUtils.hasText(this.jobNames)) { - String[] jobsToRun = this.jobNames.split(","); - if (!PatternMatchUtils.simpleMatch(jobsToRun, job.getName())) { - logger.debug("Skipped job: " + job.getName()); - continue; - } - } - execute(job, jobParameters); + private JobParameters getNextJobParameters(Job job, JobParameters jobParameters) { + if (this.jobRepository != null + && this.jobRepository.isJobInstanceExists(job.getName(), jobParameters)) { + return getNextJobParametersForExisting(job, jobParameters); + } + if (job.getJobParametersIncrementer() == null) { + return jobParameters; } + JobParameters nextParameters = new JobParametersBuilder(jobParameters, + this.jobExplorer).getNextJobParameters(job).toJobParameters(); + return merge(nextParameters, jobParameters); + } + + private JobParameters getNextJobParametersForExisting(Job job, + JobParameters jobParameters) { + JobExecution lastExecution = this.jobRepository.getLastJobExecution(job.getName(), + jobParameters); + if (isStoppedOrFailed(lastExecution) && job.isRestartable()) { + JobParameters previousIdentifyingParameters = getGetIdentifying( + lastExecution.getJobParameters()); + return merge(previousIdentifyingParameters, jobParameters); + } + return jobParameters; + } + + private boolean isStoppedOrFailed(JobExecution execution) { + BatchStatus status = (execution != null) ? execution.getStatus() : null; + return (status == BatchStatus.STOPPED || status == BatchStatus.FAILED); + } + + private JobParameters getGetIdentifying(JobParameters parameters) { + HashMap nonIdentifying = new LinkedHashMap<>( + parameters.getParameters().size()); + parameters.getParameters().forEach((key, value) -> { + if (value.isIdentifying()) { + nonIdentifying.put(key, value); + } + }); + return new JobParameters(nonIdentifying); + } + + private JobParameters merge(JobParameters parameters, JobParameters additionals) { + Map merged = new LinkedHashMap<>(); + merged.putAll(parameters.getParameters()); + merged.putAll(additionals.getParameters()); + return new JobParameters(merged); } } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/batch/JobLauncherCommandLineRunnerTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/batch/JobLauncherCommandLineRunnerTests.java index 869025af682..e2bbc56998a 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/batch/JobLauncherCommandLineRunnerTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/batch/JobLauncherCommandLineRunnerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2017 the original author or authors. + * Copyright 2012-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +21,8 @@ import org.junit.Before; import org.junit.Test; import org.springframework.batch.core.Job; +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.JobInstance; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.JobParametersBuilder; import org.springframework.batch.core.Step; @@ -34,6 +36,7 @@ import org.springframework.batch.core.launch.JobLauncher; import org.springframework.batch.core.launch.support.RunIdIncrementer; import org.springframework.batch.core.launch.support.SimpleJobLauncher; import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.repository.JobRestartException; import org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean; import org.springframework.batch.core.step.tasklet.Tasklet; import org.springframework.batch.support.transaction.ResourcelessTransactionManager; @@ -43,12 +46,15 @@ import org.springframework.core.task.SyncTaskExecutor; import org.springframework.transaction.PlatformTransactionManager; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.assertj.core.api.Assertions.fail; /** * Tests for {@link JobLauncherCommandLineRunner}. * * @author Dave Syer * @author Jean-Pierre Bergamin + * @author Mahmoud Ben Hassine */ public class JobLauncherCommandLineRunnerTests { @@ -80,7 +86,8 @@ public class JobLauncherCommandLineRunnerTests { this.step = this.steps.get("step").tasklet(tasklet).build(); this.job = this.jobs.get("job").start(this.step).build(); this.jobExplorer = this.context.getBean(JobExplorer.class); - this.runner = new JobLauncherCommandLineRunner(jobLauncher, this.jobExplorer); + this.runner = new JobLauncherCommandLineRunner(jobLauncher, this.jobExplorer, + jobRepository); this.context.getBean(BatchConfiguration.class).clear(); } @@ -113,8 +120,25 @@ public class JobLauncherCommandLineRunnerTests { .start(this.steps.get("step").tasklet(throwingTasklet()).build()) .incrementer(new RunIdIncrementer()).build(); this.runner.execute(this.job, new JobParameters()); - this.runner.execute(this.job, new JobParameters()); + this.runner.execute(this.job, + new JobParametersBuilder().addLong("run.id", 1L).toJobParameters()); + assertThat(this.jobExplorer.getJobInstances("job", 0, 100)).hasSize(1); + } + + @Test + public void runDifferentInstances() throws Exception { + this.job = this.jobs.get("job") + .start(this.steps.get("step").tasklet(throwingTasklet()).build()).build(); + // start a job instance + JobParameters jobParameters = new JobParametersBuilder().addString("name", "foo") + .toJobParameters(); + this.runner.execute(this.job, jobParameters); assertThat(this.jobExplorer.getJobInstances("job", 0, 100)).hasSize(1); + // start a different job instance + JobParameters otherJobParameters = new JobParametersBuilder() + .addString("name", "bar").toJobParameters(); + this.runner.execute(this.job, otherJobParameters); + assertThat(this.jobExplorer.getJobInstances("job", 0, 100)).hasSize(2); } @Test @@ -127,6 +151,12 @@ public class JobLauncherCommandLineRunnerTests { // A failed job that is not restartable does not re-use the job params of // the last execution, but creates a new job instance when running it again. assertThat(this.jobExplorer.getJobInstances("job", 0, 100)).hasSize(2); + assertThatExceptionOfType(JobRestartException.class).isThrownBy(() -> { + // try to re-run a failed execution + this.runner.execute(this.job, + new JobParametersBuilder().addLong("run.id", 1L).toJobParameters()); + fail("expected JobRestartException"); + }).withMessageContaining("JobInstance already exists and is not restartable"); } @Test @@ -137,8 +167,43 @@ public class JobLauncherCommandLineRunnerTests { JobParameters jobParameters = new JobParametersBuilder().addLong("id", 1L, false) .addLong("foo", 2L, false).toJobParameters(); this.runner.execute(this.job, jobParameters); + assertThat(this.jobExplorer.getJobInstances("job", 0, 100)).hasSize(1); + // try to re-run a failed execution with non identifying parameters + this.runner.execute(this.job, new JobParametersBuilder(jobParameters) + .addLong("run.id", 1L).toJobParameters()); + assertThat(this.jobExplorer.getJobInstances("job", 0, 100)).hasSize(1); + } + + @Test + public void retryFailedExecutionWithDifferentNonIdentifyingParametersFromPreviousExecution() + throws Exception { + this.job = this.jobs.get("job") + .start(this.steps.get("step").tasklet(throwingTasklet()).build()) + .incrementer(new RunIdIncrementer()).build(); + JobParameters jobParameters = new JobParametersBuilder().addLong("id", 1L, false) + .addLong("foo", 2L, false).toJobParameters(); this.runner.execute(this.job, jobParameters); assertThat(this.jobExplorer.getJobInstances("job", 0, 100)).hasSize(1); + // try to re-run a failed execution with non identifying parameters + this.runner.execute(this.job, new JobParametersBuilder().addLong("run.id", 1L) + .addLong("id", 2L, false).addLong("foo", 3L, false).toJobParameters()); + assertThat(this.jobExplorer.getJobInstances("job", 0, 100)).hasSize(1); + JobInstance jobInstance = this.jobExplorer.getJobInstance(0L); + assertThat(this.jobExplorer.getJobExecutions(jobInstance)).hasSize(2); + // first execution + JobExecution firstJobExecution = this.jobExplorer.getJobExecution(0L); + JobParameters parameters = firstJobExecution.getJobParameters(); + assertThat(parameters.getLong("run.id")).isEqualTo(1L); + assertThat(parameters.getLong("id")).isEqualTo(1L); + assertThat(parameters.getLong("foo")).isEqualTo(2L); + // second execution + JobExecution secondJobExecution = this.jobExplorer.getJobExecution(1L); + parameters = secondJobExecution.getJobParameters(); + // identifying parameters should be the same as previous execution + assertThat(parameters.getLong("run.id")).isEqualTo(1L); + // non-identifying parameters should be the newly specified ones + assertThat(parameters.getLong("id")).isEqualTo(2L); + assertThat(parameters.getLong("foo")).isEqualTo(3L); } private Tasklet throwingTasklet() {