@ -36,8 +36,8 @@ import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMo
@@ -36,8 +36,8 @@ import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMo
/ * *
* Configuration properties for Spring for Apache Kafka .
* < p / >
* Users should refer to k afka documentation for complete descriptions of these
* < p >
* Users should refer to K afka documentation for complete descriptions of these
* properties .
*
* @author Gary Russell
@ -63,8 +63,8 @@ public class KafkaProperties {
@@ -63,8 +63,8 @@ public class KafkaProperties {
* Comma - delimited list of host : port pairs to use for establishing the initial
* connection to the Kafka cluster .
* /
private List < String > bootstrapServers = new ArrayList < String > ( Collections . singletonList (
"localhost:9092" ) ) ;
private List < String > bootstrapServers = new ArrayList < String > (
Collections . singletonList ( "localhost:9092" ) ) ;
/ * *
* Id to pass to the server when making requests ; used for server - side logging .
@ -110,7 +110,8 @@ public class KafkaProperties {
@@ -110,7 +110,8 @@ public class KafkaProperties {
private Map < String , Object > buildCommonProperties ( ) {
Map < String , Object > properties = new HashMap < String , Object > ( ) ;
if ( this . bootstrapServers ! = null ) {
properties . put ( CommonClientConfigs . BOOTSTRAP_SERVERS_CONFIG , this . bootstrapServers ) ;
properties . put ( CommonClientConfigs . BOOTSTRAP_SERVERS_CONFIG ,
this . bootstrapServers ) ;
}
if ( this . clientId ! = null ) {
properties . put ( CommonClientConfigs . CLIENT_ID_CONFIG , this . clientId ) ;
@ -139,10 +140,11 @@ public class KafkaProperties {
@@ -139,10 +140,11 @@ public class KafkaProperties {
/ * *
* Create an initial map of consumer properties from the state of this instance .
* < p > This allows you to add additional properties , if necessary , and override the
* < p >
* This allows you to add additional properties , if necessary , and override the
* default kafkaConsumerFactory bean .
* @return the consumer properties initialized with the customizations defined on
* this instance
* @return the consumer properties initialized with the customizations defined on this
* instance
* /
public Map < String , Object > buildConsumerProperties ( ) {
Map < String , Object > props = buildCommonProperties ( ) ;
@ -152,10 +154,11 @@ public class KafkaProperties {
@@ -152,10 +154,11 @@ public class KafkaProperties {
/ * *
* Create an initial map of producer properties from the state of this instance .
* < p > This allows you to add additional properties , if necessary , and override the
* < p >
* This allows you to add additional properties , if necessary , and override the
* default kafkaProducerFactory bean .
* @return the producer properties initialized with the customizations defined on
* this instance
* @return the producer properties initialized with the customizations defined on this
* instance
* /
public Map < String , Object > buildProducerProperties ( ) {
Map < String , Object > props = buildCommonProperties ( ) ;
@ -168,8 +171,9 @@ public class KafkaProperties {
@@ -168,8 +171,9 @@ public class KafkaProperties {
return resource . getFile ( ) . getAbsolutePath ( ) ;
}
catch ( IOException ex ) {
throw new IllegalStateException ( String . format (
"Resource '%s' must be on a file system" , resource ) , ex ) ;
throw new IllegalStateException (
String . format ( "Resource '%s' must be on a file system" , resource ) ,
ex ) ;
}
}
@ -178,8 +182,8 @@ public class KafkaProperties {
@@ -178,8 +182,8 @@ public class KafkaProperties {
private final Ssl ssl = new Ssl ( ) ;
/ * *
* Frequency in milliseconds that the consumer offsets are auto - committed to
* Kafka if ' enable . auto . commit ' true .
* Frequency in milliseconds that the consumer offsets are auto - committed to Kafka
* if ' enable . auto . commit ' true .
* /
private Long autoCommitInterval ;
@ -332,22 +336,27 @@ public class KafkaProperties {
@@ -332,22 +336,27 @@ public class KafkaProperties {
public Map < String , Object > buildProperties ( ) {
Map < String , Object > properties = new HashMap < String , Object > ( ) ;
if ( this . autoCommitInterval ! = null ) {
properties . put ( ConsumerConfig . AUTO_COMMIT_INTERVAL_MS_CONFIG , this . autoCommitInterval ) ;
properties . put ( ConsumerConfig . AUTO_COMMIT_INTERVAL_MS_CONFIG ,
this . autoCommitInterval ) ;
}
if ( this . autoOffsetReset ! = null ) {
properties . put ( ConsumerConfig . AUTO_OFFSET_RESET_CONFIG , this . autoOffsetReset ) ;
properties . put ( ConsumerConfig . AUTO_OFFSET_RESET_CONFIG ,
this . autoOffsetReset ) ;
}
if ( this . bootstrapServers ! = null ) {
properties . put ( ConsumerConfig . BOOTSTRAP_SERVERS_CONFIG , this . bootstrapServers ) ;
properties . put ( ConsumerConfig . BOOTSTRAP_SERVERS_CONFIG ,
this . bootstrapServers ) ;
}
if ( this . clientId ! = null ) {
properties . put ( ConsumerConfig . CLIENT_ID_CONFIG , this . clientId ) ;
}
if ( this . enableAutoCommit ! = null ) {
properties . put ( ConsumerConfig . ENABLE_AUTO_COMMIT_CONFIG , this . enableAutoCommit ) ;
properties . put ( ConsumerConfig . ENABLE_AUTO_COMMIT_CONFIG ,
this . enableAutoCommit ) ;
}
if ( this . fetchMaxWait ! = null ) {
properties . put ( ConsumerConfig . FETCH_MAX_WAIT_MS_CONFIG , this . fetchMaxWait ) ;
properties . put ( ConsumerConfig . FETCH_MAX_WAIT_MS_CONFIG ,
this . fetchMaxWait ) ;
}
if ( this . fetchMinSize ! = null ) {
properties . put ( ConsumerConfig . FETCH_MIN_BYTES_CONFIG , this . fetchMinSize ) ;
@ -356,29 +365,36 @@ public class KafkaProperties {
@@ -356,29 +365,36 @@ public class KafkaProperties {
properties . put ( ConsumerConfig . GROUP_ID_CONFIG , this . groupId ) ;
}
if ( this . heartbeatInterval ! = null ) {
properties . put ( ConsumerConfig . HEARTBEAT_INTERVAL_MS_CONFIG , this . heartbeatInterval ) ;
properties . put ( ConsumerConfig . HEARTBEAT_INTERVAL_MS_CONFIG ,
this . heartbeatInterval ) ;
}
if ( this . keyDeserializer ! = null ) {
properties . put ( ConsumerConfig . KEY_DESERIALIZER_CLASS_CONFIG , this . keyDeserializer ) ;
properties . put ( ConsumerConfig . KEY_DESERIALIZER_CLASS_CONFIG ,
this . keyDeserializer ) ;
}
if ( this . ssl . getKeyPassword ( ) ! = null ) {
properties . put ( SslConfigs . SSL_KEY_PASSWORD_CONFIG , this . ssl . getKeyPassword ( ) ) ;
properties . put ( SslConfigs . SSL_KEY_PASSWORD_CONFIG ,
this . ssl . getKeyPassword ( ) ) ;
}
if ( this . ssl . getKeystoreLocation ( ) ! = null ) {
properties . put ( SslConfigs . SSL_KEYSTORE_LOCATION_CONFIG , resourceToPath ( this . ssl . getKeystoreLocation ( ) ) ) ;
properties . put ( SslConfigs . SSL_KEYSTORE_LOCATION_CONFIG ,
resourceToPath ( this . ssl . getKeystoreLocation ( ) ) ) ;
}
if ( this . ssl . getKeystorePassword ( ) ! = null ) {
properties . put ( SslConfigs . SSL_KEYSTORE_PASSWORD_CONFIG , this . ssl . getKeystorePassword ( ) ) ;
properties . put ( SslConfigs . SSL_KEYSTORE_PASSWORD_CONFIG ,
this . ssl . getKeystorePassword ( ) ) ;
}
if ( this . ssl . getTruststoreLocation ( ) ! = null ) {
properties . put ( SslConfigs . SSL_TRUSTSTORE_LOCATION_CONFIG ,
resourceToPath ( this . ssl . getTruststoreLocation ( ) ) ) ;
}
if ( this . ssl . getTruststorePassword ( ) ! = null ) {
properties . put ( SslConfigs . SSL_TRUSTSTORE_PASSWORD_CONFIG , this . ssl . getTruststorePassword ( ) ) ;
properties . put ( SslConfigs . SSL_TRUSTSTORE_PASSWORD_CONFIG ,
this . ssl . getTruststorePassword ( ) ) ;
}
if ( this . valueDeserializer ! = null ) {
properties . put ( ConsumerConfig . VALUE_DESERIALIZER_CLASS_CONFIG , this . valueDeserializer ) ;
properties . put ( ConsumerConfig . VALUE_DESERIALIZER_CLASS_CONFIG ,
this . valueDeserializer ) ;
}
return properties ;
}
@ -407,8 +423,8 @@ public class KafkaProperties {
@@ -407,8 +423,8 @@ public class KafkaProperties {
private List < String > bootstrapServers ;
/ * *
* Total bytes of memory the producer can use to buffer records waiting to be
* sent to the server .
* Total bytes of memory the producer can use to buffer records waiting to be sent
* to the server .
* /
private Long bufferMemory ;
@ -522,7 +538,8 @@ public class KafkaProperties {
@@ -522,7 +538,8 @@ public class KafkaProperties {
properties . put ( ProducerConfig . BATCH_SIZE_CONFIG , this . batchSize ) ;
}
if ( this . bootstrapServers ! = null ) {
properties . put ( ProducerConfig . BOOTSTRAP_SERVERS_CONFIG , this . bootstrapServers ) ;
properties . put ( ProducerConfig . BOOTSTRAP_SERVERS_CONFIG ,
this . bootstrapServers ) ;
}
if ( this . bufferMemory ! = null ) {
properties . put ( ProducerConfig . BUFFER_MEMORY_CONFIG , this . bufferMemory ) ;
@ -531,32 +548,39 @@ public class KafkaProperties {
@@ -531,32 +548,39 @@ public class KafkaProperties {
properties . put ( ProducerConfig . CLIENT_ID_CONFIG , this . clientId ) ;
}
if ( this . compressionType ! = null ) {
properties . put ( ProducerConfig . COMPRESSION_TYPE_CONFIG , this . compressionType ) ;
properties . put ( ProducerConfig . COMPRESSION_TYPE_CONFIG ,
this . compressionType ) ;
}
if ( this . keySerializer ! = null ) {
properties . put ( ProducerConfig . KEY_SERIALIZER_CLASS_CONFIG , this . keySerializer ) ;
properties . put ( ProducerConfig . KEY_SERIALIZER_CLASS_CONFIG ,
this . keySerializer ) ;
}
if ( this . retries ! = null ) {
properties . put ( ProducerConfig . RETRIES_CONFIG , this . retries ) ;
}
if ( this . ssl . getKeyPassword ( ) ! = null ) {
properties . put ( SslConfigs . SSL_KEY_PASSWORD_CONFIG , this . ssl . getKeyPassword ( ) ) ;
properties . put ( SslConfigs . SSL_KEY_PASSWORD_CONFIG ,
this . ssl . getKeyPassword ( ) ) ;
}
if ( this . ssl . getKeystoreLocation ( ) ! = null ) {
properties . put ( SslConfigs . SSL_KEYSTORE_LOCATION_CONFIG , resourceToPath ( this . ssl . getKeystoreLocation ( ) ) ) ;
properties . put ( SslConfigs . SSL_KEYSTORE_LOCATION_CONFIG ,
resourceToPath ( this . ssl . getKeystoreLocation ( ) ) ) ;
}
if ( this . ssl . getKeystorePassword ( ) ! = null ) {
properties . put ( SslConfigs . SSL_KEYSTORE_PASSWORD_CONFIG , this . ssl . getKeystorePassword ( ) ) ;
properties . put ( SslConfigs . SSL_KEYSTORE_PASSWORD_CONFIG ,
this . ssl . getKeystorePassword ( ) ) ;
}
if ( this . ssl . getTruststoreLocation ( ) ! = null ) {
properties . put ( SslConfigs . SSL_TRUSTSTORE_LOCATION_CONFIG ,
resourceToPath ( this . ssl . getTruststoreLocation ( ) ) ) ;
}
if ( this . ssl . getTruststorePassword ( ) ! = null ) {
properties . put ( SslConfigs . SSL_TRUSTSTORE_PASSWORD_CONFIG , this . ssl . getTruststorePassword ( ) ) ;
properties . put ( SslConfigs . SSL_TRUSTSTORE_PASSWORD_CONFIG ,
this . ssl . getTruststorePassword ( ) ) ;
}
if ( this . valueSerializer ! = null ) {
properties . put ( ProducerConfig . VALUE_SERIALIZER_CLASS_CONFIG , this . valueSerializer ) ;
properties . put ( ProducerConfig . VALUE_SERIALIZER_CLASS_CONFIG ,
this . valueSerializer ) ;
}
return properties ;
}