From bd66c70b2b9b8f49ab5e5f24e8b28f73480dbe34 Mon Sep 17 00:00:00 2001 From: rstoyanchev Date: Wed, 10 May 2023 11:42:45 +0100 Subject: [PATCH] Potential fix for flaky STOMP integration tests When ReactorNetty2StompBrokerRelayIntegrationTests fail, typically there are multiple exceptions "Connection refused: /127.0.0.1:61613" that appear after we've conneted, sent CONNECT, and expecting CONNECTED, but that does not come within the 10 second timeout. 61613 is the default port for STOMP. However, in all integration tests we start ActiveMQ with port 0 which results in a random port. Moreover, the stacktrace is for Netty 4 (not 5), and the eventloop thread id's are different than the one where the connection to the correct, random port was established. The suspicion is that these are log messages from MessageBrokerConfigurationTests which focuses on testing configuration but nevertheless as a bean starts and attempts to connect to the default port and fails. Perhaps those attempts to connect on the default port somehow affect the ActiveMQ server, and it stops responding. This change adds a no-op TcpClient in MessageBrokerConfigurationTests to avoid unnecessary attempts to connect that are not needed. See gh-29287 --- .../MessageBrokerConfigurationTests.java | 28 ++++++++++++++++++- ...tractStompBrokerRelayIntegrationTests.java | 5 ++-- 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java index a3b430db8b3..1b9e5a90881 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import org.junit.jupiter.api.Test; @@ -69,6 +70,9 @@ import org.springframework.messaging.support.AbstractSubscribableChannel; import org.springframework.messaging.support.ChannelInterceptor; import org.springframework.messaging.support.ExecutorSubscribableChannel; import org.springframework.messaging.support.MessageBuilder; +import org.springframework.messaging.tcp.ReconnectStrategy; +import org.springframework.messaging.tcp.TcpConnectionHandler; +import org.springframework.messaging.tcp.TcpOperations; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Controller; import org.springframework.util.AntPathMatcher; @@ -622,7 +626,9 @@ public class MessageBrokerConfigurationTests { @Override public void configureMessageBroker(MessageBrokerRegistry registry) { - registry.enableStompBrokerRelay("/topic", "/queue").setAutoStartup(true) + registry.enableStompBrokerRelay("/topic", "/queue") + .setAutoStartup(true) + .setTcpClient(new NoOpTcpClient()) .setUserDestinationBroadcast("/topic/unresolved-user-destination") .setUserRegistryBroadcast("/topic/simp-user-registry"); } @@ -787,4 +793,24 @@ public class MessageBrokerConfigurationTests { private static class CustomThreadPoolTaskExecutor extends ThreadPoolTaskExecutor { } + + private static class NoOpTcpClient implements TcpOperations { + + @Override + public CompletableFuture connectAsync(TcpConnectionHandler handler) { + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture connectAsync(TcpConnectionHandler handler, ReconnectStrategy strategy) { + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture shutdownAsync() { + return CompletableFuture.completedFuture(null); + } + + } + } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/AbstractStompBrokerRelayIntegrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/AbstractStompBrokerRelayIntegrationTests.java index 992a22be772..ba7809a47c0 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/AbstractStompBrokerRelayIntegrationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/AbstractStompBrokerRelayIntegrationTests.java @@ -228,13 +228,14 @@ public abstract class AbstractStompBrokerRelayIntegrationTests { this.relay.handleMessage(subscribe.message); this.responseHandler.expectMessages(subscribe); - MessageExchange error = MessageExchangeBuilder.error(sess1).build(); stopActiveMqBrokerAndAwait(); - this.responseHandler.expectMessages(error); + MessageExchange error = MessageExchangeBuilder.error(sess1).build(); + this.responseHandler.expectMessages(error); this.eventPublisher.expectBrokerAvailabilityEvent(false); startActiveMQBroker(); + this.eventPublisher.expectBrokerAvailabilityEvent(true); }