diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/ChannelRegistration.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/ChannelRegistration.java index f1b2e137418..6ced0579be9 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/ChannelRegistration.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/ChannelRegistration.java @@ -21,6 +21,7 @@ import java.util.Arrays; import java.util.List; import org.springframework.messaging.support.ChannelInterceptor; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; /** * A registration class for customizing the configuration for a @@ -46,6 +47,17 @@ public class ChannelRegistration { return this.registration; } + /** + * Configure the thread pool backing this message channel using a custom + * ThreadPoolTaskExecutor. + */ + public TaskExecutorRegistration taskExecutor(ThreadPoolTaskExecutor taskExecutor) { + if (this.registration == null) { + this.registration = new TaskExecutorRegistration(taskExecutor); + } + return this.registration; + } + /** * Configure interceptors for the message channel. */ diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/TaskExecutorRegistration.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/TaskExecutorRegistration.java index e8a5e050673..d7ec1bcca15 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/TaskExecutorRegistration.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/TaskExecutorRegistration.java @@ -26,6 +26,8 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; */ public class TaskExecutorRegistration { + private ThreadPoolTaskExecutor taskExecutor; + private int corePoolSize = Runtime.getRuntime().availableProcessors() * 2; private int maxPoolSize = Integer.MAX_VALUE; @@ -35,6 +37,13 @@ public class TaskExecutorRegistration { private int keepAliveSeconds = 60; + public TaskExecutorRegistration() { + } + + public TaskExecutorRegistration(ThreadPoolTaskExecutor taskExecutor) { + this.taskExecutor = taskExecutor; + } + /** * Set the core pool size of the ThreadPoolExecutor. *

NOTE: The core pool size is effectively the max pool size @@ -93,7 +102,7 @@ public class TaskExecutorRegistration { } protected ThreadPoolTaskExecutor getTaskExecutor() { - ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + ThreadPoolTaskExecutor executor = (this.taskExecutor != null ? this.taskExecutor : new ThreadPoolTaskExecutor()); executor.setCorePoolSize(this.corePoolSize); executor.setMaxPoolSize(this.maxPoolSize); executor.setKeepAliveSeconds(this.keepAliveSeconds); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/Test.java b/spring-messaging/src/main/java/org/springframework/messaging/support/Test.java deleted file mode 100644 index 2a7cf38575a..00000000000 --- a/spring-messaging/src/main/java/org/springframework/messaging/support/Test.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright 2002-2014 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.messaging.support; - -/** - * @author Rossen Stoyanchev - * @since 4.1 - */ -public class Test { - - public static void main(String[] args) { - - ExecutorSubscribableChannel.ExecutorSubscribableChannelTask task = null; - - } - -} diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java index ce4aceb30db..a7684b97e22 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java @@ -132,8 +132,8 @@ public class MessageBrokerConfigurationTests { assertEquals(2, channel.getInterceptors().size()); - ThreadPoolTaskExecutor taskExecutor = this.customContext.getBean( - "clientInboundChannelExecutor", ThreadPoolTaskExecutor.class); + CustomThreadPoolTaskExecutor taskExecutor = this.customContext.getBean( + "clientInboundChannelExecutor", CustomThreadPoolTaskExecutor.class); assertEquals(11, taskExecutor.getCorePoolSize()); assertEquals(12, taskExecutor.getMaxPoolSize()); @@ -489,7 +489,8 @@ public class MessageBrokerConfigurationTests { @Override protected void configureClientInboundChannel(ChannelRegistration registration) { registration.setInterceptors(this.interceptor); - registration.taskExecutor().corePoolSize(11).maxPoolSize(12).keepAliveSeconds(13).queueCapacity(14); + registration.taskExecutor(new CustomThreadPoolTaskExecutor()) + .corePoolSize(11).maxPoolSize(12).keepAliveSeconds(13).queueCapacity(14); } @Override @@ -540,4 +541,7 @@ public class MessageBrokerConfigurationTests { } } + private static class CustomThreadPoolTaskExecutor extends ThreadPoolTaskExecutor { + } + }