From cec40bb8d400febb2e840651a17afcd1da3909bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edd=C3=BA=20Mel=C3=A9ndez?= Date: Fri, 6 Mar 2026 10:55:16 -0600 Subject: [PATCH] Polish spring amqp properties MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Eddú Meléndez --- .../pages/features/dev-services.adoc | 2 +- ...nectionDetailsFactoryIntegrationTests.java | 13 +- ...nectionDetailsFactoryIntegrationTests.java | 5 +- ...etails.java => AmqpConnectionDetails.java} | 39 +- .../amqp/autoconfigure/AmqpProperties.java | 383 +++++ .../PropertiesAmqpConnectionDetails.java | 58 + .../PropertiesRabbitConnectionDetails.java | 88 -- .../RabbitAmqpAutoConfiguration.java | 21 +- .../amqp/autoconfigure/RabbitProperties.java | 1364 ----------------- ...DockerComposeConnectionDetailsFactory.java | 33 +- ...bbitContainerConnectionDetailsFactory.java | 85 - ...bbitContainerConnectionDetailsFactory.java | 31 +- 12 files changed, 496 insertions(+), 1626 deletions(-) rename module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/autoconfigure/{RabbitConnectionDetails.java => AmqpConnectionDetails.java} (63%) create mode 100644 module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/autoconfigure/AmqpProperties.java create mode 100644 module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/autoconfigure/PropertiesAmqpConnectionDetails.java delete mode 100644 module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/autoconfigure/PropertiesRabbitConnectionDetails.java delete mode 100644 module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/autoconfigure/RabbitProperties.java delete mode 100644 module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/testcontainers/DeprecatedRabbitContainerConnectionDetailsFactory.java diff --git a/documentation/spring-boot-docs/src/docs/antora/modules/reference/pages/features/dev-services.adoc b/documentation/spring-boot-docs/src/docs/antora/modules/reference/pages/features/dev-services.adoc index a46be64c3d9..9f6ffa82039 100644 --- a/documentation/spring-boot-docs/src/docs/antora/modules/reference/pages/features/dev-services.adoc +++ b/documentation/spring-boot-docs/src/docs/antora/modules/reference/pages/features/dev-services.adoc @@ -125,7 +125,7 @@ The following service connections are currently supported: | javadoc:org.springframework.boot.r2dbc.autoconfigure.R2dbcConnectionDetails[] | Containers named "clickhouse/clickhouse-server", "gvenzl/oracle-free", "gvenzl/oracle-xe", "mariadb", "mssql/server", "mysql", or "postgres" -| javadoc:org.springframework.boot.amqp.autoconfigure.RabbitConnectionDetails[] +| javadoc:org.springframework.boot.amqp.autoconfigure.AmqpConnectionDetails[] | Containers named "rabbitmq" | javadoc:org.springframework.boot.rabbitmq.autoconfigure.RabbitConnectionDetails[] diff --git a/module/spring-boot-amqp/src/dockerTest/java/org/springframework/boot/amqp/docker/compose/RabbitDockerComposeConnectionDetailsFactoryIntegrationTests.java b/module/spring-boot-amqp/src/dockerTest/java/org/springframework/boot/amqp/docker/compose/RabbitDockerComposeConnectionDetailsFactoryIntegrationTests.java index fbf86b3889d..82d77777381 100644 --- a/module/spring-boot-amqp/src/dockerTest/java/org/springframework/boot/amqp/docker/compose/RabbitDockerComposeConnectionDetailsFactoryIntegrationTests.java +++ b/module/spring-boot-amqp/src/dockerTest/java/org/springframework/boot/amqp/docker/compose/RabbitDockerComposeConnectionDetailsFactoryIntegrationTests.java @@ -16,8 +16,8 @@ package org.springframework.boot.amqp.docker.compose; -import org.springframework.boot.amqp.autoconfigure.RabbitConnectionDetails; -import org.springframework.boot.amqp.autoconfigure.RabbitConnectionDetails.Address; +import org.springframework.boot.amqp.autoconfigure.AmqpConnectionDetails; +import org.springframework.boot.amqp.autoconfigure.AmqpConnectionDetails.Address; import org.springframework.boot.docker.compose.service.connection.test.DockerComposeTest; import org.springframework.boot.testsupport.container.TestImage; @@ -30,20 +30,21 @@ import static org.assertj.core.api.Assertions.assertThat; * @author Andy Wilkinson * @author Phillip Webb * @author Scott Frederick + * @author Eddú Meléndez */ class RabbitDockerComposeConnectionDetailsFactoryIntegrationTests { @DockerComposeTest(composeFile = "rabbit-compose.yaml", image = TestImage.RABBITMQ) - void runCreatesConnectionDetails(RabbitConnectionDetails connectionDetails) { + void runCreatesConnectionDetails(AmqpConnectionDetails connectionDetails) { assertConnectionDetails(connectionDetails); } - private void assertConnectionDetails(RabbitConnectionDetails connectionDetails) { + private void assertConnectionDetails(AmqpConnectionDetails connectionDetails) { assertThat(connectionDetails.getUsername()).isEqualTo("myuser"); assertThat(connectionDetails.getPassword()).isEqualTo("secret"); assertThat(connectionDetails.getVirtualHost()).isEqualTo("/"); - assertThat(connectionDetails.getAddresses()).hasSize(1); - Address address = connectionDetails.getFirstAddress(); + assertThat(connectionDetails.getAddress()).isNotNull(); + Address address = connectionDetails.getAddress(); assertThat(address.host()).isNotNull(); assertThat(address.port()).isGreaterThan(0); } diff --git a/module/spring-boot-amqp/src/dockerTest/java/org/springframework/boot/amqp/testcontainers/RabbitContainerConnectionDetailsFactoryIntegrationTests.java b/module/spring-boot-amqp/src/dockerTest/java/org/springframework/boot/amqp/testcontainers/RabbitContainerConnectionDetailsFactoryIntegrationTests.java index ab4cf8da98c..d5449482eb8 100644 --- a/module/spring-boot-amqp/src/dockerTest/java/org/springframework/boot/amqp/testcontainers/RabbitContainerConnectionDetailsFactoryIntegrationTests.java +++ b/module/spring-boot-amqp/src/dockerTest/java/org/springframework/boot/amqp/testcontainers/RabbitContainerConnectionDetailsFactoryIntegrationTests.java @@ -30,8 +30,8 @@ import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbitmq.client.RabbitAmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.amqp.autoconfigure.AmqpConnectionDetails; import org.springframework.boot.amqp.autoconfigure.RabbitAmqpAutoConfiguration; -import org.springframework.boot.amqp.autoconfigure.RabbitConnectionDetails; import org.springframework.boot.autoconfigure.ImportAutoConfiguration; import org.springframework.boot.testcontainers.service.connection.ServiceConnection; import org.springframework.boot.testsupport.container.TestImage; @@ -47,6 +47,7 @@ import static org.assertj.core.api.Assertions.assertThat; * @author Moritz Halbritter * @author Andy Wilkinson * @author Phillip Webb + * @author Eddú Meléndez */ @SpringJUnitConfig @Testcontainers(disabledWithoutDocker = true) @@ -57,7 +58,7 @@ class RabbitContainerConnectionDetailsFactoryIntegrationTests { static final RabbitMQContainer rabbit = TestImage.container(RabbitMQContainer.class); @Autowired(required = false) - private RabbitConnectionDetails connectionDetails; + private AmqpConnectionDetails connectionDetails; @Autowired private RabbitAmqpTemplate rabbitAmqpTemplate; diff --git a/module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/autoconfigure/RabbitConnectionDetails.java b/module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/autoconfigure/AmqpConnectionDetails.java similarity index 63% rename from module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/autoconfigure/RabbitConnectionDetails.java rename to module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/autoconfigure/AmqpConnectionDetails.java index d6cb1be0512..8c940573f93 100644 --- a/module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/autoconfigure/RabbitConnectionDetails.java +++ b/module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/autoconfigure/AmqpConnectionDetails.java @@ -16,23 +16,17 @@ package org.springframework.boot.amqp.autoconfigure; -import java.util.List; - import org.jspecify.annotations.Nullable; import org.springframework.boot.autoconfigure.service.connection.ConnectionDetails; -import org.springframework.boot.ssl.SslBundle; -import org.springframework.util.Assert; /** - * Details required to establish a connection to a RabbitMQ service. + * Details required to establish a connection to a RabbitMQ AMQP service. * - * @author Moritz Halbritter - * @author Andy Wilkinson - * @author Phillip Webb - * @since 4.0.0 + * @author Eddú Meléndez + * @since 4.1.0 */ -public interface RabbitConnectionDetails extends ConnectionDetails { +public interface AmqpConnectionDetails extends ConnectionDetails { /** * Login user to authenticate to the broker. @@ -59,30 +53,11 @@ public interface RabbitConnectionDetails extends ConnectionDetails { } /** - * List of addresses to which the client should connect. Must return at least one - * address. - * @return the list of addresses to which the client should connect - */ - List
getAddresses(); - - /** - * Returns the first address. - * @return the first address + * Returns the address. + * @return the address * @throws IllegalStateException if the address list is empty */ - default Address getFirstAddress() { - List
addresses = getAddresses(); - Assert.state(!addresses.isEmpty(), "Address list is empty"); - return addresses.get(0); - } - - /** - * SSL bundle to use. - * @return the SSL bundle to use - */ - default @Nullable SslBundle getSslBundle() { - return null; - } + Address getAddress(); /** * A RabbitMQ address. diff --git a/module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/autoconfigure/AmqpProperties.java b/module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/autoconfigure/AmqpProperties.java new file mode 100644 index 00000000000..9cd57b1db47 --- /dev/null +++ b/module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/autoconfigure/AmqpProperties.java @@ -0,0 +1,383 @@ +/* + * Copyright 2012-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.amqp.autoconfigure; + +import org.jspecify.annotations.Nullable; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.boot.context.properties.source.InvalidConfigurationPropertyValueException; +import org.springframework.util.StringUtils; + +/** + * Configuration properties for Rabbit AMQP. + * + * @author Eddú Meléndez + * @since 4.1.0 + */ +@ConfigurationProperties("spring.amqp") +public class AmqpProperties { + + private static final int DEFAULT_PORT = 5672; + + /** + * RabbitMQ host. Ignored if an address is set. + */ + private String host = "localhost"; + + /** + * RabbitMQ port. Ignored if an address is set. Default to 5672, or 5671 if SSL is + * enabled. + */ + private @Nullable Integer port; + + /** + * Login user to authenticate to the broker. + */ + private String username = "guest"; + + /** + * Login to authenticate against the broker. + */ + private String password = "guest"; + + /** + * Virtual host to use when connecting to the broker. + */ + private @Nullable String virtualHost; + + /** + * The address to which the client should connect. When set, the host and port are + * ignored. + */ + private @Nullable String address; + + /** + * Listener container configuration. + */ + private final Listener listener = new Listener(); + + private final Template template = new Template(); + + private @Nullable Address parsedAddress; + + public String getHost() { + return this.host; + } + + public void setHost(String host) { + this.host = host; + } + + public @Nullable Integer getPort() { + return this.port; + } + + /** + * Returns the port from the address, or the configured port if no address have been + * set. + * @return the port + * @see #setAddress(String) + * @see #getPort() + */ + public int determinePort() { + if (this.parsedAddress == null) { + Integer port = getPort(); + if (port != null) { + return port; + } + return DEFAULT_PORT; + } + return this.parsedAddress.port; + } + + public void setPort(@Nullable Integer port) { + this.port = port; + } + + public @Nullable String getAddress() { + return this.address; + } + + /** + * Returns the configured address ({@code host:port}) created from the configured host + * and port. + * @return the address + */ + public String determineAddress() { + if (this.parsedAddress == null) { + if (this.host.contains(",")) { + throw new InvalidConfigurationPropertyValueException("spring.amqp.host", this.host, + "Invalid character ','. Value must be a single host. For multiple hosts, use property 'spring.amqp.address' instead."); + } + return this.host + ":" + determinePort(); + } + return this.parsedAddress.host + ":" + this.parsedAddress.port; + } + + public void setAddress(String address) { + this.address = address; + this.parsedAddress = parseAddress(address); + } + + private Address parseAddress(String address) { + return new Address(address); + } + + public String getUsername() { + return this.username; + } + + /** + * If address has been set and has a username it is returned. Otherwise returns the + * result of calling {@code getUsername()}. + * @return the username + * @see #setAddress(String) + * @see #getUsername() + */ + public String determineUsername() { + if (this.parsedAddress == null) { + return this.username; + } + Address address = this.parsedAddress; + return (address.username != null) ? address.username : this.username; + } + + public void setUsername(String username) { + this.username = username; + } + + public String getPassword() { + return this.password; + } + + /** + * If address has been set and has a password it is returned. Otherwise returns the + * result of calling {@code getPassword()}. + * @return the password or {@code null} + * @see #setAddress(String) + * @see #getPassword() + */ + public @Nullable String determinePassword() { + if (this.parsedAddress == null) { + return getPassword(); + } + Address address = this.parsedAddress; + return (address.password != null) ? address.password : getPassword(); + } + + public void setPassword(String password) { + this.password = password; + } + + public @Nullable String getVirtualHost() { + return this.virtualHost; + } + + /** + * If address has been set and has a virtual host it is returned. Otherwise returns + * the result of calling {@code getVirtualHost()}. + * @return the virtual host or {@code null} + * @see #setAddress(String) + * @see #getVirtualHost() + */ + public @Nullable String determineVirtualHost() { + if (this.parsedAddress == null) { + return getVirtualHost(); + } + Address address = this.parsedAddress; + return (address.virtualHost != null) ? address.virtualHost : getVirtualHost(); + } + + public void setVirtualHost(@Nullable String virtualHost) { + this.virtualHost = StringUtils.hasText(virtualHost) ? virtualHost : "/"; + } + + public Listener getListener() { + return this.listener; + } + + public Template getTemplate() { + return this.template; + } + + public static class Listener { + + private final AmqpContainer amqp = new AmqpContainer(); + + public AmqpContainer getAmqp() { + return this.amqp; + } + + } + + /** + * Configuration properties for {@code RabbitAmqpListenerContainer}. + */ + public static class AmqpContainer { + + /** + * Whether to enable observation. + */ + private boolean observationEnabled; + + /** + * Batch size, expressed as the number of physical messages, to be used by the + * container. + */ + private @Nullable Integer batchSize; + + public boolean isObservationEnabled() { + return this.observationEnabled; + } + + public void setObservationEnabled(boolean observationEnabled) { + this.observationEnabled = observationEnabled; + } + + public @Nullable Integer getBatchSize() { + return this.batchSize; + } + + public void setBatchSize(@Nullable Integer batchSize) { + this.batchSize = batchSize; + } + + } + + public static class Template { + + /** + * Name of the default exchange to use for send operations. + */ + private String exchange = ""; + + /** + * Value of a default routing key to use for send operations. + */ + private String routingKey = ""; + + /** + * Name of the default queue to receive messages from when none is specified + * explicitly. + */ + private @Nullable String defaultReceiveQueue; + + public String getExchange() { + return this.exchange; + } + + public void setExchange(String exchange) { + this.exchange = exchange; + } + + public String getRoutingKey() { + return this.routingKey; + } + + public void setRoutingKey(String routingKey) { + this.routingKey = routingKey; + } + + public @Nullable String getDefaultReceiveQueue() { + return this.defaultReceiveQueue; + } + + public void setDefaultReceiveQueue(@Nullable String defaultReceiveQueue) { + this.defaultReceiveQueue = defaultReceiveQueue; + } + + } + + private static final class Address { + + private static final String PREFIX_AMQP = "amqp://"; + + private static final String PREFIX_AMQP_SECURE = "amqps://"; + + private String host; + + private int port; + + private @Nullable String username; + + private @Nullable String password; + + private @Nullable String virtualHost; + + private Address(String input) { + input = input.trim(); + input = trimPrefix(input); + input = parseUsernameAndPassword(input); + input = parseVirtualHost(input); + parseHostAndPort(input); + } + + private String trimPrefix(String input) { + if (input.startsWith(PREFIX_AMQP_SECURE)) { + return input.substring(PREFIX_AMQP_SECURE.length()); + } + if (input.startsWith(PREFIX_AMQP)) { + return input.substring(PREFIX_AMQP.length()); + } + return input; + } + + private String parseUsernameAndPassword(String input) { + String[] splitInput = StringUtils.split(input, "@"); + if (splitInput == null) { + return input; + } + String credentials = splitInput[0]; + String[] splitCredentials = StringUtils.split(credentials, ":"); + if (splitCredentials == null) { + this.username = credentials; + } + else { + this.username = splitCredentials[0]; + this.password = splitCredentials[1]; + } + return splitInput[1]; + } + + private String parseVirtualHost(String input) { + int hostIndex = input.indexOf('/'); + if (hostIndex >= 0) { + this.virtualHost = input.substring(hostIndex + 1); + if (this.virtualHost.isEmpty()) { + this.virtualHost = "/"; + } + input = input.substring(0, hostIndex); + } + return input; + } + + private void parseHostAndPort(String input) { + int bracketIndex = input.lastIndexOf(']'); + int colonIndex = input.lastIndexOf(':'); + if (colonIndex == -1 || colonIndex < bracketIndex) { + this.host = input; + this.port = DEFAULT_PORT; + } + else { + this.host = input.substring(0, colonIndex); + this.port = Integer.parseInt(input.substring(colonIndex + 1)); + } + } + + } + +} diff --git a/module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/autoconfigure/PropertiesAmqpConnectionDetails.java b/module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/autoconfigure/PropertiesAmqpConnectionDetails.java new file mode 100644 index 00000000000..fbc932c8b19 --- /dev/null +++ b/module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/autoconfigure/PropertiesAmqpConnectionDetails.java @@ -0,0 +1,58 @@ +/* + * Copyright 2012-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.amqp.autoconfigure; + +import org.jspecify.annotations.Nullable; + +/** + * Adapts {@link AmqpProperties} to {@link AmqpConnectionDetails}. + * + * @author Eddú Meléndez + */ +class PropertiesAmqpConnectionDetails implements AmqpConnectionDetails { + + private final AmqpProperties properties; + + PropertiesAmqpConnectionDetails(AmqpProperties properties) { + this.properties = properties; + } + + @Override + public String getUsername() { + return this.properties.determineUsername(); + } + + @Override + public @Nullable String getPassword() { + return this.properties.determinePassword(); + } + + @Override + public @Nullable String getVirtualHost() { + return this.properties.determineVirtualHost(); + } + + @Override + public Address getAddress() { + String address = this.properties.determineAddress(); + int portSeparatorIndex = address.lastIndexOf(':'); + String host = address.substring(0, portSeparatorIndex); + String port = address.substring(portSeparatorIndex + 1); + return new Address(host, Integer.parseInt(port)); + } + +} diff --git a/module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/autoconfigure/PropertiesRabbitConnectionDetails.java b/module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/autoconfigure/PropertiesRabbitConnectionDetails.java deleted file mode 100644 index e15052bf330..00000000000 --- a/module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/autoconfigure/PropertiesRabbitConnectionDetails.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Copyright 2012-present the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.boot.amqp.autoconfigure; - -import java.util.ArrayList; -import java.util.List; - -import org.jspecify.annotations.Nullable; - -import org.springframework.boot.amqp.autoconfigure.RabbitProperties.Ssl; -import org.springframework.boot.ssl.SslBundle; -import org.springframework.boot.ssl.SslBundles; -import org.springframework.util.Assert; -import org.springframework.util.StringUtils; - -/** - * Adapts {@link RabbitProperties} to {@link RabbitConnectionDetails}. - * - * @author Moritz Halbritter - * @author Andy Wilkinson - * @author Phillip Webb - */ -class PropertiesRabbitConnectionDetails implements RabbitConnectionDetails { - - private final RabbitProperties properties; - - private final @Nullable SslBundles sslBundles; - - PropertiesRabbitConnectionDetails(RabbitProperties properties, @Nullable SslBundles sslBundles) { - this.properties = properties; - this.sslBundles = sslBundles; - } - - @Override - public String getUsername() { - return this.properties.determineUsername(); - } - - @Override - public @Nullable String getPassword() { - return this.properties.determinePassword(); - } - - @Override - public @Nullable String getVirtualHost() { - return this.properties.determineVirtualHost(); - } - - @Override - public List
getAddresses() { - List
addresses = new ArrayList<>(); - for (String address : this.properties.determineAddresses()) { - int portSeparatorIndex = address.lastIndexOf(':'); - String host = address.substring(0, portSeparatorIndex); - String port = address.substring(portSeparatorIndex + 1); - addresses.add(new Address(host, Integer.parseInt(port))); - } - return addresses; - } - - @Override - public @Nullable SslBundle getSslBundle() { - Ssl ssl = this.properties.getSsl(); - if (!ssl.determineEnabled()) { - return null; - } - if (StringUtils.hasLength(ssl.getBundle())) { - Assert.notNull(this.sslBundles, "SSL bundle name has been set but no SSL bundles found in context"); - return this.sslBundles.getBundle(ssl.getBundle()); - } - return null; - } - -} diff --git a/module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/autoconfigure/RabbitAmqpAutoConfiguration.java b/module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/autoconfigure/RabbitAmqpAutoConfiguration.java index 39f4979c2be..87d8f01fb40 100644 --- a/module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/autoconfigure/RabbitAmqpAutoConfiguration.java +++ b/module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/autoconfigure/RabbitAmqpAutoConfiguration.java @@ -31,14 +31,13 @@ import org.springframework.amqp.rabbitmq.client.config.RabbitAmqpListenerContain import org.springframework.amqp.rabbitmq.client.listener.RabbitAmqpListenerContainer; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.ObjectProvider; -import org.springframework.boot.amqp.autoconfigure.RabbitConnectionDetails.Address; +import org.springframework.boot.amqp.autoconfigure.AmqpConnectionDetails.Address; import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.context.properties.PropertyMapper; -import org.springframework.boot.ssl.SslBundles; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Import; @@ -50,20 +49,20 @@ import org.springframework.context.annotation.Import; */ @AutoConfiguration @ConditionalOnClass({ RabbitAmqpTemplate.class, Connection.class }) -@EnableConfigurationProperties(RabbitProperties.class) +@EnableConfigurationProperties(AmqpProperties.class) @Import(RabbitAnnotationDrivenConfiguration.class) public final class RabbitAmqpAutoConfiguration { - private final RabbitProperties properties; + private final AmqpProperties properties; - RabbitAmqpAutoConfiguration(RabbitProperties properties) { + RabbitAmqpAutoConfiguration(AmqpProperties properties) { this.properties = properties; } @Bean @ConditionalOnMissingBean - RabbitConnectionDetails rabbitConnectionDetails(ObjectProvider sslBundles) { - return new PropertiesRabbitConnectionDetails(this.properties, sslBundles.getIfAvailable()); + AmqpConnectionDetails rabbitConnectionDetails() { + return new PropertiesAmqpConnectionDetails(this.properties); } @Bean(name = "rabbitListenerContainerFactory") @@ -73,19 +72,19 @@ public final class RabbitAmqpAutoConfiguration { RabbitAmqpListenerContainerFactory factory = new RabbitAmqpListenerContainerFactory(connectionFactory); amqpContainerCustomizer.ifUnique(factory::setContainerCustomizer); - RabbitProperties.AmqpContainer configuration = this.properties.getListener().getSimple(); + AmqpProperties.AmqpContainer configuration = this.properties.getListener().getAmqp(); factory.setObservationEnabled(configuration.isObservationEnabled()); return factory; } @Bean @ConditionalOnMissingBean - Environment rabbitAmqpEnvironment(RabbitConnectionDetails connectionDetails, + Environment rabbitAmqpEnvironment(AmqpConnectionDetails connectionDetails, ObjectProvider customizers, ObjectProvider credentialsProvider) { PropertyMapper map = PropertyMapper.get(); EnvironmentConnectionSettings environmentConnectionSettings = new AmqpEnvironmentBuilder().connectionSettings(); - Address address = connectionDetails.getFirstAddress(); + Address address = connectionDetails.getAddress(); map.from(address::host).to(environmentConnectionSettings::host); map.from(address::port).to(environmentConnectionSettings::port); map.from(connectionDetails::getUsername).to(environmentConnectionSettings::username); @@ -113,7 +112,7 @@ public final class RabbitAmqpAutoConfiguration { if (messageConverter.getIfAvailable() != null) { rabbitAmqpTemplate.setMessageConverter(messageConverter.getIfAvailable()); } - RabbitProperties.Template templateProperties = this.properties.getTemplate(); + AmqpProperties.Template templateProperties = this.properties.getTemplate(); PropertyMapper map = PropertyMapper.get(); map.from(templateProperties::getDefaultReceiveQueue).to(rabbitAmqpTemplate::setReceiveQueue); diff --git a/module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/autoconfigure/RabbitProperties.java b/module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/autoconfigure/RabbitProperties.java deleted file mode 100644 index 419b6da1c6a..00000000000 --- a/module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/autoconfigure/RabbitProperties.java +++ /dev/null @@ -1,1364 +0,0 @@ -/* - * Copyright 2012-present the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.boot.amqp.autoconfigure; - -import java.time.Duration; -import java.time.temporal.ChronoUnit; -import java.util.ArrayList; -import java.util.List; - -import org.jspecify.annotations.Nullable; - -import org.springframework.amqp.core.AcknowledgeMode; -import org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.AddressShuffleMode; -import org.springframework.amqp.rabbit.connection.CachingConnectionFactory.CacheMode; -import org.springframework.amqp.rabbit.connection.CachingConnectionFactory.ConfirmType; -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.boot.context.properties.PropertyMapper; -import org.springframework.boot.context.properties.source.InvalidConfigurationPropertyValueException; -import org.springframework.boot.convert.DurationUnit; -import org.springframework.boot.retry.RetryPolicySettings; -import org.springframework.util.CollectionUtils; -import org.springframework.util.StringUtils; -import org.springframework.util.unit.DataSize; - -/** - * Configuration properties for Rabbit. - * - * @author Greg Turnquist - * @author Dave Syer - * @author Stephane Nicoll - * @author Andy Wilkinson - * @author Josh Thornhill - * @author Gary Russell - * @author Artsiom Yudovin - * @author Franjo Zilic - * @author Eddú Meléndez - * @author Rafael Carvalho - * @author Scott Frederick - * @author Lasse Wulff - * @author Yanming Zhou - * @since 4.0.0 - */ -@ConfigurationProperties("spring.rabbitmq") -public class RabbitProperties { - - private static final int DEFAULT_PORT = 5672; - - private static final int DEFAULT_PORT_SECURE = 5671; - - private static final int DEFAULT_STREAM_PORT = 5552; - - /** - * RabbitMQ host. Ignored if an address is set. - */ - private String host = "localhost"; - - /** - * RabbitMQ port. Ignored if an address is set. Default to 5672, or 5671 if SSL is - * enabled. - */ - private @Nullable Integer port; - - /** - * Login user to authenticate to the broker. - */ - private String username = "guest"; - - /** - * Login to authenticate against the broker. - */ - private String password = "guest"; - - /** - * SSL configuration. - */ - private final Ssl ssl = new Ssl(); - - /** - * Virtual host to use when connecting to the broker. - */ - private @Nullable String virtualHost; - - /** - * List of addresses to which the client should connect. When set, the host and port - * are ignored. - */ - private @Nullable List addresses; - - /** - * Mode used to shuffle configured addresses. - */ - private AddressShuffleMode addressShuffleMode = AddressShuffleMode.NONE; - - /** - * Requested heartbeat timeout; zero for none. If a duration suffix is not specified, - * seconds will be used. - */ - @DurationUnit(ChronoUnit.SECONDS) - private @Nullable Duration requestedHeartbeat; - - /** - * Number of channels per connection requested by the client. Use 0 for unlimited. - */ - private int requestedChannelMax = 2047; - - /** - * Whether to enable publisher returns. - */ - private boolean publisherReturns; - - /** - * Type of publisher confirms to use. - */ - private @Nullable ConfirmType publisherConfirmType; - - /** - * Connection timeout. Set it to zero to wait forever. - */ - private @Nullable Duration connectionTimeout; - - /** - * Continuation timeout for RPC calls in channels. Set it to zero to wait forever. - */ - private Duration channelRpcTimeout = Duration.ofMinutes(10); - - /** - * Maximum size of the body of inbound (received) messages. - */ - private DataSize maxInboundMessageBodySize = DataSize.ofMegabytes(64); - - /** - * Cache configuration. - */ - private final Cache cache = new Cache(); - - /** - * Listener container configuration. - */ - private final Listener listener = new Listener(); - - private final Template template = new Template(); - - private final Stream stream = new Stream(); - - private @Nullable List
parsedAddresses; - - public String getHost() { - return this.host; - } - - /** - * Returns the host from the first address, or the configured host if no addresses - * have been set. - * @return the host - * @see #setAddresses(List) - * @see #getHost() - */ - public String determineHost() { - if (CollectionUtils.isEmpty(this.parsedAddresses)) { - return getHost(); - } - return this.parsedAddresses.get(0).host; - } - - public void setHost(String host) { - this.host = host; - } - - public @Nullable Integer getPort() { - return this.port; - } - - /** - * Returns the port from the first address, or the configured port if no addresses - * have been set. - * @return the port - * @see #setAddresses(List) - * @see #getPort() - */ - public int determinePort() { - if (CollectionUtils.isEmpty(this.parsedAddresses)) { - Integer port = getPort(); - if (port != null) { - return port; - } - return Boolean.TRUE.equals(getSsl().getEnabled()) ? DEFAULT_PORT_SECURE : DEFAULT_PORT; - } - return this.parsedAddresses.get(0).port; - } - - public void setPort(@Nullable Integer port) { - this.port = port; - } - - public @Nullable List getAddresses() { - return this.addresses; - } - - /** - * Returns the configured addresses or a single address ({@code host:port}) created - * from the configured host and port if no addresses have been set. - * @return the addresses - */ - public List determineAddresses() { - if (CollectionUtils.isEmpty(this.parsedAddresses)) { - if (this.host.contains(",")) { - throw new InvalidConfigurationPropertyValueException("spring.rabbitmq.host", this.host, - "Invalid character ','. Value must be a single host. For multiple hosts, use property 'spring.rabbitmq.addresses' instead."); - } - return List.of(this.host + ":" + determinePort()); - } - List addressStrings = new ArrayList<>(); - for (Address parsedAddress : this.parsedAddresses) { - addressStrings.add(parsedAddress.host + ":" + parsedAddress.port); - } - return addressStrings; - } - - public void setAddresses(List addresses) { - this.addresses = addresses; - this.parsedAddresses = parseAddresses(addresses); - } - - private List
parseAddresses(List addresses) { - List
parsedAddresses = new ArrayList<>(); - for (String address : addresses) { - parsedAddresses.add(new Address(address, Boolean.TRUE.equals(getSsl().getEnabled()))); - } - return parsedAddresses; - } - - public String getUsername() { - return this.username; - } - - /** - * If addresses have been set and the first address has a username it is returned. - * Otherwise returns the result of calling {@code getUsername()}. - * @return the username - * @see #setAddresses(List) - * @see #getUsername() - */ - public String determineUsername() { - if (CollectionUtils.isEmpty(this.parsedAddresses)) { - return this.username; - } - Address address = this.parsedAddresses.get(0); - return (address.username != null) ? address.username : this.username; - } - - public void setUsername(String username) { - this.username = username; - } - - public String getPassword() { - return this.password; - } - - /** - * If addresses have been set and the first address has a password it is returned. - * Otherwise returns the result of calling {@code getPassword()}. - * @return the password or {@code null} - * @see #setAddresses(List) - * @see #getPassword() - */ - public @Nullable String determinePassword() { - if (CollectionUtils.isEmpty(this.parsedAddresses)) { - return getPassword(); - } - Address address = this.parsedAddresses.get(0); - return (address.password != null) ? address.password : getPassword(); - } - - public void setPassword(String password) { - this.password = password; - } - - public Ssl getSsl() { - return this.ssl; - } - - public @Nullable String getVirtualHost() { - return this.virtualHost; - } - - /** - * If addresses have been set and the first address has a virtual host it is returned. - * Otherwise returns the result of calling {@code getVirtualHost()}. - * @return the virtual host or {@code null} - * @see #setAddresses(List) - * @see #getVirtualHost() - */ - public @Nullable String determineVirtualHost() { - if (CollectionUtils.isEmpty(this.parsedAddresses)) { - return getVirtualHost(); - } - Address address = this.parsedAddresses.get(0); - return (address.virtualHost != null) ? address.virtualHost : getVirtualHost(); - } - - public void setVirtualHost(@Nullable String virtualHost) { - this.virtualHost = StringUtils.hasText(virtualHost) ? virtualHost : "/"; - } - - public AddressShuffleMode getAddressShuffleMode() { - return this.addressShuffleMode; - } - - public void setAddressShuffleMode(AddressShuffleMode addressShuffleMode) { - this.addressShuffleMode = addressShuffleMode; - } - - public @Nullable Duration getRequestedHeartbeat() { - return this.requestedHeartbeat; - } - - public void setRequestedHeartbeat(@Nullable Duration requestedHeartbeat) { - this.requestedHeartbeat = requestedHeartbeat; - } - - public int getRequestedChannelMax() { - return this.requestedChannelMax; - } - - public void setRequestedChannelMax(int requestedChannelMax) { - this.requestedChannelMax = requestedChannelMax; - } - - public boolean isPublisherReturns() { - return this.publisherReturns; - } - - public void setPublisherReturns(boolean publisherReturns) { - this.publisherReturns = publisherReturns; - } - - public @Nullable Duration getConnectionTimeout() { - return this.connectionTimeout; - } - - public void setPublisherConfirmType(@Nullable ConfirmType publisherConfirmType) { - this.publisherConfirmType = publisherConfirmType; - } - - public @Nullable ConfirmType getPublisherConfirmType() { - return this.publisherConfirmType; - } - - public void setConnectionTimeout(@Nullable Duration connectionTimeout) { - this.connectionTimeout = connectionTimeout; - } - - public Duration getChannelRpcTimeout() { - return this.channelRpcTimeout; - } - - public void setChannelRpcTimeout(Duration channelRpcTimeout) { - this.channelRpcTimeout = channelRpcTimeout; - } - - public DataSize getMaxInboundMessageBodySize() { - return this.maxInboundMessageBodySize; - } - - public void setMaxInboundMessageBodySize(DataSize maxInboundMessageBodySize) { - this.maxInboundMessageBodySize = maxInboundMessageBodySize; - } - - public Cache getCache() { - return this.cache; - } - - public Listener getListener() { - return this.listener; - } - - public Template getTemplate() { - return this.template; - } - - public Stream getStream() { - return this.stream; - } - - public class Ssl { - - private static final String SUN_X509 = "SunX509"; - - /** - * Whether to enable SSL support. Determined automatically if an address is - * provided with the protocol (amqp:// vs. amqps://). - */ - private @Nullable Boolean enabled; - - /** - * SSL bundle name. - */ - private @Nullable String bundle; - - /** - * Path to the key store that holds the SSL certificate. - */ - private @Nullable String keyStore; - - /** - * Key store type. - */ - private String keyStoreType = "PKCS12"; - - /** - * Password used to access the key store. - */ - private @Nullable String keyStorePassword; - - /** - * Key store algorithm. - */ - private String keyStoreAlgorithm = SUN_X509; - - /** - * Trust store that holds SSL certificates. - */ - private @Nullable String trustStore; - - /** - * Trust store type. - */ - private String trustStoreType = "JKS"; - - /** - * Password used to access the trust store. - */ - private @Nullable String trustStorePassword; - - /** - * Trust store algorithm. - */ - private String trustStoreAlgorithm = SUN_X509; - - /** - * SSL algorithm to use. By default, configured by the Rabbit client library. - */ - private @Nullable String algorithm; - - /** - * Whether to enable server side certificate validation. - */ - private boolean validateServerCertificate = true; - - /** - * Whether to enable hostname verification. - */ - private boolean verifyHostname = true; - - public @Nullable Boolean getEnabled() { - return this.enabled; - } - - /** - * Returns whether SSL is enabled from the first address, or the configured ssl - * enabled flag if no addresses have been set. - * @return whether ssl is enabled - * @see #setAddresses(List) - * @see #getEnabled() () - */ - public boolean determineEnabled() { - boolean defaultEnabled = Boolean.TRUE.equals(getEnabled()) || this.bundle != null; - if (CollectionUtils.isEmpty(RabbitProperties.this.parsedAddresses)) { - return defaultEnabled; - } - Address address = RabbitProperties.this.parsedAddresses.get(0); - return address.determineSslEnabled(defaultEnabled); - } - - public void setEnabled(@Nullable Boolean enabled) { - this.enabled = enabled; - } - - public @Nullable String getBundle() { - return this.bundle; - } - - public void setBundle(@Nullable String bundle) { - this.bundle = bundle; - } - - public @Nullable String getKeyStore() { - return this.keyStore; - } - - public void setKeyStore(@Nullable String keyStore) { - this.keyStore = keyStore; - } - - public String getKeyStoreType() { - return this.keyStoreType; - } - - public void setKeyStoreType(String keyStoreType) { - this.keyStoreType = keyStoreType; - } - - public @Nullable String getKeyStorePassword() { - return this.keyStorePassword; - } - - public void setKeyStorePassword(@Nullable String keyStorePassword) { - this.keyStorePassword = keyStorePassword; - } - - public String getKeyStoreAlgorithm() { - return this.keyStoreAlgorithm; - } - - public void setKeyStoreAlgorithm(String keyStoreAlgorithm) { - this.keyStoreAlgorithm = keyStoreAlgorithm; - } - - public @Nullable String getTrustStore() { - return this.trustStore; - } - - public void setTrustStore(@Nullable String trustStore) { - this.trustStore = trustStore; - } - - public String getTrustStoreType() { - return this.trustStoreType; - } - - public void setTrustStoreType(String trustStoreType) { - this.trustStoreType = trustStoreType; - } - - public @Nullable String getTrustStorePassword() { - return this.trustStorePassword; - } - - public void setTrustStorePassword(@Nullable String trustStorePassword) { - this.trustStorePassword = trustStorePassword; - } - - public String getTrustStoreAlgorithm() { - return this.trustStoreAlgorithm; - } - - public void setTrustStoreAlgorithm(String trustStoreAlgorithm) { - this.trustStoreAlgorithm = trustStoreAlgorithm; - } - - public @Nullable String getAlgorithm() { - return this.algorithm; - } - - public void setAlgorithm(@Nullable String sslAlgorithm) { - this.algorithm = sslAlgorithm; - } - - public boolean isValidateServerCertificate() { - return this.validateServerCertificate; - } - - public void setValidateServerCertificate(boolean validateServerCertificate) { - this.validateServerCertificate = validateServerCertificate; - } - - public boolean isVerifyHostname() { - return this.verifyHostname; - } - - public void setVerifyHostname(boolean verifyHostname) { - this.verifyHostname = verifyHostname; - } - - } - - public static class Cache { - - private final Channel channel = new Channel(); - - private final Connection connection = new Connection(); - - public Channel getChannel() { - return this.channel; - } - - public Connection getConnection() { - return this.connection; - } - - public static class Channel { - - /** - * Number of channels to retain in the cache. When "check-timeout" > 0, max - * channels per connection. - */ - private @Nullable Integer size; - - /** - * Duration to wait to obtain a channel if the cache size has been reached. If - * 0, always create a new channel. - */ - private @Nullable Duration checkoutTimeout; - - public @Nullable Integer getSize() { - return this.size; - } - - public void setSize(@Nullable Integer size) { - this.size = size; - } - - public @Nullable Duration getCheckoutTimeout() { - return this.checkoutTimeout; - } - - public void setCheckoutTimeout(@Nullable Duration checkoutTimeout) { - this.checkoutTimeout = checkoutTimeout; - } - - } - - public static class Connection { - - /** - * Connection factory cache mode. - */ - private CacheMode mode = CacheMode.CHANNEL; - - /** - * Number of connections to cache. Only applies when mode is CONNECTION. - */ - private @Nullable Integer size; - - public CacheMode getMode() { - return this.mode; - } - - public void setMode(CacheMode mode) { - this.mode = mode; - } - - public @Nullable Integer getSize() { - return this.size; - } - - public void setSize(@Nullable Integer size) { - this.size = size; - } - - } - - } - - public enum ContainerType { - - /** - * Container where the RabbitMQ consumer dispatches messages to an invoker thread. - */ - SIMPLE, - - /** - * Container where the listener is invoked directly on the RabbitMQ consumer - * thread. - */ - DIRECT, - - /** - * Container that uses the RabbitMQ Stream Client. - */ - STREAM - - } - - public static class Listener { - - /** - * Listener container type. - */ - private ContainerType type = ContainerType.SIMPLE; - - private final SimpleContainer simple = new SimpleContainer(); - - private final DirectContainer direct = new DirectContainer(); - - private final StreamContainer stream = new StreamContainer(); - - public ContainerType getType() { - return this.type; - } - - public void setType(ContainerType containerType) { - this.type = containerType; - } - - public SimpleContainer getSimple() { - return this.simple; - } - - public DirectContainer getDirect() { - return this.direct; - } - - public StreamContainer getStream() { - return this.stream; - } - - } - - public abstract static class BaseContainer { - - /** - * Whether to enable observation. - */ - private boolean observationEnabled; - - public boolean isObservationEnabled() { - return this.observationEnabled; - } - - public void setObservationEnabled(boolean observationEnabled) { - this.observationEnabled = observationEnabled; - } - - } - - public abstract static class AmqpContainer extends BaseContainer { - - /** - * Whether to start the container automatically on startup. - */ - private boolean autoStartup = true; - - /** - * Acknowledge mode of container. - */ - private @Nullable AcknowledgeMode acknowledgeMode; - - /** - * Maximum number of unacknowledged messages that can be outstanding at each - * consumer. - */ - private @Nullable Integer prefetch; - - /** - * Whether rejected deliveries are re-queued by default. - */ - private @Nullable Boolean defaultRequeueRejected; - - /** - * How often idle container events should be published. - */ - private @Nullable Duration idleEventInterval; - - /** - * Whether the container should present batched messages as discrete messages or - * call the listener with the batch. - */ - private boolean deBatchingEnabled = true; - - /** - * Whether the container (when stopped) should stop immediately after processing - * the current message or stop after processing all pre-fetched messages. - */ - private boolean forceStop; - - /** - * Optional properties for a retry interceptor. - */ - private final ListenerRetry retry = new ListenerRetry(); - - public boolean isAutoStartup() { - return this.autoStartup; - } - - public void setAutoStartup(boolean autoStartup) { - this.autoStartup = autoStartup; - } - - public @Nullable AcknowledgeMode getAcknowledgeMode() { - return this.acknowledgeMode; - } - - public void setAcknowledgeMode(@Nullable AcknowledgeMode acknowledgeMode) { - this.acknowledgeMode = acknowledgeMode; - } - - public @Nullable Integer getPrefetch() { - return this.prefetch; - } - - public void setPrefetch(@Nullable Integer prefetch) { - this.prefetch = prefetch; - } - - public @Nullable Boolean getDefaultRequeueRejected() { - return this.defaultRequeueRejected; - } - - public void setDefaultRequeueRejected(@Nullable Boolean defaultRequeueRejected) { - this.defaultRequeueRejected = defaultRequeueRejected; - } - - public @Nullable Duration getIdleEventInterval() { - return this.idleEventInterval; - } - - public void setIdleEventInterval(@Nullable Duration idleEventInterval) { - this.idleEventInterval = idleEventInterval; - } - - public abstract boolean isMissingQueuesFatal(); - - public boolean isDeBatchingEnabled() { - return this.deBatchingEnabled; - } - - public void setDeBatchingEnabled(boolean deBatchingEnabled) { - this.deBatchingEnabled = deBatchingEnabled; - } - - public boolean isForceStop() { - return this.forceStop; - } - - public void setForceStop(boolean forceStop) { - this.forceStop = forceStop; - } - - public ListenerRetry getRetry() { - return this.retry; - } - - } - - /** - * Configuration properties for {@code SimpleMessageListenerContainer}. - */ - public static class SimpleContainer extends AmqpContainer { - - /** - * Minimum number of listener invoker threads. - */ - private @Nullable Integer concurrency; - - /** - * Maximum number of listener invoker threads. - */ - private @Nullable Integer maxConcurrency; - - /** - * Batch size, expressed as the number of physical messages, to be used by the - * container. - */ - private @Nullable Integer batchSize; - - /** - * Whether to fail if the queues declared by the container are not available on - * the broker and/or whether to stop the container if one or more queues are - * deleted at runtime. - */ - private boolean missingQueuesFatal = true; - - /** - * Whether the container creates a batch of messages based on the - * 'receive-timeout' and 'batch-size'. Coerces 'de-batching-enabled' to true to - * include the contents of a producer created batch in the batch as discrete - * records. - */ - private boolean consumerBatchEnabled; - - public @Nullable Integer getConcurrency() { - return this.concurrency; - } - - public void setConcurrency(@Nullable Integer concurrency) { - this.concurrency = concurrency; - } - - public @Nullable Integer getMaxConcurrency() { - return this.maxConcurrency; - } - - public void setMaxConcurrency(@Nullable Integer maxConcurrency) { - this.maxConcurrency = maxConcurrency; - } - - public @Nullable Integer getBatchSize() { - return this.batchSize; - } - - public void setBatchSize(@Nullable Integer batchSize) { - this.batchSize = batchSize; - } - - @Override - public boolean isMissingQueuesFatal() { - return this.missingQueuesFatal; - } - - public void setMissingQueuesFatal(boolean missingQueuesFatal) { - this.missingQueuesFatal = missingQueuesFatal; - } - - public boolean isConsumerBatchEnabled() { - return this.consumerBatchEnabled; - } - - public void setConsumerBatchEnabled(boolean consumerBatchEnabled) { - this.consumerBatchEnabled = consumerBatchEnabled; - } - - } - - /** - * Configuration properties for {@code DirectMessageListenerContainer}. - */ - public static class DirectContainer extends AmqpContainer { - - /** - * Number of consumers per queue. - */ - private @Nullable Integer consumersPerQueue; - - /** - * Whether to fail if the queues declared by the container are not available on - * the broker. - */ - private boolean missingQueuesFatal; - - public @Nullable Integer getConsumersPerQueue() { - return this.consumersPerQueue; - } - - public void setConsumersPerQueue(@Nullable Integer consumersPerQueue) { - this.consumersPerQueue = consumersPerQueue; - } - - @Override - public boolean isMissingQueuesFatal() { - return this.missingQueuesFatal; - } - - public void setMissingQueuesFatal(boolean missingQueuesFatal) { - this.missingQueuesFatal = missingQueuesFatal; - } - - } - - public static class StreamContainer extends BaseContainer { - - /** - * Whether the container will support listeners that consume native stream - * messages instead of Spring AMQP messages. - */ - private boolean nativeListener; - - public boolean isNativeListener() { - return this.nativeListener; - } - - public void setNativeListener(boolean nativeListener) { - this.nativeListener = nativeListener; - } - - } - - public static class Template { - - private final Retry retry = new Retry(); - - /** - * Whether to enable mandatory messages. - */ - private @Nullable Boolean mandatory; - - /** - * Timeout for receive() operations. - */ - private @Nullable Duration receiveTimeout; - - /** - * Timeout for sendAndReceive() operations. - */ - private @Nullable Duration replyTimeout; - - /** - * Name of the default exchange to use for send operations. - */ - private String exchange = ""; - - /** - * Value of a default routing key to use for send operations. - */ - private String routingKey = ""; - - /** - * Name of the default queue to receive messages from when none is specified - * explicitly. - */ - private @Nullable String defaultReceiveQueue; - - /** - * Whether to enable observation. - */ - private boolean observationEnabled; - - /** - * Simple patterns for allowable packages/classes for deserialization. - */ - private @Nullable List allowedListPatterns; - - public Retry getRetry() { - return this.retry; - } - - public @Nullable Boolean getMandatory() { - return this.mandatory; - } - - public void setMandatory(@Nullable Boolean mandatory) { - this.mandatory = mandatory; - } - - public @Nullable Duration getReceiveTimeout() { - return this.receiveTimeout; - } - - public void setReceiveTimeout(@Nullable Duration receiveTimeout) { - this.receiveTimeout = receiveTimeout; - } - - public @Nullable Duration getReplyTimeout() { - return this.replyTimeout; - } - - public void setReplyTimeout(@Nullable Duration replyTimeout) { - this.replyTimeout = replyTimeout; - } - - public String getExchange() { - return this.exchange; - } - - public void setExchange(String exchange) { - this.exchange = exchange; - } - - public String getRoutingKey() { - return this.routingKey; - } - - public void setRoutingKey(String routingKey) { - this.routingKey = routingKey; - } - - public @Nullable String getDefaultReceiveQueue() { - return this.defaultReceiveQueue; - } - - public void setDefaultReceiveQueue(@Nullable String defaultReceiveQueue) { - this.defaultReceiveQueue = defaultReceiveQueue; - } - - public boolean isObservationEnabled() { - return this.observationEnabled; - } - - public void setObservationEnabled(boolean observationEnabled) { - this.observationEnabled = observationEnabled; - } - - public @Nullable List getAllowedListPatterns() { - return this.allowedListPatterns; - } - - public void setAllowedListPatterns(@Nullable List allowedListPatterns) { - this.allowedListPatterns = allowedListPatterns; - } - - } - - public static class Retry { - - /** - * Whether publishing retries are enabled. - */ - private boolean enabled; - - /** - * Maximum number of retry attempts to deliver a message. - */ - private long maxRetries = 3; - - /** - * Duration between the first and second attempt to deliver a message. - */ - private Duration initialInterval = Duration.ofMillis(1000); - - /** - * Multiplier to apply to the previous retry interval. - */ - private double multiplier = 1.0; - - /** - * Maximum duration between attempts. - */ - private Duration maxInterval = Duration.ofMillis(10000); - - public boolean isEnabled() { - return this.enabled; - } - - public void setEnabled(boolean enabled) { - this.enabled = enabled; - } - - public long getMaxRetries() { - return this.maxRetries; - } - - public void setMaxRetries(long maxRetries) { - this.maxRetries = maxRetries; - } - - public Duration getInitialInterval() { - return this.initialInterval; - } - - public void setInitialInterval(Duration initialInterval) { - this.initialInterval = initialInterval; - } - - public double getMultiplier() { - return this.multiplier; - } - - public void setMultiplier(double multiplier) { - this.multiplier = multiplier; - } - - public Duration getMaxInterval() { - return this.maxInterval; - } - - public void setMaxInterval(Duration maxInterval) { - this.maxInterval = maxInterval; - } - - RetryPolicySettings initializeRetryPolicySettings() { - PropertyMapper map = PropertyMapper.get(); - RetryPolicySettings settings = new RetryPolicySettings(); - map.from(this::getMaxRetries).to(settings::setMaxRetries); - map.from(this::getInitialInterval).to(settings::setDelay); - map.from(this::getMultiplier).to(settings::setMultiplier); - map.from(this::getMaxInterval).to(settings::setMaxDelay); - return settings; - } - - } - - public static class ListenerRetry extends Retry { - - /** - * Whether retries are stateless or stateful. - */ - private boolean stateless = true; - - public boolean isStateless() { - return this.stateless; - } - - public void setStateless(boolean stateless) { - this.stateless = stateless; - } - - } - - private static final class Address { - - private static final String PREFIX_AMQP = "amqp://"; - - private static final String PREFIX_AMQP_SECURE = "amqps://"; - - private String host; - - private int port; - - private @Nullable String username; - - private @Nullable String password; - - private @Nullable String virtualHost; - - private @Nullable Boolean secureConnection; - - private Address(String input, boolean sslEnabled) { - input = input.trim(); - input = trimPrefix(input); - input = parseUsernameAndPassword(input); - input = parseVirtualHost(input); - parseHostAndPort(input, sslEnabled); - } - - private String trimPrefix(String input) { - if (input.startsWith(PREFIX_AMQP_SECURE)) { - this.secureConnection = true; - return input.substring(PREFIX_AMQP_SECURE.length()); - } - if (input.startsWith(PREFIX_AMQP)) { - this.secureConnection = false; - return input.substring(PREFIX_AMQP.length()); - } - return input; - } - - private String parseUsernameAndPassword(String input) { - String[] splitInput = StringUtils.split(input, "@"); - if (splitInput == null) { - return input; - } - String credentials = splitInput[0]; - String[] splitCredentials = StringUtils.split(credentials, ":"); - if (splitCredentials == null) { - this.username = credentials; - } - else { - this.username = splitCredentials[0]; - this.password = splitCredentials[1]; - } - return splitInput[1]; - } - - private String parseVirtualHost(String input) { - int hostIndex = input.indexOf('/'); - if (hostIndex >= 0) { - this.virtualHost = input.substring(hostIndex + 1); - if (this.virtualHost.isEmpty()) { - this.virtualHost = "/"; - } - input = input.substring(0, hostIndex); - } - return input; - } - - private void parseHostAndPort(String input, boolean sslEnabled) { - int bracketIndex = input.lastIndexOf(']'); - int colonIndex = input.lastIndexOf(':'); - if (colonIndex == -1 || colonIndex < bracketIndex) { - this.host = input; - this.port = (determineSslEnabled(sslEnabled)) ? DEFAULT_PORT_SECURE : DEFAULT_PORT; - } - else { - this.host = input.substring(0, colonIndex); - this.port = Integer.parseInt(input.substring(colonIndex + 1)); - } - } - - private boolean determineSslEnabled(boolean sslEnabled) { - return (this.secureConnection != null) ? this.secureConnection : sslEnabled; - } - - } - - public static final class Stream { - - /** - * Host of a RabbitMQ instance with the Stream plugin enabled. - */ - private String host = "localhost"; - - /** - * Stream port of a RabbitMQ instance with the Stream plugin enabled. - */ - private int port = DEFAULT_STREAM_PORT; - - /** - * Virtual host of a RabbitMQ instance with the Stream plugin enabled. When not - * set, spring.rabbitmq.virtual-host is used. - */ - private @Nullable String virtualHost; - - /** - * Login user to authenticate to the broker. When not set, - * spring.rabbitmq.username is used. - */ - private @Nullable String username; - - /** - * Login password to authenticate to the broker. When not set - * spring.rabbitmq.password is used. - */ - private @Nullable String password; - - /** - * Name of the stream. - */ - private @Nullable String name; - - public String getHost() { - return this.host; - } - - public void setHost(String host) { - this.host = host; - } - - public int getPort() { - return this.port; - } - - public void setPort(int port) { - this.port = port; - } - - public @Nullable String getVirtualHost() { - return this.virtualHost; - } - - public void setVirtualHost(@Nullable String virtualHost) { - this.virtualHost = virtualHost; - } - - public @Nullable String getUsername() { - return this.username; - } - - public void setUsername(@Nullable String username) { - this.username = username; - } - - public @Nullable String getPassword() { - return this.password; - } - - public void setPassword(@Nullable String password) { - this.password = password; - } - - public @Nullable String getName() { - return this.name; - } - - public void setName(@Nullable String name) { - this.name = name; - } - - } - -} diff --git a/module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/docker/compose/RabbitDockerComposeConnectionDetailsFactory.java b/module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/docker/compose/RabbitDockerComposeConnectionDetailsFactory.java index 8d3180126f7..de3c9a6e613 100644 --- a/module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/docker/compose/RabbitDockerComposeConnectionDetailsFactory.java +++ b/module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/docker/compose/RabbitDockerComposeConnectionDetailsFactory.java @@ -16,23 +16,20 @@ package org.springframework.boot.amqp.docker.compose; -import java.util.List; - import org.jspecify.annotations.Nullable; -import org.springframework.boot.amqp.autoconfigure.RabbitConnectionDetails; +import org.springframework.boot.amqp.autoconfigure.AmqpConnectionDetails; import org.springframework.boot.docker.compose.core.RunningService; import org.springframework.boot.docker.compose.service.connection.DockerComposeConnectionDetailsFactory; import org.springframework.boot.docker.compose.service.connection.DockerComposeConnectionSource; /** - * {@link DockerComposeConnectionDetailsFactory} to create {@link RabbitConnectionDetails} + * {@link DockerComposeConnectionDetailsFactory} to create {@link AmqpConnectionDetails} * for a {@code rabbitmq} service. * - * @author Andy Wilkinson + * @author Eddú Meléndez */ -class RabbitDockerComposeConnectionDetailsFactory - extends DockerComposeConnectionDetailsFactory { +class RabbitDockerComposeConnectionDetailsFactory extends DockerComposeConnectionDetailsFactory { private static final int RABBITMQ_PORT = 5672; @@ -41,10 +38,9 @@ class RabbitDockerComposeConnectionDetailsFactory } @Override - protected @Nullable RabbitConnectionDetails getDockerComposeConnectionDetails( - DockerComposeConnectionSource source) { + protected @Nullable AmqpConnectionDetails getDockerComposeConnectionDetails(DockerComposeConnectionSource source) { try { - return new RabbitDockerComposeConnectionDetails(source.getRunningService()); + return new AmqpDockerComposeConnectionDetails(source.getRunningService()); } catch (IllegalStateException ex) { return null; @@ -52,20 +48,19 @@ class RabbitDockerComposeConnectionDetailsFactory } /** - * {@link RabbitConnectionDetails} backed by a {@code rabbitmq} - * {@link RunningService}. + * {@link AmqpConnectionDetails} backed by a {@code rabbitmq} {@link RunningService}. */ - static class RabbitDockerComposeConnectionDetails extends DockerComposeConnectionDetails - implements RabbitConnectionDetails { + static class AmqpDockerComposeConnectionDetails extends DockerComposeConnectionDetails + implements AmqpConnectionDetails { private final RabbitEnvironment environment; - private final List
addresses; + private final Address address; - protected RabbitDockerComposeConnectionDetails(RunningService service) { + protected AmqpDockerComposeConnectionDetails(RunningService service) { super(service); this.environment = new RabbitEnvironment(service.env()); - this.addresses = List.of(new Address(service.host(), service.ports().get(RABBITMQ_PORT))); + this.address = new Address(service.host(), service.ports().get(RABBITMQ_PORT)); } @Override @@ -84,8 +79,8 @@ class RabbitDockerComposeConnectionDetailsFactory } @Override - public List
getAddresses() { - return this.addresses; + public Address getAddress() { + return this.address; } } diff --git a/module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/testcontainers/DeprecatedRabbitContainerConnectionDetailsFactory.java b/module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/testcontainers/DeprecatedRabbitContainerConnectionDetailsFactory.java deleted file mode 100644 index b641c37e439..00000000000 --- a/module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/testcontainers/DeprecatedRabbitContainerConnectionDetailsFactory.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Copyright 2012-present the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.boot.amqp.testcontainers; - -import java.net.URI; -import java.util.List; - -import org.jspecify.annotations.Nullable; -import org.testcontainers.containers.RabbitMQContainer; - -import org.springframework.boot.amqp.autoconfigure.RabbitConnectionDetails; -import org.springframework.boot.ssl.SslBundle; -import org.springframework.boot.testcontainers.service.connection.ContainerConnectionDetailsFactory; -import org.springframework.boot.testcontainers.service.connection.ContainerConnectionSource; -import org.springframework.boot.testcontainers.service.connection.ServiceConnection; - -/** - * {@link ContainerConnectionDetailsFactory} to create {@link RabbitConnectionDetails} - * from a {@link ServiceConnection @ServiceConnection}-annotated - * {@link RabbitMQContainer}. - * - * @author Moritz Halbritter - * @author Andy Wilkinson - * @author Phillip Webb - * @deprecated since 4.0.0 for removal in 4.2.0 in favor of - * {@link RabbitContainerConnectionDetailsFactory}. - */ -@Deprecated(since = "4.0.0", forRemoval = true) -class DeprecatedRabbitContainerConnectionDetailsFactory - extends ContainerConnectionDetailsFactory { - - @Override - protected RabbitConnectionDetails getContainerConnectionDetails( - ContainerConnectionSource source) { - return new RabbitMqContainerConnectionDetails(source); - } - - /** - * {@link RabbitConnectionDetails} backed by a {@link ContainerConnectionSource}. - */ - private static final class RabbitMqContainerConnectionDetails extends ContainerConnectionDetails - implements RabbitConnectionDetails { - - private RabbitMqContainerConnectionDetails(ContainerConnectionSource source) { - super(source); - } - - @Override - public String getUsername() { - return getContainer().getAdminUsername(); - } - - @Override - public String getPassword() { - return getContainer().getAdminPassword(); - } - - @Override - public List
getAddresses() { - URI uri = URI.create((getSslBundle() != null) ? getContainer().getAmqpsUrl() : getContainer().getAmqpUrl()); - return List.of(new Address(uri.getHost(), uri.getPort())); - } - - @Override - public @Nullable SslBundle getSslBundle() { - return super.getSslBundle(); - } - - } - -} diff --git a/module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/testcontainers/RabbitContainerConnectionDetailsFactory.java b/module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/testcontainers/RabbitContainerConnectionDetailsFactory.java index fd47f98fd47..1213116bf9e 100644 --- a/module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/testcontainers/RabbitContainerConnectionDetailsFactory.java +++ b/module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/testcontainers/RabbitContainerConnectionDetailsFactory.java @@ -17,42 +17,37 @@ package org.springframework.boot.amqp.testcontainers; import java.net.URI; -import java.util.List; import org.jspecify.annotations.Nullable; import org.testcontainers.rabbitmq.RabbitMQContainer; -import org.springframework.boot.amqp.autoconfigure.RabbitConnectionDetails; +import org.springframework.boot.amqp.autoconfigure.AmqpConnectionDetails; import org.springframework.boot.ssl.SslBundle; import org.springframework.boot.testcontainers.service.connection.ContainerConnectionDetailsFactory; import org.springframework.boot.testcontainers.service.connection.ContainerConnectionSource; import org.springframework.boot.testcontainers.service.connection.ServiceConnection; /** - * {@link ContainerConnectionDetailsFactory} to create {@link RabbitConnectionDetails} - * from a {@link ServiceConnection @ServiceConnection}-annotated - * {@link RabbitMQContainer}. + * {@link ContainerConnectionDetailsFactory} to create {@link AmqpConnectionDetails} from + * a {@link ServiceConnection @ServiceConnection}-annotated {@link RabbitMQContainer}. * - * @author Moritz Halbritter - * @author Andy Wilkinson - * @author Phillip Webb + * @author Eddú Meléndez */ class RabbitContainerConnectionDetailsFactory - extends ContainerConnectionDetailsFactory { + extends ContainerConnectionDetailsFactory { @Override - protected RabbitConnectionDetails getContainerConnectionDetails( - ContainerConnectionSource source) { - return new RabbitMqContainerConnectionDetails(source); + protected AmqpConnectionDetails getContainerConnectionDetails(ContainerConnectionSource source) { + return new AmqpMqContainerConnectionDetails(source); } /** - * {@link RabbitConnectionDetails} backed by a {@link ContainerConnectionSource}. + * {@link AmqpConnectionDetails} backed by a {@link ContainerConnectionSource}. */ - static final class RabbitMqContainerConnectionDetails extends ContainerConnectionDetails - implements RabbitConnectionDetails { + static final class AmqpMqContainerConnectionDetails extends ContainerConnectionDetails + implements AmqpConnectionDetails { - private RabbitMqContainerConnectionDetails(ContainerConnectionSource source) { + private AmqpMqContainerConnectionDetails(ContainerConnectionSource source) { super(source); } @@ -67,9 +62,9 @@ class RabbitContainerConnectionDetailsFactory } @Override - public List
getAddresses() { + public Address getAddress() { URI uri = URI.create((getSslBundle() != null) ? getContainer().getAmqpsUrl() : getContainer().getAmqpUrl()); - return List.of(new Address(uri.getHost(), uri.getPort())); + return new Address(uri.getHost(), uri.getPort()); } @Override