Improve docker input plugin

closes #754
This commit is contained in:
Thibault Cohen 2016-02-23 23:58:14 -05:00 committed by Michele Fadda
parent 466e1cc33f
commit 69735af530
4 changed files with 353 additions and 3 deletions

View File

@ -8,6 +8,7 @@
- [#758](https://github.com/influxdata/telegraf/pull/758): UDP Listener input plugin, thanks @whatyouhide! - [#758](https://github.com/influxdata/telegraf/pull/758): UDP Listener input plugin, thanks @whatyouhide!
- [#769](https://github.com/influxdata/telegraf/issues/769): httpjson plugin: allow specifying SSL configuration. - [#769](https://github.com/influxdata/telegraf/issues/769): httpjson plugin: allow specifying SSL configuration.
- [#735](https://github.com/influxdata/telegraf/pull/735): SNMP Table feature. Thanks @titilambert! - [#735](https://github.com/influxdata/telegraf/pull/735): SNMP Table feature. Thanks @titilambert!
- [#754](https://github.com/influxdata/telegraf/pull/754): docker plugin: adding `docker info` metrics to output. Thanks @titilambert!
### Bugfixes ### Bugfixes
- [#748](https://github.com/influxdata/telegraf/issues/748): Fix sensor plugin split on ":" - [#748](https://github.com/influxdata/telegraf/issues/748): Fix sensor plugin split on ":"

View File

@ -95,18 +95,50 @@ on the availability of per-cpu stats on your system.
- io_serviced_recursive_sync - io_serviced_recursive_sync
- io_serviced_recursive_total - io_serviced_recursive_total
- io_serviced_recursive_write - io_serviced_recursive_write
- docker_
- n_used_file_descriptors
- n_cpus
- n_containers
- n_images
- n_goroutines
- n_listener_events
- memory_total
- pool_blocksize
- docker_data
- available
- total
- used
- docker_metadata
- available
- total
- used
### Tags: ### Tags:
- All stats have the following tags: - docker (memory_total)
- unit=bytes
- docker (pool_blocksize)
- unit=bytes
- docker_data
- unit=bytes
- docker_metadata
- unit=bytes
- docker_cpu specific:
- cont_id (container ID) - cont_id (container ID)
- cont_image (container image) - cont_image (container image)
- cont_name (container name) - cont_name (container name)
- docker_cpu specific:
- cpu - cpu
- docker_net specific: - docker_net specific:
- cont_id (container ID)
- cont_image (container image)
- cont_name (container name)
- network - network
- docker_blkio specific: - docker_blkio specific:
- cont_id (container ID)
- cont_image (container image)
- cont_name (container name)
- device - device
### Example Output: ### Example Output:
@ -114,6 +146,16 @@ on the availability of per-cpu stats on your system.
``` ```
% ./telegraf -config ~/ws/telegraf.conf -input-filter docker -test % ./telegraf -config ~/ws/telegraf.conf -input-filter docker -test
* Plugin: docker, Collection 1 * Plugin: docker, Collection 1
> docker n_cpus=8i 1456926671065383978
> docker n_used_file_descriptors=15i 1456926671065383978
> docker n_containers=7i 1456926671065383978
> docker n_images=152i 1456926671065383978
> docker n_goroutines=36i 1456926671065383978
> docker n_listener_events=0i 1456926671065383978
> docker,unit=bytes memory_total=18935443456i 1456926671065383978
> docker,unit=bytes pool_blocksize=65540i 1456926671065383978
> docker_data,unit=bytes available=24340000000i,total=107400000000i,used=14820000000i 1456926671065383978
> docker_metadata,unit=bytes available=2126999999i,total=2146999999i,used=20420000i 145692667106538
> docker_mem,cont_id=5705ba8ed8fb47527410653d60a8bb2f3af5e62372297c419022a3cc6d45d848,\ > docker_mem,cont_id=5705ba8ed8fb47527410653d60a8bb2f3af5e62372297c419022a3cc6d45d848,\
cont_image=spotify/kafka,cont_name=kafka \ cont_image=spotify/kafka,cont_name=kafka \
active_anon=52568064i,active_file=6926336i,cache=12038144i,fail_count=0i,\ active_anon=52568064i,active_file=6926336i,cache=12038144i,fail_count=0i,\

View File

@ -1,8 +1,11 @@
package system package system
import ( import (
"encoding/json"
"fmt" "fmt"
"log" "log"
"regexp"
"strconv"
"strings" "strings"
"sync" "sync"
"time" "time"
@ -17,9 +20,29 @@ type Docker struct {
Endpoint string Endpoint string
ContainerNames []string ContainerNames []string
client *docker.Client client DockerClient
} }
type DockerClient interface {
// Docker Client wrapper
// Useful for test
Info() (*docker.Env, error)
ListContainers(opts docker.ListContainersOptions) ([]docker.APIContainers, error)
Stats(opts docker.StatsOptions) error
}
const (
KB = 1000
MB = 1000 * KB
GB = 1000 * MB
TB = 1000 * GB
PB = 1000 * TB
)
var (
sizeRegex = regexp.MustCompile(`^(\d+(\.\d+)*) ?([kKmMgGtTpP])?[bB]?$`)
)
var sampleConfig = ` var sampleConfig = `
## Docker Endpoint ## Docker Endpoint
## To use TCP, set endpoint = "tcp://[ip]:[port]" ## To use TCP, set endpoint = "tcp://[ip]:[port]"
@ -58,12 +81,20 @@ func (d *Docker) Gather(acc telegraf.Accumulator) error {
d.client = c d.client = c
} }
// Get daemon info
err := d.gatherInfo(acc)
if err != nil {
fmt.Println(err.Error())
}
// List containers
opts := docker.ListContainersOptions{} opts := docker.ListContainersOptions{}
containers, err := d.client.ListContainers(opts) containers, err := d.client.ListContainers(opts)
if err != nil { if err != nil {
return err return err
} }
// Get container data
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(len(containers)) wg.Add(len(containers))
for _, container := range containers { for _, container := range containers {
@ -81,6 +112,76 @@ func (d *Docker) Gather(acc telegraf.Accumulator) error {
return nil return nil
} }
func (d *Docker) gatherInfo(acc telegraf.Accumulator) error {
// Init vars
var driverStatus [][]string
dataFields := make(map[string]interface{})
metadataFields := make(map[string]interface{})
now := time.Now()
// Get info from docker daemon
info, err := d.client.Info()
if err != nil {
return err
}
fields := map[string]interface{}{
"n_cpus": info.GetInt64("NCPU"),
"n_used_file_descriptors": info.GetInt64("NFd"),
"n_containers": info.GetInt64("Containers"),
"n_images": info.GetInt64("Images"),
"n_goroutines": info.GetInt64("NGoroutines"),
"n_listener_events": info.GetInt64("NEventsListener"),
}
// Add metrics
acc.AddFields("docker",
fields,
nil,
now)
acc.AddFields("docker",
map[string]interface{}{"memory_total": info.GetInt64("MemTotal")},
map[string]string{"unit": "bytes"},
now)
// Get storage metrics
driverStatusRaw := []byte(info.Get("DriverStatus"))
json.Unmarshal(driverStatusRaw, &driverStatus)
for _, rawData := range driverStatus {
// Try to convert string to int (bytes)
value, err := parseSize(rawData[1])
if err != nil {
continue
}
name := strings.ToLower(strings.Replace(rawData[0], " ", "_", -1))
if name == "pool_blocksize" {
// pool blocksize
acc.AddFields("docker",
map[string]interface{}{"pool_blocksize": value},
map[string]string{"unit": "bytes"},
now)
} else if strings.HasPrefix(name, "data_space_") {
// data space
field_name := strings.TrimPrefix(name, "data_space_")
dataFields[field_name] = value
} else if strings.HasPrefix(name, "metadata_space_") {
// metadata space
field_name := strings.TrimPrefix(name, "metadata_space_")
metadataFields[field_name] = value
}
}
if len(dataFields) > 0 {
acc.AddFields("docker_data",
dataFields,
map[string]string{"unit": "bytes"},
now)
}
if len(metadataFields) > 0 {
acc.AddFields("docker_metadata",
metadataFields,
map[string]string{"unit": "bytes"},
now)
}
return nil
}
func (d *Docker) gatherContainer( func (d *Docker) gatherContainer(
container docker.APIContainers, container docker.APIContainers,
acc telegraf.Accumulator, acc telegraf.Accumulator,
@ -334,6 +435,27 @@ func sliceContains(in string, sl []string) bool {
return false return false
} }
// Parses the human-readable size string into the amount it represents.
func parseSize(sizeStr string) (int64, error) {
matches := sizeRegex.FindStringSubmatch(sizeStr)
if len(matches) != 4 {
return -1, fmt.Errorf("invalid size: '%s'", sizeStr)
}
size, err := strconv.ParseFloat(matches[1], 64)
if err != nil {
return -1, err
}
uMap := map[string]int64{"k": KB, "m": MB, "g": GB, "t": TB, "p": PB}
unitPrefix := strings.ToLower(matches[3])
if mul, ok := uMap[unitPrefix]; ok {
size *= float64(mul)
}
return int64(size), nil
}
func init() { func init() {
inputs.Add("docker", func() telegraf.Input { inputs.Add("docker", func() telegraf.Input {
return &Docker{} return &Docker{}

View File

@ -1,12 +1,14 @@
package system package system
import ( import (
"encoding/json"
"testing" "testing"
"time" "time"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/fsouza/go-dockerclient" "github.com/fsouza/go-dockerclient"
"github.com/stretchr/testify/require"
) )
func TestDockerGatherContainerStats(t *testing.T) { func TestDockerGatherContainerStats(t *testing.T) {
@ -194,3 +196,186 @@ func testStats() *docker.Stats {
return stats return stats
} }
type FakeDockerClient struct {
}
func (d FakeDockerClient) Info() (*docker.Env, error) {
env := docker.Env{"Containers=108", "OomKillDisable=false", "SystemTime=2016-02-24T00:55:09.15073105-05:00", "NEventsListener=0", "ID=5WQQ:TFWR:FDNG:OKQ3:37Y4:FJWG:QIKK:623T:R3ME:QTKB:A7F7:OLHD", "Debug=false", "LoggingDriver=json-file", "KernelVersion=4.3.0-1-amd64", "IndexServerAddress=https://index.docker.io/v1/", "MemTotal=3840757760", "Images=199", "CpuCfsQuota=true", "Name=absol", "SwapLimit=false", "IPv4Forwarding=true", "ExecutionDriver=native-0.2", "InitSha1=23a51f3c916d2b5a3bbb31caf301fd2d14edd518", "ExperimentalBuild=false", "CpuCfsPeriod=true", "RegistryConfig={\"IndexConfigs\":{\"docker.io\":{\"Mirrors\":null,\"Name\":\"docker.io\",\"Official\":true,\"Secure\":true}},\"InsecureRegistryCIDRs\":[\"127.0.0.0/8\"],\"Mirrors\":null}", "OperatingSystem=Linux Mint LMDE (containerized)", "BridgeNfIptables=true", "HttpsProxy=", "Labels=null", "MemoryLimit=false", "DriverStatus=[[\"Pool Name\",\"docker-8:1-1182287-pool\"],[\"Pool Blocksize\",\"65.54 kB\"],[\"Backing Filesystem\",\"extfs\"],[\"Data file\",\"/dev/loop0\"],[\"Metadata file\",\"/dev/loop1\"],[\"Data Space Used\",\"17.3 GB\"],[\"Data Space Total\",\"107.4 GB\"],[\"Data Space Available\",\"36.53 GB\"],[\"Metadata Space Used\",\"20.97 MB\"],[\"Metadata Space Total\",\"2.147 GB\"],[\"Metadata Space Available\",\"2.127 GB\"],[\"Udev Sync Supported\",\"true\"],[\"Deferred Removal Enabled\",\"false\"],[\"Data loop file\",\"/var/lib/docker/devicemapper/devicemapper/data\"],[\"Metadata loop file\",\"/var/lib/docker/devicemapper/devicemapper/metadata\"],[\"Library Version\",\"1.02.115 (2016-01-25)\"]]", "NFd=19", "HttpProxy=", "Driver=devicemapper", "NGoroutines=39", "InitPath=/usr/lib/docker.io/dockerinit", "NCPU=4", "DockerRootDir=/var/lib/docker", "NoProxy=", "BridgeNfIp6tables=true"}
return &env, nil
}
func (d FakeDockerClient) ListContainers(opts docker.ListContainersOptions) ([]docker.APIContainers, error) {
container1 := docker.APIContainers{
ID: "e2173b9478a6ae55e237d4d74f8bbb753f0817192b5081334dc78476296b7dfb",
Image: "quay.io/coreos/etcd:v2.2.2",
Command: "/etcd -name etcd0 -advertise-client-urls http://localhost:2379 -listen-client-urls http://0.0.0.0:2379",
Created: 1455941930,
Status: "Up 4 hours",
Ports: []docker.APIPort{
docker.APIPort{
PrivatePort: 7001,
PublicPort: 0,
Type: "tcp",
},
docker.APIPort{
PrivatePort: 4001,
PublicPort: 0,
Type: "tcp",
},
docker.APIPort{
PrivatePort: 2380,
PublicPort: 0,
Type: "tcp",
},
docker.APIPort{
PrivatePort: 2379,
PublicPort: 2379,
Type: "tcp",
IP: "0.0.0.0",
},
},
SizeRw: 0,
SizeRootFs: 0,
Names: []string{"/etcd"},
}
container2 := docker.APIContainers{
ID: "b7dfbb9478a6ae55e237d4d74f8bbb753f0817192b5081334dc78476296e2173",
Image: "quay.io/coreos/etcd:v2.2.2",
Command: "/etcd -name etcd2 -advertise-client-urls http://localhost:2379 -listen-client-urls http://0.0.0.0:2379",
Created: 1455941933,
Status: "Up 4 hours",
Ports: []docker.APIPort{
docker.APIPort{
PrivatePort: 7002,
PublicPort: 0,
Type: "tcp",
},
docker.APIPort{
PrivatePort: 4002,
PublicPort: 0,
Type: "tcp",
},
docker.APIPort{
PrivatePort: 2381,
PublicPort: 0,
Type: "tcp",
},
docker.APIPort{
PrivatePort: 2382,
PublicPort: 2382,
Type: "tcp",
IP: "0.0.0.0",
},
},
SizeRw: 0,
SizeRootFs: 0,
Names: []string{"/etcd2"},
}
containers := []docker.APIContainers{container1, container2}
return containers, nil
//#{e6a96c84ca91a5258b7cb752579fb68826b68b49ff957487695cd4d13c343b44 titilambert/snmpsim /bin/sh -c 'snmpsimd --agent-udpv4-endpoint=0.0.0.0:31161 --process-user=root --process-group=user' 1455724831 Up 4 hours [{31161 31161 udp 0.0.0.0}] 0 0 [/snmp] map[]}]2016/02/24 01:05:01 Gathered metrics, (3s interval), from 1 inputs in 1.233836656s
}
func (d FakeDockerClient) Stats(opts docker.StatsOptions) error {
jsonStat := `{"read":"2016-02-24T11:42:27.472459608-05:00","memory_stats":{"stats":{},"limit":18935443456},"blkio_stats":{"io_service_bytes_recursive":[{"major":252,"minor":1,"op":"Read","value":753664},{"major":252,"minor":1,"op":"Write"},{"major":252,"minor":1,"op":"Sync"},{"major":252,"minor":1,"op":"Async","value":753664},{"major":252,"minor":1,"op":"Total","value":753664}],"io_serviced_recursive":[{"major":252,"minor":1,"op":"Read","value":26},{"major":252,"minor":1,"op":"Write"},{"major":252,"minor":1,"op":"Sync"},{"major":252,"minor":1,"op":"Async","value":26},{"major":252,"minor":1,"op":"Total","value":26}]},"cpu_stats":{"cpu_usage":{"percpu_usage":[17871,4959158,1646137,1231652,11829401,244656,369972,0],"usage_in_usermode":10000000,"total_usage":20298847},"system_cpu_usage":24052607520000000,"throttling_data":{}},"precpu_stats":{"cpu_usage":{"percpu_usage":[17871,4959158,1646137,1231652,11829401,244656,369972,0],"usage_in_usermode":10000000,"total_usage":20298847},"system_cpu_usage":24052599550000000,"throttling_data":{}}}`
var stat docker.Stats
json.Unmarshal([]byte(jsonStat), &stat)
opts.Stats <- &stat
return nil
}
func TestDockerGatherInfo(t *testing.T) {
var acc testutil.Accumulator
client := FakeDockerClient{}
d := Docker{client: client}
err := d.Gather(&acc)
require.NoError(t, err)
acc.AssertContainsTaggedFields(t,
"docker",
map[string]interface{}{
"n_listener_events": int64(0),
"n_cpus": int64(4),
"n_used_file_descriptors": int64(19),
"n_containers": int64(108),
"n_images": int64(199),
"n_goroutines": int64(39),
},
map[string]string{},
)
acc.AssertContainsTaggedFields(t,
"docker_data",
map[string]interface{}{
"used": int64(17300000000),
"total": int64(107400000000),
"available": int64(36530000000),
},
map[string]string{
"unit": "bytes",
},
)
acc.AssertContainsTaggedFields(t,
"docker_cpu",
map[string]interface{}{
"usage_total": uint64(1231652),
},
map[string]string{
"cont_id": "b7dfbb9478a6ae55e237d4d74f8bbb753f0817192b5081334dc78476296e2173",
"cont_name": "etcd2",
"cont_image": "quay.io/coreos/etcd:v2.2.2",
"cpu": "cpu3",
},
)
acc.AssertContainsTaggedFields(t,
"docker_mem",
map[string]interface{}{
"total_pgpgout": uint64(0),
"usage_percent": float64(0),
"rss": uint64(0),
"total_writeback": uint64(0),
"active_anon": uint64(0),
"total_pgmafault": uint64(0),
"total_rss": uint64(0),
"total_unevictable": uint64(0),
"active_file": uint64(0),
"total_mapped_file": uint64(0),
"pgpgin": uint64(0),
"total_active_file": uint64(0),
"total_active_anon": uint64(0),
"total_cache": uint64(0),
"inactive_anon": uint64(0),
"pgmajfault": uint64(0),
"total_inactive_anon": uint64(0),
"total_rss_huge": uint64(0),
"rss_huge": uint64(0),
"hierarchical_memory_limit": uint64(0),
"pgpgout": uint64(0),
"unevictable": uint64(0),
"total_inactive_file": uint64(0),
"writeback": uint64(0),
"total_pgfault": uint64(0),
"total_pgpgin": uint64(0),
"cache": uint64(0),
"mapped_file": uint64(0),
"inactive_file": uint64(0),
"max_usage": uint64(0),
"fail_count": uint64(0),
"pgfault": uint64(0),
"usage": uint64(0),
"limit": uint64(18935443456),
},
map[string]string{
"cont_id": "b7dfbb9478a6ae55e237d4d74f8bbb753f0817192b5081334dc78476296e2173",
"cont_name": "etcd2",
"cont_image": "quay.io/coreos/etcd:v2.2.2",
},
)
//fmt.Print(info)
}