diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index 778c6f9bac4..60f9dfd73b8 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -37,6 +37,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.PropertyMapper; import org.springframework.boot.convert.DurationUnit; import org.springframework.core.io.Resource; +import org.springframework.kafka.core.CleanupConfig; import org.springframework.kafka.listener.ContainerProperties.AckMode; import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; import org.springframework.util.CollectionUtils; @@ -722,6 +723,11 @@ public class KafkaProperties { */ private String stateDir; + /** + * Cleanup configuration for the state stores. + */ + private Cleanup cleanup; + /** * Additional Kafka properties used to configure the streams. */ @@ -791,6 +797,14 @@ public class KafkaProperties { this.stateDir = stateDir; } + public Cleanup getCleanup() { + return cleanup; + } + + public void setCleanup(Cleanup cleanup) { + this.cleanup = cleanup; + } + public Map getProperties() { return this.properties; } @@ -1259,6 +1273,32 @@ public class KafkaProperties { } + public static class Cleanup { + + /** + * Cleanup the application's state on start. + */ + private boolean onStart = false; + + /** + * Cleanup the application's state on stop. + */ + private boolean onStop = true; + + public CleanupConfig buildCleanupConfig() { + return new CleanupConfig(this.onStart, this.onStop); + } + + public boolean isOnStart() { + return onStart; + } + + public boolean isOnStop() { + return onStop; + } + + } + @SuppressWarnings("serial") private static class Properties extends HashMap { diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java index 1a5a106392c..ef36f273534 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java @@ -91,6 +91,11 @@ class KafkaStreamsAnnotationDrivenConfiguration { @Override public void afterPropertiesSet() { this.factoryBean.setAutoStartup(this.properties.getStreams().isAutoStartup()); + + KafkaProperties.Cleanup cleanup = this.properties.getStreams().getCleanup(); + if (cleanup != null) { + this.factoryBean.setCleanupConfig(cleanup.buildCleanupConfig()); + } } }