Browse Source
This commit adds auto-configuration for the generic AMQP 1.0 client in Spring AMQP 4.1, using Qpid ProtonJ. The auto-configuration provides an AmqpConnectionFactory as well as an AmqpClient with standard customizer callbacks. The "spring.amqp" namespace exposes settings to connect to an AMQP 1.0 compliant broker. Docker compose and testcontainers support using RabbitMQ have been added too. Closes gh-49621pull/46608/head
47 changed files with 1611 additions and 28 deletions
@ -0,0 +1,37 @@
@@ -0,0 +1,37 @@
|
||||
/* |
||||
* Copyright 2012-present the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.boot.docs.messaging.amqp.generic.sending; |
||||
|
||||
import org.springframework.amqp.client.AmqpClient; |
||||
import org.springframework.stereotype.Component; |
||||
|
||||
@Component |
||||
public class MyBean { |
||||
|
||||
private final AmqpClient amqpClient; |
||||
|
||||
public MyBean(AmqpClient amqpClient) { |
||||
this.amqpClient = amqpClient; |
||||
} |
||||
|
||||
// @fold:on // ...
|
||||
public void sendMessage(String msg) { |
||||
this.amqpClient.to("/queues/test").body(msg).send(); |
||||
} |
||||
// @fold:off
|
||||
|
||||
} |
||||
@ -0,0 +1,32 @@
@@ -0,0 +1,32 @@
|
||||
/* |
||||
* Copyright 2012-present the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0 |
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.boot.docs.messaging.amqp.generic.sending |
||||
|
||||
import org.springframework.amqp.client.AmqpClient |
||||
import org.springframework.stereotype.Component |
||||
|
||||
@Component |
||||
class MyBean(private val amqpClient: AmqpClient) { |
||||
|
||||
// @fold:on // ... |
||||
fun someOtherMethod(msg: String) { |
||||
amqpClient.to("/queues/test").body(msg).send() |
||||
} |
||||
// @fold:off |
||||
|
||||
} |
||||
|
||||
@ -0,0 +1,61 @@
@@ -0,0 +1,61 @@
|
||||
/* |
||||
* Copyright 2012-present the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the License); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0 |
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
plugins { |
||||
id "java-library" |
||||
id "org.springframework.boot.docker-test" |
||||
id "org.springframework.boot.auto-configuration" |
||||
id "org.springframework.boot.configuration-properties" |
||||
id "org.springframework.boot.deployed" |
||||
id "org.springframework.boot.optional-dependencies" |
||||
} |
||||
|
||||
description = "Spring Boot support for AMQP 1.0" |
||||
|
||||
dependencies { |
||||
api(project(":core:spring-boot")) |
||||
api("org.apache.qpid:protonj2-client") |
||||
api("org.springframework:spring-messaging") |
||||
api("org.springframework.amqp:spring-amqp-client") |
||||
|
||||
implementation(project(":module:spring-boot-transaction")) |
||||
|
||||
optional(project(":core:spring-boot-autoconfigure")) |
||||
optional(project(":core:spring-boot-docker-compose")) |
||||
optional(project(":module:spring-boot-jackson")) |
||||
optional(project(":core:spring-boot-testcontainers")) |
||||
optional("org.testcontainers:testcontainers-rabbitmq") |
||||
|
||||
dockerTestImplementation(project(":core:spring-boot-test")) |
||||
dockerTestImplementation(project(":test-support:spring-boot-docker-test-support")) |
||||
dockerTestImplementation(testFixtures(project(":core:spring-boot-docker-compose"))) |
||||
dockerTestImplementation("ch.qos.logback:logback-classic") |
||||
dockerTestImplementation("org.testcontainers:testcontainers-junit-jupiter") |
||||
|
||||
|
||||
testImplementation(project(":core:spring-boot-test")) |
||||
testImplementation(project(":test-support:spring-boot-test-support")) |
||||
|
||||
testRuntimeOnly("ch.qos.logback:logback-classic") |
||||
} |
||||
|
||||
tasks.named("compileTestJava") { |
||||
options.nullability.checking = "tests" |
||||
} |
||||
|
||||
tasks.named("compileDockerTestJava") { |
||||
options.nullability.checking = "tests" |
||||
} |
||||
@ -0,0 +1,74 @@
@@ -0,0 +1,74 @@
|
||||
/* |
||||
* Copyright 2012-present the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.boot.amqp.autoconfigure; |
||||
|
||||
import java.util.concurrent.TimeUnit; |
||||
|
||||
import org.junit.jupiter.api.Test; |
||||
import org.testcontainers.junit.jupiter.Container; |
||||
import org.testcontainers.junit.jupiter.Testcontainers; |
||||
|
||||
import org.springframework.amqp.client.AmqpClient; |
||||
import org.springframework.boot.autoconfigure.AutoConfigurations; |
||||
import org.springframework.boot.jackson.autoconfigure.JacksonAutoConfiguration; |
||||
import org.springframework.boot.test.context.runner.ApplicationContextRunner; |
||||
import org.springframework.boot.testsupport.container.RabbitMqManagementContainer; |
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat; |
||||
|
||||
/** |
||||
* Integration tests for {@link AmqpAutoConfiguration}. |
||||
* |
||||
* @author Stephane Nicoll |
||||
*/ |
||||
@Testcontainers(disabledWithoutDocker = true) |
||||
class AmqpAutoConfigurationIntegrationTests { |
||||
|
||||
@Container |
||||
static final RabbitMqManagementContainer container = new RabbitMqManagementContainer(); |
||||
|
||||
private final ApplicationContextRunner contextRunner = new ApplicationContextRunner() |
||||
.withConfiguration(AutoConfigurations.of(AmqpAutoConfiguration.class)) |
||||
.withPropertyValues("spring.amqp.host=" + container.getHost(), "spring.amqp.port=" + container.getAmqpPort()); |
||||
|
||||
@Test |
||||
void sendAndReceiveUsingAmqpClient() { |
||||
String queue = container.createRandomQueue(); |
||||
this.contextRunner.run((context) -> { |
||||
AmqpClient amqpClient = context.getBean(AmqpClient.class); |
||||
assertThat(amqpClient.to(queue).body("Hello World").send().get(1, TimeUnit.SECONDS)).isTrue(); |
||||
assertThat(amqpClient.from(queue).receiveAndConvert().get(1, TimeUnit.SECONDS)).isEqualTo("Hello World"); |
||||
}); |
||||
} |
||||
|
||||
@Test |
||||
void sendAndReceiveUsingJson() { |
||||
String queue = container.createRandomQueue(); |
||||
this.contextRunner.withConfiguration(AutoConfigurations.of(JacksonAutoConfiguration.class)).run((context) -> { |
||||
AmqpClient amqpClient = context.getBean(AmqpClient.class); |
||||
assertThat(amqpClient.to(queue).body(new TestMessage("hello", 42)).send().get(1, TimeUnit.SECONDS)) |
||||
.isTrue(); |
||||
assertThat(amqpClient.from(queue).receiveAndConvert().get(1, TimeUnit.SECONDS)) |
||||
.isEqualTo(new TestMessage("hello", 42)); |
||||
}); |
||||
} |
||||
|
||||
record TestMessage(String value, int counter) { |
||||
|
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,42 @@
@@ -0,0 +1,42 @@
|
||||
/* |
||||
* Copyright 2012-present the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.boot.amqp.docker.compose; |
||||
|
||||
import org.springframework.boot.amqp.autoconfigure.AmqpConnectionDetails; |
||||
import org.springframework.boot.amqp.autoconfigure.AmqpConnectionDetails.Address; |
||||
import org.springframework.boot.docker.compose.service.connection.test.DockerComposeTest; |
||||
import org.springframework.boot.testsupport.container.TestImage; |
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat; |
||||
|
||||
/** |
||||
* Integration tests for {@link RabbitMqDockerComposeConnectionDetailsFactory}. |
||||
* |
||||
* @author Stephane Nicoll |
||||
*/ |
||||
class RabbitMqDockerComposeConnectionDetailsFactoryIntegrationTests { |
||||
|
||||
@DockerComposeTest(composeFile = "rabbitmq-compose.yaml", image = TestImage.RABBITMQ) |
||||
void runCreatesConnectionDetails(AmqpConnectionDetails connectionDetails) { |
||||
assertThat(connectionDetails.getUsername()).isEqualTo("myuser"); |
||||
assertThat(connectionDetails.getPassword()).isEqualTo("secret"); |
||||
Address address = connectionDetails.getAddress(); |
||||
assertThat(address.host()).isNotNull(); |
||||
assertThat(address.port()).isGreaterThan(0); |
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,71 @@
@@ -0,0 +1,71 @@
|
||||
/* |
||||
* Copyright 2012-present the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.boot.amqp.testcontainers; |
||||
|
||||
import java.util.concurrent.TimeUnit; |
||||
|
||||
import org.junit.jupiter.api.Test; |
||||
import org.testcontainers.junit.jupiter.Container; |
||||
import org.testcontainers.junit.jupiter.Testcontainers; |
||||
|
||||
import org.springframework.amqp.client.AmqpClient; |
||||
import org.springframework.beans.factory.annotation.Autowired; |
||||
import org.springframework.boot.amqp.autoconfigure.AmqpAutoConfiguration; |
||||
import org.springframework.boot.amqp.autoconfigure.AmqpConnectionDetails; |
||||
import org.springframework.boot.autoconfigure.ImportAutoConfiguration; |
||||
import org.springframework.boot.testcontainers.service.connection.ServiceConnection; |
||||
import org.springframework.boot.testsupport.container.RabbitMqManagementContainer; |
||||
import org.springframework.context.annotation.Configuration; |
||||
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; |
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat; |
||||
|
||||
/** |
||||
* Tests for {@link RabbitMqContainerConnectionDetailsFactory}. |
||||
* |
||||
* @author Stephane Nicoll |
||||
*/ |
||||
@SpringJUnitConfig |
||||
@Testcontainers(disabledWithoutDocker = true) |
||||
class RabbitMqContainerConnectionDetailsFactoryIntegrationTests { |
||||
|
||||
@Container |
||||
@ServiceConnection |
||||
static final RabbitMqManagementContainer container = new RabbitMqManagementContainer(); |
||||
|
||||
@Autowired(required = false) |
||||
private AmqpConnectionDetails connectionDetails; |
||||
|
||||
@Autowired |
||||
private AmqpClient amqpClient; |
||||
|
||||
@Test |
||||
void connectionCanBeMadeToRabbitMqContainer() throws Exception { |
||||
assertThat(this.connectionDetails).isNotNull(); |
||||
container.createQueue("test"); |
||||
this.amqpClient.to("/queues/test").body("test message").send(); |
||||
Object message = this.amqpClient.from("/queues/test").receiveAndConvert().get(4, TimeUnit.MINUTES); |
||||
assertThat(message).isEqualTo("test message"); |
||||
} |
||||
|
||||
@Configuration(proxyBeanMethods = false) |
||||
@ImportAutoConfiguration(AmqpAutoConfiguration.class) |
||||
static class TestConfiguration { |
||||
|
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,4 @@
@@ -0,0 +1,4 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?> |
||||
<configuration> |
||||
<include resource="org/springframework/boot/logging/logback/base.xml"/> |
||||
</configuration> |
||||
@ -0,0 +1,8 @@
@@ -0,0 +1,8 @@
|
||||
services: |
||||
rabbitmq: |
||||
image: '{imageName}' |
||||
environment: |
||||
- 'RABBITMQ_DEFAULT_USER=myuser' |
||||
- 'RABBITMQ_DEFAULT_PASS=secret' |
||||
ports: |
||||
- '5672' |
||||
@ -0,0 +1 @@
@@ -0,0 +1 @@
|
||||
spring.test.context.cache.maxSize=1 |
||||
@ -0,0 +1,132 @@
@@ -0,0 +1,132 @@
|
||||
/* |
||||
* Copyright 2012-present the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.boot.amqp.autoconfigure; |
||||
|
||||
import java.util.stream.Stream; |
||||
|
||||
import org.apache.qpid.protonj2.client.Client; |
||||
import org.apache.qpid.protonj2.client.ClientOptions; |
||||
import org.apache.qpid.protonj2.client.ConnectionOptions; |
||||
import tools.jackson.databind.json.JsonMapper; |
||||
|
||||
import org.springframework.amqp.client.AmqpClient; |
||||
import org.springframework.amqp.client.AmqpConnectionFactory; |
||||
import org.springframework.amqp.client.SingleAmqpConnectionFactory; |
||||
import org.springframework.amqp.support.converter.JacksonJsonMessageConverter; |
||||
import org.springframework.amqp.support.converter.MessageConverter; |
||||
import org.springframework.beans.factory.ObjectProvider; |
||||
import org.springframework.boot.amqp.autoconfigure.AmqpConnectionDetails.Address; |
||||
import org.springframework.boot.autoconfigure.AutoConfiguration; |
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; |
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; |
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; |
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnSingleCandidate; |
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties; |
||||
import org.springframework.boot.context.properties.PropertyMapper; |
||||
import org.springframework.context.annotation.Bean; |
||||
import org.springframework.context.annotation.Configuration; |
||||
|
||||
/** |
||||
* Auto-configuration for generic AMQP 1.0 support. |
||||
* |
||||
* @author Stephane Nicoll |
||||
* @since 4.1.0 |
||||
*/ |
||||
@AutoConfiguration(afterName = "org.springframework.boot.jackson.autoconfigure.JacksonAutoConfiguration") |
||||
@ConditionalOnClass({ Client.class, AmqpConnectionFactory.class }) |
||||
@EnableConfigurationProperties(AmqpProperties.class) |
||||
public final class AmqpAutoConfiguration { |
||||
|
||||
@Configuration(proxyBeanMethods = false) |
||||
static class AmqpConnectionFactoryConfiguration { |
||||
|
||||
@Bean |
||||
@ConditionalOnMissingBean |
||||
Client protonClient() { |
||||
return Client.create(new ClientOptions()); |
||||
} |
||||
|
||||
@Bean |
||||
@ConditionalOnMissingBean |
||||
AmqpConnectionDetails amqpConnectionDetails(AmqpProperties properties) { |
||||
return new PropertiesAmqpConnectionDetails(properties); |
||||
} |
||||
|
||||
@Bean |
||||
@ConditionalOnMissingBean |
||||
AmqpConnectionFactory amqpConnectionFactory(AmqpConnectionDetails connectionDetails, Client protonClient, |
||||
ObjectProvider<ConnectionOptionsCustomizer> connectionOptionsCustomizers) { |
||||
ConnectionOptions connectionOptions = createConnectionOptions(connectionDetails, |
||||
connectionOptionsCustomizers.orderedStream()); |
||||
return createAmqpConnectionFactory(connectionDetails, protonClient).setConnectionOptions(connectionOptions); |
||||
} |
||||
|
||||
private SingleAmqpConnectionFactory createAmqpConnectionFactory(AmqpConnectionDetails connectionDetails, |
||||
Client protonClient) { |
||||
SingleAmqpConnectionFactory connectionFactory = new SingleAmqpConnectionFactory(protonClient); |
||||
PropertyMapper map = PropertyMapper.get(); |
||||
Address address = connectionDetails.getAddress(); |
||||
map.from(address::host).to(connectionFactory::setHost); |
||||
map.from(address::port).to(connectionFactory::setPort); |
||||
return connectionFactory; |
||||
} |
||||
|
||||
private ConnectionOptions createConnectionOptions(AmqpConnectionDetails connectionDetails, |
||||
Stream<ConnectionOptionsCustomizer> customizers) { |
||||
ConnectionOptions connectionOptions = new ConnectionOptions(); |
||||
PropertyMapper map = PropertyMapper.get(); |
||||
map.from(connectionDetails::getUsername).to(connectionOptions::user); |
||||
map.from(connectionDetails::getPassword).to(connectionOptions::password); |
||||
customizers.forEach((customizer) -> customizer.customize(connectionOptions)); |
||||
return connectionOptions; |
||||
} |
||||
|
||||
} |
||||
|
||||
@Configuration(proxyBeanMethods = false) |
||||
@ConditionalOnBean(JsonMapper.class) |
||||
@ConditionalOnSingleCandidate(JsonMapper.class) |
||||
static class JacksonMessageConverterConfiguration { |
||||
|
||||
@Bean |
||||
@ConditionalOnMissingBean(MessageConverter.class) |
||||
JacksonJsonMessageConverter jacksonJsonMessageConverter(JsonMapper jsonMapper) { |
||||
return new JacksonJsonMessageConverter(jsonMapper); |
||||
} |
||||
|
||||
} |
||||
|
||||
@Configuration(proxyBeanMethods = false) |
||||
static class AmqpClientConfiguration { |
||||
|
||||
@Bean |
||||
@ConditionalOnMissingBean |
||||
AmqpClient amqpClient(AmqpConnectionFactory amqpConnectionFactory, AmqpProperties properties, |
||||
ObjectProvider<MessageConverter> messageConverter, ObjectProvider<AmqpClientCustomizer> customizers) { |
||||
AmqpClient.Builder builder = AmqpClient.builder(amqpConnectionFactory); |
||||
AmqpProperties.Client client = properties.getClient(); |
||||
PropertyMapper map = PropertyMapper.get(); |
||||
map.from(client.getDefaultToAddress()).to(builder::defaultToAddress); |
||||
map.from(client.getCompletionTimeout()).to(builder::completionTimeout); |
||||
messageConverter.ifAvailable(builder::messageConverter); |
||||
customizers.orderedStream().forEach((customizer) -> customizer.customize(builder)); |
||||
return builder.build(); |
||||
} |
||||
|
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,37 @@
@@ -0,0 +1,37 @@
|
||||
/* |
||||
* Copyright 2012-present the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.boot.amqp.autoconfigure; |
||||
|
||||
import org.springframework.amqp.client.AmqpClient; |
||||
|
||||
/** |
||||
* Callback interface that can be used to customize the auto-configured |
||||
* {@link AmqpClient.Builder}. |
||||
* |
||||
* @author Stephane Nicoll |
||||
* @since 4.1.0 |
||||
*/ |
||||
@FunctionalInterface |
||||
public interface AmqpClientCustomizer { |
||||
|
||||
/** |
||||
* Callback to customize a {@link AmqpClient.Builder} instance. |
||||
* @param amqpClientBuilder the client builder to customize |
||||
*/ |
||||
void customize(AmqpClient.Builder amqpClientBuilder); |
||||
|
||||
} |
||||
@ -0,0 +1,62 @@
@@ -0,0 +1,62 @@
|
||||
/* |
||||
* Copyright 2012-present the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.boot.amqp.autoconfigure; |
||||
|
||||
import org.jspecify.annotations.Nullable; |
||||
|
||||
import org.springframework.boot.autoconfigure.service.connection.ConnectionDetails; |
||||
|
||||
/** |
||||
* Details required to establish a connection to an AMQP broker. |
||||
* |
||||
* @author Stephane Nicoll |
||||
* @since 4.1.0 |
||||
*/ |
||||
public interface AmqpConnectionDetails extends ConnectionDetails { |
||||
|
||||
/** |
||||
* Return the {@link Address} of the broker. |
||||
* @return the address |
||||
*/ |
||||
Address getAddress(); |
||||
|
||||
/** |
||||
* Login user to authenticate to the broker. |
||||
* @return the login user to authenticate to the broker or {@code null} |
||||
*/ |
||||
default @Nullable String getUsername() { |
||||
return null; |
||||
} |
||||
|
||||
/** |
||||
* Password used to authenticate to the broker. |
||||
* @return the password to authenticate to the broker or {@code null} |
||||
*/ |
||||
default @Nullable String getPassword() { |
||||
return null; |
||||
} |
||||
|
||||
/** |
||||
* An AMQP broker address. |
||||
* |
||||
* @param host the host |
||||
* @param port the port |
||||
*/ |
||||
record Address(String host, int port) { |
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,125 @@
@@ -0,0 +1,125 @@
|
||||
/* |
||||
* Copyright 2012-present the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.boot.amqp.autoconfigure; |
||||
|
||||
import java.time.Duration; |
||||
|
||||
import org.jspecify.annotations.Nullable; |
||||
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties; |
||||
|
||||
/** |
||||
* Configuration properties for AMQP 1.0 connectivity. |
||||
* |
||||
* @author Stephane Nicoll |
||||
* @since 4.1.0 |
||||
*/ |
||||
@ConfigurationProperties("spring.amqp") |
||||
public class AmqpProperties { |
||||
|
||||
/** |
||||
* AMQP broker host. |
||||
*/ |
||||
private String host = "localhost"; |
||||
|
||||
/** |
||||
* AMQP broker port. |
||||
*/ |
||||
private int port = 5672; |
||||
|
||||
/** |
||||
* Login user to authenticate to the broker. |
||||
*/ |
||||
private @Nullable String username; |
||||
|
||||
/** |
||||
* Password used to authenticate to the broker. |
||||
*/ |
||||
private @Nullable String password; |
||||
|
||||
private final Client client = new Client(); |
||||
|
||||
public String getHost() { |
||||
return this.host; |
||||
} |
||||
|
||||
public void setHost(String host) { |
||||
this.host = host; |
||||
} |
||||
|
||||
public int getPort() { |
||||
return this.port; |
||||
} |
||||
|
||||
public void setPort(int port) { |
||||
this.port = port; |
||||
} |
||||
|
||||
public @Nullable String getUsername() { |
||||
return this.username; |
||||
} |
||||
|
||||
public void setUsername(@Nullable String username) { |
||||
this.username = username; |
||||
} |
||||
|
||||
public @Nullable String getPassword() { |
||||
return this.password; |
||||
} |
||||
|
||||
public void setPassword(@Nullable String password) { |
||||
this.password = password; |
||||
} |
||||
|
||||
public Client getClient() { |
||||
return this.client; |
||||
} |
||||
|
||||
/** |
||||
* Client-level settings. |
||||
*/ |
||||
public static class Client { |
||||
|
||||
/** |
||||
* Default destination address for send operations when none is specified. |
||||
*/ |
||||
private @Nullable String defaultToAddress; |
||||
|
||||
/** |
||||
* Maximum time to wait for request operations to complete. |
||||
*/ |
||||
private Duration completionTimeout = Duration.ofSeconds(60); |
||||
|
||||
public @Nullable String getDefaultToAddress() { |
||||
return this.defaultToAddress; |
||||
} |
||||
|
||||
public void setDefaultToAddress(@Nullable String defaultToAddress) { |
||||
this.defaultToAddress = defaultToAddress; |
||||
} |
||||
|
||||
public Duration getCompletionTimeout() { |
||||
return this.completionTimeout; |
||||
} |
||||
|
||||
public void setCompletionTimeout(Duration completionTimeout) { |
||||
this.completionTimeout = completionTimeout; |
||||
} |
||||
|
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,39 @@
@@ -0,0 +1,39 @@
|
||||
/* |
||||
* Copyright 2012-present the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.boot.amqp.autoconfigure; |
||||
|
||||
import org.apache.qpid.protonj2.client.ConnectionOptions; |
||||
|
||||
import org.springframework.amqp.client.AmqpConnectionFactory; |
||||
|
||||
/** |
||||
* Callback interface for customizing {@link ConnectionOptions} on the auto-configured |
||||
* {@link AmqpConnectionFactory}. |
||||
* |
||||
* @author Stephane Nicoll |
||||
* @since 4.1.0 |
||||
*/ |
||||
@FunctionalInterface |
||||
public interface ConnectionOptionsCustomizer { |
||||
|
||||
/** |
||||
* Customize the {@link ConnectionOptions}. |
||||
* @param connectionOptions the connection options to customize |
||||
*/ |
||||
void customize(ConnectionOptions connectionOptions); |
||||
|
||||
} |
||||
@ -0,0 +1,49 @@
@@ -0,0 +1,49 @@
|
||||
/* |
||||
* Copyright 2012-present the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.boot.amqp.autoconfigure; |
||||
|
||||
import org.jspecify.annotations.Nullable; |
||||
|
||||
/** |
||||
* Adapts {@link AmqpProperties} to {@link AmqpConnectionDetails}. |
||||
* |
||||
* @author Stephane Nicoll |
||||
*/ |
||||
class PropertiesAmqpConnectionDetails implements AmqpConnectionDetails { |
||||
|
||||
private final AmqpProperties properties; |
||||
|
||||
PropertiesAmqpConnectionDetails(AmqpProperties properties) { |
||||
this.properties = properties; |
||||
} |
||||
|
||||
@Override |
||||
public Address getAddress() { |
||||
return new Address(this.properties.getHost(), this.properties.getPort()); |
||||
} |
||||
|
||||
@Override |
||||
public @Nullable String getUsername() { |
||||
return this.properties.getUsername(); |
||||
} |
||||
|
||||
@Override |
||||
public @Nullable String getPassword() { |
||||
return this.properties.getPassword(); |
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,23 @@
@@ -0,0 +1,23 @@
|
||||
/* |
||||
* Copyright 2012-present the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
/** |
||||
* Auto-configuration for generic AMQP 1.0 support. |
||||
*/ |
||||
@NullMarked |
||||
package org.springframework.boot.amqp.autoconfigure; |
||||
|
||||
import org.jspecify.annotations.NullMarked; |
||||
@ -0,0 +1,84 @@
@@ -0,0 +1,84 @@
|
||||
/* |
||||
* Copyright 2012-present the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.boot.amqp.docker.compose; |
||||
|
||||
import org.jspecify.annotations.Nullable; |
||||
|
||||
import org.springframework.boot.amqp.autoconfigure.AmqpConnectionDetails; |
||||
import org.springframework.boot.docker.compose.core.RunningService; |
||||
import org.springframework.boot.docker.compose.service.connection.DockerComposeConnectionDetailsFactory; |
||||
import org.springframework.boot.docker.compose.service.connection.DockerComposeConnectionSource; |
||||
|
||||
/** |
||||
* {@link DockerComposeConnectionDetailsFactory} to create {@link AmqpConnectionDetails} |
||||
* for a {@code rabbitmq} service. |
||||
* |
||||
* @author Stephane Nicoll |
||||
*/ |
||||
class RabbitMqDockerComposeConnectionDetailsFactory |
||||
extends DockerComposeConnectionDetailsFactory<AmqpConnectionDetails> { |
||||
|
||||
private static final int RABBITMQ_PORT = 5672; |
||||
|
||||
protected RabbitMqDockerComposeConnectionDetailsFactory() { |
||||
super("rabbitmq"); |
||||
} |
||||
|
||||
@Override |
||||
protected @Nullable AmqpConnectionDetails getDockerComposeConnectionDetails(DockerComposeConnectionSource source) { |
||||
try { |
||||
return new RabbitMqDockerComposeConnectionDetails(source.getRunningService()); |
||||
} |
||||
catch (IllegalStateException ex) { |
||||
return null; |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* {@link AmqpConnectionDetails} backed by a {@code rabbitmq} {@link RunningService}. |
||||
*/ |
||||
static class RabbitMqDockerComposeConnectionDetails extends DockerComposeConnectionDetails |
||||
implements AmqpConnectionDetails { |
||||
|
||||
private final RabbitMqEnvironment environment; |
||||
|
||||
private final Address address; |
||||
|
||||
protected RabbitMqDockerComposeConnectionDetails(RunningService service) { |
||||
super(service); |
||||
this.environment = new RabbitMqEnvironment(service.env()); |
||||
this.address = new Address(service.host(), service.ports().get(RABBITMQ_PORT)); |
||||
} |
||||
|
||||
@Override |
||||
public Address getAddress() { |
||||
return this.address; |
||||
} |
||||
|
||||
@Override |
||||
public @Nullable String getUsername() { |
||||
return this.environment.getUsername(); |
||||
} |
||||
|
||||
@Override |
||||
public @Nullable String getPassword() { |
||||
return this.environment.getPassword(); |
||||
} |
||||
|
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,50 @@
@@ -0,0 +1,50 @@
|
||||
/* |
||||
* Copyright 2012-present the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.boot.amqp.docker.compose; |
||||
|
||||
import java.util.Map; |
||||
|
||||
import org.jspecify.annotations.Nullable; |
||||
|
||||
/** |
||||
* RabbitMQ environment details. |
||||
* |
||||
* @author Moritz Halbritter |
||||
* @author Andy Wilkinson |
||||
* @author Phillip Webb |
||||
* @author Scott Frederick |
||||
*/ |
||||
class RabbitMqEnvironment { |
||||
|
||||
private final @Nullable String username; |
||||
|
||||
private final @Nullable String password; |
||||
|
||||
RabbitMqEnvironment(Map<String, @Nullable String> env) { |
||||
this.username = env.getOrDefault("RABBITMQ_DEFAULT_USER", env.getOrDefault("RABBITMQ_USERNAME", "guest")); |
||||
this.password = env.getOrDefault("RABBITMQ_DEFAULT_PASS", env.getOrDefault("RABBITMQ_PASSWORD", "guest")); |
||||
} |
||||
|
||||
@Nullable String getUsername() { |
||||
return this.username; |
||||
} |
||||
|
||||
@Nullable String getPassword() { |
||||
return this.password; |
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,23 @@
@@ -0,0 +1,23 @@
|
||||
/* |
||||
* Copyright 2012-present the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
/** |
||||
* Support for Docker Compose generic AMQP service connections. |
||||
*/ |
||||
@NullMarked |
||||
package org.springframework.boot.amqp.docker.compose; |
||||
|
||||
import org.jspecify.annotations.NullMarked; |
||||
@ -0,0 +1,71 @@
@@ -0,0 +1,71 @@
|
||||
/* |
||||
* Copyright 2012-present the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.boot.amqp.testcontainers; |
||||
|
||||
import java.net.URI; |
||||
|
||||
import org.testcontainers.rabbitmq.RabbitMQContainer; |
||||
|
||||
import org.springframework.boot.amqp.autoconfigure.AmqpConnectionDetails; |
||||
import org.springframework.boot.testcontainers.service.connection.ContainerConnectionDetailsFactory; |
||||
import org.springframework.boot.testcontainers.service.connection.ContainerConnectionSource; |
||||
import org.springframework.boot.testcontainers.service.connection.ServiceConnection; |
||||
|
||||
/** |
||||
* {@link ContainerConnectionDetailsFactory} to create {@link AmqpConnectionDetails} from |
||||
* a {@link ServiceConnection @ServiceConnection}-annotated {@link RabbitMQContainer}. |
||||
* |
||||
* @author Stephane Nicoll |
||||
*/ |
||||
class RabbitMqContainerConnectionDetailsFactory |
||||
extends ContainerConnectionDetailsFactory<RabbitMQContainer, AmqpConnectionDetails> { |
||||
|
||||
@Override |
||||
protected AmqpConnectionDetails getContainerConnectionDetails(ContainerConnectionSource<RabbitMQContainer> source) { |
||||
return new RabbitMqContainerConnectionDetails(source); |
||||
} |
||||
|
||||
/** |
||||
* {@link AmqpConnectionDetails} for RabbitMQ backed by a |
||||
* {@link ContainerConnectionSource}. |
||||
*/ |
||||
static final class RabbitMqContainerConnectionDetails extends ContainerConnectionDetails<RabbitMQContainer> |
||||
implements AmqpConnectionDetails { |
||||
|
||||
private RabbitMqContainerConnectionDetails(ContainerConnectionSource<RabbitMQContainer> source) { |
||||
super(source); |
||||
} |
||||
|
||||
@Override |
||||
public Address getAddress() { |
||||
URI uri = URI.create(getContainer().getAmqpUrl()); |
||||
return new Address(uri.getHost(), uri.getPort()); |
||||
} |
||||
|
||||
@Override |
||||
public String getUsername() { |
||||
return getContainer().getAdminUsername(); |
||||
} |
||||
|
||||
@Override |
||||
public String getPassword() { |
||||
return getContainer().getAdminPassword(); |
||||
} |
||||
|
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,23 @@
@@ -0,0 +1,23 @@
|
||||
/* |
||||
* Copyright 2012-present the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
/** |
||||
* Support for testcontainers generic AMQP service connections. |
||||
*/ |
||||
@NullMarked |
||||
package org.springframework.boot.amqp.testcontainers; |
||||
|
||||
import org.jspecify.annotations.NullMarked; |
||||
@ -0,0 +1,5 @@
@@ -0,0 +1,5 @@
|
||||
# Connection Details Factories |
||||
org.springframework.boot.autoconfigure.service.connection.ConnectionDetailsFactory=\ |
||||
org.springframework.boot.amqp.docker.compose.RabbitMqDockerComposeConnectionDetailsFactory,\ |
||||
org.springframework.boot.amqp.testcontainers.RabbitMqContainerConnectionDetailsFactory |
||||
|
||||
@ -0,0 +1 @@
@@ -0,0 +1 @@
|
||||
org.springframework.boot.amqp.autoconfigure.AmqpAutoConfiguration |
||||
@ -0,0 +1,276 @@
@@ -0,0 +1,276 @@
|
||||
/* |
||||
* Copyright 2012-present the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.boot.amqp.autoconfigure; |
||||
|
||||
import java.time.Duration; |
||||
|
||||
import org.apache.qpid.protonj2.client.Client; |
||||
import org.apache.qpid.protonj2.client.ConnectionOptions; |
||||
import org.assertj.core.api.InstanceOfAssertFactories; |
||||
import org.junit.jupiter.api.Test; |
||||
import org.mockito.InOrder; |
||||
import tools.jackson.databind.json.JsonMapper; |
||||
|
||||
import org.springframework.amqp.client.AmqpClient; |
||||
import org.springframework.amqp.client.AmqpConnectionFactory; |
||||
import org.springframework.amqp.support.converter.JacksonJsonMessageConverter; |
||||
import org.springframework.amqp.support.converter.MessageConverter; |
||||
import org.springframework.boot.amqp.autoconfigure.AmqpConnectionDetails.Address; |
||||
import org.springframework.boot.autoconfigure.AutoConfigurations; |
||||
import org.springframework.boot.jackson.autoconfigure.JacksonAutoConfiguration; |
||||
import org.springframework.boot.test.context.runner.ApplicationContextRunner; |
||||
import org.springframework.context.annotation.Bean; |
||||
import org.springframework.context.annotation.Configuration; |
||||
import org.springframework.core.annotation.Order; |
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat; |
||||
import static org.mockito.ArgumentMatchers.any; |
||||
import static org.mockito.BDDMockito.given; |
||||
import static org.mockito.Mockito.inOrder; |
||||
import static org.mockito.Mockito.mock; |
||||
|
||||
/** |
||||
* Tests for {@link AmqpAutoConfiguration}. |
||||
* |
||||
* @author Stephane Nicoll |
||||
*/ |
||||
class AmqpAutoConfigurationTests { |
||||
|
||||
private final ApplicationContextRunner contextRunner = new ApplicationContextRunner() |
||||
.withConfiguration(AutoConfigurations.of(AmqpAutoConfiguration.class)); |
||||
|
||||
@Test |
||||
void autoConfigurationConfiguresConnectionFactoryWithDefaultSettings() { |
||||
this.contextRunner.run((context) -> { |
||||
AmqpConnectionFactory connectionFactory = context.getBean(AmqpConnectionFactory.class); |
||||
assertThat(connectionFactory).hasFieldOrPropertyWithValue("host", "localhost"); |
||||
assertThat(connectionFactory).hasFieldOrPropertyWithValue("port", 5672); |
||||
assertThat(connectionFactory) |
||||
.extracting("connectionOptions", InstanceOfAssertFactories.type(ConnectionOptions.class)) |
||||
.satisfies((connectOptions) -> { |
||||
assertThat(connectOptions).hasFieldOrPropertyWithValue("user", null); |
||||
assertThat(connectOptions).hasFieldOrPropertyWithValue("password", null); |
||||
}); |
||||
}); |
||||
} |
||||
|
||||
@Test |
||||
void autoConfigurationConfiguresConnectionFactoryWithOverrides() { |
||||
this.contextRunner |
||||
.withPropertyValues("spring.amqp.host=amqp.example.com", "spring.amqp.port=1234", |
||||
"spring.amqp.username=user", "spring.amqp.password=secret") |
||||
.run((context) -> { |
||||
AmqpConnectionFactory connectionFactory = context.getBean(AmqpConnectionFactory.class); |
||||
assertThat(connectionFactory).hasFieldOrPropertyWithValue("host", "amqp.example.com"); |
||||
assertThat(connectionFactory).hasFieldOrPropertyWithValue("port", 1234); |
||||
assertThat(connectionFactory) |
||||
.extracting("connectionOptions", InstanceOfAssertFactories.type(ConnectionOptions.class)) |
||||
.satisfies((connectOptions) -> { |
||||
assertThat(connectOptions).hasFieldOrPropertyWithValue("user", "user"); |
||||
assertThat(connectOptions).hasFieldOrPropertyWithValue("password", "secret"); |
||||
}); |
||||
}); |
||||
} |
||||
|
||||
@Test |
||||
void autoConfigurationUsesAmqpConnectionDetailsForConnectionFactory() { |
||||
AmqpConnectionDetails connectionDetails = mock(); |
||||
given(connectionDetails.getAddress()).willReturn(new Address("details.example.com", 9999)); |
||||
given(connectionDetails.getUsername()).willReturn("from-details"); |
||||
given(connectionDetails.getPassword()).willReturn("details-secret"); |
||||
this.contextRunner.withBean(AmqpConnectionDetails.class, () -> connectionDetails) |
||||
.withPropertyValues("spring.amqp.host=ignored.example.com", "spring.amqp.port=1111", |
||||
"spring.amqp.username=ignored", "spring.amqp.password=ignored") |
||||
.run((context) -> { |
||||
AmqpConnectionFactory connectionFactory = context.getBean(AmqpConnectionFactory.class); |
||||
assertThat(connectionFactory).hasFieldOrPropertyWithValue("host", "details.example.com"); |
||||
assertThat(connectionFactory).hasFieldOrPropertyWithValue("port", 9999); |
||||
assertThat(connectionFactory) |
||||
.extracting("connectionOptions", InstanceOfAssertFactories.type(ConnectionOptions.class)) |
||||
.satisfies((connectOptions) -> { |
||||
assertThat(connectOptions).hasFieldOrPropertyWithValue("user", "from-details"); |
||||
assertThat(connectOptions).hasFieldOrPropertyWithValue("password", "details-secret"); |
||||
}); |
||||
}); |
||||
} |
||||
|
||||
@Test |
||||
void autoConfigurationAppliesConnectionOptionsCustomizer() { |
||||
this.contextRunner |
||||
.withBean(ConnectionOptionsCustomizer.class, () -> (connectOptions) -> connectOptions.user("custom-user")) |
||||
.withPropertyValues("spring.amqp.username=user") |
||||
.run((context) -> { |
||||
AmqpConnectionFactory connectionFactory = context.getBean(AmqpConnectionFactory.class); |
||||
assertThat(connectionFactory) |
||||
.extracting("connectionOptions", InstanceOfAssertFactories.type(ConnectionOptions.class)) |
||||
.satisfies((connectOptions) -> assertThat(connectOptions).hasFieldOrPropertyWithValue("user", |
||||
"custom-user")); |
||||
}); |
||||
} |
||||
|
||||
@Test |
||||
void autoConfigurationAppliesConnectionOptionsCustomizersInOrder() { |
||||
this.contextRunner.withUserConfiguration(ConnectionOptionsCustomizersConfiguration.class).run((context) -> { |
||||
ConnectionOptionsCustomizer first = context.getBean("first", ConnectionOptionsCustomizer.class); |
||||
ConnectionOptionsCustomizer second = context.getBean("second", ConnectionOptionsCustomizer.class); |
||||
InOrder ordered = inOrder(first, second); |
||||
ordered.verify(first).customize(any()); |
||||
ordered.verify(second).customize(any()); |
||||
}); |
||||
} |
||||
|
||||
@Test |
||||
void autoConfigurationBacksOffWhenUserProvidesProtonClient() { |
||||
Client protonClient = mock(); |
||||
this.contextRunner.withBean(Client.class, () -> protonClient).run((context) -> { |
||||
assertThat(context).hasSingleBean(Client.class); |
||||
assertThat(context.getBean(Client.class)).isSameAs(protonClient); |
||||
}); |
||||
} |
||||
|
||||
@Test |
||||
void autoConfigurationBacksOffWhenUserProvidesAmqpConnectionDetails() { |
||||
AmqpConnectionDetails connectionDetails = mock(); |
||||
given(connectionDetails.getAddress()).willReturn(new Address("details.example.com", 9999)); |
||||
this.contextRunner.withBean(AmqpConnectionDetails.class, () -> connectionDetails).run((context) -> { |
||||
assertThat(context).hasSingleBean(AmqpConnectionDetails.class); |
||||
assertThat(context.getBean(AmqpConnectionDetails.class)).isSameAs(connectionDetails); |
||||
}); |
||||
} |
||||
|
||||
@Test |
||||
void autoConfigurationBacksOffWhenUserProvidesAmqpConnectionFactory() { |
||||
AmqpConnectionFactory amqpConnectionFactory = mock(); |
||||
this.contextRunner.withBean(AmqpConnectionFactory.class, () -> amqpConnectionFactory).run((context) -> { |
||||
assertThat(context).hasSingleBean(AmqpConnectionFactory.class); |
||||
assertThat(context.getBean(AmqpConnectionFactory.class)).isSameAs(amqpConnectionFactory); |
||||
}); |
||||
} |
||||
|
||||
@Test |
||||
void autoConfigurationConfiguresAmqpClientWithOverrides() { |
||||
this.contextRunner |
||||
.withPropertyValues("spring.amqp.client.default-to-address=/queues/default_queue", |
||||
"spring.amqp.client.completion-timeout=20s") |
||||
.run((context) -> { |
||||
AmqpClient amqpClient = context.getBean(AmqpClient.class); |
||||
assertThat(amqpClient).hasFieldOrPropertyWithValue("defaultToAddress", "/queues/default_queue"); |
||||
assertThat(amqpClient).hasFieldOrPropertyWithValue("completionTimeout", Duration.ofSeconds(20)); |
||||
}); |
||||
} |
||||
|
||||
@Test |
||||
void autoConfigurationConfiguresAmqpClientWithJacksonMessageConverter() { |
||||
this.contextRunner.withConfiguration(AutoConfigurations.of(JacksonAutoConfiguration.class)).run((context) -> { |
||||
assertThat(context).hasSingleBean(JsonMapper.class); |
||||
AmqpClient amqpClient = context.getBean(AmqpClient.class); |
||||
assertThat(amqpClient) |
||||
.extracting("messageConverter", InstanceOfAssertFactories.type(MessageConverter.class)) |
||||
.satisfies((messageConverter) -> assertThat(messageConverter) |
||||
.isInstanceOf(JacksonJsonMessageConverter.class) |
||||
.hasFieldOrPropertyWithValue("objectMapper", context.getBean(JsonMapper.class))); |
||||
}); |
||||
} |
||||
|
||||
@Test |
||||
void autoConfigurationConfiguresAmqpClientWithMessageConverter() { |
||||
MessageConverter messageConverter = mock(MessageConverter.class); |
||||
this.contextRunner.withBean(MessageConverter.class, () -> messageConverter).run((context) -> { |
||||
AmqpClient amqpClient = context.getBean(AmqpClient.class); |
||||
assertThat(amqpClient).hasFieldOrPropertyWithValue("messageConverter", messageConverter); |
||||
}); |
||||
} |
||||
|
||||
@Test |
||||
void autoConfigurationBacksOffWhenUserProvidesAmqpClient() { |
||||
AmqpClient amqpClient = mock(); |
||||
this.contextRunner.withBean(AmqpClient.class, () -> amqpClient).run((context) -> { |
||||
assertThat(context).hasSingleBean(AmqpClient.class); |
||||
assertThat(context.getBean(AmqpClient.class)).isSameAs(amqpClient); |
||||
}); |
||||
} |
||||
|
||||
@Test |
||||
void autoConfigurationUsesUserMessageConverterWhenDefinedAlongsideJackson() { |
||||
MessageConverter messageConverter = mock(MessageConverter.class); |
||||
this.contextRunner.withConfiguration(AutoConfigurations.of(JacksonAutoConfiguration.class)) |
||||
.withBean(MessageConverter.class, () -> messageConverter) |
||||
.run((context) -> { |
||||
assertThat(context).doesNotHaveBean(JacksonJsonMessageConverter.class); |
||||
AmqpClient amqpClient = context.getBean(AmqpClient.class); |
||||
assertThat(amqpClient).hasFieldOrPropertyWithValue("messageConverter", messageConverter); |
||||
}); |
||||
} |
||||
|
||||
@Test |
||||
void autoConfigurationAppliesAmqpClientCustomizer() { |
||||
this.contextRunner |
||||
.withBean(AmqpClientCustomizer.class, |
||||
() -> (client) -> client.defaultToAddress("/queues/custom_default_queue")) |
||||
.withPropertyValues("spring.amqp.client.default-to-address=/queues/default_queue") |
||||
.run((context) -> { |
||||
AmqpClient amqpClient = context.getBean(AmqpClient.class); |
||||
assertThat(amqpClient).hasFieldOrPropertyWithValue("defaultToAddress", "/queues/custom_default_queue"); |
||||
}); |
||||
} |
||||
|
||||
@Test |
||||
void autoConfigurationAppliesAmqpClientCustomizersInOrder() { |
||||
this.contextRunner.withUserConfiguration(AmqpClientCustomizersConfiguration.class).run((context) -> { |
||||
AmqpClientCustomizer first = context.getBean("first", AmqpClientCustomizer.class); |
||||
AmqpClientCustomizer second = context.getBean("second", AmqpClientCustomizer.class); |
||||
InOrder ordered = inOrder(first, second); |
||||
ordered.verify(first).customize(any()); |
||||
ordered.verify(second).customize(any()); |
||||
}); |
||||
} |
||||
|
||||
@Configuration(proxyBeanMethods = false) |
||||
static class ConnectionOptionsCustomizersConfiguration { |
||||
|
||||
@Bean |
||||
@Order(2) |
||||
ConnectionOptionsCustomizer second() { |
||||
return mock(); |
||||
} |
||||
|
||||
@Bean |
||||
@Order(1) |
||||
ConnectionOptionsCustomizer first() { |
||||
return mock(); |
||||
} |
||||
|
||||
} |
||||
|
||||
@Configuration(proxyBeanMethods = false) |
||||
static class AmqpClientCustomizersConfiguration { |
||||
|
||||
@Bean |
||||
@Order(2) |
||||
AmqpClientCustomizer second() { |
||||
return mock(); |
||||
} |
||||
|
||||
@Bean |
||||
@Order(1) |
||||
AmqpClientCustomizer first() { |
||||
return mock(); |
||||
} |
||||
|
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,72 @@
@@ -0,0 +1,72 @@
|
||||
/* |
||||
* Copyright 2012-present the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.boot.amqp.docker.compose; |
||||
|
||||
import java.util.Collections; |
||||
import java.util.Map; |
||||
|
||||
import org.junit.jupiter.api.Test; |
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat; |
||||
|
||||
/** |
||||
* Tests for {@link RabbitMqEnvironment}. |
||||
* |
||||
* @author Moritz Halbritter |
||||
* @author Andy Wilkinson |
||||
* @author Phillip Webb |
||||
* @author Scott Frederick |
||||
*/ |
||||
class RabbitMqEnvironmentTests { |
||||
|
||||
@Test |
||||
void getUsernameWhenNoRabbitMqDefaultUser() { |
||||
RabbitMqEnvironment environment = new RabbitMqEnvironment(Collections.emptyMap()); |
||||
assertThat(environment.getUsername()).isEqualTo("guest"); |
||||
} |
||||
|
||||
@Test |
||||
void getUsernameWhenHasRabbitMqDefaultUser() { |
||||
RabbitMqEnvironment environment = new RabbitMqEnvironment(Map.of("RABBITMQ_DEFAULT_USER", "me")); |
||||
assertThat(environment.getUsername()).isEqualTo("me"); |
||||
} |
||||
|
||||
@Test |
||||
void getUsernameWhenHasRabbitMqUsername() { |
||||
RabbitMqEnvironment environment = new RabbitMqEnvironment(Map.of("RABBITMQ_USERNAME", "me")); |
||||
assertThat(environment.getUsername()).isEqualTo("me"); |
||||
} |
||||
|
||||
@Test |
||||
void getPasswordWhenNoRabbitMqDefaultPass() { |
||||
RabbitMqEnvironment environment = new RabbitMqEnvironment(Collections.emptyMap()); |
||||
assertThat(environment.getPassword()).isEqualTo("guest"); |
||||
} |
||||
|
||||
@Test |
||||
void getPasswordWhenHasRabbitMqDefaultPass() { |
||||
RabbitMqEnvironment environment = new RabbitMqEnvironment(Map.of("RABBITMQ_DEFAULT_PASS", "secret")); |
||||
assertThat(environment.getPassword()).isEqualTo("secret"); |
||||
} |
||||
|
||||
@Test |
||||
void getPasswordWhenHasRabbitMqPassword() { |
||||
RabbitMqEnvironment environment = new RabbitMqEnvironment(Map.of("RABBITMQ_PASSWORD", "secret")); |
||||
assertThat(environment.getPassword()).isEqualTo("secret"); |
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,26 @@
@@ -0,0 +1,26 @@
|
||||
/* |
||||
* Copyright 2012-present the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the License); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0 |
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
plugins { |
||||
id "org.springframework.boot.starter" |
||||
} |
||||
|
||||
description = "Starter for testing generic AMQP 1.0 support" |
||||
|
||||
dependencies { |
||||
api(project(":starter:spring-boot-starter-amqp")) |
||||
api(project(":starter:spring-boot-starter-test")) |
||||
} |
||||
@ -0,0 +1,27 @@
@@ -0,0 +1,27 @@
|
||||
/* |
||||
* Copyright 2012-present the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the License); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0 |
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
plugins { |
||||
id "org.springframework.boot.starter" |
||||
} |
||||
|
||||
description = "Starter for generic AMQP 1.0 support" |
||||
|
||||
dependencies { |
||||
api(project(":starter:spring-boot-starter")) |
||||
|
||||
api(project(":module:spring-boot-amqp")) |
||||
} |
||||
@ -0,0 +1,57 @@
@@ -0,0 +1,57 @@
|
||||
/* |
||||
* Copyright 2012-present the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.boot.testsupport.container; |
||||
|
||||
import java.util.UUID; |
||||
|
||||
import org.testcontainers.rabbitmq.RabbitMQContainer; |
||||
|
||||
/** |
||||
* A {@link RabbitMQContainer} with management plugin enabled. |
||||
* |
||||
* @author Stephane Nicoll |
||||
*/ |
||||
public class RabbitMqManagementContainer extends RabbitMQContainer { |
||||
|
||||
public RabbitMqManagementContainer() { |
||||
super(TestImage.RABBITMQ.toString()); |
||||
} |
||||
|
||||
/** |
||||
* Create a queue with the given name. |
||||
* @param name the name of the queue |
||||
* @return the AMQP target of the queue |
||||
*/ |
||||
public String createQueue(String name) { |
||||
try { |
||||
execInContainer("rabbitmqadmin", "queues", "declare", "--name", name); |
||||
return "/queues/" + name; |
||||
} |
||||
catch (Exception ex) { |
||||
throw new RuntimeException("Could not create queue", ex); |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* Create a queue with a random, unique, name. |
||||
* @return the AMQP target of the queue |
||||
*/ |
||||
public String createRandomQueue() { |
||||
return createQueue(UUID.randomUUID().toString()); |
||||
} |
||||
|
||||
} |
||||
Loading…
Reference in new issue