From c5ca56bd50af8e86be58feb8f69080f719045ff7 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Fri, 12 Oct 2018 12:15:45 -0400 Subject: [PATCH] Polish --- .../web/reactive/server/ExchangeResult.java | 4 +- .../web/reactive/server/WiretapConnector.java | 71 ++++++++----------- 2 files changed, 30 insertions(+), 45 deletions(-) diff --git a/spring-test/src/main/java/org/springframework/test/web/reactive/server/ExchangeResult.java b/spring-test/src/main/java/org/springframework/test/web/reactive/server/ExchangeResult.java index 81ed0ea199e..7ab56a90579 100644 --- a/spring-test/src/main/java/org/springframework/test/web/reactive/server/ExchangeResult.java +++ b/spring-test/src/main/java/org/springframework/test/web/reactive/server/ExchangeResult.java @@ -231,7 +231,7 @@ public class ExchangeResult { return body .map(bytes -> { if (contentType == null) { - return "Unknown content type (" + bytes.length + " bytes)"; + return bytes.length + " bytes of content (unknown content-type)."; } Charset charset = contentType.getCharset(); if (charset != null) { @@ -240,7 +240,7 @@ public class ExchangeResult { if (PRINTABLE_MEDIA_TYPES.stream().anyMatch(contentType::isCompatibleWith)) { return new String(bytes, StandardCharsets.UTF_8); } - return "Unknown charset (" + bytes.length + " bytes)"; + return bytes.length + " bytes of content."; }) .defaultIfEmpty("No content") .onErrorResume(ex -> Mono.just("Failed to obtain content: " + ex.getMessage())) diff --git a/spring-test/src/main/java/org/springframework/test/web/reactive/server/WiretapConnector.java b/spring-test/src/main/java/org/springframework/test/web/reactive/server/WiretapConnector.java index 2a051a3bc03..13547d1336f 100644 --- a/spring-test/src/main/java/org/springframework/test/web/reactive/server/WiretapConnector.java +++ b/spring-test/src/main/java/org/springframework/test/web/reactive/server/WiretapConnector.java @@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import org.reactivestreams.Publisher; -import org.reactivestreams.Subscription; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; @@ -115,10 +114,8 @@ class WiretapConnector implements ClientHttpConnector { public ExchangeResult createExchangeResult(Duration timeout, @Nullable String uriTemplate) { - return new ExchangeResult(this.request, this.response, - Mono.defer(() -> this.request.getRecorder().getContent()), - Mono.defer(() -> this.response.getRecorder().getContent()), - timeout, uriTemplate); + return new ExchangeResult(this.request, this.response, this.request.getRecorder().getContent(), + this.response.getRecorder().getContent(), timeout, uriTemplate); } } @@ -137,11 +134,11 @@ class WiretapConnector implements ClientHttpConnector { @Nullable private final Flux> publisherNested; - private final DataBuffer buffer; + private final DataBuffer buffer = bufferFactory.allocateBuffer(); - private final MonoProcessor content; + private final MonoProcessor content = MonoProcessor.create(); - private volatile boolean subscriberRegistered; + private boolean hasContentConsumer; public WiretapRecorder(@Nullable Publisher publisher, @@ -153,22 +150,23 @@ class WiretapConnector implements ClientHttpConnector { this.publisher = publisher != null ? Flux.from(publisher) - .doOnSubscribe(this::handleOnSubscribe) - .doOnNext(this::handleOnNext) + .doOnSubscribe(s -> this.hasContentConsumer = true) + .doOnNext(this.buffer::write) .doOnError(this::handleOnError) .doOnCancel(this::handleOnComplete) .doOnComplete(this::handleOnComplete) : null; this.publisherNested = publisherNested != null ? Flux.from(publisherNested) - .doOnSubscribe(this::handleOnSubscribe) - .map(p -> Flux.from(p).doOnNext(this::handleOnNext).doOnError(this::handleOnError)) + .doOnSubscribe(s -> this.hasContentConsumer = true) + .map(p -> Flux.from(p).doOnNext(this.buffer::write).doOnError(this::handleOnError)) .doOnError(this::handleOnError) .doOnCancel(this::handleOnComplete) .doOnComplete(this::handleOnComplete) : null; - this.buffer = bufferFactory.allocateBuffer(); - this.content = MonoProcessor.create(); + if (publisher == null && publisherNested == null) { + this.content.onComplete(); + } } @@ -183,39 +181,26 @@ class WiretapConnector implements ClientHttpConnector { } public Mono getContent() { - // No publisher (e.g. request#setComplete) - if (this.publisher == null && this.publisherNested == null) { - return Mono.empty(); - } - if (this.content.isTerminated()) { - return this.content; - } - if (this.subscriberRegistered) { - return Mono.error(new IllegalStateException( - "Subscriber registered but content is not yet fully consumed.")); - } - else { - // No subscriber, e.g.: - // - mock server request body never consumed (error before read) - // - FluxExchangeResult#getResponseBodyContent called - (this.publisher != null ? this.publisher : this.publisherNested) - .onErrorMap(ex -> new IllegalStateException( - "Content was not been consumed and " + - "an error was raised on attempt to produce it:", ex)) - .subscribe(); + return Mono.defer(() -> { + if (this.content.isTerminated()) { + return this.content; + } + if (!this.hasContentConsumer) { + // Couple of possible cases: + // 1. Mock server never consumed request body (e.g. error before read) + // 2. FluxExchangeResult: getResponseBodyContent called before getResponseBody + //noinspection ConstantConditions + (this.publisher != null ? this.publisher : this.publisherNested) + .onErrorMap(ex -> new IllegalStateException( + "Content was not been consumed and " + + "an error was raised on attempt to produce it:", ex)) + .subscribe(); + } return this.content; - } + }); } - private void handleOnSubscribe(Subscription subscription) { - this.subscriberRegistered = true; - } - - private void handleOnNext(DataBuffer nextBuffer) { - this.buffer.write(nextBuffer); - } - private void handleOnError(Throwable ex) { if (!this.content.isTerminated()) { this.content.onError(ex);