This commit is contained in:
ncohensm 2016-07-12 09:37:21 -07:00
commit 4a812b0c64
7 changed files with 61 additions and 111 deletions

View File

@ -1,4 +1,4 @@
## v1.0 ## v1.0 [unreleased]
### Release Notes ### Release Notes
@ -37,12 +37,14 @@ should now look like:
### Bugfixes ### Bugfixes
- [#1472](https://github.com/influxdata/telegraf/pull/1472): diskio input plugin: set 'skip_serial_number = true' by default to avoid high cardinality.
- [#1426](https://github.com/influxdata/telegraf/pull/1426): nil metrics panic fix. - [#1426](https://github.com/influxdata/telegraf/pull/1426): nil metrics panic fix.
- [#1384](https://github.com/influxdata/telegraf/pull/1384): Fix datarace in apache input plugin. - [#1384](https://github.com/influxdata/telegraf/pull/1384): Fix datarace in apache input plugin.
- [#1399](https://github.com/influxdata/telegraf/issues/1399): Add `read_repairs` statistics to riak plugin. - [#1399](https://github.com/influxdata/telegraf/issues/1399): Add `read_repairs` statistics to riak plugin.
- [#1405](https://github.com/influxdata/telegraf/issues/1405): Fix memory/connection leak in prometheus input plugin. - [#1405](https://github.com/influxdata/telegraf/issues/1405): Fix memory/connection leak in prometheus input plugin.
- [#1378](https://github.com/influxdata/telegraf/issues/1378): Trim BOM from config file for Windows support. - [#1378](https://github.com/influxdata/telegraf/issues/1378): Trim BOM from config file for Windows support.
- [#1339](https://github.com/influxdata/telegraf/issues/1339): Prometheus client output panic on service reload. - [#1339](https://github.com/influxdata/telegraf/issues/1339): Prometheus client output panic on service reload.
- [#1461](https://github.com/influxdata/telegraf/pull/1461): Prometheus parser, protobuf format header fix.
## v1.0 beta 2 [2016-06-21] ## v1.0 beta 2 [2016-06-21]

View File

@ -222,8 +222,6 @@ Telegraf can also collect metrics via the following service plugins:
* [github](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/webhooks/github) * [github](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/webhooks/github)
* [rollbar](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/webhooks/rollbar) * [rollbar](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/webhooks/rollbar)
* [nsq_consumer](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/nsq_consumer) * [nsq_consumer](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/nsq_consumer)
* [github_webhooks](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/github_webhooks)
* [rollbar_webhooks](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/rollbar_webhooks)
We'll be adding support for many more over the coming months. Read on if you We'll be adding support for many more over the coming months. Read on if you
want to add support for another service or third-party API. want to add support for another service or third-party API.

View File

@ -32,6 +32,8 @@ regex patterns.
''' '''
``` ```
> **Note:** The InfluxDB log pattern in the default configuration only works for Influx versions 1.0.0-beta1 or higher.
## Grok Parser ## Grok Parser
The grok parser uses a slightly modified version of logstash "grok" patterns, The grok parser uses a slightly modified version of logstash "grok" patterns,

View File

@ -10,6 +10,7 @@ import (
"io" "io"
"math" "math"
"mime" "mime"
"net/http"
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
@ -19,17 +20,9 @@ import (
"github.com/prometheus/common/expfmt" "github.com/prometheus/common/expfmt"
) )
// PrometheusParser is an object for Parsing incoming metrics.
type PrometheusParser struct {
// PromFormat
PromFormat map[string]string
// DefaultTags will be added to every parsed metric
// DefaultTags map[string]string
}
// Parse returns a slice of Metrics from a text representation of a // Parse returns a slice of Metrics from a text representation of a
// metrics // metrics
func (p *PrometheusParser) Parse(buf []byte) ([]telegraf.Metric, error) { func Parse(buf []byte, header http.Header) ([]telegraf.Metric, error) {
var metrics []telegraf.Metric var metrics []telegraf.Metric
var parser expfmt.TextParser var parser expfmt.TextParser
// parse even if the buffer begins with a newline // parse even if the buffer begins with a newline
@ -38,97 +31,71 @@ func (p *PrometheusParser) Parse(buf []byte) ([]telegraf.Metric, error) {
buffer := bytes.NewBuffer(buf) buffer := bytes.NewBuffer(buf)
reader := bufio.NewReader(buffer) reader := bufio.NewReader(buffer)
// Get format mediatype, params, err := mime.ParseMediaType(header.Get("Content-Type"))
mediatype, params, err := mime.ParseMediaType(p.PromFormat["Content-Type"])
// Prepare output // Prepare output
metricFamilies := make(map[string]*dto.MetricFamily) metricFamilies := make(map[string]*dto.MetricFamily)
if err == nil && mediatype == "application/vnd.google.protobuf" && if err == nil && mediatype == "application/vnd.google.protobuf" &&
params["encoding"] == "delimited" && params["encoding"] == "delimited" &&
params["proto"] == "io.prometheus.client.MetricFamily" { params["proto"] == "io.prometheus.client.MetricFamily" {
for { for {
metricFamily := &dto.MetricFamily{} mf := &dto.MetricFamily{}
if _, err = pbutil.ReadDelimited(reader, metricFamily); err != nil { if _, ierr := pbutil.ReadDelimited(reader, mf); ierr != nil {
if err == io.EOF { if ierr == io.EOF {
break break
} }
return nil, fmt.Errorf("reading metric family protocol buffer failed: %s", err) return nil, fmt.Errorf("reading metric family protocol buffer failed: %s", ierr)
} }
metricFamilies[metricFamily.GetName()] = metricFamily metricFamilies[mf.GetName()] = mf
} }
} else { } else {
metricFamilies, err = parser.TextToMetricFamilies(reader) metricFamilies, err = parser.TextToMetricFamilies(reader)
if err != nil { if err != nil {
return nil, fmt.Errorf("reading text format failed: %s", err) return nil, fmt.Errorf("reading text format failed: %s", err)
} }
// read metrics }
for metricName, mf := range metricFamilies {
for _, m := range mf.Metric {
// reading tags
tags := makeLabels(m)
/*
for key, value := range p.DefaultTags {
tags[key] = value
}
*/
// reading fields
fields := make(map[string]interface{})
if mf.GetType() == dto.MetricType_SUMMARY {
// summary metric
fields = makeQuantiles(m)
fields["count"] = float64(m.GetSummary().GetSampleCount())
fields["sum"] = float64(m.GetSummary().GetSampleSum())
} else if mf.GetType() == dto.MetricType_HISTOGRAM {
// historgram metric
fields = makeBuckets(m)
fields["count"] = float64(m.GetHistogram().GetSampleCount())
fields["sum"] = float64(m.GetHistogram().GetSampleSum())
// read metrics
for metricName, mf := range metricFamilies {
for _, m := range mf.Metric {
// reading tags
tags := makeLabels(m)
// reading fields
fields := make(map[string]interface{})
if mf.GetType() == dto.MetricType_SUMMARY {
// summary metric
fields = makeQuantiles(m)
fields["count"] = float64(m.GetSummary().GetSampleCount())
fields["sum"] = float64(m.GetSummary().GetSampleSum())
} else if mf.GetType() == dto.MetricType_HISTOGRAM {
// historgram metric
fields = makeBuckets(m)
fields["count"] = float64(m.GetHistogram().GetSampleCount())
fields["sum"] = float64(m.GetHistogram().GetSampleSum())
} else {
// standard metric
fields = getNameAndValue(m)
}
// converting to telegraf metric
if len(fields) > 0 {
var t time.Time
if m.TimestampMs != nil && *m.TimestampMs > 0 {
t = time.Unix(0, *m.TimestampMs*1000000)
} else { } else {
// standard metric t = time.Now()
fields = getNameAndValue(m)
} }
// converting to telegraf metric metric, err := telegraf.NewMetric(metricName, tags, fields, t)
if len(fields) > 0 { if err == nil {
var t time.Time metrics = append(metrics, metric)
if m.TimestampMs != nil && *m.TimestampMs > 0 {
t = time.Unix(0, *m.TimestampMs*1000000)
} else {
t = time.Now()
}
metric, err := telegraf.NewMetric(metricName, tags, fields, t)
if err == nil {
metrics = append(metrics, metric)
}
} }
} }
} }
} }
return metrics, err return metrics, err
} }
// Parse one line
func (p *PrometheusParser) ParseLine(line string) (telegraf.Metric, error) {
metrics, err := p.Parse([]byte(line + "\n"))
if err != nil {
return nil, err
}
if len(metrics) < 1 {
return nil, fmt.Errorf(
"Can not parse the line: %s, for data format: prometheus", line)
}
return metrics[0], nil
}
/*
// Set default tags
func (p *PrometheusParser) SetDefaultTags(tags map[string]string) {
p.DefaultTags = tags
}
*/
// Get Quantiles from summary metric // Get Quantiles from summary metric
func makeQuantiles(m *dto.Metric) map[string]interface{} { func makeQuantiles(m *dto.Metric) map[string]interface{} {
fields := make(map[string]interface{}) fields := make(map[string]interface{})

View File

@ -1,6 +1,7 @@
package prometheus package prometheus
import ( import (
"net/http"
"testing" "testing"
"time" "time"
@ -101,10 +102,8 @@ cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
` `
func TestParseValidPrometheus(t *testing.T) { func TestParseValidPrometheus(t *testing.T) {
parser := PrometheusParser{}
// Gauge value // Gauge value
metrics, err := parser.Parse([]byte(validUniqueGauge)) metrics, err := Parse([]byte(validUniqueGauge), http.Header{})
assert.NoError(t, err) assert.NoError(t, err)
assert.Len(t, metrics, 1) assert.Len(t, metrics, 1)
assert.Equal(t, "cadvisor_version_info", metrics[0].Name()) assert.Equal(t, "cadvisor_version_info", metrics[0].Name())
@ -118,8 +117,7 @@ func TestParseValidPrometheus(t *testing.T) {
}, metrics[0].Tags()) }, metrics[0].Tags())
// Counter value // Counter value
//parser.SetDefaultTags(map[string]string{"mytag": "mytagvalue"}) metrics, err = Parse([]byte(validUniqueCounter), http.Header{})
metrics, err = parser.Parse([]byte(validUniqueCounter))
assert.NoError(t, err) assert.NoError(t, err)
assert.Len(t, metrics, 1) assert.Len(t, metrics, 1)
assert.Equal(t, "get_token_fail_count", metrics[0].Name()) assert.Equal(t, "get_token_fail_count", metrics[0].Name())
@ -129,8 +127,8 @@ func TestParseValidPrometheus(t *testing.T) {
assert.Equal(t, map[string]string{}, metrics[0].Tags()) assert.Equal(t, map[string]string{}, metrics[0].Tags())
// Summary data // Summary data
//parser.SetDefaultTags(map[string]string{}) //SetDefaultTags(map[string]string{})
metrics, err = parser.Parse([]byte(validUniqueSummary)) metrics, err = Parse([]byte(validUniqueSummary), http.Header{})
assert.NoError(t, err) assert.NoError(t, err)
assert.Len(t, metrics, 1) assert.Len(t, metrics, 1)
assert.Equal(t, "http_request_duration_microseconds", metrics[0].Name()) assert.Equal(t, "http_request_duration_microseconds", metrics[0].Name())
@ -144,7 +142,7 @@ func TestParseValidPrometheus(t *testing.T) {
assert.Equal(t, map[string]string{"handler": "prometheus"}, metrics[0].Tags()) assert.Equal(t, map[string]string{"handler": "prometheus"}, metrics[0].Tags())
// histogram data // histogram data
metrics, err = parser.Parse([]byte(validUniqueHistogram)) metrics, err = Parse([]byte(validUniqueHistogram), http.Header{})
assert.NoError(t, err) assert.NoError(t, err)
assert.Len(t, metrics, 1) assert.Len(t, metrics, 1)
assert.Equal(t, "apiserver_request_latencies", metrics[0].Name()) assert.Equal(t, "apiserver_request_latencies", metrics[0].Name())
@ -165,11 +163,3 @@ func TestParseValidPrometheus(t *testing.T) {
metrics[0].Tags()) metrics[0].Tags())
} }
func TestParseLineInvalidPrometheus(t *testing.T) {
parser := PrometheusParser{}
metric, err := parser.ParseLine(validUniqueLine)
assert.NotNil(t, err)
assert.Nil(t, metric)
}

View File

@ -13,6 +13,8 @@ import (
"time" "time"
) )
const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3`
type Prometheus struct { type Prometheus struct {
Urls []string Urls []string
@ -86,7 +88,7 @@ var client = &http.Client{
func (p *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error { func (p *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error {
collectDate := time.Now() collectDate := time.Now()
var req, err = http.NewRequest("GET", url, nil) var req, err = http.NewRequest("GET", url, nil)
req.Header = make(http.Header) req.Header.Add("Accept", acceptHeader)
var token []byte var token []byte
var resp *http.Response var resp *http.Response
@ -129,20 +131,9 @@ func (p *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error {
return fmt.Errorf("error reading body: %s", err) return fmt.Errorf("error reading body: %s", err)
} }
// Headers metrics, err := Parse(body, resp.Header)
headers := make(map[string]string)
for key, value := range headers {
headers[key] = value
}
// Prepare Prometheus parser config
promparser := PrometheusParser{
PromFormat: headers,
}
metrics, err := promparser.Parse(body)
if err != nil { if err != nil {
return fmt.Errorf("error getting processing samples for %s: %s", return fmt.Errorf("error reading metrics for %s: %s",
url, err) url, err)
} }
// Add (or not) collected metrics // Add (or not) collected metrics

View File

@ -92,8 +92,8 @@ var diskIoSampleConfig = `
## disk partitions. ## disk partitions.
## Setting devices will restrict the stats to the specified devices. ## Setting devices will restrict the stats to the specified devices.
# devices = ["sda", "sdb"] # devices = ["sda", "sdb"]
## Uncomment the following line if you do not need disk serial numbers. ## Uncomment the following line if you need disk serial numbers.
# skip_serial_number = true # skip_serial_number = false
` `
func (_ *DiskIOStats) SampleConfig() string { func (_ *DiskIOStats) SampleConfig() string {
@ -151,6 +151,6 @@ func init() {
}) })
inputs.Add("diskio", func() telegraf.Input { inputs.Add("diskio", func() telegraf.Input {
return &DiskIOStats{ps: &systemPS{}} return &DiskIOStats{ps: &systemPS{}, SkipSerialNumber: true}
}) })
} }