From b710dc71feefb3e892860fddff38f53575f0be5c Mon Sep 17 00:00:00 2001 From: Stephane Nicoll Date: Wed, 17 Apr 2019 11:36:10 +0200 Subject: [PATCH] Polish "Use BatchErrorHandler when Kafka listener type is batch" Closes gh-16499 --- .../ConcurrentKafkaListenerContainerFactoryConfigurer.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java index 0fbc9b52a7e..0fddd289611 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java @@ -98,7 +98,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { * Set the {@link BatchErrorHandler} to use. * @param batchErrorHandler the error handler */ - public void setBatchErrorHandler(BatchErrorHandler batchErrorHandler) { + void setBatchErrorHandler(BatchErrorHandler batchErrorHandler) { this.batchErrorHandler = batchErrorHandler; } @@ -133,15 +133,14 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { map.from(properties::getConcurrency).to(factory::setConcurrency); map.from(this.messageConverter).to(factory::setMessageConverter); map.from(this.replyTemplate).to(factory::setReplyTemplate); - map.from(properties::getType).whenEqualTo(Listener.Type.BATCH) - .toCall(() -> factory.setBatchListener(true)); - map.from(this.afterRollbackProcessor).to(factory::setAfterRollbackProcessor); if (properties.getType().equals(Listener.Type.BATCH)) { + factory.setBatchListener(true); factory.setBatchErrorHandler(this.batchErrorHandler); } else { factory.setErrorHandler(this.errorHandler); } + map.from(this.afterRollbackProcessor).to(factory::setAfterRollbackProcessor); } private void configureContainer(ContainerProperties container) {