From 87077d3fad3ad196d4b523bf7b401134cfd4e923 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Sun, 29 Jun 2014 15:47:08 -0400 Subject: [PATCH] Add support for setting removeOnCancelPolicy This change introduces removeOnCancelPolicy on ThreadPoolTaskScheduler and ScheduledExecutorFactoryBean and sets it to true for SockJS. This ensures that cancelled tasks are removed immediately to avoid the "unbounded retention of cancelled tasks" that is mentioned in the Javadoc ScheduledThreadPoolExecutor: "By default, such a cancelled task is not automatically removed from the work queue until its delay elapses. While this enables further inspection and monitoring, it may also cause unbounded retention of cancelled tasks. To avoid this, set setRemoveOnCancelPolicy to true, which causes tasks to be immediately removed from the work queue at time of cancellation." This is a backport for: https://github.com/spring-projects/spring-framework/commit/7441f2301281049cff7224bf650646490388080e Issue: SPR-11918 --- .../ScheduledExecutorFactoryBean.java | 17 +++++++++- .../concurrent/ThreadPoolTaskScheduler.java | 34 ++++++++++++++++++- .../config/WebSocketNamespaceUtils.java | 1 + .../WebSocketConfigurationSupport.java | 2 ++ ...cketMessageBrokerConfigurationSupport.java | 1 + ...essageBrokerBeanDefinitionParserTests.java | 1 + ...essageBrokerConfigurationSupportTests.java | 7 ++-- 7 files changed, 59 insertions(+), 4 deletions(-) diff --git a/spring-context/src/main/java/org/springframework/scheduling/concurrent/ScheduledExecutorFactoryBean.java b/spring-context/src/main/java/org/springframework/scheduling/concurrent/ScheduledExecutorFactoryBean.java index 2990e8b3a27..a8d47eda296 100644 --- a/spring-context/src/main/java/org/springframework/scheduling/concurrent/ScheduledExecutorFactoryBean.java +++ b/spring-context/src/main/java/org/springframework/scheduling/concurrent/ScheduledExecutorFactoryBean.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2013 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -56,6 +56,7 @@ import org.springframework.util.ObjectUtils; * @author Juergen Hoeller * @since 2.0 * @see #setPoolSize + * @see #setRemoveOnCancelPolicy * @see #setThreadFactory * @see ScheduledExecutorTask * @see java.util.concurrent.ScheduledExecutorService @@ -67,6 +68,8 @@ public class ScheduledExecutorFactoryBean extends ExecutorConfigurationSupport private int poolSize = 1; + private Boolean removeOnCancelPolicy; + private ScheduledExecutorTask[] scheduledExecutorTasks; private boolean continueScheduledExecutionAfterException = false; @@ -85,6 +88,14 @@ public class ScheduledExecutorFactoryBean extends ExecutorConfigurationSupport this.poolSize = poolSize; } + /** + * Set the same property on ScheduledExecutorService (JDK 1.7+). + * There is no default. If not set, the executor property is not set. + */ + public void setRemoveOnCancelPolicy(boolean removeOnCancelPolicy) { + this.removeOnCancelPolicy = removeOnCancelPolicy; + } + /** * Register a list of ScheduledExecutorTask objects with the ScheduledExecutorService * that this FactoryBean creates. Depending on each ScheduledExecutorTask's settings, @@ -130,6 +141,10 @@ public class ScheduledExecutorFactoryBean extends ExecutorConfigurationSupport ScheduledExecutorService executor = createExecutor(this.poolSize, threadFactory, rejectedExecutionHandler); + if (executor instanceof ScheduledThreadPoolExecutor && this.removeOnCancelPolicy != null) { + ((ScheduledThreadPoolExecutor) executor).setRemoveOnCancelPolicy(this.removeOnCancelPolicy); + } + // Register specified ScheduledExecutorTasks, if necessary. if (!ObjectUtils.isEmpty(this.scheduledExecutorTasks)) { registerTasks(this.scheduledExecutorTasks, executor); diff --git a/spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskScheduler.java b/spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskScheduler.java index be5f525df7d..a4c64790878 100644 --- a/spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskScheduler.java +++ b/spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskScheduler.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2013 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -48,6 +48,7 @@ import org.springframework.util.concurrent.ListenableFutureTask; * @author Mark Fisher * @since 3.0 * @see #setPoolSize + * @see #setRemoveOnCancelPolicy * @see #setThreadFactory * @see #setErrorHandler */ @@ -57,6 +58,8 @@ public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport private volatile int poolSize = 1; + private volatile Boolean removeOnCancelPolicy; + private volatile ScheduledExecutorService scheduledExecutor; private volatile ErrorHandler errorHandler; @@ -75,6 +78,18 @@ public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport } } + /** + * Set the same property on ScheduledExecutorService (JDK 1.7+). + * There is no default. If not set, the executor property is not set. + *

This setting can be modified at runtime, for example through JMX. + */ + public void setRemoveOnCancelPolicy(boolean removeOnCancelPolicy) { + this.removeOnCancelPolicy = removeOnCancelPolicy; + if (this.scheduledExecutor instanceof ScheduledThreadPoolExecutor) { + ((ScheduledThreadPoolExecutor) this.scheduledExecutor).setRemoveOnCancelPolicy(removeOnCancelPolicy); + } + } + /** * Set a custom {@link ErrorHandler} strategy. */ @@ -88,6 +103,11 @@ public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) { this.scheduledExecutor = createExecutor(this.poolSize, threadFactory, rejectedExecutionHandler); + + if (this.scheduledExecutor instanceof ScheduledThreadPoolExecutor && this.removeOnCancelPolicy != null) { + ((ScheduledThreadPoolExecutor) this.scheduledExecutor).setRemoveOnCancelPolicy(this.removeOnCancelPolicy); + } + return this.scheduledExecutor; } @@ -145,6 +165,18 @@ public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport return getScheduledThreadPoolExecutor().getPoolSize(); } + /** + * Return the current setting of removeOnCancelPolicy. + *

Requires an underlying {@link ScheduledThreadPoolExecutor} and JDK 1.7+. + */ + public boolean isRemoveOnCancelPolicy() { + if (this.scheduledExecutor == null) { + // Not initialized yet: return false (the default of the executor) + return false; + } + return getScheduledThreadPoolExecutor().getRemoveOnCancelPolicy(); + } + /** * Return the number of currently active threads. *

Requires an underlying {@link ScheduledThreadPoolExecutor}. diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/config/WebSocketNamespaceUtils.java b/spring-websocket/src/main/java/org/springframework/web/socket/config/WebSocketNamespaceUtils.java index 18f1481d7c8..3eb522fafca 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/config/WebSocketNamespaceUtils.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/config/WebSocketNamespaceUtils.java @@ -139,6 +139,7 @@ class WebSocketNamespaceUtils { taskSchedulerDef.setSource(source); taskSchedulerDef.setRole(BeanDefinition.ROLE_INFRASTRUCTURE); taskSchedulerDef.getPropertyValues().add("poolSize", Runtime.getRuntime().availableProcessors()); + taskSchedulerDef.getPropertyValues().add("removeOnCancelPolicy", true); taskSchedulerDef.getPropertyValues().add("threadNamePrefix", schedulerName + "-"); parserContext.getRegistry().registerBeanDefinition(schedulerName, taskSchedulerDef); parserContext.registerComponent(new BeanComponentDefinition(taskSchedulerDef, schedulerName)); diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketConfigurationSupport.java b/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketConfigurationSupport.java index 3d1e4427248..83100eb647d 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketConfigurationSupport.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketConfigurationSupport.java @@ -62,6 +62,8 @@ public class WebSocketConfigurationSupport { public ThreadPoolTaskScheduler defaultSockJsTaskScheduler() { ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); scheduler.setThreadNamePrefix("SockJS-"); + scheduler.setPoolSize(Runtime.getRuntime().availableProcessors()); + scheduler.setRemoveOnCancelPolicy(true); return scheduler; } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupport.java b/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupport.java index 216bca35de3..c14e626259e 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupport.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupport.java @@ -97,6 +97,7 @@ public abstract class WebSocketMessageBrokerConfigurationSupport extends Abstrac public ThreadPoolTaskScheduler messageBrokerSockJsTaskScheduler() { ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); scheduler.setPoolSize(Runtime.getRuntime().availableProcessors()); + scheduler.setRemoveOnCancelPolicy(true); scheduler.setThreadNamePrefix("MessageBrokerSockJS-"); return scheduler; } diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java index 1c9b295d971..9b636fda6ff 100644 --- a/spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java +++ b/spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java @@ -137,6 +137,7 @@ public class MessageBrokerBeanDefinitionParserTests { ThreadPoolTaskScheduler scheduler = (ThreadPoolTaskScheduler) defaultSockJsService.getTaskScheduler(); assertEquals(Runtime.getRuntime().availableProcessors(), scheduler.getScheduledThreadPoolExecutor().getCorePoolSize()); + assertTrue(scheduler.getScheduledThreadPoolExecutor().getRemoveOnCancelPolicy()); UserSessionRegistry userSessionRegistry = this.appContext.getBean(UserSessionRegistry.class); assertNotNull(userSessionRegistry); diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupportTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupportTests.java index 16d64a05772..3c11ba1ad34 100644 --- a/spring-websocket/src/test/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupportTests.java +++ b/spring-websocket/src/test/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupportTests.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ScheduledThreadPoolExecutor; import org.junit.Before; import org.junit.Test; @@ -50,6 +51,7 @@ import org.springframework.web.socket.messaging.SubProtocolHandler; import org.springframework.web.socket.messaging.SubProtocolWebSocketHandler; import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; /** * Test fixture for @@ -128,8 +130,9 @@ public class WebSocketMessageBrokerConfigurationSupportTests { ThreadPoolTaskScheduler taskScheduler = this.config.getBean("messageBrokerSockJsTaskScheduler", ThreadPoolTaskScheduler.class); - assertEquals(Runtime.getRuntime().availableProcessors(), - taskScheduler.getScheduledThreadPoolExecutor().getCorePoolSize()); + ScheduledThreadPoolExecutor executor = taskScheduler.getScheduledThreadPoolExecutor(); + assertEquals(Runtime.getRuntime().availableProcessors(), executor.getCorePoolSize()); + assertTrue(executor.getRemoveOnCancelPolicy()); }