diff --git a/spring-boot-project/spring-boot-actuator-autoconfigure/pom.xml b/spring-boot-project/spring-boot-actuator-autoconfigure/pom.xml index 37f7af10a8c..6632d5a355b 100644 --- a/spring-boot-project/spring-boot-actuator-autoconfigure/pom.xml +++ b/spring-boot-project/spring-boot-actuator-autoconfigure/pom.xml @@ -193,6 +193,11 @@ elasticsearch true + + org.springframework.kafka + spring-kafka + true + org.flywaydb flyway-core diff --git a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorAutoConfiguration.java b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorAutoConfiguration.java new file mode 100644 index 00000000000..8b5ecbf1471 --- /dev/null +++ b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorAutoConfiguration.java @@ -0,0 +1,80 @@ +/* + * Copyright 2012-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.actuate.autoconfigure.kafka; + +import java.time.Duration; +import java.util.Map; + +import org.springframework.boot.actuate.autoconfigure.health.CompositeHealthIndicatorConfiguration; +import org.springframework.boot.actuate.autoconfigure.health.ConditionalOnEnabledHealthIndicator; +import org.springframework.boot.actuate.autoconfigure.health.HealthIndicatorAutoConfiguration; +import org.springframework.boot.actuate.health.HealthIndicator; +import org.springframework.boot.actuate.kafka.KafkaHealthIndicator; +import org.springframework.boot.autoconfigure.AutoConfigureAfter; +import org.springframework.boot.autoconfigure.AutoConfigureBefore; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.KafkaAdmin; + +/** + * {@link EnableAutoConfiguration Auto-configuration} for {@link KafkaHealthIndicator}. + * + * @author Juan Rada + */ +@Configuration +@ConditionalOnEnabledHealthIndicator("kafka") +@AutoConfigureBefore(HealthIndicatorAutoConfiguration.class) +@AutoConfigureAfter(KafkaAutoConfiguration.class) +public class KafkaHealthIndicatorAutoConfiguration { + + @Configuration + @ConditionalOnBean(KafkaAdmin.class) + @EnableConfigurationProperties(KafkaHealthIndicatorProperties.class) + static class KafkaClientHealthIndicatorConfiguration extends + CompositeHealthIndicatorConfiguration { + + private final Map admins; + + private final KafkaHealthIndicatorProperties properties; + + KafkaClientHealthIndicatorConfiguration(Map admins, + KafkaHealthIndicatorProperties properties) { + this.admins = admins; + this.properties = properties; + } + + @Bean + @ConditionalOnMissingBean(name = "kafkaHealthIndicator") + public HealthIndicator kafkaHealthIndicator() { + return createHealthIndicator(this.admins); + } + + @Override + protected KafkaHealthIndicator createHealthIndicator(KafkaAdmin source) { + Duration responseTimeout = this.properties.getResponseTimeout(); + + return new KafkaHealthIndicator(source, + responseTimeout == null ? 100L : responseTimeout.toMillis()); + } + } + +} diff --git a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorProperties.java b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorProperties.java new file mode 100644 index 00000000000..0f619d00adf --- /dev/null +++ b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorProperties.java @@ -0,0 +1,45 @@ +/* + * Copyright 2012-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.actuate.autoconfigure.kafka; + +import java.time.Duration; + +import org.springframework.boot.actuate.kafka.KafkaHealthIndicator; +import org.springframework.boot.context.properties.ConfigurationProperties; + +/** + * Configuration properties for {@link KafkaHealthIndicator}. + * + * @author Juan Rada + */ +@ConfigurationProperties(prefix = "management.health.kafka", ignoreUnknownFields = false) +public class KafkaHealthIndicatorProperties { + + /** + * Time to wait for a response from the cluster description operation. + */ + private Duration responseTimeout = Duration.ofMillis(100); + + public Duration getResponseTimeout() { + return this.responseTimeout; + } + + public void setResponseTimeout(Duration responseTimeout) { + this.responseTimeout = responseTimeout; + } + +} diff --git a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/package-info.java b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/package-info.java new file mode 100644 index 00000000000..57e9377fc62 --- /dev/null +++ b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/package-info.java @@ -0,0 +1,20 @@ +/* + * Copyright 2012-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Auto-configuration for actuator kafka support. + */ +package org.springframework.boot.actuate.autoconfigure.kafka; diff --git a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json index 192146c0d96..554030d66d9 100644 --- a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json +++ b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json @@ -145,6 +145,12 @@ "description": "Whether to enable Neo4j health check.", "defaultValue": true }, + { + "name": "management.health.kafka.enabled", + "type": "java.lang.Boolean", + "description": "Whether to enable kafka health check.", + "defaultValue": true + }, { "name": "management.info.build.enabled", "type": "java.lang.Boolean", diff --git a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/resources/META-INF/spring.factories b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/resources/META-INF/spring.factories index 962f19e19e4..9cf46eaf50b 100644 --- a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/resources/META-INF/spring.factories +++ b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/resources/META-INF/spring.factories @@ -11,6 +11,7 @@ org.springframework.boot.actuate.autoconfigure.context.properties.ConfigurationP org.springframework.boot.actuate.autoconfigure.context.ShutdownEndpointAutoConfiguration,\ org.springframework.boot.actuate.autoconfigure.couchbase.CouchbaseHealthIndicatorAutoConfiguration,\ org.springframework.boot.actuate.autoconfigure.elasticsearch.ElasticsearchHealthIndicatorAutoConfiguration,\ +org.springframework.boot.actuate.autoconfigure.kafka.KafkaHealthIndicatorAutoConfiguration,\ org.springframework.boot.actuate.autoconfigure.endpoint.EndpointAutoConfiguration,\ org.springframework.boot.actuate.autoconfigure.endpoint.jmx.JmxEndpointAutoConfiguration,\ org.springframework.boot.actuate.autoconfigure.endpoint.web.WebEndpointAutoConfiguration,\ diff --git a/spring-boot-project/spring-boot-actuator-autoconfigure/src/test/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorAutoConfigurationTests.java b/spring-boot-project/spring-boot-actuator-autoconfigure/src/test/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorAutoConfigurationTests.java new file mode 100644 index 00000000000..41be4c994a1 --- /dev/null +++ b/spring-boot-project/spring-boot-actuator-autoconfigure/src/test/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorAutoConfigurationTests.java @@ -0,0 +1,56 @@ +/* + * Copyright 2012-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.actuate.autoconfigure.kafka; + +import org.junit.Test; + +import org.springframework.boot.actuate.autoconfigure.health.HealthIndicatorAutoConfiguration; +import org.springframework.boot.actuate.health.ApplicationHealthIndicator; +import org.springframework.boot.actuate.kafka.KafkaHealthIndicator; +import org.springframework.boot.autoconfigure.AutoConfigurations; +import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; +import org.springframework.boot.test.context.runner.ApplicationContextRunner; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for {@link KafkaHealthIndicatorAutoConfiguration}. + * + * @author Juan Rada + */ +public class KafkaHealthIndicatorAutoConfigurationTests { + + private ApplicationContextRunner contextRunner = new ApplicationContextRunner() + .withConfiguration(AutoConfigurations.of(KafkaAutoConfiguration.class, + KafkaHealthIndicatorAutoConfiguration.class, + HealthIndicatorAutoConfiguration.class)); + + @Test + public void runShouldCreateIndicator() { + this.contextRunner.run((context) -> assertThat(context) + .hasSingleBean(KafkaHealthIndicator.class) + .doesNotHaveBean(ApplicationHealthIndicator.class)); + } + + @Test + public void runWhenDisabledShouldNotCreateIndicator() { + this.contextRunner.withPropertyValues("management.health.kafka.enabled:false") + .run((context) -> assertThat(context) + .doesNotHaveBean(KafkaHealthIndicator.class) + .hasSingleBean(ApplicationHealthIndicator.class)); + } +} diff --git a/spring-boot-project/spring-boot-actuator/pom.xml b/spring-boot-project/spring-boot-actuator/pom.xml index 3eacb314c68..634c62ce16f 100644 --- a/spring-boot-project/spring-boot-actuator/pom.xml +++ b/spring-boot-project/spring-boot-actuator/pom.xml @@ -172,6 +172,11 @@ spring-rabbit true + + org.springframework.kafka + spring-kafka + true + org.springframework.data spring-data-cassandra @@ -257,6 +262,11 @@ spring-boot-test-support test + + org.springframework.kafka + spring-kafka-test + test + org.springframework.boot spring-boot-autoconfigure @@ -267,6 +277,11 @@ log4j-slf4j-impl test + + org.slf4j + log4j-over-slf4j + test + org.apache.logging.log4j log4j-api diff --git a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicator.java b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicator.java new file mode 100644 index 00000000000..8b75ff3cfed --- /dev/null +++ b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicator.java @@ -0,0 +1,89 @@ +/* + * Copyright 2012-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.actuate.kafka; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ExecutionException; + +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.Config; +import org.apache.kafka.clients.admin.DescribeClusterOptions; +import org.apache.kafka.clients.admin.DescribeClusterResult; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.config.ConfigResource.Type; + +import org.springframework.boot.actuate.health.AbstractHealthIndicator; +import org.springframework.boot.actuate.health.Health.Builder; +import org.springframework.boot.actuate.health.HealthIndicator; +import org.springframework.kafka.core.KafkaAdmin; +import org.springframework.util.Assert; + +/** + * {@link HealthIndicator} for Kafka cluster. + * + * @author Juan Rada + */ +public class KafkaHealthIndicator extends AbstractHealthIndicator { + + static final String REPLICATION_PROPERTY = "transaction.state.log.replication.factor"; + + private final KafkaAdmin kafkaAdmin; + private final DescribeClusterOptions describeOptions; + + /** + * Create a new {@link KafkaHealthIndicator} instance. + * + * @param kafkaAdmin the kafka admin + * @param responseTimeout the describe cluster request timeout in milliseconds + */ + public KafkaHealthIndicator(KafkaAdmin kafkaAdmin, long responseTimeout) { + Assert.notNull(kafkaAdmin, "KafkaAdmin must not be null"); + this.kafkaAdmin = kafkaAdmin; + this.describeOptions = new DescribeClusterOptions() + .timeoutMs((int) responseTimeout); + } + + @Override + protected void doHealthCheck(Builder builder) throws Exception { + try (AdminClient adminClient = AdminClient.create(this.kafkaAdmin.getConfig())) { + DescribeClusterResult result = adminClient.describeCluster(this.describeOptions); + String brokerId = result.controller().get().idString(); + int replicationFactor = getReplicationFactor(brokerId, adminClient); + int nodes = result.nodes().get().size(); + if (nodes >= replicationFactor) { + builder.up(); + } + else { + builder.down(); + } + builder.withDetail("clusterId", result.clusterId().get()); + builder.withDetail("brokerId", brokerId); + builder.withDetail("nodes", nodes); + } + } + + private int getReplicationFactor(String brokerId, + AdminClient adminClient) throws ExecutionException, InterruptedException { + ConfigResource configResource = new ConfigResource(Type.BROKER, brokerId); + Map kafkaConfig = adminClient + .describeConfigs(Collections.singletonList(configResource)).all().get(); + Config brokerConfig = kafkaConfig.get(configResource); + return Integer.parseInt(brokerConfig.get(REPLICATION_PROPERTY).value()); + } +} + diff --git a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/package-info.java b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/package-info.java new file mode 100644 index 00000000000..1ed6f13e253 --- /dev/null +++ b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/package-info.java @@ -0,0 +1,20 @@ +/* + * Copyright 2012-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Actuator support for Kafka. + */ +package org.springframework.boot.actuate.kafka; diff --git a/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicatorTests.java b/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicatorTests.java new file mode 100644 index 00000000000..15bc0403c0a --- /dev/null +++ b/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicatorTests.java @@ -0,0 +1,97 @@ +/* + * Copyright 2012-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.actuate.kafka; + +import java.util.Collections; +import java.util.Map; + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.junit.Test; + +import org.springframework.boot.actuate.health.Health; +import org.springframework.boot.actuate.health.Status; +import org.springframework.kafka.core.KafkaAdmin; +import org.springframework.kafka.test.rule.KafkaEmbedded; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Test for {@link KafkaHealthIndicator} + * + * @author Juan Rada + */ +public class KafkaHealthIndicatorTests { + + private static final Long RESPONSE_TIME = 1000L; + + private KafkaEmbedded kafkaEmbedded; + private KafkaAdmin kafkaAdmin; + + private void startKafka(int replicationFactor) throws Exception { + this.kafkaEmbedded = new KafkaEmbedded(1, true); + this.kafkaEmbedded.brokerProperties(Collections.singletonMap( + KafkaHealthIndicator.REPLICATION_PROPERTY, + String.valueOf(replicationFactor))); + this.kafkaEmbedded.before(); + this.kafkaAdmin = new KafkaAdmin(Collections.singletonMap( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, + this.kafkaEmbedded.getBrokersAsString())); + } + + private void shutdownKafka() throws Exception { + this.kafkaEmbedded.destroy(); + } + + @Test + public void kafkaIsUp() throws Exception { + startKafka(1); + KafkaHealthIndicator healthIndicator = + new KafkaHealthIndicator(this.kafkaAdmin, RESPONSE_TIME); + Health health = healthIndicator.health(); + assertThat(health.getStatus()).isEqualTo(Status.UP); + assertDetails(health.getDetails()); + shutdownKafka(); + } + + private void assertDetails(Map details) { + assertThat(details).containsEntry("brokerId", "0"); + assertThat(details).containsKey("clusterId"); + assertThat(details).containsEntry("nodes", 1); + } + + @Test + public void notEnoughNodesForReplicationFactor() throws Exception { + startKafka(2); + KafkaHealthIndicator healthIndicator = + new KafkaHealthIndicator(this.kafkaAdmin, RESPONSE_TIME); + Health health = healthIndicator.health(); + assertThat(health.getStatus()).isEqualTo(Status.DOWN); + assertDetails(health.getDetails()); + shutdownKafka(); + } + + @Test + public void kafkaIsDown() throws Exception { + this.kafkaAdmin = new KafkaAdmin(Collections.singletonMap( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:34987")); + KafkaHealthIndicator healthIndicator = + new KafkaHealthIndicator(this.kafkaAdmin, RESPONSE_TIME); + Health health = healthIndicator.health(); + assertThat(health.getStatus()).isEqualTo(Status.DOWN); + assertThat((String) health.getDetails().get("error")).isNotEmpty(); + } +}