diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/SimpMessageSendingOperations.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/SimpMessageSendingOperations.java new file mode 100644 index 00000000000..ff7bc7cbcc0 --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/SimpMessageSendingOperations.java @@ -0,0 +1,35 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.simp; + +import org.springframework.messaging.MessagingException; +import org.springframework.messaging.core.MessagePostProcessor; +import org.springframework.messaging.core.MessageSendingOperations; + + +/** + * @author Rossen Stoyanchev + * @since 4.0 + */ +public interface SimpMessageSendingOperations extends MessageSendingOperations { + + void convertAndSendToUser(String user, String destination, T message) throws MessagingException; + + void convertAndSendToUser(String user, String destination, T message, MessagePostProcessor postProcessor) + throws MessagingException; + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/SimpMessagingTemplate.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/SimpMessagingTemplate.java index 15dcf9d5f06..ae7ab32e236 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/SimpMessagingTemplate.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/SimpMessagingTemplate.java @@ -20,7 +20,9 @@ import java.util.Arrays; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageDeliveryException; +import org.springframework.messaging.MessagingException; import org.springframework.messaging.core.AbstractMessageSendingTemplate; +import org.springframework.messaging.core.MessagePostProcessor; import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.Assert; @@ -32,18 +34,44 @@ import org.springframework.util.Assert; * @author Mark Fisher * @since 4.0 */ -public class SimpMessagingTemplate extends AbstractMessageSendingTemplate { +public class SimpMessagingTemplate extends AbstractMessageSendingTemplate + implements SimpMessageSendingOperations { - private final MessageChannel outputChannel; + private final MessageChannel messageChannel; + + private String userDestinationPrefix = "/user/"; private volatile long sendTimeout = -1; - public SimpMessagingTemplate(MessageChannel outputChannel) { - Assert.notNull(outputChannel, "outputChannel is required"); - this.outputChannel = outputChannel; + public SimpMessagingTemplate(MessageChannel messageChannel) { + Assert.notNull(messageChannel, "outputChannel is required"); + this.messageChannel = messageChannel; + } + + + /** + * Configure the prefix to use for destinations targeting a specific user. + *

The default value is "/user/". + * @see org.springframework.messaging.simp.handler.UserDestinationMessageHandler + */ + public void setUserDestinationPrefix(String prefix) { + this.userDestinationPrefix = prefix; + } + + /** + * @return the userDestinationPrefix + */ + public String getUserDestinationPrefix() { + return this.userDestinationPrefix; } + /** + * @return the messageChannel + */ + public MessageChannel getMessageChannel() { + return this.messageChannel; + } /** * Specify the timeout value to use for send operations. @@ -54,6 +82,13 @@ public class SimpMessagingTemplate extends AbstractMessageSendingTemplate void send(Message

message) { @@ -64,22 +99,35 @@ public class SimpMessagingTemplate extends AbstractMessageSendingTemplate message) { Assert.notNull(destination, "destination is required"); - message = addDestinationToMessage(message, destination); + message = updateMessageHeaders(message, destination); long timeout = this.sendTimeout; boolean sent = (timeout >= 0) - ? this.outputChannel.send(message, timeout) - : this.outputChannel.send(message); + ? this.messageChannel.send(message, timeout) + : this.messageChannel.send(message); if (!sent) { throw new MessageDeliveryException(message, "failed to send message to destination '" + destination + "' within timeout: " + timeout); } } - protected

Message

addDestinationToMessage(Message

message, String destination) { + protected

Message

updateMessageHeaders(Message

message, String destination) { Assert.notNull(destination, "destination is required"); return MessageBuilder.fromMessage(message) .setHeader(SimpMessageHeaderAccessor.MESSAGE_TYPE, SimpMessageType.MESSAGE) .setHeader(SimpMessageHeaderAccessor.DESTINATIONS, Arrays.asList(destination)).build(); } + @Override + public void convertAndSendToUser(String user, String destination, T message) throws MessagingException { + convertAndSendToUser(user, destination, message, null); + } + + @Override + public void convertAndSendToUser(String user, String destination, T message, + MessagePostProcessor postProcessor) throws MessagingException { + + Assert.notNull(user, "user is required"); + convertAndSend(this.userDestinationPrefix + user + destination, message, postProcessor); + } + } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/annotation/support/ReplyToMethodReturnValueHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/annotation/support/ReplyToMethodReturnValueHandler.java index 82617a31d21..a1c12d9f2ec 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/annotation/support/ReplyToMethodReturnValueHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/annotation/support/ReplyToMethodReturnValueHandler.java @@ -17,18 +17,15 @@ package org.springframework.messaging.simp.annotation.support; import java.security.Principal; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; import org.springframework.core.MethodParameter; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.core.MessagePostProcessor; -import org.springframework.messaging.core.MessageSendingOperations; import org.springframework.messaging.handler.annotation.ReplyTo; import org.springframework.messaging.handler.method.HandlerMethodReturnValueHandler; import org.springframework.messaging.simp.SimpMessageHeaderAccessor; +import org.springframework.messaging.simp.SimpMessageSendingOperations; import org.springframework.messaging.simp.annotation.ReplyToUser; import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.Assert; @@ -49,10 +46,10 @@ import org.springframework.util.Assert; */ public class ReplyToMethodReturnValueHandler implements HandlerMethodReturnValueHandler { - private final MessageSendingOperations messagingTemplate; + private final SimpMessageSendingOperations messagingTemplate; - public ReplyToMethodReturnValueHandler(MessageSendingOperations messagingTemplate) { + public ReplyToMethodReturnValueHandler(SimpMessageSendingOperations messagingTemplate) { Assert.notNull(messagingTemplate, "messagingTemplate is required"); this.messagingTemplate = messagingTemplate; } @@ -72,25 +69,22 @@ public class ReplyToMethodReturnValueHandler implements HandlerMethodReturnValue return; } - ReplyTo replyTo = returnType.getMethodAnnotation(ReplyTo.class); - ReplyToUser replyToUser = returnType.getMethodAnnotation(ReplyToUser.class); + MessagePostProcessor postProcessor = new SessionHeaderPostProcessor(inputMessage); - List destinations = new ArrayList(); + ReplyTo replyTo = returnType.getMethodAnnotation(ReplyTo.class); if (replyTo != null) { - destinations.addAll(Arrays.asList(replyTo.value())); + for (String destination : replyTo.value()) { + this.messagingTemplate.convertAndSend(destination, returnValue, postProcessor); + } } + + ReplyToUser replyToUser = returnType.getMethodAnnotation(ReplyToUser.class); if (replyToUser != null) { - Principal user = getUser(inputMessage); + String user = getUser(inputMessage).getName(); for (String destination : replyToUser.value()) { - destinations.add("/user/" + user.getName() + destination); + this.messagingTemplate.convertAndSendToUser(user, destination, returnValue, postProcessor); } } - - MessagePostProcessor postProcessor = new SessionIdHeaderPostProcessor(inputMessage); - - for (String destination : destinations) { - this.messagingTemplate.convertAndSend(destination, returnValue, postProcessor); - } } private Principal getUser(Message inputMessage) { @@ -103,12 +97,12 @@ public class ReplyToMethodReturnValueHandler implements HandlerMethodReturnValue } - private final class SessionIdHeaderPostProcessor implements MessagePostProcessor { + private final class SessionHeaderPostProcessor implements MessagePostProcessor { private final Message inputMessage; - public SessionIdHeaderPostProcessor(Message inputMessage) { + public SessionHeaderPostProcessor(Message inputMessage) { this.inputMessage = inputMessage; } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/annotation/support/SubscriptionMethodReturnValueHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/annotation/support/SubscriptionMethodReturnValueHandler.java index 495e8d68406..cb71a14d4d7 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/annotation/support/SubscriptionMethodReturnValueHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/annotation/support/SubscriptionMethodReturnValueHandler.java @@ -74,27 +74,26 @@ public class SubscriptionMethodReturnValueHandler implements HandlerMethodReturn "No subsriptiondId in input message. Add @ReplyTo or @ReplyToUser to method: " + returnType.getMethod()); - MessagePostProcessor postProcessor = new InputHeaderCopyingPostProcessor(inputHeaders); + MessagePostProcessor postProcessor = new SubscriptionHeaderPostProcessor(inputHeaders); this.messagingTemplate.convertAndSend(destination, returnValue, postProcessor); } - private final class InputHeaderCopyingPostProcessor implements MessagePostProcessor { + private final class SubscriptionHeaderPostProcessor implements MessagePostProcessor { private final SimpMessageHeaderAccessor inputHeaders; - public InputHeaderCopyingPostProcessor(SimpMessageHeaderAccessor inputHeaders) { + public SubscriptionHeaderPostProcessor(SimpMessageHeaderAccessor inputHeaders) { this.inputHeaders = inputHeaders; } @Override public Message postProcessMessage(Message message) { - SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.wrap(message); return MessageBuilder.fromMessage(message) .setHeader(SimpMessageHeaderAccessor.SESSION_ID, this.inputHeaders.getSessionId()) .setHeader(SimpMessageHeaderAccessor.SUBSCRIPTION_ID, this.inputHeaders.getSubscriptionId()) - .copyHeaders(headers.toMap()).build(); + .build(); } } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/AnnotationMethodMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/AnnotationMethodMessageHandler.java index e92a1610c49..6ecd598fdd6 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/AnnotationMethodMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/AnnotationMethodMessageHandler.java @@ -34,10 +34,10 @@ import org.springframework.context.ApplicationContextAware; import org.springframework.core.MethodParameter; import org.springframework.core.annotation.AnnotationUtils; import org.springframework.messaging.Message; -import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; import org.springframework.messaging.handler.annotation.MessageMapping; +import org.springframework.messaging.handler.annotation.ReplyTo; import org.springframework.messaging.handler.annotation.support.ExceptionHandlerMethodResolver; import org.springframework.messaging.handler.annotation.support.MessageBodyMethodArgumentResolver; import org.springframework.messaging.handler.method.HandlerMethod; @@ -46,8 +46,8 @@ import org.springframework.messaging.handler.method.HandlerMethodReturnValueHand import org.springframework.messaging.handler.method.HandlerMethodSelector; import org.springframework.messaging.handler.method.InvocableHandlerMethod; import org.springframework.messaging.simp.SimpMessageHeaderAccessor; +import org.springframework.messaging.simp.SimpMessageSendingOperations; import org.springframework.messaging.simp.SimpMessageType; -import org.springframework.messaging.simp.SimpMessagingTemplate; import org.springframework.messaging.simp.annotation.SubscribeEvent; import org.springframework.messaging.simp.annotation.UnsubscribeEvent; import org.springframework.messaging.simp.annotation.support.PrincipalMethodArgumentResolver; @@ -68,9 +68,9 @@ public class AnnotationMethodMessageHandler implements MessageHandler, Applicati private static final Log logger = LogFactory.getLog(AnnotationMethodMessageHandler.class); - private final MessageChannel inboundChannel; + private final SimpMessageSendingOperations inboundMessagingTemplate; - private final MessageChannel outboundChannel; + private final SimpMessageSendingOperations outboundMessagingTemplate; private MessageConverter messageConverter; @@ -91,14 +91,24 @@ public class AnnotationMethodMessageHandler implements MessageHandler, Applicati /** - * @param inboundChannel a channel for processing incoming messages from clients - * @param outboundChannel a channel for messages going out to clients + * @param inboundMessagingTemplate a template for sending messages on the channel + * where incoming messages from clients are sent; essentially messages sent + * through this template will be re-processed by the application. One example + * is the use of {@link ReplyTo} annotation on a method to send a broadcast + * message. + * @param outboundMessagingTemplate a template for sending messages on the client used + * to send messages back out to connected clients; such messages must have all + * necessary information to reach the client such as session and subscription + * id's. One example is returning a value from an {@link SubscribeEvent} + * method. */ - public AnnotationMethodMessageHandler(MessageChannel inboundChannel, MessageChannel outboundChannel) { - Assert.notNull(inboundChannel, "inboundChannel is required"); - Assert.notNull(outboundChannel, "outboundChannel is required"); - this.inboundChannel = inboundChannel; - this.outboundChannel = outboundChannel; + public AnnotationMethodMessageHandler(SimpMessageSendingOperations inboundMessagingTemplate, + SimpMessageSendingOperations outboundMessagingTemplate) { + + Assert.notNull(inboundMessagingTemplate, "inboundMessagingTemplate is required"); + Assert.notNull(outboundMessagingTemplate, "outboundMessagingTemplate is required"); + this.inboundMessagingTemplate = inboundMessagingTemplate; + this.outboundMessagingTemplate = outboundMessagingTemplate; } /** @@ -121,14 +131,8 @@ public class AnnotationMethodMessageHandler implements MessageHandler, Applicati this.argumentResolvers.addResolver(new PrincipalMethodArgumentResolver()); this.argumentResolvers.addResolver(new MessageBodyMethodArgumentResolver(this.messageConverter)); - SimpMessagingTemplate inboundMessagingTemplate = new SimpMessagingTemplate(this.inboundChannel); - inboundMessagingTemplate.setConverter(this.messageConverter); - - SimpMessagingTemplate outboundMessagingTemplate = new SimpMessagingTemplate(this.outboundChannel); - outboundMessagingTemplate.setConverter(this.messageConverter); - - this.returnValueHandlers.addHandler(new ReplyToMethodReturnValueHandler(inboundMessagingTemplate)); - this.returnValueHandlers.addHandler(new SubscriptionMethodReturnValueHandler(outboundMessagingTemplate)); + this.returnValueHandlers.addHandler(new ReplyToMethodReturnValueHandler(this.inboundMessagingTemplate)); + this.returnValueHandlers.addHandler(new SubscriptionMethodReturnValueHandler(this.outboundMessagingTemplate)); } protected void initHandlerMethods() { diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserSessionStore.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/MutableUserSessionResolver.java similarity index 91% rename from spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserSessionStore.java rename to spring-messaging/src/main/java/org/springframework/messaging/simp/handler/MutableUserSessionResolver.java index e2423a110ad..ffe90a2f745 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserSessionStore.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/MutableUserSessionResolver.java @@ -21,7 +21,7 @@ package org.springframework.messaging.simp.handler; * @author Rossen Stoyanchev * @since 4.0 */ -public interface UserSessionStore { +public interface MutableUserSessionResolver extends UserSessionResolver { void storeUserSessionId(String user, String sessionId); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpleUserSessionResolver.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpleUserSessionResolver.java index f1a6fd88415..334fd289289 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpleUserSessionResolver.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpleUserSessionResolver.java @@ -27,7 +27,7 @@ import java.util.concurrent.CopyOnWriteArraySet; * @author Rossen Stoyanchev * @since 4.0 */ -public class SimpleUserSessionResolver implements UserSessionResolver, UserSessionStore { +public class SimpleUserSessionResolver implements MutableUserSessionResolver { // userId -> sessionId's private final Map> userSessionIds = new ConcurrentHashMap>(); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompWebSocketHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompWebSocketHandler.java index 807763026b9..97b56ec5015 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompWebSocketHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompWebSocketHandler.java @@ -28,7 +28,7 @@ import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.simp.SimpMessageType; import org.springframework.messaging.simp.handler.UserDestinationMessageHandler; -import org.springframework.messaging.simp.handler.UserSessionStore; +import org.springframework.messaging.simp.handler.MutableUserSessionResolver; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.TextMessage; @@ -63,7 +63,7 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement private MessageChannel clientInputChannel; - private UserSessionStore userSessionStore; + private MutableUserSessionResolver userSessionStore; private final StompMessageConverter stompMessageConverter = new StompMessageConverter(); @@ -85,14 +85,14 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement * @param userSessionStore the userSessionStore to use to store user session id's * @see UserDestinationMessageHandler */ - public void setUserSessionResolver(UserSessionStore userSessionStore) { + public void setUserSessionResolver(MutableUserSessionResolver userSessionStore) { this.userSessionStore = userSessionStore; } /** * @return the userSessionResolver */ - public UserSessionStore getUserSessionResolver() { + public MutableUserSessionResolver getUserSessionResolver() { return this.userSessionStore; }