From b2f31a3c741ddfaad94ffe5da7de762f8e3cc6cb Mon Sep 17 00:00:00 2001 From: Andy Wilkinson Date: Fri, 27 Sep 2013 16:44:56 +0100 Subject: [PATCH] Improve handling of send failures Prior to this commit, a failure to send a heartbeat was ignored and a failure to forward a message to the broker would result in an error frame being sent but nothing more. Following this commit, a failure to send a heartbeat to the broker is treated as a TCP client failure. Furthermore, if the system relay session fails to forward a message to the broker an exception is thrown. Typically, the system relay session will be forwarding messages on behalf of local application code, rather than a remote WebSocket client. Throwing an exception allows the application code to be notified of the problem directly, rather than via a broker availability event. --- .../stomp/StompBrokerRelayMessageHandler.java | 96 +++++++++++-------- ...erRelayMessageHandlerIntegrationTests.java | 7 ++ 2 files changed, 64 insertions(+), 39 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java index 6d22cffd840..d0b78cbad4b 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java @@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageDeliveryException; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.simp.SimpMessageType; import org.springframework.messaging.simp.handler.AbstractBrokerMessageHandler; @@ -205,34 +206,29 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler return; } - try { - if (SimpMessageType.CONNECT.equals(messageType)) { - message = MessageBuilder.withPayloadAndHeaders(message.getPayload(), headers).build(); - StompRelaySession session = new StompRelaySession(sessionId); - this.relaySessions.put(sessionId, session); - session.connect(message); - } - else if (SimpMessageType.DISCONNECT.equals(messageType)) { - StompRelaySession session = this.relaySessions.remove(sessionId); - if (session == null) { - if (logger.isTraceEnabled()) { - logger.trace("Session already removed, sessionId=" + sessionId); - } - return; - } - session.forward(message); - } - else { - StompRelaySession session = this.relaySessions.get(sessionId); - if (session == null) { - logger.warn("Session id=" + sessionId + " not found. Ignoring message: " + message); - return; + if (SimpMessageType.CONNECT.equals(messageType)) { + message = MessageBuilder.withPayloadAndHeaders(message.getPayload(), headers).build(); + StompRelaySession session = new StompRelaySession(sessionId); + this.relaySessions.put(sessionId, session); + session.connect(message); + } + else if (SimpMessageType.DISCONNECT.equals(messageType)) { + StompRelaySession session = this.relaySessions.remove(sessionId); + if (session == null) { + if (logger.isTraceEnabled()) { + logger.trace("Session already removed, sessionId=" + sessionId); } - session.forward(message); + return; } + session.forward(message); } - catch (Throwable t) { - logger.error("Failed to handle message " + message, t); + else { + StompRelaySession session = this.relaySessions.get(sessionId); + if (session == null) { + logger.warn("Session id=" + sessionId + " not found. Ignoring message: " + message); + return; + } + session.forward(message); } } @@ -323,7 +319,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler publishBrokerAvailableEvent(); } - private void handleTcpClientFailure(String message, Throwable ex) { + protected void handleTcpClientFailure(String message, Throwable ex) { if (logger.isErrorEnabled()) { logger.error(message + ", sessionId=" + this.sessionId, ex); } @@ -348,13 +344,21 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler messageChannel.send(message); } - public void forward(Message message) { + private void forward(Message message) { TcpConnection, Message> tcpConnection = this.stompConnection.getReadyConnection(); if (tcpConnection == null) { - logger.warn("Connection to STOMP broker is not active, discarding message: " + message); - return; + logger.warn("Connection to STOMP broker is not active"); + handleForwardFailure(message); + } + else if (!forwardInternal(tcpConnection, message)) { + handleForwardFailure(message); + } + } + + protected void handleForwardFailure(Message message) { + if (logger.isWarnEnabled()) { + logger.warn("Failed to forward message to the broker. message=" + message); } - forwardInternal(tcpConnection, message); } private boolean forwardInternal( @@ -491,32 +495,40 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @Override protected void connected(StompHeaderAccessor headers, final StompConnection stompConnection) { - long brokerReceiveInterval = headers.getHeartbeat()[1]; - if (HEARTBEAT_SEND_INTERVAL > 0 && brokerReceiveInterval > 0) { + long brokerReceiveInterval = headers.getHeartbeat()[1]; + if ((HEARTBEAT_SEND_INTERVAL > 0) && (brokerReceiveInterval > 0)) { long interval = Math.max(HEARTBEAT_SEND_INTERVAL, brokerReceiveInterval); stompConnection.connection.on().writeIdle(interval, new Runnable() { @Override public void run() { - TcpConnection, Message> connection = stompConnection.connection; - if (connection != null) { - connection.send(MessageBuilder.withPayload(heartbeatPayload).build()); + TcpConnection, Message> tcpConn = stompConnection.connection; + if (tcpConn != null) { + tcpConn.send(MessageBuilder.withPayload(heartbeatPayload).build(), + new Consumer() { + @Override + public void accept(Boolean t) { + handleTcpClientFailure("Failed to send heartbeat to the broker", null); + } + }); } } - }); } long brokerSendInterval = headers.getHeartbeat()[0]; if (HEARTBEAT_RECEIVE_INTERVAL > 0 && brokerSendInterval > 0) { - final long interval = - Math.max(HEARTBEAT_RECEIVE_INTERVAL, brokerSendInterval) * HEARTBEAT_RECEIVE_MULTIPLIER; + final long interval = Math.max(HEARTBEAT_RECEIVE_INTERVAL, brokerSendInterval) + * HEARTBEAT_RECEIVE_MULTIPLIER; stompConnection.connection.on().readIdle(interval, new Runnable() { + @Override public void run() { String message = "Broker hearbeat missed: connection idle for more than " + interval + "ms"; - logger.warn(message); + if (logger.isWarnEnabled()) { + logger.warn(message); + } disconnected(message); } }); @@ -537,6 +549,12 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler // Ignore } } + + @Override + protected void handleForwardFailure(Message message) { + super.handleForwardFailure(message); + throw new MessageDeliveryException(message); + } } } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java index 3cf1544313c..26931c57320 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java @@ -33,6 +33,7 @@ import org.junit.Test; import org.springframework.context.ApplicationEvent; import org.springframework.context.ApplicationEventPublisher; import org.springframework.messaging.Message; +import org.springframework.messaging.MessageDeliveryException; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; import org.springframework.messaging.simp.BrokerAvailabilityEvent; @@ -143,6 +144,12 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { this.responseHandler.awaitAndAssert(); } + @Test(expected=MessageDeliveryException.class) + public void messageDeliverExceptionIfSystemSessionForwardFails() throws Exception { + StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.MESSAGE); + this.relay.handleMessage(MessageBuilder.withPayloadAndHeaders("test", headers).build()); + } + @Test public void brokerBecomingUnvailableTriggersErrorFrame() throws Exception {