diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/function/DefaultServerRequest.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/function/DefaultServerRequest.java index 1462ea38260..a53cd7bb664 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/function/DefaultServerRequest.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/function/DefaultServerRequest.java @@ -27,13 +27,18 @@ import java.util.OptionalLong; import java.util.function.Supplier; import java.util.stream.Stream; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.http.HttpRange; import org.springframework.http.MediaType; import org.springframework.http.codec.BodyExtractor; +import org.springframework.http.codec.BodyExtractors; import org.springframework.http.codec.HttpMessageReader; import org.springframework.http.server.reactive.ServerHttpRequest; +import org.springframework.util.Assert; import org.springframework.web.server.ServerWebExchange; /** @@ -73,6 +78,7 @@ class DefaultServerRequest implements ServerRequest { @Override public T body(BodyExtractor extractor) { + Assert.notNull(extractor, "'extractor' must not be null"); return extractor.extract(request(), new BodyExtractor.Context() { @Override @@ -82,6 +88,16 @@ class DefaultServerRequest implements ServerRequest { }); } + @Override + public Mono bodyToMono(Class elementClass) { + return body(BodyExtractors.toMono(elementClass)); + } + + @Override + public Flux bodyToFlux(Class elementClass) { + return body(BodyExtractors.toFlux(elementClass)); + } + @Override public Optional attribute(String name) { return this.exchange.getAttribute(name); diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/function/DefaultServerResponseBuilder.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/function/DefaultServerResponseBuilder.java index a872e82117a..f8c1e4ad0d6 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/function/DefaultServerResponseBuilder.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/function/DefaultServerResponseBuilder.java @@ -35,7 +35,6 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import org.springframework.core.Conventions; -import org.springframework.core.ResolvableType; import org.springframework.http.CacheControl; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; @@ -168,11 +167,6 @@ class DefaultServerResponseBuilder implements ServerResponse.BodyBuilder { return body(BodyInserters.fromPublisher(publisher, elementClass)); } - @Override - public , T> ServerResponse body(S publisher, ResolvableType elementType) { - return body(BodyInserters.fromPublisher(publisher, elementType)); - } - @Override public ServerResponse render(String name, Object... modelAttributes) { Assert.hasLength(name, "'name' must not be empty"); diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/function/RequestPredicates.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/function/RequestPredicates.java index b3d374331d5..9591c2ecb0d 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/function/RequestPredicates.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/function/RequestPredicates.java @@ -25,6 +25,9 @@ import java.util.Optional; import java.util.Set; import java.util.function.Predicate; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + import org.springframework.http.HttpMethod; import org.springframework.http.MediaType; import org.springframework.http.codec.BodyExtractor; @@ -320,6 +323,16 @@ public abstract class RequestPredicates { return this.request.body(extractor); } + @Override + public Mono bodyToMono(Class elementClass) { + return this.request.bodyToMono(elementClass); + } + + @Override + public Flux bodyToFlux(Class elementClass) { + return this.request.bodyToFlux(elementClass); + } + @Override public Optional attribute(String name) { return this.request.attribute(name); diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/function/ServerRequest.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/function/ServerRequest.java index d9ce3511c2d..8b48632135c 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/function/ServerRequest.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/function/ServerRequest.java @@ -24,6 +24,9 @@ import java.util.Map; import java.util.Optional; import java.util.OptionalLong; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.http.HttpRange; @@ -71,6 +74,22 @@ public interface ServerRequest { */ T body(BodyExtractor extractor); + /** + * Extract the body to a {@code Mono}. + * @param elementClass the class of element in the {@code Mono} + * @param the element type + * @return the body as a mono + */ + Mono bodyToMono(Class elementClass); + + /** + * Extract the body to a {@code Flux}. + * @param elementClass the class of element in the {@code Flux} + * @param the element type + * @return the body as a flux + */ + Flux bodyToFlux(Class elementClass); + /** * Return the request attribute value if present. * @param name the attribute name diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/function/ServerResponse.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/function/ServerResponse.java index 3c543717c33..fb9b7219b92 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/function/ServerResponse.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/function/ServerResponse.java @@ -25,7 +25,6 @@ import java.util.Set; import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; -import org.springframework.core.ResolvableType; import org.springframework.http.CacheControl; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; @@ -329,18 +328,6 @@ public interface ServerResponse { */ , T> ServerResponse body(S publisher, Class elementClass); - /** - * Set the body of the response to the given {@code Publisher} and return it. This - * convenience method combines {@link #body(BodyInserter)} and - * {@link BodyInserters#fromPublisher(Publisher, ResolvableType)}. - * @param publisher the {@code Publisher} to write to the response - * @param elementType the type of elements contained in the publisher - * @param the type of the elements contained in the publisher - * @param the type of the {@code Publisher} - * @return the built request - */ - , T> ServerResponse body(S publisher, ResolvableType elementType); - /** * Render the template with the given {@code name} using the given {@code modelAttributes}. * The model attributes are mapped under a diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/function/support/ServerRequestWrapper.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/function/support/ServerRequestWrapper.java index e1420284a1c..bf49c56d32c 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/function/support/ServerRequestWrapper.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/function/support/ServerRequestWrapper.java @@ -24,6 +24,9 @@ import java.util.Map; import java.util.Optional; import java.util.OptionalLong; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.http.HttpRange; @@ -88,6 +91,16 @@ public class ServerRequestWrapper implements ServerRequest { return this.request.body(extractor); } + @Override + public Mono bodyToMono(Class elementClass) { + return this.request.bodyToMono(elementClass); + } + + @Override + public Flux bodyToFlux(Class elementClass) { + return this.request.bodyToFlux(elementClass); + } + @Override public Optional attribute(String name) { return this.request.attribute(name); diff --git a/spring-web-reactive/src/test/java/org/springframework/web/reactive/function/DefaultServerRequestTests.java b/spring-web-reactive/src/test/java/org/springframework/web/reactive/function/DefaultServerRequestTests.java index 5a5e855e69b..496dde0d1e9 100644 --- a/spring-web-reactive/src/test/java/org/springframework/web/reactive/function/DefaultServerRequestTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/web/reactive/function/DefaultServerRequestTests.java @@ -185,4 +185,45 @@ public class DefaultServerRequestTests { assertEquals("foo", resultMono.block()); } + @Test + public void bodyToMono() throws Exception { + DefaultDataBufferFactory factory = new DefaultDataBufferFactory(); + DefaultDataBuffer dataBuffer = + factory.wrap(ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8))); + Flux body = Flux.just(dataBuffer); + + HttpHeaders httpHeaders = new HttpHeaders(); + httpHeaders.setContentType(MediaType.TEXT_PLAIN); + when(mockRequest.getHeaders()).thenReturn(httpHeaders); + when(mockRequest.getBody()).thenReturn(body); + + Set> messageReaders = Collections + .singleton(new DecoderHttpMessageReader(new StringDecoder())); + when(mockHandlerStrategies.messageReaders()).thenReturn(messageReaders::stream); + + Mono resultMono = defaultRequest.bodyToMono(String.class); + assertEquals("foo", resultMono.block()); + } + + @Test + public void bodyToFlux() throws Exception { + DefaultDataBufferFactory factory = new DefaultDataBufferFactory(); + DefaultDataBuffer dataBuffer = + factory.wrap(ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8))); + Flux body = Flux.just(dataBuffer); + + HttpHeaders httpHeaders = new HttpHeaders(); + httpHeaders.setContentType(MediaType.TEXT_PLAIN); + when(mockRequest.getHeaders()).thenReturn(httpHeaders); + when(mockRequest.getBody()).thenReturn(body); + + Set> messageReaders = Collections + .singleton(new DecoderHttpMessageReader(new StringDecoder())); + when(mockHandlerStrategies.messageReaders()).thenReturn(messageReaders::stream); + + Flux resultFlux = defaultRequest.bodyToFlux(String.class); + Mono> result = resultFlux.collectList(); + assertEquals(Collections.singletonList("foo"), result.block()); + } + } \ No newline at end of file diff --git a/spring-web-reactive/src/test/java/org/springframework/web/reactive/function/MockServerRequest.java b/spring-web-reactive/src/test/java/org/springframework/web/reactive/function/MockServerRequest.java index 7a2111f8c9e..991eb0657e0 100644 --- a/spring-web-reactive/src/test/java/org/springframework/web/reactive/function/MockServerRequest.java +++ b/spring-web-reactive/src/test/java/org/springframework/web/reactive/function/MockServerRequest.java @@ -29,6 +29,9 @@ import java.util.Map; import java.util.Optional; import java.util.OptionalLong; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.http.HttpRange; @@ -96,6 +99,18 @@ public class MockServerRequest implements ServerRequest { return (S) this.body; } + @Override + @SuppressWarnings("unchecked") + public Mono bodyToMono(Class elementClass) { + return (Mono) this.body; + } + + @Override + @SuppressWarnings("unchecked") + public Flux bodyToFlux(Class elementClass) { + return (Flux) this.body; + } + @SuppressWarnings("unchecked") @Override public Optional attribute(String name) { diff --git a/spring-web/src/main/java/org/springframework/web/client/reactive/ClientRequest.java b/spring-web/src/main/java/org/springframework/web/client/reactive/ClientRequest.java index 0a585600ea3..8d61049e12a 100644 --- a/spring-web/src/main/java/org/springframework/web/client/reactive/ClientRequest.java +++ b/spring-web/src/main/java/org/springframework/web/client/reactive/ClientRequest.java @@ -23,7 +23,6 @@ import java.time.ZonedDateTime; import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; -import org.springframework.core.ResolvableType; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.http.MediaType; @@ -318,16 +317,6 @@ public interface ClientRequest { */ > ClientRequest body(S publisher, Class elementClass); - /** - * Set the body of the request to the given {@code Publisher} and return it. - * @param publisher the {@code Publisher} to write to the request - * @param elementType the type of elements contained in the publisher - * @param the type of the elements contained in the publisher - * @param the type of the {@code Publisher}. - * @return the built request - */ - > ClientRequest body(S publisher, ResolvableType elementType); - } diff --git a/spring-web/src/main/java/org/springframework/web/client/reactive/ClientResponse.java b/spring-web/src/main/java/org/springframework/web/client/reactive/ClientResponse.java index 3a77266c824..4d7994a9794 100644 --- a/spring-web/src/main/java/org/springframework/web/client/reactive/ClientResponse.java +++ b/spring-web/src/main/java/org/springframework/web/client/reactive/ClientResponse.java @@ -20,6 +20,9 @@ import java.util.List; import java.util.Optional; import java.util.OptionalLong; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; @@ -55,6 +58,23 @@ public interface ClientResponse { */ T body(BodyExtractor extractor); + /** + * Extract the body to a {@code Mono}. + * @param elementClass the class of element in the {@code Mono} + * @param the element type + * @return the body as a mono + */ + Mono bodyToMono(Class elementClass); + + /** + * Extract the body to a {@code Flux}. + * @param elementClass the class of element in the {@code Flux} + * @param the element type + * @return the body as a flux + */ + Flux bodyToFlux(Class elementClass); + + /** * Represents the headers of the HTTP response. * @see ClientResponse#headers() diff --git a/spring-web/src/main/java/org/springframework/web/client/reactive/DefaultClientRequestBuilder.java b/spring-web/src/main/java/org/springframework/web/client/reactive/DefaultClientRequestBuilder.java index 6141b8d733c..fd53a1164e5 100644 --- a/spring-web/src/main/java/org/springframework/web/client/reactive/DefaultClientRequestBuilder.java +++ b/spring-web/src/main/java/org/springframework/web/client/reactive/DefaultClientRequestBuilder.java @@ -28,7 +28,6 @@ import java.util.stream.Stream; import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; -import org.springframework.core.ResolvableType; import org.springframework.http.HttpCookie; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; @@ -151,12 +150,6 @@ class DefaultClientRequestBuilder implements ClientRequest.BodyBuilder { return body(BodyInserters.fromPublisher(publisher, elementClass)); } - @Override - public > ClientRequest body(S publisher, - ResolvableType elementType) { - return body(BodyInserters.fromPublisher(publisher, elementType)); - } - private static class BodyInserterRequest implements ClientRequest { private final HttpMethod method; diff --git a/spring-web/src/main/java/org/springframework/web/client/reactive/DefaultClientResponse.java b/spring-web/src/main/java/org/springframework/web/client/reactive/DefaultClientResponse.java index a7eea233258..5f48dd4b58e 100644 --- a/spring-web/src/main/java/org/springframework/web/client/reactive/DefaultClientResponse.java +++ b/spring-web/src/main/java/org/springframework/web/client/reactive/DefaultClientResponse.java @@ -23,11 +23,15 @@ import java.util.OptionalLong; import java.util.function.Supplier; import java.util.stream.Stream; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.http.client.reactive.ClientHttpResponse; import org.springframework.http.codec.BodyExtractor; +import org.springframework.http.codec.BodyExtractors; import org.springframework.http.codec.HttpMessageReader; /** @@ -71,8 +75,14 @@ class DefaultClientResponse implements ClientResponse { }); } - public ClientHttpResponse clientHttpResponse() { - return this.response; + @Override + public Mono bodyToMono(Class elementClass) { + return body(BodyExtractors.toMono(elementClass)); + } + + @Override + public Flux bodyToFlux(Class elementClass) { + return body(BodyExtractors.toFlux(elementClass)); } private class DefaultHeaders implements Headers { diff --git a/spring-web/src/test/java/org/springframework/web/client/reactive/DefaultClientResponseTests.java b/spring-web/src/test/java/org/springframework/web/client/reactive/DefaultClientResponseTests.java index 4000c365f95..d58c6c71d1e 100644 --- a/spring-web/src/test/java/org/springframework/web/client/reactive/DefaultClientResponseTests.java +++ b/spring-web/src/test/java/org/springframework/web/client/reactive/DefaultClientResponseTests.java @@ -115,5 +115,46 @@ public class DefaultClientResponseTests { assertEquals("foo", resultMono.block()); } + @Test + public void bodyToMono() throws Exception { + DefaultDataBufferFactory factory = new DefaultDataBufferFactory(); + DefaultDataBuffer dataBuffer = + factory.wrap(ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8))); + Flux body = Flux.just(dataBuffer); + + HttpHeaders httpHeaders = new HttpHeaders(); + httpHeaders.setContentType(MediaType.TEXT_PLAIN); + when(mockResponse.getHeaders()).thenReturn(httpHeaders); + when(mockResponse.getBody()).thenReturn(body); + + Set> messageReaders = Collections + .singleton(new DecoderHttpMessageReader(new StringDecoder())); + when(mockWebClientStrategies.messageReaders()).thenReturn(messageReaders::stream); + + Mono resultMono = defaultClientResponse.bodyToMono(String.class); + assertEquals("foo", resultMono.block()); + } + + @Test + public void bodyToFlux() throws Exception { + DefaultDataBufferFactory factory = new DefaultDataBufferFactory(); + DefaultDataBuffer dataBuffer = + factory.wrap(ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8))); + Flux body = Flux.just(dataBuffer); + + HttpHeaders httpHeaders = new HttpHeaders(); + httpHeaders.setContentType(MediaType.TEXT_PLAIN); + when(mockResponse.getHeaders()).thenReturn(httpHeaders); + when(mockResponse.getBody()).thenReturn(body); + + Set> messageReaders = Collections + .singleton(new DecoderHttpMessageReader(new StringDecoder())); + when(mockWebClientStrategies.messageReaders()).thenReturn(messageReaders::stream); + + Flux resultFlux = defaultClientResponse.bodyToFlux(String.class); + Mono> result = resultFlux.collectList(); + assertEquals(Collections.singletonList("foo"), result.block()); + } + } \ No newline at end of file