Add ability to collect pod labels to Kubernetes input (#6764)

This commit is contained in:
LinaLinn 2019-12-12 23:14:37 +01:00 committed by Daniel Nelson
parent 4fbba13622
commit f79ba10ab3
4 changed files with 161 additions and 41 deletions

View File

@ -1,6 +1,6 @@
# Kubernetes Input Plugin
This input plugin talks to the kubelet api using the `/stats/summary` endpoint to gather metrics about the running pods and containers for a single host. It is assumed that this plugin is running as part of a `daemonset` within a kubernetes installation. This means that telegraf is running on every node within the cluster. Therefore, you should configure this plugin to talk to its locally running kubelet.
This input plugin talks to the kubelet api using the `/stats/summary` and `/pods` endpoint to gather metrics about the running pods and containers for a single host. It is assumed that this plugin is running as part of a `daemonset` within a kubernetes installation. This means that telegraf is running on every node within the cluster. Therefore, you should configure this plugin to talk to its locally running kubelet.
To find the ip address of the host you are running on you can issue a command like the following:
@ -44,6 +44,11 @@ avoid cardinality issues:
## OR
# bearer_token_string = "abc_123"
# Labels to include and exclude
# An empty array for include and exclude will include all labels
# label_include = []
# label_exclude = ["*"]
## Set response_timeout (default 5 seconds)
# response_timeout = "5s"

View File

@ -3,6 +3,7 @@ package kubernetes
import (
"encoding/json"
"fmt"
"github.com/influxdata/telegraf/filter"
"io/ioutil"
"net/http"
"net/url"
@ -23,6 +24,11 @@ type Kubernetes struct {
BearerToken string `toml:"bearer_token"`
BearerTokenString string `toml:"bearer_token_string"`
LabelInclude []string `toml:"label_include"`
LabelExclude []string `toml:"label_exclude"`
labelFilter filter.Filter
// HTTP Timeout specified as a string - 3s, 1m, 1h
ResponseTimeout internal.Duration
@ -42,6 +48,11 @@ var sampleConfig = `
## OR
# bearer_token_string = "abc_123"
# Labels to include and exclude
# An empty array for include and exclude will include all labels
# label_include = []
# label_exclude = ["*"]
## Set response_timeout (default 5 seconds)
# response_timeout = "5s"
@ -60,7 +71,10 @@ const (
func init() {
inputs.Add("kubernetes", func() telegraf.Input {
return &Kubernetes{}
return &Kubernetes{
LabelInclude: []string{},
LabelExclude: []string{"*"},
}
})
}
@ -75,6 +89,7 @@ func (k *Kubernetes) Description() string {
}
func (k *Kubernetes) Init() error {
// If neither are provided, use the default service account.
if k.BearerToken == "" && k.BearerTokenString == "" {
k.BearerToken = defaultServiceAccountPath
@ -88,6 +103,12 @@ func (k *Kubernetes) Init() error {
k.BearerTokenString = strings.TrimSpace(string(token))
}
labelFilter, err := filter.NewIncludeExcludeFilter(k.LabelInclude, k.LabelExclude)
if err != nil {
return err
}
k.labelFilter = labelFilter
return nil
}
@ -107,48 +128,19 @@ func buildURL(endpoint string, base string) (*url.URL, error) {
}
func (k *Kubernetes) gatherSummary(baseURL string, acc telegraf.Accumulator) error {
url := fmt.Sprintf("%s/stats/summary", baseURL)
var req, err = http.NewRequest("GET", url, nil)
var resp *http.Response
tlsCfg, err := k.ClientConfig.TLSConfig()
summaryMetrics := &SummaryMetrics{}
err := k.LoadJson(fmt.Sprintf("%s/stats/summary", baseURL), summaryMetrics)
if err != nil {
return err
}
if k.RoundTripper == nil {
// Set default values
if k.ResponseTimeout.Duration < time.Second {
k.ResponseTimeout.Duration = time.Second * 5
}
k.RoundTripper = &http.Transport{
TLSHandshakeTimeout: 5 * time.Second,
TLSClientConfig: tlsCfg,
ResponseHeaderTimeout: k.ResponseTimeout.Duration,
}
}
req.Header.Set("Authorization", "Bearer "+k.BearerTokenString)
req.Header.Add("Accept", "application/json")
resp, err = k.RoundTripper.RoundTrip(req)
podInfos, err := k.gatherPodInfo(baseURL)
if err != nil {
return fmt.Errorf("error making HTTP request to %s: %s", url, err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("%s returned HTTP status %s", url, resp.Status)
}
summaryMetrics := &SummaryMetrics{}
err = json.NewDecoder(resp.Body).Decode(summaryMetrics)
if err != nil {
return fmt.Errorf(`Error parsing response: %s`, err)
return err
}
buildSystemContainerMetrics(summaryMetrics, acc)
buildNodeMetrics(summaryMetrics, acc)
buildPodMetrics(summaryMetrics, acc)
buildPodMetrics(baseURL, summaryMetrics, podInfos, k.labelFilter, acc)
return nil
}
@ -200,7 +192,56 @@ func buildNodeMetrics(summaryMetrics *SummaryMetrics, acc telegraf.Accumulator)
acc.AddFields("kubernetes_node", fields, tags)
}
func buildPodMetrics(summaryMetrics *SummaryMetrics, acc telegraf.Accumulator) {
func (k *Kubernetes) gatherPodInfo(baseURL string) ([]Metadata, error) {
var podApi Pods
err := k.LoadJson(fmt.Sprintf("%s/pods", baseURL), &podApi)
if err != nil {
return nil, err
}
var podInfos []Metadata
for _, podMetadata := range podApi.Items {
podInfos = append(podInfos, podMetadata.Metadata)
}
return podInfos, nil
}
func (k *Kubernetes) LoadJson(url string, v interface{}) error {
var req, err = http.NewRequest("GET", url, nil)
var resp *http.Response
tlsCfg, err := k.ClientConfig.TLSConfig()
if err != nil {
return err
}
if k.RoundTripper == nil {
if k.ResponseTimeout.Duration < time.Second {
k.ResponseTimeout.Duration = time.Second * 5
}
k.RoundTripper = &http.Transport{
TLSHandshakeTimeout: 5 * time.Second,
TLSClientConfig: tlsCfg,
ResponseHeaderTimeout: k.ResponseTimeout.Duration,
}
}
req.Header.Set("Authorization", "Bearer "+k.BearerTokenString)
req.Header.Add("Accept", "application/json")
resp, err = k.RoundTripper.RoundTrip(req)
if err != nil {
return fmt.Errorf("error making HTTP request to %s: %s", url, err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("%s returned HTTP status %s", url, resp.Status)
}
err = json.NewDecoder(resp.Body).Decode(v)
if err != nil {
return fmt.Errorf(`Error parsing response: %s`, err)
}
return nil
}
func buildPodMetrics(baseURL string, summaryMetrics *SummaryMetrics, podInfo []Metadata, labelFilter filter.Filter, acc telegraf.Accumulator) {
for _, pod := range summaryMetrics.Pods {
for _, container := range pod.Containers {
tags := map[string]string{
@ -209,6 +250,16 @@ func buildPodMetrics(summaryMetrics *SummaryMetrics, acc telegraf.Accumulator) {
"container_name": container.Name,
"pod_name": pod.PodRef.Name,
}
for _, info := range podInfo {
if info.Name == pod.PodRef.Name && info.Namespace == pod.PodRef.Namespace {
for k, v := range info.Labels {
if labelFilter.Match(k) {
tags[k] = v
}
}
}
}
fields := make(map[string]interface{})
fields["cpu_usage_nanocores"] = container.CPU.UsageNanoCores
fields["cpu_usage_core_nanoseconds"] = container.CPU.UsageCoreNanoSeconds

View File

@ -0,0 +1,17 @@
package kubernetes
type Pods struct {
Kind string `json:"kind"`
ApiVersion string `json:"apiVersion"`
Items []Item `json:"items"`
}
type Item struct {
Metadata Metadata `json:"metadata"`
}
type Metadata struct {
Name string `json:"name"`
Namespace string `json:"namespace"`
Labels map[string]string `json:"labels"`
}

View File

@ -2,6 +2,7 @@ package kubernetes
import (
"fmt"
"github.com/influxdata/telegraf/filter"
"net/http"
"net/http/httptest"
"testing"
@ -12,13 +13,23 @@ import (
func TestKubernetesStats(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
fmt.Fprintln(w, response)
if r.RequestURI == "/stats/summary" {
w.WriteHeader(http.StatusOK)
fmt.Fprintln(w, responseStatsSummery)
}
if r.RequestURI == "/pods" {
w.WriteHeader(http.StatusOK)
fmt.Fprintln(w, responsePods)
}
}))
defer ts.Close()
labelFilter, _ := filter.NewIncludeExcludeFilter([]string{"app", "superkey"}, nil)
k := &Kubernetes{
URL: ts.URL,
URL: ts.URL,
labelFilter: labelFilter,
}
var acc testutil.Accumulator
@ -89,6 +100,8 @@ func TestKubernetesStats(t *testing.T) {
"container_name": "foocontainer",
"namespace": "foons",
"pod_name": "foopod",
"app": "foo",
"superkey": "foobar",
}
acc.AssertContainsTaggedFields(t, "kubernetes_pod_container", fields, tags)
@ -112,6 +125,8 @@ func TestKubernetesStats(t *testing.T) {
"container_name": "stopped-container",
"namespace": "foons",
"pod_name": "stopped-pod",
"app": "foo-stop",
"superkey": "superfoo",
}
acc.AssertContainsTaggedFields(t, "kubernetes_pod_container", fields, tags)
@ -143,7 +158,39 @@ func TestKubernetesStats(t *testing.T) {
}
var response = `
var responsePods = `
{
"kind": "PodList",
"apiVersion": "v1",
"metadata": {},
"items": [
{
"metadata": {
"name": "foopod",
"namespace": "foons",
"labels": {
"superkey": "foobar",
"app": "foo",
"exclude": "exclude0"
}
}
},
{
"metadata": {
"name": "stopped-pod",
"namespace": "foons",
"labels": {
"superkey": "superfoo",
"app": "foo-stop",
"exclude": "exclude1"
}
}
}
]
}
`
var responseStatsSummery = `
{
"node": {
"nodeName": "node1",