From 6d396b973ff581dbfb9eb7453da72fabe7f87472 Mon Sep 17 00:00:00 2001 From: nklmish Date: Sat, 18 Nov 2017 16:23:42 +0100 Subject: [PATCH] Add Kafka transaction support property Add `spring.kafka.producer.transaction-id-prefix` property that will be passed to `DefaultKafkaProducerFactory.setTransactionIdPrefix(...)` See gh-11076 --- .../kafka/KafkaAutoConfiguration.java | 16 +++++++++++++++- .../autoconfigure/kafka/KafkaProperties.java | 14 ++++++++++++++ .../kafka/KafkaAutoConfigurationTests.java | 9 +++++++++ 3 files changed, 38 insertions(+), 1 deletion(-) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java index e409f530249..b710bf1ca98 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java @@ -38,6 +38,7 @@ import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; import org.springframework.kafka.support.LoggingProducerListener; import org.springframework.kafka.support.ProducerListener; import org.springframework.kafka.support.converter.RecordMessageConverter; +import org.springframework.kafka.transaction.KafkaTransactionManager; /** * {@link EnableAutoConfiguration Auto-configuration} for Apache Kafka. @@ -45,6 +46,7 @@ import org.springframework.kafka.support.converter.RecordMessageConverter; * @author Gary Russell * @author Stephane Nicoll * @author Eddú Meléndez + * @author Nakul Mishra * @since 1.5.0 */ @Configuration @@ -94,8 +96,20 @@ public class KafkaAutoConfiguration { @Bean @ConditionalOnMissingBean(ProducerFactory.class) public ProducerFactory kafkaProducerFactory() { - return new DefaultKafkaProducerFactory<>( + DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>( this.properties.buildProducerProperties()); + KafkaProperties.Producer producer = this.properties.getProducer(); + if (producer.getTransactionIdPrefix() != null) { + factory.setTransactionIdPrefix(producer.getTransactionIdPrefix()); + } + return factory; + } + + @Bean + @ConditionalOnProperty(name = "spring.kafka.producer.transaction-id-prefix") + @ConditionalOnMissingBean + public KafkaTransactionManager kafkaTransactionManager(ProducerFactory producerFactory) { + return new KafkaTransactionManager<>(producerFactory); } @Bean diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index 08dd348531f..71ebe1dbec7 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -45,6 +45,7 @@ import org.springframework.util.CollectionUtils; * @author Gary Russell * @author Stephane Nicoll * @author Artem Bilan + * @author Nakul Mishra * @since 1.5.0 */ @ConfigurationProperties(prefix = "spring.kafka") @@ -519,6 +520,11 @@ public class KafkaProperties { */ private Integer retries; + /** + * When non empty, enables transactional support for producer. + */ + private String transactionIdPrefix; + /** * Additional producer-specific properties used to configure the client. */ @@ -600,6 +606,14 @@ public class KafkaProperties { this.retries = retries; } + public String getTransactionIdPrefix() { + return this.transactionIdPrefix; + } + + public void setTransactionIdPrefix(String transactionIdPrefix) { + this.transactionIdPrefix = transactionIdPrefix; + } + public Map getProperties() { return this.properties; } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index 64d0f12b64a..e9e39f1a8d5 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -48,6 +48,7 @@ import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; import org.springframework.kafka.support.converter.MessagingMessageConverter; import org.springframework.kafka.support.converter.RecordMessageConverter; import org.springframework.kafka.test.utils.KafkaTestUtils; +import org.springframework.kafka.transaction.KafkaTransactionManager; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.entry; @@ -59,6 +60,7 @@ import static org.mockito.Mockito.mock; * @author Gary Russell * @author Stephane Nicoll * @author Eddú Meléndez + * @author Nakul Mishra */ public class KafkaAutoConfigurationTests { @@ -198,6 +200,9 @@ public class KafkaAutoConfigurationTests { assertThat( context.getBeansOfType(KafkaJaasLoginModuleInitializer.class)) .isEmpty(); + assertThat( + context.getBeansOfType(KafkaTransactionManager.class)) + .isEmpty(); assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz"); assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox"); }); @@ -256,6 +261,7 @@ public class KafkaAutoConfigurationTests { "spring.kafka.listener.poll-timeout=2000", "spring.kafka.listener.type=batch", "spring.kafka.jaas.enabled=true", + "spring.kafka.producer.transaction-id-prefix=foo", "spring.kafka.jaas.login-module=foo", "spring.kafka.jaas.control-flag=REQUISITE", "spring.kafka.jaas.options.useKeyTab=true") @@ -297,6 +303,9 @@ public class KafkaAutoConfigurationTests { assertThat(dfa.getPropertyValue("loginModule")).isEqualTo("foo"); assertThat(dfa.getPropertyValue("controlFlag")).isEqualTo( AppConfigurationEntry.LoginModuleControlFlag.REQUISITE); + assertThat( + context.getBeansOfType(KafkaTransactionManager.class)) + .hasSize(1); assertThat(((Map) dfa.getPropertyValue("options"))) .containsExactly(entry("useKeyTab", "true")); });