From 415dcd899d3c46fab37699ff934d6abe374a8698 Mon Sep 17 00:00:00 2001 From: Stephane Nicoll Date: Mon, 20 Apr 2020 15:56:09 +0200 Subject: [PATCH] Configure Cassandra's request throttling Closes gh-19674 --- .../cassandra/CassandraAutoConfiguration.java | 19 ++- .../cassandra/CassandraProperties.java | 123 ++++++++++++++++-- ...itional-spring-configuration-metadata.json | 12 ++ .../CassandraAutoConfigurationTests.java | 47 ++++++- 4 files changed, 185 insertions(+), 16 deletions(-) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/cassandra/CassandraAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/cassandra/CassandraAutoConfiguration.java index 825d9636ba5..53d6d3cef77 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/cassandra/CassandraAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/cassandra/CassandraAutoConfiguration.java @@ -39,6 +39,8 @@ import com.typesafe.config.ConfigFactory; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.autoconfigure.cassandra.CassandraProperties.Throttler; +import org.springframework.boot.autoconfigure.cassandra.CassandraProperties.ThrottlerType; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.context.properties.EnableConfigurationProperties; @@ -116,6 +118,7 @@ public class CassandraAutoConfiguration { mapQueryOptions(properties, options); mapSocketOptions(properties, options); mapPoolingOptions(properties, options); + mapThrottlingOptions(properties, options); map.from(mapContactPoints(properties)) .to((contactPoints) -> options.add(DefaultDriverOption.CONTACT_POINTS, contactPoints)); map.from(properties.getLocalDatacenter()).to( @@ -150,8 +153,22 @@ public class CassandraAutoConfiguration { .to((idleTimeout) -> options.add(DefaultDriverOption.HEARTBEAT_TIMEOUT, idleTimeout)); map.from(poolProperties::getHeartbeatInterval).whenNonNull().asInt(Duration::getSeconds) .to((heartBeatInterval) -> options.add(DefaultDriverOption.HEARTBEAT_INTERVAL, heartBeatInterval)); - map.from(poolProperties::getMaxQueueSize) + } + + private void mapThrottlingOptions(CassandraProperties properties, CassandraDriverOptions options) { + PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); + Throttler throttlerProperties = properties.getThrottler(); + map.from(throttlerProperties::getType).as(ThrottlerType::type) + .to((type) -> options.add(DefaultDriverOption.REQUEST_THROTTLER_CLASS, type)); + map.from(throttlerProperties::getMaxQueueSize) .to((maxQueueSize) -> options.add(DefaultDriverOption.REQUEST_THROTTLER_MAX_QUEUE_SIZE, maxQueueSize)); + map.from(throttlerProperties::getMaxConcurrentRequests).to((maxConcurrentRequests) -> options + .add(DefaultDriverOption.REQUEST_THROTTLER_MAX_CONCURRENT_REQUESTS, maxConcurrentRequests)); + map.from(throttlerProperties::getMaxRequestsPerSecond).to((maxRequestsPerSecond) -> options + .add(DefaultDriverOption.REQUEST_THROTTLER_MAX_REQUESTS_PER_SECOND, maxRequestsPerSecond)); + map.from(throttlerProperties::getDrainInterval).asInt(Duration::toMillis).to( + (drainInterval) -> options.add(DefaultDriverOption.REQUEST_THROTTLER_DRAIN_INTERVAL, drainInterval)); + } private List mapContactPoints(CassandraProperties properties) { diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/cassandra/CassandraProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/cassandra/CassandraProperties.java index cbdfd86f4a4..eb1e199c79d 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/cassandra/CassandraProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/cassandra/CassandraProperties.java @@ -122,6 +122,11 @@ public class CassandraProperties { */ private final Pool pool = new Pool(); + /** + * Request throttling configuration. + */ + private final Throttler throttler = new Throttler(); + public String getKeyspaceName() { return this.keyspaceName; } @@ -264,6 +269,82 @@ public class CassandraProperties { return this.pool; } + public Throttler getThrottler() { + return this.throttler; + } + + public static class Throttler { + + /** + * Request throttling type. + */ + private ThrottlerType type = ThrottlerType.NONE; + + /** + * Maximum number of requests that can be enqueued when the throttling threshold + * is exceeded. + */ + private int maxQueueSize = 10000; + + /** + * Maximum number of requests that are allowed to execute in parallel. + */ + private int maxConcurrentRequests = 10000; + + /** + * Maximum allowed request rate. + */ + private int maxRequestsPerSecond = 10000; + + /** + * How often the throttler attempts to dequeue requests. Set this high enough that + * each attempt will process multiple entries in the queue, but not delay requests + * too much. + */ + private Duration drainInterval = Duration.ofMillis(10); + + public ThrottlerType getType() { + return this.type; + } + + public void setType(ThrottlerType type) { + this.type = type; + } + + public int getMaxQueueSize() { + return this.maxQueueSize; + } + + public void setMaxQueueSize(int maxQueueSize) { + this.maxQueueSize = maxQueueSize; + } + + public int getMaxConcurrentRequests() { + return this.maxConcurrentRequests; + } + + public void setMaxConcurrentRequests(int maxConcurrentRequests) { + this.maxConcurrentRequests = maxConcurrentRequests; + } + + public int getMaxRequestsPerSecond() { + return this.maxRequestsPerSecond; + } + + public void setMaxRequestsPerSecond(int maxRequestsPerSecond) { + this.maxRequestsPerSecond = maxRequestsPerSecond; + } + + public Duration getDrainInterval() { + return this.drainInterval; + } + + public void setDrainInterval(Duration drainInterval) { + this.drainInterval = drainInterval; + } + + } + /** * Pool properties. */ @@ -284,11 +365,6 @@ public class CassandraProperties { @DurationUnit(ChronoUnit.SECONDS) private Duration heartbeatInterval = Duration.ofSeconds(30); - /** - * Maximum number of requests that get queued if no connection is available. - */ - private int maxQueueSize = 256; - public Duration getIdleTimeout() { return this.idleTimeout; } @@ -305,14 +381,6 @@ public class CassandraProperties { this.heartbeatInterval = heartbeatInterval; } - public int getMaxQueueSize() { - return this.maxQueueSize; - } - - public void setMaxQueueSize(int maxQueueSize) { - this.maxQueueSize = maxQueueSize; - } - } /** @@ -337,4 +405,33 @@ public class CassandraProperties { } + public enum ThrottlerType { + + /** + * Limit the number of requests that can be executed in parallel. + */ + CONCURRENCY_LIMITING("ConcurrencyLimitingRequestThrottler"), + + /** + * Limits the request rate per second. + */ + RATE_LIMITING("RateLimitingRequestThrottler"), + + /** + * No request throttling. + */ + NONE("PassThroughRequestThrottler"); + + private final String type; + + ThrottlerType(String type) { + this.type = type; + } + + public String type() { + return this.type; + } + + } + } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json b/spring-boot-project/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json index cbef66b69a4..f8a864d0511 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json @@ -349,6 +349,14 @@ "name": "spring.data.cassandra.compression", "defaultValue": "none" }, + { + "name": "spring.data.cassandra.pool.max-queue-size", + "type": "java.lang.Integer", + "deprecation": { + "replacement": "spring.data.cassandra.throttler.max-queue-size", + "level": "error" + } + }, { "name": "spring.data.cassandra.repositories.type", "type": "org.springframework.boot.autoconfigure.data.RepositoryType", @@ -365,6 +373,10 @@ "level": "error" } }, + { + "name": "spring.data.cassandra.throttler.type", + "defaultValue": "none" + }, { "name": "spring.data.couchbase.repositories.type", "type": "org.springframework.boot.autoconfigure.data.RepositoryType", diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/cassandra/CassandraAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/cassandra/CassandraAutoConfigurationTests.java index 42b8e5fb02f..61e72052887 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/cassandra/CassandraAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/cassandra/CassandraAutoConfigurationTests.java @@ -21,6 +21,9 @@ import com.datastax.oss.driver.api.core.CqlSessionBuilder; import com.datastax.oss.driver.api.core.config.DefaultDriverOption; import com.datastax.oss.driver.api.core.config.DriverConfigLoader; import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; +import com.datastax.oss.driver.internal.core.session.throttling.ConcurrencyLimitingRequestThrottler; +import com.datastax.oss.driver.internal.core.session.throttling.PassThroughRequestThrottler; +import com.datastax.oss.driver.internal.core.session.throttling.RateLimitingRequestThrottler; import org.junit.jupiter.api.Test; import org.springframework.boot.autoconfigure.AutoConfigurations; @@ -129,13 +132,53 @@ class CassandraAutoConfigurationTests { @Test void driverConfigLoaderCustomizePoolOptions() { this.contextRunner.withPropertyValues("spring.data.cassandra.pool.idle-timeout=42", - "spring.data.cassandra.pool.heartbeat-interval=62", "spring.data.cassandra.pool.max-queue-size=72") - .run((context) -> { + "spring.data.cassandra.pool.heartbeat-interval=62").run((context) -> { DriverExecutionProfile config = context.getBean(DriverConfigLoader.class).getInitialConfig() .getDefaultProfile(); assertThat(config.getInt(DefaultDriverOption.HEARTBEAT_TIMEOUT)).isEqualTo(42); assertThat(config.getInt(DefaultDriverOption.HEARTBEAT_INTERVAL)).isEqualTo(62); + }); + } + + @Test + void driverConfigLoaderUsePassThroughLimitingRequestThrottlerByDefault() { + this.contextRunner.withPropertyValues().run((context) -> { + DriverExecutionProfile config = context.getBean(DriverConfigLoader.class).getInitialConfig() + .getDefaultProfile(); + assertThat(config.getString(DefaultDriverOption.REQUEST_THROTTLER_CLASS)) + .isEqualTo(PassThroughRequestThrottler.class.getSimpleName()); + }); + } + + @Test + void driverConfigLoaderCustomizeConcurrencyLimitingRequestThrottler() { + this.contextRunner.withPropertyValues("spring.data.cassandra.throttler.type=concurrency-limiting", + "spring.data.cassandra.throttler.max-concurrent-requests=62", + "spring.data.cassandra.throttler.max-queue-size=72").run((context) -> { + DriverExecutionProfile config = context.getBean(DriverConfigLoader.class).getInitialConfig() + .getDefaultProfile(); + assertThat(config.getString(DefaultDriverOption.REQUEST_THROTTLER_CLASS)) + .isEqualTo(ConcurrencyLimitingRequestThrottler.class.getSimpleName()); + assertThat(config.getInt(DefaultDriverOption.REQUEST_THROTTLER_MAX_CONCURRENT_REQUESTS)) + .isEqualTo(62); + assertThat(config.getInt(DefaultDriverOption.REQUEST_THROTTLER_MAX_QUEUE_SIZE)).isEqualTo(72); + }); + } + + @Test + void driverConfigLoaderCustomizeRateLimitingRequestThrottler() { + this.contextRunner.withPropertyValues("spring.data.cassandra.throttler.type=rate-limiting", + "spring.data.cassandra.throttler.max-requests-per-second=62", + "spring.data.cassandra.throttler.max-queue-size=72", + "spring.data.cassandra.throttler.drain-interval=16ms").run((context) -> { + DriverExecutionProfile config = context.getBean(DriverConfigLoader.class).getInitialConfig() + .getDefaultProfile(); + assertThat(config.getString(DefaultDriverOption.REQUEST_THROTTLER_CLASS)) + .isEqualTo(RateLimitingRequestThrottler.class.getSimpleName()); + assertThat(config.getInt(DefaultDriverOption.REQUEST_THROTTLER_MAX_REQUESTS_PER_SECOND)) + .isEqualTo(62); assertThat(config.getInt(DefaultDriverOption.REQUEST_THROTTLER_MAX_QUEUE_SIZE)).isEqualTo(72); + assertThat(config.getInt(DefaultDriverOption.REQUEST_THROTTLER_DRAIN_INTERVAL)).isEqualTo(16); }); }