From 10af128e9654d346a3f6ca6a2d3201e9b409e2a1 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Thu, 20 Mar 2014 00:41:31 -0400 Subject: [PATCH] Update thread pool settings in STOMP/WebSocket config The clientInboundChannel and clientOutboundChannel now use twice the number of available processors by default to accomodate for some degree of blocking in task execution on average. In practice these settings still need to be configured explicitly in applications but these should serve as better default values than the default values in ThreadPoolTaskExecutor. Issue: SPR-11450 --- .../AbstractMessageBrokerConfiguration.java | 55 +++++++------ .../simp/config/ChannelRegistration.java | 24 +++--- .../simp/config/TaskExecutorRegistration.java | 66 +++++++++++----- .../MessageBrokerConfigurationTests.java | 79 +++++++++++++------ .../MessageBrokerBeanDefinitionParser.java | 3 + .../socket/config/spring-websocket-4.0.xsd | 29 +++++-- ...essageBrokerBeanDefinitionParserTests.java | 8 +- 7 files changed, 175 insertions(+), 89 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java index e26848b7fd4..6a61943a426 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java @@ -17,7 +17,13 @@ package org.springframework.messaging.simp.config; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import org.springframework.beans.BeanUtils; import org.springframework.beans.BeansException; @@ -93,17 +99,15 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC @Bean public AbstractSubscribableChannel clientInboundChannel() { ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(clientInboundChannelExecutor()); - ChannelRegistration r = getClientInboundChannelRegistration(); - if (r.hasInterceptors()) { - channel.setInterceptors(r.getInterceptors()); - } + ChannelRegistration reg = getClientInboundChannelRegistration(); + channel.setInterceptors(reg.getInterceptors()); return channel; } @Bean public ThreadPoolTaskExecutor clientInboundChannelExecutor() { - TaskExecutorRegistration r = getClientInboundChannelRegistration().getTaskExecutorRegistration(); - ThreadPoolTaskExecutor executor = (r != null) ? r.getTaskExecutor() : new ThreadPoolTaskExecutor(); + TaskExecutorRegistration reg = getClientInboundChannelRegistration().getOrCreateTaskExecRegistration(); + ThreadPoolTaskExecutor executor = reg.getTaskExecutor(); executor.setThreadNamePrefix("clientInboundChannel-"); return executor; } @@ -129,17 +133,15 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC @Bean public AbstractSubscribableChannel clientOutboundChannel() { ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(clientOutboundChannelExecutor()); - ChannelRegistration r = getClientOutboundChannelRegistration(); - if (r.hasInterceptors()) { - channel.setInterceptors(r.getInterceptors()); - } + ChannelRegistration reg = getClientOutboundChannelRegistration(); + channel.setInterceptors(reg.getInterceptors()); return channel; } @Bean public ThreadPoolTaskExecutor clientOutboundChannelExecutor() { - TaskExecutorRegistration r = getClientOutboundChannelRegistration().getTaskExecutorRegistration(); - ThreadPoolTaskExecutor executor = (r != null) ? r.getTaskExecutor() : new ThreadPoolTaskExecutor(); + TaskExecutorRegistration reg = getClientOutboundChannelRegistration().getOrCreateTaskExecRegistration(); + ThreadPoolTaskExecutor executor = reg.getTaskExecutor(); executor.setThreadNamePrefix("clientOutboundChannel-"); return executor; } @@ -162,24 +164,27 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC @Bean public AbstractSubscribableChannel brokerChannel() { - ChannelRegistration r = getBrokerRegistry().getBrokerChannelRegistration(); - ExecutorSubscribableChannel channel; - if (r.hasTaskExecutor()) { - channel = new ExecutorSubscribableChannel(); // synchronous by default - } - else { - channel = new ExecutorSubscribableChannel(brokerChannelExecutor()); - } - if (r.hasInterceptors()) { - channel.setInterceptors(r.getInterceptors()); - } + ChannelRegistration reg = getBrokerRegistry().getBrokerChannelRegistration(); + ExecutorSubscribableChannel channel = reg.hasTaskExecutor() ? + new ExecutorSubscribableChannel(brokerChannelExecutor()) : new ExecutorSubscribableChannel(); + channel.setInterceptors(reg.getInterceptors()); return channel; } @Bean public ThreadPoolTaskExecutor brokerChannelExecutor() { - TaskExecutorRegistration r = getBrokerRegistry().getBrokerChannelRegistration().getTaskExecutorRegistration(); - ThreadPoolTaskExecutor executor = (r != null) ? r.getTaskExecutor() : new ThreadPoolTaskExecutor(); + ChannelRegistration reg = getBrokerRegistry().getBrokerChannelRegistration(); + ThreadPoolTaskExecutor executor; + if (reg.hasTaskExecutor()) { + executor = reg.taskExecutor().getTaskExecutor(); + } + else { + // Should never be used + executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(0); + executor.setMaxPoolSize(1); + executor.setQueueCapacity(0); + } executor.setThreadNamePrefix("brokerChannel-"); return executor; } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/ChannelRegistration.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/ChannelRegistration.java index 9260c909a60..f1b2e137418 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/ChannelRegistration.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/ChannelRegistration.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2013 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,17 +31,19 @@ import org.springframework.messaging.support.ChannelInterceptor; */ public class ChannelRegistration { - private TaskExecutorRegistration taskExecutorRegistration; + private TaskExecutorRegistration registration; - private List interceptors = new ArrayList(); + private final List interceptors = new ArrayList(); /** - * Configure properties of the ThreadPoolTaskExecutor backing the message channel. + * Configure the thread pool backing this message channel. */ public TaskExecutorRegistration taskExecutor() { - this.taskExecutorRegistration = new TaskExecutorRegistration(); - return this.taskExecutorRegistration; + if (this.registration == null) { + this.registration = new TaskExecutorRegistration(); + } + return this.registration; } /** @@ -56,11 +58,15 @@ public class ChannelRegistration { protected boolean hasTaskExecutor() { - return (this.taskExecutorRegistration != null); + return (this.registration != null); + } + + protected TaskExecutorRegistration getTaskExecRegistration() { + return this.registration; } - protected TaskExecutorRegistration getTaskExecutorRegistration() { - return this.taskExecutorRegistration; + protected TaskExecutorRegistration getOrCreateTaskExecRegistration() { + return taskExecutor(); } protected boolean hasInterceptors() { diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/TaskExecutorRegistration.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/TaskExecutorRegistration.java index afa1ce81ff7..35955cd6666 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/TaskExecutorRegistration.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/TaskExecutorRegistration.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2013 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ package org.springframework.messaging.simp.config; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + /** * A registration class for customizing the properties of {@link ThreadPoolTaskExecutor}. * @@ -26,18 +27,28 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; */ public class TaskExecutorRegistration { - private int corePoolSize = 1; + private int corePoolSize = Runtime.getRuntime().availableProcessors() * 2; private int maxPoolSize = Integer.MAX_VALUE; - private int keepAliveSeconds = 60; - private int queueCapacity = Integer.MAX_VALUE; + private int keepAliveSeconds = 60; + /** - * Set the ThreadPoolExecutor's core pool size. - * Default is 1. + * Set the core pool size of the ThreadPoolExecutor. + * + *

NOTE: the core pool size is effectively the max pool size + * when an unbounded {@link #queueCapacity(int) queueCapacity} is configured + * (the default). This is essentially the "Unbounded queues" strategy as explained + * in {@link java.util.concurrent.ThreadPoolExecutor ThreadPoolExecutor}. When + * this strategy is used, the {@link #maxPoolSize(int) maxPoolSize} is ignored. + * + *

By default this is set to twice the value of + * {@link Runtime#availableProcessors()}. In an an application where tasks do not + * block frequently, the number should be closer to or equal to the number of + * available CPUs/cores. */ public TaskExecutorRegistration corePoolSize(int corePoolSize) { this.corePoolSize = corePoolSize; @@ -45,8 +56,15 @@ public class TaskExecutorRegistration { } /** - * Set the ThreadPoolExecutor's maximum pool size. - * Default is {@code Integer.MAX_VALUE}. + * Set the max pool size of the ThreadPoolExecutor. + * + *

NOTE: when an unbounded + * {@link #queueCapacity(int) queueCapacity} is configured (the default), the + * max pool size is effectively ignored. See the "Unbounded queues" strategy + * in {@link java.util.concurrent.ThreadPoolExecutor ThreadPoolExecutor} for + * more details. + * + *

By default this is set to {@code Integer.MAX_VALUE}. */ public TaskExecutorRegistration maxPoolSize(int maxPoolSize) { this.maxPoolSize = maxPoolSize; @@ -54,24 +72,32 @@ public class TaskExecutorRegistration { } /** - * Set the ThreadPoolExecutor's keep-alive seconds. - * Default is 60. + * Set the queue capacity for the ThreadPoolExecutor. + * + *

NOTE: when an unbounded + * {@link #queueCapacity(int) queueCapacity} is configured (the default) the + * core pool size is effectively the max pool size. This is essentially the + * "Unbounded queues" strategy as explained in + * {@link java.util.concurrent.ThreadPoolExecutor ThreadPoolExecutor}. When + * this strategy is used, the {@link #maxPoolSize(int) maxPoolSize} is ignored. + * + *

By default this is set to {@code Integer.MAX_VALUE}. */ - public TaskExecutorRegistration keepAliveSeconds(int keepAliveSeconds) { - this.keepAliveSeconds = keepAliveSeconds; + public TaskExecutorRegistration queueCapacity(int queueCapacity) { + this.queueCapacity = queueCapacity; return this; } /** - * Set the capacity for the ThreadPoolExecutor's BlockingQueue. - * Default is {@code Integer.MAX_VALUE}. - *

Any positive value will lead to a LinkedBlockingQueue instance; - * any other value will lead to a SynchronousQueue instance. - * @see java.util.concurrent.LinkedBlockingQueue - * @see java.util.concurrent.SynchronousQueue + * Set the time limit for which threads may remain idle before being terminated. + * If there are more than the core number of threads currently in the pool, + * after waiting this amount of time without processing a task, excess threads + * will be terminated. This overrides any value set in the constructor. + * + *

By default this is set to 60. */ - public TaskExecutorRegistration queueCapacity(int queueCapacity) { - this.queueCapacity = queueCapacity; + public TaskExecutorRegistration keepAliveSeconds(int keepAliveSeconds) { + this.keepAliveSeconds = keepAliveSeconds; return this; } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java index 090b402c232..dd2e973ba84 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java @@ -31,6 +31,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.support.StaticApplicationContext; import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.converter.*; import org.springframework.messaging.handler.annotation.MessageMapping; @@ -66,24 +67,30 @@ import static org.junit.Assert.*; */ public class MessageBrokerConfigurationTests { - private AnnotationConfigApplicationContext simpleContext; + private AnnotationConfigApplicationContext simpleBrokerContext; private AnnotationConfigApplicationContext brokerRelayContext; + private AnnotationConfigApplicationContext defaultContext; + private AnnotationConfigApplicationContext customChannelContext; @Before public void setupOnce() { - this.simpleContext = new AnnotationConfigApplicationContext(); - this.simpleContext.register(SimpleConfig.class); - this.simpleContext.refresh(); + this.simpleBrokerContext = new AnnotationConfigApplicationContext(); + this.simpleBrokerContext.register(SimpleBrokerConfig.class); + this.simpleBrokerContext.refresh(); this.brokerRelayContext = new AnnotationConfigApplicationContext(); this.brokerRelayContext.register(BrokerRelayConfig.class); this.brokerRelayContext.refresh(); + this.defaultContext = new AnnotationConfigApplicationContext(); + this.defaultContext.register(DefaultConfig.class); + this.defaultContext.refresh(); + this.customChannelContext = new AnnotationConfigApplicationContext(); this.customChannelContext.register(CustomChannelConfig.class); this.customChannelContext.refresh(); @@ -93,13 +100,13 @@ public class MessageBrokerConfigurationTests { @Test public void clientInboundChannel() { - TestChannel channel = this.simpleContext.getBean("clientInboundChannel", TestChannel.class); + TestChannel channel = this.simpleBrokerContext.getBean("clientInboundChannel", TestChannel.class); Set handlers = channel.getSubscribers(); assertEquals(3, handlers.size()); - assertTrue(handlers.contains(simpleContext.getBean(SimpAnnotationMethodMessageHandler.class))); - assertTrue(handlers.contains(simpleContext.getBean(UserDestinationMessageHandler.class))); - assertTrue(handlers.contains(simpleContext.getBean(SimpleBrokerMessageHandler.class))); + assertTrue(handlers.contains(simpleBrokerContext.getBean(SimpAnnotationMethodMessageHandler.class))); + assertTrue(handlers.contains(simpleBrokerContext.getBean(UserDestinationMessageHandler.class))); + assertTrue(handlers.contains(simpleBrokerContext.getBean(SimpleBrokerMessageHandler.class))); } @Test @@ -130,8 +137,8 @@ public class MessageBrokerConfigurationTests { @Test public void clientOutboundChannelUsedByAnnotatedMethod() { - TestChannel channel = this.simpleContext.getBean("clientOutboundChannel", TestChannel.class); - SimpAnnotationMethodMessageHandler messageHandler = this.simpleContext.getBean(SimpAnnotationMethodMessageHandler.class); + TestChannel channel = this.simpleBrokerContext.getBean("clientOutboundChannel", TestChannel.class); + SimpAnnotationMethodMessageHandler messageHandler = this.simpleBrokerContext.getBean(SimpAnnotationMethodMessageHandler.class); StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SUBSCRIBE); headers.setSessionId("sess1"); @@ -151,8 +158,8 @@ public class MessageBrokerConfigurationTests { @Test public void clientOutboundChannelUsedBySimpleBroker() { - TestChannel channel = this.simpleContext.getBean("clientOutboundChannel", TestChannel.class); - SimpleBrokerMessageHandler broker = this.simpleContext.getBean(SimpleBrokerMessageHandler.class); + TestChannel channel = this.simpleBrokerContext.getBean("clientOutboundChannel", TestChannel.class); + SimpleBrokerMessageHandler broker = this.simpleBrokerContext.getBean(SimpleBrokerMessageHandler.class); StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SUBSCRIBE); headers.setSessionId("sess1"); @@ -197,12 +204,14 @@ public class MessageBrokerConfigurationTests { @Test public void brokerChannel() { - TestChannel channel = this.simpleContext.getBean("brokerChannel", TestChannel.class); + TestChannel channel = this.simpleBrokerContext.getBean("brokerChannel", TestChannel.class); Set handlers = channel.getSubscribers(); assertEquals(2, handlers.size()); - assertTrue(handlers.contains(simpleContext.getBean(UserDestinationMessageHandler.class))); - assertTrue(handlers.contains(simpleContext.getBean(SimpleBrokerMessageHandler.class))); + assertTrue(handlers.contains(simpleBrokerContext.getBean(UserDestinationMessageHandler.class))); + assertTrue(handlers.contains(simpleBrokerContext.getBean(SimpleBrokerMessageHandler.class))); + + assertNull(channel.getExecutor()); } @Test @@ -217,8 +226,9 @@ public class MessageBrokerConfigurationTests { @Test public void brokerChannelUsedByAnnotatedMethod() { - TestChannel channel = this.simpleContext.getBean("brokerChannel", TestChannel.class); - SimpAnnotationMethodMessageHandler messageHandler = this.simpleContext.getBean(SimpAnnotationMethodMessageHandler.class); + TestChannel channel = this.simpleBrokerContext.getBean("brokerChannel", TestChannel.class); + SimpAnnotationMethodMessageHandler messageHandler = + this.simpleBrokerContext.getBean(SimpAnnotationMethodMessageHandler.class); StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND); headers.setDestination("/foo"); @@ -236,10 +246,10 @@ public class MessageBrokerConfigurationTests { @Test public void brokerChannelUsedByUserDestinationMessageHandler() { - TestChannel channel = this.simpleContext.getBean("brokerChannel", TestChannel.class); - UserDestinationMessageHandler messageHandler = this.simpleContext.getBean(UserDestinationMessageHandler.class); + TestChannel channel = this.simpleBrokerContext.getBean("brokerChannel", TestChannel.class); + UserDestinationMessageHandler messageHandler = this.simpleBrokerContext.getBean(UserDestinationMessageHandler.class); - this.simpleContext.getBean(UserSessionRegistry.class).registerSessionId("joe", "s1"); + this.simpleBrokerContext.getBean(UserSessionRegistry.class).registerSessionId("joe", "s1"); StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND); headers.setDestination("/user/joe/foo"); @@ -285,6 +295,24 @@ public class MessageBrokerConfigurationTests { assertEquals(MimeTypeUtils.APPLICATION_JSON, ((DefaultContentTypeResolver) resolver).getDefaultMimeType()); } + @Test + public void threadPoolSizeDefault() { + + String name = "clientInboundChannelExecutor"; + ThreadPoolTaskExecutor executor = this.defaultContext.getBean(name, ThreadPoolTaskExecutor.class); + assertEquals(Runtime.getRuntime().availableProcessors() * 2, executor.getCorePoolSize()); + // No way to verify queue capacity + + name = "clientOutboundChannelExecutor"; + executor = this.defaultContext.getBean(name, ThreadPoolTaskExecutor.class); + assertEquals(Runtime.getRuntime().availableProcessors() * 2, executor.getCorePoolSize()); + + name = "brokerChannelExecutor"; + executor = this.defaultContext.getBean(name, ThreadPoolTaskExecutor.class); + assertEquals(0, executor.getCorePoolSize()); + assertEquals(1, executor.getMaxPoolSize()); + } + @Test public void configureMessageConvertersCustom() { final MessageConverter testConverter = Mockito.mock(MessageConverter.class); @@ -360,7 +388,7 @@ public class MessageBrokerConfigurationTests { @Test public void simpValidatorInjected() { SimpAnnotationMethodMessageHandler messageHandler = - this.simpleContext.getBean(SimpAnnotationMethodMessageHandler.class); + this.simpleBrokerContext.getBean(SimpAnnotationMethodMessageHandler.class); assertThat(messageHandler.getValidator(), Matchers.notNullValue(Validator.class)); } @@ -381,8 +409,9 @@ public class MessageBrokerConfigurationTests { } } + @Configuration - static class SimpleConfig extends AbstractMessageBrokerConfiguration { + static class SimpleBrokerConfig extends AbstractMessageBrokerConfiguration { @Bean public TestController subscriptionController() { @@ -409,7 +438,7 @@ public class MessageBrokerConfigurationTests { } @Configuration - static class BrokerRelayConfig extends SimpleConfig { + static class BrokerRelayConfig extends SimpleBrokerConfig { @Override public void configureMessageBroker(MessageBrokerRegistry registry) { @@ -417,6 +446,10 @@ public class MessageBrokerConfigurationTests { } } + @Configuration + static class DefaultConfig extends AbstractMessageBrokerConfiguration { + } + @Configuration static class CustomChannelConfig extends AbstractMessageBrokerConfiguration { diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParser.java b/spring-websocket/src/main/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParser.java index 3c322807b21..c2cf38e322e 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParser.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParser.java @@ -201,6 +201,9 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser { } else if (!channelName.equals("brokerChannel")) { executorDef = new RootBeanDefinition(ThreadPoolTaskExecutor.class); + executorDef.getPropertyValues().add("corePoolSize", Runtime.getRuntime().availableProcessors() * 2); + executorDef.getPropertyValues().add("maxPoolSize", Integer.MAX_VALUE); + executorDef.getPropertyValues().add("queueCapacity", Integer.MAX_VALUE); } ConstructorArgumentValues values = new ConstructorArgumentValues(); diff --git a/spring-websocket/src/main/resources/org/springframework/web/socket/config/spring-websocket-4.0.xsd b/spring-websocket/src/main/resources/org/springframework/web/socket/config/spring-websocket-4.0.xsd index 2d192a336dd..0b902c9d425 100644 --- a/spring-websocket/src/main/resources/org/springframework/web/socket/config/spring-websocket-4.0.xsd +++ b/spring-websocket/src/main/resources/org/springframework/web/socket/config/spring-websocket-4.0.xsd @@ -152,7 +152,7 @@ @@ -346,31 +346,44 @@ diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java index b69dc175e91..2efd503057d 100644 --- a/spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java +++ b/spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java @@ -136,11 +136,11 @@ public class MessageBrokerBeanDefinitionParserTests { Arrays.>asList(SimpAnnotationMethodMessageHandler.class, UserDestinationMessageHandler.class, SimpleBrokerMessageHandler.class); testChannel("clientInboundChannel", subscriberTypes, 0); - testExecutor("clientInboundChannel", 1, Integer.MAX_VALUE, 60); + testExecutor("clientInboundChannel", Runtime.getRuntime().availableProcessors() * 2, Integer.MAX_VALUE, 60); subscriberTypes = Arrays.>asList(SubProtocolWebSocketHandler.class); testChannel("clientOutboundChannel", subscriberTypes, 0); - testExecutor("clientOutboundChannel", 1, Integer.MAX_VALUE, 60); + testExecutor("clientOutboundChannel", Runtime.getRuntime().availableProcessors() * 2, Integer.MAX_VALUE, 60); subscriberTypes = Arrays.>asList( SimpleBrokerMessageHandler.class, UserDestinationMessageHandler.class); @@ -199,11 +199,11 @@ public class MessageBrokerBeanDefinitionParserTests { Arrays.>asList(SimpAnnotationMethodMessageHandler.class, UserDestinationMessageHandler.class, StompBrokerRelayMessageHandler.class); testChannel("clientInboundChannel", subscriberTypes, 0); - testExecutor("clientInboundChannel", 1, Integer.MAX_VALUE, 60); + testExecutor("clientInboundChannel", Runtime.getRuntime().availableProcessors() * 2, Integer.MAX_VALUE, 60); subscriberTypes = Arrays.>asList(SubProtocolWebSocketHandler.class); testChannel("clientOutboundChannel", subscriberTypes, 0); - testExecutor("clientOutboundChannel", 1, Integer.MAX_VALUE, 60); + testExecutor("clientOutboundChannel", Runtime.getRuntime().availableProcessors() * 2, Integer.MAX_VALUE, 60); subscriberTypes = Arrays.>asList( StompBrokerRelayMessageHandler.class, UserDestinationMessageHandler.class);