@ -23,6 +23,7 @@ import java.util.List;
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Properties ;
import io.netty.channel.nio.NioEventLoopGroup ;
import org.reactivestreams.Publisher ;
import reactor.Environment ;
import reactor.core.config.ConfigurationReader ;
import reactor.core.config.DispatcherConfiguration ;
@ -35,14 +36,13 @@ import reactor.fn.tuple.Tuple;
@@ -35,14 +36,13 @@ import reactor.fn.tuple.Tuple;
import reactor.fn.tuple.Tuple2 ;
import reactor.io.buffer.Buffer ;
import reactor.io.codec.Codec ;
import reactor.io.net.ChannelStream ;
import reactor.io.net.NetStreams ;
import reactor.io.net.Reconnect ;
import reactor.io.net.* ;
import reactor.io.net.Spec.TcpClientSpec ;
import reactor.io.net.impl.netty.NettyClientSocketOptions ;
import reactor.io.net.impl.netty.tcp.NettyTcpClient ;
import reactor.io.net.tcp.TcpClient ;
import reactor.rx.Promise ;
import reactor.rx.Promises ;
import reactor.rx.Stream ;
import reactor.rx.Streams ;
import reactor.rx.action.Signal ;
@ -58,7 +58,7 @@ import org.springframework.util.concurrent.ListenableFuture;
@@ -58,7 +58,7 @@ import org.springframework.util.concurrent.ListenableFuture;
/ * *
* An implementation of { @link org . springframework . messaging . tcp . TcpOperations }
* based on the TCP client support of the Reactor project .
*
* < p >
* < p > This implementation wraps N ( Reactor ) clients for N { @link # connect } calls ,
* i . e . a separate ( Reactor ) client instance for each connection .
*
@ -71,7 +71,7 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
@@ -71,7 +71,7 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
@SuppressWarnings ( "rawtypes" )
public static final Class < NettyTcpClient > REACTOR_TCP_CLIENT_TYPE = NettyTcpClient . class ;
private final TcpClientSpec Factory < Message < P > , Message < P > > tcpClientSpecFactory ;
private final NetStreams . TcpClientFactory < Message < P > , Message < P > > tcpClientSpecFactory ;
private final List < TcpClient < Message < P > , Message < P > > > tcpClients =
new ArrayList < TcpClient < Message < P > , Message < P > > > ( ) ;
@ -85,6 +85,7 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
@@ -85,6 +85,7 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
* threads will be shared amongst the active clients .
* < p > Also see the constructor accepting a ready Reactor
* { @link TcpClientSpec } { @link Function } factory .
*
* @param host the host to connect to
* @param port the port to connect to
* @param codec the codec to use for encoding and decoding the TCP stream
@ -93,7 +94,7 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
@@ -93,7 +94,7 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
final NioEventLoopGroup eventLoopGroup = initEventLoopGroup ( ) ;
this . tcpClientSpecFactory = new TcpClientSpec Factory < Message < P > , Message < P > > ( ) {
this . tcpClientSpecFactory = new NetStreams . TcpClientFactory < Message < P > , Message < P > > ( ) {
@Override
public TcpClientSpec < Message < P > , Message < P > > apply ( TcpClientSpec < Message < P > , Message < P > > spec ) {
@ -110,8 +111,7 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
@@ -110,8 +111,7 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
int ioThreadCount ;
try {
ioThreadCount = Integer . parseInt ( System . getProperty ( "reactor.tcp.ioThreadCount" ) ) ;
}
catch ( Exception i ) {
} catch ( Exception i ) {
ioThreadCount = - 1 ;
}
if ( ioThreadCount < = 0l ) {
@ -132,7 +132,7 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
@@ -132,7 +132,7 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
*
* @param tcpClientSpecFactory the TcpClientSpec { @link Function } to use for each client creation .
* /
public Reactor2TcpClient ( TcpClientSpec Factory < Message < P > , Message < P > > tcpClientSpecFactory ) {
public Reactor2TcpClient ( NetStreams . TcpClientFactory < Message < P > , Message < P > > tcpClientSpecFactory ) {
Assert . notNull ( tcpClientSpecFactory , "'tcpClientClientFactory' must not be null" ) ;
this . tcpClientSpecFactory = tcpClientSpecFactory ;
}
@ -141,73 +141,80 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
@@ -141,73 +141,80 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
@Override
public ListenableFuture < Void > connect ( TcpConnectionHandler < P > connectionHandler ) {
Class < NettyTcpClient > type = REACTOR_TCP_CLIENT_TYPE ;
TcpClient < Message < P > , Message < P > > tcpClient = NetStreams . tcpClient ( type , this . tcpClientSpecFactory ) ;
composeConnectionHandling ( tcpClient , connectionHandler ) ;
Promise < Boolean > promise = tcpClient . open ( ) ;
return new BooleanToVoidAdapter ( promise ) ;
Promise < Void > promise = tcpClient . start ( composeConnectionHandling ( tcpClient , connectionHandler ) ) ;
return new PassThroughPromiseToListenableFutureAdapter < Void > (
promise . onError ( new Consumer < Throwable > ( ) {
@Override
public void accept ( Throwable throwable ) {
connectionHandler . afterConnectFailure ( throwable ) ;
}
} )
) ;
}
@Override
public ListenableFuture < Void > connect ( TcpConnectionHandler < P > handler , ReconnectStrategy strategy ) {
Assert . notNull ( strategy , "ReconnectStrategy must not be null" ) ;
Class < NettyTcpClient > type = REACTOR_TCP_CLIENT_TYPE ;
TcpClient < Message < P > , Message < P > > tcpClient = NetStreams . tcpClient ( type , this . tcpClientSpecFactory ) ;
composeConnectionHandling ( tcpClient , handler ) ;
Stream < Boolean > stream = tcpClient . open ( new ReactorRectonnectAdapter ( strategy ) ) ;
return new BooleanToVoidAdapter ( stream . next ( ) ) ;
Stream < Tuple2 < InetSocketAddress , Integer > > stream = tcpClient . start (
composeConnectionHandling ( tcpClient , handler ) ,
new ReactorRectonnectAdapter ( strategy )
) ;
return new PassThroughPromiseToListenableFutureAdapter < Void > ( stream . next ( ) . after ( ) ) ;
}
private void composeConnectionHandling ( final TcpClient < Message < P > , Message < P > > tcpClient ,
final TcpConnectionHandler < P > connectionHandler ) {
private MessageHandler < P > composeConnectionHandling (
final TcpClient < Message < P > , Message < P > > tcpClient ,
final TcpConnectionHandler < P > connectionHandler
) {
synchronized ( this . tcpClients ) {
synchronized ( this . tcpClients ) {
this . tcpClients . add ( tcpClient ) ;
}
tcpClient
. finallyDo ( new Consumer < Signal < ChannelStream < Message < P > , Message < P > > > > ( ) {
return new MessageHandler < P > ( ) {
@Override
public Publisher < Void > apply ( ChannelStream < Message < P > , Message < P > > connection ) {
@Override
public void accept ( Signal < ChannelStream < Message < P > , Message < P > > > signal ) {
synchronized ( tcpClients ) {
tcpClients . remove ( tcpClient ) ;
}
if ( signal . isOnError ( ) ) {
connectionHandler . afterConnectFailure ( signal . getThrowable ( ) ) ;
}
}
} )
. consume ( new Consumer < ChannelStream < Message < P > , Message < P > > > ( ) {
Promise < Void > closePromise = Promises . prepare ( ) ;
@Override
public void accept ( ChannelStream < Message < P > , Message < P > > connection ) {
connection
. finallyDo ( new Consumer < Signal < Message < P > > > ( ) {
@Override
public void accept ( Signal < Message < P > > signal ) {
if ( signal . isOnError ( ) ) {
connectionHandler . handleFailure ( signal . getThrowable ( ) ) ;
}
else if ( signal . isOnComplete ( ) ) {
connectionHandler . afterConnectionClosed ( ) ;
}
}
} )
. consume ( new Consumer < Message < P > > ( ) {
@Override
public void accept ( Message < P > message ) {
connectionHandler . handleMessage ( message ) ;
}
} ) ;
connectionHandler . afterConnected ( new Reactor2TcpConnection < P > ( connection ) ) ;
}
} ) ;
connectionHandler . afterConnected ( new Reactor2TcpConnection < P > ( connection , closePromise ) ) ;
connection
. finallyDo ( new Consumer < Signal < Message < P > > > ( ) {
@Override
public void accept ( Signal < Message < P > > signal ) {
if ( signal . isOnError ( ) ) {
connectionHandler . handleFailure ( signal . getThrowable ( ) ) ;
} else if ( signal . isOnComplete ( ) ) {
connectionHandler . afterConnectionClosed ( ) ;
}
}
} )
. consume ( new Consumer < Message < P > > ( ) {
@Override
public void accept ( Message < P > message ) {
connectionHandler . handleMessage ( message ) ;
}
} ) ;
return closePromise ;
}
} ;
}
@Override
public ListenableFuture < Boolean > shutdown ( ) {
public ListenableFuture < Void > shutdown ( ) {
final List < TcpClient < Message < P > , Message < P > > > clients ;
@ -215,26 +222,23 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
@@ -215,26 +222,23 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
clients = new ArrayList < TcpClient < Message < P > , Message < P > > > ( this . tcpClients ) ;
}
Promise < Boolean > promise = Streams . from ( clients )
. flatMap ( new Function < TcpClient < Message < P > , Message < P > > , Promise < Boolean > > ( ) {
@Override
public Promise < Boolean > apply ( TcpClient < Message < P > , Message < P > > client ) {
return client . close ( ) ;
}
} )
. reduce ( new BiFunction < Boolean , Boolean , Boolean > ( ) {
Promise < Void > promise = Streams . from ( clients )
. flatMap ( new Function < TcpClient < Message < P > , Message < P > > , Promise < Void > > ( ) {
@Override
public Boolean apply ( Boolean prev , Boolean next ) {
return prev & & next ;
public Promise < Void > apply ( final TcpClient < Message < P > , Message < P > > client ) {
return client . shutdown ( ) . onComplete ( new Consumer < Promise < Void > > ( ) {
@Override
public void accept ( Promise < Void > voidPromise ) {
synchronized ( tcpClients ) {
tcpClients . remove ( client ) ;
}
}
} ) ;
}
} )
. next ( ) ;
return new PassThroughPromiseToListenableFutureAdapter < Boolean > ( promise ) ;
}
public interface TcpClientSpecFactory < I , O > extends Function < TcpClientSpec < I , O > , TcpClientSpec < I , O > > {
return new PassThroughPromiseToListenableFutureAdapter < Void > ( promise ) ;
}
private static class SynchronousDispatcherConfigReader implements ConfigurationReader {
@ -257,18 +261,11 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
@@ -257,18 +261,11 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
public Tuple2 < InetSocketAddress , Long > reconnect ( InetSocketAddress address , int attempt ) {
return Tuple . of ( address , strategy . getTimeToNextAttempt ( attempt ) ) ;
}
}
private static class BooleanToVoidAdapter extends AbstractPromiseToListenableFutureAdapter < Boolean , Void > {
public BooleanToVoidAdapter ( Promise < Boolean > promise ) {
super ( promise ) ;
}
}
@Override
protected Void adapt ( Boolean result ) {
return null ;
}
private interface MessageHandler < P >
extends ReactorChannelHandler < Message < P > , Message < P > , ChannelStream < Message < P > , Message < P > > > {
}
}
}