diff --git a/spring-core/src/main/java/org/springframework/core/codec/AbstractDataBufferDecoder.java b/spring-core/src/main/java/org/springframework/core/codec/AbstractDataBufferDecoder.java index b395cb572b9..944bf6fca78 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/AbstractDataBufferDecoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/AbstractDataBufferDecoder.java @@ -63,8 +63,7 @@ public abstract class AbstractDataBufferDecoder extends AbstractDecoder { public Mono decodeToMono(Publisher inputStream, ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map hints) { - return Flux.from(inputStream) - .reduce(DataBufferUtils.writeAggregator()) + return DataBufferUtils.compose(inputStream) .map(buffer -> decodeDataBuffer(buffer, elementType, mimeType, hints)); } diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java index 5c942573e37..f4e6dcb4773 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java @@ -419,22 +419,6 @@ public abstract class DataBufferUtils { return RELEASE_CONSUMER; } - /** - * Return an aggregator function that can be used to {@linkplain Flux#reduce(BiFunction) reduce} - * a {@code Flux} of data buffers into a single data buffer by writing all subsequent buffers - * into the first buffer. All buffers except the first buffer are - * {@linkplain #release(DataBuffer) released}. - *

For example: - *

-	 * Flux<DataBuffer> flux = ...
-	 * Mono<DataBuffer> mono = flux.reduce(DataBufferUtils.writeAggregator());
-	 * 
- * @see Flux#reduce(BiFunction) - */ - public static BinaryOperator writeAggregator() { - return WRITE_AGGREGATOR; - } - /** * Composes the buffers in the given {@link Publisher} into a single data buffer. Depending on * the {@code DataBuffer} implementation, the returned buffer may be a single buffer containing diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java index afa451a0a06..3c7dab56cd5 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java @@ -308,21 +308,6 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase { } } - @Test - public void writeAggregator() { - DataBuffer foo = stringBuffer("foo"); - DataBuffer bar = stringBuffer("bar"); - DataBuffer baz = stringBuffer("baz"); - Flux flux = Flux.just(foo, bar, baz); - - DataBuffer result = - flux.reduce(DataBufferUtils.writeAggregator()).block(Duration.ofSeconds(1)); - - assertEquals("foobarbaz", DataBufferTestUtils.dumpString(result, StandardCharsets.UTF_8)); - - release(result); - } - @Test public void SPR16070() throws Exception { ReadableByteChannel channel = mock(ReadableByteChannel.class); diff --git a/spring-web/src/main/java/org/springframework/http/codec/FormHttpMessageReader.java b/spring-web/src/main/java/org/springframework/http/codec/FormHttpMessageReader.java index a5c67b3f65e..2be7b91c8f2 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/FormHttpMessageReader.java +++ b/spring-web/src/main/java/org/springframework/http/codec/FormHttpMessageReader.java @@ -96,7 +96,7 @@ public class FormHttpMessageReader implements HttpMessageReader { CharBuffer charBuffer = charset.decode(buffer.asByteBuffer()); String body = charBuffer.toString(); diff --git a/spring-web/src/main/java/org/springframework/http/codec/xml/XmlEventDecoder.java b/spring-web/src/main/java/org/springframework/http/codec/xml/XmlEventDecoder.java index 836c0ae7643..4e7b24167d9 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/xml/XmlEventDecoder.java +++ b/spring-web/src/main/java/org/springframework/http/codec/xml/XmlEventDecoder.java @@ -103,7 +103,7 @@ public class XmlEventDecoder extends AbstractDecoder { .doFinally(signalType -> aaltoMapper.endOfInput()); } else { - Mono singleBuffer = flux.reduce(DataBufferUtils.writeAggregator()); + Mono singleBuffer = DataBufferUtils.compose(flux); return singleBuffer. flatMapMany(dataBuffer -> { try { diff --git a/spring-web/src/test/java/org/springframework/http/codec/multipart/SynchronossPartHttpMessageReaderTests.java b/spring-web/src/test/java/org/springframework/http/codec/multipart/SynchronossPartHttpMessageReaderTests.java index c4381fc5a0b..2502bb9b628 100644 --- a/spring-web/src/test/java/org/springframework/http/codec/multipart/SynchronossPartHttpMessageReaderTests.java +++ b/spring-web/src/test/java/org/springframework/http/codec/multipart/SynchronossPartHttpMessageReaderTests.java @@ -89,7 +89,7 @@ public class SynchronossPartHttpMessageReaderTests { assertTrue(part instanceof FilePart); assertEquals("fooPart", part.name()); assertEquals("foo.txt", ((FilePart) part).filename()); - DataBuffer buffer = part.content().reduce(DataBufferUtils.writeAggregator()).block(); + DataBuffer buffer = DataBufferUtils.compose(part.content()).block(); assertEquals(12, buffer.readableByteCount()); byte[] byteContent = new byte[12]; buffer.read(byteContent); diff --git a/spring-web/src/test/java/org/springframework/http/server/reactive/MultipartIntegrationTests.java b/spring-web/src/test/java/org/springframework/http/server/reactive/MultipartIntegrationTests.java index 59581046688..13a5df723a4 100644 --- a/spring-web/src/test/java/org/springframework/http/server/reactive/MultipartIntegrationTests.java +++ b/spring-web/src/test/java/org/springframework/http/server/reactive/MultipartIntegrationTests.java @@ -103,10 +103,7 @@ public class MultipartIntegrationTests extends AbstractHttpHandlerIntegrationTes assertEquals("fooPart", part.name()); assertTrue(part instanceof FilePart); assertEquals("foo.txt", ((FilePart) part).filename()); - DataBuffer buffer = part - .content() - .reduce(DataBufferUtils.writeAggregator()) - .block(); + DataBuffer buffer = DataBufferUtils.compose(part.content()).block(); assertEquals(12, buffer.readableByteCount()); byte[] byteContent = new byte[12]; buffer.read(byteContent); diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java index 34f671bada3..5f5a123fbd2 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java @@ -457,8 +457,7 @@ class DefaultWebClient implements WebClient { private static Mono createResponseException(ClientResponse response) { - return response.body(BodyExtractors.toDataBuffers()) - .reduce(DataBufferUtils.writeAggregator()) + return DataBufferUtils.compose(response.body(BodyExtractors.toDataBuffers())) .map(dataBuffer -> { byte[] bytes = new byte[dataBuffer.readableByteCount()]; dataBuffer.read(bytes); diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/resource/AppCacheManifestTransformer.java b/spring-webflux/src/main/java/org/springframework/web/reactive/resource/AppCacheManifestTransformer.java index fd35d298b01..024cce9fea1 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/resource/AppCacheManifestTransformer.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/resource/AppCacheManifestTransformer.java @@ -34,6 +34,7 @@ import reactor.core.publisher.Mono; import reactor.core.publisher.SynchronousSink; import org.springframework.core.io.Resource; +import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.lang.Nullable; @@ -109,8 +110,9 @@ public class AppCacheManifestTransformer extends ResourceTransformerSupport { return Mono.just(outputResource); } DataBufferFactory bufferFactory = exchange.getResponse().bufferFactory(); - return DataBufferUtils.read(outputResource, bufferFactory, StreamUtils.BUFFER_SIZE) - .reduce(DataBufferUtils.writeAggregator()) + Flux flux = DataBufferUtils + .read(outputResource, bufferFactory, StreamUtils.BUFFER_SIZE); + return DataBufferUtils.compose(flux) .flatMap(dataBuffer -> { CharBuffer charBuffer = DEFAULT_CHARSET.decode(dataBuffer.asByteBuffer()); DataBufferUtils.release(dataBuffer); diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/resource/ContentVersionStrategy.java b/spring-webflux/src/main/java/org/springframework/web/reactive/resource/ContentVersionStrategy.java index ce6d29cded3..4f45ab50901 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/resource/ContentVersionStrategy.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/resource/ContentVersionStrategy.java @@ -16,9 +16,11 @@ package org.springframework.web.reactive.resource; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import org.springframework.core.io.Resource; +import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.core.io.buffer.DefaultDataBufferFactory; @@ -42,8 +44,9 @@ public class ContentVersionStrategy extends AbstractFileNameVersionStrategy { @Override public Mono getResourceVersion(Resource resource) { - return DataBufferUtils.read(resource, dataBufferFactory, StreamUtils.BUFFER_SIZE) - .reduce(DataBufferUtils.writeAggregator()) + Flux flux = + DataBufferUtils.read(resource, dataBufferFactory, StreamUtils.BUFFER_SIZE); + return DataBufferUtils.compose(flux) .map(buffer -> { byte[] result = new byte[buffer.readableByteCount()]; buffer.read(result); diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/resource/CssLinkResourceTransformer.java b/spring-webflux/src/main/java/org/springframework/web/reactive/resource/CssLinkResourceTransformer.java index 3ceaf3c73de..fa30b7f7750 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/resource/CssLinkResourceTransformer.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/resource/CssLinkResourceTransformer.java @@ -33,6 +33,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import org.springframework.core.io.Resource; +import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.lang.Nullable; @@ -86,8 +87,9 @@ public class CssLinkResourceTransformer extends ResourceTransformerSupport { } DataBufferFactory bufferFactory = exchange.getResponse().bufferFactory(); - return DataBufferUtils.read(ouptputResource, bufferFactory, StreamUtils.BUFFER_SIZE) - .reduce(DataBufferUtils.writeAggregator()) + Flux flux = DataBufferUtils + .read(ouptputResource, bufferFactory, StreamUtils.BUFFER_SIZE); + return DataBufferUtils.compose(flux) .flatMap(dataBuffer -> { CharBuffer charBuffer = DEFAULT_CHARSET.decode(dataBuffer.asByteBuffer()); DataBufferUtils.release(dataBuffer); diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/BodyInsertersTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/BodyInsertersTests.java index 916e94f5c2f..03fbcd28712 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/function/BodyInsertersTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/BodyInsertersTests.java @@ -332,8 +332,7 @@ public class BodyInsertersTests { Mono result = inserter.insert(request, this.context); StepVerifier.create(result).expectComplete().verify(); - StepVerifier.create(request.getBody() - .reduce(DataBufferUtils.writeAggregator())) + StepVerifier.create(DataBufferUtils.compose(request.getBody())) .consumeNextWith(dataBuffer -> { byte[] resultBytes = new byte[dataBuffer.readableByteCount()]; dataBuffer.read(resultBytes);