diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpleBrokerMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpleBrokerMessageHandler.java index a117f3ceece..7592ad13ae2 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpleBrokerMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpleBrokerMessageHandler.java @@ -100,14 +100,13 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler { else if (SimpMessageType.DISCONNECT.equals(messageType)) { String sessionId = headers.getSessionId(); this.subscriptionRegistry.unregisterAllSubscriptions(sessionId); - } else if (SimpMessageType.CONNECT.equals(messageType)) { - String sessionId = headers.getSessionId(); - SimpMessageHeaderAccessor connectAckHeaders = - SimpMessageHeaderAccessor.create(SimpMessageType.CONNECT_ACK); - connectAckHeaders.setSessionId(sessionId); - connectAckHeaders.setHeader(SimpMessageHeaderAccessor.CONNECT_MESSAGE_HEADER, message); - Message connectAck = - MessageBuilder.withPayloadAndHeaders(EMPTY_PAYLOAD, connectAckHeaders).build(); + } + else if (SimpMessageType.CONNECT.equals(messageType)) { + SimpMessageHeaderAccessor replyHeaders = SimpMessageHeaderAccessor.create(SimpMessageType.CONNECT_ACK); + replyHeaders.setSessionId(headers.getSessionId()); + replyHeaders.setHeader(SimpMessageHeaderAccessor.CONNECT_MESSAGE_HEADER, message); + + Message connectAck = MessageBuilder.withPayloadAndHeaders(EMPTY_PAYLOAD, replyHeaders).build(); this.messageChannel.send(connectAck); } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java index d0b78cbad4b..2058a88bbc5 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java @@ -368,15 +368,11 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @SuppressWarnings("unchecked") Message byteMessage = (Message) message; - if (logger.isTraceEnabled()) { logger.trace("Forwarding to STOMP broker, message: " + message); } StompCommand command = StompHeaderAccessor.wrap(message).getCommand(); - if (command == StompCommand.DISCONNECT) { - this.stompConnection.setDisconnected(); - } final Deferred> deferred = new DeferredPromiseSpec().get(); tcpConnection.send(byteMessage, new Consumer() { @@ -393,8 +389,11 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler handleTcpClientFailure("Timed out waiting for message to be forwarded to the broker", null); } else if (!success) { - if (command != StompCommand.DISCONNECT) { - handleTcpClientFailure("Failed to forward message to the broker", null); + handleTcpClientFailure("Failed to forward message to the broker", null); + } + else { + if (command == StompCommand.DISCONNECT) { + this.stompConnection.setDisconnected(); } } } @@ -508,8 +507,10 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler tcpConn.send(MessageBuilder.withPayload(heartbeatPayload).build(), new Consumer() { @Override - public void accept(Boolean t) { - handleTcpClientFailure("Failed to send heartbeat to the broker", null); + public void accept(Boolean result) { + if (!result) { + handleTcpClientFailure("Failed to send heartbeat to the broker", null); + } } }); } @@ -542,7 +543,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler StompHeaderAccessor headers = StompHeaderAccessor.wrap(message); if (StompCommand.ERROR.equals(headers.getCommand())) { if (logger.isErrorEnabled()) { - logger.error("System session received ERROR frame from broker: " + message); + logger.error("STOMP ERROR frame on system session: " + message); } } else { diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompProtocolHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompProtocolHandler.java index 7769e48e326..128eaa2d5cc 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompProtocolHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompProtocolHandler.java @@ -128,6 +128,20 @@ public class StompProtocolHandler implements SubProtocolHandler { } } + protected void sendErrorMessage(WebSocketSession session, Throwable error) { + + StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.ERROR); + headers.setMessage(error.getMessage()); + Message message = MessageBuilder.withPayloadAndHeaders(new byte[0], headers).build(); + String payload = new String(this.stompEncoder.encode(message), Charset.forName("UTF-8")); + try { + session.sendMessage(new TextMessage(payload)); + } + catch (Throwable t) { + // ignore + } + } + /** * Handle STOMP messages going back out to WebSocket clients. */ @@ -143,7 +157,7 @@ public class StompProtocolHandler implements SubProtocolHandler { if (headers.getMessageType() == SimpMessageType.CONNECT_ACK) { StompHeaderAccessor connectedHeaders = StompHeaderAccessor.create(StompCommand.CONNECTED); connectedHeaders.setVersion(getVersion(headers)); - connectedHeaders.setHeartbeat(0, 0); + connectedHeaders.setHeartbeat(0, 0); // no heart-beat support with simple broker headers = connectedHeaders; } @@ -180,40 +194,25 @@ public class StompProtocolHandler implements SubProtocolHandler { } } - @Override - public String resolveSessionId(Message message) { - StompHeaderAccessor headers = StompHeaderAccessor.wrap(message); - return headers.getSessionId(); - } - - @Override - public void afterSessionStarted(WebSocketSession session, MessageChannel outputChannel) { - } + private String getVersion(StompHeaderAccessor connectAckHeaders) { - @Override - public void afterSessionEnded(WebSocketSession session, CloseStatus closeStatus, MessageChannel outputChannel) { + String name = StompHeaderAccessor.CONNECT_MESSAGE_HEADER; + Message connectMessage = (Message) connectAckHeaders.getHeader(name); + StompHeaderAccessor connectHeaders = StompHeaderAccessor.wrap(connectMessage); + Assert.notNull(connectMessage, "CONNECT_ACK does not contain original CONNECT " + connectAckHeaders); - if ((this.queueSuffixResolver != null) && (session.getPrincipal() != null)) { - this.queueSuffixResolver.removeQueueSuffix(session.getPrincipal().getName(), session.getId()); + Set acceptVersions = connectHeaders.getAcceptVersion(); + if (acceptVersions.contains("1.2")) { + return "1.2"; } - - StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT); - headers.setSessionId(session.getId()); - Message message = MessageBuilder.withPayloadAndHeaders(new byte[0], headers).build(); - outputChannel.send(message); - } - - protected void sendErrorMessage(WebSocketSession session, Throwable error) { - - StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.ERROR); - headers.setMessage(error.getMessage()); - Message message = MessageBuilder.withPayloadAndHeaders(new byte[0], headers).build(); - String payload = new String(this.stompEncoder.encode(message), Charset.forName("UTF-8")); - try { - session.sendMessage(new TextMessage(payload)); + else if (acceptVersions.contains("1.1")) { + return "1.1"; } - catch (Throwable t) { - // ignore + else if (acceptVersions.isEmpty()) { + return null; + } + else { + throw new StompConversionException("Unsupported version '" + acceptVersions + "'"); } } @@ -230,23 +229,27 @@ public class StompProtocolHandler implements SubProtocolHandler { } } - private String getVersion(StompHeaderAccessor connectAckHeaders) { - Message connectMessage = - (Message) connectAckHeaders.getHeader(StompHeaderAccessor.CONNECT_MESSAGE_HEADER); - StompHeaderAccessor connectHeaders = StompHeaderAccessor.wrap(connectMessage); + @Override + public String resolveSessionId(Message message) { + StompHeaderAccessor headers = StompHeaderAccessor.wrap(message); + return headers.getSessionId(); + } - Set acceptVersions = connectHeaders.getAcceptVersion(); - if (acceptVersions.contains("1.2")) { - return "1.2"; - } - else if (acceptVersions.contains("1.1")) { - return "1.1"; - } - else if (acceptVersions.isEmpty()) { - return null; - } - else { - throw new StompConversionException("Unsupported version '" + acceptVersions + "'"); + @Override + public void afterSessionStarted(WebSocketSession session, MessageChannel outputChannel) { + } + + @Override + public void afterSessionEnded(WebSocketSession session, CloseStatus closeStatus, MessageChannel outputChannel) { + + if ((this.queueSuffixResolver != null) && (session.getPrincipal() != null)) { + this.queueSuffixResolver.removeQueueSuffix(session.getPrincipal().getName(), session.getId()); } + + StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT); + headers.setSessionId(session.getId()); + Message message = MessageBuilder.withPayloadAndHeaders(new byte[0], headers).build(); + outputChannel.send(message); } + } \ No newline at end of file diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/channel/ExecutorSubscribableChannel.java b/spring-messaging/src/main/java/org/springframework/messaging/support/channel/ExecutorSubscribableChannel.java index 3a298e4fb10..319beaeff96 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/support/channel/ExecutorSubscribableChannel.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/channel/ExecutorSubscribableChannel.java @@ -65,7 +65,9 @@ public class ExecutorSubscribableChannel extends AbstractSubscribableChannel { @Override public boolean sendInternal(final Message message, long timeout) { - logger.trace("subscribers " + this.handlers); + if (logger.isTraceEnabled()) { + logger.trace("subscribers " + this.handlers); + } for (final MessageHandler handler : this.handlers) { if (this.executor == null) {