From 870f61fd8ea75ff46f0892c9ccff347fa6d8b6f3 Mon Sep 17 00:00:00 2001 From: Stephane Maldini Date: Wed, 30 Nov 2016 12:46:01 +0000 Subject: [PATCH 1/3] update STOMP support to reactor-netty --- build.gradle | 15 +- .../simp/stomp/Reactor2StompCodec.java | 107 ------ ...t.java => ReactorNettyTcpStompClient.java} | 100 +++-- .../stomp/StompBrokerRelayMessageHandler.java | 17 +- ...bstractMonoToListenableFutureAdapter.java} | 58 ++- ...ava => MonoToListenableFutureAdapter.java} | 12 +- .../tcp/reactor/Reactor2TcpClient.java | 362 ------------------ .../tcp/reactor/ReactorNettyTcpClient.java | 264 +++++++++++++ ...on.java => ReactorNettyTcpConnection.java} | 48 ++- ...a => ReactorNettyTcpStompClientTests.java} | 10 +- ...CodecTests.java => StompDecoderTests.java} | 105 +---- .../simp/stomp/StompEncoderTests.java | 93 +++++ 12 files changed, 496 insertions(+), 695 deletions(-) delete mode 100644 spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/Reactor2StompCodec.java rename spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/{Reactor2TcpStompClient.java => ReactorNettyTcpStompClient.java} (51%) rename spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/{AbstractPromiseToListenableFutureAdapter.java => AbstractMonoToListenableFutureAdapter.java} (70%) rename spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/{PassThroughPromiseToListenableFutureAdapter.java => MonoToListenableFutureAdapter.java} (72%) delete mode 100644 spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java create mode 100644 spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java rename spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/{Reactor2TcpConnection.java => ReactorNettyTcpConnection.java} (57%) rename spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/{Reactor2TcpStompClientTests.java => ReactorNettyTcpStompClientTests.java} (95%) rename spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/{StompCodecTests.java => StompDecoderTests.java} (70%) create mode 100644 spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompEncoderTests.java diff --git a/build.gradle b/build.gradle index 37d9b017124..d56e831e138 100644 --- a/build.gradle +++ b/build.gradle @@ -78,7 +78,6 @@ configure(allprojects) { project -> ext.protobufVersion = "3.1.0" ext.quartzVersion = "2.2.3" ext.reactivestreamsVersion = "1.0.0" - ext.reactorVersion = "2.0.8.RELEASE" ext.reactorCoreVersion = '3.0.3.RELEASE' ext.reactorNettyVersion = '0.6.0.BUILD-SNAPSHOT' ext.romeVersion = "1.7.0" @@ -578,12 +577,8 @@ project("spring-messaging") { compile(project(":spring-core")) compile(project(":spring-context")) optional(project(":spring-oxm")) - optional("io.projectreactor:reactor-core:${reactorVersion}") { - force = true // enforce 2.0.x - } - optional("io.projectreactor:reactor-net:${reactorVersion}") { - exclude group: "io.netty", module: "netty-all" - } + optional("io.projectreactor:reactor-core:${reactorCoreVersion}") + optional("io.projectreactor.ipc:reactor-netty:${reactorNettyVersion}") optional("io.netty:netty-all:${nettyVersion}") optional("org.eclipse.jetty.websocket:websocket-server:${jettyVersion}") { exclude group: "javax.servlet", module: "javax.servlet-api" @@ -1003,10 +998,8 @@ project("spring-websocket") { optional("com.fasterxml.jackson.core:jackson-databind:${jackson2Version}") testCompile("org.apache.tomcat.embed:tomcat-embed-core:${tomcatVersion}") testCompile("org.apache.tomcat.embed:tomcat-embed-websocket:${tomcatVersion}") - testCompile("io.projectreactor:reactor-core:${reactorVersion}") { - force = true // enforce 2.0.x - } - testCompile("io.projectreactor:reactor-net:${reactorVersion}") + testCompile("io.projectreactor:reactor-core:${reactorCoreVersion}") + testCompile("io.projectreactor.ipc:reactor-netty:${reactorNettyVersion}") testCompile("io.netty:netty-all:${nettyVersion}") testCompile("org.slf4j:slf4j-jcl:${slf4jVersion}") testRuntime("org.jboss.xnio:xnio-nio:${xnioVersion}") diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/Reactor2StompCodec.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/Reactor2StompCodec.java deleted file mode 100644 index 91d8c4851b7..00000000000 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/Reactor2StompCodec.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Copyright 2002-2016 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.messaging.simp.stomp; - -import java.nio.ByteBuffer; - -import reactor.fn.Consumer; -import reactor.fn.Function; -import reactor.io.buffer.Buffer; -import reactor.io.codec.Codec; - -import org.springframework.messaging.Message; -import org.springframework.util.Assert; - -/** - * A Reactor TCP {@link Codec} for sending and receiving STOMP messages. - * - * @author Andy Wilkinson - * @author Rossen Stoyanchev - * @since 4.0 - */ -public class Reactor2StompCodec extends Codec, Message> { - - private final Function, Buffer> encodingFunction; - - private final StompDecoder stompDecoder; - - - public Reactor2StompCodec() { - this(new StompEncoder(), new StompDecoder()); - } - - public Reactor2StompCodec(StompEncoder encoder, StompDecoder decoder) { - Assert.notNull(encoder, "StompEncoder is required"); - Assert.notNull(decoder, "StompDecoder is required"); - this.encodingFunction = new EncodingFunction(encoder); - this.stompDecoder = decoder; - } - - - @Override - public Function> decoder(final Consumer> messageConsumer) { - return new DecodingFunction(this.stompDecoder, messageConsumer); - } - - @Override - public Function, Buffer> encoder() { - return this.encodingFunction; - } - - @Override - public Buffer apply(Message message) { - return this.encodingFunction.apply(message); - } - - - private static class EncodingFunction implements Function, Buffer> { - - private final StompEncoder encoder; - - public EncodingFunction(StompEncoder encoder) { - this.encoder = encoder; - } - - @Override - public Buffer apply(Message message) { - byte[] bytes = this.encoder.encode(message); - return new Buffer(ByteBuffer.wrap(bytes)); - } - } - - - private static class DecodingFunction implements Function> { - - private final StompDecoder decoder; - - private final Consumer> messageConsumer; - - public DecodingFunction(StompDecoder decoder, Consumer> next) { - this.decoder = decoder; - this.messageConsumer = next; - } - - @Override - public Message apply(Buffer buffer) { - for (Message message : this.decoder.decode(buffer.byteBuffer())) { - this.messageConsumer.accept(message); - } - return null; - } - } - -} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/Reactor2TcpStompClient.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/ReactorNettyTcpStompClient.java similarity index 51% rename from spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/Reactor2TcpStompClient.java rename to spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/ReactorNettyTcpStompClient.java index e7eda79eb30..64d072f9eda 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/Reactor2TcpStompClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/ReactorNettyTcpStompClient.java @@ -16,63 +16,54 @@ package org.springframework.messaging.simp.stomp; -import java.util.Collections; import java.util.List; -import java.util.Properties; +import java.util.function.BiConsumer; +import java.util.function.Function; -import reactor.Environment; -import reactor.core.config.ConfigurationReader; -import reactor.core.config.DispatcherConfiguration; -import reactor.core.config.DispatcherType; -import reactor.core.config.ReactorConfiguration; -import reactor.io.net.NetStreams; -import reactor.io.net.Spec.TcpClientSpec; +import io.netty.buffer.ByteBuf; +import reactor.core.scheduler.Schedulers; import org.springframework.messaging.Message; import org.springframework.messaging.tcp.TcpOperations; -import org.springframework.messaging.tcp.reactor.Reactor2TcpClient; +import org.springframework.messaging.tcp.reactor.ReactorNettyTcpClient; import org.springframework.util.concurrent.ListenableFuture; /** * A STOMP over TCP client that uses - * {@link Reactor2TcpClient}. + * {@link ReactorNettyTcpClient}. * * @author Rossen Stoyanchev * @since 4.2 */ -public class Reactor2TcpStompClient extends StompClientSupport { +public class ReactorNettyTcpStompClient extends StompClientSupport { private final TcpOperations tcpClient; - /** * Create an instance with host "127.0.0.1" and port 61613. */ - public Reactor2TcpStompClient() { + public ReactorNettyTcpStompClient() { this("127.0.0.1", 61613); } + /** * Create an instance with the given host and port. * @param host the host * @param port the port */ - public Reactor2TcpStompClient(final String host, final int port) { - ConfigurationReader reader = new StompClientDispatcherConfigReader(); - Environment environment = new Environment(reader).assignErrorJournal(); - StompTcpClientSpecFactory factory = new StompTcpClientSpecFactory(environment, host, port); - this.tcpClient = new Reactor2TcpClient<>(factory); + public ReactorNettyTcpStompClient(final String host, final int port) { + this.tcpClient = create(host, port, new StompDecoder()); } /** * Create an instance with a pre-configured TCP client. * @param tcpClient the client to use */ - public Reactor2TcpStompClient(TcpOperations tcpClient) { + public ReactorNettyTcpStompClient(TcpOperations tcpClient) { this.tcpClient = tcpClient; } - /** * Connect and notify the given {@link StompSessionHandler} when connected * on the STOMP level. @@ -83,6 +74,7 @@ public class Reactor2TcpStompClient extends StompClientSupport { return connect(null, handler); } + /** * An overloaded version of {@link #connect(StompSessionHandler)} that * accepts headers to use for the STOMP CONNECT frame. @@ -103,48 +95,54 @@ public class Reactor2TcpStompClient extends StompClientSupport { this.tcpClient.shutdown(); } - /** - * A ConfigurationReader with a thread pool-based dispatcher. + * Create a new {@link ReactorNettyTcpClient} with Stomp specific configuration for + * encoding, decoding and hand-off. + * + * @param relayHost target host + * @param relayPort target port + * @param decoder {@link StompDecoder} to use + * @return a new {@link TcpOperations} */ - private static class StompClientDispatcherConfigReader implements ConfigurationReader { - - @Override - public ReactorConfiguration read() { - String dispatcherName = "StompClient"; - DispatcherType dispatcherType = DispatcherType.DISPATCHER_GROUP; - DispatcherConfiguration config = new DispatcherConfiguration(dispatcherName, dispatcherType, 128, 0); - List configList = Collections.singletonList(config); - return new ReactorConfiguration(configList, dispatcherName, new Properties()); - } + protected static TcpOperations create(String relayHost, + int relayPort, + StompDecoder decoder) { + return new ReactorNettyTcpClient<>(relayHost, + relayPort, + new ReactorNettyTcpClient.MessageHandlerConfiguration<>(new DecodingFunction( + decoder), + new EncodingConsumer(new StompEncoder()), + 128, + Schedulers.newParallel("StompClient"))); } + private static final class EncodingConsumer + implements BiConsumer> { - private static class StompTcpClientSpecFactory - implements NetStreams.TcpClientFactory, Message> { + private final StompEncoder encoder; - private final Environment environment; + public EncodingConsumer(StompEncoder encoder) { + this.encoder = encoder; + } - private final String host; + @Override + public void accept(ByteBuf byteBuf, Message message) { + byteBuf.writeBytes(encoder.encode(message)); + } + } + + private static final class DecodingFunction + implements Function>> { - private final int port; + private final StompDecoder decoder; - public StompTcpClientSpecFactory(Environment environment, String host, int port) { - this.environment = environment; - this.host = host; - this.port = port; + public DecodingFunction(StompDecoder decoder) { + this.decoder = decoder; } @Override - public TcpClientSpec, Message> apply( - TcpClientSpec, Message> tcpClientSpec) { - - return tcpClientSpec - .codec(new Reactor2StompCodec(new StompEncoder(), new StompDecoder())) - .env(this.environment) - .dispatcher(this.environment.getCachedDispatchers("StompClient").get()) - .connect(this.host, this.port); + public List> apply(ByteBuf buffer) { + return this.decoder.decode(buffer.nioBuffer()); } } - } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java index a4d16467835..b8b9c98a989 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java @@ -40,7 +40,7 @@ import org.springframework.messaging.tcp.FixedIntervalReconnectStrategy; import org.springframework.messaging.tcp.TcpConnection; import org.springframework.messaging.tcp.TcpConnectionHandler; import org.springframework.messaging.tcp.TcpOperations; -import org.springframework.messaging.tcp.reactor.Reactor2TcpClient; +import org.springframework.messaging.tcp.reactor.ReactorNettyTcpClient; import org.springframework.util.Assert; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; @@ -335,7 +335,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler /** * Configure a TCP client for managing TCP connections to the STOMP broker. - * By default {@link Reactor2TcpClient} is used. + * By default {@link ReactorNettyTcpClient} is used. */ public void setTcpClient(TcpOperations tcpClient) { this.tcpClient = tcpClient; @@ -387,8 +387,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler if (this.tcpClient == null) { StompDecoder decoder = new StompDecoder(); decoder.setHeaderInitializer(getHeaderInitializer()); - Reactor2StompCodec codec = new Reactor2StompCodec(new StompEncoder(), decoder); - this.tcpClient = new StompTcpClientFactory().create(this.relayHost, this.relayPort, codec); + + this.tcpClient = ReactorNettyTcpStompClient.create(this.relayHost, this.relayPort, decoder); } if (logger.isInfoEnabled()) { @@ -970,15 +970,6 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } } - - private static class StompTcpClientFactory { - - public TcpOperations create(String relayHost, int relayPort, Reactor2StompCodec codec) { - return new Reactor2TcpClient<>(relayHost, relayPort, codec); - } - } - - private static class VoidCallable implements Callable { @Override diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractPromiseToListenableFutureAdapter.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractMonoToListenableFutureAdapter.java similarity index 70% rename from spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractPromiseToListenableFutureAdapter.java rename to spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractMonoToListenableFutureAdapter.java index 7e4a77ad343..2b5ab32d479 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractPromiseToListenableFutureAdapter.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractMonoToListenableFutureAdapter.java @@ -16,12 +16,14 @@ package org.springframework.messaging.tcp.reactor; +import java.time.Duration; +import java.util.Objects; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import reactor.fn.Consumer; -import reactor.rx.Promise; +import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoProcessor; import org.springframework.util.Assert; import org.springframework.util.concurrent.FailureCallback; @@ -31,28 +33,24 @@ import org.springframework.util.concurrent.ListenableFutureCallbackRegistry; import org.springframework.util.concurrent.SuccessCallback; /** - * Adapts a reactor {@link Promise} to {@link ListenableFuture} optionally converting + * Adapts a reactor {@link Mono} to {@link ListenableFuture} optionally converting * the result Object type {@code } to the expected target type {@code }. * * @author Rossen Stoyanchev * @since 4.0 - * @param the type of object expected from the {@link Promise} + * @param the type of object expected from the {@link Mono} * @param the type of object expected from the {@link ListenableFuture} */ -abstract class AbstractPromiseToListenableFutureAdapter implements ListenableFuture { +abstract class AbstractMonoToListenableFutureAdapter + implements ListenableFuture { - private final Promise promise; + private final MonoProcessor promise; private final ListenableFutureCallbackRegistry registry = new ListenableFutureCallbackRegistry<>(); - - protected AbstractPromiseToListenableFutureAdapter(Promise promise) { - Assert.notNull(promise, "Promise must not be null"); - this.promise = promise; - - this.promise.onSuccess(new Consumer() { - @Override - public void accept(S result) { + protected AbstractMonoToListenableFutureAdapter(Mono promise) { + Assert.notNull(promise, "Mono must not be null"); + this.promise = promise.doOnSuccess(result -> { T adapted; try { adapted = adapt(result); @@ -62,46 +60,44 @@ abstract class AbstractPromiseToListenableFutureAdapter implements Listena return; } registry.success(adapted); - } - }); - - this.promise.onError(new Consumer() { - @Override - public void accept(Throwable ex) { - registry.failure(ex); - } - }); + }) + .doOnError(registry::failure) + .subscribe(); } @Override public T get() throws InterruptedException { - S result = this.promise.await(); + S result = this.promise.block(); return adapt(result); } @Override public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - S result = this.promise.await(timeout, unit); - if (!this.promise.isComplete()) { - throw new TimeoutException(); - } + Objects.requireNonNull(unit, "unit"); + S result = this.promise.block(Duration.ofMillis(TimeUnit.MILLISECONDS.convert( + timeout, + unit))); return adapt(result); } @Override public boolean cancel(boolean mayInterruptIfRunning) { - return false; + if (isCancelled()) { + return false; + } + this.promise.cancel(); + return true; } @Override public boolean isCancelled() { - return false; + return this.promise.isCancelled(); } @Override public boolean isDone() { - return this.promise.isComplete(); + return this.promise.isTerminated(); } @Override diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/PassThroughPromiseToListenableFutureAdapter.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/MonoToListenableFutureAdapter.java similarity index 72% rename from spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/PassThroughPromiseToListenableFutureAdapter.java rename to spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/MonoToListenableFutureAdapter.java index 3028307f31c..8ecab4a974d 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/PassThroughPromiseToListenableFutureAdapter.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/MonoToListenableFutureAdapter.java @@ -16,20 +16,22 @@ package org.springframework.messaging.tcp.reactor; -import reactor.rx.Promise; +import reactor.core.publisher.Mono; /** - * A Promise-to-ListenableFutureAdapter where the source and the target from + * A Mono-to-ListenableFutureAdapter where the source and the target from * the Promise and the ListenableFuture respectively are of the same type. * * @author Rossen Stoyanchev + * @author Stephane Maldini * @since 4.0 */ -class PassThroughPromiseToListenableFutureAdapter extends AbstractPromiseToListenableFutureAdapter { +class MonoToListenableFutureAdapter extends + AbstractMonoToListenableFutureAdapter { - public PassThroughPromiseToListenableFutureAdapter(Promise promise) { - super(promise); + public MonoToListenableFutureAdapter(Mono mono) { + super(mono); } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java deleted file mode 100644 index 4ff624314e5..00000000000 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java +++ /dev/null @@ -1,362 +0,0 @@ -/* - * Copyright 2002-2016 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.messaging.tcp.reactor; - -import java.lang.reflect.Method; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Properties; - -import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; -import org.reactivestreams.Publisher; -import reactor.Environment; -import reactor.core.config.ConfigurationReader; -import reactor.core.config.DispatcherConfiguration; -import reactor.core.config.ReactorConfiguration; -import reactor.core.support.NamedDaemonThreadFactory; -import reactor.fn.Consumer; -import reactor.fn.Function; -import reactor.fn.tuple.Tuple; -import reactor.fn.tuple.Tuple2; -import reactor.io.buffer.Buffer; -import reactor.io.codec.Codec; -import reactor.io.net.ChannelStream; -import reactor.io.net.NetStreams; -import reactor.io.net.NetStreams.TcpClientFactory; -import reactor.io.net.ReactorChannelHandler; -import reactor.io.net.Reconnect; -import reactor.io.net.Spec.TcpClientSpec; -import reactor.io.net.config.ClientSocketOptions; -import reactor.io.net.impl.netty.NettyClientSocketOptions; -import reactor.io.net.impl.netty.tcp.NettyTcpClient; -import reactor.io.net.tcp.TcpClient; -import reactor.rx.Promise; -import reactor.rx.Promises; -import reactor.rx.Stream; -import reactor.rx.Streams; -import reactor.rx.action.Signal; - -import org.springframework.messaging.Message; -import org.springframework.messaging.tcp.ReconnectStrategy; -import org.springframework.messaging.tcp.TcpConnectionHandler; -import org.springframework.messaging.tcp.TcpOperations; -import org.springframework.util.Assert; -import org.springframework.util.ReflectionUtils; -import org.springframework.util.concurrent.ListenableFuture; - -/** - * An implementation of {@link org.springframework.messaging.tcp.TcpOperations} - * based on the TCP client support of the Reactor project. - * - *

This implementation wraps N (Reactor) clients for N {@link #connect} calls, - * i.e. a separate (Reactor) client instance for each connection. - * - * @author Rossen Stoyanchev - * @author Stephane Maldini - * @since 4.2 - */ -public class Reactor2TcpClient

implements TcpOperations

{ - - @SuppressWarnings("rawtypes") - public static final Class REACTOR_TCP_CLIENT_TYPE = NettyTcpClient.class; - - private static final Method eventLoopGroupMethod = initEventLoopGroupMethod(); - - - private final EventLoopGroup eventLoopGroup; - - private final Environment environment; - - private final TcpClientFactory, Message

> tcpClientSpecFactory; - - private final List, Message

>> tcpClients = - new ArrayList<>(); - - private boolean stopping; - - - /** - * A constructor that creates a {@link TcpClientSpec TcpClientSpec} factory - * with a default {@link reactor.core.dispatch.SynchronousDispatcher}, i.e. - * relying on Netty threads. The number of Netty threads can be tweaked with - * the {@code reactor.tcp.ioThreadCount} System property. The network I/O - * threads will be shared amongst the active clients. - *

Also see the constructor accepting a ready Reactor - * {@link TcpClientSpec} {@link Function} factory. - * @param host the host to connect to - * @param port the port to connect to - * @param codec the codec to use for encoding and decoding the TCP stream - */ - public Reactor2TcpClient(final String host, final int port, final Codec, Message

> codec) { - // Reactor 2.0.5 requires NioEventLoopGroup vs 2.0.6+ requires EventLoopGroup - final NioEventLoopGroup nioEventLoopGroup = initEventLoopGroup(); - this.eventLoopGroup = nioEventLoopGroup; - this.environment = new Environment(new SynchronousDispatcherConfigReader()); - - this.tcpClientSpecFactory = new TcpClientFactory, Message

>() { - @Override - public TcpClientSpec, Message

> apply(TcpClientSpec, Message

> spec) { - return spec - .env(environment) - .codec(codec) - .connect(host, port) - .options(createClientSocketOptions()); - } - - private ClientSocketOptions createClientSocketOptions() { - return (ClientSocketOptions) ReflectionUtils.invokeMethod(eventLoopGroupMethod, - new NettyClientSocketOptions(), nioEventLoopGroup); - } - }; - } - - /** - * A constructor with a pre-configured {@link TcpClientSpec} {@link Function} - * factory. This might be used to add SSL or specific network parameters to - * the generated client configuration. - *

NOTE: if the client is configured with a thread-creating - * dispatcher, you are responsible for cleaning them, e.g. using - * {@link reactor.core.Dispatcher#shutdown}. - * @param tcpClientSpecFactory the TcpClientSpec {@link Function} to use for each client creation - */ - public Reactor2TcpClient(TcpClientFactory, Message

> tcpClientSpecFactory) { - Assert.notNull(tcpClientSpecFactory, "'tcpClientClientFactory' must not be null"); - this.tcpClientSpecFactory = tcpClientSpecFactory; - this.eventLoopGroup = null; - this.environment = null; - } - - - private static NioEventLoopGroup initEventLoopGroup() { - int ioThreadCount; - try { - ioThreadCount = Integer.parseInt(System.getProperty("reactor.tcp.ioThreadCount")); - } - catch (Throwable ex) { - ioThreadCount = -1; - } - if (ioThreadCount <= 0) { - ioThreadCount = Runtime.getRuntime().availableProcessors(); - } - return new NioEventLoopGroup(ioThreadCount, new NamedDaemonThreadFactory("reactor-tcp-io")); - } - - - @Override - public ListenableFuture connect(final TcpConnectionHandler

connectionHandler) { - Assert.notNull(connectionHandler, "TcpConnectionHandler must not be null"); - - final TcpClient, Message

> tcpClient; - final Runnable cleanupTask; - synchronized (this.tcpClients) { - if (this.stopping) { - IllegalStateException ex = new IllegalStateException("Shutting down."); - connectionHandler.afterConnectFailure(ex); - return new PassThroughPromiseToListenableFutureAdapter<>(Promises.error(ex)); - } - tcpClient = NetStreams.tcpClient(REACTOR_TCP_CLIENT_TYPE, this.tcpClientSpecFactory); - this.tcpClients.add(tcpClient); - cleanupTask = new Runnable() { - @Override - public void run() { - synchronized (tcpClients) { - tcpClients.remove(tcpClient); - } - } - }; - } - - Promise promise = tcpClient.start( - new MessageChannelStreamHandler<>(connectionHandler, cleanupTask)); - - return new PassThroughPromiseToListenableFutureAdapter<>( - promise.onError(new Consumer() { - @Override - public void accept(Throwable ex) { - cleanupTask.run(); - connectionHandler.afterConnectFailure(ex); - } - }) - ); - } - - @Override - public ListenableFuture connect(TcpConnectionHandler

connectionHandler, ReconnectStrategy strategy) { - Assert.notNull(connectionHandler, "TcpConnectionHandler must not be null"); - Assert.notNull(strategy, "ReconnectStrategy must not be null"); - - final TcpClient, Message

> tcpClient; - Runnable cleanupTask; - synchronized (this.tcpClients) { - if (this.stopping) { - IllegalStateException ex = new IllegalStateException("Shutting down."); - connectionHandler.afterConnectFailure(ex); - return new PassThroughPromiseToListenableFutureAdapter<>(Promises.error(ex)); - } - tcpClient = NetStreams.tcpClient(REACTOR_TCP_CLIENT_TYPE, this.tcpClientSpecFactory); - this.tcpClients.add(tcpClient); - cleanupTask = new Runnable() { - @Override - public void run() { - synchronized (tcpClients) { - tcpClients.remove(tcpClient); - } - } - }; - } - - Stream> stream = tcpClient.start( - new MessageChannelStreamHandler<>(connectionHandler, cleanupTask), - new ReactorReconnectAdapter(strategy)); - - return new PassThroughPromiseToListenableFutureAdapter<>(stream.next().after()); - } - - @Override - public ListenableFuture shutdown() { - synchronized (this.tcpClients) { - this.stopping = true; - } - - Promise promise = Streams.from(this.tcpClients) - .flatMap(new Function, Message

>, Promise>() { - @Override - public Promise apply(final TcpClient, Message

> client) { - return client.shutdown().onComplete(new Consumer>() { - @Override - public void accept(Promise voidPromise) { - tcpClients.remove(client); - } - }); - } - }) - .next(); - - if (this.eventLoopGroup != null) { - final Promise eventLoopPromise = Promises.prepare(); - promise.onComplete(new Consumer>() { - @Override - public void accept(Promise voidPromise) { - eventLoopGroup.shutdownGracefully().addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (future.isSuccess()) { - eventLoopPromise.onComplete(); - } - else { - eventLoopPromise.onError(future.cause()); - } - } - }); - } - }); - promise = eventLoopPromise; - } - - if (this.environment != null) { - promise.onComplete(new Consumer>() { - @Override - public void accept(Promise voidPromise) { - environment.shutdown(); - } - }); - } - - return new PassThroughPromiseToListenableFutureAdapter<>(promise); - } - - - private static Method initEventLoopGroupMethod() { - for (Method method : NettyClientSocketOptions.class.getMethods()) { - if (method.getName().equals("eventLoopGroup") && method.getParameterCount() == 1) { - return method; - } - } - throw new IllegalStateException("No compatible Reactor version found"); - } - - - private static class SynchronousDispatcherConfigReader implements ConfigurationReader { - - @Override - public ReactorConfiguration read() { - return new ReactorConfiguration(Collections.emptyList(), "sync", new Properties()); - } - } - - - private static class MessageChannelStreamHandler

- implements ReactorChannelHandler, Message

, ChannelStream, Message

>> { - - private final TcpConnectionHandler

connectionHandler; - - private final Runnable cleanupTask; - - public MessageChannelStreamHandler(TcpConnectionHandler

connectionHandler, Runnable cleanupTask) { - this.connectionHandler = connectionHandler; - this.cleanupTask = cleanupTask; - } - - @Override - public Publisher apply(ChannelStream, Message

> channelStream) { - Promise closePromise = Promises.prepare(); - this.connectionHandler.afterConnected(new Reactor2TcpConnection<>(channelStream, closePromise)); - channelStream - .finallyDo(new Consumer>>() { - @Override - public void accept(Signal> signal) { - cleanupTask.run(); - if (signal.isOnError()) { - connectionHandler.handleFailure(signal.getThrowable()); - } - else if (signal.isOnComplete()) { - connectionHandler.afterConnectionClosed(); - } - } - }) - .consume(new Consumer>() { - @Override - public void accept(Message

message) { - connectionHandler.handleMessage(message); - } - }); - - return closePromise; - } - } - - - private static class ReactorReconnectAdapter implements Reconnect { - - private final ReconnectStrategy strategy; - - public ReactorReconnectAdapter(ReconnectStrategy strategy) { - this.strategy = strategy; - } - - @Override - public Tuple2 reconnect(InetSocketAddress address, int attempt) { - return Tuple.of(address, this.strategy.getTimeToNextAttempt(attempt)); - } - } - -} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java new file mode 100644 index 00000000000..0b93368e8ce --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java @@ -0,0 +1,264 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.tcp.reactor; + +import java.util.Collection; +import java.util.Objects; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.DefaultChannelGroup; +import io.netty.util.concurrent.ImmediateEventExecutor; +import org.reactivestreams.Publisher; +import reactor.core.publisher.DirectProcessor; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoProcessor; +import reactor.core.scheduler.Scheduler; +import reactor.ipc.netty.ChannelFutureMono; +import reactor.ipc.netty.NettyContext; +import reactor.ipc.netty.NettyInbound; +import reactor.ipc.netty.NettyOutbound; +import reactor.ipc.netty.options.ClientOptions; +import reactor.ipc.netty.tcp.TcpClient; +import reactor.util.concurrent.QueueSupplier; + +import org.springframework.messaging.Message; +import org.springframework.messaging.tcp.ReconnectStrategy; +import org.springframework.messaging.tcp.TcpConnection; +import org.springframework.messaging.tcp.TcpConnectionHandler; +import org.springframework.messaging.tcp.TcpOperations; +import org.springframework.util.Assert; +import org.springframework.util.concurrent.ListenableFuture; + +/** + * An implementation of {@link org.springframework.messaging.tcp.TcpOperations} + * based on the TCP client support of the Reactor project. + *

+ *

This implementation wraps N (Reactor) clients for N {@link #connect} calls, + * i.e. a separate (Reactor) client instance for each connection. + * + * @author Rossen Stoyanchev + * @author Stephane Maldini + * @since 4.2 + */ +public class ReactorNettyTcpClient

implements TcpOperations

{ + + private final TcpClient tcpClient; + + private final MessageHandlerConfiguration

configuration; + private final ChannelGroup group; + private volatile boolean stopping; + + /** + * A constructor that creates a {@link TcpClient TcpClient} factory relying on + * Reactor Netty TCP threads. The number of Netty threads can be tweaked with + * the {@code reactor.tcp.ioThreadCount} System property. The network I/O + * threads will be shared amongst the active clients. + *

Also see the constructor accepting a {@link Consumer} of + * {@link ClientOptions} for advanced tuning. + * + * @param host the host to connect to + * @param port the port to connect to + * @param configuration the client configuration + */ + public ReactorNettyTcpClient(String host, + int port, + MessageHandlerConfiguration

configuration) { + this.configuration = Objects.requireNonNull(configuration, "configuration"); + this.group = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); + this.tcpClient = TcpClient.create(options -> options.connect(host, port) + .channelGroup(group)); + } + + /** + * A constructor with a configurator {@link Consumer} that will receive default {@link + * ClientOptions} from {@link TcpClient}. This might be used to add SSL or specific + * network parameters to the generated client configuration. + * + * @param tcpOptions the {@link Consumer} of {@link ClientOptions} shared to use by + * connected handlers. + * @param configuration the client configuration + */ + public ReactorNettyTcpClient(Consumer tcpOptions, + MessageHandlerConfiguration

configuration) { + this.configuration = Objects.requireNonNull(configuration, "configuration"); + this.group = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); + this.tcpClient = + TcpClient.create(opts -> tcpOptions.accept(opts.channelGroup(group))); + } + + @Override + public ListenableFuture connect(final TcpConnectionHandler

connectionHandler) { + Assert.notNull(connectionHandler, "TcpConnectionHandler must not be null"); + if (stopping) { + IllegalStateException ex = new IllegalStateException("Shutting down."); + connectionHandler.afterConnectFailure(ex); + return new MonoToListenableFutureAdapter<>(Mono.error(ex)); + } + + MessageHandler

handler = + new MessageHandler<>(connectionHandler, configuration); + + Mono promise = tcpClient.newHandler(handler) + .doOnError(connectionHandler::afterConnectFailure) + .then(); + + return new MonoToListenableFutureAdapter<>(promise); + } + + @Override + public ListenableFuture connect(TcpConnectionHandler

connectionHandler, + ReconnectStrategy strategy) { + Assert.notNull(connectionHandler, "TcpConnectionHandler must not be null"); + Assert.notNull(strategy, "ReconnectStrategy must not be null"); + + if (stopping) { + IllegalStateException ex = new IllegalStateException("Shutting down."); + connectionHandler.afterConnectFailure(ex); + return new MonoToListenableFutureAdapter<>(Mono.error(ex)); + } + + MessageHandler

handler = + new MessageHandler<>(connectionHandler, configuration); + + MonoProcessor promise = MonoProcessor.create(); + + tcpClient.newHandler(handler) + .doOnNext(e -> { + if (!promise.isTerminated()) { + promise.onComplete(); + } + }) + .doOnError(e -> { + if (!promise.isTerminated()) { + promise.onError(e); + } + }) + .then(NettyContext::onClose) + .retryWhen(new Reconnector<>(strategy)) + .repeatWhen(new Reconnector<>(strategy)) + .subscribe(); + + return new MonoToListenableFutureAdapter<>(promise); + } + + @Override + public ListenableFuture shutdown() { + if (stopping) { + return new MonoToListenableFutureAdapter<>(Mono.empty()); + } + + stopping = true; + + Mono closing = ChannelFutureMono.from(group.close()); + + if (configuration.scheduler != null) { + closing = + closing.doAfterTerminate((x, e) -> configuration.scheduler.shutdown()); + } + + return new MonoToListenableFutureAdapter<>(closing); + } + + /** + * A configuration holder + */ + public static final class MessageHandlerConfiguration

{ + + private final Function>> decoder; + private final BiConsumer> encoder; + private final int backlog; + private final Scheduler + scheduler; + + public MessageHandlerConfiguration(Function>> decoder, + BiConsumer> encoder, + int backlog, + Scheduler scheduler) { + this.decoder = decoder; + this.encoder = encoder; + this.backlog = backlog > 0 ? backlog : QueueSupplier.SMALL_BUFFER_SIZE; + this.scheduler = scheduler; + } + } + + private static final class MessageHandler

implements BiFunction> { + + private final TcpConnectionHandler

connectionHandler; + + private final MessageHandlerConfiguration

configuration; + + MessageHandler(TcpConnectionHandler

connectionHandler, + MessageHandlerConfiguration

configuration) { + this.connectionHandler = connectionHandler; + this.configuration = configuration; + } + + @Override + public Publisher apply(NettyInbound in, NettyOutbound out) { + Flux>> inbound = in.receive() + .map(configuration.decoder); + + DirectProcessor promise = DirectProcessor.create(); + TcpConnection

tcpConnection = new ReactorNettyTcpConnection<>(in, + out, + configuration.encoder, + promise); + + if (configuration.scheduler != null) { + configuration.scheduler.schedule(() -> connectionHandler.afterConnected( + tcpConnection)); + inbound = + inbound.publishOn(configuration.scheduler, configuration.backlog); + } + else { + connectionHandler.afterConnected(tcpConnection); + } + + inbound.flatMapIterable(Function.identity()) + .subscribe(connectionHandler::handleMessage, + connectionHandler::handleFailure, + connectionHandler::afterConnectionClosed); + + return promise; + } + + } + + static final class Reconnector implements Function, Publisher> { + + private final ReconnectStrategy strategy; + + Reconnector(ReconnectStrategy strategy) { + this.strategy = strategy; + } + + @Override + public Publisher apply(Flux flux) { + return flux.scan(1, (p, e) -> p++) + .doOnCancel(() -> new Exception().printStackTrace()) + .flatMap(attempt -> Mono.delayMillis(strategy.getTimeToNextAttempt( + attempt))); + } + } + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpConnection.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java similarity index 57% rename from spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpConnection.java rename to spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java index 3ef2de416d3..7fe758e842c 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpConnection.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java @@ -16,10 +16,13 @@ package org.springframework.messaging.tcp.reactor; -import reactor.io.net.ChannelStream; -import reactor.rx.Promise; -import reactor.rx.Promises; -import reactor.rx.Streams; +import java.util.function.BiConsumer; + +import io.netty.buffer.ByteBuf; +import reactor.core.publisher.DirectProcessor; +import reactor.core.publisher.Mono; +import reactor.ipc.netty.NettyInbound; +import reactor.ipc.netty.NettyOutbound; import org.springframework.messaging.Message; import org.springframework.messaging.tcp.TcpConnection; @@ -29,45 +32,50 @@ import org.springframework.util.concurrent.ListenableFuture; * An implementation of {@link org.springframework.messaging.tcp.TcpConnection * TcpConnection} based on the TCP client support of the Reactor project. * + * @param

the payload type of messages read or written to the TCP stream. + * * @author Rossen Stoyanchev * @since 4.2 - * @param

the payload type of messages read or written to the TCP stream. */ -public class Reactor2TcpConnection

implements TcpConnection

{ +public class ReactorNettyTcpConnection

implements TcpConnection

{ - private final ChannelStream, Message

> channelStream; + private final NettyInbound in; + private final NettyOutbound out; + private final DirectProcessor close; + private final BiConsumer> encoder; - private final Promise closePromise; - - - public Reactor2TcpConnection(ChannelStream, Message

> channelStream, Promise closePromise) { - this.channelStream = channelStream; - this.closePromise = closePromise; + public ReactorNettyTcpConnection(NettyInbound in, + NettyOutbound out, + BiConsumer> encoder, + DirectProcessor close) { + this.out = out; + this.in = in; + this.encoder = encoder; + this.close = close; } - @Override public ListenableFuture send(Message

message) { - Promise afterWrite = Promises.prepare(); - this.channelStream.writeWith(Streams.just(message)).subscribe(afterWrite); - return new PassThroughPromiseToListenableFutureAdapter<>(afterWrite); + ByteBuf byteBuf = in.channel().alloc().buffer(); + encoder.accept(byteBuf, message); + return new MonoToListenableFutureAdapter<>(out.send(Mono.just(byteBuf))); } @Override @SuppressWarnings("deprecation") public void onReadInactivity(Runnable runnable, long inactivityDuration) { - this.channelStream.on().readIdle(inactivityDuration, reactor.fn.Functions.consumer(runnable)); + in.onReadIdle(inactivityDuration, runnable); } @Override @SuppressWarnings("deprecation") public void onWriteInactivity(Runnable runnable, long inactivityDuration) { - this.channelStream.on().writeIdle(inactivityDuration, reactor.fn.Functions.consumer(runnable)); + out.onWriteIdle(inactivityDuration, runnable); } @Override public void close() { - this.closePromise.onComplete(); + close.onComplete(); } } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/Reactor2TcpStompClientTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/ReactorNettyTcpStompClientTests.java similarity index 95% rename from spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/Reactor2TcpStompClientTests.java rename to spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/ReactorNettyTcpStompClientTests.java index 94bfce4be33..bdfe762af13 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/Reactor2TcpStompClientTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/ReactorNettyTcpStompClientTests.java @@ -43,20 +43,20 @@ import static org.hamcrest.Matchers.*; import static org.junit.Assert.*; /** - * Integration tests for {@link Reactor2TcpStompClient}. + * Integration tests for {@link ReactorNettyTcpStompClient}. * * @author Rossen Stoyanchev */ -public class Reactor2TcpStompClientTests { +public class ReactorNettyTcpStompClientTests { - private static final Log logger = LogFactory.getLog(Reactor2TcpStompClientTests.class); + private static final Log logger = LogFactory.getLog(ReactorNettyTcpStompClientTests.class); @Rule public final TestName testName = new TestName(); private BrokerService activeMQBroker; - private Reactor2TcpStompClient client; + private ReactorNettyTcpStompClient client; @Before @@ -78,7 +78,7 @@ public class Reactor2TcpStompClientTests { ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); taskScheduler.afterPropertiesSet(); - this.client = new Reactor2TcpStompClient("127.0.0.1", port); + this.client = new ReactorNettyTcpStompClient("127.0.0.1", port); this.client.setMessageConverter(new StringMessageConverter()); this.client.setTaskScheduler(taskScheduler); } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompCodecTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompDecoderTests.java similarity index 70% rename from spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompCodecTests.java rename to spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompDecoderTests.java index 78d34d40847..98816ef77fc 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompCodecTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompDecoderTests.java @@ -17,7 +17,7 @@ package org.springframework.messaging.simp.stomp; import java.io.UnsupportedEncodingException; -import java.util.ArrayList; +import java.nio.ByteBuffer; import java.util.List; import org.junit.Test; @@ -26,22 +26,18 @@ import org.springframework.messaging.Message; import org.springframework.messaging.simp.SimpMessageType; import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.InvalidMimeTypeException; -import reactor.fn.Consumer; -import reactor.fn.Function; -import reactor.io.buffer.Buffer; import static org.junit.Assert.*; /** - * Test fixture for {@link Reactor2StompCodec}. + * Test fixture for {@link StompDecoder}. * * @author Andy Wilkinson + * @author Stephane Maldini */ -public class StompCodecTests { +public class StompDecoderTests { - private final ArgumentCapturingConsumer> consumer = new ArgumentCapturingConsumer<>(); - - private final Function> decoder = new Reactor2StompCodec().decoder(consumer); + private final StompDecoder decoder = new StompDecoder(); @Test public void decodeFrameWithCrLfEols() { @@ -172,11 +168,9 @@ public class StompCodecTests { public void decodeMultipleFramesFromSameBuffer() { String frame1 = "SEND\ndestination:test\n\nThe body of the message\0"; String frame2 = "DISCONNECT\n\n\0"; + ByteBuffer buffer = ByteBuffer.wrap((frame1 + frame2).getBytes()); - Buffer buffer = Buffer.wrap(frame1 + frame2); - - final List> messages = new ArrayList<>(); - new Reactor2StompCodec().decoder(messages::add).apply(buffer); + final List> messages = decoder.decode(buffer); assertEquals(2, messages.size()); assertEquals(StompCommand.SEND, StompHeaderAccessor.wrap(messages.get(0)).getCommand()); @@ -245,102 +239,33 @@ public class StompCodecTests { public void decodeHeartbeat() { String frame = "\n"; - Buffer buffer = Buffer.wrap(frame); + ByteBuffer buffer = ByteBuffer.wrap(frame.getBytes()); - final List> messages = new ArrayList<>(); - new Reactor2StompCodec().decoder(messages::add).apply(buffer); + final List> messages = decoder.decode(buffer); assertEquals(1, messages.size()); assertEquals(SimpMessageType.HEARTBEAT, StompHeaderAccessor.wrap(messages.get(0)).getMessageType()); } - @Test - public void encodeFrameWithNoHeadersAndNoBody() { - StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT); - - Message frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders()); - - assertEquals("DISCONNECT\n\n\0", new Reactor2StompCodec().encoder().apply(frame).asString()); - } - - @Test - public void encodeFrameWithHeaders() { - StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT); - headers.setAcceptVersion("1.2"); - headers.setHost("github.org"); - - Message frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders()); - - String frameString = new Reactor2StompCodec().encoder().apply(frame).asString(); - - assertTrue(frameString.equals("CONNECT\naccept-version:1.2\nhost:github.org\n\n\0") || - frameString.equals("CONNECT\nhost:github.org\naccept-version:1.2\n\n\0")); - } - - @Test - public void encodeFrameWithHeadersThatShouldBeEscaped() { - StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT); - headers.addNativeHeader("a:\r\n\\b", "alpha:bravo\r\n\\"); - - Message frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders()); - - assertEquals("DISCONNECT\na\\c\\r\\n\\\\b:alpha\\cbravo\\r\\n\\\\\n\n\0", - new Reactor2StompCodec().encoder().apply(frame).asString()); - } - - @Test - public void encodeFrameWithHeadersBody() { - StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND); - headers.addNativeHeader("a", "alpha"); - - Message frame = MessageBuilder.createMessage("Message body".getBytes(), headers.getMessageHeaders()); - - assertEquals("SEND\na:alpha\ncontent-length:12\n\nMessage body\0", - new Reactor2StompCodec().encoder().apply(frame).asString()); - } - - @Test - public void encodeFrameWithContentLengthPresent() { - StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND); - headers.setContentLength(12); - - Message frame = MessageBuilder.createMessage("Message body".getBytes(), headers.getMessageHeaders()); - - assertEquals("SEND\ncontent-length:12\n\nMessage body\0", - new Reactor2StompCodec().encoder().apply(frame).asString()); - } - private void assertIncompleteDecode(String partialFrame) { - Buffer buffer = Buffer.wrap(partialFrame); + ByteBuffer buffer = ByteBuffer.wrap(partialFrame.getBytes()); assertNull(decode(buffer)); assertEquals(0, buffer.position()); } private Message decode(String stompFrame) { - Buffer buffer = Buffer.wrap(stompFrame); + ByteBuffer buffer = ByteBuffer.wrap(stompFrame.getBytes()); return decode(buffer); } - private Message decode(Buffer buffer) { - this.decoder.apply(buffer); - if (consumer.arguments.isEmpty()) { + private Message decode(ByteBuffer buffer) { + List> messages = this.decoder.decode(buffer); + if (messages.isEmpty()) { return null; } else { - return consumer.arguments.get(0); + return messages.get(0); } } - - - private static final class ArgumentCapturingConsumer implements Consumer { - - private final List arguments = new ArrayList<>(); - - @Override - public void accept(T t) { - arguments.add(t); - } - - } } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompEncoderTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompEncoderTests.java new file mode 100644 index 00000000000..ef6565cdd3e --- /dev/null +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompEncoderTests.java @@ -0,0 +1,93 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.simp.stomp; + +import org.junit.Test; + +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Test fixture for {@link StompEncoder}. + * + * @author Andy Wilkinson + * @author Stephane Maldini + */ +public class StompEncoderTests { + + private final StompEncoder encoder = new StompEncoder(); + + @Test + public void encodeFrameWithNoHeadersAndNoBody() { + StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT); + + Message frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders()); + + assertEquals("DISCONNECT\n\n\0", new String(encoder.encode(frame))); + } + + @Test + public void encodeFrameWithHeaders() { + StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT); + headers.setAcceptVersion("1.2"); + headers.setHost("github.org"); + + Message frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders()); + + String frameString = new String(encoder.encode(frame)); + + assertTrue("CONNECT\naccept-version:1.2\nhost:github.org\n\n\0".equals(frameString) || "CONNECT\nhost:github.org\naccept-version:1.2\n\n\0".equals( + frameString)); + } + + @Test + public void encodeFrameWithHeadersThatShouldBeEscaped() { + StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT); + headers.addNativeHeader("a:\r\n\\b", "alpha:bravo\r\n\\"); + + Message frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders()); + + assertEquals("DISCONNECT\na\\c\\r\\n\\\\b:alpha\\cbravo\\r\\n\\\\\n\n\0", + new String(encoder.encode(frame))); + } + + @Test + public void encodeFrameWithHeadersBody() { + StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND); + headers.addNativeHeader("a", "alpha"); + + Message frame = MessageBuilder.createMessage("Message body".getBytes(), headers.getMessageHeaders()); + + assertEquals("SEND\na:alpha\ncontent-length:12\n\nMessage body\0", + new String(encoder.encode(frame))); + } + + @Test + public void encodeFrameWithContentLengthPresent() { + StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND); + headers.setContentLength(12); + + Message frame = MessageBuilder.createMessage("Message body".getBytes(), headers.getMessageHeaders()); + + assertEquals("SEND\ncontent-length:12\n\nMessage body\0", + new String(encoder.encode(frame))); + } + +} From 85c93f5d672b53db029f246a06a6cb9cc3ae0f0f Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Wed, 30 Nov 2016 17:46:29 -0500 Subject: [PATCH 2/3] Polish Reactor Netty TCP client support --- .../stomp/ReactorNettyTcpStompClient.java | 28 ++- ...AbstractMonoToListenableFutureAdapter.java | 64 +++---- .../MonoToListenableFutureAdapter.java | 9 +- .../tcp/reactor/ReactorNettyTcpClient.java | 179 +++++++++--------- .../reactor/ReactorNettyTcpConnection.java | 37 ++-- .../simp/stomp/StompDecoderTests.java | 5 +- .../simp/stomp/StompEncoderTests.java | 18 +- 7 files changed, 165 insertions(+), 175 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/ReactorNettyTcpStompClient.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/ReactorNettyTcpStompClient.java index 64d072f9eda..ae8f1c3f0b7 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/ReactorNettyTcpStompClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/ReactorNettyTcpStompClient.java @@ -29,11 +29,10 @@ import org.springframework.messaging.tcp.reactor.ReactorNettyTcpClient; import org.springframework.util.concurrent.ListenableFuture; /** - * A STOMP over TCP client that uses - * {@link ReactorNettyTcpClient}. + * A STOMP over TCP client that uses {@link ReactorNettyTcpClient}. * * @author Rossen Stoyanchev - * @since 4.2 + * @since 5.0 */ public class ReactorNettyTcpStompClient extends StompClientSupport { @@ -99,25 +98,21 @@ public class ReactorNettyTcpStompClient extends StompClientSupport { * Create a new {@link ReactorNettyTcpClient} with Stomp specific configuration for * encoding, decoding and hand-off. * - * @param relayHost target host - * @param relayPort target port + * @param host target host + * @param port target port * @param decoder {@link StompDecoder} to use * @return a new {@link TcpOperations} */ - protected static TcpOperations create(String relayHost, - int relayPort, - StompDecoder decoder) { - return new ReactorNettyTcpClient<>(relayHost, - relayPort, - new ReactorNettyTcpClient.MessageHandlerConfiguration<>(new DecodingFunction( - decoder), + protected static TcpOperations create(String host, int port, StompDecoder decoder) { + return new ReactorNettyTcpClient<>(host, port, + new ReactorNettyTcpClient.MessageHandlerConfiguration<>( + new DecodingFunction(decoder), new EncodingConsumer(new StompEncoder()), 128, Schedulers.newParallel("StompClient"))); } - private static final class EncodingConsumer - implements BiConsumer> { + private static final class EncodingConsumer implements BiConsumer> { private final StompEncoder encoder; @@ -127,12 +122,11 @@ public class ReactorNettyTcpStompClient extends StompClientSupport { @Override public void accept(ByteBuf byteBuf, Message message) { - byteBuf.writeBytes(encoder.encode(message)); + byteBuf.writeBytes(this.encoder.encode(message)); } } - private static final class DecodingFunction - implements Function>> { + private static final class DecodingFunction implements Function>> { private final StompDecoder decoder; diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractMonoToListenableFutureAdapter.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractMonoToListenableFutureAdapter.java index 2b5ab32d479..8b6cf5bcf74 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractMonoToListenableFutureAdapter.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractMonoToListenableFutureAdapter.java @@ -17,7 +17,6 @@ package org.springframework.messaging.tcp.reactor; import java.time.Duration; -import java.util.Objects; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -33,51 +32,53 @@ import org.springframework.util.concurrent.ListenableFutureCallbackRegistry; import org.springframework.util.concurrent.SuccessCallback; /** - * Adapts a reactor {@link Mono} to {@link ListenableFuture} optionally converting - * the result Object type {@code } to the expected target type {@code }. + * Adapts {@link Mono} to {@link ListenableFuture} optionally converting the + * result Object type {@code } to the expected target type {@code }. * * @author Rossen Stoyanchev - * @since 4.0 + * @since 5.0 * @param the type of object expected from the {@link Mono} * @param the type of object expected from the {@link ListenableFuture} */ -abstract class AbstractMonoToListenableFutureAdapter - implements ListenableFuture { +abstract class AbstractMonoToListenableFutureAdapter implements ListenableFuture { - private final MonoProcessor promise; + private final MonoProcessor monoProcessor; private final ListenableFutureCallbackRegistry registry = new ListenableFutureCallbackRegistry<>(); - protected AbstractMonoToListenableFutureAdapter(Mono promise) { - Assert.notNull(promise, "Mono must not be null"); - this.promise = promise.doOnSuccess(result -> { - T adapted; - try { - adapted = adapt(result); - } - catch (Throwable ex) { - registry.failure(ex); - return; - } - registry.success(adapted); - }) - .doOnError(registry::failure) - .subscribe(); + + protected AbstractMonoToListenableFutureAdapter(Mono mono) { + Assert.notNull(mono, "'mono' must not be null"); + this.monoProcessor = mono + .doOnSuccess(result -> { + T adapted; + try { + adapted = adapt(result); + } + catch (Throwable ex) { + registry.failure(ex); + return; + } + registry.success(adapted); + }) + .doOnError(this.registry::failure) + .subscribe(); } @Override public T get() throws InterruptedException { - S result = this.promise.block(); + S result = this.monoProcessor.block(); return adapt(result); } @Override - public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - Objects.requireNonNull(unit, "unit"); - S result = this.promise.block(Duration.ofMillis(TimeUnit.MILLISECONDS.convert( - timeout, - unit))); + public T get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + + Assert.notNull(unit); + Duration duration = Duration.ofMillis(TimeUnit.MILLISECONDS.convert(timeout, unit)); + S result = this.monoProcessor.block(duration); return adapt(result); } @@ -86,18 +87,18 @@ abstract class AbstractMonoToListenableFutureAdapter if (isCancelled()) { return false; } - this.promise.cancel(); + this.monoProcessor.cancel(); return true; } @Override public boolean isCancelled() { - return this.promise.isCancelled(); + return this.monoProcessor.isCancelled(); } @Override public boolean isDone() { - return this.promise.isTerminated(); + return this.monoProcessor.isTerminated(); } @Override @@ -111,7 +112,6 @@ abstract class AbstractMonoToListenableFutureAdapter this.registry.addFailureCallback(failureCallback); } - protected abstract T adapt(S result); } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/MonoToListenableFutureAdapter.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/MonoToListenableFutureAdapter.java index 8ecab4a974d..ad82d685a33 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/MonoToListenableFutureAdapter.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/MonoToListenableFutureAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2015 the original author or authors. + * Copyright 2002-2016 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,15 +19,14 @@ package org.springframework.messaging.tcp.reactor; import reactor.core.publisher.Mono; /** - * A Mono-to-ListenableFutureAdapter where the source and the target from + * A Mono-to-ListenableFuture adapter where the source and the target from * the Promise and the ListenableFuture respectively are of the same type. * * @author Rossen Stoyanchev * @author Stephane Maldini - * @since 4.0 + * @since 5.0 */ -class MonoToListenableFutureAdapter extends - AbstractMonoToListenableFutureAdapter { +class MonoToListenableFutureAdapter extends AbstractMonoToListenableFutureAdapter { public MonoToListenableFutureAdapter(Mono mono) { diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java index 0b93368e8ce..75df13921b8 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java @@ -17,7 +17,6 @@ package org.springframework.messaging.tcp.reactor; import java.util.Collection; -import java.util.Objects; import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -58,16 +57,19 @@ import org.springframework.util.concurrent.ListenableFuture; * * @author Rossen Stoyanchev * @author Stephane Maldini - * @since 4.2 + * @since 5.0 */ public class ReactorNettyTcpClient

implements TcpOperations

{ - private final TcpClient tcpClient; + private final TcpClient tcpClient; private final MessageHandlerConfiguration

configuration; - private final ChannelGroup group; + + private final ChannelGroup group; + private volatile boolean stopping; + /** * A constructor that creates a {@link TcpClient TcpClient} factory relying on * Reactor Netty TCP threads. The number of Netty threads can be tweaked with @@ -80,120 +82,116 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ * @param port the port to connect to * @param configuration the client configuration */ - public ReactorNettyTcpClient(String host, - int port, - MessageHandlerConfiguration

configuration) { - this.configuration = Objects.requireNonNull(configuration, "configuration"); - this.group = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); - this.tcpClient = TcpClient.create(options -> options.connect(host, port) - .channelGroup(group)); + public ReactorNettyTcpClient(String host, int port, MessageHandlerConfiguration

configuration) { + this(opts -> opts.connect(host, port), configuration); } /** - * A constructor with a configurator {@link Consumer} that will receive default {@link - * ClientOptions} from {@link TcpClient}. This might be used to add SSL or specific - * network parameters to the generated client configuration. + * A constructor with a configurator {@link Consumer} that will receive + * default {@link ClientOptions} from {@link TcpClient}. This might be used + * to add SSL or specific network parameters to the generated client + * configuration. * - * @param tcpOptions the {@link Consumer} of {@link ClientOptions} shared to use by - * connected handlers. + * @param tcpOptions callback for configuring shared {@link ClientOptions} * @param configuration the client configuration */ public ReactorNettyTcpClient(Consumer tcpOptions, MessageHandlerConfiguration

configuration) { - this.configuration = Objects.requireNonNull(configuration, "configuration"); + + Assert.notNull(configuration, "'configuration' is required"); this.group = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); - this.tcpClient = - TcpClient.create(opts -> tcpOptions.accept(opts.channelGroup(group))); + this.tcpClient = TcpClient.create(opts -> tcpOptions.accept(opts.channelGroup(group))); + this.configuration = configuration; } + @Override - public ListenableFuture connect(final TcpConnectionHandler

connectionHandler) { - Assert.notNull(connectionHandler, "TcpConnectionHandler must not be null"); - if (stopping) { + public ListenableFuture connect(final TcpConnectionHandler

handler) { + Assert.notNull(handler, "'handler' is required"); + + if (this.stopping) { IllegalStateException ex = new IllegalStateException("Shutting down."); - connectionHandler.afterConnectFailure(ex); + handler.afterConnectFailure(ex); return new MonoToListenableFutureAdapter<>(Mono.error(ex)); } - MessageHandler

handler = - new MessageHandler<>(connectionHandler, configuration); - - Mono promise = tcpClient.newHandler(handler) - .doOnError(connectionHandler::afterConnectFailure) - .then(); + Mono connectMono = this.tcpClient + .newHandler(new MessageHandler<>(handler, this.configuration)) + .doOnError(handler::afterConnectFailure) + .then(); - return new MonoToListenableFutureAdapter<>(promise); + return new MonoToListenableFutureAdapter<>(connectMono); } @Override - public ListenableFuture connect(TcpConnectionHandler

connectionHandler, - ReconnectStrategy strategy) { - Assert.notNull(connectionHandler, "TcpConnectionHandler must not be null"); - Assert.notNull(strategy, "ReconnectStrategy must not be null"); + public ListenableFuture connect(TcpConnectionHandler

handler, ReconnectStrategy strategy) { + Assert.notNull(handler, "'handler' is required"); + Assert.notNull(strategy, "'reconnectStrategy' is required"); - if (stopping) { + if (this.stopping) { IllegalStateException ex = new IllegalStateException("Shutting down."); - connectionHandler.afterConnectFailure(ex); + handler.afterConnectFailure(ex); return new MonoToListenableFutureAdapter<>(Mono.error(ex)); } - MessageHandler

handler = - new MessageHandler<>(connectionHandler, configuration); - - MonoProcessor promise = MonoProcessor.create(); - - tcpClient.newHandler(handler) - .doOnNext(e -> { - if (!promise.isTerminated()) { - promise.onComplete(); - } - }) - .doOnError(e -> { - if (!promise.isTerminated()) { - promise.onError(e); - } - }) - .then(NettyContext::onClose) - .retryWhen(new Reconnector<>(strategy)) - .repeatWhen(new Reconnector<>(strategy)) - .subscribe(); - - return new MonoToListenableFutureAdapter<>(promise); + MonoProcessor connectMono = MonoProcessor.create(); + + this.tcpClient.newHandler(new MessageHandler<>(handler, this.configuration)) + .doOnNext(item -> { + if (!connectMono.isTerminated()) { + connectMono.onComplete(); + } + }) + .doOnError(ex -> { + if (!connectMono.isTerminated()) { + connectMono.onError(ex); + } + }) + .then(NettyContext::onClose) + .retryWhen(new Reconnector<>(strategy)) + .repeatWhen(new Reconnector<>(strategy)) + .subscribe(); + + return new MonoToListenableFutureAdapter<>(connectMono); } @Override public ListenableFuture shutdown() { - if (stopping) { + if (this.stopping) { return new MonoToListenableFutureAdapter<>(Mono.empty()); } - stopping = true; + this.stopping = true; - Mono closing = ChannelFutureMono.from(group.close()); + Mono completion = ChannelFutureMono.from(this.group.close()); - if (configuration.scheduler != null) { - closing = - closing.doAfterTerminate((x, e) -> configuration.scheduler.shutdown()); + if (this.configuration.scheduler != null) { + completion = completion.doAfterTerminate((x, e) -> configuration.scheduler.shutdown()); } - return new MonoToListenableFutureAdapter<>(closing); + return new MonoToListenableFutureAdapter<>(completion); } + /** * A configuration holder */ public static final class MessageHandlerConfiguration

{ private final Function>> decoder; - private final BiConsumer> encoder; - private final int backlog; - private final Scheduler - scheduler; - public MessageHandlerConfiguration(Function>> decoder, + private final BiConsumer> encoder; + + private final int backlog; + + private final Scheduler scheduler; + + + public MessageHandlerConfiguration( + Function>> decoder, BiConsumer> encoder, - int backlog, - Scheduler scheduler) { + int backlog, Scheduler scheduler) { + this.decoder = decoder; this.encoder = encoder; this.backlog = backlog > 0 ? backlog : QueueSupplier.SMALL_BUFFER_SIZE; @@ -201,34 +199,30 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ } } - private static final class MessageHandler

implements BiFunction> { + private static final class MessageHandler

+ implements BiFunction> { private final TcpConnectionHandler

connectionHandler; private final MessageHandlerConfiguration

configuration; - MessageHandler(TcpConnectionHandler

connectionHandler, - MessageHandlerConfiguration

configuration) { - this.connectionHandler = connectionHandler; - this.configuration = configuration; + + MessageHandler(TcpConnectionHandler

handler, MessageHandlerConfiguration

config) { + this.connectionHandler = handler; + this.configuration = config; } @Override public Publisher apply(NettyInbound in, NettyOutbound out) { - Flux>> inbound = in.receive() - .map(configuration.decoder); + Flux>> inbound = in.receive().map(configuration.decoder); - DirectProcessor promise = DirectProcessor.create(); - TcpConnection

tcpConnection = new ReactorNettyTcpConnection<>(in, - out, - configuration.encoder, - promise); + DirectProcessor closeProcessor = DirectProcessor.create(); + TcpConnection

tcpConnection = + new ReactorNettyTcpConnection<>(in, out, configuration.encoder, closeProcessor); if (configuration.scheduler != null) { - configuration.scheduler.schedule(() -> connectionHandler.afterConnected( - tcpConnection)); - inbound = - inbound.publishOn(configuration.scheduler, configuration.backlog); + configuration.scheduler.schedule(() -> connectionHandler.afterConnected(tcpConnection)); + inbound = inbound.publishOn(configuration.scheduler, configuration.backlog); } else { connectionHandler.afterConnected(tcpConnection); @@ -239,15 +233,16 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ connectionHandler::handleFailure, connectionHandler::afterConnectionClosed); - return promise; + return closeProcessor; } } - static final class Reconnector implements Function, Publisher> { + private static final class Reconnector implements Function, Publisher> { private final ReconnectStrategy strategy; + Reconnector(ReconnectStrategy strategy) { this.strategy = strategy; } @@ -255,9 +250,7 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ @Override public Publisher apply(Flux flux) { return flux.scan(1, (p, e) -> p++) - .doOnCancel(() -> new Exception().printStackTrace()) - .flatMap(attempt -> Mono.delayMillis(strategy.getTimeToNextAttempt( - attempt))); + .flatMap(attempt -> Mono.delayMillis(strategy.getTimeToNextAttempt(attempt))); } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java index 7fe758e842c..6f4ec74ecba 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java @@ -35,47 +35,52 @@ import org.springframework.util.concurrent.ListenableFuture; * @param

the payload type of messages read or written to the TCP stream. * * @author Rossen Stoyanchev - * @since 4.2 + * @since 5.0 */ public class ReactorNettyTcpConnection

implements TcpConnection

{ - private final NettyInbound in; - private final NettyOutbound out; - private final DirectProcessor close; + private final NettyInbound inbound; + + private final NettyOutbound outbound; + + private final DirectProcessor closeProcessor; + private final BiConsumer> encoder; - public ReactorNettyTcpConnection(NettyInbound in, - NettyOutbound out, + + public ReactorNettyTcpConnection(NettyInbound inbound, NettyOutbound outbound, BiConsumer> encoder, - DirectProcessor close) { - this.out = out; - this.in = in; + DirectProcessor closeProcessor) { + + this.inbound = inbound; + this.outbound = outbound; this.encoder = encoder; - this.close = close; + this.closeProcessor = closeProcessor; } + @Override public ListenableFuture send(Message

message) { - ByteBuf byteBuf = in.channel().alloc().buffer(); - encoder.accept(byteBuf, message); - return new MonoToListenableFutureAdapter<>(out.send(Mono.just(byteBuf))); + ByteBuf byteBuf = this.inbound.channel().alloc().buffer(); + this.encoder.accept(byteBuf, message); + return new MonoToListenableFutureAdapter<>(this.outbound.send(Mono.just(byteBuf))); } @Override @SuppressWarnings("deprecation") public void onReadInactivity(Runnable runnable, long inactivityDuration) { - in.onReadIdle(inactivityDuration, runnable); + this.inbound.onReadIdle(inactivityDuration, runnable); } @Override @SuppressWarnings("deprecation") public void onWriteInactivity(Runnable runnable, long inactivityDuration) { - out.onWriteIdle(inactivityDuration, runnable); + this.outbound.onWriteIdle(inactivityDuration, runnable); } @Override public void close() { - close.onComplete(); + this.closeProcessor.onComplete(); } } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompDecoderTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompDecoderTests.java index 98816ef77fc..f48c6f6d031 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompDecoderTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompDecoderTests.java @@ -24,10 +24,10 @@ import org.junit.Test; import org.springframework.messaging.Message; import org.springframework.messaging.simp.SimpMessageType; -import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.InvalidMimeTypeException; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; /** * Test fixture for {@link StompDecoder}. @@ -39,6 +39,7 @@ public class StompDecoderTests { private final StompDecoder decoder = new StompDecoder(); + @Test public void decodeFrameWithCrLfEols() { Message frame = decode("DISCONNECT\r\n\r\n\0"); diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompEncoderTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompEncoderTests.java index ef6565cdd3e..9a218c911df 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompEncoderTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompEncoderTests.java @@ -34,10 +34,10 @@ public class StompEncoderTests { private final StompEncoder encoder = new StompEncoder(); + @Test public void encodeFrameWithNoHeadersAndNoBody() { StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT); - Message frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders()); assertEquals("DISCONNECT\n\n\0", new String(encoder.encode(frame))); @@ -48,20 +48,18 @@ public class StompEncoderTests { StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT); headers.setAcceptVersion("1.2"); headers.setHost("github.org"); - Message frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders()); - String frameString = new String(encoder.encode(frame)); - assertTrue("CONNECT\naccept-version:1.2\nhost:github.org\n\n\0".equals(frameString) || "CONNECT\nhost:github.org\naccept-version:1.2\n\n\0".equals( - frameString)); + assertTrue( + "CONNECT\naccept-version:1.2\nhost:github.org\n\n\0".equals(frameString) || + "CONNECT\nhost:github.org\naccept-version:1.2\n\n\0".equals(frameString)); } @Test public void encodeFrameWithHeadersThatShouldBeEscaped() { StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT); headers.addNativeHeader("a:\r\n\\b", "alpha:bravo\r\n\\"); - Message frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders()); assertEquals("DISCONNECT\na\\c\\r\\n\\\\b:alpha\\cbravo\\r\\n\\\\\n\n\0", @@ -72,8 +70,8 @@ public class StompEncoderTests { public void encodeFrameWithHeadersBody() { StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND); headers.addNativeHeader("a", "alpha"); - - Message frame = MessageBuilder.createMessage("Message body".getBytes(), headers.getMessageHeaders()); + Message frame = MessageBuilder.createMessage( + "Message body".getBytes(), headers.getMessageHeaders()); assertEquals("SEND\na:alpha\ncontent-length:12\n\nMessage body\0", new String(encoder.encode(frame))); @@ -83,8 +81,8 @@ public class StompEncoderTests { public void encodeFrameWithContentLengthPresent() { StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND); headers.setContentLength(12); - - Message frame = MessageBuilder.createMessage("Message body".getBytes(), headers.getMessageHeaders()); + Message frame = MessageBuilder.createMessage( + "Message body".getBytes(), headers.getMessageHeaders()); assertEquals("SEND\ncontent-length:12\n\nMessage body\0", new String(encoder.encode(frame))); From b874692452d63c61b25ccad591662a7e1e269e56 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Wed, 30 Nov 2016 21:11:19 -0500 Subject: [PATCH 3/3] Simplify ReactorNettyTcpClient input Create a ReactorNettyCodec to hold the decoding and encoding function and consumer along with a package-private sub-class that delegates to StompDecoder and StompEncoder. Issue: SPR-14531 --- .../stomp/ReactorNettyTcpStompClient.java | 61 +----------- .../stomp/StompBrokerRelayMessageHandler.java | 5 +- .../simp/stomp/StompReactorNettyCodec.java | 42 +++++++++ .../tcp/reactor/ReactorNettyCodec.java | 58 ++++++++++++ .../tcp/reactor/ReactorNettyTcpClient.java | 92 +++++++------------ 5 files changed, 141 insertions(+), 117 deletions(-) create mode 100644 spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompReactorNettyCodec.java create mode 100644 spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyCodec.java diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/ReactorNettyTcpStompClient.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/ReactorNettyTcpStompClient.java index ae8f1c3f0b7..958556196af 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/ReactorNettyTcpStompClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/ReactorNettyTcpStompClient.java @@ -16,16 +16,9 @@ package org.springframework.messaging.simp.stomp; -import java.util.List; -import java.util.function.BiConsumer; -import java.util.function.Function; - -import io.netty.buffer.ByteBuf; -import reactor.core.scheduler.Schedulers; - -import org.springframework.messaging.Message; import org.springframework.messaging.tcp.TcpOperations; import org.springframework.messaging.tcp.reactor.ReactorNettyTcpClient; +import org.springframework.util.Assert; import org.springframework.util.concurrent.ListenableFuture; /** @@ -38,6 +31,7 @@ public class ReactorNettyTcpStompClient extends StompClientSupport { private final TcpOperations tcpClient; + /** * Create an instance with host "127.0.0.1" and port 61613. */ @@ -45,14 +39,13 @@ public class ReactorNettyTcpStompClient extends StompClientSupport { this("127.0.0.1", 61613); } - /** * Create an instance with the given host and port. * @param host the host * @param port the port */ - public ReactorNettyTcpStompClient(final String host, final int port) { - this.tcpClient = create(host, port, new StompDecoder()); + public ReactorNettyTcpStompClient(String host, int port) { + this.tcpClient = new ReactorNettyTcpClient(host, port, new StompReactorNettyCodec()); } /** @@ -60,6 +53,7 @@ public class ReactorNettyTcpStompClient extends StompClientSupport { * @param tcpClient the client to use */ public ReactorNettyTcpStompClient(TcpOperations tcpClient) { + Assert.notNull(tcpClient, "'tcpClient' is required"); this.tcpClient = tcpClient; } @@ -94,49 +88,4 @@ public class ReactorNettyTcpStompClient extends StompClientSupport { this.tcpClient.shutdown(); } - /** - * Create a new {@link ReactorNettyTcpClient} with Stomp specific configuration for - * encoding, decoding and hand-off. - * - * @param host target host - * @param port target port - * @param decoder {@link StompDecoder} to use - * @return a new {@link TcpOperations} - */ - protected static TcpOperations create(String host, int port, StompDecoder decoder) { - return new ReactorNettyTcpClient<>(host, port, - new ReactorNettyTcpClient.MessageHandlerConfiguration<>( - new DecodingFunction(decoder), - new EncodingConsumer(new StompEncoder()), - 128, - Schedulers.newParallel("StompClient"))); - } - - private static final class EncodingConsumer implements BiConsumer> { - - private final StompEncoder encoder; - - public EncodingConsumer(StompEncoder encoder) { - this.encoder = encoder; - } - - @Override - public void accept(ByteBuf byteBuf, Message message) { - byteBuf.writeBytes(this.encoder.encode(message)); - } - } - - private static final class DecodingFunction implements Function>> { - - private final StompDecoder decoder; - - public DecodingFunction(StompDecoder decoder) { - this.decoder = decoder; - } - - @Override - public List> apply(ByteBuf buffer) { - return this.decoder.decode(buffer.nioBuffer()); - } - } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java index b8b9c98a989..9f0dd380763 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java @@ -40,6 +40,7 @@ import org.springframework.messaging.tcp.FixedIntervalReconnectStrategy; import org.springframework.messaging.tcp.TcpConnection; import org.springframework.messaging.tcp.TcpConnectionHandler; import org.springframework.messaging.tcp.TcpOperations; +import org.springframework.messaging.tcp.reactor.ReactorNettyCodec; import org.springframework.messaging.tcp.reactor.ReactorNettyTcpClient; import org.springframework.util.Assert; import org.springframework.util.concurrent.ListenableFuture; @@ -387,8 +388,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler if (this.tcpClient == null) { StompDecoder decoder = new StompDecoder(); decoder.setHeaderInitializer(getHeaderInitializer()); - - this.tcpClient = ReactorNettyTcpStompClient.create(this.relayHost, this.relayPort, decoder); + ReactorNettyCodec codec = new StompReactorNettyCodec(decoder); + this.tcpClient = new ReactorNettyTcpClient<>(this.relayHost, this.relayPort, codec); } if (logger.isInfoEnabled()) { diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompReactorNettyCodec.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompReactorNettyCodec.java new file mode 100644 index 00000000000..8b36d4bce40 --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompReactorNettyCodec.java @@ -0,0 +1,42 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.messaging.simp.stomp; + +import org.springframework.messaging.tcp.reactor.ReactorNettyCodec; + +/** + * {@code ReactorNettyCodec} that delegates to {@link StompDecoder} and + * {@link StompEncoder}. + * + * @author Rossen Stoyanchev + * @since 5.0 + */ +class StompReactorNettyCodec extends ReactorNettyCodec { + + public StompReactorNettyCodec() { + this(new StompDecoder(), new StompEncoder()); + } + + public StompReactorNettyCodec(StompDecoder decoder) { + this(decoder, new StompEncoder()); + } + + public StompReactorNettyCodec(StompDecoder decoder, StompEncoder encoder) { + super(byteBuf -> decoder.decode(byteBuf.nioBuffer()), + (byteBuf, message) -> byteBuf.writeBytes(encoder.encode(message))); + } + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyCodec.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyCodec.java new file mode 100644 index 00000000000..3254882a94b --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyCodec.java @@ -0,0 +1,58 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.messaging.tcp.reactor; + +import java.util.Collection; +import java.util.function.BiConsumer; +import java.util.function.Function; + +import io.netty.buffer.ByteBuf; + +import org.springframework.messaging.Message; +import org.springframework.util.Assert; + +/** + * Simple holder for a decoding {@link Function} and an encoding + * {@link BiConsumer} to use with Reactor Netty. + * + * @author Rossen Stoyanchev + * @since 5.0 + */ +public class ReactorNettyCodec

{ + + private final Function>> decoder; + + private final BiConsumer> encoder; + + + public ReactorNettyCodec(Function>> decoder, + BiConsumer> encoder) { + + Assert.notNull(decoder, "'decoder' is required"); + Assert.notNull(encoder, "'encoder' is required"); + this.decoder = decoder; + this.encoder = encoder; + } + + public Function>> getDecoder() { + return this.decoder; + } + + public BiConsumer> getEncoder() { + return this.encoder; + } + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java index 75df13921b8..a53c02e0456 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java @@ -17,12 +17,10 @@ package org.springframework.messaging.tcp.reactor; import java.util.Collection; -import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; -import io.netty.buffer.ByteBuf; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.ImmediateEventExecutor; @@ -32,6 +30,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; import reactor.ipc.netty.ChannelFutureMono; import reactor.ipc.netty.NettyContext; import reactor.ipc.netty.NettyInbound; @@ -63,7 +62,9 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ private final TcpClient tcpClient; - private final MessageHandlerConfiguration

configuration; + private final ReactorNettyCodec

codec; + + private final Scheduler scheduler = Schedulers.newParallel("ReactorNettyTcpClient"); private final ChannelGroup group; @@ -76,14 +77,14 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ * the {@code reactor.tcp.ioThreadCount} System property. The network I/O * threads will be shared amongst the active clients. *

Also see the constructor accepting a {@link Consumer} of - * {@link ClientOptions} for advanced tuning. + * {@link ClientOptions} for additional options. * * @param host the host to connect to * @param port the port to connect to - * @param configuration the client configuration + * @param codec for encoding and decoding messages */ - public ReactorNettyTcpClient(String host, int port, MessageHandlerConfiguration

configuration) { - this(opts -> opts.connect(host, port), configuration); + public ReactorNettyTcpClient(String host, int port, ReactorNettyCodec

codec) { + this(opts -> opts.connect(host, port), codec); } /** @@ -93,15 +94,15 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ * configuration. * * @param tcpOptions callback for configuring shared {@link ClientOptions} - * @param configuration the client configuration + * @param codec for encoding and decoding messages */ public ReactorNettyTcpClient(Consumer tcpOptions, - MessageHandlerConfiguration

configuration) { + ReactorNettyCodec

codec) { - Assert.notNull(configuration, "'configuration' is required"); + Assert.notNull(codec, "'codec' is required"); this.group = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); this.tcpClient = TcpClient.create(opts -> tcpOptions.accept(opts.channelGroup(group))); - this.configuration = configuration; + this.codec = codec; } @@ -116,7 +117,7 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ } Mono connectMono = this.tcpClient - .newHandler(new MessageHandler<>(handler, this.configuration)) + .newHandler(new MessageHandler<>(handler, this.codec, this.scheduler)) .doOnError(handler::afterConnectFailure) .then(); @@ -136,7 +137,7 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ MonoProcessor connectMono = MonoProcessor.create(); - this.tcpClient.newHandler(new MessageHandler<>(handler, this.configuration)) + this.tcpClient.newHandler(new MessageHandler<>(handler, this.codec, this.scheduler)) .doOnNext(item -> { if (!connectMono.isTerminated()) { connectMono.onComplete(); @@ -163,75 +164,48 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ this.stopping = true; - Mono completion = ChannelFutureMono.from(this.group.close()); - - if (this.configuration.scheduler != null) { - completion = completion.doAfterTerminate((x, e) -> configuration.scheduler.shutdown()); - } + Mono completion = ChannelFutureMono.from(this.group.close()) + .doAfterTerminate((x, e) -> this.scheduler.shutdown()); return new MonoToListenableFutureAdapter<>(completion); } - /** - * A configuration holder - */ - public static final class MessageHandlerConfiguration

{ - - private final Function>> decoder; - - private final BiConsumer> encoder; - - private final int backlog; - - private final Scheduler scheduler; - - - public MessageHandlerConfiguration( - Function>> decoder, - BiConsumer> encoder, - int backlog, Scheduler scheduler) { - - this.decoder = decoder; - this.encoder = encoder; - this.backlog = backlog > 0 ? backlog : QueueSupplier.SMALL_BUFFER_SIZE; - this.scheduler = scheduler; - } - } - private static final class MessageHandler

implements BiFunction> { private final TcpConnectionHandler

connectionHandler; - private final MessageHandlerConfiguration

configuration; + private final ReactorNettyCodec

codec; + private final Scheduler scheduler; + + + MessageHandler(TcpConnectionHandler

handler, ReactorNettyCodec

codec, + Scheduler scheduler) { - MessageHandler(TcpConnectionHandler

handler, MessageHandlerConfiguration

config) { this.connectionHandler = handler; - this.configuration = config; + this.codec = codec; + this.scheduler = scheduler; } @Override public Publisher apply(NettyInbound in, NettyOutbound out) { - Flux>> inbound = in.receive().map(configuration.decoder); + Flux>> inbound = in.receive().map(this.codec.getDecoder()); DirectProcessor closeProcessor = DirectProcessor.create(); + TcpConnection

tcpConnection = - new ReactorNettyTcpConnection<>(in, out, configuration.encoder, closeProcessor); + new ReactorNettyTcpConnection<>(in, out, this.codec.getEncoder(), closeProcessor); - if (configuration.scheduler != null) { - configuration.scheduler.schedule(() -> connectionHandler.afterConnected(tcpConnection)); - inbound = inbound.publishOn(configuration.scheduler, configuration.backlog); - } - else { - connectionHandler.afterConnected(tcpConnection); - } + this.scheduler.schedule(() -> connectionHandler.afterConnected(tcpConnection)); + inbound = inbound.publishOn(this.scheduler, QueueSupplier.SMALL_BUFFER_SIZE); inbound.flatMapIterable(Function.identity()) - .subscribe(connectionHandler::handleMessage, - connectionHandler::handleFailure, - connectionHandler::afterConnectionClosed); + .subscribe( + connectionHandler::handleMessage, + connectionHandler::handleFailure, + connectionHandler::afterConnectionClosed); return closeProcessor; }