Browse Source

Revise UserSessionResolver to UserQueueSuffixResolver

The resolver for /user/{username} prefixed destinations is now
more explicitly designed to store queue suffixes rather than session
id's, which is what we happen to use as queue suffixes.

This allows something other than the sessionId to be used without
having to change many places. It also enables applications to
construct destinations with user-specific queue suffixes without
making assumptions about what's used for queue suffixes. For
example a controller may construct a map with subscription destinations
and send that down to the client.
pull/297/merge
Rossen Stoyanchev 13 years ago
parent
commit
90c4712d06
  1. 6
      spring-messaging/src/main/java/org/springframework/messaging/simp/handler/MutableUserQueueSuffixResolver.java
  2. 75
      spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpleUserQueueSuffixResolver.java
  3. 65
      spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpleUserSessionResolver.java
  4. 41
      spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserDestinationMessageHandler.java
  5. 20
      spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserQueueSuffixResolver.java
  6. 61
      spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompWebSocketHandler.java
  7. 36
      spring-messaging/src/test/java/org/springframework/messaging/simp/handler/SimpleUserQueueSuffixResolverTests.java

6
spring-messaging/src/main/java/org/springframework/messaging/simp/handler/MutableUserSessionResolver.java → spring-messaging/src/main/java/org/springframework/messaging/simp/handler/MutableUserQueueSuffixResolver.java

@ -21,10 +21,10 @@ package org.springframework.messaging.simp.handler; @@ -21,10 +21,10 @@ package org.springframework.messaging.simp.handler;
* @author Rossen Stoyanchev
* @since 4.0
*/
public interface MutableUserSessionResolver extends UserSessionResolver {
public interface MutableUserQueueSuffixResolver extends UserQueueSuffixResolver {
void addUserSessionId(String user, String sessionId);
void addQueueSuffix(String user, String sessionId, String suffix);
void removeUserSessionId(String user, String sessionId);
void removeQueueSuffix(String user, String sessionId);
}

75
spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpleUserQueueSuffixResolver.java

@ -0,0 +1,75 @@ @@ -0,0 +1,75 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.handler;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
public class SimpleUserQueueSuffixResolver implements MutableUserQueueSuffixResolver {
// userId -> [sessionId -> queueSuffix]
private final ConcurrentMap<String, Map<String, String>> cache = new ConcurrentHashMap<String, Map<String, String>>();
@Override
public void addQueueSuffix(String user, String sessionId, String suffix) {
Map<String, String> suffixes = this.cache.get(user);
if (suffixes == null) {
suffixes = new ConcurrentHashMap<String, String>();
Map<String, String> prevSuffixes = this.cache.putIfAbsent(user, suffixes);
if (prevSuffixes != null) {
suffixes = prevSuffixes;
}
}
suffixes.put(sessionId, suffix);
}
@Override
public void removeQueueSuffix(String user, String sessionId) {
Map<String, String> suffixes = this.cache.get(user);
if (suffixes != null) {
if (suffixes.remove(sessionId) != null) {
this.cache.remove(user, Collections.emptyMap());
}
}
}
@Override
public Set<String> getUserQueueSuffixes(String user) {
Map<String, String> suffixes = this.cache.get(user);
return (suffixes != null) ? new HashSet<String>(suffixes.values()) : Collections.<String>emptySet();
}
@Override
public String getUserQueueSuffix(String user, String sessionId) {
Map<String, String> suffixes = this.cache.get(user);
if (suffixes != null) {
return suffixes.get(sessionId);
}
return null;
}
}

65
spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpleUserSessionResolver.java

@ -1,65 +0,0 @@ @@ -1,65 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.handler;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
public class SimpleUserSessionResolver implements MutableUserSessionResolver {
// userId -> sessionId's
private final ConcurrentMap<String, Set<String>> userSessionIds = new ConcurrentHashMap<String, Set<String>>();
@Override
public void addUserSessionId(String user, String sessionId) {
Set<String> sessionIds = this.userSessionIds.get(user);
if (sessionIds == null) {
sessionIds = new CopyOnWriteArraySet<String>();
Set<String> value = this.userSessionIds.putIfAbsent(user, sessionIds);
if (value != null) {
sessionIds = value;
}
}
sessionIds.add(sessionId);
}
@Override
public void removeUserSessionId(String user, String sessionId) {
Set<String> sessionIds = this.userSessionIds.get(user);
if (sessionIds != null) {
if (sessionIds.remove(sessionId)) {
this.userSessionIds.remove(user, Collections.<String>emptySet());
}
}
}
@Override
public Set<String> resolveUserSessionIds(String user) {
Set<String> sessionIds = this.userSessionIds.get(user);
return (sessionIds != null) ? sessionIds : Collections.<String>emptySet();
}
}

41
spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserDestinationMessageHandler.java

@ -30,11 +30,14 @@ import org.springframework.util.StringUtils; @@ -30,11 +30,14 @@ import org.springframework.util.StringUtils;
/**
* Supports destinations prefixed with "/user/{username}", transforms the
* destination to a unique queue to which the user is subscribed, and then sends
* the message for further processing.
*
* Supports destinations prefixed with "/user/{username}" and resolves them into a
* destination to which the user is currently subscribed by appending the user session id.
* For example a destination such as "/user/john/queue/trade-confirmation" would resolve
* to "/queue/trade-confirmation/i9oqdfzo" if "i9oqdfzo" is the user's session id.
* <p>The target destination has the prefix removed and a unique queue suffix,
* resolved via {@link #setUserQueueSuffixResolver(UserQueueSuffixResolver)}, appended.
* For example a destination such as "/user/john/queue/trade-confirmation" could
* be transformed to "/queue/trade-confirmation/i9oqdfzo".
*
* @author Rossen Stoyanchev
* @since 4.0
@ -47,11 +50,22 @@ public class UserDestinationMessageHandler implements MessageHandler { @@ -47,11 +50,22 @@ public class UserDestinationMessageHandler implements MessageHandler {
private String prefix = "/user/";
private UserSessionResolver userSessionResolver = new SimpleUserSessionResolver();
private UserQueueSuffixResolver userQueueSuffixResolver = new SimpleUserQueueSuffixResolver();
public UserDestinationMessageHandler(MessageSendingOperations<String> messagingTemplate) {
/**
*
* @param messagingTemplate
* @param resolver the resolver to use to find queue suffixes for a user
*/
public UserDestinationMessageHandler(MessageSendingOperations<String> messagingTemplate,
UserQueueSuffixResolver userQueueSuffixResolver) {
Assert.notNull(messagingTemplate, "messagingTemplate is required");
Assert.notNull(userQueueSuffixResolver, "userQueueSuffixResolver is required");
this.messagingTemplate = messagingTemplate;
this.userQueueSuffixResolver = userQueueSuffixResolver;
}
/**
@ -71,17 +85,10 @@ public class UserDestinationMessageHandler implements MessageHandler { @@ -71,17 +85,10 @@ public class UserDestinationMessageHandler implements MessageHandler {
}
/**
* @param userSessionResolver the userSessionResolver to set
*/
public void setUserSessionResolver(UserSessionResolver userSessionResolver) {
this.userSessionResolver = userSessionResolver;
}
/**
* @return the userSessionResolver
* @return the resolver for queue suffixes for a user
*/
public UserSessionResolver getUserSessionResolver() {
return this.userSessionResolver;
public UserQueueSuffixResolver getUserQueueSuffixResolver() {
return this.userQueueSuffixResolver;
}
/**
@ -116,7 +123,7 @@ public class UserDestinationMessageHandler implements MessageHandler { @@ -116,7 +123,7 @@ public class UserDestinationMessageHandler implements MessageHandler {
return;
}
for (String sessionId : this.userSessionResolver.resolveUserSessionIds(user)) {
for (String sessionId : this.userQueueSuffixResolver.getUserQueueSuffixes(user)) {
String targetDestination = destinationParser.getTargetDestination(sessionId);
headers.setDestination(targetDestination);

20
spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserSessionResolver.java → spring-messaging/src/main/java/org/springframework/messaging/simp/handler/UserQueueSuffixResolver.java

@ -20,19 +20,29 @@ import java.util.Set; @@ -20,19 +20,29 @@ import java.util.Set;
/**
* A strategy for resolving a user name to one or more session id's.
* A strategy for resolving unique queue suffixes for a connected user.
* There can be only one suffix per user per session.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public interface UserSessionResolver {
public interface UserQueueSuffixResolver {
/**
* Retrieve the sessionId(s) associated with the given user.
* Retrieve the suffixes for all sessions associated with this user.
*
* @param user the user name
* @return a Set with zero, one, or more, current session id's.
* @return a Set with zero, one, or more, queue suffixes
*/
Set<String> resolveUserSessionIds(String user);
Set<String> getUserQueueSuffixes(String user);
/**
* Retrieve the queue suffix associated with the given user session.
*
* @param user the user name
* @param sessionId the session id
* @return a queue suffix or {@code null}
*/
String getUserQueueSuffix(String user, String sessionId);
}

61
spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompWebSocketHandler.java

@ -17,6 +17,7 @@ package org.springframework.messaging.simp.stomp; @@ -17,6 +17,7 @@ package org.springframework.messaging.simp.stomp;
import java.io.IOException;
import java.nio.charset.Charset;
import java.security.Principal;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@ -27,8 +28,7 @@ import org.springframework.messaging.Message; @@ -27,8 +28,7 @@ import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.simp.handler.MutableUserSessionResolver;
import org.springframework.messaging.simp.handler.UserDestinationMessageHandler;
import org.springframework.messaging.simp.handler.MutableUserQueueSuffixResolver;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
@ -51,17 +51,19 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement @@ -51,17 +51,19 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement
public static final String CONNECTED_USER_HEADER = "user-name";
/**
* A suffix unique to the current session that a client can append to a destination.
* @see UserDestinationMessageHandler
* A suffix unique to the current session that a client can use to append to
* a destination to make it unique.
*
* @see {@link org.springframework.messaging.simp.handler.UserDestinationMessageHandler}
*/
public static final String QUEUE_SUFFIX_HEADER = "queue-suffix";
private static Log logger = LogFactory.getLog(StompWebSocketHandler.class);
private MessageChannel clientInputChannel;
private MessageChannel dispatchChannel;
private MutableUserSessionResolver userSessionStore;
private MutableUserQueueSuffixResolver queueSuffixResolver;
private final StompMessageConverter stompMessageConverter = new StompMessageConverter();
@ -69,29 +71,27 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement @@ -69,29 +71,27 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement
/**
* @param clientInputChannel the channel to which incoming STOMP/WebSocket messages should
* be sent to
* @param dispatchChannel the channel to send client STOMP/WebSocket messages to
*/
public StompWebSocketHandler(MessageChannel clientInputChannel) {
Assert.notNull(clientInputChannel, "clientInputChannel is required");
this.clientInputChannel = clientInputChannel;
public StompWebSocketHandler(MessageChannel dispatchChannel) {
Assert.notNull(dispatchChannel, "dispatchChannel is required");
this.dispatchChannel = dispatchChannel;
}
/**
* Configure a store for saving user session information.
* @param userSessionStore the userSessionStore to use to store user session id's
* @see UserDestinationMessageHandler
* Configure a resolver to use to maintain queue suffixes for user
* @see {@link org.springframework.messaging.simp.handler.UserDestinationMessageHandler}
*/
public void setUserSessionResolver(MutableUserSessionResolver userSessionStore) {
this.userSessionStore = userSessionStore;
public void setUserQueueSuffixResolver(MutableUserQueueSuffixResolver resolver) {
this.queueSuffixResolver = resolver;
}
/**
* @return the userSessionResolver
* @return the resolver for queue suffixes for a user
*/
public MutableUserSessionResolver getUserSessionResolver() {
return this.userSessionStore;
public MutableUserQueueSuffixResolver getUserQueueSuffixResolver() {
return this.queueSuffixResolver;
}
public StompMessageConverter getStompMessageConverter() {
@ -101,12 +101,7 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement @@ -101,12 +101,7 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
Assert.notNull(this.clientInputChannel, "No output channel for STOMP messages.");
this.sessions.put(session.getId(), session);
if ((this.userSessionStore != null) && (session.getPrincipal() != null)) {
this.userSessionStore.addUserSessionId(session.getPrincipal().getName(), session.getId());
}
}
/**
@ -135,7 +130,7 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement @@ -135,7 +130,7 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement
handleConnect(session, message);
}
this.clientInputChannel.send(message);
this.dispatchChannel.send(message);
}
catch (Throwable t) {
@ -172,9 +167,15 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement @@ -172,9 +167,15 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement
}
connectedHeaders.setHeartbeat(0,0); // TODO
if (session.getPrincipal() != null) {
connectedHeaders.setNativeHeader(CONNECTED_USER_HEADER, session.getPrincipal().getName());
Principal principal = session.getPrincipal();
if (principal != null) {
connectedHeaders.setNativeHeader(CONNECTED_USER_HEADER, principal.getName());
connectedHeaders.setNativeHeader(QUEUE_SUFFIX_HEADER, session.getId());
if (this.queueSuffixResolver != null) {
String suffix = session.getId();
this.queueSuffixResolver.addQueueSuffix(principal.getName(), session.getId(), suffix);
}
}
// TODO: security
@ -204,14 +205,14 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement @@ -204,14 +205,14 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement
String sessionId = session.getId();
this.sessions.remove(sessionId);
if ((this.userSessionStore != null) && (session.getPrincipal() != null)) {
this.userSessionStore.removeUserSessionId(session.getPrincipal().getName(), sessionId);
if ((this.queueSuffixResolver != null) && (session.getPrincipal() != null)) {
this.queueSuffixResolver.removeQueueSuffix(session.getPrincipal().getName(), sessionId);
}
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT);
headers.setSessionId(sessionId);
Message<?> message = MessageBuilder.withPayloadAndHeaders(new byte[0], headers).build();
this.clientInputChannel.send(message);
this.dispatchChannel.send(message);
}
/**

36
spring-messaging/src/test/java/org/springframework/messaging/simp/handler/SimpleUserSessionResolverTests.java → spring-messaging/src/test/java/org/springframework/messaging/simp/handler/SimpleUserQueueSuffixResolverTests.java

@ -27,12 +27,12 @@ import static org.junit.Assert.*; @@ -27,12 +27,12 @@ import static org.junit.Assert.*;
/**
* Test fixture for {@link SimpleUserSessionResolver}
* Test fixture for {@link SimpleUserQueueSuffixResolver}
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public class SimpleUserSessionResolverTests {
public class SimpleUserQueueSuffixResolverTests {
private static final String user = "joe";
private static final List<String> sessionIds = Arrays.asList("sess01", "sess02", "sess03");
@ -41,42 +41,42 @@ public class SimpleUserSessionResolverTests { @@ -41,42 +41,42 @@ public class SimpleUserSessionResolverTests {
@Test
public void addOneSessionId() {
SimpleUserSessionResolver resolver = new SimpleUserSessionResolver();
resolver.addUserSessionId(user, sessionIds.get(0));
SimpleUserQueueSuffixResolver resolver = new SimpleUserQueueSuffixResolver();
resolver.addQueueSuffix(user, sessionIds.get(0), sessionIds.get(0));
assertEquals(Collections.singleton(sessionIds.get(0)), resolver.resolveUserSessionIds(user));
assertSame(Collections.emptySet(), resolver.resolveUserSessionIds("jane"));
assertEquals(Collections.singleton(sessionIds.get(0)), resolver.getUserQueueSuffixes(user));
assertSame(Collections.emptySet(), resolver.getUserQueueSuffixes("jane"));
}
@Test
public void addMultipleSessionIds() {
SimpleUserSessionResolver resolver = new SimpleUserSessionResolver();
SimpleUserQueueSuffixResolver resolver = new SimpleUserQueueSuffixResolver();
for (String sessionId : sessionIds) {
resolver.addUserSessionId(user, sessionId);
resolver.addQueueSuffix(user, sessionId, sessionId);
}
assertEquals(new LinkedHashSet<>(sessionIds), resolver.resolveUserSessionIds(user));
assertEquals(Collections.emptySet(), resolver.resolveUserSessionIds("jane"));
assertEquals(new LinkedHashSet<>(sessionIds), resolver.getUserQueueSuffixes(user));
assertEquals(Collections.emptySet(), resolver.getUserQueueSuffixes("jane"));
}
@Test
public void removeSessionIds() {
SimpleUserSessionResolver resolver = new SimpleUserSessionResolver();
SimpleUserQueueSuffixResolver resolver = new SimpleUserQueueSuffixResolver();
for (String sessionId : sessionIds) {
resolver.addUserSessionId(user, sessionId);
resolver.addQueueSuffix(user, sessionId, sessionId);
}
assertEquals(new LinkedHashSet<>(sessionIds), resolver.resolveUserSessionIds(user));
assertEquals(new LinkedHashSet<>(sessionIds), resolver.getUserQueueSuffixes(user));
resolver.removeUserSessionId(user, sessionIds.get(1));
resolver.removeUserSessionId(user, sessionIds.get(2));
assertEquals(Collections.singleton(sessionIds.get(0)), resolver.resolveUserSessionIds(user));
resolver.removeQueueSuffix(user, sessionIds.get(1));
resolver.removeQueueSuffix(user, sessionIds.get(2));
assertEquals(Collections.singleton(sessionIds.get(0)), resolver.getUserQueueSuffixes(user));
resolver.removeUserSessionId(user, sessionIds.get(0));
assertSame(Collections.emptySet(), resolver.resolveUserSessionIds(user));
resolver.removeQueueSuffix(user, sessionIds.get(0));
assertSame(Collections.emptySet(), resolver.getUserQueueSuffixes(user));
}
}
Loading…
Cancel
Save