diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompReactorNettyCodec.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompReactorNettyCodec.java index 8b36d4bce40..c59d1379cee 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompReactorNettyCodec.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompReactorNettyCodec.java @@ -15,19 +15,27 @@ */ package org.springframework.messaging.simp.stomp; -import org.springframework.messaging.tcp.reactor.ReactorNettyCodec; +import java.nio.ByteBuffer; +import java.util.List; + +import org.springframework.messaging.Message; +import org.springframework.messaging.tcp.reactor.AbstractNioBufferReactorNettyCodec; /** - * {@code ReactorNettyCodec} that delegates to {@link StompDecoder} and - * {@link StompEncoder}. + * Simple delegation to StompDecoder and StompEncoder. * * @author Rossen Stoyanchev * @since 5.0 */ -class StompReactorNettyCodec extends ReactorNettyCodec { +class StompReactorNettyCodec extends AbstractNioBufferReactorNettyCodec { + + private final StompDecoder decoder; + + private final StompEncoder encoder; + public StompReactorNettyCodec() { - this(new StompDecoder(), new StompEncoder()); + this(new StompDecoder()); } public StompReactorNettyCodec(StompDecoder decoder) { @@ -35,8 +43,18 @@ class StompReactorNettyCodec extends ReactorNettyCodec { } public StompReactorNettyCodec(StompDecoder decoder, StompEncoder encoder) { - super(byteBuf -> decoder.decode(byteBuf.nioBuffer()), - (byteBuf, message) -> byteBuf.writeBytes(encoder.encode(message))); + this.decoder = decoder; + this.encoder = encoder; + } + + + @Override + protected List> decodeInternal(ByteBuffer nioBuffer) { + return this.decoder.decode(nioBuffer); + } + + protected ByteBuffer encodeInternal(Message message) { + return ByteBuffer.wrap(this.encoder.encode(message)); } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractNioBufferReactorNettyCodec.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractNioBufferReactorNettyCodec.java new file mode 100644 index 00000000000..aeef66482a7 --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractNioBufferReactorNettyCodec.java @@ -0,0 +1,53 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.messaging.tcp.reactor; + +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.List; + +import io.netty.buffer.ByteBuf; + +import org.springframework.messaging.Message; + +/** + * Convenient base class for {@link ReactorNettyCodec} implementations that need + * to work with NIO {@link ByteBuffer}s. + * + * @author Rossen Stoyanchev + * @since 5.0 + */ +public abstract class AbstractNioBufferReactorNettyCodec

implements ReactorNettyCodec

{ + + @Override + public Collection> decode(ByteBuf inputBuffer) { + ByteBuffer nioBuffer = inputBuffer.nioBuffer(); + int start = nioBuffer.position(); + List> messages = decodeInternal(nioBuffer); + inputBuffer.skipBytes(nioBuffer.position() - start); + return messages; + } + + protected abstract List> decodeInternal(ByteBuffer nioBuffer); + + @Override + public void encode(Message

message, ByteBuf outputBuffer) { + outputBuffer.writeBytes(encodeInternal(message)); + } + + protected abstract ByteBuffer encodeInternal(Message

message); + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyCodec.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyCodec.java index 3254882a94b..529dd7ea002 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyCodec.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyCodec.java @@ -22,7 +22,6 @@ import java.util.function.Function; import io.netty.buffer.ByteBuf; import org.springframework.messaging.Message; -import org.springframework.util.Assert; /** * Simple holder for a decoding {@link Function} and an encoding @@ -31,28 +30,20 @@ import org.springframework.util.Assert; * @author Rossen Stoyanchev * @since 5.0 */ -public class ReactorNettyCodec

{ - - private final Function>> decoder; - - private final BiConsumer> encoder; - - - public ReactorNettyCodec(Function>> decoder, - BiConsumer> encoder) { - - Assert.notNull(decoder, "'decoder' is required"); - Assert.notNull(encoder, "'encoder' is required"); - this.decoder = decoder; - this.encoder = encoder; - } - - public Function>> getDecoder() { - return this.decoder; - } - - public BiConsumer> getEncoder() { - return this.encoder; - } +public interface ReactorNettyCodec

{ + + /** + * Decode the input {@link ByteBuf} into one or more {@link Message}s. + * @param inputBuffer the input buffer to decode from + * @return 0 or more decoded messages + */ + Collection> decode(ByteBuf inputBuffer); + + /** + * Encode the given {@link Message} to the output {@link ByteBuf}. + * @param message the message the encode + * @param outputBuffer the buffer to write to + */ + void encode(Message

message, ByteBuf outputBuffer); } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java index 3617ddb2b22..b65890048a3 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java @@ -16,13 +16,18 @@ package org.springframework.messaging.tcp.reactor; +import java.util.Collection; +import java.util.List; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.ChannelGroupFuture; import io.netty.channel.group.DefaultChannelGroup; +import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.util.concurrent.ImmediateEventExecutor; import org.reactivestreams.Publisher; import reactor.core.publisher.DirectProcessor; @@ -39,6 +44,7 @@ import reactor.ipc.netty.options.ClientOptions; import reactor.ipc.netty.tcp.TcpClient; import reactor.util.concurrent.QueueSupplier; +import org.springframework.messaging.Message; import org.springframework.messaging.tcp.ReconnectStrategy; import org.springframework.messaging.tcp.TcpConnection; import org.springframework.messaging.tcp.TcpConnectionHandler; @@ -170,6 +176,7 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ this.connectionHandler = handler; } + @SuppressWarnings("unchecked") @Override public Publisher apply(NettyInbound inbound, NettyOutbound outbound) { @@ -177,10 +184,11 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ TcpConnection

connection = new ReactorNettyTcpConnection<>(inbound, outbound, codec, completion); scheduler.schedule(() -> connectionHandler.afterConnected(connection)); - inbound.receive() - .map(codec.getDecoder()) + inbound.context().addDecoder(new StompMessageDecoder<>(codec)); + + inbound.receiveObject() + .cast(Message.class) .publishOn(scheduler, QueueSupplier.SMALL_BUFFER_SIZE) - .flatMapIterable(Function.identity()) .subscribe( connectionHandler::handleMessage, connectionHandler::handleFailure, @@ -190,4 +198,19 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ } } + private static class StompMessageDecoder

extends ByteToMessageDecoder { + + private final ReactorNettyCodec

codec; + + public StompMessageDecoder(ReactorNettyCodec

codec) { + this.codec = codec; + } + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + Collection> messages = codec.decode(in); + out.addAll(messages); + } + } + } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java index 9cb0caa2a38..13245c1e2dd 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java @@ -58,7 +58,7 @@ public class ReactorNettyTcpConnection

implements TcpConnection

{ @Override public ListenableFuture send(Message

message) { ByteBuf byteBuf = this.outbound.alloc().buffer(); - this.codec.getEncoder().accept(byteBuf, message); + this.codec.encode(message, byteBuf); Mono sendCompletion = this.outbound.send(Mono.just(byteBuf)).then(); return new MonoToListenableFutureAdapter<>(sendCompletion); }