Use add time for prometheus expiration calculation (#7056)

This commit is contained in:
Daniel Nelson 2020-02-24 15:53:16 -08:00 committed by GitHub
parent e9e4f2c354
commit b5e0577d6b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 266 additions and 179 deletions

View File

@ -83,7 +83,7 @@ func (c *Collector) Add(metrics []telegraf.Metric) error {
defer c.Unlock() defer c.Unlock()
for _, metric := range metrics { for _, metric := range metrics {
c.coll.Add(metric) c.coll.Add(metric, time.Now())
} }
// Expire metrics, doing this on Add ensure metrics are removed even if no // Expire metrics, doing this on Add ensure metrics are removed even if no

View File

@ -14,6 +14,8 @@ import (
const helpString = "Telegraf collected metric" const helpString = "Telegraf collected metric"
// TimeFunc is the signature of a function that takes no arguments and
// returns a time.Time.
// NOTE(review): presumably meant as an injectable clock (e.g. time.Now);
// no use of this type is visible in this diff — confirm against callers.
type TimeFunc func() time.Time
type MetricFamily struct { type MetricFamily struct {
Name string Name string
Type telegraf.ValueType Type telegraf.ValueType
@ -22,6 +24,7 @@ type MetricFamily struct {
type Metric struct { type Metric struct {
Labels []LabelPair Labels []LabelPair
Time time.Time Time time.Time
AddTime time.Time
Scaler *Scaler Scaler *Scaler
Histogram *Histogram Histogram *Histogram
Summary *Summary Summary *Summary
@ -97,14 +100,14 @@ type Entry struct {
} }
type Collection struct { type Collection struct {
config FormatConfig
Entries map[MetricFamily]Entry Entries map[MetricFamily]Entry
config FormatConfig
} }
func NewCollection(config FormatConfig) *Collection { func NewCollection(config FormatConfig) *Collection {
cache := &Collection{ cache := &Collection{
config: config,
Entries: make(map[MetricFamily]Entry), Entries: make(map[MetricFamily]Entry),
config: config,
} }
return cache return cache
} }
@ -177,7 +180,7 @@ func (c *Collection) createLabels(metric telegraf.Metric) []LabelPair {
return labels return labels
} }
func (c *Collection) Add(metric telegraf.Metric) { func (c *Collection) Add(metric telegraf.Metric, now time.Time) {
labels := c.createLabels(metric) labels := c.createLabels(metric)
for _, field := range metric.FieldList() { for _, field := range metric.FieldList() {
metricName := MetricName(metric.Name(), field.Key, metric.Type()) metricName := MetricName(metric.Name(), field.Key, metric.Type())
@ -225,9 +228,10 @@ func (c *Collection) Add(metric telegraf.Metric) {
} }
m = &Metric{ m = &Metric{
Labels: labels, Labels: labels,
Time: metric.Time(), Time: metric.Time(),
Scaler: &Scaler{Value: value}, AddTime: now,
Scaler: &Scaler{Value: value},
} }
entry.Metrics[metricKey] = m entry.Metrics[metricKey] = m
@ -236,6 +240,7 @@ func (c *Collection) Add(metric telegraf.Metric) {
m = &Metric{ m = &Metric{
Labels: labels, Labels: labels,
Time: metric.Time(), Time: metric.Time(),
AddTime: now,
Histogram: &Histogram{}, Histogram: &Histogram{},
} }
} }
@ -283,6 +288,7 @@ func (c *Collection) Add(metric telegraf.Metric) {
m = &Metric{ m = &Metric{
Labels: labels, Labels: labels,
Time: metric.Time(), Time: metric.Time(),
AddTime: now,
Summary: &Summary{}, Summary: &Summary{},
} }
} }
@ -331,7 +337,7 @@ func (c *Collection) Expire(now time.Time, age time.Duration) {
expireTime := now.Add(-age) expireTime := now.Add(-age)
for _, entry := range c.Entries { for _, entry := range c.Entries {
for key, metric := range entry.Metrics { for key, metric := range entry.Metrics {
if metric.Time.Before(expireTime) { if metric.AddTime.Before(expireTime) {
delete(entry.Metrics, key) delete(entry.Metrics, key)
if len(entry.Metrics) == 0 { if len(entry.Metrics) == 0 {
delete(c.Entries, entry.Family) delete(c.Entries, entry.Family)

View File

@ -12,27 +12,35 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
// Input is one test fixture entry for TestCollectionExpire: a metric paired
// with the time that is passed alongside it to Collection.Add.
type Input struct {
// metric is the telegraf metric handed to Collection.Add.
metric telegraf.Metric
// addtime is passed as the second argument of Collection.Add, which stores
// it as the metric's AddTime; Expire compares AddTime (not the metric's own
// timestamp) against the expiration cutoff.
addtime time.Time
}
func TestCollectionExpire(t *testing.T) { func TestCollectionExpire(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
now time.Time now time.Time
age time.Duration age time.Duration
metrics []telegraf.Metric input []Input
expected []*dto.MetricFamily expected []*dto.MetricFamily
}{ }{
{ {
name: "not expired", name: "not expired",
now: time.Unix(1, 0), now: time.Unix(1, 0),
age: 10 * time.Second, age: 10 * time.Second,
metrics: []telegraf.Metric{ input: []Input{
testutil.MustMetric( {
"cpu", metric: testutil.MustMetric(
map[string]string{}, "cpu",
map[string]interface{}{ map[string]string{},
"time_idle": 42.0, map[string]interface{}{
}, "time_idle": 42.0,
time.Unix(0, 0), },
), time.Unix(0, 0),
),
addtime: time.Unix(0, 0),
},
}, },
expected: []*dto.MetricFamily{ expected: []*dto.MetricFamily{
{ {
@ -52,23 +60,29 @@ func TestCollectionExpire(t *testing.T) {
name: "update metric expiration", name: "update metric expiration",
now: time.Unix(20, 0), now: time.Unix(20, 0),
age: 10 * time.Second, age: 10 * time.Second,
metrics: []telegraf.Metric{ input: []Input{
testutil.MustMetric( {
"cpu", metric: testutil.MustMetric(
map[string]string{}, "cpu",
map[string]interface{}{ map[string]string{},
"time_idle": 42.0, map[string]interface{}{
}, "time_idle": 42.0,
time.Unix(0, 0), },
), time.Unix(0, 0),
testutil.MustMetric( ),
"cpu", addtime: time.Unix(0, 0),
map[string]string{}, },
map[string]interface{}{ {
"time_idle": 43.0, metric: testutil.MustMetric(
}, "cpu",
time.Unix(12, 0), map[string]string{},
), map[string]interface{}{
"time_idle": 43.0,
},
time.Unix(12, 0),
),
addtime: time.Unix(12, 0),
},
}, },
expected: []*dto.MetricFamily{ expected: []*dto.MetricFamily{
{ {
@ -88,23 +102,28 @@ func TestCollectionExpire(t *testing.T) {
name: "update metric expiration descending order", name: "update metric expiration descending order",
now: time.Unix(20, 0), now: time.Unix(20, 0),
age: 10 * time.Second, age: 10 * time.Second,
metrics: []telegraf.Metric{ input: []Input{
testutil.MustMetric( {
"cpu", metric: testutil.MustMetric(
map[string]string{}, "cpu",
map[string]interface{}{ map[string]string{},
"time_idle": 42.0, map[string]interface{}{
}, "time_idle": 42.0,
time.Unix(12, 0), },
), time.Unix(12, 0),
testutil.MustMetric( ),
"cpu", addtime: time.Unix(12, 0),
map[string]string{}, }, {
map[string]interface{}{ metric: testutil.MustMetric(
"time_idle": 43.0, "cpu",
}, map[string]string{},
time.Unix(0, 0), map[string]interface{}{
), "time_idle": 43.0,
},
time.Unix(0, 0),
),
addtime: time.Unix(0, 0),
},
}, },
expected: []*dto.MetricFamily{ expected: []*dto.MetricFamily{
{ {
@ -124,15 +143,18 @@ func TestCollectionExpire(t *testing.T) {
name: "expired single metric in metric family", name: "expired single metric in metric family",
now: time.Unix(20, 0), now: time.Unix(20, 0),
age: 10 * time.Second, age: 10 * time.Second,
metrics: []telegraf.Metric{ input: []Input{
testutil.MustMetric( {
"cpu", metric: testutil.MustMetric(
map[string]string{}, "cpu",
map[string]interface{}{ map[string]string{},
"time_idle": 42.0, map[string]interface{}{
}, "time_idle": 42.0,
time.Unix(0, 0), },
), time.Unix(0, 0),
),
addtime: time.Unix(0, 0),
},
}, },
expected: []*dto.MetricFamily{}, expected: []*dto.MetricFamily{},
}, },
@ -140,23 +162,28 @@ func TestCollectionExpire(t *testing.T) {
name: "expired one metric in metric family", name: "expired one metric in metric family",
now: time.Unix(20, 0), now: time.Unix(20, 0),
age: 10 * time.Second, age: 10 * time.Second,
metrics: []telegraf.Metric{ input: []Input{
testutil.MustMetric( {
"cpu", metric: testutil.MustMetric(
map[string]string{}, "cpu",
map[string]interface{}{ map[string]string{},
"time_idle": 42.0, map[string]interface{}{
}, "time_idle": 42.0,
time.Unix(0, 0), },
), time.Unix(0, 0),
testutil.MustMetric( ),
"cpu", addtime: time.Unix(0, 0),
map[string]string{}, }, {
map[string]interface{}{ metric: testutil.MustMetric(
"time_guest": 42.0, "cpu",
}, map[string]string{},
time.Unix(15, 0), map[string]interface{}{
), "time_guest": 42.0,
},
time.Unix(15, 0),
),
addtime: time.Unix(15, 0),
},
}, },
expected: []*dto.MetricFamily{ expected: []*dto.MetricFamily{
{ {
@ -176,64 +203,77 @@ func TestCollectionExpire(t *testing.T) {
name: "histogram bucket updates", name: "histogram bucket updates",
now: time.Unix(0, 0), now: time.Unix(0, 0),
age: 10 * time.Second, age: 10 * time.Second,
metrics: []telegraf.Metric{ input: []Input{
testutil.MustMetric( {
"prometheus", metric: testutil.MustMetric(
map[string]string{}, "prometheus",
map[string]interface{}{ map[string]string{},
"http_request_duration_seconds_sum": 10.0, map[string]interface{}{
"http_request_duration_seconds_count": 2, "http_request_duration_seconds_sum": 10.0,
}, "http_request_duration_seconds_count": 2,
time.Unix(0, 0), },
telegraf.Histogram, time.Unix(0, 0),
), telegraf.Histogram,
testutil.MustMetric( ),
"prometheus", addtime: time.Unix(0, 0),
map[string]string{"le": "0.05"}, }, {
map[string]interface{}{ metric: testutil.MustMetric(
"http_request_duration_seconds_bucket": 1.0, "prometheus",
}, map[string]string{"le": "0.05"},
time.Unix(0, 0), map[string]interface{}{
telegraf.Histogram, "http_request_duration_seconds_bucket": 1.0,
), },
testutil.MustMetric( time.Unix(0, 0),
"prometheus", telegraf.Histogram,
map[string]string{"le": "+Inf"}, ),
map[string]interface{}{ addtime: time.Unix(0, 0),
"http_request_duration_seconds_bucket": 1.0, }, {
}, metric: testutil.MustMetric(
time.Unix(0, 0), "prometheus",
telegraf.Histogram, map[string]string{"le": "+Inf"},
), map[string]interface{}{
// Next interval "http_request_duration_seconds_bucket": 1.0,
testutil.MustMetric( },
"prometheus", time.Unix(0, 0),
map[string]string{}, telegraf.Histogram,
map[string]interface{}{ ),
"http_request_duration_seconds_sum": 20.0, addtime: time.Unix(0, 0),
"http_request_duration_seconds_count": 4, }, {
}, // Next interval
time.Unix(0, 0), metric: testutil.MustMetric(
telegraf.Histogram, "prometheus",
), map[string]string{},
testutil.MustMetric( map[string]interface{}{
"prometheus", "http_request_duration_seconds_sum": 20.0,
map[string]string{"le": "0.05"}, "http_request_duration_seconds_count": 4,
map[string]interface{}{ },
"http_request_duration_seconds_bucket": 2.0, time.Unix(0, 0),
}, telegraf.Histogram,
time.Unix(0, 0), ),
telegraf.Histogram, addtime: time.Unix(0, 0),
), }, {
testutil.MustMetric( metric: testutil.MustMetric(
"prometheus", "prometheus",
map[string]string{"le": "+Inf"}, map[string]string{"le": "0.05"},
map[string]interface{}{ map[string]interface{}{
"http_request_duration_seconds_bucket": 2.0, "http_request_duration_seconds_bucket": 2.0,
}, },
time.Unix(0, 0), time.Unix(0, 0),
telegraf.Histogram, telegraf.Histogram,
), ),
addtime: time.Unix(0, 0),
}, {
metric: testutil.MustMetric(
"prometheus",
map[string]string{"le": "+Inf"},
map[string]interface{}{
"http_request_duration_seconds_bucket": 2.0,
},
time.Unix(0, 0),
telegraf.Histogram,
),
addtime: time.Unix(0, 0),
},
}, },
expected: []*dto.MetricFamily{ expected: []*dto.MetricFamily{
{ {
@ -266,46 +306,55 @@ func TestCollectionExpire(t *testing.T) {
name: "summary quantile updates", name: "summary quantile updates",
now: time.Unix(0, 0), now: time.Unix(0, 0),
age: 10 * time.Second, age: 10 * time.Second,
metrics: []telegraf.Metric{ input: []Input{
testutil.MustMetric( {
"prometheus", metric: testutil.MustMetric(
map[string]string{}, "prometheus",
map[string]interface{}{ map[string]string{},
"rpc_duration_seconds_sum": 1.0, map[string]interface{}{
"rpc_duration_seconds_count": 1, "rpc_duration_seconds_sum": 1.0,
}, "rpc_duration_seconds_count": 1,
time.Unix(0, 0), },
telegraf.Summary, time.Unix(0, 0),
), telegraf.Summary,
testutil.MustMetric( ),
"prometheus", addtime: time.Unix(0, 0),
map[string]string{"quantile": "0.01"}, }, {
map[string]interface{}{ metric: testutil.MustMetric(
"rpc_duration_seconds": 1.0, "prometheus",
}, map[string]string{"quantile": "0.01"},
time.Unix(0, 0), map[string]interface{}{
telegraf.Summary, "rpc_duration_seconds": 1.0,
), },
// Updated Summary time.Unix(0, 0),
testutil.MustMetric( telegraf.Summary,
"prometheus", ),
map[string]string{}, addtime: time.Unix(0, 0),
map[string]interface{}{ }, {
"rpc_duration_seconds_sum": 2.0, // Updated Summary
"rpc_duration_seconds_count": 2, metric: testutil.MustMetric(
}, "prometheus",
time.Unix(0, 0), map[string]string{},
telegraf.Summary, map[string]interface{}{
), "rpc_duration_seconds_sum": 2.0,
testutil.MustMetric( "rpc_duration_seconds_count": 2,
"prometheus", },
map[string]string{"quantile": "0.01"}, time.Unix(0, 0),
map[string]interface{}{ telegraf.Summary,
"rpc_duration_seconds": 2.0, ),
}, addtime: time.Unix(0, 0),
time.Unix(0, 0), }, {
telegraf.Summary, metric: testutil.MustMetric(
), "prometheus",
map[string]string{"quantile": "0.01"},
map[string]interface{}{
"rpc_duration_seconds": 2.0,
},
time.Unix(0, 0),
telegraf.Summary,
),
addtime: time.Unix(0, 0),
},
}, },
expected: []*dto.MetricFamily{ expected: []*dto.MetricFamily{
{ {
@ -330,12 +379,43 @@ func TestCollectionExpire(t *testing.T) {
}, },
}, },
}, },
{
name: "expire based on add time",
now: time.Unix(20, 0),
age: 10 * time.Second,
input: []Input{
{
metric: testutil.MustMetric(
"cpu",
map[string]string{},
map[string]interface{}{
"time_idle": 42.0,
},
time.Unix(0, 0),
),
addtime: time.Unix(15, 0),
},
},
expected: []*dto.MetricFamily{
{
Name: proto.String("cpu_time_idle"),
Help: proto.String(helpString),
Type: dto.MetricType_UNTYPED.Enum(),
Metric: []*dto.Metric{
{
Label: []*dto.LabelPair{},
Untyped: &dto.Untyped{Value: proto.Float64(42.0)},
},
},
},
},
},
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
c := NewCollection(FormatConfig{}) c := NewCollection(FormatConfig{})
for _, metric := range tt.metrics { for _, item := range tt.input {
c.Add(metric) c.Add(item.metric, item.addtime)
} }
c.Expire(tt.now, tt.age) c.Expire(tt.now, tt.age)

View File

@ -2,6 +2,7 @@ package prometheus
import ( import (
"bytes" "bytes"
"time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/prometheus/common/expfmt" "github.com/prometheus/common/expfmt"
@ -53,7 +54,7 @@ func (s *Serializer) Serialize(metric telegraf.Metric) ([]byte, error) {
func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) { func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
coll := NewCollection(s.config) coll := NewCollection(s.config)
for _, metric := range metrics { for _, metric := range metrics {
coll.Add(metric) coll.Add(metric, time.Now())
} }
var buf bytes.Buffer var buf bytes.Buffer