|
|
|
|
@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicReference;
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicReference;
|
|
|
|
|
import java.util.function.Supplier; |
|
|
|
|
import java.util.stream.Collectors; |
|
|
|
|
|
|
|
|
|
import org.reactivestreams.Publisher; |
|
|
|
|
import reactor.core.publisher.Flux; |
|
|
|
|
import reactor.core.publisher.Mono; |
|
|
|
|
|
|
|
|
|
@ -57,7 +58,7 @@ public abstract class AbstractClientHttpRequest implements ClientHttpRequest {
@@ -57,7 +58,7 @@ public abstract class AbstractClientHttpRequest implements ClientHttpRequest {
|
|
|
|
|
|
|
|
|
|
private final AtomicReference<State> state = new AtomicReference<>(State.NEW); |
|
|
|
|
|
|
|
|
|
private final List<Supplier<? extends Mono<Void>>> commitActions = new ArrayList<>(4); |
|
|
|
|
private final List<Supplier<? extends Publisher<Void>>> commitActions = new ArrayList<>(4); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public AbstractClientHttpRequest() { |
|
|
|
|
@ -112,7 +113,7 @@ public abstract class AbstractClientHttpRequest implements ClientHttpRequest {
@@ -112,7 +113,7 @@ public abstract class AbstractClientHttpRequest implements ClientHttpRequest {
|
|
|
|
|
* @param writeAction the action to write the request body (may be {@code null}) |
|
|
|
|
* @return a completion publisher |
|
|
|
|
*/ |
|
|
|
|
protected Mono<Void> doCommit(@Nullable Supplier<? extends Mono<Void>> writeAction) { |
|
|
|
|
protected Mono<Void> doCommit(@Nullable Supplier<? extends Publisher<Void>> writeAction) { |
|
|
|
|
if (!this.state.compareAndSet(State.NEW, State.COMMITTING)) { |
|
|
|
|
return Mono.empty(); |
|
|
|
|
} |
|
|
|
|
@ -128,10 +129,10 @@ public abstract class AbstractClientHttpRequest implements ClientHttpRequest {
@@ -128,10 +129,10 @@ public abstract class AbstractClientHttpRequest implements ClientHttpRequest {
|
|
|
|
|
this.commitActions.add(writeAction); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
List<? extends Mono<Void>> actions = this.commitActions.stream() |
|
|
|
|
List<? extends Publisher<Void>> actions = this.commitActions.stream() |
|
|
|
|
.map(Supplier::get).collect(Collectors.toList()); |
|
|
|
|
|
|
|
|
|
return Flux.concat(actions).next(); |
|
|
|
|
return Mono.fromDirect(Flux.concat(actions)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|