diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index 778c6f9bac4..a8f092cd7bd 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -685,6 +685,8 @@ public class KafkaProperties { private final Security security = new Security(); + private final Cleanup cleanup = new Cleanup(); + /** * Kafka streams application.id property; default spring.application.name. */ @@ -735,6 +737,10 @@ public class KafkaProperties { return this.security; } + public Cleanup getCleanup() { + return this.cleanup; + } + public String getApplicationId() { return this.applicationId; } @@ -1234,6 +1240,36 @@ public class KafkaProperties { } + public static class Cleanup { + + /** + * Cleanup the application’s local state directory on startup. + */ + private boolean onStartup = false; + + /** + * Cleanup the application’s local state directory on shutdown. + */ + private boolean onShutdown = true; + + public boolean isOnStartup() { + return this.onStartup; + } + + public void setOnStartup(boolean onStartup) { + this.onStartup = onStartup; + } + + public boolean isOnShutdown() { + return this.onShutdown; + } + + public void setOnShutdown(boolean onShutdown) { + this.onShutdown = onShutdown; + } + + } + public enum IsolationLevel { /** diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java index 1a5a106392c..feec421f840 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java @@ -34,6 +34,7 @@ import org.springframework.core.env.Environment; import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration; import org.springframework.kafka.config.KafkaStreamsConfiguration; import org.springframework.kafka.config.StreamsBuilderFactoryBean; +import org.springframework.kafka.core.CleanupConfig; /** * Configuration for Kafka Streams annotation-driven support. @@ -91,6 +92,9 @@ class KafkaStreamsAnnotationDrivenConfiguration { @Override public void afterPropertiesSet() { this.factoryBean.setAutoStartup(this.properties.getStreams().isAutoStartup()); + KafkaProperties.Cleanup cleanup = this.properties.getStreams().getCleanup(); + CleanupConfig cleanupConfig = new CleanupConfig(cleanup.isOnStartup(), cleanup.isOnShutdown()); + this.factoryBean.setCleanupConfig(cleanupConfig); } } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index 3e3e93e0528..a5e592bf7b9 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -36,6 +36,7 @@ import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.jupiter.api.Test; import org.springframework.boot.autoconfigure.AutoConfigurations; @@ -50,6 +51,7 @@ import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaStreamsConfiguration; import org.springframework.kafka.config.StreamsBuilderFactoryBean; +import org.springframework.kafka.core.CleanupConfig; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; @@ -340,6 +342,26 @@ class KafkaAutoConfigurationTests { }); } + @Test + void streamsWithCleanupConfig() { + this.contextRunner + .withUserConfiguration(EnableKafkaStreamsConfiguration.class, TestKafkaStreamsConfiguration.class) + .withPropertyValues("spring.application.name=my-test-app", + "spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", + "spring.kafka.streams.auto-startup=false", "spring.kafka.streams.cleanup.on-startup=true", + "spring.kafka.streams.cleanup.on-shutdown=false") + .run((context) -> { + StreamsBuilderFactoryBean streamsBuilderFactoryBean = context + .getBean(StreamsBuilderFactoryBean.class); + assertThat(streamsBuilderFactoryBean) + .extracting("cleanupConfig", InstanceOfAssertFactories.type(CleanupConfig.class)) + .satisfies((cleanupConfig) -> { + assertThat(cleanupConfig.cleanupOnStart()).isTrue(); + assertThat(cleanupConfig.cleanupOnStop()).isFalse(); + }); + }); + } + @Test void streamsApplicationIdIsMandatory() { this.contextRunner.withUserConfiguration(EnableKafkaStreamsConfiguration.class).run((context) -> { diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaPropertiesTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaPropertiesTests.java index faa4d470202..87361f78599 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaPropertiesTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaPropertiesTests.java @@ -18,8 +18,10 @@ package org.springframework.boot.autoconfigure.kafka; import org.junit.jupiter.api.Test; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Cleanup; import org.springframework.boot.autoconfigure.kafka.KafkaProperties.IsolationLevel; import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener; +import org.springframework.kafka.core.CleanupConfig; import org.springframework.kafka.listener.ContainerProperties; import static org.assertj.core.api.Assertions.assertThat; @@ -48,4 +50,12 @@ class KafkaPropertiesTests { assertThat(listenerProperties.isMissingTopicsFatal()).isEqualTo(container.isMissingTopicsFatal()); } + @Test + void cleanupConfigDefaultValuesAreConsistent() { + CleanupConfig cleanupConfig = new CleanupConfig(); + Cleanup cleanup = new KafkaProperties().getStreams().getCleanup(); + assertThat(cleanup.isOnStartup()).isEqualTo(cleanupConfig.cleanupOnStart()); + assertThat(cleanup.isOnShutdown()).isEqualTo(cleanupConfig.cleanupOnStop()); + } + }