diff --git a/plugins/outputs/prometheus_client/v2/collector.go b/plugins/outputs/prometheus_client/v2/collector.go index 9ffc6516a..4f8efd839 100644 --- a/plugins/outputs/prometheus_client/v2/collector.go +++ b/plugins/outputs/prometheus_client/v2/collector.go @@ -83,7 +83,7 @@ func (c *Collector) Add(metrics []telegraf.Metric) error { defer c.Unlock() for _, metric := range metrics { - c.coll.Add(metric) + c.coll.Add(metric, time.Now()) } // Expire metrics, doing this on Add ensure metrics are removed even if no diff --git a/plugins/serializers/prometheus/collection.go b/plugins/serializers/prometheus/collection.go index 5c385caad..10e85de07 100644 --- a/plugins/serializers/prometheus/collection.go +++ b/plugins/serializers/prometheus/collection.go @@ -14,6 +14,8 @@ import ( const helpString = "Telegraf collected metric" +type TimeFunc func() time.Time + type MetricFamily struct { Name string Type telegraf.ValueType @@ -22,6 +24,7 @@ type MetricFamily struct { type Metric struct { Labels []LabelPair Time time.Time + AddTime time.Time Scaler *Scaler Histogram *Histogram Summary *Summary @@ -97,14 +100,14 @@ type Entry struct { } type Collection struct { - config FormatConfig Entries map[MetricFamily]Entry + config FormatConfig } func NewCollection(config FormatConfig) *Collection { cache := &Collection{ - config: config, Entries: make(map[MetricFamily]Entry), + config: config, } return cache } @@ -177,7 +180,7 @@ func (c *Collection) createLabels(metric telegraf.Metric) []LabelPair { return labels } -func (c *Collection) Add(metric telegraf.Metric) { +func (c *Collection) Add(metric telegraf.Metric, now time.Time) { labels := c.createLabels(metric) for _, field := range metric.FieldList() { metricName := MetricName(metric.Name(), field.Key, metric.Type()) @@ -225,9 +228,10 @@ func (c *Collection) Add(metric telegraf.Metric) { } m = &Metric{ - Labels: labels, - Time: metric.Time(), - Scaler: &Scaler{Value: value}, + Labels: labels, + Time: metric.Time(), + AddTime: now, + Scaler: &Scaler{Value: value}, } entry.Metrics[metricKey] = m @@ -236,6 +240,7 @@ func (c *Collection) Add(metric telegraf.Metric) { m = &Metric{ Labels: labels, Time: metric.Time(), + AddTime: now, Histogram: &Histogram{}, } } @@ -283,6 +288,7 @@ func (c *Collection) Add(metric telegraf.Metric) { m = &Metric{ Labels: labels, Time: metric.Time(), + AddTime: now, Summary: &Summary{}, } } @@ -331,7 +337,7 @@ func (c *Collection) Expire(now time.Time, age time.Duration) { expireTime := now.Add(-age) for _, entry := range c.Entries { for key, metric := range entry.Metrics { - if metric.Time.Before(expireTime) { + if metric.AddTime.Before(expireTime) { delete(entry.Metrics, key) if len(entry.Metrics) == 0 { delete(c.Entries, entry.Family) diff --git a/plugins/serializers/prometheus/collection_test.go b/plugins/serializers/prometheus/collection_test.go index 70f26dac7..d2c5f5d09 100644 --- a/plugins/serializers/prometheus/collection_test.go +++ b/plugins/serializers/prometheus/collection_test.go @@ -12,27 +12,35 @@ import ( "github.com/stretchr/testify/require" ) +type Input struct { + metric telegraf.Metric + addtime time.Time +} + func TestCollectionExpire(t *testing.T) { tests := []struct { name string now time.Time age time.Duration - metrics []telegraf.Metric + input []Input expected []*dto.MetricFamily }{ { name: "not expired", now: time.Unix(1, 0), age: 10 * time.Second, - metrics: []telegraf.Metric{ - testutil.MustMetric( - "cpu", - map[string]string{}, - map[string]interface{}{ - "time_idle": 42.0, - }, - time.Unix(0, 0), - ), + input: []Input{ + { + metric: testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 42.0, + }, + time.Unix(0, 0), + ), + addtime: time.Unix(0, 0), + }, }, expected: []*dto.MetricFamily{ { @@ -52,23 +60,29 @@ func TestCollectionExpire(t *testing.T) { name: "update metric expiration", now: time.Unix(20, 0), age: 10 * time.Second, - metrics: []telegraf.Metric{ - testutil.MustMetric( - "cpu", - map[string]string{}, - map[string]interface{}{ - "time_idle": 42.0, - }, - time.Unix(0, 0), - ), - testutil.MustMetric( - "cpu", - map[string]string{}, - map[string]interface{}{ - "time_idle": 43.0, - }, - time.Unix(12, 0), - ), + input: []Input{ + { + metric: testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 42.0, + }, + time.Unix(0, 0), + ), + addtime: time.Unix(0, 0), + }, + { + metric: testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 43.0, + }, + time.Unix(12, 0), + ), + addtime: time.Unix(12, 0), + }, }, expected: []*dto.MetricFamily{ { @@ -88,23 +102,28 @@ func TestCollectionExpire(t *testing.T) { name: "update metric expiration descending order", now: time.Unix(20, 0), age: 10 * time.Second, - metrics: []telegraf.Metric{ - testutil.MustMetric( - "cpu", - map[string]string{}, - map[string]interface{}{ - "time_idle": 42.0, - }, - time.Unix(12, 0), - ), - testutil.MustMetric( - "cpu", - map[string]string{}, - map[string]interface{}{ - "time_idle": 43.0, - }, - time.Unix(0, 0), - ), + input: []Input{ + { + metric: testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 42.0, + }, + time.Unix(12, 0), + ), + addtime: time.Unix(12, 0), + }, { + metric: testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 43.0, + }, + time.Unix(0, 0), + ), + addtime: time.Unix(0, 0), + }, }, expected: []*dto.MetricFamily{ { @@ -124,15 +143,18 @@ func TestCollectionExpire(t *testing.T) { name: "expired single metric in metric family", now: time.Unix(20, 0), age: 10 * time.Second, - metrics: []telegraf.Metric{ - testutil.MustMetric( - "cpu", - map[string]string{}, - map[string]interface{}{ - "time_idle": 42.0, - }, - time.Unix(0, 0), - ), + input: []Input{ + { + metric: testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 42.0, + }, + time.Unix(0, 0), + ), + addtime: time.Unix(0, 0), + }, }, expected: []*dto.MetricFamily{}, }, @@ -140,23 +162,28 @@ func TestCollectionExpire(t *testing.T) { name: "expired one metric in metric family", now: time.Unix(20, 0), age: 10 * time.Second, - metrics: []telegraf.Metric{ - testutil.MustMetric( - "cpu", - map[string]string{}, - map[string]interface{}{ - "time_idle": 42.0, - }, - time.Unix(0, 0), - ), - testutil.MustMetric( - "cpu", - map[string]string{}, - map[string]interface{}{ - "time_guest": 42.0, - }, - time.Unix(15, 0), - ), + input: []Input{ + { + metric: testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 42.0, + }, + time.Unix(0, 0), + ), + addtime: time.Unix(0, 0), + }, { + metric: testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "time_guest": 42.0, + }, + time.Unix(15, 0), + ), + addtime: time.Unix(15, 0), + }, }, expected: []*dto.MetricFamily{ { @@ -176,64 +203,77 @@ func TestCollectionExpire(t *testing.T) { name: "histogram bucket updates", now: time.Unix(0, 0), age: 10 * time.Second, - metrics: []telegraf.Metric{ - testutil.MustMetric( - "prometheus", - map[string]string{}, - map[string]interface{}{ - "http_request_duration_seconds_sum": 10.0, - "http_request_duration_seconds_count": 2, - }, - time.Unix(0, 0), - telegraf.Histogram, - ), - testutil.MustMetric( - "prometheus", - map[string]string{"le": "0.05"}, - map[string]interface{}{ - "http_request_duration_seconds_bucket": 1.0, - }, - time.Unix(0, 0), - telegraf.Histogram, - ), - testutil.MustMetric( - "prometheus", - map[string]string{"le": "+Inf"}, - map[string]interface{}{ - "http_request_duration_seconds_bucket": 1.0, - }, - time.Unix(0, 0), - telegraf.Histogram, - ), - // Next interval - testutil.MustMetric( - "prometheus", - map[string]string{}, - map[string]interface{}{ - "http_request_duration_seconds_sum": 20.0, - "http_request_duration_seconds_count": 4, - }, - time.Unix(0, 0), - telegraf.Histogram, - ), - testutil.MustMetric( - "prometheus", - map[string]string{"le": "0.05"}, - map[string]interface{}{ - "http_request_duration_seconds_bucket": 2.0, - }, - time.Unix(0, 0), - telegraf.Histogram, - ), - testutil.MustMetric( - "prometheus", - map[string]string{"le": "+Inf"}, - map[string]interface{}{ - "http_request_duration_seconds_bucket": 2.0, - }, - time.Unix(0, 0), - telegraf.Histogram, - ), + input: []Input{ + { + metric: testutil.MustMetric( + "prometheus", + map[string]string{}, + map[string]interface{}{ + "http_request_duration_seconds_sum": 10.0, + "http_request_duration_seconds_count": 2, + }, + time.Unix(0, 0), + telegraf.Histogram, + ), + addtime: time.Unix(0, 0), + }, { + metric: testutil.MustMetric( + "prometheus", + map[string]string{"le": "0.05"}, + map[string]interface{}{ + "http_request_duration_seconds_bucket": 1.0, + }, + time.Unix(0, 0), + telegraf.Histogram, + ), + addtime: time.Unix(0, 0), + }, { + metric: testutil.MustMetric( + "prometheus", + map[string]string{"le": "+Inf"}, + map[string]interface{}{ + "http_request_duration_seconds_bucket": 1.0, + }, + time.Unix(0, 0), + telegraf.Histogram, + ), + addtime: time.Unix(0, 0), + }, { + // Next interval + metric: testutil.MustMetric( + "prometheus", + map[string]string{}, + map[string]interface{}{ + "http_request_duration_seconds_sum": 20.0, + "http_request_duration_seconds_count": 4, + }, + time.Unix(0, 0), + telegraf.Histogram, + ), + addtime: time.Unix(0, 0), + }, { + metric: testutil.MustMetric( + "prometheus", + map[string]string{"le": "0.05"}, + map[string]interface{}{ + "http_request_duration_seconds_bucket": 2.0, + }, + time.Unix(0, 0), + telegraf.Histogram, + ), + addtime: time.Unix(0, 0), + }, { + metric: testutil.MustMetric( + "prometheus", + map[string]string{"le": "+Inf"}, + map[string]interface{}{ + "http_request_duration_seconds_bucket": 2.0, + }, + time.Unix(0, 0), + telegraf.Histogram, + ), + addtime: time.Unix(0, 0), + }, }, expected: []*dto.MetricFamily{ { @@ -266,46 +306,55 @@ func TestCollectionExpire(t *testing.T) { name: "summary quantile updates", now: time.Unix(0, 0), age: 10 * time.Second, - metrics: []telegraf.Metric{ - testutil.MustMetric( - "prometheus", - map[string]string{}, - map[string]interface{}{ - "rpc_duration_seconds_sum": 1.0, - "rpc_duration_seconds_count": 1, - }, - time.Unix(0, 0), - telegraf.Summary, - ), - testutil.MustMetric( - "prometheus", - map[string]string{"quantile": "0.01"}, - map[string]interface{}{ - "rpc_duration_seconds": 1.0, - }, - time.Unix(0, 0), - telegraf.Summary, - ), - // Updated Summary - testutil.MustMetric( - "prometheus", - map[string]string{}, - map[string]interface{}{ - "rpc_duration_seconds_sum": 2.0, - "rpc_duration_seconds_count": 2, - }, - time.Unix(0, 0), - telegraf.Summary, - ), - testutil.MustMetric( - "prometheus", - map[string]string{"quantile": "0.01"}, - map[string]interface{}{ - "rpc_duration_seconds": 2.0, - }, - time.Unix(0, 0), - telegraf.Summary, - ), + input: []Input{ + { + metric: testutil.MustMetric( + "prometheus", + map[string]string{}, + map[string]interface{}{ + "rpc_duration_seconds_sum": 1.0, + "rpc_duration_seconds_count": 1, + }, + time.Unix(0, 0), + telegraf.Summary, + ), + addtime: time.Unix(0, 0), + }, { + metric: testutil.MustMetric( + "prometheus", + map[string]string{"quantile": "0.01"}, + map[string]interface{}{ + "rpc_duration_seconds": 1.0, + }, + time.Unix(0, 0), + telegraf.Summary, + ), + addtime: time.Unix(0, 0), + }, { + // Updated Summary + metric: testutil.MustMetric( + "prometheus", + map[string]string{}, + map[string]interface{}{ + "rpc_duration_seconds_sum": 2.0, + "rpc_duration_seconds_count": 2, + }, + time.Unix(0, 0), + telegraf.Summary, + ), + addtime: time.Unix(0, 0), + }, { + metric: testutil.MustMetric( + "prometheus", + map[string]string{"quantile": "0.01"}, + map[string]interface{}{ + "rpc_duration_seconds": 2.0, + }, + time.Unix(0, 0), + telegraf.Summary, + ), + addtime: time.Unix(0, 0), + }, }, expected: []*dto.MetricFamily{ { @@ -330,12 +379,43 @@ func TestCollectionExpire(t *testing.T) { }, }, }, + { + name: "expire based on add time", + now: time.Unix(20, 0), + age: 10 * time.Second, + input: []Input{ + { + metric: testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 42.0, + }, + time.Unix(0, 0), + ), + addtime: time.Unix(15, 0), + }, + }, + expected: []*dto.MetricFamily{ + { + Name: proto.String("cpu_time_idle"), + Help: proto.String(helpString), + Type: dto.MetricType_UNTYPED.Enum(), + Metric: []*dto.Metric{ + { + Label: []*dto.LabelPair{}, + Untyped: &dto.Untyped{Value: proto.Float64(42.0)}, + }, + }, + }, + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { c := NewCollection(FormatConfig{}) - for _, metric := range tt.metrics { - c.Add(metric) + for _, item := range tt.input { + c.Add(item.metric, item.addtime) } c.Expire(tt.now, tt.age) diff --git a/plugins/serializers/prometheus/prometheus.go b/plugins/serializers/prometheus/prometheus.go index 11c305aa4..9e5df5882 100644 --- a/plugins/serializers/prometheus/prometheus.go +++ b/plugins/serializers/prometheus/prometheus.go @@ -2,6 +2,7 @@ package prometheus import ( "bytes" + "time" "github.com/influxdata/telegraf" "github.com/prometheus/common/expfmt" @@ -53,7 +54,7 @@ func (s *Serializer) Serialize(metric telegraf.Metric) ([]byte, error) { func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) { coll := NewCollection(s.config) for _, metric := range metrics { - coll.Add(metric) + coll.Add(metric, time.Now()) } var buf bytes.Buffer