|
|
|
|
@ -19,6 +19,7 @@ package org.springframework.boot.autoconfigure.pulsar;
@@ -19,6 +19,7 @@ package org.springframework.boot.autoconfigure.pulsar;
|
|
|
|
|
import java.time.Duration; |
|
|
|
|
import java.util.ArrayList; |
|
|
|
|
import java.util.Map; |
|
|
|
|
import java.util.TreeMap; |
|
|
|
|
import java.util.concurrent.TimeUnit; |
|
|
|
|
import java.util.function.BiConsumer; |
|
|
|
|
import java.util.function.Consumer; |
|
|
|
|
@ -29,6 +30,7 @@ import org.apache.pulsar.client.api.ConsumerBuilder;
@@ -29,6 +30,7 @@ import org.apache.pulsar.client.api.ConsumerBuilder;
|
|
|
|
|
import org.apache.pulsar.client.api.ProducerBuilder; |
|
|
|
|
import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; |
|
|
|
|
import org.apache.pulsar.client.api.ReaderBuilder; |
|
|
|
|
import org.apache.pulsar.common.util.ObjectMapperFactory; |
|
|
|
|
|
|
|
|
|
import org.springframework.boot.context.properties.PropertyMapper; |
|
|
|
|
import org.springframework.pulsar.listener.PulsarContainerProperties; |
|
|
|
|
@ -73,7 +75,8 @@ final class PulsarPropertiesMapper {
@@ -73,7 +75,8 @@ final class PulsarPropertiesMapper {
|
|
|
|
|
PulsarProperties.Authentication properties) { |
|
|
|
|
if (StringUtils.hasText(properties.getPluginClassName())) { |
|
|
|
|
try { |
|
|
|
|
authentication.accept(properties.getPluginClassName(), properties.getParam()); |
|
|
|
|
authentication.accept(properties.getPluginClassName(), |
|
|
|
|
getAuthenticationParamsJson(properties.getParam())); |
|
|
|
|
} |
|
|
|
|
catch (UnsupportedAuthenticationException ex) { |
|
|
|
|
throw new IllegalStateException("Unable to configure Pulsar authentication", ex); |
|
|
|
|
@ -81,6 +84,16 @@ final class PulsarPropertiesMapper {
@@ -81,6 +84,16 @@ final class PulsarPropertiesMapper {
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private String getAuthenticationParamsJson(Map<String, String> params) { |
|
|
|
|
Map<String, String> sortedParams = new TreeMap<>(params); |
|
|
|
|
try { |
|
|
|
|
return ObjectMapperFactory.create().writeValueAsString(sortedParams); |
|
|
|
|
} |
|
|
|
|
catch (Exception ex) { |
|
|
|
|
throw new IllegalStateException("Could not convert auth parameters to encoded string", ex); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
<T> void customizeProducerBuilder(ProducerBuilder<T> producerBuilder) { |
|
|
|
|
PulsarProperties.Producer properties = this.properties.getProducer(); |
|
|
|
|
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); |
|
|
|
|
@ -158,8 +171,7 @@ final class PulsarPropertiesMapper {
@@ -158,8 +171,7 @@ final class PulsarPropertiesMapper {
|
|
|
|
|
|
|
|
|
|
private interface AuthenticationConsumer { |
|
|
|
|
|
|
|
|
|
void accept(String authPluginClassName, Map<String, String> authParams) |
|
|
|
|
throws UnsupportedAuthenticationException; |
|
|
|
|
void accept(String authPluginClassName, String authParamString) throws UnsupportedAuthenticationException; |
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|