diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java index 598c1eacf27..10342d681d1 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java @@ -60,6 +60,9 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo private volatile boolean sourceCompleted; + @Nullable + private volatile AbstractListenerWriteProcessor currentWriteProcessor; + private final WriteResultPublisher resultPublisher; private final String logPrefix; @@ -75,7 +78,21 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo */ public AbstractListenerWriteFlushProcessor(String logPrefix) { this.logPrefix = logPrefix; - this.resultPublisher = new WriteResultPublisher(logPrefix + "[WFP] "); + this.resultPublisher = new WriteResultPublisher(logPrefix + "[WFP] ", + () -> { + cancel(); + // Complete immediately + State oldState = this.state.getAndSet(State.COMPLETED); + if (rsWriteFlushLogger.isTraceEnabled()) { + rsWriteFlushLogger.trace(getLogPrefix() + oldState + " -> " + this.state); + } + // Propagate to current "write" Processor + AbstractListenerWriteProcessor writeProcessor = this.currentWriteProcessor; + if (writeProcessor != null) { + writeProcessor.cancelAndSetCompleted(); + } + this.currentWriteProcessor = null; + }); } @@ -139,8 +156,11 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo } /** - * Invoked during an error or completion callback from the underlying - * container to cancel the upstream subscription. + * Cancel the upstream chain of "write" Publishers only, for example due to + * Servlet container error/completion notifications. This should usually + * be followed up with a call to either {@link #onError(Throwable)} or + * {@link #onComplete()} to notify the downstream chain, that is unless + * cancellation came from downstream. */ protected void cancel() { if (rsWriteFlushLogger.isTraceEnabled()) { @@ -268,9 +288,10 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo Publisher currentPublisher) { if (processor.changeState(this, RECEIVED)) { - Processor currentProcessor = processor.createWriteProcessor(); - currentPublisher.subscribe(currentProcessor); - currentProcessor.subscribe(new WriteResultSubscriber(processor)); + Processor writeProcessor = processor.createWriteProcessor(); + processor.currentWriteProcessor = (AbstractListenerWriteProcessor) writeProcessor; + currentPublisher.subscribe(writeProcessor); + writeProcessor.subscribe(new WriteResultSubscriber(processor)); } } @Override @@ -429,6 +450,7 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo rsWriteFlushLogger.trace( this.processor.getLogPrefix() + "current \"write\" Publisher failed: " + ex); } + this.processor.currentWriteProcessor = null; this.processor.cancel(); this.processor.onError(ex); } @@ -439,6 +461,7 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo rsWriteFlushLogger.trace( this.processor.getLogPrefix() + "current \"write\" Publisher completed"); } + this.processor.currentWriteProcessor = null; this.processor.state.get().writeComplete(this.processor); } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java index c3dc9fa0c9f..6cfd8412a62 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java @@ -88,8 +88,10 @@ public abstract class AbstractListenerWriteProcessor implements Processor implements Processor implements Processor " + this.state); + } + if (!prev.equals(State.WRITING)) { + discardCurrentData(); + } + break; + } + } + } + // Publisher implementation for result notifications... @Override public final void subscribe(Subscriber subscriber) { - // Technically, cancellation from the result subscriber should be propagated - // to the upstream subscription. In practice, HttpHandler server adapters - // don't have a reason to cancel the result subscription. this.resultPublisher.subscribe(subscriber); } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java index cc0e2536e70..ab7dd93c8d3 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java @@ -244,33 +244,35 @@ class ServletServerHttpResponse extends AbstractListenerServerHttpResponse { } public void handleError(Throwable ex) { + ResponseBodyFlushProcessor flushProcessor = bodyFlushProcessor; ResponseBodyProcessor processor = bodyProcessor; - if (processor != null) { - processor.cancel(); - processor.onError(ex); - } - else { - ResponseBodyFlushProcessor flushProcessor = bodyFlushProcessor; - if (flushProcessor != null) { - flushProcessor.cancel(); - flushProcessor.onError(ex); + if (flushProcessor != null) { + // Cancel the upstream source of "write" Publishers + flushProcessor.cancel(); + // Cancel the current "write" Publisher and propagate onComplete downstream + if (processor != null) { + processor.cancel(); + processor.onError(ex); } + // This is a no-op if processor was connected and onError propagated all the way + flushProcessor.onError(ex); } } @Override public void onComplete(AsyncEvent event) { + ResponseBodyFlushProcessor flushProcessor = bodyFlushProcessor; ResponseBodyProcessor processor = bodyProcessor; - if (processor != null) { - processor.cancel(); - processor.onComplete(); - } - else { - ResponseBodyFlushProcessor flushProcessor = bodyFlushProcessor; - if (flushProcessor != null) { - flushProcessor.cancel(); - flushProcessor.onComplete(); + if (flushProcessor != null) { + // Cancel the upstream source of "write" Publishers + flushProcessor.cancel(); + // Cancel the current "write" Publisher and propagate onComplete downstream + if (processor != null) { + processor.cancel(); + processor.onComplete(); } + // This is a no-op if processor was connected and onComplete propagated all the way + flushProcessor.onComplete(); } } } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/WriteResultPublisher.java b/spring-web/src/main/java/org/springframework/http/server/reactive/WriteResultPublisher.java index 57ccf3ae499..9bac8734bc5 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/WriteResultPublisher.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/WriteResultPublisher.java @@ -50,6 +50,8 @@ class WriteResultPublisher implements Publisher { private final AtomicReference state = new AtomicReference<>(State.UNSUBSCRIBED); + private final Runnable cancelTask; + @Nullable private volatile Subscriber subscriber; @@ -61,7 +63,8 @@ class WriteResultPublisher implements Publisher { private final String logPrefix; - public WriteResultPublisher(String logPrefix) { + public WriteResultPublisher(String logPrefix, Runnable cancelTask) { + this.cancelTask = cancelTask; this.logPrefix = logPrefix; } @@ -248,7 +251,10 @@ class WriteResultPublisher implements Publisher { } void cancel(WriteResultPublisher publisher) { - if (!publisher.changeState(this, COMPLETED)) { + if (publisher.changeState(this, COMPLETED)) { + publisher.cancelTask.run(); + } + else { publisher.state.get().cancel(publisher); } }