Browse Source
Different physical sources for the same logical metric just need to publish them with a period-separated prefix, and this reader will aggregate (by truncating the metric names, dropping the prefix). Very useful (for instance) if multiple application instances are feeding to a central (e.g. redis) repository and you want to display the results. Useful in conjunction with a MetricReaderPublicMetrics for hooking up to the /metrics endpoint.pull/2938/merge
3 changed files with 275 additions and 10 deletions
@ -0,0 +1,142 @@
@@ -0,0 +1,142 @@
|
||||
/* |
||||
* Copyright 2014-2015 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.springframework.boot.actuate.metrics.aggregate; |
||||
|
||||
import java.util.HashSet; |
||||
import java.util.Set; |
||||
|
||||
import org.springframework.boot.actuate.metrics.Metric; |
||||
import org.springframework.boot.actuate.metrics.reader.MetricReader; |
||||
import org.springframework.boot.actuate.metrics.repository.InMemoryMetricRepository; |
||||
import org.springframework.util.StringUtils; |
||||
|
||||
/** |
||||
* A metric reader that aggregates values from a source reader, normally one that has been |
||||
* collecting data from many sources in the same form (like a scaled-out application). The |
||||
* source has metrics with names in the form <code>*.*.counter.**</code> and |
||||
* <code>*.*.[anything].**</code> (the length of the prefix is controlled by the |
||||
* {@link #setTruncateKeyLength(int) truncateKeyLength} property, and defaults to 2, |
||||
* meaning 2 period separated fields), and the result has metric names in the form |
||||
* <code>aggregate.count.**</code> and <code>aggregate.[anything].**</code>. Counters are |
||||
* summed and anything else (i.e. gauges) are aggregated by choosing the most recent |
||||
* value. |
||||
* |
||||
* @author Dave Syer |
||||
* |
||||
*/ |
||||
public class AggregateMetricReader implements MetricReader { |
||||
|
||||
private MetricReader source; |
||||
|
||||
private int truncate = 2; |
||||
|
||||
private String prefix = "aggregate."; |
||||
|
||||
public AggregateMetricReader(MetricReader source) { |
||||
this.source = source; |
||||
} |
||||
|
||||
/** |
||||
* The number of period-separated keys to remove from the start of the input metric |
||||
* names before aggregating. |
||||
* |
||||
* @param truncate length of source metric prefixes |
||||
*/ |
||||
public void setTruncateKeyLength(int truncate) { |
||||
this.truncate = truncate; |
||||
} |
||||
|
||||
/** |
||||
* Prefix to apply to all output metrics. A period will be appended if no present in |
||||
* the provided value. |
||||
* |
||||
* @param prefix the prefix to use default "aggregator.") |
||||
*/ |
||||
public void setPrefix(String prefix) { |
||||
this.prefix = prefix.endsWith(".") ? prefix : prefix + "."; |
||||
} |
||||
|
||||
@Override |
||||
public Metric<?> findOne(String metricName) { |
||||
if (!metricName.startsWith(this.prefix)) { |
||||
return null; |
||||
} |
||||
InMemoryMetricRepository result = new InMemoryMetricRepository(); |
||||
String baseName = metricName.substring(this.prefix.length()); |
||||
for (Metric<?> metric : this.source.findAll()) { |
||||
String name = getSourceKey(metric.getName()); |
||||
if (baseName.equals(name)) { |
||||
update(result, name, metric); |
||||
} |
||||
} |
||||
return result.findOne(metricName); |
||||
} |
||||
|
||||
@Override |
||||
public Iterable<Metric<?>> findAll() { |
||||
InMemoryMetricRepository result = new InMemoryMetricRepository(); |
||||
for (Metric<?> metric : this.source.findAll()) { |
||||
String key = getSourceKey(metric.getName()); |
||||
if (key != null) { |
||||
update(result, key, metric); |
||||
} |
||||
} |
||||
return result.findAll(); |
||||
} |
||||
|
||||
@Override |
||||
public long count() { |
||||
Set<String> names = new HashSet<String>(); |
||||
for (Metric<?> metric : this.source.findAll()) { |
||||
String name = getSourceKey(metric.getName()); |
||||
if (name != null) { |
||||
names.add(name); |
||||
} |
||||
} |
||||
return names.size(); |
||||
} |
||||
|
||||
private void update(InMemoryMetricRepository result, String key, Metric<?> metric) { |
||||
String name = this.prefix + key; |
||||
Metric<?> aggregate = result.findOne(name); |
||||
if (aggregate == null) { |
||||
aggregate = new Metric<Number>(name, metric.getValue(), metric.getTimestamp()); |
||||
} |
||||
else if (key.startsWith("counter")) { |
||||
// accumulate all values
|
||||
aggregate = new Metric<Number>(name, metric.increment( |
||||
aggregate.getValue().intValue()).getValue(), metric.getTimestamp()); |
||||
} |
||||
else if (aggregate.getTimestamp().before(metric.getTimestamp())) { |
||||
// sort by timestamp and only take the latest
|
||||
aggregate = new Metric<Number>(name, metric.getValue(), metric.getTimestamp()); |
||||
} |
||||
result.set(aggregate); |
||||
} |
||||
|
||||
private String getSourceKey(String name) { |
||||
String[] keys = StringUtils.delimitedListToStringArray(name, "."); |
||||
if (keys.length <= this.truncate) { |
||||
return null; |
||||
} |
||||
StringBuilder builder = new StringBuilder(keys[this.truncate]); |
||||
for (int i = this.truncate + 1; i < keys.length; i++) { |
||||
builder.append(".").append(keys[i]); |
||||
} |
||||
return builder.toString(); |
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,87 @@
@@ -0,0 +1,87 @@
|
||||
/* |
||||
* Copyright 2012-2015 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.boot.actuate.metrics.aggregate; |
||||
|
||||
import java.util.Date; |
||||
|
||||
import org.junit.Test; |
||||
import org.springframework.boot.actuate.metrics.Metric; |
||||
import org.springframework.boot.actuate.metrics.repository.InMemoryMetricRepository; |
||||
import org.springframework.boot.actuate.metrics.writer.Delta; |
||||
|
||||
import static org.junit.Assert.assertEquals; |
||||
import static org.junit.Assert.assertNull; |
||||
|
||||
/** |
||||
* @author Dave Syer |
||||
*/ |
||||
public class AggregateMetricReaderTests { |
||||
|
||||
private InMemoryMetricRepository source = new InMemoryMetricRepository(); |
||||
|
||||
private AggregateMetricReader reader = new AggregateMetricReader(this.source); |
||||
|
||||
@Test |
||||
public void writeAndReadDefaults() { |
||||
this.source.set(new Metric<Double>("foo.bar.spam", 2.3)); |
||||
assertEquals(2.3, this.reader.findOne("aggregate.spam").getValue()); |
||||
} |
||||
|
||||
@Test |
||||
public void writeAndReadLatestValue() { |
||||
this.source.set(new Metric<Double>("foo.bar.spam", 2.3, new Date(100L))); |
||||
this.source.set(new Metric<Double>("oof.rab.spam", 2.4, new Date(0L))); |
||||
assertEquals(2.3, this.reader.findOne("aggregate.spam").getValue()); |
||||
} |
||||
|
||||
@Test |
||||
public void writeAndReadExtraLong() { |
||||
this.source.set(new Metric<Double>("blee.foo.bar.spam", 2.3)); |
||||
this.reader.setTruncateKeyLength(3); |
||||
assertEquals(2.3, this.reader.findOne("aggregate.spam").getValue()); |
||||
} |
||||
|
||||
@Test |
||||
public void onlyPrefixed() { |
||||
this.source.set(new Metric<Double>("foo.bar.spam", 2.3)); |
||||
assertNull(this.reader.findOne("spam")); |
||||
} |
||||
|
||||
@Test |
||||
public void incrementCounter() { |
||||
this.source.increment(new Delta<Long>("foo.bar.counter.spam", 2L)); |
||||
this.source.increment(new Delta<Long>("oof.rab.counter.spam", 3L)); |
||||
assertEquals(5L, this.reader.findOne("aggregate.counter.spam").getValue()); |
||||
} |
||||
|
||||
@Test |
||||
public void countGauges() { |
||||
this.source.set(new Metric<Double>("foo.bar.spam", 2.3)); |
||||
this.source.set(new Metric<Double>("oof.rab.spam", 2.4)); |
||||
assertEquals(1, this.reader.count()); |
||||
} |
||||
|
||||
@Test |
||||
public void countGaugesAndCounters() { |
||||
this.source.set(new Metric<Double>("foo.bar.spam", 2.3)); |
||||
this.source.set(new Metric<Double>("oof.rab.spam", 2.4)); |
||||
this.source.increment(new Delta<Long>("foo.bar.counter.spam", 2L)); |
||||
this.source.increment(new Delta<Long>("oof.rab.counter.spam", 3L)); |
||||
assertEquals(2, this.reader.count()); |
||||
} |
||||
|
||||
} |
||||
Loading…
Reference in new issue