diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java index 5be7ea2bb93..d26d4e35818 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java @@ -73,18 +73,30 @@ public abstract class AbstractListenerReadPublisher implements Publisher { } - // Methods for sub-classes to delegate to, when async I/O events occur... + // Async I/O notification methods... + /** + * Invoked when reading is possible, either in the same thread after a check + * via {@link #checkOnDataAvailable()}, or as a callback from the underlying + * container. + */ public final void onDataAvailable() { this.logger.trace("I/O event onDataAvailable"); this.state.get().onDataAvailable(this); } + /** + * Sub-classes can call this method to delegate a contain notification when + * all data has been read. + */ public void onAllDataRead() { this.logger.trace("I/O event onAllDataRead"); this.state.get().onAllDataRead(this); } + /** + * Sub-classes can call this to delegate container error notifications. + */ public final void onError(Throwable ex) { if (this.logger.isTraceEnabled()) { this.logger.trace("I/O event onError: " + ex); @@ -93,11 +105,11 @@ public abstract class AbstractListenerReadPublisher implements Publisher { } - // Methods for sub-classes to implement... + // Read API methods to be implemented or template methods to override... /** - * Check if data is available, calling {@link #onDataAvailable()} either - * immediately or later when reading is possible. + * Check if data is available and either call {@link #onDataAvailable()} + * immediately or schedule a notification. */ protected abstract void checkOnDataAvailable(); diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java index ae39da19863..96dceba33b3 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java @@ -53,7 +53,7 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo private final WriteResultPublisher resultPublisher = new WriteResultPublisher(); - // Subscriber methods and methods to notify of async I/O events... + // Subscriber methods and async I/O notification methods... @Override public final void onSubscribe(Subscription subscription) { @@ -67,8 +67,8 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo } /** - * Notify of an error. This can come from the upstream write Publisher or - * from sub-classes as a result of an I/O error. + * Error signal from the upstream, write Publisher. This is also used by + * sub-classes to delegate error notifications from the container. */ @Override public final void onError(Throwable ex) { @@ -79,8 +79,8 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo } /** - * Notify of completion. This can come from the upstream write Publisher or - * from sub-classes as a result of an I/O completion event. + * Completion signal from the upstream, write Publisher. This is also used + * by sub-classes to delegate completion notifications from the container. */ @Override public final void onComplete() { @@ -88,6 +88,19 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo this.state.get().onComplete(this); } + /** + * Invoked when flusing is possible, either in the same thread after a check + * via {@link #isWritePossible()}, or as a callback from the underlying + * container. + */ + protected final void onFlushPossible() { + this.state.get().onFlushPossible(this); + } + + /** + * Invoked during an error or completion callback from the underlying + * container to cancel the upstream subscription. + */ protected void cancel() { this.logger.trace("Received request to cancel"); if (this.subscription != null) { @@ -95,7 +108,8 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo } } - // Publisher method... + + // Publisher implementation for result notifications... @Override public final void subscribe(Subscriber subscriber) { @@ -103,26 +117,23 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo } - // Methods for sub-classes to implement or override... + // Write API methods to be implemented or template methods to override... /** - * Create a new processor for subscribing to the next flush boundary. + * Create a new processor for the current flush boundary. */ protected abstract Processor createWriteProcessor(); /** - * Flush the output if ready, or otherwise {@link #isFlushPending()} should - * return true after that. + * Whether writing/flushing is possible. */ - protected abstract void flush() throws IOException; + protected abstract boolean isWritePossible(); /** - * Invoked when an error happens while flushing. Defaults to no-op. - * Servlet 3.1 based implementations will receive an - * {@link javax.servlet.AsyncListener#onError} event. + * Flush the output if ready, or otherwise {@link #isFlushPending()} should + * return true after. */ - protected void flushingFailed(Throwable t) { - } + protected abstract void flush() throws IOException; /** * Whether flushing is pending. @@ -130,17 +141,14 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo protected abstract boolean isFlushPending(); /** - * Listeners can call this to notify when flushing is possible. + * Invoked when an error happens while flushing. Sub-classes may choose + * to ignore this if they know the underlying API will provide an error + * notification in a container thread. + *

Defaults to no-op. */ - protected final void onFlushPossible() { - this.state.get().onFlushPossible(this); + protected void flushingFailed(Throwable t) { } - /** - * Whether writing is possible. - */ - protected abstract boolean isWritePossible(); - // Private methods for use in State... @@ -167,16 +175,16 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo * Represents a state for the {@link Processor} to be in. * *

-	 *        UNSUBSCRIBED
-	 *             |
-	 *             v
-	 *   +--- REQUESTED <--------> RECEIVED ---+
-	 *   |                            |        |
-	 *   |                            |        |
-	 *   |            FLUSHING <------+        |
-	 *   |                |                    |
-	 *   |                v                    |
-	 *   +----------> COMPLETED <--------------+
+	 *       UNSUBSCRIBED
+	 *            |
+	 *            v
+	 *        REQUESTED <---> RECEIVED ------+
+	 *            |              |           |
+	 *            |              v           |
+	 *            |           FLUSHING       |
+	 *            |              |           |
+	 *            |              v           |
+	 *            +--------> COMPLETED <-----+
 	 * 
*/ private enum State { @@ -269,7 +277,7 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo processor.state.get().onComplete(processor); } } - public void onNext(AbstractListenerWriteFlushProcessor processor, Publisher publisher) { + public void onNext(AbstractListenerWriteFlushProcessor proc, Publisher pub) { // ignore } @Override @@ -280,7 +288,7 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo COMPLETED { @Override - public void onNext(AbstractListenerWriteFlushProcessor processor, Publisher publisher) { + public void onNext(AbstractListenerWriteFlushProcessor proc, Publisher pub) { // ignore } @Override @@ -294,11 +302,11 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo }; - public void onSubscribe(AbstractListenerWriteFlushProcessor processor, Subscription subscription) { + public void onSubscribe(AbstractListenerWriteFlushProcessor proc, Subscription subscription) { subscription.cancel(); } - public void onNext(AbstractListenerWriteFlushProcessor processor, Publisher publisher) { + public void onNext(AbstractListenerWriteFlushProcessor proc, Publisher pub) { throw new IllegalStateException(toString()); } @@ -326,7 +334,7 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo /** * Subscriber to receive and delegate completion notifications for from - * the current Publisher, i.e. within the current flush boundary. + * the current Publisher, i.e. for the current flush boundary. */ private static class WriteResultSubscriber implements Subscriber { diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java index 990dcea7011..5645f00f722 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java @@ -58,7 +58,7 @@ public abstract class AbstractListenerWriteProcessor implements Processor implements Processor implements Processor implements Processor implements Processor subscriber) { @@ -113,7 +122,7 @@ public abstract class AbstractListenerWriteProcessor implements Processor implements Processor implements ProcessorThe default implementation is a no-op. */ protected void suspendWriting() { } /** - * Invoked when writing is complete. Defaults to no-op. + * Invoked after onComplete or onError notification. + *

The default implementation is a no-op. */ protected void writingComplete() { } /** - * Invoked when an error happens while writing. - *

Defaults to no-op. Servlet 3.1 based implementations will receive - * {@code javax.servlet.WriteListener#onError(Throwable)} event. + * Invoked when an I/O error occurs during a write. Sub-classes may choose + * to ignore this if they know the underlying API will provide an error + * notification in a container thread. + *

Defaults to no-op. */ protected void writingFailed(Throwable ex) { } - // Private methods for use in State... + // Private methods for use from State's... private boolean changeState(State oldState, State newState) { boolean result = this.state.compareAndSet(oldState, newState); diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/WriteResultPublisher.java b/spring-web/src/main/java/org/springframework/http/server/reactive/WriteResultPublisher.java index 189d3428891..6b74466a2c2 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/WriteResultPublisher.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/WriteResultPublisher.java @@ -60,7 +60,7 @@ class WriteResultPublisher implements Publisher { } /** - * Delegate a completion signal to the subscriber. + * Invoke this to delegate a completion signal to the subscriber. */ public void publishComplete() { if (logger.isTraceEnabled()) { @@ -70,7 +70,7 @@ class WriteResultPublisher implements Publisher { } /** - * Delegate the given error signal to the subscriber. + * Invoke this to delegate an error signal to the subscriber. */ public void publishError(Throwable t) { if (logger.isTraceEnabled()) { @@ -86,7 +86,7 @@ class WriteResultPublisher implements Publisher { /** * Subscription to receive and delegate request and cancel signals from the - * suscbriber to this publisher. + * subscriber to this publisher. */ private static final class WriteResultSubscription implements Subscription { @@ -224,8 +224,9 @@ class WriteResultPublisher implements Publisher { void publishComplete(WriteResultPublisher publisher) { if (publisher.changeState(this, COMPLETED)) { - Assert.state(publisher.subscriber != null, "No subscriber"); - publisher.subscriber.onComplete(); + Subscriber s = publisher.subscriber; + Assert.state(s != null, "No subscriber"); + s.onComplete(); } else { publisher.state.get().publishComplete(publisher); @@ -234,8 +235,9 @@ class WriteResultPublisher implements Publisher { void publishError(WriteResultPublisher publisher, Throwable t) { if (publisher.changeState(this, COMPLETED)) { - Assert.state(publisher.subscriber != null, "No subscriber"); - publisher.subscriber.onError(t); + Subscriber s = publisher.subscriber; + Assert.state(s != null, "No subscriber"); + s.onError(t); } else { publisher.state.get().publishError(publisher, t);