diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java index b72335045c3..7cf256f6e82 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java @@ -650,21 +650,45 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } } + /** + * Forward the given message to the STOMP broker. + * + *

The method checks whether we have an active TCP connection and have + * received the STOMP CONNECTED frame. For client messages this should be + * false only if we lose the TCP connection around the same time when a + * client message is being forwarded, so we simply log the ignored message + * at trace level. For messages from within the application being sent on + * the "system" connection an exception is raised so that components sending + * the message have a chance to handle it -- by default the broker message + * channel is synchronous. + * + *

Note that if messages arrive concurrently around the same time a TCP + * connection is lost, there is a brief period of time before the connection + * is reset when one or more messages may sneak through and an attempt made + * to forward them. Rather than synchronizing to guard against that, this + * method simply lets them try and fail. For client sessions that may + * result in an additional STOMP ERROR frame(s) being sent downstream but + * code handling that downstream should be idempotent in such cases. + * + * @param message the message to send, never {@code null} + * @return a future to wait for the result + */ + @SuppressWarnings("unchecked") public ListenableFuture forward(final Message message) { + TcpConnection conn = this.tcpConnection; + if (!this.isStompConnected) { if (this.isRemoteClientSession) { - if (StompCommand.DISCONNECT.equals(StompHeaderAccessor.wrap(message).getCommand())) { - return EMPTY_TASK; + if (logger.isTraceEnabled()) { + logger.trace("Ignoring client message received " + message + + (conn != null ? "before CONNECTED frame" : "after TCP connection closed")); } - // Should never happen - throw new IllegalStateException("Unexpected client message " + message + - (this.tcpConnection != null ? - "before STOMP CONNECTED frame" : "after TCP connection closed")); + return EMPTY_TASK; } else { throw new IllegalStateException("Cannot forward messages on system connection " + - (this.tcpConnection != null ? "before STOMP CONNECTED frame" : "while inactive") + + (conn != null ? "before STOMP CONNECTED frame" : "while inactive") + ". Try listening for BrokerAvailabilityEvent ApplicationContext events."); } @@ -680,8 +704,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } } - @SuppressWarnings("unchecked") - ListenableFuture future = this.tcpConnection.send((Message) message); + ListenableFuture future = conn.send((Message) message); future.addCallback(new ListenableFutureCallback() { @Override @@ -693,7 +716,12 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } @Override public void onFailure(Throwable t) { - handleTcpConnectionFailure("Failed to send message " + message, t); + if (tcpConnection == null) { + // already reset + } + else { + handleTcpConnectionFailure("Failed to send message " + message, t); + } } }); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorTcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorTcpClient.java index 2b3c86706c5..7aebd528f68 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorTcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorTcpClient.java @@ -21,8 +21,6 @@ import java.net.InetSocketAddress; import java.util.Arrays; import java.util.Properties; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.springframework.messaging.Message; import org.springframework.messaging.tcp.ReconnectStrategy; import org.springframework.messaging.tcp.TcpConnectionHandler; @@ -64,8 +62,6 @@ public class ReactorTcpClient

implements TcpOperations

{ public static final Class REACTOR_TCP_CLIENT_TYPE = NettyTcpClient.class; - private final static Log logger = LogFactory.getLog(ReactorTcpClient.class); - private final TcpClient, Message

> tcpClient; private final Environment environment; @@ -186,7 +182,7 @@ public class ReactorTcpClient

implements TcpOperations

{ connection.when(Throwable.class, new Consumer() { @Override public void accept(Throwable t) { - logger.error("Exception on connection " + connectionHandler, t); + connectionHandler.handleFailure(t); } }); connectionHandler.afterConnected(new ReactorTcpConnection

(connection)); diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java index d6c56f8c5d1..68ad7128853 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java @@ -368,6 +368,10 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE this.userSessionRegistry.unregisterSessionId(userName, session.getId()); } + if (logger.isDebugEnabled()) { + logger.debug("WebSocket session ended, sending DISCONNECT message to broker"); + } + StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT); headers.setSessionId(session.getId()); Message message = MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build();