diff --git a/CHANGELOG.md b/CHANGELOG.md index 3f130fe60..0cb8d3349 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,4 @@ -## v1.0 +## v1.0 [unreleased] ### Release Notes @@ -42,6 +42,7 @@ should now look like: - [#1405](https://github.com/influxdata/telegraf/issues/1405): Fix memory/connection leak in prometheus input plugin. - [#1378](https://github.com/influxdata/telegraf/issues/1378): Trim BOM from config file for Windows support. - [#1339](https://github.com/influxdata/telegraf/issues/1339): Prometheus client output panic on service reload. +- [#1461](https://github.com/influxdata/telegraf/pull/1461): Prometheus parser, protobuf format header fix. ## v1.0 beta 2 [2016-06-21] diff --git a/plugins/inputs/prometheus/parser.go b/plugins/inputs/prometheus/parser.go index e8a7c0892..3c9ddc503 100644 --- a/plugins/inputs/prometheus/parser.go +++ b/plugins/inputs/prometheus/parser.go @@ -10,6 +10,7 @@ import ( "io" "math" "mime" + "net/http" "time" "github.com/influxdata/telegraf" @@ -19,17 +20,9 @@ import ( "github.com/prometheus/common/expfmt" ) -// PrometheusParser is an object for Parsing incoming metrics. -type PrometheusParser struct { - // PromFormat - PromFormat map[string]string - // DefaultTags will be added to every parsed metric - // DefaultTags map[string]string -} - // Parse returns a slice of Metrics from a text representation of a // metrics -func (p *PrometheusParser) Parse(buf []byte) ([]telegraf.Metric, error) { +func Parse(buf []byte, header http.Header) ([]telegraf.Metric, error) { var metrics []telegraf.Metric var parser expfmt.TextParser // parse even if the buffer begins with a newline @@ -38,97 +31,71 @@ func (p *PrometheusParser) Parse(buf []byte) ([]telegraf.Metric, error) { buffer := bytes.NewBuffer(buf) reader := bufio.NewReader(buffer) - // Get format - mediatype, params, err := mime.ParseMediaType(p.PromFormat["Content-Type"]) + mediatype, params, err := mime.ParseMediaType(header.Get("Content-Type")) // Prepare output metricFamilies := make(map[string]*dto.MetricFamily) + if err == nil && mediatype == "application/vnd.google.protobuf" && params["encoding"] == "delimited" && params["proto"] == "io.prometheus.client.MetricFamily" { for { - metricFamily := &dto.MetricFamily{} - if _, err = pbutil.ReadDelimited(reader, metricFamily); err != nil { - if err == io.EOF { + mf := &dto.MetricFamily{} + if _, ierr := pbutil.ReadDelimited(reader, mf); ierr != nil { + if ierr == io.EOF { break } - return nil, fmt.Errorf("reading metric family protocol buffer failed: %s", err) + return nil, fmt.Errorf("reading metric family protocol buffer failed: %s", ierr) } - metricFamilies[metricFamily.GetName()] = metricFamily + metricFamilies[mf.GetName()] = mf } } else { metricFamilies, err = parser.TextToMetricFamilies(reader) if err != nil { return nil, fmt.Errorf("reading text format failed: %s", err) } - // read metrics - for metricName, mf := range metricFamilies { - for _, m := range mf.Metric { - // reading tags - tags := makeLabels(m) - /* - for key, value := range p.DefaultTags { - tags[key] = value - } - */ - // reading fields - fields := make(map[string]interface{}) - if mf.GetType() == dto.MetricType_SUMMARY { - // summary metric - fields = makeQuantiles(m) - fields["count"] = float64(m.GetSummary().GetSampleCount()) - fields["sum"] = float64(m.GetSummary().GetSampleSum()) - } else if mf.GetType() == dto.MetricType_HISTOGRAM { - // historgram metric - fields = makeBuckets(m) - fields["count"] = float64(m.GetHistogram().GetSampleCount()) - fields["sum"] = float64(m.GetHistogram().GetSampleSum()) + } + // read metrics + for metricName, mf := range metricFamilies { + for _, m := range mf.Metric { + // reading tags + tags := makeLabels(m) + // reading fields + fields := make(map[string]interface{}) + if mf.GetType() == dto.MetricType_SUMMARY { + // summary metric + fields = makeQuantiles(m) + fields["count"] = float64(m.GetSummary().GetSampleCount()) + fields["sum"] = float64(m.GetSummary().GetSampleSum()) + } else if mf.GetType() == dto.MetricType_HISTOGRAM { + // historgram metric + fields = makeBuckets(m) + fields["count"] = float64(m.GetHistogram().GetSampleCount()) + fields["sum"] = float64(m.GetHistogram().GetSampleSum()) + + } else { + // standard metric + fields = getNameAndValue(m) + } + // converting to telegraf metric + if len(fields) > 0 { + var t time.Time + if m.TimestampMs != nil && *m.TimestampMs > 0 { + t = time.Unix(0, *m.TimestampMs*1000000) } else { - // standard metric - fields = getNameAndValue(m) + t = time.Now() } - // converting to telegraf metric - if len(fields) > 0 { - var t time.Time - if m.TimestampMs != nil && *m.TimestampMs > 0 { - t = time.Unix(0, *m.TimestampMs*1000000) - } else { - t = time.Now() - } - metric, err := telegraf.NewMetric(metricName, tags, fields, t) - if err == nil { - metrics = append(metrics, metric) - } + metric, err := telegraf.NewMetric(metricName, tags, fields, t) + if err == nil { + metrics = append(metrics, metric) } } } } + return metrics, err } -// Parse one line -func (p *PrometheusParser) ParseLine(line string) (telegraf.Metric, error) { - metrics, err := p.Parse([]byte(line + "\n")) - - if err != nil { - return nil, err - } - - if len(metrics) < 1 { - return nil, fmt.Errorf( - "Can not parse the line: %s, for data format: prometheus", line) - } - - return metrics[0], nil -} - -/* -// Set default tags -func (p *PrometheusParser) SetDefaultTags(tags map[string]string) { - p.DefaultTags = tags -} -*/ - // Get Quantiles from summary metric func makeQuantiles(m *dto.Metric) map[string]interface{} { fields := make(map[string]interface{}) diff --git a/plugins/inputs/prometheus/parser_test.go b/plugins/inputs/prometheus/parser_test.go index 6259a4ef6..4f2a8516f 100644 --- a/plugins/inputs/prometheus/parser_test.go +++ b/plugins/inputs/prometheus/parser_test.go @@ -1,6 +1,7 @@ package prometheus import ( + "net/http" "testing" "time" @@ -101,10 +102,8 @@ cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 ` func TestParseValidPrometheus(t *testing.T) { - parser := PrometheusParser{} - // Gauge value - metrics, err := parser.Parse([]byte(validUniqueGauge)) + metrics, err := Parse([]byte(validUniqueGauge), http.Header{}) assert.NoError(t, err) assert.Len(t, metrics, 1) assert.Equal(t, "cadvisor_version_info", metrics[0].Name()) @@ -118,8 +117,7 @@ func TestParseValidPrometheus(t *testing.T) { }, metrics[0].Tags()) // Counter value - //parser.SetDefaultTags(map[string]string{"mytag": "mytagvalue"}) - metrics, err = parser.Parse([]byte(validUniqueCounter)) + metrics, err = Parse([]byte(validUniqueCounter), http.Header{}) assert.NoError(t, err) assert.Len(t, metrics, 1) assert.Equal(t, "get_token_fail_count", metrics[0].Name()) @@ -129,8 +127,8 @@ func TestParseValidPrometheus(t *testing.T) { assert.Equal(t, map[string]string{}, metrics[0].Tags()) // Summary data - //parser.SetDefaultTags(map[string]string{}) - metrics, err = parser.Parse([]byte(validUniqueSummary)) + //SetDefaultTags(map[string]string{}) + metrics, err = Parse([]byte(validUniqueSummary), http.Header{}) assert.NoError(t, err) assert.Len(t, metrics, 1) assert.Equal(t, "http_request_duration_microseconds", metrics[0].Name()) @@ -144,7 +142,7 @@ func TestParseValidPrometheus(t *testing.T) { assert.Equal(t, map[string]string{"handler": "prometheus"}, metrics[0].Tags()) // histogram data - metrics, err = parser.Parse([]byte(validUniqueHistogram)) + metrics, err = Parse([]byte(validUniqueHistogram), http.Header{}) assert.NoError(t, err) assert.Len(t, metrics, 1) assert.Equal(t, "apiserver_request_latencies", metrics[0].Name()) @@ -165,11 +163,3 @@ func TestParseValidPrometheus(t *testing.T) { metrics[0].Tags()) } - -func TestParseLineInvalidPrometheus(t *testing.T) { - parser := PrometheusParser{} - metric, err := parser.ParseLine(validUniqueLine) - assert.NotNil(t, err) - assert.Nil(t, metric) - -} diff --git a/plugins/inputs/prometheus/prometheus.go b/plugins/inputs/prometheus/prometheus.go index 2eabcf92c..12f7fd38e 100644 --- a/plugins/inputs/prometheus/prometheus.go +++ b/plugins/inputs/prometheus/prometheus.go @@ -13,6 +13,8 @@ import ( "time" ) +const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3` + type Prometheus struct { Urls []string @@ -86,7 +88,7 @@ var client = &http.Client{ func (p *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error { collectDate := time.Now() var req, err = http.NewRequest("GET", url, nil) - req.Header = make(http.Header) + req.Header.Add("Accept", acceptHeader) var token []byte var resp *http.Response @@ -129,20 +131,9 @@ func (p *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error { return fmt.Errorf("error reading body: %s", err) } - // Headers - headers := make(map[string]string) - for key, value := range headers { - headers[key] = value - } - - // Prepare Prometheus parser config - promparser := PrometheusParser{ - PromFormat: headers, - } - - metrics, err := promparser.Parse(body) + metrics, err := Parse(body, resp.Header) if err != nil { - return fmt.Errorf("error getting processing samples for %s: %s", + return fmt.Errorf("error reading metrics for %s: %s", url, err) } // Add (or not) collected metrics