diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java index 2f6380d002f..df7353cfbc0 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java @@ -126,13 +126,15 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC public AbstractSubscribableChannel clientInboundChannel() { ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(clientInboundChannelExecutor()); ChannelRegistration reg = getClientInboundChannelRegistration(); - channel.setInterceptors(reg.getInterceptors()); + if (reg.hasInterceptors()) { + channel.setInterceptors(reg.getInterceptors()); + } return channel; } @Bean public ThreadPoolTaskExecutor clientInboundChannelExecutor() { - TaskExecutorRegistration reg = getClientInboundChannelRegistration().getOrCreateTaskExecRegistration(); + TaskExecutorRegistration reg = getClientInboundChannelRegistration().taskExecutor(); ThreadPoolTaskExecutor executor = reg.getTaskExecutor(); executor.setThreadNamePrefix("clientInboundChannel-"); return executor; @@ -142,7 +144,7 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC if (this.clientInboundChannelRegistration == null) { ChannelRegistration registration = new ChannelRegistration(); configureClientInboundChannel(registration); - registration.setInterceptors(new ImmutableMessageChannelInterceptor()); + registration.interceptors(new ImmutableMessageChannelInterceptor()); this.clientInboundChannelRegistration = registration; } return this.clientInboundChannelRegistration; @@ -159,13 +161,15 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC public AbstractSubscribableChannel clientOutboundChannel() { ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(clientOutboundChannelExecutor()); ChannelRegistration reg = getClientOutboundChannelRegistration(); - channel.setInterceptors(reg.getInterceptors()); + if (reg.hasInterceptors()) { + channel.setInterceptors(reg.getInterceptors()); + } return channel; } @Bean public ThreadPoolTaskExecutor clientOutboundChannelExecutor() { - TaskExecutorRegistration reg = getClientOutboundChannelRegistration().getOrCreateTaskExecRegistration(); + TaskExecutorRegistration reg = getClientOutboundChannelRegistration().taskExecutor(); ThreadPoolTaskExecutor executor = reg.getTaskExecutor(); executor.setThreadNamePrefix("clientOutboundChannel-"); return executor; @@ -175,7 +179,7 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC if (this.clientOutboundChannelRegistration == null) { ChannelRegistration registration = new ChannelRegistration(); configureClientOutboundChannel(registration); - registration.setInterceptors(new ImmutableMessageChannelInterceptor()); + registration.interceptors(new ImmutableMessageChannelInterceptor()); this.clientOutboundChannelRegistration = registration; } return this.clientOutboundChannelRegistration; @@ -191,9 +195,9 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC @Bean public AbstractSubscribableChannel brokerChannel() { ChannelRegistration reg = getBrokerRegistry().getBrokerChannelRegistration(); - ExecutorSubscribableChannel channel = reg.hasTaskExecutor() ? - new ExecutorSubscribableChannel(brokerChannelExecutor()) : new ExecutorSubscribableChannel(); - reg.setInterceptors(new ImmutableMessageChannelInterceptor()); + ExecutorSubscribableChannel channel = (reg.hasTaskExecutor() ? + new ExecutorSubscribableChannel(brokerChannelExecutor()) : new ExecutorSubscribableChannel()); + reg.interceptors(new ImmutableMessageChannelInterceptor()); channel.setInterceptors(reg.getInterceptors()); return channel; } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/ChannelRegistration.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/ChannelRegistration.java index e59d2997594..7cb9f095cd2 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/ChannelRegistration.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/ChannelRegistration.java @@ -43,43 +43,43 @@ public class ChannelRegistration { * Configure the thread pool backing this message channel. */ public TaskExecutorRegistration taskExecutor() { - if (this.registration == null) { - this.registration = new TaskExecutorRegistration(); - } - return this.registration; + return taskExecutor(null); } /** * Configure the thread pool backing this message channel using a custom * ThreadPoolTaskExecutor. + * @param taskExecutor the executor to use (or {@code null} for a default executor) */ - public TaskExecutorRegistration taskExecutor(ThreadPoolTaskExecutor taskExecutor) { + public TaskExecutorRegistration taskExecutor(@Nullable ThreadPoolTaskExecutor taskExecutor) { if (this.registration == null) { - this.registration = new TaskExecutorRegistration(taskExecutor); + this.registration = (taskExecutor != null ? new TaskExecutorRegistration(taskExecutor) : + new TaskExecutorRegistration()); } return this.registration; } /** - * Configure interceptors for the message channel. + * Configure the given interceptors for this message channel, + * adding them to the channel's current list of interceptors. + * @since 4.3.12 */ - public ChannelRegistration setInterceptors(ChannelInterceptor... interceptors) { + public ChannelRegistration interceptors(ChannelInterceptor... interceptors) { this.interceptors.addAll(Arrays.asList(interceptors)); return this; } - - protected boolean hasTaskExecutor() { - return (this.registration != null); + /** + * @deprecated as of 4.3.12, in favor of {@link #interceptors(ChannelInterceptor...)} + */ + @Deprecated + public ChannelRegistration setInterceptors(ChannelInterceptor... interceptors) { + return interceptors(interceptors); } - @Nullable - protected TaskExecutorRegistration getTaskExecRegistration() { - return this.registration; - } - protected TaskExecutorRegistration getOrCreateTaskExecRegistration() { - return taskExecutor(); + protected boolean hasTaskExecutor() { + return (this.registration != null); } protected boolean hasInterceptors() { @@ -89,4 +89,5 @@ public class ChannelRegistration { protected List getInterceptors() { return this.interceptors; } + } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/TaskExecutorRegistration.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/TaskExecutorRegistration.java index 45abd159748..d6954898e53 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/TaskExecutorRegistration.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/TaskExecutorRegistration.java @@ -18,34 +18,53 @@ package org.springframework.messaging.simp.config; import org.springframework.lang.Nullable; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.util.Assert; /** * A registration class for customizing the properties of {@link ThreadPoolTaskExecutor}. * * @author Rossen Stoyanchev + * @author Juergen Hoeller * @since 4.0 */ public class TaskExecutorRegistration { - @Nullable - private ThreadPoolTaskExecutor taskExecutor; + private final ThreadPoolTaskExecutor taskExecutor; - private int corePoolSize = Runtime.getRuntime().availableProcessors() * 2; + @Nullable + private Integer corePoolSize; - private int maxPoolSize = Integer.MAX_VALUE; + @Nullable + private Integer maxPoolSize; - private int queueCapacity = Integer.MAX_VALUE; + @Nullable + private Integer keepAliveSeconds; - private int keepAliveSeconds = 60; + @Nullable + private Integer queueCapacity; + /** + * Create a new {@code TaskExecutorRegistration} for a default + * {@link ThreadPoolTaskExecutor}. + */ public TaskExecutorRegistration() { + this.taskExecutor = new ThreadPoolTaskExecutor(); + this.taskExecutor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 2); + this.taskExecutor.setAllowCoreThreadTimeOut(true); } + /** + * Create a new {@code TaskExecutorRegistration} for a given + * {@link ThreadPoolTaskExecutor}. + * @param taskExecutor the executor to use + */ public TaskExecutorRegistration(ThreadPoolTaskExecutor taskExecutor) { + Assert.notNull(taskExecutor, "ThreadPoolTaskExecutor must not be null"); this.taskExecutor = taskExecutor; } + /** * Set the core pool size of the ThreadPoolExecutor. *

NOTE: The core pool size is effectively the max pool size @@ -77,6 +96,18 @@ public class TaskExecutorRegistration { return this; } + /** + * Set the time limit for which threads may remain idle before being terminated. + * If there are more than the core number of threads currently in the pool, + * after waiting this amount of time without processing a task, excess threads + * will be terminated. This overrides any value set in the constructor. + *

By default this is set to 60. + */ + public TaskExecutorRegistration keepAliveSeconds(int keepAliveSeconds) { + this.keepAliveSeconds = keepAliveSeconds; + return this; + } + /** * Set the queue capacity for the ThreadPoolExecutor. *

NOTE: when an unbounded {@code queueCapacity} is configured @@ -91,26 +122,21 @@ public class TaskExecutorRegistration { return this; } - /** - * Set the time limit for which threads may remain idle before being terminated. - * If there are more than the core number of threads currently in the pool, - * after waiting this amount of time without processing a task, excess threads - * will be terminated. This overrides any value set in the constructor. - *

By default this is set to 60. - */ - public TaskExecutorRegistration keepAliveSeconds(int keepAliveSeconds) { - this.keepAliveSeconds = keepAliveSeconds; - return this; - } protected ThreadPoolTaskExecutor getTaskExecutor() { - ThreadPoolTaskExecutor executor = (this.taskExecutor != null ? this.taskExecutor : new ThreadPoolTaskExecutor()); - executor.setCorePoolSize(this.corePoolSize); - executor.setMaxPoolSize(this.maxPoolSize); - executor.setKeepAliveSeconds(this.keepAliveSeconds); - executor.setQueueCapacity(this.queueCapacity); - executor.setAllowCoreThreadTimeOut(true); - return executor; + if (this.corePoolSize != null) { + this.taskExecutor.setCorePoolSize(this.corePoolSize); + } + if (this.maxPoolSize != null) { + this.taskExecutor.setMaxPoolSize(this.maxPoolSize); + } + if (this.keepAliveSeconds != null) { + this.taskExecutor.setKeepAliveSeconds(this.keepAliveSeconds); + } + if (this.queueCapacity != null) { + this.taskExecutor.setQueueCapacity(this.queueCapacity); + } + return this.taskExecutor; } } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java index 65837755d79..77a0b7265e6 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2016 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -512,14 +512,14 @@ public class MessageBrokerConfigurationTests { @Override protected void configureClientInboundChannel(ChannelRegistration registration) { - registration.setInterceptors(this.interceptor); + registration.interceptors(this.interceptor); registration.taskExecutor(new CustomThreadPoolTaskExecutor()) .corePoolSize(11).maxPoolSize(12).keepAliveSeconds(13).queueCapacity(14); } @Override protected void configureClientOutboundChannel(ChannelRegistration registration) { - registration.setInterceptors(this.interceptor, this.interceptor); + registration.interceptors(this.interceptor, this.interceptor); registration.taskExecutor().corePoolSize(21).maxPoolSize(22).keepAliveSeconds(23).queueCapacity(24); } @@ -535,7 +535,7 @@ public class MessageBrokerConfigurationTests { @Override protected void configureMessageBroker(MessageBrokerRegistry registry) { - registry.configureBrokerChannel().setInterceptors(this.interceptor, this.interceptor, this.interceptor); + registry.configureBrokerChannel().interceptors(this.interceptor, this.interceptor, this.interceptor); registry.configureBrokerChannel().taskExecutor().corePoolSize(31).maxPoolSize(32).keepAliveSeconds(33).queueCapacity(34); registry.setPathMatcher(new AntPathMatcher(".")).enableSimpleBroker("/topic", "/queue"); registry.setCacheLimit(8192);