@ -20,8 +20,9 @@ import java.nio.charset.Charset;
@@ -20,8 +20,9 @@ import java.nio.charset.Charset;
import java.util.ArrayList ;
import java.util.Arrays ;
import java.util.List ;
import java.util.concurrent.CopyOnWriteArrayList ;
import java.util.concurrent.BlockingQueue ;
import java.util.concurrent.CountDownLatch ;
import java.util.concurrent.LinkedBlockingQueue ;
import java.util.concurrent.TimeUnit ;
import org.apache.activemq.broker.BrokerService ;
@ -65,9 +66,9 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
@@ -65,9 +66,9 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
private ExecutorSubscribableChannel responseChannel ;
private ExpectationMatching MessageHandler responseHandler ;
private Test MessageHandler responseHandler ;
private ExpectationMatching EventPublisher eventPublisher ;
private Test EventPublisher eventPublisher ;
private int port ;
@ -78,9 +79,9 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
@@ -78,9 +79,9 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
this . port = SocketUtils . findAvailableTcpPort ( 61613 ) ;
this . responseChannel = new ExecutorSubscribableChannel ( ) ;
this . responseHandler = new ExpectationMatching MessageHandler( ) ;
this . responseHandler = new Test MessageHandler( ) ;
this . responseChannel . subscribe ( this . responseHandler ) ;
this . eventPublisher = new ExpectationMatching EventPublisher( ) ;
this . eventPublisher = new Test EventPublisher( ) ;
startActiveMqBroker ( ) ;
createAndStartRelay ( ) ;
@ -104,9 +105,8 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
@@ -104,9 +105,8 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
this . relay . setSystemHeartbeatReceiveInterval ( 0 ) ;
this . relay . setSystemHeartbeatSendInterval ( 0 ) ;
this . eventPublisher . expectAvailabilityStatusChanges ( true ) ;
this . relay . start ( ) ;
this . eventPublisher . awaitAndAssert ( ) ;
this . eventPublisher . expectBrokerAvailabilityEvent ( true ) ;
}
@After
@ -141,32 +141,26 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
@@ -141,32 +141,26 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
String sess1 = "sess1" ;
String sess2 = "sess2" ;
String subs1 = "subs1" ;
String destination = "/topic/test" ;
MessageExchange conn1 = MessageExchangeBuilder . connect ( sess1 ) . build ( ) ;
MessageExchange conn2 = MessageExchangeBuilder . connect ( sess2 ) . build ( ) ;
this . responseHandler . expect ( conn1 , conn2 ) ;
this . relay . handleMessage ( conn1 . message ) ;
this . relay . handleMessage ( conn2 . message ) ;
this . responseHandler . awaitAndAssert ( ) ;
String subs1 = "subs1" ;
String destination = "/topic/test" ;
this . responseHandler . expectMessages ( conn1 , conn2 ) ;
MessageExchange subscribe = MessageExchangeBuilder . subscribeWithReceipt ( sess1 , subs1 , destination , "r1" ) . build ( ) ;
this . responseHandler . expect ( subscribe ) ;
this . relay . handleMessage ( subscribe . message ) ;
this . responseHandler . awaitAndAssert ( ) ;
this . responseHandler . expectMessages ( subscribe ) ;
MessageExchange send = MessageExchangeBuilder . send ( destination , "foo" ) . andExpectMessage ( sess1 , subs1 ) . build ( ) ;
this . responseHandler . expect ( send ) ;
this . relay . handleMessage ( send . message ) ;
this . responseHandler . awaitAndAssert ( ) ;
this . responseHandler . expectMessages ( send ) ;
}
@Test ( expected = MessageDeliveryException . class )
public void messageDeliverExceptionIfSystemSessionForwardFails ( ) throws Exception {
public void messageDelivery ExceptionIfSystemSessionForwardFails ( ) throws Exception {
stopActiveMqBrokerAndAwait ( ) ;
StompHeaderAccessor headers = StompHeaderAccessor . create ( StompCommand . SEND ) ;
this . relay . handleMessage ( MessageBuilder . createMessage ( "test" . getBytes ( ) , headers . getMessageHeaders ( ) ) ) ;
@ -177,23 +171,18 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
@@ -177,23 +171,18 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
String sess1 = "sess1" ;
MessageExchange connect = MessageExchangeBuilder . connect ( sess1 ) . build ( ) ;
this . responseHandler . expect ( connect ) ;
this . relay . handleMessage ( connect . message ) ;
this . responseHandler . awaitAndAssert ( ) ;
this . responseHandler . expect ( MessageExchangeBuilder . error ( sess1 ) . build ( ) ) ;
this . responseHandler . expectMessages ( connect ) ;
MessageExchange error = MessageExchangeBuilder . error ( sess1 ) . build ( ) ;
stopActiveMqBrokerAndAwait ( ) ;
this . responseHandler . awaitAndAssert ( ) ;
this . responseHandler . expectMessages ( error ) ;
}
@Test
public void brokerAvailabilityEventWhenStopped ( ) throws Exception {
this . eventPublisher . expectAvailabilityStatusChanges ( false ) ;
stopActiveMqBrokerAndAwait ( ) ;
this . eventPublisher . awaitAndAssert ( ) ;
this . eventPublisher . expectBrokerAvailabilityEvent ( false ) ;
}
@Test
@ -201,32 +190,23 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
@@ -201,32 +190,23 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
String sess1 = "sess1" ;
MessageExchange conn1 = MessageExchangeBuilder . connect ( sess1 ) . build ( ) ;
this . responseHandler . expect ( conn1 ) ;
this . relay . handleMessage ( conn1 . message ) ;
this . responseHandler . awaitAndAssert ( ) ;
this . responseHandler . expectMessages ( conn1 ) ;
String subs1 = "subs1" ;
String destination = "/topic/test" ;
MessageExchange subscribe =
MessageExchangeBuilder . subscribeWithReceipt ( sess1 , subs1 , destination , "r1" ) . build ( ) ;
this . responseHandler . expect ( subscribe ) ;
MessageExchange subscribe = MessageExchangeBuilder . subscribeWithReceipt ( sess1 , subs1 , destination , "r1" ) . build ( ) ;
this . relay . handleMessage ( subscribe . message ) ;
this . responseHandler . awaitAndAssert ( ) ;
this . responseHandler . expect ( MessageExchangeBuilder . error ( sess1 ) . build ( ) ) ;
this . responseHandler . expectMessages ( subscribe ) ;
MessageExchange error = MessageExchangeBuilder . error ( sess1 ) . build ( ) ;
stopActiveMqBrokerAndAwait ( ) ;
this . responseHandler . expectMessages ( error ) ;
this . responseHandler . awaitAndAssert ( ) ;
this . eventPublisher . expectBrokerAvailabilityEvent ( false ) ;
this . eventPublisher . expectAvailabilityStatusChanges ( false ) ;
this . eventPublisher . awaitAndAssert ( ) ;
this . eventPublisher . expectAvailabilityStatusChanges ( true ) ;
startActiveMqBroker ( ) ;
this . eventPublisher . awaitAndAssert ( ) ;
this . eventPublisher . expectBrokerAvailabilityEvent ( true ) ;
// TODO The event publisher assertions show that the broker's back up and the system relay session
// has reconnected. We need to decide what we want the reconnect behaviour to be for client relay
@ -238,10 +218,8 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
@@ -238,10 +218,8 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
public void disconnectClosesRelaySessionCleanly ( ) throws Exception {
MessageExchange connect = MessageExchangeBuilder . connect ( "sess1" ) . build ( ) ;
this . responseHandler . expect ( connect ) ;
this . relay . handleMessage ( connect . message ) ;
this . responseHandler . awaitAndAssert ( ) ;
this . responseHandler . expectMessages ( connect ) ;
StompHeaderAccessor headers = StompHeaderAccessor . create ( StompCommand . DISCONNECT ) ;
headers . setSessionId ( "sess1" ) ;
@ -250,79 +228,45 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
@@ -250,79 +228,45 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
Thread . sleep ( 2000 ) ;
// Check that we have not received an ERROR as a result of the connection closing
this . responseHandler . awaitAndAssert ( ) ;
assertTrue ( "Unexpected messages: " + this . responseHandler . queue , this . responseHandler . queue . isEmpty ( ) ) ;
}
/ * *
* Handles messages by matching them to expectations including a latch to wait for
* the completion of expected messages .
* /
private static class ExpectationMatchingMessageHandler implements MessageHandler {
private final Object monitor = new Object ( ) ;
private final List < MessageExchange > expected ;
private final List < MessageExchange > actual = new ArrayList < > ( ) ;
private final List < Message < ? > > unexpected = new ArrayList < > ( ) ;
private static class TestEventPublisher implements ApplicationEventPublisher {
private final BlockingQueue < BrokerAvailabilityEvent > eventQueue = new LinkedBlockingQueue < > ( ) ;
public ExpectationMatchingMessageHandler ( MessageExchange . . . expected ) {
synchronized ( this . monitor ) {
this . expected = new CopyOnWriteArrayList < > ( expected ) ;
@Override
public void publishEvent ( ApplicationEvent event ) {
logger . debug ( "Processing ApplicationEvent " + event ) ;
if ( event instanceof BrokerAvailabilityEvent ) {
this . eventQueue . add ( ( BrokerAvailabilityEvent ) event ) ;
}
}
public void expect ( MessageExchange . . . expected ) {
synchronized ( this . monitor ) {
this . expected . addAll ( Arrays . asList ( expected ) ) ;
}
public void expectBrokerAvailabilityEvent ( boolean isBrokerAvailable ) throws InterruptedException {
BrokerAvailabilityEvent event = this . eventQueue . poll ( 10000 , TimeUnit . MILLISECONDS ) ;
assertEquals ( isBrokerAvailable , event . isBrokerAvailable ( ) ) ;
}
}
public void awaitAndAssert ( ) throws InterruptedException {
long endTime = System . currentTimeMillis ( ) + 10000 ;
synchronized ( this . monitor ) {
while ( ! this . expected . isEmpty ( ) & & System . currentTimeMillis ( ) < endTime ) {
this . monitor . wait ( 500 ) ;
}
boolean result = this . expected . isEmpty ( ) ;
assertTrue ( getAsString ( ) , result & & this . unexpected . isEmpty ( ) ) ;
}
}
private static class TestMessageHandler implements MessageHandler {
private final BlockingQueue < Message < ? > > queue = new LinkedBlockingQueue < > ( ) ;
@Override
public void handleMessage ( Message < ? > message ) throws MessagingException {
if ( StompHeaderAccessor . wrap ( message ) . getMessageType ( ) ! = SimpMessageType . HEARTBEAT ) {
synchronized ( this . monitor ) {
for ( MessageExchange exch : this . expected ) {
if ( exch . matchMessage ( message ) ) {
if ( exch . isDone ( ) ) {
this . expected . remove ( exch ) ;
this . actual . add ( exch ) ;
if ( this . expected . isEmpty ( ) ) {
this . monitor . notifyAll ( ) ;
}
}
return ;
}
}
this . unexpected . add ( message ) ;
}
if ( SimpMessageType . HEARTBEAT = = SimpMessageHeaderAccessor . getMessageType ( message . getHeaders ( ) ) ) {
return ;
}
this . queue . add ( message ) ;
}
public String getAsString ( ) {
StringBuilder sb = new StringBuilder ( "\n" ) ;
synchronized ( this . monitor ) {
sb . append ( "UNMATCHED EXPECTATIONS:\n" ) . append ( this . expected ) . append ( "\n" ) ;
sb . append ( "MATCHED EXPECTATIONS:\n" ) . append ( this . actual ) . append ( "\n" ) ;
sb . append ( "UNEXPECTED MESSAGES:\n" ) . append ( this . unexpected ) . append ( "\n" ) ;
public void expectMessages ( MessageExchange . . . messageExchanges ) throws InterruptedException {
for ( MessageExchange exchange : messageExchanges ) {
Message < ? > message = this . queue . poll ( 10000 , TimeUnit . MILLISECONDS ) ;
assertTrue ( "Expected: " + exchange + " but got: " + message , exchange . matchMessage ( message ) ) ;
}
return sb . toString ( ) ;
}
}
@ -343,15 +287,6 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
@@ -343,15 +287,6 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
this . actual = new Message < ? > [ expected . length ] ;
}
public boolean isDone ( ) {
for ( int i = 0 ; i < actual . length ; i + + ) {
if ( actual [ i ] = = null ) {
return false ;
}
}
return true ;
}
public boolean matchMessage ( Message < ? > message ) {
for ( int i = 0 ; i < this . expected . length ; i + + ) {
if ( this . expected [ i ] . match ( message ) ) {
@ -364,11 +299,9 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
@@ -364,11 +299,9 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
@Override
public String toString ( ) {
StringBuilder sb = new StringBuilder ( ) ;
sb . append ( "Forwarded message:\n" ) . append ( this . message ) . append ( "\n" ) ;
sb . append ( "Should receive back:\n" ) . append ( Arrays . toString ( this . expected ) ) . append ( "\n" ) ;
sb . append ( "Actually received:\n" ) . append ( Arrays . toString ( this . actual ) ) . append ( "\n" ) ;
return sb . toString ( ) ;
return "Forwarded message:\n" + this . message + "\n" +
"Should receive back:\n" + Arrays . toString ( this . expected ) + "\n" +
"Actually received:\n" + Arrays . toString ( this . actual ) + "\n" ;
}
}
@ -565,43 +498,4 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
@@ -565,43 +498,4 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
}
private static class ExpectationMatchingEventPublisher implements ApplicationEventPublisher {
private final List < Boolean > expected = new ArrayList < > ( ) ;
private final List < Boolean > actual = new ArrayList < > ( ) ;
private final Object monitor = new Object ( ) ;
public void expectAvailabilityStatusChanges ( Boolean . . . expected ) {
synchronized ( this . monitor ) {
this . expected . addAll ( Arrays . asList ( expected ) ) ;
}
}
public void awaitAndAssert ( ) throws InterruptedException {
synchronized ( this . monitor ) {
long endTime = System . currentTimeMillis ( ) + 60000 ;
while ( ( this . expected . size ( ) ! = this . actual . size ( ) ) & & ( System . currentTimeMillis ( ) < endTime ) ) {
this . monitor . wait ( 500 ) ;
}
assertEquals ( this . expected , this . actual ) ;
}
}
@Override
public void publishEvent ( ApplicationEvent event ) {
logger . debug ( "Processing ApplicationEvent " + event ) ;
if ( event instanceof BrokerAvailabilityEvent ) {
synchronized ( this . monitor ) {
this . actual . add ( ( ( BrokerAvailabilityEvent ) event ) . isBrokerAvailable ( ) ) ;
if ( this . actual . size ( ) = = this . expected . size ( ) ) {
this . monitor . notifyAll ( ) ;
}
}
}
}
}
}