@ -282,11 +282,28 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
@@ -282,11 +282,28 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
}
public void expectMessages ( MessageExchange . . . messageExchanges ) throws InterruptedException {
for ( MessageExchange exchange : messageExchanges ) {
List < MessageExchange > expectedMessages =
new ArrayList < MessageExchange > ( Arrays . < MessageExchange > asList ( messageExchanges ) ) ;
while ( expectedMessages . size ( ) > 0 ) {
Message < ? > message = this . queue . poll ( 10000 , TimeUnit . MILLISECONDS ) ;
assertNotNull ( "Timed out waiting for: " + exchange , message ) ;
assertTrue ( "Expected: " + exchange + " but got: " + message , exchange . matchMessage ( message ) ) ;
assertNotNull ( "Timed out waiting for messages, expected [" + expectedMessages + "]" , message ) ;
MessageExchange match = findMatch ( expectedMessages , message ) ;
assertNotNull ( "Unexpected message=" + message + ", expected [" + expectedMessages + "]" , match ) ;
expectedMessages . remove ( match ) ;
}
}
private MessageExchange findMatch ( List < MessageExchange > expectedMessages , Message < ? > message ) {
for ( MessageExchange exchange : expectedMessages ) {
if ( exchange . matchMessage ( message ) ) {
return exchange ;
}
}
return null ;
}
}