diff --git a/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar4/build.gradle b/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar4/build.gradle new file mode 100644 index 00000000000..83a22c02c3b --- /dev/null +++ b/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar4/build.gradle @@ -0,0 +1,27 @@ +plugins { + id "java" + id "org.springframework.boot.docker-test" +} + +description = "Spring Boot Pulsar 4 smoke test" + +configurations.all { + resolutionStrategy.eachDependency { + if (it.requested.group == 'org.apache.pulsar' && + !(it.requested.name.startsWith('pulsar-client-reactive'))) { + it.useVersion '4.0.1' + } + } +} + +dependencies { + dockerTestImplementation(project(":spring-boot-project:spring-boot-starters:spring-boot-starter-test")) + dockerTestImplementation(project(":spring-boot-project:spring-boot-tools:spring-boot-test-support-docker")) + dockerTestImplementation(project(":spring-boot-project:spring-boot-testcontainers")) + dockerTestImplementation("org.awaitility:awaitility") + dockerTestImplementation("org.testcontainers:junit-jupiter") + dockerTestImplementation("org.testcontainers:pulsar") + + implementation(project(":spring-boot-project:spring-boot-starters:spring-boot-starter-pulsar")) + implementation(project(":spring-boot-project:spring-boot-starters:spring-boot-starter-pulsar-reactive")) +} diff --git a/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar4/src/dockerTest/java/smoketest/pulsar/SamplePulsarApplicationTests.java b/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar4/src/dockerTest/java/smoketest/pulsar/SamplePulsarApplicationTests.java new file mode 100644 index 00000000000..5de0fad211c --- /dev/null +++ b/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar4/src/dockerTest/java/smoketest/pulsar/SamplePulsarApplicationTests.java @@ -0,0 +1,92 @@ +/* + * Copyright 2012-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package smoketest.pulsar; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.IntStream; + +import org.awaitility.Awaitility; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.testcontainers.containers.PulsarContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.system.CapturedOutput; +import org.springframework.boot.test.system.OutputCaptureExtension; +import org.springframework.boot.testcontainers.service.connection.ServiceConnection; +import org.springframework.boot.testsupport.container.TestImage; +import org.springframework.test.context.ActiveProfiles; + +import static org.assertj.core.api.Assertions.assertThat; + +@Testcontainers(disabledWithoutDocker = true) +@ExtendWith(OutputCaptureExtension.class) +class SamplePulsarApplicationTests { + + @Container + @ServiceConnection + static final PulsarContainer pulsar = TestImage.container(PulsarContainer.class); + + abstract class PulsarApplication { + + private final String type; + + PulsarApplication(String type) { + this.type = type; + } + + @Test + void appProducesAndConsumesMessages(CapturedOutput output) { + List expectedOutput = new ArrayList<>(); + IntStream.range(0, 10).forEachOrdered((i) -> { + expectedOutput.add("++++++PRODUCE %s:(%s)------".formatted(this.type, i)); + expectedOutput.add("++++++CONSUME %s:(%s)------".formatted(this.type, i)); + }); + Awaitility.waitAtMost(Duration.ofSeconds(30)) + .untilAsserted(() -> assertThat(output).contains(expectedOutput)); + } + + } + + @Nested + @SpringBootTest + @ActiveProfiles("smoketest.pulsar.imperative") + class ImperativePulsarApplication extends PulsarApplication { + + ImperativePulsarApplication() { + super("IMPERATIVE"); + } + + } + + @Nested + @SpringBootTest + @ActiveProfiles("smoketest.pulsar.reactive") + class ReactivePulsarApplication extends PulsarApplication { + + ReactivePulsarApplication() { + super("REACTIVE"); + } + + } + +} diff --git a/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar4/src/main/java/smoketest/pulsar/ImperativeAppConfig.java b/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar4/src/main/java/smoketest/pulsar/ImperativeAppConfig.java new file mode 100644 index 00000000000..7964a3ad44c --- /dev/null +++ b/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar4/src/main/java/smoketest/pulsar/ImperativeAppConfig.java @@ -0,0 +1,59 @@ +/* + * Copyright 2012-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package smoketest.pulsar; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.springframework.boot.ApplicationRunner; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Profile; +import org.springframework.pulsar.annotation.PulsarListener; +import org.springframework.pulsar.core.PulsarTemplate; +import org.springframework.pulsar.core.PulsarTopic; +import org.springframework.pulsar.core.PulsarTopicBuilder; + +@Configuration(proxyBeanMethods = false) +@Profile("smoketest.pulsar.imperative") +class ImperativeAppConfig { + + private static final Log logger = LogFactory.getLog(ImperativeAppConfig.class); + + private static final String TOPIC = "pulsar-smoke-test-topic"; + + @Bean + PulsarTopic pulsarTestTopic() { + return new PulsarTopicBuilder().name(TOPIC).numberOfPartitions(1).build(); + } + + @Bean + ApplicationRunner sendMessagesToPulsarTopic(PulsarTemplate template) { + return (args) -> { + for (int i = 0; i < 10; i++) { + template.send(TOPIC, new SampleMessage(i, "message:" + i)); + logger.info("++++++PRODUCE IMPERATIVE:(" + i + ")------"); + } + }; + } + + @PulsarListener(topics = TOPIC) + void consumeMessagesFromPulsarTopic(SampleMessage msg) { + logger.info("++++++CONSUME IMPERATIVE:(" + msg.id() + ")------"); + } + +} diff --git a/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar4/src/main/java/smoketest/pulsar/ReactiveAppConfig.java b/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar4/src/main/java/smoketest/pulsar/ReactiveAppConfig.java new file mode 100644 index 00000000000..506aa97bec5 --- /dev/null +++ b/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar4/src/main/java/smoketest/pulsar/ReactiveAppConfig.java @@ -0,0 +1,64 @@ +/* + * Copyright 2012-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package smoketest.pulsar; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.pulsar.reactive.client.api.MessageSpec; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import org.springframework.boot.ApplicationRunner; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Profile; +import org.springframework.pulsar.core.PulsarTopic; +import org.springframework.pulsar.core.PulsarTopicBuilder; +import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener; +import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate; + +@Configuration(proxyBeanMethods = false) +@Profile("smoketest.pulsar.reactive") +class ReactiveAppConfig { + + private static final Log logger = LogFactory.getLog(ReactiveAppConfig.class); + + private static final String TOPIC = "pulsar-reactive-smoke-test-topic"; + + @Bean + PulsarTopic pulsarTestTopic() { + return new PulsarTopicBuilder().name(TOPIC).numberOfPartitions(1).build(); + } + + @Bean + ApplicationRunner sendMessagesToPulsarTopic(ReactivePulsarTemplate template) { + return (args) -> Flux.range(0, 10) + .map((i) -> new SampleMessage(i, "message:" + i)) + .map(MessageSpec::of) + .as((msgs) -> template.send(TOPIC, msgs)) + .doOnNext((sendResult) -> logger + .info("++++++PRODUCE REACTIVE:(" + sendResult.getMessageSpec().getValue().id() + ")------")) + .subscribe(); + } + + @ReactivePulsarListener(topics = TOPIC) + Mono consumeMessagesFromPulsarTopic(SampleMessage msg) { + logger.info("++++++CONSUME REACTIVE:(" + msg.id() + ")------"); + return Mono.empty(); + } + +} diff --git a/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar4/src/main/java/smoketest/pulsar/SampleMessage.java b/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar4/src/main/java/smoketest/pulsar/SampleMessage.java new file mode 100644 index 00000000000..3887ce61f13 --- /dev/null +++ b/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar4/src/main/java/smoketest/pulsar/SampleMessage.java @@ -0,0 +1,20 @@ +/* + * Copyright 2012-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package smoketest.pulsar; + +record SampleMessage(Integer id, String content) { +} diff --git a/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar4/src/main/java/smoketest/pulsar/SamplePulsarApplication.java b/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar4/src/main/java/smoketest/pulsar/SamplePulsarApplication.java new file mode 100644 index 00000000000..560967bb2d0 --- /dev/null +++ b/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar4/src/main/java/smoketest/pulsar/SamplePulsarApplication.java @@ -0,0 +1,29 @@ +/* + * Copyright 2012-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package smoketest.pulsar; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class SamplePulsarApplication { + + public static void main(String[] args) { + SpringApplication.run(SamplePulsarApplication.class, args); + } + +} diff --git a/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar4/src/main/resources/application.properties b/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar4/src/main/resources/application.properties new file mode 100644 index 00000000000..b1ae3ec6f4e --- /dev/null +++ b/spring-boot-tests/spring-boot-smoke-tests/spring-boot-smoke-test-pulsar4/src/main/resources/application.properties @@ -0,0 +1 @@ +spring.pulsar.consumer.subscription.initial-position=earliest