From d48eeb2c84e1be78dc1d09f02ab7d0747506df86 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Thu, 28 Apr 2016 15:26:35 -0400 Subject: [PATCH] Reactor2TcpClient cleans up TcpClient instances Issue: SPR-14231 --- .../tcp/reactor/Reactor2TcpClient.java | 34 ++++++++++++++++--- 1 file changed, 29 insertions(+), 5 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java index 15ca95e9741..65e0460d5eb 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java @@ -163,7 +163,8 @@ public class Reactor2TcpClient

implements TcpOperations

{ public ListenableFuture connect(final TcpConnectionHandler

connectionHandler) { Assert.notNull(connectionHandler, "TcpConnectionHandler must not be null"); - TcpClient, Message

> tcpClient; + final TcpClient, Message

> tcpClient; + Runnable cleanupTask; synchronized (this.tcpClients) { if (this.stopping) { IllegalStateException ex = new IllegalStateException("Shutting down."); @@ -172,9 +173,18 @@ public class Reactor2TcpClient

implements TcpOperations

{ } tcpClient = NetStreams.tcpClient(REACTOR_TCP_CLIENT_TYPE, this.tcpClientSpecFactory); this.tcpClients.add(tcpClient); + cleanupTask = new Runnable() { + @Override + public void run() { + synchronized (tcpClients) { + tcpClients.remove(tcpClient); + } + } + }; } - Promise promise = tcpClient.start(new MessageChannelStreamHandler

(connectionHandler)); + Promise promise = tcpClient.start( + new MessageChannelStreamHandler

(connectionHandler, cleanupTask)); return new PassThroughPromiseToListenableFutureAdapter( promise.onError(new Consumer() { @@ -191,7 +201,8 @@ public class Reactor2TcpClient

implements TcpOperations

{ Assert.notNull(connectionHandler, "TcpConnectionHandler must not be null"); Assert.notNull(strategy, "ReconnectStrategy must not be null"); - TcpClient, Message

> tcpClient; + final TcpClient, Message

> tcpClient; + Runnable cleanupTask; synchronized (this.tcpClients) { if (this.stopping) { IllegalStateException ex = new IllegalStateException("Shutting down."); @@ -200,10 +211,18 @@ public class Reactor2TcpClient

implements TcpOperations

{ } tcpClient = NetStreams.tcpClient(REACTOR_TCP_CLIENT_TYPE, this.tcpClientSpecFactory); this.tcpClients.add(tcpClient); + cleanupTask = new Runnable() { + @Override + public void run() { + synchronized (tcpClients) { + tcpClients.remove(tcpClient); + } + } + }; } Stream> stream = tcpClient.start( - new MessageChannelStreamHandler

(connectionHandler), + new MessageChannelStreamHandler

(connectionHandler, cleanupTask), new ReactorReconnectAdapter(strategy)); return new PassThroughPromiseToListenableFutureAdapter(stream.next().after()); @@ -249,6 +268,7 @@ public class Reactor2TcpClient

implements TcpOperations

{ }); promise = eventLoopPromise; } + return new PassThroughPromiseToListenableFutureAdapter(promise); } @@ -278,8 +298,11 @@ public class Reactor2TcpClient

implements TcpOperations

{ private final TcpConnectionHandler

connectionHandler; - public MessageChannelStreamHandler(TcpConnectionHandler

connectionHandler) { + private final Runnable cleanupTask; + + public MessageChannelStreamHandler(TcpConnectionHandler

connectionHandler, Runnable cleanupTask) { this.connectionHandler = connectionHandler; + this.cleanupTask = cleanupTask; } @Override @@ -290,6 +313,7 @@ public class Reactor2TcpClient

implements TcpOperations

{ .finallyDo(new Consumer>>() { @Override public void accept(Signal> signal) { + cleanupTask.run(); if (signal.isOnError()) { connectionHandler.handleFailure(signal.getThrowable()); }