diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java index 95a92bc5f97..ab8ba2ed390 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java @@ -16,6 +16,7 @@ package org.springframework.messaging.tcp.reactor; +import java.lang.reflect.Method; import java.util.Collection; import java.util.List; import java.util.function.BiFunction; @@ -41,7 +42,10 @@ import reactor.ipc.netty.NettyContext; import reactor.ipc.netty.NettyInbound; import reactor.ipc.netty.NettyOutbound; import reactor.ipc.netty.options.ClientOptions; +import reactor.ipc.netty.resources.LoopResources; +import reactor.ipc.netty.resources.PoolResources; import reactor.ipc.netty.tcp.TcpClient; +import reactor.ipc.netty.tcp.TcpResources; import reactor.util.concurrent.QueueSupplier; import org.springframework.messaging.Message; @@ -50,6 +54,7 @@ import org.springframework.messaging.tcp.TcpConnection; import org.springframework.messaging.tcp.TcpConnectionHandler; import org.springframework.messaging.tcp.TcpOperations; import org.springframework.util.Assert; +import org.springframework.util.ReflectionUtils; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.SettableListenableFuture; @@ -68,6 +73,10 @@ public class ReactorNettyTcpClient
implements TcpOperations
{ private final ChannelGroup channelGroup; + private final LoopResources loopResources; + + private final PoolResources poolResources; + private final Scheduler scheduler = Schedulers.newParallel("ReactorNettyTcpClient"); private volatile boolean stopping = false; @@ -84,11 +93,21 @@ public class ReactorNettyTcpClient
implements TcpOperations
{
* Alternate constructor with a {@link ClientOptions} consumer providing
* additional control beyond a host and a port.
*/
- public ReactorNettyTcpClient(Consumer codec) {
- Assert.notNull(consumer, "Consumer codec) {
+ Assert.notNull(optionsConsumer, "Consumer implements TcpOperations {
}
private implements TcpOperations {
future.set(null);
return future;
}
+
this.stopping = true;
- ChannelGroupFuture future = this.channelGroup.close();
- Mono