From 8cfd6aed04d4fe94a7ef7578b833d77cdbc48660 Mon Sep 17 00:00:00 2001 From: Juergen Hoeller Date: Wed, 25 Sep 2024 10:04:17 +0200 Subject: [PATCH] Separate internal ScheduledExecutorService for fixed-delay tasks Closes gh-33408 --- .../concurrent/SimpleAsyncTaskScheduler.java | 74 +++++++++++++------ 1 file changed, 50 insertions(+), 24 deletions(-) diff --git a/spring-context/src/main/java/org/springframework/scheduling/concurrent/SimpleAsyncTaskScheduler.java b/spring-context/src/main/java/org/springframework/scheduling/concurrent/SimpleAsyncTaskScheduler.java index f19724de79e..abf72504df6 100644 --- a/spring-context/src/main/java/org/springframework/scheduling/concurrent/SimpleAsyncTaskScheduler.java +++ b/spring-context/src/main/java/org/springframework/scheduling/concurrent/SimpleAsyncTaskScheduler.java @@ -52,7 +52,7 @@ import org.springframework.util.concurrent.ListenableFuture; * separate thread. This is an attractive choice with virtual threads on JDK 21, * expecting common usage with {@link #setVirtualThreads setVirtualThreads(true)}. * - *

NOTE: Scheduling with a fixed delay enforces execution on the single + *

NOTE: Scheduling with a fixed delay enforces execution on a single * scheduler thread, in order to provide traditional fixed-delay semantics! * Prefer the use of fixed rates or cron triggers instead which are a better fit * with this thread-per-task scheduler variant. @@ -113,9 +113,13 @@ public class SimpleAsyncTaskScheduler extends SimpleAsyncTaskExecutor implements private static final TimeUnit NANO = TimeUnit.NANOSECONDS; - private final ScheduledExecutorService scheduledExecutor = createScheduledExecutor(); + private final ScheduledExecutorService triggerExecutor = createScheduledExecutor(); - private final ExecutorLifecycleDelegate lifecycleDelegate = new ExecutorLifecycleDelegate(this.scheduledExecutor); + private final ExecutorLifecycleDelegate triggerLifecycle = new ExecutorLifecycleDelegate(this.triggerExecutor); + + private final ScheduledExecutorService fixedDelayExecutor = createFixedDelayExecutor(); + + private final ExecutorLifecycleDelegate fixedDelayLifecycle = new ExecutorLifecycleDelegate(this.fixedDelayExecutor); @Nullable private ErrorHandler errorHandler; @@ -195,11 +199,24 @@ public class SimpleAsyncTaskScheduler extends SimpleAsyncTaskExecutor implements return new ScheduledThreadPoolExecutor(1, this::newThread) { @Override protected void beforeExecute(Thread thread, Runnable task) { - lifecycleDelegate.beforeExecute(thread); + triggerLifecycle.beforeExecute(thread); + } + @Override + protected void afterExecute(Runnable task, Throwable ex) { + triggerLifecycle.afterExecute(); + } + }; + } + + private ScheduledExecutorService createFixedDelayExecutor() { + return new ScheduledThreadPoolExecutor(1, this::newThread) { + @Override + protected void beforeExecute(Thread thread, Runnable task) { + fixedDelayLifecycle.beforeExecute(thread); } @Override protected void afterExecute(Runnable task, Throwable ex) { - lifecycleDelegate.afterExecute(); + fixedDelayLifecycle.afterExecute(); } }; } @@ -227,7 +244,7 @@ public class SimpleAsyncTaskScheduler extends SimpleAsyncTaskExecutor implements if (this.errorHandler != null) { this.errorHandler.handleError(ex); } - else if (this.scheduledExecutor.isShutdown()) { + else if (this.triggerExecutor.isShutdown()) { LogFactory.getLog(getClass()).debug("Ignoring scheduled task exception after shutdown", ex); } else { @@ -271,10 +288,10 @@ public class SimpleAsyncTaskScheduler extends SimpleAsyncTaskExecutor implements ErrorHandler errorHandler = (this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true)); return new ReschedulingRunnable( - delegate, trigger, this.clock, this.scheduledExecutor, errorHandler).schedule(); + delegate, trigger, this.clock, this.triggerExecutor, errorHandler).schedule(); } catch (RejectedExecutionException ex) { - throw new TaskRejectedException(this.scheduledExecutor, task, ex); + throw new TaskRejectedException(this.triggerExecutor, task, ex); } } @@ -282,10 +299,10 @@ public class SimpleAsyncTaskScheduler extends SimpleAsyncTaskExecutor implements public ScheduledFuture schedule(Runnable task, Instant startTime) { Duration delay = Duration.between(this.clock.instant(), startTime); try { - return this.scheduledExecutor.schedule(scheduledTask(task), NANO.convert(delay), NANO); + return this.triggerExecutor.schedule(scheduledTask(task), NANO.convert(delay), NANO); } catch (RejectedExecutionException ex) { - throw new TaskRejectedException(this.scheduledExecutor, task, ex); + throw new TaskRejectedException(this.triggerExecutor, task, ex); } } @@ -293,22 +310,22 @@ public class SimpleAsyncTaskScheduler extends SimpleAsyncTaskExecutor implements public ScheduledFuture scheduleAtFixedRate(Runnable task, Instant startTime, Duration period) { Duration initialDelay = Duration.between(this.clock.instant(), startTime); try { - return this.scheduledExecutor.scheduleAtFixedRate(scheduledTask(task), + return this.triggerExecutor.scheduleAtFixedRate(scheduledTask(task), NANO.convert(initialDelay), NANO.convert(period), NANO); } catch (RejectedExecutionException ex) { - throw new TaskRejectedException(this.scheduledExecutor, task, ex); + throw new TaskRejectedException(this.triggerExecutor, task, ex); } } @Override public ScheduledFuture scheduleAtFixedRate(Runnable task, Duration period) { try { - return this.scheduledExecutor.scheduleAtFixedRate(scheduledTask(task), + return this.triggerExecutor.scheduleAtFixedRate(scheduledTask(task), 0, NANO.convert(period), NANO); } catch (RejectedExecutionException ex) { - throw new TaskRejectedException(this.scheduledExecutor, task, ex); + throw new TaskRejectedException(this.triggerExecutor, task, ex); } } @@ -317,11 +334,11 @@ public class SimpleAsyncTaskScheduler extends SimpleAsyncTaskExecutor implements Duration initialDelay = Duration.between(this.clock.instant(), startTime); try { // Blocking task on scheduler thread for fixed delay semantics - return this.scheduledExecutor.scheduleWithFixedDelay(taskOnSchedulerThread(task), + return this.fixedDelayExecutor.scheduleWithFixedDelay(taskOnSchedulerThread(task), NANO.convert(initialDelay), NANO.convert(delay), NANO); } catch (RejectedExecutionException ex) { - throw new TaskRejectedException(this.scheduledExecutor, task, ex); + throw new TaskRejectedException(this.fixedDelayExecutor, task, ex); } } @@ -329,45 +346,54 @@ public class SimpleAsyncTaskScheduler extends SimpleAsyncTaskExecutor implements public ScheduledFuture scheduleWithFixedDelay(Runnable task, Duration delay) { try { // Blocking task on scheduler thread for fixed delay semantics - return this.scheduledExecutor.scheduleWithFixedDelay(taskOnSchedulerThread(task), + return this.fixedDelayExecutor.scheduleWithFixedDelay(taskOnSchedulerThread(task), 0, NANO.convert(delay), NANO); } catch (RejectedExecutionException ex) { - throw new TaskRejectedException(this.scheduledExecutor, task, ex); + throw new TaskRejectedException(this.fixedDelayExecutor, task, ex); } } @Override public void start() { - this.lifecycleDelegate.start(); + this.triggerLifecycle.start(); + this.fixedDelayLifecycle.start(); } @Override public void stop() { - this.lifecycleDelegate.stop(); + this.triggerLifecycle.stop(); + this.fixedDelayLifecycle.stop(); } @Override public void stop(Runnable callback) { - this.lifecycleDelegate.stop(callback); + this.triggerLifecycle.stop(); // no callback necessary since it's just triggers with hand-offs + this.fixedDelayLifecycle.stop(callback); // callback for currently executing fixed-delay tasks } @Override public boolean isRunning() { - return this.lifecycleDelegate.isRunning(); + return this.triggerLifecycle.isRunning(); } @Override public void onApplicationEvent(ContextClosedEvent event) { if (event.getApplicationContext() == this.applicationContext) { - this.scheduledExecutor.shutdown(); + this.triggerExecutor.shutdown(); + this.fixedDelayExecutor.shutdown(); } } @Override public void close() { - for (Runnable remainingTask : this.scheduledExecutor.shutdownNow()) { + for (Runnable remainingTask : this.triggerExecutor.shutdownNow()) { + if (remainingTask instanceof Future future) { + future.cancel(true); + } + } + for (Runnable remainingTask : this.fixedDelayExecutor.shutdownNow()) { if (remainingTask instanceof Future future) { future.cancel(true); }