feature(kubernetes plugin): Adds support for a kubernetes plugin
Fixes #569. This commit also refactors the existing docker plugin so that the kubernetes plugin can gather metrics from the docker system.
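The refactor is easiest to see as two call paths into the shared helpers. A minimal sketch, assuming only the two functions this commit introduces below (`docker.CreateClient` and `docker.GatherContainerMetrics`); the wrapper functions and the `example` package are illustrative, not part of the commit:

```go
package example

import (
	"github.com/influxdata/telegraf"
	docker "github.com/influxdata/telegraf/internal/docker"
	kube "k8s.io/kubernetes/pkg/client/unversioned"
)

// gatherDocker mirrors the slimmed-down docker input: passing a nil kube
// client skips the Kubernetes label lookup entirely.
func gatherDocker(endpoint string, names []string, acc telegraf.Accumulator) error {
	client, err := docker.CreateClient(endpoint)
	if err != nil {
		return err
	}
	return docker.GatherContainerMetrics(client, nil, names, acc)
}

// gatherKubernetes mirrors the new kubernetes input: the same helper is
// called, but with an in-cluster kube client so pod labels become tags.
func gatherKubernetes(endpoint string, names []string, acc telegraf.Accumulator) error {
	kubeClient, err := kube.NewInCluster()
	if err != nil {
		return err
	}
	dockerClient, err := docker.CreateClient(endpoint)
	if err != nil {
		return err
	}
	return docker.GatherContainerMetrics(dockerClient, kubeClient, names, acc)
}
```

Both inputs in the diff below reduce to exactly these calls.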
parent a11e07e250
commit 7c162b09d4
Godeps | 14
@@ -5,19 +5,27 @@ github.com/amir/raidman 6a8e089bbe32e6b907feae5ba688841974b3c339
github.com/armon/go-metrics 345426c77237ece5dab0e1605c3e4b35c3f54757
github.com/aws/aws-sdk-go 87b1e60a50b09e4812dee560b33a238f67305804
github.com/beorn7/perks b965b613227fddccbfffe13eae360ed3fa822f8d
github.com/blang/semver aea32c919a18e5ef4537bbd283ff29594b1b0165
github.com/boltdb/bolt ee4a0888a9abe7eefe5a0992ca4cb06864839873
github.com/cenkalti/backoff 4dc77674aceaabba2c7e3da25d4c823edfb73f99
github.com/dancannon/gorethink 6f088135ff288deb9d5546f4c71919207f891a70
github.com/davecgh/go-spew 5215b55f46b2b919f50a1df0eaa5886afe4e3b3d
github.com/docker/docker 2b27fe17a1b3fb8472fde96d768fa70996adf201
github.com/docker/go-units 5d2041e26a699eaca682e2ea41c8f891e1060444
github.com/eapache/go-resiliency b86b1ec0dd4209a588dc1285cdd471e73525c0b3
github.com/eapache/queue ded5959c0d4e360646dc9e9908cff48666781367
github.com/emicklei/go-restful/swagger b86acf97a74ed7603ac78d012f5535b4d587b156
github.com/fsouza/go-dockerclient 7b651349f9479f5114913eefbfd3c4eeddd79ab4
github.com/ghodss/yaml 73d445a93680fa1a78ae23a5839bad48f32ba1ee
github.com/go-ini/ini afbd495e5aaea13597b5e14fe514ddeaa4d76fc3
github.com/go-sql-driver/mysql 7c7f556282622f94213bc028b4d0a7b6151ba239
github.com/gogo/protobuf e8904f58e872a473a5b91bc9bf3377d223555263
github.com/golang/glog fca8c8854093a154ff1eb580aae10276ad6b1b5f
github.com/golang/protobuf 6aaa8d47701fa6cf07e914ec01fde3d4a1fe79c3
github.com/golang/snappy 723cc1e459b8eea2dea4583200fd60757d40097a
github.com/gonuts/go-shellquote e842a11b24c6abfb3dd27af69a17f482e4b483c2
github.com/google/cadvisor/info/v1 bef8522964928c3de92319c548adbc07fa1ea113
github.com/google/gofuzz e4af62d086c303f2bed467b227fc0a034b218916
github.com/gorilla/context 1c83b3eabd45b6d76072b66b746c20815fb2872d
github.com/gorilla/mux 26a6070f849969ba72b72256e9f14cf519751690
github.com/hailocab/go-hostpool e80d13ce29ede4452c43dea11e79b9bc8a15b478
@@ -28,6 +36,7 @@ github.com/influxdata/config bae7cb98197d842374d3b8403905924094930f24
github.com/influxdata/influxdb 697f48b4e62e514e701ffec39978b864a3c666e6
github.com/influxdb/influxdb 697f48b4e62e514e701ffec39978b864a3c666e6
github.com/jmespath/go-jmespath c01cf91b011868172fdcd9f41838e80c9d716264
github.com/juju/ratelimit 77ed1c8a01217656d2080ad51981f6e99adaa177
github.com/klauspost/crc32 999f3125931f6557b991b2f8472172bdfa578d38
github.com/lib/pq 8ad2b298cadd691a77015666a5372eae5dbfac8f
github.com/matttproud/golang_protobuf_extensions d0c3fe89de86839aecf2e0579c40ba3bb336a453
@@ -35,6 +44,7 @@ github.com/mreiferson/go-snappystream 028eae7ab5c4c9e2d1cb4c4ca1e53259bbe7e504
github.com/naoina/go-stringutil 6b638e95a32d0c1131db0e7fe83775cbea4a0d0b
github.com/naoina/toml 751171607256bb66e64c9f0220c00662420c38e9
github.com/nsqio/go-nsq 2118015c120962edc5d03325c680daf3163a8b5f
github.com/opencontainers/runc 072fa6fdccaba49b11ba91ad4265b1ec1043787e
github.com/pborman/uuid dee7705ef7b324f27ceb85a121c61f2c2e8ce988
github.com/pmezard/go-difflib 792786c7400a136282c1664665ae0a8db921c6c2
github.com/prometheus/client_golang 67994f177195311c3ea3d4407ed0175e34a4256f
@@ -47,6 +57,8 @@ github.com/soniah/gosnmp b1b4f885b12c5dcbd021c5cee1c904110de6db7d
github.com/streadway/amqp b4f3ceab0337f013208d31348b578d83c0064744
github.com/stretchr/objx 1a9d0bb9f541897e62256577b352fdbc1fb4fd94
github.com/stretchr/testify f390dcf405f7b83c997eac1b06768bb9f44dec18
github.com/spf13/pflag 7f60f83a2c81bc3c3c0d5297f61ddfa68da9d3b7
github.com/ugorji/go/codec 646ae4a518c1c3be0739df898118d9bccf993858
github.com/wvanbergen/kafka 1a8639a45164fcc245d5c7b4bd3ccfbd1a0ffbf3
github.com/wvanbergen/kazoo-go 0f768712ae6f76454f987c3356177e138df258f8
github.com/zensqlmonitor/go-mssqldb ffe5510c6fa5e15e6d983210ab501c815b56b363
@@ -57,3 +69,5 @@ gopkg.in/dancannon/gorethink.v1 6f088135ff288deb9d5546f4c71919207f891a70
gopkg.in/fatih/pool.v2 cba550ebf9bce999a02e963296d4bc7a486cb715
gopkg.in/mgo.v2 03c9f3ee4c14c8e51ee521a6a7d0425658dd6f64
gopkg.in/yaml.v2 f7716cbe52baa25d2e9b0d0da546fcf909fc16b4
k8s.io/kubernetes/pkg 3254df3a7c9e25b243a3a9b0abdba1888cf52956
speter.net/go/exp/math/dec/inf 42ca6cd68aa922bc3f32f1e056e61b65945d9ad7
@@ -0,0 +1,301 @@
package docker

import (
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/influxdata/telegraf"

	api "k8s.io/kubernetes/pkg/api"
	kube "k8s.io/kubernetes/pkg/client/unversioned"

	godocker "github.com/fsouza/go-dockerclient"
)

// CreateClient creates a Docker client for the given endpoint ("ENV" builds
// the client from the environment; "" falls back to the default unix socket).
func CreateClient(endpoint string) (*godocker.Client, error) {
	var c *godocker.Client
	var err error
	if endpoint == "ENV" {
		c, err = godocker.NewClientFromEnv()
	} else if endpoint == "" {
		c, err = godocker.NewClient("unix:///var/run/docker.sock")
	} else {
		c, err = godocker.NewClient(endpoint)
	}
	return c, err
}

func GatherContainerMetrics(client *godocker.Client, kubeClient *kube.Client, containerNames []string, acc telegraf.Accumulator) error {
	opts := godocker.ListContainersOptions{}
	containers, err := client.ListContainers(opts)
	if err != nil {
		return err
	}

	var wg sync.WaitGroup
	wg.Add(len(containers))
	for _, container := range containers {
		go func(c godocker.APIContainers) {
			defer wg.Done()
			err := gatherContainer(client, kubeClient, c, containerNames, acc)
			if err != nil {
				fmt.Println(err.Error())
			}
		}(container)
	}
	wg.Wait()
	return nil
}

func gatherContainer(
	client *godocker.Client,
	kubeClient *kube.Client,
	container godocker.APIContainers,
	containerNames []string,
	acc telegraf.Accumulator,
) error {
	// Parse container name
	cname := "unknown"
	if len(container.Names) > 0 {
		// Not sure what to do with other names, just take the first.
		cname = strings.TrimPrefix(container.Names[0], "/")
	}

	tags := map[string]string{
		"container_id":    container.ID,
		"container_name":  cname,
		"container_image": container.Image,
	}

	if kubeClient != nil {
		tags = gatherKubernetesLabels(kubeClient, container.ID, tags)
	}

	if len(containerNames) > 0 {
		if !sliceContains(cname, containerNames) {
			return nil
		}
	}

	statChan := make(chan *godocker.Stats)
	done := make(chan bool)
	statOpts := godocker.StatsOptions{
		Stream:  false,
		ID:      container.ID,
		Stats:   statChan,
		Done:    done,
		Timeout: time.Duration(time.Second * 5),
	}

	go func() {
		client.Stats(statOpts)
	}()

	stat := <-statChan
	close(done)

	// Add labels to tags
	for k, v := range container.Labels {
		tags[k] = v
	}

	gatherContainerStats(stat, acc, tags)

	return nil
}

func gatherContainerStats(
	stat *godocker.Stats,
	acc telegraf.Accumulator,
	tags map[string]string,
) {
	now := stat.Read

	memfields := map[string]interface{}{
		"max_usage":                 stat.MemoryStats.MaxUsage,
		"usage":                     stat.MemoryStats.Usage,
		"fail_count":                stat.MemoryStats.Failcnt,
		"limit":                     stat.MemoryStats.Limit,
		"total_pgmafault":           stat.MemoryStats.Stats.TotalPgmafault,
		"cache":                     stat.MemoryStats.Stats.Cache,
		"mapped_file":               stat.MemoryStats.Stats.MappedFile,
		"total_inactive_file":       stat.MemoryStats.Stats.TotalInactiveFile,
		"pgpgout":                   stat.MemoryStats.Stats.Pgpgout,
		"rss":                       stat.MemoryStats.Stats.Rss,
		"total_mapped_file":         stat.MemoryStats.Stats.TotalMappedFile,
		"writeback":                 stat.MemoryStats.Stats.Writeback,
		"unevictable":               stat.MemoryStats.Stats.Unevictable,
		"pgpgin":                    stat.MemoryStats.Stats.Pgpgin,
		"total_unevictable":         stat.MemoryStats.Stats.TotalUnevictable,
		"pgmajfault":                stat.MemoryStats.Stats.Pgmajfault,
		"total_rss":                 stat.MemoryStats.Stats.TotalRss,
		"total_rss_huge":            stat.MemoryStats.Stats.TotalRssHuge,
		"total_writeback":           stat.MemoryStats.Stats.TotalWriteback,
		"total_inactive_anon":       stat.MemoryStats.Stats.TotalInactiveAnon,
		"rss_huge":                  stat.MemoryStats.Stats.RssHuge,
		"hierarchical_memory_limit": stat.MemoryStats.Stats.HierarchicalMemoryLimit,
		"total_pgfault":             stat.MemoryStats.Stats.TotalPgfault,
		"total_active_file":         stat.MemoryStats.Stats.TotalActiveFile,
		"active_anon":               stat.MemoryStats.Stats.ActiveAnon,
		"total_active_anon":         stat.MemoryStats.Stats.TotalActiveAnon,
		"total_pgpgout":             stat.MemoryStats.Stats.TotalPgpgout,
		"total_cache":               stat.MemoryStats.Stats.TotalCache,
		"inactive_anon":             stat.MemoryStats.Stats.InactiveAnon,
		"active_file":               stat.MemoryStats.Stats.ActiveFile,
		"pgfault":                   stat.MemoryStats.Stats.Pgfault,
		"inactive_file":             stat.MemoryStats.Stats.InactiveFile,
		"total_pgpgin":              stat.MemoryStats.Stats.TotalPgpgin,
	}
	acc.AddFields("docker_mem", memfields, tags, now)

	cpufields := map[string]interface{}{
		"usage_total":                  stat.CPUStats.CPUUsage.TotalUsage,
		"usage_in_usermode":            stat.CPUStats.CPUUsage.UsageInUsermode,
		"usage_in_kernelmode":          stat.CPUStats.CPUUsage.UsageInKernelmode,
		"usage_system":                 stat.CPUStats.SystemCPUUsage,
		"throttling_periods":           stat.CPUStats.ThrottlingData.Periods,
		"throttling_throttled_periods": stat.CPUStats.ThrottlingData.ThrottledPeriods,
		"throttling_throttled_time":    stat.CPUStats.ThrottlingData.ThrottledTime,
	}
	cputags := copyTags(tags)
	cputags["cpu"] = "cpu-total"
	acc.AddFields("docker_cpu", cpufields, cputags, now)

	for i, percpu := range stat.CPUStats.CPUUsage.PercpuUsage {
		percputags := copyTags(tags)
		percputags["cpu"] = fmt.Sprintf("cpu%d", i)
		acc.AddFields("docker_cpu", map[string]interface{}{"usage_total": percpu}, percputags, now)
	}

	for network, netstats := range stat.Networks {
		netfields := map[string]interface{}{
			"rx_dropped": netstats.RxDropped,
			"rx_bytes":   netstats.RxBytes,
			"rx_errors":  netstats.RxErrors,
			"tx_packets": netstats.TxPackets,
			"tx_dropped": netstats.TxDropped,
			"rx_packets": netstats.RxPackets,
			"tx_errors":  netstats.TxErrors,
			"tx_bytes":   netstats.TxBytes,
		}
		// Create a new network tag dictionary for the "network" tag
		nettags := copyTags(tags)
		nettags["network"] = network
		acc.AddFields("docker_net", netfields, nettags, now)
	}

	gatherBlockIOMetrics(stat, acc, tags, now)
}

func gatherBlockIOMetrics(
	stat *godocker.Stats,
	acc telegraf.Accumulator,
	tags map[string]string,
	now time.Time,
) {
	blkioStats := stat.BlkioStats
	// Make a map of devices to their block io stats
	deviceStatMap := make(map[string]map[string]interface{})

	for _, metric := range blkioStats.IOServiceBytesRecursive {
		device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
		_, ok := deviceStatMap[device]
		if !ok {
			deviceStatMap[device] = make(map[string]interface{})
		}

		field := fmt.Sprintf("io_service_bytes_recursive_%s", strings.ToLower(metric.Op))
		deviceStatMap[device][field] = metric.Value
	}

	for _, metric := range blkioStats.IOServicedRecursive {
		device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
		_, ok := deviceStatMap[device]
		if !ok {
			deviceStatMap[device] = make(map[string]interface{})
		}

		field := fmt.Sprintf("io_serviced_recursive_%s", strings.ToLower(metric.Op))
		deviceStatMap[device][field] = metric.Value
	}

	for _, metric := range blkioStats.IOQueueRecursive {
		device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
		field := fmt.Sprintf("io_queue_recursive_%s", strings.ToLower(metric.Op))
		deviceStatMap[device][field] = metric.Value
	}

	for _, metric := range blkioStats.IOServiceTimeRecursive {
		device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
		field := fmt.Sprintf("io_service_time_recursive_%s", strings.ToLower(metric.Op))
		deviceStatMap[device][field] = metric.Value
	}

	for _, metric := range blkioStats.IOWaitTimeRecursive {
		device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
		field := fmt.Sprintf("io_wait_time_%s", strings.ToLower(metric.Op))
		deviceStatMap[device][field] = metric.Value
	}

	for _, metric := range blkioStats.IOMergedRecursive {
		device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
		field := fmt.Sprintf("io_merged_recursive_%s", strings.ToLower(metric.Op))
		deviceStatMap[device][field] = metric.Value
	}

	for _, metric := range blkioStats.IOTimeRecursive {
		device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
		field := fmt.Sprintf("io_time_recursive_%s", strings.ToLower(metric.Op))
		deviceStatMap[device][field] = metric.Value
	}

	for _, metric := range blkioStats.SectorsRecursive {
		device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
		field := fmt.Sprintf("sectors_recursive_%s", strings.ToLower(metric.Op))
		deviceStatMap[device][field] = metric.Value
	}

	for device, fields := range deviceStatMap {
		iotags := copyTags(tags)
		iotags["device"] = device
		acc.AddFields("docker_blkio", fields, iotags, now)
	}
}

func copyTags(in map[string]string) map[string]string {
	out := make(map[string]string)
	for k, v := range in {
		out[k] = v
	}
	return out
}

func gatherKubernetesLabels(kubeClient *kube.Client, containerID string, tags map[string]string) map[string]string {
	podClient := kubeClient.Pods(api.NamespaceAll)
	pods, _ := podClient.List(api.ListOptions{})

	for _, pod := range pods.Items {
		for _, containerStatus := range pod.Status.ContainerStatuses {
			podContainerID := strings.TrimPrefix(containerStatus.ContainerID, "docker://")
			if podContainerID == containerID {
				for k, v := range pod.ObjectMeta.Labels {
					tags[k] = v
				}
				return tags
			}
		}
	}
	return tags
}

func sliceContains(in string, sl []string) bool {
	for _, str := range sl {
		if str == in {
			return true
		}
	}
	return false
}
@@ -1,4 +1,4 @@
package system
package docker

import (
	"testing"
@@ -14,6 +14,7 @@ import (
	_ "github.com/influxdata/telegraf/plugins/inputs/influxdb"
	_ "github.com/influxdata/telegraf/plugins/inputs/jolokia"
	_ "github.com/influxdata/telegraf/plugins/inputs/kafka_consumer"
	_ "github.com/influxdata/telegraf/plugins/inputs/kubernetes"
	_ "github.com/influxdata/telegraf/plugins/inputs/leofs"
	_ "github.com/influxdata/telegraf/plugins/inputs/lustre2"
	_ "github.com/influxdata/telegraf/plugins/inputs/mailchimp"
@@ -98,9 +98,9 @@ on the availability of per-cpu stats on your system.
### Tags:

- All stats have the following tags:
    - cont_id (container ID)
    - cont_image (container image)
    - cont_name (container name)
    - container_id (container ID)
    - container_image (container image)
    - container_name (container name)
- docker_cpu specific:
    - cpu
- docker_net specific:
@@ -1,23 +1,17 @@
package system

import (
	"fmt"
	"log"
	"strings"
	"sync"
	"time"

	"github.com/influxdata/telegraf"
	docker "github.com/influxdata/telegraf/internal/docker"
	"github.com/influxdata/telegraf/plugins/inputs"

	"github.com/fsouza/go-dockerclient"
	godocker "github.com/fsouza/go-dockerclient"
)

type Docker struct {
	Endpoint       string
	ContainerNames []string

	client *docker.Client
	client *godocker.Client
}

var sampleConfig = `
@@ -36,279 +30,17 @@ func (d *Docker) Description() string {
func (d *Docker) SampleConfig() string { return sampleConfig }

func (d *Docker) Gather(acc telegraf.Accumulator) error {
	if d.client == nil {
		var c *docker.Client
		var err error
		if d.Endpoint == "ENV" {
			c, err = docker.NewClientFromEnv()
			if err != nil {
				return err
			}
		} else if d.Endpoint == "" {
			c, err = docker.NewClient("unix:///var/run/docker.sock")
			if err != nil {
				return err
			}
		} else {
			c, err = docker.NewClient(d.Endpoint)
			if err != nil {
				return err
			}
		}
		d.client = c
	}

	opts := docker.ListContainersOptions{}
	containers, err := d.client.ListContainers(opts)
	d.client, err = docker.CreateClient(d.Endpoint)
	if err != nil {
		return err
	}

	var wg sync.WaitGroup
	wg.Add(len(containers))
	for _, container := range containers {
		go func(c docker.APIContainers) {
			defer wg.Done()
			err := d.gatherContainer(c, acc)
			if err != nil {
				fmt.Println(err.Error())
	if d.client != nil {
		err = docker.GatherContainerMetrics(d.client, nil, d.ContainerNames, acc)
	}
		}(container)
	}
	wg.Wait()

	return nil
}

func (d *Docker) gatherContainer(
	container docker.APIContainers,
	acc telegraf.Accumulator,
) error {
	// Parse container name
	cname := "unknown"
	if len(container.Names) > 0 {
		// Not sure what to do with other names, just take the first.
		cname = strings.TrimPrefix(container.Names[0], "/")
	}

	tags := map[string]string{
		"cont_id":    container.ID,
		"cont_name":  cname,
		"cont_image": container.Image,
	}
	if len(d.ContainerNames) > 0 {
		if !sliceContains(cname, d.ContainerNames) {
			return nil
		}
	}

	statChan := make(chan *docker.Stats)
	done := make(chan bool)
	statOpts := docker.StatsOptions{
		Stream:  false,
		ID:      container.ID,
		Stats:   statChan,
		Done:    done,
		Timeout: time.Duration(time.Second * 5),
	}

	go func() {
		err := d.client.Stats(statOpts)
		if err != nil {
			log.Printf("Error getting docker stats: %s\n", err.Error())
		}
	}()

	stat := <-statChan
	close(done)

	if stat == nil {
		return nil
	}

	// Add labels to tags
	for k, v := range container.Labels {
		tags[k] = v
	}

	gatherContainerStats(stat, acc, tags)

	return nil
}

func gatherContainerStats(
	stat *docker.Stats,
	acc telegraf.Accumulator,
	tags map[string]string,
) {
	now := stat.Read

	memfields := map[string]interface{}{
		"max_usage":                 stat.MemoryStats.MaxUsage,
		"usage":                     stat.MemoryStats.Usage,
		"fail_count":                stat.MemoryStats.Failcnt,
		"limit":                     stat.MemoryStats.Limit,
		"total_pgmafault":           stat.MemoryStats.Stats.TotalPgmafault,
		"cache":                     stat.MemoryStats.Stats.Cache,
		"mapped_file":               stat.MemoryStats.Stats.MappedFile,
		"total_inactive_file":       stat.MemoryStats.Stats.TotalInactiveFile,
		"pgpgout":                   stat.MemoryStats.Stats.Pgpgout,
		"rss":                       stat.MemoryStats.Stats.Rss,
		"total_mapped_file":         stat.MemoryStats.Stats.TotalMappedFile,
		"writeback":                 stat.MemoryStats.Stats.Writeback,
		"unevictable":               stat.MemoryStats.Stats.Unevictable,
		"pgpgin":                    stat.MemoryStats.Stats.Pgpgin,
		"total_unevictable":         stat.MemoryStats.Stats.TotalUnevictable,
		"pgmajfault":                stat.MemoryStats.Stats.Pgmajfault,
		"total_rss":                 stat.MemoryStats.Stats.TotalRss,
		"total_rss_huge":            stat.MemoryStats.Stats.TotalRssHuge,
		"total_writeback":           stat.MemoryStats.Stats.TotalWriteback,
		"total_inactive_anon":       stat.MemoryStats.Stats.TotalInactiveAnon,
		"rss_huge":                  stat.MemoryStats.Stats.RssHuge,
		"hierarchical_memory_limit": stat.MemoryStats.Stats.HierarchicalMemoryLimit,
		"total_pgfault":             stat.MemoryStats.Stats.TotalPgfault,
		"total_active_file":         stat.MemoryStats.Stats.TotalActiveFile,
		"active_anon":               stat.MemoryStats.Stats.ActiveAnon,
		"total_active_anon":         stat.MemoryStats.Stats.TotalActiveAnon,
		"total_pgpgout":             stat.MemoryStats.Stats.TotalPgpgout,
		"total_cache":               stat.MemoryStats.Stats.TotalCache,
		"inactive_anon":             stat.MemoryStats.Stats.InactiveAnon,
		"active_file":               stat.MemoryStats.Stats.ActiveFile,
		"pgfault":                   stat.MemoryStats.Stats.Pgfault,
		"inactive_file":             stat.MemoryStats.Stats.InactiveFile,
		"total_pgpgin":              stat.MemoryStats.Stats.TotalPgpgin,
	}
	acc.AddFields("docker_mem", memfields, tags, now)

	cpufields := map[string]interface{}{
		"usage_total":                  stat.CPUStats.CPUUsage.TotalUsage,
		"usage_in_usermode":            stat.CPUStats.CPUUsage.UsageInUsermode,
		"usage_in_kernelmode":          stat.CPUStats.CPUUsage.UsageInKernelmode,
		"usage_system":                 stat.CPUStats.SystemCPUUsage,
		"throttling_periods":           stat.CPUStats.ThrottlingData.Periods,
		"throttling_throttled_periods": stat.CPUStats.ThrottlingData.ThrottledPeriods,
		"throttling_throttled_time":    stat.CPUStats.ThrottlingData.ThrottledTime,
	}
	cputags := copyTags(tags)
	cputags["cpu"] = "cpu-total"
	acc.AddFields("docker_cpu", cpufields, cputags, now)

	for i, percpu := range stat.CPUStats.CPUUsage.PercpuUsage {
		percputags := copyTags(tags)
		percputags["cpu"] = fmt.Sprintf("cpu%d", i)
		acc.AddFields("docker_cpu", map[string]interface{}{"usage_total": percpu}, percputags, now)
	}

	for network, netstats := range stat.Networks {
		netfields := map[string]interface{}{
			"rx_dropped": netstats.RxDropped,
			"rx_bytes":   netstats.RxBytes,
			"rx_errors":  netstats.RxErrors,
			"tx_packets": netstats.TxPackets,
			"tx_dropped": netstats.TxDropped,
			"rx_packets": netstats.RxPackets,
			"tx_errors":  netstats.TxErrors,
			"tx_bytes":   netstats.TxBytes,
		}
		// Create a new network tag dictionary for the "network" tag
		nettags := copyTags(tags)
		nettags["network"] = network
		acc.AddFields("docker_net", netfields, nettags, now)
	}

	gatherBlockIOMetrics(stat, acc, tags, now)
}

func gatherBlockIOMetrics(
	stat *docker.Stats,
	acc telegraf.Accumulator,
	tags map[string]string,
	now time.Time,
) {
	blkioStats := stat.BlkioStats
	// Make a map of devices to their block io stats
	deviceStatMap := make(map[string]map[string]interface{})

	for _, metric := range blkioStats.IOServiceBytesRecursive {
		device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
		_, ok := deviceStatMap[device]
		if !ok {
			deviceStatMap[device] = make(map[string]interface{})
		}

		field := fmt.Sprintf("io_service_bytes_recursive_%s", strings.ToLower(metric.Op))
		deviceStatMap[device][field] = metric.Value
	}

	for _, metric := range blkioStats.IOServicedRecursive {
		device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
		_, ok := deviceStatMap[device]
		if !ok {
			deviceStatMap[device] = make(map[string]interface{})
		}

		field := fmt.Sprintf("io_serviced_recursive_%s", strings.ToLower(metric.Op))
		deviceStatMap[device][field] = metric.Value
	}

	for _, metric := range blkioStats.IOQueueRecursive {
		device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
		field := fmt.Sprintf("io_queue_recursive_%s", strings.ToLower(metric.Op))
		deviceStatMap[device][field] = metric.Value
	}

	for _, metric := range blkioStats.IOServiceTimeRecursive {
		device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
		field := fmt.Sprintf("io_service_time_recursive_%s", strings.ToLower(metric.Op))
		deviceStatMap[device][field] = metric.Value
	}

	for _, metric := range blkioStats.IOWaitTimeRecursive {
		device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
		field := fmt.Sprintf("io_wait_time_%s", strings.ToLower(metric.Op))
		deviceStatMap[device][field] = metric.Value
	}

	for _, metric := range blkioStats.IOMergedRecursive {
		device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
		field := fmt.Sprintf("io_merged_recursive_%s", strings.ToLower(metric.Op))
		deviceStatMap[device][field] = metric.Value
	}

	for _, metric := range blkioStats.IOTimeRecursive {
		device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
		field := fmt.Sprintf("io_time_recursive_%s", strings.ToLower(metric.Op))
		deviceStatMap[device][field] = metric.Value
	}

	for _, metric := range blkioStats.SectorsRecursive {
		device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
		field := fmt.Sprintf("sectors_recursive_%s", strings.ToLower(metric.Op))
		deviceStatMap[device][field] = metric.Value
	}

	for device, fields := range deviceStatMap {
		iotags := copyTags(tags)
		iotags["device"] = device
		acc.AddFields("docker_blkio", fields, iotags, now)
	}
}

func copyTags(in map[string]string) map[string]string {
	out := make(map[string]string)
	for k, v := range in {
		out[k] = v
	}
	return out
}

func sliceContains(in string, sl []string) bool {
	for _, str := range sl {
		if str == in {
			return true
		}
	}
	return false
	return err
}

func init() {
@@ -0,0 +1,30 @@
# Kubernetes Input Plugin

The kubernetes plugin uses the Docker remote API to gather metrics on running
Docker containers. You can read Docker's documentation for the remote API
[here](https://docs.docker.com/engine/reference/api/docker_remote_api_v1.20/#get-container-stats-based-on-resource-usage).

It then decorates those metrics with the labels of the Kubernetes pod that
owns each container (as well as any Docker labels).

The kubernetes plugin uses the excellent
[fsouza go-dockerclient](https://github.com/fsouza/go-dockerclient) library to
gather stats. Documentation for the library can be found
[here](https://godoc.org/github.com/fsouza/go-dockerclient), and documentation
for the stat structure can be found
[here](https://godoc.org/github.com/fsouza/go-dockerclient#Stats).
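The label decoration is small enough to read in one piece. Below is a condensed sketch of `gatherKubernetesLabels` from `internal/docker` (added in this commit); the function and package names are renamed here for illustration, but the calls are the ones the plugin makes:

```go
package example

import (
	"strings"

	api "k8s.io/kubernetes/pkg/api"
	kube "k8s.io/kubernetes/pkg/client/unversioned"
)

// kubeLabelsFor finds the pod whose container status matches the given
// Docker container ID and merges that pod's labels into the metric tags.
func kubeLabelsFor(kubeClient *kube.Client, containerID string, tags map[string]string) map[string]string {
	pods, _ := kubeClient.Pods(api.NamespaceAll).List(api.ListOptions{})
	for _, pod := range pods.Items {
		for _, status := range pod.Status.ContainerStatuses {
			// Kubernetes reports container IDs as "docker://<id>".
			if strings.TrimPrefix(status.ContainerID, "docker://") == containerID {
				for k, v := range pod.ObjectMeta.Labels {
					tags[k] = v
				}
				return tags
			}
		}
	}
	return tags
}
```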
### Configuration:

```
# Read metrics about docker containers
[[inputs.kubernetes]]
  # Docker Endpoint
  # To use TCP, set endpoint = "tcp://[ip]:[port]"
  # To use environment variables (i.e., docker-machine), set endpoint = "ENV"
  endpoint = "unix:///var/run/docker.sock"
  # Only collect metrics for these containers; collect all if empty
  container_names = []
```

### Measurements & Fields:

Please see the [docker input plugin](../docker/README.md) for a detailed list of measurements.
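For orientation, a hypothetical `docker_mem` point for a container in a pod labeled `app=nginx` might look like the line below. The measurement, field, and tag names come from the plugin; the container ID, label, values, and timestamp are invented for illustration:

```
docker_mem,container_id=14f3a2b8c9d0,container_name=nginx-a1b2,container_image=nginx,app=nginx usage=1048576i,limit=2147483648i 1455555555000000000
```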
@@ -0,0 +1,59 @@
package system

import (
	kube "k8s.io/kubernetes/pkg/client/unversioned"

	godocker "github.com/fsouza/go-dockerclient"

	"github.com/influxdata/telegraf"
	docker "github.com/influxdata/telegraf/internal/docker"
	"github.com/influxdata/telegraf/plugins/inputs"
)

type Kubernetes struct {
	DockerEndpoint   string
	ContainerNames   []string
	DockerClient     *godocker.Client
	KubernetesClient *kube.Client
}

var sampleConfig = `
  # Docker Endpoint
  # To use TCP, set docker_endpoint = "tcp://[ip]:[port]"
  # To use environment variables (i.e., docker-machine), set docker_endpoint = "ENV"
  docker_endpoint = "unix:///var/run/docker.sock"
  # Only collect metrics for these containers; collect all if empty
  container_names = []
`

func (k *Kubernetes) Description() string {
	return "Read metrics about docker containers running on a kubernetes cluster"
}

func (k *Kubernetes) SampleConfig() string { return sampleConfig }

func (k *Kubernetes) Gather(acc telegraf.Accumulator) error {
	var err error

	k.KubernetesClient, err = kube.NewInCluster()
	if err != nil {
		return err
	}

	k.DockerClient, err = docker.CreateClient(k.DockerEndpoint)
	if err != nil {
		return err
	}

	if k.DockerClient != nil {
		err = docker.GatherContainerMetrics(k.DockerClient, k.KubernetesClient, k.ContainerNames, acc)
	}

	return err
}

func init() {
	inputs.Add("kubernetes", func() telegraf.Input {
		return &Kubernetes{}
	})
}