Browse Source

Improve handling of send failures

Prior to this commit, a failure to send a heartbeat was ignored and a
failure to forward a message to the broker would result in an error
frame being sent but nothing more.

Following this commit, a failure to send a heartbeat to the broker
is treated as a TCP client failure. Furthermore, if the system relay
session fails to forward a message to the broker an exception is
thrown. Typically, the system relay session will be forwarding
messages on behalf of local application code, rather than a remote
WebSocket client. Throwing an exception allows the application code
to be notified of the problem directly, rather than via a broker
availability event.
pull/370/head
Andy Wilkinson 12 years ago committed by Rossen Stoyanchev
parent
commit
b2f31a3c74
  1. 96
      spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java
  2. 7
      spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java

96
spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java

@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicReference; @@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.simp.handler.AbstractBrokerMessageHandler;
@ -205,34 +206,29 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -205,34 +206,29 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
return;
}
try {
if (SimpMessageType.CONNECT.equals(messageType)) {
message = MessageBuilder.withPayloadAndHeaders(message.getPayload(), headers).build();
StompRelaySession session = new StompRelaySession(sessionId);
this.relaySessions.put(sessionId, session);
session.connect(message);
}
else if (SimpMessageType.DISCONNECT.equals(messageType)) {
StompRelaySession session = this.relaySessions.remove(sessionId);
if (session == null) {
if (logger.isTraceEnabled()) {
logger.trace("Session already removed, sessionId=" + sessionId);
}
return;
}
session.forward(message);
}
else {
StompRelaySession session = this.relaySessions.get(sessionId);
if (session == null) {
logger.warn("Session id=" + sessionId + " not found. Ignoring message: " + message);
return;
if (SimpMessageType.CONNECT.equals(messageType)) {
message = MessageBuilder.withPayloadAndHeaders(message.getPayload(), headers).build();
StompRelaySession session = new StompRelaySession(sessionId);
this.relaySessions.put(sessionId, session);
session.connect(message);
}
else if (SimpMessageType.DISCONNECT.equals(messageType)) {
StompRelaySession session = this.relaySessions.remove(sessionId);
if (session == null) {
if (logger.isTraceEnabled()) {
logger.trace("Session already removed, sessionId=" + sessionId);
}
session.forward(message);
return;
}
session.forward(message);
}
catch (Throwable t) {
logger.error("Failed to handle message " + message, t);
else {
StompRelaySession session = this.relaySessions.get(sessionId);
if (session == null) {
logger.warn("Session id=" + sessionId + " not found. Ignoring message: " + message);
return;
}
session.forward(message);
}
}
@ -323,7 +319,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -323,7 +319,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
publishBrokerAvailableEvent();
}
private void handleTcpClientFailure(String message, Throwable ex) {
protected void handleTcpClientFailure(String message, Throwable ex) {
if (logger.isErrorEnabled()) {
logger.error(message + ", sessionId=" + this.sessionId, ex);
}
@ -348,13 +344,21 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -348,13 +344,21 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
messageChannel.send(message);
}
public void forward(Message<?> message) {
private void forward(Message<?> message) {
TcpConnection<Message<byte[]>, Message<byte[]>> tcpConnection = this.stompConnection.getReadyConnection();
if (tcpConnection == null) {
logger.warn("Connection to STOMP broker is not active, discarding message: " + message);
return;
logger.warn("Connection to STOMP broker is not active");
handleForwardFailure(message);
}
else if (!forwardInternal(tcpConnection, message)) {
handleForwardFailure(message);
}
}
protected void handleForwardFailure(Message<?> message) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to forward message to the broker. message=" + message);
}
forwardInternal(tcpConnection, message);
}
private boolean forwardInternal(
@ -491,32 +495,40 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -491,32 +495,40 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
@Override
protected void connected(StompHeaderAccessor headers, final StompConnection stompConnection) {
long brokerReceiveInterval = headers.getHeartbeat()[1];
if (HEARTBEAT_SEND_INTERVAL > 0 && brokerReceiveInterval > 0) {
long brokerReceiveInterval = headers.getHeartbeat()[1];
if ((HEARTBEAT_SEND_INTERVAL > 0) && (brokerReceiveInterval > 0)) {
long interval = Math.max(HEARTBEAT_SEND_INTERVAL, brokerReceiveInterval);
stompConnection.connection.on().writeIdle(interval, new Runnable() {
@Override
public void run() {
TcpConnection<Message<byte[]>, Message<byte[]>> connection = stompConnection.connection;
if (connection != null) {
connection.send(MessageBuilder.withPayload(heartbeatPayload).build());
TcpConnection<Message<byte[]>, Message<byte[]>> tcpConn = stompConnection.connection;
if (tcpConn != null) {
tcpConn.send(MessageBuilder.withPayload(heartbeatPayload).build(),
new Consumer<Boolean>() {
@Override
public void accept(Boolean t) {
handleTcpClientFailure("Failed to send heartbeat to the broker", null);
}
});
}
}
});
}
long brokerSendInterval = headers.getHeartbeat()[0];
if (HEARTBEAT_RECEIVE_INTERVAL > 0 && brokerSendInterval > 0) {
final long interval =
Math.max(HEARTBEAT_RECEIVE_INTERVAL, brokerSendInterval) * HEARTBEAT_RECEIVE_MULTIPLIER;
final long interval = Math.max(HEARTBEAT_RECEIVE_INTERVAL, brokerSendInterval)
* HEARTBEAT_RECEIVE_MULTIPLIER;
stompConnection.connection.on().readIdle(interval, new Runnable() {
@Override
public void run() {
String message = "Broker hearbeat missed: connection idle for more than " + interval + "ms";
logger.warn(message);
if (logger.isWarnEnabled()) {
logger.warn(message);
}
disconnected(message);
}
});
@ -537,6 +549,12 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -537,6 +549,12 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
// Ignore
}
}
@Override
protected void handleForwardFailure(Message<?> message) {
super.handleForwardFailure(message);
throw new MessageDeliveryException(message);
}
}
}

7
spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java

@ -33,6 +33,7 @@ import org.junit.Test; @@ -33,6 +33,7 @@ import org.junit.Test;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.simp.BrokerAvailabilityEvent;
@ -143,6 +144,12 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { @@ -143,6 +144,12 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
this.responseHandler.awaitAndAssert();
}
@Test(expected=MessageDeliveryException.class)
public void messageDeliverExceptionIfSystemSessionForwardFails() throws Exception {
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.MESSAGE);
this.relay.handleMessage(MessageBuilder.withPayloadAndHeaders("test", headers).build());
}
@Test
public void brokerBecomingUnvailableTriggersErrorFrame() throws Exception {

Loading…
Cancel
Save