Cache and expire metrics for prometheus output (#2016)

* Cache and expire metrics for prometheus output

* Fix test

* Use interval.Duration

* Default prometheus expiration interval to 60s

* Update changelog
This commit is contained in:
Mike Ragalie 2016-11-15 06:33:39 -05:00 committed by Cameron Sparr
parent f816b952cf
commit ff67a4b96c
4 changed files with 123 additions and 29 deletions

View File

@ -91,6 +91,7 @@ continue sending logs to /var/log/telegraf/telegraf.log.
- [#1771](https://github.com/influxdata/telegraf/issues/1771): Delete nil fields in the metric maker.
- [#870](https://github.com/influxdata/telegraf/issues/870): Fix MySQL special characters in DSN parsing.
- [#1742](https://github.com/influxdata/telegraf/issues/1742): Ping input odd timeout behavior.
- [#1775](https://github.com/influxdata/telegraf/issues/1775): Cache metrics for delivery to prometheus
## v1.0.1 [2016-09-26]

View File

@ -6,19 +6,26 @@ import (
"net/http"
"regexp"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/prometheus/client_golang/prometheus"
)
var invalidNameCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)
type MetricWithExpiration struct {
Metric prometheus.Metric
Expiration time.Time
}
type PrometheusClient struct {
Listen string
ExpirationInterval internal.Duration `toml:"expiration_interval"`
metrics map[string]prometheus.Metric
lastMetrics map[string]prometheus.Metric
metrics map[string]*MetricWithExpiration
sync.Mutex
}
@ -26,11 +33,13 @@ type PrometheusClient struct {
var sampleConfig = `
## Address to listen on
# listen = ":9126"
## Interval to expire metrics and not deliver to prometheus, 0 == no expiration
# expiration_interval = "60s"
`
func (p *PrometheusClient) Start() error {
p.metrics = make(map[string]prometheus.Metric)
p.lastMetrics = make(map[string]prometheus.Metric)
p.metrics = make(map[string]*MetricWithExpiration)
prometheus.Register(p)
defer func() {
if r := recover(); r != nil {
@ -86,16 +95,11 @@ func (p *PrometheusClient) Collect(ch chan<- prometheus.Metric) {
p.Lock()
defer p.Unlock()
if len(p.metrics) > 0 {
p.lastMetrics = make(map[string]prometheus.Metric)
for k, m := range p.metrics {
ch <- m
p.lastMetrics[k] = m
}
p.metrics = make(map[string]prometheus.Metric)
for key, m := range p.metrics {
if p.ExpirationInterval.Duration != 0 && time.Now().After(m.Expiration) {
delete(p.metrics, key)
} else {
for _, m := range p.lastMetrics {
ch <- m
ch <- m.Metric
}
}
}
@ -171,7 +175,11 @@ func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
"key: %s, labels: %v,\nerr: %s\n",
mname, l, err.Error())
}
p.metrics[desc.String()] = metric
p.metrics[desc.String()] = &MetricWithExpiration{
Metric: metric,
Expiration: time.Now().Add(p.ExpirationInterval.Duration),
}
}
}
return nil
@ -179,6 +187,8 @@ func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
func init() {
outputs.Add("prometheus_client", func() telegraf.Output {
return &PrometheusClient{}
return &PrometheusClient{
ExpirationInterval: internal.Duration{Duration: time.Second * 60},
}
})
}

View File

@ -4,9 +4,11 @@ import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs/prometheus"
"github.com/influxdata/telegraf/testutil"
)
@ -17,16 +19,12 @@ func TestPrometheusWritePointEmptyTag(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
now := time.Now()
pTesting = &PrometheusClient{Listen: "localhost:9127"}
err := pTesting.Start()
time.Sleep(time.Millisecond * 200)
require.NoError(t, err)
defer pTesting.Stop()
p := &prometheus.Prometheus{
Urls: []string{"http://localhost:9127/metrics"},
}
pClient, p, err := setupPrometheus()
require.NoError(t, err)
defer pClient.Stop()
now := time.Now()
tags := make(map[string]string)
pt1, _ := telegraf.NewMetric(
"test_point_1",
@ -42,7 +40,7 @@ func TestPrometheusWritePointEmptyTag(t *testing.T) {
pt1,
pt2,
}
require.NoError(t, pTesting.Write(metrics))
require.NoError(t, pClient.Write(metrics))
expected := []struct {
name string
@ -77,7 +75,7 @@ func TestPrometheusWritePointEmptyTag(t *testing.T) {
pt3,
pt4,
}
require.NoError(t, pTesting.Write(metrics))
require.NoError(t, pClient.Write(metrics))
expected2 := []struct {
name string
@ -93,3 +91,77 @@ func TestPrometheusWritePointEmptyTag(t *testing.T) {
map[string]interface{}{"value": e.value})
}
}
func TestPrometheusExpireOldMetrics(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
pClient, p, err := setupPrometheus()
pClient.ExpirationInterval = internal.Duration{Duration: time.Second * 10}
require.NoError(t, err)
defer pClient.Stop()
now := time.Now()
tags := make(map[string]string)
pt1, _ := telegraf.NewMetric(
"test_point_1",
tags,
map[string]interface{}{"value": 0.0},
now)
var metrics = []telegraf.Metric{pt1}
require.NoError(t, pClient.Write(metrics))
for _, m := range pClient.metrics {
m.Expiration = now.Add(time.Duration(-15) * time.Second)
}
pt2, _ := telegraf.NewMetric(
"test_point_2",
tags,
map[string]interface{}{"value": 1.0},
now)
var metrics2 = []telegraf.Metric{pt2}
require.NoError(t, pClient.Write(metrics2))
expected := []struct {
name string
value float64
tags map[string]string
}{
{"test_point_2", 1.0, tags},
}
var acc testutil.Accumulator
require.NoError(t, p.Gather(&acc))
for _, e := range expected {
acc.AssertContainsFields(t, e.name,
map[string]interface{}{"value": e.value})
}
acc.AssertDoesNotContainMeasurement(t, "test_point_1")
// Confirm that it's not in the PrometheusClient map anymore
assert.Equal(t, 1, len(pClient.metrics))
}
func setupPrometheus() (*PrometheusClient, *prometheus.Prometheus, error) {
if pTesting == nil {
pTesting = &PrometheusClient{Listen: "localhost:9127"}
err := pTesting.Start()
if err != nil {
return nil, nil, err
}
} else {
pTesting.metrics = make(map[string]*MetricWithExpiration)
}
time.Sleep(time.Millisecond * 200)
p := &prometheus.Prometheus{
Urls: []string{"http://localhost:9127/metrics"},
}
return pTesting, p, nil
}

View File

@ -202,6 +202,17 @@ func (a *Accumulator) AssertContainsFields(
assert.Fail(t, msg)
}
func (a *Accumulator) AssertDoesNotContainMeasurement(t *testing.T, measurement string) {
a.Lock()
defer a.Unlock()
for _, p := range a.Metrics {
if p.Measurement == measurement {
msg := fmt.Sprintf("found unexpected measurement %s", measurement)
assert.Fail(t, msg)
}
}
}
// HasIntValue returns true if the measurement has an Int value
func (a *Accumulator) HasIntField(measurement string, field string) bool {
a.Lock()