Compare commits

..

10 Commits

Author SHA1 Message Date
Kelvin Wang
54ac4d70c9 add kube state metrics 2018-07-06 11:54:44 -07:00
kelwang
90a38bd125 Merge pull request #2 from kelwang/revert-1-jenkins-gollar-changes
Revert "Jenkins gollar changes"
2018-06-25 17:14:14 -07:00
kelwang
d017718033 Revert "Jenkins gollar changes" 2018-06-25 17:14:03 -07:00
kelwang
8ff50e4327 Merge pull request #1 from kelwang/jenkins-gollar-changes
Jenkins gollar changes
2018-06-25 17:12:08 -07:00
Kelvin Wang
4ec7999186 use semaphore model 2018-06-25 17:10:15 -07:00
Kelvin Wang
3457c98eb1 remove err struct 2018-06-25 15:43:10 -07:00
Kelvin Wang
e7ff7d506b fix conflicts 2018-06-22 10:48:18 -07:00
Kelvin Wang
cdc15205d8 fix go 1.8 compatibility 2018-06-22 10:35:43 -07:00
Kelvin Wang
73eaa057d1 add requested changes 2018-06-22 10:35:43 -07:00
Kelvin Wang
9c85c05fcb add jenkins lib 2018-06-22 10:35:43 -07:00
24 changed files with 835 additions and 134 deletions

59
Gopkg.lock generated
View File

@@ -317,10 +317,19 @@
[[projects]]
name = "github.com/gogo/protobuf"
packages = ["proto"]
packages = [
"proto",
"sortkeys"
]
revision = "1adfc126b41513cc696b209667c8656ea7aac67c"
version = "v1.0.0"
[[projects]]
branch = "master"
name = "github.com/golang/glog"
packages = ["."]
revision = "23def4e6c14b4da8ac2ed8007337bc5eb5007998"
[[projects]]
name = "github.com/golang/protobuf"
packages = [
@@ -350,6 +359,12 @@
revision = "3af367b6b30c263d47e8895973edcca9a49cf029"
version = "v0.2.0"
[[projects]]
branch = "master"
name = "github.com/google/gofuzz"
packages = ["."]
revision = "24818f796faf91cd76ec7bddd72458fbced7a6c1"
[[projects]]
name = "github.com/gorilla/context"
packages = ["."]
@@ -925,6 +940,12 @@
revision = "7f5bdfd858bb064d80559b2a32b86669c5de5d3b"
version = "v3.0.5"
[[projects]]
name = "gopkg.in/inf.v0"
packages = ["."]
revision = "d2d2541c53f18d2a059457998ce2876cc8e67cbf"
version = "v0.9.1"
[[projects]]
name = "gopkg.in/ldap.v2"
packages = ["."]
@@ -965,9 +986,43 @@
revision = "5420a8b6744d3b0345ab293f6fcba19c978f1183"
version = "v2.2.1"
[[projects]]
name = "k8s.io/api"
packages = ["core/v1"]
revision = "af4bc157c3a209798fc897f6d4aaaaeb6c2e0d6a"
version = "kubernetes-1.9.0"
[[projects]]
branch = "release-1.11"
name = "k8s.io/apimachinery"
packages = [
"pkg/api/resource",
"pkg/apis/meta/v1",
"pkg/conversion",
"pkg/conversion/queryparams",
"pkg/fields",
"pkg/labels",
"pkg/runtime",
"pkg/runtime/schema",
"pkg/selection",
"pkg/types",
"pkg/util/errors",
"pkg/util/intstr",
"pkg/util/json",
"pkg/util/net",
"pkg/util/runtime",
"pkg/util/sets",
"pkg/util/validation",
"pkg/util/validation/field",
"pkg/util/wait",
"pkg/watch",
"third_party/forked/golang/reflect"
]
revision = "103fd098999dc9c0c88536f5c9ad2e5da39373ae"
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
inputs-digest = "024194b983d91b9500fe97e0aa0ddb5fe725030cb51ddfb034e386cae1098370"
inputs-digest = "e475e221e1a1bbcd2eced72dfe4c152382581c7588f087d3f36941df8984c8f6"
solver-name = "gps-cdcl"
solver-version = 1

View File

@@ -241,3 +241,15 @@
[[override]]
source = "https://github.com/fsnotify/fsnotify/archive/v1.4.7.tar.gz"
name = "gopkg.in/fsnotify.v1"
[[constraint]]
name = "k8s.io/api"
version = "kubernetes-1.11.0"
[[constraint]]
name = "k8s.io/apimachinery"
version = "kubernetes-1.11.0"
[[constraint]]
name = "k8s.io/kubernetes"
version = "v1.11.0"

View File

@@ -104,10 +104,9 @@ but can be overridden using the `name_override` config option.
#### JSON Configuration:
The JSON data format supports specifying "tag keys" and "field keys". If specified, keys
will be searched for in the root-level and any nested lists of the JSON blob. If the key(s) exist,
they will be applied as tags or fields to the Telegraf metrics. If "field_keys" is not specified,
all int and float values will be set as fields by default.
The JSON data format supports specifying "tag keys". If specified, keys
will be searched for in the root-level of the JSON blob. If the key(s) exist,
they will be applied as tags to the Telegraf metrics.
For example, if you had this configuration:
@@ -174,7 +173,6 @@ For example, if the following configuration:
"my_tag_1",
"my_tag_2"
]
field_keys = ["b_c"]
```
with this JSON output from a command:
@@ -200,11 +198,11 @@ with this JSON output from a command:
]
```
Your Telegraf metrics would get tagged with "my_tag_1" and "my_tag_2" and fielded with "b_c"
Your Telegraf metrics would get tagged with "my_tag_1" and "my_tag_2"
```
exec_mycollector,my_tag_1=foo,my_tag_2=baz b_c=6
exec_mycollector,my_tag_1=bar,my_tag_2=baz b_c=8
exec_mycollector,my_tag_1=foo,my_tag_2=baz a=5,b_c=6
exec_mycollector,my_tag_1=bar,my_tag_2=baz a=7,b_c=8
```
# Value:

View File

@@ -1261,18 +1261,6 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
}
}
if node, ok := tbl.Fields["field_keys"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
c.FieldKeys = append(c.FieldKeys, str.Value)
}
}
}
}
}
if node, ok := tbl.Fields["data_type"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
@@ -1356,7 +1344,6 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
delete(tbl.Fields, "separator")
delete(tbl.Fields, "templates")
delete(tbl.Fields, "tag_keys")
delete(tbl.Fields, "field_keys")
delete(tbl.Fields, "data_type")
delete(tbl.Fields, "collectd_auth_file")
delete(tbl.Fields, "collectd_security_level")

View File

@@ -143,10 +143,7 @@ func TestConfig_LoadDirectory(t *testing.T) {
"Testdata did not produce correct memcached metadata.")
ex := inputs.Inputs["exec"]().(*exec.Exec)
p, err := parsers.NewParser(&parsers.Config{
MetricName: "exec",
DataFormat: "json",
})
p, err := parsers.NewJSONParser("exec", nil, nil)
assert.NoError(t, err)
ex.SetParser(p)
ex.Command = "/usr/bin/myothercollector --foo=bar"

View File

@@ -48,6 +48,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/kafka_consumer"
_ "github.com/influxdata/telegraf/plugins/inputs/kafka_consumer_legacy"
_ "github.com/influxdata/telegraf/plugins/inputs/kapacitor"
_ "github.com/influxdata/telegraf/plugins/inputs/kube_state"
_ "github.com/influxdata/telegraf/plugins/inputs/kubernetes"
_ "github.com/influxdata/telegraf/plugins/inputs/leofs"
_ "github.com/influxdata/telegraf/plugins/inputs/logparser"

View File

@@ -93,10 +93,7 @@ func (r runnerMock) Run(e *Exec, command string, acc telegraf.Accumulator) ([]by
}
func TestExec(t *testing.T) {
parser, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "exec",
})
parser, _ := parsers.NewJSONParser("exec", []string{}, nil)
e := &Exec{
runner: newRunnerMock([]byte(validJson), nil),
Commands: []string{"testcommand arg1"},
@@ -122,10 +119,7 @@ func TestExec(t *testing.T) {
}
func TestExecMalformed(t *testing.T) {
parser, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "exec",
})
parser, _ := parsers.NewJSONParser("exec", []string{}, nil)
e := &Exec{
runner: newRunnerMock([]byte(malformedJson), nil),
Commands: []string{"badcommand arg1"},
@@ -138,10 +132,7 @@ func TestExecMalformed(t *testing.T) {
}
func TestCommandError(t *testing.T) {
parser, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "exec",
})
parser, _ := parsers.NewJSONParser("exec", []string{}, nil)
e := &Exec{
runner: newRunnerMock(nil, fmt.Errorf("exit status code 1")),
Commands: []string{"badcommand"},

View File

@@ -26,11 +26,7 @@ func TestHTTPwithJSONFormat(t *testing.T) {
URLs: []string{url},
}
metricName := "metricName"
p, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "metricName",
})
p, _ := parsers.NewJSONParser(metricName, nil, nil)
plugin.SetParser(p)
var acc testutil.Accumulator
@@ -67,11 +63,8 @@ func TestHTTPHeaders(t *testing.T) {
URLs: []string{url},
Headers: map[string]string{header: headerValue},
}
p, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "metricName",
})
metricName := "metricName"
p, _ := parsers.NewJSONParser(metricName, nil, nil)
plugin.SetParser(p)
var acc testutil.Accumulator
@@ -90,10 +83,7 @@ func TestInvalidStatusCode(t *testing.T) {
}
metricName := "metricName"
p, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: metricName,
})
p, _ := parsers.NewJSONParser(metricName, nil, nil)
plugin.SetParser(p)
var acc testutil.Accumulator
@@ -115,10 +105,8 @@ func TestMethod(t *testing.T) {
Method: "POST",
}
p, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "metricName",
})
metricName := "metricName"
p, _ := parsers.NewJSONParser(metricName, nil, nil)
plugin.SetParser(p)
var acc testutil.Accumulator

View File

@@ -181,12 +181,7 @@ func (h *HttpJson) gatherServer(
"server": serverURL,
}
parser, err := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: msrmnt_name,
TagKeys: h.TagKeys,
DefaultTags: tags,
})
parser, err := parsers.NewJSONParser(msrmnt_name, h.TagKeys, tags)
if err != nil {
return err
}

View File

@@ -125,10 +125,7 @@ func TestRunParserAndGatherJSON(t *testing.T) {
k.acc = &acc
defer close(k.done)
k.parser, _ = parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "kafka_json_test",
})
k.parser, _ = parsers.NewJSONParser("kafka_json_test", []string{}, nil)
go k.receiver()
in <- saramaMsg(testMsgJSON)
acc.Wait(1)

View File

@@ -125,10 +125,7 @@ func TestRunParserAndGatherJSON(t *testing.T) {
k.acc = &acc
defer close(k.done)
k.parser, _ = parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "kafka_json_test",
})
k.parser, _ = parsers.NewJSONParser("kafka_json_test", []string{}, nil)
go k.receiver()
in <- saramaMsg(testMsgJSON)
acc.Wait(1)

View File

@@ -0,0 +1,77 @@
### Line Protocol
### PODs
#### kube_pod
namespace =
name =
host_ip =
pod_ip =
node =
created_by_kind =
created_by_name =
owner_kind =
owner_name =
owner_is_controller = "true"
label_1 = ""
label_2 = ""
created = ""
start_time =
completion_time =
owner =
label_* =
created =
status_scheduled_time
#### kube_pod_status_scheduled_time
#### kube_pod_status_phase
#### kube_pod_status_ready
#### kube_pod_status_scheduled
#### kube_pod_container_info
namespace=
pod_name=
container_name=
#### kube_pod_container_status_waiting
#### kube_pod_container_status_waiting_reason
#### kube_pod_container_status_running
#### kube_pod_container_status_terminated
#### kube_pod_container_status_terminated_reason
#### kube_pod_container_status_ready
#### kube_pod_container_status_restarts_total
#### kube_pod_container_resource_requests
#### kube_pod_container_resource_limits
#### kube_pod_container_resource_requests_cpu_cores
#### kube_pod_container_resource_requests_memory_bytes
#### kube_pod_container_resource_limits_cpu_cores
#### kube_pod_container_resource_limits_memory_bytes
#### kube_pod_spec_volumes_persistentvolumeclaims_info
#### kube_pod_spec_volumes_persistentvolumeclaims_readonly

View File

@@ -0,0 +1,144 @@
package kube_state
import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"net/http"
"time"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// client is a minimal Kubernetes REST client whose concurrency is bounded
// by a semaphore sized to the configured maximum connection count.
type client struct {
	baseURL     string        // API endpoint prefix all relative paths are appended to
	httpClient  *http.Client  // shared transport with TLS config and per-request timeout
	bearerToken string        // optional token sent as "Authorization: Bearer ..."; cleared on 401
	semaphore   chan struct{} // one slot per allowed in-flight request
}
// newClient builds a client for baseURL. The HTTP transport keeps at most
// maxConns idle connections and the semaphore allows the same number of
// concurrent requests; timeout applies to each whole request.
func newClient(baseURL string, timeout time.Duration, maxConns int, bearerToken string, tlsConfig *tls.Config) *client {
	transport := &http.Transport{
		MaxIdleConns:    maxConns,
		TLSClientConfig: tlsConfig,
	}
	return &client{
		baseURL:     baseURL,
		bearerToken: bearerToken,
		semaphore:   make(chan struct{}, maxConns),
		httpClient: &http.Client{
			Transport: transport,
			Timeout:   timeout,
		},
	}
}
// getAPIResourceList fetches the API resource list from the base URL.
// A 200 response with an empty groupVersion is rejected as an APIError so
// callers never act on a half-decoded list.
func (c *client) getAPIResourceList(ctx context.Context) (rList *metav1.APIResourceList, err error) {
	rList = new(metav1.APIResourceList)
	if err = c.doGet(ctx, "", rList); err != nil {
		return nil, err
	}
	if rList.GroupVersion == "" {
		return nil, &APIError{
			URL:        c.baseURL,
			StatusCode: http.StatusOK,
			Title:      "empty group version",
		}
	}
	return rList, nil
}
// getNodes fetches and decodes the cluster node list from /nodes/.
func (c *client) getNodes(ctx context.Context) (*v1.NodeList, error) {
	nodes := new(v1.NodeList)
	if err := c.doGet(ctx, "/nodes/", nodes); err != nil {
		return nil, err
	}
	return nodes, nil
}
// getPods fetches and decodes the cluster pod list from /pods/.
func (c *client) getPods(ctx context.Context) (*v1.PodList, error) {
	pods := new(v1.PodList)
	if err := c.doGet(ctx, "/pods/", pods); err != nil {
		return nil, err
	}
	return pods, nil
}
// getConfigMaps fetches and decodes the config map list from /configmaps/.
func (c *client) getConfigMaps(ctx context.Context) (*v1.ConfigMapList, error) {
	cms := new(v1.ConfigMapList)
	if err := c.doGet(ctx, "/configmaps/", cms); err != nil {
		return nil, err
	}
	return cms, nil
}
// doGet issues an authenticated GET for baseURL+url and JSON-decodes the
// response body into v. Concurrency is bounded by the client semaphore, the
// request is cancelled when ctx is done, and any non-2xx status is returned
// as an *APIError.
func (c *client) doGet(ctx context.Context, url string, v interface{}) error {
	req, err := createGetRequest(c.baseURL+url, c.bearerToken)
	if err != nil {
		return err
	}
	// Acquire a concurrency slot, or give up if the caller cancels first.
	select {
	case c.semaphore <- struct{}{}:
	case <-ctx.Done():
		return ctx.Err()
	}
	resp, err := c.httpClient.Do(req.WithContext(ctx))
	if err != nil {
		<-c.semaphore
		return err
	}
	defer func() {
		resp.Body.Close()
		<-c.semaphore
	}()
	// Clear invalid token if unauthorized
	if resp.StatusCode == http.StatusUnauthorized {
		c.bearerToken = ""
	}
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return &APIError{
			// Fixed: report the full URL. The original passed only the
			// relative path here, which made these errors inconsistent with
			// the full-URL errors produced by getAPIResourceList.
			URL:        c.baseURL + url,
			StatusCode: resp.StatusCode,
			Title:      resp.Status,
		}
	}
	if resp.StatusCode == http.StatusNoContent {
		return nil
	}
	return json.NewDecoder(resp.Body).Decode(v)
}
// createGetRequest builds a GET request for url that asks for a JSON
// response, attaching a Bearer Authorization header when token is non-empty.
func createGetRequest(url string, token string) (*http.Request, error) {
	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		return nil, err
	}
	req.Header.Add("Accept", "application/json")
	if token != "" {
		req.Header.Set("Authorization", "Bearer "+token)
	}
	return req, nil
}
// APIError carries the HTTP status and a short description of a failed
// Kubernetes API call.
type APIError struct {
	URL         string
	StatusCode  int
	Title       string
	Description string
}

// Error renders the error as "[url] title", appending ": description" when a
// description is present.
func (e APIError) Error() string {
	msg := fmt.Sprintf("[%s] %s", e.URL, e.Title)
	if e.Description == "" {
		return msg
	}
	return msg + ": " + e.Description
}

View File

@@ -0,0 +1,42 @@
package kube_state
import (
"context"
"time"
"github.com/influxdata/telegraf"
"k8s.io/api/core/v1"
)
var configMapMeasurement = "kube_configmap"
// registerConfigMapCollector fetches all config maps and emits one metric per
// item. Errors are reported through the accumulator; the first gather error
// aborts the remaining items.
func registerConfigMapCollector(ctx context.Context, acc telegraf.Accumulator, ks *KubenetesState) {
	list, err := ks.client.getConfigMaps(ctx)
	if err != nil {
		acc.AddError(err)
		return
	}
	for _, s := range list.Items {
		if err = ks.gatherConfigMap(s, acc); err != nil {
			acc.AddError(err)
			return
		}
	}
}
// gatherConfigMap emits one "kube_configmap" point (gauge=1) tagged with the
// config map's namespace, name and resource version. The point is stamped
// with the object's creation time when one is set, otherwise the zero time.
func (ks *KubenetesState) gatherConfigMap(s v1.ConfigMap, acc telegraf.Accumulator) error {
	tags := map[string]string{
		"namespace":        s.Namespace,
		"configmap":        s.Name,
		"resource_version": s.ResourceVersion,
	}
	fields := map[string]interface{}{"gauge": 1}

	var creationTime time.Time
	if !s.CreationTimestamp.IsZero() {
		creationTime = s.CreationTimestamp.Time
	}
	acc.AddFields(configMapMeasurement, fields, tags, creationTime)
	return nil
}

View File

@@ -0,0 +1,194 @@
package kube_state
import (
"bytes"
"context"
"crypto/md5"
"fmt"
"log"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/inputs"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// KubenetesState represents the config object for the plugin.
type KubenetesState struct {
	// URL of the kubernetes API endpoint to query.
	URL string

	// Bearer Token authorization file path
	BearerToken string `toml:"bearer_token"`

	// MaxConnections for worker pool tcp connections
	MaxConnections int `toml:"max_connections"`

	// HTTP Timeout specified as a string - 3s, 1m, 1h
	ResponseTimeout internal.Duration `toml:"response_timeout"`

	tls.ClientConfig

	client *client

	// rListHash fingerprints the last seen API resource list so the
	// include/exclude filter is only rebuilt when the list changes.
	rListHash       string
	filter          filter.Filter
	lastFilterBuilt int64 // unix seconds of the last filter rebuild

	// NOTE(review): tag spelled "resouce_..." in the original; kept as-is
	// because renaming it would change the user-visible config key.
	ResourceListCheckInterval *internal.Duration `toml:"resouce_list_check_interval"`
	ResourceExclude           []string           `toml:"resource_exclude"`

	// Fixed: these were tagged `json:"..."` originally. Telegraf parses its
	// configuration as TOML, so json tags left the options unsettable.
	DisablePodNonGenericResourceMetrics  bool `toml:"disable_pod_non_generic_resource_metrics"`
	DisableNodeNonGenericResourceMetrics bool `toml:"disable_node_non_generic_resource_metrics"`
}
// sampleConfig documents every option the plugin accepts; telegraf prints it
// when generating a configuration file. ("Woker" typo and grammar fixed.)
var sampleConfig = `
## URL for the kubelet
url = "http://1.1.1.1:10255"
## Use bearer token for authorization
# bearer_token = /path/to/bearer/token
## Set response_timeout (default 5 seconds)
# response_timeout = "5s"
## Optional TLS Config
# tls_ca = /path/to/cafile
# tls_cert = /path/to/certfile
# tls_key = /path/to/keyfile
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
## Worker pool for the kube_state_metric plugin only
## Leaving this field empty will use the default value of 30
# max_connections = 30
`
// SampleConfig returns the sample configuration snippet for this plugin
// (part of the telegraf.Input interface).
func (k *KubenetesState) SampleConfig() string {
	return sampleConfig
}
// Description returns the one-line description of this plugin
// (part of the telegraf.Input interface).
func (k *KubenetesState) Description() string {
	return "Read metrics from the kubernetes kubelet api"
}
// Gather collects kubernetes metrics from the configured URL.
//
// On the first call it builds the API client and fetches the resource list;
// afterwards the resource list (and hence the collector filter) is refreshed
// only when ResourceListCheckInterval has elapsed. Matching collectors are
// then launched.
//
// Fixed: removed leftover println debug statements and replaced the
// goto-based control flow with structured branches.
func (k *KubenetesState) Gather(acc telegraf.Accumulator) (err error) {
	var rList *metav1.APIResourceList

	switch {
	case k.client == nil:
		// First run: initClient both connects and returns the resource list.
		if k.client, rList, err = k.initClient(); err != nil {
			return err
		}
	case k.lastFilterBuilt > 0 && time.Now().Unix()-k.lastFilterBuilt < int64(k.ResourceListCheckInterval.Duration.Seconds()):
		// Resource list refreshed recently: keep the cached filter
		// (rList stays nil, skipping the rebuild below).
	default:
		if rList, err = k.client.getAPIResourceList(context.Background()); err != nil {
			return err
		}
	}

	if rList != nil {
		k.lastFilterBuilt = time.Now().Unix()
		if err = k.buildFilter(rList); err != nil {
			return err
		}
	}

	for name, collect := range availableCollectors {
		if k.filter.Match(name) {
			// NOTE(review): collectors run asynchronously and Gather never
			// waits for them, so metrics may be added after Gather returns —
			// confirm the accumulator tolerates this.
			go collect(context.Background(), acc, k)
		}
	}
	return nil
}
// buildFilter rebuilds the resource include/exclude filter, but only when the
// API resource list has changed since the last build (detected via its hash).
func (k *KubenetesState) buildFilter(rList *metav1.APIResourceList) error {
	hash, err := genHash(rList)
	if err != nil {
		return err
	}
	if hash == k.rListHash {
		// Unchanged resource list: keep the existing filter.
		return nil
	}
	k.rListHash = hash

	names := make([]string, 0, len(rList.APIResources))
	for _, res := range rList.APIResources {
		names = append(names, res.Name)
	}
	k.filter, err = filter.NewIncludeExcludeFilter(names, k.ResourceExclude)
	return err
}
// genHash fingerprints an API resource list by hashing the "|"-joined
// resource names. The result is only compared for equality in-process, never
// persisted.
func genHash(rList *metav1.APIResourceList) (string, error) {
	buf := new(bytes.Buffer)
	for _, v := range rList.APIResources {
		// bytes.Buffer writes cannot fail in practice; the error plumbing is
		// kept so the function signature stays stable for callers.
		if _, err := buf.WriteString(v.Name + "|"); err != nil {
			return "", err
		}
	}
	// Fixed: hex-encode the digest. The original returned the raw MD5 bytes
	// as a string, producing a non-printable value in a string field.
	return fmt.Sprintf("%x", md5.Sum(buf.Bytes())), nil
}
// availableCollectors maps a Kubernetes resource name (as it appears in the
// API resource list) to the collector that gathers it. Commented entries are
// planned but not yet implemented.
var availableCollectors = map[string]func(ctx context.Context, acc telegraf.Accumulator, k *KubenetesState){
	// "cronjobs": RegisterCronJobCollector,
	// "daemonsets": RegisterDaemonSetCollector,
	// "deployments": RegisterDeploymentCollector,
	// "jobs": RegisterJobCollector,
	// "limitranges": RegisterLimitRangeCollector,
	"nodes": registerNodeCollector,
	"pods":  registerPodCollector,
	// "replicasets": RegisterReplicaSetCollector,
	// "replicationcontrollers": RegisterReplicationControllerCollector,
	// "resourcequotas": RegisterResourceQuotaCollector,
	// "services": RegisterServiceCollector,
	// "statefulsets": RegisterStatefulSetCollector,
	// "persistentvolumes": RegisterPersistentVolumeCollector,
	// "persistentvolumeclaims": RegisterPersistentVolumeClaimCollector,
	// "namespaces": RegisterNamespaceCollector,
	// "horizontalpodautoscalers": RegisterHorizontalPodAutoScalerCollector,
	// "endpoints": RegisterEndpointCollector,
	// "secrets": RegisterSecretCollector,
	"configmaps": registerConfigMapCollector,
}
// initClient validates the TLS/connection settings, applies defaults
// (30 connections, hourly resource-list refresh), and verifies connectivity
// by fetching the API resource list once. ("kubenetes" typos in the error
// and log messages fixed.)
func (k *KubenetesState) initClient() (*client, *metav1.APIResourceList, error) {
	tlsCfg, err := k.ClientConfig.TLSConfig()
	if err != nil {
		return nil, nil, fmt.Errorf("parsing kube state metrics config [%s]: %v", k.URL, err)
	}
	// default 30 concurrent TCP connections
	if k.MaxConnections == 0 {
		k.MaxConnections = 30
	}
	// default check resourceList every hour
	if k.ResourceListCheckInterval == nil {
		k.ResourceListCheckInterval = &internal.Duration{
			Duration: time.Hour,
		}
	}
	c := newClient(k.URL, k.ResponseTimeout.Duration, k.MaxConnections, k.BearerToken, tlsCfg)
	rList, err := c.getAPIResourceList(context.Background())
	if err != nil {
		return nil, nil, fmt.Errorf("connecting to kubernetes api endpoint [%s]: %v", k.URL, err)
	}
	log.Printf("I! Kubernetes API group version is %s", rList.GroupVersion)
	return c, rList, nil
}
// init registers the plugin constructor with telegraf's input registry.
func init() {
	// NOTE(review): registered as "kubernetes_state" while the package and
	// plugin directory are named kube_state — confirm which name is intended,
	// since the config section name must match the registered name.
	inputs.Add("kubernetes_state", func() telegraf.Input {
		return &KubenetesState{}
	})
}

View File

@@ -0,0 +1,73 @@
package kube_state
import (
"context"
"strconv"
"github.com/influxdata/telegraf"
"k8s.io/api/core/v1"
)
var (
nodeMeasurement = "kube_node"
nodeTaintMeasurement = "kube_node_spec_taint"
)
// registerNodeCollector fetches the cluster node list and gathers a metric
// for each node. Errors are reported through the accumulator; the first
// per-node error aborts the remaining nodes.
func registerNodeCollector(ctx context.Context, acc telegraf.Accumulator, ks *KubenetesState) {
	list, err := ks.client.getNodes(ctx)
	if err != nil {
		acc.AddError(err)
		return
	}
	for _, n := range list.Items {
		if err = ks.gatherNode(n, acc); err != nil {
			acc.AddError(err)
			return
		}
	}
}
func (ks *KubenetesState) gatherNode(n v1.Node, acc telegraf.Accumulator) error {
fields := map[string]interface{}{}
tags := map[string]string{
"node": n.Name,
"kernel_version": n.Status.NodeInfo.KernelVersion,
"os_image": n.Status.NodeInfo.OSImage,
"container_runtime_version": n.Status.NodeInfo.ContainerRuntimeVersion,
"kubelet_version": n.Status.NodeInfo.KubeletVersion,
"kubeproxy_version": n.Status.NodeInfo.KubeProxyVersion,
"provider_id": n.Spec.ProviderID,
"spec_unschedulable": strconv.FormatBool(n.Spec.Unschedulable)
}
if !n.CreationTimestamp.IsZero() {
fields["created"] = n.CreationTimestamp.Unix()
}
for k, v := range n.Labels {
tags["label_"+sanitizeLabelName(k)] = v
}
// Collect node taints
for _, taint := range n.Spec.Taints {
go gatherNodeTaint(n, taint, acc)
}
acc.AddFields(nodeMeasurement, fields, tags)
return nil
}
// gatherNodeTaint emits one "kube_node_spec_taint" point (gauge=1) tagged
// with the node name and the taint's key, value and effect.
func gatherNodeTaint(n v1.Node, taint v1.Taint, acc telegraf.Accumulator) {
	tags := map[string]string{
		"node":   n.Name,
		"key":    taint.Key,
		"value":  taint.Value,
		"effect": string(taint.Effect),
	}
	acc.AddFields(nodeTaintMeasurement, map[string]interface{}{"gauge": 1}, tags)
}

View File

@@ -0,0 +1,202 @@
package kube_state
import (
"context"
"regexp"
"strconv"
"github.com/influxdata/telegraf"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/util/node"
)
var (
podMeasurement = "kube_pod"
podContainerMeasurement = "kube_pod_container"
podVolumeMeasurement = "kube_pod_spec_volumes"
)
// registerPodCollector fetches the cluster pod list and gathers a metric for
// each pod. Errors are reported through the accumulator; the first per-pod
// error aborts the remaining pods.
func registerPodCollector(ctx context.Context, acc telegraf.Accumulator, ks *KubenetesState) {
	list, err := ks.client.getPods(ctx)
	if err != nil {
		acc.AddError(err)
		return
	}
	for _, p := range list.Items {
		if err = ks.gatherPod(p, acc); err != nil {
			acc.AddError(err)
			return
		}
	}
}
// gatherPod emits one "kube_pod" point per pod with identity, ownership and
// status tags, label tags, and lifecycle timestamp fields, then emits
// per-container and per-persistent-volume-claim points.
func (ks *KubenetesState) gatherPod(p v1.Pod, acc telegraf.Accumulator) error {
	nodeName := p.Spec.NodeName
	fields := make(map[string]interface{})
	tags := make(map[string]string)
	// The controller reference, when present, identifies what created the pod.
	createdBy := metav1.GetControllerOf(&p)
	createdByKind := ""
	createdByName := ""
	if createdBy != nil {
		if createdBy.Kind != "" {
			createdByKind = createdBy.Kind
		}
		if createdBy.Name != "" {
			createdByName = createdBy.Name
		}
	}
	if p.Status.StartTime != nil {
		fields["start_time"] = p.Status.StartTime.UnixNano()
	}
	// Fixed: this tag was spelled "namesapce" in the original, making the pod
	// measurement inconsistent with every other measurement in the plugin.
	tags["namespace"] = p.Namespace
	tags["name"] = p.Name
	tags["host_ip"] = p.Status.HostIP
	tags["pod_ip"] = p.Status.PodIP
	tags["node"] = nodeName
	tags["created_by_kind"] = createdByKind
	tags["created_by_name"] = createdByName
	tags["status_scheduled"] = "false"
	tags["status_ready"] = "false"
	owners := p.GetOwnerReferences()
	if len(owners) == 0 {
		tags["owner_kind"] = ""
		tags["owner_name"] = ""
		tags["owner_is_controller"] = ""
	} else {
		// Only the first owner reference is reported.
		tags["owner_kind"] = owners[0].Kind
		tags["owner_name"] = owners[0].Name
		if owners[0].Controller != nil {
			tags["owner_is_controller"] = strconv.FormatBool(*owners[0].Controller)
		} else {
			tags["owner_is_controller"] = "false"
		}
	}
	for k, v := range p.Labels {
		tags["label_"+sanitizeLabelName(k)] = v
	}
	if phase := p.Status.Phase; phase != "" {
		tags["status_phase"] = string(phase)
		// This logic is directly copied from: https://github.com/kubernetes/kubernetes/blob/d39bfa0d138368bbe72b0eaf434501dcb4ec9908/pkg/printers/internalversion/printers.go#L597-L601
		// For more info, please go to: https://github.com/kubernetes/kube-state-metrics/issues/410
		if p.DeletionTimestamp != nil && p.Status.Reason == node.NodeUnreachablePodReason {
			tags["status_phase"] = string(v1.PodUnknown)
		}
	}
	if !p.CreationTimestamp.IsZero() {
		fields["created"] = p.CreationTimestamp.Unix()
	}
	for _, c := range p.Status.Conditions {
		switch c.Type {
		case v1.PodReady:
			tags["status_ready"] = "true"
		case v1.PodScheduled:
			tags["status_scheduled"] = "true"
			fields["status_scheduled_time"] = c.LastTransitionTime.Unix()
		}
	}
	var lastFinishTime int64
	// NOTE(review): this indexes Spec.Containers with the ContainerStatuses
	// index; the API does not guarantee the two slices are parallel — confirm,
	// or match containers to statuses by name.
	for i, cs := range p.Status.ContainerStatuses {
		c := p.Spec.Containers[i]
		gatherPodContainer(nodeName, p, cs, c, &lastFinishTime, acc)
	}
	if lastFinishTime > 0 {
		fields["completion_time"] = lastFinishTime
	}
	for _, v := range p.Spec.Volumes {
		if v.PersistentVolumeClaim != nil {
			gatherPodVolume(v, p, acc)
		}
	}
	acc.AddFields(podMeasurement, fields, tags)
	return nil
}
// gatherPodVolume emits one "kube_pod_spec_volumes" point for a volume backed
// by a persistent volume claim. read_only is 1.0 when the claim is mounted
// read-only, otherwise 0.0. Callers guarantee v.PersistentVolumeClaim != nil.
func gatherPodVolume(v v1.Volume, p v1.Pod, acc telegraf.Accumulator) {
	readOnly := 0.0
	if v.PersistentVolumeClaim.ReadOnly {
		readOnly = 1.0
	}
	tags := map[string]string{
		"namespace":             p.Namespace,
		"pod":                   p.Name,
		"volume":                v.Name,
		"persistentvolumeclaim": v.PersistentVolumeClaim.ClaimName,
	}
	acc.AddFields(podVolumeMeasurement, map[string]interface{}{"read_only": readOnly}, tags)
}
// gatherPodContainer emits one "kube_pod_container" point for a container,
// combining its status (waiting/running/terminated, restart count) with the
// container's resource requests and limits. It also updates *lastFinishTime
// with the newest termination time seen across the pod's containers.
func gatherPodContainer(nodeName string, p v1.Pod, cs v1.ContainerStatus, c v1.Container, lastFinishTime *int64, acc telegraf.Accumulator) {
	fields := map[string]interface{}{
		"status_restarts_total": cs.RestartCount,
	}
	tags := map[string]string{
		"namespace":             p.Namespace,
		"pod_name":              p.Name,
		"node_name":             nodeName,
		"container":             c.Name,
		"image":                 cs.Image,
		"image_id":              cs.ImageID,
		"container_id":          cs.ContainerID,
		"status_waiting":        strconv.FormatBool(cs.State.Waiting != nil),
		"status_waiting_reason": "",
		// Fixed: the original swapped these two, reporting the Terminated
		// state under "status_running" and the Running state under
		// "status_terminated".
		"status_running":           strconv.FormatBool(cs.State.Running != nil),
		"status_terminated":        strconv.FormatBool(cs.State.Terminated != nil),
		"status_terminated_reason": "",
		"container_status_ready":   strconv.FormatBool(cs.Ready),
	}
	if cs.State.Waiting != nil {
		tags["status_waiting_reason"] = cs.State.Waiting.Reason
	}
	if cs.State.Terminated != nil {
		tags["status_terminated_reason"] = cs.State.Terminated.Reason
		if *lastFinishTime == 0 || *lastFinishTime < cs.State.Terminated.FinishedAt.Unix() {
			*lastFinishTime = cs.State.Terminated.FinishedAt.Unix()
		}
	}
	req := c.Resources.Requests
	lim := c.Resources.Limits
	for resourceName, val := range req {
		switch resourceName {
		case v1.ResourceCPU:
			// Fixed: use floating-point division. The original integer
			// MilliValue()/1000 truncated fractional cores (500m became 0).
			fields["resource_requests_cpu_cores"] = float64(val.MilliValue()) / 1000
		default:
			fields["resource_requests_"+sanitizeLabelName(string(resourceName))+"_bytes"] = val.Value()
		}
	}
	for resourceName, val := range lim {
		switch resourceName {
		case v1.ResourceCPU:
			fields["resource_limits_cpu_cores"] = float64(val.MilliValue()) / 1000
		default:
			fields["resource_limits_"+sanitizeLabelName(string(resourceName))+"_bytes"] = val.Value()
		}
	}
	acc.AddFields(podContainerMeasurement, fields, tags)
}
// invalidLabelCharRE matches every character that is not legal in a metric
// label name (anything outside [a-zA-Z0-9_]). Compiled once at package scope.
var invalidLabelCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)

// sanitizeLabelName replaces each character that is illegal in a label name
// with an underscore.
func sanitizeLabelName(s string) string {
	clean := invalidLabelCharRE.ReplaceAllString(s, "_")
	return clean
}

View File

@@ -172,10 +172,7 @@ func TestRunParserAndGatherJSON(t *testing.T) {
n.acc = &acc
defer close(n.done)
n.parser, _ = parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "nats_json_test",
})
n.parser, _ = parsers.NewJSONParser("nats_json_test", []string{}, nil)
go n.receiver()
in <- mqttMsg(testMsgJSON)

View File

@@ -108,10 +108,7 @@ func TestRunParserAndGatherJSON(t *testing.T) {
n.acc = &acc
defer close(n.done)
n.parser, _ = parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "nats_json_test",
})
n.parser, _ = parsers.NewJSONParser("nats_json_test", []string{}, nil)
n.wg.Add(1)
go n.receiver()
in <- natsMsg(testMsgJSON)

View File

@@ -300,10 +300,7 @@ func TestRunParserJSONMsg(t *testing.T) {
listener.acc = &acc
defer close(listener.done)
listener.parser, _ = parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "udp_json_test",
})
listener.parser, _ = parsers.NewJSONParser("udp_json_test", []string{}, nil)
listener.wg.Add(1)
go listener.tcpParser()

View File

@@ -193,10 +193,7 @@ func TestRunParserJSONMsg(t *testing.T) {
listener.acc = &acc
defer close(listener.done)
listener.parser, _ = parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "udp_json_test",
})
listener.parser, _ = parsers.NewJSONParser("udp_json_test", []string{}, nil)
listener.wg.Add(1)
go listener.udpParser()

View File

@@ -20,7 +20,6 @@ var (
type JSONParser struct {
MetricName string
TagKeys []string
FieldKeys []string
DefaultTags map[string]string
}
@@ -87,17 +86,6 @@ func (p *JSONParser) switchFieldToTag(tags map[string]string, fields map[string]
}
}
//if field_keys is specified, only those values should be reported as fields
if len(p.FieldKeys) > 0 {
nFields := make(map[string]interface{})
for _, name := range p.FieldKeys {
if fields[name] != nil {
nFields[name] = fields[name]
}
}
return tags, nFields
}
//remove any additional string/bool values from fields
for k := range fields {
switch fields[k].(type) {

View File

@@ -1,7 +1,6 @@
package json
import (
"log"
"testing"
"github.com/stretchr/testify/assert"
@@ -449,28 +448,22 @@ func TestJSONParseNestedArray(t *testing.T) {
"total_devices": 5,
"total_threads": 10,
"shares": {
"total": 5,
"accepted": 5,
"rejected": 0,
"avg_find_time": 4,
"tester": "work",
"tester2": "don't want this",
"tester3": {
"hello":"sup",
"fun":"money",
"break":9
}
"total": 5,
"accepted": 5,
"rejected": 0,
"avg_find_time": 4,
"tester": "work",
"tester2": "don't want this",
"tester3": 7.93
}
}`
parser := JSONParser{
MetricName: "json_test",
TagKeys: []string{"total_devices", "total_threads", "shares_tester3_fun"},
FieldKeys: []string{"shares_tester", "shares_tester3_break"},
TagKeys: []string{"total_devices", "total_threads", "shares_tester", "shares_tester3"},
}
metrics, err := parser.Parse([]byte(testString))
log.Printf("m[0] name: %v, tags: %v, fields: %v", metrics[0].Name(), metrics[0].Tags(), metrics[0].Fields())
require.NoError(t, err)
require.Equal(t, len(parser.TagKeys), len(metrics[0].Tags()))
}

View File

@@ -56,8 +56,6 @@ type Config struct {
// TagKeys only apply to JSON data
TagKeys []string
// FieldKeys only apply to JSON
FieldKeys []string
// MetricName applies to JSON & value. This will be the name of the measurement.
MetricName string
@@ -97,8 +95,8 @@ func NewParser(config *Config) (Parser, error) {
var parser Parser
switch config.DataFormat {
case "json":
parser, err = newJSONParser(config.MetricName,
config.TagKeys, config.FieldKeys, config.DefaultTags)
parser, err = NewJSONParser(config.MetricName,
config.TagKeys, config.DefaultTags)
case "value":
parser, err = NewValueParser(config.MetricName,
config.DataType, config.DefaultTags)
@@ -128,22 +126,6 @@ func NewParser(config *Config) (Parser, error) {
return parser, err
}
func newJSONParser(
metricName string,
tagKeys []string,
fieldKeys []string,
defaultTags map[string]string,
) (Parser, error) {
parser := &json.JSONParser{
MetricName: metricName,
TagKeys: tagKeys,
FieldKeys: fieldKeys,
DefaultTags: defaultTags,
}
return parser, nil
}
//Deprecated: Use NewParser to get a JSONParser object
func NewJSONParser(
metricName string,
tagKeys []string,