Browse Source

Merge pull request #1250 from origin/update-stomp-reactor-netty

pull/1253/head
Rossen Stoyanchev 9 years ago
parent
commit
d41d3aa1d6
  1. 15
      build.gradle
  2. 107
      spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/Reactor2StompCodec.java
  3. 150
      spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/Reactor2TcpStompClient.java
  4. 91
      spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/ReactorNettyTcpStompClient.java
  5. 18
      spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java
  6. 42
      spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompReactorNettyCodec.java
  7. 82
      spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractMonoToListenableFutureAdapter.java
  8. 15
      spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/MonoToListenableFutureAdapter.java
  9. 362
      spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java
  10. 58
      spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyCodec.java
  11. 231
      spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java
  12. 49
      spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java
  13. 10
      spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/ReactorNettyTcpStompClientTests.java
  14. 108
      spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompDecoderTests.java
  15. 91
      spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompEncoderTests.java

15
build.gradle

@ -78,7 +78,6 @@ configure(allprojects) { project -> @@ -78,7 +78,6 @@ configure(allprojects) { project ->
ext.protobufVersion = "3.1.0"
ext.quartzVersion = "2.2.3"
ext.reactivestreamsVersion = "1.0.0"
ext.reactorVersion = "2.0.8.RELEASE"
ext.reactorCoreVersion = '3.0.3.RELEASE'
ext.reactorNettyVersion = '0.6.0.BUILD-SNAPSHOT'
ext.romeVersion = "1.7.0"
@ -578,12 +577,8 @@ project("spring-messaging") { @@ -578,12 +577,8 @@ project("spring-messaging") {
compile(project(":spring-core"))
compile(project(":spring-context"))
optional(project(":spring-oxm"))
optional("io.projectreactor:reactor-core:${reactorVersion}") {
force = true // enforce 2.0.x
}
optional("io.projectreactor:reactor-net:${reactorVersion}") {
exclude group: "io.netty", module: "netty-all"
}
optional("io.projectreactor:reactor-core:${reactorCoreVersion}")
optional("io.projectreactor.ipc:reactor-netty:${reactorNettyVersion}")
optional("io.netty:netty-all:${nettyVersion}")
optional("org.eclipse.jetty.websocket:websocket-server:${jettyVersion}") {
exclude group: "javax.servlet", module: "javax.servlet-api"
@ -1003,10 +998,8 @@ project("spring-websocket") { @@ -1003,10 +998,8 @@ project("spring-websocket") {
optional("com.fasterxml.jackson.core:jackson-databind:${jackson2Version}")
testCompile("org.apache.tomcat.embed:tomcat-embed-core:${tomcatVersion}")
testCompile("org.apache.tomcat.embed:tomcat-embed-websocket:${tomcatVersion}")
testCompile("io.projectreactor:reactor-core:${reactorVersion}") {
force = true // enforce 2.0.x
}
testCompile("io.projectreactor:reactor-net:${reactorVersion}")
testCompile("io.projectreactor:reactor-core:${reactorCoreVersion}")
testCompile("io.projectreactor.ipc:reactor-netty:${reactorNettyVersion}")
testCompile("io.netty:netty-all:${nettyVersion}")
testCompile("org.slf4j:slf4j-jcl:${slf4jVersion}")
testRuntime("org.jboss.xnio:xnio-nio:${xnioVersion}")

107
spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/Reactor2StompCodec.java

@ -1,107 +0,0 @@ @@ -1,107 +0,0 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.stomp;
import java.nio.ByteBuffer;
import reactor.fn.Consumer;
import reactor.fn.Function;
import reactor.io.buffer.Buffer;
import reactor.io.codec.Codec;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
/**
* A Reactor TCP {@link Codec} for sending and receiving STOMP messages.
*
* @author Andy Wilkinson
* @author Rossen Stoyanchev
* @since 4.0
*/
public class Reactor2StompCodec extends Codec<Buffer, Message<byte[]>, Message<byte[]>> {
private final Function<Message<byte[]>, Buffer> encodingFunction;
private final StompDecoder stompDecoder;
public Reactor2StompCodec() {
this(new StompEncoder(), new StompDecoder());
}
public Reactor2StompCodec(StompEncoder encoder, StompDecoder decoder) {
Assert.notNull(encoder, "StompEncoder is required");
Assert.notNull(decoder, "StompDecoder is required");
this.encodingFunction = new EncodingFunction(encoder);
this.stompDecoder = decoder;
}
@Override
public Function<Buffer, Message<byte[]>> decoder(final Consumer<Message<byte[]>> messageConsumer) {
return new DecodingFunction(this.stompDecoder, messageConsumer);
}
@Override
public Function<Message<byte[]>, Buffer> encoder() {
return this.encodingFunction;
}
@Override
public Buffer apply(Message<byte[]> message) {
return this.encodingFunction.apply(message);
}
private static class EncodingFunction implements Function<Message<byte[]>, Buffer> {
private final StompEncoder encoder;
public EncodingFunction(StompEncoder encoder) {
this.encoder = encoder;
}
@Override
public Buffer apply(Message<byte[]> message) {
byte[] bytes = this.encoder.encode(message);
return new Buffer(ByteBuffer.wrap(bytes));
}
}
private static class DecodingFunction implements Function<Buffer, Message<byte[]>> {
private final StompDecoder decoder;
private final Consumer<Message<byte[]>> messageConsumer;
public DecodingFunction(StompDecoder decoder, Consumer<Message<byte[]>> next) {
this.decoder = decoder;
this.messageConsumer = next;
}
@Override
public Message<byte[]> apply(Buffer buffer) {
for (Message<byte[]> message : this.decoder.decode(buffer.byteBuffer())) {
this.messageConsumer.accept(message);
}
return null;
}
}
}

150
spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/Reactor2TcpStompClient.java

@ -1,150 +0,0 @@ @@ -1,150 +0,0 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.stomp;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import reactor.Environment;
import reactor.core.config.ConfigurationReader;
import reactor.core.config.DispatcherConfiguration;
import reactor.core.config.DispatcherType;
import reactor.core.config.ReactorConfiguration;
import reactor.io.net.NetStreams;
import reactor.io.net.Spec.TcpClientSpec;
import org.springframework.messaging.Message;
import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.messaging.tcp.reactor.Reactor2TcpClient;
import org.springframework.util.concurrent.ListenableFuture;
/**
* A STOMP over TCP client that uses
* {@link Reactor2TcpClient}.
*
* @author Rossen Stoyanchev
* @since 4.2
*/
public class Reactor2TcpStompClient extends StompClientSupport {
private final TcpOperations<byte[]> tcpClient;
/**
* Create an instance with host "127.0.0.1" and port 61613.
*/
public Reactor2TcpStompClient() {
this("127.0.0.1", 61613);
}
/**
* Create an instance with the given host and port.
* @param host the host
* @param port the port
*/
public Reactor2TcpStompClient(final String host, final int port) {
ConfigurationReader reader = new StompClientDispatcherConfigReader();
Environment environment = new Environment(reader).assignErrorJournal();
StompTcpClientSpecFactory factory = new StompTcpClientSpecFactory(environment, host, port);
this.tcpClient = new Reactor2TcpClient<>(factory);
}
/**
* Create an instance with a pre-configured TCP client.
* @param tcpClient the client to use
*/
public Reactor2TcpStompClient(TcpOperations<byte[]> tcpClient) {
this.tcpClient = tcpClient;
}
/**
* Connect and notify the given {@link StompSessionHandler} when connected
* on the STOMP level.
* @param handler the handler for the STOMP session
* @return ListenableFuture for access to the session when ready for use
*/
public ListenableFuture<StompSession> connect(StompSessionHandler handler) {
return connect(null, handler);
}
/**
* An overloaded version of {@link #connect(StompSessionHandler)} that
* accepts headers to use for the STOMP CONNECT frame.
* @param connectHeaders headers to add to the CONNECT frame
* @param handler the handler for the STOMP session
* @return ListenableFuture for access to the session when ready for use
*/
public ListenableFuture<StompSession> connect(StompHeaders connectHeaders, StompSessionHandler handler) {
ConnectionHandlingStompSession session = createSession(connectHeaders, handler);
this.tcpClient.connect(session);
return session.getSessionFuture();
}
/**
* Shut down the client and release resources.
*/
public void shutdown() {
this.tcpClient.shutdown();
}
/**
* A ConfigurationReader with a thread pool-based dispatcher.
*/
private static class StompClientDispatcherConfigReader implements ConfigurationReader {
@Override
public ReactorConfiguration read() {
String dispatcherName = "StompClient";
DispatcherType dispatcherType = DispatcherType.DISPATCHER_GROUP;
DispatcherConfiguration config = new DispatcherConfiguration(dispatcherName, dispatcherType, 128, 0);
List<DispatcherConfiguration> configList = Collections.<DispatcherConfiguration>singletonList(config);
return new ReactorConfiguration(configList, dispatcherName, new Properties());
}
}
private static class StompTcpClientSpecFactory
implements NetStreams.TcpClientFactory<Message<byte[]>, Message<byte[]>> {
private final Environment environment;
private final String host;
private final int port;
public StompTcpClientSpecFactory(Environment environment, String host, int port) {
this.environment = environment;
this.host = host;
this.port = port;
}
@Override
public TcpClientSpec<Message<byte[]>, Message<byte[]>> apply(
TcpClientSpec<Message<byte[]>, Message<byte[]>> tcpClientSpec) {
return tcpClientSpec
.codec(new Reactor2StompCodec(new StompEncoder(), new StompDecoder()))
.env(this.environment)
.dispatcher(this.environment.getCachedDispatchers("StompClient").get())
.connect(this.host, this.port);
}
}
}

91
spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/ReactorNettyTcpStompClient.java

@ -0,0 +1,91 @@ @@ -0,0 +1,91 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.stomp;
import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.messaging.tcp.reactor.ReactorNettyTcpClient;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
/**
* A STOMP over TCP client that uses {@link ReactorNettyTcpClient}.
*
* @author Rossen Stoyanchev
* @since 5.0
*/
public class ReactorNettyTcpStompClient extends StompClientSupport {
private final TcpOperations<byte[]> tcpClient;
/**
* Create an instance with host "127.0.0.1" and port 61613.
*/
public ReactorNettyTcpStompClient() {
this("127.0.0.1", 61613);
}
/**
* Create an instance with the given host and port.
* @param host the host
* @param port the port
*/
public ReactorNettyTcpStompClient(String host, int port) {
this.tcpClient = new ReactorNettyTcpClient<byte[]>(host, port, new StompReactorNettyCodec());
}
/**
* Create an instance with a pre-configured TCP client.
* @param tcpClient the client to use
*/
public ReactorNettyTcpStompClient(TcpOperations<byte[]> tcpClient) {
Assert.notNull(tcpClient, "'tcpClient' is required");
this.tcpClient = tcpClient;
}
/**
* Connect and notify the given {@link StompSessionHandler} when connected
* on the STOMP level.
* @param handler the handler for the STOMP session
* @return ListenableFuture for access to the session when ready for use
*/
public ListenableFuture<StompSession> connect(StompSessionHandler handler) {
return connect(null, handler);
}
/**
* An overloaded version of {@link #connect(StompSessionHandler)} that
* accepts headers to use for the STOMP CONNECT frame.
* @param connectHeaders headers to add to the CONNECT frame
* @param handler the handler for the STOMP session
* @return ListenableFuture for access to the session when ready for use
*/
public ListenableFuture<StompSession> connect(StompHeaders connectHeaders, StompSessionHandler handler) {
ConnectionHandlingStompSession session = createSession(connectHeaders, handler);
this.tcpClient.connect(session);
return session.getSessionFuture();
}
/**
* Shut down the client and release resources.
*/
public void shutdown() {
this.tcpClient.shutdown();
}
}

18
spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java

@ -40,7 +40,8 @@ import org.springframework.messaging.tcp.FixedIntervalReconnectStrategy; @@ -40,7 +40,8 @@ import org.springframework.messaging.tcp.FixedIntervalReconnectStrategy;
import org.springframework.messaging.tcp.TcpConnection;
import org.springframework.messaging.tcp.TcpConnectionHandler;
import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.messaging.tcp.reactor.Reactor2TcpClient;
import org.springframework.messaging.tcp.reactor.ReactorNettyCodec;
import org.springframework.messaging.tcp.reactor.ReactorNettyTcpClient;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
@ -335,7 +336,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -335,7 +336,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
/**
* Configure a TCP client for managing TCP connections to the STOMP broker.
* By default {@link Reactor2TcpClient} is used.
* By default {@link ReactorNettyTcpClient} is used.
*/
public void setTcpClient(TcpOperations<byte[]> tcpClient) {
this.tcpClient = tcpClient;
@ -387,8 +388,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -387,8 +388,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
if (this.tcpClient == null) {
StompDecoder decoder = new StompDecoder();
decoder.setHeaderInitializer(getHeaderInitializer());
Reactor2StompCodec codec = new Reactor2StompCodec(new StompEncoder(), decoder);
this.tcpClient = new StompTcpClientFactory().create(this.relayHost, this.relayPort, codec);
ReactorNettyCodec<byte[]> codec = new StompReactorNettyCodec(decoder);
this.tcpClient = new ReactorNettyTcpClient<>(this.relayHost, this.relayPort, codec);
}
if (logger.isInfoEnabled()) {
@ -970,15 +971,6 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -970,15 +971,6 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
}
private static class StompTcpClientFactory {
public TcpOperations<byte[]> create(String relayHost, int relayPort, Reactor2StompCodec codec) {
return new Reactor2TcpClient<>(relayHost, relayPort, codec);
}
}
private static class VoidCallable implements Callable<Void> {
@Override

42
spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompReactorNettyCodec.java

@ -0,0 +1,42 @@ @@ -0,0 +1,42 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.stomp;
import org.springframework.messaging.tcp.reactor.ReactorNettyCodec;
/**
* {@code ReactorNettyCodec} that delegates to {@link StompDecoder} and
* {@link StompEncoder}.
*
* @author Rossen Stoyanchev
* @since 5.0
*/
class StompReactorNettyCodec extends ReactorNettyCodec<byte[]> {
public StompReactorNettyCodec() {
this(new StompDecoder(), new StompEncoder());
}
public StompReactorNettyCodec(StompDecoder decoder) {
this(decoder, new StompEncoder());
}
public StompReactorNettyCodec(StompDecoder decoder, StompEncoder encoder) {
super(byteBuf -> decoder.decode(byteBuf.nioBuffer()),
(byteBuf, message) -> byteBuf.writeBytes(encoder.encode(message)));
}
}

82
spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractPromiseToListenableFutureAdapter.java → spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractMonoToListenableFutureAdapter.java

@ -16,12 +16,13 @@ @@ -16,12 +16,13 @@
package org.springframework.messaging.tcp.reactor;
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import reactor.fn.Consumer;
import reactor.rx.Promise;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.FailureCallback;
@ -31,77 +32,73 @@ import org.springframework.util.concurrent.ListenableFutureCallbackRegistry; @@ -31,77 +32,73 @@ import org.springframework.util.concurrent.ListenableFutureCallbackRegistry;
import org.springframework.util.concurrent.SuccessCallback;
/**
* Adapts a reactor {@link Promise} to {@link ListenableFuture} optionally converting
* the result Object type {@code <S>} to the expected target type {@code <T>}.
* Adapts {@link Mono} to {@link ListenableFuture} optionally converting the
* result Object type {@code <S>} to the expected target type {@code <T>}.
*
* @author Rossen Stoyanchev
* @since 4.0
* @param <S> the type of object expected from the {@link Promise}
* @since 5.0
* @param <S> the type of object expected from the {@link Mono}
* @param <T> the type of object expected from the {@link ListenableFuture}
*/
abstract class AbstractPromiseToListenableFutureAdapter<S, T> implements ListenableFuture<T> {
abstract class AbstractMonoToListenableFutureAdapter<S, T> implements ListenableFuture<T> {
private final Promise<S> promise;
private final MonoProcessor<S> monoProcessor;
private final ListenableFutureCallbackRegistry<T> registry = new ListenableFutureCallbackRegistry<>();
protected AbstractPromiseToListenableFutureAdapter(Promise<S> promise) {
Assert.notNull(promise, "Promise must not be null");
this.promise = promise;
this.promise.onSuccess(new Consumer<S>() {
@Override
public void accept(S result) {
T adapted;
try {
adapted = adapt(result);
}
catch (Throwable ex) {
registry.failure(ex);
return;
}
registry.success(adapted);
}
});
this.promise.onError(new Consumer<Throwable>() {
@Override
public void accept(Throwable ex) {
registry.failure(ex);
}
});
protected AbstractMonoToListenableFutureAdapter(Mono<S> mono) {
Assert.notNull(mono, "'mono' must not be null");
this.monoProcessor = mono
.doOnSuccess(result -> {
T adapted;
try {
adapted = adapt(result);
}
catch (Throwable ex) {
registry.failure(ex);
return;
}
registry.success(adapted);
})
.doOnError(this.registry::failure)
.subscribe();
}
@Override
public T get() throws InterruptedException {
S result = this.promise.await();
S result = this.monoProcessor.block();
return adapt(result);
}
@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
S result = this.promise.await(timeout, unit);
if (!this.promise.isComplete()) {
throw new TimeoutException();
}
public T get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
Assert.notNull(unit);
Duration duration = Duration.ofMillis(TimeUnit.MILLISECONDS.convert(timeout, unit));
S result = this.monoProcessor.block(duration);
return adapt(result);
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
if (isCancelled()) {
return false;
}
this.monoProcessor.cancel();
return true;
}
@Override
public boolean isCancelled() {
return false;
return this.monoProcessor.isCancelled();
}
@Override
public boolean isDone() {
return this.promise.isComplete();
return this.monoProcessor.isTerminated();
}
@Override
@ -115,7 +112,6 @@ abstract class AbstractPromiseToListenableFutureAdapter<S, T> implements Listena @@ -115,7 +112,6 @@ abstract class AbstractPromiseToListenableFutureAdapter<S, T> implements Listena
this.registry.addFailureCallback(failureCallback);
}
protected abstract T adapt(S result);
}

15
spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/PassThroughPromiseToListenableFutureAdapter.java → spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/MonoToListenableFutureAdapter.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,20 +16,21 @@ @@ -16,20 +16,21 @@
package org.springframework.messaging.tcp.reactor;
import reactor.rx.Promise;
import reactor.core.publisher.Mono;
/**
* A Promise-to-ListenableFutureAdapter where the source and the target from
* A Mono-to-ListenableFuture adapter where the source and the target from
* the Promise and the ListenableFuture respectively are of the same type.
*
* @author Rossen Stoyanchev
* @since 4.0
* @author Stephane Maldini
* @since 5.0
*/
class PassThroughPromiseToListenableFutureAdapter<T> extends AbstractPromiseToListenableFutureAdapter<T, T> {
class MonoToListenableFutureAdapter<T> extends AbstractMonoToListenableFutureAdapter<T, T> {
public PassThroughPromiseToListenableFutureAdapter(Promise<T> promise) {
super(promise);
public MonoToListenableFutureAdapter(Mono<T> mono) {
super(mono);
}

362
spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java

@ -1,362 +0,0 @@ @@ -1,362 +0,0 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.tcp.reactor;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import org.reactivestreams.Publisher;
import reactor.Environment;
import reactor.core.config.ConfigurationReader;
import reactor.core.config.DispatcherConfiguration;
import reactor.core.config.ReactorConfiguration;
import reactor.core.support.NamedDaemonThreadFactory;
import reactor.fn.Consumer;
import reactor.fn.Function;
import reactor.fn.tuple.Tuple;
import reactor.fn.tuple.Tuple2;
import reactor.io.buffer.Buffer;
import reactor.io.codec.Codec;
import reactor.io.net.ChannelStream;
import reactor.io.net.NetStreams;
import reactor.io.net.NetStreams.TcpClientFactory;
import reactor.io.net.ReactorChannelHandler;
import reactor.io.net.Reconnect;
import reactor.io.net.Spec.TcpClientSpec;
import reactor.io.net.config.ClientSocketOptions;
import reactor.io.net.impl.netty.NettyClientSocketOptions;
import reactor.io.net.impl.netty.tcp.NettyTcpClient;
import reactor.io.net.tcp.TcpClient;
import reactor.rx.Promise;
import reactor.rx.Promises;
import reactor.rx.Stream;
import reactor.rx.Streams;
import reactor.rx.action.Signal;
import org.springframework.messaging.Message;
import org.springframework.messaging.tcp.ReconnectStrategy;
import org.springframework.messaging.tcp.TcpConnectionHandler;
import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.concurrent.ListenableFuture;
/**
* An implementation of {@link org.springframework.messaging.tcp.TcpOperations}
* based on the TCP client support of the Reactor project.
*
* <p>This implementation wraps N (Reactor) clients for N {@link #connect} calls,
* i.e. a separate (Reactor) client instance for each connection.
*
* @author Rossen Stoyanchev
* @author Stephane Maldini
* @since 4.2
*/
public class Reactor2TcpClient<P> implements TcpOperations<P> {
@SuppressWarnings("rawtypes")
public static final Class<NettyTcpClient> REACTOR_TCP_CLIENT_TYPE = NettyTcpClient.class;
private static final Method eventLoopGroupMethod = initEventLoopGroupMethod();
private final EventLoopGroup eventLoopGroup;
private final Environment environment;
private final TcpClientFactory<Message<P>, Message<P>> tcpClientSpecFactory;
private final List<TcpClient<Message<P>, Message<P>>> tcpClients =
new ArrayList<>();
private boolean stopping;
/**
* A constructor that creates a {@link TcpClientSpec TcpClientSpec} factory
* with a default {@link reactor.core.dispatch.SynchronousDispatcher}, i.e.
* relying on Netty threads. The number of Netty threads can be tweaked with
* the {@code reactor.tcp.ioThreadCount} System property. The network I/O
* threads will be shared amongst the active clients.
* <p>Also see the constructor accepting a ready Reactor
* {@link TcpClientSpec} {@link Function} factory.
* @param host the host to connect to
* @param port the port to connect to
* @param codec the codec to use for encoding and decoding the TCP stream
*/
public Reactor2TcpClient(final String host, final int port, final Codec<Buffer, Message<P>, Message<P>> codec) {
// Reactor 2.0.5 requires NioEventLoopGroup vs 2.0.6+ requires EventLoopGroup
final NioEventLoopGroup nioEventLoopGroup = initEventLoopGroup();
this.eventLoopGroup = nioEventLoopGroup;
this.environment = new Environment(new SynchronousDispatcherConfigReader());
this.tcpClientSpecFactory = new TcpClientFactory<Message<P>, Message<P>>() {
@Override
public TcpClientSpec<Message<P>, Message<P>> apply(TcpClientSpec<Message<P>, Message<P>> spec) {
return spec
.env(environment)
.codec(codec)
.connect(host, port)
.options(createClientSocketOptions());
}
private ClientSocketOptions createClientSocketOptions() {
return (ClientSocketOptions) ReflectionUtils.invokeMethod(eventLoopGroupMethod,
new NettyClientSocketOptions(), nioEventLoopGroup);
}
};
}
/**
* A constructor with a pre-configured {@link TcpClientSpec} {@link Function}
* factory. This might be used to add SSL or specific network parameters to
* the generated client configuration.
* <p><strong>NOTE:</strong> if the client is configured with a thread-creating
* dispatcher, you are responsible for cleaning them, e.g. using
* {@link reactor.core.Dispatcher#shutdown}.
* @param tcpClientSpecFactory the TcpClientSpec {@link Function} to use for each client creation
*/
public Reactor2TcpClient(TcpClientFactory<Message<P>, Message<P>> tcpClientSpecFactory) {
Assert.notNull(tcpClientSpecFactory, "'tcpClientClientFactory' must not be null");
this.tcpClientSpecFactory = tcpClientSpecFactory;
this.eventLoopGroup = null;
this.environment = null;
}
private static NioEventLoopGroup initEventLoopGroup() {
int ioThreadCount;
try {
ioThreadCount = Integer.parseInt(System.getProperty("reactor.tcp.ioThreadCount"));
}
catch (Throwable ex) {
ioThreadCount = -1;
}
if (ioThreadCount <= 0) {
ioThreadCount = Runtime.getRuntime().availableProcessors();
}
return new NioEventLoopGroup(ioThreadCount, new NamedDaemonThreadFactory("reactor-tcp-io"));
}
@Override
public ListenableFuture<Void> connect(final TcpConnectionHandler<P> connectionHandler) {
Assert.notNull(connectionHandler, "TcpConnectionHandler must not be null");
final TcpClient<Message<P>, Message<P>> tcpClient;
final Runnable cleanupTask;
synchronized (this.tcpClients) {
if (this.stopping) {
IllegalStateException ex = new IllegalStateException("Shutting down.");
connectionHandler.afterConnectFailure(ex);
return new PassThroughPromiseToListenableFutureAdapter<>(Promises.<Void>error(ex));
}
tcpClient = NetStreams.tcpClient(REACTOR_TCP_CLIENT_TYPE, this.tcpClientSpecFactory);
this.tcpClients.add(tcpClient);
cleanupTask = new Runnable() {
@Override
public void run() {
synchronized (tcpClients) {
tcpClients.remove(tcpClient);
}
}
};
}
Promise<Void> promise = tcpClient.start(
new MessageChannelStreamHandler<>(connectionHandler, cleanupTask));
return new PassThroughPromiseToListenableFutureAdapter<>(
promise.onError(new Consumer<Throwable>() {
@Override
public void accept(Throwable ex) {
cleanupTask.run();
connectionHandler.afterConnectFailure(ex);
}
})
);
}
@Override
public ListenableFuture<Void> connect(TcpConnectionHandler<P> connectionHandler, ReconnectStrategy strategy) {
Assert.notNull(connectionHandler, "TcpConnectionHandler must not be null");
Assert.notNull(strategy, "ReconnectStrategy must not be null");
final TcpClient<Message<P>, Message<P>> tcpClient;
Runnable cleanupTask;
synchronized (this.tcpClients) {
if (this.stopping) {
IllegalStateException ex = new IllegalStateException("Shutting down.");
connectionHandler.afterConnectFailure(ex);
return new PassThroughPromiseToListenableFutureAdapter<>(Promises.<Void>error(ex));
}
tcpClient = NetStreams.tcpClient(REACTOR_TCP_CLIENT_TYPE, this.tcpClientSpecFactory);
this.tcpClients.add(tcpClient);
cleanupTask = new Runnable() {
@Override
public void run() {
synchronized (tcpClients) {
tcpClients.remove(tcpClient);
}
}
};
}
Stream<Tuple2<InetSocketAddress, Integer>> stream = tcpClient.start(
new MessageChannelStreamHandler<>(connectionHandler, cleanupTask),
new ReactorReconnectAdapter(strategy));
return new PassThroughPromiseToListenableFutureAdapter<>(stream.next().after());
}
@Override
public ListenableFuture<Void> shutdown() {
synchronized (this.tcpClients) {
this.stopping = true;
}
Promise<Void> promise = Streams.from(this.tcpClients)
.flatMap(new Function<TcpClient<Message<P>, Message<P>>, Promise<Void>>() {
@Override
public Promise<Void> apply(final TcpClient<Message<P>, Message<P>> client) {
return client.shutdown().onComplete(new Consumer<Promise<Void>>() {
@Override
public void accept(Promise<Void> voidPromise) {
tcpClients.remove(client);
}
});
}
})
.next();
if (this.eventLoopGroup != null) {
final Promise<Void> eventLoopPromise = Promises.prepare();
promise.onComplete(new Consumer<Promise<Void>>() {
@Override
public void accept(Promise<Void> voidPromise) {
eventLoopGroup.shutdownGracefully().addListener(new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (future.isSuccess()) {
eventLoopPromise.onComplete();
}
else {
eventLoopPromise.onError(future.cause());
}
}
});
}
});
promise = eventLoopPromise;
}
if (this.environment != null) {
promise.onComplete(new Consumer<Promise<Void>>() {
@Override
public void accept(Promise<Void> voidPromise) {
environment.shutdown();
}
});
}
return new PassThroughPromiseToListenableFutureAdapter<>(promise);
}
private static Method initEventLoopGroupMethod() {
for (Method method : NettyClientSocketOptions.class.getMethods()) {
if (method.getName().equals("eventLoopGroup") && method.getParameterCount() == 1) {
return method;
}
}
throw new IllegalStateException("No compatible Reactor version found");
}
private static class SynchronousDispatcherConfigReader implements ConfigurationReader {
@Override
public ReactorConfiguration read() {
return new ReactorConfiguration(Collections.emptyList(), "sync", new Properties());
}
}
private static class MessageChannelStreamHandler<P>
implements ReactorChannelHandler<Message<P>, Message<P>, ChannelStream<Message<P>, Message<P>>> {
private final TcpConnectionHandler<P> connectionHandler;
private final Runnable cleanupTask;
public MessageChannelStreamHandler(TcpConnectionHandler<P> connectionHandler, Runnable cleanupTask) {
this.connectionHandler = connectionHandler;
this.cleanupTask = cleanupTask;
}
@Override
public Publisher<Void> apply(ChannelStream<Message<P>, Message<P>> channelStream) {
Promise<Void> closePromise = Promises.prepare();
this.connectionHandler.afterConnected(new Reactor2TcpConnection<>(channelStream, closePromise));
channelStream
.finallyDo(new Consumer<Signal<Message<P>>>() {
@Override
public void accept(Signal<Message<P>> signal) {
cleanupTask.run();
if (signal.isOnError()) {
connectionHandler.handleFailure(signal.getThrowable());
}
else if (signal.isOnComplete()) {
connectionHandler.afterConnectionClosed();
}
}
})
.consume(new Consumer<Message<P>>() {
@Override
public void accept(Message<P> message) {
connectionHandler.handleMessage(message);
}
});
return closePromise;
}
}
private static class ReactorReconnectAdapter implements Reconnect {
private final ReconnectStrategy strategy;
public ReactorReconnectAdapter(ReconnectStrategy strategy) {
this.strategy = strategy;
}
@Override
public Tuple2<InetSocketAddress, Long> reconnect(InetSocketAddress address, int attempt) {
return Tuple.of(address, this.strategy.getTimeToNextAttempt(attempt));
}
}
}

58
spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyCodec.java

@ -0,0 +1,58 @@ @@ -0,0 +1,58 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.tcp.reactor;
import java.util.Collection;
import java.util.function.BiConsumer;
import java.util.function.Function;
import io.netty.buffer.ByteBuf;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
/**
* Simple holder for a decoding {@link Function} and an encoding
* {@link BiConsumer} to use with Reactor Netty.
*
* @author Rossen Stoyanchev
* @since 5.0
*/
public class ReactorNettyCodec<P> {
private final Function<? super ByteBuf, ? extends Collection<Message<P>>> decoder;
private final BiConsumer<? super ByteBuf, ? super Message<P>> encoder;
public ReactorNettyCodec(Function<? super ByteBuf, ? extends Collection<Message<P>>> decoder,
BiConsumer<? super ByteBuf, ? super Message<P>> encoder) {
Assert.notNull(decoder, "'decoder' is required");
Assert.notNull(encoder, "'encoder' is required");
this.decoder = decoder;
this.encoder = encoder;
}
public Function<? super ByteBuf, ? extends Collection<Message<P>>> getDecoder() {
return this.decoder;
}
public BiConsumer<? super ByteBuf, ? super Message<P>> getEncoder() {
return this.encoder;
}
}

231
spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java

@ -0,0 +1,231 @@ @@ -0,0 +1,231 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.tcp.reactor;
import java.util.Collection;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.ImmediateEventExecutor;
import org.reactivestreams.Publisher;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.ipc.netty.ChannelFutureMono;
import reactor.ipc.netty.NettyContext;
import reactor.ipc.netty.NettyInbound;
import reactor.ipc.netty.NettyOutbound;
import reactor.ipc.netty.options.ClientOptions;
import reactor.ipc.netty.tcp.TcpClient;
import reactor.util.concurrent.QueueSupplier;
import org.springframework.messaging.Message;
import org.springframework.messaging.tcp.ReconnectStrategy;
import org.springframework.messaging.tcp.TcpConnection;
import org.springframework.messaging.tcp.TcpConnectionHandler;
import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
/**
* An implementation of {@link org.springframework.messaging.tcp.TcpOperations}
* based on the TCP client support of the Reactor project.
* <p>
* <p>This implementation wraps N (Reactor) clients for N {@link #connect} calls,
* i.e. a separate (Reactor) client instance for each connection.
*
* @author Rossen Stoyanchev
* @author Stephane Maldini
* @since 5.0
*/
public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
private final TcpClient tcpClient;
private final ReactorNettyCodec<P> codec;
private final Scheduler scheduler = Schedulers.newParallel("ReactorNettyTcpClient");
private final ChannelGroup group;
private volatile boolean stopping;
/**
* A constructor that creates a {@link TcpClient TcpClient} factory relying on
* Reactor Netty TCP threads. The number of Netty threads can be tweaked with
* the {@code reactor.tcp.ioThreadCount} System property. The network I/O
* threads will be shared amongst the active clients.
* <p>Also see the constructor accepting a {@link Consumer} of
* {@link ClientOptions} for additional options.
*
* @param host the host to connect to
* @param port the port to connect to
* @param codec for encoding and decoding messages
*/
public ReactorNettyTcpClient(String host, int port, ReactorNettyCodec<P> codec) {
this(opts -> opts.connect(host, port), codec);
}
/**
* A constructor with a configurator {@link Consumer} that will receive
* default {@link ClientOptions} from {@link TcpClient}. This might be used
* to add SSL or specific network parameters to the generated client
* configuration.
*
* @param tcpOptions callback for configuring shared {@link ClientOptions}
* @param codec for encoding and decoding messages
*/
public ReactorNettyTcpClient(Consumer<? super ClientOptions> tcpOptions,
ReactorNettyCodec<P> codec) {
Assert.notNull(codec, "'codec' is required");
this.group = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
this.tcpClient = TcpClient.create(opts -> tcpOptions.accept(opts.channelGroup(group)));
this.codec = codec;
}
@Override
public ListenableFuture<Void> connect(final TcpConnectionHandler<P> handler) {
Assert.notNull(handler, "'handler' is required");
if (this.stopping) {
IllegalStateException ex = new IllegalStateException("Shutting down.");
handler.afterConnectFailure(ex);
return new MonoToListenableFutureAdapter<>(Mono.<Void>error(ex));
}
Mono<Void> connectMono = this.tcpClient
.newHandler(new MessageHandler<>(handler, this.codec, this.scheduler))
.doOnError(handler::afterConnectFailure)
.then();
return new MonoToListenableFutureAdapter<>(connectMono);
}
@Override
public ListenableFuture<Void> connect(TcpConnectionHandler<P> handler, ReconnectStrategy strategy) {
Assert.notNull(handler, "'handler' is required");
Assert.notNull(strategy, "'reconnectStrategy' is required");
if (this.stopping) {
IllegalStateException ex = new IllegalStateException("Shutting down.");
handler.afterConnectFailure(ex);
return new MonoToListenableFutureAdapter<>(Mono.<Void>error(ex));
}
MonoProcessor<Void> connectMono = MonoProcessor.create();
this.tcpClient.newHandler(new MessageHandler<>(handler, this.codec, this.scheduler))
.doOnNext(item -> {
if (!connectMono.isTerminated()) {
connectMono.onComplete();
}
})
.doOnError(ex -> {
if (!connectMono.isTerminated()) {
connectMono.onError(ex);
}
})
.then(NettyContext::onClose)
.retryWhen(new Reconnector<>(strategy))
.repeatWhen(new Reconnector<>(strategy))
.subscribe();
return new MonoToListenableFutureAdapter<>(connectMono);
}
@Override
public ListenableFuture<Void> shutdown() {
if (this.stopping) {
return new MonoToListenableFutureAdapter<>(Mono.empty());
}
this.stopping = true;
Mono<Void> completion = ChannelFutureMono.from(this.group.close())
.doAfterTerminate((x, e) -> this.scheduler.shutdown());
return new MonoToListenableFutureAdapter<>(completion);
}
private static final class MessageHandler<P>
implements BiFunction<NettyInbound, NettyOutbound, Publisher<Void>> {
private final TcpConnectionHandler<P> connectionHandler;
private final ReactorNettyCodec<P> codec;
private final Scheduler scheduler;
MessageHandler(TcpConnectionHandler<P> handler, ReactorNettyCodec<P> codec,
Scheduler scheduler) {
this.connectionHandler = handler;
this.codec = codec;
this.scheduler = scheduler;
}
@Override
public Publisher<Void> apply(NettyInbound in, NettyOutbound out) {
Flux<Collection<Message<P>>> inbound = in.receive().map(this.codec.getDecoder());
DirectProcessor<Void> closeProcessor = DirectProcessor.create();
TcpConnection<P> tcpConnection =
new ReactorNettyTcpConnection<>(in, out, this.codec.getEncoder(), closeProcessor);
this.scheduler.schedule(() -> connectionHandler.afterConnected(tcpConnection));
inbound = inbound.publishOn(this.scheduler, QueueSupplier.SMALL_BUFFER_SIZE);
inbound.flatMapIterable(Function.identity())
.subscribe(
connectionHandler::handleMessage,
connectionHandler::handleFailure,
connectionHandler::afterConnectionClosed);
return closeProcessor;
}
}
private static final class Reconnector<T> implements Function<Flux<T>, Publisher<?>> {
private final ReconnectStrategy strategy;
Reconnector(ReconnectStrategy strategy) {
this.strategy = strategy;
}
@Override
public Publisher<?> apply(Flux<T> flux) {
return flux.scan(1, (p, e) -> p++)
.flatMap(attempt -> Mono.delayMillis(strategy.getTimeToNextAttempt(attempt)));
}
}
}

49
spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpConnection.java → spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java

@ -16,10 +16,13 @@ @@ -16,10 +16,13 @@
package org.springframework.messaging.tcp.reactor;
import reactor.io.net.ChannelStream;
import reactor.rx.Promise;
import reactor.rx.Promises;
import reactor.rx.Streams;
import java.util.function.BiConsumer;
import io.netty.buffer.ByteBuf;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Mono;
import reactor.ipc.netty.NettyInbound;
import reactor.ipc.netty.NettyOutbound;
import org.springframework.messaging.Message;
import org.springframework.messaging.tcp.TcpConnection;
@ -29,45 +32,55 @@ import org.springframework.util.concurrent.ListenableFuture; @@ -29,45 +32,55 @@ import org.springframework.util.concurrent.ListenableFuture;
* An implementation of {@link org.springframework.messaging.tcp.TcpConnection
* TcpConnection} based on the TCP client support of the Reactor project.
*
* @author Rossen Stoyanchev
* @since 4.2
* @param <P> the payload type of messages read or written to the TCP stream.
*
* @author Rossen Stoyanchev
* @since 5.0
*/
public class Reactor2TcpConnection<P> implements TcpConnection<P> {
public class ReactorNettyTcpConnection<P> implements TcpConnection<P> {
private final NettyInbound inbound;
private final NettyOutbound outbound;
private final DirectProcessor<Void> closeProcessor;
private final ChannelStream<Message<P>, Message<P>> channelStream;
private final BiConsumer<? super ByteBuf, ? super Message<P>> encoder;
private final Promise<Void> closePromise;
public ReactorNettyTcpConnection(NettyInbound inbound, NettyOutbound outbound,
BiConsumer<? super ByteBuf, ? super Message<P>> encoder,
DirectProcessor<Void> closeProcessor) {
public Reactor2TcpConnection(ChannelStream<Message<P>, Message<P>> channelStream, Promise<Void> closePromise) {
this.channelStream = channelStream;
this.closePromise = closePromise;
this.inbound = inbound;
this.outbound = outbound;
this.encoder = encoder;
this.closeProcessor = closeProcessor;
}
@Override
public ListenableFuture<Void> send(Message<P> message) {
Promise<Void> afterWrite = Promises.prepare();
this.channelStream.writeWith(Streams.just(message)).subscribe(afterWrite);
return new PassThroughPromiseToListenableFutureAdapter<>(afterWrite);
ByteBuf byteBuf = this.inbound.channel().alloc().buffer();
this.encoder.accept(byteBuf, message);
return new MonoToListenableFutureAdapter<>(this.outbound.send(Mono.just(byteBuf)));
}
@Override
@SuppressWarnings("deprecation")
public void onReadInactivity(Runnable runnable, long inactivityDuration) {
this.channelStream.on().readIdle(inactivityDuration, reactor.fn.Functions.<Void>consumer(runnable));
this.inbound.onReadIdle(inactivityDuration, runnable);
}
@Override
@SuppressWarnings("deprecation")
public void onWriteInactivity(Runnable runnable, long inactivityDuration) {
this.channelStream.on().writeIdle(inactivityDuration, reactor.fn.Functions.<Void>consumer(runnable));
this.outbound.onWriteIdle(inactivityDuration, runnable);
}
@Override
public void close() {
this.closePromise.onComplete();
this.closeProcessor.onComplete();
}
}

10
spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/Reactor2TcpStompClientTests.java → spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/ReactorNettyTcpStompClientTests.java

@ -43,20 +43,20 @@ import static org.hamcrest.Matchers.*; @@ -43,20 +43,20 @@ import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
/**
* Integration tests for {@link Reactor2TcpStompClient}.
* Integration tests for {@link ReactorNettyTcpStompClient}.
*
* @author Rossen Stoyanchev
*/
public class Reactor2TcpStompClientTests {
public class ReactorNettyTcpStompClientTests {
private static final Log logger = LogFactory.getLog(Reactor2TcpStompClientTests.class);
private static final Log logger = LogFactory.getLog(ReactorNettyTcpStompClientTests.class);
@Rule
public final TestName testName = new TestName();
private BrokerService activeMQBroker;
private Reactor2TcpStompClient client;
private ReactorNettyTcpStompClient client;
@Before
@ -78,7 +78,7 @@ public class Reactor2TcpStompClientTests { @@ -78,7 +78,7 @@ public class Reactor2TcpStompClientTests {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.afterPropertiesSet();
this.client = new Reactor2TcpStompClient("127.0.0.1", port);
this.client = new ReactorNettyTcpStompClient("127.0.0.1", port);
this.client.setMessageConverter(new StringMessageConverter());
this.client.setTaskScheduler(taskScheduler);
}

108
spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompCodecTests.java → spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompDecoderTests.java

@ -17,31 +17,28 @@ @@ -17,31 +17,28 @@
package org.springframework.messaging.simp.stomp;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.nio.ByteBuffer;
import java.util.List;
import org.junit.Test;
import org.springframework.messaging.Message;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.InvalidMimeTypeException;
import reactor.fn.Consumer;
import reactor.fn.Function;
import reactor.io.buffer.Buffer;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
/**
* Test fixture for {@link Reactor2StompCodec}.
* Test fixture for {@link StompDecoder}.
*
* @author Andy Wilkinson
* @author Stephane Maldini
*/
public class StompCodecTests {
public class StompDecoderTests {
private final ArgumentCapturingConsumer<Message<byte[]>> consumer = new ArgumentCapturingConsumer<>();
private final StompDecoder decoder = new StompDecoder();
private final Function<Buffer, Message<byte[]>> decoder = new Reactor2StompCodec().decoder(consumer);
@Test
public void decodeFrameWithCrLfEols() {
@ -172,11 +169,9 @@ public class StompCodecTests { @@ -172,11 +169,9 @@ public class StompCodecTests {
public void decodeMultipleFramesFromSameBuffer() {
String frame1 = "SEND\ndestination:test\n\nThe body of the message\0";
String frame2 = "DISCONNECT\n\n\0";
ByteBuffer buffer = ByteBuffer.wrap((frame1 + frame2).getBytes());
Buffer buffer = Buffer.wrap(frame1 + frame2);
final List<Message<byte[]>> messages = new ArrayList<>();
new Reactor2StompCodec().decoder(messages::add).apply(buffer);
final List<Message<byte[]>> messages = decoder.decode(buffer);
assertEquals(2, messages.size());
assertEquals(StompCommand.SEND, StompHeaderAccessor.wrap(messages.get(0)).getCommand());
@ -245,102 +240,33 @@ public class StompCodecTests { @@ -245,102 +240,33 @@ public class StompCodecTests {
public void decodeHeartbeat() {
String frame = "\n";
Buffer buffer = Buffer.wrap(frame);
ByteBuffer buffer = ByteBuffer.wrap(frame.getBytes());
final List<Message<byte[]>> messages = new ArrayList<>();
new Reactor2StompCodec().decoder(messages::add).apply(buffer);
final List<Message<byte[]>> messages = decoder.decode(buffer);
assertEquals(1, messages.size());
assertEquals(SimpMessageType.HEARTBEAT, StompHeaderAccessor.wrap(messages.get(0)).getMessageType());
}
@Test
public void encodeFrameWithNoHeadersAndNoBody() {
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT);
Message<byte[]> frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
assertEquals("DISCONNECT\n\n\0", new Reactor2StompCodec().encoder().apply(frame).asString());
}
@Test
public void encodeFrameWithHeaders() {
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT);
headers.setAcceptVersion("1.2");
headers.setHost("github.org");
Message<byte[]> frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
String frameString = new Reactor2StompCodec().encoder().apply(frame).asString();
assertTrue(frameString.equals("CONNECT\naccept-version:1.2\nhost:github.org\n\n\0") ||
frameString.equals("CONNECT\nhost:github.org\naccept-version:1.2\n\n\0"));
}
@Test
public void encodeFrameWithHeadersThatShouldBeEscaped() {
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT);
headers.addNativeHeader("a:\r\n\\b", "alpha:bravo\r\n\\");
Message<byte[]> frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
assertEquals("DISCONNECT\na\\c\\r\\n\\\\b:alpha\\cbravo\\r\\n\\\\\n\n\0",
new Reactor2StompCodec().encoder().apply(frame).asString());
}
@Test
public void encodeFrameWithHeadersBody() {
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND);
headers.addNativeHeader("a", "alpha");
Message<byte[]> frame = MessageBuilder.createMessage("Message body".getBytes(), headers.getMessageHeaders());
assertEquals("SEND\na:alpha\ncontent-length:12\n\nMessage body\0",
new Reactor2StompCodec().encoder().apply(frame).asString());
}
@Test
public void encodeFrameWithContentLengthPresent() {
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND);
headers.setContentLength(12);
Message<byte[]> frame = MessageBuilder.createMessage("Message body".getBytes(), headers.getMessageHeaders());
assertEquals("SEND\ncontent-length:12\n\nMessage body\0",
new Reactor2StompCodec().encoder().apply(frame).asString());
}
private void assertIncompleteDecode(String partialFrame) {
Buffer buffer = Buffer.wrap(partialFrame);
ByteBuffer buffer = ByteBuffer.wrap(partialFrame.getBytes());
assertNull(decode(buffer));
assertEquals(0, buffer.position());
}
private Message<byte[]> decode(String stompFrame) {
Buffer buffer = Buffer.wrap(stompFrame);
ByteBuffer buffer = ByteBuffer.wrap(stompFrame.getBytes());
return decode(buffer);
}
private Message<byte[]> decode(Buffer buffer) {
this.decoder.apply(buffer);
if (consumer.arguments.isEmpty()) {
private Message<byte[]> decode(ByteBuffer buffer) {
List<Message<byte[]>> messages = this.decoder.decode(buffer);
if (messages.isEmpty()) {
return null;
}
else {
return consumer.arguments.get(0);
return messages.get(0);
}
}
private static final class ArgumentCapturingConsumer<T> implements Consumer<T> {
private final List<T> arguments = new ArrayList<>();
@Override
public void accept(T t) {
arguments.add(t);
}
}
}

91
spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompEncoderTests.java

@ -0,0 +1,91 @@ @@ -0,0 +1,91 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.stomp;
import org.junit.Test;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Test fixture for {@link StompEncoder}.
*
* @author Andy Wilkinson
* @author Stephane Maldini
*/
public class StompEncoderTests {
private final StompEncoder encoder = new StompEncoder();
@Test
public void encodeFrameWithNoHeadersAndNoBody() {
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT);
Message<byte[]> frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
assertEquals("DISCONNECT\n\n\0", new String(encoder.encode(frame)));
}
@Test
public void encodeFrameWithHeaders() {
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT);
headers.setAcceptVersion("1.2");
headers.setHost("github.org");
Message<byte[]> frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
String frameString = new String(encoder.encode(frame));
assertTrue(
"CONNECT\naccept-version:1.2\nhost:github.org\n\n\0".equals(frameString) ||
"CONNECT\nhost:github.org\naccept-version:1.2\n\n\0".equals(frameString));
}
@Test
public void encodeFrameWithHeadersThatShouldBeEscaped() {
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT);
headers.addNativeHeader("a:\r\n\\b", "alpha:bravo\r\n\\");
Message<byte[]> frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
assertEquals("DISCONNECT\na\\c\\r\\n\\\\b:alpha\\cbravo\\r\\n\\\\\n\n\0",
new String(encoder.encode(frame)));
}
@Test
public void encodeFrameWithHeadersBody() {
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND);
headers.addNativeHeader("a", "alpha");
Message<byte[]> frame = MessageBuilder.createMessage(
"Message body".getBytes(), headers.getMessageHeaders());
assertEquals("SEND\na:alpha\ncontent-length:12\n\nMessage body\0",
new String(encoder.encode(frame)));
}
@Test
public void encodeFrameWithContentLengthPresent() {
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND);
headers.setContentLength(12);
Message<byte[]> frame = MessageBuilder.createMessage(
"Message body".getBytes(), headers.getMessageHeaders());
assertEquals("SEND\ncontent-length:12\n\nMessage body\0",
new String(encoder.encode(frame)));
}
}
Loading…
Cancel
Save