diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java index 0fbc9b52a7e..0fddd289611 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java @@ -98,7 +98,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { * Set the {@link BatchErrorHandler} to use. * @param batchErrorHandler the error handler */ - public void setBatchErrorHandler(BatchErrorHandler batchErrorHandler) { + void setBatchErrorHandler(BatchErrorHandler batchErrorHandler) { this.batchErrorHandler = batchErrorHandler; } @@ -133,15 +133,14 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { map.from(properties::getConcurrency).to(factory::setConcurrency); map.from(this.messageConverter).to(factory::setMessageConverter); map.from(this.replyTemplate).to(factory::setReplyTemplate); - map.from(properties::getType).whenEqualTo(Listener.Type.BATCH) - .toCall(() -> factory.setBatchListener(true)); - map.from(this.afterRollbackProcessor).to(factory::setAfterRollbackProcessor); if (properties.getType().equals(Listener.Type.BATCH)) { + factory.setBatchListener(true); factory.setBatchErrorHandler(this.batchErrorHandler); } else { factory.setErrorHandler(this.errorHandler); } + map.from(this.afterRollbackProcessor).to(factory::setAfterRollbackProcessor); } private void configureContainer(ContainerProperties container) {