@ -21,8 +21,11 @@ import java.util.ArrayList;
@@ -21,8 +21,11 @@ import java.util.ArrayList;
import java.util.Arrays ;
import java.util.List ;
import java.util.Properties ;
import java.util.concurrent.atomic.AtomicBoolean ;
import io.netty.channel.nio.NioEventLoopGroup ;
import io.netty.util.concurrent.Future ;
import io.netty.util.concurrent.FutureListener ;
import org.reactivestreams.Publisher ;
import reactor.Environment ;
import reactor.core.config.ConfigurationReader ;
@ -79,6 +82,10 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
@@ -79,6 +82,10 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
private final List < TcpClient < Message < P > , Message < P > > > tcpClients =
new ArrayList < TcpClient < Message < P > , Message < P > > > ( ) ;
private final NioEventLoopGroup eventLoopGroup ;
private boolean stopping ;
/ * *
* A constructor that creates a { @link TcpClientSpec TcpClientSpec } factory
@ -95,7 +102,7 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
@@ -95,7 +102,7 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
* /
public Reactor2TcpClient ( final String host , final int port , final Codec < Buffer , Message < P > , Message < P > > codec ) {
final NioEventLoopGroup eventLoopGroup = initEventLoopGroup ( ) ;
this . eventLoopGroup = initEventLoopGroup ( ) ;
this . tcpClientSpecFactory = new TcpClientFactory < Message < P > , Message < P > > ( ) {
@ -110,7 +117,7 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
@@ -110,7 +117,7 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
} ;
}
private NioEventLoopGroup initEventLoopGroup ( ) {
private static NioEventLoopGroup initEventLoopGroup ( ) {
int ioThreadCount ;
try {
ioThreadCount = Integer . parseInt ( System . getProperty ( "reactor.tcp.ioThreadCount" ) ) ;
@ -139,13 +146,27 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
@@ -139,13 +146,27 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
public Reactor2TcpClient ( TcpClientFactory < Message < P > , Message < P > > tcpClientSpecFactory ) {
Assert . notNull ( tcpClientSpecFactory , "'tcpClientClientFactory' must not be null" ) ;
this . tcpClientSpecFactory = tcpClientSpecFactory ;
this . eventLoopGroup = null ;
}
@Override
public ListenableFuture < Void > connect ( final TcpConnectionHandler < P > connectionHandler ) {
Assert . notNull ( connectionHandler , "'connectionHandler' must not be null" ) ;
Promise < Void > promise = createTcpClient ( ) . start ( new MessageChannelStreamHandler < P > ( connectionHandler ) ) ;
TcpClient < Message < P > , Message < P > > tcpClient ;
synchronized ( this . tcpClients ) {
if ( this . stopping ) {
IllegalStateException ex = new IllegalStateException ( "Shutting down." ) ;
connectionHandler . afterConnectFailure ( ex ) ;
return new PassThroughPromiseToListenableFutureAdapter < Void > ( Promises . < Void > error ( ex ) ) ;
}
tcpClient = NetStreams . tcpClient ( REACTOR_TCP_CLIENT_TYPE , this . tcpClientSpecFactory ) ;
this . tcpClients . add ( tcpClient ) ;
}
Promise < Void > promise = tcpClient . start ( new MessageChannelStreamHandler < P > ( connectionHandler ) ) ;
return new PassThroughPromiseToListenableFutureAdapter < Void > (
promise . onError ( new Consumer < Throwable > ( ) {
@Override
@ -161,43 +182,62 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
@@ -161,43 +182,62 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
Assert . notNull ( connectionHandler , "'connectionHandler' must not be null" ) ;
Assert . notNull ( strategy , "'reconnectStrategy' must not be null" ) ;
Stream < Tuple2 < InetSocketAddress , Integer > > stream = createTcpClient ( ) . start (
TcpClient < Message < P > , Message < P > > tcpClient ;
synchronized ( this . tcpClients ) {
if ( this . stopping ) {
IllegalStateException ex = new IllegalStateException ( "Shutting down." ) ;
connectionHandler . afterConnectFailure ( ex ) ;
return new PassThroughPromiseToListenableFutureAdapter < Void > ( Promises . < Void > error ( ex ) ) ;
}
tcpClient = NetStreams . tcpClient ( REACTOR_TCP_CLIENT_TYPE , this . tcpClientSpecFactory ) ;
this . tcpClients . add ( tcpClient ) ;
}
Stream < Tuple2 < InetSocketAddress , Integer > > stream = tcpClient . start (
new MessageChannelStreamHandler < P > ( connectionHandler ) ,
new ReactorReconnectAdapter ( strategy ) ) ;
return new PassThroughPromiseToListenableFutureAdapter < Void > ( stream . next ( ) . after ( ) ) ;
}
private TcpClient < Message < P > , Message < P > > createTcpClient ( ) {
Class < NettyTcpClient > type = REACTOR_TCP_CLIENT_TYPE ;
TcpClient < Message < P > , Message < P > > tcpClient = NetStreams . tcpClient ( type , this . tcpClientSpecFactory ) ;
synchronized ( this . tcpClients ) {
this . tcpClients . add ( tcpClient ) ;
}
return tcpClient ;
}
@Override
public ListenableFuture < Void > shutdown ( ) {
final List < TcpClient < Message < P > , Message < P > > > readOnlyClients ;
synchronized ( this . tcpClients ) {
readOnlyClients = new ArrayList < TcpClient < Message < P > , Message < P > > > ( this . tcpClients ) ;
this . stopping = true ;
}
Promise < Void > promise = Streams . from ( readOnly Clients)
Promise < Void > promise = Streams . from ( this . tcpClients )
. flatMap ( new Function < TcpClient < Message < P > , Message < P > > , Promise < Void > > ( ) {
@Override
public Promise < Void > apply ( final TcpClient < Message < P > , Message < P > > client ) {
return client . shutdown ( ) . onComplete ( new Consumer < Promise < Void > > ( ) {
@Override
public void accept ( Promise < Void > voidPromise ) {
synchronized ( tcpClients ) {
tcpClients . remove ( client ) ;
}
tcpClients . remove ( client ) ;
}
} ) ;
}
} )
. next ( ) ;
if ( this . eventLoopGroup ! = null ) {
final Promise < Void > eventLoopPromise = Promises . prepare ( ) ;
promise . onComplete ( new Consumer < Promise < Void > > ( ) {
@Override
public void accept ( Promise < Void > voidPromise ) {
eventLoopGroup . shutdownGracefully ( ) . addListener ( new FutureListener < Object > ( ) {
@Override
public void operationComplete ( Future < Object > future ) throws Exception {
if ( future . isSuccess ( ) ) {
eventLoopPromise . onComplete ( ) ;
}
else {
eventLoopPromise . onError ( future . cause ( ) ) ;
}
}
} ) ;
}
} ) ;
promise = eventLoopPromise ;
}
return new PassThroughPromiseToListenableFutureAdapter < Void > ( promise ) ;
}