From ba39385cced0ac3981f9e320b38fb09bbe4a82d3 Mon Sep 17 00:00:00 2001 From: Brian Clozel Date: Mon, 10 Nov 2025 16:12:14 +0100 Subject: [PATCH] Use executor for blocking I/O in Reactor request factory Prior to this commit, the `ReactorClientHttpRequestFactory` and the `ReactorClientHttpRequest` would use the `Executor` from the current event loop for performing write operations. Depending on I/O demand, this work could be blocked and would result in blocked Netty event loop executors and the HTTP client hanging. This commit ensures that the client uses a separate Executor for such operations. If the application does not provide one on the request factory, a `Schedulers#boundedElastic` instance will be used. Fixes gh-34707 --- .../http/client/ReactorClientHttpRequest.java | 38 +++++++++++-------- .../ReactorClientHttpRequestFactory.java | 18 ++++++++- 2 files changed, 40 insertions(+), 16 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/client/ReactorClientHttpRequest.java b/spring-web/src/main/java/org/springframework/http/client/ReactorClientHttpRequest.java index f606d3dc5aa..e535ab0d342 100644 --- a/spring-web/src/main/java/org/springframework/http/client/ReactorClientHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/client/ReactorClientHttpRequest.java @@ -21,13 +21,13 @@ import java.io.UncheckedIOException; import java.net.URI; import java.time.Duration; import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicReference; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import org.reactivestreams.FlowAdapters; import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; import reactor.netty.NettyOutbound; import reactor.netty.http.client.HttpClient; import reactor.netty.http.client.HttpClientRequest; @@ -43,6 +43,7 @@ import org.springframework.util.StreamUtils; * * @author Arjen Poutsma * @author Juergen Hoeller + * @author Brian Clozel * @since 6.1 */ final class ReactorClientHttpRequest extends AbstractStreamingClientHttpRequest { @@ -53,6 +54,8 @@ final class ReactorClientHttpRequest extends AbstractStreamingClientHttpRequest private final URI uri; + private final Executor executor; + @Nullable private final Duration exchangeTimeout; @@ -65,19 +68,31 @@ final class ReactorClientHttpRequest extends AbstractStreamingClientHttpRequest * @since 6.2 */ public ReactorClientHttpRequest(HttpClient httpClient, HttpMethod method, URI uri) { - this.httpClient = httpClient; - this.method = method; - this.uri = uri; - this.exchangeTimeout = null; + this(httpClient, method, uri, null); + } + + /** + * Create an instance. + *

If no executor is provided, the request will use an {@link Schedulers#boundedElastic() elastic scheduler} + * for performing blocking I/O operations. + * @param httpClient the client to perform the request with + * @param executor the executor to use + * @param method the HTTP method + * @param uri the URI for the request + * @since 6.2.13 + */ + public ReactorClientHttpRequest(HttpClient httpClient, HttpMethod method, URI uri, @Nullable Executor executor) { + this(httpClient, method, uri, executor, null); } /** * Package private constructor for use until exchangeTimeout is removed. */ - ReactorClientHttpRequest(HttpClient httpClient, HttpMethod method, URI uri, @Nullable Duration exchangeTimeout) { + ReactorClientHttpRequest(HttpClient httpClient, HttpMethod method, URI uri, @Nullable Executor executor, @Nullable Duration exchangeTimeout) { this.httpClient = httpClient; this.method = method; this.uri = uri; + this.executor = (executor != null) ? executor : Schedulers.boundedElastic()::schedule; this.exchangeTimeout = exchangeTimeout; } @@ -92,11 +107,7 @@ final class ReactorClientHttpRequest extends AbstractStreamingClientHttpRequest public ReactorClientHttpRequest( HttpClient httpClient, URI uri, HttpMethod method, @Nullable Duration exchangeTimeout, @Nullable Duration readTimeout) { - - this.httpClient = httpClient; - this.method = method; - this.uri = uri; - this.exchangeTimeout = exchangeTimeout; + this(httpClient, method, uri, null, exchangeTimeout); } @@ -150,13 +161,10 @@ final class ReactorClientHttpRequest extends AbstractStreamingClientHttpRequest return Mono.empty(); } - AtomicReference executorRef = new AtomicReference<>(); - return outbound - .withConnection(connection -> executorRef.set(connection.channel().eventLoop())) .send(FlowAdapters.toPublisher(new OutputStreamPublisher<>( os -> body.writeTo(StreamUtils.nonClosing(os)), new ByteBufMapper(outbound), - executorRef.getAndSet(null), null))); + this.executor, null))); } static IOException convertException(RuntimeException ex) { diff --git a/spring-web/src/main/java/org/springframework/http/client/ReactorClientHttpRequestFactory.java b/spring-web/src/main/java/org/springframework/http/client/ReactorClientHttpRequestFactory.java index 0d616f29c68..b4b023fc365 100644 --- a/spring-web/src/main/java/org/springframework/http/client/ReactorClientHttpRequestFactory.java +++ b/spring-web/src/main/java/org/springframework/http/client/ReactorClientHttpRequestFactory.java @@ -19,11 +19,13 @@ package org.springframework.http.client; import java.io.IOException; import java.net.URI; import java.time.Duration; +import java.util.concurrent.Executor; import java.util.function.Function; import io.netty.channel.ChannelOption; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import reactor.core.scheduler.Schedulers; import reactor.netty.http.client.HttpClient; import reactor.netty.resources.ConnectionProvider; import reactor.netty.resources.LoopResources; @@ -42,6 +44,7 @@ import org.springframework.util.Assert; * @author Arjen Poutsma * @author Juergen Hoeller * @author Sebastien Deleuze + * @author Brian Clozel * @since 6.2 */ public class ReactorClientHttpRequestFactory implements ClientHttpRequestFactory, SmartLifecycle { @@ -58,6 +61,9 @@ public class ReactorClientHttpRequestFactory implements ClientHttpRequestFactory @Nullable private final Function mapper; + @Nullable + private Executor executor; + @Nullable private Integer connectTimeout; @@ -129,6 +135,16 @@ public class ReactorClientHttpRequestFactory implements ClientHttpRequestFactory return client; } + /** + * Set the {@code Executor} to use for performing blocking I/O operations. + *

If no executor is provided, the request will use an {@link Schedulers#boundedElastic() elastic scheduler}. + * @param executor the executor to use. + * @since 6.2.13 + */ + public void setExecutor(Executor executor) { + Assert.notNull(executor, "Executor must not be null"); + this.executor = executor; + } /** * Set the connect timeout value on the underlying client. @@ -219,7 +235,7 @@ public class ReactorClientHttpRequestFactory implements ClientHttpRequestFactory "Expected HttpClient or ResourceFactory and mapper"); client = createHttpClient(this.resourceFactory, this.mapper); } - return new ReactorClientHttpRequest(client, httpMethod, uri, this.exchangeTimeout); + return new ReactorClientHttpRequest(client, httpMethod, uri, this.executor, this.exchangeTimeout); }