From 6eddf1b372dd8ef87499ec381d05458adeff8577 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Mon, 1 May 2017 16:18:59 -0400 Subject: [PATCH 1/3] Support direct AMQP container Add support for auto configuration - select container type and separate discrete properties. See gh-9055 --- ...bitListenerContainerFactoryConfigurer.java | 124 ++++++++++++++++++ ...bitListenerContainerFactoryConfigurer.java | 39 ++++++ .../RabbitAnnotationDrivenConfiguration.java | 84 +++++++++--- .../autoconfigure/amqp/RabbitProperties.java | 51 ++++++- ...bitListenerContainerFactoryConfigurer.java | 100 ++------------ .../amqp/RabbitAutoConfigurationTests.java | 31 ++++- .../appendix-application-properties.adoc | 8 +- 7 files changed, 319 insertions(+), 118 deletions(-) create mode 100644 spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/AbstractRabbitListenerContainerFactoryConfigurer.java create mode 100644 spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/DirectRabbitListenerContainerFactoryConfigurer.java diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/AbstractRabbitListenerContainerFactoryConfigurer.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/AbstractRabbitListenerContainerFactoryConfigurer.java new file mode 100644 index 00000000000..fc2baffa3c3 --- /dev/null +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/AbstractRabbitListenerContainerFactoryConfigurer.java @@ -0,0 +1,124 @@ +/* + * Copyright 2012-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.autoconfigure.amqp; + +import org.springframework.amqp.rabbit.config.AbstractRabbitListenerContainerFactory; +import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory; +import org.springframework.amqp.rabbit.retry.MessageRecoverer; +import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer; +import org.springframework.amqp.support.converter.MessageConverter; +import org.springframework.boot.autoconfigure.amqp.RabbitProperties.ListenerRetry; +import org.springframework.util.Assert; + +/** + * Configure {@link RabbitListenerContainerFactory} with sensible defaults. + * + * @param the container factory type. + * + * @author Gary Russell + * @since 2.0 + * + */ +public abstract class AbstractRabbitListenerContainerFactoryConfigurer< + T extends AbstractRabbitListenerContainerFactory> { + + private MessageConverter messageConverter; + + private MessageRecoverer messageRecoverer; + + private RabbitProperties rabbitProperties; + + /** + * Set the {@link MessageConverter} to use or {@code null} if the out-of-the-box + * converter should be used. + * @param messageConverter the {@link MessageConverter} + */ + protected void setMessageConverter(MessageConverter messageConverter) { + this.messageConverter = messageConverter; + } + + /** + * Set the {@link MessageRecoverer} to use or {@code null} to rely on the default. + * @param messageRecoverer the {@link MessageRecoverer} + */ + protected void setMessageRecoverer(MessageRecoverer messageRecoverer) { + this.messageRecoverer = messageRecoverer; + } + + /** + * Set the {@link RabbitProperties} to use. + * @param rabbitProperties the {@link RabbitProperties} + */ + protected void setRabbitProperties(RabbitProperties rabbitProperties) { + this.rabbitProperties = rabbitProperties; + } + + /** + * Configure the specified rabbit listener container factory. The factory can be + * further tuned and default settings can be overridden. + * @param factory the {@link AbstractRabbitListenerContainerFactory} instance to + * configure + * @param connectionFactory the {@link ConnectionFactory} to use + */ + public final void configure(T factory, ConnectionFactory connectionFactory) { + Assert.notNull(factory, "Factory must not be null"); + Assert.notNull(connectionFactory, "ConnectionFactory must not be null"); + factory.setConnectionFactory(connectionFactory); + if (this.messageConverter != null) { + factory.setMessageConverter(this.messageConverter); + } + RabbitProperties.Listener listenerConfig = this.rabbitProperties.getListener(); + factory.setAutoStartup(listenerConfig.isAutoStartup()); + if (listenerConfig.getAcknowledgeMode() != null) { + factory.setAcknowledgeMode(listenerConfig.getAcknowledgeMode()); + } + if (listenerConfig.getPrefetch() != null) { + factory.setPrefetchCount(listenerConfig.getPrefetch()); + } + if (listenerConfig.getDefaultRequeueRejected() != null) { + factory.setDefaultRequeueRejected(listenerConfig.getDefaultRequeueRejected()); + } + if (listenerConfig.getIdleEventInterval() != null) { + factory.setIdleEventInterval(listenerConfig.getIdleEventInterval()); + } + ListenerRetry retryConfig = listenerConfig.getRetry(); + if (retryConfig.isEnabled()) { + RetryInterceptorBuilder builder = (retryConfig.isStateless() + ? RetryInterceptorBuilder.stateless() + : RetryInterceptorBuilder.stateful()); + builder.maxAttempts(retryConfig.getMaxAttempts()); + builder.backOffOptions(retryConfig.getInitialInterval(), + retryConfig.getMultiplier(), retryConfig.getMaxInterval()); + MessageRecoverer recoverer = (this.messageRecoverer != null + ? this.messageRecoverer : new RejectAndDontRequeueRecoverer()); + builder.recoverer(recoverer); + factory.setAdviceChain(builder.build()); + } + configure(factory, this.rabbitProperties); + } + + /** + * Perform factory-specific configuration. + * + * @param factory the factory. + * @param rabbitProperties the properties. + */ + protected abstract void configure(T factory, RabbitProperties rabbitProperties); + +} diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/DirectRabbitListenerContainerFactoryConfigurer.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/DirectRabbitListenerContainerFactoryConfigurer.java new file mode 100644 index 00000000000..4a7026210bc --- /dev/null +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/DirectRabbitListenerContainerFactoryConfigurer.java @@ -0,0 +1,39 @@ +/* + * Copyright 2012-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.autoconfigure.amqp; + +import org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory; +import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory; + +/** + * Configure {@link RabbitListenerContainerFactory} with sensible defaults. + * + * @author Gary Russell + * @since 2.0 + */ +public final class DirectRabbitListenerContainerFactoryConfigurer + extends AbstractRabbitListenerContainerFactoryConfigurer { + + @Override + protected void configure(DirectRabbitListenerContainerFactory factory, RabbitProperties rabbitProperties) { + RabbitProperties.Listener listenerConfig = rabbitProperties.getListener(); + if (listenerConfig.getConsumersPerQueue() != null) { + factory.setConsumersPerQueue(listenerConfig.getConsumersPerQueue()); + } + } + +} diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAnnotationDrivenConfiguration.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAnnotationDrivenConfiguration.java index 18c107ca2d7..b7504546018 100644 --- a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAnnotationDrivenConfiguration.java +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAnnotationDrivenConfiguration.java @@ -17,6 +17,7 @@ package org.springframework.boot.autoconfigure.amqp; import org.springframework.amqp.rabbit.annotation.EnableRabbit; +import org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.config.RabbitListenerConfigUtils; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; @@ -25,6 +26,7 @@ import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -39,11 +41,11 @@ import org.springframework.context.annotation.Configuration; @ConditionalOnClass(EnableRabbit.class) class RabbitAnnotationDrivenConfiguration { - private final ObjectProvider messageConverter; + protected final ObjectProvider messageConverter; - private final ObjectProvider messageRecoverer; + protected final ObjectProvider messageRecoverer; - private final RabbitProperties properties; + protected final RabbitProperties properties; RabbitAnnotationDrivenConfiguration(ObjectProvider messageConverter, ObjectProvider messageRecoverer, @@ -53,24 +55,68 @@ class RabbitAnnotationDrivenConfiguration { this.properties = properties; } - @Bean - @ConditionalOnMissingBean - public SimpleRabbitListenerContainerFactoryConfigurer rabbitListenerContainerFactoryConfigurer() { - SimpleRabbitListenerContainerFactoryConfigurer configurer = new SimpleRabbitListenerContainerFactoryConfigurer(); - configurer.setMessageConverter(this.messageConverter.getIfUnique()); - configurer.setMessageRecoverer(this.messageRecoverer.getIfUnique()); - configurer.setRabbitProperties(this.properties); - return configurer; + + @Configuration + @ConditionalOnProperty(prefix = "spring.rabbitmq", name = "listener.type", havingValue = "simple", + matchIfMissing = true) + public static class SimpleContainerConfiguration extends RabbitAnnotationDrivenConfiguration { + + SimpleContainerConfiguration(ObjectProvider messageConverter, + ObjectProvider messageRecoverer, RabbitProperties properties) { + super(messageConverter, messageRecoverer, properties); + } + + @Bean + @ConditionalOnMissingBean + public SimpleRabbitListenerContainerFactoryConfigurer rabbitListenerContainerFactoryConfigurer() { + SimpleRabbitListenerContainerFactoryConfigurer configurer = + new SimpleRabbitListenerContainerFactoryConfigurer(); + configurer.setMessageConverter(this.messageConverter.getIfUnique()); + configurer.setMessageRecoverer(this.messageRecoverer.getIfUnique()); + configurer.setRabbitProperties(this.properties); + return configurer; + } + + @Bean + @ConditionalOnMissingBean(name = "rabbitListenerContainerFactory") + public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory( + SimpleRabbitListenerContainerFactoryConfigurer configurer, + ConnectionFactory connectionFactory) { + SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); + configurer.configure(factory, connectionFactory); + return factory; + } } - @Bean - @ConditionalOnMissingBean(name = "rabbitListenerContainerFactory") - public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory( - SimpleRabbitListenerContainerFactoryConfigurer configurer, - ConnectionFactory connectionFactory) { - SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); - configurer.configure(factory, connectionFactory); - return factory; + @Configuration + @ConditionalOnProperty(prefix = "spring.rabbitmq", name = "listener.type", havingValue = "direct") + public static class DirectContainerConfiguration extends RabbitAnnotationDrivenConfiguration { + + DirectContainerConfiguration(ObjectProvider messageConverter, + ObjectProvider messageRecoverer, RabbitProperties properties) { + super(messageConverter, messageRecoverer, properties); + } + + @Bean + @ConditionalOnMissingBean + public DirectRabbitListenerContainerFactoryConfigurer rabbitListenerContainerFactoryConfigurer() { + DirectRabbitListenerContainerFactoryConfigurer configurer = + new DirectRabbitListenerContainerFactoryConfigurer(); + configurer.setMessageConverter(this.messageConverter.getIfUnique()); + configurer.setMessageRecoverer(this.messageRecoverer.getIfUnique()); + configurer.setRabbitProperties(this.properties); + return configurer; + } + + @Bean + @ConditionalOnMissingBean(name = "rabbitListenerContainerFactory") + public DirectRabbitListenerContainerFactory rabbitListenerContainerFactory( + DirectRabbitListenerContainerFactoryConfigurer configurer, + ConnectionFactory connectionFactory) { + DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory(); + configurer.configure(factory, connectionFactory); + return factory; + } } @EnableRabbit diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java index f5b8f883cdc..a8ca050c995 100644 --- a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java @@ -40,6 +40,22 @@ import org.springframework.util.StringUtils; @ConfigurationProperties(prefix = "spring.rabbitmq") public class RabbitProperties { + public enum ContainerType { + + /** + * SimpleMessageListenerContainer - legacy container where the RabbitMQ consumer + * dispatches messages to an invoker thread. + */ + SIMPLE, + + /** + * DirectMessageListenerContainer - container where the listener is invoked + * directly on the RabbitMQ consumer thread. + */ + DIRECT + + } + /** * RabbitMQ host. */ @@ -572,6 +588,11 @@ public class RabbitProperties { public static class AmqpContainer { + /** + * Container type. + */ + private ContainerType type = ContainerType.SIMPLE; + /** * Start the container automatically on startup. */ @@ -583,15 +604,20 @@ public class RabbitProperties { private AcknowledgeMode acknowledgeMode; /** - * Minimum number of consumers. + * Minimum number of listener invoker threads - applies only to simple containers. */ private Integer concurrency; /** - * Maximum number of consumers. + * Maximum number of listener invoker threads - applies only to simple containers. */ private Integer maxConcurrency; + /** + * Number of RabbitMQ consumers per queue - applies only to direct containers. + */ + private Integer consumersPerQueue; + /** * Number of messages to be handled in a single request. It should be greater than * or equal to the transaction size (if used). @@ -599,8 +625,9 @@ public class RabbitProperties { private Integer prefetch; /** - * Number of messages to be processed in a transaction. For best results it should - * be less than or equal to the prefetch count. + * Number of messages to be processed in a transaction; number of messages between + * acks. For best results it should be less than or equal to the prefetch count - + * applies only to simple containers. */ private Integer transactionSize; @@ -620,6 +647,14 @@ public class RabbitProperties { @NestedConfigurationProperty private final ListenerRetry retry = new ListenerRetry(); + public ContainerType getType() { + return this.type; + } + + public void setType(ContainerType containerType) { + this.type = containerType; + } + public boolean isAutoStartup() { return this.autoStartup; } @@ -652,6 +687,14 @@ public class RabbitProperties { this.maxConcurrency = maxConcurrency; } + public Integer getConsumersPerQueue() { + return this.consumersPerQueue; + } + + public void setConsumersPerQueue(Integer consumersPerQueue) { + this.consumersPerQueue = consumersPerQueue; + } + public Integer getPrefetch() { return this.prefetch; } diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/SimpleRabbitListenerContainerFactoryConfigurer.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/SimpleRabbitListenerContainerFactoryConfigurer.java index 52b6509039f..8d2845c38b3 100644 --- a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/SimpleRabbitListenerContainerFactoryConfigurer.java +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/SimpleRabbitListenerContainerFactoryConfigurer.java @@ -16,15 +16,8 @@ package org.springframework.boot.autoconfigure.amqp; -import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; -import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory; -import org.springframework.amqp.rabbit.retry.MessageRecoverer; -import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer; -import org.springframework.amqp.support.converter.MessageConverter; -import org.springframework.boot.autoconfigure.amqp.RabbitProperties.ListenerRetry; -import org.springframework.util.Assert; /** * Configure {@link RabbitListenerContainerFactory} with sensible defaults. @@ -33,92 +26,21 @@ import org.springframework.util.Assert; * @author Gary Russell * @since 1.3.3 */ -public final class SimpleRabbitListenerContainerFactoryConfigurer { +public final class SimpleRabbitListenerContainerFactoryConfigurer + extends AbstractRabbitListenerContainerFactoryConfigurer { - private MessageConverter messageConverter; - - private MessageRecoverer messageRecoverer; - - private RabbitProperties rabbitProperties; - - /** - * Set the {@link MessageConverter} to use or {@code null} if the out-of-the-box - * converter should be used. - * @param messageConverter the {@link MessageConverter} - */ - void setMessageConverter(MessageConverter messageConverter) { - this.messageConverter = messageConverter; - } - - /** - * Set the {@link MessageRecoverer} to use or {@code null} to rely on the default. - * @param messageRecoverer the {@link MessageRecoverer} - */ - void setMessageRecoverer(MessageRecoverer messageRecoverer) { - this.messageRecoverer = messageRecoverer; - } - - /** - * Set the {@link RabbitProperties} to use. - * @param rabbitProperties the {@link RabbitProperties} - */ - void setRabbitProperties(RabbitProperties rabbitProperties) { - this.rabbitProperties = rabbitProperties; - } - - /** - * Configure the specified rabbit listener container factory. The factory can be - * further tuned and default settings can be overridden. - * @param factory the {@link SimpleRabbitListenerContainerFactory} instance to - * configure - * @param connectionFactory the {@link ConnectionFactory} to use - */ - public void configure(SimpleRabbitListenerContainerFactory factory, - ConnectionFactory connectionFactory) { - Assert.notNull(factory, "Factory must not be null"); - Assert.notNull(connectionFactory, "ConnectionFactory must not be null"); - factory.setConnectionFactory(connectionFactory); - if (this.messageConverter != null) { - factory.setMessageConverter(this.messageConverter); - } - RabbitProperties.AmqpContainer config = this.rabbitProperties.getListener() - .getSimple(); - factory.setAutoStartup(config.isAutoStartup()); - if (config.getAcknowledgeMode() != null) { - factory.setAcknowledgeMode(config.getAcknowledgeMode()); - } - if (config.getConcurrency() != null) { - factory.setConcurrentConsumers(config.getConcurrency()); - } - if (config.getMaxConcurrency() != null) { - factory.setMaxConcurrentConsumers(config.getMaxConcurrency()); + @Override + protected void configure(SimpleRabbitListenerContainerFactory factory, RabbitProperties rabbitProperties) { + RabbitProperties.Listener listenerConfig = rabbitProperties.getListener(); + if (listenerConfig.getConcurrency() != null) { + factory.setConcurrentConsumers(listenerConfig.getConcurrency()); } - if (config.getPrefetch() != null) { - factory.setPrefetchCount(config.getPrefetch()); + if (listenerConfig.getMaxConcurrency() != null) { + factory.setMaxConcurrentConsumers(listenerConfig.getMaxConcurrency()); } - if (config.getTransactionSize() != null) { - factory.setTxSize(config.getTransactionSize()); + if (listenerConfig.getTransactionSize() != null) { + factory.setTxSize(listenerConfig.getTransactionSize()); } - if (config.getDefaultRequeueRejected() != null) { - factory.setDefaultRequeueRejected(config.getDefaultRequeueRejected()); - } - if (config.getIdleEventInterval() != null) { - factory.setIdleEventInterval(config.getIdleEventInterval()); - } - ListenerRetry retryConfig = config.getRetry(); - if (retryConfig.isEnabled()) { - RetryInterceptorBuilder builder = (retryConfig.isStateless() - ? RetryInterceptorBuilder.stateless() - : RetryInterceptorBuilder.stateful()); - builder.maxAttempts(retryConfig.getMaxAttempts()); - builder.backOffOptions(retryConfig.getInitialInterval(), - retryConfig.getMultiplier(), retryConfig.getMaxInterval()); - MessageRecoverer recoverer = (this.messageRecoverer != null - ? this.messageRecoverer : new RejectAndDontRequeueRecoverer()); - builder.recoverer(recoverer); - factory.setAdviceChain(builder.build()); - } - } } diff --git a/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java b/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java index d1a056f74d8..c1c2b23af03 100644 --- a/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java +++ b/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java @@ -30,6 +30,7 @@ import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.EnableRabbit; +import org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.config.RabbitListenerConfigUtils; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; @@ -336,6 +337,33 @@ public class RabbitAutoConfigurationTests { .getBean("rabbitListenerContainerFactory", SimpleRabbitListenerContainerFactory.class); DirectFieldAccessor dfa = new DirectFieldAccessor(rabbitListenerContainerFactory); + assertThat(dfa.getPropertyValue("concurrentConsumers")).isEqualTo(5); + assertThat(dfa.getPropertyValue("maxConcurrentConsumers")).isEqualTo(10); + assertThat(dfa.getPropertyValue("txSize")).isEqualTo(20); + checkCommonProps(dfa); + } + + @Test + public void testDirectRabbitListenerContainerFactoryWithCustomSettings() { + load(new Class[] { MessageConvertersConfiguration.class, + MessageRecoverersConfiguration.class }, + "spring.rabbitmq.listener.type:direct", + "spring.rabbitmq.listener.retry.enabled:true", + "spring.rabbitmq.listener.retry.maxAttempts:4", + "spring.rabbitmq.listener.retry.initialInterval:2000", + "spring.rabbitmq.listener.retry.multiplier:1.5", + "spring.rabbitmq.listener.retry.maxInterval:5000", + "spring.rabbitmq.listener.autoStartup:false", + "spring.rabbitmq.listener.acknowledgeMode:manual", + "spring.rabbitmq.listener.consumers-per-queue:5", + "spring.rabbitmq.listener.prefetch:40", + "spring.rabbitmq.listener.defaultRequeueRejected:false", + "spring.rabbitmq.listener.idleEventInterval:5"); + DirectRabbitListenerContainerFactory rabbitListenerContainerFactory = this.context + .getBean("rabbitListenerContainerFactory", + DirectRabbitListenerContainerFactory.class); + DirectFieldAccessor dfa = new DirectFieldAccessor(rabbitListenerContainerFactory); + assertThat(dfa.getPropertyValue("consumersPerQueue")).isEqualTo(5); checkCommonProps(dfa); } @@ -343,10 +371,7 @@ public class RabbitAutoConfigurationTests { assertThat(dfa.getPropertyValue("autoStartup")).isEqualTo(Boolean.FALSE); assertThat(dfa.getPropertyValue("acknowledgeMode")) .isEqualTo(AcknowledgeMode.MANUAL); - assertThat(dfa.getPropertyValue("concurrentConsumers")).isEqualTo(5); - assertThat(dfa.getPropertyValue("maxConcurrentConsumers")).isEqualTo(10); assertThat(dfa.getPropertyValue("prefetchCount")).isEqualTo(40); - assertThat(dfa.getPropertyValue("txSize")).isEqualTo(20); assertThat(dfa.getPropertyValue("messageConverter")) .isSameAs(this.context.getBean("myMessageConverter")); assertThat(dfa.getPropertyValue("defaultRequeueRejected")) diff --git a/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc b/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc index f6166bbc17d..c601a48ea11 100644 --- a/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc +++ b/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc @@ -977,10 +977,11 @@ content into your application; rather pick only the properties that you need. spring.rabbitmq.host=localhost # RabbitMQ host. spring.rabbitmq.listener.simple.acknowledge-mode= # Acknowledge mode of container. spring.rabbitmq.listener.simple.auto-startup=true # Start the container automatically on startup. - spring.rabbitmq.listener.simple.concurrency= # Minimum number of consumers. + spring.rabbitmq.listener.simple.concurrency= # Minimum number of listener invoker threads for a `simple` container. + spring.rabbitmq.listener.consumers-per-queue= # The number of Consumers per queue for a `direct` container. spring.rabbitmq.listener.simple.default-requeue-rejected= # Whether or not to requeue delivery failures; default `true`. spring.rabbitmq.listener.simple.idle-event-interval= # How often idle container events should be published in milliseconds. - spring.rabbitmq.listener.simple.max-concurrency= # Maximum number of consumers. + spring.rabbitmq.listener.simple.max-concurrency= # Maximum number of listener invoker for a `simple` container. spring.rabbitmq.listener.simple.prefetch= # Number of messages to be handled in a single request. It should be greater than or equal to the transaction size (if used). spring.rabbitmq.listener.simple.retry.enabled=false # Whether or not publishing retries are enabled. spring.rabbitmq.listener.simple.retry.initial-interval=1000 # Interval between the first and second attempt to deliver a message. @@ -988,7 +989,8 @@ content into your application; rather pick only the properties that you need. spring.rabbitmq.listener.simple.retry.max-interval=10000 # Maximum interval between attempts. spring.rabbitmq.listener.simple.retry.multiplier=1.0 # A multiplier to apply to the previous delivery retry interval. spring.rabbitmq.listener.simple.retry.stateless=true # Whether or not retry is stateless or stateful. - spring.rabbitmq.listener.simple.transaction-size= # Number of messages to be processed in a transaction. For best results it should be less than or equal to the prefetch count. + spring.rabbitmq.listener.simple.transaction-size= Number of messages to be processed in a transaction; number of messages between acks. For best results it should be less than or equal to the prefetch count - applies only to `simple` containers. + spring.rabbitmq.listener.type= # The listener container type `simple` or `direct`; default `simple`. spring.rabbitmq.password= # Login to authenticate against the broker. spring.rabbitmq.port=5672 # RabbitMQ port. spring.rabbitmq.publisher-confirms=false # Enable publisher confirms. From 2894e57146fcb6a4a7d5459c116f0f21a85fb77d Mon Sep 17 00:00:00 2001 From: Stephane Nicoll Date: Thu, 4 May 2017 18:00:06 +0200 Subject: [PATCH 2/3] Polish "Support direct AMQP container" Closes gh-9055 --- ...bitListenerContainerFactoryConfigurer.java | 48 +++-- ...bitListenerContainerFactoryConfigurer.java | 15 +- .../RabbitAnnotationDrivenConfiguration.java | 100 ++++------ .../autoconfigure/amqp/RabbitProperties.java | 187 ++++++++++-------- ...bitListenerContainerFactoryConfigurer.java | 22 ++- .../amqp/RabbitAutoConfigurationTests.java | 68 +++++-- .../appendix-application-properties.adoc | 12 +- .../main/asciidoc/spring-boot-features.adoc | 17 +- 8 files changed, 262 insertions(+), 207 deletions(-) diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/AbstractRabbitListenerContainerFactoryConfigurer.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/AbstractRabbitListenerContainerFactoryConfigurer.java index fc2baffa3c3..cedc2642532 100644 --- a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/AbstractRabbitListenerContainerFactoryConfigurer.java +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/AbstractRabbitListenerContainerFactoryConfigurer.java @@ -30,13 +30,12 @@ import org.springframework.util.Assert; * Configure {@link RabbitListenerContainerFactory} with sensible defaults. * * @param the container factory type. - * * @author Gary Russell - * @since 2.0 - * + * @author Stephane Nicoll + * @since 2.0.0 */ public abstract class AbstractRabbitListenerContainerFactoryConfigurer< - T extends AbstractRabbitListenerContainerFactory> { + T extends AbstractRabbitListenerContainerFactory> { private MessageConverter messageConverter; @@ -69,6 +68,10 @@ public abstract class AbstractRabbitListenerContainerFactoryConfigurer< this.rabbitProperties = rabbitProperties; } + protected final RabbitProperties getRabbitProperties() { + return this.rabbitProperties; + } + /** * Configure the specified rabbit listener container factory. The factory can be * further tuned and default settings can be overridden. @@ -76,28 +79,32 @@ public abstract class AbstractRabbitListenerContainerFactoryConfigurer< * configure * @param connectionFactory the {@link ConnectionFactory} to use */ - public final void configure(T factory, ConnectionFactory connectionFactory) { + public abstract void configure(T factory, ConnectionFactory connectionFactory); + + + protected void configure(T factory, ConnectionFactory connectionFactory, + RabbitProperties.AmqpContainer configuration) { Assert.notNull(factory, "Factory must not be null"); Assert.notNull(connectionFactory, "ConnectionFactory must not be null"); + Assert.notNull(configuration, "Configuration must not be null"); factory.setConnectionFactory(connectionFactory); if (this.messageConverter != null) { factory.setMessageConverter(this.messageConverter); } - RabbitProperties.Listener listenerConfig = this.rabbitProperties.getListener(); - factory.setAutoStartup(listenerConfig.isAutoStartup()); - if (listenerConfig.getAcknowledgeMode() != null) { - factory.setAcknowledgeMode(listenerConfig.getAcknowledgeMode()); + factory.setAutoStartup(configuration.isAutoStartup()); + if (configuration.getAcknowledgeMode() != null) { + factory.setAcknowledgeMode(configuration.getAcknowledgeMode()); } - if (listenerConfig.getPrefetch() != null) { - factory.setPrefetchCount(listenerConfig.getPrefetch()); + if (configuration.getPrefetch() != null) { + factory.setPrefetchCount(configuration.getPrefetch()); } - if (listenerConfig.getDefaultRequeueRejected() != null) { - factory.setDefaultRequeueRejected(listenerConfig.getDefaultRequeueRejected()); + if (configuration.getDefaultRequeueRejected() != null) { + factory.setDefaultRequeueRejected(configuration.getDefaultRequeueRejected()); } - if (listenerConfig.getIdleEventInterval() != null) { - factory.setIdleEventInterval(listenerConfig.getIdleEventInterval()); + if (configuration.getIdleEventInterval() != null) { + factory.setIdleEventInterval(configuration.getIdleEventInterval()); } - ListenerRetry retryConfig = listenerConfig.getRetry(); + ListenerRetry retryConfig = configuration.getRetry(); if (retryConfig.isEnabled()) { RetryInterceptorBuilder builder = (retryConfig.isStateless() ? RetryInterceptorBuilder.stateless() @@ -110,15 +117,6 @@ public abstract class AbstractRabbitListenerContainerFactoryConfigurer< builder.recoverer(recoverer); factory.setAdviceChain(builder.build()); } - configure(factory, this.rabbitProperties); } - /** - * Perform factory-specific configuration. - * - * @param factory the factory. - * @param rabbitProperties the properties. - */ - protected abstract void configure(T factory, RabbitProperties rabbitProperties); - } diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/DirectRabbitListenerContainerFactoryConfigurer.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/DirectRabbitListenerContainerFactoryConfigurer.java index 4a7026210bc..07879468dc9 100644 --- a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/DirectRabbitListenerContainerFactoryConfigurer.java +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/DirectRabbitListenerContainerFactoryConfigurer.java @@ -17,22 +17,25 @@ package org.springframework.boot.autoconfigure.amqp; import org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory; -import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; /** - * Configure {@link RabbitListenerContainerFactory} with sensible defaults. + * Configure {@link DirectRabbitListenerContainerFactoryConfigurer} with sensible defaults. * * @author Gary Russell + * @author Stephane Nicoll * @since 2.0 */ public final class DirectRabbitListenerContainerFactoryConfigurer extends AbstractRabbitListenerContainerFactoryConfigurer { @Override - protected void configure(DirectRabbitListenerContainerFactory factory, RabbitProperties rabbitProperties) { - RabbitProperties.Listener listenerConfig = rabbitProperties.getListener(); - if (listenerConfig.getConsumersPerQueue() != null) { - factory.setConsumersPerQueue(listenerConfig.getConsumersPerQueue()); + public void configure(DirectRabbitListenerContainerFactory factory, ConnectionFactory connectionFactory) { + RabbitProperties.DirectContainer config = getRabbitProperties().getListener() + .getDirect(); + configure(factory, connectionFactory, config); + if (config.getConsumersPerQueue() != null) { + factory.setConsumersPerQueue(config.getConsumersPerQueue()); } } diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAnnotationDrivenConfiguration.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAnnotationDrivenConfiguration.java index b7504546018..7da43ccf28c 100644 --- a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAnnotationDrivenConfiguration.java +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAnnotationDrivenConfiguration.java @@ -41,11 +41,11 @@ import org.springframework.context.annotation.Configuration; @ConditionalOnClass(EnableRabbit.class) class RabbitAnnotationDrivenConfiguration { - protected final ObjectProvider messageConverter; + private final ObjectProvider messageConverter; - protected final ObjectProvider messageRecoverer; + private final ObjectProvider messageRecoverer; - protected final RabbitProperties properties; + private final RabbitProperties properties; RabbitAnnotationDrivenConfiguration(ObjectProvider messageConverter, ObjectProvider messageRecoverer, @@ -55,68 +55,48 @@ class RabbitAnnotationDrivenConfiguration { this.properties = properties; } - - @Configuration - @ConditionalOnProperty(prefix = "spring.rabbitmq", name = "listener.type", havingValue = "simple", - matchIfMissing = true) - public static class SimpleContainerConfiguration extends RabbitAnnotationDrivenConfiguration { - - SimpleContainerConfiguration(ObjectProvider messageConverter, - ObjectProvider messageRecoverer, RabbitProperties properties) { - super(messageConverter, messageRecoverer, properties); - } - - @Bean - @ConditionalOnMissingBean - public SimpleRabbitListenerContainerFactoryConfigurer rabbitListenerContainerFactoryConfigurer() { - SimpleRabbitListenerContainerFactoryConfigurer configurer = - new SimpleRabbitListenerContainerFactoryConfigurer(); - configurer.setMessageConverter(this.messageConverter.getIfUnique()); - configurer.setMessageRecoverer(this.messageRecoverer.getIfUnique()); - configurer.setRabbitProperties(this.properties); - return configurer; - } - - @Bean - @ConditionalOnMissingBean(name = "rabbitListenerContainerFactory") - public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory( - SimpleRabbitListenerContainerFactoryConfigurer configurer, - ConnectionFactory connectionFactory) { - SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); - configurer.configure(factory, connectionFactory); - return factory; - } + @Bean + @ConditionalOnMissingBean + public SimpleRabbitListenerContainerFactoryConfigurer simpleRabbitListenerContainerFactoryConfigurer() { + SimpleRabbitListenerContainerFactoryConfigurer configurer = new SimpleRabbitListenerContainerFactoryConfigurer(); + configurer.setMessageConverter(this.messageConverter.getIfUnique()); + configurer.setMessageRecoverer(this.messageRecoverer.getIfUnique()); + configurer.setRabbitProperties(this.properties); + return configurer; } - @Configuration - @ConditionalOnProperty(prefix = "spring.rabbitmq", name = "listener.type", havingValue = "direct") - public static class DirectContainerConfiguration extends RabbitAnnotationDrivenConfiguration { + @Bean(name = "rabbitListenerContainerFactory") + @ConditionalOnMissingBean(name = "rabbitListenerContainerFactory") + @ConditionalOnProperty(prefix = "spring.rabbitmq.listener", name = "type", havingValue = "simple", matchIfMissing = true) + public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory( + SimpleRabbitListenerContainerFactoryConfigurer configurer, + ConnectionFactory connectionFactory) { + SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); + configurer.configure(factory, connectionFactory); + return factory; + } - DirectContainerConfiguration(ObjectProvider messageConverter, - ObjectProvider messageRecoverer, RabbitProperties properties) { - super(messageConverter, messageRecoverer, properties); - } - @Bean - @ConditionalOnMissingBean - public DirectRabbitListenerContainerFactoryConfigurer rabbitListenerContainerFactoryConfigurer() { - DirectRabbitListenerContainerFactoryConfigurer configurer = - new DirectRabbitListenerContainerFactoryConfigurer(); - configurer.setMessageConverter(this.messageConverter.getIfUnique()); - configurer.setMessageRecoverer(this.messageRecoverer.getIfUnique()); - configurer.setRabbitProperties(this.properties); - return configurer; - } + @Bean + @ConditionalOnMissingBean + public DirectRabbitListenerContainerFactoryConfigurer directRabbitListenerContainerFactoryConfigurer() { + DirectRabbitListenerContainerFactoryConfigurer configurer = + new DirectRabbitListenerContainerFactoryConfigurer(); + configurer.setMessageConverter(this.messageConverter.getIfUnique()); + configurer.setMessageRecoverer(this.messageRecoverer.getIfUnique()); + configurer.setRabbitProperties(this.properties); + return configurer; + } - @Bean - @ConditionalOnMissingBean(name = "rabbitListenerContainerFactory") - public DirectRabbitListenerContainerFactory rabbitListenerContainerFactory( - DirectRabbitListenerContainerFactoryConfigurer configurer, - ConnectionFactory connectionFactory) { - DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory(); - configurer.configure(factory, connectionFactory); - return factory; - } + @Bean(name = "rabbitListenerContainerFactory") + @ConditionalOnMissingBean(name = "rabbitListenerContainerFactory") + @ConditionalOnProperty(prefix = "spring.rabbitmq.listener", name = "type", havingValue = "direct") + public DirectRabbitListenerContainerFactory directRabbitListenerContainerFactory( + DirectRabbitListenerContainerFactoryConfigurer configurer, + ConnectionFactory connectionFactory) { + DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory(); + configurer.configure(factory, connectionFactory); + return factory; } @EnableRabbit diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java index a8ca050c995..35c2a3785b3 100644 --- a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java @@ -40,22 +40,6 @@ import org.springframework.util.StringUtils; @ConfigurationProperties(prefix = "spring.rabbitmq") public class RabbitProperties { - public enum ContainerType { - - /** - * SimpleMessageListenerContainer - legacy container where the RabbitMQ consumer - * dispatches messages to an invoker thread. - */ - SIMPLE, - - /** - * DirectMessageListenerContainer - container where the listener is invoked - * directly on the RabbitMQ consumer thread. - */ - DIRECT - - } - /** * RabbitMQ host. */ @@ -481,10 +465,42 @@ public class RabbitProperties { } + public enum ContainerType { + + /** + * Legacy container where the RabbitMQ consumer dispatches messages to an + * invoker thread. + */ + SIMPLE, + + /** + * Container where the listener is invoked directly on the RabbitMQ consumer + * thread. + */ + DIRECT + + } + public static class Listener { + /** + * Listener container type. + */ + private ContainerType type = ContainerType.SIMPLE; + @NestedConfigurationProperty - private final AmqpContainer simple = new AmqpContainer(); + private final SimpleContainer simple = new SimpleContainer(); + + @NestedConfigurationProperty + private final DirectContainer direct = new DirectContainer(); + + public ContainerType getType() { + return this.type; + } + + public void setType(ContainerType containerType) { + this.type = containerType; + } @DeprecatedConfigurationProperty(replacement = "spring.rabbitmq.listener.simple.auto-startup") @Deprecated @@ -580,18 +596,17 @@ public class RabbitProperties { return getSimple().getRetry(); } - public AmqpContainer getSimple() { + public SimpleContainer getSimple() { return this.simple; } - } + public DirectContainer getDirect() { + return this.direct; + } - public static class AmqpContainer { + } - /** - * Container type. - */ - private ContainerType type = ContainerType.SIMPLE; + public static abstract class AmqpContainer { /** * Start the container automatically on startup. @@ -603,34 +618,12 @@ public class RabbitProperties { */ private AcknowledgeMode acknowledgeMode; - /** - * Minimum number of listener invoker threads - applies only to simple containers. - */ - private Integer concurrency; - - /** - * Maximum number of listener invoker threads - applies only to simple containers. - */ - private Integer maxConcurrency; - - /** - * Number of RabbitMQ consumers per queue - applies only to direct containers. - */ - private Integer consumersPerQueue; - /** * Number of messages to be handled in a single request. It should be greater than * or equal to the transaction size (if used). */ private Integer prefetch; - /** - * Number of messages to be processed in a transaction; number of messages between - * acks. For best results it should be less than or equal to the prefetch count - - * applies only to simple containers. - */ - private Integer transactionSize; - /** * Whether rejected deliveries are requeued by default; default true. */ @@ -647,14 +640,6 @@ public class RabbitProperties { @NestedConfigurationProperty private final ListenerRetry retry = new ListenerRetry(); - public ContainerType getType() { - return this.type; - } - - public void setType(ContainerType containerType) { - this.type = containerType; - } - public boolean isAutoStartup() { return this.autoStartup; } @@ -671,36 +656,72 @@ public class RabbitProperties { this.acknowledgeMode = acknowledgeMode; } - public Integer getConcurrency() { - return this.concurrency; + public Integer getPrefetch() { + return this.prefetch; } - public void setConcurrency(Integer concurrency) { - this.concurrency = concurrency; + public void setPrefetch(Integer prefetch) { + this.prefetch = prefetch; } - public Integer getMaxConcurrency() { - return this.maxConcurrency; + public Boolean getDefaultRequeueRejected() { + return this.defaultRequeueRejected; } - public void setMaxConcurrency(Integer maxConcurrency) { - this.maxConcurrency = maxConcurrency; + public void setDefaultRequeueRejected(Boolean defaultRequeueRejected) { + this.defaultRequeueRejected = defaultRequeueRejected; } - public Integer getConsumersPerQueue() { - return this.consumersPerQueue; + public Long getIdleEventInterval() { + return this.idleEventInterval; } - public void setConsumersPerQueue(Integer consumersPerQueue) { - this.consumersPerQueue = consumersPerQueue; + public void setIdleEventInterval(Long idleEventInterval) { + this.idleEventInterval = idleEventInterval; } - public Integer getPrefetch() { - return this.prefetch; + public ListenerRetry getRetry() { + return this.retry; } - public void setPrefetch(Integer prefetch) { - this.prefetch = prefetch; + } + + /** + * Configuration properties for {@code SimpleMessageListenerContainer}. + */ + public static class SimpleContainer extends AmqpContainer { + + /** + * Minimum number of listener invoker threads. + */ + private Integer concurrency; + + /** + * Maximum number of listener invoker threads. + */ + private Integer maxConcurrency; + + /** + * Number of messages to be processed in a transaction; number of messages + * between acks. For best results it should + * be less than or equal to the prefetch count. + */ + private Integer transactionSize; + + public Integer getConcurrency() { + return this.concurrency; + } + + public void setConcurrency(Integer concurrency) { + this.concurrency = concurrency; + } + + public Integer getMaxConcurrency() { + return this.maxConcurrency; + } + + public void setMaxConcurrency(Integer maxConcurrency) { + this.maxConcurrency = maxConcurrency; } public Integer getTransactionSize() { @@ -711,24 +732,24 @@ public class RabbitProperties { this.transactionSize = transactionSize; } - public Boolean getDefaultRequeueRejected() { - return this.defaultRequeueRejected; - } + } - public void setDefaultRequeueRejected(Boolean defaultRequeueRejected) { - this.defaultRequeueRejected = defaultRequeueRejected; - } + /** + * Configuration properties for {@code DirectMessageListenerContainer}. + */ + public static class DirectContainer extends AmqpContainer { - public Long getIdleEventInterval() { - return this.idleEventInterval; - } + /** + * Number of consumers per queue. + */ + private Integer consumersPerQueue; - public void setIdleEventInterval(Long idleEventInterval) { - this.idleEventInterval = idleEventInterval; + public Integer getConsumersPerQueue() { + return this.consumersPerQueue; } - public ListenerRetry getRetry() { - return this.retry; + public void setConsumersPerQueue(Integer consumersPerQueue) { + this.consumersPerQueue = consumersPerQueue; } } diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/SimpleRabbitListenerContainerFactoryConfigurer.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/SimpleRabbitListenerContainerFactoryConfigurer.java index 8d2845c38b3..a31db8b0945 100644 --- a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/SimpleRabbitListenerContainerFactoryConfigurer.java +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/SimpleRabbitListenerContainerFactoryConfigurer.java @@ -17,10 +17,10 @@ package org.springframework.boot.autoconfigure.amqp; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; -import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; /** - * Configure {@link RabbitListenerContainerFactory} with sensible defaults. + * Configure {@link SimpleRabbitListenerContainerFactoryConfigurer} with sensible defaults. * * @author Stephane Nicoll * @author Gary Russell @@ -30,16 +30,18 @@ public final class SimpleRabbitListenerContainerFactoryConfigurer extends AbstractRabbitListenerContainerFactoryConfigurer { @Override - protected void configure(SimpleRabbitListenerContainerFactory factory, RabbitProperties rabbitProperties) { - RabbitProperties.Listener listenerConfig = rabbitProperties.getListener(); - if (listenerConfig.getConcurrency() != null) { - factory.setConcurrentConsumers(listenerConfig.getConcurrency()); + public void configure(SimpleRabbitListenerContainerFactory factory, ConnectionFactory connectionFactory) { + RabbitProperties.SimpleContainer config = getRabbitProperties().getListener() + .getSimple(); + configure(factory, connectionFactory, config); + if (config.getConcurrency() != null) { + factory.setConcurrentConsumers(config.getConcurrency()); } - if (listenerConfig.getMaxConcurrency() != null) { - factory.setMaxConcurrentConsumers(listenerConfig.getMaxConcurrency()); + if (config.getMaxConcurrency() != null) { + factory.setMaxConcurrentConsumers(config.getMaxConcurrency()); } - if (listenerConfig.getTransactionSize() != null) { - factory.setTxSize(listenerConfig.getTransactionSize()); + if (config.getTransactionSize() != null) { + factory.setTxSize(config.getTransactionSize()); } } diff --git a/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java b/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java index c1c2b23af03..7272aa175ec 100644 --- a/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java +++ b/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java @@ -348,17 +348,17 @@ public class RabbitAutoConfigurationTests { load(new Class[] { MessageConvertersConfiguration.class, MessageRecoverersConfiguration.class }, "spring.rabbitmq.listener.type:direct", - "spring.rabbitmq.listener.retry.enabled:true", - "spring.rabbitmq.listener.retry.maxAttempts:4", - "spring.rabbitmq.listener.retry.initialInterval:2000", - "spring.rabbitmq.listener.retry.multiplier:1.5", - "spring.rabbitmq.listener.retry.maxInterval:5000", - "spring.rabbitmq.listener.autoStartup:false", - "spring.rabbitmq.listener.acknowledgeMode:manual", - "spring.rabbitmq.listener.consumers-per-queue:5", - "spring.rabbitmq.listener.prefetch:40", - "spring.rabbitmq.listener.defaultRequeueRejected:false", - "spring.rabbitmq.listener.idleEventInterval:5"); + "spring.rabbitmq.listener.direct.retry.enabled:true", + "spring.rabbitmq.listener.direct.retry.maxAttempts:4", + "spring.rabbitmq.listener.direct.retry.initialInterval:2000", + "spring.rabbitmq.listener.direct.retry.multiplier:1.5", + "spring.rabbitmq.listener.direct.retry.maxInterval:5000", + "spring.rabbitmq.listener.direct.autoStartup:false", + "spring.rabbitmq.listener.direct.acknowledgeMode:manual", + "spring.rabbitmq.listener.direct.consumers-per-queue:5", + "spring.rabbitmq.listener.direct.prefetch:40", + "spring.rabbitmq.listener.direct.defaultRequeueRejected:false", + "spring.rabbitmq.listener.direct.idleEventInterval:5"); DirectRabbitListenerContainerFactory rabbitListenerContainerFactory = this.context .getBean("rabbitListenerContainerFactory", DirectRabbitListenerContainerFactory.class); @@ -367,6 +367,52 @@ public class RabbitAutoConfigurationTests { checkCommonProps(dfa); } + @Test + public void testRabbitListenerContainerFactoryConfigurersAreAvailable() { + load(TestConfiguration.class, + "spring.rabbitmq.listener.simple.concurrency:5", + "spring.rabbitmq.listener.simple.maxConcurrency:10", + "spring.rabbitmq.listener.simple.prefetch:40", + "spring.rabbitmq.listener.direct.consumers-per-queue:5", + "spring.rabbitmq.listener.direct.prefetch:40"); + assertThat(this.context.getBeansOfType( + SimpleRabbitListenerContainerFactoryConfigurer.class)).hasSize(1); + assertThat(this.context.getBeansOfType( + DirectRabbitListenerContainerFactoryConfigurer.class)).hasSize(1); + } + + @Test + public void testSimpleRabbitListenerContainerFactoryConfigurerUsesConfig() { + load(TestConfiguration.class, + "spring.rabbitmq.listener.type:direct", // listener type is irrelevant + "spring.rabbitmq.listener.simple.concurrency:5", + "spring.rabbitmq.listener.simple.maxConcurrency:10", + "spring.rabbitmq.listener.simple.prefetch:40"); + SimpleRabbitListenerContainerFactoryConfigurer configurer = this.context + .getBean(SimpleRabbitListenerContainerFactoryConfigurer.class); + SimpleRabbitListenerContainerFactory factory = + mock(SimpleRabbitListenerContainerFactory.class); + configurer.configure(factory, mock(ConnectionFactory.class)); + verify(factory).setConcurrentConsumers(5); + verify(factory).setMaxConcurrentConsumers(10); + verify(factory).setPrefetchCount(40); + } + + @Test + public void testDirectRabbitListenerContainerFactoryConfigurerUsesConfig() { + load(TestConfiguration.class, + "spring.rabbitmq.listener.type:simple", // listener type is irrelevant + "spring.rabbitmq.listener.direct.consumers-per-queue:5", + "spring.rabbitmq.listener.direct.prefetch:40"); + DirectRabbitListenerContainerFactoryConfigurer configurer = this.context + .getBean(DirectRabbitListenerContainerFactoryConfigurer.class); + DirectRabbitListenerContainerFactory factory = + mock(DirectRabbitListenerContainerFactory.class); + configurer.configure(factory, mock(ConnectionFactory.class)); + verify(factory).setConsumersPerQueue(5); + verify(factory).setPrefetchCount(40); + } + private void checkCommonProps(DirectFieldAccessor dfa) { assertThat(dfa.getPropertyValue("autoStartup")).isEqualTo(Boolean.FALSE); assertThat(dfa.getPropertyValue("acknowledgeMode")) diff --git a/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc b/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc index c601a48ea11..8ceeb8818d2 100644 --- a/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc +++ b/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc @@ -975,13 +975,13 @@ content into your application; rather pick only the properties that you need. spring.rabbitmq.connection-timeout= # Connection timeout, in milliseconds; zero for infinite. spring.rabbitmq.dynamic=true # Create an AmqpAdmin bean. spring.rabbitmq.host=localhost # RabbitMQ host. + spring.rabbitmq.listener.direct.consumers-per-queue= # Number of consumers per queue. spring.rabbitmq.listener.simple.acknowledge-mode= # Acknowledge mode of container. spring.rabbitmq.listener.simple.auto-startup=true # Start the container automatically on startup. - spring.rabbitmq.listener.simple.concurrency= # Minimum number of listener invoker threads for a `simple` container. - spring.rabbitmq.listener.consumers-per-queue= # The number of Consumers per queue for a `direct` container. - spring.rabbitmq.listener.simple.default-requeue-rejected= # Whether or not to requeue delivery failures; default `true`. + spring.rabbitmq.listener.simple.concurrency= # Minimum number of listener invoker threads. + spring.rabbitmq.listener.simple.default-requeue-rejected= # Whether or not to requeue delivery failures. spring.rabbitmq.listener.simple.idle-event-interval= # How often idle container events should be published in milliseconds. - spring.rabbitmq.listener.simple.max-concurrency= # Maximum number of listener invoker for a `simple` container. + spring.rabbitmq.listener.simple.max-concurrency= # Maximum number of listener invoker. spring.rabbitmq.listener.simple.prefetch= # Number of messages to be handled in a single request. It should be greater than or equal to the transaction size (if used). spring.rabbitmq.listener.simple.retry.enabled=false # Whether or not publishing retries are enabled. spring.rabbitmq.listener.simple.retry.initial-interval=1000 # Interval between the first and second attempt to deliver a message. @@ -989,8 +989,8 @@ content into your application; rather pick only the properties that you need. spring.rabbitmq.listener.simple.retry.max-interval=10000 # Maximum interval between attempts. spring.rabbitmq.listener.simple.retry.multiplier=1.0 # A multiplier to apply to the previous delivery retry interval. spring.rabbitmq.listener.simple.retry.stateless=true # Whether or not retry is stateless or stateful. - spring.rabbitmq.listener.simple.transaction-size= Number of messages to be processed in a transaction; number of messages between acks. For best results it should be less than or equal to the prefetch count - applies only to `simple` containers. - spring.rabbitmq.listener.type= # The listener container type `simple` or `direct`; default `simple`. + spring.rabbitmq.listener.simple.transaction-size= # Number of messages to be processed in a transaction; number of messages between acks. For best results it should be less than or equal to the prefetch count. + spring.rabbitmq.listener.type=simple # Listener container type. spring.rabbitmq.password= # Login to authenticate against the broker. spring.rabbitmq.port=5672 # RabbitMQ port. spring.rabbitmq.publisher-confirms=false # Enable publisher confirms. diff --git a/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc b/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc index 6ea3bce6daa..0b2e81ad0ac 100644 --- a/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc +++ b/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc @@ -4653,9 +4653,10 @@ the broker connection is lost. Retries are disabled by default. ==== Receiving a message When the Rabbit infrastructure is present, any bean can be annotated with `@RabbitListener` to create a listener endpoint. If no `RabbitListenerContainerFactory` -has been defined, a default one is configured automatically. If a `MessageConverter` or -`MessageRecoverer` beans are defined, they are associated automatically to the default -factory. +has been defined, a default `SimpleRabbitListenerContainerFactory` is configured +automatically and you can switch to a direct container using the +`spring.rabbitmq.listener.type` property. If a `MessageConverter` or `MessageRecoverer` +beans are defined, they are associated automatically to the default factory. The following component creates a listener endpoint on the `someQueue` queue: @@ -4677,9 +4678,13 @@ for more details. If you need to create more `RabbitListenerContainerFactory` instances or if you want to override the default, Spring Boot provides a -`SimpleRabbitListenerContainerFactoryConfigurer` that you can use to initialize a -`SimpleRabbitListenerContainerFactory` with the same settings as the one that is -auto-configured. +`SimpleRabbitListenerContainerFactoryConfigurer` and +`DirectRabbitListenerContainerFactoryConfigurer` that you can use to initialize a +`SimpleRabbitListenerContainerFactory` and `DirectRabbitListenerContainerFactory` with the +same settings as the one used by the auto-configuration. + +TIP: It doesn't matter which container type you've chosen, those two beans are exposed by +the auto-configuration. For instance, the following exposes another factory that uses a specific `MessageConverter`: From 1e9c1a68e47c489d41dd5ddbd5a6bd41d9e3859d Mon Sep 17 00:00:00 2001 From: Stephane Nicoll Date: Fri, 5 May 2017 12:03:21 +0200 Subject: [PATCH 3/3] Remove deprecated spring.rabbitmq.listener.* properties See gh-9055, gh-9108 --- .../autoconfigure/amqp/RabbitProperties.java | 95 ------------------- ...itional-spring-configuration-metadata.json | 66 ------------- .../amqp/RabbitAutoConfigurationTests.java | 27 +----- 3 files changed, 2 insertions(+), 186 deletions(-) diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java index 35c2a3785b3..d2d0eb30db0 100644 --- a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java @@ -22,7 +22,6 @@ import java.util.List; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory.CacheMode; import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.boot.context.properties.DeprecatedConfigurationProperty; import org.springframework.boot.context.properties.NestedConfigurationProperty; import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; @@ -502,100 +501,6 @@ public class RabbitProperties { this.type = containerType; } - @DeprecatedConfigurationProperty(replacement = "spring.rabbitmq.listener.simple.auto-startup") - @Deprecated - public boolean isAutoStartup() { - return getSimple().isAutoStartup(); - } - - @Deprecated - public void setAutoStartup(boolean autoStartup) { - getSimple().setAutoStartup(autoStartup); - } - - @DeprecatedConfigurationProperty(replacement = "spring.rabbitmq.listener.simple.acknowledge-mode") - @Deprecated - public AcknowledgeMode getAcknowledgeMode() { - return getSimple().getAcknowledgeMode(); - } - - @Deprecated - public void setAcknowledgeMode(AcknowledgeMode acknowledgeMode) { - getSimple().setAcknowledgeMode(acknowledgeMode); - } - - @DeprecatedConfigurationProperty(replacement = "spring.rabbitmq.listener.simple.concurrency") - @Deprecated - public Integer getConcurrency() { - return getSimple().getConcurrency(); - } - - @Deprecated - public void setConcurrency(Integer concurrency) { - getSimple().setConcurrency(concurrency); - } - - @DeprecatedConfigurationProperty(replacement = "spring.rabbitmq.listener.simple.max-concurrency") - @Deprecated - public Integer getMaxConcurrency() { - return getSimple().getMaxConcurrency(); - } - - @Deprecated - public void setMaxConcurrency(Integer maxConcurrency) { - getSimple().setMaxConcurrency(maxConcurrency); - } - - @DeprecatedConfigurationProperty(replacement = "spring.rabbitmq.listener.simple.prefetch") - @Deprecated - public Integer getPrefetch() { - return getSimple().getPrefetch(); - } - - @Deprecated - public void setPrefetch(Integer prefetch) { - getSimple().setPrefetch(prefetch); - } - - @DeprecatedConfigurationProperty(replacement = "spring.rabbitmq.listener.simple.transaction-size") - @Deprecated - public Integer getTransactionSize() { - return getSimple().getTransactionSize(); - } - - @Deprecated - public void setTransactionSize(Integer transactionSize) { - getSimple().setTransactionSize(transactionSize); - } - - @DeprecatedConfigurationProperty(replacement = "spring.rabbitmq.listener.simple.default-requeue-rejected") - @Deprecated - public Boolean getDefaultRequeueRejected() { - return getSimple().getDefaultRequeueRejected(); - } - - @Deprecated - public void setDefaultRequeueRejected(Boolean defaultRequeueRejected) { - getSimple().setDefaultRequeueRejected(defaultRequeueRejected); - } - - @DeprecatedConfigurationProperty(replacement = "spring.rabbitmq.listener.simple.idle-event-interval") - @Deprecated - public Long getIdleEventInterval() { - return getSimple().getIdleEventInterval(); - } - - @Deprecated - public void setIdleEventInterval(Long idleEventInterval) { - getSimple().setIdleEventInterval(idleEventInterval); - } - - @DeprecatedConfigurationProperty(replacement = "spring.rabbitmq.listener.simple.retry") - @Deprecated - public ListenerRetry getRetry() { - return getSimple().getRetry(); - } - public SimpleContainer getSimple() { return this.simple; } diff --git a/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json b/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json index 447876ff987..9bb77b83705 100644 --- a/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json +++ b/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json @@ -363,72 +363,6 @@ "description": "Create an AmqpAdmin bean.", "defaultValue": true }, - { - "name": "spring.rabbitmq.listener.retry.enabled", - "type": "java.lang.Boolean", - "description": "Whether or not publishing retries are enabled.", - "sourceType": "org.springframework.boot.autoconfigure.amqp.RabbitProperties$ListenerRetry", - "defaultValue": false, - "deprecated": true, - "deprecation": { - "replacement": "spring.rabbitmq.listener.simple.retry.enabled" - } - }, - { - "name": "spring.rabbitmq.listener.retry.initial-interval", - "type": "java.lang.Long", - "description": "Interval between the first and second attempt to publish or deliver a message.", - "sourceType": "org.springframework.boot.autoconfigure.amqp.RabbitProperties$ListenerRetry", - "defaultValue": 1000, - "deprecated": true, - "deprecation": { - "replacement": "spring.rabbitmq.listener.simple.retry.initial-interval" - } - }, - { - "name": "spring.rabbitmq.listener.retry.max-attempts", - "type": "java.lang.Integer", - "description": "Maximum number of attempts to publish or deliver a message.", - "sourceType": "org.springframework.boot.autoconfigure.amqp.RabbitProperties$ListenerRetry", - "defaultValue": 3, - "deprecated": true, - "deprecation": { - "replacement": "spring.rabbitmq.listener.simple.retry.max-attempts" - } - }, - { - "name": "spring.rabbitmq.listener.retry.max-interval", - "type": "java.lang.Long", - "description": "Maximum interval between attempts.", - "sourceType": "org.springframework.boot.autoconfigure.amqp.RabbitProperties$ListenerRetry", - "defaultValue": 10000, - "deprecated": true, - "deprecation": { - "replacement": "spring.rabbitmq.listener.simple.retry.max-interval" - } - }, - { - "name": "spring.rabbitmq.listener.retry.multiplier", - "type": "java.lang.Double", - "description": "A multiplier to apply to the previous retry interval.", - "sourceType": "org.springframework.boot.autoconfigure.amqp.RabbitProperties$ListenerRetry", - "defaultValue": 1, - "deprecated": true, - "deprecation": { - "replacement": "spring.rabbitmq.listener.simple.retry.multiplier" - } - }, - { - "name": "spring.rabbitmq.listener.retry.stateless", - "type": "java.lang.Boolean", - "description": "Whether or not retries are stateless or stateful.", - "sourceType": "org.springframework.boot.autoconfigure.amqp.RabbitProperties$ListenerRetry", - "defaultValue": true, - "deprecated": true, - "deprecation": { - "replacement": "spring.rabbitmq.listener.simple.retry.stateless" - } - }, { "name": "spring.session.hazelcast.flush-mode", "defaultValue": "on-save" diff --git a/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java b/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java index 7272aa175ec..98058a3cebd 100644 --- a/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java +++ b/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java @@ -293,28 +293,10 @@ public class RabbitAutoConfigurationTests { assertThat(adviceChain).isNull(); } - @Test - @Deprecated - public void testSimpleRabbitListenerContainerFactoryWithCustomDeprecatedSettings() { - testSimpleRabbitListenerContainerFactoryWithCustomSettings( - "spring.rabbitmq.listener.retry.enabled:true", - "spring.rabbitmq.listener.retry.maxAttempts:4", - "spring.rabbitmq.listener.retry.initialInterval:2000", - "spring.rabbitmq.listener.retry.multiplier:1.5", - "spring.rabbitmq.listener.retry.maxInterval:5000", - "spring.rabbitmq.listener.autoStartup:false", - "spring.rabbitmq.listener.acknowledgeMode:manual", - "spring.rabbitmq.listener.concurrency:5", - "spring.rabbitmq.listener.maxConcurrency:10", - "spring.rabbitmq.listener.prefetch:40", - "spring.rabbitmq.listener.defaultRequeueRejected:false", - "spring.rabbitmq.listener.idleEventInterval:5", - "spring.rabbitmq.listener.transactionSize:20"); - } - @Test public void testSimpleRabbitListenerContainerFactoryWithCustomSettings() { - testSimpleRabbitListenerContainerFactoryWithCustomSettings( + load(new Class[] { MessageConvertersConfiguration.class, + MessageRecoverersConfiguration.class }, "spring.rabbitmq.listener.simple.retry.enabled:true", "spring.rabbitmq.listener.simple.retry.maxAttempts:4", "spring.rabbitmq.listener.simple.retry.initialInterval:2000", @@ -328,11 +310,6 @@ public class RabbitAutoConfigurationTests { "spring.rabbitmq.listener.simple.defaultRequeueRejected:false", "spring.rabbitmq.listener.simple.idleEventInterval:5", "spring.rabbitmq.listener.simple.transactionSize:20"); - } - - private void testSimpleRabbitListenerContainerFactoryWithCustomSettings(String... environment) { - load(new Class[] { MessageConvertersConfiguration.class, - MessageRecoverersConfiguration.class }, environment); SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory = this.context .getBean("rabbitListenerContainerFactory", SimpleRabbitListenerContainerFactory.class);