Browse Source
* pr/14353: Polish "Add Prometheus push gateway support" Add Prometheus push gateway supportpull/14707/head
9 changed files with 606 additions and 1 deletions
@ -0,0 +1,201 @@
@@ -0,0 +1,201 @@
|
||||
/* |
||||
* Copyright 2012-2018 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.boot.actuate.metrics.export.prometheus; |
||||
|
||||
import java.net.UnknownHostException; |
||||
import java.time.Duration; |
||||
import java.util.Map; |
||||
import java.util.concurrent.Executors; |
||||
import java.util.concurrent.ScheduledExecutorService; |
||||
import java.util.concurrent.ScheduledFuture; |
||||
|
||||
import io.prometheus.client.CollectorRegistry; |
||||
import io.prometheus.client.exporter.PushGateway; |
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
import org.springframework.scheduling.TaskScheduler; |
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; |
||||
import org.springframework.util.Assert; |
||||
import org.springframework.util.StringUtils; |
||||
|
||||
/** |
||||
* Class that can be used to managed the pushing of metrics to a {@link PushGateway |
||||
* Prometheus PushGateway}. Handles the scheduling of push operations, error handling and |
||||
* shutdown operations. |
||||
* |
||||
* @author David J. M. Karlsen |
||||
* @author Phillip Webb |
||||
* @since 2.1.0 |
||||
*/ |
||||
public class PrometheusPushGatewayManager { |
||||
|
||||
private static final Logger logger = LoggerFactory |
||||
.getLogger(PrometheusPushGatewayManager.class); |
||||
|
||||
private final PushGateway pushGateway; |
||||
|
||||
private final CollectorRegistry registry; |
||||
|
||||
private final String job; |
||||
|
||||
private final Map<String, String> groupingKey; |
||||
|
||||
private final ShutdownOperation shutdownOperation; |
||||
|
||||
private final TaskScheduler scheduler; |
||||
|
||||
private ScheduledFuture<?> scheduled; |
||||
|
||||
/** |
||||
* Create a new {@link PrometheusPushGatewayManager} instance using a single threaded |
||||
* {@link TaskScheduler}. |
||||
* @param pushGateway the source push gateway |
||||
* @param registry the collector registry to push |
||||
* @param pushRate the rate at which push operations occur |
||||
* @param job the job ID for the operation |
||||
* @param groupingKeys an optional set of grouping keys for the operation |
||||
* @param shutdownOperation the shutdown operation that should be performed when |
||||
* context is closed. |
||||
*/ |
||||
public PrometheusPushGatewayManager(PushGateway pushGateway, |
||||
CollectorRegistry registry, Duration pushRate, String job, |
||||
Map<String, String> groupingKeys, ShutdownOperation shutdownOperation) { |
||||
this(pushGateway, registry, new PushGatewayTaskScheduler(), pushRate, job, |
||||
groupingKeys, shutdownOperation); |
||||
} |
||||
|
||||
/** |
||||
* Create a new {@link PrometheusPushGatewayManager} instance. |
||||
* @param pushGateway the source push gateway |
||||
* @param registry the collector registry to push |
||||
* @param scheduler the scheduler used for operations |
||||
* @param pushRate the rate at which push operations occur |
||||
* @param job the job ID for the operation |
||||
* @param groupingKey an optional set of grouping keys for the operation |
||||
* @param shutdownOperation the shutdown operation that should be performed when |
||||
* context is closed. |
||||
*/ |
||||
public PrometheusPushGatewayManager(PushGateway pushGateway, |
||||
CollectorRegistry registry, TaskScheduler scheduler, Duration pushRate, |
||||
String job, Map<String, String> groupingKey, |
||||
ShutdownOperation shutdownOperation) { |
||||
Assert.notNull(pushGateway, "PushGateway must not be null"); |
||||
Assert.notNull(registry, "Registry must not be null"); |
||||
Assert.notNull(scheduler, "Scheduler must not be null"); |
||||
Assert.notNull(pushRate, "PushRate must not be null"); |
||||
Assert.hasLength(job, "Job must not be empty"); |
||||
this.pushGateway = pushGateway; |
||||
this.registry = registry; |
||||
this.job = job; |
||||
this.groupingKey = groupingKey; |
||||
this.shutdownOperation = (shutdownOperation != null) ? shutdownOperation |
||||
: ShutdownOperation.NONE; |
||||
this.scheduler = scheduler; |
||||
this.scheduled = this.scheduler.scheduleAtFixedRate(this::push, pushRate); |
||||
} |
||||
|
||||
private void push() { |
||||
try { |
||||
this.pushGateway.pushAdd(this.registry, this.job, this.groupingKey); |
||||
} |
||||
catch (UnknownHostException ex) { |
||||
String host = ex.getMessage(); |
||||
String message = "Unable to locate prometheus push gateway host"; |
||||
message += StringUtils.hasLength(host) ? " '" + host + "'" : ""; |
||||
message += ". No longer attempting metrics publication to this host"; |
||||
logger.error(message, ex); |
||||
shutdown(ShutdownOperation.NONE); |
||||
} |
||||
catch (Throwable ex) { |
||||
logger.error("Unable to push metrics to Prometheus Pushgateway", ex); |
||||
} |
||||
} |
||||
|
||||
private void delete() { |
||||
try { |
||||
this.pushGateway.delete(this.job, this.groupingKey); |
||||
} |
||||
catch (Throwable ex) { |
||||
logger.error("Unable to delete metrics from Prometheus Pushgateway", ex); |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* Shutdown the manager, running any {@link ShutdownOperation}. |
||||
*/ |
||||
public void shutdown() { |
||||
shutdown(this.shutdownOperation); |
||||
} |
||||
|
||||
private void shutdown(ShutdownOperation shutdownOperation) { |
||||
if (this.scheduler instanceof PushGatewayTaskScheduler) { |
||||
((PushGatewayTaskScheduler) this.scheduler).shutdown(); |
||||
} |
||||
this.scheduled.cancel(false); |
||||
switch (shutdownOperation) { |
||||
case PUSH: |
||||
push(); |
||||
break; |
||||
case DELETE: |
||||
delete(); |
||||
break; |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* The operation that should be performed on shutdown. |
||||
*/ |
||||
public enum ShutdownOperation { |
||||
|
||||
/** |
||||
* Don't perform any shutdown operation. |
||||
*/ |
||||
NONE, |
||||
|
||||
/** |
||||
* Perform a 'push' before shutdown. |
||||
*/ |
||||
PUSH, |
||||
|
||||
/** |
||||
* Perform a 'delete' before shutdown. |
||||
*/ |
||||
DELETE |
||||
|
||||
} |
||||
|
||||
/** |
||||
* {@link TaskScheduler} used when the user doesn't specify one. |
||||
*/ |
||||
static class PushGatewayTaskScheduler extends ThreadPoolTaskScheduler { |
||||
|
||||
PushGatewayTaskScheduler() { |
||||
setPoolSize(1); |
||||
setDaemon(true); |
||||
setThreadGroupName("prometheus-push-gateway"); |
||||
} |
||||
|
||||
@Override |
||||
public ScheduledExecutorService getScheduledExecutor() |
||||
throws IllegalStateException { |
||||
return Executors.newSingleThreadScheduledExecutor(this::newThread); |
||||
} |
||||
|
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,215 @@
@@ -0,0 +1,215 @@
|
||||
/* |
||||
* Copyright 2012-2018 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.boot.actuate.metrics.export.prometheus; |
||||
|
||||
import java.net.UnknownHostException; |
||||
import java.time.Duration; |
||||
import java.util.Collections; |
||||
import java.util.Map; |
||||
import java.util.concurrent.ScheduledFuture; |
||||
|
||||
import io.prometheus.client.CollectorRegistry; |
||||
import io.prometheus.client.exporter.PushGateway; |
||||
import org.junit.Before; |
||||
import org.junit.Test; |
||||
import org.mockito.ArgumentCaptor; |
||||
import org.mockito.Captor; |
||||
import org.mockito.Mock; |
||||
import org.mockito.MockitoAnnotations; |
||||
|
||||
import org.springframework.boot.actuate.metrics.export.prometheus.PrometheusPushGatewayManager.PushGatewayTaskScheduler; |
||||
import org.springframework.boot.actuate.metrics.export.prometheus.PrometheusPushGatewayManager.ShutdownOperation; |
||||
import org.springframework.scheduling.TaskScheduler; |
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; |
||||
|
||||
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; |
||||
import static org.mockito.ArgumentMatchers.eq; |
||||
import static org.mockito.ArgumentMatchers.isA; |
||||
import static org.mockito.BDDMockito.given; |
||||
import static org.mockito.BDDMockito.willThrow; |
||||
import static org.mockito.Mockito.mock; |
||||
import static org.mockito.Mockito.never; |
||||
import static org.mockito.Mockito.verify; |
||||
import static org.mockito.Mockito.verifyZeroInteractions; |
||||
|
||||
/** |
||||
* Tests for {@link PrometheusPushGatewayManager}. |
||||
* |
||||
* @author Phillip Webb |
||||
*/ |
||||
public class PrometheusPushGatewayManagerTests { |
||||
|
||||
@Mock |
||||
private PushGateway pushGateway; |
||||
|
||||
@Mock |
||||
private CollectorRegistry registry; |
||||
|
||||
private TaskScheduler scheduler; |
||||
|
||||
private Duration pushRate = Duration.ofSeconds(1); |
||||
|
||||
private Map<String, String> groupingKey = Collections.singletonMap("foo", "bar"); |
||||
|
||||
@Captor |
||||
private ArgumentCaptor<Runnable> task; |
||||
|
||||
@Mock |
||||
private ScheduledFuture<Object> future; |
||||
|
||||
@Before |
||||
public void setup() { |
||||
MockitoAnnotations.initMocks(this); |
||||
this.scheduler = mockScheduler(TaskScheduler.class); |
||||
} |
||||
|
||||
@Test |
||||
public void createWhenPushGatewayIsNullThrowsException() { |
||||
assertThatIllegalArgumentException() |
||||
.isThrownBy(() -> new PrometheusPushGatewayManager(null, this.registry, |
||||
this.scheduler, this.pushRate, "job", this.groupingKey, null)) |
||||
.withMessage("PushGateway must not be null"); |
||||
} |
||||
|
||||
@Test |
||||
public void createWhenCollectorRegistryIsNullThrowsException() { |
||||
assertThatIllegalArgumentException() |
||||
.isThrownBy(() -> new PrometheusPushGatewayManager(this.pushGateway, null, |
||||
this.scheduler, this.pushRate, "job", this.groupingKey, null)) |
||||
.withMessage("Registry must not be null"); |
||||
} |
||||
|
||||
@Test |
||||
public void createWhenSchedulerIsNullThrowsException() { |
||||
assertThatIllegalArgumentException().isThrownBy( |
||||
() -> new PrometheusPushGatewayManager(this.pushGateway, this.registry, |
||||
null, this.pushRate, "job", this.groupingKey, null)) |
||||
.withMessage("Scheduler must not be null"); |
||||
} |
||||
|
||||
@Test |
||||
public void createWhenPushRateIsNullThrowsException() { |
||||
assertThatIllegalArgumentException().isThrownBy( |
||||
() -> new PrometheusPushGatewayManager(this.pushGateway, this.registry, |
||||
this.scheduler, null, "job", this.groupingKey, null)) |
||||
.withMessage("PushRate must not be null"); |
||||
} |
||||
|
||||
@Test |
||||
public void createWhenJobIsEmptyThrowsException() { |
||||
assertThatIllegalArgumentException().isThrownBy( |
||||
() -> new PrometheusPushGatewayManager(this.pushGateway, this.registry, |
||||
this.scheduler, this.pushRate, "", this.groupingKey, null)) |
||||
.withMessage("Job must not be empty"); |
||||
} |
||||
|
||||
@Test |
||||
public void createShouldSchedulePushAsFixedRate() throws Exception { |
||||
new PrometheusPushGatewayManager(this.pushGateway, this.registry, this.scheduler, |
||||
this.pushRate, "job", this.groupingKey, null); |
||||
verify(this.scheduler).scheduleAtFixedRate(this.task.capture(), |
||||
eq(this.pushRate)); |
||||
this.task.getValue().run(); |
||||
verify(this.pushGateway).pushAdd(this.registry, "job", this.groupingKey); |
||||
} |
||||
|
||||
@Test |
||||
public void shutdownWhenOwnsSchedulerDoesShutdownScheduler() { |
||||
PushGatewayTaskScheduler ownedScheduler = mockScheduler( |
||||
PushGatewayTaskScheduler.class); |
||||
PrometheusPushGatewayManager manager = new PrometheusPushGatewayManager( |
||||
this.pushGateway, this.registry, ownedScheduler, this.pushRate, "job", |
||||
this.groupingKey, null); |
||||
manager.shutdown(); |
||||
verify(ownedScheduler).shutdown(); |
||||
} |
||||
|
||||
@Test |
||||
public void shutdownWhenDoesNotOwnSchedulerDoesNotShutdownScheduler() { |
||||
ThreadPoolTaskScheduler otherScheduler = mockScheduler( |
||||
ThreadPoolTaskScheduler.class); |
||||
PrometheusPushGatewayManager manager = new PrometheusPushGatewayManager( |
||||
this.pushGateway, this.registry, otherScheduler, this.pushRate, "job", |
||||
this.groupingKey, null); |
||||
manager.shutdown(); |
||||
verify(otherScheduler, never()).shutdown(); |
||||
} |
||||
|
||||
@Test |
||||
public void shutdownWhenShutdownOperationIsPushPerformsPushOnShutdown() |
||||
throws Exception { |
||||
PrometheusPushGatewayManager manager = new PrometheusPushGatewayManager( |
||||
this.pushGateway, this.registry, this.scheduler, this.pushRate, "job", |
||||
this.groupingKey, ShutdownOperation.PUSH); |
||||
manager.shutdown(); |
||||
verify(this.future).cancel(false); |
||||
verify(this.pushGateway).pushAdd(this.registry, "job", this.groupingKey); |
||||
} |
||||
|
||||
@Test |
||||
public void shutdownWhenShutdownOperationIsDeletePerformsDeleteOnShutdown() |
||||
throws Exception { |
||||
PrometheusPushGatewayManager manager = new PrometheusPushGatewayManager( |
||||
this.pushGateway, this.registry, this.scheduler, this.pushRate, "job", |
||||
this.groupingKey, ShutdownOperation.DELETE); |
||||
manager.shutdown(); |
||||
verify(this.future).cancel(false); |
||||
verify(this.pushGateway).delete("job", this.groupingKey); |
||||
} |
||||
|
||||
@Test |
||||
public void shutdownWhenShutdownOperationIsNoneDoesNothing() { |
||||
PrometheusPushGatewayManager manager = new PrometheusPushGatewayManager( |
||||
this.pushGateway, this.registry, this.scheduler, this.pushRate, "job", |
||||
this.groupingKey, ShutdownOperation.NONE); |
||||
manager.shutdown(); |
||||
verify(this.future).cancel(false); |
||||
verifyZeroInteractions(this.pushGateway); |
||||
} |
||||
|
||||
@Test |
||||
public void pushWhenUnknownHostExceptionIsThrownDoesShutdown() throws Exception { |
||||
new PrometheusPushGatewayManager(this.pushGateway, this.registry, this.scheduler, |
||||
this.pushRate, "job", this.groupingKey, null); |
||||
verify(this.scheduler).scheduleAtFixedRate(this.task.capture(), |
||||
eq(this.pushRate)); |
||||
willThrow(new UnknownHostException("foo")).given(this.pushGateway) |
||||
.pushAdd(this.registry, "job", this.groupingKey); |
||||
this.task.getValue().run(); |
||||
verify(this.future).cancel(false); |
||||
} |
||||
|
||||
@Test |
||||
public void pushDoesNotThrowException() throws Exception { |
||||
new PrometheusPushGatewayManager(this.pushGateway, this.registry, this.scheduler, |
||||
this.pushRate, "job", this.groupingKey, null); |
||||
verify(this.scheduler).scheduleAtFixedRate(this.task.capture(), |
||||
eq(this.pushRate)); |
||||
willThrow(RuntimeException.class).given(this.pushGateway).pushAdd(this.registry, |
||||
"job", this.groupingKey); |
||||
this.task.getValue().run(); |
||||
} |
||||
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" }) |
||||
private <T extends TaskScheduler> T mockScheduler(Class<T> type) { |
||||
T scheduler = mock(type); |
||||
given(scheduler.scheduleAtFixedRate(isA(Runnable.class), isA(Duration.class))) |
||||
.willReturn((ScheduledFuture) this.future); |
||||
return scheduler; |
||||
} |
||||
|
||||
} |
||||
Loading…
Reference in new issue