From 5538863dc94364ef4fdbd10537ab0e2aba3f173d Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Sun, 3 May 2015 10:24:48 +0200 Subject: [PATCH] Use shared eventLoopGroup in Reactor2TcpClient --- ...tractPromiseToListenableFutureAdapter.java | 2 +- .../tcp/reactor/Reactor2TcpClient.java | 78 ++++++++++++++----- 2 files changed, 60 insertions(+), 20 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractPromiseToListenableFutureAdapter.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractPromiseToListenableFutureAdapter.java index 935128ae763..e635e310ab0 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractPromiseToListenableFutureAdapter.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractPromiseToListenableFutureAdapter.java @@ -80,7 +80,7 @@ abstract class AbstractPromiseToListenableFutureAdapter implements Listena @Override public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { S result = this.promise.await(timeout, unit); - if (result == null) { + if (!this.promise.isComplete()) { throw new TimeoutException(); } return adapt(result); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java index f454243ba2b..c9601e44795 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java @@ -21,8 +21,11 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; import org.reactivestreams.Publisher; import reactor.Environment; import reactor.core.config.ConfigurationReader; @@ -79,6 +82,10 @@ public class Reactor2TcpClient

implements TcpOperations

{ private final List, Message

>> tcpClients = new ArrayList, Message

>>(); + private final NioEventLoopGroup eventLoopGroup; + + private boolean stopping; + /** * A constructor that creates a {@link TcpClientSpec TcpClientSpec} factory @@ -95,7 +102,7 @@ public class Reactor2TcpClient

implements TcpOperations

{ */ public Reactor2TcpClient(final String host, final int port, final Codec, Message

> codec) { - final NioEventLoopGroup eventLoopGroup = initEventLoopGroup(); + this.eventLoopGroup = initEventLoopGroup(); this.tcpClientSpecFactory = new TcpClientFactory, Message

>() { @@ -110,7 +117,7 @@ public class Reactor2TcpClient

implements TcpOperations

{ }; } - private NioEventLoopGroup initEventLoopGroup() { + private static NioEventLoopGroup initEventLoopGroup() { int ioThreadCount; try { ioThreadCount = Integer.parseInt(System.getProperty("reactor.tcp.ioThreadCount")); @@ -139,13 +146,27 @@ public class Reactor2TcpClient

implements TcpOperations

{ public Reactor2TcpClient(TcpClientFactory, Message

> tcpClientSpecFactory) { Assert.notNull(tcpClientSpecFactory, "'tcpClientClientFactory' must not be null"); this.tcpClientSpecFactory = tcpClientSpecFactory; + this.eventLoopGroup = null; } @Override public ListenableFuture connect(final TcpConnectionHandler

connectionHandler) { Assert.notNull(connectionHandler, "'connectionHandler' must not be null"); - Promise promise = createTcpClient().start(new MessageChannelStreamHandler

(connectionHandler)); + + TcpClient, Message

> tcpClient; + synchronized (this.tcpClients) { + if (this.stopping) { + IllegalStateException ex = new IllegalStateException("Shutting down."); + connectionHandler.afterConnectFailure(ex); + return new PassThroughPromiseToListenableFutureAdapter(Promises.error(ex)); + } + tcpClient = NetStreams.tcpClient(REACTOR_TCP_CLIENT_TYPE, this.tcpClientSpecFactory); + this.tcpClients.add(tcpClient); + } + + Promise promise = tcpClient.start(new MessageChannelStreamHandler

(connectionHandler)); + return new PassThroughPromiseToListenableFutureAdapter( promise.onError(new Consumer() { @Override @@ -161,43 +182,62 @@ public class Reactor2TcpClient

implements TcpOperations

{ Assert.notNull(connectionHandler, "'connectionHandler' must not be null"); Assert.notNull(strategy, "'reconnectStrategy' must not be null"); - Stream> stream = createTcpClient().start( + TcpClient, Message

> tcpClient; + synchronized (this.tcpClients) { + if (this.stopping) { + IllegalStateException ex = new IllegalStateException("Shutting down."); + connectionHandler.afterConnectFailure(ex); + return new PassThroughPromiseToListenableFutureAdapter(Promises.error(ex)); + } + tcpClient = NetStreams.tcpClient(REACTOR_TCP_CLIENT_TYPE, this.tcpClientSpecFactory); + this.tcpClients.add(tcpClient); + } + + Stream> stream = tcpClient.start( new MessageChannelStreamHandler

(connectionHandler), new ReactorReconnectAdapter(strategy)); return new PassThroughPromiseToListenableFutureAdapter(stream.next().after()); } - private TcpClient, Message

> createTcpClient() { - Class type = REACTOR_TCP_CLIENT_TYPE; - TcpClient, Message

> tcpClient = NetStreams.tcpClient(type, this.tcpClientSpecFactory); - synchronized (this.tcpClients) { - this.tcpClients.add(tcpClient); - } - return tcpClient; - } - @Override public ListenableFuture shutdown() { - final List, Message

>> readOnlyClients; synchronized (this.tcpClients) { - readOnlyClients = new ArrayList, Message

>>(this.tcpClients); + this.stopping = true; } - Promise promise = Streams.from(readOnlyClients) + Promise promise = Streams.from(this.tcpClients) .flatMap(new Function, Message

>, Promise>() { @Override public Promise apply(final TcpClient, Message

> client) { return client.shutdown().onComplete(new Consumer>() { @Override public void accept(Promise voidPromise) { - synchronized (tcpClients) { - tcpClients.remove(client); - } + tcpClients.remove(client); } }); } }) .next(); + if (this.eventLoopGroup != null) { + final Promise eventLoopPromise = Promises.prepare(); + promise.onComplete(new Consumer>() { + @Override + public void accept(Promise voidPromise) { + eventLoopGroup.shutdownGracefully().addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (future.isSuccess()) { + eventLoopPromise.onComplete(); + } + else { + eventLoopPromise.onError(future.cause()); + } + } + }); + } + }); + promise = eventLoopPromise; + } return new PassThroughPromiseToListenableFutureAdapter(promise); }