From 01fe2923ee427cfc940d5910f626c7d27dcd1aba Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Tue, 29 Apr 2014 13:19:31 -0400 Subject: [PATCH] Simplify STOMP broker relay integration test This change simplifies the implementation of the "test" EventPublisher and MessageHandler used in the STOMP broker relay integration tests. The updated implementations use a time-limted poll on a BlockingQueue. --- ...erRelayMessageHandlerIntegrationTests.java | 208 +++++------------- .../src/test/resources/log4j.properties | 2 +- 2 files changed, 52 insertions(+), 158 deletions(-) diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java index e2f2ad35a34..bdcd2b432c6 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java @@ -20,8 +20,9 @@ import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import org.apache.activemq.broker.BrokerService; @@ -65,9 +66,9 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { private ExecutorSubscribableChannel responseChannel; - private ExpectationMatchingMessageHandler responseHandler; + private TestMessageHandler responseHandler; - private ExpectationMatchingEventPublisher eventPublisher; + private TestEventPublisher eventPublisher; private int port; @@ -78,9 +79,9 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { this.port = SocketUtils.findAvailableTcpPort(61613); this.responseChannel = new ExecutorSubscribableChannel(); - this.responseHandler = new ExpectationMatchingMessageHandler(); + this.responseHandler = new TestMessageHandler(); this.responseChannel.subscribe(this.responseHandler); - this.eventPublisher = new ExpectationMatchingEventPublisher(); + this.eventPublisher = new TestEventPublisher(); startActiveMqBroker(); createAndStartRelay(); @@ -104,9 +105,8 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { this.relay.setSystemHeartbeatReceiveInterval(0); this.relay.setSystemHeartbeatSendInterval(0); - this.eventPublisher.expectAvailabilityStatusChanges(true); this.relay.start(); - this.eventPublisher.awaitAndAssert(); + this.eventPublisher.expectBrokerAvailabilityEvent(true); } @After @@ -141,32 +141,26 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { String sess1 = "sess1"; String sess2 = "sess2"; + String subs1 = "subs1"; + String destination = "/topic/test"; + MessageExchange conn1 = MessageExchangeBuilder.connect(sess1).build(); MessageExchange conn2 = MessageExchangeBuilder.connect(sess2).build(); - this.responseHandler.expect(conn1, conn2); - this.relay.handleMessage(conn1.message); this.relay.handleMessage(conn2.message); - this.responseHandler.awaitAndAssert(); - - String subs1 = "subs1"; - String destination = "/topic/test"; + this.responseHandler.expectMessages(conn1, conn2); MessageExchange subscribe = MessageExchangeBuilder.subscribeWithReceipt(sess1, subs1, destination, "r1").build(); - this.responseHandler.expect(subscribe); - this.relay.handleMessage(subscribe.message); - this.responseHandler.awaitAndAssert(); + this.responseHandler.expectMessages(subscribe); MessageExchange send = MessageExchangeBuilder.send(destination, "foo").andExpectMessage(sess1, subs1).build(); - this.responseHandler.expect(send); - this.relay.handleMessage(send.message); - this.responseHandler.awaitAndAssert(); + this.responseHandler.expectMessages(send); } @Test(expected=MessageDeliveryException.class) - public void messageDeliverExceptionIfSystemSessionForwardFails() throws Exception { + public void messageDeliveryExceptionIfSystemSessionForwardFails() throws Exception { stopActiveMqBrokerAndAwait(); StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND); this.relay.handleMessage(MessageBuilder.createMessage("test".getBytes(), headers.getMessageHeaders())); @@ -177,23 +171,18 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { String sess1 = "sess1"; MessageExchange connect = MessageExchangeBuilder.connect(sess1).build(); - this.responseHandler.expect(connect); - this.relay.handleMessage(connect.message); - this.responseHandler.awaitAndAssert(); - - this.responseHandler.expect(MessageExchangeBuilder.error(sess1).build()); + this.responseHandler.expectMessages(connect); + MessageExchange error = MessageExchangeBuilder.error(sess1).build(); stopActiveMqBrokerAndAwait(); - - this.responseHandler.awaitAndAssert(); + this.responseHandler.expectMessages(error); } @Test public void brokerAvailabilityEventWhenStopped() throws Exception { - this.eventPublisher.expectAvailabilityStatusChanges(false); stopActiveMqBrokerAndAwait(); - this.eventPublisher.awaitAndAssert(); + this.eventPublisher.expectBrokerAvailabilityEvent(false); } @Test @@ -201,32 +190,23 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { String sess1 = "sess1"; MessageExchange conn1 = MessageExchangeBuilder.connect(sess1).build(); - this.responseHandler.expect(conn1); - this.relay.handleMessage(conn1.message); - this.responseHandler.awaitAndAssert(); + this.responseHandler.expectMessages(conn1); String subs1 = "subs1"; String destination = "/topic/test"; - MessageExchange subscribe = - MessageExchangeBuilder.subscribeWithReceipt(sess1, subs1, destination, "r1").build(); - this.responseHandler.expect(subscribe); - + MessageExchange subscribe = MessageExchangeBuilder.subscribeWithReceipt(sess1, subs1, destination, "r1").build(); this.relay.handleMessage(subscribe.message); - this.responseHandler.awaitAndAssert(); - - this.responseHandler.expect(MessageExchangeBuilder.error(sess1).build()); + this.responseHandler.expectMessages(subscribe); + MessageExchange error = MessageExchangeBuilder.error(sess1).build(); stopActiveMqBrokerAndAwait(); + this.responseHandler.expectMessages(error); - this.responseHandler.awaitAndAssert(); + this.eventPublisher.expectBrokerAvailabilityEvent(false); - this.eventPublisher.expectAvailabilityStatusChanges(false); - this.eventPublisher.awaitAndAssert(); - - this.eventPublisher.expectAvailabilityStatusChanges(true); startActiveMqBroker(); - this.eventPublisher.awaitAndAssert(); + this.eventPublisher.expectBrokerAvailabilityEvent(true); // TODO The event publisher assertions show that the broker's back up and the system relay session // has reconnected. We need to decide what we want the reconnect behaviour to be for client relay @@ -238,10 +218,8 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { public void disconnectClosesRelaySessionCleanly() throws Exception { MessageExchange connect = MessageExchangeBuilder.connect("sess1").build(); - this.responseHandler.expect(connect); - this.relay.handleMessage(connect.message); - this.responseHandler.awaitAndAssert(); + this.responseHandler.expectMessages(connect); StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT); headers.setSessionId("sess1"); @@ -250,79 +228,45 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { Thread.sleep(2000); // Check that we have not received an ERROR as a result of the connection closing - this.responseHandler.awaitAndAssert(); + assertTrue("Unexpected messages: " + this.responseHandler.queue, this.responseHandler.queue.isEmpty()); } - /** - * Handles messages by matching them to expectations including a latch to wait for - * the completion of expected messages. - */ - private static class ExpectationMatchingMessageHandler implements MessageHandler { - - private final Object monitor = new Object(); - - private final List expected; - - private final List actual = new ArrayList<>(); - - private final List> unexpected = new ArrayList<>(); + private static class TestEventPublisher implements ApplicationEventPublisher { + private final BlockingQueue eventQueue = new LinkedBlockingQueue<>(); - public ExpectationMatchingMessageHandler(MessageExchange... expected) { - synchronized (this.monitor) { - this.expected = new CopyOnWriteArrayList<>(expected); + @Override + public void publishEvent(ApplicationEvent event) { + logger.debug("Processing ApplicationEvent " + event); + if (event instanceof BrokerAvailabilityEvent) { + this.eventQueue.add((BrokerAvailabilityEvent) event); } } - public void expect(MessageExchange... expected) { - synchronized (this.monitor) { - this.expected.addAll(Arrays.asList(expected)); - } + public void expectBrokerAvailabilityEvent(boolean isBrokerAvailable) throws InterruptedException { + BrokerAvailabilityEvent event = this.eventQueue.poll(10000, TimeUnit.MILLISECONDS); + assertEquals(isBrokerAvailable, event.isBrokerAvailable()); } + } - public void awaitAndAssert() throws InterruptedException { - long endTime = System.currentTimeMillis() + 10000; - synchronized (this.monitor) { - while (!this.expected.isEmpty() && System.currentTimeMillis() < endTime) { - this.monitor.wait(500); - } - boolean result = this.expected.isEmpty(); - assertTrue(getAsString(), result && this.unexpected.isEmpty()); - } - } + private static class TestMessageHandler implements MessageHandler { + + private final BlockingQueue> queue = new LinkedBlockingQueue<>(); @Override public void handleMessage(Message message) throws MessagingException { - if (StompHeaderAccessor.wrap(message).getMessageType() != SimpMessageType.HEARTBEAT) { - synchronized(this.monitor) { - for (MessageExchange exch : this.expected) { - if (exch.matchMessage(message)) { - if (exch.isDone()) { - this.expected.remove(exch); - this.actual.add(exch); - if (this.expected.isEmpty()) { - this.monitor.notifyAll(); - } - } - return; - } - } - this.unexpected.add(message); - } + if (SimpMessageType.HEARTBEAT == SimpMessageHeaderAccessor.getMessageType(message.getHeaders())) { + return; } + this.queue.add(message); } - public String getAsString() { - StringBuilder sb = new StringBuilder("\n"); - - synchronized (this.monitor) { - sb.append("UNMATCHED EXPECTATIONS:\n").append(this.expected).append("\n"); - sb.append("MATCHED EXPECTATIONS:\n").append(this.actual).append("\n"); - sb.append("UNEXPECTED MESSAGES:\n").append(this.unexpected).append("\n"); + public void expectMessages(MessageExchange... messageExchanges) throws InterruptedException { + for (MessageExchange exchange : messageExchanges) { + Message message = this.queue.poll(10000, TimeUnit.MILLISECONDS); + assertTrue("Expected: " + exchange + " but got: " + message, exchange.matchMessage(message)); } - - return sb.toString(); } } @@ -343,15 +287,6 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { this.actual = new Message[expected.length]; } - public boolean isDone() { - for (int i=0 ; i < actual.length; i++) { - if (actual[i] == null) { - return false; - } - } - return true; - } - public boolean matchMessage(Message message) { for (int i=0 ; i < this.expected.length; i++) { if (this.expected[i].match(message)) { @@ -364,11 +299,9 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { @Override public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("Forwarded message:\n").append(this.message).append("\n"); - sb.append("Should receive back:\n").append(Arrays.toString(this.expected)).append("\n"); - sb.append("Actually received:\n").append(Arrays.toString(this.actual)).append("\n"); - return sb.toString(); + return "Forwarded message:\n" + this.message + "\n" + + "Should receive back:\n" + Arrays.toString(this.expected) + "\n" + + "Actually received:\n" + Arrays.toString(this.actual) + "\n"; } } @@ -565,43 +498,4 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { } - private static class ExpectationMatchingEventPublisher implements ApplicationEventPublisher { - - private final List expected = new ArrayList<>(); - - private final List actual = new ArrayList<>(); - - private final Object monitor = new Object(); - - - public void expectAvailabilityStatusChanges(Boolean... expected) { - synchronized (this.monitor) { - this.expected.addAll(Arrays.asList(expected)); - } - } - - public void awaitAndAssert() throws InterruptedException { - synchronized(this.monitor) { - long endTime = System.currentTimeMillis() + 60000; - while ((this.expected.size() != this.actual.size()) && (System.currentTimeMillis() < endTime)) { - this.monitor.wait(500); - } - assertEquals(this.expected, this.actual); - } - } - - @Override - public void publishEvent(ApplicationEvent event) { - logger.debug("Processing ApplicationEvent " + event); - if (event instanceof BrokerAvailabilityEvent) { - synchronized(this.monitor) { - this.actual.add(((BrokerAvailabilityEvent) event).isBrokerAvailable()); - if (this.actual.size() == this.expected.size()) { - this.monitor.notifyAll(); - } - } - } - } - } - } diff --git a/spring-messaging/src/test/resources/log4j.properties b/spring-messaging/src/test/resources/log4j.properties index 7581708c28d..9108ba56059 100644 --- a/spring-messaging/src/test/resources/log4j.properties +++ b/spring-messaging/src/test/resources/log4j.properties @@ -8,7 +8,7 @@ log4j.logger.org.springframework.web=DEBUG log4j.logger.org.apache.activemq=TRACE # Enable TRACE level to chase integration test issues on CI servers log4j.logger.org.springframework.messaging.simp.stomp=TRACE -log4j.logger.reactor.tcp=TRACE +log4j.logger.reactor.net=TRACE log4j.logger.io.netty=TRACE