|
|
|
|
@ -22,6 +22,7 @@ import java.util.List;
@@ -22,6 +22,7 @@ import java.util.List;
|
|
|
|
|
import java.util.Map; |
|
|
|
|
import java.util.concurrent.Callable; |
|
|
|
|
import java.util.concurrent.Future; |
|
|
|
|
import java.util.concurrent.atomic.AtomicReference; |
|
|
|
|
|
|
|
|
|
import jakarta.servlet.http.HttpServletRequest; |
|
|
|
|
import org.apache.commons.logging.Log; |
|
|
|
|
@ -33,7 +34,6 @@ import org.springframework.lang.Nullable;
@@ -33,7 +34,6 @@ import org.springframework.lang.Nullable;
|
|
|
|
|
import org.springframework.util.Assert; |
|
|
|
|
import org.springframework.web.context.request.RequestAttributes; |
|
|
|
|
import org.springframework.web.context.request.async.DeferredResult.DeferredResultHandler; |
|
|
|
|
import org.springframework.web.util.DisconnectedClientHelper; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* The central class for managing asynchronous request processing, mainly intended |
|
|
|
|
@ -67,16 +67,6 @@ public final class WebAsyncManager {
@@ -67,16 +67,6 @@ public final class WebAsyncManager {
|
|
|
|
|
|
|
|
|
|
private static final Log logger = LogFactory.getLog(WebAsyncManager.class); |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Log category to use for network failure after a client has gone away. |
|
|
|
|
* @see DisconnectedClientHelper |
|
|
|
|
*/ |
|
|
|
|
private static final String DISCONNECTED_CLIENT_LOG_CATEGORY = |
|
|
|
|
"org.springframework.web.server.DisconnectedClient"; |
|
|
|
|
|
|
|
|
|
private static final DisconnectedClientHelper disconnectedClientHelper = |
|
|
|
|
new DisconnectedClientHelper(DISCONNECTED_CLIENT_LOG_CATEGORY); |
|
|
|
|
|
|
|
|
|
private static final CallableProcessingInterceptor timeoutCallableInterceptor = |
|
|
|
|
new TimeoutCallableProcessingInterceptor(); |
|
|
|
|
|
|
|
|
|
@ -95,12 +85,7 @@ public final class WebAsyncManager {
@@ -95,12 +85,7 @@ public final class WebAsyncManager {
|
|
|
|
|
@Nullable |
|
|
|
|
private volatile Object[] concurrentResultContext; |
|
|
|
|
|
|
|
|
|
/* |
|
|
|
|
* Whether the concurrentResult is an error. If such errors remain unhandled, some |
|
|
|
|
* Servlet containers will call AsyncListener#onError at the end, after the ASYNC |
|
|
|
|
* and/or the ERROR dispatch (Boot's case), and we need to ignore those. |
|
|
|
|
*/ |
|
|
|
|
private volatile boolean errorHandlingInProgress; |
|
|
|
|
private final AtomicReference<State> state = new AtomicReference<>(State.NOT_STARTED); |
|
|
|
|
|
|
|
|
|
private final Map<Object, CallableProcessingInterceptor> callableInterceptors = new LinkedHashMap<>(); |
|
|
|
|
|
|
|
|
|
@ -262,6 +247,12 @@ public final class WebAsyncManager {
@@ -262,6 +247,12 @@ public final class WebAsyncManager {
|
|
|
|
|
* {@linkplain #getConcurrentResultContext() concurrentResultContext}. |
|
|
|
|
*/ |
|
|
|
|
public void clearConcurrentResult() { |
|
|
|
|
if (!this.state.compareAndSet(State.RESULT_SET, State.NOT_STARTED)) { |
|
|
|
|
if (logger.isDebugEnabled()) { |
|
|
|
|
logger.debug("Unexpected call to clear: [" + this.state.get() + "]"); |
|
|
|
|
} |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
synchronized (WebAsyncManager.this) { |
|
|
|
|
this.concurrentResult = RESULT_NONE; |
|
|
|
|
this.concurrentResultContext = null; |
|
|
|
|
@ -302,6 +293,11 @@ public final class WebAsyncManager {
@@ -302,6 +293,11 @@ public final class WebAsyncManager {
|
|
|
|
|
Assert.notNull(webAsyncTask, "WebAsyncTask must not be null"); |
|
|
|
|
Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null"); |
|
|
|
|
|
|
|
|
|
if (!this.state.compareAndSet(State.NOT_STARTED, State.ASYNC_PROCESSING)) { |
|
|
|
|
throw new IllegalStateException( |
|
|
|
|
"Unexpected call to startCallableProcessing: [" + this.state.get() + "]"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Long timeout = webAsyncTask.getTimeout(); |
|
|
|
|
if (timeout != null) { |
|
|
|
|
this.asyncWebRequest.setTimeout(timeout); |
|
|
|
|
@ -322,7 +318,7 @@ public final class WebAsyncManager {
@@ -322,7 +318,7 @@ public final class WebAsyncManager {
|
|
|
|
|
|
|
|
|
|
this.asyncWebRequest.addTimeoutHandler(() -> { |
|
|
|
|
if (logger.isDebugEnabled()) { |
|
|
|
|
logger.debug("Async request timeout for " + formatUri(this.asyncWebRequest)); |
|
|
|
|
logger.debug("Servlet container timeout notification for " + formatUri(this.asyncWebRequest)); |
|
|
|
|
} |
|
|
|
|
Object result = interceptorChain.triggerAfterTimeout(this.asyncWebRequest, callable); |
|
|
|
|
if (result != CallableProcessingInterceptor.RESULT_NONE) { |
|
|
|
|
@ -331,14 +327,12 @@ public final class WebAsyncManager {
@@ -331,14 +327,12 @@ public final class WebAsyncManager {
|
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
this.asyncWebRequest.addErrorHandler(ex -> { |
|
|
|
|
if (!this.errorHandlingInProgress) { |
|
|
|
|
if (logger.isDebugEnabled()) { |
|
|
|
|
logger.debug("Async request error for " + formatUri(this.asyncWebRequest) + ": " + ex); |
|
|
|
|
} |
|
|
|
|
Object result = interceptorChain.triggerAfterError(this.asyncWebRequest, callable, ex); |
|
|
|
|
result = (result != CallableProcessingInterceptor.RESULT_NONE ? result : ex); |
|
|
|
|
setConcurrentResultAndDispatch(result); |
|
|
|
|
if (logger.isDebugEnabled()) { |
|
|
|
|
logger.debug("Servlet container error notification for " + formatUri(this.asyncWebRequest) + ": " + ex); |
|
|
|
|
} |
|
|
|
|
Object result = interceptorChain.triggerAfterError(this.asyncWebRequest, callable, ex); |
|
|
|
|
result = (result != CallableProcessingInterceptor.RESULT_NONE ? result : ex); |
|
|
|
|
setConcurrentResultAndDispatch(result); |
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
this.asyncWebRequest.addCompletionHandler(() -> |
|
|
|
|
@ -370,31 +364,34 @@ public final class WebAsyncManager {
@@ -370,31 +364,34 @@ public final class WebAsyncManager {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private void setConcurrentResultAndDispatch(@Nullable Object result) { |
|
|
|
|
Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null"); |
|
|
|
|
synchronized (WebAsyncManager.this) { |
|
|
|
|
if (this.concurrentResult != RESULT_NONE) { |
|
|
|
|
if (!this.state.compareAndSet(State.ASYNC_PROCESSING, State.RESULT_SET)) { |
|
|
|
|
if (logger.isDebugEnabled()) { |
|
|
|
|
logger.debug("Async result already set: " + |
|
|
|
|
"[" + this.state.get() + "], ignored result: " + result + |
|
|
|
|
" for " + formatUri(this.asyncWebRequest)); |
|
|
|
|
} |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
this.concurrentResult = result; |
|
|
|
|
this.errorHandlingInProgress = (result instanceof Throwable); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null"); |
|
|
|
|
if (this.asyncWebRequest.isAsyncComplete()) { |
|
|
|
|
this.concurrentResult = result; |
|
|
|
|
if (logger.isDebugEnabled()) { |
|
|
|
|
logger.debug("Async result set but request already complete: " + formatUri(this.asyncWebRequest)); |
|
|
|
|
logger.debug("Async result set to: " + result + " for " + formatUri(this.asyncWebRequest)); |
|
|
|
|
} |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (result instanceof Exception ex && disconnectedClientHelper.checkAndLogClientDisconnectedException(ex)) { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
if (this.asyncWebRequest.isAsyncComplete()) { |
|
|
|
|
if (logger.isDebugEnabled()) { |
|
|
|
|
logger.debug("Async request already completed for " + formatUri(this.asyncWebRequest)); |
|
|
|
|
} |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (logger.isDebugEnabled()) { |
|
|
|
|
logger.debug("Async " + (this.errorHandlingInProgress ? "error" : "result set") + |
|
|
|
|
", dispatch to " + formatUri(this.asyncWebRequest)); |
|
|
|
|
if (logger.isDebugEnabled()) { |
|
|
|
|
logger.debug("Performing async dispatch for " + formatUri(this.asyncWebRequest)); |
|
|
|
|
} |
|
|
|
|
this.asyncWebRequest.dispatch(); |
|
|
|
|
} |
|
|
|
|
this.asyncWebRequest.dispatch(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
@ -417,6 +414,11 @@ public final class WebAsyncManager {
@@ -417,6 +414,11 @@ public final class WebAsyncManager {
|
|
|
|
|
Assert.notNull(deferredResult, "DeferredResult must not be null"); |
|
|
|
|
Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null"); |
|
|
|
|
|
|
|
|
|
if (!this.state.compareAndSet(State.NOT_STARTED, State.ASYNC_PROCESSING)) { |
|
|
|
|
throw new IllegalStateException( |
|
|
|
|
"Unexpected call to startDeferredResultProcessing: [" + this.state.get() + "]"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Long timeout = deferredResult.getTimeoutValue(); |
|
|
|
|
if (timeout != null) { |
|
|
|
|
this.asyncWebRequest.setTimeout(timeout); |
|
|
|
|
@ -430,6 +432,9 @@ public final class WebAsyncManager {
@@ -430,6 +432,9 @@ public final class WebAsyncManager {
|
|
|
|
|
final DeferredResultInterceptorChain interceptorChain = new DeferredResultInterceptorChain(interceptors); |
|
|
|
|
|
|
|
|
|
this.asyncWebRequest.addTimeoutHandler(() -> { |
|
|
|
|
if (logger.isDebugEnabled()) { |
|
|
|
|
logger.debug("Servlet container timeout notification for " + formatUri(this.asyncWebRequest)); |
|
|
|
|
} |
|
|
|
|
try { |
|
|
|
|
interceptorChain.triggerAfterTimeout(this.asyncWebRequest, deferredResult); |
|
|
|
|
} |
|
|
|
|
@ -439,16 +444,17 @@ public final class WebAsyncManager {
@@ -439,16 +444,17 @@ public final class WebAsyncManager {
|
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
this.asyncWebRequest.addErrorHandler(ex -> { |
|
|
|
|
if (!this.errorHandlingInProgress) { |
|
|
|
|
try { |
|
|
|
|
if (!interceptorChain.triggerAfterError(this.asyncWebRequest, deferredResult, ex)) { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
deferredResult.setErrorResult(ex); |
|
|
|
|
} |
|
|
|
|
catch (Throwable interceptorEx) { |
|
|
|
|
setConcurrentResultAndDispatch(interceptorEx); |
|
|
|
|
if (logger.isDebugEnabled()) { |
|
|
|
|
logger.debug("Servlet container error notification for " + formatUri(this.asyncWebRequest)); |
|
|
|
|
} |
|
|
|
|
try { |
|
|
|
|
if (!interceptorChain.triggerAfterError(this.asyncWebRequest, deferredResult, ex)) { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
deferredResult.setErrorResult(ex); |
|
|
|
|
} |
|
|
|
|
catch (Throwable interceptorEx) { |
|
|
|
|
setConcurrentResultAndDispatch(interceptorEx); |
|
|
|
|
} |
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
@ -474,10 +480,13 @@ public final class WebAsyncManager {
@@ -474,10 +480,13 @@ public final class WebAsyncManager {
|
|
|
|
|
synchronized (WebAsyncManager.this) { |
|
|
|
|
this.concurrentResult = RESULT_NONE; |
|
|
|
|
this.concurrentResultContext = processingContext; |
|
|
|
|
this.errorHandlingInProgress = false; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null"); |
|
|
|
|
if (logger.isDebugEnabled()) { |
|
|
|
|
logger.debug("Started async request for " + formatUri(this.asyncWebRequest)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
this.asyncWebRequest.startAsync(); |
|
|
|
|
if (logger.isDebugEnabled()) { |
|
|
|
|
logger.debug("Started async request"); |
|
|
|
|
@ -489,4 +498,31 @@ public final class WebAsyncManager {
@@ -489,4 +498,31 @@ public final class WebAsyncManager {
|
|
|
|
|
return (request != null ? request.getRequestURI() : "servlet container"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Represents a state for {@link WebAsyncManager} to be in. |
|
|
|
|
* <p><pre> |
|
|
|
|
* NOT_STARTED <------+ |
|
|
|
|
* | | |
|
|
|
|
* v | |
|
|
|
|
* ASYNC_PROCESSING | |
|
|
|
|
* | | |
|
|
|
|
* v | |
|
|
|
|
* RESULT_SET -------+ |
|
|
|
|
* </pre> |
|
|
|
|
* @since 5.3.33 |
|
|
|
|
*/ |
|
|
|
|
private enum State { |
|
|
|
|
|
|
|
|
|
/** No async processing in progress. */ |
|
|
|
|
NOT_STARTED, |
|
|
|
|
|
|
|
|
|
/** Async handling has started, but the result hasn't been set yet. */ |
|
|
|
|
ASYNC_PROCESSING, |
|
|
|
|
|
|
|
|
|
/** The result is set, and an async dispatch was performed, unless there is a network error. */ |
|
|
|
|
RESULT_SET |
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|