You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
165 lines
7.4 KiB
165 lines
7.4 KiB
[[messaging.kafka]] |
|
== Apache Kafka Support |
|
https://kafka.apache.org/[Apache Kafka] is supported by providing auto-configuration of the `spring-kafka` project. |
|
|
|
Kafka configuration is controlled by external configuration properties in `spring.kafka.*`. |
|
For example, you might declare the following section in `application.properties`: |
|
|
|
[source,yaml,indent=0,subs="verbatim",configprops,configblocks] |
|
---- |
|
spring: |
|
kafka: |
|
bootstrap-servers: "localhost:9092" |
|
consumer: |
|
group-id: "myGroup" |
|
---- |
|
|
|
TIP: To create a topic on startup, add a bean of type `NewTopic`. |
|
If the topic already exists, the bean is ignored. |
|
|
|
See {spring-boot-autoconfigure-module-code}/kafka/KafkaProperties.java[`KafkaProperties`] for more supported options. |
|
|
|
|
|
|
|
[[messaging.kafka.sending]] |
|
=== Sending a Message |
|
Spring's `KafkaTemplate` is auto-configured, and you can autowire it directly in your own beans, as shown in the following example: |
|
|
|
include::code:MyBean[] |
|
|
|
NOTE: If the property configprop:spring.kafka.producer.transaction-id-prefix[] is defined, a `KafkaTransactionManager` is automatically configured. |
|
Also, if a `RecordMessageConverter` bean is defined, it is automatically associated to the auto-configured `KafkaTemplate`. |
|
|
|
|
|
|
|
[[messaging.kafka.receiving]] |
|
=== Receiving a Message |
|
When the Apache Kafka infrastructure is present, any bean can be annotated with `@KafkaListener` to create a listener endpoint. |
|
If no `KafkaListenerContainerFactory` has been defined, a default one is automatically configured with keys defined in `spring.kafka.listener.*`. |
|
|
|
The following component creates a listener endpoint on the `someTopic` topic: |
|
|
|
include::code:MyBean[] |
|
|
|
If a `KafkaTransactionManager` bean is defined, it is automatically associated to the container factory. |
|
Similarly, if a `RecordFilterStrategy`, `CommonErrorHandler`, `AfterRollbackProcessor` or `ConsumerAwareRebalanceListener` bean is defined, it is automatically associated to the default factory. |
|
|
|
Depending on the listener type, a `RecordMessageConverter` or `BatchMessageConverter` bean is associated to the default factory. |
|
If only a `RecordMessageConverter` bean is present for a batch listener, it is wrapped in a `BatchMessageConverter`. |
|
|
|
TIP: A custom `ChainedKafkaTransactionManager` must be marked `@Primary` as it usually references the auto-configured `KafkaTransactionManager` bean. |
|
|
|
|
|
|
|
[[messaging.kafka.streams]] |
|
=== Kafka Streams |
|
Spring for Apache Kafka provides a factory bean to create a `StreamsBuilder` object and manage the lifecycle of its streams. |
|
Spring Boot auto-configures the required `KafkaStreamsConfiguration` bean as long as `kafka-streams` is on the classpath and Kafka Streams is enabled by the `@EnableKafkaStreams` annotation. |
|
|
|
Enabling Kafka Streams means that the application id and bootstrap servers must be set. |
|
The former can be configured using `spring.kafka.streams.application-id`, defaulting to `spring.application.name` if not set. |
|
The latter can be set globally or specifically overridden only for streams. |
|
|
|
Several additional properties are available using dedicated properties; other arbitrary Kafka properties can be set using the `spring.kafka.streams.properties` namespace. |
|
See also <<messaging#messaging.kafka.additional-properties>> for more information. |
|
|
|
To use the factory bean, wire `StreamsBuilder` into your `@Bean` as shown in the following example: |
|
|
|
include::code:MyKafkaStreamsConfiguration[] |
|
|
|
By default, the streams managed by the `StreamBuilder` object are started automatically. |
|
You can customize this behavior using the configprop:spring.kafka.streams.auto-startup[] property. |
|
|
|
|
|
|
|
[[messaging.kafka.additional-properties]] |
|
=== Additional Kafka Properties |
|
The properties supported by auto configuration are shown in the <<application-properties#appendix.application-properties.integration, "`Integration Properties`">> section of the Appendix. |
|
Note that, for the most part, these properties (hyphenated or camelCase) map directly to the Apache Kafka dotted properties. |
|
See the Apache Kafka documentation for details. |
|
|
|
Common properties (that don't include a client type in the name: `producer`, `consumer`, `admin`, or `streams`) apply to all clients. |
|
Most of these common values can be overridden for one or more of the component types, if needed. |
|
|
|
Apache Kafka designates properties with an importance of HIGH, MEDIUM, or LOW. |
|
Spring Boot auto-configuration supports all HIGH importance properties, some selected MEDIUM and LOW properties, and any properties that do not have a default value. |
|
|
|
Only a subset of the properties supported by Kafka are available directly through the `KafkaProperties` class. |
|
If you wish to configure the individual client types with additional properties that are not directly supported, use the following properties: |
|
|
|
[source,yaml,indent=0,subs="verbatim",configprops,configblocks] |
|
---- |
|
spring: |
|
kafka: |
|
properties: |
|
"[prop.one]": "first" |
|
admin: |
|
properties: |
|
"[prop.two]": "second" |
|
consumer: |
|
properties: |
|
"[prop.three]": "third" |
|
producer: |
|
properties: |
|
"[prop.four]": "fourth" |
|
streams: |
|
properties: |
|
"[prop.five]": "fifth" |
|
---- |
|
|
|
This sets the common `prop.one` Kafka property to `first` (applies to producers, consumers, admins and streams), the `prop.two` admin property to `second`, the `prop.three` consumer property to `third`, the `prop.four` producer property to `fourth` and the `prop.five` streams property to `fifth`. |
|
|
|
You can also configure the Spring Kafka `JsonDeserializer` as follows: |
|
|
|
[source,yaml,indent=0,subs="verbatim",configprops,configblocks] |
|
---- |
|
spring: |
|
kafka: |
|
consumer: |
|
value-deserializer: "org.springframework.kafka.support.serializer.JsonDeserializer" |
|
properties: |
|
"[spring.json.value.default.type]": "com.example.Invoice" |
|
"[spring.json.trusted.packages]": "com.example.main,com.example.another" |
|
---- |
|
|
|
Similarly, you can disable the `JsonSerializer` default behavior of sending type information in headers: |
|
|
|
[source,yaml,indent=0,subs="verbatim",configprops,configblocks] |
|
---- |
|
spring: |
|
kafka: |
|
producer: |
|
value-serializer: "org.springframework.kafka.support.serializer.JsonSerializer" |
|
properties: |
|
"[spring.json.add.type.headers]": false |
|
---- |
|
|
|
IMPORTANT: Properties set in this way override any configuration item that Spring Boot explicitly supports. |
|
|
|
|
|
|
|
[[messaging.kafka.embedded]] |
|
=== Testing with Embedded Kafka |
|
Spring for Apache Kafka provides a convenient way to test projects with an embedded Apache Kafka broker. |
|
To use this feature, annotate a test class with `@EmbeddedKafka` from the `spring-kafka-test` module. |
|
For more information, please see the Spring for Apache Kafka {spring-kafka-docs}#embedded-kafka-annotation[reference manual]. |
|
|
|
To make Spring Boot auto-configuration work with the aforementioned embedded Apache Kafka broker, you need to remap a system property for embedded broker addresses (populated by the `EmbeddedKafkaBroker`) into the Spring Boot configuration property for Apache Kafka. |
|
There are several ways to do that: |
|
|
|
* Provide a system property to map embedded broker addresses into configprop:spring.kafka.bootstrap-servers[] in the test class: |
|
|
|
include::code:property/MyTest[tag=*] |
|
|
|
* Configure a property name on the `@EmbeddedKafka` annotation: |
|
|
|
include::code:annotation/MyTest[] |
|
|
|
* Use a placeholder in configuration properties: |
|
|
|
[source,yaml,indent=0,subs="verbatim",configprops,configblocks] |
|
---- |
|
spring: |
|
kafka: |
|
bootstrap-servers: "${spring.embedded.kafka.brokers}" |
|
----
|
|
|