diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java index edd8f4a37c4..1a2102ad8a2 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java @@ -21,6 +21,7 @@ import java.util.function.Function; import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener; import org.springframework.boot.context.properties.PropertyMapper; +import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.KafkaTemplate; @@ -42,6 +43,7 @@ import org.springframework.kafka.transaction.KafkaAwareTransactionManager; * @author Gary Russell * @author Eddú Meléndez * @author Thomas Kåsene + * @author Moritz Halbritter * @since 1.5.0 */ public class ConcurrentKafkaListenerContainerFactoryConfigurer { @@ -70,6 +72,8 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { private Function threadNameSupplier; + private SimpleAsyncTaskExecutor listenerTaskExecutor; + /** * Set the {@link KafkaProperties} to use. * @param properties the properties @@ -168,6 +172,14 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { this.threadNameSupplier = threadNameSupplier; } + /** + * Set the executor for threads that poll the consumer. + * @param listenerTaskExecutor task executor + */ + void setListenerTaskExecutor(SimpleAsyncTaskExecutor listenerTaskExecutor) { + this.listenerTaskExecutor = listenerTaskExecutor; + } + /** * Configure the specified Kafka listener container factory. The factory can be * further tuned and default settings can be overridden. @@ -226,6 +238,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { map.from(properties::isImmediateStop).to(container::setStopImmediate); map.from(this.transactionManager).to(container::setTransactionManager); map.from(this.rebalanceListener).to(container::setConsumerRebalanceListener); + map.from(this.listenerTaskExecutor).to(container::setListenerTaskExecutor); } } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java index bedeec2844a..f7a103305f0 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java @@ -21,8 +21,11 @@ import java.util.function.Function; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnThreading; +import org.springframework.boot.autoconfigure.thread.Threading; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.ContainerCustomizer; @@ -49,6 +52,7 @@ import org.springframework.kafka.transaction.KafkaAwareTransactionManager; * @author Gary Russell * @author Eddú Meléndez * @author Thomas Kåsene + * @author Moritz Halbritter */ @Configuration(proxyBeanMethods = false) @ConditionalOnClass(EnableKafka.class) @@ -107,7 +111,23 @@ class KafkaAnnotationDrivenConfiguration { @Bean @ConditionalOnMissingBean + @ConditionalOnThreading(Threading.PLATFORM) ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() { + return configurer(); + } + + @Bean(name = "kafkaListenerContainerFactoryConfigurer") + @ConditionalOnMissingBean + @ConditionalOnThreading(Threading.VIRTUAL) + ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurerVirtualThreads() { + ConcurrentKafkaListenerContainerFactoryConfigurer configurer = configurer(); + SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor("kafka-"); + executor.setVirtualThreads(true); + configurer.setListenerTaskExecutor(executor); + return configurer; + } + + private ConcurrentKafkaListenerContainerFactoryConfigurer configurer() { ConcurrentKafkaListenerContainerFactoryConfigurer configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer(); configurer.setKafkaProperties(this.properties); configurer.setBatchMessageConverter(this.batchMessageConverter); diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurerTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurerTests.java index b82a6efcabd..779d9974d3f 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurerTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurerTests.java @@ -21,10 +21,12 @@ import java.util.function.Function; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.listener.MessageListenerContainer; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.BDDMockito.then; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -70,4 +72,12 @@ class ConcurrentKafkaListenerContainerFactoryConfigurerTests { then(this.factory).should().setChangeConsumerThreadName(true); } + @Test + void shouldApplyListenerTaskExecutor() { + SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor(); + this.configurer.setListenerTaskExecutor(executor); + this.configurer.configure(this.factory, this.consumerFactory); + assertThat(this.factory.getContainerProperties().getListenerTaskExecutor()).isEqualTo(executor); + } + } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index f274a3b5177..516db167450 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -40,6 +40,8 @@ import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledForJreRange; +import org.junit.jupiter.api.condition.JRE; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -47,8 +49,11 @@ import org.springframework.boot.autoconfigure.AutoConfigurations; import org.springframework.boot.test.context.assertj.AssertableApplicationContext; import org.springframework.boot.test.context.runner.ApplicationContextRunner; import org.springframework.boot.test.context.runner.ContextConsumer; +import org.springframework.boot.testsupport.assertj.SimpleAsyncTaskExecutorAssert; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.core.task.AsyncTaskExecutor; +import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.kafka.annotation.EnableKafkaStreams; import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration; import org.springframework.kafka.config.AbstractKafkaListenerContainerFactory; @@ -570,6 +575,31 @@ class KafkaAutoConfigurationTests { }); } + @Test + void shouldUsePlatformThreadsByDefault() { + this.contextRunner.run((context) -> { + ConcurrentKafkaListenerContainerFactory factory = context + .getBean(ConcurrentKafkaListenerContainerFactory.class); + assertThat(factory).isNotNull(); + AsyncTaskExecutor listenerTaskExecutor = factory.getContainerProperties().getListenerTaskExecutor(); + assertThat(listenerTaskExecutor).isNull(); + }); + } + + @Test + @EnabledForJreRange(min = JRE.JAVA_21) + void shouldUseVirtualThreadsIfEnabled() { + this.contextRunner.withPropertyValues("spring.threads.virtual.enabled=true").run((context) -> { + ConcurrentKafkaListenerContainerFactory factory = context + .getBean(ConcurrentKafkaListenerContainerFactory.class); + assertThat(factory).isNotNull(); + AsyncTaskExecutor listenerTaskExecutor = factory.getContainerProperties().getListenerTaskExecutor(); + assertThat(listenerTaskExecutor).isInstanceOf(SimpleAsyncTaskExecutor.class); + SimpleAsyncTaskExecutorAssert.assertThat((SimpleAsyncTaskExecutor) listenerTaskExecutor) + .usesVirtualThreads(); + }); + } + @SuppressWarnings("unchecked") @Test void listenerProperties() {