|
|
|
@ -32,7 +32,6 @@ import reactor.core.publisher.Flux; |
|
|
|
|
|
|
|
|
|
|
|
import org.springframework.core.ResolvableType; |
|
|
|
import org.springframework.core.ResolvableType; |
|
|
|
import org.springframework.core.io.buffer.DataBuffer; |
|
|
|
import org.springframework.core.io.buffer.DataBuffer; |
|
|
|
import org.springframework.core.io.buffer.DataBufferLimitException; |
|
|
|
|
|
|
|
import org.springframework.core.io.buffer.DataBufferUtils; |
|
|
|
import org.springframework.core.io.buffer.DataBufferUtils; |
|
|
|
import org.springframework.core.io.buffer.DataBufferWrapper; |
|
|
|
import org.springframework.core.io.buffer.DataBufferWrapper; |
|
|
|
import org.springframework.core.io.buffer.DefaultDataBufferFactory; |
|
|
|
import org.springframework.core.io.buffer.DefaultDataBufferFactory; |
|
|
|
@ -96,42 +95,25 @@ public final class StringDecoder extends AbstractDataBufferDecoder<String> { |
|
|
|
|
|
|
|
|
|
|
|
Flux<DataBuffer> inputFlux = Flux.defer(() -> { |
|
|
|
Flux<DataBuffer> inputFlux = Flux.defer(() -> { |
|
|
|
DataBufferUtils.Matcher matcher = DataBufferUtils.matcher(delimiterBytes); |
|
|
|
DataBufferUtils.Matcher matcher = DataBufferUtils.matcher(delimiterBytes); |
|
|
|
if (getMaxInMemorySize() != -1) { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Passing limiter into endFrameAfterDelimiter helps to ensure that in case of one DataBuffer
|
|
|
|
|
|
|
|
// containing multiple lines, the limit is checked and raised immediately without accumulating
|
|
|
|
|
|
|
|
// subsequent lines. This is necessary because concatMapIterable doesn't respect doOnDiscard.
|
|
|
|
|
|
|
|
// When reactor-core#1925 is resolved, we could replace bufferUntil with:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// .windowUntil(buffer -> buffer instanceof EndFrameBuffer)
|
|
|
|
Flux<DataBuffer> buffers = Flux.from(input) |
|
|
|
// .concatMap(fluxes -> fluxes.collect(() -> new LimitedDataBufferList(getMaxInMemorySize()), LimitedDataBufferList::add))
|
|
|
|
.concatMapIterable(buffer -> endFrameAfterDelimiter(buffer, matcher)); |
|
|
|
|
|
|
|
|
|
|
|
LimitedDataBufferList limiter = new LimitedDataBufferList(getMaxInMemorySize()); |
|
|
|
Flux<List<DataBuffer>> delimitedBuffers; |
|
|
|
|
|
|
|
if (getMaxInMemorySize() != -1) { |
|
|
|
return Flux.from(input) |
|
|
|
delimitedBuffers = buffers |
|
|
|
.concatMapIterable(buffer -> endFrameAfterDelimiter(buffer, matcher, limiter)) |
|
|
|
.windowUntil(buffer -> buffer instanceof EndFrameBuffer) |
|
|
|
.bufferUntil(buffer -> buffer instanceof EndFrameBuffer) |
|
|
|
.concatMap(window -> window.collect( |
|
|
|
.map(buffers -> joinAndStrip(buffers, this.stripDelimiter)) |
|
|
|
() -> new LimitedDataBufferList(getMaxInMemorySize()), |
|
|
|
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); |
|
|
|
LimitedDataBufferList::add)); |
|
|
|
} |
|
|
|
} |
|
|
|
else { |
|
|
|
else { |
|
|
|
|
|
|
|
delimitedBuffers = buffers.bufferUntil(buffer -> buffer instanceof EndFrameBuffer); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// When the decoder is unlimited (-1), concatMapIterable will cache buffers that may not
|
|
|
|
return delimitedBuffers |
|
|
|
// be released if cancel is signalled before they are turned into String lines
|
|
|
|
.map(list -> joinAndStrip(list, this.stripDelimiter)) |
|
|
|
// (see test maxInMemoryLimitReleasesUnprocessedLinesWhenUnlimited).
|
|
|
|
|
|
|
|
// When reactor-core#1925 is resolved, the workaround can be removed and the entire
|
|
|
|
|
|
|
|
// else clause possibly dropped.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
ConcatMapIterableDiscardWorkaroundCache cache = new ConcatMapIterableDiscardWorkaroundCache(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return Flux.from(input) |
|
|
|
|
|
|
|
.concatMapIterable(buffer -> cache.addAll(endFrameAfterDelimiter(buffer, matcher, null))) |
|
|
|
|
|
|
|
.doOnNext(cache) |
|
|
|
|
|
|
|
.doOnCancel(cache) |
|
|
|
|
|
|
|
.bufferUntil(buffer -> buffer instanceof EndFrameBuffer) |
|
|
|
|
|
|
|
.map(buffers -> joinAndStrip(buffers, this.stripDelimiter)) |
|
|
|
|
|
|
|
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); |
|
|
|
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); |
|
|
|
} |
|
|
|
|
|
|
|
}); |
|
|
|
}); |
|
|
|
|
|
|
|
|
|
|
|
return super.decode(inputFlux, elementType, mimeType, hints); |
|
|
|
return super.decode(inputFlux, elementType, mimeType, hints); |
|
|
|
@ -176,14 +158,11 @@ public final class StringDecoder extends AbstractDataBufferDecoder<String> { |
|
|
|
* |
|
|
|
* |
|
|
|
* @param dataBuffer the buffer to find delimiters in |
|
|
|
* @param dataBuffer the buffer to find delimiters in |
|
|
|
* @param matcher used to find the first delimiters |
|
|
|
* @param matcher used to find the first delimiters |
|
|
|
* @param limiter to enforce maxInMemorySize with |
|
|
|
|
|
|
|
* @return a flux of buffers, containing {@link EndFrameBuffer} after each delimiter that was |
|
|
|
* @return a flux of buffers, containing {@link EndFrameBuffer} after each delimiter that was |
|
|
|
* found in {@code dataBuffer}. Returns Flux, because returning List (w/ flatMapIterable) |
|
|
|
* found in {@code dataBuffer}. Returns Flux, because returning List (w/ flatMapIterable) |
|
|
|
* results in memory leaks due to pre-fetching. |
|
|
|
* results in memory leaks due to pre-fetching. |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
private static List<DataBuffer> endFrameAfterDelimiter( |
|
|
|
private static List<DataBuffer> endFrameAfterDelimiter(DataBuffer dataBuffer, DataBufferUtils.Matcher matcher) { |
|
|
|
DataBuffer dataBuffer, DataBufferUtils.Matcher matcher, @Nullable LimitedDataBufferList limiter) { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
List<DataBuffer> result = new ArrayList<>(); |
|
|
|
List<DataBuffer> result = new ArrayList<>(); |
|
|
|
try { |
|
|
|
try { |
|
|
|
do { |
|
|
|
do { |
|
|
|
@ -195,27 +174,14 @@ public final class StringDecoder extends AbstractDataBufferDecoder<String> { |
|
|
|
result.add(slice); |
|
|
|
result.add(slice); |
|
|
|
result.add(new EndFrameBuffer(matcher.delimiter())); |
|
|
|
result.add(new EndFrameBuffer(matcher.delimiter())); |
|
|
|
dataBuffer.readPosition(endIdx + 1); |
|
|
|
dataBuffer.readPosition(endIdx + 1); |
|
|
|
if (limiter != null) { |
|
|
|
|
|
|
|
limiter.add(slice); // enforce the limit
|
|
|
|
|
|
|
|
limiter.clear(); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
else { |
|
|
|
else { |
|
|
|
result.add(DataBufferUtils.retain(dataBuffer)); |
|
|
|
result.add(DataBufferUtils.retain(dataBuffer)); |
|
|
|
if (limiter != null) { |
|
|
|
|
|
|
|
limiter.add(dataBuffer); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
break; |
|
|
|
break; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
while (dataBuffer.readableByteCount() > 0); |
|
|
|
while (dataBuffer.readableByteCount() > 0); |
|
|
|
} |
|
|
|
} |
|
|
|
catch (DataBufferLimitException ex) { |
|
|
|
|
|
|
|
if (limiter != null) { |
|
|
|
|
|
|
|
limiter.releaseAndClear(); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
throw ex; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
finally { |
|
|
|
finally { |
|
|
|
DataBufferUtils.release(dataBuffer); |
|
|
|
DataBufferUtils.release(dataBuffer); |
|
|
|
} |
|
|
|
} |
|
|
|
@ -230,9 +196,7 @@ public final class StringDecoder extends AbstractDataBufferDecoder<String> { |
|
|
|
* @param stripDelimiter whether to strip the delimiter |
|
|
|
* @param stripDelimiter whether to strip the delimiter |
|
|
|
* @return the joined buffer |
|
|
|
* @return the joined buffer |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
private static DataBuffer joinAndStrip(List<DataBuffer> dataBuffers, |
|
|
|
private static DataBuffer joinAndStrip(List<DataBuffer> dataBuffers, boolean stripDelimiter) { |
|
|
|
boolean stripDelimiter) { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Assert.state(!dataBuffers.isEmpty(), "DataBuffers should not be empty"); |
|
|
|
Assert.state(!dataBuffers.isEmpty(), "DataBuffers should not be empty"); |
|
|
|
|
|
|
|
|
|
|
|
byte[] matchingDelimiter = null; |
|
|
|
byte[] matchingDelimiter = null; |
|
|
|
@ -241,7 +205,7 @@ public final class StringDecoder extends AbstractDataBufferDecoder<String> { |
|
|
|
DataBuffer lastBuffer = dataBuffers.get(lastIdx); |
|
|
|
DataBuffer lastBuffer = dataBuffers.get(lastIdx); |
|
|
|
if (lastBuffer instanceof EndFrameBuffer) { |
|
|
|
if (lastBuffer instanceof EndFrameBuffer) { |
|
|
|
matchingDelimiter = ((EndFrameBuffer) lastBuffer).delimiter(); |
|
|
|
matchingDelimiter = ((EndFrameBuffer) lastBuffer).delimiter(); |
|
|
|
dataBuffers.remove(lastIdx); |
|
|
|
dataBuffers = dataBuffers.subList(0, lastIdx); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
DataBuffer result = dataBuffers.get(0).factory().join(dataBuffers); |
|
|
|
DataBuffer result = dataBuffers.get(0).factory().join(dataBuffers); |
|
|
|
|