Browse Source

Merge branch '6.2.x'

pull/34993/merge
Brian Clozel 1 month ago
parent
commit
e85afff8a3
  1. 33
      spring-web/src/main/java/org/springframework/http/client/ReactorClientHttpRequest.java
  2. 17
      spring-web/src/main/java/org/springframework/http/client/ReactorClientHttpRequestFactory.java

33
spring-web/src/main/java/org/springframework/http/client/ReactorClientHttpRequest.java

@ -20,9 +20,7 @@ import java.io.IOException;
import java.io.UncheckedIOException; import java.io.UncheckedIOException;
import java.net.URI; import java.net.URI;
import java.time.Duration; import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;
@ -30,6 +28,7 @@ import org.jspecify.annotations.Nullable;
import org.reactivestreams.FlowAdapters; import org.reactivestreams.FlowAdapters;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.netty.NettyOutbound; import reactor.netty.NettyOutbound;
import reactor.netty.http.client.HttpClient; import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.HttpClientRequest; import reactor.netty.http.client.HttpClientRequest;
@ -44,6 +43,7 @@ import org.springframework.util.StreamUtils;
* *
* @author Arjen Poutsma * @author Arjen Poutsma
* @author Juergen Hoeller * @author Juergen Hoeller
* @author Brian Clozel
* @since 6.1 * @since 6.1
*/ */
final class ReactorClientHttpRequest extends AbstractStreamingClientHttpRequest { final class ReactorClientHttpRequest extends AbstractStreamingClientHttpRequest {
@ -54,6 +54,8 @@ final class ReactorClientHttpRequest extends AbstractStreamingClientHttpRequest
private final URI uri; private final URI uri;
private final Executor executor;
private final @Nullable Duration exchangeTimeout; private final @Nullable Duration exchangeTimeout;
@ -65,19 +67,31 @@ final class ReactorClientHttpRequest extends AbstractStreamingClientHttpRequest
* @since 6.2 * @since 6.2
*/ */
public ReactorClientHttpRequest(HttpClient httpClient, HttpMethod method, URI uri) { public ReactorClientHttpRequest(HttpClient httpClient, HttpMethod method, URI uri) {
this.httpClient = httpClient; this(httpClient, method, uri, null);
this.method = method; }
this.uri = uri;
this.exchangeTimeout = null; /**
* Create an instance.
* <p>If no executor is provided, the request will use an {@link Schedulers#boundedElastic() elastic scheduler}
* for performing blocking I/O operations.
* @param httpClient the client to perform the request with
* @param executor the executor to use
* @param method the HTTP method
* @param uri the URI for the request
* @since 6.2.13
*/
public ReactorClientHttpRequest(HttpClient httpClient, HttpMethod method, URI uri, @Nullable Executor executor) {
this(httpClient, method, uri, executor, null);
} }
/** /**
* Package private constructor for use until exchangeTimeout is removed. * Package private constructor for use until exchangeTimeout is removed.
*/ */
ReactorClientHttpRequest(HttpClient httpClient, HttpMethod method, URI uri, @Nullable Duration exchangeTimeout) { ReactorClientHttpRequest(HttpClient httpClient, HttpMethod method, URI uri, @Nullable Executor executor, @Nullable Duration exchangeTimeout) {
this.httpClient = httpClient; this.httpClient = httpClient;
this.method = method; this.method = method;
this.uri = uri; this.uri = uri;
this.executor = (executor != null) ? executor : Schedulers.boundedElastic()::schedule;
this.exchangeTimeout = exchangeTimeout; this.exchangeTimeout = exchangeTimeout;
} }
@ -132,13 +146,10 @@ final class ReactorClientHttpRequest extends AbstractStreamingClientHttpRequest
return Mono.empty(); return Mono.empty();
} }
AtomicReference<@Nullable Executor> executorRef = new AtomicReference<>();
return outbound return outbound
.withConnection(connection -> executorRef.set(connection.channel().eventLoop()))
.send(FlowAdapters.toPublisher(new OutputStreamPublisher<>( .send(FlowAdapters.toPublisher(new OutputStreamPublisher<>(
os -> body.writeTo(StreamUtils.nonClosing(os)), new ByteBufMapper(outbound), os -> body.writeTo(StreamUtils.nonClosing(os)), new ByteBufMapper(outbound),
Objects.requireNonNull(executorRef.getAndSet(null)), null))); this.executor, null)));
} }
static IOException convertException(RuntimeException ex) { static IOException convertException(RuntimeException ex) {

17
spring-web/src/main/java/org/springframework/http/client/ReactorClientHttpRequestFactory.java

@ -19,12 +19,14 @@ package org.springframework.http.client;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.time.Duration; import java.time.Duration;
import java.util.concurrent.Executor;
import java.util.function.Function; import java.util.function.Function;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.jspecify.annotations.Nullable; import org.jspecify.annotations.Nullable;
import reactor.core.scheduler.Schedulers;
import reactor.netty.http.client.HttpClient; import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider; import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.LoopResources; import reactor.netty.resources.LoopResources;
@ -42,6 +44,7 @@ import org.springframework.util.Assert;
* @author Arjen Poutsma * @author Arjen Poutsma
* @author Juergen Hoeller * @author Juergen Hoeller
* @author Sebastien Deleuze * @author Sebastien Deleuze
* @author Brian Clozel
* @since 6.2 * @since 6.2
*/ */
public class ReactorClientHttpRequestFactory implements ClientHttpRequestFactory, SmartLifecycle { public class ReactorClientHttpRequestFactory implements ClientHttpRequestFactory, SmartLifecycle {
@ -58,6 +61,8 @@ public class ReactorClientHttpRequestFactory implements ClientHttpRequestFactory
private final @Nullable Function<HttpClient, HttpClient> mapper; private final @Nullable Function<HttpClient, HttpClient> mapper;
private @Nullable Executor executor;
private @Nullable Integer connectTimeout; private @Nullable Integer connectTimeout;
private @Nullable Duration readTimeout; private @Nullable Duration readTimeout;
@ -126,6 +131,16 @@ public class ReactorClientHttpRequestFactory implements ClientHttpRequestFactory
return client; return client;
} }
/**
* Set the {@code Executor} to use for performing blocking I/O operations.
* <p>If no executor is provided, the request will use an {@link Schedulers#boundedElastic() elastic scheduler}.
* @param executor the executor to use.
* @since 6.2.13
*/
public void setExecutor(Executor executor) {
Assert.notNull(executor, "Executor must not be null");
this.executor = executor;
}
/** /**
* Set the connect timeout value on the underlying client. * Set the connect timeout value on the underlying client.
@ -186,7 +201,7 @@ public class ReactorClientHttpRequestFactory implements ClientHttpRequestFactory
"Expected HttpClient or ResourceFactory and mapper"); "Expected HttpClient or ResourceFactory and mapper");
client = createHttpClient(this.resourceFactory, this.mapper); client = createHttpClient(this.resourceFactory, this.mapper);
} }
return new ReactorClientHttpRequest(client, httpMethod, uri, this.exchangeTimeout); return new ReactorClientHttpRequest(client, httpMethod, uri, this.executor, this.exchangeTimeout);
} }

Loading…
Cancel
Save