diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/BrokerAvailabilityEvent.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/BrokerAvailabilityEvent.java index e2df82f77a4..119021a883c 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/BrokerAvailabilityEvent.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/BrokerAvailabilityEvent.java @@ -45,4 +45,10 @@ public class BrokerAvailabilityEvent extends ApplicationEvent { public boolean isBrokerAvailable() { return this.brokerAvailable; } + + @Override + public String toString() { + return "BrokerAvailabilityEvent=" + this.brokerAvailable; + } + } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java index 5b531505ae0..736f5d06866 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java @@ -68,25 +68,24 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { private int port; + @Before public void setUp() throws Exception { this.port = SocketUtils.findAvailableTcpPort(61613); - createAndStartBroker(); - this.responseChannel = new ExecutorSubscribableChannel(); this.responseHandler = new ExpectationMatchingMessageHandler(); this.responseChannel.subscribe(this.responseHandler); - this.eventPublisher = new ExpectationMatchingEventPublisher(); + startActiveMqBroker(); createAndStartRelay(); } - private void createAndStartBroker() throws Exception { + private void startActiveMqBroker() throws Exception { this.activeMQBroker = new BrokerService(); - this.activeMQBroker.addConnector("stomp://localhost:" + port); + this.activeMQBroker.addConnector("stomp://localhost:" + this.port); this.activeMQBroker.setStartAsync(false); this.activeMQBroker.setDeleteAllMessagesOnStartup(true); this.activeMQBroker.start(); @@ -94,7 +93,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { private void createAndStartRelay() throws InterruptedException { this.relay = new StompBrokerRelayMessageHandler(this.responseChannel, Arrays.asList("/queue/", "/topic/")); - this.relay.setRelayPort(port); + this.relay.setRelayPort(this.port); this.relay.setApplicationEventPublisher(this.eventPublisher); this.relay.setSystemHeartbeatReceiveInterval(0); this.relay.setSystemHeartbeatSendInterval(0); @@ -110,10 +109,28 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { this.relay.stop(); } finally { - stopBrokerAndAwait(); + stopActiveMqBrokerAndAwait(); } } + private void stopActiveMqBrokerAndAwait() throws Exception { + logger.debug("Stopping ActiveMQ broker and will await shutdown"); + if (!this.activeMQBroker.isStarted()) { + logger.debug("Broker not running"); + return; + } + final CountDownLatch latch = new CountDownLatch(1); + this.activeMQBroker.addShutdownHook(new Runnable() { + public void run() { + latch.countDown(); + } + }); + this.activeMQBroker.stop(); + assertTrue("Broker did not stop", latch.await(5, TimeUnit.SECONDS)); + logger.debug("Broker stopped"); + } + + // When TCP client is behind interface and configurable: // test "host" header (virtualHost property) // test "/user/.." destination is excluded @@ -122,23 +139,22 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { public void publishSubscribe() throws Exception { String sess1 = "sess1"; - MessageExchange conn1 = MessageExchangeBuilder.connect(sess1).build(); - this.relay.handleMessage(conn1.message); - this.responseHandler.expect(conn1); - String sess2 = "sess2"; + MessageExchange conn1 = MessageExchangeBuilder.connect(sess1).build(); MessageExchange conn2 = MessageExchangeBuilder.connect(sess2).build(); - this.relay.handleMessage(conn2.message); - this.responseHandler.expect(conn2); + this.responseHandler.expect(conn1, conn2); + this.relay.handleMessage(conn1.message); + this.relay.handleMessage(conn2.message); this.responseHandler.awaitAndAssert(); String subs1 = "subs1"; String destination = "/topic/test"; MessageExchange subscribe = MessageExchangeBuilder.subscribeWithReceipt(sess1, subs1, destination, "r1").build(); - this.relay.handleMessage(subscribe.message); this.responseHandler.expect(subscribe); + + this.relay.handleMessage(subscribe.message); this.responseHandler.awaitAndAssert(); MessageExchange send = MessageExchangeBuilder.send(destination, "foo").andExpectMessage(sess1, subs1).build(); @@ -151,7 +167,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { @Test public void brokerUnvailableErrorFrameOnConnect() throws Exception { - stopBrokerAndAwait(); + stopActiveMqBrokerAndAwait(); MessageExchange connect = MessageExchangeBuilder.connectWithError("sess1").build(); this.responseHandler.expect(connect); @@ -162,7 +178,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { @Test(expected=MessageDeliveryException.class) public void messageDeliverExceptionIfSystemSessionForwardFails() throws Exception { - stopBrokerAndAwait(); + stopActiveMqBrokerAndAwait(); StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND); this.relay.handleMessage(MessageBuilder.withPayload("test".getBytes()).setHeaders(headers).build()); } @@ -175,12 +191,11 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { this.responseHandler.expect(connect); this.relay.handleMessage(connect.message); - this.responseHandler.awaitAndAssert(); this.responseHandler.expect(MessageExchangeBuilder.error(sess1).build()); - stopBrokerAndAwait(); + stopActiveMqBrokerAndAwait(); this.responseHandler.awaitAndAssert(); } @@ -188,7 +203,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { @Test public void brokerAvailabilityEventWhenStopped() throws Exception { this.eventPublisher.expectAvailabilityStatusChanges(false); - stopBrokerAndAwait(); + stopActiveMqBrokerAndAwait(); this.eventPublisher.awaitAndAssert(); } @@ -198,6 +213,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { String sess1 = "sess1"; MessageExchange conn1 = MessageExchangeBuilder.connect(sess1).build(); this.responseHandler.expect(conn1); + this.relay.handleMessage(conn1.message); this.responseHandler.awaitAndAssert(); @@ -212,7 +228,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { this.responseHandler.expect(MessageExchangeBuilder.error(sess1).build()); - stopBrokerAndAwait(); + stopActiveMqBrokerAndAwait(); this.responseHandler.awaitAndAssert(); @@ -220,7 +236,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { this.eventPublisher.awaitAndAssert(); this.eventPublisher.expectAvailabilityStatusChanges(true); - createAndStartBroker(); + startActiveMqBroker(); this.eventPublisher.awaitAndAssert(); // TODO The event publisher assertions show that the broker's back up and the system relay session @@ -231,14 +247,15 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { @Test public void disconnectClosesRelaySessionCleanly() throws Exception { + MessageExchange connect = MessageExchangeBuilder.connect("sess1").build(); this.responseHandler.expect(connect); + this.relay.handleMessage(connect.message); this.responseHandler.awaitAndAssert(); StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT); headers.setSessionId("sess1"); - this.relay.handleMessage(MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build()); Thread.sleep(2000); @@ -248,24 +265,6 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { } - private void stopBrokerAndAwait() throws Exception { - logger.debug("Stopping ActiveMQ broker and will await shutdown"); - if (!this.activeMQBroker.isStarted()) { - logger.debug("Broker not running"); - return; - } - final CountDownLatch latch = new CountDownLatch(1); - this.activeMQBroker.addShutdownHook(new Runnable() { - public void run() { - latch.countDown(); - } - }); - this.activeMQBroker.stop(); - assertTrue("Broker did not stop", latch.await(5, TimeUnit.SECONDS)); - logger.debug("Broker stopped"); - } - - /** * Handles messages by matching them to expectations including a latch to wait for * the completion of expected messages. @@ -408,6 +407,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT); headers.setSessionId(sessionId); headers.setAcceptVersion("1.1,1.2"); + headers.setHeartbeat(0, 0); Message message = MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build(); MessageExchangeBuilder builder = new MessageExchangeBuilder(message); @@ -595,8 +595,8 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { public void awaitAndAssert() throws InterruptedException { synchronized(this.monitor) { - long endTime = System.currentTimeMillis() + 6000; - while (this.expected.size() != this.actual.size() && System.currentTimeMillis() < endTime) { + long endTime = System.currentTimeMillis() + 10000; + while ((this.expected.size() != this.actual.size()) && (System.currentTimeMillis() < endTime)) { this.monitor.wait(500); } assertEquals(this.expected, this.actual); @@ -605,6 +605,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { @Override public void publishEvent(ApplicationEvent event) { + logger.debug("Processing ApplicationEvent " + event); if (event instanceof BrokerAvailabilityEvent) { synchronized(this.monitor) { this.actual.add(((BrokerAvailabilityEvent) event).isBrokerAvailable());