Add kube_inventory input plugin (#5110)

Greg 2019-02-04 13:28:43 -07:00 committed by Daniel Nelson
parent 6be5b55094
commit 94de223916
21 changed files with 1928 additions and 0 deletions

Gopkg.lock generated
View File

@ -387,8 +387,11 @@
packages = [
".",
"apis/apiextensions/v1beta1",
"apis/apps/v1beta1",
"apis/apps/v1beta2",
"apis/core/v1",
"apis/meta/v1",
"apis/policy/v1beta1",
"apis/resource",
"runtime",
"runtime/schema",
@ -696,6 +699,14 @@
pruneopts = ""
revision = "b84e30acd515aadc4b783ad4ff83aff3299bdfe0"
[[projects]]
branch = "master"
digest = "1:e7737c09200582508f4f67227c39e7c4667cc6067a6d2b2e679654e43e8a8cb4"
name = "github.com/kubernetes/apimachinery"
packages = ["pkg/api/resource"]
pruneopts = ""
revision = "d41becfba9ee9bf8e55cec1dd3934cd7cfc04b99"
[[projects]]
branch = "develop"
digest = "1:3e66a61a57bbbe832c338edb3a623be0deb3dec650c2f3515149658898287e37"
@ -1424,6 +1435,14 @@
revision = "7f5bdfd858bb064d80559b2a32b86669c5de5d3b"
version = "v3.0.5"
[[projects]]
digest = "1:75fb3fcfc73a8c723efde7777b40e8e8ff9babf30d8c56160d01beffea8a95a6"
name = "gopkg.in/inf.v0"
packages = ["."]
pruneopts = ""
revision = "d2d2541c53f18d2a059457998ce2876cc8e67cbf"
version = "v0.9.1"
[[projects]]
digest = "1:367baf06b7dbd0ef0bbdd785f6a79f929c96b0c18e9d3b29c0eed1ac3f5db133"
name = "gopkg.in/ldap.v2"
@ -1511,8 +1530,12 @@
"github.com/docker/libnetwork/ipvs",
"github.com/eclipse/paho.mqtt.golang",
"github.com/ericchiang/k8s",
"github.com/ericchiang/k8s/apis/apps/v1beta1",
"github.com/ericchiang/k8s/apis/apps/v1beta2",
"github.com/ericchiang/k8s/apis/core/v1",
"github.com/ericchiang/k8s/apis/meta/v1",
"github.com/ericchiang/k8s/apis/resource",
"github.com/ericchiang/k8s/util/intstr",
"github.com/go-logfmt/logfmt",
"github.com/go-redis/redis",
"github.com/go-sql-driver/mysql",
@ -1537,6 +1560,7 @@
"github.com/kardianos/service",
"github.com/karrick/godirwalk",
"github.com/kballard/go-shellquote",
"github.com/kubernetes/apimachinery/pkg/api/resource",
"github.com/matttproud/golang_protobuf_extensions/pbutil",
"github.com/miekg/dns",
"github.com/multiplay/go-ts3",

View File

@ -254,6 +254,10 @@
name = "github.com/karrick/godirwalk"
version = "1.7.5"
[[constraint]]
branch = "master"
name = "github.com/kubernetes/apimachinery"
[[constraint]]
name = "github.com/go-logfmt/logfmt"
version = "0.4.0"

View File

@ -63,6 +63,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/kernel"
_ "github.com/influxdata/telegraf/plugins/inputs/kernel_vmstat"
_ "github.com/influxdata/telegraf/plugins/inputs/kibana"
_ "github.com/influxdata/telegraf/plugins/inputs/kube_inventory"
_ "github.com/influxdata/telegraf/plugins/inputs/kubernetes"
_ "github.com/influxdata/telegraf/plugins/inputs/leofs"
_ "github.com/influxdata/telegraf/plugins/inputs/linux_sysctl_fs"

View File

@ -0,0 +1,238 @@
# Kube_Inventory Plugin
This plugin generates metrics derived from the state of the following Kubernetes resources:
- daemonsets
- deployments
- nodes
- persistentvolumes
- persistentvolumeclaims
- pods (containers)
- statefulsets
#### Series Cardinality Warning
This plugin may produce a high number of series which, when not controlled
for, will cause high load on your database. Use the following techniques to
avoid cardinality issues:
- Use [metric filtering][] options to exclude unneeded measurements and tags (see the sketch after this list).
- Write to a database with an appropriate [retention policy][].
- Limit series cardinality in your database using the
[max-series-per-database][] and [max-values-per-tag][] settings.
- Consider using the [Time Series Index][tsi].
- Monitor your database's [series cardinality][].
- Consult the [InfluxDB documentation][influx-docs] for the most up-to-date techniques.
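As an illustrative sketch only (the measurement names come from this plugin, but the chosen filters, tags, and namespaces are arbitrary examples), metric filtering can be applied directly to the plugin instance:
```toml
[[inputs.kube_inventory]]
url = "https://127.0.0.1"
## Keep only the measurements you actually need.
namepass = ["kubernetes_node", "kubernetes_pod_container"]
## Drop high-cardinality tags you do not query on.
tagexclude = ["pod_name"]
## Only pass metrics from selected namespaces.
## Note: tagpass must be defined at the end of the plugin block.
[inputs.kube_inventory.tagpass]
namespace = ["default", "kube-system"]
```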
### Configuration:
```toml
[[inputs.kube_inventory]]
## URL for the Kubernetes API
url = "https://127.0.0.1"
## Namespace to use
# namespace = "default"
## Use bearer token for authorization. ('bearer_token' takes priority)
# bearer_token = "/path/to/bearer/token"
## OR
# bearer_token_string = "abc_123"
## Set response_timeout (default 5 seconds)
# response_timeout = "5s"
## Optional Resources to exclude from gathering
## Leave blank to gather all available resources.
## Values can be - "daemonsets", "deployments", "nodes", "persistentvolumes",
## "persistentvolumeclaims", "pods", "statefulsets"
# resource_exclude = [ "deployments", "nodes", "statefulsets" ]
## Optional Resources to include when gathering
## Overrides resource_exclude if both are set.
# resource_include = [ "deployments", "nodes", "statefulsets" ]
## Optional TLS Config
# tls_ca = "/path/to/cafile"
# tls_cert = "/path/to/certfile"
# tls_key = "/path/to/keyfile"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
```
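To spot-check what the plugin would report before wiring it into your outputs, Telegraf can be run once in test mode (assuming the configuration above is saved in `telegraf.conf`):
```
telegraf --config telegraf.conf --input-filter kube_inventory --test
```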
#### Kubernetes Permissions
If using [RBAC authorization](https://kubernetes.io/docs/reference/access-authn-authz/rbac/), you will need to create a cluster role to list "persistentvolumes" and "nodes". You will then need to make an [aggregated ClusterRole](https://kubernetes.io/docs/reference/access-authn-authz/rbac/#aggregated-clusterroles) that will eventually be bound to a user or group.
```yaml
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: influx:cluster:viewer
labels:
rbac.authorization.k8s.io/aggregate-view-telegraf: "true"
rules:
- apiGroups: [""]
resources: ["persistentvolumes","nodes"]
verbs: ["get","list"]
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: influx:telegraf
aggregationRule:
clusterRoleSelectors:
- matchLabels:
rbac.authorization.k8s.io/aggregate-view-telegraf: "true"
rbac.authorization.k8s.io/aggregate-to-view: "true"
rules: [] # Rules are automatically filled in by the controller manager.
```
Bind the newly created aggregated ClusterRole with the following config file, updating the subjects as needed.
```yaml
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: influx:telegraf:viewer
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: influx:telegraf
subjects:
- kind: ServiceAccount
name: telegraf
namespace: default
```
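A minimal sketch of applying the two manifests above (the file names are placeholders):
```
kubectl apply -f influx-cluster-role.yaml
kubectl apply -f influx-cluster-role-binding.yaml
```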
### Metrics:
+ kubernetes_daemonset
- tags:
- daemonset_name
- namespace
- fields:
- generation
- created
- current_number_scheduled
- desired_number_scheduled
- number_available
- number_misscheduled
- number_ready
- number_unavailable
- updated_number_scheduled
- kubernetes_deployment
- tags:
- deployment_name
- namespace
- fields:
- replicas_available
- replicas_unavailable
- created
+ kubernetes_node
- tags:
- node_name
- fields:
- capacity_cpu_cores
- capacity_memory_bytes
- capacity_pods
- allocatable_cpu_cores
- allocatable_memory_bytes
- allocatable_pods
- kubernetes_persistentvolume
- tags:
- pv_name
- phase
- storageclass
- fields:
- phase_type (int, [see below](#pv-phase_type))
+ kubernetes_persistentvolumeclaim
- tags:
- pvc_name
- namespace
- phase
- storageclass
- fields:
- phase_type (int, [see below](#pvc-phase_type))
- kubernetes_pod_container
- tags:
- container_name
- namespace
- node_name
- pod_name
- state
- fields:
- restarts_total
- state_code
- terminated_reason
- resource_requests_millicpu_units
- resource_requests_memory_bytes
- resource_limits_millicpu_units
- resource_limits_memory_bytes
+ kubernetes_statefulset
- tags:
- statefulset_name
- namespace
- fields:
- created
- generation
- replicas
- replicas_current
- replicas_ready
- replicas_updated
- spec_replicas
- observed_generation
#### pv `phase_type`
The persistentvolume `phase` is saved in the `phase` tag, with a correlated numeric field called `phase_type` that corresponds to the tag value.
|Tag value |Corresponding field value|
|-----------|-------------------------|
|bound | 0 |
|failed | 1 |
|pending | 2 |
|released | 3 |
|available | 4 |
|unknown | 5 |
#### pvc `phase_type`
The persistentvolumeclaim `phase` is saved in the `phase` tag, with a correlated numeric field called `phase_type` that corresponds to the tag value.
|Tag value |Corresponding field value|
|-----------|-------------------------|
|bound | 0 |
|lost | 1 |
|pending | 2 |
|unknown | 3 |
### Example Output:
```
kubernetes_configmap,configmap_name=envoy-config,namespace=default,resource_version=56593031 created=1544103867000000000i 1547597616000000000
kubernetes_daemonset
kubernetes_deployment,deployment_name=deployd,namespace=default replicas_unavailable=0i,created=1544103082000000000i,replicas_available=1i 1547597616000000000
kubernetes_node,node_name=ip-172-17-0-2.internal allocatable_pods=110i,capacity_memory_bytes=128837533696,capacity_pods=110i,capacity_cpu_cores=16i,allocatable_cpu_cores=16i,allocatable_memory_bytes=128732676096 1547597616000000000
kubernetes_persistentvolume,phase=Released,pv_name=pvc-aaaaaaaa-bbbb-cccc-1111-222222222222,storageclass=ebs-1-retain phase_type=3i 1547597616000000000
kubernetes_persistentvolumeclaim,namespace=default,phase=Bound,pvc_name=data-etcd-0,storageclass=ebs-1-retain phase_type=0i 1547597615000000000
kubernetes_pod,namespace=default,node_name=ip-172-17-0-2.internal,pod_name=tick1 last_transition_time=1547578322000000000i,ready="false" 1547597616000000000
kubernetes_pod_container,container_name=telegraf,namespace=default,node_name=ip-172-17-0-2.internal,pod_name=tick1,state=running resource_requests_millicpu_units=100i,resource_limits_memory_bytes=524288000i,resource_limits_millicpu_units=500i,restarts_total=0i,state_code=0i,terminated_reason="",resource_requests_memory_bytes=524288000i 1547597616000000000
kubernetes_statefulset,namespace=default,statefulset_name=etcd replicas_updated=3i,spec_replicas=3i,observed_generation=1i,created=1544101669000000000i,generation=1i,replicas=3i,replicas_current=3i,replicas_ready=3i 1547597616000000000
```
[metric filtering]: https://github.com/influxdata/telegraf/blob/master/docs/CONFIGURATION.md#metric-filtering
[retention policy]: https://docs.influxdata.com/influxdb/latest/guides/downsampling_and_retention/
[max-series-per-database]: https://docs.influxdata.com/influxdb/latest/administration/config/#max-series-per-database-1000000
[max-values-per-tag]: https://docs.influxdata.com/influxdb/latest/administration/config/#max-values-per-tag-100000
[tsi]: https://docs.influxdata.com/influxdb/latest/concepts/time-series-index/
[series cardinality]: https://docs.influxdata.com/influxdb/latest/query_language/spec/#show-cardinality
[influx-docs]: https://docs.influxdata.com/influxdb/latest/
[k8s-telegraf]: https://www.influxdata.com/blog/monitoring-kubernetes-architecture/
[tick-charts]: https://github.com/influxdata/tick-charts

View File

@ -0,0 +1,97 @@
package kube_inventory
import (
"context"
"time"
"github.com/ericchiang/k8s"
"github.com/ericchiang/k8s/apis/apps/v1beta1"
"github.com/ericchiang/k8s/apis/apps/v1beta2"
"github.com/ericchiang/k8s/apis/core/v1"
"github.com/influxdata/telegraf/internal/tls"
)
// client wraps the k8s.Client with the namespace and request timeout used by this plugin.
type client struct {
namespace string
timeout time.Duration
*k8s.Client
}
// newClient creates a Kubernetes API client for the given endpoint, namespace, bearer token, timeout, and TLS settings.
func newClient(baseURL, namespace, bearerToken string, timeout time.Duration, tlsConfig tls.ClientConfig) (*client, error) {
c, err := k8s.NewClient(&k8s.Config{
Clusters: []k8s.NamedCluster{{Name: "cluster", Cluster: k8s.Cluster{
Server: baseURL,
InsecureSkipTLSVerify: tlsConfig.InsecureSkipVerify,
CertificateAuthority: tlsConfig.TLSCA,
}}},
Contexts: []k8s.NamedContext{{Name: "context", Context: k8s.Context{
Cluster: "cluster",
AuthInfo: "auth",
Namespace: namespace,
}}},
AuthInfos: []k8s.NamedAuthInfo{{Name: "auth", AuthInfo: k8s.AuthInfo{
Token: bearerToken,
ClientCertificate: tlsConfig.TLSCert,
ClientKey: tlsConfig.TLSKey,
}}},
})
if err != nil {
return nil, err
}
return &client{
Client: c,
timeout: timeout,
namespace: namespace,
}, nil
}
func (c *client) getDaemonSets(ctx context.Context) (*v1beta2.DaemonSetList, error) {
list := new(v1beta2.DaemonSetList)
ctx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()
return list, c.List(ctx, c.namespace, list)
}
func (c *client) getDeployments(ctx context.Context) (*v1beta1.DeploymentList, error) {
list := &v1beta1.DeploymentList{}
ctx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()
return list, c.List(ctx, c.namespace, list)
}
func (c *client) getNodes(ctx context.Context) (*v1.NodeList, error) {
list := new(v1.NodeList)
ctx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()
return list, c.List(ctx, "", list)
}
func (c *client) getPersistentVolumes(ctx context.Context) (*v1.PersistentVolumeList, error) {
list := new(v1.PersistentVolumeList)
ctx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()
return list, c.List(ctx, "", list)
}
func (c *client) getPersistentVolumeClaims(ctx context.Context) (*v1.PersistentVolumeClaimList, error) {
list := new(v1.PersistentVolumeClaimList)
ctx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()
return list, c.List(ctx, c.namespace, list)
}
func (c *client) getPods(ctx context.Context) (*v1.PodList, error) {
list := new(v1.PodList)
ctx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()
return list, c.List(ctx, c.namespace, list)
}
func (c *client) getStatefulSets(ctx context.Context) (*v1beta1.StatefulSetList, error) {
list := new(v1beta1.StatefulSetList)
ctx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()
return list, c.List(ctx, c.namespace, list)
}

View File

@ -0,0 +1,35 @@
package kube_inventory
import (
"testing"
"time"
"github.com/influxdata/telegraf/internal/tls"
)
type mockHandler struct {
responseMap map[string]interface{}
}
func toStrPtr(s string) *string {
return &s
}
func toInt32Ptr(i int32) *int32 {
return &i
}
func toInt64Ptr(i int64) *int64 {
return &i
}
func toBoolPtr(b bool) *bool {
return &b
}
func TestNewClient(t *testing.T) {
_, err := newClient("https://127.0.0.1:443/", "default", "abc123", time.Second, tls.ClientConfig{})
if err != nil {
t.Errorf("Failed to create new client - %s", err.Error())
}
}

View File

@ -0,0 +1,49 @@
package kube_inventory
import (
"context"
"time"
"github.com/ericchiang/k8s/apis/apps/v1beta2"
"github.com/influxdata/telegraf"
)
func collectDaemonSets(ctx context.Context, acc telegraf.Accumulator, ki *KubernetesInventory) {
list, err := ki.client.getDaemonSets(ctx)
if err != nil {
acc.AddError(err)
return
}
for _, d := range list.Items {
if err = ki.gatherDaemonSet(*d, acc); err != nil {
acc.AddError(err)
return
}
}
}
func (ki *KubernetesInventory) gatherDaemonSet(d v1beta2.DaemonSet, acc telegraf.Accumulator) error {
fields := map[string]interface{}{
"generation": d.Metadata.GetGeneration(),
"current_number_scheduled": d.Status.GetCurrentNumberScheduled(),
"desired_number_scheduled": d.Status.GetDesiredNumberScheduled(),
"number_available": d.Status.GetNumberAvailable(),
"number_misscheduled": d.Status.GetNumberMisscheduled(),
"number_ready": d.Status.GetNumberReady(),
"number_unavailable": d.Status.GetNumberUnavailable(),
"updated_number_scheduled": d.Status.GetUpdatedNumberScheduled(),
}
tags := map[string]string{
"daemonset_name": d.Metadata.GetName(),
"namespace": d.Metadata.GetNamespace(),
}
if d.Metadata.CreationTimestamp.GetSeconds() != 0 {
fields["created"] = time.Unix(d.Metadata.CreationTimestamp.GetSeconds(), int64(d.Metadata.CreationTimestamp.GetNanos())).UnixNano()
}
acc.AddFields(daemonSetMeasurement, fields, tags)
return nil
}

View File

@ -0,0 +1,123 @@
package kube_inventory
import (
"testing"
"time"
"github.com/ericchiang/k8s/apis/apps/v1beta2"
metav1 "github.com/ericchiang/k8s/apis/meta/v1"
"github.com/influxdata/telegraf/testutil"
)
func TestDaemonSet(t *testing.T) {
cli := &client{}
now := time.Now()
now = time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 1, 36, 0, now.Location())
tests := []struct {
name string
handler *mockHandler
output *testutil.Accumulator
hasError bool
}{
{
name: "no daemon set",
handler: &mockHandler{
responseMap: map[string]interface{}{
"/daemonsets/": &v1beta2.DaemonSetList{},
},
},
hasError: false,
},
{
name: "collect daemonsets",
handler: &mockHandler{
responseMap: map[string]interface{}{
"/daemonsets/": &v1beta2.DaemonSetList{
Items: []*v1beta2.DaemonSet{
{
Status: &v1beta2.DaemonSetStatus{
CurrentNumberScheduled: toInt32Ptr(3),
DesiredNumberScheduled: toInt32Ptr(5),
NumberAvailable: toInt32Ptr(2),
NumberMisscheduled: toInt32Ptr(2),
NumberReady: toInt32Ptr(1),
NumberUnavailable: toInt32Ptr(1),
UpdatedNumberScheduled: toInt32Ptr(2),
},
Metadata: &metav1.ObjectMeta{
Generation: toInt64Ptr(11221),
Namespace: toStrPtr("ns1"),
Name: toStrPtr("daemon1"),
Labels: map[string]string{
"lab1": "v1",
"lab2": "v2",
},
CreationTimestamp: &metav1.Time{Seconds: toInt64Ptr(now.Unix())},
},
},
},
},
},
},
output: &testutil.Accumulator{
Metrics: []*testutil.Metric{
{
Fields: map[string]interface{}{
"generation": int64(11221),
"current_number_scheduled": int32(3),
"desired_number_scheduled": int32(5),
"number_available": int32(2),
"number_misscheduled": int32(2),
"number_ready": int32(1),
"number_unavailable": int32(1),
"updated_number_scheduled": int32(2),
"created": now.UnixNano(),
},
Tags: map[string]string{
"daemonset_name": "daemon1",
"namespace": "ns1",
},
},
},
},
hasError: false,
},
}
for _, v := range tests {
ks := &KubernetesInventory{
client: cli,
}
acc := new(testutil.Accumulator)
for _, dset := range ((v.handler.responseMap["/daemonsets/"]).(*v1beta2.DaemonSetList)).Items {
err := ks.gatherDaemonSet(*dset, acc)
if err != nil {
t.Errorf("Failed to gather daemonset - %s", err.Error())
}
}
err := acc.FirstError()
if err == nil && v.hasError {
t.Fatalf("%s failed, should have error", v.name)
} else if err != nil && !v.hasError {
t.Fatalf("%s failed, err: %v", v.name, err)
}
if v.output == nil && len(acc.Metrics) > 0 {
t.Fatalf("%s: collected extra data", v.name)
} else if v.output != nil && len(v.output.Metrics) > 0 {
for i := range v.output.Metrics {
for k, m := range v.output.Metrics[i].Tags {
if acc.Metrics[i].Tags[k] != m {
t.Fatalf("%s: tag %s metrics unmatch Expected %s, got %s\n", v.name, k, m, acc.Metrics[i].Tags[k])
}
}
for k, m := range v.output.Metrics[i].Fields {
if acc.Metrics[i].Fields[k] != m {
t.Fatalf("%s: field %s metrics unmatch Expected %v(%T), got %v(%T)\n", v.name, k, m, m, acc.Metrics[i].Fields[k], acc.Metrics[i].Fields[k])
}
}
}
}
}
}

View File

@ -0,0 +1,40 @@
package kube_inventory
import (
"context"
"time"
"github.com/ericchiang/k8s/apis/apps/v1beta1"
"github.com/influxdata/telegraf"
)
func collectDeployments(ctx context.Context, acc telegraf.Accumulator, ki *KubernetesInventory) {
list, err := ki.client.getDeployments(ctx)
if err != nil {
acc.AddError(err)
return
}
for _, d := range list.Items {
if err = ki.gatherDeployment(*d, acc); err != nil {
acc.AddError(err)
return
}
}
}
func (ki *KubernetesInventory) gatherDeployment(d v1beta1.Deployment, acc telegraf.Accumulator) error {
fields := map[string]interface{}{
"replicas_available": d.Status.GetAvailableReplicas(),
"replicas_unavailable": d.Status.GetUnavailableReplicas(),
"created": time.Unix(d.Metadata.CreationTimestamp.GetSeconds(), int64(d.Metadata.CreationTimestamp.GetNanos())).UnixNano(),
}
tags := map[string]string{
"deployment_name": d.Metadata.GetName(),
"namespace": d.Metadata.GetNamespace(),
}
acc.AddFields(deploymentMeasurement, fields, tags)
return nil
}

View File

@ -0,0 +1,131 @@
package kube_inventory
import (
"testing"
"time"
"github.com/ericchiang/k8s/apis/apps/v1beta1"
metav1 "github.com/ericchiang/k8s/apis/meta/v1"
"github.com/ericchiang/k8s/util/intstr"
"github.com/influxdata/telegraf/testutil"
)
func TestDeployment(t *testing.T) {
cli := &client{}
now := time.Now()
now = time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 1, 36, 0, now.Location())
outputMetric := &testutil.Metric{
Fields: map[string]interface{}{
"replicas_available": int32(1),
"replicas_unavailable": int32(4),
"created": now.UnixNano(),
},
Tags: map[string]string{
"namespace": "ns1",
"deployment_name": "deploy1",
},
}
tests := []struct {
name string
handler *mockHandler
output *testutil.Accumulator
hasError bool
}{
{
name: "no deployments",
handler: &mockHandler{
responseMap: map[string]interface{}{
"/deployments/": &v1beta1.DeploymentList{},
},
},
hasError: false,
},
{
name: "collect deployments",
handler: &mockHandler{
responseMap: map[string]interface{}{
"/deployments/": &v1beta1.DeploymentList{
Items: []*v1beta1.Deployment{
{
Status: &v1beta1.DeploymentStatus{
Replicas: toInt32Ptr(3),
AvailableReplicas: toInt32Ptr(1),
UnavailableReplicas: toInt32Ptr(4),
UpdatedReplicas: toInt32Ptr(2),
ObservedGeneration: toInt64Ptr(9121),
},
Spec: &v1beta1.DeploymentSpec{
Strategy: &v1beta1.DeploymentStrategy{
RollingUpdate: &v1beta1.RollingUpdateDeployment{
MaxUnavailable: &intstr.IntOrString{
IntVal: toInt32Ptr(30),
},
MaxSurge: &intstr.IntOrString{
IntVal: toInt32Ptr(20),
},
},
},
Replicas: toInt32Ptr(4),
},
Metadata: &metav1.ObjectMeta{
Generation: toInt64Ptr(11221),
Namespace: toStrPtr("ns1"),
Name: toStrPtr("deploy1"),
Labels: map[string]string{
"lab1": "v1",
"lab2": "v2",
},
CreationTimestamp: &metav1.Time{Seconds: toInt64Ptr(now.Unix())},
},
},
},
},
},
},
output: &testutil.Accumulator{
Metrics: []*testutil.Metric{
outputMetric,
},
},
hasError: false,
},
}
for _, v := range tests {
ks := &KubernetesInventory{
client: cli,
}
acc := new(testutil.Accumulator)
for _, deployment := range ((v.handler.responseMap["/deployments/"]).(*v1beta1.DeploymentList)).Items {
err := ks.gatherDeployment(*deployment, acc)
if err != nil {
t.Errorf("Failed to gather deployment - %s", err.Error())
}
}
err := acc.FirstError()
if err == nil && v.hasError {
t.Fatalf("%s failed, should have error", v.name)
} else if err != nil && !v.hasError {
t.Fatalf("%s failed, err: %v", v.name, err)
}
if v.output == nil && len(acc.Metrics) > 0 {
t.Fatalf("%s: collected extra data", v.name)
} else if v.output != nil && len(v.output.Metrics) > 0 {
for i := range v.output.Metrics {
for k, m := range v.output.Metrics[i].Tags {
if acc.Metrics[i].Tags[k] != m {
t.Fatalf("%s: tag %s metrics unmatch Expected %s, got '%v'\n", v.name, k, m, acc.Metrics[i].Tags[k])
}
}
for k, m := range v.output.Metrics[i].Fields {
if acc.Metrics[i].Fields[k] != m {
t.Fatalf("%s: field %s metrics unmatch Expected %v(%T), got %v(%T)\n", v.name, k, m, m, acc.Metrics[i].Fields[k], acc.Metrics[i].Fields[k])
}
}
}
}
}
}

View File

@ -0,0 +1,175 @@
package kube_inventory
import (
"context"
"fmt"
"io/ioutil"
"log"
"strconv"
"strings"
"sync"
"time"
"github.com/kubernetes/apimachinery/pkg/api/resource"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/inputs"
)
// KubernetesInventory represents the config object for the plugin.
type KubernetesInventory struct {
URL string `toml:"url"`
BearerToken string `toml:"bearer_token"`
BearerTokenString string `toml:"bearer_token_string"`
Namespace string `toml:"namespace"`
ResponseTimeout internal.Duration `toml:"response_timeout"` // Timeout specified as a string - 3s, 1m, 1h
ResourceExclude []string `toml:"resource_exclude"`
ResourceInclude []string `toml:"resource_include"`
MaxConfigMapAge internal.Duration `toml:"max_config_map_age"`
tls.ClientConfig
client *client
}
var sampleConfig = `
## URL for the Kubernetes API
url = "https://127.0.0.1"
## Namespace to use
# namespace = "default"
## Use bearer token for authorization. ('bearer_token' takes priority)
# bearer_token = "/path/to/bearer/token"
## OR
# bearer_token_string = "abc_123"
## Set response_timeout (default 5 seconds)
# response_timeout = "5s"
## Optional Resources to exclude from gathering
## Leave blank to gather all available resources.
## Values can be - "daemonsets", "deployments", "nodes", "persistentvolumes",
## "persistentvolumeclaims", "pods", "statefulsets"
# resource_exclude = [ "deployments", "nodes", "statefulsets" ]
## Optional Resources to include when gathering
## Overrides resource_exclude if both are set.
# resource_include = [ "deployments", "nodes", "statefulsets" ]
## Optional TLS Config
# tls_ca = "/path/to/cafile"
# tls_cert = "/path/to/certfile"
# tls_key = "/path/to/keyfile"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
`
// SampleConfig returns a sample config
func (ki *KubernetesInventory) SampleConfig() string {
return sampleConfig
}
// Description returns the description of this plugin
func (ki *KubernetesInventory) Description() string {
return "Read metrics from the Kubernetes api"
}
// Gather collects kubernetes metrics from a given URL.
func (ki *KubernetesInventory) Gather(acc telegraf.Accumulator) (err error) {
if ki.client == nil {
if ki.client, err = ki.initClient(); err != nil {
return err
}
}
resourceFilter, err := filter.NewIncludeExcludeFilter(ki.ResourceInclude, ki.ResourceExclude)
if err != nil {
return err
}
wg := sync.WaitGroup{}
ctx := context.Background()
for collector, f := range availableCollectors {
if resourceFilter.Match(collector) {
wg.Add(1)
go func(f func(ctx context.Context, acc telegraf.Accumulator, k *KubernetesInventory)) {
defer wg.Done()
f(ctx, acc, ki)
}(f)
}
}
wg.Wait()
return nil
}
var availableCollectors = map[string]func(ctx context.Context, acc telegraf.Accumulator, ki *KubernetesInventory){
"daemonsets": collectDaemonSets,
"deployments": collectDeployments,
"nodes": collectNodes,
"persistentvolumes": collectPersistentVolumes,
"persistentvolumeclaims": collectPersistentVolumeClaims,
"pods": collectPods,
"statefulsets": collectStatefulSets,
}
func (ki *KubernetesInventory) initClient() (*client, error) {
if ki.BearerToken != "" {
token, err := ioutil.ReadFile(ki.BearerToken)
if err != nil {
return nil, err
}
ki.BearerTokenString = strings.TrimSpace(string(token))
}
return newClient(ki.URL, ki.Namespace, ki.BearerTokenString, ki.ResponseTimeout.Duration, ki.ClientConfig)
}
// atoi parses s as a base-10 integer, returning 0 if it cannot be parsed.
func atoi(s string) int64 {
i, err := strconv.ParseInt(s, 10, 64)
if err != nil {
return 0
}
return int64(i)
}
// convertQuantity parses a Kubernetes quantity string (e.g. "100m", "125817904Ki")
// and returns its value multiplied by m (pass 1 for no scaling); it returns 0 on parse errors.
func convertQuantity(s string, m float64) int64 {
q, err := resource.ParseQuantity(s)
if err != nil {
log.Printf("E! Failed to parse quantity - %v", err)
return 0
}
f, err := strconv.ParseFloat(fmt.Sprint(q.AsDec()), 64)
if err != nil {
log.Printf("E! Failed to parse float - %v", err)
return 0
}
if m < 1 {
m = 1
}
return int64(f * m)
}
var (
daemonSetMeasurement = "kubernetes_daemonset"
deploymentMeasurement = "kubernetes_deployment"
nodeMeasurement = "kubernetes_node"
persistentVolumeMeasurement = "kubernetes_persistentvolume"
persistentVolumeClaimMeasurement = "kubernetes_persistentvolumeclaim"
podContainerMeasurement = "kubernetes_pod_container"
statefulSetMeasurement = "kubernetes_statefulset"
)
func init() {
inputs.Add("kube_inventory", func() telegraf.Input {
return &KubernetesInventory{
ResponseTimeout: internal.Duration{Duration: time.Second * 5},
Namespace: "default",
}
})
}

View File

@ -0,0 +1,56 @@
package kube_inventory
import (
"context"
"github.com/ericchiang/k8s/apis/core/v1"
"github.com/influxdata/telegraf"
)
func collectNodes(ctx context.Context, acc telegraf.Accumulator, ki *KubernetesInventory) {
list, err := ki.client.getNodes(ctx)
if err != nil {
acc.AddError(err)
return
}
for _, n := range list.Items {
if err = ki.gatherNode(*n, acc); err != nil {
acc.AddError(err)
return
}
}
}
func (ki *KubernetesInventory) gatherNode(n v1.Node, acc telegraf.Accumulator) error {
fields := map[string]interface{}{}
tags := map[string]string{
"node_name": *n.Metadata.Name,
}
for resourceName, val := range n.Status.Capacity {
switch resourceName {
case "cpu":
fields["capacity_cpu_cores"] = atoi(val.GetString_())
case "memory":
fields["capacity_memory_bytes"] = convertQuantity(val.GetString_(), 1)
case "pods":
fields["capacity_pods"] = atoi(val.GetString_())
}
}
for resourceName, val := range n.Status.Allocatable {
switch resourceName {
case "cpu":
fields["allocatable_cpu_cores"] = atoi(val.GetString_())
case "memory":
fields["allocatable_memory_bytes"] = convertQuantity(val.GetString_(), 1)
case "pods":
fields["allocatable_pods"] = atoi(val.GetString_())
}
}
acc.AddFields(nodeMeasurement, fields, tags)
return nil
}

View File

@ -0,0 +1,172 @@
package kube_inventory
import (
"testing"
"time"
"github.com/ericchiang/k8s/apis/core/v1"
metav1 "github.com/ericchiang/k8s/apis/meta/v1"
"github.com/ericchiang/k8s/apis/resource"
"github.com/influxdata/telegraf/testutil"
)
func TestNode(t *testing.T) {
cli := &client{}
now := time.Now()
created := time.Date(now.Year(), now.Month(), now.Day(), now.Hour()-2, 1, 36, 0, now.Location())
tests := []struct {
name string
handler *mockHandler
output *testutil.Accumulator
hasError bool
}{
{
name: "no nodes",
handler: &mockHandler{
responseMap: map[string]interface{}{
"/nodes/": &v1.NodeList{},
},
},
hasError: false,
},
{
name: "collect nodes",
handler: &mockHandler{
responseMap: map[string]interface{}{
"/nodes/": &v1.NodeList{
Items: []*v1.Node{
{
Status: &v1.NodeStatus{
NodeInfo: &v1.NodeSystemInfo{
KernelVersion: toStrPtr("4.14.48-coreos-r2"),
OsImage: toStrPtr("Container Linux by CoreOS 1745.7.0 (Rhyolite)"),
ContainerRuntimeVersion: toStrPtr("docker://18.3.1"),
KubeletVersion: toStrPtr("v1.10.3"),
KubeProxyVersion: toStrPtr("v1.10.3"),
},
Phase: toStrPtr("Running"),
Capacity: map[string]*resource.Quantity{
"cpu": {String_: toStrPtr("16")},
"ephemeral_storage_bytes": {String_: toStrPtr("49536401408")},
"hugepages_1Gi_bytes": {String_: toStrPtr("0")},
"hugepages_2Mi_bytes": {String_: toStrPtr("0")},
"memory": {String_: toStrPtr("125817904Ki")},
"pods": {String_: toStrPtr("110")},
},
Allocatable: map[string]*resource.Quantity{
"cpu": {String_: toStrPtr("16")},
"ephemeral_storage_bytes": {String_: toStrPtr("44582761194")},
"hugepages_1Gi_bytes": {String_: toStrPtr("0")},
"hugepages_2Mi_bytes": {String_: toStrPtr("0")},
"memory": {String_: toStrPtr("125715504Ki")},
"pods": {String_: toStrPtr("110")},
},
Conditions: []*v1.NodeCondition{
{Type: toStrPtr("Ready"), Status: toStrPtr("true"), LastTransitionTime: &metav1.Time{Seconds: toInt64Ptr(now.Unix())}},
{Type: toStrPtr("OutOfDisk"), Status: toStrPtr("false"), LastTransitionTime: &metav1.Time{Seconds: toInt64Ptr(created.Unix())}},
},
},
Spec: &v1.NodeSpec{
ProviderID: toStrPtr("aws:///us-east-1c/i-0c00"),
Taints: []*v1.Taint{
{
Key: toStrPtr("k1"),
Value: toStrPtr("v1"),
Effect: toStrPtr("NoExecute"),
},
{
Key: toStrPtr("k2"),
Value: toStrPtr("v2"),
Effect: toStrPtr("NoSchedule"),
},
},
},
Metadata: &metav1.ObjectMeta{
Generation: toInt64Ptr(int64(11232)),
Namespace: toStrPtr("ns1"),
Name: toStrPtr("node1"),
Labels: map[string]string{
"lab1": "v1",
"lab2": "v2",
},
CreationTimestamp: &metav1.Time{Seconds: toInt64Ptr(created.Unix())},
},
},
},
},
},
},
output: &testutil.Accumulator{
Metrics: []*testutil.Metric{
{
Measurement: nodeMeasurement,
Fields: map[string]interface{}{
"capacity_cpu_cores": int64(16),
"capacity_memory_bytes": int64(1.28837533696e+11),
"capacity_pods": int64(110),
"allocatable_cpu_cores": int64(16),
"allocatable_memory_bytes": int64(1.28732676096e+11),
"allocatable_pods": int64(110),
},
Tags: map[string]string{
"node_name": "node1",
},
},
},
},
hasError: false,
},
}
for _, v := range tests {
ks := &KubernetesInventory{
client: cli,
}
acc := new(testutil.Accumulator)
for _, node := range ((v.handler.responseMap["/nodes/"]).(*v1.NodeList)).Items {
err := ks.gatherNode(*node, acc)
if err != nil {
t.Errorf("Failed to gather node - %s", err.Error())
}
}
err := acc.FirstError()
if err == nil && v.hasError {
t.Fatalf("%s failed, should have error", v.name)
} else if err != nil && !v.hasError {
t.Fatalf("%s failed, err: %v", v.name, err)
}
if v.output == nil && len(acc.Metrics) > 0 {
t.Fatalf("%s: collected extra data", v.name)
} else if v.output != nil && len(v.output.Metrics) > 0 {
for i := range v.output.Metrics {
measurement := v.output.Metrics[i].Measurement
var keyTag string
switch measurement {
case nodeMeasurement:
keyTag = "node_name"
}
var j int
for j = range acc.Metrics {
if acc.Metrics[j].Measurement == measurement &&
acc.Metrics[j].Tags[keyTag] == v.output.Metrics[i].Tags[keyTag] {
break
}
}
for k, m := range v.output.Metrics[i].Tags {
if acc.Metrics[j].Tags[k] != m {
t.Fatalf("%s: tag %s metrics unmatch Expected %s, got %s, measurement %s, j %d\n", v.name, k, m, acc.Metrics[j].Tags[k], measurement, j)
}
}
for k, m := range v.output.Metrics[i].Fields {
if acc.Metrics[j].Fields[k] != m {
t.Fatalf("%s: field %s metrics unmatch Expected %v(%T), got %v(%T), measurement %s, j %d\n", v.name, k, m, m, acc.Metrics[j].Fields[k], acc.Metrics[j].Fields[k], measurement, j)
}
}
}
}
}
}

View File

@ -0,0 +1,52 @@
package kube_inventory
import (
"context"
"strings"
"github.com/ericchiang/k8s/apis/core/v1"
"github.com/influxdata/telegraf"
)
func collectPersistentVolumes(ctx context.Context, acc telegraf.Accumulator, ki *KubernetesInventory) {
list, err := ki.client.getPersistentVolumes(ctx)
if err != nil {
acc.AddError(err)
return
}
for _, pv := range list.Items {
if err = ki.gatherPersistentVolume(*pv, acc); err != nil {
acc.AddError(err)
return
}
}
}
func (ki *KubernetesInventory) gatherPersistentVolume(pv v1.PersistentVolume, acc telegraf.Accumulator) error {
phaseType := 5
switch strings.ToLower(pv.Status.GetPhase()) {
case "bound":
phaseType = 0
case "failed":
phaseType = 1
case "pending":
phaseType = 2
case "released":
phaseType = 3
case "available":
phaseType = 4
}
fields := map[string]interface{}{
"phase_type": phaseType,
}
tags := map[string]string{
"pv_name": pv.Metadata.GetName(),
"phase": pv.Status.GetPhase(),
"storageclass": pv.Spec.GetStorageClassName(),
}
acc.AddFields(persistentVolumeMeasurement, fields, tags)
return nil
}

View File

@ -0,0 +1,112 @@
package kube_inventory
import (
"testing"
"time"
"github.com/ericchiang/k8s/apis/core/v1"
metav1 "github.com/ericchiang/k8s/apis/meta/v1"
"github.com/influxdata/telegraf/testutil"
)
func TestPersistentVolume(t *testing.T) {
cli := &client{}
now := time.Now()
now = time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 1, 36, 0, now.Location())
tests := []struct {
name string
handler *mockHandler
output *testutil.Accumulator
hasError bool
}{
{
name: "no pv",
handler: &mockHandler{
responseMap: map[string]interface{}{
"/persistentvolumes/": &v1.PersistentVolumeList{},
},
},
hasError: false,
},
{
name: "collect pvs",
handler: &mockHandler{
responseMap: map[string]interface{}{
"/persistentvolumes/": &v1.PersistentVolumeList{
Items: []*v1.PersistentVolume{
{
Status: &v1.PersistentVolumeStatus{
Phase: toStrPtr("pending"),
},
Spec: &v1.PersistentVolumeSpec{
StorageClassName: toStrPtr("ebs-1"),
},
Metadata: &metav1.ObjectMeta{
Name: toStrPtr("pv1"),
Labels: map[string]string{
"lab1": "v1",
"lab2": "v2",
},
CreationTimestamp: &metav1.Time{Seconds: toInt64Ptr(now.Unix())},
},
},
},
},
},
},
output: &testutil.Accumulator{
Metrics: []*testutil.Metric{
{
Fields: map[string]interface{}{
"phase_type": 2,
},
Tags: map[string]string{
"pv_name": "pv1",
"storageclass": "ebs-1",
"phase": "pending",
},
},
},
},
hasError: false,
},
}
for _, v := range tests {
ks := &KubernetesInventory{
client: cli,
}
acc := new(testutil.Accumulator)
for _, pv := range ((v.handler.responseMap["/persistentvolumes/"]).(*v1.PersistentVolumeList)).Items {
err := ks.gatherPersistentVolume(*pv, acc)
if err != nil {
t.Errorf("Failed to gather pv - %s", err.Error())
}
}
err := acc.FirstError()
if err == nil && v.hasError {
t.Fatalf("%s failed, should have error", v.name)
} else if err != nil && !v.hasError {
t.Fatalf("%s failed, err: %v", v.name, err)
}
if v.output == nil && len(acc.Metrics) > 0 {
t.Fatalf("%s: collected extra data", v.name)
} else if v.output != nil && len(v.output.Metrics) > 0 {
for i := range v.output.Metrics {
for k, m := range v.output.Metrics[i].Tags {
if acc.Metrics[i].Tags[k] != m {
t.Fatalf("%s: tag %s metrics unmatch Expected %s, got %s\n", v.name, k, m, acc.Metrics[i].Tags[k])
}
}
for k, m := range v.output.Metrics[i].Fields {
if acc.Metrics[i].Fields[k] != m {
t.Fatalf("%s: field %s metrics unmatch Expected %v(%T), got %v(%T)\n", v.name, k, m, m, acc.Metrics[i].Fields[k], acc.Metrics[i].Fields[k])
}
}
}
}
}
}

View File

@ -0,0 +1,49 @@
package kube_inventory
import (
"context"
"strings"
"github.com/ericchiang/k8s/apis/core/v1"
"github.com/influxdata/telegraf"
)
func collectPersistentVolumeClaims(ctx context.Context, acc telegraf.Accumulator, ki *KubernetesInventory) {
list, err := ki.client.getPersistentVolumeClaims(ctx)
if err != nil {
acc.AddError(err)
return
}
for _, pvc := range list.Items {
if err = ki.gatherPersistentVolumeClaim(*pvc, acc); err != nil {
acc.AddError(err)
return
}
}
}
func (ki *KubernetesInventory) gatherPersistentVolumeClaim(pvc v1.PersistentVolumeClaim, acc telegraf.Accumulator) error {
phaseType := 3
switch strings.ToLower(pvc.Status.GetPhase()) {
case "bound":
phaseType = 0
case "lost":
phaseType = 1
case "pending":
phaseType = 2
}
fields := map[string]interface{}{
"phase_type": phaseType,
}
tags := map[string]string{
"pvc_name": pvc.Metadata.GetName(),
"namespace": pvc.Metadata.GetNamespace(),
"phase": pvc.Status.GetPhase(),
"storageclass": pvc.Spec.GetStorageClassName(),
}
acc.AddFields(persistentVolumeClaimMeasurement, fields, tags)
return nil
}

View File

@ -0,0 +1,115 @@
package kube_inventory
import (
"testing"
"time"
"github.com/ericchiang/k8s/apis/core/v1"
metav1 "github.com/ericchiang/k8s/apis/meta/v1"
"github.com/influxdata/telegraf/testutil"
)
func TestPersistentVolumeClaim(t *testing.T) {
cli := &client{}
now := time.Now()
now = time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 1, 36, 0, now.Location())
tests := []struct {
name string
handler *mockHandler
output *testutil.Accumulator
hasError bool
}{
{
name: "no pv claims",
handler: &mockHandler{
responseMap: map[string]interface{}{
"/persistentvolumeclaims/": &v1.PersistentVolumeClaimList{},
},
},
hasError: false,
},
{
name: "collect pv claims",
handler: &mockHandler{
responseMap: map[string]interface{}{
"/persistentvolumeclaims/": &v1.PersistentVolumeClaimList{
Items: []*v1.PersistentVolumeClaim{
{
Status: &v1.PersistentVolumeClaimStatus{
Phase: toStrPtr("bound"),
},
Spec: &v1.PersistentVolumeClaimSpec{
VolumeName: toStrPtr("pvc-dc870fd6-1e08-11e8-b226-02aa4bc06eb8"),
StorageClassName: toStrPtr("ebs-1"),
},
Metadata: &metav1.ObjectMeta{
Namespace: toStrPtr("ns1"),
Name: toStrPtr("pc1"),
Labels: map[string]string{
"lab1": "v1",
"lab2": "v2",
},
CreationTimestamp: &metav1.Time{Seconds: toInt64Ptr(now.Unix())},
},
},
},
},
},
},
output: &testutil.Accumulator{
Metrics: []*testutil.Metric{
{
Fields: map[string]interface{}{
"phase_type": 0,
},
Tags: map[string]string{
"pvc_name": "pc1",
"namespace": "ns1",
"storageclass": "ebs-1",
"phase": "bound",
},
},
},
},
hasError: false,
},
}
for _, v := range tests {
ks := &KubernetesInventory{
client: cli,
}
acc := new(testutil.Accumulator)
for _, pvc := range ((v.handler.responseMap["/persistentvolumeclaims/"]).(*v1.PersistentVolumeClaimList)).Items {
err := ks.gatherPersistentVolumeClaim(*pvc, acc)
if err != nil {
t.Errorf("Failed to gather pvc - %s", err.Error())
}
}
err := acc.FirstError()
if err == nil && v.hasError {
t.Fatalf("%s failed, should have error", v.name)
} else if err != nil && !v.hasError {
t.Fatalf("%s failed, err: %v", v.name, err)
}
if v.output == nil && len(acc.Metrics) > 0 {
t.Fatalf("%s: collected extra data", v.name)
} else if v.output != nil && len(v.output.Metrics) > 0 {
for i := range v.output.Metrics {
for k, m := range v.output.Metrics[i].Tags {
if acc.Metrics[i].Tags[k] != m {
t.Fatalf("%s: tag %s metrics unmatch Expected %s, got %s\n", v.name, k, m, acc.Metrics[i].Tags[k])
}
}
for k, m := range v.output.Metrics[i].Fields {
if acc.Metrics[i].Fields[k] != m {
t.Fatalf("%s: field %s metrics unmatch Expected %v(%T), got %v(%T)\n", v.name, k, m, m, acc.Metrics[i].Fields[k], acc.Metrics[i].Fields[k])
}
}
}
}
}
}

View File

@ -0,0 +1,87 @@
package kube_inventory
import (
"context"
"github.com/ericchiang/k8s/apis/core/v1"
"github.com/influxdata/telegraf"
)
func collectPods(ctx context.Context, acc telegraf.Accumulator, ki *KubernetesInventory) {
list, err := ki.client.getPods(ctx)
if err != nil {
acc.AddError(err)
return
}
for _, p := range list.Items {
if err = ki.gatherPod(*p, acc); err != nil {
acc.AddError(err)
return
}
}
}
func (ki *KubernetesInventory) gatherPod(p v1.Pod, acc telegraf.Accumulator) error {
if p.Metadata.CreationTimestamp.GetSeconds() == 0 && p.Metadata.CreationTimestamp.GetNanos() == 0 {
return nil
}
for i, cs := range p.Status.ContainerStatuses {
c := p.Spec.Containers[i]
gatherPodContainer(*p.Spec.NodeName, p, *cs, *c, acc)
}
return nil
}
func gatherPodContainer(nodeName string, p v1.Pod, cs v1.ContainerStatus, c v1.Container, acc telegraf.Accumulator) {
stateCode := 3
state := "unknown"
switch {
case cs.State.Running != nil:
stateCode = 0
state = "running"
case cs.State.Terminated != nil:
stateCode = 1
state = "terminated"
case cs.State.Waiting != nil:
stateCode = 2
state = "waiting"
}
fields := map[string]interface{}{
"restarts_total": cs.GetRestartCount(),
"state_code": stateCode,
"terminated_reason": cs.State.Terminated.GetReason(),
}
tags := map[string]string{
"container_name": *c.Name,
"namespace": *p.Metadata.Namespace,
"node_name": *p.Spec.NodeName,
"pod_name": *p.Metadata.Name,
"state": state,
}
req := c.Resources.Requests
lim := c.Resources.Limits
for resourceName, val := range req {
switch resourceName {
case "cpu":
fields["resource_requests_millicpu_units"] = convertQuantity(val.GetString_(), 1000)
case "memory":
fields["resource_requests_memory_bytes"] = convertQuantity(val.GetString_(), 1)
}
}
for resourceName, val := range lim {
switch resourceName {
case "cpu":
fields["resource_limits_millicpu_units"] = convertQuantity(val.GetString_(), 1000)
case "memory":
fields["resource_limits_memory_bytes"] = convertQuantity(val.GetString_(), 1)
}
}
acc.AddFields(podContainerMeasurement, fields, tags)
}

View File

@ -0,0 +1,199 @@
package kube_inventory
import (
"testing"
"time"
"github.com/ericchiang/k8s/apis/core/v1"
metav1 "github.com/ericchiang/k8s/apis/meta/v1"
"github.com/ericchiang/k8s/apis/resource"
"github.com/influxdata/telegraf/testutil"
)
func TestPod(t *testing.T) {
cli := &client{}
now := time.Now()
started := time.Date(now.Year(), now.Month(), now.Day(), now.Hour()-1, 1, 36, 0, now.Location())
created := time.Date(now.Year(), now.Month(), now.Day(), now.Hour()-2, 1, 36, 0, now.Location())
cond1 := time.Date(now.Year(), 7, 5, 7, 53, 29, 0, now.Location())
cond2 := time.Date(now.Year(), 7, 5, 7, 53, 31, 0, now.Location())
tests := []struct {
name string
handler *mockHandler
output *testutil.Accumulator
hasError bool
}{
{
name: "no pods",
handler: &mockHandler{
responseMap: map[string]interface{}{
"/pods/": &v1.PodList{},
},
},
hasError: false,
},
{
name: "collect pods",
handler: &mockHandler{
responseMap: map[string]interface{}{
"/pods/": &v1.PodList{
Items: []*v1.Pod{
{
Spec: &v1.PodSpec{
NodeName: toStrPtr("node1"),
Containers: []*v1.Container{
{
Name: toStrPtr("forwarder"),
Image: toStrPtr("image1"),
Ports: []*v1.ContainerPort{
{
ContainerPort: toInt32Ptr(8080),
Protocol: toStrPtr("TCP"),
},
},
Resources: &v1.ResourceRequirements{
Limits: map[string]*resource.Quantity{
"cpu": {String_: toStrPtr("100m")},
},
Requests: map[string]*resource.Quantity{
"cpu": {String_: toStrPtr("100m")},
},
},
},
},
Volumes: []*v1.Volume{
{
Name: toStrPtr("vol1"),
VolumeSource: &v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: toStrPtr("pc1"),
ReadOnly: toBoolPtr(true),
},
},
},
{
Name: toStrPtr("vol2"),
},
},
},
Status: &v1.PodStatus{
Phase: toStrPtr("Running"),
HostIP: toStrPtr("180.12.10.18"),
PodIP: toStrPtr("10.244.2.15"),
StartTime: &metav1.Time{Seconds: toInt64Ptr(started.Unix())},
Conditions: []*v1.PodCondition{
{
Type: toStrPtr("Initialized"),
Status: toStrPtr("True"),
LastTransitionTime: &metav1.Time{Seconds: toInt64Ptr(cond1.Unix())},
},
{
Type: toStrPtr("Ready"),
Status: toStrPtr("True"),
LastTransitionTime: &metav1.Time{Seconds: toInt64Ptr(cond2.Unix())},
},
{
Type: toStrPtr("Scheduled"),
Status: toStrPtr("True"),
LastTransitionTime: &metav1.Time{Seconds: toInt64Ptr(cond1.Unix())},
},
},
ContainerStatuses: []*v1.ContainerStatus{
{
Name: toStrPtr("forwarder"),
State: &v1.ContainerState{
Running: &v1.ContainerStateRunning{
StartedAt: &metav1.Time{Seconds: toInt64Ptr(cond2.Unix())},
},
},
Ready: toBoolPtr(true),
RestartCount: toInt32Ptr(3),
Image: toStrPtr("image1"),
ImageID: toStrPtr("image_id1"),
ContainerID: toStrPtr("docker://54abe32d0094479d3d"),
},
},
},
Metadata: &metav1.ObjectMeta{
OwnerReferences: []*metav1.OwnerReference{
{
ApiVersion: toStrPtr("apps/v1"),
Kind: toStrPtr("DaemonSet"),
Name: toStrPtr("forwarder"),
Controller: toBoolPtr(true),
},
},
Generation: toInt64Ptr(11232),
Namespace: toStrPtr("ns1"),
Name: toStrPtr("pod1"),
Labels: map[string]string{
"lab1": "v1",
"lab2": "v2",
},
CreationTimestamp: &metav1.Time{Seconds: toInt64Ptr(created.Unix())},
},
},
},
},
},
},
output: &testutil.Accumulator{
Metrics: []*testutil.Metric{
{
Measurement: podContainerMeasurement,
Fields: map[string]interface{}{
"restarts_total": int32(3),
"state_code": 0,
"resource_requests_millicpu_units": int64(100),
"resource_limits_millicpu_units": int64(100),
},
Tags: map[string]string{
"namespace": "ns1",
"container_name": "forwarder",
"node_name": "node1",
"pod_name": "pod1",
"state": "running",
},
},
},
},
hasError: false,
},
}
for _, v := range tests {
ks := &KubernetesInventory{
client: cli,
}
acc := new(testutil.Accumulator)
for _, pod := range ((v.handler.responseMap["/pods/"]).(*v1.PodList)).Items {
err := ks.gatherPod(*pod, acc)
if err != nil {
t.Errorf("Failed to gather pod - %s", err.Error())
}
}
err := acc.FirstError()
if err == nil && v.hasError {
t.Fatalf("%s failed, should have error", v.name)
} else if err != nil && !v.hasError {
t.Fatalf("%s failed, err: %v", v.name, err)
}
if v.output == nil && len(acc.Metrics) > 0 {
t.Fatalf("%s: collected extra data", v.name)
} else if v.output != nil && len(v.output.Metrics) > 0 {
for i := range v.output.Metrics {
for k, m := range v.output.Metrics[i].Tags {
if acc.Metrics[i].Tags[k] != m {
t.Fatalf("%s: tag %s metrics unmatch Expected %s, got %s, i %d\n", v.name, k, m, acc.Metrics[i].Tags[k], i)
}
}
for k, m := range v.output.Metrics[i].Fields {
if acc.Metrics[i].Fields[k] != m {
t.Fatalf("%s: field %s metrics unmatch Expected %v(%T), got %v(%T), i %d\n", v.name, k, m, m, acc.Metrics[i].Fields[k], acc.Metrics[i].Fields[k], i)
}
}
}
}
}
}

View File

@ -0,0 +1,46 @@
package kube_inventory
import (
"context"
"time"
"github.com/ericchiang/k8s/apis/apps/v1beta1"
"github.com/influxdata/telegraf"
)
func collectStatefulSets(ctx context.Context, acc telegraf.Accumulator, ki *KubernetesInventory) {
list, err := ki.client.getStatefulSets(ctx)
if err != nil {
acc.AddError(err)
return
}
for _, s := range list.Items {
if err = ki.gatherStatefulSet(*s, acc); err != nil {
acc.AddError(err)
return
}
}
}
func (ki *KubernetesInventory) gatherStatefulSet(s v1beta1.StatefulSet, acc telegraf.Accumulator) error {
status := s.Status
fields := map[string]interface{}{
"created": time.Unix(s.Metadata.CreationTimestamp.GetSeconds(), int64(s.Metadata.CreationTimestamp.GetNanos())).UnixNano(),
"generation": *s.Metadata.Generation,
"replicas": *status.Replicas,
"replicas_current": *status.CurrentReplicas,
"replicas_ready": *status.ReadyReplicas,
"replicas_updated": *status.UpdatedReplicas,
"spec_replicas": *s.Spec.Replicas,
"observed_generation": *s.Status.ObservedGeneration,
}
tags := map[string]string{
"statefulset_name": *s.Metadata.Name,
"namespace": *s.Metadata.Namespace,
}
acc.AddFields(statefulSetMeasurement, fields, tags)
return nil
}

View File

@ -0,0 +1,123 @@
package kube_inventory
import (
"testing"
"time"
"github.com/ericchiang/k8s/apis/apps/v1beta1"
metav1 "github.com/ericchiang/k8s/apis/meta/v1"
"github.com/influxdata/telegraf/testutil"
)
func TestStatefulSet(t *testing.T) {
cli := &client{}
now := time.Now()
now = time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 1, 36, 0, now.Location())
tests := []struct {
name string
handler *mockHandler
output *testutil.Accumulator
hasError bool
}{
{
name: "no statefulsets",
handler: &mockHandler{
responseMap: map[string]interface{}{
"/statefulsets/": &v1beta1.StatefulSetList{},
},
},
hasError: false,
},
{
name: "collect statefulsets",
handler: &mockHandler{
responseMap: map[string]interface{}{
"/statefulsets/": &v1beta1.StatefulSetList{
Items: []*v1beta1.StatefulSet{
{
Status: &v1beta1.StatefulSetStatus{
Replicas: toInt32Ptr(2),
CurrentReplicas: toInt32Ptr(4),
ReadyReplicas: toInt32Ptr(1),
UpdatedReplicas: toInt32Ptr(3),
ObservedGeneration: toInt64Ptr(119),
},
Spec: &v1beta1.StatefulSetSpec{
Replicas: toInt32Ptr(3),
},
Metadata: &metav1.ObjectMeta{
Generation: toInt64Ptr(332),
Namespace: toStrPtr("ns1"),
Name: toStrPtr("sts1"),
Labels: map[string]string{
"lab1": "v1",
"lab2": "v2",
},
CreationTimestamp: &metav1.Time{Seconds: toInt64Ptr(now.Unix())},
},
},
},
},
},
},
output: &testutil.Accumulator{
Metrics: []*testutil.Metric{
{
Fields: map[string]interface{}{
"generation": int64(332),
"observed_generation": int64(119),
"created": now.UnixNano(),
"spec_replicas": int32(3),
"replicas": int32(2),
"replicas_current": int32(4),
"replicas_ready": int32(1),
"replicas_updated": int32(3),
},
Tags: map[string]string{
"namespace": "ns1",
"statefulset_name": "sts1",
},
},
},
},
hasError: false,
},
}
for _, v := range tests {
ks := &KubernetesInventory{
client: cli,
}
acc := new(testutil.Accumulator)
for _, ss := range ((v.handler.responseMap["/statefulsets/"]).(*v1beta1.StatefulSetList)).Items {
err := ks.gatherStatefulSet(*ss, acc)
if err != nil {
t.Errorf("Failed to gather ss - %s", err.Error())
}
}
err := acc.FirstError()
if err == nil && v.hasError {
t.Fatalf("%s failed, should have error", v.name)
} else if err != nil && !v.hasError {
t.Fatalf("%s failed, err: %v", v.name, err)
}
if v.output == nil && len(acc.Metrics) > 0 {
t.Fatalf("%s: collected extra data", v.name)
} else if v.output != nil && len(v.output.Metrics) > 0 {
for i := range v.output.Metrics {
for k, m := range v.output.Metrics[i].Tags {
if acc.Metrics[i].Tags[k] != m {
t.Fatalf("%s: tag %s metrics unmatch Expected %s, got %s\n", v.name, k, m, acc.Metrics[i].Tags[k])
}
}
for k, m := range v.output.Metrics[i].Fields {
if acc.Metrics[i].Fields[k] != m {
t.Fatalf("%s: field %s metrics unmatch Expected %v(%T), got %v(%T)\n", v.name, k, m, m, acc.Metrics[i].Fields[k], acc.Metrics[i].Fields[k])
}
}
}
}
}
}