From eeb46906866f8a10b89040ae62a88e7e98dff366 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Wed, 4 Dec 2019 13:41:33 -0800 Subject: [PATCH] Fix prometheus histogram and summary merging (#6756) --- plugins/serializers/prometheus/collection.go | 25 +- .../serializers/prometheus/collection_test.go | 231 ++++++++++++++++++ 2 files changed, 253 insertions(+), 3 deletions(-) diff --git a/plugins/serializers/prometheus/collection.go b/plugins/serializers/prometheus/collection.go index 8ca06520b..5c385caad 100644 --- a/plugins/serializers/prometheus/collection.go +++ b/plugins/serializers/prometheus/collection.go @@ -52,12 +52,32 @@ type Histogram struct { Sum float64 } +func (h *Histogram) merge(b Bucket) { + for i := range h.Buckets { + if h.Buckets[i].Bound == b.Bound { + h.Buckets[i].Count = b.Count + return + } + } + h.Buckets = append(h.Buckets, b) +} + type Summary struct { Quantiles []Quantile Count uint64 Sum float64 } +func (s *Summary) merge(q Quantile) { + for i := range s.Quantiles { + if s.Quantiles[i].Quantile == q.Quantile { + s.Quantiles[i].Value = q.Value + return + } + } + s.Quantiles = append(s.Quantiles, q) +} + type MetricKey uint64 func MakeMetricKey(labels []LabelPair) MetricKey { @@ -210,7 +230,6 @@ func (c *Collection) Add(metric telegraf.Metric) { Scaler: &Scaler{Value: value}, } - // what if already here entry.Metrics[metricKey] = m case telegraf.Histogram: if m == nil { @@ -236,7 +255,7 @@ func (c *Collection) Add(metric telegraf.Metric) { continue } - m.Histogram.Buckets = append(m.Histogram.Buckets, Bucket{ + m.Histogram.merge(Bucket{ Bound: bound, Count: count, }) @@ -297,7 +316,7 @@ func (c *Collection) Add(metric telegraf.Metric) { continue } - m.Summary.Quantiles = append(m.Summary.Quantiles, Quantile{ + m.Summary.merge(Quantile{ Quantile: quantile, Value: value, }) diff --git a/plugins/serializers/prometheus/collection_test.go b/plugins/serializers/prometheus/collection_test.go index 589c306b5..70f26dac7 100644 --- a/plugins/serializers/prometheus/collection_test.go +++ b/plugins/serializers/prometheus/collection_test.go @@ -1,6 +1,7 @@ package prometheus import ( + "math" "testing" "time" @@ -47,6 +48,78 @@ func TestCollectionExpire(t *testing.T) { }, }, }, + { + name: "update metric expiration", + now: time.Unix(20, 0), + age: 10 * time.Second, + metrics: []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 42.0, + }, + time.Unix(0, 0), + ), + testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 43.0, + }, + time.Unix(12, 0), + ), + }, + expected: []*dto.MetricFamily{ + { + Name: proto.String("cpu_time_idle"), + Help: proto.String(helpString), + Type: dto.MetricType_UNTYPED.Enum(), + Metric: []*dto.Metric{ + { + Label: []*dto.LabelPair{}, + Untyped: &dto.Untyped{Value: proto.Float64(43.0)}, + }, + }, + }, + }, + }, + { + name: "update metric expiration descending order", + now: time.Unix(20, 0), + age: 10 * time.Second, + metrics: []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 42.0, + }, + time.Unix(12, 0), + ), + testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 43.0, + }, + time.Unix(0, 0), + ), + }, + expected: []*dto.MetricFamily{ + { + Name: proto.String("cpu_time_idle"), + Help: proto.String(helpString), + Type: dto.MetricType_UNTYPED.Enum(), + Metric: []*dto.Metric{ + { + Label: []*dto.LabelPair{}, + Untyped: &dto.Untyped{Value: proto.Float64(42.0)}, + }, + }, + }, + }, + }, { name: "expired single metric in metric family", now: time.Unix(20, 0), @@ -99,6 +172,164 @@ func TestCollectionExpire(t *testing.T) { }, }, }, + { + name: "histogram bucket updates", + now: time.Unix(0, 0), + age: 10 * time.Second, + metrics: []telegraf.Metric{ + testutil.MustMetric( + "prometheus", + map[string]string{}, + map[string]interface{}{ + "http_request_duration_seconds_sum": 10.0, + "http_request_duration_seconds_count": 2, + }, + time.Unix(0, 0), + telegraf.Histogram, + ), + testutil.MustMetric( + "prometheus", + map[string]string{"le": "0.05"}, + map[string]interface{}{ + "http_request_duration_seconds_bucket": 1.0, + }, + time.Unix(0, 0), + telegraf.Histogram, + ), + testutil.MustMetric( + "prometheus", + map[string]string{"le": "+Inf"}, + map[string]interface{}{ + "http_request_duration_seconds_bucket": 1.0, + }, + time.Unix(0, 0), + telegraf.Histogram, + ), + // Next interval + testutil.MustMetric( + "prometheus", + map[string]string{}, + map[string]interface{}{ + "http_request_duration_seconds_sum": 20.0, + "http_request_duration_seconds_count": 4, + }, + time.Unix(0, 0), + telegraf.Histogram, + ), + testutil.MustMetric( + "prometheus", + map[string]string{"le": "0.05"}, + map[string]interface{}{ + "http_request_duration_seconds_bucket": 2.0, + }, + time.Unix(0, 0), + telegraf.Histogram, + ), + testutil.MustMetric( + "prometheus", + map[string]string{"le": "+Inf"}, + map[string]interface{}{ + "http_request_duration_seconds_bucket": 2.0, + }, + time.Unix(0, 0), + telegraf.Histogram, + ), + }, + expected: []*dto.MetricFamily{ + { + Name: proto.String("http_request_duration_seconds"), + Help: proto.String(helpString), + Type: dto.MetricType_HISTOGRAM.Enum(), + Metric: []*dto.Metric{ + { + Label: []*dto.LabelPair{}, + Histogram: &dto.Histogram{ + SampleCount: proto.Uint64(4), + SampleSum: proto.Float64(20.0), + Bucket: []*dto.Bucket{ + { + UpperBound: proto.Float64(0.05), + CumulativeCount: proto.Uint64(2), + }, + { + UpperBound: proto.Float64(math.Inf(1)), + CumulativeCount: proto.Uint64(2), + }, + }, + }, + }, + }, + }, + }, + }, + { + name: "summary quantile updates", + now: time.Unix(0, 0), + age: 10 * time.Second, + metrics: []telegraf.Metric{ + testutil.MustMetric( + "prometheus", + map[string]string{}, + map[string]interface{}{ + "rpc_duration_seconds_sum": 1.0, + "rpc_duration_seconds_count": 1, + }, + time.Unix(0, 0), + telegraf.Summary, + ), + testutil.MustMetric( + "prometheus", + map[string]string{"quantile": "0.01"}, + map[string]interface{}{ + "rpc_duration_seconds": 1.0, + }, + time.Unix(0, 0), + telegraf.Summary, + ), + // Updated Summary + testutil.MustMetric( + "prometheus", + map[string]string{}, + map[string]interface{}{ + "rpc_duration_seconds_sum": 2.0, + "rpc_duration_seconds_count": 2, + }, + time.Unix(0, 0), + telegraf.Summary, + ), + testutil.MustMetric( + "prometheus", + map[string]string{"quantile": "0.01"}, + map[string]interface{}{ + "rpc_duration_seconds": 2.0, + }, + time.Unix(0, 0), + telegraf.Summary, + ), + }, + expected: []*dto.MetricFamily{ + { + Name: proto.String("rpc_duration_seconds"), + Help: proto.String(helpString), + Type: dto.MetricType_SUMMARY.Enum(), + Metric: []*dto.Metric{ + { + Label: []*dto.LabelPair{}, + Summary: &dto.Summary{ + SampleCount: proto.Uint64(2), + SampleSum: proto.Float64(2.0), + Quantile: []*dto.Quantile{ + { + Quantile: proto.Float64(0.01), + Value: proto.Float64(2), + }, + }, + }, + }, + }, + }, + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) {