Refactor the docker plugin to use go-dockerclient throughout

fixes #503
fixes #463
Cameron Sparr
2016-01-20 16:21:19 -07:00
parent e0dc1ef5bd
commit 4d0dc8b7c8
10 changed files with 662 additions and 283 deletions


@@ -0,0 +1,148 @@
# Docker Input Plugin
The docker plugin uses the Docker remote API to gather metrics on running
docker containers. You can read Docker's documentation for the remote API
[here](https://docs.docker.com/engine/reference/api/docker_remote_api_v1.20/#get-container-stats-based-on-resource-usage).

The docker plugin uses the excellent
[fsouza go-dockerclient](https://github.com/fsouza/go-dockerclient) library to
gather stats. Documentation for the library can be found
[here](https://godoc.org/github.com/fsouza/go-dockerclient) and documentation
for the stat structure can be found
[here](https://godoc.org/github.com/fsouza/go-dockerclient#Stats).
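As a minimal sketch of what the plugin does under the hood (this is not part
of the plugin itself; it assumes the default unix socket endpoint and trims
error handling), the same go-dockerclient calls can be used standalone to
grab one stats snapshot per running container:

```
package main

import (
	"fmt"
	"time"

	"github.com/fsouza/go-dockerclient"
)

func main() {
	// Connect to the local docker daemon over the default unix socket.
	client, err := docker.NewClient("unix:///var/run/docker.sock")
	if err != nil {
		panic(err)
	}

	containers, err := client.ListContainers(docker.ListContainersOptions{})
	if err != nil {
		panic(err)
	}

	for _, c := range containers {
		statChan := make(chan *docker.Stats)
		go func(id string) {
			// With Stream set to false, Stats sends a single snapshot on
			// the channel and closes it when done. The error return is
			// ignored here for brevity.
			client.Stats(docker.StatsOptions{
				ID:      id,
				Stats:   statChan,
				Stream:  false,
				Timeout: 5 * time.Second,
			})
		}(c.ID)
		for stat := range statChan {
			fmt.Printf("%s: mem usage %d bytes\n", c.ID, stat.MemoryStats.Usage)
		}
	}
}
```
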
### Configuration:
```
# Read metrics about docker containers
[[inputs.docker]]
# Docker Endpoint
# To use TCP, set endpoint = "tcp://[ip]:[port]"
# To use environment variables (ie, docker-machine), set endpoint = "ENV"
endpoint = "unix:///var/run/docker.sock"
# Only collect metrics for these containers, collect all if empty
container_names = []
```
### Measurements & Fields:
Every effort was made to preserve the names based on the JSON response from the
docker API.
Note that the docker_cpu metric may appear multiple times per collection, based
on the availability of per-cpu stats on your system.
- docker_mem
- total_pgmafault
- cache
- mapped_file
- total_inactive_file
- pgpgout
- rss
- total_mapped_file
- writeback
- unevictable
- pgpgin
- total_unevictable
- pgmajfault
- total_rss
- total_rss_huge
- total_writeback
- total_inactive_anon
- rss_huge
- hierarchical_memory_limit
- total_pgfault
- total_active_file
- active_anon
- total_active_anon
- total_pgpgout
- total_cache
- inactive_anon
- active_file
- pgfault
- inactive_file
- total_pgpgin
- max_usage
- usage
    - fail_count
- limit
- docker_cpu
- throttling_periods
- throttling_throttled_periods
- throttling_throttled_time
- usage_in_kernelmode
- usage_in_usermode
- usage_system
- usage_total
- docker_net
- rx_dropped
- rx_bytes
- rx_errors
- tx_packets
- tx_dropped
- rx_packets
- tx_errors
- tx_bytes
- docker_blkio
- io_service_bytes_recursive_async
- io_service_bytes_recursive_read
- io_service_bytes_recursive_sync
- io_service_bytes_recursive_total
- io_service_bytes_recursive_write
- io_serviced_recursive_async
- io_serviced_recursive_read
- io_serviced_recursive_sync
- io_serviced_recursive_total
- io_serviced_recursive_write
### Tags:
- All stats have the following tags:
- cont_id (container ID)
- cont_image (container image)
- cont_name (container name)
- docker_cpu specific:
- cpu
- docker_net specific:
- network
- docker_blkio specific:
- device
### Example Output:
```
% ./telegraf -config ~/ws/telegraf.conf -input-filter docker -test
* Plugin: docker, Collection 1
> docker_mem,cont_id=5705ba8ed8fb47527410653d60a8bb2f3af5e62372297c419022a3cc6d45d848,\
cont_image=spotify/kafka,cont_name=kafka \
active_anon=52568064i,active_file=6926336i,cache=12038144i,fail_count=0i,\
hierarchical_memory_limit=9223372036854771712i,inactive_anon=52707328i,\
inactive_file=5111808i,limit=1044578304i,mapped_file=10301440i,\
max_usage=140656640i,pgfault=63762i,pgmajfault=2837i,pgpgin=73355i,\
pgpgout=45736i,rss=105275392i,rss_huge=4194304i,total_active_anon=52568064i,\
total_active_file=6926336i,total_cache=12038144i,total_inactive_anon=52707328i,\
total_inactive_file=5111808i,total_mapped_file=10301440i,total_pgfault=63762i,\
total_pgmafault=0i,total_pgpgin=73355i,total_pgpgout=45736i,\
total_rss=105275392i,total_rss_huge=4194304i,total_unevictable=0i,\
total_writeback=0i,unevictable=0i,usage=117440512i,writeback=0i 1453409536840126713
> docker_cpu,cont_id=5705ba8ed8fb47527410653d60a8bb2f3af5e62372297c419022a3cc6d45d848,\
cont_image=spotify/kafka,cont_name=kafka,cpu=cpu-total \
throttling_periods=0i,throttling_throttled_periods=0i,\
throttling_throttled_time=0i,usage_in_kernelmode=440000000i,\
usage_in_usermode=2290000000i,usage_system=84795360000000i,\
usage_total=6628208865i 1453409536840126713
> docker_cpu,cont_id=5705ba8ed8fb47527410653d60a8bb2f3af5e62372297c419022a3cc6d45d848,\
cont_image=spotify/kafka,cont_name=kafka,cpu=cpu0 \
usage_total=6628208865i 1453409536840126713
> docker_net,cont_id=5705ba8ed8fb47527410653d60a8bb2f3af5e62372297c419022a3cc6d45d848,\
cont_image=spotify/kafka,cont_name=kafka,network=eth0 \
rx_bytes=7468i,rx_dropped=0i,rx_errors=0i,rx_packets=94i,tx_bytes=946i,\
tx_dropped=0i,tx_errors=0i,tx_packets=13i 1453409536840126713
> docker_blkio,cont_id=5705ba8ed8fb47527410653d60a8bb2f3af5e62372297c419022a3cc6d45d848,\
cont_image=spotify/kafka,cont_name=kafka,device=8:0 \
io_service_bytes_recursive_async=80216064i,io_service_bytes_recursive_read=79925248i,\
io_service_bytes_recursive_sync=77824i,io_service_bytes_recursive_total=80293888i,\
io_service_bytes_recursive_write=368640i,io_serviced_recursive_async=6562i,\
io_serviced_recursive_read=6492i,io_serviced_recursive_sync=37i,\
io_serviced_recursive_total=6599i,io_serviced_recursive_write=107i 1453409536840126713
```


@@ -0,0 +1,312 @@
package system

import (
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/influxdata/telegraf/plugins/inputs"

	"github.com/fsouza/go-dockerclient"
)

type Docker struct {
Endpoint string
ContainerNames []string
client *docker.Client
}
var sampleConfig = `
# Docker Endpoint
# To use TCP, set endpoint = "tcp://[ip]:[port]"
# To use environment variables (ie, docker-machine), set endpoint = "ENV"
endpoint = "unix:///var/run/docker.sock"
# Only collect metrics for these containers, collect all if empty
container_names = []
`
func (d *Docker) Description() string {
return "Read metrics about docker containers"
}
func (d *Docker) SampleConfig() string { return sampleConfig }
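
// Gather lazily initializes the docker client on first use, lists the
// running containers, and collects stats for each of them concurrently.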
func (d *Docker) Gather(acc inputs.Accumulator) error {
if d.client == nil {
var c *docker.Client
var err error
if d.Endpoint == "ENV" {
c, err = docker.NewClientFromEnv()
if err != nil {
return err
}
} else if d.Endpoint == "" {
c, err = docker.NewClient("unix:///var/run/docker.sock")
if err != nil {
return err
}
} else {
c, err = docker.NewClient(d.Endpoint)
if err != nil {
return err
}
}
d.client = c
}
opts := docker.ListContainersOptions{}
containers, err := d.client.ListContainers(opts)
if err != nil {
return err
}
var wg sync.WaitGroup
wg.Add(len(containers))
for _, container := range containers {
go func(c docker.APIContainers) {
defer wg.Done()
err := d.gatherContainer(c, acc)
if err != nil {
fmt.Println(err.Error())
}
}(container)
}
wg.Wait()
return nil
}
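
// gatherContainer requests a single stats snapshot for one container and
// hands it to gatherContainerStats. Containers excluded by container_names
// are skipped.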
func (d *Docker) gatherContainer(
container docker.APIContainers,
acc inputs.Accumulator,
) error {
// Parse container name
cname := "unknown"
if len(container.Names) > 0 {
// Not sure what to do with other names, just take the first.
cname = strings.TrimPrefix(container.Names[0], "/")
}
tags := map[string]string{
"cont_id": container.ID,
"cont_name": cname,
"cont_image": container.Image,
}
if len(d.ContainerNames) > 0 {
if !sliceContains(cname, d.ContainerNames) {
return nil
}
}
	statChan := make(chan *docker.Stats)
	done := make(chan bool)
	statOpts := docker.StatsOptions{
		Stream:  false,
		ID:      container.ID,
		Stats:   statChan,
		Done:    done,
		Timeout: 5 * time.Second,
	}
	// Stats sends a single snapshot on statChan (Stream is false) and closes
	// the channel when it returns. Collect its error on a channel rather than
	// through a shared variable, to avoid a data race with the receive below.
	errC := make(chan error, 1)
	go func() {
		errC <- d.client.Stats(statOpts)
	}()
	stat := <-statChan
	if err := <-errC; err != nil {
		return err
	}
// Add labels to tags
for k, v := range container.Labels {
tags[k] = v
}
gatherContainerStats(stat, acc, tags)
return nil
}
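
// gatherContainerStats flattens a docker.Stats snapshot into the docker_mem,
// docker_cpu, and docker_net measurements, then delegates the block I/O
// counters to gatherBlockIOMetrics.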
func gatherContainerStats(
stat *docker.Stats,
acc inputs.Accumulator,
tags map[string]string,
) {
now := stat.Read
memfields := map[string]interface{}{
"max_usage": stat.MemoryStats.MaxUsage,
"usage": stat.MemoryStats.Usage,
"fail_count": stat.MemoryStats.Failcnt,
"limit": stat.MemoryStats.Limit,
"total_pgmafault": stat.MemoryStats.Stats.TotalPgmafault,
"cache": stat.MemoryStats.Stats.Cache,
"mapped_file": stat.MemoryStats.Stats.MappedFile,
"total_inactive_file": stat.MemoryStats.Stats.TotalInactiveFile,
"pgpgout": stat.MemoryStats.Stats.Pgpgout,
"rss": stat.MemoryStats.Stats.Rss,
"total_mapped_file": stat.MemoryStats.Stats.TotalMappedFile,
"writeback": stat.MemoryStats.Stats.Writeback,
"unevictable": stat.MemoryStats.Stats.Unevictable,
"pgpgin": stat.MemoryStats.Stats.Pgpgin,
"total_unevictable": stat.MemoryStats.Stats.TotalUnevictable,
"pgmajfault": stat.MemoryStats.Stats.Pgmajfault,
"total_rss": stat.MemoryStats.Stats.TotalRss,
"total_rss_huge": stat.MemoryStats.Stats.TotalRssHuge,
"total_writeback": stat.MemoryStats.Stats.TotalWriteback,
"total_inactive_anon": stat.MemoryStats.Stats.TotalInactiveAnon,
"rss_huge": stat.MemoryStats.Stats.RssHuge,
"hierarchical_memory_limit": stat.MemoryStats.Stats.HierarchicalMemoryLimit,
"total_pgfault": stat.MemoryStats.Stats.TotalPgfault,
"total_active_file": stat.MemoryStats.Stats.TotalActiveFile,
"active_anon": stat.MemoryStats.Stats.ActiveAnon,
"total_active_anon": stat.MemoryStats.Stats.TotalActiveAnon,
"total_pgpgout": stat.MemoryStats.Stats.TotalPgpgout,
"total_cache": stat.MemoryStats.Stats.TotalCache,
"inactive_anon": stat.MemoryStats.Stats.InactiveAnon,
"active_file": stat.MemoryStats.Stats.ActiveFile,
"pgfault": stat.MemoryStats.Stats.Pgfault,
"inactive_file": stat.MemoryStats.Stats.InactiveFile,
"total_pgpgin": stat.MemoryStats.Stats.TotalPgpgin,
}
acc.AddFields("docker_mem", memfields, tags, now)
cpufields := map[string]interface{}{
"usage_total": stat.CPUStats.CPUUsage.TotalUsage,
"usage_in_usermode": stat.CPUStats.CPUUsage.UsageInUsermode,
"usage_in_kernelmode": stat.CPUStats.CPUUsage.UsageInKernelmode,
"usage_system": stat.CPUStats.SystemCPUUsage,
"throttling_periods": stat.CPUStats.ThrottlingData.Periods,
"throttling_throttled_periods": stat.CPUStats.ThrottlingData.ThrottledPeriods,
"throttling_throttled_time": stat.CPUStats.ThrottlingData.ThrottledTime,
}
cputags := copyTags(tags)
cputags["cpu"] = "cpu-total"
acc.AddFields("docker_cpu", cpufields, cputags, now)
for i, percpu := range stat.CPUStats.CPUUsage.PercpuUsage {
percputags := copyTags(tags)
percputags["cpu"] = fmt.Sprintf("cpu%d", i)
acc.AddFields("docker_cpu", map[string]interface{}{"usage_total": percpu}, percputags, now)
}
for network, netstats := range stat.Networks {
netfields := map[string]interface{}{
"rx_dropped": netstats.RxDropped,
"rx_bytes": netstats.RxBytes,
"rx_errors": netstats.RxErrors,
"tx_packets": netstats.TxPackets,
"tx_dropped": netstats.TxDropped,
"rx_packets": netstats.RxPackets,
"tx_errors": netstats.TxErrors,
"tx_bytes": netstats.TxBytes,
}
// Create a new network tag dictionary for the "network" tag
nettags := copyTags(tags)
nettags["network"] = network
acc.AddFields("docker_net", netfields, nettags, now)
}
gatherBlockIOMetrics(stat, acc, tags, now)
}
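
// gatherBlockIOMetrics groups the per-device blkio counters into a single
// docker_blkio point per device.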
func gatherBlockIOMetrics(
stat *docker.Stats,
acc inputs.Accumulator,
tags map[string]string,
now time.Time,
) {
blkioStats := stat.BlkioStats
// Make a map of devices to their block io stats
deviceStatMap := make(map[string]map[string]interface{})
for _, metric := range blkioStats.IOServiceBytesRecursive {
device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
_, ok := deviceStatMap[device]
if !ok {
deviceStatMap[device] = make(map[string]interface{})
}
field := fmt.Sprintf("io_service_bytes_recursive_%s", strings.ToLower(metric.Op))
deviceStatMap[device][field] = metric.Value
}
for _, metric := range blkioStats.IOServicedRecursive {
device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
_, ok := deviceStatMap[device]
if !ok {
deviceStatMap[device] = make(map[string]interface{})
}
field := fmt.Sprintf("io_serviced_recursive_%s", strings.ToLower(metric.Op))
deviceStatMap[device][field] = metric.Value
}
	// These counters can reference a device that did not appear above, so
	// create the per-device map on demand before assigning into it.
	for _, metric := range blkioStats.IOQueueRecursive {
		device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
		if _, ok := deviceStatMap[device]; !ok {
			deviceStatMap[device] = make(map[string]interface{})
		}
		field := fmt.Sprintf("io_queue_recursive_%s", strings.ToLower(metric.Op))
		deviceStatMap[device][field] = metric.Value
	}
	for _, metric := range blkioStats.IOServiceTimeRecursive {
		device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
		if _, ok := deviceStatMap[device]; !ok {
			deviceStatMap[device] = make(map[string]interface{})
		}
		field := fmt.Sprintf("io_service_time_recursive_%s", strings.ToLower(metric.Op))
		deviceStatMap[device][field] = metric.Value
	}
	for _, metric := range blkioStats.IOWaitTimeRecursive {
		device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
		if _, ok := deviceStatMap[device]; !ok {
			deviceStatMap[device] = make(map[string]interface{})
		}
		field := fmt.Sprintf("io_wait_time_%s", strings.ToLower(metric.Op))
		deviceStatMap[device][field] = metric.Value
	}
	for _, metric := range blkioStats.IOMergedRecursive {
		device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
		if _, ok := deviceStatMap[device]; !ok {
			deviceStatMap[device] = make(map[string]interface{})
		}
		field := fmt.Sprintf("io_merged_recursive_%s", strings.ToLower(metric.Op))
		deviceStatMap[device][field] = metric.Value
	}
	for _, metric := range blkioStats.IOTimeRecursive {
		device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
		if _, ok := deviceStatMap[device]; !ok {
			deviceStatMap[device] = make(map[string]interface{})
		}
		field := fmt.Sprintf("io_time_recursive_%s", strings.ToLower(metric.Op))
		deviceStatMap[device][field] = metric.Value
	}
	for _, metric := range blkioStats.SectorsRecursive {
		device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
		if _, ok := deviceStatMap[device]; !ok {
			deviceStatMap[device] = make(map[string]interface{})
		}
		field := fmt.Sprintf("sectors_recursive_%s", strings.ToLower(metric.Op))
		deviceStatMap[device][field] = metric.Value
	}
for device, fields := range deviceStatMap {
iotags := copyTags(tags)
iotags["device"] = device
acc.AddFields("docker_blkio", fields, iotags, now)
}
}
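
// copyTags returns a shallow copy of the given tag map, so per-measurement
// tags (cpu, network, device) can be added without mutating the shared set.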
func copyTags(in map[string]string) map[string]string {
out := make(map[string]string)
for k, v := range in {
out[k] = v
}
return out
}
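
// sliceContains reports whether the string in is an element of sl.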
func sliceContains(in string, sl []string) bool {
for _, str := range sl {
if str == in {
return true
}
}
return false
}
func init() {
inputs.Add("docker", func() inputs.Input {
return &Docker{}
})
}


@@ -0,0 +1,190 @@
package system

import (
	"testing"
	"time"

	"github.com/influxdata/telegraf/testutil"

	"github.com/fsouza/go-dockerclient"
)

func TestDockerGatherContainerStats(t *testing.T) {
var acc testutil.Accumulator
stats := testStats()
tags := map[string]string{
"cont_id": "foobarbaz",
"cont_name": "redis",
"cont_image": "redis/image",
}
gatherContainerStats(stats, &acc, tags)
// test docker_net measurement
netfields := map[string]interface{}{
"rx_dropped": uint64(1),
"rx_bytes": uint64(2),
"rx_errors": uint64(3),
"tx_packets": uint64(4),
"tx_dropped": uint64(1),
"rx_packets": uint64(2),
"tx_errors": uint64(3),
"tx_bytes": uint64(4),
}
nettags := copyTags(tags)
nettags["network"] = "eth0"
acc.AssertContainsTaggedFields(t, "docker_net", netfields, nettags)
// test docker_blkio measurement
blkiotags := copyTags(tags)
blkiotags["device"] = "6:0"
blkiofields := map[string]interface{}{
"io_service_bytes_recursive_read": uint64(100),
"io_serviced_recursive_write": uint64(101),
}
acc.AssertContainsTaggedFields(t, "docker_blkio", blkiofields, blkiotags)
// test docker_mem measurement
memfields := map[string]interface{}{
"max_usage": uint64(1001),
"usage": uint64(1111),
"fail_count": uint64(1),
"limit": uint64(20),
"total_pgmafault": uint64(0),
"cache": uint64(0),
"mapped_file": uint64(0),
"total_inactive_file": uint64(0),
"pgpgout": uint64(0),
"rss": uint64(0),
"total_mapped_file": uint64(0),
"writeback": uint64(0),
"unevictable": uint64(0),
"pgpgin": uint64(0),
"total_unevictable": uint64(0),
"pgmajfault": uint64(0),
"total_rss": uint64(44),
"total_rss_huge": uint64(444),
"total_writeback": uint64(55),
"total_inactive_anon": uint64(0),
"rss_huge": uint64(0),
"hierarchical_memory_limit": uint64(0),
"total_pgfault": uint64(0),
"total_active_file": uint64(0),
"active_anon": uint64(0),
"total_active_anon": uint64(0),
"total_pgpgout": uint64(0),
"total_cache": uint64(0),
"inactive_anon": uint64(0),
"active_file": uint64(1),
"pgfault": uint64(2),
"inactive_file": uint64(3),
"total_pgpgin": uint64(4),
}
acc.AssertContainsTaggedFields(t, "docker_mem", memfields, tags)
// test docker_cpu measurement
cputags := copyTags(tags)
cputags["cpu"] = "cpu-total"
cpufields := map[string]interface{}{
"usage_total": uint64(500),
"usage_in_usermode": uint64(100),
"usage_in_kernelmode": uint64(200),
"usage_system": uint64(100),
"throttling_periods": uint64(1),
"throttling_throttled_periods": uint64(0),
"throttling_throttled_time": uint64(0),
}
acc.AssertContainsTaggedFields(t, "docker_cpu", cpufields, cputags)
cputags["cpu"] = "cpu0"
cpu0fields := map[string]interface{}{
"usage_total": uint64(1),
}
acc.AssertContainsTaggedFields(t, "docker_cpu", cpu0fields, cputags)
cputags["cpu"] = "cpu1"
cpu1fields := map[string]interface{}{
"usage_total": uint64(1002),
}
acc.AssertContainsTaggedFields(t, "docker_cpu", cpu1fields, cputags)
}
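
// testStats builds a synthetic docker.Stats fixture covering the memory,
// CPU, network, and blkio fields asserted in the test above.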
func testStats() *docker.Stats {
stats := &docker.Stats{
Read: time.Now(),
Networks: make(map[string]docker.NetworkStats),
}
stats.CPUStats.CPUUsage.PercpuUsage = []uint64{1, 1002}
stats.CPUStats.CPUUsage.UsageInUsermode = 100
stats.CPUStats.CPUUsage.TotalUsage = 500
stats.CPUStats.CPUUsage.UsageInKernelmode = 200
stats.CPUStats.SystemCPUUsage = 100
stats.CPUStats.ThrottlingData.Periods = 1
stats.MemoryStats.Stats.TotalPgmafault = 0
stats.MemoryStats.Stats.Cache = 0
stats.MemoryStats.Stats.MappedFile = 0
stats.MemoryStats.Stats.TotalInactiveFile = 0
stats.MemoryStats.Stats.Pgpgout = 0
stats.MemoryStats.Stats.Rss = 0
stats.MemoryStats.Stats.TotalMappedFile = 0
stats.MemoryStats.Stats.Writeback = 0
stats.MemoryStats.Stats.Unevictable = 0
stats.MemoryStats.Stats.Pgpgin = 0
stats.MemoryStats.Stats.TotalUnevictable = 0
stats.MemoryStats.Stats.Pgmajfault = 0
stats.MemoryStats.Stats.TotalRss = 44
stats.MemoryStats.Stats.TotalRssHuge = 444
stats.MemoryStats.Stats.TotalWriteback = 55
stats.MemoryStats.Stats.TotalInactiveAnon = 0
stats.MemoryStats.Stats.RssHuge = 0
stats.MemoryStats.Stats.HierarchicalMemoryLimit = 0
stats.MemoryStats.Stats.TotalPgfault = 0
stats.MemoryStats.Stats.TotalActiveFile = 0
stats.MemoryStats.Stats.ActiveAnon = 0
stats.MemoryStats.Stats.TotalActiveAnon = 0
stats.MemoryStats.Stats.TotalPgpgout = 0
stats.MemoryStats.Stats.TotalCache = 0
stats.MemoryStats.Stats.InactiveAnon = 0
stats.MemoryStats.Stats.ActiveFile = 1
stats.MemoryStats.Stats.Pgfault = 2
stats.MemoryStats.Stats.InactiveFile = 3
stats.MemoryStats.Stats.TotalPgpgin = 4
stats.MemoryStats.MaxUsage = 1001
stats.MemoryStats.Usage = 1111
stats.MemoryStats.Failcnt = 1
stats.MemoryStats.Limit = 20
stats.Networks["eth0"] = docker.NetworkStats{
RxDropped: 1,
RxBytes: 2,
RxErrors: 3,
TxPackets: 4,
TxDropped: 1,
RxPackets: 2,
TxErrors: 3,
TxBytes: 4,
}
sbr := docker.BlkioStatsEntry{
Major: 6,
Minor: 0,
Op: "read",
Value: 100,
}
sr := docker.BlkioStatsEntry{
Major: 6,
Minor: 0,
Op: "write",
Value: 101,
}
stats.BlkioStats.IOServiceBytesRecursive = append(
stats.BlkioStats.IOServiceBytesRecursive, sbr)
stats.BlkioStats.IOServicedRecursive = append(
stats.BlkioStats.IOServicedRecursive, sr)
return stats
}