From 6fcbf80b31b33ca725c58e785ae257c8dcb2f627 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Thu, 4 Jan 2018 11:45:13 -0500 Subject: [PATCH] Add support for additional Kafka listener properties See gh-11502 --- ...fkaListenerContainerFactoryConfigurer.java | 7 +- .../autoconfigure/kafka/KafkaProperties.java | 67 ++++++++++++++++++- .../kafka/KafkaAutoConfigurationTests.java | 17 ++++- .../spring-boot-dependencies/pom.xml | 2 +- .../appendix-application-properties.adoc | 5 ++ 5 files changed, 94 insertions(+), 4 deletions(-) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java index ca635c02503..078beb8b2ba 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2017 the original author or authors. + * Copyright 2012-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -101,6 +101,11 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { .to(container::setAckTime); map.from(properties::getPollTimeout).whenNonNull().as(Duration::toMillis) .to(container::setPollTimeout); + map.from(properties::getClientId).whenNonNull().to(container::setClientId); + map.from(properties::getIdleEventInterval).whenNonNull().to(container::setIdleEventInterval); + map.from(properties::getMonitorInterval).whenNonNull().to(container::setMonitorInterval); + map.from(properties::getNoPollThreshold).whenNonNull().to(container::setNoPollThreshold); + map.from(properties::getLogContainerConfig).whenNonNull().to(container::setLogContainerConfig); } } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index a64a9670f5e..54466b6f9dc 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2017 the original author or authors. + * Copyright 2012-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -818,6 +818,31 @@ public class KafkaProperties { */ private Duration ackTime; + /** + * Prefix for the listener's consumer client.id property. + */ + private String clientId; + + /** + * Interval (ms) between publishing idle consumer events (no data received). + */ + private Long idleEventInterval; + + /** + * Interval (seconds) between checks for non-responsive consumers. + */ + private Integer monitorInterval; + + /** + * Multiplier applied to pollTimeout to determine if a consumer is non-responsive. + */ + private Float noPollThreshold; + + /** + * When true, log the container configuration during initialization (INFO level). + */ + private Boolean logContainerConfig; + public Type getType() { return this.type; } @@ -866,6 +891,46 @@ public class KafkaProperties { this.ackTime = ackTime; } + public String getClientId() { + return this.clientId; + } + + public void setClientId(String clientId) { + this.clientId = clientId; + } + + public Long getIdleEventInterval() { + return this.idleEventInterval; + } + + public void setIdleEventInterval(Long idleEventInterval) { + this.idleEventInterval = idleEventInterval; + } + + public Integer getMonitorInterval() { + return this.monitorInterval; + } + + public void setMonitorInterval(Integer monitorInterval) { + this.monitorInterval = monitorInterval; + } + + public Float getNoPollThreshold() { + return this.noPollThreshold; + } + + public void setNoPollThreshold(Float noPollThreshold) { + this.noPollThreshold = noPollThreshold; + } + + public Boolean getLogContainerConfig() { + return this.logContainerConfig; + } + + public void setLogContainerConfig(Boolean logContainerConfig) { + this.logContainerConfig = logContainerConfig; + } + } public static class Ssl { diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index 1fb5e66a840..c013be10eb5 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2017 the original author or authors. + * Copyright 2012-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -259,6 +259,11 @@ public class KafkaAutoConfigurationTests { "spring.kafka.listener.concurrency=3", "spring.kafka.listener.poll-timeout=2000", "spring.kafka.listener.type=batch", + "spring.kafka.listener.client-id=client", + "spring.kafka.listener.idle-event-interval=12345", + "spring.kafka.listener.monitor-interval=45", + "spring.kafka.listener.no-poll-threshold=2.5", + "spring.kafka.listener.log-container-config=true", "spring.kafka.jaas.enabled=true", "spring.kafka.producer.transaction-id-prefix=foo", "spring.kafka.jaas.login-module=foo", @@ -292,6 +297,16 @@ public class KafkaAutoConfigurationTests { assertThat(dfa.getPropertyValue("concurrency")).isEqualTo(3); assertThat(dfa.getPropertyValue("containerProperties.pollTimeout")) .isEqualTo(2000L); + assertThat(dfa.getPropertyValue("containerProperties.clientId")) + .isEqualTo("client"); + assertThat(dfa.getPropertyValue("containerProperties.idleEventInterval")) + .isEqualTo(12345L); + assertThat(dfa.getPropertyValue("containerProperties.monitorInterval")) + .isEqualTo(45); + assertThat(dfa.getPropertyValue("containerProperties.noPollThreshold")) + .isEqualTo(2.5f); + assertThat(dfa.getPropertyValue("containerProperties.logContainerConfig")) + .isEqualTo(Boolean.TRUE); assertThat(dfa.getPropertyValue("batchListener")).isEqualTo(true); assertThat( context.getBeansOfType(KafkaJaasLoginModuleInitializer.class)) diff --git a/spring-boot-project/spring-boot-dependencies/pom.xml b/spring-boot-project/spring-boot-dependencies/pom.xml index 806c03ffe82..4d18ab56323 100644 --- a/spring-boot-project/spring-boot-dependencies/pom.xml +++ b/spring-boot-project/spring-boot-dependencies/pom.xml @@ -149,7 +149,7 @@ Kay-SR2 0.24.0.RELEASE 5.0.1.BUILD-SNAPSHOT - 2.1.0.RELEASE + 2.1.1.BUILD-SNAPSHOT 2.3.2.RELEASE 1.2.0.RELEASE 2.0.0.RELEASE diff --git a/spring-boot-project/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc b/spring-boot-project/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc index d75d1d7b34a..f5fed9d603c 100644 --- a/spring-boot-project/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc +++ b/spring-boot-project/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc @@ -987,7 +987,12 @@ content into your application. Rather, pick only the properties that you need. spring.kafka.listener.ack-count= # Number of records between offset commits when ackMode is "COUNT" or "COUNT_TIME". spring.kafka.listener.ack-mode= # Listener AckMode. See the spring-kafka documentation. spring.kafka.listener.ack-time= # Time between offset commits when ackMode is "TIME" or "COUNT_TIME". + spring.kafka.listener.client-id= # Prefix for the listener's consumer client.id property. spring.kafka.listener.concurrency= # Number of threads to run in the listener containers. + spring.kafka.listener.idle-event-interval= # Interval (ms) between publishing idle consumer events (no data received). + spring.kafka.listener.log-container-config= # When true, log the container configuration during initialization (INFO level). + spring.kafka.listener.monitor-interval= # Interval (seconds) between checks for non-responsive consumers. + spring.kafka.listener.no-poll-threshold= # Multiplier applied to pollTimeout to determine if a consumer is non-responsive. spring.kafka.listener.poll-timeout= # Timeout to use when polling the consumer. spring.kafka.listener.type=single # Listener type. spring.kafka.producer.acks= # Number of acknowledgments the producer requires the leader to have received before considering a request complete.