|
|
|
@ -1,5 +1,5 @@ |
|
|
|
/* |
|
|
|
/* |
|
|
|
* Copyright 2002-2020 the original author or authors. |
|
|
|
* Copyright 2002-2021 the original author or authors. |
|
|
|
* |
|
|
|
* |
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
|
|
* you may not use this file except in compliance with the License. |
|
|
|
* you may not use this file except in compliance with the License. |
|
|
|
@ -75,7 +75,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@BeforeEach |
|
|
|
@BeforeEach |
|
|
|
public void setUp(TestInfo testInfo) throws Exception { |
|
|
|
public void setup(TestInfo testInfo) throws Exception { |
|
|
|
logger.debug("Setting up before '" + testInfo.getTestMethod().get().getName() + "'"); |
|
|
|
logger.debug("Setting up before '" + testInfo.getTestMethod().get().getName() + "'"); |
|
|
|
|
|
|
|
|
|
|
|
this.port = SocketUtils.findAvailableTcpPort(61613); |
|
|
|
this.port = SocketUtils.findAvailableTcpPort(61613); |
|
|
|
@ -83,11 +83,11 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { |
|
|
|
this.responseHandler = new TestMessageHandler(); |
|
|
|
this.responseHandler = new TestMessageHandler(); |
|
|
|
this.responseChannel.subscribe(this.responseHandler); |
|
|
|
this.responseChannel.subscribe(this.responseHandler); |
|
|
|
this.eventPublisher = new TestEventPublisher(); |
|
|
|
this.eventPublisher = new TestEventPublisher(); |
|
|
|
startActiveMqBroker(); |
|
|
|
startActiveMQBroker(); |
|
|
|
createAndStartRelay(); |
|
|
|
createAndStartRelay(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
private void startActiveMqBroker() throws Exception { |
|
|
|
private void startActiveMQBroker() throws Exception { |
|
|
|
this.activeMQBroker = new BrokerService(); |
|
|
|
this.activeMQBroker = new BrokerService(); |
|
|
|
this.activeMQBroker.addConnector("stomp://localhost:" + this.port); |
|
|
|
this.activeMQBroker.addConnector("stomp://localhost:" + this.port); |
|
|
|
this.activeMQBroker.setStartAsync(false); |
|
|
|
this.activeMQBroker.setStartAsync(false); |
|
|
|
@ -217,7 +217,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { |
|
|
|
|
|
|
|
|
|
|
|
this.eventPublisher.expectBrokerAvailabilityEvent(false); |
|
|
|
this.eventPublisher.expectBrokerAvailabilityEvent(false); |
|
|
|
|
|
|
|
|
|
|
|
startActiveMqBroker(); |
|
|
|
startActiveMQBroker(); |
|
|
|
this.eventPublisher.expectBrokerAvailabilityEvent(true); |
|
|
|
this.eventPublisher.expectBrokerAvailabilityEvent(true); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@ -274,8 +274,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
public void expectMessages(MessageExchange... messageExchanges) throws InterruptedException { |
|
|
|
public void expectMessages(MessageExchange... messageExchanges) throws InterruptedException { |
|
|
|
List<MessageExchange> expectedMessages = |
|
|
|
List<MessageExchange> expectedMessages = new ArrayList<>(Arrays.asList(messageExchanges)); |
|
|
|
new ArrayList<>(Arrays.<MessageExchange>asList(messageExchanges)); |
|
|
|
|
|
|
|
while (expectedMessages.size() > 0) { |
|
|
|
while (expectedMessages.size() > 0) { |
|
|
|
Message<?> message = this.queue.poll(10000, TimeUnit.MILLISECONDS); |
|
|
|
Message<?> message = this.queue.poll(10000, TimeUnit.MILLISECONDS); |
|
|
|
assertThat(message).as("Timed out waiting for messages, expected [" + expectedMessages + "]").isNotNull(); |
|
|
|
assertThat(message).as("Timed out waiting for messages, expected [" + expectedMessages + "]").isNotNull(); |
|
|
|
@ -451,7 +450,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { |
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public final boolean match(Message<?> message) { |
|
|
|
public final boolean match(Message<?> message) { |
|
|
|
StompHeaderAccessor headers = StompHeaderAccessor.wrap(message); |
|
|
|
StompHeaderAccessor headers = StompHeaderAccessor.wrap(message); |
|
|
|
if (!this.command.equals(headers.getCommand()) || (this.sessionId != headers.getSessionId())) { |
|
|
|
if (!this.command.equals(headers.getCommand()) || !this.sessionId.equals(headers.getSessionId())) { |
|
|
|
return false; |
|
|
|
return false; |
|
|
|
} |
|
|
|
} |
|
|
|
return matchInternal(headers, message.getPayload()); |
|
|
|
return matchInternal(headers, message.getPayload()); |
|
|
|
|