243 lines
		
	
	
		
			8.2 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			243 lines
		
	
	
		
			8.2 KiB
		
	
	
	
		
			Go
		
	
	
	
| package kubernetes
 | |
| 
 | |
| import (
 | |
| 	"encoding/json"
 | |
| 	"fmt"
 | |
| 	"io/ioutil"
 | |
| 	"net/http"
 | |
| 	"net/url"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/influxdata/telegraf"
 | |
| 	"github.com/influxdata/telegraf/internal"
 | |
| 	"github.com/influxdata/telegraf/internal/errchan"
 | |
| 	"github.com/influxdata/telegraf/plugins/inputs"
 | |
| )
 | |
| 
 | |
| // Kubernetes represents the config object for the plugin
 | |
| type Kubernetes struct {
 | |
| 	URL string
 | |
| 
 | |
| 	// Bearer Token authorization file path
 | |
| 	BearerToken string `toml:"bearer_token"`
 | |
| 
 | |
| 	// Path to CA file
 | |
| 	SSLCA string `toml:"ssl_ca"`
 | |
| 	// Path to host cert file
 | |
| 	SSLCert string `toml:"ssl_cert"`
 | |
| 	// Path to cert key file
 | |
| 	SSLKey string `toml:"ssl_key"`
 | |
| 	// Use SSL but skip chain & host verification
 | |
| 	InsecureSkipVerify bool
 | |
| 
 | |
| 	RoundTripper http.RoundTripper
 | |
| }
 | |
| 
 | |
| var sampleConfig = `
 | |
|   ## URL for the kubelet
 | |
|   url = "http://1.1.1.1:10255"
 | |
| 
 | |
|   ## Use bearer token for authorization
 | |
|   # bearer_token = /path/to/bearer/token
 | |
| 
 | |
|   ## Optional SSL Config
 | |
|   # ssl_ca = /path/to/cafile
 | |
|   # ssl_cert = /path/to/certfile
 | |
|   # ssl_key = /path/to/keyfile
 | |
|   ## Use SSL but skip chain & host verification
 | |
|   # insecure_skip_verify = false
 | |
| `
 | |
| 
 | |
| const (
 | |
| 	summaryEndpoint = `%s/stats/summary`
 | |
| )
 | |
| 
 | |
| func init() {
 | |
| 	inputs.Add("kubernetes", func() telegraf.Input {
 | |
| 		return &Kubernetes{}
 | |
| 	})
 | |
| }
 | |
| 
 | |
| //SampleConfig returns a sample config
 | |
| func (k *Kubernetes) SampleConfig() string {
 | |
| 	return sampleConfig
 | |
| }
 | |
| 
 | |
| //Description returns the description of this plugin
 | |
| func (k *Kubernetes) Description() string {
 | |
| 	return "Read metrics from the kubernetes kubelet api"
 | |
| }
 | |
| 
 | |
| //Gather collects kubernetes metrics from a given URL
 | |
| func (k *Kubernetes) Gather(acc telegraf.Accumulator) error {
 | |
| 	var wg sync.WaitGroup
 | |
| 	errChan := errchan.New(1)
 | |
| 	wg.Add(1)
 | |
| 	go func(k *Kubernetes) {
 | |
| 		defer wg.Done()
 | |
| 		errChan.C <- k.gatherSummary(k.URL, acc)
 | |
| 	}(k)
 | |
| 	wg.Wait()
 | |
| 	return errChan.Error()
 | |
| }
 | |
| 
 | |
| func buildURL(endpoint string, base string) (*url.URL, error) {
 | |
| 	u := fmt.Sprintf(endpoint, base)
 | |
| 	addr, err := url.Parse(u)
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("Unable to parse address '%s': %s", u, err)
 | |
| 	}
 | |
| 	return addr, nil
 | |
| }
 | |
| 
 | |
| func (k *Kubernetes) gatherSummary(baseURL string, acc telegraf.Accumulator) error {
 | |
| 	url := fmt.Sprintf("%s/stats/summary", baseURL)
 | |
| 	var req, err = http.NewRequest("GET", url, nil)
 | |
| 	var token []byte
 | |
| 	var resp *http.Response
 | |
| 
 | |
| 	tlsCfg, err := internal.GetTLSConfig(k.SSLCert, k.SSLKey, k.SSLCA, k.InsecureSkipVerify)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	if k.RoundTripper == nil {
 | |
| 		k.RoundTripper = &http.Transport{
 | |
| 			TLSHandshakeTimeout:   5 * time.Second,
 | |
| 			TLSClientConfig:       tlsCfg,
 | |
| 			ResponseHeaderTimeout: time.Duration(3 * time.Second),
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if k.BearerToken != "" {
 | |
| 		token, err = ioutil.ReadFile(k.BearerToken)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		req.Header.Set("Authorization", "Bearer "+string(token))
 | |
| 	}
 | |
| 
 | |
| 	resp, err = k.RoundTripper.RoundTrip(req)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("error making HTTP request to %s: %s", url, err)
 | |
| 	}
 | |
| 	defer resp.Body.Close()
 | |
| 
 | |
| 	if resp.StatusCode != http.StatusOK {
 | |
| 		return fmt.Errorf("%s returned HTTP status %s", url, resp.Status)
 | |
| 	}
 | |
| 
 | |
| 	summaryMetrics := &SummaryMetrics{}
 | |
| 	err = json.NewDecoder(resp.Body).Decode(summaryMetrics)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf(`Error parsing response: %s`, err)
 | |
| 	}
 | |
| 	buildSystemContainerMetrics(summaryMetrics, acc)
 | |
| 	buildNodeMetrics(summaryMetrics, acc)
 | |
| 	buildPodMetrics(summaryMetrics, acc)
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func buildSystemContainerMetrics(summaryMetrics *SummaryMetrics, acc telegraf.Accumulator) {
 | |
| 	for _, container := range summaryMetrics.Node.SystemContainers {
 | |
| 		tags := map[string]string{
 | |
| 			"node_name":      summaryMetrics.Node.NodeName,
 | |
| 			"container_name": container.Name,
 | |
| 		}
 | |
| 		fields := make(map[string]interface{})
 | |
| 		fields["cpu_usage_nanocores"] = container.CPU.UsageNanoCores
 | |
| 		fields["cpu_usage_core_nanoseconds"] = container.CPU.UsageCoreNanoSeconds
 | |
| 		fields["memory_usage_bytes"] = container.Memory.UsageBytes
 | |
| 		fields["memory_working_set_bytes"] = container.Memory.WorkingSetBytes
 | |
| 		fields["memory_rss_bytes"] = container.Memory.RSSBytes
 | |
| 		fields["memory_page_faults"] = container.Memory.PageFaults
 | |
| 		fields["memory_major_page_faults"] = container.Memory.MajorPageFaults
 | |
| 		fields["rootfs_available_bytes"] = container.RootFS.AvailableBytes
 | |
| 		fields["rootfs_capacity_bytes"] = container.RootFS.CapacityBytes
 | |
| 		fields["logsfs_avaialble_bytes"] = container.LogsFS.AvailableBytes
 | |
| 		fields["logsfs_capacity_bytes"] = container.LogsFS.CapacityBytes
 | |
| 		acc.AddFields("kubernetes_system_container", fields, tags)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func buildNodeMetrics(summaryMetrics *SummaryMetrics, acc telegraf.Accumulator) {
 | |
| 	tags := map[string]string{
 | |
| 		"node_name": summaryMetrics.Node.NodeName,
 | |
| 	}
 | |
| 	fields := make(map[string]interface{})
 | |
| 	fields["cpu_usage_nanocores"] = summaryMetrics.Node.CPU.UsageNanoCores
 | |
| 	fields["cpu_usage_core_nanoseconds"] = summaryMetrics.Node.CPU.UsageCoreNanoSeconds
 | |
| 	fields["memory_available_bytes"] = summaryMetrics.Node.Memory.AvailableBytes
 | |
| 	fields["memory_usage_bytes"] = summaryMetrics.Node.Memory.UsageBytes
 | |
| 	fields["memory_working_set_bytes"] = summaryMetrics.Node.Memory.WorkingSetBytes
 | |
| 	fields["memory_rss_bytes"] = summaryMetrics.Node.Memory.RSSBytes
 | |
| 	fields["memory_page_faults"] = summaryMetrics.Node.Memory.PageFaults
 | |
| 	fields["memory_major_page_faults"] = summaryMetrics.Node.Memory.MajorPageFaults
 | |
| 	fields["network_rx_bytes"] = summaryMetrics.Node.Network.RXBytes
 | |
| 	fields["network_rx_errors"] = summaryMetrics.Node.Network.RXErrors
 | |
| 	fields["network_tx_bytes"] = summaryMetrics.Node.Network.TXBytes
 | |
| 	fields["network_tx_errors"] = summaryMetrics.Node.Network.TXErrors
 | |
| 	fields["fs_available_bytes"] = summaryMetrics.Node.FileSystem.AvailableBytes
 | |
| 	fields["fs_capacity_bytes"] = summaryMetrics.Node.FileSystem.CapacityBytes
 | |
| 	fields["fs_used_bytes"] = summaryMetrics.Node.FileSystem.UsedBytes
 | |
| 	fields["runtime_image_fs_available_bytes"] = summaryMetrics.Node.Runtime.ImageFileSystem.AvailableBytes
 | |
| 	fields["runtime_image_fs_capacity_bytes"] = summaryMetrics.Node.Runtime.ImageFileSystem.CapacityBytes
 | |
| 	fields["runtime_image_fs_used_bytes"] = summaryMetrics.Node.Runtime.ImageFileSystem.UsedBytes
 | |
| 	acc.AddFields("kubernetes_node", fields, tags)
 | |
| }
 | |
| 
 | |
| func buildPodMetrics(summaryMetrics *SummaryMetrics, acc telegraf.Accumulator) {
 | |
| 	for _, pod := range summaryMetrics.Pods {
 | |
| 		for _, container := range pod.Containers {
 | |
| 			tags := map[string]string{
 | |
| 				"node_name":      summaryMetrics.Node.NodeName,
 | |
| 				"namespace":      pod.PodRef.Namespace,
 | |
| 				"container_name": container.Name,
 | |
| 				"pod_name":       pod.PodRef.Name,
 | |
| 			}
 | |
| 			fields := make(map[string]interface{})
 | |
| 			fields["cpu_usage_nanocores"] = container.CPU.UsageNanoCores
 | |
| 			fields["cpu_usage_core_nanoseconds"] = container.CPU.UsageCoreNanoSeconds
 | |
| 			fields["memory_usage_bytes"] = container.Memory.UsageBytes
 | |
| 			fields["memory_working_set_bytes"] = container.Memory.WorkingSetBytes
 | |
| 			fields["memory_rss_bytes"] = container.Memory.RSSBytes
 | |
| 			fields["memory_page_faults"] = container.Memory.PageFaults
 | |
| 			fields["memory_major_page_faults"] = container.Memory.MajorPageFaults
 | |
| 			fields["rootfs_available_bytes"] = container.RootFS.AvailableBytes
 | |
| 			fields["rootfs_capacity_bytes"] = container.RootFS.CapacityBytes
 | |
| 			fields["rootfs_used_bytes"] = container.RootFS.UsedBytes
 | |
| 			fields["logsfs_avaialble_bytes"] = container.LogsFS.AvailableBytes
 | |
| 			fields["logsfs_capacity_bytes"] = container.LogsFS.CapacityBytes
 | |
| 			fields["logsfs_used_bytes"] = container.LogsFS.UsedBytes
 | |
| 			acc.AddFields("kubernetes_pod_container", fields, tags)
 | |
| 		}
 | |
| 
 | |
| 		for _, volume := range pod.Volumes {
 | |
| 			tags := map[string]string{
 | |
| 				"node_name":   summaryMetrics.Node.NodeName,
 | |
| 				"pod_name":    pod.PodRef.Name,
 | |
| 				"namespace":   pod.PodRef.Namespace,
 | |
| 				"volume_name": volume.Name,
 | |
| 			}
 | |
| 			fields := make(map[string]interface{})
 | |
| 			fields["available_bytes"] = volume.AvailableBytes
 | |
| 			fields["capacity_bytes"] = volume.CapacityBytes
 | |
| 			fields["used_bytes"] = volume.UsedBytes
 | |
| 			acc.AddFields("kubernetes_pod_volume", fields, tags)
 | |
| 		}
 | |
| 
 | |
| 		tags := map[string]string{
 | |
| 			"node_name": summaryMetrics.Node.NodeName,
 | |
| 			"pod_name":  pod.PodRef.Name,
 | |
| 			"namespace": pod.PodRef.Namespace,
 | |
| 		}
 | |
| 		fields := make(map[string]interface{})
 | |
| 		fields["rx_bytes"] = pod.Network.RXBytes
 | |
| 		fields["rx_errors"] = pod.Network.RXErrors
 | |
| 		fields["tx_bytes"] = pod.Network.TXBytes
 | |
| 		fields["tx_errors"] = pod.Network.TXErrors
 | |
| 		acc.AddFields("kubernetes_pod_network", fields, tags)
 | |
| 	}
 | |
| }
 |