Browse Source

Add cancelRemainingTasksOnClose flag for enforcing early interruption

Closes gh-35372
pull/35405/head
Juergen Hoeller 4 months ago
parent
commit
f62519bb55
  1. 59
      spring-core/src/main/java/org/springframework/core/task/SimpleAsyncTaskExecutor.java

59
spring-core/src/main/java/org/springframework/core/task/SimpleAsyncTaskExecutor.java

@ -92,6 +92,8 @@ public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator @@ -92,6 +92,8 @@ public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator
@Nullable
private Set<Thread> activeThreads;
private boolean cancelRemainingTasksOnClose = false;
private boolean rejectTasksWhenLimitReached = false;
private volatile boolean active = true;
@ -184,12 +186,33 @@ public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator @@ -184,12 +186,33 @@ public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator
* @param timeout the timeout in milliseconds
* @since 6.1
* @see #close()
* @see #setCancelRemainingTasksOnClose
* @see org.springframework.scheduling.concurrent.ExecutorConfigurationSupport#setAwaitTerminationMillis
*/
public void setTaskTerminationTimeout(long timeout) {
Assert.isTrue(timeout >= 0, "Timeout value must be >=0");
this.taskTerminationTimeout = timeout;
this.activeThreads = (timeout > 0 ? ConcurrentHashMap.newKeySet() : null);
trackActiveThreadsIfNecessary();
}
/**
* Specify whether to cancel remaining tasks on close: that is, whether to
* interrupt any active threads at the time of the {@link #close()} call.
* <p>The default is {@code false}, not tracking active threads at all or
* just interrupting any remaining threads that still have not finished after
* the specified {@link #setTaskTerminationTimeout taskTerminationTimeout}.
* Switch this to {@code true} for immediate interruption on close, either in
* combination with a subsequent termination timeout or without any waiting
* at all, depending on whether a {@code taskTerminationTimeout} has been
* specified as well.
* @since 6.2.11
* @see #close()
* @see #setTaskTerminationTimeout
* @see org.springframework.scheduling.concurrent.ExecutorConfigurationSupport#setWaitForTasksToCompleteOnShutdown
*/
public void setCancelRemainingTasksOnClose(boolean cancelRemainingTasksOnClose) {
this.cancelRemainingTasksOnClose = cancelRemainingTasksOnClose;
trackActiveThreadsIfNecessary();
}
/**
@ -249,6 +272,15 @@ public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator @@ -249,6 +272,15 @@ public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator
return this.active;
}
/**
* Track active threads only when a task termination timeout has been
* specified or interruption of remaining threads has been requested.
*/
private void trackActiveThreadsIfNecessary() {
this.activeThreads = (this.taskTerminationTimeout > 0 || this.cancelRemainingTasksOnClose ?
ConcurrentHashMap.newKeySet() : null);
}
/**
* Executes the given task, within a concurrency throttle
@ -353,7 +385,7 @@ public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator @@ -353,7 +385,7 @@ public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator
}
/**
* This close methods tracks the termination of active threads if a concrete
* This close method tracks the termination of active threads if a concrete
* {@link #setTaskTerminationTimeout task termination timeout} has been set.
* Otherwise, it is not necessary to close this executor.
* @since 6.1
@ -364,17 +396,26 @@ public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator @@ -364,17 +396,26 @@ public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator
this.active = false;
Set<Thread> threads = this.activeThreads;
if (threads != null) {
synchronized (threads) {
try {
if (!threads.isEmpty()) {
threads.wait(this.taskTerminationTimeout);
if (this.cancelRemainingTasksOnClose) {
// Early interrupt for remaining tasks on close
threads.forEach(Thread::interrupt);
}
if (this.taskTerminationTimeout > 0) {
synchronized (threads) {
try {
if (!threads.isEmpty()) {
threads.wait(this.taskTerminationTimeout);
}
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
if (!this.cancelRemainingTasksOnClose) {
// Late interrupt for remaining tasks after timeout
threads.forEach(Thread::interrupt);
}
}
threads.forEach(Thread::interrupt);
}
}
}

Loading…
Cancel
Save