From 113fd1180a14656a89d6d6639d259c577d70fbc5 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Fri, 27 Jun 2014 03:39:47 -0400 Subject: [PATCH] Fine tune STOMP and WebSocket related logging Optimize logging with tracking the opening and closing of WebSocket sessions and STOMP broker connections in mind. While the volume of messages makes it impractical to log every message at anything higher than TRACE, the opening and closing of connections is more manageable and can be logged at INFO. This makes it possible to drop to INFO in production and get useful information without getting too much in a short period of time. The logging is also optimized to avoid providing the same information from multiple places since messages pass through multiple layers. Issue: SPR-11884 --- .../AbstractMethodMessageHandler.java | 11 +- .../messaging/simp/SimpAttributes.java | 34 ++- .../SendToMethodReturnValueHandler.java | 4 +- .../SimpAnnotationMethodMessageHandler.java | 20 +- .../broker/AbstractBrokerMessageHandler.java | 27 ++- .../broker/AbstractSubscriptionRegistry.java | 50 +--- .../simp/broker/BrokerAvailabilityEvent.java | 2 +- .../broker/DefaultSubscriptionRegistry.java | 10 +- .../broker/SimpleBrokerMessageHandler.java | 49 ++-- .../stomp/StompBrokerRelayMessageHandler.java | 224 +++++++++--------- .../simp/stomp/StompConversionException.java | 2 + .../messaging/simp/stomp/StompDecoder.java | 12 +- .../messaging/simp/stomp/StompEncoder.java | 6 +- .../user/DefaultUserDestinationResolver.java | 38 +-- .../user/UserDestinationMessageHandler.java | 8 +- .../simp/user/UserDestinationResult.java | 6 + .../support/AbstractMessageChannel.java | 11 +- .../support/AbstractSubscribableChannel.java | 4 +- .../support/MessageHeaderAccessor.java | 2 +- .../adapter/AbstractWebSocketSession.java | 6 +- .../LoggingWebSocketHandlerDecorator.java | 2 +- .../socket/messaging/SessionConnectEvent.java | 2 +- .../messaging/SessionConnectedEvent.java | 2 +- .../messaging/SessionDisconnectEvent.java | 3 +- .../messaging/StompSubProtocolHandler.java | 172 +++++++------- .../SubProtocolWebSocketHandler.java | 93 ++++---- .../client/AbstractClientSockJsSession.java | 4 +- .../sockjs/support/AbstractSockJsService.java | 26 +- .../TransportHandlingSockJsService.java | 14 +- .../AbstractHttpSendingTransportHandler.java | 14 +- .../session/AbstractHttpSockJsSession.java | 1 - .../session/AbstractSockJsSession.java | 72 +++--- .../session/WebSocketServerSockJsSession.java | 15 +- .../SubProtocolWebSocketHandlerTests.java | 2 +- 34 files changed, 469 insertions(+), 479 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/AbstractMethodMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/AbstractMethodMessageHandler.java index da9f2ce76ca..401dbb18a88 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/AbstractMethodMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/AbstractMethodMessageHandler.java @@ -325,20 +325,16 @@ public abstract class AbstractMethodMessageHandler String destination = getDestination(message); if (destination == null) { - logger.trace("Ignoring message, no destination"); return; } String lookupDestination = getLookupDestination(destination); if (lookupDestination == null) { - if (logger.isTraceEnabled()) { - logger.trace("Ignoring message to destination=" + destination); - } return; } if (logger.isDebugEnabled()) { - logger.debug("Handling message, lookupDestination=" + lookupDestination); + logger.debug("Handling message to " + destination); } MessageHeaderAccessor headerAccessor = MessageHeaderAccessor.getMutableAccessor(message); @@ -397,7 +393,8 @@ public abstract class AbstractMethodMessageHandler Collections.sort(matches, comparator); if (logger.isTraceEnabled()) { - logger.trace("Found " + matches.size() + " matching mapping(s) for [" + lookupDestination + "] : " + matches); + logger.trace("Found " + matches.size() + " matching mapping(s) for [" + + lookupDestination + "] : " + matches); } Match bestMatch = matches.get(0); @@ -507,7 +504,7 @@ public abstract class AbstractMethodMessageHandler protected void handleNoMatch(Set ts, String lookupDestination, Message message) { if (logger.isDebugEnabled()) { - logger.debug("No matching method found"); + logger.debug("No matching method found."); } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/SimpAttributes.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/SimpAttributes.java index a0ed1a9816f..08d3eb774c3 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/SimpAttributes.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/SimpAttributes.java @@ -62,6 +62,22 @@ public class SimpAttributes { this.attributes = attributes; } + /** + * Extract the SiMP session attributes from the given message, wrap them in + * a {@link SimpAttributes} instance. + * @param message the message to extract session attributes from + */ + public static SimpAttributes fromMessage(Message message) { + Assert.notNull(message); + MessageHeaders headers = message.getHeaders(); + String sessionId = SimpMessageHeaderAccessor.getSessionId(headers); + Map sessionAttributes = SimpMessageHeaderAccessor.getSessionAttributes(headers); + if (sessionId == null || sessionAttributes == null) { + throw new IllegalStateException( + "Message does not contain SiMP session id or attributes: " + message); + } + return new SimpAttributes(sessionId, sessionAttributes); + } /** * Return the value for the attribute of the given name, if any. @@ -175,22 +191,4 @@ public class SimpAttributes { } } - - /** - * Extract the SiMP session attributes from the given message, wrap them in - * a {@link SimpAttributes} instance. - * @param message the message to extract session attributes from - */ - public static SimpAttributes fromMessage(Message message) { - Assert.notNull(message); - MessageHeaders headers = message.getHeaders(); - String sessionId = SimpMessageHeaderAccessor.getSessionId(headers); - Map sessionAttributes = SimpMessageHeaderAccessor.getSessionAttributes(headers); - if (sessionId == null || sessionAttributes == null) { - throw new IllegalStateException( - "Message does not contain SiMP session id or attributes: " + message); - } - return new SimpAttributes(sessionId, sessionAttributes); - } - } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/annotation/support/SendToMethodReturnValueHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/annotation/support/SendToMethodReturnValueHandler.java index 8265bbe62bf..510a66ba16c 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/annotation/support/SendToMethodReturnValueHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/annotation/support/SendToMethodReturnValueHandler.java @@ -133,9 +133,7 @@ public class SendToMethodReturnValueHandler implements HandlerMethodReturnValueH } @Override - public void handleReturnValue(Object returnValue, MethodParameter returnType, Message message) - throws Exception { - + public void handleReturnValue(Object returnValue, MethodParameter returnType, Message message) throws Exception { if (returnValue == null) { return; } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandler.java index f3194c917f2..0e82b56134b 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandler.java @@ -187,7 +187,7 @@ public class SimpAnnotationMethodMessageHandler extends AbstractMethodMessageHan * The configured Validator instance */ public Validator getValidator() { - return validator; + return this.validator; } /** @@ -315,25 +315,23 @@ public class SimpAnnotationMethodMessageHandler extends AbstractMethodMessageHan @Override protected SimpMessageMappingInfo getMappingForMethod(Method method, Class handlerType) { - MessageMapping typeAnnot = AnnotationUtils.findAnnotation(handlerType, MessageMapping.class); + MessageMapping typeAnnotation = AnnotationUtils.findAnnotation(handlerType, MessageMapping.class); MessageMapping messageAnnot = AnnotationUtils.findAnnotation(method, MessageMapping.class); if (messageAnnot != null) { SimpMessageMappingInfo result = createMessageMappingCondition(messageAnnot); - if (typeAnnot != null) { - result = createMessageMappingCondition(typeAnnot).combine(result); + if (typeAnnotation != null) { + result = createMessageMappingCondition(typeAnnotation).combine(result); } return result; } - - SubscribeMapping subsribeAnnot = AnnotationUtils.findAnnotation(method, SubscribeMapping.class); - if (subsribeAnnot != null) { - SimpMessageMappingInfo result = createSubscribeCondition(subsribeAnnot); - if (typeAnnot != null) { - result = createMessageMappingCondition(typeAnnot).combine(result); + SubscribeMapping subsribeAnnotation = AnnotationUtils.findAnnotation(method, SubscribeMapping.class); + if (subsribeAnnotation != null) { + SimpMessageMappingInfo result = createSubscribeCondition(subsribeAnnotation); + if (typeAnnotation != null) { + result = createMessageMappingCondition(typeAnnotation).combine(result); } return result; } - return null; } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/AbstractBrokerMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/AbstractBrokerMessageHandler.java index 04770c8af0e..b26a3802095 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/AbstractBrokerMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/AbstractBrokerMessageHandler.java @@ -43,12 +43,17 @@ public abstract class AbstractBrokerMessageHandler protected final Log logger = LogFactory.getLog(getClass()); + private final Collection destinationPrefixes; private ApplicationEventPublisher eventPublisher; private AtomicBoolean brokerAvailable = new AtomicBoolean(false); + private final BrokerAvailabilityEvent availableEvent = new BrokerAvailabilityEvent(true, this); + + private final BrokerAvailabilityEvent notAvailableEvent = new BrokerAvailabilityEvent(false, this); + private boolean autoStartup = true; private Object lifecycleMonitor = new Object(); @@ -128,12 +133,12 @@ public abstract class AbstractBrokerMessageHandler public final void start() { synchronized (this.lifecycleMonitor) { if (logger.isDebugEnabled()) { - logger.debug("Starting"); + logger.debug("Starting..."); } startInternal(); this.running = true; if (logger.isDebugEnabled()) { - logger.debug("Started"); + logger.debug("Started."); } } } @@ -145,12 +150,12 @@ public abstract class AbstractBrokerMessageHandler public final void stop() { synchronized (this.lifecycleMonitor) { if (logger.isDebugEnabled()) { - logger.debug("Stopping"); + logger.debug("Stopping..."); } stopInternal(); this.running = false; if (logger.isDebugEnabled()) { - logger.debug("Stopped"); + logger.debug("Stopped."); } } } @@ -170,7 +175,7 @@ public abstract class AbstractBrokerMessageHandler public final void handleMessage(Message message) { if (!this.running) { if (logger.isTraceEnabled()) { - logger.trace("Message broker is not running. Ignoring message=" + message); + logger.trace(this + " not running yet. Ignoring " + message); } return; } @@ -194,20 +199,20 @@ public abstract class AbstractBrokerMessageHandler protected void publishBrokerAvailableEvent() { boolean shouldPublish = this.brokerAvailable.compareAndSet(false, true); if (this.eventPublisher != null && shouldPublish) { - if (logger.isDebugEnabled()) { - logger.debug("Publishing BrokerAvailabilityEvent (available)"); + if (logger.isInfoEnabled()) { + logger.info("Publishing " + this.availableEvent); } - this.eventPublisher.publishEvent(new BrokerAvailabilityEvent(true, this)); + this.eventPublisher.publishEvent(this.availableEvent); } } protected void publishBrokerUnavailableEvent() { boolean shouldPublish = this.brokerAvailable.compareAndSet(true, false); if (this.eventPublisher != null && shouldPublish) { - if (logger.isDebugEnabled()) { - logger.debug("Publishing BrokerAvailabilityEvent (unavailable)"); + if (logger.isInfoEnabled()) { + logger.info("Publishing " + this.notAvailableEvent); } - this.eventPublisher.publishEvent(new BrokerAvailabilityEvent(false, this)); + this.eventPublisher.publishEvent(this.notAvailableEvent); } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/AbstractSubscriptionRegistry.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/AbstractSubscriptionRegistry.java index bc0cf0c1271..01fe8a271bf 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/AbstractSubscriptionRegistry.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/AbstractSubscriptionRegistry.java @@ -22,6 +22,7 @@ import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.simp.SimpMessageHeaderAccessor; import org.springframework.messaging.simp.SimpMessageType; +import org.springframework.util.Assert; import org.springframework.util.MultiValueMap; /** @@ -39,32 +40,24 @@ public abstract class AbstractSubscriptionRegistry implements SubscriptionRegist @Override public final void registerSubscription(Message message) { - MessageHeaders headers = message.getHeaders(); - SimpMessageType type = SimpMessageHeaderAccessor.getMessageType(headers); - - if (!SimpMessageType.SUBSCRIBE.equals(type)) { - logger.error("Expected SUBSCRIBE message: " + message); - return; - } + SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(headers); + Assert.isTrue(SimpMessageType.SUBSCRIBE.equals(messageType), "Expected SUBSCRIBE: " + message); String sessionId = SimpMessageHeaderAccessor.getSessionId(headers); if (sessionId == null) { - logger.error("Ignoring subscription. No sessionId in message: " + message); + logger.error("No sessionId in " + message); return; } String subscriptionId = SimpMessageHeaderAccessor.getSubscriptionId(headers); if (subscriptionId == null) { - logger.error("Ignoring subscription. No subscriptionId in message: " + message); + logger.error("No subscriptionId in " + message); return; } String destination = SimpMessageHeaderAccessor.getDestination(headers); if (destination == null) { - logger.error("Ignoring destination. No destination in message: " + message); + logger.error("No destination in " + message); return; } - if (logger.isDebugEnabled()) { - logger.debug("Adding subscription id=" + subscriptionId + ", destination=" + destination); - } addSubscriptionInternal(sessionId, subscriptionId, destination, message); } @@ -73,27 +66,19 @@ public abstract class AbstractSubscriptionRegistry implements SubscriptionRegist @Override public final void unregisterSubscription(Message message) { - MessageHeaders headers = message.getHeaders(); - SimpMessageType type = SimpMessageHeaderAccessor.getMessageType(headers); - - if (!SimpMessageType.UNSUBSCRIBE.equals(type)) { - logger.error("Expected UNSUBSCRIBE message: " + message); - return; - } + SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(headers); + Assert.isTrue(SimpMessageType.UNSUBSCRIBE.equals(messageType), "Expected UNSUBSCRIBE: " + message); String sessionId = SimpMessageHeaderAccessor.getSessionId(headers); if (sessionId == null) { - logger.error("Ignoring subscription. No sessionId in message: " + message); + logger.error("No sessionId in " + message); return; } String subscriptionId = SimpMessageHeaderAccessor.getSubscriptionId(headers); if (subscriptionId == null) { - logger.error("Ignoring subscription. No subscriptionId in message: " + message); + logger.error("No subscriptionId " + message); return; } - if (logger.isDebugEnabled()) { - logger.debug("Unubscribe request: " + message); - } removeSubscriptionInternal(sessionId, subscriptionId, message); } @@ -104,24 +89,15 @@ public abstract class AbstractSubscriptionRegistry implements SubscriptionRegist @Override public final MultiValueMap findSubscriptions(Message message) { - MessageHeaders headers = message.getHeaders(); SimpMessageType type = SimpMessageHeaderAccessor.getMessageType(headers); - - if (!SimpMessageType.MESSAGE.equals(type)) { - logger.trace("Ignoring message type " + type); - return null; - } + Assert.isTrue(SimpMessageType.MESSAGE.equals(type), "Unexpected message type: " + type); String destination = SimpMessageHeaderAccessor.getDestination(headers); if (destination == null) { - logger.trace("Ignoring message, no destination"); + logger.error("No destination in " + message); return null; } - MultiValueMap result = findSubscriptionsInternal(destination, message); - if (logger.isTraceEnabled()) { - logger.trace("Found " + result.size() + " subscriptions for destination=" + destination); - } - return result; + return findSubscriptionsInternal(destination, message); } protected abstract MultiValueMap findSubscriptionsInternal( diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/BrokerAvailabilityEvent.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/BrokerAvailabilityEvent.java index 24f836f8fed..77522321744 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/BrokerAvailabilityEvent.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/BrokerAvailabilityEvent.java @@ -50,7 +50,7 @@ public class BrokerAvailabilityEvent extends ApplicationEvent { @Override public String toString() { - return "BrokerAvailabilityEvent=" + this.brokerAvailable; + return "BrokerAvailabilityEvent[available=" + this.brokerAvailable + ", " + getSource() + "]"; } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistry.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistry.java index fa38761851b..a53495c6c5d 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistry.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistry.java @@ -106,9 +106,6 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry { public void unregisterAllSubscriptions(String sessionId) { SessionSubscriptionInfo info = this.subscriptionRegistry.removeSubscriptions(sessionId); if (info != null) { - if (logger.isDebugEnabled()) { - logger.debug("Unregistering subscriptions for sessionId=" + sessionId); - } this.destinationCache.updateAfterRemovedSession(info); } } @@ -137,8 +134,7 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry { @Override public String toString() { - return "[destinationCache=" + this.destinationCache + ", subscriptionRegistry=" - + this.subscriptionRegistry + "]"; + return "DefaultSubscriptionRegistry[" + this.destinationCache + ", " + this.subscriptionRegistry + "]"; } @@ -241,7 +237,7 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry { @Override public String toString() { - return "[cache=" + this.accessCache + "]"; + return "cache[" + this.accessCache.size() + " destination(s)]"; } } @@ -282,7 +278,7 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry { @Override public String toString() { - return "[sessions=" + sessions + "]"; + return "registry[" + sessions.size() + " session(s)]"; } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/SimpleBrokerMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/SimpleBrokerMessageHandler.java index c2b0038c631..c29978bb083 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/SimpleBrokerMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/SimpleBrokerMessageHandler.java @@ -64,12 +64,9 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler { SubscribableChannel brokerChannel, Collection destinationPrefixes) { super(destinationPrefixes); - Assert.notNull(clientInboundChannel, "'clientInboundChannel' must not be null"); Assert.notNull(clientOutboundChannel, "'clientOutboundChannel' must not be null"); Assert.notNull(brokerChannel, "'brokerChannel' must not be null"); - - this.clientInboundChannel = clientInboundChannel; this.clientOutboundChannel = clientOutboundChannel; this.brokerChannel = brokerChannel; @@ -139,7 +136,7 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler { if (!checkDestinationPrefix(destination)) { if (logger.isTraceEnabled()) { - logger.trace("Ignoring message to destination=" + destination); + logger.trace("No match on destination in " + message); } return; } @@ -147,16 +144,10 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler { if (SimpMessageType.MESSAGE.equals(messageType)) { sendMessageToSubscribers(destination, message); } - else if (SimpMessageType.SUBSCRIBE.equals(messageType)) { - this.subscriptionRegistry.registerSubscription(message); - } - else if (SimpMessageType.UNSUBSCRIBE.equals(messageType)) { - this.subscriptionRegistry.unregisterSubscription(message); - } - else if (SimpMessageType.DISCONNECT.equals(messageType)) { - this.subscriptionRegistry.unregisterAllSubscriptions(sessionId); - } else if (SimpMessageType.CONNECT.equals(messageType)) { + if (logger.isInfoEnabled()) { + logger.info("Handling CONNECT: " + message); + } SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create(SimpMessageType.CONNECT_ACK); initHeaders(accessor); accessor.setSessionId(sessionId); @@ -164,9 +155,27 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler { Message connectAck = MessageBuilder.createMessage(EMPTY_PAYLOAD, accessor.getMessageHeaders()); this.clientOutboundChannel.send(connectAck); } + else if (SimpMessageType.DISCONNECT.equals(messageType)) { + if (logger.isInfoEnabled()) { + logger.info("Handling DISCONNECT: " + message); + } + this.subscriptionRegistry.unregisterAllSubscriptions(sessionId); + } + else if (SimpMessageType.SUBSCRIBE.equals(messageType)) { + if (logger.isDebugEnabled()) { + logger.debug("Handling SUBSCRIBE: " + message); + } + this.subscriptionRegistry.registerSubscription(message); + } + else if (SimpMessageType.UNSUBSCRIBE.equals(messageType)) { + if (logger.isDebugEnabled()) { + logger.debug("Handling UNSUBSCRIBE: " + message); + } + this.subscriptionRegistry.unregisterSubscription(message); + } else { if (logger.isTraceEnabled()) { - logger.trace("Message type not supported. Ignoring: " + message); + logger.trace("Unsupported message type in " + message); } } } @@ -179,9 +188,8 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler { protected void sendMessageToSubscribers(String destination, Message message) { MultiValueMap subscriptions = this.subscriptionRegistry.findSubscriptions(message); - if ((subscriptions.size() > 0) && logger.isDebugEnabled()) { - logger.debug("Sending message with destination=" + destination - + " to " + subscriptions.size() + " subscriber(s)"); + if ((subscriptions.size() > 0) && logger.isTraceEnabled()) { + logger.trace("Sending to " + subscriptions.size() + " subscriber(s): " + message); } for (String sessionId : subscriptions.keySet()) { for (String subscriptionId : subscriptions.get(sessionId)) { @@ -196,10 +204,15 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler { this.clientOutboundChannel.send(reply); } catch (Throwable ex) { - logger.error("Failed to send message=" + message, ex); + logger.error("Failed to send " + message, ex); } } } } + @Override + public String toString() { + return "SimpleBroker[" + this.subscriptionRegistry + "]"; + } + } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java index eed1f76ba6d..ec07d15bfdc 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java @@ -327,6 +327,13 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler return this.tcpClient; } + /** + * Return the current count of TCP connection to the broker. + */ + public int getConnectionCount() { + return this.connectionHandlers.size(); + } + /** * Configure a {@link MessageHeaderInitializer} to apply to the headers of all * messages created through the {@code StompBrokerRelayMessageHandler} that @@ -359,8 +366,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler this.tcpClient = new StompTcpClientFactory().create(this.relayHost, this.relayPort, codec); } - if (logger.isDebugEnabled()) { - logger.debug("Initializing \"system\" connection"); + if (logger.isInfoEnabled()) { + logger.info("Connecting \"system\" session to " + this.relayHost + ":" + this.relayPort); } StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT); @@ -388,7 +395,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler this.tcpClient.shutdown().get(5000, TimeUnit.MILLISECONDS); } catch (Throwable t) { - logger.error("Error while shutting down TCP client", t); + logger.error("Error in shutdown of TCP client", t); } } @@ -399,14 +406,15 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler if (!isBrokerAvailable()) { if (sessionId == null || SystemStompConnectionHandler.SESSION_ID.equals(sessionId)) { - throw new MessageDeliveryException("Message broker is not active."); + throw new MessageDeliveryException("Message broker not active. Consider subscribing to " + + "receive BrokerAvailabilityEvent's from an ApplicationListener Spring bean."); } SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(message.getHeaders()); if (messageType.equals(SimpMessageType.CONNECT) && logger.isErrorEnabled()) { - logger.error("Message broker is not active. Ignoring: " + message); + logger.error("Broker not active. Ignoring " + message); } else if (logger.isDebugEnabled()) { - logger.debug("Message broker is not active. Ignoring: " + message); + logger.debug("Broker not active. Ignoring " + message); } return; } @@ -416,8 +424,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class); if (accessor == null) { - logger.error("No header accessor, please use SimpMessagingTemplate. Ignoring: " + message); - return; + throw new IllegalStateException( + "No header accessor (not using the SimpMessagingTemplate?): " + message); } else if (accessor instanceof StompHeaderAccessor) { stompAccessor = (StompHeaderAccessor) accessor; @@ -431,14 +439,13 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } } else { - // Should not happen - logger.error("Unexpected header accessor type: " + accessor + ". Ignoring: " + message); - return; + throw new IllegalStateException( + "Unexpected header accessor type " + accessor.getClass() + " in " + message); } if (sessionId == null) { if (!SimpMessageType.MESSAGE.equals(stompAccessor.getMessageType())) { - logger.error("Only STOMP SEND frames supported on \"system\" connection. Ignoring: " + message); + logger.error("Only STOMP SEND supported from within the server side. Ignoring " + message); return; } sessionId = SystemStompConnectionHandler.SESSION_ID; @@ -448,18 +455,14 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler String destination = stompAccessor.getDestination(); if ((command != null) && command.requiresDestination() && !checkDestinationPrefix(destination)) { if (logger.isTraceEnabled()) { - logger.trace("Ignoring message to destination=" + destination); + logger.trace("No match on destination. Ignoring " + message); } return; } - if (logger.isTraceEnabled()) { - logger.trace("Processing message=" + message); - } - if (StompCommand.CONNECT.equals(command)) { if (logger.isDebugEnabled()) { - logger.debug("Processing CONNECT (total connected=" + this.connectionHandlers.size() + ")"); + logger.debug("STOMP CONNECT in session " + sessionId + " (" + getConnectionCount() + " connections)."); } stompAccessor = (stompAccessor.isMutable() ? stompAccessor : StompHeaderAccessor.wrap(message)); stompAccessor.setLogin(this.clientLogin); @@ -474,8 +477,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler else if (StompCommand.DISCONNECT.equals(command)) { StompConnectionHandler handler = this.connectionHandlers.get(sessionId); if (handler == null) { - if (logger.isTraceEnabled()) { - logger.trace("Connection already removed for sessionId '" + sessionId + "'"); + if (logger.isDebugEnabled()) { + logger.debug("Ignoring DISCONNECT in session " + sessionId + ". Connection already cleaned up."); } return; } @@ -484,8 +487,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler else { StompConnectionHandler handler = this.connectionHandlers.get(sessionId); if (handler == null) { - if (logger.isWarnEnabled()) { - logger.warn("Connection for sessionId '" + sessionId + "' not found. Ignoring message"); + if (logger.isDebugEnabled()) { + logger.debug("No TCP connection for session " + sessionId + " in " + message); } return; } @@ -493,6 +496,12 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } } + @Override + public String toString() { + return "StompBrokerRelay[broker=" + this.relayHost + ":" + this.relayPort + + ", " + getConnectionCount() + " connection(s)]"; + } + private class StompConnectionHandler implements TcpConnectionHandler { @@ -511,15 +520,12 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler this(sessionId, connectHeaders, true); } - private StompConnectionHandler(String sessionId, StompHeaderAccessor connectHeaders, - boolean isRemoteClientSession) { - - Assert.notNull(sessionId, "SessionId must not be null"); - Assert.notNull(connectHeaders, "ConnectHeaders must not be null"); - + private StompConnectionHandler(String sessionId, StompHeaderAccessor connectHeaders, boolean isClientSession) { + Assert.notNull(sessionId, "'sessionId' must not be null"); + Assert.notNull(connectHeaders, "'connectHeaders' must not be null"); this.sessionId = sessionId; this.connectHeaders = connectHeaders; - this.isRemoteClientSession = isRemoteClientSession; + this.isRemoteClientSession = isClientSession; } public String getSessionId() { @@ -528,8 +534,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @Override public void afterConnected(TcpConnection connection) { - if (logger.isDebugEnabled()) { - logger.debug("Established TCP connection to broker in session '" + this.sessionId + "'"); + if (logger.isInfoEnabled()) { + logger.info("TCP connection established. Forwarding: " + this.connectHeaders); } this.tcpConnection = connection; connection.send(MessageBuilder.createMessage(EMPTY_PAYLOAD, this.connectHeaders.getMessageHeaders())); @@ -537,19 +543,19 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @Override public void afterConnectFailure(Throwable ex) { - handleTcpConnectionFailure("Failed to connect to message broker", ex); + handleTcpConnectionFailure("failed to establish TCP connection in session " + this.sessionId, ex); } /** * Invoked when any TCP connectivity issue is detected, i.e. failure to establish - * the TCP connection, failure to send a message, missed heartbeat. + * the TCP connection, failure to send a message, missed heartbeat, etc. */ - protected void handleTcpConnectionFailure(String errorMessage, Throwable ex) { + protected void handleTcpConnectionFailure(String error, Throwable ex) { if (logger.isErrorEnabled()) { - logger.error(errorMessage + ", sessionId '" + this.sessionId + "'", ex); + logger.error("TCP connection failure in session " + this.sessionId + ": " + error, ex); } try { - sendStompErrorToClient(errorMessage); + sendStompErrorFrameToClient(error); } finally { try { @@ -557,13 +563,14 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } catch (Throwable t) { if (logger.isErrorEnabled()) { - logger.error("Failed to close connection: " + t.getMessage()); + logger.error("Failure while cleaning up state for TCP connection" + + " in session " + this.sessionId, t); } } } } - private void sendStompErrorToClient(String errorText) { + private void sendStompErrorFrameToClient(String errorText) { if (this.isRemoteClientSession) { StompHeaderAccessor headerAccessor = StompHeaderAccessor.create(StompCommand.ERROR); if (getHeaderInitializer() != null) { @@ -584,24 +591,21 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @Override public void handleMessage(Message message) { - - StompHeaderAccessor headerAccessor = - MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); - + StompHeaderAccessor headerAccessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); headerAccessor.setSessionId(this.sessionId); - if (headerAccessor.isHeartbeat()) { - logger.trace("Received broker heartbeat"); - } - else if (logger.isDebugEnabled()) { - logger.debug("Received message from broker in session '" + this.sessionId + "'"); + if (StompCommand.CONNECTED.equals(headerAccessor.getCommand())) { + if (logger.isInfoEnabled()) { + logger.info("Received STOMP CONNECTED: " + headerAccessor); + } + afterStompConnected(headerAccessor); } - else if (logger.isErrorEnabled() && StompCommand.ERROR == headerAccessor.getCommand()) { + else if (StompCommand.ERROR.equals(headerAccessor.getCommand()) && logger.isErrorEnabled()) { logger.error("Received STOMP ERROR: " + message); } - - if (StompCommand.CONNECTED == headerAccessor.getCommand()) { - afterStompConnected(headerAccessor); + else if (logger.isTraceEnabled()) { + logger.trace(headerAccessor.isHeartbeat() ? + "Received heartbeat in session " + this.sessionId : "Received " + headerAccessor); } headerAccessor.setImmutable(); @@ -618,12 +622,9 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } private void initHeartbeats(StompHeaderAccessor connectedHeaders) { - - // Remote clients do their own heartbeat management if (this.isRemoteClientSession) { return; } - long clientSendInterval = this.connectHeaders.getHeartbeat()[0]; long clientReceiveInterval = this.connectHeaders.getHeartbeat()[1]; @@ -640,7 +641,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler conn.send(HEARTBEAT_MESSAGE).addCallback( new ListenableFutureCallback() { public void onFailure(Throwable t) { - handleTcpConnectionFailure("Failed to send heartbeat", t); + String error = "failed to forward heartbeat in \"system\" session."; + handleTcpConnectionFailure(error, t); } public void onSuccess(Void result) {} }); @@ -648,25 +650,25 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } }, interval); } - if (clientReceiveInterval > 0 && serverSendInterval > 0) { final long interval = Math.max(clientReceiveInterval, serverSendInterval) * HEARTBEAT_MULTIPLIER; this.tcpConnection.onReadInactivity(new Runnable() { @Override public void run() { - handleTcpConnectionFailure("No hearbeat from broker for more than " + - interval + "ms, closing connection", null); + handleTcpConnectionFailure("no messages received for more than " + interval + " ms.", null); } - }, interval); + }, interval); } } @Override - public void handleFailure(Throwable ex) { - if (this.tcpConnection == null) { - return; + public void handleFailure(Throwable failure) { + if (this.tcpConnection != null) { + handleTcpConnectionFailure("transport failure.", failure); + } + else if (logger.isErrorEnabled()) { + logger.error("Transport failure: " + failure); } - handleTcpConnectionFailure("Closing connection after TCP failure", ex); } @Override @@ -675,17 +677,19 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler return; } try { - if (logger.isDebugEnabled()) { - logger.debug("TCP connection to broker closed in session '" + this.sessionId + "'"); + if (logger.isInfoEnabled()) { + logger.info("TCP connection to broker closed in session " + this.sessionId); } - sendStompErrorToClient("Connection to broker closed"); + sendStompErrorFrameToClient("Connection to broker closed."); } finally { try { + // Prevent clearConnection() from trying to close + this.tcpConnection = null; clearConnection(); } catch (Throwable t) { - // Ignore + // Shouldn't happen with connection reset beforehand } } } @@ -697,7 +701,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler * received the STOMP CONNECTED frame. For client messages this should be * false only if we lose the TCP connection around the same time when a * client message is being forwarded, so we simply log the ignored message - * at trace level. For messages from within the application being sent on + * at debug level. For messages from within the application being sent on * the "system" connection an exception is raised so that components sending * the message have a chance to handle it -- by default the broker message * channel is synchronous. @@ -714,78 +718,77 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler * @return a future to wait for the result */ @SuppressWarnings("unchecked") - public ListenableFuture forward(Message message, final StompHeaderAccessor headerAccessor) { + public ListenableFuture forward(Message message, final StompHeaderAccessor accessor) { TcpConnection conn = this.tcpConnection; if (!this.isStompConnected) { if (this.isRemoteClientSession) { if (logger.isDebugEnabled()) { - logger.debug("Ignoring client message received " + message + - (conn != null ? "before CONNECTED frame" : "after TCP connection closed")); + logger.debug("TCP connection closed already, ignoring " + message); } return EMPTY_TASK; } else { - throw new IllegalStateException("Cannot forward messages on system connection " + - (conn != null ? "before STOMP CONNECTED frame" : "while inactive") + - ". Try listening for BrokerAvailabilityEvent ApplicationContext events."); - + throw new IllegalStateException("Cannot forward messages " + + (conn != null ? "before STOMP CONNECTED. " : "while inactive. ") + + "Consider subscribing to receive BrokerAvailabilityEvent's from " + + "an ApplicationListener Spring bean. Dropped " + message); } } - if (logger.isDebugEnabled()) { - if (headerAccessor.isHeartbeat()) { - logger.trace("Forwarding heartbeat to broker"); - } - else { - logger.debug("Forwarding message to broker"); - } - } + final Message messageToSend = (accessor.isMutable() && accessor.isModified()) ? + MessageBuilder.createMessage(message.getPayload(), accessor.getMessageHeaders()) : message; - if (headerAccessor.isMutable() && headerAccessor.isModified()) { - message = MessageBuilder.createMessage(message.getPayload(), headerAccessor.getMessageHeaders()); + StompCommand command = accessor.getCommand(); + if (accessor.isHeartbeat()) { + logger.trace("Forwarding heartbeat in session " + this.sessionId); + } + else if (StompCommand.SUBSCRIBE.equals(command) && logger.isDebugEnabled()) { + logger.debug("Forwarding SUBSCRIBE: " + messageToSend); + } + else if (StompCommand.UNSUBSCRIBE.equals(command) && logger.isDebugEnabled()) { + logger.debug("Forwarding UNSUBSCRIBE: " + messageToSend); + } + else if (StompCommand.DISCONNECT.equals(command) && logger.isInfoEnabled()) { + logger.info("Forwarding DISCONNECT: " + messageToSend); + } + else if (logger.isTraceEnabled()) { + logger.trace("Forwarding " + command + ": " + messageToSend); } - ListenableFuture future = conn.send((Message) message); - + ListenableFuture future = conn.send((Message) messageToSend); future.addCallback(new ListenableFutureCallback() { @Override public void onSuccess(Void result) { - if (headerAccessor.getCommand() == StompCommand.DISCONNECT) { + if (accessor.getCommand() == StompCommand.DISCONNECT) { clearConnection(); } } @Override public void onFailure(Throwable t) { - if (tcpConnection == null) { - // already reset + if (tcpConnection != null) { + handleTcpConnectionFailure("failed to forward " + messageToSend, t); } - else { - handleTcpConnectionFailure("Failed to send message " + headerAccessor, t); + else if (logger.isErrorEnabled()) { + logger.error("Failed to forward " + messageToSend); } } }); - return future; } /** - * Close the TCP connection to the broker and release the connection reference, - * Any exception arising from closing the connection is propagated. The caller - * must handle and log the exception accordingly. - * - *

If the connection belongs to a client session, the connection handler - * for the session (basically the current instance) is also released from the - * {@link org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler}. + * Clean up state associated with the connection and close it. + * Any exception arising from closing the connection are propagated. */ public void clearConnection() { + if (logger.isDebugEnabled()) { + logger.debug("Cleaning up connection state for session " + sessionId + " (" + + (getConnectionCount() - 1) + " remaining connections)."); + } if (this.isRemoteClientSession) { - if (logger.isDebugEnabled()) { - logger.debug("Removing session '" + sessionId + "' (total remaining=" + - (StompBrokerRelayMessageHandler.this.connectionHandlers.size() - 1) + ")"); - } StompBrokerRelayMessageHandler.this.connectionHandlers.remove(this.sessionId); } @@ -794,13 +797,16 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler TcpConnection conn = this.tcpConnection; this.tcpConnection = null; if (conn != null) { + if (logger.isInfoEnabled()) { + logger.info("Closing TCP connection in session " + this.sessionId); + } conn.close(); } } @Override public String toString() { - return "StompConnectionHandler{" + "sessionId=" + this.sessionId + "}"; + return "StompConnectionHandler[sessionId=" + this.sessionId + "]"; } } @@ -820,8 +826,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } @Override - protected void handleTcpConnectionFailure(String errorMessage, Throwable t) { - super.handleTcpConnectionFailure(errorMessage, t); + protected void handleTcpConnectionFailure(String error, Throwable t) { + super.handleTcpConnectionFailure(error, t); publishBrokerUnavailableEvent(); } @@ -832,9 +838,9 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } @Override - public ListenableFuture forward(Message message, StompHeaderAccessor headerAccessor) { + public ListenableFuture forward(Message message, StompHeaderAccessor accessor) { try { - ListenableFuture future = super.forward(message, headerAccessor); + ListenableFuture future = super.forward(message, accessor); future.get(); return future; } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompConversionException.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompConversionException.java index 1ef03d77785..aae88e7100d 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompConversionException.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompConversionException.java @@ -18,6 +18,8 @@ package org.springframework.messaging.simp.stomp; import org.springframework.core.NestedRuntimeException; /** + * Raised after a failure to encode or decode a STOMP message. + * * @author Gary Russell * @since 4.0 */ diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompDecoder.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompDecoder.java index 3afbee28e25..d9d46ba3817 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompDecoder.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompDecoder.java @@ -156,13 +156,13 @@ public class StompDecoder { headerAccessor.updateSimpMessageHeadersFromStompHeaders(); headerAccessor.setLeaveMutable(true); decodedMessage = MessageBuilder.createMessage(payload, headerAccessor.getMessageHeaders()); - if (logger.isDebugEnabled()) { - logger.debug("Decoded " + decodedMessage); + if (logger.isTraceEnabled()) { + logger.trace("Decoded " + decodedMessage); } } else { if (logger.isTraceEnabled()) { - logger.trace("Received incomplete frame. Resetting buffer."); + logger.trace("Incomplete frame, resetting input buffer."); } if (headers != null && headerAccessor != null) { String name = NativeMessageHeaderAccessor.NATIVE_HEADERS; @@ -177,7 +177,7 @@ public class StompDecoder { } else { if (logger.isTraceEnabled()) { - logger.trace("Decoded heartbeat"); + logger.trace("Decoded heartbeat."); } StompHeaderAccessor headerAccessor = StompHeaderAccessor.createForHeartbeat(); initHeaders(headerAccessor); @@ -224,8 +224,8 @@ public class StompDecoder { int colonIndex = header.indexOf(':'); if ((colonIndex <= 0) || (colonIndex == header.length() - 1)) { if (buffer.remaining() > 0) { - throw new StompConversionException( - "Illegal header: '" + header + "'. A header must be of the form :"); + throw new StompConversionException("Illegal header: '" + header + + "'. A header must be of the form :."); } } else { diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompEncoder.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompEncoder.java index d097d98dee2..3a311e3a4c8 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompEncoder.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompEncoder.java @@ -91,7 +91,7 @@ public final class StompEncoder { return baos.toByteArray(); } catch (IOException e) { - throw new StompConversionException("Failed to encode STOMP frame", e); + throw new StompConversionException("Failed to encode STOMP frame, headers=" + headers + ".", e); } } @@ -102,8 +102,8 @@ public final class StompEncoder { Map> nativeHeaders = (Map>) headers.get(NativeMessageHeaderAccessor.NATIVE_HEADERS); - if (logger.isDebugEnabled()) { - logger.debug("Encoding STOMP " + command + ", headers=" + nativeHeaders); + if (logger.isTraceEnabled()) { + logger.trace("Encoding STOMP " + command + ", headers=" + nativeHeaders + "."); } if (nativeHeaders == null) { diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/user/DefaultUserDestinationResolver.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/user/DefaultUserDestinationResolver.java index 1ab47d3300a..23d99969c14 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/user/DefaultUserDestinationResolver.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/user/DefaultUserDestinationResolver.java @@ -100,13 +100,11 @@ public class DefaultUserDestinationResolver implements UserDestinationResolver { @Override public UserDestinationResult resolveDestination(Message message) { - String destination = SimpMessageHeaderAccessor.getDestination(message.getHeaders()); DestinationInfo info = parseUserDestination(message); if (info == null) { return null; } - Set resolved = new HashSet(); for (String sessionId : info.getSessionIds()) { String d = getTargetDestination(destination, info.getDestinationWithoutPrefix(), sessionId, info.getUser()); @@ -114,12 +112,10 @@ public class DefaultUserDestinationResolver implements UserDestinationResolver { resolved.add(d); } } - return new UserDestinationResult(destination, resolved, info.getSubscribeDestination(), info.getUser()); } private DestinationInfo parseUserDestination(Message message) { - MessageHeaders headers = message.getHeaders(); SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(headers); String destination = SimpMessageHeaderAccessor.getDestination(headers); @@ -131,12 +127,13 @@ public class DefaultUserDestinationResolver implements UserDestinationResolver { String user; Set sessionIds; + if (destination == null || !checkDestination(destination, this.destinationPrefix)) { + return null; + } + if (SimpMessageType.SUBSCRIBE.equals(messageType) || SimpMessageType.UNSUBSCRIBE.equals(messageType)) { - if (!checkDestination(destination, this.destinationPrefix)) { - return null; - } if (sessionId == null) { - logger.error("Ignoring message, no session id available"); + logger.error("No session id. Ignoring " + message); return null; } destinationWithoutPrefix = destination.substring(this.destinationPrefix.length()-1); @@ -145,9 +142,6 @@ public class DefaultUserDestinationResolver implements UserDestinationResolver { sessionIds = Collections.singleton(sessionId); } else if (SimpMessageType.MESSAGE.equals(messageType)) { - if (!checkDestination(destination, this.destinationPrefix)) { - return null; - } int startIndex = this.destinationPrefix.length(); int endIndex = destination.indexOf('/', startIndex); Assert.isTrue(endIndex > 0, "Expected destination pattern \"/user/{userId}/**\""); @@ -160,27 +154,13 @@ public class DefaultUserDestinationResolver implements UserDestinationResolver { Collections.singleton(sessionId) : this.userSessionRegistry.getSessionIds(user)); } else { - if (logger.isTraceEnabled()) { - logger.trace("Ignoring " + messageType + " message"); - } return null; } - return new DestinationInfo(destinationWithoutPrefix, subscribeDestination, user, sessionIds); } protected boolean checkDestination(String destination, String requiredPrefix) { - if (destination == null) { - logger.trace("Ignoring message, no destination"); - return false; - } - if (!destination.startsWith(requiredPrefix)) { - if (logger.isTraceEnabled()) { - logger.trace("Ignoring message to " + destination + ", not a \"user\" destination"); - } - return false; - } - return true; + return destination.startsWith(requiredPrefix); } /** @@ -237,6 +217,12 @@ public class DefaultUserDestinationResolver implements UserDestinationResolver { public Set getSessionIds() { return this.sessionIds; } + + @Override + public String toString() { + return "DestinationInfo[destination=" + this.destinationWithoutPrefix + ", subscribeDestination=" + + this.subscribeDestination + ", user=" + this.user + ", sessionIds=" + this.sessionIds + "]"; + } } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/user/UserDestinationMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/user/UserDestinationMessageHandler.java index efe3e1ba479..cd36efa5c45 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/user/UserDestinationMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/user/UserDestinationMessageHandler.java @@ -171,8 +171,8 @@ public class UserDestinationMessageHandler implements MessageHandler, SmartLifec } Set destinations = result.getTargetDestinations(); if (destinations.isEmpty()) { - if (logger.isTraceEnabled()) { - logger.trace("No target destinations, message=" + message); + if (logger.isDebugEnabled()) { + logger.debug("Use destination not resolved (no active sessions?): " + message); } return; } @@ -184,8 +184,8 @@ public class UserDestinationMessageHandler implements MessageHandler, SmartLifec message = MessageBuilder.createMessage(message.getPayload(), headerAccessor.getMessageHeaders()); } for (String destination : destinations) { - if (logger.isDebugEnabled()) { - logger.debug("Sending message to resolved destination=" + destination); + if (logger.isTraceEnabled()) { + logger.trace("Sending message with resolved user destination: " + message); } this.brokerMessagingTemplate.send(destination, message); } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/user/UserDestinationResult.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/user/UserDestinationResult.java index d73a6b9457b..d9504c9a47b 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/user/UserDestinationResult.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/user/UserDestinationResult.java @@ -91,4 +91,10 @@ public class UserDestinationResult { public String getUser() { return this.user; } + + @Override + public String toString() { + return "UserDestinationResult[source=" + this.sourceDestination + ", target=" + this.targetDestinations + + ", subscribeDestination=" + this.subscribeDestination + ", user=" + this.user + "]"; + } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/AbstractMessageChannel.java b/spring-messaging/src/main/java/org/springframework/messaging/support/AbstractMessageChannel.java index a7a9dee4db8..70dfb407c7b 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/support/AbstractMessageChannel.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/AbstractMessageChannel.java @@ -100,18 +100,14 @@ public abstract class AbstractMessageChannel implements MessageChannel, BeanName @Override public final boolean send(Message message, long timeout) { - Assert.notNull(message, "Message must not be null"); - if (logger.isTraceEnabled()) { - logger.trace("[" + this.beanName + "] sending message=" + message); + logger.trace(this + " sending " + message); } - message = this.interceptorChain.preSend(message, this); if (message == null) { return false; } - try { boolean sent = sendInternal(message, timeout); this.interceptorChain.postSend(message, this, sent); @@ -121,8 +117,7 @@ public abstract class AbstractMessageChannel implements MessageChannel, BeanName if (e instanceof MessagingException) { throw (MessagingException) e; } - throw new MessageDeliveryException(message, - "Failed to send message to channel '" + this.getBeanName() + "'", e); + throw new MessageDeliveryException(message,"Failed to send message to " + this, e); } } @@ -131,7 +126,7 @@ public abstract class AbstractMessageChannel implements MessageChannel, BeanName @Override public String toString() { - return "MessageChannel [name=" + this.beanName + "]"; + return "MessageChannel[name=" + this.beanName + "]"; } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/AbstractSubscribableChannel.java b/spring-messaging/src/main/java/org/springframework/messaging/support/AbstractSubscribableChannel.java index 19ff6357924..82cb9e4cc67 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/support/AbstractSubscribableChannel.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/AbstractSubscribableChannel.java @@ -47,7 +47,7 @@ public abstract class AbstractSubscribableChannel extends AbstractMessageChannel boolean result = this.handlers.add(handler); if (result) { if (logger.isDebugEnabled()) { - logger.debug("[" + getBeanName() + "] subscribed " + handler); + logger.debug(this + " subscribed " + handler); } } return result; @@ -58,7 +58,7 @@ public abstract class AbstractSubscribableChannel extends AbstractMessageChannel boolean result = this.handlers.remove(handler); if (result) { if (logger.isDebugEnabled()) { - logger.debug("[" + getBeanName() + "] unsubscribed " + handler); + logger.debug(this + " unsubscribed " + handler); } } return result; diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/MessageHeaderAccessor.java b/spring-messaging/src/main/java/org/springframework/messaging/support/MessageHeaderAccessor.java index 68193fa58ca..3b0b5102574 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/support/MessageHeaderAccessor.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/MessageHeaderAccessor.java @@ -471,7 +471,7 @@ public class MessageHeaderAccessor { @Override public String toString() { - return getClass().getSimpleName() + " [headers=" + this.headers + "]"; + return getClass().getSimpleName() + "[headers=" + this.headers + "]"; } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/adapter/AbstractWebSocketSession.java b/spring-websocket/src/main/java/org/springframework/web/socket/adapter/AbstractWebSocketSession.java index 2d8f4e16365..d2be486a282 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/adapter/AbstractWebSocketSession.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/adapter/AbstractWebSocketSession.java @@ -133,8 +133,8 @@ public abstract class AbstractWebSocketSession implements NativeWebSocketSess @Override public final void close(CloseStatus status) throws IOException { checkNativeSessionInitialized(); - if (logger.isDebugEnabled()) { - logger.debug("Closing " + this); + if (logger.isInfoEnabled()) { + logger.info("Closing " + this); } closeInternal(status); } @@ -144,7 +144,7 @@ public abstract class AbstractWebSocketSession implements NativeWebSocketSess @Override public String toString() { - return "WebSocket session id=" + getId(); + return this.getClass().getSimpleName() + "[id=" + getId() + "]"; } } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/handler/LoggingWebSocketHandlerDecorator.java b/spring-websocket/src/main/java/org/springframework/web/socket/handler/LoggingWebSocketHandlerDecorator.java index cfdbba34ffc..2c081bb625e 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/handler/LoggingWebSocketHandlerDecorator.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/handler/LoggingWebSocketHandlerDecorator.java @@ -50,7 +50,7 @@ public class LoggingWebSocketHandlerDecorator extends WebSocketHandlerDecorator @Override public void handleMessage(WebSocketSession session, WebSocketMessage message) throws Exception { if (logger.isTraceEnabled()) { - logger.trace(message + ", " + session); + logger.trace("Handling " + message + " in " + session); } super.handleMessage(session, message); } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SessionConnectEvent.java b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SessionConnectEvent.java index 348609b1a7e..52c4f0e34e1 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SessionConnectEvent.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SessionConnectEvent.java @@ -73,6 +73,6 @@ public class SessionConnectEvent extends ApplicationEvent { @Override public String toString() { - return "SessionConnectEvent: message=" + message; + return "SessionConnectEvent" + this.message; } } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SessionConnectedEvent.java b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SessionConnectedEvent.java index 45e47dce81e..01498d565b5 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SessionConnectedEvent.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SessionConnectedEvent.java @@ -56,6 +56,6 @@ public class SessionConnectedEvent extends ApplicationEvent { @Override public String toString() { - return "SessionConnectedEvent: message=" + message; + return "SessionConnectedEvent" + this.message; } } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SessionDisconnectEvent.java b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SessionDisconnectEvent.java index 76ea9990fa7..15ad6303a14 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SessionDisconnectEvent.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SessionDisconnectEvent.java @@ -68,6 +68,7 @@ public class SessionDisconnectEvent extends ApplicationEvent { @Override public String toString() { - return "SessionDisconnectEvent: sessionId=" + this.sessionId; + return "SessionDisconnectEvent[sessionId=" + this.sessionId + + (this.status != null ? this.status.toString() : "closeStatus=null") + "]"; } } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java index 417955a47d3..fd1149a1105 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java @@ -39,7 +39,6 @@ import org.springframework.messaging.simp.SimpMessageHeaderAccessor; import org.springframework.messaging.simp.SimpMessageType; import org.springframework.messaging.simp.stomp.BufferingStompDecoder; import org.springframework.messaging.simp.stomp.StompCommand; -import org.springframework.messaging.simp.stomp.StompConversionException; import org.springframework.messaging.simp.stomp.StompDecoder; import org.springframework.messaging.simp.stomp.StompEncoder; import org.springframework.messaging.simp.stomp.StompHeaderAccessor; @@ -188,31 +187,31 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE messages = decoder.decode(byteBuffer); if (messages.isEmpty()) { - logger.debug("Incomplete STOMP frame content received," + "buffered=" + - decoder.getBufferSize() + ", buffer size limit=" + decoder.getBufferSizeLimit()); + if (logger.isTraceEnabled()) { + logger.trace("Incomplete STOMP frame content received, bufferSize=" + + decoder.getBufferSize() + ", bufferSizeLimit=" + decoder.getBufferSizeLimit() + "."); + } return; } } catch (Throwable ex) { - logger.error("Failed to parse WebSocket message to STOMP." + - "Sending STOMP ERROR to client, sessionId=" + session.getId(), ex); + if (logger.isErrorEnabled()) { + logger.error("Failed to parse " + webSocketMessage + + " in session " + session.getId() + ". Sending STOMP ERROR to client.", ex); + } sendErrorMessage(session, ex); return; } for (Message message : messages) { try { - StompHeaderAccessor headerAccessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); if (logger.isTraceEnabled()) { - if (headerAccessor.isHeartbeat()) { - logger.trace("Received heartbeat from client session=" + session.getId()); - } - else { - logger.trace("Received message from client session=" + session.getId()); - } + logger.trace(headerAccessor.isHeartbeat() ? + "Received heartbeat from broker in session " + session.getId() + "." : + "Received message from broker in session " + session.getId() + ": " + message + "."); } headerAccessor.setSessionId(session.getId()); @@ -233,19 +232,23 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE } } catch (Throwable ex) { - logger.error("Parsed STOMP message but could not send it to to message channel. " + - "Sending STOMP ERROR to client, sessionId=" + session.getId(), ex); + logger.error("Failed to send STOMP message from client to application MessageChannel" + + " in session " + session.getId() + ". Sending STOMP ERROR to client.", ex); sendErrorMessage(session, ex); + } } } private void publishEvent(ApplicationEvent event) { try { + if (logger.isInfoEnabled()) { + logger.info("Publishing " + event); + } this.eventPublisher.publishEvent(event); } catch (Throwable ex) { - logger.error("Error while publishing " + event, ex); + logger.error("Error publishing " + event + ".", ex); } } @@ -257,7 +260,7 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE session.sendMessage(new TextMessage(bytes)); } catch (Throwable ex) { - // ignore + logger.error("Failed to send STOMP ERROR to client.", ex); } } @@ -267,45 +270,17 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE @SuppressWarnings("unchecked") @Override public void handleMessageToClient(WebSocketSession session, Message message) { - if (!(message.getPayload() instanceof byte[])) { - logger.error("Ignoring message, expected byte[] content: " + message); - return; - } - - MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class); - if (accessor == null) { - logger.error("No header accessor: " + message); - return; - } - - StompHeaderAccessor stompAccessor; - if (accessor instanceof StompHeaderAccessor) { - stompAccessor = (StompHeaderAccessor) accessor; - } - else if (accessor instanceof SimpMessageHeaderAccessor) { - stompAccessor = StompHeaderAccessor.wrap(message); - if (SimpMessageType.CONNECT_ACK.equals(stompAccessor.getMessageType())) { - StompHeaderAccessor connectedHeaders = StompHeaderAccessor.create(StompCommand.CONNECTED); - connectedHeaders.setVersion(getVersion(stompAccessor)); - connectedHeaders.setHeartbeat(0, 0); // no heart-beat support with simple broker - stompAccessor = connectedHeaders; - } - else if (stompAccessor.getCommand() == null || StompCommand.SEND.equals(stompAccessor.getCommand())) { - stompAccessor.updateStompCommandAsServerMessage(); - } - } - else { - // Should not happen - logger.error("Unexpected header accessor type: " + accessor); + logger.error("Expected byte[] payload. Ignoring " + message + "."); return; } - + StompHeaderAccessor stompAccessor = getStompHeaderAccessor(message); StompCommand command = stompAccessor.getCommand(); if (StompCommand.MESSAGE.equals(command)) { if (stompAccessor.getSubscriptionId() == null) { - logger.error("Ignoring message, no subscriptionId header: " + message); - return; + if (logger.isWarnEnabled()) { + logger.warn("No STOMP \"subscription\" header in " + message); + } } String origDestination = stompAccessor.getFirstNativeHeader(SimpMessageHeaderAccessor.ORIGINAL_DESTINATION); if (origDestination != null) { @@ -320,19 +295,16 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE publishEvent(new SessionConnectedEvent(this, (Message) message)); } } - try { byte[] bytes = this.stompEncoder.encode(stompAccessor.getMessageHeaders(), (byte[]) message.getPayload()); - TextMessage textMessage = new TextMessage(bytes); - - session.sendMessage(textMessage); + session.sendMessage(new TextMessage(bytes)); } catch (SessionLimitExceededException ex) { // Bad session, just get out throw ex; } catch (Throwable ex) { - logger.error("Failed to send WebSocket message to client, sessionId=" + session.getId(), ex); + logger.error("Failed to send WebSocket message to client in session " + session.getId() + ".", ex); command = StompCommand.ERROR; } finally { @@ -347,58 +319,93 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE } } - protected StompHeaderAccessor toMutableAccessor(StompHeaderAccessor headerAccessor, Message message) { - return (headerAccessor.isMutable() ? headerAccessor : StompHeaderAccessor.wrap(message)); + private StompHeaderAccessor getStompHeaderAccessor(Message message) { + MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class); + if (accessor == null) { + // Shouldn't happen (only broker broadcasts directly to clients) + throw new IllegalStateException("No header accessor in " + message + "."); + } + StompHeaderAccessor stompAccessor; + if (accessor instanceof StompHeaderAccessor) { + stompAccessor = (StompHeaderAccessor) accessor; + } + else if (accessor instanceof SimpMessageHeaderAccessor) { + stompAccessor = StompHeaderAccessor.wrap(message); + if (SimpMessageType.CONNECT_ACK.equals(stompAccessor.getMessageType())) { + stompAccessor = convertConnectAcktoStompConnected(stompAccessor); + } + else if (stompAccessor.getCommand() == null || StompCommand.SEND.equals(stompAccessor.getCommand())) { + stompAccessor.updateStompCommandAsServerMessage(); + } + } + else { + // Shouldn't happen (only broker broadcasts directly to clients) + throw new IllegalStateException( + "Unexpected header accessor type: " + accessor.getClass() + " in " + message + "."); + } + return stompAccessor; } - private String getVersion(StompHeaderAccessor connectAckHeaders) { - + /** + * The simple broker produces {@code SimpMessageType.CONNECT_ACK} that's not STOMP + * specific and needs to be turned into a STOMP CONNECTED frame. + */ + private StompHeaderAccessor convertConnectAcktoStompConnected(StompHeaderAccessor connectAckHeaders) { String name = StompHeaderAccessor.CONNECT_MESSAGE_HEADER; - Message connectMessage = (Message) connectAckHeaders.getHeader(name); - Assert.notNull(connectMessage, "CONNECT_ACK does not contain original CONNECT " + connectAckHeaders); - - StompHeaderAccessor connectHeaders = - MessageHeaderAccessor.getAccessor(connectMessage, StompHeaderAccessor.class); - + Message message = (Message) connectAckHeaders.getHeader(name); + Assert.notNull(message, "Original STOMP CONNECT not found in " + connectAckHeaders); + StompHeaderAccessor connectHeaders = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); + String version; Set acceptVersions = connectHeaders.getAcceptVersion(); if (acceptVersions.contains("1.2")) { - return "1.2"; + version = "1.2"; } else if (acceptVersions.contains("1.1")) { - return "1.1"; + version = "1.1"; } else if (acceptVersions.isEmpty()) { - return null; + version = null; } else { - throw new StompConversionException("Unsupported version '" + acceptVersions + "'"); + throw new IllegalArgumentException("Unsupported STOMP version '" + acceptVersions + "'"); } + StompHeaderAccessor connectedHeaders = StompHeaderAccessor.create(StompCommand.CONNECTED); + connectedHeaders.setVersion(version); + connectedHeaders.setHeartbeat(0, 0); // not supported + return connectedHeaders; } - private StompHeaderAccessor afterStompSessionConnected( - Message message, StompHeaderAccessor headerAccessor, WebSocketSession session) { + protected StompHeaderAccessor toMutableAccessor(StompHeaderAccessor headerAccessor, Message message) { + return (headerAccessor.isMutable() ? headerAccessor : StompHeaderAccessor.wrap(message)); + } + + private StompHeaderAccessor afterStompSessionConnected(Message message, StompHeaderAccessor accessor, + WebSocketSession session) { Principal principal = session.getPrincipal(); if (principal != null) { - headerAccessor = toMutableAccessor(headerAccessor, message); - headerAccessor.setNativeHeader(CONNECTED_USER_HEADER, principal.getName()); + accessor = toMutableAccessor(accessor, message); + accessor.setNativeHeader(CONNECTED_USER_HEADER, principal.getName()); if (this.userSessionRegistry != null) { - String userName = resolveNameForUserSessionRegistry(principal); + String userName = getSessionRegistryUserName(principal); this.userSessionRegistry.registerSessionId(userName, session.getId()); } } - long[] heartbeat = headerAccessor.getHeartbeat(); + long[] heartbeat = accessor.getHeartbeat(); if (heartbeat[1] > 0) { session = WebSocketSessionDecorator.unwrap(session); if (session instanceof SockJsSession) { - logger.debug("STOMP heartbeats negotiated, disabling SockJS heartbeats."); + if (logger.isDebugEnabled()) { + logger.debug("STOMP heartbeats enabled. " + + "Turning off SockJS heartbeats in " + session.getId() + "."); + } ((SockJsSession) session).disableHeartbeat(); } } - return headerAccessor; + return accessor; } - private String resolveNameForUserSessionRegistry(Principal principal) { + private String getSessionRegistryUserName(Principal principal) { String userName = principal.getName(); if (principal instanceof DestinationUserNameProvider) { userName = ((DestinationUserNameProvider) principal).getDestinationUserName(); @@ -421,25 +428,18 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE @Override public void afterSessionEnded(WebSocketSession session, CloseStatus closeStatus, MessageChannel outputChannel) { - this.decoders.remove(session.getId()); - Principal principal = session.getPrincipal(); - if ((this.userSessionRegistry != null) && (principal != null)) { - String userName = resolveNameForUserSessionRegistry(principal); + if (principal != null && this.userSessionRegistry != null) { + String userName = getSessionRegistryUserName(principal); this.userSessionRegistry.unregisterSessionId(userName, session.getId()); } - if (this.eventPublisher != null) { publishEvent(new SessionDisconnectEvent(this, session.getId(), closeStatus)); } - Message message = createDisconnectMessage(session); SimpAttributes simpAttributes = SimpAttributes.fromMessage(message); try { - if (logger.isDebugEnabled()) { - logger.debug("WebSocket session ended, sending DISCONNECT message to broker"); - } SimpAttributesContextHolder.setAttributes(simpAttributes); outputChannel.send(message); } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SubProtocolWebSocketHandler.java b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SubProtocolWebSocketHandler.java index a5ae409c474..ab0adb1a9de 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SubProtocolWebSocketHandler.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SubProtocolWebSocketHandler.java @@ -19,10 +19,8 @@ package org.springframework.web.socket.messaging; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantLock; @@ -82,9 +80,11 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, private final SubscribableChannel clientOutboundChannel; - private final Map protocolHandlers = + private final Map protocolHandlerLookup = new TreeMap(String.CASE_INSENSITIVE_ORDER); + private final List protocolHandlers = new ArrayList(); + private SubProtocolHandler defaultProtocolHandler; private final Map sessions = new ConcurrentHashMap(); @@ -116,6 +116,7 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, * @param protocolHandlers the sub-protocol handlers to use */ public void setProtocolHandlers(List protocolHandlers) { + this.protocolHandlerLookup.clear(); this.protocolHandlers.clear(); for (SubProtocolHandler handler: protocolHandlers) { addProtocolHandler(handler); @@ -123,7 +124,7 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, } public List getProtocolHandlers() { - return new ArrayList(protocolHandlers.values()); + return new ArrayList(this.protocolHandlerLookup.values()); } @@ -131,27 +132,26 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, * Register a sub-protocol handler. */ public void addProtocolHandler(SubProtocolHandler handler) { - List protocols = handler.getSupportedProtocols(); if (CollectionUtils.isEmpty(protocols)) { - logger.warn("No sub-protocols, ignoring handler " + handler); + logger.error("No sub-protocols for " + handler + "."); return; } - for (String protocol: protocols) { - SubProtocolHandler replaced = this.protocolHandlers.put(protocol, handler); + SubProtocolHandler replaced = this.protocolHandlerLookup.put(protocol, handler); if ((replaced != null) && (replaced != handler) ) { - throw new IllegalStateException("Failed to map handler " + handler - + " to protocol '" + protocol + "', it is already mapped to handler " + replaced); + throw new IllegalStateException("Can't map " + handler + + " to protocol '" + protocol + "'. Already mapped to " + replaced + "."); } } + this.protocolHandlers.add(handler); } /** * Return the sub-protocols keyed by protocol name. */ public Map getProtocolHandlerMap() { - return this.protocolHandlers; + return this.protocolHandlerLookup; } /** @@ -161,7 +161,7 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, */ public void setDefaultProtocolHandler(SubProtocolHandler defaultProtocolHandler) { this.defaultProtocolHandler = defaultProtocolHandler; - if (this.protocolHandlers.isEmpty()) { + if (this.protocolHandlerLookup.isEmpty()) { setProtocolHandlers(Arrays.asList(defaultProtocolHandler)); } } @@ -177,7 +177,7 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, * Return all supported protocols. */ public List getSubProtocols() { - return new ArrayList(this.protocolHandlers.keySet()); + return new ArrayList(this.protocolHandlerLookup.keySet()); } @@ -216,6 +216,7 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, @Override public final void start() { + Assert.isTrue(this.defaultProtocolHandler != null || !this.protocolHandlers.isEmpty(), "No handlers"); synchronized (this.lifecycleMonitor) { this.clientOutboundChannel.subscribe(this); this.running = true; @@ -225,11 +226,8 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, @Override public final void stop() { synchronized (this.lifecycleMonitor) { - this.running = false; this.clientOutboundChannel.unsubscribe(this); - - // Notify sessions to stop flushing messages for (WebSocketSessionHolder holder : this.sessions.values()) { try { holder.getSession().close(CloseStatus.GOING_AWAY); @@ -254,45 +252,44 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, session = new ConcurrentWebSocketSessionDecorator(session, getSendTimeLimit(), getSendBufferSizeLimit()); this.sessions.put(session.getId(), new WebSocketSessionHolder(session)); if (logger.isDebugEnabled()) { - logger.debug("Started session " + session.getId() + ", number of sessions=" + this.sessions.size()); + logger.debug("Started session " + session.getId() + " (" + this.sessions.size() + " sessions)"); } findProtocolHandler(session).afterSessionStarted(session, this.clientInboundChannel); } protected final SubProtocolHandler findProtocolHandler(WebSocketSession session) { - String protocol = null; try { protocol = session.getAcceptedProtocol(); } catch (Exception ex) { - logger.warn("Ignoring protocol in WebSocket session after failure to obtain it: " + ex.toString()); + // Shouldn't happen + logger.error("Failed to obtain session.getAcceptedProtocol(). Will use the " + + "default protocol handler (if configured).", ex); } - SubProtocolHandler handler; if (!StringUtils.isEmpty(protocol)) { - handler = this.protocolHandlers.get(protocol); - Assert.state(handler != null, - "No handler for sub-protocol '" + protocol + "', handlers=" + this.protocolHandlers); + handler = this.protocolHandlerLookup.get(protocol); + Assert.state(handler != null, "No handler for '" + protocol + "' among " + this.protocolHandlerLookup); } else { if (this.defaultProtocolHandler != null) { handler = this.defaultProtocolHandler; } + else if (this.protocolHandlers.size() == 1) { + handler = this.protocolHandlers.get(0); + } else { - Set handlers = new HashSet(this.protocolHandlers.values()); - if (handlers.size() == 1) { - handler = handlers.iterator().next(); - } - else { - throw new IllegalStateException( - "No sub-protocol was requested and a default sub-protocol handler was not configured"); - } + throw new IllegalStateException("Multiple protocol handlers configured and " + + "no protocol was negotiated. Consider configuring a default SubProtocolHandler."); } } return handler; } + /** + * Handle an inbound message from a WebSocket client. + */ @Override public void handleMessage(WebSocketSession session, WebSocketMessage message) throws Exception { SubProtocolHandler protocolHandler = findProtocolHandler(session); @@ -308,16 +305,19 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, checkSessions(); } + /** + * Handle an outbound Spring Message to a WebSocket client. + */ @Override public void handleMessage(Message message) throws MessagingException { String sessionId = resolveSessionId(message); if (sessionId == null) { - logger.error("sessionId not found in message " + message); + logger.error("Couldn't find sessionId in " + message); return; } WebSocketSessionHolder holder = this.sessions.get(sessionId); if (holder == null) { - logger.error("Session not found for session with id '" + sessionId + "', ignoring message " + message); + logger.error("No session for " + message); return; } WebSocketSession session = holder.getSession(); @@ -327,22 +327,20 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, catch (SessionLimitExceededException ex) { try { logger.error("Terminating '" + session + "'", ex); - - // Session may be unresponsive so clear first - clearSession(session, ex.getStatus()); + clearSession(session, ex.getStatus()); // clear first, session may be unresponsive session.close(ex.getStatus()); } catch (Exception secondException) { - logger.error("Exception terminating '" + sessionId + "'", secondException); + logger.error("Failure while closing session " + sessionId + ".", secondException); } } catch (Exception e) { - logger.error("Failed to send message to client " + message + " in " + session, e); + logger.error("Failed to send message to client in " + session + ": " + message, e); } } private String resolveSessionId(Message message) { - for (SubProtocolHandler handler : this.protocolHandlers.values()) { + for (SubProtocolHandler handler : this.protocolHandlerLookup.values()) { String sessionId = handler.resolveSessionId(message); if (sessionId != null) { return sessionId; @@ -366,8 +364,8 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, if (!isRunning() && currentTime - this.lastSessionCheckTime < TIME_TO_FIRST_MESSAGE) { return; } - try { - if (this.sessionCheckLock.tryLock()) { + if (this.sessionCheckLock.tryLock()) { + try { for (WebSocketSessionHolder holder : this.sessions.values()) { if (holder.hasHandledMessages()) { continue; @@ -378,19 +376,19 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, } WebSocketSession session = holder.getSession(); if (logger.isErrorEnabled()) { - logger.error("No messages received after " + timeSinceCreated + " ms. Closing " + holder); + logger.error("No messages received after " + timeSinceCreated + " ms. Closing " + holder + "."); } try { session.close(CloseStatus.PROTOCOL_ERROR); } catch (Throwable t) { - logger.error("Failed to close " + session, t); + logger.error("Failure while closing " + session, t); } } } - } - finally { - this.sessionCheckLock.unlock(); + finally { + this.sessionCheckLock.unlock(); + } } } @@ -404,6 +402,9 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, } private void clearSession(WebSocketSession session, CloseStatus closeStatus) throws Exception { + if (logger.isDebugEnabled()) { + logger.debug("Clearing session " + session.getId() + " (" + this.sessions.size() + " remain)"); + } this.sessions.remove(session.getId()); findProtocolHandler(session).afterSessionEnded(session, closeStatus, this.clientInboundChannel); } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/AbstractClientSockJsSession.java b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/AbstractClientSockJsSession.java index 16217fa03c0..825b8a0c8ab 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/AbstractClientSockJsSession.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/AbstractClientSockJsSession.java @@ -241,8 +241,8 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession { private void handleMessageFrame(SockJsFrame frame) { if (!isOpen()) { - if (logger.isWarnEnabled()) { - logger.warn("Ignoring received message due to state=" + this.state + " in " + this); + if (logger.isErrorEnabled()) { + logger.error("Ignoring received message due to state=" + this.state + " in " + this); } return; } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/support/AbstractSockJsService.java b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/support/AbstractSockJsService.java index 447de17efe1..0d5fbe63ac1 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/support/AbstractSockJsService.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/support/AbstractSockJsService.java @@ -266,23 +266,23 @@ public abstract class AbstractSockJsService implements SockJsService { String sockJsPath, WebSocketHandler wsHandler) throws SockJsException { if (sockJsPath == null) { - if (logger.isWarnEnabled()) { - logger.warn("No SockJS path provided, URI=\"" + request.getURI()); + if (logger.isErrorEnabled()) { + logger.error("Expected SockJS path. Failing request: " + request.getURI()); } response.setStatusCode(HttpStatus.NOT_FOUND); return; } - if (logger.isDebugEnabled()) { - logger.debug(request.getMethod() + " with SockJS path [" + sockJsPath + "]"); + if (logger.isTraceEnabled()) { + logger.trace(request.getMethod() + " with SockJS path [" + sockJsPath + "]"); } try { request.getHeaders(); } catch (InvalidMediaTypeException ex) { - if (logger.isWarnEnabled()) { - logger.warn("Invalid media type ignored: " + ex.getMediaType()); + if (logger.isDebugEnabled()) { + logger.debug("Invalid media type ignored: " + ex.getMediaType()); } } @@ -305,8 +305,8 @@ public abstract class AbstractSockJsService implements SockJsService { else { String[] pathSegments = StringUtils.tokenizeToStringArray(sockJsPath.substring(1), "/"); if (pathSegments.length != 3) { - if (logger.isWarnEnabled()) { - logger.warn("Expected \"/{server}/{session}/{transport}\" but got \"" + sockJsPath + "\""); + if (logger.isErrorEnabled()) { + logger.error("Expected \"/{server}/{session}/{transport}\" but got \"" + sockJsPath + "\""); } response.setStatusCode(HttpStatus.NOT_FOUND); return; @@ -330,18 +330,18 @@ public abstract class AbstractSockJsService implements SockJsService { protected boolean validateRequest(String serverId, String sessionId, String transport) { if (!StringUtils.hasText(serverId) || !StringUtils.hasText(sessionId) || !StringUtils.hasText(transport)) { - logger.warn("Empty server, session, or transport value"); + logger.error("Empty server, session, or transport value"); return false; } // Server and session id's must not contain "." if (serverId.contains(".") || sessionId.contains(".")) { - logger.warn("Server or session contain a \".\""); + logger.error("Server or session contain a \".\""); return false; } if (!isWebSocketEnabled() && transport.equals("websocket")) { - logger.warn("Websocket transport is disabled"); + logger.debug("Ignoring WebSocket request (transport disabled via SockJsService property)."); return false; } @@ -370,7 +370,7 @@ public abstract class AbstractSockJsService implements SockJsService { try { // Perhaps a CORS Filter has already added this? if (!CollectionUtils.isEmpty(responseHeaders.get("Access-Control-Allow-Origin"))) { - logger.debug("Skip adding CORS headers, response already contains \"Access-Control-Allow-Origin\""); + logger.trace("Skip adding CORS headers, response already contains \"Access-Control-Allow-Origin\""); return; } } @@ -407,7 +407,7 @@ public abstract class AbstractSockJsService implements SockJsService { } protected void sendMethodNotAllowed(ServerHttpResponse response, HttpMethod... httpMethods) { - logger.debug("Sending Method Not Allowed (405)"); + logger.error("Sending Method Not Allowed (405)"); response.setStatusCode(HttpStatus.METHOD_NOT_ALLOWED); response.getHeaders().setAllow(new HashSet(Arrays.asList(httpMethods))); } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/TransportHandlingSockJsService.java b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/TransportHandlingSockJsService.java index dc8d020b745..271ae0d91a4 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/TransportHandlingSockJsService.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/TransportHandlingSockJsService.java @@ -156,7 +156,7 @@ public class TransportHandlingSockJsService extends AbstractSockJsService implem TransportHandler transportHandler = this.handlers.get(TransportType.WEBSOCKET); if (!(transportHandler instanceof HandshakeHandler)) { - logger.warn("No handler for raw WebSocket messages"); + logger.error("No handler configured for raw WebSocket messages"); response.setStatusCode(HttpStatus.NOT_FOUND); return; } @@ -192,8 +192,8 @@ public class TransportHandlingSockJsService extends AbstractSockJsService implem TransportType transportType = TransportType.fromValue(transport); if (transportType == null) { - if (logger.isDebugEnabled()) { - logger.debug("Unknown transport type: " + transportType); + if (logger.isErrorEnabled()) { + logger.error("Unknown transport type: " + transportType); } response.setStatusCode(HttpStatus.NOT_FOUND); return; @@ -201,7 +201,7 @@ public class TransportHandlingSockJsService extends AbstractSockJsService implem TransportHandler transportHandler = this.handlers.get(transportType); if (transportHandler == null) { - logger.debug("Transport handler not found"); + logger.error("Transport handler not found"); response.setStatusCode(HttpStatus.NOT_FOUND); return; } @@ -238,7 +238,9 @@ public class TransportHandlingSockJsService extends AbstractSockJsService implem } else { response.setStatusCode(HttpStatus.NOT_FOUND); - logger.warn("Session not found, sessionId=" + sessionId); + if (logger.isDebugEnabled()) { + logger.debug("Session not found, sessionId=" + sessionId); + } return; } } @@ -281,7 +283,7 @@ public class TransportHandlingSockJsService extends AbstractSockJsService implem } if (logger.isDebugEnabled()) { - logger.debug("Creating new session with session id \"" + sessionId + "\""); + logger.debug("Creating new SockJS session, sessionId=" + sessionId); } session = sessionFactory.createSession(sessionId, handler, attributes); this.sessions.put(sessionId, session); diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/handler/AbstractHttpSendingTransportHandler.java b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/handler/AbstractHttpSendingTransportHandler.java index 0acd7337672..d15c35a4fda 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/handler/AbstractHttpSendingTransportHandler.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/handler/AbstractHttpSendingTransportHandler.java @@ -62,11 +62,13 @@ public abstract class AbstractHttpSendingTransportHandler extends AbstractTransp AbstractHttpSockJsSession sockJsSession) throws SockJsException { if (sockJsSession.isNew()) { - logger.debug("Opening " + getTransportType() + " connection"); + if (logger.isDebugEnabled()) { + logger.debug("Opening " + getTransportType() + " connection."); + } sockJsSession.handleInitialRequest(request, response, getFrameFormat(request)); } else if (sockJsSession.isClosed()) { - logger.debug("Connection already closed (but not removed yet)"); + logger.debug("Connection already closed (but not removed yet)."); SockJsFrame frame = SockJsFrame.closeFrameGoAway(); try { response.getBody().write(frame.getContentBytes()); @@ -77,11 +79,15 @@ public abstract class AbstractHttpSendingTransportHandler extends AbstractTransp return; } else if (!sockJsSession.isActive()) { - logger.debug("starting " + getTransportType() + " async request"); + if (logger.isTraceEnabled()) { + logger.trace("Starting " + getTransportType() + " async request."); + } sockJsSession.handleSuccessiveRequest(request, response, getFrameFormat(request)); } else { - logger.debug("another " + getTransportType() + " connection still open: " + sockJsSession); + if (logger.isDebugEnabled()) { + logger.debug("Another " + getTransportType() + " connection still open: " + sockJsSession); + } String formattedFrame = getFrameFormat(request).format(SockJsFrame.closeFrameAnotherConnectionOpen()); try { response.getBody().write(formattedFrame.getBytes(SockJsFrame.CHARSET)); diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractHttpSockJsSession.java b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractHttpSockJsSession.java index c2f8de47180..c2b9c9a0e3c 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractHttpSockJsSession.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractHttpSockJsSession.java @@ -322,7 +322,6 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession { if (control != null && !control.isCompleted()) { if (control.isStarted()) { try { - logger.debug("Completing asynchronous request"); control.complete(); } catch (Throwable ex) { diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractSockJsSession.java b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractSockJsSession.java index 0a0ec5770a3..01e3a30f40f 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractSockJsSession.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractSockJsSession.java @@ -65,7 +65,7 @@ public abstract class AbstractSockJsSession implements SockJsSession { * *

We make a best effort to identify such network failures, on a per-server * basis, and log them under a separate log category. A simple one-line message - * is logged at DEBUG level, while a full stack trace is shown at TRACE level. + * is logged at INFO level, while a full stack trace is shown at TRACE level. * * @see #disconnectedClientLogger */ @@ -225,16 +225,10 @@ public abstract class AbstractSockJsSession implements SockJsSession { } /** - * Invoked in reaction to the underlying connection being closed by the remote side - * (or the WebSocket container) in order to perform cleanup and notify the - * {@link WebSocketHandler}. This is in contrast to {@link #close()} that pro-actively - * closes the connection. + * Invoked when the underlying connection is closed. */ public final void delegateConnectionClosed(CloseStatus status) throws Exception { if (!isClosed()) { - if (logger.isDebugEnabled()) { - logger.debug(this + " was closed, " + status); - } try { updateLastActiveTime(); cancelHeartbeat(); @@ -260,8 +254,7 @@ public abstract class AbstractSockJsSession implements SockJsSession { /** * {@inheritDoc} - * - *

Performs cleanup and notifies the {@link WebSocketHandler}. + *

Perform cleanup and notify the {@link WebSocketHandler}. */ @Override public final void close() throws IOException { @@ -270,22 +263,21 @@ public abstract class AbstractSockJsSession implements SockJsSession { /** * {@inheritDoc} - *

Performs cleanup and notifies the {@link WebSocketHandler}. + *

Perform cleanup and notify the {@link WebSocketHandler}. */ @Override public final void close(CloseStatus status) throws IOException { if (isOpen()) { - if (logger.isDebugEnabled()) { - logger.debug("Closing " + this + ", " + status); + if (logger.isInfoEnabled()) { + logger.info("Closing SockJS session " + getId() + " with " + status); } try { if (isActive() && !CloseStatus.SESSION_NOT_RELIABLE.equals(status)) { try { - // bypass writeFrame writeFrameInternal(SockJsFrame.closeFrame(status.getCode(), status.getReason())); } catch (Throwable ex) { - logger.warn("Failed to send SockJS close frame: " + ex.getMessage()); + logger.debug("Failure while send SockJS close frame", ex); } } updateLastActiveTime(); @@ -298,7 +290,7 @@ public abstract class AbstractSockJsSession implements SockJsSession { this.handler.afterConnectionClosed(this, status); } catch (Throwable ex) { - logger.error("Unhandled error for " + this, ex); + logger.error("Error from WebSocketHandler.afterConnectionClosed in " + this, ex); } } } @@ -313,19 +305,19 @@ public abstract class AbstractSockJsSession implements SockJsSession { /** * Close due to error arising from SockJS transport handling. */ - public void tryCloseWithSockJsTransportError(Throwable ex, CloseStatus closeStatus) { + public void tryCloseWithSockJsTransportError(Throwable error, CloseStatus closeStatus) { logger.error("Closing due to transport error for " + this); try { - delegateError(ex); + delegateError(error); } - catch (Throwable delegateEx) { + catch (Throwable delegateException) { // ignore } try { close(closeStatus); } - catch (Throwable closeEx) { - // ignore + catch (Throwable closeException) { + logger.error("Failure while closing " + this, closeException); } } @@ -343,11 +335,17 @@ public abstract class AbstractSockJsSession implements SockJsSession { catch (Throwable ex) { logWriteFrameFailure(ex); try { + // Force disconnect (so we won't try to send close frame) disconnect(CloseStatus.SERVER_ERROR); + } + catch (Throwable disconnectFailure) { + logger.error("Failure while closing " + this, disconnectFailure); + } + try { close(CloseStatus.SERVER_ERROR); } - catch (Throwable ex2) { - // ignore + catch (Throwable t) { + // Nothing of consequence, already forced disconnect } throw new SockJsTransportFailureException("Failed to write " + frame, this.getId(), ex); } @@ -364,8 +362,8 @@ public abstract class AbstractSockJsSession implements SockJsSession { if (disconnectedClientLogger.isTraceEnabled()) { disconnectedClientLogger.trace("Looks like the client has gone away", failure); } - else if (disconnectedClientLogger.isDebugEnabled()) { - disconnectedClientLogger.debug("Looks like the client has gone away: " + + else if (disconnectedClientLogger.isInfoEnabled()) { + disconnectedClientLogger.info("Looks like the client has gone away: " + nestedException.getMessage() + " (For full stack trace, set the '" + DISCONNECTED_CLIENT_LOG_CATEGORY + "' log category to TRACE level)"); } @@ -388,7 +386,7 @@ public abstract class AbstractSockJsSession implements SockJsSession { if (this.heartbeatDisabled) { return; } - Assert.state(this.config.getTaskScheduler() != null, "No TaskScheduler configured for heartbeat"); + Assert.state(this.config.getTaskScheduler() != null, "Expecteded SockJS TaskScheduler."); cancelHeartbeat(); if (!isActive()) { return; @@ -405,20 +403,24 @@ public abstract class AbstractSockJsSession implements SockJsSession { } }, time); if (logger.isTraceEnabled()) { - logger.trace("Scheduled heartbeat after " + this.config.getHeartbeatTime() / 1000 + " seconds"); + logger.trace("Scheduled heartbeat in session " + getId()); } } protected void cancelHeartbeat() { + try { + ScheduledFuture task = this.heartbeatTask; + this.heartbeatTask = null; - ScheduledFuture task = this.heartbeatTask; - this.heartbeatTask = null; - - if ((task != null) && !task.isDone()) { - if (logger.isTraceEnabled()) { - logger.trace("Cancelling heartbeat"); + if ((task != null) && !task.isDone()) { + if (logger.isTraceEnabled()) { + logger.trace("Cancelling heartbeat in session " + getId()); + } + task.cancel(false); } - task.cancel(false); + } + catch (Throwable ex) { + logger.error("Failure while cancelling heartbeat in session " + getId(), ex); } } @@ -426,7 +428,7 @@ public abstract class AbstractSockJsSession implements SockJsSession { @Override public String toString() { long currentTime = System.currentTimeMillis(); - return "SockJsSession[id=" + this.id + ", state=" + this.state + ", sinceCreated=" + + return getClass().getSimpleName() + "[id=" + this.id + ", state=" + this.state + ", sinceCreated=" + (currentTime - this.timeCreated) + ", sinceLastActive=" + (currentTime - this.timeLastActive) + "]"; } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/WebSocketServerSockJsSession.java b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/WebSocketServerSockJsSession.java index d250d9e34c4..8b937dadcfc 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/WebSocketServerSockJsSession.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/WebSocketServerSockJsSession.java @@ -24,8 +24,6 @@ import java.util.List; import java.util.Map; import java.util.Queue; import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import org.springframework.http.HttpHeaders; import org.springframework.util.Assert; @@ -54,7 +52,9 @@ public class WebSocketServerSockJsSession extends AbstractSockJsSession implemen private final Queue initSessionCache = new LinkedBlockingDeque(); - private final Lock initSessionLock = new ReentrantLock(); + private final Object initSessionLock = new Object(); + + private volatile boolean disconnected; public WebSocketServerSockJsSession(String id, SockJsServiceConfig config, @@ -174,7 +174,7 @@ public class WebSocketServerSockJsSession extends AbstractSockJsSession implemen @Override public boolean isActive() { - return ((this.webSocketSession != null) && this.webSocketSession.isOpen()); + return (this.webSocketSession != null && this.webSocketSession.isOpen() && !this.disconnected); } public void handleMessage(TextMessage message, WebSocketSession wsSession) throws Exception { @@ -225,8 +225,11 @@ public class WebSocketServerSockJsSession extends AbstractSockJsSession implemen @Override protected void disconnect(CloseStatus status) throws IOException { - if (isActive()) { - this.webSocketSession.close(status); + synchronized (this) { + if (isActive()) { + this.disconnected = true; + this.webSocketSession.close(status); + } } } diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/messaging/SubProtocolWebSocketHandlerTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/messaging/SubProtocolWebSocketHandlerTests.java index 3ef5d87f0b9..8075197908e 100644 --- a/spring-websocket/src/test/java/org/springframework/web/socket/messaging/SubProtocolWebSocketHandlerTests.java +++ b/spring-websocket/src/test/java/org/springframework/web/socket/messaging/SubProtocolWebSocketHandlerTests.java @@ -116,7 +116,7 @@ public class SubProtocolWebSocketHandlerTests { @Test public void emptySubProtocol() throws Exception { this.session.setAcceptedProtocol(""); - this.webSocketHandler.setDefaultProtocolHandler(defaultHandler); + this.webSocketHandler.setDefaultProtocolHandler(this.defaultHandler); this.webSocketHandler.afterConnectionEstablished(session); verify(this.defaultHandler).afterSessionStarted(