From 308f4af40f8c5d4fc9198d1c819eb3b574b3509a Mon Sep 17 00:00:00 2001 From: Aditya C S Date: Wed, 4 Oct 2017 03:06:26 +0530 Subject: [PATCH] Collect Docker Swarm service metrics in docker input plugin (#3141) --- plugins/inputs/docker/client.go | 13 ++++ plugins/inputs/docker/docker.go | 83 ++++++++++++++++++++++++ plugins/inputs/docker/docker_test.go | 82 +++++++++++++++++++++++ plugins/inputs/docker/docker_testdata.go | 74 +++++++++++++++++++++ 4 files changed, 252 insertions(+) diff --git a/plugins/inputs/docker/client.go b/plugins/inputs/docker/client.go index e918231a0..a021b59c4 100644 --- a/plugins/inputs/docker/client.go +++ b/plugins/inputs/docker/client.go @@ -6,6 +6,7 @@ import ( "net/http" "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/swarm" docker "github.com/docker/docker/client" "github.com/docker/go-connections/sockets" ) @@ -20,6 +21,9 @@ type Client interface { ContainerList(ctx context.Context, options types.ContainerListOptions) ([]types.Container, error) ContainerStats(ctx context.Context, containerID string, stream bool) (types.ContainerStats, error) ContainerInspect(ctx context.Context, containerID string) (types.ContainerJSON, error) + ServiceList(ctx context.Context, options types.ServiceListOptions) ([]swarm.Service, error) + TaskList(ctx context.Context, options types.TaskListOptions) ([]swarm.Task, error) + NodeList(ctx context.Context, options types.NodeListOptions) ([]swarm.Node, error) } func NewEnvClient() (Client, error) { @@ -65,3 +69,12 @@ func (c *SocketClient) ContainerStats(ctx context.Context, containerID string, s func (c *SocketClient) ContainerInspect(ctx context.Context, containerID string) (types.ContainerJSON, error) { return c.client.ContainerInspect(ctx, containerID) } +func (c *SocketClient) ServiceList(ctx context.Context, options types.ServiceListOptions) ([]swarm.Service, error) { + return c.client.ServiceList(ctx, options) +} +func (c *SocketClient) TaskList(ctx context.Context, options types.TaskListOptions) ([]swarm.Task, error) { + return c.client.TaskList(ctx, options) +} +func (c *SocketClient) NodeList(ctx context.Context, options types.NodeListOptions) ([]swarm.Node, error) { + return c.client.NodeList(ctx, options) +} diff --git a/plugins/inputs/docker/docker.go b/plugins/inputs/docker/docker.go index 3634e596f..171097621 100644 --- a/plugins/inputs/docker/docker.go +++ b/plugins/inputs/docker/docker.go @@ -6,6 +6,7 @@ import ( "encoding/json" "fmt" "io" + "log" "net/http" "regexp" "strconv" @@ -14,6 +15,7 @@ import ( "time" "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/swarm" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/filter" "github.com/influxdata/telegraf/internal" @@ -35,6 +37,8 @@ type Docker struct { Endpoint string ContainerNames []string + GatherServices bool `toml:"gather_services"` + Timeout internal.Duration PerDevice bool `toml:"perdevice"` Total bool `toml:"total"` @@ -82,6 +86,9 @@ var sampleConfig = ` ## To use environment variables (ie, docker-machine), set endpoint = "ENV" endpoint = "unix:///var/run/docker.sock" + ## Set to true to collect Swarm metrics(desired_replicas, running_replicas) + gather_services = false + ## Only collect metrics for these containers, collect all if empty container_names = [] @@ -160,6 +167,13 @@ func (d *Docker) Gather(acc telegraf.Accumulator) error { acc.AddError(err) } + if d.GatherServices { + err := d.gatherSwarmInfo(acc) + if err != nil { + acc.AddError(err) + } + } + // List containers opts := types.ContainerListOptions{} ctx, cancel := context.WithTimeout(context.Background(), d.Timeout.Duration) @@ -187,6 +201,75 @@ func (d *Docker) Gather(acc telegraf.Accumulator) error { return nil } +func (d *Docker) gatherSwarmInfo(acc telegraf.Accumulator) error { + + ctx, cancel := context.WithTimeout(context.Background(), d.Timeout.Duration) + defer cancel() + services, err := d.client.ServiceList(ctx, types.ServiceListOptions{}) + if err != nil { + return err + } + + if len(services) > 0 { + + tasks, err := d.client.TaskList(ctx, types.TaskListOptions{}) + if err != nil { + return err + } + + nodes, err := d.client.NodeList(ctx, types.NodeListOptions{}) + if err != nil { + return err + } + + running := map[string]int{} + tasksNoShutdown := map[string]int{} + + activeNodes := make(map[string]struct{}) + for _, n := range nodes { + if n.Status.State != swarm.NodeStateDown { + activeNodes[n.ID] = struct{}{} + } + } + + for _, task := range tasks { + if task.DesiredState != swarm.TaskStateShutdown { + tasksNoShutdown[task.ServiceID]++ + } + + if task.Status.State == swarm.TaskStateRunning { + running[task.ServiceID]++ + } + } + + for _, service := range services { + tags := map[string]string{} + fields := make(map[string]interface{}) + now := time.Now() + tags["service_id"] = service.ID + tags["service_name"] = service.Spec.Name + if service.Spec.Mode.Replicated != nil && service.Spec.Mode.Replicated.Replicas != nil { + tags["service_mode"] = "replicated" + fields["tasks_running"] = running[service.ID] + fields["tasks_desired"] = *service.Spec.Mode.Replicated.Replicas + } else if service.Spec.Mode.Global != nil { + tags["service_mode"] = "global" + fields["tasks_running"] = running[service.ID] + fields["tasks_desired"] = tasksNoShutdown[service.ID] + } else { + log.Printf("E! Unknow Replicas Mode") + } + // Add metrics + acc.AddFields("docker_swarm", + fields, + tags, + now) + } + } + + return nil +} + func (d *Docker) gatherInfo(acc telegraf.Accumulator) error { // Init vars dataFields := make(map[string]interface{}) diff --git a/plugins/inputs/docker/docker_test.go b/plugins/inputs/docker/docker_test.go index 95adfcf8e..b18274136 100644 --- a/plugins/inputs/docker/docker_test.go +++ b/plugins/inputs/docker/docker_test.go @@ -8,6 +8,7 @@ import ( "github.com/influxdata/telegraf/testutil" "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/swarm" "github.com/stretchr/testify/require" ) @@ -16,6 +17,9 @@ type MockClient struct { ContainerListF func(ctx context.Context, options types.ContainerListOptions) ([]types.Container, error) ContainerStatsF func(ctx context.Context, containerID string, stream bool) (types.ContainerStats, error) ContainerInspectF func(ctx context.Context, containerID string) (types.ContainerJSON, error) + ServiceListF func(ctx context.Context, options types.ServiceListOptions) ([]swarm.Service, error) + TaskListF func(ctx context.Context, options types.TaskListOptions) ([]swarm.Task, error) + NodeListF func(ctx context.Context, options types.NodeListOptions) ([]swarm.Node, error) } func (c *MockClient) Info(ctx context.Context) (types.Info, error) { @@ -44,6 +48,27 @@ func (c *MockClient) ContainerInspect( return c.ContainerInspectF(ctx, containerID) } +func (c *MockClient) ServiceList( + ctx context.Context, + options types.ServiceListOptions, +) ([]swarm.Service, error) { + return c.ServiceListF(ctx, options) +} + +func (c *MockClient) TaskList( + ctx context.Context, + options types.TaskListOptions, +) ([]swarm.Task, error) { + return c.TaskListF(ctx, options) +} + +func (c *MockClient) NodeList( + ctx context.Context, + options types.NodeListOptions, +) ([]swarm.Node, error) { + return c.NodeListF(ctx, options) +} + func newClient(host string, tlsConfig *tls.Config) (Client, error) { return &MockClient{ InfoF: func(context.Context) (types.Info, error) { @@ -58,6 +83,15 @@ func newClient(host string, tlsConfig *tls.Config) (Client, error) { ContainerInspectF: func(context.Context, string) (types.ContainerJSON, error) { return containerInspect, nil }, + ServiceListF: func(context.Context, types.ServiceListOptions) ([]swarm.Service, error) { + return ServiceList, nil + }, + TaskListF: func(context.Context, types.TaskListOptions) ([]swarm.Task, error) { + return TaskList, nil + }, + NodeListF: func(context.Context, types.NodeListOptions) ([]swarm.Node, error) { + return NodeList, nil + }, }, nil } @@ -227,6 +261,15 @@ func TestDocker_WindowsMemoryContainerStats(t *testing.T) { ContainerInspectF: func(ctx context.Context, containerID string) (types.ContainerJSON, error) { return containerInspect, nil }, + ServiceListF: func(context.Context, types.ServiceListOptions) ([]swarm.Service, error) { + return ServiceList, nil + }, + TaskListF: func(context.Context, types.TaskListOptions) ([]swarm.Task, error) { + return TaskList, nil + }, + NodeListF: func(context.Context, types.NodeListOptions) ([]swarm.Node, error) { + return NodeList, nil + }, }, nil }, } @@ -436,3 +479,42 @@ func TestDockerGatherInfo(t *testing.T) { }, ) } + +func TestDockerGatherSwarmInfo(t *testing.T) { + var acc testutil.Accumulator + d := Docker{ + newClient: newClient, + } + + err := acc.GatherError(d.Gather) + require.NoError(t, err) + + d.gatherSwarmInfo(&acc) + + // test docker_container_net measurement + acc.AssertContainsTaggedFields(t, + "docker_swarm", + map[string]interface{}{ + "tasks_running": int(2), + "tasks_desired": uint64(2), + }, + map[string]string{ + "service_id": "qolkls9g5iasdiuihcyz9rnx2", + "service_name": "test1", + "service_mode": "replicated", + }, + ) + + acc.AssertContainsTaggedFields(t, + "docker_swarm", + map[string]interface{}{ + "tasks_running": int(1), + "tasks_desired": int(1), + }, + map[string]string{ + "service_id": "qolkls9g5iasdiuihcyz9rn3", + "service_name": "test2", + "service_mode": "global", + }, + ) +} diff --git a/plugins/inputs/docker/docker_testdata.go b/plugins/inputs/docker/docker_testdata.go index d16a3a728..929119fcb 100644 --- a/plugins/inputs/docker/docker_testdata.go +++ b/plugins/inputs/docker/docker_testdata.go @@ -8,6 +8,7 @@ import ( "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/registry" + "github.com/docker/docker/api/types/swarm" ) var info = types.Info{ @@ -133,6 +134,79 @@ var containerList = []types.Container{ }, } +var two = uint64(2) +var ServiceList = []swarm.Service{ + swarm.Service{ + ID: "qolkls9g5iasdiuihcyz9rnx2", + Spec: swarm.ServiceSpec{ + Annotations: swarm.Annotations{ + Name: "test1", + }, + Mode: swarm.ServiceMode{ + Replicated: &swarm.ReplicatedService{ + Replicas: &two, + }, + }, + }, + }, + swarm.Service{ + ID: "qolkls9g5iasdiuihcyz9rn3", + Spec: swarm.ServiceSpec{ + Annotations: swarm.Annotations{ + Name: "test2", + }, + Mode: swarm.ServiceMode{ + Global: &swarm.GlobalService{}, + }, + }, + }, +} + +var TaskList = []swarm.Task{ + swarm.Task{ + ID: "kwh0lv7hwwbh", + ServiceID: "qolkls9g5iasdiuihcyz9rnx2", + NodeID: "0cl4jturcyd1ks3fwpd010kor", + Status: swarm.TaskStatus{ + State: "running", + }, + DesiredState: "running", + }, + swarm.Task{ + ID: "u78m5ojbivc3", + ServiceID: "qolkls9g5iasdiuihcyz9rnx2", + NodeID: "0cl4jturcyd1ks3fwpd010kor", + Status: swarm.TaskStatus{ + State: "running", + }, + DesiredState: "running", + }, + swarm.Task{ + ID: "1n1uilkhr98l", + ServiceID: "qolkls9g5iasdiuihcyz9rn3", + NodeID: "0cl4jturcyd1ks3fwpd010kor", + Status: swarm.TaskStatus{ + State: "running", + }, + DesiredState: "running", + }, +} + +var NodeList = []swarm.Node{ + swarm.Node{ + ID: "0cl4jturcyd1ks3fwpd010kor", + Status: swarm.NodeStatus{ + State: "ready", + }, + }, + swarm.Node{ + ID: "0cl4jturcyd1ks3fwpd010kor", + Status: swarm.NodeStatus{ + State: "ready", + }, + }, +} + func containerStats() types.ContainerStats { var stat types.ContainerStats jsonStat := `