Browse Source

Merge pull request #14215 from garyrussell:kafka

* pr/14215:
  Polish "Improve Kafka Auto-configuration"
  Improve Kafka Auto-configuration
pull/14228/head
Stephane Nicoll 8 years ago
parent
commit
76c6f05586
  1. 40
      spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java
  2. 22
      spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java
  3. 185
      spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java
  4. 2
      spring-boot-project/spring-boot-dependencies/pom.xml
  5. 13
      spring-boot-project/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc

40
spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java

@ -23,8 +23,11 @@ import org.springframework.boot.context.properties.PropertyMapper; @@ -23,8 +23,11 @@ import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
/**
* Configure {@link ConcurrentKafkaListenerContainerFactory} with sensible defaults.
@ -41,6 +44,12 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { @@ -41,6 +44,12 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
private KafkaTemplate<Object, Object> replyTemplate;
private KafkaAwareTransactionManager<Object, Object> transactionManager;
private ErrorHandler errorHandler;
private AfterRollbackProcessor<Object, Object> afterRollbackProcessor;
/**
* Set the {@link KafkaProperties} to use.
* @param properties the properties
@ -65,6 +74,32 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { @@ -65,6 +74,32 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
this.replyTemplate = replyTemplate;
}
/**
* Set the {@link KafkaAwareTransactionManager} to use.
* @param transactionManager the transaction manager
*/
void setTransactionManager(
KafkaAwareTransactionManager<Object, Object> transactionManager) {
this.transactionManager = transactionManager;
}
/**
* Set the {@link ErrorHandler} to use.
* @param errorHandler the error handler
*/
void setErrorHandler(ErrorHandler errorHandler) {
this.errorHandler = errorHandler;
}
/**
* Set the {@link AfterRollbackProcessor} to use.
* @param afterRollbackProcessor the after rollback processor
*/
void setAfterRollbackProcessor(
AfterRollbackProcessor<Object, Object> afterRollbackProcessor) {
this.afterRollbackProcessor = afterRollbackProcessor;
}
/**
* Configure the specified Kafka listener container factory. The factory can be
* further tuned and default settings can be overridden.
@ -89,6 +124,9 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { @@ -89,6 +124,9 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
map.from(this.replyTemplate).whenNonNull().to(factory::setReplyTemplate);
map.from(properties::getType).whenEqualTo(Listener.Type.BATCH)
.toCall(() -> factory.setBatchListener(true));
map.from(this.errorHandler).whenNonNull().to(factory::setErrorHandler);
map.from(this.afterRollbackProcessor).whenNonNull()
.to(factory::setAfterRollbackProcessor);
}
private void configureContainer(ContainerProperties container) {
@ -109,6 +147,8 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { @@ -109,6 +147,8 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
.as(Number::intValue).to(container::setMonitorInterval);
map.from(properties::getLogContainerConfig).whenNonNull()
.to(container::setLogContainerConfig);
map.from(this.transactionManager).whenNonNull()
.to(container::setTransactionManager);
}
}

22
spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2012-2017 the original author or authors.
* Copyright 2012-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -26,7 +26,10 @@ import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; @@ -26,7 +26,10 @@ import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerConfigUtils;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
/**
* Configuration for Kafka annotation-driven support.
@ -45,12 +48,24 @@ class KafkaAnnotationDrivenConfiguration { @@ -45,12 +48,24 @@ class KafkaAnnotationDrivenConfiguration {
private final KafkaTemplate<Object, Object> kafkaTemplate;
private final KafkaAwareTransactionManager<Object, Object> transactionManager;
private final ErrorHandler errorHandler;
private final AfterRollbackProcessor<Object, Object> afterRollbackProcessor;
KafkaAnnotationDrivenConfiguration(KafkaProperties properties,
ObjectProvider<RecordMessageConverter> messageConverter,
ObjectProvider<KafkaTemplate<Object, Object>> kafkaTemplate) {
ObjectProvider<KafkaTemplate<Object, Object>> kafkaTemplate,
ObjectProvider<KafkaAwareTransactionManager<Object, Object>> kafkaTransactionManager,
ObjectProvider<ErrorHandler> errorHandler,
ObjectProvider<AfterRollbackProcessor<Object, Object>> afterRollbackProcessor) {
this.properties = properties;
this.messageConverter = messageConverter.getIfUnique();
this.kafkaTemplate = kafkaTemplate.getIfUnique();
this.transactionManager = kafkaTransactionManager.getIfUnique();
this.errorHandler = errorHandler.getIfUnique();
this.afterRollbackProcessor = afterRollbackProcessor.getIfUnique();
}
@Bean
@ -60,6 +75,9 @@ class KafkaAnnotationDrivenConfiguration { @@ -60,6 +75,9 @@ class KafkaAnnotationDrivenConfiguration {
configurer.setKafkaProperties(this.properties);
configurer.setMessageConverter(this.messageConverter);
configurer.setReplyTemplate(this.kafkaTemplate);
configurer.setTransactionManager(this.transactionManager);
configurer.setErrorHandler(this.errorHandler);
configurer.setAfterRollbackProcessor(this.afterRollbackProcessor);
return configurer;
}

185
spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java

@ -41,6 +41,7 @@ import org.springframework.boot.autoconfigure.AutoConfigurations; @@ -41,6 +41,7 @@ import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
@ -50,12 +51,17 @@ import org.springframework.kafka.core.DefaultKafkaConsumerFactory; @@ -50,12 +51,17 @@ import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.kafka.transaction.ChainedKafkaTransactionManager;
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.entry;
@ -76,31 +82,29 @@ public class KafkaAutoConfigurationTests { @@ -76,31 +82,29 @@ public class KafkaAutoConfigurationTests {
@Test
public void consumerProperties() {
this.contextRunner.withUserConfiguration(TestConfiguration.class)
.withPropertyValues("spring.kafka.bootstrap-servers=foo:1234",
"spring.kafka.properties.foo=bar",
"spring.kafka.properties.baz=qux",
"spring.kafka.properties.foo.bar.baz=qux.fiz.buz",
"spring.kafka.ssl.key-password=p1",
"spring.kafka.ssl.key-store-location=classpath:ksLoc",
"spring.kafka.ssl.key-store-password=p2",
"spring.kafka.ssl.key-store-type=PKCS12",
"spring.kafka.ssl.trust-store-location=classpath:tsLoc",
"spring.kafka.ssl.trust-store-password=p3",
"spring.kafka.ssl.trust-store-type=PKCS12",
"spring.kafka.ssl.protocol=TLSv1.2",
"spring.kafka.consumer.auto-commit-interval=123",
"spring.kafka.consumer.max-poll-records=42",
"spring.kafka.consumer.auto-offset-reset=earliest",
"spring.kafka.consumer.client-id=ccid", // test override common
"spring.kafka.consumer.enable-auto-commit=false",
"spring.kafka.consumer.fetch-max-wait=456",
"spring.kafka.consumer.properties.fiz.buz=fix.fox",
"spring.kafka.consumer.fetch-min-size=789",
"spring.kafka.consumer.group-id=bar",
"spring.kafka.consumer.heartbeat-interval=234",
"spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.LongDeserializer",
"spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.IntegerDeserializer")
this.contextRunner.withPropertyValues("spring.kafka.bootstrap-servers=foo:1234",
"spring.kafka.properties.foo=bar", "spring.kafka.properties.baz=qux",
"spring.kafka.properties.foo.bar.baz=qux.fiz.buz",
"spring.kafka.ssl.key-password=p1",
"spring.kafka.ssl.key-store-location=classpath:ksLoc",
"spring.kafka.ssl.key-store-password=p2",
"spring.kafka.ssl.key-store-type=PKCS12",
"spring.kafka.ssl.trust-store-location=classpath:tsLoc",
"spring.kafka.ssl.trust-store-password=p3",
"spring.kafka.ssl.trust-store-type=PKCS12",
"spring.kafka.ssl.protocol=TLSv1.2",
"spring.kafka.consumer.auto-commit-interval=123",
"spring.kafka.consumer.max-poll-records=42",
"spring.kafka.consumer.auto-offset-reset=earliest",
"spring.kafka.consumer.client-id=ccid", // test override common
"spring.kafka.consumer.enable-auto-commit=false",
"spring.kafka.consumer.fetch-max-wait=456",
"spring.kafka.consumer.properties.fiz.buz=fix.fox",
"spring.kafka.consumer.fetch-min-size=789",
"spring.kafka.consumer.group-id=bar",
"spring.kafka.consumer.heartbeat-interval=234",
"spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.LongDeserializer",
"spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.IntegerDeserializer")
.run((context) -> {
DefaultKafkaConsumerFactory<?, ?> consumerFactory = context
.getBean(DefaultKafkaConsumerFactory.class);
@ -160,27 +164,25 @@ public class KafkaAutoConfigurationTests { @@ -160,27 +164,25 @@ public class KafkaAutoConfigurationTests {
@Test
public void producerProperties() {
this.contextRunner.withUserConfiguration(TestConfiguration.class)
.withPropertyValues("spring.kafka.clientId=cid",
"spring.kafka.properties.foo.bar.baz=qux.fiz.buz",
"spring.kafka.producer.acks=all",
"spring.kafka.producer.batch-size=20",
"spring.kafka.producer.bootstrap-servers=bar:1234", // test
// override
"spring.kafka.producer.buffer-memory=12345",
"spring.kafka.producer.compression-type=gzip",
"spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializer",
"spring.kafka.producer.retries=2",
"spring.kafka.producer.properties.fiz.buz=fix.fox",
"spring.kafka.producer.ssl.key-password=p4",
"spring.kafka.producer.ssl.key-store-location=classpath:ksLocP",
"spring.kafka.producer.ssl.key-store-password=p5",
"spring.kafka.producer.ssl.key-store-type=PKCS12",
"spring.kafka.producer.ssl.trust-store-location=classpath:tsLocP",
"spring.kafka.producer.ssl.trust-store-password=p6",
"spring.kafka.producer.ssl.trust-store-type=PKCS12",
"spring.kafka.producer.ssl.protocol=TLSv1.2",
"spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.IntegerSerializer")
this.contextRunner.withPropertyValues("spring.kafka.clientId=cid",
"spring.kafka.properties.foo.bar.baz=qux.fiz.buz",
"spring.kafka.producer.acks=all", "spring.kafka.producer.batch-size=20",
"spring.kafka.producer.bootstrap-servers=bar:1234", // test
// override
"spring.kafka.producer.buffer-memory=12345",
"spring.kafka.producer.compression-type=gzip",
"spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializer",
"spring.kafka.producer.retries=2",
"spring.kafka.producer.properties.fiz.buz=fix.fox",
"spring.kafka.producer.ssl.key-password=p4",
"spring.kafka.producer.ssl.key-store-location=classpath:ksLocP",
"spring.kafka.producer.ssl.key-store-password=p5",
"spring.kafka.producer.ssl.key-store-type=PKCS12",
"spring.kafka.producer.ssl.trust-store-location=classpath:tsLocP",
"spring.kafka.producer.ssl.trust-store-password=p6",
"spring.kafka.producer.ssl.trust-store-type=PKCS12",
"spring.kafka.producer.ssl.protocol=TLSv1.2",
"spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.IntegerSerializer")
.run((context) -> {
DefaultKafkaProducerFactory<?, ?> producerFactory = context
.getBean(DefaultKafkaProducerFactory.class);
@ -405,7 +407,7 @@ public class KafkaAutoConfigurationTests { @@ -405,7 +407,7 @@ public class KafkaAutoConfigurationTests {
@SuppressWarnings("unchecked")
@Test
public void listenerProperties() {
this.contextRunner.withUserConfiguration(TestConfiguration.class)
this.contextRunner
.withPropertyValues("spring.kafka.template.default-topic=testTopic",
"spring.kafka.listener.ack-mode=MANUAL",
"spring.kafka.listener.client-id=client",
@ -485,6 +487,7 @@ public class KafkaAutoConfigurationTests { @@ -485,6 +487,7 @@ public class KafkaAutoConfigurationTests {
@Test
public void testKafkaTemplateRecordMessageConverters() {
this.contextRunner.withUserConfiguration(MessageConverterConfiguration.class)
.withPropertyValues("spring.kafka.producer.transaction-id-prefix=test")
.run((context) -> {
KafkaTemplate<?, ?> kafkaTemplate = context
.getBean(KafkaTemplate.class);
@ -506,6 +509,56 @@ public class KafkaAutoConfigurationTests { @@ -506,6 +509,56 @@ public class KafkaAutoConfigurationTests {
});
}
@Test
public void testConcurrentKafkaListenerContainerFactoryWithCustomErrorHandler() {
this.contextRunner.withUserConfiguration(ErrorHandlerConfiguration.class)
.run((context) -> {
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
.getBean(ConcurrentKafkaListenerContainerFactory.class);
assertThat(KafkaTestUtils.getPropertyValue(factory, "errorHandler"))
.isSameAs(context.getBean("errorHandler"));
});
}
@Test
public void testConcurrentKafkaListenerContainerFactoryWithDefaultTransactionManager() {
this.contextRunner
.withPropertyValues("spring.kafka.producer.transaction-id-prefix=test")
.run((context) -> {
assertThat(context).hasSingleBean(KafkaAwareTransactionManager.class);
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
.getBean(ConcurrentKafkaListenerContainerFactory.class);
assertThat(factory.getContainerProperties().getTransactionManager())
.isSameAs(
context.getBean(KafkaAwareTransactionManager.class));
});
}
@Test
public void testConcurrentKafkaListenerContainerFactoryWithCustomTransactionManager() {
this.contextRunner.withUserConfiguration(TransactionManagerConfiguration.class)
.withPropertyValues("spring.kafka.producer.transaction-id-prefix=test")
.run((context) -> {
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
.getBean(ConcurrentKafkaListenerContainerFactory.class);
assertThat(factory.getContainerProperties().getTransactionManager())
.isSameAs(context.getBean("chainedTransactionManager"));
});
}
@Test
public void testConcurrentKafkaListenerContainerFactoryWithCustomAfterRollbackProcessor() {
this.contextRunner
.withUserConfiguration(AfterRollbackProcessorConfiguration.class)
.run((context) -> {
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
.getBean(ConcurrentKafkaListenerContainerFactory.class);
DirectFieldAccessor dfa = new DirectFieldAccessor(factory);
assertThat(dfa.getPropertyValue("afterRollbackProcessor"))
.isSameAs(context.getBean("afterRollbackProcessor"));
});
}
@Test
public void testConcurrentKafkaListenerContainerFactoryWithKafkaTemplate() {
this.contextRunner.run((context) -> {
@ -519,16 +572,46 @@ public class KafkaAutoConfigurationTests { @@ -519,16 +572,46 @@ public class KafkaAutoConfigurationTests {
}
@Configuration
protected static class TestConfiguration {
protected static class MessageConverterConfiguration {
@Bean
public RecordMessageConverter myMessageConverter() {
return mock(RecordMessageConverter.class);
}
}
@Configuration
protected static class MessageConverterConfiguration {
protected static class ErrorHandlerConfiguration {
@Bean
public RecordMessageConverter myMessageConverter() {
return mock(RecordMessageConverter.class);
public SeekToCurrentErrorHandler errorHandler() {
return new SeekToCurrentErrorHandler();
}
}
@Configuration
protected static class TransactionManagerConfiguration {
@Bean
@Primary
public PlatformTransactionManager chainedTransactionManager(
KafkaTransactionManager<String, String> kafkaTransactionManager) {
return new ChainedKafkaTransactionManager<String, String>(
kafkaTransactionManager);
}
}
@Configuration
protected static class AfterRollbackProcessorConfiguration {
@Bean
public AfterRollbackProcessor<Object, Object> afterRollbackProcessor() {
return (records, consumer, ex, recoverable) -> {
// no-op
};
}
}

2
spring-boot-project/spring-boot-dependencies/pom.xml

@ -161,7 +161,7 @@ @@ -161,7 +161,7 @@
<spring-data-releasetrain.version>Lovelace-RC2</spring-data-releasetrain.version>
<spring-hateoas.version>0.25.0.RELEASE</spring-hateoas.version>
<spring-integration.version>5.1.0.M2</spring-integration.version>
<spring-kafka.version>2.2.0.M2</spring-kafka.version>
<spring-kafka.version>2.2.0.BUILD-SNAPSHOT</spring-kafka.version>
<spring-ldap.version>2.3.2.RELEASE</spring-ldap.version>
<spring-plugin.version>1.2.0.RELEASE</spring-plugin.version>
<spring-restdocs.version>2.0.2.RELEASE</spring-restdocs.version>

13
spring-boot-project/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc

@ -5627,8 +5627,7 @@ bean is defined, it is automatically associated to the auto-configured `KafkaTem @@ -5627,8 +5627,7 @@ bean is defined, it is automatically associated to the auto-configured `KafkaTem
When the Apache Kafka infrastructure is present, any bean can be annotated with
`@KafkaListener` to create a listener endpoint. If no `KafkaListenerContainerFactory` has
been defined, a default one is automatically configured with keys defined in
`spring.kafka.listener.*`. Also, if a `RecordMessageConverter` bean is defined, it is
automatically associated to the default factory.
`spring.kafka.listener.*`.
The following component creates a listener endpoint on the `someTopic` topic:
@ -5645,6 +5644,16 @@ The following component creates a listener endpoint on the `someTopic` topic: @@ -5645,6 +5644,16 @@ The following component creates a listener endpoint on the `someTopic` topic:
}
----
If a `KafkaTransactionManager` bean is defined, it is automatically associated to the
container factory. Similarly, if a `RecordMessageConverter`, `ErrorHandler` or
`AfterRollbackProcessor` bean is defined, it is automatically associated to the default
factory.
TIP: A custom `ChainedKafkaTransactionManager` must be marked `@Primary` as it usually
reference the auto-configured `KafkaTransactionManager` bean.
[[boot-features-kafka-streams]]
==== Kafka Streams
Spring for Apache Kafka provides a factory bean to create a `StreamsBuilder` object and

Loading…
Cancel
Save