diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java index b7d40d0195b..3ac3d9b0bcb 100644 --- a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2016 the original author or authors. + * Copyright 2012-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -68,6 +68,9 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { if (container.getConcurrency() != null) { listenerContainerFactory.setConcurrency(container.getConcurrency()); } + if (container.getType() == Listener.Type.BATCH) { + listenerContainerFactory.setBatchListener(true); + } } } diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index cd96bd5463d..51a8fffe6e2 100644 --- a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -645,6 +645,25 @@ public class KafkaProperties { public static class Listener { + public enum Type { + + /** + * Invokes the endpoint with one ConsumerRecord at a time. + */ + SINGLE, + + /** + * Invokes the endpoint with a batch of ConsumerRecord. + */ + BATCH; + + } + + /** + * Listener type. + */ + private Type type = Type.SINGLE; + /** * Listener AckMode; see the spring-kafka documentation. */ @@ -672,6 +691,14 @@ public class KafkaProperties { */ private Long ackTime; + public Type getType() { + return this.type; + } + + public void setType(Type type) { + this.type = type; + } + public AckMode getAckMode() { return this.ackMode; } diff --git a/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json b/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json index 356299c4b0d..04bf53776c3 100644 --- a/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json +++ b/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json @@ -329,6 +329,10 @@ "name": "spring.kafka.jaas.control-flag", "defaultValue": "required" }, + { + "name": "spring.kafka.listener.type", + "defaultValue": "single" + }, { "name": "spring.mobile.devicedelegatingviewresolver.enabled", "type": "java.lang.Boolean", diff --git a/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index 34328bbf740..b79f447ee18 100644 --- a/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -176,6 +176,7 @@ public class KafkaAutoConfigurationTests { "spring.kafka.listener.ack-time=456", "spring.kafka.listener.concurrency=3", "spring.kafka.listener.poll-timeout=2000", + "spring.kafka.listener.type=batch", "spring.kafka.jaas.enabled=true", "spring.kafka.jaas.login-module=foo", "spring.kafka.jaas.control-flag=REQUISITE", "spring.kafka.jaas.options.useKeyTab=true"); @@ -198,6 +199,8 @@ public class KafkaAutoConfigurationTests { assertThat(dfa.getPropertyValue("concurrency")).isEqualTo(3); assertThat(dfa.getPropertyValue("containerProperties.pollTimeout")) .isEqualTo(2000L); + assertThat(dfa.getPropertyValue("batchListener")) + .isEqualTo(true); assertThat(this.context.getBeansOfType(KafkaJaasLoginModuleInitializer.class)) .hasSize(1); KafkaJaasLoginModuleInitializer jaas = this.context diff --git a/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc b/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc index c39e8715234..88f45dadb3f 100644 --- a/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc +++ b/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc @@ -986,6 +986,7 @@ content into your application; rather pick only the properties that you need. spring.kafka.listener.ack-time= # Time in milliseconds between offset commits when ackMode is "TIME" or "COUNT_TIME". spring.kafka.listener.concurrency= # Number of threads to run in the listener containers. spring.kafka.listener.poll-timeout= # Timeout in milliseconds to use when polling the consumer. + spring.kafka.listener.type=single # Listener type. spring.kafka.producer.acks= # Number of acknowledgments the producer requires the leader to have received before considering a request complete. spring.kafka.producer.batch-size= # Number of records to batch before sending. spring.kafka.producer.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connection to the Kafka cluster.