From 5a44897c55f29dddc5e945c182a4584aab1e7fb1 Mon Sep 17 00:00:00 2001 From: rstoyanchev Date: Thu, 2 Jan 2025 13:32:41 +0000 Subject: [PATCH 1/4] Polishing in WebAsyncManager See gh-34192 --- .../context/request/async/DeferredResult.java | 106 ++++++++++-------- .../request/async/WebAsyncManager.java | 76 ++++++------- .../request/async/DeferredResultTests.java | 8 +- .../annotation/ReactiveTypeHandler.java | 10 +- 4 files changed, 105 insertions(+), 95 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/web/context/request/async/DeferredResult.java b/spring-web/src/main/java/org/springframework/web/context/request/async/DeferredResult.java index 3fbd694e8b7..d5e16c10666 100644 --- a/spring-web/src/main/java/org/springframework/web/context/request/async/DeferredResult.java +++ b/spring-web/src/main/java/org/springframework/web/context/request/async/DeferredResult.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2024 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -288,65 +288,75 @@ public class DeferredResult { } - final DeferredResultProcessingInterceptor getInterceptor() { - return new DeferredResultProcessingInterceptor() { - @Override - public boolean handleTimeout(NativeWebRequest request, DeferredResult deferredResult) { - boolean continueProcessing = true; - try { - if (timeoutCallback != null) { - timeoutCallback.run(); - } - } - finally { - Object value = timeoutResult.get(); - if (value != RESULT_NONE) { - continueProcessing = false; - try { - setResultInternal(value); - } - catch (Throwable ex) { - logger.debug("Failed to handle timeout result", ex); - } - } + final DeferredResultProcessingInterceptor getLifecycleInterceptor() { + return new LifecycleInterceptor(); + } + + + /** + * Handles a DeferredResult value when set. + */ + @FunctionalInterface + public interface DeferredResultHandler { + + void handleResult(@Nullable Object result); + } + + + /** + * Instance interceptor to receive Servlet container notifications. + */ + private class LifecycleInterceptor implements DeferredResultProcessingInterceptor { + + @Override + public boolean handleTimeout(NativeWebRequest request, DeferredResult result) { + boolean continueProcessing = true; + try { + if (timeoutCallback != null) { + timeoutCallback.run(); } - return continueProcessing; } - @Override - public boolean handleError(NativeWebRequest request, DeferredResult deferredResult, Throwable t) { - try { - if (errorCallback != null) { - errorCallback.accept(t); - } - } - finally { + finally { + Object value = timeoutResult.get(); + if (value != RESULT_NONE) { + continueProcessing = false; try { - setResultInternal(t); + setResultInternal(value); } catch (Throwable ex) { - logger.debug("Failed to handle error result", ex); + logger.debug("Failed to handle timeout result", ex); } } - return false; } - @Override - public void afterCompletion(NativeWebRequest request, DeferredResult deferredResult) { - expired = true; - if (completionCallback != null) { - completionCallback.run(); + return continueProcessing; + } + + @Override + public boolean handleError(NativeWebRequest request, DeferredResult result, Throwable t) { + try { + if (errorCallback != null) { + errorCallback.accept(t); } } - }; - } - + finally { + try { + setResultInternal(t); + } + catch (Throwable ex) { + logger.debug("Failed to handle error result", ex); + } + } + return false; + } - /** - * Handles a DeferredResult value when set. - */ - @FunctionalInterface - public interface DeferredResultHandler { + @Override + public void afterCompletion(NativeWebRequest request, DeferredResult result) { + expired = true; + if (completionCallback != null) { + completionCallback.run(); + } + } - void handleResult(@Nullable Object result); } } diff --git a/spring-web/src/main/java/org/springframework/web/context/request/async/WebAsyncManager.java b/spring-web/src/main/java/org/springframework/web/context/request/async/WebAsyncManager.java index a7438441814..e27899fb255 100644 --- a/spring-web/src/main/java/org/springframework/web/context/request/async/WebAsyncManager.java +++ b/spring-web/src/main/java/org/springframework/web/context/request/async/WebAsyncManager.java @@ -383,36 +383,6 @@ public final class WebAsyncManager { } } - private void setConcurrentResultAndDispatch(@Nullable Object result) { - Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null"); - synchronized (WebAsyncManager.this) { - if (!this.state.compareAndSet(State.ASYNC_PROCESSING, State.RESULT_SET)) { - if (logger.isDebugEnabled()) { - logger.debug("Async result already set: [" + this.state.get() + - "], ignored result for " + formatUri(this.asyncWebRequest)); - } - return; - } - - this.concurrentResult = result; - if (logger.isDebugEnabled()) { - logger.debug("Async result set for " + formatUri(this.asyncWebRequest)); - } - - if (this.asyncWebRequest.isAsyncComplete()) { - if (logger.isDebugEnabled()) { - logger.debug("Async request already completed for " + formatUri(this.asyncWebRequest)); - } - return; - } - - if (logger.isDebugEnabled()) { - logger.debug("Performing async dispatch for " + formatUri(this.asyncWebRequest)); - } - this.asyncWebRequest.dispatch(); - } - } - /** * Start concurrent request processing and initialize the given * {@link DeferredResult} with a {@link DeferredResultHandler} that saves @@ -445,7 +415,7 @@ public final class WebAsyncManager { } List interceptors = new ArrayList<>(); - interceptors.add(deferredResult.getInterceptor()); + interceptors.add(deferredResult.getLifecycleInterceptor()); interceptors.addAll(this.deferredResultInterceptors.values()); interceptors.add(timeoutDeferredResultInterceptor); @@ -510,6 +480,36 @@ public final class WebAsyncManager { this.asyncWebRequest.startAsync(); } + private void setConcurrentResultAndDispatch(@Nullable Object result) { + Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null"); + synchronized (WebAsyncManager.this) { + if (!this.state.compareAndSet(State.ASYNC_PROCESSING, State.RESULT_SET)) { + if (logger.isDebugEnabled()) { + logger.debug("Async result already set: [" + this.state.get() + "], " + + "ignored result for " + formatUri(this.asyncWebRequest)); + } + return; + } + + this.concurrentResult = result; + if (logger.isDebugEnabled()) { + logger.debug("Async result set for " + formatUri(this.asyncWebRequest)); + } + + if (this.asyncWebRequest.isAsyncComplete()) { + if (logger.isDebugEnabled()) { + logger.debug("Async request already completed for " + formatUri(this.asyncWebRequest)); + } + return; + } + + if (logger.isDebugEnabled()) { + logger.debug("Performing async dispatch for " + formatUri(this.asyncWebRequest)); + } + this.asyncWebRequest.dispatch(); + } + } + private static String formatUri(AsyncWebRequest asyncWebRequest) { HttpServletRequest request = asyncWebRequest.getNativeRequest(HttpServletRequest.class); return (request != null ? "\"" + request.getRequestURI() + "\"" : "servlet container"); @@ -519,13 +519,13 @@ public final class WebAsyncManager { /** * Represents a state for {@link WebAsyncManager} to be in. *

-	 *        NOT_STARTED <------+
-	 *             |             |
-	 *             v             |
-	 *      ASYNC_PROCESSING     |
-	 *             |             |
-	 *             v             |
-	 *         RESULT_SET -------+
+	 *     +------> NOT_STARTED <------+
+	 *     |             |             |
+	 *     |             v             |
+	 *     |      ASYNC_PROCESSING     |
+	 *     |             |             |
+	 *     |             v             |
+	 *     <-------+ RESULT_SET -------+
 	 * 
* @since 5.3.33 */ diff --git a/spring-web/src/test/java/org/springframework/web/context/request/async/DeferredResultTests.java b/spring-web/src/test/java/org/springframework/web/context/request/async/DeferredResultTests.java index c5f632d60f1..24621bd7509 100644 --- a/spring-web/src/test/java/org/springframework/web/context/request/async/DeferredResultTests.java +++ b/spring-web/src/test/java/org/springframework/web/context/request/async/DeferredResultTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2024 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -93,7 +93,7 @@ class DeferredResultTests { DeferredResult result = new DeferredResult<>(); result.onCompletion(() -> sb.append("completion event")); - result.getInterceptor().afterCompletion(null, null); + result.getLifecycleInterceptor().afterCompletion(null, null); assertThat(result.isSetOrExpired()).isTrue(); assertThat(sb.toString()).isEqualTo("completion event"); @@ -109,7 +109,7 @@ class DeferredResultTests { result.setResultHandler(handler); result.onTimeout(() -> sb.append("timeout event")); - result.getInterceptor().handleTimeout(null, null); + result.getLifecycleInterceptor().handleTimeout(null, null); assertThat(sb.toString()).isEqualTo("timeout event"); assertThat(result.setResult("hello")).as("Should not be able to set result a second time").isFalse(); @@ -127,7 +127,7 @@ class DeferredResultTests { Exception e = new Exception(); result.onError(t -> sb.append("error event")); - result.getInterceptor().handleError(null, null, e); + result.getLifecycleInterceptor().handleError(null, null, e); assertThat(sb.toString()).isEqualTo("error event"); assertThat(result.setResult("hello")).as("Should not be able to set result a second time").isFalse(); diff --git a/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandler.java b/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandler.java index 5e3e589b8b9..2a91b271c3d 100644 --- a/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandler.java +++ b/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2024 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -360,8 +360,8 @@ class ReactiveTypeHandler { this.subscription.request(1); } catch (final Throwable ex) { - if (logger.isTraceEnabled()) { - logger.trace("Send for " + this.emitter + " failed: " + ex); + if (logger.isDebugEnabled()) { + logger.debug("Send for " + this.emitter + " failed: " + ex); } terminate(); this.emitter.completeWithError(ex); @@ -374,8 +374,8 @@ class ReactiveTypeHandler { Throwable ex = this.error; this.error = null; if (ex != null) { - if (logger.isTraceEnabled()) { - logger.trace("Publisher for " + this.emitter + " failed: " + ex); + if (logger.isDebugEnabled()) { + logger.debug("Publisher for " + this.emitter + " failed: " + ex); } this.emitter.completeWithError(ex); } From d94e04d97afaa745bcbd1d7ec5609d5256517255 Mon Sep 17 00:00:00 2001 From: rstoyanchev Date: Mon, 6 Jan 2025 10:55:52 +0000 Subject: [PATCH 2/4] Minor refactoring in WebAsyncManager There is no need to set the DeferredResult from WebAsyncManager in an onError notification because it is already done from the Lifecycle interceptor in DeferredResult. See gh-34192 --- .../web/context/request/async/WebAsyncManager.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/web/context/request/async/WebAsyncManager.java b/spring-web/src/main/java/org/springframework/web/context/request/async/WebAsyncManager.java index e27899fb255..beed2f5976e 100644 --- a/spring-web/src/main/java/org/springframework/web/context/request/async/WebAsyncManager.java +++ b/spring-web/src/main/java/org/springframework/web/context/request/async/WebAsyncManager.java @@ -438,10 +438,7 @@ public final class WebAsyncManager { logger.debug("Servlet container error notification for " + formatUri(this.asyncWebRequest)); } try { - if (!interceptorChain.triggerAfterError(this.asyncWebRequest, deferredResult, ex)) { - return; - } - deferredResult.setErrorResult(ex); + interceptorChain.triggerAfterError(this.asyncWebRequest, deferredResult, ex); } catch (Throwable interceptorEx) { setConcurrentResultAndDispatch(interceptorEx); From 6ec7dcf2c1ca98613999621f64f9be0960d19407 Mon Sep 17 00:00:00 2001 From: rstoyanchev Date: Fri, 3 Jan 2025 16:04:51 +0000 Subject: [PATCH 3/4] Synchronize in WebAsyncManager onError/onTimeout On connection loss, in a race between application thread and onError callback trying to set the DeferredResult and dispatch, the onError callback must not exit until dispatch completes. Currently, it may do so because the DeferredResult has checks to bypasses locking or even trying to dispatch if result is already set. Closes gh-34192 --- .../web/context/request/async/WebAsyncManager.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/spring-web/src/main/java/org/springframework/web/context/request/async/WebAsyncManager.java b/spring-web/src/main/java/org/springframework/web/context/request/async/WebAsyncManager.java index beed2f5976e..2cb1ae18ea0 100644 --- a/spring-web/src/main/java/org/springframework/web/context/request/async/WebAsyncManager.java +++ b/spring-web/src/main/java/org/springframework/web/context/request/async/WebAsyncManager.java @@ -427,6 +427,11 @@ public final class WebAsyncManager { } try { interceptorChain.triggerAfterTimeout(this.asyncWebRequest, deferredResult); + synchronized (WebAsyncManager.this) { + // If application thread set the DeferredResult first in a race, + // we must still not return until setConcurrentResultAndDispatch is done + return; + } } catch (Throwable ex) { setConcurrentResultAndDispatch(ex); @@ -439,6 +444,11 @@ public final class WebAsyncManager { } try { interceptorChain.triggerAfterError(this.asyncWebRequest, deferredResult, ex); + synchronized (WebAsyncManager.this) { + // If application thread set the DeferredResult first in a race, + // we must still not return until setConcurrentResultAndDispatch is done + return; + } } catch (Throwable interceptorEx) { setConcurrentResultAndDispatch(interceptorEx); From a985b739399468d6ee5c84adb9202734e604105e Mon Sep 17 00:00:00 2001 From: rstoyanchev Date: Mon, 6 Jan 2025 12:13:29 +0000 Subject: [PATCH 4/4] Improve logging in ReactiveTypeHandler See gh-34188 --- .../annotation/ReactiveTypeHandler.java | 27 ++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandler.java b/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandler.java index 2a91b271c3d..026b1ec7d0d 100644 --- a/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandler.java +++ b/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandler.java @@ -364,7 +364,14 @@ class ReactiveTypeHandler { logger.debug("Send for " + this.emitter + " failed: " + ex); } terminate(); - this.emitter.completeWithError(ex); + try { + this.emitter.completeWithError(ex); + } + catch (Exception ex2) { + if (logger.isDebugEnabled()) { + logger.debug("Failure from emitter completeWithError: " + ex2); + } + } return; } } @@ -377,13 +384,27 @@ class ReactiveTypeHandler { if (logger.isDebugEnabled()) { logger.debug("Publisher for " + this.emitter + " failed: " + ex); } - this.emitter.completeWithError(ex); + try { + this.emitter.completeWithError(ex); + } + catch (Exception ex2) { + if (logger.isDebugEnabled()) { + logger.debug("Failure from emitter completeWithError: " + ex2); + } + } } else { if (logger.isTraceEnabled()) { logger.trace("Publisher for " + this.emitter + " completed"); } - this.emitter.complete(); + try { + this.emitter.complete(); + } + catch (Exception ex2) { + if (logger.isDebugEnabled()) { + logger.debug("Failure from emitter complete: " + ex2); + } + } } return; }