diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java index e26848b7fd4..6a61943a426 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java @@ -17,7 +17,13 @@ package org.springframework.messaging.simp.config; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import org.springframework.beans.BeanUtils; import org.springframework.beans.BeansException; @@ -93,17 +99,15 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC @Bean public AbstractSubscribableChannel clientInboundChannel() { ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(clientInboundChannelExecutor()); - ChannelRegistration r = getClientInboundChannelRegistration(); - if (r.hasInterceptors()) { - channel.setInterceptors(r.getInterceptors()); - } + ChannelRegistration reg = getClientInboundChannelRegistration(); + channel.setInterceptors(reg.getInterceptors()); return channel; } @Bean public ThreadPoolTaskExecutor clientInboundChannelExecutor() { - TaskExecutorRegistration r = getClientInboundChannelRegistration().getTaskExecutorRegistration(); - ThreadPoolTaskExecutor executor = (r != null) ? r.getTaskExecutor() : new ThreadPoolTaskExecutor(); + TaskExecutorRegistration reg = getClientInboundChannelRegistration().getOrCreateTaskExecRegistration(); + ThreadPoolTaskExecutor executor = reg.getTaskExecutor(); executor.setThreadNamePrefix("clientInboundChannel-"); return executor; } @@ -129,17 +133,15 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC @Bean public AbstractSubscribableChannel clientOutboundChannel() { ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(clientOutboundChannelExecutor()); - ChannelRegistration r = getClientOutboundChannelRegistration(); - if (r.hasInterceptors()) { - channel.setInterceptors(r.getInterceptors()); - } + ChannelRegistration reg = getClientOutboundChannelRegistration(); + channel.setInterceptors(reg.getInterceptors()); return channel; } @Bean public ThreadPoolTaskExecutor clientOutboundChannelExecutor() { - TaskExecutorRegistration r = getClientOutboundChannelRegistration().getTaskExecutorRegistration(); - ThreadPoolTaskExecutor executor = (r != null) ? r.getTaskExecutor() : new ThreadPoolTaskExecutor(); + TaskExecutorRegistration reg = getClientOutboundChannelRegistration().getOrCreateTaskExecRegistration(); + ThreadPoolTaskExecutor executor = reg.getTaskExecutor(); executor.setThreadNamePrefix("clientOutboundChannel-"); return executor; } @@ -162,24 +164,27 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC @Bean public AbstractSubscribableChannel brokerChannel() { - ChannelRegistration r = getBrokerRegistry().getBrokerChannelRegistration(); - ExecutorSubscribableChannel channel; - if (r.hasTaskExecutor()) { - channel = new ExecutorSubscribableChannel(); // synchronous by default - } - else { - channel = new ExecutorSubscribableChannel(brokerChannelExecutor()); - } - if (r.hasInterceptors()) { - channel.setInterceptors(r.getInterceptors()); - } + ChannelRegistration reg = getBrokerRegistry().getBrokerChannelRegistration(); + ExecutorSubscribableChannel channel = reg.hasTaskExecutor() ? + new ExecutorSubscribableChannel(brokerChannelExecutor()) : new ExecutorSubscribableChannel(); + channel.setInterceptors(reg.getInterceptors()); return channel; } @Bean public ThreadPoolTaskExecutor brokerChannelExecutor() { - TaskExecutorRegistration r = getBrokerRegistry().getBrokerChannelRegistration().getTaskExecutorRegistration(); - ThreadPoolTaskExecutor executor = (r != null) ? r.getTaskExecutor() : new ThreadPoolTaskExecutor(); + ChannelRegistration reg = getBrokerRegistry().getBrokerChannelRegistration(); + ThreadPoolTaskExecutor executor; + if (reg.hasTaskExecutor()) { + executor = reg.taskExecutor().getTaskExecutor(); + } + else { + // Should never be used + executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(0); + executor.setMaxPoolSize(1); + executor.setQueueCapacity(0); + } executor.setThreadNamePrefix("brokerChannel-"); return executor; } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/ChannelRegistration.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/ChannelRegistration.java index 9260c909a60..f1b2e137418 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/ChannelRegistration.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/ChannelRegistration.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2013 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,17 +31,19 @@ import org.springframework.messaging.support.ChannelInterceptor; */ public class ChannelRegistration { - private TaskExecutorRegistration taskExecutorRegistration; + private TaskExecutorRegistration registration; - private List interceptors = new ArrayList(); + private final List interceptors = new ArrayList(); /** - * Configure properties of the ThreadPoolTaskExecutor backing the message channel. + * Configure the thread pool backing this message channel. */ public TaskExecutorRegistration taskExecutor() { - this.taskExecutorRegistration = new TaskExecutorRegistration(); - return this.taskExecutorRegistration; + if (this.registration == null) { + this.registration = new TaskExecutorRegistration(); + } + return this.registration; } /** @@ -56,11 +58,15 @@ public class ChannelRegistration { protected boolean hasTaskExecutor() { - return (this.taskExecutorRegistration != null); + return (this.registration != null); + } + + protected TaskExecutorRegistration getTaskExecRegistration() { + return this.registration; } - protected TaskExecutorRegistration getTaskExecutorRegistration() { - return this.taskExecutorRegistration; + protected TaskExecutorRegistration getOrCreateTaskExecRegistration() { + return taskExecutor(); } protected boolean hasInterceptors() { diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/TaskExecutorRegistration.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/TaskExecutorRegistration.java index afa1ce81ff7..35955cd6666 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/TaskExecutorRegistration.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/TaskExecutorRegistration.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2013 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ package org.springframework.messaging.simp.config; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + /** * A registration class for customizing the properties of {@link ThreadPoolTaskExecutor}. * @@ -26,18 +27,28 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; */ public class TaskExecutorRegistration { - private int corePoolSize = 1; + private int corePoolSize = Runtime.getRuntime().availableProcessors() * 2; private int maxPoolSize = Integer.MAX_VALUE; - private int keepAliveSeconds = 60; - private int queueCapacity = Integer.MAX_VALUE; + private int keepAliveSeconds = 60; + /** - * Set the ThreadPoolExecutor's core pool size. - * Default is 1. + * Set the core pool size of the ThreadPoolExecutor. + * + *

NOTE: the core pool size is effectively the max pool size + * when an unbounded {@link #queueCapacity(int) queueCapacity} is configured + * (the default). This is essentially the "Unbounded queues" strategy as explained + * in {@link java.util.concurrent.ThreadPoolExecutor ThreadPoolExecutor}. When + * this strategy is used, the {@link #maxPoolSize(int) maxPoolSize} is ignored. + * + *

By default this is set to twice the value of + * {@link Runtime#availableProcessors()}. In an an application where tasks do not + * block frequently, the number should be closer to or equal to the number of + * available CPUs/cores. */ public TaskExecutorRegistration corePoolSize(int corePoolSize) { this.corePoolSize = corePoolSize; @@ -45,8 +56,15 @@ public class TaskExecutorRegistration { } /** - * Set the ThreadPoolExecutor's maximum pool size. - * Default is {@code Integer.MAX_VALUE}. + * Set the max pool size of the ThreadPoolExecutor. + * + *

NOTE: when an unbounded + * {@link #queueCapacity(int) queueCapacity} is configured (the default), the + * max pool size is effectively ignored. See the "Unbounded queues" strategy + * in {@link java.util.concurrent.ThreadPoolExecutor ThreadPoolExecutor} for + * more details. + * + *

By default this is set to {@code Integer.MAX_VALUE}. */ public TaskExecutorRegistration maxPoolSize(int maxPoolSize) { this.maxPoolSize = maxPoolSize; @@ -54,24 +72,32 @@ public class TaskExecutorRegistration { } /** - * Set the ThreadPoolExecutor's keep-alive seconds. - * Default is 60. + * Set the queue capacity for the ThreadPoolExecutor. + * + *

NOTE: when an unbounded + * {@link #queueCapacity(int) queueCapacity} is configured (the default) the + * core pool size is effectively the max pool size. This is essentially the + * "Unbounded queues" strategy as explained in + * {@link java.util.concurrent.ThreadPoolExecutor ThreadPoolExecutor}. When + * this strategy is used, the {@link #maxPoolSize(int) maxPoolSize} is ignored. + * + *

By default this is set to {@code Integer.MAX_VALUE}. */ - public TaskExecutorRegistration keepAliveSeconds(int keepAliveSeconds) { - this.keepAliveSeconds = keepAliveSeconds; + public TaskExecutorRegistration queueCapacity(int queueCapacity) { + this.queueCapacity = queueCapacity; return this; } /** - * Set the capacity for the ThreadPoolExecutor's BlockingQueue. - * Default is {@code Integer.MAX_VALUE}. - *

Any positive value will lead to a LinkedBlockingQueue instance; - * any other value will lead to a SynchronousQueue instance. - * @see java.util.concurrent.LinkedBlockingQueue - * @see java.util.concurrent.SynchronousQueue + * Set the time limit for which threads may remain idle before being terminated. + * If there are more than the core number of threads currently in the pool, + * after waiting this amount of time without processing a task, excess threads + * will be terminated. This overrides any value set in the constructor. + * + *

By default this is set to 60. */ - public TaskExecutorRegistration queueCapacity(int queueCapacity) { - this.queueCapacity = queueCapacity; + public TaskExecutorRegistration keepAliveSeconds(int keepAliveSeconds) { + this.keepAliveSeconds = keepAliveSeconds; return this; } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java index 090b402c232..dd2e973ba84 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java @@ -31,6 +31,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.support.StaticApplicationContext; import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.converter.*; import org.springframework.messaging.handler.annotation.MessageMapping; @@ -66,24 +67,30 @@ import static org.junit.Assert.*; */ public class MessageBrokerConfigurationTests { - private AnnotationConfigApplicationContext simpleContext; + private AnnotationConfigApplicationContext simpleBrokerContext; private AnnotationConfigApplicationContext brokerRelayContext; + private AnnotationConfigApplicationContext defaultContext; + private AnnotationConfigApplicationContext customChannelContext; @Before public void setupOnce() { - this.simpleContext = new AnnotationConfigApplicationContext(); - this.simpleContext.register(SimpleConfig.class); - this.simpleContext.refresh(); + this.simpleBrokerContext = new AnnotationConfigApplicationContext(); + this.simpleBrokerContext.register(SimpleBrokerConfig.class); + this.simpleBrokerContext.refresh(); this.brokerRelayContext = new AnnotationConfigApplicationContext(); this.brokerRelayContext.register(BrokerRelayConfig.class); this.brokerRelayContext.refresh(); + this.defaultContext = new AnnotationConfigApplicationContext(); + this.defaultContext.register(DefaultConfig.class); + this.defaultContext.refresh(); + this.customChannelContext = new AnnotationConfigApplicationContext(); this.customChannelContext.register(CustomChannelConfig.class); this.customChannelContext.refresh(); @@ -93,13 +100,13 @@ public class MessageBrokerConfigurationTests { @Test public void clientInboundChannel() { - TestChannel channel = this.simpleContext.getBean("clientInboundChannel", TestChannel.class); + TestChannel channel = this.simpleBrokerContext.getBean("clientInboundChannel", TestChannel.class); Set handlers = channel.getSubscribers(); assertEquals(3, handlers.size()); - assertTrue(handlers.contains(simpleContext.getBean(SimpAnnotationMethodMessageHandler.class))); - assertTrue(handlers.contains(simpleContext.getBean(UserDestinationMessageHandler.class))); - assertTrue(handlers.contains(simpleContext.getBean(SimpleBrokerMessageHandler.class))); + assertTrue(handlers.contains(simpleBrokerContext.getBean(SimpAnnotationMethodMessageHandler.class))); + assertTrue(handlers.contains(simpleBrokerContext.getBean(UserDestinationMessageHandler.class))); + assertTrue(handlers.contains(simpleBrokerContext.getBean(SimpleBrokerMessageHandler.class))); } @Test @@ -130,8 +137,8 @@ public class MessageBrokerConfigurationTests { @Test public void clientOutboundChannelUsedByAnnotatedMethod() { - TestChannel channel = this.simpleContext.getBean("clientOutboundChannel", TestChannel.class); - SimpAnnotationMethodMessageHandler messageHandler = this.simpleContext.getBean(SimpAnnotationMethodMessageHandler.class); + TestChannel channel = this.simpleBrokerContext.getBean("clientOutboundChannel", TestChannel.class); + SimpAnnotationMethodMessageHandler messageHandler = this.simpleBrokerContext.getBean(SimpAnnotationMethodMessageHandler.class); StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SUBSCRIBE); headers.setSessionId("sess1"); @@ -151,8 +158,8 @@ public class MessageBrokerConfigurationTests { @Test public void clientOutboundChannelUsedBySimpleBroker() { - TestChannel channel = this.simpleContext.getBean("clientOutboundChannel", TestChannel.class); - SimpleBrokerMessageHandler broker = this.simpleContext.getBean(SimpleBrokerMessageHandler.class); + TestChannel channel = this.simpleBrokerContext.getBean("clientOutboundChannel", TestChannel.class); + SimpleBrokerMessageHandler broker = this.simpleBrokerContext.getBean(SimpleBrokerMessageHandler.class); StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SUBSCRIBE); headers.setSessionId("sess1"); @@ -197,12 +204,14 @@ public class MessageBrokerConfigurationTests { @Test public void brokerChannel() { - TestChannel channel = this.simpleContext.getBean("brokerChannel", TestChannel.class); + TestChannel channel = this.simpleBrokerContext.getBean("brokerChannel", TestChannel.class); Set handlers = channel.getSubscribers(); assertEquals(2, handlers.size()); - assertTrue(handlers.contains(simpleContext.getBean(UserDestinationMessageHandler.class))); - assertTrue(handlers.contains(simpleContext.getBean(SimpleBrokerMessageHandler.class))); + assertTrue(handlers.contains(simpleBrokerContext.getBean(UserDestinationMessageHandler.class))); + assertTrue(handlers.contains(simpleBrokerContext.getBean(SimpleBrokerMessageHandler.class))); + + assertNull(channel.getExecutor()); } @Test @@ -217,8 +226,9 @@ public class MessageBrokerConfigurationTests { @Test public void brokerChannelUsedByAnnotatedMethod() { - TestChannel channel = this.simpleContext.getBean("brokerChannel", TestChannel.class); - SimpAnnotationMethodMessageHandler messageHandler = this.simpleContext.getBean(SimpAnnotationMethodMessageHandler.class); + TestChannel channel = this.simpleBrokerContext.getBean("brokerChannel", TestChannel.class); + SimpAnnotationMethodMessageHandler messageHandler = + this.simpleBrokerContext.getBean(SimpAnnotationMethodMessageHandler.class); StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND); headers.setDestination("/foo"); @@ -236,10 +246,10 @@ public class MessageBrokerConfigurationTests { @Test public void brokerChannelUsedByUserDestinationMessageHandler() { - TestChannel channel = this.simpleContext.getBean("brokerChannel", TestChannel.class); - UserDestinationMessageHandler messageHandler = this.simpleContext.getBean(UserDestinationMessageHandler.class); + TestChannel channel = this.simpleBrokerContext.getBean("brokerChannel", TestChannel.class); + UserDestinationMessageHandler messageHandler = this.simpleBrokerContext.getBean(UserDestinationMessageHandler.class); - this.simpleContext.getBean(UserSessionRegistry.class).registerSessionId("joe", "s1"); + this.simpleBrokerContext.getBean(UserSessionRegistry.class).registerSessionId("joe", "s1"); StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND); headers.setDestination("/user/joe/foo"); @@ -285,6 +295,24 @@ public class MessageBrokerConfigurationTests { assertEquals(MimeTypeUtils.APPLICATION_JSON, ((DefaultContentTypeResolver) resolver).getDefaultMimeType()); } + @Test + public void threadPoolSizeDefault() { + + String name = "clientInboundChannelExecutor"; + ThreadPoolTaskExecutor executor = this.defaultContext.getBean(name, ThreadPoolTaskExecutor.class); + assertEquals(Runtime.getRuntime().availableProcessors() * 2, executor.getCorePoolSize()); + // No way to verify queue capacity + + name = "clientOutboundChannelExecutor"; + executor = this.defaultContext.getBean(name, ThreadPoolTaskExecutor.class); + assertEquals(Runtime.getRuntime().availableProcessors() * 2, executor.getCorePoolSize()); + + name = "brokerChannelExecutor"; + executor = this.defaultContext.getBean(name, ThreadPoolTaskExecutor.class); + assertEquals(0, executor.getCorePoolSize()); + assertEquals(1, executor.getMaxPoolSize()); + } + @Test public void configureMessageConvertersCustom() { final MessageConverter testConverter = Mockito.mock(MessageConverter.class); @@ -360,7 +388,7 @@ public class MessageBrokerConfigurationTests { @Test public void simpValidatorInjected() { SimpAnnotationMethodMessageHandler messageHandler = - this.simpleContext.getBean(SimpAnnotationMethodMessageHandler.class); + this.simpleBrokerContext.getBean(SimpAnnotationMethodMessageHandler.class); assertThat(messageHandler.getValidator(), Matchers.notNullValue(Validator.class)); } @@ -381,8 +409,9 @@ public class MessageBrokerConfigurationTests { } } + @Configuration - static class SimpleConfig extends AbstractMessageBrokerConfiguration { + static class SimpleBrokerConfig extends AbstractMessageBrokerConfiguration { @Bean public TestController subscriptionController() { @@ -409,7 +438,7 @@ public class MessageBrokerConfigurationTests { } @Configuration - static class BrokerRelayConfig extends SimpleConfig { + static class BrokerRelayConfig extends SimpleBrokerConfig { @Override public void configureMessageBroker(MessageBrokerRegistry registry) { @@ -417,6 +446,10 @@ public class MessageBrokerConfigurationTests { } } + @Configuration + static class DefaultConfig extends AbstractMessageBrokerConfiguration { + } + @Configuration static class CustomChannelConfig extends AbstractMessageBrokerConfiguration { diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParser.java b/spring-websocket/src/main/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParser.java index 3c322807b21..c2cf38e322e 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParser.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParser.java @@ -201,6 +201,9 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser { } else if (!channelName.equals("brokerChannel")) { executorDef = new RootBeanDefinition(ThreadPoolTaskExecutor.class); + executorDef.getPropertyValues().add("corePoolSize", Runtime.getRuntime().availableProcessors() * 2); + executorDef.getPropertyValues().add("maxPoolSize", Integer.MAX_VALUE); + executorDef.getPropertyValues().add("queueCapacity", Integer.MAX_VALUE); } ConstructorArgumentValues values = new ConstructorArgumentValues(); diff --git a/spring-websocket/src/main/resources/org/springframework/web/socket/config/spring-websocket-4.0.xsd b/spring-websocket/src/main/resources/org/springframework/web/socket/config/spring-websocket-4.0.xsd index 2d192a336dd..0b902c9d425 100644 --- a/spring-websocket/src/main/resources/org/springframework/web/socket/config/spring-websocket-4.0.xsd +++ b/spring-websocket/src/main/resources/org/springframework/web/socket/config/spring-websocket-4.0.xsd @@ -152,7 +152,7 @@ @@ -346,31 +346,44 @@ diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java index b69dc175e91..2efd503057d 100644 --- a/spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java +++ b/spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java @@ -136,11 +136,11 @@ public class MessageBrokerBeanDefinitionParserTests { Arrays.>asList(SimpAnnotationMethodMessageHandler.class, UserDestinationMessageHandler.class, SimpleBrokerMessageHandler.class); testChannel("clientInboundChannel", subscriberTypes, 0); - testExecutor("clientInboundChannel", 1, Integer.MAX_VALUE, 60); + testExecutor("clientInboundChannel", Runtime.getRuntime().availableProcessors() * 2, Integer.MAX_VALUE, 60); subscriberTypes = Arrays.>asList(SubProtocolWebSocketHandler.class); testChannel("clientOutboundChannel", subscriberTypes, 0); - testExecutor("clientOutboundChannel", 1, Integer.MAX_VALUE, 60); + testExecutor("clientOutboundChannel", Runtime.getRuntime().availableProcessors() * 2, Integer.MAX_VALUE, 60); subscriberTypes = Arrays.>asList( SimpleBrokerMessageHandler.class, UserDestinationMessageHandler.class); @@ -199,11 +199,11 @@ public class MessageBrokerBeanDefinitionParserTests { Arrays.>asList(SimpAnnotationMethodMessageHandler.class, UserDestinationMessageHandler.class, StompBrokerRelayMessageHandler.class); testChannel("clientInboundChannel", subscriberTypes, 0); - testExecutor("clientInboundChannel", 1, Integer.MAX_VALUE, 60); + testExecutor("clientInboundChannel", Runtime.getRuntime().availableProcessors() * 2, Integer.MAX_VALUE, 60); subscriberTypes = Arrays.>asList(SubProtocolWebSocketHandler.class); testChannel("clientOutboundChannel", subscriberTypes, 0); - testExecutor("clientOutboundChannel", 1, Integer.MAX_VALUE, 60); + testExecutor("clientOutboundChannel", Runtime.getRuntime().availableProcessors() * 2, Integer.MAX_VALUE, 60); subscriberTypes = Arrays.>asList( StompBrokerRelayMessageHandler.class, UserDestinationMessageHandler.class);