From 4de3291dc7bd6f425636b8d7ac7480d110349457 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Fri, 22 Nov 2013 13:49:38 -0500 Subject: [PATCH] Consolidate websocket/messaging code Before this change spring-messaging contained a few WebSocket-related classes including WebSocket sub-protocol support for STOMP as well as @EnableWebSocketMessageBroker and related configuration classes. After this change those classes are located in the spring-websocket module under org.springframework.web.socket.messaging. This means the following classes in application configuration must have their packages updated: org.springframework.web.socket.messaging.config.EnableWebSocketMessageBroker org.springframework.web.socket.messaging.config.StompEndpointRegistry org.springframework.web.socket.messaging.config.WebSocketMessageBrokerConfigurer MessageBrokerConfigurer has been renamed to MessageBrokerRegistry and is also located in the above package. --- build.gradle | 3 +- .../SubscriptionMethodReturnValueHandler.java | 4 + .../config/AbstractBrokerRegistration.java | 21 +- .../AbstractMessageBrokerConfiguration.java | 223 +++++++++++++++ .../AbstractStompEndpointRegistration.java | 154 ----------- ...igurer.java => MessageBrokerRegistry.java} | 29 +- .../ServletStompEndpointRegistration.java | 68 ----- .../simp/config/SimpleBrokerRegistration.java | 10 +- .../config/StompBrokerRelayRegistration.java | 13 +- ...cketMessageBrokerConfigurationSupport.java | 257 ------------------ .../SimpAnnotationMethodMessageHandler.java | 14 +- ...bstractStompEndpointRegistrationTests.java | 164 ----------- ...a => MessageBrokerConfigurationTests.java} | 110 ++------ ...mpAnnotationMethodMessageHandlerTests.java | 4 +- .../simp/stomp/StompHeaderAccessorTests.java | 3 +- .../messaging/StompSubProtocolHandler.java | 11 +- .../socket/messaging}/SubProtocolHandler.java | 2 +- .../SubProtocolWebSocketHandler.java | 32 ++- ...ngWebSocketMessageBrokerConfiguration.java | 16 +- .../config/EnableWebSocketMessageBroker.java | 9 +- .../config/StompEndpointRegistry.java | 9 +- .../StompWebSocketEndpointRegistration.java | 15 +- .../config/WebMvcStompEndpointRegistry.java | 62 +++-- ...MvcStompWebSocketEndpointRegistration.java | 137 ++++++++++ ...cketMessageBrokerConfigurationSupport.java | 88 ++++++ .../WebSocketMessageBrokerConfigurer.java | 11 +- .../support/WebSocketHttpRequestHandler.java | 7 + .../SimpAnnotationMethodIntegrationTests.java | 18 +- .../StompSubProtocolHandlerTests.java | 13 +- .../messaging}/StompTextMessageBuilder.java | 3 +- .../SubProtocolWebSocketHandlerTests.java | 2 +- .../WebMvcStompEndpointRegistrationTests.java | 124 +++++++++ .../WebMvcStompEndpointRegistryTests.java | 19 +- ...essageBrokerConfigurationSupportTests.java | 179 ++++++++++++ 34 files changed, 965 insertions(+), 869 deletions(-) create mode 100644 spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java delete mode 100644 spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractStompEndpointRegistration.java rename spring-messaging/src/main/java/org/springframework/messaging/simp/config/{MessageBrokerConfigurer.java => MessageBrokerRegistry.java} (78%) delete mode 100644 spring-messaging/src/main/java/org/springframework/messaging/simp/config/ServletStompEndpointRegistration.java delete mode 100644 spring-messaging/src/main/java/org/springframework/messaging/simp/config/WebSocketMessageBrokerConfigurationSupport.java delete mode 100644 spring-messaging/src/test/java/org/springframework/messaging/simp/config/AbstractStompEndpointRegistrationTests.java rename spring-messaging/src/test/java/org/springframework/messaging/simp/config/{WebSocketMessageBrokerConfigurationSupportTests.java => MessageBrokerConfigurationTests.java} (70%) rename spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompProtocolHandler.java => spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java (95%) rename {spring-messaging/src/main/java/org/springframework/messaging/handler/websocket => spring-websocket/src/main/java/org/springframework/web/socket/messaging}/SubProtocolHandler.java (98%) rename {spring-messaging/src/main/java/org/springframework/messaging/handler/websocket => spring-websocket/src/main/java/org/springframework/web/socket/messaging}/SubProtocolWebSocketHandler.java (87%) rename {spring-messaging/src/main/java/org/springframework/messaging/simp => spring-websocket/src/main/java/org/springframework/web/socket/messaging}/config/DelegatingWebSocketMessageBrokerConfiguration.java (76%) rename {spring-messaging/src/main/java/org/springframework/messaging/simp => spring-websocket/src/main/java/org/springframework/web/socket/messaging}/config/EnableWebSocketMessageBroker.java (88%) rename {spring-messaging/src/main/java/org/springframework/messaging/simp => spring-websocket/src/main/java/org/springframework/web/socket/messaging}/config/StompEndpointRegistry.java (73%) rename spring-messaging/src/main/java/org/springframework/messaging/simp/config/StompEndpointRegistration.java => spring-websocket/src/main/java/org/springframework/web/socket/messaging/config/StompWebSocketEndpointRegistration.java (76%) rename spring-messaging/src/main/java/org/springframework/messaging/simp/config/ServletStompEndpointRegistry.java => spring-websocket/src/main/java/org/springframework/web/socket/messaging/config/WebMvcStompEndpointRegistry.java (60%) create mode 100644 spring-websocket/src/main/java/org/springframework/web/socket/messaging/config/WebMvcStompWebSocketEndpointRegistration.java create mode 100644 spring-websocket/src/main/java/org/springframework/web/socket/messaging/config/WebSocketMessageBrokerConfigurationSupport.java rename {spring-messaging/src/main/java/org/springframework/messaging/simp => spring-websocket/src/main/java/org/springframework/web/socket/messaging}/config/WebSocketMessageBrokerConfigurer.java (68%) rename {spring-messaging/src/test/java/org/springframework/messaging/simp/handler => spring-websocket/src/test/java/org/springframework/web/socket/messaging}/SimpAnnotationMethodIntegrationTests.java (92%) rename spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompProtocolHandlerTests.java => spring-websocket/src/test/java/org/springframework/web/socket/messaging/StompSubProtocolHandlerTests.java (90%) rename {spring-messaging/src/test/java/org/springframework/messaging/simp/stomp => spring-websocket/src/test/java/org/springframework/web/socket/messaging}/StompTextMessageBuilder.java (94%) rename {spring-messaging/src/test/java/org/springframework/messaging/handler/websocket => spring-websocket/src/test/java/org/springframework/web/socket/messaging}/SubProtocolWebSocketHandlerTests.java (98%) create mode 100644 spring-websocket/src/test/java/org/springframework/web/socket/messaging/config/WebMvcStompEndpointRegistrationTests.java rename spring-messaging/src/test/java/org/springframework/messaging/simp/config/ServletStompEndpointRegistryTests.java => spring-websocket/src/test/java/org/springframework/web/socket/messaging/config/WebMvcStompEndpointRegistryTests.java (78%) create mode 100644 spring-websocket/src/test/java/org/springframework/web/socket/messaging/config/WebSocketMessageBrokerConfigurationSupportTests.java diff --git a/build.gradle b/build.gradle index ec5adec010c..c7be9076479 100644 --- a/build.gradle +++ b/build.gradle @@ -387,8 +387,6 @@ project("spring-messaging") { compile(project(":spring-beans")) compile(project(":spring-core")) compile(project(":spring-context")) - optional(project(":spring-websocket")) - optional(project(":spring-webmvc")) optional("com.fasterxml.jackson.core:jackson-databind:2.2.2") optional("org.projectreactor:reactor-core:1.0.0.RELEASE") optional("org.projectreactor:reactor-tcp:1.0.0.RELEASE") @@ -583,6 +581,7 @@ project("spring-websocket") { compile(project(":spring-core")) compile(project(":spring-context")) compile(project(":spring-web")) + optional(project(":spring-messaging")) optional(project(":spring-webmvc")) optional("javax.servlet:javax.servlet-api:3.1.0") optional("javax.websocket:javax.websocket-api:1.0") diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/annotation/support/SubscriptionMethodReturnValueHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/annotation/support/SubscriptionMethodReturnValueHandler.java index e9b885d4cc6..0d6b358241a 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/annotation/support/SubscriptionMethodReturnValueHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/annotation/support/SubscriptionMethodReturnValueHandler.java @@ -47,6 +47,10 @@ public class SubscriptionMethodReturnValueHandler implements HandlerMethodReturn private final MessageSendingOperations messagingTemplate; + /** + * @param messagingTemplate a messaging template for sending messages directly + * to clients, e.g. in response to a subscription + */ public SubscriptionMethodReturnValueHandler(MessageSendingOperations messagingTemplate) { Assert.notNull(messagingTemplate, "messagingTemplate is required"); this.messagingTemplate = messagingTemplate; diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractBrokerRegistration.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractBrokerRegistration.java index 666c5fd0a7e..84c7fc11376 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractBrokerRegistration.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractBrokerRegistration.java @@ -19,6 +19,7 @@ package org.springframework.messaging.simp.config; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.List; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.simp.handler.AbstractBrokerMessageHandler; @@ -32,25 +33,25 @@ import org.springframework.util.Assert; */ public abstract class AbstractBrokerRegistration { - private final MessageChannel webSocketReplyChannel; + private final MessageChannel clientOutboundChannel; - private final String[] destinationPrefixes; + private final List destinationPrefixes; - public AbstractBrokerRegistration(MessageChannel webSocketReplyChannel, String[] destinationPrefixes) { - Assert.notNull(webSocketReplyChannel, ""); - this.webSocketReplyChannel = webSocketReplyChannel; - this.destinationPrefixes = destinationPrefixes; + public AbstractBrokerRegistration(MessageChannel clientOutboundChannel, String[] destinationPrefixes) { + Assert.notNull(clientOutboundChannel, "'clientOutboundChannel' is required"); + this.clientOutboundChannel = clientOutboundChannel; + this.destinationPrefixes = (destinationPrefixes != null) + ? Arrays.asList(destinationPrefixes) : Collections.emptyList(); } - protected MessageChannel getWebSocketReplyChannel() { - return this.webSocketReplyChannel; + protected MessageChannel getClientOutboundChannel() { + return this.clientOutboundChannel; } protected Collection getDestinationPrefixes() { - return (this.destinationPrefixes != null) - ? Arrays.asList(this.destinationPrefixes) : Collections.emptyList(); + return this.destinationPrefixes; } protected abstract AbstractBrokerMessageHandler getMessageHandler(); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java new file mode 100644 index 00000000000..a12f3ca1580 --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java @@ -0,0 +1,223 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.simp.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.messaging.Message; +import org.springframework.messaging.simp.SimpMessagingTemplate; +import org.springframework.messaging.simp.handler.*; +import org.springframework.messaging.support.channel.AbstractSubscribableChannel; +import org.springframework.messaging.support.channel.ExecutorSubscribableChannel; +import org.springframework.messaging.support.converter.*; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.util.ClassUtils; +import org.springframework.util.MimeTypeUtils; + +import java.util.ArrayList; +import java.util.List; + + +/** + * Provides essential configuration for handling messages with simple messaging + * protocols such as STOMP. + *

+ * {@link #clientInboundChannel()} and {@link #clientOutboundChannel()} deliver messages + * to and from remote clients to several message handlers such as + *

    + *
  • {@link #simpAnnotationMethodMessageHandler()}
  • + *
  • {@link #simpleBrokerMessageHandler()}
  • + *
  • {@link #stompBrokerRelayMessageHandler()}
  • + *
  • {@link #userDestinationMessageHandler()}
  • + *
+ * while {@link #brokerChannel()} delivers messages from within the application to the + * the respective message handlers. {@link #brokerMessagingTemplate()} can be injected + * into any application component to send messages. + *

+ * Sub-classes are responsible for the part of the configuration that feed messages + * to and from the client inbound/outbound channels (e.g. STOMP over WebSokcet). + * + * @author Rossen Stoyanchev + * @since 4.0 + */ +public abstract class AbstractMessageBrokerConfiguration { + + private static final boolean jackson2Present= ClassUtils.isPresent( + "com.fasterxml.jackson.databind.ObjectMapper", AbstractMessageBrokerConfiguration.class.getClassLoader()); + + + private MessageBrokerRegistry brokerRegistry; + + + /** + * Protected constructor. + */ + protected AbstractMessageBrokerConfiguration() { + } + + + /** + * An accessor for the {@link MessageBrokerRegistry} that ensures its one-time creation + * and initialization through {@link #configureMessageBroker(MessageBrokerRegistry)}. + */ + protected final MessageBrokerRegistry getBrokerRegistry() { + if (this.brokerRegistry == null) { + MessageBrokerRegistry registry = new MessageBrokerRegistry(clientOutboundChannel()); + configureMessageBroker(registry); + this.brokerRegistry = registry; + } + return this.brokerRegistry; + } + + /** + * A hook for sub-classes to customize message broker configuration through the + * provided {@link MessageBrokerRegistry} instance. + */ + protected abstract void configureMessageBroker(MessageBrokerRegistry registry); + + + @Bean + public AbstractSubscribableChannel clientInboundChannel() { + return new ExecutorSubscribableChannel(clientInboundChannelExecutor()); + } + + @Bean + public ThreadPoolTaskExecutor clientInboundChannelExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setThreadNamePrefix("ClientInboundChannel-"); + return executor; + } + + @Bean + public AbstractSubscribableChannel clientOutboundChannel() { + return new ExecutorSubscribableChannel(clientOutboundChannelExecutor()); + } + + @Bean + public ThreadPoolTaskExecutor clientOutboundChannelExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setThreadNamePrefix("ClientOutboundChannel-"); + return executor; + } + + @Bean + public AbstractSubscribableChannel brokerChannel() { + return new ExecutorSubscribableChannel(); // synchronous + } + + + @Bean + public SimpAnnotationMethodMessageHandler simpAnnotationMethodMessageHandler() { + + SimpAnnotationMethodMessageHandler handler = + new SimpAnnotationMethodMessageHandler(brokerMessagingTemplate(), clientOutboundChannel()); + + handler.setDestinationPrefixes(getBrokerRegistry().getApplicationDestinationPrefixes()); + handler.setMessageConverter(brokerMessageConverter()); + clientInboundChannel().subscribe(handler); + return handler; + } + + @Bean + public AbstractBrokerMessageHandler simpleBrokerMessageHandler() { + SimpleBrokerMessageHandler handler = getBrokerRegistry().getSimpleBroker(); + if (handler != null) { + clientInboundChannel().subscribe(handler); + brokerChannel().subscribe(handler); + return handler; + } + return noopBroker; + } + + @Bean + public AbstractBrokerMessageHandler stompBrokerRelayMessageHandler() { + AbstractBrokerMessageHandler handler = getBrokerRegistry().getStompBrokerRelay(); + if (handler != null) { + clientInboundChannel().subscribe(handler); + brokerChannel().subscribe(handler); + return handler; + } + return noopBroker; + } + + @Bean + public UserDestinationMessageHandler userDestinationMessageHandler() { + + UserDestinationMessageHandler handler = new UserDestinationMessageHandler( + brokerMessagingTemplate(), userDestinationResolver()); + + clientInboundChannel().subscribe(handler); + brokerChannel().subscribe(handler); + return handler; + } + + @Bean + public SimpMessagingTemplate brokerMessagingTemplate() { + SimpMessagingTemplate template = new SimpMessagingTemplate(brokerChannel()); + String prefix = getBrokerRegistry().getUserDestinationPrefix(); + if (prefix != null) { + template.setUserDestinationPrefix(prefix); + } + template.setMessageConverter(brokerMessageConverter()); + return template; + } + + @Bean + public CompositeMessageConverter brokerMessageConverter() { + + DefaultContentTypeResolver contentTypeResolver = new DefaultContentTypeResolver(); + + List converters = new ArrayList(); + if (jackson2Present) { + converters.add(new MappingJackson2MessageConverter()); + contentTypeResolver.setDefaultMimeType(MimeTypeUtils.APPLICATION_JSON); + } + converters.add(new StringMessageConverter()); + converters.add(new ByteArrayMessageConverter()); + + return new CompositeMessageConverter(converters, contentTypeResolver); + } + + @Bean + public UserDestinationResolver userDestinationResolver() { + DefaultUserDestinationResolver resolver = new DefaultUserDestinationResolver(userSessionRegistry()); + String prefix = getBrokerRegistry().getUserDestinationPrefix(); + if (prefix != null) { + resolver.setUserDestinationPrefix(prefix); + } + return resolver; + } + + @Bean + public UserSessionRegistry userSessionRegistry() { + return new DefaultUserSessionRegistry(); + } + + + private static final AbstractBrokerMessageHandler noopBroker = new AbstractBrokerMessageHandler(null) { + + @Override + protected void startInternal() { + } + @Override + protected void stopInternal() { + } + @Override + protected void handleMessageInternal(Message message) { + } + }; + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractStompEndpointRegistration.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractStompEndpointRegistration.java deleted file mode 100644 index 68872562627..00000000000 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractStompEndpointRegistration.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Copyright 2002-2013 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.messaging.simp.config; - -import java.util.Set; - -import org.springframework.messaging.handler.websocket.SubProtocolWebSocketHandler; -import org.springframework.scheduling.TaskScheduler; -import org.springframework.util.Assert; -import org.springframework.util.ObjectUtils; -import org.springframework.web.socket.WebSocketHandler; -import org.springframework.web.socket.server.DefaultHandshakeHandler; -import org.springframework.web.socket.server.HandshakeHandler; -import org.springframework.web.socket.server.config.SockJsServiceRegistration; -import org.springframework.web.socket.sockjs.SockJsService; -import org.springframework.web.socket.sockjs.transport.handler.WebSocketTransportHandler; -import org.springframework.web.socket.support.WebSocketHandlerDecorator; - - -/** - * An abstract base class class for configuring STOMP over WebSocket/SockJS endpoints. - * - * @author Rossen Stoyanchev - * @since 4.0 - */ -public abstract class AbstractStompEndpointRegistration implements StompEndpointRegistration { - - private final String[] paths; - - private final WebSocketHandler wsHandler; - - private HandshakeHandler handshakeHandler; - - private StompSockJsServiceRegistration sockJsServiceRegistration; - - private final TaskScheduler sockJsTaskScheduler; - - - public AbstractStompEndpointRegistration(String[] paths, WebSocketHandler webSocketHandler, - TaskScheduler sockJsTaskScheduler) { - - Assert.notEmpty(paths, "No paths specified"); - this.paths = paths; - this.wsHandler = webSocketHandler; - this.sockJsTaskScheduler = sockJsTaskScheduler; - } - - - /** - * Provide a custom or pre-configured {@link HandshakeHandler}. This property is - * optional. - */ - @Override - public StompEndpointRegistration setHandshakeHandler(HandshakeHandler handshakeHandler) { - this.handshakeHandler = handshakeHandler; - return this; - } - - /** - * Enable SockJS fallback options. - */ - @Override - public SockJsServiceRegistration withSockJS() { - - this.sockJsServiceRegistration = new StompSockJsServiceRegistration(this.sockJsTaskScheduler); - - if (this.handshakeHandler != null) { - WebSocketTransportHandler transportHandler = new WebSocketTransportHandler(this.handshakeHandler); - this.sockJsServiceRegistration.setTransportHandlerOverrides(transportHandler); - } - - return this.sockJsServiceRegistration; - } - - protected final M getMappings() { - - M mappings = createMappings(); - - if (this.sockJsServiceRegistration != null) { - SockJsService sockJsService = this.sockJsServiceRegistration.getSockJsService(); - for (String path : this.paths) { - String pathPattern = path.endsWith("/") ? path + "**" : path + "/**"; - addSockJsServiceMapping(mappings, sockJsService, this.wsHandler, pathPattern); - } - } - else { - HandshakeHandler handshakeHandler = getOrCreateHandshakeHandler(); - for (String path : this.paths) { - addWebSocketHandlerMapping(mappings, this.wsHandler, handshakeHandler, path); - } - } - - return mappings; - } - - protected abstract M createMappings(); - - private HandshakeHandler getOrCreateHandshakeHandler() { - - HandshakeHandler handler = (this.handshakeHandler != null) - ? this.handshakeHandler : new DefaultHandshakeHandler(); - - if (handler instanceof DefaultHandshakeHandler) { - DefaultHandshakeHandler defaultHandshakeHandler = (DefaultHandshakeHandler) handler; - if (ObjectUtils.isEmpty(defaultHandshakeHandler.getSupportedProtocols())) { - Set protocols = findSubProtocolWebSocketHandler(this.wsHandler).getSupportedProtocols(); - defaultHandshakeHandler.setSupportedProtocols(protocols.toArray(new String[protocols.size()])); - } - } - - return handler; - } - - private static SubProtocolWebSocketHandler findSubProtocolWebSocketHandler(WebSocketHandler webSocketHandler) { - WebSocketHandler actual = (webSocketHandler instanceof WebSocketHandlerDecorator) ? - ((WebSocketHandlerDecorator) webSocketHandler).getLastHandler() : webSocketHandler; - Assert.isInstanceOf(SubProtocolWebSocketHandler.class, actual, - "No SubProtocolWebSocketHandler found: " + webSocketHandler); - return (SubProtocolWebSocketHandler) actual; - } - - protected abstract void addSockJsServiceMapping(M mappings, SockJsService sockJsService, - WebSocketHandler wsHandler, String pathPattern); - - protected abstract void addWebSocketHandlerMapping(M mappings, - WebSocketHandler wsHandler, HandshakeHandler handshakeHandler, String path); - - - private class StompSockJsServiceRegistration extends SockJsServiceRegistration { - - public StompSockJsServiceRegistration(TaskScheduler defaultTaskScheduler) { - super(defaultTaskScheduler); - } - - protected SockJsService getSockJsService() { - return super.getSockJsService(); - } - } - -} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerConfigurer.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerRegistry.java similarity index 78% rename from spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerConfigurer.java rename to spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerRegistry.java index 7a2a23301a9..f19db80bc81 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerConfigurer.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerRegistry.java @@ -20,18 +20,19 @@ import java.util.Arrays; import java.util.Collection; import org.springframework.messaging.MessageChannel; -import org.springframework.messaging.simp.handler.AbstractBrokerMessageHandler; +import org.springframework.messaging.simp.handler.SimpleBrokerMessageHandler; +import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler; import org.springframework.util.Assert; /** - * A helper class for configuring message broker options. + * A registry for configuring message broker options. * * @author Rossen Stoyanchev * @since 4.0 */ -public class MessageBrokerConfigurer { +public class MessageBrokerRegistry { - private final MessageChannel webSocketResponseChannel; + private final MessageChannel clientOutboundChannel; private SimpleBrokerRegistration simpleBroker; @@ -42,9 +43,9 @@ public class MessageBrokerConfigurer { private String userDestinationPrefix; - public MessageBrokerConfigurer(MessageChannel webSocketResponseChannel) { - Assert.notNull(webSocketResponseChannel); - this.webSocketResponseChannel = webSocketResponseChannel; + public MessageBrokerRegistry(MessageChannel clientOutboundChannel) { + Assert.notNull(clientOutboundChannel); + this.clientOutboundChannel = clientOutboundChannel; } /** @@ -52,7 +53,7 @@ public class MessageBrokerConfigurer { * destinations targeting the broker (e.g. destinations prefixed with "/topic"). */ public SimpleBrokerRegistration enableSimpleBroker(String... destinationPrefixes) { - this.simpleBroker = new SimpleBrokerRegistration(this.webSocketResponseChannel, destinationPrefixes); + this.simpleBroker = new SimpleBrokerRegistration(this.clientOutboundChannel, destinationPrefixes); return this.simpleBroker; } @@ -62,7 +63,7 @@ public class MessageBrokerConfigurer { * destinations. */ public StompBrokerRelayRegistration enableStompBrokerRelay(String... destinationPrefixes) { - this.stompRelay = new StompBrokerRelayRegistration(this.webSocketResponseChannel, destinationPrefixes); + this.stompRelay = new StompBrokerRelayRegistration(this.clientOutboundChannel, destinationPrefixes); return this.stompRelay; } @@ -78,7 +79,7 @@ public class MessageBrokerConfigurer { *

* Prefixes that do not have a trailing slash will have one automatically appended. */ - public MessageBrokerConfigurer setApplicationDestinationPrefixes(String... prefixes) { + public MessageBrokerRegistry setApplicationDestinationPrefixes(String... prefixes) { this.applicationDestinationPrefixes = prefixes; return this; } @@ -97,24 +98,24 @@ public class MessageBrokerConfigurer { *

* The default prefix used to identify such destinations is "/user/". */ - public MessageBrokerConfigurer setUserDestinationPrefix(String destinationPrefix) { + public MessageBrokerRegistry setUserDestinationPrefix(String destinationPrefix) { this.userDestinationPrefix = destinationPrefix; return this; } - protected AbstractBrokerMessageHandler getSimpleBroker() { + protected SimpleBrokerMessageHandler getSimpleBroker() { initSimpleBrokerIfNecessary(); return (this.simpleBroker != null) ? this.simpleBroker.getMessageHandler() : null; } protected void initSimpleBrokerIfNecessary() { if ((this.simpleBroker == null) && (this.stompRelay == null)) { - this.simpleBroker = new SimpleBrokerRegistration(this.webSocketResponseChannel, null); + this.simpleBroker = new SimpleBrokerRegistration(this.clientOutboundChannel, null); } } - protected AbstractBrokerMessageHandler getStompBrokerRelay() { + protected StompBrokerRelayMessageHandler getStompBrokerRelay() { return (this.stompRelay != null) ? this.stompRelay.getMessageHandler() : null; } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/ServletStompEndpointRegistration.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/ServletStompEndpointRegistration.java deleted file mode 100644 index 52f2423241f..00000000000 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/ServletStompEndpointRegistration.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright 2002-2013 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.messaging.simp.config; - -import org.springframework.scheduling.TaskScheduler; -import org.springframework.util.LinkedMultiValueMap; -import org.springframework.util.MultiValueMap; -import org.springframework.web.HttpRequestHandler; -import org.springframework.web.socket.WebSocketHandler; -import org.springframework.web.socket.server.HandshakeHandler; -import org.springframework.web.socket.server.support.WebSocketHttpRequestHandler; -import org.springframework.web.socket.sockjs.SockJsHttpRequestHandler; -import org.springframework.web.socket.sockjs.SockJsService; - - -/** - * A helper class for configuring STOMP protocol handling over WebSocket - * with optional SockJS fallback options. - * - * @author Rossen Stoyanchev - * @since 4.0 - */ -public class ServletStompEndpointRegistration - extends AbstractStompEndpointRegistration> { - - - public ServletStompEndpointRegistration(String[] paths, - WebSocketHandler wsHandler, TaskScheduler sockJsTaskScheduler) { - - super(paths, wsHandler, sockJsTaskScheduler); - } - - @Override - protected MultiValueMap createMappings() { - return new LinkedMultiValueMap(); - } - - @Override - protected void addSockJsServiceMapping(MultiValueMap mappings, - SockJsService sockJsService, WebSocketHandler wsHandler, String pathPattern) { - - SockJsHttpRequestHandler httpHandler = new SockJsHttpRequestHandler(sockJsService, wsHandler); - mappings.add(httpHandler, pathPattern); - } - - @Override - protected void addWebSocketHandlerMapping(MultiValueMap mappings, - WebSocketHandler wsHandler, HandshakeHandler handshakeHandler, String path) { - - WebSocketHttpRequestHandler handler = new WebSocketHttpRequestHandler(wsHandler, handshakeHandler); - mappings.add(handler, path); - } - -} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/SimpleBrokerRegistration.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/SimpleBrokerRegistration.java index 063643dd624..ac38336b6fe 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/SimpleBrokerRegistration.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/SimpleBrokerRegistration.java @@ -21,7 +21,7 @@ import org.springframework.messaging.simp.handler.SimpleBrokerMessageHandler; /** - * A simple message broker alternative providing a simple getting started option. + * Registration class for configuring a {@link SimpleBrokerMessageHandler}. * * @author Rossen Stoyanchev * @since 4.0 @@ -29,15 +29,13 @@ import org.springframework.messaging.simp.handler.SimpleBrokerMessageHandler; public class SimpleBrokerRegistration extends AbstractBrokerRegistration { - public SimpleBrokerRegistration(MessageChannel webSocketReplyChannel, String[] destinationPrefixes) { - super(webSocketReplyChannel, destinationPrefixes); + public SimpleBrokerRegistration(MessageChannel clientOutboundChannel, String[] destinationPrefixes) { + super(clientOutboundChannel, destinationPrefixes); } @Override protected SimpleBrokerMessageHandler getMessageHandler() { - SimpleBrokerMessageHandler handler = - new SimpleBrokerMessageHandler(getWebSocketReplyChannel(), getDestinationPrefixes()); - return handler; + return new SimpleBrokerMessageHandler(getClientOutboundChannel(), getDestinationPrefixes()); } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/StompBrokerRelayRegistration.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/StompBrokerRelayRegistration.java index 83730d56c13..8b73855a2c3 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/StompBrokerRelayRegistration.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/StompBrokerRelayRegistration.java @@ -22,7 +22,7 @@ import org.springframework.util.Assert; /** - * A helper class for configuring a relay to an external STOMP message broker. + * Registration class for configuring a {@link StompBrokerRelayMessageHandler}. * * @author Rossen Stoyanchev * @since 4.0 @@ -44,8 +44,8 @@ public class StompBrokerRelayRegistration extends AbstractBrokerRegistration { private boolean autoStartup = true; - public StompBrokerRelayRegistration(MessageChannel webSocketReplyChannel, String[] destinationPrefixes) { - super(webSocketReplyChannel, destinationPrefixes); + public StompBrokerRelayRegistration(MessageChannel clientOutboundChannel, String[] destinationPrefixes) { + super(clientOutboundChannel, destinationPrefixes); } @@ -124,18 +124,23 @@ public class StompBrokerRelayRegistration extends AbstractBrokerRegistration { protected StompBrokerRelayMessageHandler getMessageHandler() { + StompBrokerRelayMessageHandler handler = - new StompBrokerRelayMessageHandler(getWebSocketReplyChannel(), getDestinationPrefixes()); + new StompBrokerRelayMessageHandler(getClientOutboundChannel(), getDestinationPrefixes()); + handler.setRelayHost(this.relayHost); handler.setRelayPort(this.relayPort); handler.setSystemLogin(this.applicationLogin); handler.setSystemPasscode(this.applicationPasscode); + if (this.systemHeartbeatSendInterval != null) { handler.setSystemHeartbeatSendInterval(this.systemHeartbeatSendInterval); } + if (this.systemHeartbeatReceiveInterval != null) { handler.setSystemHeartbeatReceiveInterval(this.systemHeartbeatReceiveInterval); } + handler.setAutoStartup(this.autoStartup); return handler; } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/WebSocketMessageBrokerConfigurationSupport.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/WebSocketMessageBrokerConfigurationSupport.java deleted file mode 100644 index ebf25df1bbe..00000000000 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/WebSocketMessageBrokerConfigurationSupport.java +++ /dev/null @@ -1,257 +0,0 @@ -/* - * Copyright 2002-2013 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.messaging.simp.config; - -import org.springframework.context.annotation.Bean; -import org.springframework.messaging.Message; -import org.springframework.messaging.handler.websocket.SubProtocolWebSocketHandler; -import org.springframework.messaging.simp.SimpMessageSendingOperations; -import org.springframework.messaging.simp.SimpMessagingTemplate; -import org.springframework.messaging.simp.handler.*; -import org.springframework.messaging.support.channel.AbstractSubscribableChannel; -import org.springframework.messaging.support.channel.ExecutorSubscribableChannel; -import org.springframework.messaging.support.converter.*; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; -import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; -import org.springframework.util.ClassUtils; -import org.springframework.util.MimeTypeUtils; -import org.springframework.web.servlet.HandlerMapping; -import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport; -import org.springframework.web.servlet.handler.AbstractHandlerMapping; -import org.springframework.web.socket.WebSocketHandler; -import org.springframework.web.socket.server.config.SockJsServiceRegistration; - -import java.util.ArrayList; -import java.util.List; - - -/** - * Configuration support for broker-backed messaging over WebSocket using a higher-level - * messaging sub-protocol such as STOMP. This class can either be extended directly - * or its configuration can also be customized in a callback style via - * {@link EnableWebSocketMessageBroker @EnableWebSocketMessageBroker} and - * {@link WebSocketMessageBrokerConfigurer}. - * - * @author Rossen Stoyanchev - * @since 4.0 - */ -public abstract class WebSocketMessageBrokerConfigurationSupport { - - private static final boolean jackson2Present = - ClassUtils.isPresent("com.fasterxml.jackson.databind.ObjectMapper", WebMvcConfigurationSupport.class.getClassLoader()) && - ClassUtils.isPresent("com.fasterxml.jackson.core.JsonGenerator", WebMvcConfigurationSupport.class.getClassLoader()); - - private MessageBrokerConfigurer messageBrokerConfigurer; - - - // WebSocket configuration including message channels to/from the application - - @Bean - public HandlerMapping brokerWebSocketHandlerMapping() { - - ServletStompEndpointRegistry registry = new ServletStompEndpointRegistry( - subProtocolWebSocketHandler(), userSessionRegistry(), brokerDefaultSockJsTaskScheduler()); - - registerStompEndpoints(registry); - AbstractHandlerMapping hm = registry.getHandlerMapping(); - hm.setOrder(1); - return hm; - } - - @Bean - public WebSocketHandler subProtocolWebSocketHandler() { - SubProtocolWebSocketHandler wsHandler = new SubProtocolWebSocketHandler(webSocketRequestChannel()); - webSocketResponseChannel().subscribe(wsHandler); - return wsHandler; - } - - @Bean - public UserSessionRegistry userSessionRegistry() { - return new DefaultUserSessionRegistry(); - } - - /** - * The default TaskScheduler to use if none is configured via - * {@link SockJsServiceRegistration#setTaskScheduler(org.springframework.scheduling.TaskScheduler)}, i.e. - *

-	 * @Configuration
-	 * @EnableWebSocketMessageBroker
-	 * public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
-	 *
-	 *   public void registerStompEndpoints(StompEndpointRegistry registry) {
-	 *     registry.addEndpoint("/stomp").withSockJS().setTaskScheduler(myScheduler());
-	 *   }
-	 *
-	 *   // ...
-	 *
-	 * }
-	 * 
- */ - @Bean - public ThreadPoolTaskScheduler brokerDefaultSockJsTaskScheduler() { - ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); - scheduler.setThreadNamePrefix("BrokerSockJS-"); - return scheduler; - } - - protected void registerStompEndpoints(StompEndpointRegistry registry) { - } - - @Bean - public AbstractSubscribableChannel webSocketRequestChannel() { - return new ExecutorSubscribableChannel(webSocketRequestChannelExecutor()); - } - - @Bean - public ThreadPoolTaskExecutor webSocketRequestChannelExecutor() { - ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); - executor.setThreadNamePrefix("WebSocketRequestChannel-"); - return executor; - } - - @Bean - public AbstractSubscribableChannel webSocketResponseChannel() { - return new ExecutorSubscribableChannel(webSocketResponseChannelExecutor()); - } - - @Bean - public ThreadPoolTaskExecutor webSocketResponseChannelExecutor() { - ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); - executor.setThreadNamePrefix("WebSocketResponseChannel-"); - return executor; - } - - // Handling of messages by the application - - @Bean - public SimpAnnotationMethodMessageHandler annotationMethodMessageHandler() { - - SimpAnnotationMethodMessageHandler handler = - new SimpAnnotationMethodMessageHandler(brokerMessagingTemplate(), webSocketResponseChannel()); - - handler.setDestinationPrefixes(getMessageBrokerConfigurer().getApplicationDestinationPrefixes()); - handler.setMessageConverter(simpMessageConverter()); - webSocketRequestChannel().subscribe(handler); - return handler; - } - - @Bean - public AbstractBrokerMessageHandler simpleBrokerMessageHandler() { - AbstractBrokerMessageHandler handler = getMessageBrokerConfigurer().getSimpleBroker(); - if (handler == null) { - return noopBroker; - } - else { - webSocketRequestChannel().subscribe(handler); - brokerChannel().subscribe(handler); - return handler; - } - } - - @Bean - public AbstractBrokerMessageHandler stompBrokerRelayMessageHandler() { - AbstractBrokerMessageHandler handler = getMessageBrokerConfigurer().getStompBrokerRelay(); - if (handler == null) { - return noopBroker; - } - else { - webSocketRequestChannel().subscribe(handler); - brokerChannel().subscribe(handler); - return handler; - } - } - - protected final MessageBrokerConfigurer getMessageBrokerConfigurer() { - if (this.messageBrokerConfigurer == null) { - MessageBrokerConfigurer configurer = new MessageBrokerConfigurer(webSocketResponseChannel()); - configureMessageBroker(configurer); - this.messageBrokerConfigurer = configurer; - } - return this.messageBrokerConfigurer; - } - - protected void configureMessageBroker(MessageBrokerConfigurer configurer) { - } - - @Bean - public UserDestinationMessageHandler userDestinationMessageHandler() { - - UserDestinationMessageHandler handler = new UserDestinationMessageHandler( - brokerMessagingTemplate(), userDestinationResolver()); - - webSocketRequestChannel().subscribe(handler); - brokerChannel().subscribe(handler); - return handler; - } - - @Bean - public SimpMessageSendingOperations brokerMessagingTemplate() { - SimpMessagingTemplate template = new SimpMessagingTemplate(brokerChannel()); - String userDestinationPrefix = getMessageBrokerConfigurer().getUserDestinationPrefix(); - if (userDestinationPrefix != null) { - template.setUserDestinationPrefix(userDestinationPrefix); - } - template.setMessageConverter(simpMessageConverter()); - return template; - } - - @Bean - public AbstractSubscribableChannel brokerChannel() { - return new ExecutorSubscribableChannel(); // synchronous - } - - @Bean - public CompositeMessageConverter simpMessageConverter() { - - DefaultContentTypeResolver contentTypeResolver = new DefaultContentTypeResolver(); - - List converters = new ArrayList(); - if (jackson2Present) { - converters.add(new MappingJackson2MessageConverter()); - contentTypeResolver.setDefaultMimeType(MimeTypeUtils.APPLICATION_JSON); - } - converters.add(new StringMessageConverter()); - converters.add(new ByteArrayMessageConverter()); - - return new CompositeMessageConverter(converters, contentTypeResolver); - } - - @Bean - public UserDestinationResolver userDestinationResolver() { - DefaultUserDestinationResolver resolver = new DefaultUserDestinationResolver(userSessionRegistry()); - String prefix = getMessageBrokerConfigurer().getUserDestinationPrefix(); - if (prefix != null) { - resolver.setUserDestinationPrefix(prefix); - } - return resolver; - } - - - private static final AbstractBrokerMessageHandler noopBroker = new AbstractBrokerMessageHandler(null) { - - @Override - protected void startInternal() { - } - @Override - protected void stopInternal() { - } - @Override - protected void handleMessageInternal(Message message) { - } - }; - -} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpAnnotationMethodMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpAnnotationMethodMessageHandler.java index 0c2669658fc..9f5e853f610 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpAnnotationMethodMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpAnnotationMethodMessageHandler.java @@ -79,7 +79,7 @@ public class SimpAnnotationMethodMessageHandler extends AbstractMethodMessageHan private final SimpMessageSendingOperations brokerTemplate; - private final SimpMessageSendingOperations webSocketResponseTemplate; + private final SimpMessageSendingOperations clientMessagingTemplate; private MessageConverter messageConverter; @@ -90,15 +90,15 @@ public class SimpAnnotationMethodMessageHandler extends AbstractMethodMessageHan /** * @param brokerTemplate a messaging template to send application messages to the broker - * @param webSocketResponseChannel the channel for messages to WebSocket clients + * @param clientOutboundChannel the channel for messages to clients (e.g. WebSocket clients) */ public SimpAnnotationMethodMessageHandler(SimpMessageSendingOperations brokerTemplate, - MessageChannel webSocketResponseChannel) { + MessageChannel clientOutboundChannel) { Assert.notNull(brokerTemplate, "brokerTemplate is required"); - Assert.notNull(webSocketResponseChannel, "webSocketReplyChannel is required"); + Assert.notNull(clientOutboundChannel, "clientOutboundChannel is required"); this.brokerTemplate = brokerTemplate; - this.webSocketResponseTemplate = new SimpMessagingTemplate(webSocketResponseChannel); + this.clientMessagingTemplate = new SimpMessagingTemplate(clientOutboundChannel); Collection converters = new ArrayList(); converters.add(new StringMessageConverter()); @@ -117,7 +117,7 @@ public class SimpAnnotationMethodMessageHandler extends AbstractMethodMessageHan public void setMessageConverter(MessageConverter converter) { this.messageConverter = converter; if (converter != null) { - ((AbstractMessageSendingTemplate) this.webSocketResponseTemplate).setMessageConverter(converter); + ((AbstractMessageSendingTemplate) this.clientMessagingTemplate).setMessageConverter(converter); } } @@ -194,7 +194,7 @@ public class SimpAnnotationMethodMessageHandler extends AbstractMethodMessageHan // Annotation-based return value types handlers.add(new SendToMethodReturnValueHandler(this.brokerTemplate, true)); - handlers.add(new SubscriptionMethodReturnValueHandler(this.webSocketResponseTemplate)); + handlers.add(new SubscriptionMethodReturnValueHandler(this.clientMessagingTemplate)); // custom return value types handlers.addAll(getCustomReturnValueHandlers()); diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/config/AbstractStompEndpointRegistrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/config/AbstractStompEndpointRegistrationTests.java deleted file mode 100644 index e5ab9b4f6b2..00000000000 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/config/AbstractStompEndpointRegistrationTests.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Copyright 2002-2013 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.messaging.simp.config; - -import java.util.ArrayList; -import java.util.List; - -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mockito; -import org.springframework.messaging.handler.websocket.SubProtocolWebSocketHandler; -import org.springframework.messaging.support.channel.ExecutorSubscribableChannel; -import org.springframework.scheduling.TaskScheduler; -import org.springframework.web.socket.WebSocketHandler; -import org.springframework.web.socket.server.DefaultHandshakeHandler; -import org.springframework.web.socket.server.HandshakeHandler; -import org.springframework.web.socket.sockjs.SockJsService; -import org.springframework.web.socket.sockjs.transport.TransportType; -import org.springframework.web.socket.sockjs.transport.handler.DefaultSockJsService; -import org.springframework.web.socket.sockjs.transport.handler.WebSocketTransportHandler; - -import static org.junit.Assert.*; - - -/** - * Test fixture for {@link AbstractStompEndpointRegistration}. - * - * @author Rossen Stoyanchev - */ -public class AbstractStompEndpointRegistrationTests { - - private SubProtocolWebSocketHandler wsHandler; - - private TaskScheduler scheduler; - - - @Before - public void setup() { - this.wsHandler = new SubProtocolWebSocketHandler(new ExecutorSubscribableChannel()); - this.scheduler = Mockito.mock(TaskScheduler.class); - } - - @Test - public void minimalRegistration() { - - TestStompEndpointRegistration registration = - new TestStompEndpointRegistration(new String[] {"/foo"}, this.wsHandler, this.scheduler); - - List mappings = registration.getMappings(); - assertEquals(1, mappings.size()); - - Mapping m1 = mappings.get(0); - assertSame(this.wsHandler, m1.webSocketHandler); - assertEquals("/foo", m1.path); - } - - @Test - public void customHandshakeHandler() { - - DefaultHandshakeHandler handshakeHandler = new DefaultHandshakeHandler(); - - TestStompEndpointRegistration registration = - new TestStompEndpointRegistration(new String[] {"/foo"}, this.wsHandler, this.scheduler); - registration.setHandshakeHandler(handshakeHandler); - - List mappings = registration.getMappings(); - assertEquals(1, mappings.size()); - - Mapping m1 = mappings.get(0); - assertSame(this.wsHandler, m1.webSocketHandler); - assertEquals("/foo", m1.path); - assertSame(handshakeHandler, m1.handshakeHandler); - } - - @Test - public void customHandshakeHandlerPassedToSockJsService() { - - DefaultHandshakeHandler handshakeHandler = new DefaultHandshakeHandler(); - - TestStompEndpointRegistration registration = - new TestStompEndpointRegistration(new String[] {"/foo"}, this.wsHandler, this.scheduler); - registration.setHandshakeHandler(handshakeHandler); - registration.withSockJS(); - - List mappings = registration.getMappings(); - assertEquals(1, mappings.size()); - - Mapping m1 = mappings.get(0); - assertSame(this.wsHandler, m1.webSocketHandler); - assertEquals("/foo/**", m1.path); - assertNotNull(m1.sockJsService); - - WebSocketTransportHandler transportHandler = - (WebSocketTransportHandler) m1.sockJsService.getTransportHandlers().get(TransportType.WEBSOCKET); - assertSame(handshakeHandler, transportHandler.getHandshakeHandler()); - } - - - private static class TestStompEndpointRegistration extends AbstractStompEndpointRegistration> { - - public TestStompEndpointRegistration(String[] paths, SubProtocolWebSocketHandler wsh, TaskScheduler scheduler) { - super(paths, wsh, scheduler); - } - - @Override - protected List createMappings() { - return new ArrayList<>(); - } - - @Override - protected void addSockJsServiceMapping(List mappings, SockJsService sockJsService, - WebSocketHandler wsHandler, String pathPattern) { - - mappings.add(new Mapping(wsHandler, pathPattern, sockJsService)); - } - - @Override - protected void addWebSocketHandlerMapping(List mappings, WebSocketHandler wsHandler, - HandshakeHandler handshakeHandler, String path) { - - mappings.add(new Mapping(wsHandler, path, handshakeHandler)); - } - } - - private static class Mapping { - - private final WebSocketHandler webSocketHandler; - - private final String path; - - private final HandshakeHandler handshakeHandler; - - private final DefaultSockJsService sockJsService; - - public Mapping(WebSocketHandler handler, String path, SockJsService sockJsService) { - this.webSocketHandler = handler; - this.path = path; - this.handshakeHandler = null; - this.sockJsService = (DefaultSockJsService) sockJsService; - } - - public Mapping(WebSocketHandler h, String path, HandshakeHandler hh) { - this.webSocketHandler = h; - this.path = path; - this.handshakeHandler = hh; - this.sockJsService = null; - } - } - -} diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/config/WebSocketMessageBrokerConfigurationSupportTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java similarity index 70% rename from spring-messaging/src/test/java/org/springframework/messaging/simp/config/WebSocketMessageBrokerConfigurationSupportTests.java rename to spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java index af68277a364..70907050be7 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/config/WebSocketMessageBrokerConfigurationSupportTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java @@ -25,7 +25,6 @@ import org.springframework.messaging.Message; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.handler.annotation.MessageMapping; import org.springframework.messaging.handler.annotation.SendTo; -import org.springframework.messaging.handler.websocket.SubProtocolWebSocketHandler; import org.springframework.messaging.simp.SimpMessageType; import org.springframework.messaging.simp.annotation.SubscribeMapping; import org.springframework.messaging.simp.handler.SimpAnnotationMethodMessageHandler; @@ -35,7 +34,6 @@ import org.springframework.messaging.simp.handler.UserSessionRegistry; import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler; import org.springframework.messaging.simp.stomp.StompCommand; import org.springframework.messaging.simp.stomp.StompHeaderAccessor; -import org.springframework.messaging.simp.stomp.StompTextMessageBuilder; import org.springframework.messaging.support.MessageBuilder; import org.springframework.messaging.support.channel.AbstractSubscribableChannel; import org.springframework.messaging.support.channel.ExecutorSubscribableChannel; @@ -43,24 +41,19 @@ import org.springframework.messaging.support.converter.CompositeMessageConverter import org.springframework.messaging.support.converter.DefaultContentTypeResolver; import org.springframework.stereotype.Controller; import org.springframework.util.MimeTypeUtils; -import org.springframework.web.servlet.HandlerMapping; -import org.springframework.web.servlet.handler.SimpleUrlHandlerMapping; -import org.springframework.web.socket.TextMessage; -import org.springframework.web.socket.support.TestWebSocketSession; import java.util.ArrayList; import java.util.List; -import java.util.Map; import static org.junit.Assert.*; /** - * Test fixture for {@link WebSocketMessageBrokerConfigurationSupport}. + * Test fixture for {@link AbstractMessageBrokerConfiguration}. * * @author Rossen Stoyanchev */ -public class WebSocketMessageBrokerConfigurationSupportTests { +public class MessageBrokerConfigurationTests { private AnnotationConfigApplicationContext cxtSimpleBroker; @@ -71,29 +64,19 @@ public class WebSocketMessageBrokerConfigurationSupportTests { public void setupOnce() { this.cxtSimpleBroker = new AnnotationConfigApplicationContext(); - this.cxtSimpleBroker.register(TestWebSocketMessageBrokerConfiguration.class, TestSimpleMessageBrokerConfig.class); + this.cxtSimpleBroker.register(TestMessageBrokerConfiguration.class); this.cxtSimpleBroker.refresh(); this.cxtStompBroker = new AnnotationConfigApplicationContext(); - this.cxtStompBroker.register(TestWebSocketMessageBrokerConfiguration.class, TestStompMessageBrokerConfig.class); + this.cxtStompBroker.register(TestStompMessageBrokerConfig.class); this.cxtStompBroker.refresh(); } - @Test - public void handlerMapping() { - - SimpleUrlHandlerMapping hm = (SimpleUrlHandlerMapping) this.cxtSimpleBroker.getBean(HandlerMapping.class); - assertEquals(1, hm.getOrder()); - - Map handlerMap = hm.getHandlerMap(); - assertEquals(1, handlerMap.size()); - assertNotNull(handlerMap.get("/simpleBroker")); - } @Test - public void webSocketRequestChannel() { + public void clientInboundChannel() { - TestChannel channel = this.cxtSimpleBroker.getBean("webSocketRequestChannel", TestChannel.class); + TestChannel channel = this.cxtSimpleBroker.getBean("clientInboundChannel", TestChannel.class); List handlers = channel.handlers; assertEquals(3, handlers.size()); @@ -103,8 +86,8 @@ public class WebSocketMessageBrokerConfigurationSupportTests { } @Test - public void webSocketRequestChannelWithStompBroker() { - TestChannel channel = this.cxtStompBroker.getBean("webSocketRequestChannel", TestChannel.class); + public void clientInboundChannelWithStompBroker() { + TestChannel channel = this.cxtStompBroker.getBean("clientInboundChannel", TestChannel.class); List values = channel.handlers; assertEquals(3, values.size()); @@ -114,34 +97,9 @@ public class WebSocketMessageBrokerConfigurationSupportTests { } @Test - public void webSocketRequestChannelSendMessage() throws Exception { - - TestChannel channel = this.cxtSimpleBroker.getBean("webSocketRequestChannel", TestChannel.class); - SubProtocolWebSocketHandler webSocketHandler = this.cxtSimpleBroker.getBean(SubProtocolWebSocketHandler.class); - - TextMessage textMessage = StompTextMessageBuilder.create(StompCommand.SEND).headers("destination:/foo").build(); - webSocketHandler.handleMessage(new TestWebSocketSession(), textMessage); - - Message message = channel.messages.get(0); - StompHeaderAccessor headers = StompHeaderAccessor.wrap(message); - - assertEquals(SimpMessageType.MESSAGE, headers.getMessageType()); - assertEquals("/foo", headers.getDestination()); - } - - @Test - public void webSocketResponseChannel() { - TestChannel channel = this.cxtSimpleBroker.getBean("webSocketResponseChannel", TestChannel.class); - List values = channel.handlers; - - assertEquals(1, values.size()); - assertTrue(values.get(0) instanceof SubProtocolWebSocketHandler); - } - - @Test - public void webSocketResponseChannelUsedByAnnotatedMethod() { + public void clientOutboundChannelUsedByAnnotatedMethod() { - TestChannel channel = this.cxtSimpleBroker.getBean("webSocketResponseChannel", TestChannel.class); + TestChannel channel = this.cxtSimpleBroker.getBean("clientOutboundChannel", TestChannel.class); SimpAnnotationMethodMessageHandler messageHandler = this.cxtSimpleBroker.getBean(SimpAnnotationMethodMessageHandler.class); StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SUBSCRIBE); @@ -161,8 +119,8 @@ public class WebSocketMessageBrokerConfigurationSupportTests { } @Test - public void webSocketResponseChannelUsedBySimpleBroker() { - TestChannel channel = this.cxtSimpleBroker.getBean("webSocketResponseChannel", TestChannel.class); + public void clientOutboundChannelUsedBySimpleBroker() { + TestChannel channel = this.cxtSimpleBroker.getBean("clientOutboundChannel", TestChannel.class); SimpleBrokerMessageHandler broker = this.cxtSimpleBroker.getBean(SimpleBrokerMessageHandler.class); StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SUBSCRIBE); @@ -252,7 +210,7 @@ public class WebSocketMessageBrokerConfigurationSupportTests { @Test public void messageConverter() { CompositeMessageConverter messageConverter = this.cxtStompBroker.getBean( - "simpMessageConverter", CompositeMessageConverter.class); + "brokerMessageConverter", CompositeMessageConverter.class); DefaultContentTypeResolver resolver = (DefaultContentTypeResolver) messageConverter.getContentTypeResolver(); assertEquals(MimeTypeUtils.APPLICATION_JSON, resolver.getDefaultMimeType()); @@ -275,50 +233,26 @@ public class WebSocketMessageBrokerConfigurationSupportTests { } @Configuration - static class TestSimpleMessageBrokerConfig implements WebSocketMessageBrokerConfigurer { - - @Override - public void registerStompEndpoints(StompEndpointRegistry registry) { - registry.addEndpoint("/simpleBroker"); - } - - @Override - public void configureMessageBroker(MessageBrokerConfigurer configurer) { - // SimpleBroker used by default - } + static class TestMessageBrokerConfiguration extends AbstractMessageBrokerConfiguration { @Bean public TestController subscriptionController() { return new TestController(); } - } - - @Configuration - static class TestStompMessageBrokerConfig implements WebSocketMessageBrokerConfigurer { - - @Override - public void registerStompEndpoints(StompEndpointRegistry registry) { - registry.addEndpoint("/stompBrokerRelay"); - } @Override - public void configureMessageBroker(MessageBrokerConfigurer configurer) { - configurer.enableStompBrokerRelay("/topic", "/queue").setAutoStartup(false); + protected void configureMessageBroker(MessageBrokerRegistry registry) { } - } - - @Configuration - static class TestWebSocketMessageBrokerConfiguration extends DelegatingWebSocketMessageBrokerConfiguration { @Override @Bean - public AbstractSubscribableChannel webSocketRequestChannel() { + public AbstractSubscribableChannel clientInboundChannel() { return new TestChannel(); } @Override @Bean - public AbstractSubscribableChannel webSocketResponseChannel() { + public AbstractSubscribableChannel clientOutboundChannel() { return new TestChannel(); } @@ -328,6 +262,16 @@ public class WebSocketMessageBrokerConfigurationSupportTests { } } + @Configuration + static class TestStompMessageBrokerConfig extends TestMessageBrokerConfiguration { + + @Override + public void configureMessageBroker(MessageBrokerRegistry registry) { + registry.enableStompBrokerRelay("/topic", "/queue").setAutoStartup(false); + } + } + + private static class TestChannel extends ExecutorSubscribableChannel { private final List handlers = new ArrayList<>(); diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/handler/SimpAnnotationMethodMessageHandlerTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/handler/SimpAnnotationMethodMessageHandlerTests.java index 6f9e757ae52..1c872ea1ea6 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/handler/SimpAnnotationMethodMessageHandlerTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/handler/SimpAnnotationMethodMessageHandlerTests.java @@ -146,9 +146,9 @@ public class SimpAnnotationMethodMessageHandlerTests { private static class TestSimpAnnotationMethodMessageHandler extends SimpAnnotationMethodMessageHandler { public TestSimpAnnotationMethodMessageHandler(SimpMessageSendingOperations brokerTemplate, - MessageChannel webSocketResponseChannel) { + MessageChannel clientOutboundChannel) { - super(brokerTemplate, webSocketResponseChannel); + super(brokerTemplate, clientOutboundChannel); } public void registerHandler(Object handler) { diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompHeaderAccessorTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompHeaderAccessorTests.java index e3614e4ec5f..24c77e96a00 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompHeaderAccessorTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompHeaderAccessorTests.java @@ -20,7 +20,6 @@ import java.util.List; import java.util.Map; import org.junit.Test; -import org.springframework.http.MediaType; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.simp.SimpMessageType; @@ -146,7 +145,7 @@ public class StompHeaderAccessorTests { StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.MESSAGE); headers.setSubscriptionId("s1"); headers.setDestination("/d"); - headers.setContentType(MediaType.APPLICATION_JSON); + headers.setContentType(MimeTypeUtils.APPLICATION_JSON); Map> actual = headers.toNativeHeaderMap(); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompProtocolHandler.java b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java similarity index 95% rename from spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompProtocolHandler.java rename to spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java index 4aa333b0b8c..b6df3649ac4 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompProtocolHandler.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.messaging.simp.stomp; +package org.springframework.web.socket.messaging; import java.io.IOException; import java.nio.ByteBuffer; @@ -28,9 +28,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; -import org.springframework.messaging.handler.websocket.SubProtocolHandler; import org.springframework.messaging.simp.SimpMessageType; import org.springframework.messaging.simp.handler.UserSessionRegistry; +import org.springframework.messaging.simp.stomp.*; import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.Assert; import org.springframework.web.socket.CloseStatus; @@ -46,7 +46,7 @@ import org.springframework.web.socket.WebSocketSession; * @author Andy Wilkinson * @since 4.0 */ -public class StompProtocolHandler implements SubProtocolHandler { +public class StompSubProtocolHandler implements SubProtocolHandler { /** * The name of the header set on the CONNECTED frame indicating the name of the user @@ -54,7 +54,7 @@ public class StompProtocolHandler implements SubProtocolHandler { */ public static final String CONNECTED_USER_HEADER = "user-name"; - private static final Log logger = LogFactory.getLog(StompProtocolHandler.class); + private static final Log logger = LogFactory.getLog(StompSubProtocolHandler.class); private final StompDecoder stompDecoder = new StompDecoder(); @@ -174,7 +174,7 @@ public class StompProtocolHandler implements SubProtocolHandler { try { message = MessageBuilder.withPayload(message.getPayload()).setHeaders(headers).build(); - byte[] bytes = this.stompEncoder.encode((Message)message); + byte[] bytes = this.stompEncoder.encode((Message) message); session.sendMessage(new TextMessage(new String(bytes, Charset.forName("UTF-8")))); } catch (Throwable t) { @@ -219,7 +219,6 @@ public class StompProtocolHandler implements SubProtocolHandler { if (principal != null) { headers.setNativeHeader(CONNECTED_USER_HEADER, principal.getName()); if (this.userSessionRegistry != null) { - String suffix = session.getId(); this.userSessionRegistry.registerSessionId(principal.getName(), session.getId()); } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/handler/websocket/SubProtocolHandler.java b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SubProtocolHandler.java similarity index 98% rename from spring-messaging/src/main/java/org/springframework/messaging/handler/websocket/SubProtocolHandler.java rename to spring-websocket/src/main/java/org/springframework/web/socket/messaging/SubProtocolHandler.java index fa780ed94df..8781dc86b4b 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/handler/websocket/SubProtocolHandler.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SubProtocolHandler.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.messaging.handler.websocket; +package org.springframework.web.socket.messaging; import java.util.List; diff --git a/spring-messaging/src/main/java/org/springframework/messaging/handler/websocket/SubProtocolWebSocketHandler.java b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SubProtocolWebSocketHandler.java similarity index 87% rename from spring-messaging/src/main/java/org/springframework/messaging/handler/websocket/SubProtocolWebSocketHandler.java rename to spring-websocket/src/main/java/org/springframework/web/socket/messaging/SubProtocolWebSocketHandler.java index 01d71a9cfb7..1bdca79d5ad 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/handler/websocket/SubProtocolWebSocketHandler.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SubProtocolWebSocketHandler.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.messaging.handler.websocket; +package org.springframework.web.socket.messaging; import java.util.Arrays; import java.util.HashSet; @@ -40,10 +40,15 @@ import org.springframework.web.socket.WebSocketSession; /** - * A {@link WebSocketHandler} that delegates messages to a {@link SubProtocolHandler} - * based on the sub-protocol value requested by the client through the - * {@code Sec-WebSocket-Protocol} request header A default handler can also be configured - * to use if the client does not request a specific sub-protocol. + * An implementation of {@link WebSocketHandler} that delegates incoming WebSocket + * messages to a {@link SubProtocolHandler} along with a {@link MessageChannel} to + * which the sub-protocol handler can send messages from WebSocket clients to + * the application. + *

+ * Also an implementation of {@link MessageHandler} that finds the WebSocket + * session associated with the {@link Message} and passes it, along with the message, + * to the sub-protocol handler to send messages from the application back to the + * client. * * @author Rossen Stoyanchev * @author Andy Wilkinson @@ -54,7 +59,7 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, MessageHan private final Log logger = LogFactory.getLog(SubProtocolWebSocketHandler.class); - private final MessageChannel outputChannel; + private final MessageChannel clientOutboundChannel; private final Map protocolHandlers = new TreeMap(String.CASE_INSENSITIVE_ORDER); @@ -64,12 +69,9 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, MessageHan private final Map sessions = new ConcurrentHashMap(); - /** - * @param outputChannel - */ - public SubProtocolWebSocketHandler(MessageChannel outputChannel) { - Assert.notNull(outputChannel, "outputChannel is required"); - this.outputChannel = outputChannel; + public SubProtocolWebSocketHandler(MessageChannel clientOutboundChannel) { + Assert.notNull(clientOutboundChannel, "clientOutboundChannel is required"); + this.clientOutboundChannel = clientOutboundChannel; } @@ -141,7 +143,7 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, MessageHan @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { this.sessions.put(session.getId(), session); - findProtocolHandler(session).afterSessionStarted(session, this.outputChannel); + findProtocolHandler(session).afterSessionStarted(session, this.clientOutboundChannel); } protected final SubProtocolHandler findProtocolHandler(WebSocketSession session) { @@ -172,7 +174,7 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, MessageHan @Override public void handleMessage(WebSocketSession session, WebSocketMessage message) throws Exception { - findProtocolHandler(session).handleMessageFromClient(session, message, this.outputChannel); + findProtocolHandler(session).handleMessageFromClient(session, message, this.clientOutboundChannel); } @Override @@ -221,7 +223,7 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, MessageHan @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception { this.sessions.remove(session.getId()); - findProtocolHandler(session).afterSessionEnded(session, closeStatus, this.outputChannel); + findProtocolHandler(session).afterSessionEnded(session, closeStatus, this.clientOutboundChannel); } @Override diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/DelegatingWebSocketMessageBrokerConfiguration.java b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/config/DelegatingWebSocketMessageBrokerConfiguration.java similarity index 76% rename from spring-messaging/src/main/java/org/springframework/messaging/simp/config/DelegatingWebSocketMessageBrokerConfiguration.java rename to spring-websocket/src/main/java/org/springframework/web/socket/messaging/config/DelegatingWebSocketMessageBrokerConfiguration.java index bb1d572e8a5..3cb0a5e1493 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/DelegatingWebSocketMessageBrokerConfiguration.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/config/DelegatingWebSocketMessageBrokerConfiguration.java @@ -14,21 +14,22 @@ * limitations under the License. */ -package org.springframework.messaging.simp.config; +package org.springframework.web.socket.messaging.config; import java.util.ArrayList; import java.util.List; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; +import org.springframework.messaging.simp.config.MessageBrokerRegistry; import org.springframework.util.CollectionUtils; /** - * A {@link WebSocketMessageBrokerConfiguration} extension that detects beans of type - * {@link WebSocketMessageBrokerConfigurer} and delegates to all of them allowing callback - * style customization of the configuration provided in - * {@link WebSocketMessageBrokerConfigurationSupport}. + * A {@link WebSocketMessageBrokerConfigurationSupport} extension that detects beans of type + * {@link WebSocketMessageBrokerConfigurer} + * and delegates to all of them allowing callback style customization of the + * configuration provided in {@link WebSocketMessageBrokerConfigurationSupport}. * *

This class is typically imported via {@link EnableWebSocketMessageBroker}. * @@ -49,6 +50,7 @@ public class DelegatingWebSocketMessageBrokerConfiguration extends WebSocketMess this.configurers.addAll(configurers); } + @Override protected void registerStompEndpoints(StompEndpointRegistry registry) { for (WebSocketMessageBrokerConfigurer c : this.configurers) { @@ -57,9 +59,9 @@ public class DelegatingWebSocketMessageBrokerConfiguration extends WebSocketMess } @Override - protected void configureMessageBroker(MessageBrokerConfigurer configurer) { + protected void configureMessageBroker(MessageBrokerRegistry registry) { for (WebSocketMessageBrokerConfigurer c : this.configurers) { - c.configureMessageBroker(configurer); + c.configureMessageBroker(registry); } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/EnableWebSocketMessageBroker.java b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/config/EnableWebSocketMessageBroker.java similarity index 88% rename from spring-messaging/src/main/java/org/springframework/messaging/simp/config/EnableWebSocketMessageBroker.java rename to spring-websocket/src/main/java/org/springframework/web/socket/messaging/config/EnableWebSocketMessageBroker.java index 5251c4cf985..6795c7b2179 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/EnableWebSocketMessageBroker.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/config/EnableWebSocketMessageBroker.java @@ -10,7 +10,7 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ -package org.springframework.messaging.simp.config; +package org.springframework.web.socket.messaging.config; import java.lang.annotation.Documented; import java.lang.annotation.ElementType; @@ -47,9 +47,9 @@ import org.springframework.context.annotation.Import; * } * * @Bean - * public void configureMessageBroker(MessageBrokerConfigurer configurer) { - * configurer.enableStompBrokerRelay("/queue/", "/topic/"); - * configurer.setApplicationDestinationPrefixes("/app/"); + * public void configureMessageBroker(MessageBrokerRegistry registry) { + * registry.enableStompBrokerRelay("/queue/", "/topic/"); + * registry.setApplicationDestinationPrefixes("/app/"); * } * } * @@ -62,4 +62,5 @@ import org.springframework.context.annotation.Import; @Documented @Import(DelegatingWebSocketMessageBrokerConfiguration.class) public @interface EnableWebSocketMessageBroker { + } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/StompEndpointRegistry.java b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/config/StompEndpointRegistry.java similarity index 73% rename from spring-messaging/src/main/java/org/springframework/messaging/simp/config/StompEndpointRegistry.java rename to spring-websocket/src/main/java/org/springframework/web/socket/messaging/config/StompEndpointRegistry.java index 2621474cb59..8889e5ee72b 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/StompEndpointRegistry.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/config/StompEndpointRegistry.java @@ -14,20 +14,19 @@ * limitations under the License. */ -package org.springframework.messaging.simp.config; +package org.springframework.web.socket.messaging.config; /** - * Provides methods for configuring STOMP protocol handlers at specific URL paths. + * A contract for registering STOMP over WebSocket endpoints. * * @author Rossen Stoyanchev * @since 4.0 */ public interface StompEndpointRegistry { - /** - * Expose a STOMP endpoint at the specified URL path (or paths_. + * Register a STOMP over WebSocket endpoint at the given mapping path. */ - StompEndpointRegistration addEndpoint(String... paths); + StompWebSocketEndpointRegistration addEndpoint(String... paths); } \ No newline at end of file diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/StompEndpointRegistration.java b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/config/StompWebSocketEndpointRegistration.java similarity index 76% rename from spring-messaging/src/main/java/org/springframework/messaging/simp/config/StompEndpointRegistration.java rename to spring-websocket/src/main/java/org/springframework/web/socket/messaging/config/StompWebSocketEndpointRegistration.java index 28801aa1320..c501f63d570 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/StompEndpointRegistration.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/config/StompWebSocketEndpointRegistration.java @@ -14,28 +14,27 @@ * limitations under the License. */ -package org.springframework.messaging.simp.config; +package org.springframework.web.socket.messaging.config; import org.springframework.web.socket.server.HandshakeHandler; import org.springframework.web.socket.server.config.SockJsServiceRegistration; /** - * Provides methods for configuring a STOMP protocol handler including enabling SockJS - * fallback options. + * A contract for configuring a STOMP over WebSocket endpoint. * * @author Rossen Stoyanchev * @since 4.0 */ -public interface StompEndpointRegistration { +public interface StompWebSocketEndpointRegistration { /** - * Configure the HandshakeHandler to use. + * Enable SockJS fallback options. */ - StompEndpointRegistration setHandshakeHandler(HandshakeHandler handshakeHandler); + SockJsServiceRegistration withSockJS(); /** - * Enable SockJS fallback options. + * Configure the HandshakeHandler to use. */ - SockJsServiceRegistration withSockJS(); + StompWebSocketEndpointRegistration setHandshakeHandler(HandshakeHandler handshakeHandler); } \ No newline at end of file diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/ServletStompEndpointRegistry.java b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/config/WebMvcStompEndpointRegistry.java similarity index 60% rename from spring-messaging/src/main/java/org/springframework/messaging/simp/config/ServletStompEndpointRegistry.java rename to spring-websocket/src/main/java/org/springframework/web/socket/messaging/config/WebMvcStompEndpointRegistry.java index 0dec614f1d7..150d6696b61 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/ServletStompEndpointRegistry.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/config/WebMvcStompEndpointRegistry.java @@ -14,16 +14,12 @@ * limitations under the License. */ -package org.springframework.messaging.simp.config; +package org.springframework.web.socket.messaging.config; -import java.util.ArrayList; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; +import java.util.*; -import org.springframework.messaging.handler.websocket.SubProtocolWebSocketHandler; import org.springframework.messaging.simp.handler.UserSessionRegistry; -import org.springframework.messaging.simp.stomp.StompProtocolHandler; +import org.springframework.web.socket.messaging.StompSubProtocolHandler; import org.springframework.scheduling.TaskScheduler; import org.springframework.util.Assert; import org.springframework.util.MultiValueMap; @@ -31,42 +27,47 @@ import org.springframework.web.HttpRequestHandler; import org.springframework.web.servlet.handler.AbstractHandlerMapping; import org.springframework.web.servlet.handler.SimpleUrlHandlerMapping; import org.springframework.web.socket.WebSocketHandler; +import org.springframework.web.socket.messaging.SubProtocolWebSocketHandler; import org.springframework.web.socket.support.WebSocketHandlerDecorator; /** - * A helper class for configuring STOMP protocol handling over WebSocket. + * A registry for STOMP over WebSocket endpoints that maps the endpoints with a + * {@link SimpleUrlHandlerMapping} for use in Spring MVC. * * @author Rossen Stoyanchev * @since 4.0 */ -public class ServletStompEndpointRegistry implements StompEndpointRegistry { +public class WebMvcStompEndpointRegistry implements StompEndpointRegistry { private final WebSocketHandler webSocketHandler; private final SubProtocolWebSocketHandler subProtocolWebSocketHandler; - private final StompProtocolHandler stompHandler; + private final StompSubProtocolHandler stompHandler; - private final List registrations = new ArrayList(); + private final List registrations = + new ArrayList(); private final TaskScheduler sockJsScheduler; + private int order = 1; - public ServletStompEndpointRegistry(WebSocketHandler webSocketHandler, + + public WebMvcStompEndpointRegistry(WebSocketHandler webSocketHandler, UserSessionRegistry userSessionRegistry, TaskScheduler defaultSockJsTaskScheduler) { Assert.notNull(webSocketHandler); Assert.notNull(userSessionRegistry); this.webSocketHandler = webSocketHandler; - this.subProtocolWebSocketHandler = findSubProtocolWebSocketHandler(webSocketHandler); - this.stompHandler = new StompProtocolHandler(); + this.subProtocolWebSocketHandler = unwrapSubProtocolWebSocketHandler(webSocketHandler); + this.stompHandler = new StompSubProtocolHandler(); this.stompHandler.setUserSessionRegistry(userSessionRegistry); this.sockJsScheduler = defaultSockJsTaskScheduler; } - private static SubProtocolWebSocketHandler findSubProtocolWebSocketHandler(WebSocketHandler webSocketHandler) { + private static SubProtocolWebSocketHandler unwrapSubProtocolWebSocketHandler(WebSocketHandler webSocketHandler) { WebSocketHandler actual = (webSocketHandler instanceof WebSocketHandlerDecorator) ? ((WebSocketHandlerDecorator) webSocketHandler).getLastHandler() : webSocketHandler; @@ -79,12 +80,30 @@ public class ServletStompEndpointRegistry implements StompEndpointRegistry { @Override - public StompEndpointRegistration addEndpoint(String... paths) { + public StompWebSocketEndpointRegistration addEndpoint(String... paths) { + this.subProtocolWebSocketHandler.addProtocolHandler(this.stompHandler); - ServletStompEndpointRegistration r = new ServletStompEndpointRegistration( - paths, this.webSocketHandler, this.sockJsScheduler); - this.registrations.add(r); - return r; + Set subProtocols = this.subProtocolWebSocketHandler.getSupportedProtocols(); + + WebMvcStompWebSocketEndpointRegistration registration = new WebMvcStompWebSocketEndpointRegistration( + paths, this.webSocketHandler, subProtocols, this.sockJsScheduler); + this.registrations.add(registration); + + return registration; + } + + /** + * Set the order for the resulting {@link SimpleUrlHandlerMapping} relative to + * other handler mappings configured in Spring MVC. + *

+ * The default value is 1. + */ + public void setOrder(int order) { + this.order = order; + } + + public int getOrder() { + return this.order; } /** @@ -92,7 +111,7 @@ public class ServletStompEndpointRegistry implements StompEndpointRegistry { */ protected AbstractHandlerMapping getHandlerMapping() { Map urlMap = new LinkedHashMap(); - for (ServletStompEndpointRegistration registration : this.registrations) { + for (WebMvcStompWebSocketEndpointRegistration registration : this.registrations) { MultiValueMap mappings = registration.getMappings(); for (HttpRequestHandler httpHandler : mappings.keySet()) { for (String pattern : mappings.get(httpHandler)) { @@ -102,6 +121,7 @@ public class ServletStompEndpointRegistry implements StompEndpointRegistry { } SimpleUrlHandlerMapping hm = new SimpleUrlHandlerMapping(); hm.setUrlMap(urlMap); + hm.setOrder(this.order); return hm; } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/messaging/config/WebMvcStompWebSocketEndpointRegistration.java b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/config/WebMvcStompWebSocketEndpointRegistration.java new file mode 100644 index 00000000000..8b5cf694968 --- /dev/null +++ b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/config/WebMvcStompWebSocketEndpointRegistration.java @@ -0,0 +1,137 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.socket.messaging.config; + +import java.util.Set; + +import org.springframework.scheduling.TaskScheduler; +import org.springframework.util.Assert; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; +import org.springframework.util.ObjectUtils; +import org.springframework.web.HttpRequestHandler; +import org.springframework.web.socket.WebSocketHandler; +import org.springframework.web.socket.server.DefaultHandshakeHandler; +import org.springframework.web.socket.server.HandshakeHandler; +import org.springframework.web.socket.server.config.SockJsServiceRegistration; +import org.springframework.web.socket.server.support.WebSocketHttpRequestHandler; +import org.springframework.web.socket.sockjs.SockJsHttpRequestHandler; +import org.springframework.web.socket.sockjs.SockJsService; +import org.springframework.web.socket.sockjs.transport.handler.WebSocketTransportHandler; + + +/** + * An abstract base class class for configuring STOMP over WebSocket/SockJS endpoints. + * + * @author Rossen Stoyanchev + * @since 4.0 + */ +public class WebMvcStompWebSocketEndpointRegistration implements StompWebSocketEndpointRegistration { + + private final String[] paths; + + private final WebSocketHandler webSocketHandler; + + private final String[] subProtocols; + + private final TaskScheduler sockJsTaskScheduler; + + private HandshakeHandler handshakeHandler; + + private StompSockJsServiceRegistration registration; + + + public WebMvcStompWebSocketEndpointRegistration(String[] paths, WebSocketHandler webSocketHandler, + Set subProtocols, TaskScheduler sockJsTaskScheduler) { + + Assert.notEmpty(paths, "No paths specified"); + Assert.notNull(webSocketHandler, "'webSocketHandler' is required"); + Assert.notNull(subProtocols, "'subProtocols' is required"); + + this.paths = paths; + this.webSocketHandler = webSocketHandler; + this.subProtocols = subProtocols.toArray(new String[subProtocols.size()]); + this.sockJsTaskScheduler = sockJsTaskScheduler; + + this.handshakeHandler = new DefaultHandshakeHandler(); + updateHandshakeHandler(); + } + + private void updateHandshakeHandler() { + if (handshakeHandler instanceof DefaultHandshakeHandler) { + DefaultHandshakeHandler defaultHandshakeHandler = (DefaultHandshakeHandler) handshakeHandler; + if (ObjectUtils.isEmpty(defaultHandshakeHandler.getSupportedProtocols())) { + defaultHandshakeHandler.setSupportedProtocols(this.subProtocols); + } + } + } + + /** + * Provide a custom or pre-configured {@link HandshakeHandler}. + */ + @Override + public StompWebSocketEndpointRegistration setHandshakeHandler(HandshakeHandler handshakeHandler) { + Assert.notNull(handshakeHandler, "'handshakeHandler' must not be null"); + this.handshakeHandler = handshakeHandler; + updateHandshakeHandler(); + return this; + } + + /** + * Enable SockJS fallback options. + */ + @Override + public SockJsServiceRegistration withSockJS() { + this.registration = new StompSockJsServiceRegistration(this.sockJsTaskScheduler); + WebSocketTransportHandler transportHandler = new WebSocketTransportHandler(this.handshakeHandler); + this.registration.setTransportHandlerOverrides(transportHandler); + return this.registration; + } + + protected final MultiValueMap getMappings() { + MultiValueMap mappings = new LinkedMultiValueMap(); + if (this.registration != null) { + SockJsService sockJsService = this.registration.getSockJsService(); + for (String path : this.paths) { + String pattern = path.endsWith("/") ? path + "**" : path + "/**"; + SockJsHttpRequestHandler handler = new SockJsHttpRequestHandler(sockJsService, this.webSocketHandler); + mappings.add(handler, pattern); + } + } + else { + for (String path : this.paths) { + WebSocketHttpRequestHandler handler = + new WebSocketHttpRequestHandler(this.webSocketHandler, this.handshakeHandler); + mappings.add(handler, path); + } + } + return mappings; + } + + + private static class StompSockJsServiceRegistration extends SockJsServiceRegistration { + + public StompSockJsServiceRegistration(TaskScheduler defaultTaskScheduler) { + super(defaultTaskScheduler); + } + + protected SockJsService getSockJsService() { + return super.getSockJsService(); + } + } + +} diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/messaging/config/WebSocketMessageBrokerConfigurationSupport.java b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/config/WebSocketMessageBrokerConfigurationSupport.java new file mode 100644 index 00000000000..0461099bcdc --- /dev/null +++ b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/config/WebSocketMessageBrokerConfigurationSupport.java @@ -0,0 +1,88 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.socket.messaging.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.messaging.simp.config.AbstractMessageBrokerConfiguration; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; +import org.springframework.web.servlet.HandlerMapping; +import org.springframework.web.socket.WebSocketHandler; +import org.springframework.web.socket.server.config.SockJsServiceRegistration; +import org.springframework.web.socket.messaging.SubProtocolWebSocketHandler; + + +/** + * Extends {@link AbstractMessageBrokerConfiguration} and adds configuration for + * receiving and responding to STOMP messages from WebSocket clients. + *

+ * Typically used in conjunction with + * {@link EnableWebSocketMessageBroker @EnableWebSocketMessageBroker} but can + * also be extended directly. + * + * @author Rossen Stoyanchev + * @since 4.0 + */ +public abstract class WebSocketMessageBrokerConfigurationSupport extends AbstractMessageBrokerConfiguration { + + + protected WebSocketMessageBrokerConfigurationSupport() { + } + + @Bean + public HandlerMapping stompWebSocketHandlerMapping() { + + WebMvcStompEndpointRegistry registry = new WebMvcStompEndpointRegistry( + subProtocolWebSocketHandler(), userSessionRegistry(), messageBrokerSockJsTaskScheduler()); + + registerStompEndpoints(registry); + return registry.getHandlerMapping(); + } + + @Bean + public WebSocketHandler subProtocolWebSocketHandler() { + SubProtocolWebSocketHandler handler = new SubProtocolWebSocketHandler(clientInboundChannel()); + clientOutboundChannel().subscribe(handler); + return handler; + } + + /** + * The default TaskScheduler to use if none is configured via + * {@link SockJsServiceRegistration#setTaskScheduler(org.springframework.scheduling.TaskScheduler)}, i.e. + *

+	 * @Configuration
+	 * @EnableWebSocketMessageBroker
+	 * public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
+	 *
+	 *   public void registerStompEndpoints(StompEndpointRegistry registry) {
+	 *     registry.addEndpoint("/stomp").withSockJS().setTaskScheduler(myScheduler());
+	 *   }
+	 *
+	 *   // ...
+	 *
+	 * }
+	 * 
+ */ + @Bean + public ThreadPoolTaskScheduler messageBrokerSockJsTaskScheduler() { + ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); + scheduler.setThreadNamePrefix("MessageBrokerSockJS-"); + return scheduler; + } + + protected abstract void registerStompEndpoints(StompEndpointRegistry registry); + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/WebSocketMessageBrokerConfigurer.java b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/config/WebSocketMessageBrokerConfigurer.java similarity index 68% rename from spring-messaging/src/main/java/org/springframework/messaging/simp/config/WebSocketMessageBrokerConfigurer.java rename to spring-websocket/src/main/java/org/springframework/web/socket/messaging/config/WebSocketMessageBrokerConfigurer.java index 48ea9b51ffc..5a3e00b75c9 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/WebSocketMessageBrokerConfigurer.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/config/WebSocketMessageBrokerConfigurer.java @@ -14,12 +14,15 @@ * limitations under the License. */ -package org.springframework.messaging.simp.config; +package org.springframework.web.socket.messaging.config; +import org.springframework.messaging.simp.config.MessageBrokerRegistry; /** - * Defines callback methods to configure broker-backed messaging over WebSocket via + * Defines methods for configuring message handling with simple messaging + * protocols (e.g. STOMP) from WebSocket clients. Typically used to customize + * the configuration provided via * {@link EnableWebSocketMessageBroker @EnableWebSocketMessageBroker}. * * @author Rossen Stoyanchev @@ -28,13 +31,13 @@ package org.springframework.messaging.simp.config; public interface WebSocketMessageBrokerConfigurer { /** - * Configure STOMP protocol handling over WebSocket at a specific URL. + * Configure STOMP over WebSocket endpoints. */ void registerStompEndpoints(StompEndpointRegistry registry); /** * Configure message broker options. */ - void configureMessageBroker(MessageBrokerConfigurer configurer); + void configureMessageBroker(MessageBrokerRegistry registry); } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/server/support/WebSocketHttpRequestHandler.java b/spring-websocket/src/main/java/org/springframework/web/socket/server/support/WebSocketHttpRequestHandler.java index 98acb3b6ddb..0fac5055f01 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/server/support/WebSocketHttpRequestHandler.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/server/support/WebSocketHttpRequestHandler.java @@ -84,6 +84,13 @@ public class WebSocketHttpRequestHandler implements HttpRequestHandler { return this.wsHandler; } + /** + * Return the HandshakeHandler. + */ + public HandshakeHandler getHandshakeHandler() { + return this.handshakeHandler; + } + /** * Configure one or more WebSocket handshake request interceptors. */ diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/handler/SimpAnnotationMethodIntegrationTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/messaging/SimpAnnotationMethodIntegrationTests.java similarity index 92% rename from spring-messaging/src/test/java/org/springframework/messaging/simp/handler/SimpAnnotationMethodIntegrationTests.java rename to spring-websocket/src/test/java/org/springframework/web/socket/messaging/SimpAnnotationMethodIntegrationTests.java index 3d83450ebfa..377a4d1e1ad 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/handler/SimpAnnotationMethodIntegrationTests.java +++ b/spring-websocket/src/test/java/org/springframework/web/socket/messaging/SimpAnnotationMethodIntegrationTests.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.messaging.simp.handler; +package org.springframework.web.socket.messaging; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; @@ -36,10 +36,7 @@ import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.handler.annotation.MessageExceptionHandler; import org.springframework.messaging.handler.annotation.MessageMapping; -import org.springframework.messaging.simp.config.DelegatingWebSocketMessageBrokerConfiguration; -import org.springframework.messaging.simp.config.MessageBrokerConfigurer; -import org.springframework.messaging.simp.config.StompEndpointRegistry; -import org.springframework.messaging.simp.config.WebSocketMessageBrokerConfigurer; +import org.springframework.messaging.simp.config.MessageBrokerRegistry; import org.springframework.messaging.simp.stomp.StompCommand; import org.springframework.messaging.support.channel.AbstractSubscribableChannel; import org.springframework.messaging.support.channel.ExecutorSubscribableChannel; @@ -52,10 +49,13 @@ import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.adapter.TextWebSocketHandlerAdapter; import org.springframework.web.socket.client.endpoint.StandardWebSocketClient; import org.springframework.web.socket.client.jetty.JettyWebSocketClient; +import org.springframework.web.socket.messaging.config.DelegatingWebSocketMessageBrokerConfiguration; +import org.springframework.web.socket.messaging.config.StompEndpointRegistry; +import org.springframework.web.socket.messaging.config.WebSocketMessageBrokerConfigurer; import org.springframework.web.socket.server.HandshakeHandler; import static org.junit.Assert.*; -import static org.springframework.messaging.simp.stomp.StompTextMessageBuilder.*; +import static org.springframework.web.socket.messaging.StompTextMessageBuilder.*; /** @@ -216,7 +216,7 @@ public class SimpAnnotationMethodIntegrationTests extends AbstractWebSocketInteg } @Override - public void configureMessageBroker(MessageBrokerConfigurer configurer) { + public void configureMessageBroker(MessageBrokerRegistry configurer) { configurer.setApplicationDestinationPrefixes("/app"); configurer.enableSimpleBroker("/topic", "/queue"); } @@ -227,13 +227,13 @@ public class SimpAnnotationMethodIntegrationTests extends AbstractWebSocketInteg @Override @Bean - public AbstractSubscribableChannel webSocketRequestChannel() { + public AbstractSubscribableChannel clientInboundChannel() { return new ExecutorSubscribableChannel(); // synchronous } @Override @Bean - public AbstractSubscribableChannel webSocketResponseChannel() { + public AbstractSubscribableChannel clientOutboundChannel() { return new ExecutorSubscribableChannel(); // synchronous } } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompProtocolHandlerTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/messaging/StompSubProtocolHandlerTests.java similarity index 90% rename from spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompProtocolHandlerTests.java rename to spring-websocket/src/test/java/org/springframework/web/socket/messaging/StompSubProtocolHandlerTests.java index bcc5ad7e05b..8dcc2de53d3 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompProtocolHandlerTests.java +++ b/spring-websocket/src/test/java/org/springframework/web/socket/messaging/StompSubProtocolHandlerTests.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.messaging.simp.stomp; +package org.springframework.web.socket.messaging; import java.nio.ByteBuffer; import java.util.Arrays; @@ -29,6 +29,9 @@ import org.springframework.messaging.MessageChannel; import org.springframework.messaging.simp.SimpMessageHeaderAccessor; import org.springframework.messaging.simp.SimpMessageType; import org.springframework.messaging.simp.TestPrincipal; +import org.springframework.messaging.simp.stomp.StompCommand; +import org.springframework.messaging.simp.stomp.StompDecoder; +import org.springframework.messaging.simp.stomp.StompHeaderAccessor; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.support.TestWebSocketSession; @@ -37,13 +40,13 @@ import static org.junit.Assert.*; import static org.mockito.Mockito.*; /** - * Test fixture for {@link StompProtocolHandler} tests. + * Test fixture for {@link StompSubProtocolHandler} tests. * * @author Rossen Stoyanchev */ -public class StompProtocolHandlerTests { +public class StompSubProtocolHandlerTests { - private StompProtocolHandler stompHandler; + private StompSubProtocolHandler stompHandler; private TestWebSocketSession session; @@ -54,7 +57,7 @@ public class StompProtocolHandlerTests { @Before public void setup() { - this.stompHandler = new StompProtocolHandler(); + this.stompHandler = new StompSubProtocolHandler(); this.channel = Mockito.mock(MessageChannel.class); this.messageCaptor = ArgumentCaptor.forClass(Message.class); diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompTextMessageBuilder.java b/spring-websocket/src/test/java/org/springframework/web/socket/messaging/StompTextMessageBuilder.java similarity index 94% rename from spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompTextMessageBuilder.java rename to spring-websocket/src/test/java/org/springframework/web/socket/messaging/StompTextMessageBuilder.java index e8c2df57f30..7a2f2e60cca 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompTextMessageBuilder.java +++ b/spring-websocket/src/test/java/org/springframework/web/socket/messaging/StompTextMessageBuilder.java @@ -14,12 +14,13 @@ * limitations under the License. */ -package org.springframework.messaging.simp.stomp; +package org.springframework.web.socket.messaging; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import org.springframework.messaging.simp.stomp.StompCommand; import org.springframework.web.socket.TextMessage; diff --git a/spring-messaging/src/test/java/org/springframework/messaging/handler/websocket/SubProtocolWebSocketHandlerTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/messaging/SubProtocolWebSocketHandlerTests.java similarity index 98% rename from spring-messaging/src/test/java/org/springframework/messaging/handler/websocket/SubProtocolWebSocketHandlerTests.java rename to spring-websocket/src/test/java/org/springframework/web/socket/messaging/SubProtocolWebSocketHandlerTests.java index ce7b66a9dc0..b78b4f83ce0 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/handler/websocket/SubProtocolWebSocketHandlerTests.java +++ b/spring-websocket/src/test/java/org/springframework/web/socket/messaging/SubProtocolWebSocketHandlerTests.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.messaging.handler.websocket; +package org.springframework.web.socket.messaging; import java.util.Arrays; diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/messaging/config/WebMvcStompEndpointRegistrationTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/messaging/config/WebMvcStompEndpointRegistrationTests.java new file mode 100644 index 00000000000..0e1283e7fb6 --- /dev/null +++ b/spring-websocket/src/test/java/org/springframework/web/socket/messaging/config/WebMvcStompEndpointRegistrationTests.java @@ -0,0 +1,124 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.socket.messaging.config; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.springframework.messaging.support.channel.ExecutorSubscribableChannel; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.util.MultiValueMap; +import org.springframework.web.HttpRequestHandler; +import org.springframework.web.socket.messaging.SubProtocolWebSocketHandler; +import org.springframework.web.socket.server.DefaultHandshakeHandler; +import org.springframework.web.socket.server.support.WebSocketHttpRequestHandler; +import org.springframework.web.socket.sockjs.SockJsHttpRequestHandler; +import org.springframework.web.socket.sockjs.transport.TransportType; +import org.springframework.web.socket.sockjs.transport.handler.DefaultSockJsService; +import org.springframework.web.socket.sockjs.transport.handler.WebSocketTransportHandler; + +import static org.junit.Assert.*; + + +/** + * Test fixture for {@link WebMvcStompWebSocketEndpointRegistration}. + * + * @author Rossen Stoyanchev + */ +public class WebMvcStompEndpointRegistrationTests { + + private SubProtocolWebSocketHandler wsHandler; + + private TaskScheduler scheduler; + + + @Before + public void setup() { + this.wsHandler = new SubProtocolWebSocketHandler(new ExecutorSubscribableChannel()); + this.scheduler = Mockito.mock(TaskScheduler.class); + } + + @Test + public void minimalRegistration() { + + + WebMvcStompWebSocketEndpointRegistration registration = new WebMvcStompWebSocketEndpointRegistration( + new String[] {"/foo"}, this.wsHandler, Collections.emptySet(), this.scheduler); + + MultiValueMap mappings = registration.getMappings(); + assertEquals(1, mappings.size()); + + Map.Entry> entry = mappings.entrySet().iterator().next(); + assertNotNull(((WebSocketHttpRequestHandler) entry.getKey()).getWebSocketHandler()); + assertEquals(Arrays.asList("/foo"), entry.getValue()); + } + + @Test + public void customHandshakeHandler() { + + DefaultHandshakeHandler handshakeHandler = new DefaultHandshakeHandler(); + + WebMvcStompWebSocketEndpointRegistration registration = new WebMvcStompWebSocketEndpointRegistration( + new String[] {"/foo"}, this.wsHandler, Collections.emptySet(), this.scheduler); + + registration.setHandshakeHandler(handshakeHandler); + + MultiValueMap mappings = registration.getMappings(); + assertEquals(1, mappings.size()); + + Map.Entry> entry = mappings.entrySet().iterator().next(); + assertEquals(Arrays.asList("/foo"), entry.getValue()); + + WebSocketHttpRequestHandler requestHandler = (WebSocketHttpRequestHandler) entry.getKey(); + assertNotNull(requestHandler.getWebSocketHandler()); + assertSame(handshakeHandler, requestHandler.getHandshakeHandler()); + } + + @Test + public void customHandshakeHandlerPassedToSockJsService() { + + DefaultHandshakeHandler handshakeHandler = new DefaultHandshakeHandler(); + + WebMvcStompWebSocketEndpointRegistration registration = new WebMvcStompWebSocketEndpointRegistration( + new String[] {"/foo"}, this.wsHandler, Collections.emptySet(), this.scheduler); + + registration.setHandshakeHandler(handshakeHandler); + registration.withSockJS(); + + MultiValueMap mappings = registration.getMappings(); + assertEquals(1, mappings.size()); + + Map.Entry> entry = mappings.entrySet().iterator().next(); + assertEquals(Arrays.asList("/foo/**"), entry.getValue()); + + SockJsHttpRequestHandler requestHandler = (SockJsHttpRequestHandler) entry.getKey(); + assertNotNull(requestHandler.getWebSocketHandler()); + + DefaultSockJsService sockJsService = (DefaultSockJsService) requestHandler.getSockJsService(); + assertNotNull(sockJsService); + + WebSocketTransportHandler transportHandler = + (WebSocketTransportHandler) sockJsService.getTransportHandlers().get(TransportType.WEBSOCKET); + assertSame(handshakeHandler, transportHandler.getHandshakeHandler()); + } + +} diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/config/ServletStompEndpointRegistryTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/messaging/config/WebMvcStompEndpointRegistryTests.java similarity index 78% rename from spring-messaging/src/test/java/org/springframework/messaging/simp/config/ServletStompEndpointRegistryTests.java rename to spring-websocket/src/test/java/org/springframework/web/socket/messaging/config/WebMvcStompEndpointRegistryTests.java index 480e9f34c70..73f0a2dc7e7 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/config/ServletStompEndpointRegistryTests.java +++ b/spring-websocket/src/test/java/org/springframework/web/socket/messaging/config/WebMvcStompEndpointRegistryTests.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.messaging.simp.config; +package org.springframework.web.socket.messaging.config; import java.util.Map; @@ -22,25 +22,26 @@ import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; import org.springframework.messaging.MessageChannel; -import org.springframework.messaging.handler.websocket.SubProtocolHandler; -import org.springframework.messaging.handler.websocket.SubProtocolWebSocketHandler; import org.springframework.messaging.simp.handler.DefaultUserSessionRegistry; import org.springframework.messaging.simp.handler.UserSessionRegistry; -import org.springframework.messaging.simp.stomp.StompProtocolHandler; import org.springframework.scheduling.TaskScheduler; import org.springframework.web.servlet.handler.SimpleUrlHandlerMapping; +import org.springframework.web.socket.messaging.StompSubProtocolHandler; +import org.springframework.web.socket.messaging.SubProtocolHandler; +import org.springframework.web.socket.messaging.SubProtocolWebSocketHandler; import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; /** - * Test fixture for {@link ServletStompEndpointRegistry}. + * Test fixture for {@link WebMvcStompEndpointRegistry}. * * @author Rossen Stoyanchev */ -public class ServletStompEndpointRegistryTests { +public class WebMvcStompEndpointRegistryTests { - private ServletStompEndpointRegistry registry; + private WebMvcStompEndpointRegistry registry; private SubProtocolWebSocketHandler webSocketHandler; @@ -53,7 +54,7 @@ public class ServletStompEndpointRegistryTests { this.webSocketHandler = new SubProtocolWebSocketHandler(channel); this.userSessionRegistry = new DefaultUserSessionRegistry(); TaskScheduler taskScheduler = Mockito.mock(TaskScheduler.class); - this.registry = new ServletStompEndpointRegistry(webSocketHandler, userSessionRegistry, taskScheduler); + this.registry = new WebMvcStompEndpointRegistry(webSocketHandler, userSessionRegistry, taskScheduler); } @@ -68,7 +69,7 @@ public class ServletStompEndpointRegistryTests { assertNotNull(protocolHandlers.get("v11.stomp")); assertNotNull(protocolHandlers.get("v12.stomp")); - StompProtocolHandler stompHandler = (StompProtocolHandler) protocolHandlers.get("v10.stomp"); + StompSubProtocolHandler stompHandler = (StompSubProtocolHandler) protocolHandlers.get("v10.stomp"); assertSame(this.userSessionRegistry, stompHandler.getUserSessionRegistry()); } diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/messaging/config/WebSocketMessageBrokerConfigurationSupportTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/messaging/config/WebSocketMessageBrokerConfigurationSupportTests.java new file mode 100644 index 00000000000..352f5b002d8 --- /dev/null +++ b/spring-websocket/src/test/java/org/springframework/web/socket/messaging/config/WebSocketMessageBrokerConfigurationSupportTests.java @@ -0,0 +1,179 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.socket.messaging.config; + +import org.junit.Before; +import org.junit.Test; +import org.springframework.context.annotation.AnnotationConfigApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.handler.annotation.MessageMapping; +import org.springframework.messaging.handler.annotation.SendTo; +import org.springframework.messaging.simp.SimpMessageType; +import org.springframework.messaging.simp.annotation.SubscribeMapping; +import org.springframework.messaging.simp.config.MessageBrokerRegistry; +import org.springframework.messaging.simp.stomp.StompCommand; +import org.springframework.messaging.simp.stomp.StompHeaderAccessor; +import org.springframework.web.socket.messaging.StompTextMessageBuilder; +import org.springframework.messaging.support.channel.AbstractSubscribableChannel; +import org.springframework.messaging.support.channel.ExecutorSubscribableChannel; +import org.springframework.stereotype.Controller; +import org.springframework.web.servlet.HandlerMapping; +import org.springframework.web.servlet.handler.SimpleUrlHandlerMapping; +import org.springframework.web.socket.TextMessage; +import org.springframework.web.socket.messaging.SubProtocolWebSocketHandler; +import org.springframework.web.socket.support.TestWebSocketSession; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.*; + + +/** + * Test fixture for {@link WebSocketMessageBrokerConfigurationSupport}. + * + * @author Rossen Stoyanchev + */ +public class WebSocketMessageBrokerConfigurationSupportTests { + + private AnnotationConfigApplicationContext config; + + + @Before + public void setupOnce() { + this.config = new AnnotationConfigApplicationContext(); + this.config.register(TestWebSocketMessageBrokerConfiguration.class, TestSimpleMessageBrokerConfig.class); + this.config.refresh(); + } + + @Test + public void handlerMapping() { + + SimpleUrlHandlerMapping hm = (SimpleUrlHandlerMapping) this.config.getBean(HandlerMapping.class); + assertEquals(1, hm.getOrder()); + + Map handlerMap = hm.getHandlerMap(); + assertEquals(1, handlerMap.size()); + assertNotNull(handlerMap.get("/simpleBroker")); + } + + @Test + public void clientInboundChannelSendMessage() throws Exception { + + TestChannel channel = this.config.getBean("clientInboundChannel", TestChannel.class); + SubProtocolWebSocketHandler webSocketHandler = this.config.getBean(SubProtocolWebSocketHandler.class); + + TextMessage textMessage = StompTextMessageBuilder.create(StompCommand.SEND).headers("destination:/foo").build(); + webSocketHandler.handleMessage(new TestWebSocketSession(), textMessage); + + Message message = channel.messages.get(0); + StompHeaderAccessor headers = StompHeaderAccessor.wrap(message); + + assertEquals(SimpMessageType.MESSAGE, headers.getMessageType()); + assertEquals("/foo", headers.getDestination()); + } + + @Test + public void clientOutboundChannelChannel() { + TestChannel channel = this.config.getBean("clientOutboundChannel", TestChannel.class); + List values = channel.handlers; + + assertEquals(1, values.size()); + assertTrue(values.get(0) instanceof SubProtocolWebSocketHandler); + } + + + @Controller + static class TestController { + + @SubscribeMapping("/foo") + public String handleSubscribe() { + return "bar"; + } + + @MessageMapping("/foo") + @SendTo("/bar") + public String handleMessage() { + return "bar"; + } + } + + @Configuration + static class TestSimpleMessageBrokerConfig implements WebSocketMessageBrokerConfigurer { + + @Override + public void registerStompEndpoints(StompEndpointRegistry registry) { + registry.addEndpoint("/simpleBroker"); + } + + @Override + public void configureMessageBroker(MessageBrokerRegistry configurer) { + // SimpleBroker used by default + } + + @Bean + public TestController subscriptionController() { + return new TestController(); + } + } + + @Configuration + static class TestWebSocketMessageBrokerConfiguration extends DelegatingWebSocketMessageBrokerConfiguration { + + @Override + @Bean + public AbstractSubscribableChannel clientInboundChannel() { + return new TestChannel(); + } + + @Override + @Bean + public AbstractSubscribableChannel clientOutboundChannel() { + return new TestChannel(); + } + + @Override + public AbstractSubscribableChannel brokerChannel() { + return new TestChannel(); + } + } + + private static class TestChannel extends ExecutorSubscribableChannel { + + private final List handlers = new ArrayList<>(); + + private final List> messages = new ArrayList<>(); + + + @Override + public boolean subscribeInternal(MessageHandler handler) { + this.handlers.add(handler); + return super.subscribeInternal(handler); + } + + @Override + public boolean sendInternal(Message message, long timeout) { + this.messages.add(message); + return true; + } + } + +}