diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/AbstractClientHttpResponse.java b/spring-web/src/main/java/org/springframework/http/client/reactive/AbstractClientHttpResponse.java index 58b5e93c6c1..a54129ea6ea 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/AbstractClientHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/AbstractClientHttpResponse.java @@ -16,8 +16,12 @@ package org.springframework.http.client.reactive; +import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; import reactor.core.publisher.Flux; import org.springframework.core.io.buffer.DataBuffer; @@ -54,16 +58,7 @@ public abstract class AbstractClientHttpResponse implements ClientHttpResponse { this.statusCode = statusCode; this.headers = headers; this.cookies = cookies; - this.body = singleSubscription(body); - } - - private static Flux singleSubscription(Flux body) { - AtomicBoolean subscribed = new AtomicBoolean(); - return body.doOnSubscribe(s -> { - if (!subscribed.compareAndSet(false, true)) { - throw new IllegalStateException("The client response body can only be consumed once"); - } - }); + this.body = Flux.from(new SingleSubscriberPublisher<>(body)); } @@ -91,4 +86,39 @@ public abstract class AbstractClientHttpResponse implements ClientHttpResponse { public Flux getBody() { return this.body; } + + + private static final class SingleSubscriberPublisher implements Publisher { + + private static final Subscription NO_OP_SUBSCRIPTION = new Subscription() { + @Override + public void request(long l) { + } + + @Override + public void cancel() { + } + }; + + private final Publisher delegate; + + private final AtomicBoolean subscribed = new AtomicBoolean(); + + + public SingleSubscriberPublisher(Publisher delegate) { + this.delegate = delegate; + } + + @Override + public void subscribe(Subscriber subscriber) { + Objects.requireNonNull(subscriber, "Subscriber must not be null"); + if (this.subscribed.compareAndSet(false, true)) { + this.delegate.subscribe(subscriber); + } + else { + subscriber.onSubscribe(NO_OP_SUBSCRIPTION); + subscriber.onError(new IllegalStateException("The client response body can only be consumed once")); + } + } + } }