Improve prometheus plugin

closes #707
This commit is contained in:
Thibault Cohen
2016-03-01 11:12:23 -05:00
committed by Michele Fadda
parent 91166b4e01
commit 1fbee019fc
7 changed files with 452 additions and 46 deletions

View File

@@ -5,9 +5,7 @@ import (
"fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model"
"io"
"io/ioutil"
"net/http"
"sync"
"time"
@@ -62,6 +60,7 @@ var client = &http.Client{
}
func (g *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error {
collectDate := time.Now()
resp, err := client.Get(url)
if err != nil {
return fmt.Errorf("error making HTTP request to %s: %s", url, err)
@@ -70,38 +69,33 @@ func (g *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error {
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("%s returned HTTP status %s", url, resp.Status)
}
format := expfmt.ResponseFormat(resp.Header)
decoder := expfmt.NewDecoder(resp.Body, format)
options := &expfmt.DecodeOptions{
Timestamp: model.Now(),
}
sampleDecoder := &expfmt.SampleDecoder{
Dec: decoder,
Opts: options,
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("error reading body: %s", err)
}
for {
var samples model.Vector
err := sampleDecoder.Decode(&samples)
if err == io.EOF {
break
} else if err != nil {
return fmt.Errorf("error getting processing samples for %s: %s",
url, err)
}
for _, sample := range samples {
tags := make(map[string]string)
for key, value := range sample.Metric {
if key == model.MetricNameLabel {
continue
}
tags[string(key)] = string(value)
}
acc.Add("prometheus_"+string(sample.Metric[model.MetricNameLabel]),
float64(sample.Value), tags)
}
// Headers
headers := make(map[string]string)
for key, value := range headers {
headers[key] = value
}
// Prepare Prometheus parser config
promparser := PrometheusParser{
PromFormat: headers,
}
metrics, err := promparser.Parse(body)
if err != nil {
return fmt.Errorf("error getting processing samples for %s: %s",
url, err)
}
// Add (or not) collected metrics
for _, metric := range metrics {
tags := metric.Tags()
tags["url"] = url
acc.AddFields(metric.Name(), metric.Fields(), tags, collectDate)
}
return nil