From 7c162b09d413de44ff1bf27aa2dec013beb383bd Mon Sep 17 00:00:00 2001 From: Jonathan Chauncey Date: Tue, 26 Jan 2016 15:16:46 -0700 Subject: [PATCH] feature(kubernetes plugin): Adds support for a kuberentes plugin fixes #569 This commit also refactors the existing docker plugin so that the kubernetes plugin can gather metrics from the docker system. --- Godeps | 14 + internal/docker/docker.go | 301 ++++++++++++++++++ .../inputs => internal}/docker/docker_test.go | 2 +- plugins/inputs/all/all.go | 1 + plugins/inputs/docker/README.md | 6 +- plugins/inputs/docker/docker.go | 284 +---------------- plugins/inputs/kubernetes/README.md | 30 ++ plugins/inputs/kubernetes/kubernetes.go | 59 ++++ 8 files changed, 417 insertions(+), 280 deletions(-) create mode 100644 internal/docker/docker.go rename {plugins/inputs => internal}/docker/docker_test.go (99%) create mode 100644 plugins/inputs/kubernetes/README.md create mode 100644 plugins/inputs/kubernetes/kubernetes.go diff --git a/Godeps b/Godeps index 3393a1cee..c52736509 100644 --- a/Godeps +++ b/Godeps @@ -5,19 +5,27 @@ github.com/amir/raidman 6a8e089bbe32e6b907feae5ba688841974b3c339 github.com/armon/go-metrics 345426c77237ece5dab0e1605c3e4b35c3f54757 github.com/aws/aws-sdk-go 87b1e60a50b09e4812dee560b33a238f67305804 github.com/beorn7/perks b965b613227fddccbfffe13eae360ed3fa822f8d +github.com/blang/semver aea32c919a18e5ef4537bbd283ff29594b1b0165 github.com/boltdb/bolt ee4a0888a9abe7eefe5a0992ca4cb06864839873 github.com/cenkalti/backoff 4dc77674aceaabba2c7e3da25d4c823edfb73f99 github.com/dancannon/gorethink 6f088135ff288deb9d5546f4c71919207f891a70 github.com/davecgh/go-spew 5215b55f46b2b919f50a1df0eaa5886afe4e3b3d +github.com/docker/docker 2b27fe17a1b3fb8472fde96d768fa70996adf201 +github.com/docker/go-units 5d2041e26a699eaca682e2ea41c8f891e1060444 github.com/eapache/go-resiliency b86b1ec0dd4209a588dc1285cdd471e73525c0b3 github.com/eapache/queue ded5959c0d4e360646dc9e9908cff48666781367 +github.com/emicklei/go-restful/swagger b86acf97a74ed7603ac78d012f5535b4d587b156 github.com/fsouza/go-dockerclient 7b651349f9479f5114913eefbfd3c4eeddd79ab4 +github.com/ghodss/yaml 73d445a93680fa1a78ae23a5839bad48f32ba1ee github.com/go-ini/ini afbd495e5aaea13597b5e14fe514ddeaa4d76fc3 github.com/go-sql-driver/mysql 7c7f556282622f94213bc028b4d0a7b6151ba239 github.com/gogo/protobuf e8904f58e872a473a5b91bc9bf3377d223555263 +github.com/golang/glog fca8c8854093a154ff1eb580aae10276ad6b1b5f github.com/golang/protobuf 6aaa8d47701fa6cf07e914ec01fde3d4a1fe79c3 github.com/golang/snappy 723cc1e459b8eea2dea4583200fd60757d40097a github.com/gonuts/go-shellquote e842a11b24c6abfb3dd27af69a17f482e4b483c2 +github.com/google/cadvisor/info/v1 bef8522964928c3de92319c548adbc07fa1ea113 +github.com/google/gofuzz e4af62d086c303f2bed467b227fc0a034b218916 github.com/gorilla/context 1c83b3eabd45b6d76072b66b746c20815fb2872d github.com/gorilla/mux 26a6070f849969ba72b72256e9f14cf519751690 github.com/hailocab/go-hostpool e80d13ce29ede4452c43dea11e79b9bc8a15b478 @@ -28,6 +36,7 @@ github.com/influxdata/config bae7cb98197d842374d3b8403905924094930f24 github.com/influxdata/influxdb 697f48b4e62e514e701ffec39978b864a3c666e6 github.com/influxdb/influxdb 697f48b4e62e514e701ffec39978b864a3c666e6 github.com/jmespath/go-jmespath c01cf91b011868172fdcd9f41838e80c9d716264 +github.com/juju/ratelimit 77ed1c8a01217656d2080ad51981f6e99adaa177 github.com/klauspost/crc32 999f3125931f6557b991b2f8472172bdfa578d38 github.com/lib/pq 8ad2b298cadd691a77015666a5372eae5dbfac8f github.com/matttproud/golang_protobuf_extensions d0c3fe89de86839aecf2e0579c40ba3bb336a453 @@ -35,6 +44,7 @@ github.com/mreiferson/go-snappystream 028eae7ab5c4c9e2d1cb4c4ca1e53259bbe7e504 github.com/naoina/go-stringutil 6b638e95a32d0c1131db0e7fe83775cbea4a0d0b github.com/naoina/toml 751171607256bb66e64c9f0220c00662420c38e9 github.com/nsqio/go-nsq 2118015c120962edc5d03325c680daf3163a8b5f +github.com/opencontainers/runc 072fa6fdccaba49b11ba91ad4265b1ec1043787e github.com/pborman/uuid dee7705ef7b324f27ceb85a121c61f2c2e8ce988 github.com/pmezard/go-difflib 792786c7400a136282c1664665ae0a8db921c6c2 github.com/prometheus/client_golang 67994f177195311c3ea3d4407ed0175e34a4256f @@ -47,6 +57,8 @@ github.com/soniah/gosnmp b1b4f885b12c5dcbd021c5cee1c904110de6db7d github.com/streadway/amqp b4f3ceab0337f013208d31348b578d83c0064744 github.com/stretchr/objx 1a9d0bb9f541897e62256577b352fdbc1fb4fd94 github.com/stretchr/testify f390dcf405f7b83c997eac1b06768bb9f44dec18 +github.com/spf13/pflag 7f60f83a2c81bc3c3c0d5297f61ddfa68da9d3b7 +github.com/ugorji/go/codec 646ae4a518c1c3be0739df898118d9bccf993858 github.com/wvanbergen/kafka 1a8639a45164fcc245d5c7b4bd3ccfbd1a0ffbf3 github.com/wvanbergen/kazoo-go 0f768712ae6f76454f987c3356177e138df258f8 github.com/zensqlmonitor/go-mssqldb ffe5510c6fa5e15e6d983210ab501c815b56b363 @@ -57,3 +69,5 @@ gopkg.in/dancannon/gorethink.v1 6f088135ff288deb9d5546f4c71919207f891a70 gopkg.in/fatih/pool.v2 cba550ebf9bce999a02e963296d4bc7a486cb715 gopkg.in/mgo.v2 03c9f3ee4c14c8e51ee521a6a7d0425658dd6f64 gopkg.in/yaml.v2 f7716cbe52baa25d2e9b0d0da546fcf909fc16b4 +k8s.io/kubernetes/pkg 3254df3a7c9e25b243a3a9b0abdba1888cf52956 +speter.net/go/exp/math/dec/inf 42ca6cd68aa922bc3f32f1e056e61b65945d9ad7 diff --git a/internal/docker/docker.go b/internal/docker/docker.go new file mode 100644 index 000000000..85876ba20 --- /dev/null +++ b/internal/docker/docker.go @@ -0,0 +1,301 @@ +package docker + +import ( + "fmt" + "strings" + "sync" + "time" + + "github.com/influxdata/telegraf" + + api "k8s.io/kubernetes/pkg/api" + kube "k8s.io/kubernetes/pkg/client/unversioned" + + godocker "github.com/fsouza/go-dockerclient" +) + +// CreateClient sets the client value in the Docker struct +func CreateClient(endpoint string) (*godocker.Client, error) { + var c *godocker.Client + var err error + if endpoint == "ENV" { + c, err = godocker.NewClientFromEnv() + } else if endpoint == "" { + c, err = godocker.NewClient("unix:///var/run/docker.sock") + } else { + c, err = godocker.NewClient(endpoint) + } + return c, err +} + +func GatherContainerMetrics(client *godocker.Client, kubeClient *kube.Client, containerNames []string, acc telegraf.Accumulator) error { + opts := godocker.ListContainersOptions{} + containers, err := client.ListContainers(opts) + if err != nil { + return err + } + + var wg sync.WaitGroup + wg.Add(len(containers)) + for _, container := range containers { + go func(c godocker.APIContainers) { + defer wg.Done() + err := gatherContainer(client, kubeClient, c, containerNames, acc) + if err != nil { + fmt.Println(err.Error()) + } + }(container) + } + wg.Wait() + return nil +} + +func gatherContainer( + client *godocker.Client, + kubeClient *kube.Client, + container godocker.APIContainers, + containerNames []string, + acc telegraf.Accumulator, +) error { + // Parse container name + cname := "unknown" + if len(container.Names) > 0 { + // Not sure what to do with other names, just take the first. + cname = strings.TrimPrefix(container.Names[0], "/") + } + + tags := map[string]string{ + "container_id": container.ID, + "container_name": cname, + "container_image": container.Image, + } + + if kubeClient != nil { + tags = gatherKubernetesLabels(kubeClient, container.ID, tags) + } + + if len(containerNames) > 0 { + if !sliceContains(cname, containerNames) { + return nil + } + } + + statChan := make(chan *godocker.Stats) + done := make(chan bool) + statOpts := godocker.StatsOptions{ + Stream: false, + ID: container.ID, + Stats: statChan, + Done: done, + Timeout: time.Duration(time.Second * 5), + } + + go func() { + client.Stats(statOpts) + }() + + stat := <-statChan + close(done) + + // Add labels to tags + for k, v := range container.Labels { + tags[k] = v + } + + gatherContainerStats(stat, acc, tags) + + return nil +} + +func gatherContainerStats( + stat *godocker.Stats, + acc telegraf.Accumulator, + tags map[string]string, +) { + now := stat.Read + + memfields := map[string]interface{}{ + "max_usage": stat.MemoryStats.MaxUsage, + "usage": stat.MemoryStats.Usage, + "fail_count": stat.MemoryStats.Failcnt, + "limit": stat.MemoryStats.Limit, + "total_pgmafault": stat.MemoryStats.Stats.TotalPgmafault, + "cache": stat.MemoryStats.Stats.Cache, + "mapped_file": stat.MemoryStats.Stats.MappedFile, + "total_inactive_file": stat.MemoryStats.Stats.TotalInactiveFile, + "pgpgout": stat.MemoryStats.Stats.Pgpgout, + "rss": stat.MemoryStats.Stats.Rss, + "total_mapped_file": stat.MemoryStats.Stats.TotalMappedFile, + "writeback": stat.MemoryStats.Stats.Writeback, + "unevictable": stat.MemoryStats.Stats.Unevictable, + "pgpgin": stat.MemoryStats.Stats.Pgpgin, + "total_unevictable": stat.MemoryStats.Stats.TotalUnevictable, + "pgmajfault": stat.MemoryStats.Stats.Pgmajfault, + "total_rss": stat.MemoryStats.Stats.TotalRss, + "total_rss_huge": stat.MemoryStats.Stats.TotalRssHuge, + "total_writeback": stat.MemoryStats.Stats.TotalWriteback, + "total_inactive_anon": stat.MemoryStats.Stats.TotalInactiveAnon, + "rss_huge": stat.MemoryStats.Stats.RssHuge, + "hierarchical_memory_limit": stat.MemoryStats.Stats.HierarchicalMemoryLimit, + "total_pgfault": stat.MemoryStats.Stats.TotalPgfault, + "total_active_file": stat.MemoryStats.Stats.TotalActiveFile, + "active_anon": stat.MemoryStats.Stats.ActiveAnon, + "total_active_anon": stat.MemoryStats.Stats.TotalActiveAnon, + "total_pgpgout": stat.MemoryStats.Stats.TotalPgpgout, + "total_cache": stat.MemoryStats.Stats.TotalCache, + "inactive_anon": stat.MemoryStats.Stats.InactiveAnon, + "active_file": stat.MemoryStats.Stats.ActiveFile, + "pgfault": stat.MemoryStats.Stats.Pgfault, + "inactive_file": stat.MemoryStats.Stats.InactiveFile, + "total_pgpgin": stat.MemoryStats.Stats.TotalPgpgin, + } + acc.AddFields("docker_mem", memfields, tags, now) + + cpufields := map[string]interface{}{ + "usage_total": stat.CPUStats.CPUUsage.TotalUsage, + "usage_in_usermode": stat.CPUStats.CPUUsage.UsageInUsermode, + "usage_in_kernelmode": stat.CPUStats.CPUUsage.UsageInKernelmode, + "usage_system": stat.CPUStats.SystemCPUUsage, + "throttling_periods": stat.CPUStats.ThrottlingData.Periods, + "throttling_throttled_periods": stat.CPUStats.ThrottlingData.ThrottledPeriods, + "throttling_throttled_time": stat.CPUStats.ThrottlingData.ThrottledTime, + } + cputags := copyTags(tags) + cputags["cpu"] = "cpu-total" + acc.AddFields("docker_cpu", cpufields, cputags, now) + + for i, percpu := range stat.CPUStats.CPUUsage.PercpuUsage { + percputags := copyTags(tags) + percputags["cpu"] = fmt.Sprintf("cpu%d", i) + acc.AddFields("docker_cpu", map[string]interface{}{"usage_total": percpu}, percputags, now) + } + + for network, netstats := range stat.Networks { + netfields := map[string]interface{}{ + "rx_dropped": netstats.RxDropped, + "rx_bytes": netstats.RxBytes, + "rx_errors": netstats.RxErrors, + "tx_packets": netstats.TxPackets, + "tx_dropped": netstats.TxDropped, + "rx_packets": netstats.RxPackets, + "tx_errors": netstats.TxErrors, + "tx_bytes": netstats.TxBytes, + } + // Create a new network tag dictionary for the "network" tag + nettags := copyTags(tags) + nettags["network"] = network + acc.AddFields("docker_net", netfields, nettags, now) + } + + gatherBlockIOMetrics(stat, acc, tags, now) +} + +func gatherBlockIOMetrics( + stat *godocker.Stats, + acc telegraf.Accumulator, + tags map[string]string, + now time.Time, +) { + blkioStats := stat.BlkioStats + // Make a map of devices to their block io stats + deviceStatMap := make(map[string]map[string]interface{}) + + for _, metric := range blkioStats.IOServiceBytesRecursive { + device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor) + _, ok := deviceStatMap[device] + if !ok { + deviceStatMap[device] = make(map[string]interface{}) + } + + field := fmt.Sprintf("io_service_bytes_recursive_%s", strings.ToLower(metric.Op)) + deviceStatMap[device][field] = metric.Value + } + + for _, metric := range blkioStats.IOServicedRecursive { + device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor) + _, ok := deviceStatMap[device] + if !ok { + deviceStatMap[device] = make(map[string]interface{}) + } + + field := fmt.Sprintf("io_serviced_recursive_%s", strings.ToLower(metric.Op)) + deviceStatMap[device][field] = metric.Value + } + + for _, metric := range blkioStats.IOQueueRecursive { + device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor) + field := fmt.Sprintf("io_queue_recursive_%s", strings.ToLower(metric.Op)) + deviceStatMap[device][field] = metric.Value + } + + for _, metric := range blkioStats.IOServiceTimeRecursive { + device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor) + field := fmt.Sprintf("io_service_time_recursive_%s", strings.ToLower(metric.Op)) + deviceStatMap[device][field] = metric.Value + } + + for _, metric := range blkioStats.IOWaitTimeRecursive { + device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor) + field := fmt.Sprintf("io_wait_time_%s", strings.ToLower(metric.Op)) + deviceStatMap[device][field] = metric.Value + } + + for _, metric := range blkioStats.IOMergedRecursive { + device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor) + field := fmt.Sprintf("io_merged_recursive_%s", strings.ToLower(metric.Op)) + deviceStatMap[device][field] = metric.Value + } + + for _, metric := range blkioStats.IOTimeRecursive { + device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor) + field := fmt.Sprintf("io_time_recursive_%s", strings.ToLower(metric.Op)) + deviceStatMap[device][field] = metric.Value + } + + for _, metric := range blkioStats.SectorsRecursive { + device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor) + field := fmt.Sprintf("sectors_recursive_%s", strings.ToLower(metric.Op)) + deviceStatMap[device][field] = metric.Value + } + + for device, fields := range deviceStatMap { + iotags := copyTags(tags) + iotags["device"] = device + acc.AddFields("docker_blkio", fields, iotags, now) + } +} + +func copyTags(in map[string]string) map[string]string { + out := make(map[string]string) + for k, v := range in { + out[k] = v + } + return out +} + +func gatherKubernetesLabels(kubeClient *kube.Client, containerID string, tags map[string]string) map[string]string { + podClient := kubeClient.Pods(api.NamespaceAll) + pods, _ := podClient.List(api.ListOptions{}) + + for _, pod := range pods.Items { + for _, containerStatus := range pod.Status.ContainerStatuses { + podContainerID := strings.TrimPrefix(containerStatus.ContainerID, "docker://") + if podContainerID == containerID { + for k, v := range pod.ObjectMeta.Labels { + tags[k] = v + } + return tags + } + } + } + return tags +} + +func sliceContains(in string, sl []string) bool { + for _, str := range sl { + if str == in { + return true + } + } + return false +} diff --git a/plugins/inputs/docker/docker_test.go b/internal/docker/docker_test.go similarity index 99% rename from plugins/inputs/docker/docker_test.go rename to internal/docker/docker_test.go index 9b85d1029..be6ebea47 100644 --- a/plugins/inputs/docker/docker_test.go +++ b/internal/docker/docker_test.go @@ -1,4 +1,4 @@ -package system +package docker import ( "testing" diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 52ab428f8..eb33c5718 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -14,6 +14,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/influxdb" _ "github.com/influxdata/telegraf/plugins/inputs/jolokia" _ "github.com/influxdata/telegraf/plugins/inputs/kafka_consumer" + _ "github.com/influxdata/telegraf/plugins/inputs/kubernetes" _ "github.com/influxdata/telegraf/plugins/inputs/leofs" _ "github.com/influxdata/telegraf/plugins/inputs/lustre2" _ "github.com/influxdata/telegraf/plugins/inputs/mailchimp" diff --git a/plugins/inputs/docker/README.md b/plugins/inputs/docker/README.md index fa662ca80..5c861e947 100644 --- a/plugins/inputs/docker/README.md +++ b/plugins/inputs/docker/README.md @@ -98,9 +98,9 @@ on the availability of per-cpu stats on your system. ### Tags: - All stats have the following tags: - - cont_id (container ID) - - cont_image (container image) - - cont_name (container name) + - container_id (container ID) + - container_image (container image) + - container_name (container name) - docker_cpu specific: - cpu - docker_net specific: diff --git a/plugins/inputs/docker/docker.go b/plugins/inputs/docker/docker.go index 6814c190a..5e15d72d8 100644 --- a/plugins/inputs/docker/docker.go +++ b/plugins/inputs/docker/docker.go @@ -1,23 +1,17 @@ package system import ( - "fmt" - "log" - "strings" - "sync" - "time" - "github.com/influxdata/telegraf" + docker "github.com/influxdata/telegraf/internal/docker" "github.com/influxdata/telegraf/plugins/inputs" - "github.com/fsouza/go-dockerclient" + godocker "github.com/fsouza/go-dockerclient" ) type Docker struct { Endpoint string ContainerNames []string - - client *docker.Client + client *godocker.Client } var sampleConfig = ` @@ -36,279 +30,17 @@ func (d *Docker) Description() string { func (d *Docker) SampleConfig() string { return sampleConfig } func (d *Docker) Gather(acc telegraf.Accumulator) error { - if d.client == nil { - var c *docker.Client - var err error - if d.Endpoint == "ENV" { - c, err = docker.NewClientFromEnv() - if err != nil { - return err - } - } else if d.Endpoint == "" { - c, err = docker.NewClient("unix:///var/run/docker.sock") - if err != nil { - return err - } - } else { - c, err = docker.NewClient(d.Endpoint) - if err != nil { - return err - } - } - d.client = c - } + var err error - opts := docker.ListContainersOptions{} - containers, err := d.client.ListContainers(opts) + d.client, err = docker.CreateClient(d.Endpoint) if err != nil { return err } - var wg sync.WaitGroup - wg.Add(len(containers)) - for _, container := range containers { - go func(c docker.APIContainers) { - defer wg.Done() - err := d.gatherContainer(c, acc) - if err != nil { - fmt.Println(err.Error()) - } - }(container) + if d.client != nil { + err = docker.GatherContainerMetrics(d.client, nil, d.ContainerNames, acc) } - wg.Wait() - - return nil -} - -func (d *Docker) gatherContainer( - container docker.APIContainers, - acc telegraf.Accumulator, -) error { - // Parse container name - cname := "unknown" - if len(container.Names) > 0 { - // Not sure what to do with other names, just take the first. - cname = strings.TrimPrefix(container.Names[0], "/") - } - - tags := map[string]string{ - "cont_id": container.ID, - "cont_name": cname, - "cont_image": container.Image, - } - if len(d.ContainerNames) > 0 { - if !sliceContains(cname, d.ContainerNames) { - return nil - } - } - - statChan := make(chan *docker.Stats) - done := make(chan bool) - statOpts := docker.StatsOptions{ - Stream: false, - ID: container.ID, - Stats: statChan, - Done: done, - Timeout: time.Duration(time.Second * 5), - } - - go func() { - err := d.client.Stats(statOpts) - if err != nil { - log.Printf("Error getting docker stats: %s\n", err.Error()) - } - }() - - stat := <-statChan - close(done) - - if stat == nil { - return nil - } - - // Add labels to tags - for k, v := range container.Labels { - tags[k] = v - } - - gatherContainerStats(stat, acc, tags) - - return nil -} - -func gatherContainerStats( - stat *docker.Stats, - acc telegraf.Accumulator, - tags map[string]string, -) { - now := stat.Read - - memfields := map[string]interface{}{ - "max_usage": stat.MemoryStats.MaxUsage, - "usage": stat.MemoryStats.Usage, - "fail_count": stat.MemoryStats.Failcnt, - "limit": stat.MemoryStats.Limit, - "total_pgmafault": stat.MemoryStats.Stats.TotalPgmafault, - "cache": stat.MemoryStats.Stats.Cache, - "mapped_file": stat.MemoryStats.Stats.MappedFile, - "total_inactive_file": stat.MemoryStats.Stats.TotalInactiveFile, - "pgpgout": stat.MemoryStats.Stats.Pgpgout, - "rss": stat.MemoryStats.Stats.Rss, - "total_mapped_file": stat.MemoryStats.Stats.TotalMappedFile, - "writeback": stat.MemoryStats.Stats.Writeback, - "unevictable": stat.MemoryStats.Stats.Unevictable, - "pgpgin": stat.MemoryStats.Stats.Pgpgin, - "total_unevictable": stat.MemoryStats.Stats.TotalUnevictable, - "pgmajfault": stat.MemoryStats.Stats.Pgmajfault, - "total_rss": stat.MemoryStats.Stats.TotalRss, - "total_rss_huge": stat.MemoryStats.Stats.TotalRssHuge, - "total_writeback": stat.MemoryStats.Stats.TotalWriteback, - "total_inactive_anon": stat.MemoryStats.Stats.TotalInactiveAnon, - "rss_huge": stat.MemoryStats.Stats.RssHuge, - "hierarchical_memory_limit": stat.MemoryStats.Stats.HierarchicalMemoryLimit, - "total_pgfault": stat.MemoryStats.Stats.TotalPgfault, - "total_active_file": stat.MemoryStats.Stats.TotalActiveFile, - "active_anon": stat.MemoryStats.Stats.ActiveAnon, - "total_active_anon": stat.MemoryStats.Stats.TotalActiveAnon, - "total_pgpgout": stat.MemoryStats.Stats.TotalPgpgout, - "total_cache": stat.MemoryStats.Stats.TotalCache, - "inactive_anon": stat.MemoryStats.Stats.InactiveAnon, - "active_file": stat.MemoryStats.Stats.ActiveFile, - "pgfault": stat.MemoryStats.Stats.Pgfault, - "inactive_file": stat.MemoryStats.Stats.InactiveFile, - "total_pgpgin": stat.MemoryStats.Stats.TotalPgpgin, - } - acc.AddFields("docker_mem", memfields, tags, now) - - cpufields := map[string]interface{}{ - "usage_total": stat.CPUStats.CPUUsage.TotalUsage, - "usage_in_usermode": stat.CPUStats.CPUUsage.UsageInUsermode, - "usage_in_kernelmode": stat.CPUStats.CPUUsage.UsageInKernelmode, - "usage_system": stat.CPUStats.SystemCPUUsage, - "throttling_periods": stat.CPUStats.ThrottlingData.Periods, - "throttling_throttled_periods": stat.CPUStats.ThrottlingData.ThrottledPeriods, - "throttling_throttled_time": stat.CPUStats.ThrottlingData.ThrottledTime, - } - cputags := copyTags(tags) - cputags["cpu"] = "cpu-total" - acc.AddFields("docker_cpu", cpufields, cputags, now) - - for i, percpu := range stat.CPUStats.CPUUsage.PercpuUsage { - percputags := copyTags(tags) - percputags["cpu"] = fmt.Sprintf("cpu%d", i) - acc.AddFields("docker_cpu", map[string]interface{}{"usage_total": percpu}, percputags, now) - } - - for network, netstats := range stat.Networks { - netfields := map[string]interface{}{ - "rx_dropped": netstats.RxDropped, - "rx_bytes": netstats.RxBytes, - "rx_errors": netstats.RxErrors, - "tx_packets": netstats.TxPackets, - "tx_dropped": netstats.TxDropped, - "rx_packets": netstats.RxPackets, - "tx_errors": netstats.TxErrors, - "tx_bytes": netstats.TxBytes, - } - // Create a new network tag dictionary for the "network" tag - nettags := copyTags(tags) - nettags["network"] = network - acc.AddFields("docker_net", netfields, nettags, now) - } - - gatherBlockIOMetrics(stat, acc, tags, now) -} - -func gatherBlockIOMetrics( - stat *docker.Stats, - acc telegraf.Accumulator, - tags map[string]string, - now time.Time, -) { - blkioStats := stat.BlkioStats - // Make a map of devices to their block io stats - deviceStatMap := make(map[string]map[string]interface{}) - - for _, metric := range blkioStats.IOServiceBytesRecursive { - device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor) - _, ok := deviceStatMap[device] - if !ok { - deviceStatMap[device] = make(map[string]interface{}) - } - - field := fmt.Sprintf("io_service_bytes_recursive_%s", strings.ToLower(metric.Op)) - deviceStatMap[device][field] = metric.Value - } - - for _, metric := range blkioStats.IOServicedRecursive { - device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor) - _, ok := deviceStatMap[device] - if !ok { - deviceStatMap[device] = make(map[string]interface{}) - } - - field := fmt.Sprintf("io_serviced_recursive_%s", strings.ToLower(metric.Op)) - deviceStatMap[device][field] = metric.Value - } - - for _, metric := range blkioStats.IOQueueRecursive { - device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor) - field := fmt.Sprintf("io_queue_recursive_%s", strings.ToLower(metric.Op)) - deviceStatMap[device][field] = metric.Value - } - - for _, metric := range blkioStats.IOServiceTimeRecursive { - device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor) - field := fmt.Sprintf("io_service_time_recursive_%s", strings.ToLower(metric.Op)) - deviceStatMap[device][field] = metric.Value - } - - for _, metric := range blkioStats.IOWaitTimeRecursive { - device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor) - field := fmt.Sprintf("io_wait_time_%s", strings.ToLower(metric.Op)) - deviceStatMap[device][field] = metric.Value - } - - for _, metric := range blkioStats.IOMergedRecursive { - device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor) - field := fmt.Sprintf("io_merged_recursive_%s", strings.ToLower(metric.Op)) - deviceStatMap[device][field] = metric.Value - } - - for _, metric := range blkioStats.IOTimeRecursive { - device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor) - field := fmt.Sprintf("io_time_recursive_%s", strings.ToLower(metric.Op)) - deviceStatMap[device][field] = metric.Value - } - - for _, metric := range blkioStats.SectorsRecursive { - device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor) - field := fmt.Sprintf("sectors_recursive_%s", strings.ToLower(metric.Op)) - deviceStatMap[device][field] = metric.Value - } - - for device, fields := range deviceStatMap { - iotags := copyTags(tags) - iotags["device"] = device - acc.AddFields("docker_blkio", fields, iotags, now) - } -} - -func copyTags(in map[string]string) map[string]string { - out := make(map[string]string) - for k, v := range in { - out[k] = v - } - return out -} - -func sliceContains(in string, sl []string) bool { - for _, str := range sl { - if str == in { - return true - } - } - return false + return err } func init() { diff --git a/plugins/inputs/kubernetes/README.md b/plugins/inputs/kubernetes/README.md new file mode 100644 index 000000000..cf709ac5f --- /dev/null +++ b/plugins/inputs/kubernetes/README.md @@ -0,0 +1,30 @@ +# Kubernetes Input Plugin +The kubernetes plugin uses the docker remote API to gather metrics on running +docker containers. You can read Docker's documentation for their remote API +[here](https://docs.docker.com/engine/reference/api/docker_remote_api_v1.20/#get-container-stats-based-on-resource-usage) + +It then decorates those metrics with the kubernetes labels (and docker labels). + +The kubernetes plugin uses the excellent +[fsouza go-dockerclient](https://github.com/fsouza/go-dockerclient) library to +gather stats. Documentation for the library can be found +[here](https://godoc.org/github.com/fsouza/go-dockerclient) and documentation +for the stat structure can be found +[here](https://godoc.org/github.com/fsouza/go-dockerclient#Stats) + +### Configuration: + +``` +# Read metrics about docker containers +[[inputs.kubernetes]] + # Docker Endpoint + # To use TCP, set endpoint = "tcp://[ip]:[port]" + # To use environment variables (ie, docker-machine), set endpoint = "ENV" + endpoint = "unix:///var/run/docker.sock" + # Only collect metrics for these containers, collect all if empty + container_names = [] +``` + +### Measurements & Fields: + +Please see the [docker input plugin](../docker/README.md) for detailed list of measurements. diff --git a/plugins/inputs/kubernetes/kubernetes.go b/plugins/inputs/kubernetes/kubernetes.go new file mode 100644 index 000000000..4e5ecec28 --- /dev/null +++ b/plugins/inputs/kubernetes/kubernetes.go @@ -0,0 +1,59 @@ +package system + +import ( + kube "k8s.io/kubernetes/pkg/client/unversioned" + + godocker "github.com/fsouza/go-dockerclient" + + "github.com/influxdata/telegraf" + docker "github.com/influxdata/telegraf/internal/docker" + "github.com/influxdata/telegraf/plugins/inputs" +) + +type Kubernetes struct { + DockerEndpoint string + ContainerNames []string + DockerClient *godocker.Client + KubernetesClient *kube.Client +} + +var sampleConfig = ` + # Docker Endpoint + # To use TCP, set docker_endpoint = "tcp://[ip]:[port]" + # To use environment variables (ie, docker-machine), set docker_endpoint = "ENV" + docker_endpoint = "unix:///var/run/docker.sock" + # Only collect metrics for these containers, collect all if empty + container_names = [] +` + +func (k *Kubernetes) Description() string { + return "Read metrics about docker containers running on a kubernetes cluster" +} + +func (k *Kubernetes) SampleConfig() string { return sampleConfig } + +func (k *Kubernetes) Gather(acc telegraf.Accumulator) error { + var err error + + k.KubernetesClient, err = kube.NewInCluster() + if err != nil { + return err + } + + k.DockerClient, err = docker.CreateClient(k.DockerEndpoint) + if err != nil { + return err + } + + if k.DockerClient != nil { + err = docker.GatherContainerMetrics(k.DockerClient, k.KubernetesClient, k.ContainerNames, acc) + } + + return err +} + +func init() { + inputs.Add("kubernetes", func() telegraf.Input { + return &Kubernetes{} + }) +}