Browse Source

Fix guard against multiple subscriptions

This commit changes the guard against multiple subscriptions, as the
previously used doOnSubscribe hook could not function as guard in
certain scenarios.

See gh-32727
Closes gh-32728
pull/32754/head
Arjen Poutsma 2 years ago
parent
commit
b3724ebf84
  1. 50
      spring-web/src/main/java/org/springframework/http/client/reactive/AbstractClientHttpResponse.java

50
spring-web/src/main/java/org/springframework/http/client/reactive/AbstractClientHttpResponse.java

@ -16,8 +16,12 @@ @@ -16,8 +16,12 @@
package org.springframework.http.client.reactive;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
import org.springframework.core.io.buffer.DataBuffer;
@ -54,16 +58,7 @@ public abstract class AbstractClientHttpResponse implements ClientHttpResponse { @@ -54,16 +58,7 @@ public abstract class AbstractClientHttpResponse implements ClientHttpResponse {
this.statusCode = statusCode;
this.headers = headers;
this.cookies = cookies;
this.body = singleSubscription(body);
}
private static Flux<DataBuffer> singleSubscription(Flux<DataBuffer> body) {
AtomicBoolean subscribed = new AtomicBoolean();
return body.doOnSubscribe(s -> {
if (!subscribed.compareAndSet(false, true)) {
throw new IllegalStateException("The client response body can only be consumed once");
}
});
this.body = Flux.from(new SingleSubscriberPublisher<>(body));
}
@ -91,4 +86,39 @@ public abstract class AbstractClientHttpResponse implements ClientHttpResponse { @@ -91,4 +86,39 @@ public abstract class AbstractClientHttpResponse implements ClientHttpResponse {
public Flux<DataBuffer> getBody() {
return this.body;
}
private static final class SingleSubscriberPublisher<T> implements Publisher<T> {
private static final Subscription NO_OP_SUBSCRIPTION = new Subscription() {
@Override
public void request(long l) {
}
@Override
public void cancel() {
}
};
private final Publisher<T> delegate;
private final AtomicBoolean subscribed = new AtomicBoolean();
public SingleSubscriberPublisher(Publisher<T> delegate) {
this.delegate = delegate;
}
@Override
public void subscribe(Subscriber<? super T> subscriber) {
Objects.requireNonNull(subscriber, "Subscriber must not be null");
if (this.subscribed.compareAndSet(false, true)) {
this.delegate.subscribe(subscriber);
}
else {
subscriber.onSubscribe(NO_OP_SUBSCRIPTION);
subscriber.onError(new IllegalStateException("The client response body can only be consumed once"));
}
}
}
}

Loading…
Cancel
Save