Add Prometheus parser and Kubernetes input plugin

Thibault Cohen 2016-02-14 02:48:43 -05:00
parent 6a601ceb97
commit a5288fc280
7 changed files with 1084 additions and 1 deletion

View File

@ -16,6 +16,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/influxdb"
_ "github.com/influxdata/telegraf/plugins/inputs/jolokia"
_ "github.com/influxdata/telegraf/plugins/inputs/kafka_consumer"
_ "github.com/influxdata/telegraf/plugins/inputs/kubernetes"
_ "github.com/influxdata/telegraf/plugins/inputs/leofs"
_ "github.com/influxdata/telegraf/plugins/inputs/lustre2"
_ "github.com/influxdata/telegraf/plugins/inputs/mailchimp"

View File

@ -0,0 +1,83 @@
# Kubernetes Input Plugin
The Kubernetes plugin can gather metrics from the following Kubernetes services:
- APIServer
- Scheduler
- Controller Manager
- Kubelet
### Configuration:
```toml
# Get metrics from Kubernetes services
[[inputs.kubernetes.apiserver]]
url = "http://mtl-nvcbladea-15.nuance.com:8080"
endpoint = "/metrics"
timeout = 5.0
# Include only metrics which match one of the
# following regexps
includes = ["apiserver_.*"]
[[inputs.kubernetes.scheduler]]
url = "http://mtl-nvcbladea-15.nuance.com:10251"
endpoint = "/metrics"
timeout = 1.0
# Do NOT include metrics which match one of the
# following regexps
excludes = ["scheduler_.*"]
[[inputs.kubernetes.controllermanager]]
url = "http://mtl-nvcbladea-15.nuance.com:10252"
[[inputs.kubernetes.kubelet]]
url = "http://mtl-nvcbladea-15.nuance.com:4194"
# You should increase metric_buffer_limit because of
# the number of kubelet metrics; otherwise you can
# limit metrics with the following 'excludes' argument
excludes = ["container_.*"]
```
### Measurements & Fields:
This input plugin gets measurements and fields from the Kubernetes services.
If new metrics appear on the K8S services, this plugin will pick them up
without requiring any modification.
- http_request_duration_microseconds"
- 0.5
- 0.9
- 0.99
- count
- sum
- .......
- .......
- .......
- .......
- .......
- .......
### Tags:
This input plugin gets metric tags from the Kubernetes services.
If new tags appear on the K8S services, this plugin will pick them up
without requiring any modification.
- kubeservice
- serverURL
- handler
- .......
### Example Output:
```
$ ./telegraf -config telegraf.conf -input-filter kubernetes -test
process_cpu_seconds_total,kubeservice=kubelet,serverURL=http://mtl-blade19-02.nuance.com:4194/metrics counter=452366.52 1455436043283475033
ssh_tunnel_open_fail_count,kubeservice=kubelet,serverURL=http://mtl-blade19-02.nuance.com:4194/metrics counter=0 1455436043284057879
get_token_count,kubeservice=kubelet,serverURL=http://mtl-blade19-02.nuance.com:4194/metrics counter=0 1455436043285030409
http_requests_total,code=200,handler=prometheus,kubeservice=kubelet,method=get,serverURL=http://mtl-blade19-02.nuance.com:4194/metrics counter=116 1455436043285364118
kubelet_generate_pod_status_latency_microseconds,kubeservice=kubelet,serverURL=http://mtl-blade19-02.nuance.com:4194/metrics 0.5=94441,0.9=186928,0.99=236698,count=0,sum=128409528225 1455436043285715318
process_max_fds,kubeservice=kubelet,serverURL=http://mtl-blade19-02.nuance.com:4194/metrics gauge=1000000 1455436043286061204
```

View File

@ -0,0 +1,318 @@
package kubernetes
// Plugin inspired by
// https://github.com/prometheus/prom2json/blob/master/main.go
import (
"errors"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"regexp"
"strings"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
)
type Kubernetes struct {
Apiserver []Apiserver
Scheduler []Scheduler
Controllermanager []Controllermanager
Kubelet []Kubelet
client HTTPClient
}
type KubeService struct {
Url string
Endpoint string
Timeout float64
Excludes []string
Includes []string
}
type Apiserver struct {
KubeService
}
type Scheduler struct {
KubeService
}
type Controllermanager struct {
KubeService
}
type Kubelet struct {
KubeService
}
type HTTPClient interface {
// Returns the result of an http request
//
// Parameters:
// req: HTTP request object
//
// Returns:
// http.Response: HTTP response object
// error : Any error that may have occurred
MakeRequest(req *http.Request, timeout float64) (*http.Response, error)
}
type RealHTTPClient struct {
client *http.Client
}
func (c RealHTTPClient) MakeRequest(req *http.Request, timeout float64) (*http.Response, error) {
c.client.Timeout = time.Duration(timeout) * time.Second
return c.client.Do(req)
}
var sampleConfig = `
# Get metrics from Kubernetes services
[[inputs.kubernetes.apiserver]]
url = "http://mtl-nvcbladea-15.nuance.com:8080"
endpoint = "/metrics"
timeout = 5.0
# Include only metrics which match one of the
# following regexps
includes = ["apiserver_.*"]
[[inputs.kubernetes.scheduler]]
url = "http://mtl-nvcbladea-15.nuance.com:10251"
endpoint = "/metrics"
timeout = 1.0
# Do NOT include metrics which match one of the
# following regexps
excludes = ["scheduler_.*"]
[[inputs.kubernetes.controllermanager]]
url = "http://mtl-nvcbladea-15.nuance.com:10252"
[[inputs.kubernetes.kubelet]]
url = "http://mtl-nvcbladea-15.nuance.com:4194"
# You should increase metric_buffer_limit because of
# the number of kubelet metrics; otherwise you can
# limit metrics with the following 'excludes' argument
excludes = ["container_.*"]
`
func (k *Kubernetes) SampleConfig() string {
return sampleConfig
}
func (k *Kubernetes) Description() string {
return "Read metrics from Kubernetes services"
}
// Gathers data for all servers.
func (k *Kubernetes) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup
errorChannel := make(chan error,
len(k.Apiserver)+len(k.Scheduler)+
len(k.Controllermanager)+len(k.Kubelet))
// Routine to gather metrics and add them to acc
GatherMetric := func(url string, endpoint string, serviceType string,
excludes []string, includes []string, timeout float64) {
defer wg.Done()
if timeout == 0. {
timeout = 1.0
}
if endpoint == "" {
endpoint = "/metrics"
} else if string(endpoint[0]) != "/" {
endpoint = "/" + endpoint
}
url = url + endpoint
if err := k.gatherServer(acc, url, serviceType, excludes, includes, timeout); err != nil {
errorChannel <- err
}
}
// Apiservers
for _, service := range k.Apiserver {
wg.Add(1)
serviceType := "apiserver"
go GatherMetric(service.Url, service.Endpoint, serviceType, service.Excludes, service.Includes, service.Timeout)
}
// Schedulers
for _, service := range k.Scheduler {
wg.Add(1)
serviceType := "scheduler"
go GatherMetric(service.Url, service.Endpoint, serviceType, service.Excludes, service.Includes, service.Timeout)
}
// Controllermanager
for _, service := range k.Controllermanager {
wg.Add(1)
serviceType := "controllermanager"
go GatherMetric(service.Url, service.Endpoint, serviceType, service.Excludes, service.Includes, service.Timeout)
}
// Kubelet
for _, service := range k.Kubelet {
wg.Add(1)
serviceType := "kubelet"
go GatherMetric(service.Url, service.Endpoint, serviceType, service.Excludes, service.Includes, service.Timeout)
}
wg.Wait()
close(errorChannel)
// Get all errors and return them as one giant error
errorStrings := []string{}
for err := range errorChannel {
errorStrings = append(errorStrings, err.Error())
}
if len(errorStrings) == 0 {
return nil
}
return errors.New(strings.Join(errorStrings, "\n"))
}
// Gathers data from a particular server
// Parameters:
// acc : The telegraf Accumulator to use
// serviceURL : endpoint to send request to
// serviceType : service type (apiserver, kubelet, ...)
// excludes : regexps of metric names to drop
// includes : regexps of metric names to keep
// timeout : http timeout
//
// Returns:
// error: Any error that may have occurred
func (k *Kubernetes) gatherServer(
acc telegraf.Accumulator,
serviceURL string,
serviceType string,
excludes []string,
includes []string,
timeout float64,
) error {
// Get raw data from Kube service
collectDate := time.Now()
raw_data, err := k.sendRequest(serviceURL, timeout)
if err != nil {
return err
}
// Prepare Prometheus parser config
config := parsers.Config{
DataFormat: "prometheus",
}
// Create Prometheus parser
promparser, err := parsers.NewParser(&config)
if err != nil {
return err
}
// Set default tags
tags := map[string]string{
"kubeservice": serviceType,
"serverURL": serviceURL,
}
promparser.SetDefaultTags(tags)
// Parse the Prometheus exposition data
metrics, err := promparser.Parse(raw_data)
if err != nil {
return err
}
// Add collected metrics, applying the filters: 'includes' takes
// precedence, 'excludes' is only used when 'includes' is empty
for _, metric := range metrics {
if len(includes) > 0 {
// includes regexp
IncludeMetric:
for _, include := range includes {
r, err := regexp.Compile(include)
if err == nil {
if r.MatchString(metric.Name()) {
acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), collectDate)
break IncludeMetric
}
}
}
} else if len(excludes) > 0 {
// excludes regexp
includeMetric := true
ExcludeMetric:
for _, exclude := range excludes {
r, err := regexp.Compile(exclude)
if err == nil {
if r.MatchString(metric.Name()) {
includeMetric = false
break ExcludeMetric
}
}
}
if includeMetric {
acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), collectDate)
}
} else {
// no includes/excludes regexp
acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
}
}
return nil
}
// Sends an HTTP request to the server using the Kubernetes object's HTTPClient
// Parameters:
// serverURL: endpoint to send request to
// timeout: request timeout
//
// Returns:
// []byte: body of the response
// error : Any error that may have occurred
func (k *Kubernetes) sendRequest(serverURL string, timeout float64) ([]byte, error) {
// Prepare URL
requestURL, err := url.Parse(serverURL)
if err != nil {
return nil, fmt.Errorf("Invalid server URL \"%s\"", serverURL)
}
params := url.Values{}
requestURL.RawQuery = params.Encode()
// Create request
req, err := http.NewRequest("GET", requestURL.String(), nil)
if err != nil {
return nil, err
}
// Make request
resp, err := k.client.MakeRequest(req, timeout)
if err != nil {
return nil, err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return body, err
}
// Process response
if resp.StatusCode != http.StatusOK {
err = fmt.Errorf("Response from url \"%s\" has status code %d (%s), expected %d (%s)",
requestURL.String(),
resp.StatusCode,
http.StatusText(resp.StatusCode),
http.StatusOK,
http.StatusText(http.StatusOK))
return nil, err
}
return body, err
}
func init() {
inputs.Add("kubernetes", func() telegraf.Input {
return &Kubernetes{client: RealHTTPClient{client: &http.Client{}}}
})
}

View File

@ -0,0 +1,219 @@
package kubernetes
import (
"io/ioutil"
"net/http"
"strings"
"testing"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const validData = `# HELP cadvisor_version_info A metric with a constant '1' value labeled by kernel version, OS version, docker version, cadvisor version & cadvisor revision.
# TYPE cadvisor_version_info gauge
cadvisor_version_info{cadvisorRevision="",cadvisorVersion="",dockerVersion="1.8.2",kernelVersion="3.10.0-229.20.1.el7.x86_64",osVersion="CentOS Linux 7 (Core)"} 1
# HELP go_gc_duration_seconds A summary of the GC invocation durations.
# TYPE go_gc_duration_seconds summary
go_gc_duration_seconds{quantile="0"} 0.013534896000000001
go_gc_duration_seconds{quantile="0.25"} 0.02469263
go_gc_duration_seconds{quantile="0.5"} 0.033727822000000005
go_gc_duration_seconds{quantile="0.75"} 0.03840335
go_gc_duration_seconds{quantile="1"} 0.049956604
go_gc_duration_seconds_sum 1970.341293002
go_gc_duration_seconds_count 65952
# HELP http_request_duration_microseconds The HTTP request latencies in microseconds.
# TYPE http_request_duration_microseconds summary
http_request_duration_microseconds{handler="prometheus",quantile="0.5"} 552048.506
http_request_duration_microseconds{handler="prometheus",quantile="0.9"} 5.876804288e+06
http_request_duration_microseconds{handler="prometheus",quantile="0.99"} 5.876804288e+06
http_request_duration_microseconds_sum{handler="prometheus"} 1.8909097205e+07
http_request_duration_microseconds_count{handler="prometheus"} 9
# HELP get_token_fail_count Counter of failed Token() requests to the alternate token source
# TYPE get_token_fail_count counter
get_token_fail_count 0
`
const invalidData = "I don't think this is valid data"
const empty = ""
type mockHTTPClient struct {
responseBody string
statusCode int
}
// Mock implementation of MakeRequest. Usually returns an http.Response with
// hard-coded responseBody and statusCode. However, if the request uses a
// nonstandard method, it uses status code 405 (method not allowed)
func (c mockHTTPClient) MakeRequest(req *http.Request, timeout float64) (*http.Response, error) {
resp := http.Response{}
resp.StatusCode = c.statusCode
// basic error checking on request method
allowedMethods := []string{"GET", "HEAD", "POST", "PUT", "DELETE", "TRACE", "CONNECT"}
methodValid := false
for _, method := range allowedMethods {
if req.Method == method {
methodValid = true
break
}
}
if !methodValid {
resp.StatusCode = 405 // Method not allowed
}
resp.Body = ioutil.NopCloser(strings.NewReader(c.responseBody))
return &resp, nil
}
// Generates a Kubernetes object that uses a mock HTTP client.
// Parameters:
// response : Body of the response that the mock HTTP client should return
// statusCode: HTTP status code the mock HTTP client should return
//
// Returns:
// Kubernetes: Kubernetes object that uses the generated mock HTTP client
func genMockKubernetes(response string, statusCode int) Kubernetes {
return Kubernetes{
client: mockHTTPClient{responseBody: response, statusCode: statusCode},
Apiserver: []Apiserver{
Apiserver{
KubeService{
Url: "http://127.0.0.1:8080",
Endpoint: "/metrics",
Timeout: 1.0,
},
},
},
Scheduler: []Scheduler{
Scheduler{
KubeService{
Url: "http://127.0.0.1:10251",
Excludes: []string{"http_request_duration_.*"},
},
},
},
Controllermanager: []Controllermanager{
Controllermanager{
KubeService{
Url: "http://127.0.0.1:10252",
Endpoint: "metrics",
Includes: []string{"http_request_duration_microseconds"},
},
},
},
Kubelet: []Kubelet{
Kubelet{
KubeService{
Url: "http://127.0.0.1:4194",
Endpoint: "metrics",
Includes: []string{"http_request_duration_microseconds"},
},
},
},
}
}
// Generates a Kubernetes object (a single apiserver with an 'includes' filter) that uses a mock HTTP client.
// Parameters:
// response : Body of the response that the mock HTTP client should return
// statusCode: HTTP status code the mock HTTP client should return
//
// Returns:
// Kubernetes: Kubernetes object that uses the generated mock HTTP client
func genMockKubernetes2(response string, statusCode int) Kubernetes {
return Kubernetes{
client: mockHTTPClient{responseBody: response, statusCode: statusCode},
Apiserver: []Apiserver{
Apiserver{
KubeService{
Url: "http://127.0.0.1:8080",
Endpoint: "/metrics",
Timeout: 1.0,
Includes: []string{"http_request_duration_microseconds"},
},
},
},
}
}
// Test that the proper values are ignored or collected
func TestOK(t *testing.T) {
kubernetes := genMockKubernetes(validData, 200)
var acc testutil.Accumulator
err := kubernetes.Gather(&acc)
require.NoError(t, err)
assert.Equal(t, 33, acc.NFields())
}
// Test that the proper values are ignored or collected
func TestOK2(t *testing.T) {
kubernetes := genMockKubernetes2(validData, 200)
var acc testutil.Accumulator
err := kubernetes.Gather(&acc)
require.NoError(t, err)
assert.Equal(t, 5, acc.NFields())
tags := map[string]string{
"kubeservice": "apiserver",
"serverURL": "http://127.0.0.1:8080/metrics",
"handler": "prometheus",
}
mname := "http_request_duration_microseconds"
expectedFields := map[string]interface{}{
"0.5": 552048.506,
"0.9": 5.876804288e+06,
"0.99": 5.876804288e+06,
"count": 0.0,
"sum": 1.8909097205e+07}
acc.AssertContainsTaggedFields(t, mname, expectedFields, tags)
}
// Test response to HTTP 500
func TestKubernetes500(t *testing.T) {
kubernetes := genMockKubernetes(validData, 500)
var acc testutil.Accumulator
err := kubernetes.Gather(&acc)
assert.NotNil(t, err)
assert.Equal(t, 0, acc.NFields())
}
// Test response to malformed Data
func TestKubernetesBadData(t *testing.T) {
kubernetes := genMockKubernetes(invalidData, 200)
var acc testutil.Accumulator
err := kubernetes.Gather(&acc)
assert.NotNil(t, err)
assert.Equal(t, 0, acc.NFields())
}
// Test response when the Kubernetes service is unreachable
func TestKubernetesEmptyResponse(t *testing.T) {
kubernetes := Kubernetes{client: RealHTTPClient{client: &http.Client{}}}
kubernetes.Apiserver = []Apiserver{
Apiserver{
KubeService{
Url: "http://127.0.0.1:59999",
Endpoint: "/metrics",
Timeout: 1.0,
},
},
}
var acc testutil.Accumulator
err := kubernetes.Gather(&acc)
require.NotNil(t, err)
assert.Equal(t, 0, acc.NFields())
}

View File

@ -0,0 +1,140 @@
package prometheus
// Parser inspired by
// https://github.com/prometheus/prom2json/blob/master/main.go
import (
"bufio"
"bytes"
"fmt"
"math"
"github.com/influxdata/telegraf"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
)
// PrometheusParser is an object for parsing incoming Prometheus metrics.
type PrometheusParser struct {
// DefaultTags will be added to every parsed metric
DefaultTags map[string]string
}
// Parse returns a slice of Metrics from a text representation of
// Prometheus metrics
func (p *PrometheusParser) Parse(buf []byte) ([]telegraf.Metric, error) {
var metrics []telegraf.Metric
var parser expfmt.TextParser
// parse even if the buffer begins with a newline
buf = bytes.TrimPrefix(buf, []byte("\n"))
// Read raw data
buffer := bytes.NewBuffer(buf)
reader := bufio.NewReader(buffer)
metricFamilies, err := parser.TextToMetricFamilies(reader)
// read metrics
for metricName, mf := range metricFamilies {
for _, m := range mf.Metric {
// reading tags
tags := makeLabels(m)
for key, value := range p.DefaultTags {
tags[key] = value
}
// reading fields
fields := make(map[string]interface{})
if mf.GetType() == dto.MetricType_SUMMARY {
// summary metric
fields = makeQuantiles(m)
fields["count"] = float64(m.GetSummary().GetSampleCount())
fields["sum"] = float64(m.GetSummary().GetSampleSum())
} else if mf.GetType() == dto.MetricType_HISTOGRAM {
// histogram metric
fields = makeBuckets(m)
fields["count"] = float64(m.GetHistogram().GetSampleCount())
fields["sum"] = float64(m.GetHistogram().GetSampleSum())
} else {
// standard metric
fields = getNameAndValue(m)
}
// converting to telegraf metric
if len(fields) > 0 {
metric, err := telegraf.NewMetric(metricName, tags, fields)
if err == nil {
metrics = append(metrics, metric)
}
}
}
}
return metrics, err
}
// Parse one line
func (p *PrometheusParser) ParseLine(line string) (telegraf.Metric, error) {
metrics, err := p.Parse([]byte(line + "\n"))
if err != nil {
return nil, err
}
if len(metrics) < 1 {
return nil, fmt.Errorf(
"Can not parse the line: %s, for data format: prometheus", line)
}
return metrics[0], nil
}
// Set default tags
func (p *PrometheusParser) SetDefaultTags(tags map[string]string) {
p.DefaultTags = tags
}
// Get Quantiles from summary metric
func makeQuantiles(m *dto.Metric) map[string]interface{} {
fields := make(map[string]interface{})
for _, q := range m.GetSummary().Quantile {
if !math.IsNaN(q.GetValue()) {
fields[fmt.Sprint(q.GetQuantile())] = float64(q.GetValue())
}
}
return fields
}
// Get Buckets from histogram metric
func makeBuckets(m *dto.Metric) map[string]interface{} {
fields := make(map[string]interface{})
for _, b := range m.GetHistogram().Bucket {
fields[fmt.Sprint(b.GetUpperBound())] = float64(b.GetCumulativeCount())
}
return fields
}
// Get labels from metric
func makeLabels(m *dto.Metric) map[string]string {
result := map[string]string{}
for _, lp := range m.Label {
result[lp.GetName()] = lp.GetValue()
}
return result
}
// Get name and value from metric
func getNameAndValue(m *dto.Metric) map[string]interface{} {
fields := make(map[string]interface{})
if m.Gauge != nil {
if !math.IsNaN(m.GetGauge().GetValue()) {
fields["gauge"] = float64(m.GetGauge().GetValue())
}
} else if m.Counter != nil {
if !math.IsNaN(m.GetCounter().GetValue()) {
fields["counter"] = float64(m.GetCounter().GetValue())
}
} else if m.Untyped != nil {
if !math.IsNaN(m.GetUntyped().GetValue()) {
fields["value"] = float64(m.GetUntyped().GetValue())
}
}
return fields
}
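For reference, a minimal standalone use of this parser might look like the sketch below; the `main` package and the sample exposition text are illustrative assumptions, not part of the plugin. It feeds a small buffer through `Parse` and prints the resulting measurements, fields, and tags.

```go
package main

import (
	"fmt"
	"log"

	"github.com/influxdata/telegraf/plugins/parsers/prometheus"
)

func main() {
	parser := prometheus.PrometheusParser{}
	// Default tags are merged into every parsed metric
	// (they override labels with the same name).
	parser.SetDefaultTags(map[string]string{"kubeservice": "kubelet"})

	// Two metrics in Prometheus text exposition format: a counter and a gauge.
	input := []byte(`# TYPE get_token_fail_count counter
get_token_fail_count 0
# TYPE process_max_fds gauge
process_max_fds 1000000
`)

	metrics, err := parser.Parse(input)
	if err != nil {
		log.Fatal(err)
	}
	for _, m := range metrics {
		// e.g. get_token_fail_count map[counter:0] map[kubeservice:kubelet]
		fmt.Println(m.Name(), m.Fields(), m.Tags())
	}
}
```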

View File

@ -0,0 +1,315 @@
package prometheus
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
)
var exptime = time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
const (
validPrometheus = "cpu_load_short,cpu=cpu0 value=10 1257894000000000000"
validPrometheusNewline = "\ncpu_load_short,cpu=cpu0 value=10 1257894000000000000\n"
invalidPrometheus = "I don't think this is line protocol"
invalidPrometheus2 = "{\"a\": 5, \"b\": {\"c\": 6}}"
)
const validUniqueGauge = `# HELP cadvisor_version_info A metric with a constant '1' value labeled by kernel version, OS version, docker version, cadvisor version & cadvisor revision.
# TYPE cadvisor_version_info gauge
cadvisor_version_info{cadvisorRevision="",cadvisorVersion="",dockerVersion="1.8.2",kernelVersion="3.10.0-229.20.1.el7.x86_64",osVersion="CentOS Linux 7 (Core)"} 1
`
const validUniqueCounter = `# HELP get_token_fail_count Counter of failed Token() requests to the alternate token source
# TYPE get_token_fail_count counter
get_token_fail_count 0
`
const validUniqueLine = `# HELP get_token_fail_count Counter of failed Token() requests to the alternate token source
`
const validUniqueSummary = `# HELP http_request_duration_microseconds The HTTP request latencies in microseconds.
# TYPE http_request_duration_microseconds summary
http_request_duration_microseconds{handler="prometheus",quantile="0.5"} 552048.506
http_request_duration_microseconds{handler="prometheus",quantile="0.9"} 5.876804288e+06
http_request_duration_microseconds{handler="prometheus",quantile="0.99"} 5.876804288e+06
http_request_duration_microseconds_sum{handler="prometheus"} 1.8909097205e+07
http_request_duration_microseconds_count{handler="prometheus"} 9
`
const validUniqueHistogram = `# HELP apiserver_request_latencies Response latency distribution in microseconds for each verb, resource and client.
# TYPE apiserver_request_latencies histogram
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="125000"} 1994
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="250000"} 1997
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="500000"} 2000
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="1e+06"} 2005
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="2e+06"} 2012
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="4e+06"} 2017
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="8e+06"} 2024
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="+Inf"} 2025
apiserver_request_latencies_sum{resource="bindings",verb="POST"} 1.02726334e+08
apiserver_request_latencies_count{resource="bindings",verb="POST"} 2025
`
const validData = `# HELP cadvisor_version_info A metric with a constant '1' value labeled by kernel version, OS version, docker version, cadvisor version & cadvisor revision.
# TYPE cadvisor_version_info gauge
cadvisor_version_info{cadvisorRevision="",cadvisorVersion="",dockerVersion="1.8.2",kernelVersion="3.10.0-229.20.1.el7.x86_64",osVersion="CentOS Linux 7 (Core)"} 1
# HELP go_gc_duration_seconds A summary of the GC invocation durations.
# TYPE go_gc_duration_seconds summary
go_gc_duration_seconds{quantile="0"} 0.013534896000000001
go_gc_duration_seconds{quantile="0.25"} 0.02469263
go_gc_duration_seconds{quantile="0.5"} 0.033727822000000005
go_gc_duration_seconds{quantile="0.75"} 0.03840335
go_gc_duration_seconds{quantile="1"} 0.049956604
go_gc_duration_seconds_sum 1970.341293002
go_gc_duration_seconds_count 65952
# HELP http_request_duration_microseconds The HTTP request latencies in microseconds.
# TYPE http_request_duration_microseconds summary
http_request_duration_microseconds{handler="prometheus",quantile="0.5"} 552048.506
http_request_duration_microseconds{handler="prometheus",quantile="0.9"} 5.876804288e+06
http_request_duration_microseconds{handler="prometheus",quantile="0.99"} 5.876804288e+06
http_request_duration_microseconds_sum{handler="prometheus"} 1.8909097205e+07
http_request_duration_microseconds_count{handler="prometheus"} 9
# HELP get_token_fail_count Counter of failed Token() requests to the alternate token source
# TYPE get_token_fail_count counter
get_token_fail_count 0
# HELP apiserver_request_latencies Response latency distribution in microseconds for each verb, resource and client.
# TYPE apiserver_request_latencies histogram
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="125000"} 1994
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="250000"} 1997
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="500000"} 2000
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="1e+06"} 2005
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="2e+06"} 2012
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="4e+06"} 2017
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="8e+06"} 2024
apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="+Inf"} 2025
apiserver_request_latencies_sum{resource="bindings",verb="POST"} 1.02726334e+08
apiserver_request_latencies_count{resource="bindings",verb="POST"} 2025
`
const prometheusMulti = `
cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
`
const prometheusMultiSomeInvalid = `
cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,cpu=cpu3, host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,cpu=cpu4 , usage_idle=99,usage_busy=1
cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
`
func TestParseValidPrometheus(t *testing.T) {
parser := PrometheusParser{}
// Gauge value
metrics, err := parser.Parse([]byte(validUniqueGauge))
assert.NoError(t, err)
assert.Len(t, metrics, 1)
assert.Equal(t, "cadvisor_version_info", metrics[0].Name())
assert.Equal(t, map[string]interface{}{
"gauge": float64(1),
}, metrics[0].Fields())
assert.Equal(t, map[string]string{
"osVersion": "CentOS Linux 7 (Core)",
"dockerVersion": "1.8.2",
"kernelVersion": "3.10.0-229.20.1.el7.x86_64",
}, metrics[0].Tags())
// Counter value
parser.SetDefaultTags(map[string]string{"mytag": "mytagvalue"})
metrics, err = parser.Parse([]byte(validUniqueCounter))
assert.NoError(t, err)
assert.Len(t, metrics, 1)
assert.Equal(t, "get_token_fail_count", metrics[0].Name())
assert.Equal(t, map[string]interface{}{
"counter": float64(0),
}, metrics[0].Fields())
assert.Equal(t, map[string]string{"mytag": "mytagvalue"}, metrics[0].Tags())
// Summary data
parser.SetDefaultTags(map[string]string{})
metrics, err = parser.Parse([]byte(validUniqueSummary))
assert.NoError(t, err)
assert.Len(t, metrics, 1)
assert.Equal(t, "http_request_duration_microseconds", metrics[0].Name())
assert.Equal(t, map[string]interface{}{
"0.5": 552048.506,
"0.9": 5.876804288e+06,
"0.99": 5.876804288e+06,
"count": 0.0,
"sum": 1.8909097205e+07,
}, metrics[0].Fields())
assert.Equal(t, map[string]string{"handler": "prometheus"}, metrics[0].Tags())
// histogram data
metrics, err = parser.Parse([]byte(validUniqueHistogram))
assert.NoError(t, err)
assert.Len(t, metrics, 1)
assert.Equal(t, "apiserver_request_latencies", metrics[0].Name())
assert.Equal(t, map[string]interface{}{
"500000": 2000.0,
"count": 2025.0,
"sum": 0.0,
"250000": 1997.0,
"2e+06": 2012.0,
"4e+06": 2017.0,
"8e+06": 2024.0,
"+Inf": 2025.0,
"125000": 1994.0,
"1e+06": 2005.0,
}, metrics[0].Fields())
assert.Equal(t,
map[string]string{"verb": "POST", "resource": "bindings"},
metrics[0].Tags())
}
func TestParseLineInvalidPrometheus(t *testing.T) {
parser := PrometheusParser{}
metric, err := parser.ParseLine(validUniqueLine)
assert.NotNil(t, err)
assert.Nil(t, metric)
}
/*
func TestParseLineValidPrometheus(t *testing.T) {
parser := PrometheusParser{}
metric, err := parser.ParseLine(validPrometheus)
assert.NoError(t, err)
assert.Equal(t, "cpu_load_short", metric.Name())
assert.Equal(t, map[string]interface{}{
"value": float64(10),
}, metric.Fields())
assert.Equal(t, map[string]string{
"cpu": "cpu0",
}, metric.Tags())
assert.Equal(t, exptime, metric.Time())
metric, err = parser.ParseLine(validPrometheusNewline)
assert.NoError(t, err)
assert.Equal(t, "cpu_load_short", metric.Name())
assert.Equal(t, map[string]interface{}{
"value": float64(10),
}, metric.Fields())
assert.Equal(t, map[string]string{
"cpu": "cpu0",
}, metric.Tags())
assert.Equal(t, exptime, metric.Time())
}
func TestParseMultipleValid(t *testing.T) {
parser := PrometheusParser{}
metrics, err := parser.Parse([]byte(prometheusMulti))
assert.NoError(t, err)
assert.Len(t, metrics, 7)
for _, metric := range metrics {
assert.Equal(t, "cpu", metric.Name())
assert.Equal(t, map[string]string{
"datacenter": "us-east",
"host": "foo",
}, metrics[0].Tags())
assert.Equal(t, map[string]interface{}{
"usage_idle": float64(99),
"usage_busy": float64(1),
}, metrics[0].Fields())
}
}
func TestParseSomeValid(t *testing.T) {
parser := PrometheusParser{}
metrics, err := parser.Parse([]byte(prometheusMultiSomeInvalid))
assert.Error(t, err)
assert.Len(t, metrics, 4)
for _, metric := range metrics {
assert.Equal(t, "cpu", metric.Name())
assert.Equal(t, map[string]string{
"datacenter": "us-east",
"host": "foo",
}, metrics[0].Tags())
assert.Equal(t, map[string]interface{}{
"usage_idle": float64(99),
"usage_busy": float64(1),
}, metrics[0].Fields())
}
}
// Test that default tags are applied.
func TestParseDefaultTags(t *testing.T) {
parser := PrometheusParser{
DefaultTags: map[string]string{
"tag": "default",
},
}
metrics, err := parser.Parse([]byte(prometheusMultiSomeInvalid))
assert.Error(t, err)
assert.Len(t, metrics, 4)
for _, metric := range metrics {
assert.Equal(t, "cpu", metric.Name())
assert.Equal(t, map[string]string{
"datacenter": "us-east",
"host": "foo",
"tag": "default",
}, metrics[0].Tags())
assert.Equal(t, map[string]interface{}{
"usage_idle": float64(99),
"usage_busy": float64(1),
}, metrics[0].Fields())
}
}
// Verify that metric tags will override default tags
func TestParseDefaultTagsOverride(t *testing.T) {
parser := PrometheusParser{
DefaultTags: map[string]string{
"host": "default",
},
}
metrics, err := parser.Parse([]byte(prometheusMultiSomeInvalid))
assert.Error(t, err)
assert.Len(t, metrics, 4)
for _, metric := range metrics {
assert.Equal(t, "cpu", metric.Name())
assert.Equal(t, map[string]string{
"datacenter": "us-east",
"host": "foo",
}, metrics[0].Tags())
assert.Equal(t, map[string]interface{}{
"usage_idle": float64(99),
"usage_busy": float64(1),
}, metrics[0].Fields())
}
}
func TestParseInvalidPrometheus(t *testing.T) {
parser := PrometheusParser{}
_, err := parser.Parse([]byte(invalidPrometheus))
assert.Error(t, err)
_, err = parser.Parse([]byte(invalidPrometheus2))
assert.Error(t, err)
_, err = parser.ParseLine(invalidPrometheus)
assert.Error(t, err)
_, err = parser.ParseLine(invalidPrometheus2)
assert.Error(t, err)
}
*/

View File

@ -8,6 +8,7 @@ import (
"github.com/influxdata/telegraf/plugins/parsers/graphite"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/parsers/json"
"github.com/influxdata/telegraf/plugins/parsers/prometheus"
)
// ParserInput is an interface for input plugins that are able to parse
@ -38,7 +39,7 @@ type Parser interface {
// Config is a struct that covers the data types needed for all parser types,
// and can be used to instantiate _any_ of the parsers.
type Config struct {
// Dataformat can be one of: json, influx, graphite
// Dataformat can be one of: json, influx, graphite, prometheus
DataFormat string
// Separator only applied to Graphite data.
@ -65,6 +66,8 @@ func NewParser(config *Config) (Parser, error) {
config.TagKeys, config.DefaultTags)
case "influx":
parser, err = NewInfluxParser()
case "prometheus":
parser, err = NewPrometheusParser()
case "graphite":
parser, err = NewGraphiteParser(config.Separator,
config.Templates, config.DefaultTags)
@ -87,6 +90,10 @@ func NewJSONParser(
return parser, nil
}
func NewPrometheusParser() (Parser, error) {
return &prometheus.PrometheusParser{}, nil
}
func NewInfluxParser() (Parser, error) {
return &influx.InfluxParser{}, nil
}
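With the "prometheus" case registered above, an input plugin that embeds a parser can request the new format by name, which is what the Kubernetes plugin's `gatherServer` does. A minimal sketch of that pattern follows; the `parsePrometheusBody` helper, its package name, and its arguments are illustrative assumptions, not part of this commit.

```go
package example

import (
	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/plugins/parsers"
)

// parsePrometheusBody builds a Prometheus parser through the registry and
// decodes a scraped /metrics body, tagging every metric with its origin URL.
func parsePrometheusBody(body []byte, serverURL string) ([]telegraf.Metric, error) {
	config := parsers.Config{
		DataFormat: "prometheus",
	}
	p, err := parsers.NewParser(&config)
	if err != nil {
		return nil, err
	}
	p.SetDefaultTags(map[string]string{"serverURL": serverURL})
	return p.Parse(body)
}
```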