From db9e0b0ccb756378093d58635922c8d17aeb0815 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Thu, 22 Oct 2020 21:28:06 +0100 Subject: [PATCH] Refactor StringDecoder Simplify and optimize the processing of the input stream. The existing implementation was using bufferUntil and creating a List for every line along with an EndFrameBuffer inserted for the bufferUntil predicate. So the larger the input buffer and the more lines it contained, the greater the overhead. The new implementation avoids bufferUntil for all lines and instead uses concatMapIterable to aggregate the lines from a buffer into a single list. So the larger the input buffer and the more lines it contains, the better the throughput. The only buffering used then is for partial chunks and those are accumulated in a list. See gh-25915 --- .../core/codec/StringDecoder.java | 191 ++++++------------ .../core/codec/StringDecoderTests.java | 16 +- 2 files changed, 74 insertions(+), 133 deletions(-) diff --git a/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java b/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java index c2bf9620e11..48c0da67699 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java @@ -21,21 +21,20 @@ import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.function.Consumer; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import org.springframework.core.ResolvableType; import org.springframework.core.io.buffer.DataBuffer; -import 
org.springframework.core.io.buffer.DataBufferLimitException; import org.springframework.core.io.buffer.DataBufferUtils; -import org.springframework.core.io.buffer.DataBufferWrapper; -import org.springframework.core.io.buffer.DefaultDataBufferFactory; import org.springframework.core.io.buffer.LimitedDataBufferList; import org.springframework.core.io.buffer.PooledDataBuffer; import org.springframework.core.log.LogFormatUtils; @@ -45,12 +44,12 @@ import org.springframework.util.MimeType; import org.springframework.util.MimeTypeUtils; /** - * Decode from a data buffer stream to a {@code String} stream. Before decoding, this decoder - * realigns the incoming data buffers so that each buffer ends with a newline. - * This is to make sure that multibyte characters are decoded properly, and do not cross buffer - * boundaries. The default delimiters ({@code \n}, {@code \r\n})can be customized. - * - *

Partially inspired by Netty's {@code DelimiterBasedFrameDecoder}. + * Decode from a data buffer stream to a {@code String} stream, either splitting + * or aggregating incoming data chunks to realign along newline delimiters + * and produce a stream of strings. This is useful for streaming but is also + * necessary to ensure that multibyte characters can be decoded correctly, + * avoiding split-character issues. The delimiters used by default are + * {@code \n} and {@code \r\n} but that can be customized. * * @author Sebastien Deleuze * @author Brian Clozel @@ -115,21 +114,22 @@ public final class StringDecoder extends AbstractDataBufferDecoder { byte[][] delimiterBytes = getDelimiterBytes(mimeType); - Flux inputFlux = Flux.defer(() -> { - DataBufferUtils.Matcher matcher = DataBufferUtils.matcher(delimiterBytes); - - @SuppressWarnings("MismatchedQueryAndUpdateOfCollection") - LimitChecker limiter = new LimitChecker(getMaxInMemorySize()); - - return Flux.from(input) - .concatMapIterable(buffer -> endFrameAfterDelimiter(buffer, matcher)) - .doOnNext(limiter) - .bufferUntil(buffer -> buffer instanceof EndFrameBuffer) - .map(list -> joinAndStrip(list, this.stripDelimiter)) - .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); - }); - - return super.decode(inputFlux, elementType, mimeType, hints); + LimitedDataBufferList chunks = new LimitedDataBufferList(getMaxInMemorySize()); + DataBufferUtils.Matcher matcher = DataBufferUtils.matcher(delimiterBytes); + + return Flux.from(input) + .concatMapIterable(buffer -> processDataBuffer(buffer, matcher, chunks)) + .concatWith(Mono.defer(() -> { + if (chunks.isEmpty()) { + return Mono.empty(); + } + DataBuffer lastBuffer = chunks.get(0).factory().join(chunks); + chunks.clear(); + return Mono.just(lastBuffer); + })) + .doOnTerminate(chunks::releaseAndClear) + .doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release) + .map(buffer -> decode(buffer, elementType, mimeType, hints)); } private byte[][] 
getDelimiterBytes(@Nullable MimeType mimeType) { @@ -142,6 +142,43 @@ public final class StringDecoder extends AbstractDataBufferDecoder { }); } + private Collection processDataBuffer( + DataBuffer buffer, DataBufferUtils.Matcher matcher, LimitedDataBufferList chunks) { + + try { + List result = null; + do { + int endIndex = matcher.match(buffer); + if (endIndex == -1) { + chunks.add(buffer); + DataBufferUtils.retain(buffer); // retain after add (may raise DataBufferLimitException) + break; + } + int startIndex = buffer.readPosition(); + int length = (endIndex - startIndex + 1); + DataBuffer slice = buffer.retainedSlice(startIndex, length); + if (this.stripDelimiter) { + slice.writePosition(slice.writePosition() - matcher.delimiter().length); + } + result = (result != null ? result : new ArrayList<>()); + if (chunks.isEmpty()) { + result.add(slice); + } + else { + chunks.add(slice); + result.add(buffer.factory().join(chunks)); + chunks.clear(); + } + buffer.readPosition(endIndex + 1); + } + while (buffer.readableByteCount() > 0); + return (result != null ? result : Collections.emptyList()); + } + finally { + DataBufferUtils.release(buffer); + } + } + @Override public String decode(DataBuffer dataBuffer, ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map hints) { @@ -166,68 +203,6 @@ public final class StringDecoder extends AbstractDataBufferDecoder { } } - /** - * Finds the first match and longest delimiter, {@link EndFrameBuffer} just after it. - * @param dataBuffer the buffer to find delimiters in - * @param matcher used to find the first delimiters - * @return a flux of buffers, containing {@link EndFrameBuffer} after each delimiter that was - * found in {@code dataBuffer}. Returns Flux, because returning List (w/ flatMapIterable) - * results in memory leaks due to pre-fetching. 
- */ - private static List endFrameAfterDelimiter(DataBuffer dataBuffer, DataBufferUtils.Matcher matcher) { - List result = new ArrayList<>(); - try { - do { - int endIdx = matcher.match(dataBuffer); - if (endIdx != -1) { - int readPosition = dataBuffer.readPosition(); - int length = (endIdx - readPosition + 1); - DataBuffer slice = dataBuffer.retainedSlice(readPosition, length); - result.add(slice); - result.add(new EndFrameBuffer(matcher.delimiter())); - dataBuffer.readPosition(endIdx + 1); - } - else { - result.add(DataBufferUtils.retain(dataBuffer)); - break; - } - } - while (dataBuffer.readableByteCount() > 0); - } - finally { - DataBufferUtils.release(dataBuffer); - } - return result; - } - - /** - * Joins the given list of buffers. If the list ends with a {@link EndFrameBuffer}, it is - * removed. If {@code stripDelimiter} is {@code true} and the resulting buffer ends with - * a delimiter, it is removed. - * @param dataBuffers the data buffers to join - * @param stripDelimiter whether to strip the delimiter - * @return the joined buffer - */ - private static DataBuffer joinAndStrip(List dataBuffers, boolean stripDelimiter) { - Assert.state(!dataBuffers.isEmpty(), "DataBuffers should not be empty"); - - byte[] matchingDelimiter = null; - - int lastIdx = dataBuffers.size() - 1; - DataBuffer lastBuffer = dataBuffers.get(lastIdx); - if (lastBuffer instanceof EndFrameBuffer) { - matchingDelimiter = ((EndFrameBuffer) lastBuffer).delimiter(); - dataBuffers.remove(lastIdx); - } - - DataBuffer result = dataBuffers.get(0).factory().join(dataBuffers); - if (stripDelimiter && matchingDelimiter != null) { - result.writePosition(result.writePosition() - matchingDelimiter.length); - } - return result; - } - - /** * Create a {@code StringDecoder} for {@code "text/plain"}. 
* @param stripDelimiter this flag is ignored @@ -285,46 +260,4 @@ public final class StringDecoder extends AbstractDataBufferDecoder { new MimeType("text", "plain", DEFAULT_CHARSET), MimeTypeUtils.ALL); } - - private static class EndFrameBuffer extends DataBufferWrapper { - - private static final DataBuffer BUFFER = new DefaultDataBufferFactory().wrap(new byte[0]); - - private final byte[] delimiter; - - public EndFrameBuffer(byte[] delimiter) { - super(BUFFER); - this.delimiter = delimiter; - } - - public byte[] delimiter() { - return this.delimiter; - } - } - - - private static class LimitChecker implements Consumer { - - @SuppressWarnings("MismatchedQueryAndUpdateOfCollection") - private final LimitedDataBufferList list; - - LimitChecker(int maxInMemorySize) { - this.list = new LimitedDataBufferList(maxInMemorySize); - } - - @Override - public void accept(DataBuffer buffer) { - if (buffer instanceof EndFrameBuffer) { - this.list.clear(); - } - try { - this.list.add(buffer); - } - catch (DataBufferLimitException ex) { - DataBufferUtils.release(buffer); - throw ex; - } - } - } - } diff --git a/spring-core/src/test/java/org/springframework/core/codec/StringDecoderTests.java b/spring-core/src/test/java/org/springframework/core/codec/StringDecoderTests.java index b3bcdfe8f7a..a41e1fdc629 100644 --- a/spring-core/src/test/java/org/springframework/core/codec/StringDecoderTests.java +++ b/spring-core/src/test/java/org/springframework/core/codec/StringDecoderTests.java @@ -139,20 +139,28 @@ class StringDecoderTests extends AbstractDecoderTests { @Test void maxInMemoryLimit() { Flux input = Flux.just( - stringBuffer("abc\n"), stringBuffer("defg\n"), stringBuffer("hijkl\n")); + stringBuffer("abc\n"), stringBuffer("defg\n"), + stringBuffer("hi"), stringBuffer("jkl"), stringBuffer("mnop")); this.decoder.setMaxInMemorySize(5); testDecode(input, String.class, step -> step.expectNext("abc", "defg").verifyError(DataBufferLimitException.class)); } - @Test // gh-24312 - void 
maxInMemoryLimitReleaseUnprocessedLinesFromCurrentBuffer() { + @Test + void maxInMemoryLimitDoesNotApplyToParsedItemsThatDontRequireBuffering() { Flux input = Flux.just( stringBuffer("TOO MUCH DATA\nanother line\n\nand another\n")); this.decoder.setMaxInMemorySize(5); - testDecode(input, String.class, step -> step.verifyError(DataBufferLimitException.class)); + + testDecode(input, String.class, step -> step + .expectNext("TOO MUCH DATA") + .expectNext("another line") + .expectNext("") + .expectNext("and another") + .expectComplete() + .verify()); } @Test // gh-24339