Browse Source

Improve handling of disconnects in STOMP broker relay

This is a backport of:
990f5bb720

Issue: SPR-11655
pull/555/head
Rossen Stoyanchev 12 years ago
parent
commit
cb712afa97
  1. 48
      spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java
  2. 6
      spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorTcpClient.java
  3. 4
      spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java

48
spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java

@ -650,21 +650,45 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -650,21 +650,45 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
}
/**
* Forward the given message to the STOMP broker.
*
* <p>The method checks whether we have an active TCP connection and have
* received the STOMP CONNECTED frame. For client messages this should be
* false only if we lose the TCP connection around the same time when a
* client message is being forwarded, so we simply log the ignored message
* at trace level. For messages from within the application being sent on
* the "system" connection an exception is raised so that components sending
* the message have a chance to handle it -- by default the broker message
* channel is synchronous.
*
* <p>Note that if messages arrive concurrently around the same time a TCP
* connection is lost, there is a brief period of time before the connection
* is reset when one or more messages may sneak through and an attempt made
* to forward them. Rather than synchronizing to guard against that, this
* method simply lets them try and fail. For client sessions that may
* result in an additional STOMP ERROR frame(s) being sent downstream but
* code handling that downstream should be idempotent in such cases.
*
* @param message the message to send, never {@code null}
* @return a future to wait for the result
*/
@SuppressWarnings("unchecked")
public ListenableFuture<Void> forward(final Message<?> message) {
TcpConnection<byte[]> conn = this.tcpConnection;
if (!this.isStompConnected) {
if (this.isRemoteClientSession) {
if (StompCommand.DISCONNECT.equals(StompHeaderAccessor.wrap(message).getCommand())) {
return EMPTY_TASK;
if (logger.isTraceEnabled()) {
logger.trace("Ignoring client message received " + message +
(conn != null ? "before CONNECTED frame" : "after TCP connection closed"));
}
// Should never happen
throw new IllegalStateException("Unexpected client message " + message +
(this.tcpConnection != null ?
"before STOMP CONNECTED frame" : "after TCP connection closed"));
return EMPTY_TASK;
}
else {
throw new IllegalStateException("Cannot forward messages on system connection " +
(this.tcpConnection != null ? "before STOMP CONNECTED frame" : "while inactive") +
(conn != null ? "before STOMP CONNECTED frame" : "while inactive") +
". Try listening for BrokerAvailabilityEvent ApplicationContext events.");
}
@ -680,8 +704,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -680,8 +704,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
}
@SuppressWarnings("unchecked")
ListenableFuture<Void> future = this.tcpConnection.send((Message<byte[]>) message);
ListenableFuture<Void> future = conn.send((Message<byte[]>) message);
future.addCallback(new ListenableFutureCallback<Void>() {
@Override
@ -693,7 +716,12 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -693,7 +716,12 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
@Override
public void onFailure(Throwable t) {
handleTcpConnectionFailure("Failed to send message " + message, t);
if (tcpConnection == null) {
// already reset
}
else {
handleTcpConnectionFailure("Failed to send message " + message, t);
}
}
});

6
spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorTcpClient.java

@ -21,8 +21,6 @@ import java.net.InetSocketAddress; @@ -21,8 +21,6 @@ import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.tcp.ReconnectStrategy;
import org.springframework.messaging.tcp.TcpConnectionHandler;
@ -64,8 +62,6 @@ public class ReactorTcpClient<P> implements TcpOperations<P> { @@ -64,8 +62,6 @@ public class ReactorTcpClient<P> implements TcpOperations<P> {
public static final Class<NettyTcpClient> REACTOR_TCP_CLIENT_TYPE = NettyTcpClient.class;
private final static Log logger = LogFactory.getLog(ReactorTcpClient.class);
private final TcpClient<Message<P>, Message<P>> tcpClient;
private final Environment environment;
@ -186,7 +182,7 @@ public class ReactorTcpClient<P> implements TcpOperations<P> { @@ -186,7 +182,7 @@ public class ReactorTcpClient<P> implements TcpOperations<P> {
connection.when(Throwable.class, new Consumer<Throwable>() {
@Override
public void accept(Throwable t) {
logger.error("Exception on connection " + connectionHandler, t);
connectionHandler.handleFailure(t);
}
});
connectionHandler.afterConnected(new ReactorTcpConnection<P>(connection));

4
spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java

@ -368,6 +368,10 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE @@ -368,6 +368,10 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE
this.userSessionRegistry.unregisterSessionId(userName, session.getId());
}
if (logger.isDebugEnabled()) {
logger.debug("WebSocket session ended, sending DISCONNECT message to broker");
}
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT);
headers.setSessionId(session.getId());
Message<?> message = MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build();

Loading…
Cancel
Save