From 67e7c784e8097ee60db783a55bf1ab1ad1de529a Mon Sep 17 00:00:00 2001 From: Arjen Poutsma Date: Fri, 12 Jan 2018 10:35:34 +0100 Subject: [PATCH] Use DataBufferUtils.compose and remove writeAggregator Use DataBufferUtils.compose instead of writeAggregator to combine multiple data buffers into one, as the write aggregator would not work when the initial data buffer did not have enough capacity to contain all subsequent buffers. Removed writeAggregator, as it is no longer needed. Issue: SPR-16365 --- .../core/codec/AbstractDataBufferDecoder.java | 3 +-- .../core/io/buffer/DataBufferUtils.java | 16 ---------------- .../core/io/buffer/DataBufferUtilsTests.java | 15 --------------- .../http/codec/FormHttpMessageReader.java | 2 +- .../http/codec/xml/XmlEventDecoder.java | 2 +- .../SynchronossPartHttpMessageReaderTests.java | 2 +- .../reactive/MultipartIntegrationTests.java | 5 +---- .../function/client/DefaultWebClient.java | 3 +-- .../resource/AppCacheManifestTransformer.java | 6 ++++-- .../resource/ContentVersionStrategy.java | 7 +++++-- .../resource/CssLinkResourceTransformer.java | 6 ++++-- .../reactive/function/BodyInsertersTests.java | 3 +-- 12 files changed, 20 insertions(+), 50 deletions(-) diff --git a/spring-core/src/main/java/org/springframework/core/codec/AbstractDataBufferDecoder.java b/spring-core/src/main/java/org/springframework/core/codec/AbstractDataBufferDecoder.java index b395cb572b9..944bf6fca78 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/AbstractDataBufferDecoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/AbstractDataBufferDecoder.java @@ -63,8 +63,7 @@ public abstract class AbstractDataBufferDecoder extends AbstractDecoder { public Mono decodeToMono(Publisher inputStream, ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map hints) { - return Flux.from(inputStream) - .reduce(DataBufferUtils.writeAggregator()) + return DataBufferUtils.compose(inputStream) .map(buffer -> decodeDataBuffer(buffer, elementType, mimeType, hints)); } diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java index 5c942573e37..f4e6dcb4773 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java @@ -419,22 +419,6 @@ public abstract class DataBufferUtils { return RELEASE_CONSUMER; } - /** - * Return an aggregator function that can be used to {@linkplain Flux#reduce(BiFunction) reduce} - * a {@code Flux} of data buffers into a single data buffer by writing all subsequent buffers - * into the first buffer. All buffers except the first buffer are - * {@linkplain #release(DataBuffer) released}. - *

For example: - *

-	 * Flux<DataBuffer> flux = ...
-	 * Mono<DataBuffer> mono = flux.reduce(DataBufferUtils.writeAggregator());
-	 * 
- * @see Flux#reduce(BiFunction) - */ - public static BinaryOperator writeAggregator() { - return WRITE_AGGREGATOR; - } - /** * Composes the buffers in the given {@link Publisher} into a single data buffer. Depending on * the {@code DataBuffer} implementation, the returned buffer may be a single buffer containing diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java index afa451a0a06..3c7dab56cd5 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java @@ -308,21 +308,6 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase { } } - @Test - public void writeAggregator() { - DataBuffer foo = stringBuffer("foo"); - DataBuffer bar = stringBuffer("bar"); - DataBuffer baz = stringBuffer("baz"); - Flux flux = Flux.just(foo, bar, baz); - - DataBuffer result = - flux.reduce(DataBufferUtils.writeAggregator()).block(Duration.ofSeconds(1)); - - assertEquals("foobarbaz", DataBufferTestUtils.dumpString(result, StandardCharsets.UTF_8)); - - release(result); - } - @Test public void SPR16070() throws Exception { ReadableByteChannel channel = mock(ReadableByteChannel.class); diff --git a/spring-web/src/main/java/org/springframework/http/codec/FormHttpMessageReader.java b/spring-web/src/main/java/org/springframework/http/codec/FormHttpMessageReader.java index a5c67b3f65e..2be7b91c8f2 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/FormHttpMessageReader.java +++ b/spring-web/src/main/java/org/springframework/http/codec/FormHttpMessageReader.java @@ -96,7 +96,7 @@ public class FormHttpMessageReader implements HttpMessageReader { CharBuffer charBuffer = charset.decode(buffer.asByteBuffer()); String body = charBuffer.toString(); diff --git a/spring-web/src/main/java/org/springframework/http/codec/xml/XmlEventDecoder.java b/spring-web/src/main/java/org/springframework/http/codec/xml/XmlEventDecoder.java index 836c0ae7643..4e7b24167d9 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/xml/XmlEventDecoder.java +++ b/spring-web/src/main/java/org/springframework/http/codec/xml/XmlEventDecoder.java @@ -103,7 +103,7 @@ public class XmlEventDecoder extends AbstractDecoder { .doFinally(signalType -> aaltoMapper.endOfInput()); } else { - Mono singleBuffer = flux.reduce(DataBufferUtils.writeAggregator()); + Mono singleBuffer = DataBufferUtils.compose(flux); return singleBuffer. flatMapMany(dataBuffer -> { try { diff --git a/spring-web/src/test/java/org/springframework/http/codec/multipart/SynchronossPartHttpMessageReaderTests.java b/spring-web/src/test/java/org/springframework/http/codec/multipart/SynchronossPartHttpMessageReaderTests.java index c4381fc5a0b..2502bb9b628 100644 --- a/spring-web/src/test/java/org/springframework/http/codec/multipart/SynchronossPartHttpMessageReaderTests.java +++ b/spring-web/src/test/java/org/springframework/http/codec/multipart/SynchronossPartHttpMessageReaderTests.java @@ -89,7 +89,7 @@ public class SynchronossPartHttpMessageReaderTests { assertTrue(part instanceof FilePart); assertEquals("fooPart", part.name()); assertEquals("foo.txt", ((FilePart) part).filename()); - DataBuffer buffer = part.content().reduce(DataBufferUtils.writeAggregator()).block(); + DataBuffer buffer = DataBufferUtils.compose(part.content()).block(); assertEquals(12, buffer.readableByteCount()); byte[] byteContent = new byte[12]; buffer.read(byteContent); diff --git a/spring-web/src/test/java/org/springframework/http/server/reactive/MultipartIntegrationTests.java b/spring-web/src/test/java/org/springframework/http/server/reactive/MultipartIntegrationTests.java index 59581046688..13a5df723a4 100644 --- a/spring-web/src/test/java/org/springframework/http/server/reactive/MultipartIntegrationTests.java +++ b/spring-web/src/test/java/org/springframework/http/server/reactive/MultipartIntegrationTests.java @@ -103,10 +103,7 @@ public class MultipartIntegrationTests extends AbstractHttpHandlerIntegrationTes assertEquals("fooPart", part.name()); assertTrue(part instanceof FilePart); assertEquals("foo.txt", ((FilePart) part).filename()); - DataBuffer buffer = part - .content() - .reduce(DataBufferUtils.writeAggregator()) - .block(); + DataBuffer buffer = DataBufferUtils.compose(part.content()).block(); assertEquals(12, buffer.readableByteCount()); byte[] byteContent = new byte[12]; buffer.read(byteContent); diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java index 34f671bada3..5f5a123fbd2 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java @@ -457,8 +457,7 @@ class DefaultWebClient implements WebClient { private static Mono createResponseException(ClientResponse response) { - return response.body(BodyExtractors.toDataBuffers()) - .reduce(DataBufferUtils.writeAggregator()) + return DataBufferUtils.compose(response.body(BodyExtractors.toDataBuffers())) .map(dataBuffer -> { byte[] bytes = new byte[dataBuffer.readableByteCount()]; dataBuffer.read(bytes); diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/resource/AppCacheManifestTransformer.java b/spring-webflux/src/main/java/org/springframework/web/reactive/resource/AppCacheManifestTransformer.java index fd35d298b01..024cce9fea1 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/resource/AppCacheManifestTransformer.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/resource/AppCacheManifestTransformer.java @@ -34,6 +34,7 @@ import reactor.core.publisher.Mono; import reactor.core.publisher.SynchronousSink; import org.springframework.core.io.Resource; +import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.lang.Nullable; @@ -109,8 +110,9 @@ public class AppCacheManifestTransformer extends ResourceTransformerSupport { return Mono.just(outputResource); } DataBufferFactory bufferFactory = exchange.getResponse().bufferFactory(); - return DataBufferUtils.read(outputResource, bufferFactory, StreamUtils.BUFFER_SIZE) - .reduce(DataBufferUtils.writeAggregator()) + Flux flux = DataBufferUtils + .read(outputResource, bufferFactory, StreamUtils.BUFFER_SIZE); + return DataBufferUtils.compose(flux) .flatMap(dataBuffer -> { CharBuffer charBuffer = DEFAULT_CHARSET.decode(dataBuffer.asByteBuffer()); DataBufferUtils.release(dataBuffer); diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/resource/ContentVersionStrategy.java b/spring-webflux/src/main/java/org/springframework/web/reactive/resource/ContentVersionStrategy.java index ce6d29cded3..4f45ab50901 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/resource/ContentVersionStrategy.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/resource/ContentVersionStrategy.java @@ -16,9 +16,11 @@ package org.springframework.web.reactive.resource; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import org.springframework.core.io.Resource; +import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.core.io.buffer.DefaultDataBufferFactory; @@ -42,8 +44,9 @@ public class ContentVersionStrategy extends AbstractFileNameVersionStrategy { @Override public Mono getResourceVersion(Resource resource) { - return DataBufferUtils.read(resource, dataBufferFactory, StreamUtils.BUFFER_SIZE) - .reduce(DataBufferUtils.writeAggregator()) + Flux flux = + DataBufferUtils.read(resource, dataBufferFactory, StreamUtils.BUFFER_SIZE); + return DataBufferUtils.compose(flux) .map(buffer -> { byte[] result = new byte[buffer.readableByteCount()]; buffer.read(result); diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/resource/CssLinkResourceTransformer.java b/spring-webflux/src/main/java/org/springframework/web/reactive/resource/CssLinkResourceTransformer.java index 3ceaf3c73de..fa30b7f7750 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/resource/CssLinkResourceTransformer.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/resource/CssLinkResourceTransformer.java @@ -33,6 +33,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import org.springframework.core.io.Resource; +import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.lang.Nullable; @@ -86,8 +87,9 @@ public class CssLinkResourceTransformer extends ResourceTransformerSupport { } DataBufferFactory bufferFactory = exchange.getResponse().bufferFactory(); - return DataBufferUtils.read(ouptputResource, bufferFactory, StreamUtils.BUFFER_SIZE) - .reduce(DataBufferUtils.writeAggregator()) + Flux flux = DataBufferUtils + .read(ouptputResource, bufferFactory, StreamUtils.BUFFER_SIZE); + return DataBufferUtils.compose(flux) .flatMap(dataBuffer -> { CharBuffer charBuffer = DEFAULT_CHARSET.decode(dataBuffer.asByteBuffer()); DataBufferUtils.release(dataBuffer); diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/BodyInsertersTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/BodyInsertersTests.java index 916e94f5c2f..03fbcd28712 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/function/BodyInsertersTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/BodyInsertersTests.java @@ -332,8 +332,7 @@ public class BodyInsertersTests { Mono result = inserter.insert(request, this.context); StepVerifier.create(result).expectComplete().verify(); - StepVerifier.create(request.getBody() - .reduce(DataBufferUtils.writeAggregator())) + StepVerifier.create(DataBufferUtils.compose(request.getBody())) .consumeNextWith(dataBuffer -> { byte[] resultBytes = new byte[dataBuffer.readableByteCount()]; dataBuffer.read(resultBytes);