From a7e57e090930aefd7a45dec682087ca32c0b6488 Mon Sep 17 00:00:00 2001 From: wonwoo Date: Sun, 16 Feb 2020 20:09:45 +0900 Subject: [PATCH] Configure codec buffer size in ES Reactive Rest client This commit adds a new configuration property `"spring.data.elasticsearch.client.reactive.max-in-memory-size"` which configures the maximum amount of memory buffered by the `WebClient` used by the Reactive ElasticSearch client. See gh-20205 --- .../ReactiveRestClientAutoConfiguration.java | 16 ++++++++++++++++ .../ReactiveRestClientProperties.java | 15 +++++++++++++++ ...ReactiveRestClientAutoConfigurationTests.java | 1 + 3 files changed, 32 insertions(+) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/data/elasticsearch/ReactiveRestClientAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/data/elasticsearch/ReactiveRestClientAutoConfiguration.java index 8f2ff327be6..35ca1308153 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/data/elasticsearch/ReactiveRestClientAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/data/elasticsearch/ReactiveRestClientAutoConfiguration.java @@ -29,6 +29,8 @@ import org.springframework.data.elasticsearch.client.ClientConfiguration; import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient; import org.springframework.data.elasticsearch.client.reactive.ReactiveRestClients; import org.springframework.http.HttpHeaders; +import org.springframework.util.unit.DataSize; +import org.springframework.web.reactive.function.client.ExchangeStrategies; import org.springframework.web.reactive.function.client.WebClient; /** @@ -52,6 +54,7 @@ public class ReactiveRestClientAutoConfiguration { builder.usingSsl(); } configureTimeouts(builder, properties); + configureWebClient(builder, properties); return builder.build(); } @@ -67,6 +70,19 @@ public class ReactiveRestClientAutoConfiguration { }); } + private void configureWebClient(ClientConfiguration.TerminalClientConfigurationBuilder builder, + ReactiveRestClientProperties properties) { + PropertyMapper map = PropertyMapper.get(); + builder.withWebClientConfigurer((webClient) -> { + ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder() + .codecs((configurer) -> map.from(properties.getMaxInMemorySize()).whenNonNull() + .asInt(DataSize::toBytes) + .to((maxInMemorySize) -> configurer.defaultCodecs().maxInMemorySize(maxInMemorySize))) + .build(); + return webClient.mutate().exchangeStrategies(exchangeStrategies).build(); + }); + } + @Bean @ConditionalOnMissingBean public ReactiveElasticsearchClient reactiveElasticsearchClient(ClientConfiguration clientConfiguration) { diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/data/elasticsearch/ReactiveRestClientProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/data/elasticsearch/ReactiveRestClientProperties.java index 56d80b10c52..be3c8a6bca4 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/data/elasticsearch/ReactiveRestClientProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/data/elasticsearch/ReactiveRestClientProperties.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.List; import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.util.unit.DataSize; /** * Configuration properties for Elasticsearch Reactive REST clients. @@ -62,6 +63,12 @@ public class ReactiveRestClientProperties { */ private Duration socketTimeout; + /** + * Limit on the number of bytes that can be buffered whenever the input stream needs + * to be aggregated. + */ + private DataSize maxInMemorySize; + public List getEndpoints() { return this.endpoints; } @@ -110,4 +117,12 @@ public class ReactiveRestClientProperties { this.socketTimeout = socketTimeout; } + public DataSize getMaxInMemorySize() { + return this.maxInMemorySize; + } + + public void setMaxInMemorySize(DataSize maxInMemorySize) { + this.maxInMemorySize = maxInMemorySize; + } + } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/data/elasticsearch/ReactiveRestClientAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/data/elasticsearch/ReactiveRestClientAutoConfigurationTests.java index 1e404f32b90..d48d2671288 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/data/elasticsearch/ReactiveRestClientAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/data/elasticsearch/ReactiveRestClientAutoConfigurationTests.java @@ -77,6 +77,7 @@ public class ReactiveRestClientAutoConfigurationTests { this.contextRunner.withPropertyValues( "spring.data.elasticsearch.client.reactive.endpoints=" + elasticsearch.getContainerIpAddress() + ":" + elasticsearch.getFirstMappedPort(), + "spring.data.elasticsearch.client.reactive.max-in-memory-size=-1", "spring.data.elasticsearch.client.reactive.connection-timeout=120s", "spring.data.elasticsearch.client.reactive.socket-timeout=120s").run((context) -> { ReactiveElasticsearchClient client = context.getBean(ReactiveElasticsearchClient.class);