Browse Source

Add TaskCallback/Callable/Runnable wrapper for retryable tasks

Closes gh-36208
pull/36209/head
Juergen Hoeller 1 week ago
parent
commit
c47da1247a
  1. 215
      spring-core/src/main/java/org/springframework/core/retry/support/RetryTask.java
  2. 11
      spring-core/src/main/java/org/springframework/core/task/SimpleAsyncTaskExecutor.java
  3. 14
      spring-core/src/main/java/org/springframework/core/task/SyncTaskExecutor.java
  4. 21
      spring-core/src/main/java/org/springframework/core/task/TaskCallback.java
  5. 238
      spring-core/src/test/java/org/springframework/core/retry/support/RetryTaskTests.java

215
spring-core/src/main/java/org/springframework/core/retry/support/RetryTask.java

@ -0,0 +1,215 @@ @@ -0,0 +1,215 @@
/*
* Copyright 2002-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.core.retry.support;
import java.util.concurrent.Callable;
import org.jspecify.annotations.Nullable;
import org.springframework.core.retry.RetryException;
import org.springframework.core.retry.RetryOperations;
import org.springframework.core.retry.RetryPolicy;
import org.springframework.core.retry.RetryTemplate;
import org.springframework.core.retry.Retryable;
import org.springframework.core.task.TaskCallback;
import org.springframework.util.Assert;
/**
* A {@link TaskCallback} that executes a given retryable task,
* re-executing it after failure according to a retry policy.
* Inherits {@link Callable} through {@link TaskCallback}.
*
* <p>For regular construction, this is designed to match the
* {@link org.springframework.core.task.SyncTaskExecutor#execute(TaskCallback)}
* signature for propagating specific exception types but is
* also usable with the {@code Callback}-based methods on
* {@link org.springframework.core.task.AsyncTaskExecutor}.
*
* <p>Alternatively, this class can also be used to wrap a
* given {@link Callable} or {@link Runnable} into corresponding
* retrying variants: see the common static {@code wrap} methods.
* This is particularly useful for existing {@code Callable} and
* {@code Runnable} classes, as well as for scheduling methods
* which typically accept a {@code Runnable} in their signature.
*
* @author Juergen Hoeller
* @since 7.0.4
* @param <V> the returned value type, if any
* @param <E> the exception propagated, if any
* @see RetryTemplate
* @see org.springframework.core.task.SyncTaskExecutor#execute(TaskCallback)
* @see org.springframework.core.task.AsyncTaskExecutor#submit(Callable)
* @see org.springframework.core.task.AsyncTaskExecutor#submitCompletable(Callable)
*/
public class RetryTask<V extends @Nullable Object, E extends Exception> implements TaskCallback<V, E> {
private final TaskCallback<V, E> task;
private final RetryOperations retryTemplate;
/**
* Create a new {@code RetryTask} for the given retryable task.
* @param task a task that allows for re-execution after failure
* @see #RetryTask(TaskCallback, RetryPolicy)
* @see #RetryTask(TaskCallback, RetryOperations)
*/
public RetryTask(TaskCallback<V, E> task) {
this(task, new RetryTemplate());
}
/**
* Create a new {@code RetryTask} for the given retryable task.
* @param task a task that allows for re-execution after failure
* @param retryPolicy the retry policy to apply
* @see #RetryTask(TaskCallback, RetryOperations)
*/
public RetryTask(TaskCallback<V, E> task, RetryPolicy retryPolicy) {
this(task, new RetryTemplate(retryPolicy));
}
/**
* Create a new {@code RetryTask} for the given retryable task.
* @param task a task that allows for re-execution after failure
* @param retryTemplate the retry delegate to use (typically a {@link RetryTemplate}
* but declaring the {@link RetryOperations} interface for flexibility)
*/
public RetryTask(TaskCallback<V, E> task, RetryOperations retryTemplate) {
Assert.notNull(task, "TaskCallback must not be null");
Assert.notNull(retryTemplate, "RetryTemplate must not be null");
this.task = task;
this.retryTemplate = retryTemplate;
}
@SuppressWarnings("unchecked")
@Override
public V call() throws E {
try {
return this.retryTemplate.execute(new Retryable<>() {
@Override
public V execute() throws E {
return task.call();
}
@Override
public String getName() {
return RetryTask.this.getName();
}
});
}
catch (RetryException retryException) {
throw (E) retryException.getCause();
}
}
/**
* Return the name of the retryable task:
* by default, the class name of the target task.
* @see Retryable#getName()
*/
public String getName() {
return this.task.getClass().getName();
}
@Override
public String toString() {
return "RetryTask for " + getName() + " using " +
(this.retryTemplate instanceof RetryTemplate rt ? rt.getRetryPolicy() : this.retryTemplate);
}
/**
* Wrap the given target {@code Callable} into a retrying {@code Callable}.
* @param task a task that allows for re-execution after failure
* @see #wrap(Callable, RetryPolicy)
* @see #wrap(Callable, RetryOperations)
*/
public static <V> Callable<V> wrap(Callable<V> task) {
return wrap(task, new RetryTemplate());
}
/**
* Wrap the given target {@code Callable} into a retrying {@code Callable}.
* @param task a task that allows for re-execution after failure
* @param retryPolicy the retry policy to apply
* @see #wrap(Callable, RetryOperations)
*/
public static <V> Callable<V> wrap(Callable<V> task, RetryPolicy retryPolicy) {
return wrap(task, new RetryTemplate(retryPolicy));
}
/**
* Wrap the given target {@code Callable} into a retrying {@code Callable}.
* @param task a task that allows for re-execution after failure
* @param retryTemplate the retry delegate to use (typically a {@link RetryTemplate}
* but declaring the {@link RetryOperations} interface for flexibility)
*/
public static <V> Callable<V> wrap(Callable<V> task, RetryOperations retryTemplate) {
return new RetryTask<>(TaskCallback.from(task), retryTemplate) {
@Override
public String getName() {
return task.getClass().getName();
}
};
}
/**
* Wrap the given target {@code Runnable} into a retrying {@code Runnable}.
* @param task a task that allows for re-execution after failure
* @see #wrap(Runnable, RetryPolicy)
* @see #wrap(Runnable, RetryOperations)
*/
public static Runnable wrap(Runnable task) {
return wrap(task, new RetryTemplate());
}
/**
* Wrap the given target {@code Runnable} into a retrying {@code Runnable}.
* @param task a task that allows for re-execution after failure
* @param retryPolicy the retry policy to apply
* @see #wrap(Runnable, RetryOperations)
*/
public static Runnable wrap(Runnable task, RetryPolicy retryPolicy) {
return wrap(task, new RetryTemplate(retryPolicy));
}
/**
* Wrap the given target {@code Runnable} into a retrying {@code Runnable}.
* @param task a task that allows for re-execution after failure
* @param retryTemplate the retry delegate to use (typically a {@link RetryTemplate}
* but declaring the {@link RetryOperations} interface for flexibility)
*/
public static Runnable wrap(Runnable task, RetryOperations retryTemplate) {
RetryTask<Void, RuntimeException> rt = new RetryTask<>(TaskCallback.from(task), retryTemplate) {
@Override
public String getName() {
return task.getClass().getName();
}
};
return new Runnable() {
@Override
public void run() {
rt.call();
}
@Override
public String toString() {
return rt.toString();
}
};
}
}

11
spring-core/src/main/java/org/springframework/core/task/SimpleAsyncTaskExecutor.java

@ -31,13 +31,15 @@ import org.springframework.util.ConcurrencyThrottleSupport; @@ -31,13 +31,15 @@ import org.springframework.util.ConcurrencyThrottleSupport;
import org.springframework.util.CustomizableThreadCreator;
/**
* {@link TaskExecutor} implementation that fires up a new Thread for each task,
* executing it asynchronously. Provides a virtual thread option on JDK 21.
* {@link TaskExecutor} implementation that fires up a new Thread for each task.
* Provides a {@link #setVirtualThreads virtual threads} option on JDK 21+.
*
* <p>Supports a graceful shutdown through {@link #setTaskTerminationTimeout},
* at the expense of task tracking overhead per execution thread at runtime.
* Supports limiting concurrent threads through {@link #setConcurrencyLimit}.
* By default, the number of concurrent task executions is unlimited.
* Supports limiting concurrent threads through {@link #setConcurrencyLimit};
* by default, the number of concurrent task executions is unlimited. Can be
* combined with {@link org.springframework.core.retry.support.RetryTask} for
* re-executing submitted tasks after failure, according to a retry policy.
*
* <p><b>NOTE: This implementation does not reuse threads!</b> Consider a
* thread-pooling TaskExecutor implementation instead, in particular for
@ -54,6 +56,7 @@ import org.springframework.util.CustomizableThreadCreator; @@ -54,6 +56,7 @@ import org.springframework.util.CustomizableThreadCreator;
* @see #setVirtualThreads
* @see #setTaskTerminationTimeout
* @see #setConcurrencyLimit
* @see org.springframework.core.retry.support.RetryTask
* @see org.springframework.scheduling.concurrent.SimpleAsyncTaskScheduler
* @see org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
*/

14
spring-core/src/main/java/org/springframework/core/task/SyncTaskExecutor.java

@ -63,7 +63,14 @@ public class SyncTaskExecutor extends ConcurrencyThrottleSupport implements Task @@ -63,7 +63,14 @@ public class SyncTaskExecutor extends ConcurrencyThrottleSupport implements Task
/**
* Execute the given {@code task} synchronously, through direct
* invocation of its {@link Runnable#run() run()} method.
* <p>This can be used with a {@link #setConcurrencyLimit concurrency limit},
* analogous to a concurrency-bounded {@link SimpleAsyncTaskExecutor} setup.
* Also, the provided task may apply a retry policy via
* {@link org.springframework.core.retry.support.RetryTask}.
* @throws RuntimeException if propagated from the given {@code Runnable}
* @see #setConcurrencyLimit
* @see #setRejectTasksWhenLimitReached
* @see org.springframework.core.retry.support.RetryTask#wrap(Runnable)
*/
@Override
public void execute(Runnable task) {
@ -85,10 +92,17 @@ public class SyncTaskExecutor extends ConcurrencyThrottleSupport implements Task @@ -85,10 +92,17 @@ public class SyncTaskExecutor extends ConcurrencyThrottleSupport implements Task
/**
* Execute the given {@code task} synchronously, through direct
* invocation of its {@link TaskCallback#call() call()} method.
* <p>This can be used with a {@link #setConcurrencyLimit concurrency limit},
* analogous to a concurrency-bounded {@link SimpleAsyncTaskExecutor} setup.
* Also, the provided task may apply a retry policy via
* {@link org.springframework.core.retry.support.RetryTask}.
* @param <V> the returned value type, if any
* @param <E> the exception propagated, if any
* @throws E if propagated from the given {@code TaskCallback}
* @since 7.0
* @see #setConcurrencyLimit
* @see #setRejectTasksWhenLimitReached
* @see org.springframework.core.retry.support.RetryTask#RetryTask(TaskCallback)
*/
public <V extends @Nullable Object, E extends Exception> V execute(TaskCallback<V, E> task) throws E {
Assert.notNull(task, "Task must not be null");

21
spring-core/src/main/java/org/springframework/core/task/TaskCallback.java

@ -41,4 +41,25 @@ public interface TaskCallback<V extends @Nullable Object, E extends Exception> e @@ -41,4 +41,25 @@ public interface TaskCallback<V extends @Nullable Object, E extends Exception> e
@Override
V call() throws E;
/**
* Derive a {@link TaskCallback} from the given {@link Callable}.
* @since 7.0.4
*/
static <V> TaskCallback<V, Exception> from(Callable<V> task) {
return task::call;
}
/**
* Derive a {@link TaskCallback} from the given {@link Callable}.
* @since 7.0.4
*/
@SuppressWarnings("NullAway")
static TaskCallback<Void, RuntimeException> from(Runnable task) {
return () -> {
task.run();
return null;
};
}
}

238
spring-core/src/test/java/org/springframework/core/retry/support/RetryTaskTests.java

@ -0,0 +1,238 @@ @@ -0,0 +1,238 @@
/*
* Copyright 2002-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.core.retry.support;
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.Test;
import org.springframework.core.retry.RetryPolicy;
import org.springframework.core.retry.RetryTemplate;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.SyncTaskExecutor;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
/**
* @author Juergen Hoeller
* @since 7.0.4
*/
class RetryTaskTests {
RetryPolicy retryPolicy = RetryPolicy.builder().maxRetries(2).delay(Duration.ZERO).build();
RetryTemplate customTemplate = new RetryTemplate(RetryPolicy.builder().maxRetries(3).delay(Duration.ZERO).build());
SyncTaskExecutor syncExecutor = new SyncTaskExecutor();
AsyncTaskExecutor asyncExecutor = new SimpleAsyncTaskExecutor();
AtomicInteger invocationCount = new AtomicInteger();
@Test
void syncTaskWithImmediateSuccess() {
assertThat(
syncExecutor.execute(new RetryTask<>(() -> {
invocationCount.incrementAndGet();
return "always succeeds";
}, retryPolicy)))
.isEqualTo("always succeeds");
assertThat(invocationCount).hasValue(1);
}
@Test
void syncTaskWithSuccessAfterInitialFailures() {
assertThat(
syncExecutor.execute(new RetryTask<>(() -> {
if (invocationCount.incrementAndGet() < 2) {
throw new IllegalStateException("Boom " + invocationCount.get());
}
return "finally succeeded";
}, retryPolicy)))
.isEqualTo("finally succeeded");
assertThat(invocationCount).hasValue(2);
}
@Test
void syncTaskWithExhaustedPolicy() {
assertThatIllegalStateException().isThrownBy(() ->
syncExecutor.execute(new RetryTask<>(() -> {
invocationCount.incrementAndGet();
throw new IllegalStateException("Boom " + invocationCount.get());
}, retryPolicy)));
assertThat(invocationCount).hasValue(3);
}
@Test
void syncTaskWithCustomTemplate() {
assertThatIllegalStateException().isThrownBy(() ->
syncExecutor.execute(new RetryTask<>(() -> {
invocationCount.incrementAndGet();
throw new IllegalStateException("Boom " + invocationCount.get());
}, customTemplate)));
assertThat(invocationCount).hasValue(4);
}
@Test
void asyncTaskWithImmediateSuccess() throws Exception {
assertThat(
asyncExecutor.submit(new RetryTask<>(() -> {
invocationCount.incrementAndGet();
return "always succeeds";
}, retryPolicy)).get())
.isEqualTo("always succeeds");
assertThat(invocationCount).hasValue(1);
}
@Test
void asyncTaskWithSuccessAfterInitialFailures() throws Exception {
assertThat(
asyncExecutor.submit(new RetryTask<>(() -> {
if (invocationCount.incrementAndGet() < 2) {
throw new IllegalStateException("Boom " + invocationCount.get());
}
return "finally succeeded";
}, retryPolicy)).get())
.isEqualTo("finally succeeded");
assertThat(invocationCount).hasValue(2);
}
@Test
void asyncTaskWithExhaustedPolicy() {
assertThatExceptionOfType(ExecutionException.class).isThrownBy(() ->
asyncExecutor.submit(new RetryTask<>(() -> {
invocationCount.incrementAndGet();
throw new IllegalStateException("Boom " + invocationCount.get());
}, retryPolicy)).get())
.withCauseExactlyInstanceOf(IllegalStateException.class);
assertThat(invocationCount).hasValue(3);
}
@Test
void asyncTaskWithCustomTemplate() {
assertThatExceptionOfType(ExecutionException.class).isThrownBy(() ->
asyncExecutor.submit(new RetryTask<>(() -> {
invocationCount.incrementAndGet();
throw new IllegalStateException("Boom " + invocationCount.get());
}, customTemplate)).get())
.withCauseExactlyInstanceOf(IllegalStateException.class);
assertThat(invocationCount).hasValue(4);
}
@Test
void callableWithImmediateSuccess() throws Exception {
assertThat(
asyncExecutor.submit(RetryTask.wrap(() -> {
invocationCount.incrementAndGet();
return "always succeeds";
}, retryPolicy)).get())
.isEqualTo("always succeeds");
assertThat(invocationCount).hasValue(1);
}
@Test
void callableWithSuccessAfterInitialFailures() throws Exception {
assertThat(
asyncExecutor.submit(RetryTask.wrap(() -> {
if (invocationCount.incrementAndGet() < 2) {
throw new IllegalStateException("Boom " + invocationCount.get());
}
return "finally succeeded";
}, retryPolicy)).get())
.isEqualTo("finally succeeded");
assertThat(invocationCount).hasValue(2);
}
@Test
void callableWithExhaustedPolicy() {
assertThatExceptionOfType(ExecutionException.class).isThrownBy(() ->
asyncExecutor.submit(RetryTask.wrap(() -> {
invocationCount.incrementAndGet();
throw new IllegalStateException("Boom " + invocationCount.get());
}, retryPolicy)).get())
.withCauseExactlyInstanceOf(IllegalStateException.class);
assertThat(invocationCount).hasValue(3);
}
@Test
void callableWithCustomTemplate() {
assertThatExceptionOfType(ExecutionException.class).isThrownBy(() ->
asyncExecutor.submit(RetryTask.wrap(() -> {
invocationCount.incrementAndGet();
throw new IllegalStateException("Boom " + invocationCount.get());
}, customTemplate)).get())
.withCauseExactlyInstanceOf(IllegalStateException.class);
assertThat(invocationCount).hasValue(4);
}
@Test
void runnableWithImmediateSuccess() throws Exception {
assertThat(
asyncExecutor.submit(RetryTask.wrap(() -> {
if (true) { // forcing Runnable over Callable on overloaded wrap method
invocationCount.incrementAndGet();
}
}, retryPolicy)).get())
.isNull();
assertThat(invocationCount).hasValue(1);
}
@Test
void runnableWithSuccessAfterInitialFailures() throws Exception {
assertThat(
asyncExecutor.submit(RetryTask.wrap(() -> {
if (invocationCount.incrementAndGet() < 2) {
throw new IllegalStateException("Boom " + invocationCount.get());
}
}, retryPolicy)).get())
.isNull();
assertThat(invocationCount).hasValue(2);
}
@Test
void runnableWithExhaustedPolicy() {
assertThatExceptionOfType(ExecutionException.class).isThrownBy(() ->
asyncExecutor.submit(RetryTask.wrap(() -> {
invocationCount.incrementAndGet();
if (true) { // forcing Runnable over Callable on overloaded wrap method
throw new IllegalStateException("Boom " + invocationCount.get());
}
}, retryPolicy)).get())
.withCauseExactlyInstanceOf(IllegalStateException.class);
assertThat(invocationCount).hasValue(3);
}
@Test
void runnableWithCustomTemplate() {
assertThatExceptionOfType(ExecutionException.class).isThrownBy(() ->
asyncExecutor.submit(RetryTask.wrap(() -> {
invocationCount.incrementAndGet();
if (true) { // forcing Runnable over Callable on overloaded wrap method
throw new IllegalStateException("Boom " + invocationCount.get());
}
}, customTemplate)).get())
.withCauseExactlyInstanceOf(IllegalStateException.class);
assertThat(invocationCount).hasValue(4);
}
}
Loading…
Cancel
Save