Browse Source

Automatically configure Spring Kafka's observation convention beans

This automatically registers KafkaListenerObservationConvention on the
container factory, and KafkaTemplateObservationConvention on the
Kafka template.

Closes gh-48914
pull/48952/head
Moritz Halbritter 2 weeks ago
parent
commit
d65eb69647
  1. 4
      documentation/spring-boot-docs/src/docs/antora/modules/reference/pages/messaging/kafka.adoc
  2. 12
      module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/ConcurrentKafkaListenerContainerFactoryConfigurer.java
  3. 8
      module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/KafkaAnnotationDrivenConfiguration.java
  4. 5
      module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/KafkaAutoConfiguration.java
  5. 9
      module/spring-boot-kafka/src/test/java/org/springframework/boot/kafka/autoconfigure/ConcurrentKafkaListenerContainerFactoryConfigurerTests.java
  6. 11
      module/spring-boot-kafka/src/test/java/org/springframework/boot/kafka/autoconfigure/KafkaAutoConfigurationTests.java

4
documentation/spring-boot-docs/src/docs/antora/modules/reference/pages/messaging/kafka.adoc

@ -32,6 +32,8 @@ include-code::MyBean[] @@ -32,6 +32,8 @@ include-code::MyBean[]
NOTE: If the property configprop:spring.kafka.producer.transaction-id-prefix[] is defined, a javadoc:org.springframework.kafka.transaction.KafkaTransactionManager[] is automatically configured.
Also, if a javadoc:org.springframework.kafka.support.converter.RecordMessageConverter[] bean is defined, it is automatically associated to the auto-configured javadoc:org.springframework.kafka.core.KafkaTemplate[].
If there's a bean of type `KafkaTemplateObservationConvention` in the context, it is automatically registered on the `KafkaTemplate`.
[[messaging.kafka.receiving]]
@ -52,6 +54,8 @@ If only a javadoc:org.springframework.kafka.support.converter.RecordMessageConve @@ -52,6 +54,8 @@ If only a javadoc:org.springframework.kafka.support.converter.RecordMessageConve
TIP: A custom javadoc:org.springframework.kafka.transaction.ChainedKafkaTransactionManager[] must be marked javadoc:org.springframework.context.annotation.Primary[format=annotation] as it usually references the auto-configured javadoc:org.springframework.kafka.transaction.KafkaTransactionManager[] bean.
If there's a bean of type `KafkaListenerObservationConvention` in the context, it is automatically registered on the container factory.
[[messaging.kafka.streams]]

12
module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/ConcurrentKafkaListenerContainerFactoryConfigurer.java

@ -37,6 +37,7 @@ import org.springframework.kafka.listener.RecordInterceptor; @@ -37,6 +37,7 @@ import org.springframework.kafka.listener.RecordInterceptor;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.kafka.support.converter.BatchMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.support.micrometer.KafkaListenerObservationConvention;
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
import org.springframework.util.Assert;
@ -82,6 +83,8 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { @@ -82,6 +83,8 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
private @Nullable SimpleAsyncTaskExecutor listenerTaskExecutor;
private @Nullable KafkaListenerObservationConvention observationConvention;
/**
* Set the {@link KafkaProperties} to use.
* @param properties the properties
@ -186,6 +189,14 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { @@ -186,6 +189,14 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
this.listenerTaskExecutor = listenerTaskExecutor;
}
/**
* Sets the observation convention.
* @param observationConvention the observation convention
*/
void setObservationConvention(@Nullable KafkaListenerObservationConvention observationConvention) {
this.observationConvention = observationConvention;
}
/**
* Configure the specified Kafka listener container factory. The factory can be
* further tuned and default settings can be overridden.
@ -249,6 +260,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { @@ -249,6 +260,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
map.from(this.transactionManager).to(container::setKafkaAwareTransactionManager);
map.from(this.rebalanceListener).to(container::setConsumerRebalanceListener);
map.from(this.listenerTaskExecutor).to(container::setListenerTaskExecutor);
map.from(this.observationConvention).to(container::setObservationConvention);
}
}

8
module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/KafkaAnnotationDrivenConfiguration.java

@ -46,6 +46,7 @@ import org.springframework.kafka.listener.adapter.RecordFilterStrategy; @@ -46,6 +46,7 @@ import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.kafka.support.converter.BatchMessageConverter;
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.support.micrometer.KafkaListenerObservationConvention;
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
/**
@ -86,6 +87,8 @@ class KafkaAnnotationDrivenConfiguration { @@ -86,6 +87,8 @@ class KafkaAnnotationDrivenConfiguration {
private final @Nullable Function<MessageListenerContainer, String> threadNameSupplier;
private final @Nullable KafkaListenerObservationConvention observationConvention;
KafkaAnnotationDrivenConfiguration(KafkaProperties properties,
ObjectProvider<RecordMessageConverter> recordMessageConverter,
ObjectProvider<RecordFilterStrategy<Object, Object>> recordFilterStrategy,
@ -97,7 +100,8 @@ class KafkaAnnotationDrivenConfiguration { @@ -97,7 +100,8 @@ class KafkaAnnotationDrivenConfiguration {
ObjectProvider<AfterRollbackProcessor<Object, Object>> afterRollbackProcessor,
ObjectProvider<RecordInterceptor<Object, Object>> recordInterceptor,
ObjectProvider<BatchInterceptor<Object, Object>> batchInterceptor,
ObjectProvider<Function<MessageListenerContainer, String>> threadNameSupplier) {
ObjectProvider<Function<MessageListenerContainer, String>> threadNameSupplier,
ObjectProvider<KafkaListenerObservationConvention> observationConvention) {
this.properties = properties;
this.recordMessageConverter = recordMessageConverter.getIfUnique();
this.recordFilterStrategy = recordFilterStrategy.getIfUnique();
@ -111,6 +115,7 @@ class KafkaAnnotationDrivenConfiguration { @@ -111,6 +115,7 @@ class KafkaAnnotationDrivenConfiguration {
this.recordInterceptor = recordInterceptor.getIfUnique();
this.batchInterceptor = batchInterceptor.getIfUnique();
this.threadNameSupplier = threadNameSupplier.getIfUnique();
this.observationConvention = observationConvention.getIfUnique();
}
@Bean
@ -145,6 +150,7 @@ class KafkaAnnotationDrivenConfiguration { @@ -145,6 +150,7 @@ class KafkaAnnotationDrivenConfiguration {
configurer.setRecordInterceptor(this.recordInterceptor);
configurer.setBatchInterceptor(this.batchInterceptor);
configurer.setThreadNameSupplier(this.threadNameSupplier);
configurer.setObservationConvention(this.observationConvention);
return configurer;
}

5
module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/KafkaAutoConfiguration.java

@ -61,6 +61,7 @@ import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; @@ -61,6 +61,7 @@ import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.support.micrometer.KafkaTemplateObservationConvention;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.util.StringUtils;
import org.springframework.util.backoff.BackOff;
@ -103,10 +104,12 @@ public final class KafkaAutoConfiguration { @@ -103,10 +104,12 @@ public final class KafkaAutoConfiguration {
@ConditionalOnMissingBean(KafkaTemplate.class)
KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
ProducerListener<Object, Object> kafkaProducerListener,
ObjectProvider<RecordMessageConverter> messageConverter) {
ObjectProvider<RecordMessageConverter> messageConverter,
ObjectProvider<KafkaTemplateObservationConvention> observationConvention) {
PropertyMapper map = PropertyMapper.get();
KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
observationConvention.ifUnique(kafkaTemplate::setObservationConvention);
map.from(kafkaProducerListener).to(kafkaTemplate::setProducerListener);
map.from(this.properties.getTemplate().getDefaultTopic()).to(kafkaTemplate::setDefaultTopic);
map.from(this.properties.getTemplate().getTransactionIdPrefix()).to(kafkaTemplate::setTransactionIdPrefix);

9
module/spring-boot-kafka/src/test/java/org/springframework/boot/kafka/autoconfigure/ConcurrentKafkaListenerContainerFactoryConfigurerTests.java

@ -26,6 +26,7 @@ import org.springframework.core.task.SimpleAsyncTaskExecutor; @@ -26,6 +26,7 @@ import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.support.micrometer.KafkaListenerObservation.DefaultKafkaListenerObservationConvention;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.BDDMockito.then;
@ -89,4 +90,12 @@ class ConcurrentKafkaListenerContainerFactoryConfigurerTests { @@ -89,4 +90,12 @@ class ConcurrentKafkaListenerContainerFactoryConfigurerTests {
.isEqualTo(Duration.ofSeconds(10));
}
@Test
void shouldApplyObservationConvention() {
DefaultKafkaListenerObservationConvention convention = new DefaultKafkaListenerObservationConvention();
this.configurer.setObservationConvention(convention);
this.configurer.configure(this.factory, this.consumerFactory);
assertThat(this.factory.getContainerProperties().getObservationConvention()).isSameAs(convention);
}
}

11
module/spring-boot-kafka/src/test/java/org/springframework/boot/kafka/autoconfigure/KafkaAutoConfigurationTests.java

@ -94,6 +94,8 @@ import org.springframework.kafka.support.converter.BatchMessageConverter; @@ -94,6 +94,8 @@ import org.springframework.kafka.support.converter.BatchMessageConverter;
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.support.micrometer.KafkaTemplateObservation.DefaultKafkaTemplateObservationConvention;
import org.springframework.kafka.support.micrometer.KafkaTemplateObservationConvention;
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.test.util.ReflectionTestUtils;
@ -1015,6 +1017,15 @@ class KafkaAutoConfigurationTests { @@ -1015,6 +1017,15 @@ class KafkaAutoConfigurationTests {
.withMemberCategories(MemberCategory.INVOKE_PUBLIC_CONSTRUCTORS)).accepts(runtimeHints);
}
@Test
void shouldConfigureObservationConvention() {
KafkaTemplateObservationConvention convention = new DefaultKafkaTemplateObservationConvention();
this.contextRunner.withBean(KafkaTemplateObservationConvention.class, () -> convention).run((context) -> {
KafkaTemplate<?, ?> template = context.getBean(KafkaTemplate.class);
assertThat(template).hasFieldOrPropertyWithValue("observationConvention", convention);
});
}
private KafkaConnectionDetails kafkaConnectionDetails() {
return new KafkaConnectionDetails() {

Loading…
Cancel
Save