From 7da3fb4ce6dc90048b2a3f661fa80f38bede8ed9 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Sat, 28 Jun 2014 23:09:14 -0400 Subject: [PATCH] Support STOMP DISCONNECT with receipt Issue: SPR-11599 --- .../stomp/StompBrokerRelayMessageHandler.java | 17 +++++++++- ...erRelayMessageHandlerIntegrationTests.java | 32 ++++++++++++++++--- 2 files changed, 43 insertions(+), 6 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java index 697e3d1f8c4..8451c05268c 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java @@ -759,7 +759,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @Override public void onSuccess(Void result) { if (accessor.getCommand() == StompCommand.DISCONNECT) { - clearConnection(); + afterDisconnectSent(accessor); } } @Override @@ -775,6 +775,21 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler return future; } + /** + * After a DISCONNECT there should be no more client frames so we can + * close the connection pro-actively. However, if the DISCONNECT has a + * receipt header we leave the connection open and expect the server will + * respond with a RECEIPT and then close the connection. + * + * @see + * STOMP Specification 1.2 DISCONNECT + */ + private void afterDisconnectSent(StompHeaderAccessor accessor) { + if (accessor.getReceipt() == null) { + clearConnection(); + } + } + /** * Clean up state associated with the connection and close it. * Any exception arising from closing the connection are propagated. diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java index fe5cb06adae..f038f4e1b00 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java @@ -223,11 +223,6 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { startActiveMqBroker(); this.eventPublisher.expectBrokerAvailabilityEvent(true); - - // TODO The event publisher assertions show that the broker's back up and the system relay session - // has reconnected. We need to decide what we want the reconnect behaviour to be for client relay - // sessions and add further message sending and assertions as appropriate. At the moment any client - // sessions will be closed and an ERROR from will be sent. } @Test @@ -249,6 +244,21 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { assertTrue("Unexpected messages: " + this.responseHandler.queue, this.responseHandler.queue.isEmpty()); } + @Test + public void disconnectWithReceipt() throws Exception { + + logger.debug("Starting test disconnectWithReceipt()"); + + MessageExchange connect = MessageExchangeBuilder.connect("sess1").build(); + this.relay.handleMessage(connect.message); + this.responseHandler.expectMessages(connect); + + MessageExchange disconnect = MessageExchangeBuilder.disconnectWithReceipt("sess1", "r123").build(); + this.relay.handleMessage(disconnect.message); + + this.responseHandler.expectMessages(disconnect); + } + private static class TestEventPublisher implements ApplicationEventPublisher { @@ -403,6 +413,18 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { return new MessageExchangeBuilder(message); } + public static MessageExchangeBuilder disconnectWithReceipt(String sessionId, String receiptId) { + + StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT); + headers.setSessionId(sessionId); + headers.setReceipt(receiptId); + Message message = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders()); + + MessageExchangeBuilder builder = new MessageExchangeBuilder(message); + builder.expected.add(new StompReceiptFrameMessageMatcher(sessionId, receiptId)); + return builder; + } + public MessageExchangeBuilder andExpectMessage(String sessionId, String subscriptionId) { Assert.isTrue(SimpMessageType.MESSAGE.equals(headers.getMessageType())); String destination = this.headers.getDestination();