@ -16,7 +16,6 @@
@@ -16,7 +16,6 @@
package org.springframework.messaging.tcp.reactor ;
import java.lang.reflect.Method ;
import java.time.Duration ;
import java.util.Collection ;
import java.util.List ;
@ -28,7 +27,6 @@ import java.util.function.Function;
@@ -28,7 +27,6 @@ import java.util.function.Function;
import io.netty.buffer.ByteBuf ;
import io.netty.channel.ChannelHandlerContext ;
import io.netty.channel.group.ChannelGroup ;
import io.netty.channel.group.ChannelGroupFuture ;
import io.netty.channel.group.DefaultChannelGroup ;
import io.netty.handler.codec.ByteToMessageDecoder ;
import io.netty.util.concurrent.ImmediateEventExecutor ;
@ -47,15 +45,14 @@ import reactor.ipc.netty.options.ClientOptions;
@@ -47,15 +45,14 @@ import reactor.ipc.netty.options.ClientOptions;
import reactor.ipc.netty.resources.LoopResources ;
import reactor.ipc.netty.resources.PoolResources ;
import reactor.ipc.netty.tcp.TcpClient ;
import reactor.ipc.netty.tcp.TcpResources ;
import org.springframework.lang.Nullable ;
import org.springframework.messaging.Message ;
import org.springframework.messaging.tcp.ReconnectStrategy ;
import org.springframework.messaging.tcp.TcpConnection ;
import org.springframework.messaging.tcp.TcpConnectionHandler ;
import org.springframework.messaging.tcp.TcpOperations ;
import org.springframework.util.Assert ;
import org.springframework.util.ReflectionUtils ;
import org.springframework.util.concurrent.ListenableFuture ;
import org.springframework.util.concurrent.SettableListenableFuture ;
@ -75,46 +72,101 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
@@ -75,46 +72,101 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
private final ReactorNettyCodec < P > codec ;
@Nullable
private final ChannelGroup channelGroup ;
private final LoopResources loopResources ;
@Nullable
private LoopResources loopResources ;
private final PoolResources poolResources ;
@Nullable
private PoolResources poolResources ;
private final Scheduler scheduler = Schedulers . newParallel ( "ReactorNettyTcpClient " ) ;
private final Scheduler scheduler = Schedulers . newParallel ( "tcp-client-scheduler " ) ;
private volatile boolean stopping = false ;
/ * *
* Basic constructor with a host and a port .
* Simple constructor with a host and a port .
* @param host the host to connect to
* @param port the port to connect to
* @param codec the code to use
* @see org . springframework . messaging . simp . stomp . StompReactorNettyCodec
* /
public ReactorNettyTcpClient ( String host , int port , ReactorNettyCodec < P > codec ) {
this ( opts - > opts . host ( host ) . port ( port ) , codec ) ;
this ( builder - > builder . host ( host ) . port ( port ) , codec ) ;
}
/ * *
* Alternate constructor with a { @link ClientOptions . Builder < ? > } consumer
* providing additional control beyond a host and a port .
* Constructor with a { @link ClientOptions . Builder } that can be used to
* customize Reactor Netty client options .
*
* < p > < strong > Note : < / strong > this constructor manages the lifecycle of the
* { @link TcpClient } and its underlying resources . Please do not customize
* any of the following options :
* { @link ClientOptions . Builder # channelGroup ( ChannelGroup ) ChannelGroup } ,
* { @link ClientOptions . Builder # loopResources ( LoopResources ) LoopResources } , and
* { @link ClientOptions . Builder # poolResources ( PoolResources ) PoolResources } .
* You may set the { @link ClientOptions . Builder # disablePool ( ) disablePool }
* option if you simply want to turn off pooling .
*
* < p > For full control over the initialization and lifecycle of the TcpClient ,
* see { @link # ReactorNettyTcpClient ( TcpClient , ReactorNettyCodec ) } .
*
* @param optionsConsumer consumer to customize client options
* @param codec the code to use
* @see org . springframework . messaging . simp . stomp . StompReactorNettyCodec
* /
public ReactorNettyTcpClient ( Consumer < ClientOptions . Builder < ? > > optionsConsumer , ReactorNettyCodec < P > codec ) {
public ReactorNettyTcpClient ( Consumer < ClientOptions . Builder < ? > > optionsConsumer ,
ReactorNettyCodec < P > codec ) {
Assert . notNull ( optionsConsumer , "Consumer<ClientOptions.Builder<?> is required" ) ;
Assert . notNull ( codec , "ReactorNettyCodec is required" ) ;
this . channelGroup = new DefaultChannelGroup ( ImmediateEventExecutor . INSTANCE ) ;
this . loopResources = LoopResources . create ( "reactor-netty-tcp-client" ) ;
this . poolResources = PoolResources . fixed ( "reactor-netty-tcp-pool" ) ;
Consumer < ClientOptions . Builder < ? > > builtInConsumer = opts - > opts
. channelGroup ( this . channelGroup )
. loopResources ( this . loopResources )
. poolResources ( this . poolResources )
. preferNative ( false ) ;
Consumer < ClientOptions . Builder < ? > > builtInConsumer = builder - > {
Assert . isTrue ( ! builder . isLoopAvailable ( ) & & ! builder . isPoolAvailable ( ) ,
"The provided ClientOptions.Builder contains LoopResources and/or PoolResources. " +
"Please, use the constructor that accepts a TcpClient instance " +
"for full control over initialization and lifecycle." ) ;
builder . channelGroup ( this . channelGroup ) ;
builder . preferNative ( false ) ;
this . loopResources = LoopResources . create ( "tcp-client-loop" ) ;
builder . loopResources ( this . loopResources ) ;
if ( ! builder . isPoolDisabled ( ) ) {
this . poolResources = PoolResources . fixed ( "tcp-client-pool" ) ;
builder . poolResources ( this . poolResources ) ;
}
} ;
this . tcpClient = TcpClient . create ( optionsConsumer . andThen ( builtInConsumer ) ) ;
this . codec = codec ;
}
/ * *
* Constructor with an externally created { @link TcpClient } instance whose
* lifecycle is expected to be managed externally .
*
* @param tcpClient the TcpClient instance to use
* @param codec the code to use
* @see org . springframework . messaging . simp . stomp . StompReactorNettyCodec
* /
public ReactorNettyTcpClient ( TcpClient tcpClient , ReactorNettyCodec < P > codec ) {
Assert . notNull ( tcpClient , "TcpClient is required" ) ;
Assert . notNull ( codec , "ReactorNettyCodec is required" ) ;
this . tcpClient = tcpClient ;
this . codec = codec ;
this . channelGroup = null ;
this . loopResources = null ;
this . poolResources = null ;
}
@Override
public ListenableFuture < Void > connect ( final TcpConnectionHandler < P > handler ) {
@ -180,7 +232,8 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
@@ -180,7 +232,8 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
return flux - > flux
. scan ( 1 , ( count , element ) - > count + + )
. flatMap ( attempt - > Optional . ofNullable ( reconnectStrategy . getTimeToNextAttempt ( attempt ) )
. map ( time - > Mono . delay ( Duration . ofMillis ( time ) ) ) . orElse ( Mono . empty ( ) ) ) ;
. map ( time - > Mono . delay ( Duration . ofMillis ( time ) , this . scheduler ) )
. orElse ( Mono . empty ( ) ) ) ;
}
@Override
@ -193,39 +246,39 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
@@ -193,39 +246,39 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
this . stopping = true ;
ChannelGroupFuture close = this . channelGroup . close ( ) ;
Mono < Void > completion = FutureMono . from ( close )
. doOnSuccessOrError ( ( x , e ) - > {
// TODO: https://github.com/reactor/reactor-netty/issues/24
shutdownGlobalResources ( ) ;
this . loopResources . dispose ( ) ;
this . poolResources . dispose ( ) ;
// TODO: https://github.com/reactor/reactor-netty/issues/25
try {
Thread . sleep ( 2000 ) ;
}
catch ( InterruptedException ex ) {
ex . printStackTrace ( ) ;
}
// Scheduler after loop resources...
this . scheduler . dispose ( ) ;
} ) ;
Mono < Void > result ;
if ( this . channelGroup ! = null ) {
result = FutureMono . from ( this . channelGroup . close ( ) ) ;
if ( this . loopResources ! = null ) {
result = result . onErrorResume ( ex - > Mono . empty ( ) ) . then ( this . loopResources . disposeLater ( ) ) ;
}
if ( this . poolResources ! = null ) {
result = result . onErrorResume ( ex - > Mono . empty ( ) ) . then ( this . poolResources . disposeLater ( ) ) ;
}
result = result . onErrorResume ( ex - > Mono . empty ( ) ) . then ( stopScheduler ( ) ) ;
}
else {
result = stopScheduler ( ) ;
}
return new MonoToListenableFutureAdapter < > ( completion ) ;
return new MonoToListenableFutureAdapter < > ( result ) ;
}
private void shutdownGlobalResources ( ) {
try {
Method method = TcpResources . class . getDeclaredMethod ( "_dispose" ) ;
ReflectionUtils . makeAccessible ( method ) ;
ReflectionUtils . invokeMethod ( method , TcpResources . get ( ) ) ;
}
catch ( NoSuchMethodException ex ) {
// ignore
}
private Mono < Void > stopScheduler ( ) {
return Mono . fromRunnable ( ( ) - > {
this . scheduler . dispose ( ) ;
for ( int i = 0 ; i < 20 ; i + + ) {
if ( this . scheduler . isDisposed ( ) ) {
break ;
}
try {
Thread . sleep ( 100 ) ;
}
catch ( Throwable ex ) {
break ;
}
}
} ) ;
}