diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java index 1631f5980d9..e5775b6b61d 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java @@ -25,6 +25,7 @@ import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.AfterRollbackProcessor; import org.springframework.kafka.listener.BatchErrorHandler; +import org.springframework.kafka.listener.ConsumerAwareRebalanceListener; import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.ErrorHandler; import org.springframework.kafka.support.converter.MessageConverter; @@ -53,6 +54,8 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { private AfterRollbackProcessor afterRollbackProcessor; + private ConsumerAwareRebalanceListener rebalanceListener; + /** * Set the {@link KafkaProperties} to use. * @param properties the properties @@ -111,6 +114,15 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { this.afterRollbackProcessor = afterRollbackProcessor; } + /** + * Set the {@link ConsumerAwareRebalanceListener} to use. + * @param rebalanceListener the rebalance listener. + * @since 2.2 + */ + void setRebalanceListener(ConsumerAwareRebalanceListener rebalanceListener) { + this.rebalanceListener = rebalanceListener; + } + /** * Configure the specified Kafka listener container factory. The factory can be * further tuned and default settings can be overridden. @@ -160,6 +172,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { map.from(properties::getLogContainerConfig).to(container::setLogContainerConfig); map.from(properties::isMissingTopicsFatal).to(container::setMissingTopicsFatal); map.from(this.transactionManager).to(container::setTransactionManager); + map.from(this.rebalanceListener).to(container::setConsumerRebalanceListener); } } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java index 34e78548c91..5f8b009803b 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java @@ -29,6 +29,7 @@ import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.AfterRollbackProcessor; import org.springframework.kafka.listener.BatchErrorHandler; +import org.springframework.kafka.listener.ConsumerAwareRebalanceListener; import org.springframework.kafka.listener.ErrorHandler; import org.springframework.kafka.support.converter.BatchMessageConverter; import org.springframework.kafka.support.converter.BatchMessagingMessageConverter; @@ -63,6 +64,8 @@ class KafkaAnnotationDrivenConfiguration { private final AfterRollbackProcessor afterRollbackProcessor; + private final ConsumerAwareRebalanceListener rebalanceListener; + KafkaAnnotationDrivenConfiguration(KafkaProperties properties, ObjectProvider messageConverter, ObjectProvider batchMessageConverter, @@ -70,7 +73,8 @@ class KafkaAnnotationDrivenConfiguration { ObjectProvider> kafkaTransactionManager, ObjectProvider errorHandler, ObjectProvider batchErrorHandler, - ObjectProvider> afterRollbackProcessor) { + ObjectProvider> afterRollbackProcessor, + ObjectProvider rebalanceListener) { this.properties = properties; this.messageConverter = messageConverter.getIfUnique(); this.batchMessageConverter = batchMessageConverter.getIfUnique( @@ -80,6 +84,7 @@ class KafkaAnnotationDrivenConfiguration { this.errorHandler = errorHandler.getIfUnique(); this.batchErrorHandler = batchErrorHandler.getIfUnique(); this.afterRollbackProcessor = afterRollbackProcessor.getIfUnique(); + this.rebalanceListener = rebalanceListener.getIfUnique(); } @Bean @@ -95,6 +100,7 @@ class KafkaAnnotationDrivenConfiguration { configurer.setErrorHandler(this.errorHandler); configurer.setBatchErrorHandler(this.batchErrorHandler); configurer.setAfterRollbackProcessor(this.afterRollbackProcessor); + configurer.setRebalanceListener(this.rebalanceListener); return configurer; } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index 935af69698e..585675c265f 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -55,6 +55,7 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaAdmin; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.AfterRollbackProcessor; +import org.springframework.kafka.listener.ConsumerAwareRebalanceListener; import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.ContainerProperties.AckMode; import org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler; @@ -674,6 +675,18 @@ public class KafkaAutoConfigurationTests { }); } + @Test + public void testConcurrentKafkaListenerContainerFactoryWithCustomRebalanceListener() { + this.contextRunner.withUserConfiguration(RebalanceListenerConfiguration.class) + .run((context) -> { + ConcurrentKafkaListenerContainerFactory factory = context + .getBean(ConcurrentKafkaListenerContainerFactory.class); + assertThat(factory.getContainerProperties()) + .hasFieldOrPropertyWithValue("consumerRebalanceListener", + context.getBean("rebalanceListener")); + }); + } + @Test public void testConcurrentKafkaListenerContainerFactoryWithKafkaTemplate() { this.contextRunner.run((context) -> { @@ -749,6 +762,17 @@ public class KafkaAutoConfigurationTests { } + @Configuration(proxyBeanMethods = false) + protected static class RebalanceListenerConfiguration { + + @Bean + public ConsumerAwareRebalanceListener rebalanceListener() { + return new ConsumerAwareRebalanceListener() { + }; + } + + } + @Configuration(proxyBeanMethods = false) @EnableKafkaStreams protected static class EnableKafkaStreamsConfiguration {