Browse Source

Merge branch '6.2.x'

pull/34207/head
rstoyanchev 12 months ago
parent
commit
e2ea87fe4f
  1. 106
      spring-web/src/main/java/org/springframework/web/context/request/async/DeferredResult.java
  2. 87
      spring-web/src/main/java/org/springframework/web/context/request/async/WebAsyncManager.java
  3. 8
      spring-web/src/test/java/org/springframework/web/context/request/async/DeferredResultTests.java
  4. 37
      spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandler.java

106
spring-web/src/main/java/org/springframework/web/context/request/async/DeferredResult.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2024 the original author or authors.
* Copyright 2002-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -280,65 +280,75 @@ public class DeferredResult<T> { @@ -280,65 +280,75 @@ public class DeferredResult<T> {
}
final DeferredResultProcessingInterceptor getInterceptor() {
return new DeferredResultProcessingInterceptor() {
@Override
public <S> boolean handleTimeout(NativeWebRequest request, DeferredResult<S> deferredResult) {
boolean continueProcessing = true;
try {
if (timeoutCallback != null) {
timeoutCallback.run();
}
}
finally {
Object value = timeoutResult.get();
if (value != RESULT_NONE) {
continueProcessing = false;
try {
setResultInternal(value);
}
catch (Throwable ex) {
logger.debug("Failed to handle timeout result", ex);
}
}
final DeferredResultProcessingInterceptor getLifecycleInterceptor() {
return new LifecycleInterceptor();
}
/**
* Handles a DeferredResult value when set.
*/
@FunctionalInterface
public interface DeferredResultHandler {
void handleResult(@Nullable Object result);
}
/**
* Instance interceptor to receive Servlet container notifications.
*/
private class LifecycleInterceptor implements DeferredResultProcessingInterceptor {
@Override
public <S> boolean handleTimeout(NativeWebRequest request, DeferredResult<S> result) {
boolean continueProcessing = true;
try {
if (timeoutCallback != null) {
timeoutCallback.run();
}
return continueProcessing;
}
@Override
public <S> boolean handleError(NativeWebRequest request, DeferredResult<S> deferredResult, Throwable t) {
try {
if (errorCallback != null) {
errorCallback.accept(t);
}
}
finally {
finally {
Object value = timeoutResult.get();
if (value != RESULT_NONE) {
continueProcessing = false;
try {
setResultInternal(t);
setResultInternal(value);
}
catch (Throwable ex) {
logger.debug("Failed to handle error result", ex);
logger.debug("Failed to handle timeout result", ex);
}
}
return false;
}
@Override
public <S> void afterCompletion(NativeWebRequest request, DeferredResult<S> deferredResult) {
expired = true;
if (completionCallback != null) {
completionCallback.run();
return continueProcessing;
}
@Override
public <S> boolean handleError(NativeWebRequest request, DeferredResult<S> result, Throwable t) {
try {
if (errorCallback != null) {
errorCallback.accept(t);
}
}
};
}
finally {
try {
setResultInternal(t);
}
catch (Throwable ex) {
logger.debug("Failed to handle error result", ex);
}
}
return false;
}
/**
* Handles a DeferredResult value when set.
*/
@FunctionalInterface
public interface DeferredResultHandler {
@Override
public <S> void afterCompletion(NativeWebRequest request, DeferredResult<S> result) {
expired = true;
if (completionCallback != null) {
completionCallback.run();
}
}
void handleResult(@Nullable Object result);
}
}

87
spring-web/src/main/java/org/springframework/web/context/request/async/WebAsyncManager.java

@ -375,36 +375,6 @@ public final class WebAsyncManager { @@ -375,36 +375,6 @@ public final class WebAsyncManager {
}
}
private void setConcurrentResultAndDispatch(@Nullable Object result) {
Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null");
synchronized (WebAsyncManager.this) {
if (!this.state.compareAndSet(State.ASYNC_PROCESSING, State.RESULT_SET)) {
if (logger.isDebugEnabled()) {
logger.debug("Async result already set: [" + this.state.get() +
"], ignored result for " + formatUri(this.asyncWebRequest));
}
return;
}
this.concurrentResult = result;
if (logger.isDebugEnabled()) {
logger.debug("Async result set for " + formatUri(this.asyncWebRequest));
}
if (this.asyncWebRequest.isAsyncComplete()) {
if (logger.isDebugEnabled()) {
logger.debug("Async request already completed for " + formatUri(this.asyncWebRequest));
}
return;
}
if (logger.isDebugEnabled()) {
logger.debug("Performing async dispatch for " + formatUri(this.asyncWebRequest));
}
this.asyncWebRequest.dispatch();
}
}
/**
* Start concurrent request processing and initialize the given
* {@link DeferredResult} with a {@link DeferredResultHandler} that saves
@ -437,7 +407,7 @@ public final class WebAsyncManager { @@ -437,7 +407,7 @@ public final class WebAsyncManager {
}
List<DeferredResultProcessingInterceptor> interceptors = new ArrayList<>();
interceptors.add(deferredResult.getInterceptor());
interceptors.add(deferredResult.getLifecycleInterceptor());
interceptors.addAll(this.deferredResultInterceptors.values());
interceptors.add(timeoutDeferredResultInterceptor);
@ -449,6 +419,11 @@ public final class WebAsyncManager { @@ -449,6 +419,11 @@ public final class WebAsyncManager {
}
try {
interceptorChain.triggerAfterTimeout(this.asyncWebRequest, deferredResult);
synchronized (WebAsyncManager.this) {
// If application thread set the DeferredResult first in a race,
// we must still not return until setConcurrentResultAndDispatch is done
return;
}
}
catch (Throwable ex) {
setConcurrentResultAndDispatch(ex);
@ -460,10 +435,12 @@ public final class WebAsyncManager { @@ -460,10 +435,12 @@ public final class WebAsyncManager {
logger.debug("Servlet container error notification for " + formatUri(this.asyncWebRequest));
}
try {
if (!interceptorChain.triggerAfterError(this.asyncWebRequest, deferredResult, ex)) {
interceptorChain.triggerAfterError(this.asyncWebRequest, deferredResult, ex);
synchronized (WebAsyncManager.this) {
// If application thread set the DeferredResult first in a race,
// we must still not return until setConcurrentResultAndDispatch is done
return;
}
deferredResult.setErrorResult(ex);
}
catch (Throwable interceptorEx) {
setConcurrentResultAndDispatch(interceptorEx);
@ -502,6 +479,36 @@ public final class WebAsyncManager { @@ -502,6 +479,36 @@ public final class WebAsyncManager {
this.asyncWebRequest.startAsync();
}
private void setConcurrentResultAndDispatch(@Nullable Object result) {
Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null");
synchronized (WebAsyncManager.this) {
if (!this.state.compareAndSet(State.ASYNC_PROCESSING, State.RESULT_SET)) {
if (logger.isDebugEnabled()) {
logger.debug("Async result already set: [" + this.state.get() + "], " +
"ignored result for " + formatUri(this.asyncWebRequest));
}
return;
}
this.concurrentResult = result;
if (logger.isDebugEnabled()) {
logger.debug("Async result set for " + formatUri(this.asyncWebRequest));
}
if (this.asyncWebRequest.isAsyncComplete()) {
if (logger.isDebugEnabled()) {
logger.debug("Async request already completed for " + formatUri(this.asyncWebRequest));
}
return;
}
if (logger.isDebugEnabled()) {
logger.debug("Performing async dispatch for " + formatUri(this.asyncWebRequest));
}
this.asyncWebRequest.dispatch();
}
}
private static String formatUri(AsyncWebRequest asyncWebRequest) {
HttpServletRequest request = asyncWebRequest.getNativeRequest(HttpServletRequest.class);
return (request != null ? "\"" + request.getRequestURI() + "\"" : "servlet container");
@ -511,13 +518,13 @@ public final class WebAsyncManager { @@ -511,13 +518,13 @@ public final class WebAsyncManager {
/**
* Represents a state for {@link WebAsyncManager} to be in.
* <p><pre>
* NOT_STARTED <------+
* | |
* v |
* ASYNC_PROCESSING |
* | |
* v |
* RESULT_SET -------+
* +------> NOT_STARTED <------+
* | | |
* | v |
* | ASYNC_PROCESSING |
* | | |
* | v |
* <-------+ RESULT_SET -------+
* </pre>
* @since 5.3.33
*/

8
spring-web/src/test/java/org/springframework/web/context/request/async/DeferredResultTests.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2024 the original author or authors.
* Copyright 2002-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -93,7 +93,7 @@ class DeferredResultTests { @@ -93,7 +93,7 @@ class DeferredResultTests {
DeferredResult<String> result = new DeferredResult<>();
result.onCompletion(() -> sb.append("completion event"));
result.getInterceptor().afterCompletion(null, null);
result.getLifecycleInterceptor().afterCompletion(null, null);
assertThat(result.isSetOrExpired()).isTrue();
assertThat(sb.toString()).isEqualTo("completion event");
@ -109,7 +109,7 @@ class DeferredResultTests { @@ -109,7 +109,7 @@ class DeferredResultTests {
result.setResultHandler(handler);
result.onTimeout(() -> sb.append("timeout event"));
result.getInterceptor().handleTimeout(null, null);
result.getLifecycleInterceptor().handleTimeout(null, null);
assertThat(sb.toString()).isEqualTo("timeout event");
assertThat(result.setResult("hello")).as("Should not be able to set result a second time").isFalse();
@ -127,7 +127,7 @@ class DeferredResultTests { @@ -127,7 +127,7 @@ class DeferredResultTests {
Exception e = new Exception();
result.onError(t -> sb.append("error event"));
result.getInterceptor().handleError(null, null, e);
result.getLifecycleInterceptor().handleError(null, null, e);
assertThat(sb.toString()).isEqualTo("error event");
assertThat(result.setResult("hello")).as("Should not be able to set result a second time").isFalse();

37
spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandler.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2024 the original author or authors.
* Copyright 2002-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -350,11 +350,18 @@ class ReactiveTypeHandler { @@ -350,11 +350,18 @@ class ReactiveTypeHandler {
this.subscription.request(1);
}
catch (final Throwable ex) {
if (logger.isTraceEnabled()) {
logger.trace("Send for " + this.emitter + " failed: " + ex);
if (logger.isDebugEnabled()) {
logger.debug("Send for " + this.emitter + " failed: " + ex);
}
terminate();
this.emitter.completeWithError(ex);
try {
this.emitter.completeWithError(ex);
}
catch (Exception ex2) {
if (logger.isDebugEnabled()) {
logger.debug("Failure from emitter completeWithError: " + ex2);
}
}
return;
}
}
@ -364,16 +371,30 @@ class ReactiveTypeHandler { @@ -364,16 +371,30 @@ class ReactiveTypeHandler {
Throwable ex = this.error;
this.error = null;
if (ex != null) {
if (logger.isTraceEnabled()) {
logger.trace("Publisher for " + this.emitter + " failed: " + ex);
if (logger.isDebugEnabled()) {
logger.debug("Publisher for " + this.emitter + " failed: " + ex);
}
try {
this.emitter.completeWithError(ex);
}
catch (Exception ex2) {
if (logger.isDebugEnabled()) {
logger.debug("Failure from emitter completeWithError: " + ex2);
}
}
this.emitter.completeWithError(ex);
}
else {
if (logger.isTraceEnabled()) {
logger.trace("Publisher for " + this.emitter + " completed");
}
this.emitter.complete();
try {
this.emitter.complete();
}
catch (Exception ex2) {
if (logger.isDebugEnabled()) {
logger.debug("Failure from emitter complete: " + ex2);
}
}
}
return;
}

Loading…
Cancel
Save