From abe4420006d244140d9d4efd0c31756b13951e76 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Wed, 1 Nov 2017 16:22:49 -0400 Subject: [PATCH] Improve ReactorNettyTcpClient shutdown logic This commit takes care of the TODOs in ReactorNettyTcpClient by taking advantage of improvements in Reactor Netty. Issue: SPR-16145 --- .../simp/stomp/StompReactorNettyCodec.java | 2 +- .../tcp/reactor/ReactorNettyTcpClient.java | 153 ++++++++++++------ 2 files changed, 104 insertions(+), 51 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompReactorNettyCodec.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompReactorNettyCodec.java index c59d1379cee..01228cca778 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompReactorNettyCodec.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompReactorNettyCodec.java @@ -27,7 +27,7 @@ import org.springframework.messaging.tcp.reactor.AbstractNioBufferReactorNettyCo * @author Rossen Stoyanchev * @since 5.0 */ -class StompReactorNettyCodec extends AbstractNioBufferReactorNettyCodec { +public class StompReactorNettyCodec extends AbstractNioBufferReactorNettyCodec { private final StompDecoder decoder; diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java index c9bf7cb78a9..c37837494a8 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java @@ -16,7 +16,6 @@ package org.springframework.messaging.tcp.reactor; -import java.lang.reflect.Method; import java.time.Duration; import java.util.Collection; import java.util.List; @@ -28,7 +27,6 @@ import java.util.function.Function; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.group.ChannelGroup; -import io.netty.channel.group.ChannelGroupFuture; import io.netty.channel.group.DefaultChannelGroup; import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.util.concurrent.ImmediateEventExecutor; @@ -47,15 +45,14 @@ import reactor.ipc.netty.options.ClientOptions; import reactor.ipc.netty.resources.LoopResources; import reactor.ipc.netty.resources.PoolResources; import reactor.ipc.netty.tcp.TcpClient; -import reactor.ipc.netty.tcp.TcpResources; +import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.tcp.ReconnectStrategy; import org.springframework.messaging.tcp.TcpConnection; import org.springframework.messaging.tcp.TcpConnectionHandler; import org.springframework.messaging.tcp.TcpOperations; import org.springframework.util.Assert; -import org.springframework.util.ReflectionUtils; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.SettableListenableFuture; @@ -75,46 +72,101 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ private final ReactorNettyCodec

codec; + @Nullable private final ChannelGroup channelGroup; - private final LoopResources loopResources; + @Nullable + private LoopResources loopResources; - private final PoolResources poolResources; + @Nullable + private PoolResources poolResources; - private final Scheduler scheduler = Schedulers.newParallel("ReactorNettyTcpClient"); + private final Scheduler scheduler = Schedulers.newParallel("tcp-client-scheduler"); private volatile boolean stopping = false; /** - * Basic constructor with a host and a port. + * Simple constructor with a host and a port. + * @param host the host to connect to + * @param port the port to connect to + * @param codec the code to use + * @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec */ public ReactorNettyTcpClient(String host, int port, ReactorNettyCodec

codec) { - this(opts -> opts.host(host).port(port), codec); + this(builder -> builder.host(host).port(port), codec); } /** - * Alternate constructor with a {@link ClientOptions.Builder} consumer - * providing additional control beyond a host and a port. + * Constructor with a {@link ClientOptions.Builder} that can be used to + * customize Reactor Netty client options. + * + *

Note: this constructor manages the lifecycle of the + * {@link TcpClient} and its underlying resources. Please do not customize + * any of the following options: + * {@link ClientOptions.Builder#channelGroup(ChannelGroup) ChannelGroup}, + * {@link ClientOptions.Builder#loopResources(LoopResources) LoopResources}, and + * {@link ClientOptions.Builder#poolResources(PoolResources) PoolResources}. + * You may set the {@link ClientOptions.Builder#disablePool() disablePool} + * option if you simply want to turn off pooling. + * + *

For full control over the initialization and lifecycle of the TcpClient, + * see {@link #ReactorNettyTcpClient(TcpClient, ReactorNettyCodec)}. + * + * @param optionsConsumer consumer to customize client options + * @param codec the code to use + * @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec */ - public ReactorNettyTcpClient(Consumer> optionsConsumer, ReactorNettyCodec

codec) { + public ReactorNettyTcpClient(Consumer> optionsConsumer, + ReactorNettyCodec

codec) { + Assert.notNull(optionsConsumer, "Consumer is required"); Assert.notNull(codec, "ReactorNettyCodec is required"); this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); - this.loopResources = LoopResources.create("reactor-netty-tcp-client"); - this.poolResources = PoolResources.fixed("reactor-netty-tcp-pool"); - Consumer> builtInConsumer = opts -> opts - .channelGroup(this.channelGroup) - .loopResources(this.loopResources) - .poolResources(this.poolResources) - .preferNative(false); + Consumer> builtInConsumer = builder -> { + + Assert.isTrue(!builder.isLoopAvailable() && !builder.isPoolAvailable(), + "The provided ClientOptions.Builder contains LoopResources and/or PoolResources. " + + "Please, use the constructor that accepts a TcpClient instance " + + "for full control over initialization and lifecycle."); + + builder.channelGroup(this.channelGroup); + builder.preferNative(false); + + this.loopResources = LoopResources.create("tcp-client-loop"); + builder.loopResources(this.loopResources); + + if (!builder.isPoolDisabled()) { + this.poolResources = PoolResources.fixed("tcp-client-pool"); + builder.poolResources(this.poolResources); + } + }; this.tcpClient = TcpClient.create(optionsConsumer.andThen(builtInConsumer)); this.codec = codec; } + /** + * Constructor with an externally created {@link TcpClient} instance whose + * lifecycle is expected to be managed externally. + * + * @param tcpClient the TcpClient instance to use + * @param codec the code to use + * @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec + */ + public ReactorNettyTcpClient(TcpClient tcpClient, ReactorNettyCodec

codec) { + Assert.notNull(tcpClient, "TcpClient is required"); + Assert.notNull(codec, "ReactorNettyCodec is required"); + this.tcpClient = tcpClient; + this.codec = codec; + + this.channelGroup = null; + this.loopResources = null; + this.poolResources = null; + } + @Override public ListenableFuture connect(final TcpConnectionHandler

handler) { @@ -180,7 +232,8 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ return flux -> flux .scan(1, (count, element) -> count++) .flatMap(attempt -> Optional.ofNullable(reconnectStrategy.getTimeToNextAttempt(attempt)) - .map(time -> Mono.delay(Duration.ofMillis(time))).orElse(Mono.empty())); + .map(time -> Mono.delay(Duration.ofMillis(time), this.scheduler)) + .orElse(Mono.empty())); } @Override @@ -193,39 +246,39 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ this.stopping = true; - ChannelGroupFuture close = this.channelGroup.close(); - Mono completion = FutureMono.from(close) - .doOnSuccessOrError((x, e) -> { - // TODO: https://github.com/reactor/reactor-netty/issues/24 - shutdownGlobalResources(); - - this.loopResources.dispose(); - this.poolResources.dispose(); - - // TODO: https://github.com/reactor/reactor-netty/issues/25 - try { - Thread.sleep(2000); - } - catch (InterruptedException ex) { - ex.printStackTrace(); - } - - // Scheduler after loop resources... - this.scheduler.dispose(); - }); + Mono result; + if (this.channelGroup != null) { + result = FutureMono.from(this.channelGroup.close()); + if (this.loopResources != null) { + result = result.onErrorResume(ex -> Mono.empty()).then(this.loopResources.disposeLater()); + } + if (this.poolResources != null) { + result = result.onErrorResume(ex -> Mono.empty()).then(this.poolResources.disposeLater()); + } + result = result.onErrorResume(ex -> Mono.empty()).then(stopScheduler()); + } + else { + result = stopScheduler(); + } - return new MonoToListenableFutureAdapter<>(completion); + return new MonoToListenableFutureAdapter<>(result); } - private void shutdownGlobalResources() { - try { - Method method = TcpResources.class.getDeclaredMethod("_dispose"); - ReflectionUtils.makeAccessible(method); - ReflectionUtils.invokeMethod(method, TcpResources.get()); - } - catch (NoSuchMethodException ex) { - // ignore - } + private Mono stopScheduler() { + return Mono.fromRunnable(() -> { + this.scheduler.dispose(); + for (int i = 0; i < 20; i++) { + if (this.scheduler.isDisposed()) { + break; + } + try { + Thread.sleep(100); + } + catch (Throwable ex) { + break; + } + } + }); }