Add history and summary types to telegraf and prometheus plugins (#3337)

This commit is contained in:
Jeremy Doupe 2017-10-24 18:28:52 -05:00 committed by Daniel Nelson
parent 13c1f1524a
commit a6797a44d5
8 changed files with 334 additions and 84 deletions

View File

@ -28,6 +28,18 @@ type Accumulator interface {
tags map[string]string, tags map[string]string,
t ...time.Time) t ...time.Time)
// AddSummary is the same as AddFields, but will add the metric as a "Summary" type
AddSummary(measurement string,
fields map[string]interface{},
tags map[string]string,
t ...time.Time)
// AddHistogram is the same as AddFields, but will add the metric as a "Histogram" type
AddHistogram(measurement string,
fields map[string]interface{},
tags map[string]string,
t ...time.Time)
SetPrecision(precision, interval time.Duration) SetPrecision(precision, interval time.Duration)
AddError(err error) AddError(err error)

View File

@ -76,6 +76,28 @@ func (ac *accumulator) AddCounter(
} }
} }
func (ac *accumulator) AddSummary(
measurement string,
fields map[string]interface{},
tags map[string]string,
t ...time.Time,
) {
if m := ac.maker.MakeMetric(measurement, fields, tags, telegraf.Summary, ac.getTime(t)); m != nil {
ac.metrics <- m
}
}
func (ac *accumulator) AddHistogram(
measurement string,
fields map[string]interface{},
tags map[string]string,
t ...time.Time,
) {
if m := ac.maker.MakeMetric(measurement, fields, tags, telegraf.Histogram, ac.getTime(t)); m != nil {
ac.metrics <- m
}
}
// AddError passes a runtime error to the accumulator. // AddError passes a runtime error to the accumulator.
// The error will be tagged with the plugin name and written to the log. // The error will be tagged with the plugin name and written to the log.
func (ac *accumulator) AddError(err error) { func (ac *accumulator) AddError(err error) {

View File

@ -13,6 +13,8 @@ const (
Counter Counter
Gauge Gauge
Untyped Untyped
Summary
Histogram
) )
type Metric interface { type Metric interface {

View File

@ -103,6 +103,10 @@ func valueType(mt dto.MetricType) telegraf.ValueType {
return telegraf.Counter return telegraf.Counter
case dto.MetricType_GAUGE: case dto.MetricType_GAUGE:
return telegraf.Gauge return telegraf.Gauge
case dto.MetricType_SUMMARY:
return telegraf.Summary
case dto.MetricType_HISTOGRAM:
return telegraf.Histogram
default: default:
return telegraf.Untyped return telegraf.Untyped
} }
@ -145,11 +149,11 @@ func getNameAndValue(m *dto.Metric) map[string]interface{} {
fields["gauge"] = float64(m.GetGauge().GetValue()) fields["gauge"] = float64(m.GetGauge().GetValue())
} }
} else if m.Counter != nil { } else if m.Counter != nil {
if !math.IsNaN(m.GetGauge().GetValue()) { if !math.IsNaN(m.GetCounter().GetValue()) {
fields["counter"] = float64(m.GetCounter().GetValue()) fields["counter"] = float64(m.GetCounter().GetValue())
} }
} else if m.Untyped != nil { } else if m.Untyped != nil {
if !math.IsNaN(m.GetGauge().GetValue()) { if !math.IsNaN(m.GetUntyped().GetValue()) {
fields["value"] = float64(m.GetUntyped().GetValue()) fields["value"] = float64(m.GetUntyped().GetValue())
} }
} }

View File

@ -224,6 +224,10 @@ func (p *Prometheus) gatherURL(url UrlAndAddress, acc telegraf.Accumulator) erro
acc.AddCounter(metric.Name(), metric.Fields(), tags, metric.Time()) acc.AddCounter(metric.Name(), metric.Fields(), tags, metric.Time())
case telegraf.Gauge: case telegraf.Gauge:
acc.AddGauge(metric.Name(), metric.Fields(), tags, metric.Time()) acc.AddGauge(metric.Name(), metric.Fields(), tags, metric.Time())
case telegraf.Summary:
acc.AddSummary(metric.Name(), metric.Fields(), tags, metric.Time())
case telegraf.Histogram:
acc.AddHistogram(metric.Name(), metric.Fields(), tags, metric.Time())
default: default:
acc.AddFields(metric.Name(), metric.Fields(), tags, metric.Time()) acc.AddFields(metric.Name(), metric.Fields(), tags, metric.Time())
} }

View File

@ -8,6 +8,7 @@ import (
"os" "os"
"regexp" "regexp"
"sort" "sort"
"strconv"
"strings" "strings"
"sync" "sync"
"time" "time"
@ -28,8 +29,13 @@ type SampleID string
type Sample struct { type Sample struct {
// Labels are the Prometheus labels. // Labels are the Prometheus labels.
Labels map[string]string Labels map[string]string
// Value is the value in the Prometheus output. // Value is the value in the Prometheus output. Only one of these will populated.
Value float64 Value float64
HistogramValue map[float64]uint64
SummaryValue map[float64]float64
// Histograms and Summaries need a count and a sum
Count uint64
Sum float64
// Expiration is the deadline that this Sample is valid until. // Expiration is the deadline that this Sample is valid until.
Expiration time.Time Expiration time.Time
} }
@ -38,8 +44,9 @@ type Sample struct {
type MetricFamily struct { type MetricFamily struct {
// Samples are the Sample belonging to this MetricFamily. // Samples are the Sample belonging to this MetricFamily.
Samples map[SampleID]*Sample Samples map[SampleID]*Sample
// Type of the Value. // Need the telegraf ValueType because there isn't a Prometheus ValueType
ValueType prometheus.ValueType // representing Histogram or Summary
TelegrafValueType telegraf.ValueType
// LabelSet is the label counts for all Samples. // LabelSet is the label counts for all Samples.
LabelSet map[string]int LabelSet map[string]int
} }
@ -189,7 +196,16 @@ func (p *PrometheusClient) Collect(ch chan<- prometheus.Metric) {
labels = append(labels, v) labels = append(labels, v)
} }
metric, err := prometheus.NewConstMetric(desc, family.ValueType, sample.Value, labels...) var metric prometheus.Metric
var err error
switch family.TelegrafValueType {
case telegraf.Summary:
metric, err = prometheus.NewConstSummary(desc, sample.Count, sample.Sum, sample.SummaryValue, labels...)
case telegraf.Histogram:
metric, err = prometheus.NewConstHistogram(desc, sample.Count, sample.Sum, sample.HistogramValue, labels...)
default:
metric, err = prometheus.NewConstMetric(desc, getPromValueType(family.TelegrafValueType), sample.Value, labels...)
}
if err != nil { if err != nil {
log.Printf("E! Error creating prometheus metric, "+ log.Printf("E! Error creating prometheus metric, "+
"key: %s, labels: %v,\nerr: %s\n", "key: %s, labels: %v,\nerr: %s\n",
@ -205,7 +221,7 @@ func sanitize(value string) string {
return invalidNameCharRE.ReplaceAllString(value, "_") return invalidNameCharRE.ReplaceAllString(value, "_")
} }
func valueType(tt telegraf.ValueType) prometheus.ValueType { func getPromValueType(tt telegraf.ValueType) prometheus.ValueType {
switch tt { switch tt {
case telegraf.Counter: case telegraf.Counter:
return prometheus.CounterValue return prometheus.CounterValue
@ -226,6 +242,30 @@ func CreateSampleID(tags map[string]string) SampleID {
return SampleID(strings.Join(pairs, ",")) return SampleID(strings.Join(pairs, ","))
} }
func addSample(fam *MetricFamily, sample *Sample, sampleID SampleID) {
for k, _ := range sample.Labels {
fam.LabelSet[k]++
}
fam.Samples[sampleID] = sample
}
func (p *PrometheusClient) addMetricFamily(point telegraf.Metric, sample *Sample, mname string, sampleID SampleID) {
var fam *MetricFamily
var ok bool
if fam, ok = p.fam[mname]; !ok {
fam = &MetricFamily{
Samples: make(map[SampleID]*Sample),
TelegrafValueType: point.Type(),
LabelSet: make(map[string]int),
}
p.fam[mname] = fam
}
addSample(fam, sample, sampleID)
}
func (p *PrometheusClient) Write(metrics []telegraf.Metric) error { func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
p.Lock() p.Lock()
defer p.Unlock() defer p.Unlock()
@ -234,7 +274,6 @@ func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
for _, point := range metrics { for _, point := range metrics {
tags := point.Tags() tags := point.Tags()
vt := valueType(point.Type())
sampleID := CreateSampleID(tags) sampleID := CreateSampleID(tags)
labels := make(map[string]string) labels := make(map[string]string)
@ -251,77 +290,128 @@ func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
} }
} }
for fn, fv := range point.Fields() { switch point.Type() {
// Ignore string and bool fields. case telegraf.Summary:
var value float64
switch fv := fv.(type) {
case int64:
value = float64(fv)
case float64:
value = fv
default:
continue
}
sample := &Sample{
Labels: labels,
Value: value,
Expiration: now.Add(p.ExpirationInterval.Duration),
}
// Special handling of value field; supports passthrough from
// the prometheus input.
var mname string var mname string
switch point.Type() { var sum float64
case telegraf.Counter: var count uint64
if fn == "counter" { summaryvalue := make(map[float64]float64)
mname = sanitize(point.Name()) for fn, fv := range point.Fields() {
} var value float64
case telegraf.Gauge: switch fv := fv.(type) {
if fn == "gauge" { case int64:
mname = sanitize(point.Name()) value = float64(fv)
} case float64:
} value = fv
if mname == "" { default:
if fn == "value" { continue
mname = sanitize(point.Name())
} else {
mname = sanitize(fmt.Sprintf("%s_%s", point.Name(), fn))
}
}
var fam *MetricFamily
var ok bool
if fam, ok = p.fam[mname]; !ok {
fam = &MetricFamily{
Samples: make(map[SampleID]*Sample),
ValueType: vt,
LabelSet: make(map[string]int),
}
p.fam[mname] = fam
} else {
// Metrics can be untyped even though the corresponding plugin
// creates them with a type. This happens when the metric was
// transferred over the network in a format that does not
// preserve value type and received using an input such as a
// queue consumer. To avoid issues we automatically upgrade
// value type from untyped to a typed metric.
if fam.ValueType == prometheus.UntypedValue {
fam.ValueType = vt
} }
if vt != prometheus.UntypedValue && fam.ValueType != vt { switch fn {
// Don't return an error since this would be a permanent error case "sum":
log.Printf("Mixed ValueType for measurement %q; dropping point", point.Name()) sum = value
break case "count":
count = uint64(value)
default:
limit, err := strconv.ParseFloat(fn, 64)
if err == nil {
summaryvalue[limit] = value
}
} }
} }
sample := &Sample{
for k, _ := range sample.Labels { Labels: labels,
fam.LabelSet[k]++ SummaryValue: summaryvalue,
Count: count,
Sum: sum,
Expiration: now.Add(p.ExpirationInterval.Duration),
} }
mname = sanitize(point.Name())
fam.Samples[sampleID] = sample p.addMetricFamily(point, sample, mname, sampleID)
case telegraf.Histogram:
var mname string
var sum float64
var count uint64
histogramvalue := make(map[float64]uint64)
for fn, fv := range point.Fields() {
var value float64
switch fv := fv.(type) {
case int64:
value = float64(fv)
case float64:
value = fv
default:
continue
}
switch fn {
case "sum":
sum = value
case "count":
count = uint64(value)
default:
limit, err := strconv.ParseFloat(fn, 64)
if err == nil {
histogramvalue[limit] = uint64(value)
}
}
}
sample := &Sample{
Labels: labels,
HistogramValue: histogramvalue,
Count: count,
Sum: sum,
Expiration: now.Add(p.ExpirationInterval.Duration),
}
mname = sanitize(point.Name())
p.addMetricFamily(point, sample, mname, sampleID)
default:
for fn, fv := range point.Fields() {
// Ignore string and bool fields.
var value float64
switch fv := fv.(type) {
case int64:
value = float64(fv)
case float64:
value = fv
default:
continue
}
sample := &Sample{
Labels: labels,
Value: value,
Expiration: now.Add(p.ExpirationInterval.Duration),
}
// Special handling of value field; supports passthrough from
// the prometheus input.
var mname string
switch point.Type() {
case telegraf.Counter:
if fn == "counter" {
mname = sanitize(point.Name())
}
case telegraf.Gauge:
if fn == "gauge" {
mname = sanitize(point.Name())
}
}
if mname == "" {
if fn == "value" {
mname = sanitize(point.Name())
} else {
mname = sanitize(fmt.Sprintf("%s_%s", point.Name(), fn))
}
}
p.addMetricFamily(point, sample, mname, sampleID)
}
} }
} }
return nil return nil

View File

@ -9,7 +9,6 @@ import (
"github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/metric"
prometheus_input "github.com/influxdata/telegraf/plugins/inputs/prometheus" prometheus_input "github.com/influxdata/telegraf/plugins/inputs/prometheus"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -45,7 +44,7 @@ func TestWrite_Basic(t *testing.T) {
fam, ok := client.fam["foo"] fam, ok := client.fam["foo"]
require.True(t, ok) require.True(t, ok)
require.Equal(t, prometheus.UntypedValue, fam.ValueType) require.Equal(t, telegraf.Untyped, fam.TelegrafValueType)
require.Equal(t, map[string]int{}, fam.LabelSet) require.Equal(t, map[string]int{}, fam.LabelSet)
sample, ok := fam.Samples[CreateSampleID(pt1.Tags())] sample, ok := fam.Samples[CreateSampleID(pt1.Tags())]
@ -119,7 +118,7 @@ func TestWrite_Counters(t *testing.T) {
args args args args
err error err error
metricName string metricName string
promType prometheus.ValueType valueType telegraf.ValueType
}{ }{
{ {
name: "field named value is not added to metric name", name: "field named value is not added to metric name",
@ -129,7 +128,7 @@ func TestWrite_Counters(t *testing.T) {
valueType: telegraf.Counter, valueType: telegraf.Counter,
}, },
metricName: "foo", metricName: "foo",
promType: prometheus.CounterValue, valueType: telegraf.Counter,
}, },
{ {
name: "field named counter is not added to metric name", name: "field named counter is not added to metric name",
@ -139,7 +138,7 @@ func TestWrite_Counters(t *testing.T) {
valueType: telegraf.Counter, valueType: telegraf.Counter,
}, },
metricName: "foo", metricName: "foo",
promType: prometheus.CounterValue, valueType: telegraf.Counter,
}, },
{ {
name: "field with any other name is added to metric name", name: "field with any other name is added to metric name",
@ -149,7 +148,7 @@ func TestWrite_Counters(t *testing.T) {
valueType: telegraf.Counter, valueType: telegraf.Counter,
}, },
metricName: "foo_other", metricName: "foo_other",
promType: prometheus.CounterValue, valueType: telegraf.Counter,
}, },
} }
for _, tt := range tests { for _, tt := range tests {
@ -167,7 +166,7 @@ func TestWrite_Counters(t *testing.T) {
fam, ok := client.fam[tt.metricName] fam, ok := client.fam[tt.metricName]
require.True(t, ok) require.True(t, ok)
require.Equal(t, tt.promType, fam.ValueType) require.Equal(t, tt.valueType, fam.TelegrafValueType)
}) })
} }
} }
@ -196,20 +195,119 @@ func TestWrite_Sanitize(t *testing.T) {
} }
func TestWrite_Gauge(t *testing.T) { func TestWrite_Gauge(t *testing.T) {
type args struct {
measurement string
tags map[string]string
fields map[string]interface{}
valueType telegraf.ValueType
}
var tests = []struct {
name string
args args
err error
metricName string
valueType telegraf.ValueType
}{
{
name: "field named value is not added to metric name",
args: args{
measurement: "foo",
fields: map[string]interface{}{"value": 42},
valueType: telegraf.Gauge,
},
metricName: "foo",
valueType: telegraf.Gauge,
},
{
name: "field named gauge is not added to metric name",
args: args{
measurement: "foo",
fields: map[string]interface{}{"gauge": 42},
valueType: telegraf.Gauge,
},
metricName: "foo",
valueType: telegraf.Gauge,
},
{
name: "field with any other name is added to metric name",
args: args{
measurement: "foo",
fields: map[string]interface{}{"other": 42},
valueType: telegraf.Gauge,
},
metricName: "foo_other",
valueType: telegraf.Gauge,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m, err := metric.New(
tt.args.measurement,
tt.args.tags,
tt.args.fields,
time.Now(),
tt.args.valueType,
)
client := NewClient()
err = client.Write([]telegraf.Metric{m})
require.Equal(t, tt.err, err)
fam, ok := client.fam[tt.metricName]
require.True(t, ok)
require.Equal(t, tt.valueType, fam.TelegrafValueType)
})
}
}
func TestWrite_Summary(t *testing.T) {
client := NewClient() client := NewClient()
p1, err := metric.New( p1, err := metric.New(
"foo", "foo",
make(map[string]string), make(map[string]string),
map[string]interface{}{"value": 42}, map[string]interface{}{"sum": 84, "count": 42, "0": 2, "0.5": 3, "1": 4},
time.Now(), time.Now(),
telegraf.Gauge) telegraf.Summary)
err = client.Write([]telegraf.Metric{p1}) err = client.Write([]telegraf.Metric{p1})
require.NoError(t, err) require.NoError(t, err)
fam, ok := client.fam["foo"] fam, ok := client.fam["foo"]
require.True(t, ok) require.True(t, ok)
require.Equal(t, prometheus.GaugeValue, fam.ValueType) require.Equal(t, 1, len(fam.Samples))
sample1, ok := fam.Samples[CreateSampleID(p1.Tags())]
require.True(t, ok)
require.Equal(t, 84.0, sample1.Sum)
require.Equal(t, uint64(42), sample1.Count)
require.Equal(t, 3, len(sample1.SummaryValue))
}
func TestWrite_Histogram(t *testing.T) {
client := NewClient()
p1, err := metric.New(
"foo",
make(map[string]string),
map[string]interface{}{"sum": 84, "count": 42, "0": 2, "0.5": 3, "1": 4},
time.Now(),
telegraf.Histogram)
err = client.Write([]telegraf.Metric{p1})
require.NoError(t, err)
fam, ok := client.fam["foo"]
require.True(t, ok)
require.Equal(t, 1, len(fam.Samples))
sample1, ok := fam.Samples[CreateSampleID(p1.Tags())]
require.True(t, ok)
require.Equal(t, 84.0, sample1.Sum)
require.Equal(t, uint64(42), sample1.Count)
require.Equal(t, 3, len(sample1.HistogramValue))
} }
func TestWrite_MixedValueType(t *testing.T) { func TestWrite_MixedValueType(t *testing.T) {
@ -307,7 +405,7 @@ func TestWrite_Tags(t *testing.T) {
fam, ok := client.fam["foo"] fam, ok := client.fam["foo"]
require.True(t, ok) require.True(t, ok)
require.Equal(t, prometheus.UntypedValue, fam.ValueType) require.Equal(t, telegraf.Untyped, fam.TelegrafValueType)
require.Equal(t, map[string]int{"host": 1}, fam.LabelSet) require.Equal(t, map[string]int{"host": 1}, fam.LabelSet)

View File

@ -122,6 +122,24 @@ func (a *Accumulator) AddMetrics(metrics []telegraf.Metric) {
} }
} }
func (a *Accumulator) AddSummary(
measurement string,
fields map[string]interface{},
tags map[string]string,
timestamp ...time.Time,
) {
a.AddFields(measurement, fields, tags, timestamp...)
}
func (a *Accumulator) AddHistogram(
measurement string,
fields map[string]interface{},
tags map[string]string,
timestamp ...time.Time,
) {
a.AddFields(measurement, fields, tags, timestamp...)
}
// AddError appends the given error to Accumulator.Errors. // AddError appends the given error to Accumulator.Errors.
func (a *Accumulator) AddError(err error) { func (a *Accumulator) AddError(err error) {
if err == nil { if err == nil {