diff --git a/spring-boot-project/spring-boot-actuator-autoconfigure/pom.xml b/spring-boot-project/spring-boot-actuator-autoconfigure/pom.xml
index 6632d5a355b..d0151ebc913 100644
--- a/spring-boot-project/spring-boot-actuator-autoconfigure/pom.xml
+++ b/spring-boot-project/spring-boot-actuator-autoconfigure/pom.xml
@@ -193,11 +193,6 @@
elasticsearch
true
-
- org.springframework.kafka
- spring-kafka
- true
-
org.flywaydb
flyway-core
@@ -311,6 +306,11 @@
spring-integration-core
true
+
+ org.springframework.kafka
+ spring-kafka
+ true
+
org.springframework.security
spring-security-config
diff --git a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorAutoConfiguration.java b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorAutoConfiguration.java
index 8b5ecbf1471..04c468f277e 100644
--- a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorAutoConfiguration.java
+++ b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorAutoConfiguration.java
@@ -28,6 +28,7 @@ import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -41,40 +42,35 @@ import org.springframework.kafka.core.KafkaAdmin;
* @author Juan Rada
*/
@Configuration
+@ConditionalOnClass(KafkaAdmin.class)
+@ConditionalOnBean(KafkaAdmin.class)
@ConditionalOnEnabledHealthIndicator("kafka")
@AutoConfigureBefore(HealthIndicatorAutoConfiguration.class)
@AutoConfigureAfter(KafkaAutoConfiguration.class)
-public class KafkaHealthIndicatorAutoConfiguration {
+@EnableConfigurationProperties(KafkaHealthIndicatorProperties.class)
+public class KafkaHealthIndicatorAutoConfiguration extends
+ CompositeHealthIndicatorConfiguration {
- @Configuration
- @ConditionalOnBean(KafkaAdmin.class)
- @EnableConfigurationProperties(KafkaHealthIndicatorProperties.class)
- static class KafkaClientHealthIndicatorConfiguration extends
- CompositeHealthIndicatorConfiguration {
+ private final Map admins;
- private final Map admins;
+ private final KafkaHealthIndicatorProperties properties;
- private final KafkaHealthIndicatorProperties properties;
-
- KafkaClientHealthIndicatorConfiguration(Map admins,
- KafkaHealthIndicatorProperties properties) {
- this.admins = admins;
- this.properties = properties;
- }
-
- @Bean
- @ConditionalOnMissingBean(name = "kafkaHealthIndicator")
- public HealthIndicator kafkaHealthIndicator() {
- return createHealthIndicator(this.admins);
- }
+ KafkaHealthIndicatorAutoConfiguration(Map admins,
+ KafkaHealthIndicatorProperties properties) {
+ this.admins = admins;
+ this.properties = properties;
+ }
- @Override
- protected KafkaHealthIndicator createHealthIndicator(KafkaAdmin source) {
- Duration responseTimeout = this.properties.getResponseTimeout();
+ @Bean
+ @ConditionalOnMissingBean(name = "kafkaHealthIndicator")
+ public HealthIndicator kafkaHealthIndicator() {
+ return createHealthIndicator(this.admins);
+ }
- return new KafkaHealthIndicator(source,
- responseTimeout == null ? 100L : responseTimeout.toMillis());
- }
+ @Override
+ protected KafkaHealthIndicator createHealthIndicator(KafkaAdmin source) {
+ Duration responseTimeout = this.properties.getResponseTimeout();
+ return new KafkaHealthIndicator(source, responseTimeout.toMillis());
}
}
diff --git a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorProperties.java b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorProperties.java
index 0f619d00adf..fb156af68e1 100644
--- a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorProperties.java
+++ b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorProperties.java
@@ -25,6 +25,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
* Configuration properties for {@link KafkaHealthIndicator}.
*
* @author Juan Rada
+ * @since 2.0.0
*/
@ConfigurationProperties(prefix = "management.health.kafka", ignoreUnknownFields = false)
public class KafkaHealthIndicatorProperties {
@@ -32,7 +33,7 @@ public class KafkaHealthIndicatorProperties {
/**
* Time to wait for a response from the cluster description operation.
*/
- private Duration responseTimeout = Duration.ofMillis(100);
+ private Duration responseTimeout = Duration.ofMillis(1000);
public Duration getResponseTimeout() {
return this.responseTimeout;
diff --git a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/package-info.java b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/package-info.java
index 57e9377fc62..35f3f26c685 100644
--- a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/package-info.java
+++ b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/package-info.java
@@ -15,6 +15,6 @@
*/
/**
- * Auto-configuration for actuator kafka support.
+ * Auto-configuration for actuator Apache Kafka support.
*/
package org.springframework.boot.actuate.autoconfigure.kafka;
diff --git a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json
index 554030d66d9..d60b5eb326c 100644
--- a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json
+++ b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json
@@ -103,6 +103,12 @@
"description": "Whether to enable JMS health check.",
"defaultValue": true
},
+ {
+ "name": "management.health.kafka.enabled",
+ "type": "java.lang.Boolean",
+ "description": "Whether to enable Kafka health check.",
+ "defaultValue": true
+ },
{
"name": "management.health.ldap.enabled",
"type": "java.lang.Boolean",
@@ -145,12 +151,6 @@
"description": "Whether to enable Neo4j health check.",
"defaultValue": true
},
- {
- "name": "management.health.kafka.enabled",
- "type": "java.lang.Boolean",
- "description": "Whether to enable kafka health check.",
- "defaultValue": true
- },
{
"name": "management.info.build.enabled",
"type": "java.lang.Boolean",
diff --git a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/resources/META-INF/spring.factories b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/resources/META-INF/spring.factories
index 9cf46eaf50b..40141644030 100644
--- a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/resources/META-INF/spring.factories
+++ b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/resources/META-INF/spring.factories
@@ -11,7 +11,6 @@ org.springframework.boot.actuate.autoconfigure.context.properties.ConfigurationP
org.springframework.boot.actuate.autoconfigure.context.ShutdownEndpointAutoConfiguration,\
org.springframework.boot.actuate.autoconfigure.couchbase.CouchbaseHealthIndicatorAutoConfiguration,\
org.springframework.boot.actuate.autoconfigure.elasticsearch.ElasticsearchHealthIndicatorAutoConfiguration,\
-org.springframework.boot.actuate.autoconfigure.kafka.KafkaHealthIndicatorAutoConfiguration,\
org.springframework.boot.actuate.autoconfigure.endpoint.EndpointAutoConfiguration,\
org.springframework.boot.actuate.autoconfigure.endpoint.jmx.JmxEndpointAutoConfiguration,\
org.springframework.boot.actuate.autoconfigure.endpoint.web.WebEndpointAutoConfiguration,\
@@ -25,6 +24,7 @@ org.springframework.boot.actuate.autoconfigure.info.InfoEndpointAutoConfiguratio
org.springframework.boot.actuate.autoconfigure.jdbc.DataSourceHealthIndicatorAutoConfiguration,\
org.springframework.boot.actuate.autoconfigure.jms.JmsHealthIndicatorAutoConfiguration,\
org.springframework.boot.actuate.autoconfigure.jolokia.JolokiaEndpointAutoConfiguration,\
+org.springframework.boot.actuate.autoconfigure.kafka.KafkaHealthIndicatorAutoConfiguration,\
org.springframework.boot.actuate.autoconfigure.ldap.LdapHealthIndicatorAutoConfiguration,\
org.springframework.boot.actuate.autoconfigure.liquibase.LiquibaseEndpointAutoConfiguration,\
org.springframework.boot.actuate.autoconfigure.logging.LogFileWebEndpointAutoConfiguration,\
diff --git a/spring-boot-project/spring-boot-actuator/pom.xml b/spring-boot-project/spring-boot-actuator/pom.xml
index 634c62ce16f..1bf30d2522c 100644
--- a/spring-boot-project/spring-boot-actuator/pom.xml
+++ b/spring-boot-project/spring-boot-actuator/pom.xml
@@ -172,11 +172,6 @@
spring-rabbit
true
-
- org.springframework.kafka
- spring-kafka
- true
-
org.springframework.data
spring-data-cassandra
@@ -230,6 +225,11 @@
spring-integration-core
true
+
+ org.springframework.kafka
+ spring-kafka
+ true
+
org.springframework.security
spring-security-core
@@ -263,13 +263,13 @@
test
- org.springframework.kafka
- spring-kafka-test
+ org.springframework.boot
+ spring-boot-autoconfigure
test
- org.springframework.boot
- spring-boot-autoconfigure
+ org.springframework.kafka
+ spring-kafka-test
test
diff --git a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicator.java b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicator.java
index 8b75ff3cfed..8a2c505ffe7 100644
--- a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicator.java
+++ b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicator.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.config.ConfigResource.Type;
import org.springframework.boot.actuate.health.AbstractHealthIndicator;
import org.springframework.boot.actuate.health.Health.Builder;
import org.springframework.boot.actuate.health.HealthIndicator;
+import org.springframework.boot.actuate.health.Status;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.util.Assert;
@@ -43,37 +44,35 @@ public class KafkaHealthIndicator extends AbstractHealthIndicator {
static final String REPLICATION_PROPERTY = "transaction.state.log.replication.factor";
private final KafkaAdmin kafkaAdmin;
+
private final DescribeClusterOptions describeOptions;
/**
* Create a new {@link KafkaHealthIndicator} instance.
*
* @param kafkaAdmin the kafka admin
- * @param responseTimeout the describe cluster request timeout in milliseconds
+ * @param requestTimeout the request timeout in milliseconds
*/
- public KafkaHealthIndicator(KafkaAdmin kafkaAdmin, long responseTimeout) {
+ public KafkaHealthIndicator(KafkaAdmin kafkaAdmin, long requestTimeout) {
Assert.notNull(kafkaAdmin, "KafkaAdmin must not be null");
this.kafkaAdmin = kafkaAdmin;
this.describeOptions = new DescribeClusterOptions()
- .timeoutMs((int) responseTimeout);
+ .timeoutMs((int) requestTimeout);
}
@Override
protected void doHealthCheck(Builder builder) throws Exception {
try (AdminClient adminClient = AdminClient.create(this.kafkaAdmin.getConfig())) {
- DescribeClusterResult result = adminClient.describeCluster(this.describeOptions);
+ DescribeClusterResult result = adminClient.describeCluster(
+ this.describeOptions);
String brokerId = result.controller().get().idString();
int replicationFactor = getReplicationFactor(brokerId, adminClient);
int nodes = result.nodes().get().size();
- if (nodes >= replicationFactor) {
- builder.up();
- }
- else {
- builder.down();
- }
- builder.withDetail("clusterId", result.clusterId().get());
- builder.withDetail("brokerId", brokerId);
- builder.withDetail("nodes", nodes);
+ Status status = nodes >= replicationFactor ? Status.UP : Status.DOWN;
+ builder.status(status)
+ .withDetail("clusterId", result.clusterId().get())
+ .withDetail("brokerId", brokerId)
+ .withDetail("nodes", nodes);
}
}
@@ -85,5 +84,6 @@ public class KafkaHealthIndicator extends AbstractHealthIndicator {
Config brokerConfig = kafkaConfig.get(configResource);
return Integer.parseInt(brokerConfig.get(REPLICATION_PROPERTY).value());
}
+
}
diff --git a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/package-info.java b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/package-info.java
index 1ed6f13e253..51978d8ac2c 100644
--- a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/package-info.java
+++ b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/package-info.java
@@ -15,6 +15,6 @@
*/
/**
- * Actuator support for Kafka.
+ * Actuator support for Apache Kafka.
*/
package org.springframework.boot.actuate.kafka;
diff --git a/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicatorTests.java b/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicatorTests.java
index 15bc0403c0a..8f646a76f6e 100644
--- a/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicatorTests.java
+++ b/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicatorTests.java
@@ -20,78 +20,82 @@ import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
+import org.junit.After;
import org.junit.Test;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.Status;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.test.rule.KafkaEmbedded;
+import org.springframework.util.SocketUtils;
import static org.assertj.core.api.Assertions.assertThat;
/**
- * Test for {@link KafkaHealthIndicator}
+ * Tests for {@link KafkaHealthIndicator}.
*
* @author Juan Rada
+ * @author Stephane Nicoll
*/
public class KafkaHealthIndicatorTests {
- private static final Long RESPONSE_TIME = 1000L;
-
private KafkaEmbedded kafkaEmbedded;
- private KafkaAdmin kafkaAdmin;
- private void startKafka(int replicationFactor) throws Exception {
- this.kafkaEmbedded = new KafkaEmbedded(1, true);
- this.kafkaEmbedded.brokerProperties(Collections.singletonMap(
- KafkaHealthIndicator.REPLICATION_PROPERTY,
- String.valueOf(replicationFactor)));
- this.kafkaEmbedded.before();
- this.kafkaAdmin = new KafkaAdmin(Collections.singletonMap(
- ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
- this.kafkaEmbedded.getBrokersAsString()));
- }
+ private KafkaAdmin kafkaAdmin;
- private void shutdownKafka() throws Exception {
- this.kafkaEmbedded.destroy();
+ @After
+ public void shutdownKafka() throws Exception {
+ if (this.kafkaEmbedded != null) {
+ this.kafkaEmbedded.destroy();
+ }
}
-
@Test
public void kafkaIsUp() throws Exception {
startKafka(1);
KafkaHealthIndicator healthIndicator =
- new KafkaHealthIndicator(this.kafkaAdmin, RESPONSE_TIME);
+ new KafkaHealthIndicator(this.kafkaAdmin, 1000L);
Health health = healthIndicator.health();
assertThat(health.getStatus()).isEqualTo(Status.UP);
assertDetails(health.getDetails());
- shutdownKafka();
}
- private void assertDetails(Map details) {
- assertThat(details).containsEntry("brokerId", "0");
- assertThat(details).containsKey("clusterId");
- assertThat(details).containsEntry("nodes", 1);
+ @Test
+ public void kafkaIsDown() {
+ int freePort = SocketUtils.findAvailableTcpPort();
+ this.kafkaAdmin = new KafkaAdmin(Collections.singletonMap(
+ ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:" + freePort));
+ KafkaHealthIndicator healthIndicator =
+ new KafkaHealthIndicator(this.kafkaAdmin, 1L);
+ Health health = healthIndicator.health();
+ assertThat(health.getStatus()).isEqualTo(Status.DOWN);
+ assertThat((String) health.getDetails().get("error")).isNotEmpty();
}
@Test
public void notEnoughNodesForReplicationFactor() throws Exception {
startKafka(2);
KafkaHealthIndicator healthIndicator =
- new KafkaHealthIndicator(this.kafkaAdmin, RESPONSE_TIME);
+ new KafkaHealthIndicator(this.kafkaAdmin, 1000L);
Health health = healthIndicator.health();
assertThat(health.getStatus()).isEqualTo(Status.DOWN);
assertDetails(health.getDetails());
- shutdownKafka();
}
- @Test
- public void kafkaIsDown() throws Exception {
+ private void assertDetails(Map details) {
+ assertThat(details).containsEntry("brokerId", "0");
+ assertThat(details).containsKey("clusterId");
+ assertThat(details).containsEntry("nodes", 1);
+ }
+
+ private void startKafka(int replicationFactor) throws Exception {
+ this.kafkaEmbedded = new KafkaEmbedded(1, true);
+ this.kafkaEmbedded.brokerProperties(Collections.singletonMap(
+ KafkaHealthIndicator.REPLICATION_PROPERTY,
+ String.valueOf(replicationFactor)));
+ this.kafkaEmbedded.before();
this.kafkaAdmin = new KafkaAdmin(Collections.singletonMap(
- ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:34987"));
- KafkaHealthIndicator healthIndicator =
- new KafkaHealthIndicator(this.kafkaAdmin, RESPONSE_TIME);
- Health health = healthIndicator.health();
- assertThat(health.getStatus()).isEqualTo(Status.DOWN);
- assertThat((String) health.getDetails().get("error")).isNotEmpty();
+ ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ this.kafkaEmbedded.getBrokersAsString()));
}
+
}
diff --git a/spring-boot-project/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc b/spring-boot-project/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc
index 92d70a1a556..ab580d1fc9d 100644
--- a/spring-boot-project/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc
+++ b/spring-boot-project/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc
@@ -1271,6 +1271,8 @@ content into your application. Rather, pick only the properties that you need.
management.health.elasticsearch.response-timeout=100ms # The time to wait for a response from the cluster.
management.health.influxdb.enabled=true # Whether to enable InfluxDB health check.
management.health.jms.enabled=true # Whether to enable JMS health check.
+ management.health.kafka.enabled=true # Whether to enable Kafka health check.
+ management.health.kafka.response-timeout=1000ms # Time to wait for a response from the cluster description operation.
management.health.ldap.enabled=true # Whether to enable LDAP health check.
management.health.mail.enabled=true # Whether to enable Mail health check.
management.health.mongo.enabled=true # Whether to enable MongoDB health check.
diff --git a/spring-boot-project/spring-boot-docs/src/main/asciidoc/production-ready-features.adoc b/spring-boot-project/spring-boot-docs/src/main/asciidoc/production-ready-features.adoc
index 79324061681..23e7aae77d6 100644
--- a/spring-boot-project/spring-boot-docs/src/main/asciidoc/production-ready-features.adoc
+++ b/spring-boot-project/spring-boot-docs/src/main/asciidoc/production-ready-features.adoc
@@ -572,6 +572,9 @@ The following `HealthIndicators` are auto-configured by Spring Boot when appropr
|{sc-spring-boot-actuator}/jms/JmsHealthIndicator.{sc-ext}[`JmsHealthIndicator`]
|Checks that a JMS broker is up.
+|{sc-spring-boot-actuator}/kafka/KafkaHealthIndicator.{sc-ext}[`KafkaHealthIndicator`]
+|Checks that a Kafka server is up.
+
|{sc-spring-boot-actuator}/mail/MailHealthIndicator.{sc-ext}[`MailHealthIndicator`]
|Checks that a mail server is up.