From 375090bb7c7dd77ae67c2e6d25024f6eafe32910 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Tue, 16 Apr 2019 20:51:40 -0400 Subject: [PATCH 1/3] LeakAwareDataBuffer related fixes Following on 3ebbfa2191376d9920c57b545fbd3c07167b4c1e where the local refCount was removed in favor of using the internal refCount of the native data buffer, this commit ensures that LeakAwareDataBufferFactory uses a PooledDataBufferFactory delegate by default. There are also fixes for test issues with eager allocation uncovered by these changes in StringDecoder and ResourceDecoder. --- .../core/codec/AbstractDecoderTestCase.java | 69 ++++++++----------- .../core/codec/ResourceDecoderTests.java | 31 ++------- .../codec/ResourceRegionEncoderTests.java | 5 +- .../core/codec/StringDecoderTests.java | 49 ++++--------- .../core/io/buffer/LeakAwareDataBuffer.java | 5 ++ .../io/buffer/LeakAwareDataBufferFactory.java | 8 ++- .../reactive/ChannelSendOperatorTests.java | 11 +-- .../result/view/ZeroDemandResponse.java | 5 +- 8 files changed, 62 insertions(+), 121 deletions(-) diff --git a/spring-core/src/test/java/org/springframework/core/codec/AbstractDecoderTestCase.java b/spring-core/src/test/java/org/springframework/core/codec/AbstractDecoderTestCase.java index 4feb5766a27..cbbb3b47569 100644 --- a/spring-core/src/test/java/org/springframework/core/codec/AbstractDecoderTestCase.java +++ b/spring-core/src/test/java/org/springframework/core/codec/AbstractDecoderTestCase.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,7 @@ package org.springframework.core.codec; +import java.time.Duration; import java.util.Map; import java.util.function.Consumer; @@ -32,6 +33,8 @@ import org.springframework.lang.Nullable; import org.springframework.util.Assert; import org.springframework.util.MimeType; +import static org.junit.Assert.*; + /** * Abstract base class for {@link Decoder} unit tests. Subclasses need to implement * {@link #canDecode()}, {@link #decode()} and {@link #decodeToMono()}, possibly using the wide @@ -99,6 +102,7 @@ public abstract class AbstractDecoderTestCase> */ protected void testDecodeAll(Publisher input, Class outputClass, Consumer> stepConsumer) { + testDecodeAll(input, ResolvableType.forClass(outputClass), stepConsumer, null, null); } @@ -122,6 +126,7 @@ public abstract class AbstractDecoderTestCase> protected void testDecodeAll(Publisher input, ResolvableType outputType, Consumer> stepConsumer, @Nullable MimeType mimeType, @Nullable Map hints) { + testDecode(input, outputType, stepConsumer, mimeType, hints); testDecodeError(input, outputType, mimeType, hints); testDecodeCancel(input, outputType, mimeType, hints); @@ -151,6 +156,7 @@ public abstract class AbstractDecoderTestCase> */ protected void testDecode(Publisher input, Class outputClass, Consumer> stepConsumer) { + testDecode(input, ResolvableType.forClass(outputClass), stepConsumer, null, null); } @@ -202,16 +208,14 @@ public abstract class AbstractDecoderTestCase> protected void testDecodeError(Publisher input, ResolvableType outputType, @Nullable MimeType mimeType, @Nullable Map hints) { - input = Flux.concat( - Flux.from(input).take(1), - Flux.error(new InputException())); - - Flux result = this.decoder.decode(input, outputType, mimeType, hints); - - StepVerifier.create(result) - .expectNextCount(1) - .expectError(InputException.class) - .verify(); + input = Mono.from(input).concatWith(Flux.error(new InputException())); + try { + this.decoder.decode(input, outputType, mimeType, hints).blockLast(Duration.ofSeconds(5)); + fail(); + } + catch (InputException ex) { + // expected + } } /** @@ -229,11 +233,7 @@ public abstract class AbstractDecoderTestCase> @Nullable MimeType mimeType, @Nullable Map hints) { Flux result = this.decoder.decode(input, outputType, mimeType, hints); - - StepVerifier.create(result) - .expectNextCount(1) - .thenCancel() - .verify(); + StepVerifier.create(result).expectNextCount(1).thenCancel().verify(); } /** @@ -249,9 +249,7 @@ public abstract class AbstractDecoderTestCase> Flux input = Flux.empty(); Flux result = this.decoder.decode(input, outputType, mimeType, hints); - - StepVerifier.create(result) - .verifyComplete(); + StepVerifier.create(result).verifyComplete(); } // Mono @@ -297,6 +295,7 @@ public abstract class AbstractDecoderTestCase> protected void testDecodeToMonoAll(Publisher input, ResolvableType outputType, Consumer> stepConsumer, @Nullable MimeType mimeType, @Nullable Map hints) { + testDecodeToMono(input, outputType, stepConsumer, mimeType, hints); testDecodeToMonoError(input, outputType, mimeType, hints); testDecodeToMonoCancel(input, outputType, mimeType, hints); @@ -326,6 +325,7 @@ public abstract class AbstractDecoderTestCase> */ protected void testDecodeToMono(Publisher input, Class outputClass, Consumer> stepConsumer) { + testDecodeToMono(input, ResolvableType.forClass(outputClass), stepConsumer, null, null); } @@ -377,15 +377,9 @@ public abstract class AbstractDecoderTestCase> protected void testDecodeToMonoError(Publisher input, ResolvableType outputType, @Nullable MimeType mimeType, @Nullable Map hints) { - input = Flux.concat( - Flux.from(input).take(1), - Flux.error(new InputException())); - + input = Mono.from(input).concatWith(Flux.error(new InputException())); Mono result = this.decoder.decodeToMono(input, outputType, mimeType, hints); - - StepVerifier.create(result) - .expectError(InputException.class) - .verify(); + StepVerifier.create(result).expectError(InputException.class).verify(); } /** @@ -401,10 +395,7 @@ public abstract class AbstractDecoderTestCase> @Nullable MimeType mimeType, @Nullable Map hints) { Mono result = this.decoder.decodeToMono(input, outputType, mimeType, hints); - - StepVerifier.create(result) - .thenCancel() - .verify(); + StepVerifier.create(result).thenCancel().verify(); } /** @@ -418,11 +409,8 @@ public abstract class AbstractDecoderTestCase> protected void testDecodeToMonoEmpty(ResolvableType outputType, @Nullable MimeType mimeType, @Nullable Map hints) { - Flux input = Flux.empty(); - Mono result = this.decoder.decodeToMono(input, outputType, mimeType, hints); - - StepVerifier.create(result) - .verifyComplete(); + Mono result = this.decoder.decodeToMono(Flux.empty(), outputType, mimeType, hints); + StepVerifier.create(result).verifyComplete(); } /** @@ -431,10 +419,10 @@ public abstract class AbstractDecoderTestCase> * @return the deferred buffer */ protected Mono dataBuffer(byte[] bytes) { - return Mono.defer(() -> { + return Mono.fromCallable(() -> { DataBuffer dataBuffer = this.bufferFactory.allocateBuffer(bytes.length); dataBuffer.write(bytes); - return Mono.just(dataBuffer); + return dataBuffer; }); } @@ -442,9 +430,6 @@ public abstract class AbstractDecoderTestCase> * Exception used in {@link #testDecodeError} and {@link #testDecodeToMonoError} */ @SuppressWarnings("serial") - public static class InputException extends RuntimeException { - - } - + public static class InputException extends RuntimeException {} } diff --git a/spring-core/src/test/java/org/springframework/core/codec/ResourceDecoderTests.java b/spring-core/src/test/java/org/springframework/core/codec/ResourceDecoderTests.java index d8bab5a1f5a..71f87f69d6a 100644 --- a/spring-core/src/test/java/org/springframework/core/codec/ResourceDecoderTests.java +++ b/spring-core/src/test/java/org/springframework/core/codec/ResourceDecoderTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,25 +18,19 @@ package org.springframework.core.codec; import java.io.IOException; import java.nio.charset.StandardCharsets; -import java.util.Map; import org.junit.Test; -import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; -import reactor.test.StepVerifier; -import org.springframework.core.ResolvableType; import org.springframework.core.io.ByteArrayResource; import org.springframework.core.io.InputStreamResource; import org.springframework.core.io.Resource; import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.lang.Nullable; -import org.springframework.util.MimeType; import org.springframework.util.MimeTypeUtils; import org.springframework.util.StreamUtils; import static org.junit.Assert.*; -import static org.springframework.core.ResolvableType.forClass; +import static org.springframework.core.ResolvableType.*; /** * @author Arjen Poutsma @@ -66,9 +60,7 @@ public class ResourceDecoderTests extends AbstractDecoderTestCase input = Flux.concat( - dataBuffer(this.fooBytes), - dataBuffer(this.barBytes)); + Flux input = Flux.concat(dataBuffer(this.fooBytes), dataBuffer(this.barBytes)); testDecodeAll(input, Resource.class, step -> step .consumeNextWith(resource -> { @@ -85,22 +77,7 @@ public class ResourceDecoderTests extends AbstractDecoderTestCase input, ResolvableType outputType, - @Nullable MimeType mimeType, @Nullable Map hints) { - - input = Flux.concat( - Flux.from(input).take(1), - Flux.error(new InputException())); - - Flux result = this.decoder.decode(input, outputType, mimeType, hints); - - StepVerifier.create(result) - .expectError(InputException.class) - .verify(); - } - - @Override - public void decodeToMono() throws Exception { + public void decodeToMono() { Flux input = Flux.concat( dataBuffer(this.fooBytes), dataBuffer(this.barBytes)); diff --git a/spring-core/src/test/java/org/springframework/core/codec/ResourceRegionEncoderTests.java b/spring-core/src/test/java/org/springframework/core/codec/ResourceRegionEncoderTests.java index 99f058fd054..5ca6795d7c4 100644 --- a/spring-core/src/test/java/org/springframework/core/codec/ResourceRegionEncoderTests.java +++ b/spring-core/src/test/java/org/springframework/core/codec/ResourceRegionEncoderTests.java @@ -19,7 +19,6 @@ package org.springframework.core.codec; import java.util.Collections; import java.util.function.Consumer; -import io.netty.buffer.PooledByteBufAllocator; import org.junit.After; import org.junit.Test; import org.reactivestreams.Subscription; @@ -34,7 +33,6 @@ import org.springframework.core.io.Resource; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.core.io.buffer.LeakAwareDataBufferFactory; -import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.core.io.buffer.support.DataBufferTestUtils; import org.springframework.core.io.support.ResourceRegion; import org.springframework.util.MimeType; @@ -51,8 +49,7 @@ public class ResourceRegionEncoderTests { private ResourceRegionEncoder encoder = new ResourceRegionEncoder(); - private LeakAwareDataBufferFactory bufferFactory = - new LeakAwareDataBufferFactory(new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT)); + private LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory(); @After diff --git a/spring-core/src/test/java/org/springframework/core/codec/StringDecoderTests.java b/spring-core/src/test/java/org/springframework/core/codec/StringDecoderTests.java index 9110cc9dd14..ed10c96c835 100644 --- a/spring-core/src/test/java/org/springframework/core/codec/StringDecoderTests.java +++ b/spring-core/src/test/java/org/springframework/core/codec/StringDecoderTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,18 +19,16 @@ package org.springframework.core.codec; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.Map; import org.junit.Test; -import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.test.StepVerifier; import org.springframework.core.ResolvableType; import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.lang.Nullable; import org.springframework.util.MimeType; import org.springframework.util.MimeTypeUtils; @@ -61,10 +59,8 @@ public class StringDecoderTests extends AbstractDecoderTestCase { assertTrue(this.decoder.canDecode(TYPE, MimeTypeUtils.TEXT_HTML)); assertTrue(this.decoder.canDecode(TYPE, MimeTypeUtils.APPLICATION_JSON)); assertTrue(this.decoder.canDecode(TYPE, MimeTypeUtils.parseMimeType("text/plain;charset=utf-8"))); - assertFalse(this.decoder.canDecode( - ResolvableType.forClass(Integer.class), MimeTypeUtils.TEXT_PLAIN)); - assertFalse(this.decoder.canDecode( - ResolvableType.forClass(Object.class), MimeTypeUtils.APPLICATION_JSON)); + assertFalse(this.decoder.canDecode(ResolvableType.forClass(Integer.class), MimeTypeUtils.TEXT_PLAIN)); + assertFalse(this.decoder.canDecode(ResolvableType.forClass(Object.class), MimeTypeUtils.APPLICATION_JSON)); } @Override @@ -76,24 +72,7 @@ public class StringDecoderTests extends AbstractDecoderTestCase { String s = String.format("%s\n%s\n%s", u, e, o); Flux input = toDataBuffers(s, 1, UTF_8); - testDecodeAll(input, ResolvableType.forClass(String.class), step -> step - .expectNext(u, e, o) - .verifyComplete(), null, null); - } - - @Override - protected void testDecodeError(Publisher input, ResolvableType outputType, - @Nullable MimeType mimeType, @Nullable Map hints) { - - input = Flux.concat( - Flux.from(input).take(1), - Flux.error(new InputException())); - - Flux result = this.decoder.decode(input, outputType, mimeType, hints); - - StepVerifier.create(result) - .expectError(InputException.class) - .verify(); + testDecodeAll(input, TYPE, step -> step.expectNext(u, e, o).verifyComplete(), null, null); } @Test @@ -105,21 +84,21 @@ public class StringDecoderTests extends AbstractDecoderTestCase { Flux source = toDataBuffers(s, 2, UTF_16BE); MimeType mimeType = MimeTypeUtils.parseMimeType("text/plain;charset=utf-16be"); - testDecode(source, TYPE, step -> step - .expectNext(u, e, o) - .verifyComplete(), mimeType, null); + testDecode(source, TYPE, step -> step.expectNext(u, e, o).verifyComplete(), mimeType, null); } private Flux toDataBuffers(String s, int length, Charset charset) { byte[] bytes = s.getBytes(charset); - - List dataBuffers = new ArrayList<>(); + List chunks = new ArrayList<>(); for (int i = 0; i < bytes.length; i += length) { - DataBuffer dataBuffer = this.bufferFactory.allocateBuffer(length); - dataBuffer.write(bytes, i, length); - dataBuffers.add(dataBuffer); + chunks.add(Arrays.copyOfRange(bytes, i, i + length)); } - return Flux.fromIterable(dataBuffers); + return Flux.fromIterable(chunks) + .map(chunk -> { + DataBuffer dataBuffer = this.bufferFactory.allocateBuffer(length); + dataBuffer.write(chunk, 0, chunk.length); + return dataBuffer; + }); } @Test diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBuffer.java b/spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBuffer.java index a41a99aa69a..155dc056b5a 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBuffer.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBuffer.java @@ -63,6 +63,11 @@ class LeakAwareDataBuffer implements PooledDataBuffer { return this.leakError; } + + public DataBuffer getDelegate() { + return this.delegate; + } + @Override public boolean isAllocated() { return this.delegate instanceof PooledDataBuffer && diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBufferFactory.java b/spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBufferFactory.java index 62900ad9f53..230d1ac51e6 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBufferFactory.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBufferFactory.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; +import io.netty.buffer.PooledByteBufAllocator; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.jetbrains.annotations.NotNull; @@ -55,7 +56,7 @@ public class LeakAwareDataBufferFactory implements DataBufferFactory { * {@link DefaultDataBufferFactory}. */ public LeakAwareDataBufferFactory() { - this(new DefaultDataBufferFactory()); + this(new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT)); } /** @@ -67,6 +68,7 @@ public class LeakAwareDataBufferFactory implements DataBufferFactory { this.delegate = delegate; } + /** * Checks whether all of the data buffers allocated by this factory have also been released. * If not, then an {@link AssertionError} is thrown. Typically used from a JUnit {@link After} @@ -126,6 +128,10 @@ public class LeakAwareDataBufferFactory implements DataBufferFactory { @Override public DataBuffer join(List dataBuffers) { + // Remove LeakAwareDataBuffer wrapper so delegate can find native buffers + dataBuffers = dataBuffers.stream() + .map(o -> o instanceof LeakAwareDataBuffer ? ((LeakAwareDataBuffer) o).getDelegate() : o) + .collect(Collectors.toList()); return new LeakAwareDataBuffer(this.delegate.join(dataBuffers), this); } diff --git a/spring-web/src/test/java/org/springframework/http/server/reactive/ChannelSendOperatorTests.java b/spring-web/src/test/java/org/springframework/http/server/reactive/ChannelSendOperatorTests.java index d1aa4fc9f94..3c905f6227a 100644 --- a/spring-web/src/test/java/org/springframework/http/server/reactive/ChannelSendOperatorTests.java +++ b/spring-web/src/test/java/org/springframework/http/server/reactive/ChannelSendOperatorTests.java @@ -24,7 +24,6 @@ import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import io.netty.buffer.ByteBufAllocator; import org.junit.Before; import org.junit.Test; import org.reactivestreams.Publisher; @@ -38,7 +37,6 @@ import reactor.test.StepVerifier; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.LeakAwareDataBufferFactory; -import org.springframework.core.io.buffer.NettyDataBufferFactory; import static org.junit.Assert.*; @@ -135,8 +133,7 @@ public class ChannelSendOperatorTests { @Test // gh-22720 public void cancelWhileItemCached() { - NettyDataBufferFactory delegate = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT); - LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory(delegate); + LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory(); ChannelSendOperator operator = new ChannelSendOperator<>( Mono.fromCallable(() -> { @@ -164,8 +161,7 @@ public class ChannelSendOperatorTests { // 2. writeFunction applied and writeCompletionBarrier subscribed to it // 3. Write Publisher fails right after that and before request(n) from server - NettyDataBufferFactory delegate = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT); - LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory(delegate); + LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory(); ZeroDemandSubscriber writeSubscriber = new ZeroDemandSubscriber(); ChannelSendOperator operator = new ChannelSendOperator<>( @@ -200,8 +196,7 @@ public class ChannelSendOperatorTests { // 2. writeFunction applied and writeCompletionBarrier subscribed to it // 3. writeFunction fails, e.g. to flush status and headers, before request(n) from server - NettyDataBufferFactory delegate = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT); - LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory(delegate); + LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory(); ChannelSendOperator operator = new ChannelSendOperator<>( Flux.create(sink -> { diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/result/view/ZeroDemandResponse.java b/spring-webflux/src/test/java/org/springframework/web/reactive/result/view/ZeroDemandResponse.java index 4e4eee2f6b7..e75fcdf783a 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/result/view/ZeroDemandResponse.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/result/view/ZeroDemandResponse.java @@ -17,7 +17,6 @@ package org.springframework.web.reactive.result.view; import java.util.function.Supplier; -import io.netty.buffer.PooledByteBufAllocator; import org.reactivestreams.Publisher; import org.reactivestreams.Subscription; import reactor.core.publisher.BaseSubscriber; @@ -26,7 +25,6 @@ import reactor.core.publisher.Mono; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.LeakAwareDataBufferFactory; -import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseCookie; @@ -47,8 +45,7 @@ public class ZeroDemandResponse implements ServerHttpResponse { public ZeroDemandResponse() { - NettyDataBufferFactory delegate = new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT); - this.bufferFactory = new LeakAwareDataBufferFactory(delegate); + this.bufferFactory = new LeakAwareDataBufferFactory(); } From 15b2fb1210d648ad75adb253dccfdcabdd1e554b Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Tue, 16 Apr 2019 15:59:28 -0400 Subject: [PATCH 2/3] Polish Replacing a couple of calls to Mono.fromCallable with Mono.just which seems to work with doOnDiscard except when nested inside Flux.defer. --- .../http/codec/EncoderHttpMessageWriter.java | 8 +++----- .../http/codec/xml/Jaxb2XmlEncoder.java | 12 ++++++------ .../ModelAttributeMethodArgumentResolver.java | 3 +-- 3 files changed, 10 insertions(+), 13 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/codec/EncoderHttpMessageWriter.java b/spring-web/src/main/java/org/springframework/http/codec/EncoderHttpMessageWriter.java index e8058691db7..836fd3164be 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/EncoderHttpMessageWriter.java +++ b/spring-web/src/main/java/org/springframework/http/codec/EncoderHttpMessageWriter.java @@ -125,16 +125,14 @@ public class EncoderHttpMessageWriter implements HttpMessageWriter { })) .flatMap(buffer -> { headers.setContentLength(buffer.readableByteCount()); - return message.writeWith( - Mono.fromCallable(() -> buffer) - .doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release)); + return message.writeWith(Mono.just(buffer) + .doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release)); }); } if (isStreamingMediaType(contentType)) { return message.writeAndFlushWith(body.map(buffer -> - Mono.fromCallable(() -> buffer) - .doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release))); + Mono.just(buffer).doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release))); } return message.writeWith(body); diff --git a/spring-web/src/main/java/org/springframework/http/codec/xml/Jaxb2XmlEncoder.java b/spring-web/src/main/java/org/springframework/http/codec/xml/Jaxb2XmlEncoder.java index 59a11970add..8109141e774 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/xml/Jaxb2XmlEncoder.java +++ b/spring-web/src/main/java/org/springframework/http/codec/xml/Jaxb2XmlEncoder.java @@ -108,7 +108,7 @@ public class Jaxb2XmlEncoder extends AbstractSingleValueEncoder { }); } - return Flux.defer(() -> { + return Mono.fromCallable(() -> { boolean release = true; DataBuffer buffer = bufferFactory.allocateBuffer(1024); try { @@ -117,21 +117,21 @@ public class Jaxb2XmlEncoder extends AbstractSingleValueEncoder { Marshaller marshaller = initMarshaller(clazz); marshaller.marshal(value, outputStream); release = false; - return Mono.fromCallable(() -> buffer); // relying on doOnDiscard in base class + return buffer; // relying on doOnDiscard in base class } catch (MarshalException ex) { - return Flux.error(new EncodingException( - "Could not marshal " + value.getClass() + " to XML", ex)); + throw new EncodingException( + "Could not marshal " + value.getClass() + " to XML", ex); } catch (JAXBException ex) { - return Flux.error(new CodecException("Invalid JAXB configuration", ex)); + throw new CodecException("Invalid JAXB configuration", ex); } finally { if (release) { DataBufferUtils.release(buffer); } } - }); + }).flux(); } private Marshaller initMarshaller(Class clazz) throws JAXBException { diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/ModelAttributeMethodArgumentResolver.java b/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/ModelAttributeMethodArgumentResolver.java index 32ff9c16e9c..d05e55a3d2c 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/ModelAttributeMethodArgumentResolver.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/ModelAttributeMethodArgumentResolver.java @@ -135,8 +135,7 @@ public class ModelAttributeMethodArgumentResolver extends HandlerMethodArgumentR BindingResult errors = binder.getBindingResult(); if (adapter != null) { return adapter.fromPublisher(errors.hasErrors() ? - Mono.error(new WebExchangeBindException(parameter, errors)) : - valueMono); + Mono.error(new WebExchangeBindException(parameter, errors)) : valueMono); } else { if (errors.hasErrors() && !hasErrorsArgument(parameter)) { From 5b711a964bfb64c80bde72ebc48e706f9fe2f000 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Tue, 16 Apr 2019 16:54:33 -0400 Subject: [PATCH 3/3] Pass Mono to Reactor Netty when feasible Closes gh-22800 --- .../reactive/MockServerHttpResponse.java | 21 +++++++++++++++---- .../reactive/ReactorClientHttpRequest.java | 14 ++++++++++--- .../reactive/AbstractServerHttpResponse.java | 15 ++++++++++--- .../codec/FormHttpMessageWriterTests.java | 8 +++---- .../reactive/test/MockServerHttpResponse.java | 8 +++++-- 5 files changed, 49 insertions(+), 17 deletions(-) diff --git a/spring-test/src/main/java/org/springframework/mock/http/server/reactive/MockServerHttpResponse.java b/spring-test/src/main/java/org/springframework/mock/http/server/reactive/MockServerHttpResponse.java index 79f06edfb86..71fc199378b 100644 --- a/spring-test/src/main/java/org/springframework/mock/http/server/reactive/MockServerHttpResponse.java +++ b/spring-test/src/main/java/org/springframework/mock/http/server/reactive/MockServerHttpResponse.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,8 +25,10 @@ import java.util.function.Function; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoProcessor; import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.core.io.buffer.DefaultDataBufferFactory; import org.springframework.http.HttpHeaders; @@ -54,10 +56,17 @@ public class MockServerHttpResponse extends AbstractServerHttpResponse { public MockServerHttpResponse() { - super(new DefaultDataBufferFactory()); + this(new DefaultDataBufferFactory()); + } + + public MockServerHttpResponse(DataBufferFactory dataBufferFactory) { + super(dataBufferFactory); this.writeHandler = body -> { - this.body = body.cache(); - return this.body.then(); + // Avoid .then() which causes data buffers to be released + MonoProcessor completion = MonoProcessor.create(); + this.body = body.doOnComplete(completion::onComplete).doOnError(completion::onError).cache(); + this.body.subscribe(); + return completion; }; } @@ -125,8 +134,10 @@ public class MockServerHttpResponse extends AbstractServerHttpResponse { * charset or "UTF-8" by default. */ public Mono getBodyAsString() { + Charset charset = Optional.ofNullable(getHeaders().getContentType()).map(MimeType::getCharset) .orElse(StandardCharsets.UTF_8); + return getBody() .reduce(bufferFactory().allocateBuffer(), (previous, current) -> { previous.write(current); @@ -137,8 +148,10 @@ public class MockServerHttpResponse extends AbstractServerHttpResponse { } private static String bufferToString(DataBuffer buffer, Charset charset) { + Assert.notNull(charset, "'charset' must not be null"); byte[] bytes = new byte[buffer.readableByteCount()]; buffer.read(bytes); + DataBufferUtils.release(buffer); return new String(bytes, charset); } diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java index 8eef8859507..8cc4d423488 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -81,8 +81,16 @@ class ReactorClientHttpRequest extends AbstractClientHttpRequest implements Zero @Override public Mono writeWith(Publisher body) { return doCommit(() -> { - Flux byteBufFlux = Flux.from(body).map(NettyDataBufferFactory::toByteBuf); - return this.outbound.send(byteBufFlux).then(); + // Send as Mono if possible as an optimization hint to Reactor Netty + if (body instanceof Mono) { + Mono byteBufMono = Mono.from(body).map(NettyDataBufferFactory::toByteBuf); + return this.outbound.send(byteBufMono).then(); + + } + else { + Flux byteBufFlux = Flux.from(body).map(NettyDataBufferFactory::toByteBuf); + return this.outbound.send(byteBufFlux).then(); + } }); } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java index ee19d418b4c..785e163e4fe 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,6 +29,8 @@ import reactor.core.publisher.Mono; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.core.io.buffer.DataBufferUtils; +import org.springframework.core.io.buffer.PooledDataBuffer; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpLogging; import org.springframework.http.HttpStatus; @@ -173,9 +175,16 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse { } @Override + @SuppressWarnings("unchecked") public final Mono writeWith(Publisher body) { - return new ChannelSendOperator<>(body, - writePublisher -> doCommit(() -> writeWithInternal(writePublisher))) + // Write as Mono if possible as an optimization hint to Reactor Netty + // ChannelSendOperator not necessary for Mono + if (body instanceof Mono) { + return ((Mono) body).flatMap(buffer -> + doCommit(() -> writeWithInternal(Mono.just(buffer))) + .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)); + } + return new ChannelSendOperator<>(body, inner -> doCommit(() -> writeWithInternal(inner))) .doOnError(t -> removeContentLength()); } diff --git a/spring-web/src/test/java/org/springframework/http/codec/FormHttpMessageWriterTests.java b/spring-web/src/test/java/org/springframework/http/codec/FormHttpMessageWriterTests.java index 1c6a8a00033..7f23f769b3c 100644 --- a/spring-web/src/test/java/org/springframework/http/codec/FormHttpMessageWriterTests.java +++ b/spring-web/src/test/java/org/springframework/http/codec/FormHttpMessageWriterTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -85,8 +85,7 @@ public class FormHttpMessageWriterTests extends AbstractLeakCheckingTestCase { String expected = "name+1=value+1&name+2=value+2%2B1&name+2=value+2%2B2&name+3"; StepVerifier.create(response.getBody()) - .consumeNextWith(stringConsumer( - expected)) + .consumeNextWith(stringConsumer(expected)) .expectComplete() .verify(); HttpHeaders headers = response.getHeaders(); @@ -96,8 +95,7 @@ public class FormHttpMessageWriterTests extends AbstractLeakCheckingTestCase { private Consumer stringConsumer(String expected) { return dataBuffer -> { - String value = - DataBufferTestUtils.dumpString(dataBuffer, StandardCharsets.UTF_8); + String value = DataBufferTestUtils.dumpString(dataBuffer, StandardCharsets.UTF_8); DataBufferUtils.release(dataBuffer); assertEquals(expected, value); }; diff --git a/spring-web/src/test/java/org/springframework/mock/http/server/reactive/test/MockServerHttpResponse.java b/spring-web/src/test/java/org/springframework/mock/http/server/reactive/test/MockServerHttpResponse.java index 76049ee9e6b..366dbbdc3c7 100644 --- a/spring-web/src/test/java/org/springframework/mock/http/server/reactive/test/MockServerHttpResponse.java +++ b/spring-web/src/test/java/org/springframework/mock/http/server/reactive/test/MockServerHttpResponse.java @@ -25,6 +25,7 @@ import java.util.function.Function; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoProcessor; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; @@ -61,8 +62,11 @@ public class MockServerHttpResponse extends AbstractServerHttpResponse { public MockServerHttpResponse(DataBufferFactory dataBufferFactory) { super(dataBufferFactory); this.writeHandler = body -> { - this.body = body.cache(); - return this.body.then(); + // Avoid .then() which causes data buffers to be released + MonoProcessor completion = MonoProcessor.create(); + this.body = body.doOnComplete(completion::onComplete).doOnError(completion::onError).cache(); + this.body.subscribe(); + return completion; }; }