From e3348304584eb89d04238b79b81561caca399c7f Mon Sep 17 00:00:00 2001 From: Greg <2653109+glinton@users.noreply.github.com> Date: Mon, 22 Apr 2019 18:36:46 -0600 Subject: [PATCH] Collect cloudwatch stats using GetMetricData (#5544) --- plugins/inputs/cloudwatch/README.md | 84 ++- plugins/inputs/cloudwatch/cloudwatch.go | 616 ++++++++++++------- plugins/inputs/cloudwatch/cloudwatch_test.go | 181 ++++-- 3 files changed, 582 insertions(+), 299 deletions(-) diff --git a/plugins/inputs/cloudwatch/README.md b/plugins/inputs/cloudwatch/README.md index dfb5bf95d..fab3cc295 100644 --- a/plugins/inputs/cloudwatch/README.md +++ b/plugins/inputs/cloudwatch/README.md @@ -17,7 +17,7 @@ API endpoint. In the following order the plugin will attempt to authenticate. ```toml [[inputs.cloudwatch]] - ## Amazon Region (required) + ## Amazon Region region = "us-east-1" ## Amazon Credentials @@ -28,12 +28,12 @@ API endpoint. In the following order the plugin will attempt to authenticate. ## 4) environment variables ## 5) shared credentials file ## 6) EC2 Instance Profile - #access_key = "" - #secret_key = "" - #token = "" - #role_arn = "" - #profile = "" - #shared_credential_file = "" + # access_key = "" + # secret_key = "" + # token = "" + # role_arn = "" + # profile = "" + # shared_credential_file = "" ## Endpoint to make request against, the correct endpoint is automatically ## determined and this option should only be set if you wish to override the @@ -54,32 +54,43 @@ API endpoint. In the following order the plugin will attempt to authenticate. ## Collection Delay (required - must account for metrics availability via CloudWatch API) delay = "5m" - ## Override global run interval (optional - defaults to global interval) - ## Recomended: use metric 'interval' that is a multiple of 'period' to avoid + ## Recommended: use metric 'interval' that is a multiple of 'period' to avoid ## gaps or overlap in pulled data interval = "5m" + ## Configure the TTL for the internal cache of metrics. + # cache_ttl = "1h" + ## Metric Statistic Namespace (required) namespace = "AWS/ELB" ## Maximum requests per second. Note that the global default AWS rate limit is - ## 400 reqs/sec, so if you define multiple namespaces, these should add up to a - ## maximum of 400. Optional - default value is 200. + ## 50 reqs/sec, so if you define multiple namespaces, these should add up to a + ## maximum of 50. ## See http://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/cloudwatch_limits.html - ratelimit = 200 + # ratelimit = 25 - ## Metrics to Pull (optional) + ## Namespace-wide statistic filters. These allow fewer queries to be made to + ## cloudwatch. + # statistic_include = [ "average", "sum", minimum", "maximum", sample_count" ] + # statistic_exclude = [] + + ## Metrics to Pull ## Defaults to all Metrics in Namespace if nothing is provided ## Refreshes Namespace available metrics every 1h - [[inputs.cloudwatch.metrics]] - names = ["Latency", "RequestCount"] - - ## Dimension filters for Metric. These are optional however all dimensions - ## defined for the metric names must be specified in order to retrieve - ## the metric statistics. - [[inputs.cloudwatch.metrics.dimensions]] - name = "LoadBalancerName" - value = "p-example" + #[[inputs.cloudwatch.metrics]] + # names = ["Latency", "RequestCount"] + # + # ## Statistic filters for Metric. These allow for retrieving specific + # ## statistics for an individual metric. 
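+  # ## When not set, the namespace-wide statistic filters above are used.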
+ # # statistic_include = [ "average", "sum", minimum", "maximum", sample_count" ] + # # statistic_exclude = [] + # + # ## Dimension filters for Metric. All dimensions defined for the metric names + # ## must be specified in order to retrieve the metric statistics. + # [[inputs.cloudwatch.metrics.dimensions]] + # name = "LoadBalancerName" + # value = "p-example" ``` #### Requirements and Terminology @@ -97,17 +108,21 @@ wildcard dimension is ignored. Example: ``` -[[inputs.cloudwatch.metrics]] - names = ["Latency"] +[[inputs.cloudwatch]] + period = "1m" + interval = "5m" - ## Dimension filters for Metric (optional) - [[inputs.cloudwatch.metrics.dimensions]] - name = "LoadBalancerName" - value = "p-example" + [[inputs.cloudwatch.metrics]] + names = ["Latency"] - [[inputs.cloudwatch.metrics.dimensions]] - name = "AvailabilityZone" - value = "*" + ## Dimension filters for Metric (optional) + [[inputs.cloudwatch.metrics.dimensions]] + name = "LoadBalancerName" + value = "p-example" + + [[inputs.cloudwatch.metrics.dimensions]] + name = "AvailabilityZone" + value = "*" ``` If the following ELBs are available: @@ -124,9 +139,11 @@ Then 2 metrics will be output: If the `AvailabilityZone` wildcard dimension was omitted, then a single metric (name: `p-example`) would be exported containing the aggregate values of the ELB across availability zones. +To maximize efficiency and savings, consider making fewer requests by increasing `interval` but keeping `period` at the duration you would like metrics to be reported. The above example will request metrics from Cloudwatch every 5 minutes but will output five metrics timestamped one minute apart. + #### Restrictions and Limitations - CloudWatch metrics are not available instantly via the CloudWatch API. You should adjust your collection `delay` to account for this lag in metrics availability based on your [monitoring subscription level](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-cloudwatch-new.html) -- CloudWatch API usage incurs cost - see [GetMetricStatistics Pricing](https://aws.amazon.com/cloudwatch/pricing/) +- CloudWatch API usage incurs cost - see [GetMetricData Pricing](https://aws.amazon.com/cloudwatch/pricing/) ### Measurements & Fields: @@ -147,7 +164,6 @@ Tag Dimension names are represented in [snake case](https://en.wikipedia.org/wik - All measurements have the following tags: - region (CloudWatch Region) - - unit (CloudWatch Metric Unit) - {dimension-name} (Cloudwatch Dimension value - one for each metric dimension) ### Troubleshooting: @@ -168,5 +184,5 @@ aws cloudwatch get-metric-statistics --namespace AWS/EC2 --region us-east-1 --pe ``` $ ./telegraf --config telegraf.conf --input-filter cloudwatch --test -> cloudwatch_aws_elb,load_balancer_name=p-example,region=us-east-1,unit=seconds latency_average=0.004810798017284538,latency_maximum=0.1100282669067383,latency_minimum=0.0006084442138671875,latency_sample_count=4029,latency_sum=19.382705211639404 1459542420000000000 +> cloudwatch_aws_elb,load_balancer_name=p-example,region=us-east-1 latency_average=0.004810798017284538,latency_maximum=0.1100282669067383,latency_minimum=0.0006084442138671875,latency_sample_count=4029,latency_sum=19.382705211639404 1459542420000000000 ``` diff --git a/plugins/inputs/cloudwatch/cloudwatch.go b/plugins/inputs/cloudwatch/cloudwatch.go index 626511e2f..4b6469e2d 100644 --- a/plugins/inputs/cloudwatch/cloudwatch.go +++ b/plugins/inputs/cloudwatch/cloudwatch.go @@ -1,67 +1,83 @@ package cloudwatch import ( + "errors" "fmt" + "strconv" "strings" 
"sync" "time" "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/service/cloudwatch" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/filter" "github.com/influxdata/telegraf/internal" internalaws "github.com/influxdata/telegraf/internal/config/aws" "github.com/influxdata/telegraf/internal/limiter" + "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/plugins/inputs" ) type ( + // CloudWatch contains the configuration and cache for the cloudwatch plugin. CloudWatch struct { - Region string `toml:"region"` - AccessKey string `toml:"access_key"` - SecretKey string `toml:"secret_key"` - RoleARN string `toml:"role_arn"` - Profile string `toml:"profile"` - Filename string `toml:"shared_credential_file"` - Token string `toml:"token"` - EndpointURL string `toml:"endpoint_url"` + Region string `toml:"region"` + AccessKey string `toml:"access_key"` + SecretKey string `toml:"secret_key"` + RoleARN string `toml:"role_arn"` + Profile string `toml:"profile"` + CredentialPath string `toml:"shared_credential_file"` + Token string `toml:"token"` + EndpointURL string `toml:"endpoint_url"` + StatisticExclude []string `toml:"statistic_exclude"` + StatisticInclude []string `toml:"statistic_include"` - Period internal.Duration `toml:"period"` - Delay internal.Duration `toml:"delay"` - Namespace string `toml:"namespace"` - Metrics []*Metric `toml:"metrics"` - CacheTTL internal.Duration `toml:"cache_ttl"` - RateLimit int `toml:"ratelimit"` - client cloudwatchClient - metricCache *MetricCache - windowStart time.Time - windowEnd time.Time + Period internal.Duration `toml:"period"` + Delay internal.Duration `toml:"delay"` + Namespace string `toml:"namespace"` + Metrics []*Metric `toml:"metrics"` + CacheTTL internal.Duration `toml:"cache_ttl"` + RateLimit int `toml:"ratelimit"` + + client cloudwatchClient + statFilter filter.Filter + metricCache *metricCache + queryDimensions map[string]*map[string]string + windowStart time.Time + windowEnd time.Time } + // Metric defines a simplified Cloudwatch metric. Metric struct { - MetricNames []string `toml:"names"` - Dimensions []*Dimension `toml:"dimensions"` + StatisticExclude *[]string `toml:"statistic_exclude"` + StatisticInclude *[]string `toml:"statistic_include"` + MetricNames []string `toml:"names"` + Dimensions []*Dimension `toml:"dimensions"` } + // Dimension defines a simplified Cloudwatch dimension (provides metric filtering). Dimension struct { Name string `toml:"name"` Value string `toml:"value"` } - MetricCache struct { - TTL time.Duration - Fetched time.Time - Metrics []*cloudwatch.Metric + // metricCache caches metrics, their filters, and generated queries. + metricCache struct { + ttl time.Duration + built time.Time + metrics []filteredMetric + queries []*cloudwatch.MetricDataQuery } cloudwatchClient interface { ListMetrics(*cloudwatch.ListMetricsInput) (*cloudwatch.ListMetricsOutput, error) - GetMetricStatistics(*cloudwatch.GetMetricStatisticsInput) (*cloudwatch.GetMetricStatisticsOutput, error) + GetMetricData(*cloudwatch.GetMetricDataInput) (*cloudwatch.GetMetricDataOutput, error) } ) +// SampleConfig returns the default configuration of the Cloudwatch input plugin. 
func (c *CloudWatch) SampleConfig() string { return ` ## Amazon Region @@ -75,12 +91,12 @@ func (c *CloudWatch) SampleConfig() string { ## 4) environment variables ## 5) shared credentials file ## 6) EC2 Instance Profile - #access_key = "" - #secret_key = "" - #token = "" - #role_arn = "" - #profile = "" - #shared_credential_file = "" + # access_key = "" + # secret_key = "" + # token = "" + # role_arn = "" + # profile = "" + # shared_credential_file = "" ## Endpoint to make request against, the correct endpoint is automatically ## determined and this option should only be set if you wish to override the @@ -106,44 +122,155 @@ func (c *CloudWatch) SampleConfig() string { interval = "5m" ## Configure the TTL for the internal cache of metrics. - ## Defaults to 1 hr if not specified - #cache_ttl = "10m" + # cache_ttl = "1h" ## Metric Statistic Namespace (required) namespace = "AWS/ELB" ## Maximum requests per second. Note that the global default AWS rate limit is - ## 400 reqs/sec, so if you define multiple namespaces, these should add up to a - ## maximum of 400. Optional - default value is 200. + ## 50 reqs/sec, so if you define multiple namespaces, these should add up to a + ## maximum of 50. ## See http://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/cloudwatch_limits.html - ratelimit = 200 + # ratelimit = 25 - ## Metrics to Pull (optional) + ## Namespace-wide statistic filters. These allow fewer queries to be made to + ## cloudwatch. + # statistic_include = [ "average", "sum", minimum", "maximum", sample_count" ] + # statistic_exclude = [] + + ## Metrics to Pull ## Defaults to all Metrics in Namespace if nothing is provided ## Refreshes Namespace available metrics every 1h #[[inputs.cloudwatch.metrics]] # names = ["Latency", "RequestCount"] # - # ## Dimension filters for Metric. These are optional however all dimensions - # ## defined for the metric names must be specified in order to retrieve - # ## the metric statistics. + # ## Statistic filters for Metric. These allow for retrieving specific + # ## statistics for an individual metric. + # # statistic_include = [ "average", "sum", minimum", "maximum", sample_count" ] + # # statistic_exclude = [] + # + # ## Dimension filters for Metric. All dimensions defined for the metric names + # ## must be specified in order to retrieve the metric statistics. # [[inputs.cloudwatch.metrics.dimensions]] # name = "LoadBalancerName" # value = "p-example" ` } +// Description returns a one-sentence description on the Cloudwatch input plugin. func (c *CloudWatch) Description() string { return "Pull Metric Statistics from Amazon CloudWatch" } -func SelectMetrics(c *CloudWatch) ([]*cloudwatch.Metric, error) { - var metrics []*cloudwatch.Metric +// Gather takes in an accumulator and adds the metrics that the Input +// gathers. This is called every "interval". +func (c *CloudWatch) Gather(acc telegraf.Accumulator) error { + if c.statFilter == nil { + var err error + // Set config level filter (won't change throughout life of plugin). + c.statFilter, err = filter.NewIncludeExcludeFilter(c.StatisticInclude, c.StatisticExclude) + if err != nil { + return err + } + } + + if c.client == nil { + c.initializeCloudWatch() + } + + filteredMetrics, err := getFilteredMetrics(c) + if err != nil { + return err + } + + err = c.updateWindow(time.Now()) + if err != nil { + return err + } + + // Get all of the possible queries so we can send groups of 100. 
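+	// A GetMetricData request can contain at most 100 queries, so the queries
+	// built below are split into batches of 100 and fetched concurrently,
+	// gated by the configured rate limit.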
+ queries, err := c.getDataQueries(filteredMetrics) + if err != nil { + return err + } + + // Limit concurrency or we can easily exhaust user connection limit. + // See cloudwatch API request limits: + // http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/cloudwatch_limits.html + lmtr := limiter.NewRateLimiter(c.RateLimit, time.Second) + defer lmtr.Stop() + wg := sync.WaitGroup{} + rLock := sync.Mutex{} + + results := []*cloudwatch.MetricDataResult{} + + // 100 is the maximum number of metric data queries a `GetMetricData` request can contain. + batchSize := 100 + var batches [][]*cloudwatch.MetricDataQuery + + for batchSize < len(queries) { + queries, batches = queries[batchSize:], append(batches, queries[0:batchSize:batchSize]) + } + batches = append(batches, queries) + + for i := range batches { + wg.Add(1) + <-lmtr.C + go func(inm []*cloudwatch.MetricDataQuery) { + defer wg.Done() + result, err := c.gatherMetrics(c.getDataInputs(inm)) + if err != nil { + acc.AddError(err) + return + } + + rLock.Lock() + results = append(results, result...) + rLock.Unlock() + }(batches[i]) + } + + wg.Wait() + + return c.aggregateMetrics(acc, results) +} + +func (c *CloudWatch) initializeCloudWatch() error { + credentialConfig := &internalaws.CredentialConfig{ + Region: c.Region, + AccessKey: c.AccessKey, + SecretKey: c.SecretKey, + RoleARN: c.RoleARN, + Profile: c.Profile, + Filename: c.CredentialPath, + Token: c.Token, + EndpointURL: c.EndpointURL, + } + configProvider := credentialConfig.Credentials() + + cfg := &aws.Config{} + loglevel := aws.LogOff + c.client = cloudwatch.New(configProvider, cfg.WithLogLevel(loglevel)) + return nil +} + +type filteredMetric struct { + metrics []*cloudwatch.Metric + statFilter filter.Filter +} + +// getFilteredMetrics returns metrics specified in the config file or metrics listed from Cloudwatch. +func getFilteredMetrics(c *CloudWatch) ([]filteredMetric, error) { + if c.metricCache != nil && c.metricCache.isValid() { + return c.metricCache.metrics, nil + } + + fMetrics := []filteredMetric{} // check for provided metric filter if c.Metrics != nil { - metrics = []*cloudwatch.Metric{} for _, m := range c.Metrics { + metrics := []*cloudwatch.Metric{} if !hasWilcard(m.Dimensions) { dimensions := make([]*cloudwatch.Dimension, len(m.Dimensions)) for k, d := range m.Dimensions { @@ -176,51 +303,71 @@ func SelectMetrics(c *CloudWatch) ([]*cloudwatch.Metric, error) { } } } + + if m.StatisticExclude == nil { + m.StatisticExclude = &c.StatisticExclude + } + if m.StatisticInclude == nil { + m.StatisticInclude = &c.StatisticInclude + } + statFilter, err := filter.NewIncludeExcludeFilter(*m.StatisticInclude, *m.StatisticExclude) + if err != nil { + return nil, err + } + + fMetrics = append(fMetrics, filteredMetric{ + metrics: metrics, + statFilter: statFilter, + }) } } else { - var err error - metrics, err = c.fetchNamespaceMetrics() + metrics, err := c.fetchNamespaceMetrics() if err != nil { return nil, err } + + fMetrics = []filteredMetric{{ + metrics: metrics, + statFilter: c.statFilter, + }} } - return metrics, nil + + c.metricCache = &metricCache{ + metrics: fMetrics, + built: time.Now(), + ttl: c.CacheTTL.Duration, + } + + return fMetrics, nil } -func (c *CloudWatch) Gather(acc telegraf.Accumulator) error { - if c.client == nil { - c.initializeCloudWatch() +// fetchNamespaceMetrics retrieves available metrics for a given CloudWatch namespace. 
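+// It pages through ListMetrics responses via NextToken until no pages remain.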
+func (c *CloudWatch) fetchNamespaceMetrics() ([]*cloudwatch.Metric, error) { + metrics := []*cloudwatch.Metric{} + + var token *string + params := &cloudwatch.ListMetricsInput{ + Namespace: aws.String(c.Namespace), + Dimensions: []*cloudwatch.DimensionFilter{}, + NextToken: token, + MetricName: nil, } - metrics, err := SelectMetrics(c) - if err != nil { - return err + for { + resp, err := c.client.ListMetrics(params) + if err != nil { + return nil, err + } + + metrics = append(metrics, resp.Metrics...) + if resp.NextToken == nil { + break + } + + params.NextToken = resp.NextToken } - now := time.Now() - - err = c.updateWindow(now) - if err != nil { - return err - } - - // limit concurrency or we can easily exhaust user connection limit - // see cloudwatch API request limits: - // http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/cloudwatch_limits.html - lmtr := limiter.NewRateLimiter(c.RateLimit, time.Second) - defer lmtr.Stop() - var wg sync.WaitGroup - wg.Add(len(metrics)) - for _, m := range metrics { - <-lmtr.C - go func(inm *cloudwatch.Metric) { - defer wg.Done() - acc.AddError(c.gatherMetric(acc, inm)) - }(m) - } - wg.Wait() - - return nil + return metrics, nil } func (c *CloudWatch) updateWindow(relativeTo time.Time) error { @@ -239,168 +386,197 @@ func (c *CloudWatch) updateWindow(relativeTo time.Time) error { return nil } +// getDataQueries gets all of the possible queries so we can maximize the request payload. +func (c *CloudWatch) getDataQueries(filteredMetrics []filteredMetric) ([]*cloudwatch.MetricDataQuery, error) { + if c.metricCache != nil && c.metricCache.queries != nil && c.metricCache.isValid() { + return c.metricCache.queries, nil + } + + c.queryDimensions = map[string]*map[string]string{} + + dataQueries := []*cloudwatch.MetricDataQuery{} + for i, filtered := range filteredMetrics { + for j, metric := range filtered.metrics { + id := strconv.Itoa(j) + "_" + strconv.Itoa(i) + dimension := ctod(metric.Dimensions) + if filtered.statFilter.Match("average") { + c.queryDimensions["average_"+id] = dimension + dataQueries = append(dataQueries, &cloudwatch.MetricDataQuery{ + Id: aws.String("average_" + id), + Label: aws.String(snakeCase(*metric.MetricName + "_average")), + MetricStat: &cloudwatch.MetricStat{ + Metric: metric, + Period: aws.Int64(int64(c.Period.Duration.Seconds())), + Stat: aws.String(cloudwatch.StatisticAverage), + }, + }) + } + if filtered.statFilter.Match("maximum") { + c.queryDimensions["maximum_"+id] = dimension + dataQueries = append(dataQueries, &cloudwatch.MetricDataQuery{ + Id: aws.String("maximum_" + id), + Label: aws.String(snakeCase(*metric.MetricName + "_maximum")), + MetricStat: &cloudwatch.MetricStat{ + Metric: metric, + Period: aws.Int64(int64(c.Period.Duration.Seconds())), + Stat: aws.String(cloudwatch.StatisticMaximum), + }, + }) + } + if filtered.statFilter.Match("minimum") { + c.queryDimensions["minimum_"+id] = dimension + dataQueries = append(dataQueries, &cloudwatch.MetricDataQuery{ + Id: aws.String("minimum_" + id), + Label: aws.String(snakeCase(*metric.MetricName + "_minimum")), + MetricStat: &cloudwatch.MetricStat{ + Metric: metric, + Period: aws.Int64(int64(c.Period.Duration.Seconds())), + Stat: aws.String(cloudwatch.StatisticMinimum), + }, + }) + } + if filtered.statFilter.Match("sum") { + c.queryDimensions["sum_"+id] = dimension + dataQueries = append(dataQueries, &cloudwatch.MetricDataQuery{ + Id: aws.String("sum_" + id), + Label: aws.String(snakeCase(*metric.MetricName + "_sum")), + MetricStat: 
&cloudwatch.MetricStat{ + Metric: metric, + Period: aws.Int64(int64(c.Period.Duration.Seconds())), + Stat: aws.String(cloudwatch.StatisticSum), + }, + }) + } + if filtered.statFilter.Match("sample_count") { + c.queryDimensions["sample_count_"+id] = dimension + dataQueries = append(dataQueries, &cloudwatch.MetricDataQuery{ + Id: aws.String("sample_count_" + id), + Label: aws.String(snakeCase(*metric.MetricName + "_sample_count")), + MetricStat: &cloudwatch.MetricStat{ + Metric: metric, + Period: aws.Int64(int64(c.Period.Duration.Seconds())), + Stat: aws.String(cloudwatch.StatisticSampleCount), + }, + }) + } + } + } + + if len(dataQueries) == 0 { + return nil, errors.New("no metrics found to collect") + } + + if c.metricCache == nil { + c.metricCache = &metricCache{ + queries: dataQueries, + built: time.Now(), + ttl: c.CacheTTL.Duration, + } + } else { + c.metricCache.queries = dataQueries + } + + return dataQueries, nil +} + +// gatherMetrics gets metric data from Cloudwatch. +func (c *CloudWatch) gatherMetrics( + params *cloudwatch.GetMetricDataInput, +) ([]*cloudwatch.MetricDataResult, error) { + results := []*cloudwatch.MetricDataResult{} + + for { + resp, err := c.client.GetMetricData(params) + if err != nil { + return nil, fmt.Errorf("failed to get metric data: %v", err) + } + + results = append(results, resp.MetricDataResults...) + if resp.NextToken == nil { + break + } + params.NextToken = resp.NextToken + } + + return results, nil +} + +func (c *CloudWatch) aggregateMetrics( + acc telegraf.Accumulator, + metricDataResults []*cloudwatch.MetricDataResult, +) error { + var ( + grouper = metric.NewSeriesGrouper() + namespace = sanitizeMeasurement(c.Namespace) + ) + + for _, result := range metricDataResults { + tags := map[string]string{} + + if dimensions, ok := c.queryDimensions[*result.Id]; ok { + tags = *dimensions + } + tags["region"] = c.Region + + for i := range result.Values { + grouper.Add(namespace, tags, *result.Timestamps[i], *result.Label, *result.Values[i]) + } + } + + for _, metric := range grouper.Metrics() { + acc.AddMetric(metric) + } + + return nil +} + func init() { inputs.Add("cloudwatch", func() telegraf.Input { - ttl, _ := time.ParseDuration("1hr") return &CloudWatch{ - CacheTTL: internal.Duration{Duration: ttl}, - RateLimit: 200, + CacheTTL: internal.Duration{Duration: time.Hour}, + RateLimit: 25, } }) } -/* - * Initialize CloudWatch client - */ -func (c *CloudWatch) initializeCloudWatch() error { - credentialConfig := &internalaws.CredentialConfig{ - Region: c.Region, - AccessKey: c.AccessKey, - SecretKey: c.SecretKey, - RoleARN: c.RoleARN, - Profile: c.Profile, - Filename: c.Filename, - Token: c.Token, - EndpointURL: c.EndpointURL, - } - configProvider := credentialConfig.Credentials() - - c.client = cloudwatch.New(configProvider) - return nil -} - -/* - * Fetch available metrics for given CloudWatch Namespace - */ -func (c *CloudWatch) fetchNamespaceMetrics() ([]*cloudwatch.Metric, error) { - if c.metricCache != nil && c.metricCache.IsValid() { - return c.metricCache.Metrics, nil - } - - metrics := []*cloudwatch.Metric{} - - var token *string - for more := true; more; { - params := &cloudwatch.ListMetricsInput{ - Namespace: aws.String(c.Namespace), - Dimensions: []*cloudwatch.DimensionFilter{}, - NextToken: token, - MetricName: nil, - } - - resp, err := c.client.ListMetrics(params) - if err != nil { - return nil, err - } - - metrics = append(metrics, resp.Metrics...) 
- - token = resp.NextToken - more = token != nil - } - - c.metricCache = &MetricCache{ - Metrics: metrics, - Fetched: time.Now(), - TTL: c.CacheTTL.Duration, - } - - return metrics, nil -} - -/* - * Gather given Metric and emit any error - */ -func (c *CloudWatch) gatherMetric( - acc telegraf.Accumulator, - metric *cloudwatch.Metric, -) error { - params := c.getStatisticsInput(metric) - resp, err := c.client.GetMetricStatistics(params) - if err != nil { - return err - } - - for _, point := range resp.Datapoints { - tags := map[string]string{ - "region": c.Region, - "unit": snakeCase(*point.Unit), - } - - for _, d := range metric.Dimensions { - tags[snakeCase(*d.Name)] = *d.Value - } - - // record field for each statistic - fields := map[string]interface{}{} - - if point.Average != nil { - fields[formatField(*metric.MetricName, cloudwatch.StatisticAverage)] = *point.Average - } - if point.Maximum != nil { - fields[formatField(*metric.MetricName, cloudwatch.StatisticMaximum)] = *point.Maximum - } - if point.Minimum != nil { - fields[formatField(*metric.MetricName, cloudwatch.StatisticMinimum)] = *point.Minimum - } - if point.SampleCount != nil { - fields[formatField(*metric.MetricName, cloudwatch.StatisticSampleCount)] = *point.SampleCount - } - if point.Sum != nil { - fields[formatField(*metric.MetricName, cloudwatch.StatisticSum)] = *point.Sum - } - - acc.AddFields(formatMeasurement(c.Namespace), fields, tags, *point.Timestamp) - } - - return nil -} - -/* - * Formatting helpers - */ -func formatField(metricName string, statistic string) string { - return fmt.Sprintf("%s_%s", snakeCase(metricName), snakeCase(statistic)) -} - -func formatMeasurement(namespace string) string { +func sanitizeMeasurement(namespace string) string { namespace = strings.Replace(namespace, "/", "_", -1) namespace = snakeCase(namespace) - return fmt.Sprintf("cloudwatch_%s", namespace) + return "cloudwatch_" + namespace } func snakeCase(s string) string { s = internal.SnakeCase(s) + s = strings.Replace(s, " ", "_", -1) s = strings.Replace(s, "__", "_", -1) return s } -/* - * Map Metric to *cloudwatch.GetMetricStatisticsInput for given timeframe - */ -func (c *CloudWatch) getStatisticsInput(metric *cloudwatch.Metric) *cloudwatch.GetMetricStatisticsInput { - input := &cloudwatch.GetMetricStatisticsInput{ - StartTime: aws.Time(c.windowStart), - EndTime: aws.Time(c.windowEnd), - MetricName: metric.MetricName, - Namespace: metric.Namespace, - Period: aws.Int64(int64(c.Period.Duration.Seconds())), - Dimensions: metric.Dimensions, - Statistics: []*string{ - aws.String(cloudwatch.StatisticAverage), - aws.String(cloudwatch.StatisticMaximum), - aws.String(cloudwatch.StatisticMinimum), - aws.String(cloudwatch.StatisticSum), - aws.String(cloudwatch.StatisticSampleCount)}, - } - return input +type dimension struct { + name string + value string } -/* - * Check Metric Cache validity - */ -func (c *MetricCache) IsValid() bool { - return c.Metrics != nil && time.Since(c.Fetched) < c.TTL +// ctod converts cloudwatch dimensions to regular dimensions. 
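+// For example, a dimension named LoadBalancerName with value p-example becomes
+// the tag load_balancer_name with value p-example.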
+func ctod(cDimensions []*cloudwatch.Dimension) *map[string]string { + dimensions := map[string]string{} + for i := range cDimensions { + dimensions[snakeCase(*cDimensions[i].Name)] = *cDimensions[i].Value + } + return &dimensions +} + +func (c *CloudWatch) getDataInputs(dataQueries []*cloudwatch.MetricDataQuery) *cloudwatch.GetMetricDataInput { + return &cloudwatch.GetMetricDataInput{ + StartTime: aws.Time(c.windowStart), + EndTime: aws.Time(c.windowEnd), + MetricDataQueries: dataQueries, + } +} + +// isValid checks the validity of the metric cache. +func (f *metricCache) isValid() bool { + return f.metrics != nil && time.Since(f.built) < f.ttl } func hasWilcard(dimensions []*Dimension) bool { diff --git a/plugins/inputs/cloudwatch/cloudwatch_test.go b/plugins/inputs/cloudwatch/cloudwatch_test.go index 9449cbead..f28473a57 100644 --- a/plugins/inputs/cloudwatch/cloudwatch_test.go +++ b/plugins/inputs/cloudwatch/cloudwatch_test.go @@ -6,46 +6,98 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/cloudwatch" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/influxdata/telegraf/filter" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/testutil" - "github.com/stretchr/testify/assert" ) type mockGatherCloudWatchClient struct{} func (m *mockGatherCloudWatchClient) ListMetrics(params *cloudwatch.ListMetricsInput) (*cloudwatch.ListMetricsOutput, error) { - metric := &cloudwatch.Metric{ - Namespace: params.Namespace, - MetricName: aws.String("Latency"), - Dimensions: []*cloudwatch.Dimension{ + return &cloudwatch.ListMetricsOutput{ + Metrics: []*cloudwatch.Metric{ { - Name: aws.String("LoadBalancerName"), - Value: aws.String("p-example"), + Namespace: params.Namespace, + MetricName: aws.String("Latency"), + Dimensions: []*cloudwatch.Dimension{ + { + Name: aws.String("LoadBalancerName"), + Value: aws.String("p-example"), + }, + }, }, }, - } - - result := &cloudwatch.ListMetricsOutput{ - Metrics: []*cloudwatch.Metric{metric}, - } - return result, nil + }, nil } -func (m *mockGatherCloudWatchClient) GetMetricStatistics(params *cloudwatch.GetMetricStatisticsInput) (*cloudwatch.GetMetricStatisticsOutput, error) { - dataPoint := &cloudwatch.Datapoint{ - Timestamp: params.EndTime, - Minimum: aws.Float64(0.1), - Maximum: aws.Float64(0.3), - Average: aws.Float64(0.2), - Sum: aws.Float64(123), - SampleCount: aws.Float64(100), - Unit: aws.String("Seconds"), - } - result := &cloudwatch.GetMetricStatisticsOutput{ - Label: aws.String("Latency"), - Datapoints: []*cloudwatch.Datapoint{dataPoint}, - } - return result, nil +func (m *mockGatherCloudWatchClient) GetMetricData(params *cloudwatch.GetMetricDataInput) (*cloudwatch.GetMetricDataOutput, error) { + return &cloudwatch.GetMetricDataOutput{ + MetricDataResults: []*cloudwatch.MetricDataResult{ + { + Id: aws.String("minimum_0_0"), + Label: aws.String("latency_minimum"), + StatusCode: aws.String("completed"), + Timestamps: []*time.Time{ + params.EndTime, + }, + Values: []*float64{ + aws.Float64(0.1), + }, + }, + { + Id: aws.String("maximum_0_0"), + Label: aws.String("latency_maximum"), + StatusCode: aws.String("completed"), + Timestamps: []*time.Time{ + params.EndTime, + }, + Values: []*float64{ + aws.Float64(0.3), + }, + }, + { + Id: aws.String("average_0_0"), + Label: aws.String("latency_average"), + StatusCode: aws.String("completed"), + Timestamps: []*time.Time{ + params.EndTime, + }, + Values: []*float64{ + aws.Float64(0.2), + }, + }, + { + Id: 
aws.String("sum_0_0"), + Label: aws.String("latency_sum"), + StatusCode: aws.String("completed"), + Timestamps: []*time.Time{ + params.EndTime, + }, + Values: []*float64{ + aws.Float64(123), + }, + }, + { + Id: aws.String("sample_count_0_0"), + Label: aws.String("latency_sample_count"), + StatusCode: aws.String("completed"), + Timestamps: []*time.Time{ + params.EndTime, + }, + Values: []*float64{ + aws.Float64(100), + }, + }, + }, + }, nil +} + +func TestSnakeCase(t *testing.T) { + assert.Equal(t, "cluster_name", snakeCase("Cluster Name")) + assert.Equal(t, "broker_id", snakeCase("Broker ID")) } func TestGather(t *testing.T) { @@ -64,7 +116,7 @@ func TestGather(t *testing.T) { var acc testutil.Accumulator c.client = &mockGatherCloudWatchClient{} - acc.GatherError(c.Gather) + assert.NoError(t, acc.GatherError(c.Gather)) fields := map[string]interface{}{} fields["latency_minimum"] = 0.1 @@ -74,13 +126,11 @@ func TestGather(t *testing.T) { fields["latency_sample_count"] = 100.0 tags := map[string]string{} - tags["unit"] = "seconds" tags["region"] = "us-east-1" tags["load_balancer_name"] = "p-example" assert.True(t, acc.HasMeasurement("cloudwatch_aws_elb")) acc.AssertContainsTaggedFields(t, "cloudwatch_aws_elb", fields, tags) - } type mockSelectMetricsCloudWatchClient struct{} @@ -132,7 +182,7 @@ func (m *mockSelectMetricsCloudWatchClient) ListMetrics(params *cloudwatch.ListM return result, nil } -func (m *mockSelectMetricsCloudWatchClient) GetMetricStatistics(params *cloudwatch.GetMetricStatisticsInput) (*cloudwatch.GetMetricStatisticsOutput, error) { +func (m *mockSelectMetricsCloudWatchClient) GetMetricData(params *cloudwatch.GetMetricDataInput) (*cloudwatch.GetMetricDataOutput, error) { return nil, nil } @@ -164,10 +214,10 @@ func TestSelectMetrics(t *testing.T) { }, } c.client = &mockSelectMetricsCloudWatchClient{} - metrics, err := SelectMetrics(c) + filtered, err := getFilteredMetrics(c) // We've asked for 2 (out of 4) metrics, over all 3 load balancers in all 2 // AZs. We should get 12 metrics. 
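	// (2 metric names * 3 load balancers * 2 AZs = 12.)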
- assert.Equal(t, 12, len(metrics)) + assert.Equal(t, 12, len(filtered[0].metrics)) assert.Nil(t, err) } @@ -199,25 +249,66 @@ func TestGenerateStatisticsInputParams(t *testing.T) { c.updateWindow(now) - params := c.getStatisticsInput(m) + statFilter, _ := filter.NewIncludeExcludeFilter(nil, nil) + queries, _ := c.getDataQueries([]filteredMetric{{metrics: []*cloudwatch.Metric{m}, statFilter: statFilter}}) + params := c.getDataInputs(queries) assert.EqualValues(t, *params.EndTime, now.Add(-c.Delay.Duration)) assert.EqualValues(t, *params.StartTime, now.Add(-c.Period.Duration).Add(-c.Delay.Duration)) - assert.Len(t, params.Dimensions, 1) - assert.Len(t, params.Statistics, 5) - assert.EqualValues(t, *params.Period, 60) + require.Len(t, params.MetricDataQueries, 5) + assert.Len(t, params.MetricDataQueries[0].MetricStat.Metric.Dimensions, 1) + assert.EqualValues(t, *params.MetricDataQueries[0].MetricStat.Period, 60) +} + +func TestGenerateStatisticsInputParamsFiltered(t *testing.T) { + d := &cloudwatch.Dimension{ + Name: aws.String("LoadBalancerName"), + Value: aws.String("p-example"), + } + + m := &cloudwatch.Metric{ + MetricName: aws.String("Latency"), + Dimensions: []*cloudwatch.Dimension{d}, + } + + duration, _ := time.ParseDuration("1m") + internalDuration := internal.Duration{ + Duration: duration, + } + + c := &CloudWatch{ + Namespace: "AWS/ELB", + Delay: internalDuration, + Period: internalDuration, + } + + c.initializeCloudWatch() + + now := time.Now() + + c.updateWindow(now) + + statFilter, _ := filter.NewIncludeExcludeFilter([]string{"average", "sample_count"}, nil) + queries, _ := c.getDataQueries([]filteredMetric{{metrics: []*cloudwatch.Metric{m}, statFilter: statFilter}}) + params := c.getDataInputs(queries) + + assert.EqualValues(t, *params.EndTime, now.Add(-c.Delay.Duration)) + assert.EqualValues(t, *params.StartTime, now.Add(-c.Period.Duration).Add(-c.Delay.Duration)) + require.Len(t, params.MetricDataQueries, 2) + assert.Len(t, params.MetricDataQueries[0].MetricStat.Metric.Dimensions, 1) + assert.EqualValues(t, *params.MetricDataQueries[0].MetricStat.Period, 60) } func TestMetricsCacheTimeout(t *testing.T) { - cache := &MetricCache{ - Metrics: []*cloudwatch.Metric{}, - Fetched: time.Now(), - TTL: time.Minute, + cache := &metricCache{ + metrics: []filteredMetric{}, + built: time.Now(), + ttl: time.Minute, } - assert.True(t, cache.IsValid()) - cache.Fetched = time.Now().Add(-time.Minute) - assert.False(t, cache.IsValid()) + assert.True(t, cache.isValid()) + cache.built = time.Now().Add(-time.Minute) + assert.False(t, cache.isValid()) } func TestUpdateWindow(t *testing.T) {