@ -29,14 +29,17 @@ import reactor.core.config.ConfigurationReader;
@@ -29,14 +29,17 @@ import reactor.core.config.ConfigurationReader;
import reactor.core.config.DispatcherConfiguration ;
import reactor.core.config.ReactorConfiguration ;
import reactor.core.support.NamedDaemonThreadFactory ;
import reactor.fn.BiFunction ;
import reactor.fn.Consumer ;
import reactor.fn.Function ;
import reactor.fn.tuple.Tuple ;
import reactor.fn.tuple.Tuple2 ;
import reactor.io.buffer.Buffer ;
import reactor.io.codec.Codec ;
import reactor.io.net.* ;
import reactor.io.net.ChannelStream ;
import reactor.io.net.NetStreams ;
import reactor.io.net.NetStreams.TcpClientFactory ;
import reactor.io.net.ReactorChannelHandler ;
import reactor.io.net.Reconnect ;
import reactor.io.net.Spec.TcpClientSpec ;
import reactor.io.net.impl.netty.NettyClientSocketOptions ;
import reactor.io.net.impl.netty.tcp.NettyTcpClient ;
@ -71,7 +74,7 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
@@ -71,7 +74,7 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
@SuppressWarnings ( "rawtypes" )
public static final Class < NettyTcpClient > REACTOR_TCP_CLIENT_TYPE = NettyTcpClient . class ;
private final NetStreams . TcpClientFactory < Message < P > , Message < P > > tcpClientSpecFactory ;
private final TcpClientFactory < Message < P > , Message < P > > tcpClientSpecFactory ;
private final List < TcpClient < Message < P > , Message < P > > > tcpClients =
new ArrayList < TcpClient < Message < P > , Message < P > > > ( ) ;
@ -94,7 +97,7 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
@@ -94,7 +97,7 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
final NioEventLoopGroup eventLoopGroup = initEventLoopGroup ( ) ;
this . tcpClientSpecFactory = new NetStreams . TcpClientFactory < Message < P > , Message < P > > ( ) {
this . tcpClientSpecFactory = new TcpClientFactory < Message < P > , Message < P > > ( ) {
@Override
public TcpClientSpec < Message < P > , Message < P > > apply ( TcpClientSpec < Message < P > , Message < P > > spec ) {
@ -111,7 +114,8 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
@@ -111,7 +114,8 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
int ioThreadCount ;
try {
ioThreadCount = Integer . parseInt ( System . getProperty ( "reactor.tcp.ioThreadCount" ) ) ;
} catch ( Exception i ) {
}
catch ( Exception i ) {
ioThreadCount = - 1 ;
}
if ( ioThreadCount < = 0l ) {
@ -132,20 +136,16 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
@@ -132,20 +136,16 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
*
* @param tcpClientSpecFactory the TcpClientSpec { @link Function } to use for each client creation .
* /
public Reactor2TcpClient ( NetStreams . TcpClientFactory < Message < P > , Message < P > > tcpClientSpecFactory ) {
public Reactor2TcpClient ( TcpClientFactory < Message < P > , Message < P > > tcpClientSpecFactory ) {
Assert . notNull ( tcpClientSpecFactory , "'tcpClientClientFactory' must not be null" ) ;
this . tcpClientSpecFactory = tcpClientSpecFactory ;
}
@Override
public ListenableFuture < Void > connect ( TcpConnectionHandler < P > connectionHandler ) {
Class < NettyTcpClient > type = REACTOR_TCP_CLIENT_TYPE ;
TcpClient < Message < P > , Message < P > > tcpClient = NetStreams . tcpClient ( type , this . tcpClientSpecFactory ) ;
Promise < Void > promise = tcpClient . start ( composeConnectionHandling ( tcpClient , connectionHandler ) ) ;
public ListenableFuture < Void > connect ( final TcpConnectionHandler < P > connectionHandler ) {
Assert . notNull ( connectionHandler , "'connectionHandler' must not be null" ) ;
Promise < Void > promise = createTcpClient ( ) . start ( new MessageChannelStreamHandler < P > ( connectionHandler ) ) ;
return new PassThroughPromiseToListenableFutureAdapter < Void > (
promise . onError ( new Consumer < Throwable > ( ) {
@Override
@ -157,72 +157,33 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
@@ -157,72 +157,33 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
}
@Override
public ListenableFuture < Void > connect ( TcpConnectionHandler < P > h andler, ReconnectStrategy strategy ) {
Assert . notNull ( strategy , "ReconnectStrategy must not be null") ;
Class < NettyTcpClient > type = REACTOR_TCP_CLIENT_TYPE ;
public ListenableFuture < Void > connect ( TcpConnectionHandler < P > connectionH andler, ReconnectStrategy strategy ) {
Assert . notNull ( connectionHandler , "'connectionHandler' must not be null") ;
Assert . notNull ( strategy , "'reconnectStrategy' must not be null" ) ;
TcpClient < Message < P > , Message < P > > tcpClient = NetStreams . tcpClient ( type , this . tcpClientSpecFactory ) ;
Stream < Tuple2 < InetSocketAddress , Integer > > stream = tcpClient . start (
composeConnectionHandling ( tcpClient , handler ) ,
new ReactorRectonnectAdapter ( strategy )
) ;
Stream < Tuple2 < InetSocketAddress , Integer > > stream = createTcpClient ( ) . start (
new MessageChannelStreamHandler < P > ( connectionHandler ) ,
new ReactorReconnectAdapter ( strategy ) ) ;
return new PassThroughPromiseToListenableFutureAdapter < Void > ( stream . next ( ) . after ( ) ) ;
}
private MessageHandler < P > composeConnectionHandling (
final TcpClient < Message < P > , Message < P > > tcpClient ,
final TcpConnectionHandler < P > connectionHandler
) {
private TcpClient < Message < P > , Message < P > > createTcpClient ( ) {
Class < NettyTcpClient > type = REACTOR_TCP_CLIENT_TYPE ;
TcpClient < Message < P > , Message < P > > tcpClient = NetStreams . tcpClient ( type , this . tcpClientSpecFactory ) ;
synchronized ( this . tcpClients ) {
this . tcpClients . add ( tcpClient ) ;
}
return new MessageHandler < P > ( ) {
@Override
public Publisher < Void > apply ( ChannelStream < Message < P > , Message < P > > connection ) {
Promise < Void > closePromise = Promises . prepare ( ) ;
connectionHandler . afterConnected ( new Reactor2TcpConnection < P > ( connection , closePromise ) ) ;
connection
. finallyDo ( new Consumer < Signal < Message < P > > > ( ) {
@Override
public void accept ( Signal < Message < P > > signal ) {
if ( signal . isOnError ( ) ) {
connectionHandler . handleFailure ( signal . getThrowable ( ) ) ;
} else if ( signal . isOnComplete ( ) ) {
connectionHandler . afterConnectionClosed ( ) ;
}
}
} )
. consume ( new Consumer < Message < P > > ( ) {
@Override
public void accept ( Message < P > message ) {
connectionHandler . handleMessage ( message ) ;
}
} ) ;
return closePromise ;
}
} ;
return tcpClient ;
}
@Override
public ListenableFuture < Void > shutdown ( ) {
final List < TcpClient < Message < P > , Message < P > > > clients ;
final List < TcpClient < Message < P > , Message < P > > > readOnlyClients ;
synchronized ( this . tcpClients ) {
c lients = new ArrayList < TcpClient < Message < P > , Message < P > > > ( this . tcpClients ) ;
readOnlyClients = new ArrayList < TcpClient < Message < P > , Message < P > > > ( this . tcpClients ) ;
}
Promise < Void > promise = Streams . from ( clients )
Promise < Void > promise = Streams . from ( readOnlyClients )
. flatMap ( new Function < TcpClient < Message < P > , Message < P > > , Promise < Void > > ( ) {
@Override
public Promise < Void > apply ( final TcpClient < Message < P > , Message < P > > client ) {
@ -237,10 +198,10 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
@@ -237,10 +198,10 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
}
} )
. next ( ) ;
return new PassThroughPromiseToListenableFutureAdapter < Void > ( promise ) ;
}
private static class SynchronousDispatcherConfigReader implements ConfigurationReader {
@Override
@ -249,11 +210,52 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
@@ -249,11 +210,52 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
}
}
private static class ReactorRectonnectAdapter implements Reconnect {
private static class MessageChannelStreamHandler < P >
implements ReactorChannelHandler < Message < P > , Message < P > , ChannelStream < Message < P > , Message < P > > > {
private final TcpConnectionHandler < P > connectionHandler ;
public MessageChannelStreamHandler ( TcpConnectionHandler < P > connectionHandler ) {
this . connectionHandler = connectionHandler ;
}
@Override
public Publisher < Void > apply ( ChannelStream < Message < P > , Message < P > > channelStream ) {
Promise < Void > closePromise = Promises . prepare ( ) ;
this . connectionHandler . afterConnected ( new Reactor2TcpConnection < P > ( channelStream , closePromise ) ) ;
channelStream
. finallyDo ( new Consumer < Signal < Message < P > > > ( ) {
@Override
public void accept ( Signal < Message < P > > signal ) {
if ( signal . isOnError ( ) ) {
connectionHandler . handleFailure ( signal . getThrowable ( ) ) ;
}
else if ( signal . isOnComplete ( ) ) {
connectionHandler . afterConnectionClosed ( ) ;
}
}
} )
. consume ( new Consumer < Message < P > > ( ) {
@Override
public void accept ( Message < P > message ) {
connectionHandler . handleMessage ( message ) ;
}
} ) ;
return closePromise ;
}
}
private static class ReactorReconnectAdapter implements Reconnect {
private final ReconnectStrategy strategy ;
public ReactorRectonnectAdapter ( ReconnectStrategy strategy ) {
public ReactorReconnectAdapter ( ReconnectStrategy strategy ) {
this . strategy = strategy ;
}
@ -261,11 +263,6 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
@@ -261,11 +263,6 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
public Tuple2 < InetSocketAddress , Long > reconnect ( InetSocketAddress address , int attempt ) {
return Tuple . of ( address , strategy . getTimeToNextAttempt ( attempt ) ) ;
}
}
private interface MessageHandler < P >
extends ReactorChannelHandler < Message < P > , Message < P > , ChannelStream < Message < P > , Message < P > > > {
}
}