@ -18,6 +18,7 @@ package org.springframework.boot.actuate.metrics.opentsdb;
@@ -18,6 +18,7 @@ package org.springframework.boot.actuate.metrics.opentsdb;
import java.util.ArrayList ;
import java.util.Arrays ;
import java.util.Collections ;
import java.util.List ;
import java.util.Map ;
@ -43,6 +44,7 @@ import org.springframework.web.client.RestTemplate;
@@ -43,6 +44,7 @@ import org.springframework.web.client.RestTemplate;
* task to flush periodically .
*
* @author Dave Syer
* @author Thomas Badie
* @since 1 . 3 . 0
* /
public class OpenTsdbMetricWriter implements MetricWriter {
@ -67,7 +69,7 @@ public class OpenTsdbMetricWriter implements MetricWriter {
@@ -67,7 +69,7 @@ public class OpenTsdbMetricWriter implements MetricWriter {
* /
private MediaType mediaType = MediaType . APPLICATION_JSON ;
private List < OpenTsdbData > buffer = new ArrayList < OpenTsdbData > ( this . bufferSize ) ;
private final List < OpenTsdbData > buffer = new ArrayList < OpenTsdbData > ( this . bufferSize ) ;
private OpenTsdbNamingStrategy namingStrategy = new DefaultOpenTsdbNamingStrategy ( ) ;
@ -105,35 +107,42 @@ public class OpenTsdbMetricWriter implements MetricWriter {
@@ -105,35 +107,42 @@ public class OpenTsdbMetricWriter implements MetricWriter {
OpenTsdbData data = new OpenTsdbData (
this . namingStrategy . getName ( value . getName ( ) ) , value . getValue ( ) , value
. getTimestamp ( ) . getTime ( ) ) ;
this . buffer . add ( data ) ;
if ( this . buffer . size ( ) > = this . bufferSize ) {
flush ( ) ;
synchronized ( this . buffer ) {
this . buffer . add ( data ) ;
if ( this . buffer . size ( ) > = this . bufferSize ) {
flush ( ) ;
}
}
}
/ * *
* Flush the buffer without waiting for it to fill any further .
* /
@SuppressWarnings ( "rawtypes" )
public void flush ( ) {
if ( this . buffer . isEmpty ( ) ) {
List < OpenTsdbData > snapshot = getBufferSnapshot ( ) ;
if ( snapshot . isEmpty ( ) ) {
return ;
}
List < OpenTsdbData > temp = new ArrayList < OpenTsdbData > ( ) ;
synchronized ( this . buffer ) {
temp . addAll ( this . buffer ) ;
this . buffer . clear ( ) ;
}
HttpHeaders headers = new HttpHeaders ( ) ;
headers . setAccept ( Arrays . asList ( this . mediaType ) ) ;
headers . setContentType ( this . mediaType ) ;
HttpEntity < List < OpenTsdbData > > request = new HttpEntity < List < OpenTsdbData > > ( temp ,
headers ) ;
@SuppressWarnings ( "rawtypes" )
ResponseEntity < Map > response = this . restTemplate . postForEntity ( this . url , request ,
Map . class ) ;
ResponseEntity < Map > response = this . restTemplate . postForEntity ( this . url ,
new HttpEntity < List < OpenTsdbData > > ( snapshot , headers ) , Map . class ) ;
if ( ! response . getStatusCode ( ) . is2xxSuccessful ( ) ) {
logger . warn ( "Cannot write metrics (discarded " + temp . size ( ) + " values): "
+ response . getBody ( ) ) ;
logger . warn ( "Cannot write metrics (discarded " + snapshot . size ( )
+ " values): " + response . getBody ( ) ) ;
}
}
private List < OpenTsdbData > getBufferSnapshot ( ) {
synchronized ( this . buffer ) {
if ( this . buffer . isEmpty ( ) ) {
return Collections . emptyList ( ) ;
}
List < OpenTsdbData > snapshot = new ArrayList < OpenTsdbData > ( this . buffer ) ;
this . buffer . clear ( ) ;
return snapshot ;
}
}