Add prometheus_client service output module, update prometheus client
- Adds a client implementation using the prometheus go_client library
that exposes metrics.
- Adds a new type of output "ServiceOutput" which follows inline with
the "ServicePlugin", adding a Stop and Start method for the service
This change also requires the newer prometheus/client_golang code, so
the prometheus plugin needed to be changed.
Added the following to Godep:
- bitbucket.org/ww/goautoneg (in github.com/common/expfmt/encode.go)
- prometheus/common/expfmt (in plugins/prometheus.go)
- github.com/prometheus/common/model (in plugins/prometheus.go)
- github.com/prometheus/procfs (in github.com/client_golang/prometheus)
- github.com/beorn7/perks/quantile (in github.com/client_golang/prometheus)
X-Github-Meta: closes #306
This commit is contained in:
committed by
Cameron Sparr
parent
7cc60dfb8f
commit
4449f7f2fb
@@ -250,7 +250,8 @@ func get(key []byte, host string) (map[string]string, error) {
|
||||
func readAerospikeStats(stats map[string]string, acc plugins.Accumulator, host, namespace string) {
|
||||
for key, value := range stats {
|
||||
tags := map[string]string{
|
||||
"host": host,
|
||||
"host": host,
|
||||
"namespace": "_service",
|
||||
}
|
||||
|
||||
if namespace != "" {
|
||||
|
||||
@@ -3,13 +3,12 @@ package prometheus
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/influxdb/telegraf/plugins"
|
||||
"github.com/prometheus/common/expfmt"
|
||||
"github.com/prometheus/common/model"
|
||||
"io"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/telegraf/plugins"
|
||||
"github.com/prometheus/client_golang/extraction"
|
||||
"github.com/prometheus/client_golang/model"
|
||||
)
|
||||
|
||||
type Prometheus struct {
|
||||
@@ -60,42 +59,38 @@ func (g *Prometheus) gatherURL(url string, acc plugins.Accumulator) error {
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return fmt.Errorf("%s returned HTTP status %s", url, resp.Status)
|
||||
}
|
||||
processor, err := extraction.ProcessorForRequestHeader(resp.Header)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error getting extractor for %s: %s", url, err)
|
||||
format := expfmt.ResponseFormat(resp.Header)
|
||||
|
||||
decoder := expfmt.NewDecoder(resp.Body, format)
|
||||
|
||||
options := &expfmt.DecodeOptions{
|
||||
Timestamp: model.Now(),
|
||||
}
|
||||
sampleDecoder := &expfmt.SampleDecoder{
|
||||
Dec: decoder,
|
||||
Opts: options,
|
||||
}
|
||||
|
||||
ingestor := &Ingester{
|
||||
acc: acc,
|
||||
}
|
||||
|
||||
options := &extraction.ProcessOptions{
|
||||
Timestamp: model.TimestampFromTime(time.Now()),
|
||||
}
|
||||
|
||||
err = processor.ProcessSingle(resp.Body, ingestor, options)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error getting processing samples for %s: %s", url, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type Ingester struct {
|
||||
acc plugins.Accumulator
|
||||
}
|
||||
|
||||
// Ingest implements an extraction.Ingester.
|
||||
func (i *Ingester) Ingest(samples model.Samples) error {
|
||||
for _, sample := range samples {
|
||||
tags := map[string]string{}
|
||||
for key, value := range sample.Metric {
|
||||
if key == model.MetricNameLabel {
|
||||
continue
|
||||
}
|
||||
tags[string(key)] = string(value)
|
||||
for {
|
||||
var samples model.Vector
|
||||
err := sampleDecoder.Decode(&samples)
|
||||
if err == io.EOF {
|
||||
break
|
||||
} else if err != nil {
|
||||
return fmt.Errorf("error getting processing samples for %s: %s", url, err)
|
||||
}
|
||||
for _, sample := range samples {
|
||||
tags := map[string]string{}
|
||||
for key, value := range sample.Metric {
|
||||
if key == model.MetricNameLabel {
|
||||
continue
|
||||
}
|
||||
tags[string(key)] = string(value)
|
||||
}
|
||||
acc.Add(string(sample.Metric[model.MetricNameLabel]), float64(sample.Value), tags)
|
||||
}
|
||||
i.acc.Add(string(sample.Metric[model.MetricNameLabel]), float64(sample.Value), tags)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user