Browse Source

Use DataBufferUtils.compose and remove writeAggregator

Use DataBufferUtils.compose instead of writeAggregator to combine
multiple data buffers into one, as the write aggregator would not work
when the initial data buffer did not have enough capacity to contain
all subsequent buffers.

Removed writeAggregator, as it is no longer needed.

Issue: SPR-16365
pull/1503/merge
Arjen Poutsma 8 years ago
parent
commit
67e7c784e8
  1. 3
      spring-core/src/main/java/org/springframework/core/codec/AbstractDataBufferDecoder.java
  2. 16
      spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java
  3. 15
      spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java
  4. 2
      spring-web/src/main/java/org/springframework/http/codec/FormHttpMessageReader.java
  5. 2
      spring-web/src/main/java/org/springframework/http/codec/xml/XmlEventDecoder.java
  6. 2
      spring-web/src/test/java/org/springframework/http/codec/multipart/SynchronossPartHttpMessageReaderTests.java
  7. 5
      spring-web/src/test/java/org/springframework/http/server/reactive/MultipartIntegrationTests.java
  8. 3
      spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java
  9. 6
      spring-webflux/src/main/java/org/springframework/web/reactive/resource/AppCacheManifestTransformer.java
  10. 7
      spring-webflux/src/main/java/org/springframework/web/reactive/resource/ContentVersionStrategy.java
  11. 6
      spring-webflux/src/main/java/org/springframework/web/reactive/resource/CssLinkResourceTransformer.java
  12. 3
      spring-webflux/src/test/java/org/springframework/web/reactive/function/BodyInsertersTests.java

3
spring-core/src/main/java/org/springframework/core/codec/AbstractDataBufferDecoder.java

@ -63,8 +63,7 @@ public abstract class AbstractDataBufferDecoder<T> extends AbstractDecoder<T> { @@ -63,8 +63,7 @@ public abstract class AbstractDataBufferDecoder<T> extends AbstractDecoder<T> {
public Mono<T> decodeToMono(Publisher<DataBuffer> inputStream, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
return Flux.from(inputStream)
.reduce(DataBufferUtils.writeAggregator())
return DataBufferUtils.compose(inputStream)
.map(buffer -> decodeDataBuffer(buffer, elementType, mimeType, hints));
}

16
spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java

@ -419,22 +419,6 @@ public abstract class DataBufferUtils { @@ -419,22 +419,6 @@ public abstract class DataBufferUtils {
return RELEASE_CONSUMER;
}
/**
* Return an aggregator function that can be used to {@linkplain Flux#reduce(BiFunction) reduce}
* a {@code Flux} of data buffers into a single data buffer by writing all subsequent buffers
* into the first buffer. All buffers except the first buffer are
* {@linkplain #release(DataBuffer) released}.
* <p>For example:
* <pre class="code">
* Flux&lt;DataBuffer&gt; flux = ...
* Mono&lt;DataBuffer&gt; mono = flux.reduce(DataBufferUtils.writeAggregator());
* </pre>
* @see Flux#reduce(BiFunction)
*/
public static BinaryOperator<DataBuffer> writeAggregator() {
return WRITE_AGGREGATOR;
}
/**
* Composes the buffers in the given {@link Publisher} into a single data buffer. Depending on
* the {@code DataBuffer} implementation, the returned buffer may be a single buffer containing

15
spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java

@ -308,21 +308,6 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase { @@ -308,21 +308,6 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase {
}
}
@Test
public void writeAggregator() {
DataBuffer foo = stringBuffer("foo");
DataBuffer bar = stringBuffer("bar");
DataBuffer baz = stringBuffer("baz");
Flux<DataBuffer> flux = Flux.just(foo, bar, baz);
DataBuffer result =
flux.reduce(DataBufferUtils.writeAggregator()).block(Duration.ofSeconds(1));
assertEquals("foobarbaz", DataBufferTestUtils.dumpString(result, StandardCharsets.UTF_8));
release(result);
}
@Test
public void SPR16070() throws Exception {
ReadableByteChannel channel = mock(ReadableByteChannel.class);

2
spring-web/src/main/java/org/springframework/http/codec/FormHttpMessageReader.java

@ -96,7 +96,7 @@ public class FormHttpMessageReader implements HttpMessageReader<MultiValueMap<St @@ -96,7 +96,7 @@ public class FormHttpMessageReader implements HttpMessageReader<MultiValueMap<St
MediaType contentType = message.getHeaders().getContentType();
Charset charset = getMediaTypeCharset(contentType);
return message.getBody().reduce(DataBufferUtils.writeAggregator())
return DataBufferUtils.compose(message.getBody())
.map(buffer -> {
CharBuffer charBuffer = charset.decode(buffer.asByteBuffer());
String body = charBuffer.toString();

2
spring-web/src/main/java/org/springframework/http/codec/xml/XmlEventDecoder.java

@ -103,7 +103,7 @@ public class XmlEventDecoder extends AbstractDecoder<XMLEvent> { @@ -103,7 +103,7 @@ public class XmlEventDecoder extends AbstractDecoder<XMLEvent> {
.doFinally(signalType -> aaltoMapper.endOfInput());
}
else {
Mono<DataBuffer> singleBuffer = flux.reduce(DataBufferUtils.writeAggregator());
Mono<DataBuffer> singleBuffer = DataBufferUtils.compose(flux);
return singleBuffer.
flatMapMany(dataBuffer -> {
try {

2
spring-web/src/test/java/org/springframework/http/codec/multipart/SynchronossPartHttpMessageReaderTests.java

@ -89,7 +89,7 @@ public class SynchronossPartHttpMessageReaderTests { @@ -89,7 +89,7 @@ public class SynchronossPartHttpMessageReaderTests {
assertTrue(part instanceof FilePart);
assertEquals("fooPart", part.name());
assertEquals("foo.txt", ((FilePart) part).filename());
DataBuffer buffer = part.content().reduce(DataBufferUtils.writeAggregator()).block();
DataBuffer buffer = DataBufferUtils.compose(part.content()).block();
assertEquals(12, buffer.readableByteCount());
byte[] byteContent = new byte[12];
buffer.read(byteContent);

5
spring-web/src/test/java/org/springframework/http/server/reactive/MultipartIntegrationTests.java

@ -103,10 +103,7 @@ public class MultipartIntegrationTests extends AbstractHttpHandlerIntegrationTes @@ -103,10 +103,7 @@ public class MultipartIntegrationTests extends AbstractHttpHandlerIntegrationTes
assertEquals("fooPart", part.name());
assertTrue(part instanceof FilePart);
assertEquals("foo.txt", ((FilePart) part).filename());
DataBuffer buffer = part
.content()
.reduce(DataBufferUtils.writeAggregator())
.block();
DataBuffer buffer = DataBufferUtils.compose(part.content()).block();
assertEquals(12, buffer.readableByteCount());
byte[] byteContent = new byte[12];
buffer.read(byteContent);

3
spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java

@ -457,8 +457,7 @@ class DefaultWebClient implements WebClient { @@ -457,8 +457,7 @@ class DefaultWebClient implements WebClient {
private static Mono<WebClientResponseException> createResponseException(ClientResponse response) {
return response.body(BodyExtractors.toDataBuffers())
.reduce(DataBufferUtils.writeAggregator())
return DataBufferUtils.compose(response.body(BodyExtractors.toDataBuffers()))
.map(dataBuffer -> {
byte[] bytes = new byte[dataBuffer.readableByteCount()];
dataBuffer.read(bytes);

6
spring-webflux/src/main/java/org/springframework/web/reactive/resource/AppCacheManifestTransformer.java

@ -34,6 +34,7 @@ import reactor.core.publisher.Mono; @@ -34,6 +34,7 @@ import reactor.core.publisher.Mono;
import reactor.core.publisher.SynchronousSink;
import org.springframework.core.io.Resource;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.lang.Nullable;
@ -109,8 +110,9 @@ public class AppCacheManifestTransformer extends ResourceTransformerSupport { @@ -109,8 +110,9 @@ public class AppCacheManifestTransformer extends ResourceTransformerSupport {
return Mono.just(outputResource);
}
DataBufferFactory bufferFactory = exchange.getResponse().bufferFactory();
return DataBufferUtils.read(outputResource, bufferFactory, StreamUtils.BUFFER_SIZE)
.reduce(DataBufferUtils.writeAggregator())
Flux<DataBuffer> flux = DataBufferUtils
.read(outputResource, bufferFactory, StreamUtils.BUFFER_SIZE);
return DataBufferUtils.compose(flux)
.flatMap(dataBuffer -> {
CharBuffer charBuffer = DEFAULT_CHARSET.decode(dataBuffer.asByteBuffer());
DataBufferUtils.release(dataBuffer);

7
spring-webflux/src/main/java/org/springframework/web/reactive/resource/ContentVersionStrategy.java

@ -16,9 +16,11 @@ @@ -16,9 +16,11 @@
package org.springframework.web.reactive.resource;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.io.Resource;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
@ -42,8 +44,9 @@ public class ContentVersionStrategy extends AbstractFileNameVersionStrategy { @@ -42,8 +44,9 @@ public class ContentVersionStrategy extends AbstractFileNameVersionStrategy {
@Override
public Mono<String> getResourceVersion(Resource resource) {
return DataBufferUtils.read(resource, dataBufferFactory, StreamUtils.BUFFER_SIZE)
.reduce(DataBufferUtils.writeAggregator())
Flux<DataBuffer> flux =
DataBufferUtils.read(resource, dataBufferFactory, StreamUtils.BUFFER_SIZE);
return DataBufferUtils.compose(flux)
.map(buffer -> {
byte[] result = new byte[buffer.readableByteCount()];
buffer.read(result);

6
spring-webflux/src/main/java/org/springframework/web/reactive/resource/CssLinkResourceTransformer.java

@ -33,6 +33,7 @@ import reactor.core.publisher.Flux; @@ -33,6 +33,7 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.io.Resource;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.lang.Nullable;
@ -86,8 +87,9 @@ public class CssLinkResourceTransformer extends ResourceTransformerSupport { @@ -86,8 +87,9 @@ public class CssLinkResourceTransformer extends ResourceTransformerSupport {
}
DataBufferFactory bufferFactory = exchange.getResponse().bufferFactory();
return DataBufferUtils.read(ouptputResource, bufferFactory, StreamUtils.BUFFER_SIZE)
.reduce(DataBufferUtils.writeAggregator())
Flux<DataBuffer> flux = DataBufferUtils
.read(ouptputResource, bufferFactory, StreamUtils.BUFFER_SIZE);
return DataBufferUtils.compose(flux)
.flatMap(dataBuffer -> {
CharBuffer charBuffer = DEFAULT_CHARSET.decode(dataBuffer.asByteBuffer());
DataBufferUtils.release(dataBuffer);

3
spring-webflux/src/test/java/org/springframework/web/reactive/function/BodyInsertersTests.java

@ -332,8 +332,7 @@ public class BodyInsertersTests { @@ -332,8 +332,7 @@ public class BodyInsertersTests {
Mono<Void> result = inserter.insert(request, this.context);
StepVerifier.create(result).expectComplete().verify();
StepVerifier.create(request.getBody()
.reduce(DataBufferUtils.writeAggregator()))
StepVerifier.create(DataBufferUtils.compose(request.getBody()))
.consumeNextWith(dataBuffer -> {
byte[] resultBytes = new byte[dataBuffer.readableByteCount()];
dataBuffer.read(resultBytes);

Loading…
Cancel
Save