diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompCodec.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/Reactor11StompCodec.java similarity index 93% rename from spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompCodec.java rename to spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/Reactor11StompCodec.java index 7941ff8e1d4..d42749dce41 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompCodec.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/Reactor11StompCodec.java @@ -33,7 +33,7 @@ import java.nio.ByteBuffer; * @author Rossen Stoyanchev * @since 4.0 */ -public class StompCodec implements Codec, Message> { +public class Reactor11StompCodec implements Codec, Message> { private final StompDecoder stompDecoder; @@ -42,11 +42,11 @@ public class StompCodec implements Codec, Message, Buffer> encodingFunction; - public StompCodec() { + public Reactor11StompCodec() { this(new StompEncoder(), new StompDecoder()); } - public StompCodec(StompEncoder encoder, StompDecoder decoder) { + public Reactor11StompCodec(StompEncoder encoder, StompDecoder decoder) { Assert.notNull(encoder, "'encoder' is required"); Assert.notNull(decoder, "'decoder' is required"); this.stompEncoder = encoder; diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java index b9745a17037..2aeae711676 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java @@ -35,7 +35,7 @@ import org.springframework.messaging.tcp.FixedIntervalReconnectStrategy; import org.springframework.messaging.tcp.TcpConnection; import org.springframework.messaging.tcp.TcpConnectionHandler; import org.springframework.messaging.tcp.TcpOperations; -import org.springframework.messaging.tcp.reactor.ReactorTcpClient; +import org.springframework.messaging.tcp.reactor.Reactor11TcpClient; import org.springframework.util.Assert; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; @@ -311,7 +311,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler /** * Configure a TCP client for managing TCP connections to the STOMP broker. - * By default {@link org.springframework.messaging.tcp.reactor.ReactorTcpClient} is used. + * By default {@link org.springframework.messaging.tcp.reactor.Reactor11TcpClient} is used. */ public void setTcpClient(TcpOperations tcpClient) { this.tcpClient = tcpClient; @@ -354,7 +354,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler if (this.tcpClient == null) { StompDecoder decoder = new StompDecoder(); decoder.setHeaderInitializer(getHeaderInitializer()); - StompCodec codec = new StompCodec(new StompEncoder(), decoder); + Reactor11StompCodec codec = new Reactor11StompCodec(new StompEncoder(), decoder); this.tcpClient = new StompTcpClientFactory().create(this.relayHost, this.relayPort, codec); } @@ -838,8 +838,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler private static class StompTcpClientFactory { - public TcpOperations create(String relayHost, int relayPort, StompCodec codec) { - return new ReactorTcpClient(relayHost, relayPort, codec); + public TcpOperations create(String relayHost, int relayPort, Reactor11StompCodec codec) { + return new Reactor11TcpClient(relayHost, relayPort, codec); } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorTcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor11TcpClient.java similarity index 95% rename from spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorTcpClient.java rename to spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor11TcpClient.java index 20dfde82da6..c27d55105ba 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorTcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor11TcpClient.java @@ -55,7 +55,7 @@ import reactor.tuple.Tuple2; * @author Rossen Stoyanchev * @since 4.0 */ -public class ReactorTcpClient

implements TcpOperations

{ +public class Reactor11TcpClient

implements TcpOperations

{ public static final Class REACTOR_TCP_CLIENT_TYPE = NettyTcpClient.class; @@ -77,7 +77,7 @@ public class ReactorTcpClient

implements TcpOperations

{ * @param port the port to connect to * @param codec the codec to use for encoding and decoding the TCP stream */ - public ReactorTcpClient(String host, int port, Codec, Message

> codec) { + public Reactor11TcpClient(String host, int port, Codec, Message

> codec) { // Revisit in 1.1: is Environment still required w/ sync dispatcher? this.environment = new Environment(new SynchronousDispatcherConfigReader()); @@ -98,7 +98,7 @@ public class ReactorTcpClient

implements TcpOperations

{ * * @param tcpClient the TcpClient to use */ - public ReactorTcpClient(TcpClient, Message

> tcpClient) { + public Reactor11TcpClient(TcpClient, Message

> tcpClient) { Assert.notNull(tcpClient, "'tcpClient' must not be null"); this.tcpClient = tcpClient; this.environment = null; @@ -178,7 +178,7 @@ public class ReactorTcpClient

implements TcpOperations

{ connectionHandler.afterConnectionClosed(); } }); - connectionHandler.afterConnected(new ReactorTcpConnection

(connection)); + connectionHandler.afterConnected(new Reactor11TcpConnection

(connection)); } }); } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorTcpConnection.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor11TcpConnection.java similarity index 92% rename from spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorTcpConnection.java rename to spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor11TcpConnection.java index 78033c865f8..15646b260ad 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorTcpConnection.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor11TcpConnection.java @@ -33,12 +33,12 @@ import reactor.net.NetChannel; * * @author Rossen Stoyanchev */ -public class ReactorTcpConnection

implements TcpConnection

{ +public class Reactor11TcpConnection

implements TcpConnection

{ private final NetChannel, Message

> channel; - public ReactorTcpConnection(NetChannel, Message

> connection) { + public Reactor11TcpConnection(NetChannel, Message

> connection) { this.channel = connection; } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompCodecTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompCodecTests.java index ac7b68f3f26..5e74b7e4c40 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompCodecTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompCodecTests.java @@ -33,7 +33,7 @@ import reactor.io.Buffer; import static org.junit.Assert.*; /** - * Test fixture for {@link StompCodec}. + * Test fixture for {@link Reactor11StompCodec}. * * @author Andy Wilkinson */ @@ -41,7 +41,7 @@ public class StompCodecTests { private final ArgumentCapturingConsumer> consumer = new ArgumentCapturingConsumer>(); - private final Function> decoder = new StompCodec().decoder(consumer); + private final Function> decoder = new Reactor11StompCodec().decoder(consumer); @Test public void decodeFrameWithCrLfEols() { @@ -176,7 +176,7 @@ public class StompCodecTests { Buffer buffer = Buffer.wrap(frame1 + frame2); final List> messages = new ArrayList>(); - new StompCodec().decoder(new Consumer>() { + new Reactor11StompCodec().decoder(new Consumer>() { @Override public void accept(Message message) { messages.add(message); @@ -234,7 +234,7 @@ public class StompCodecTests { Buffer buffer = Buffer.wrap(frame); final List> messages = new ArrayList>(); - new StompCodec().decoder(new Consumer>() { + new Reactor11StompCodec().decoder(new Consumer>() { @Override public void accept(Message message) { messages.add(message); @@ -251,7 +251,7 @@ public class StompCodecTests { Message frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders()); - assertEquals("DISCONNECT\n\n\0", new StompCodec().encoder().apply(frame).asString()); + assertEquals("DISCONNECT\n\n\0", new Reactor11StompCodec().encoder().apply(frame).asString()); } @Test @@ -262,7 +262,7 @@ public class StompCodecTests { Message frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders()); - String frameString = new StompCodec().encoder().apply(frame).asString(); + String frameString = new Reactor11StompCodec().encoder().apply(frame).asString(); assertTrue(frameString.equals("CONNECT\naccept-version:1.2\nhost:github.org\n\n\0") || frameString.equals("CONNECT\nhost:github.org\naccept-version:1.2\n\n\0")); @@ -276,7 +276,7 @@ public class StompCodecTests { Message frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders()); assertEquals("DISCONNECT\na\\c\\r\\n\\\\b:alpha\\cbravo\\r\\n\\\\\n\n\0", - new StompCodec().encoder().apply(frame).asString()); + new Reactor11StompCodec().encoder().apply(frame).asString()); } @Test @@ -287,7 +287,7 @@ public class StompCodecTests { Message frame = MessageBuilder.createMessage("Message body".getBytes(), headers.getMessageHeaders()); assertEquals("SEND\na:alpha\ncontent-length:12\n\nMessage body\0", - new StompCodec().encoder().apply(frame).asString()); + new Reactor11StompCodec().encoder().apply(frame).asString()); } private void assertIncompleteDecode(String partialFrame) {