250 lines
		
	
	
		
			5.9 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			250 lines
		
	
	
		
			5.9 KiB
		
	
	
	
		
			Go
		
	
	
	
| package ecs
 | |
| 
 | |
| import (
 | |
| 	"net/url"
 | |
| 	"strings"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/influxdata/telegraf"
 | |
| 	"github.com/influxdata/telegraf/filter"
 | |
| 	"github.com/influxdata/telegraf/internal"
 | |
| 	"github.com/influxdata/telegraf/plugins/inputs"
 | |
| )
 | |
| 
 | |
| // Ecs config object
 | |
| type Ecs struct {
 | |
| 	EndpointURL string `toml:"endpoint_url"`
 | |
| 	Timeout     internal.Duration
 | |
| 
 | |
| 	ContainerNameInclude []string `toml:"container_name_include"`
 | |
| 	ContainerNameExclude []string `toml:"container_name_exclude"`
 | |
| 
 | |
| 	ContainerStatusInclude []string `toml:"container_status_include"`
 | |
| 	ContainerStatusExclude []string `toml:"container_status_exclude"`
 | |
| 
 | |
| 	LabelInclude []string `toml:"ecs_label_include"`
 | |
| 	LabelExclude []string `toml:"ecs_label_exclude"`
 | |
| 
 | |
| 	newClient func(timeout time.Duration) (*EcsClient, error)
 | |
| 
 | |
| 	client              Client
 | |
| 	filtersCreated      bool
 | |
| 	labelFilter         filter.Filter
 | |
| 	containerNameFilter filter.Filter
 | |
| 	statusFilter        filter.Filter
 | |
| }
 | |
| 
 | |
| const (
 | |
| 	KB = 1000
 | |
| 	MB = 1000 * KB
 | |
| 	GB = 1000 * MB
 | |
| 	TB = 1000 * GB
 | |
| 	PB = 1000 * TB
 | |
| )
 | |
| 
 | |
| var sampleConfig = `
 | |
|   ## ECS metadata url
 | |
|   # endpoint_url = "http://169.254.170.2"
 | |
| 
 | |
|   ## Containers to include and exclude. Globs accepted.
 | |
|   ## Note that an empty array for both will include all containers
 | |
|   # container_name_include = []
 | |
|   # container_name_exclude = []
 | |
| 
 | |
|   ## Container states to include and exclude. Globs accepted.
 | |
|   ## When empty only containers in the "RUNNING" state will be captured.
 | |
|   ## Possible values are "NONE", "PULLED", "CREATED", "RUNNING",
 | |
|   ## "RESOURCES_PROVISIONED", "STOPPED".
 | |
|   # container_status_include = []
 | |
|   # container_status_exclude = []
 | |
| 
 | |
|   ## ecs labels to include and exclude as tags.  Globs accepted.
 | |
|   ## Note that an empty array for both will include all labels as tags
 | |
|   ecs_label_include = [ "com.amazonaws.ecs.*" ]
 | |
|   ecs_label_exclude = []
 | |
| 
 | |
|   ## Timeout for queries.
 | |
|   # timeout = "5s"
 | |
| `
 | |
| 
 | |
| // Description describes ECS plugin
 | |
| func (ecs *Ecs) Description() string {
 | |
| 	return "Read metrics about docker containers from Fargate/ECS v2 meta endpoints."
 | |
| }
 | |
| 
 | |
| // SampleConfig returns the ECS example config
 | |
| func (ecs *Ecs) SampleConfig() string {
 | |
| 	return sampleConfig
 | |
| }
 | |
| 
 | |
| // Gather is the entrypoint for telegraf metrics collection
 | |
| func (ecs *Ecs) Gather(acc telegraf.Accumulator) error {
 | |
| 	err := initSetup(ecs)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	task, stats, err := PollSync(ecs.client)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	mergeTaskStats(task, stats)
 | |
| 
 | |
| 	taskTags := map[string]string{
 | |
| 		"cluster":  task.Cluster,
 | |
| 		"task_arn": task.TaskARN,
 | |
| 		"family":   task.Family,
 | |
| 		"revision": task.Revision,
 | |
| 	}
 | |
| 
 | |
| 	// accumulate metrics
 | |
| 	ecs.accTask(task, taskTags, acc)
 | |
| 	ecs.accContainers(task, taskTags, acc)
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func initSetup(ecs *Ecs) error {
 | |
| 	if ecs.client == nil {
 | |
| 		var err error
 | |
| 		var c *EcsClient
 | |
| 		c, err = ecs.newClient(ecs.Timeout.Duration)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		c.BaseURL, err = url.Parse(ecs.EndpointURL)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		ecs.client = c
 | |
| 	}
 | |
| 
 | |
| 	// Create filters
 | |
| 	if !ecs.filtersCreated {
 | |
| 		err := ecs.createContainerNameFilters()
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		err = ecs.createContainerStatusFilters()
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		err = ecs.createLabelFilters()
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		ecs.filtersCreated = true
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (ecs *Ecs) accTask(task *Task, tags map[string]string, acc telegraf.Accumulator) {
 | |
| 	taskFields := map[string]interface{}{
 | |
| 		"revision":       task.Revision,
 | |
| 		"desired_status": task.DesiredStatus,
 | |
| 		"known_status":   task.KnownStatus,
 | |
| 		"limit_cpu":      task.Limits["CPU"],
 | |
| 		"limit_mem":      task.Limits["Memory"],
 | |
| 	}
 | |
| 
 | |
| 	acc.AddFields("ecs_task", taskFields, tags, task.PullStoppedAt)
 | |
| }
 | |
| 
 | |
| func (ecs *Ecs) accContainers(task *Task, taskTags map[string]string, acc telegraf.Accumulator) {
 | |
| 	for _, c := range task.Containers {
 | |
| 		if !ecs.containerNameFilter.Match(c.Name) {
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		if !ecs.statusFilter.Match(strings.ToUpper(c.KnownStatus)) {
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		// add matching ECS container Labels
 | |
| 		containerTags := map[string]string{
 | |
| 			"id":   c.ID,
 | |
| 			"name": c.Name,
 | |
| 		}
 | |
| 		for k, v := range c.Labels {
 | |
| 			if ecs.labelFilter.Match(k) {
 | |
| 				containerTags[k] = v
 | |
| 			}
 | |
| 		}
 | |
| 		tags := mergeTags(taskTags, containerTags)
 | |
| 
 | |
| 		parseContainerStats(c, acc, tags)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // returns a new map with the same content values as the input map
 | |
| func copyTags(in map[string]string) map[string]string {
 | |
| 	out := make(map[string]string)
 | |
| 	for k, v := range in {
 | |
| 		out[k] = v
 | |
| 	}
 | |
| 	return out
 | |
| }
 | |
| 
 | |
| // returns a new map with the merged content values of the two input maps
 | |
| func mergeTags(a map[string]string, b map[string]string) map[string]string {
 | |
| 	c := copyTags(a)
 | |
| 	for k, v := range b {
 | |
| 		c[k] = v
 | |
| 	}
 | |
| 	return c
 | |
| }
 | |
| 
 | |
| func (ecs *Ecs) createContainerNameFilters() error {
 | |
| 	filter, err := filter.NewIncludeExcludeFilter(ecs.ContainerNameInclude, ecs.ContainerNameExclude)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	ecs.containerNameFilter = filter
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (ecs *Ecs) createLabelFilters() error {
 | |
| 	filter, err := filter.NewIncludeExcludeFilter(ecs.LabelInclude, ecs.LabelExclude)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	ecs.labelFilter = filter
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (ecs *Ecs) createContainerStatusFilters() error {
 | |
| 	if len(ecs.ContainerStatusInclude) == 0 && len(ecs.ContainerStatusExclude) == 0 {
 | |
| 		ecs.ContainerStatusInclude = []string{"RUNNING"}
 | |
| 	}
 | |
| 
 | |
| 	// ECS uses uppercase status names, normalizing for comparison.
 | |
| 	for i, include := range ecs.ContainerStatusInclude {
 | |
| 		ecs.ContainerStatusInclude[i] = strings.ToUpper(include)
 | |
| 	}
 | |
| 	for i, exclude := range ecs.ContainerStatusExclude {
 | |
| 		ecs.ContainerStatusExclude[i] = strings.ToUpper(exclude)
 | |
| 	}
 | |
| 
 | |
| 	filter, err := filter.NewIncludeExcludeFilter(ecs.ContainerStatusInclude, ecs.ContainerStatusExclude)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	ecs.statusFilter = filter
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func init() {
 | |
| 	inputs.Add("ecs", func() telegraf.Input {
 | |
| 		return &Ecs{
 | |
| 			EndpointURL:    "http://169.254.170.2",
 | |
| 			Timeout:        internal.Duration{Duration: 5 * time.Second},
 | |
| 			newClient:      NewClient,
 | |
| 			filtersCreated: false,
 | |
| 		}
 | |
| 	})
 | |
| }
 |