From e386bdb82c9a6fb7e740e4321167ac4afafa1d07 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9my?= Date: Wed, 8 Jun 2022 11:49:22 +0200 Subject: [PATCH 1/2] Expose ThreadPoolTaskExecutor queue size and capacity for metrics We use Grafana to monitor our app via Spring's JMX exporter, and we think it could be interesting to have at least the current queue size for this purpose since the queue size directly affects the app memory load. Having the queue capacity seems also interesting to set up triggers whose values are calculated based on the maximum capacity of the queue. This commit introduces new getCurrentQueueSize() and getQueueCapacity() methods in ThreadPoolTaskExecutor. See gh-28583 --- .../concurrent/ThreadPoolTaskExecutor.java | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskExecutor.java b/spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskExecutor.java index 6a8334feebf..624191e2348 100644 --- a/spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskExecutor.java +++ b/spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskExecutor.java @@ -315,6 +315,20 @@ public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport } return this.threadPoolExecutor.getPoolSize(); } + + /** + * Return the current number of threads waiting in the queue + */ + public int getCurrentQueueSize() { + return this.getThreadPoolExecutor().getQueue().size(); + } + + /** + * Return the maximum capacity of the queue + */ + public int getQueueCapacity() { + return this.queueCapacity; + } /** * Return the number of currently active threads. From 8478e8e70aafb795c7ee0491204915ee5cea08e0 Mon Sep 17 00:00:00 2001 From: Sam Brannen Date: Wed, 8 Jun 2022 16:32:21 +0200 Subject: [PATCH 2/2] Polish contribution This commit: - fixes Checkstyle violations - improves Javadoc - adds missing @since tags - renames getCurrentQueueSize() to getQueueSize() - avoids NullPointerExceptions in getQueueSize() - introduces tests for queue size and queue capacity Closes gh-28583 --- .../concurrent/ThreadPoolTaskExecutor.java | 36 +++++++++----- .../ThreadPoolTaskExecutorTests.java | 47 +++++++++++++++++-- 2 files changed, 67 insertions(+), 16 deletions(-) diff --git a/spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskExecutor.java b/spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskExecutor.java index 624191e2348..1fb2708bcca 100644 --- a/spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskExecutor.java +++ b/spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskExecutor.java @@ -72,6 +72,8 @@ import org.springframework.util.concurrent.ListenableFutureTask; * {@link org.springframework.scheduling.concurrent.ConcurrentTaskExecutor} adapter. * * @author Juergen Hoeller + * @author Rémy Guihard + * @author Sam Brannen * @since 2.0 * @see org.springframework.core.task.TaskExecutor * @see java.util.concurrent.ThreadPoolExecutor @@ -155,7 +157,7 @@ public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport /** * Set the ThreadPoolExecutor's keep-alive seconds. - * Default is 60. + *

Default is 60. *

This setting can be modified at runtime, for example through JMX. */ public void setKeepAliveSeconds(int keepAliveSeconds) { @@ -178,7 +180,7 @@ public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport /** * Set the capacity for the ThreadPoolExecutor's BlockingQueue. - * Default is {@code Integer.MAX_VALUE}. + *

Default is {@code Integer.MAX_VALUE}. *

Any positive value will lead to a LinkedBlockingQueue instance; * any other value will lead to a SynchronousQueue instance. * @see java.util.concurrent.LinkedBlockingQueue @@ -188,6 +190,15 @@ public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport this.queueCapacity = queueCapacity; } + /** + * Return the capacity for the ThreadPoolExecutor's BlockingQueue. + * @since 5.3.21 + * @see #setQueueCapacity(int) + */ + public int getQueueCapacity() { + return this.queueCapacity; + } + /** * Specify whether to allow core threads to time out. This enables dynamic * growing and shrinking even in combination with a non-zero queue (since @@ -315,19 +326,18 @@ public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport } return this.threadPoolExecutor.getPoolSize(); } - - /** - * Return the current number of threads waiting in the queue - */ - public int getCurrentQueueSize() { - return this.getThreadPoolExecutor().getQueue().size(); - } /** - * Return the maximum capacity of the queue - */ - public int getQueueCapacity() { - return this.queueCapacity; + * Return the current queue size. + * @since 5.3.21 + * @see java.util.concurrent.ThreadPoolExecutor#getQueue() + */ + public int getQueueSize() { + if (this.threadPoolExecutor == null) { + // Not initialized yet: assume no queued tasks. + return 0; + } + return this.threadPoolExecutor.getQueue().size(); } /** diff --git a/spring-context/src/test/java/org/springframework/scheduling/concurrent/ThreadPoolTaskExecutorTests.java b/spring-context/src/test/java/org/springframework/scheduling/concurrent/ThreadPoolTaskExecutorTests.java index afc629b882a..0723a27abf7 100644 --- a/spring-context/src/test/java/org/springframework/scheduling/concurrent/ThreadPoolTaskExecutorTests.java +++ b/spring-context/src/test/java/org/springframework/scheduling/concurrent/ThreadPoolTaskExecutorTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,9 @@ package org.springframework.scheduling.concurrent; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.Test; @@ -23,10 +26,15 @@ import org.junit.jupiter.api.Test; import org.springframework.core.task.AsyncListenableTaskExecutor; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatIllegalStateException; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.InstanceOfAssertFactories.type; /** + * Unit tests for {@link ThreadPoolTaskExecutor}. + * * @author Juergen Hoeller + * @author Sam Brannen * @since 5.0.5 */ class ThreadPoolTaskExecutorTests extends AbstractSchedulingTaskExecutorTests { @@ -50,8 +58,8 @@ class ThreadPoolTaskExecutorTests extends AbstractSchedulingTaskExecutorTests { executor.setCorePoolSize(0); - assertThat(executor.getCorePoolSize()).isEqualTo(0); - assertThat(executor.getThreadPoolExecutor().getCorePoolSize()).isEqualTo(0); + assertThat(executor.getCorePoolSize()).isZero(); + assertThat(executor.getThreadPoolExecutor().getCorePoolSize()).isZero(); } @Test @@ -112,4 +120,37 @@ class ThreadPoolTaskExecutorTests extends AbstractSchedulingTaskExecutorTests { assertThat(executor.getThreadPoolExecutor().getKeepAliveTime(TimeUnit.SECONDS)).isEqualTo(60); } + @Test + void queueCapacityDefault() { + assertThat(executor.getQueueCapacity()).isEqualTo(Integer.MAX_VALUE); + assertThat(executor.getThreadPoolExecutor().getQueue()) + .asInstanceOf(type(LinkedBlockingQueue.class)) + .extracting(BlockingQueue::remainingCapacity).isEqualTo(Integer.MAX_VALUE); + } + + @Test + void queueCapacityZero() { + executor.setQueueCapacity(0); + executor.afterPropertiesSet(); + + assertThat(executor.getQueueCapacity()).isZero(); + assertThat(executor.getThreadPoolExecutor().getQueue()) + .asInstanceOf(type(SynchronousQueue.class)) + .extracting(BlockingQueue::remainingCapacity).isEqualTo(0); + } + + @Test + void queueSize() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + + assertThatIllegalStateException().isThrownBy(executor::getThreadPoolExecutor); + assertThat(executor.getQueueSize()).isZero(); + + executor.afterPropertiesSet(); + + assertThat(executor.getThreadPoolExecutor()).isNotNull(); + assertThat(executor.getThreadPoolExecutor().getQueue()).isEmpty(); + assertThat(executor.getQueueSize()).isZero(); + } + }