Browse Source

Remove CONNECT-related message buffer from STOMP relay

Before this change, the StompProtocolHandler always responded to
clients with a CONNECTED frame, while the STOMP broker relay
independantly forwarded the client CONNECT to the broker and waited
for the CONNECTED frame back. That meant the relay had to buffer
client messages until it received the CONNECTED response from
the message broker.

This change ensures that clients wait for a CONNECTED frame from
the message broker. The broker relay forwards the CONNECT frame to
the broker. The broker responds with a CONNECTED frame, which the
relay then forwards to the client. As a result, a (well-written)
client will not send any messages to the relay until the connection
to the broker is fully established.

The StompProtcolHandler can now be configured whether to send CONNECTED
frame back. By default that is off. So when using the simple broker,
the StompProtocolHandler can still respond with CONNECTED frames.

The relay's handling of a connection being dropped has also been
improved. When a connection for a client relay session is dropped
an ERROR frame will be sent back to the client. If a connection is
closed as part of a DISCONNECT frame being sent, no ERROR frame
is sent back to the client. When the connection for the system relay
session is dropped, an event is published indicating that the broker
is unavailable. Reactor's TcpClient will then attempt to re-restablish
the connection.
pull/364/merge
Andy Wilkinson 13 years ago committed by Rossen Stoyanchev
parent
commit
8d2a376b0f
  1. 4
      spring-messaging/src/main/java/org/springframework/messaging/simp/config/ServletStompEndpointRegistry.java
  2. 6
      spring-messaging/src/main/java/org/springframework/messaging/simp/config/WebSocketMessageBrokerConfigurationSupport.java
  3. 71
      spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java
  4. 36
      spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompProtocolHandler.java
  5. 2
      spring-messaging/src/test/java/org/springframework/messaging/simp/config/ServletStompEndpointRegistryTests.java
  6. 220
      spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java
  7. 41
      spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompProtocolHandlerTests.java

4
spring-messaging/src/main/java/org/springframework/messaging/simp/config/ServletStompEndpointRegistry.java

@ -54,7 +54,8 @@ public class ServletStompEndpointRegistry implements StompEndpointRegistry { @@ -54,7 +54,8 @@ public class ServletStompEndpointRegistry implements StompEndpointRegistry {
public ServletStompEndpointRegistry(WebSocketHandler webSocketHandler,
MutableUserQueueSuffixResolver userQueueSuffixResolver, TaskScheduler defaultSockJsTaskScheduler) {
MutableUserQueueSuffixResolver userQueueSuffixResolver, TaskScheduler defaultSockJsTaskScheduler,
boolean handleConnect) {
Assert.notNull(webSocketHandler);
Assert.notNull(userQueueSuffixResolver);
@ -63,6 +64,7 @@ public class ServletStompEndpointRegistry implements StompEndpointRegistry { @@ -63,6 +64,7 @@ public class ServletStompEndpointRegistry implements StompEndpointRegistry {
this.subProtocolWebSocketHandler = findSubProtocolWebSocketHandler(webSocketHandler);
this.stompHandler = new StompProtocolHandler();
this.stompHandler.setUserQueueSuffixResolver(userQueueSuffixResolver);
this.stompHandler.setHandleConnect(handleConnect);
this.sockJsScheduler = defaultSockJsTaskScheduler;
}

6
spring-messaging/src/main/java/org/springframework/messaging/simp/config/WebSocketMessageBrokerConfigurationSupport.java

@ -57,8 +57,10 @@ public abstract class WebSocketMessageBrokerConfigurationSupport { @@ -57,8 +57,10 @@ public abstract class WebSocketMessageBrokerConfigurationSupport {
@Bean
public HandlerMapping brokerWebSocketHandlerMapping() {
ServletStompEndpointRegistry registry = new ServletStompEndpointRegistry(
subProtocolWebSocketHandler(), userQueueSuffixResolver(), brokerDefaultSockJsTaskScheduler());
boolean brokerRelayConfigured = getMessageBrokerConfigurer().getStompBrokerRelay() != null;
ServletStompEndpointRegistry registry = new ServletStompEndpointRegistry(subProtocolWebSocketHandler(),
userQueueSuffixResolver(), brokerDefaultSockJsTaskScheduler(), !brokerRelayConfigured);
registerStompEndpoints(registry);
AbstractHandlerMapping hm = registry.getHandlerMapping();
hm.setOrder(1);

71
spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java

@ -17,13 +17,9 @@ @@ -17,13 +17,9 @@
package org.springframework.messaging.simp.stomp;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import org.springframework.messaging.Message;
@ -249,12 +245,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -249,12 +245,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
private final String sessionId;
private final BlockingQueue<Message<?>> messageQueue = new LinkedBlockingQueue<Message<?>>(50);
private volatile StompConnection stompConnection = new StompConnection();
private final Object monitor = new Object();
private RelaySession(String sessionId) {
Assert.notNull(sessionId, "sessionId is required");
@ -291,6 +283,12 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -291,6 +283,12 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
protected void handleTcpConnection(TcpConnection<Message<byte[]>, Message<byte[]>> tcpConn, final Message<?> connectMessage) {
this.stompConnection.setTcpConnection(tcpConn);
tcpConn.on().close(new Runnable() {
@Override
public void run() {
connectionClosed();
}
});
tcpConn.in().consume(new Consumer<Message<byte[]>>() {
@Override
public void accept(Message<byte[]> message) {
@ -307,12 +305,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -307,12 +305,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
StompHeaderAccessor headers = StompHeaderAccessor.wrap(message);
if (StompCommand.CONNECTED == headers.getCommand()) {
synchronized(this.monitor) {
this.stompConnection.setReady();
publishBrokerAvailableEvent();
flushMessages();
}
return;
this.stompConnection.setReady();
publishBrokerAvailableEvent();
}
headers.setSessionId(this.sessionId);
@ -344,24 +338,11 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -344,24 +338,11 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
public void forward(Message<?> message) {
if (!this.stompConnection.isReady()) {
synchronized(this.monitor) {
if (!this.stompConnection.isReady()) {
this.messageQueue.add(message);
if (logger.isTraceEnabled()) {
logger.trace("Not connected, message queued. Queue size=" + this.messageQueue.size());
}
return;
}
}
logger.warn("Message sent to relay before it was CONNECTED. Discarding message: " + message);
return;
}
if (this.messageQueue.isEmpty()) {
forwardInternal(message);
}
else {
this.messageQueue.add(message);
flushMessages();
}
forwardInternal(message);
}
private boolean forwardInternal(final Message<?> message) {
@ -381,6 +362,12 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -381,6 +362,12 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
logger.trace("Forwarding to STOMP broker, message: " + message);
}
StompCommand command = StompHeaderAccessor.wrap(message).getCommand();
if (command == StompCommand.DISCONNECT) {
this.stompConnection.setDisconnected();
}
final Deferred<Boolean, Promise<Boolean>> deferred = new DeferredPromiseSpec<Boolean>().get();
tcpConnection.send((Message<byte[]>)message, new Consumer<Boolean>() {
@Override
@ -396,7 +383,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -396,7 +383,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
handleTcpClientFailure("Timed out waiting for message to be forwarded to the broker", null);
}
else if (!success) {
if (StompHeaderAccessor.wrap(message).getCommand() != StompCommand.DISCONNECT) {
if (command != StompCommand.DISCONNECT) {
handleTcpClientFailure("Failed to forward message to the broker", null);
}
}
@ -408,13 +395,10 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -408,13 +395,10 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
return (success != null) ? success : false;
}
private void flushMessages() {
List<Message<?>> messages = new ArrayList<Message<?>>();
this.messageQueue.drainTo(messages);
for (Message<?> message : messages) {
if (!forwardInternal(message)) {
return;
}
protected void connectionClosed() {
relaySessions.remove(this.sessionId);
if (this.stompConnection.isReady()) {
sendError("Lost connection to the broker");
}
}
}
@ -461,6 +445,10 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -461,6 +445,10 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
private class SystemRelaySession extends RelaySession {
private static final long HEARTBEAT_SEND_INTERVAL = 10000;
private static final long HEARTBEAT_RECEIVE_INTERVAL = 10000;
public static final String ID = "stompRelaySystemSessionId";
@ -473,7 +461,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -473,7 +461,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
headers.setAcceptVersion("1.1,1.2");
headers.setLogin(systemLogin);
headers.setPasscode(systemPasscode);
headers.setHeartbeat(0,0);
headers.setHeartbeat(HEARTBEAT_SEND_INTERVAL, HEARTBEAT_RECEIVE_INTERVAL);
Message<?> connectMessage = MessageBuilder.withPayloadAndHeaders(new byte[0], headers).build();
super.connect(connectMessage);
}
@ -488,6 +476,11 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -488,6 +476,11 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
});
}
@Override
protected void connectionClosed() {
publishBrokerUnavailableEvent();
}
@Override
protected void sendMessageToClient(Message<?> message) {
StompHeaderAccessor headers = StompHeaderAccessor.wrap(message);

36
spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompProtocolHandler.java

@ -70,6 +70,7 @@ public class StompProtocolHandler implements SubProtocolHandler { @@ -70,6 +70,7 @@ public class StompProtocolHandler implements SubProtocolHandler {
private MutableUserQueueSuffixResolver queueSuffixResolver = new SimpleUserQueueSuffixResolver();
private volatile boolean handleConnect = false;
/**
* Configure a resolver to use to maintain queue suffixes for user
@ -86,6 +87,29 @@ public class StompProtocolHandler implements SubProtocolHandler { @@ -86,6 +87,29 @@ public class StompProtocolHandler implements SubProtocolHandler {
return this.queueSuffixResolver;
}
/**
* Configures the handling of CONNECT frames. When {@code true}, CONNECT
* frames will be handled by this handler, and a CONNECTED response will be
* sent. When {@code false}, CONNECT frames will be forwarded for
* handling by another component.
*
* @param handleConnect {@code true} if connect frames should be handled
* by this handler, {@code false} otherwise.
*/
public void setHandleConnect(boolean handleConnect) {
this.handleConnect = handleConnect;
}
/**
* Returns whether or not this handler will handle CONNECT frames.
*
* @return Returns {@code true} if this handler will handle CONNECT frames,
* otherwise {@code false}.
*/
public boolean willHandleConnect() {
return this.handleConnect;
}
@Override
public List<String> getSupportedProtocols() {
return Arrays.asList("v10.stomp", "v11.stomp", "v12.stomp");
@ -121,17 +145,17 @@ public class StompProtocolHandler implements SubProtocolHandler { @@ -121,17 +145,17 @@ public class StompProtocolHandler implements SubProtocolHandler {
message = MessageBuilder.withPayloadAndHeaders(message.getPayload(), headers).build();
if (SimpMessageType.CONNECT.equals(headers.getMessageType())) {
if (this.handleConnect && SimpMessageType.CONNECT.equals(headers.getMessageType())) {
handleConnect(session, message);
}
outputChannel.send(message);
else {
outputChannel.send(message);
}
}
catch (Throwable t) {
logger.error("Terminating STOMP session due to failure to send message: ", t);
sendErrorMessage(session, t);
}
}
/**
@ -144,8 +168,8 @@ public class StompProtocolHandler implements SubProtocolHandler { @@ -144,8 +168,8 @@ public class StompProtocolHandler implements SubProtocolHandler {
StompHeaderAccessor headers = StompHeaderAccessor.wrap(message);
headers.setCommandIfNotSet(StompCommand.MESSAGE);
if (StompCommand.CONNECTED.equals(headers.getCommand())) {
// Ignore for now since we already sent it
if (this.handleConnect && StompCommand.CONNECTED.equals(headers.getCommand())) {
// Ignore since we already sent it
return;
}

2
spring-messaging/src/test/java/org/springframework/messaging/simp/config/ServletStompEndpointRegistryTests.java

@ -53,7 +53,7 @@ public class ServletStompEndpointRegistryTests { @@ -53,7 +53,7 @@ public class ServletStompEndpointRegistryTests {
this.webSocketHandler = new SubProtocolWebSocketHandler(channel);
this.queueSuffixResolver = new SimpleUserQueueSuffixResolver();
TaskScheduler taskScheduler = Mockito.mock(TaskScheduler.class);
this.registry = new ServletStompEndpointRegistry(webSocketHandler, queueSuffixResolver, taskScheduler);
this.registry = new ServletStompEndpointRegistry(webSocketHandler, queueSuffixResolver, taskScheduler, false);
}

220
spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java

@ -30,7 +30,6 @@ import org.apache.commons.logging.LogFactory; @@ -30,7 +30,6 @@ import org.apache.commons.logging.LogFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.messaging.Message;
@ -63,16 +62,14 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { @@ -63,16 +62,14 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
private ExpectationMatchingEventPublisher eventPublisher;
private int port;
@Before
public void setUp() throws Exception {
int port = SocketUtils.findAvailableTcpPort(61613);
this.port = SocketUtils.findAvailableTcpPort(61613);
this.activeMQBroker = new BrokerService();
this.activeMQBroker.addConnector("stomp://localhost:" + port);
this.activeMQBroker.setStartAsync(false);
this.activeMQBroker.setDeleteAllMessagesOnStartup(true);
this.activeMQBroker.start();
createAndStartBroker();
this.responseChannel = new ExecutorSubscribableChannel();
this.responseHandler = new ExpectationMatchingMessageHandler();
@ -86,6 +83,14 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { @@ -86,6 +83,14 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
this.relay.start();
}
private void createAndStartBroker() throws Exception {
this.activeMQBroker = new BrokerService();
this.activeMQBroker.addConnector("stomp://localhost:" + port);
this.activeMQBroker.setStartAsync(false);
this.activeMQBroker.setDeleteAllMessagesOnStartup(true);
this.activeMQBroker.start();
}
@After
public void tearDown() throws Exception {
try {
@ -102,22 +107,24 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { @@ -102,22 +107,24 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
String sess1 = "sess1";
MessageExchange conn1 = MessageExchangeBuilder.connect(sess1).build();
this.relay.handleMessage(conn1.message);
this.responseHandler.expect(conn1);
String sess2 = "sess2";
MessageExchange conn2 = MessageExchangeBuilder.connect(sess2).build();
this.relay.handleMessage(conn2.message);
this.responseHandler.expect(conn2);
this.responseHandler.awaitAndAssert();
String subs1 = "subs1";
String destination = "/topic/test";
MessageExchange subscribe = MessageExchangeBuilder.subscribeWithReceipt(sess1, subs1, destination, "r1").build();
this.responseHandler.expect(subscribe);
this.relay.handleMessage(subscribe.message);
this.responseHandler.expect(subscribe);
this.responseHandler.awaitAndAssert();
MessageExchange send = MessageExchangeBuilder.send(destination, "foo").andExpectMessage(sess1, subs1).build();
this.responseHandler.reset();
this.responseHandler.expect(send);
this.relay.handleMessage(send.message);
@ -129,7 +136,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { @@ -129,7 +136,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
stopBrokerAndAwait();
MessageExchange connect = MessageExchangeBuilder.connect("sess1").andExpectError().build();
MessageExchange connect = MessageExchangeBuilder.connectWithError("sess1").build();
this.responseHandler.expect(connect);
this.relay.handleMessage(connect.message);
@ -137,37 +144,31 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { @@ -137,37 +144,31 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
}
@Test
public void brokerUnvailableErrorFrameOnSend() throws Exception {
public void brokerBecomingUnvailableTriggersErrorFrame() throws Exception {
String sess1 = "sess1";
MessageExchange connect = MessageExchangeBuilder.connect(sess1).build();
this.responseHandler.expect(connect);
this.relay.handleMessage(connect.message);
// TODO: expect CONNECTED
Thread.sleep(2000);
this.responseHandler.awaitAndAssert();
stopBrokerAndAwait();
this.responseHandler.expect(MessageExchangeBuilder.error(sess1).build());
MessageExchange subscribe = MessageExchangeBuilder.subscribe(sess1, "s1", "/topic/a").andExpectError().build();
this.responseHandler.expect(subscribe);
stopBrokerAndAwait();
this.relay.handleMessage(subscribe.message);
this.responseHandler.awaitAndAssert();
}
@Test
public void brokerAvailabilityEvents() throws Exception {
// TODO: expect CONNECTED
Thread.sleep(2000);
this.eventPublisher.expect(true, false);
this.eventPublisher.expect(true);
this.eventPublisher.awaitAndAssert();
this.eventPublisher.expect(false);
stopBrokerAndAwait();
// TODO: remove when stop is detecteded
this.relay.handleMessage(MessageExchangeBuilder.connect("sess1").build().message);
this.eventPublisher.awaitAndAssert();
}
@ -176,37 +177,55 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { @@ -176,37 +177,55 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
String sess1 = "sess1";
MessageExchange conn1 = MessageExchangeBuilder.connect(sess1).build();
this.responseHandler.expect(conn1);
this.relay.handleMessage(conn1.message);
this.responseHandler.awaitAndAssert();
String subs1 = "subs1";
String destination = "/topic/test";
MessageExchange subscribe = MessageExchangeBuilder.subscribeWithReceipt(sess1, subs1, destination, "r1").build();
MessageExchange subscribe =
MessageExchangeBuilder.subscribeWithReceipt(sess1, subs1, destination, "r1").build();
this.responseHandler.expect(subscribe);
this.relay.handleMessage(subscribe.message);
this.responseHandler.awaitAndAssert();
this.responseHandler.expect(MessageExchangeBuilder.error(sess1).build());
stopBrokerAndAwait();
// 1st message will see ERROR frame (broker shutdown is not but should be detected)
// 2nd message will be queued (a side effect of CONNECT/CONNECTED-buffering, likely to be removed)
// Finish this once the above changes are made.
this.responseHandler.awaitAndAssert();
/* MessageExchange send = MessageExchangeBuilder.send(destination, "foo").build();
this.responseHandler.reset();
this.relay.handleMessage(send.message);
Thread.sleep(2000);
this.eventPublisher.expect(true, false);
this.eventPublisher.awaitAndAssert();
this.activeMQBroker.start();
Thread.sleep(5000);
this.eventPublisher.expect(true);
createAndStartBroker();
this.eventPublisher.awaitAndAssert();
send = MessageExchangeBuilder.send(destination, "foo").andExpectMessage(sess1, subs1).build();
this.responseHandler.reset();
this.responseHandler.expect(send);
this.relay.handleMessage(send.message);
// TODO The event publisher assertions show that the broker's back up and the system relay session
// has reconnected. We need to decide what we want the reconnect behaviour to be for client relay
// sessions and add further message sending and assertions as appropriate. At the moment any client
// sessions will be closed and an ERROR from will be sent.
}
@Test
public void disconnectClosesRelaySessionCleanly() throws Exception {
String sess1 = "sess1";
MessageExchange conn1 = MessageExchangeBuilder.connect(sess1).build();
this.responseHandler.expect(conn1);
this.relay.handleMessage(conn1.message);
this.responseHandler.awaitAndAssert();
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT);
headers.setSessionId(sess1);
this.relay.handleMessage(MessageBuilder.withPayloadAndHeaders(new byte[0], headers).build());
Thread.sleep(2000);
// Check that we have not received an ERROR as a result of the connection closing
this.responseHandler.awaitAndAssert();
*/
}
@ -234,58 +253,66 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { @@ -234,58 +253,66 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
*/
private static class ExpectationMatchingMessageHandler implements MessageHandler {
private final List<MessageExchange> expected;
private final Object monitor = new Object();
private final List<MessageExchange> actual = new CopyOnWriteArrayList<>();
private final List<MessageExchange> expected;
private final List<Message<?>> unexpected = new CopyOnWriteArrayList<>();
private final List<MessageExchange> actual = new ArrayList<>();
private CountDownLatch latch = new CountDownLatch(1);
private final List<Message<?>> unexpected = new ArrayList<>();
public ExpectationMatchingMessageHandler(MessageExchange... expected) {
this.expected = new CopyOnWriteArrayList<>(expected);
synchronized (this.monitor) {
this.expected = new CopyOnWriteArrayList<>(expected);
}
}
public void expect(MessageExchange... expected) {
this.expected.addAll(Arrays.asList(expected));
synchronized (this.monitor) {
this.expected.addAll(Arrays.asList(expected));
}
}
public void awaitAndAssert() throws InterruptedException {
boolean result = this.latch.await(10000, TimeUnit.MILLISECONDS);
assertTrue(getAsString(), result && this.unexpected.isEmpty());
}
public void reset() {
this.latch = new CountDownLatch(1);
this.expected.clear();
this.actual.clear();
this.unexpected.clear();
long endTime = System.currentTimeMillis() + 10000;
synchronized (this.monitor) {
while (!this.expected.isEmpty() && System.currentTimeMillis() < endTime) {
this.monitor.wait(500);
}
boolean result = this.expected.isEmpty();
assertTrue(getAsString(), result && this.unexpected.isEmpty());
}
}
@Override
public void handleMessage(Message<?> message) throws MessagingException {
for (MessageExchange exch : this.expected) {
if (exch.matchMessage(message)) {
if (exch.isDone()) {
this.expected.remove(exch);
this.actual.add(exch);
if (this.expected.isEmpty()) {
this.latch.countDown();
synchronized(this.monitor) {
for (MessageExchange exch : this.expected) {
if (exch.matchMessage(message)) {
if (exch.isDone()) {
this.expected.remove(exch);
this.actual.add(exch);
if (this.expected.isEmpty()) {
this.monitor.notifyAll();
}
}
return;
}
return;
}
this.unexpected.add(message);
}
this.unexpected.add(message);
}
public String getAsString() {
StringBuilder sb = new StringBuilder("\n");
sb.append("INCOMPLETE:\n").append(this.expected).append("\n");
sb.append("COMPLETE:\n").append(this.actual).append("\n");
sb.append("UNMATCHED MESSAGES:\n").append(this.unexpected).append("\n");
synchronized (this.monitor) {
sb.append("INCOMPLETE:\n").append(this.expected).append("\n");
sb.append("COMPLETE:\n").append(this.actual).append("\n");
sb.append("UNMATCHED MESSAGES:\n").append(this.unexpected).append("\n");
}
return sb.toString();
}
}
@ -352,22 +379,28 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { @@ -352,22 +379,28 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
this.headers = StompHeaderAccessor.wrap(message);
}
public static MessageExchangeBuilder error(String sessionId) {
return new MessageExchangeBuilder(null).andExpectError(sessionId);
}
public static MessageExchangeBuilder connect(String sessionId) {
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT);
headers.setSessionId(sessionId);
headers.setAcceptVersion("1.1,1.2");
Message<?> message = MessageBuilder.withPayloadAndHeaders(new byte[0], headers).build();
return new MessageExchangeBuilder(message);
MessageExchangeBuilder builder = new MessageExchangeBuilder(message);
builder.expected.add(new StompConnectedFrameMessageMatcher(sessionId));
return builder;
}
public static MessageExchangeBuilder subscribe(String sessionId, String subscriptionId, String destination) {
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SUBSCRIBE);
public static MessageExchangeBuilder connectWithError(String sessionId) {
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT);
headers.setSessionId(sessionId);
headers.setSubscriptionId(subscriptionId);
headers.setDestination(destination);
headers.setAcceptVersion("1.1,1.2");
Message<?> message = MessageBuilder.withPayloadAndHeaders(new byte[0], headers).build();
return new MessageExchangeBuilder(message);
MessageExchangeBuilder builder = new MessageExchangeBuilder(message);
return builder.andExpectError();
}
public static MessageExchangeBuilder subscribeWithReceipt(String sessionId, String subscriptionId,
@ -515,35 +548,48 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { @@ -515,35 +548,48 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
}
}
private static class StompConnectedFrameMessageMatcher extends StompFrameMessageMatcher {
public StompConnectedFrameMessageMatcher(String sessionId) {
super(StompCommand.CONNECTED, sessionId);
}
}
private static class ExpectationMatchingEventPublisher implements ApplicationEventPublisher {
private final List<Boolean> expected = new CopyOnWriteArrayList<>();
private final List<Boolean> expected = new ArrayList<>();
private final List<Boolean> actual = new CopyOnWriteArrayList<>();
private final List<Boolean> actual = new ArrayList<>();
private CountDownLatch latch = new CountDownLatch(1);
private final Object monitor = new Object();
public void expect(Boolean... expected) {
this.expected.addAll(Arrays.asList(expected));
synchronized (this.monitor) {
this.expected.addAll(Arrays.asList(expected));
}
}
public void awaitAndAssert() throws InterruptedException {
if (this.expected.size() == this.actual.size()) {
synchronized(this.monitor) {
long endTime = System.currentTimeMillis() + 5000;
while (this.expected.size() != this.actual.size() && System.currentTimeMillis() < endTime) {
this.monitor.wait(500);
}
assertEquals(this.expected, this.actual);
}
else {
assertTrue("Expected=" + this.expected + ", actual=" + this.actual,
this.latch.await(5, TimeUnit.SECONDS));
}
}
@Override
public void publishEvent(ApplicationEvent event) {
if (event instanceof BrokerAvailabilityEvent) {
this.actual.add(((BrokerAvailabilityEvent) event).isBrokerAvailable());
if (this.actual.size() == this.expected.size()) {
this.latch.countDown();
synchronized(this.monitor) {
this.actual.add(((BrokerAvailabilityEvent) event).isBrokerAvailable());
if (this.actual.size() == this.expected.size()) {
this.monitor.notifyAll();
}
}
}
}

41
spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompProtocolHandlerTests.java

@ -61,25 +61,15 @@ public class StompProtocolHandlerTests { @@ -61,25 +61,15 @@ public class StompProtocolHandlerTests {
}
@Test
public void handleConnect() {
public void connectedResponseIsSentWhenHandlingConnect() {
this.stompHandler.setHandleConnect(true);
TextMessage textMessage = StompTextMessageBuilder.create(StompCommand.CONNECT).headers(
"login:guest", "passcode:guest", "accept-version:1.1,1.0", "heart-beat:10000,10000").build();
this.stompHandler.handleMessageFromClient(this.session, textMessage, this.channel);
verify(this.channel).send(this.messageCaptor.capture());
Message<?> actual = this.messageCaptor.getValue();
assertNotNull(actual);
StompHeaderAccessor headers = StompHeaderAccessor.wrap(actual);
assertEquals(StompCommand.CONNECT, headers.getCommand());
assertEquals("s1", headers.getSessionId());
assertEquals("joe", headers.getUser().getName());
assertEquals("guest", headers.getLogin());
assertEquals("PROTECTED", headers.getPasscode());
assertArrayEquals(new long[] {10000, 10000}, headers.getHeartbeat());
assertEquals(new HashSet<>(Arrays.asList("1.1","1.0")), headers.getAcceptVersion());
verifyNoMoreInteractions(this.channel);
// Check CONNECTED reply
@ -95,4 +85,29 @@ public class StompProtocolHandlerTests { @@ -95,4 +85,29 @@ public class StompProtocolHandlerTests {
assertEquals("s1", replyHeaders.getNativeHeader("queue-suffix").get(0));
}
@Test
public void connectIsForwardedWhenNotHandlingConnect() {
this.stompHandler.setHandleConnect(false);
TextMessage textMessage = StompTextMessageBuilder.create(StompCommand.CONNECT).headers(
"login:guest", "passcode:guest", "accept-version:1.1,1.0", "heart-beat:10000,10000").build();
this.stompHandler.handleMessageFromClient(this.session, textMessage, this.channel);
verify(this.channel).send(this.messageCaptor.capture());
Message<?> actual = this.messageCaptor.getValue();
assertNotNull(actual);
StompHeaderAccessor headers = StompHeaderAccessor.wrap(actual);
assertEquals(StompCommand.CONNECT, headers.getCommand());
assertEquals("s1", headers.getSessionId());
assertEquals("joe", headers.getUser().getName());
assertEquals("guest", headers.getLogin());
assertEquals("PROTECTED", headers.getPasscode());
assertArrayEquals(new long[] {10000, 10000}, headers.getHeartbeat());
assertEquals(new HashSet<>(Arrays.asList("1.1","1.0")), headers.getAcceptVersion());
assertEquals(0, this.session.getSentMessages().size());
}
}

Loading…
Cancel
Save