Browse Source
SimpleBrokerWebMessageHandler can be used as an alternative to the StompRelayWebMessageHandler.pull/286/merge
13 changed files with 963 additions and 205 deletions
@ -0,0 +1,43 @@
@@ -0,0 +1,43 @@
|
||||
/* |
||||
* Copyright 2002-2013 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.web.messaging; |
||||
|
||||
import java.util.Set; |
||||
|
||||
/** |
||||
* @author Rossen Stoyanchev |
||||
* @since 4.0 |
||||
*/ |
||||
public interface SessionSubscriptionRegistration { |
||||
|
||||
|
||||
String getSessionId(); |
||||
|
||||
void addSubscription(String destination, String subscriptionId); |
||||
|
||||
/** |
||||
* @param subscriptionId the subscription to remove |
||||
* @return the destination to which the subscriptionId was registered, or {@code null} |
||||
* if no matching subscriptionId was found |
||||
*/ |
||||
String removeSubscription(String subscriptionId); |
||||
|
||||
Set<String> getSubscriptionsByDestination(String destination); |
||||
|
||||
Set<String> getDestinations(); |
||||
|
||||
} |
||||
@ -0,0 +1,36 @@
@@ -0,0 +1,36 @@
|
||||
/* |
||||
* Copyright 2002-2013 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.web.messaging; |
||||
|
||||
import java.util.Set; |
||||
|
||||
|
||||
/** |
||||
* @author Rossen Stoyanchev |
||||
* @since 4.0 |
||||
*/ |
||||
public interface SessionSubscriptionRegistry { |
||||
|
||||
SessionSubscriptionRegistration getRegistration(String sessionId); |
||||
|
||||
SessionSubscriptionRegistration getOrCreateRegistration(String sessionId); |
||||
|
||||
SessionSubscriptionRegistration removeRegistration(String sessionId); |
||||
|
||||
Set<String> getSessionSubscriptions(String sessionId, String destination); |
||||
|
||||
} |
||||
@ -1,164 +0,0 @@
@@ -1,164 +0,0 @@
|
||||
/* |
||||
* Copyright 2002-2013 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.web.messaging.service; |
||||
|
||||
import java.util.ArrayList; |
||||
import java.util.Arrays; |
||||
import java.util.Collection; |
||||
import java.util.List; |
||||
import java.util.Map; |
||||
import java.util.concurrent.ConcurrentHashMap; |
||||
|
||||
import org.springframework.messaging.Message; |
||||
import org.springframework.messaging.MessageChannel; |
||||
import org.springframework.messaging.support.MessageBuilder; |
||||
import org.springframework.util.Assert; |
||||
import org.springframework.web.messaging.MessageType; |
||||
import org.springframework.web.messaging.converter.CompositeMessageConverter; |
||||
import org.springframework.web.messaging.converter.MessageConverter; |
||||
import org.springframework.web.messaging.support.WebMessageHeaderAccesssor; |
||||
|
||||
import reactor.core.Reactor; |
||||
import reactor.fn.Consumer; |
||||
import reactor.fn.Event; |
||||
import reactor.fn.registry.Registration; |
||||
import reactor.fn.selector.ObjectSelector; |
||||
|
||||
|
||||
/** |
||||
* @author Rossen Stoyanchev |
||||
* @since 4.0 |
||||
*/ |
||||
public class ReactorWebMessageHandler extends AbstractWebMessageHandler { |
||||
|
||||
private MessageChannel clientChannel; |
||||
|
||||
private final Reactor reactor; |
||||
|
||||
private MessageConverter payloadConverter; |
||||
|
||||
private Map<String, List<Registration<?>>> subscriptionsBySession = new ConcurrentHashMap<String, List<Registration<?>>>(); |
||||
|
||||
|
||||
/** |
||||
* @param clientChannel the channel to which messages for clients should be sent. |
||||
*/ |
||||
public ReactorWebMessageHandler(MessageChannel clientChannel, Reactor reactor) { |
||||
Assert.notNull(clientChannel, "clientChannel is required"); |
||||
this.clientChannel = clientChannel; |
||||
this.reactor = reactor; |
||||
this.payloadConverter = new CompositeMessageConverter(null); |
||||
} |
||||
|
||||
public void setMessageConverters(List<MessageConverter> converters) { |
||||
this.payloadConverter = new CompositeMessageConverter(converters); |
||||
} |
||||
|
||||
@Override |
||||
protected Collection<MessageType> getSupportedMessageTypes() { |
||||
return Arrays.asList(MessageType.MESSAGE, MessageType.SUBSCRIBE, MessageType.UNSUBSCRIBE); |
||||
} |
||||
|
||||
@Override |
||||
public void handleSubscribe(Message<?> message) { |
||||
|
||||
if (logger.isDebugEnabled()) { |
||||
logger.debug("Subscribe " + message); |
||||
} |
||||
|
||||
WebMessageHeaderAccesssor headers = WebMessageHeaderAccesssor.wrap(message); |
||||
String subscriptionId = headers.getSubscriptionId(); |
||||
BroadcastingConsumer consumer = new BroadcastingConsumer(subscriptionId); |
||||
|
||||
String key = getPublishKey(headers.getDestination()); |
||||
Registration<?> registration = this.reactor.on(new ObjectSelector<String>(key), consumer); |
||||
|
||||
String sessionId = headers.getSessionId(); |
||||
List<Registration<?>> list = this.subscriptionsBySession.get(sessionId); |
||||
if (list == null) { |
||||
list = new ArrayList<Registration<?>>(); |
||||
this.subscriptionsBySession.put(sessionId, list); |
||||
} |
||||
list.add(registration); |
||||
} |
||||
|
||||
private String getPublishKey(String destination) { |
||||
return "destination:" + destination; |
||||
} |
||||
|
||||
@Override |
||||
public void handlePublish(Message<?> message) { |
||||
|
||||
if (logger.isDebugEnabled()) { |
||||
logger.debug("Message received: " + message); |
||||
} |
||||
|
||||
try { |
||||
// Convert to byte[] payload before the fan-out
|
||||
WebMessageHeaderAccesssor headers = WebMessageHeaderAccesssor.wrap(message); |
||||
byte[] payload = payloadConverter.convertToPayload(message.getPayload(), headers.getContentType()); |
||||
Message<?> m = MessageBuilder.withPayload(payload).copyHeaders(message.getHeaders()).build(); |
||||
|
||||
this.reactor.notify(getPublishKey(headers.getDestination()), Event.wrap(m)); |
||||
} |
||||
catch (Exception ex) { |
||||
logger.error("Failed to publish " + message, ex); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void handleDisconnect(Message<?> message) { |
||||
WebMessageHeaderAccesssor headers = WebMessageHeaderAccesssor.wrap(message); |
||||
removeSubscriptions(headers.getSessionId()); |
||||
} |
||||
|
||||
private void removeSubscriptions(String sessionId) { |
||||
List<Registration<?>> registrations = this.subscriptionsBySession.remove(sessionId); |
||||
if (logger.isTraceEnabled()) { |
||||
logger.trace("Cancelling " + registrations.size() + " subscriptions for session=" + sessionId); |
||||
} |
||||
for (Registration<?> registration : registrations) { |
||||
registration.cancel(); |
||||
} |
||||
} |
||||
|
||||
|
||||
private final class BroadcastingConsumer implements Consumer<Event<Message<?>>> { |
||||
|
||||
private final String subscriptionId; |
||||
|
||||
|
||||
private BroadcastingConsumer(String subscriptionId) { |
||||
this.subscriptionId = subscriptionId; |
||||
} |
||||
|
||||
@Override |
||||
public void accept(Event<Message<?>> event) { |
||||
|
||||
Message<?> sentMessage = event.getData(); |
||||
|
||||
WebMessageHeaderAccesssor headers = WebMessageHeaderAccesssor.wrap(sentMessage); |
||||
headers.setSubscriptionId(this.subscriptionId); |
||||
|
||||
Message<?> clientMessage = MessageBuilder.withPayload( |
||||
sentMessage.getPayload()).copyHeaders(headers.toMap()).build(); |
||||
|
||||
clientChannel.send(clientMessage); |
||||
} |
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,126 @@
@@ -0,0 +1,126 @@
|
||||
/* |
||||
* Copyright 2002-2013 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.web.messaging.service; |
||||
|
||||
import java.util.Arrays; |
||||
import java.util.Collection; |
||||
import java.util.Set; |
||||
|
||||
import org.springframework.messaging.Message; |
||||
import org.springframework.messaging.MessageChannel; |
||||
import org.springframework.messaging.support.MessageBuilder; |
||||
import org.springframework.util.Assert; |
||||
import org.springframework.web.messaging.MessageType; |
||||
import org.springframework.web.messaging.SessionSubscriptionRegistration; |
||||
import org.springframework.web.messaging.SessionSubscriptionRegistry; |
||||
import org.springframework.web.messaging.support.CachingSessionSubscriptionRegistry; |
||||
import org.springframework.web.messaging.support.DefaultSessionSubscriptionRegistry; |
||||
import org.springframework.web.messaging.support.WebMessageHeaderAccesssor; |
||||
|
||||
|
||||
/** |
||||
* @author Rossen Stoyanchev |
||||
* @since 4.0 |
||||
*/ |
||||
public class SimpleBrokerWebMessageHandler extends AbstractWebMessageHandler { |
||||
|
||||
private final MessageChannel clientChannel; |
||||
|
||||
private CachingSessionSubscriptionRegistry subscriptionRegistry= |
||||
new CachingSessionSubscriptionRegistry(new DefaultSessionSubscriptionRegistry()); |
||||
|
||||
|
||||
/** |
||||
* @param clientChannel the channel to which messages for clients should be sent |
||||
* @param observable an Observable to use to manage subscriptions |
||||
*/ |
||||
public SimpleBrokerWebMessageHandler(MessageChannel clientChannel) { |
||||
Assert.notNull(clientChannel, "clientChannel is required"); |
||||
this.clientChannel = clientChannel; |
||||
} |
||||
|
||||
|
||||
public void setSubscriptionRegistry(SessionSubscriptionRegistry subscriptionRegistry) { |
||||
Assert.notNull(subscriptionRegistry, "subscriptionRegistry is required"); |
||||
this.subscriptionRegistry = new CachingSessionSubscriptionRegistry(subscriptionRegistry); |
||||
} |
||||
|
||||
@Override |
||||
protected Collection<MessageType> getSupportedMessageTypes() { |
||||
return Arrays.asList(MessageType.MESSAGE, MessageType.SUBSCRIBE, MessageType.UNSUBSCRIBE); |
||||
} |
||||
|
||||
@Override |
||||
public void handleSubscribe(Message<?> message) { |
||||
|
||||
if (logger.isDebugEnabled()) { |
||||
logger.debug("Subscribe " + message); |
||||
} |
||||
|
||||
WebMessageHeaderAccesssor headers = WebMessageHeaderAccesssor.wrap(message); |
||||
String sessionId = headers.getSessionId(); |
||||
String subscriptionId = headers.getSubscriptionId(); |
||||
String destination = headers.getDestination(); |
||||
|
||||
SessionSubscriptionRegistration registration = this.subscriptionRegistry.getOrCreateRegistration(sessionId); |
||||
registration.addSubscription(destination, subscriptionId); |
||||
} |
||||
|
||||
@Override |
||||
public void handlePublish(Message<?> message) { |
||||
|
||||
if (logger.isDebugEnabled()) { |
||||
logger.debug("Message received: " + message); |
||||
} |
||||
|
||||
String destination = WebMessageHeaderAccesssor.wrap(message).getDestination(); |
||||
|
||||
Set<SessionSubscriptionRegistration> registrations = |
||||
this.subscriptionRegistry.getRegistrationsByDestination(destination); |
||||
|
||||
if (registrations == null) { |
||||
return; |
||||
} |
||||
|
||||
for (SessionSubscriptionRegistration registration : registrations) { |
||||
for (String subscriptionId : registration.getSubscriptionsByDestination(destination)) { |
||||
|
||||
WebMessageHeaderAccesssor headers = WebMessageHeaderAccesssor.wrap(message); |
||||
headers.setSessionId(registration.getSessionId()); |
||||
headers.setSubscriptionId(subscriptionId); |
||||
|
||||
Message<?> clientMessage = MessageBuilder.withPayload( |
||||
message.getPayload()).copyHeaders(headers.toMap()).build(); |
||||
|
||||
try { |
||||
this.clientChannel.send(clientMessage); |
||||
} |
||||
catch (Throwable ex) { |
||||
logger.error("Failed to send message to destination=" + destination + |
||||
", sessionId=" + registration.getSessionId() + ", subscriptionId=" + subscriptionId, ex); |
||||
} |
||||
} |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void handleDisconnect(Message<?> message) { |
||||
String sessionId = WebMessageHeaderAccesssor.wrap(message).getSessionId(); |
||||
this.subscriptionRegistry.removeRegistration(sessionId); |
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,172 @@
@@ -0,0 +1,172 @@
|
||||
/* |
||||
* Copyright 2002-2013 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.web.messaging.support; |
||||
|
||||
import java.util.Map; |
||||
import java.util.Set; |
||||
import java.util.concurrent.ConcurrentHashMap; |
||||
import java.util.concurrent.CopyOnWriteArraySet; |
||||
|
||||
import org.springframework.util.Assert; |
||||
import org.springframework.web.messaging.SessionSubscriptionRegistration; |
||||
import org.springframework.web.messaging.SessionSubscriptionRegistry; |
||||
|
||||
|
||||
/** |
||||
* A decorator for a {@link SessionSubscriptionRegistry} that intercepts subscriptions |
||||
* being added and removed, and maintains a cache that tracks registrations for a |
||||
* given destination. |
||||
* |
||||
* @author Rossen Stoyanchev |
||||
* @since 4.0 |
||||
*/ |
||||
public class CachingSessionSubscriptionRegistry implements SessionSubscriptionRegistry { |
||||
|
||||
private final SessionSubscriptionRegistry delegate; |
||||
|
||||
private final DestinationCache destinationCache = new DestinationCache(); |
||||
|
||||
|
||||
public CachingSessionSubscriptionRegistry(SessionSubscriptionRegistry delegate) { |
||||
Assert.notNull(delegate, "delegate SessionSubscriptionRegistry is required"); |
||||
this.delegate = delegate; |
||||
} |
||||
|
||||
|
||||
@Override |
||||
public SessionSubscriptionRegistration getRegistration(String sessionId) { |
||||
return new CachingSessionSubscriptionRegistration(this.delegate.getRegistration(sessionId)); |
||||
} |
||||
|
||||
@Override |
||||
public SessionSubscriptionRegistration getOrCreateRegistration(String sessionId) { |
||||
return new CachingSessionSubscriptionRegistration(this.delegate.getOrCreateRegistration(sessionId)); |
||||
} |
||||
|
||||
@Override |
||||
public SessionSubscriptionRegistration removeRegistration(String sessionId) { |
||||
SessionSubscriptionRegistration registration = this.delegate.removeRegistration(sessionId); |
||||
if (registration != null) { |
||||
this.destinationCache.removeRegistration(registration); |
||||
} |
||||
return registration; |
||||
} |
||||
|
||||
@Override |
||||
public Set<String> getSessionSubscriptions(String sessionId, String destination) { |
||||
return this.delegate.getSessionSubscriptions(sessionId, destination); |
||||
} |
||||
|
||||
public Set<SessionSubscriptionRegistration> getRegistrationsByDestination(String destination) { |
||||
return this.destinationCache.getRegistrations(destination); |
||||
} |
||||
|
||||
|
||||
private static class DestinationCache { |
||||
|
||||
private final Map<String, Set<SessionSubscriptionRegistration>> cache = |
||||
new ConcurrentHashMap<String, Set<SessionSubscriptionRegistration>>(); |
||||
|
||||
private final Object monitor = new Object(); |
||||
|
||||
|
||||
public void mapRegistration(String destination, SessionSubscriptionRegistration registration) { |
||||
synchronized (monitor) { |
||||
Set<SessionSubscriptionRegistration> registrations = this.cache.get(destination); |
||||
if (registrations == null) { |
||||
registrations = new CopyOnWriteArraySet<SessionSubscriptionRegistration>(); |
||||
this.cache.put(destination, registrations); |
||||
} |
||||
registrations.add(registration); |
||||
} |
||||
} |
||||
|
||||
public void unmapRegistration(String destination, SessionSubscriptionRegistration registration) { |
||||
synchronized (monitor) { |
||||
Set<SessionSubscriptionRegistration> registrations = this.cache.get(destination); |
||||
if (registrations != null) { |
||||
registrations.remove(registration); |
||||
if (registrations.isEmpty()) { |
||||
this.cache.remove(destination); |
||||
} |
||||
} |
||||
} |
||||
} |
||||
|
||||
private void removeRegistration(SessionSubscriptionRegistration registration) { |
||||
for (String destination : registration.getDestinations()) { |
||||
unmapRegistration(destination, registration); |
||||
} |
||||
} |
||||
|
||||
public Set<SessionSubscriptionRegistration> getRegistrations(String destination) { |
||||
return this.cache.get(destination); |
||||
} |
||||
|
||||
@Override |
||||
public String toString() { |
||||
return "DestinationCache [cache=" + this.cache + "]"; |
||||
} |
||||
} |
||||
|
||||
private class CachingSessionSubscriptionRegistration implements SessionSubscriptionRegistration { |
||||
|
||||
private final SessionSubscriptionRegistration delegate; |
||||
|
||||
|
||||
public CachingSessionSubscriptionRegistration(SessionSubscriptionRegistration delegate) { |
||||
Assert.notNull(delegate, "delegate SessionSubscriptionRegistration is required"); |
||||
this.delegate = delegate; |
||||
} |
||||
|
||||
@Override |
||||
public String getSessionId() { |
||||
return this.delegate.getSessionId(); |
||||
} |
||||
|
||||
@Override |
||||
public void addSubscription(String destination, String subscriptionId) { |
||||
CachingSessionSubscriptionRegistry.this.destinationCache.mapRegistration(destination, this.delegate); |
||||
this.delegate.addSubscription(destination, subscriptionId); |
||||
} |
||||
|
||||
@Override |
||||
public String removeSubscription(String subscriptionId) { |
||||
String destination = this.delegate.removeSubscription(subscriptionId); |
||||
if (destination != null && this.delegate.getSubscriptionsByDestination(destination) == null) { |
||||
CachingSessionSubscriptionRegistry.this.destinationCache.unmapRegistration(destination, this); |
||||
} |
||||
return destination; |
||||
} |
||||
|
||||
@Override |
||||
public Set<String> getSubscriptionsByDestination(String destination) { |
||||
return this.delegate.getSubscriptionsByDestination(destination); |
||||
} |
||||
|
||||
@Override |
||||
public Set<String> getDestinations() { |
||||
return this.delegate.getDestinations(); |
||||
} |
||||
|
||||
@Override |
||||
public String toString() { |
||||
return "CachingSessionSubscriptionRegistration [delegate=" + delegate + "]"; |
||||
} |
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,99 @@
@@ -0,0 +1,99 @@
|
||||
/* |
||||
* Copyright 2002-2013 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.web.messaging.support; |
||||
|
||||
import java.util.HashMap; |
||||
import java.util.HashSet; |
||||
import java.util.Map; |
||||
import java.util.Set; |
||||
|
||||
import org.springframework.util.Assert; |
||||
import org.springframework.web.messaging.SessionSubscriptionRegistration; |
||||
|
||||
|
||||
/** |
||||
* A default implementation of SessionSubscriptionRegistration. Uses a map to keep track |
||||
* of subscriptions by destination. This implementation assumes that only one thread will |
||||
* access and update subscriptions at a time. |
||||
* |
||||
* @author Rossen Stoyanchev |
||||
* @since 4.0 |
||||
*/ |
||||
public class DefaultSessionSubscriptionRegistration implements SessionSubscriptionRegistration { |
||||
|
||||
private final String sessionId; |
||||
|
||||
// destination -> subscriptionIds
|
||||
private final Map<String, Set<String>> subscriptions = new HashMap<String, Set<String>>(4); |
||||
|
||||
|
||||
public DefaultSessionSubscriptionRegistration(String sessionId) { |
||||
Assert.notNull(sessionId, "sessionId is required"); |
||||
this.sessionId = sessionId; |
||||
} |
||||
|
||||
|
||||
public String getSessionId() { |
||||
return this.sessionId; |
||||
} |
||||
|
||||
@Override |
||||
public Set<String> getDestinations() { |
||||
return this.subscriptions.keySet(); |
||||
} |
||||
|
||||
@Override |
||||
public void addSubscription(String destination, String subscriptionId) { |
||||
Assert.hasText(destination, "destination must not be empty"); |
||||
Assert.hasText(subscriptionId, "subscriptionId must not be empty"); |
||||
Set<String> subs = this.subscriptions.get(destination); |
||||
if (subs == null) { |
||||
subs = new HashSet<String>(4); |
||||
this.subscriptions.put(destination, subs); |
||||
} |
||||
subs.add(subscriptionId); |
||||
} |
||||
|
||||
@Override |
||||
public String removeSubscription(String subscriptionId) { |
||||
Assert.hasText(subscriptionId, "subscriptionId must not be empty"); |
||||
for (String destination : this.subscriptions.keySet()) { |
||||
Set<String> subscriptionIds = this.subscriptions.get(destination); |
||||
if (subscriptionIds.remove(subscriptionId)) { |
||||
if (subscriptionIds.isEmpty()) { |
||||
this.subscriptions.remove(destination); |
||||
} |
||||
return destination; |
||||
} |
||||
} |
||||
return null; |
||||
} |
||||
|
||||
@Override |
||||
public Set<String> getSubscriptionsByDestination(String destination) { |
||||
Assert.hasText(destination, "destination must not be empty"); |
||||
return this.subscriptions.get(destination); |
||||
} |
||||
|
||||
|
||||
@Override |
||||
public String toString() { |
||||
return "DefaultSessionSubscriptionRegistration [sessionId=" + this.sessionId |
||||
+ ", subscriptions=" + this.subscriptions + "]"; |
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,66 @@
@@ -0,0 +1,66 @@
|
||||
/* |
||||
* Copyright 2002-2013 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.web.messaging.support; |
||||
|
||||
import java.util.Map; |
||||
import java.util.Set; |
||||
import java.util.concurrent.ConcurrentHashMap; |
||||
|
||||
import org.springframework.web.messaging.SessionSubscriptionRegistration; |
||||
import org.springframework.web.messaging.SessionSubscriptionRegistry; |
||||
|
||||
|
||||
/** |
||||
* A default implementation of SessionSubscriptionRegistry. |
||||
* |
||||
* @author Rossen Stoyanchev |
||||
* @since 4.0 |
||||
*/ |
||||
public class DefaultSessionSubscriptionRegistry implements SessionSubscriptionRegistry { |
||||
|
||||
// sessionId -> SessionSubscriptionRegistration
|
||||
private final Map<String, SessionSubscriptionRegistration> registrations = |
||||
new ConcurrentHashMap<String, SessionSubscriptionRegistration>(); |
||||
|
||||
|
||||
@Override |
||||
public SessionSubscriptionRegistration getRegistration(String sessionId) { |
||||
return this.registrations.get(sessionId); |
||||
} |
||||
|
||||
@Override |
||||
public SessionSubscriptionRegistration getOrCreateRegistration(String sessionId) { |
||||
SessionSubscriptionRegistration registration = this.registrations.get(sessionId); |
||||
if (registration == null) { |
||||
registration = new DefaultSessionSubscriptionRegistration(sessionId); |
||||
this.registrations.put(sessionId, registration); |
||||
} |
||||
return registration; |
||||
} |
||||
|
||||
@Override |
||||
public SessionSubscriptionRegistration removeRegistration(String sessionId) { |
||||
return this.registrations.remove(sessionId); |
||||
} |
||||
|
||||
@Override |
||||
public Set<String> getSessionSubscriptions(String sessionId, String destination) { |
||||
SessionSubscriptionRegistration registration = this.registrations.get(sessionId); |
||||
return (registration != null) ? registration.getSubscriptionsByDestination(destination) : null; |
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,142 @@
@@ -0,0 +1,142 @@
|
||||
/* |
||||
* Copyright 2002-2013 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.web.messaging.service; |
||||
|
||||
import java.util.Arrays; |
||||
|
||||
import org.junit.Before; |
||||
import org.junit.Test; |
||||
import org.mockito.ArgumentCaptor; |
||||
import org.mockito.Captor; |
||||
import org.mockito.Mock; |
||||
import org.mockito.MockitoAnnotations; |
||||
import org.springframework.messaging.Message; |
||||
import org.springframework.messaging.MessageChannel; |
||||
import org.springframework.messaging.support.MessageBuilder; |
||||
import org.springframework.web.messaging.MessageType; |
||||
import org.springframework.web.messaging.support.WebMessageHeaderAccesssor; |
||||
|
||||
import static org.junit.Assert.*; |
||||
import static org.mockito.Mockito.*; |
||||
|
||||
|
||||
/** |
||||
* |
||||
* @author Rossen Stoyanchev |
||||
* @since 4.0 |
||||
*/ |
||||
public class SimpleBrokerWebMessageHandlerTests { |
||||
|
||||
private AbstractWebMessageHandler messageHandler; |
||||
|
||||
@Mock |
||||
private MessageChannel clientChannel; |
||||
|
||||
@Captor |
||||
ArgumentCaptor<Message<?>> messageCaptor; |
||||
|
||||
|
||||
@Before |
||||
public void setup() { |
||||
MockitoAnnotations.initMocks(this); |
||||
this.messageHandler = new SimpleBrokerWebMessageHandler(this.clientChannel); |
||||
} |
||||
|
||||
|
||||
@Test |
||||
public void getSupportedMessageTypes() { |
||||
assertEquals(Arrays.asList(MessageType.MESSAGE, MessageType.SUBSCRIBE, MessageType.UNSUBSCRIBE), |
||||
this.messageHandler.getSupportedMessageTypes()); |
||||
} |
||||
|
||||
@Test |
||||
public void subcribePublish() { |
||||
|
||||
this.messageHandler.handleSubscribe(createSubscriptionMessage("sess1", "sub1", "/foo")); |
||||
this.messageHandler.handleSubscribe(createSubscriptionMessage("sess1", "sub2", "/foo")); |
||||
this.messageHandler.handleSubscribe(createSubscriptionMessage("sess1", "sub3", "/bar")); |
||||
|
||||
this.messageHandler.handleSubscribe(createSubscriptionMessage("sess2", "sub1", "/foo")); |
||||
this.messageHandler.handleSubscribe(createSubscriptionMessage("sess2", "sub2", "/foo")); |
||||
this.messageHandler.handleSubscribe(createSubscriptionMessage("sess2", "sub3", "/bar")); |
||||
|
||||
this.messageHandler.handlePublish(createMessage("/foo", "message1")); |
||||
this.messageHandler.handlePublish(createMessage("/bar", "message2")); |
||||
|
||||
verify(this.clientChannel, times(6)).send(this.messageCaptor.capture()); |
||||
assertCapturedMessage(this.messageCaptor.getAllValues().get(0), "sess1", "sub1", "/foo"); |
||||
assertCapturedMessage(this.messageCaptor.getAllValues().get(1), "sess1", "sub2", "/foo"); |
||||
assertCapturedMessage(this.messageCaptor.getAllValues().get(2), "sess2", "sub1", "/foo"); |
||||
assertCapturedMessage(this.messageCaptor.getAllValues().get(3), "sess2", "sub2", "/foo"); |
||||
assertCapturedMessage(this.messageCaptor.getAllValues().get(4), "sess1", "sub3", "/bar"); |
||||
assertCapturedMessage(this.messageCaptor.getAllValues().get(5), "sess2", "sub3", "/bar"); |
||||
} |
||||
|
||||
@Test |
||||
public void subcribeDisconnectPublish() { |
||||
|
||||
this.messageHandler.handleSubscribe(createSubscriptionMessage("sess1", "sub1", "/foo")); |
||||
this.messageHandler.handleSubscribe(createSubscriptionMessage("sess1", "sub2", "/foo")); |
||||
this.messageHandler.handleSubscribe(createSubscriptionMessage("sess1", "sub3", "/bar")); |
||||
|
||||
this.messageHandler.handleSubscribe(createSubscriptionMessage("sess2", "sub1", "/foo")); |
||||
this.messageHandler.handleSubscribe(createSubscriptionMessage("sess2", "sub2", "/foo")); |
||||
this.messageHandler.handleSubscribe(createSubscriptionMessage("sess2", "sub3", "/bar")); |
||||
|
||||
WebMessageHeaderAccesssor headers = WebMessageHeaderAccesssor.create(MessageType.DISCONNECT); |
||||
headers.setSessionId("sess1"); |
||||
Message<byte[]> message = MessageBuilder.withPayload(new byte[0]).copyHeaders(headers.toMap()).build(); |
||||
this.messageHandler.handleDisconnect(message); |
||||
|
||||
this.messageHandler.handlePublish(createMessage("/foo", "message1")); |
||||
this.messageHandler.handlePublish(createMessage("/bar", "message2")); |
||||
|
||||
verify(this.clientChannel, times(3)).send(this.messageCaptor.capture()); |
||||
assertCapturedMessage(this.messageCaptor.getAllValues().get(0), "sess2", "sub1", "/foo"); |
||||
assertCapturedMessage(this.messageCaptor.getAllValues().get(1), "sess2", "sub2", "/foo"); |
||||
assertCapturedMessage(this.messageCaptor.getAllValues().get(2), "sess2", "sub3", "/bar"); |
||||
} |
||||
|
||||
|
||||
protected Message<String> createSubscriptionMessage(String sessionId, String subcriptionId, String destination) { |
||||
|
||||
WebMessageHeaderAccesssor headers = WebMessageHeaderAccesssor.create(MessageType.SUBSCRIBE); |
||||
headers.setSubscriptionId(subcriptionId); |
||||
headers.setDestination(destination); |
||||
headers.setSessionId(sessionId); |
||||
|
||||
return MessageBuilder.withPayload("").copyHeaders(headers.toMap()).build(); |
||||
} |
||||
|
||||
protected Message<String> createMessage(String destination, String payload) { |
||||
|
||||
WebMessageHeaderAccesssor headers = WebMessageHeaderAccesssor.create(MessageType.MESSAGE); |
||||
headers.setDestination(destination); |
||||
|
||||
return MessageBuilder.withPayload(payload).copyHeaders(headers.toMap()).build(); |
||||
} |
||||
|
||||
protected void assertCapturedMessage(Message<?> message, String sessionId, |
||||
String subcriptionId, String destination) { |
||||
|
||||
WebMessageHeaderAccesssor headers = WebMessageHeaderAccesssor.wrap(message); |
||||
assertEquals(sessionId, headers.getSessionId()); |
||||
assertEquals(subcriptionId, headers.getSubscriptionId()); |
||||
assertEquals(destination, headers.getDestination()); |
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,75 @@
@@ -0,0 +1,75 @@
|
||||
/* |
||||
* Copyright 2002-2013 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.web.messaging.support; |
||||
|
||||
import java.util.Set; |
||||
|
||||
import org.junit.Before; |
||||
import org.junit.Test; |
||||
import org.springframework.web.messaging.SessionSubscriptionRegistration; |
||||
import org.springframework.web.messaging.SessionSubscriptionRegistry; |
||||
|
||||
import static org.junit.Assert.*; |
||||
|
||||
|
||||
/** |
||||
* Test fixture for {@link CachingSessionSubscriptionRegistry}. |
||||
* |
||||
* @author Rossen Stoyanchev |
||||
*/ |
||||
public class CachingSessionSubscriptionRegistryTests { |
||||
|
||||
private CachingSessionSubscriptionRegistry registry; |
||||
|
||||
|
||||
@Before |
||||
public void setup() { |
||||
SessionSubscriptionRegistry delegate = new DefaultSessionSubscriptionRegistry(); |
||||
this.registry = new CachingSessionSubscriptionRegistry(delegate); |
||||
} |
||||
|
||||
@Test |
||||
public void getRegistrationsByDestination() { |
||||
|
||||
SessionSubscriptionRegistration reg1 = this.registry.getOrCreateRegistration("sess1"); |
||||
reg1.addSubscription("/foo", "sub1"); |
||||
reg1.addSubscription("/foo", "sub1"); |
||||
|
||||
SessionSubscriptionRegistration reg2 = this.registry.getOrCreateRegistration("sess2"); |
||||
reg2.addSubscription("/foo", "sub1"); |
||||
reg2.addSubscription("/foo", "sub1"); |
||||
|
||||
Set<SessionSubscriptionRegistration> actual = this.registry.getRegistrationsByDestination("/foo"); |
||||
assertEquals(2, actual.size()); |
||||
assertTrue(actual.contains(reg1)); |
||||
assertTrue(actual.contains(reg2)); |
||||
|
||||
reg1.removeSubscription("sub1"); |
||||
reg1.removeSubscription("sub2"); |
||||
|
||||
actual = this.registry.getRegistrationsByDestination("/foo"); |
||||
assertEquals("Invalid set of registrations " + actual, 1, actual.size()); |
||||
assertTrue(actual.contains(reg2)); |
||||
|
||||
reg2.removeSubscription("sub1"); |
||||
reg2.removeSubscription("sub2"); |
||||
|
||||
actual = this.registry.getRegistrationsByDestination("/foo"); |
||||
assertNull("Unexpected registrations " + actual, actual); |
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,82 @@
@@ -0,0 +1,82 @@
|
||||
/* |
||||
* Copyright 2002-2013 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.web.messaging.support; |
||||
|
||||
import java.util.Set; |
||||
|
||||
import org.junit.Before; |
||||
import org.junit.Test; |
||||
|
||||
import static org.junit.Assert.*; |
||||
|
||||
|
||||
/** |
||||
* Test fixture for {@link DefaultSessionSubscriptionRegistration}. |
||||
* |
||||
* @author Rossen Stoyanchev |
||||
*/ |
||||
public class DefaultSessionSubscriptionRegistrationTests { |
||||
|
||||
private DefaultSessionSubscriptionRegistration registration; |
||||
|
||||
|
||||
@Before |
||||
public void setup() { |
||||
this.registration = new DefaultSessionSubscriptionRegistration("sess1"); |
||||
} |
||||
|
||||
@Test |
||||
public void addSubscriptions() { |
||||
this.registration.addSubscription("/foo", "sub1"); |
||||
this.registration.addSubscription("/foo", "sub2"); |
||||
this.registration.addSubscription("/bar", "sub3"); |
||||
this.registration.addSubscription("/bar", "sub4"); |
||||
|
||||
assertSet(this.registration.getSubscriptionsByDestination("/foo"), 2, "sub1", "sub2"); |
||||
assertSet(this.registration.getSubscriptionsByDestination("/bar"), 2, "sub3", "sub4"); |
||||
assertSet(this.registration.getDestinations(), 2, "/foo", "/bar"); |
||||
} |
||||
|
||||
@Test |
||||
public void removeSubscriptions() { |
||||
this.registration.addSubscription("/foo", "sub1"); |
||||
this.registration.addSubscription("/foo", "sub2"); |
||||
this.registration.addSubscription("/bar", "sub3"); |
||||
this.registration.addSubscription("/bar", "sub4"); |
||||
|
||||
assertEquals("/foo", this.registration.removeSubscription("sub1")); |
||||
assertEquals("/foo", this.registration.removeSubscription("sub2")); |
||||
|
||||
assertNull(this.registration.getSubscriptionsByDestination("/foo")); |
||||
assertSet(this.registration.getDestinations(), 1, "/bar"); |
||||
|
||||
assertEquals("/bar", this.registration.removeSubscription("sub3")); |
||||
assertEquals("/bar", this.registration.removeSubscription("sub4")); |
||||
|
||||
assertNull(this.registration.getSubscriptionsByDestination("/bar")); |
||||
assertSet(this.registration.getDestinations(), 0); |
||||
} |
||||
|
||||
|
||||
private void assertSet(Set<String> set, int size, String... elements) { |
||||
assertEquals("Wrong number of elements in " + set, size, set.size()); |
||||
for (String element : elements) { |
||||
assertTrue("Set does not contain element " + element, set.contains(element)); |
||||
} |
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,86 @@
@@ -0,0 +1,86 @@
|
||||
/* |
||||
* Copyright 2002-2013 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.web.messaging.support; |
||||
|
||||
import java.util.Set; |
||||
|
||||
import org.junit.Before; |
||||
import org.junit.Test; |
||||
import org.springframework.web.messaging.SessionSubscriptionRegistration; |
||||
|
||||
import static org.junit.Assert.*; |
||||
|
||||
|
||||
/** |
||||
* Test fixture for {@link DefaultSessionSubscriptionRegistry}. |
||||
* |
||||
* @author Rossen Stoyanchev |
||||
*/ |
||||
public class DefaultSessionSubscriptionRegistryTests { |
||||
|
||||
private DefaultSessionSubscriptionRegistry registry; |
||||
|
||||
|
||||
@Before |
||||
public void setup() { |
||||
this.registry = new DefaultSessionSubscriptionRegistry(); |
||||
} |
||||
|
||||
@Test |
||||
public void getRegistration() { |
||||
String sessionId = "sess1"; |
||||
assertNull(this.registry.getRegistration(sessionId)); |
||||
|
||||
this.registry.getOrCreateRegistration(sessionId); |
||||
assertNotNull(this.registry.getRegistration(sessionId)); |
||||
assertEquals(sessionId, this.registry.getRegistration(sessionId).getSessionId()); |
||||
} |
||||
|
||||
@Test |
||||
public void getOrCreateRegistration() { |
||||
String sessionId = "sess1"; |
||||
assertNull(this.registry.getRegistration(sessionId)); |
||||
|
||||
SessionSubscriptionRegistration registration = this.registry.getOrCreateRegistration(sessionId); |
||||
assertSame(registration, this.registry.getOrCreateRegistration(sessionId)); |
||||
} |
||||
|
||||
@Test |
||||
public void removeRegistration() { |
||||
String sessionId = "sess1"; |
||||
this.registry.getOrCreateRegistration(sessionId); |
||||
assertNotNull(this.registry.getRegistration(sessionId)); |
||||
assertEquals(sessionId, this.registry.getRegistration(sessionId).getSessionId()); |
||||
|
||||
this.registry.removeRegistration(sessionId); |
||||
assertNull(this.registry.getRegistration(sessionId)); |
||||
} |
||||
|
||||
@Test |
||||
public void getSessionSubscriptions() { |
||||
String sessionId = "sess1"; |
||||
SessionSubscriptionRegistration registration = this.registry.getOrCreateRegistration(sessionId); |
||||
registration.addSubscription("/foo", "sub1"); |
||||
registration.addSubscription("/foo", "sub2"); |
||||
|
||||
Set<String> subscriptions = this.registry.getSessionSubscriptions(sessionId, "/foo"); |
||||
assertEquals("Wrong number of subscriptions " + subscriptions, 2, subscriptions.size()); |
||||
assertTrue(subscriptions.contains("sub1")); |
||||
assertTrue(subscriptions.contains("sub2")); |
||||
} |
||||
|
||||
} |
||||
Loading…
Reference in new issue