@ -40,6 +40,8 @@ import org.apache.kafka.streams.StreamsBuilder;
@@ -40,6 +40,8 @@ import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig ;
import org.assertj.core.api.InstanceOfAssertFactories ;
import org.junit.jupiter.api.Test ;
import org.junit.jupiter.params.ParameterizedTest ;
import org.junit.jupiter.params.provider.ValueSource ;
import org.springframework.boot.autoconfigure.AutoConfigurations ;
import org.springframework.boot.test.context.assertj.AssertableApplicationContext ;
@ -501,6 +503,7 @@ class KafkaAutoConfigurationTests {
@@ -501,6 +503,7 @@ class KafkaAutoConfigurationTests {
assertThat ( containerProperties . isStopImmediate ( ) ) . isTrue ( ) ;
assertThat ( kafkaListenerContainerFactory ) . extracting ( "concurrency" ) . isEqualTo ( 3 ) ;
assertThat ( kafkaListenerContainerFactory . isBatchListener ( ) ) . isTrue ( ) ;
assertThat ( kafkaListenerContainerFactory ) . hasFieldOrPropertyWithValue ( "autoStartup" , true ) ;
assertThat ( context . getBeansOfType ( KafkaJaasLoginModuleInitializer . class ) ) . hasSize ( 1 ) ;
KafkaJaasLoginModuleInitializer jaas = context . getBean ( KafkaJaasLoginModuleInitializer . class ) ;
assertThat ( jaas ) . hasFieldOrPropertyWithValue ( "loginModule" , "foo" ) ;
@ -672,6 +675,16 @@ class KafkaAutoConfigurationTests {
@@ -672,6 +675,16 @@ class KafkaAutoConfigurationTests {
} ) ;
}
@ParameterizedTest ( name = "{0}" )
@ValueSource ( booleans = { true , false } )
void testConcurrentKafkaListenerContainerFactoryAutoStartup ( boolean autoStartup ) {
this . contextRunner . withPropertyValues ( "spring.kafka.listener.auto-startup=" + autoStartup ) . run ( ( context ) - > {
ConcurrentKafkaListenerContainerFactory < ? , ? > kafkaListenerContainerFactory = context
. getBean ( ConcurrentKafkaListenerContainerFactory . class ) ;
assertThat ( kafkaListenerContainerFactory ) . hasFieldOrPropertyWithValue ( "autoStartup" , autoStartup ) ;
} ) ;
}
@Test
void specificSecurityProtocolOverridesCommonSecurityProtocol ( ) {
this . contextRunner . withPropertyValues ( "spring.kafka.security.protocol=SSL" ,