From e62b104e03cdb7ffb00d816e5922446eb359a4a2 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Wed, 28 Dec 2016 21:34:34 -0500 Subject: [PATCH] Resource cleanup on shutdown in ReactorNettyTcpClient --- .../tcp/reactor/ReactorNettyTcpClient.java | 65 +++++++++++++++++-- 1 file changed, 59 insertions(+), 6 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java index 95a92bc5f97..ab8ba2ed390 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java @@ -16,6 +16,7 @@ package org.springframework.messaging.tcp.reactor; +import java.lang.reflect.Method; import java.util.Collection; import java.util.List; import java.util.function.BiFunction; @@ -41,7 +42,10 @@ import reactor.ipc.netty.NettyContext; import reactor.ipc.netty.NettyInbound; import reactor.ipc.netty.NettyOutbound; import reactor.ipc.netty.options.ClientOptions; +import reactor.ipc.netty.resources.LoopResources; +import reactor.ipc.netty.resources.PoolResources; import reactor.ipc.netty.tcp.TcpClient; +import reactor.ipc.netty.tcp.TcpResources; import reactor.util.concurrent.QueueSupplier; import org.springframework.messaging.Message; @@ -50,6 +54,7 @@ import org.springframework.messaging.tcp.TcpConnection; import org.springframework.messaging.tcp.TcpConnectionHandler; import org.springframework.messaging.tcp.TcpOperations; import org.springframework.util.Assert; +import org.springframework.util.ReflectionUtils; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.SettableListenableFuture; @@ -68,6 +73,10 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ private final ChannelGroup channelGroup; + private final LoopResources loopResources; + + private final PoolResources poolResources; + private final Scheduler scheduler = Schedulers.newParallel("ReactorNettyTcpClient"); private volatile boolean stopping = false; @@ -84,11 +93,21 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ * Alternate constructor with a {@link ClientOptions} consumer providing * additional control beyond a host and a port. */ - public ReactorNettyTcpClient(Consumer consumer, ReactorNettyCodec

codec) { - Assert.notNull(consumer, "Consumer is required"); + public ReactorNettyTcpClient(Consumer optionsConsumer, ReactorNettyCodec

codec) { + Assert.notNull(optionsConsumer, "Consumer is required"); Assert.notNull(codec, "ReactorNettyCodec is required"); + this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); - this.tcpClient = TcpClient.create(consumer.andThen(opts -> opts.channelGroup(this.channelGroup))); + this.loopResources = LoopResources.create("reactor-netty-tcp-client"); + this.poolResources = PoolResources.fixed("reactor-netty-tcp-pool"); + + Consumer builtInConsumer = opts -> opts + .channelGroup(this.channelGroup) + .loopResources(this.loopResources) + .poolResources(this.poolResources) + .preferNative(false); + + this.tcpClient = TcpClient.create(optionsConsumer.andThen(builtInConsumer)); this.codec = codec; } @@ -152,7 +171,8 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ } private Function, Publisher> reconnectFunction(ReconnectStrategy reconnectStrategy) { - return flux -> flux.scan(1, (count, e) -> count++) + return flux -> flux + .scan(1, (count, element) -> count++) .flatMap(attempt -> Mono.delayMillis(reconnectStrategy.getTimeToNextAttempt(attempt))); } @@ -163,12 +183,45 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ future.set(null); return future; } + this.stopping = true; - ChannelGroupFuture future = this.channelGroup.close(); - Mono completion = FutureMono.from(future).doAfterTerminate((x, e) -> scheduler.dispose()); + + ChannelGroupFuture close = this.channelGroup.close(); + Mono completion = FutureMono.from(close) + .doAfterTerminate((x, e) -> { + + // TODO: https://github.com/reactor/reactor-netty/issues/24 + shutdownGlobalResources(); + + this.loopResources.dispose(); + this.poolResources.dispose(); + + // TODO: https://github.com/reactor/reactor-netty/issues/25 + try { + Thread.sleep(2000); + } + catch (InterruptedException ex) { + ex.printStackTrace(); + } + + // Scheduler after loop resources... + this.scheduler.dispose(); + }); + return new MonoToListenableFutureAdapter<>(completion); } + private static void shutdownGlobalResources() { + try { + Method method = TcpResources.class.getDeclaredMethod("_dispose"); + ReflectionUtils.makeAccessible(method); + ReflectionUtils.invokeMethod(method, TcpResources.get()); + } + catch (NoSuchMethodException ex) { + ex.printStackTrace(); + } + } + private class ReactorNettyHandler implements BiFunction> {