Browse Source

Refactor StringDecoder

Simplify and optimize the processing of the input stream.

The existing implementation was using bufferUntil and creating a List
for every line along with an EndFrameBuffer inserted for the
bufferUntil predicate. So the larger the input buffer and the more
lines it contained, the greater the overhead.

The new implementation avoids bufferUntil for all lines and
instead uses concatMapIterable to aggregate the lines from a buffer
into a single list. So the larger the input buffer and the more
lines it contains, the better the throughput. The only buffering
used then is for partial chunks and those are accumulated in a list.

See gh-25915
pull/26273/head
Rossen Stoyanchev 5 years ago
parent
commit
db9e0b0ccb
  1. 191
      spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java
  2. 16
      spring-core/src/test/java/org/springframework/core/codec/StringDecoderTests.java

191
spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java

@ -21,21 +21,20 @@ import java.nio.charset.Charset; @@ -21,21 +21,20 @@ import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferLimitException;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DataBufferWrapper;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.core.io.buffer.LimitedDataBufferList;
import org.springframework.core.io.buffer.PooledDataBuffer;
import org.springframework.core.log.LogFormatUtils;
@ -45,12 +44,12 @@ import org.springframework.util.MimeType; @@ -45,12 +44,12 @@ import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
/**
* Decode from a data buffer stream to a {@code String} stream. Before decoding, this decoder
* realigns the incoming data buffers so that each buffer ends with a newline.
* This is to make sure that multibyte characters are decoded properly, and do not cross buffer
* boundaries. The default delimiters ({@code \n}, {@code \r\n})can be customized.
*
* <p>Partially inspired by Netty's {@code DelimiterBasedFrameDecoder}.
* Decode from a data buffer stream to a {@code String} stream, either splitting
* or aggregating incoming data chunks to realign along newlines delimiters
* and produce a stream of strings. This is useful for streaming but is also
* necessary to ensure that that multibyte characters can be decoded correctly,
* avoiding split-character issues. The default delimiters used by default are
* {@code \n} and {@code \r\n} but that can be customized.
*
* @author Sebastien Deleuze
* @author Brian Clozel
@ -115,21 +114,22 @@ public final class StringDecoder extends AbstractDataBufferDecoder<String> { @@ -115,21 +114,22 @@ public final class StringDecoder extends AbstractDataBufferDecoder<String> {
byte[][] delimiterBytes = getDelimiterBytes(mimeType);
Flux<DataBuffer> inputFlux = Flux.defer(() -> {
DataBufferUtils.Matcher matcher = DataBufferUtils.matcher(delimiterBytes);
@SuppressWarnings("MismatchedQueryAndUpdateOfCollection")
LimitChecker limiter = new LimitChecker(getMaxInMemorySize());
return Flux.from(input)
.concatMapIterable(buffer -> endFrameAfterDelimiter(buffer, matcher))
.doOnNext(limiter)
.bufferUntil(buffer -> buffer instanceof EndFrameBuffer)
.map(list -> joinAndStrip(list, this.stripDelimiter))
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
});
return super.decode(inputFlux, elementType, mimeType, hints);
LimitedDataBufferList chunks = new LimitedDataBufferList(getMaxInMemorySize());
DataBufferUtils.Matcher matcher = DataBufferUtils.matcher(delimiterBytes);
return Flux.from(input)
.concatMapIterable(buffer -> processDataBuffer(buffer, matcher, chunks))
.concatWith(Mono.defer(() -> {
if (chunks.isEmpty()) {
return Mono.empty();
}
DataBuffer lastBuffer = chunks.get(0).factory().join(chunks);
chunks.clear();
return Mono.just(lastBuffer);
}))
.doOnTerminate(chunks::releaseAndClear)
.doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release)
.map(buffer -> decode(buffer, elementType, mimeType, hints));
}
private byte[][] getDelimiterBytes(@Nullable MimeType mimeType) {
@ -142,6 +142,43 @@ public final class StringDecoder extends AbstractDataBufferDecoder<String> { @@ -142,6 +142,43 @@ public final class StringDecoder extends AbstractDataBufferDecoder<String> {
});
}
private Collection<DataBuffer> processDataBuffer(
DataBuffer buffer, DataBufferUtils.Matcher matcher, LimitedDataBufferList chunks) {
try {
List<DataBuffer> result = null;
do {
int endIndex = matcher.match(buffer);
if (endIndex == -1) {
chunks.add(buffer);
DataBufferUtils.retain(buffer); // retain after add (may raise DataBufferLimitException)
break;
}
int startIndex = buffer.readPosition();
int length = (endIndex - startIndex + 1);
DataBuffer slice = buffer.retainedSlice(startIndex, length);
if (this.stripDelimiter) {
slice.writePosition(slice.writePosition() - matcher.delimiter().length);
}
result = (result != null ? result : new ArrayList<>());
if (chunks.isEmpty()) {
result.add(slice);
}
else {
chunks.add(slice);
result.add(buffer.factory().join(chunks));
chunks.clear();
}
buffer.readPosition(endIndex + 1);
}
while (buffer.readableByteCount() > 0);
return (result != null ? result : Collections.emptyList());
}
finally {
DataBufferUtils.release(buffer);
}
}
@Override
public String decode(DataBuffer dataBuffer, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
@ -166,68 +203,6 @@ public final class StringDecoder extends AbstractDataBufferDecoder<String> { @@ -166,68 +203,6 @@ public final class StringDecoder extends AbstractDataBufferDecoder<String> {
}
}
/**
* Finds the first match and longest delimiter, {@link EndFrameBuffer} just after it.
* @param dataBuffer the buffer to find delimiters in
* @param matcher used to find the first delimiters
* @return a flux of buffers, containing {@link EndFrameBuffer} after each delimiter that was
* found in {@code dataBuffer}. Returns Flux, because returning List (w/ flatMapIterable)
* results in memory leaks due to pre-fetching.
*/
private static List<DataBuffer> endFrameAfterDelimiter(DataBuffer dataBuffer, DataBufferUtils.Matcher matcher) {
List<DataBuffer> result = new ArrayList<>();
try {
do {
int endIdx = matcher.match(dataBuffer);
if (endIdx != -1) {
int readPosition = dataBuffer.readPosition();
int length = (endIdx - readPosition + 1);
DataBuffer slice = dataBuffer.retainedSlice(readPosition, length);
result.add(slice);
result.add(new EndFrameBuffer(matcher.delimiter()));
dataBuffer.readPosition(endIdx + 1);
}
else {
result.add(DataBufferUtils.retain(dataBuffer));
break;
}
}
while (dataBuffer.readableByteCount() > 0);
}
finally {
DataBufferUtils.release(dataBuffer);
}
return result;
}
/**
* Joins the given list of buffers. If the list ends with a {@link EndFrameBuffer}, it is
* removed. If {@code stripDelimiter} is {@code true} and the resulting buffer ends with
* a delimiter, it is removed.
* @param dataBuffers the data buffers to join
* @param stripDelimiter whether to strip the delimiter
* @return the joined buffer
*/
private static DataBuffer joinAndStrip(List<DataBuffer> dataBuffers, boolean stripDelimiter) {
Assert.state(!dataBuffers.isEmpty(), "DataBuffers should not be empty");
byte[] matchingDelimiter = null;
int lastIdx = dataBuffers.size() - 1;
DataBuffer lastBuffer = dataBuffers.get(lastIdx);
if (lastBuffer instanceof EndFrameBuffer) {
matchingDelimiter = ((EndFrameBuffer) lastBuffer).delimiter();
dataBuffers.remove(lastIdx);
}
DataBuffer result = dataBuffers.get(0).factory().join(dataBuffers);
if (stripDelimiter && matchingDelimiter != null) {
result.writePosition(result.writePosition() - matchingDelimiter.length);
}
return result;
}
/**
* Create a {@code StringDecoder} for {@code "text/plain"}.
* @param stripDelimiter this flag is ignored
@ -285,46 +260,4 @@ public final class StringDecoder extends AbstractDataBufferDecoder<String> { @@ -285,46 +260,4 @@ public final class StringDecoder extends AbstractDataBufferDecoder<String> {
new MimeType("text", "plain", DEFAULT_CHARSET), MimeTypeUtils.ALL);
}
private static class EndFrameBuffer extends DataBufferWrapper {
private static final DataBuffer BUFFER = new DefaultDataBufferFactory().wrap(new byte[0]);
private final byte[] delimiter;
public EndFrameBuffer(byte[] delimiter) {
super(BUFFER);
this.delimiter = delimiter;
}
public byte[] delimiter() {
return this.delimiter;
}
}
private static class LimitChecker implements Consumer<DataBuffer> {
@SuppressWarnings("MismatchedQueryAndUpdateOfCollection")
private final LimitedDataBufferList list;
LimitChecker(int maxInMemorySize) {
this.list = new LimitedDataBufferList(maxInMemorySize);
}
@Override
public void accept(DataBuffer buffer) {
if (buffer instanceof EndFrameBuffer) {
this.list.clear();
}
try {
this.list.add(buffer);
}
catch (DataBufferLimitException ex) {
DataBufferUtils.release(buffer);
throw ex;
}
}
}
}

16
spring-core/src/test/java/org/springframework/core/codec/StringDecoderTests.java

@ -139,20 +139,28 @@ class StringDecoderTests extends AbstractDecoderTests<StringDecoder> { @@ -139,20 +139,28 @@ class StringDecoderTests extends AbstractDecoderTests<StringDecoder> {
@Test
void maxInMemoryLimit() {
Flux<DataBuffer> input = Flux.just(
stringBuffer("abc\n"), stringBuffer("defg\n"), stringBuffer("hijkl\n"));
stringBuffer("abc\n"), stringBuffer("defg\n"),
stringBuffer("hi"), stringBuffer("jkl"), stringBuffer("mnop"));
this.decoder.setMaxInMemorySize(5);
testDecode(input, String.class, step ->
step.expectNext("abc", "defg").verifyError(DataBufferLimitException.class));
}
@Test // gh-24312
void maxInMemoryLimitReleaseUnprocessedLinesFromCurrentBuffer() {
@Test
void maxInMemoryLimitDoesNotApplyToParsedItemsThatDontRequireBuffering() {
Flux<DataBuffer> input = Flux.just(
stringBuffer("TOO MUCH DATA\nanother line\n\nand another\n"));
this.decoder.setMaxInMemorySize(5);
testDecode(input, String.class, step -> step.verifyError(DataBufferLimitException.class));
testDecode(input, String.class, step -> step
.expectNext("TOO MUCH DATA")
.expectNext("another line")
.expectNext("")
.expectNext("and another")
.expectComplete()
.verify());
}
@Test // gh-24339

Loading…
Cancel
Save