Browse Source

Allow configuring custom ThreadPoolTaskExecutor

Issue: SPR-12272
pull/654/merge
Rossen Stoyanchev 11 years ago
parent
commit
521bbfcf56
  1. 12
      spring-messaging/src/main/java/org/springframework/messaging/simp/config/ChannelRegistration.java
  2. 11
      spring-messaging/src/main/java/org/springframework/messaging/simp/config/TaskExecutorRegistration.java
  3. 31
      spring-messaging/src/main/java/org/springframework/messaging/support/Test.java
  4. 10
      spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java

12
spring-messaging/src/main/java/org/springframework/messaging/simp/config/ChannelRegistration.java

@ -21,6 +21,7 @@ import java.util.Arrays; @@ -21,6 +21,7 @@ import java.util.Arrays;
import java.util.List;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
/**
* A registration class for customizing the configuration for a
@ -46,6 +47,17 @@ public class ChannelRegistration { @@ -46,6 +47,17 @@ public class ChannelRegistration {
return this.registration;
}
/**
* Configure the thread pool backing this message channel using a custom
* ThreadPoolTaskExecutor.
*/
public TaskExecutorRegistration taskExecutor(ThreadPoolTaskExecutor taskExecutor) {
if (this.registration == null) {
this.registration = new TaskExecutorRegistration(taskExecutor);
}
return this.registration;
}
/**
* Configure interceptors for the message channel.
*/

11
spring-messaging/src/main/java/org/springframework/messaging/simp/config/TaskExecutorRegistration.java

@ -26,6 +26,8 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; @@ -26,6 +26,8 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
*/
public class TaskExecutorRegistration {
private ThreadPoolTaskExecutor taskExecutor;
private int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;
private int maxPoolSize = Integer.MAX_VALUE;
@ -35,6 +37,13 @@ public class TaskExecutorRegistration { @@ -35,6 +37,13 @@ public class TaskExecutorRegistration {
private int keepAliveSeconds = 60;
public TaskExecutorRegistration() {
}
public TaskExecutorRegistration(ThreadPoolTaskExecutor taskExecutor) {
this.taskExecutor = taskExecutor;
}
/**
* Set the core pool size of the ThreadPoolExecutor.
* <p><strong>NOTE:</strong> The core pool size is effectively the max pool size
@ -93,7 +102,7 @@ public class TaskExecutorRegistration { @@ -93,7 +102,7 @@ public class TaskExecutorRegistration {
}
protected ThreadPoolTaskExecutor getTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
ThreadPoolTaskExecutor executor = (this.taskExecutor != null ? this.taskExecutor : new ThreadPoolTaskExecutor());
executor.setCorePoolSize(this.corePoolSize);
executor.setMaxPoolSize(this.maxPoolSize);
executor.setKeepAliveSeconds(this.keepAliveSeconds);

31
spring-messaging/src/main/java/org/springframework/messaging/support/Test.java

@ -1,31 +0,0 @@ @@ -1,31 +0,0 @@
/*
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.support;
/**
* @author Rossen Stoyanchev
* @since 4.1
*/
public class Test {
public static void main(String[] args) {
ExecutorSubscribableChannel.ExecutorSubscribableChannelTask task = null;
}
}

10
spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java

@ -132,8 +132,8 @@ public class MessageBrokerConfigurationTests { @@ -132,8 +132,8 @@ public class MessageBrokerConfigurationTests {
assertEquals(2, channel.getInterceptors().size());
ThreadPoolTaskExecutor taskExecutor = this.customContext.getBean(
"clientInboundChannelExecutor", ThreadPoolTaskExecutor.class);
CustomThreadPoolTaskExecutor taskExecutor = this.customContext.getBean(
"clientInboundChannelExecutor", CustomThreadPoolTaskExecutor.class);
assertEquals(11, taskExecutor.getCorePoolSize());
assertEquals(12, taskExecutor.getMaxPoolSize());
@ -489,7 +489,8 @@ public class MessageBrokerConfigurationTests { @@ -489,7 +489,8 @@ public class MessageBrokerConfigurationTests {
@Override
protected void configureClientInboundChannel(ChannelRegistration registration) {
registration.setInterceptors(this.interceptor);
registration.taskExecutor().corePoolSize(11).maxPoolSize(12).keepAliveSeconds(13).queueCapacity(14);
registration.taskExecutor(new CustomThreadPoolTaskExecutor())
.corePoolSize(11).maxPoolSize(12).keepAliveSeconds(13).queueCapacity(14);
}
@Override
@ -540,4 +541,7 @@ public class MessageBrokerConfigurationTests { @@ -540,4 +541,7 @@ public class MessageBrokerConfigurationTests {
}
}
private static class CustomThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
}
}

Loading…
Cancel
Save