diff --git a/spring-web/src/main/java/org/springframework/http/client/ReactorClientHttpRequest.java b/spring-web/src/main/java/org/springframework/http/client/ReactorClientHttpRequest.java index 73f1ed5a598..f78dbeeb199 100644 --- a/spring-web/src/main/java/org/springframework/http/client/ReactorClientHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/client/ReactorClientHttpRequest.java @@ -20,9 +20,7 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.net.URI; import java.time.Duration; -import java.util.Objects; import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicReference; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; @@ -30,6 +28,7 @@ import org.jspecify.annotations.Nullable; import org.reactivestreams.FlowAdapters; import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; import reactor.netty.NettyOutbound; import reactor.netty.http.client.HttpClient; import reactor.netty.http.client.HttpClientRequest; @@ -44,6 +43,7 @@ import org.springframework.util.StreamUtils; * * @author Arjen Poutsma * @author Juergen Hoeller + * @author Brian Clozel * @since 6.1 */ final class ReactorClientHttpRequest extends AbstractStreamingClientHttpRequest { @@ -54,6 +54,8 @@ final class ReactorClientHttpRequest extends AbstractStreamingClientHttpRequest private final URI uri; + private final Executor executor; + private final @Nullable Duration exchangeTimeout; @@ -65,19 +67,31 @@ final class ReactorClientHttpRequest extends AbstractStreamingClientHttpRequest * @since 6.2 */ public ReactorClientHttpRequest(HttpClient httpClient, HttpMethod method, URI uri) { - this.httpClient = httpClient; - this.method = method; - this.uri = uri; - this.exchangeTimeout = null; + this(httpClient, method, uri, null); + } + + /** + * Create an instance. + *

If no executor is provided, the request will use an {@link Schedulers#boundedElastic() elastic scheduler} + * for performing blocking I/O operations. + * @param httpClient the client to perform the request with + * @param executor the executor to use + * @param method the HTTP method + * @param uri the URI for the request + * @since 6.2.13 + */ + public ReactorClientHttpRequest(HttpClient httpClient, HttpMethod method, URI uri, @Nullable Executor executor) { + this(httpClient, method, uri, executor, null); } /** * Package private constructor for use until exchangeTimeout is removed. */ - ReactorClientHttpRequest(HttpClient httpClient, HttpMethod method, URI uri, @Nullable Duration exchangeTimeout) { + ReactorClientHttpRequest(HttpClient httpClient, HttpMethod method, URI uri, @Nullable Executor executor, @Nullable Duration exchangeTimeout) { this.httpClient = httpClient; this.method = method; this.uri = uri; + this.executor = (executor != null) ? executor : Schedulers.boundedElastic()::schedule; this.exchangeTimeout = exchangeTimeout; } @@ -132,13 +146,10 @@ final class ReactorClientHttpRequest extends AbstractStreamingClientHttpRequest return Mono.empty(); } - AtomicReference<@Nullable Executor> executorRef = new AtomicReference<>(); - return outbound - .withConnection(connection -> executorRef.set(connection.channel().eventLoop())) .send(FlowAdapters.toPublisher(new OutputStreamPublisher<>( os -> body.writeTo(StreamUtils.nonClosing(os)), new ByteBufMapper(outbound), - Objects.requireNonNull(executorRef.getAndSet(null)), null))); + this.executor, null))); } static IOException convertException(RuntimeException ex) { diff --git a/spring-web/src/main/java/org/springframework/http/client/ReactorClientHttpRequestFactory.java b/spring-web/src/main/java/org/springframework/http/client/ReactorClientHttpRequestFactory.java index 25709a1cad0..5f97142bc29 100644 --- a/spring-web/src/main/java/org/springframework/http/client/ReactorClientHttpRequestFactory.java +++ b/spring-web/src/main/java/org/springframework/http/client/ReactorClientHttpRequestFactory.java @@ -19,12 +19,14 @@ package org.springframework.http.client; import java.io.IOException; import java.net.URI; import java.time.Duration; +import java.util.concurrent.Executor; import java.util.function.Function; import io.netty.channel.ChannelOption; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.jspecify.annotations.Nullable; +import reactor.core.scheduler.Schedulers; import reactor.netty.http.client.HttpClient; import reactor.netty.resources.ConnectionProvider; import reactor.netty.resources.LoopResources; @@ -42,6 +44,7 @@ import org.springframework.util.Assert; * @author Arjen Poutsma * @author Juergen Hoeller * @author Sebastien Deleuze + * @author Brian Clozel * @since 6.2 */ public class ReactorClientHttpRequestFactory implements ClientHttpRequestFactory, SmartLifecycle { @@ -58,6 +61,8 @@ public class ReactorClientHttpRequestFactory implements ClientHttpRequestFactory private final @Nullable Function mapper; + private @Nullable Executor executor; + private @Nullable Integer connectTimeout; private @Nullable Duration readTimeout; @@ -126,6 +131,16 @@ public class ReactorClientHttpRequestFactory implements ClientHttpRequestFactory return client; } + /** + * Set the {@code Executor} to use for performing blocking I/O operations. + *

If no executor is provided, the request will use an {@link Schedulers#boundedElastic() elastic scheduler}. + * @param executor the executor to use. + * @since 6.2.13 + */ + public void setExecutor(Executor executor) { + Assert.notNull(executor, "Executor must not be null"); + this.executor = executor; + } /** * Set the connect timeout value on the underlying client. @@ -186,7 +201,7 @@ public class ReactorClientHttpRequestFactory implements ClientHttpRequestFactory "Expected HttpClient or ResourceFactory and mapper"); client = createHttpClient(this.resourceFactory, this.mapper); } - return new ReactorClientHttpRequest(client, httpMethod, uri, this.exchangeTimeout); + return new ReactorClientHttpRequest(client, httpMethod, uri, this.executor, this.exchangeTimeout); }