diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/MutableUserSessionResolver.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/MutableUserQueueSuffixResolver.java similarity index 77% rename from spring-messaging/src/main/java/org/springframework/messaging/simp/handler/MutableUserSessionResolver.java rename to spring-messaging/src/main/java/org/springframework/messaging/simp/handler/MutableUserQueueSuffixResolver.java index 8ef3397d439..1fe326958d0 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/MutableUserSessionResolver.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/MutableUserQueueSuffixResolver.java @@ -21,10 +21,10 @@ package org.springframework.messaging.simp.handler; * @author Rossen Stoyanchev * @since 4.0 */ -public interface MutableUserSessionResolver extends UserSessionResolver { +public interface MutableUserQueueSuffixResolver extends UserQueueSuffixResolver { - void addUserSessionId(String user, String sessionId); + void addQueueSuffix(String user, String sessionId, String suffix); - void removeUserSessionId(String user, String sessionId); + void removeQueueSuffix(String user, String sessionId); } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpleUserQueueSuffixResolver.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpleUserQueueSuffixResolver.java new file mode 100644 index 00000000000..6b3d018b59f --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpleUserQueueSuffixResolver.java @@ -0,0 +1,75 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.simp.handler; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + + +/** + * @author Rossen Stoyanchev + * @since 4.0 + */ +public class SimpleUserQueueSuffixResolver implements MutableUserQueueSuffixResolver { + + // userId -> [sessionId -> queueSuffix] + private final ConcurrentMap> cache = new ConcurrentHashMap>(); + + + @Override + public void addQueueSuffix(String user, String sessionId, String suffix) { + Map suffixes = this.cache.get(user); + if (suffixes == null) { + suffixes = new ConcurrentHashMap(); + Map prevSuffixes = this.cache.putIfAbsent(user, suffixes); + if (prevSuffixes != null) { + suffixes = prevSuffixes; + } + } + suffixes.put(sessionId, suffix); + } + + @Override + public void removeQueueSuffix(String user, String sessionId) { + Map suffixes = this.cache.get(user); + if (suffixes != null) { + if (suffixes.remove(sessionId) != null) { + this.cache.remove(user, Collections.emptyMap()); + } + } + } + + @Override + public Set getUserQueueSuffixes(String user) { + Map suffixes = this.cache.get(user); + return (suffixes != null) ? new HashSet(suffixes.values()) : Collections.emptySet(); + } + + @Override + public String getUserQueueSuffix(String user, String sessionId) { + Map suffixes = this.cache.get(user); + if (suffixes != null) { + return suffixes.get(sessionId); + } + return null; + } + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpleUserSessionResolver.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpleUserSessionResolver.java deleted file mode 100644 index cd897f30245..00000000000 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpleUserSessionResolver.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright 2002-2013 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.messaging.simp.handler; - -import java.util.Collections; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CopyOnWriteArraySet; - - -/** - * @author Rossen Stoyanchev - * @since 4.0 - */ -public class SimpleUserSessionResolver implements MutableUserSessionResolver { - - // userId -> sessionId's - private final ConcurrentMap> userSessionIds = new ConcurrentHashMap>(); - - - @Override - public void addUserSessionId(String user, String sessionId) { - Set sessionIds = this.userSessionIds.get(user); - if (sessionIds == null) { - sessionIds = new CopyOnWriteArraySet(); - Set value = this.userSessionIds.putIfAbsent(user, sessionIds); - if (value != null) { - sessionIds = value; - } - } - sessionIds.add(sessionId); - } - - @Override - public void removeUserSessionId(String user, String sessionId) { - Set sessionIds = this.userSessionIds.get(user); - if (sessionIds != null) { - if (sessionIds.remove(sessionId)) { - this.userSessionIds.remove(user, Collections.emptySet()); - } - } - } - - @Override - public Set resolveUserSessionIds(String user) { - Set sessionIds = this.userSessionIds.get(user); - return (sessionIds != null) ? sessionIds : Collections.emptySet(); - } - -} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserDestinationMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserDestinationMessageHandler.java index dbb4b9bdf1e..91843e92874 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserDestinationMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserDestinationMessageHandler.java @@ -30,11 +30,14 @@ import org.springframework.util.StringUtils; /** + * Supports destinations prefixed with "/user/{username}", transforms the + * destination to a unique queue to which the user is subscribed, and then sends + * the message for further processing. * - * Supports destinations prefixed with "/user/{username}" and resolves them into a - * destination to which the user is currently subscribed by appending the user session id. - * For example a destination such as "/user/john/queue/trade-confirmation" would resolve - * to "/queue/trade-confirmation/i9oqdfzo" if "i9oqdfzo" is the user's session id. + *

The target destination has the prefix removed and a unique queue suffix, + * resolved via {@link #setUserQueueSuffixResolver(UserQueueSuffixResolver)}, appended. + * For example a destination such as "/user/john/queue/trade-confirmation" could + * be transformed to "/queue/trade-confirmation/i9oqdfzo". * * @author Rossen Stoyanchev * @since 4.0 @@ -47,11 +50,22 @@ public class UserDestinationMessageHandler implements MessageHandler { private String prefix = "/user/"; - private UserSessionResolver userSessionResolver = new SimpleUserSessionResolver(); + private UserQueueSuffixResolver userQueueSuffixResolver = new SimpleUserQueueSuffixResolver(); - public UserDestinationMessageHandler(MessageSendingOperations messagingTemplate) { + /** + * + * @param messagingTemplate + * @param resolver the resolver to use to find queue suffixes for a user + */ + public UserDestinationMessageHandler(MessageSendingOperations messagingTemplate, + UserQueueSuffixResolver userQueueSuffixResolver) { + + Assert.notNull(messagingTemplate, "messagingTemplate is required"); + Assert.notNull(userQueueSuffixResolver, "userQueueSuffixResolver is required"); + this.messagingTemplate = messagingTemplate; + this.userQueueSuffixResolver = userQueueSuffixResolver; } /** @@ -71,17 +85,10 @@ public class UserDestinationMessageHandler implements MessageHandler { } /** - * @param userSessionResolver the userSessionResolver to set - */ - public void setUserSessionResolver(UserSessionResolver userSessionResolver) { - this.userSessionResolver = userSessionResolver; - } - - /** - * @return the userSessionResolver + * @return the resolver for queue suffixes for a user */ - public UserSessionResolver getUserSessionResolver() { - return this.userSessionResolver; + public UserQueueSuffixResolver getUserQueueSuffixResolver() { + return this.userQueueSuffixResolver; } /** @@ -116,7 +123,7 @@ public class UserDestinationMessageHandler implements MessageHandler { return; } - for (String sessionId : this.userSessionResolver.resolveUserSessionIds(user)) { + for (String sessionId : this.userQueueSuffixResolver.getUserQueueSuffixes(user)) { String targetDestination = destinationParser.getTargetDestination(sessionId); headers.setDestination(targetDestination); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserSessionResolver.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserQueueSuffixResolver.java similarity index 57% rename from spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserSessionResolver.java rename to spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserQueueSuffixResolver.java index 18b06045007..0acf82c253f 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserSessionResolver.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserQueueSuffixResolver.java @@ -20,19 +20,29 @@ import java.util.Set; /** - * A strategy for resolving a user name to one or more session id's. + * A strategy for resolving unique queue suffixes for a connected user. + * There can be only one suffix per user per session. * * @author Rossen Stoyanchev * @since 4.0 */ -public interface UserSessionResolver { +public interface UserQueueSuffixResolver { /** - * Retrieve the sessionId(s) associated with the given user. + * Retrieve the suffixes for all sessions associated with this user. * * @param user the user name - * @return a Set with zero, one, or more, current session id's. + * @return a Set with zero, one, or more, queue suffixes */ - Set resolveUserSessionIds(String user); + Set getUserQueueSuffixes(String user); + + /** + * Retrieve the queue suffix associated with the given user session. + * + * @param user the user name + * @param sessionId the session id + * @return a queue suffix or {@code null} + */ + String getUserQueueSuffix(String user, String sessionId); } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompWebSocketHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompWebSocketHandler.java index 6e4c1a096da..260de8687f7 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompWebSocketHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompWebSocketHandler.java @@ -17,6 +17,7 @@ package org.springframework.messaging.simp.stomp; import java.io.IOException; import java.nio.charset.Charset; +import java.security.Principal; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -27,8 +28,7 @@ import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.simp.SimpMessageType; -import org.springframework.messaging.simp.handler.MutableUserSessionResolver; -import org.springframework.messaging.simp.handler.UserDestinationMessageHandler; +import org.springframework.messaging.simp.handler.MutableUserQueueSuffixResolver; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.TextMessage; @@ -51,17 +51,19 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement public static final String CONNECTED_USER_HEADER = "user-name"; /** - * A suffix unique to the current session that a client can append to a destination. - * @see UserDestinationMessageHandler + * A suffix unique to the current session that a client can use to append to + * a destination to make it unique. + * + * @see {@link org.springframework.messaging.simp.handler.UserDestinationMessageHandler} */ public static final String QUEUE_SUFFIX_HEADER = "queue-suffix"; private static Log logger = LogFactory.getLog(StompWebSocketHandler.class); - private MessageChannel clientInputChannel; + private MessageChannel dispatchChannel; - private MutableUserSessionResolver userSessionStore; + private MutableUserQueueSuffixResolver queueSuffixResolver; private final StompMessageConverter stompMessageConverter = new StompMessageConverter(); @@ -69,29 +71,27 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement /** - * @param clientInputChannel the channel to which incoming STOMP/WebSocket messages should - * be sent to + * @param dispatchChannel the channel to send client STOMP/WebSocket messages to */ - public StompWebSocketHandler(MessageChannel clientInputChannel) { - Assert.notNull(clientInputChannel, "clientInputChannel is required"); - this.clientInputChannel = clientInputChannel; + public StompWebSocketHandler(MessageChannel dispatchChannel) { + Assert.notNull(dispatchChannel, "dispatchChannel is required"); + this.dispatchChannel = dispatchChannel; } /** - * Configure a store for saving user session information. - * @param userSessionStore the userSessionStore to use to store user session id's - * @see UserDestinationMessageHandler + * Configure a resolver to use to maintain queue suffixes for user + * @see {@link org.springframework.messaging.simp.handler.UserDestinationMessageHandler} */ - public void setUserSessionResolver(MutableUserSessionResolver userSessionStore) { - this.userSessionStore = userSessionStore; + public void setUserQueueSuffixResolver(MutableUserQueueSuffixResolver resolver) { + this.queueSuffixResolver = resolver; } /** - * @return the userSessionResolver + * @return the resolver for queue suffixes for a user */ - public MutableUserSessionResolver getUserSessionResolver() { - return this.userSessionStore; + public MutableUserQueueSuffixResolver getUserQueueSuffixResolver() { + return this.queueSuffixResolver; } public StompMessageConverter getStompMessageConverter() { @@ -101,12 +101,7 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { - Assert.notNull(this.clientInputChannel, "No output channel for STOMP messages."); this.sessions.put(session.getId(), session); - - if ((this.userSessionStore != null) && (session.getPrincipal() != null)) { - this.userSessionStore.addUserSessionId(session.getPrincipal().getName(), session.getId()); - } } /** @@ -135,7 +130,7 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement handleConnect(session, message); } - this.clientInputChannel.send(message); + this.dispatchChannel.send(message); } catch (Throwable t) { @@ -172,9 +167,15 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement } connectedHeaders.setHeartbeat(0,0); // TODO - if (session.getPrincipal() != null) { - connectedHeaders.setNativeHeader(CONNECTED_USER_HEADER, session.getPrincipal().getName()); + Principal principal = session.getPrincipal(); + if (principal != null) { + connectedHeaders.setNativeHeader(CONNECTED_USER_HEADER, principal.getName()); connectedHeaders.setNativeHeader(QUEUE_SUFFIX_HEADER, session.getId()); + + if (this.queueSuffixResolver != null) { + String suffix = session.getId(); + this.queueSuffixResolver.addQueueSuffix(principal.getName(), session.getId(), suffix); + } } // TODO: security @@ -204,14 +205,14 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement String sessionId = session.getId(); this.sessions.remove(sessionId); - if ((this.userSessionStore != null) && (session.getPrincipal() != null)) { - this.userSessionStore.removeUserSessionId(session.getPrincipal().getName(), sessionId); + if ((this.queueSuffixResolver != null) && (session.getPrincipal() != null)) { + this.queueSuffixResolver.removeQueueSuffix(session.getPrincipal().getName(), sessionId); } StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT); headers.setSessionId(sessionId); Message message = MessageBuilder.withPayloadAndHeaders(new byte[0], headers).build(); - this.clientInputChannel.send(message); + this.dispatchChannel.send(message); } /** diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/handler/SimpleUserSessionResolverTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/handler/SimpleUserQueueSuffixResolverTests.java similarity index 54% rename from spring-messaging/src/test/java/org/springframework/messaging/simp/handler/SimpleUserSessionResolverTests.java rename to spring-messaging/src/test/java/org/springframework/messaging/simp/handler/SimpleUserQueueSuffixResolverTests.java index 51a9bc9ab06..8a2386640c7 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/handler/SimpleUserSessionResolverTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/handler/SimpleUserQueueSuffixResolverTests.java @@ -27,12 +27,12 @@ import static org.junit.Assert.*; /** - * Test fixture for {@link SimpleUserSessionResolver} + * Test fixture for {@link SimpleUserQueueSuffixResolver} * * @author Rossen Stoyanchev * @since 4.0 */ -public class SimpleUserSessionResolverTests { +public class SimpleUserQueueSuffixResolverTests { private static final String user = "joe"; private static final List sessionIds = Arrays.asList("sess01", "sess02", "sess03"); @@ -41,42 +41,42 @@ public class SimpleUserSessionResolverTests { @Test public void addOneSessionId() { - SimpleUserSessionResolver resolver = new SimpleUserSessionResolver(); - resolver.addUserSessionId(user, sessionIds.get(0)); + SimpleUserQueueSuffixResolver resolver = new SimpleUserQueueSuffixResolver(); + resolver.addQueueSuffix(user, sessionIds.get(0), sessionIds.get(0)); - assertEquals(Collections.singleton(sessionIds.get(0)), resolver.resolveUserSessionIds(user)); - assertSame(Collections.emptySet(), resolver.resolveUserSessionIds("jane")); + assertEquals(Collections.singleton(sessionIds.get(0)), resolver.getUserQueueSuffixes(user)); + assertSame(Collections.emptySet(), resolver.getUserQueueSuffixes("jane")); } @Test public void addMultipleSessionIds() { - SimpleUserSessionResolver resolver = new SimpleUserSessionResolver(); + SimpleUserQueueSuffixResolver resolver = new SimpleUserQueueSuffixResolver(); for (String sessionId : sessionIds) { - resolver.addUserSessionId(user, sessionId); + resolver.addQueueSuffix(user, sessionId, sessionId); } - assertEquals(new LinkedHashSet<>(sessionIds), resolver.resolveUserSessionIds(user)); - assertEquals(Collections.emptySet(), resolver.resolveUserSessionIds("jane")); + assertEquals(new LinkedHashSet<>(sessionIds), resolver.getUserQueueSuffixes(user)); + assertEquals(Collections.emptySet(), resolver.getUserQueueSuffixes("jane")); } @Test public void removeSessionIds() { - SimpleUserSessionResolver resolver = new SimpleUserSessionResolver(); + SimpleUserQueueSuffixResolver resolver = new SimpleUserQueueSuffixResolver(); for (String sessionId : sessionIds) { - resolver.addUserSessionId(user, sessionId); + resolver.addQueueSuffix(user, sessionId, sessionId); } - assertEquals(new LinkedHashSet<>(sessionIds), resolver.resolveUserSessionIds(user)); + assertEquals(new LinkedHashSet<>(sessionIds), resolver.getUserQueueSuffixes(user)); - resolver.removeUserSessionId(user, sessionIds.get(1)); - resolver.removeUserSessionId(user, sessionIds.get(2)); - assertEquals(Collections.singleton(sessionIds.get(0)), resolver.resolveUserSessionIds(user)); + resolver.removeQueueSuffix(user, sessionIds.get(1)); + resolver.removeQueueSuffix(user, sessionIds.get(2)); + assertEquals(Collections.singleton(sessionIds.get(0)), resolver.getUserQueueSuffixes(user)); - resolver.removeUserSessionId(user, sessionIds.get(0)); - assertSame(Collections.emptySet(), resolver.resolveUserSessionIds(user)); + resolver.removeQueueSuffix(user, sessionIds.get(0)); + assertSame(Collections.emptySet(), resolver.getUserQueueSuffixes(user)); } }