From 4eba42f6ddb58445a41870b23fac4f2b15afb77f Mon Sep 17 00:00:00 2001 From: Vedran Pavic Date: Wed, 4 Sep 2024 12:22:37 +0200 Subject: [PATCH] Improve Pulsar listener container concurrency configuration This is a follow-up to gh-42062 that utilizes newly introduced `concurrency` property in `PulsarContainerProperties` to simplify auto-configuration support for Pulsar listener container concurrency. See: https://github.com/spring-projects/spring-pulsar/issues/820 See gh-42120 --- .../pulsar/PulsarAutoConfiguration.java | 6 +----- .../pulsar/PulsarPropertiesMapper.java | 10 +--------- .../pulsar/PulsarPropertiesMapperTests.java | 15 ++------------- 3 files changed, 4 insertions(+), 27 deletions(-) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java index 9d2f4d88d31..5b5f9dc4123 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java @@ -69,7 +69,6 @@ import org.springframework.pulsar.transaction.PulsarTransactionManager; * @author Alexander Preuß * @author Phillip Webb * @author Jonas Geiregat - * @author Vedran Pavic * @since 3.2.0 */ @AutoConfiguration @@ -188,10 +187,7 @@ public class PulsarAutoConfiguration { } pulsarTransactionManager.ifUnique(containerProperties.transactions()::setTransactionManager); this.propertiesMapper.customizeContainerProperties(containerProperties); - ConcurrentPulsarListenerContainerFactory listenerContainerFactory = new ConcurrentPulsarListenerContainerFactory<>( - pulsarConsumerFactory, containerProperties); - this.propertiesMapper.customizeConcurrentPulsarListenerContainerFactory(listenerContainerFactory); - return listenerContainerFactory; + return new ConcurrentPulsarListenerContainerFactory<>(pulsarConsumerFactory, containerProperties); } @Bean diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapper.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapper.java index f645616892b..cfe34eb6f8b 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapper.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapper.java @@ -39,7 +39,6 @@ import org.apache.pulsar.client.impl.AutoClusterFailover.AutoClusterFailoverBuil import org.springframework.boot.context.properties.PropertyMapper; import org.springframework.boot.json.JsonWriter; -import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactory; import org.springframework.pulsar.core.PulsarTemplate; import org.springframework.pulsar.listener.PulsarContainerProperties; import org.springframework.pulsar.reader.PulsarReaderContainerProperties; @@ -197,17 +196,10 @@ final class PulsarPropertiesMapper { PulsarProperties.Listener properties = this.properties.getListener(); PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); map.from(properties::getSchemaType).to(containerProperties::setSchemaType); + map.from(properties::getConcurrency).to(containerProperties::setConcurrency); map.from(properties::isObservationEnabled).to(containerProperties::setObservationEnabled); } - @SuppressWarnings("removal") - void customizeConcurrentPulsarListenerContainerFactory( - ConcurrentPulsarListenerContainerFactory listenerContainerFactory) { - PulsarProperties.Listener properties = this.properties.getListener(); - PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); - map.from(properties::getConcurrency).to(listenerContainerFactory::setConcurrency); - } - void customizeReaderBuilder(ReaderBuilder readerBuilder) { PulsarProperties.Reader properties = this.properties.getReader(); PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapperTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapperTests.java index 353d78ea128..5e8ca006fcd 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapperTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesMapperTests.java @@ -41,7 +41,6 @@ import org.junit.jupiter.api.Test; import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.Consumer; import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.Failover.BackupCluster; -import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactory; import org.springframework.pulsar.core.PulsarProducerFactory; import org.springframework.pulsar.core.PulsarTemplate; import org.springframework.pulsar.listener.PulsarContainerProperties; @@ -264,6 +263,7 @@ class PulsarPropertiesMapperTests { properties.getConsumer().getSubscription().setType(SubscriptionType.Shared); properties.getConsumer().getSubscription().setName("my-subscription"); properties.getListener().setSchemaType(SchemaType.AVRO); + properties.getListener().setConcurrency(10); properties.getListener().setObservationEnabled(true); properties.getTransaction().setEnabled(true); PulsarContainerProperties containerProperties = new PulsarContainerProperties("my-topic-pattern"); @@ -271,22 +271,11 @@ class PulsarPropertiesMapperTests { assertThat(containerProperties.getSubscriptionType()).isEqualTo(SubscriptionType.Shared); assertThat(containerProperties.getSubscriptionName()).isEqualTo("my-subscription"); assertThat(containerProperties.getSchemaType()).isEqualTo(SchemaType.AVRO); + assertThat(containerProperties.getConcurrency()).isEqualTo(10); assertThat(containerProperties.isObservationEnabled()).isTrue(); assertThat(containerProperties.transactions().isEnabled()).isTrue(); } - @Test - @SuppressWarnings("removal") - void customizeConcurrentPulsarListenerContainerFactory() { - PulsarProperties properties = new PulsarProperties(); - properties.getListener().setConcurrency(10); - ConcurrentPulsarListenerContainerFactory listenerContainerFactory = mock( - ConcurrentPulsarListenerContainerFactory.class); - new PulsarPropertiesMapper(properties) - .customizeConcurrentPulsarListenerContainerFactory(listenerContainerFactory); - then(listenerContainerFactory).should().setConcurrency(10); - } - @Test @SuppressWarnings("unchecked") void customizeReaderBuilder() {