diff --git a/build.gradle b/build.gradle index a2f3ce9a01a..c1abd80230a 100644 --- a/build.gradle +++ b/build.gradle @@ -78,6 +78,7 @@ configure(allprojects) { project -> repositories { maven { url "http://repo.spring.io/libs-release" } maven { url "http://repo.spring.io/milestone" } // for AspectJ 1.8.0.RC2 + maven { url "http://repo.spring.io/snapshot" } // temporarily until Reactor 1.1.0.RC1 } dependencies { @@ -396,7 +397,8 @@ project("spring-messaging") { compile(project(":spring-beans")) compile(project(":spring-core")) compile(project(":spring-context")) - optional("org.projectreactor:reactor-core:1.0.1.RELEASE") + optional("org.projectreactor:reactor-core:1.1.0.BUILD-SNAPSHOT") + optional("org.projectreactor:reactor-net:1.1.0.BUILD-SNAPSHOT") optional("org.projectreactor:reactor-tcp:1.0.1.RELEASE") optional("org.eclipse.jetty.websocket:websocket-server:${jettyVersion}") { exclude group: "javax.servlet", module: "javax.servlet-api" @@ -623,8 +625,9 @@ project("spring-websocket") { testCompile("org.apache.tomcat.embed:tomcat-embed-core:8.0.3") testCompile("org.apache.tomcat.embed:tomcat-embed-websocket:8.0.3") testCompile("org.apache.tomcat.embed:tomcat-embed-logging-juli:8.0.3") - testCompile("org.projectreactor:reactor-core:1.0.1.RELEASE") - testCompile("org.projectreactor:reactor-tcp:1.0.1.RELEASE") + + testCompile("org.projectreactor:reactor-core:1.1.0.BUILD-SNAPSHOT") + testCompile("org.projectreactor:reactor-net:1.1.0.BUILD-SNAPSHOT") testCompile("log4j:log4j:1.2.17") testCompile("org.slf4j:slf4j-jcl:${slf4jVersion}") } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/Reactor11StompCodec.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/Reactor11StompCodec.java new file mode 100644 index 00000000000..72189a41e4e --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/Reactor11StompCodec.java @@ -0,0 +1,102 @@ +/* + * Copyright 2002-2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.simp.stomp; + +import org.springframework.messaging.Message; + +import org.springframework.util.Assert; +import reactor.function.Consumer; +import reactor.function.Function; +import reactor.io.Buffer; +import reactor.io.encoding.Codec; + +import java.nio.ByteBuffer; + +/** + * A Reactor TCP {@link reactor.io.encoding.Codec} for sending and receiving STOMP messages. + * + * @author Andy Wilkinson + * @author Rossen Stoyanchev + * @since 4.0 + */ +public class Reactor11StompCodec implements Codec, Message> { + + private final StompDecoder stompDecoder; + + private final StompEncoder stompEncoder; + + private final Function, Buffer> encodingFunction; + + + public Reactor11StompCodec() { + this(new StompEncoder(), new StompDecoder()); + } + + public Reactor11StompCodec(StompEncoder encoder, StompDecoder decoder) { + Assert.notNull(encoder, "'encoder' is required"); + Assert.notNull(decoder, "'decoder' is required"); + this.stompEncoder = encoder; + this.stompDecoder = decoder; + this.encodingFunction = new EncodingFunction(this.stompEncoder); + } + + @Override + public Function> decoder(final Consumer> messageConsumer) { + return new DecodingFunction(this.stompDecoder, messageConsumer); + } + + @Override + public Function, Buffer> encoder() { + return this.encodingFunction; + } + + + private static class EncodingFunction implements Function, Buffer> { + + private final StompEncoder encoder; + + private EncodingFunction(StompEncoder encoder) { + this.encoder = encoder; + } + + @Override + public Buffer apply(Message message) { + byte[] bytes = this.encoder.encode(message); + return new Buffer(ByteBuffer.wrap(bytes)); + } + } + + private static class DecodingFunction implements Function> { + + private final StompDecoder decoder; + + private final Consumer> messageConsumer; + + public DecodingFunction(StompDecoder decoder, Consumer> next) { + this.decoder = decoder; + this.messageConsumer = next; + } + + @Override + public Message apply(Buffer buffer) { + for (Message message : this.decoder.decode(buffer.byteBuffer())) { + this.messageConsumer.accept(message); + } + return null; + } + } +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java index 1980f90404b..b72335045c3 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java @@ -16,7 +16,6 @@ package org.springframework.messaging.simp.stomp; -import java.io.IOException; import java.util.Collection; import java.util.Map; import java.util.concurrent.Callable; @@ -34,8 +33,8 @@ import org.springframework.messaging.tcp.FixedIntervalReconnectStrategy; import org.springframework.messaging.tcp.TcpConnection; import org.springframework.messaging.tcp.TcpConnectionHandler; import org.springframework.messaging.tcp.TcpOperations; -import org.springframework.messaging.tcp.reactor.ReactorTcpClient; import org.springframework.util.Assert; +import org.springframework.util.ClassUtils; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; import org.springframework.util.concurrent.ListenableFutureTask; @@ -69,6 +68,12 @@ import org.springframework.util.concurrent.ListenableFutureTask; */ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler { + private static final boolean reactor10Present = + ClassUtils.isPresent("reactor.tcp.TcpClient", StompBrokerRelayMessageHandler.class.getClassLoader()); + + private static final boolean reactor11Present = + ClassUtils.isPresent("reactor.net.tcp.TcpClient", StompBrokerRelayMessageHandler.class.getClassLoader()); + private static final byte[] EMPTY_PAYLOAD = new byte[0]; private static final ListenableFutureTask EMPTY_TASK = new ListenableFutureTask(new VoidCallable()); @@ -331,7 +336,15 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler this.brokerChannel.subscribe(this); if (this.tcpClient == null) { - this.tcpClient = new StompTcpClientFactory().create(this.relayHost, this.relayPort); + if (reactor11Present) { + this.tcpClient = new Reactor11TcpClientFactory().create(this.relayHost, this.relayPort); + } + else if (reactor10Present) { + this.tcpClient = new Reactor10TcpClientFactory().create(this.relayHost, this.relayPort); + } + else { + throw new IllegalStateException("Please add the \"org.projectreactor:reactor-net\" dependency"); + } } if (logger.isDebugEnabled()) { @@ -606,6 +619,14 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } } + @Override + public void handleFailure(Throwable ex) { + if (this.tcpConnection == null) { + return; + } + handleTcpConnectionFailure("Closing connection after TCP failure", ex); + } + @Override public void afterConnectionClosed() { if (this.tcpConnection == null) { @@ -753,10 +774,19 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } } - private static class StompTcpClientFactory { + private static class Reactor11TcpClientFactory { + + public TcpOperations create(String host, int port) { + return new org.springframework.messaging.tcp.reactor.Reactor11TcpClient( + host, port, new Reactor11StompCodec()); + } + } + + private static class Reactor10TcpClientFactory { - public TcpOperations create(String relayHost, int relayPort) { - return new ReactorTcpClient(relayHost, relayPort, new StompCodec()); + public TcpOperations create(String host, int port) { + return new org.springframework.messaging.tcp.reactor.ReactorTcpClient( + host, port, new StompCodec()); } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/TcpConnectionHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/TcpConnectionHandler.java index bb5841be063..600ff1bb6f2 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/TcpConnectionHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/TcpConnectionHandler.java @@ -47,6 +47,12 @@ public interface TcpConnectionHandler

{ */ void handleMessage(Message

message); + /** + * Handle a failure on the connection. + * @param ex the exception + */ + void handleFailure(Throwable ex); + /** * Invoked after the connection is closed. */ diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/TcpOperations.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/TcpOperations.java index 6070ad13d65..8db2e10140b 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/TcpOperations.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/TcpOperations.java @@ -50,6 +50,6 @@ public interface TcpOperations

{ * @return a ListenableFuture that can be used to determine when and if the * connection is successfully closed */ - ListenableFuture shutdown(); + ListenableFuture shutdown(); } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractPromiseToListenableFutureAdapter.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractPromiseToListenableFutureAdapter.java index 02d0d19ccaf..06a7f37a028 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractPromiseToListenableFutureAdapter.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractPromiseToListenableFutureAdapter.java @@ -38,7 +38,7 @@ import reactor.function.Consumer; * @author Rossen Stoyanchev * @since 4.0 */ -abstract class AbstractPromiseToListenableFutureAdapter implements ListenableFuture { +public abstract class AbstractPromiseToListenableFutureAdapter implements ListenableFuture { private final Promise promise; diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/PassThroughPromiseToListenableFutureAdapter.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/PassThroughPromiseToListenableFutureAdapter.java index 05a586ef7ea..d8e3e90d41a 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/PassThroughPromiseToListenableFutureAdapter.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/PassThroughPromiseToListenableFutureAdapter.java @@ -25,7 +25,7 @@ import reactor.core.composable.Promise; * @author Rossen Stoyanchev * @since 4.0 */ -class PassThroughPromiseToListenableFutureAdapter extends AbstractPromiseToListenableFutureAdapter { +public class PassThroughPromiseToListenableFutureAdapter extends AbstractPromiseToListenableFutureAdapter { public PassThroughPromiseToListenableFutureAdapter(Promise promise) { diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor11TcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor11TcpClient.java new file mode 100644 index 00000000000..c27d55105ba --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor11TcpClient.java @@ -0,0 +1,218 @@ +/* + * Copyright 2002-2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.tcp.reactor; + +import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.Properties; + +import org.springframework.messaging.Message; +import org.springframework.messaging.tcp.ReconnectStrategy; +import org.springframework.messaging.tcp.TcpConnectionHandler; +import org.springframework.messaging.tcp.TcpOperations; +import org.springframework.util.Assert; +import org.springframework.util.concurrent.ListenableFuture; + +import reactor.core.Environment; +import reactor.core.composable.Composable; +import reactor.core.composable.Promise; +import reactor.core.composable.Stream; +import reactor.core.composable.spec.Promises; +import reactor.core.configuration.ConfigurationReader; +import reactor.core.configuration.DispatcherConfiguration; +import reactor.core.configuration.ReactorConfiguration; +import reactor.function.Consumer; +import reactor.function.Function; +import reactor.io.Buffer; +import reactor.io.encoding.Codec; +import reactor.net.NetChannel; +import reactor.net.Reconnect; +import reactor.net.netty.tcp.NettyTcpClient; +import reactor.net.tcp.TcpClient; +import reactor.net.tcp.spec.TcpClientSpec; +import reactor.tuple.Tuple; +import reactor.tuple.Tuple2; + + +/** + * An implementation of {@link org.springframework.messaging.tcp.TcpOperations} + * based on the TCP client support of the Reactor project. + * + * @author Rossen Stoyanchev + * @since 4.0 + */ +public class Reactor11TcpClient

implements TcpOperations

{ + + public static final Class REACTOR_TCP_CLIENT_TYPE = NettyTcpClient.class; + + + private final TcpClient, Message

> tcpClient; + + private final Environment environment; + + + /** + * A constructor that creates a {@link reactor.net.netty.tcp.NettyTcpClient} with + * a {@link reactor.event.dispatch.SynchronousDispatcher} as a result of which + * network I/O is handled in Netty threads. + * + *

Also see the constructor accepting a pre-configured Reactor + * {@link reactor.net.tcp.TcpClient}. + * + * @param host the host to connect to + * @param port the port to connect to + * @param codec the codec to use for encoding and decoding the TCP stream + */ + public Reactor11TcpClient(String host, int port, Codec, Message

> codec) { + + // Revisit in 1.1: is Environment still required w/ sync dispatcher? + this.environment = new Environment(new SynchronousDispatcherConfigReader()); + + this.tcpClient = new TcpClientSpec, Message

>(REACTOR_TCP_CLIENT_TYPE) + .env(this.environment) + .codec(codec) + .connect(host, port) + .get(); + } + + /** + * A constructor with a pre-configured {@link reactor.net.tcp.TcpClient}. + * + *

NOTE: if the client is configured with a thread-creating + * dispatcher, you are responsible for shutting down the {@link reactor.core.Environment} + * instance with which the client is configured. + * + * @param tcpClient the TcpClient to use + */ + public Reactor11TcpClient(TcpClient, Message

> tcpClient) { + Assert.notNull(tcpClient, "'tcpClient' must not be null"); + this.tcpClient = tcpClient; + this.environment = null; + } + + + @Override + public ListenableFuture connect(TcpConnectionHandler

connectionHandler) { + + Promise, Message

>> promise = this.tcpClient.open(); + composeConnectionHandling(promise, connectionHandler); + + return new AbstractPromiseToListenableFutureAdapter, Message

>, Void>(promise) { + @Override + protected Void adapt(NetChannel, Message

> result) { + return null; + } + }; + } + + @Override + public ListenableFuture connect(final TcpConnectionHandler

connectionHandler, + final ReconnectStrategy reconnectStrategy) { + + Assert.notNull(reconnectStrategy, "ReconnectStrategy must not be null"); + + Reconnect reconnect = new Reconnect() { + @Override + public Tuple2 reconnect(InetSocketAddress address, int attempt) { + return Tuple.of(address, reconnectStrategy.getTimeToNextAttempt(attempt)); + } + }; + + Stream, Message

>> stream = this.tcpClient.open(reconnect); + composeConnectionHandling(stream, connectionHandler); + + Promise promise = Promises.next(stream).map( + new Function, Message

>, Void>() { + @Override + public Void apply(NetChannel, Message

> ch) { + return null; + } + }); + return new PassThroughPromiseToListenableFutureAdapter(promise); + } + + private void composeConnectionHandling(Composable, Message

>> composable, + final TcpConnectionHandler

connectionHandler) { + + composable + .when(Throwable.class, new Consumer() { + @Override + public void accept(Throwable ex) { + connectionHandler.afterConnectFailure(ex); + } + }) + .consume(new Consumer, Message

>>() { + @Override + public void accept(NetChannel, Message

> connection) { + connection + .when(Throwable.class, new Consumer() { + @Override + public void accept(Throwable t) { + connectionHandler.handleFailure(t); + } + }) + .consume(new Consumer>() { + @Override + public void accept(Message

message) { + connectionHandler.handleMessage(message); + } + }) + .on() + .close(new Runnable() { + @Override + public void run() { + connectionHandler.afterConnectionClosed(); + } + }); + connectionHandler.afterConnected(new Reactor11TcpConnection

(connection)); + } + }); + } + + @Override + public ListenableFuture shutdown() { + try { + Promise promise = this.tcpClient.close(); + return new AbstractPromiseToListenableFutureAdapter(promise) { + @Override + protected Boolean adapt(Boolean result) { + return result; + } + }; + } + finally { + this.environment.shutdown(); + + } + } + + /** + * A ConfigurationReader that enforces the use of a SynchronousDispatcher. + * + *

The {@link reactor.core.configuration.PropertiesConfigurationReader} used by + * default automatically creates other dispatchers with thread pools that are + * not needed. + */ + private static class SynchronousDispatcherConfigReader implements ConfigurationReader { + + @Override + public ReactorConfiguration read() { + return new ReactorConfiguration(Arrays.asList(), "sync", new Properties()); + } + } + +} \ No newline at end of file diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor11TcpConnection.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor11TcpConnection.java new file mode 100644 index 00000000000..f92d84c55c8 --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor11TcpConnection.java @@ -0,0 +1,67 @@ +/* + * Copyright 2002-2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.tcp.reactor; + +import org.springframework.messaging.Message; +import org.springframework.messaging.tcp.TcpConnection; +import org.springframework.messaging.tcp.reactor.PassThroughPromiseToListenableFutureAdapter; +import org.springframework.util.concurrent.ListenableFuture; + +import reactor.core.composable.Promise; +import reactor.net.NetChannel; + + +/** + * An implementation of {@link org.springframework.messaging.tcp.TcpConnection} + * based on the TCP client support of the Reactor project. + * + * @param

the payload type of Spring Message's read from + * and written to the TCP stream + * + * @author Rossen Stoyanchev + */ +public class Reactor11TcpConnection

implements TcpConnection

{ + + private final NetChannel, Message

> channel; + + + public Reactor11TcpConnection(NetChannel, Message

> connection) { + this.channel = connection; + } + + @Override + public ListenableFuture send(Message

message) { + Promise promise = this.channel.send(message); + return new PassThroughPromiseToListenableFutureAdapter(promise); + } + + @Override + public void onReadInactivity(Runnable runnable, long inactivityDuration) { + this.channel.on().readIdle(inactivityDuration, runnable); + } + + @Override + public void onWriteInactivity(Runnable runnable, long inactivityDuration) { + this.channel.on().writeIdle(inactivityDuration, runnable); + } + + @Override + public void close() { + this.channel.close(); + } + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorTcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorTcpClient.java index c58bbffe3d2..2b3c86706c5 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorTcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorTcpClient.java @@ -19,7 +19,6 @@ package org.springframework.messaging.tcp.reactor; import java.lang.reflect.Modifier; import java.net.InetSocketAddress; import java.util.Arrays; -import java.util.Collections; import java.util.Properties; import org.apache.commons.logging.Log; @@ -217,13 +216,13 @@ public class ReactorTcpClient

implements TcpOperations

{ } @Override - public ListenableFuture shutdown() { + public ListenableFuture shutdown() { try { Promise promise = this.tcpClient.close(); - return new AbstractPromiseToListenableFutureAdapter(promise) { + return new AbstractPromiseToListenableFutureAdapter(promise) { @Override - protected Void adapt(Void result) { - return result; + protected Boolean adapt(Void result) { + return true; } }; } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerTests.java index 7fa15c1610d..2a84974dce8 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerTests.java @@ -19,18 +19,13 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import org.junit.Before; import org.junit.Test; -import org.springframework.context.ApplicationEvent; -import org.springframework.context.ApplicationEventPublisher; import org.springframework.messaging.Message; import org.springframework.messaging.StubMessageChannel; import org.springframework.messaging.simp.SimpMessageHeaderAccessor; import org.springframework.messaging.simp.SimpMessageType; -import org.springframework.messaging.simp.broker.BrokerAvailabilityEvent; import org.springframework.messaging.support.MessageBuilder; import org.springframework.messaging.tcp.ReconnectStrategy; import org.springframework.messaging.tcp.TcpConnection; @@ -150,6 +145,17 @@ public class StompBrokerRelayMessageHandlerTests { return futureTask; } + private static ListenableFutureTask getBooleanFuture() { + ListenableFutureTask futureTask = new ListenableFutureTask<>(new Callable() { + @Override + public Boolean call() throws Exception { + return true; + } + }); + futureTask.run(); + return futureTask; + } + private static class StubTcpOperations implements TcpOperations { @@ -169,8 +175,8 @@ public class StompBrokerRelayMessageHandlerTests { } @Override - public ListenableFuture shutdown() { - return getFuture(); + public ListenableFuture shutdown() { + return getBooleanFuture(); } }