From bf46d7244afa42f6d92775d0d8e77baa1d9a4c2f Mon Sep 17 00:00:00 2001 From: tomazfernandes Date: Mon, 14 Feb 2022 20:42:16 -0300 Subject: [PATCH 1/2] Add auto-configuration to Kafka Retry Topics See gh-29812 --- .../kafka/KafkaAutoConfiguration.java | 26 ++++ .../autoconfigure/kafka/KafkaProperties.java | 143 ++++++++++++++++++ ...afkaAutoConfigurationIntegrationTests.java | 66 ++++++++ .../kafka/KafkaAutoConfigurationTests.java | 111 ++++++++++++++ 4 files changed, 346 insertions(+) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java index b25b563a9cd..9f70aebaf4b 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java @@ -25,6 +25,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Jaas; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Retry.Topic; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.context.properties.PropertyMapper; import org.springframework.context.annotation.Bean; @@ -33,13 +34,18 @@ import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaAdmin; +import org.springframework.kafka.core.KafkaOperations; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.retrytopic.RetryTopicConfiguration; +import org.springframework.kafka.retrytopic.RetryTopicConfigurationBuilder; import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; import org.springframework.kafka.support.LoggingProducerListener; import org.springframework.kafka.support.ProducerListener; import org.springframework.kafka.support.converter.RecordMessageConverter; import org.springframework.kafka.transaction.KafkaTransactionManager; +import org.springframework.retry.backoff.BackOffPolicyBuilder; +import org.springframework.retry.backoff.SleepingBackOffPolicy; /** * {@link EnableAutoConfiguration Auto-configuration} for Apache Kafka. @@ -48,6 +54,7 @@ import org.springframework.kafka.transaction.KafkaTransactionManager; * @author Stephane Nicoll * @author Eddú Meléndez * @author Nakul Mishra + * @author Tomaz Fernandes * @since 1.5.0 */ @AutoConfiguration @@ -137,4 +144,23 @@ public class KafkaAutoConfiguration { return kafkaAdmin; } + @Bean + @ConditionalOnProperty(name = "spring.kafka.retry.topic.enabled") + public RetryTopicConfiguration kafkaRetryTopicConfiguration(KafkaOperations kafkaOperations) { + KafkaProperties.Retry.Topic retryTopic = this.properties.getRetry().getTopic(); + RetryTopicConfigurationBuilder builder = RetryTopicConfigurationBuilder.newInstance() + .maxAttempts(retryTopic.getAttempts()).useSingleTopicForFixedDelays().suffixTopicsWithIndexValues() + .doNotAutoCreateRetryTopics(); + setBackOffPolicy(builder, retryTopic); + return builder.create(kafkaOperations); + } + + private static void setBackOffPolicy(RetryTopicConfigurationBuilder builder, Topic retryTopic) { + PropertyMapper.get().from(retryTopic.getDelayMillis()).whenEqualTo(0L).toCall(builder::noBackoff); + PropertyMapper.get().from(retryTopic.getDelayMillis()).when((delay) -> delay > 0) + .toCall(() -> builder.customBackoff((SleepingBackOffPolicy) BackOffPolicyBuilder.newBuilder() + .delay(retryTopic.getDelayMillis()).maxDelay(retryTopic.getMaxDelayMillis()) + .multiplier(retryTopic.getMultiplier()).random(retryTopic.isRandomBackOff()).build())); + } + } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index e109753d736..3cd3cfe7237 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -17,6 +17,7 @@ package org.springframework.boot.autoconfigure.kafka; import java.io.IOException; +import java.math.BigDecimal; import java.time.Duration; import java.time.temporal.ChronoUnit; import java.util.ArrayList; @@ -41,6 +42,7 @@ import org.springframework.boot.convert.DurationUnit; import org.springframework.core.io.Resource; import org.springframework.kafka.listener.ContainerProperties.AckMode; import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; +import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; import org.springframework.util.unit.DataSize; @@ -54,6 +56,7 @@ import org.springframework.util.unit.DataSize; * @author Stephane Nicoll * @author Artem Bilan * @author Nakul Mishra + * @author Tomaz Fernandes * @since 1.5.0 */ @ConfigurationProperties(prefix = "spring.kafka") @@ -94,6 +97,8 @@ public class KafkaProperties { private final Security security = new Security(); + private final Retry retry = new Retry(); + public List getBootstrapServers() { return this.bootstrapServers; } @@ -150,6 +155,10 @@ public class KafkaProperties { return this.security; } + public Retry getRetry() { + return this.retry; + } + private Map buildCommonProperties() { Map properties = new HashMap<>(); if (this.bootstrapServers != null) { @@ -1332,6 +1341,140 @@ public class KafkaProperties { } + public static class Retry { + + private Topic topic = new Topic(); + + public Topic getTopic() { + return this.topic.validate(); + } + + public void setTopic(Topic topic) { + this.topic = topic; + } + + /** + * Properties for non-blocking, topic-based retries. + */ + public static class Topic { + + private static final String RETRY_TOPIC_PROPERTIES_PREFIX = "spring.kafka.retry.topic."; + + private static final String RETRY_TOPIC_VALIDATION_ERROR_MSG = "Property " + RETRY_TOPIC_PROPERTIES_PREFIX + + "%s should be greater than or equal to %s. Provided value was %s."; + + /** + * Whether to enable topic-based retries auto-configuration. + */ + private Boolean enabled; + + /** + * The total number of processing attempts made before sending the message to + * the DLT. + */ + private Integer attempts = 3; + + /** + * A canonical backoff period. Used as an initial value in the exponential + * case, and as a minimum value in the uniform case. + */ + private Duration delay = Duration.ofSeconds(1); + + /** + * If positive, then used as a multiplier for generating the next delay for + * backoff. + */ + private Double multiplier = 0.0; + + /** + * The maximum wait between retries. If less than the delay then the default + * of 30 seconds is applied. + */ + private Duration maxDelay = Duration.ZERO; + + /** + * In the exponential case, set this to true to have the backoff delays + * randomized. + */ + private Boolean randomBackOff = false; + + public Boolean getEnabled() { + return this.enabled; + } + + public void setEnabled(Boolean enabled) { + this.enabled = enabled; + } + + public Integer getAttempts() { + return this.attempts; + } + + public void setAttempts(Integer attempts) { + this.attempts = attempts; + } + + public Duration getDelay() { + return this.delay; + } + + public Long getDelayMillis() { + return (this.delay != null) ? this.delay.toMillis() : null; + } + + public void setDelay(Duration delay) { + this.delay = delay; + } + + public Double getMultiplier() { + return this.multiplier; + } + + public void setMultiplier(Double multiplier) { + this.multiplier = multiplier; + } + + public Duration getMaxDelay() { + return this.maxDelay; + } + + public void setMaxDelay(Duration maxDelay) { + this.maxDelay = maxDelay; + } + + public Long getMaxDelayMillis() { + return (this.maxDelay != null) ? this.maxDelay.toMillis() : null; + } + + public Boolean isRandomBackOff() { + return this.randomBackOff; + } + + public void setRandomBackOff(Boolean randomBackOff) { + this.randomBackOff = randomBackOff; + } + + private Topic validate() { + validateProperty("attempts", this.attempts, 1); + validateProperty("delay", this.getDelayMillis(), 0); + validateProperty("multiplier", this.multiplier, 0); + validateProperty("maxDelay", this.getMaxDelayMillis(), 0); + Assert.isTrue(this.multiplier != 0 || !this.isRandomBackOff(), + "Property " + RETRY_TOPIC_PROPERTIES_PREFIX + + "randomBackOff should not be true with non-exponential back offs."); + return this; + } + + private static void validateProperty(String propertyName, Number providedValue, int minValue) { + Assert.notNull(providedValue, () -> RETRY_TOPIC_PROPERTIES_PREFIX + propertyName + " cannot be null."); + Assert.isTrue(new BigDecimal(providedValue.toString()).compareTo(BigDecimal.valueOf(minValue)) >= 0, + () -> String.format(RETRY_TOPIC_VALIDATION_ERROR_MSG, propertyName, minValue, providedValue)); + } + + } + + } + public static class Security { /** diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationIntegrationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationIntegrationTests.java index d18cd3901d9..c5ebbd6addf 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationIntegrationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationIntegrationTests.java @@ -16,6 +16,8 @@ package org.springframework.boot.autoconfigure.kafka; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; @@ -41,6 +43,8 @@ import org.springframework.kafka.config.StreamsBuilderFactoryBean; import org.springframework.kafka.config.TopicBuilder; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.retrytopic.DestinationTopic; +import org.springframework.kafka.retrytopic.RetryTopicConfiguration; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.test.condition.EmbeddedKafkaCondition; import org.springframework.kafka.test.context.EmbeddedKafka; @@ -53,12 +57,14 @@ import static org.assertj.core.api.Assertions.assertThat; * * @author Gary Russell * @author Stephane Nicoll + * @author Tomaz Fernandes */ @DisabledOnOs(OS.WINDOWS) @EmbeddedKafka(topics = KafkaAutoConfigurationIntegrationTests.TEST_TOPIC) class KafkaAutoConfigurationIntegrationTests { static final String TEST_TOPIC = "testTopic"; + static final String TEST_RETRY_TOPIC = "testRetryTopic"; private static final String ADMIN_CREATED_TOPIC = "adminCreatedTopic"; @@ -89,6 +95,27 @@ class KafkaAutoConfigurationIntegrationTests { producer.close(); } + @SuppressWarnings("unchecked") + @Test + void testEndToEndWithRetryTopics() throws Exception { + load(KafkaConfig.class, "spring.kafka.bootstrap-servers:" + getEmbeddedKafkaBrokersAsString(), + "spring.kafka.consumer.group-id=testGroup", "spring.kafka.retry.topic.enabled=true", + "spring.kafka.retry.topic.attempts=5", "spring.kafka.retry.topic.delay=100ms", + "spring.kafka.retry.topic.multiplier=2", "spring.kafka.retry.topic.max-delay=300ms", + "spring.kafka.consumer.auto-offset-reset=earliest"); + RetryTopicConfiguration configuration = this.context.getBean(RetryTopicConfiguration.class); + assertThat(configuration.getDestinationTopicProperties()).extracting(DestinationTopic.Properties::delay) + .containsExactly(0L, 100L, 200L, 300L, 300L, 0L); + KafkaTemplate template = this.context.getBean(KafkaTemplate.class); + template.send(TEST_RETRY_TOPIC, "foo", "bar"); + RetryListener listener = this.context.getBean(RetryListener.class); + assertThat(listener.latch.await(30, TimeUnit.SECONDS)).isTrue(); + assertThat(listener).extracting(RetryListener::getKey, RetryListener::getReceived).containsExactly("foo", + "bar"); + assertThat(listener).extracting(RetryListener::getTopics).asList().hasSize(5).containsSequence("testRetryTopic", + "testRetryTopic-retry-0", "testRetryTopic-retry-1", "testRetryTopic-retry-2", "testRetryTopic-retry-3"); + } + @Test void testStreams() { load(KafkaStreamsConfig.class, "spring.application.name:my-app", @@ -121,6 +148,11 @@ class KafkaAutoConfigurationIntegrationTests { return new Listener(); } + @Bean + RetryListener retryListener() { + return new RetryListener(); + } + @Bean NewTopic adminCreated() { return TopicBuilder.name(ADMIN_CREATED_TOPIC).partitions(10).replicas(1).build(); @@ -157,4 +189,38 @@ class KafkaAutoConfigurationIntegrationTests { } + static class RetryListener { + + private final CountDownLatch latch = new CountDownLatch(5); + + private final List topics = new ArrayList<>(); + + private volatile String received; + + private volatile String key; + + @KafkaListener(topics = TEST_RETRY_TOPIC) + void listen(String foo, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, + @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { + this.received = foo; + this.key = key; + this.topics.add(topic); + this.latch.countDown(); + throw new RuntimeException("Test exception"); + } + + private List getTopics() { + return this.topics; + } + + private String getReceived() { + return this.received; + } + + private String getKey() { + return this.key; + } + + } + } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index 770d3d746c1..13a16e3f65a 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -65,6 +65,8 @@ import org.springframework.kafka.listener.ContainerProperties.AckMode; import org.springframework.kafka.listener.ErrorHandler; import org.springframework.kafka.listener.RecordInterceptor; import org.springframework.kafka.listener.adapter.RecordFilterStrategy; +import org.springframework.kafka.retrytopic.DestinationTopic; +import org.springframework.kafka.retrytopic.RetryTopicConfiguration; import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; import org.springframework.kafka.support.converter.BatchMessageConverter; import org.springframework.kafka.support.converter.BatchMessagingMessageConverter; @@ -76,6 +78,7 @@ import org.springframework.test.util.ReflectionTestUtils; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.entry; +import static org.assertj.core.api.Assertions.tuple; import static org.mockito.BDDMockito.then; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -87,6 +90,7 @@ import static org.mockito.Mockito.never; * @author Stephane Nicoll * @author Eddú Meléndez * @author Nakul Mishra + * @author Tomaz Fernandes */ class KafkaAutoConfigurationTests { @@ -317,6 +321,113 @@ class KafkaAutoConfigurationTests { }); } + @Test + void retryTopicConfigurationWithExponentialBackOff() { + this.contextRunner.withPropertyValues("spring.application.name=my-test-app", + "spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true", + "spring.kafka.retry.topic.attempts=5", "spring.kafka.retry.topic.delay=100ms", + "spring.kafka.retry.topic.multiplier=2", "spring.kafka.retry.topic.max-delay=300ms").run((context) -> { + RetryTopicConfiguration configuration = context.getBean(RetryTopicConfiguration.class); + assertThat(configuration.getDestinationTopicProperties()).hasSize(6) + .extracting(DestinationTopic.Properties::delay, DestinationTopic.Properties::suffix) + .containsExactly(tuple(0L, ""), tuple(100L, "-retry-0"), tuple(200L, "-retry-1"), + tuple(300L, "-retry-2"), tuple(300L, "-retry-3"), tuple(0L, "-dlt")); + }); + } + + @Test + void retryTopicConfigurationWithDefaultProperties() { + this.contextRunner.withPropertyValues("spring.application.name=my-test-app", + "spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true") + .run((context) -> { + RetryTopicConfiguration configuration = context.getBean(RetryTopicConfiguration.class); + assertThat(configuration.getDestinationTopicProperties()).hasSize(3) + .extracting(DestinationTopic.Properties::delay, DestinationTopic.Properties::suffix) + .containsExactly(tuple(0L, ""), tuple(1000L, "-retry"), tuple(0L, "-dlt")); + assertThat(configuration.forKafkaTopicAutoCreation()).extracting("shouldCreateTopics") + .asInstanceOf(InstanceOfAssertFactories.BOOLEAN).isFalse(); + }); + } + + @Test + void retryTopicConfigurationWithFixedBackOff() { + this.contextRunner.withPropertyValues("spring.application.name=my-test-app", + "spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true", + "spring.kafka.retry.topic.attempts=4", "spring.kafka.retry.topic.delay=2s") + .run((context) -> assertThat( + context.getBean(RetryTopicConfiguration.class).getDestinationTopicProperties()).hasSize(3) + .extracting(DestinationTopic.Properties::delay).containsExactly(0L, 2000L, 0L)); + } + + @Test + void retryTopicConfigurationWithNoBackOff() { + this.contextRunner.withPropertyValues("spring.application.name=my-test-app", + "spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true", + "spring.kafka.retry.topic.attempts=4", "spring.kafka.retry.topic.delay=0") + .run((context) -> assertThat( + context.getBean(RetryTopicConfiguration.class).getDestinationTopicProperties()).hasSize(3) + .extracting(DestinationTopic.Properties::delay).containsExactly(0L, 0L, 0L)); + } + + @Test + void retryTopicConfigurationWithNegativeDelay() { + this.contextRunner + .withPropertyValues("spring.application.name=my-test-app", + "spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", + "spring.kafka.retry.topic.enabled=true", "spring.kafka.retry.topic.delay=-1") + .run((context) -> assertThat(context.getStartupFailure()).getRootCause() + .isInstanceOf(IllegalArgumentException.class).message() + .isEqualTo("Property spring.kafka.retry.topic.delay" + + " should be greater than or equal to 0. Provided value was -1.")); + } + + @Test + void retryTopicConfigurationWithNegativeMultiplier() { + this.contextRunner + .withPropertyValues("spring.application.name=my-test-app", + "spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", + "spring.kafka.retry.topic.enabled=true", "spring.kafka.retry.topic.multiplier=-1") + .run((context) -> assertThat(context.getStartupFailure()).getRootCause() + .isInstanceOf(IllegalArgumentException.class).message() + .isEqualTo("Property spring.kafka.retry.topic.multiplier" + + " should be greater than or equal to 0. Provided value was -1.0.")); + } + + @Test + void retryTopicConfigurationWithNegativeMaxDelay() { + this.contextRunner + .withPropertyValues("spring.application.name=my-test-app", + "spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", + "spring.kafka.retry.topic.enabled=true", "spring.kafka.retry.topic.maxDelay=-1") + .run((context) -> assertThat(context.getStartupFailure()).getRootCause() + .isInstanceOf(IllegalArgumentException.class).message() + .isEqualTo("Property spring.kafka.retry.topic.maxDelay" + + " should be greater than or equal to 0. Provided value was -1.")); + } + + @Test + void retryTopicConfigurationWithZeroAttempts() { + this.contextRunner + .withPropertyValues("spring.application.name=my-test-app", + "spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", + "spring.kafka.retry.topic.enabled=true", "spring.kafka.retry.topic.attempts=0") + .run((context) -> assertThat(context.getStartupFailure()).getRootCause() + .isInstanceOf(IllegalArgumentException.class).message() + .isEqualTo("Property spring.kafka.retry.topic.attempts" + + " should be greater than or equal to 1. Provided value was 0.")); + } + + @Test + void retryTopicConfigurationWithZeroMultiplierAndRandomBackOff() { + this.contextRunner + .withPropertyValues("spring.application.name=my-test-app", + "spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", + "spring.kafka.retry.topic.enabled=true", "spring.kafka.retry.topic.randomBackOff=true") + .run((context) -> assertThat(context.getStartupFailure()).getRootCause() + .isInstanceOf(IllegalArgumentException.class).message().isEqualTo( + "Property spring.kafka.retry.topic.randomBackOff should not be true with non-exponential back offs.")); + } + @SuppressWarnings("unchecked") @Test void streamsWithSeveralStreamsBuilderFactoryBeans() { From b3e3581271696682608e98255961610220e26712 Mon Sep 17 00:00:00 2001 From: Stephane Nicoll Date: Thu, 21 Apr 2022 09:09:25 +0200 Subject: [PATCH 2/2] Polish "Add auto-configuration to Kafka Retry Topics" See gh-29812 --- .../kafka/KafkaAutoConfiguration.java | 26 ++-- .../autoconfigure/kafka/KafkaProperties.java | 132 +++++++----------- ...afkaAutoConfigurationIntegrationTests.java | 2 +- .../kafka/KafkaAutoConfigurationTests.java | 91 ++++-------- 4 files changed, 91 insertions(+), 160 deletions(-) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java index 9f70aebaf4b..85329985b53 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java @@ -17,6 +17,7 @@ package org.springframework.boot.autoconfigure.kafka; import java.io.IOException; +import java.time.Duration; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.AutoConfiguration; @@ -24,6 +25,7 @@ import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.autoconfigure.condition.ConditionalOnSingleCandidate; import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Jaas; import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Retry.Topic; import org.springframework.boot.context.properties.EnableConfigurationProperties; @@ -34,7 +36,6 @@ import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaAdmin; -import org.springframework.kafka.core.KafkaOperations; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.retrytopic.RetryTopicConfiguration; @@ -146,21 +147,30 @@ public class KafkaAutoConfiguration { @Bean @ConditionalOnProperty(name = "spring.kafka.retry.topic.enabled") - public RetryTopicConfiguration kafkaRetryTopicConfiguration(KafkaOperations kafkaOperations) { + @ConditionalOnSingleCandidate(KafkaTemplate.class) + public RetryTopicConfiguration kafkaRetryTopicConfiguration(KafkaTemplate kafkaTemplate) { KafkaProperties.Retry.Topic retryTopic = this.properties.getRetry().getTopic(); RetryTopicConfigurationBuilder builder = RetryTopicConfigurationBuilder.newInstance() .maxAttempts(retryTopic.getAttempts()).useSingleTopicForFixedDelays().suffixTopicsWithIndexValues() .doNotAutoCreateRetryTopics(); setBackOffPolicy(builder, retryTopic); - return builder.create(kafkaOperations); + return builder.create(kafkaTemplate); } private static void setBackOffPolicy(RetryTopicConfigurationBuilder builder, Topic retryTopic) { - PropertyMapper.get().from(retryTopic.getDelayMillis()).whenEqualTo(0L).toCall(builder::noBackoff); - PropertyMapper.get().from(retryTopic.getDelayMillis()).when((delay) -> delay > 0) - .toCall(() -> builder.customBackoff((SleepingBackOffPolicy) BackOffPolicyBuilder.newBuilder() - .delay(retryTopic.getDelayMillis()).maxDelay(retryTopic.getMaxDelayMillis()) - .multiplier(retryTopic.getMultiplier()).random(retryTopic.isRandomBackOff()).build())); + long delay = (retryTopic.getDelay() != null) ? retryTopic.getDelay().toMillis() : 0; + if (delay > 0) { + PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); + BackOffPolicyBuilder backOffPolicy = BackOffPolicyBuilder.newBuilder(); + map.from(delay).to(backOffPolicy::delay); + map.from(retryTopic.getMaxDelay()).as(Duration::toMillis).to(backOffPolicy::maxDelay); + map.from(retryTopic.getMultiplier()).to(backOffPolicy::multiplier); + map.from(retryTopic.isRandomBackOff()).to(backOffPolicy::random); + builder.customBackoff((SleepingBackOffPolicy) backOffPolicy.build()); + } + else { + builder.noBackoff(); + } } } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index 3cd3cfe7237..0f80b9336c9 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -17,7 +17,6 @@ package org.springframework.boot.autoconfigure.kafka; import java.io.IOException; -import java.math.BigDecimal; import java.time.Duration; import java.time.temporal.ChronoUnit; import java.util.ArrayList; @@ -42,7 +41,6 @@ import org.springframework.boot.convert.DurationUnit; import org.springframework.core.io.Resource; import org.springframework.kafka.listener.ContainerProperties.AckMode; import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; -import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; import org.springframework.util.unit.DataSize; @@ -1341,16 +1339,36 @@ public class KafkaProperties { } - public static class Retry { + public static class Security { - private Topic topic = new Topic(); + /** + * Security protocol used to communicate with brokers. + */ + private String protocol; - public Topic getTopic() { - return this.topic.validate(); + public String getProtocol() { + return this.protocol; + } + + public void setProtocol(String protocol) { + this.protocol = protocol; + } + + public Map buildProperties() { + Properties properties = new Properties(); + PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); + map.from(this::getProtocol).to(properties.in(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)); + return properties; } - public void setTopic(Topic topic) { - this.topic = topic; + } + + public static class Retry { + + private final Topic topic = new Topic(); + + public Topic getTopic() { + return this.topic; } /** @@ -1358,59 +1376,52 @@ public class KafkaProperties { */ public static class Topic { - private static final String RETRY_TOPIC_PROPERTIES_PREFIX = "spring.kafka.retry.topic."; - - private static final String RETRY_TOPIC_VALIDATION_ERROR_MSG = "Property " + RETRY_TOPIC_PROPERTIES_PREFIX - + "%s should be greater than or equal to %s. Provided value was %s."; - /** - * Whether to enable topic-based retries auto-configuration. + * Whether to enable topic-based non-blocking retries. */ - private Boolean enabled; + private boolean enabled; /** - * The total number of processing attempts made before sending the message to - * the DLT. + * Total number of processing attempts made before sending the message to the + * DLT. */ - private Integer attempts = 3; + private int attempts = 3; /** - * A canonical backoff period. Used as an initial value in the exponential - * case, and as a minimum value in the uniform case. + * Canonical backoff period. Used as an initial value in the exponential case, + * and as a minimum value in the uniform case. */ private Duration delay = Duration.ofSeconds(1); /** - * If positive, then used as a multiplier for generating the next delay for - * backoff. + * Multiplier to use for generating the next backoff delay. */ - private Double multiplier = 0.0; + private double multiplier = 0.0; /** - * The maximum wait between retries. If less than the delay then the default - * of 30 seconds is applied. + * Maximum wait between retries. If less than the delay then the default of 30 + * seconds is applied. */ private Duration maxDelay = Duration.ZERO; /** - * In the exponential case, set this to true to have the backoff delays - * randomized. + * Whether to have the backoff delays. */ - private Boolean randomBackOff = false; + private boolean randomBackOff = false; - public Boolean getEnabled() { + public boolean isEnabled() { return this.enabled; } - public void setEnabled(Boolean enabled) { + public void setEnabled(boolean enabled) { this.enabled = enabled; } - public Integer getAttempts() { + public int getAttempts() { return this.attempts; } - public void setAttempts(Integer attempts) { + public void setAttempts(int attempts) { this.attempts = attempts; } @@ -1418,19 +1429,15 @@ public class KafkaProperties { return this.delay; } - public Long getDelayMillis() { - return (this.delay != null) ? this.delay.toMillis() : null; - } - public void setDelay(Duration delay) { this.delay = delay; } - public Double getMultiplier() { + public double getMultiplier() { return this.multiplier; } - public void setMultiplier(Double multiplier) { + public void setMultiplier(double multiplier) { this.multiplier = multiplier; } @@ -1442,59 +1449,14 @@ public class KafkaProperties { this.maxDelay = maxDelay; } - public Long getMaxDelayMillis() { - return (this.maxDelay != null) ? this.maxDelay.toMillis() : null; - } - - public Boolean isRandomBackOff() { + public boolean isRandomBackOff() { return this.randomBackOff; } - public void setRandomBackOff(Boolean randomBackOff) { + public void setRandomBackOff(boolean randomBackOff) { this.randomBackOff = randomBackOff; } - private Topic validate() { - validateProperty("attempts", this.attempts, 1); - validateProperty("delay", this.getDelayMillis(), 0); - validateProperty("multiplier", this.multiplier, 0); - validateProperty("maxDelay", this.getMaxDelayMillis(), 0); - Assert.isTrue(this.multiplier != 0 || !this.isRandomBackOff(), - "Property " + RETRY_TOPIC_PROPERTIES_PREFIX - + "randomBackOff should not be true with non-exponential back offs."); - return this; - } - - private static void validateProperty(String propertyName, Number providedValue, int minValue) { - Assert.notNull(providedValue, () -> RETRY_TOPIC_PROPERTIES_PREFIX + propertyName + " cannot be null."); - Assert.isTrue(new BigDecimal(providedValue.toString()).compareTo(BigDecimal.valueOf(minValue)) >= 0, - () -> String.format(RETRY_TOPIC_VALIDATION_ERROR_MSG, propertyName, minValue, providedValue)); - } - - } - - } - - public static class Security { - - /** - * Security protocol used to communicate with brokers. - */ - private String protocol; - - public String getProtocol() { - return this.protocol; - } - - public void setProtocol(String protocol) { - this.protocol = protocol; - } - - public Map buildProperties() { - Properties properties = new Properties(); - PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); - map.from(this::getProtocol).to(properties.in(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)); - return properties; } } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationIntegrationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationIntegrationTests.java index c5ebbd6addf..7e14da6f485 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationIntegrationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2021 the original author or authors. + * Copyright 2012-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index 13a16e3f65a..116668e1721 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -22,6 +22,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.function.Consumer; import javax.security.auth.login.AppConfigurationEntry; @@ -40,7 +41,9 @@ import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.jupiter.api.Test; import org.springframework.boot.autoconfigure.AutoConfigurations; +import org.springframework.boot.test.context.assertj.AssertableApplicationContext; import org.springframework.boot.test.context.runner.ApplicationContextRunner; +import org.springframework.boot.test.context.runner.ContextConsumer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafkaStreams; @@ -321,6 +324,14 @@ class KafkaAutoConfigurationTests { }); } + @Test + void retryTopicConfigurationIsNotEnabledByDefault() { + this.contextRunner + .withPropertyValues("spring.application.name=my-test-app", + "spring.kafka.bootstrap-servers=localhost:9092,localhost:9093") + .run((context) -> assertThat(context).doesNotHaveBean(RetryTopicConfiguration.class)); + } + @Test void retryTopicConfigurationWithExponentialBackOff() { this.contextRunner.withPropertyValues("spring.application.name=my-test-app", @@ -339,14 +350,13 @@ class KafkaAutoConfigurationTests { void retryTopicConfigurationWithDefaultProperties() { this.contextRunner.withPropertyValues("spring.application.name=my-test-app", "spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true") - .run((context) -> { - RetryTopicConfiguration configuration = context.getBean(RetryTopicConfiguration.class); + .run(assertRetryTopicConfiguration((configuration) -> { assertThat(configuration.getDestinationTopicProperties()).hasSize(3) .extracting(DestinationTopic.Properties::delay, DestinationTopic.Properties::suffix) .containsExactly(tuple(0L, ""), tuple(1000L, "-retry"), tuple(0L, "-dlt")); assertThat(configuration.forKafkaTopicAutoCreation()).extracting("shouldCreateTopics") .asInstanceOf(InstanceOfAssertFactories.BOOLEAN).isFalse(); - }); + })); } @Test @@ -354,9 +364,9 @@ class KafkaAutoConfigurationTests { this.contextRunner.withPropertyValues("spring.application.name=my-test-app", "spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true", "spring.kafka.retry.topic.attempts=4", "spring.kafka.retry.topic.delay=2s") - .run((context) -> assertThat( - context.getBean(RetryTopicConfiguration.class).getDestinationTopicProperties()).hasSize(3) - .extracting(DestinationTopic.Properties::delay).containsExactly(0L, 2000L, 0L)); + .run(assertRetryTopicConfiguration( + (configuration) -> assertThat(configuration.getDestinationTopicProperties()).hasSize(3) + .extracting(DestinationTopic.Properties::delay).containsExactly(0L, 2000L, 0L))); } @Test @@ -364,68 +374,17 @@ class KafkaAutoConfigurationTests { this.contextRunner.withPropertyValues("spring.application.name=my-test-app", "spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true", "spring.kafka.retry.topic.attempts=4", "spring.kafka.retry.topic.delay=0") - .run((context) -> assertThat( - context.getBean(RetryTopicConfiguration.class).getDestinationTopicProperties()).hasSize(3) - .extracting(DestinationTopic.Properties::delay).containsExactly(0L, 0L, 0L)); - } - - @Test - void retryTopicConfigurationWithNegativeDelay() { - this.contextRunner - .withPropertyValues("spring.application.name=my-test-app", - "spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", - "spring.kafka.retry.topic.enabled=true", "spring.kafka.retry.topic.delay=-1") - .run((context) -> assertThat(context.getStartupFailure()).getRootCause() - .isInstanceOf(IllegalArgumentException.class).message() - .isEqualTo("Property spring.kafka.retry.topic.delay" - + " should be greater than or equal to 0. Provided value was -1.")); - } - - @Test - void retryTopicConfigurationWithNegativeMultiplier() { - this.contextRunner - .withPropertyValues("spring.application.name=my-test-app", - "spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", - "spring.kafka.retry.topic.enabled=true", "spring.kafka.retry.topic.multiplier=-1") - .run((context) -> assertThat(context.getStartupFailure()).getRootCause() - .isInstanceOf(IllegalArgumentException.class).message() - .isEqualTo("Property spring.kafka.retry.topic.multiplier" - + " should be greater than or equal to 0. Provided value was -1.0.")); - } - - @Test - void retryTopicConfigurationWithNegativeMaxDelay() { - this.contextRunner - .withPropertyValues("spring.application.name=my-test-app", - "spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", - "spring.kafka.retry.topic.enabled=true", "spring.kafka.retry.topic.maxDelay=-1") - .run((context) -> assertThat(context.getStartupFailure()).getRootCause() - .isInstanceOf(IllegalArgumentException.class).message() - .isEqualTo("Property spring.kafka.retry.topic.maxDelay" - + " should be greater than or equal to 0. Provided value was -1.")); - } - - @Test - void retryTopicConfigurationWithZeroAttempts() { - this.contextRunner - .withPropertyValues("spring.application.name=my-test-app", - "spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", - "spring.kafka.retry.topic.enabled=true", "spring.kafka.retry.topic.attempts=0") - .run((context) -> assertThat(context.getStartupFailure()).getRootCause() - .isInstanceOf(IllegalArgumentException.class).message() - .isEqualTo("Property spring.kafka.retry.topic.attempts" - + " should be greater than or equal to 1. Provided value was 0.")); + .run(assertRetryTopicConfiguration( + (configuration) -> assertThat(configuration.getDestinationTopicProperties()).hasSize(3) + .extracting(DestinationTopic.Properties::delay).containsExactly(0L, 0L, 0L))); } - @Test - void retryTopicConfigurationWithZeroMultiplierAndRandomBackOff() { - this.contextRunner - .withPropertyValues("spring.application.name=my-test-app", - "spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", - "spring.kafka.retry.topic.enabled=true", "spring.kafka.retry.topic.randomBackOff=true") - .run((context) -> assertThat(context.getStartupFailure()).getRootCause() - .isInstanceOf(IllegalArgumentException.class).message().isEqualTo( - "Property spring.kafka.retry.topic.randomBackOff should not be true with non-exponential back offs.")); + private ContextConsumer assertRetryTopicConfiguration( + Consumer configuration) { + return (context) -> { + assertThat(context).hasSingleBean(RetryTopicConfiguration.class); + configuration.accept(context.getBean(RetryTopicConfiguration.class)); + }; } @SuppressWarnings("unchecked")