diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/annotation/support/SubscriptionMethodReturnValueHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/annotation/support/SubscriptionMethodReturnValueHandler.java index 42c578062e3..2909685fa47 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/annotation/support/SubscriptionMethodReturnValueHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/annotation/support/SubscriptionMethodReturnValueHandler.java @@ -31,10 +31,10 @@ import org.springframework.util.Assert; /** * A {@link HandlerMethodReturnValueHandler} for replying directly to a subscription. It - * supports methods annotated with {@link SubscribeEvent} that do not also annotated with - * neither {@link ReplyTo} nor {@link ReplyToUser}. - * - *

The value returned from the method is converted, and turned to a {@link Message} and + * supports methods annotated with {@link SubscribeEvent} unless they're also annotated + * with {@link ReplyTo} or {@link ReplyToUser}. + *

+ * The value returned from the method is converted, and turned to a {@link Message} and * then enriched with the sessionId, subscriptionId, and destination of the input message. * The message is then sent directly back to the connected client. * @@ -73,8 +73,7 @@ public class SubscriptionMethodReturnValueHandler implements HandlerMethodReturn String destination = inputHeaders.getDestination(); Assert.state(inputHeaders.getSubscriptionId() != null, - "No subsriptiondId in input message. Add @ReplyTo or @ReplyToUser to method: " - + returnType.getMethod()); + "No subsriptiondId in input message to method " + returnType.getMethod()); MessagePostProcessor postProcessor = new SubscriptionHeaderPostProcessor(sessionId, subscriptionId); this.messagingTemplate.convertAndSend(destination, returnValue, postProcessor); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractStompEndpointRegistration.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractStompEndpointRegistration.java index f66e8d5746f..4bd710bb867 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractStompEndpointRegistration.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractStompEndpointRegistration.java @@ -30,8 +30,7 @@ import org.springframework.web.socket.sockjs.transport.handler.WebSocketTranspor /** - * A helper class for configuring STOMP protocol handling over WebSocket - * with optional SockJS fallback options. + * An abstract base class class for configuring STOMP over WebSocket/SockJS endpoints. * * @author Rossen Stoyanchev * @since 4.0 @@ -46,33 +45,36 @@ public abstract class AbstractStompEndpointRegistration implements StompEndpo private StompSockJsServiceRegistration sockJsServiceRegistration; - private final TaskScheduler defaultSockJsTaskScheduler; + private final TaskScheduler sockJsTaskScheduler; public AbstractStompEndpointRegistration(String[] paths, SubProtocolWebSocketHandler webSocketHandler, - TaskScheduler defaultSockJsTaskScheduler) { + TaskScheduler sockJsTaskScheduler) { Assert.notEmpty(paths, "No paths specified"); this.paths = paths; this.wsHandler = webSocketHandler; - this.defaultSockJsTaskScheduler = defaultSockJsTaskScheduler; + this.sockJsTaskScheduler = sockJsTaskScheduler; } - protected SubProtocolWebSocketHandler getWsHandler() { - return this.wsHandler; - } - + /** + * Provide a custom or pre-configured {@link HandshakeHandler}. This property is + * optional. + */ @Override public StompEndpointRegistration setHandshakeHandler(HandshakeHandler handshakeHandler) { this.handshakeHandler = handshakeHandler; return this; } + /** + * Enable SockJS fallback options. + */ @Override public SockJsServiceRegistration withSockJS() { - this.sockJsServiceRegistration = new StompSockJsServiceRegistration(this.defaultSockJsTaskScheduler); + this.sockJsServiceRegistration = new StompSockJsServiceRegistration(this.sockJsTaskScheduler); if (this.handshakeHandler != null) { WebSocketTransportHandler transportHandler = new WebSocketTransportHandler(this.handshakeHandler); @@ -82,7 +84,7 @@ public abstract class AbstractStompEndpointRegistration implements StompEndpo return this.sockJsServiceRegistration; } - protected M getMappings() { + protected final M getMappings() { M mappings = createMappings(); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerConfigurer.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerConfigurer.java index 61f0d4e9972..8784399e70d 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerConfigurer.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerConfigurer.java @@ -33,7 +33,7 @@ import reactor.util.Assert; */ public class MessageBrokerConfigurer { - private final MessageChannel webSocketReplyChannel; + private final MessageChannel webSocketResponseChannel; private SimpleBrokerRegistration simpleBroker; @@ -42,18 +42,18 @@ public class MessageBrokerConfigurer { private String[] annotationMethodDestinationPrefixes; - public MessageBrokerConfigurer(MessageChannel webSocketReplyChannel) { - Assert.notNull(webSocketReplyChannel); - this.webSocketReplyChannel = webSocketReplyChannel; + public MessageBrokerConfigurer(MessageChannel webSocketResponseChannel) { + Assert.notNull(webSocketResponseChannel); + this.webSocketResponseChannel = webSocketResponseChannel; } public SimpleBrokerRegistration enableSimpleBroker(String... destinationPrefixes) { - this.simpleBroker = new SimpleBrokerRegistration(this.webSocketReplyChannel, destinationPrefixes); + this.simpleBroker = new SimpleBrokerRegistration(this.webSocketResponseChannel, destinationPrefixes); return this.simpleBroker; } public StompBrokerRelayRegistration enableStompBrokerRelay(String... destinationPrefixes) { - this.stompRelay = new StompBrokerRelayRegistration(this.webSocketReplyChannel, destinationPrefixes); + this.stompRelay = new StompBrokerRelayRegistration(this.webSocketResponseChannel, destinationPrefixes); return this.stompRelay; } @@ -69,7 +69,7 @@ public class MessageBrokerConfigurer { protected void initSimpleBrokerIfNecessary() { if ((this.simpleBroker == null) && (this.stompRelay == null)) { - this.simpleBroker = new SimpleBrokerRegistration(this.webSocketReplyChannel, null); + this.simpleBroker = new SimpleBrokerRegistration(this.webSocketResponseChannel, null); } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/StompBrokerRelayRegistration.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/StompBrokerRelayRegistration.java index 62fddf2a54f..aef035bc0b6 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/StompBrokerRelayRegistration.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/StompBrokerRelayRegistration.java @@ -37,6 +37,8 @@ public class StompBrokerRelayRegistration extends AbstractBrokerRegistration { private String applicationPasscode = "guest"; + private boolean autoStartup = true; + public StompBrokerRelayRegistration(MessageChannel webSocketReplyChannel, String[] destinationPrefixes) { super(webSocketReplyChannel, destinationPrefixes); @@ -52,13 +54,6 @@ public class StompBrokerRelayRegistration extends AbstractBrokerRegistration { return this; } - /** - * @return the STOMP message broker host. - */ - protected String getRelayHost() { - return this.relayHost; - } - /** * Set the STOMP message broker port. */ @@ -67,13 +62,6 @@ public class StompBrokerRelayRegistration extends AbstractBrokerRegistration { return this; } - /** - * @return the STOMP message broker port. - */ - protected int getRelayPort() { - return this.relayPort; - } - /** * Set the login for a "system" TCP connection used to send messages to the STOMP * broker without having a client session (e.g. REST/HTTP request handling method). @@ -84,13 +72,6 @@ public class StompBrokerRelayRegistration extends AbstractBrokerRegistration { return this; } - /** - * @return the login for a shared, "system" connection to the STOMP message broker. - */ - protected String getApplicationLogin() { - return this.applicationLogin; - } - /** * Set the passcode for a "system" TCP connection used to send messages to the STOMP * broker without having a client session (e.g. REST/HTTP request handling method). @@ -102,10 +83,14 @@ public class StompBrokerRelayRegistration extends AbstractBrokerRegistration { } /** - * @return the passcode for a shared, "system" connection to the STOMP message broker. + * Configure whether the {@link StompBrokerRelayMessageHandler} should start + * automatically when the Spring ApplicationContext is refreshed. + *

+ * The default setting is {@code true}. */ - protected String getApplicationPasscode() { - return this.applicationPasscode; + public StompBrokerRelayRegistration setAutoStartup(boolean autoStartup) { + this.autoStartup = autoStartup; + return this; } @@ -116,6 +101,7 @@ public class StompBrokerRelayRegistration extends AbstractBrokerRegistration { handler.setRelayPort(this.relayPort); handler.setSystemLogin(this.applicationLogin); handler.setSystemPasscode(this.applicationPasscode); + handler.setAutoStartup(this.autoStartup); return handler; } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/WebSocketMessageBrokerConfigurationSupport.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/WebSocketMessageBrokerConfigurationSupport.java index 78f8656d2e8..9c0d8d5033c 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/WebSocketMessageBrokerConfigurationSupport.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/WebSocketMessageBrokerConfigurationSupport.java @@ -67,7 +67,7 @@ public abstract class WebSocketMessageBrokerConfigurationSupport { @Bean public SubProtocolWebSocketHandler subProtocolWebSocketHandler() { SubProtocolWebSocketHandler wsHandler = new SubProtocolWebSocketHandler(webSocketRequestChannel()); - webSocketReplyChannel().subscribe(wsHandler); + webSocketResponseChannel().subscribe(wsHandler); return wsHandler; } @@ -109,7 +109,7 @@ public abstract class WebSocketMessageBrokerConfigurationSupport { } @Bean - public SubscribableChannel webSocketReplyChannel() { + public SubscribableChannel webSocketResponseChannel() { return new ExecutorSubscribableChannel(webSocketChannelExecutor()); } @@ -125,7 +125,7 @@ public abstract class WebSocketMessageBrokerConfigurationSupport { @Bean public AnnotationMethodMessageHandler annotationMethodMessageHandler() { AnnotationMethodMessageHandler handler = - new AnnotationMethodMessageHandler(brokerMessagingTemplate(), webSocketReplyChannel()); + new AnnotationMethodMessageHandler(brokerMessagingTemplate(), webSocketResponseChannel()); handler.setDestinationPrefixes(getMessageBrokerConfigurer().getAnnotationMethodDestinationPrefixes()); handler.setMessageConverter(brokerMessageConverter()); webSocketRequestChannel().subscribe(handler); @@ -140,7 +140,7 @@ public abstract class WebSocketMessageBrokerConfigurationSupport { } else { webSocketRequestChannel().subscribe(handler); - brokerMessageChannel().subscribe(handler); + brokerChannel().subscribe(handler); return handler; } } @@ -153,14 +153,14 @@ public abstract class WebSocketMessageBrokerConfigurationSupport { } else { webSocketRequestChannel().subscribe(handler); - brokerMessageChannel().subscribe(handler); + brokerChannel().subscribe(handler); return handler; } } protected final MessageBrokerConfigurer getMessageBrokerConfigurer() { if (this.messageBrokerConfigurer == null) { - MessageBrokerConfigurer configurer = new MessageBrokerConfigurer(webSocketReplyChannel()); + MessageBrokerConfigurer configurer = new MessageBrokerConfigurer(webSocketResponseChannel()); configureMessageBroker(configurer); this.messageBrokerConfigurer = configurer; } @@ -175,19 +175,19 @@ public abstract class WebSocketMessageBrokerConfigurationSupport { UserDestinationMessageHandler handler = new UserDestinationMessageHandler( brokerMessagingTemplate(), userQueueSuffixResolver()); webSocketRequestChannel().subscribe(handler); - brokerMessageChannel().subscribe(handler); + brokerChannel().subscribe(handler); return handler; } @Bean public SimpMessageSendingOperations brokerMessagingTemplate() { - SimpMessagingTemplate template = new SimpMessagingTemplate(webSocketRequestChannel()); + SimpMessagingTemplate template = new SimpMessagingTemplate(brokerChannel()); template.setMessageConverter(brokerMessageConverter()); return template; } @Bean - public SubscribableChannel brokerMessageChannel() { + public SubscribableChannel brokerChannel() { return new ExecutorSubscribableChannel(); // synchronous } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/AbstractBrokerMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/AbstractBrokerMessageHandler.java index bc8be30f88c..17aa1ffc171 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/AbstractBrokerMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/AbstractBrokerMessageHandler.java @@ -47,6 +47,8 @@ public abstract class AbstractBrokerMessageHandler private AtomicBoolean brokerAvailable = new AtomicBoolean(false); + private boolean autoStartup = true; + private Object lifecycleMonitor = new Object(); private volatile boolean running = false; @@ -71,9 +73,13 @@ public abstract class AbstractBrokerMessageHandler return this.eventPublisher; } + public void setAutoStartup(boolean autoStartup) { + this.autoStartup = autoStartup; + } + @Override public boolean isAutoStartup() { - return true; + return this.autoStartup; } @Override diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/AnnotationMethodMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/AnnotationMethodMessageHandler.java index 99d1213dcca..4a7e5c0c350 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/AnnotationMethodMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/AnnotationMethodMessageHandler.java @@ -78,7 +78,7 @@ public class AnnotationMethodMessageHandler implements MessageHandler, Applicati private final SimpMessageSendingOperations brokerTemplate; - private final SimpMessageSendingOperations webSocketReplyTemplate; + private final SimpMessageSendingOperations webSocketResponseTemplate; private Collection destinationPrefixes; @@ -106,15 +106,15 @@ public class AnnotationMethodMessageHandler implements MessageHandler, Applicati /** * @param brokerTemplate a messaging template to sending messages to the broker - * @param webSocketReplyChannel the channel for messages to WebSocket clients + * @param webSocketResponseChannel the channel for messages to WebSocket clients */ public AnnotationMethodMessageHandler(SimpMessageSendingOperations brokerTemplate, - MessageChannel webSocketReplyChannel) { + MessageChannel webSocketResponseChannel) { Assert.notNull(brokerTemplate, "brokerTemplate is required"); - Assert.notNull(webSocketReplyChannel, "webSocketReplyChannel is required"); + Assert.notNull(webSocketResponseChannel, "webSocketReplyChannel is required"); this.brokerTemplate = brokerTemplate; - this.webSocketReplyTemplate = new SimpMessagingTemplate(webSocketReplyChannel); + this.webSocketResponseTemplate = new SimpMessagingTemplate(webSocketResponseChannel); } @@ -129,7 +129,7 @@ public class AnnotationMethodMessageHandler implements MessageHandler, Applicati public void setMessageConverter(MessageConverter converter) { this.messageConverter = converter; if (converter != null) { - ((AbstractMessageSendingTemplate) this.webSocketReplyTemplate).setMessageConverter(converter); + ((AbstractMessageSendingTemplate) this.webSocketResponseTemplate).setMessageConverter(converter); } } @@ -181,7 +181,7 @@ public class AnnotationMethodMessageHandler implements MessageHandler, Applicati // Annotation-based return value types this.returnValueHandlers.addHandler(new ReplyToMethodReturnValueHandler(this.brokerTemplate)); - this.returnValueHandlers.addHandler(new SubscriptionMethodReturnValueHandler(this.webSocketReplyTemplate)); + this.returnValueHandlers.addHandler(new SubscriptionMethodReturnValueHandler(this.webSocketResponseTemplate)); // custom return value types this.returnValueHandlers.addHandlers(this.customReturnValueHandlers); @@ -221,14 +221,14 @@ public class AnnotationMethodMessageHandler implements MessageHandler, Applicati final Class annotationType, MappingInfoCreator mappingInfoCreator, Map handlerMethods) { - Set messageMethods = HandlerMethodSelector.selectMethods(handlerType, new MethodFilter() { + Set methods = HandlerMethodSelector.selectMethods(handlerType, new MethodFilter() { @Override public boolean matches(Method method) { return AnnotationUtils.findAnnotation(method, annotationType) != null; } }); - for (Method method : messageMethods) { + for (Method method : methods) { A annotation = AnnotationUtils.findAnnotation(method, annotationType); HandlerMethod hm = createHandlerMethod(handler, method); handlerMethods.put(mappingInfoCreator.create(annotation), hm); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/channel/AbstractMessageChannel.java b/spring-messaging/src/main/java/org/springframework/messaging/support/channel/AbstractMessageChannel.java index b446981363b..32fdf94a130 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/support/channel/AbstractMessageChannel.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/channel/AbstractMessageChannel.java @@ -127,4 +127,10 @@ public abstract class AbstractMessageChannel implements MessageChannel, BeanName protected abstract boolean sendInternal(Message message, long timeout); + + @Override + public String toString() { + return "MessageChannel [name=" + this.beanName + "]"; + } + } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/config/AbstractStompEndpointRegistrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/config/AbstractStompEndpointRegistrationTests.java index dd286be5169..0a81564b7d9 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/config/AbstractStompEndpointRegistrationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/config/AbstractStompEndpointRegistrationTests.java @@ -54,7 +54,7 @@ public class AbstractStompEndpointRegistrationTests { } @Test - public void minimal() { + public void minimalRegistration() { TestStompEndpointRegistration registration = new TestStompEndpointRegistration(new String[] {"/foo"}, this.wsHandler, this.scheduler); @@ -68,7 +68,7 @@ public class AbstractStompEndpointRegistrationTests { } @Test - public void handshakeHandler() { + public void customHandshakeHandler() { DefaultHandshakeHandler handshakeHandler = new DefaultHandshakeHandler(); @@ -86,7 +86,7 @@ public class AbstractStompEndpointRegistrationTests { } @Test - public void handshakeHandlerPassedToSockJsService() { + public void customHandshakeHandlerPassedToSockJsService() { DefaultHandshakeHandler handshakeHandler = new DefaultHandshakeHandler(); diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/config/ServletStompEndpointRegistryTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/config/ServletStompEndpointRegistryTests.java new file mode 100644 index 00000000000..7531e8ed208 --- /dev/null +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/config/ServletStompEndpointRegistryTests.java @@ -0,0 +1,90 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.simp.config; + +import java.util.Map; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.handler.websocket.SubProtocolHandler; +import org.springframework.messaging.handler.websocket.SubProtocolWebSocketHandler; +import org.springframework.messaging.simp.handler.MutableUserQueueSuffixResolver; +import org.springframework.messaging.simp.handler.SimpleUserQueueSuffixResolver; +import org.springframework.messaging.simp.stomp.StompProtocolHandler; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.web.servlet.handler.SimpleUrlHandlerMapping; + +import static org.junit.Assert.*; + + +/** + * Test fixture for {@link ServletStompEndpointRegistry}. + * + * @author Rossen Stoyanchev + */ +public class ServletStompEndpointRegistryTests { + + private ServletStompEndpointRegistry registry; + + private SubProtocolWebSocketHandler webSocketHandler; + + private MutableUserQueueSuffixResolver queueSuffixResolver; + + + @Before + public void setup() { + MessageChannel channel = Mockito.mock(MessageChannel.class); + this.webSocketHandler = new SubProtocolWebSocketHandler(channel); + this.queueSuffixResolver = new SimpleUserQueueSuffixResolver(); + TaskScheduler taskScheduler = Mockito.mock(TaskScheduler.class); + this.registry = new ServletStompEndpointRegistry(webSocketHandler, queueSuffixResolver, taskScheduler); + } + + + @Test + public void stompProtocolHandler() { + + this.registry.addEndpoint("/stomp"); + + Map protocolHandlers = webSocketHandler.getProtocolHandlers(); + assertEquals(3, protocolHandlers.size()); + assertNotNull(protocolHandlers.get("v10.stomp")); + assertNotNull(protocolHandlers.get("v11.stomp")); + assertNotNull(protocolHandlers.get("v12.stomp")); + + StompProtocolHandler stompHandler = (StompProtocolHandler) protocolHandlers.get("v10.stomp"); + assertSame(this.queueSuffixResolver, stompHandler.getUserQueueSuffixResolver()); + } + + @Test + public void handlerMapping() { + + SimpleUrlHandlerMapping hm = (SimpleUrlHandlerMapping) this.registry.getHandlerMapping(); + assertEquals(0, hm.getUrlMap().size()); + + this.registry.addEndpoint("/stompOverWebSocket"); + this.registry.addEndpoint("/stompOverSockJS").withSockJS(); + + hm = (SimpleUrlHandlerMapping) this.registry.getHandlerMapping(); + assertEquals(2, hm.getUrlMap().size()); + assertNotNull(hm.getUrlMap().get("/stompOverWebSocket")); + assertNotNull(hm.getUrlMap().get("/stompOverSockJS/**")); + } + +} diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/config/WebSocketMessageBrokerConfigurationSupportTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/config/WebSocketMessageBrokerConfigurationSupportTests.java new file mode 100644 index 00000000000..87217baef5d --- /dev/null +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/config/WebSocketMessageBrokerConfigurationSupportTests.java @@ -0,0 +1,347 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.simp.config; + +import java.util.List; +import java.util.Map; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; +import org.springframework.context.annotation.AnnotationConfigApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.SubscribableChannel; +import org.springframework.messaging.handler.annotation.MessageMapping; +import org.springframework.messaging.handler.annotation.ReplyTo; +import org.springframework.messaging.handler.websocket.SubProtocolWebSocketHandler; +import org.springframework.messaging.simp.SimpMessageType; +import org.springframework.messaging.simp.annotation.SubscribeEvent; +import org.springframework.messaging.simp.handler.AnnotationMethodMessageHandler; +import org.springframework.messaging.simp.handler.MutableUserQueueSuffixResolver; +import org.springframework.messaging.simp.handler.SimpleBrokerMessageHandler; +import org.springframework.messaging.simp.handler.UserDestinationMessageHandler; +import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler; +import org.springframework.messaging.simp.stomp.StompCommand; +import org.springframework.messaging.simp.stomp.StompHeaderAccessor; +import org.springframework.messaging.simp.stomp.StompTextMessageBuilder; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.stereotype.Controller; +import org.springframework.web.servlet.HandlerMapping; +import org.springframework.web.servlet.handler.SimpleUrlHandlerMapping; +import org.springframework.web.socket.TextMessage; +import org.springframework.web.socket.support.TestWebSocketSession; + +import static org.junit.Assert.*; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + + +/** + * Test fixture for {@link WebSocketMessageBrokerConfigurationSupport}. + * + * @author Rossen Stoyanchev + */ +public class WebSocketMessageBrokerConfigurationSupportTests { + + private AnnotationConfigApplicationContext cxtSimpleBroker; + + private AnnotationConfigApplicationContext cxtStompBroker; + + + @Before + public void setupOnce() { + + this.cxtSimpleBroker = new AnnotationConfigApplicationContext(); + this.cxtSimpleBroker.register(TestWebSocketMessageBrokerConfiguration.class, TestSimpleMessageBrokerConfig.class); + this.cxtSimpleBroker.refresh(); + + this.cxtStompBroker = new AnnotationConfigApplicationContext(); + this.cxtStompBroker.register(TestWebSocketMessageBrokerConfiguration.class, TestStompMessageBrokerConfig.class); + this.cxtStompBroker.refresh(); + } + + @Test + public void handlerMapping() { + + SimpleUrlHandlerMapping hm = (SimpleUrlHandlerMapping) this.cxtSimpleBroker.getBean(HandlerMapping.class); + assertEquals(1, hm.getOrder()); + + Map handlerMap = hm.getHandlerMap(); + assertEquals(1, handlerMap.size()); + assertNotNull(handlerMap.get("/simpleBroker")); + } + + @Test + public void webSocketRequestChannel() { + + SubscribableChannel channel = this.cxtSimpleBroker.getBean("webSocketRequestChannel", SubscribableChannel.class); + + ArgumentCaptor captor = ArgumentCaptor.forClass(MessageHandler.class); + verify(channel, times(3)).subscribe(captor.capture()); + + List values = captor.getAllValues(); + assertEquals(3, values.size()); + + assertTrue(values.contains(cxtSimpleBroker.getBean(AnnotationMethodMessageHandler.class))); + assertTrue(values.contains(cxtSimpleBroker.getBean(UserDestinationMessageHandler.class))); + assertTrue(values.contains(cxtSimpleBroker.getBean(SimpleBrokerMessageHandler.class))); + } + + @Test + public void webSocketRequestChannelWithStompBroker() { + SubscribableChannel channel = this.cxtStompBroker.getBean("webSocketRequestChannel", SubscribableChannel.class); + + ArgumentCaptor captor = ArgumentCaptor.forClass(MessageHandler.class); + verify(channel, times(3)).subscribe(captor.capture()); + + List values = captor.getAllValues(); + assertEquals(3, values.size()); + assertTrue(values.contains(cxtStompBroker.getBean(AnnotationMethodMessageHandler.class))); + assertTrue(values.contains(cxtStompBroker.getBean(UserDestinationMessageHandler.class))); + assertTrue(values.contains(cxtStompBroker.getBean(StompBrokerRelayMessageHandler.class))); + } + + @Test + public void webSocketRequestChannelSendMessage() throws Exception { + + SubscribableChannel channel = this.cxtSimpleBroker.getBean("webSocketRequestChannel", SubscribableChannel.class); + SubProtocolWebSocketHandler webSocketHandler = this.cxtSimpleBroker.getBean(SubProtocolWebSocketHandler.class); + + TextMessage textMessage = StompTextMessageBuilder.create(StompCommand.SEND).headers("destination:/foo").build(); + webSocketHandler.handleMessage(new TestWebSocketSession(), textMessage); + + ArgumentCaptor captor = ArgumentCaptor.forClass(Message.class); + verify(channel).send(captor.capture()); + + Message message = captor.getValue(); + StompHeaderAccessor headers = StompHeaderAccessor.wrap(message); + + assertEquals(SimpMessageType.MESSAGE, headers.getMessageType()); + assertEquals("/foo", headers.getDestination()); + } + + @Test + public void webSocketResponseChannel() { + SubscribableChannel channel = this.cxtSimpleBroker.getBean("webSocketResponseChannel", SubscribableChannel.class); + verify(channel).subscribe(any(SubProtocolWebSocketHandler.class)); + verifyNoMoreInteractions(channel); + } + + @Test + public void webSocketResponseChannelUsedByAnnotatedMethod() { + + SubscribableChannel channel = this.cxtSimpleBroker.getBean("webSocketResponseChannel", SubscribableChannel.class); + AnnotationMethodMessageHandler messageHandler = this.cxtSimpleBroker.getBean(AnnotationMethodMessageHandler.class); + + StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SUBSCRIBE); + headers.setSessionId("sess1"); + headers.setSubscriptionId("subs1"); + headers.setDestination("/foo"); + Message message = MessageBuilder.withPayloadAndHeaders(new byte[0], headers).build(); + + when(channel.send(any(Message.class))).thenReturn(true); + messageHandler.handleMessage(message); + + ArgumentCaptor captor = ArgumentCaptor.forClass(Message.class); + verify(channel).send(captor.capture()); + message = captor.getValue(); + headers = StompHeaderAccessor.wrap(message); + + assertEquals(SimpMessageType.MESSAGE, headers.getMessageType()); + assertEquals("/foo", headers.getDestination()); + assertEquals("\"bar\"", new String((byte[]) message.getPayload())); + } + + @Test + public void webSocketResponseChannelUsedBySimpleBroker() { + SubscribableChannel channel = this.cxtSimpleBroker.getBean("webSocketResponseChannel", SubscribableChannel.class); + SimpleBrokerMessageHandler broker = this.cxtSimpleBroker.getBean(SimpleBrokerMessageHandler.class); + + StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SUBSCRIBE); + headers.setSessionId("sess1"); + headers.setSubscriptionId("subs1"); + headers.setDestination("/foo"); + Message message = MessageBuilder.withPayloadAndHeaders(new byte[0], headers).build(); + + // subscribe + broker.handleMessage(message); + + headers = StompHeaderAccessor.create(StompCommand.SEND); + headers.setSessionId("sess1"); + headers.setDestination("/foo"); + message = MessageBuilder.withPayloadAndHeaders("bar".getBytes(), headers).build(); + + // message + when(channel.send(any(Message.class))).thenReturn(true); + broker.handleMessage(message); + + ArgumentCaptor captor = ArgumentCaptor.forClass(Message.class); + verify(channel).send(captor.capture()); + message = captor.getValue(); + headers = StompHeaderAccessor.wrap(message); + + assertEquals(SimpMessageType.MESSAGE, headers.getMessageType()); + assertEquals("/foo", headers.getDestination()); + assertEquals("bar", new String((byte[]) message.getPayload())); + } + + @Test + public void brokerChannel() { + SubscribableChannel channel = this.cxtSimpleBroker.getBean("brokerChannel", SubscribableChannel.class); + + ArgumentCaptor captor = ArgumentCaptor.forClass(MessageHandler.class); + verify(channel, times(2)).subscribe(captor.capture()); + + List values = captor.getAllValues(); + assertEquals(2, values.size()); + assertTrue(values.contains(cxtSimpleBroker.getBean(UserDestinationMessageHandler.class))); + assertTrue(values.contains(cxtSimpleBroker.getBean(SimpleBrokerMessageHandler.class))); + } + + @Test + public void brokerChannelWithStompBroker() { + SubscribableChannel channel = this.cxtStompBroker.getBean("brokerChannel", SubscribableChannel.class); + + ArgumentCaptor captor = ArgumentCaptor.forClass(MessageHandler.class); + verify(channel, times(2)).subscribe(captor.capture()); + + List values = captor.getAllValues(); + assertEquals(2, values.size()); + assertTrue(values.contains(cxtStompBroker.getBean(UserDestinationMessageHandler.class))); + assertTrue(values.contains(cxtStompBroker.getBean(StompBrokerRelayMessageHandler.class))); + } + + @Test + public void brokerChannelUsedByAnnotatedMethod() { + SubscribableChannel channel = this.cxtSimpleBroker.getBean("brokerChannel", SubscribableChannel.class); + AnnotationMethodMessageHandler messageHandler = this.cxtSimpleBroker.getBean(AnnotationMethodMessageHandler.class); + + StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND); + headers.setDestination("/foo"); + Message message = MessageBuilder.withPayloadAndHeaders(new byte[0], headers).build(); + + when(channel.send(any(Message.class))).thenReturn(true); + messageHandler.handleMessage(message); + + ArgumentCaptor captor = ArgumentCaptor.forClass(Message.class); + verify(channel).send(captor.capture()); + message = captor.getValue(); + headers = StompHeaderAccessor.wrap(message); + + assertEquals(SimpMessageType.MESSAGE, headers.getMessageType()); + assertEquals("/bar", headers.getDestination()); + assertEquals("\"bar\"", new String((byte[]) message.getPayload())); + } + + @Test + public void brokerChannelUsedByUserDestinationMessageHandler() { + SubscribableChannel channel = this.cxtSimpleBroker.getBean("brokerChannel", SubscribableChannel.class); + UserDestinationMessageHandler messageHandler = this.cxtSimpleBroker.getBean(UserDestinationMessageHandler.class); + + this.cxtSimpleBroker.getBean(MutableUserQueueSuffixResolver.class).addQueueSuffix("joe", "s1", "s1"); + + StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND); + headers.setDestination("/user/joe/foo"); + Message message = MessageBuilder.withPayloadAndHeaders(new byte[0], headers).build(); + + when(channel.send(any(Message.class))).thenReturn(true); + messageHandler.handleMessage(message); + + ArgumentCaptor captor = ArgumentCaptor.forClass(Message.class); + verify(channel).send(captor.capture()); + message = captor.getValue(); + headers = StompHeaderAccessor.wrap(message); + + assertEquals(SimpMessageType.MESSAGE, headers.getMessageType()); + assertEquals("/foos1", headers.getDestination()); + } + + + @Controller + static class TestController { + + @SubscribeEvent("/foo") + public String handleSubscribe() { + return "bar"; + } + + @MessageMapping("/foo") + @ReplyTo("/bar") + public String handleMessage() { + return "bar"; + } + } + + @Configuration + static class TestSimpleMessageBrokerConfig implements WebSocketMessageBrokerConfigurer { + + @Override + public void registerStompEndpoints(StompEndpointRegistry registry) { + registry.addEndpoint("/simpleBroker"); + } + + @Override + public void configureMessageBroker(MessageBrokerConfigurer configurer) { + // SimpleBroker used by default + } + + @Bean + public TestController subscriptionController() { + return new TestController(); + } + } + + @Configuration + static class TestStompMessageBrokerConfig implements WebSocketMessageBrokerConfigurer { + + @Override + public void registerStompEndpoints(StompEndpointRegistry registry) { + registry.addEndpoint("/stompBrokerRelay"); + } + + @Override + public void configureMessageBroker(MessageBrokerConfigurer configurer) { + configurer.enableStompBrokerRelay("/topic", "/queue").setAutoStartup(false); + } + } + + @Configuration + static class TestWebSocketMessageBrokerConfiguration extends DelegatingWebSocketMessageBrokerConfiguration { + + @Override + @Bean + public SubscribableChannel webSocketRequestChannel() { + return Mockito.mock(SubscribableChannel.class); + } + + @Override + @Bean + public SubscribableChannel webSocketResponseChannel() { + return Mockito.mock(SubscribableChannel.class); + } + + @Override + public SubscribableChannel brokerChannel() { + return Mockito.mock(SubscribableChannel.class); + } + } + +} diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/config/WebSocketMessageBrokerConfigurationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/handler/AnnotationMethodIntegrationTests.java similarity index 51% rename from spring-messaging/src/test/java/org/springframework/messaging/simp/config/WebSocketMessageBrokerConfigurationTests.java rename to spring-messaging/src/test/java/org/springframework/messaging/simp/handler/AnnotationMethodIntegrationTests.java index a2e2f530d8e..2bf9cd911e4 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/config/WebSocketMessageBrokerConfigurationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/handler/AnnotationMethodIntegrationTests.java @@ -14,9 +14,15 @@ * limitations under the License. */ -package org.springframework.messaging.simp.config; +package org.springframework.messaging.simp.handler; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -26,36 +32,37 @@ import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.SubscribableChannel; import org.springframework.messaging.handler.annotation.MessageMapping; +import org.springframework.messaging.simp.config.DelegatingWebSocketMessageBrokerConfiguration; +import org.springframework.messaging.simp.config.MessageBrokerConfigurer; +import org.springframework.messaging.simp.config.StompEndpointRegistry; +import org.springframework.messaging.simp.config.WebSocketMessageBrokerConfigurer; import org.springframework.messaging.simp.stomp.StompCommand; -import org.springframework.messaging.simp.stomp.StompTextMessageBuilder; import org.springframework.messaging.support.channel.ExecutorSubscribableChannel; import org.springframework.stereotype.Controller; import org.springframework.web.socket.AbstractWebSocketIntegrationTests; import org.springframework.web.socket.JettyWebSocketTestServer; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.TomcatWebSocketTestServer; -import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.adapter.TextWebSocketHandlerAdapter; import org.springframework.web.socket.client.endpoint.StandardWebSocketClient; import org.springframework.web.socket.client.jetty.JettyWebSocketClient; import org.springframework.web.socket.server.HandshakeHandler; -import org.springframework.web.socket.server.config.WebSocketConfigurationSupport; -import org.springframework.web.socket.sockjs.transport.handler.WebSocketTransportHandler; import static org.junit.Assert.*; +import static org.springframework.messaging.simp.stomp.StompTextMessageBuilder.*; /** - * Test fixture for {@link WebSocketConfigurationSupport}. - * + * Integration tests with annotated message-handling methods. * @author Rossen Stoyanchev */ @RunWith(Parameterized.class) -public class WebSocketMessageBrokerConfigurationTests extends AbstractWebSocketIntegrationTests { +public class AnnotationMethodIntegrationTests extends AbstractWebSocketIntegrationTests { @Parameters public static Iterable arguments() { @@ -68,89 +75,103 @@ public class WebSocketMessageBrokerConfigurationTests extends AbstractWebSocketI @Override protected Class[] getAnnotatedConfigClasses() { - return new Class[] { TestWebSocketMessageBrokerConfiguration.class, SimpleBrokerConfigurer.class }; + return new Class[] { TestMessageBrokerConfiguration.class, TestMessageBrokerConfigurer.class }; } - @Test - public void sendMessage() throws Exception { - final TextMessage textMessage = StompTextMessageBuilder.create(StompCommand.SEND) - .headers("destination:/app/foo").build(); + @Test + public void simpleController() throws Exception { - WebSocketHandler clientHandler = new TextWebSocketHandlerAdapter() { - @Override - public void afterConnectionEstablished(WebSocketSession session) throws Exception { - session.sendMessage(textMessage); - } - }; + TextMessage message = create(StompCommand.SEND).headers("destination:/app/simple").build(); + WebSocketSession session = doHandshake(new TestClientWebSocketHandler(message, 0), "/ws"); - TestController testController = this.wac.getBean(TestController.class); + SimpleController controller = this.wac.getBean(SimpleController.class); + assertTrue(controller.latch.await(2, TimeUnit.SECONDS)); - WebSocketSession session = this.webSocketClient.doHandshake(clientHandler, getWsBaseUrl() + "/ws"); - assertTrue(testController.latch.await(2, TimeUnit.SECONDS)); session.close(); + } - testController.latch = new CountDownLatch(1); - session = this.webSocketClient.doHandshake(clientHandler, getWsBaseUrl() + "/sockjs/websocket"); - assertTrue(testController.latch.await(2, TimeUnit.SECONDS)); - session.close(); + + @IntegrationTestController + static class SimpleController { + + private CountDownLatch latch = new CountDownLatch(1); + + @MessageMapping(value="/app/simple") + public void handle() { + this.latch.countDown(); + } } + private static class TestClientWebSocketHandler extends TextWebSocketHandlerAdapter { - @Configuration - static class TestWebSocketMessageBrokerConfiguration extends DelegatingWebSocketMessageBrokerConfiguration { + private final TextMessage messageToSend; - @Override - @Bean - public SubscribableChannel webSocketRequestChannel() { - return new ExecutorSubscribableChannel(); // synchronous + private final int expected; + + private final List actual = new CopyOnWriteArrayList(); + + private final CountDownLatch latch; + + + public TestClientWebSocketHandler(TextMessage messageToSend, int expectedNumberOfMessages) { + this.messageToSend = messageToSend; + this.expected = expectedNumberOfMessages; + this.latch = new CountDownLatch(this.expected); } @Override - @Bean - public SubscribableChannel webSocketReplyChannel() { - return new ExecutorSubscribableChannel(); // synchronous + public void afterConnectionEstablished(WebSocketSession session) throws Exception { + session.sendMessage(this.messageToSend); } - @Bean - public TestController testController() { - return new TestController(); + @Override + protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { + this.actual.add(message); + this.latch.countDown(); } } @Configuration - static class SimpleBrokerConfigurer implements WebSocketMessageBrokerConfigurer { + @ComponentScan(basePackageClasses=AnnotationMethodIntegrationTests.class, + includeFilters=@ComponentScan.Filter(IntegrationTestController.class)) + static class TestMessageBrokerConfigurer implements WebSocketMessageBrokerConfigurer { @Autowired private HandshakeHandler handshakeHandler; // can't rely on classpath for server detection - @Override public void registerStompEndpoints(StompEndpointRegistry registry) { - - registry.addEndpoint("/ws") - .setHandshakeHandler(this.handshakeHandler); - - registry.addEndpoint("/sockjs").withSockJS() - .setTransportHandlerOverrides(new WebSocketTransportHandler(this.handshakeHandler));; + registry.addEndpoint("/ws").setHandshakeHandler(this.handshakeHandler); } @Override public void configureMessageBroker(MessageBrokerConfigurer configurer) { configurer.setAnnotationMethodDestinationPrefixes("/app/"); - configurer.enableSimpleBroker("/topic"); + configurer.enableSimpleBroker("/topic", "/queue"); } } - @Controller - private static class TestController { + @Configuration + static class TestMessageBrokerConfiguration extends DelegatingWebSocketMessageBrokerConfiguration { - private CountDownLatch latch = new CountDownLatch(1); + @Override + @Bean + public SubscribableChannel webSocketRequestChannel() { + return new ExecutorSubscribableChannel(); // synchronous + } - @MessageMapping(value="/app/foo") - public void handleFoo() { - this.latch.countDown(); + @Override + @Bean + public SubscribableChannel webSocketResponseChannel() { + return new ExecutorSubscribableChannel(); // synchronous } } + @Target({ElementType.TYPE}) + @Retention(RetentionPolicy.RUNTIME) + @Controller + private @interface IntegrationTestController { + } + } diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/AbstractWebSocketIntegrationTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/AbstractWebSocketIntegrationTests.java index 3a068f82713..9e68cdccef0 100644 --- a/spring-websocket/src/test/java/org/springframework/web/socket/AbstractWebSocketIntegrationTests.java +++ b/spring-websocket/src/test/java/org/springframework/web/socket/AbstractWebSocketIntegrationTests.java @@ -108,6 +108,10 @@ public abstract class AbstractWebSocketIntegrationTests { return "ws://localhost:" + this.server.getPort(); } + protected WebSocketSession doHandshake(WebSocketHandler clientHandler, String endpointPath) { + return this.webSocketClient.doHandshake(clientHandler, getWsBaseUrl() + endpointPath); + } + static abstract class AbstractRequestUpgradeStrategyConfig {