|
|
|
|
@ -1,5 +1,5 @@
@@ -1,5 +1,5 @@
|
|
|
|
|
/* |
|
|
|
|
* Copyright 2012-2024 the original author or authors. |
|
|
|
|
* Copyright 2012-2025 the original author or authors. |
|
|
|
|
* |
|
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
|
|
|
* you may not use this file except in compliance with the License. |
|
|
|
|
@ -19,7 +19,6 @@ package org.springframework.boot.autoconfigure.pulsar;
@@ -19,7 +19,6 @@ package org.springframework.boot.autoconfigure.pulsar;
|
|
|
|
|
import java.time.Duration; |
|
|
|
|
import java.util.ArrayList; |
|
|
|
|
import java.util.List; |
|
|
|
|
import java.util.Map; |
|
|
|
|
import java.util.function.BiConsumer; |
|
|
|
|
import java.util.function.Consumer; |
|
|
|
|
|
|
|
|
|
@ -30,8 +29,7 @@ import org.apache.pulsar.client.api.Schema;
@@ -30,8 +29,7 @@ import org.apache.pulsar.client.api.Schema;
|
|
|
|
|
import org.apache.pulsar.client.impl.AutoClusterFailover; |
|
|
|
|
import org.apache.pulsar.common.schema.KeyValueEncodingType; |
|
|
|
|
import org.assertj.core.api.InstanceOfAssertFactories; |
|
|
|
|
import org.assertj.core.api.InstanceOfAssertFactory; |
|
|
|
|
import org.assertj.core.api.MapAssert; |
|
|
|
|
import org.assertj.core.api.ThrowingConsumer; |
|
|
|
|
import org.junit.jupiter.api.Nested; |
|
|
|
|
import org.junit.jupiter.api.Test; |
|
|
|
|
import org.mockito.ArgumentMatchers; |
|
|
|
|
@ -57,7 +55,6 @@ import org.springframework.pulsar.function.PulsarFunctionAdministration;
@@ -57,7 +55,6 @@ import org.springframework.pulsar.function.PulsarFunctionAdministration;
|
|
|
|
|
import org.springframework.test.util.ReflectionTestUtils; |
|
|
|
|
|
|
|
|
|
import static org.assertj.core.api.Assertions.assertThat; |
|
|
|
|
import static org.assertj.core.api.Assertions.entry; |
|
|
|
|
import static org.mockito.BDDMockito.given; |
|
|
|
|
import static org.mockito.Mockito.inOrder; |
|
|
|
|
import static org.mockito.Mockito.mock; |
|
|
|
|
@ -235,10 +232,6 @@ class PulsarConfigurationTests {
@@ -235,10 +232,6 @@ class PulsarConfigurationTests {
|
|
|
|
|
@Nested |
|
|
|
|
class SchemaResolverTests { |
|
|
|
|
|
|
|
|
|
@SuppressWarnings("rawtypes") |
|
|
|
|
private static final InstanceOfAssertFactory<Map, MapAssert<Class, Schema>> CLASS_SCHEMA_MAP = InstanceOfAssertFactories |
|
|
|
|
.map(Class.class, Schema.class); |
|
|
|
|
|
|
|
|
|
private final ApplicationContextRunner contextRunner = PulsarConfigurationTests.this.contextRunner; |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
@ -254,8 +247,7 @@ class PulsarConfigurationTests {
@@ -254,8 +247,7 @@ class PulsarConfigurationTests {
|
|
|
|
|
.addCustomSchemaMapping(TestRecord.class, Schema.STRING); |
|
|
|
|
this.contextRunner.withBean("schemaResolverCustomizer", SchemaResolverCustomizer.class, () -> customizer) |
|
|
|
|
.run((context) -> assertThat(context).getBean(DefaultSchemaResolver.class) |
|
|
|
|
.extracting(DefaultSchemaResolver::getCustomSchemaMappings, InstanceOfAssertFactories.MAP) |
|
|
|
|
.containsEntry(TestRecord.class, Schema.STRING)); |
|
|
|
|
.satisfies(customSchemaMappingOf(TestRecord.class, Schema.STRING))); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
@ -265,8 +257,7 @@ class PulsarConfigurationTests {
@@ -265,8 +257,7 @@ class PulsarConfigurationTests {
|
|
|
|
|
properties.add("spring.pulsar.defaults.type-mappings[0].schema-info.schema-type=STRING"); |
|
|
|
|
this.contextRunner.withPropertyValues(properties.toArray(String[]::new)) |
|
|
|
|
.run((context) -> assertThat(context).getBean(DefaultSchemaResolver.class) |
|
|
|
|
.extracting(DefaultSchemaResolver::getCustomSchemaMappings, InstanceOfAssertFactories.MAP) |
|
|
|
|
.containsOnly(entry(TestRecord.class, Schema.STRING))); |
|
|
|
|
.satisfies(customSchemaMappingOf(TestRecord.class, Schema.STRING))); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
@ -277,8 +268,7 @@ class PulsarConfigurationTests {
@@ -277,8 +268,7 @@ class PulsarConfigurationTests {
|
|
|
|
|
Schema<?> expectedSchema = Schema.JSON(TestRecord.class); |
|
|
|
|
this.contextRunner.withPropertyValues(properties.toArray(String[]::new)) |
|
|
|
|
.run((context) -> assertThat(context).getBean(DefaultSchemaResolver.class) |
|
|
|
|
.extracting(DefaultSchemaResolver::getCustomSchemaMappings, CLASS_SCHEMA_MAP) |
|
|
|
|
.hasEntrySatisfying(TestRecord.class, schemaEqualTo(expectedSchema))); |
|
|
|
|
.satisfies(customSchemaMappingOf(TestRecord.class, expectedSchema))); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
@ -291,12 +281,16 @@ class PulsarConfigurationTests {
@@ -291,12 +281,16 @@ class PulsarConfigurationTests {
|
|
|
|
|
KeyValueEncodingType.INLINE); |
|
|
|
|
this.contextRunner.withPropertyValues(properties.toArray(String[]::new)) |
|
|
|
|
.run((context) -> assertThat(context).getBean(DefaultSchemaResolver.class) |
|
|
|
|
.extracting(DefaultSchemaResolver::getCustomSchemaMappings, CLASS_SCHEMA_MAP) |
|
|
|
|
.hasEntrySatisfying(TestRecord.class, schemaEqualTo(expectedSchema))); |
|
|
|
|
.satisfies(customSchemaMappingOf(TestRecord.class, expectedSchema))); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private ThrowingConsumer<DefaultSchemaResolver> customSchemaMappingOf(Class<?> messageType, |
|
|
|
|
Schema<?> expectedSchema) { |
|
|
|
|
return (resolver) -> assertThat(resolver.getCustomSchemaMapping(messageType)) |
|
|
|
|
.hasValueSatisfying(schemaEqualTo(expectedSchema)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@SuppressWarnings("rawtypes") |
|
|
|
|
private Consumer<Schema> schemaEqualTo(Schema<?> expected) { |
|
|
|
|
private Consumer<Schema<?>> schemaEqualTo(Schema<?> expected) { |
|
|
|
|
return (actual) -> assertThat(actual.getSchemaInfo()).isEqualTo(expected.getSchemaInfo()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -324,8 +318,10 @@ class PulsarConfigurationTests {
@@ -324,8 +318,10 @@ class PulsarConfigurationTests {
|
|
|
|
|
this.contextRunner.withPropertyValues(properties.toArray(String[]::new)) |
|
|
|
|
.run((context) -> assertThat(context).getBean(TopicResolver.class) |
|
|
|
|
.asInstanceOf(InstanceOfAssertFactories.type(DefaultTopicResolver.class)) |
|
|
|
|
.extracting(DefaultTopicResolver::getCustomTopicMappings, InstanceOfAssertFactories.MAP) |
|
|
|
|
.containsOnly(entry(TestRecord.class, "foo-topic"), entry(String.class, "string-topic"))); |
|
|
|
|
.satisfies((resolver) -> { |
|
|
|
|
assertThat(resolver.getCustomTopicMapping(TestRecord.class)).hasValue("foo-topic"); |
|
|
|
|
assertThat(resolver.getCustomTopicMapping(String.class)).hasValue("string-topic"); |
|
|
|
|
})); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|