diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java index 15ca95e9741..65e0460d5eb 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java @@ -163,7 +163,8 @@ public class Reactor2TcpClient
implements TcpOperations
{
public ListenableFuture connectionHandler) {
Assert.notNull(connectionHandler, "TcpConnectionHandler must not be null");
- TcpClient > tcpClient;
+ final TcpClient > tcpClient;
+ Runnable cleanupTask;
synchronized (this.tcpClients) {
if (this.stopping) {
IllegalStateException ex = new IllegalStateException("Shutting down.");
@@ -172,9 +173,18 @@ public class Reactor2TcpClient implements TcpOperations {
}
tcpClient = NetStreams.tcpClient(REACTOR_TCP_CLIENT_TYPE, this.tcpClientSpecFactory);
this.tcpClients.add(tcpClient);
+ cleanupTask = new Runnable() {
+ @Override
+ public void run() {
+ synchronized (tcpClients) {
+ tcpClients.remove(tcpClient);
+ }
+ }
+ };
}
- Promise (connectionHandler));
+ Promise (connectionHandler, cleanupTask));
return new PassThroughPromiseToListenableFutureAdapter implements TcpOperations {
Assert.notNull(connectionHandler, "TcpConnectionHandler must not be null");
Assert.notNull(strategy, "ReconnectStrategy must not be null");
- TcpClient > tcpClient;
+ final TcpClient > tcpClient;
+ Runnable cleanupTask;
synchronized (this.tcpClients) {
if (this.stopping) {
IllegalStateException ex = new IllegalStateException("Shutting down.");
@@ -200,10 +211,18 @@ public class Reactor2TcpClient implements TcpOperations {
}
tcpClient = NetStreams.tcpClient(REACTOR_TCP_CLIENT_TYPE, this.tcpClientSpecFactory);
this.tcpClients.add(tcpClient);
+ cleanupTask = new Runnable() {
+ @Override
+ public void run() {
+ synchronized (tcpClients) {
+ tcpClients.remove(tcpClient);
+ }
+ }
+ };
}
Stream (connectionHandler),
+ new MessageChannelStreamHandler (connectionHandler, cleanupTask),
new ReactorReconnectAdapter(strategy));
return new PassThroughPromiseToListenableFutureAdapter implements TcpOperations {
});
promise = eventLoopPromise;
}
+
return new PassThroughPromiseToListenableFutureAdapter implements TcpOperations {
private final TcpConnectionHandler connectionHandler;
- public MessageChannelStreamHandler(TcpConnectionHandler connectionHandler) {
+ private final Runnable cleanupTask;
+
+ public MessageChannelStreamHandler(TcpConnectionHandler connectionHandler, Runnable cleanupTask) {
this.connectionHandler = connectionHandler;
+ this.cleanupTask = cleanupTask;
}
@Override
@@ -290,6 +313,7 @@ public class Reactor2TcpClient implements TcpOperations {
.finallyDo(new Consumer