From ea274ebc0a355ffb499791a9abf1618b829c5edd Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Wed, 28 Dec 2016 15:43:48 -0500 Subject: [PATCH] Fix decoding issue in Reactor TcpClient When decoding STOMP messages unread portions of a given input ByteBuf must be kept until more input is received and the next complete STOMP frame can be parsed. In Reactor Net 2.x this was handled for us through the "remainder" field in NettyChannelHandlerBridge. The Reactor Netty 0.6 upgrade however applied only a simple map operator on the input ByteBuf after which the buffer is relased. This commit replaces the use of a simple map operator for decoding and installs a ByteToMessageDecoder in the Netty channel pipeline which has a built-in ability to preserve and merge unread input into subsequent input buffers. --- .../simp/stomp/StompReactorNettyCodec.java | 32 ++++++++--- .../AbstractNioBufferReactorNettyCodec.java | 53 +++++++++++++++++++ .../tcp/reactor/ReactorNettyCodec.java | 39 ++++++-------- .../tcp/reactor/ReactorNettyTcpClient.java | 29 ++++++++-- .../reactor/ReactorNettyTcpConnection.java | 2 +- 5 files changed, 120 insertions(+), 35 deletions(-) create mode 100644 spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractNioBufferReactorNettyCodec.java diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompReactorNettyCodec.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompReactorNettyCodec.java index 8b36d4bce40..c59d1379cee 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompReactorNettyCodec.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompReactorNettyCodec.java @@ -15,19 +15,27 @@ */ package org.springframework.messaging.simp.stomp; -import org.springframework.messaging.tcp.reactor.ReactorNettyCodec; +import java.nio.ByteBuffer; +import java.util.List; + +import org.springframework.messaging.Message; +import org.springframework.messaging.tcp.reactor.AbstractNioBufferReactorNettyCodec; /** - * {@code ReactorNettyCodec} that delegates to {@link StompDecoder} and - * {@link StompEncoder}. + * Simple delegation to StompDecoder and StompEncoder. * * @author Rossen Stoyanchev * @since 5.0 */ -class StompReactorNettyCodec extends ReactorNettyCodec { +class StompReactorNettyCodec extends AbstractNioBufferReactorNettyCodec { + + private final StompDecoder decoder; + + private final StompEncoder encoder; + public StompReactorNettyCodec() { - this(new StompDecoder(), new StompEncoder()); + this(new StompDecoder()); } public StompReactorNettyCodec(StompDecoder decoder) { @@ -35,8 +43,18 @@ class StompReactorNettyCodec extends ReactorNettyCodec { } public StompReactorNettyCodec(StompDecoder decoder, StompEncoder encoder) { - super(byteBuf -> decoder.decode(byteBuf.nioBuffer()), - (byteBuf, message) -> byteBuf.writeBytes(encoder.encode(message))); + this.decoder = decoder; + this.encoder = encoder; + } + + + @Override + protected List> decodeInternal(ByteBuffer nioBuffer) { + return this.decoder.decode(nioBuffer); + } + + protected ByteBuffer encodeInternal(Message message) { + return ByteBuffer.wrap(this.encoder.encode(message)); } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractNioBufferReactorNettyCodec.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractNioBufferReactorNettyCodec.java new file mode 100644 index 00000000000..aeef66482a7 --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractNioBufferReactorNettyCodec.java @@ -0,0 +1,53 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.messaging.tcp.reactor; + +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.List; + +import io.netty.buffer.ByteBuf; + +import org.springframework.messaging.Message; + +/** + * Convenient base class for {@link ReactorNettyCodec} implementations that need + * to work with NIO {@link ByteBuffer}s. + * + * @author Rossen Stoyanchev + * @since 5.0 + */ +public abstract class AbstractNioBufferReactorNettyCodec

implements ReactorNettyCodec

{ + + @Override + public Collection> decode(ByteBuf inputBuffer) { + ByteBuffer nioBuffer = inputBuffer.nioBuffer(); + int start = nioBuffer.position(); + List> messages = decodeInternal(nioBuffer); + inputBuffer.skipBytes(nioBuffer.position() - start); + return messages; + } + + protected abstract List> decodeInternal(ByteBuffer nioBuffer); + + @Override + public void encode(Message

message, ByteBuf outputBuffer) { + outputBuffer.writeBytes(encodeInternal(message)); + } + + protected abstract ByteBuffer encodeInternal(Message

message); + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyCodec.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyCodec.java index 3254882a94b..529dd7ea002 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyCodec.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyCodec.java @@ -22,7 +22,6 @@ import java.util.function.Function; import io.netty.buffer.ByteBuf; import org.springframework.messaging.Message; -import org.springframework.util.Assert; /** * Simple holder for a decoding {@link Function} and an encoding @@ -31,28 +30,20 @@ import org.springframework.util.Assert; * @author Rossen Stoyanchev * @since 5.0 */ -public class ReactorNettyCodec

{ - - private final Function>> decoder; - - private final BiConsumer> encoder; - - - public ReactorNettyCodec(Function>> decoder, - BiConsumer> encoder) { - - Assert.notNull(decoder, "'decoder' is required"); - Assert.notNull(encoder, "'encoder' is required"); - this.decoder = decoder; - this.encoder = encoder; - } - - public Function>> getDecoder() { - return this.decoder; - } - - public BiConsumer> getEncoder() { - return this.encoder; - } +public interface ReactorNettyCodec

{ + + /** + * Decode the input {@link ByteBuf} into one or more {@link Message}s. + * @param inputBuffer the input buffer to decode from + * @return 0 or more decoded messages + */ + Collection> decode(ByteBuf inputBuffer); + + /** + * Encode the given {@link Message} to the output {@link ByteBuf}. + * @param message the message the encode + * @param outputBuffer the buffer to write to + */ + void encode(Message

message, ByteBuf outputBuffer); } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java index 3617ddb2b22..b65890048a3 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java @@ -16,13 +16,18 @@ package org.springframework.messaging.tcp.reactor; +import java.util.Collection; +import java.util.List; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.ChannelGroupFuture; import io.netty.channel.group.DefaultChannelGroup; +import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.util.concurrent.ImmediateEventExecutor; import org.reactivestreams.Publisher; import reactor.core.publisher.DirectProcessor; @@ -39,6 +44,7 @@ import reactor.ipc.netty.options.ClientOptions; import reactor.ipc.netty.tcp.TcpClient; import reactor.util.concurrent.QueueSupplier; +import org.springframework.messaging.Message; import org.springframework.messaging.tcp.ReconnectStrategy; import org.springframework.messaging.tcp.TcpConnection; import org.springframework.messaging.tcp.TcpConnectionHandler; @@ -170,6 +176,7 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ this.connectionHandler = handler; } + @SuppressWarnings("unchecked") @Override public Publisher apply(NettyInbound inbound, NettyOutbound outbound) { @@ -177,10 +184,11 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ TcpConnection

connection = new ReactorNettyTcpConnection<>(inbound, outbound, codec, completion); scheduler.schedule(() -> connectionHandler.afterConnected(connection)); - inbound.receive() - .map(codec.getDecoder()) + inbound.context().addDecoder(new StompMessageDecoder<>(codec)); + + inbound.receiveObject() + .cast(Message.class) .publishOn(scheduler, QueueSupplier.SMALL_BUFFER_SIZE) - .flatMapIterable(Function.identity()) .subscribe( connectionHandler::handleMessage, connectionHandler::handleFailure, @@ -190,4 +198,19 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ } } + private static class StompMessageDecoder

extends ByteToMessageDecoder { + + private final ReactorNettyCodec

codec; + + public StompMessageDecoder(ReactorNettyCodec

codec) { + this.codec = codec; + } + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + Collection> messages = codec.decode(in); + out.addAll(messages); + } + } + } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java index 9cb0caa2a38..13245c1e2dd 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java @@ -58,7 +58,7 @@ public class ReactorNettyTcpConnection

implements TcpConnection

{ @Override public ListenableFuture send(Message

message) { ByteBuf byteBuf = this.outbound.alloc().buffer(); - this.codec.getEncoder().accept(byteBuf, message); + this.codec.encode(message, byteBuf); Mono sendCompletion = this.outbound.send(Mono.just(byteBuf)).then(); return new MonoToListenableFutureAdapter<>(sendCompletion); }