Fix prometheus histogram and summary merging (#6756)
This commit is contained in:
parent
48f9f22f33
commit
eeb4690686
|
@ -52,12 +52,32 @@ type Histogram struct {
|
|||
Sum float64
|
||||
}
|
||||
|
||||
func (h *Histogram) merge(b Bucket) {
|
||||
for i := range h.Buckets {
|
||||
if h.Buckets[i].Bound == b.Bound {
|
||||
h.Buckets[i].Count = b.Count
|
||||
return
|
||||
}
|
||||
}
|
||||
h.Buckets = append(h.Buckets, b)
|
||||
}
|
||||
|
||||
type Summary struct {
|
||||
Quantiles []Quantile
|
||||
Count uint64
|
||||
Sum float64
|
||||
}
|
||||
|
||||
func (s *Summary) merge(q Quantile) {
|
||||
for i := range s.Quantiles {
|
||||
if s.Quantiles[i].Quantile == q.Quantile {
|
||||
s.Quantiles[i].Value = q.Value
|
||||
return
|
||||
}
|
||||
}
|
||||
s.Quantiles = append(s.Quantiles, q)
|
||||
}
|
||||
|
||||
type MetricKey uint64
|
||||
|
||||
func MakeMetricKey(labels []LabelPair) MetricKey {
|
||||
|
@ -210,7 +230,6 @@ func (c *Collection) Add(metric telegraf.Metric) {
|
|||
Scaler: &Scaler{Value: value},
|
||||
}
|
||||
|
||||
// what if already here
|
||||
entry.Metrics[metricKey] = m
|
||||
case telegraf.Histogram:
|
||||
if m == nil {
|
||||
|
@ -236,7 +255,7 @@ func (c *Collection) Add(metric telegraf.Metric) {
|
|||
continue
|
||||
}
|
||||
|
||||
m.Histogram.Buckets = append(m.Histogram.Buckets, Bucket{
|
||||
m.Histogram.merge(Bucket{
|
||||
Bound: bound,
|
||||
Count: count,
|
||||
})
|
||||
|
@ -297,7 +316,7 @@ func (c *Collection) Add(metric telegraf.Metric) {
|
|||
continue
|
||||
}
|
||||
|
||||
m.Summary.Quantiles = append(m.Summary.Quantiles, Quantile{
|
||||
m.Summary.merge(Quantile{
|
||||
Quantile: quantile,
|
||||
Value: value,
|
||||
})
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package prometheus
|
||||
|
||||
import (
|
||||
"math"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -47,6 +48,78 @@ func TestCollectionExpire(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "update metric expiration",
|
||||
now: time.Unix(20, 0),
|
||||
age: 10 * time.Second,
|
||||
metrics: []telegraf.Metric{
|
||||
testutil.MustMetric(
|
||||
"cpu",
|
||||
map[string]string{},
|
||||
map[string]interface{}{
|
||||
"time_idle": 42.0,
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
),
|
||||
testutil.MustMetric(
|
||||
"cpu",
|
||||
map[string]string{},
|
||||
map[string]interface{}{
|
||||
"time_idle": 43.0,
|
||||
},
|
||||
time.Unix(12, 0),
|
||||
),
|
||||
},
|
||||
expected: []*dto.MetricFamily{
|
||||
{
|
||||
Name: proto.String("cpu_time_idle"),
|
||||
Help: proto.String(helpString),
|
||||
Type: dto.MetricType_UNTYPED.Enum(),
|
||||
Metric: []*dto.Metric{
|
||||
{
|
||||
Label: []*dto.LabelPair{},
|
||||
Untyped: &dto.Untyped{Value: proto.Float64(43.0)},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "update metric expiration descending order",
|
||||
now: time.Unix(20, 0),
|
||||
age: 10 * time.Second,
|
||||
metrics: []telegraf.Metric{
|
||||
testutil.MustMetric(
|
||||
"cpu",
|
||||
map[string]string{},
|
||||
map[string]interface{}{
|
||||
"time_idle": 42.0,
|
||||
},
|
||||
time.Unix(12, 0),
|
||||
),
|
||||
testutil.MustMetric(
|
||||
"cpu",
|
||||
map[string]string{},
|
||||
map[string]interface{}{
|
||||
"time_idle": 43.0,
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
),
|
||||
},
|
||||
expected: []*dto.MetricFamily{
|
||||
{
|
||||
Name: proto.String("cpu_time_idle"),
|
||||
Help: proto.String(helpString),
|
||||
Type: dto.MetricType_UNTYPED.Enum(),
|
||||
Metric: []*dto.Metric{
|
||||
{
|
||||
Label: []*dto.LabelPair{},
|
||||
Untyped: &dto.Untyped{Value: proto.Float64(42.0)},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "expired single metric in metric family",
|
||||
now: time.Unix(20, 0),
|
||||
|
@ -99,6 +172,164 @@ func TestCollectionExpire(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "histogram bucket updates",
|
||||
now: time.Unix(0, 0),
|
||||
age: 10 * time.Second,
|
||||
metrics: []telegraf.Metric{
|
||||
testutil.MustMetric(
|
||||
"prometheus",
|
||||
map[string]string{},
|
||||
map[string]interface{}{
|
||||
"http_request_duration_seconds_sum": 10.0,
|
||||
"http_request_duration_seconds_count": 2,
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
telegraf.Histogram,
|
||||
),
|
||||
testutil.MustMetric(
|
||||
"prometheus",
|
||||
map[string]string{"le": "0.05"},
|
||||
map[string]interface{}{
|
||||
"http_request_duration_seconds_bucket": 1.0,
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
telegraf.Histogram,
|
||||
),
|
||||
testutil.MustMetric(
|
||||
"prometheus",
|
||||
map[string]string{"le": "+Inf"},
|
||||
map[string]interface{}{
|
||||
"http_request_duration_seconds_bucket": 1.0,
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
telegraf.Histogram,
|
||||
),
|
||||
// Next interval
|
||||
testutil.MustMetric(
|
||||
"prometheus",
|
||||
map[string]string{},
|
||||
map[string]interface{}{
|
||||
"http_request_duration_seconds_sum": 20.0,
|
||||
"http_request_duration_seconds_count": 4,
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
telegraf.Histogram,
|
||||
),
|
||||
testutil.MustMetric(
|
||||
"prometheus",
|
||||
map[string]string{"le": "0.05"},
|
||||
map[string]interface{}{
|
||||
"http_request_duration_seconds_bucket": 2.0,
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
telegraf.Histogram,
|
||||
),
|
||||
testutil.MustMetric(
|
||||
"prometheus",
|
||||
map[string]string{"le": "+Inf"},
|
||||
map[string]interface{}{
|
||||
"http_request_duration_seconds_bucket": 2.0,
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
telegraf.Histogram,
|
||||
),
|
||||
},
|
||||
expected: []*dto.MetricFamily{
|
||||
{
|
||||
Name: proto.String("http_request_duration_seconds"),
|
||||
Help: proto.String(helpString),
|
||||
Type: dto.MetricType_HISTOGRAM.Enum(),
|
||||
Metric: []*dto.Metric{
|
||||
{
|
||||
Label: []*dto.LabelPair{},
|
||||
Histogram: &dto.Histogram{
|
||||
SampleCount: proto.Uint64(4),
|
||||
SampleSum: proto.Float64(20.0),
|
||||
Bucket: []*dto.Bucket{
|
||||
{
|
||||
UpperBound: proto.Float64(0.05),
|
||||
CumulativeCount: proto.Uint64(2),
|
||||
},
|
||||
{
|
||||
UpperBound: proto.Float64(math.Inf(1)),
|
||||
CumulativeCount: proto.Uint64(2),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "summary quantile updates",
|
||||
now: time.Unix(0, 0),
|
||||
age: 10 * time.Second,
|
||||
metrics: []telegraf.Metric{
|
||||
testutil.MustMetric(
|
||||
"prometheus",
|
||||
map[string]string{},
|
||||
map[string]interface{}{
|
||||
"rpc_duration_seconds_sum": 1.0,
|
||||
"rpc_duration_seconds_count": 1,
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
telegraf.Summary,
|
||||
),
|
||||
testutil.MustMetric(
|
||||
"prometheus",
|
||||
map[string]string{"quantile": "0.01"},
|
||||
map[string]interface{}{
|
||||
"rpc_duration_seconds": 1.0,
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
telegraf.Summary,
|
||||
),
|
||||
// Updated Summary
|
||||
testutil.MustMetric(
|
||||
"prometheus",
|
||||
map[string]string{},
|
||||
map[string]interface{}{
|
||||
"rpc_duration_seconds_sum": 2.0,
|
||||
"rpc_duration_seconds_count": 2,
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
telegraf.Summary,
|
||||
),
|
||||
testutil.MustMetric(
|
||||
"prometheus",
|
||||
map[string]string{"quantile": "0.01"},
|
||||
map[string]interface{}{
|
||||
"rpc_duration_seconds": 2.0,
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
telegraf.Summary,
|
||||
),
|
||||
},
|
||||
expected: []*dto.MetricFamily{
|
||||
{
|
||||
Name: proto.String("rpc_duration_seconds"),
|
||||
Help: proto.String(helpString),
|
||||
Type: dto.MetricType_SUMMARY.Enum(),
|
||||
Metric: []*dto.Metric{
|
||||
{
|
||||
Label: []*dto.LabelPair{},
|
||||
Summary: &dto.Summary{
|
||||
SampleCount: proto.Uint64(2),
|
||||
SampleSum: proto.Float64(2.0),
|
||||
Quantile: []*dto.Quantile{
|
||||
{
|
||||
Quantile: proto.Float64(0.01),
|
||||
Value: proto.Float64(2),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
|
|
Loading…
Reference in New Issue