From c47da1247a3e65cd0729de24814a0959c4479d84 Mon Sep 17 00:00:00 2001 From: Juergen Hoeller Date: Sun, 25 Jan 2026 10:41:25 +0100 Subject: [PATCH] Add TaskCallback/Callable/Runnable wrapper for retryable tasks Closes gh-36208 --- .../core/retry/support/RetryTask.java | 215 ++++++++++++++++ .../core/task/SimpleAsyncTaskExecutor.java | 11 +- .../core/task/SyncTaskExecutor.java | 14 ++ .../core/task/TaskCallback.java | 21 ++ .../core/retry/support/RetryTaskTests.java | 238 ++++++++++++++++++ 5 files changed, 495 insertions(+), 4 deletions(-) create mode 100644 spring-core/src/main/java/org/springframework/core/retry/support/RetryTask.java create mode 100644 spring-core/src/test/java/org/springframework/core/retry/support/RetryTaskTests.java diff --git a/spring-core/src/main/java/org/springframework/core/retry/support/RetryTask.java b/spring-core/src/main/java/org/springframework/core/retry/support/RetryTask.java new file mode 100644 index 00000000000..aa078814f26 --- /dev/null +++ b/spring-core/src/main/java/org/springframework/core/retry/support/RetryTask.java @@ -0,0 +1,215 @@ +/* + * Copyright 2002-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.core.retry.support; + +import java.util.concurrent.Callable; + +import org.jspecify.annotations.Nullable; + +import org.springframework.core.retry.RetryException; +import org.springframework.core.retry.RetryOperations; +import org.springframework.core.retry.RetryPolicy; +import org.springframework.core.retry.RetryTemplate; +import org.springframework.core.retry.Retryable; +import org.springframework.core.task.TaskCallback; +import org.springframework.util.Assert; + +/** + * A {@link TaskCallback} that executes a given retryable task, + * re-executing it after failure according to a retry policy. + * Inherits {@link Callable} through {@link TaskCallback}. + * + *

For regular construction, this is designed to match the + * {@link org.springframework.core.task.SyncTaskExecutor#execute(TaskCallback)} + * signature for propagating specific exception types but is + * also usable with the {@code Callback}-based methods on + * {@link org.springframework.core.task.AsyncTaskExecutor}. + * + *

Alternatively, this class can also be used to wrap a + * given {@link Callable} or {@link Runnable} into corresponding + * retrying variants: see the common static {@code wrap} methods. + * This is particularly useful for existing {@code Callable} and + * {@code Runnable} classes, as well as for scheduling methods + * which typically accept a {@code Runnable} in their signature. + * + * @author Juergen Hoeller + * @since 7.0.4 + * @param the returned value type, if any + * @param the exception propagated, if any + * @see RetryTemplate + * @see org.springframework.core.task.SyncTaskExecutor#execute(TaskCallback) + * @see org.springframework.core.task.AsyncTaskExecutor#submit(Callable) + * @see org.springframework.core.task.AsyncTaskExecutor#submitCompletable(Callable) + */ +public class RetryTask implements TaskCallback { + + private final TaskCallback task; + + private final RetryOperations retryTemplate; + + + /** + * Create a new {@code RetryTask} for the given retryable task. + * @param task a task that allows for re-execution after failure + * @see #RetryTask(TaskCallback, RetryPolicy) + * @see #RetryTask(TaskCallback, RetryOperations) + */ + public RetryTask(TaskCallback task) { + this(task, new RetryTemplate()); + } + + /** + * Create a new {@code RetryTask} for the given retryable task. + * @param task a task that allows for re-execution after failure + * @param retryPolicy the retry policy to apply + * @see #RetryTask(TaskCallback, RetryOperations) + */ + public RetryTask(TaskCallback task, RetryPolicy retryPolicy) { + this(task, new RetryTemplate(retryPolicy)); + } + + /** + * Create a new {@code RetryTask} for the given retryable task. + * @param task a task that allows for re-execution after failure + * @param retryTemplate the retry delegate to use (typically a {@link RetryTemplate} + * but declaring the {@link RetryOperations} interface for flexibility) + */ + public RetryTask(TaskCallback task, RetryOperations retryTemplate) { + Assert.notNull(task, "TaskCallback must not be null"); + Assert.notNull(retryTemplate, "RetryTemplate must not be null"); + this.task = task; + this.retryTemplate = retryTemplate; + } + + + @SuppressWarnings("unchecked") + @Override + public V call() throws E { + try { + return this.retryTemplate.execute(new Retryable<>() { + @Override + public V execute() throws E { + return task.call(); + } + @Override + public String getName() { + return RetryTask.this.getName(); + } + }); + } + catch (RetryException retryException) { + throw (E) retryException.getCause(); + } + } + + /** + * Return the name of the retryable task: + * by default, the class name of the target task. + * @see Retryable#getName() + */ + public String getName() { + return this.task.getClass().getName(); + } + + @Override + public String toString() { + return "RetryTask for " + getName() + " using " + + (this.retryTemplate instanceof RetryTemplate rt ? rt.getRetryPolicy() : this.retryTemplate); + } + + + /** + * Wrap the given target {@code Callable} into a retrying {@code Callable}. + * @param task a task that allows for re-execution after failure + * @see #wrap(Callable, RetryPolicy) + * @see #wrap(Callable, RetryOperations) + */ + public static Callable wrap(Callable task) { + return wrap(task, new RetryTemplate()); + } + + /** + * Wrap the given target {@code Callable} into a retrying {@code Callable}. + * @param task a task that allows for re-execution after failure + * @param retryPolicy the retry policy to apply + * @see #wrap(Callable, RetryOperations) + */ + public static Callable wrap(Callable task, RetryPolicy retryPolicy) { + return wrap(task, new RetryTemplate(retryPolicy)); + } + + /** + * Wrap the given target {@code Callable} into a retrying {@code Callable}. + * @param task a task that allows for re-execution after failure + * @param retryTemplate the retry delegate to use (typically a {@link RetryTemplate} + * but declaring the {@link RetryOperations} interface for flexibility) + */ + public static Callable wrap(Callable task, RetryOperations retryTemplate) { + return new RetryTask<>(TaskCallback.from(task), retryTemplate) { + @Override + public String getName() { + return task.getClass().getName(); + } + }; + } + + /** + * Wrap the given target {@code Runnable} into a retrying {@code Runnable}. + * @param task a task that allows for re-execution after failure + * @see #wrap(Runnable, RetryPolicy) + * @see #wrap(Runnable, RetryOperations) + */ + public static Runnable wrap(Runnable task) { + return wrap(task, new RetryTemplate()); + } + + /** + * Wrap the given target {@code Runnable} into a retrying {@code Runnable}. + * @param task a task that allows for re-execution after failure + * @param retryPolicy the retry policy to apply + * @see #wrap(Runnable, RetryOperations) + */ + public static Runnable wrap(Runnable task, RetryPolicy retryPolicy) { + return wrap(task, new RetryTemplate(retryPolicy)); + } + + /** + * Wrap the given target {@code Runnable} into a retrying {@code Runnable}. + * @param task a task that allows for re-execution after failure + * @param retryTemplate the retry delegate to use (typically a {@link RetryTemplate} + * but declaring the {@link RetryOperations} interface for flexibility) + */ + public static Runnable wrap(Runnable task, RetryOperations retryTemplate) { + RetryTask rt = new RetryTask<>(TaskCallback.from(task), retryTemplate) { + @Override + public String getName() { + return task.getClass().getName(); + } + }; + return new Runnable() { + @Override + public void run() { + rt.call(); + } + @Override + public String toString() { + return rt.toString(); + } + }; + } + +} diff --git a/spring-core/src/main/java/org/springframework/core/task/SimpleAsyncTaskExecutor.java b/spring-core/src/main/java/org/springframework/core/task/SimpleAsyncTaskExecutor.java index 8bc5b7fd0c9..f8d44c68225 100644 --- a/spring-core/src/main/java/org/springframework/core/task/SimpleAsyncTaskExecutor.java +++ b/spring-core/src/main/java/org/springframework/core/task/SimpleAsyncTaskExecutor.java @@ -31,13 +31,15 @@ import org.springframework.util.ConcurrencyThrottleSupport; import org.springframework.util.CustomizableThreadCreator; /** - * {@link TaskExecutor} implementation that fires up a new Thread for each task, - * executing it asynchronously. Provides a virtual thread option on JDK 21. + * {@link TaskExecutor} implementation that fires up a new Thread for each task. + * Provides a {@link #setVirtualThreads virtual threads} option on JDK 21+. * *

Supports a graceful shutdown through {@link #setTaskTerminationTimeout}, * at the expense of task tracking overhead per execution thread at runtime. - * Supports limiting concurrent threads through {@link #setConcurrencyLimit}. - * By default, the number of concurrent task executions is unlimited. + * Supports limiting concurrent threads through {@link #setConcurrencyLimit}; + * by default, the number of concurrent task executions is unlimited. Can be + * combined with {@link org.springframework.core.retry.support.RetryTask} for + * re-executing submitted tasks after failure, according to a retry policy. * *

NOTE: This implementation does not reuse threads! Consider a * thread-pooling TaskExecutor implementation instead, in particular for @@ -54,6 +56,7 @@ import org.springframework.util.CustomizableThreadCreator; * @see #setVirtualThreads * @see #setTaskTerminationTimeout * @see #setConcurrencyLimit + * @see org.springframework.core.retry.support.RetryTask * @see org.springframework.scheduling.concurrent.SimpleAsyncTaskScheduler * @see org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor */ diff --git a/spring-core/src/main/java/org/springframework/core/task/SyncTaskExecutor.java b/spring-core/src/main/java/org/springframework/core/task/SyncTaskExecutor.java index ee45dfc7e4a..bd454357189 100644 --- a/spring-core/src/main/java/org/springframework/core/task/SyncTaskExecutor.java +++ b/spring-core/src/main/java/org/springframework/core/task/SyncTaskExecutor.java @@ -63,7 +63,14 @@ public class SyncTaskExecutor extends ConcurrencyThrottleSupport implements Task /** * Execute the given {@code task} synchronously, through direct * invocation of its {@link Runnable#run() run()} method. + *

This can be used with a {@link #setConcurrencyLimit concurrency limit}, + * analogous to a concurrency-bounded {@link SimpleAsyncTaskExecutor} setup. + * Also, the provided task may apply a retry policy via + * {@link org.springframework.core.retry.support.RetryTask}. * @throws RuntimeException if propagated from the given {@code Runnable} + * @see #setConcurrencyLimit + * @see #setRejectTasksWhenLimitReached + * @see org.springframework.core.retry.support.RetryTask#wrap(Runnable) */ @Override public void execute(Runnable task) { @@ -85,10 +92,17 @@ public class SyncTaskExecutor extends ConcurrencyThrottleSupport implements Task /** * Execute the given {@code task} synchronously, through direct * invocation of its {@link TaskCallback#call() call()} method. + *

This can be used with a {@link #setConcurrencyLimit concurrency limit}, + * analogous to a concurrency-bounded {@link SimpleAsyncTaskExecutor} setup. + * Also, the provided task may apply a retry policy via + * {@link org.springframework.core.retry.support.RetryTask}. * @param the returned value type, if any * @param the exception propagated, if any * @throws E if propagated from the given {@code TaskCallback} * @since 7.0 + * @see #setConcurrencyLimit + * @see #setRejectTasksWhenLimitReached + * @see org.springframework.core.retry.support.RetryTask#RetryTask(TaskCallback) */ public V execute(TaskCallback task) throws E { Assert.notNull(task, "Task must not be null"); diff --git a/spring-core/src/main/java/org/springframework/core/task/TaskCallback.java b/spring-core/src/main/java/org/springframework/core/task/TaskCallback.java index 4a947ad1263..d7900990047 100644 --- a/spring-core/src/main/java/org/springframework/core/task/TaskCallback.java +++ b/spring-core/src/main/java/org/springframework/core/task/TaskCallback.java @@ -41,4 +41,25 @@ public interface TaskCallback e @Override V call() throws E; + + /** + * Derive a {@link TaskCallback} from the given {@link Callable}. + * @since 7.0.4 + */ + static TaskCallback from(Callable task) { + return task::call; + } + + /** + * Derive a {@link TaskCallback} from the given {@link Callable}. + * @since 7.0.4 + */ + @SuppressWarnings("NullAway") + static TaskCallback from(Runnable task) { + return () -> { + task.run(); + return null; + }; + } + } diff --git a/spring-core/src/test/java/org/springframework/core/retry/support/RetryTaskTests.java b/spring-core/src/test/java/org/springframework/core/retry/support/RetryTaskTests.java new file mode 100644 index 00000000000..d0c6788701d --- /dev/null +++ b/spring-core/src/test/java/org/springframework/core/retry/support/RetryTaskTests.java @@ -0,0 +1,238 @@ +/* + * Copyright 2002-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.core.retry.support; + +import java.time.Duration; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.jupiter.api.Test; + +import org.springframework.core.retry.RetryPolicy; +import org.springframework.core.retry.RetryTemplate; +import org.springframework.core.task.AsyncTaskExecutor; +import org.springframework.core.task.SimpleAsyncTaskExecutor; +import org.springframework.core.task.SyncTaskExecutor; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.assertj.core.api.Assertions.assertThatIllegalStateException; + +/** + * @author Juergen Hoeller + * @since 7.0.4 + */ +class RetryTaskTests { + + RetryPolicy retryPolicy = RetryPolicy.builder().maxRetries(2).delay(Duration.ZERO).build(); + + RetryTemplate customTemplate = new RetryTemplate(RetryPolicy.builder().maxRetries(3).delay(Duration.ZERO).build()); + + SyncTaskExecutor syncExecutor = new SyncTaskExecutor(); + + AsyncTaskExecutor asyncExecutor = new SimpleAsyncTaskExecutor(); + + AtomicInteger invocationCount = new AtomicInteger(); + + + @Test + void syncTaskWithImmediateSuccess() { + assertThat( + syncExecutor.execute(new RetryTask<>(() -> { + invocationCount.incrementAndGet(); + return "always succeeds"; + }, retryPolicy))) + .isEqualTo("always succeeds"); + assertThat(invocationCount).hasValue(1); + } + + @Test + void syncTaskWithSuccessAfterInitialFailures() { + assertThat( + syncExecutor.execute(new RetryTask<>(() -> { + if (invocationCount.incrementAndGet() < 2) { + throw new IllegalStateException("Boom " + invocationCount.get()); + } + return "finally succeeded"; + }, retryPolicy))) + .isEqualTo("finally succeeded"); + assertThat(invocationCount).hasValue(2); + } + + @Test + void syncTaskWithExhaustedPolicy() { + assertThatIllegalStateException().isThrownBy(() -> + syncExecutor.execute(new RetryTask<>(() -> { + invocationCount.incrementAndGet(); + throw new IllegalStateException("Boom " + invocationCount.get()); + }, retryPolicy))); + assertThat(invocationCount).hasValue(3); + } + + @Test + void syncTaskWithCustomTemplate() { + assertThatIllegalStateException().isThrownBy(() -> + syncExecutor.execute(new RetryTask<>(() -> { + invocationCount.incrementAndGet(); + throw new IllegalStateException("Boom " + invocationCount.get()); + }, customTemplate))); + assertThat(invocationCount).hasValue(4); + } + + @Test + void asyncTaskWithImmediateSuccess() throws Exception { + assertThat( + asyncExecutor.submit(new RetryTask<>(() -> { + invocationCount.incrementAndGet(); + return "always succeeds"; + }, retryPolicy)).get()) + .isEqualTo("always succeeds"); + assertThat(invocationCount).hasValue(1); + } + + @Test + void asyncTaskWithSuccessAfterInitialFailures() throws Exception { + assertThat( + asyncExecutor.submit(new RetryTask<>(() -> { + if (invocationCount.incrementAndGet() < 2) { + throw new IllegalStateException("Boom " + invocationCount.get()); + } + return "finally succeeded"; + }, retryPolicy)).get()) + .isEqualTo("finally succeeded"); + assertThat(invocationCount).hasValue(2); + } + + @Test + void asyncTaskWithExhaustedPolicy() { + assertThatExceptionOfType(ExecutionException.class).isThrownBy(() -> + asyncExecutor.submit(new RetryTask<>(() -> { + invocationCount.incrementAndGet(); + throw new IllegalStateException("Boom " + invocationCount.get()); + }, retryPolicy)).get()) + .withCauseExactlyInstanceOf(IllegalStateException.class); + assertThat(invocationCount).hasValue(3); + } + + @Test + void asyncTaskWithCustomTemplate() { + assertThatExceptionOfType(ExecutionException.class).isThrownBy(() -> + asyncExecutor.submit(new RetryTask<>(() -> { + invocationCount.incrementAndGet(); + throw new IllegalStateException("Boom " + invocationCount.get()); + }, customTemplate)).get()) + .withCauseExactlyInstanceOf(IllegalStateException.class); + assertThat(invocationCount).hasValue(4); + } + + @Test + void callableWithImmediateSuccess() throws Exception { + assertThat( + asyncExecutor.submit(RetryTask.wrap(() -> { + invocationCount.incrementAndGet(); + return "always succeeds"; + }, retryPolicy)).get()) + .isEqualTo("always succeeds"); + assertThat(invocationCount).hasValue(1); + } + + @Test + void callableWithSuccessAfterInitialFailures() throws Exception { + assertThat( + asyncExecutor.submit(RetryTask.wrap(() -> { + if (invocationCount.incrementAndGet() < 2) { + throw new IllegalStateException("Boom " + invocationCount.get()); + } + return "finally succeeded"; + }, retryPolicy)).get()) + .isEqualTo("finally succeeded"); + assertThat(invocationCount).hasValue(2); + } + + @Test + void callableWithExhaustedPolicy() { + assertThatExceptionOfType(ExecutionException.class).isThrownBy(() -> + asyncExecutor.submit(RetryTask.wrap(() -> { + invocationCount.incrementAndGet(); + throw new IllegalStateException("Boom " + invocationCount.get()); + }, retryPolicy)).get()) + .withCauseExactlyInstanceOf(IllegalStateException.class); + assertThat(invocationCount).hasValue(3); + } + + @Test + void callableWithCustomTemplate() { + assertThatExceptionOfType(ExecutionException.class).isThrownBy(() -> + asyncExecutor.submit(RetryTask.wrap(() -> { + invocationCount.incrementAndGet(); + throw new IllegalStateException("Boom " + invocationCount.get()); + }, customTemplate)).get()) + .withCauseExactlyInstanceOf(IllegalStateException.class); + assertThat(invocationCount).hasValue(4); + } + + @Test + void runnableWithImmediateSuccess() throws Exception { + assertThat( + asyncExecutor.submit(RetryTask.wrap(() -> { + if (true) { // forcing Runnable over Callable on overloaded wrap method + invocationCount.incrementAndGet(); + } + }, retryPolicy)).get()) + .isNull(); + assertThat(invocationCount).hasValue(1); + } + + @Test + void runnableWithSuccessAfterInitialFailures() throws Exception { + assertThat( + asyncExecutor.submit(RetryTask.wrap(() -> { + if (invocationCount.incrementAndGet() < 2) { + throw new IllegalStateException("Boom " + invocationCount.get()); + } + }, retryPolicy)).get()) + .isNull(); + assertThat(invocationCount).hasValue(2); + } + + @Test + void runnableWithExhaustedPolicy() { + assertThatExceptionOfType(ExecutionException.class).isThrownBy(() -> + asyncExecutor.submit(RetryTask.wrap(() -> { + invocationCount.incrementAndGet(); + if (true) { // forcing Runnable over Callable on overloaded wrap method + throw new IllegalStateException("Boom " + invocationCount.get()); + } + }, retryPolicy)).get()) + .withCauseExactlyInstanceOf(IllegalStateException.class); + assertThat(invocationCount).hasValue(3); + } + + @Test + void runnableWithCustomTemplate() { + assertThatExceptionOfType(ExecutionException.class).isThrownBy(() -> + asyncExecutor.submit(RetryTask.wrap(() -> { + invocationCount.incrementAndGet(); + if (true) { // forcing Runnable over Callable on overloaded wrap method + throw new IllegalStateException("Boom " + invocationCount.get()); + } + }, customTemplate)).get()) + .withCauseExactlyInstanceOf(IllegalStateException.class); + assertThat(invocationCount).hasValue(4); + } + +}