Browse Source

Improve concurrent handling of result in WebAsyncManager

1. Use state transitions
2. Increase synchronized scope in setConcurrentResultAndDispatch

See gh-32341
pull/33048/head
rstoyanchev 2 years ago
parent
commit
1a5661d426
  1. 138
      spring-web/src/main/java/org/springframework/web/context/request/async/WebAsyncManager.java

138
spring-web/src/main/java/org/springframework/web/context/request/async/WebAsyncManager.java

@ -22,6 +22,7 @@ import java.util.List; @@ -22,6 +22,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import jakarta.servlet.http.HttpServletRequest;
import org.apache.commons.logging.Log;
@ -34,7 +35,6 @@ import org.springframework.lang.Nullable; @@ -34,7 +35,6 @@ import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.async.DeferredResult.DeferredResultHandler;
import org.springframework.web.util.DisconnectedClientHelper;
/**
* The central class for managing asynchronous request processing, mainly intended
@ -68,16 +68,6 @@ public final class WebAsyncManager { @@ -68,16 +68,6 @@ public final class WebAsyncManager {
private static final Log logger = LogFactory.getLog(WebAsyncManager.class);
/**
* Log category to use for network failure after a client has gone away.
* @see DisconnectedClientHelper
*/
private static final String DISCONNECTED_CLIENT_LOG_CATEGORY =
"org.springframework.web.server.DisconnectedClient";
private static final DisconnectedClientHelper disconnectedClientHelper =
new DisconnectedClientHelper(DISCONNECTED_CLIENT_LOG_CATEGORY);
private static final CallableProcessingInterceptor timeoutCallableInterceptor =
new TimeoutCallableProcessingInterceptor();
@ -98,12 +88,7 @@ public final class WebAsyncManager { @@ -98,12 +88,7 @@ public final class WebAsyncManager {
@Nullable
private volatile Object[] concurrentResultContext;
/*
* Whether the concurrentResult is an error. If such errors remain unhandled, some
* Servlet containers will call AsyncListener#onError at the end, after the ASYNC
* and/or the ERROR dispatch (Boot's case), and we need to ignore those.
*/
private volatile boolean errorHandlingInProgress;
private final AtomicReference<State> state = new AtomicReference<>(State.NOT_STARTED);
private final Map<Object, CallableProcessingInterceptor> callableInterceptors = new LinkedHashMap<>();
@ -265,6 +250,12 @@ public final class WebAsyncManager { @@ -265,6 +250,12 @@ public final class WebAsyncManager {
* {@linkplain #getConcurrentResultContext() concurrentResultContext}.
*/
public void clearConcurrentResult() {
if (!this.state.compareAndSet(State.RESULT_SET, State.NOT_STARTED)) {
if (logger.isDebugEnabled()) {
logger.debug("Unexpected call to clear: [" + this.state.get() + "]");
}
return;
}
synchronized (WebAsyncManager.this) {
this.concurrentResult = RESULT_NONE;
this.concurrentResultContext = null;
@ -305,6 +296,11 @@ public final class WebAsyncManager { @@ -305,6 +296,11 @@ public final class WebAsyncManager {
Assert.notNull(webAsyncTask, "WebAsyncTask must not be null");
Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null");
if (!this.state.compareAndSet(State.NOT_STARTED, State.ASYNC_PROCESSING)) {
throw new IllegalStateException(
"Unexpected call to startCallableProcessing: [" + this.state.get() + "]");
}
Long timeout = webAsyncTask.getTimeout();
if (timeout != null) {
this.asyncWebRequest.setTimeout(timeout);
@ -328,7 +324,7 @@ public final class WebAsyncManager { @@ -328,7 +324,7 @@ public final class WebAsyncManager {
this.asyncWebRequest.addTimeoutHandler(() -> {
if (logger.isDebugEnabled()) {
logger.debug("Async request timeout for " + formatUri(this.asyncWebRequest));
logger.debug("Servlet container timeout notification for " + formatUri(this.asyncWebRequest));
}
Object result = interceptorChain.triggerAfterTimeout(this.asyncWebRequest, callable);
if (result != CallableProcessingInterceptor.RESULT_NONE) {
@ -337,14 +333,12 @@ public final class WebAsyncManager { @@ -337,14 +333,12 @@ public final class WebAsyncManager {
});
this.asyncWebRequest.addErrorHandler(ex -> {
if (!this.errorHandlingInProgress) {
if (logger.isDebugEnabled()) {
logger.debug("Async request error for " + formatUri(this.asyncWebRequest) + ": " + ex);
}
Object result = interceptorChain.triggerAfterError(this.asyncWebRequest, callable, ex);
result = (result != CallableProcessingInterceptor.RESULT_NONE ? result : ex);
setConcurrentResultAndDispatch(result);
if (logger.isDebugEnabled()) {
logger.debug("Servlet container error notification for " + formatUri(this.asyncWebRequest) + ": " + ex);
}
Object result = interceptorChain.triggerAfterError(this.asyncWebRequest, callable, ex);
result = (result != CallableProcessingInterceptor.RESULT_NONE ? result : ex);
setConcurrentResultAndDispatch(result);
});
this.asyncWebRequest.addCompletionHandler(() ->
@ -396,31 +390,34 @@ public final class WebAsyncManager { @@ -396,31 +390,34 @@ public final class WebAsyncManager {
}
private void setConcurrentResultAndDispatch(@Nullable Object result) {
Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null");
synchronized (WebAsyncManager.this) {
if (this.concurrentResult != RESULT_NONE) {
if (!this.state.compareAndSet(State.ASYNC_PROCESSING, State.RESULT_SET)) {
if (logger.isDebugEnabled()) {
logger.debug("Async result already set: " +
"[" + this.state.get() + "], ignored result: " + result +
" for " + formatUri(this.asyncWebRequest));
}
return;
}
this.concurrentResult = result;
this.errorHandlingInProgress = (result instanceof Throwable);
}
Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null");
if (this.asyncWebRequest.isAsyncComplete()) {
this.concurrentResult = result;
if (logger.isDebugEnabled()) {
logger.debug("Async result set but request already complete: " + formatUri(this.asyncWebRequest));
logger.debug("Async result set to: " + result + " for " + formatUri(this.asyncWebRequest));
}
return;
}
if (result instanceof Exception ex && disconnectedClientHelper.checkAndLogClientDisconnectedException(ex)) {
return;
}
if (this.asyncWebRequest.isAsyncComplete()) {
if (logger.isDebugEnabled()) {
logger.debug("Async request already completed for " + formatUri(this.asyncWebRequest));
}
return;
}
if (logger.isDebugEnabled()) {
logger.debug("Async " + (this.errorHandlingInProgress ? "error" : "result set") +
", dispatch to " + formatUri(this.asyncWebRequest));
if (logger.isDebugEnabled()) {
logger.debug("Performing async dispatch for " + formatUri(this.asyncWebRequest));
}
this.asyncWebRequest.dispatch();
}
this.asyncWebRequest.dispatch();
}
/**
@ -443,6 +440,11 @@ public final class WebAsyncManager { @@ -443,6 +440,11 @@ public final class WebAsyncManager {
Assert.notNull(deferredResult, "DeferredResult must not be null");
Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null");
if (!this.state.compareAndSet(State.NOT_STARTED, State.ASYNC_PROCESSING)) {
throw new IllegalStateException(
"Unexpected call to startDeferredResultProcessing: [" + this.state.get() + "]");
}
Long timeout = deferredResult.getTimeoutValue();
if (timeout != null) {
this.asyncWebRequest.setTimeout(timeout);
@ -456,6 +458,9 @@ public final class WebAsyncManager { @@ -456,6 +458,9 @@ public final class WebAsyncManager {
final DeferredResultInterceptorChain interceptorChain = new DeferredResultInterceptorChain(interceptors);
this.asyncWebRequest.addTimeoutHandler(() -> {
if (logger.isDebugEnabled()) {
logger.debug("Servlet container timeout notification for " + formatUri(this.asyncWebRequest));
}
try {
interceptorChain.triggerAfterTimeout(this.asyncWebRequest, deferredResult);
}
@ -465,16 +470,17 @@ public final class WebAsyncManager { @@ -465,16 +470,17 @@ public final class WebAsyncManager {
});
this.asyncWebRequest.addErrorHandler(ex -> {
if (!this.errorHandlingInProgress) {
try {
if (!interceptorChain.triggerAfterError(this.asyncWebRequest, deferredResult, ex)) {
return;
}
deferredResult.setErrorResult(ex);
}
catch (Throwable interceptorEx) {
setConcurrentResultAndDispatch(interceptorEx);
if (logger.isDebugEnabled()) {
logger.debug("Servlet container error notification for " + formatUri(this.asyncWebRequest));
}
try {
if (!interceptorChain.triggerAfterError(this.asyncWebRequest, deferredResult, ex)) {
return;
}
deferredResult.setErrorResult(ex);
}
catch (Throwable interceptorEx) {
setConcurrentResultAndDispatch(interceptorEx);
}
});
@ -500,10 +506,13 @@ public final class WebAsyncManager { @@ -500,10 +506,13 @@ public final class WebAsyncManager {
synchronized (WebAsyncManager.this) {
this.concurrentResult = RESULT_NONE;
this.concurrentResultContext = processingContext;
this.errorHandlingInProgress = false;
}
Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null");
if (logger.isDebugEnabled()) {
logger.debug("Started async request for " + formatUri(this.asyncWebRequest));
}
this.asyncWebRequest.startAsync();
if (logger.isDebugEnabled()) {
logger.debug("Started async request");
@ -515,4 +524,31 @@ public final class WebAsyncManager { @@ -515,4 +524,31 @@ public final class WebAsyncManager {
return (request != null ? request.getRequestURI() : "servlet container");
}
/**
* Represents a state for {@link WebAsyncManager} to be in.
* <p><pre>
* NOT_STARTED <------+
* | |
* v |
* ASYNC_PROCESSING |
* | |
* v |
* RESULT_SET -------+
* </pre>
* @since 5.3.33
*/
private enum State {
/** No async processing in progress. */
NOT_STARTED,
/** Async handling has started, but the result hasn't been set yet. */
ASYNC_PROCESSING,
/** The result is set, and an async dispatch was performed, unless there is a network error. */
RESULT_SET
}
}

Loading…
Cancel
Save