Browse Source
* pr/20887: Polish "Add health indicator for Cassandra that uses the CqlSession" Add health indicator for Cassandra that uses the CqlSession Closes gh-20887pull/21936/head
12 changed files with 494 additions and 73 deletions
@ -0,0 +1,101 @@
@@ -0,0 +1,101 @@
|
||||
/* |
||||
* Copyright 2012-2020 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.boot.actuate.autoconfigure.cassandra; |
||||
|
||||
import java.util.Map; |
||||
|
||||
import com.datastax.oss.driver.api.core.CqlSession; |
||||
|
||||
import org.springframework.boot.actuate.autoconfigure.health.CompositeHealthContributorConfiguration; |
||||
import org.springframework.boot.actuate.autoconfigure.health.CompositeReactiveHealthContributorConfiguration; |
||||
import org.springframework.boot.actuate.cassandra.CassandraDriverHealthIndicator; |
||||
import org.springframework.boot.actuate.cassandra.CassandraDriverReactiveHealthIndicator; |
||||
import org.springframework.boot.actuate.cassandra.CassandraHealthIndicator; |
||||
import org.springframework.boot.actuate.cassandra.CassandraReactiveHealthIndicator; |
||||
import org.springframework.boot.actuate.health.HealthContributor; |
||||
import org.springframework.boot.actuate.health.ReactiveHealthContributor; |
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; |
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; |
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; |
||||
import org.springframework.context.annotation.Bean; |
||||
import org.springframework.context.annotation.Configuration; |
||||
import org.springframework.data.cassandra.core.CassandraOperations; |
||||
import org.springframework.data.cassandra.core.ReactiveCassandraOperations; |
||||
|
||||
/** |
||||
* Health contributor options for Cassandra. |
||||
* |
||||
* @author Stephane Nicoll |
||||
*/ |
||||
class CassandraHealthContributorConfigurations { |
||||
|
||||
@Configuration(proxyBeanMethods = false) |
||||
@ConditionalOnBean(CqlSession.class) |
||||
static class CassandraDriverConfiguration |
||||
extends CompositeHealthContributorConfiguration<CassandraDriverHealthIndicator, CqlSession> { |
||||
|
||||
@Bean |
||||
@ConditionalOnMissingBean(name = { "cassandraHealthIndicator", "cassandraHealthContributor" }) |
||||
HealthContributor cassandraHealthContributor(Map<String, CqlSession> sessions) { |
||||
return createContributor(sessions); |
||||
} |
||||
|
||||
} |
||||
|
||||
@Configuration(proxyBeanMethods = false) |
||||
@ConditionalOnClass(CassandraOperations.class) |
||||
@ConditionalOnBean(CassandraOperations.class) |
||||
static class CassandraOperationsConfiguration |
||||
extends CompositeHealthContributorConfiguration<CassandraHealthIndicator, CassandraOperations> { |
||||
|
||||
@Bean |
||||
@ConditionalOnMissingBean(name = { "cassandraHealthIndicator", "cassandraHealthContributor" }) |
||||
HealthContributor cassandraHealthContributor(Map<String, CassandraOperations> cassandraOperations) { |
||||
return createContributor(cassandraOperations); |
||||
} |
||||
|
||||
} |
||||
|
||||
@Configuration(proxyBeanMethods = false) |
||||
@ConditionalOnBean(CqlSession.class) |
||||
static class CassandraReactiveDriverConfiguration extends |
||||
CompositeReactiveHealthContributorConfiguration<CassandraDriverReactiveHealthIndicator, CqlSession> { |
||||
|
||||
@Bean |
||||
@ConditionalOnMissingBean(name = { "cassandraHealthIndicator", "cassandraHealthContributor" }) |
||||
ReactiveHealthContributor cassandraHealthContributor(Map<String, CqlSession> sessions) { |
||||
return createContributor(sessions); |
||||
} |
||||
|
||||
} |
||||
|
||||
@Configuration(proxyBeanMethods = false) |
||||
@ConditionalOnClass(ReactiveCassandraOperations.class) |
||||
@ConditionalOnBean(ReactiveCassandraOperations.class) |
||||
static class CassandraReactiveOperationsConfiguration extends |
||||
CompositeReactiveHealthContributorConfiguration<CassandraReactiveHealthIndicator, ReactiveCassandraOperations> { |
||||
|
||||
@Bean |
||||
@ConditionalOnMissingBean(name = { "cassandraHealthIndicator", "cassandraHealthContributor" }) |
||||
ReactiveHealthContributor cassandraHealthContributor( |
||||
Map<String, ReactiveCassandraOperations> reactiveCassandraOperations) { |
||||
return createContributor(reactiveCassandraOperations); |
||||
} |
||||
|
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,62 @@
@@ -0,0 +1,62 @@
|
||||
/* |
||||
* Copyright 2012-2020 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.boot.actuate.cassandra; |
||||
|
||||
import com.datastax.oss.driver.api.core.ConsistencyLevel; |
||||
import com.datastax.oss.driver.api.core.CqlSession; |
||||
import com.datastax.oss.driver.api.core.cql.Row; |
||||
import com.datastax.oss.driver.api.core.cql.SimpleStatement; |
||||
|
||||
import org.springframework.boot.actuate.health.AbstractHealthIndicator; |
||||
import org.springframework.boot.actuate.health.Health; |
||||
import org.springframework.boot.actuate.health.HealthIndicator; |
||||
import org.springframework.util.Assert; |
||||
|
||||
/** |
||||
* Simple implementation of a {@link HealthIndicator} returning status information for |
||||
* Cassandra data stores. |
||||
* |
||||
* @author Alexandre Dutra |
||||
* @since 2.4.0 |
||||
*/ |
||||
public class CassandraDriverHealthIndicator extends AbstractHealthIndicator { |
||||
|
||||
private static final SimpleStatement SELECT = SimpleStatement |
||||
.newInstance("SELECT release_version FROM system.local").setConsistencyLevel(ConsistencyLevel.LOCAL_ONE); |
||||
|
||||
private final CqlSession session; |
||||
|
||||
/** |
||||
* Create a new {@link CassandraDriverHealthIndicator} instance. |
||||
* @param session the {@link CqlSession}. |
||||
*/ |
||||
public CassandraDriverHealthIndicator(CqlSession session) { |
||||
super("Cassandra health check failed"); |
||||
Assert.notNull(session, "session must not be null"); |
||||
this.session = session; |
||||
} |
||||
|
||||
@Override |
||||
protected void doHealthCheck(Health.Builder builder) throws Exception { |
||||
Row row = this.session.execute(SELECT).one(); |
||||
builder.up(); |
||||
if (row != null && !row.isNull(0)) { |
||||
builder.withDetail("version", row.getString(0)); |
||||
} |
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,58 @@
@@ -0,0 +1,58 @@
|
||||
/* |
||||
* Copyright 2012-2020 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.springframework.boot.actuate.cassandra; |
||||
|
||||
import com.datastax.oss.driver.api.core.ConsistencyLevel; |
||||
import com.datastax.oss.driver.api.core.CqlSession; |
||||
import com.datastax.oss.driver.api.core.cql.SimpleStatement; |
||||
import reactor.core.publisher.Mono; |
||||
|
||||
import org.springframework.boot.actuate.health.AbstractReactiveHealthIndicator; |
||||
import org.springframework.boot.actuate.health.Health; |
||||
import org.springframework.boot.actuate.health.ReactiveHealthIndicator; |
||||
import org.springframework.util.Assert; |
||||
|
||||
/** |
||||
* Simple implementation of a {@link ReactiveHealthIndicator} returning status information |
||||
* for Cassandra data stores. |
||||
* |
||||
* @author Alexandre Dutra |
||||
* @since 2.4.0 |
||||
*/ |
||||
public class CassandraDriverReactiveHealthIndicator extends AbstractReactiveHealthIndicator { |
||||
|
||||
private static final SimpleStatement SELECT = SimpleStatement |
||||
.newInstance("SELECT release_version FROM system.local").setConsistencyLevel(ConsistencyLevel.LOCAL_ONE); |
||||
|
||||
private final CqlSession session; |
||||
|
||||
/** |
||||
* Create a new {@link CassandraHealthIndicator} instance. |
||||
* @param session the {@link CqlSession}. |
||||
*/ |
||||
public CassandraDriverReactiveHealthIndicator(CqlSession session) { |
||||
super("Cassandra health check failed"); |
||||
Assert.notNull(session, "session must not be null"); |
||||
this.session = session; |
||||
} |
||||
|
||||
@Override |
||||
protected Mono<Health> doHealthCheck(Health.Builder builder) { |
||||
return Mono.from(this.session.executeReactive(SELECT)) |
||||
.map((row) -> builder.up().withDetail("version", row.getString(0)).build()); |
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,74 @@
@@ -0,0 +1,74 @@
|
||||
/* |
||||
* Copyright 2012-2020 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.boot.actuate.cassandra; |
||||
|
||||
import com.datastax.oss.driver.api.core.CqlSession; |
||||
import com.datastax.oss.driver.api.core.DriverTimeoutException; |
||||
import com.datastax.oss.driver.api.core.cql.ResultSet; |
||||
import com.datastax.oss.driver.api.core.cql.Row; |
||||
import com.datastax.oss.driver.api.core.cql.SimpleStatement; |
||||
import org.junit.jupiter.api.Test; |
||||
|
||||
import org.springframework.boot.actuate.health.Health; |
||||
import org.springframework.boot.actuate.health.Status; |
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat; |
||||
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; |
||||
import static org.mockito.ArgumentMatchers.any; |
||||
import static org.mockito.BDDMockito.given; |
||||
import static org.mockito.Mockito.mock; |
||||
|
||||
/** |
||||
* Tests for {@link CassandraDriverHealthIndicator}. |
||||
* |
||||
* @author Alexandre Dutra |
||||
* @since 2.4.0 |
||||
*/ |
||||
class CassandraDriverHealthIndicatorTests { |
||||
|
||||
@Test |
||||
void createWhenCqlSessionIsNullShouldThrowException() { |
||||
assertThatIllegalArgumentException().isThrownBy(() -> new CassandraDriverHealthIndicator(null)); |
||||
} |
||||
|
||||
@Test |
||||
void healthWithCassandraUp() { |
||||
CqlSession session = mock(CqlSession.class); |
||||
ResultSet resultSet = mock(ResultSet.class); |
||||
Row row = mock(Row.class); |
||||
given(session.execute(any(SimpleStatement.class))).willReturn(resultSet); |
||||
given(resultSet.one()).willReturn(row); |
||||
given(row.isNull(0)).willReturn(false); |
||||
given(row.getString(0)).willReturn("1.0.0"); |
||||
CassandraDriverHealthIndicator healthIndicator = new CassandraDriverHealthIndicator(session); |
||||
Health health = healthIndicator.health(); |
||||
assertThat(health.getStatus()).isEqualTo(Status.UP); |
||||
assertThat(health.getDetails().get("version")).isEqualTo("1.0.0"); |
||||
} |
||||
|
||||
@Test |
||||
void healthWithCassandraDown() { |
||||
CqlSession session = mock(CqlSession.class); |
||||
given(session.execute(any(SimpleStatement.class))).willThrow(new DriverTimeoutException("Test Exception")); |
||||
CassandraDriverHealthIndicator healthIndicator = new CassandraDriverHealthIndicator(session); |
||||
Health health = healthIndicator.health(); |
||||
assertThat(health.getStatus()).isEqualTo(Status.DOWN); |
||||
assertThat(health.getDetails().get("error")) |
||||
.isEqualTo(DriverTimeoutException.class.getName() + ": Test Exception"); |
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,106 @@
@@ -0,0 +1,106 @@
|
||||
/* |
||||
* Copyright 2012-2020 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.springframework.boot.actuate.cassandra; |
||||
|
||||
import com.datastax.dse.driver.api.core.cql.reactive.ReactiveResultSet; |
||||
import com.datastax.dse.driver.api.core.cql.reactive.ReactiveRow; |
||||
import com.datastax.oss.driver.api.core.CqlSession; |
||||
import com.datastax.oss.driver.api.core.DriverTimeoutException; |
||||
import com.datastax.oss.driver.api.core.cql.SimpleStatement; |
||||
import org.junit.jupiter.api.Test; |
||||
import org.mockito.stubbing.Answer; |
||||
import org.reactivestreams.Subscriber; |
||||
import org.reactivestreams.Subscription; |
||||
import reactor.core.publisher.Mono; |
||||
import reactor.test.StepVerifier; |
||||
|
||||
import org.springframework.boot.actuate.health.Health; |
||||
import org.springframework.boot.actuate.health.Status; |
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat; |
||||
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; |
||||
import static org.mockito.ArgumentMatchers.any; |
||||
import static org.mockito.BDDMockito.doAnswer; |
||||
import static org.mockito.BDDMockito.given; |
||||
import static org.mockito.BDDMockito.mock; |
||||
|
||||
/** |
||||
* Tests for {@link CassandraDriverReactiveHealthIndicator}. |
||||
* |
||||
* @author Alexandre Dutra |
||||
* @since 2.4.0 |
||||
*/ |
||||
class CassandraDriverReactiveHealthIndicatorTests { |
||||
|
||||
@Test |
||||
void createWhenCqlSessionIsNullShouldThrowException() { |
||||
assertThatIllegalArgumentException().isThrownBy(() -> new CassandraDriverReactiveHealthIndicator(null)); |
||||
} |
||||
|
||||
@Test |
||||
void testCassandraIsUp() { |
||||
CqlSession session = mock(CqlSession.class); |
||||
ReactiveResultSet results = mock(ReactiveResultSet.class); |
||||
ReactiveRow row = mock(ReactiveRow.class); |
||||
given(session.executeReactive(any(SimpleStatement.class))).willReturn(results); |
||||
doAnswer(mockReactiveResultSetBehavior(row)).when(results).subscribe(any()); |
||||
given(row.getString(0)).willReturn("6.0.0"); |
||||
CassandraDriverReactiveHealthIndicator cassandraReactiveHealthIndicator = new CassandraDriverReactiveHealthIndicator( |
||||
session); |
||||
Mono<Health> health = cassandraReactiveHealthIndicator.health(); |
||||
StepVerifier.create(health).consumeNextWith((h) -> { |
||||
assertThat(h.getStatus()).isEqualTo(Status.UP); |
||||
assertThat(h.getDetails()).containsOnlyKeys("version"); |
||||
assertThat(h.getDetails().get("version")).isEqualTo("6.0.0"); |
||||
}).verifyComplete(); |
||||
} |
||||
|
||||
@Test |
||||
void testCassandraIsDown() { |
||||
CqlSession session = mock(CqlSession.class); |
||||
given(session.executeReactive(any(SimpleStatement.class))) |
||||
.willThrow(new DriverTimeoutException("Test Exception")); |
||||
CassandraDriverReactiveHealthIndicator cassandraReactiveHealthIndicator = new CassandraDriverReactiveHealthIndicator( |
||||
session); |
||||
Mono<Health> health = cassandraReactiveHealthIndicator.health(); |
||||
StepVerifier.create(health).consumeNextWith((h) -> { |
||||
assertThat(h.getStatus()).isEqualTo(Status.DOWN); |
||||
assertThat(h.getDetails()).containsOnlyKeys("error"); |
||||
assertThat(h.getDetails().get("error")) |
||||
.isEqualTo(DriverTimeoutException.class.getName() + ": Test Exception"); |
||||
}).verifyComplete(); |
||||
} |
||||
|
||||
private Answer<Void> mockReactiveResultSetBehavior(ReactiveRow row) { |
||||
return (invocation) -> { |
||||
Subscriber<ReactiveRow> subscriber = invocation.getArgument(0); |
||||
Subscription s = new Subscription() { |
||||
@Override |
||||
public void request(long n) { |
||||
subscriber.onNext(row); |
||||
subscriber.onComplete(); |
||||
} |
||||
|
||||
@Override |
||||
public void cancel() { |
||||
} |
||||
}; |
||||
subscriber.onSubscribe(s); |
||||
return null; |
||||
}; |
||||
} |
||||
|
||||
} |
||||
Loading…
Reference in new issue