From 7cd19822c6de99e835bcaff1307f104e863da265 Mon Sep 17 00:00:00 2001 From: Stephane Nicoll Date: Thu, 8 Feb 2018 11:54:27 +0100 Subject: [PATCH] Polish "Add Kafka health indicator" Closes gh-11515 --- .../pom.xml | 10 +-- ...KafkaHealthIndicatorAutoConfiguration.java | 48 ++++++------- .../kafka/KafkaHealthIndicatorProperties.java | 3 +- .../autoconfigure/kafka/package-info.java | 2 +- ...itional-spring-configuration-metadata.json | 12 ++-- .../main/resources/META-INF/spring.factories | 2 +- .../spring-boot-actuator/pom.xml | 18 ++--- .../actuate/kafka/KafkaHealthIndicator.java | 26 +++---- .../boot/actuate/kafka/package-info.java | 2 +- .../kafka/KafkaHealthIndicatorTests.java | 70 ++++++++++--------- .../appendix-application-properties.adoc | 2 + .../asciidoc/production-ready-features.adoc | 3 + 12 files changed, 102 insertions(+), 96 deletions(-) diff --git a/spring-boot-project/spring-boot-actuator-autoconfigure/pom.xml b/spring-boot-project/spring-boot-actuator-autoconfigure/pom.xml index 6632d5a355b..d0151ebc913 100644 --- a/spring-boot-project/spring-boot-actuator-autoconfigure/pom.xml +++ b/spring-boot-project/spring-boot-actuator-autoconfigure/pom.xml @@ -193,11 +193,6 @@ elasticsearch true - - org.springframework.kafka - spring-kafka - true - org.flywaydb flyway-core @@ -311,6 +306,11 @@ spring-integration-core true + + org.springframework.kafka + spring-kafka + true + org.springframework.security spring-security-config diff --git a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorAutoConfiguration.java b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorAutoConfiguration.java index 8b5ecbf1471..04c468f277e 100644 --- a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorAutoConfiguration.java +++ b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorAutoConfiguration.java @@ -28,6 +28,7 @@ import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.AutoConfigureBefore; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; import org.springframework.boot.context.properties.EnableConfigurationProperties; @@ -41,40 +42,35 @@ import org.springframework.kafka.core.KafkaAdmin; * @author Juan Rada */ @Configuration +@ConditionalOnClass(KafkaAdmin.class) +@ConditionalOnBean(KafkaAdmin.class) @ConditionalOnEnabledHealthIndicator("kafka") @AutoConfigureBefore(HealthIndicatorAutoConfiguration.class) @AutoConfigureAfter(KafkaAutoConfiguration.class) -public class KafkaHealthIndicatorAutoConfiguration { +@EnableConfigurationProperties(KafkaHealthIndicatorProperties.class) +public class KafkaHealthIndicatorAutoConfiguration extends + CompositeHealthIndicatorConfiguration { - @Configuration - @ConditionalOnBean(KafkaAdmin.class) - @EnableConfigurationProperties(KafkaHealthIndicatorProperties.class) - static class KafkaClientHealthIndicatorConfiguration extends - CompositeHealthIndicatorConfiguration { + private final Map admins; - private final Map admins; + private final KafkaHealthIndicatorProperties properties; - private final KafkaHealthIndicatorProperties properties; - - KafkaClientHealthIndicatorConfiguration(Map admins, - KafkaHealthIndicatorProperties properties) { - this.admins = admins; - this.properties = properties; - } - - @Bean - @ConditionalOnMissingBean(name = "kafkaHealthIndicator") - public HealthIndicator kafkaHealthIndicator() { - return createHealthIndicator(this.admins); - } + KafkaHealthIndicatorAutoConfiguration(Map admins, + KafkaHealthIndicatorProperties properties) { + this.admins = admins; + this.properties = properties; + } - @Override - protected KafkaHealthIndicator createHealthIndicator(KafkaAdmin source) { - Duration responseTimeout = this.properties.getResponseTimeout(); + @Bean + @ConditionalOnMissingBean(name = "kafkaHealthIndicator") + public HealthIndicator kafkaHealthIndicator() { + return createHealthIndicator(this.admins); + } - return new KafkaHealthIndicator(source, - responseTimeout == null ? 100L : responseTimeout.toMillis()); - } + @Override + protected KafkaHealthIndicator createHealthIndicator(KafkaAdmin source) { + Duration responseTimeout = this.properties.getResponseTimeout(); + return new KafkaHealthIndicator(source, responseTimeout.toMillis()); } } diff --git a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorProperties.java b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorProperties.java index 0f619d00adf..fb156af68e1 100644 --- a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorProperties.java +++ b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorProperties.java @@ -25,6 +25,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties; * Configuration properties for {@link KafkaHealthIndicator}. * * @author Juan Rada + * @since 2.0.0 */ @ConfigurationProperties(prefix = "management.health.kafka", ignoreUnknownFields = false) public class KafkaHealthIndicatorProperties { @@ -32,7 +33,7 @@ public class KafkaHealthIndicatorProperties { /** * Time to wait for a response from the cluster description operation. */ - private Duration responseTimeout = Duration.ofMillis(100); + private Duration responseTimeout = Duration.ofMillis(1000); public Duration getResponseTimeout() { return this.responseTimeout; diff --git a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/package-info.java b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/package-info.java index 57e9377fc62..35f3f26c685 100644 --- a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/package-info.java +++ b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/package-info.java @@ -15,6 +15,6 @@ */ /** - * Auto-configuration for actuator kafka support. + * Auto-configuration for actuator Apache Kafka support. */ package org.springframework.boot.actuate.autoconfigure.kafka; diff --git a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json index 554030d66d9..d60b5eb326c 100644 --- a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json +++ b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json @@ -103,6 +103,12 @@ "description": "Whether to enable JMS health check.", "defaultValue": true }, + { + "name": "management.health.kafka.enabled", + "type": "java.lang.Boolean", + "description": "Whether to enable Kafka health check.", + "defaultValue": true + }, { "name": "management.health.ldap.enabled", "type": "java.lang.Boolean", @@ -145,12 +151,6 @@ "description": "Whether to enable Neo4j health check.", "defaultValue": true }, - { - "name": "management.health.kafka.enabled", - "type": "java.lang.Boolean", - "description": "Whether to enable kafka health check.", - "defaultValue": true - }, { "name": "management.info.build.enabled", "type": "java.lang.Boolean", diff --git a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/resources/META-INF/spring.factories b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/resources/META-INF/spring.factories index 9cf46eaf50b..40141644030 100644 --- a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/resources/META-INF/spring.factories +++ b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/resources/META-INF/spring.factories @@ -11,7 +11,6 @@ org.springframework.boot.actuate.autoconfigure.context.properties.ConfigurationP org.springframework.boot.actuate.autoconfigure.context.ShutdownEndpointAutoConfiguration,\ org.springframework.boot.actuate.autoconfigure.couchbase.CouchbaseHealthIndicatorAutoConfiguration,\ org.springframework.boot.actuate.autoconfigure.elasticsearch.ElasticsearchHealthIndicatorAutoConfiguration,\ -org.springframework.boot.actuate.autoconfigure.kafka.KafkaHealthIndicatorAutoConfiguration,\ org.springframework.boot.actuate.autoconfigure.endpoint.EndpointAutoConfiguration,\ org.springframework.boot.actuate.autoconfigure.endpoint.jmx.JmxEndpointAutoConfiguration,\ org.springframework.boot.actuate.autoconfigure.endpoint.web.WebEndpointAutoConfiguration,\ @@ -25,6 +24,7 @@ org.springframework.boot.actuate.autoconfigure.info.InfoEndpointAutoConfiguratio org.springframework.boot.actuate.autoconfigure.jdbc.DataSourceHealthIndicatorAutoConfiguration,\ org.springframework.boot.actuate.autoconfigure.jms.JmsHealthIndicatorAutoConfiguration,\ org.springframework.boot.actuate.autoconfigure.jolokia.JolokiaEndpointAutoConfiguration,\ +org.springframework.boot.actuate.autoconfigure.kafka.KafkaHealthIndicatorAutoConfiguration,\ org.springframework.boot.actuate.autoconfigure.ldap.LdapHealthIndicatorAutoConfiguration,\ org.springframework.boot.actuate.autoconfigure.liquibase.LiquibaseEndpointAutoConfiguration,\ org.springframework.boot.actuate.autoconfigure.logging.LogFileWebEndpointAutoConfiguration,\ diff --git a/spring-boot-project/spring-boot-actuator/pom.xml b/spring-boot-project/spring-boot-actuator/pom.xml index 634c62ce16f..1bf30d2522c 100644 --- a/spring-boot-project/spring-boot-actuator/pom.xml +++ b/spring-boot-project/spring-boot-actuator/pom.xml @@ -172,11 +172,6 @@ spring-rabbit true - - org.springframework.kafka - spring-kafka - true - org.springframework.data spring-data-cassandra @@ -230,6 +225,11 @@ spring-integration-core true + + org.springframework.kafka + spring-kafka + true + org.springframework.security spring-security-core @@ -263,13 +263,13 @@ test - org.springframework.kafka - spring-kafka-test + org.springframework.boot + spring-boot-autoconfigure test - org.springframework.boot - spring-boot-autoconfigure + org.springframework.kafka + spring-kafka-test test diff --git a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicator.java b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicator.java index 8b75ff3cfed..8a2c505ffe7 100644 --- a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicator.java +++ b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicator.java @@ -30,6 +30,7 @@ import org.apache.kafka.common.config.ConfigResource.Type; import org.springframework.boot.actuate.health.AbstractHealthIndicator; import org.springframework.boot.actuate.health.Health.Builder; import org.springframework.boot.actuate.health.HealthIndicator; +import org.springframework.boot.actuate.health.Status; import org.springframework.kafka.core.KafkaAdmin; import org.springframework.util.Assert; @@ -43,37 +44,35 @@ public class KafkaHealthIndicator extends AbstractHealthIndicator { static final String REPLICATION_PROPERTY = "transaction.state.log.replication.factor"; private final KafkaAdmin kafkaAdmin; + private final DescribeClusterOptions describeOptions; /** * Create a new {@link KafkaHealthIndicator} instance. * * @param kafkaAdmin the kafka admin - * @param responseTimeout the describe cluster request timeout in milliseconds + * @param requestTimeout the request timeout in milliseconds */ - public KafkaHealthIndicator(KafkaAdmin kafkaAdmin, long responseTimeout) { + public KafkaHealthIndicator(KafkaAdmin kafkaAdmin, long requestTimeout) { Assert.notNull(kafkaAdmin, "KafkaAdmin must not be null"); this.kafkaAdmin = kafkaAdmin; this.describeOptions = new DescribeClusterOptions() - .timeoutMs((int) responseTimeout); + .timeoutMs((int) requestTimeout); } @Override protected void doHealthCheck(Builder builder) throws Exception { try (AdminClient adminClient = AdminClient.create(this.kafkaAdmin.getConfig())) { - DescribeClusterResult result = adminClient.describeCluster(this.describeOptions); + DescribeClusterResult result = adminClient.describeCluster( + this.describeOptions); String brokerId = result.controller().get().idString(); int replicationFactor = getReplicationFactor(brokerId, adminClient); int nodes = result.nodes().get().size(); - if (nodes >= replicationFactor) { - builder.up(); - } - else { - builder.down(); - } - builder.withDetail("clusterId", result.clusterId().get()); - builder.withDetail("brokerId", brokerId); - builder.withDetail("nodes", nodes); + Status status = nodes >= replicationFactor ? Status.UP : Status.DOWN; + builder.status(status) + .withDetail("clusterId", result.clusterId().get()) + .withDetail("brokerId", brokerId) + .withDetail("nodes", nodes); } } @@ -85,5 +84,6 @@ public class KafkaHealthIndicator extends AbstractHealthIndicator { Config brokerConfig = kafkaConfig.get(configResource); return Integer.parseInt(brokerConfig.get(REPLICATION_PROPERTY).value()); } + } diff --git a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/package-info.java b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/package-info.java index 1ed6f13e253..51978d8ac2c 100644 --- a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/package-info.java +++ b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/package-info.java @@ -15,6 +15,6 @@ */ /** - * Actuator support for Kafka. + * Actuator support for Apache Kafka. */ package org.springframework.boot.actuate.kafka; diff --git a/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicatorTests.java b/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicatorTests.java index 15bc0403c0a..8f646a76f6e 100644 --- a/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicatorTests.java +++ b/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicatorTests.java @@ -20,78 +20,82 @@ import java.util.Collections; import java.util.Map; import org.apache.kafka.clients.producer.ProducerConfig; +import org.junit.After; import org.junit.Test; import org.springframework.boot.actuate.health.Health; import org.springframework.boot.actuate.health.Status; import org.springframework.kafka.core.KafkaAdmin; import org.springframework.kafka.test.rule.KafkaEmbedded; +import org.springframework.util.SocketUtils; import static org.assertj.core.api.Assertions.assertThat; /** - * Test for {@link KafkaHealthIndicator} + * Tests for {@link KafkaHealthIndicator}. * * @author Juan Rada + * @author Stephane Nicoll */ public class KafkaHealthIndicatorTests { - private static final Long RESPONSE_TIME = 1000L; - private KafkaEmbedded kafkaEmbedded; - private KafkaAdmin kafkaAdmin; - private void startKafka(int replicationFactor) throws Exception { - this.kafkaEmbedded = new KafkaEmbedded(1, true); - this.kafkaEmbedded.brokerProperties(Collections.singletonMap( - KafkaHealthIndicator.REPLICATION_PROPERTY, - String.valueOf(replicationFactor))); - this.kafkaEmbedded.before(); - this.kafkaAdmin = new KafkaAdmin(Collections.singletonMap( - ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, - this.kafkaEmbedded.getBrokersAsString())); - } + private KafkaAdmin kafkaAdmin; - private void shutdownKafka() throws Exception { - this.kafkaEmbedded.destroy(); + @After + public void shutdownKafka() throws Exception { + if (this.kafkaEmbedded != null) { + this.kafkaEmbedded.destroy(); + } } - @Test public void kafkaIsUp() throws Exception { startKafka(1); KafkaHealthIndicator healthIndicator = - new KafkaHealthIndicator(this.kafkaAdmin, RESPONSE_TIME); + new KafkaHealthIndicator(this.kafkaAdmin, 1000L); Health health = healthIndicator.health(); assertThat(health.getStatus()).isEqualTo(Status.UP); assertDetails(health.getDetails()); - shutdownKafka(); } - private void assertDetails(Map details) { - assertThat(details).containsEntry("brokerId", "0"); - assertThat(details).containsKey("clusterId"); - assertThat(details).containsEntry("nodes", 1); + @Test + public void kafkaIsDown() { + int freePort = SocketUtils.findAvailableTcpPort(); + this.kafkaAdmin = new KafkaAdmin(Collections.singletonMap( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:" + freePort)); + KafkaHealthIndicator healthIndicator = + new KafkaHealthIndicator(this.kafkaAdmin, 1L); + Health health = healthIndicator.health(); + assertThat(health.getStatus()).isEqualTo(Status.DOWN); + assertThat((String) health.getDetails().get("error")).isNotEmpty(); } @Test public void notEnoughNodesForReplicationFactor() throws Exception { startKafka(2); KafkaHealthIndicator healthIndicator = - new KafkaHealthIndicator(this.kafkaAdmin, RESPONSE_TIME); + new KafkaHealthIndicator(this.kafkaAdmin, 1000L); Health health = healthIndicator.health(); assertThat(health.getStatus()).isEqualTo(Status.DOWN); assertDetails(health.getDetails()); - shutdownKafka(); } - @Test - public void kafkaIsDown() throws Exception { + private void assertDetails(Map details) { + assertThat(details).containsEntry("brokerId", "0"); + assertThat(details).containsKey("clusterId"); + assertThat(details).containsEntry("nodes", 1); + } + + private void startKafka(int replicationFactor) throws Exception { + this.kafkaEmbedded = new KafkaEmbedded(1, true); + this.kafkaEmbedded.brokerProperties(Collections.singletonMap( + KafkaHealthIndicator.REPLICATION_PROPERTY, + String.valueOf(replicationFactor))); + this.kafkaEmbedded.before(); this.kafkaAdmin = new KafkaAdmin(Collections.singletonMap( - ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:34987")); - KafkaHealthIndicator healthIndicator = - new KafkaHealthIndicator(this.kafkaAdmin, RESPONSE_TIME); - Health health = healthIndicator.health(); - assertThat(health.getStatus()).isEqualTo(Status.DOWN); - assertThat((String) health.getDetails().get("error")).isNotEmpty(); + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, + this.kafkaEmbedded.getBrokersAsString())); } + } diff --git a/spring-boot-project/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc b/spring-boot-project/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc index 92d70a1a556..ab580d1fc9d 100644 --- a/spring-boot-project/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc +++ b/spring-boot-project/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc @@ -1271,6 +1271,8 @@ content into your application. Rather, pick only the properties that you need. management.health.elasticsearch.response-timeout=100ms # The time to wait for a response from the cluster. management.health.influxdb.enabled=true # Whether to enable InfluxDB health check. management.health.jms.enabled=true # Whether to enable JMS health check. + management.health.kafka.enabled=true # Whether to enable Kafka health check. + management.health.kafka.response-timeout=1000ms # Time to wait for a response from the cluster description operation. management.health.ldap.enabled=true # Whether to enable LDAP health check. management.health.mail.enabled=true # Whether to enable Mail health check. management.health.mongo.enabled=true # Whether to enable MongoDB health check. diff --git a/spring-boot-project/spring-boot-docs/src/main/asciidoc/production-ready-features.adoc b/spring-boot-project/spring-boot-docs/src/main/asciidoc/production-ready-features.adoc index 79324061681..23e7aae77d6 100644 --- a/spring-boot-project/spring-boot-docs/src/main/asciidoc/production-ready-features.adoc +++ b/spring-boot-project/spring-boot-docs/src/main/asciidoc/production-ready-features.adoc @@ -572,6 +572,9 @@ The following `HealthIndicators` are auto-configured by Spring Boot when appropr |{sc-spring-boot-actuator}/jms/JmsHealthIndicator.{sc-ext}[`JmsHealthIndicator`] |Checks that a JMS broker is up. +|{sc-spring-boot-actuator}/kafka/KafkaHealthIndicator.{sc-ext}[`KafkaHealthIndicator`] +|Checks that a Kafka server is up. + |{sc-spring-boot-actuator}/mail/MailHealthIndicator.{sc-ext}[`MailHealthIndicator`] |Checks that a mail server is up.