Browse Source

Merge pull request #48960 from jayychoi

* gh-48960:
  Polish "Add SSL support to auto-configuration for Rabbit Streams"
  Add SSL support to auto-configuration for Rabbit Streams

Closes gh-48960
pull/49641/head
Andy Wilkinson 1 day ago
parent
commit
e71c75182e
  1. 3
      documentation/spring-boot-docs/src/docs/antora/modules/reference/pages/features/dev-services.adoc
  2. 6
      documentation/spring-boot-docs/src/docs/antora/modules/reference/pages/messaging/amqp.adoc
  3. 3
      documentation/spring-boot-docs/src/docs/antora/modules/reference/pages/testing/testcontainers.adoc
  4. 3
      module/spring-boot-amqp/src/dockerTest/java/org/springframework/boot/amqp/docker/compose/RabbitDockerComposeConnectionDetailsFactoryIntegrationTests.java
  5. 10
      module/spring-boot-amqp/src/dockerTest/java/org/springframework/boot/amqp/docker/compose/RabbitStreamDockerComposeConnectionDetailsFactoryIntegrationTests.java
  6. 151
      module/spring-boot-amqp/src/dockerTest/java/org/springframework/boot/amqp/testcontainers/RabbitStreamWithSslContainerConnectionDetailsFactoryIntegrationTests.java
  7. 0
      module/spring-boot-amqp/src/dockerTest/resources/org/springframework/boot/amqp/ca.crt
  8. 0
      module/spring-boot-amqp/src/dockerTest/resources/org/springframework/boot/amqp/client.crt
  9. 0
      module/spring-boot-amqp/src/dockerTest/resources/org/springframework/boot/amqp/client.key
  10. 2
      module/spring-boot-amqp/src/dockerTest/resources/org/springframework/boot/amqp/docker/compose/rabbit-ssl-compose.yaml
  11. 32
      module/spring-boot-amqp/src/dockerTest/resources/org/springframework/boot/amqp/docker/compose/rabbit-stream-ssl-compose.yaml
  12. 0
      module/spring-boot-amqp/src/dockerTest/resources/org/springframework/boot/amqp/docker/compose/rabbitmq-ssl.conf
  13. 8
      module/spring-boot-amqp/src/dockerTest/resources/org/springframework/boot/amqp/docker/compose/rabbitmq-stream-ssl.conf
  14. 0
      module/spring-boot-amqp/src/dockerTest/resources/org/springframework/boot/amqp/server.crt
  15. 0
      module/spring-boot-amqp/src/dockerTest/resources/org/springframework/boot/amqp/server.key
  16. 8
      module/spring-boot-amqp/src/dockerTest/resources/org/springframework/boot/amqp/testcontainers/rabbitmq-stream-ssl.conf
  17. 45
      module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/autoconfigure/RabbitProperties.java
  18. 57
      module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/autoconfigure/RabbitStreamConfiguration.java
  19. 10
      module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/autoconfigure/RabbitStreamConnectionDetails.java
  20. 18
      module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/docker/compose/RabbitStreamDockerComposeConnectionDetailsFactory.java
  21. 14
      module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/testcontainers/RabbitStreamContainerConnectionDetailsFactory.java
  22. 31
      module/spring-boot-amqp/src/test/java/org/springframework/boot/amqp/autoconfigure/RabbitPropertiesTests.java
  23. 123
      module/spring-boot-amqp/src/test/java/org/springframework/boot/amqp/autoconfigure/RabbitStreamConfigurationTests.java

3
documentation/spring-boot-docs/src/docs/antora/modules/reference/pages/features/dev-services.adoc

@ -152,7 +152,8 @@ SSL is supported for the following service connections: @@ -152,7 +152,8 @@ SSL is supported for the following service connections:
* Cassandra
* Elasticsearch
* MongoDB
* RabbitMQ (excluding streams)
* RabbitMQ
* RabbitMQ Streams
* Redis
To enable SSL support for a service, you can use https://docs.docker.com/reference/compose-file/services/#labels[service labels].

6
documentation/spring-boot-docs/src/docs/antora/modules/reference/pages/messaging/amqp.adoc

@ -101,6 +101,12 @@ If you need to create more javadoc:org.springframework.rabbit.stream.producer.Ra @@ -101,6 +101,12 @@ If you need to create more javadoc:org.springframework.rabbit.stream.producer.Ra
[[messaging.amqp.sending-stream.ssl]]
=== SSL
To use SSL with RabbitMQ Streams, set configprop:spring.rabbitmq.stream.ssl.enabled[] to `true` or set configprop:spring.rabbitmq.stream.ssl.bundle[] to configure the xref:features/ssl.adoc#features.ssl.bundles[SSL bundle] to use.
[[messaging.amqp.receiving]]
== Receiving a Message

3
documentation/spring-boot-docs/src/docs/antora/modules/reference/pages/testing/testcontainers.adoc

@ -235,7 +235,8 @@ The SSL annotations are supported for the following service connections: @@ -235,7 +235,8 @@ The SSL annotations are supported for the following service connections:
* Elasticsearch
* Kafka
* MongoDB
* RabbitMQ (excluding streams)
* RabbitMQ
* RabbitMQ Streams
* Redis
The `ElasticsearchContainer` additionally supports automatic detection of server side SSL.

3
module/spring-boot-amqp/src/dockerTest/java/org/springframework/boot/amqp/docker/compose/RabbitDockerComposeConnectionDetailsFactoryIntegrationTests.java

@ -41,7 +41,8 @@ class RabbitDockerComposeConnectionDetailsFactoryIntegrationTests { @@ -41,7 +41,8 @@ class RabbitDockerComposeConnectionDetailsFactoryIntegrationTests {
}
@DockerComposeTest(composeFile = "rabbit-ssl-compose.yaml", image = TestImage.RABBITMQ,
additionalResources = { "ca.crt", "server.crt", "server.key", "client.crt", "client.key", "rabbitmq.conf" })
additionalResources = { "../../ca.crt", "../../server.crt", "../../server.key", "../../client.crt",
"../../client.key", "rabbitmq-ssl.conf" })
void runWithSslCreatesConnectionDetails(RabbitConnectionDetails connectionDetails) {
assertConnectionDetails(connectionDetails);
SslBundle sslBundle = connectionDetails.getSslBundle();

10
module/spring-boot-amqp/src/dockerTest/java/org/springframework/boot/amqp/docker/compose/RabbitStreamDockerComposeConnectionDetailsFactoryIntegrationTests.java

@ -18,6 +18,7 @@ package org.springframework.boot.amqp.docker.compose; @@ -18,6 +18,7 @@ package org.springframework.boot.amqp.docker.compose;
import org.springframework.boot.amqp.autoconfigure.RabbitStreamConnectionDetails;
import org.springframework.boot.docker.compose.service.connection.test.DockerComposeTest;
import org.springframework.boot.ssl.SslBundle;
import org.springframework.boot.testsupport.container.TestImage;
import static org.assertj.core.api.Assertions.assertThat;
@ -34,6 +35,15 @@ class RabbitStreamDockerComposeConnectionDetailsFactoryIntegrationTests { @@ -34,6 +35,15 @@ class RabbitStreamDockerComposeConnectionDetailsFactoryIntegrationTests {
assertConnectionDetails(connectionDetails);
}
@DockerComposeTest(composeFile = "rabbit-stream-ssl-compose.yaml", image = TestImage.RABBITMQ,
additionalResources = { "../../ca.crt", "../../server.crt", "../../server.key", "../../client.crt",
"../../client.key", "rabbitmq-stream-ssl.conf" })
void runWithSslCreatesConnectionDetails(RabbitStreamConnectionDetails connectionDetails) {
assertConnectionDetails(connectionDetails);
SslBundle sslBundle = connectionDetails.getSslBundle();
assertThat(sslBundle).isNotNull();
}
private void assertConnectionDetails(RabbitStreamConnectionDetails connectionDetails) {
assertThat(connectionDetails.getUsername()).isEqualTo("myuser");
assertThat(connectionDetails.getPassword()).isEqualTo("secret");

151
module/spring-boot-amqp/src/dockerTest/java/org/springframework/boot/amqp/testcontainers/RabbitStreamWithSslContainerConnectionDetailsFactoryIntegrationTests.java

@ -0,0 +1,151 @@ @@ -0,0 +1,151 @@
/*
* Copyright 2012-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.amqp.testcontainers;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import com.rabbitmq.stream.Address;
import com.rabbitmq.stream.Environment;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.rabbitmq.RabbitMQContainer;
import org.testcontainers.utility.MountableFile;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.amqp.autoconfigure.EnvironmentBuilderCustomizer;
import org.springframework.boot.amqp.autoconfigure.RabbitAutoConfiguration;
import org.springframework.boot.amqp.autoconfigure.RabbitConnectionDetails;
import org.springframework.boot.amqp.autoconfigure.RabbitStreamConnectionDetails;
import org.springframework.boot.amqp.testcontainers.RabbitContainerConnectionDetailsFactory.RabbitMqContainerConnectionDetails;
import org.springframework.boot.amqp.testcontainers.RabbitStreamContainerConnectionDetailsFactory.RabbitMqStreamContainerConnectionDetails;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
import org.springframework.boot.testcontainers.service.connection.PemKeyStore;
import org.springframework.boot.testcontainers.service.connection.PemTrustStore;
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
import org.springframework.boot.testsupport.container.TestImage;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.rabbit.stream.producer.RabbitStreamTemplate;
import org.springframework.rabbit.stream.support.StreamAdmin;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import static org.assertj.core.api.Assertions.assertThat;
/**
* Tests for {@link RabbitStreamContainerConnectionDetailsFactory} with SSL.
*
* @author Eddú Meléndez
* @author Andy Wilkinson
*/
@SpringJUnitConfig
@TestPropertySource(
properties = { "spring.rabbitmq.stream.name=stream.queue1", "spring.rabbitmq.listener.type=stream" })
@Testcontainers(disabledWithoutDocker = true)
class RabbitStreamWithSslContainerConnectionDetailsFactoryIntegrationTests {
private static final int RABBITMQ_STREAMS_TLS_PORT = 5551;
@Container
@ServiceConnection(type = RabbitStreamConnectionDetails.class)
@PemTrustStore(certificate = "classpath:org/springframework/boot/amqp/ca.crt")
@PemKeyStore(certificate = "classpath:org/springframework/boot/amqp/client.crt",
privateKey = "classpath:org/springframework/boot/amqp/client.key")
static final RabbitMQContainer rabbit = getRabbitMqStreamContainer();
private static RabbitMQContainer getRabbitMqStreamContainer() {
RabbitMQContainer container = TestImage.container(RabbitMQContainer.class);
container.addExposedPorts(RABBITMQ_STREAMS_TLS_PORT);
String enabledPlugins = "[rabbitmq_stream,rabbitmq_prometheus].";
container.withCopyToContainer(Transferable.of(enabledPlugins), "/etc/rabbitmq/enabled_plugins");
container.withCopyFileToContainer(
MountableFile
.forClasspathResource("org/springframework/boot/amqp/testcontainers/rabbitmq-stream-ssl.conf"),
"/etc/rabbitmq/rabbitmq.conf");
container.withCopyFileToContainer(MountableFile.forClasspathResource("org/springframework/boot/amqp/ca.crt"),
"/etc/rabbitmq/ca.crt");
container.withCopyFileToContainer(
MountableFile.forClasspathResource("org/springframework/boot/amqp/server.key"),
"/etc/rabbitmq/server.key");
container.withCopyFileToContainer(
MountableFile.forClasspathResource("org/springframework/boot/amqp/server.crt"),
"/etc/rabbitmq/server.crt");
return container;
}
@Autowired(required = false)
private RabbitConnectionDetails connectionDetails;
@Autowired(required = false)
private RabbitStreamConnectionDetails streamConnectionDetails;
@Autowired
private RabbitStreamTemplate rabbitStreamTemplate;
@Autowired
private TestListener listener;
@Test
void connectionCanBeMadeToRabbitContainer() {
assertThat(this.connectionDetails).isNotInstanceOf(RabbitMqContainerConnectionDetails.class);
assertThat(this.streamConnectionDetails).isInstanceOf(RabbitMqStreamContainerConnectionDetails.class);
assertThat(this.streamConnectionDetails.getSslBundle()).isNotNull();
this.rabbitStreamTemplate.convertAndSend("message");
Awaitility.waitAtMost(Duration.ofMinutes(4))
.untilAsserted(() -> assertThat(this.listener.messages).containsExactly("message"));
}
@Configuration(proxyBeanMethods = false)
@ImportAutoConfiguration(RabbitAutoConfiguration.class)
static class TestConfiguration {
@Bean
StreamAdmin streamAdmin(Environment env) {
return new StreamAdmin(env, (sc) -> sc.stream("stream.queue1").create());
}
@Bean
EnvironmentBuilderCustomizer environmentBuilderCustomizer() {
return (env) -> env.addressResolver(
(address) -> new Address(rabbit.getHost(), rabbit.getMappedPort(RABBITMQ_STREAMS_TLS_PORT)));
}
@Bean
TestListener testListener() {
return new TestListener();
}
}
static class TestListener {
private final List<String> messages = new ArrayList<>();
@RabbitListener(queues = "stream.queue1")
void processMessage(String message) {
this.messages.add(message);
}
}
}

0
module/spring-boot-amqp/src/dockerTest/resources/org/springframework/boot/amqp/docker/compose/ca.crt → module/spring-boot-amqp/src/dockerTest/resources/org/springframework/boot/amqp/ca.crt

0
module/spring-boot-amqp/src/dockerTest/resources/org/springframework/boot/amqp/docker/compose/client.crt → module/spring-boot-amqp/src/dockerTest/resources/org/springframework/boot/amqp/client.crt

0
module/spring-boot-amqp/src/dockerTest/resources/org/springframework/boot/amqp/docker/compose/client.key → module/spring-boot-amqp/src/dockerTest/resources/org/springframework/boot/amqp/client.key

2
module/spring-boot-amqp/src/dockerTest/resources/org/springframework/boot/amqp/docker/compose/rabbit-ssl-compose.yaml

@ -12,7 +12,7 @@ services: @@ -12,7 +12,7 @@ services:
- ssl-key
- ssl-cert
volumes:
- ./rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf:ro
- ./rabbitmq-ssl.conf:/etc/rabbitmq/rabbitmq.conf:ro
labels:
- 'org.springframework.boot.sslbundle.pem.keystore.certificate=client.crt'
- 'org.springframework.boot.sslbundle.pem.keystore.private-key=client.key'

32
module/spring-boot-amqp/src/dockerTest/resources/org/springframework/boot/amqp/docker/compose/rabbit-stream-ssl-compose.yaml

@ -0,0 +1,32 @@ @@ -0,0 +1,32 @@
services:
rabbitmq:
image: '{imageName}'
environment:
- 'RABBITMQ_DEFAULT_USER=myuser'
- 'RABBITMQ_DEFAULT_PASS=secret'
configs:
- source: plugins
target: /etc/rabbitmq/enabled_plugins
ports:
- '5551'
- '5552'
secrets:
- ssl-ca
- ssl-key
- ssl-cert
volumes:
- ./rabbitmq-stream-ssl.conf:/etc/rabbitmq/rabbitmq.conf:ro
labels:
- 'org.springframework.boot.sslbundle.pem.keystore.certificate=client.crt'
- 'org.springframework.boot.sslbundle.pem.keystore.private-key=client.key'
- 'org.springframework.boot.sslbundle.pem.truststore.certificate=ca.crt'
secrets:
ssl-ca:
file: 'ca.crt'
ssl-key:
file: 'server.key'
ssl-cert:
file: 'server.crt'
configs:
plugins:
content: "[rabbitmq_stream]."

0
module/spring-boot-amqp/src/dockerTest/resources/org/springframework/boot/amqp/docker/compose/rabbitmq.conf → module/spring-boot-amqp/src/dockerTest/resources/org/springframework/boot/amqp/docker/compose/rabbitmq-ssl.conf

8
module/spring-boot-amqp/src/dockerTest/resources/org/springframework/boot/amqp/docker/compose/rabbitmq-stream-ssl.conf

@ -0,0 +1,8 @@ @@ -0,0 +1,8 @@
stream.listeners.ssl.1 = 5551
ssl_options.cacertfile=/run/secrets/ssl-ca
ssl_options.certfile=/run/secrets/ssl-cert
ssl_options.keyfile=/run/secrets/ssl-key
ssl_options.verify=verify_peer
ssl_options.fail_if_no_peer_cert=true

0
module/spring-boot-amqp/src/dockerTest/resources/org/springframework/boot/amqp/docker/compose/server.crt → module/spring-boot-amqp/src/dockerTest/resources/org/springframework/boot/amqp/server.crt

0
module/spring-boot-amqp/src/dockerTest/resources/org/springframework/boot/amqp/docker/compose/server.key → module/spring-boot-amqp/src/dockerTest/resources/org/springframework/boot/amqp/server.key

8
module/spring-boot-amqp/src/dockerTest/resources/org/springframework/boot/amqp/testcontainers/rabbitmq-stream-ssl.conf

@ -0,0 +1,8 @@ @@ -0,0 +1,8 @@
stream.listeners.ssl.1 = 5551
ssl_options.cacertfile=/etc/rabbitmq/ca.crt
ssl_options.certfile=/etc/rabbitmq/server.crt
ssl_options.keyfile=/etc/rabbitmq/server.key
ssl_options.verify=verify_peer
ssl_options.fail_if_no_peer_cert=true

45
module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/autoconfigure/RabbitProperties.java

@ -52,6 +52,7 @@ import org.springframework.util.unit.DataSize; @@ -52,6 +52,7 @@ import org.springframework.util.unit.DataSize;
* @author Scott Frederick
* @author Lasse Wulff
* @author Yanming Zhou
* @author Jay Choi
* @since 4.0.0
*/
@ConfigurationProperties("spring.rabbitmq")
@ -1311,6 +1312,11 @@ public class RabbitProperties { @@ -1311,6 +1312,11 @@ public class RabbitProperties {
*/
private @Nullable String name;
/**
* SSL configuration for RabbitMQ instance with the Stream plugin enabled.
*/
private final Ssl ssl = new Ssl();
public String getHost() {
return this.host;
}
@ -1359,6 +1365,45 @@ public class RabbitProperties { @@ -1359,6 +1365,45 @@ public class RabbitProperties {
this.name = name;
}
public Ssl getSsl() {
return this.ssl;
}
public static class Ssl {
/**
* Whether to enable SSL support. Enabled automatically if "bundle" is
* provided.
*/
private @Nullable Boolean enabled;
/**
* SSL bundle name.
*/
private @Nullable String bundle;
public @Nullable Boolean getEnabled() {
return this.enabled;
}
public boolean determineEnabled() {
return Boolean.TRUE.equals(getEnabled()) || this.bundle != null;
}
public void setEnabled(@Nullable Boolean enabled) {
this.enabled = enabled;
}
public @Nullable String getBundle() {
return this.bundle;
}
public void setBundle(@Nullable String bundle) {
this.bundle = bundle;
}
}
}
}

57
module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/autoconfigure/RabbitStreamConfiguration.java

@ -16,19 +16,28 @@ @@ -16,19 +16,28 @@
package org.springframework.boot.amqp.autoconfigure;
import javax.net.ssl.SSLException;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.EnvironmentBuilder;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import org.jspecify.annotations.Nullable;
import org.springframework.amqp.rabbit.config.ContainerCustomizer;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.amqp.autoconfigure.RabbitProperties.Stream;
import org.springframework.boot.amqp.autoconfigure.RabbitProperties.Stream.Ssl;
import org.springframework.boot.amqp.autoconfigure.RabbitProperties.StreamContainer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.boot.ssl.SslBundle;
import org.springframework.boot.ssl.SslBundles;
import org.springframework.boot.ssl.SslManagerBundle;
import org.springframework.boot.ssl.SslOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.rabbit.stream.config.StreamRabbitListenerContainerFactory;
@ -41,12 +50,14 @@ import org.springframework.rabbit.stream.producer.RabbitStreamOperations; @@ -41,12 +50,14 @@ import org.springframework.rabbit.stream.producer.RabbitStreamOperations;
import org.springframework.rabbit.stream.producer.RabbitStreamTemplate;
import org.springframework.rabbit.stream.support.converter.StreamMessageConverter;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
/**
* Configuration for Spring RabbitMQ Stream plugin support.
*
* @author Gary Russell
* @author Eddú Meléndez
* @author Jay Choi
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(StreamRabbitListenerContainerFactory.class)
@ -54,8 +65,9 @@ class RabbitStreamConfiguration { @@ -54,8 +65,9 @@ class RabbitStreamConfiguration {
@Bean
@ConditionalOnMissingBean
RabbitStreamConnectionDetails rabbitStreamConnectionDetails(RabbitProperties rabbitProperties) {
return new PropertiesRabbitStreamConnectionDetails(rabbitProperties.getStream());
RabbitStreamConnectionDetails rabbitStreamConnectionDetails(RabbitProperties rabbitProperties,
@Nullable SslBundles sslBundles) {
return new PropertiesRabbitStreamConnectionDetails(rabbitProperties.getStream(), sslBundles);
}
@Bean(name = "rabbitListenerContainerFactory")
@ -131,15 +143,41 @@ class RabbitStreamConfiguration { @@ -131,15 +143,41 @@ class RabbitStreamConfiguration {
.to(builder::virtualHost);
map.from(streamConnectionDetails.getUsername()).orFrom(connectionDetails::getUsername).to(builder::username);
map.from(streamConnectionDetails.getPassword()).orFrom(connectionDetails::getPassword).to(builder::password);
SslBundle sslBundle = streamConnectionDetails.getSslBundle();
if (sslBundle != null) {
builder.tls().sslContext(createSslContext(sslBundle));
}
else if (stream.getSsl().determineEnabled()) {
builder.tls();
}
return builder;
}
private static SslContext createSslContext(SslBundle sslBundle) {
SslOptions options = sslBundle.getOptions();
SslManagerBundle managers = sslBundle.getManagers();
try {
return SslContextBuilder.forClient()
.keyManager(managers.getKeyManagerFactory())
.trustManager(managers.getTrustManagerFactory())
.ciphers(SslOptions.asSet(options.getCiphers()))
.protocols(options.getEnabledProtocols())
.build();
}
catch (SSLException ex) {
throw new IllegalStateException("Failed to create SSL context for RabbitMQ Stream", ex);
}
}
static class PropertiesRabbitStreamConnectionDetails implements RabbitStreamConnectionDetails {
private final Stream streamProperties;
PropertiesRabbitStreamConnectionDetails(Stream streamProperties) {
private final @Nullable SslBundles sslBundles;
PropertiesRabbitStreamConnectionDetails(Stream streamProperties, @Nullable SslBundles sslBundles) {
this.streamProperties = streamProperties;
this.sslBundles = sslBundles;
}
@Override
@ -167,6 +205,19 @@ class RabbitStreamConfiguration { @@ -167,6 +205,19 @@ class RabbitStreamConfiguration {
return this.streamProperties.getPassword();
}
@Override
public @Nullable SslBundle getSslBundle() {
Ssl ssl = this.streamProperties.getSsl();
if (!ssl.determineEnabled()) {
return null;
}
if (StringUtils.hasLength(ssl.getBundle())) {
Assert.notNull(this.sslBundles, "SSL bundle name has been set but no SSL bundles found in context");
return this.sslBundles.getBundle(ssl.getBundle());
}
return null;
}
}
}

10
module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/autoconfigure/RabbitStreamConnectionDetails.java

@ -19,11 +19,13 @@ package org.springframework.boot.amqp.autoconfigure; @@ -19,11 +19,13 @@ package org.springframework.boot.amqp.autoconfigure;
import org.jspecify.annotations.Nullable;
import org.springframework.boot.autoconfigure.service.connection.ConnectionDetails;
import org.springframework.boot.ssl.SslBundle;
/**
* Details required to establish a connection to a RabbitMQ Stream service.
*
* @author Eddú Meléndez
* @author Jay Choi
* @since 4.1.0
*/
public interface RabbitStreamConnectionDetails extends ConnectionDetails {
@ -64,4 +66,12 @@ public interface RabbitStreamConnectionDetails extends ConnectionDetails { @@ -64,4 +66,12 @@ public interface RabbitStreamConnectionDetails extends ConnectionDetails {
return null;
}
/**
* SSL bundle to use.
* @return the SSL bundle to use or {@code null}
*/
default @Nullable SslBundle getSslBundle() {
return null;
}
}

18
module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/docker/compose/RabbitStreamDockerComposeConnectionDetailsFactory.java

@ -23,6 +23,7 @@ import org.springframework.boot.amqp.autoconfigure.RabbitStreamConnectionDetails @@ -23,6 +23,7 @@ import org.springframework.boot.amqp.autoconfigure.RabbitStreamConnectionDetails
import org.springframework.boot.docker.compose.core.RunningService;
import org.springframework.boot.docker.compose.service.connection.DockerComposeConnectionDetailsFactory;
import org.springframework.boot.docker.compose.service.connection.DockerComposeConnectionSource;
import org.springframework.boot.ssl.SslBundle;
/**
* {@link DockerComposeConnectionDetailsFactory} to create {@link RabbitConnectionDetails}
@ -32,12 +33,11 @@ import org.springframework.boot.docker.compose.service.connection.DockerComposeC @@ -32,12 +33,11 @@ import org.springframework.boot.docker.compose.service.connection.DockerComposeC
* @author Andy Wilkinson
* @author Phillip Webb
* @author Scott Frederick
* @author Jay Choi
*/
class RabbitStreamDockerComposeConnectionDetailsFactory
extends DockerComposeConnectionDetailsFactory<RabbitStreamConnectionDetails> {
private static final int RABBITMQ_STREAMS_PORT = 5552;
protected RabbitStreamDockerComposeConnectionDetailsFactory() {
super("rabbitmq");
}
@ -60,17 +60,24 @@ class RabbitStreamDockerComposeConnectionDetailsFactory @@ -60,17 +60,24 @@ class RabbitStreamDockerComposeConnectionDetailsFactory
static class RabbitStreamDockerComposeConnectionDetails extends DockerComposeConnectionDetails
implements RabbitStreamConnectionDetails {
private static final int STREAMS_PORT = 5552;
private static final int STREAMS_TLS_PORT = 5551;
private final RabbitEnvironment environment;
private final String host;
private final int port;
private final @Nullable SslBundle sslBundle;
protected RabbitStreamDockerComposeConnectionDetails(RunningService service) {
super(service);
this.environment = new RabbitEnvironment(service.env());
this.host = service.host();
this.port = service.ports().get(RABBITMQ_STREAMS_PORT);
this.sslBundle = getSslBundle(service);
this.port = service.ports().get((this.sslBundle != null) ? STREAMS_TLS_PORT : STREAMS_PORT);
}
@Override
@ -98,6 +105,11 @@ class RabbitStreamDockerComposeConnectionDetailsFactory @@ -98,6 +105,11 @@ class RabbitStreamDockerComposeConnectionDetailsFactory
return this.port;
}
@Override
public @Nullable SslBundle getSslBundle() {
return this.sslBundle;
}
}
}

14
module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/testcontainers/RabbitStreamContainerConnectionDetailsFactory.java

@ -16,9 +16,11 @@ @@ -16,9 +16,11 @@
package org.springframework.boot.amqp.testcontainers;
import org.jspecify.annotations.Nullable;
import org.testcontainers.rabbitmq.RabbitMQContainer;
import org.springframework.boot.amqp.autoconfigure.RabbitStreamConnectionDetails;
import org.springframework.boot.ssl.SslBundle;
import org.springframework.boot.testcontainers.service.connection.ContainerConnectionDetailsFactory;
import org.springframework.boot.testcontainers.service.connection.ContainerConnectionSource;
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
@ -29,6 +31,7 @@ import org.springframework.boot.testcontainers.service.connection.ServiceConnect @@ -29,6 +31,7 @@ import org.springframework.boot.testcontainers.service.connection.ServiceConnect
* {@link ServiceConnection @ServiceConnection}-annotated {@link RabbitMQContainer}.
*
* @author Eddú Meléndez
* @author Jay Choi
*/
class RabbitStreamContainerConnectionDetailsFactory
extends ContainerConnectionDetailsFactory<RabbitMQContainer, RabbitStreamConnectionDetails> {
@ -57,6 +60,10 @@ class RabbitStreamContainerConnectionDetailsFactory @@ -57,6 +60,10 @@ class RabbitStreamContainerConnectionDetailsFactory
static final class RabbitMqStreamContainerConnectionDetails extends ContainerConnectionDetails<RabbitMQContainer>
implements RabbitStreamConnectionDetails {
private static final int STREAMS_PORT = 5552;
private static final int STREAMS_TLS_PORT = 5551;
private RabbitMqStreamContainerConnectionDetails(ContainerConnectionSource<RabbitMQContainer> source) {
super(source);
}
@ -68,7 +75,7 @@ class RabbitStreamContainerConnectionDetailsFactory @@ -68,7 +75,7 @@ class RabbitStreamContainerConnectionDetailsFactory
@Override
public int getPort() {
return getContainer().getMappedPort(5552);
return getContainer().getMappedPort((getSslBundle() != null) ? STREAMS_TLS_PORT : STREAMS_PORT);
}
@Override
@ -81,6 +88,11 @@ class RabbitStreamContainerConnectionDetailsFactory @@ -81,6 +88,11 @@ class RabbitStreamContainerConnectionDetailsFactory
return getContainer().getAdminPassword();
}
@Override
public @Nullable SslBundle getSslBundle() {
return super.getSslBundle();
}
}
}

31
module/spring-boot-amqp/src/test/java/org/springframework/boot/amqp/autoconfigure/RabbitPropertiesTests.java

@ -38,6 +38,7 @@ import static org.assertj.core.api.Assertions.assertThatExceptionOfType; @@ -38,6 +38,7 @@ import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
* @author Stephane Nicoll
* @author Rafael Carvalho
* @author Scott Frederick
* @author Jay Choi
*/
class RabbitPropertiesTests {
@ -381,4 +382,34 @@ class RabbitPropertiesTests { @@ -381,4 +382,34 @@ class RabbitPropertiesTests {
.withMessageContaining("spring.rabbitmq.host");
}
@Test
void streamSslIsDisabledByDefault() {
assertThat(this.properties.getStream().getSsl().determineEnabled()).isFalse();
}
@Test
void streamSslIsEnabledWhenEnabledIsTrue() {
this.properties.getStream().getSsl().setEnabled(true);
assertThat(this.properties.getStream().getSsl().determineEnabled()).isTrue();
}
@Test
void streamSslIsEnabledWhenBundleIsSet() {
this.properties.getStream().getSsl().setBundle("test-bundle");
assertThat(this.properties.getStream().getSsl().determineEnabled()).isTrue();
}
@Test
void streamSslIsDisabledWhenEnabledIsFalseAndBundleIsNotSet() {
this.properties.getStream().getSsl().setEnabled(false);
assertThat(this.properties.getStream().getSsl().determineEnabled()).isFalse();
}
@Test
void streamSslIsEnabledWhenBundleIsSetButEnabledIsFalse() {
this.properties.getStream().getSsl().setBundle("test-bundle");
this.properties.getStream().getSsl().setEnabled(false);
assertThat(this.properties.getStream().getSsl().determineEnabled()).isTrue();
}
}

123
module/spring-boot-amqp/src/test/java/org/springframework/boot/amqp/autoconfigure/RabbitStreamConfigurationTests.java

@ -25,6 +25,7 @@ import com.rabbitmq.stream.Codec; @@ -25,6 +25,7 @@ import com.rabbitmq.stream.Codec;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.EnvironmentBuilder;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.jspecify.annotations.Nullable;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.MessageListenerContainer;
@ -35,6 +36,10 @@ import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry; @@ -35,6 +36,10 @@ import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.amqp.autoconfigure.RabbitStreamConfiguration.PropertiesRabbitStreamConnectionDetails;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.autoconfigure.ssl.SslAutoConfiguration;
import org.springframework.boot.ssl.NoSuchSslBundleException;
import org.springframework.boot.ssl.SslBundle;
import org.springframework.boot.ssl.SslBundles;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@ -53,8 +58,11 @@ import org.springframework.rabbit.stream.support.converter.StreamMessageConverte @@ -53,8 +58,11 @@ import org.springframework.rabbit.stream.support.converter.StreamMessageConverte
import org.springframework.test.util.ReflectionTestUtils;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.then;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
/**
* Tests for {@link RabbitStreamConfiguration}.
@ -63,11 +71,12 @@ import static org.mockito.Mockito.mock; @@ -63,11 +71,12 @@ import static org.mockito.Mockito.mock;
* @author Andy Wilkinson
* @author Eddú Meléndez
* @author Moritz Halbritter
* @author Jay Choi
*/
class RabbitStreamConfigurationTests {
private final ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withConfiguration(AutoConfigurations.of(RabbitAutoConfiguration.class));
.withConfiguration(AutoConfigurations.of(RabbitAutoConfiguration.class, SslAutoConfiguration.class));
@Test
@SuppressWarnings("unchecked")
@ -150,7 +159,7 @@ class RabbitStreamConfigurationTests { @@ -150,7 +159,7 @@ class RabbitStreamConfigurationTests {
RabbitProperties properties = new RabbitProperties();
RabbitStreamConfiguration.configure(builder, properties,
new TestRabbitConnectionDetails("guest", "guest", "vhost"),
getRabbitStreamConnectionDetails(properties));
getRabbitStreamConnectionDetails(properties, null));
then(builder).should().port(5552);
then(builder).should().host("localhost");
then(builder).should().virtualHost("vhost");
@ -167,7 +176,7 @@ class RabbitStreamConfigurationTests { @@ -167,7 +176,7 @@ class RabbitStreamConfigurationTests {
properties.getStream().setPort(5553);
RabbitStreamConfiguration.configure(builder, properties,
new TestRabbitConnectionDetails("guest", "guest", "vhost"),
getRabbitStreamConnectionDetails(properties));
getRabbitStreamConnectionDetails(properties, null));
then(builder).should().port(5553);
}
@ -178,7 +187,7 @@ class RabbitStreamConfigurationTests { @@ -178,7 +187,7 @@ class RabbitStreamConfigurationTests {
properties.getStream().setHost("stream.rabbit.example.com");
RabbitStreamConfiguration.configure(builder, properties,
new TestRabbitConnectionDetails("guest", "guest", "vhost"),
getRabbitStreamConnectionDetails(properties));
getRabbitStreamConnectionDetails(properties, null));
then(builder).should().host("stream.rabbit.example.com");
}
@ -189,7 +198,7 @@ class RabbitStreamConfigurationTests { @@ -189,7 +198,7 @@ class RabbitStreamConfigurationTests {
properties.getStream().setVirtualHost("stream-virtual-host");
RabbitStreamConfiguration.configure(builder, properties,
new TestRabbitConnectionDetails("guest", "guest", "vhost"),
getRabbitStreamConnectionDetails(properties));
getRabbitStreamConnectionDetails(properties, null));
then(builder).should().virtualHost("stream-virtual-host");
}
@ -200,7 +209,7 @@ class RabbitStreamConfigurationTests { @@ -200,7 +209,7 @@ class RabbitStreamConfigurationTests {
properties.setVirtualHost("default-virtual-host");
RabbitStreamConfiguration.configure(builder, properties,
new TestRabbitConnectionDetails("guest", "guest", "default-virtual-host"),
getRabbitStreamConnectionDetails(properties));
getRabbitStreamConnectionDetails(properties, null));
then(builder).should().virtualHost("default-virtual-host");
}
@ -212,7 +221,7 @@ class RabbitStreamConfigurationTests { @@ -212,7 +221,7 @@ class RabbitStreamConfigurationTests {
properties.setPassword("secret");
RabbitStreamConfiguration.configure(builder, properties,
new TestRabbitConnectionDetails("bob", "password", "vhost"),
getRabbitStreamConnectionDetails(properties));
getRabbitStreamConnectionDetails(properties, null));
then(builder).should().username("bob");
then(builder).should().password("password");
}
@ -227,7 +236,7 @@ class RabbitStreamConfigurationTests { @@ -227,7 +236,7 @@ class RabbitStreamConfigurationTests {
properties.getStream().setPassword("confidential");
RabbitStreamConfiguration.configure(builder, properties,
new TestRabbitConnectionDetails("guest", "guest", "vhost"),
getRabbitStreamConnectionDetails(properties));
getRabbitStreamConnectionDetails(properties, null));
then(builder).should().username("bob");
then(builder).should().password("confidential");
}
@ -319,8 +328,92 @@ class RabbitStreamConfigurationTests { @@ -319,8 +328,92 @@ class RabbitStreamConfigurationTests {
.containsExactly("rabbitmq", 5555));
}
private RabbitStreamConnectionDetails getRabbitStreamConnectionDetails(RabbitProperties properties) {
return new PropertiesRabbitStreamConnectionDetails(properties.getStream());
@Test
void whenStreamSslIsNotConfiguredThenTlsIsNotUsed() {
EnvironmentBuilder builder = mock(EnvironmentBuilder.class);
RabbitProperties properties = new RabbitProperties();
RabbitStreamConfiguration.configure(builder, properties,
new TestRabbitConnectionDetails("guest", "guest", "vhost"),
getRabbitStreamConnectionDetails(properties, null));
then(builder).should(never()).tls();
}
@Test
void whenStreamSslIsEnabledThenTlsIsUsed() {
EnvironmentBuilder builder = mock(EnvironmentBuilder.class);
RabbitProperties properties = new RabbitProperties();
properties.getStream().getSsl().setEnabled(true);
RabbitStreamConfiguration.configure(builder, properties,
new TestRabbitConnectionDetails("guest", "guest", "vhost"),
getRabbitStreamConnectionDetails(properties, null));
then(builder).should().tls();
}
@Test
void whenStreamSslBundleIsConfiguredThenTlsIsUsed() {
this.contextRunner.withPropertyValues("spring.rabbitmq.stream.ssl.bundle=test-bundle",
"spring.ssl.bundle.jks.test-bundle.keystore.location=classpath:org/springframework/boot/amqp/autoconfigure/test.jks",
"spring.ssl.bundle.jks.test-bundle.keystore.password=secret")
.run((context) -> {
assertThat(context).hasNotFailed();
assertThat(context).hasSingleBean(Environment.class);
});
}
@Test
void whenStreamSslIsDisabledThenTlsIsNotUsed() {
EnvironmentBuilder builder = mock(EnvironmentBuilder.class);
RabbitProperties properties = new RabbitProperties();
properties.getStream().getSsl().setEnabled(false);
RabbitStreamConfiguration.configure(builder, properties,
new TestRabbitConnectionDetails("guest", "guest", "vhost"),
getRabbitStreamConnectionDetails(properties, null));
then(builder).should(never()).tls();
}
@Test
void whenStreamSslIsDisabledWithBundleThenTlsIsStillUsed() {
EnvironmentBuilder builder = mock(EnvironmentBuilder.class);
SslBundles sslBundles = mock(SslBundles.class);
RabbitProperties properties = new RabbitProperties();
properties.getStream().getSsl().setEnabled(false);
properties.getStream().getSsl().setBundle("some-bundle");
RabbitStreamConfiguration.configure(builder, properties,
new TestRabbitConnectionDetails("guest", "guest", "vhost"),
getRabbitStreamConnectionDetails(properties, sslBundles));
then(builder).should().tls();
}
@Test
void whenConnectionDetailsSslBundleIsProvidedThenTlsIsUsedWithoutProperties() {
this.contextRunner.withUserConfiguration(CustomConnectionDetails.class)
.withPropertyValues(
"spring.ssl.bundle.jks.test-bundle.keystore.location=classpath:org/springframework/boot/amqp/autoconfigure/test.jks",
"spring.ssl.bundle.jks.test-bundle.keystore.password=secret")
.run((context) -> {
assertThat(context).hasNotFailed();
assertThat(context).hasSingleBean(Environment.class);
});
}
@Test
void whenStreamSslBundleIsInvalidThenFails() {
EnvironmentBuilder builder = mock(EnvironmentBuilder.class);
SslBundles sslBundles = mock(SslBundles.class);
given(sslBundles.getBundle("invalid-bundle")).willThrow(
new NoSuchSslBundleException("invalid-bundle", "SSL bundle name 'invalid-bundle' cannot be found"));
RabbitProperties properties = new RabbitProperties();
properties.getStream().getSsl().setBundle("invalid-bundle");
assertThatExceptionOfType(NoSuchSslBundleException.class)
.isThrownBy(() -> RabbitStreamConfiguration.configure(builder, properties,
new TestRabbitConnectionDetails("guest", "guest", "vhost"),
getRabbitStreamConnectionDetails(properties, sslBundles)))
.withMessageContaining("invalid-bundle");
}
private RabbitStreamConnectionDetails getRabbitStreamConnectionDetails(RabbitProperties properties,
@Nullable SslBundles sslBundles) {
return new PropertiesRabbitStreamConnectionDetails(properties.getStream(), sslBundles);
}
@Configuration(proxyBeanMethods = false)
@ -448,7 +541,7 @@ class RabbitStreamConfigurationTests { @@ -448,7 +541,7 @@ class RabbitStreamConfigurationTests {
static class CustomConnectionDetails {
@Bean
RabbitStreamConnectionDetails customRabbitMqStreamConnectionDetails() {
RabbitStreamConnectionDetails customRabbitMqStreamConnectionDetails(SslBundles sslBundles) {
return new RabbitStreamConnectionDetails() {
@Override
public String getHost() {
@ -459,6 +552,14 @@ class RabbitStreamConfigurationTests { @@ -459,6 +552,14 @@ class RabbitStreamConfigurationTests {
public int getPort() {
return 5555;
}
@Override
public @Nullable SslBundle getSslBundle() {
if (sslBundles.getBundleNames().contains("test-bundle")) {
return sslBundles.getBundle("test-bundle");
}
return null;
}
};
}

Loading…
Cancel
Save