From aa40c0fec0c3fba516e19be89992c4ed2b86d982 Mon Sep 17 00:00:00 2001 From: Vedran Pavic Date: Wed, 28 Aug 2024 23:01:23 +0200 Subject: [PATCH] Add support for configuring Pulsar client IO and listener threads Add configuration properties that allow users to configure number of IO threads and listener threads used by the Pulsar client. See gh-42052 --- .../pulsar/PulsarProperties.java | 40 +++++++++++++++++++ .../pulsar/PulsarPropertiesMapper.java | 3 ++ .../pulsar/PulsarPropertiesMapperTests.java | 5 +++ .../pulsar/PulsarPropertiesTests.java | 11 +++++ 4 files changed, 59 insertions(+) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarProperties.java index 96117324300..e7cbd0340e0 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarProperties.java @@ -44,6 +44,7 @@ import org.springframework.util.Assert; * @author Chris Bono * @author Phillip Webb * @author Swamy Mavuri + * @author Vedran Pavic * @since 3.2.0 */ @ConfigurationProperties("spring.pulsar") @@ -136,6 +137,11 @@ public class PulsarProperties { */ private final Authentication authentication = new Authentication(); + /** + * Thread related configuration. + */ + private final Threads threads = new Threads(); + /** * Failover settings. */ @@ -177,6 +183,10 @@ public class PulsarProperties { return this.authentication; } + public Threads getThreads() { + return this.threads; + } + public Failover getFailover() { return this.failover; } @@ -959,6 +969,36 @@ public class PulsarProperties { } + public static class Threads { + + /** + * Number of threads to be used for handling connections to brokers. + */ + private Integer io; + + /** + * Number of threads to be used for message listeners. + */ + private Integer listener; + + public Integer getIo() { + return this.io; + } + + public void setIo(Integer io) { + this.io = io; + } + + public Integer getListener() { + return this.listener; + } + + public void setListener(Integer listener) { + this.listener = listener; + } + + } + public static class Failover { /** diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapper.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapper.java index aa9f505b4cb..9665c4cdb94 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapper.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapper.java @@ -50,6 +50,7 @@ import org.springframework.util.StringUtils; * @author Chris Bono * @author Phillip Webb * @author Swamy Mavuri + * @author Vedran Pavic */ final class PulsarPropertiesMapper { @@ -68,6 +69,8 @@ final class PulsarPropertiesMapper { map.from(properties::getConnectionTimeout).to(timeoutProperty(clientBuilder::connectionTimeout)); map.from(properties::getOperationTimeout).to(timeoutProperty(clientBuilder::operationTimeout)); map.from(properties::getLookupTimeout).to(timeoutProperty(clientBuilder::lookupTimeout)); + map.from(properties.getThreads()::getIo).to(clientBuilder::ioThreads); + map.from(properties.getThreads()::getListener).to(clientBuilder::listenerThreads); map.from(this.properties.getTransaction()::isEnabled).whenTrue().to(clientBuilder::enableTransaction); customizeAuthentication(properties.getAuthentication(), clientBuilder::authentication); customizeServiceUrlProviderBuilder(clientBuilder::serviceUrl, clientBuilder::serviceUrlProvider, properties, diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapperTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapperTests.java index f23584aab0a..dbacef33f9c 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapperTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapperTests.java @@ -59,6 +59,7 @@ import static org.mockito.Mockito.never; * @author Chris Bono * @author Phillip Webb * @author Swamy Mavuri + * @author Vedran Pavic */ class PulsarPropertiesMapperTests { @@ -69,6 +70,8 @@ class PulsarPropertiesMapperTests { properties.getClient().setConnectionTimeout(Duration.ofSeconds(1)); properties.getClient().setOperationTimeout(Duration.ofSeconds(2)); properties.getClient().setLookupTimeout(Duration.ofSeconds(3)); + properties.getClient().getThreads().setIo(3); + properties.getClient().getThreads().setListener(10); ClientBuilder builder = mock(ClientBuilder.class); new PulsarPropertiesMapper(properties).customizeClientBuilder(builder, new PropertiesPulsarConnectionDetails(properties)); @@ -76,6 +79,8 @@ class PulsarPropertiesMapperTests { then(builder).should().connectionTimeout(1000, TimeUnit.MILLISECONDS); then(builder).should().operationTimeout(2000, TimeUnit.MILLISECONDS); then(builder).should().lookupTimeout(3000, TimeUnit.MILLISECONDS); + then(builder).should().ioThreads(3); + then(builder).should().listenerThreads(10); } @Test diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesTests.java index 0c173b3567c..6ef42ef8345 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesTests.java @@ -54,6 +54,7 @@ import static org.assertj.core.api.Assertions.assertThatExceptionOfType; * @author Soby Chacko * @author Phillip Webb * @author Swamy Mavuri + * @author Vedran Pavic */ class PulsarPropertiesTests { @@ -88,6 +89,16 @@ class PulsarPropertiesTests { assertThat(properties.getAuthentication().getParam()).containsEntry("token", "1234"); } + @Test + void bindThread() { + Map map = new HashMap<>(); + map.put("spring.pulsar.client.threads.io", "3"); + map.put("spring.pulsar.client.threads.listener", "10"); + PulsarProperties.Client properties = bindProperties(map).getClient(); + assertThat(properties.getThreads().getIo()).isEqualTo(3); + assertThat(properties.getThreads().getListener()).isEqualTo(10); + } + @Test void bindFailover() { Map map = new HashMap<>();