diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java index bdc00693f57..11dcb632931 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java @@ -23,8 +23,11 @@ import org.springframework.boot.context.properties.PropertyMapper; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.listener.AfterRollbackProcessor; import org.springframework.kafka.listener.ContainerProperties; +import org.springframework.kafka.listener.ErrorHandler; import org.springframework.kafka.support.converter.RecordMessageConverter; +import org.springframework.kafka.transaction.KafkaAwareTransactionManager; /** * Configure {@link ConcurrentKafkaListenerContainerFactory} with sensible defaults. @@ -41,6 +44,12 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { private KafkaTemplate replyTemplate; + private KafkaAwareTransactionManager transactionManager; + + private ErrorHandler errorHandler; + + private AfterRollbackProcessor afterRollbackProcessor; + /** * Set the {@link KafkaProperties} to use. * @param properties the properties @@ -65,6 +74,32 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { this.replyTemplate = replyTemplate; } + /** + * Set the {@link KafkaAwareTransactionManager} to use. + * @param transactionManager the transaction manager + */ + void setTransactionManager( + KafkaAwareTransactionManager transactionManager) { + this.transactionManager = transactionManager; + } + + /** + * Set the {@link ErrorHandler} to use. + * @param errorHandler the error handler + */ + void setErrorHandler(ErrorHandler errorHandler) { + this.errorHandler = errorHandler; + } + + /** + * Set the {@link AfterRollbackProcessor} to use. + * @param afterRollbackProcessor the after rollback processor + */ + void setAfterRollbackProcessor( + AfterRollbackProcessor afterRollbackProcessor) { + this.afterRollbackProcessor = afterRollbackProcessor; + } + /** * Configure the specified Kafka listener container factory. The factory can be * further tuned and default settings can be overridden. @@ -89,6 +124,9 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { map.from(this.replyTemplate).whenNonNull().to(factory::setReplyTemplate); map.from(properties::getType).whenEqualTo(Listener.Type.BATCH) .toCall(() -> factory.setBatchListener(true)); + map.from(this.errorHandler).whenNonNull().to(factory::setErrorHandler); + map.from(this.afterRollbackProcessor).whenNonNull() + .to(factory::setAfterRollbackProcessor); } private void configureContainer(ContainerProperties container) { @@ -109,6 +147,8 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { .as(Number::intValue).to(container::setMonitorInterval); map.from(properties::getLogContainerConfig).whenNonNull() .to(container::setLogContainerConfig); + map.from(this.transactionManager).whenNonNull() + .to(container::setTransactionManager); } } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java index 57ee0c9c7ed..e6d4af48bcf 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2017 the original author or authors. + * Copyright 2012-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,7 +26,10 @@ import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerConfigUtils; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.listener.AfterRollbackProcessor; +import org.springframework.kafka.listener.ErrorHandler; import org.springframework.kafka.support.converter.RecordMessageConverter; +import org.springframework.kafka.transaction.KafkaAwareTransactionManager; /** * Configuration for Kafka annotation-driven support. @@ -45,12 +48,24 @@ class KafkaAnnotationDrivenConfiguration { private final KafkaTemplate kafkaTemplate; + private final KafkaAwareTransactionManager transactionManager; + + private final ErrorHandler errorHandler; + + private final AfterRollbackProcessor afterRollbackProcessor; + KafkaAnnotationDrivenConfiguration(KafkaProperties properties, ObjectProvider messageConverter, - ObjectProvider> kafkaTemplate) { + ObjectProvider> kafkaTemplate, + ObjectProvider> kafkaTransactionManager, + ObjectProvider errorHandler, + ObjectProvider> afterRollbackProcessor) { this.properties = properties; this.messageConverter = messageConverter.getIfUnique(); this.kafkaTemplate = kafkaTemplate.getIfUnique(); + this.transactionManager = kafkaTransactionManager.getIfUnique(); + this.errorHandler = errorHandler.getIfUnique(); + this.afterRollbackProcessor = afterRollbackProcessor.getIfUnique(); } @Bean @@ -60,6 +75,9 @@ class KafkaAnnotationDrivenConfiguration { configurer.setKafkaProperties(this.properties); configurer.setMessageConverter(this.messageConverter); configurer.setReplyTemplate(this.kafkaTemplate); + configurer.setTransactionManager(this.transactionManager); + configurer.setErrorHandler(this.errorHandler); + configurer.setAfterRollbackProcessor(this.afterRollbackProcessor); return configurer; } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index 816559e8864..1a4881334d9 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -41,6 +41,7 @@ import org.springframework.boot.autoconfigure.AutoConfigurations; import org.springframework.boot.test.context.runner.ApplicationContextRunner; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; import org.springframework.kafka.annotation.EnableKafkaStreams; import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; @@ -50,12 +51,17 @@ import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaAdmin; import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.listener.AfterRollbackProcessor; import org.springframework.kafka.listener.ContainerProperties.AckMode; +import org.springframework.kafka.listener.SeekToCurrentErrorHandler; import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; import org.springframework.kafka.support.converter.MessagingMessageConverter; import org.springframework.kafka.support.converter.RecordMessageConverter; import org.springframework.kafka.test.utils.KafkaTestUtils; +import org.springframework.kafka.transaction.ChainedKafkaTransactionManager; +import org.springframework.kafka.transaction.KafkaAwareTransactionManager; import org.springframework.kafka.transaction.KafkaTransactionManager; +import org.springframework.transaction.PlatformTransactionManager; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.entry; @@ -76,31 +82,29 @@ public class KafkaAutoConfigurationTests { @Test public void consumerProperties() { - this.contextRunner.withUserConfiguration(TestConfiguration.class) - .withPropertyValues("spring.kafka.bootstrap-servers=foo:1234", - "spring.kafka.properties.foo=bar", - "spring.kafka.properties.baz=qux", - "spring.kafka.properties.foo.bar.baz=qux.fiz.buz", - "spring.kafka.ssl.key-password=p1", - "spring.kafka.ssl.key-store-location=classpath:ksLoc", - "spring.kafka.ssl.key-store-password=p2", - "spring.kafka.ssl.key-store-type=PKCS12", - "spring.kafka.ssl.trust-store-location=classpath:tsLoc", - "spring.kafka.ssl.trust-store-password=p3", - "spring.kafka.ssl.trust-store-type=PKCS12", - "spring.kafka.ssl.protocol=TLSv1.2", - "spring.kafka.consumer.auto-commit-interval=123", - "spring.kafka.consumer.max-poll-records=42", - "spring.kafka.consumer.auto-offset-reset=earliest", - "spring.kafka.consumer.client-id=ccid", // test override common - "spring.kafka.consumer.enable-auto-commit=false", - "spring.kafka.consumer.fetch-max-wait=456", - "spring.kafka.consumer.properties.fiz.buz=fix.fox", - "spring.kafka.consumer.fetch-min-size=789", - "spring.kafka.consumer.group-id=bar", - "spring.kafka.consumer.heartbeat-interval=234", - "spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.LongDeserializer", - "spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.IntegerDeserializer") + this.contextRunner.withPropertyValues("spring.kafka.bootstrap-servers=foo:1234", + "spring.kafka.properties.foo=bar", "spring.kafka.properties.baz=qux", + "spring.kafka.properties.foo.bar.baz=qux.fiz.buz", + "spring.kafka.ssl.key-password=p1", + "spring.kafka.ssl.key-store-location=classpath:ksLoc", + "spring.kafka.ssl.key-store-password=p2", + "spring.kafka.ssl.key-store-type=PKCS12", + "spring.kafka.ssl.trust-store-location=classpath:tsLoc", + "spring.kafka.ssl.trust-store-password=p3", + "spring.kafka.ssl.trust-store-type=PKCS12", + "spring.kafka.ssl.protocol=TLSv1.2", + "spring.kafka.consumer.auto-commit-interval=123", + "spring.kafka.consumer.max-poll-records=42", + "spring.kafka.consumer.auto-offset-reset=earliest", + "spring.kafka.consumer.client-id=ccid", // test override common + "spring.kafka.consumer.enable-auto-commit=false", + "spring.kafka.consumer.fetch-max-wait=456", + "spring.kafka.consumer.properties.fiz.buz=fix.fox", + "spring.kafka.consumer.fetch-min-size=789", + "spring.kafka.consumer.group-id=bar", + "spring.kafka.consumer.heartbeat-interval=234", + "spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.LongDeserializer", + "spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.IntegerDeserializer") .run((context) -> { DefaultKafkaConsumerFactory consumerFactory = context .getBean(DefaultKafkaConsumerFactory.class); @@ -160,27 +164,25 @@ public class KafkaAutoConfigurationTests { @Test public void producerProperties() { - this.contextRunner.withUserConfiguration(TestConfiguration.class) - .withPropertyValues("spring.kafka.clientId=cid", - "spring.kafka.properties.foo.bar.baz=qux.fiz.buz", - "spring.kafka.producer.acks=all", - "spring.kafka.producer.batch-size=20", - "spring.kafka.producer.bootstrap-servers=bar:1234", // test - // override - "spring.kafka.producer.buffer-memory=12345", - "spring.kafka.producer.compression-type=gzip", - "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializer", - "spring.kafka.producer.retries=2", - "spring.kafka.producer.properties.fiz.buz=fix.fox", - "spring.kafka.producer.ssl.key-password=p4", - "spring.kafka.producer.ssl.key-store-location=classpath:ksLocP", - "spring.kafka.producer.ssl.key-store-password=p5", - "spring.kafka.producer.ssl.key-store-type=PKCS12", - "spring.kafka.producer.ssl.trust-store-location=classpath:tsLocP", - "spring.kafka.producer.ssl.trust-store-password=p6", - "spring.kafka.producer.ssl.trust-store-type=PKCS12", - "spring.kafka.producer.ssl.protocol=TLSv1.2", - "spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.IntegerSerializer") + this.contextRunner.withPropertyValues("spring.kafka.clientId=cid", + "spring.kafka.properties.foo.bar.baz=qux.fiz.buz", + "spring.kafka.producer.acks=all", "spring.kafka.producer.batch-size=20", + "spring.kafka.producer.bootstrap-servers=bar:1234", // test + // override + "spring.kafka.producer.buffer-memory=12345", + "spring.kafka.producer.compression-type=gzip", + "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializer", + "spring.kafka.producer.retries=2", + "spring.kafka.producer.properties.fiz.buz=fix.fox", + "spring.kafka.producer.ssl.key-password=p4", + "spring.kafka.producer.ssl.key-store-location=classpath:ksLocP", + "spring.kafka.producer.ssl.key-store-password=p5", + "spring.kafka.producer.ssl.key-store-type=PKCS12", + "spring.kafka.producer.ssl.trust-store-location=classpath:tsLocP", + "spring.kafka.producer.ssl.trust-store-password=p6", + "spring.kafka.producer.ssl.trust-store-type=PKCS12", + "spring.kafka.producer.ssl.protocol=TLSv1.2", + "spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.IntegerSerializer") .run((context) -> { DefaultKafkaProducerFactory producerFactory = context .getBean(DefaultKafkaProducerFactory.class); @@ -405,7 +407,7 @@ public class KafkaAutoConfigurationTests { @SuppressWarnings("unchecked") @Test public void listenerProperties() { - this.contextRunner.withUserConfiguration(TestConfiguration.class) + this.contextRunner .withPropertyValues("spring.kafka.template.default-topic=testTopic", "spring.kafka.listener.ack-mode=MANUAL", "spring.kafka.listener.client-id=client", @@ -485,6 +487,7 @@ public class KafkaAutoConfigurationTests { @Test public void testKafkaTemplateRecordMessageConverters() { this.contextRunner.withUserConfiguration(MessageConverterConfiguration.class) + .withPropertyValues("spring.kafka.producer.transaction-id-prefix=test") .run((context) -> { KafkaTemplate kafkaTemplate = context .getBean(KafkaTemplate.class); @@ -506,6 +509,56 @@ public class KafkaAutoConfigurationTests { }); } + @Test + public void testConcurrentKafkaListenerContainerFactoryWithCustomErrorHandler() { + this.contextRunner.withUserConfiguration(ErrorHandlerConfiguration.class) + .run((context) -> { + ConcurrentKafkaListenerContainerFactory factory = context + .getBean(ConcurrentKafkaListenerContainerFactory.class); + assertThat(KafkaTestUtils.getPropertyValue(factory, "errorHandler")) + .isSameAs(context.getBean("errorHandler")); + }); + } + + @Test + public void testConcurrentKafkaListenerContainerFactoryWithDefaultTransactionManager() { + this.contextRunner + .withPropertyValues("spring.kafka.producer.transaction-id-prefix=test") + .run((context) -> { + assertThat(context).hasSingleBean(KafkaAwareTransactionManager.class); + ConcurrentKafkaListenerContainerFactory factory = context + .getBean(ConcurrentKafkaListenerContainerFactory.class); + assertThat(factory.getContainerProperties().getTransactionManager()) + .isSameAs( + context.getBean(KafkaAwareTransactionManager.class)); + }); + } + + @Test + public void testConcurrentKafkaListenerContainerFactoryWithCustomTransactionManager() { + this.contextRunner.withUserConfiguration(TransactionManagerConfiguration.class) + .withPropertyValues("spring.kafka.producer.transaction-id-prefix=test") + .run((context) -> { + ConcurrentKafkaListenerContainerFactory factory = context + .getBean(ConcurrentKafkaListenerContainerFactory.class); + assertThat(factory.getContainerProperties().getTransactionManager()) + .isSameAs(context.getBean("chainedTransactionManager")); + }); + } + + @Test + public void testConcurrentKafkaListenerContainerFactoryWithCustomAfterRollbackProcessor() { + this.contextRunner + .withUserConfiguration(AfterRollbackProcessorConfiguration.class) + .run((context) -> { + ConcurrentKafkaListenerContainerFactory factory = context + .getBean(ConcurrentKafkaListenerContainerFactory.class); + DirectFieldAccessor dfa = new DirectFieldAccessor(factory); + assertThat(dfa.getPropertyValue("afterRollbackProcessor")) + .isSameAs(context.getBean("afterRollbackProcessor")); + }); + } + @Test public void testConcurrentKafkaListenerContainerFactoryWithKafkaTemplate() { this.contextRunner.run((context) -> { @@ -519,16 +572,46 @@ public class KafkaAutoConfigurationTests { } @Configuration - protected static class TestConfiguration { + protected static class MessageConverterConfiguration { + + @Bean + public RecordMessageConverter myMessageConverter() { + return mock(RecordMessageConverter.class); + } } @Configuration - protected static class MessageConverterConfiguration { + protected static class ErrorHandlerConfiguration { @Bean - public RecordMessageConverter myMessageConverter() { - return mock(RecordMessageConverter.class); + public SeekToCurrentErrorHandler errorHandler() { + return new SeekToCurrentErrorHandler(); + } + + } + + @Configuration + protected static class TransactionManagerConfiguration { + + @Bean + @Primary + public PlatformTransactionManager chainedTransactionManager( + KafkaTransactionManager kafkaTransactionManager) { + return new ChainedKafkaTransactionManager( + kafkaTransactionManager); + } + + } + + @Configuration + protected static class AfterRollbackProcessorConfiguration { + + @Bean + public AfterRollbackProcessor afterRollbackProcessor() { + return (records, consumer, ex, recoverable) -> { + // no-op + }; } } diff --git a/spring-boot-project/spring-boot-dependencies/pom.xml b/spring-boot-project/spring-boot-dependencies/pom.xml index 8ccaae14073..8868b75aaa4 100644 --- a/spring-boot-project/spring-boot-dependencies/pom.xml +++ b/spring-boot-project/spring-boot-dependencies/pom.xml @@ -161,7 +161,7 @@ Lovelace-RC2 0.25.0.RELEASE 5.1.0.M2 - 2.2.0.M2 + 2.2.0.BUILD-SNAPSHOT 2.3.2.RELEASE 1.2.0.RELEASE 2.0.2.RELEASE diff --git a/spring-boot-project/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc b/spring-boot-project/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc index d871f00702e..5d945496ddd 100644 --- a/spring-boot-project/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc +++ b/spring-boot-project/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc @@ -5627,8 +5627,7 @@ bean is defined, it is automatically associated to the auto-configured `KafkaTem When the Apache Kafka infrastructure is present, any bean can be annotated with `@KafkaListener` to create a listener endpoint. If no `KafkaListenerContainerFactory` has been defined, a default one is automatically configured with keys defined in -`spring.kafka.listener.*`. Also, if a `RecordMessageConverter` bean is defined, it is -automatically associated to the default factory. +`spring.kafka.listener.*`. The following component creates a listener endpoint on the `someTopic` topic: @@ -5645,6 +5644,16 @@ The following component creates a listener endpoint on the `someTopic` topic: } ---- +If a `KafkaTransactionManager` bean is defined, it is automatically associated to the +container factory. Similarly, if a `RecordMessageConverter`, `ErrorHandler` or +`AfterRollbackProcessor` bean is defined, it is automatically associated to the default +factory. + +TIP: A custom `ChainedKafkaTransactionManager` must be marked `@Primary` as it usually +reference the auto-configured `KafkaTransactionManager` bean. + + + [[boot-features-kafka-streams]] ==== Kafka Streams Spring for Apache Kafka provides a factory bean to create a `StreamsBuilder` object and