From dd32f951925c8aabd03ce01bc7ee4f076fd5710f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20Basl=C3=A9?= Date: Thu, 14 Nov 2024 16:14:46 +0100 Subject: [PATCH] Dispatch in UndertowHttpHandlerAdapter This ensures that the reactive handling of the request is dispatched from the Undertow IO thread, marking the exchange as async rather than ending it once the Undertow `handleRequest` method returns. See gh-33885 Closes gh-33969 --- .../reactive/UndertowHttpHandlerAdapter.java | 34 ++++++++++--------- ...pingMessageConversionIntegrationTests.java | 17 ++++++++++ 2 files changed, 35 insertions(+), 16 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowHttpHandlerAdapter.java b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowHttpHandlerAdapter.java index 8c58eb159d8..4517bc41324 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowHttpHandlerAdapter.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowHttpHandlerAdapter.java @@ -66,25 +66,27 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle @Override public void handleRequest(HttpServerExchange exchange) { - UndertowServerHttpRequest request = null; - try { - request = new UndertowServerHttpRequest(exchange, getDataBufferFactory()); - } - catch (URISyntaxException ex) { - if (logger.isWarnEnabled()) { - logger.debug("Failed to get request URI: " + ex.getMessage()); + exchange.dispatch(() -> { + UndertowServerHttpRequest request = null; + try { + request = new UndertowServerHttpRequest(exchange, getDataBufferFactory()); } - exchange.setStatusCode(400); - return; - } - ServerHttpResponse response = new UndertowServerHttpResponse(exchange, getDataBufferFactory(), request); + catch (URISyntaxException ex) { + if (logger.isWarnEnabled()) { + logger.debug("Failed to get request URI: " + ex.getMessage()); + } + exchange.setStatusCode(400); + return; + } + ServerHttpResponse response = new UndertowServerHttpResponse(exchange, getDataBufferFactory(), request); - if (request.getMethod() == HttpMethod.HEAD) { - response = new HttpHeadResponseDecorator(response); - } + if (request.getMethod() == HttpMethod.HEAD) { + response = new HttpHeadResponseDecorator(response); + } - HandlerResultSubscriber resultSubscriber = new HandlerResultSubscriber(exchange, request); - this.httpHandler.handle(request, response).subscribe(resultSubscriber); + HandlerResultSubscriber resultSubscriber = new HandlerResultSubscriber(exchange, request); + this.httpHandler.handle(request, response).subscribe(resultSubscriber); + }); } diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/RequestMappingMessageConversionIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/RequestMappingMessageConversionIntegrationTests.java index a11a8f8185c..20ff7954190 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/RequestMappingMessageConversionIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/RequestMappingMessageConversionIntegrationTests.java @@ -17,6 +17,7 @@ package org.springframework.web.reactive.result.method.annotation; import java.nio.ByteBuffer; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -331,6 +332,17 @@ class RequestMappingMessageConversionIntegrationTests extends AbstractRequestMap assertThat(performPost("/person-transform/flux", JSON, req, JSON, PERSON_LIST).getBody()).isEqualTo(res); } + @ParameterizedHttpServerTest // see gh-33885 + void personTransformWithFluxDelayed(HttpServer httpServer) throws Exception { + startServer(httpServer); + + List req = asList(new Person("Robert"), new Person("Marie")); + List res = asList(new Person("ROBERT"), new Person("MARIE")); + assertThat(performPost("/person-transform/flux-delayed", JSON, req, JSON, PERSON_LIST)) + .satisfies(r -> assertThat(r.getBody()).isEqualTo(res)) + .satisfies(r -> assertThat(r.getHeaders().getContentLength()).isNotZero()); + } + @ParameterizedHttpServerTest void personTransformWithObservable(HttpServer httpServer) throws Exception { startServer(httpServer); @@ -632,6 +644,11 @@ class RequestMappingMessageConversionIntegrationTests extends AbstractRequestMap return persons.map(person -> new Person(person.getName().toUpperCase())); } + @PostMapping("/flux-delayed") + Flux transformDelayed(@RequestBody Flux persons) { + return transformFlux(persons).delayElements(Duration.ofMillis(10)); + } + @PostMapping("/observable") Observable transformObservable(@RequestBody Observable persons) { return persons.map(person -> new Person(person.getName().toUpperCase()));