package ecs import ( "net/url" "strings" "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/filter" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" ) // Ecs config object type Ecs struct { EndpointURL string `toml:"endpoint_url"` Timeout internal.Duration ContainerNameInclude []string `toml:"container_name_include"` ContainerNameExclude []string `toml:"container_name_exclude"` ContainerStatusInclude []string `toml:"container_status_include"` ContainerStatusExclude []string `toml:"container_status_exclude"` LabelInclude []string `toml:"ecs_label_include"` LabelExclude []string `toml:"ecs_label_exclude"` newClient func(timeout time.Duration) (*EcsClient, error) client Client filtersCreated bool labelFilter filter.Filter containerNameFilter filter.Filter statusFilter filter.Filter } const ( KB = 1000 MB = 1000 * KB GB = 1000 * MB TB = 1000 * GB PB = 1000 * TB ) var sampleConfig = ` ## ECS metadata url # endpoint_url = "http://169.254.170.2" ## Containers to include and exclude. Globs accepted. ## Note that an empty array for both will include all containers # container_name_include = [] # container_name_exclude = [] ## Container states to include and exclude. Globs accepted. ## When empty only containers in the "RUNNING" state will be captured. ## Possible values are "NONE", "PULLED", "CREATED", "RUNNING", ## "RESOURCES_PROVISIONED", "STOPPED". # container_status_include = [] # container_status_exclude = [] ## ecs labels to include and exclude as tags. Globs accepted. ## Note that an empty array for both will include all labels as tags ecs_label_include = [ "com.amazonaws.ecs.*" ] ecs_label_exclude = [] ## Timeout for queries. # timeout = "5s" ` // Description describes ECS plugin func (ecs *Ecs) Description() string { return "Read metrics about docker containers from Fargate/ECS v2 meta endpoints." } // SampleConfig returns the ECS example config func (ecs *Ecs) SampleConfig() string { return sampleConfig } // Gather is the entrypoint for telegraf metrics collection func (ecs *Ecs) Gather(acc telegraf.Accumulator) error { err := initSetup(ecs) if err != nil { return err } task, stats, err := PollSync(ecs.client) if err != nil { return err } mergeTaskStats(task, stats) taskTags := map[string]string{ "cluster": task.Cluster, "task_arn": task.TaskARN, "family": task.Family, "revision": task.Revision, } // accumulate metrics ecs.accTask(task, taskTags, acc) ecs.accContainers(task, taskTags, acc) return nil } func initSetup(ecs *Ecs) error { if ecs.client == nil { var err error var c *EcsClient c, err = ecs.newClient(ecs.Timeout.Duration) if err != nil { return err } c.BaseURL, err = url.Parse(ecs.EndpointURL) if err != nil { return err } ecs.client = c } // Create filters if !ecs.filtersCreated { err := ecs.createContainerNameFilters() if err != nil { return err } err = ecs.createContainerStatusFilters() if err != nil { return err } err = ecs.createLabelFilters() if err != nil { return err } ecs.filtersCreated = true } return nil } func (ecs *Ecs) accTask(task *Task, tags map[string]string, acc telegraf.Accumulator) { taskFields := map[string]interface{}{ "revision": task.Revision, "desired_status": task.DesiredStatus, "known_status": task.KnownStatus, "limit_cpu": task.Limits["CPU"], "limit_mem": task.Limits["Memory"], } acc.AddFields("ecs_task", taskFields, tags, task.PullStoppedAt) } func (ecs *Ecs) accContainers(task *Task, taskTags map[string]string, acc telegraf.Accumulator) { for _, c := range task.Containers { if !ecs.containerNameFilter.Match(c.Name) { continue } if !ecs.statusFilter.Match(strings.ToUpper(c.KnownStatus)) { continue } // add matching ECS container Labels containerTags := map[string]string{ "id": c.ID, "name": c.Name, } for k, v := range c.Labels { if ecs.labelFilter.Match(k) { containerTags[k] = v } } tags := mergeTags(taskTags, containerTags) parseContainerStats(c, acc, tags) } } // returns a new map with the same content values as the input map func copyTags(in map[string]string) map[string]string { out := make(map[string]string) for k, v := range in { out[k] = v } return out } // returns a new map with the merged content values of the two input maps func mergeTags(a map[string]string, b map[string]string) map[string]string { c := copyTags(a) for k, v := range b { c[k] = v } return c } func (ecs *Ecs) createContainerNameFilters() error { filter, err := filter.NewIncludeExcludeFilter(ecs.ContainerNameInclude, ecs.ContainerNameExclude) if err != nil { return err } ecs.containerNameFilter = filter return nil } func (ecs *Ecs) createLabelFilters() error { filter, err := filter.NewIncludeExcludeFilter(ecs.LabelInclude, ecs.LabelExclude) if err != nil { return err } ecs.labelFilter = filter return nil } func (ecs *Ecs) createContainerStatusFilters() error { if len(ecs.ContainerStatusInclude) == 0 && len(ecs.ContainerStatusExclude) == 0 { ecs.ContainerStatusInclude = []string{"RUNNING"} } // ECS uses uppercase status names, normalizing for comparison. for i, include := range ecs.ContainerStatusInclude { ecs.ContainerStatusInclude[i] = strings.ToUpper(include) } for i, exclude := range ecs.ContainerStatusExclude { ecs.ContainerStatusExclude[i] = strings.ToUpper(exclude) } filter, err := filter.NewIncludeExcludeFilter(ecs.ContainerStatusInclude, ecs.ContainerStatusExclude) if err != nil { return err } ecs.statusFilter = filter return nil } func init() { inputs.Add("ecs", func() telegraf.Input { return &Ecs{ EndpointURL: "http://169.254.170.2", Timeout: internal.Duration{Duration: 5 * time.Second}, newClient: NewClient, filtersCreated: false, } }) }