diff --git a/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/AbstractMethodMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/AbstractMethodMessageHandler.java index da9f2ce76ca..401dbb18a88 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/AbstractMethodMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/AbstractMethodMessageHandler.java @@ -325,20 +325,16 @@ public abstract class AbstractMethodMessageHandler String destination = getDestination(message); if (destination == null) { - logger.trace("Ignoring message, no destination"); return; } String lookupDestination = getLookupDestination(destination); if (lookupDestination == null) { - if (logger.isTraceEnabled()) { - logger.trace("Ignoring message to destination=" + destination); - } return; } if (logger.isDebugEnabled()) { - logger.debug("Handling message, lookupDestination=" + lookupDestination); + logger.debug("Handling message to " + destination); } MessageHeaderAccessor headerAccessor = MessageHeaderAccessor.getMutableAccessor(message); @@ -397,7 +393,8 @@ public abstract class AbstractMethodMessageHandler Collections.sort(matches, comparator); if (logger.isTraceEnabled()) { - logger.trace("Found " + matches.size() + " matching mapping(s) for [" + lookupDestination + "] : " + matches); + logger.trace("Found " + matches.size() + " matching mapping(s) for [" + + lookupDestination + "] : " + matches); } Match bestMatch = matches.get(0); @@ -507,7 +504,7 @@ public abstract class AbstractMethodMessageHandler protected void handleNoMatch(Set ts, String lookupDestination, Message message) { if (logger.isDebugEnabled()) { - logger.debug("No matching method found"); + logger.debug("No matching method found."); } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/SimpAttributes.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/SimpAttributes.java index a0ed1a9816f..08d3eb774c3 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/SimpAttributes.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/SimpAttributes.java @@ -62,6 +62,22 @@ public class SimpAttributes { this.attributes = attributes; } + /** + * Extract the SiMP session attributes from the given message, wrap them in + * a {@link SimpAttributes} instance. + * @param message the message to extract session attributes from + */ + public static SimpAttributes fromMessage(Message message) { + Assert.notNull(message); + MessageHeaders headers = message.getHeaders(); + String sessionId = SimpMessageHeaderAccessor.getSessionId(headers); + Map sessionAttributes = SimpMessageHeaderAccessor.getSessionAttributes(headers); + if (sessionId == null || sessionAttributes == null) { + throw new IllegalStateException( + "Message does not contain SiMP session id or attributes: " + message); + } + return new SimpAttributes(sessionId, sessionAttributes); + } /** * Return the value for the attribute of the given name, if any. @@ -175,22 +191,4 @@ public class SimpAttributes { } } - - /** - * Extract the SiMP session attributes from the given message, wrap them in - * a {@link SimpAttributes} instance. - * @param message the message to extract session attributes from - */ - public static SimpAttributes fromMessage(Message message) { - Assert.notNull(message); - MessageHeaders headers = message.getHeaders(); - String sessionId = SimpMessageHeaderAccessor.getSessionId(headers); - Map sessionAttributes = SimpMessageHeaderAccessor.getSessionAttributes(headers); - if (sessionId == null || sessionAttributes == null) { - throw new IllegalStateException( - "Message does not contain SiMP session id or attributes: " + message); - } - return new SimpAttributes(sessionId, sessionAttributes); - } - } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/annotation/support/SendToMethodReturnValueHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/annotation/support/SendToMethodReturnValueHandler.java index 8265bbe62bf..510a66ba16c 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/annotation/support/SendToMethodReturnValueHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/annotation/support/SendToMethodReturnValueHandler.java @@ -133,9 +133,7 @@ public class SendToMethodReturnValueHandler implements HandlerMethodReturnValueH } @Override - public void handleReturnValue(Object returnValue, MethodParameter returnType, Message message) - throws Exception { - + public void handleReturnValue(Object returnValue, MethodParameter returnType, Message message) throws Exception { if (returnValue == null) { return; } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandler.java index f3194c917f2..0e82b56134b 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandler.java @@ -187,7 +187,7 @@ public class SimpAnnotationMethodMessageHandler extends AbstractMethodMessageHan * The configured Validator instance */ public Validator getValidator() { - return validator; + return this.validator; } /** @@ -315,25 +315,23 @@ public class SimpAnnotationMethodMessageHandler extends AbstractMethodMessageHan @Override protected SimpMessageMappingInfo getMappingForMethod(Method method, Class handlerType) { - MessageMapping typeAnnot = AnnotationUtils.findAnnotation(handlerType, MessageMapping.class); + MessageMapping typeAnnotation = AnnotationUtils.findAnnotation(handlerType, MessageMapping.class); MessageMapping messageAnnot = AnnotationUtils.findAnnotation(method, MessageMapping.class); if (messageAnnot != null) { SimpMessageMappingInfo result = createMessageMappingCondition(messageAnnot); - if (typeAnnot != null) { - result = createMessageMappingCondition(typeAnnot).combine(result); + if (typeAnnotation != null) { + result = createMessageMappingCondition(typeAnnotation).combine(result); } return result; } - - SubscribeMapping subsribeAnnot = AnnotationUtils.findAnnotation(method, SubscribeMapping.class); - if (subsribeAnnot != null) { - SimpMessageMappingInfo result = createSubscribeCondition(subsribeAnnot); - if (typeAnnot != null) { - result = createMessageMappingCondition(typeAnnot).combine(result); + SubscribeMapping subsribeAnnotation = AnnotationUtils.findAnnotation(method, SubscribeMapping.class); + if (subsribeAnnotation != null) { + SimpMessageMappingInfo result = createSubscribeCondition(subsribeAnnotation); + if (typeAnnotation != null) { + result = createMessageMappingCondition(typeAnnotation).combine(result); } return result; } - return null; } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/AbstractBrokerMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/AbstractBrokerMessageHandler.java index 04770c8af0e..b26a3802095 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/AbstractBrokerMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/AbstractBrokerMessageHandler.java @@ -43,12 +43,17 @@ public abstract class AbstractBrokerMessageHandler protected final Log logger = LogFactory.getLog(getClass()); + private final Collection destinationPrefixes; private ApplicationEventPublisher eventPublisher; private AtomicBoolean brokerAvailable = new AtomicBoolean(false); + private final BrokerAvailabilityEvent availableEvent = new BrokerAvailabilityEvent(true, this); + + private final BrokerAvailabilityEvent notAvailableEvent = new BrokerAvailabilityEvent(false, this); + private boolean autoStartup = true; private Object lifecycleMonitor = new Object(); @@ -128,12 +133,12 @@ public abstract class AbstractBrokerMessageHandler public final void start() { synchronized (this.lifecycleMonitor) { if (logger.isDebugEnabled()) { - logger.debug("Starting"); + logger.debug("Starting..."); } startInternal(); this.running = true; if (logger.isDebugEnabled()) { - logger.debug("Started"); + logger.debug("Started."); } } } @@ -145,12 +150,12 @@ public abstract class AbstractBrokerMessageHandler public final void stop() { synchronized (this.lifecycleMonitor) { if (logger.isDebugEnabled()) { - logger.debug("Stopping"); + logger.debug("Stopping..."); } stopInternal(); this.running = false; if (logger.isDebugEnabled()) { - logger.debug("Stopped"); + logger.debug("Stopped."); } } } @@ -170,7 +175,7 @@ public abstract class AbstractBrokerMessageHandler public final void handleMessage(Message message) { if (!this.running) { if (logger.isTraceEnabled()) { - logger.trace("Message broker is not running. Ignoring message=" + message); + logger.trace(this + " not running yet. Ignoring " + message); } return; } @@ -194,20 +199,20 @@ public abstract class AbstractBrokerMessageHandler protected void publishBrokerAvailableEvent() { boolean shouldPublish = this.brokerAvailable.compareAndSet(false, true); if (this.eventPublisher != null && shouldPublish) { - if (logger.isDebugEnabled()) { - logger.debug("Publishing BrokerAvailabilityEvent (available)"); + if (logger.isInfoEnabled()) { + logger.info("Publishing " + this.availableEvent); } - this.eventPublisher.publishEvent(new BrokerAvailabilityEvent(true, this)); + this.eventPublisher.publishEvent(this.availableEvent); } } protected void publishBrokerUnavailableEvent() { boolean shouldPublish = this.brokerAvailable.compareAndSet(true, false); if (this.eventPublisher != null && shouldPublish) { - if (logger.isDebugEnabled()) { - logger.debug("Publishing BrokerAvailabilityEvent (unavailable)"); + if (logger.isInfoEnabled()) { + logger.info("Publishing " + this.notAvailableEvent); } - this.eventPublisher.publishEvent(new BrokerAvailabilityEvent(false, this)); + this.eventPublisher.publishEvent(this.notAvailableEvent); } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/AbstractSubscriptionRegistry.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/AbstractSubscriptionRegistry.java index bc0cf0c1271..01fe8a271bf 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/AbstractSubscriptionRegistry.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/AbstractSubscriptionRegistry.java @@ -22,6 +22,7 @@ import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.simp.SimpMessageHeaderAccessor; import org.springframework.messaging.simp.SimpMessageType; +import org.springframework.util.Assert; import org.springframework.util.MultiValueMap; /** @@ -39,32 +40,24 @@ public abstract class AbstractSubscriptionRegistry implements SubscriptionRegist @Override public final void registerSubscription(Message message) { - MessageHeaders headers = message.getHeaders(); - SimpMessageType type = SimpMessageHeaderAccessor.getMessageType(headers); - - if (!SimpMessageType.SUBSCRIBE.equals(type)) { - logger.error("Expected SUBSCRIBE message: " + message); - return; - } + SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(headers); + Assert.isTrue(SimpMessageType.SUBSCRIBE.equals(messageType), "Expected SUBSCRIBE: " + message); String sessionId = SimpMessageHeaderAccessor.getSessionId(headers); if (sessionId == null) { - logger.error("Ignoring subscription. No sessionId in message: " + message); + logger.error("No sessionId in " + message); return; } String subscriptionId = SimpMessageHeaderAccessor.getSubscriptionId(headers); if (subscriptionId == null) { - logger.error("Ignoring subscription. No subscriptionId in message: " + message); + logger.error("No subscriptionId in " + message); return; } String destination = SimpMessageHeaderAccessor.getDestination(headers); if (destination == null) { - logger.error("Ignoring destination. No destination in message: " + message); + logger.error("No destination in " + message); return; } - if (logger.isDebugEnabled()) { - logger.debug("Adding subscription id=" + subscriptionId + ", destination=" + destination); - } addSubscriptionInternal(sessionId, subscriptionId, destination, message); } @@ -73,27 +66,19 @@ public abstract class AbstractSubscriptionRegistry implements SubscriptionRegist @Override public final void unregisterSubscription(Message message) { - MessageHeaders headers = message.getHeaders(); - SimpMessageType type = SimpMessageHeaderAccessor.getMessageType(headers); - - if (!SimpMessageType.UNSUBSCRIBE.equals(type)) { - logger.error("Expected UNSUBSCRIBE message: " + message); - return; - } + SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(headers); + Assert.isTrue(SimpMessageType.UNSUBSCRIBE.equals(messageType), "Expected UNSUBSCRIBE: " + message); String sessionId = SimpMessageHeaderAccessor.getSessionId(headers); if (sessionId == null) { - logger.error("Ignoring subscription. No sessionId in message: " + message); + logger.error("No sessionId in " + message); return; } String subscriptionId = SimpMessageHeaderAccessor.getSubscriptionId(headers); if (subscriptionId == null) { - logger.error("Ignoring subscription. No subscriptionId in message: " + message); + logger.error("No subscriptionId " + message); return; } - if (logger.isDebugEnabled()) { - logger.debug("Unubscribe request: " + message); - } removeSubscriptionInternal(sessionId, subscriptionId, message); } @@ -104,24 +89,15 @@ public abstract class AbstractSubscriptionRegistry implements SubscriptionRegist @Override public final MultiValueMap findSubscriptions(Message message) { - MessageHeaders headers = message.getHeaders(); SimpMessageType type = SimpMessageHeaderAccessor.getMessageType(headers); - - if (!SimpMessageType.MESSAGE.equals(type)) { - logger.trace("Ignoring message type " + type); - return null; - } + Assert.isTrue(SimpMessageType.MESSAGE.equals(type), "Unexpected message type: " + type); String destination = SimpMessageHeaderAccessor.getDestination(headers); if (destination == null) { - logger.trace("Ignoring message, no destination"); + logger.error("No destination in " + message); return null; } - MultiValueMap result = findSubscriptionsInternal(destination, message); - if (logger.isTraceEnabled()) { - logger.trace("Found " + result.size() + " subscriptions for destination=" + destination); - } - return result; + return findSubscriptionsInternal(destination, message); } protected abstract MultiValueMap findSubscriptionsInternal( diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/BrokerAvailabilityEvent.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/BrokerAvailabilityEvent.java index 24f836f8fed..77522321744 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/BrokerAvailabilityEvent.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/BrokerAvailabilityEvent.java @@ -50,7 +50,7 @@ public class BrokerAvailabilityEvent extends ApplicationEvent { @Override public String toString() { - return "BrokerAvailabilityEvent=" + this.brokerAvailable; + return "BrokerAvailabilityEvent[available=" + this.brokerAvailable + ", " + getSource() + "]"; } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistry.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistry.java index fa38761851b..a53495c6c5d 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistry.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistry.java @@ -106,9 +106,6 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry { public void unregisterAllSubscriptions(String sessionId) { SessionSubscriptionInfo info = this.subscriptionRegistry.removeSubscriptions(sessionId); if (info != null) { - if (logger.isDebugEnabled()) { - logger.debug("Unregistering subscriptions for sessionId=" + sessionId); - } this.destinationCache.updateAfterRemovedSession(info); } } @@ -137,8 +134,7 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry { @Override public String toString() { - return "[destinationCache=" + this.destinationCache + ", subscriptionRegistry=" - + this.subscriptionRegistry + "]"; + return "DefaultSubscriptionRegistry[" + this.destinationCache + ", " + this.subscriptionRegistry + "]"; } @@ -241,7 +237,7 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry { @Override public String toString() { - return "[cache=" + this.accessCache + "]"; + return "cache[" + this.accessCache.size() + " destination(s)]"; } } @@ -282,7 +278,7 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry { @Override public String toString() { - return "[sessions=" + sessions + "]"; + return "registry[" + sessions.size() + " session(s)]"; } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/SimpleBrokerMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/SimpleBrokerMessageHandler.java index c2b0038c631..c29978bb083 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/SimpleBrokerMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/SimpleBrokerMessageHandler.java @@ -64,12 +64,9 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler { SubscribableChannel brokerChannel, Collection destinationPrefixes) { super(destinationPrefixes); - Assert.notNull(clientInboundChannel, "'clientInboundChannel' must not be null"); Assert.notNull(clientOutboundChannel, "'clientOutboundChannel' must not be null"); Assert.notNull(brokerChannel, "'brokerChannel' must not be null"); - - this.clientInboundChannel = clientInboundChannel; this.clientOutboundChannel = clientOutboundChannel; this.brokerChannel = brokerChannel; @@ -139,7 +136,7 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler { if (!checkDestinationPrefix(destination)) { if (logger.isTraceEnabled()) { - logger.trace("Ignoring message to destination=" + destination); + logger.trace("No match on destination in " + message); } return; } @@ -147,16 +144,10 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler { if (SimpMessageType.MESSAGE.equals(messageType)) { sendMessageToSubscribers(destination, message); } - else if (SimpMessageType.SUBSCRIBE.equals(messageType)) { - this.subscriptionRegistry.registerSubscription(message); - } - else if (SimpMessageType.UNSUBSCRIBE.equals(messageType)) { - this.subscriptionRegistry.unregisterSubscription(message); - } - else if (SimpMessageType.DISCONNECT.equals(messageType)) { - this.subscriptionRegistry.unregisterAllSubscriptions(sessionId); - } else if (SimpMessageType.CONNECT.equals(messageType)) { + if (logger.isInfoEnabled()) { + logger.info("Handling CONNECT: " + message); + } SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create(SimpMessageType.CONNECT_ACK); initHeaders(accessor); accessor.setSessionId(sessionId); @@ -164,9 +155,27 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler { Message connectAck = MessageBuilder.createMessage(EMPTY_PAYLOAD, accessor.getMessageHeaders()); this.clientOutboundChannel.send(connectAck); } + else if (SimpMessageType.DISCONNECT.equals(messageType)) { + if (logger.isInfoEnabled()) { + logger.info("Handling DISCONNECT: " + message); + } + this.subscriptionRegistry.unregisterAllSubscriptions(sessionId); + } + else if (SimpMessageType.SUBSCRIBE.equals(messageType)) { + if (logger.isDebugEnabled()) { + logger.debug("Handling SUBSCRIBE: " + message); + } + this.subscriptionRegistry.registerSubscription(message); + } + else if (SimpMessageType.UNSUBSCRIBE.equals(messageType)) { + if (logger.isDebugEnabled()) { + logger.debug("Handling UNSUBSCRIBE: " + message); + } + this.subscriptionRegistry.unregisterSubscription(message); + } else { if (logger.isTraceEnabled()) { - logger.trace("Message type not supported. Ignoring: " + message); + logger.trace("Unsupported message type in " + message); } } } @@ -179,9 +188,8 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler { protected void sendMessageToSubscribers(String destination, Message message) { MultiValueMap subscriptions = this.subscriptionRegistry.findSubscriptions(message); - if ((subscriptions.size() > 0) && logger.isDebugEnabled()) { - logger.debug("Sending message with destination=" + destination - + " to " + subscriptions.size() + " subscriber(s)"); + if ((subscriptions.size() > 0) && logger.isTraceEnabled()) { + logger.trace("Sending to " + subscriptions.size() + " subscriber(s): " + message); } for (String sessionId : subscriptions.keySet()) { for (String subscriptionId : subscriptions.get(sessionId)) { @@ -196,10 +204,15 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler { this.clientOutboundChannel.send(reply); } catch (Throwable ex) { - logger.error("Failed to send message=" + message, ex); + logger.error("Failed to send " + message, ex); } } } } + @Override + public String toString() { + return "SimpleBroker[" + this.subscriptionRegistry + "]"; + } + } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java index eed1f76ba6d..ec07d15bfdc 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java @@ -327,6 +327,13 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler return this.tcpClient; } + /** + * Return the current count of TCP connection to the broker. + */ + public int getConnectionCount() { + return this.connectionHandlers.size(); + } + /** * Configure a {@link MessageHeaderInitializer} to apply to the headers of all * messages created through the {@code StompBrokerRelayMessageHandler} that @@ -359,8 +366,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler this.tcpClient = new StompTcpClientFactory().create(this.relayHost, this.relayPort, codec); } - if (logger.isDebugEnabled()) { - logger.debug("Initializing \"system\" connection"); + if (logger.isInfoEnabled()) { + logger.info("Connecting \"system\" session to " + this.relayHost + ":" + this.relayPort); } StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT); @@ -388,7 +395,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler this.tcpClient.shutdown().get(5000, TimeUnit.MILLISECONDS); } catch (Throwable t) { - logger.error("Error while shutting down TCP client", t); + logger.error("Error in shutdown of TCP client", t); } } @@ -399,14 +406,15 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler if (!isBrokerAvailable()) { if (sessionId == null || SystemStompConnectionHandler.SESSION_ID.equals(sessionId)) { - throw new MessageDeliveryException("Message broker is not active."); + throw new MessageDeliveryException("Message broker not active. Consider subscribing to " + + "receive BrokerAvailabilityEvent's from an ApplicationListener Spring bean."); } SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(message.getHeaders()); if (messageType.equals(SimpMessageType.CONNECT) && logger.isErrorEnabled()) { - logger.error("Message broker is not active. Ignoring: " + message); + logger.error("Broker not active. Ignoring " + message); } else if (logger.isDebugEnabled()) { - logger.debug("Message broker is not active. Ignoring: " + message); + logger.debug("Broker not active. Ignoring " + message); } return; } @@ -416,8 +424,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class); if (accessor == null) { - logger.error("No header accessor, please use SimpMessagingTemplate. Ignoring: " + message); - return; + throw new IllegalStateException( + "No header accessor (not using the SimpMessagingTemplate?): " + message); } else if (accessor instanceof StompHeaderAccessor) { stompAccessor = (StompHeaderAccessor) accessor; @@ -431,14 +439,13 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } } else { - // Should not happen - logger.error("Unexpected header accessor type: " + accessor + ". Ignoring: " + message); - return; + throw new IllegalStateException( + "Unexpected header accessor type " + accessor.getClass() + " in " + message); } if (sessionId == null) { if (!SimpMessageType.MESSAGE.equals(stompAccessor.getMessageType())) { - logger.error("Only STOMP SEND frames supported on \"system\" connection. Ignoring: " + message); + logger.error("Only STOMP SEND supported from within the server side. Ignoring " + message); return; } sessionId = SystemStompConnectionHandler.SESSION_ID; @@ -448,18 +455,14 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler String destination = stompAccessor.getDestination(); if ((command != null) && command.requiresDestination() && !checkDestinationPrefix(destination)) { if (logger.isTraceEnabled()) { - logger.trace("Ignoring message to destination=" + destination); + logger.trace("No match on destination. Ignoring " + message); } return; } - if (logger.isTraceEnabled()) { - logger.trace("Processing message=" + message); - } - if (StompCommand.CONNECT.equals(command)) { if (logger.isDebugEnabled()) { - logger.debug("Processing CONNECT (total connected=" + this.connectionHandlers.size() + ")"); + logger.debug("STOMP CONNECT in session " + sessionId + " (" + getConnectionCount() + " connections)."); } stompAccessor = (stompAccessor.isMutable() ? stompAccessor : StompHeaderAccessor.wrap(message)); stompAccessor.setLogin(this.clientLogin); @@ -474,8 +477,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler else if (StompCommand.DISCONNECT.equals(command)) { StompConnectionHandler handler = this.connectionHandlers.get(sessionId); if (handler == null) { - if (logger.isTraceEnabled()) { - logger.trace("Connection already removed for sessionId '" + sessionId + "'"); + if (logger.isDebugEnabled()) { + logger.debug("Ignoring DISCONNECT in session " + sessionId + ". Connection already cleaned up."); } return; } @@ -484,8 +487,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler else { StompConnectionHandler handler = this.connectionHandlers.get(sessionId); if (handler == null) { - if (logger.isWarnEnabled()) { - logger.warn("Connection for sessionId '" + sessionId + "' not found. Ignoring message"); + if (logger.isDebugEnabled()) { + logger.debug("No TCP connection for session " + sessionId + " in " + message); } return; } @@ -493,6 +496,12 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } } + @Override + public String toString() { + return "StompBrokerRelay[broker=" + this.relayHost + ":" + this.relayPort + + ", " + getConnectionCount() + " connection(s)]"; + } + private class StompConnectionHandler implements TcpConnectionHandler { @@ -511,15 +520,12 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler this(sessionId, connectHeaders, true); } - private StompConnectionHandler(String sessionId, StompHeaderAccessor connectHeaders, - boolean isRemoteClientSession) { - - Assert.notNull(sessionId, "SessionId must not be null"); - Assert.notNull(connectHeaders, "ConnectHeaders must not be null"); - + private StompConnectionHandler(String sessionId, StompHeaderAccessor connectHeaders, boolean isClientSession) { + Assert.notNull(sessionId, "'sessionId' must not be null"); + Assert.notNull(connectHeaders, "'connectHeaders' must not be null"); this.sessionId = sessionId; this.connectHeaders = connectHeaders; - this.isRemoteClientSession = isRemoteClientSession; + this.isRemoteClientSession = isClientSession; } public String getSessionId() { @@ -528,8 +534,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @Override public void afterConnected(TcpConnection connection) { - if (logger.isDebugEnabled()) { - logger.debug("Established TCP connection to broker in session '" + this.sessionId + "'"); + if (logger.isInfoEnabled()) { + logger.info("TCP connection established. Forwarding: " + this.connectHeaders); } this.tcpConnection = connection; connection.send(MessageBuilder.createMessage(EMPTY_PAYLOAD, this.connectHeaders.getMessageHeaders())); @@ -537,19 +543,19 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @Override public void afterConnectFailure(Throwable ex) { - handleTcpConnectionFailure("Failed to connect to message broker", ex); + handleTcpConnectionFailure("failed to establish TCP connection in session " + this.sessionId, ex); } /** * Invoked when any TCP connectivity issue is detected, i.e. failure to establish - * the TCP connection, failure to send a message, missed heartbeat. + * the TCP connection, failure to send a message, missed heartbeat, etc. */ - protected void handleTcpConnectionFailure(String errorMessage, Throwable ex) { + protected void handleTcpConnectionFailure(String error, Throwable ex) { if (logger.isErrorEnabled()) { - logger.error(errorMessage + ", sessionId '" + this.sessionId + "'", ex); + logger.error("TCP connection failure in session " + this.sessionId + ": " + error, ex); } try { - sendStompErrorToClient(errorMessage); + sendStompErrorFrameToClient(error); } finally { try { @@ -557,13 +563,14 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } catch (Throwable t) { if (logger.isErrorEnabled()) { - logger.error("Failed to close connection: " + t.getMessage()); + logger.error("Failure while cleaning up state for TCP connection" + + " in session " + this.sessionId, t); } } } } - private void sendStompErrorToClient(String errorText) { + private void sendStompErrorFrameToClient(String errorText) { if (this.isRemoteClientSession) { StompHeaderAccessor headerAccessor = StompHeaderAccessor.create(StompCommand.ERROR); if (getHeaderInitializer() != null) { @@ -584,24 +591,21 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @Override public void handleMessage(Message message) { - - StompHeaderAccessor headerAccessor = - MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); - + StompHeaderAccessor headerAccessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); headerAccessor.setSessionId(this.sessionId); - if (headerAccessor.isHeartbeat()) { - logger.trace("Received broker heartbeat"); - } - else if (logger.isDebugEnabled()) { - logger.debug("Received message from broker in session '" + this.sessionId + "'"); + if (StompCommand.CONNECTED.equals(headerAccessor.getCommand())) { + if (logger.isInfoEnabled()) { + logger.info("Received STOMP CONNECTED: " + headerAccessor); + } + afterStompConnected(headerAccessor); } - else if (logger.isErrorEnabled() && StompCommand.ERROR == headerAccessor.getCommand()) { + else if (StompCommand.ERROR.equals(headerAccessor.getCommand()) && logger.isErrorEnabled()) { logger.error("Received STOMP ERROR: " + message); } - - if (StompCommand.CONNECTED == headerAccessor.getCommand()) { - afterStompConnected(headerAccessor); + else if (logger.isTraceEnabled()) { + logger.trace(headerAccessor.isHeartbeat() ? + "Received heartbeat in session " + this.sessionId : "Received " + headerAccessor); } headerAccessor.setImmutable(); @@ -618,12 +622,9 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } private void initHeartbeats(StompHeaderAccessor connectedHeaders) { - - // Remote clients do their own heartbeat management if (this.isRemoteClientSession) { return; } - long clientSendInterval = this.connectHeaders.getHeartbeat()[0]; long clientReceiveInterval = this.connectHeaders.getHeartbeat()[1]; @@ -640,7 +641,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler conn.send(HEARTBEAT_MESSAGE).addCallback( new ListenableFutureCallback() { public void onFailure(Throwable t) { - handleTcpConnectionFailure("Failed to send heartbeat", t); + String error = "failed to forward heartbeat in \"system\" session."; + handleTcpConnectionFailure(error, t); } public void onSuccess(Void result) {} }); @@ -648,25 +650,25 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } }, interval); } - if (clientReceiveInterval > 0 && serverSendInterval > 0) { final long interval = Math.max(clientReceiveInterval, serverSendInterval) * HEARTBEAT_MULTIPLIER; this.tcpConnection.onReadInactivity(new Runnable() { @Override public void run() { - handleTcpConnectionFailure("No hearbeat from broker for more than " + - interval + "ms, closing connection", null); + handleTcpConnectionFailure("no messages received for more than " + interval + " ms.", null); } - }, interval); + }, interval); } } @Override - public void handleFailure(Throwable ex) { - if (this.tcpConnection == null) { - return; + public void handleFailure(Throwable failure) { + if (this.tcpConnection != null) { + handleTcpConnectionFailure("transport failure.", failure); + } + else if (logger.isErrorEnabled()) { + logger.error("Transport failure: " + failure); } - handleTcpConnectionFailure("Closing connection after TCP failure", ex); } @Override @@ -675,17 +677,19 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler return; } try { - if (logger.isDebugEnabled()) { - logger.debug("TCP connection to broker closed in session '" + this.sessionId + "'"); + if (logger.isInfoEnabled()) { + logger.info("TCP connection to broker closed in session " + this.sessionId); } - sendStompErrorToClient("Connection to broker closed"); + sendStompErrorFrameToClient("Connection to broker closed."); } finally { try { + // Prevent clearConnection() from trying to close + this.tcpConnection = null; clearConnection(); } catch (Throwable t) { - // Ignore + // Shouldn't happen with connection reset beforehand } } } @@ -697,7 +701,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler * received the STOMP CONNECTED frame. For client messages this should be * false only if we lose the TCP connection around the same time when a * client message is being forwarded, so we simply log the ignored message - * at trace level. For messages from within the application being sent on + * at debug level. For messages from within the application being sent on * the "system" connection an exception is raised so that components sending * the message have a chance to handle it -- by default the broker message * channel is synchronous. @@ -714,78 +718,77 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler * @return a future to wait for the result */ @SuppressWarnings("unchecked") - public ListenableFuture forward(Message message, final StompHeaderAccessor headerAccessor) { + public ListenableFuture forward(Message message, final StompHeaderAccessor accessor) { TcpConnection conn = this.tcpConnection; if (!this.isStompConnected) { if (this.isRemoteClientSession) { if (logger.isDebugEnabled()) { - logger.debug("Ignoring client message received " + message + - (conn != null ? "before CONNECTED frame" : "after TCP connection closed")); + logger.debug("TCP connection closed already, ignoring " + message); } return EMPTY_TASK; } else { - throw new IllegalStateException("Cannot forward messages on system connection " + - (conn != null ? "before STOMP CONNECTED frame" : "while inactive") + - ". Try listening for BrokerAvailabilityEvent ApplicationContext events."); - + throw new IllegalStateException("Cannot forward messages " + + (conn != null ? "before STOMP CONNECTED. " : "while inactive. ") + + "Consider subscribing to receive BrokerAvailabilityEvent's from " + + "an ApplicationListener Spring bean. Dropped " + message); } } - if (logger.isDebugEnabled()) { - if (headerAccessor.isHeartbeat()) { - logger.trace("Forwarding heartbeat to broker"); - } - else { - logger.debug("Forwarding message to broker"); - } - } + final Message messageToSend = (accessor.isMutable() && accessor.isModified()) ? + MessageBuilder.createMessage(message.getPayload(), accessor.getMessageHeaders()) : message; - if (headerAccessor.isMutable() && headerAccessor.isModified()) { - message = MessageBuilder.createMessage(message.getPayload(), headerAccessor.getMessageHeaders()); + StompCommand command = accessor.getCommand(); + if (accessor.isHeartbeat()) { + logger.trace("Forwarding heartbeat in session " + this.sessionId); + } + else if (StompCommand.SUBSCRIBE.equals(command) && logger.isDebugEnabled()) { + logger.debug("Forwarding SUBSCRIBE: " + messageToSend); + } + else if (StompCommand.UNSUBSCRIBE.equals(command) && logger.isDebugEnabled()) { + logger.debug("Forwarding UNSUBSCRIBE: " + messageToSend); + } + else if (StompCommand.DISCONNECT.equals(command) && logger.isInfoEnabled()) { + logger.info("Forwarding DISCONNECT: " + messageToSend); + } + else if (logger.isTraceEnabled()) { + logger.trace("Forwarding " + command + ": " + messageToSend); } - ListenableFuture future = conn.send((Message) message); - + ListenableFuture future = conn.send((Message) messageToSend); future.addCallback(new ListenableFutureCallback() { @Override public void onSuccess(Void result) { - if (headerAccessor.getCommand() == StompCommand.DISCONNECT) { + if (accessor.getCommand() == StompCommand.DISCONNECT) { clearConnection(); } } @Override public void onFailure(Throwable t) { - if (tcpConnection == null) { - // already reset + if (tcpConnection != null) { + handleTcpConnectionFailure("failed to forward " + messageToSend, t); } - else { - handleTcpConnectionFailure("Failed to send message " + headerAccessor, t); + else if (logger.isErrorEnabled()) { + logger.error("Failed to forward " + messageToSend); } } }); - return future; } /** - * Close the TCP connection to the broker and release the connection reference, - * Any exception arising from closing the connection is propagated. The caller - * must handle and log the exception accordingly. - * - *

If the connection belongs to a client session, the connection handler - * for the session (basically the current instance) is also released from the - * {@link org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler}. + * Clean up state associated with the connection and close it. + * Any exception arising from closing the connection are propagated. */ public void clearConnection() { + if (logger.isDebugEnabled()) { + logger.debug("Cleaning up connection state for session " + sessionId + " (" + + (getConnectionCount() - 1) + " remaining connections)."); + } if (this.isRemoteClientSession) { - if (logger.isDebugEnabled()) { - logger.debug("Removing session '" + sessionId + "' (total remaining=" + - (StompBrokerRelayMessageHandler.this.connectionHandlers.size() - 1) + ")"); - } StompBrokerRelayMessageHandler.this.connectionHandlers.remove(this.sessionId); } @@ -794,13 +797,16 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler TcpConnection conn = this.tcpConnection; this.tcpConnection = null; if (conn != null) { + if (logger.isInfoEnabled()) { + logger.info("Closing TCP connection in session " + this.sessionId); + } conn.close(); } } @Override public String toString() { - return "StompConnectionHandler{" + "sessionId=" + this.sessionId + "}"; + return "StompConnectionHandler[sessionId=" + this.sessionId + "]"; } } @@ -820,8 +826,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } @Override - protected void handleTcpConnectionFailure(String errorMessage, Throwable t) { - super.handleTcpConnectionFailure(errorMessage, t); + protected void handleTcpConnectionFailure(String error, Throwable t) { + super.handleTcpConnectionFailure(error, t); publishBrokerUnavailableEvent(); } @@ -832,9 +838,9 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } @Override - public ListenableFuture forward(Message message, StompHeaderAccessor headerAccessor) { + public ListenableFuture forward(Message message, StompHeaderAccessor accessor) { try { - ListenableFuture future = super.forward(message, headerAccessor); + ListenableFuture future = super.forward(message, accessor); future.get(); return future; } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompConversionException.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompConversionException.java index 1ef03d77785..aae88e7100d 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompConversionException.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompConversionException.java @@ -18,6 +18,8 @@ package org.springframework.messaging.simp.stomp; import org.springframework.core.NestedRuntimeException; /** + * Raised after a failure to encode or decode a STOMP message. + * * @author Gary Russell * @since 4.0 */ diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompDecoder.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompDecoder.java index 3afbee28e25..d9d46ba3817 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompDecoder.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompDecoder.java @@ -156,13 +156,13 @@ public class StompDecoder { headerAccessor.updateSimpMessageHeadersFromStompHeaders(); headerAccessor.setLeaveMutable(true); decodedMessage = MessageBuilder.createMessage(payload, headerAccessor.getMessageHeaders()); - if (logger.isDebugEnabled()) { - logger.debug("Decoded " + decodedMessage); + if (logger.isTraceEnabled()) { + logger.trace("Decoded " + decodedMessage); } } else { if (logger.isTraceEnabled()) { - logger.trace("Received incomplete frame. Resetting buffer."); + logger.trace("Incomplete frame, resetting input buffer."); } if (headers != null && headerAccessor != null) { String name = NativeMessageHeaderAccessor.NATIVE_HEADERS; @@ -177,7 +177,7 @@ public class StompDecoder { } else { if (logger.isTraceEnabled()) { - logger.trace("Decoded heartbeat"); + logger.trace("Decoded heartbeat."); } StompHeaderAccessor headerAccessor = StompHeaderAccessor.createForHeartbeat(); initHeaders(headerAccessor); @@ -224,8 +224,8 @@ public class StompDecoder { int colonIndex = header.indexOf(':'); if ((colonIndex <= 0) || (colonIndex == header.length() - 1)) { if (buffer.remaining() > 0) { - throw new StompConversionException( - "Illegal header: '" + header + "'. A header must be of the form :"); + throw new StompConversionException("Illegal header: '" + header + + "'. A header must be of the form :."); } } else { diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompEncoder.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompEncoder.java index d097d98dee2..3a311e3a4c8 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompEncoder.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompEncoder.java @@ -91,7 +91,7 @@ public final class StompEncoder { return baos.toByteArray(); } catch (IOException e) { - throw new StompConversionException("Failed to encode STOMP frame", e); + throw new StompConversionException("Failed to encode STOMP frame, headers=" + headers + ".", e); } } @@ -102,8 +102,8 @@ public final class StompEncoder { Map> nativeHeaders = (Map>) headers.get(NativeMessageHeaderAccessor.NATIVE_HEADERS); - if (logger.isDebugEnabled()) { - logger.debug("Encoding STOMP " + command + ", headers=" + nativeHeaders); + if (logger.isTraceEnabled()) { + logger.trace("Encoding STOMP " + command + ", headers=" + nativeHeaders + "."); } if (nativeHeaders == null) { diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/user/DefaultUserDestinationResolver.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/user/DefaultUserDestinationResolver.java index 1ab47d3300a..23d99969c14 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/user/DefaultUserDestinationResolver.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/user/DefaultUserDestinationResolver.java @@ -100,13 +100,11 @@ public class DefaultUserDestinationResolver implements UserDestinationResolver { @Override public UserDestinationResult resolveDestination(Message message) { - String destination = SimpMessageHeaderAccessor.getDestination(message.getHeaders()); DestinationInfo info = parseUserDestination(message); if (info == null) { return null; } - Set resolved = new HashSet(); for (String sessionId : info.getSessionIds()) { String d = getTargetDestination(destination, info.getDestinationWithoutPrefix(), sessionId, info.getUser()); @@ -114,12 +112,10 @@ public class DefaultUserDestinationResolver implements UserDestinationResolver { resolved.add(d); } } - return new UserDestinationResult(destination, resolved, info.getSubscribeDestination(), info.getUser()); } private DestinationInfo parseUserDestination(Message message) { - MessageHeaders headers = message.getHeaders(); SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(headers); String destination = SimpMessageHeaderAccessor.getDestination(headers); @@ -131,12 +127,13 @@ public class DefaultUserDestinationResolver implements UserDestinationResolver { String user; Set sessionIds; + if (destination == null || !checkDestination(destination, this.destinationPrefix)) { + return null; + } + if (SimpMessageType.SUBSCRIBE.equals(messageType) || SimpMessageType.UNSUBSCRIBE.equals(messageType)) { - if (!checkDestination(destination, this.destinationPrefix)) { - return null; - } if (sessionId == null) { - logger.error("Ignoring message, no session id available"); + logger.error("No session id. Ignoring " + message); return null; } destinationWithoutPrefix = destination.substring(this.destinationPrefix.length()-1); @@ -145,9 +142,6 @@ public class DefaultUserDestinationResolver implements UserDestinationResolver { sessionIds = Collections.singleton(sessionId); } else if (SimpMessageType.MESSAGE.equals(messageType)) { - if (!checkDestination(destination, this.destinationPrefix)) { - return null; - } int startIndex = this.destinationPrefix.length(); int endIndex = destination.indexOf('/', startIndex); Assert.isTrue(endIndex > 0, "Expected destination pattern \"/user/{userId}/**\""); @@ -160,27 +154,13 @@ public class DefaultUserDestinationResolver implements UserDestinationResolver { Collections.singleton(sessionId) : this.userSessionRegistry.getSessionIds(user)); } else { - if (logger.isTraceEnabled()) { - logger.trace("Ignoring " + messageType + " message"); - } return null; } - return new DestinationInfo(destinationWithoutPrefix, subscribeDestination, user, sessionIds); } protected boolean checkDestination(String destination, String requiredPrefix) { - if (destination == null) { - logger.trace("Ignoring message, no destination"); - return false; - } - if (!destination.startsWith(requiredPrefix)) { - if (logger.isTraceEnabled()) { - logger.trace("Ignoring message to " + destination + ", not a \"user\" destination"); - } - return false; - } - return true; + return destination.startsWith(requiredPrefix); } /** @@ -237,6 +217,12 @@ public class DefaultUserDestinationResolver implements UserDestinationResolver { public Set getSessionIds() { return this.sessionIds; } + + @Override + public String toString() { + return "DestinationInfo[destination=" + this.destinationWithoutPrefix + ", subscribeDestination=" + + this.subscribeDestination + ", user=" + this.user + ", sessionIds=" + this.sessionIds + "]"; + } } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/user/UserDestinationMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/user/UserDestinationMessageHandler.java index efe3e1ba479..cd36efa5c45 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/user/UserDestinationMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/user/UserDestinationMessageHandler.java @@ -171,8 +171,8 @@ public class UserDestinationMessageHandler implements MessageHandler, SmartLifec } Set destinations = result.getTargetDestinations(); if (destinations.isEmpty()) { - if (logger.isTraceEnabled()) { - logger.trace("No target destinations, message=" + message); + if (logger.isDebugEnabled()) { + logger.debug("Use destination not resolved (no active sessions?): " + message); } return; } @@ -184,8 +184,8 @@ public class UserDestinationMessageHandler implements MessageHandler, SmartLifec message = MessageBuilder.createMessage(message.getPayload(), headerAccessor.getMessageHeaders()); } for (String destination : destinations) { - if (logger.isDebugEnabled()) { - logger.debug("Sending message to resolved destination=" + destination); + if (logger.isTraceEnabled()) { + logger.trace("Sending message with resolved user destination: " + message); } this.brokerMessagingTemplate.send(destination, message); } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/user/UserDestinationResult.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/user/UserDestinationResult.java index d73a6b9457b..d9504c9a47b 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/user/UserDestinationResult.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/user/UserDestinationResult.java @@ -91,4 +91,10 @@ public class UserDestinationResult { public String getUser() { return this.user; } + + @Override + public String toString() { + return "UserDestinationResult[source=" + this.sourceDestination + ", target=" + this.targetDestinations + + ", subscribeDestination=" + this.subscribeDestination + ", user=" + this.user + "]"; + } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/AbstractMessageChannel.java b/spring-messaging/src/main/java/org/springframework/messaging/support/AbstractMessageChannel.java index a7a9dee4db8..70dfb407c7b 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/support/AbstractMessageChannel.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/AbstractMessageChannel.java @@ -100,18 +100,14 @@ public abstract class AbstractMessageChannel implements MessageChannel, BeanName @Override public final boolean send(Message message, long timeout) { - Assert.notNull(message, "Message must not be null"); - if (logger.isTraceEnabled()) { - logger.trace("[" + this.beanName + "] sending message=" + message); + logger.trace(this + " sending " + message); } - message = this.interceptorChain.preSend(message, this); if (message == null) { return false; } - try { boolean sent = sendInternal(message, timeout); this.interceptorChain.postSend(message, this, sent); @@ -121,8 +117,7 @@ public abstract class AbstractMessageChannel implements MessageChannel, BeanName if (e instanceof MessagingException) { throw (MessagingException) e; } - throw new MessageDeliveryException(message, - "Failed to send message to channel '" + this.getBeanName() + "'", e); + throw new MessageDeliveryException(message,"Failed to send message to " + this, e); } } @@ -131,7 +126,7 @@ public abstract class AbstractMessageChannel implements MessageChannel, BeanName @Override public String toString() { - return "MessageChannel [name=" + this.beanName + "]"; + return "MessageChannel[name=" + this.beanName + "]"; } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/AbstractSubscribableChannel.java b/spring-messaging/src/main/java/org/springframework/messaging/support/AbstractSubscribableChannel.java index 19ff6357924..82cb9e4cc67 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/support/AbstractSubscribableChannel.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/AbstractSubscribableChannel.java @@ -47,7 +47,7 @@ public abstract class AbstractSubscribableChannel extends AbstractMessageChannel boolean result = this.handlers.add(handler); if (result) { if (logger.isDebugEnabled()) { - logger.debug("[" + getBeanName() + "] subscribed " + handler); + logger.debug(this + " subscribed " + handler); } } return result; @@ -58,7 +58,7 @@ public abstract class AbstractSubscribableChannel extends AbstractMessageChannel boolean result = this.handlers.remove(handler); if (result) { if (logger.isDebugEnabled()) { - logger.debug("[" + getBeanName() + "] unsubscribed " + handler); + logger.debug(this + " unsubscribed " + handler); } } return result; diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/MessageHeaderAccessor.java b/spring-messaging/src/main/java/org/springframework/messaging/support/MessageHeaderAccessor.java index 68193fa58ca..3b0b5102574 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/support/MessageHeaderAccessor.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/MessageHeaderAccessor.java @@ -471,7 +471,7 @@ public class MessageHeaderAccessor { @Override public String toString() { - return getClass().getSimpleName() + " [headers=" + this.headers + "]"; + return getClass().getSimpleName() + "[headers=" + this.headers + "]"; } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/adapter/AbstractWebSocketSession.java b/spring-websocket/src/main/java/org/springframework/web/socket/adapter/AbstractWebSocketSession.java index 2d8f4e16365..d2be486a282 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/adapter/AbstractWebSocketSession.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/adapter/AbstractWebSocketSession.java @@ -133,8 +133,8 @@ public abstract class AbstractWebSocketSession implements NativeWebSocketSess @Override public final void close(CloseStatus status) throws IOException { checkNativeSessionInitialized(); - if (logger.isDebugEnabled()) { - logger.debug("Closing " + this); + if (logger.isInfoEnabled()) { + logger.info("Closing " + this); } closeInternal(status); } @@ -144,7 +144,7 @@ public abstract class AbstractWebSocketSession implements NativeWebSocketSess @Override public String toString() { - return "WebSocket session id=" + getId(); + return this.getClass().getSimpleName() + "[id=" + getId() + "]"; } } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/handler/LoggingWebSocketHandlerDecorator.java b/spring-websocket/src/main/java/org/springframework/web/socket/handler/LoggingWebSocketHandlerDecorator.java index cfdbba34ffc..2c081bb625e 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/handler/LoggingWebSocketHandlerDecorator.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/handler/LoggingWebSocketHandlerDecorator.java @@ -50,7 +50,7 @@ public class LoggingWebSocketHandlerDecorator extends WebSocketHandlerDecorator @Override public void handleMessage(WebSocketSession session, WebSocketMessage message) throws Exception { if (logger.isTraceEnabled()) { - logger.trace(message + ", " + session); + logger.trace("Handling " + message + " in " + session); } super.handleMessage(session, message); } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SessionConnectEvent.java b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SessionConnectEvent.java index 348609b1a7e..52c4f0e34e1 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SessionConnectEvent.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SessionConnectEvent.java @@ -73,6 +73,6 @@ public class SessionConnectEvent extends ApplicationEvent { @Override public String toString() { - return "SessionConnectEvent: message=" + message; + return "SessionConnectEvent" + this.message; } } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SessionConnectedEvent.java b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SessionConnectedEvent.java index 45e47dce81e..01498d565b5 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SessionConnectedEvent.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SessionConnectedEvent.java @@ -56,6 +56,6 @@ public class SessionConnectedEvent extends ApplicationEvent { @Override public String toString() { - return "SessionConnectedEvent: message=" + message; + return "SessionConnectedEvent" + this.message; } } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SessionDisconnectEvent.java b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SessionDisconnectEvent.java index 76ea9990fa7..15ad6303a14 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SessionDisconnectEvent.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SessionDisconnectEvent.java @@ -68,6 +68,7 @@ public class SessionDisconnectEvent extends ApplicationEvent { @Override public String toString() { - return "SessionDisconnectEvent: sessionId=" + this.sessionId; + return "SessionDisconnectEvent[sessionId=" + this.sessionId + + (this.status != null ? this.status.toString() : "closeStatus=null") + "]"; } } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java index 417955a47d3..fd1149a1105 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java @@ -39,7 +39,6 @@ import org.springframework.messaging.simp.SimpMessageHeaderAccessor; import org.springframework.messaging.simp.SimpMessageType; import org.springframework.messaging.simp.stomp.BufferingStompDecoder; import org.springframework.messaging.simp.stomp.StompCommand; -import org.springframework.messaging.simp.stomp.StompConversionException; import org.springframework.messaging.simp.stomp.StompDecoder; import org.springframework.messaging.simp.stomp.StompEncoder; import org.springframework.messaging.simp.stomp.StompHeaderAccessor; @@ -188,31 +187,31 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE messages = decoder.decode(byteBuffer); if (messages.isEmpty()) { - logger.debug("Incomplete STOMP frame content received," + "buffered=" + - decoder.getBufferSize() + ", buffer size limit=" + decoder.getBufferSizeLimit()); + if (logger.isTraceEnabled()) { + logger.trace("Incomplete STOMP frame content received, bufferSize=" + + decoder.getBufferSize() + ", bufferSizeLimit=" + decoder.getBufferSizeLimit() + "."); + } return; } } catch (Throwable ex) { - logger.error("Failed to parse WebSocket message to STOMP." + - "Sending STOMP ERROR to client, sessionId=" + session.getId(), ex); + if (logger.isErrorEnabled()) { + logger.error("Failed to parse " + webSocketMessage + + " in session " + session.getId() + ". Sending STOMP ERROR to client.", ex); + } sendErrorMessage(session, ex); return; } for (Message message : messages) { try { - StompHeaderAccessor headerAccessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); if (logger.isTraceEnabled()) { - if (headerAccessor.isHeartbeat()) { - logger.trace("Received heartbeat from client session=" + session.getId()); - } - else { - logger.trace("Received message from client session=" + session.getId()); - } + logger.trace(headerAccessor.isHeartbeat() ? + "Received heartbeat from broker in session " + session.getId() + "." : + "Received message from broker in session " + session.getId() + ": " + message + "."); } headerAccessor.setSessionId(session.getId()); @@ -233,19 +232,23 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE } } catch (Throwable ex) { - logger.error("Parsed STOMP message but could not send it to to message channel. " + - "Sending STOMP ERROR to client, sessionId=" + session.getId(), ex); + logger.error("Failed to send STOMP message from client to application MessageChannel" + + " in session " + session.getId() + ". Sending STOMP ERROR to client.", ex); sendErrorMessage(session, ex); + } } } private void publishEvent(ApplicationEvent event) { try { + if (logger.isInfoEnabled()) { + logger.info("Publishing " + event); + } this.eventPublisher.publishEvent(event); } catch (Throwable ex) { - logger.error("Error while publishing " + event, ex); + logger.error("Error publishing " + event + ".", ex); } } @@ -257,7 +260,7 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE session.sendMessage(new TextMessage(bytes)); } catch (Throwable ex) { - // ignore + logger.error("Failed to send STOMP ERROR to client.", ex); } } @@ -267,45 +270,17 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE @SuppressWarnings("unchecked") @Override public void handleMessageToClient(WebSocketSession session, Message message) { - if (!(message.getPayload() instanceof byte[])) { - logger.error("Ignoring message, expected byte[] content: " + message); - return; - } - - MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class); - if (accessor == null) { - logger.error("No header accessor: " + message); - return; - } - - StompHeaderAccessor stompAccessor; - if (accessor instanceof StompHeaderAccessor) { - stompAccessor = (StompHeaderAccessor) accessor; - } - else if (accessor instanceof SimpMessageHeaderAccessor) { - stompAccessor = StompHeaderAccessor.wrap(message); - if (SimpMessageType.CONNECT_ACK.equals(stompAccessor.getMessageType())) { - StompHeaderAccessor connectedHeaders = StompHeaderAccessor.create(StompCommand.CONNECTED); - connectedHeaders.setVersion(getVersion(stompAccessor)); - connectedHeaders.setHeartbeat(0, 0); // no heart-beat support with simple broker - stompAccessor = connectedHeaders; - } - else if (stompAccessor.getCommand() == null || StompCommand.SEND.equals(stompAccessor.getCommand())) { - stompAccessor.updateStompCommandAsServerMessage(); - } - } - else { - // Should not happen - logger.error("Unexpected header accessor type: " + accessor); + logger.error("Expected byte[] payload. Ignoring " + message + "."); return; } - + StompHeaderAccessor stompAccessor = getStompHeaderAccessor(message); StompCommand command = stompAccessor.getCommand(); if (StompCommand.MESSAGE.equals(command)) { if (stompAccessor.getSubscriptionId() == null) { - logger.error("Ignoring message, no subscriptionId header: " + message); - return; + if (logger.isWarnEnabled()) { + logger.warn("No STOMP \"subscription\" header in " + message); + } } String origDestination = stompAccessor.getFirstNativeHeader(SimpMessageHeaderAccessor.ORIGINAL_DESTINATION); if (origDestination != null) { @@ -320,19 +295,16 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE publishEvent(new SessionConnectedEvent(this, (Message) message)); } } - try { byte[] bytes = this.stompEncoder.encode(stompAccessor.getMessageHeaders(), (byte[]) message.getPayload()); - TextMessage textMessage = new TextMessage(bytes); - - session.sendMessage(textMessage); + session.sendMessage(new TextMessage(bytes)); } catch (SessionLimitExceededException ex) { // Bad session, just get out throw ex; } catch (Throwable ex) { - logger.error("Failed to send WebSocket message to client, sessionId=" + session.getId(), ex); + logger.error("Failed to send WebSocket message to client in session " + session.getId() + ".", ex); command = StompCommand.ERROR; } finally { @@ -347,58 +319,93 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE } } - protected StompHeaderAccessor toMutableAccessor(StompHeaderAccessor headerAccessor, Message message) { - return (headerAccessor.isMutable() ? headerAccessor : StompHeaderAccessor.wrap(message)); + private StompHeaderAccessor getStompHeaderAccessor(Message message) { + MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class); + if (accessor == null) { + // Shouldn't happen (only broker broadcasts directly to clients) + throw new IllegalStateException("No header accessor in " + message + "."); + } + StompHeaderAccessor stompAccessor; + if (accessor instanceof StompHeaderAccessor) { + stompAccessor = (StompHeaderAccessor) accessor; + } + else if (accessor instanceof SimpMessageHeaderAccessor) { + stompAccessor = StompHeaderAccessor.wrap(message); + if (SimpMessageType.CONNECT_ACK.equals(stompAccessor.getMessageType())) { + stompAccessor = convertConnectAcktoStompConnected(stompAccessor); + } + else if (stompAccessor.getCommand() == null || StompCommand.SEND.equals(stompAccessor.getCommand())) { + stompAccessor.updateStompCommandAsServerMessage(); + } + } + else { + // Shouldn't happen (only broker broadcasts directly to clients) + throw new IllegalStateException( + "Unexpected header accessor type: " + accessor.getClass() + " in " + message + "."); + } + return stompAccessor; } - private String getVersion(StompHeaderAccessor connectAckHeaders) { - + /** + * The simple broker produces {@code SimpMessageType.CONNECT_ACK} that's not STOMP + * specific and needs to be turned into a STOMP CONNECTED frame. + */ + private StompHeaderAccessor convertConnectAcktoStompConnected(StompHeaderAccessor connectAckHeaders) { String name = StompHeaderAccessor.CONNECT_MESSAGE_HEADER; - Message connectMessage = (Message) connectAckHeaders.getHeader(name); - Assert.notNull(connectMessage, "CONNECT_ACK does not contain original CONNECT " + connectAckHeaders); - - StompHeaderAccessor connectHeaders = - MessageHeaderAccessor.getAccessor(connectMessage, StompHeaderAccessor.class); - + Message message = (Message) connectAckHeaders.getHeader(name); + Assert.notNull(message, "Original STOMP CONNECT not found in " + connectAckHeaders); + StompHeaderAccessor connectHeaders = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); + String version; Set acceptVersions = connectHeaders.getAcceptVersion(); if (acceptVersions.contains("1.2")) { - return "1.2"; + version = "1.2"; } else if (acceptVersions.contains("1.1")) { - return "1.1"; + version = "1.1"; } else if (acceptVersions.isEmpty()) { - return null; + version = null; } else { - throw new StompConversionException("Unsupported version '" + acceptVersions + "'"); + throw new IllegalArgumentException("Unsupported STOMP version '" + acceptVersions + "'"); } + StompHeaderAccessor connectedHeaders = StompHeaderAccessor.create(StompCommand.CONNECTED); + connectedHeaders.setVersion(version); + connectedHeaders.setHeartbeat(0, 0); // not supported + return connectedHeaders; } - private StompHeaderAccessor afterStompSessionConnected( - Message message, StompHeaderAccessor headerAccessor, WebSocketSession session) { + protected StompHeaderAccessor toMutableAccessor(StompHeaderAccessor headerAccessor, Message message) { + return (headerAccessor.isMutable() ? headerAccessor : StompHeaderAccessor.wrap(message)); + } + + private StompHeaderAccessor afterStompSessionConnected(Message message, StompHeaderAccessor accessor, + WebSocketSession session) { Principal principal = session.getPrincipal(); if (principal != null) { - headerAccessor = toMutableAccessor(headerAccessor, message); - headerAccessor.setNativeHeader(CONNECTED_USER_HEADER, principal.getName()); + accessor = toMutableAccessor(accessor, message); + accessor.setNativeHeader(CONNECTED_USER_HEADER, principal.getName()); if (this.userSessionRegistry != null) { - String userName = resolveNameForUserSessionRegistry(principal); + String userName = getSessionRegistryUserName(principal); this.userSessionRegistry.registerSessionId(userName, session.getId()); } } - long[] heartbeat = headerAccessor.getHeartbeat(); + long[] heartbeat = accessor.getHeartbeat(); if (heartbeat[1] > 0) { session = WebSocketSessionDecorator.unwrap(session); if (session instanceof SockJsSession) { - logger.debug("STOMP heartbeats negotiated, disabling SockJS heartbeats."); + if (logger.isDebugEnabled()) { + logger.debug("STOMP heartbeats enabled. " + + "Turning off SockJS heartbeats in " + session.getId() + "."); + } ((SockJsSession) session).disableHeartbeat(); } } - return headerAccessor; + return accessor; } - private String resolveNameForUserSessionRegistry(Principal principal) { + private String getSessionRegistryUserName(Principal principal) { String userName = principal.getName(); if (principal instanceof DestinationUserNameProvider) { userName = ((DestinationUserNameProvider) principal).getDestinationUserName(); @@ -421,25 +428,18 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE @Override public void afterSessionEnded(WebSocketSession session, CloseStatus closeStatus, MessageChannel outputChannel) { - this.decoders.remove(session.getId()); - Principal principal = session.getPrincipal(); - if ((this.userSessionRegistry != null) && (principal != null)) { - String userName = resolveNameForUserSessionRegistry(principal); + if (principal != null && this.userSessionRegistry != null) { + String userName = getSessionRegistryUserName(principal); this.userSessionRegistry.unregisterSessionId(userName, session.getId()); } - if (this.eventPublisher != null) { publishEvent(new SessionDisconnectEvent(this, session.getId(), closeStatus)); } - Message message = createDisconnectMessage(session); SimpAttributes simpAttributes = SimpAttributes.fromMessage(message); try { - if (logger.isDebugEnabled()) { - logger.debug("WebSocket session ended, sending DISCONNECT message to broker"); - } SimpAttributesContextHolder.setAttributes(simpAttributes); outputChannel.send(message); } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SubProtocolWebSocketHandler.java b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SubProtocolWebSocketHandler.java index a5ae409c474..ab0adb1a9de 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SubProtocolWebSocketHandler.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SubProtocolWebSocketHandler.java @@ -19,10 +19,8 @@ package org.springframework.web.socket.messaging; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantLock; @@ -82,9 +80,11 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, private final SubscribableChannel clientOutboundChannel; - private final Map protocolHandlers = + private final Map protocolHandlerLookup = new TreeMap(String.CASE_INSENSITIVE_ORDER); + private final List protocolHandlers = new ArrayList(); + private SubProtocolHandler defaultProtocolHandler; private final Map sessions = new ConcurrentHashMap(); @@ -116,6 +116,7 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, * @param protocolHandlers the sub-protocol handlers to use */ public void setProtocolHandlers(List protocolHandlers) { + this.protocolHandlerLookup.clear(); this.protocolHandlers.clear(); for (SubProtocolHandler handler: protocolHandlers) { addProtocolHandler(handler); @@ -123,7 +124,7 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, } public List getProtocolHandlers() { - return new ArrayList(protocolHandlers.values()); + return new ArrayList(this.protocolHandlerLookup.values()); } @@ -131,27 +132,26 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, * Register a sub-protocol handler. */ public void addProtocolHandler(SubProtocolHandler handler) { - List protocols = handler.getSupportedProtocols(); if (CollectionUtils.isEmpty(protocols)) { - logger.warn("No sub-protocols, ignoring handler " + handler); + logger.error("No sub-protocols for " + handler + "."); return; } - for (String protocol: protocols) { - SubProtocolHandler replaced = this.protocolHandlers.put(protocol, handler); + SubProtocolHandler replaced = this.protocolHandlerLookup.put(protocol, handler); if ((replaced != null) && (replaced != handler) ) { - throw new IllegalStateException("Failed to map handler " + handler - + " to protocol '" + protocol + "', it is already mapped to handler " + replaced); + throw new IllegalStateException("Can't map " + handler + + " to protocol '" + protocol + "'. Already mapped to " + replaced + "."); } } + this.protocolHandlers.add(handler); } /** * Return the sub-protocols keyed by protocol name. */ public Map getProtocolHandlerMap() { - return this.protocolHandlers; + return this.protocolHandlerLookup; } /** @@ -161,7 +161,7 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, */ public void setDefaultProtocolHandler(SubProtocolHandler defaultProtocolHandler) { this.defaultProtocolHandler = defaultProtocolHandler; - if (this.protocolHandlers.isEmpty()) { + if (this.protocolHandlerLookup.isEmpty()) { setProtocolHandlers(Arrays.asList(defaultProtocolHandler)); } } @@ -177,7 +177,7 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, * Return all supported protocols. */ public List getSubProtocols() { - return new ArrayList(this.protocolHandlers.keySet()); + return new ArrayList(this.protocolHandlerLookup.keySet()); } @@ -216,6 +216,7 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, @Override public final void start() { + Assert.isTrue(this.defaultProtocolHandler != null || !this.protocolHandlers.isEmpty(), "No handlers"); synchronized (this.lifecycleMonitor) { this.clientOutboundChannel.subscribe(this); this.running = true; @@ -225,11 +226,8 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, @Override public final void stop() { synchronized (this.lifecycleMonitor) { - this.running = false; this.clientOutboundChannel.unsubscribe(this); - - // Notify sessions to stop flushing messages for (WebSocketSessionHolder holder : this.sessions.values()) { try { holder.getSession().close(CloseStatus.GOING_AWAY); @@ -254,45 +252,44 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, session = new ConcurrentWebSocketSessionDecorator(session, getSendTimeLimit(), getSendBufferSizeLimit()); this.sessions.put(session.getId(), new WebSocketSessionHolder(session)); if (logger.isDebugEnabled()) { - logger.debug("Started session " + session.getId() + ", number of sessions=" + this.sessions.size()); + logger.debug("Started session " + session.getId() + " (" + this.sessions.size() + " sessions)"); } findProtocolHandler(session).afterSessionStarted(session, this.clientInboundChannel); } protected final SubProtocolHandler findProtocolHandler(WebSocketSession session) { - String protocol = null; try { protocol = session.getAcceptedProtocol(); } catch (Exception ex) { - logger.warn("Ignoring protocol in WebSocket session after failure to obtain it: " + ex.toString()); + // Shouldn't happen + logger.error("Failed to obtain session.getAcceptedProtocol(). Will use the " + + "default protocol handler (if configured).", ex); } - SubProtocolHandler handler; if (!StringUtils.isEmpty(protocol)) { - handler = this.protocolHandlers.get(protocol); - Assert.state(handler != null, - "No handler for sub-protocol '" + protocol + "', handlers=" + this.protocolHandlers); + handler = this.protocolHandlerLookup.get(protocol); + Assert.state(handler != null, "No handler for '" + protocol + "' among " + this.protocolHandlerLookup); } else { if (this.defaultProtocolHandler != null) { handler = this.defaultProtocolHandler; } + else if (this.protocolHandlers.size() == 1) { + handler = this.protocolHandlers.get(0); + } else { - Set handlers = new HashSet(this.protocolHandlers.values()); - if (handlers.size() == 1) { - handler = handlers.iterator().next(); - } - else { - throw new IllegalStateException( - "No sub-protocol was requested and a default sub-protocol handler was not configured"); - } + throw new IllegalStateException("Multiple protocol handlers configured and " + + "no protocol was negotiated. Consider configuring a default SubProtocolHandler."); } } return handler; } + /** + * Handle an inbound message from a WebSocket client. + */ @Override public void handleMessage(WebSocketSession session, WebSocketMessage message) throws Exception { SubProtocolHandler protocolHandler = findProtocolHandler(session); @@ -308,16 +305,19 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, checkSessions(); } + /** + * Handle an outbound Spring Message to a WebSocket client. + */ @Override public void handleMessage(Message message) throws MessagingException { String sessionId = resolveSessionId(message); if (sessionId == null) { - logger.error("sessionId not found in message " + message); + logger.error("Couldn't find sessionId in " + message); return; } WebSocketSessionHolder holder = this.sessions.get(sessionId); if (holder == null) { - logger.error("Session not found for session with id '" + sessionId + "', ignoring message " + message); + logger.error("No session for " + message); return; } WebSocketSession session = holder.getSession(); @@ -327,22 +327,20 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, catch (SessionLimitExceededException ex) { try { logger.error("Terminating '" + session + "'", ex); - - // Session may be unresponsive so clear first - clearSession(session, ex.getStatus()); + clearSession(session, ex.getStatus()); // clear first, session may be unresponsive session.close(ex.getStatus()); } catch (Exception secondException) { - logger.error("Exception terminating '" + sessionId + "'", secondException); + logger.error("Failure while closing session " + sessionId + ".", secondException); } } catch (Exception e) { - logger.error("Failed to send message to client " + message + " in " + session, e); + logger.error("Failed to send message to client in " + session + ": " + message, e); } } private String resolveSessionId(Message message) { - for (SubProtocolHandler handler : this.protocolHandlers.values()) { + for (SubProtocolHandler handler : this.protocolHandlerLookup.values()) { String sessionId = handler.resolveSessionId(message); if (sessionId != null) { return sessionId; @@ -366,8 +364,8 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, if (!isRunning() && currentTime - this.lastSessionCheckTime < TIME_TO_FIRST_MESSAGE) { return; } - try { - if (this.sessionCheckLock.tryLock()) { + if (this.sessionCheckLock.tryLock()) { + try { for (WebSocketSessionHolder holder : this.sessions.values()) { if (holder.hasHandledMessages()) { continue; @@ -378,19 +376,19 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, } WebSocketSession session = holder.getSession(); if (logger.isErrorEnabled()) { - logger.error("No messages received after " + timeSinceCreated + " ms. Closing " + holder); + logger.error("No messages received after " + timeSinceCreated + " ms. Closing " + holder + "."); } try { session.close(CloseStatus.PROTOCOL_ERROR); } catch (Throwable t) { - logger.error("Failed to close " + session, t); + logger.error("Failure while closing " + session, t); } } } - } - finally { - this.sessionCheckLock.unlock(); + finally { + this.sessionCheckLock.unlock(); + } } } @@ -404,6 +402,9 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, } private void clearSession(WebSocketSession session, CloseStatus closeStatus) throws Exception { + if (logger.isDebugEnabled()) { + logger.debug("Clearing session " + session.getId() + " (" + this.sessions.size() + " remain)"); + } this.sessions.remove(session.getId()); findProtocolHandler(session).afterSessionEnded(session, closeStatus, this.clientInboundChannel); } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/AbstractClientSockJsSession.java b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/AbstractClientSockJsSession.java index 16217fa03c0..825b8a0c8ab 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/AbstractClientSockJsSession.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/AbstractClientSockJsSession.java @@ -241,8 +241,8 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession { private void handleMessageFrame(SockJsFrame frame) { if (!isOpen()) { - if (logger.isWarnEnabled()) { - logger.warn("Ignoring received message due to state=" + this.state + " in " + this); + if (logger.isErrorEnabled()) { + logger.error("Ignoring received message due to state=" + this.state + " in " + this); } return; } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/support/AbstractSockJsService.java b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/support/AbstractSockJsService.java index 447de17efe1..0d5fbe63ac1 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/support/AbstractSockJsService.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/support/AbstractSockJsService.java @@ -266,23 +266,23 @@ public abstract class AbstractSockJsService implements SockJsService { String sockJsPath, WebSocketHandler wsHandler) throws SockJsException { if (sockJsPath == null) { - if (logger.isWarnEnabled()) { - logger.warn("No SockJS path provided, URI=\"" + request.getURI()); + if (logger.isErrorEnabled()) { + logger.error("Expected SockJS path. Failing request: " + request.getURI()); } response.setStatusCode(HttpStatus.NOT_FOUND); return; } - if (logger.isDebugEnabled()) { - logger.debug(request.getMethod() + " with SockJS path [" + sockJsPath + "]"); + if (logger.isTraceEnabled()) { + logger.trace(request.getMethod() + " with SockJS path [" + sockJsPath + "]"); } try { request.getHeaders(); } catch (InvalidMediaTypeException ex) { - if (logger.isWarnEnabled()) { - logger.warn("Invalid media type ignored: " + ex.getMediaType()); + if (logger.isDebugEnabled()) { + logger.debug("Invalid media type ignored: " + ex.getMediaType()); } } @@ -305,8 +305,8 @@ public abstract class AbstractSockJsService implements SockJsService { else { String[] pathSegments = StringUtils.tokenizeToStringArray(sockJsPath.substring(1), "/"); if (pathSegments.length != 3) { - if (logger.isWarnEnabled()) { - logger.warn("Expected \"/{server}/{session}/{transport}\" but got \"" + sockJsPath + "\""); + if (logger.isErrorEnabled()) { + logger.error("Expected \"/{server}/{session}/{transport}\" but got \"" + sockJsPath + "\""); } response.setStatusCode(HttpStatus.NOT_FOUND); return; @@ -330,18 +330,18 @@ public abstract class AbstractSockJsService implements SockJsService { protected boolean validateRequest(String serverId, String sessionId, String transport) { if (!StringUtils.hasText(serverId) || !StringUtils.hasText(sessionId) || !StringUtils.hasText(transport)) { - logger.warn("Empty server, session, or transport value"); + logger.error("Empty server, session, or transport value"); return false; } // Server and session id's must not contain "." if (serverId.contains(".") || sessionId.contains(".")) { - logger.warn("Server or session contain a \".\""); + logger.error("Server or session contain a \".\""); return false; } if (!isWebSocketEnabled() && transport.equals("websocket")) { - logger.warn("Websocket transport is disabled"); + logger.debug("Ignoring WebSocket request (transport disabled via SockJsService property)."); return false; } @@ -370,7 +370,7 @@ public abstract class AbstractSockJsService implements SockJsService { try { // Perhaps a CORS Filter has already added this? if (!CollectionUtils.isEmpty(responseHeaders.get("Access-Control-Allow-Origin"))) { - logger.debug("Skip adding CORS headers, response already contains \"Access-Control-Allow-Origin\""); + logger.trace("Skip adding CORS headers, response already contains \"Access-Control-Allow-Origin\""); return; } } @@ -407,7 +407,7 @@ public abstract class AbstractSockJsService implements SockJsService { } protected void sendMethodNotAllowed(ServerHttpResponse response, HttpMethod... httpMethods) { - logger.debug("Sending Method Not Allowed (405)"); + logger.error("Sending Method Not Allowed (405)"); response.setStatusCode(HttpStatus.METHOD_NOT_ALLOWED); response.getHeaders().setAllow(new HashSet(Arrays.asList(httpMethods))); } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/TransportHandlingSockJsService.java b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/TransportHandlingSockJsService.java index dc8d020b745..271ae0d91a4 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/TransportHandlingSockJsService.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/TransportHandlingSockJsService.java @@ -156,7 +156,7 @@ public class TransportHandlingSockJsService extends AbstractSockJsService implem TransportHandler transportHandler = this.handlers.get(TransportType.WEBSOCKET); if (!(transportHandler instanceof HandshakeHandler)) { - logger.warn("No handler for raw WebSocket messages"); + logger.error("No handler configured for raw WebSocket messages"); response.setStatusCode(HttpStatus.NOT_FOUND); return; } @@ -192,8 +192,8 @@ public class TransportHandlingSockJsService extends AbstractSockJsService implem TransportType transportType = TransportType.fromValue(transport); if (transportType == null) { - if (logger.isDebugEnabled()) { - logger.debug("Unknown transport type: " + transportType); + if (logger.isErrorEnabled()) { + logger.error("Unknown transport type: " + transportType); } response.setStatusCode(HttpStatus.NOT_FOUND); return; @@ -201,7 +201,7 @@ public class TransportHandlingSockJsService extends AbstractSockJsService implem TransportHandler transportHandler = this.handlers.get(transportType); if (transportHandler == null) { - logger.debug("Transport handler not found"); + logger.error("Transport handler not found"); response.setStatusCode(HttpStatus.NOT_FOUND); return; } @@ -238,7 +238,9 @@ public class TransportHandlingSockJsService extends AbstractSockJsService implem } else { response.setStatusCode(HttpStatus.NOT_FOUND); - logger.warn("Session not found, sessionId=" + sessionId); + if (logger.isDebugEnabled()) { + logger.debug("Session not found, sessionId=" + sessionId); + } return; } } @@ -281,7 +283,7 @@ public class TransportHandlingSockJsService extends AbstractSockJsService implem } if (logger.isDebugEnabled()) { - logger.debug("Creating new session with session id \"" + sessionId + "\""); + logger.debug("Creating new SockJS session, sessionId=" + sessionId); } session = sessionFactory.createSession(sessionId, handler, attributes); this.sessions.put(sessionId, session); diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/handler/AbstractHttpSendingTransportHandler.java b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/handler/AbstractHttpSendingTransportHandler.java index 0acd7337672..d15c35a4fda 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/handler/AbstractHttpSendingTransportHandler.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/handler/AbstractHttpSendingTransportHandler.java @@ -62,11 +62,13 @@ public abstract class AbstractHttpSendingTransportHandler extends AbstractTransp AbstractHttpSockJsSession sockJsSession) throws SockJsException { if (sockJsSession.isNew()) { - logger.debug("Opening " + getTransportType() + " connection"); + if (logger.isDebugEnabled()) { + logger.debug("Opening " + getTransportType() + " connection."); + } sockJsSession.handleInitialRequest(request, response, getFrameFormat(request)); } else if (sockJsSession.isClosed()) { - logger.debug("Connection already closed (but not removed yet)"); + logger.debug("Connection already closed (but not removed yet)."); SockJsFrame frame = SockJsFrame.closeFrameGoAway(); try { response.getBody().write(frame.getContentBytes()); @@ -77,11 +79,15 @@ public abstract class AbstractHttpSendingTransportHandler extends AbstractTransp return; } else if (!sockJsSession.isActive()) { - logger.debug("starting " + getTransportType() + " async request"); + if (logger.isTraceEnabled()) { + logger.trace("Starting " + getTransportType() + " async request."); + } sockJsSession.handleSuccessiveRequest(request, response, getFrameFormat(request)); } else { - logger.debug("another " + getTransportType() + " connection still open: " + sockJsSession); + if (logger.isDebugEnabled()) { + logger.debug("Another " + getTransportType() + " connection still open: " + sockJsSession); + } String formattedFrame = getFrameFormat(request).format(SockJsFrame.closeFrameAnotherConnectionOpen()); try { response.getBody().write(formattedFrame.getBytes(SockJsFrame.CHARSET)); diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractHttpSockJsSession.java b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractHttpSockJsSession.java index c2f8de47180..c2b9c9a0e3c 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractHttpSockJsSession.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractHttpSockJsSession.java @@ -322,7 +322,6 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession { if (control != null && !control.isCompleted()) { if (control.isStarted()) { try { - logger.debug("Completing asynchronous request"); control.complete(); } catch (Throwable ex) { diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractSockJsSession.java b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractSockJsSession.java index 0a0ec5770a3..01e3a30f40f 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractSockJsSession.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractSockJsSession.java @@ -65,7 +65,7 @@ public abstract class AbstractSockJsSession implements SockJsSession { * *

We make a best effort to identify such network failures, on a per-server * basis, and log them under a separate log category. A simple one-line message - * is logged at DEBUG level, while a full stack trace is shown at TRACE level. + * is logged at INFO level, while a full stack trace is shown at TRACE level. * * @see #disconnectedClientLogger */ @@ -225,16 +225,10 @@ public abstract class AbstractSockJsSession implements SockJsSession { } /** - * Invoked in reaction to the underlying connection being closed by the remote side - * (or the WebSocket container) in order to perform cleanup and notify the - * {@link WebSocketHandler}. This is in contrast to {@link #close()} that pro-actively - * closes the connection. + * Invoked when the underlying connection is closed. */ public final void delegateConnectionClosed(CloseStatus status) throws Exception { if (!isClosed()) { - if (logger.isDebugEnabled()) { - logger.debug(this + " was closed, " + status); - } try { updateLastActiveTime(); cancelHeartbeat(); @@ -260,8 +254,7 @@ public abstract class AbstractSockJsSession implements SockJsSession { /** * {@inheritDoc} - * - *

Performs cleanup and notifies the {@link WebSocketHandler}. + *

Perform cleanup and notify the {@link WebSocketHandler}. */ @Override public final void close() throws IOException { @@ -270,22 +263,21 @@ public abstract class AbstractSockJsSession implements SockJsSession { /** * {@inheritDoc} - *

Performs cleanup and notifies the {@link WebSocketHandler}. + *

Perform cleanup and notify the {@link WebSocketHandler}. */ @Override public final void close(CloseStatus status) throws IOException { if (isOpen()) { - if (logger.isDebugEnabled()) { - logger.debug("Closing " + this + ", " + status); + if (logger.isInfoEnabled()) { + logger.info("Closing SockJS session " + getId() + " with " + status); } try { if (isActive() && !CloseStatus.SESSION_NOT_RELIABLE.equals(status)) { try { - // bypass writeFrame writeFrameInternal(SockJsFrame.closeFrame(status.getCode(), status.getReason())); } catch (Throwable ex) { - logger.warn("Failed to send SockJS close frame: " + ex.getMessage()); + logger.debug("Failure while send SockJS close frame", ex); } } updateLastActiveTime(); @@ -298,7 +290,7 @@ public abstract class AbstractSockJsSession implements SockJsSession { this.handler.afterConnectionClosed(this, status); } catch (Throwable ex) { - logger.error("Unhandled error for " + this, ex); + logger.error("Error from WebSocketHandler.afterConnectionClosed in " + this, ex); } } } @@ -313,19 +305,19 @@ public abstract class AbstractSockJsSession implements SockJsSession { /** * Close due to error arising from SockJS transport handling. */ - public void tryCloseWithSockJsTransportError(Throwable ex, CloseStatus closeStatus) { + public void tryCloseWithSockJsTransportError(Throwable error, CloseStatus closeStatus) { logger.error("Closing due to transport error for " + this); try { - delegateError(ex); + delegateError(error); } - catch (Throwable delegateEx) { + catch (Throwable delegateException) { // ignore } try { close(closeStatus); } - catch (Throwable closeEx) { - // ignore + catch (Throwable closeException) { + logger.error("Failure while closing " + this, closeException); } } @@ -343,11 +335,17 @@ public abstract class AbstractSockJsSession implements SockJsSession { catch (Throwable ex) { logWriteFrameFailure(ex); try { + // Force disconnect (so we won't try to send close frame) disconnect(CloseStatus.SERVER_ERROR); + } + catch (Throwable disconnectFailure) { + logger.error("Failure while closing " + this, disconnectFailure); + } + try { close(CloseStatus.SERVER_ERROR); } - catch (Throwable ex2) { - // ignore + catch (Throwable t) { + // Nothing of consequence, already forced disconnect } throw new SockJsTransportFailureException("Failed to write " + frame, this.getId(), ex); } @@ -364,8 +362,8 @@ public abstract class AbstractSockJsSession implements SockJsSession { if (disconnectedClientLogger.isTraceEnabled()) { disconnectedClientLogger.trace("Looks like the client has gone away", failure); } - else if (disconnectedClientLogger.isDebugEnabled()) { - disconnectedClientLogger.debug("Looks like the client has gone away: " + + else if (disconnectedClientLogger.isInfoEnabled()) { + disconnectedClientLogger.info("Looks like the client has gone away: " + nestedException.getMessage() + " (For full stack trace, set the '" + DISCONNECTED_CLIENT_LOG_CATEGORY + "' log category to TRACE level)"); } @@ -388,7 +386,7 @@ public abstract class AbstractSockJsSession implements SockJsSession { if (this.heartbeatDisabled) { return; } - Assert.state(this.config.getTaskScheduler() != null, "No TaskScheduler configured for heartbeat"); + Assert.state(this.config.getTaskScheduler() != null, "Expecteded SockJS TaskScheduler."); cancelHeartbeat(); if (!isActive()) { return; @@ -405,20 +403,24 @@ public abstract class AbstractSockJsSession implements SockJsSession { } }, time); if (logger.isTraceEnabled()) { - logger.trace("Scheduled heartbeat after " + this.config.getHeartbeatTime() / 1000 + " seconds"); + logger.trace("Scheduled heartbeat in session " + getId()); } } protected void cancelHeartbeat() { + try { + ScheduledFuture task = this.heartbeatTask; + this.heartbeatTask = null; - ScheduledFuture task = this.heartbeatTask; - this.heartbeatTask = null; - - if ((task != null) && !task.isDone()) { - if (logger.isTraceEnabled()) { - logger.trace("Cancelling heartbeat"); + if ((task != null) && !task.isDone()) { + if (logger.isTraceEnabled()) { + logger.trace("Cancelling heartbeat in session " + getId()); + } + task.cancel(false); } - task.cancel(false); + } + catch (Throwable ex) { + logger.error("Failure while cancelling heartbeat in session " + getId(), ex); } } @@ -426,7 +428,7 @@ public abstract class AbstractSockJsSession implements SockJsSession { @Override public String toString() { long currentTime = System.currentTimeMillis(); - return "SockJsSession[id=" + this.id + ", state=" + this.state + ", sinceCreated=" + + return getClass().getSimpleName() + "[id=" + this.id + ", state=" + this.state + ", sinceCreated=" + (currentTime - this.timeCreated) + ", sinceLastActive=" + (currentTime - this.timeLastActive) + "]"; } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/WebSocketServerSockJsSession.java b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/WebSocketServerSockJsSession.java index d250d9e34c4..8b937dadcfc 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/WebSocketServerSockJsSession.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/WebSocketServerSockJsSession.java @@ -24,8 +24,6 @@ import java.util.List; import java.util.Map; import java.util.Queue; import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import org.springframework.http.HttpHeaders; import org.springframework.util.Assert; @@ -54,7 +52,9 @@ public class WebSocketServerSockJsSession extends AbstractSockJsSession implemen private final Queue initSessionCache = new LinkedBlockingDeque(); - private final Lock initSessionLock = new ReentrantLock(); + private final Object initSessionLock = new Object(); + + private volatile boolean disconnected; public WebSocketServerSockJsSession(String id, SockJsServiceConfig config, @@ -174,7 +174,7 @@ public class WebSocketServerSockJsSession extends AbstractSockJsSession implemen @Override public boolean isActive() { - return ((this.webSocketSession != null) && this.webSocketSession.isOpen()); + return (this.webSocketSession != null && this.webSocketSession.isOpen() && !this.disconnected); } public void handleMessage(TextMessage message, WebSocketSession wsSession) throws Exception { @@ -225,8 +225,11 @@ public class WebSocketServerSockJsSession extends AbstractSockJsSession implemen @Override protected void disconnect(CloseStatus status) throws IOException { - if (isActive()) { - this.webSocketSession.close(status); + synchronized (this) { + if (isActive()) { + this.disconnected = true; + this.webSocketSession.close(status); + } } } diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/messaging/SubProtocolWebSocketHandlerTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/messaging/SubProtocolWebSocketHandlerTests.java index 3ef5d87f0b9..8075197908e 100644 --- a/spring-websocket/src/test/java/org/springframework/web/socket/messaging/SubProtocolWebSocketHandlerTests.java +++ b/spring-websocket/src/test/java/org/springframework/web/socket/messaging/SubProtocolWebSocketHandlerTests.java @@ -116,7 +116,7 @@ public class SubProtocolWebSocketHandlerTests { @Test public void emptySubProtocol() throws Exception { this.session.setAcceptedProtocol(""); - this.webSocketHandler.setDefaultProtocolHandler(defaultHandler); + this.webSocketHandler.setDefaultProtocolHandler(this.defaultHandler); this.webSocketHandler.afterConnectionEstablished(session); verify(this.defaultHandler).afterSessionStarted(