Browse Source

Improve handling of disconnects in STOMP broker relay

Issue: SPR-11655
pull/509/merge
Rossen Stoyanchev 12 years ago
parent
commit
990f5bb720
  1. 58
      spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java
  2. 8
      spring-messaging/src/main/java/org/springframework/messaging/tcp/TcpConnectionHandler.java
  3. 6
      spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorTcpClient.java
  4. 4
      spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java

58
spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java

@ -16,11 +16,11 @@ @@ -16,11 +16,11 @@
package org.springframework.messaging.simp.stomp;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
@ -606,6 +606,14 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -606,6 +606,14 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
}
@Override
public void handleFailure(Throwable ex) {
if (this.tcpConnection == null) {
return;
}
handleTcpConnectionFailure("Closing connection after TCP failure", ex);
}
@Override
public void afterConnectionClosed() {
if (this.tcpConnection == null) {
@ -629,21 +637,45 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -629,21 +637,45 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
}
/**
* Forward the given message to the STOMP broker.
*
* <p>The method checks whether we have an active TCP connection and have
* received the STOMP CONNECTED frame. For client messages this should be
* false only if we lose the TCP connection around the same time when a
* client message is being forwarded, so we simply log the ignored message
* at trace level. For messages from within the application being sent on
* the "system" connection an exception is raised so that components sending
* the message have a chance to handle it -- by default the broker message
* channel is synchronous.
*
* <p>Note that if messages arrive concurrently around the same time a TCP
* connection is lost, there is a brief period of time before the connection
* is reset when one or more messages may sneak through and an attempt made
* to forward them. Rather than synchronizing to guard against that, this
* method simply lets them try and fail. For client sessions that may
* result in an additional STOMP ERROR frame(s) being sent downstream but
* code handling that downstream should be idempotent in such cases.
*
* @param message the message to send, never {@code null}
* @return a future to wait for the result
*/
@SuppressWarnings("unchecked")
public ListenableFuture<Void> forward(final Message<?> message) {
TcpConnection<byte[]> conn = this.tcpConnection;
if (!this.isStompConnected) {
if (this.isRemoteClientSession) {
if (StompCommand.DISCONNECT.equals(StompHeaderAccessor.wrap(message).getCommand())) {
return EMPTY_TASK;
if (logger.isTraceEnabled()) {
logger.trace("Ignoring client message received " + message +
(conn != null ? "before CONNECTED frame" : "after TCP connection closed"));
}
// Should never happen
throw new IllegalStateException("Unexpected client message " + message +
(this.tcpConnection != null ?
"before STOMP CONNECTED frame" : "after TCP connection closed"));
return EMPTY_TASK;
}
else {
throw new IllegalStateException("Cannot forward messages on system connection " +
(this.tcpConnection != null ? "before STOMP CONNECTED frame" : "while inactive") +
(conn != null ? "before STOMP CONNECTED frame" : "while inactive") +
". Try listening for BrokerAvailabilityEvent ApplicationContext events.");
}
@ -659,8 +691,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -659,8 +691,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
}
@SuppressWarnings("unchecked")
ListenableFuture<Void> future = this.tcpConnection.send((Message<byte[]>) message);
ListenableFuture<Void> future = conn.send((Message<byte[]>) message);
future.addCallback(new ListenableFutureCallback<Void>() {
@Override
@ -672,7 +703,12 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -672,7 +703,12 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
@Override
public void onFailure(Throwable t) {
handleTcpConnectionFailure("Failed to send message " + message, t);
if (tcpConnection == null) {
// already reset
}
else {
handleTcpConnectionFailure("Failed to send message " + message, t);
}
}
});

8
spring-messaging/src/main/java/org/springframework/messaging/tcp/TcpConnectionHandler.java

@ -36,7 +36,7 @@ public interface TcpConnectionHandler<P> { @@ -36,7 +36,7 @@ public interface TcpConnectionHandler<P> {
void afterConnected(TcpConnection<P> connection);
/**
* Invoked after a connection failure.
* Invoked on failure to connect.
* @param ex the exception
*/
void afterConnectFailure(Throwable ex);
@ -47,6 +47,12 @@ public interface TcpConnectionHandler<P> { @@ -47,6 +47,12 @@ public interface TcpConnectionHandler<P> {
*/
void handleMessage(Message<P> message);
/**
* Handle a failure on the connection.
* @param ex the exception
*/
void handleFailure(Throwable ex);
/**
* Invoked after the connection is closed.
*/

6
spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorTcpClient.java

@ -20,8 +20,6 @@ import java.net.InetSocketAddress; @@ -20,8 +20,6 @@ import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.tcp.ReconnectStrategy;
import org.springframework.messaging.tcp.TcpConnectionHandler;
@ -62,8 +60,6 @@ public class ReactorTcpClient<P> implements TcpOperations<P> { @@ -62,8 +60,6 @@ public class ReactorTcpClient<P> implements TcpOperations<P> {
public static final Class<NettyTcpClient> REACTOR_TCP_CLIENT_TYPE = NettyTcpClient.class;
private final static Log logger = LogFactory.getLog(ReactorTcpClient.class);
private final TcpClient<Message<P>, Message<P>> tcpClient;
private final Environment environment;
@ -166,7 +162,7 @@ public class ReactorTcpClient<P> implements TcpOperations<P> { @@ -166,7 +162,7 @@ public class ReactorTcpClient<P> implements TcpOperations<P> {
.when(Throwable.class, new Consumer<Throwable>() {
@Override
public void accept(Throwable t) {
logger.error("Exception on connection " + connectionHandler, t);
connectionHandler.handleFailure(t);
}
})
.consume(new Consumer<Message<P>>() {

4
spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java

@ -367,6 +367,10 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE @@ -367,6 +367,10 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE
this.userSessionRegistry.unregisterSessionId(userName, session.getId());
}
if (logger.isDebugEnabled()) {
logger.debug("WebSocket session ended, sending DISCONNECT message to broker");
}
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT);
headers.setSessionId(session.getId());
Message<?> message = MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build();

Loading…
Cancel
Save