diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java
index ca635c02503..078beb8b2ba 100644
--- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java
+++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2012-2017 the original author or authors.
+ * Copyright 2012-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -101,6 +101,11 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
.to(container::setAckTime);
map.from(properties::getPollTimeout).whenNonNull().as(Duration::toMillis)
.to(container::setPollTimeout);
+ map.from(properties::getClientId).whenNonNull().to(container::setClientId);
+ map.from(properties::getIdleEventInterval).whenNonNull().to(container::setIdleEventInterval);
+ map.from(properties::getMonitorInterval).whenNonNull().to(container::setMonitorInterval);
+ map.from(properties::getNoPollThreshold).whenNonNull().to(container::setNoPollThreshold);
+ map.from(properties::getLogContainerConfig).whenNonNull().to(container::setLogContainerConfig);
}
}
diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java
index a64a9670f5e..54466b6f9dc 100644
--- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java
+++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2012-2017 the original author or authors.
+ * Copyright 2012-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -818,6 +818,31 @@ public class KafkaProperties {
*/
private Duration ackTime;
+ /**
+ * Prefix for the listener's consumer client.id property.
+ */
+ private String clientId;
+
+ /**
+ * Interval (ms) between publishing idle consumer events (no data received).
+ */
+ private Long idleEventInterval;
+
+ /**
+ * Interval (seconds) between checks for non-responsive consumers.
+ */
+ private Integer monitorInterval;
+
+ /**
+ * Multiplier applied to pollTimeout to determine if a consumer is non-responsive.
+ */
+ private Float noPollThreshold;
+
+ /**
+ * When true, log the container configuration during initialization (INFO level).
+ */
+ private Boolean logContainerConfig;
+
public Type getType() {
return this.type;
}
@@ -866,6 +891,46 @@ public class KafkaProperties {
this.ackTime = ackTime;
}
+ public String getClientId() {
+ return this.clientId;
+ }
+
+ public void setClientId(String clientId) {
+ this.clientId = clientId;
+ }
+
+ public Long getIdleEventInterval() {
+ return this.idleEventInterval;
+ }
+
+ public void setIdleEventInterval(Long idleEventInterval) {
+ this.idleEventInterval = idleEventInterval;
+ }
+
+ public Integer getMonitorInterval() {
+ return this.monitorInterval;
+ }
+
+ public void setMonitorInterval(Integer monitorInterval) {
+ this.monitorInterval = monitorInterval;
+ }
+
+ public Float getNoPollThreshold() {
+ return this.noPollThreshold;
+ }
+
+ public void setNoPollThreshold(Float noPollThreshold) {
+ this.noPollThreshold = noPollThreshold;
+ }
+
+ public Boolean getLogContainerConfig() {
+ return this.logContainerConfig;
+ }
+
+ public void setLogContainerConfig(Boolean logContainerConfig) {
+ this.logContainerConfig = logContainerConfig;
+ }
+
}
public static class Ssl {
diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java
index 1fb5e66a840..c013be10eb5 100644
--- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java
+++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2012-2017 the original author or authors.
+ * Copyright 2012-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -259,6 +259,11 @@ public class KafkaAutoConfigurationTests {
"spring.kafka.listener.concurrency=3",
"spring.kafka.listener.poll-timeout=2000",
"spring.kafka.listener.type=batch",
+ "spring.kafka.listener.client-id=client",
+ "spring.kafka.listener.idle-event-interval=12345",
+ "spring.kafka.listener.monitor-interval=45",
+ "spring.kafka.listener.no-poll-threshold=2.5",
+ "spring.kafka.listener.log-container-config=true",
"spring.kafka.jaas.enabled=true",
"spring.kafka.producer.transaction-id-prefix=foo",
"spring.kafka.jaas.login-module=foo",
@@ -292,6 +297,16 @@ public class KafkaAutoConfigurationTests {
assertThat(dfa.getPropertyValue("concurrency")).isEqualTo(3);
assertThat(dfa.getPropertyValue("containerProperties.pollTimeout"))
.isEqualTo(2000L);
+ assertThat(dfa.getPropertyValue("containerProperties.clientId"))
+ .isEqualTo("client");
+ assertThat(dfa.getPropertyValue("containerProperties.idleEventInterval"))
+ .isEqualTo(12345L);
+ assertThat(dfa.getPropertyValue("containerProperties.monitorInterval"))
+ .isEqualTo(45);
+ assertThat(dfa.getPropertyValue("containerProperties.noPollThreshold"))
+ .isEqualTo(2.5f);
+ assertThat(dfa.getPropertyValue("containerProperties.logContainerConfig"))
+ .isEqualTo(Boolean.TRUE);
assertThat(dfa.getPropertyValue("batchListener")).isEqualTo(true);
assertThat(
context.getBeansOfType(KafkaJaasLoginModuleInitializer.class))
diff --git a/spring-boot-project/spring-boot-dependencies/pom.xml b/spring-boot-project/spring-boot-dependencies/pom.xml
index 806c03ffe82..4d18ab56323 100644
--- a/spring-boot-project/spring-boot-dependencies/pom.xml
+++ b/spring-boot-project/spring-boot-dependencies/pom.xml
@@ -149,7 +149,7 @@
Kay-SR2
0.24.0.RELEASE
5.0.1.BUILD-SNAPSHOT
- 2.1.0.RELEASE
+ 2.1.1.BUILD-SNAPSHOT
2.3.2.RELEASE
1.2.0.RELEASE
2.0.0.RELEASE
diff --git a/spring-boot-project/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc b/spring-boot-project/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc
index d75d1d7b34a..f5fed9d603c 100644
--- a/spring-boot-project/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc
+++ b/spring-boot-project/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc
@@ -987,7 +987,12 @@ content into your application. Rather, pick only the properties that you need.
spring.kafka.listener.ack-count= # Number of records between offset commits when ackMode is "COUNT" or "COUNT_TIME".
spring.kafka.listener.ack-mode= # Listener AckMode. See the spring-kafka documentation.
spring.kafka.listener.ack-time= # Time between offset commits when ackMode is "TIME" or "COUNT_TIME".
+ spring.kafka.listener.client-id= # Prefix for the listener's consumer client.id property.
spring.kafka.listener.concurrency= # Number of threads to run in the listener containers.
+ spring.kafka.listener.idle-event-interval= # Interval (ms) between publishing idle consumer events (no data received).
+ spring.kafka.listener.log-container-config= # When true, log the container configuration during initialization (INFO level).
+ spring.kafka.listener.monitor-interval= # Interval (seconds) between checks for non-responsive consumers.
+ spring.kafka.listener.no-poll-threshold= # Multiplier applied to pollTimeout to determine if a consumer is non-responsive.
spring.kafka.listener.poll-timeout= # Timeout to use when polling the consumer.
spring.kafka.listener.type=single # Listener type.
spring.kafka.producer.acks= # Number of acknowledgments the producer requires the leader to have received before considering a request complete.