diff --git a/CHANGELOG.md b/CHANGELOG.md index dcd3b43d3..06a712029 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -91,6 +91,7 @@ continue sending logs to /var/log/telegraf/telegraf.log. - [#1771](https://github.com/influxdata/telegraf/issues/1771): Delete nil fields in the metric maker. - [#870](https://github.com/influxdata/telegraf/issues/870): Fix MySQL special characters in DSN parsing. - [#1742](https://github.com/influxdata/telegraf/issues/1742): Ping input odd timeout behavior. +- [#1775](https://github.com/influxdata/telegraf/issues/1775): Cache metrics for delivery to Prometheus. ## v1.0.1 [2016-09-26] diff --git a/plugins/outputs/prometheus_client/prometheus_client.go b/plugins/outputs/prometheus_client/prometheus_client.go index fc8926602..e86a0a526 100644 --- a/plugins/outputs/prometheus_client/prometheus_client.go +++ b/plugins/outputs/prometheus_client/prometheus_client.go @@ -6,19 +6,26 @@ import ( "net/http" "regexp" "sync" + "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/outputs" "github.com/prometheus/client_golang/prometheus" ) var invalidNameCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`) -type PrometheusClient struct { - Listen string +type MetricWithExpiration struct { + Metric prometheus.Metric + Expiration time.Time +} - metrics map[string]prometheus.Metric - lastMetrics map[string]prometheus.Metric +type PrometheusClient struct { + Listen string + ExpirationInterval internal.Duration `toml:"expiration_interval"` + + metrics map[string]*MetricWithExpiration sync.Mutex } @@ -26,11 +33,13 @@ type PrometheusClient struct { var sampleConfig = ` ## Address to listen on # listen = ":9126" + + ## Interval to expire metrics and not deliver to prometheus, 0 == no expiration + # expiration_interval = "60s" ` func (p *PrometheusClient) Start() error { - p.metrics = make(map[string]prometheus.Metric) - p.lastMetrics = make(map[string]prometheus.Metric) + p.metrics = 
make(map[string]*MetricWithExpiration) prometheus.Register(p) defer func() { if r := recover(); r != nil { @@ -86,16 +95,11 @@ func (p *PrometheusClient) Collect(ch chan<- prometheus.Metric) { p.Lock() defer p.Unlock() - if len(p.metrics) > 0 { - p.lastMetrics = make(map[string]prometheus.Metric) - for k, m := range p.metrics { - ch <- m - p.lastMetrics[k] = m - } - p.metrics = make(map[string]prometheus.Metric) - } else { - for _, m := range p.lastMetrics { - ch <- m + for key, m := range p.metrics { + if p.ExpirationInterval.Duration != 0 && time.Now().After(m.Expiration) { + delete(p.metrics, key) + } else { + ch <- m.Metric } } } @@ -171,7 +175,11 @@ func (p *PrometheusClient) Write(metrics []telegraf.Metric) error { "key: %s, labels: %v,\nerr: %s\n", mname, l, err.Error()) } - p.metrics[desc.String()] = metric + + p.metrics[desc.String()] = &MetricWithExpiration{ + Metric: metric, + Expiration: time.Now().Add(p.ExpirationInterval.Duration), + } } } return nil @@ -179,6 +187,8 @@ func (p *PrometheusClient) Write(metrics []telegraf.Metric) error { func init() { outputs.Add("prometheus_client", func() telegraf.Output { - return &PrometheusClient{} + return &PrometheusClient{ + ExpirationInterval: internal.Duration{Duration: time.Second * 60}, + } }) } diff --git a/plugins/outputs/prometheus_client/prometheus_client_test.go b/plugins/outputs/prometheus_client/prometheus_client_test.go index 14aee13d9..0b6a89ad0 100644 --- a/plugins/outputs/prometheus_client/prometheus_client_test.go +++ b/plugins/outputs/prometheus_client/prometheus_client_test.go @@ -4,9 +4,11 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs/prometheus" "github.com/influxdata/telegraf/testutil" ) @@ -17,16 +19,12 @@ func TestPrometheusWritePointEmptyTag(t *testing.T) { if testing.Short() { t.Skip("Skipping integration 
test in short mode") } - now := time.Now() - pTesting = &PrometheusClient{Listen: "localhost:9127"} - err := pTesting.Start() - time.Sleep(time.Millisecond * 200) - require.NoError(t, err) - defer pTesting.Stop() - p := &prometheus.Prometheus{ - Urls: []string{"http://localhost:9127/metrics"}, - } + pClient, p, err := setupPrometheus() + require.NoError(t, err) + defer pClient.Stop() + + now := time.Now() tags := make(map[string]string) pt1, _ := telegraf.NewMetric( "test_point_1", @@ -42,7 +40,7 @@ func TestPrometheusWritePointEmptyTag(t *testing.T) { pt1, pt2, } - require.NoError(t, pTesting.Write(metrics)) + require.NoError(t, pClient.Write(metrics)) expected := []struct { name string @@ -77,7 +75,7 @@ func TestPrometheusWritePointEmptyTag(t *testing.T) { pt3, pt4, } - require.NoError(t, pTesting.Write(metrics)) + require.NoError(t, pClient.Write(metrics)) expected2 := []struct { name string @@ -93,3 +91,77 @@ func TestPrometheusWritePointEmptyTag(t *testing.T) { map[string]interface{}{"value": e.value}) } } + +func TestPrometheusExpireOldMetrics(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + pClient, p, err := setupPrometheus() + pClient.ExpirationInterval = internal.Duration{Duration: time.Second * 10} + require.NoError(t, err) + defer pClient.Stop() + + now := time.Now() + tags := make(map[string]string) + pt1, _ := telegraf.NewMetric( + "test_point_1", + tags, + map[string]interface{}{"value": 0.0}, + now) + var metrics = []telegraf.Metric{pt1} + require.NoError(t, pClient.Write(metrics)) + + for _, m := range pClient.metrics { + m.Expiration = now.Add(time.Duration(-15) * time.Second) + } + + pt2, _ := telegraf.NewMetric( + "test_point_2", + tags, + map[string]interface{}{"value": 1.0}, + now) + var metrics2 = []telegraf.Metric{pt2} + require.NoError(t, pClient.Write(metrics2)) + + expected := []struct { + name string + value float64 + tags map[string]string + }{ + {"test_point_2", 1.0, tags}, + } + + var 
acc testutil.Accumulator + + require.NoError(t, p.Gather(&acc)) + for _, e := range expected { + acc.AssertContainsFields(t, e.name, + map[string]interface{}{"value": e.value}) + } + + acc.AssertDoesNotContainMeasurement(t, "test_point_1") + + // Confirm that it's not in the PrometheusClient map anymore + assert.Equal(t, 1, len(pClient.metrics)) +} + +func setupPrometheus() (*PrometheusClient, *prometheus.Prometheus, error) { + if pTesting == nil { + pTesting = &PrometheusClient{Listen: "localhost:9127"} + err := pTesting.Start() + if err != nil { + return nil, nil, err + } + } else { + pTesting.metrics = make(map[string]*MetricWithExpiration) + } + + time.Sleep(time.Millisecond * 200) + + p := &prometheus.Prometheus{ + Urls: []string{"http://localhost:9127/metrics"}, + } + + return pTesting, p, nil +} diff --git a/testutil/accumulator.go b/testutil/accumulator.go index 99f9e3006..11cea2434 100644 --- a/testutil/accumulator.go +++ b/testutil/accumulator.go @@ -202,6 +202,17 @@ func (a *Accumulator) AssertContainsFields( assert.Fail(t, msg) } +func (a *Accumulator) AssertDoesNotContainMeasurement(t *testing.T, measurement string) { + a.Lock() + defer a.Unlock() + for _, p := range a.Metrics { + if p.Measurement == measurement { + msg := fmt.Sprintf("found unexpected measurement %s", measurement) + assert.Fail(t, msg) + } + } +} + // HasIntValue returns true if the measurement has an Int value func (a *Accumulator) HasIntField(measurement string, field string) bool { a.Lock()