From d4411f4cc63186f45fbba757aff713c7dd441fb9 Mon Sep 17 00:00:00 2001 From: Brian Clozel Date: Tue, 13 Dec 2016 22:58:01 +0100 Subject: [PATCH] Update AbstractClientHttpRequest with server changes This commit updates `AbstractClientHttpRequest` to make it more similar to its server counterpart. --- .../reactive/AbstractClientHttpRequest.java | 84 +++++++++++++------ .../reactive/ReactorClientHttpRequest.java | 11 ++- .../reactive/test/MockClientHttpRequest.java | 10 +-- 3 files changed, 69 insertions(+), 36 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/AbstractClientHttpRequest.java b/spring-web/src/main/java/org/springframework/http/client/reactive/AbstractClientHttpRequest.java index 38d9b24b553..3c0d5c4813b 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/AbstractClientHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/AbstractClientHttpRequest.java @@ -20,7 +20,9 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; +import java.util.stream.Collectors; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import org.springframework.http.HttpCookie; @@ -39,13 +41,21 @@ import org.springframework.util.MultiValueMap; */ public abstract class AbstractClientHttpRequest implements ClientHttpRequest { + /** + * COMMITTING -> COMMITTED is the period after doCommit is called but before + * the response status and headers have been applied to the underlying + * response during which time pre-commit actions can still make changes to + * the response status and headers. + */ + private enum State {NEW, COMMITTING, COMMITTED} + private final HttpHeaders headers; private final MultiValueMap cookies; private AtomicReference state = new AtomicReference<>(State.NEW); - private final List>> beforeCommitActions = new ArrayList<>(4); + private final List>> commitActions = new ArrayList<>(4); public AbstractClientHttpRequest() { @@ -61,7 +71,7 @@ public abstract class AbstractClientHttpRequest implements ClientHttpRequest { @Override public HttpHeaders getHeaders() { - if (State.COMITTED.equals(this.state.get())) { + if (State.COMMITTED.equals(this.state.get())) { return HttpHeaders.readOnlyHttpHeaders(this.headers); } return this.headers; @@ -69,42 +79,66 @@ public abstract class AbstractClientHttpRequest implements ClientHttpRequest { @Override public MultiValueMap getCookies() { - if (State.COMITTED.equals(this.state.get())) { + if (State.COMMITTED.equals(this.state.get())) { return CollectionUtils.unmodifiableMultiValueMap(this.cookies); } return this.cookies; } - protected Mono applyBeforeCommit() { - Mono mono = Mono.empty(); - if (this.state.compareAndSet(State.NEW, State.COMMITTING)) { - for (Supplier> action : this.beforeCommitActions) { - mono = mono.then(() -> action.get()); - } - return mono - .otherwise(ex -> { - // Ignore errors from beforeCommit actions - return Mono.empty(); - }) - .then(() -> { - this.state.set(State.COMITTED); - writeHeaders(); - writeCookies(); - return Mono.empty(); - }); + /** + * A variant of {@link #doCommit(Supplier)} for a request without body. + * @return a completion publisher + */ + protected Mono doCommit() { + return (this.state.get() == State.NEW ? doCommit(null) : Mono.empty()); + } + + /** + * Apply {@link #beforeCommit(Supplier) beforeCommit} actions, apply the + * request headers/cookies, and write the request body. + * @param writeAction the action to write the request body or {@code null} + * @return a completion publisher + */ + protected Mono doCommit(Supplier> writeAction) { + + if (!this.state.compareAndSet(AbstractClientHttpRequest.State.NEW, AbstractClientHttpRequest.State.COMMITTING)) { + return Mono.empty(); + } + + this.commitActions.add(() -> { + applyHeaders(); + applyCookies(); + this.state.set(AbstractClientHttpRequest.State.COMMITTED); + return Mono.empty(); + }); + + if (writeAction != null) { + this.commitActions.add(writeAction); } - return mono; + + List> actions = this.commitActions.stream() + .map(Supplier::get).collect(Collectors.toList()); + + return Flux.concat(actions).next(); } @Override public void beforeCommit(Supplier> action) { Assert.notNull(action); - this.beforeCommitActions.add(action); + this.commitActions.add(action); } - protected abstract void writeHeaders(); + /** + * Implement this method to apply header changes from {@link #getHeaders()} + * to the underlying response. This method is called once only. + */ + protected abstract void applyHeaders(); + + /** + * Implement this method to add cookies from {@link #getHeaders()} to the + * underlying response. This method is called once only. + */ + protected abstract void applyCookies(); - protected abstract void writeCookies(); - private enum State {NEW, COMMITTING, COMITTED} } \ No newline at end of file diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java index 46c9dd6715a..216fade96d7 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java @@ -75,7 +75,7 @@ public class ReactorClientHttpRequest extends AbstractClientHttpRequest { @Override public Mono writeWith(Publisher body) { - return applyBeforeCommit().then(this.httpRequest + return doCommit(() -> this.httpRequest .send(Flux.from(body).map(NettyDataBufferFactory::toByteBuf)).then()); } @@ -83,8 +83,7 @@ public class ReactorClientHttpRequest extends AbstractClientHttpRequest { public Mono writeAndFlushWith(Publisher> body) { Publisher> byteBufs = Flux.from(body). map(ReactorClientHttpRequest::toByteBufs); - return applyBeforeCommit().then(this.httpRequest - .sendGroups(byteBufs).then()); + return doCommit(() -> this.httpRequest.sendGroups(byteBufs).then()); } private static Publisher toByteBufs(Publisher dataBuffers) { @@ -94,17 +93,17 @@ public class ReactorClientHttpRequest extends AbstractClientHttpRequest { @Override public Mono setComplete() { - return applyBeforeCommit().then(httpRequest.sendHeaders().then()); + return doCommit(() -> httpRequest.sendHeaders().then()); } @Override - protected void writeHeaders() { + protected void applyHeaders() { getHeaders().entrySet() .forEach(e -> this.httpRequest.requestHeaders().set(e.getKey(), e.getValue())); } @Override - protected void writeCookies() { + protected void applyCookies() { getCookies().values().stream().flatMap(Collection::stream) .map(cookie -> new DefaultCookie(cookie.getName(), cookie.getValue())) .forEach(this.httpRequest::addCookie); diff --git a/spring-web/src/test/java/org/springframework/web/client/reactive/test/MockClientHttpRequest.java b/spring-web/src/test/java/org/springframework/web/client/reactive/test/MockClientHttpRequest.java index 2125f92defa..5b6427e24b2 100644 --- a/spring-web/src/test/java/org/springframework/web/client/reactive/test/MockClientHttpRequest.java +++ b/spring-web/src/test/java/org/springframework/web/client/reactive/test/MockClientHttpRequest.java @@ -92,13 +92,13 @@ public class MockClientHttpRequest extends AbstractClientHttpRequest { @Override public Mono writeWith(Publisher body) { this.body = Flux.from(body); - return applyBeforeCommit().then(this.body.then()); + return doCommit(() -> this.body.then()); } @Override public Mono writeAndFlushWith(Publisher> body) { this.bodyWithFlushes = Flux.from(body).map(p -> Flux.from(p)); - return applyBeforeCommit().then(this.bodyWithFlushes.then()); + return doCommit(() -> this.bodyWithFlushes.then()); } public Publisher getBody() { @@ -111,12 +111,12 @@ public class MockClientHttpRequest extends AbstractClientHttpRequest { @Override public Mono setComplete() { - return applyBeforeCommit().then(); + return doCommit().then(); } @Override - protected void writeHeaders() { } + protected void applyHeaders() { } @Override - protected void writeCookies() { } + protected void applyCookies() { } }