diff --git a/spring-test/src/main/java/org/springframework/test/web/reactive/server/DefaultWebTestClient.java b/spring-test/src/main/java/org/springframework/test/web/reactive/server/DefaultWebTestClient.java index 30759c0926c..2534de667a4 100644 --- a/spring-test/src/main/java/org/springframework/test/web/reactive/server/DefaultWebTestClient.java +++ b/spring-test/src/main/java/org/springframework/test/web/reactive/server/DefaultWebTestClient.java @@ -34,6 +34,7 @@ import reactor.core.publisher.Mono; import org.springframework.core.ParameterizedTypeReference; import org.springframework.core.io.ByteArrayResource; +import org.springframework.core.io.buffer.DataBuffer; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.http.MediaType; @@ -46,6 +47,7 @@ import org.springframework.util.Assert; import org.springframework.util.MimeType; import org.springframework.util.MultiValueMap; import org.springframework.web.reactive.function.BodyExtractor; +import org.springframework.web.reactive.function.BodyExtractors; import org.springframework.web.reactive.function.BodyInserter; import org.springframework.web.reactive.function.client.ClientResponse; import org.springframework.web.reactive.function.client.WebClient; @@ -379,6 +381,11 @@ class DefaultWebTestClient implements WebTestClient { return new DefaultBodyContentSpec(this.result.decodeToByteArray()); } + @Override + public FluxExchangeResult returnResult() { + return this.result.decodeToFlux(BodyExtractors.toDataBuffers()); + } + @Override public FluxExchangeResult returnResult(Class elementType) { return this.result.decodeToFlux(toFlux(elementType)); diff --git a/spring-test/src/main/java/org/springframework/test/web/reactive/server/FluxExchangeResult.java b/spring-test/src/main/java/org/springframework/test/web/reactive/server/FluxExchangeResult.java index 90e33d9aea0..361718926b0 100644 --- a/spring-test/src/main/java/org/springframework/test/web/reactive/server/FluxExchangeResult.java +++ b/spring-test/src/main/java/org/springframework/test/web/reactive/server/FluxExchangeResult.java @@ -17,6 +17,7 @@ package org.springframework.test.web.reactive.server; import java.time.Duration; +import java.util.function.Consumer; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -97,4 +98,23 @@ public class FluxExchangeResult extends ExchangeResult { .block(); } + /** + * Invoke the given consumer within {@link #assertWithDiagnostics(Runnable)} + * passing {@code "this"} instance to it. This method allows the following, + * without leaving the {@code WebTestClient} chain of calls: + *
+	 *	client.get()
+	 * 		.uri("/persons")
+	 * 		.accept(TEXT_EVENT_STREAM)
+	 * 		.exchange()
+	 * 		.expectStatus().isOk()
+	 *	 	.returnResult()
+	 *	 	.consumeWith(result -> assertThat(...);
+	 * 
+ * @param consumer consumer for {@code "this"} instance + */ + public void consumeWith(Consumer> consumer) { + assertWithDiagnostics(() -> consumer.accept(this)); + } + } diff --git a/spring-test/src/main/java/org/springframework/test/web/reactive/server/WebTestClient.java b/spring-test/src/main/java/org/springframework/test/web/reactive/server/WebTestClient.java index 2126d10a94b..ffc5e3b3b8c 100644 --- a/spring-test/src/main/java/org/springframework/test/web/reactive/server/WebTestClient.java +++ b/spring-test/src/main/java/org/springframework/test/web/reactive/server/WebTestClient.java @@ -29,6 +29,7 @@ import org.reactivestreams.Publisher; import org.springframework.context.ApplicationContext; import org.springframework.core.ParameterizedTypeReference; +import org.springframework.core.io.buffer.DataBuffer; import org.springframework.format.FormatterRegistry; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; @@ -663,6 +664,15 @@ public interface WebTestClient { * Variant of {@link #returnResult(Class)} for element types with generics. */ FluxExchangeResult returnResult(ParameterizedTypeReference elementType); + + /** + * Return the exchange result with the body decoded to + * {@code Flux}. Use this option for infinite streams and + * consume the stream with the {@code StepVerifier} from the Reactor Add-Ons. + * + * @return + */ + FluxExchangeResult returnResult(); } /**