From 9c969f91beb0d88ff9f90a9faaf87f4050248111 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Nicoll?= Date: Mon, 20 Oct 2025 09:53:33 +0200 Subject: [PATCH] Polish "Remove Spring Pulsar Reactive support" See gh-47707 --- .../reference/pages/messaging/pulsar.adoc | 7 +- ...lsarAutoConfigurationIntegrationTests.java | 2 +- .../PulsarAutoConfiguration.java | 3 +- .../PulsarAutoConfigurationTests.java | 368 ++++++++++++++- .../PulsarConfigurationTests.java | 421 ------------------ .../pulsar/SamplePulsarApplicationTests.java | 42 +- ...ava => SamplePulsarApplicationConfig.java} | 10 +- 7 files changed, 388 insertions(+), 465 deletions(-) delete mode 100644 module/spring-boot-pulsar/src/test/java/org/springframework/boot/pulsar/autoconfigure/PulsarConfigurationTests.java rename smoke-test/spring-boot-smoke-test-pulsar/src/main/java/smoketest/pulsar/{ImperativeAppConfig.java => SamplePulsarApplicationConfig.java} (83%) diff --git a/documentation/spring-boot-docs/src/docs/antora/modules/reference/pages/messaging/pulsar.adoc b/documentation/spring-boot-docs/src/docs/antora/modules/reference/pages/messaging/pulsar.adoc index ad5c918fe11..e518cb2e743 100644 --- a/documentation/spring-boot-docs/src/docs/antora/modules/reference/pages/messaging/pulsar.adoc +++ b/documentation/spring-boot-docs/src/docs/antora/modules/reference/pages/messaging/pulsar.adoc @@ -8,6 +8,7 @@ Spring Boot will auto-configure and register the Spring for Apache Pulsar compon There is the `spring-boot-starter-pulsar` starter for conveniently collecting the dependencies for use. + [[messaging.pulsar.connecting]] == Connecting to Pulsar @@ -67,6 +68,7 @@ You can follow {url-spring-pulsar-docs}/reference/pulsar/pulsar-client.html#tls- For complete details on the client and authentication see the Spring for Apache Pulsar {url-spring-pulsar-docs}/reference/pulsar/pulsar-client.html[reference documentation]. + [[messaging.pulsar.admin]] == Connecting to Pulsar Administration @@ -107,6 +109,8 @@ You can also pass in a javadoc:org.springframework.pulsar.core.ProducerBuilderCu If you need more control over the message being sent, you can pass in a javadoc:org.springframework.pulsar.core.TypedMessageBuilderCustomizer[] when sending a message. + + [[messaging.pulsar.receiving]] == Receiving a Message @@ -124,6 +128,8 @@ You can also customize a single listener by setting the `consumerCustomizer` att If you need more control over the actual container factory configuration, consider registering one or more `PulsarContainerFactoryCustomizer>` beans. + + [[messaging.pulsar.reading]] == Reading a Message @@ -144,7 +150,6 @@ You can also customize a single listener by setting the `readerCustomizer` attri If you need more control over the actual container factory configuration, consider registering one or more `PulsarContainerFactoryCustomizer>` beans. - TIP: For more details on any of the above components and to discover other available features, see the Spring for Apache Pulsar {url-spring-pulsar-docs}[reference documentation]. diff --git a/module/spring-boot-pulsar/src/dockerTest/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationIntegrationTests.java b/module/spring-boot-pulsar/src/dockerTest/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationIntegrationTests.java index 04fddcb59e8..482c28d96a5 100644 --- a/module/spring-boot-pulsar/src/dockerTest/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationIntegrationTests.java +++ b/module/spring-boot-pulsar/src/dockerTest/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationIntegrationTests.java @@ -75,7 +75,7 @@ class PulsarAutoConfigurationIntegrationTests { } @Configuration(proxyBeanMethods = false) - @ImportAutoConfiguration({ PulsarAutoConfiguration.class }) + @ImportAutoConfiguration(PulsarAutoConfiguration.class) @Import(TestService.class) static class TestConfiguration { diff --git a/module/spring-boot-pulsar/src/main/java/org/springframework/boot/pulsar/autoconfigure/PulsarAutoConfiguration.java b/module/spring-boot-pulsar/src/main/java/org/springframework/boot/pulsar/autoconfigure/PulsarAutoConfiguration.java index 4e68ea71d63..176dafe4432 100644 --- a/module/spring-boot-pulsar/src/main/java/org/springframework/boot/pulsar/autoconfigure/PulsarAutoConfiguration.java +++ b/module/spring-boot-pulsar/src/main/java/org/springframework/boot/pulsar/autoconfigure/PulsarAutoConfiguration.java @@ -38,13 +38,13 @@ import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnBooleanProperty; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.pulsar.autoconfigure.PulsarProperties.Defaults.SchemaInfo; import org.springframework.boot.pulsar.autoconfigure.PulsarProperties.Defaults.TypeMapping; import org.springframework.boot.thread.Threading; import org.springframework.boot.util.LambdaSafe; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Import; import org.springframework.context.annotation.Scope; import org.springframework.core.env.Environment; import org.springframework.core.task.VirtualThreadTaskExecutor; @@ -96,6 +96,7 @@ import org.springframework.util.Assert; */ @AutoConfiguration @ConditionalOnClass({ PulsarClient.class, PulsarTemplate.class }) +@EnableConfigurationProperties(PulsarProperties.class) public final class PulsarAutoConfiguration { private final PulsarProperties properties; diff --git a/module/spring-boot-pulsar/src/test/java/org/springframework/boot/pulsar/autoconfigure/PulsarAutoConfigurationTests.java b/module/spring-boot-pulsar/src/test/java/org/springframework/boot/pulsar/autoconfigure/PulsarAutoConfigurationTests.java index 50a5b8d1e17..19b6d75903f 100644 --- a/module/spring-boot-pulsar/src/test/java/org/springframework/boot/pulsar/autoconfigure/PulsarAutoConfigurationTests.java +++ b/module/spring-boot-pulsar/src/test/java/org/springframework/boot/pulsar/autoconfigure/PulsarAutoConfigurationTests.java @@ -16,24 +16,36 @@ package org.springframework.boot.pulsar.autoconfigure; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; +import java.util.function.Consumer; import com.github.benmanes.caffeine.cache.Caffeine; +import org.apache.pulsar.client.admin.PulsarAdminBuilder; +import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.ReaderBuilder; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.interceptor.ProducerInterceptor; +import org.apache.pulsar.client.impl.AutoClusterFailover; +import org.apache.pulsar.common.schema.KeyValueEncodingType; import org.apache.pulsar.common.schema.SchemaType; +import org.assertj.core.api.InstanceOfAssertFactories; +import org.assertj.core.api.ThrowingConsumer; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.EnabledForJreRange; import org.junit.jupiter.api.condition.JRE; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.ArgumentMatchers; +import org.mockito.InOrder; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.AutoConfigurations; @@ -62,7 +74,10 @@ import org.springframework.pulsar.core.DefaultPulsarReaderFactory; import org.springframework.pulsar.core.DefaultSchemaResolver; import org.springframework.pulsar.core.DefaultTopicResolver; import org.springframework.pulsar.core.ProducerBuilderCustomizer; +import org.springframework.pulsar.core.PulsarAdminBuilderCustomizer; import org.springframework.pulsar.core.PulsarAdministration; +import org.springframework.pulsar.core.PulsarClientBuilderCustomizer; +import org.springframework.pulsar.core.PulsarClientFactory; import org.springframework.pulsar.core.PulsarConsumerFactory; import org.springframework.pulsar.core.PulsarProducerFactory; import org.springframework.pulsar.core.PulsarReaderFactory; @@ -70,12 +85,16 @@ import org.springframework.pulsar.core.PulsarTemplate; import org.springframework.pulsar.core.PulsarTopicBuilder; import org.springframework.pulsar.core.ReaderBuilderCustomizer; import org.springframework.pulsar.core.SchemaResolver; +import org.springframework.pulsar.core.SchemaResolver.SchemaResolverCustomizer; import org.springframework.pulsar.core.TopicResolver; +import org.springframework.pulsar.function.PulsarFunctionAdministration; import org.springframework.pulsar.listener.PulsarContainerProperties.TransactionSettings; import org.springframework.pulsar.transaction.PulsarAwareTransactionManager; import org.springframework.test.util.ReflectionTestUtils; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; /** @@ -123,8 +142,7 @@ class PulsarAutoConfigurationTests { @Test void autoConfiguresBeans() { - this.contextRunner.run((context) -> assertThat(context).hasSingleBean(PulsarConfiguration.class) - .hasSingleBean(PulsarConnectionDetails.class) + this.contextRunner.run((context) -> assertThat(context).hasSingleBean(PulsarConnectionDetails.class) .hasSingleBean(DefaultPulsarClientFactory.class) .hasSingleBean(PulsarClient.class) .hasSingleBean(PulsarTopicBuilder.class) @@ -149,6 +167,346 @@ class PulsarAutoConfigurationTests { .run((context) -> assertThat(context).doesNotHaveBean(PulsarTopicBuilder.class)); } + @Test + void whenHasUserDefinedConnectionDetailsBeanDoesNotAutoConfigureBean() { + PulsarConnectionDetails customConnectionDetails = mock(PulsarConnectionDetails.class); + this.contextRunner + .withBean("customPulsarConnectionDetails", PulsarConnectionDetails.class, () -> customConnectionDetails) + .run((context) -> assertThat(context).getBean(PulsarConnectionDetails.class) + .isSameAs(customConnectionDetails)); + } + + @Test + void whenHasUserDefinedContainerFactoryCustomizersBeanDoesNotAutoConfigureBean() { + PulsarContainerFactoryCustomizers customizers = mock(PulsarContainerFactoryCustomizers.class); + this.contextRunner + .withBean("customContainerFactoryCustomizers", PulsarContainerFactoryCustomizers.class, () -> customizers) + .run((context) -> assertThat(context).getBean(PulsarContainerFactoryCustomizers.class) + .isSameAs(customizers)); + } + + @Nested + class ClientTests { + + private final ApplicationContextRunner contextRunner = PulsarAutoConfigurationTests.this.contextRunner; + + @Test + void whenHasUserDefinedClientFactoryBeanDoesNotAutoConfigureBean() { + PulsarClientFactory customFactory = mock(PulsarClientFactory.class); + given(customFactory.createClient()).willReturn(mock(PulsarClient.class)); + new ApplicationContextRunner().withConfiguration(AutoConfigurations.of(PulsarAutoConfiguration.class)) + .withBean("customPulsarClientFactory", PulsarClientFactory.class, () -> customFactory) + .run((context) -> assertThat(context).getBean(PulsarClientFactory.class).isSameAs(customFactory)); + } + + @Test + void whenHasUserDefinedClientBeanDoesNotAutoConfigureBean() { + PulsarClient customClient = mock(PulsarClient.class); + new ApplicationContextRunner().withConfiguration(AutoConfigurations.of(PulsarAutoConfiguration.class)) + .withBean("customPulsarClient", PulsarClient.class, () -> customClient) + .run((context) -> assertThat(context).getBean(PulsarClient.class).isSameAs(customClient)); + } + + @Test + void whenHasUserDefinedCustomizersAppliesInCorrectOrder() { + PulsarConnectionDetails connectionDetails = mock(PulsarConnectionDetails.class); + given(connectionDetails.getBrokerUrl()).willReturn("connectiondetails"); + this.contextRunner.withUserConfiguration(ClientTests.PulsarClientBuilderCustomizersConfig.class) + .withBean(PulsarConnectionDetails.class, () -> connectionDetails) + .withPropertyValues("spring.pulsar.client.service-url=properties") + .run((context) -> { + DefaultPulsarClientFactory clientFactory = context.getBean(DefaultPulsarClientFactory.class); + Customizers customizers = Customizers + .of(ClientBuilder.class, PulsarClientBuilderCustomizer::customize); + assertThat(customizers.fromField(clientFactory, "customizer")).callsInOrder( + ClientBuilder::serviceUrl, "connectiondetails", "fromCustomizer1", "fromCustomizer2"); + }); + } + + @Test + void whenHasUserDefinedFailoverPropertiesAddsToClient() { + PulsarConnectionDetails connectionDetails = mock(PulsarConnectionDetails.class); + given(connectionDetails.getBrokerUrl()).willReturn("connectiondetails"); + this.contextRunner.withBean(PulsarConnectionDetails.class, () -> connectionDetails) + .withPropertyValues("spring.pulsar.client.service-url=properties", + "spring.pulsar.client.failover.backup-clusters[0].service-url=backup-cluster-1", + "spring.pulsar.client.failover.delay=15s", + "spring.pulsar.client.failover.switch-back-delay=30s", + "spring.pulsar.client.failover.check-interval=5s", + "spring.pulsar.client.failover.backup-clusters[1].service-url=backup-cluster-2", + "spring.pulsar.client.failover.backup-clusters[1].authentication.plugin-class-name=" + + MockAuthentication.class.getName(), + "spring.pulsar.client.failover.backup-clusters[1].authentication.param.token=1234") + .run((context) -> { + DefaultPulsarClientFactory clientFactory = context.getBean(DefaultPulsarClientFactory.class); + PulsarProperties pulsarProperties = context.getBean(PulsarProperties.class); + ClientBuilder target = mock(ClientBuilder.class); + BiConsumer customizeAction = PulsarClientBuilderCustomizer::customize; + PulsarClientBuilderCustomizer pulsarClientBuilderCustomizer = (PulsarClientBuilderCustomizer) ReflectionTestUtils + .getField(clientFactory, "customizer"); + customizeAction.accept(pulsarClientBuilderCustomizer, target); + InOrder ordered = inOrder(target); + ordered.verify(target).serviceUrlProvider(ArgumentMatchers.any(AutoClusterFailover.class)); + assertThat(pulsarProperties.getClient().getFailover().getDelay()).isEqualTo(Duration.ofSeconds(15)); + assertThat(pulsarProperties.getClient().getFailover().getSwitchBackDelay()) + .isEqualTo(Duration.ofSeconds(30)); + assertThat(pulsarProperties.getClient().getFailover().getCheckInterval()) + .isEqualTo(Duration.ofSeconds(5)); + assertThat(pulsarProperties.getClient().getFailover().getBackupClusters().size()).isEqualTo(2); + }); + } + + @TestConfiguration(proxyBeanMethods = false) + static class PulsarClientBuilderCustomizersConfig { + + @Bean + @Order(200) + PulsarClientBuilderCustomizer customizerFoo() { + return (builder) -> builder.serviceUrl("fromCustomizer2"); + } + + @Bean + @Order(100) + PulsarClientBuilderCustomizer customizerBar() { + return (builder) -> builder.serviceUrl("fromCustomizer1"); + } + + } + + } + + @Nested + class AdministrationTests { + + private final ApplicationContextRunner contextRunner = PulsarAutoConfigurationTests.this.contextRunner; + + @Test + void whenHasUserDefinedBeanDoesNotAutoConfigureBean() { + PulsarAdministration pulsarAdministration = mock(PulsarAdministration.class); + this.contextRunner + .withBean("customPulsarAdministration", PulsarAdministration.class, () -> pulsarAdministration) + .run((context) -> assertThat(context).getBean(PulsarAdministration.class) + .isSameAs(pulsarAdministration)); + } + + @Test + void whenHasUserDefinedCustomizersAppliesInCorrectOrder() { + PulsarConnectionDetails connectionDetails = mock(PulsarConnectionDetails.class); + given(connectionDetails.getAdminUrl()).willReturn("connectiondetails"); + this.contextRunner.withUserConfiguration(AdministrationTests.PulsarAdminBuilderCustomizersConfig.class) + .withBean(PulsarConnectionDetails.class, () -> connectionDetails) + .withPropertyValues("spring.pulsar.admin.service-url=property") + .run((context) -> { + PulsarAdministration pulsarAdmin = context.getBean(PulsarAdministration.class); + Customizers customizers = Customizers + .of(PulsarAdminBuilder.class, PulsarAdminBuilderCustomizer::customize); + assertThat(customizers.fromField(pulsarAdmin, "adminCustomizers")).callsInOrder( + PulsarAdminBuilder::serviceHttpUrl, "connectiondetails", "fromCustomizer1", + "fromCustomizer2"); + }); + } + + @TestConfiguration(proxyBeanMethods = false) + static class PulsarAdminBuilderCustomizersConfig { + + @Bean + @Order(200) + PulsarAdminBuilderCustomizer customizerFoo() { + return (builder) -> builder.serviceHttpUrl("fromCustomizer2"); + } + + @Bean + @Order(100) + PulsarAdminBuilderCustomizer customizerBar() { + return (builder) -> builder.serviceHttpUrl("fromCustomizer1"); + } + + } + + } + + @Nested + class SchemaResolverTests { + + private final ApplicationContextRunner contextRunner = PulsarAutoConfigurationTests.this.contextRunner; + + @Test + void whenHasUserDefinedBeanDoesNotAutoConfigureBean() { + SchemaResolver schemaResolver = mock(SchemaResolver.class); + this.contextRunner.withBean("customSchemaResolver", SchemaResolver.class, () -> schemaResolver) + .run((context) -> assertThat(context).getBean(SchemaResolver.class).isSameAs(schemaResolver)); + } + + @Test + void whenHasUserDefinedSchemaResolverCustomizer() { + SchemaResolverCustomizer customizer = (schemaResolver) -> schemaResolver + .addCustomSchemaMapping(TestRecord.class, Schema.STRING); + this.contextRunner.withBean("schemaResolverCustomizer", SchemaResolverCustomizer.class, () -> customizer) + .run((context) -> assertThat(context).getBean(DefaultSchemaResolver.class) + .satisfies(customSchemaMappingOf(TestRecord.class, Schema.STRING))); + } + + @Test + void whenHasDefaultsTypeMappingForPrimitiveAddsToSchemaResolver() { + List properties = new ArrayList<>(); + properties.add("spring.pulsar.defaults.type-mappings[0].message-type=" + TestRecord.CLASS_NAME); + properties.add("spring.pulsar.defaults.type-mappings[0].schema-info.schema-type=STRING"); + this.contextRunner.withPropertyValues(properties.toArray(String[]::new)) + .run((context) -> assertThat(context).getBean(DefaultSchemaResolver.class) + .satisfies(customSchemaMappingOf(TestRecord.class, Schema.STRING))); + } + + @Test + void whenHasDefaultsTypeMappingForStructAddsToSchemaResolver() { + List properties = new ArrayList<>(); + properties.add("spring.pulsar.defaults.type-mappings[0].message-type=" + TestRecord.CLASS_NAME); + properties.add("spring.pulsar.defaults.type-mappings[0].schema-info.schema-type=JSON"); + Schema expectedSchema = Schema.JSON(TestRecord.class); + this.contextRunner.withPropertyValues(properties.toArray(String[]::new)) + .run((context) -> assertThat(context).getBean(DefaultSchemaResolver.class) + .satisfies(customSchemaMappingOf(TestRecord.class, expectedSchema))); + } + + @Test + void whenHasDefaultsTypeMappingForKeyValueAddsToSchemaResolver() { + List properties = new ArrayList<>(); + properties.add("spring.pulsar.defaults.type-mappings[0].message-type=" + TestRecord.CLASS_NAME); + properties.add("spring.pulsar.defaults.type-mappings[0].schema-info.schema-type=key-value"); + properties.add("spring.pulsar.defaults.type-mappings[0].schema-info.message-key-type=java.lang.String"); + Schema expectedSchema = Schema.KeyValue(Schema.STRING, Schema.JSON(TestRecord.class), + KeyValueEncodingType.INLINE); + this.contextRunner.withPropertyValues(properties.toArray(String[]::new)) + .run((context) -> assertThat(context).getBean(DefaultSchemaResolver.class) + .satisfies(customSchemaMappingOf(TestRecord.class, expectedSchema))); + } + + private ThrowingConsumer customSchemaMappingOf(Class messageType, + Schema expectedSchema) { + return (resolver) -> assertThat(resolver.getCustomSchemaMapping(messageType)) + .hasValueSatisfying(schemaEqualTo(expectedSchema)); + } + + private Consumer> schemaEqualTo(Schema expected) { + return (actual) -> assertThat(actual.getSchemaInfo()).isEqualTo(expected.getSchemaInfo()); + } + + } + + @Nested + class TopicResolverTests { + + private final ApplicationContextRunner contextRunner = PulsarAutoConfigurationTests.this.contextRunner; + + @Test + void whenHasUserDefinedBeanDoesNotAutoConfigureBean() { + TopicResolver topicResolver = mock(TopicResolver.class); + this.contextRunner.withBean("customTopicResolver", TopicResolver.class, () -> topicResolver) + .run((context) -> assertThat(context).getBean(TopicResolver.class).isSameAs(topicResolver)); + } + + @Test + void whenHasDefaultsTypeMappingAddsToSchemaResolver() { + List properties = new ArrayList<>(); + properties.add("spring.pulsar.defaults.type-mappings[0].message-type=" + TestRecord.CLASS_NAME); + properties.add("spring.pulsar.defaults.type-mappings[0].topic-name=foo-topic"); + properties.add("spring.pulsar.defaults.type-mappings[1].message-type=java.lang.String"); + properties.add("spring.pulsar.defaults.type-mappings[1].topic-name=string-topic"); + this.contextRunner.withPropertyValues(properties.toArray(String[]::new)) + .run((context) -> assertThat(context).getBean(TopicResolver.class) + .asInstanceOf(InstanceOfAssertFactories.type(DefaultTopicResolver.class)) + .satisfies((resolver) -> { + assertThat(resolver.getCustomTopicMapping(TestRecord.class)).hasValue("foo-topic"); + assertThat(resolver.getCustomTopicMapping(String.class)).hasValue("string-topic"); + })); + } + + } + + @Nested + class TopicBuilderTests { + + private final ApplicationContextRunner contextRunner = PulsarAutoConfigurationTests.this.contextRunner; + + @Test + void whenHasUserDefinedBeanDoesNotAutoConfigureBean() { + PulsarTopicBuilder topicBuilder = mock(PulsarTopicBuilder.class); + this.contextRunner.withBean("customPulsarTopicBuilder", PulsarTopicBuilder.class, () -> topicBuilder) + .run((context) -> assertThat(context).getBean(PulsarTopicBuilder.class).isSameAs(topicBuilder)); + } + + @Test + void whenHasDefaultsTopicDisabledPropertyDoesNotCreateBean() { + this.contextRunner.withPropertyValues("spring.pulsar.defaults.topic.enabled=false") + .run((context) -> assertThat(context).doesNotHaveBean(PulsarTopicBuilder.class)); + } + + @Test + void whenHasDefaultsTenantAndNamespaceAppliedToTopicBuilder() { + List properties = new ArrayList<>(); + properties.add("spring.pulsar.defaults.topic.tenant=my-tenant"); + properties.add("spring.pulsar.defaults.topic.namespace=my-namespace"); + this.contextRunner.withPropertyValues(properties.toArray(String[]::new)) + .run((context) -> assertThat(context).getBean(PulsarTopicBuilder.class) + .asInstanceOf(InstanceOfAssertFactories.type(PulsarTopicBuilder.class)) + .satisfies((topicBuilder) -> { + assertThat(topicBuilder).hasFieldOrPropertyWithValue("defaultTenant", "my-tenant"); + assertThat(topicBuilder).hasFieldOrPropertyWithValue("defaultNamespace", "my-namespace"); + })); + } + + @Test + void beanHasScopePrototype() { + this.contextRunner.run((context) -> assertThat(context.getBean(PulsarTopicBuilder.class)) + .isNotSameAs(context.getBean(PulsarTopicBuilder.class))); + } + + } + + @Nested + class FunctionAdministrationTests { + + private final ApplicationContextRunner contextRunner = PulsarAutoConfigurationTests.this.contextRunner; + + @Test + void whenNoPropertiesAddsFunctionAdministrationBean() { + this.contextRunner.run((context) -> assertThat(context).getBean(PulsarFunctionAdministration.class) + .hasFieldOrPropertyWithValue("failFast", Boolean.TRUE) + .hasFieldOrPropertyWithValue("propagateFailures", Boolean.TRUE) + .hasFieldOrPropertyWithValue("propagateStopFailures", Boolean.FALSE) + .hasNoNullFieldsOrProperties() // ensures object providers set + .extracting("pulsarAdministration") + .isSameAs(context.getBean(PulsarAdministration.class))); + } + + @Test + void whenHasFunctionPropertiesAppliesPropertiesToBean() { + List properties = new ArrayList<>(); + properties.add("spring.pulsar.function.fail-fast=false"); + properties.add("spring.pulsar.function.propagate-failures=false"); + properties.add("spring.pulsar.function.propagate-stop-failures=true"); + this.contextRunner.withPropertyValues(properties.toArray(String[]::new)) + .run((context) -> assertThat(context).getBean(PulsarFunctionAdministration.class) + .hasFieldOrPropertyWithValue("failFast", Boolean.FALSE) + .hasFieldOrPropertyWithValue("propagateFailures", Boolean.FALSE) + .hasFieldOrPropertyWithValue("propagateStopFailures", Boolean.TRUE)); + } + + @Test + void whenHasFunctionDisabledPropertyDoesNotCreateBean() { + this.contextRunner.withPropertyValues("spring.pulsar.function.enabled=false") + .run((context) -> assertThat(context).doesNotHaveBean(PulsarFunctionAdministration.class)); + } + + @Test + void whenHasCustomFunctionAdministrationBean() { + PulsarFunctionAdministration functionAdministration = mock(PulsarFunctionAdministration.class); + this.contextRunner.withBean(PulsarFunctionAdministration.class, () -> functionAdministration) + .run((context) -> assertThat(context).getBean(PulsarFunctionAdministration.class) + .isSameAs(functionAdministration)); + } + + } + @Nested class ProducerFactoryTests { @@ -770,4 +1128,10 @@ class PulsarAutoConfigurationTests { } + record TestRecord() { + + private static final String CLASS_NAME = TestRecord.class.getName(); + + } + } diff --git a/module/spring-boot-pulsar/src/test/java/org/springframework/boot/pulsar/autoconfigure/PulsarConfigurationTests.java b/module/spring-boot-pulsar/src/test/java/org/springframework/boot/pulsar/autoconfigure/PulsarConfigurationTests.java deleted file mode 100644 index 1f37f474b14..00000000000 --- a/module/spring-boot-pulsar/src/test/java/org/springframework/boot/pulsar/autoconfigure/PulsarConfigurationTests.java +++ /dev/null @@ -1,421 +0,0 @@ -/* - * Copyright 2012-present the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.boot.pulsar.autoconfigure; - -import java.time.Duration; -import java.util.ArrayList; -import java.util.List; -import java.util.function.BiConsumer; -import java.util.function.Consumer; - -import org.apache.pulsar.client.admin.PulsarAdminBuilder; -import org.apache.pulsar.client.api.ClientBuilder; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.impl.AutoClusterFailover; -import org.apache.pulsar.common.schema.KeyValueEncodingType; -import org.assertj.core.api.InstanceOfAssertFactories; -import org.assertj.core.api.ThrowingConsumer; -import org.junit.jupiter.api.Nested; -import org.junit.jupiter.api.Test; -import org.mockito.ArgumentMatchers; -import org.mockito.InOrder; - -import org.springframework.boot.autoconfigure.AutoConfigurations; -import org.springframework.boot.test.context.TestConfiguration; -import org.springframework.boot.test.context.runner.ApplicationContextRunner; -import org.springframework.context.annotation.Bean; -import org.springframework.core.annotation.Order; -import org.springframework.pulsar.core.DefaultPulsarClientFactory; -import org.springframework.pulsar.core.DefaultSchemaResolver; -import org.springframework.pulsar.core.DefaultTopicResolver; -import org.springframework.pulsar.core.PulsarAdminBuilderCustomizer; -import org.springframework.pulsar.core.PulsarAdministration; -import org.springframework.pulsar.core.PulsarClientBuilderCustomizer; -import org.springframework.pulsar.core.PulsarClientFactory; -import org.springframework.pulsar.core.PulsarTopicBuilder; -import org.springframework.pulsar.core.SchemaResolver; -import org.springframework.pulsar.core.SchemaResolver.SchemaResolverCustomizer; -import org.springframework.pulsar.core.TopicResolver; -import org.springframework.pulsar.function.PulsarFunctionAdministration; -import org.springframework.test.util.ReflectionTestUtils; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.BDDMockito.given; -import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.mock; - -/** - * Tests for {@link PulsarConfiguration}. - * - * @author Chris Bono - * @author Alexander Preuß - * @author Soby Chacko - * @author Phillip Webb - * @author Swamy Mavuri - */ -class PulsarConfigurationTests { - - private final ApplicationContextRunner contextRunner = new ApplicationContextRunner() - .withConfiguration(AutoConfigurations.of(PulsarConfiguration.class)) - .withBean(PulsarClient.class, () -> mock(PulsarClient.class)); - - @Test - void whenHasUserDefinedConnectionDetailsBeanDoesNotAutoConfigureBean() { - PulsarConnectionDetails customConnectionDetails = mock(PulsarConnectionDetails.class); - this.contextRunner - .withBean("customPulsarConnectionDetails", PulsarConnectionDetails.class, () -> customConnectionDetails) - .run((context) -> assertThat(context).getBean(PulsarConnectionDetails.class) - .isSameAs(customConnectionDetails)); - } - - @Test - void whenHasUserDefinedContainerFactoryCustomizersBeanDoesNotAutoConfigureBean() { - PulsarContainerFactoryCustomizers customizers = mock(PulsarContainerFactoryCustomizers.class); - this.contextRunner - .withBean("customContainerFactoryCustomizers", PulsarContainerFactoryCustomizers.class, () -> customizers) - .run((context) -> assertThat(context).getBean(PulsarContainerFactoryCustomizers.class) - .isSameAs(customizers)); - } - - @Nested - class ClientTests { - - @Test - void whenHasUserDefinedClientFactoryBeanDoesNotAutoConfigureBean() { - PulsarClientFactory customFactory = mock(PulsarClientFactory.class); - new ApplicationContextRunner().withConfiguration(AutoConfigurations.of(PulsarConfiguration.class)) - .withBean("customPulsarClientFactory", PulsarClientFactory.class, () -> customFactory) - .run((context) -> assertThat(context).getBean(PulsarClientFactory.class).isSameAs(customFactory)); - } - - @Test - void whenHasUserDefinedClientBeanDoesNotAutoConfigureBean() { - PulsarClient customClient = mock(PulsarClient.class); - new ApplicationContextRunner().withConfiguration(AutoConfigurations.of(PulsarConfiguration.class)) - .withBean("customPulsarClient", PulsarClient.class, () -> customClient) - .run((context) -> assertThat(context).getBean(PulsarClient.class).isSameAs(customClient)); - } - - @Test - void whenHasUserDefinedCustomizersAppliesInCorrectOrder() { - PulsarConnectionDetails connectionDetails = mock(PulsarConnectionDetails.class); - given(connectionDetails.getBrokerUrl()).willReturn("connectiondetails"); - PulsarConfigurationTests.this.contextRunner - .withUserConfiguration(PulsarClientBuilderCustomizersConfig.class) - .withBean(PulsarConnectionDetails.class, () -> connectionDetails) - .withPropertyValues("spring.pulsar.client.service-url=properties") - .run((context) -> { - DefaultPulsarClientFactory clientFactory = context.getBean(DefaultPulsarClientFactory.class); - Customizers customizers = Customizers - .of(ClientBuilder.class, PulsarClientBuilderCustomizer::customize); - assertThat(customizers.fromField(clientFactory, "customizer")).callsInOrder( - ClientBuilder::serviceUrl, "connectiondetails", "fromCustomizer1", "fromCustomizer2"); - }); - } - - @Test - void whenHasUserDefinedFailoverPropertiesAddsToClient() { - PulsarConnectionDetails connectionDetails = mock(PulsarConnectionDetails.class); - given(connectionDetails.getBrokerUrl()).willReturn("connectiondetails"); - PulsarConfigurationTests.this.contextRunner.withBean(PulsarConnectionDetails.class, () -> connectionDetails) - .withPropertyValues("spring.pulsar.client.service-url=properties", - "spring.pulsar.client.failover.backup-clusters[0].service-url=backup-cluster-1", - "spring.pulsar.client.failover.delay=15s", - "spring.pulsar.client.failover.switch-back-delay=30s", - "spring.pulsar.client.failover.check-interval=5s", - "spring.pulsar.client.failover.backup-clusters[1].service-url=backup-cluster-2", - "spring.pulsar.client.failover.backup-clusters[1].authentication.plugin-class-name=" - + MockAuthentication.class.getName(), - "spring.pulsar.client.failover.backup-clusters[1].authentication.param.token=1234") - .run((context) -> { - DefaultPulsarClientFactory clientFactory = context.getBean(DefaultPulsarClientFactory.class); - PulsarProperties pulsarProperties = context.getBean(PulsarProperties.class); - ClientBuilder target = mock(ClientBuilder.class); - BiConsumer customizeAction = PulsarClientBuilderCustomizer::customize; - PulsarClientBuilderCustomizer pulsarClientBuilderCustomizer = (PulsarClientBuilderCustomizer) ReflectionTestUtils - .getField(clientFactory, "customizer"); - customizeAction.accept(pulsarClientBuilderCustomizer, target); - InOrder ordered = inOrder(target); - ordered.verify(target).serviceUrlProvider(ArgumentMatchers.any(AutoClusterFailover.class)); - assertThat(pulsarProperties.getClient().getFailover().getDelay()).isEqualTo(Duration.ofSeconds(15)); - assertThat(pulsarProperties.getClient().getFailover().getSwitchBackDelay()) - .isEqualTo(Duration.ofSeconds(30)); - assertThat(pulsarProperties.getClient().getFailover().getCheckInterval()) - .isEqualTo(Duration.ofSeconds(5)); - assertThat(pulsarProperties.getClient().getFailover().getBackupClusters().size()).isEqualTo(2); - }); - } - - @TestConfiguration(proxyBeanMethods = false) - static class PulsarClientBuilderCustomizersConfig { - - @Bean - @Order(200) - PulsarClientBuilderCustomizer customizerFoo() { - return (builder) -> builder.serviceUrl("fromCustomizer2"); - } - - @Bean - @Order(100) - PulsarClientBuilderCustomizer customizerBar() { - return (builder) -> builder.serviceUrl("fromCustomizer1"); - } - - } - - } - - @Nested - class AdministrationTests { - - private final ApplicationContextRunner contextRunner = PulsarConfigurationTests.this.contextRunner; - - @Test - void whenHasUserDefinedBeanDoesNotAutoConfigureBean() { - PulsarAdministration pulsarAdministration = mock(PulsarAdministration.class); - this.contextRunner - .withBean("customPulsarAdministration", PulsarAdministration.class, () -> pulsarAdministration) - .run((context) -> assertThat(context).getBean(PulsarAdministration.class) - .isSameAs(pulsarAdministration)); - } - - @Test - void whenHasUserDefinedCustomizersAppliesInCorrectOrder() { - PulsarConnectionDetails connectionDetails = mock(PulsarConnectionDetails.class); - given(connectionDetails.getAdminUrl()).willReturn("connectiondetails"); - this.contextRunner.withUserConfiguration(PulsarAdminBuilderCustomizersConfig.class) - .withBean(PulsarConnectionDetails.class, () -> connectionDetails) - .withPropertyValues("spring.pulsar.admin.service-url=property") - .run((context) -> { - PulsarAdministration pulsarAdmin = context.getBean(PulsarAdministration.class); - Customizers customizers = Customizers - .of(PulsarAdminBuilder.class, PulsarAdminBuilderCustomizer::customize); - assertThat(customizers.fromField(pulsarAdmin, "adminCustomizers")).callsInOrder( - PulsarAdminBuilder::serviceHttpUrl, "connectiondetails", "fromCustomizer1", - "fromCustomizer2"); - }); - } - - @TestConfiguration(proxyBeanMethods = false) - static class PulsarAdminBuilderCustomizersConfig { - - @Bean - @Order(200) - PulsarAdminBuilderCustomizer customizerFoo() { - return (builder) -> builder.serviceHttpUrl("fromCustomizer2"); - } - - @Bean - @Order(100) - PulsarAdminBuilderCustomizer customizerBar() { - return (builder) -> builder.serviceHttpUrl("fromCustomizer1"); - } - - } - - } - - @Nested - class SchemaResolverTests { - - private final ApplicationContextRunner contextRunner = PulsarConfigurationTests.this.contextRunner; - - @Test - void whenHasUserDefinedBeanDoesNotAutoConfigureBean() { - SchemaResolver schemaResolver = mock(SchemaResolver.class); - this.contextRunner.withBean("customSchemaResolver", SchemaResolver.class, () -> schemaResolver) - .run((context) -> assertThat(context).getBean(SchemaResolver.class).isSameAs(schemaResolver)); - } - - @Test - void whenHasUserDefinedSchemaResolverCustomizer() { - SchemaResolverCustomizer customizer = (schemaResolver) -> schemaResolver - .addCustomSchemaMapping(TestRecord.class, Schema.STRING); - this.contextRunner.withBean("schemaResolverCustomizer", SchemaResolverCustomizer.class, () -> customizer) - .run((context) -> assertThat(context).getBean(DefaultSchemaResolver.class) - .satisfies(customSchemaMappingOf(TestRecord.class, Schema.STRING))); - } - - @Test - void whenHasDefaultsTypeMappingForPrimitiveAddsToSchemaResolver() { - List properties = new ArrayList<>(); - properties.add("spring.pulsar.defaults.type-mappings[0].message-type=" + TestRecord.CLASS_NAME); - properties.add("spring.pulsar.defaults.type-mappings[0].schema-info.schema-type=STRING"); - this.contextRunner.withPropertyValues(properties.toArray(String[]::new)) - .run((context) -> assertThat(context).getBean(DefaultSchemaResolver.class) - .satisfies(customSchemaMappingOf(TestRecord.class, Schema.STRING))); - } - - @Test - void whenHasDefaultsTypeMappingForStructAddsToSchemaResolver() { - List properties = new ArrayList<>(); - properties.add("spring.pulsar.defaults.type-mappings[0].message-type=" + TestRecord.CLASS_NAME); - properties.add("spring.pulsar.defaults.type-mappings[0].schema-info.schema-type=JSON"); - Schema expectedSchema = Schema.JSON(TestRecord.class); - this.contextRunner.withPropertyValues(properties.toArray(String[]::new)) - .run((context) -> assertThat(context).getBean(DefaultSchemaResolver.class) - .satisfies(customSchemaMappingOf(TestRecord.class, expectedSchema))); - } - - @Test - void whenHasDefaultsTypeMappingForKeyValueAddsToSchemaResolver() { - List properties = new ArrayList<>(); - properties.add("spring.pulsar.defaults.type-mappings[0].message-type=" + TestRecord.CLASS_NAME); - properties.add("spring.pulsar.defaults.type-mappings[0].schema-info.schema-type=key-value"); - properties.add("spring.pulsar.defaults.type-mappings[0].schema-info.message-key-type=java.lang.String"); - Schema expectedSchema = Schema.KeyValue(Schema.STRING, Schema.JSON(TestRecord.class), - KeyValueEncodingType.INLINE); - this.contextRunner.withPropertyValues(properties.toArray(String[]::new)) - .run((context) -> assertThat(context).getBean(DefaultSchemaResolver.class) - .satisfies(customSchemaMappingOf(TestRecord.class, expectedSchema))); - } - - private ThrowingConsumer customSchemaMappingOf(Class messageType, - Schema expectedSchema) { - return (resolver) -> assertThat(resolver.getCustomSchemaMapping(messageType)) - .hasValueSatisfying(schemaEqualTo(expectedSchema)); - } - - private Consumer> schemaEqualTo(Schema expected) { - return (actual) -> assertThat(actual.getSchemaInfo()).isEqualTo(expected.getSchemaInfo()); - } - - } - - @Nested - class TopicResolverTests { - - private final ApplicationContextRunner contextRunner = PulsarConfigurationTests.this.contextRunner; - - @Test - void whenHasUserDefinedBeanDoesNotAutoConfigureBean() { - TopicResolver topicResolver = mock(TopicResolver.class); - this.contextRunner.withBean("customTopicResolver", TopicResolver.class, () -> topicResolver) - .run((context) -> assertThat(context).getBean(TopicResolver.class).isSameAs(topicResolver)); - } - - @Test - void whenHasDefaultsTypeMappingAddsToSchemaResolver() { - List properties = new ArrayList<>(); - properties.add("spring.pulsar.defaults.type-mappings[0].message-type=" + TestRecord.CLASS_NAME); - properties.add("spring.pulsar.defaults.type-mappings[0].topic-name=foo-topic"); - properties.add("spring.pulsar.defaults.type-mappings[1].message-type=java.lang.String"); - properties.add("spring.pulsar.defaults.type-mappings[1].topic-name=string-topic"); - this.contextRunner.withPropertyValues(properties.toArray(String[]::new)) - .run((context) -> assertThat(context).getBean(TopicResolver.class) - .asInstanceOf(InstanceOfAssertFactories.type(DefaultTopicResolver.class)) - .satisfies((resolver) -> { - assertThat(resolver.getCustomTopicMapping(TestRecord.class)).hasValue("foo-topic"); - assertThat(resolver.getCustomTopicMapping(String.class)).hasValue("string-topic"); - })); - } - - } - - @Nested - class TopicBuilderTests { - - private final ApplicationContextRunner contextRunner = PulsarConfigurationTests.this.contextRunner; - - @Test - void whenHasUserDefinedBeanDoesNotAutoConfigureBean() { - PulsarTopicBuilder topicBuilder = mock(PulsarTopicBuilder.class); - this.contextRunner.withBean("customPulsarTopicBuilder", PulsarTopicBuilder.class, () -> topicBuilder) - .run((context) -> assertThat(context).getBean(PulsarTopicBuilder.class).isSameAs(topicBuilder)); - } - - @Test - void whenHasDefaultsTopicDisabledPropertyDoesNotCreateBean() { - this.contextRunner.withPropertyValues("spring.pulsar.defaults.topic.enabled=false") - .run((context) -> assertThat(context).doesNotHaveBean(PulsarTopicBuilder.class)); - } - - @Test - void whenHasDefaultsTenantAndNamespaceAppliedToTopicBuilder() { - List properties = new ArrayList<>(); - properties.add("spring.pulsar.defaults.topic.tenant=my-tenant"); - properties.add("spring.pulsar.defaults.topic.namespace=my-namespace"); - this.contextRunner.withPropertyValues(properties.toArray(String[]::new)) - .run((context) -> assertThat(context).getBean(PulsarTopicBuilder.class) - .asInstanceOf(InstanceOfAssertFactories.type(PulsarTopicBuilder.class)) - .satisfies((topicBuilder) -> { - assertThat(topicBuilder).hasFieldOrPropertyWithValue("defaultTenant", "my-tenant"); - assertThat(topicBuilder).hasFieldOrPropertyWithValue("defaultNamespace", "my-namespace"); - })); - } - - @Test - void beanHasScopePrototype() { - this.contextRunner.run((context) -> assertThat(context.getBean(PulsarTopicBuilder.class)) - .isNotSameAs(context.getBean(PulsarTopicBuilder.class))); - } - - } - - @Nested - class FunctionAdministrationTests { - - private final ApplicationContextRunner contextRunner = PulsarConfigurationTests.this.contextRunner; - - @Test - void whenNoPropertiesAddsFunctionAdministrationBean() { - this.contextRunner.run((context) -> assertThat(context).getBean(PulsarFunctionAdministration.class) - .hasFieldOrPropertyWithValue("failFast", Boolean.TRUE) - .hasFieldOrPropertyWithValue("propagateFailures", Boolean.TRUE) - .hasFieldOrPropertyWithValue("propagateStopFailures", Boolean.FALSE) - .hasNoNullFieldsOrProperties() // ensures object providers set - .extracting("pulsarAdministration") - .isSameAs(context.getBean(PulsarAdministration.class))); - } - - @Test - void whenHasFunctionPropertiesAppliesPropertiesToBean() { - List properties = new ArrayList<>(); - properties.add("spring.pulsar.function.fail-fast=false"); - properties.add("spring.pulsar.function.propagate-failures=false"); - properties.add("spring.pulsar.function.propagate-stop-failures=true"); - this.contextRunner.withPropertyValues(properties.toArray(String[]::new)) - .run((context) -> assertThat(context).getBean(PulsarFunctionAdministration.class) - .hasFieldOrPropertyWithValue("failFast", Boolean.FALSE) - .hasFieldOrPropertyWithValue("propagateFailures", Boolean.FALSE) - .hasFieldOrPropertyWithValue("propagateStopFailures", Boolean.TRUE)); - } - - @Test - void whenHasFunctionDisabledPropertyDoesNotCreateBean() { - this.contextRunner.withPropertyValues("spring.pulsar.function.enabled=false") - .run((context) -> assertThat(context).doesNotHaveBean(PulsarFunctionAdministration.class)); - } - - @Test - void whenHasCustomFunctionAdministrationBean() { - PulsarFunctionAdministration functionAdministration = mock(PulsarFunctionAdministration.class); - this.contextRunner.withBean(PulsarFunctionAdministration.class, () -> functionAdministration) - .run((context) -> assertThat(context).getBean(PulsarFunctionAdministration.class) - .isSameAs(functionAdministration)); - } - - } - - record TestRecord() { - - private static final String CLASS_NAME = TestRecord.class.getName(); - - } - -} diff --git a/smoke-test/spring-boot-smoke-test-pulsar/src/dockerTest/java/smoketest/pulsar/SamplePulsarApplicationTests.java b/smoke-test/spring-boot-smoke-test-pulsar/src/dockerTest/java/smoketest/pulsar/SamplePulsarApplicationTests.java index adafc8b7f09..3857d78624c 100644 --- a/smoke-test/spring-boot-smoke-test-pulsar/src/dockerTest/java/smoketest/pulsar/SamplePulsarApplicationTests.java +++ b/smoke-test/spring-boot-smoke-test-pulsar/src/dockerTest/java/smoketest/pulsar/SamplePulsarApplicationTests.java @@ -19,10 +19,7 @@ package smoketest.pulsar; import java.time.Duration; import java.util.ArrayList; import java.util.List; -import java.util.stream.IntStream; -import org.awaitility.Awaitility; -import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.testcontainers.junit.jupiter.Container; @@ -34,48 +31,27 @@ import org.springframework.boot.test.system.CapturedOutput; import org.springframework.boot.test.system.OutputCaptureExtension; import org.springframework.boot.testcontainers.service.connection.ServiceConnection; import org.springframework.boot.testsupport.container.TestImage; -import org.springframework.test.context.ActiveProfiles; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.waitAtMost; @Testcontainers(disabledWithoutDocker = true) @ExtendWith(OutputCaptureExtension.class) +@SpringBootTest class SamplePulsarApplicationTests { @Container @ServiceConnection static final PulsarContainer pulsar = TestImage.container(PulsarContainer.class); - abstract class PulsarApplication { - - private final String type; - - PulsarApplication(String type) { - this.type = type; - } - - @Test - void appProducesAndConsumesMessages(CapturedOutput output) { - List expectedOutput = new ArrayList<>(); - IntStream.range(0, 10).forEachOrdered((i) -> { - expectedOutput.add("++++++PRODUCE %s:(%s)------".formatted(this.type, i)); - expectedOutput.add("++++++CONSUME %s:(%s)------".formatted(this.type, i)); - }); - Awaitility.waitAtMost(Duration.ofSeconds(30)) - .untilAsserted(() -> assertThat(output).contains(expectedOutput)); - } - - } - - @Nested - @SpringBootTest - @ActiveProfiles("smoketest-pulsar-imperative") - class ImperativePulsarApplication extends PulsarApplication { - - ImperativePulsarApplication() { - super("IMPERATIVE"); + @Test + void appProducesAndConsumesMessages(CapturedOutput output) { + List expectedOutput = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + expectedOutput.add("++++++PRODUCE:(%s)------".formatted(i)); + expectedOutput.add("++++++CONSUME:(%s)------".formatted(i)); } - + waitAtMost(Duration.ofSeconds(30)).untilAsserted(() -> assertThat(output).contains(expectedOutput)); } } diff --git a/smoke-test/spring-boot-smoke-test-pulsar/src/main/java/smoketest/pulsar/ImperativeAppConfig.java b/smoke-test/spring-boot-smoke-test-pulsar/src/main/java/smoketest/pulsar/SamplePulsarApplicationConfig.java similarity index 83% rename from smoke-test/spring-boot-smoke-test-pulsar/src/main/java/smoketest/pulsar/ImperativeAppConfig.java rename to smoke-test/spring-boot-smoke-test-pulsar/src/main/java/smoketest/pulsar/SamplePulsarApplicationConfig.java index 9a5a71412ca..3c85e09a6c1 100644 --- a/smoke-test/spring-boot-smoke-test-pulsar/src/main/java/smoketest/pulsar/ImperativeAppConfig.java +++ b/smoke-test/spring-boot-smoke-test-pulsar/src/main/java/smoketest/pulsar/SamplePulsarApplicationConfig.java @@ -22,17 +22,15 @@ import org.apache.commons.logging.LogFactory; import org.springframework.boot.ApplicationRunner; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Profile; import org.springframework.pulsar.annotation.PulsarListener; import org.springframework.pulsar.core.PulsarTemplate; import org.springframework.pulsar.core.PulsarTopic; import org.springframework.pulsar.core.PulsarTopicBuilder; @Configuration(proxyBeanMethods = false) -@Profile("smoketest-pulsar-imperative") -class ImperativeAppConfig { +class SamplePulsarApplicationConfig { - private static final Log logger = LogFactory.getLog(ImperativeAppConfig.class); + private static final Log logger = LogFactory.getLog(SamplePulsarApplicationConfig.class); private static final String TOPIC = "pulsar-smoke-test-topic"; @@ -46,14 +44,14 @@ class ImperativeAppConfig { return (args) -> { for (int i = 0; i < 10; i++) { template.send(TOPIC, new SampleMessage(i, "message:" + i)); - logger.info("++++++PRODUCE IMPERATIVE:(" + i + ")------"); + logger.info("++++++PRODUCE:(" + i + ")------"); } }; } @PulsarListener(topics = TOPIC) void consumeMessagesFromPulsarTopic(SampleMessage msg) { - logger.info("++++++CONSUME IMPERATIVE:(" + msg.id() + ")------"); + logger.info("++++++CONSUME:(" + msg.id() + ")------"); } }