diff --git a/spring-core/src/main/java/org/springframework/core/retry/support/RetryTask.java b/spring-core/src/main/java/org/springframework/core/retry/support/RetryTask.java
new file mode 100644
index 00000000000..aa078814f26
--- /dev/null
+++ b/spring-core/src/main/java/org/springframework/core/retry/support/RetryTask.java
@@ -0,0 +1,215 @@
+/*
+ * Copyright 2002-present the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.core.retry.support;
+
+import java.util.concurrent.Callable;
+
+import org.jspecify.annotations.Nullable;
+
+import org.springframework.core.retry.RetryException;
+import org.springframework.core.retry.RetryOperations;
+import org.springframework.core.retry.RetryPolicy;
+import org.springframework.core.retry.RetryTemplate;
+import org.springframework.core.retry.Retryable;
+import org.springframework.core.task.TaskCallback;
+import org.springframework.util.Assert;
+
+/**
+ * A {@link TaskCallback} that executes a given retryable task,
+ * re-executing it after failure according to a retry policy.
+ * Inherits {@link Callable} through {@link TaskCallback}.
+ *
+ *
For regular construction, this is designed to match the
+ * {@link org.springframework.core.task.SyncTaskExecutor#execute(TaskCallback)}
+ * signature for propagating specific exception types but is
+ * also usable with the {@code Callback}-based methods on
+ * {@link org.springframework.core.task.AsyncTaskExecutor}.
+ *
+ *
Alternatively, this class can also be used to wrap a
+ * given {@link Callable} or {@link Runnable} into corresponding
+ * retrying variants: see the common static {@code wrap} methods.
+ * This is particularly useful for existing {@code Callable} and
+ * {@code Runnable} classes, as well as for scheduling methods
+ * which typically accept a {@code Runnable} in their signature.
+ *
+ * @author Juergen Hoeller
+ * @since 7.0.4
+ * @param the returned value type, if any
+ * @param the exception propagated, if any
+ * @see RetryTemplate
+ * @see org.springframework.core.task.SyncTaskExecutor#execute(TaskCallback)
+ * @see org.springframework.core.task.AsyncTaskExecutor#submit(Callable)
+ * @see org.springframework.core.task.AsyncTaskExecutor#submitCompletable(Callable)
+ */
+public class RetryTask implements TaskCallback {
+
+ private final TaskCallback task;
+
+ private final RetryOperations retryTemplate;
+
+
+ /**
+ * Create a new {@code RetryTask} for the given retryable task.
+ * @param task a task that allows for re-execution after failure
+ * @see #RetryTask(TaskCallback, RetryPolicy)
+ * @see #RetryTask(TaskCallback, RetryOperations)
+ */
+ public RetryTask(TaskCallback task) {
+ this(task, new RetryTemplate());
+ }
+
+ /**
+ * Create a new {@code RetryTask} for the given retryable task.
+ * @param task a task that allows for re-execution after failure
+ * @param retryPolicy the retry policy to apply
+ * @see #RetryTask(TaskCallback, RetryOperations)
+ */
+ public RetryTask(TaskCallback task, RetryPolicy retryPolicy) {
+ this(task, new RetryTemplate(retryPolicy));
+ }
+
+ /**
+ * Create a new {@code RetryTask} for the given retryable task.
+ * @param task a task that allows for re-execution after failure
+ * @param retryTemplate the retry delegate to use (typically a {@link RetryTemplate}
+ * but declaring the {@link RetryOperations} interface for flexibility)
+ */
+ public RetryTask(TaskCallback task, RetryOperations retryTemplate) {
+ Assert.notNull(task, "TaskCallback must not be null");
+ Assert.notNull(retryTemplate, "RetryTemplate must not be null");
+ this.task = task;
+ this.retryTemplate = retryTemplate;
+ }
+
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public V call() throws E {
+ try {
+ return this.retryTemplate.execute(new Retryable<>() {
+ @Override
+ public V execute() throws E {
+ return task.call();
+ }
+ @Override
+ public String getName() {
+ return RetryTask.this.getName();
+ }
+ });
+ }
+ catch (RetryException retryException) {
+ throw (E) retryException.getCause();
+ }
+ }
+
+ /**
+ * Return the name of the retryable task:
+ * by default, the class name of the target task.
+ * @see Retryable#getName()
+ */
+ public String getName() {
+ return this.task.getClass().getName();
+ }
+
+ @Override
+ public String toString() {
+ return "RetryTask for " + getName() + " using " +
+ (this.retryTemplate instanceof RetryTemplate rt ? rt.getRetryPolicy() : this.retryTemplate);
+ }
+
+
+ /**
+ * Wrap the given target {@code Callable} into a retrying {@code Callable}.
+ * @param task a task that allows for re-execution after failure
+ * @see #wrap(Callable, RetryPolicy)
+ * @see #wrap(Callable, RetryOperations)
+ */
+ public static Callable wrap(Callable task) {
+ return wrap(task, new RetryTemplate());
+ }
+
+ /**
+ * Wrap the given target {@code Callable} into a retrying {@code Callable}.
+ * @param task a task that allows for re-execution after failure
+ * @param retryPolicy the retry policy to apply
+ * @see #wrap(Callable, RetryOperations)
+ */
+ public static Callable wrap(Callable task, RetryPolicy retryPolicy) {
+ return wrap(task, new RetryTemplate(retryPolicy));
+ }
+
+ /**
+ * Wrap the given target {@code Callable} into a retrying {@code Callable}.
+ * @param task a task that allows for re-execution after failure
+ * @param retryTemplate the retry delegate to use (typically a {@link RetryTemplate}
+ * but declaring the {@link RetryOperations} interface for flexibility)
+ */
+ public static Callable wrap(Callable task, RetryOperations retryTemplate) {
+ return new RetryTask<>(TaskCallback.from(task), retryTemplate) {
+ @Override
+ public String getName() {
+ return task.getClass().getName();
+ }
+ };
+ }
+
+ /**
+ * Wrap the given target {@code Runnable} into a retrying {@code Runnable}.
+ * @param task a task that allows for re-execution after failure
+ * @see #wrap(Runnable, RetryPolicy)
+ * @see #wrap(Runnable, RetryOperations)
+ */
+ public static Runnable wrap(Runnable task) {
+ return wrap(task, new RetryTemplate());
+ }
+
+ /**
+ * Wrap the given target {@code Runnable} into a retrying {@code Runnable}.
+ * @param task a task that allows for re-execution after failure
+ * @param retryPolicy the retry policy to apply
+ * @see #wrap(Runnable, RetryOperations)
+ */
+ public static Runnable wrap(Runnable task, RetryPolicy retryPolicy) {
+ return wrap(task, new RetryTemplate(retryPolicy));
+ }
+
+ /**
+ * Wrap the given target {@code Runnable} into a retrying {@code Runnable}.
+ * @param task a task that allows for re-execution after failure
+ * @param retryTemplate the retry delegate to use (typically a {@link RetryTemplate}
+ * but declaring the {@link RetryOperations} interface for flexibility)
+ */
+ public static Runnable wrap(Runnable task, RetryOperations retryTemplate) {
+ RetryTask rt = new RetryTask<>(TaskCallback.from(task), retryTemplate) {
+ @Override
+ public String getName() {
+ return task.getClass().getName();
+ }
+ };
+ return new Runnable() {
+ @Override
+ public void run() {
+ rt.call();
+ }
+ @Override
+ public String toString() {
+ return rt.toString();
+ }
+ };
+ }
+
+}
diff --git a/spring-core/src/main/java/org/springframework/core/task/SimpleAsyncTaskExecutor.java b/spring-core/src/main/java/org/springframework/core/task/SimpleAsyncTaskExecutor.java
index 8bc5b7fd0c9..f8d44c68225 100644
--- a/spring-core/src/main/java/org/springframework/core/task/SimpleAsyncTaskExecutor.java
+++ b/spring-core/src/main/java/org/springframework/core/task/SimpleAsyncTaskExecutor.java
@@ -31,13 +31,15 @@ import org.springframework.util.ConcurrencyThrottleSupport;
import org.springframework.util.CustomizableThreadCreator;
/**
- * {@link TaskExecutor} implementation that fires up a new Thread for each task,
- * executing it asynchronously. Provides a virtual thread option on JDK 21.
+ * {@link TaskExecutor} implementation that fires up a new Thread for each task.
+ * Provides a {@link #setVirtualThreads virtual threads} option on JDK 21+.
*
* Supports a graceful shutdown through {@link #setTaskTerminationTimeout},
* at the expense of task tracking overhead per execution thread at runtime.
- * Supports limiting concurrent threads through {@link #setConcurrencyLimit}.
- * By default, the number of concurrent task executions is unlimited.
+ * Supports limiting concurrent threads through {@link #setConcurrencyLimit};
+ * by default, the number of concurrent task executions is unlimited. Can be
+ * combined with {@link org.springframework.core.retry.support.RetryTask} for
+ * re-executing submitted tasks after failure, according to a retry policy.
*
*
NOTE: This implementation does not reuse threads! Consider a
* thread-pooling TaskExecutor implementation instead, in particular for
@@ -54,6 +56,7 @@ import org.springframework.util.CustomizableThreadCreator;
* @see #setVirtualThreads
* @see #setTaskTerminationTimeout
* @see #setConcurrencyLimit
+ * @see org.springframework.core.retry.support.RetryTask
* @see org.springframework.scheduling.concurrent.SimpleAsyncTaskScheduler
* @see org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
*/
diff --git a/spring-core/src/main/java/org/springframework/core/task/SyncTaskExecutor.java b/spring-core/src/main/java/org/springframework/core/task/SyncTaskExecutor.java
index ee45dfc7e4a..bd454357189 100644
--- a/spring-core/src/main/java/org/springframework/core/task/SyncTaskExecutor.java
+++ b/spring-core/src/main/java/org/springframework/core/task/SyncTaskExecutor.java
@@ -63,7 +63,14 @@ public class SyncTaskExecutor extends ConcurrencyThrottleSupport implements Task
/**
* Execute the given {@code task} synchronously, through direct
* invocation of its {@link Runnable#run() run()} method.
+ *
This can be used with a {@link #setConcurrencyLimit concurrency limit},
+ * analogous to a concurrency-bounded {@link SimpleAsyncTaskExecutor} setup.
+ * Also, the provided task may apply a retry policy via
+ * {@link org.springframework.core.retry.support.RetryTask}.
* @throws RuntimeException if propagated from the given {@code Runnable}
+ * @see #setConcurrencyLimit
+ * @see #setRejectTasksWhenLimitReached
+ * @see org.springframework.core.retry.support.RetryTask#wrap(Runnable)
*/
@Override
public void execute(Runnable task) {
@@ -85,10 +92,17 @@ public class SyncTaskExecutor extends ConcurrencyThrottleSupport implements Task
/**
* Execute the given {@code task} synchronously, through direct
* invocation of its {@link TaskCallback#call() call()} method.
+ *
This can be used with a {@link #setConcurrencyLimit concurrency limit},
+ * analogous to a concurrency-bounded {@link SimpleAsyncTaskExecutor} setup.
+ * Also, the provided task may apply a retry policy via
+ * {@link org.springframework.core.retry.support.RetryTask}.
* @param the returned value type, if any
* @param the exception propagated, if any
* @throws E if propagated from the given {@code TaskCallback}
* @since 7.0
+ * @see #setConcurrencyLimit
+ * @see #setRejectTasksWhenLimitReached
+ * @see org.springframework.core.retry.support.RetryTask#RetryTask(TaskCallback)
*/
public V execute(TaskCallback task) throws E {
Assert.notNull(task, "Task must not be null");
diff --git a/spring-core/src/main/java/org/springframework/core/task/TaskCallback.java b/spring-core/src/main/java/org/springframework/core/task/TaskCallback.java
index 4a947ad1263..d7900990047 100644
--- a/spring-core/src/main/java/org/springframework/core/task/TaskCallback.java
+++ b/spring-core/src/main/java/org/springframework/core/task/TaskCallback.java
@@ -41,4 +41,25 @@ public interface TaskCallback e
@Override
V call() throws E;
+
+ /**
+ * Derive a {@link TaskCallback} from the given {@link Callable}.
+ * @since 7.0.4
+ */
+ static TaskCallback from(Callable task) {
+ return task::call;
+ }
+
+ /**
+ * Derive a {@link TaskCallback} from the given {@link Callable}.
+ * @since 7.0.4
+ */
+ @SuppressWarnings("NullAway")
+ static TaskCallback from(Runnable task) {
+ return () -> {
+ task.run();
+ return null;
+ };
+ }
+
}
diff --git a/spring-core/src/test/java/org/springframework/core/retry/support/RetryTaskTests.java b/spring-core/src/test/java/org/springframework/core/retry/support/RetryTaskTests.java
new file mode 100644
index 00000000000..d0c6788701d
--- /dev/null
+++ b/spring-core/src/test/java/org/springframework/core/retry/support/RetryTaskTests.java
@@ -0,0 +1,238 @@
+/*
+ * Copyright 2002-present the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.core.retry.support;
+
+import java.time.Duration;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.jupiter.api.Test;
+
+import org.springframework.core.retry.RetryPolicy;
+import org.springframework.core.retry.RetryTemplate;
+import org.springframework.core.task.AsyncTaskExecutor;
+import org.springframework.core.task.SimpleAsyncTaskExecutor;
+import org.springframework.core.task.SyncTaskExecutor;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
+
+/**
+ * @author Juergen Hoeller
+ * @since 7.0.4
+ */
+class RetryTaskTests {
+
+ RetryPolicy retryPolicy = RetryPolicy.builder().maxRetries(2).delay(Duration.ZERO).build();
+
+ RetryTemplate customTemplate = new RetryTemplate(RetryPolicy.builder().maxRetries(3).delay(Duration.ZERO).build());
+
+ SyncTaskExecutor syncExecutor = new SyncTaskExecutor();
+
+ AsyncTaskExecutor asyncExecutor = new SimpleAsyncTaskExecutor();
+
+ AtomicInteger invocationCount = new AtomicInteger();
+
+
+ @Test
+ void syncTaskWithImmediateSuccess() {
+ assertThat(
+ syncExecutor.execute(new RetryTask<>(() -> {
+ invocationCount.incrementAndGet();
+ return "always succeeds";
+ }, retryPolicy)))
+ .isEqualTo("always succeeds");
+ assertThat(invocationCount).hasValue(1);
+ }
+
+ @Test
+ void syncTaskWithSuccessAfterInitialFailures() {
+ assertThat(
+ syncExecutor.execute(new RetryTask<>(() -> {
+ if (invocationCount.incrementAndGet() < 2) {
+ throw new IllegalStateException("Boom " + invocationCount.get());
+ }
+ return "finally succeeded";
+ }, retryPolicy)))
+ .isEqualTo("finally succeeded");
+ assertThat(invocationCount).hasValue(2);
+ }
+
+ @Test
+ void syncTaskWithExhaustedPolicy() {
+ assertThatIllegalStateException().isThrownBy(() ->
+ syncExecutor.execute(new RetryTask<>(() -> {
+ invocationCount.incrementAndGet();
+ throw new IllegalStateException("Boom " + invocationCount.get());
+ }, retryPolicy)));
+ assertThat(invocationCount).hasValue(3);
+ }
+
+ @Test
+ void syncTaskWithCustomTemplate() {
+ assertThatIllegalStateException().isThrownBy(() ->
+ syncExecutor.execute(new RetryTask<>(() -> {
+ invocationCount.incrementAndGet();
+ throw new IllegalStateException("Boom " + invocationCount.get());
+ }, customTemplate)));
+ assertThat(invocationCount).hasValue(4);
+ }
+
+ @Test
+ void asyncTaskWithImmediateSuccess() throws Exception {
+ assertThat(
+ asyncExecutor.submit(new RetryTask<>(() -> {
+ invocationCount.incrementAndGet();
+ return "always succeeds";
+ }, retryPolicy)).get())
+ .isEqualTo("always succeeds");
+ assertThat(invocationCount).hasValue(1);
+ }
+
+ @Test
+ void asyncTaskWithSuccessAfterInitialFailures() throws Exception {
+ assertThat(
+ asyncExecutor.submit(new RetryTask<>(() -> {
+ if (invocationCount.incrementAndGet() < 2) {
+ throw new IllegalStateException("Boom " + invocationCount.get());
+ }
+ return "finally succeeded";
+ }, retryPolicy)).get())
+ .isEqualTo("finally succeeded");
+ assertThat(invocationCount).hasValue(2);
+ }
+
+ @Test
+ void asyncTaskWithExhaustedPolicy() {
+ assertThatExceptionOfType(ExecutionException.class).isThrownBy(() ->
+ asyncExecutor.submit(new RetryTask<>(() -> {
+ invocationCount.incrementAndGet();
+ throw new IllegalStateException("Boom " + invocationCount.get());
+ }, retryPolicy)).get())
+ .withCauseExactlyInstanceOf(IllegalStateException.class);
+ assertThat(invocationCount).hasValue(3);
+ }
+
+ @Test
+ void asyncTaskWithCustomTemplate() {
+ assertThatExceptionOfType(ExecutionException.class).isThrownBy(() ->
+ asyncExecutor.submit(new RetryTask<>(() -> {
+ invocationCount.incrementAndGet();
+ throw new IllegalStateException("Boom " + invocationCount.get());
+ }, customTemplate)).get())
+ .withCauseExactlyInstanceOf(IllegalStateException.class);
+ assertThat(invocationCount).hasValue(4);
+ }
+
+ @Test
+ void callableWithImmediateSuccess() throws Exception {
+ assertThat(
+ asyncExecutor.submit(RetryTask.wrap(() -> {
+ invocationCount.incrementAndGet();
+ return "always succeeds";
+ }, retryPolicy)).get())
+ .isEqualTo("always succeeds");
+ assertThat(invocationCount).hasValue(1);
+ }
+
+ @Test
+ void callableWithSuccessAfterInitialFailures() throws Exception {
+ assertThat(
+ asyncExecutor.submit(RetryTask.wrap(() -> {
+ if (invocationCount.incrementAndGet() < 2) {
+ throw new IllegalStateException("Boom " + invocationCount.get());
+ }
+ return "finally succeeded";
+ }, retryPolicy)).get())
+ .isEqualTo("finally succeeded");
+ assertThat(invocationCount).hasValue(2);
+ }
+
+ @Test
+ void callableWithExhaustedPolicy() {
+ assertThatExceptionOfType(ExecutionException.class).isThrownBy(() ->
+ asyncExecutor.submit(RetryTask.wrap(() -> {
+ invocationCount.incrementAndGet();
+ throw new IllegalStateException("Boom " + invocationCount.get());
+ }, retryPolicy)).get())
+ .withCauseExactlyInstanceOf(IllegalStateException.class);
+ assertThat(invocationCount).hasValue(3);
+ }
+
+ @Test
+ void callableWithCustomTemplate() {
+ assertThatExceptionOfType(ExecutionException.class).isThrownBy(() ->
+ asyncExecutor.submit(RetryTask.wrap(() -> {
+ invocationCount.incrementAndGet();
+ throw new IllegalStateException("Boom " + invocationCount.get());
+ }, customTemplate)).get())
+ .withCauseExactlyInstanceOf(IllegalStateException.class);
+ assertThat(invocationCount).hasValue(4);
+ }
+
+ @Test
+ void runnableWithImmediateSuccess() throws Exception {
+ assertThat(
+ asyncExecutor.submit(RetryTask.wrap(() -> {
+ if (true) { // forcing Runnable over Callable on overloaded wrap method
+ invocationCount.incrementAndGet();
+ }
+ }, retryPolicy)).get())
+ .isNull();
+ assertThat(invocationCount).hasValue(1);
+ }
+
+ @Test
+ void runnableWithSuccessAfterInitialFailures() throws Exception {
+ assertThat(
+ asyncExecutor.submit(RetryTask.wrap(() -> {
+ if (invocationCount.incrementAndGet() < 2) {
+ throw new IllegalStateException("Boom " + invocationCount.get());
+ }
+ }, retryPolicy)).get())
+ .isNull();
+ assertThat(invocationCount).hasValue(2);
+ }
+
+ @Test
+ void runnableWithExhaustedPolicy() {
+ assertThatExceptionOfType(ExecutionException.class).isThrownBy(() ->
+ asyncExecutor.submit(RetryTask.wrap(() -> {
+ invocationCount.incrementAndGet();
+ if (true) { // forcing Runnable over Callable on overloaded wrap method
+ throw new IllegalStateException("Boom " + invocationCount.get());
+ }
+ }, retryPolicy)).get())
+ .withCauseExactlyInstanceOf(IllegalStateException.class);
+ assertThat(invocationCount).hasValue(3);
+ }
+
+ @Test
+ void runnableWithCustomTemplate() {
+ assertThatExceptionOfType(ExecutionException.class).isThrownBy(() ->
+ asyncExecutor.submit(RetryTask.wrap(() -> {
+ invocationCount.incrementAndGet();
+ if (true) { // forcing Runnable over Callable on overloaded wrap method
+ throw new IllegalStateException("Boom " + invocationCount.get());
+ }
+ }, customTemplate)).get())
+ .withCauseExactlyInstanceOf(IllegalStateException.class);
+ assertThat(invocationCount).hasValue(4);
+ }
+
+}