diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/ReactorNettyTcpStompClient.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/ReactorNettyTcpStompClient.java index 65e7a9c509b..a4e8048db96 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/ReactorNettyTcpStompClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/ReactorNettyTcpStompClient.java @@ -89,4 +89,8 @@ public class ReactorNettyTcpStompClient extends StompClientSupport { this.tcpClient.shutdown(); } + @Override + public String toString() { + return "ReactorNettyTcpStompClient[" + this.tcpClient + "]"; + } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java index 7a3b28c231f..bf62680ab97 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java @@ -82,12 +82,12 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler public static final String SYSTEM_SESSION_ID = "_system_"; - // STOMP recommends error of margin for receiving heartbeats + /** STOMP recommended error of margin for receiving heartbeats */ private static final long HEARTBEAT_MULTIPLIER = 3; /** - * A heartbeat is setup once a CONNECTED frame is received which contains the heartbeat settings - * we need. If we don't receive CONNECTED within a minute, the connection is closed proactively. + * Heartbeat starts once CONNECTED frame with heartbeat settings is received. + * If CONNECTED doesn't arrive within a minute, we'll close the connection. */ private static final int MAX_TIME_TO_CONNECTED_FRAME = 60 * 1000; @@ -403,7 +403,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } if (logger.isInfoEnabled()) { - logger.info("Connecting \"system\" session to " + this.relayHost + ":" + this.relayPort); + logger.info("Starting \"system\" session, " + toString()); } StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.CONNECT); @@ -552,7 +552,11 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @Override public String toString() { - return "StompBrokerRelay[" + this.relayHost + ":" + this.relayPort + "]"; + return "StompBrokerRelay[" + getTcpClientInfo() + "]"; + } + + private String getTcpClientInfo() { + return this.tcpClient != null ? this.tcpClient.toString() : this.relayHost + ":" + this.relayPort; } @@ -987,7 +991,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler private static class VoidCallable implements Callable { @Override - public Void call() throws Exception { + public Void call() { return null; } } @@ -1014,7 +1018,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } public String toString() { - return (connectionHandlers.size() + " sessions, " + relayHost + ":" + relayPort + + return (connectionHandlers.size() + " sessions, " + getTcpClientInfo() + (isBrokerAvailable() ? " (available)" : " (not available)") + ", processed CONNECT(" + this.connect.get() + ")-CONNECTED(" + this.connected.get() + ")-DISCONNECT(" + this.disconnect.get() + ")"); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java index 99dccd316d9..83cce81f0d8 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,6 +30,8 @@ import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.util.concurrent.ImmediateEventExecutor; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.reactivestreams.Publisher; import reactor.core.publisher.DirectProcessor; import reactor.core.publisher.Flux; @@ -65,6 +67,8 @@ import org.springframework.util.concurrent.SettableListenableFuture; */ public class ReactorNettyTcpClient

implements TcpOperations

{ + private static Log logger = LogFactory.getLog(ReactorNettyTcpClient.class); + private static final int PUBLISH_ON_BUFFER_SIZE = 16; @@ -201,7 +205,7 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ .doOnNext(updateConnectMono(connectMono)) .doOnError(updateConnectMono(connectMono)) .doOnError(handler::afterConnectFailure) // report all connect failures to the handler - .flatMap(NettyContext::onClose) // post-connect issues + .flatMap(NettyContext::onClose) // post-connect issues .retryWhen(reconnectFunction(strategy)) .repeatWhen(reconnectFunction(strategy)) .subscribe(); @@ -281,6 +285,11 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ }); } + @Override + public String toString() { + return "ReactorNettyTcpClient[" + this.tcpClient + "]"; + } + private class ReactorNettyHandler implements BiFunction> { @@ -293,6 +302,9 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ @Override @SuppressWarnings("unchecked") public Publisher apply(NettyInbound inbound, NettyOutbound outbound) { + if (logger.isDebugEnabled()) { + logger.debug("Connected to " + inbound.remoteAddress()); + } DirectProcessor completion = DirectProcessor.create(); TcpConnection

connection = new ReactorNettyTcpConnection<>(inbound, outbound, codec, completion); scheduler.schedule(() -> connectionHandler.afterConnected(connection)); @@ -321,7 +333,7 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ } @Override - protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) { Collection> messages = codec.decode(in); out.addAll(messages); } diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java index 47b70d7f5a2..fbdcea74fb9 100644 --- a/spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java +++ b/spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java @@ -278,9 +278,9 @@ public class MessageBrokerBeanDefinitionParserTests { assertEquals(5000, messageBroker.getSystemHeartbeatSendInterval()); assertThat(messageBroker.getDestinationPrefixes(), Matchers.containsInAnyOrder("/topic","/queue")); - List> subscriberTypes = - Arrays.>asList(SimpAnnotationMethodMessageHandler.class, - UserDestinationMessageHandler.class, StompBrokerRelayMessageHandler.class); + List> subscriberTypes = Arrays.asList( + SimpAnnotationMethodMessageHandler.class, UserDestinationMessageHandler.class, + StompBrokerRelayMessageHandler.class); testChannel("clientInboundChannel", subscriberTypes, 2); testExecutor("clientInboundChannel", Runtime.getRuntime().availableProcessors() * 2, Integer.MAX_VALUE, 60); @@ -288,8 +288,7 @@ public class MessageBrokerBeanDefinitionParserTests { testChannel("clientOutboundChannel", subscriberTypes, 1); testExecutor("clientOutboundChannel", Runtime.getRuntime().availableProcessors() * 2, Integer.MAX_VALUE, 60); - subscriberTypes = Arrays.>asList( - StompBrokerRelayMessageHandler.class, UserDestinationMessageHandler.class); + subscriberTypes = Arrays.asList(StompBrokerRelayMessageHandler.class, UserDestinationMessageHandler.class); testChannel("brokerChannel", subscriberTypes, 1); try { this.appContext.getBean("brokerChannelExecutor", ThreadPoolTaskExecutor.class);