diff --git a/plugins/inputs/docker/docker.go b/plugins/inputs/docker/docker.go index cdc8ec1e5..b1bbc63b3 100644 --- a/plugins/inputs/docker/docker.go +++ b/plugins/inputs/docker/docker.go @@ -3,6 +3,7 @@ package system import ( "encoding/json" "fmt" + "io" "log" "regexp" "strconv" @@ -10,12 +11,15 @@ import ( "sync" "time" + "golang.org/x/net/context" + + "github.com/docker/engine-api/client" + "github.com/docker/engine-api/types" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" - - "github.com/fsouza/go-dockerclient" ) +// Docker object type Docker struct { Endpoint string ContainerNames []string @@ -23,14 +27,14 @@ type Docker struct { client DockerClient } +// DockerClient interface, useful for testing type DockerClient interface { - // Docker Client wrapper - // Useful for test - Info() (*docker.Env, error) - ListContainers(opts docker.ListContainersOptions) ([]docker.APIContainers, error) - Stats(opts docker.StatsOptions) error + Info(ctx context.Context) (types.Info, error) + ContainerList(ctx context.Context, options types.ContainerListOptions) ([]types.Container, error) + ContainerStats(ctx context.Context, containerID string, stream bool) (io.ReadCloser, error) } +// KB, MB, GB, TB, PB...human friendly const ( KB = 1000 MB = 1000 * KB @@ -52,28 +56,32 @@ var sampleConfig = ` container_names = [] ` +// Description returns input description func (d *Docker) Description() string { return "Read metrics about docker containers" } +// SampleConfig prints sampleConfig func (d *Docker) SampleConfig() string { return sampleConfig } +// Gather starts stats collection func (d *Docker) Gather(acc telegraf.Accumulator) error { if d.client == nil { - var c *docker.Client + var c *client.Client var err error + defaultHeaders := map[string]string{"User-Agent": "engine-api-cli-1.0"} if d.Endpoint == "ENV" { - c, err = docker.NewClientFromEnv() + c, err = client.NewEnvClient() if err != nil { return err } } else if d.Endpoint == "" { - c, err = docker.NewClient("unix:///var/run/docker.sock") + c, err = client.NewClient("unix:///var/run/docker.sock", "", nil, defaultHeaders) if err != nil { return err } } else { - c, err = docker.NewClient(d.Endpoint) + c, err = client.NewClient(d.Endpoint, "", nil, defaultHeaders) if err != nil { return err } @@ -88,8 +96,8 @@ func (d *Docker) Gather(acc telegraf.Accumulator) error { } // List containers - opts := docker.ListContainersOptions{} - containers, err := d.client.ListContainers(opts) + opts := types.ContainerListOptions{} + containers, err := d.client.ContainerList(context.Background(), opts) if err != nil { return err } @@ -99,7 +107,7 @@ func (d *Docker) Gather(acc telegraf.Accumulator) error { wg.Add(len(containers)) for _, container := range containers { - go func(c docker.APIContainers) { + go func(c types.Container) { defer wg.Done() err := d.gatherContainer(c, acc) if err != nil { @@ -114,23 +122,22 @@ func (d *Docker) Gather(acc telegraf.Accumulator) error { func (d *Docker) gatherInfo(acc telegraf.Accumulator) error { // Init vars - var driverStatus [][]string dataFields := make(map[string]interface{}) metadataFields := make(map[string]interface{}) now := time.Now() // Get info from docker daemon - info, err := d.client.Info() + info, err := d.client.Info(context.Background()) if err != nil { return err } fields := map[string]interface{}{ - "n_cpus": info.GetInt64("NCPU"), - "n_used_file_descriptors": info.GetInt64("NFd"), - "n_containers": info.GetInt64("Containers"), - "n_images": info.GetInt64("Images"), - "n_goroutines": info.GetInt64("NGoroutines"), - "n_listener_events": info.GetInt64("NEventsListener"), + "n_cpus": info.NCPU, + "n_used_file_descriptors": info.NFd, + "n_containers": info.Containers, + "n_images": info.Images, + "n_goroutines": info.NGoroutines, + "n_listener_events": info.NEventsListener, } // Add metrics acc.AddFields("docker", @@ -138,13 +145,13 @@ func (d *Docker) gatherInfo(acc telegraf.Accumulator) error { nil, now) acc.AddFields("docker", - map[string]interface{}{"memory_total": info.GetInt64("MemTotal")}, + map[string]interface{}{"memory_total": info.MemTotal}, map[string]string{"unit": "bytes"}, now) // Get storage metrics - driverStatusRaw := []byte(info.Get("DriverStatus")) - json.Unmarshal(driverStatusRaw, &driverStatus) - for _, rawData := range driverStatus { + //driverStatusRaw := []byte(info.DriverStatus) + //json.Unmarshal(driverStatusRaw, &driverStatus) + for _, rawData := range info.DriverStatus { // Try to convert string to int (bytes) value, err := parseSize(rawData[1]) if err != nil { @@ -159,12 +166,12 @@ func (d *Docker) gatherInfo(acc telegraf.Accumulator) error { now) } else if strings.HasPrefix(name, "data_space_") { // data space - field_name := strings.TrimPrefix(name, "data_space_") - dataFields[field_name] = value + fieldName := strings.TrimPrefix(name, "data_space_") + dataFields[fieldName] = value } else if strings.HasPrefix(name, "metadata_space_") { // metadata space - field_name := strings.TrimPrefix(name, "metadata_space_") - metadataFields[field_name] = value + fieldName := strings.TrimPrefix(name, "metadata_space_") + metadataFields[fieldName] = value } } if len(dataFields) > 0 { @@ -183,9 +190,10 @@ func (d *Docker) gatherInfo(acc telegraf.Accumulator) error { } func (d *Docker) gatherContainer( - container docker.APIContainers, + container types.Container, acc telegraf.Accumulator, ) error { + var v *types.StatsJSON // Parse container name cname := "unknown" if len(container.Names) > 0 { @@ -204,28 +212,14 @@ func (d *Docker) gatherContainer( } } - statChan := make(chan *docker.Stats) - done := make(chan bool) - statOpts := docker.StatsOptions{ - Stream: false, - ID: container.ID, - Stats: statChan, - Done: done, - Timeout: time.Duration(time.Second * 5), + r, err := d.client.ContainerStats(context.Background(), container.ID, false) + if err != nil { + log.Printf("Error getting docker stats: %s\n", err.Error()) } - - go func() { - err := d.client.Stats(statOpts) - if err != nil { - log.Printf("Error getting docker stats: %s\n", err.Error()) - } - }() - - stat := <-statChan - close(done) - - if stat == nil { - return nil + defer r.Close() + dec := json.NewDecoder(r) + if err = dec.Decode(&v); err != nil { + log.Printf("Error decoding: %s\n", err.Error()) } // Add labels to tags @@ -233,13 +227,13 @@ func (d *Docker) gatherContainer( tags[k] = v } - gatherContainerStats(stat, acc, tags) + gatherContainerStats(v, acc, tags) return nil } func gatherContainerStats( - stat *docker.Stats, + stat *types.StatsJSON, acc telegraf.Accumulator, tags map[string]string, ) { @@ -250,35 +244,35 @@ func gatherContainerStats( "usage": stat.MemoryStats.Usage, "fail_count": stat.MemoryStats.Failcnt, "limit": stat.MemoryStats.Limit, - "total_pgmafault": stat.MemoryStats.Stats.TotalPgmafault, - "cache": stat.MemoryStats.Stats.Cache, - "mapped_file": stat.MemoryStats.Stats.MappedFile, - "total_inactive_file": stat.MemoryStats.Stats.TotalInactiveFile, - "pgpgout": stat.MemoryStats.Stats.Pgpgout, - "rss": stat.MemoryStats.Stats.Rss, - "total_mapped_file": stat.MemoryStats.Stats.TotalMappedFile, - "writeback": stat.MemoryStats.Stats.Writeback, - "unevictable": stat.MemoryStats.Stats.Unevictable, - "pgpgin": stat.MemoryStats.Stats.Pgpgin, - "total_unevictable": stat.MemoryStats.Stats.TotalUnevictable, - "pgmajfault": stat.MemoryStats.Stats.Pgmajfault, - "total_rss": stat.MemoryStats.Stats.TotalRss, - "total_rss_huge": stat.MemoryStats.Stats.TotalRssHuge, - "total_writeback": stat.MemoryStats.Stats.TotalWriteback, - "total_inactive_anon": stat.MemoryStats.Stats.TotalInactiveAnon, - "rss_huge": stat.MemoryStats.Stats.RssHuge, - "hierarchical_memory_limit": stat.MemoryStats.Stats.HierarchicalMemoryLimit, - "total_pgfault": stat.MemoryStats.Stats.TotalPgfault, - "total_active_file": stat.MemoryStats.Stats.TotalActiveFile, - "active_anon": stat.MemoryStats.Stats.ActiveAnon, - "total_active_anon": stat.MemoryStats.Stats.TotalActiveAnon, - "total_pgpgout": stat.MemoryStats.Stats.TotalPgpgout, - "total_cache": stat.MemoryStats.Stats.TotalCache, - "inactive_anon": stat.MemoryStats.Stats.InactiveAnon, - "active_file": stat.MemoryStats.Stats.ActiveFile, - "pgfault": stat.MemoryStats.Stats.Pgfault, - "inactive_file": stat.MemoryStats.Stats.InactiveFile, - "total_pgpgin": stat.MemoryStats.Stats.TotalPgpgin, + "total_pgmafault": stat.MemoryStats.Stats["total_pgmajfault"], + "cache": stat.MemoryStats.Stats["cache"], + "mapped_file": stat.MemoryStats.Stats["mapped_file"], + "total_inactive_file": stat.MemoryStats.Stats["total_inactive_file"], + "pgpgout": stat.MemoryStats.Stats["pagpgout"], + "rss": stat.MemoryStats.Stats["rss"], + "total_mapped_file": stat.MemoryStats.Stats["total_mapped_file"], + "writeback": stat.MemoryStats.Stats["writeback"], + "unevictable": stat.MemoryStats.Stats["unevictable"], + "pgpgin": stat.MemoryStats.Stats["pgpgin"], + "total_unevictable": stat.MemoryStats.Stats["total_unevictable"], + "pgmajfault": stat.MemoryStats.Stats["pgmajfault"], + "total_rss": stat.MemoryStats.Stats["total_rss"], + "total_rss_huge": stat.MemoryStats.Stats["total_rss_huge"], + "total_writeback": stat.MemoryStats.Stats["total_write_back"], + "total_inactive_anon": stat.MemoryStats.Stats["total_inactive_anon"], + "rss_huge": stat.MemoryStats.Stats["rss_huge"], + "hierarchical_memory_limit": stat.MemoryStats.Stats["hierarchical_memory_limit"], + "total_pgfault": stat.MemoryStats.Stats["total_pgfault"], + "total_active_file": stat.MemoryStats.Stats["total_active_file"], + "active_anon": stat.MemoryStats.Stats["active_anon"], + "total_active_anon": stat.MemoryStats.Stats["total_active_anon"], + "total_pgpgout": stat.MemoryStats.Stats["total_pgpgout"], + "total_cache": stat.MemoryStats.Stats["total_cache"], + "inactive_anon": stat.MemoryStats.Stats["inactive_anon"], + "active_file": stat.MemoryStats.Stats["active_file"], + "pgfault": stat.MemoryStats.Stats["pgfault"], + "inactive_file": stat.MemoryStats.Stats["inactive_file"], + "total_pgpgin": stat.MemoryStats.Stats["total_pgpgin"], "usage_percent": calculateMemPercent(stat), } acc.AddFields("docker_mem", memfields, tags, now) @@ -287,7 +281,7 @@ func gatherContainerStats( "usage_total": stat.CPUStats.CPUUsage.TotalUsage, "usage_in_usermode": stat.CPUStats.CPUUsage.UsageInUsermode, "usage_in_kernelmode": stat.CPUStats.CPUUsage.UsageInKernelmode, - "usage_system": stat.CPUStats.SystemCPUUsage, + "usage_system": stat.CPUStats.SystemUsage, "throttling_periods": stat.CPUStats.ThrottlingData.Periods, "throttling_throttled_periods": stat.CPUStats.ThrottlingData.ThrottledPeriods, "throttling_throttled_time": stat.CPUStats.ThrottlingData.ThrottledTime, @@ -323,7 +317,7 @@ func gatherContainerStats( gatherBlockIOMetrics(stat, acc, tags, now) } -func calculateMemPercent(stat *docker.Stats) float64 { +func calculateMemPercent(stat *types.StatsJSON) float64 { var memPercent = 0.0 if stat.MemoryStats.Limit > 0 { memPercent = float64(stat.MemoryStats.Usage) / float64(stat.MemoryStats.Limit) * 100.0 @@ -331,11 +325,11 @@ func calculateMemPercent(stat *docker.Stats) float64 { return memPercent } -func calculateCPUPercent(stat *docker.Stats) float64 { +func calculateCPUPercent(stat *types.StatsJSON) float64 { var cpuPercent = 0.0 // calculate the change for the cpu and system usage of the container in between readings cpuDelta := float64(stat.CPUStats.CPUUsage.TotalUsage) - float64(stat.PreCPUStats.CPUUsage.TotalUsage) - systemDelta := float64(stat.CPUStats.SystemCPUUsage) - float64(stat.PreCPUStats.SystemCPUUsage) + systemDelta := float64(stat.CPUStats.SystemUsage) - float64(stat.PreCPUStats.SystemUsage) if systemDelta > 0.0 && cpuDelta > 0.0 { cpuPercent = (cpuDelta / systemDelta) * float64(len(stat.CPUStats.CPUUsage.PercpuUsage)) * 100.0 @@ -344,7 +338,7 @@ func calculateCPUPercent(stat *docker.Stats) float64 { } func gatherBlockIOMetrics( - stat *docker.Stats, + stat *types.StatsJSON, acc telegraf.Accumulator, tags map[string]string, now time.Time, @@ -353,7 +347,7 @@ func gatherBlockIOMetrics( // Make a map of devices to their block io stats deviceStatMap := make(map[string]map[string]interface{}) - for _, metric := range blkioStats.IOServiceBytesRecursive { + for _, metric := range blkioStats.IoServiceBytesRecursive { device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor) _, ok := deviceStatMap[device] if !ok { @@ -364,7 +358,7 @@ func gatherBlockIOMetrics( deviceStatMap[device][field] = metric.Value } - for _, metric := range blkioStats.IOServicedRecursive { + for _, metric := range blkioStats.IoServicedRecursive { device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor) _, ok := deviceStatMap[device] if !ok { @@ -375,31 +369,31 @@ func gatherBlockIOMetrics( deviceStatMap[device][field] = metric.Value } - for _, metric := range blkioStats.IOQueueRecursive { + for _, metric := range blkioStats.IoQueuedRecursive { device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor) field := fmt.Sprintf("io_queue_recursive_%s", strings.ToLower(metric.Op)) deviceStatMap[device][field] = metric.Value } - for _, metric := range blkioStats.IOServiceTimeRecursive { + for _, metric := range blkioStats.IoServiceTimeRecursive { device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor) field := fmt.Sprintf("io_service_time_recursive_%s", strings.ToLower(metric.Op)) deviceStatMap[device][field] = metric.Value } - for _, metric := range blkioStats.IOWaitTimeRecursive { + for _, metric := range blkioStats.IoWaitTimeRecursive { device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor) field := fmt.Sprintf("io_wait_time_%s", strings.ToLower(metric.Op)) deviceStatMap[device][field] = metric.Value } - for _, metric := range blkioStats.IOMergedRecursive { + for _, metric := range blkioStats.IoMergedRecursive { device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor) field := fmt.Sprintf("io_merged_recursive_%s", strings.ToLower(metric.Op)) deviceStatMap[device][field] = metric.Value } - for _, metric := range blkioStats.IOTimeRecursive { + for _, metric := range blkioStats.IoTimeRecursive { device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor) field := fmt.Sprintf("io_time_recursive_%s", strings.ToLower(metric.Op)) deviceStatMap[device][field] = metric.Value