diff --git a/accumulator.go b/accumulator.go index 13fd6e571..370f0c70c 100644 --- a/accumulator.go +++ b/accumulator.go @@ -28,6 +28,18 @@ type Accumulator interface { tags map[string]string, t ...time.Time) + // AddSummary is the same as AddFields, but will add the metric as a "Summary" type + AddSummary(measurement string, + fields map[string]interface{}, + tags map[string]string, + t ...time.Time) + + // AddHistogram is the same as AddFields, but will add the metric as a "Histogram" type + AddHistogram(measurement string, + fields map[string]interface{}, + tags map[string]string, + t ...time.Time) + SetPrecision(precision, interval time.Duration) AddError(err error) diff --git a/agent/accumulator.go b/agent/accumulator.go index 1f9e2270d..1fa9b13ee 100644 --- a/agent/accumulator.go +++ b/agent/accumulator.go @@ -76,6 +76,28 @@ func (ac *accumulator) AddCounter( } } +func (ac *accumulator) AddSummary( + measurement string, + fields map[string]interface{}, + tags map[string]string, + t ...time.Time, +) { + if m := ac.maker.MakeMetric(measurement, fields, tags, telegraf.Summary, ac.getTime(t)); m != nil { + ac.metrics <- m + } +} + +func (ac *accumulator) AddHistogram( + measurement string, + fields map[string]interface{}, + tags map[string]string, + t ...time.Time, +) { + if m := ac.maker.MakeMetric(measurement, fields, tags, telegraf.Histogram, ac.getTime(t)); m != nil { + ac.metrics <- m + } +} + // AddError passes a runtime error to the accumulator. // The error will be tagged with the plugin name and written to the log. func (ac *accumulator) AddError(err error) { diff --git a/metric.go b/metric.go index fc479b51d..3fb531358 100644 --- a/metric.go +++ b/metric.go @@ -13,6 +13,8 @@ const ( Counter Gauge Untyped + Summary + Histogram ) type Metric interface { diff --git a/plugins/inputs/prometheus/parser.go b/plugins/inputs/prometheus/parser.go index 0807d7e7a..6584fbc05 100644 --- a/plugins/inputs/prometheus/parser.go +++ b/plugins/inputs/prometheus/parser.go @@ -103,6 +103,10 @@ func valueType(mt dto.MetricType) telegraf.ValueType { return telegraf.Counter case dto.MetricType_GAUGE: return telegraf.Gauge + case dto.MetricType_SUMMARY: + return telegraf.Summary + case dto.MetricType_HISTOGRAM: + return telegraf.Histogram default: return telegraf.Untyped } @@ -145,11 +149,11 @@ func getNameAndValue(m *dto.Metric) map[string]interface{} { fields["gauge"] = float64(m.GetGauge().GetValue()) } } else if m.Counter != nil { - if !math.IsNaN(m.GetGauge().GetValue()) { + if !math.IsNaN(m.GetCounter().GetValue()) { fields["counter"] = float64(m.GetCounter().GetValue()) } } else if m.Untyped != nil { - if !math.IsNaN(m.GetGauge().GetValue()) { + if !math.IsNaN(m.GetUntyped().GetValue()) { fields["value"] = float64(m.GetUntyped().GetValue()) } } diff --git a/plugins/inputs/prometheus/prometheus.go b/plugins/inputs/prometheus/prometheus.go index 5445a12a3..c929a5b26 100644 --- a/plugins/inputs/prometheus/prometheus.go +++ b/plugins/inputs/prometheus/prometheus.go @@ -224,6 +224,10 @@ func (p *Prometheus) gatherURL(url UrlAndAddress, acc telegraf.Accumulator) erro acc.AddCounter(metric.Name(), metric.Fields(), tags, metric.Time()) case telegraf.Gauge: acc.AddGauge(metric.Name(), metric.Fields(), tags, metric.Time()) + case telegraf.Summary: + acc.AddSummary(metric.Name(), metric.Fields(), tags, metric.Time()) + case telegraf.Histogram: + acc.AddHistogram(metric.Name(), metric.Fields(), tags, metric.Time()) default: acc.AddFields(metric.Name(), metric.Fields(), tags, metric.Time()) } diff --git a/plugins/outputs/prometheus_client/prometheus_client.go b/plugins/outputs/prometheus_client/prometheus_client.go index d7702a062..f0b0a7673 100644 --- a/plugins/outputs/prometheus_client/prometheus_client.go +++ b/plugins/outputs/prometheus_client/prometheus_client.go @@ -8,6 +8,7 @@ import ( "os" "regexp" "sort" + "strconv" "strings" "sync" "time" @@ -28,8 +29,13 @@ type SampleID string type Sample struct { // Labels are the Prometheus labels. Labels map[string]string - // Value is the value in the Prometheus output. - Value float64 + // Value is the value in the Prometheus output. Only one of these will populated. + Value float64 + HistogramValue map[float64]uint64 + SummaryValue map[float64]float64 + // Histograms and Summaries need a count and a sum + Count uint64 + Sum float64 // Expiration is the deadline that this Sample is valid until. Expiration time.Time } @@ -38,8 +44,9 @@ type Sample struct { type MetricFamily struct { // Samples are the Sample belonging to this MetricFamily. Samples map[SampleID]*Sample - // Type of the Value. - ValueType prometheus.ValueType + // Need the telegraf ValueType because there isn't a Prometheus ValueType + // representing Histogram or Summary + TelegrafValueType telegraf.ValueType // LabelSet is the label counts for all Samples. LabelSet map[string]int } @@ -189,7 +196,16 @@ func (p *PrometheusClient) Collect(ch chan<- prometheus.Metric) { labels = append(labels, v) } - metric, err := prometheus.NewConstMetric(desc, family.ValueType, sample.Value, labels...) + var metric prometheus.Metric + var err error + switch family.TelegrafValueType { + case telegraf.Summary: + metric, err = prometheus.NewConstSummary(desc, sample.Count, sample.Sum, sample.SummaryValue, labels...) + case telegraf.Histogram: + metric, err = prometheus.NewConstHistogram(desc, sample.Count, sample.Sum, sample.HistogramValue, labels...) + default: + metric, err = prometheus.NewConstMetric(desc, getPromValueType(family.TelegrafValueType), sample.Value, labels...) + } if err != nil { log.Printf("E! Error creating prometheus metric, "+ "key: %s, labels: %v,\nerr: %s\n", @@ -205,7 +221,7 @@ func sanitize(value string) string { return invalidNameCharRE.ReplaceAllString(value, "_") } -func valueType(tt telegraf.ValueType) prometheus.ValueType { +func getPromValueType(tt telegraf.ValueType) prometheus.ValueType { switch tt { case telegraf.Counter: return prometheus.CounterValue @@ -226,6 +242,30 @@ func CreateSampleID(tags map[string]string) SampleID { return SampleID(strings.Join(pairs, ",")) } +func addSample(fam *MetricFamily, sample *Sample, sampleID SampleID) { + + for k, _ := range sample.Labels { + fam.LabelSet[k]++ + } + + fam.Samples[sampleID] = sample +} + +func (p *PrometheusClient) addMetricFamily(point telegraf.Metric, sample *Sample, mname string, sampleID SampleID) { + var fam *MetricFamily + var ok bool + if fam, ok = p.fam[mname]; !ok { + fam = &MetricFamily{ + Samples: make(map[SampleID]*Sample), + TelegrafValueType: point.Type(), + LabelSet: make(map[string]int), + } + p.fam[mname] = fam + } + + addSample(fam, sample, sampleID) +} + func (p *PrometheusClient) Write(metrics []telegraf.Metric) error { p.Lock() defer p.Unlock() @@ -234,7 +274,6 @@ func (p *PrometheusClient) Write(metrics []telegraf.Metric) error { for _, point := range metrics { tags := point.Tags() - vt := valueType(point.Type()) sampleID := CreateSampleID(tags) labels := make(map[string]string) @@ -251,77 +290,128 @@ func (p *PrometheusClient) Write(metrics []telegraf.Metric) error { } } - for fn, fv := range point.Fields() { - // Ignore string and bool fields. - var value float64 - switch fv := fv.(type) { - case int64: - value = float64(fv) - case float64: - value = fv - default: - continue - } - - sample := &Sample{ - Labels: labels, - Value: value, - Expiration: now.Add(p.ExpirationInterval.Duration), - } - - // Special handling of value field; supports passthrough from - // the prometheus input. + switch point.Type() { + case telegraf.Summary: var mname string - switch point.Type() { - case telegraf.Counter: - if fn == "counter" { - mname = sanitize(point.Name()) - } - case telegraf.Gauge: - if fn == "gauge" { - mname = sanitize(point.Name()) - } - } - if mname == "" { - if fn == "value" { - mname = sanitize(point.Name()) - } else { - mname = sanitize(fmt.Sprintf("%s_%s", point.Name(), fn)) - } - } - - var fam *MetricFamily - var ok bool - if fam, ok = p.fam[mname]; !ok { - fam = &MetricFamily{ - Samples: make(map[SampleID]*Sample), - ValueType: vt, - LabelSet: make(map[string]int), - } - p.fam[mname] = fam - } else { - // Metrics can be untyped even though the corresponding plugin - // creates them with a type. This happens when the metric was - // transferred over the network in a format that does not - // preserve value type and received using an input such as a - // queue consumer. To avoid issues we automatically upgrade - // value type from untyped to a typed metric. - if fam.ValueType == prometheus.UntypedValue { - fam.ValueType = vt + var sum float64 + var count uint64 + summaryvalue := make(map[float64]float64) + for fn, fv := range point.Fields() { + var value float64 + switch fv := fv.(type) { + case int64: + value = float64(fv) + case float64: + value = fv + default: + continue } - if vt != prometheus.UntypedValue && fam.ValueType != vt { - // Don't return an error since this would be a permanent error - log.Printf("Mixed ValueType for measurement %q; dropping point", point.Name()) - break + switch fn { + case "sum": + sum = value + case "count": + count = uint64(value) + default: + limit, err := strconv.ParseFloat(fn, 64) + if err == nil { + summaryvalue[limit] = value + } } } - - for k, _ := range sample.Labels { - fam.LabelSet[k]++ + sample := &Sample{ + Labels: labels, + SummaryValue: summaryvalue, + Count: count, + Sum: sum, + Expiration: now.Add(p.ExpirationInterval.Duration), } + mname = sanitize(point.Name()) - fam.Samples[sampleID] = sample + p.addMetricFamily(point, sample, mname, sampleID) + + case telegraf.Histogram: + var mname string + var sum float64 + var count uint64 + histogramvalue := make(map[float64]uint64) + for fn, fv := range point.Fields() { + var value float64 + switch fv := fv.(type) { + case int64: + value = float64(fv) + case float64: + value = fv + default: + continue + } + + switch fn { + case "sum": + sum = value + case "count": + count = uint64(value) + default: + limit, err := strconv.ParseFloat(fn, 64) + if err == nil { + histogramvalue[limit] = uint64(value) + } + } + } + sample := &Sample{ + Labels: labels, + HistogramValue: histogramvalue, + Count: count, + Sum: sum, + Expiration: now.Add(p.ExpirationInterval.Duration), + } + mname = sanitize(point.Name()) + + p.addMetricFamily(point, sample, mname, sampleID) + + default: + for fn, fv := range point.Fields() { + // Ignore string and bool fields. + var value float64 + switch fv := fv.(type) { + case int64: + value = float64(fv) + case float64: + value = fv + default: + continue + } + + sample := &Sample{ + Labels: labels, + Value: value, + Expiration: now.Add(p.ExpirationInterval.Duration), + } + + // Special handling of value field; supports passthrough from + // the prometheus input. + var mname string + switch point.Type() { + case telegraf.Counter: + if fn == "counter" { + mname = sanitize(point.Name()) + } + case telegraf.Gauge: + if fn == "gauge" { + mname = sanitize(point.Name()) + } + } + if mname == "" { + if fn == "value" { + mname = sanitize(point.Name()) + } else { + mname = sanitize(fmt.Sprintf("%s_%s", point.Name(), fn)) + } + } + + p.addMetricFamily(point, sample, mname, sampleID) + + } } } return nil diff --git a/plugins/outputs/prometheus_client/prometheus_client_test.go b/plugins/outputs/prometheus_client/prometheus_client_test.go index 1bb1cc83a..69509ae1c 100644 --- a/plugins/outputs/prometheus_client/prometheus_client_test.go +++ b/plugins/outputs/prometheus_client/prometheus_client_test.go @@ -9,7 +9,6 @@ import ( "github.com/influxdata/telegraf/metric" prometheus_input "github.com/influxdata/telegraf/plugins/inputs/prometheus" "github.com/influxdata/telegraf/testutil" - "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" ) @@ -45,7 +44,7 @@ func TestWrite_Basic(t *testing.T) { fam, ok := client.fam["foo"] require.True(t, ok) - require.Equal(t, prometheus.UntypedValue, fam.ValueType) + require.Equal(t, telegraf.Untyped, fam.TelegrafValueType) require.Equal(t, map[string]int{}, fam.LabelSet) sample, ok := fam.Samples[CreateSampleID(pt1.Tags())] @@ -119,7 +118,7 @@ func TestWrite_Counters(t *testing.T) { args args err error metricName string - promType prometheus.ValueType + valueType telegraf.ValueType }{ { name: "field named value is not added to metric name", @@ -129,7 +128,7 @@ func TestWrite_Counters(t *testing.T) { valueType: telegraf.Counter, }, metricName: "foo", - promType: prometheus.CounterValue, + valueType: telegraf.Counter, }, { name: "field named counter is not added to metric name", @@ -139,7 +138,7 @@ func TestWrite_Counters(t *testing.T) { valueType: telegraf.Counter, }, metricName: "foo", - promType: prometheus.CounterValue, + valueType: telegraf.Counter, }, { name: "field with any other name is added to metric name", @@ -149,7 +148,7 @@ func TestWrite_Counters(t *testing.T) { valueType: telegraf.Counter, }, metricName: "foo_other", - promType: prometheus.CounterValue, + valueType: telegraf.Counter, }, } for _, tt := range tests { @@ -167,7 +166,7 @@ func TestWrite_Counters(t *testing.T) { fam, ok := client.fam[tt.metricName] require.True(t, ok) - require.Equal(t, tt.promType, fam.ValueType) + require.Equal(t, tt.valueType, fam.TelegrafValueType) }) } } @@ -196,20 +195,119 @@ func TestWrite_Sanitize(t *testing.T) { } func TestWrite_Gauge(t *testing.T) { + type args struct { + measurement string + tags map[string]string + fields map[string]interface{} + valueType telegraf.ValueType + } + var tests = []struct { + name string + args args + err error + metricName string + valueType telegraf.ValueType + }{ + { + name: "field named value is not added to metric name", + args: args{ + measurement: "foo", + fields: map[string]interface{}{"value": 42}, + valueType: telegraf.Gauge, + }, + metricName: "foo", + valueType: telegraf.Gauge, + }, + { + name: "field named gauge is not added to metric name", + args: args{ + measurement: "foo", + fields: map[string]interface{}{"gauge": 42}, + valueType: telegraf.Gauge, + }, + metricName: "foo", + valueType: telegraf.Gauge, + }, + { + name: "field with any other name is added to metric name", + args: args{ + measurement: "foo", + fields: map[string]interface{}{"other": 42}, + valueType: telegraf.Gauge, + }, + metricName: "foo_other", + valueType: telegraf.Gauge, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m, err := metric.New( + tt.args.measurement, + tt.args.tags, + tt.args.fields, + time.Now(), + tt.args.valueType, + ) + client := NewClient() + err = client.Write([]telegraf.Metric{m}) + require.Equal(t, tt.err, err) + + fam, ok := client.fam[tt.metricName] + require.True(t, ok) + require.Equal(t, tt.valueType, fam.TelegrafValueType) + + }) + } +} + +func TestWrite_Summary(t *testing.T) { client := NewClient() p1, err := metric.New( "foo", make(map[string]string), - map[string]interface{}{"value": 42}, + map[string]interface{}{"sum": 84, "count": 42, "0": 2, "0.5": 3, "1": 4}, time.Now(), - telegraf.Gauge) + telegraf.Summary) + err = client.Write([]telegraf.Metric{p1}) require.NoError(t, err) fam, ok := client.fam["foo"] require.True(t, ok) - require.Equal(t, prometheus.GaugeValue, fam.ValueType) + require.Equal(t, 1, len(fam.Samples)) + + sample1, ok := fam.Samples[CreateSampleID(p1.Tags())] + require.True(t, ok) + + require.Equal(t, 84.0, sample1.Sum) + require.Equal(t, uint64(42), sample1.Count) + require.Equal(t, 3, len(sample1.SummaryValue)) +} + +func TestWrite_Histogram(t *testing.T) { + client := NewClient() + + p1, err := metric.New( + "foo", + make(map[string]string), + map[string]interface{}{"sum": 84, "count": 42, "0": 2, "0.5": 3, "1": 4}, + time.Now(), + telegraf.Histogram) + + err = client.Write([]telegraf.Metric{p1}) + require.NoError(t, err) + + fam, ok := client.fam["foo"] + require.True(t, ok) + require.Equal(t, 1, len(fam.Samples)) + + sample1, ok := fam.Samples[CreateSampleID(p1.Tags())] + require.True(t, ok) + + require.Equal(t, 84.0, sample1.Sum) + require.Equal(t, uint64(42), sample1.Count) + require.Equal(t, 3, len(sample1.HistogramValue)) } func TestWrite_MixedValueType(t *testing.T) { @@ -307,7 +405,7 @@ func TestWrite_Tags(t *testing.T) { fam, ok := client.fam["foo"] require.True(t, ok) - require.Equal(t, prometheus.UntypedValue, fam.ValueType) + require.Equal(t, telegraf.Untyped, fam.TelegrafValueType) require.Equal(t, map[string]int{"host": 1}, fam.LabelSet) diff --git a/testutil/accumulator.go b/testutil/accumulator.go index c478400eb..29c362c87 100644 --- a/testutil/accumulator.go +++ b/testutil/accumulator.go @@ -122,6 +122,24 @@ func (a *Accumulator) AddMetrics(metrics []telegraf.Metric) { } } +func (a *Accumulator) AddSummary( + measurement string, + fields map[string]interface{}, + tags map[string]string, + timestamp ...time.Time, +) { + a.AddFields(measurement, fields, tags, timestamp...) +} + +func (a *Accumulator) AddHistogram( + measurement string, + fields map[string]interface{}, + tags map[string]string, + timestamp ...time.Time, +) { + a.AddFields(measurement, fields, tags, timestamp...) +} + // AddError appends the given error to Accumulator.Errors. func (a *Accumulator) AddError(err error) { if err == nil {