diff --git a/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java b/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java index c2bf9620e11..48c0da67699 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java @@ -21,21 +21,20 @@ import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.function.Consumer; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import org.springframework.core.ResolvableType; import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.core.io.buffer.DataBufferLimitException; import org.springframework.core.io.buffer.DataBufferUtils; -import org.springframework.core.io.buffer.DataBufferWrapper; -import org.springframework.core.io.buffer.DefaultDataBufferFactory; import org.springframework.core.io.buffer.LimitedDataBufferList; import org.springframework.core.io.buffer.PooledDataBuffer; import org.springframework.core.log.LogFormatUtils; @@ -45,12 +44,12 @@ import org.springframework.util.MimeType; import org.springframework.util.MimeTypeUtils; /** - * Decode from a data buffer stream to a {@code String} stream. Before decoding, this decoder - * realigns the incoming data buffers so that each buffer ends with a newline. - * This is to make sure that multibyte characters are decoded properly, and do not cross buffer - * boundaries. The default delimiters ({@code \n}, {@code \r\n})can be customized. - * - *

Partially inspired by Netty's {@code DelimiterBasedFrameDecoder}. + * Decode from a data buffer stream to a {@code String} stream, either splitting + * or aggregating incoming data chunks to realign along newlines delimiters + * and produce a stream of strings. This is useful for streaming but is also + * necessary to ensure that that multibyte characters can be decoded correctly, + * avoiding split-character issues. The default delimiters used by default are + * {@code \n} and {@code \r\n} but that can be customized. * * @author Sebastien Deleuze * @author Brian Clozel @@ -115,21 +114,22 @@ public final class StringDecoder extends AbstractDataBufferDecoder { byte[][] delimiterBytes = getDelimiterBytes(mimeType); - Flux inputFlux = Flux.defer(() -> { - DataBufferUtils.Matcher matcher = DataBufferUtils.matcher(delimiterBytes); - - @SuppressWarnings("MismatchedQueryAndUpdateOfCollection") - LimitChecker limiter = new LimitChecker(getMaxInMemorySize()); - - return Flux.from(input) - .concatMapIterable(buffer -> endFrameAfterDelimiter(buffer, matcher)) - .doOnNext(limiter) - .bufferUntil(buffer -> buffer instanceof EndFrameBuffer) - .map(list -> joinAndStrip(list, this.stripDelimiter)) - .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); - }); - - return super.decode(inputFlux, elementType, mimeType, hints); + LimitedDataBufferList chunks = new LimitedDataBufferList(getMaxInMemorySize()); + DataBufferUtils.Matcher matcher = DataBufferUtils.matcher(delimiterBytes); + + return Flux.from(input) + .concatMapIterable(buffer -> processDataBuffer(buffer, matcher, chunks)) + .concatWith(Mono.defer(() -> { + if (chunks.isEmpty()) { + return Mono.empty(); + } + DataBuffer lastBuffer = chunks.get(0).factory().join(chunks); + chunks.clear(); + return Mono.just(lastBuffer); + })) + .doOnTerminate(chunks::releaseAndClear) + .doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release) + .map(buffer -> decode(buffer, elementType, mimeType, hints)); } private byte[][] getDelimiterBytes(@Nullable MimeType mimeType) { @@ -142,6 +142,43 @@ public final class StringDecoder extends AbstractDataBufferDecoder { }); } + private Collection processDataBuffer( + DataBuffer buffer, DataBufferUtils.Matcher matcher, LimitedDataBufferList chunks) { + + try { + List result = null; + do { + int endIndex = matcher.match(buffer); + if (endIndex == -1) { + chunks.add(buffer); + DataBufferUtils.retain(buffer); // retain after add (may raise DataBufferLimitException) + break; + } + int startIndex = buffer.readPosition(); + int length = (endIndex - startIndex + 1); + DataBuffer slice = buffer.retainedSlice(startIndex, length); + if (this.stripDelimiter) { + slice.writePosition(slice.writePosition() - matcher.delimiter().length); + } + result = (result != null ? result : new ArrayList<>()); + if (chunks.isEmpty()) { + result.add(slice); + } + else { + chunks.add(slice); + result.add(buffer.factory().join(chunks)); + chunks.clear(); + } + buffer.readPosition(endIndex + 1); + } + while (buffer.readableByteCount() > 0); + return (result != null ? result : Collections.emptyList()); + } + finally { + DataBufferUtils.release(buffer); + } + } + @Override public String decode(DataBuffer dataBuffer, ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map hints) { @@ -166,68 +203,6 @@ public final class StringDecoder extends AbstractDataBufferDecoder { } } - /** - * Finds the first match and longest delimiter, {@link EndFrameBuffer} just after it. - * @param dataBuffer the buffer to find delimiters in - * @param matcher used to find the first delimiters - * @return a flux of buffers, containing {@link EndFrameBuffer} after each delimiter that was - * found in {@code dataBuffer}. Returns Flux, because returning List (w/ flatMapIterable) - * results in memory leaks due to pre-fetching. - */ - private static List endFrameAfterDelimiter(DataBuffer dataBuffer, DataBufferUtils.Matcher matcher) { - List result = new ArrayList<>(); - try { - do { - int endIdx = matcher.match(dataBuffer); - if (endIdx != -1) { - int readPosition = dataBuffer.readPosition(); - int length = (endIdx - readPosition + 1); - DataBuffer slice = dataBuffer.retainedSlice(readPosition, length); - result.add(slice); - result.add(new EndFrameBuffer(matcher.delimiter())); - dataBuffer.readPosition(endIdx + 1); - } - else { - result.add(DataBufferUtils.retain(dataBuffer)); - break; - } - } - while (dataBuffer.readableByteCount() > 0); - } - finally { - DataBufferUtils.release(dataBuffer); - } - return result; - } - - /** - * Joins the given list of buffers. If the list ends with a {@link EndFrameBuffer}, it is - * removed. If {@code stripDelimiter} is {@code true} and the resulting buffer ends with - * a delimiter, it is removed. - * @param dataBuffers the data buffers to join - * @param stripDelimiter whether to strip the delimiter - * @return the joined buffer - */ - private static DataBuffer joinAndStrip(List dataBuffers, boolean stripDelimiter) { - Assert.state(!dataBuffers.isEmpty(), "DataBuffers should not be empty"); - - byte[] matchingDelimiter = null; - - int lastIdx = dataBuffers.size() - 1; - DataBuffer lastBuffer = dataBuffers.get(lastIdx); - if (lastBuffer instanceof EndFrameBuffer) { - matchingDelimiter = ((EndFrameBuffer) lastBuffer).delimiter(); - dataBuffers.remove(lastIdx); - } - - DataBuffer result = dataBuffers.get(0).factory().join(dataBuffers); - if (stripDelimiter && matchingDelimiter != null) { - result.writePosition(result.writePosition() - matchingDelimiter.length); - } - return result; - } - - /** * Create a {@code StringDecoder} for {@code "text/plain"}. * @param stripDelimiter this flag is ignored @@ -285,46 +260,4 @@ public final class StringDecoder extends AbstractDataBufferDecoder { new MimeType("text", "plain", DEFAULT_CHARSET), MimeTypeUtils.ALL); } - - private static class EndFrameBuffer extends DataBufferWrapper { - - private static final DataBuffer BUFFER = new DefaultDataBufferFactory().wrap(new byte[0]); - - private final byte[] delimiter; - - public EndFrameBuffer(byte[] delimiter) { - super(BUFFER); - this.delimiter = delimiter; - } - - public byte[] delimiter() { - return this.delimiter; - } - } - - - private static class LimitChecker implements Consumer { - - @SuppressWarnings("MismatchedQueryAndUpdateOfCollection") - private final LimitedDataBufferList list; - - LimitChecker(int maxInMemorySize) { - this.list = new LimitedDataBufferList(maxInMemorySize); - } - - @Override - public void accept(DataBuffer buffer) { - if (buffer instanceof EndFrameBuffer) { - this.list.clear(); - } - try { - this.list.add(buffer); - } - catch (DataBufferLimitException ex) { - DataBufferUtils.release(buffer); - throw ex; - } - } - } - } diff --git a/spring-core/src/test/java/org/springframework/core/codec/StringDecoderTests.java b/spring-core/src/test/java/org/springframework/core/codec/StringDecoderTests.java index b3bcdfe8f7a..a41e1fdc629 100644 --- a/spring-core/src/test/java/org/springframework/core/codec/StringDecoderTests.java +++ b/spring-core/src/test/java/org/springframework/core/codec/StringDecoderTests.java @@ -139,20 +139,28 @@ class StringDecoderTests extends AbstractDecoderTests { @Test void maxInMemoryLimit() { Flux input = Flux.just( - stringBuffer("abc\n"), stringBuffer("defg\n"), stringBuffer("hijkl\n")); + stringBuffer("abc\n"), stringBuffer("defg\n"), + stringBuffer("hi"), stringBuffer("jkl"), stringBuffer("mnop")); this.decoder.setMaxInMemorySize(5); testDecode(input, String.class, step -> step.expectNext("abc", "defg").verifyError(DataBufferLimitException.class)); } - @Test // gh-24312 - void maxInMemoryLimitReleaseUnprocessedLinesFromCurrentBuffer() { + @Test + void maxInMemoryLimitDoesNotApplyToParsedItemsThatDontRequireBuffering() { Flux input = Flux.just( stringBuffer("TOO MUCH DATA\nanother line\n\nand another\n")); this.decoder.setMaxInMemorySize(5); - testDecode(input, String.class, step -> step.verifyError(DataBufferLimitException.class)); + + testDecode(input, String.class, step -> step + .expectNext("TOO MUCH DATA") + .expectNext("another line") + .expectNext("") + .expectNext("and another") + .expectComplete() + .verify()); } @Test // gh-24339