Browse Source

Upgrade to Undertow 2.3.18.Final, dispatch in UndertowHttpHandlerAdapter

This ensures that the reactive handling of the request is dispatched
from the Undertow IO thread, marking the exchange as async rather than
ending it once the Undertow `handleRequest` method returns.

Closes gh-33885
pull/33889/head
Simon Baslé 1 year ago
parent
commit
35b452b458
  1. 2
      framework-platform/framework-platform.gradle
  2. 34
      spring-web/src/main/java/org/springframework/http/server/reactive/UndertowHttpHandlerAdapter.java
  3. 17
      spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/RequestMappingMessageConversionIntegrationTests.java

2
framework-platform/framework-platform.gradle

@ -56,7 +56,7 @@ dependencies { @@ -56,7 +56,7 @@ dependencies {
api("io.r2dbc:r2dbc-spi:1.0.0.RELEASE")
api("io.reactivex.rxjava3:rxjava:3.1.9")
api("io.smallrye.reactive:mutiny:1.10.0")
api("io.undertow:undertow-core:2.3.17.Final")
api("io.undertow:undertow-core:2.3.18.Final")
api("io.undertow:undertow-servlet:2.3.17.Final")
api("io.undertow:undertow-websockets-jsr:2.3.17.Final")
api("io.vavr:vavr:0.10.4")

34
spring-web/src/main/java/org/springframework/http/server/reactive/UndertowHttpHandlerAdapter.java

@ -66,25 +66,27 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle @@ -66,25 +66,27 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle
@Override
public void handleRequest(HttpServerExchange exchange) {
UndertowServerHttpRequest request = null;
try {
request = new UndertowServerHttpRequest(exchange, getDataBufferFactory());
}
catch (URISyntaxException ex) {
if (logger.isWarnEnabled()) {
logger.debug("Failed to get request URI: " + ex.getMessage());
exchange.dispatch(() -> {
UndertowServerHttpRequest request = null;
try {
request = new UndertowServerHttpRequest(exchange, getDataBufferFactory());
}
exchange.setStatusCode(400);
return;
}
ServerHttpResponse response = new UndertowServerHttpResponse(exchange, getDataBufferFactory(), request);
catch (URISyntaxException ex) {
if (logger.isWarnEnabled()) {
logger.debug("Failed to get request URI: " + ex.getMessage());
}
exchange.setStatusCode(400);
return;
}
ServerHttpResponse response = new UndertowServerHttpResponse(exchange, getDataBufferFactory(), request);
if (request.getMethod() == HttpMethod.HEAD) {
response = new HttpHeadResponseDecorator(response);
}
if (request.getMethod() == HttpMethod.HEAD) {
response = new HttpHeadResponseDecorator(response);
}
HandlerResultSubscriber resultSubscriber = new HandlerResultSubscriber(exchange, request);
this.httpHandler.handle(request, response).subscribe(resultSubscriber);
HandlerResultSubscriber resultSubscriber = new HandlerResultSubscriber(exchange, request);
this.httpHandler.handle(request, response).subscribe(resultSubscriber);
});
}

17
spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/RequestMappingMessageConversionIntegrationTests.java

@ -17,6 +17,7 @@ @@ -17,6 +17,7 @@
package org.springframework.web.reactive.result.method.annotation;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -331,6 +332,17 @@ class RequestMappingMessageConversionIntegrationTests extends AbstractRequestMap @@ -331,6 +332,17 @@ class RequestMappingMessageConversionIntegrationTests extends AbstractRequestMap
assertThat(performPost("/person-transform/flux", JSON, req, JSON, PERSON_LIST).getBody()).isEqualTo(res);
}
@ParameterizedHttpServerTest // see gh-33885
void personTransformWithFluxDelayed(HttpServer httpServer) throws Exception {
startServer(httpServer);
List<?> req = asList(new Person("Robert"), new Person("Marie"));
List<?> res = asList(new Person("ROBERT"), new Person("MARIE"));
assertThat(performPost("/person-transform/flux-delayed", JSON, req, JSON, PERSON_LIST))
.satisfies(r -> assertThat(r.getBody()).isEqualTo(res))
.satisfies(r -> assertThat(r.getHeaders().getContentLength()).isNotZero());
}
@ParameterizedHttpServerTest
void personTransformWithObservable(HttpServer httpServer) throws Exception {
startServer(httpServer);
@ -632,6 +644,11 @@ class RequestMappingMessageConversionIntegrationTests extends AbstractRequestMap @@ -632,6 +644,11 @@ class RequestMappingMessageConversionIntegrationTests extends AbstractRequestMap
return persons.map(person -> new Person(person.getName().toUpperCase()));
}
@PostMapping("/flux-delayed")
Flux<Person> transformDelayed(@RequestBody Flux<Person> persons) {
return transformFlux(persons).delayElements(Duration.ofMillis(10));
}
@PostMapping("/observable")
Observable<Person> transformObservable(@RequestBody Observable<Person> persons) {
return persons.map(person -> new Person(person.getName().toUpperCase()));

Loading…
Cancel
Save