Browse Source

Polish "Use BatchErrorHandler when Kafka listener type is batch"

Closes gh-16499
pull/16593/head
Stephane Nicoll 7 years ago
parent
commit
b710dc71fe
  1. 7
      spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java

7
spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java

@ -98,7 +98,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
* Set the {@link BatchErrorHandler} to use. * Set the {@link BatchErrorHandler} to use.
* @param batchErrorHandler the error handler * @param batchErrorHandler the error handler
*/ */
public void setBatchErrorHandler(BatchErrorHandler batchErrorHandler) { void setBatchErrorHandler(BatchErrorHandler batchErrorHandler) {
this.batchErrorHandler = batchErrorHandler; this.batchErrorHandler = batchErrorHandler;
} }
@ -133,15 +133,14 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
map.from(properties::getConcurrency).to(factory::setConcurrency); map.from(properties::getConcurrency).to(factory::setConcurrency);
map.from(this.messageConverter).to(factory::setMessageConverter); map.from(this.messageConverter).to(factory::setMessageConverter);
map.from(this.replyTemplate).to(factory::setReplyTemplate); map.from(this.replyTemplate).to(factory::setReplyTemplate);
map.from(properties::getType).whenEqualTo(Listener.Type.BATCH)
.toCall(() -> factory.setBatchListener(true));
map.from(this.afterRollbackProcessor).to(factory::setAfterRollbackProcessor);
if (properties.getType().equals(Listener.Type.BATCH)) { if (properties.getType().equals(Listener.Type.BATCH)) {
factory.setBatchListener(true);
factory.setBatchErrorHandler(this.batchErrorHandler); factory.setBatchErrorHandler(this.batchErrorHandler);
} }
else { else {
factory.setErrorHandler(this.errorHandler); factory.setErrorHandler(this.errorHandler);
} }
map.from(this.afterRollbackProcessor).to(factory::setAfterRollbackProcessor);
} }
private void configureContainer(ContainerProperties container) { private void configureContainer(ContainerProperties container) {

Loading…
Cancel
Save