diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java index 550e3f424df..c0ae97f999e 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java @@ -151,6 +151,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); Listener properties = this.properties.getListener(); map.from(properties::getConcurrency).to(factory::setConcurrency); + map.from(properties::isAutoStartup).to(factory::setAutoStartup); map.from(this.messageConverter).to(factory::setMessageConverter); map.from(this.recordFilterStrategy).to(factory::setRecordFilterStrategy); map.from(this.replyTemplate).to(factory::setReplyTemplate); diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index d87e6b051cc..bfa7cf1631b 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -1012,6 +1012,11 @@ public class KafkaProperties { */ private boolean immediateStop = false; + /** + * Whether to auto start the container. + */ + private boolean autoStartup = true; + public Type getType() { return this.type; } @@ -1140,6 +1145,14 @@ public class KafkaProperties { this.immediateStop = immediateStop; } + public boolean isAutoStartup() { + return this.autoStartup; + } + + public void setAutoStartup(boolean autoStartup) { + this.autoStartup = autoStartup; + } + } public static class Ssl { diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index 7f8970aa394..fee655bcd62 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -40,6 +40,8 @@ import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.springframework.boot.autoconfigure.AutoConfigurations; import org.springframework.boot.test.context.assertj.AssertableApplicationContext; @@ -501,6 +503,7 @@ class KafkaAutoConfigurationTests { assertThat(containerProperties.isStopImmediate()).isTrue(); assertThat(kafkaListenerContainerFactory).extracting("concurrency").isEqualTo(3); assertThat(kafkaListenerContainerFactory.isBatchListener()).isTrue(); + assertThat(kafkaListenerContainerFactory).hasFieldOrPropertyWithValue("autoStartup", true); assertThat(context.getBeansOfType(KafkaJaasLoginModuleInitializer.class)).hasSize(1); KafkaJaasLoginModuleInitializer jaas = context.getBean(KafkaJaasLoginModuleInitializer.class); assertThat(jaas).hasFieldOrPropertyWithValue("loginModule", "foo"); @@ -672,6 +675,16 @@ class KafkaAutoConfigurationTests { }); } + @ParameterizedTest(name = "{0}") + @ValueSource(booleans = { true, false }) + void testConcurrentKafkaListenerContainerFactoryAutoStartup(boolean autoStartup) { + this.contextRunner.withPropertyValues("spring.kafka.listener.auto-startup=" + autoStartup).run((context) -> { + ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory = context + .getBean(ConcurrentKafkaListenerContainerFactory.class); + assertThat(kafkaListenerContainerFactory).hasFieldOrPropertyWithValue("autoStartup", autoStartup); + }); + } + @Test void specificSecurityProtocolOverridesCommonSecurityProtocol() { this.contextRunner.withPropertyValues("spring.kafka.security.protocol=SSL",