diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java index f959863a1bd..b17c1639886 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2021 the original author or authors. + * Copyright 2012-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -42,6 +42,7 @@ import org.springframework.util.StringUtils; * @author Gary Russell * @author Artsiom Yudovin * @author Franjo Zilic + * @author Eddú Meléndez * @since 1.0.0 */ @ConfigurationProperties(prefix = "spring.rabbitmq") @@ -1194,6 +1195,11 @@ public class RabbitProperties { */ private String password; + /** + * Name of the stream. + */ + private String name; + public String getHost() { return this.host; } @@ -1226,6 +1232,14 @@ public class RabbitProperties { this.password = password; } + public String getName() { + return this.name; + } + + public void setName(String name) { + this.name = name; + } + } } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfiguration.java index 1f402da6a41..acbddb51797 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2021 the original author or authors. + * Copyright 2012-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,6 +23,7 @@ import com.rabbitmq.stream.Environment; import com.rabbitmq.stream.EnvironmentBuilder; import org.springframework.amqp.rabbit.config.ContainerCustomizer; +import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; @@ -33,11 +34,16 @@ import org.springframework.context.annotation.Configuration; import org.springframework.rabbit.stream.config.StreamRabbitListenerContainerFactory; import org.springframework.rabbit.stream.listener.ConsumerCustomizer; import org.springframework.rabbit.stream.listener.StreamListenerContainer; +import org.springframework.rabbit.stream.producer.ProducerCustomizer; +import org.springframework.rabbit.stream.producer.RabbitStreamOperations; +import org.springframework.rabbit.stream.producer.RabbitStreamTemplate; +import org.springframework.rabbit.stream.support.converter.StreamMessageConverter; /** * Configuration for Spring RabbitMQ Stream plugin support. * * @author Gary Russell + * @author Eddú Meléndez */ @Configuration(proxyBeanMethods = false) @ConditionalOnClass(StreamRabbitListenerContainerFactory.class) @@ -63,6 +69,30 @@ class RabbitStreamConfiguration { return configure(Environment.builder(), properties).build(); } + @Bean + @ConditionalOnMissingBean + RabbitStreamTemplateConfigurer rabbitStreamTemplateConfigurer(RabbitProperties properties, + ObjectProvider messageConverter, + ObjectProvider streamMessageConverter, + ObjectProvider producerCustomizer) { + RabbitStreamTemplateConfigurer configurer = new RabbitStreamTemplateConfigurer(); + configurer.setMessageConverter(messageConverter.getIfUnique()); + configurer.setStreamMessageConverter(streamMessageConverter.getIfUnique()); + configurer.setProducerCustomizer(producerCustomizer.getIfUnique()); + return configurer; + } + + @Bean + @ConditionalOnMissingBean(RabbitStreamOperations.class) + @ConditionalOnProperty(prefix = "spring.rabbitmq.stream", name = "name") + RabbitStreamTemplate rabbitStreamTemplate(Environment rabbitStreamEnvironment, RabbitProperties properties, + RabbitStreamTemplateConfigurer configurer) { + RabbitStreamTemplate template = new RabbitStreamTemplate(rabbitStreamEnvironment, + properties.getStream().getName()); + configurer.configure(template); + return template; + } + static EnvironmentBuilder configure(EnvironmentBuilder builder, RabbitProperties properties) { builder.lazyInitialization(true); RabbitProperties.Stream stream = properties.getStream(); diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamTemplateConfigurer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamTemplateConfigurer.java new file mode 100644 index 00000000000..d323784ac1b --- /dev/null +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamTemplateConfigurer.java @@ -0,0 +1,81 @@ +/* + * Copyright 2012-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.autoconfigure.amqp; + +import org.springframework.amqp.support.converter.MessageConverter; +import org.springframework.rabbit.stream.producer.ProducerCustomizer; +import org.springframework.rabbit.stream.producer.RabbitStreamTemplate; +import org.springframework.rabbit.stream.support.converter.StreamMessageConverter; + +/** + * Configure {@link RabbitStreamTemplate} with sensible defaults. + * + * @author Eddú Meléndez + * @since 2.7.0 + */ +public class RabbitStreamTemplateConfigurer { + + private MessageConverter messageConverter; + + private StreamMessageConverter streamMessageConverter; + + private ProducerCustomizer producerCustomizer; + + /** + * Set the {@link MessageConverter} to use or {@code null} if the out-of-the-box + * converter should be used. + * @param messageConverter the {@link MessageConverter} + */ + public void setMessageConverter(MessageConverter messageConverter) { + this.messageConverter = messageConverter; + } + + /** + * Set the {@link StreamMessageConverter} to use or {@code null} if the out-of-the-box + * stream message converter should be used. + * @param streamMessageConverter the {@link StreamMessageConverter} + */ + public void setStreamMessageConverter(StreamMessageConverter streamMessageConverter) { + this.streamMessageConverter = streamMessageConverter; + } + + /** + * Set the {@link ProducerCustomizer} instances to use. + * @param producerCustomizer the producer customizer + */ + public void setProducerCustomizer(ProducerCustomizer producerCustomizer) { + this.producerCustomizer = producerCustomizer; + } + + /** + * Configure the specified {@link RabbitStreamTemplate}. The template can be further + * tuned and default settings can be overridden. + * @param template the {@link RabbitStreamTemplate} instance to configure + */ + public void configure(RabbitStreamTemplate template) { + if (this.messageConverter != null) { + template.setMessageConverter(this.messageConverter); + } + if (this.streamMessageConverter != null) { + template.setStreamConverter(this.streamMessageConverter); + } + if (this.producerCustomizer != null) { + template.setProducerCustomizer(this.producerCustomizer); + } + } + +} diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfigurationTests.java index 9c285dab13a..bd3b7ba736e 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfigurationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2021 the original author or authors. + * Copyright 2012-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,13 +26,18 @@ import org.springframework.amqp.rabbit.config.ContainerCustomizer; import org.springframework.amqp.rabbit.listener.MessageListenerContainer; import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory; import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry; +import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.boot.autoconfigure.AutoConfigurations; import org.springframework.boot.test.context.runner.ApplicationContextRunner; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; import org.springframework.rabbit.stream.config.StreamRabbitListenerContainerFactory; import org.springframework.rabbit.stream.listener.ConsumerCustomizer; import org.springframework.rabbit.stream.listener.StreamListenerContainer; +import org.springframework.rabbit.stream.producer.ProducerCustomizer; +import org.springframework.rabbit.stream.producer.RabbitStreamTemplate; +import org.springframework.rabbit.stream.support.converter.StreamMessageConverter; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; @@ -44,6 +49,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; * * @author Gary Russell * @author Andy Wilkinson + * @author Eddú Meléndez */ class RabbitStreamConfigurationTests { @@ -149,6 +155,61 @@ class RabbitStreamConfigurationTests { verify(builder).password("confidential"); } + @Test + void testDefaultRabbitStreamTemplateConfiguration() { + this.contextRunner + .withPropertyValues("spring.rabbitmq.listener.type:stream", "spring.rabbitmq.stream.name:stream-test") + .run((context) -> { + assertThat(context).hasSingleBean(RabbitStreamTemplate.class); + assertThat(context.getBean(RabbitStreamTemplate.class)).hasFieldOrPropertyWithValue("streamName", + "stream-test"); + }); + } + + @Test + void testDefaultRabbitStreamTemplateConfigurationWithoutStreamName() { + this.contextRunner.withPropertyValues("spring.rabbitmq.listener.type:stream") + .run((context) -> assertThat(context).doesNotHaveBean(RabbitStreamTemplate.class)); + } + + @Test + void testRabbitStreamTemplateConfigurationWithCustomMessageConverter() { + this.contextRunner.withUserConfiguration(MessageConvertersConfiguration.class) + .withPropertyValues("spring.rabbitmq.listener.type:stream", "spring.rabbitmq.stream.name:stream-test") + .run((context) -> { + assertThat(context).hasSingleBean(RabbitStreamTemplate.class); + RabbitStreamTemplate streamTemplate = context.getBean(RabbitStreamTemplate.class); + assertThat(streamTemplate).hasFieldOrPropertyWithValue("streamName", "stream-test"); + assertThat(streamTemplate).extracting("messageConverter") + .isSameAs(context.getBean(MessageConverter.class)); + }); + } + + @Test + void testRabbitStreamTemplateConfigurationWithCustomStreamMessageConverter() { + this.contextRunner + .withBean("myStreamMessageConverter", StreamMessageConverter.class, + () -> mock(StreamMessageConverter.class)) + .withPropertyValues("spring.rabbitmq.listener.type:stream", "spring.rabbitmq.stream.name:stream-test") + .run((context) -> { + assertThat(context).hasSingleBean(RabbitStreamTemplate.class); + assertThat(context.getBean(RabbitStreamTemplate.class)).extracting("messageConverter") + .isSameAs(context.getBean("myStreamMessageConverter")); + }); + } + + @Test + void testRabbitStreamTemplateConfigurationWithCustomProducerCustomizer() { + this.contextRunner + .withBean("myProducerCustomizer", ProducerCustomizer.class, () -> mock(ProducerCustomizer.class)) + .withPropertyValues("spring.rabbitmq.listener.type:stream", "spring.rabbitmq.stream.name:stream-test") + .run((context) -> { + assertThat(context).hasSingleBean(RabbitStreamTemplate.class); + assertThat(context.getBean(RabbitStreamTemplate.class)).extracting("producerCustomizer") + .isSameAs(context.getBean("myProducerCustomizer")); + }); + } + @Configuration(proxyBeanMethods = false) static class TestConfiguration { @@ -196,4 +257,20 @@ class RabbitStreamConfigurationTests { } + @Configuration(proxyBeanMethods = false) + static class MessageConvertersConfiguration { + + @Bean + @Primary + MessageConverter myMessageConverter() { + return mock(MessageConverter.class); + } + + @Bean + MessageConverter anotherMessageConverter() { + return mock(MessageConverter.class); + } + + } + } diff --git a/spring-boot-project/spring-boot-docs/src/docs/asciidoc/messaging/amqp.adoc b/spring-boot-project/spring-boot-docs/src/docs/asciidoc/messaging/amqp.adoc index da0a4d5efd3..8033e54c057 100644 --- a/spring-boot-project/spring-boot-docs/src/docs/asciidoc/messaging/amqp.adoc +++ b/spring-boot-project/spring-boot-docs/src/docs/asciidoc/messaging/amqp.adoc @@ -78,6 +78,24 @@ If you need to create more `RabbitTemplate` instances or if you want to override +[[messaging.amqp.sending-stream]] +=== Sending a Message To A Stream +To send a message to a particular stream, specify the name of the stream, as shown in the following example: + +[source,yaml,indent=0,subs="verbatim",configprops,configblocks] +---- + spring: + rabbitmq: + stream: + name: "my-stream" +---- + +If a `MessageConverter`, `StreamMessageConverter`, or `ProducerCustomizer` bean is defined, it is associated automatically to the auto-configured `RabbitStreamTemplate`. + +If you need to create more `RabbitStreamTemplate` instances or if you want to override the default, Spring Boot provides a `RabbitStreamTemplateConfigurer` bean that you can use to initialize a `RabbitStreamTemplate` with the same settings as the factories used by the auto-configuration. + + + [[messaging.amqp.receiving]] === Receiving a Message When the Rabbit infrastructure is present, any bean can be annotated with `@RabbitListener` to create a listener endpoint.