diff --git a/spring-web/src/main/java/org/springframework/http/client/ReactorNettyClientRequestFactory.java b/spring-web/src/main/java/org/springframework/http/client/ReactorNettyClientRequestFactory.java
index df45c1071e1..90daa7966e5 100644
--- a/spring-web/src/main/java/org/springframework/http/client/ReactorNettyClientRequestFactory.java
+++ b/spring-web/src/main/java/org/springframework/http/client/ReactorNettyClientRequestFactory.java
@@ -19,34 +19,63 @@ package org.springframework.http.client;
import java.io.IOException;
import java.net.URI;
import java.time.Duration;
+import java.util.function.Function;
import io.netty.channel.ChannelOption;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import reactor.netty.http.client.HttpClient;
+import reactor.netty.resources.ConnectionProvider;
+import reactor.netty.resources.LoopResources;
+import org.springframework.context.SmartLifecycle;
import org.springframework.http.HttpMethod;
+import org.springframework.http.client.reactive.ReactorResourceFactory;
+import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
/**
* Reactor-Netty implementation of {@link ClientHttpRequestFactory}.
*
+ *
This class implements {@link SmartLifecycle} and can be optionally declared
+ * as a Spring-managed bean in order to support JVM Checkpoint Restore.
+ *
* @author Arjen Poutsma
+ * @author Sebastien Deleuze
* @since 6.1
*/
-public class ReactorNettyClientRequestFactory implements ClientHttpRequestFactory {
+public class ReactorNettyClientRequestFactory implements ClientHttpRequestFactory, SmartLifecycle {
+
+ private static final Log logger = LogFactory.getLog(ReactorNettyClientRequestFactory.class);
+
+ private final static Function defaultInitializer = client -> client.compress(true);
+
+
+ private HttpClient httpClient;
- private final HttpClient httpClient;
+ @Nullable
+ private final ReactorResourceFactory resourceFactory;
+
+ @Nullable
+ private final Function mapper;
private Duration exchangeTimeout = Duration.ofSeconds(5);
private Duration readTimeout = Duration.ofSeconds(10);
+ private volatile boolean running = true;
+
+ private final Object lifecycleMonitor = new Object();
+
/**
* Create a new instance of the {@code ReactorNettyClientRequestFactory}
* with a default {@link HttpClient} that has compression enabled.
*/
public ReactorNettyClientRequestFactory() {
- this(HttpClient.create().compress(true));
+ this.httpClient = defaultInitializer.apply(HttpClient.create());
+ this.resourceFactory = null;
+ this.mapper = null;
}
/**
@@ -57,6 +86,47 @@ public class ReactorNettyClientRequestFactory implements ClientHttpRequestFactor
public ReactorNettyClientRequestFactory(HttpClient httpClient) {
Assert.notNull(httpClient, "HttpClient must not be null");
this.httpClient = httpClient;
+ this.resourceFactory = null;
+ this.mapper = null;
+ }
+
+ /**
+ * Constructor with externally managed Reactor Netty resources, including
+ * {@link LoopResources} for event loop threads, and {@link ConnectionProvider}
+ * for the connection pool.
+ * This constructor should be used only when you don't want the client
+ * to participate in the Reactor Netty global resources. By default the
+ * client participates in the Reactor Netty global resources held in
+ * {@link reactor.netty.http.HttpResources}, which is recommended since
+ * fixed, shared resources are favored for event loop concurrency. However,
+ * consider declaring a {@link ReactorResourceFactory} bean with
+ * {@code globalResources=true} in order to ensure the Reactor Netty global
+ * resources are shut down when the Spring ApplicationContext is stopped or closed
+ * and restarted properly when the Spring ApplicationContext is
+ * (with JVM Checkpoint Restore for example).
+ * @param resourceFactory the resource factory to obtain the resources from
+ * @param mapper a mapper for further initialization of the created client
+ */
+ public ReactorNettyClientRequestFactory(ReactorResourceFactory resourceFactory, Function mapper) {
+ this.httpClient = createHttpClient(resourceFactory, mapper);
+ this.resourceFactory = resourceFactory;
+ this.mapper = mapper;
+ }
+
+
+ private static HttpClient createHttpClient(ReactorResourceFactory resourceFactory, Function mapper) {
+ ConnectionProvider provider = resourceFactory.getConnectionProvider();
+ Assert.notNull(provider, "No ConnectionProvider: is ReactorResourceFactory not initialized yet?");
+ return defaultInitializer.andThen(mapper).andThen(applyLoopResources(resourceFactory))
+ .apply(HttpClient.create(provider));
+ }
+
+ private static Function applyLoopResources(ReactorResourceFactory factory) {
+ return httpClient -> {
+ LoopResources resources = factory.getLoopResources();
+ Assert.notNull(resources, "No LoopResources: is ReactorResourceFactory not initialized yet?");
+ return httpClient.runOn(resources);
+ };
}
@@ -129,4 +199,52 @@ public class ReactorNettyClientRequestFactory implements ClientHttpRequestFactor
return new ReactorNettyClientRequest(this.httpClient, uri, httpMethod, this.exchangeTimeout, this.readTimeout);
}
+ @Override
+ public void start() {
+ synchronized (this.lifecycleMonitor) {
+ if (!isRunning()) {
+ if (this.resourceFactory != null && this.mapper != null) {
+ this.httpClient = createHttpClient(this.resourceFactory, this.mapper);
+ }
+ else {
+ logger.warn("Restarting a ReactorNettyClientRequestFactory bean is only supported with externally managed Reactor Netty resources");
+ }
+ this.running = true;
+ }
+ }
+ }
+
+ @Override
+ public void stop() {
+ synchronized (this.lifecycleMonitor) {
+ if (isRunning()) {
+ this.running = false;
+ }
+ }
+ }
+
+ @Override
+ public final void stop(Runnable callback) {
+ synchronized (this.lifecycleMonitor) {
+ stop();
+ callback.run();
+ }
+ }
+
+ @Override
+ public boolean isRunning() {
+ return this.running;
+ }
+
+ @Override
+ public boolean isAutoStartup() {
+ return false;
+ }
+
+ @Override
+ public int getPhase() {
+ // Start after ReactorResourceFactory
+ return 1;
+ }
+
}
diff --git a/spring-web/src/test/java/org/springframework/http/client/ReactorNettyClientHttpRequestFactoryTests.java b/spring-web/src/test/java/org/springframework/http/client/ReactorNettyClientHttpRequestFactoryTests.java
index bcc7fd25e32..061ff35c970 100644
--- a/spring-web/src/test/java/org/springframework/http/client/ReactorNettyClientHttpRequestFactoryTests.java
+++ b/spring-web/src/test/java/org/springframework/http/client/ReactorNettyClientHttpRequestFactoryTests.java
@@ -16,12 +16,19 @@
package org.springframework.http.client;
+import java.util.function.Function;
+
import org.junit.jupiter.api.Test;
+import reactor.netty.http.client.HttpClient;
import org.springframework.http.HttpMethod;
+import org.springframework.http.client.reactive.ReactorResourceFactory;
+
+import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Arjen Poutsma
+ * @author Sebastien Deleuze
*/
public class ReactorNettyClientHttpRequestFactoryTests extends AbstractHttpRequestFactoryTests {
@@ -37,4 +44,44 @@ public class ReactorNettyClientHttpRequestFactoryTests extends AbstractHttpReque
assertHttpMethod("patch", HttpMethod.PATCH);
}
+ @Test
+ void restartWithDefaultConstructor() {
+ ReactorNettyClientRequestFactory requestFactory = new ReactorNettyClientRequestFactory();
+ assertThat(requestFactory.isRunning()).isTrue();
+ requestFactory.start();
+ assertThat(requestFactory.isRunning()).isTrue();
+ requestFactory.stop();
+ assertThat(requestFactory.isRunning()).isFalse();
+ requestFactory.start();
+ assertThat(requestFactory.isRunning()).isTrue();
+ }
+
+ @Test
+ void restartWithExternalResourceFactory() {
+ ReactorResourceFactory resourceFactory = new ReactorResourceFactory();
+ resourceFactory.afterPropertiesSet();
+ Function mapper = Function.identity();
+ ReactorNettyClientRequestFactory requestFactory = new ReactorNettyClientRequestFactory(resourceFactory, mapper);
+ assertThat(requestFactory.isRunning()).isTrue();
+ requestFactory.start();
+ assertThat(requestFactory.isRunning()).isTrue();
+ requestFactory.stop();
+ assertThat(requestFactory.isRunning()).isFalse();
+ requestFactory.start();
+ assertThat(requestFactory.isRunning()).isTrue();
+ }
+
+ @Test
+ void restartWithHttpClient() {
+ HttpClient httpClient = HttpClient.create();
+ ReactorNettyClientRequestFactory requestFactory = new ReactorNettyClientRequestFactory(httpClient);
+ assertThat(requestFactory.isRunning()).isTrue();
+ requestFactory.start();
+ assertThat(requestFactory.isRunning()).isTrue();
+ requestFactory.stop();
+ assertThat(requestFactory.isRunning()).isFalse();
+ requestFactory.start();
+ assertThat(requestFactory.isRunning()).isTrue();
+ }
+
}