Browse Source

Upgrade to reactor 1.1.0 snapshots

Issue: SPR-11636
pull/508/merge
Rossen Stoyanchev 12 years ago
parent
commit
59703981c4
  1. 9
      build.gradle
  2. 2
      spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompCodec.java
  3. 2
      spring-messaging/src/main/java/org/springframework/messaging/tcp/TcpOperations.java
  4. 152
      spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorTcpClient.java
  5. 27
      spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorTcpConnection.java
  6. 28
      spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerTests.java

9
build.gradle

@ -75,6 +75,7 @@ configure(allprojects) { project -> @@ -75,6 +75,7 @@ configure(allprojects) { project ->
}
repositories {
maven { url "http://repo.spring.io/snapshot" } // temporarily until Reactor 1.1.0.M4
maven { url "http://repo.spring.io/libs-release" }
maven { url "http://repo.spring.io/milestone" } // for AspectJ 1.8.0.M1
maven { url "https://oss.sonatype.org/content/repositories/releases" } // javax.cache
@ -398,8 +399,8 @@ project("spring-messaging") { @@ -398,8 +399,8 @@ project("spring-messaging") {
compile(project(":spring-beans"))
compile(project(":spring-core"))
compile(project(":spring-context"))
optional("org.projectreactor:reactor-core:1.0.1.RELEASE")
optional("org.projectreactor:reactor-tcp:1.0.1.RELEASE")
optional("org.projectreactor:reactor-core:1.1.0.BUILD-SNAPSHOT")
optional("org.projectreactor:reactor-net:1.1.0.BUILD-SNAPSHOT")
optional("org.eclipse.jetty.websocket:websocket-server:${jettyVersion}") {
exclude group: "javax.servlet", module: "javax.servlet-api"
}
@ -621,8 +622,8 @@ project("spring-websocket") { @@ -621,8 +622,8 @@ project("spring-websocket") {
testCompile("org.apache.tomcat.embed:tomcat-embed-core:8.0.3")
testCompile("org.apache.tomcat.embed:tomcat-embed-websocket:8.0.3")
testCompile("org.apache.tomcat.embed:tomcat-embed-logging-juli:8.0.3")
testCompile("org.projectreactor:reactor-core:1.0.1.RELEASE")
testCompile("org.projectreactor:reactor-tcp:1.0.1.RELEASE")
testCompile("org.projectreactor:reactor-core:1.1.0.BUILD-SNAPSHOT")
testCompile("org.projectreactor:reactor-net:1.1.0.BUILD-SNAPSHOT")
testCompile("log4j:log4j:1.2.17")
testCompile("org.slf4j:slf4j-jcl:${slf4jVersion}")
}

2
spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompCodec.java

@ -21,7 +21,7 @@ import org.springframework.messaging.Message; @@ -21,7 +21,7 @@ import org.springframework.messaging.Message;
import reactor.function.Consumer;
import reactor.function.Function;
import reactor.io.Buffer;
import reactor.tcp.encoding.Codec;
import reactor.io.encoding.Codec;
import java.util.List;

2
spring-messaging/src/main/java/org/springframework/messaging/tcp/TcpOperations.java

@ -50,6 +50,6 @@ public interface TcpOperations<P> { @@ -50,6 +50,6 @@ public interface TcpOperations<P> {
* @return a ListenableFuture that can be used to determine when and if the
* connection is successfully closed
*/
ListenableFuture<Void> shutdown();
ListenableFuture<Boolean> shutdown();
}

152
spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorTcpClient.java

@ -16,10 +16,8 @@ @@ -16,10 +16,8 @@
package org.springframework.messaging.tcp.reactor;
import java.lang.reflect.Modifier;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import org.apache.commons.logging.Log;
@ -33,7 +31,6 @@ import org.springframework.util.concurrent.ListenableFuture; @@ -33,7 +31,6 @@ import org.springframework.util.concurrent.ListenableFuture;
import reactor.core.Environment;
import reactor.core.composable.Composable;
import reactor.core.composable.Deferred;
import reactor.core.composable.Promise;
import reactor.core.composable.Stream;
import reactor.core.composable.spec.Promises;
@ -41,14 +38,14 @@ import reactor.core.configuration.ConfigurationReader; @@ -41,14 +38,14 @@ import reactor.core.configuration.ConfigurationReader;
import reactor.core.configuration.DispatcherConfiguration;
import reactor.core.configuration.ReactorConfiguration;
import reactor.function.Consumer;
import reactor.function.support.SingleUseConsumer;
import reactor.function.Function;
import reactor.io.Buffer;
import reactor.tcp.Reconnect;
import reactor.tcp.TcpClient;
import reactor.tcp.TcpConnection;
import reactor.tcp.encoding.Codec;
import reactor.tcp.netty.NettyTcpClient;
import reactor.tcp.spec.TcpClientSpec;
import reactor.io.encoding.Codec;
import reactor.net.NetChannel;
import reactor.net.Reconnect;
import reactor.net.netty.tcp.NettyTcpClient;
import reactor.net.tcp.TcpClient;
import reactor.net.tcp.spec.TcpClientSpec;
import reactor.tuple.Tuple;
import reactor.tuple.Tuple2;
@ -73,12 +70,12 @@ public class ReactorTcpClient<P> implements TcpOperations<P> { @@ -73,12 +70,12 @@ public class ReactorTcpClient<P> implements TcpOperations<P> {
/**
* A constructor that creates a {@link reactor.tcp.netty.NettyTcpClient} with
* A constructor that creates a {@link reactor.net.netty.tcp.NettyTcpClient} with
* a {@link reactor.event.dispatch.SynchronousDispatcher} as a result of which
* network I/O is handled in Netty threads.
*
* <p>Also see the constructor accepting a pre-configured Reactor
* {@link reactor.tcp.TcpClient}.
* {@link reactor.net.tcp.TcpClient}.
*
* @param host the host to connect to
* @param port the port to connect to
@ -94,12 +91,10 @@ public class ReactorTcpClient<P> implements TcpOperations<P> { @@ -94,12 +91,10 @@ public class ReactorTcpClient<P> implements TcpOperations<P> {
.codec(codec)
.connect(host, port)
.get();
checkReactorVersion();
}
/**
* A constructor with a pre-configured {@link reactor.tcp.TcpClient}.
* A constructor with a pre-configured {@link reactor.net.tcp.TcpClient}.
*
* <p><strong>NOTE:</strong> if the client is configured with a thread-creating
* dispatcher, you are responsible for shutting down the {@link reactor.core.Environment}
@ -111,31 +106,18 @@ public class ReactorTcpClient<P> implements TcpOperations<P> { @@ -111,31 +106,18 @@ public class ReactorTcpClient<P> implements TcpOperations<P> {
Assert.notNull(tcpClient, "'tcpClient' must not be null");
this.tcpClient = tcpClient;
this.environment = null;
checkReactorVersion();
}
private static void checkReactorVersion() {
Class<?> type = null;
try {
type = ReactorTcpClient.class.getClassLoader().loadClass("reactor.event.dispatch.BaseDispatcher");
Assert.isTrue(Modifier.isPublic(type.getModifiers()),
"Detected older version of reactor-tcp. Switch to 1.0.1.RELEASE or higher.");
}
catch (ClassNotFoundException e) {
// Ignore, must be 1.1+
}
}
@Override
public ListenableFuture<Void> connect(TcpConnectionHandler<P> connectionHandler) {
Promise<TcpConnection<Message<P>, Message<P>>> promise = this.tcpClient.open();
Promise<NetChannel<Message<P>, Message<P>>> promise = this.tcpClient.open();
composeConnectionHandling(promise, connectionHandler);
return new AbstractPromiseToListenableFutureAdapter<TcpConnection<Message<P>, Message<P>>, Void>(promise) {
return new AbstractPromiseToListenableFutureAdapter<NetChannel<Message<P>, Message<P>>, Void>(promise) {
@Override
protected Void adapt(TcpConnection<Message<P>, Message<P>> result) {
protected Void adapt(NetChannel<Message<P>, Message<P>> result) {
return null;
}
};
@ -147,82 +129,71 @@ public class ReactorTcpClient<P> implements TcpOperations<P> { @@ -147,82 +129,71 @@ public class ReactorTcpClient<P> implements TcpOperations<P> {
Assert.notNull(reconnectStrategy, "ReconnectStrategy must not be null");
Stream<TcpConnection<Message<P>, Message<P>>> stream =
this.tcpClient.open(new Reconnect() {
Reconnect reconnect = new Reconnect() {
@Override
public Tuple2<InetSocketAddress, Long> reconnect(InetSocketAddress address, int attempt) {
return Tuple.of(address, reconnectStrategy.getTimeToNextAttempt(attempt));
}
};
Stream<NetChannel<Message<P>, Message<P>>> stream = this.tcpClient.open(reconnect);
composeConnectionHandling(stream, connectionHandler);
Promise<Void> promise = Promises.next(stream).map(
new Function<NetChannel<Message<P>, Message<P>>, Void>() {
@Override
public Tuple2<InetSocketAddress, Long> reconnect(InetSocketAddress address, int attempt) {
return Tuple.of(address, reconnectStrategy.getTimeToNextAttempt(attempt));
public Void apply(NetChannel<Message<P>, Message<P>> ch) {
return null;
}
});
composeConnectionHandling(stream, connectionHandler);
return new PassThroughPromiseToListenableFutureAdapter<Void>(toPromise(stream));
return new PassThroughPromiseToListenableFutureAdapter<Void>(promise);
}
private void composeConnectionHandling(Composable<TcpConnection<Message<P>, Message<P>>> composable,
private void composeConnectionHandling(Composable<NetChannel<Message<P>, Message<P>>> composable,
final TcpConnectionHandler<P> connectionHandler) {
composable.when(Throwable.class, new Consumer<Throwable>() {
@Override
public void accept(Throwable ex) {
connectionHandler.afterConnectFailure(ex);
}
});
composable.consume(new Consumer<TcpConnection<Message<P>, Message<P>>>() {
@Override
public void accept(TcpConnection<Message<P>, Message<P>> connection) {
connection.on().close(new Runnable() {
composable
.when(Throwable.class, new Consumer<Throwable>() {
@Override
public void run() {
connectionHandler.afterConnectionClosed();
public void accept(Throwable ex) {
connectionHandler.afterConnectFailure(ex);
}
});
connection.consume(new Consumer<Message<P>>() {
@Override
public void accept(Message<P> message) {
connectionHandler.handleMessage(message);
}
});
connection.when(Throwable.class, new Consumer<Throwable>() {
})
.consume(new Consumer<NetChannel<Message<P>, Message<P>>>() {
@Override
public void accept(Throwable t) {
logger.error("Exception on connection " + connectionHandler, t);
public void accept(NetChannel<Message<P>, Message<P>> connection) {
connection
.when(Throwable.class, new Consumer<Throwable>() {
@Override
public void accept(Throwable t) {
logger.error("Exception on connection " + connectionHandler, t);
}
})
.consume(new Consumer<Message<P>>() {
@Override
public void accept(Message<P> message) {
connectionHandler.handleMessage(message);
}
})
.on()
.close(new Runnable() {
@Override
public void run() {
connectionHandler.afterConnectionClosed();
}
});
connectionHandler.afterConnected(new ReactorTcpConnection<P>(connection));
}
});
connectionHandler.afterConnected(new ReactorTcpConnection<P>(connection));
}
});
}
private Promise<Void> toPromise(Stream<TcpConnection<Message<P>, Message<P>>> stream) {
final Deferred<Void,Promise<Void>> deferred = Promises.<Void>defer().get();
stream.consume(SingleUseConsumer.once(new Consumer<TcpConnection<Message<P>, Message<P>>>() {
@Override
public void accept(TcpConnection<Message<P>, Message<P>> conn) {
deferred.accept((Void) null);
}
}));
stream.when(Throwable.class, SingleUseConsumer.once(new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) {
deferred.accept(throwable);
}
}));
return deferred.compose();
}
@Override
public ListenableFuture<Void> shutdown() {
public ListenableFuture<Boolean> shutdown() {
try {
Promise<Void> promise = this.tcpClient.close();
return new AbstractPromiseToListenableFutureAdapter<Void, Void>(promise) {
Promise<Boolean> promise = this.tcpClient.close();
return new AbstractPromiseToListenableFutureAdapter<Boolean, Boolean>(promise) {
@Override
protected Void adapt(Void result) {
protected Boolean adapt(Boolean result) {
return result;
}
};
@ -233,7 +204,6 @@ public class ReactorTcpClient<P> implements TcpOperations<P> { @@ -233,7 +204,6 @@ public class ReactorTcpClient<P> implements TcpOperations<P> {
}
}
/**
* A ConfigurationReader that enforces the use of a SynchronousDispatcher.
*

27
spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorTcpConnection.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -21,35 +21,46 @@ import org.springframework.messaging.tcp.TcpConnection; @@ -21,35 +21,46 @@ import org.springframework.messaging.tcp.TcpConnection;
import org.springframework.util.concurrent.ListenableFuture;
import reactor.core.composable.Promise;
import reactor.net.NetChannel;
/**
* An implementation of {@link org.springframework.messaging.tcp.TcpConnection}
* based on the TCP client support of the Reactor project.
*
* @param <P> the payload type of Spring Message's read from
* and written to the TCP stream
*
* @author Rossen Stoyanchev
*/
public class ReactorTcpConnection<P> implements TcpConnection<P> {
private final reactor.tcp.TcpConnection<Message<P>, Message<P>> reactorTcpConnection;
private final NetChannel<Message<P>, Message<P>> channel;
public ReactorTcpConnection(reactor.tcp.TcpConnection<Message<P>, Message<P>> connection) {
this.reactorTcpConnection = connection;
public ReactorTcpConnection(NetChannel<Message<P>, Message<P>> connection) {
this.channel = connection;
}
@Override
public ListenableFuture<Void> send(Message<P> message) {
Promise<Void> promise = this.reactorTcpConnection.send(message);
Promise<Void> promise = this.channel.send(message);
return new PassThroughPromiseToListenableFutureAdapter<Void>(promise);
}
@Override
public void onReadInactivity(Runnable runnable, long inactivityDuration) {
this.reactorTcpConnection.on().readIdle(inactivityDuration, runnable);
this.channel.on().readIdle(inactivityDuration, runnable);
}
@Override
public void onWriteInactivity(Runnable runnable, long inactivityDuration) {
this.reactorTcpConnection.on().writeIdle(inactivityDuration, runnable);
this.channel.on().writeIdle(inactivityDuration, runnable);
}
@Override
public void close() {
this.reactorTcpConnection.close();
this.channel.close();
}
}

28
spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerTests.java

@ -19,18 +19,13 @@ import java.util.ArrayList; @@ -19,18 +19,13 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.messaging.Message;
import org.springframework.messaging.StubMessageChannel;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.simp.broker.BrokerAvailabilityEvent;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.tcp.ReconnectStrategy;
import org.springframework.messaging.tcp.TcpConnection;
@ -139,7 +134,7 @@ public class StompBrokerRelayMessageHandlerTests { @@ -139,7 +134,7 @@ public class StompBrokerRelayMessageHandlerTests {
}
private static ListenableFutureTask<Void> getFuture() {
private static ListenableFutureTask<Void> getVoidFuture() {
ListenableFutureTask<Void> futureTask = new ListenableFutureTask<>(new Callable<Void>() {
@Override
public Void call() throws Exception {
@ -150,6 +145,17 @@ public class StompBrokerRelayMessageHandlerTests { @@ -150,6 +145,17 @@ public class StompBrokerRelayMessageHandlerTests {
return futureTask;
}
private static ListenableFutureTask<Boolean> getBooleanFuture() {
ListenableFutureTask<Boolean> futureTask = new ListenableFutureTask<>(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
return null;
}
});
futureTask.run();
return futureTask;
}
private static class StubTcpOperations implements TcpOperations<byte[]> {
@ -159,18 +165,18 @@ public class StompBrokerRelayMessageHandlerTests { @@ -159,18 +165,18 @@ public class StompBrokerRelayMessageHandlerTests {
@Override
public ListenableFuture<Void> connect(TcpConnectionHandler<byte[]> connectionHandler) {
connectionHandler.afterConnected(this.connection);
return getFuture();
return getVoidFuture();
}
@Override
public ListenableFuture<Void> connect(TcpConnectionHandler<byte[]> connectionHandler, ReconnectStrategy reconnectStrategy) {
connectionHandler.afterConnected(this.connection);
return getFuture();
return getVoidFuture();
}
@Override
public ListenableFuture<Void> shutdown() {
return getFuture();
public ListenableFuture<Boolean> shutdown() {
return getBooleanFuture();
}
}
@ -183,7 +189,7 @@ public class StompBrokerRelayMessageHandlerTests { @@ -183,7 +189,7 @@ public class StompBrokerRelayMessageHandlerTests {
@Override
public ListenableFuture<Void> send(Message<byte[]> message) {
this.messages.add(message);
return getFuture();
return getVoidFuture();
}
@Override

Loading…
Cancel
Save