@ -19,12 +19,17 @@ package org.springframework.messaging.support.tcp;
@@ -19,12 +19,17 @@ package org.springframework.messaging.support.tcp;
import java.net.InetSocketAddress ;
import org.springframework.messaging.Message ;
import org.springframework.util.Assert ;
import org.springframework.util.concurrent.ListenableFuture ;
import reactor.core.Environment ;
import reactor.core.composable.Composable ;
import reactor.core.composable.Deferred ;
import reactor.core.composable.Promise ;
import reactor.core.composable.Stream ;
import reactor.core.composable.spec.Promises ;
import reactor.function.Consumer ;
import reactor.function.support.SingleUseConsumer ;
import reactor.io.Buffer ;
import reactor.tcp.Reconnect ;
import reactor.tcp.TcpClient ;
@ -59,27 +64,39 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
@@ -59,27 +64,39 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
@Override
public void connect ( TcpConnectionHandler < P > connectionHandler ) {
this . connect ( connectionHandler , null ) ;
public ListenableFuture < Void > connect ( TcpConnectionHandler < P > connectionHandler ) {
Promise < TcpConnection < Message < P > , Message < P > > > promise = this . tcpClient . open ( ) ;
composeConnectionHandling ( promise , connectionHandler ) ;
return new AbstractPromiseToListenableFutureAdapter < TcpConnection < Message < P > , Message < P > > , Void > ( promise ) {
@Override
protected Void adapt ( TcpConnection < Message < P > , Message < P > > result ) {
return null ;
}
} ;
}
@Override
public void connect ( final TcpConnectionHandler < P > connectionHandler ,
public ListenableFuture < Void > connect ( final TcpConnectionHandler < P > connectionHandler ,
final ReconnectStrategy reconnectStrategy ) {
Composable < TcpConnection < Message < P > , Message < P > > > composable ;
Assert . notNull ( reconnectStrategy , "'reconnectStrategy' is required" ) ;
if ( reconnectStrategy ! = null ) {
composable = this . tcpClient . open ( new Reconnect ( ) {
@Override
public Tuple2 < InetSocketAddress , Long > reconnect ( InetSocketAddress address , int attempt ) {
return Tuple . of ( address , reconnectStrategy . getTimeToNextAttempt ( attempt ) ) ;
}
} ) ;
}
else {
composable = this . tcpClient . open ( ) ;
}
Stream < TcpConnection < Message < P > , Message < P > > > stream =
this . tcpClient . open ( new Reconnect ( ) {
@Override
public Tuple2 < InetSocketAddress , Long > reconnect ( InetSocketAddress address , int attempt ) {
return Tuple . of ( address , reconnectStrategy . getTimeToNextAttempt ( attempt ) ) ;
}
} ) ;
composeConnectionHandling ( stream , connectionHandler ) ;
return new PassThroughPromiseToListenableFutureAdapter < Void > ( toPromise ( stream ) ) ;
}
private void composeConnectionHandling ( Composable < TcpConnection < Message < P > , Message < P > > > composable ,
final TcpConnectionHandler < P > connectionHandler ) {
composable . when ( Throwable . class , new Consumer < Throwable > ( ) {
@Override
@ -108,6 +125,27 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
@@ -108,6 +125,27 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
} ) ;
}
private Promise < Void > toPromise ( Stream < TcpConnection < Message < P > , Message < P > > > stream ) {
final Deferred < Void , Promise < Void > > deferred = Promises . < Void > defer ( ) . get ( ) ;
stream . consume ( SingleUseConsumer . once ( new Consumer < TcpConnection < Message < P > , Message < P > > > ( ) {
@Override
public void accept ( TcpConnection < Message < P > , Message < P > > conn ) {
deferred . accept ( ( Void ) null ) ;
}
} ) ) ;
stream . when ( Throwable . class , SingleUseConsumer . once ( new Consumer < Throwable > ( ) {
@Override
public void accept ( Throwable throwable ) {
deferred . accept ( throwable ) ;
}
} ) ) ;
return deferred . compose ( ) ;
}
@Override
public ListenableFuture < Void > shutdown ( ) {
try {