Browse Source

Add support for setting removeOnCancelPolicy

This change introduces removeOnCancelPolicy on ThreadPoolTaskScheduler
and ScheduledExecutorFactoryBean and sets it to true for SockJS.
This ensures that cancelled tasks are removed immediately to avoid
the "unbounded retention of cancelled tasks" that is mentioned in
the Javadoc ScheduledThreadPoolExecutor:

"By default, such a cancelled task is not automatically removed from
the work queue until its delay elapses. While this enables further
inspection and monitoring, it may also cause unbounded retention of
cancelled tasks. To avoid this, set setRemoveOnCancelPolicy to true,
which causes tasks to be immediately removed from the work queue at
time of cancellation."

This is a backport for:
7441f23012

Issue: SPR-11918
pull/579/head
Rossen Stoyanchev 12 years ago
parent
commit
87077d3fad
  1. 17
      spring-context/src/main/java/org/springframework/scheduling/concurrent/ScheduledExecutorFactoryBean.java
  2. 34
      spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskScheduler.java
  3. 1
      spring-websocket/src/main/java/org/springframework/web/socket/config/WebSocketNamespaceUtils.java
  4. 2
      spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketConfigurationSupport.java
  5. 1
      spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupport.java
  6. 1
      spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java
  7. 7
      spring-websocket/src/test/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupportTests.java

17
spring-context/src/main/java/org/springframework/scheduling/concurrent/ScheduledExecutorFactoryBean.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -56,6 +56,7 @@ import org.springframework.util.ObjectUtils; @@ -56,6 +56,7 @@ import org.springframework.util.ObjectUtils;
* @author Juergen Hoeller
* @since 2.0
* @see #setPoolSize
* @see #setRemoveOnCancelPolicy
* @see #setThreadFactory
* @see ScheduledExecutorTask
* @see java.util.concurrent.ScheduledExecutorService
@ -67,6 +68,8 @@ public class ScheduledExecutorFactoryBean extends ExecutorConfigurationSupport @@ -67,6 +68,8 @@ public class ScheduledExecutorFactoryBean extends ExecutorConfigurationSupport
private int poolSize = 1;
private Boolean removeOnCancelPolicy;
private ScheduledExecutorTask[] scheduledExecutorTasks;
private boolean continueScheduledExecutionAfterException = false;
@ -85,6 +88,14 @@ public class ScheduledExecutorFactoryBean extends ExecutorConfigurationSupport @@ -85,6 +88,14 @@ public class ScheduledExecutorFactoryBean extends ExecutorConfigurationSupport
this.poolSize = poolSize;
}
/**
* Set the same property on ScheduledExecutorService (JDK 1.7+).
* There is no default. If not set, the executor property is not set.
*/
public void setRemoveOnCancelPolicy(boolean removeOnCancelPolicy) {
this.removeOnCancelPolicy = removeOnCancelPolicy;
}
/**
* Register a list of ScheduledExecutorTask objects with the ScheduledExecutorService
* that this FactoryBean creates. Depending on each ScheduledExecutorTask's settings,
@ -130,6 +141,10 @@ public class ScheduledExecutorFactoryBean extends ExecutorConfigurationSupport @@ -130,6 +141,10 @@ public class ScheduledExecutorFactoryBean extends ExecutorConfigurationSupport
ScheduledExecutorService executor =
createExecutor(this.poolSize, threadFactory, rejectedExecutionHandler);
if (executor instanceof ScheduledThreadPoolExecutor && this.removeOnCancelPolicy != null) {
((ScheduledThreadPoolExecutor) executor).setRemoveOnCancelPolicy(this.removeOnCancelPolicy);
}
// Register specified ScheduledExecutorTasks, if necessary.
if (!ObjectUtils.isEmpty(this.scheduledExecutorTasks)) {
registerTasks(this.scheduledExecutorTasks, executor);

34
spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskScheduler.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -48,6 +48,7 @@ import org.springframework.util.concurrent.ListenableFutureTask; @@ -48,6 +48,7 @@ import org.springframework.util.concurrent.ListenableFutureTask;
* @author Mark Fisher
* @since 3.0
* @see #setPoolSize
* @see #setRemoveOnCancelPolicy
* @see #setThreadFactory
* @see #setErrorHandler
*/
@ -57,6 +58,8 @@ public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport @@ -57,6 +58,8 @@ public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport
private volatile int poolSize = 1;
private volatile Boolean removeOnCancelPolicy;
private volatile ScheduledExecutorService scheduledExecutor;
private volatile ErrorHandler errorHandler;
@ -75,6 +78,18 @@ public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport @@ -75,6 +78,18 @@ public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport
}
}
/**
* Set the same property on ScheduledExecutorService (JDK 1.7+).
* There is no default. If not set, the executor property is not set.
* <p><b>This setting can be modified at runtime, for example through JMX.</b>
*/
public void setRemoveOnCancelPolicy(boolean removeOnCancelPolicy) {
this.removeOnCancelPolicy = removeOnCancelPolicy;
if (this.scheduledExecutor instanceof ScheduledThreadPoolExecutor) {
((ScheduledThreadPoolExecutor) this.scheduledExecutor).setRemoveOnCancelPolicy(removeOnCancelPolicy);
}
}
/**
* Set a custom {@link ErrorHandler} strategy.
*/
@ -88,6 +103,11 @@ public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport @@ -88,6 +103,11 @@ public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport
ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
this.scheduledExecutor = createExecutor(this.poolSize, threadFactory, rejectedExecutionHandler);
if (this.scheduledExecutor instanceof ScheduledThreadPoolExecutor && this.removeOnCancelPolicy != null) {
((ScheduledThreadPoolExecutor) this.scheduledExecutor).setRemoveOnCancelPolicy(this.removeOnCancelPolicy);
}
return this.scheduledExecutor;
}
@ -145,6 +165,18 @@ public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport @@ -145,6 +165,18 @@ public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport
return getScheduledThreadPoolExecutor().getPoolSize();
}
/**
* Return the current setting of removeOnCancelPolicy.
* <p>Requires an underlying {@link ScheduledThreadPoolExecutor} and JDK 1.7+.
*/
public boolean isRemoveOnCancelPolicy() {
if (this.scheduledExecutor == null) {
// Not initialized yet: return false (the default of the executor)
return false;
}
return getScheduledThreadPoolExecutor().getRemoveOnCancelPolicy();
}
/**
* Return the number of currently active threads.
* <p>Requires an underlying {@link ScheduledThreadPoolExecutor}.

1
spring-websocket/src/main/java/org/springframework/web/socket/config/WebSocketNamespaceUtils.java

@ -139,6 +139,7 @@ class WebSocketNamespaceUtils { @@ -139,6 +139,7 @@ class WebSocketNamespaceUtils {
taskSchedulerDef.setSource(source);
taskSchedulerDef.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
taskSchedulerDef.getPropertyValues().add("poolSize", Runtime.getRuntime().availableProcessors());
taskSchedulerDef.getPropertyValues().add("removeOnCancelPolicy", true);
taskSchedulerDef.getPropertyValues().add("threadNamePrefix", schedulerName + "-");
parserContext.getRegistry().registerBeanDefinition(schedulerName, taskSchedulerDef);
parserContext.registerComponent(new BeanComponentDefinition(taskSchedulerDef, schedulerName));

2
spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketConfigurationSupport.java

@ -62,6 +62,8 @@ public class WebSocketConfigurationSupport { @@ -62,6 +62,8 @@ public class WebSocketConfigurationSupport {
public ThreadPoolTaskScheduler defaultSockJsTaskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setThreadNamePrefix("SockJS-");
scheduler.setPoolSize(Runtime.getRuntime().availableProcessors());
scheduler.setRemoveOnCancelPolicy(true);
return scheduler;
}

1
spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupport.java

@ -97,6 +97,7 @@ public abstract class WebSocketMessageBrokerConfigurationSupport extends Abstrac @@ -97,6 +97,7 @@ public abstract class WebSocketMessageBrokerConfigurationSupport extends Abstrac
public ThreadPoolTaskScheduler messageBrokerSockJsTaskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(Runtime.getRuntime().availableProcessors());
scheduler.setRemoveOnCancelPolicy(true);
scheduler.setThreadNamePrefix("MessageBrokerSockJS-");
return scheduler;
}

1
spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java

@ -137,6 +137,7 @@ public class MessageBrokerBeanDefinitionParserTests { @@ -137,6 +137,7 @@ public class MessageBrokerBeanDefinitionParserTests {
ThreadPoolTaskScheduler scheduler = (ThreadPoolTaskScheduler) defaultSockJsService.getTaskScheduler();
assertEquals(Runtime.getRuntime().availableProcessors(), scheduler.getScheduledThreadPoolExecutor().getCorePoolSize());
assertTrue(scheduler.getScheduledThreadPoolExecutor().getRemoveOnCancelPolicy());
UserSessionRegistry userSessionRegistry = this.appContext.getBean(UserSessionRegistry.class);
assertNotNull(userSessionRegistry);

7
spring-websocket/src/test/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupportTests.java

@ -20,6 +20,7 @@ import java.util.ArrayList; @@ -20,6 +20,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.junit.Before;
import org.junit.Test;
@ -50,6 +51,7 @@ import org.springframework.web.socket.messaging.SubProtocolHandler; @@ -50,6 +51,7 @@ import org.springframework.web.socket.messaging.SubProtocolHandler;
import org.springframework.web.socket.messaging.SubProtocolWebSocketHandler;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
/**
* Test fixture for
@ -128,8 +130,9 @@ public class WebSocketMessageBrokerConfigurationSupportTests { @@ -128,8 +130,9 @@ public class WebSocketMessageBrokerConfigurationSupportTests {
ThreadPoolTaskScheduler taskScheduler =
this.config.getBean("messageBrokerSockJsTaskScheduler", ThreadPoolTaskScheduler.class);
assertEquals(Runtime.getRuntime().availableProcessors(),
taskScheduler.getScheduledThreadPoolExecutor().getCorePoolSize());
ScheduledThreadPoolExecutor executor = taskScheduler.getScheduledThreadPoolExecutor();
assertEquals(Runtime.getRuntime().availableProcessors(), executor.getCorePoolSize());
assertTrue(executor.getRemoveOnCancelPolicy());
}

Loading…
Cancel
Save