diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java index 0f2449ad807..c707f4865e5 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java @@ -52,10 +52,10 @@ public abstract class AbstractListenerReadPublisher implements Publisher { private volatile long demand; - private volatile boolean publisherCompleted; + private volatile boolean completionBeforeDemand; @Nullable - private volatile Throwable publisherError; + private volatile Throwable errorBeforeDemand; @SuppressWarnings("rawtypes") private static final AtomicLongFieldUpdater DEMAND_FIELD_UPDATER = @@ -76,11 +76,8 @@ public abstract class AbstractListenerReadPublisher implements Publisher { } - // Listener delegation methods... + // Methods for sub-classes to delegate to, when async I/O events occur... - /** - * Listeners can call this to notify when reading is possible. - */ public final void onDataAvailable() { if (this.logger.isTraceEnabled()) { this.logger.trace(this.state + " onDataAvailable"); @@ -88,9 +85,6 @@ public abstract class AbstractListenerReadPublisher implements Publisher { this.state.get().onDataAvailable(this); } - /** - * Listeners can call this to notify when all data has been read. - */ public void onAllDataRead() { if (this.logger.isTraceEnabled()) { this.logger.trace(this.state + " onAllDataRead"); @@ -98,9 +92,6 @@ public abstract class AbstractListenerReadPublisher implements Publisher { this.state.get().onAllDataRead(this); } - /** - * Listeners can call this to notify when a read error has occurred. - */ public final void onError(Throwable t) { if (this.logger.isTraceEnabled()) { this.logger.trace(this.state + " onError: " + t); @@ -109,11 +100,17 @@ public abstract class AbstractListenerReadPublisher implements Publisher { } + // Methods for sub-classes to implement... + + /** + * Check if data is available, calling {@link #onDataAvailable()} either + * immediately or later when reading is possible. + */ protected abstract void checkOnDataAvailable(); /** - * Reads a data from the input, if possible. - * @return the data that was read; or {@code null} + * Read once from the input, if possible. + * @return the item that was read; or {@code null} */ @Nullable protected abstract T read() throws IOException; @@ -125,14 +122,17 @@ public abstract class AbstractListenerReadPublisher implements Publisher { } + // Private methods for use in State... + /** - * Read and publish data from the input. Continue till there is no more - * demand or there is no more data to be read. - * @return {@code true} if there is more demand; {@code false} otherwise + * Read and publish data one at a time until there is no more data, no more + * demand, or perhaps we completed in the mean time. + * @return {@code true} if there is more demand; {@code false} if there is + * no more demand or we have completed. */ private boolean readAndPublish() throws IOException { long r; - while ((r = demand) > 0 && !publisherCompleted) { + while ((r = this.demand) > 0 && !this.state.get().equals(State.COMPLETED)) { T data = read(); if (data != null) { if (r != Long.MAX_VALUE) { @@ -152,96 +152,95 @@ public abstract class AbstractListenerReadPublisher implements Publisher { return this.state.compareAndSet(oldState, newState); } + private Subscription createSubscription() { + return new ReadSubscription(); + } - private static final class ReadSubscription implements Subscription { - private final AbstractListenerReadPublisher publisher; + /** + * Subscription that delegates signals to State. + */ + private final class ReadSubscription implements Subscription { - public ReadSubscription(AbstractListenerReadPublisher publisher) { - this.publisher = publisher; - } @Override public final void request(long n) { - if (this.publisher.logger.isTraceEnabled()) { - this.publisher.logger.trace(state() + " request: " + n); + if (logger.isTraceEnabled()) { + logger.trace(state + " request: " + n); } - state().request(this.publisher, n); + state.get().request(AbstractListenerReadPublisher.this, n); } @Override public final void cancel() { - if (this.publisher.logger.isTraceEnabled()) { - this.publisher.logger.trace(state() + " cancel"); + if (logger.isTraceEnabled()) { + logger.trace(state + " cancel"); } - state().cancel(this.publisher); - } - - private State state() { - return this.publisher.state.get(); + state.get().cancel(AbstractListenerReadPublisher.this); } } /** - * Represents a state for the {@link Publisher} to be in. The following figure - * indicate the four different states that exist, and the relationships between them. - * - *
-	 *       UNSUBSCRIBED
-	 *        |
-	 *        v
-	 * NO_DEMAND -------------------> DEMAND
-	 *    |    ^                      ^    |
-	 *    |    |                      |    |
-	 *    |    --------- READING <-----    |
-	 *    |                 |              |
-	 *    |                 v              |
-	 *    ------------> COMPLETED <---------
+	 * Represents a state for the {@link Publisher} to be in.
+	 * 

+	 *        UNSUBSCRIBED
+	 *             |
+	 *             v
+	 *        SUBSCRIBING
+	 *             |
+	 *             v
+	 *    +---- NO_DEMAND ---------------> DEMAND ---+
+	 *    |        ^                         ^       |
+	 *    |        |                         |       |
+	 *    |        +------- READING <--------+       |
+	 *    |                    |                     |
+	 *    |                    v                     |
+	 *    +--------------> COMPLETED <---------------+
 	 * 
- * Refer to the individual states for more information. */ private enum State { - /** - * The initial unsubscribed state. Will respond to {@link - * #subscribe(AbstractListenerReadPublisher, Subscriber)} by - * changing state to {@link #NO_DEMAND}. - */ UNSUBSCRIBED { @Override void subscribe(AbstractListenerReadPublisher publisher, Subscriber subscriber) { Assert.notNull(publisher, "Publisher must not be null"); Assert.notNull(subscriber, "Subscriber must not be null"); if (publisher.changeState(this, SUBSCRIBING)) { - Subscription subscription = new ReadSubscription(publisher); + Subscription subscription = publisher.createSubscription(); publisher.subscriber = subscriber; subscriber.onSubscribe(subscription); publisher.changeState(SUBSCRIBING, NO_DEMAND); - if (publisher.publisherCompleted) { - publisher.onAllDataRead(); + // Now safe to check "beforeDemand" flags, they won't change once in NO_DEMAND + if (publisher.completionBeforeDemand) { + publisher.state.get().onAllDataRead(publisher); } - Throwable publisherError = publisher.publisherError; - if (publisherError != null) { - publisher.onError(publisherError); + Throwable ex = publisher.errorBeforeDemand; + if (ex != null) { + publisher.state.get().onError(publisher, ex); } } else { - throw new IllegalStateException(toString()); + throw new IllegalStateException("Failed to transition to SUBSCRIBING, " + + "subscriber: " + subscriber); } } @Override void onAllDataRead(AbstractListenerReadPublisher publisher) { - publisher.publisherCompleted = true; + publisher.completionBeforeDemand = true; } @Override - void onError(AbstractListenerReadPublisher publisher, Throwable t) { - publisher.publisherError = t; + void onError(AbstractListenerReadPublisher publisher, Throwable ex) { + publisher.errorBeforeDemand = ex; } }, + /** + * Very brief state where we know we have a Subscriber but must not + * send onComplete and onError until we after onSubscribe. + */ SUBSCRIBING { void request(AbstractListenerReadPublisher publisher, long n) { if (Operators.validate(n)) { @@ -254,21 +253,15 @@ public abstract class AbstractListenerReadPublisher implements Publisher { @Override void onAllDataRead(AbstractListenerReadPublisher publisher) { - publisher.publisherCompleted = true; + publisher.completionBeforeDemand = true; } @Override - void onError(AbstractListenerReadPublisher publisher, Throwable t) { - publisher.publisherError = t; + void onError(AbstractListenerReadPublisher publisher, Throwable ex) { + publisher.errorBeforeDemand = ex; } }, - /** - * State that gets entered when there is no demand. Responds to {@link - * #request(AbstractListenerReadPublisher, long)} by increasing the demand, - * changing state to {@link #DEMAND} and will check whether there - * is data available for reading. - */ NO_DEMAND { @Override void request(AbstractListenerReadPublisher publisher, long n) { @@ -277,21 +270,17 @@ public abstract class AbstractListenerReadPublisher implements Publisher { if (publisher.changeState(this, DEMAND)) { publisher.checkOnDataAvailable(); } + // or else we completed at the same time... } } }, - /** - * State that gets entered when there is demand. Responds to - * {@link #onDataAvailable(AbstractListenerReadPublisher)} by - * reading the available data. The state will be changed to - * {@link #NO_DEMAND} if there is no demand. - */ DEMAND { @Override void request(AbstractListenerReadPublisher publisher, long n) { if (Operators.validate(n)) { Operators.addCap(DEMAND_FIELD_UPDATER, publisher, n); + // Did a concurrent read transition to NO_DEMAND just before us? if (publisher.changeState(NO_DEMAND, DEMAND)) { publisher.checkOnDataAvailable(); } @@ -304,6 +293,7 @@ public abstract class AbstractListenerReadPublisher implements Publisher { if (!read(publisher)) { return; } + // Maybe demand arrived between readAndPublish and READING->NO_DEMAND? long r = publisher.demand; if (r == 0 || publisher.changeState(NO_DEMAND, this)) { break; @@ -311,6 +301,10 @@ public abstract class AbstractListenerReadPublisher implements Publisher { } } + /** + * @return whether to exit the read loop; false means stop trying + * to read, true means check demand one more time. + */ boolean read(AbstractListenerReadPublisher publisher) { if (publisher.changeState(this, READING)) { try { @@ -318,18 +312,19 @@ public abstract class AbstractListenerReadPublisher implements Publisher { if (demandAvailable) { if (publisher.changeState(READING, DEMAND)) { publisher.checkOnDataAvailable(); - return false; } } else if (publisher.changeState(READING, NO_DEMAND)) { publisher.suspendReading(); + return true; } } catch (IOException ex) { publisher.onError(ex); } - return true; } + // Either competing onDataAvailable calls (via request or container callback) + // Or a concurrent completion return false; } }, @@ -339,6 +334,7 @@ public abstract class AbstractListenerReadPublisher implements Publisher { void request(AbstractListenerReadPublisher publisher, long n) { if (Operators.validate(n)) { Operators.addCap(DEMAND_FIELD_UPDATER, publisher, n); + // Did a concurrent read transition to NO_DEMAND just before us? if (publisher.changeState(NO_DEMAND, DEMAND)) { publisher.checkOnDataAvailable(); } @@ -346,9 +342,6 @@ public abstract class AbstractListenerReadPublisher implements Publisher { } }, - /** - * The terminal completed state. Does not respond to any events. - */ COMPLETED { @Override void request(AbstractListenerReadPublisher publisher, long n) { @@ -377,10 +370,7 @@ public abstract class AbstractListenerReadPublisher implements Publisher { } void cancel(AbstractListenerReadPublisher publisher) { - if (publisher.changeState(this, COMPLETED)) { - publisher.publisherCompleted = true; - } - else { + if (!publisher.changeState(this, COMPLETED)) { publisher.state.get().cancel(publisher); } } @@ -391,7 +381,6 @@ public abstract class AbstractListenerReadPublisher implements Publisher { void onAllDataRead(AbstractListenerReadPublisher publisher) { if (publisher.changeState(this, COMPLETED)) { - publisher.publisherCompleted = true; if (publisher.subscriber != null) { publisher.subscriber.onComplete(); } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java index 613909d3e9a..3d47ef80919 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java @@ -43,17 +43,17 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo protected final Log logger = LogFactory.getLog(getClass()); - private final WriteResultPublisher resultPublisher = new WriteResultPublisher(); - private final AtomicReference state = new AtomicReference<>(State.UNSUBSCRIBED); - private volatile boolean subscriberCompleted; - @Nullable private Subscription subscription; + private volatile boolean subscriberCompleted; + + private final WriteResultPublisher resultPublisher = new WriteResultPublisher(); - // Subscriber implementation... + + // Subscriber methods... @Override public final void onSubscribe(Subscription subscription) { @@ -88,7 +88,7 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo } - // Publisher implementation... + // Publisher method... @Override public final void subscribe(Subscriber subscriber) { @@ -96,23 +96,16 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo } - /** - * Listeners can call this method to cancel further writing. - */ + // Methods for sub-classes to delegate to, when async I/O events occur... + protected void cancel() { if (this.subscription != null) { this.subscription.cancel(); } } - /** - * Invoked when an error happens while flushing. Defaults to no-op. - * Servlet 3.1 based implementations will receive an - * {@link javax.servlet.AsyncListener#onError} event. - */ - protected void flushingFailed(Throwable t) { - } + // Methods for sub-classes to implement or override... /** * Create a new processor for subscribing to the next flush boundary. @@ -120,14 +113,18 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo protected abstract Processor createWriteProcessor(); /** - * Flush the output. + * Flush the output if ready, or otherwise {@link #isFlushPending()} should + * return true after that. */ protected abstract void flush() throws IOException; /** - * Whether writing is possible. + * Invoked when an error happens while flushing. Defaults to no-op. + * Servlet 3.1 based implementations will receive an + * {@link javax.servlet.AsyncListener#onError} event. */ - protected abstract boolean isWritePossible(); + protected void flushingFailed(Throwable t) { + } /** * Whether flushing is pending. @@ -141,25 +138,41 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo this.state.get().onFlushPossible(this); } - private void flushIfPossible() { - if (isWritePossible()) { - onFlushPossible(); - } - } + /** + * Whether writing is possible. + */ + protected abstract boolean isWritePossible(); + + // Private methods for use in State... private boolean changeState(State oldState, State newState) { return this.state.compareAndSet(oldState, newState); } - private void writeComplete() { - if (logger.isTraceEnabled()) { - logger.trace(this.state + " writeComplete"); + private void flushIfPossible() { + if (isWritePossible()) { + onFlushPossible(); } - this.state.get().writeComplete(this); } + /** + * Represents a state for the {@link Processor} to be in. + * + *

+	 *        UNSUBSCRIBED
+	 *             |
+	 *             v
+	 *   +--- REQUESTED <--------> RECEIVED ---+
+	 *   |                            |        |
+	 *   |                            |        |
+	 *   |            FLUSHING <------+        |
+	 *   |                |                    |
+	 *   |                v                    |
+	 *   +----------> COMPLETED <--------------+
+	 * 
+ */ private enum State { UNSUBSCRIBED { @@ -178,11 +191,13 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo REQUESTED { @Override - public void onNext(AbstractListenerWriteFlushProcessor processor, Publisher chunk) { + public void onNext(AbstractListenerWriteFlushProcessor processor, + Publisher currentPublisher) { + if (processor.changeState(this, RECEIVED)) { - Processor chunkProcessor = processor.createWriteProcessor(); - chunk.subscribe(chunkProcessor); - chunkProcessor.subscribe(new WriteSubscriber(processor)); + Processor currentProcessor = processor.createWriteProcessor(); + currentPublisher.subscribe(currentProcessor); + currentProcessor.subscribe(new WriteResultSubscriber(processor)); } } @Override @@ -202,25 +217,25 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo try { processor.flush(); } - catch (IOException ex) { + catch (Throwable ex) { processor.flushingFailed(ex); return; } - if (processor.subscriberCompleted) { - if (processor.isFlushPending()) { - // Ensure the final flush - processor.changeState(this, FLUSHING); - processor.flushIfPossible(); - } - else if (processor.changeState(this, COMPLETED)) { - processor.resultPublisher.publishComplete(); + if (processor.changeState(this, REQUESTED)) { + if (processor.subscriberCompleted) { + if (processor.isFlushPending()) { + // Ensure the final flush + processor.changeState(REQUESTED, FLUSHING); + processor.flushIfPossible(); + } + else if (processor.changeState(REQUESTED, COMPLETED)) { + processor.resultPublisher.publishComplete(); + } + else { + processor.state.get().onComplete(processor); + } } else { - processor.state.get().onComplete(processor); - } - } - else { - if (processor.changeState(this, REQUESTED)) { Assert.state(processor.subscription != null, "No subscription"); processor.subscription.request(1); } @@ -237,7 +252,7 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo try { processor.flush(); } - catch (IOException ex) { + catch (Throwable ex) { processor.flushingFailed(ex); return; } @@ -272,6 +287,7 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo } }; + public void onSubscribe(AbstractListenerWriteFlushProcessor processor, Subscription subscription) { subscription.cancel(); } @@ -302,11 +318,16 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo } - private static class WriteSubscriber implements Subscriber { + /** + * Subscriber to receive and delegate completion notifications for from + * the current Publisher, i.e. within the current flush boundary. + */ + private static class WriteResultSubscriber implements Subscriber { private final AbstractListenerWriteFlushProcessor processor; - public WriteSubscriber(AbstractListenerWriteFlushProcessor processor) { + + public WriteResultSubscriber(AbstractListenerWriteFlushProcessor processor) { this.processor = processor; } @@ -327,7 +348,10 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo @Override public void onComplete() { - this.processor.writeComplete(); + if (this.processor.logger.isTraceEnabled()) { + this.processor.logger.trace(this.processor.state + " writeComplete"); + } + this.processor.state.get().writeComplete(this.processor); } } } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java index efd91c58344..e5041b1c1e4 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java @@ -45,20 +45,20 @@ public abstract class AbstractListenerWriteProcessor implements Processor state = new AtomicReference<>(State.UNSUBSCRIBED); + @Nullable + private Subscription subscription; + @Nullable protected volatile T currentData; private volatile boolean subscriberCompleted; - @Nullable - private Subscription subscription; + private final WriteResultPublisher resultPublisher = new WriteResultPublisher(); - // Subscriber implementation... + // Subscriber methods... @Override public final void onSubscribe(Subscription subscription) { @@ -93,7 +93,7 @@ public abstract class AbstractListenerWriteProcessor implements Processor subscriber) { @@ -101,18 +101,12 @@ public abstract class AbstractListenerWriteProcessor implements Processor implements Processor implements Processor implements Processor implements Processor - * UNSUBSCRIBED - * | - * v - * REQUESTED -------------------> RECEIVED - * ^ ^ - * | | - * --------- WRITING <----- - * | - * v - * COMPLETED + *

+	 *        UNSUBSCRIBED
+	 *             |
+	 *             v
+	 *   +--- REQUESTED -------------> RECEIVED ---+
+	 *   |        ^                       ^        |
+	 *   |        |                       |        |
+	 *   |        + ------ WRITING <------+        |
+	 *   |                    |                    |
+	 *   |                    v                    |
+	 *   +--------------> COMPLETED <--------------+
 	 * 
- * Refer to the individual states for more information. */ private enum State { - /** - * The initial unsubscribed state. Will respond to {@code onSubscribe} by - * requesting 1 data from the subscription, and change state to {@link - * #REQUESTED}. - */ UNSUBSCRIBED { @Override public void onSubscribe(AbstractListenerWriteProcessor processor, Subscription subscription) { @@ -225,12 +227,6 @@ public abstract class AbstractListenerWriteProcessor implements Processor void onNext(AbstractListenerWriteProcessor processor, T data) { @@ -239,7 +235,7 @@ public abstract class AbstractListenerWriteProcessor implements Processor implements Processor void onComplete(AbstractListenerWriteProcessor processor) { - if (processor.changeState(this, COMPLETED)) { - processor.writingComplete(); - processor.resultPublisher.publishComplete(); - } - else { - processor.state.get().onComplete(processor); - } + processor.changeStateToComplete(this); } }, - /** - * State that gets entered after a data has been - * {@linkplain Subscriber#onNext(Object) received}. Responds to - * {@code onWritePossible} by writing the current data and changes - * the state to {@link #WRITING}. If it can be written completely, - * changes the state to either {@link #REQUESTED} if the subscription - * has not been completed; or {@link #COMPLETED} if it has. If it cannot - * be written completely the state will be changed to {@code #RECEIVED}. - */ RECEIVED { @Override public void onWritePossible(AbstractListenerWriteProcessor processor) { @@ -276,35 +257,24 @@ public abstract class AbstractListenerWriteProcessor implements Processor implements Processor void onComplete(AbstractListenerWriteProcessor processor) { processor.subscriberCompleted = true; } }, - /** - * State that gets entered after a writing of the current data has been - * {@code onWritePossible started}. - */ WRITING { @Override public void onComplete(AbstractListenerWriteProcessor processor) { @@ -329,9 +296,6 @@ public abstract class AbstractListenerWriteProcessor implements Processor void onNext(AbstractListenerWriteProcessor processor, T data) { diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java index 26384224d7b..adaeca326be 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java @@ -208,8 +208,8 @@ class UndertowServerHttpResponse extends AbstractListenerServerHttpResponse impl } @Override - protected void receiveData(DataBuffer dataBuffer) { - super.receiveData(dataBuffer); + protected void dataReceived(DataBuffer dataBuffer) { + super.dataReceived(dataBuffer); this.byteBuffer = dataBuffer.asByteBuffer(); } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/WriteResultPublisher.java b/spring-web/src/main/java/org/springframework/http/server/reactive/WriteResultPublisher.java index acd92f5ac1c..1d3dab4a747 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/WriteResultPublisher.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/WriteResultPublisher.java @@ -33,6 +33,7 @@ import org.springframework.util.Assert; * * @author Arjen Poutsma * @author Violeta Georgieva + * @author Rossen Stoyanchev * @since 5.0 */ class WriteResultPublisher implements Publisher { @@ -44,10 +45,10 @@ class WriteResultPublisher implements Publisher { @Nullable private Subscriber subscriber; - private volatile boolean publisherCompleted; + private volatile boolean completedBeforeSubscribed; @Nullable - private volatile Throwable publisherError; + private volatile Throwable errorBeforeSubscribed; @Override @@ -58,12 +59,8 @@ class WriteResultPublisher implements Publisher { this.state.get().subscribe(this, subscriber); } - private boolean changeState(State oldState, State newState) { - return this.state.compareAndSet(oldState, newState); - } - /** - * Publishes the complete signal to the subscriber of this publisher. + * Delegate a completion signal to the subscriber. */ public void publishComplete() { if (logger.isTraceEnabled()) { @@ -73,7 +70,7 @@ class WriteResultPublisher implements Publisher { } /** - * Publishes the given error signal to the subscriber of this publisher. + * Delegate the given error signal to the subscriber. */ public void publishError(Throwable t) { if (logger.isTraceEnabled()) { @@ -82,12 +79,20 @@ class WriteResultPublisher implements Publisher { this.state.get().publishError(this, t); } + private boolean changeState(State oldState, State newState) { + return this.state.compareAndSet(oldState, newState); + } + - private static final class ResponseBodyWriteResultSubscription implements Subscription { + /** + * Subscription to receive and delegate request and cancel signals from the + * suscbriber to this publisher. + */ + private static final class WriteResultSubscription implements Subscription { private final WriteResultPublisher publisher; - public ResponseBodyWriteResultSubscription(WriteResultPublisher publisher) { + public WriteResultSubscription(WriteResultPublisher publisher) { this.publisher = publisher; } @@ -113,6 +118,21 @@ class WriteResultPublisher implements Publisher { } + /** + * Represents a state for the {@link Publisher} to be in. + *

+	 *     UNSUBSCRIBED
+	 *          |
+	 *          v
+	 *     SUBSCRIBING
+	 *          |
+	 *          v
+	 *      SUBSCRIBED
+	 *          |
+	 *          v
+	 *      COMPLETED
+	 * 
+ */ private enum State { UNSUBSCRIBED { @@ -120,14 +140,15 @@ class WriteResultPublisher implements Publisher { void subscribe(WriteResultPublisher publisher, Subscriber subscriber) { Assert.notNull(subscriber, "Subscriber must not be null"); if (publisher.changeState(this, SUBSCRIBING)) { - Subscription subscription = new ResponseBodyWriteResultSubscription(publisher); + Subscription subscription = new WriteResultSubscription(publisher); publisher.subscriber = subscriber; subscriber.onSubscribe(subscription); publisher.changeState(SUBSCRIBING, SUBSCRIBED); - if (publisher.publisherCompleted) { + // Now safe to check "beforeSubscribed" flags, they won't change once in NO_DEMAND + if (publisher.completedBeforeSubscribed) { publisher.publishComplete(); } - Throwable publisherError = publisher.publisherError; + Throwable publisherError = publisher.errorBeforeSubscribed; if (publisherError != null) { publisher.publishError(publisherError); } @@ -138,11 +159,11 @@ class WriteResultPublisher implements Publisher { } @Override void publishComplete(WriteResultPublisher publisher) { - publisher.publisherCompleted = true; + publisher.completedBeforeSubscribed = true; } @Override - void publishError(WriteResultPublisher publisher, Throwable t) { - publisher.publisherError = t; + void publishError(WriteResultPublisher publisher, Throwable ex) { + publisher.errorBeforeSubscribed = ex; } }, @@ -153,11 +174,11 @@ class WriteResultPublisher implements Publisher { } @Override void publishComplete(WriteResultPublisher publisher) { - publisher.publisherCompleted = true; + publisher.completedBeforeSubscribed = true; } @Override - void publishError(WriteResultPublisher publisher, Throwable t) { - publisher.publisherError = t; + void publishError(WriteResultPublisher publisher, Throwable ex) { + publisher.errorBeforeSubscribed = ex; } }, @@ -166,26 +187,6 @@ class WriteResultPublisher implements Publisher { void request(WriteResultPublisher publisher, long n) { Operators.validate(n); } - @Override - void publishComplete(WriteResultPublisher publisher) { - if (publisher.changeState(this, COMPLETED)) { - Assert.state(publisher.subscriber != null, "No subscriber"); - publisher.subscriber.onComplete(); - } - else { - publisher.state.get().publishComplete(publisher); - } - } - @Override - void publishError(WriteResultPublisher publisher, Throwable t) { - if (publisher.changeState(this, COMPLETED)) { - Assert.state(publisher.subscriber != null, "No subscriber"); - publisher.subscriber.onError(t); - } - else { - publisher.state.get().publishError(publisher, t); - } - } }, COMPLETED { @@ -197,6 +198,14 @@ class WriteResultPublisher implements Publisher { void cancel(WriteResultPublisher publisher) { // ignore } + @Override + void publishComplete(WriteResultPublisher publisher) { + // ignore + } + @Override + void publishError(WriteResultPublisher publisher, Throwable t) { + // ignore + } }; void subscribe(WriteResultPublisher publisher, Subscriber subscriber) { @@ -214,11 +223,23 @@ class WriteResultPublisher implements Publisher { } void publishComplete(WriteResultPublisher publisher) { - // ignore + if (publisher.changeState(this, COMPLETED)) { + Assert.state(publisher.subscriber != null, "No subscriber"); + publisher.subscriber.onComplete(); + } + else { + publisher.state.get().publishComplete(publisher); + } } void publishError(WriteResultPublisher publisher, Throwable t) { - // ignore + if (publisher.changeState(this, COMPLETED)) { + Assert.state(publisher.subscriber != null, "No subscriber"); + publisher.subscriber.onError(t); + } + else { + publisher.state.get().publishError(publisher, t); + } } }