diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java index a12f3ca1580..6c5945ac0e6 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java @@ -59,6 +59,10 @@ public abstract class AbstractMessageBrokerConfiguration { "com.fasterxml.jackson.databind.ObjectMapper", AbstractMessageBrokerConfiguration.class.getClassLoader()); + private ChannelRegistration clientInboundChannelRegistration; + + private ChannelRegistration clientOutboundChannelRegistration; + private MessageBrokerRegistry brokerRegistry; @@ -69,55 +73,117 @@ public abstract class AbstractMessageBrokerConfiguration { } - /** - * An accessor for the {@link MessageBrokerRegistry} that ensures its one-time creation - * and initialization through {@link #configureMessageBroker(MessageBrokerRegistry)}. - */ - protected final MessageBrokerRegistry getBrokerRegistry() { - if (this.brokerRegistry == null) { - MessageBrokerRegistry registry = new MessageBrokerRegistry(clientOutboundChannel()); - configureMessageBroker(registry); - this.brokerRegistry = registry; - } - return this.brokerRegistry; - } - - /** - * A hook for sub-classes to customize message broker configuration through the - * provided {@link MessageBrokerRegistry} instance. - */ - protected abstract void configureMessageBroker(MessageBrokerRegistry registry); - - @Bean public AbstractSubscribableChannel clientInboundChannel() { - return new ExecutorSubscribableChannel(clientInboundChannelExecutor()); + ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(clientInboundChannelExecutor()); + ChannelRegistration r = getClientInboundChannelRegistration(); + if (r.hasInterceptors()) { + channel.setInterceptors(r.getInterceptors()); + } + return channel; } @Bean public ThreadPoolTaskExecutor clientInboundChannelExecutor() { - ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + TaskExecutorRegistration r = getClientInboundChannelRegistration().getTaskExecutorRegistration(); + ThreadPoolTaskExecutor executor = (r != null) ? r.getTaskExecutor() : new ThreadPoolTaskExecutor(); executor.setThreadNamePrefix("ClientInboundChannel-"); return executor; } + protected final ChannelRegistration getClientInboundChannelRegistration() { + if (this.clientInboundChannelRegistration == null) { + ChannelRegistration registration = new ChannelRegistration(); + configureClientInboundChannel(registration); + this.clientInboundChannelRegistration = registration; + } + return this.clientInboundChannelRegistration; + } + + + /** + * A hook for sub-classes to customize the message channel for inbound messages + * from WebSocket clients. + */ + protected abstract void configureClientInboundChannel(ChannelRegistration registration); + + @Bean public AbstractSubscribableChannel clientOutboundChannel() { - return new ExecutorSubscribableChannel(clientOutboundChannelExecutor()); + ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(clientOutboundChannelExecutor()); + ChannelRegistration r = getClientOutboundChannelRegistration(); + if (r.hasInterceptors()) { + channel.setInterceptors(r.getInterceptors()); + } + return channel; } @Bean public ThreadPoolTaskExecutor clientOutboundChannelExecutor() { - ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + TaskExecutorRegistration r = getClientOutboundChannelRegistration().getTaskExecutorRegistration(); + ThreadPoolTaskExecutor executor = (r != null) ? r.getTaskExecutor() : new ThreadPoolTaskExecutor(); executor.setThreadNamePrefix("ClientOutboundChannel-"); return executor; } + protected final ChannelRegistration getClientOutboundChannelRegistration() { + if (this.clientOutboundChannelRegistration == null) { + ChannelRegistration registration = new ChannelRegistration(); + configureClientOutboundChannel(registration); + this.clientOutboundChannelRegistration = registration; + } + return this.clientOutboundChannelRegistration; + } + + /** + * A hook for sub-classes to customize the message channel for messages from + * the application or message broker to WebSocket clients. + */ + protected abstract void configureClientOutboundChannel(ChannelRegistration registration); + @Bean public AbstractSubscribableChannel brokerChannel() { - return new ExecutorSubscribableChannel(); // synchronous + ChannelRegistration r = getBrokerRegistry().getBrokerChannelRegistration(); + ExecutorSubscribableChannel channel; + if (r.hasTaskExecutor()) { + channel = new ExecutorSubscribableChannel(); // synchronous by default + } + else { + channel = new ExecutorSubscribableChannel(brokerChannelExecutor()); + } + if (r.hasInterceptors()) { + channel.setInterceptors(r.getInterceptors()); + } + return channel; + } + + @Bean + public ThreadPoolTaskExecutor brokerChannelExecutor() { + TaskExecutorRegistration r = getBrokerRegistry().getBrokerChannelRegistration().getTaskExecutorRegistration(); + ThreadPoolTaskExecutor executor = (r != null) ? r.getTaskExecutor() : new ThreadPoolTaskExecutor(); + executor.setThreadNamePrefix("BrokerChannel-"); + return executor; + } + + /** + * An accessor for the {@link MessageBrokerRegistry} that ensures its one-time creation + * and initialization through {@link #configureMessageBroker(MessageBrokerRegistry)}. + */ + protected final MessageBrokerRegistry getBrokerRegistry() { + if (this.brokerRegistry == null) { + MessageBrokerRegistry registry = new MessageBrokerRegistry(clientOutboundChannel()); + configureMessageBroker(registry); + this.brokerRegistry = registry; + } + return this.brokerRegistry; } + /** + * A hook for sub-classes to customize message broker configuration through the + * provided {@link MessageBrokerRegistry} instance. + */ + protected abstract void configureMessageBroker(MessageBrokerRegistry registry); + @Bean public SimpAnnotationMethodMessageHandler simpAnnotationMethodMessageHandler() { diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/ChannelRegistration.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/ChannelRegistration.java new file mode 100644 index 00000000000..52103d9dd07 --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/ChannelRegistration.java @@ -0,0 +1,74 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.simp.config; + +import org.springframework.messaging.support.channel.ChannelInterceptor; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + + +/** + * A registration class for customizing the configuration for a + * {@link org.springframework.messaging.MessageChannel}. + * + * @author Rossen Stoyanchev + * @since 4.0 + */ +public class ChannelRegistration { + + private TaskExecutorRegistration taskExecutorRegistration; + + private List interceptors = new ArrayList(); + + + /** + * Configure properties of the ThreadPoolTaskExecutor backing the message channel. + */ + public TaskExecutorRegistration taskExecutor() { + this.taskExecutorRegistration = new TaskExecutorRegistration(); + return this.taskExecutorRegistration; + } + + /** + * Configure interceptors for the message channel. + */ + public ChannelRegistration setInterceptors(ChannelInterceptor... interceptors) { + if (interceptors != null) { + this.interceptors.addAll(Arrays.asList(interceptors)); + } + return this; + } + + + protected boolean hasTaskExecutor() { + return (this.taskExecutorRegistration != null); + } + + protected TaskExecutorRegistration getTaskExecutorRegistration() { + return this.taskExecutorRegistration; + } + + protected boolean hasInterceptors() { + return !this.interceptors.isEmpty(); + } + + protected List getInterceptors() { + return this.interceptors; + } +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerRegistry.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerRegistry.java index f19db80bc81..a250ff73749 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerRegistry.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerRegistry.java @@ -42,6 +42,8 @@ public class MessageBrokerRegistry { private String userDestinationPrefix; + private ChannelRegistration brokerChannelRegistration = new ChannelRegistration(); + public MessageBrokerRegistry(MessageChannel clientOutboundChannel) { Assert.notNull(clientOutboundChannel); @@ -103,6 +105,17 @@ public class MessageBrokerRegistry { return this; } + /** + * Customize the channel used to send messages from the application to the message + * broker. By default messages from the application to the message broker are sent + * synchronously, which means application code sending a message will find out + * if the message cannot be sent through an exception. However, this can be changed + * if the broker channel is configured here with task executor properties. + */ + public ChannelRegistration configureBrokerChannel() { + return this.brokerChannelRegistration; + } + protected SimpleBrokerMessageHandler getSimpleBroker() { initSimpleBrokerIfNecessary(); @@ -127,4 +140,8 @@ public class MessageBrokerRegistry { protected String getUserDestinationPrefix() { return this.userDestinationPrefix; } + + protected ChannelRegistration getBrokerChannelRegistration() { + return this.brokerChannelRegistration; + } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/TaskExecutorRegistration.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/TaskExecutorRegistration.java new file mode 100644 index 00000000000..afa1ce81ff7 --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/TaskExecutorRegistration.java @@ -0,0 +1,87 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.simp.config; + +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +/** + * A registration class for customizing the properties of {@link ThreadPoolTaskExecutor}. + * + * @author Rossen Stoyanchev + * @since 4.0 + */ +public class TaskExecutorRegistration { + + private int corePoolSize = 1; + + private int maxPoolSize = Integer.MAX_VALUE; + + private int keepAliveSeconds = 60; + + private int queueCapacity = Integer.MAX_VALUE; + + + /** + * Set the ThreadPoolExecutor's core pool size. + * Default is 1. + */ + public TaskExecutorRegistration corePoolSize(int corePoolSize) { + this.corePoolSize = corePoolSize; + return this; + } + + /** + * Set the ThreadPoolExecutor's maximum pool size. + * Default is {@code Integer.MAX_VALUE}. + */ + public TaskExecutorRegistration maxPoolSize(int maxPoolSize) { + this.maxPoolSize = maxPoolSize; + return this; + } + + /** + * Set the ThreadPoolExecutor's keep-alive seconds. + * Default is 60. + */ + public TaskExecutorRegistration keepAliveSeconds(int keepAliveSeconds) { + this.keepAliveSeconds = keepAliveSeconds; + return this; + } + + /** + * Set the capacity for the ThreadPoolExecutor's BlockingQueue. + * Default is {@code Integer.MAX_VALUE}. + *

Any positive value will lead to a LinkedBlockingQueue instance; + * any other value will lead to a SynchronousQueue instance. + * @see java.util.concurrent.LinkedBlockingQueue + * @see java.util.concurrent.SynchronousQueue + */ + public TaskExecutorRegistration queueCapacity(int queueCapacity) { + this.queueCapacity = queueCapacity; + return this; + } + + protected ThreadPoolTaskExecutor getTaskExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(this.corePoolSize); + executor.setMaxPoolSize(this.maxPoolSize); + executor.setKeepAliveSeconds(this.keepAliveSeconds); + executor.setQueueCapacity(this.queueCapacity); + return executor; + } + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/channel/ExecutorSubscribableChannel.java b/spring-messaging/src/main/java/org/springframework/messaging/support/channel/ExecutorSubscribableChannel.java index 75d45c26839..440f64ba55f 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/support/channel/ExecutorSubscribableChannel.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/channel/ExecutorSubscribableChannel.java @@ -57,6 +57,10 @@ public class ExecutorSubscribableChannel extends AbstractSubscribableChannel { } + public Executor getExecutor() { + return this.executor; + } + @Override protected boolean hasSubscription(MessageHandler handler) { return this.handlers.contains(handler); diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java index 70907050be7..d803b336e30 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java @@ -36,13 +36,17 @@ import org.springframework.messaging.simp.stomp.StompCommand; import org.springframework.messaging.simp.stomp.StompHeaderAccessor; import org.springframework.messaging.support.MessageBuilder; import org.springframework.messaging.support.channel.AbstractSubscribableChannel; +import org.springframework.messaging.support.channel.ChannelInterceptor; +import org.springframework.messaging.support.channel.ChannelInterceptorAdapter; import org.springframework.messaging.support.channel.ExecutorSubscribableChannel; import org.springframework.messaging.support.converter.CompositeMessageConverter; import org.springframework.messaging.support.converter.DefaultContentTypeResolver; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Controller; import org.springframework.util.MimeTypeUtils; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import static org.junit.Assert.*; @@ -59,6 +63,8 @@ public class MessageBrokerConfigurationTests { private AnnotationConfigApplicationContext cxtStompBroker; + private AnnotationConfigApplicationContext cxtCustomizedChannelConfig; + @Before public void setupOnce() { @@ -70,6 +76,10 @@ public class MessageBrokerConfigurationTests { this.cxtStompBroker = new AnnotationConfigApplicationContext(); this.cxtStompBroker.register(TestStompMessageBrokerConfig.class); this.cxtStompBroker.refresh(); + + this.cxtCustomizedChannelConfig = new AnnotationConfigApplicationContext(); + this.cxtCustomizedChannelConfig.register(CustomizedChannelConfig.class); + this.cxtCustomizedChannelConfig.refresh(); } @@ -96,6 +106,22 @@ public class MessageBrokerConfigurationTests { assertTrue(values.contains(cxtStompBroker.getBean(StompBrokerRelayMessageHandler.class))); } + @Test + public void clientInboundChannelCustomized() { + + AbstractSubscribableChannel channel = this.cxtCustomizedChannelConfig.getBean( + "clientInboundChannel", AbstractSubscribableChannel.class); + + assertEquals(1, channel.getInterceptors().size()); + + ThreadPoolTaskExecutor taskExecutor = this.cxtCustomizedChannelConfig.getBean( + "clientInboundChannelExecutor", ThreadPoolTaskExecutor.class); + + assertEquals(11, taskExecutor.getCorePoolSize()); + assertEquals(12, taskExecutor.getMaxPoolSize()); + assertEquals(13, taskExecutor.getKeepAliveSeconds()); + } + @Test public void clientOutboundChannelUsedByAnnotatedMethod() { @@ -148,6 +174,22 @@ public class MessageBrokerConfigurationTests { assertEquals("bar", new String((byte[]) message.getPayload())); } + @Test + public void clientOutboundChannelCustomized() { + + AbstractSubscribableChannel channel = this.cxtCustomizedChannelConfig.getBean( + "clientOutboundChannel", AbstractSubscribableChannel.class); + + assertEquals(2, channel.getInterceptors().size()); + + ThreadPoolTaskExecutor taskExecutor = this.cxtCustomizedChannelConfig.getBean( + "clientOutboundChannelExecutor", ThreadPoolTaskExecutor.class); + + assertEquals(21, taskExecutor.getCorePoolSize()); + assertEquals(22, taskExecutor.getMaxPoolSize()); + assertEquals(23, taskExecutor.getKeepAliveSeconds()); + } + @Test public void brokerChannel() { TestChannel channel = this.cxtSimpleBroker.getBean("brokerChannel", TestChannel.class); @@ -207,6 +249,22 @@ public class MessageBrokerConfigurationTests { assertEquals("/foo-users1", headers.getDestination()); } + @Test + public void brokerChannelCustomized() { + + AbstractSubscribableChannel channel = this.cxtCustomizedChannelConfig.getBean( + "brokerChannel", AbstractSubscribableChannel.class); + + assertEquals(3, channel.getInterceptors().size()); + + ThreadPoolTaskExecutor taskExecutor = this.cxtCustomizedChannelConfig.getBean( + "brokerChannelExecutor", ThreadPoolTaskExecutor.class); + + assertEquals(31, taskExecutor.getCorePoolSize()); + assertEquals(32, taskExecutor.getMaxPoolSize()); + assertEquals(33, taskExecutor.getKeepAliveSeconds()); + } + @Test public void messageConverter() { CompositeMessageConverter messageConverter = this.cxtStompBroker.getBean( @@ -240,9 +298,6 @@ public class MessageBrokerConfigurationTests { return new TestController(); } - @Override - protected void configureMessageBroker(MessageBrokerRegistry registry) { - } @Override @Bean @@ -250,16 +305,29 @@ public class MessageBrokerConfigurationTests { return new TestChannel(); } + @Override + protected void configureClientInboundChannel(ChannelRegistration registration) { + } + @Override @Bean public AbstractSubscribableChannel clientOutboundChannel() { return new TestChannel(); } + @Override + protected void configureClientOutboundChannel(ChannelRegistration registration) { + } + @Override public AbstractSubscribableChannel brokerChannel() { return new TestChannel(); } + + @Override + protected void configureMessageBroker(MessageBrokerRegistry registry) { + } + } @Configuration @@ -271,6 +339,32 @@ public class MessageBrokerConfigurationTests { } } + @Configuration + static class CustomizedChannelConfig extends AbstractMessageBrokerConfiguration { + + private ChannelInterceptor interceptor = new ChannelInterceptorAdapter(); + + + @Override + protected void configureClientInboundChannel(ChannelRegistration registration) { + registration.setInterceptors(this.interceptor); + registration.taskExecutor().corePoolSize(11).maxPoolSize(12).keepAliveSeconds(13).queueCapacity(14); + } + + @Override + protected void configureClientOutboundChannel(ChannelRegistration registration) { + registration.setInterceptors(this.interceptor, this.interceptor); + registration.taskExecutor().corePoolSize(21).maxPoolSize(22).keepAliveSeconds(23).queueCapacity(24); + } + + @Override + protected void configureMessageBroker(MessageBrokerRegistry registry) { + registry.configureBrokerChannel().setInterceptors(this.interceptor, this.interceptor, this.interceptor); + registry.configureBrokerChannel().taskExecutor() + .corePoolSize(31).maxPoolSize(32).keepAliveSeconds(33).queueCapacity(34); + } + } + private static class TestChannel extends ExecutorSubscribableChannel { diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/messaging/config/DelegatingWebSocketMessageBrokerConfiguration.java b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/config/DelegatingWebSocketMessageBrokerConfiguration.java index 3cb0a5e1493..9c84f2c2041 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/messaging/config/DelegatingWebSocketMessageBrokerConfiguration.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/config/DelegatingWebSocketMessageBrokerConfiguration.java @@ -21,6 +21,7 @@ import java.util.List; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; +import org.springframework.messaging.simp.config.ChannelRegistration; import org.springframework.messaging.simp.config.MessageBrokerRegistry; import org.springframework.util.CollectionUtils; @@ -58,6 +59,20 @@ public class DelegatingWebSocketMessageBrokerConfiguration extends WebSocketMess } } + @Override + protected void configureClientInboundChannel(ChannelRegistration registration) { + for (WebSocketMessageBrokerConfigurer c : this.configurers) { + c.configureClientInboundChannel(registration); + } + } + + @Override + protected void configureClientOutboundChannel(ChannelRegistration registration) { + for (WebSocketMessageBrokerConfigurer c : this.configurers) { + c.configureClientOutboundChannel(registration); + } + } + @Override protected void configureMessageBroker(MessageBrokerRegistry registry) { for (WebSocketMessageBrokerConfigurer c : this.configurers) { diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/messaging/config/WebSocketMessageBrokerConfigurer.java b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/config/WebSocketMessageBrokerConfigurer.java index 5a3e00b75c9..f7bba9c58fe 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/messaging/config/WebSocketMessageBrokerConfigurer.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/config/WebSocketMessageBrokerConfigurer.java @@ -17,6 +17,7 @@ package org.springframework.web.socket.messaging.config; +import org.springframework.messaging.simp.config.ChannelRegistration; import org.springframework.messaging.simp.config.MessageBrokerRegistry; /** @@ -35,6 +36,22 @@ public interface WebSocketMessageBrokerConfigurer { */ void registerStompEndpoints(StompEndpointRegistry registry); + /** + * Configure the {@link org.springframework.messaging.MessageChannel} used for + * incoming messages from WebSocket clients. By default the channel is backed + * by a thread pool of size 1. It is recommended to customize thread pool + * settings for production use. + */ + void configureClientInboundChannel(ChannelRegistration registration); + + /** + * Configure the {@link org.springframework.messaging.MessageChannel} used for + * incoming messages from WebSocket clients. By default the channel is backed + * by a thread pool of size 1. It is recommended to customize thread pool + * settings for production use. + */ + void configureClientOutboundChannel(ChannelRegistration registration); + /** * Configure message broker options. */ diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/messaging/SimpAnnotationMethodIntegrationTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/messaging/SimpAnnotationMethodIntegrationTests.java index 377a4d1e1ad..27879b0152d 100644 --- a/spring-websocket/src/test/java/org/springframework/web/socket/messaging/SimpAnnotationMethodIntegrationTests.java +++ b/spring-websocket/src/test/java/org/springframework/web/socket/messaging/SimpAnnotationMethodIntegrationTests.java @@ -36,6 +36,7 @@ import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.handler.annotation.MessageExceptionHandler; import org.springframework.messaging.handler.annotation.MessageMapping; +import org.springframework.messaging.simp.config.ChannelRegistration; import org.springframework.messaging.simp.config.MessageBrokerRegistry; import org.springframework.messaging.simp.stomp.StompCommand; import org.springframework.messaging.support.channel.AbstractSubscribableChannel; @@ -215,6 +216,14 @@ public class SimpAnnotationMethodIntegrationTests extends AbstractWebSocketInteg registry.addEndpoint("/ws").setHandshakeHandler(this.handshakeHandler); } + @Override + public void configureClientInboundChannel(ChannelRegistration registration) { + } + + @Override + public void configureClientOutboundChannel(ChannelRegistration registration) { + } + @Override public void configureMessageBroker(MessageBrokerRegistry configurer) { configurer.setApplicationDestinationPrefixes("/app"); diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/messaging/config/WebSocketMessageBrokerConfigurationSupportTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/messaging/config/WebSocketMessageBrokerConfigurationSupportTests.java index 352f5b002d8..944a44f4c58 100644 --- a/spring-websocket/src/test/java/org/springframework/web/socket/messaging/config/WebSocketMessageBrokerConfigurationSupportTests.java +++ b/spring-websocket/src/test/java/org/springframework/web/socket/messaging/config/WebSocketMessageBrokerConfigurationSupportTests.java @@ -27,6 +27,7 @@ import org.springframework.messaging.handler.annotation.MessageMapping; import org.springframework.messaging.handler.annotation.SendTo; import org.springframework.messaging.simp.SimpMessageType; import org.springframework.messaging.simp.annotation.SubscribeMapping; +import org.springframework.messaging.simp.config.ChannelRegistration; import org.springframework.messaging.simp.config.MessageBrokerRegistry; import org.springframework.messaging.simp.stomp.StompCommand; import org.springframework.messaging.simp.stomp.StompHeaderAccessor; @@ -119,20 +120,29 @@ public class WebSocketMessageBrokerConfigurationSupportTests { @Configuration static class TestSimpleMessageBrokerConfig implements WebSocketMessageBrokerConfigurer { + @Bean + public TestController subscriptionController() { + return new TestController(); + } + @Override public void registerStompEndpoints(StompEndpointRegistry registry) { registry.addEndpoint("/simpleBroker"); } @Override - public void configureMessageBroker(MessageBrokerRegistry configurer) { - // SimpleBroker used by default + public void configureClientInboundChannel(ChannelRegistration registration) { } - @Bean - public TestController subscriptionController() { - return new TestController(); + @Override + public void configureClientOutboundChannel(ChannelRegistration registration) { } + + @Override + public void configureMessageBroker(MessageBrokerRegistry registry) { + // SimpleBroker used by default + } + } @Configuration