Browse Source

WriteResultPublisher propagates cancel upstream

Closes gh-26642
pull/26645/head
Rossen Stoyanchev 5 years ago
parent
commit
acb638f828
  1. 35
      spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java
  2. 38
      spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java
  3. 38
      spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java
  4. 10
      spring-web/src/main/java/org/springframework/http/server/reactive/WriteResultPublisher.java

35
spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java

@ -60,6 +60,9 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo @@ -60,6 +60,9 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
private volatile boolean sourceCompleted;
@Nullable
private volatile AbstractListenerWriteProcessor<?> currentWriteProcessor;
private final WriteResultPublisher resultPublisher;
private final String logPrefix;
@ -75,7 +78,21 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo @@ -75,7 +78,21 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
*/
public AbstractListenerWriteFlushProcessor(String logPrefix) {
this.logPrefix = logPrefix;
this.resultPublisher = new WriteResultPublisher(logPrefix + "[WFP] ");
this.resultPublisher = new WriteResultPublisher(logPrefix + "[WFP] ",
() -> {
cancel();
// Complete immediately
State oldState = this.state.getAndSet(State.COMPLETED);
if (rsWriteFlushLogger.isTraceEnabled()) {
rsWriteFlushLogger.trace(getLogPrefix() + oldState + " -> " + this.state);
}
// Propagate to current "write" Processor
AbstractListenerWriteProcessor<?> writeProcessor = this.currentWriteProcessor;
if (writeProcessor != null) {
writeProcessor.cancelAndSetCompleted();
}
this.currentWriteProcessor = null;
});
}
@ -139,8 +156,11 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo @@ -139,8 +156,11 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
}
/**
* Invoked during an error or completion callback from the underlying
* container to cancel the upstream subscription.
* Cancel the upstream chain of "write" Publishers only, for example due to
* Servlet container error/completion notifications. This should usually
* be followed up with a call to either {@link #onError(Throwable)} or
* {@link #onComplete()} to notify the downstream chain, that is unless
* cancellation came from downstream.
*/
protected void cancel() {
if (rsWriteFlushLogger.isTraceEnabled()) {
@ -268,9 +288,10 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo @@ -268,9 +288,10 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
Publisher<? extends T> currentPublisher) {
if (processor.changeState(this, RECEIVED)) {
Processor<? super T, Void> currentProcessor = processor.createWriteProcessor();
currentPublisher.subscribe(currentProcessor);
currentProcessor.subscribe(new WriteResultSubscriber(processor));
Processor<? super T, Void> writeProcessor = processor.createWriteProcessor();
processor.currentWriteProcessor = (AbstractListenerWriteProcessor<?>) writeProcessor;
currentPublisher.subscribe(writeProcessor);
writeProcessor.subscribe(new WriteResultSubscriber(processor));
}
}
@Override
@ -429,6 +450,7 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo @@ -429,6 +450,7 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
rsWriteFlushLogger.trace(
this.processor.getLogPrefix() + "current \"write\" Publisher failed: " + ex);
}
this.processor.currentWriteProcessor = null;
this.processor.cancel();
this.processor.onError(ex);
}
@ -439,6 +461,7 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo @@ -439,6 +461,7 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
rsWriteFlushLogger.trace(
this.processor.getLogPrefix() + "current \"write\" Publisher completed");
}
this.processor.currentWriteProcessor = null;
this.processor.state.get().writeComplete(this.processor);
}

38
spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java

@ -88,8 +88,10 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T, @@ -88,8 +88,10 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
* @since 5.1
*/
public AbstractListenerWriteProcessor(String logPrefix) {
// AbstractListenerFlushProcessor calls cancelAndSetCompleted directly, so this cancel task
// won't be used for HTTP responses, but it can be for a WebSocket session.
this.resultPublisher = new WriteResultPublisher(logPrefix + "[WP] ", this::cancelAndSetCompleted);
this.logPrefix = (StringUtils.hasText(logPrefix) ? logPrefix : "");
this.resultPublisher = new WriteResultPublisher(logPrefix + "[WP] ");
}
@ -156,8 +158,11 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T, @@ -156,8 +158,11 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
}
/**
* Invoked during an error or completion callback from the underlying
* container to cancel the upstream subscription.
* Cancel the upstream "write" Publisher only, for example due to
* Servlet container error/completion notifications. This should usually
* be followed up with a call to either {@link #onError(Throwable)} or
* {@link #onComplete()} to notify the downstream chain, that is unless
* cancellation came from downstream.
*/
public void cancel() {
if (rsWriteLogger.isTraceEnabled()) {
@ -168,13 +173,34 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T, @@ -168,13 +173,34 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
}
}
/**
* Cancel the "write" Publisher and transition to COMPLETED immediately also
* without notifying the downstream. For use when cancellation came from
* downstream.
*/
void cancelAndSetCompleted() {
cancel();
for (;;) {
State prev = this.state.get();
if (prev.equals(State.COMPLETED)) {
break;
}
if (this.state.compareAndSet(prev, State.COMPLETED)) {
if (rsWriteLogger.isTraceEnabled()) {
rsWriteLogger.trace(getLogPrefix() + prev + " -> " + this.state);
}
if (!prev.equals(State.WRITING)) {
discardCurrentData();
}
break;
}
}
}
// Publisher implementation for result notifications...
@Override
public final void subscribe(Subscriber<? super Void> subscriber) {
// Technically, cancellation from the result subscriber should be propagated
// to the upstream subscription. In practice, HttpHandler server adapters
// don't have a reason to cancel the result subscription.
this.resultPublisher.subscribe(subscriber);
}

38
spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java

@ -244,33 +244,35 @@ class ServletServerHttpResponse extends AbstractListenerServerHttpResponse { @@ -244,33 +244,35 @@ class ServletServerHttpResponse extends AbstractListenerServerHttpResponse {
}
public void handleError(Throwable ex) {
ResponseBodyFlushProcessor flushProcessor = bodyFlushProcessor;
ResponseBodyProcessor processor = bodyProcessor;
if (processor != null) {
processor.cancel();
processor.onError(ex);
}
else {
ResponseBodyFlushProcessor flushProcessor = bodyFlushProcessor;
if (flushProcessor != null) {
flushProcessor.cancel();
flushProcessor.onError(ex);
if (flushProcessor != null) {
// Cancel the upstream source of "write" Publishers
flushProcessor.cancel();
// Cancel the current "write" Publisher and propagate onComplete downstream
if (processor != null) {
processor.cancel();
processor.onError(ex);
}
// This is a no-op if processor was connected and onError propagated all the way
flushProcessor.onError(ex);
}
}
@Override
public void onComplete(AsyncEvent event) {
ResponseBodyFlushProcessor flushProcessor = bodyFlushProcessor;
ResponseBodyProcessor processor = bodyProcessor;
if (processor != null) {
processor.cancel();
processor.onComplete();
}
else {
ResponseBodyFlushProcessor flushProcessor = bodyFlushProcessor;
if (flushProcessor != null) {
flushProcessor.cancel();
flushProcessor.onComplete();
if (flushProcessor != null) {
// Cancel the upstream source of "write" Publishers
flushProcessor.cancel();
// Cancel the current "write" Publisher and propagate onComplete downstream
if (processor != null) {
processor.cancel();
processor.onComplete();
}
// This is a no-op if processor was connected and onComplete propagated all the way
flushProcessor.onComplete();
}
}
}

10
spring-web/src/main/java/org/springframework/http/server/reactive/WriteResultPublisher.java

@ -50,6 +50,8 @@ class WriteResultPublisher implements Publisher<Void> { @@ -50,6 +50,8 @@ class WriteResultPublisher implements Publisher<Void> {
private final AtomicReference<State> state = new AtomicReference<>(State.UNSUBSCRIBED);
private final Runnable cancelTask;
@Nullable
private volatile Subscriber<? super Void> subscriber;
@ -61,7 +63,8 @@ class WriteResultPublisher implements Publisher<Void> { @@ -61,7 +63,8 @@ class WriteResultPublisher implements Publisher<Void> {
private final String logPrefix;
public WriteResultPublisher(String logPrefix) {
public WriteResultPublisher(String logPrefix, Runnable cancelTask) {
this.cancelTask = cancelTask;
this.logPrefix = logPrefix;
}
@ -248,7 +251,10 @@ class WriteResultPublisher implements Publisher<Void> { @@ -248,7 +251,10 @@ class WriteResultPublisher implements Publisher<Void> {
}
void cancel(WriteResultPublisher publisher) {
if (!publisher.changeState(this, COMPLETED)) {
if (publisher.changeState(this, COMPLETED)) {
publisher.cancelTask.run();
}
else {
publisher.state.get().cancel(publisher);
}
}

Loading…
Cancel
Save