From d65eb696476f35c3967cc7ff03c2b3d10b44b943 Mon Sep 17 00:00:00 2001 From: Moritz Halbritter Date: Tue, 20 Jan 2026 15:01:50 +0100 Subject: [PATCH] Automatically configure Spring Kafka's observation convention beans This automatically registers KafkaListenerObservationConvention on the container factory, and KafkaTemplateObservationConvention on the Kafka template. Closes gh-48914 --- .../modules/reference/pages/messaging/kafka.adoc | 4 ++++ ...rrentKafkaListenerContainerFactoryConfigurer.java | 12 ++++++++++++ .../KafkaAnnotationDrivenConfiguration.java | 8 +++++++- .../kafka/autoconfigure/KafkaAutoConfiguration.java | 5 ++++- ...KafkaListenerContainerFactoryConfigurerTests.java | 9 +++++++++ .../autoconfigure/KafkaAutoConfigurationTests.java | 11 +++++++++++ 6 files changed, 47 insertions(+), 2 deletions(-) diff --git a/documentation/spring-boot-docs/src/docs/antora/modules/reference/pages/messaging/kafka.adoc b/documentation/spring-boot-docs/src/docs/antora/modules/reference/pages/messaging/kafka.adoc index 5fc72a22660..c4e7e325a52 100644 --- a/documentation/spring-boot-docs/src/docs/antora/modules/reference/pages/messaging/kafka.adoc +++ b/documentation/spring-boot-docs/src/docs/antora/modules/reference/pages/messaging/kafka.adoc @@ -32,6 +32,8 @@ include-code::MyBean[] NOTE: If the property configprop:spring.kafka.producer.transaction-id-prefix[] is defined, a javadoc:org.springframework.kafka.transaction.KafkaTransactionManager[] is automatically configured. Also, if a javadoc:org.springframework.kafka.support.converter.RecordMessageConverter[] bean is defined, it is automatically associated to the auto-configured javadoc:org.springframework.kafka.core.KafkaTemplate[]. +If there's a bean of type `KafkaTemplateObservationConvention` in the context, it is automatically registered on the `KafkaTemplate`. + [[messaging.kafka.receiving]] @@ -52,6 +54,8 @@ If only a javadoc:org.springframework.kafka.support.converter.RecordMessageConve TIP: A custom javadoc:org.springframework.kafka.transaction.ChainedKafkaTransactionManager[] must be marked javadoc:org.springframework.context.annotation.Primary[format=annotation] as it usually references the auto-configured javadoc:org.springframework.kafka.transaction.KafkaTransactionManager[] bean. +If there's a bean of type `KafkaListenerObservationConvention` in the context, it is automatically registered on the container factory. + [[messaging.kafka.streams]] diff --git a/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/ConcurrentKafkaListenerContainerFactoryConfigurer.java b/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/ConcurrentKafkaListenerContainerFactoryConfigurer.java index 12b8c1ccbd1..df90651c947 100644 --- a/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/ConcurrentKafkaListenerContainerFactoryConfigurer.java +++ b/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/ConcurrentKafkaListenerContainerFactoryConfigurer.java @@ -37,6 +37,7 @@ import org.springframework.kafka.listener.RecordInterceptor; import org.springframework.kafka.listener.adapter.RecordFilterStrategy; import org.springframework.kafka.support.converter.BatchMessageConverter; import org.springframework.kafka.support.converter.RecordMessageConverter; +import org.springframework.kafka.support.micrometer.KafkaListenerObservationConvention; import org.springframework.kafka.transaction.KafkaAwareTransactionManager; import org.springframework.util.Assert; @@ -82,6 +83,8 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { private @Nullable SimpleAsyncTaskExecutor listenerTaskExecutor; + private @Nullable KafkaListenerObservationConvention observationConvention; + /** * Set the {@link KafkaProperties} to use. * @param properties the properties @@ -186,6 +189,14 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { this.listenerTaskExecutor = listenerTaskExecutor; } + /** + * Sets the observation convention. + * @param observationConvention the observation convention + */ + void setObservationConvention(@Nullable KafkaListenerObservationConvention observationConvention) { + this.observationConvention = observationConvention; + } + /** * Configure the specified Kafka listener container factory. The factory can be * further tuned and default settings can be overridden. @@ -249,6 +260,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { map.from(this.transactionManager).to(container::setKafkaAwareTransactionManager); map.from(this.rebalanceListener).to(container::setConsumerRebalanceListener); map.from(this.listenerTaskExecutor).to(container::setListenerTaskExecutor); + map.from(this.observationConvention).to(container::setObservationConvention); } } diff --git a/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/KafkaAnnotationDrivenConfiguration.java b/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/KafkaAnnotationDrivenConfiguration.java index 4182e4fca22..19d52507e22 100644 --- a/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/KafkaAnnotationDrivenConfiguration.java +++ b/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/KafkaAnnotationDrivenConfiguration.java @@ -46,6 +46,7 @@ import org.springframework.kafka.listener.adapter.RecordFilterStrategy; import org.springframework.kafka.support.converter.BatchMessageConverter; import org.springframework.kafka.support.converter.BatchMessagingMessageConverter; import org.springframework.kafka.support.converter.RecordMessageConverter; +import org.springframework.kafka.support.micrometer.KafkaListenerObservationConvention; import org.springframework.kafka.transaction.KafkaAwareTransactionManager; /** @@ -86,6 +87,8 @@ class KafkaAnnotationDrivenConfiguration { private final @Nullable Function threadNameSupplier; + private final @Nullable KafkaListenerObservationConvention observationConvention; + KafkaAnnotationDrivenConfiguration(KafkaProperties properties, ObjectProvider recordMessageConverter, ObjectProvider> recordFilterStrategy, @@ -97,7 +100,8 @@ class KafkaAnnotationDrivenConfiguration { ObjectProvider> afterRollbackProcessor, ObjectProvider> recordInterceptor, ObjectProvider> batchInterceptor, - ObjectProvider> threadNameSupplier) { + ObjectProvider> threadNameSupplier, + ObjectProvider observationConvention) { this.properties = properties; this.recordMessageConverter = recordMessageConverter.getIfUnique(); this.recordFilterStrategy = recordFilterStrategy.getIfUnique(); @@ -111,6 +115,7 @@ class KafkaAnnotationDrivenConfiguration { this.recordInterceptor = recordInterceptor.getIfUnique(); this.batchInterceptor = batchInterceptor.getIfUnique(); this.threadNameSupplier = threadNameSupplier.getIfUnique(); + this.observationConvention = observationConvention.getIfUnique(); } @Bean @@ -145,6 +150,7 @@ class KafkaAnnotationDrivenConfiguration { configurer.setRecordInterceptor(this.recordInterceptor); configurer.setBatchInterceptor(this.batchInterceptor); configurer.setThreadNameSupplier(this.threadNameSupplier); + configurer.setObservationConvention(this.observationConvention); return configurer; } diff --git a/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/KafkaAutoConfiguration.java b/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/KafkaAutoConfiguration.java index 4e370eeb341..8a79e2803c7 100644 --- a/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/KafkaAutoConfiguration.java +++ b/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/KafkaAutoConfiguration.java @@ -61,6 +61,7 @@ import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; import org.springframework.kafka.support.LoggingProducerListener; import org.springframework.kafka.support.ProducerListener; import org.springframework.kafka.support.converter.RecordMessageConverter; +import org.springframework.kafka.support.micrometer.KafkaTemplateObservationConvention; import org.springframework.kafka.transaction.KafkaTransactionManager; import org.springframework.util.StringUtils; import org.springframework.util.backoff.BackOff; @@ -103,10 +104,12 @@ public final class KafkaAutoConfiguration { @ConditionalOnMissingBean(KafkaTemplate.class) KafkaTemplate kafkaTemplate(ProducerFactory kafkaProducerFactory, ProducerListener kafkaProducerListener, - ObjectProvider messageConverter) { + ObjectProvider messageConverter, + ObjectProvider observationConvention) { PropertyMapper map = PropertyMapper.get(); KafkaTemplate kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory); messageConverter.ifUnique(kafkaTemplate::setMessageConverter); + observationConvention.ifUnique(kafkaTemplate::setObservationConvention); map.from(kafkaProducerListener).to(kafkaTemplate::setProducerListener); map.from(this.properties.getTemplate().getDefaultTopic()).to(kafkaTemplate::setDefaultTopic); map.from(this.properties.getTemplate().getTransactionIdPrefix()).to(kafkaTemplate::setTransactionIdPrefix); diff --git a/module/spring-boot-kafka/src/test/java/org/springframework/boot/kafka/autoconfigure/ConcurrentKafkaListenerContainerFactoryConfigurerTests.java b/module/spring-boot-kafka/src/test/java/org/springframework/boot/kafka/autoconfigure/ConcurrentKafkaListenerContainerFactoryConfigurerTests.java index 33becc8bf22..932ee15a73d 100644 --- a/module/spring-boot-kafka/src/test/java/org/springframework/boot/kafka/autoconfigure/ConcurrentKafkaListenerContainerFactoryConfigurerTests.java +++ b/module/spring-boot-kafka/src/test/java/org/springframework/boot/kafka/autoconfigure/ConcurrentKafkaListenerContainerFactoryConfigurerTests.java @@ -26,6 +26,7 @@ import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.listener.MessageListenerContainer; +import org.springframework.kafka.support.micrometer.KafkaListenerObservation.DefaultKafkaListenerObservationConvention; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.BDDMockito.then; @@ -89,4 +90,12 @@ class ConcurrentKafkaListenerContainerFactoryConfigurerTests { .isEqualTo(Duration.ofSeconds(10)); } + @Test + void shouldApplyObservationConvention() { + DefaultKafkaListenerObservationConvention convention = new DefaultKafkaListenerObservationConvention(); + this.configurer.setObservationConvention(convention); + this.configurer.configure(this.factory, this.consumerFactory); + assertThat(this.factory.getContainerProperties().getObservationConvention()).isSameAs(convention); + } + } diff --git a/module/spring-boot-kafka/src/test/java/org/springframework/boot/kafka/autoconfigure/KafkaAutoConfigurationTests.java b/module/spring-boot-kafka/src/test/java/org/springframework/boot/kafka/autoconfigure/KafkaAutoConfigurationTests.java index d6b87983c36..e5726228637 100644 --- a/module/spring-boot-kafka/src/test/java/org/springframework/boot/kafka/autoconfigure/KafkaAutoConfigurationTests.java +++ b/module/spring-boot-kafka/src/test/java/org/springframework/boot/kafka/autoconfigure/KafkaAutoConfigurationTests.java @@ -94,6 +94,8 @@ import org.springframework.kafka.support.converter.BatchMessageConverter; import org.springframework.kafka.support.converter.BatchMessagingMessageConverter; import org.springframework.kafka.support.converter.MessagingMessageConverter; import org.springframework.kafka.support.converter.RecordMessageConverter; +import org.springframework.kafka.support.micrometer.KafkaTemplateObservation.DefaultKafkaTemplateObservationConvention; +import org.springframework.kafka.support.micrometer.KafkaTemplateObservationConvention; import org.springframework.kafka.transaction.KafkaAwareTransactionManager; import org.springframework.kafka.transaction.KafkaTransactionManager; import org.springframework.test.util.ReflectionTestUtils; @@ -1015,6 +1017,15 @@ class KafkaAutoConfigurationTests { .withMemberCategories(MemberCategory.INVOKE_PUBLIC_CONSTRUCTORS)).accepts(runtimeHints); } + @Test + void shouldConfigureObservationConvention() { + KafkaTemplateObservationConvention convention = new DefaultKafkaTemplateObservationConvention(); + this.contextRunner.withBean(KafkaTemplateObservationConvention.class, () -> convention).run((context) -> { + KafkaTemplate template = context.getBean(KafkaTemplate.class); + assertThat(template).hasFieldOrPropertyWithValue("observationConvention", convention); + }); + } + private KafkaConnectionDetails kafkaConnectionDetails() { return new KafkaConnectionDetails() {