@ -16,6 +16,7 @@
@@ -16,6 +16,7 @@
package org.springframework.messaging.tcp.reactor ;
import java.lang.reflect.Method ;
import java.util.Collection ;
import java.util.List ;
import java.util.function.BiFunction ;
@ -41,7 +42,10 @@ import reactor.ipc.netty.NettyContext;
@@ -41,7 +42,10 @@ import reactor.ipc.netty.NettyContext;
import reactor.ipc.netty.NettyInbound ;
import reactor.ipc.netty.NettyOutbound ;
import reactor.ipc.netty.options.ClientOptions ;
import reactor.ipc.netty.resources.LoopResources ;
import reactor.ipc.netty.resources.PoolResources ;
import reactor.ipc.netty.tcp.TcpClient ;
import reactor.ipc.netty.tcp.TcpResources ;
import reactor.util.concurrent.QueueSupplier ;
import org.springframework.messaging.Message ;
@ -50,6 +54,7 @@ import org.springframework.messaging.tcp.TcpConnection;
@@ -50,6 +54,7 @@ import org.springframework.messaging.tcp.TcpConnection;
import org.springframework.messaging.tcp.TcpConnectionHandler ;
import org.springframework.messaging.tcp.TcpOperations ;
import org.springframework.util.Assert ;
import org.springframework.util.ReflectionUtils ;
import org.springframework.util.concurrent.ListenableFuture ;
import org.springframework.util.concurrent.SettableListenableFuture ;
@ -68,6 +73,10 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
@@ -68,6 +73,10 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
private final ChannelGroup channelGroup ;
private final LoopResources loopResources ;
private final PoolResources poolResources ;
private final Scheduler scheduler = Schedulers . newParallel ( "ReactorNettyTcpClient" ) ;
private volatile boolean stopping = false ;
@ -84,11 +93,21 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
@@ -84,11 +93,21 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
* Alternate constructor with a { @link ClientOptions } consumer providing
* additional control beyond a host and a port .
* /
public ReactorNettyTcpClient ( Consumer < ClientOptions > c onsumer, ReactorNettyCodec < P > codec ) {
Assert . notNull ( c onsumer, "Consumer<ClientOptions> is required" ) ;
public ReactorNettyTcpClient ( Consumer < ClientOptions > optionsC onsumer, ReactorNettyCodec < P > codec ) {
Assert . notNull ( optionsC onsumer, "Consumer<ClientOptions> is required" ) ;
Assert . notNull ( codec , "ReactorNettyCodec is required" ) ;
this . channelGroup = new DefaultChannelGroup ( ImmediateEventExecutor . INSTANCE ) ;
this . tcpClient = TcpClient . create ( consumer . andThen ( opts - > opts . channelGroup ( this . channelGroup ) ) ) ;
this . loopResources = LoopResources . create ( "reactor-netty-tcp-client" ) ;
this . poolResources = PoolResources . fixed ( "reactor-netty-tcp-pool" ) ;
Consumer < ClientOptions > builtInConsumer = opts - > opts
. channelGroup ( this . channelGroup )
. loopResources ( this . loopResources )
. poolResources ( this . poolResources )
. preferNative ( false ) ;
this . tcpClient = TcpClient . create ( optionsConsumer . andThen ( builtInConsumer ) ) ;
this . codec = codec ;
}
@ -152,7 +171,8 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
@@ -152,7 +171,8 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
}
private < T > Function < Flux < T > , Publisher < ? > > reconnectFunction ( ReconnectStrategy reconnectStrategy ) {
return flux - > flux . scan ( 1 , ( count , e ) - > count + + )
return flux - > flux
. scan ( 1 , ( count , element ) - > count + + )
. flatMap ( attempt - > Mono . delayMillis ( reconnectStrategy . getTimeToNextAttempt ( attempt ) ) ) ;
}
@ -163,12 +183,45 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
@@ -163,12 +183,45 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
future . set ( null ) ;
return future ;
}
this . stopping = true ;
ChannelGroupFuture future = this . channelGroup . close ( ) ;
Mono < Void > completion = FutureMono . from ( future ) . doAfterTerminate ( ( x , e ) - > scheduler . dispose ( ) ) ;
ChannelGroupFuture close = this . channelGroup . close ( ) ;
Mono < Void > completion = FutureMono . from ( close )
. doAfterTerminate ( ( x , e ) - > {
// TODO: https://github.com/reactor/reactor-netty/issues/24
shutdownGlobalResources ( ) ;
this . loopResources . dispose ( ) ;
this . poolResources . dispose ( ) ;
// TODO: https://github.com/reactor/reactor-netty/issues/25
try {
Thread . sleep ( 2000 ) ;
}
catch ( InterruptedException ex ) {
ex . printStackTrace ( ) ;
}
// Scheduler after loop resources...
this . scheduler . dispose ( ) ;
} ) ;
return new MonoToListenableFutureAdapter < > ( completion ) ;
}
private static void shutdownGlobalResources ( ) {
try {
Method method = TcpResources . class . getDeclaredMethod ( "_dispose" ) ;
ReflectionUtils . makeAccessible ( method ) ;
ReflectionUtils . invokeMethod ( method , TcpResources . get ( ) ) ;
}
catch ( NoSuchMethodException ex ) {
ex . printStackTrace ( ) ;
}
}
private class ReactorNettyHandler implements BiFunction < NettyInbound , NettyOutbound , Publisher < Void > > {