From 735e288d465ca36ae1eeccbd8a4d044d7bd3ad50 Mon Sep 17 00:00:00 2001 From: Arjen Poutsma Date: Thu, 24 Nov 2016 16:45:32 +0100 Subject: [PATCH] Add DataBuffer BodyInserter/BodyExtractor Added a BodyExtractor for Flux, and a BodyInserter for Publisher Issue: SPR-14918 --- .../http/codec/BodyExtractors.java | 17 +++++++++++-- .../http/codec/BodyInserters.java | 22 +++++++++++++++-- .../http/codec/BodyExtractorsTests.java | 24 ++++++++++++++++--- .../http/codec/BodyInsertersTests.java | 24 +++++++++++++++++++ 4 files changed, 80 insertions(+), 7 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/codec/BodyExtractors.java b/spring-web/src/main/java/org/springframework/http/codec/BodyExtractors.java index d355de46fb9..affe991c68f 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/BodyExtractors.java +++ b/spring-web/src/main/java/org/springframework/http/codec/BodyExtractors.java @@ -28,6 +28,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import org.springframework.core.ResolvableType; +import org.springframework.core.io.buffer.DataBuffer; import org.springframework.http.HttpMessage; import org.springframework.http.MediaType; import org.springframework.http.ReactiveHttpInputMessage; @@ -85,12 +86,24 @@ public abstract class BodyExtractors { */ public static BodyExtractor, ReactiveHttpInputMessage> toFlux(ResolvableType elementType) { Assert.notNull(elementType, "'elementType' must not be null"); - return (request, context) -> readWithMessageReaders(request, context, + return (inputMessage, context) -> readWithMessageReaders(inputMessage, context, elementType, - reader -> reader.read(elementType, request, Collections.emptyMap()), + reader -> reader.read(elementType, inputMessage, Collections.emptyMap()), Flux::error); } + /** + * Return a {@code BodyExtractor} that returns the body of the message as a {@link Flux} of + * {@link DataBuffer}s. + *

Note that the returned buffers should be released after usage by calling + * {@link org.springframework.core.io.buffer.DataBufferUtils#release(DataBuffer)} + * @return a {@code BodyExtractor} that returns the body + * @see ReactiveHttpInputMessage#getBody() + */ + public static BodyExtractor, ReactiveHttpInputMessage> toDataBuffers() { + return (inputMessage, context) -> inputMessage.getBody(); + } + private static > S readWithMessageReaders( ReactiveHttpInputMessage inputMessage, BodyExtractor.Context context, diff --git a/spring-web/src/main/java/org/springframework/http/codec/BodyInserters.java b/spring-web/src/main/java/org/springframework/http/codec/BodyInserters.java index d099e2be792..c59f954bfcf 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/BodyInserters.java +++ b/spring-web/src/main/java/org/springframework/http/codec/BodyInserters.java @@ -28,6 +28,7 @@ import reactor.core.publisher.Mono; import org.springframework.core.ResolvableType; import org.springframework.core.io.Resource; +import org.springframework.core.io.buffer.DataBuffer; import org.springframework.http.MediaType; import org.springframework.http.ReactiveHttpOutputMessage; import org.springframework.http.server.reactive.ServerHttpResponse; @@ -182,16 +183,33 @@ public abstract class BodyInserters { Assert.notNull(eventsPublisher, "'eventsPublisher' must not be null"); Assert.notNull(eventType, "'eventType' must not be null"); return BodyInserter.of( - (response, context) -> { + (outputMessage, context) -> { HttpMessageWriter messageWriter = sseMessageWriter(context); return messageWriter.write(eventsPublisher, eventType, - MediaType.TEXT_EVENT_STREAM, response, Collections.emptyMap()); + MediaType.TEXT_EVENT_STREAM, outputMessage, Collections.emptyMap()); }, () -> eventsPublisher ); } + /** + * Return a {@code BodyInserter} that writes the given {@code Publisher} to the + * body. + * @param publisher the data buffer publisher to write + * @param the type of the publisher + * @return a {@code BodyInserter} that writes directly to the body + * @see ReactiveHttpOutputMessage#writeWith(Publisher) + */ + public static > BodyInserter fromDataBuffers(T publisher) { + Assert.notNull(publisher, "'publisher' must not be null"); + + return BodyInserter.of( + (outputMessage, context) -> outputMessage.writeWith(publisher), + () -> publisher + ); + } + private static HttpMessageWriter sseMessageWriter(BodyInserter.Context context) { return context.messageWriters().get() .filter(messageWriter -> messageWriter diff --git a/spring-web/src/test/java/org/springframework/http/codec/BodyExtractorsTests.java b/spring-web/src/test/java/org/springframework/http/codec/BodyExtractorsTests.java index 8984b371344..74e829471d4 100644 --- a/spring-web/src/test/java/org/springframework/http/codec/BodyExtractorsTests.java +++ b/spring-web/src/test/java/org/springframework/http/codec/BodyExtractorsTests.java @@ -19,7 +19,6 @@ package org.springframework.http.codec; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.function.Supplier; import java.util.stream.Stream; @@ -40,7 +39,6 @@ import org.springframework.http.ReactiveHttpInputMessage; import org.springframework.http.codec.json.Jackson2JsonDecoder; import org.springframework.http.codec.xml.Jaxb2XmlDecoder; import org.springframework.mock.http.server.reactive.test.MockServerHttpRequest; -import org.springframework.web.server.UnsupportedMediaTypeStatusException; /** * @author Arjen Poutsma @@ -122,7 +120,7 @@ public class BodyExtractorsTests { BodyExtractor.Context emptyContext = new BodyExtractor.Context() { @Override public Supplier>> messageReaders() { - return () -> Collections.>emptySet().stream(); + return Stream::empty; } }; @@ -132,4 +130,24 @@ public class BodyExtractorsTests { .verify(); } + @Test + public void toDataBuffers() throws Exception { + BodyExtractor, ReactiveHttpInputMessage> extractor = BodyExtractors.toDataBuffers(); + + DefaultDataBufferFactory factory = new DefaultDataBufferFactory(); + DefaultDataBuffer dataBuffer = + factory.wrap(ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8))); + Flux body = Flux.just(dataBuffer); + + MockServerHttpRequest request = new MockServerHttpRequest(); + request.setBody(body); + + Flux result = extractor.extract(request, this.context); + + StepVerifier.create(result) + .expectNext(dataBuffer) + .expectComplete() + .verify(); + } + } \ No newline at end of file diff --git a/spring-web/src/test/java/org/springframework/http/codec/BodyInsertersTests.java b/spring-web/src/test/java/org/springframework/http/codec/BodyInsertersTests.java index c8da7aee01a..d8f1005a559 100644 --- a/spring-web/src/test/java/org/springframework/http/codec/BodyInsertersTests.java +++ b/spring-web/src/test/java/org/springframework/http/codec/BodyInsertersTests.java @@ -17,6 +17,7 @@ package org.springframework.http.codec; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.util.ArrayList; import java.util.Collections; @@ -35,6 +36,7 @@ import org.springframework.core.codec.CharSequenceEncoder; import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.Resource; import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DefaultDataBuffer; import org.springframework.core.io.buffer.DefaultDataBufferFactory; import org.springframework.http.ReactiveHttpOutputMessage; import org.springframework.http.codec.json.Jackson2JsonEncoder; @@ -164,4 +166,26 @@ public class BodyInsertersTests { StepVerifier.create(result).expectNextCount(0).expectComplete().verify(); } + @Test + public void ofDataBuffers() throws Exception { + DefaultDataBufferFactory factory = new DefaultDataBufferFactory(); + DefaultDataBuffer dataBuffer = + factory.wrap(ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8))); + Flux body = Flux.just(dataBuffer); + + BodyInserter, ReactiveHttpOutputMessage> inserter = BodyInserters.fromDataBuffers(body); + + assertEquals(body, inserter.t()); + + MockServerHttpResponse response = new MockServerHttpResponse(); + Mono result = inserter.insert(response, this.context); + StepVerifier.create(result).expectComplete().verify(); + + StepVerifier.create(response.getBody()) + .expectNext(dataBuffer) + .expectComplete() + .verify(); + } + + } \ No newline at end of file