Browse Source

Merge pull request #27992 from artembilan

* pr/27992:
  Polish "Add Spring Integration default poller auto-config"
  Add Spring Integration default poller auto-config

Closes gh-27992
pull/28141/head
Phillip Webb 4 years ago
parent
commit
5bd34be468
  1. 47
      spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/integration/IntegrationAutoConfiguration.java
  2. 91
      spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/integration/IntegrationProperties.java
  3. 78
      spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/integration/IntegrationAutoConfigurationTests.java
  4. 1
      spring-boot-project/spring-boot-docs/src/docs/asciidoc/messaging/spring-integration.adoc

47
spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/integration/IntegrationAutoConfiguration.java

@ -16,6 +16,8 @@ @@ -16,6 +16,8 @@
package org.springframework.boot.autoconfigure.integration;
import java.time.Duration;
import javax.management.MBeanServer;
import javax.sql.DataSource;
@ -37,6 +39,7 @@ import org.springframework.boot.autoconfigure.rsocket.RSocketMessagingAutoConfig @@ -37,6 +39,7 @@ import org.springframework.boot.autoconfigure.rsocket.RSocketMessagingAutoConfig
import org.springframework.boot.autoconfigure.task.TaskSchedulingAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.boot.context.properties.source.MutuallyExclusiveConfigurationPropertiesException;
import org.springframework.boot.task.TaskSchedulerBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
@ -56,10 +59,14 @@ import org.springframework.integration.rsocket.IntegrationRSocketEndpoint; @@ -56,10 +59,14 @@ import org.springframework.integration.rsocket.IntegrationRSocketEndpoint;
import org.springframework.integration.rsocket.ServerRSocketConnector;
import org.springframework.integration.rsocket.ServerRSocketMessageHandler;
import org.springframework.integration.rsocket.outbound.RSocketOutboundGateway;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.scheduling.support.PeriodicTrigger;
import org.springframework.util.StringUtils;
/**
@ -110,6 +117,46 @@ public class IntegrationAutoConfiguration { @@ -110,6 +117,46 @@ public class IntegrationAutoConfiguration {
@EnableIntegration
protected static class IntegrationConfiguration {
@Bean(PollerMetadata.DEFAULT_POLLER)
@ConditionalOnMissingBean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPollerMetadata(IntegrationProperties integrationProperties) {
IntegrationProperties.Poller poller = integrationProperties.getPoller();
MutuallyExclusiveConfigurationPropertiesException.throwIfMultipleNonNullValuesIn((entries) -> {
entries.put("spring.integration.poller.cron",
StringUtils.hasText(poller.getCron()) ? poller.getCron() : null);
entries.put("spring.integration.poller.fixed-delay", poller.getFixedDelay());
entries.put("spring.integration.poller.fixed-rate", poller.getFixedRate());
});
PollerMetadata pollerMetadata = new PollerMetadata();
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(poller::getMaxMessagesPerPoll).to(pollerMetadata::setMaxMessagesPerPoll);
map.from(poller::getReceiveTimeout).as(Duration::toMillis).to(pollerMetadata::setReceiveTimeout);
map.from(poller).as(this::asTrigger).to(pollerMetadata::setTrigger);
return pollerMetadata;
}
private Trigger asTrigger(IntegrationProperties.Poller poller) {
if (StringUtils.hasText(poller.getCron())) {
return new CronTrigger(poller.getCron());
}
if (poller.getFixedDelay() != null) {
return createPeriodicTrigger(poller.getFixedDelay(), poller.getInitialDelay(), true);
}
if (poller.getFixedRate() != null) {
return createPeriodicTrigger(poller.getFixedRate(), poller.getInitialDelay(), false);
}
return null;
}
private Trigger createPeriodicTrigger(Duration period, Duration initialDelay, boolean fixedRate) {
PeriodicTrigger trigger = new PeriodicTrigger(period.toMillis());
if (initialDelay != null) {
trigger.setInitialDelay(initialDelay.toMillis());
}
trigger.setFixedRate(fixedRate);
return trigger;
}
}
/**

91
spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/integration/IntegrationProperties.java

@ -17,6 +17,7 @@ @@ -17,6 +17,7 @@
package org.springframework.boot.autoconfigure.integration;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
@ -44,6 +45,8 @@ public class IntegrationProperties { @@ -44,6 +45,8 @@ public class IntegrationProperties {
private final RSocket rsocket = new RSocket();
private final Poller poller = new Poller();
public Channel getChannel() {
return this.channel;
}
@ -64,6 +67,10 @@ public class IntegrationProperties { @@ -64,6 +67,10 @@ public class IntegrationProperties {
return this.rsocket;
}
public Poller getPoller() {
return this.poller;
}
public static class Channel {
/**
@ -295,4 +302,88 @@ public class IntegrationProperties { @@ -295,4 +302,88 @@ public class IntegrationProperties {
}
public static class Poller {
/**
* Maximum of messages to poll per polling cycle.
*/
private int maxMessagesPerPoll = Integer.MIN_VALUE; // PollerMetadata.MAX_MESSAGES_UNBOUNDED
/**
* How long to wait for messages on poll.
*/
private Duration receiveTimeout = Duration.ofSeconds(1); // PollerMetadata.DEFAULT_RECEIVE_TIMEOUT
/**
* Polling delay period. Mutually explusive with 'cron' and 'fixedRate'.
*/
private Duration fixedDelay;
/**
* Polling rate period. Mutually explusive with 'fixedDelay' and 'cron'.
*/
private Duration fixedRate;
/**
* Polling initial delay. Applied for 'fixedDelay' and 'fixedRate'; ignored for
* 'cron'.
*/
private Duration initialDelay;
/**
* Cron expression for polling. Mutually explusive with 'fixedDelay' and
* 'fixedRate'.
*/
private String cron;
public int getMaxMessagesPerPoll() {
return this.maxMessagesPerPoll;
}
public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
this.maxMessagesPerPoll = maxMessagesPerPoll;
}
public Duration getReceiveTimeout() {
return this.receiveTimeout;
}
public void setReceiveTimeout(Duration receiveTimeout) {
this.receiveTimeout = receiveTimeout;
}
public Duration getFixedDelay() {
return this.fixedDelay;
}
public void setFixedDelay(Duration fixedDelay) {
this.fixedDelay = fixedDelay;
}
public Duration getFixedRate() {
return this.fixedRate;
}
public void setFixedRate(Duration fixedRate) {
this.fixedRate = fixedRate;
}
public Duration getInitialDelay() {
return this.initialDelay;
}
public void setInitialDelay(Duration initialDelay) {
this.initialDelay = initialDelay;
}
public String getCron() {
return this.cron;
}
public void setCron(String cron) {
this.cron = cron;
}
}
}

78
spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/integration/IntegrationAutoConfigurationTests.java

@ -16,11 +16,15 @@ @@ -16,11 +16,15 @@
package org.springframework.boot.autoconfigure.integration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import javax.management.MBeanServer;
import javax.sql.DataSource;
import io.rsocket.transport.ClientTransport;
import io.rsocket.transport.netty.client.TcpClientTransport;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.jupiter.api.Test;
import org.springframework.beans.DirectFieldAccessor;
@ -37,6 +41,7 @@ import org.springframework.boot.autoconfigure.rsocket.RSocketRequesterAutoConfig @@ -37,6 +41,7 @@ import org.springframework.boot.autoconfigure.rsocket.RSocketRequesterAutoConfig
import org.springframework.boot.autoconfigure.rsocket.RSocketServerAutoConfiguration;
import org.springframework.boot.autoconfigure.rsocket.RSocketStrategiesAutoConfiguration;
import org.springframework.boot.autoconfigure.task.TaskSchedulingAutoConfiguration;
import org.springframework.boot.context.properties.source.MutuallyExclusiveConfigurationPropertiesException;
import org.springframework.boot.jdbc.init.DataSourceScriptDatabaseInitializer;
import org.springframework.boot.sql.init.DatabaseInitializationMode;
import org.springframework.boot.sql.init.DatabaseInitializationSettings;
@ -47,6 +52,8 @@ import org.springframework.context.annotation.Primary; @@ -47,6 +52,8 @@ import org.springframework.context.annotation.Primary;
import org.springframework.core.io.ResourceLoader;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.IntegrationManagementConfigurer;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.endpoint.MessageProcessorMessageSource;
@ -55,13 +62,16 @@ import org.springframework.integration.rsocket.ClientRSocketConnector; @@ -55,13 +62,16 @@ import org.springframework.integration.rsocket.ClientRSocketConnector;
import org.springframework.integration.rsocket.IntegrationRSocketEndpoint;
import org.springframework.integration.rsocket.ServerRSocketConnector;
import org.springframework.integration.rsocket.ServerRSocketMessageHandler;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.integration.support.channel.HeaderChannelRegistry;
import org.springframework.jdbc.BadSqlGrammarException;
import org.springframework.jdbc.core.JdbcOperations;
import org.springframework.jmx.export.MBeanExporter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.support.CronTrigger;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
@ -390,6 +400,53 @@ class IntegrationAutoConfigurationTests { @@ -390,6 +400,53 @@ class IntegrationAutoConfigurationTests {
.hasBean("customInitializer"));
}
@Test
void defaultPoller() {
this.contextRunner.withUserConfiguration(PollingConsumerConfiguration.class).run((context) -> {
assertThat(context).hasSingleBean(PollerMetadata.class);
PollerMetadata metadata = context.getBean(PollerMetadata.DEFAULT_POLLER, PollerMetadata.class);
assertThat(metadata.getMaxMessagesPerPoll()).isEqualTo(PollerMetadata.MAX_MESSAGES_UNBOUNDED);
assertThat(metadata.getReceiveTimeout()).isEqualTo(PollerMetadata.DEFAULT_RECEIVE_TIMEOUT);
assertThat(metadata.getTrigger()).isNull();
});
}
@Test
void whenCustomPollerPropertiesAreSetThenTheyAreReflectedInPollerMetadata() {
this.contextRunner.withUserConfiguration(PollingConsumerConfiguration.class)
.withPropertyValues("spring.integration.poller.cron=* * * ? * *",
"spring.integration.poller.max-messages-per-poll=1",
"spring.integration.poller.receive-timeout=10s")
.run((context) -> {
assertThat(context).hasSingleBean(PollerMetadata.class);
PollerMetadata metadata = context.getBean(PollerMetadata.DEFAULT_POLLER, PollerMetadata.class);
assertThat(metadata.getMaxMessagesPerPoll()).isEqualTo(1L);
assertThat(metadata.getReceiveTimeout()).isEqualTo(10000L);
assertThat(metadata.getTrigger()).asInstanceOf(InstanceOfAssertFactories.type(CronTrigger.class))
.satisfies((trigger) -> assertThat(trigger.getExpression()).isEqualTo("* * * ? * *"));
});
}
@Test
void whenPollerPropertiesForMultipleTriggerTypesAreSetThenRefreshFails() {
this.contextRunner
.withPropertyValues("spring.integration.poller.cron=* * * ? * *",
"spring.integration.poller.fixed-delay=1s")
.run((context) -> assertThat(context).hasFailed().getFailure()
.hasRootCauseExactlyInstanceOf(MutuallyExclusiveConfigurationPropertiesException.class)
.getRootCause()
.asInstanceOf(
InstanceOfAssertFactories.type(MutuallyExclusiveConfigurationPropertiesException.class))
.satisfies((ex) -> {
assertThat(ex.getConfiguredNames()).containsExactlyInAnyOrder(
"spring.integration.poller.cron", "spring.integration.poller.fixed-delay");
assertThat(ex.getMutuallyExclusiveNames()).containsExactlyInAnyOrder(
"spring.integration.poller.cron", "spring.integration.poller.fixed-delay",
"spring.integration.poller.fixed-rate");
}));
}
@Configuration(proxyBeanMethods = false)
static class CustomMBeanExporter {
@ -478,4 +535,25 @@ class IntegrationAutoConfigurationTests { @@ -478,4 +535,25 @@ class IntegrationAutoConfigurationTests {
}
@Configuration(proxyBeanMethods = false)
static class PollingConsumerConfiguration {
@Bean
QueueChannel testChannel() {
return new QueueChannel();
}
@Bean
BlockingQueue<Message<?>> sink() {
return new LinkedBlockingQueue<>();
}
@ServiceActivator(inputChannel = "testChannel")
@Bean
MessageHandler handler(BlockingQueue<Message<?>> sink) {
return sink::add;
}
}
}

1
spring-boot-project/spring-boot-docs/src/docs/asciidoc/messaging/spring-integration.adoc

@ -5,6 +5,7 @@ Spring Integration provides abstractions over messaging and also other transport @@ -5,6 +5,7 @@ Spring Integration provides abstractions over messaging and also other transport
If Spring Integration is available on your classpath, it is initialized through the `@EnableIntegration` annotation.
Spring Integration polling logic relies <<features#features.task-execution-and-scheduling,on the auto-configured `TaskScheduler`>>.
The default `PollerMetadata` (poll unbounded number of messages every second) can be customized with `spring.integration.poller.*` configuration properties.
Spring Boot also configures some features that are triggered by the presence of additional Spring Integration modules.
If `spring-integration-jmx` is also on the classpath, message processing statistics are published over JMX.

Loading…
Cancel
Save