add kube state metrics

This commit is contained in:
Kelvin Wang 2018-07-01 12:17:31 -07:00
parent 90a38bd125
commit 54ac4d70c9
9 changed files with 802 additions and 2 deletions

59
Gopkg.lock generated
View File

@ -317,10 +317,19 @@
[[projects]]
name = "github.com/gogo/protobuf"
packages = ["proto"]
packages = [
"proto",
"sortkeys"
]
revision = "1adfc126b41513cc696b209667c8656ea7aac67c"
version = "v1.0.0"
[[projects]]
branch = "master"
name = "github.com/golang/glog"
packages = ["."]
revision = "23def4e6c14b4da8ac2ed8007337bc5eb5007998"
[[projects]]
name = "github.com/golang/protobuf"
packages = [
@ -350,6 +359,12 @@
revision = "3af367b6b30c263d47e8895973edcca9a49cf029"
version = "v0.2.0"
[[projects]]
branch = "master"
name = "github.com/google/gofuzz"
packages = ["."]
revision = "24818f796faf91cd76ec7bddd72458fbced7a6c1"
[[projects]]
name = "github.com/gorilla/context"
packages = ["."]
@ -925,6 +940,12 @@
revision = "7f5bdfd858bb064d80559b2a32b86669c5de5d3b"
version = "v3.0.5"
[[projects]]
name = "gopkg.in/inf.v0"
packages = ["."]
revision = "d2d2541c53f18d2a059457998ce2876cc8e67cbf"
version = "v0.9.1"
[[projects]]
name = "gopkg.in/ldap.v2"
packages = ["."]
@ -965,9 +986,43 @@
revision = "5420a8b6744d3b0345ab293f6fcba19c978f1183"
version = "v2.2.1"
[[projects]]
name = "k8s.io/api"
packages = ["core/v1"]
revision = "af4bc157c3a209798fc897f6d4aaaaeb6c2e0d6a"
version = "kubernetes-1.9.0"
[[projects]]
branch = "release-1.11"
name = "k8s.io/apimachinery"
packages = [
"pkg/api/resource",
"pkg/apis/meta/v1",
"pkg/conversion",
"pkg/conversion/queryparams",
"pkg/fields",
"pkg/labels",
"pkg/runtime",
"pkg/runtime/schema",
"pkg/selection",
"pkg/types",
"pkg/util/errors",
"pkg/util/intstr",
"pkg/util/json",
"pkg/util/net",
"pkg/util/runtime",
"pkg/util/sets",
"pkg/util/validation",
"pkg/util/validation/field",
"pkg/util/wait",
"pkg/watch",
"third_party/forked/golang/reflect"
]
revision = "103fd098999dc9c0c88536f5c9ad2e5da39373ae"
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
inputs-digest = "024194b983d91b9500fe97e0aa0ddb5fe725030cb51ddfb034e386cae1098370"
inputs-digest = "e475e221e1a1bbcd2eced72dfe4c152382581c7588f087d3f36941df8984c8f6"
solver-name = "gps-cdcl"
solver-version = 1

View File

@ -241,3 +241,15 @@
[[override]]
source = "https://github.com/fsnotify/fsnotify/archive/v1.4.7.tar.gz"
name = "gopkg.in/fsnotify.v1"
[[constraint]]
name = "k8s.io/api"
version = "kubernetes-1.11.0"
[[constraint]]
name = "k8s.io/apimachinery"
version = "kubernetes-1.11.0"
[[constraint]]
name = "k8s.io/kubernetes"
version = "v1.11.0"

View File

@ -48,6 +48,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/kafka_consumer"
_ "github.com/influxdata/telegraf/plugins/inputs/kafka_consumer_legacy"
_ "github.com/influxdata/telegraf/plugins/inputs/kapacitor"
_ "github.com/influxdata/telegraf/plugins/inputs/kube_state"
_ "github.com/influxdata/telegraf/plugins/inputs/kubernetes"
_ "github.com/influxdata/telegraf/plugins/inputs/leofs"
_ "github.com/influxdata/telegraf/plugins/inputs/logparser"

View File

@ -0,0 +1,77 @@
### Line Protocol
### PODs
#### kube_pod
namespace =
name =
host_ip =
pod_ip =
node =
created_by_kind =
created_by_name =
owner_kind =
owner_name =
owner_is_controller = "true"
label_1 = ""
label_2 = ""
created = ""
start_time =
completion_time =
owner =
label_* =
created =
status_scheduled_time
#### kube_pod_status_scheduled_time
#### kube_pod_status_phase
#### kube_pod_status_ready
#### kube_pod_status_scheduled
#### kube_pod_container_info
namespace=
pod_name=
container_name=
#### kube_pod_container_status_waiting
#### kube_pod_container_status_waiting_reason
#### kube_pod_container_status_running
#### kube_pod_container_status_terminated
#### kube_pod_container_status_terminated_reason
#### kube_pod_container_status_ready
#### kube_pod_container_status_restarts_total
#### kube_pod_container_resource_requests
#### kube_pod_container_resource_limits
#### kube_pod_container_resource_requests_cpu_cores
#### kube_pod_container_resource_requests_memory_bytes
#### kube_pod_container_resource_limits_cpu_cores
#### kube_pod_container_resource_limits_memory_bytes
#### kube_pod_spec_volumes_persistentvolumeclaims_info
#### kube_pod_spec_volumes_persistentvolumeclaims_readonly

View File

@ -0,0 +1,144 @@
package kube_state
import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"net/http"
"time"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
type client struct {
baseURL string
httpClient *http.Client
bearerToken string
semaphore chan struct{}
}
func newClient(baseURL string, timeout time.Duration, maxConns int, bearerToken string, tlsConfig *tls.Config) *client {
return &client{
baseURL: baseURL,
httpClient: &http.Client{
Transport: &http.Transport{
MaxIdleConns: maxConns,
TLSClientConfig: tlsConfig,
},
Timeout: timeout,
},
bearerToken: bearerToken,
semaphore: make(chan struct{}, maxConns),
}
}
func (c *client) getAPIResourceList(ctx context.Context) (rList *metav1.APIResourceList, err error) {
rList = new(metav1.APIResourceList)
if err = c.doGet(ctx, "", rList); err != nil {
return nil, err
}
if rList.GroupVersion == "" {
return nil, &APIError{
URL: c.baseURL,
StatusCode: http.StatusOK,
Title: "empty group version",
}
}
return rList, nil
}
func (c *client) getNodes(ctx context.Context) (list *v1.NodeList, err error) {
list = new(v1.NodeList)
if err = c.doGet(ctx, "/nodes/", list); err != nil {
return nil, err
}
return list, nil
}
func (c *client) getPods(ctx context.Context) (list *v1.PodList, err error) {
list = new(v1.PodList)
if err = c.doGet(ctx, "/pods/", list); err != nil {
return nil, err
}
return list, nil
}
func (c *client) getConfigMaps(ctx context.Context) (list *v1.ConfigMapList, err error) {
list = new(v1.ConfigMapList)
if err = c.doGet(ctx, "/configmaps/", list); err != nil {
return nil, err
}
return list, nil
}
func (c *client) doGet(ctx context.Context, url string, v interface{}) error {
req, err := createGetRequest(c.baseURL+url, c.bearerToken)
if err != nil {
return err
}
select {
case c.semaphore <- struct{}{}:
break
case <-ctx.Done():
return ctx.Err()
}
resp, err := c.httpClient.Do(req.WithContext(ctx))
if err != nil {
<-c.semaphore
return err
}
defer func() {
resp.Body.Close()
<-c.semaphore
}()
// Clear invalid token if unauthorized
if resp.StatusCode == http.StatusUnauthorized {
c.bearerToken = ""
}
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return &APIError{
URL: url,
StatusCode: resp.StatusCode,
Title: resp.Status,
}
}
if resp.StatusCode == http.StatusNoContent {
return nil
}
return json.NewDecoder(resp.Body).Decode(v)
}
func createGetRequest(url string, token string) (*http.Request, error) {
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
}
if token != "" {
req.Header.Set("Authorization", "Bearer "+token)
}
req.Header.Add("Accept", "application/json")
return req, nil
}
type APIError struct {
URL string
StatusCode int
Title string
Description string
}
func (e APIError) Error() string {
if e.Description != "" {
return fmt.Sprintf("[%s] %s: %s", e.URL, e.Title, e.Description)
}
return fmt.Sprintf("[%s] %s", e.URL, e.Title)
}

View File

@ -0,0 +1,42 @@
package kube_state
import (
"context"
"time"
"github.com/influxdata/telegraf"
"k8s.io/api/core/v1"
)
var configMapMeasurement = "kube_configmap"
func registerConfigMapCollector(ctx context.Context, acc telegraf.Accumulator, ks *KubenetesState) {
list, err := ks.client.getConfigMaps(ctx)
if err != nil {
acc.AddError(err)
return
}
for _, s := range list.Items {
if err = ks.gatherConfigMap(s, acc); err != nil {
acc.AddError(err)
return
}
}
}
func (ks *KubenetesState) gatherConfigMap(s v1.ConfigMap, acc telegraf.Accumulator) error {
var creationTime time.Time
if !s.CreationTimestamp.IsZero() {
creationTime = s.CreationTimestamp.Time
}
fields := map[string]interface{}{
"gauge": 1,
}
tags := map[string]string{
"namespace": s.Namespace,
"configmap": s.Name,
"resource_version": s.ResourceVersion,
}
acc.AddFields(configMapMeasurement, fields, tags, creationTime)
return nil
}

View File

@ -0,0 +1,194 @@
package kube_state
import (
"bytes"
"context"
"crypto/md5"
"fmt"
"log"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/inputs"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// KubenetesState represents the config object for the plugin.
type KubenetesState struct {
URL string
// Bearer Token authorization file path
BearerToken string `toml:"bearer_token"`
// MaxConnections for worker pool tcp connections
MaxConnections int `toml:"max_connections"`
// HTTP Timeout specified as a string - 3s, 1m, 1h
ResponseTimeout internal.Duration `toml:"response_timeout"`
tls.ClientConfig
client *client
rListHash string
filter filter.Filter
lastFilterBuilt int64
ResourceListCheckInterval *internal.Duration `toml:"resouce_list_check_interval"`
ResourceExclude []string `toml:"resource_exclude"`
DisablePodNonGenericResourceMetrics bool `json:"disable_pod_non_generic_resource_metrics"`
DisableNodeNonGenericResourceMetrics bool `json:"disable_node_non_generic_resource_metrics"`
}
var sampleConfig = `
## URL for the kubelet
url = "http://1.1.1.1:10255"
## Use bearer token for authorization
# bearer_token = /path/to/bearer/token
## Set response_timeout (default 5 seconds)
# response_timeout = "5s"
## Optional TLS Config
# tls_ca = /path/to/cafile
# tls_cert = /path/to/certfile
# tls_key = /path/to/keyfile
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
## Woker pool for kube_state_metric plugin only
# empty this field will use default value 30
# max_connections = 30
`
//SampleConfig returns a sample config
func (k *KubenetesState) SampleConfig() string {
return sampleConfig
}
//Description returns the description of this plugin
func (k *KubenetesState) Description() string {
return "Read metrics from the kubernetes kubelet api"
}
//Gather collects kubernetes metrics from a given URL
func (k *KubenetesState) Gather(acc telegraf.Accumulator) (err error) {
var rList *metav1.APIResourceList
if k.client == nil {
if k.client, rList, err = k.initClient(); err != nil {
return err
}
goto buildFilter
}
if k.lastFilterBuilt > 0 && time.Now().Unix()-k.lastFilterBuilt < int64(k.ResourceListCheckInterval.Duration.Seconds()) {
println("! skip to gather")
goto doGather
}
rList, err = k.client.getAPIResourceList(context.Background())
if err != nil {
return err
}
buildFilter:
k.lastFilterBuilt = time.Now().Unix()
if err = k.buildFilter(rList); err != nil {
return err
}
doGather:
for n, f := range availableCollectors {
ctx := context.Background()
if k.filter.Match(n) {
println("!", n)
go f(ctx, acc, k)
}
}
return nil
}
func (k *KubenetesState) buildFilter(rList *metav1.APIResourceList) error {
hash, err := genHash(rList)
if err != nil {
return err
}
if k.rListHash == hash {
return nil
}
k.rListHash = hash
include := make([]string, len(rList.APIResources))
for k, v := range rList.APIResources {
include[k] = v.Name
}
k.filter, err = filter.NewIncludeExcludeFilter(include, k.ResourceExclude)
return err
}
func genHash(rList *metav1.APIResourceList) (string, error) {
buf := new(bytes.Buffer)
for _, v := range rList.APIResources {
if _, err := buf.WriteString(v.Name + "|"); err != nil {
return "", err
}
}
sum := md5.Sum(buf.Bytes())
return string(sum[:]), nil
}
var availableCollectors = map[string]func(ctx context.Context, acc telegraf.Accumulator, k *KubenetesState){
// "cronjobs": RegisterCronJobCollector,
// "daemonsets": RegisterDaemonSetCollector,
// "deployments": RegisterDeploymentCollector,
// "jobs": RegisterJobCollector,
// "limitranges": RegisterLimitRangeCollector,
"nodes": registerNodeCollector,
"pods": registerPodCollector,
// "replicasets": RegisterReplicaSetCollector,
// "replicationcontrollers": RegisterReplicationControllerCollector,
// "resourcequotas": RegisterResourceQuotaCollector,
// "services": RegisterServiceCollector,
// "statefulsets": RegisterStatefulSetCollector,
// "persistentvolumes": RegisterPersistentVolumeCollector,
// "persistentvolumeclaims": RegisterPersistentVolumeClaimCollector,
// "namespaces": RegisterNamespaceCollector,
// "horizontalpodautoscalers": RegisterHorizontalPodAutoScalerCollector,
// "endpoints": RegisterEndpointCollector,
// "secrets": RegisterSecretCollector,
"configmaps": registerConfigMapCollector,
}
func (k *KubenetesState) initClient() (*client, *metav1.APIResourceList, error) {
tlsCfg, err := k.ClientConfig.TLSConfig()
if err != nil {
return nil, nil, fmt.Errorf("error parse kube state metrics config[%s]: %v", k.URL, err)
}
// default 30 concurrent TCP connections
if k.MaxConnections == 0 {
k.MaxConnections = 30
}
// default check resourceList every hour
if k.ResourceListCheckInterval == nil {
k.ResourceListCheckInterval = &internal.Duration{
Duration: time.Hour,
}
}
c := newClient(k.URL, k.ResponseTimeout.Duration, k.MaxConnections, k.BearerToken, tlsCfg)
rList, err := c.getAPIResourceList(context.Background())
if err != nil {
return nil, nil, fmt.Errorf("error connect to kubenetes api endpoint[%s]: %v", k.URL, err)
}
log.Printf("I! Kubenetes API group version is %s", rList.GroupVersion)
return c, rList, nil
}
func init() {
inputs.Add("kubernetes_state", func() telegraf.Input {
return &KubenetesState{}
})
}

View File

@ -0,0 +1,73 @@
package kube_state
import (
"context"
"strconv"
"github.com/influxdata/telegraf"
"k8s.io/api/core/v1"
)
var (
nodeMeasurement = "kube_node"
nodeTaintMeasurement = "kube_node_spec_taint"
)
func registerNodeCollector(ctx context.Context, acc telegraf.Accumulator, ks *KubenetesState) {
list, err := ks.client.getNodes(ctx)
if err != nil {
acc.AddError(err)
return
}
for _, n := range list.Items {
if err = ks.gatherNode(n, acc); err != nil {
acc.AddError(err)
return
}
}
}
func (ks *KubenetesState) gatherNode(n v1.Node, acc telegraf.Accumulator) error {
fields := map[string]interface{}{}
tags := map[string]string{
"node": n.Name,
"kernel_version": n.Status.NodeInfo.KernelVersion,
"os_image": n.Status.NodeInfo.OSImage,
"container_runtime_version": n.Status.NodeInfo.ContainerRuntimeVersion,
"kubelet_version": n.Status.NodeInfo.KubeletVersion,
"kubeproxy_version": n.Status.NodeInfo.KubeProxyVersion,
"provider_id": n.Spec.ProviderID,
"spec_unschedulable": strconv.FormatBool(n.Spec.Unschedulable)
}
if !n.CreationTimestamp.IsZero() {
fields["created"] = n.CreationTimestamp.Unix()
}
for k, v := range n.Labels {
tags["label_"+sanitizeLabelName(k)] = v
}
// Collect node taints
for _, taint := range n.Spec.Taints {
go gatherNodeTaint(n, taint, acc)
}
acc.AddFields(nodeMeasurement, fields, tags)
return nil
}
func gatherNodeTaint(n v1.Node, taint v1.Taint,acc telegraf.Accumulator){
fields := map[string]interface{}{
"gauge":1,
}
tags := map[string]string{
"node": n.Name,
"key": taint.Key,
"value": taint.Value,
"effect":string(taint.Effect),
}
acc.AddFields(nodeTaintMeasurement, fields, tags)
}

View File

@ -0,0 +1,202 @@
package kube_state
import (
"context"
"regexp"
"strconv"
"github.com/influxdata/telegraf"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/util/node"
)
var (
podMeasurement = "kube_pod"
podContainerMeasurement = "kube_pod_container"
podVolumeMeasurement = "kube_pod_spec_volumes"
)
func registerPodCollector(ctx context.Context, acc telegraf.Accumulator, ks *KubenetesState) {
list, err := ks.client.getPods(ctx)
if err != nil {
acc.AddError(err)
return
}
for _, p := range list.Items {
if err = ks.gatherPod(p, acc); err != nil {
acc.AddError(err)
return
}
}
}
func (ks *KubenetesState) gatherPod(p v1.Pod, acc telegraf.Accumulator) error {
nodeName := p.Spec.NodeName
fields := make(map[string]interface{})
tags := make(map[string]string)
createdBy := metav1.GetControllerOf(&p)
createdByKind := ""
createdByName := ""
if createdBy != nil {
if createdBy.Kind != "" {
createdByKind = createdBy.Kind
}
if createdBy.Name != "" {
createdByName = createdBy.Name
}
}
if p.Status.StartTime != nil {
fields["start_time"] = p.Status.StartTime.UnixNano()
}
tags["namesapce"] = p.Namespace
tags["name"] = p.Name
tags["host_ip"] = p.Status.HostIP
tags["pod_ip"] = p.Status.PodIP
tags["node"] = nodeName
tags["created_by_kind"] = createdByKind
tags["created_by_name"] = createdByName
tags["status_scheduled"] = "false"
tags["status_ready"] = "false"
owners := p.GetOwnerReferences()
if len(owners) == 0 {
tags["owner_kind"] = ""
tags["owner_name"] = ""
tags["owner_is_controller"] = ""
} else {
tags["owner_kind"] = owners[0].Kind
tags["owner_name"] = owners[0].Name
if owners[0].Controller != nil {
tags["owner_is_controller"] = strconv.FormatBool(*owners[0].Controller)
} else {
tags["owner_is_controller"] = "false"
}
}
for k, v := range p.Labels {
tags["label_"+sanitizeLabelName(k)] = v
}
if phase := p.Status.Phase; phase != "" {
tags["status_phase"] = string(phase)
// This logic is directly copied from: https://github.com/kubernetes/kubernetes/blob/d39bfa0d138368bbe72b0eaf434501dcb4ec9908/pkg/printers/internalversion/printers.go#L597-L601
// For more info, please go to: https://github.com/kubernetes/kube-state-metrics/issues/410
if p.DeletionTimestamp != nil && p.Status.Reason == node.NodeUnreachablePodReason {
tags["status_phase"] = string(v1.PodUnknown)
}
}
if !p.CreationTimestamp.IsZero() {
fields["created"] = p.CreationTimestamp.Unix()
}
for _, c := range p.Status.Conditions {
switch c.Type {
case v1.PodReady:
tags["status_ready"] = "true"
case v1.PodScheduled:
tags["status_scheduled"] = "true"
fields["status_scheduled_time"] = c.LastTransitionTime.Unix()
}
}
var lastFinishTime int64
for i, cs := range p.Status.ContainerStatuses {
c := p.Spec.Containers[i]
gatherPodContainer(nodeName, p, cs, c, &lastFinishTime, acc)
}
if lastFinishTime > 0 {
fields["completion_time"] = lastFinishTime
}
for _, v := range p.Spec.Volumes {
if v.PersistentVolumeClaim != nil {
gatherPodVolume(v, p, acc)
}
}
acc.AddFields(podMeasurement, fields, tags)
return nil
}
func gatherPodVolume(v v1.Volume, p v1.Pod, acc telegraf.Accumulator) {
fields := map[string]interface{}{
"read_only": 0.0,
}
tags := map[string]string{
"namespace": p.Namespace,
"pod": p.Name,
"volume": v.Name,
"persistentvolumeclaim": v.PersistentVolumeClaim.ClaimName,
}
if v.PersistentVolumeClaim.ReadOnly {
fields["read_only"] = 1.0
}
acc.AddFields(podVolumeMeasurement, fields, tags)
}
func gatherPodContainer(nodeName string, p v1.Pod, cs v1.ContainerStatus, c v1.Container, lastFinishTime *int64, acc telegraf.Accumulator) {
fields := map[string]interface{}{
"status_restarts_total": cs.RestartCount,
}
tags := map[string]string{
"namespace": p.Namespace,
"pod_name": p.Name,
"node_name": nodeName,
"container": c.Name,
"image": cs.Image,
"image_id": cs.ImageID,
"container_id": cs.ContainerID,
"status_waiting": strconv.FormatBool(cs.State.Waiting != nil),
"status_waiting_reason": "",
"status_running": strconv.FormatBool(cs.State.Terminated != nil),
"status_terminated": strconv.FormatBool(cs.State.Running != nil),
"status_terminated_reason": "",
"container_status_ready": strconv.FormatBool(cs.Ready),
}
if cs.State.Waiting != nil {
tags["status_waiting_reason"] = cs.State.Waiting.Reason
}
if cs.State.Terminated != nil {
tags["status_terminated_reason"] = cs.State.Terminated.Reason
if *lastFinishTime == 0 || *lastFinishTime < cs.State.Terminated.FinishedAt.Unix() {
*lastFinishTime = cs.State.Terminated.FinishedAt.Unix()
}
}
req := c.Resources.Requests
lim := c.Resources.Limits
for resourceName, val := range req {
switch resourceName {
case v1.ResourceCPU:
fields["resource_requests_cpu_cores"] = val.MilliValue() / 1000
default:
fields["resource_requests_"+sanitizeLabelName(string(resourceName))+"_bytes"] = val.Value()
}
}
for resourceName, val := range lim {
switch resourceName {
case v1.ResourceCPU:
fields["resource_limits_cpu_cores"] = val.MilliValue() / 1000
default:
fields["resource_limits_"+sanitizeLabelName(string(resourceName))+"_bytes"] = val.Value()
}
}
acc.AddFields(podContainerMeasurement, fields, tags)
}
var invalidLabelCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)
func sanitizeLabelName(s string) string {
return invalidLabelCharRE.ReplaceAllString(s, "_")
}