Browse Source

Auto-configure Spring Kafka ContainerCustomizer

See gh-34033
pull/34122/head
Guirong Hu 3 years ago committed by Moritz Halbritter
parent
commit
b514ea776e
  1. 8
      spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java
  2. 23
      spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java

8
spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2012-2022 the original author or authors.
* Copyright 2012-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -24,6 +24,7 @@ import org.springframework.context.annotation.Bean; @@ -24,6 +24,7 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.ContainerCustomizer;
import org.springframework.kafka.config.KafkaListenerConfigUtils;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
@ -31,6 +32,7 @@ import org.springframework.kafka.core.KafkaTemplate; @@ -31,6 +32,7 @@ import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.BatchInterceptor;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.RecordInterceptor;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
@ -121,10 +123,12 @@ class KafkaAnnotationDrivenConfiguration { @@ -121,10 +123,12 @@ class KafkaAnnotationDrivenConfiguration {
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory) {
ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory,
ObjectProvider<ContainerCustomizer<Object, Object, ConcurrentMessageListenerContainer<Object, Object>>> kafkaContainerCustomizer) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory
.getIfAvailable(() -> new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties())));
kafkaContainerCustomizer.ifAvailable(factory::setContainerCustomizer);
return factory;
}

23
spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java

@ -53,6 +53,7 @@ import org.springframework.kafka.annotation.EnableKafkaStreams; @@ -53,6 +53,7 @@ import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.AbstractKafkaListenerContainerFactory;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.ContainerCustomizer;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
@ -65,6 +66,7 @@ import org.springframework.kafka.core.KafkaTemplate; @@ -65,6 +66,7 @@ import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.BatchInterceptor;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
@ -696,6 +698,17 @@ class KafkaAutoConfigurationTests { @@ -696,6 +698,17 @@ class KafkaAutoConfigurationTests {
});
}
@Test
void testConcurrentKafkaListenerContainerFactoryWithCustomContainerCustomizer() {
this.contextRunner.withUserConfiguration(ObservationEnabledContainerCustomizerConfiguration.class)
.run((context) -> {
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
.getBean(ConcurrentKafkaListenerContainerFactory.class);
ConcurrentMessageListenerContainer<?, ?> container = factory.createContainer("someTopic");
assertThat(container.getContainerProperties().isObservationEnabled()).isEqualTo(true);
});
}
@Test
void specificSecurityProtocolOverridesCommonSecurityProtocol() {
this.contextRunner.withPropertyValues("spring.kafka.security.protocol=SSL",
@ -765,6 +778,16 @@ class KafkaAutoConfigurationTests { @@ -765,6 +778,16 @@ class KafkaAutoConfigurationTests {
}
@Configuration(proxyBeanMethods = false)
static class ObservationEnabledContainerCustomizerConfiguration {
@Bean
ContainerCustomizer<Object, Object, ConcurrentMessageListenerContainer<Object, Object>> myContainerCustomizer() {
return (container) -> container.getContainerProperties().setObservationEnabled(true);
}
}
@Configuration(proxyBeanMethods = false)
static class RecordInterceptorConfiguration {

Loading…
Cancel
Save