From a5288fc28086f62ffd170c141ca56254d64d49a1 Mon Sep 17 00:00:00 2001 From: Thibault Cohen Date: Sun, 14 Feb 2016 02:48:43 -0500 Subject: [PATCH] Add Prometheus parser and Kubernetes input plugin --- plugins/inputs/all/all.go | 1 + plugins/inputs/kubernetes/README.md | 83 +++++ plugins/inputs/kubernetes/kubernetes.go | 318 +++++++++++++++++++ plugins/inputs/kubernetes/kubernetes_test.go | 219 +++++++++++++ plugins/parsers/prometheus/parser.go | 140 ++++++++ plugins/parsers/prometheus/parser_test.go | 315 ++++++++++++++++++ plugins/parsers/registry.go | 9 +- 7 files changed, 1084 insertions(+), 1 deletion(-) create mode 100644 plugins/inputs/kubernetes/README.md create mode 100644 plugins/inputs/kubernetes/kubernetes.go create mode 100644 plugins/inputs/kubernetes/kubernetes_test.go create mode 100644 plugins/parsers/prometheus/parser.go create mode 100644 plugins/parsers/prometheus/parser_test.go diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 335d41a32..de32df572 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -16,6 +16,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/influxdb" _ "github.com/influxdata/telegraf/plugins/inputs/jolokia" _ "github.com/influxdata/telegraf/plugins/inputs/kafka_consumer" + _ "github.com/influxdata/telegraf/plugins/inputs/kubernetes" _ "github.com/influxdata/telegraf/plugins/inputs/leofs" _ "github.com/influxdata/telegraf/plugins/inputs/lustre2" _ "github.com/influxdata/telegraf/plugins/inputs/mailchimp" diff --git a/plugins/inputs/kubernetes/README.md b/plugins/inputs/kubernetes/README.md new file mode 100644 index 000000000..406ac0303 --- /dev/null +++ b/plugins/inputs/kubernetes/README.md @@ -0,0 +1,83 @@ +# Kubernetes Input Plugin + +The Kubernetes plugin can gather metrics from Kubernetes services +- APIServer +- Scheduler +- Controller Manager +- Kubelet + +### Configuration: + +```toml +# Get metrics from Kubernetes services + [[inputs.kubernetes.apiserver]] + url = 
"http://mtl-nvcbladea-15.nuance.com:8080" + endpoint = "/metrics" + timeout = 5.0 + # includes only metrics which match one of the + # following regexp + includes = ["apiserver_.*"] + + [[inputs.kubernetes.scheduler]] + url = "http://mtl-nvcbladea-15.nuance.com:10251" + endpoint = "/metrics" + timeout = 1.0 + # DO NOT include metrics which match one of the + # following regexp + excludes = ["scheduler_.*"] + + [[inputs.kubernetes.controllermanager]] + url = "http://mtl-nvcbladea-15.nuance.com:10252" + + [[inputs.kubernetes.kubelet]] + url = "http://mtl-nvcbladea-15.nuance.com:4194" + # You should increase metric_buffer_limit + # Because of number of kubelet metrics + # otherwise you can limit metrics with + # the following 'excludes' argument + excludes = ["container_.*"] +``` + +### Measurements & Fields: + +This input plugin gets its measurements and fields from Kubernetes services. +If new metrics appear on K8S services, this plugin will pick them up without +modification. + +- http_request_duration_microseconds + - 0.5 + - 0.9 + - 0.99 + - count + - sum +- ....... + - ....... + - ....... + - ....... + - ....... + - ....... + +### Tags: + +This input plugin gets its metric tags from Kubernetes services. +If new tags appear on K8S services, this plugin will pick them up without +modification. + +- kubeservice +- serverURL +- handler +- ....... +- ....... +- ....... 
+
+### Example Output:
+
+```
+$ ./telegraf -config telegraf.conf -input-filter example -test
+process_cpu_seconds_total,kubeservice=kubelet,serverURL=http://mtl-blade19-02.nuance.com:4194/metrics counter=452366.52 1455436043283475033
+ssh_tunnel_open_fail_count,kubeservice=kubelet,serverURL=http://mtl-blade19-02.nuance.com:4194/metrics counter=0 1455436043284057879
+get_token_count,kubeservice=kubelet,serverURL=http://mtl-blade19-02.nuance.com:4194/metrics counter=0 1455436043285030409
+http_requests_total,code=200,handler=prometheus,kubeservice=kubelet,method=get,serverURL=http://mtl-blade19-02.nuance.com:4194/metrics counter=116 1455436043285364118
+kubelet_generate_pod_status_latency_microseconds,kubeservice=kubelet,serverURL=http://mtl-blade19-02.nuance.com:4194/metrics 0.5=94441,0.9=186928,0.99=236698,count=0,sum=128409528225 1455436043285715318
+process_max_fds,kubeservice=kubelet,serverURL=http://mtl-blade19-02.nuance.com:4194/metrics gauge=1000000 1455436043286061204
+```
diff --git a/plugins/inputs/kubernetes/kubernetes.go b/plugins/inputs/kubernetes/kubernetes.go
new file mode 100644
index 000000000..6c6953a3b
--- /dev/null
+++ b/plugins/inputs/kubernetes/kubernetes.go
@@ -0,0 +1,318 @@
+package kubernetes
+
+// Plugin inspired from
+// https://github.com/prometheus/prom2json/blob/master/main.go
+
+import (
+	"errors"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+	"net/url"
+	"regexp"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/plugins/inputs"
+	"github.com/influxdata/telegraf/plugins/parsers"
+)
+
+// Kubernetes is the input plugin. It holds one list of endpoints per kind of
+// Kubernetes service; Gather scrapes every entry of every list.
+type Kubernetes struct {
+	Apiserver         []Apiserver
+	Scheduler         []Scheduler
+	Controllermanager []Controllermanager
+	Kubelet           []Kubelet
+	// client performs the HTTP requests; it is an interface so that tests
+	// can substitute a mock.
+	client HTTPClient
+}
+
+// KubeService is the configuration shared by every kind of service.
+type KubeService struct {
+	// Url is the base URL of the service; Endpoint is appended to it.
+	Url string
+	// Endpoint is the path of the metrics page. Gather uses "/metrics" when
+	// it is empty and prepends a "/" when it does not start with one.
+	Endpoint string
+	// Timeout is the HTTP timeout in seconds. Gather uses 1.0 when it is 0.
+	Timeout float64
+	// Excludes lists regexps matched against the metric name; a metric that
+	// matches one of them is dropped. Only consulted when Includes is empty.
+	Excludes []string
+	// Includes lists regexps matched against the metric name; when it is
+	// not empty, only metrics that match one of them are kept.
+	Includes []string
+}
+
+// Apiserver is the configuration of one API server endpoint.
+type Apiserver struct {
+	KubeService
+}
+
+// Scheduler is the configuration of one scheduler endpoint.
+type Scheduler struct {
+	KubeService
+}
+
+// Controllermanager is the configuration of one controller manager endpoint.
+type Controllermanager struct {
+	KubeService
+}
+
+// Kubelet is the configuration of one kubelet endpoint.
+type Kubelet struct {
+	KubeService
+}
+
+// HTTPClient abstracts the HTTP transport so that tests can substitute a mock.
+type HTTPClient interface {
+	// Returns the result of an http request
+	//
+	// Parameters:
+	//     req    : HTTP request object
+	//     timeout: request timeout, in seconds
+	//
+	// Returns:
+	//     http.Response: HTTP response object
+	//     error        : Any error that may have occurred
+	MakeRequest(req *http.Request, timeout float64) (*http.Response, error)
+}
+
+// RealHTTPClient is the HTTPClient implementation backed by net/http.
+type RealHTTPClient struct {
+	client *http.Client
+}
+
+// MakeRequest sends req and gives up after timeout seconds. Fractional
+// timeouts are honoured (0.5 means 500ms).
+func (c RealHTTPClient) MakeRequest(req *http.Request, timeout float64) (*http.Response, error) {
+	// Gather calls this from one goroutine per service, each with its own
+	// timeout. Setting the timeout on a shallow copy keeps the shared client
+	// untouched (no data race, no timeout leaking from one service to
+	// another) while still sharing its Transport, i.e. its connections.
+	client := *c.client
+	// Multiply before converting: time.Duration(timeout) alone would truncate
+	// 0.5 to 0, which net/http interprets as "no timeout".
+	client.Timeout = time.Duration(timeout * float64(time.Second))
+	return client.Do(req)
+}
+
+var sampleConfig = `
+# Get metrics from Kubernetes services
+  [[inputs.kubernetes.apiserver]]
+    url = "http://mtl-nvcbladea-15.nuance.com:8080"
+    endpoint = "/metrics"
+    timeout = 5.0
+    # includes only metrics which match one of the
+    # following regexp
+    includes = ["apiserver_.*"]
+
+  [[inputs.kubernetes.scheduler]]
+    url = "http://mtl-nvcbladea-15.nuance.com:10251"
+    endpoint = "/metrics"
+    timeout = 1.0
+    # DO NOT include metrics which match one of the
+    # following regexp
+    excludes = ["scheduler_.*"]
+
+  [[inputs.kubernetes.controllermanager]]
+    url = "http://mtl-nvcbladea-15.nuance.com:10252"
+
+  [[inputs.kubernetes.kubelet]]
+    url = "http://mtl-nvcbladea-15.nuance.com:4194"
+    # You should increase metric_buffer_limit
+    # Because of number of kubelet metrics
+    # otherwise you can limit metrics with
+    # the following 'excludes' argument
+    excludes = ["container_.*"]
+
+`
+
+// SampleConfig returns the example configuration of the plugin.
+func (k *Kubernetes) SampleConfig() string {
+	return sampleConfig
+}
+
+// Description returns a one-line description of the plugin.
+func (k *Kubernetes) Description() string {
+	return "Read metrics from Kubernetes services"
+}
+
+// Gathers data for all servers.
+//
+// Every configured service is scraped concurrently, one goroutine per
+// service. The errors they report are collected and returned joined into a
+// single error, one message per line.
+func (k *Kubernetes) Gather(acc telegraf.Accumulator) error {
+	// Flatten the per-kind lists into one work list so that every service
+	// goes through exactly the same code path.
+	type target struct {
+		serviceType string
+		service     KubeService
+	}
+	targets := make([]target, 0,
+		len(k.Apiserver)+len(k.Scheduler)+
+			len(k.Controllermanager)+len(k.Kubelet))
+	for _, s := range k.Apiserver {
+		targets = append(targets, target{"apiserver", s.KubeService})
+	}
+	for _, s := range k.Scheduler {
+		targets = append(targets, target{"scheduler", s.KubeService})
+	}
+	for _, s := range k.Controllermanager {
+		targets = append(targets, target{"controllermanager", s.KubeService})
+	}
+	for _, s := range k.Kubelet {
+		targets = append(targets, target{"kubelet", s.KubeService})
+	}
+
+	var wg sync.WaitGroup
+	// Sized for the worst case (every service fails) so that no goroutine
+	// ever blocks on send.
+	errorChannel := make(chan error, len(targets))
+
+	for _, t := range targets {
+		wg.Add(1)
+		// Handed over as arguments: the loop variable is reused between
+		// iterations and must not be captured by the goroutine.
+		go func(serviceType string, service KubeService) {
+			defer wg.Done()
+
+			// An unset (or nonsensical) timeout falls back to one second.
+			timeout := service.Timeout
+			if timeout <= 0 {
+				timeout = 1.0
+			}
+
+			endpoint := service.Endpoint
+			if endpoint == "" {
+				endpoint = "/metrics"
+			} else if !strings.HasPrefix(endpoint, "/") {
+				endpoint = "/" + endpoint
+			}
+			// Avoid "//" when the configured URL ends with a slash.
+			serviceURL := strings.TrimRight(service.Url, "/") + endpoint
+
+			err := k.gatherServer(acc, serviceURL, serviceType,
+				service.Excludes, service.Includes, timeout)
+			if err != nil {
+				errorChannel <- err
+			}
+		}(t.serviceType, t.service)
+	}
+
+	wg.Wait()
+	close(errorChannel)
+
+	// Get all errors and return them as one giant error
+	var errorStrings []string
+	for err := range errorChannel {
+		errorStrings = append(errorStrings, err.Error())
+	}
+	if len(errorStrings) == 0 {
+		return nil
+	}
+	return errors.New(strings.Join(errorStrings, "\n"))
+}
+
+// Gathers data from a particular server
+// Parameters:
+//     acc         : The telegraf
Accumulator to use
+//     serviceURL  : full URL of the metrics page to request
+//     serviceType : service type (apiserver, kubelet, ...)
+//     excludes    : regexps; a metric whose name matches one is dropped
+//                   (only consulted when includes is empty)
+//     includes    : regexps; when not empty, only metrics whose name
+//                   matches one are kept
+//     timeout     : http timeout, in seconds
+//
+// Returns:
+//     error: Any error that may have occurred, including an invalid
+//            include/exclude regexp
+func (k *Kubernetes) gatherServer(
+	acc telegraf.Accumulator,
+	serviceURL string,
+	serviceType string,
+	excludes []string,
+	includes []string,
+	timeout float64,
+) error {
+	// Compile the filters once, before any network traffic: a pattern that
+	// does not compile is a configuration error and is reported instead of
+	// being silently skipped.
+	includeFilters, err := compileFilters(includes)
+	if err != nil {
+		return err
+	}
+	excludeFilters, err := compileFilters(excludes)
+	if err != nil {
+		return err
+	}
+
+	// Get raw data from Kube service
+	collectDate := time.Now()
+	rawData, err := k.sendRequest(serviceURL, timeout)
+	if err != nil {
+		return err
+	}
+
+	// Create Prometheus parser
+	promparser, err := parsers.NewParser(&parsers.Config{
+		DataFormat: "prometheus",
+	})
+	if err != nil {
+		return err
+	}
+	// Set default tags
+	promparser.SetDefaultTags(map[string]string{
+		"kubeservice": serviceType,
+		"serverURL":   serviceURL,
+	})
+
+	// Parsing
+	metrics, err := promparser.Parse(rawData)
+	if err != nil {
+		return err
+	}
+
+	// Add (or not) collected metrics
+	for _, metric := range metrics {
+		name := metric.Name()
+		switch {
+		case len(includeFilters) > 0:
+			// includes take precedence: excludes are not consulted
+			if !matchesAny(name, includeFilters) {
+				continue
+			}
+		case len(excludeFilters) > 0:
+			if matchesAny(name, excludeFilters) {
+				continue
+			}
+		}
+		// Every metric of one scrape carries the same timestamp, whether
+		// or not a filter is configured.
+		acc.AddFields(name, metric.Fields(), metric.Tags(), collectDate)
+	}
+
+	return nil
+}
+
+// compileFilters compiles every pattern and fails on the first invalid one.
+func compileFilters(patterns []string) ([]*regexp.Regexp, error) {
+	filters := make([]*regexp.Regexp, 0, len(patterns))
+	for _, pattern := range patterns {
+		re, err := regexp.Compile(pattern)
+		if err != nil {
+			return nil, fmt.Errorf("invalid metric filter \"%s\": %s", pattern, err)
+		}
+		filters = append(filters, re)
+	}
+	return filters, nil
+}
+
+// matchesAny reports whether name matches at least one of the filters.
+func matchesAny(name string, filters []*regexp.Regexp) bool {
+	for _, re := range filters {
+		if re.MatchString(name) {
+			return true
+		}
+	}
+	return false
+}
+
+// Sends an HTTP request to the server using the Kubernetes object's HTTPClient
+// Parameters:
+//
serverURL: endpoint to send request to
+//     timeout  : request timeout, in seconds
+//
+// Returns:
+//     []byte: body of the response (nil whenever an error is returned)
+//     error : Any error that may have occurred, including a status code
+//             other than 200
+func (k *Kubernetes) sendRequest(serverURL string, timeout float64) ([]byte, error) {
+	// Prepare URL. It is only validated here: a query string that is part
+	// of the configured URL is sent as is.
+	requestURL, err := url.Parse(serverURL)
+	if err != nil {
+		return nil, fmt.Errorf("Invalid server URL \"%s\": %s", serverURL, err)
+	}
+
+	// Create request
+	req, err := http.NewRequest("GET", requestURL.String(), nil)
+	if err != nil {
+		return nil, err
+	}
+
+	// Make request
+	resp, err := k.client.MakeRequest(req, timeout)
+	if err != nil {
+		return nil, err
+	}
+	defer resp.Body.Close()
+
+	// The body is read before the status is checked so that it is drained
+	// even for an error response.
+	body, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		// Never hand out a truncated body.
+		return nil, err
+	}
+
+	// Process response
+	if resp.StatusCode != http.StatusOK {
+		return nil, fmt.Errorf("Response from url \"%s\" has status code %d (%s), expected %d (%s)",
+			requestURL.String(),
+			resp.StatusCode,
+			http.StatusText(resp.StatusCode),
+			http.StatusOK,
+			http.StatusText(http.StatusOK))
+	}
+
+	return body, nil
+}
+
+func init() {
+	inputs.Add("kubernetes", func() telegraf.Input {
+		return &Kubernetes{client: RealHTTPClient{client: &http.Client{}}}
+	})
+}
diff --git a/plugins/inputs/kubernetes/kubernetes_test.go b/plugins/inputs/kubernetes/kubernetes_test.go
new file mode 100644
index 000000000..4cc134dfa
--- /dev/null
+++ b/plugins/inputs/kubernetes/kubernetes_test.go
@@ -0,0 +1,219 @@
+package kubernetes
+
+import (
+	"io/ioutil"
+	"net/http"
+	"strings"
+	"testing"
+
+	"github.com/influxdata/telegraf/testutil"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+const validData = `# HELP cadvisor_version_info A metric with a constant '1' value labeled by kernel version, OS version, docker version, cadvisor version & cadvisor revision.
+# TYPE cadvisor_version_info gauge
+cadvisor_version_info{cadvisorRevision="",cadvisorVersion="",dockerVersion="1.8.2",kernelVersion="3.10.0-229.20.1.el7.x86_64",osVersion="CentOS Linux 7 (Core)"} 1
+# HELP go_gc_duration_seconds A summary of the GC invocation durations.
+# TYPE go_gc_duration_seconds summary
+go_gc_duration_seconds{quantile="0"} 0.013534896000000001
+go_gc_duration_seconds{quantile="0.25"} 0.02469263
+go_gc_duration_seconds{quantile="0.5"} 0.033727822000000005
+go_gc_duration_seconds{quantile="0.75"} 0.03840335
+go_gc_duration_seconds{quantile="1"} 0.049956604
+go_gc_duration_seconds_sum 1970.341293002
+go_gc_duration_seconds_count 65952
+# HELP http_request_duration_microseconds The HTTP request latencies in microseconds.
+# TYPE http_request_duration_microseconds summary
+http_request_duration_microseconds{handler="prometheus",quantile="0.5"} 552048.506
+http_request_duration_microseconds{handler="prometheus",quantile="0.9"} 5.876804288e+06
+http_request_duration_microseconds{handler="prometheus",quantile="0.99"} 5.876804288e+06
+http_request_duration_microseconds_sum{handler="prometheus"} 1.8909097205e+07
+http_request_duration_microseconds_count{handler="prometheus"} 9
+# HELP get_token_fail_count Counter of failed Token() requests to the alternate token source
+# TYPE get_token_fail_count counter
+get_token_fail_count 0
+`
+
+const invalidData = "I don't think this is valid data"
+
+const empty = ""
+
+// mockHTTPClient is an HTTPClient that answers every request with a canned
+// body and status code, without touching the network.
+type mockHTTPClient struct {
+	responseBody string
+	statusCode   int
+}
+
+// Mock implementation of MakeRequest. Usually returns an http.Response with
+// hard-coded responseBody and statusCode. However, if the request uses a
+// nonstandard method, it uses status code 405 (method not allowed).
+// The timeout argument is ignored.
+func (c mockHTTPClient) MakeRequest(req *http.Request, timeout float64) (*http.Response, error) {
+	resp := http.Response{}
+	resp.StatusCode = c.statusCode
+
+	// basic error checking on request method
+	allowedMethods := []string{"GET", "HEAD", "POST", "PUT", "DELETE", "TRACE", "CONNECT"}
+	methodValid := false
+	for _, method := range allowedMethods {
+		if req.Method == method {
+			methodValid = true
+			break
+		}
+	}
+
+	if !methodValid {
+		resp.StatusCode = 405 // Method not allowed
+	}
+
+	resp.Body = ioutil.NopCloser(strings.NewReader(c.responseBody))
+	return &resp, nil
+}
+
+// Generates a Kubernetes object (returned by value) that uses a mock HTTP
+// client and has one service of each kind configured: an unfiltered
+// apiserver, a scheduler with an exclude filter, and a controller manager
+// and a kubelet with an include filter and an endpoint lacking its leading
+// slash.
+// Parameters:
+//     response  : Body of the response that the mock HTTP client should return
+//     statusCode: HTTP status code the mock HTTP client should return
+//
+// Returns:
+//     Kubernetes: Kubernetes object that uses the generated mock HTTP client
+func genMockKubernetes(response string, statusCode int) Kubernetes {
+	return Kubernetes{
+		client: mockHTTPClient{responseBody: response, statusCode: statusCode},
+		Apiserver: []Apiserver{
+			Apiserver{
+				KubeService{
+					Url:      "http://127.0.0.1:8080",
+					Endpoint: "/metrics",
+					Timeout:  1.0,
+				},
+			},
+		},
+		Scheduler: []Scheduler{
+			Scheduler{
+				KubeService{
+					Url:      "http://127.0.0.1:10251",
+					Excludes: []string{"http_request_duration_.*"},
+				},
+			},
+		},
+		Controllermanager: []Controllermanager{
+			Controllermanager{
+				KubeService{
+					Url:      "http://127.0.0.1:10252",
+					Endpoint: "metrics",
+					Includes: []string{"http_request_duration_microseconds"},
+				},
+			},
+		},
+		Kubelet: []Kubelet{
+			Kubelet{
+				KubeService{
+					Url:      "http://127.0.0.1:4194",
+					Endpoint: "metrics",
+					Includes: []string{"http_request_duration_microseconds"},
+				},
+			},
+		},
+	}
+}
+
+// Generates a pointer to an Kubernetes object that uses a mock HTTP client.
+// Parameters:
+//     response  : Body of the response that the mock HTTP client should return
+//     statusCode: HTTP status code the mock HTTP client should return
+//
+// Returns:
+//     *Kubernetes: Pointer to an Kubernetes object that uses the generated mock HTTP client
+func genMockKubernetes2(response string, statusCode int) Kubernetes {
+	return Kubernetes{
+		client: mockHTTPClient{responseBody: response, statusCode: statusCode},
+		Apiserver: []Apiserver{
+			Apiserver{
+				KubeService{
+					Url:      "http://127.0.0.1:8080",
+					Endpoint: "/metrics",
+					Timeout:  1.0,
+					Includes: []string{"http_request_duration_microseconds"},
+				},
+			},
+		},
+	}
+}
+
+// Test that the proper values are ignored or collected
+func TestOK(t *testing.T) {
+	kubernetes := genMockKubernetes(validData, 200)
+
+	var acc testutil.Accumulator
+	err := kubernetes.Gather(&acc)
+	require.NoError(t, err)
+	assert.Equal(t, 33, acc.NFields())
+
+}
+
+// Test that the proper values are ignored or collected
+func TestOK2(t *testing.T) {
+	kubernetes := genMockKubernetes2(validData, 200)
+
+	var acc testutil.Accumulator
+	err := kubernetes.Gather(&acc)
+	require.NoError(t, err)
+	assert.Equal(t, 5, acc.NFields())
+
+	tags := map[string]string{
+		"kubeservice": "apiserver",
+		"serverURL":   "http://127.0.0.1:8080/metrics",
+		"handler":     "prometheus",
+	}
+	mname := "http_request_duration_microseconds"
+	// "count" is the sample count of the summary (9 in validData)
+	expectedFields := map[string]interface{}{
+		"0.5":   552048.506,
+		"0.9":   5.876804288e+06,
+		"0.99":  5.876804288e+06,
+		"count": 9.0,
+		"sum":   1.8909097205e+07}
+	acc.AssertContainsTaggedFields(t, mname, expectedFields, tags)
+
+}
+
+// Test response to HTTP 500
+func TestKubernetes500(t *testing.T) {
+	kubernetes := genMockKubernetes(validData, 500)
+
+	var acc testutil.Accumulator
+	err := kubernetes.Gather(&acc)
+
+	assert.NotNil(t, err)
+	assert.Equal(t, 0, acc.NFields())
+}
+
+// Test response to malformed Data
+func TestKubernetesBadData(t *testing.T) {
+	kubernetes := genMockKubernetes(invalidData, 200)
+
+	var acc testutil.Accumulator
+	err := kubernetes.Gather(&acc)
+
+	assert.NotNil(t, err)
+	assert.Equal(t, 0, acc.NFields())
+}
+
+// Test behaviour when the request itself fails: the real HTTP client is
+// pointed at 127.0.0.1:59999, where nothing is expected to listen.
+func TestKubernetesEmptyResponse(t *testing.T) {
+	kubernetes := Kubernetes{client: RealHTTPClient{client: &http.Client{}}}
+	kubernetes.Apiserver = []Apiserver{
+		Apiserver{
+			KubeService{
+				Url:      "http://127.0.0.1:59999",
+				Endpoint: "/metrics",
+				Timeout:  1.0,
+			},
+		},
+	}
+
+	var acc testutil.Accumulator
+	err := kubernetes.Gather(&acc)
+
+	require.NotNil(t, err)
+	assert.Equal(t, 0, acc.NFields())
+}
diff --git a/plugins/parsers/prometheus/parser.go b/plugins/parsers/prometheus/parser.go
new file mode 100644
index 000000000..841fd8178
--- /dev/null
+++ b/plugins/parsers/prometheus/parser.go
@@ -0,0 +1,140 @@
+package prometheus
+
+// Parser inspired from
+// https://github.com/prometheus/prom2json/blob/master/main.go
+
+import (
+	"bufio"
+	"bytes"
+	"fmt"
+	"math"
+
+	"github.com/influxdata/telegraf"
+
+	dto "github.com/prometheus/client_model/go"
+	"github.com/prometheus/common/expfmt"
+)
+
+// PrometheusParser is an object for Parsing incoming metrics.
+type PrometheusParser struct {
+	// DefaultTags will be added to every parsed metric
+	DefaultTags map[string]string
+}
+
+// Parse converts a payload in the Prometheus text exposition format into
+// telegraf metrics, one per Prometheus metric (member of a metric family).
+//
+// Summaries and histograms are flattened into a single metric whose fields
+// are the quantiles/buckets plus "count" and "sum". When the payload cannot
+// be parsed, the error is returned and no metric is.
+func (p *PrometheusParser) Parse(buf []byte) ([]telegraf.Metric, error) {
+	var parser expfmt.TextParser
+	// parse even if the buffer begins with a newline
+	buf = bytes.TrimPrefix(buf, []byte("\n"))
+	// Read raw data
+	reader := bufio.NewReader(bytes.NewBuffer(buf))
+	metricFamilies, err := parser.TextToMetricFamilies(reader)
+	if err != nil {
+		return nil, err
+	}
+
+	var metrics []telegraf.Metric
+	for metricName, mf := range metricFamilies {
+		for _, m := range mf.Metric {
+			// DefaultTags are applied last, so they win over a label of
+			// the same name.
+			tags := makeLabels(m)
+			for key, value := range p.DefaultTags {
+				tags[key] = value
+			}
+
+			// count and sum must come from the message that matches the
+			// family type: the getters of the other one return 0.
+			var fields map[string]interface{}
+			switch mf.GetType() {
+			case dto.MetricType_SUMMARY:
+				fields = makeQuantiles(m)
+				fields["count"] = float64(m.GetSummary().GetSampleCount())
+				fields["sum"] = float64(m.GetSummary().GetSampleSum())
+			case dto.MetricType_HISTOGRAM:
+				fields = makeBuckets(m)
+				fields["count"] = float64(m.GetHistogram().GetSampleCount())
+				fields["sum"] = float64(m.GetHistogram().GetSampleSum())
+			default:
+				// gauge, counter or untyped
+				fields = getNameAndValue(m)
+			}
+			if len(fields) == 0 {
+				continue
+			}
+
+			// Best effort: a metric that telegraf cannot represent is
+			// skipped instead of failing the whole payload.
+			metric, err := telegraf.NewMetric(metricName, tags, fields)
+			if err != nil {
+				continue
+			}
+			metrics = append(metrics, metric)
+		}
+	}
+
+	return metrics, nil
+}
+
+// ParseLine parses a single line and returns the first metric it yields. It
+// fails when the line yields no metric at all.
+func (p *PrometheusParser) ParseLine(line string) (telegraf.Metric, error) {
+	metrics, err := p.Parse([]byte(line + "\n"))
+	if err != nil {
+		return nil, err
+	}
+
+	if len(metrics) < 1 {
+		return nil, fmt.Errorf(
+			"Can not parse the line: %s, for data format: prometheus", line)
+	}
+
+	return metrics[0], nil
+}
+
+// SetDefaultTags sets the tags added to every metric returned by Parse.
+func (p *PrometheusParser) SetDefaultTags(tags map[string]string) {
+	p.DefaultTags = tags
+}
+
+// Get Quantiles from summary metric; NaN quantiles are left out
+func makeQuantiles(m *dto.Metric) map[string]interface{} {
+	fields := make(map[string]interface{})
+	for _, q := range m.GetSummary().Quantile {
+		if !math.IsNaN(q.GetValue()) {
+			fields[fmt.Sprint(q.GetQuantile())] = float64(q.GetValue())
+		}
+	}
+	return fields
+}
+
+// Get Buckets from histogram metric
+func makeBuckets(m *dto.Metric) map[string]interface{} {
+	fields := make(map[string]interface{})
+	for _, b := range m.GetHistogram().Bucket {
+		fields[fmt.Sprint(b.GetUpperBound())] = float64(b.GetCumulativeCount())
+	}
+	return fields
+}
+
+// Get labels from metric
+func makeLabels(m *dto.Metric) map[string]string {
+	result := map[string]string{}
+	for _, lp := range m.Label {
+		result[lp.GetName()] = lp.GetValue()
+	}
+	return result
+}
+
+// Get name and value from a gauge, counter or untyped metric. The result is
+// empty when the metric is none of those or when its value is NaN.
+func getNameAndValue(m *dto.Metric) map[string]interface{} {
+	fields := make(map[string]interface{})
+
+	var key string
+	var value float64
+	switch {
+	case m.Gauge != nil:
+		key, value = "gauge", m.GetGauge().GetValue()
+	case m.Counter != nil:
+		key, value = "counter", m.GetCounter().GetValue()
+	case m.Untyped != nil:
+		key, value = "value", m.GetUntyped().GetValue()
+	default:
+		return fields
+	}
+
+	// The NaN check applies to the value that is actually stored.
+	if !math.IsNaN(value) {
+		fields[key] = value
+	}
+	return fields
+}
diff --git a/plugins/parsers/prometheus/parser_test.go b/plugins/parsers/prometheus/parser_test.go
new file mode 100644
index 000000000..53d727c89
--- /dev/null
+++ b/plugins/parsers/prometheus/parser_test.go
@@ -0,0 +1,315 @@
+package prometheus
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+)
+
+var exptime = time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
+
+const (
+	validPrometheus        = "cpu_load_short,cpu=cpu0 value=10 1257894000000000000"
+	validPrometheusNewline = "\ncpu_load_short,cpu=cpu0 value=10 1257894000000000000\n"
+	invalidPrometheus      = "I don't think this is line protocol"
+	invalidPrometheus2     = "{\"a\": 5, \"b\": {\"c\": 6}}"
+)
+
+const validUniqueGauge = `# HELP cadvisor_version_info A metric with a constant '1' value labeled by kernel version, OS version, docker version, cadvisor version & cadvisor revision.
+# TYPE cadvisor_version_info gauge
+cadvisor_version_info{cadvisorRevision="",cadvisorVersion="",dockerVersion="1.8.2",kernelVersion="3.10.0-229.20.1.el7.x86_64",osVersion="CentOS Linux 7 (Core)"} 1
+`
+
+const validUniqueCounter = `# HELP get_token_fail_count Counter of failed Token() requests to the alternate token source
+# TYPE get_token_fail_count counter
+get_token_fail_count 0
+`
+
+const validUniqueLine = `# HELP get_token_fail_count Counter of failed Token() requests to the alternate token source
+`
+
+const validUniqueSummary = `# HELP http_request_duration_microseconds The HTTP request latencies in microseconds.
+# TYPE http_request_duration_microseconds summary
+http_request_duration_microseconds{handler="prometheus",quantile="0.5"} 552048.506
+http_request_duration_microseconds{handler="prometheus",quantile="0.9"} 5.876804288e+06
+http_request_duration_microseconds{handler="prometheus",quantile="0.99"} 5.876804288e+06
+http_request_duration_microseconds_sum{handler="prometheus"} 1.8909097205e+07
+http_request_duration_microseconds_count{handler="prometheus"} 9
+`
+
+const validUniqueHistogram = `# HELP apiserver_request_latencies Response latency distribution in microseconds for each verb, resource and client.
+# TYPE apiserver_request_latencies histogram
+apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="125000"} 1994
+apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="250000"} 1997
+apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="500000"} 2000
+apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="1e+06"} 2005
+apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="2e+06"} 2012
+apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="4e+06"} 2017
+apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="8e+06"} 2024
+apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="+Inf"} 2025
+apiserver_request_latencies_sum{resource="bindings",verb="POST"} 1.02726334e+08
+apiserver_request_latencies_count{resource="bindings",verb="POST"} 2025
+`
+
+const validData = `# HELP cadvisor_version_info A metric with a constant '1' value labeled by kernel version, OS version, docker version, cadvisor version & cadvisor revision.
+# TYPE cadvisor_version_info gauge
+cadvisor_version_info{cadvisorRevision="",cadvisorVersion="",dockerVersion="1.8.2",kernelVersion="3.10.0-229.20.1.el7.x86_64",osVersion="CentOS Linux 7 (Core)"} 1
+# HELP go_gc_duration_seconds A summary of the GC invocation durations.
+# TYPE go_gc_duration_seconds summary
+go_gc_duration_seconds{quantile="0"} 0.013534896000000001
+go_gc_duration_seconds{quantile="0.25"} 0.02469263
+go_gc_duration_seconds{quantile="0.5"} 0.033727822000000005
+go_gc_duration_seconds{quantile="0.75"} 0.03840335
+go_gc_duration_seconds{quantile="1"} 0.049956604
+go_gc_duration_seconds_sum 1970.341293002
+go_gc_duration_seconds_count 65952
+# HELP http_request_duration_microseconds The HTTP request latencies in microseconds.
+# TYPE http_request_duration_microseconds summary
+http_request_duration_microseconds{handler="prometheus",quantile="0.5"} 552048.506
+http_request_duration_microseconds{handler="prometheus",quantile="0.9"} 5.876804288e+06
+http_request_duration_microseconds{handler="prometheus",quantile="0.99"} 5.876804288e+06
+http_request_duration_microseconds_sum{handler="prometheus"} 1.8909097205e+07
+http_request_duration_microseconds_count{handler="prometheus"} 9
+# HELP get_token_fail_count Counter of failed Token() requests to the alternate token source
+# TYPE get_token_fail_count counter
+get_token_fail_count 0
+# HELP apiserver_request_latencies Response latency distribution in microseconds for each verb, resource and client.
+# TYPE apiserver_request_latencies histogram
+apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="125000"} 1994
+apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="250000"} 1997
+apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="500000"} 2000
+apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="1e+06"} 2005
+apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="2e+06"} 2012
+apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="4e+06"} 2017
+apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="8e+06"} 2024
+apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="+Inf"} 2025
+apiserver_request_latencies_sum{resource="bindings",verb="POST"} 1.02726334e+08
+apiserver_request_latencies_count{resource="bindings",verb="POST"} 2025
+`
+
+const prometheusMulti = `
+cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
+cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
+cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
+cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
+cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
+cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
+cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
+`
+
+const prometheusMultiSomeInvalid = `
+cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
+cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
+cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
+cpu,cpu=cpu3, host=foo,datacenter=us-east usage_idle=99,usage_busy=1
+cpu,cpu=cpu4 , usage_idle=99,usage_busy=1
+cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
+`
+
+func TestParseValidPrometheus(t *testing.T) {
+	parser := PrometheusParser{}
+
+	// Gauge value
+	metrics, err := parser.Parse([]byte(validUniqueGauge))
+	assert.NoError(t, err)
+	assert.Len(t, metrics, 1)
+	assert.Equal(t, "cadvisor_version_info", metrics[0].Name())
+	assert.Equal(t, map[string]interface{}{
+		"gauge": float64(1),
+	}, metrics[0].Fields())
+	assert.Equal(t, map[string]string{
+		"osVersion":     "CentOS Linux 7 (Core)",
+		"dockerVersion": "1.8.2",
+		"kernelVersion": "3.10.0-229.20.1.el7.x86_64",
+	}, metrics[0].Tags())
+
+	// Counter value
+	parser.SetDefaultTags(map[string]string{"mytag": "mytagvalue"})
+	metrics, err = parser.Parse([]byte(validUniqueCounter))
+	assert.NoError(t, err)
+	assert.Len(t, metrics, 1)
+	assert.Equal(t, "get_token_fail_count", metrics[0].Name())
+	assert.Equal(t, map[string]interface{}{
+		"counter": float64(0),
+	}, metrics[0].Fields())
+	assert.Equal(t, map[string]string{"mytag": "mytagvalue"}, metrics[0].Tags())
+
+	// Summary data: count and sum come from the _count and _sum samples
+	parser.SetDefaultTags(map[string]string{})
+	metrics, err = parser.Parse([]byte(validUniqueSummary))
+	assert.NoError(t, err)
+	assert.Len(t, metrics, 1)
+	assert.Equal(t, "http_request_duration_microseconds", metrics[0].Name())
+	assert.Equal(t, map[string]interface{}{
+		"0.5":   552048.506,
+		"0.9":   5.876804288e+06,
+		"0.99":  5.876804288e+06,
+		"count": 9.0,
+		"sum":   1.8909097205e+07,
+	}, metrics[0].Fields())
+	assert.Equal(t, map[string]string{"handler": "prometheus"}, metrics[0].Tags())
+
+	// histogram data: count and sum come from the _count and _sum samples
+	metrics, err = parser.Parse([]byte(validUniqueHistogram))
+	assert.NoError(t, err)
+	assert.Len(t, metrics, 1)
+	assert.Equal(t, "apiserver_request_latencies", metrics[0].Name())
+	assert.Equal(t, map[string]interface{}{
+		"500000": 2000.0,
+		"count":  2025.0,
+		"sum":    1.02726334e+08,
+		"250000": 1997.0,
+		"2e+06":  2012.0,
+		"4e+06":  2017.0,
+		"8e+06":  2024.0,
+		"+Inf":   2025.0,
+		"125000": 1994.0,
+		"1e+06":  2005.0,
+	}, metrics[0].Fields())
+	assert.Equal(t,
+		map[string]string{"verb": "POST", "resource": "bindings"},
+		metrics[0].Tags())
+
+}
+
+func TestParseLineInvalidPrometheus(t *testing.T) {
+	parser := PrometheusParser{}
+	metric, err := parser.ParseLine(validUniqueLine)
+	assert.NotNil(t, err)
+	assert.Nil(t, metric)
+
+}
+
+/*
+
+func TestParseLineValidPrometheus(t *testing.T) {
+	parser := PrometheusParser{}
+
+	metric, err := parser.ParseLine(validPrometheus)
+	assert.NoError(t, err)
+	assert.Equal(t, "cpu_load_short", metric.Name())
+	assert.Equal(t, map[string]interface{}{
+		"value": float64(10),
+	}, metric.Fields())
+	assert.Equal(t, map[string]string{
+		"cpu": "cpu0",
+	}, metric.Tags())
+	assert.Equal(t, exptime, metric.Time())
+
+	metric, err = parser.ParseLine(validPrometheusNewline)
+	assert.NoError(t, err)
+	assert.Equal(t, "cpu_load_short", metric.Name())
+	assert.Equal(t, map[string]interface{}{
+		"value": float64(10),
+	}, metric.Fields())
+	assert.Equal(t, map[string]string{
+		"cpu": "cpu0",
+	}, metric.Tags())
+	assert.Equal(t, exptime, metric.Time())
+}
+
+func TestParseMultipleValid(t *testing.T) {
+	parser := PrometheusParser{}
+
+	metrics, err := parser.Parse([]byte(prometheusMulti))
+	assert.NoError(t, err)
+	assert.Len(t, metrics, 7)
+
+	for _, metric := range metrics {
+		assert.Equal(t, "cpu", metric.Name())
+		assert.Equal(t, map[string]string{
+			"datacenter": "us-east",
+			"host": "foo",
+		}, metrics[0].Tags())
+		assert.Equal(t, map[string]interface{}{
+			"usage_idle": float64(99),
+			"usage_busy": float64(1),
+		}, metrics[0].Fields())
+	}
+}
+
+// TestParseSomeValid expects Parse to return both an error and 4 metrics for prometheusMultiSomeInvalid. +// NOTE(review): the range loops in the disabled tests below assert on metrics[0].Tags() and metrics[0].Fields(); the loop variable is only used for Name(). Confirm intent before re-enabling. +func TestParseSomeValid(t *testing.T) { + parser := PrometheusParser{} + + metrics, err := parser.Parse([]byte(prometheusMultiSomeInvalid)) + assert.Error(t, err) + assert.Len(t, metrics, 4) + + for _, metric := range metrics { + assert.Equal(t, "cpu", metric.Name()) + assert.Equal(t, map[string]string{ + "datacenter": "us-east", + "host": "foo", + }, metrics[0].Tags()) + assert.Equal(t, map[string]interface{}{ + "usage_idle": float64(99), + "usage_busy": float64(1), + }, metrics[0].Fields()) + } +} + +// TestParseDefaultTags verifies that the parser's DefaultTags are merged into the tags of parsed metrics. +func TestParseDefaultTags(t *testing.T) { + parser := PrometheusParser{ + DefaultTags: map[string]string{ + "tag": "default", + }, + } + + metrics, err := parser.Parse([]byte(prometheusMultiSomeInvalid)) + assert.Error(t, err) + assert.Len(t, metrics, 4) + + for _, metric := range metrics { + assert.Equal(t, "cpu", metric.Name()) + assert.Equal(t, map[string]string{ + "datacenter": "us-east", + "host": "foo", + "tag": "default", + }, metrics[0].Tags()) + assert.Equal(t, map[string]interface{}{ + "usage_idle": float64(99), + "usage_busy": float64(1), + }, metrics[0].Fields()) + } +} + +// TestParseDefaultTagsOverride verifies that a tag set on the metric takes precedence over a DefaultTags entry with the same key. +func TestParseDefaultTagsOverride(t *testing.T) { + parser := PrometheusParser{ + DefaultTags: map[string]string{ + "host": "default", + }, + } + + metrics, err := parser.Parse([]byte(prometheusMultiSomeInvalid)) + assert.Error(t, err) + assert.Len(t, metrics, 4) + + for _, metric := range metrics { + assert.Equal(t, "cpu", metric.Name()) + assert.Equal(t, map[string]string{ + "datacenter": "us-east", + "host": "foo", + }, metrics[0].Tags()) + assert.Equal(t, map[string]interface{}{ + "usage_idle": float64(99), + "usage_busy": float64(1), + }, metrics[0].Fields()) + } +} + +func TestParseInvalidPrometheus(t *testing.T) { + parser := PrometheusParser{} + + _, err := parser.Parse([]byte(invalidPrometheus)) + assert.Error(t, err) + _, err = parser.Parse([]byte(invalidPrometheus2)) + 
assert.Error(t, err) + _, err = parser.ParseLine(invalidPrometheus) + assert.Error(t, err) + _, err = parser.ParseLine(invalidPrometheus2) + assert.Error(t, err) +} +*/ diff --git a/plugins/parsers/registry.go b/plugins/parsers/registry.go index 982b6bb80..c1480ab31 100644 --- a/plugins/parsers/registry.go +++ b/plugins/parsers/registry.go @@ -8,6 +8,7 @@ import ( "github.com/influxdata/telegraf/plugins/parsers/graphite" "github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/plugins/parsers/json" + "github.com/influxdata/telegraf/plugins/parsers/prometheus" ) // ParserInput is an interface for input plugins that are able to parse @@ -38,7 +39,7 @@ type Parser interface { // Config is a struct that covers the data types needed for all parser types, // and can be used to instantiate _any_ of the parsers. type Config struct { - // Dataformat can be one of: json, influx, graphite + // DataFormat can be one of: json, influx, graphite, prometheus DataFormat string // Separator only applied to Graphite data. @@ -65,6 +66,8 @@ func NewParser(config *Config) (Parser, error) { config.TagKeys, config.DefaultTags) case "influx": parser, err = NewInfluxParser() + case "prometheus": + parser, err = NewPrometheusParser() case "graphite": parser, err = NewGraphiteParser(config.Separator, config.Templates, config.DefaultTags) @@ -87,6 +90,10 @@ func NewJSONParser( return parser, nil } +// NewPrometheusParser returns a Parser backed by an empty prometheus.PrometheusParser; the returned error is always nil. +func NewPrometheusParser() (Parser, error) { + return &prometheus.PrometheusParser{}, nil +} + func NewInfluxParser() (Parser, error) { return &influx.InfluxParser{}, nil }