input(docker): docker/engine-api

* Made required changes to get it to compile
* First manual tests looking good, still unit tests need fixing
* Made go linter happier
This commit is contained in:
Sergio Jimenez 2016-04-03 00:34:34 +02:00
parent 357849c348
commit d3636b5f0b
1 changed files with 89 additions and 95 deletions

View File

@ -3,6 +3,7 @@ package system
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io"
"log" "log"
"regexp" "regexp"
"strconv" "strconv"
@ -10,12 +11,15 @@ import (
"sync" "sync"
"time" "time"
"golang.org/x/net/context"
"github.com/docker/engine-api/client"
"github.com/docker/engine-api/types"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
"github.com/fsouza/go-dockerclient"
) )
// Docker object
type Docker struct { type Docker struct {
Endpoint string Endpoint string
ContainerNames []string ContainerNames []string
@ -23,14 +27,14 @@ type Docker struct {
client DockerClient client DockerClient
} }
// DockerClient interface, useful for testing
type DockerClient interface { type DockerClient interface {
// Docker Client wrapper Info(ctx context.Context) (types.Info, error)
// Useful for test ContainerList(ctx context.Context, options types.ContainerListOptions) ([]types.Container, error)
Info() (*docker.Env, error) ContainerStats(ctx context.Context, containerID string, stream bool) (io.ReadCloser, error)
ListContainers(opts docker.ListContainersOptions) ([]docker.APIContainers, error)
Stats(opts docker.StatsOptions) error
} }
// KB, MB, GB, TB, PB...human friendly
const ( const (
KB = 1000 KB = 1000
MB = 1000 * KB MB = 1000 * KB
@ -52,28 +56,32 @@ var sampleConfig = `
container_names = [] container_names = []
` `
// Description returns input description
func (d *Docker) Description() string { func (d *Docker) Description() string {
return "Read metrics about docker containers" return "Read metrics about docker containers"
} }
// SampleConfig prints sampleConfig
func (d *Docker) SampleConfig() string { return sampleConfig } func (d *Docker) SampleConfig() string { return sampleConfig }
// Gather starts stats collection
func (d *Docker) Gather(acc telegraf.Accumulator) error { func (d *Docker) Gather(acc telegraf.Accumulator) error {
if d.client == nil { if d.client == nil {
var c *docker.Client var c *client.Client
var err error var err error
defaultHeaders := map[string]string{"User-Agent": "engine-api-cli-1.0"}
if d.Endpoint == "ENV" { if d.Endpoint == "ENV" {
c, err = docker.NewClientFromEnv() c, err = client.NewEnvClient()
if err != nil { if err != nil {
return err return err
} }
} else if d.Endpoint == "" { } else if d.Endpoint == "" {
c, err = docker.NewClient("unix:///var/run/docker.sock") c, err = client.NewClient("unix:///var/run/docker.sock", "", nil, defaultHeaders)
if err != nil { if err != nil {
return err return err
} }
} else { } else {
c, err = docker.NewClient(d.Endpoint) c, err = client.NewClient(d.Endpoint, "", nil, defaultHeaders)
if err != nil { if err != nil {
return err return err
} }
@ -88,8 +96,8 @@ func (d *Docker) Gather(acc telegraf.Accumulator) error {
} }
// List containers // List containers
opts := docker.ListContainersOptions{} opts := types.ContainerListOptions{}
containers, err := d.client.ListContainers(opts) containers, err := d.client.ContainerList(context.Background(), opts)
if err != nil { if err != nil {
return err return err
} }
@ -99,7 +107,7 @@ func (d *Docker) Gather(acc telegraf.Accumulator) error {
wg.Add(len(containers)) wg.Add(len(containers))
for _, container := range containers { for _, container := range containers {
go func(c docker.APIContainers) { go func(c types.Container) {
defer wg.Done() defer wg.Done()
err := d.gatherContainer(c, acc) err := d.gatherContainer(c, acc)
if err != nil { if err != nil {
@ -114,23 +122,22 @@ func (d *Docker) Gather(acc telegraf.Accumulator) error {
func (d *Docker) gatherInfo(acc telegraf.Accumulator) error { func (d *Docker) gatherInfo(acc telegraf.Accumulator) error {
// Init vars // Init vars
var driverStatus [][]string
dataFields := make(map[string]interface{}) dataFields := make(map[string]interface{})
metadataFields := make(map[string]interface{}) metadataFields := make(map[string]interface{})
now := time.Now() now := time.Now()
// Get info from docker daemon // Get info from docker daemon
info, err := d.client.Info() info, err := d.client.Info(context.Background())
if err != nil { if err != nil {
return err return err
} }
fields := map[string]interface{}{ fields := map[string]interface{}{
"n_cpus": info.GetInt64("NCPU"), "n_cpus": info.NCPU,
"n_used_file_descriptors": info.GetInt64("NFd"), "n_used_file_descriptors": info.NFd,
"n_containers": info.GetInt64("Containers"), "n_containers": info.Containers,
"n_images": info.GetInt64("Images"), "n_images": info.Images,
"n_goroutines": info.GetInt64("NGoroutines"), "n_goroutines": info.NGoroutines,
"n_listener_events": info.GetInt64("NEventsListener"), "n_listener_events": info.NEventsListener,
} }
// Add metrics // Add metrics
acc.AddFields("docker", acc.AddFields("docker",
@ -138,13 +145,13 @@ func (d *Docker) gatherInfo(acc telegraf.Accumulator) error {
nil, nil,
now) now)
acc.AddFields("docker", acc.AddFields("docker",
map[string]interface{}{"memory_total": info.GetInt64("MemTotal")}, map[string]interface{}{"memory_total": info.MemTotal},
map[string]string{"unit": "bytes"}, map[string]string{"unit": "bytes"},
now) now)
// Get storage metrics // Get storage metrics
driverStatusRaw := []byte(info.Get("DriverStatus")) //driverStatusRaw := []byte(info.DriverStatus)
json.Unmarshal(driverStatusRaw, &driverStatus) //json.Unmarshal(driverStatusRaw, &driverStatus)
for _, rawData := range driverStatus { for _, rawData := range info.DriverStatus {
// Try to convert string to int (bytes) // Try to convert string to int (bytes)
value, err := parseSize(rawData[1]) value, err := parseSize(rawData[1])
if err != nil { if err != nil {
@ -159,12 +166,12 @@ func (d *Docker) gatherInfo(acc telegraf.Accumulator) error {
now) now)
} else if strings.HasPrefix(name, "data_space_") { } else if strings.HasPrefix(name, "data_space_") {
// data space // data space
field_name := strings.TrimPrefix(name, "data_space_") fieldName := strings.TrimPrefix(name, "data_space_")
dataFields[field_name] = value dataFields[fieldName] = value
} else if strings.HasPrefix(name, "metadata_space_") { } else if strings.HasPrefix(name, "metadata_space_") {
// metadata space // metadata space
field_name := strings.TrimPrefix(name, "metadata_space_") fieldName := strings.TrimPrefix(name, "metadata_space_")
metadataFields[field_name] = value metadataFields[fieldName] = value
} }
} }
if len(dataFields) > 0 { if len(dataFields) > 0 {
@ -183,9 +190,10 @@ func (d *Docker) gatherInfo(acc telegraf.Accumulator) error {
} }
func (d *Docker) gatherContainer( func (d *Docker) gatherContainer(
container docker.APIContainers, container types.Container,
acc telegraf.Accumulator, acc telegraf.Accumulator,
) error { ) error {
var v *types.StatsJSON
// Parse container name // Parse container name
cname := "unknown" cname := "unknown"
if len(container.Names) > 0 { if len(container.Names) > 0 {
@ -204,28 +212,14 @@ func (d *Docker) gatherContainer(
} }
} }
statChan := make(chan *docker.Stats) r, err := d.client.ContainerStats(context.Background(), container.ID, false)
done := make(chan bool) if err != nil {
statOpts := docker.StatsOptions{ log.Printf("Error getting docker stats: %s\n", err.Error())
Stream: false,
ID: container.ID,
Stats: statChan,
Done: done,
Timeout: time.Duration(time.Second * 5),
} }
defer r.Close()
go func() { dec := json.NewDecoder(r)
err := d.client.Stats(statOpts) if err = dec.Decode(&v); err != nil {
if err != nil { log.Printf("Error decoding: %s\n", err.Error())
log.Printf("Error getting docker stats: %s\n", err.Error())
}
}()
stat := <-statChan
close(done)
if stat == nil {
return nil
} }
// Add labels to tags // Add labels to tags
@ -233,13 +227,13 @@ func (d *Docker) gatherContainer(
tags[k] = v tags[k] = v
} }
gatherContainerStats(stat, acc, tags) gatherContainerStats(v, acc, tags)
return nil return nil
} }
func gatherContainerStats( func gatherContainerStats(
stat *docker.Stats, stat *types.StatsJSON,
acc telegraf.Accumulator, acc telegraf.Accumulator,
tags map[string]string, tags map[string]string,
) { ) {
@ -250,35 +244,35 @@ func gatherContainerStats(
"usage": stat.MemoryStats.Usage, "usage": stat.MemoryStats.Usage,
"fail_count": stat.MemoryStats.Failcnt, "fail_count": stat.MemoryStats.Failcnt,
"limit": stat.MemoryStats.Limit, "limit": stat.MemoryStats.Limit,
"total_pgmafault": stat.MemoryStats.Stats.TotalPgmafault, "total_pgmafault": stat.MemoryStats.Stats["total_pgmajfault"],
"cache": stat.MemoryStats.Stats.Cache, "cache": stat.MemoryStats.Stats["cache"],
"mapped_file": stat.MemoryStats.Stats.MappedFile, "mapped_file": stat.MemoryStats.Stats["mapped_file"],
"total_inactive_file": stat.MemoryStats.Stats.TotalInactiveFile, "total_inactive_file": stat.MemoryStats.Stats["total_inactive_file"],
"pgpgout": stat.MemoryStats.Stats.Pgpgout, "pgpgout": stat.MemoryStats.Stats["pagpgout"],
"rss": stat.MemoryStats.Stats.Rss, "rss": stat.MemoryStats.Stats["rss"],
"total_mapped_file": stat.MemoryStats.Stats.TotalMappedFile, "total_mapped_file": stat.MemoryStats.Stats["total_mapped_file"],
"writeback": stat.MemoryStats.Stats.Writeback, "writeback": stat.MemoryStats.Stats["writeback"],
"unevictable": stat.MemoryStats.Stats.Unevictable, "unevictable": stat.MemoryStats.Stats["unevictable"],
"pgpgin": stat.MemoryStats.Stats.Pgpgin, "pgpgin": stat.MemoryStats.Stats["pgpgin"],
"total_unevictable": stat.MemoryStats.Stats.TotalUnevictable, "total_unevictable": stat.MemoryStats.Stats["total_unevictable"],
"pgmajfault": stat.MemoryStats.Stats.Pgmajfault, "pgmajfault": stat.MemoryStats.Stats["pgmajfault"],
"total_rss": stat.MemoryStats.Stats.TotalRss, "total_rss": stat.MemoryStats.Stats["total_rss"],
"total_rss_huge": stat.MemoryStats.Stats.TotalRssHuge, "total_rss_huge": stat.MemoryStats.Stats["total_rss_huge"],
"total_writeback": stat.MemoryStats.Stats.TotalWriteback, "total_writeback": stat.MemoryStats.Stats["total_write_back"],
"total_inactive_anon": stat.MemoryStats.Stats.TotalInactiveAnon, "total_inactive_anon": stat.MemoryStats.Stats["total_inactive_anon"],
"rss_huge": stat.MemoryStats.Stats.RssHuge, "rss_huge": stat.MemoryStats.Stats["rss_huge"],
"hierarchical_memory_limit": stat.MemoryStats.Stats.HierarchicalMemoryLimit, "hierarchical_memory_limit": stat.MemoryStats.Stats["hierarchical_memory_limit"],
"total_pgfault": stat.MemoryStats.Stats.TotalPgfault, "total_pgfault": stat.MemoryStats.Stats["total_pgfault"],
"total_active_file": stat.MemoryStats.Stats.TotalActiveFile, "total_active_file": stat.MemoryStats.Stats["total_active_file"],
"active_anon": stat.MemoryStats.Stats.ActiveAnon, "active_anon": stat.MemoryStats.Stats["active_anon"],
"total_active_anon": stat.MemoryStats.Stats.TotalActiveAnon, "total_active_anon": stat.MemoryStats.Stats["total_active_anon"],
"total_pgpgout": stat.MemoryStats.Stats.TotalPgpgout, "total_pgpgout": stat.MemoryStats.Stats["total_pgpgout"],
"total_cache": stat.MemoryStats.Stats.TotalCache, "total_cache": stat.MemoryStats.Stats["total_cache"],
"inactive_anon": stat.MemoryStats.Stats.InactiveAnon, "inactive_anon": stat.MemoryStats.Stats["inactive_anon"],
"active_file": stat.MemoryStats.Stats.ActiveFile, "active_file": stat.MemoryStats.Stats["active_file"],
"pgfault": stat.MemoryStats.Stats.Pgfault, "pgfault": stat.MemoryStats.Stats["pgfault"],
"inactive_file": stat.MemoryStats.Stats.InactiveFile, "inactive_file": stat.MemoryStats.Stats["inactive_file"],
"total_pgpgin": stat.MemoryStats.Stats.TotalPgpgin, "total_pgpgin": stat.MemoryStats.Stats["total_pgpgin"],
"usage_percent": calculateMemPercent(stat), "usage_percent": calculateMemPercent(stat),
} }
acc.AddFields("docker_mem", memfields, tags, now) acc.AddFields("docker_mem", memfields, tags, now)
@ -287,7 +281,7 @@ func gatherContainerStats(
"usage_total": stat.CPUStats.CPUUsage.TotalUsage, "usage_total": stat.CPUStats.CPUUsage.TotalUsage,
"usage_in_usermode": stat.CPUStats.CPUUsage.UsageInUsermode, "usage_in_usermode": stat.CPUStats.CPUUsage.UsageInUsermode,
"usage_in_kernelmode": stat.CPUStats.CPUUsage.UsageInKernelmode, "usage_in_kernelmode": stat.CPUStats.CPUUsage.UsageInKernelmode,
"usage_system": stat.CPUStats.SystemCPUUsage, "usage_system": stat.CPUStats.SystemUsage,
"throttling_periods": stat.CPUStats.ThrottlingData.Periods, "throttling_periods": stat.CPUStats.ThrottlingData.Periods,
"throttling_throttled_periods": stat.CPUStats.ThrottlingData.ThrottledPeriods, "throttling_throttled_periods": stat.CPUStats.ThrottlingData.ThrottledPeriods,
"throttling_throttled_time": stat.CPUStats.ThrottlingData.ThrottledTime, "throttling_throttled_time": stat.CPUStats.ThrottlingData.ThrottledTime,
@ -323,7 +317,7 @@ func gatherContainerStats(
gatherBlockIOMetrics(stat, acc, tags, now) gatherBlockIOMetrics(stat, acc, tags, now)
} }
func calculateMemPercent(stat *docker.Stats) float64 { func calculateMemPercent(stat *types.StatsJSON) float64 {
var memPercent = 0.0 var memPercent = 0.0
if stat.MemoryStats.Limit > 0 { if stat.MemoryStats.Limit > 0 {
memPercent = float64(stat.MemoryStats.Usage) / float64(stat.MemoryStats.Limit) * 100.0 memPercent = float64(stat.MemoryStats.Usage) / float64(stat.MemoryStats.Limit) * 100.0
@ -331,11 +325,11 @@ func calculateMemPercent(stat *docker.Stats) float64 {
return memPercent return memPercent
} }
func calculateCPUPercent(stat *docker.Stats) float64 { func calculateCPUPercent(stat *types.StatsJSON) float64 {
var cpuPercent = 0.0 var cpuPercent = 0.0
// calculate the change for the cpu and system usage of the container in between readings // calculate the change for the cpu and system usage of the container in between readings
cpuDelta := float64(stat.CPUStats.CPUUsage.TotalUsage) - float64(stat.PreCPUStats.CPUUsage.TotalUsage) cpuDelta := float64(stat.CPUStats.CPUUsage.TotalUsage) - float64(stat.PreCPUStats.CPUUsage.TotalUsage)
systemDelta := float64(stat.CPUStats.SystemCPUUsage) - float64(stat.PreCPUStats.SystemCPUUsage) systemDelta := float64(stat.CPUStats.SystemUsage) - float64(stat.PreCPUStats.SystemUsage)
if systemDelta > 0.0 && cpuDelta > 0.0 { if systemDelta > 0.0 && cpuDelta > 0.0 {
cpuPercent = (cpuDelta / systemDelta) * float64(len(stat.CPUStats.CPUUsage.PercpuUsage)) * 100.0 cpuPercent = (cpuDelta / systemDelta) * float64(len(stat.CPUStats.CPUUsage.PercpuUsage)) * 100.0
@ -344,7 +338,7 @@ func calculateCPUPercent(stat *docker.Stats) float64 {
} }
func gatherBlockIOMetrics( func gatherBlockIOMetrics(
stat *docker.Stats, stat *types.StatsJSON,
acc telegraf.Accumulator, acc telegraf.Accumulator,
tags map[string]string, tags map[string]string,
now time.Time, now time.Time,
@ -353,7 +347,7 @@ func gatherBlockIOMetrics(
// Make a map of devices to their block io stats // Make a map of devices to their block io stats
deviceStatMap := make(map[string]map[string]interface{}) deviceStatMap := make(map[string]map[string]interface{})
for _, metric := range blkioStats.IOServiceBytesRecursive { for _, metric := range blkioStats.IoServiceBytesRecursive {
device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor) device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
_, ok := deviceStatMap[device] _, ok := deviceStatMap[device]
if !ok { if !ok {
@ -364,7 +358,7 @@ func gatherBlockIOMetrics(
deviceStatMap[device][field] = metric.Value deviceStatMap[device][field] = metric.Value
} }
for _, metric := range blkioStats.IOServicedRecursive { for _, metric := range blkioStats.IoServicedRecursive {
device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor) device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
_, ok := deviceStatMap[device] _, ok := deviceStatMap[device]
if !ok { if !ok {
@ -375,31 +369,31 @@ func gatherBlockIOMetrics(
deviceStatMap[device][field] = metric.Value deviceStatMap[device][field] = metric.Value
} }
for _, metric := range blkioStats.IOQueueRecursive { for _, metric := range blkioStats.IoQueuedRecursive {
device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor) device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
field := fmt.Sprintf("io_queue_recursive_%s", strings.ToLower(metric.Op)) field := fmt.Sprintf("io_queue_recursive_%s", strings.ToLower(metric.Op))
deviceStatMap[device][field] = metric.Value deviceStatMap[device][field] = metric.Value
} }
for _, metric := range blkioStats.IOServiceTimeRecursive { for _, metric := range blkioStats.IoServiceTimeRecursive {
device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor) device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
field := fmt.Sprintf("io_service_time_recursive_%s", strings.ToLower(metric.Op)) field := fmt.Sprintf("io_service_time_recursive_%s", strings.ToLower(metric.Op))
deviceStatMap[device][field] = metric.Value deviceStatMap[device][field] = metric.Value
} }
for _, metric := range blkioStats.IOWaitTimeRecursive { for _, metric := range blkioStats.IoWaitTimeRecursive {
device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor) device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
field := fmt.Sprintf("io_wait_time_%s", strings.ToLower(metric.Op)) field := fmt.Sprintf("io_wait_time_%s", strings.ToLower(metric.Op))
deviceStatMap[device][field] = metric.Value deviceStatMap[device][field] = metric.Value
} }
for _, metric := range blkioStats.IOMergedRecursive { for _, metric := range blkioStats.IoMergedRecursive {
device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor) device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
field := fmt.Sprintf("io_merged_recursive_%s", strings.ToLower(metric.Op)) field := fmt.Sprintf("io_merged_recursive_%s", strings.ToLower(metric.Op))
deviceStatMap[device][field] = metric.Value deviceStatMap[device][field] = metric.Value
} }
for _, metric := range blkioStats.IOTimeRecursive { for _, metric := range blkioStats.IoTimeRecursive {
device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor) device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
field := fmt.Sprintf("io_time_recursive_%s", strings.ToLower(metric.Op)) field := fmt.Sprintf("io_time_recursive_%s", strings.ToLower(metric.Op))
deviceStatMap[device][field] = metric.Value deviceStatMap[device][field] = metric.Value