Browse Source

Merge branch '5.2.x' into master

pull/23650/merge
Rossen Stoyanchev 5 years ago
parent
commit
eec6ec8f44
  1. 1
      build.gradle
  2. 191
      spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java
  3. 16
      spring-core/src/test/java/org/springframework/core/codec/StringDecoderTests.java

1
build.gradle

@ -292,6 +292,7 @@ configure(allprojects) { project -> @@ -292,6 +292,7 @@ configure(allprojects) { project ->
mavenCentral()
maven { url "https://repo.spring.io/libs-spring-framework-build" }
maven { url "https://repo.spring.io/snapshot" } // Reactor
maven { url "https://oss.jfrog.org/artifactory/oss-snapshot-local" } // RSocket
}
}
configurations.all {

191
spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java

@ -21,21 +21,20 @@ import java.nio.charset.Charset; @@ -21,21 +21,20 @@ import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferLimitException;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DataBufferWrapper;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.core.io.buffer.LimitedDataBufferList;
import org.springframework.core.io.buffer.PooledDataBuffer;
import org.springframework.core.log.LogFormatUtils;
@ -45,12 +44,12 @@ import org.springframework.util.MimeType; @@ -45,12 +44,12 @@ import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
/**
* Decode from a data buffer stream to a {@code String} stream. Before decoding, this decoder
* realigns the incoming data buffers so that each buffer ends with a newline.
* This is to make sure that multibyte characters are decoded properly, and do not cross buffer
* boundaries. The default delimiters ({@code \n}, {@code \r\n})can be customized.
*
* <p>Partially inspired by Netty's {@code DelimiterBasedFrameDecoder}.
* Decode from a data buffer stream to a {@code String} stream, either splitting
* or aggregating incoming data chunks to realign along newlines delimiters
* and produce a stream of strings. This is useful for streaming but is also
* necessary to ensure that that multibyte characters can be decoded correctly,
* avoiding split-character issues. The default delimiters used by default are
* {@code \n} and {@code \r\n} but that can be customized.
*
* @author Sebastien Deleuze
* @author Brian Clozel
@ -115,21 +114,22 @@ public final class StringDecoder extends AbstractDataBufferDecoder<String> { @@ -115,21 +114,22 @@ public final class StringDecoder extends AbstractDataBufferDecoder<String> {
byte[][] delimiterBytes = getDelimiterBytes(mimeType);
Flux<DataBuffer> inputFlux = Flux.defer(() -> {
DataBufferUtils.Matcher matcher = DataBufferUtils.matcher(delimiterBytes);
@SuppressWarnings("MismatchedQueryAndUpdateOfCollection")
LimitChecker limiter = new LimitChecker(getMaxInMemorySize());
return Flux.from(input)
.concatMapIterable(buffer -> endFrameAfterDelimiter(buffer, matcher))
.doOnNext(limiter)
.bufferUntil(buffer -> buffer instanceof EndFrameBuffer)
.map(list -> joinAndStrip(list, this.stripDelimiter))
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
});
return super.decode(inputFlux, elementType, mimeType, hints);
LimitedDataBufferList chunks = new LimitedDataBufferList(getMaxInMemorySize());
DataBufferUtils.Matcher matcher = DataBufferUtils.matcher(delimiterBytes);
return Flux.from(input)
.concatMapIterable(buffer -> processDataBuffer(buffer, matcher, chunks))
.concatWith(Mono.defer(() -> {
if (chunks.isEmpty()) {
return Mono.empty();
}
DataBuffer lastBuffer = chunks.get(0).factory().join(chunks);
chunks.clear();
return Mono.just(lastBuffer);
}))
.doOnTerminate(chunks::releaseAndClear)
.doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release)
.map(buffer -> decode(buffer, elementType, mimeType, hints));
}
private byte[][] getDelimiterBytes(@Nullable MimeType mimeType) {
@ -142,6 +142,43 @@ public final class StringDecoder extends AbstractDataBufferDecoder<String> { @@ -142,6 +142,43 @@ public final class StringDecoder extends AbstractDataBufferDecoder<String> {
});
}
private Collection<DataBuffer> processDataBuffer(
DataBuffer buffer, DataBufferUtils.Matcher matcher, LimitedDataBufferList chunks) {
try {
List<DataBuffer> result = null;
do {
int endIndex = matcher.match(buffer);
if (endIndex == -1) {
chunks.add(buffer);
DataBufferUtils.retain(buffer); // retain after add (may raise DataBufferLimitException)
break;
}
int startIndex = buffer.readPosition();
int length = (endIndex - startIndex + 1);
DataBuffer slice = buffer.retainedSlice(startIndex, length);
if (this.stripDelimiter) {
slice.writePosition(slice.writePosition() - matcher.delimiter().length);
}
result = (result != null ? result : new ArrayList<>());
if (chunks.isEmpty()) {
result.add(slice);
}
else {
chunks.add(slice);
result.add(buffer.factory().join(chunks));
chunks.clear();
}
buffer.readPosition(endIndex + 1);
}
while (buffer.readableByteCount() > 0);
return (result != null ? result : Collections.emptyList());
}
finally {
DataBufferUtils.release(buffer);
}
}
@Override
public String decode(DataBuffer dataBuffer, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
@ -166,68 +203,6 @@ public final class StringDecoder extends AbstractDataBufferDecoder<String> { @@ -166,68 +203,6 @@ public final class StringDecoder extends AbstractDataBufferDecoder<String> {
}
}
/**
* Finds the first match and longest delimiter, {@link EndFrameBuffer} just after it.
* @param dataBuffer the buffer to find delimiters in
* @param matcher used to find the first delimiters
* @return a flux of buffers, containing {@link EndFrameBuffer} after each delimiter that was
* found in {@code dataBuffer}. Returns Flux, because returning List (w/ flatMapIterable)
* results in memory leaks due to pre-fetching.
*/
private static List<DataBuffer> endFrameAfterDelimiter(DataBuffer dataBuffer, DataBufferUtils.Matcher matcher) {
List<DataBuffer> result = new ArrayList<>();
try {
do {
int endIdx = matcher.match(dataBuffer);
if (endIdx != -1) {
int readPosition = dataBuffer.readPosition();
int length = (endIdx - readPosition + 1);
DataBuffer slice = dataBuffer.retainedSlice(readPosition, length);
result.add(slice);
result.add(new EndFrameBuffer(matcher.delimiter()));
dataBuffer.readPosition(endIdx + 1);
}
else {
result.add(DataBufferUtils.retain(dataBuffer));
break;
}
}
while (dataBuffer.readableByteCount() > 0);
}
finally {
DataBufferUtils.release(dataBuffer);
}
return result;
}
/**
* Joins the given list of buffers. If the list ends with a {@link EndFrameBuffer}, it is
* removed. If {@code stripDelimiter} is {@code true} and the resulting buffer ends with
* a delimiter, it is removed.
* @param dataBuffers the data buffers to join
* @param stripDelimiter whether to strip the delimiter
* @return the joined buffer
*/
private static DataBuffer joinAndStrip(List<DataBuffer> dataBuffers, boolean stripDelimiter) {
Assert.state(!dataBuffers.isEmpty(), "DataBuffers should not be empty");
byte[] matchingDelimiter = null;
int lastIdx = dataBuffers.size() - 1;
DataBuffer lastBuffer = dataBuffers.get(lastIdx);
if (lastBuffer instanceof EndFrameBuffer) {
matchingDelimiter = ((EndFrameBuffer) lastBuffer).delimiter();
dataBuffers.remove(lastIdx);
}
DataBuffer result = dataBuffers.get(0).factory().join(dataBuffers);
if (stripDelimiter && matchingDelimiter != null) {
result.writePosition(result.writePosition() - matchingDelimiter.length);
}
return result;
}
/**
* Create a {@code StringDecoder} for {@code "text/plain"}.
* @param stripDelimiter this flag is ignored
@ -285,46 +260,4 @@ public final class StringDecoder extends AbstractDataBufferDecoder<String> { @@ -285,46 +260,4 @@ public final class StringDecoder extends AbstractDataBufferDecoder<String> {
new MimeType("text", "plain", DEFAULT_CHARSET), MimeTypeUtils.ALL);
}
private static class EndFrameBuffer extends DataBufferWrapper {
private static final DataBuffer BUFFER = DefaultDataBufferFactory.sharedInstance.wrap(new byte[0]);
private final byte[] delimiter;
public EndFrameBuffer(byte[] delimiter) {
super(BUFFER);
this.delimiter = delimiter;
}
public byte[] delimiter() {
return this.delimiter;
}
}
private static class LimitChecker implements Consumer<DataBuffer> {
@SuppressWarnings("MismatchedQueryAndUpdateOfCollection")
private final LimitedDataBufferList list;
LimitChecker(int maxInMemorySize) {
this.list = new LimitedDataBufferList(maxInMemorySize);
}
@Override
public void accept(DataBuffer buffer) {
if (buffer instanceof EndFrameBuffer) {
this.list.clear();
}
try {
this.list.add(buffer);
}
catch (DataBufferLimitException ex) {
DataBufferUtils.release(buffer);
throw ex;
}
}
}
}

16
spring-core/src/test/java/org/springframework/core/codec/StringDecoderTests.java

@ -139,20 +139,28 @@ class StringDecoderTests extends AbstractDecoderTests<StringDecoder> { @@ -139,20 +139,28 @@ class StringDecoderTests extends AbstractDecoderTests<StringDecoder> {
@Test
void maxInMemoryLimit() {
Flux<DataBuffer> input = Flux.just(
stringBuffer("abc\n"), stringBuffer("defg\n"), stringBuffer("hijkl\n"));
stringBuffer("abc\n"), stringBuffer("defg\n"),
stringBuffer("hi"), stringBuffer("jkl"), stringBuffer("mnop"));
this.decoder.setMaxInMemorySize(5);
testDecode(input, String.class, step ->
step.expectNext("abc", "defg").verifyError(DataBufferLimitException.class));
}
@Test // gh-24312
void maxInMemoryLimitReleaseUnprocessedLinesFromCurrentBuffer() {
@Test
void maxInMemoryLimitDoesNotApplyToParsedItemsThatDontRequireBuffering() {
Flux<DataBuffer> input = Flux.just(
stringBuffer("TOO MUCH DATA\nanother line\n\nand another\n"));
this.decoder.setMaxInMemorySize(5);
testDecode(input, String.class, step -> step.verifyError(DataBufferLimitException.class));
testDecode(input, String.class, step -> step
.expectNext("TOO MUCH DATA")
.expectNext("another line")
.expectNext("")
.expectNext("and another")
.expectComplete()
.verify());
}
@Test // gh-24339

Loading…
Cancel
Save