|
|
|
|
@ -22,6 +22,7 @@ import java.util.HashMap;
@@ -22,6 +22,7 @@ import java.util.HashMap;
|
|
|
|
|
import java.util.List; |
|
|
|
|
import java.util.Map; |
|
|
|
|
import java.util.Properties; |
|
|
|
|
import java.util.function.Consumer; |
|
|
|
|
|
|
|
|
|
import javax.security.auth.login.AppConfigurationEntry; |
|
|
|
|
|
|
|
|
|
@ -40,7 +41,9 @@ import org.assertj.core.api.InstanceOfAssertFactories;
@@ -40,7 +41,9 @@ import org.assertj.core.api.InstanceOfAssertFactories;
|
|
|
|
|
import org.junit.jupiter.api.Test; |
|
|
|
|
|
|
|
|
|
import org.springframework.boot.autoconfigure.AutoConfigurations; |
|
|
|
|
import org.springframework.boot.test.context.assertj.AssertableApplicationContext; |
|
|
|
|
import org.springframework.boot.test.context.runner.ApplicationContextRunner; |
|
|
|
|
import org.springframework.boot.test.context.runner.ContextConsumer; |
|
|
|
|
import org.springframework.context.annotation.Bean; |
|
|
|
|
import org.springframework.context.annotation.Configuration; |
|
|
|
|
import org.springframework.kafka.annotation.EnableKafkaStreams; |
|
|
|
|
@ -65,6 +68,8 @@ import org.springframework.kafka.listener.ContainerProperties.AckMode;
@@ -65,6 +68,8 @@ import org.springframework.kafka.listener.ContainerProperties.AckMode;
|
|
|
|
|
import org.springframework.kafka.listener.ErrorHandler; |
|
|
|
|
import org.springframework.kafka.listener.RecordInterceptor; |
|
|
|
|
import org.springframework.kafka.listener.adapter.RecordFilterStrategy; |
|
|
|
|
import org.springframework.kafka.retrytopic.DestinationTopic; |
|
|
|
|
import org.springframework.kafka.retrytopic.RetryTopicConfiguration; |
|
|
|
|
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; |
|
|
|
|
import org.springframework.kafka.support.converter.BatchMessageConverter; |
|
|
|
|
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter; |
|
|
|
|
@ -76,6 +81,7 @@ import org.springframework.test.util.ReflectionTestUtils;
@@ -76,6 +81,7 @@ import org.springframework.test.util.ReflectionTestUtils;
|
|
|
|
|
|
|
|
|
|
import static org.assertj.core.api.Assertions.assertThat; |
|
|
|
|
import static org.assertj.core.api.Assertions.entry; |
|
|
|
|
import static org.assertj.core.api.Assertions.tuple; |
|
|
|
|
import static org.mockito.BDDMockito.then; |
|
|
|
|
import static org.mockito.Mockito.mock; |
|
|
|
|
import static org.mockito.Mockito.never; |
|
|
|
|
@ -87,6 +93,7 @@ import static org.mockito.Mockito.never;
@@ -87,6 +93,7 @@ import static org.mockito.Mockito.never;
|
|
|
|
|
* @author Stephane Nicoll |
|
|
|
|
* @author Eddú Meléndez |
|
|
|
|
* @author Nakul Mishra |
|
|
|
|
* @author Tomaz Fernandes |
|
|
|
|
*/ |
|
|
|
|
class KafkaAutoConfigurationTests { |
|
|
|
|
|
|
|
|
|
@ -317,6 +324,69 @@ class KafkaAutoConfigurationTests {
@@ -317,6 +324,69 @@ class KafkaAutoConfigurationTests {
|
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
void retryTopicConfigurationIsNotEnabledByDefault() { |
|
|
|
|
this.contextRunner |
|
|
|
|
.withPropertyValues("spring.application.name=my-test-app", |
|
|
|
|
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093") |
|
|
|
|
.run((context) -> assertThat(context).doesNotHaveBean(RetryTopicConfiguration.class)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
void retryTopicConfigurationWithExponentialBackOff() { |
|
|
|
|
this.contextRunner.withPropertyValues("spring.application.name=my-test-app", |
|
|
|
|
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true", |
|
|
|
|
"spring.kafka.retry.topic.attempts=5", "spring.kafka.retry.topic.delay=100ms", |
|
|
|
|
"spring.kafka.retry.topic.multiplier=2", "spring.kafka.retry.topic.max-delay=300ms").run((context) -> { |
|
|
|
|
RetryTopicConfiguration configuration = context.getBean(RetryTopicConfiguration.class); |
|
|
|
|
assertThat(configuration.getDestinationTopicProperties()).hasSize(6) |
|
|
|
|
.extracting(DestinationTopic.Properties::delay, DestinationTopic.Properties::suffix) |
|
|
|
|
.containsExactly(tuple(0L, ""), tuple(100L, "-retry-0"), tuple(200L, "-retry-1"), |
|
|
|
|
tuple(300L, "-retry-2"), tuple(300L, "-retry-3"), tuple(0L, "-dlt")); |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
void retryTopicConfigurationWithDefaultProperties() { |
|
|
|
|
this.contextRunner.withPropertyValues("spring.application.name=my-test-app", |
|
|
|
|
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true") |
|
|
|
|
.run(assertRetryTopicConfiguration((configuration) -> { |
|
|
|
|
assertThat(configuration.getDestinationTopicProperties()).hasSize(3) |
|
|
|
|
.extracting(DestinationTopic.Properties::delay, DestinationTopic.Properties::suffix) |
|
|
|
|
.containsExactly(tuple(0L, ""), tuple(1000L, "-retry"), tuple(0L, "-dlt")); |
|
|
|
|
assertThat(configuration.forKafkaTopicAutoCreation()).extracting("shouldCreateTopics") |
|
|
|
|
.asInstanceOf(InstanceOfAssertFactories.BOOLEAN).isFalse(); |
|
|
|
|
})); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
void retryTopicConfigurationWithFixedBackOff() { |
|
|
|
|
this.contextRunner.withPropertyValues("spring.application.name=my-test-app", |
|
|
|
|
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true", |
|
|
|
|
"spring.kafka.retry.topic.attempts=4", "spring.kafka.retry.topic.delay=2s") |
|
|
|
|
.run(assertRetryTopicConfiguration( |
|
|
|
|
(configuration) -> assertThat(configuration.getDestinationTopicProperties()).hasSize(3) |
|
|
|
|
.extracting(DestinationTopic.Properties::delay).containsExactly(0L, 2000L, 0L))); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
void retryTopicConfigurationWithNoBackOff() { |
|
|
|
|
this.contextRunner.withPropertyValues("spring.application.name=my-test-app", |
|
|
|
|
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", "spring.kafka.retry.topic.enabled=true", |
|
|
|
|
"spring.kafka.retry.topic.attempts=4", "spring.kafka.retry.topic.delay=0") |
|
|
|
|
.run(assertRetryTopicConfiguration( |
|
|
|
|
(configuration) -> assertThat(configuration.getDestinationTopicProperties()).hasSize(3) |
|
|
|
|
.extracting(DestinationTopic.Properties::delay).containsExactly(0L, 0L, 0L))); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private ContextConsumer<AssertableApplicationContext> assertRetryTopicConfiguration( |
|
|
|
|
Consumer<RetryTopicConfiguration> configuration) { |
|
|
|
|
return (context) -> { |
|
|
|
|
assertThat(context).hasSingleBean(RetryTopicConfiguration.class); |
|
|
|
|
configuration.accept(context.getBean(RetryTopicConfiguration.class)); |
|
|
|
|
}; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@SuppressWarnings("unchecked") |
|
|
|
|
@Test |
|
|
|
|
void streamsWithSeveralStreamsBuilderFactoryBeans() { |
|
|
|
|
|