From 0241a02e66f56623877f5bc8d91684d8d315cb97 Mon Sep 17 00:00:00 2001 From: Arjen Poutsma Date: Tue, 30 Apr 2024 11:49:15 +0200 Subject: [PATCH 1/2] Fix guard against multiple subscriptions This commit changes the guard against multiple subscriptions, as the previously used doOnSubscribe hook could not function as guard in certain scenarios. Closes gh-32727 --- .../reactive/AbstractClientHttpResponse.java | 50 +++++++++++++++---- 1 file changed, 40 insertions(+), 10 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/AbstractClientHttpResponse.java b/spring-web/src/main/java/org/springframework/http/client/reactive/AbstractClientHttpResponse.java index 4b128b04748..60cf8b73f61 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/AbstractClientHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/AbstractClientHttpResponse.java @@ -16,8 +16,12 @@ package org.springframework.http.client.reactive; +import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; import reactor.core.publisher.Flux; import org.springframework.core.io.buffer.DataBuffer; @@ -55,16 +59,7 @@ public abstract class AbstractClientHttpResponse implements ClientHttpResponse { this.statusCode = statusCode; this.headers = headers; this.cookies = cookies; - this.body = singleSubscription(body); - } - - private static Flux singleSubscription(Flux body) { - AtomicBoolean subscribed = new AtomicBoolean(); - return body.doOnSubscribe(s -> { - if (!subscribed.compareAndSet(false, true)) { - throw new IllegalStateException("The client response body can only be consumed once"); - } - }); + this.body = Flux.from(new SingleSubscriberPublisher<>(body)); } @@ -87,4 +82,39 @@ public abstract class AbstractClientHttpResponse implements ClientHttpResponse { public Flux getBody() { return this.body; } + + + private static final class SingleSubscriberPublisher implements Publisher { + + private static final Subscription NO_OP_SUBSCRIPTION = new Subscription() { + @Override + public void request(long l) { + } + + @Override + public void cancel() { + } + }; + + private final Publisher delegate; + + private final AtomicBoolean subscribed = new AtomicBoolean(); + + + public SingleSubscriberPublisher(Publisher delegate) { + this.delegate = delegate; + } + + @Override + public void subscribe(Subscriber subscriber) { + Objects.requireNonNull(subscriber, "Subscriber must not be null"); + if (this.subscribed.compareAndSet(false, true)) { + this.delegate.subscribe(subscriber); + } + else { + subscriber.onSubscribe(NO_OP_SUBSCRIPTION); + subscriber.onError(new IllegalStateException("The client response body can only be consumed once")); + } + } + } } From 32c80d5ae6629f86cd7c29cbb054e5175805f4cf Mon Sep 17 00:00:00 2001 From: Seungrae Kim Date: Tue, 30 Apr 2024 21:30:39 +0900 Subject: [PATCH 2/2] Fix incorrect class reference syntax in Kotlin code sample Closes gh-32733 --- .../modules/ROOT/pages/testing/webtestclient.adoc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/framework-docs/modules/ROOT/pages/testing/webtestclient.adoc b/framework-docs/modules/ROOT/pages/testing/webtestclient.adoc index cd625209b35..3a4713b1332 100644 --- a/framework-docs/modules/ROOT/pages/testing/webtestclient.adoc +++ b/framework-docs/modules/ROOT/pages/testing/webtestclient.adoc @@ -672,13 +672,13 @@ Kotlin:: val result = client.get().uri("/persons/1") .exchange() .expectStatus().isOk() - .expectBody(Person.class) - .returnResult(); + .expectBody(Person::class.java) + .returnResult() // For a response without a body val result = client.get().uri("/path") .exchange() - .expectBody().isEmpty(); + .expectBody().isEmpty() ---- ======