diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java index 1980f90404b..0ecbd410d7f 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java @@ -16,11 +16,11 @@ package org.springframework.messaging.simp.stomp; -import java.io.IOException; import java.util.Collection; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; @@ -606,6 +606,14 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } } + @Override + public void handleFailure(Throwable ex) { + if (this.tcpConnection == null) { + return; + } + handleTcpConnectionFailure("Closing connection after TCP failure", ex); + } + @Override public void afterConnectionClosed() { if (this.tcpConnection == null) { @@ -629,21 +637,45 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } } + /** + * Forward the given message to the STOMP broker. + * + *

The method checks whether we have an active TCP connection and have + * received the STOMP CONNECTED frame. For client messages this should be + * false only if we lose the TCP connection around the same time when a + * client message is being forwarded, so we simply log the ignored message + * at trace level. For messages from within the application being sent on + * the "system" connection an exception is raised so that components sending + * the message have a chance to handle it -- by default the broker message + * channel is synchronous. + * + *

Note that if messages arrive concurrently around the same time a TCP + * connection is lost, there is a brief period of time before the connection + * is reset when one or more messages may sneak through and an attempt made + * to forward them. Rather than synchronizing to guard against that, this + * method simply lets them try and fail. For client sessions that may + * result in an additional STOMP ERROR frame(s) being sent downstream but + * code handling that downstream should be idempotent in such cases. + * + * @param message the message to send, never {@code null} + * @return a future to wait for the result + */ + @SuppressWarnings("unchecked") public ListenableFuture forward(final Message message) { + TcpConnection conn = this.tcpConnection; + if (!this.isStompConnected) { if (this.isRemoteClientSession) { - if (StompCommand.DISCONNECT.equals(StompHeaderAccessor.wrap(message).getCommand())) { - return EMPTY_TASK; + if (logger.isTraceEnabled()) { + logger.trace("Ignoring client message received " + message + + (conn != null ? "before CONNECTED frame" : "after TCP connection closed")); } - // Should never happen - throw new IllegalStateException("Unexpected client message " + message + - (this.tcpConnection != null ? - "before STOMP CONNECTED frame" : "after TCP connection closed")); + return EMPTY_TASK; } else { throw new IllegalStateException("Cannot forward messages on system connection " + - (this.tcpConnection != null ? "before STOMP CONNECTED frame" : "while inactive") + + (conn != null ? "before STOMP CONNECTED frame" : "while inactive") + ". Try listening for BrokerAvailabilityEvent ApplicationContext events."); } @@ -659,8 +691,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } } - @SuppressWarnings("unchecked") - ListenableFuture future = this.tcpConnection.send((Message) message); + ListenableFuture future = conn.send((Message) message); future.addCallback(new ListenableFutureCallback() { @Override @@ -672,7 +703,12 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } @Override public void onFailure(Throwable t) { - handleTcpConnectionFailure("Failed to send message " + message, t); + if (tcpConnection == null) { + // already reset + } + else { + handleTcpConnectionFailure("Failed to send message " + message, t); + } } }); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/TcpConnectionHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/TcpConnectionHandler.java index bb5841be063..3508efab58e 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/TcpConnectionHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/TcpConnectionHandler.java @@ -36,7 +36,7 @@ public interface TcpConnectionHandler

{ void afterConnected(TcpConnection

connection); /** - * Invoked after a connection failure. + * Invoked on failure to connect. * @param ex the exception */ void afterConnectFailure(Throwable ex); @@ -47,6 +47,12 @@ public interface TcpConnectionHandler

{ */ void handleMessage(Message

message); + /** + * Handle a failure on the connection. + * @param ex the exception + */ + void handleFailure(Throwable ex); + /** * Invoked after the connection is closed. */ diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorTcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorTcpClient.java index 8722449d549..20dfde82da6 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorTcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorTcpClient.java @@ -20,8 +20,6 @@ import java.net.InetSocketAddress; import java.util.Arrays; import java.util.Properties; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.springframework.messaging.Message; import org.springframework.messaging.tcp.ReconnectStrategy; import org.springframework.messaging.tcp.TcpConnectionHandler; @@ -62,8 +60,6 @@ public class ReactorTcpClient

implements TcpOperations

{ public static final Class REACTOR_TCP_CLIENT_TYPE = NettyTcpClient.class; - private final static Log logger = LogFactory.getLog(ReactorTcpClient.class); - private final TcpClient, Message

> tcpClient; private final Environment environment; @@ -166,7 +162,7 @@ public class ReactorTcpClient

implements TcpOperations

{ .when(Throwable.class, new Consumer() { @Override public void accept(Throwable t) { - logger.error("Exception on connection " + connectionHandler, t); + connectionHandler.handleFailure(t); } }) .consume(new Consumer>() { diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java index 95c23da0a58..b33a91d5f39 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java @@ -367,6 +367,10 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE this.userSessionRegistry.unregisterSessionId(userName, session.getId()); } + if (logger.isDebugEnabled()) { + logger.debug("WebSocket session ended, sending DISCONNECT message to broker"); + } + StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT); headers.setSessionId(session.getId()); Message message = MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build();