Browse Source

Merge pull request #44901 from quaff

* pr/44901:
  Add 'spring.kafka.consumer.max-poll-interval' configuration property

Closes gh-44901
pull/44917/head
Moritz Halbritter 11 months ago
parent
commit
63e796c8fe
  1. 18
      spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java
  2. 7
      spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java

18
spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java

@ -59,6 +59,7 @@ import org.springframework.util.unit.DataSize; @@ -59,6 +59,7 @@ import org.springframework.util.unit.DataSize;
* @author Tomaz Fernandes
* @author Andy Wilkinson
* @author Scott Frederick
* @author Yanming Zhou
* @since 1.5.0
*/
@ConfigurationProperties("spring.kafka")
@ -337,6 +338,12 @@ public class KafkaProperties { @@ -337,6 +338,12 @@ public class KafkaProperties {
*/
private Integer maxPollRecords;
/**
* Maximum delay between invocations of poll() when using consumer group
* management.
*/
private Duration maxPollInterval;
/**
* Additional consumer-specific properties used to configure the client.
*/
@ -454,6 +461,14 @@ public class KafkaProperties { @@ -454,6 +461,14 @@ public class KafkaProperties {
this.maxPollRecords = maxPollRecords;
}
public Duration getMaxPollInterval() {
return this.maxPollInterval;
}
public void setMaxPollInterval(Duration maxPollInterval) {
this.maxPollInterval = maxPollInterval;
}
public Map<String, String> getProperties() {
return this.properties;
}
@ -483,6 +498,9 @@ public class KafkaProperties { @@ -483,6 +498,9 @@ public class KafkaProperties {
map.from(this::getKeyDeserializer).to(properties.in(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
map.from(this::getValueDeserializer).to(properties.in(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
map.from(this::getMaxPollRecords).to(properties.in(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
map.from(this::getMaxPollInterval)
.asInt(Duration::toMillis)
.to(properties.in(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG));
return properties.with(this.ssl, this.security, this.properties, sslBundles);
}

7
spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java

@ -115,6 +115,7 @@ import static org.mockito.Mockito.never; @@ -115,6 +115,7 @@ import static org.mockito.Mockito.never;
* @author Andy Wilkinson
* @author Phillip Webb
* @author Scott Frederick
* @author Yanming Zhou
*/
class KafkaAutoConfigurationTests {
@ -132,8 +133,9 @@ class KafkaAutoConfigurationTests { @@ -132,8 +133,9 @@ class KafkaAutoConfigurationTests {
"spring.kafka.ssl.key-store-type=PKCS12", "spring.kafka.ssl.trust-store-location=classpath:tsLoc",
"spring.kafka.ssl.trust-store-password=p3", "spring.kafka.ssl.trust-store-type=PKCS12",
"spring.kafka.ssl.protocol=TLSv1.2", "spring.kafka.consumer.auto-commit-interval=123",
"spring.kafka.consumer.max-poll-records=42", "spring.kafka.consumer.auto-offset-reset=earliest",
"spring.kafka.consumer.client-id=ccid", // test override common
"spring.kafka.consumer.max-poll-records=42", "spring.kafka.consumer.max-poll-interval=30s",
"spring.kafka.consumer.auto-offset-reset=earliest", "spring.kafka.consumer.client-id=ccid",
// test override common
"spring.kafka.consumer.enable-auto-commit=false", "spring.kafka.consumer.fetch-max-wait=456",
"spring.kafka.consumer.properties.fiz.buz=fix.fox", "spring.kafka.consumer.fetch-min-size=1KB",
"spring.kafka.consumer.group-id=bar", "spring.kafka.consumer.heartbeat-interval=234",
@ -172,6 +174,7 @@ class KafkaAutoConfigurationTests { @@ -172,6 +174,7 @@ class KafkaAutoConfigurationTests {
assertThat(configs).containsEntry(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
IntegerDeserializer.class);
assertThat(configs).containsEntry(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 42);
assertThat(configs).containsEntry(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30000);
assertThat(configs).containsEntry("foo", "bar");
assertThat(configs).containsEntry("baz", "qux");
assertThat(configs).containsEntry("foo.bar.baz", "qux.fiz.buz");

Loading…
Cancel
Save