Browse Source

Dispatch in UndertowHttpHandlerAdapter

This ensures that the reactive handling of the request is dispatched
from the Undertow IO thread, marking the exchange as async rather than
ending it once the Undertow `handleRequest` method returns.

See gh-33885
Closes gh-33969
pull/34008/head
Simon Baslé 1 year ago
parent
commit
dd32f95192
  1. 34
      spring-web/src/main/java/org/springframework/http/server/reactive/UndertowHttpHandlerAdapter.java
  2. 17
      spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/RequestMappingMessageConversionIntegrationTests.java

34
spring-web/src/main/java/org/springframework/http/server/reactive/UndertowHttpHandlerAdapter.java

@ -66,25 +66,27 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle
@Override @Override
public void handleRequest(HttpServerExchange exchange) { public void handleRequest(HttpServerExchange exchange) {
UndertowServerHttpRequest request = null; exchange.dispatch(() -> {
try { UndertowServerHttpRequest request = null;
request = new UndertowServerHttpRequest(exchange, getDataBufferFactory()); try {
} request = new UndertowServerHttpRequest(exchange, getDataBufferFactory());
catch (URISyntaxException ex) {
if (logger.isWarnEnabled()) {
logger.debug("Failed to get request URI: " + ex.getMessage());
} }
exchange.setStatusCode(400); catch (URISyntaxException ex) {
return; if (logger.isWarnEnabled()) {
} logger.debug("Failed to get request URI: " + ex.getMessage());
ServerHttpResponse response = new UndertowServerHttpResponse(exchange, getDataBufferFactory(), request); }
exchange.setStatusCode(400);
return;
}
ServerHttpResponse response = new UndertowServerHttpResponse(exchange, getDataBufferFactory(), request);
if (request.getMethod() == HttpMethod.HEAD) { if (request.getMethod() == HttpMethod.HEAD) {
response = new HttpHeadResponseDecorator(response); response = new HttpHeadResponseDecorator(response);
} }
HandlerResultSubscriber resultSubscriber = new HandlerResultSubscriber(exchange, request); HandlerResultSubscriber resultSubscriber = new HandlerResultSubscriber(exchange, request);
this.httpHandler.handle(request, response).subscribe(resultSubscriber); this.httpHandler.handle(request, response).subscribe(resultSubscriber);
});
} }

17
spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/RequestMappingMessageConversionIntegrationTests.java

@ -17,6 +17,7 @@
package org.springframework.web.reactive.result.method.annotation; package org.springframework.web.reactive.result.method.annotation;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
@ -331,6 +332,17 @@ class RequestMappingMessageConversionIntegrationTests extends AbstractRequestMap
assertThat(performPost("/person-transform/flux", JSON, req, JSON, PERSON_LIST).getBody()).isEqualTo(res); assertThat(performPost("/person-transform/flux", JSON, req, JSON, PERSON_LIST).getBody()).isEqualTo(res);
} }
@ParameterizedHttpServerTest // see gh-33885
void personTransformWithFluxDelayed(HttpServer httpServer) throws Exception {
startServer(httpServer);
List<?> req = asList(new Person("Robert"), new Person("Marie"));
List<?> res = asList(new Person("ROBERT"), new Person("MARIE"));
assertThat(performPost("/person-transform/flux-delayed", JSON, req, JSON, PERSON_LIST))
.satisfies(r -> assertThat(r.getBody()).isEqualTo(res))
.satisfies(r -> assertThat(r.getHeaders().getContentLength()).isNotZero());
}
@ParameterizedHttpServerTest @ParameterizedHttpServerTest
void personTransformWithObservable(HttpServer httpServer) throws Exception { void personTransformWithObservable(HttpServer httpServer) throws Exception {
startServer(httpServer); startServer(httpServer);
@ -632,6 +644,11 @@ class RequestMappingMessageConversionIntegrationTests extends AbstractRequestMap
return persons.map(person -> new Person(person.getName().toUpperCase())); return persons.map(person -> new Person(person.getName().toUpperCase()));
} }
@PostMapping("/flux-delayed")
Flux<Person> transformDelayed(@RequestBody Flux<Person> persons) {
return transformFlux(persons).delayElements(Duration.ofMillis(10));
}
@PostMapping("/observable") @PostMapping("/observable")
Observable<Person> transformObservable(@RequestBody Observable<Person> persons) { Observable<Person> transformObservable(@RequestBody Observable<Person> persons) {
return persons.map(person -> new Person(person.getName().toUpperCase())); return persons.map(person -> new Person(person.getName().toUpperCase()));

Loading…
Cancel
Save