diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java index b25b563a9cd..85329985b53 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java @@ -17,6 +17,7 @@ package org.springframework.boot.autoconfigure.kafka; import java.io.IOException; +import java.time.Duration; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.AutoConfiguration; @@ -24,7 +25,9 @@ import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.autoconfigure.condition.ConditionalOnSingleCandidate; import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Jaas; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Retry.Topic; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.context.properties.PropertyMapper; import org.springframework.context.annotation.Bean; @@ -35,11 +38,15 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaAdmin; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.retrytopic.RetryTopicConfiguration; +import org.springframework.kafka.retrytopic.RetryTopicConfigurationBuilder; import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; import org.springframework.kafka.support.LoggingProducerListener; import org.springframework.kafka.support.ProducerListener; import org.springframework.kafka.support.converter.RecordMessageConverter; import org.springframework.kafka.transaction.KafkaTransactionManager; +import org.springframework.retry.backoff.BackOffPolicyBuilder; +import org.springframework.retry.backoff.SleepingBackOffPolicy; /** * {@link EnableAutoConfiguration Auto-configuration} for Apache Kafka. @@ -48,6 +55,7 @@ import org.springframework.kafka.transaction.KafkaTransactionManager; * @author Stephane Nicoll * @author Eddú Meléndez * @author Nakul Mishra + * @author Tomaz Fernandes * @since 1.5.0 */ @AutoConfiguration @@ -137,4 +145,32 @@ public class KafkaAutoConfiguration { return kafkaAdmin; } + @Bean + @ConditionalOnProperty(name = "spring.kafka.retry.topic.enabled") + @ConditionalOnSingleCandidate(KafkaTemplate.class) + public RetryTopicConfiguration kafkaRetryTopicConfiguration(KafkaTemplate kafkaTemplate) { + KafkaProperties.Retry.Topic retryTopic = this.properties.getRetry().getTopic(); + RetryTopicConfigurationBuilder builder = RetryTopicConfigurationBuilder.newInstance() + .maxAttempts(retryTopic.getAttempts()).useSingleTopicForFixedDelays().suffixTopicsWithIndexValues() + .doNotAutoCreateRetryTopics(); + setBackOffPolicy(builder, retryTopic); + return builder.create(kafkaTemplate); + } + + private static void setBackOffPolicy(RetryTopicConfigurationBuilder builder, Topic retryTopic) { + long delay = (retryTopic.getDelay() != null) ? retryTopic.getDelay().toMillis() : 0; + if (delay > 0) { + PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); + BackOffPolicyBuilder backOffPolicy = BackOffPolicyBuilder.newBuilder(); + map.from(delay).to(backOffPolicy::delay); + map.from(retryTopic.getMaxDelay()).as(Duration::toMillis).to(backOffPolicy::maxDelay); + map.from(retryTopic.getMultiplier()).to(backOffPolicy::multiplier); + map.from(retryTopic.isRandomBackOff()).to(backOffPolicy::random); + builder.customBackoff((SleepingBackOffPolicy) backOffPolicy.build()); + } + else { + builder.noBackoff(); + } + } + } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index 69410bd3279..adc9991c272 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -53,6 +53,7 @@ import org.springframework.util.unit.DataSize; * @author Stephane Nicoll * @author Artem Bilan * @author Nakul Mishra + * @author Tomaz Fernandes * @since 1.5.0 */ @ConfigurationProperties(prefix = "spring.kafka") @@ -93,6 +94,8 @@ public class KafkaProperties { private final Security security = new Security(); + private final Retry retry = new Retry(); + public List getBootstrapServers() { return this.bootstrapServers; } @@ -149,6 +152,10 @@ public class KafkaProperties { return this.security; } + public Retry getRetry() { + return this.retry; + } + private Map buildCommonProperties() { Map properties = new HashMap<>(); if (this.bootstrapServers != null) { @@ -1338,6 +1345,104 @@ public class KafkaProperties { } + public static class Retry { + + private final Topic topic = new Topic(); + + public Topic getTopic() { + return this.topic; + } + + /** + * Properties for non-blocking, topic-based retries. + */ + public static class Topic { + + /** + * Whether to enable topic-based non-blocking retries. + */ + private boolean enabled; + + /** + * Total number of processing attempts made before sending the message to the + * DLT. + */ + private int attempts = 3; + + /** + * Canonical backoff period. Used as an initial value in the exponential case, + * and as a minimum value in the uniform case. + */ + private Duration delay = Duration.ofSeconds(1); + + /** + * Multiplier to use for generating the next backoff delay. + */ + private double multiplier = 0.0; + + /** + * Maximum wait between retries. If less than the delay then the default of 30 + * seconds is applied. + */ + private Duration maxDelay = Duration.ZERO; + + /** + * Whether to have the backoff delays. + */ + private boolean randomBackOff = false; + + public boolean isEnabled() { + return this.enabled; + } + + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + + public int getAttempts() { + return this.attempts; + } + + public void setAttempts(int attempts) { + this.attempts = attempts; + } + + public Duration getDelay() { + return this.delay; + } + + public void setDelay(Duration delay) { + this.delay = delay; + } + + public double getMultiplier() { + return this.multiplier; + } + + public void setMultiplier(double multiplier) { + this.multiplier = multiplier; + } + + public Duration getMaxDelay() { + return this.maxDelay; + } + + public void setMaxDelay(Duration maxDelay) { + this.maxDelay = maxDelay; + } + + public boolean isRandomBackOff() { + return this.randomBackOff; + } + + public void setRandomBackOff(boolean randomBackOff) { + this.randomBackOff = randomBackOff; + } + + } + + } + public static class Cleanup { /** diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationIntegrationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationIntegrationTests.java index d18cd3901d9..7e14da6f485 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationIntegrationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2021 the original author or authors. + * Copyright 2012-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,8 @@ package org.springframework.boot.autoconfigure.kafka; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; @@ -41,6 +43,8 @@ import org.springframework.kafka.config.StreamsBuilderFactoryBean; import org.springframework.kafka.config.TopicBuilder; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.retrytopic.DestinationTopic; +import org.springframework.kafka.retrytopic.RetryTopicConfiguration; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.test.condition.EmbeddedKafkaCondition; import org.springframework.kafka.test.context.EmbeddedKafka; @@ -53,12 +57,14 @@ import static org.assertj.core.api.Assertions.assertThat; * * @author Gary Russell * @author Stephane Nicoll + * @author Tomaz Fernandes */ @DisabledOnOs(OS.WINDOWS) @EmbeddedKafka(topics = KafkaAutoConfigurationIntegrationTests.TEST_TOPIC) class KafkaAutoConfigurationIntegrationTests { static final String TEST_TOPIC = "testTopic"; + static final String TEST_RETRY_TOPIC = "testRetryTopic"; private static final String ADMIN_CREATED_TOPIC = "adminCreatedTopic"; @@ -89,6 +95,27 @@ class KafkaAutoConfigurationIntegrationTests { producer.close(); } + @SuppressWarnings("unchecked") + @Test + void testEndToEndWithRetryTopics() throws Exception { + load(KafkaConfig.class, "spring.kafka.bootstrap-servers:" + getEmbeddedKafkaBrokersAsString(), + "spring.kafka.consumer.group-id=testGroup", "spring.kafka.retry.topic.enabled=true", + "spring.kafka.retry.topic.attempts=5", "spring.kafka.retry.topic.delay=100ms", + "spring.kafka.retry.topic.multiplier=2", "spring.kafka.retry.topic.max-delay=300ms", + "spring.kafka.consumer.auto-offset-reset=earliest"); + RetryTopicConfiguration configuration = this.context.getBean(RetryTopicConfiguration.class); + assertThat(configuration.getDestinationTopicProperties()).extracting(DestinationTopic.Properties::delay) + .containsExactly(0L, 100L, 200L, 300L, 300L, 0L); + KafkaTemplate template = this.context.getBean(KafkaTemplate.class); + template.send(TEST_RETRY_TOPIC, "foo", "bar"); + RetryListener listener = this.context.getBean(RetryListener.class); + assertThat(listener.latch.await(30, TimeUnit.SECONDS)).isTrue(); + assertThat(listener).extracting(RetryListener::getKey, RetryListener::getReceived).containsExactly("foo", + "bar"); + assertThat(listener).extracting(RetryListener::getTopics).asList().hasSize(5).containsSequence("testRetryTopic", + "testRetryTopic-retry-0", "testRetryTopic-retry-1", "testRetryTopic-retry-2", "testRetryTopic-retry-3"); + } + @Test void testStreams() { load(KafkaStreamsConfig.class, "spring.application.name:my-app", @@ -121,6 +148,11 @@ class KafkaAutoConfigurationIntegrationTests { return new Listener(); } + @Bean + RetryListener retryListener() { + return new RetryListener(); + } + @Bean NewTopic adminCreated() { return TopicBuilder.name(ADMIN_CREATED_TOPIC).partitions(10).replicas(1).build(); @@ -157,4 +189,38 @@ class KafkaAutoConfigurationIntegrationTests { } + static class RetryListener { + + private final CountDownLatch latch = new CountDownLatch(5); + + private final List topics = new ArrayList<>(); + + private volatile String received; + + private volatile String key; + + @KafkaListener(topics = TEST_RETRY_TOPIC) + void listen(String foo, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, + @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { + this.received = foo; + this.key = key; + this.topics.add(topic); + this.latch.countDown(); + throw new RuntimeException("Test exception"); + } + + private List getTopics() { + return this.topics; + } + + private String getReceived() { + return this.received; + } + + private String getKey() { + return this.key; + } + + } + } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index 861aff1bed8..913c3902525 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -22,6 +22,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.function.Consumer; import javax.security.auth.login.AppConfigurationEntry; @@ -40,7 +41,9 @@ import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.jupiter.api.Test; import org.springframework.boot.autoconfigure.AutoConfigurations; +import org.springframework.boot.test.context.assertj.AssertableApplicationContext; import org.springframework.boot.test.context.runner.ApplicationContextRunner; +import org.springframework.boot.test.context.runner.ContextConsumer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafkaStreams; @@ -63,6 +66,8 @@ import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.ContainerProperties.AckMode; import org.springframework.kafka.listener.RecordInterceptor; import org.springframework.kafka.listener.adapter.RecordFilterStrategy; +import org.springframework.kafka.retrytopic.DestinationTopic; +import org.springframework.kafka.retrytopic.RetryTopicConfiguration; import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; import org.springframework.kafka.support.converter.BatchMessageConverter; import org.springframework.kafka.support.converter.BatchMessagingMessageConverter; @@ -74,6 +79,7 @@ import org.springframework.test.util.ReflectionTestUtils; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.entry; +import static org.assertj.core.api.Assertions.tuple; import static org.mockito.BDDMockito.then; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -85,6 +91,7 @@ import static org.mockito.Mockito.never; * @author Stephane Nicoll * @author Eddú Meléndez * @author Nakul Mishra + * @author Tomaz Fernandes */ class KafkaAutoConfigurationTests { @@ -315,6 +322,69 @@ class KafkaAutoConfigurationTests { }); } + @Test + void retryTopicConfigurationIsNotEnabledByDefault() { + this.contextRunner + .withPropertyValues("spring.application.name=my-test-app", + "spring.kafka.bootstrap-servers=localhost:9092,localhost:9093") + .run((context) -> assertThat(context).doesNotHaveBean(RetryTopicConfiguration.class)); + } + + @Test + void retryTopicConfigurationWithExponentialBackOff() { + this.contextRunner.withPropertyValues("spring.application.name=my-test-app", + "spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true", + "spring.kafka.retry.topic.attempts=5", "spring.kafka.retry.topic.delay=100ms", + "spring.kafka.retry.topic.multiplier=2", "spring.kafka.retry.topic.max-delay=300ms").run((context) -> { + RetryTopicConfiguration configuration = context.getBean(RetryTopicConfiguration.class); + assertThat(configuration.getDestinationTopicProperties()).hasSize(6) + .extracting(DestinationTopic.Properties::delay, DestinationTopic.Properties::suffix) + .containsExactly(tuple(0L, ""), tuple(100L, "-retry-0"), tuple(200L, "-retry-1"), + tuple(300L, "-retry-2"), tuple(300L, "-retry-3"), tuple(0L, "-dlt")); + }); + } + + @Test + void retryTopicConfigurationWithDefaultProperties() { + this.contextRunner.withPropertyValues("spring.application.name=my-test-app", + "spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true") + .run(assertRetryTopicConfiguration((configuration) -> { + assertThat(configuration.getDestinationTopicProperties()).hasSize(3) + .extracting(DestinationTopic.Properties::delay, DestinationTopic.Properties::suffix) + .containsExactly(tuple(0L, ""), tuple(1000L, "-retry"), tuple(0L, "-dlt")); + assertThat(configuration.forKafkaTopicAutoCreation()).extracting("shouldCreateTopics") + .asInstanceOf(InstanceOfAssertFactories.BOOLEAN).isFalse(); + })); + } + + @Test + void retryTopicConfigurationWithFixedBackOff() { + this.contextRunner.withPropertyValues("spring.application.name=my-test-app", + "spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true", + "spring.kafka.retry.topic.attempts=4", "spring.kafka.retry.topic.delay=2s") + .run(assertRetryTopicConfiguration( + (configuration) -> assertThat(configuration.getDestinationTopicProperties()).hasSize(3) + .extracting(DestinationTopic.Properties::delay).containsExactly(0L, 2000L, 0L))); + } + + @Test + void retryTopicConfigurationWithNoBackOff() { + this.contextRunner.withPropertyValues("spring.application.name=my-test-app", + "spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true", + "spring.kafka.retry.topic.attempts=4", "spring.kafka.retry.topic.delay=0") + .run(assertRetryTopicConfiguration( + (configuration) -> assertThat(configuration.getDestinationTopicProperties()).hasSize(3) + .extracting(DestinationTopic.Properties::delay).containsExactly(0L, 0L, 0L))); + } + + private ContextConsumer assertRetryTopicConfiguration( + Consumer configuration) { + return (context) -> { + assertThat(context).hasSingleBean(RetryTopicConfiguration.class); + configuration.accept(context.getBean(RetryTopicConfiguration.class)); + }; + } + @SuppressWarnings("unchecked") @Test void streamsWithSeveralStreamsBuilderFactoryBeans() {