From f62519bb55d039e7c491e739757c3830ea735bdf Mon Sep 17 00:00:00 2001 From: Juergen Hoeller Date: Sun, 24 Aug 2025 10:31:01 +0200 Subject: [PATCH] Add cancelRemainingTasksOnClose flag for enforcing early interruption Closes gh-35372 --- .../core/task/SimpleAsyncTaskExecutor.java | 59 ++++++++++++++++--- 1 file changed, 50 insertions(+), 9 deletions(-) diff --git a/spring-core/src/main/java/org/springframework/core/task/SimpleAsyncTaskExecutor.java b/spring-core/src/main/java/org/springframework/core/task/SimpleAsyncTaskExecutor.java index 33b35c4b37d..bde5c547180 100644 --- a/spring-core/src/main/java/org/springframework/core/task/SimpleAsyncTaskExecutor.java +++ b/spring-core/src/main/java/org/springframework/core/task/SimpleAsyncTaskExecutor.java @@ -92,6 +92,8 @@ public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator @Nullable private Set activeThreads; + private boolean cancelRemainingTasksOnClose = false; + private boolean rejectTasksWhenLimitReached = false; private volatile boolean active = true; @@ -184,12 +186,33 @@ public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator * @param timeout the timeout in milliseconds * @since 6.1 * @see #close() + * @see #setCancelRemainingTasksOnClose * @see org.springframework.scheduling.concurrent.ExecutorConfigurationSupport#setAwaitTerminationMillis */ public void setTaskTerminationTimeout(long timeout) { Assert.isTrue(timeout >= 0, "Timeout value must be >=0"); this.taskTerminationTimeout = timeout; - this.activeThreads = (timeout > 0 ? ConcurrentHashMap.newKeySet() : null); + trackActiveThreadsIfNecessary(); + } + + /** + * Specify whether to cancel remaining tasks on close: that is, whether to + * interrupt any active threads at the time of the {@link #close()} call. + *

The default is {@code false}, not tracking active threads at all or + * just interrupting any remaining threads that still have not finished after + * the specified {@link #setTaskTerminationTimeout taskTerminationTimeout}. + * Switch this to {@code true} for immediate interruption on close, either in + * combination with a subsequent termination timeout or without any waiting + * at all, depending on whether a {@code taskTerminationTimeout} has been + * specified as well. + * @since 6.2.11 + * @see #close() + * @see #setTaskTerminationTimeout + * @see org.springframework.scheduling.concurrent.ExecutorConfigurationSupport#setWaitForTasksToCompleteOnShutdown + */ + public void setCancelRemainingTasksOnClose(boolean cancelRemainingTasksOnClose) { + this.cancelRemainingTasksOnClose = cancelRemainingTasksOnClose; + trackActiveThreadsIfNecessary(); } /** @@ -249,6 +272,15 @@ public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator return this.active; } + /** + * Track active threads only when a task termination timeout has been + * specified or interruption of remaining threads has been requested. + */ + private void trackActiveThreadsIfNecessary() { + this.activeThreads = (this.taskTerminationTimeout > 0 || this.cancelRemainingTasksOnClose ? + ConcurrentHashMap.newKeySet() : null); + } + /** * Executes the given task, within a concurrency throttle @@ -353,7 +385,7 @@ public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator } /** - * This close methods tracks the termination of active threads if a concrete + * This close method tracks the termination of active threads if a concrete * {@link #setTaskTerminationTimeout task termination timeout} has been set. * Otherwise, it is not necessary to close this executor. * @since 6.1 @@ -364,17 +396,26 @@ public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator this.active = false; Set threads = this.activeThreads; if (threads != null) { - synchronized (threads) { - try { - if (!threads.isEmpty()) { - threads.wait(this.taskTerminationTimeout); + if (this.cancelRemainingTasksOnClose) { + // Early interrupt for remaining tasks on close + threads.forEach(Thread::interrupt); + } + if (this.taskTerminationTimeout > 0) { + synchronized (threads) { + try { + if (!threads.isEmpty()) { + threads.wait(this.taskTerminationTimeout); + } + } + catch (InterruptedException ex) { + Thread.currentThread().interrupt(); } } - catch (InterruptedException ex) { - Thread.currentThread().interrupt(); + if (!this.cancelRemainingTasksOnClose) { + // Late interrupt for remaining tasks after timeout + threads.forEach(Thread::interrupt); } } - threads.forEach(Thread::interrupt); } } }