From b2d1423e34906ac4a319e68b272b8b770224b174 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Tue, 14 Sep 2021 12:30:16 -0400 Subject: [PATCH 1/2] Add Spring Integration default poller auto-config When polling consumers or source polling channel adapters are used in Spring Integration applications, they require some polling policy to be configured. This comment auto-configures a PollerMetadata bean which customized via newly added `spring.integration.poller.*` configuration properties or overriden completely be user-defined bean. See gh-27992 --- .../IntegrationAutoConfiguration.java | 29 ++++++ .../integration/IntegrationProperties.java | 91 +++++++++++++++++++ .../IntegrationAutoConfigurationTests.java | 80 ++++++++++++++++ .../messaging/spring-integration.adoc | 1 + 4 files changed, 201 insertions(+) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/integration/IntegrationAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/integration/IntegrationAutoConfiguration.java index 71dfd778e14..6c26a6351f1 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/integration/IntegrationAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/integration/IntegrationAutoConfiguration.java @@ -16,6 +16,8 @@ package org.springframework.boot.autoconfigure.integration; +import java.time.Duration; + import javax.management.MBeanServer; import javax.sql.DataSource; @@ -56,10 +58,14 @@ import org.springframework.integration.rsocket.IntegrationRSocketEndpoint; import org.springframework.integration.rsocket.ServerRSocketConnector; import org.springframework.integration.rsocket.ServerRSocketMessageHandler; import org.springframework.integration.rsocket.outbound.RSocketOutboundGateway; +import org.springframework.integration.scheduling.PollerMetadata; import org.springframework.messaging.rsocket.RSocketRequester; import org.springframework.messaging.rsocket.RSocketStrategies; import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; +import org.springframework.scheduling.support.CronTrigger; +import org.springframework.scheduling.support.PeriodicTrigger; +import org.springframework.util.Assert; import org.springframework.util.StringUtils; /** @@ -110,6 +116,29 @@ public class IntegrationAutoConfiguration { @EnableIntegration protected static class IntegrationConfiguration { + @Bean(PollerMetadata.DEFAULT_POLLER) + @ConditionalOnMissingBean(name = PollerMetadata.DEFAULT_POLLER) + public PollerMetadata defaultPoller(IntegrationProperties integrationProperties) { + IntegrationProperties.Poller poller = integrationProperties.getPoller(); + int hasCron = poller.getCron() != null ? 1 : 0; + int hasFixedDelay = poller.getFixedDelay() != null ? 1 : 0; + int hasFixedRate = poller.getFixedRate() != null ? 1 : 0; + Assert.isTrue((hasCron + hasFixedDelay + hasFixedRate) <= 1, + "The 'cron', 'fixedDelay' and 'fixedRate' are mutually exclusive 'spring.integration.poller' properties."); + PollerMetadata pollerMetadata = new PollerMetadata(); + PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); + map.from(poller::getMaxMessagesPerPoll).to(pollerMetadata::setMaxMessagesPerPoll); + map.from(poller::getReceiveTimeout).as(Duration::toMillis).to(pollerMetadata::setReceiveTimeout); + map.from(poller::getCron).whenHasText().as(CronTrigger::new).to(pollerMetadata::setTrigger); + map.from((poller.getFixedDelay() != null) ? poller.getFixedDelay() : poller.getFixedRate()) + .as(Duration::toMillis).as(PeriodicTrigger::new).as((trigger) -> { + map.from(poller::getInitialDelay).as(Duration::toMillis).to(trigger::setInitialDelay); + trigger.setFixedRate(poller.getFixedRate() != null); + return trigger; + }).to(pollerMetadata::setTrigger); + return pollerMetadata; + } + } /** diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/integration/IntegrationProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/integration/IntegrationProperties.java index ed4109a1970..b22c930aae4 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/integration/IntegrationProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/integration/IntegrationProperties.java @@ -17,6 +17,7 @@ package org.springframework.boot.autoconfigure.integration; import java.net.URI; +import java.time.Duration; import java.util.ArrayList; import java.util.List; @@ -44,6 +45,8 @@ public class IntegrationProperties { private final RSocket rsocket = new RSocket(); + private final Poller poller = new Poller(); + public Channel getChannel() { return this.channel; } @@ -64,6 +67,10 @@ public class IntegrationProperties { return this.rsocket; } + public Poller getPoller() { + return this.poller; + } + public static class Channel { /** @@ -295,4 +302,88 @@ public class IntegrationProperties { } + public static class Poller { + + /** + * Maximum of messages to poll per polling cycle. + */ + private int maxMessagesPerPoll = Integer.MIN_VALUE; // PollerMetadata.MAX_MESSAGES_UNBOUNDED + + /** + * How long to wait for messages on poll. + */ + private Duration receiveTimeout = Duration.ofSeconds(1); // PollerMetadata.DEFAULT_RECEIVE_TIMEOUT + + /** + * Polling delay period. Mutually explusive with 'cron' and 'fixedRate'. + */ + private Duration fixedDelay; + + /** + * Polling rate period. Mutually explusive with 'fixedDelay' and 'cron'. + */ + private Duration fixedRate; + + /** + * Polling initial delay. Applied for 'fixedDelay' and 'fixedRate'; ignored for + * 'cron'. + */ + private Duration initialDelay; + + /** + * Cron expression for polling. Mutually explusive with 'fixedDelay' and + * 'fixedRate'. + */ + private String cron; + + public int getMaxMessagesPerPoll() { + return this.maxMessagesPerPoll; + } + + public void setMaxMessagesPerPoll(int maxMessagesPerPoll) { + this.maxMessagesPerPoll = maxMessagesPerPoll; + } + + public Duration getReceiveTimeout() { + return this.receiveTimeout; + } + + public void setReceiveTimeout(Duration receiveTimeout) { + this.receiveTimeout = receiveTimeout; + } + + public Duration getFixedDelay() { + return this.fixedDelay; + } + + public void setFixedDelay(Duration fixedDelay) { + this.fixedDelay = fixedDelay; + } + + public Duration getFixedRate() { + return this.fixedRate; + } + + public void setFixedRate(Duration fixedRate) { + this.fixedRate = fixedRate; + } + + public Duration getInitialDelay() { + return this.initialDelay; + } + + public void setInitialDelay(Duration initialDelay) { + this.initialDelay = initialDelay; + } + + public String getCron() { + return this.cron; + } + + public void setCron(String cron) { + this.cron = cron; + } + + } + } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/integration/IntegrationAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/integration/IntegrationAutoConfigurationTests.java index ab1f994e36e..6ae281eadc9 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/integration/IntegrationAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/integration/IntegrationAutoConfigurationTests.java @@ -16,12 +16,17 @@ package org.springframework.boot.autoconfigure.integration; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + import javax.management.MBeanServer; import javax.sql.DataSource; import io.rsocket.transport.ClientTransport; import io.rsocket.transport.netty.client.TcpClientTransport; import org.junit.jupiter.api.Test; +import reactor.core.publisher.Mono; import org.springframework.beans.DirectFieldAccessor; import org.springframework.boot.autoconfigure.AutoConfigurations; @@ -47,6 +52,8 @@ import org.springframework.context.annotation.Primary; import org.springframework.core.io.ResourceLoader; import org.springframework.integration.annotation.IntegrationComponentScan; import org.springframework.integration.annotation.MessagingGateway; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.config.IntegrationManagementConfigurer; import org.springframework.integration.context.IntegrationContextUtils; import org.springframework.integration.endpoint.MessageProcessorMessageSource; @@ -55,13 +62,17 @@ import org.springframework.integration.rsocket.ClientRSocketConnector; import org.springframework.integration.rsocket.IntegrationRSocketEndpoint; import org.springframework.integration.rsocket.ServerRSocketConnector; import org.springframework.integration.rsocket.ServerRSocketMessageHandler; +import org.springframework.integration.scheduling.PollerMetadata; import org.springframework.integration.support.channel.HeaderChannelRegistry; import org.springframework.jdbc.BadSqlGrammarException; import org.springframework.jdbc.core.JdbcOperations; import org.springframework.jmx.export.MBeanExporter; import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHandler; import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler; +import org.springframework.messaging.support.GenericMessage; import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.support.CronTrigger; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; @@ -390,6 +401,54 @@ class IntegrationAutoConfigurationTests { .hasBean("customInitializer")); } + @Test + void defaultPoller() { + this.contextRunner.withUserConfiguration(PollingConsumerConfiguration.class).run((context) -> { + assertThat(context).hasSingleBean(PollerMetadata.class).getBean(PollerMetadata.DEFAULT_POLLER) + .hasFieldOrPropertyWithValue("maxMessagesPerPoll", (long) PollerMetadata.MAX_MESSAGES_UNBOUNDED) + .hasFieldOrPropertyWithValue("receiveTimeout", PollerMetadata.DEFAULT_RECEIVE_TIMEOUT) + .hasFieldOrPropertyWithValue("trigger", null); + + GenericMessage testMessage = new GenericMessage<>("test"); + context.getBean("testChannel", QueueChannel.class).send(testMessage); + @SuppressWarnings("unchecked") + BlockingQueue> sink = context.getBean("sink", BlockingQueue.class); + assertThat(sink.poll(10, TimeUnit.SECONDS)).isSameAs(testMessage); + }); + } + + @Test + void customPollerProperties() { + this.contextRunner.withUserConfiguration(PollingConsumerConfiguration.class) + .withPropertyValues("spring.integration.poller.cron=* * * ? * *", + "spring.integration.poller.max-messages-per-poll=1", + "spring.integration.poller.receive-timeout=10s") + .run((context) -> { + assertThat(context).hasSingleBean(PollerMetadata.class) + .getBean(PollerMetadata.DEFAULT_POLLER, PollerMetadata.class) + .hasFieldOrPropertyWithValue("maxMessagesPerPoll", 1L) + .hasFieldOrPropertyWithValue("receiveTimeout", 10000L) + .extracting(PollerMetadata::getTrigger).isInstanceOf(CronTrigger.class) + .hasFieldOrPropertyWithValue("expression", "* * * ? * *"); + + GenericMessage testMessage = new GenericMessage<>("test"); + context.getBean("testChannel", QueueChannel.class).send(testMessage); + @SuppressWarnings("unchecked") + BlockingQueue> sink = context.getBean("sink", BlockingQueue.class); + assertThat(sink.poll(10, TimeUnit.SECONDS)).isSameAs(testMessage); + }); + } + + @Test + void triggerPropertiesAreMutuallyExclusive() { + this.contextRunner + .withPropertyValues("spring.integration.poller.cron=* * * ? * *", + "spring.integration.poller.fixed-delay=1s") + .run((context) -> assertThat(context).hasFailed().getFailure() + .hasRootCauseExactlyInstanceOf(IllegalArgumentException.class).hasMessageContaining( + "The 'cron', 'fixedDelay' and 'fixedRate' are mutually exclusive 'spring.integration.poller' properties.")); + } + @Configuration(proxyBeanMethods = false) static class CustomMBeanExporter { @@ -478,4 +537,25 @@ class IntegrationAutoConfigurationTests { } + @Configuration(proxyBeanMethods = false) + static class PollingConsumerConfiguration { + + @Bean + QueueChannel testChannel() { + return new QueueChannel(); + } + + @Bean + BlockingQueue> sink() { + return new LinkedBlockingQueue<>(); + } + + @ServiceActivator(inputChannel = "testChannel") + @Bean + MessageHandler handler(BlockingQueue> sink) { + return sink::add; + } + + } + } diff --git a/spring-boot-project/spring-boot-docs/src/docs/asciidoc/messaging/spring-integration.adoc b/spring-boot-project/spring-boot-docs/src/docs/asciidoc/messaging/spring-integration.adoc index 607b653bda7..2a7731db9aa 100644 --- a/spring-boot-project/spring-boot-docs/src/docs/asciidoc/messaging/spring-integration.adoc +++ b/spring-boot-project/spring-boot-docs/src/docs/asciidoc/messaging/spring-integration.adoc @@ -5,6 +5,7 @@ Spring Integration provides abstractions over messaging and also other transport If Spring Integration is available on your classpath, it is initialized through the `@EnableIntegration` annotation. Spring Integration polling logic relies <>. +The default `PollerMetadata` (poll unbounded number of messages every second) can be customized with `spring.integration.poller.*` configuration properties. Spring Boot also configures some features that are triggered by the presence of additional Spring Integration modules. If `spring-integration-jmx` is also on the classpath, message processing statistics are published over JMX. From 3274e24d55b3749b7b019e181ca50c7a6ab28569 Mon Sep 17 00:00:00 2001 From: Andy Wilkinson Date: Thu, 23 Sep 2021 16:35:06 +0100 Subject: [PATCH 2/2] Polish "Add Spring Integration default poller auto-config" See gh-27992 Co-authored-by: Phillip Webb --- .../IntegrationAutoConfiguration.java | 46 ++++++++++----- .../IntegrationAutoConfigurationTests.java | 56 +++++++++---------- 2 files changed, 59 insertions(+), 43 deletions(-) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/integration/IntegrationAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/integration/IntegrationAutoConfiguration.java index 6c26a6351f1..ba4c8aa28ca 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/integration/IntegrationAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/integration/IntegrationAutoConfiguration.java @@ -39,6 +39,7 @@ import org.springframework.boot.autoconfigure.rsocket.RSocketMessagingAutoConfig import org.springframework.boot.autoconfigure.task.TaskSchedulingAutoConfiguration; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.context.properties.PropertyMapper; +import org.springframework.boot.context.properties.source.MutuallyExclusiveConfigurationPropertiesException; import org.springframework.boot.task.TaskSchedulerBuilder; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Conditional; @@ -62,10 +63,10 @@ import org.springframework.integration.scheduling.PollerMetadata; import org.springframework.messaging.rsocket.RSocketRequester; import org.springframework.messaging.rsocket.RSocketStrategies; import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler; +import org.springframework.scheduling.Trigger; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.scheduling.support.CronTrigger; import org.springframework.scheduling.support.PeriodicTrigger; -import org.springframework.util.Assert; import org.springframework.util.StringUtils; /** @@ -118,27 +119,44 @@ public class IntegrationAutoConfiguration { @Bean(PollerMetadata.DEFAULT_POLLER) @ConditionalOnMissingBean(name = PollerMetadata.DEFAULT_POLLER) - public PollerMetadata defaultPoller(IntegrationProperties integrationProperties) { + public PollerMetadata defaultPollerMetadata(IntegrationProperties integrationProperties) { IntegrationProperties.Poller poller = integrationProperties.getPoller(); - int hasCron = poller.getCron() != null ? 1 : 0; - int hasFixedDelay = poller.getFixedDelay() != null ? 1 : 0; - int hasFixedRate = poller.getFixedRate() != null ? 1 : 0; - Assert.isTrue((hasCron + hasFixedDelay + hasFixedRate) <= 1, - "The 'cron', 'fixedDelay' and 'fixedRate' are mutually exclusive 'spring.integration.poller' properties."); + MutuallyExclusiveConfigurationPropertiesException.throwIfMultipleNonNullValuesIn((entries) -> { + entries.put("spring.integration.poller.cron", + StringUtils.hasText(poller.getCron()) ? poller.getCron() : null); + entries.put("spring.integration.poller.fixed-delay", poller.getFixedDelay()); + entries.put("spring.integration.poller.fixed-rate", poller.getFixedRate()); + }); PollerMetadata pollerMetadata = new PollerMetadata(); PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); map.from(poller::getMaxMessagesPerPoll).to(pollerMetadata::setMaxMessagesPerPoll); map.from(poller::getReceiveTimeout).as(Duration::toMillis).to(pollerMetadata::setReceiveTimeout); - map.from(poller::getCron).whenHasText().as(CronTrigger::new).to(pollerMetadata::setTrigger); - map.from((poller.getFixedDelay() != null) ? poller.getFixedDelay() : poller.getFixedRate()) - .as(Duration::toMillis).as(PeriodicTrigger::new).as((trigger) -> { - map.from(poller::getInitialDelay).as(Duration::toMillis).to(trigger::setInitialDelay); - trigger.setFixedRate(poller.getFixedRate() != null); - return trigger; - }).to(pollerMetadata::setTrigger); + map.from(poller).as(this::asTrigger).to(pollerMetadata::setTrigger); return pollerMetadata; } + private Trigger asTrigger(IntegrationProperties.Poller poller) { + if (StringUtils.hasText(poller.getCron())) { + return new CronTrigger(poller.getCron()); + } + if (poller.getFixedDelay() != null) { + return createPeriodicTrigger(poller.getFixedDelay(), poller.getInitialDelay(), true); + } + if (poller.getFixedRate() != null) { + return createPeriodicTrigger(poller.getFixedRate(), poller.getInitialDelay(), false); + } + return null; + } + + private Trigger createPeriodicTrigger(Duration period, Duration initialDelay, boolean fixedRate) { + PeriodicTrigger trigger = new PeriodicTrigger(period.toMillis()); + if (initialDelay != null) { + trigger.setInitialDelay(initialDelay.toMillis()); + } + trigger.setFixedRate(fixedRate); + return trigger; + } + } /** diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/integration/IntegrationAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/integration/IntegrationAutoConfigurationTests.java index 6ae281eadc9..fad104b02ef 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/integration/IntegrationAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/integration/IntegrationAutoConfigurationTests.java @@ -18,15 +18,14 @@ package org.springframework.boot.autoconfigure.integration; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; import javax.management.MBeanServer; import javax.sql.DataSource; import io.rsocket.transport.ClientTransport; import io.rsocket.transport.netty.client.TcpClientTransport; +import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.jupiter.api.Test; -import reactor.core.publisher.Mono; import org.springframework.beans.DirectFieldAccessor; import org.springframework.boot.autoconfigure.AutoConfigurations; @@ -42,6 +41,7 @@ import org.springframework.boot.autoconfigure.rsocket.RSocketRequesterAutoConfig import org.springframework.boot.autoconfigure.rsocket.RSocketServerAutoConfiguration; import org.springframework.boot.autoconfigure.rsocket.RSocketStrategiesAutoConfiguration; import org.springframework.boot.autoconfigure.task.TaskSchedulingAutoConfiguration; +import org.springframework.boot.context.properties.source.MutuallyExclusiveConfigurationPropertiesException; import org.springframework.boot.jdbc.init.DataSourceScriptDatabaseInitializer; import org.springframework.boot.sql.init.DatabaseInitializationMode; import org.springframework.boot.sql.init.DatabaseInitializationSettings; @@ -70,7 +70,6 @@ import org.springframework.jmx.export.MBeanExporter; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler; -import org.springframework.messaging.support.GenericMessage; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.support.CronTrigger; @@ -404,49 +403,48 @@ class IntegrationAutoConfigurationTests { @Test void defaultPoller() { this.contextRunner.withUserConfiguration(PollingConsumerConfiguration.class).run((context) -> { - assertThat(context).hasSingleBean(PollerMetadata.class).getBean(PollerMetadata.DEFAULT_POLLER) - .hasFieldOrPropertyWithValue("maxMessagesPerPoll", (long) PollerMetadata.MAX_MESSAGES_UNBOUNDED) - .hasFieldOrPropertyWithValue("receiveTimeout", PollerMetadata.DEFAULT_RECEIVE_TIMEOUT) - .hasFieldOrPropertyWithValue("trigger", null); - - GenericMessage testMessage = new GenericMessage<>("test"); - context.getBean("testChannel", QueueChannel.class).send(testMessage); - @SuppressWarnings("unchecked") - BlockingQueue> sink = context.getBean("sink", BlockingQueue.class); - assertThat(sink.poll(10, TimeUnit.SECONDS)).isSameAs(testMessage); + assertThat(context).hasSingleBean(PollerMetadata.class); + PollerMetadata metadata = context.getBean(PollerMetadata.DEFAULT_POLLER, PollerMetadata.class); + assertThat(metadata.getMaxMessagesPerPoll()).isEqualTo(PollerMetadata.MAX_MESSAGES_UNBOUNDED); + assertThat(metadata.getReceiveTimeout()).isEqualTo(PollerMetadata.DEFAULT_RECEIVE_TIMEOUT); + assertThat(metadata.getTrigger()).isNull(); }); } @Test - void customPollerProperties() { + void whenCustomPollerPropertiesAreSetThenTheyAreReflectedInPollerMetadata() { this.contextRunner.withUserConfiguration(PollingConsumerConfiguration.class) .withPropertyValues("spring.integration.poller.cron=* * * ? * *", "spring.integration.poller.max-messages-per-poll=1", "spring.integration.poller.receive-timeout=10s") .run((context) -> { - assertThat(context).hasSingleBean(PollerMetadata.class) - .getBean(PollerMetadata.DEFAULT_POLLER, PollerMetadata.class) - .hasFieldOrPropertyWithValue("maxMessagesPerPoll", 1L) - .hasFieldOrPropertyWithValue("receiveTimeout", 10000L) - .extracting(PollerMetadata::getTrigger).isInstanceOf(CronTrigger.class) - .hasFieldOrPropertyWithValue("expression", "* * * ? * *"); - - GenericMessage testMessage = new GenericMessage<>("test"); - context.getBean("testChannel", QueueChannel.class).send(testMessage); - @SuppressWarnings("unchecked") - BlockingQueue> sink = context.getBean("sink", BlockingQueue.class); - assertThat(sink.poll(10, TimeUnit.SECONDS)).isSameAs(testMessage); + assertThat(context).hasSingleBean(PollerMetadata.class); + PollerMetadata metadata = context.getBean(PollerMetadata.DEFAULT_POLLER, PollerMetadata.class); + assertThat(metadata.getMaxMessagesPerPoll()).isEqualTo(1L); + assertThat(metadata.getReceiveTimeout()).isEqualTo(10000L); + assertThat(metadata.getTrigger()).asInstanceOf(InstanceOfAssertFactories.type(CronTrigger.class)) + .satisfies((trigger) -> assertThat(trigger.getExpression()).isEqualTo("* * * ? * *")); }); } @Test - void triggerPropertiesAreMutuallyExclusive() { + void whenPollerPropertiesForMultipleTriggerTypesAreSetThenRefreshFails() { this.contextRunner .withPropertyValues("spring.integration.poller.cron=* * * ? * *", "spring.integration.poller.fixed-delay=1s") .run((context) -> assertThat(context).hasFailed().getFailure() - .hasRootCauseExactlyInstanceOf(IllegalArgumentException.class).hasMessageContaining( - "The 'cron', 'fixedDelay' and 'fixedRate' are mutually exclusive 'spring.integration.poller' properties.")); + .hasRootCauseExactlyInstanceOf(MutuallyExclusiveConfigurationPropertiesException.class) + .getRootCause() + .asInstanceOf( + InstanceOfAssertFactories.type(MutuallyExclusiveConfigurationPropertiesException.class)) + .satisfies((ex) -> { + assertThat(ex.getConfiguredNames()).containsExactlyInAnyOrder( + "spring.integration.poller.cron", "spring.integration.poller.fixed-delay"); + assertThat(ex.getMutuallyExclusiveNames()).containsExactlyInAnyOrder( + "spring.integration.poller.cron", "spring.integration.poller.fixed-delay", + "spring.integration.poller.fixed-rate"); + })); + } @Configuration(proxyBeanMethods = false)