Prometheus parser fix, parse headers properly

closes #1458
This commit is contained in:
Cameron Sparr 2016-07-07 12:15:47 +02:00
parent c046232425
commit e1c3800cd9
4 changed files with 55 additions and 106 deletions

View File

@ -1,4 +1,4 @@
## v1.0 ## v1.0 [unreleased]
### Release Notes ### Release Notes
@ -42,6 +42,7 @@ should now look like:
- [#1405](https://github.com/influxdata/telegraf/issues/1405): Fix memory/connection leak in prometheus input plugin. - [#1405](https://github.com/influxdata/telegraf/issues/1405): Fix memory/connection leak in prometheus input plugin.
- [#1378](https://github.com/influxdata/telegraf/issues/1378): Trim BOM from config file for Windows support. - [#1378](https://github.com/influxdata/telegraf/issues/1378): Trim BOM from config file for Windows support.
- [#1339](https://github.com/influxdata/telegraf/issues/1339): Prometheus client output panic on service reload. - [#1339](https://github.com/influxdata/telegraf/issues/1339): Prometheus client output panic on service reload.
- [#1461](https://github.com/influxdata/telegraf/pull/1461): Prometheus parser, protobuf format header fix.
## v1.0 beta 2 [2016-06-21] ## v1.0 beta 2 [2016-06-21]

View File

@ -10,6 +10,7 @@ import (
"io" "io"
"math" "math"
"mime" "mime"
"net/http"
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
@ -19,17 +20,9 @@ import (
"github.com/prometheus/common/expfmt" "github.com/prometheus/common/expfmt"
) )
// PrometheusParser is an object for Parsing incoming metrics.
type PrometheusParser struct {
// PromFormat
PromFormat map[string]string
// DefaultTags will be added to every parsed metric
// DefaultTags map[string]string
}
// Parse returns a slice of Metrics from a text representation of a // Parse returns a slice of Metrics from a text representation of a
// metrics // metrics
func (p *PrometheusParser) Parse(buf []byte) ([]telegraf.Metric, error) { func Parse(buf []byte, header http.Header) ([]telegraf.Metric, error) {
var metrics []telegraf.Metric var metrics []telegraf.Metric
var parser expfmt.TextParser var parser expfmt.TextParser
// parse even if the buffer begins with a newline // parse even if the buffer begins with a newline
@ -38,97 +31,71 @@ func (p *PrometheusParser) Parse(buf []byte) ([]telegraf.Metric, error) {
buffer := bytes.NewBuffer(buf) buffer := bytes.NewBuffer(buf)
reader := bufio.NewReader(buffer) reader := bufio.NewReader(buffer)
// Get format mediatype, params, err := mime.ParseMediaType(header.Get("Content-Type"))
mediatype, params, err := mime.ParseMediaType(p.PromFormat["Content-Type"])
// Prepare output // Prepare output
metricFamilies := make(map[string]*dto.MetricFamily) metricFamilies := make(map[string]*dto.MetricFamily)
if err == nil && mediatype == "application/vnd.google.protobuf" && if err == nil && mediatype == "application/vnd.google.protobuf" &&
params["encoding"] == "delimited" && params["encoding"] == "delimited" &&
params["proto"] == "io.prometheus.client.MetricFamily" { params["proto"] == "io.prometheus.client.MetricFamily" {
for { for {
metricFamily := &dto.MetricFamily{} mf := &dto.MetricFamily{}
if _, err = pbutil.ReadDelimited(reader, metricFamily); err != nil { if _, ierr := pbutil.ReadDelimited(reader, mf); ierr != nil {
if err == io.EOF { if ierr == io.EOF {
break break
} }
return nil, fmt.Errorf("reading metric family protocol buffer failed: %s", err) return nil, fmt.Errorf("reading metric family protocol buffer failed: %s", ierr)
} }
metricFamilies[metricFamily.GetName()] = metricFamily metricFamilies[mf.GetName()] = mf
} }
} else { } else {
metricFamilies, err = parser.TextToMetricFamilies(reader) metricFamilies, err = parser.TextToMetricFamilies(reader)
if err != nil { if err != nil {
return nil, fmt.Errorf("reading text format failed: %s", err) return nil, fmt.Errorf("reading text format failed: %s", err)
} }
// read metrics }
for metricName, mf := range metricFamilies {
for _, m := range mf.Metric {
// reading tags
tags := makeLabels(m)
/*
for key, value := range p.DefaultTags {
tags[key] = value
}
*/
// reading fields
fields := make(map[string]interface{})
if mf.GetType() == dto.MetricType_SUMMARY {
// summary metric
fields = makeQuantiles(m)
fields["count"] = float64(m.GetSummary().GetSampleCount())
fields["sum"] = float64(m.GetSummary().GetSampleSum())
} else if mf.GetType() == dto.MetricType_HISTOGRAM {
// histogram metric
fields = makeBuckets(m)
fields["count"] = float64(m.GetHistogram().GetSampleCount())
fields["sum"] = float64(m.GetHistogram().GetSampleSum())
// read metrics
for metricName, mf := range metricFamilies {
for _, m := range mf.Metric {
// reading tags
tags := makeLabels(m)
// reading fields
fields := make(map[string]interface{})
if mf.GetType() == dto.MetricType_SUMMARY {
// summary metric
fields = makeQuantiles(m)
fields["count"] = float64(m.GetSummary().GetSampleCount())
fields["sum"] = float64(m.GetSummary().GetSampleSum())
} else if mf.GetType() == dto.MetricType_HISTOGRAM {
// histogram metric
fields = makeBuckets(m)
fields["count"] = float64(m.GetHistogram().GetSampleCount())
fields["sum"] = float64(m.GetHistogram().GetSampleSum())
} else {
// standard metric
fields = getNameAndValue(m)
}
// converting to telegraf metric
if len(fields) > 0 {
var t time.Time
if m.TimestampMs != nil && *m.TimestampMs > 0 {
t = time.Unix(0, *m.TimestampMs*1000000)
} else { } else {
// standard metric t = time.Now()
fields = getNameAndValue(m)
} }
// converting to telegraf metric metric, err := telegraf.NewMetric(metricName, tags, fields, t)
if len(fields) > 0 { if err == nil {
var t time.Time metrics = append(metrics, metric)
if m.TimestampMs != nil && *m.TimestampMs > 0 {
t = time.Unix(0, *m.TimestampMs*1000000)
} else {
t = time.Now()
}
metric, err := telegraf.NewMetric(metricName, tags, fields, t)
if err == nil {
metrics = append(metrics, metric)
}
} }
} }
} }
} }
return metrics, err return metrics, err
} }
// Parse one line
func (p *PrometheusParser) ParseLine(line string) (telegraf.Metric, error) {
metrics, err := p.Parse([]byte(line + "\n"))
if err != nil {
return nil, err
}
if len(metrics) < 1 {
return nil, fmt.Errorf(
"Can not parse the line: %s, for data format: prometheus", line)
}
return metrics[0], nil
}
/*
// Set default tags
func (p *PrometheusParser) SetDefaultTags(tags map[string]string) {
p.DefaultTags = tags
}
*/
// Get Quantiles from summary metric // Get Quantiles from summary metric
func makeQuantiles(m *dto.Metric) map[string]interface{} { func makeQuantiles(m *dto.Metric) map[string]interface{} {
fields := make(map[string]interface{}) fields := make(map[string]interface{})

View File

@ -1,6 +1,7 @@
package prometheus package prometheus
import ( import (
"net/http"
"testing" "testing"
"time" "time"
@ -101,10 +102,8 @@ cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
` `
func TestParseValidPrometheus(t *testing.T) { func TestParseValidPrometheus(t *testing.T) {
parser := PrometheusParser{}
// Gauge value // Gauge value
metrics, err := parser.Parse([]byte(validUniqueGauge)) metrics, err := Parse([]byte(validUniqueGauge), http.Header{})
assert.NoError(t, err) assert.NoError(t, err)
assert.Len(t, metrics, 1) assert.Len(t, metrics, 1)
assert.Equal(t, "cadvisor_version_info", metrics[0].Name()) assert.Equal(t, "cadvisor_version_info", metrics[0].Name())
@ -118,8 +117,7 @@ func TestParseValidPrometheus(t *testing.T) {
}, metrics[0].Tags()) }, metrics[0].Tags())
// Counter value // Counter value
//parser.SetDefaultTags(map[string]string{"mytag": "mytagvalue"}) metrics, err = Parse([]byte(validUniqueCounter), http.Header{})
metrics, err = parser.Parse([]byte(validUniqueCounter))
assert.NoError(t, err) assert.NoError(t, err)
assert.Len(t, metrics, 1) assert.Len(t, metrics, 1)
assert.Equal(t, "get_token_fail_count", metrics[0].Name()) assert.Equal(t, "get_token_fail_count", metrics[0].Name())
@ -129,8 +127,8 @@ func TestParseValidPrometheus(t *testing.T) {
assert.Equal(t, map[string]string{}, metrics[0].Tags()) assert.Equal(t, map[string]string{}, metrics[0].Tags())
// Summary data // Summary data
//parser.SetDefaultTags(map[string]string{}) //SetDefaultTags(map[string]string{})
metrics, err = parser.Parse([]byte(validUniqueSummary)) metrics, err = Parse([]byte(validUniqueSummary), http.Header{})
assert.NoError(t, err) assert.NoError(t, err)
assert.Len(t, metrics, 1) assert.Len(t, metrics, 1)
assert.Equal(t, "http_request_duration_microseconds", metrics[0].Name()) assert.Equal(t, "http_request_duration_microseconds", metrics[0].Name())
@ -144,7 +142,7 @@ func TestParseValidPrometheus(t *testing.T) {
assert.Equal(t, map[string]string{"handler": "prometheus"}, metrics[0].Tags()) assert.Equal(t, map[string]string{"handler": "prometheus"}, metrics[0].Tags())
// histogram data // histogram data
metrics, err = parser.Parse([]byte(validUniqueHistogram)) metrics, err = Parse([]byte(validUniqueHistogram), http.Header{})
assert.NoError(t, err) assert.NoError(t, err)
assert.Len(t, metrics, 1) assert.Len(t, metrics, 1)
assert.Equal(t, "apiserver_request_latencies", metrics[0].Name()) assert.Equal(t, "apiserver_request_latencies", metrics[0].Name())
@ -165,11 +163,3 @@ func TestParseValidPrometheus(t *testing.T) {
metrics[0].Tags()) metrics[0].Tags())
} }
func TestParseLineInvalidPrometheus(t *testing.T) {
parser := PrometheusParser{}
metric, err := parser.ParseLine(validUniqueLine)
assert.NotNil(t, err)
assert.Nil(t, metric)
}

View File

@ -13,6 +13,8 @@ import (
"time" "time"
) )
const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3`
type Prometheus struct { type Prometheus struct {
Urls []string Urls []string
@ -86,7 +88,7 @@ var client = &http.Client{
func (p *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error { func (p *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error {
collectDate := time.Now() collectDate := time.Now()
var req, err = http.NewRequest("GET", url, nil) var req, err = http.NewRequest("GET", url, nil)
req.Header = make(http.Header) req.Header.Add("Accept", acceptHeader)
var token []byte var token []byte
var resp *http.Response var resp *http.Response
@ -129,20 +131,9 @@ func (p *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error {
return fmt.Errorf("error reading body: %s", err) return fmt.Errorf("error reading body: %s", err)
} }
// Headers metrics, err := Parse(body, resp.Header)
headers := make(map[string]string)
for key, value := range headers {
headers[key] = value
}
// Prepare Prometheus parser config
promparser := PrometheusParser{
PromFormat: headers,
}
metrics, err := promparser.Parse(body)
if err != nil { if err != nil {
return fmt.Errorf("error getting processing samples for %s: %s", return fmt.Errorf("error reading metrics for %s: %s",
url, err) url, err)
} }
// Add (or not) collected metrics // Add (or not) collected metrics