Browse Source

Support STOMP DISCONNECT with receipt

Issue: SPR-11599
pull/573/head
Rossen Stoyanchev 12 years ago
parent
commit
7da3fb4ce6
  1. 17
      spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java
  2. 32
      spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java

17
spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java

@ -759,7 +759,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -759,7 +759,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
@Override
public void onSuccess(Void result) {
if (accessor.getCommand() == StompCommand.DISCONNECT) {
clearConnection();
afterDisconnectSent(accessor);
}
}
@Override
@ -775,6 +775,21 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -775,6 +775,21 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
return future;
}
/**
* After a DISCONNECT there should be no more client frames so we can
* close the connection pro-actively. However, if the DISCONNECT has a
* receipt header we leave the connection open and expect the server will
* respond with a RECEIPT and then close the connection.
*
* @see <a href="http://stomp.github.io/stomp-specification-1.2.html#DISCONNECT">
* STOMP Specification 1.2 DISCONNECT</a>
*/
private void afterDisconnectSent(StompHeaderAccessor accessor) {
if (accessor.getReceipt() == null) {
clearConnection();
}
}
/**
* Clean up state associated with the connection and close it.
* Any exception arising from closing the connection are propagated.

32
spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java

@ -223,11 +223,6 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { @@ -223,11 +223,6 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
startActiveMqBroker();
this.eventPublisher.expectBrokerAvailabilityEvent(true);
// TODO The event publisher assertions show that the broker's back up and the system relay session
// has reconnected. We need to decide what we want the reconnect behaviour to be for client relay
// sessions and add further message sending and assertions as appropriate. At the moment any client
// sessions will be closed and an ERROR from will be sent.
}
@Test
@ -249,6 +244,21 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { @@ -249,6 +244,21 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
assertTrue("Unexpected messages: " + this.responseHandler.queue, this.responseHandler.queue.isEmpty());
}
@Test
public void disconnectWithReceipt() throws Exception {
logger.debug("Starting test disconnectWithReceipt()");
MessageExchange connect = MessageExchangeBuilder.connect("sess1").build();
this.relay.handleMessage(connect.message);
this.responseHandler.expectMessages(connect);
MessageExchange disconnect = MessageExchangeBuilder.disconnectWithReceipt("sess1", "r123").build();
this.relay.handleMessage(disconnect.message);
this.responseHandler.expectMessages(disconnect);
}
private static class TestEventPublisher implements ApplicationEventPublisher {
@ -403,6 +413,18 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { @@ -403,6 +413,18 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
return new MessageExchangeBuilder(message);
}
public static MessageExchangeBuilder disconnectWithReceipt(String sessionId, String receiptId) {
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT);
headers.setSessionId(sessionId);
headers.setReceipt(receiptId);
Message<?> message = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
MessageExchangeBuilder builder = new MessageExchangeBuilder(message);
builder.expected.add(new StompReceiptFrameMessageMatcher(sessionId, receiptId));
return builder;
}
public MessageExchangeBuilder andExpectMessage(String sessionId, String subscriptionId) {
Assert.isTrue(SimpMessageType.MESSAGE.equals(headers.getMessageType()));
String destination = this.headers.getDestination();

Loading…
Cancel
Save