diff --git a/spring-test/src/main/java/org/springframework/mock/http/server/reactive/MockServerHttpResponse.java b/spring-test/src/main/java/org/springframework/mock/http/server/reactive/MockServerHttpResponse.java index f89cdfc07b0..3aa79c4a94a 100644 --- a/spring-test/src/main/java/org/springframework/mock/http/server/reactive/MockServerHttpResponse.java +++ b/spring-test/src/main/java/org/springframework/mock/http/server/reactive/MockServerHttpResponse.java @@ -50,9 +50,9 @@ public class MockServerHttpResponse implements ServerHttpResponse { private final MultiValueMap cookies = new LinkedMultiValueMap<>(); - private Publisher body; + private Publisher body; - private Publisher> bodyWithFlushes; + private Publisher> bodyWithFlushes; private DataBufferFactory bufferFactory = new DefaultDataBufferFactory(); @@ -77,22 +77,22 @@ public class MockServerHttpResponse implements ServerHttpResponse { return this.cookies; } - public Publisher getBody() { + public Publisher getBody() { return this.body; } - public Publisher> getBodyWithFlush() { + public Publisher> getBodyWithFlush() { return this.bodyWithFlushes; } @Override - public Mono writeWith(Publisher body) { + public Mono writeWith(Publisher body) { this.body = body; return Flux.from(this.body).then(); } @Override - public Mono writeAndFlushWith(Publisher> body) { + public Mono writeAndFlushWith(Publisher> body) { this.bodyWithFlushes = body; return Flux.from(this.bodyWithFlushes).then(); } diff --git a/spring-web/src/main/java/org/springframework/http/ReactiveHttpOutputMessage.java b/spring-web/src/main/java/org/springframework/http/ReactiveHttpOutputMessage.java index 043e884663b..e0f61a42310 100644 --- a/spring-web/src/main/java/org/springframework/http/ReactiveHttpOutputMessage.java +++ b/spring-web/src/main/java/org/springframework/http/ReactiveHttpOutputMessage.java @@ -55,7 +55,7 @@ public interface ReactiveHttpOutputMessage extends HttpMessage { * @param body the body content publisher * @return a {@link Mono} that indicates completion or error */ - Mono writeWith(Publisher body); + Mono writeWith(Publisher body); /** * Use the given {@link Publisher} of {@code Publishers} to write the body of the @@ -64,7 +64,7 @@ public interface ReactiveHttpOutputMessage extends HttpMessage { * @param body the body content publisher * @return a {@link Mono} that indicates completion or error */ - Mono writeAndFlushWith(Publisher> body); + Mono writeAndFlushWith(Publisher> body); /** * Indicate that message handling is complete, allowing for any cleanup or diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java index d5822617a6d..56a9cfb036a 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java @@ -74,20 +74,20 @@ public class ReactorClientHttpRequest extends AbstractClientHttpRequest { } @Override - public Mono writeWith(Publisher body) { + public Mono writeWith(Publisher body) { return applyBeforeCommit().then(this.httpRequest .send(Flux.from(body).map(NettyDataBufferFactory::toByteBuf))); } @Override - public Mono writeAndFlushWith(Publisher> body) { + public Mono writeAndFlushWith(Publisher> body) { Publisher> byteBufs = Flux.from(body). map(ReactorClientHttpRequest::toByteBufs); return applyBeforeCommit().then(this.httpRequest .sendGroups(byteBufs)); } - private static Publisher toByteBufs(Publisher dataBuffers) { + private static Publisher toByteBufs(Publisher dataBuffers) { return Flux.from(dataBuffers). map(NettyDataBufferFactory::toByteBuf); } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerServerHttpResponse.java index e2558669e2f..35440503ed6 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerServerHttpResponse.java @@ -43,14 +43,14 @@ public abstract class AbstractListenerServerHttpResponse extends AbstractServerH @Override - protected final Mono writeWithInternal(Publisher body) { + protected final Mono writeWithInternal(Publisher body) { return writeAndFlushWithInternal(Mono.just(body)); } @Override - protected final Mono writeAndFlushWithInternal(Publisher> body) { + protected final Mono writeAndFlushWithInternal(Publisher> body) { if (this.writeCalled.compareAndSet(false, true)) { - Processor, Void> bodyProcessor = createBodyFlushProcessor(); + Processor, Void> bodyProcessor = createBodyFlushProcessor(); return Mono.from(subscriber -> { body.subscribe(bodyProcessor); bodyProcessor.subscribe(subscriber); @@ -67,6 +67,6 @@ public abstract class AbstractListenerServerHttpResponse extends AbstractServerH * that will write the response body with flushes to the underlying output. Called from * {@link #writeAndFlushWithInternal(Publisher)}. */ - protected abstract Processor, Void> createBodyFlushProcessor(); + protected abstract Processor, Void> createBodyFlushProcessor(); } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyFlushProcessor.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyFlushProcessor.java index 57510a9c094..56215d668a7 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyFlushProcessor.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyFlushProcessor.java @@ -41,7 +41,7 @@ import org.springframework.core.io.buffer.DataBuffer; * @see UndertowHttpHandlerAdapter * @see ServerHttpResponse#writeAndFlushWith(Publisher) */ -abstract class AbstractResponseBodyFlushProcessor implements Processor, Void> { +abstract class AbstractResponseBodyFlushProcessor implements Processor, Void> { protected final Log logger = LogFactory.getLog(getClass()); @@ -65,7 +65,7 @@ abstract class AbstractResponseBodyFlushProcessor implements Processor publisher) { + public final void onNext(Publisher publisher) { if (logger.isTraceEnabled()) { logger.trace(this.state + " onNext: " + publisher); } @@ -100,7 +100,7 @@ abstract class AbstractResponseBodyFlushProcessor implements Processor createBodyProcessor(); + protected abstract Processor createBodyProcessor(); /** * Flushes the output. @@ -144,9 +144,9 @@ abstract class AbstractResponseBodyFlushProcessor implements Processor chunk) { + public void onNext(AbstractResponseBodyFlushProcessor processor, Publisher chunk) { if (processor.changeState(this, RECEIVED)) { - Processor chunkProcessor = processor.createBodyProcessor(); + Processor chunkProcessor = processor.createBodyProcessor(); chunk.subscribe(chunkProcessor); chunkProcessor.subscribe(new WriteSubscriber(processor)); } @@ -192,7 +192,7 @@ abstract class AbstractResponseBodyFlushProcessor implements Processor publisher) { + Publisher publisher) { // ignore } @@ -212,7 +212,7 @@ abstract class AbstractResponseBodyFlushProcessor implements Processor publisher) { + public void onNext(AbstractResponseBodyFlushProcessor processor, Publisher publisher) { throw new IllegalStateException(toString()); } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java index 519f5519341..a5163b22922 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java @@ -125,13 +125,13 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse { } @Override - public final Mono writeWith(Publisher body) { + public final Mono writeWith(Publisher body) { return new ChannelSendOperator<>(body, writePublisher -> doCommit(() -> writeWithInternal(writePublisher))); } @Override - public final Mono writeAndFlushWith(Publisher> body) { + public final Mono writeAndFlushWith(Publisher> body) { return new ChannelSendOperator<>(body, writePublisher -> doCommit(() -> writeAndFlushWithInternal(writePublisher))); } @@ -186,14 +186,14 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse { * Implement this method to write to the underlying the response. * @param body the publisher to write with */ - protected abstract Mono writeWithInternal(Publisher body); + protected abstract Mono writeWithInternal(Publisher body); /** * Implement this method to write to the underlying the response, and flush after * each {@code Publisher}. * @param body the publisher to write and flush with */ - protected abstract Mono writeAndFlushWithInternal(Publisher> body); + protected abstract Mono writeAndFlushWithInternal(Publisher> body); /** * Implement this method to write the status code to the underlying response. diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java index c7711d32959..64c8a10f749 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java @@ -70,13 +70,13 @@ public class ReactorServerHttpResponse extends AbstractServerHttpResponse } @Override - protected Mono writeWithInternal(Publisher publisher) { + protected Mono writeWithInternal(Publisher publisher) { Publisher body = toByteBufs(publisher); return this.response.send(body); } @Override - protected Mono writeAndFlushWithInternal(Publisher> publisher) { + protected Mono writeAndFlushWithInternal(Publisher> publisher) { Publisher> body = Flux.from(publisher) .map(ReactorServerHttpResponse::toByteBufs); return this.response.sendGroups(body); @@ -117,7 +117,7 @@ public class ReactorServerHttpResponse extends AbstractServerHttpResponse return doCommit(() -> this.response.sendFile(file, position, count)); } - private static Publisher toByteBufs(Publisher dataBuffers) { + private static Publisher toByteBufs(Publisher dataBuffers) { return Flux.from(dataBuffers).map(NettyDataBufferFactory::toByteBuf); } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpResponse.java index 7b3df4df6be..4d2e37abbe5 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpResponse.java @@ -71,7 +71,7 @@ public class RxNettyServerHttpResponse extends AbstractServerHttpResponse { } @Override - protected Mono writeWithInternal(Publisher body) { + protected Mono writeWithInternal(Publisher body) { Observable content = RxReactiveStreams.toObservable(body) .map(NettyDataBufferFactory::toByteBuf); return Flux.from(RxReactiveStreams.toPublisher(this.response.write(content))) @@ -80,7 +80,7 @@ public class RxNettyServerHttpResponse extends AbstractServerHttpResponse { @Override protected Mono writeAndFlushWithInternal( - Publisher> body) { + Publisher> body) { Flux bodyWithFlushSignals = Flux.from(body). flatMap(publisher -> Flux.from(publisher). map(NettyDataBufferFactory::toByteBuf). diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java index 7967d2dcb75..f6151aa3a7f 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java @@ -122,7 +122,7 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons } @Override - protected Processor, Void> createBodyFlushProcessor() { + protected Processor, Void> createBodyFlushProcessor() { ResponseBodyFlushProcessor processor = new ResponseBodyFlushProcessor(); this.bodyFlushProcessor = processor; return processor; @@ -261,7 +261,7 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons private class ResponseBodyFlushProcessor extends AbstractResponseBodyFlushProcessor { @Override - protected Processor createBodyProcessor() { + protected Processor createBodyProcessor() { try { bodyProcessor = new ResponseBodyProcessor(outputStream(), bufferSize); return bodyProcessor; diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java index b07b8cdad1e..9aad48d13ff 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java @@ -29,6 +29,7 @@ import io.undertow.server.handlers.Cookie; import io.undertow.server.handlers.CookieImpl; import io.undertow.util.HttpString; import org.reactivestreams.Processor; +import org.reactivestreams.Publisher; import org.xnio.ChannelListener; import org.xnio.channels.StreamSinkChannel; import reactor.core.publisher.Mono; @@ -123,7 +124,7 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon } @Override - protected AbstractResponseBodyFlushProcessor createBodyFlushProcessor() { + protected Processor, Void> createBodyFlushProcessor() { return new ResponseBodyFlushProcessor(); } @@ -209,7 +210,7 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon private class ResponseBodyFlushProcessor extends AbstractResponseBodyFlushProcessor { @Override - protected Processor createBodyProcessor() { + protected Processor createBodyProcessor() { return UndertowServerHttpResponse.this.createBodyProcessor(); } diff --git a/spring-web/src/test/java/org/springframework/http/codec/ServerSentEventHttpMessageWriterTests.java b/spring-web/src/test/java/org/springframework/http/codec/ServerSentEventHttpMessageWriterTests.java index 33e566cf371..5789c629a8e 100644 --- a/spring-web/src/test/java/org/springframework/http/codec/ServerSentEventHttpMessageWriterTests.java +++ b/spring-web/src/test/java/org/springframework/http/codec/ServerSentEventHttpMessageWriterTests.java @@ -72,7 +72,7 @@ public class ServerSentEventHttpMessageWriterTests extends AbstractDataBufferAll messageWriter.write(source, ResolvableType.forClass(ServerSentEvent.class), new MediaType("text", "event-stream"), outputMessage, Collections.emptyMap()); - Publisher> result = Flux.from(outputMessage.getBodyWithFlush()); + Publisher> result = Flux.from(outputMessage.getBodyWithFlush()); StepVerifier.create(result) .consumeNextWith(sseConsumer("id:c42\n" + "event:foo\n" + "retry:123\n" + ":bla\n:bla bla\n:bla bla bla\n" + "data:bar\n")) @@ -87,7 +87,7 @@ public class ServerSentEventHttpMessageWriterTests extends AbstractDataBufferAll messageWriter.write(source, ResolvableType.forClass(String.class), new MediaType("text", "event-stream"), outputMessage, Collections.emptyMap()); - Publisher> result = outputMessage.getBodyWithFlush(); + Publisher> result = outputMessage.getBodyWithFlush(); StepVerifier.create(result) .consumeNextWith(sseConsumer("data:foo\n")) .consumeNextWith(sseConsumer("data:bar\n")) @@ -102,7 +102,7 @@ public class ServerSentEventHttpMessageWriterTests extends AbstractDataBufferAll messageWriter.write(source, ResolvableType.forClass(String.class), new MediaType("text", "event-stream"), outputMessage, Collections.emptyMap()); - Publisher> result = outputMessage.getBodyWithFlush(); + Publisher> result = outputMessage.getBodyWithFlush(); StepVerifier.create(result) .consumeNextWith(sseConsumer("data:foo\ndata:bar\n")) .consumeNextWith(sseConsumer("data:foo\ndata:baz\n")) @@ -118,7 +118,7 @@ public class ServerSentEventHttpMessageWriterTests extends AbstractDataBufferAll messageWriter.write(source, ResolvableType.forClass(Pojo.class), new MediaType("text", "event-stream"), outputMessage, Collections.emptyMap()); - Publisher> result = outputMessage.getBodyWithFlush(); + Publisher> result = outputMessage.getBodyWithFlush(); StepVerifier.create(result) .consumeNextWith(sseConsumer("data:", "{\"foo\":\"foofoo\",\"bar\":\"barbar\"}", "\n")) .consumeNextWith(sseConsumer("data:", "{\"foo\":\"foofoofoo\",\"bar\":\"barbarbar\"}", "\n")) @@ -127,7 +127,7 @@ public class ServerSentEventHttpMessageWriterTests extends AbstractDataBufferAll } - private Consumer> sseConsumer(String... expected) { + private Consumer> sseConsumer(String... expected) { return publisher -> { StepVerifier.Step builder = StepVerifier.create(publisher); for (String value : expected) { diff --git a/spring-web/src/test/java/org/springframework/http/server/reactive/ServerHttpResponseTests.java b/spring-web/src/test/java/org/springframework/http/server/reactive/ServerHttpResponseTests.java index 7c7e8fc2721..fd875411caa 100644 --- a/spring-web/src/test/java/org/springframework/http/server/reactive/ServerHttpResponseTests.java +++ b/spring-web/src/test/java/org/springframework/http/server/reactive/ServerHttpResponseTests.java @@ -27,6 +27,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DefaultDataBuffer; import org.springframework.core.io.buffer.DefaultDataBufferFactory; import org.springframework.http.ResponseCookie; @@ -56,6 +57,20 @@ public class ServerHttpResponseTests { assertEquals("c", new String(response.body.get(2).asByteBuffer().array(), StandardCharsets.UTF_8)); } + @Test // SPR-14952 + public void writeAndFlushWithFluxOfDefaultDataBuffer() throws Exception { + TestServerHttpResponse response = new TestServerHttpResponse(); + Flux> flux = Flux.just(Flux.just(wrap("foo"))); + response.writeAndFlushWith(flux).block(); + + assertTrue(response.statusCodeWritten); + assertTrue(response.headersWritten); + assertTrue(response.cookiesWritten); + + assertEquals(1, response.body.size()); + assertEquals("foo", new String(response.body.get(0).asByteBuffer().array(), StandardCharsets.UTF_8)); + } + @Test public void writeWithError() throws Exception { TestServerHttpResponse response = new TestServerHttpResponse(); @@ -119,7 +134,7 @@ public class ServerHttpResponseTests { - private DataBuffer wrap(String a) { + private DefaultDataBuffer wrap(String a) { return new DefaultDataBufferFactory().wrap(ByteBuffer.wrap(a.getBytes(StandardCharsets.UTF_8))); } @@ -157,7 +172,7 @@ public class ServerHttpResponseTests { } @Override - protected Mono writeWithInternal(Publisher body) { + protected Mono writeWithInternal(Publisher body) { return Flux.from(body).map(b -> { this.body.add(b); return b; @@ -166,8 +181,13 @@ public class ServerHttpResponseTests { @Override protected Mono writeAndFlushWithInternal( - Publisher> body) { - return Mono.error(new UnsupportedOperationException()); + Publisher> bodyWithFlush) { + return Flux.from(bodyWithFlush).flatMap(body -> + Flux.from(body).map(b -> { + this.body.add(b); + return b; + }) + ).then(); } } diff --git a/spring-web/src/test/java/org/springframework/mock/http/server/reactive/test/MockServerHttpResponse.java b/spring-web/src/test/java/org/springframework/mock/http/server/reactive/test/MockServerHttpResponse.java index c1f0f94b835..9105cd81ae4 100644 --- a/spring-web/src/test/java/org/springframework/mock/http/server/reactive/test/MockServerHttpResponse.java +++ b/spring-web/src/test/java/org/springframework/mock/http/server/reactive/test/MockServerHttpResponse.java @@ -49,9 +49,9 @@ public class MockServerHttpResponse implements ServerHttpResponse { private final MultiValueMap cookies = new LinkedMultiValueMap<>(); - private Publisher body; + private Publisher body; - private Publisher> bodyWithFlushes; + private Publisher> bodyWithFlushes; private DataBufferFactory bufferFactory = new DefaultDataBufferFactory(); @@ -77,22 +77,22 @@ public class MockServerHttpResponse implements ServerHttpResponse { return this.cookies; } - public Publisher getBody() { + public Publisher getBody() { return this.body; } - public Publisher> getBodyWithFlush() { + public Publisher> getBodyWithFlush() { return this.bodyWithFlushes; } @Override - public Mono writeWith(Publisher body) { + public Mono writeWith(Publisher body) { this.body = body; return Flux.from(this.body).then(); } @Override - public Mono writeAndFlushWith(Publisher> body) { + public Mono writeAndFlushWith(Publisher> body) { this.bodyWithFlushes = body; return Flux.from(this.bodyWithFlushes).then(); } diff --git a/spring-web/src/test/java/org/springframework/web/client/reactive/test/MockClientHttpRequest.java b/spring-web/src/test/java/org/springframework/web/client/reactive/test/MockClientHttpRequest.java index bf51e6c4c79..bffbf351b2a 100644 --- a/spring-web/src/test/java/org/springframework/web/client/reactive/test/MockClientHttpRequest.java +++ b/spring-web/src/test/java/org/springframework/web/client/reactive/test/MockClientHttpRequest.java @@ -41,9 +41,9 @@ public class MockClientHttpRequest extends AbstractClientHttpRequest { private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory(); - private Publisher body; + private Publisher body; - private Publisher> bodyWithFlushes; + private Publisher> bodyWithFlushes; public MockClientHttpRequest() { @@ -90,22 +90,22 @@ public class MockClientHttpRequest extends AbstractClientHttpRequest { } @Override - public Mono writeWith(Publisher body) { + public Mono writeWith(Publisher body) { this.body = body; return applyBeforeCommit().then(Flux.from(this.body).then()); } @Override - public Mono writeAndFlushWith(Publisher> body) { + public Mono writeAndFlushWith(Publisher> body) { this.bodyWithFlushes = body; return applyBeforeCommit().then(Flux.from(this.bodyWithFlushes).then()); } - public Publisher getBody() { + public Publisher getBody() { return body; } - public Publisher> getBodyWithFlush() { + public Publisher> getBodyWithFlush() { return bodyWithFlushes; }