From 3952046132fb47ebfb31a26331d9f4e5a9edb344 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edd=C3=BA=20Mel=C3=A9ndez?= Date: Fri, 17 Sep 2021 17:41:04 -0500 Subject: [PATCH 1/2] Add support for RabbitStreamTemplate See gh-28060 --- .../autoconfigure/amqp/RabbitProperties.java | 14 +++ .../amqp/RabbitStreamConfiguration.java | 30 +++++++ .../amqp/RabbitStreamTemplateConfigurer.java | 74 ++++++++++++++++ .../amqp/RabbitStreamConfigurationTests.java | 86 +++++++++++++++++++ 4 files changed, 204 insertions(+) create mode 100644 spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamTemplateConfigurer.java diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java index f959863a1bd..b5fd1584bbd 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java @@ -42,6 +42,7 @@ import org.springframework.util.StringUtils; * @author Gary Russell * @author Artsiom Yudovin * @author Franjo Zilic + * @author Eddú Meléndez * @since 1.0.0 */ @ConfigurationProperties(prefix = "spring.rabbitmq") @@ -1194,6 +1195,11 @@ public class RabbitProperties { */ private String password; + /** + * Name of the stream. + */ + private String name; + public String getHost() { return this.host; } @@ -1226,6 +1232,14 @@ public class RabbitProperties { this.password = password; } + public String getName() { + return this.name; + } + + public void setName(String name) { + this.name = name; + } + } } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfiguration.java index 1f402da6a41..15e92de5593 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfiguration.java @@ -23,6 +23,7 @@ import com.rabbitmq.stream.Environment; import com.rabbitmq.stream.EnvironmentBuilder; import org.springframework.amqp.rabbit.config.ContainerCustomizer; +import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; @@ -33,11 +34,16 @@ import org.springframework.context.annotation.Configuration; import org.springframework.rabbit.stream.config.StreamRabbitListenerContainerFactory; import org.springframework.rabbit.stream.listener.ConsumerCustomizer; import org.springframework.rabbit.stream.listener.StreamListenerContainer; +import org.springframework.rabbit.stream.producer.ProducerCustomizer; +import org.springframework.rabbit.stream.producer.RabbitStreamOperations; +import org.springframework.rabbit.stream.producer.RabbitStreamTemplate; +import org.springframework.rabbit.stream.support.converter.StreamMessageConverter; /** * Configuration for Spring RabbitMQ Stream plugin support. * * @author Gary Russell + * @author Eddú Meléndez */ @Configuration(proxyBeanMethods = false) @ConditionalOnClass(StreamRabbitListenerContainerFactory.class) @@ -78,4 +84,28 @@ class RabbitStreamConfiguration { return (value) -> (value != null) ? value : fallback.get(); } + @Bean + @ConditionalOnMissingBean + RabbitStreamTemplateConfigurer rabbitStreamTemplateConfigurer(RabbitProperties properties, + ObjectProvider messageConverter, + ObjectProvider streamMessageConverter, + ObjectProvider producerCustomizer) { + RabbitStreamTemplateConfigurer configurer = new RabbitStreamTemplateConfigurer(); + configurer.setMessageConverter(messageConverter.getIfUnique()); + configurer.setStreamMessageConverter(streamMessageConverter.getIfUnique()); + configurer.setProducerCustomizer(producerCustomizer.getIfUnique()); + return configurer; + } + + @Bean + @ConditionalOnMissingBean(RabbitStreamOperations.class) + @ConditionalOnProperty(prefix = "spring.rabbitmq.stream", name = "name") + RabbitStreamTemplate rabbitStreamTemplate(Environment rabbitStreamEnvironment, RabbitProperties properties, + RabbitStreamTemplateConfigurer configurer) { + RabbitStreamTemplate template = new RabbitStreamTemplate(rabbitStreamEnvironment, + properties.getStream().getName()); + configurer.configure(template); + return template; + } + } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamTemplateConfigurer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamTemplateConfigurer.java new file mode 100644 index 00000000000..7456c8d8de6 --- /dev/null +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamTemplateConfigurer.java @@ -0,0 +1,74 @@ +/* + * Copyright 2012-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.autoconfigure.amqp; + +import org.springframework.amqp.support.converter.MessageConverter; +import org.springframework.boot.context.properties.PropertyMapper; +import org.springframework.rabbit.stream.producer.ProducerCustomizer; +import org.springframework.rabbit.stream.producer.RabbitStreamTemplate; +import org.springframework.rabbit.stream.support.converter.StreamMessageConverter; + +/** + * Configure {@link RabbitStreamTemplate} with sensible defaults. + * + * @author Eddú Meléndez + * @since 2.7.0 + */ +public class RabbitStreamTemplateConfigurer { + + private MessageConverter messageConverter; + + private StreamMessageConverter streamMessageConverter; + + private ProducerCustomizer producerCustomizer; + + /** + * Set the {@link MessageConverter} to use or {@code null} if the out-of-the-box + * converter should be used. + * @param messageConverter the {@link MessageConverter} + */ + public void setMessageConverter(MessageConverter messageConverter) { + this.messageConverter = messageConverter; + } + + public void setStreamMessageConverter(StreamMessageConverter streamMessageConverter) { + this.streamMessageConverter = streamMessageConverter; + } + + public void setProducerCustomizer(ProducerCustomizer producerCustomizer) { + this.producerCustomizer = producerCustomizer; + } + + /** + * Configure the specified {@link RabbitStreamTemplate}. The template can be further + * tuned and default settings can be overridden. + * @param template the {@link RabbitStreamTemplate} instance to configure + */ + public void configure(RabbitStreamTemplate template) { + PropertyMapper map = PropertyMapper.get(); + if (this.messageConverter != null) { + template.setMessageConverter(this.messageConverter); + } + if (this.streamMessageConverter != null) { + template.setStreamConverter(this.streamMessageConverter); + } + if (this.producerCustomizer != null) { + template.setProducerCustomizer(this.producerCustomizer); + } + } + +} diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfigurationTests.java index 9c285dab13a..138bd8defb6 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfigurationTests.java @@ -26,6 +26,7 @@ import org.springframework.amqp.rabbit.config.ContainerCustomizer; import org.springframework.amqp.rabbit.listener.MessageListenerContainer; import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory; import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry; +import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.boot.autoconfigure.AutoConfigurations; import org.springframework.boot.test.context.runner.ApplicationContextRunner; import org.springframework.context.annotation.Bean; @@ -33,6 +34,10 @@ import org.springframework.context.annotation.Configuration; import org.springframework.rabbit.stream.config.StreamRabbitListenerContainerFactory; import org.springframework.rabbit.stream.listener.ConsumerCustomizer; import org.springframework.rabbit.stream.listener.StreamListenerContainer; +import org.springframework.rabbit.stream.producer.ProducerCustomizer; +import org.springframework.rabbit.stream.producer.RabbitStreamTemplate; +import org.springframework.rabbit.stream.support.converter.StreamMessageConverter; +import org.springframework.test.util.ReflectionTestUtils; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; @@ -44,6 +49,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; * * @author Gary Russell * @author Andy Wilkinson + * @author Eddú Meléndez */ class RabbitStreamConfigurationTests { @@ -149,6 +155,66 @@ class RabbitStreamConfigurationTests { verify(builder).password("confidential"); } + @Test + void testDefaultRabbitStreamTemplateConfiguration() { + this.contextRunner + .withPropertyValues("spring.rabbitmq.listener.type:stream", "spring.rabbitmq.stream.name:stream-test") + .run((context) -> { + RabbitStreamTemplate streamTemplate = context.getBean(RabbitStreamTemplate.class); + String streamName = (String) ReflectionTestUtils.getField(streamTemplate, "streamName"); + assertThat(context).hasSingleBean(RabbitStreamTemplate.class); + assertThat(streamName).isEqualTo("stream-test"); + }); + } + + @Test + void testDefaultRabbitStreamTemplateConfigurationWithoutStreamName() { + this.contextRunner.withPropertyValues("spring.rabbitmq.listener.type:stream") + .run((context) -> assertThat(context).doesNotHaveBean(RabbitStreamTemplate.class)); + } + + @Test + void testRabbitStreamTemplateConfigurationWithCustomMessageConverter() { + this.contextRunner.withUserConfiguration(RabbitAutoConfigurationTests.MessageConvertersConfiguration.class) + .withPropertyValues("spring.rabbitmq.listener.type:stream", "spring.rabbitmq.stream.name:stream-test") + .run((context) -> { + RabbitStreamTemplate streamTemplate = context.getBean(RabbitStreamTemplate.class); + MessageConverter messageConverter = (MessageConverter) ReflectionTestUtils.getField(streamTemplate, + "messageConverter"); + String streamName = (String) ReflectionTestUtils.getField(streamTemplate, "streamName"); + assertThat(messageConverter).isSameAs(context.getBean(MessageConverter.class)); + assertThat(streamName).isEqualTo("stream-test"); + }); + } + + @Test + void testRabbitStreamTemplateConfigurationWithCustomStreamMessageConverter() { + this.contextRunner.withUserConfiguration(StreamMessageConverterConfiguration.class) + .withPropertyValues("spring.rabbitmq.listener.type:stream", "spring.rabbitmq.stream.name:stream-test") + .run((context) -> { + RabbitStreamTemplate streamTemplate = context.getBean(RabbitStreamTemplate.class); + StreamMessageConverter messageConverter = (StreamMessageConverter) ReflectionTestUtils + .getField(streamTemplate, "streamConverter"); + String streamName = (String) ReflectionTestUtils.getField(streamTemplate, "streamName"); + assertThat(messageConverter).isSameAs(context.getBean(StreamMessageConverter.class)); + assertThat(streamName).isEqualTo("stream-test"); + }); + } + + @Test + void testRabbitStreamTemplateConfigurationWithCustomProducerCustomizer() { + this.contextRunner.withUserConfiguration(ProducerCustomizerConfiguration.class) + .withPropertyValues("spring.rabbitmq.listener.type:stream", "spring.rabbitmq.stream.name:stream-test") + .run((context) -> { + RabbitStreamTemplate streamTemplate = context.getBean(RabbitStreamTemplate.class); + ProducerCustomizer producerCustomizer = (ProducerCustomizer) ReflectionTestUtils + .getField(streamTemplate, "producerCustomizer"); + String streamName = (String) ReflectionTestUtils.getField(streamTemplate, "streamName"); + assertThat(producerCustomizer).isSameAs(context.getBean(ProducerCustomizer.class)); + assertThat(streamName).isEqualTo("stream-test"); + }); + } + @Configuration(proxyBeanMethods = false) static class TestConfiguration { @@ -196,4 +262,24 @@ class RabbitStreamConfigurationTests { } + @Configuration(proxyBeanMethods = false) + static class StreamMessageConverterConfiguration { + + @Bean + StreamMessageConverter myStreamMessageConverter() { + return mock(StreamMessageConverter.class); + } + + } + + @Configuration(proxyBeanMethods = false) + static class ProducerCustomizerConfiguration { + + @Bean + ProducerCustomizer myProducerCustomizer() { + return mock(ProducerCustomizer.class); + } + + } + } From 6b6da22f2cb578fd7926f522d54377988caf24b2 Mon Sep 17 00:00:00 2001 From: Stephane Nicoll Date: Mon, 3 Jan 2022 13:50:26 +0100 Subject: [PATCH 2/2] Polish "Add support for RabbitStreamTemplate" See gh-28060 --- .../autoconfigure/amqp/RabbitProperties.java | 2 +- .../amqp/RabbitStreamConfiguration.java | 32 +++++----- .../amqp/RabbitStreamTemplateConfigurer.java | 13 +++- .../amqp/RabbitStreamConfigurationTests.java | 61 ++++++++----------- .../src/docs/asciidoc/messaging/amqp.adoc | 18 ++++++ 5 files changed, 71 insertions(+), 55 deletions(-) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java index b5fd1584bbd..b17c1639886 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2021 the original author or authors. + * Copyright 2012-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfiguration.java index 15e92de5593..acbddb51797 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2021 the original author or authors. + * Copyright 2012-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -69,21 +69,6 @@ class RabbitStreamConfiguration { return configure(Environment.builder(), properties).build(); } - static EnvironmentBuilder configure(EnvironmentBuilder builder, RabbitProperties properties) { - builder.lazyInitialization(true); - RabbitProperties.Stream stream = properties.getStream(); - PropertyMapper mapper = PropertyMapper.get(); - mapper.from(stream.getHost()).to(builder::host); - mapper.from(stream.getPort()).to(builder::port); - mapper.from(stream.getUsername()).as(withFallback(properties::getUsername)).whenNonNull().to(builder::username); - mapper.from(stream.getPassword()).as(withFallback(properties::getPassword)).whenNonNull().to(builder::password); - return builder; - } - - private static Function withFallback(Supplier fallback) { - return (value) -> (value != null) ? value : fallback.get(); - } - @Bean @ConditionalOnMissingBean RabbitStreamTemplateConfigurer rabbitStreamTemplateConfigurer(RabbitProperties properties, @@ -108,4 +93,19 @@ class RabbitStreamConfiguration { return template; } + static EnvironmentBuilder configure(EnvironmentBuilder builder, RabbitProperties properties) { + builder.lazyInitialization(true); + RabbitProperties.Stream stream = properties.getStream(); + PropertyMapper mapper = PropertyMapper.get(); + mapper.from(stream.getHost()).to(builder::host); + mapper.from(stream.getPort()).to(builder::port); + mapper.from(stream.getUsername()).as(withFallback(properties::getUsername)).whenNonNull().to(builder::username); + mapper.from(stream.getPassword()).as(withFallback(properties::getPassword)).whenNonNull().to(builder::password); + return builder; + } + + private static Function withFallback(Supplier fallback) { + return (value) -> (value != null) ? value : fallback.get(); + } + } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamTemplateConfigurer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamTemplateConfigurer.java index 7456c8d8de6..d323784ac1b 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamTemplateConfigurer.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamTemplateConfigurer.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2021 the original author or authors. + * Copyright 2012-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,7 +17,6 @@ package org.springframework.boot.autoconfigure.amqp; import org.springframework.amqp.support.converter.MessageConverter; -import org.springframework.boot.context.properties.PropertyMapper; import org.springframework.rabbit.stream.producer.ProducerCustomizer; import org.springframework.rabbit.stream.producer.RabbitStreamTemplate; import org.springframework.rabbit.stream.support.converter.StreamMessageConverter; @@ -45,10 +44,19 @@ public class RabbitStreamTemplateConfigurer { this.messageConverter = messageConverter; } + /** + * Set the {@link StreamMessageConverter} to use or {@code null} if the out-of-the-box + * stream message converter should be used. + * @param streamMessageConverter the {@link StreamMessageConverter} + */ public void setStreamMessageConverter(StreamMessageConverter streamMessageConverter) { this.streamMessageConverter = streamMessageConverter; } + /** + * Set the {@link ProducerCustomizer} instances to use. + * @param producerCustomizer the producer customizer + */ public void setProducerCustomizer(ProducerCustomizer producerCustomizer) { this.producerCustomizer = producerCustomizer; } @@ -59,7 +67,6 @@ public class RabbitStreamTemplateConfigurer { * @param template the {@link RabbitStreamTemplate} instance to configure */ public void configure(RabbitStreamTemplate template) { - PropertyMapper map = PropertyMapper.get(); if (this.messageConverter != null) { template.setMessageConverter(this.messageConverter); } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfigurationTests.java index 138bd8defb6..bd3b7ba736e 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfigurationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2021 the original author or authors. + * Copyright 2012-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,13 +31,13 @@ import org.springframework.boot.autoconfigure.AutoConfigurations; import org.springframework.boot.test.context.runner.ApplicationContextRunner; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; import org.springframework.rabbit.stream.config.StreamRabbitListenerContainerFactory; import org.springframework.rabbit.stream.listener.ConsumerCustomizer; import org.springframework.rabbit.stream.listener.StreamListenerContainer; import org.springframework.rabbit.stream.producer.ProducerCustomizer; import org.springframework.rabbit.stream.producer.RabbitStreamTemplate; import org.springframework.rabbit.stream.support.converter.StreamMessageConverter; -import org.springframework.test.util.ReflectionTestUtils; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; @@ -160,10 +160,9 @@ class RabbitStreamConfigurationTests { this.contextRunner .withPropertyValues("spring.rabbitmq.listener.type:stream", "spring.rabbitmq.stream.name:stream-test") .run((context) -> { - RabbitStreamTemplate streamTemplate = context.getBean(RabbitStreamTemplate.class); - String streamName = (String) ReflectionTestUtils.getField(streamTemplate, "streamName"); assertThat(context).hasSingleBean(RabbitStreamTemplate.class); - assertThat(streamName).isEqualTo("stream-test"); + assertThat(context.getBean(RabbitStreamTemplate.class)).hasFieldOrPropertyWithValue("streamName", + "stream-test"); }); } @@ -175,43 +174,39 @@ class RabbitStreamConfigurationTests { @Test void testRabbitStreamTemplateConfigurationWithCustomMessageConverter() { - this.contextRunner.withUserConfiguration(RabbitAutoConfigurationTests.MessageConvertersConfiguration.class) + this.contextRunner.withUserConfiguration(MessageConvertersConfiguration.class) .withPropertyValues("spring.rabbitmq.listener.type:stream", "spring.rabbitmq.stream.name:stream-test") .run((context) -> { + assertThat(context).hasSingleBean(RabbitStreamTemplate.class); RabbitStreamTemplate streamTemplate = context.getBean(RabbitStreamTemplate.class); - MessageConverter messageConverter = (MessageConverter) ReflectionTestUtils.getField(streamTemplate, - "messageConverter"); - String streamName = (String) ReflectionTestUtils.getField(streamTemplate, "streamName"); - assertThat(messageConverter).isSameAs(context.getBean(MessageConverter.class)); - assertThat(streamName).isEqualTo("stream-test"); + assertThat(streamTemplate).hasFieldOrPropertyWithValue("streamName", "stream-test"); + assertThat(streamTemplate).extracting("messageConverter") + .isSameAs(context.getBean(MessageConverter.class)); }); } @Test void testRabbitStreamTemplateConfigurationWithCustomStreamMessageConverter() { - this.contextRunner.withUserConfiguration(StreamMessageConverterConfiguration.class) + this.contextRunner + .withBean("myStreamMessageConverter", StreamMessageConverter.class, + () -> mock(StreamMessageConverter.class)) .withPropertyValues("spring.rabbitmq.listener.type:stream", "spring.rabbitmq.stream.name:stream-test") .run((context) -> { - RabbitStreamTemplate streamTemplate = context.getBean(RabbitStreamTemplate.class); - StreamMessageConverter messageConverter = (StreamMessageConverter) ReflectionTestUtils - .getField(streamTemplate, "streamConverter"); - String streamName = (String) ReflectionTestUtils.getField(streamTemplate, "streamName"); - assertThat(messageConverter).isSameAs(context.getBean(StreamMessageConverter.class)); - assertThat(streamName).isEqualTo("stream-test"); + assertThat(context).hasSingleBean(RabbitStreamTemplate.class); + assertThat(context.getBean(RabbitStreamTemplate.class)).extracting("messageConverter") + .isSameAs(context.getBean("myStreamMessageConverter")); }); } @Test void testRabbitStreamTemplateConfigurationWithCustomProducerCustomizer() { - this.contextRunner.withUserConfiguration(ProducerCustomizerConfiguration.class) + this.contextRunner + .withBean("myProducerCustomizer", ProducerCustomizer.class, () -> mock(ProducerCustomizer.class)) .withPropertyValues("spring.rabbitmq.listener.type:stream", "spring.rabbitmq.stream.name:stream-test") .run((context) -> { - RabbitStreamTemplate streamTemplate = context.getBean(RabbitStreamTemplate.class); - ProducerCustomizer producerCustomizer = (ProducerCustomizer) ReflectionTestUtils - .getField(streamTemplate, "producerCustomizer"); - String streamName = (String) ReflectionTestUtils.getField(streamTemplate, "streamName"); - assertThat(producerCustomizer).isSameAs(context.getBean(ProducerCustomizer.class)); - assertThat(streamName).isEqualTo("stream-test"); + assertThat(context).hasSingleBean(RabbitStreamTemplate.class); + assertThat(context.getBean(RabbitStreamTemplate.class)).extracting("producerCustomizer") + .isSameAs(context.getBean("myProducerCustomizer")); }); } @@ -263,21 +258,17 @@ class RabbitStreamConfigurationTests { } @Configuration(proxyBeanMethods = false) - static class StreamMessageConverterConfiguration { + static class MessageConvertersConfiguration { @Bean - StreamMessageConverter myStreamMessageConverter() { - return mock(StreamMessageConverter.class); + @Primary + MessageConverter myMessageConverter() { + return mock(MessageConverter.class); } - } - - @Configuration(proxyBeanMethods = false) - static class ProducerCustomizerConfiguration { - @Bean - ProducerCustomizer myProducerCustomizer() { - return mock(ProducerCustomizer.class); + MessageConverter anotherMessageConverter() { + return mock(MessageConverter.class); } } diff --git a/spring-boot-project/spring-boot-docs/src/docs/asciidoc/messaging/amqp.adoc b/spring-boot-project/spring-boot-docs/src/docs/asciidoc/messaging/amqp.adoc index da0a4d5efd3..8033e54c057 100644 --- a/spring-boot-project/spring-boot-docs/src/docs/asciidoc/messaging/amqp.adoc +++ b/spring-boot-project/spring-boot-docs/src/docs/asciidoc/messaging/amqp.adoc @@ -78,6 +78,24 @@ If you need to create more `RabbitTemplate` instances or if you want to override +[[messaging.amqp.sending-stream]] +=== Sending a Message To A Stream +To send a message to a particular stream, specify the name of the stream, as shown in the following example: + +[source,yaml,indent=0,subs="verbatim",configprops,configblocks] +---- + spring: + rabbitmq: + stream: + name: "my-stream" +---- + +If a `MessageConverter`, `StreamMessageConverter`, or `ProducerCustomizer` bean is defined, it is associated automatically to the auto-configured `RabbitStreamTemplate`. + +If you need to create more `RabbitStreamTemplate` instances or if you want to override the default, Spring Boot provides a `RabbitStreamTemplateConfigurer` bean that you can use to initialize a `RabbitStreamTemplate` with the same settings as the factories used by the auto-configuration. + + + [[messaging.amqp.receiving]] === Receiving a Message When the Rabbit infrastructure is present, any bean can be annotated with `@RabbitListener` to create a listener endpoint.