package kubernetes import ( "encoding/json" "fmt" "io/ioutil" "net/http" "net/url" "strings" "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/filter" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal/tls" "github.com/influxdata/telegraf/plugins/inputs" ) // Kubernetes represents the config object for the plugin type Kubernetes struct { URL string // Bearer Token authorization file path BearerToken string `toml:"bearer_token"` BearerTokenString string `toml:"bearer_token_string"` LabelInclude []string `toml:"label_include"` LabelExclude []string `toml:"label_exclude"` labelFilter filter.Filter // HTTP Timeout specified as a string - 3s, 1m, 1h ResponseTimeout internal.Duration tls.ClientConfig RoundTripper http.RoundTripper } var sampleConfig = ` ## URL for the kubelet url = "http://127.0.0.1:10255" ## Use bearer token for authorization. ('bearer_token' takes priority) ## If both of these are empty, we'll use the default serviceaccount: ## at: /run/secrets/kubernetes.io/serviceaccount/token # bearer_token = "/path/to/bearer/token" ## OR # bearer_token_string = "abc_123" ## Pod labels to be added as tags. An empty array for both include and ## exclude will include all labels. # label_include = [] # label_exclude = ["*"] ## Set response_timeout (default 5 seconds) # response_timeout = "5s" ## Optional TLS Config # tls_ca = /path/to/cafile # tls_cert = /path/to/certfile # tls_key = /path/to/keyfile ## Use TLS but skip chain & host verification # insecure_skip_verify = false ` const ( summaryEndpoint = `%s/stats/summary` defaultServiceAccountPath = "/run/secrets/kubernetes.io/serviceaccount/token" ) func init() { inputs.Add("kubernetes", func() telegraf.Input { return &Kubernetes{ LabelInclude: []string{}, LabelExclude: []string{"*"}, } }) } //SampleConfig returns a sample config func (k *Kubernetes) SampleConfig() string { return sampleConfig } //Description returns the description of this plugin func (k *Kubernetes) Description() string { return "Read metrics from the kubernetes kubelet api" } func (k *Kubernetes) Init() error { // If neither are provided, use the default service account. if k.BearerToken == "" && k.BearerTokenString == "" { k.BearerToken = defaultServiceAccountPath } if k.BearerToken != "" { token, err := ioutil.ReadFile(k.BearerToken) if err != nil { return err } k.BearerTokenString = strings.TrimSpace(string(token)) } labelFilter, err := filter.NewIncludeExcludeFilter(k.LabelInclude, k.LabelExclude) if err != nil { return err } k.labelFilter = labelFilter return nil } //Gather collects kubernetes metrics from a given URL func (k *Kubernetes) Gather(acc telegraf.Accumulator) error { acc.AddError(k.gatherSummary(k.URL, acc)) return nil } func buildURL(endpoint string, base string) (*url.URL, error) { u := fmt.Sprintf(endpoint, base) addr, err := url.Parse(u) if err != nil { return nil, fmt.Errorf("Unable to parse address '%s': %s", u, err) } return addr, nil } func (k *Kubernetes) gatherSummary(baseURL string, acc telegraf.Accumulator) error { summaryMetrics := &SummaryMetrics{} err := k.LoadJson(fmt.Sprintf("%s/stats/summary", baseURL), summaryMetrics) if err != nil { return err } podInfos, err := k.gatherPodInfo(baseURL) if err != nil { return err } buildSystemContainerMetrics(summaryMetrics, acc) buildNodeMetrics(summaryMetrics, acc) buildPodMetrics(baseURL, summaryMetrics, podInfos, k.labelFilter, acc) return nil } func buildSystemContainerMetrics(summaryMetrics *SummaryMetrics, acc telegraf.Accumulator) { for _, container := range summaryMetrics.Node.SystemContainers { tags := map[string]string{ "node_name": summaryMetrics.Node.NodeName, "container_name": container.Name, } fields := make(map[string]interface{}) fields["cpu_usage_nanocores"] = container.CPU.UsageNanoCores fields["cpu_usage_core_nanoseconds"] = container.CPU.UsageCoreNanoSeconds fields["memory_usage_bytes"] = container.Memory.UsageBytes fields["memory_working_set_bytes"] = container.Memory.WorkingSetBytes fields["memory_rss_bytes"] = container.Memory.RSSBytes fields["memory_page_faults"] = container.Memory.PageFaults fields["memory_major_page_faults"] = container.Memory.MajorPageFaults fields["rootfs_available_bytes"] = container.RootFS.AvailableBytes fields["rootfs_capacity_bytes"] = container.RootFS.CapacityBytes fields["logsfs_available_bytes"] = container.LogsFS.AvailableBytes fields["logsfs_capacity_bytes"] = container.LogsFS.CapacityBytes acc.AddFields("kubernetes_system_container", fields, tags) } } func buildNodeMetrics(summaryMetrics *SummaryMetrics, acc telegraf.Accumulator) { tags := map[string]string{ "node_name": summaryMetrics.Node.NodeName, } fields := make(map[string]interface{}) fields["cpu_usage_nanocores"] = summaryMetrics.Node.CPU.UsageNanoCores fields["cpu_usage_core_nanoseconds"] = summaryMetrics.Node.CPU.UsageCoreNanoSeconds fields["memory_available_bytes"] = summaryMetrics.Node.Memory.AvailableBytes fields["memory_usage_bytes"] = summaryMetrics.Node.Memory.UsageBytes fields["memory_working_set_bytes"] = summaryMetrics.Node.Memory.WorkingSetBytes fields["memory_rss_bytes"] = summaryMetrics.Node.Memory.RSSBytes fields["memory_page_faults"] = summaryMetrics.Node.Memory.PageFaults fields["memory_major_page_faults"] = summaryMetrics.Node.Memory.MajorPageFaults fields["network_rx_bytes"] = summaryMetrics.Node.Network.RXBytes fields["network_rx_errors"] = summaryMetrics.Node.Network.RXErrors fields["network_tx_bytes"] = summaryMetrics.Node.Network.TXBytes fields["network_tx_errors"] = summaryMetrics.Node.Network.TXErrors fields["fs_available_bytes"] = summaryMetrics.Node.FileSystem.AvailableBytes fields["fs_capacity_bytes"] = summaryMetrics.Node.FileSystem.CapacityBytes fields["fs_used_bytes"] = summaryMetrics.Node.FileSystem.UsedBytes fields["runtime_image_fs_available_bytes"] = summaryMetrics.Node.Runtime.ImageFileSystem.AvailableBytes fields["runtime_image_fs_capacity_bytes"] = summaryMetrics.Node.Runtime.ImageFileSystem.CapacityBytes fields["runtime_image_fs_used_bytes"] = summaryMetrics.Node.Runtime.ImageFileSystem.UsedBytes acc.AddFields("kubernetes_node", fields, tags) } func (k *Kubernetes) gatherPodInfo(baseURL string) ([]Metadata, error) { var podApi Pods err := k.LoadJson(fmt.Sprintf("%s/pods", baseURL), &podApi) if err != nil { return nil, err } var podInfos []Metadata for _, podMetadata := range podApi.Items { podInfos = append(podInfos, podMetadata.Metadata) } return podInfos, nil } func (k *Kubernetes) LoadJson(url string, v interface{}) error { var req, err = http.NewRequest("GET", url, nil) var resp *http.Response tlsCfg, err := k.ClientConfig.TLSConfig() if err != nil { return err } if k.RoundTripper == nil { if k.ResponseTimeout.Duration < time.Second { k.ResponseTimeout.Duration = time.Second * 5 } k.RoundTripper = &http.Transport{ TLSHandshakeTimeout: 5 * time.Second, TLSClientConfig: tlsCfg, ResponseHeaderTimeout: k.ResponseTimeout.Duration, } } req.Header.Set("Authorization", "Bearer "+k.BearerTokenString) req.Header.Add("Accept", "application/json") resp, err = k.RoundTripper.RoundTrip(req) if err != nil { return fmt.Errorf("error making HTTP request to %s: %s", url, err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { return fmt.Errorf("%s returned HTTP status %s", url, resp.Status) } err = json.NewDecoder(resp.Body).Decode(v) if err != nil { return fmt.Errorf(`Error parsing response: %s`, err) } return nil } func buildPodMetrics(baseURL string, summaryMetrics *SummaryMetrics, podInfo []Metadata, labelFilter filter.Filter, acc telegraf.Accumulator) { for _, pod := range summaryMetrics.Pods { for _, container := range pod.Containers { tags := map[string]string{ "node_name": summaryMetrics.Node.NodeName, "namespace": pod.PodRef.Namespace, "container_name": container.Name, "pod_name": pod.PodRef.Name, } for _, info := range podInfo { if info.Name == pod.PodRef.Name && info.Namespace == pod.PodRef.Namespace { for k, v := range info.Labels { if labelFilter.Match(k) { tags[k] = v } } } } fields := make(map[string]interface{}) fields["cpu_usage_nanocores"] = container.CPU.UsageNanoCores fields["cpu_usage_core_nanoseconds"] = container.CPU.UsageCoreNanoSeconds fields["memory_usage_bytes"] = container.Memory.UsageBytes fields["memory_working_set_bytes"] = container.Memory.WorkingSetBytes fields["memory_rss_bytes"] = container.Memory.RSSBytes fields["memory_page_faults"] = container.Memory.PageFaults fields["memory_major_page_faults"] = container.Memory.MajorPageFaults fields["rootfs_available_bytes"] = container.RootFS.AvailableBytes fields["rootfs_capacity_bytes"] = container.RootFS.CapacityBytes fields["rootfs_used_bytes"] = container.RootFS.UsedBytes fields["logsfs_available_bytes"] = container.LogsFS.AvailableBytes fields["logsfs_capacity_bytes"] = container.LogsFS.CapacityBytes fields["logsfs_used_bytes"] = container.LogsFS.UsedBytes acc.AddFields("kubernetes_pod_container", fields, tags) } for _, volume := range pod.Volumes { tags := map[string]string{ "node_name": summaryMetrics.Node.NodeName, "pod_name": pod.PodRef.Name, "namespace": pod.PodRef.Namespace, "volume_name": volume.Name, } fields := make(map[string]interface{}) fields["available_bytes"] = volume.AvailableBytes fields["capacity_bytes"] = volume.CapacityBytes fields["used_bytes"] = volume.UsedBytes acc.AddFields("kubernetes_pod_volume", fields, tags) } tags := map[string]string{ "node_name": summaryMetrics.Node.NodeName, "pod_name": pod.PodRef.Name, "namespace": pod.PodRef.Namespace, } fields := make(map[string]interface{}) fields["rx_bytes"] = pod.Network.RXBytes fields["rx_errors"] = pod.Network.RXErrors fields["tx_bytes"] = pod.Network.TXBytes fields["tx_errors"] = pod.Network.TXErrors acc.AddFields("kubernetes_pod_network", fields, tags) } }