Browse Source

Make changes for timing related test failures

pull/377/head
Rossen Stoyanchev 12 years ago
parent
commit
bfa6645c7d
  1. 6
      spring-messaging/src/main/java/org/springframework/messaging/simp/BrokerAvailabilityEvent.java
  2. 85
      spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java

6
spring-messaging/src/main/java/org/springframework/messaging/simp/BrokerAvailabilityEvent.java

@ -45,4 +45,10 @@ public class BrokerAvailabilityEvent extends ApplicationEvent {
public boolean isBrokerAvailable() { public boolean isBrokerAvailable() {
return this.brokerAvailable; return this.brokerAvailable;
} }
@Override
public String toString() {
return "BrokerAvailabilityEvent=" + this.brokerAvailable;
}
} }

85
spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java

@ -68,25 +68,24 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
private int port; private int port;
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
this.port = SocketUtils.findAvailableTcpPort(61613); this.port = SocketUtils.findAvailableTcpPort(61613);
createAndStartBroker();
this.responseChannel = new ExecutorSubscribableChannel(); this.responseChannel = new ExecutorSubscribableChannel();
this.responseHandler = new ExpectationMatchingMessageHandler(); this.responseHandler = new ExpectationMatchingMessageHandler();
this.responseChannel.subscribe(this.responseHandler); this.responseChannel.subscribe(this.responseHandler);
this.eventPublisher = new ExpectationMatchingEventPublisher(); this.eventPublisher = new ExpectationMatchingEventPublisher();
startActiveMqBroker();
createAndStartRelay(); createAndStartRelay();
} }
private void createAndStartBroker() throws Exception { private void startActiveMqBroker() throws Exception {
this.activeMQBroker = new BrokerService(); this.activeMQBroker = new BrokerService();
this.activeMQBroker.addConnector("stomp://localhost:" + port); this.activeMQBroker.addConnector("stomp://localhost:" + this.port);
this.activeMQBroker.setStartAsync(false); this.activeMQBroker.setStartAsync(false);
this.activeMQBroker.setDeleteAllMessagesOnStartup(true); this.activeMQBroker.setDeleteAllMessagesOnStartup(true);
this.activeMQBroker.start(); this.activeMQBroker.start();
@ -94,7 +93,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
private void createAndStartRelay() throws InterruptedException { private void createAndStartRelay() throws InterruptedException {
this.relay = new StompBrokerRelayMessageHandler(this.responseChannel, Arrays.asList("/queue/", "/topic/")); this.relay = new StompBrokerRelayMessageHandler(this.responseChannel, Arrays.asList("/queue/", "/topic/"));
this.relay.setRelayPort(port); this.relay.setRelayPort(this.port);
this.relay.setApplicationEventPublisher(this.eventPublisher); this.relay.setApplicationEventPublisher(this.eventPublisher);
this.relay.setSystemHeartbeatReceiveInterval(0); this.relay.setSystemHeartbeatReceiveInterval(0);
this.relay.setSystemHeartbeatSendInterval(0); this.relay.setSystemHeartbeatSendInterval(0);
@ -110,10 +109,28 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
this.relay.stop(); this.relay.stop();
} }
finally { finally {
stopBrokerAndAwait(); stopActiveMqBrokerAndAwait();
} }
} }
private void stopActiveMqBrokerAndAwait() throws Exception {
logger.debug("Stopping ActiveMQ broker and will await shutdown");
if (!this.activeMQBroker.isStarted()) {
logger.debug("Broker not running");
return;
}
final CountDownLatch latch = new CountDownLatch(1);
this.activeMQBroker.addShutdownHook(new Runnable() {
public void run() {
latch.countDown();
}
});
this.activeMQBroker.stop();
assertTrue("Broker did not stop", latch.await(5, TimeUnit.SECONDS));
logger.debug("Broker stopped");
}
// When TCP client is behind interface and configurable: // When TCP client is behind interface and configurable:
// test "host" header (virtualHost property) // test "host" header (virtualHost property)
// test "/user/.." destination is excluded // test "/user/.." destination is excluded
@ -122,23 +139,22 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
public void publishSubscribe() throws Exception { public void publishSubscribe() throws Exception {
String sess1 = "sess1"; String sess1 = "sess1";
MessageExchange conn1 = MessageExchangeBuilder.connect(sess1).build();
this.relay.handleMessage(conn1.message);
this.responseHandler.expect(conn1);
String sess2 = "sess2"; String sess2 = "sess2";
MessageExchange conn1 = MessageExchangeBuilder.connect(sess1).build();
MessageExchange conn2 = MessageExchangeBuilder.connect(sess2).build(); MessageExchange conn2 = MessageExchangeBuilder.connect(sess2).build();
this.relay.handleMessage(conn2.message); this.responseHandler.expect(conn1, conn2);
this.responseHandler.expect(conn2);
this.relay.handleMessage(conn1.message);
this.relay.handleMessage(conn2.message);
this.responseHandler.awaitAndAssert(); this.responseHandler.awaitAndAssert();
String subs1 = "subs1"; String subs1 = "subs1";
String destination = "/topic/test"; String destination = "/topic/test";
MessageExchange subscribe = MessageExchangeBuilder.subscribeWithReceipt(sess1, subs1, destination, "r1").build(); MessageExchange subscribe = MessageExchangeBuilder.subscribeWithReceipt(sess1, subs1, destination, "r1").build();
this.relay.handleMessage(subscribe.message);
this.responseHandler.expect(subscribe); this.responseHandler.expect(subscribe);
this.relay.handleMessage(subscribe.message);
this.responseHandler.awaitAndAssert(); this.responseHandler.awaitAndAssert();
MessageExchange send = MessageExchangeBuilder.send(destination, "foo").andExpectMessage(sess1, subs1).build(); MessageExchange send = MessageExchangeBuilder.send(destination, "foo").andExpectMessage(sess1, subs1).build();
@ -151,7 +167,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
@Test @Test
public void brokerUnvailableErrorFrameOnConnect() throws Exception { public void brokerUnvailableErrorFrameOnConnect() throws Exception {
stopBrokerAndAwait(); stopActiveMqBrokerAndAwait();
MessageExchange connect = MessageExchangeBuilder.connectWithError("sess1").build(); MessageExchange connect = MessageExchangeBuilder.connectWithError("sess1").build();
this.responseHandler.expect(connect); this.responseHandler.expect(connect);
@ -162,7 +178,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
@Test(expected=MessageDeliveryException.class) @Test(expected=MessageDeliveryException.class)
public void messageDeliverExceptionIfSystemSessionForwardFails() throws Exception { public void messageDeliverExceptionIfSystemSessionForwardFails() throws Exception {
stopBrokerAndAwait(); stopActiveMqBrokerAndAwait();
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND); StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND);
this.relay.handleMessage(MessageBuilder.withPayload("test".getBytes()).setHeaders(headers).build()); this.relay.handleMessage(MessageBuilder.withPayload("test".getBytes()).setHeaders(headers).build());
} }
@ -175,12 +191,11 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
this.responseHandler.expect(connect); this.responseHandler.expect(connect);
this.relay.handleMessage(connect.message); this.relay.handleMessage(connect.message);
this.responseHandler.awaitAndAssert(); this.responseHandler.awaitAndAssert();
this.responseHandler.expect(MessageExchangeBuilder.error(sess1).build()); this.responseHandler.expect(MessageExchangeBuilder.error(sess1).build());
stopBrokerAndAwait(); stopActiveMqBrokerAndAwait();
this.responseHandler.awaitAndAssert(); this.responseHandler.awaitAndAssert();
} }
@ -188,7 +203,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
@Test @Test
public void brokerAvailabilityEventWhenStopped() throws Exception { public void brokerAvailabilityEventWhenStopped() throws Exception {
this.eventPublisher.expectAvailabilityStatusChanges(false); this.eventPublisher.expectAvailabilityStatusChanges(false);
stopBrokerAndAwait(); stopActiveMqBrokerAndAwait();
this.eventPublisher.awaitAndAssert(); this.eventPublisher.awaitAndAssert();
} }
@ -198,6 +213,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
String sess1 = "sess1"; String sess1 = "sess1";
MessageExchange conn1 = MessageExchangeBuilder.connect(sess1).build(); MessageExchange conn1 = MessageExchangeBuilder.connect(sess1).build();
this.responseHandler.expect(conn1); this.responseHandler.expect(conn1);
this.relay.handleMessage(conn1.message); this.relay.handleMessage(conn1.message);
this.responseHandler.awaitAndAssert(); this.responseHandler.awaitAndAssert();
@ -212,7 +228,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
this.responseHandler.expect(MessageExchangeBuilder.error(sess1).build()); this.responseHandler.expect(MessageExchangeBuilder.error(sess1).build());
stopBrokerAndAwait(); stopActiveMqBrokerAndAwait();
this.responseHandler.awaitAndAssert(); this.responseHandler.awaitAndAssert();
@ -220,7 +236,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
this.eventPublisher.awaitAndAssert(); this.eventPublisher.awaitAndAssert();
this.eventPublisher.expectAvailabilityStatusChanges(true); this.eventPublisher.expectAvailabilityStatusChanges(true);
createAndStartBroker(); startActiveMqBroker();
this.eventPublisher.awaitAndAssert(); this.eventPublisher.awaitAndAssert();
// TODO The event publisher assertions show that the broker's back up and the system relay session // TODO The event publisher assertions show that the broker's back up and the system relay session
@ -231,14 +247,15 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
@Test @Test
public void disconnectClosesRelaySessionCleanly() throws Exception { public void disconnectClosesRelaySessionCleanly() throws Exception {
MessageExchange connect = MessageExchangeBuilder.connect("sess1").build(); MessageExchange connect = MessageExchangeBuilder.connect("sess1").build();
this.responseHandler.expect(connect); this.responseHandler.expect(connect);
this.relay.handleMessage(connect.message); this.relay.handleMessage(connect.message);
this.responseHandler.awaitAndAssert(); this.responseHandler.awaitAndAssert();
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT); StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT);
headers.setSessionId("sess1"); headers.setSessionId("sess1");
this.relay.handleMessage(MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build()); this.relay.handleMessage(MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build());
Thread.sleep(2000); Thread.sleep(2000);
@ -248,24 +265,6 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
} }
private void stopBrokerAndAwait() throws Exception {
logger.debug("Stopping ActiveMQ broker and will await shutdown");
if (!this.activeMQBroker.isStarted()) {
logger.debug("Broker not running");
return;
}
final CountDownLatch latch = new CountDownLatch(1);
this.activeMQBroker.addShutdownHook(new Runnable() {
public void run() {
latch.countDown();
}
});
this.activeMQBroker.stop();
assertTrue("Broker did not stop", latch.await(5, TimeUnit.SECONDS));
logger.debug("Broker stopped");
}
/** /**
* Handles messages by matching them to expectations including a latch to wait for * Handles messages by matching them to expectations including a latch to wait for
* the completion of expected messages. * the completion of expected messages.
@ -408,6 +407,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT); StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT);
headers.setSessionId(sessionId); headers.setSessionId(sessionId);
headers.setAcceptVersion("1.1,1.2"); headers.setAcceptVersion("1.1,1.2");
headers.setHeartbeat(0, 0);
Message<?> message = MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build(); Message<?> message = MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build();
MessageExchangeBuilder builder = new MessageExchangeBuilder(message); MessageExchangeBuilder builder = new MessageExchangeBuilder(message);
@ -595,8 +595,8 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
public void awaitAndAssert() throws InterruptedException { public void awaitAndAssert() throws InterruptedException {
synchronized(this.monitor) { synchronized(this.monitor) {
long endTime = System.currentTimeMillis() + 6000; long endTime = System.currentTimeMillis() + 10000;
while (this.expected.size() != this.actual.size() && System.currentTimeMillis() < endTime) { while ((this.expected.size() != this.actual.size()) && (System.currentTimeMillis() < endTime)) {
this.monitor.wait(500); this.monitor.wait(500);
} }
assertEquals(this.expected, this.actual); assertEquals(this.expected, this.actual);
@ -605,6 +605,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
@Override @Override
public void publishEvent(ApplicationEvent event) { public void publishEvent(ApplicationEvent event) {
logger.debug("Processing ApplicationEvent " + event);
if (event instanceof BrokerAvailabilityEvent) { if (event instanceof BrokerAvailabilityEvent) {
synchronized(this.monitor) { synchronized(this.monitor) {
this.actual.add(((BrokerAvailabilityEvent) event).isBrokerAvailable()); this.actual.add(((BrokerAvailabilityEvent) event).isBrokerAvailable());

Loading…
Cancel
Save