diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java index b28b6e47a05..a432dc7a780 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -123,18 +123,22 @@ public abstract class AbstractListenerReadPublisher implements Publisher { * all data has been read. */ public void onAllDataRead() { - rsReadLogger.trace(getLogPrefix() + "onAllDataRead"); - this.state.get().onAllDataRead(this); + State state = this.state.get(); + if (rsReadLogger.isTraceEnabled()) { + rsReadLogger.trace(getLogPrefix() + "onAllDataRead [" + state + "]"); + } + state.onAllDataRead(this); } /** * Sub-classes can call this to delegate container error notifications. */ public final void onError(Throwable ex) { + State state = this.state.get(); if (rsReadLogger.isTraceEnabled()) { - rsReadLogger.trace(getLogPrefix() + "Connection error: " + ex); + rsReadLogger.trace(getLogPrefix() + "onError: " + ex + " [" + state + "]"); } - this.state.get().onError(this, ex); + state.onError(this, ex); } @@ -191,13 +195,13 @@ public abstract class AbstractListenerReadPublisher implements Publisher { Subscriber subscriber = this.subscriber; Assert.state(subscriber != null, "No subscriber"); if (rsReadLogger.isTraceEnabled()) { - rsReadLogger.trace(getLogPrefix() + "Publishing data read"); + rsReadLogger.trace(getLogPrefix() + "Publishing " + data.getClass().getSimpleName()); } subscriber.onNext(data); } else { if (rsReadLogger.isTraceEnabled()) { - rsReadLogger.trace(getLogPrefix() + "No more data to read"); + rsReadLogger.trace(getLogPrefix() + "No more to read"); } return true; } @@ -255,17 +259,18 @@ public abstract class AbstractListenerReadPublisher implements Publisher { @Override public final void request(long n) { if (rsReadLogger.isTraceEnabled()) { - rsReadLogger.trace(getLogPrefix() + n + " requested"); + rsReadLogger.trace(getLogPrefix() + "request " + (n != Long.MAX_VALUE ? n : "Long.MAX_VALUE")); } state.get().request(AbstractListenerReadPublisher.this, n); } @Override public final void cancel() { + State state = AbstractListenerReadPublisher.this.state.get(); if (rsReadLogger.isTraceEnabled()) { - rsReadLogger.trace(getLogPrefix() + "Cancellation"); + rsReadLogger.trace(getLogPrefix() + "cancel [" + state + "]"); } - state.get().cancel(AbstractListenerReadPublisher.this); + state.cancel(AbstractListenerReadPublisher.this); } } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java index f8575ab3159..598c1eacf27 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java @@ -75,7 +75,7 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo */ public AbstractListenerWriteFlushProcessor(String logPrefix) { this.logPrefix = logPrefix; - this.resultPublisher = new WriteResultPublisher(logPrefix); + this.resultPublisher = new WriteResultPublisher(logPrefix + "[WFP] "); } @@ -98,7 +98,7 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo @Override public final void onNext(Publisher publisher) { if (rsWriteFlushLogger.isTraceEnabled()) { - rsWriteFlushLogger.trace(getLogPrefix() + "Received onNext publisher"); + rsWriteFlushLogger.trace(getLogPrefix() + "onNext: \"write\" Publisher"); } this.state.get().onNext(this, publisher); } @@ -109,10 +109,11 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo */ @Override public final void onError(Throwable ex) { + State state = this.state.get(); if (rsWriteFlushLogger.isTraceEnabled()) { - rsWriteFlushLogger.trace(getLogPrefix() + "Received onError: " + ex); + rsWriteFlushLogger.trace(getLogPrefix() + "onError: " + ex + " [" + state + "]"); } - this.state.get().onError(this, ex); + state.onError(this, ex); } /** @@ -121,10 +122,11 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo */ @Override public final void onComplete() { + State state = this.state.get(); if (rsWriteFlushLogger.isTraceEnabled()) { - rsWriteFlushLogger.trace(getLogPrefix() + "Received onComplete"); + rsWriteFlushLogger.trace(getLogPrefix() + "onComplete [" + state + "]"); } - this.state.get().onComplete(this); + state.onComplete(this); } /** @@ -142,7 +144,7 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo */ protected void cancel() { if (rsWriteFlushLogger.isTraceEnabled()) { - rsWriteFlushLogger.trace(getLogPrefix() + "Received request to cancel"); + rsWriteFlushLogger.trace(getLogPrefix() + "cancel [" + this.state + "]"); } if (this.subscription != null) { this.subscription.cancel(); @@ -294,7 +296,7 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo } if (processor.changeState(this, REQUESTED)) { if (processor.sourceCompleted) { - handleSubscriberCompleted(processor); + handleSourceCompleted(processor); } else { Assert.state(processor.subscription != null, "No subscription"); @@ -307,11 +309,11 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo processor.sourceCompleted = true; // A competing write might have completed very quickly if (processor.state.get().equals(State.REQUESTED)) { - handleSubscriberCompleted(processor); + handleSourceCompleted(processor); } } - private void handleSubscriberCompleted(AbstractListenerWriteFlushProcessor processor) { + private void handleSourceCompleted(AbstractListenerWriteFlushProcessor processor) { if (processor.isFlushPending()) { // Ensure the final flush processor.changeState(State.REQUESTED, State.FLUSHING); @@ -423,6 +425,10 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo @Override public void onError(Throwable ex) { + if (rsWriteFlushLogger.isTraceEnabled()) { + rsWriteFlushLogger.trace( + this.processor.getLogPrefix() + "current \"write\" Publisher failed: " + ex); + } this.processor.cancel(); this.processor.onError(ex); } @@ -430,10 +436,16 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo @Override public void onComplete() { if (rsWriteFlushLogger.isTraceEnabled()) { - rsWriteFlushLogger.trace(this.processor.getLogPrefix() + this.processor.state + " writeComplete"); + rsWriteFlushLogger.trace( + this.processor.getLogPrefix() + "current \"write\" Publisher completed"); } this.processor.state.get().writeComplete(this.processor); } + + @Override + public String toString() { + return this.processor.getClass().getSimpleName() + "-WriteResultSubscriber"; + } } } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java index 19c478554f2..c3dc9fa0c9f 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java @@ -89,7 +89,7 @@ public abstract class AbstractListenerWriteProcessor implements Processor implements Processor implements Processor implements Processor implements Processor implements Processor")); + if (logger.isDebugEnabled()) { + logger.debug(this.logPrefix + "AsyncEvent onError: " + (ex != null ? ex : "")); + } delegateError(this.requestAsyncListener, event); delegateError(this.responseAsyncListener, event); handleTimeoutOrError(event); @@ -378,7 +383,9 @@ public class ServletHttpHandlerAdapter implements Servlet { @Override public void onError(Throwable ex) { - logger.trace(this.logPrefix + "Failed to complete: " + ex.getMessage()); + if (logger.isTraceEnabled()) { + logger.trace(this.logPrefix + "onError: " + ex.getMessage()); + } runIfAsyncNotComplete(this.asyncContext, this.completionFlag, () -> { if (this.asyncContext.getResponse().isCommitted()) { logger.trace(this.logPrefix + "Dispatch to container, to raise the error on servlet thread"); @@ -400,7 +407,9 @@ public class ServletHttpHandlerAdapter implements Servlet { @Override public void onComplete() { - logger.trace(this.logPrefix + "Handling completed"); + if (logger.isTraceEnabled()) { + logger.trace(this.logPrefix + "onComplete"); + } runIfAsyncNotComplete(this.asyncContext, this.completionFlag, this.asyncContext::complete); } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java index c621246f567..84dc43fff1a 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java @@ -324,7 +324,7 @@ class ServletServerHttpResponse extends AbstractListenerServerHttpResponse { @Override protected void flush() throws IOException { if (rsWriteFlushLogger.isTraceEnabled()) { - rsWriteFlushLogger.trace(getLogPrefix() + "Flush attempt"); + rsWriteFlushLogger.trace(getLogPrefix() + "flushing"); } ServletServerHttpResponse.this.flush(); } @@ -362,7 +362,7 @@ class ServletServerHttpResponse extends AbstractListenerServerHttpResponse { protected boolean write(DataBuffer dataBuffer) throws IOException { if (ServletServerHttpResponse.this.flushOnNext) { if (rsWriteLogger.isTraceEnabled()) { - rsWriteLogger.trace(getLogPrefix() + "Flush attempt"); + rsWriteLogger.trace(getLogPrefix() + "flushing"); } flush(); } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/WriteResultPublisher.java b/spring-web/src/main/java/org/springframework/http/server/reactive/WriteResultPublisher.java index 39f6051b4f6..57ccf3ae499 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/WriteResultPublisher.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/WriteResultPublisher.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -69,7 +69,7 @@ class WriteResultPublisher implements Publisher { @Override public final void subscribe(Subscriber subscriber) { if (rsWriteResultLogger.isTraceEnabled()) { - rsWriteResultLogger.trace(this.logPrefix + this.state + " subscribe: " + subscriber); + rsWriteResultLogger.trace(this.logPrefix + "got subscriber " + subscriber); } this.state.get().subscribe(this, subscriber); } @@ -78,20 +78,22 @@ class WriteResultPublisher implements Publisher { * Invoke this to delegate a completion signal to the subscriber. */ public void publishComplete() { + State state = this.state.get(); if (rsWriteResultLogger.isTraceEnabled()) { - rsWriteResultLogger.trace(this.logPrefix + this.state + " publishComplete"); + rsWriteResultLogger.trace(this.logPrefix + "completed [" + state + "]"); } - this.state.get().publishComplete(this); + state.publishComplete(this); } /** * Invoke this to delegate an error signal to the subscriber. */ public void publishError(Throwable t) { + State state = this.state.get(); if (rsWriteResultLogger.isTraceEnabled()) { - rsWriteResultLogger.trace(this.logPrefix + this.state + " publishError: " + t); + rsWriteResultLogger.trace(this.logPrefix + "failed: " + t + " [" + state + "]"); } - this.state.get().publishError(this, t); + state.publishError(this, t); } private boolean changeState(State oldState, State newState) { @@ -114,20 +116,22 @@ class WriteResultPublisher implements Publisher { @Override public final void request(long n) { if (rsWriteResultLogger.isTraceEnabled()) { - rsWriteResultLogger.trace(this.publisher.logPrefix + state() + " request: " + n); + rsWriteResultLogger.trace(this.publisher.logPrefix + + "request " + (n != Long.MAX_VALUE ? n : "Long.MAX_VALUE")); } - state().request(this.publisher, n); + getState().request(this.publisher, n); } @Override public final void cancel() { + State state = getState(); if (rsWriteResultLogger.isTraceEnabled()) { - rsWriteResultLogger.trace(this.publisher.logPrefix + state() + " cancel"); + rsWriteResultLogger.trace(this.publisher.logPrefix + "cancel [" + state + "]"); } - state().cancel(this.publisher); + state.cancel(this.publisher); } - private State state() { + private State getState() { return this.publisher.state.get(); } } @@ -161,11 +165,11 @@ class WriteResultPublisher implements Publisher { publisher.changeState(SUBSCRIBING, SUBSCRIBED); // Now safe to check "beforeSubscribed" flags, they won't change once in NO_DEMAND if (publisher.completedBeforeSubscribed) { - publisher.publishComplete(); + publisher.state.get().publishComplete(publisher); } - Throwable publisherError = publisher.errorBeforeSubscribed; - if (publisherError != null) { - publisher.publishError(publisherError); + Throwable ex = publisher.errorBeforeSubscribed; + if (ex != null) { + publisher.state.get().publishError(publisher, ex); } } else {