Browse Source

Merge branch '2.7.x'

pull/30778/head
Stephane Nicoll 4 years ago
parent
commit
f76371be76
  1. 36
      spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java
  2. 105
      spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java
  3. 68
      spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationIntegrationTests.java
  4. 70
      spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java

36
spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java

@ -17,6 +17,7 @@ @@ -17,6 +17,7 @@
package org.springframework.boot.autoconfigure.kafka;
import java.io.IOException;
import java.time.Duration;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.AutoConfiguration;
@ -24,7 +25,9 @@ import org.springframework.boot.autoconfigure.EnableAutoConfiguration; @@ -24,7 +25,9 @@ import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.condition.ConditionalOnSingleCandidate;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Jaas;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Retry.Topic;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.context.annotation.Bean;
@ -35,11 +38,15 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory; @@ -35,11 +38,15 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.retrytopic.RetryTopicConfiguration;
import org.springframework.kafka.retrytopic.RetryTopicConfigurationBuilder;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.retry.backoff.BackOffPolicyBuilder;
import org.springframework.retry.backoff.SleepingBackOffPolicy;
/**
* {@link EnableAutoConfiguration Auto-configuration} for Apache Kafka.
@ -48,6 +55,7 @@ import org.springframework.kafka.transaction.KafkaTransactionManager; @@ -48,6 +55,7 @@ import org.springframework.kafka.transaction.KafkaTransactionManager;
* @author Stephane Nicoll
* @author Eddú Meléndez
* @author Nakul Mishra
* @author Tomaz Fernandes
* @since 1.5.0
*/
@AutoConfiguration
@ -137,4 +145,32 @@ public class KafkaAutoConfiguration { @@ -137,4 +145,32 @@ public class KafkaAutoConfiguration {
return kafkaAdmin;
}
@Bean
@ConditionalOnProperty(name = "spring.kafka.retry.topic.enabled")
@ConditionalOnSingleCandidate(KafkaTemplate.class)
public RetryTopicConfiguration kafkaRetryTopicConfiguration(KafkaTemplate<?, ?> kafkaTemplate) {
KafkaProperties.Retry.Topic retryTopic = this.properties.getRetry().getTopic();
RetryTopicConfigurationBuilder builder = RetryTopicConfigurationBuilder.newInstance()
.maxAttempts(retryTopic.getAttempts()).useSingleTopicForFixedDelays().suffixTopicsWithIndexValues()
.doNotAutoCreateRetryTopics();
setBackOffPolicy(builder, retryTopic);
return builder.create(kafkaTemplate);
}
private static void setBackOffPolicy(RetryTopicConfigurationBuilder builder, Topic retryTopic) {
long delay = (retryTopic.getDelay() != null) ? retryTopic.getDelay().toMillis() : 0;
if (delay > 0) {
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
BackOffPolicyBuilder backOffPolicy = BackOffPolicyBuilder.newBuilder();
map.from(delay).to(backOffPolicy::delay);
map.from(retryTopic.getMaxDelay()).as(Duration::toMillis).to(backOffPolicy::maxDelay);
map.from(retryTopic.getMultiplier()).to(backOffPolicy::multiplier);
map.from(retryTopic.isRandomBackOff()).to(backOffPolicy::random);
builder.customBackoff((SleepingBackOffPolicy<?>) backOffPolicy.build());
}
else {
builder.noBackoff();
}
}
}

105
spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java

@ -53,6 +53,7 @@ import org.springframework.util.unit.DataSize; @@ -53,6 +53,7 @@ import org.springframework.util.unit.DataSize;
* @author Stephane Nicoll
* @author Artem Bilan
* @author Nakul Mishra
* @author Tomaz Fernandes
* @since 1.5.0
*/
@ConfigurationProperties(prefix = "spring.kafka")
@ -93,6 +94,8 @@ public class KafkaProperties { @@ -93,6 +94,8 @@ public class KafkaProperties {
private final Security security = new Security();
private final Retry retry = new Retry();
public List<String> getBootstrapServers() {
return this.bootstrapServers;
}
@ -149,6 +152,10 @@ public class KafkaProperties { @@ -149,6 +152,10 @@ public class KafkaProperties {
return this.security;
}
public Retry getRetry() {
return this.retry;
}
private Map<String, Object> buildCommonProperties() {
Map<String, Object> properties = new HashMap<>();
if (this.bootstrapServers != null) {
@ -1338,6 +1345,104 @@ public class KafkaProperties { @@ -1338,6 +1345,104 @@ public class KafkaProperties {
}
public static class Retry {
private final Topic topic = new Topic();
public Topic getTopic() {
return this.topic;
}
/**
* Properties for non-blocking, topic-based retries.
*/
public static class Topic {
/**
* Whether to enable topic-based non-blocking retries.
*/
private boolean enabled;
/**
* Total number of processing attempts made before sending the message to the
* DLT.
*/
private int attempts = 3;
/**
* Canonical backoff period. Used as an initial value in the exponential case,
* and as a minimum value in the uniform case.
*/
private Duration delay = Duration.ofSeconds(1);
/**
* Multiplier to use for generating the next backoff delay.
*/
private double multiplier = 0.0;
/**
* Maximum wait between retries. If less than the delay then the default of 30
* seconds is applied.
*/
private Duration maxDelay = Duration.ZERO;
/**
* Whether to have the backoff delays.
*/
private boolean randomBackOff = false;
public boolean isEnabled() {
return this.enabled;
}
public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
public int getAttempts() {
return this.attempts;
}
public void setAttempts(int attempts) {
this.attempts = attempts;
}
public Duration getDelay() {
return this.delay;
}
public void setDelay(Duration delay) {
this.delay = delay;
}
public double getMultiplier() {
return this.multiplier;
}
public void setMultiplier(double multiplier) {
this.multiplier = multiplier;
}
public Duration getMaxDelay() {
return this.maxDelay;
}
public void setMaxDelay(Duration maxDelay) {
this.maxDelay = maxDelay;
}
public boolean isRandomBackOff() {
return this.randomBackOff;
}
public void setRandomBackOff(boolean randomBackOff) {
this.randomBackOff = randomBackOff;
}
}
}
public static class Cleanup {
/**

68
spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationIntegrationTests.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2012-2021 the original author or authors.
* Copyright 2012-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,6 +16,8 @@ @@ -16,6 +16,8 @@
package org.springframework.boot.autoconfigure.kafka;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
@ -41,6 +43,8 @@ import org.springframework.kafka.config.StreamsBuilderFactoryBean; @@ -41,6 +43,8 @@ import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.retrytopic.DestinationTopic;
import org.springframework.kafka.retrytopic.RetryTopicConfiguration;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.test.condition.EmbeddedKafkaCondition;
import org.springframework.kafka.test.context.EmbeddedKafka;
@ -53,12 +57,14 @@ import static org.assertj.core.api.Assertions.assertThat; @@ -53,12 +57,14 @@ import static org.assertj.core.api.Assertions.assertThat;
*
* @author Gary Russell
* @author Stephane Nicoll
* @author Tomaz Fernandes
*/
@DisabledOnOs(OS.WINDOWS)
@EmbeddedKafka(topics = KafkaAutoConfigurationIntegrationTests.TEST_TOPIC)
class KafkaAutoConfigurationIntegrationTests {
static final String TEST_TOPIC = "testTopic";
static final String TEST_RETRY_TOPIC = "testRetryTopic";
private static final String ADMIN_CREATED_TOPIC = "adminCreatedTopic";
@ -89,6 +95,27 @@ class KafkaAutoConfigurationIntegrationTests { @@ -89,6 +95,27 @@ class KafkaAutoConfigurationIntegrationTests {
producer.close();
}
@SuppressWarnings("unchecked")
@Test
void testEndToEndWithRetryTopics() throws Exception {
load(KafkaConfig.class, "spring.kafka.bootstrap-servers:" + getEmbeddedKafkaBrokersAsString(),
"spring.kafka.consumer.group-id=testGroup", "spring.kafka.retry.topic.enabled=true",
"spring.kafka.retry.topic.attempts=5", "spring.kafka.retry.topic.delay=100ms",
"spring.kafka.retry.topic.multiplier=2", "spring.kafka.retry.topic.max-delay=300ms",
"spring.kafka.consumer.auto-offset-reset=earliest");
RetryTopicConfiguration configuration = this.context.getBean(RetryTopicConfiguration.class);
assertThat(configuration.getDestinationTopicProperties()).extracting(DestinationTopic.Properties::delay)
.containsExactly(0L, 100L, 200L, 300L, 300L, 0L);
KafkaTemplate<String, String> template = this.context.getBean(KafkaTemplate.class);
template.send(TEST_RETRY_TOPIC, "foo", "bar");
RetryListener listener = this.context.getBean(RetryListener.class);
assertThat(listener.latch.await(30, TimeUnit.SECONDS)).isTrue();
assertThat(listener).extracting(RetryListener::getKey, RetryListener::getReceived).containsExactly("foo",
"bar");
assertThat(listener).extracting(RetryListener::getTopics).asList().hasSize(5).containsSequence("testRetryTopic",
"testRetryTopic-retry-0", "testRetryTopic-retry-1", "testRetryTopic-retry-2", "testRetryTopic-retry-3");
}
@Test
void testStreams() {
load(KafkaStreamsConfig.class, "spring.application.name:my-app",
@ -121,6 +148,11 @@ class KafkaAutoConfigurationIntegrationTests { @@ -121,6 +148,11 @@ class KafkaAutoConfigurationIntegrationTests {
return new Listener();
}
@Bean
RetryListener retryListener() {
return new RetryListener();
}
@Bean
NewTopic adminCreated() {
return TopicBuilder.name(ADMIN_CREATED_TOPIC).partitions(10).replicas(1).build();
@ -157,4 +189,38 @@ class KafkaAutoConfigurationIntegrationTests { @@ -157,4 +189,38 @@ class KafkaAutoConfigurationIntegrationTests {
}
static class RetryListener {
private final CountDownLatch latch = new CountDownLatch(5);
private final List<String> topics = new ArrayList<>();
private volatile String received;
private volatile String key;
@KafkaListener(topics = TEST_RETRY_TOPIC)
void listen(String foo, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
this.received = foo;
this.key = key;
this.topics.add(topic);
this.latch.countDown();
throw new RuntimeException("Test exception");
}
private List<String> getTopics() {
return this.topics;
}
private String getReceived() {
return this.received;
}
private String getKey() {
return this.key;
}
}
}

70
spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java

@ -22,6 +22,7 @@ import java.util.HashMap; @@ -22,6 +22,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.function.Consumer;
import javax.security.auth.login.AppConfigurationEntry;
@ -40,7 +41,9 @@ import org.assertj.core.api.InstanceOfAssertFactories; @@ -40,7 +41,9 @@ import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.jupiter.api.Test;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.test.context.assertj.AssertableApplicationContext;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.boot.test.context.runner.ContextConsumer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
@ -63,6 +66,8 @@ import org.springframework.kafka.listener.ContainerProperties; @@ -63,6 +66,8 @@ import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.listener.RecordInterceptor;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.kafka.retrytopic.DestinationTopic;
import org.springframework.kafka.retrytopic.RetryTopicConfiguration;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.kafka.support.converter.BatchMessageConverter;
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
@ -74,6 +79,7 @@ import org.springframework.test.util.ReflectionTestUtils; @@ -74,6 +79,7 @@ import org.springframework.test.util.ReflectionTestUtils;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.entry;
import static org.assertj.core.api.Assertions.tuple;
import static org.mockito.BDDMockito.then;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@ -85,6 +91,7 @@ import static org.mockito.Mockito.never; @@ -85,6 +91,7 @@ import static org.mockito.Mockito.never;
* @author Stephane Nicoll
* @author Eddú Meléndez
* @author Nakul Mishra
* @author Tomaz Fernandes
*/
class KafkaAutoConfigurationTests {
@ -315,6 +322,69 @@ class KafkaAutoConfigurationTests { @@ -315,6 +322,69 @@ class KafkaAutoConfigurationTests {
});
}
@Test
void retryTopicConfigurationIsNotEnabledByDefault() {
this.contextRunner
.withPropertyValues("spring.application.name=my-test-app",
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093")
.run((context) -> assertThat(context).doesNotHaveBean(RetryTopicConfiguration.class));
}
@Test
void retryTopicConfigurationWithExponentialBackOff() {
this.contextRunner.withPropertyValues("spring.application.name=my-test-app",
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true",
"spring.kafka.retry.topic.attempts=5", "spring.kafka.retry.topic.delay=100ms",
"spring.kafka.retry.topic.multiplier=2", "spring.kafka.retry.topic.max-delay=300ms").run((context) -> {
RetryTopicConfiguration configuration = context.getBean(RetryTopicConfiguration.class);
assertThat(configuration.getDestinationTopicProperties()).hasSize(6)
.extracting(DestinationTopic.Properties::delay, DestinationTopic.Properties::suffix)
.containsExactly(tuple(0L, ""), tuple(100L, "-retry-0"), tuple(200L, "-retry-1"),
tuple(300L, "-retry-2"), tuple(300L, "-retry-3"), tuple(0L, "-dlt"));
});
}
@Test
void retryTopicConfigurationWithDefaultProperties() {
this.contextRunner.withPropertyValues("spring.application.name=my-test-app",
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true")
.run(assertRetryTopicConfiguration((configuration) -> {
assertThat(configuration.getDestinationTopicProperties()).hasSize(3)
.extracting(DestinationTopic.Properties::delay, DestinationTopic.Properties::suffix)
.containsExactly(tuple(0L, ""), tuple(1000L, "-retry"), tuple(0L, "-dlt"));
assertThat(configuration.forKafkaTopicAutoCreation()).extracting("shouldCreateTopics")
.asInstanceOf(InstanceOfAssertFactories.BOOLEAN).isFalse();
}));
}
@Test
void retryTopicConfigurationWithFixedBackOff() {
this.contextRunner.withPropertyValues("spring.application.name=my-test-app",
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true",
"spring.kafka.retry.topic.attempts=4", "spring.kafka.retry.topic.delay=2s")
.run(assertRetryTopicConfiguration(
(configuration) -> assertThat(configuration.getDestinationTopicProperties()).hasSize(3)
.extracting(DestinationTopic.Properties::delay).containsExactly(0L, 2000L, 0L)));
}
@Test
void retryTopicConfigurationWithNoBackOff() {
this.contextRunner.withPropertyValues("spring.application.name=my-test-app",
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true",
"spring.kafka.retry.topic.attempts=4", "spring.kafka.retry.topic.delay=0")
.run(assertRetryTopicConfiguration(
(configuration) -> assertThat(configuration.getDestinationTopicProperties()).hasSize(3)
.extracting(DestinationTopic.Properties::delay).containsExactly(0L, 0L, 0L)));
}
private ContextConsumer<AssertableApplicationContext> assertRetryTopicConfiguration(
Consumer<RetryTopicConfiguration> configuration) {
return (context) -> {
assertThat(context).hasSingleBean(RetryTopicConfiguration.class);
configuration.accept(context.getBean(RetryTopicConfiguration.class));
};
}
@SuppressWarnings("unchecked")
@Test
void streamsWithSeveralStreamsBuilderFactoryBeans() {

Loading…
Cancel
Save