diff --git a/Gopkg.lock b/Gopkg.lock index 194bb61e6..9cade79c3 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -317,10 +317,19 @@ [[projects]] name = "github.com/gogo/protobuf" - packages = ["proto"] + packages = [ + "proto", + "sortkeys" + ] revision = "1adfc126b41513cc696b209667c8656ea7aac67c" version = "v1.0.0" +[[projects]] + branch = "master" + name = "github.com/golang/glog" + packages = ["."] + revision = "23def4e6c14b4da8ac2ed8007337bc5eb5007998" + [[projects]] name = "github.com/golang/protobuf" packages = [ @@ -350,6 +359,12 @@ revision = "3af367b6b30c263d47e8895973edcca9a49cf029" version = "v0.2.0" +[[projects]] + branch = "master" + name = "github.com/google/gofuzz" + packages = ["."] + revision = "24818f796faf91cd76ec7bddd72458fbced7a6c1" + [[projects]] name = "github.com/gorilla/context" packages = ["."] @@ -925,6 +940,12 @@ revision = "7f5bdfd858bb064d80559b2a32b86669c5de5d3b" version = "v3.0.5" +[[projects]] + name = "gopkg.in/inf.v0" + packages = ["."] + revision = "d2d2541c53f18d2a059457998ce2876cc8e67cbf" + version = "v0.9.1" + [[projects]] name = "gopkg.in/ldap.v2" packages = ["."] @@ -965,9 +986,43 @@ revision = "5420a8b6744d3b0345ab293f6fcba19c978f1183" version = "v2.2.1" +[[projects]] + name = "k8s.io/api" + packages = ["core/v1"] + revision = "af4bc157c3a209798fc897f6d4aaaaeb6c2e0d6a" + version = "kubernetes-1.9.0" + +[[projects]] + branch = "release-1.11" + name = "k8s.io/apimachinery" + packages = [ + "pkg/api/resource", + "pkg/apis/meta/v1", + "pkg/conversion", + "pkg/conversion/queryparams", + "pkg/fields", + "pkg/labels", + "pkg/runtime", + "pkg/runtime/schema", + "pkg/selection", + "pkg/types", + "pkg/util/errors", + "pkg/util/intstr", + "pkg/util/json", + "pkg/util/net", + "pkg/util/runtime", + "pkg/util/sets", + "pkg/util/validation", + "pkg/util/validation/field", + "pkg/util/wait", + "pkg/watch", + "third_party/forked/golang/reflect" + ] + revision = "103fd098999dc9c0c88536f5c9ad2e5da39373ae" + [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "024194b983d91b9500fe97e0aa0ddb5fe725030cb51ddfb034e386cae1098370" + inputs-digest = "e475e221e1a1bbcd2eced72dfe4c152382581c7588f087d3f36941df8984c8f6" solver-name = "gps-cdcl" solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml index 78d3749a9..0b484968e 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -241,3 +241,15 @@ [[override]] source = "https://github.com/fsnotify/fsnotify/archive/v1.4.7.tar.gz" name = "gopkg.in/fsnotify.v1" + +[[constraint]] + name = "k8s.io/api" + version = "kubernetes-1.11.0" + +[[constraint]] + name = "k8s.io/apimachinery" + version = "kubernetes-1.11.0" + +[[constraint]] + name = "k8s.io/kubernetes" + version = "v1.11.0" diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index b2be2be5a..21eb55868 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -48,6 +48,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/kafka_consumer" _ "github.com/influxdata/telegraf/plugins/inputs/kafka_consumer_legacy" _ "github.com/influxdata/telegraf/plugins/inputs/kapacitor" + _ "github.com/influxdata/telegraf/plugins/inputs/kube_state" _ "github.com/influxdata/telegraf/plugins/inputs/kubernetes" _ "github.com/influxdata/telegraf/plugins/inputs/leofs" _ "github.com/influxdata/telegraf/plugins/inputs/logparser" diff --git a/plugins/inputs/kube_state/README.md b/plugins/inputs/kube_state/README.md new file mode 100644 index 000000000..fbee01e0e --- /dev/null +++ b/plugins/inputs/kube_state/README.md @@ -0,0 +1,77 @@ +### Line Protocol + +### 
PODs + +#### kube_pod + namespace = + name = + host_ip = + pod_ip = + node = + created_by_kind = + created_by_name = + owner_kind = + owner_name = + owner_is_controller = "true" + label_1 = "" + label_2 = "" + created = "" + + + start_time = + completion_time = + owner = + label_* = + created = + + status_scheduled_time + + + + + +#### kube_pod_status_scheduled_time + +#### kube_pod_status_phase + +#### kube_pod_status_ready + +#### kube_pod_status_scheduled + +#### kube_pod_container_info +namespace= +pod_name= +container_name= + + + +#### kube_pod_container_status_waiting + +#### kube_pod_container_status_waiting_reason + +#### kube_pod_container_status_running + +#### kube_pod_container_status_terminated + +#### kube_pod_container_status_terminated_reason + +#### kube_pod_container_status_ready + +#### kube_pod_container_status_restarts_total + +#### kube_pod_container_resource_requests + +#### kube_pod_container_resource_limits + +#### kube_pod_container_resource_requests_cpu_cores + +#### kube_pod_container_resource_requests_memory_bytes + +#### kube_pod_container_resource_limits_cpu_cores + +#### kube_pod_container_resource_limits_memory_bytes + + +#### kube_pod_spec_volumes_persistentvolumeclaims_info + +#### kube_pod_spec_volumes_persistentvolumeclaims_readonly diff --git a/plugins/inputs/kube_state/client.go b/plugins/inputs/kube_state/client.go new file mode 100644 index 000000000..40f81e456 --- /dev/null +++ b/plugins/inputs/kube_state/client.go @@ -0,0 +1,144 @@ +package kube_state + +import ( + "context" + "crypto/tls" + "encoding/json" + "fmt" + "net/http" + "time" + + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +type client struct { + baseURL string + httpClient *http.Client + bearerToken string + semaphore chan struct{} +} + +func newClient(baseURL string, timeout time.Duration, maxConns int, bearerToken string, tlsConfig *tls.Config) *client { + return &client{ + baseURL: baseURL, + httpClient: &http.Client{ + Transport: &http.Transport{ + MaxIdleConns: maxConns, + TLSClientConfig: tlsConfig, + }, + Timeout: timeout, + }, + bearerToken: bearerToken, + semaphore: make(chan struct{}, maxConns), + } +} + +func (c *client) getAPIResourceList(ctx context.Context) (rList *metav1.APIResourceList, err error) { + rList = new(metav1.APIResourceList) + if err = c.doGet(ctx, "", rList); err != nil { + return nil, err + } + if rList.GroupVersion == "" { + return nil, &APIError{ + URL: c.baseURL, + StatusCode: http.StatusOK, + Title: "empty group version", + } + } + return rList, nil +} + +func (c *client) getNodes(ctx context.Context) (list *v1.NodeList, err error) { + list = new(v1.NodeList) + if err = c.doGet(ctx, "/nodes/", list); err != nil { + return nil, err + } + return list, nil +} + +func (c *client) getPods(ctx context.Context) (list *v1.PodList, err error) { + list = new(v1.PodList) + if err = c.doGet(ctx, "/pods/", list); err != nil { + return nil, err + } + return list, nil +} + +func (c *client) getConfigMaps(ctx context.Context) (list *v1.ConfigMapList, err error) { + list = new(v1.ConfigMapList) + if err = c.doGet(ctx, "/configmaps/", list); err != nil { + return nil, err + } + return list, nil +} + +func (c *client) doGet(ctx context.Context, url string, v interface{}) error { + req, err := createGetRequest(c.baseURL+url, c.bearerToken) + if err != nil { + return err + } + select { + case c.semaphore <- struct{}{}: + break + case <-ctx.Done(): + return ctx.Err() + } + + resp, err := c.httpClient.Do(req.WithContext(ctx)) + if err != nil { + 
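+		// the request failed before a response arrived; give back the semaphore slot acquired above before returning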
<-c.semaphore + return err + } + defer func() { + resp.Body.Close() + <-c.semaphore + }() + + // Clear invalid token if unauthorized + if resp.StatusCode == http.StatusUnauthorized { + c.bearerToken = "" + } + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return &APIError{ + URL: url, + StatusCode: resp.StatusCode, + Title: resp.Status, + } + } + + if resp.StatusCode == http.StatusNoContent { + return nil + } + + return json.NewDecoder(resp.Body).Decode(v) +} + +func createGetRequest(url string, token string) (*http.Request, error) { + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return nil, err + } + + if token != "" { + req.Header.Set("Authorization", "Bearer "+token) + } + req.Header.Add("Accept", "application/json") + + return req, nil +} + +type APIError struct { + URL string + StatusCode int + Title string + Description string +} + +func (e APIError) Error() string { + if e.Description != "" { + return fmt.Sprintf("[%s] %s: %s", e.URL, e.Title, e.Description) + } + return fmt.Sprintf("[%s] %s", e.URL, e.Title) +} diff --git a/plugins/inputs/kube_state/configmap.go b/plugins/inputs/kube_state/configmap.go new file mode 100644 index 000000000..9ac22f921 --- /dev/null +++ b/plugins/inputs/kube_state/configmap.go @@ -0,0 +1,42 @@ +package kube_state + +import ( + "context" + "time" + + "github.com/influxdata/telegraf" + "k8s.io/api/core/v1" +) + +var configMapMeasurement = "kube_configmap" + +func registerConfigMapCollector(ctx context.Context, acc telegraf.Accumulator, ks *KubenetesState) { + list, err := ks.client.getConfigMaps(ctx) + if err != nil { + acc.AddError(err) + return + } + for _, s := range list.Items { + if err = ks.gatherConfigMap(s, acc); err != nil { + acc.AddError(err) + return + } + } +} + +func (ks *KubenetesState) gatherConfigMap(s v1.ConfigMap, acc telegraf.Accumulator) error { + var creationTime time.Time + if !s.CreationTimestamp.IsZero() { + creationTime = s.CreationTimestamp.Time + } + fields := map[string]interface{}{ + "gauge": 1, + } + tags := map[string]string{ + "namespace": s.Namespace, + "configmap": s.Name, + "resource_version": s.ResourceVersion, + } + acc.AddFields(configMapMeasurement, fields, tags, creationTime) + return nil +} diff --git a/plugins/inputs/kube_state/kubernetes_state_metrics.go b/plugins/inputs/kube_state/kubernetes_state_metrics.go new file mode 100644 index 000000000..32b009306 --- /dev/null +++ b/plugins/inputs/kube_state/kubernetes_state_metrics.go @@ -0,0 +1,194 @@ +package kube_state + +import ( + "bytes" + "context" + "crypto/md5" + "fmt" + "log" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/filter" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/internal/tls" + "github.com/influxdata/telegraf/plugins/inputs" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// KubenetesState represents the config object for the plugin. 
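+// It polls a Kubernetes API server for object state (nodes, pods, configmaps, ...)
+// and reports it as metrics in the style of kube-state-metrics.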
+type KubenetesState struct {
+	URL string
+
+	// Bearer Token authorization file path
+	BearerToken string `toml:"bearer_token"`
+
+	// MaxConnections for worker pool tcp connections
+	MaxConnections int `toml:"max_connections"`
+
+	// HTTP Timeout specified as a string - 3s, 1m, 1h
+	ResponseTimeout internal.Duration `toml:"response_timeout"`
+
+	tls.ClientConfig
+
+	client          *client
+	rListHash       string
+	filter          filter.Filter
+	lastFilterBuilt int64
+
+	ResourceListCheckInterval *internal.Duration `toml:"resource_list_check_interval"`
+	ResourceExclude           []string           `toml:"resource_exclude"`
+
+	DisablePodNonGenericResourceMetrics  bool `toml:"disable_pod_non_generic_resource_metrics"`
+	DisableNodeNonGenericResourceMetrics bool `toml:"disable_node_non_generic_resource_metrics"`
+}
+
+var sampleConfig = `
+  ## URL for the Kubernetes API server
+  url = "http://1.1.1.1:10255"
+
+  ## Use bearer token for authorization
+  # bearer_token = "/path/to/bearer/token"
+
+  ## Set response_timeout (default 5 seconds)
+  # response_timeout = "5s"
+
+  ## Optional TLS Config
+  # tls_ca = "/path/to/cafile"
+  # tls_cert = "/path/to/certfile"
+  # tls_key = "/path/to/keyfile"
+  ## Use TLS but skip chain & host verification
+  # insecure_skip_verify = false
+
+  ## Worker pool size, used by the kube_state plugin only.
+  ## Leave empty to use the default of 30.
+  # max_connections = 30
+`
+
+// SampleConfig returns a sample config
+func (k *KubenetesState) SampleConfig() string {
+	return sampleConfig
+}
+
+// Description returns the description of this plugin
+func (k *KubenetesState) Description() string {
+	return "Read Kubernetes object state metrics from the Kubernetes API server"
+}
+
+// Gather collects kubernetes metrics from a given URL
+func (k *KubenetesState) Gather(acc telegraf.Accumulator) (err error) {
+	var rList *metav1.APIResourceList
+	if k.client == nil {
+		if k.client, rList, err = k.initClient(); err != nil {
+			return err
+		}
+		goto buildFilter
+	}
+
+	// Reuse the existing resource filter until the check interval has elapsed.
+	if k.lastFilterBuilt > 0 && time.Now().Unix()-k.lastFilterBuilt < int64(k.ResourceListCheckInterval.Duration.Seconds()) {
+		goto doGather
+	}
+
+	rList, err = k.client.getAPIResourceList(context.Background())
+	if err != nil {
+		return err
+	}
+
+buildFilter:
+	k.lastFilterBuilt = time.Now().Unix()
+	if err = k.buildFilter(rList); err != nil {
+		return err
+	}
+
+doGather:
+	for n, f := range availableCollectors {
+		ctx := context.Background()
+		if k.filter.Match(n) {
+			go f(ctx, acc, k)
+		}
+	}
+
+	return nil
+}
+
+func (k *KubenetesState) buildFilter(rList *metav1.APIResourceList) error {
+	hash, err := genHash(rList)
+	if err != nil {
+		return err
+	}
+	if k.rListHash == hash {
+		return nil
+	}
+	k.rListHash = hash
+	include := make([]string, len(rList.APIResources))
+	for i, v := range rList.APIResources {
+		include[i] = v.Name
+	}
+	k.filter, err = filter.NewIncludeExcludeFilter(include, k.ResourceExclude)
+	return err
+}
+
+func genHash(rList *metav1.APIResourceList) (string, error) {
+	buf := new(bytes.Buffer)
+	for _, v := range rList.APIResources {
+		if _, err := buf.WriteString(v.Name + "|"); err != nil {
+			return "", err
+		}
+	}
+	sum := md5.Sum(buf.Bytes())
+	return string(sum[:]), nil
+}
+
+// availableCollectors maps Kubernetes API resource names (as used by the
+// resource_exclude filter) to their collector functions. Commented entries
+// are not implemented yet.
+var availableCollectors = map[string]func(ctx context.Context, acc telegraf.Accumulator, k *KubenetesState){
+	// "cronjobs":                 RegisterCronJobCollector,
+	// "daemonsets":               RegisterDaemonSetCollector,
+	// "deployments":              RegisterDeploymentCollector,
+	// "jobs":                     RegisterJobCollector,
+	// "limitranges":              RegisterLimitRangeCollector,
+	"nodes": registerNodeCollector,
+	"pods":  registerPodCollector,
+	// "replicasets":              RegisterReplicaSetCollector,
+	// "replicationcontrollers":   RegisterReplicationControllerCollector,
+	// "resourcequotas":           RegisterResourceQuotaCollector,
+	// "services":                 RegisterServiceCollector,
+	// "statefulsets":             RegisterStatefulSetCollector,
+	// "persistentvolumes":        RegisterPersistentVolumeCollector,
+	// "persistentvolumeclaims":   RegisterPersistentVolumeClaimCollector,
+	// "namespaces":               RegisterNamespaceCollector,
+	// "horizontalpodautoscalers": RegisterHorizontalPodAutoScalerCollector,
+	// "endpoints":                RegisterEndpointCollector,
+	// "secrets":                  RegisterSecretCollector,
+	"configmaps": registerConfigMapCollector,
+}
+
+func (k *KubenetesState) initClient() (*client, *metav1.APIResourceList, error) {
+	tlsCfg, err := k.ClientConfig.TLSConfig()
+	if err != nil {
+		return nil, nil, fmt.Errorf("error parsing kube_state tls config [%s]: %v", k.URL, err)
+	}
+	// default 30 concurrent TCP connections
+	if k.MaxConnections == 0 {
+		k.MaxConnections = 30
+	}
+
+	// default 5 second response timeout, as documented in the sample config
+	if k.ResponseTimeout.Duration == 0 {
+		k.ResponseTimeout.Duration = 5 * time.Second
+	}
+
+	// default check resourceList every hour
+	if k.ResourceListCheckInterval == nil {
+		k.ResourceListCheckInterval = &internal.Duration{
+			Duration: time.Hour,
+		}
+	}
+	c := newClient(k.URL, k.ResponseTimeout.Duration, k.MaxConnections, k.BearerToken, tlsCfg)
+	rList, err := c.getAPIResourceList(context.Background())
+	if err != nil {
+		return nil, nil, fmt.Errorf("error connecting to kubernetes api endpoint [%s]: %v", k.URL, err)
+	}
+	log.Printf("I! Kubernetes API group version is %s", rList.GroupVersion)
+	return c, rList, nil
+}
+
+func init() {
+	inputs.Add("kube_state", func() telegraf.Input {
+		return &KubenetesState{}
+	})
+}
diff --git a/plugins/inputs/kube_state/node.go b/plugins/inputs/kube_state/node.go
new file mode 100644
index 000000000..852468523
--- /dev/null
+++ b/plugins/inputs/kube_state/node.go
@@ -0,0 +1,73 @@
+package kube_state
+
+import (
+	"context"
+	"strconv"
+
+	"github.com/influxdata/telegraf"
+	"k8s.io/api/core/v1"
+)
+
+var (
+	nodeMeasurement      = "kube_node"
+	nodeTaintMeasurement = "kube_node_spec_taint"
+)
+
+func registerNodeCollector(ctx context.Context, acc telegraf.Accumulator, ks *KubenetesState) {
+	list, err := ks.client.getNodes(ctx)
+	if err != nil {
+		acc.AddError(err)
+		return
+	}
+	for _, n := range list.Items {
+		if err = ks.gatherNode(n, acc); err != nil {
+			acc.AddError(err)
+			return
+		}
+	}
+}
+
+func (ks *KubenetesState) gatherNode(n v1.Node, acc telegraf.Accumulator) error {
+	fields := map[string]interface{}{}
+	tags := map[string]string{
+		"node":                      n.Name,
+		"kernel_version":            n.Status.NodeInfo.KernelVersion,
+		"os_image":                  n.Status.NodeInfo.OSImage,
+		"container_runtime_version": n.Status.NodeInfo.ContainerRuntimeVersion,
+		"kubelet_version":           n.Status.NodeInfo.KubeletVersion,
+		"kubeproxy_version":         n.Status.NodeInfo.KubeProxyVersion,
+		"provider_id":               n.Spec.ProviderID,
+		"spec_unschedulable":        strconv.FormatBool(n.Spec.Unschedulable),
+	}
+
+	if !n.CreationTimestamp.IsZero() {
+		fields["created"] = n.CreationTimestamp.Unix()
+	}
+
+	for k, v := range n.Labels {
+		tags["label_"+sanitizeLabelName(k)] = v
+	}
+
+	// Collect node taints
+	for _, taint := range n.Spec.Taints {
+		go gatherNodeTaint(n, taint, acc)
+	}
+
+	acc.AddFields(nodeMeasurement, fields, tags)
+	return nil
+}
+
+func gatherNodeTaint(n v1.Node, taint v1.Taint, acc telegraf.Accumulator) {
+	fields := map[string]interface{}{
+		"gauge": 1,
+	}
+	tags := map[string]string{
+		"node":   n.Name,
+		"key":    taint.Key,
+		"value":  taint.Value,
+		"effect": string(taint.Effect),
+	}
+
+	acc.AddFields(nodeTaintMeasurement, fields, tags)
+}
diff --git a/plugins/inputs/kube_state/pod.go b/plugins/inputs/kube_state/pod.go
new file mode 100644
index 000000000..e0eb7ed87
--- /dev/null
+++ b/plugins/inputs/kube_state/pod.go
@@ -0,0 +1,202 @@
+package kube_state
+
+import (
+	"context"
+	"regexp"
+	"strconv"
+
+	"github.com/influxdata/telegraf"
+	"k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/kubernetes/pkg/util/node"
+)
+
+var (
+	podMeasurement          = "kube_pod"
+	podContainerMeasurement = "kube_pod_container"
+	podVolumeMeasurement    = "kube_pod_spec_volumes"
+)
+
+func registerPodCollector(ctx context.Context, acc telegraf.Accumulator, ks *KubenetesState) {
+	list, err := ks.client.getPods(ctx)
+	if err != nil {
+		acc.AddError(err)
+		return
+	}
+	for _, p := range list.Items {
+		if err = ks.gatherPod(p, acc); err != nil {
+			acc.AddError(err)
+			return
+		}
+	}
+}
+
+func (ks *KubenetesState) gatherPod(p v1.Pod, acc telegraf.Accumulator) error {
+	nodeName := p.Spec.NodeName
+	fields := make(map[string]interface{})
+	tags := make(map[string]string)
+
+	createdBy := metav1.GetControllerOf(&p)
+	createdByKind := ""
+	createdByName := ""
+	if createdBy != nil {
+		if createdBy.Kind != "" {
+			createdByKind = createdBy.Kind
+		}
+		if createdBy.Name != "" {
+			createdByName = createdBy.Name
+		}
+	}
+
+	if p.Status.StartTime != nil {
+		fields["start_time"] = p.Status.StartTime.UnixNano()
+	}
+
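+	// Pod identity and other string-valued state are stored as tags; timestamps are stored as fields.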
tags["namesapce"] = p.Namespace + tags["name"] = p.Name + tags["host_ip"] = p.Status.HostIP + tags["pod_ip"] = p.Status.PodIP + tags["node"] = nodeName + tags["created_by_kind"] = createdByKind + tags["created_by_name"] = createdByName + tags["status_scheduled"] = "false" + tags["status_ready"] = "false" + + owners := p.GetOwnerReferences() + if len(owners) == 0 { + tags["owner_kind"] = "" + tags["owner_name"] = "" + tags["owner_is_controller"] = "" + } else { + tags["owner_kind"] = owners[0].Kind + tags["owner_name"] = owners[0].Name + if owners[0].Controller != nil { + tags["owner_is_controller"] = strconv.FormatBool(*owners[0].Controller) + } else { + tags["owner_is_controller"] = "false" + } + } + + for k, v := range p.Labels { + tags["label_"+sanitizeLabelName(k)] = v + } + + if phase := p.Status.Phase; phase != "" { + tags["status_phase"] = string(phase) + // This logic is directly copied from: https://github.com/kubernetes/kubernetes/blob/d39bfa0d138368bbe72b0eaf434501dcb4ec9908/pkg/printers/internalversion/printers.go#L597-L601 + // For more info, please go to: https://github.com/kubernetes/kube-state-metrics/issues/410 + if p.DeletionTimestamp != nil && p.Status.Reason == node.NodeUnreachablePodReason { + tags["status_phase"] = string(v1.PodUnknown) + } + } + + if !p.CreationTimestamp.IsZero() { + fields["created"] = p.CreationTimestamp.Unix() + } + + for _, c := range p.Status.Conditions { + switch c.Type { + case v1.PodReady: + tags["status_ready"] = "true" + case v1.PodScheduled: + tags["status_scheduled"] = "true" + fields["status_scheduled_time"] = c.LastTransitionTime.Unix() + } + } + + var lastFinishTime int64 + + for i, cs := range p.Status.ContainerStatuses { + c := p.Spec.Containers[i] + gatherPodContainer(nodeName, p, cs, c, &lastFinishTime, acc) + } + + if lastFinishTime > 0 { + fields["completion_time"] = lastFinishTime + } + + for _, v := range p.Spec.Volumes { + if v.PersistentVolumeClaim != nil { + gatherPodVolume(v, p, acc) + } + } + + acc.AddFields(podMeasurement, fields, tags) + return nil +} + +func gatherPodVolume(v v1.Volume, p v1.Pod, acc telegraf.Accumulator) { + fields := map[string]interface{}{ + "read_only": 0.0, + } + tags := map[string]string{ + "namespace": p.Namespace, + "pod": p.Name, + "volume": v.Name, + "persistentvolumeclaim": v.PersistentVolumeClaim.ClaimName, + } + if v.PersistentVolumeClaim.ReadOnly { + fields["read_only"] = 1.0 + } + acc.AddFields(podVolumeMeasurement, fields, tags) +} + +func gatherPodContainer(nodeName string, p v1.Pod, cs v1.ContainerStatus, c v1.Container, lastFinishTime *int64, acc telegraf.Accumulator) { + + fields := map[string]interface{}{ + "status_restarts_total": cs.RestartCount, + } + tags := map[string]string{ + "namespace": p.Namespace, + "pod_name": p.Name, + "node_name": nodeName, + "container": c.Name, + "image": cs.Image, + "image_id": cs.ImageID, + "container_id": cs.ContainerID, + "status_waiting": strconv.FormatBool(cs.State.Waiting != nil), + "status_waiting_reason": "", + "status_running": strconv.FormatBool(cs.State.Terminated != nil), + "status_terminated": strconv.FormatBool(cs.State.Running != nil), + "status_terminated_reason": "", + "container_status_ready": strconv.FormatBool(cs.Ready), + } + + if cs.State.Waiting != nil { + tags["status_waiting_reason"] = cs.State.Waiting.Reason + } + + if cs.State.Terminated != nil { + tags["status_terminated_reason"] = cs.State.Terminated.Reason + if *lastFinishTime == 0 || *lastFinishTime < cs.State.Terminated.FinishedAt.Unix() { + *lastFinishTime = 
+		}
+	}
+	req := c.Resources.Requests
+	lim := c.Resources.Limits
+
+	for resourceName, val := range req {
+		switch resourceName {
+		case v1.ResourceCPU:
+			// report fractional cores (e.g. 250m -> 0.25) instead of truncating to an integer
+			fields["resource_requests_cpu_cores"] = float64(val.MilliValue()) / 1000
+		default:
+			fields["resource_requests_"+sanitizeLabelName(string(resourceName))+"_bytes"] = val.Value()
+		}
+	}
+	for resourceName, val := range lim {
+		switch resourceName {
+		case v1.ResourceCPU:
+			fields["resource_limits_cpu_cores"] = float64(val.MilliValue()) / 1000
+		default:
+			fields["resource_limits_"+sanitizeLabelName(string(resourceName))+"_bytes"] = val.Value()
+		}
+	}
+
+	acc.AddFields(podContainerMeasurement, fields, tags)
+}
+
+var invalidLabelCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)
+
+func sanitizeLabelName(s string) string {
+	return invalidLabelCharRE.ReplaceAllString(s, "_")
+}