From 5ebc1a8b60558cf72383847d213b007a0859416a Mon Sep 17 00:00:00 2001 From: Stephane Maldini Date: Mon, 27 Apr 2015 02:27:04 +0100 Subject: [PATCH] Update to latest reactor 2.0.1 snapshot --- .../simp/stomp/Reactor2TcpStompClient.java | 7 +- .../messaging/tcp/TcpOperations.java | 2 +- .../tcp/reactor/Reactor2TcpClient.java | 163 +++++++++--------- .../tcp/reactor/Reactor2TcpConnection.java | 22 +-- .../StompBrokerRelayMessageHandlerTests.java | 4 +- 5 files changed, 98 insertions(+), 100 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/Reactor2TcpStompClient.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/Reactor2TcpStompClient.java index 23220225e10..62a4a478cd6 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/Reactor2TcpStompClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/Reactor2TcpStompClient.java @@ -18,13 +18,13 @@ package org.springframework.messaging.simp.stomp; import org.springframework.messaging.Message; import org.springframework.messaging.tcp.TcpOperations; import org.springframework.messaging.tcp.reactor.Reactor2TcpClient; -import org.springframework.messaging.tcp.reactor.Reactor2TcpClient.TcpClientSpecFactory; import org.springframework.util.concurrent.ListenableFuture; import reactor.Environment; import reactor.core.config.ConfigurationReader; import reactor.core.config.DispatcherConfiguration; import reactor.core.config.DispatcherType; import reactor.core.config.ReactorConfiguration; +import reactor.io.net.NetStreams; import reactor.io.net.Spec.TcpClientSpec; import java.util.Arrays; @@ -33,8 +33,7 @@ import java.util.Properties; /** * A STOMP over TCP client that uses - * {@link Reactor2TcpClient - * Reactor11TcpClient}. + * {@link Reactor2TcpClient}. * * @author Rossen Stoyanchev * @since 4.2 @@ -121,7 +120,7 @@ public class Reactor2TcpStompClient extends StompClientSupport { } private static class StompTcpClientSpecFactory - implements TcpClientSpecFactory, Message> { + implements NetStreams.TcpClientFactory, Message> { private final Environment environment; diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/TcpOperations.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/TcpOperations.java index c90a87e9e16..4795b2a7eb5 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/TcpOperations.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/TcpOperations.java @@ -49,6 +49,6 @@ public interface TcpOperations

{ * @return a ListenableFuture that can be used to determine when and if the * connection is successfully closed */ - ListenableFuture shutdown(); + ListenableFuture shutdown(); } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java index 5780392c5dc..e40ad6a1f41 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Properties; import io.netty.channel.nio.NioEventLoopGroup; +import org.reactivestreams.Publisher; import reactor.Environment; import reactor.core.config.ConfigurationReader; import reactor.core.config.DispatcherConfiguration; @@ -35,14 +36,13 @@ import reactor.fn.tuple.Tuple; import reactor.fn.tuple.Tuple2; import reactor.io.buffer.Buffer; import reactor.io.codec.Codec; -import reactor.io.net.ChannelStream; -import reactor.io.net.NetStreams; -import reactor.io.net.Reconnect; +import reactor.io.net.*; import reactor.io.net.Spec.TcpClientSpec; import reactor.io.net.impl.netty.NettyClientSocketOptions; import reactor.io.net.impl.netty.tcp.NettyTcpClient; import reactor.io.net.tcp.TcpClient; import reactor.rx.Promise; +import reactor.rx.Promises; import reactor.rx.Stream; import reactor.rx.Streams; import reactor.rx.action.Signal; @@ -58,7 +58,7 @@ import org.springframework.util.concurrent.ListenableFuture; /** * An implementation of {@link org.springframework.messaging.tcp.TcpOperations} * based on the TCP client support of the Reactor project. - * + *

*

This implementation wraps N (Reactor) clients for N {@link #connect} calls, * i.e. a separate (Reactor) client instance for each connection. * @@ -71,7 +71,7 @@ public class Reactor2TcpClient

implements TcpOperations

{ @SuppressWarnings("rawtypes") public static final Class REACTOR_TCP_CLIENT_TYPE = NettyTcpClient.class; - private final TcpClientSpecFactory, Message

> tcpClientSpecFactory; + private final NetStreams.TcpClientFactory, Message

> tcpClientSpecFactory; private final List, Message

>> tcpClients = new ArrayList, Message

>>(); @@ -85,6 +85,7 @@ public class Reactor2TcpClient

implements TcpOperations

{ * threads will be shared amongst the active clients. *

Also see the constructor accepting a ready Reactor * {@link TcpClientSpec} {@link Function} factory. + * * @param host the host to connect to * @param port the port to connect to * @param codec the codec to use for encoding and decoding the TCP stream @@ -93,7 +94,7 @@ public class Reactor2TcpClient

implements TcpOperations

{ final NioEventLoopGroup eventLoopGroup = initEventLoopGroup(); - this.tcpClientSpecFactory = new TcpClientSpecFactory, Message

>() { + this.tcpClientSpecFactory = new NetStreams.TcpClientFactory, Message

>() { @Override public TcpClientSpec, Message

> apply(TcpClientSpec, Message

> spec) { @@ -110,8 +111,7 @@ public class Reactor2TcpClient

implements TcpOperations

{ int ioThreadCount; try { ioThreadCount = Integer.parseInt(System.getProperty("reactor.tcp.ioThreadCount")); - } - catch (Exception i) { + } catch (Exception i) { ioThreadCount = -1; } if (ioThreadCount <= 0l) { @@ -132,7 +132,7 @@ public class Reactor2TcpClient

implements TcpOperations

{ * * @param tcpClientSpecFactory the TcpClientSpec {@link Function} to use for each client creation. */ - public Reactor2TcpClient(TcpClientSpecFactory, Message

> tcpClientSpecFactory) { + public Reactor2TcpClient(NetStreams.TcpClientFactory, Message

> tcpClientSpecFactory) { Assert.notNull(tcpClientSpecFactory, "'tcpClientClientFactory' must not be null"); this.tcpClientSpecFactory = tcpClientSpecFactory; } @@ -141,73 +141,80 @@ public class Reactor2TcpClient

implements TcpOperations

{ @Override public ListenableFuture connect(TcpConnectionHandler

connectionHandler) { Class type = REACTOR_TCP_CLIENT_TYPE; + TcpClient, Message

> tcpClient = NetStreams.tcpClient(type, this.tcpClientSpecFactory); - composeConnectionHandling(tcpClient, connectionHandler); - Promise promise = tcpClient.open(); - return new BooleanToVoidAdapter(promise); + + Promise promise = tcpClient.start(composeConnectionHandling(tcpClient, connectionHandler)); + + return new PassThroughPromiseToListenableFutureAdapter( + promise.onError(new Consumer() { + @Override + public void accept(Throwable throwable) { + connectionHandler.afterConnectFailure(throwable); + } + }) + ); } @Override public ListenableFuture connect(TcpConnectionHandler

handler, ReconnectStrategy strategy) { Assert.notNull(strategy, "ReconnectStrategy must not be null"); Class type = REACTOR_TCP_CLIENT_TYPE; + TcpClient, Message

> tcpClient = NetStreams.tcpClient(type, this.tcpClientSpecFactory); - composeConnectionHandling(tcpClient, handler); - Stream stream = tcpClient.open(new ReactorRectonnectAdapter(strategy)); - return new BooleanToVoidAdapter(stream.next()); + + Stream> stream = tcpClient.start( + composeConnectionHandling(tcpClient, handler), + new ReactorRectonnectAdapter(strategy) + ); + + return new PassThroughPromiseToListenableFutureAdapter(stream.next().after()); } - private void composeConnectionHandling(final TcpClient, Message

> tcpClient, - final TcpConnectionHandler

connectionHandler) { + private MessageHandler

composeConnectionHandling( + final TcpClient, Message

> tcpClient, + final TcpConnectionHandler

connectionHandler + ) { - synchronized (this.tcpClients){ + synchronized (this.tcpClients) { this.tcpClients.add(tcpClient); } - tcpClient - .finallyDo(new Consumer, Message

>>>() { + return new MessageHandler

() { + @Override + public Publisher apply(ChannelStream, Message

> connection) { - @Override - public void accept(Signal, Message

>> signal) { - synchronized (tcpClients) { - tcpClients.remove(tcpClient); - } - if (signal.isOnError()) { - connectionHandler.afterConnectFailure(signal.getThrowable()); - } - } - }) - .consume(new Consumer, Message

>>() { + Promise closePromise = Promises.prepare(); - @Override - public void accept(ChannelStream, Message

> connection) { - connection - .finallyDo(new Consumer>>() { - - @Override - public void accept(Signal> signal) { - if (signal.isOnError()) { - connectionHandler.handleFailure(signal.getThrowable()); - } - else if (signal.isOnComplete()) { - connectionHandler.afterConnectionClosed(); - } - } - }) - .consume(new Consumer>() { - - @Override - public void accept(Message

message) { - connectionHandler.handleMessage(message); - } - }); - connectionHandler.afterConnected(new Reactor2TcpConnection

(connection)); - } - }); + connectionHandler.afterConnected(new Reactor2TcpConnection

(connection, closePromise)); + + connection + .finallyDo(new Consumer>>() { + + @Override + public void accept(Signal> signal) { + if (signal.isOnError()) { + connectionHandler.handleFailure(signal.getThrowable()); + } else if (signal.isOnComplete()) { + connectionHandler.afterConnectionClosed(); + } + } + }) + .consume(new Consumer>() { + + @Override + public void accept(Message

message) { + connectionHandler.handleMessage(message); + } + }); + + return closePromise; + } + }; } @Override - public ListenableFuture shutdown() { + public ListenableFuture shutdown() { final List, Message

>> clients; @@ -215,26 +222,23 @@ public class Reactor2TcpClient

implements TcpOperations

{ clients = new ArrayList, Message

>>(this.tcpClients); } - Promise promise = Streams.from(clients) - .flatMap(new Function, Message

>, Promise>() { - @Override - public Promise apply(TcpClient, Message

> client) { - return client.close(); - } - }) - .reduce(new BiFunction() { + Promise promise = Streams.from(clients) + .flatMap(new Function, Message

>, Promise>() { @Override - public Boolean apply(Boolean prev, Boolean next) { - return prev && next; + public Promise apply(final TcpClient, Message

> client) { + return client.shutdown().onComplete(new Consumer>() { + @Override + public void accept(Promise voidPromise) { + synchronized (tcpClients) { + tcpClients.remove(client); + } + } + }); } }) .next(); - return new PassThroughPromiseToListenableFutureAdapter(promise); - } - - - public interface TcpClientSpecFactory extends Function, TcpClientSpec> { + return new PassThroughPromiseToListenableFutureAdapter(promise); } private static class SynchronousDispatcherConfigReader implements ConfigurationReader { @@ -257,18 +261,11 @@ public class Reactor2TcpClient

implements TcpOperations

{ public Tuple2 reconnect(InetSocketAddress address, int attempt) { return Tuple.of(address, strategy.getTimeToNextAttempt(attempt)); } - } - - private static class BooleanToVoidAdapter extends AbstractPromiseToListenableFutureAdapter { - public BooleanToVoidAdapter(Promise promise) { - super(promise); - } + } - @Override - protected Void adapt(Boolean result) { - return null; - } + private interface MessageHandler

+ extends ReactorChannelHandler, Message

, ChannelStream, Message

>>{ } -} \ No newline at end of file +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpConnection.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpConnection.java index a08533b7023..ed198f9599f 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpConnection.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpConnection.java @@ -16,9 +16,14 @@ package org.springframework.messaging.tcp.reactor; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.springframework.util.concurrent.ListenableFutureAdapter; import reactor.fn.Functions; import reactor.io.net.ChannelStream; +import reactor.rx.Promise; import reactor.rx.Promises; +import reactor.rx.Streams; import reactor.rx.broadcast.Broadcaster; import org.springframework.messaging.Message; @@ -37,21 +42,19 @@ import org.springframework.util.concurrent.ListenableFuture; public class Reactor2TcpConnection

implements TcpConnection

{ private final ChannelStream, Message

> channelStream; + private final Promise closePromise; - private final Broadcaster> sink; - - - public Reactor2TcpConnection(ChannelStream, Message

> channelStream) { + public Reactor2TcpConnection(ChannelStream, Message

> channelStream, Promise closePromise) { this.channelStream = channelStream; - this.sink = Broadcaster.create(); - this.channelStream.sink(this.sink); + this.closePromise = closePromise; } @Override public ListenableFuture send(Message

message) { - this.sink.onNext(message); - return new PassThroughPromiseToListenableFutureAdapter(Promises.success(null)); + Promise afterWrite = Promises.prepare(); + this.channelStream.writeWith(Streams.just(message)).subscribe(afterWrite); + return new PassThroughPromiseToListenableFutureAdapter(afterWrite); } @Override @@ -66,7 +69,6 @@ public class Reactor2TcpConnection

implements TcpConnection

{ @Override public void close() { - this.sink.onComplete(); + this.closePromise.onComplete(); } - } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerTests.java index ed7180608d0..4736007b840 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerTests.java @@ -300,8 +300,8 @@ public class StompBrokerRelayMessageHandlerTests { } @Override - public ListenableFuture shutdown() { - return getBooleanFuture(); + public ListenableFuture shutdown() { + return getVoidFuture(); } public void handleMessage(Message message) {