diff --git a/spring-test/src/main/java/org/springframework/test/web/reactive/server/DefaultWebTestClient.java b/spring-test/src/main/java/org/springframework/test/web/reactive/server/DefaultWebTestClient.java index e1fc6bb3d44..6e04636749e 100644 --- a/spring-test/src/main/java/org/springframework/test/web/reactive/server/DefaultWebTestClient.java +++ b/spring-test/src/main/java/org/springframework/test/web/reactive/server/DefaultWebTestClient.java @@ -24,7 +24,6 @@ import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Consumer; import java.util.function.Function; import org.reactivestreams.Publisher; @@ -291,7 +290,7 @@ class DefaultWebTestClient implements WebTestClient { public FluxExchangeResult decodeBody(ResolvableType elementType) { Flux body = this.response.body(toFlux(elementType)); - return new FluxExchangeResult<>(this, body); + return new FluxExchangeResult<>(this, body, getTimeout()); } @SuppressWarnings("unchecked") @@ -340,17 +339,6 @@ class DefaultWebTestClient implements WebTestClient { public BodySpec expectBody() { return new DefaultBodySpec(this.result); } - - @Override - public ResponseSpec consumeWith(Consumer consumer) { - this.result.assertWithDiagnostics(() -> consumer.accept(this.result)); - return this; - } - - @Override - public ExchangeResult returnResult() { - return this.result; - } } private class DefaultTypeBodySpec implements TypeBodySpec { diff --git a/spring-test/src/main/java/org/springframework/test/web/reactive/server/EntityExchangeResult.java b/spring-test/src/main/java/org/springframework/test/web/reactive/server/EntityExchangeResult.java index c0becbe21c4..1a7abd2894b 100644 --- a/spring-test/src/main/java/org/springframework/test/web/reactive/server/EntityExchangeResult.java +++ b/spring-test/src/main/java/org/springframework/test/web/reactive/server/EntityExchangeResult.java @@ -37,7 +37,7 @@ public class EntityExchangeResult extends ExchangeResult { /** - * Return the body extracted from the response. + * Return the entity extracted from the response body. */ public T getResponseBody() { return this.body; diff --git a/spring-test/src/main/java/org/springframework/test/web/reactive/server/ExchangeResult.java b/spring-test/src/main/java/org/springframework/test/web/reactive/server/ExchangeResult.java index 8f03f4e9680..0f13ff3b943 100644 --- a/spring-test/src/main/java/org/springframework/test/web/reactive/server/ExchangeResult.java +++ b/spring-test/src/main/java/org/springframework/test/web/reactive/server/ExchangeResult.java @@ -18,6 +18,7 @@ package org.springframework.test.web.reactive.server; import java.net.URI; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; @@ -29,20 +30,18 @@ import org.springframework.http.HttpMethod; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.http.ResponseCookie; +import org.springframework.util.Assert; import org.springframework.util.MultiValueMap; /** - * Provides access to request and response details from an exchange performed - * through the {@link WebTestClient}. + * Container for request and response details for exchanges performed through + * {@link WebTestClient}. * - *

When an {@code ExchangeResult} is first created it has the status and the - * headers of the response ready. Later when the response body is extracted, - * the {@code ExchangeResult} is re-created as {@link EntityExchangeResult} or - * {@link FluxExchangeResult} also exposing the extracted entities. - * - *

Serialized request and response content may also be accessed through the - * methods {@link #getRequestContent()} and {@link #getResponseContent()} after - * that content has been fully read or written. + *

Note that a decoded response body is not exposed at this level since the + * body may not have been decoded and consumed yet. Sub-types + * {@link EntityExchangeResult} and {@link FluxExchangeResult} provide access + * to a decoded response entity and a decoded (but not consumed) response body + * respectively. * * @author Rossen Stoyanchev * @since 5.0 @@ -53,8 +52,8 @@ import org.springframework.util.MultiValueMap; public class ExchangeResult { private static final List PRINTABLE_MEDIA_TYPES = Arrays.asList( - MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML, MediaType.parseMediaType("text/*"), - MediaType.APPLICATION_FORM_URLENCODED); + MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML, + MediaType.parseMediaType("text/*"), MediaType.APPLICATION_FORM_URLENCODED); private final WiretapClientHttpRequest request; @@ -101,12 +100,16 @@ public class ExchangeResult { } /** - * Return a "promise" for the raw request body content once completed. + * Return the raw request body content written as a {@code byte[]}. + * @throws IllegalStateException if the request body is not fully written yet. */ - public MonoProcessor getRequestContent() { - return this.request.getBodyContent(); + public byte[] getRequestBodyContent() { + MonoProcessor body = this.request.getRecordedContent(); + Assert.isTrue(body.isTerminated(), "Request body incomplete."); + return body.block(Duration.ZERO); } + /** * Return the status of the executed request. */ @@ -129,10 +132,13 @@ public class ExchangeResult { } /** - * Return a "promise" for the raw response body content once completed. + * Return the raw request body content written as a {@code byte[]}. + * @throws IllegalStateException if the response is not fully read yet. */ - public MonoProcessor getResponseContent() { - return this.response.getBodyContent(); + public byte[] getResponseBodyContent() { + MonoProcessor body = this.response.getRecordedContent(); + Assert.state(body.isTerminated(), "Response body incomplete."); + return body.block(Duration.ZERO); } @@ -157,12 +163,12 @@ public class ExchangeResult { "> " + getMethod() + " " + getUrl() + "\n" + "> " + formatHeaders(getRequestHeaders(), "\n> ") + "\n" + "\n" + - formatBody(getRequestHeaders().getContentType(), getRequestContent()) + "\n" + + formatBody(getRequestHeaders().getContentType(), this.request.getRecordedContent()) + "\n" + "\n" + "< " + getStatus() + " " + getStatusReason() + "\n" + "< " + formatHeaders(getResponseHeaders(), "\n< ") + "\n" + "\n" + - formatBody(getResponseHeaders().getContentType(), getResponseContent()) + "\n\n"; + formatBody(getResponseHeaders().getContentType(), this.response.getRecordedContent()) + "\n\n"; } private String getStatusReason() { diff --git a/spring-test/src/main/java/org/springframework/test/web/reactive/server/FluxExchangeResult.java b/spring-test/src/main/java/org/springframework/test/web/reactive/server/FluxExchangeResult.java index c44371a5474..f57fde83f87 100644 --- a/spring-test/src/main/java/org/springframework/test/web/reactive/server/FluxExchangeResult.java +++ b/spring-test/src/main/java/org/springframework/test/web/reactive/server/FluxExchangeResult.java @@ -15,10 +15,14 @@ */ package org.springframework.test.web.reactive.server; +import java.time.Duration; + import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; /** - * {@code ExchangeResult} variant with the response body as a {@code Flux}. + * {@code ExchangeResult} variant with the response body decoded as + * {@code Flux} but not yet consumed. * * @param the type of elements in the response body * @@ -28,20 +32,65 @@ import reactor.core.publisher.Flux; */ public class FluxExchangeResult extends ExchangeResult { + private static final IllegalStateException TIMEOUT_ERROR = + new IllegalStateException("Response timeout: for infinite streams " + + "use getResponseBody() first with explicit cancellation, e.g. via take(n)."); + + private final Flux body; + private final Duration timeout; + - FluxExchangeResult(ExchangeResult result, Flux body) { + FluxExchangeResult(ExchangeResult result, Flux body, Duration timeout) { super(result); this.body = body; + this.timeout = timeout; } /** - * Return the {@code Flux} of elements decoded from the response body. + * Return the response body as a {@code Flux} of decoded elements. + * + *

The response body stream can then be consumed further with the + * "reactor-test" {@code StepVerifier} and cancelled when enough elements have been + * consumed from the (possibly infinite) stream: + * + *

+	 * FluxExchangeResult result = this.client.get()
+	 * 	.uri("/persons")
+	 * 	.accept(TEXT_EVENT_STREAM)
+	 * 	.exchange()
+	 * 	.expectStatus().isOk()
+	 * 	.expectHeader().contentType(TEXT_EVENT_STREAM)
+	 * 	.expectBody(Person.class)
+	 * 	.returnResult();
+	 *
+	 * StepVerifier.create(result.getResponseBody())
+	 * 	.expectNext(new Person("Jane"), new Person("Jason"))
+	 * 	.expectNextCount(4)
+	 * 	.expectNext(new Person("Jay"))
+	 * 	.thenCancel()
+	 * 	.verify();
+	 * 
*/ public Flux getResponseBody() { return this.body; } + /** + * {@inheritDoc} + *

Note: this method should typically be called after + * the response has been consumed in full via {@link #getResponseBody()}. + * Calling it first will cause the response {@code Flux} to be consumed + * via {@code getResponseBody.ignoreElements()}. + */ + @Override + public byte[] getResponseBodyContent() { + return this.body.ignoreElements() + .timeout(this.timeout, Mono.error(TIMEOUT_ERROR)) + .then(() -> Mono.just(super.getResponseBodyContent())) + .block(); + } + } diff --git a/spring-test/src/main/java/org/springframework/test/web/reactive/server/WebTestClient.java b/spring-test/src/main/java/org/springframework/test/web/reactive/server/WebTestClient.java index c0c1a04423b..dd5907bfe19 100644 --- a/spring-test/src/main/java/org/springframework/test/web/reactive/server/WebTestClient.java +++ b/spring-test/src/main/java/org/springframework/test/web/reactive/server/WebTestClient.java @@ -483,19 +483,6 @@ public interface WebTestClient { */ BodySpec expectBody(); - /** - * Consume request and response details of the exchange. Only status - * response headers are available at this stage before one of the - * {@code expectBody} methods is used. - */ - ResponseSpec consumeWith(Consumer consumer); - - /** - * Return the request and response details of the exchange. Only status - * and response headers are available at this stage before one of the - * {@code expectBody} methods is used. - */ - ExchangeResult returnResult(); } /** diff --git a/spring-test/src/main/java/org/springframework/test/web/reactive/server/WiretapClientHttpRequest.java b/spring-test/src/main/java/org/springframework/test/web/reactive/server/WiretapClientHttpRequest.java index 28f73845d59..b65c0c75a42 100644 --- a/spring-test/src/main/java/org/springframework/test/web/reactive/server/WiretapClientHttpRequest.java +++ b/spring-test/src/main/java/org/springframework/test/web/reactive/server/WiretapClientHttpRequest.java @@ -52,7 +52,7 @@ class WiretapClientHttpRequest extends ClientHttpRequestDecorator { /** * Return a "promise" with the request body content written to the server. */ - public MonoProcessor getBodyContent() { + public MonoProcessor getRecordedContent() { return this.body; } diff --git a/spring-test/src/main/java/org/springframework/test/web/reactive/server/WiretapClientHttpResponse.java b/spring-test/src/main/java/org/springframework/test/web/reactive/server/WiretapClientHttpResponse.java index 61e2a0dd16e..f9ea21fe1ce 100644 --- a/spring-test/src/main/java/org/springframework/test/web/reactive/server/WiretapClientHttpResponse.java +++ b/spring-test/src/main/java/org/springframework/test/web/reactive/server/WiretapClientHttpResponse.java @@ -50,7 +50,7 @@ class WiretapClientHttpResponse extends ClientHttpResponseDecorator { /** * Return a "promise" with the response body content read from the server. */ - public MonoProcessor getBodyContent() { + public MonoProcessor getRecordedContent() { return this.body; }