From 363d5a6413e17eced01a0963f78afe584bb5f1f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Deleuze?= Date: Tue, 10 Oct 2023 11:08:27 +0200 Subject: [PATCH] Add CRaC support to ReactorNettyClientRequestFactory This commit adds a constructor with externally managed Reactor Netty resources to ReactorNettyClientRequestFactory and makes it lifecycle-aware in order to support Project CRaC. Closes gh-31280 Closes gh-31281 --- .../ReactorNettyClientRequestFactory.java | 124 +++++++++++++++++- ...torNettyClientHttpRequestFactoryTests.java | 47 +++++++ 2 files changed, 168 insertions(+), 3 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/client/ReactorNettyClientRequestFactory.java b/spring-web/src/main/java/org/springframework/http/client/ReactorNettyClientRequestFactory.java index df45c1071e1..90daa7966e5 100644 --- a/spring-web/src/main/java/org/springframework/http/client/ReactorNettyClientRequestFactory.java +++ b/spring-web/src/main/java/org/springframework/http/client/ReactorNettyClientRequestFactory.java @@ -19,34 +19,63 @@ package org.springframework.http.client; import java.io.IOException; import java.net.URI; import java.time.Duration; +import java.util.function.Function; import io.netty.channel.ChannelOption; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import reactor.netty.http.client.HttpClient; +import reactor.netty.resources.ConnectionProvider; +import reactor.netty.resources.LoopResources; +import org.springframework.context.SmartLifecycle; import org.springframework.http.HttpMethod; +import org.springframework.http.client.reactive.ReactorResourceFactory; +import org.springframework.lang.Nullable; import org.springframework.util.Assert; /** * Reactor-Netty implementation of {@link ClientHttpRequestFactory}. * + *

This class implements {@link SmartLifecycle} and can be optionally declared + * as a Spring-managed bean in order to support JVM Checkpoint Restore. + * * @author Arjen Poutsma + * @author Sebastien Deleuze * @since 6.1 */ -public class ReactorNettyClientRequestFactory implements ClientHttpRequestFactory { +public class ReactorNettyClientRequestFactory implements ClientHttpRequestFactory, SmartLifecycle { + + private static final Log logger = LogFactory.getLog(ReactorNettyClientRequestFactory.class); + + private final static Function defaultInitializer = client -> client.compress(true); + + + private HttpClient httpClient; - private final HttpClient httpClient; + @Nullable + private final ReactorResourceFactory resourceFactory; + + @Nullable + private final Function mapper; private Duration exchangeTimeout = Duration.ofSeconds(5); private Duration readTimeout = Duration.ofSeconds(10); + private volatile boolean running = true; + + private final Object lifecycleMonitor = new Object(); + /** * Create a new instance of the {@code ReactorNettyClientRequestFactory} * with a default {@link HttpClient} that has compression enabled. */ public ReactorNettyClientRequestFactory() { - this(HttpClient.create().compress(true)); + this.httpClient = defaultInitializer.apply(HttpClient.create()); + this.resourceFactory = null; + this.mapper = null; } /** @@ -57,6 +86,47 @@ public class ReactorNettyClientRequestFactory implements ClientHttpRequestFactor public ReactorNettyClientRequestFactory(HttpClient httpClient) { Assert.notNull(httpClient, "HttpClient must not be null"); this.httpClient = httpClient; + this.resourceFactory = null; + this.mapper = null; + } + + /** + * Constructor with externally managed Reactor Netty resources, including + * {@link LoopResources} for event loop threads, and {@link ConnectionProvider} + * for the connection pool. + *

This constructor should be used only when you don't want the client + * to participate in the Reactor Netty global resources. By default the + * client participates in the Reactor Netty global resources held in + * {@link reactor.netty.http.HttpResources}, which is recommended since + * fixed, shared resources are favored for event loop concurrency. However, + * consider declaring a {@link ReactorResourceFactory} bean with + * {@code globalResources=true} in order to ensure the Reactor Netty global + * resources are shut down when the Spring ApplicationContext is stopped or closed + * and restarted properly when the Spring ApplicationContext is + * (with JVM Checkpoint Restore for example). + * @param resourceFactory the resource factory to obtain the resources from + * @param mapper a mapper for further initialization of the created client + */ + public ReactorNettyClientRequestFactory(ReactorResourceFactory resourceFactory, Function mapper) { + this.httpClient = createHttpClient(resourceFactory, mapper); + this.resourceFactory = resourceFactory; + this.mapper = mapper; + } + + + private static HttpClient createHttpClient(ReactorResourceFactory resourceFactory, Function mapper) { + ConnectionProvider provider = resourceFactory.getConnectionProvider(); + Assert.notNull(provider, "No ConnectionProvider: is ReactorResourceFactory not initialized yet?"); + return defaultInitializer.andThen(mapper).andThen(applyLoopResources(resourceFactory)) + .apply(HttpClient.create(provider)); + } + + private static Function applyLoopResources(ReactorResourceFactory factory) { + return httpClient -> { + LoopResources resources = factory.getLoopResources(); + Assert.notNull(resources, "No LoopResources: is ReactorResourceFactory not initialized yet?"); + return httpClient.runOn(resources); + }; } @@ -129,4 +199,52 @@ public class ReactorNettyClientRequestFactory implements ClientHttpRequestFactor return new ReactorNettyClientRequest(this.httpClient, uri, httpMethod, this.exchangeTimeout, this.readTimeout); } + @Override + public void start() { + synchronized (this.lifecycleMonitor) { + if (!isRunning()) { + if (this.resourceFactory != null && this.mapper != null) { + this.httpClient = createHttpClient(this.resourceFactory, this.mapper); + } + else { + logger.warn("Restarting a ReactorNettyClientRequestFactory bean is only supported with externally managed Reactor Netty resources"); + } + this.running = true; + } + } + } + + @Override + public void stop() { + synchronized (this.lifecycleMonitor) { + if (isRunning()) { + this.running = false; + } + } + } + + @Override + public final void stop(Runnable callback) { + synchronized (this.lifecycleMonitor) { + stop(); + callback.run(); + } + } + + @Override + public boolean isRunning() { + return this.running; + } + + @Override + public boolean isAutoStartup() { + return false; + } + + @Override + public int getPhase() { + // Start after ReactorResourceFactory + return 1; + } + } diff --git a/spring-web/src/test/java/org/springframework/http/client/ReactorNettyClientHttpRequestFactoryTests.java b/spring-web/src/test/java/org/springframework/http/client/ReactorNettyClientHttpRequestFactoryTests.java index bcc7fd25e32..061ff35c970 100644 --- a/spring-web/src/test/java/org/springframework/http/client/ReactorNettyClientHttpRequestFactoryTests.java +++ b/spring-web/src/test/java/org/springframework/http/client/ReactorNettyClientHttpRequestFactoryTests.java @@ -16,12 +16,19 @@ package org.springframework.http.client; +import java.util.function.Function; + import org.junit.jupiter.api.Test; +import reactor.netty.http.client.HttpClient; import org.springframework.http.HttpMethod; +import org.springframework.http.client.reactive.ReactorResourceFactory; + +import static org.assertj.core.api.Assertions.assertThat; /** * @author Arjen Poutsma + * @author Sebastien Deleuze */ public class ReactorNettyClientHttpRequestFactoryTests extends AbstractHttpRequestFactoryTests { @@ -37,4 +44,44 @@ public class ReactorNettyClientHttpRequestFactoryTests extends AbstractHttpReque assertHttpMethod("patch", HttpMethod.PATCH); } + @Test + void restartWithDefaultConstructor() { + ReactorNettyClientRequestFactory requestFactory = new ReactorNettyClientRequestFactory(); + assertThat(requestFactory.isRunning()).isTrue(); + requestFactory.start(); + assertThat(requestFactory.isRunning()).isTrue(); + requestFactory.stop(); + assertThat(requestFactory.isRunning()).isFalse(); + requestFactory.start(); + assertThat(requestFactory.isRunning()).isTrue(); + } + + @Test + void restartWithExternalResourceFactory() { + ReactorResourceFactory resourceFactory = new ReactorResourceFactory(); + resourceFactory.afterPropertiesSet(); + Function mapper = Function.identity(); + ReactorNettyClientRequestFactory requestFactory = new ReactorNettyClientRequestFactory(resourceFactory, mapper); + assertThat(requestFactory.isRunning()).isTrue(); + requestFactory.start(); + assertThat(requestFactory.isRunning()).isTrue(); + requestFactory.stop(); + assertThat(requestFactory.isRunning()).isFalse(); + requestFactory.start(); + assertThat(requestFactory.isRunning()).isTrue(); + } + + @Test + void restartWithHttpClient() { + HttpClient httpClient = HttpClient.create(); + ReactorNettyClientRequestFactory requestFactory = new ReactorNettyClientRequestFactory(httpClient); + assertThat(requestFactory.isRunning()).isTrue(); + requestFactory.start(); + assertThat(requestFactory.isRunning()).isTrue(); + requestFactory.stop(); + assertThat(requestFactory.isRunning()).isFalse(); + requestFactory.start(); + assertThat(requestFactory.isRunning()).isTrue(); + } + }