diff --git a/.gitignore b/.gitignore index b1e4ec06c78..095221dc637 100644 --- a/.gitignore +++ b/.gitignore @@ -15,6 +15,7 @@ spring-test/test-output/ .gradle argfile* pom.xml +activemq-data/ /build buildSrc/build diff --git a/build.gradle b/build.gradle index 1b550caa690..8029e662c4a 100644 --- a/build.gradle +++ b/build.gradle @@ -321,10 +321,18 @@ project("spring-messaging") { optional("org.projectreactor:reactor-core:1.0.0.BUILD-SNAPSHOT") optional("org.projectreactor:reactor-tcp:1.0.0.BUILD-SNAPSHOT") optional("com.lmax:disruptor:3.1.1") + testCompile(project(":spring-test")) + testCompile("com.thoughtworks.xstream:xstream:1.4.4") testCompile("commons-dbcp:commons-dbcp:1.2.2") testCompile("javax.inject:javax.inject-tck:1") - testCompile(project(":spring-test")) - } + testCompile("org.apache.activemq:activemq-broker:5.8.0") + testCompile("org.apache.activemq:activemq-kahadb-store:5.8.0") { + exclude group: "org.springframework", module: "spring-context" + } + testCompile("org.apache.activemq:activemq-stomp:5.8.0") + testCompile("org.slf4j:slf4j-jcl:${slf4jVersion}") + testCompile("log4j:log4j:1.2.17") +} repositories { maven { url 'http://repo.springsource.org/libs-milestone' } // reactor diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java index 09e2e388eef..59e81cdf844 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java @@ -376,13 +376,19 @@ public class StompBrokerRelayMessageHandler implements MessageHandler, SmartLife } private void brokerAvailable() { - if (this.brokerAvailable.compareAndSet(false, true)) { + if ((this.applicationEventPublisher != null) && this.brokerAvailable.compareAndSet(false, true)) { + if (logger.isTraceEnabled()) { + logger.trace("Publishing BrokerAvailabilityEvent (available)"); + } this.applicationEventPublisher.publishEvent(new BrokerAvailabilityEvent(true, this)); } } private void brokerUnavailable() { - if (this.brokerAvailable.compareAndSet(true, false)) { + if ((this.applicationEventPublisher != null) && this.brokerAvailable.compareAndSet(true, false)) { + if (logger.isTraceEnabled()) { + logger.trace("Publishing BrokerAvailabilityEvent (unavailable)"); + } this.applicationEventPublisher.publishEvent(new BrokerAvailabilityEvent(false, this)); } } @@ -518,52 +524,42 @@ public class StompBrokerRelayMessageHandler implements MessageHandler, SmartLife private boolean forwardInternal(final Message message) { TcpConnection localConnection = this.connection; + if (localConnection == null) { + return false; + } - if (localConnection != null) { - if (logger.isTraceEnabled()) { - logger.trace("Forwarding message to STOMP broker, message id=" + message.getHeaders().getId()); - } - byte[] bytes = stompMessageConverter.fromMessage(message); - - final Deferred> deferred = new DeferredPromiseSpec().get(); - - String payload = new String(bytes, Charset.forName("UTF-8")); - localConnection.send(payload, new Consumer() { - - @Override - public void accept(Boolean success) { - if (!success && StompHeaderAccessor.wrap(message).getCommand() != StompCommand.DISCONNECT) { - deferred.accept(false); - } else { - deferred.accept(true); - } - } - }); - - Boolean success = null; + if (logger.isTraceEnabled()) { + logger.trace("Forwarding to STOMP broker, message: " + message); + } - try { - success = deferred.compose().await(); + byte[] bytes = stompMessageConverter.fromMessage(message); + String payload = new String(bytes, Charset.forName("UTF-8")); - if (success == null) { - sendError(sessionId, "Timed out waiting for message to be forwarded to the broker"); - } - else if (!success) { - sendError(sessionId, "Failed to forward message to the broker"); - } - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - sendError(sessionId, "Interrupted while forwarding message to the broker"); + final Deferred> deferred = new DeferredPromiseSpec().get(); + localConnection.send(payload, new Consumer() { + @Override + public void accept(Boolean success) { + deferred.accept(success); } + }); + Boolean success = null; + try { + success = deferred.compose().await(); if (success == null) { - success = false; + sendError(sessionId, "Timed out waiting for message to be forwarded to the broker"); } - - return success; - } else { - return false; + else if (!success) { + if (StompHeaderAccessor.wrap(message).getCommand() != StompCommand.DISCONNECT) { + sendError(sessionId, "Failed to forward message to the broker"); + } + } + } + catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + sendError(sessionId, "Interrupted while forwarding message to the broker"); } + return (success != null) ? success : false; } private void flushMessages() { diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompCommand.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompCommand.java index a5e3c53e98a..b39d6e53a69 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompCommand.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompCommand.java @@ -57,6 +57,9 @@ public enum StompCommand { private static Set destinationRequiredLookup = new HashSet(Arrays.asList(SEND, SUBSCRIBE, MESSAGE)); + private static Set subscriptionIdRequiredLookup = + new HashSet(Arrays.asList(SUBSCRIBE, UNSUBSCRIBE, MESSAGE)); + static { messageTypeLookup.put(StompCommand.CONNECT, SimpMessageType.CONNECT); messageTypeLookup.put(StompCommand.STOMP, SimpMessageType.CONNECT); @@ -76,4 +79,8 @@ public enum StompCommand { return destinationRequiredLookup.contains(this); } + public boolean requiresSubscriptionId() { + return subscriptionIdRequiredLookup.contains(this); + } + } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompHeaderAccessor.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompHeaderAccessor.java index 18c1023fa9b..8825a73edb0 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompHeaderAccessor.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompHeaderAccessor.java @@ -54,7 +54,9 @@ public class StompHeaderAccessor extends SimpMessageHeaderAccessor { public static final String STOMP_MESSAGE_ID_HEADER = "message-id"; - public static final String STOMP_RECEIPT_ID_HEADER = "receipt-id"; + public static final String STOMP_RECEIPT_HEADER = "receipt"; // any client frame except CONNECT + + public static final String STOMP_RECEIPT_ID_HEADER = "receipt-id"; // RECEIPT frame public static final String STOMP_SUBSCRIPTION_HEADER = "subscription"; @@ -176,20 +178,22 @@ public class StompHeaderAccessor extends SimpMessageHeaderAccessor { result.put(STOMP_CONTENT_TYPE_HEADER, Arrays.asList(contentType.toString())); } - if (StompCommand.MESSAGE.equals(getCommand())) { + if (getCommand().requiresSubscriptionId()) { String subscriptionId = getSubscriptionId(); if (subscriptionId != null) { - result.put(STOMP_SUBSCRIPTION_HEADER, Arrays.asList(subscriptionId)); + String name = StompCommand.MESSAGE.equals(getCommand()) ? STOMP_SUBSCRIPTION_HEADER : STOMP_ID_HEADER; + result.put(name, Arrays.asList(subscriptionId)); } else { - logger.warn("STOMP MESSAGE frame should have a subscription: " + this.toString()); - } - if ((getMessageId() == null)) { - String messageId = getSessionId() + "-" + messageIdCounter.getAndIncrement(); - result.put(STOMP_MESSAGE_ID_HEADER, Arrays.asList(messageId)); + logger.warn(getCommand() + " frame should have a subscription: " + this.toString()); } } + if (StompCommand.MESSAGE.equals(getCommand()) && ((getMessageId() == null))) { + String messageId = getSessionId() + "-" + messageIdCounter.getAndIncrement(); + result.put(STOMP_MESSAGE_ID_HEADER, Arrays.asList(messageId)); + } + return result; } @@ -302,6 +306,14 @@ public class StompHeaderAccessor extends SimpMessageHeaderAccessor { return getFirstNativeHeader(STOMP_RECEIPT_ID_HEADER); } + public void setReceipt(String receiptId) { + setNativeHeader(STOMP_RECEIPT_HEADER, receiptId); + } + + public String getReceipt() { + return getFirstNativeHeader(STOMP_RECEIPT_HEADER); + } + public String getMessage() { return getFirstNativeHeader(STOMP_MESSAGE_HEADER); } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java index e5df5b89eab..60d5e045e61 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java @@ -16,285 +16,538 @@ package org.springframework.messaging.simp.stomp; -import java.io.IOException; +import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.apache.activemq.broker.BrokerService; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.junit.After; +import org.junit.Before; import org.junit.Test; -import org.junit.runner.RunWith; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.ApplicationContext; -import org.springframework.context.ApplicationListener; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; +import org.springframework.context.ApplicationEvent; +import org.springframework.context.ApplicationEventPublisher; import org.springframework.messaging.Message; -import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; -import org.springframework.messaging.SubscribableChannel; import org.springframework.messaging.simp.BrokerAvailabilityEvent; import org.springframework.messaging.support.MessageBuilder; import org.springframework.messaging.support.channel.ExecutorSubscribableChannel; -import org.springframework.test.annotation.DirtiesContext; -import org.springframework.test.annotation.DirtiesContext.ClassMode; -import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import org.springframework.util.SocketUtils; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; +import reactor.util.Assert; + +import static org.junit.Assert.*; /** - * Integration tests for {@link StompBrokerRelayMessageHandler} - * - * @author Andy Wilkinson + * @author Rossen Stoyanchev */ -@RunWith(SpringJUnit4ClassRunner.class) -@ContextConfiguration(classes = {StompBrokerRelayMessageHandlerIntegrationTests.TestConfiguration.class}) -@DirtiesContext(classMode=ClassMode.AFTER_EACH_TEST_METHOD) public class StompBrokerRelayMessageHandlerIntegrationTests { - @Autowired - private SubscribableChannel messageChannel; + private static final Log logger = LogFactory.getLog(StompBrokerRelayMessageHandlerIntegrationTests.class); + + private static final Charset UTF_8 = Charset.forName("UTF-8"); - @Autowired private StompBrokerRelayMessageHandler relay; - @Autowired - private TestStompBroker stompBroker; + private BrokerService activeMQBroker; + + private ExecutorSubscribableChannel responseChannel; + + private ExpectationMatchingMessageHandler responseHandler; - @Autowired - private ApplicationContext applicationContext; + private ExpectationMatchingEventPublisher eventPublisher; - @Autowired - private BrokerAvailabilityListener brokerAvailabilityListener; + @Before + public void setUp() throws Exception { + int port = SocketUtils.findAvailableTcpPort(61613); + + this.activeMQBroker = new BrokerService(); + this.activeMQBroker.addConnector("stomp://localhost:" + port); + this.activeMQBroker.setStartAsync(false); + this.activeMQBroker.setDeleteAllMessagesOnStartup(true); + this.activeMQBroker.start(); + + this.responseChannel = new ExecutorSubscribableChannel(); + this.responseHandler = new ExpectationMatchingMessageHandler(); + this.responseChannel.subscribe(this.responseHandler); + + this.eventPublisher = new ExpectationMatchingEventPublisher(); + + this.relay = new StompBrokerRelayMessageHandler(this.responseChannel, Arrays.asList("/queue/", "/topic/")); + this.relay.setRelayPort(port); + this.relay.setApplicationEventPublisher(this.eventPublisher); + this.relay.start(); + } + + @After + public void tearDown() throws Exception { + try { + this.relay.stop(); + } + finally { + stopBrokerAndAwait(); + } + } @Test - public void basicPublishAndSubscribe() throws IOException, InterruptedException { + public void publishSubscribe() throws Exception { - String client1SessionId = "abc123"; - String client2SessionId = "def456"; + String sess1 = "sess1"; + MessageExchange conn1 = MessageExchangeBuilder.connect(sess1).build(); + this.relay.handleMessage(conn1.message); - final CountDownLatch messageLatch = new CountDownLatch(1); + String sess2 = "sess2"; + MessageExchange conn2 = MessageExchangeBuilder.connect(sess2).build(); + this.relay.handleMessage(conn2.message); - this.messageChannel.subscribe(new MessageHandler() { + String subs1 = "subs1"; + String destination = "/topic/test"; - @Override - public void handleMessage(Message message) throws MessagingException { - StompHeaderAccessor headers = StompHeaderAccessor.wrap(message); - if (headers.getCommand() == StompCommand.MESSAGE) { - messageLatch.countDown(); - } - } + MessageExchange subscribe = MessageExchangeBuilder.subscribeWithReceipt(sess1, subs1, destination, "r1").build(); + this.responseHandler.expect(subscribe); - }); + this.relay.handleMessage(subscribe.message); + this.responseHandler.awaitAndAssert(); - this.relay.handleMessage(createConnectMessage(client1SessionId)); - this.relay.handleMessage(createConnectMessage(client2SessionId)); - this.relay.handleMessage(createSubscribeMessage(client1SessionId, "/topic/test")); + MessageExchange send = MessageExchangeBuilder.send(destination, "foo").andExpectMessage(sess1, subs1).build(); + this.responseHandler.reset(); + this.responseHandler.expect(send); - this.stompBroker.awaitMessages(4); + this.relay.handleMessage(send.message); + this.responseHandler.awaitAndAssert(); + } + + @Test + public void brokerUnvailableErrorFrameOnConnect() throws Exception { - this.relay.handleMessage(createSendMessage(client2SessionId, "/topic/test", "fromClient2")); + stopBrokerAndAwait(); - assertTrue(messageLatch.await(30, TimeUnit.SECONDS)); + MessageExchange connect = MessageExchangeBuilder.connect("sess1").andExpectError().build(); + this.responseHandler.expect(connect); - List availabilityEvents = this.brokerAvailabilityListener.awaitAvailabilityEvents(1); - assertTrue(availabilityEvents.get(0).isBrokerAvailable()); + this.relay.handleMessage(connect.message); + this.responseHandler.awaitAndAssert(); } @Test - public void whenConnectFailsDueToTheBrokerBeingUnavailableAnErrorFrameIsSentToTheClient() - throws IOException, InterruptedException { + public void brokerUnvailableErrorFrameOnSend() throws Exception { - String sessionId = "abc123"; + String sess1 = "sess1"; + MessageExchange connect = MessageExchangeBuilder.connect(sess1).build(); + this.relay.handleMessage(connect.message); - final CountDownLatch errorLatch = new CountDownLatch(1); + // TODO: expect CONNECTED + Thread.sleep(2000); - this.messageChannel.subscribe(new MessageHandler() { + stopBrokerAndAwait(); - @Override - public void handleMessage(Message message) throws MessagingException { - StompHeaderAccessor headers = StompHeaderAccessor.wrap(message); - if (headers.getCommand() == StompCommand.ERROR) { - errorLatch.countDown(); - } - } + MessageExchange subscribe = MessageExchangeBuilder.subscribe(sess1, "s1", "/topic/a").andExpectError().build(); + this.responseHandler.expect(subscribe); - }); + this.relay.handleMessage(subscribe.message); + this.responseHandler.awaitAndAssert(); + } - this.stompBroker.awaitMessages(1); + @Test + public void brokerAvailabilityEvents() throws Exception { - List availabilityEvents = this.brokerAvailabilityListener.awaitAvailabilityEvents(1); - assertTrue(availabilityEvents.get(0).isBrokerAvailable()); + // TODO: expect CONNECTED + Thread.sleep(2000); - this.stompBroker.stop(); + this.eventPublisher.expect(true, false); - this.relay.handleMessage(createConnectMessage(sessionId)); + stopBrokerAndAwait(); - errorLatch.await(30, TimeUnit.SECONDS); + // TODO: remove when stop is detecteded + this.relay.handleMessage(MessageExchangeBuilder.connect("sess1").build().message); - availabilityEvents = brokerAvailabilityListener.awaitAvailabilityEvents(2); - assertTrue(availabilityEvents.get(0).isBrokerAvailable()); - assertFalse(availabilityEvents.get(1).isBrokerAvailable()); + this.eventPublisher.awaitAndAssert(); } @Test - public void whenSendFailsDueToTheBrokerBeingUnavailableAnErrorFrameIsSentToTheClient() - throws IOException, InterruptedException { + public void relayReconnectsIfBrokerComesBackUp() throws Exception { - String sessionId = "abc123"; + String sess1 = "sess1"; + MessageExchange conn1 = MessageExchangeBuilder.connect(sess1).build(); + this.relay.handleMessage(conn1.message); - final CountDownLatch errorLatch = new CountDownLatch(1); + String subs1 = "subs1"; + String destination = "/topic/test"; + MessageExchange subscribe = MessageExchangeBuilder.subscribeWithReceipt(sess1, subs1, destination, "r1").build(); + this.responseHandler.expect(subscribe); - this.messageChannel.subscribe(new MessageHandler() { + this.relay.handleMessage(subscribe.message); + this.responseHandler.awaitAndAssert(); - @Override - public void handleMessage(Message message) throws MessagingException { - StompHeaderAccessor headers = StompHeaderAccessor.wrap(message); - if (headers.getCommand() == StompCommand.ERROR) { - errorLatch.countDown(); - } - } + stopBrokerAndAwait(); + + // TODO: + // 1st message will see ERROR frame (broker shutdown is not but should be detected) + // 2nd message will be queued (a side effect of CONNECT/CONNECTED-buffering, likely to be removed) + // Finish this once the above changes are made. + +/* MessageExchange send = MessageExchangeBuilder.send(destination, "foo").build(); + this.responseHandler.reset(); + this.relay.handleMessage(send.message); + Thread.sleep(2000); + + this.activeMQBroker.start(); + Thread.sleep(5000); + + send = MessageExchangeBuilder.send(destination, "foo").andExpectMessage(sess1, subs1).build(); + this.responseHandler.reset(); + this.responseHandler.expect(send); + this.relay.handleMessage(send.message); + + this.responseHandler.awaitAndAssert(); +*/ + } + + private void stopBrokerAndAwait() throws Exception { + logger.debug("Stopping ActiveMQ broker and will await shutdown"); + if (!this.activeMQBroker.isStarted()) { + logger.debug("Broker not running"); + return; + } + final CountDownLatch latch = new CountDownLatch(1); + this.activeMQBroker.addShutdownHook(new Runnable() { + public void run() { + latch.countDown(); + } }); + this.activeMQBroker.stop(); + assertTrue("Broker did not stop", latch.await(5, TimeUnit.SECONDS)); + logger.debug("Broker stopped"); + } + + + /** + * Handles messages by matching them to expectations including a latch to wait for + * the completion of expected messages. + */ + private static class ExpectationMatchingMessageHandler implements MessageHandler { + + private final List expected; + + private final List actual = new CopyOnWriteArrayList<>(); - this.relay.handleMessage(createConnectMessage(sessionId)); + private final List> unexpected = new CopyOnWriteArrayList<>(); - this.stompBroker.awaitMessages(2); + private CountDownLatch latch = new CountDownLatch(1); - List availabilityEvents = this.brokerAvailabilityListener.awaitAvailabilityEvents(1); - assertTrue(availabilityEvents.get(0).isBrokerAvailable()); - this.stompBroker.stop(); + public ExpectationMatchingMessageHandler(MessageExchange... expected) { + this.expected = new CopyOnWriteArrayList<>(expected); + } + + + public void expect(MessageExchange... expected) { + this.expected.addAll(Arrays.asList(expected)); + } - this.relay.handleMessage(createSubscribeMessage(sessionId, "/topic/test/")); + public void awaitAndAssert() throws InterruptedException { + boolean result = this.latch.await(5000, TimeUnit.MILLISECONDS); + assertTrue(getAsString(), result && this.unexpected.isEmpty()); + } + + public void reset() { + this.latch = new CountDownLatch(1); + this.expected.clear(); + this.actual.clear(); + this.unexpected.clear(); + } - errorLatch.await(30, TimeUnit.SECONDS); + @Override + public void handleMessage(Message message) throws MessagingException { + for (MessageExchange exch : this.expected) { + if (exch.matchMessage(message)) { + if (exch.isDone()) { + this.expected.remove(exch); + this.actual.add(exch); + if (this.expected.isEmpty()) { + this.latch.countDown(); + } + } + return; + } + } + this.unexpected.add(message); + } - availabilityEvents = this.brokerAvailabilityListener.awaitAvailabilityEvents(1); - assertTrue(availabilityEvents.get(0).isBrokerAvailable()); - assertFalse(availabilityEvents.get(1).isBrokerAvailable()); + public String getAsString() { + StringBuilder sb = new StringBuilder("\n"); + sb.append("INCOMPLETE:\n").append(this.expected).append("\n"); + sb.append("COMPLETE:\n").append(this.actual).append("\n"); + sb.append("UNMATCHED MESSAGES:\n").append(this.unexpected).append("\n"); + return sb.toString(); + } } - @Test - public void relayReconnectsIfTheBrokerComesBackUp() throws InterruptedException { - List availabilityEvents = this.brokerAvailabilityListener.awaitAvailabilityEvents(1); - assertTrue(availabilityEvents.get(0).isBrokerAvailable()); + /** + * Holds a message as well as expected and actual messages matched against expectations. + */ + private static class MessageExchange { + + private final Message message; - List> messages = this.stompBroker.awaitMessages(1); - assertEquals(1, messages.size()); - assertStompCommand(messages.get(0), StompCommand.CONNECT); + private final MessageMatcher[] expected; - this.stompBroker.stop(); + private final Message[] actual; - this.relay.handleMessage(createSendMessage(null, "/topic/test", "test")); - availabilityEvents = this.brokerAvailabilityListener.awaitAvailabilityEvents(2); - assertFalse(availabilityEvents.get(1).isBrokerAvailable()); + public MessageExchange(Message message, MessageMatcher... expected) { + this.message = message; + this.expected = expected; + this.actual = new Message[expected.length]; + } - this.relay.handleMessage(createSendMessage(null, "/topic/test", "test-again")); - this.stompBroker.start(); + public boolean isDone() { + for (int i=0 ; i < actual.length; i++) { + if (actual[i] == null) { + return false; + } + } + return true; + } - messages = this.stompBroker.awaitMessages(3); - assertEquals(3, messages.size()); - assertStompCommand(messages.get(1), StompCommand.CONNECT); - assertStompCommandAndPayload(messages.get(2), StompCommand.SEND, "test-again"); + public boolean matchMessage(Message message) { + for (int i=0 ; i < this.expected.length; i++) { + if (this.expected[i].match(message)) { + this.actual[i] = message; + return true; + } + } + return false; + } - availabilityEvents = this.brokerAvailabilityListener.awaitAvailabilityEvents(3); - assertTrue(availabilityEvents.get(2).isBrokerAvailable()); + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("Forwarded message:\n").append(this.message).append("\n"); + sb.append("Should receive back:\n").append(Arrays.toString(this.expected)).append("\n"); + sb.append("Actually received:\n").append(Arrays.toString(this.actual)).append("\n"); + return sb.toString(); + } } - private Message createConnectMessage(String sessionId) { - StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT); - headers.setSessionId(sessionId); - return MessageBuilder.withPayloadAndHeaders(new byte[0], headers).build(); - } + private static class MessageExchangeBuilder { - private Message createSubscribeMessage(String sessionId, String destination) { - StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SUBSCRIBE); - headers.setSessionId(sessionId); - headers.setDestination(destination); - headers.setNativeHeader(StompHeaderAccessor.STOMP_ID_HEADER, sessionId); + private final Message message; - return MessageBuilder.withPayloadAndHeaders(new byte[0], headers).build(); + private final StompHeaderAccessor headers; + + private final List expected = new ArrayList<>(); + + + private MessageExchangeBuilder(Message message) { + this.message = message; + this.headers = StompHeaderAccessor.wrap(message); + } + + + public static MessageExchangeBuilder connect(String sessionId) { + StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT); + headers.setSessionId(sessionId); + Message message = MessageBuilder.withPayloadAndHeaders(new byte[0], headers).build(); + return new MessageExchangeBuilder(message); + } + + public static MessageExchangeBuilder subscribe(String sessionId, String subscriptionId, String destination) { + StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SUBSCRIBE); + headers.setSessionId(sessionId); + headers.setSubscriptionId(subscriptionId); + headers.setDestination(destination); + Message message = MessageBuilder.withPayloadAndHeaders(new byte[0], headers).build(); + return new MessageExchangeBuilder(message); + } + + public static MessageExchangeBuilder subscribeWithReceipt(String sessionId, String subscriptionId, + String destination, String receiptId) { + + StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SUBSCRIBE); + headers.setSessionId(sessionId); + headers.setSubscriptionId(subscriptionId); + headers.setDestination(destination); + headers.setReceipt(receiptId); + Message message = MessageBuilder.withPayloadAndHeaders(new byte[0], headers).build(); + + MessageExchangeBuilder builder = new MessageExchangeBuilder(message); + builder.expected.add(new StompReceiptFrameMessageMatcher(sessionId, receiptId)); + return builder; + } + + public static MessageExchangeBuilder send(String destination, String payload) { + StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND); + headers.setDestination(destination); + Message message = MessageBuilder.withPayloadAndHeaders(payload.getBytes(UTF_8), headers).build(); + return new MessageExchangeBuilder(message); + } + + public MessageExchangeBuilder andExpectMessage(String sessionId, String subscriptionId) { + Assert.isTrue(StompCommand.SEND.equals(headers.getCommand()), "MESSAGE can only be expected after SEND"); + String destination = this.headers.getDestination(); + Object payload = this.message.getPayload(); + this.expected.add(new StompMessageFrameMessageMatcher(sessionId, subscriptionId, destination, payload)); + return this; + } + + public MessageExchangeBuilder andExpectError() { + String sessionId = this.headers.getSessionId(); + Assert.notNull(sessionId, "No sessionId to match the ERROR frame to"); + return andExpectError(sessionId); + } + + public MessageExchangeBuilder andExpectError(String sessionId) { + this.expected.add(new StompFrameMessageMatcher(StompCommand.ERROR, sessionId)); + return this; + } + + public MessageExchange build() { + return new MessageExchange(this.message, this.expected.toArray(new MessageMatcher[this.expected.size()])); + } } - private Message createSendMessage(String sessionId, String destination, String payload) { - StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND); - headers.setSessionId(sessionId); - headers.setDestination(destination); + private static interface MessageMatcher { + + boolean match(Message message); - return MessageBuilder.withPayloadAndHeaders(payload.getBytes(), headers).build(); } - private void assertStompCommand(Message message, StompCommand expectedCommand) { - assertEquals(expectedCommand, StompHeaderAccessor.wrap(message).getCommand()); + private static class StompFrameMessageMatcher implements MessageMatcher { + + private final StompCommand command; + + private final String sessionId; + + + public StompFrameMessageMatcher(StompCommand command, String sessionId) { + this.command = command; + this.sessionId = sessionId; + } + + + @Override + public final boolean match(Message message) { + StompHeaderAccessor headers = StompHeaderAccessor.wrap(message); + if (!this.command.equals(headers.getCommand()) || (this.sessionId != headers.getSessionId())) { + return false; + } + return matchInternal(headers, message.getPayload()); + } + + protected boolean matchInternal(StompHeaderAccessor headers, Object payload) { + return true; + } + + @Override + public String toString() { + return "command=" + this.command + ", session=\"" + this.sessionId + "\""; + } } - private void assertStompCommandAndPayload(Message message, StompCommand expectedCommand, - String expectedPayload) { - assertStompCommand(message, expectedCommand); - assertEquals(expectedPayload, new String(((byte[])message.getPayload()))); + private static class StompReceiptFrameMessageMatcher extends StompFrameMessageMatcher { + + private final String receiptId; + + public StompReceiptFrameMessageMatcher(String sessionId, String receipt) { + super(StompCommand.RECEIPT, sessionId); + this.receiptId = receipt; + } + + @Override + protected boolean matchInternal(StompHeaderAccessor headers, Object payload) { + return (this.receiptId.equals(headers.getReceiptId())); + } + + @Override + public String toString() { + return super.toString() + ", receiptId=\"" + this.receiptId + "\""; + } } + private static class StompMessageFrameMessageMatcher extends StompFrameMessageMatcher { - @Configuration - public static class TestConfiguration { + private final String subscriptionId; - @Bean - public MessageChannel messageChannel() { - return new ExecutorSubscribableChannel(); + private final String destination; + + private final Object payload; + + + public StompMessageFrameMessageMatcher(String sessionId, String subscriptionId, String destination, Object payload) { + super(StompCommand.MESSAGE, sessionId); + this.subscriptionId = subscriptionId; + this.destination = destination; + this.payload = payload; } - @Bean - public StompBrokerRelayMessageHandler relay() { - StompBrokerRelayMessageHandler relay = - new StompBrokerRelayMessageHandler(messageChannel(), Arrays.asList("/queue/", "/topic/")); - relay.setRelayPort(SocketUtils.findAvailableTcpPort()); - return relay; + @Override + protected boolean matchInternal(StompHeaderAccessor headers, Object payload) { + if (!this.subscriptionId.equals(headers.getSubscriptionId()) || !this.destination.equals(headers.getDestination())) { + return false; + } + if (payload instanceof byte[] && this.payload instanceof byte[]) { + return Arrays.equals((byte[]) payload, (byte[]) this.payload); + } + else { + return this.payload.equals(payload); + } } - @Bean - public TestStompBroker broker() throws IOException { - TestStompBroker broker = new TestStompBroker(relay().getRelayPort()); - return broker; + @Override + public String toString() { + return super.toString() + ", subscriptionId=\"" + this.subscriptionId + + "\", destination=\"" + this.destination + "\", payload=\"" + getPayloadAsText() + "\""; } - @Bean - public BrokerAvailabilityListener availabilityListener() { - return new BrokerAvailabilityListener(); + protected String getPayloadAsText() { + return (this.payload instanceof byte[]) + ? new String((byte[]) this.payload, UTF_8) : payload.toString(); } } - private static class BrokerAvailabilityListener implements ApplicationListener { + private static class ExpectationMatchingEventPublisher implements ApplicationEventPublisher { - private final List availabilityEvents = new ArrayList(); + private final List expected = new CopyOnWriteArrayList<>(); - private final Object monitor = new Object(); + private final List actual = new CopyOnWriteArrayList<>(); - @Override - public void onApplicationEvent(BrokerAvailabilityEvent event) { - synchronized (this.monitor) { - this.availabilityEvents.add(event); - this.monitor.notifyAll(); + private CountDownLatch latch = new CountDownLatch(1); + + + public void expect(Boolean... expected) { + this.expected.addAll(Arrays.asList(expected)); + } + + public void awaitAndAssert() throws InterruptedException { + if (this.expected.size() == this.actual.size()) { + assertEquals(this.expected, this.actual); + } + else { + assertTrue("Expected=" + this.expected + ", actual=" + this.actual, + this.latch.await(5, TimeUnit.SECONDS)); } } - private List awaitAvailabilityEvents(int eventCount) throws InterruptedException { - synchronized (this.monitor) { - while (this.availabilityEvents.size() < eventCount) { - this.monitor.wait(); + @Override + public void publishEvent(ApplicationEvent event) { + if (event instanceof BrokerAvailabilityEvent) { + this.actual.add(((BrokerAvailabilityEvent) event).isBrokerAvailable()); + if (this.actual.size() == this.expected.size()) { + this.latch.countDown(); } - return new ArrayList(this.availabilityEvents); } } } + } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/TestStompBroker.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/TestStompBroker.java deleted file mode 100644 index 305e8937c33..00000000000 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/TestStompBroker.java +++ /dev/null @@ -1,193 +0,0 @@ -/* - * Copyright 2002-2013 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.messaging.simp.stomp; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; - -import org.springframework.context.SmartLifecycle; -import org.springframework.messaging.Message; -import org.springframework.messaging.support.MessageBuilder; -import org.springframework.util.StringUtils; - -import reactor.core.Environment; -import reactor.function.Consumer; -import reactor.tcp.TcpConnection; -import reactor.tcp.TcpServer; -import reactor.tcp.encoding.DelimitedCodec; -import reactor.tcp.encoding.StandardCodecs; -import reactor.tcp.netty.NettyTcpServer; -import reactor.tcp.spec.TcpServerSpec; - -/** - * @author Andy Wilkinson - */ -class TestStompBroker implements SmartLifecycle { - - private final StompMessageConverter messageConverter = new StompMessageConverter(); - - private final List> messages = new ArrayList>(); - - private final Object messageMonitor = new Object(); - - private final Object subscriberMonitor = new Object(); - - private final Map> subscribers = new HashMap>(); - - private final AtomicLong messageIdCounter = new AtomicLong(); - - private final int port; - - private volatile Environment environment; - - private volatile TcpServer tcpServer; - - private volatile boolean running; - - TestStompBroker(int port) { - this.port = port; - } - - public void start() { - this.environment = new Environment(); - - this.tcpServer = new TcpServerSpec(NettyTcpServer.class) - .env(this.environment) - .codec(new DelimitedCodec((byte) 0, true, StandardCodecs.STRING_CODEC)) - .listen(port) - .consume(new Consumer>() { - - @Override - public void accept(final TcpConnection connection) { - connection.consume(new Consumer() { - @Override - public void accept(String stompFrame) { - if (!StringUtils.isEmpty(stompFrame)) { - handleMessage(messageConverter.toMessage(stompFrame), connection); - } - } - }); - } - }) - .get(); - - this.tcpServer.start(); - this.running = true; - } - - public void stop() { - try { - this.tcpServer.shutdown().await(); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - } - this.running = false; - } - - private void handleMessage(Message message, TcpConnection connection) { - StompHeaderAccessor headers = StompHeaderAccessor.wrap(message); - if (headers.getCommand() == StompCommand.CONNECT) { - StompHeaderAccessor responseHeaders = StompHeaderAccessor.create(StompCommand.CONNECTED); - MessageBuilder response = MessageBuilder.withPayloadAndHeaders(new byte[0], responseHeaders); - connection.send(new String(messageConverter.fromMessage(response.build()))); - } - else if (headers.getCommand() == StompCommand.SUBSCRIBE) { - String destination = headers.getDestination(); - synchronized (this.subscriberMonitor) { - Set subscribers = this.subscribers.get(destination); - if (subscribers == null) { - subscribers = new HashSet(); - this.subscribers.put(destination, subscribers); - } - String subscriptionId = headers.getFirstNativeHeader(StompHeaderAccessor.STOMP_ID_HEADER); - subscribers.add(new Subscription(subscriptionId, connection)); - } - } - else if (headers.getCommand() == StompCommand.SEND) { - String destination = headers.getDestination(); - synchronized (this.subscriberMonitor) { - Set subscriptions = this.subscribers.get(destination); - if (subscriptions != null) { - for (Subscription subscription: subscriptions) { - StompHeaderAccessor outboundHeaders = StompHeaderAccessor.create(StompCommand.MESSAGE); - outboundHeaders.setSubscriptionId(subscription.subscriptionId); - outboundHeaders.setMessageId(Long.toString(messageIdCounter.incrementAndGet())); - Message outbound = - MessageBuilder.withPayloadAndHeaders(message.getPayload(), outboundHeaders).build(); - subscription.tcpConnection.send(new String(this.messageConverter.fromMessage(outbound))); - } - } - } - } - addMessage(message); - } - - private void addMessage(Message message) { - synchronized (this.messageMonitor) { - this.messages.add(message); - this.messageMonitor.notifyAll(); - } - } - - public List> awaitMessages(int messageCount) throws InterruptedException { - synchronized (this.messageMonitor) { - while (this.messages.size() < messageCount) { - this.messageMonitor.wait(); - } - return this.messages; - } - } - - private static final class Subscription { - - private final String subscriptionId; - - private final TcpConnection tcpConnection; - - public Subscription(String subscriptionId, TcpConnection tcpConnection) { - this.subscriptionId = subscriptionId; - this.tcpConnection = tcpConnection; - } - - } - - @Override - public boolean isRunning() { - return this.running; - } - - @Override - public int getPhase() { - return Integer.MIN_VALUE; - } - - @Override - public boolean isAutoStartup() { - return true; - } - - @Override - public void stop(Runnable callback) { - this.stop(); - callback.run(); - } -} diff --git a/spring-messaging/src/test/resources/log4j.xml b/spring-messaging/src/test/resources/log4j.xml index 42ef9b7a73d..79762741be0 100644 --- a/spring-messaging/src/test/resources/log4j.xml +++ b/spring-messaging/src/test/resources/log4j.xml @@ -15,6 +15,10 @@ + + + +