Browse Source

Separate internal ScheduledExecutorService for fixed-delay tasks

Closes gh-33408
pull/33594/head
Juergen Hoeller 1 year ago
parent
commit
8cfd6aed04
  1. 74
      spring-context/src/main/java/org/springframework/scheduling/concurrent/SimpleAsyncTaskScheduler.java

74
spring-context/src/main/java/org/springframework/scheduling/concurrent/SimpleAsyncTaskScheduler.java

@ -52,7 +52,7 @@ import org.springframework.util.concurrent.ListenableFuture; @@ -52,7 +52,7 @@ import org.springframework.util.concurrent.ListenableFuture;
* separate thread. This is an attractive choice with virtual threads on JDK 21,
* expecting common usage with {@link #setVirtualThreads setVirtualThreads(true)}.
*
* <p><b>NOTE: Scheduling with a fixed delay enforces execution on the single
* <p><b>NOTE: Scheduling with a fixed delay enforces execution on a single
* scheduler thread, in order to provide traditional fixed-delay semantics!</b>
* Prefer the use of fixed rates or cron triggers instead which are a better fit
* with this thread-per-task scheduler variant.
@ -113,9 +113,13 @@ public class SimpleAsyncTaskScheduler extends SimpleAsyncTaskExecutor implements @@ -113,9 +113,13 @@ public class SimpleAsyncTaskScheduler extends SimpleAsyncTaskExecutor implements
private static final TimeUnit NANO = TimeUnit.NANOSECONDS;
private final ScheduledExecutorService scheduledExecutor = createScheduledExecutor();
private final ScheduledExecutorService triggerExecutor = createScheduledExecutor();
private final ExecutorLifecycleDelegate lifecycleDelegate = new ExecutorLifecycleDelegate(this.scheduledExecutor);
private final ExecutorLifecycleDelegate triggerLifecycle = new ExecutorLifecycleDelegate(this.triggerExecutor);
private final ScheduledExecutorService fixedDelayExecutor = createFixedDelayExecutor();
private final ExecutorLifecycleDelegate fixedDelayLifecycle = new ExecutorLifecycleDelegate(this.fixedDelayExecutor);
@Nullable
private ErrorHandler errorHandler;
@ -195,11 +199,24 @@ public class SimpleAsyncTaskScheduler extends SimpleAsyncTaskExecutor implements @@ -195,11 +199,24 @@ public class SimpleAsyncTaskScheduler extends SimpleAsyncTaskExecutor implements
return new ScheduledThreadPoolExecutor(1, this::newThread) {
@Override
protected void beforeExecute(Thread thread, Runnable task) {
lifecycleDelegate.beforeExecute(thread);
triggerLifecycle.beforeExecute(thread);
}
@Override
protected void afterExecute(Runnable task, Throwable ex) {
triggerLifecycle.afterExecute();
}
};
}
private ScheduledExecutorService createFixedDelayExecutor() {
return new ScheduledThreadPoolExecutor(1, this::newThread) {
@Override
protected void beforeExecute(Thread thread, Runnable task) {
fixedDelayLifecycle.beforeExecute(thread);
}
@Override
protected void afterExecute(Runnable task, Throwable ex) {
lifecycleDelegate.afterExecute();
fixedDelayLifecycle.afterExecute();
}
};
}
@ -227,7 +244,7 @@ public class SimpleAsyncTaskScheduler extends SimpleAsyncTaskExecutor implements @@ -227,7 +244,7 @@ public class SimpleAsyncTaskScheduler extends SimpleAsyncTaskExecutor implements
if (this.errorHandler != null) {
this.errorHandler.handleError(ex);
}
else if (this.scheduledExecutor.isShutdown()) {
else if (this.triggerExecutor.isShutdown()) {
LogFactory.getLog(getClass()).debug("Ignoring scheduled task exception after shutdown", ex);
}
else {
@ -271,10 +288,10 @@ public class SimpleAsyncTaskScheduler extends SimpleAsyncTaskExecutor implements @@ -271,10 +288,10 @@ public class SimpleAsyncTaskScheduler extends SimpleAsyncTaskExecutor implements
ErrorHandler errorHandler =
(this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true));
return new ReschedulingRunnable(
delegate, trigger, this.clock, this.scheduledExecutor, errorHandler).schedule();
delegate, trigger, this.clock, this.triggerExecutor, errorHandler).schedule();
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException(this.scheduledExecutor, task, ex);
throw new TaskRejectedException(this.triggerExecutor, task, ex);
}
}
@ -282,10 +299,10 @@ public class SimpleAsyncTaskScheduler extends SimpleAsyncTaskExecutor implements @@ -282,10 +299,10 @@ public class SimpleAsyncTaskScheduler extends SimpleAsyncTaskExecutor implements
public ScheduledFuture<?> schedule(Runnable task, Instant startTime) {
Duration delay = Duration.between(this.clock.instant(), startTime);
try {
return this.scheduledExecutor.schedule(scheduledTask(task), NANO.convert(delay), NANO);
return this.triggerExecutor.schedule(scheduledTask(task), NANO.convert(delay), NANO);
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException(this.scheduledExecutor, task, ex);
throw new TaskRejectedException(this.triggerExecutor, task, ex);
}
}
@ -293,22 +310,22 @@ public class SimpleAsyncTaskScheduler extends SimpleAsyncTaskExecutor implements @@ -293,22 +310,22 @@ public class SimpleAsyncTaskScheduler extends SimpleAsyncTaskExecutor implements
public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Instant startTime, Duration period) {
Duration initialDelay = Duration.between(this.clock.instant(), startTime);
try {
return this.scheduledExecutor.scheduleAtFixedRate(scheduledTask(task),
return this.triggerExecutor.scheduleAtFixedRate(scheduledTask(task),
NANO.convert(initialDelay), NANO.convert(period), NANO);
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException(this.scheduledExecutor, task, ex);
throw new TaskRejectedException(this.triggerExecutor, task, ex);
}
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Duration period) {
try {
return this.scheduledExecutor.scheduleAtFixedRate(scheduledTask(task),
return this.triggerExecutor.scheduleAtFixedRate(scheduledTask(task),
0, NANO.convert(period), NANO);
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException(this.scheduledExecutor, task, ex);
throw new TaskRejectedException(this.triggerExecutor, task, ex);
}
}
@ -317,11 +334,11 @@ public class SimpleAsyncTaskScheduler extends SimpleAsyncTaskExecutor implements @@ -317,11 +334,11 @@ public class SimpleAsyncTaskScheduler extends SimpleAsyncTaskExecutor implements
Duration initialDelay = Duration.between(this.clock.instant(), startTime);
try {
// Blocking task on scheduler thread for fixed delay semantics
return this.scheduledExecutor.scheduleWithFixedDelay(taskOnSchedulerThread(task),
return this.fixedDelayExecutor.scheduleWithFixedDelay(taskOnSchedulerThread(task),
NANO.convert(initialDelay), NANO.convert(delay), NANO);
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException(this.scheduledExecutor, task, ex);
throw new TaskRejectedException(this.fixedDelayExecutor, task, ex);
}
}
@ -329,45 +346,54 @@ public class SimpleAsyncTaskScheduler extends SimpleAsyncTaskExecutor implements @@ -329,45 +346,54 @@ public class SimpleAsyncTaskScheduler extends SimpleAsyncTaskExecutor implements
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Duration delay) {
try {
// Blocking task on scheduler thread for fixed delay semantics
return this.scheduledExecutor.scheduleWithFixedDelay(taskOnSchedulerThread(task),
return this.fixedDelayExecutor.scheduleWithFixedDelay(taskOnSchedulerThread(task),
0, NANO.convert(delay), NANO);
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException(this.scheduledExecutor, task, ex);
throw new TaskRejectedException(this.fixedDelayExecutor, task, ex);
}
}
@Override
public void start() {
this.lifecycleDelegate.start();
this.triggerLifecycle.start();
this.fixedDelayLifecycle.start();
}
@Override
public void stop() {
this.lifecycleDelegate.stop();
this.triggerLifecycle.stop();
this.fixedDelayLifecycle.stop();
}
@Override
public void stop(Runnable callback) {
this.lifecycleDelegate.stop(callback);
this.triggerLifecycle.stop(); // no callback necessary since it's just triggers with hand-offs
this.fixedDelayLifecycle.stop(callback); // callback for currently executing fixed-delay tasks
}
@Override
public boolean isRunning() {
return this.lifecycleDelegate.isRunning();
return this.triggerLifecycle.isRunning();
}
@Override
public void onApplicationEvent(ContextClosedEvent event) {
if (event.getApplicationContext() == this.applicationContext) {
this.scheduledExecutor.shutdown();
this.triggerExecutor.shutdown();
this.fixedDelayExecutor.shutdown();
}
}
@Override
public void close() {
for (Runnable remainingTask : this.scheduledExecutor.shutdownNow()) {
for (Runnable remainingTask : this.triggerExecutor.shutdownNow()) {
if (remainingTask instanceof Future<?> future) {
future.cancel(true);
}
}
for (Runnable remainingTask : this.fixedDelayExecutor.shutdownNow()) {
if (remainingTask instanceof Future<?> future) {
future.cancel(true);
}

Loading…
Cancel
Save