Browse Source
This change adds support for Reactor 1.1 in spring-messaging in addition to Reactor 1.0.1 -- whichever is present on the classpath is used. Note also the module name change: reactor-tcp:1.0.1 -> reactor-net:1.1.0 Issue: SPR-11636pull/531/head
11 changed files with 455 additions and 24 deletions
@ -0,0 +1,102 @@
@@ -0,0 +1,102 @@
|
||||
/* |
||||
* Copyright 2002-2014 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.messaging.simp.stomp; |
||||
|
||||
import org.springframework.messaging.Message; |
||||
|
||||
import org.springframework.util.Assert; |
||||
import reactor.function.Consumer; |
||||
import reactor.function.Function; |
||||
import reactor.io.Buffer; |
||||
import reactor.io.encoding.Codec; |
||||
|
||||
import java.nio.ByteBuffer; |
||||
|
||||
/** |
||||
* A Reactor TCP {@link reactor.io.encoding.Codec} for sending and receiving STOMP messages. |
||||
* |
||||
* @author Andy Wilkinson |
||||
* @author Rossen Stoyanchev |
||||
* @since 4.0 |
||||
*/ |
||||
public class Reactor11StompCodec implements Codec<Buffer, Message<byte[]>, Message<byte[]>> { |
||||
|
||||
private final StompDecoder stompDecoder; |
||||
|
||||
private final StompEncoder stompEncoder; |
||||
|
||||
private final Function<Message<byte[]>, Buffer> encodingFunction; |
||||
|
||||
|
||||
public Reactor11StompCodec() { |
||||
this(new StompEncoder(), new StompDecoder()); |
||||
} |
||||
|
||||
public Reactor11StompCodec(StompEncoder encoder, StompDecoder decoder) { |
||||
Assert.notNull(encoder, "'encoder' is required"); |
||||
Assert.notNull(decoder, "'decoder' is required"); |
||||
this.stompEncoder = encoder; |
||||
this.stompDecoder = decoder; |
||||
this.encodingFunction = new EncodingFunction(this.stompEncoder); |
||||
} |
||||
|
||||
@Override |
||||
public Function<Buffer, Message<byte[]>> decoder(final Consumer<Message<byte[]>> messageConsumer) { |
||||
return new DecodingFunction(this.stompDecoder, messageConsumer); |
||||
} |
||||
|
||||
@Override |
||||
public Function<Message<byte[]>, Buffer> encoder() { |
||||
return this.encodingFunction; |
||||
} |
||||
|
||||
|
||||
private static class EncodingFunction implements Function<Message<byte[]>, Buffer> { |
||||
|
||||
private final StompEncoder encoder; |
||||
|
||||
private EncodingFunction(StompEncoder encoder) { |
||||
this.encoder = encoder; |
||||
} |
||||
|
||||
@Override |
||||
public Buffer apply(Message<byte[]> message) { |
||||
byte[] bytes = this.encoder.encode(message); |
||||
return new Buffer(ByteBuffer.wrap(bytes)); |
||||
} |
||||
} |
||||
|
||||
private static class DecodingFunction implements Function<Buffer, Message<byte[]>> { |
||||
|
||||
private final StompDecoder decoder; |
||||
|
||||
private final Consumer<Message<byte[]>> messageConsumer; |
||||
|
||||
public DecodingFunction(StompDecoder decoder, Consumer<Message<byte[]>> next) { |
||||
this.decoder = decoder; |
||||
this.messageConsumer = next; |
||||
} |
||||
|
||||
@Override |
||||
public Message<byte[]> apply(Buffer buffer) { |
||||
for (Message<byte[]> message : this.decoder.decode(buffer.byteBuffer())) { |
||||
this.messageConsumer.accept(message); |
||||
} |
||||
return null; |
||||
} |
||||
} |
||||
} |
||||
@ -0,0 +1,218 @@
@@ -0,0 +1,218 @@
|
||||
/* |
||||
* Copyright 2002-2014 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.messaging.tcp.reactor; |
||||
|
||||
import java.net.InetSocketAddress; |
||||
import java.util.Arrays; |
||||
import java.util.Properties; |
||||
|
||||
import org.springframework.messaging.Message; |
||||
import org.springframework.messaging.tcp.ReconnectStrategy; |
||||
import org.springframework.messaging.tcp.TcpConnectionHandler; |
||||
import org.springframework.messaging.tcp.TcpOperations; |
||||
import org.springframework.util.Assert; |
||||
import org.springframework.util.concurrent.ListenableFuture; |
||||
|
||||
import reactor.core.Environment; |
||||
import reactor.core.composable.Composable; |
||||
import reactor.core.composable.Promise; |
||||
import reactor.core.composable.Stream; |
||||
import reactor.core.composable.spec.Promises; |
||||
import reactor.core.configuration.ConfigurationReader; |
||||
import reactor.core.configuration.DispatcherConfiguration; |
||||
import reactor.core.configuration.ReactorConfiguration; |
||||
import reactor.function.Consumer; |
||||
import reactor.function.Function; |
||||
import reactor.io.Buffer; |
||||
import reactor.io.encoding.Codec; |
||||
import reactor.net.NetChannel; |
||||
import reactor.net.Reconnect; |
||||
import reactor.net.netty.tcp.NettyTcpClient; |
||||
import reactor.net.tcp.TcpClient; |
||||
import reactor.net.tcp.spec.TcpClientSpec; |
||||
import reactor.tuple.Tuple; |
||||
import reactor.tuple.Tuple2; |
||||
|
||||
|
||||
/** |
||||
* An implementation of {@link org.springframework.messaging.tcp.TcpOperations} |
||||
* based on the TCP client support of the Reactor project. |
||||
* |
||||
* @author Rossen Stoyanchev |
||||
* @since 4.0 |
||||
*/ |
||||
public class Reactor11TcpClient<P> implements TcpOperations<P> { |
||||
|
||||
public static final Class<NettyTcpClient> REACTOR_TCP_CLIENT_TYPE = NettyTcpClient.class; |
||||
|
||||
|
||||
private final TcpClient<Message<P>, Message<P>> tcpClient; |
||||
|
||||
private final Environment environment; |
||||
|
||||
|
||||
/** |
||||
* A constructor that creates a {@link reactor.net.netty.tcp.NettyTcpClient} with |
||||
* a {@link reactor.event.dispatch.SynchronousDispatcher} as a result of which |
||||
* network I/O is handled in Netty threads. |
||||
* |
||||
* <p>Also see the constructor accepting a pre-configured Reactor |
||||
* {@link reactor.net.tcp.TcpClient}. |
||||
* |
||||
* @param host the host to connect to |
||||
* @param port the port to connect to |
||||
* @param codec the codec to use for encoding and decoding the TCP stream |
||||
*/ |
||||
public Reactor11TcpClient(String host, int port, Codec<Buffer, Message<P>, Message<P>> codec) { |
||||
|
||||
// Revisit in 1.1: is Environment still required w/ sync dispatcher?
|
||||
this.environment = new Environment(new SynchronousDispatcherConfigReader()); |
||||
|
||||
this.tcpClient = new TcpClientSpec<Message<P>, Message<P>>(REACTOR_TCP_CLIENT_TYPE) |
||||
.env(this.environment) |
||||
.codec(codec) |
||||
.connect(host, port) |
||||
.get(); |
||||
} |
||||
|
||||
/** |
||||
* A constructor with a pre-configured {@link reactor.net.tcp.TcpClient}. |
||||
* |
||||
* <p><strong>NOTE:</strong> if the client is configured with a thread-creating |
||||
* dispatcher, you are responsible for shutting down the {@link reactor.core.Environment} |
||||
* instance with which the client is configured. |
||||
* |
||||
* @param tcpClient the TcpClient to use |
||||
*/ |
||||
public Reactor11TcpClient(TcpClient<Message<P>, Message<P>> tcpClient) { |
||||
Assert.notNull(tcpClient, "'tcpClient' must not be null"); |
||||
this.tcpClient = tcpClient; |
||||
this.environment = null; |
||||
} |
||||
|
||||
|
||||
@Override |
||||
public ListenableFuture<Void> connect(TcpConnectionHandler<P> connectionHandler) { |
||||
|
||||
Promise<NetChannel<Message<P>, Message<P>>> promise = this.tcpClient.open(); |
||||
composeConnectionHandling(promise, connectionHandler); |
||||
|
||||
return new AbstractPromiseToListenableFutureAdapter<NetChannel<Message<P>, Message<P>>, Void>(promise) { |
||||
@Override |
||||
protected Void adapt(NetChannel<Message<P>, Message<P>> result) { |
||||
return null; |
||||
} |
||||
}; |
||||
} |
||||
|
||||
@Override |
||||
public ListenableFuture<Void> connect(final TcpConnectionHandler<P> connectionHandler, |
||||
final ReconnectStrategy reconnectStrategy) { |
||||
|
||||
Assert.notNull(reconnectStrategy, "ReconnectStrategy must not be null"); |
||||
|
||||
Reconnect reconnect = new Reconnect() { |
||||
@Override |
||||
public Tuple2<InetSocketAddress, Long> reconnect(InetSocketAddress address, int attempt) { |
||||
return Tuple.of(address, reconnectStrategy.getTimeToNextAttempt(attempt)); |
||||
} |
||||
}; |
||||
|
||||
Stream<NetChannel<Message<P>, Message<P>>> stream = this.tcpClient.open(reconnect); |
||||
composeConnectionHandling(stream, connectionHandler); |
||||
|
||||
Promise<Void> promise = Promises.next(stream).map( |
||||
new Function<NetChannel<Message<P>, Message<P>>, Void>() { |
||||
@Override |
||||
public Void apply(NetChannel<Message<P>, Message<P>> ch) { |
||||
return null; |
||||
} |
||||
}); |
||||
return new PassThroughPromiseToListenableFutureAdapter<Void>(promise); |
||||
} |
||||
|
||||
private void composeConnectionHandling(Composable<NetChannel<Message<P>, Message<P>>> composable, |
||||
final TcpConnectionHandler<P> connectionHandler) { |
||||
|
||||
composable |
||||
.when(Throwable.class, new Consumer<Throwable>() { |
||||
@Override |
||||
public void accept(Throwable ex) { |
||||
connectionHandler.afterConnectFailure(ex); |
||||
} |
||||
}) |
||||
.consume(new Consumer<NetChannel<Message<P>, Message<P>>>() { |
||||
@Override |
||||
public void accept(NetChannel<Message<P>, Message<P>> connection) { |
||||
connection |
||||
.when(Throwable.class, new Consumer<Throwable>() { |
||||
@Override |
||||
public void accept(Throwable t) { |
||||
connectionHandler.handleFailure(t); |
||||
} |
||||
}) |
||||
.consume(new Consumer<Message<P>>() { |
||||
@Override |
||||
public void accept(Message<P> message) { |
||||
connectionHandler.handleMessage(message); |
||||
} |
||||
}) |
||||
.on() |
||||
.close(new Runnable() { |
||||
@Override |
||||
public void run() { |
||||
connectionHandler.afterConnectionClosed(); |
||||
} |
||||
}); |
||||
connectionHandler.afterConnected(new Reactor11TcpConnection<P>(connection)); |
||||
} |
||||
}); |
||||
} |
||||
|
||||
@Override |
||||
public ListenableFuture<Boolean> shutdown() { |
||||
try { |
||||
Promise<Boolean> promise = this.tcpClient.close(); |
||||
return new AbstractPromiseToListenableFutureAdapter<Boolean, Boolean>(promise) { |
||||
@Override |
||||
protected Boolean adapt(Boolean result) { |
||||
return result; |
||||
} |
||||
}; |
||||
} |
||||
finally { |
||||
this.environment.shutdown(); |
||||
|
||||
} |
||||
} |
||||
|
||||
/** |
||||
* A ConfigurationReader that enforces the use of a SynchronousDispatcher. |
||||
* |
||||
* <p>The {@link reactor.core.configuration.PropertiesConfigurationReader} used by |
||||
* default automatically creates other dispatchers with thread pools that are |
||||
* not needed. |
||||
*/ |
||||
private static class SynchronousDispatcherConfigReader implements ConfigurationReader { |
||||
|
||||
@Override |
||||
public ReactorConfiguration read() { |
||||
return new ReactorConfiguration(Arrays.<DispatcherConfiguration>asList(), "sync", new Properties()); |
||||
} |
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,67 @@
@@ -0,0 +1,67 @@
|
||||
/* |
||||
* Copyright 2002-2014 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.messaging.tcp.reactor; |
||||
|
||||
import org.springframework.messaging.Message; |
||||
import org.springframework.messaging.tcp.TcpConnection; |
||||
import org.springframework.messaging.tcp.reactor.PassThroughPromiseToListenableFutureAdapter; |
||||
import org.springframework.util.concurrent.ListenableFuture; |
||||
|
||||
import reactor.core.composable.Promise; |
||||
import reactor.net.NetChannel; |
||||
|
||||
|
||||
/** |
||||
* An implementation of {@link org.springframework.messaging.tcp.TcpConnection} |
||||
* based on the TCP client support of the Reactor project. |
||||
* |
||||
* @param <P> the payload type of Spring Message's read from |
||||
* and written to the TCP stream |
||||
* |
||||
* @author Rossen Stoyanchev |
||||
*/ |
||||
public class Reactor11TcpConnection<P> implements TcpConnection<P> { |
||||
|
||||
private final NetChannel<Message<P>, Message<P>> channel; |
||||
|
||||
|
||||
public Reactor11TcpConnection(NetChannel<Message<P>, Message<P>> connection) { |
||||
this.channel = connection; |
||||
} |
||||
|
||||
@Override |
||||
public ListenableFuture<Void> send(Message<P> message) { |
||||
Promise<Void> promise = this.channel.send(message); |
||||
return new PassThroughPromiseToListenableFutureAdapter<Void>(promise); |
||||
} |
||||
|
||||
@Override |
||||
public void onReadInactivity(Runnable runnable, long inactivityDuration) { |
||||
this.channel.on().readIdle(inactivityDuration, runnable); |
||||
} |
||||
|
||||
@Override |
||||
public void onWriteInactivity(Runnable runnable, long inactivityDuration) { |
||||
this.channel.on().writeIdle(inactivityDuration, runnable); |
||||
} |
||||
|
||||
@Override |
||||
public void close() { |
||||
this.channel.close(); |
||||
} |
||||
|
||||
} |
||||
Loading…
Reference in new issue