From 01cfe1d5057023fecb896a4bc1619a4dd5c018e9 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Wed, 18 Oct 2017 14:51:08 -0700 Subject: [PATCH] Fix prometheus passthrough for existing value types (#3351) --- plugins/inputs/prometheus/parser.go | 13 +++- plugins/inputs/prometheus/prometheus.go | 10 ++- .../prometheus_client/prometheus_client.go | 42 ++++++++-- .../prometheus_client_test.go | 76 +++++++++++++++---- 4 files changed, 120 insertions(+), 21 deletions(-) diff --git a/plugins/inputs/prometheus/parser.go b/plugins/inputs/prometheus/parser.go index ac5c608f6..0807d7e7a 100644 --- a/plugins/inputs/prometheus/parser.go +++ b/plugins/inputs/prometheus/parser.go @@ -86,7 +86,7 @@ func Parse(buf []byte, header http.Header) ([]telegraf.Metric, error) { } else { t = time.Now() } - metric, err := metric.New(metricName, tags, fields, t) + metric, err := metric.New(metricName, tags, fields, t, valueType(mf.GetType())) if err == nil { metrics = append(metrics, metric) } @@ -97,6 +97,17 @@ func Parse(buf []byte, header http.Header) ([]telegraf.Metric, error) { return metrics, err } +func valueType(mt dto.MetricType) telegraf.ValueType { + switch mt { + case dto.MetricType_COUNTER: + return telegraf.Counter + case dto.MetricType_GAUGE: + return telegraf.Gauge + default: + return telegraf.Untyped + } +} + // Get Quantiles from summary metric func makeQuantiles(m *dto.Metric) map[string]interface{} { fields := make(map[string]interface{}) diff --git a/plugins/inputs/prometheus/prometheus.go b/plugins/inputs/prometheus/prometheus.go index 7c3943bd0..5445a12a3 100644 --- a/plugins/inputs/prometheus/prometheus.go +++ b/plugins/inputs/prometheus/prometheus.go @@ -218,7 +218,15 @@ func (p *Prometheus) gatherURL(url UrlAndAddress, acc telegraf.Accumulator) erro if url.Address != "" { tags["address"] = url.Address } - acc.AddFields(metric.Name(), metric.Fields(), tags, metric.Time()) + + switch metric.Type() { + case telegraf.Counter: + acc.AddCounter(metric.Name(), metric.Fields(), tags, metric.Time()) + case telegraf.Gauge: + acc.AddGauge(metric.Name(), metric.Fields(), tags, metric.Time()) + default: + acc.AddFields(metric.Name(), metric.Fields(), tags, metric.Time()) + } } return nil diff --git a/plugins/outputs/prometheus_client/prometheus_client.go b/plugins/outputs/prometheus_client/prometheus_client.go index 92addf9c0..48b625513 100644 --- a/plugins/outputs/prometheus_client/prometheus_client.go +++ b/plugins/outputs/prometheus_client/prometheus_client.go @@ -5,6 +5,7 @@ import ( "fmt" "log" "net/http" + "os" "regexp" "sort" "strings" @@ -15,6 +16,7 @@ import ( "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/outputs" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" ) var invalidNameCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`) @@ -46,6 +48,7 @@ type PrometheusClient struct { Listen string ExpirationInterval internal.Duration `toml:"expiration_interval"` Path string `toml:"path"` + CollectorsExclude []string `toml:"collectors_exclude"` server *http.Server @@ -62,11 +65,26 @@ var sampleConfig = ` ## Interval to expire metrics and not deliver to prometheus, 0 == no expiration # expiration_interval = "60s" + + ## Collectors to enable, valid entries are "gocollector" and "process". + ## If unset, both are enabled. + collectors_exclude = ["gocollector", "process"] ` func (p *PrometheusClient) Start() error { prometheus.Register(p) + for _, collector := range p.CollectorsExclude { + switch collector { + case "gocollector": + prometheus.Unregister(prometheus.NewGoCollector()) + case "process": + prometheus.Unregister(prometheus.NewProcessCollector(os.Getpid(), "")) + default: + return fmt.Errorf("unrecognized collector %s", collector) + } + } + if p.Listen == "" { p.Listen = "localhost:9273" } @@ -76,7 +94,9 @@ func (p *PrometheusClient) Start() error { } mux := http.NewServeMux() - mux.Handle(p.Path, prometheus.Handler()) + mux.Handle(p.Path, promhttp.HandlerFor( + prometheus.DefaultGatherer, + promhttp.HandlerOpts{ErrorHandling: promhttp.ContinueOnError})) p.server = &http.Server{ Addr: p.Listen, @@ -243,10 +263,22 @@ func (p *PrometheusClient) Write(metrics []telegraf.Metric) error { // Special handling of value field; supports passthrough from // the prometheus input. var mname string - if fn == "value" { - mname = sanitize(point.Name()) - } else { - mname = sanitize(fmt.Sprintf("%s_%s", point.Name(), fn)) + switch point.Type() { + case telegraf.Counter: + if fn == "counter" { + mname = sanitize(point.Name()) + } + case telegraf.Gauge: + if fn == "gauge" { + mname = sanitize(point.Name()) + } + } + if mname == "" { + if fn == "value" { + mname = sanitize(point.Name()) + } else { + mname = sanitize(fmt.Sprintf("%s_%s", point.Name(), fn)) + } } var fam *MetricFamily diff --git a/plugins/outputs/prometheus_client/prometheus_client_test.go b/plugins/outputs/prometheus_client/prometheus_client_test.go index 767f9a878..a997c0401 100644 --- a/plugins/outputs/prometheus_client/prometheus_client_test.go +++ b/plugins/outputs/prometheus_client/prometheus_client_test.go @@ -107,21 +107,69 @@ func TestWrite_SkipNonNumberField(t *testing.T) { require.False(t, ok) } -func TestWrite_Counter(t *testing.T) { - client := NewClient() +func TestWrite_Counters(t *testing.T) { + type args struct { + measurement string + tags map[string]string + fields map[string]interface{} + valueType telegraf.ValueType + } + var tests = []struct { + name string + args args + err error + metricName string + promType prometheus.ValueType + }{ + { + name: "field named value is not added to metric name", + args: args{ + measurement: "foo", + fields: map[string]interface{}{"value": 42}, + valueType: telegraf.Counter, + }, + metricName: "foo", + promType: prometheus.CounterValue, + }, + { + name: "field named counter is not added to metric name", + args: args{ + measurement: "foo", + fields: map[string]interface{}{"counter": 42}, + valueType: telegraf.Counter, + }, + metricName: "foo", + promType: prometheus.CounterValue, + }, + { + name: "field with any other name is added to metric name", + args: args{ + measurement: "foo", + fields: map[string]interface{}{"other": 42}, + valueType: telegraf.Counter, + }, + metricName: "foo_other", + promType: prometheus.CounterValue, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m, err := metric.New( + tt.args.measurement, + tt.args.tags, + tt.args.fields, + time.Now(), + tt.args.valueType, + ) + client := NewClient() + err = client.Write([]telegraf.Metric{m}) + require.Equal(t, tt.err, err) - p1, err := metric.New( - "foo", - make(map[string]string), - map[string]interface{}{"value": 42}, - time.Now(), - telegraf.Counter) - err = client.Write([]telegraf.Metric{p1}) - require.NoError(t, err) - - fam, ok := client.fam["foo"] - require.True(t, ok) - require.Equal(t, prometheus.CounterValue, fam.ValueType) + fam, ok := client.fam[tt.metricName] + require.True(t, ok) + require.Equal(t, tt.promType, fam.ValueType) + }) + } } func TestWrite_Sanitize(t *testing.T) {