diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAnnotationDrivenConfiguration.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAnnotationDrivenConfiguration.java index 2bff6719ddd..20f609ef431 100644 --- a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAnnotationDrivenConfiguration.java +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAnnotationDrivenConfiguration.java @@ -20,6 +20,7 @@ import org.springframework.amqp.rabbit.annotation.EnableRabbit; import org.springframework.amqp.rabbit.config.RabbitListenerConfigUtils; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.boot.autoconfigure.amqp.RabbitProperties.Listener; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.context.annotation.Bean; @@ -41,6 +42,22 @@ class RabbitAnnotationDrivenConfiguration { ConnectionFactory connectionFactory, RabbitProperties config) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); + Listener listenerConfig = config.getListener(); + if (listenerConfig.getAckMode() != null) { + factory.setAcknowledgeMode(listenerConfig.getAckMode()); + } + if (listenerConfig.getConcurrency() != null) { + factory.setConcurrentConsumers(listenerConfig.getConcurrency()); + } + if (listenerConfig.getMaxConcurrency() != null) { + factory.setMaxConcurrentConsumers(listenerConfig.getMaxConcurrency()); + } + if (listenerConfig.getPrefetch() != null) { + factory.setPrefetchCount(listenerConfig.getPrefetch()); + } + if (listenerConfig.getTxSize() != null) { + factory.setTxSize(listenerConfig.getTxSize()); + } return factory; } diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java index 8b29f1c0319..67579dc5d3d 100644 --- a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java @@ -20,6 +20,7 @@ import java.util.LinkedHashSet; import java.util.Properties; import java.util.Set; +import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.util.StringUtils; @@ -74,6 +75,11 @@ public class RabbitProperties { */ private Integer requestedHeartbeat; + /** + * Listener container configuration. + */ + private final Listener listener = new Listener(); + public String getHost() { if (this.addresses == null) { return this.host; @@ -180,6 +186,10 @@ public class RabbitProperties { this.requestedHeartbeat = requestedHeartbeat; } + public Listener getListener() { + return listener; + } + public static class Ssl { /** @@ -271,4 +281,73 @@ public class RabbitProperties { } } + + public static class Listener { + + /** + * Acknowledge mode of container. + */ + private AcknowledgeMode ackMode; + + /** + * Minimum number of consumers. + */ + private Integer concurrency; + + /** + * Maximum number of consumers. + */ + private Integer maxConcurrency; + + /** + * Message prefetch count. + */ + private Integer prefetch; + + /** + * Number of messages in a transaction. + */ + private Integer txSize; + + public AcknowledgeMode getAckMode() { + return ackMode; + } + + public void setAckMode(AcknowledgeMode ackMode) { + this.ackMode = ackMode; + } + + public Integer getConcurrency() { + return concurrency; + } + + public void setConcurrency(Integer concurrency) { + this.concurrency = concurrency; + } + + public Integer getMaxConcurrency() { + return maxConcurrency; + } + + public void setMaxConcurrency(Integer maxConcurrency) { + this.maxConcurrency = maxConcurrency; + } + + public Integer getPrefetch() { + return prefetch; + } + + public void setPrefetch(Integer prefetch) { + this.prefetch = prefetch; + } + + public Integer getTxSize() { + return txSize; + } + + public void setTxSize(Integer txSize) { + this.txSize = txSize; + } + } + } diff --git a/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java b/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java index 532107b7152..81c41bfc50d 100644 --- a/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java +++ b/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java @@ -23,6 +23,8 @@ import org.junit.After; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; + +import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.rabbit.annotation.EnableRabbit; import org.springframework.amqp.rabbit.config.RabbitListenerConfigUtils; @@ -228,6 +230,28 @@ public class RabbitAutoConfigurationTests { "spring.rabbitmq.ssl.trustStorePassword=secret"); } + @Test + public void testRabbitListenerContainerFactoryOverrides() { + load(TestConfiguration.class, "spring.rabbitmq.listener.prefetch:20", + "spring.rabbitmq.listener.ackMode:MANUAL", + "spring.rabbitmq.listener.concurrency:10", + "spring.rabbitmq.listener.maxConcurrency:10", + "spring.rabbitmq.listener.txSize:20"); + SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory = this.context + .getBean("rabbitListenerContainerFactory", + SimpleRabbitListenerContainerFactory.class); + assertEquals(new Integer(20), (Integer) + new DirectFieldAccessor(rabbitListenerContainerFactory).getPropertyValue("prefetchCount")); + assertEquals(AcknowledgeMode.MANUAL, (AcknowledgeMode) + new DirectFieldAccessor(rabbitListenerContainerFactory).getPropertyValue("acknowledgeMode")); + assertEquals(new Integer(10), (Integer) + new DirectFieldAccessor(rabbitListenerContainerFactory).getPropertyValue("concurrentConsumers")); + assertEquals(new Integer(10), (Integer) + new DirectFieldAccessor(rabbitListenerContainerFactory).getPropertyValue("maxConcurrentConsumers")); + assertEquals(new Integer(20), (Integer) + new DirectFieldAccessor(rabbitListenerContainerFactory).getPropertyValue("txSize")); + } + private com.rabbitmq.client.ConnectionFactory getTargetConnectionFactory() { CachingConnectionFactory connectionFactory = this.context .getBean(CachingConnectionFactory.class);