|
|
|
|
@ -53,6 +53,7 @@ import org.springframework.pulsar.function.PulsarFunction;
@@ -53,6 +53,7 @@ import org.springframework.pulsar.function.PulsarFunction;
|
|
|
|
|
import org.springframework.pulsar.function.PulsarFunctionAdministration; |
|
|
|
|
import org.springframework.pulsar.function.PulsarSink; |
|
|
|
|
import org.springframework.pulsar.function.PulsarSource; |
|
|
|
|
import org.springframework.util.Assert; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Common configuration used by both {@link PulsarAutoConfiguration} and |
|
|
|
|
@ -146,10 +147,11 @@ class PulsarConfiguration {
@@ -146,10 +147,11 @@ class PulsarConfiguration {
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@SuppressWarnings("NullAway") // Resolved.orElseThrow() doesn't return nullable
|
|
|
|
|
private Schema<Object> getSchema(DefaultSchemaResolver schemaResolver, SchemaType schemaType, Class<?> messageType, |
|
|
|
|
@Nullable Class<?> messageKeyType) { |
|
|
|
|
return schemaResolver.resolveSchema(schemaType, messageType, messageKeyType).orElseThrow(); |
|
|
|
|
Schema<Object> schema = schemaResolver.resolveSchema(schemaType, messageType, messageKeyType).orElseThrow(); |
|
|
|
|
Assert.state(schema != null, "'schema' must not be null"); |
|
|
|
|
return schema; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@SuppressWarnings("unchecked") |
|
|
|
|
|