Browse Source

Merge branch '2.1.x' into 2.2.x

Closes gh-20615
pull/20629/head
Stephane Nicoll 6 years ago
parent
commit
39b965e1fd
  1. 8
      spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java
  2. 25
      spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java

8
spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2012-2019 the original author or authors.
* Copyright 2012-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -26,6 +26,7 @@ import org.springframework.kafka.annotation.EnableKafka; @@ -26,6 +26,7 @@ import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerConfigUtils;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.BatchErrorHandler;
@ -112,9 +113,10 @@ class KafkaAnnotationDrivenConfiguration { @@ -112,9 +113,10 @@ class KafkaAnnotationDrivenConfiguration {
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
configurer.configure(factory, kafkaConsumerFactory
.getIfAvailable(() -> new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties())));
return factory;
}

25
spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2012-2019 the original author or authors.
* Copyright 2012-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -50,6 +50,7 @@ import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; @@ -50,6 +50,7 @@ import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
@ -569,6 +570,16 @@ class KafkaAutoConfigurationTests { @@ -569,6 +570,16 @@ class KafkaAutoConfigurationTests {
});
}
@Test
void testConcurrentKafkaListenerContainerFactoryWithCustomConsumerFactory() {
this.contextRunner.withUserConfiguration(ConsumerFactoryConfiguration.class).run((context) -> {
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory = context
.getBean(ConcurrentKafkaListenerContainerFactory.class);
assertThat(kafkaListenerContainerFactory.getConsumerFactory())
.isNotSameAs(context.getBean(ConsumerFactoryConfiguration.class).consumerFactory);
});
}
@Configuration(proxyBeanMethods = false)
static class MessageConverterConfiguration {
@ -633,6 +644,18 @@ class KafkaAutoConfigurationTests { @@ -633,6 +644,18 @@ class KafkaAutoConfigurationTests {
}
@Configuration(proxyBeanMethods = false)
static class ConsumerFactoryConfiguration {
private final ConsumerFactory<String, Object> consumerFactory = mock(ConsumerFactory.class);
@Bean
ConsumerFactory<String, Object> myConsumerFactory() {
return this.consumerFactory;
}
}
@Configuration(proxyBeanMethods = false)
static class RecordInterceptorConfiguration {

Loading…
Cancel
Save