Compare commits
8 Commits
kube-state
...
feature/js
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
690c0b6673 | ||
|
|
407675c741 | ||
|
|
b09fcc70c8 | ||
|
|
9836b1eb02 | ||
|
|
a79f1b7e0d | ||
|
|
d4a4ac25bb | ||
|
|
92e156c784 | ||
|
|
342d3d633a |
59
Gopkg.lock
generated
59
Gopkg.lock
generated
@@ -317,19 +317,10 @@
|
|||||||
|
|
||||||
[[projects]]
|
[[projects]]
|
||||||
name = "github.com/gogo/protobuf"
|
name = "github.com/gogo/protobuf"
|
||||||
packages = [
|
packages = ["proto"]
|
||||||
"proto",
|
|
||||||
"sortkeys"
|
|
||||||
]
|
|
||||||
revision = "1adfc126b41513cc696b209667c8656ea7aac67c"
|
revision = "1adfc126b41513cc696b209667c8656ea7aac67c"
|
||||||
version = "v1.0.0"
|
version = "v1.0.0"
|
||||||
|
|
||||||
[[projects]]
|
|
||||||
branch = "master"
|
|
||||||
name = "github.com/golang/glog"
|
|
||||||
packages = ["."]
|
|
||||||
revision = "23def4e6c14b4da8ac2ed8007337bc5eb5007998"
|
|
||||||
|
|
||||||
[[projects]]
|
[[projects]]
|
||||||
name = "github.com/golang/protobuf"
|
name = "github.com/golang/protobuf"
|
||||||
packages = [
|
packages = [
|
||||||
@@ -359,12 +350,6 @@
|
|||||||
revision = "3af367b6b30c263d47e8895973edcca9a49cf029"
|
revision = "3af367b6b30c263d47e8895973edcca9a49cf029"
|
||||||
version = "v0.2.0"
|
version = "v0.2.0"
|
||||||
|
|
||||||
[[projects]]
|
|
||||||
branch = "master"
|
|
||||||
name = "github.com/google/gofuzz"
|
|
||||||
packages = ["."]
|
|
||||||
revision = "24818f796faf91cd76ec7bddd72458fbced7a6c1"
|
|
||||||
|
|
||||||
[[projects]]
|
[[projects]]
|
||||||
name = "github.com/gorilla/context"
|
name = "github.com/gorilla/context"
|
||||||
packages = ["."]
|
packages = ["."]
|
||||||
@@ -940,12 +925,6 @@
|
|||||||
revision = "7f5bdfd858bb064d80559b2a32b86669c5de5d3b"
|
revision = "7f5bdfd858bb064d80559b2a32b86669c5de5d3b"
|
||||||
version = "v3.0.5"
|
version = "v3.0.5"
|
||||||
|
|
||||||
[[projects]]
|
|
||||||
name = "gopkg.in/inf.v0"
|
|
||||||
packages = ["."]
|
|
||||||
revision = "d2d2541c53f18d2a059457998ce2876cc8e67cbf"
|
|
||||||
version = "v0.9.1"
|
|
||||||
|
|
||||||
[[projects]]
|
[[projects]]
|
||||||
name = "gopkg.in/ldap.v2"
|
name = "gopkg.in/ldap.v2"
|
||||||
packages = ["."]
|
packages = ["."]
|
||||||
@@ -986,43 +965,9 @@
|
|||||||
revision = "5420a8b6744d3b0345ab293f6fcba19c978f1183"
|
revision = "5420a8b6744d3b0345ab293f6fcba19c978f1183"
|
||||||
version = "v2.2.1"
|
version = "v2.2.1"
|
||||||
|
|
||||||
[[projects]]
|
|
||||||
name = "k8s.io/api"
|
|
||||||
packages = ["core/v1"]
|
|
||||||
revision = "af4bc157c3a209798fc897f6d4aaaaeb6c2e0d6a"
|
|
||||||
version = "kubernetes-1.9.0"
|
|
||||||
|
|
||||||
[[projects]]
|
|
||||||
branch = "release-1.11"
|
|
||||||
name = "k8s.io/apimachinery"
|
|
||||||
packages = [
|
|
||||||
"pkg/api/resource",
|
|
||||||
"pkg/apis/meta/v1",
|
|
||||||
"pkg/conversion",
|
|
||||||
"pkg/conversion/queryparams",
|
|
||||||
"pkg/fields",
|
|
||||||
"pkg/labels",
|
|
||||||
"pkg/runtime",
|
|
||||||
"pkg/runtime/schema",
|
|
||||||
"pkg/selection",
|
|
||||||
"pkg/types",
|
|
||||||
"pkg/util/errors",
|
|
||||||
"pkg/util/intstr",
|
|
||||||
"pkg/util/json",
|
|
||||||
"pkg/util/net",
|
|
||||||
"pkg/util/runtime",
|
|
||||||
"pkg/util/sets",
|
|
||||||
"pkg/util/validation",
|
|
||||||
"pkg/util/validation/field",
|
|
||||||
"pkg/util/wait",
|
|
||||||
"pkg/watch",
|
|
||||||
"third_party/forked/golang/reflect"
|
|
||||||
]
|
|
||||||
revision = "103fd098999dc9c0c88536f5c9ad2e5da39373ae"
|
|
||||||
|
|
||||||
[solve-meta]
|
[solve-meta]
|
||||||
analyzer-name = "dep"
|
analyzer-name = "dep"
|
||||||
analyzer-version = 1
|
analyzer-version = 1
|
||||||
inputs-digest = "e475e221e1a1bbcd2eced72dfe4c152382581c7588f087d3f36941df8984c8f6"
|
inputs-digest = "024194b983d91b9500fe97e0aa0ddb5fe725030cb51ddfb034e386cae1098370"
|
||||||
solver-name = "gps-cdcl"
|
solver-name = "gps-cdcl"
|
||||||
solver-version = 1
|
solver-version = 1
|
||||||
|
|||||||
12
Gopkg.toml
12
Gopkg.toml
@@ -241,15 +241,3 @@
|
|||||||
[[override]]
|
[[override]]
|
||||||
source = "https://github.com/fsnotify/fsnotify/archive/v1.4.7.tar.gz"
|
source = "https://github.com/fsnotify/fsnotify/archive/v1.4.7.tar.gz"
|
||||||
name = "gopkg.in/fsnotify.v1"
|
name = "gopkg.in/fsnotify.v1"
|
||||||
|
|
||||||
[[constraint]]
|
|
||||||
name = "k8s.io/api"
|
|
||||||
version = "kubernetes-1.11.0"
|
|
||||||
|
|
||||||
[[constraint]]
|
|
||||||
name = "k8s.io/apimachinery"
|
|
||||||
version = "kubernetes-1.11.0"
|
|
||||||
|
|
||||||
[[constraint]]
|
|
||||||
name = "k8s.io/kubernetes"
|
|
||||||
version = "v1.11.0"
|
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ Telegraf is able to parse the following input data formats into metrics:
|
|||||||
|
|
||||||
1. [InfluxDB Line Protocol](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#influx)
|
1. [InfluxDB Line Protocol](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#influx)
|
||||||
1. [JSON](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#json)
|
1. [JSON](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#json)
|
||||||
|
1. [GJSON](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#gjson)
|
||||||
1. [Graphite](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#graphite)
|
1. [Graphite](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#graphite)
|
||||||
1. [Value](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#value), ie: 45 or "booyah"
|
1. [Value](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#value), ie: 45 or "booyah"
|
||||||
1. [Nagios](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#nagios) (exec input only)
|
1. [Nagios](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#nagios) (exec input only)
|
||||||
@@ -205,6 +206,69 @@ exec_mycollector,my_tag_1=foo,my_tag_2=baz a=5,b_c=6
|
|||||||
exec_mycollector,my_tag_1=bar,my_tag_2=baz a=7,b_c=8
|
exec_mycollector,my_tag_1=bar,my_tag_2=baz a=7,b_c=8
|
||||||
```
|
```
|
||||||
|
|
||||||
|
# GJSON:
|
||||||
|
GJSON also parses JSON data, but uses paths to name and identify fields of your choosing.
|
||||||
|
|
||||||
|
The GJSON parser supports 5 different configuration fields for json objects:
|
||||||
|
|
||||||
|
1.'gjson_tag_paths'
|
||||||
|
2.'gjson_string_paths'
|
||||||
|
3.'gjson_int_paths'
|
||||||
|
4.'gjson_float_paths'
|
||||||
|
5.'gjson_bool_paths'
|
||||||
|
|
||||||
|
Each field is a map type that will map a field_name to a field_path. Path syntax is described below.
|
||||||
|
Path maps should be configured as:
|
||||||
|
`toml gjson_tag_paths = {"field_name" = "field.path", "field_name2" = "field.path2"}`
|
||||||
|
|
||||||
|
Any paths specified in gjson_tag_paths will be converted to strings and stored as tags.
|
||||||
|
Any paths otherwise specified will be their marked type and stored as fields.
|
||||||
|
|
||||||
|
#### GJSON Configuration:
|
||||||
|
Paths are a series of keys seperated by a dot, ie "obj.sub_obj".
|
||||||
|
Paths should not lead to an JSON array, but a single object.
|
||||||
|
An error message will be thrown if a path describes an array.
|
||||||
|
Further reading for path syntax can be found here: https://github.com/tidwall/gjson
|
||||||
|
|
||||||
|
As an example, if you had the json:
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"name": {"first": "Tom", "last": "Anderson"},
|
||||||
|
"age":37,
|
||||||
|
"children": ["Sara","Alex","Jack"],
|
||||||
|
"fav.movie": "Deer Hunter",
|
||||||
|
"friends": [
|
||||||
|
{"first": "Dale", "last": "Murphy", "age": 44},
|
||||||
|
{"first": "Roger", "last": "Craig", "age": 68},
|
||||||
|
{"first": "Jane", "last": "Murphy", "age": 47}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
```
|
||||||
|
with the config:
|
||||||
|
|
||||||
|
```toml
|
||||||
|
[[inputs.exec]]
|
||||||
|
## Commands array
|
||||||
|
commands = ["/usr/bin/mycollector --foo=bar"]
|
||||||
|
|
||||||
|
## Data format to consume.
|
||||||
|
## Each data format has its own unique set of configuration options, read
|
||||||
|
## more about them here:
|
||||||
|
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
|
||||||
|
data_format = "gjson"
|
||||||
|
|
||||||
|
name_override = "gjson_sample"
|
||||||
|
|
||||||
|
gjson_tag_paths = {"first_name_tag" = "name.first"}
|
||||||
|
gjson_string_paths = {"last_name" = "name.last"}
|
||||||
|
gjson_int_paths = {"age" = "age", "Janes_age" = "friends.2.age"}
|
||||||
|
```
|
||||||
|
|
||||||
|
would output the metric:
|
||||||
|
`gjson_sample, first_name_tag=Tom last_name=Anderson,age=37,Janes_age=47`
|
||||||
|
|
||||||
|
|
||||||
# Value:
|
# Value:
|
||||||
|
|
||||||
The "value" data format translates single values into Telegraf metrics. This
|
The "value" data format translates single values into Telegraf metrics. This
|
||||||
|
|||||||
@@ -1338,6 +1338,71 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
c.GJSONTagPaths = make(map[string]string)
|
||||||
|
if node, ok := tbl.Fields["gjson_tag_paths"]; ok {
|
||||||
|
if subtbl, ok := node.(*ast.Table); ok {
|
||||||
|
for name, val := range subtbl.Fields {
|
||||||
|
if kv, ok := val.(*ast.KeyValue); ok {
|
||||||
|
if str, ok := kv.Value.(*ast.String); ok {
|
||||||
|
c.GJSONTagPaths[name] = str.Value
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
c.GJSONBoolPaths = make(map[string]string)
|
||||||
|
if node, ok := tbl.Fields["gjson_bool_paths"]; ok {
|
||||||
|
if subtbl, ok := node.(*ast.Table); ok {
|
||||||
|
for name, val := range subtbl.Fields {
|
||||||
|
if kv, ok := val.(*ast.KeyValue); ok {
|
||||||
|
if str, ok := kv.Value.(*ast.String); ok {
|
||||||
|
c.GJSONBoolPaths[name] = str.Value
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
c.GJSONFloatPaths = make(map[string]string)
|
||||||
|
if node, ok := tbl.Fields["gjson_float_paths"]; ok {
|
||||||
|
if subtbl, ok := node.(*ast.Table); ok {
|
||||||
|
for name, val := range subtbl.Fields {
|
||||||
|
if kv, ok := val.(*ast.KeyValue); ok {
|
||||||
|
if str, ok := kv.Value.(*ast.String); ok {
|
||||||
|
c.GJSONFloatPaths[name] = str.Value
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
c.GJSONStringPaths = make(map[string]string)
|
||||||
|
if node, ok := tbl.Fields["gjson_string_paths"]; ok {
|
||||||
|
if subtbl, ok := node.(*ast.Table); ok {
|
||||||
|
for name, val := range subtbl.Fields {
|
||||||
|
if kv, ok := val.(*ast.KeyValue); ok {
|
||||||
|
if str, ok := kv.Value.(*ast.String); ok {
|
||||||
|
c.GJSONStringPaths[name] = str.Value
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
c.GJSONIntPaths = make(map[string]string)
|
||||||
|
if node, ok := tbl.Fields["gjson_int_paths"]; ok {
|
||||||
|
if subtbl, ok := node.(*ast.Table); ok {
|
||||||
|
for name, val := range subtbl.Fields {
|
||||||
|
if kv, ok := val.(*ast.KeyValue); ok {
|
||||||
|
if str, ok := kv.Value.(*ast.String); ok {
|
||||||
|
c.GJSONIntPaths[name] = str.Value
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
c.MetricName = name
|
c.MetricName = name
|
||||||
|
|
||||||
delete(tbl.Fields, "data_format")
|
delete(tbl.Fields, "data_format")
|
||||||
@@ -1353,6 +1418,11 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
|
|||||||
delete(tbl.Fields, "dropwizard_time_format")
|
delete(tbl.Fields, "dropwizard_time_format")
|
||||||
delete(tbl.Fields, "dropwizard_tags_path")
|
delete(tbl.Fields, "dropwizard_tags_path")
|
||||||
delete(tbl.Fields, "dropwizard_tag_paths")
|
delete(tbl.Fields, "dropwizard_tag_paths")
|
||||||
|
delete(tbl.Fields, "gjson_tag_paths")
|
||||||
|
delete(tbl.Fields, "gjson_bool_paths")
|
||||||
|
delete(tbl.Fields, "gjson_float_paths")
|
||||||
|
delete(tbl.Fields, "gjson_string_paths")
|
||||||
|
delete(tbl.Fields, "gjson_int_paths")
|
||||||
|
|
||||||
return parsers.NewParser(c)
|
return parsers.NewParser(c)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -48,7 +48,6 @@ import (
|
|||||||
_ "github.com/influxdata/telegraf/plugins/inputs/kafka_consumer"
|
_ "github.com/influxdata/telegraf/plugins/inputs/kafka_consumer"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/kafka_consumer_legacy"
|
_ "github.com/influxdata/telegraf/plugins/inputs/kafka_consumer_legacy"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/kapacitor"
|
_ "github.com/influxdata/telegraf/plugins/inputs/kapacitor"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/kube_state"
|
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/kubernetes"
|
_ "github.com/influxdata/telegraf/plugins/inputs/kubernetes"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/leofs"
|
_ "github.com/influxdata/telegraf/plugins/inputs/leofs"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/logparser"
|
_ "github.com/influxdata/telegraf/plugins/inputs/logparser"
|
||||||
|
|||||||
@@ -1,77 +0,0 @@
|
|||||||
### Line Protocol
|
|
||||||
|
|
||||||
### PODs
|
|
||||||
|
|
||||||
#### kube_pod
|
|
||||||
namespace =
|
|
||||||
name =
|
|
||||||
host_ip =
|
|
||||||
pod_ip =
|
|
||||||
node =
|
|
||||||
created_by_kind =
|
|
||||||
created_by_name =
|
|
||||||
owner_kind =
|
|
||||||
owner_name =
|
|
||||||
owner_is_controller = "true"
|
|
||||||
label_1 = ""
|
|
||||||
label_2 = ""
|
|
||||||
created = ""
|
|
||||||
|
|
||||||
|
|
||||||
start_time =
|
|
||||||
completion_time =
|
|
||||||
owner =
|
|
||||||
label_* =
|
|
||||||
created =
|
|
||||||
|
|
||||||
status_scheduled_time
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
#### kube_pod_status_scheduled_time
|
|
||||||
|
|
||||||
#### kube_pod_status_phase
|
|
||||||
|
|
||||||
#### kube_pod_status_ready
|
|
||||||
|
|
||||||
#### kube_pod_status_scheduled
|
|
||||||
|
|
||||||
#### kube_pod_container_info
|
|
||||||
namespace=
|
|
||||||
pod_name=
|
|
||||||
container_name=
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
#### kube_pod_container_status_waiting
|
|
||||||
|
|
||||||
#### kube_pod_container_status_waiting_reason
|
|
||||||
|
|
||||||
#### kube_pod_container_status_running
|
|
||||||
|
|
||||||
#### kube_pod_container_status_terminated
|
|
||||||
|
|
||||||
#### kube_pod_container_status_terminated_reason
|
|
||||||
|
|
||||||
#### kube_pod_container_status_ready
|
|
||||||
|
|
||||||
#### kube_pod_container_status_restarts_total
|
|
||||||
|
|
||||||
#### kube_pod_container_resource_requests
|
|
||||||
|
|
||||||
#### kube_pod_container_resource_limits
|
|
||||||
|
|
||||||
#### kube_pod_container_resource_requests_cpu_cores
|
|
||||||
|
|
||||||
#### kube_pod_container_resource_requests_memory_bytes
|
|
||||||
|
|
||||||
#### kube_pod_container_resource_limits_cpu_cores
|
|
||||||
|
|
||||||
#### kube_pod_container_resource_limits_memory_bytes
|
|
||||||
|
|
||||||
|
|
||||||
#### kube_pod_spec_volumes_persistentvolumeclaims_info
|
|
||||||
|
|
||||||
#### kube_pod_spec_volumes_persistentvolumeclaims_readonly
|
|
||||||
@@ -1,144 +0,0 @@
|
|||||||
package kube_state
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"crypto/tls"
|
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
|
||||||
"net/http"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"k8s.io/api/core/v1"
|
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
||||||
)
|
|
||||||
|
|
||||||
type client struct {
|
|
||||||
baseURL string
|
|
||||||
httpClient *http.Client
|
|
||||||
bearerToken string
|
|
||||||
semaphore chan struct{}
|
|
||||||
}
|
|
||||||
|
|
||||||
func newClient(baseURL string, timeout time.Duration, maxConns int, bearerToken string, tlsConfig *tls.Config) *client {
|
|
||||||
return &client{
|
|
||||||
baseURL: baseURL,
|
|
||||||
httpClient: &http.Client{
|
|
||||||
Transport: &http.Transport{
|
|
||||||
MaxIdleConns: maxConns,
|
|
||||||
TLSClientConfig: tlsConfig,
|
|
||||||
},
|
|
||||||
Timeout: timeout,
|
|
||||||
},
|
|
||||||
bearerToken: bearerToken,
|
|
||||||
semaphore: make(chan struct{}, maxConns),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *client) getAPIResourceList(ctx context.Context) (rList *metav1.APIResourceList, err error) {
|
|
||||||
rList = new(metav1.APIResourceList)
|
|
||||||
if err = c.doGet(ctx, "", rList); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if rList.GroupVersion == "" {
|
|
||||||
return nil, &APIError{
|
|
||||||
URL: c.baseURL,
|
|
||||||
StatusCode: http.StatusOK,
|
|
||||||
Title: "empty group version",
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return rList, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *client) getNodes(ctx context.Context) (list *v1.NodeList, err error) {
|
|
||||||
list = new(v1.NodeList)
|
|
||||||
if err = c.doGet(ctx, "/nodes/", list); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return list, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *client) getPods(ctx context.Context) (list *v1.PodList, err error) {
|
|
||||||
list = new(v1.PodList)
|
|
||||||
if err = c.doGet(ctx, "/pods/", list); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return list, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *client) getConfigMaps(ctx context.Context) (list *v1.ConfigMapList, err error) {
|
|
||||||
list = new(v1.ConfigMapList)
|
|
||||||
if err = c.doGet(ctx, "/configmaps/", list); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return list, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *client) doGet(ctx context.Context, url string, v interface{}) error {
|
|
||||||
req, err := createGetRequest(c.baseURL+url, c.bearerToken)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
select {
|
|
||||||
case c.semaphore <- struct{}{}:
|
|
||||||
break
|
|
||||||
case <-ctx.Done():
|
|
||||||
return ctx.Err()
|
|
||||||
}
|
|
||||||
|
|
||||||
resp, err := c.httpClient.Do(req.WithContext(ctx))
|
|
||||||
if err != nil {
|
|
||||||
<-c.semaphore
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer func() {
|
|
||||||
resp.Body.Close()
|
|
||||||
<-c.semaphore
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Clear invalid token if unauthorized
|
|
||||||
if resp.StatusCode == http.StatusUnauthorized {
|
|
||||||
c.bearerToken = ""
|
|
||||||
}
|
|
||||||
|
|
||||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
|
||||||
return &APIError{
|
|
||||||
URL: url,
|
|
||||||
StatusCode: resp.StatusCode,
|
|
||||||
Title: resp.Status,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if resp.StatusCode == http.StatusNoContent {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return json.NewDecoder(resp.Body).Decode(v)
|
|
||||||
}
|
|
||||||
|
|
||||||
func createGetRequest(url string, token string) (*http.Request, error) {
|
|
||||||
req, err := http.NewRequest("GET", url, nil)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if token != "" {
|
|
||||||
req.Header.Set("Authorization", "Bearer "+token)
|
|
||||||
}
|
|
||||||
req.Header.Add("Accept", "application/json")
|
|
||||||
|
|
||||||
return req, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type APIError struct {
|
|
||||||
URL string
|
|
||||||
StatusCode int
|
|
||||||
Title string
|
|
||||||
Description string
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e APIError) Error() string {
|
|
||||||
if e.Description != "" {
|
|
||||||
return fmt.Sprintf("[%s] %s: %s", e.URL, e.Title, e.Description)
|
|
||||||
}
|
|
||||||
return fmt.Sprintf("[%s] %s", e.URL, e.Title)
|
|
||||||
}
|
|
||||||
@@ -1,42 +0,0 @@
|
|||||||
package kube_state
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
|
||||||
"k8s.io/api/core/v1"
|
|
||||||
)
|
|
||||||
|
|
||||||
var configMapMeasurement = "kube_configmap"
|
|
||||||
|
|
||||||
func registerConfigMapCollector(ctx context.Context, acc telegraf.Accumulator, ks *KubenetesState) {
|
|
||||||
list, err := ks.client.getConfigMaps(ctx)
|
|
||||||
if err != nil {
|
|
||||||
acc.AddError(err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
for _, s := range list.Items {
|
|
||||||
if err = ks.gatherConfigMap(s, acc); err != nil {
|
|
||||||
acc.AddError(err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ks *KubenetesState) gatherConfigMap(s v1.ConfigMap, acc telegraf.Accumulator) error {
|
|
||||||
var creationTime time.Time
|
|
||||||
if !s.CreationTimestamp.IsZero() {
|
|
||||||
creationTime = s.CreationTimestamp.Time
|
|
||||||
}
|
|
||||||
fields := map[string]interface{}{
|
|
||||||
"gauge": 1,
|
|
||||||
}
|
|
||||||
tags := map[string]string{
|
|
||||||
"namespace": s.Namespace,
|
|
||||||
"configmap": s.Name,
|
|
||||||
"resource_version": s.ResourceVersion,
|
|
||||||
}
|
|
||||||
acc.AddFields(configMapMeasurement, fields, tags, creationTime)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
@@ -1,194 +0,0 @@
|
|||||||
package kube_state
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
"context"
|
|
||||||
"crypto/md5"
|
|
||||||
"fmt"
|
|
||||||
"log"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
|
||||||
"github.com/influxdata/telegraf/filter"
|
|
||||||
"github.com/influxdata/telegraf/internal"
|
|
||||||
"github.com/influxdata/telegraf/internal/tls"
|
|
||||||
"github.com/influxdata/telegraf/plugins/inputs"
|
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
||||||
)
|
|
||||||
|
|
||||||
// KubenetesState represents the config object for the plugin.
|
|
||||||
type KubenetesState struct {
|
|
||||||
URL string
|
|
||||||
|
|
||||||
// Bearer Token authorization file path
|
|
||||||
BearerToken string `toml:"bearer_token"`
|
|
||||||
|
|
||||||
// MaxConnections for worker pool tcp connections
|
|
||||||
MaxConnections int `toml:"max_connections"`
|
|
||||||
|
|
||||||
// HTTP Timeout specified as a string - 3s, 1m, 1h
|
|
||||||
ResponseTimeout internal.Duration `toml:"response_timeout"`
|
|
||||||
|
|
||||||
tls.ClientConfig
|
|
||||||
|
|
||||||
client *client
|
|
||||||
rListHash string
|
|
||||||
filter filter.Filter
|
|
||||||
lastFilterBuilt int64
|
|
||||||
ResourceListCheckInterval *internal.Duration `toml:"resouce_list_check_interval"`
|
|
||||||
ResourceExclude []string `toml:"resource_exclude"`
|
|
||||||
|
|
||||||
DisablePodNonGenericResourceMetrics bool `json:"disable_pod_non_generic_resource_metrics"`
|
|
||||||
DisableNodeNonGenericResourceMetrics bool `json:"disable_node_non_generic_resource_metrics"`
|
|
||||||
}
|
|
||||||
|
|
||||||
var sampleConfig = `
|
|
||||||
## URL for the kubelet
|
|
||||||
url = "http://1.1.1.1:10255"
|
|
||||||
|
|
||||||
## Use bearer token for authorization
|
|
||||||
# bearer_token = /path/to/bearer/token
|
|
||||||
|
|
||||||
## Set response_timeout (default 5 seconds)
|
|
||||||
# response_timeout = "5s"
|
|
||||||
|
|
||||||
## Optional TLS Config
|
|
||||||
# tls_ca = /path/to/cafile
|
|
||||||
# tls_cert = /path/to/certfile
|
|
||||||
# tls_key = /path/to/keyfile
|
|
||||||
## Use TLS but skip chain & host verification
|
|
||||||
# insecure_skip_verify = false
|
|
||||||
|
|
||||||
## Woker pool for kube_state_metric plugin only
|
|
||||||
# empty this field will use default value 30
|
|
||||||
# max_connections = 30
|
|
||||||
`
|
|
||||||
|
|
||||||
//SampleConfig returns a sample config
|
|
||||||
func (k *KubenetesState) SampleConfig() string {
|
|
||||||
return sampleConfig
|
|
||||||
}
|
|
||||||
|
|
||||||
//Description returns the description of this plugin
|
|
||||||
func (k *KubenetesState) Description() string {
|
|
||||||
return "Read metrics from the kubernetes kubelet api"
|
|
||||||
}
|
|
||||||
|
|
||||||
//Gather collects kubernetes metrics from a given URL
|
|
||||||
func (k *KubenetesState) Gather(acc telegraf.Accumulator) (err error) {
|
|
||||||
var rList *metav1.APIResourceList
|
|
||||||
if k.client == nil {
|
|
||||||
if k.client, rList, err = k.initClient(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
goto buildFilter
|
|
||||||
}
|
|
||||||
|
|
||||||
if k.lastFilterBuilt > 0 && time.Now().Unix()-k.lastFilterBuilt < int64(k.ResourceListCheckInterval.Duration.Seconds()) {
|
|
||||||
println("! skip to gather")
|
|
||||||
goto doGather
|
|
||||||
}
|
|
||||||
|
|
||||||
rList, err = k.client.getAPIResourceList(context.Background())
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
buildFilter:
|
|
||||||
k.lastFilterBuilt = time.Now().Unix()
|
|
||||||
if err = k.buildFilter(rList); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
doGather:
|
|
||||||
for n, f := range availableCollectors {
|
|
||||||
ctx := context.Background()
|
|
||||||
if k.filter.Match(n) {
|
|
||||||
println("!", n)
|
|
||||||
go f(ctx, acc, k)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (k *KubenetesState) buildFilter(rList *metav1.APIResourceList) error {
|
|
||||||
hash, err := genHash(rList)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if k.rListHash == hash {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
k.rListHash = hash
|
|
||||||
include := make([]string, len(rList.APIResources))
|
|
||||||
for k, v := range rList.APIResources {
|
|
||||||
include[k] = v.Name
|
|
||||||
}
|
|
||||||
k.filter, err = filter.NewIncludeExcludeFilter(include, k.ResourceExclude)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func genHash(rList *metav1.APIResourceList) (string, error) {
|
|
||||||
buf := new(bytes.Buffer)
|
|
||||||
for _, v := range rList.APIResources {
|
|
||||||
if _, err := buf.WriteString(v.Name + "|"); err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
sum := md5.Sum(buf.Bytes())
|
|
||||||
return string(sum[:]), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
var availableCollectors = map[string]func(ctx context.Context, acc telegraf.Accumulator, k *KubenetesState){
|
|
||||||
// "cronjobs": RegisterCronJobCollector,
|
|
||||||
// "daemonsets": RegisterDaemonSetCollector,
|
|
||||||
// "deployments": RegisterDeploymentCollector,
|
|
||||||
// "jobs": RegisterJobCollector,
|
|
||||||
// "limitranges": RegisterLimitRangeCollector,
|
|
||||||
"nodes": registerNodeCollector,
|
|
||||||
"pods": registerPodCollector,
|
|
||||||
// "replicasets": RegisterReplicaSetCollector,
|
|
||||||
// "replicationcontrollers": RegisterReplicationControllerCollector,
|
|
||||||
// "resourcequotas": RegisterResourceQuotaCollector,
|
|
||||||
// "services": RegisterServiceCollector,
|
|
||||||
// "statefulsets": RegisterStatefulSetCollector,
|
|
||||||
// "persistentvolumes": RegisterPersistentVolumeCollector,
|
|
||||||
// "persistentvolumeclaims": RegisterPersistentVolumeClaimCollector,
|
|
||||||
// "namespaces": RegisterNamespaceCollector,
|
|
||||||
// "horizontalpodautoscalers": RegisterHorizontalPodAutoScalerCollector,
|
|
||||||
// "endpoints": RegisterEndpointCollector,
|
|
||||||
// "secrets": RegisterSecretCollector,
|
|
||||||
"configmaps": registerConfigMapCollector,
|
|
||||||
}
|
|
||||||
|
|
||||||
func (k *KubenetesState) initClient() (*client, *metav1.APIResourceList, error) {
|
|
||||||
tlsCfg, err := k.ClientConfig.TLSConfig()
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, fmt.Errorf("error parse kube state metrics config[%s]: %v", k.URL, err)
|
|
||||||
}
|
|
||||||
// default 30 concurrent TCP connections
|
|
||||||
if k.MaxConnections == 0 {
|
|
||||||
k.MaxConnections = 30
|
|
||||||
}
|
|
||||||
|
|
||||||
// default check resourceList every hour
|
|
||||||
if k.ResourceListCheckInterval == nil {
|
|
||||||
k.ResourceListCheckInterval = &internal.Duration{
|
|
||||||
Duration: time.Hour,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
c := newClient(k.URL, k.ResponseTimeout.Duration, k.MaxConnections, k.BearerToken, tlsCfg)
|
|
||||||
rList, err := c.getAPIResourceList(context.Background())
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, fmt.Errorf("error connect to kubenetes api endpoint[%s]: %v", k.URL, err)
|
|
||||||
}
|
|
||||||
log.Printf("I! Kubenetes API group version is %s", rList.GroupVersion)
|
|
||||||
return c, rList, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func init() {
|
|
||||||
inputs.Add("kubernetes_state", func() telegraf.Input {
|
|
||||||
return &KubenetesState{}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
@@ -1,73 +0,0 @@
|
|||||||
package kube_state
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"strconv"
|
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
|
||||||
"k8s.io/api/core/v1"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
nodeMeasurement = "kube_node"
|
|
||||||
nodeTaintMeasurement = "kube_node_spec_taint"
|
|
||||||
)
|
|
||||||
|
|
||||||
func registerNodeCollector(ctx context.Context, acc telegraf.Accumulator, ks *KubenetesState) {
|
|
||||||
list, err := ks.client.getNodes(ctx)
|
|
||||||
if err != nil {
|
|
||||||
acc.AddError(err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
for _, n := range list.Items {
|
|
||||||
if err = ks.gatherNode(n, acc); err != nil {
|
|
||||||
acc.AddError(err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
func (ks *KubenetesState) gatherNode(n v1.Node, acc telegraf.Accumulator) error {
|
|
||||||
fields := map[string]interface{}{}
|
|
||||||
tags := map[string]string{
|
|
||||||
"node": n.Name,
|
|
||||||
"kernel_version": n.Status.NodeInfo.KernelVersion,
|
|
||||||
"os_image": n.Status.NodeInfo.OSImage,
|
|
||||||
"container_runtime_version": n.Status.NodeInfo.ContainerRuntimeVersion,
|
|
||||||
"kubelet_version": n.Status.NodeInfo.KubeletVersion,
|
|
||||||
"kubeproxy_version": n.Status.NodeInfo.KubeProxyVersion,
|
|
||||||
"provider_id": n.Spec.ProviderID,
|
|
||||||
"spec_unschedulable": strconv.FormatBool(n.Spec.Unschedulable)
|
|
||||||
}
|
|
||||||
|
|
||||||
if !n.CreationTimestamp.IsZero() {
|
|
||||||
fields["created"] = n.CreationTimestamp.Unix()
|
|
||||||
}
|
|
||||||
|
|
||||||
for k, v := range n.Labels {
|
|
||||||
tags["label_"+sanitizeLabelName(k)] = v
|
|
||||||
}
|
|
||||||
|
|
||||||
// Collect node taints
|
|
||||||
for _, taint := range n.Spec.Taints {
|
|
||||||
go gatherNodeTaint(n, taint, acc)
|
|
||||||
}
|
|
||||||
|
|
||||||
acc.AddFields(nodeMeasurement, fields, tags)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func gatherNodeTaint(n v1.Node, taint v1.Taint,acc telegraf.Accumulator){
|
|
||||||
fields := map[string]interface{}{
|
|
||||||
"gauge":1,
|
|
||||||
}
|
|
||||||
tags := map[string]string{
|
|
||||||
"node": n.Name,
|
|
||||||
"key": taint.Key,
|
|
||||||
"value": taint.Value,
|
|
||||||
"effect":string(taint.Effect),
|
|
||||||
}
|
|
||||||
|
|
||||||
acc.AddFields(nodeTaintMeasurement, fields, tags)
|
|
||||||
|
|
||||||
}
|
|
||||||
@@ -1,202 +0,0 @@
|
|||||||
package kube_state
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"regexp"
|
|
||||||
"strconv"
|
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
|
||||||
"k8s.io/api/core/v1"
|
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
||||||
"k8s.io/kubernetes/pkg/util/node"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
podMeasurement = "kube_pod"
|
|
||||||
podContainerMeasurement = "kube_pod_container"
|
|
||||||
podVolumeMeasurement = "kube_pod_spec_volumes"
|
|
||||||
)
|
|
||||||
|
|
||||||
func registerPodCollector(ctx context.Context, acc telegraf.Accumulator, ks *KubenetesState) {
|
|
||||||
list, err := ks.client.getPods(ctx)
|
|
||||||
if err != nil {
|
|
||||||
acc.AddError(err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
for _, p := range list.Items {
|
|
||||||
if err = ks.gatherPod(p, acc); err != nil {
|
|
||||||
acc.AddError(err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ks *KubenetesState) gatherPod(p v1.Pod, acc telegraf.Accumulator) error {
|
|
||||||
nodeName := p.Spec.NodeName
|
|
||||||
fields := make(map[string]interface{})
|
|
||||||
tags := make(map[string]string)
|
|
||||||
|
|
||||||
createdBy := metav1.GetControllerOf(&p)
|
|
||||||
createdByKind := ""
|
|
||||||
createdByName := ""
|
|
||||||
if createdBy != nil {
|
|
||||||
if createdBy.Kind != "" {
|
|
||||||
createdByKind = createdBy.Kind
|
|
||||||
}
|
|
||||||
if createdBy.Name != "" {
|
|
||||||
createdByName = createdBy.Name
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if p.Status.StartTime != nil {
|
|
||||||
fields["start_time"] = p.Status.StartTime.UnixNano()
|
|
||||||
}
|
|
||||||
|
|
||||||
tags["namesapce"] = p.Namespace
|
|
||||||
tags["name"] = p.Name
|
|
||||||
tags["host_ip"] = p.Status.HostIP
|
|
||||||
tags["pod_ip"] = p.Status.PodIP
|
|
||||||
tags["node"] = nodeName
|
|
||||||
tags["created_by_kind"] = createdByKind
|
|
||||||
tags["created_by_name"] = createdByName
|
|
||||||
tags["status_scheduled"] = "false"
|
|
||||||
tags["status_ready"] = "false"
|
|
||||||
|
|
||||||
owners := p.GetOwnerReferences()
|
|
||||||
if len(owners) == 0 {
|
|
||||||
tags["owner_kind"] = ""
|
|
||||||
tags["owner_name"] = ""
|
|
||||||
tags["owner_is_controller"] = ""
|
|
||||||
} else {
|
|
||||||
tags["owner_kind"] = owners[0].Kind
|
|
||||||
tags["owner_name"] = owners[0].Name
|
|
||||||
if owners[0].Controller != nil {
|
|
||||||
tags["owner_is_controller"] = strconv.FormatBool(*owners[0].Controller)
|
|
||||||
} else {
|
|
||||||
tags["owner_is_controller"] = "false"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for k, v := range p.Labels {
|
|
||||||
tags["label_"+sanitizeLabelName(k)] = v
|
|
||||||
}
|
|
||||||
|
|
||||||
if phase := p.Status.Phase; phase != "" {
|
|
||||||
tags["status_phase"] = string(phase)
|
|
||||||
// This logic is directly copied from: https://github.com/kubernetes/kubernetes/blob/d39bfa0d138368bbe72b0eaf434501dcb4ec9908/pkg/printers/internalversion/printers.go#L597-L601
|
|
||||||
// For more info, please go to: https://github.com/kubernetes/kube-state-metrics/issues/410
|
|
||||||
if p.DeletionTimestamp != nil && p.Status.Reason == node.NodeUnreachablePodReason {
|
|
||||||
tags["status_phase"] = string(v1.PodUnknown)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if !p.CreationTimestamp.IsZero() {
|
|
||||||
fields["created"] = p.CreationTimestamp.Unix()
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, c := range p.Status.Conditions {
|
|
||||||
switch c.Type {
|
|
||||||
case v1.PodReady:
|
|
||||||
tags["status_ready"] = "true"
|
|
||||||
case v1.PodScheduled:
|
|
||||||
tags["status_scheduled"] = "true"
|
|
||||||
fields["status_scheduled_time"] = c.LastTransitionTime.Unix()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var lastFinishTime int64
|
|
||||||
|
|
||||||
for i, cs := range p.Status.ContainerStatuses {
|
|
||||||
c := p.Spec.Containers[i]
|
|
||||||
gatherPodContainer(nodeName, p, cs, c, &lastFinishTime, acc)
|
|
||||||
}
|
|
||||||
|
|
||||||
if lastFinishTime > 0 {
|
|
||||||
fields["completion_time"] = lastFinishTime
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, v := range p.Spec.Volumes {
|
|
||||||
if v.PersistentVolumeClaim != nil {
|
|
||||||
gatherPodVolume(v, p, acc)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
acc.AddFields(podMeasurement, fields, tags)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func gatherPodVolume(v v1.Volume, p v1.Pod, acc telegraf.Accumulator) {
|
|
||||||
fields := map[string]interface{}{
|
|
||||||
"read_only": 0.0,
|
|
||||||
}
|
|
||||||
tags := map[string]string{
|
|
||||||
"namespace": p.Namespace,
|
|
||||||
"pod": p.Name,
|
|
||||||
"volume": v.Name,
|
|
||||||
"persistentvolumeclaim": v.PersistentVolumeClaim.ClaimName,
|
|
||||||
}
|
|
||||||
if v.PersistentVolumeClaim.ReadOnly {
|
|
||||||
fields["read_only"] = 1.0
|
|
||||||
}
|
|
||||||
acc.AddFields(podVolumeMeasurement, fields, tags)
|
|
||||||
}
|
|
||||||
|
|
||||||
func gatherPodContainer(nodeName string, p v1.Pod, cs v1.ContainerStatus, c v1.Container, lastFinishTime *int64, acc telegraf.Accumulator) {
|
|
||||||
|
|
||||||
fields := map[string]interface{}{
|
|
||||||
"status_restarts_total": cs.RestartCount,
|
|
||||||
}
|
|
||||||
tags := map[string]string{
|
|
||||||
"namespace": p.Namespace,
|
|
||||||
"pod_name": p.Name,
|
|
||||||
"node_name": nodeName,
|
|
||||||
"container": c.Name,
|
|
||||||
"image": cs.Image,
|
|
||||||
"image_id": cs.ImageID,
|
|
||||||
"container_id": cs.ContainerID,
|
|
||||||
"status_waiting": strconv.FormatBool(cs.State.Waiting != nil),
|
|
||||||
"status_waiting_reason": "",
|
|
||||||
"status_running": strconv.FormatBool(cs.State.Terminated != nil),
|
|
||||||
"status_terminated": strconv.FormatBool(cs.State.Running != nil),
|
|
||||||
"status_terminated_reason": "",
|
|
||||||
"container_status_ready": strconv.FormatBool(cs.Ready),
|
|
||||||
}
|
|
||||||
|
|
||||||
if cs.State.Waiting != nil {
|
|
||||||
tags["status_waiting_reason"] = cs.State.Waiting.Reason
|
|
||||||
}
|
|
||||||
|
|
||||||
if cs.State.Terminated != nil {
|
|
||||||
tags["status_terminated_reason"] = cs.State.Terminated.Reason
|
|
||||||
if *lastFinishTime == 0 || *lastFinishTime < cs.State.Terminated.FinishedAt.Unix() {
|
|
||||||
*lastFinishTime = cs.State.Terminated.FinishedAt.Unix()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
req := c.Resources.Requests
|
|
||||||
lim := c.Resources.Limits
|
|
||||||
|
|
||||||
for resourceName, val := range req {
|
|
||||||
switch resourceName {
|
|
||||||
case v1.ResourceCPU:
|
|
||||||
fields["resource_requests_cpu_cores"] = val.MilliValue() / 1000
|
|
||||||
default:
|
|
||||||
fields["resource_requests_"+sanitizeLabelName(string(resourceName))+"_bytes"] = val.Value()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for resourceName, val := range lim {
|
|
||||||
switch resourceName {
|
|
||||||
case v1.ResourceCPU:
|
|
||||||
fields["resource_limits_cpu_cores"] = val.MilliValue() / 1000
|
|
||||||
default:
|
|
||||||
fields["resource_limits_"+sanitizeLabelName(string(resourceName))+"_bytes"] = val.Value()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
acc.AddFields(podContainerMeasurement, fields, tags)
|
|
||||||
}
|
|
||||||
|
|
||||||
// invalidLabelCharRE matches every character that is not legal in a tag key
// derived from a Kubernetes label name.
var invalidLabelCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)

// sanitizeLabelName rewrites a Kubernetes label name into a safe tag key by
// replacing each disallowed character with an underscore.
func sanitizeLabelName(s string) string {
	return invalidLabelCharRE.ReplaceAllLiteralString(s, "_")
}
|
|
||||||
96
plugins/parsers/gjson/parser.go
Normal file
96
plugins/parsers/gjson/parser.go
Normal file
@@ -0,0 +1,96 @@
|
|||||||
|
package gjson
|
||||||
|
|
||||||
|
import (
	"fmt"
	"log"
	"time"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/metric"
	"github.com/tidwall/gjson"
)
|
||||||
|
|
||||||
|
// JSONPath is a telegraf parser that extracts individual values out of a
// JSON document using GJSON dot-notation paths. Each *Path map goes from
// the output tag/field name to the GJSON path used to look the value up.
type JSONPath struct {
	MetricName  string            // measurement name given to every parsed metric
	TagPath     map[string]string // tag name -> GJSON path; values stored as strings
	FloatPath   map[string]string // field name -> path; value coerced to float64
	IntPath     map[string]string // field name -> path; value coerced to int64
	StrPath     map[string]string // field name -> path; value kept as string
	BoolPath    map[string]string // field name -> path; only literal "true"/"false" accepted
	DefaultTags map[string]string // tags merged into every metric before path tags
}
|
||||||
|
|
||||||
|
func (j *JSONPath) Parse(buf []byte) ([]telegraf.Metric, error) {
|
||||||
|
tags := make(map[string]string)
|
||||||
|
for k, v := range j.DefaultTags {
|
||||||
|
tags[k] = v
|
||||||
|
}
|
||||||
|
fields := make(map[string]interface{})
|
||||||
|
metrics := make([]telegraf.Metric, 0)
|
||||||
|
|
||||||
|
for k, v := range j.TagPath {
|
||||||
|
c := gjson.GetBytes(buf, v)
|
||||||
|
if c.IsArray() {
|
||||||
|
log.Printf("E! GJSON cannot assign array to field on path: %v", v)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
tags[k] = c.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
for k, v := range j.FloatPath {
|
||||||
|
c := gjson.GetBytes(buf, v)
|
||||||
|
if c.IsArray() {
|
||||||
|
log.Printf("E! GJSON cannot assign array to field on path: %v", v)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
fields[k] = c.Float()
|
||||||
|
}
|
||||||
|
|
||||||
|
for k, v := range j.IntPath {
|
||||||
|
c := gjson.GetBytes(buf, v)
|
||||||
|
if c.IsArray() {
|
||||||
|
log.Printf("E! GJSON cannot assign array to field on path: %v", v)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
fields[k] = c.Int()
|
||||||
|
}
|
||||||
|
|
||||||
|
for k, v := range j.BoolPath {
|
||||||
|
c := gjson.GetBytes(buf, v)
|
||||||
|
if c.IsArray() {
|
||||||
|
log.Printf("E! GJSON cannot assign array to field on path: %v", v)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if c.String() == "true" {
|
||||||
|
fields[k] = true
|
||||||
|
} else if c.String() == "false" {
|
||||||
|
fields[k] = false
|
||||||
|
} else {
|
||||||
|
log.Printf("E! Cannot decode: %v as bool", c.String())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for k, v := range j.StrPath {
|
||||||
|
c := gjson.GetBytes(buf, v)
|
||||||
|
if c.IsArray() {
|
||||||
|
log.Printf("E! GJSON cannot assign array to field on path: %v", v)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
fields[k] = c.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
m, err := metric.New(j.MetricName, tags, fields, time.Now())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
metrics = append(metrics, m)
|
||||||
|
return metrics, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (j *JSONPath) ParseLine(str string) (telegraf.Metric, error) {
|
||||||
|
m, err := j.Parse([]byte(str))
|
||||||
|
return m[0], err
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetDefaultTags records the tags that Parse merges into every produced
// metric. Note the map is stored by reference, not copied.
func (j *JSONPath) SetDefaultTags(tags map[string]string) {
	j.DefaultTags = tags
}
|
||||||
72
plugins/parsers/gjson/parser_test.go
Normal file
72
plugins/parsers/gjson/parser_test.go
Normal file
@@ -0,0 +1,72 @@
|
|||||||
|
package gjson
|
||||||
|
|
||||||
|
import (
|
||||||
|
"log"
|
||||||
|
"reflect"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestParseJsonPath(t *testing.T) {
|
||||||
|
testString := `{
|
||||||
|
"total_devices": 5,
|
||||||
|
"total_threads": 10,
|
||||||
|
"shares": {
|
||||||
|
"total": 5,
|
||||||
|
"accepted": 5,
|
||||||
|
"rejected": 0,
|
||||||
|
"avg_find_time": 4,
|
||||||
|
"tester": "work",
|
||||||
|
"tester2": true,
|
||||||
|
"tester3": {
|
||||||
|
"hello":"sup",
|
||||||
|
"fun":"money",
|
||||||
|
"break":9
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}`
|
||||||
|
|
||||||
|
jsonParser := JSONPath{
|
||||||
|
MetricName: "jsonpather",
|
||||||
|
TagPath: map[string]string{"hello": "shares.tester3.hello"},
|
||||||
|
BoolPath: map[string]string{"bool": "shares.tester2"},
|
||||||
|
}
|
||||||
|
|
||||||
|
metrics, err := jsonParser.Parse([]byte(testString))
|
||||||
|
assert.NoError(t, err)
|
||||||
|
log.Printf("m[0] name: %v, tags: %v, fields: %v", metrics[0].Name(), metrics[0].Tags(), metrics[0].Fields())
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTagTypes(t *testing.T) {
|
||||||
|
testString := `{
|
||||||
|
"total_devices": 5,
|
||||||
|
"total_threads": 10,
|
||||||
|
"shares": {
|
||||||
|
"total": 5,
|
||||||
|
"accepted": 5,
|
||||||
|
"rejected": 0,
|
||||||
|
"my_bool": true,
|
||||||
|
"tester": "work",
|
||||||
|
"tester2": {
|
||||||
|
"hello":"sup",
|
||||||
|
"fun":true,
|
||||||
|
"break":9.97
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}`
|
||||||
|
|
||||||
|
r := JSONPath{
|
||||||
|
TagPath: map[string]string{"int1": "total_devices", "my_bool": "shares.my_bool"},
|
||||||
|
FloatPath: map[string]string{"total": "shares.total"},
|
||||||
|
BoolPath: map[string]string{"fun": "shares.tester2.fun"},
|
||||||
|
StrPath: map[string]string{"hello": "shares.tester2.hello"},
|
||||||
|
IntPath: map[string]string{"accepted": "shares.accepted"},
|
||||||
|
}
|
||||||
|
|
||||||
|
metrics, err := r.Parse([]byte(testString))
|
||||||
|
log.Printf("m[0] name: %v, tags: %v, fields: %v", metrics[0].Name(), metrics[0].Tags(), metrics[0].Fields())
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, true, reflect.DeepEqual(map[string]interface{}{"total": 5.0, "fun": true, "hello": "sup", "accepted": int64(5)}, metrics[0].Fields()))
|
||||||
|
}
|
||||||
@@ -7,6 +7,7 @@ import (
|
|||||||
|
|
||||||
"github.com/influxdata/telegraf/plugins/parsers/collectd"
|
"github.com/influxdata/telegraf/plugins/parsers/collectd"
|
||||||
"github.com/influxdata/telegraf/plugins/parsers/dropwizard"
|
"github.com/influxdata/telegraf/plugins/parsers/dropwizard"
|
||||||
|
"github.com/influxdata/telegraf/plugins/parsers/gjson"
|
||||||
"github.com/influxdata/telegraf/plugins/parsers/graphite"
|
"github.com/influxdata/telegraf/plugins/parsers/graphite"
|
||||||
"github.com/influxdata/telegraf/plugins/parsers/influx"
|
"github.com/influxdata/telegraf/plugins/parsers/influx"
|
||||||
"github.com/influxdata/telegraf/plugins/parsers/json"
|
"github.com/influxdata/telegraf/plugins/parsers/json"
|
||||||
@@ -87,6 +88,13 @@ type Config struct {
|
|||||||
// an optional map containing tag names as keys and json paths to retrieve the tag values from as values
|
// an optional map containing tag names as keys and json paths to retrieve the tag values from as values
|
||||||
// used if TagsPath is empty or doesn't return any tags
|
// used if TagsPath is empty or doesn't return any tags
|
||||||
DropwizardTagPathsMap map[string]string
|
DropwizardTagPathsMap map[string]string
|
||||||
|
|
||||||
|
//for gjson format
|
||||||
|
GJSONTagPaths map[string]string
|
||||||
|
GJSONBoolPaths map[string]string
|
||||||
|
GJSONFloatPaths map[string]string
|
||||||
|
GJSONStringPaths map[string]string
|
||||||
|
GJSONIntPaths map[string]string
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewParser returns a Parser interface based on the given config.
|
// NewParser returns a Parser interface based on the given config.
|
||||||
@@ -120,12 +128,37 @@ func NewParser(config *Config) (Parser, error) {
|
|||||||
config.DefaultTags,
|
config.DefaultTags,
|
||||||
config.Separator,
|
config.Separator,
|
||||||
config.Templates)
|
config.Templates)
|
||||||
|
|
||||||
|
case "gjson":
|
||||||
|
parser, err = newGJSONParser(config.MetricName,
|
||||||
|
config.GJSONTagPaths,
|
||||||
|
config.GJSONStringPaths,
|
||||||
|
config.GJSONBoolPaths,
|
||||||
|
config.GJSONFloatPaths,
|
||||||
|
config.GJSONIntPaths)
|
||||||
default:
|
default:
|
||||||
err = fmt.Errorf("Invalid data format: %s", config.DataFormat)
|
err = fmt.Errorf("Invalid data format: %s", config.DataFormat)
|
||||||
}
|
}
|
||||||
return parser, err
|
return parser, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newGJSONParser(metricName string,
|
||||||
|
tagPaths map[string]string,
|
||||||
|
strPaths map[string]string,
|
||||||
|
boolPaths map[string]string,
|
||||||
|
floatPaths map[string]string,
|
||||||
|
intPaths map[string]string) (Parser, error) {
|
||||||
|
parser := &gjson.JSONPath{
|
||||||
|
MetricName: metricName,
|
||||||
|
TagPath: tagPaths,
|
||||||
|
StrPath: strPaths,
|
||||||
|
BoolPath: boolPaths,
|
||||||
|
FloatPath: floatPaths,
|
||||||
|
IntPath: intPaths,
|
||||||
|
}
|
||||||
|
return parser, nil
|
||||||
|
}
|
||||||
|
|
||||||
func NewJSONParser(
|
func NewJSONParser(
|
||||||
metricName string,
|
metricName string,
|
||||||
tagKeys []string,
|
tagKeys []string,
|
||||||
|
|||||||
Reference in New Issue
Block a user