Collect cloudwatch stats using GetMetricData (#5544)

This commit is contained in:
Greg 2019-04-22 18:36:46 -06:00 committed by Daniel Nelson
parent 58a6209a76
commit e334830458
3 changed files with 582 additions and 299 deletions

View File

@ -17,7 +17,7 @@ API endpoint. In the following order the plugin will attempt to authenticate.
```toml ```toml
[[inputs.cloudwatch]] [[inputs.cloudwatch]]
## Amazon Region (required) ## Amazon Region
region = "us-east-1" region = "us-east-1"
## Amazon Credentials ## Amazon Credentials
@ -28,12 +28,12 @@ API endpoint. In the following order the plugin will attempt to authenticate.
## 4) environment variables ## 4) environment variables
## 5) shared credentials file ## 5) shared credentials file
## 6) EC2 Instance Profile ## 6) EC2 Instance Profile
#access_key = "" # access_key = ""
#secret_key = "" # secret_key = ""
#token = "" # token = ""
#role_arn = "" # role_arn = ""
#profile = "" # profile = ""
#shared_credential_file = "" # shared_credential_file = ""
## Endpoint to make request against, the correct endpoint is automatically ## Endpoint to make request against, the correct endpoint is automatically
## determined and this option should only be set if you wish to override the ## determined and this option should only be set if you wish to override the
@ -54,32 +54,43 @@ API endpoint. In the following order the plugin will attempt to authenticate.
## Collection Delay (required - must account for metrics availability via CloudWatch API) ## Collection Delay (required - must account for metrics availability via CloudWatch API)
delay = "5m" delay = "5m"
## Override global run interval (optional - defaults to global interval) ## Recommended: use metric 'interval' that is a multiple of 'period' to avoid
## Recomended: use metric 'interval' that is a multiple of 'period' to avoid
## gaps or overlap in pulled data ## gaps or overlap in pulled data
interval = "5m" interval = "5m"
## Configure the TTL for the internal cache of metrics.
# cache_ttl = "1h"
## Metric Statistic Namespace (required) ## Metric Statistic Namespace (required)
namespace = "AWS/ELB" namespace = "AWS/ELB"
## Maximum requests per second. Note that the global default AWS rate limit is ## Maximum requests per second. Note that the global default AWS rate limit is
## 400 reqs/sec, so if you define multiple namespaces, these should add up to a ## 50 reqs/sec, so if you define multiple namespaces, these should add up to a
## maximum of 400. Optional - default value is 200. ## maximum of 50.
## See http://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/cloudwatch_limits.html ## See http://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/cloudwatch_limits.html
ratelimit = 200 # ratelimit = 25
## Metrics to Pull (optional) ## Namespace-wide statistic filters. These allow fewer queries to be made to
## cloudwatch.
# statistic_include = [ "average", "sum", "minimum", "maximum", "sample_count" ]
# statistic_exclude = []
## Metrics to Pull
## Defaults to all Metrics in Namespace if nothing is provided ## Defaults to all Metrics in Namespace if nothing is provided
## Refreshes Namespace available metrics every 1h ## Refreshes Namespace available metrics every 1h
[[inputs.cloudwatch.metrics]] #[[inputs.cloudwatch.metrics]]
names = ["Latency", "RequestCount"] # names = ["Latency", "RequestCount"]
#
## Dimension filters for Metric. These are optional however all dimensions # ## Statistic filters for Metric. These allow for retrieving specific
## defined for the metric names must be specified in order to retrieve # ## statistics for an individual metric.
## the metric statistics. # # statistic_include = [ "average", "sum", minimum", "maximum", sample_count" ]
[[inputs.cloudwatch.metrics.dimensions]] # # statistic_exclude = []
name = "LoadBalancerName" #
value = "p-example" # ## Dimension filters for Metric. All dimensions defined for the metric names
# ## must be specified in order to retrieve the metric statistics.
# [[inputs.cloudwatch.metrics.dimensions]]
# name = "LoadBalancerName"
# value = "p-example"
``` ```
#### Requirements and Terminology #### Requirements and Terminology
@ -97,7 +108,11 @@ wildcard dimension is ignored.
Example: Example:
``` ```
[[inputs.cloudwatch.metrics]] [[inputs.cloudwatch]]
period = "1m"
interval = "5m"
[[inputs.cloudwatch.metrics]]
names = ["Latency"] names = ["Latency"]
## Dimension filters for Metric (optional) ## Dimension filters for Metric (optional)
@ -124,9 +139,11 @@ Then 2 metrics will be output:
If the `AvailabilityZone` wildcard dimension was omitted, then a single metric (name: `p-example`) If the `AvailabilityZone` wildcard dimension was omitted, then a single metric (name: `p-example`)
would be exported containing the aggregate values of the ELB across availability zones. would be exported containing the aggregate values of the ELB across availability zones.
To maximize efficiency and savings, consider making fewer requests by increasing `interval` but keeping `period` at the duration you would like metrics to be reported. The above example will request metrics from Cloudwatch every 5 minutes but will output five metrics timestamped one minute apart.
#### Restrictions and Limitations #### Restrictions and Limitations
- CloudWatch metrics are not available instantly via the CloudWatch API. You should adjust your collection `delay` to account for this lag in metrics availability based on your [monitoring subscription level](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-cloudwatch-new.html) - CloudWatch metrics are not available instantly via the CloudWatch API. You should adjust your collection `delay` to account for this lag in metrics availability based on your [monitoring subscription level](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-cloudwatch-new.html)
- CloudWatch API usage incurs cost - see [GetMetricStatistics Pricing](https://aws.amazon.com/cloudwatch/pricing/) - CloudWatch API usage incurs cost - see [GetMetricData Pricing](https://aws.amazon.com/cloudwatch/pricing/)
### Measurements & Fields: ### Measurements & Fields:
@ -147,7 +164,6 @@ Tag Dimension names are represented in [snake case](https://en.wikipedia.org/wik
- All measurements have the following tags: - All measurements have the following tags:
- region (CloudWatch Region) - region (CloudWatch Region)
- unit (CloudWatch Metric Unit)
- {dimension-name} (Cloudwatch Dimension value - one for each metric dimension) - {dimension-name} (Cloudwatch Dimension value - one for each metric dimension)
### Troubleshooting: ### Troubleshooting:
@ -168,5 +184,5 @@ aws cloudwatch get-metric-statistics --namespace AWS/EC2 --region us-east-1 --pe
``` ```
$ ./telegraf --config telegraf.conf --input-filter cloudwatch --test $ ./telegraf --config telegraf.conf --input-filter cloudwatch --test
> cloudwatch_aws_elb,load_balancer_name=p-example,region=us-east-1,unit=seconds latency_average=0.004810798017284538,latency_maximum=0.1100282669067383,latency_minimum=0.0006084442138671875,latency_sample_count=4029,latency_sum=19.382705211639404 1459542420000000000 > cloudwatch_aws_elb,load_balancer_name=p-example,region=us-east-1 latency_average=0.004810798017284538,latency_maximum=0.1100282669067383,latency_minimum=0.0006084442138671875,latency_sample_count=4029,latency_sum=19.382705211639404 1459542420000000000
``` ```

View File

@ -1,32 +1,38 @@
package cloudwatch package cloudwatch
import ( import (
"errors"
"fmt" "fmt"
"strconv"
"strings" "strings"
"sync" "sync"
"time" "time"
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/cloudwatch" "github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
internalaws "github.com/influxdata/telegraf/internal/config/aws" internalaws "github.com/influxdata/telegraf/internal/config/aws"
"github.com/influxdata/telegraf/internal/limiter" "github.com/influxdata/telegraf/internal/limiter"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
) )
type ( type (
// CloudWatch contains the configuration and cache for the cloudwatch plugin.
CloudWatch struct { CloudWatch struct {
Region string `toml:"region"` Region string `toml:"region"`
AccessKey string `toml:"access_key"` AccessKey string `toml:"access_key"`
SecretKey string `toml:"secret_key"` SecretKey string `toml:"secret_key"`
RoleARN string `toml:"role_arn"` RoleARN string `toml:"role_arn"`
Profile string `toml:"profile"` Profile string `toml:"profile"`
Filename string `toml:"shared_credential_file"` CredentialPath string `toml:"shared_credential_file"`
Token string `toml:"token"` Token string `toml:"token"`
EndpointURL string `toml:"endpoint_url"` EndpointURL string `toml:"endpoint_url"`
StatisticExclude []string `toml:"statistic_exclude"`
StatisticInclude []string `toml:"statistic_include"`
Period internal.Duration `toml:"period"` Period internal.Duration `toml:"period"`
Delay internal.Duration `toml:"delay"` Delay internal.Duration `toml:"delay"`
@ -34,34 +40,44 @@ type (
Metrics []*Metric `toml:"metrics"` Metrics []*Metric `toml:"metrics"`
CacheTTL internal.Duration `toml:"cache_ttl"` CacheTTL internal.Duration `toml:"cache_ttl"`
RateLimit int `toml:"ratelimit"` RateLimit int `toml:"ratelimit"`
client cloudwatchClient client cloudwatchClient
metricCache *MetricCache statFilter filter.Filter
metricCache *metricCache
queryDimensions map[string]*map[string]string
windowStart time.Time windowStart time.Time
windowEnd time.Time windowEnd time.Time
} }
// Metric defines a simplified Cloudwatch metric.
Metric struct { Metric struct {
StatisticExclude *[]string `toml:"statistic_exclude"`
StatisticInclude *[]string `toml:"statistic_include"`
MetricNames []string `toml:"names"` MetricNames []string `toml:"names"`
Dimensions []*Dimension `toml:"dimensions"` Dimensions []*Dimension `toml:"dimensions"`
} }
// Dimension defines a simplified Cloudwatch dimension (provides metric filtering).
Dimension struct { Dimension struct {
Name string `toml:"name"` Name string `toml:"name"`
Value string `toml:"value"` Value string `toml:"value"`
} }
MetricCache struct { // metricCache caches metrics, their filters, and generated queries.
TTL time.Duration metricCache struct {
Fetched time.Time ttl time.Duration
Metrics []*cloudwatch.Metric built time.Time
metrics []filteredMetric
queries []*cloudwatch.MetricDataQuery
} }
cloudwatchClient interface { cloudwatchClient interface {
ListMetrics(*cloudwatch.ListMetricsInput) (*cloudwatch.ListMetricsOutput, error) ListMetrics(*cloudwatch.ListMetricsInput) (*cloudwatch.ListMetricsOutput, error)
GetMetricStatistics(*cloudwatch.GetMetricStatisticsInput) (*cloudwatch.GetMetricStatisticsOutput, error) GetMetricData(*cloudwatch.GetMetricDataInput) (*cloudwatch.GetMetricDataOutput, error)
} }
) )
// SampleConfig returns the default configuration of the Cloudwatch input plugin.
func (c *CloudWatch) SampleConfig() string { func (c *CloudWatch) SampleConfig() string {
return ` return `
## Amazon Region ## Amazon Region
@ -75,12 +91,12 @@ func (c *CloudWatch) SampleConfig() string {
## 4) environment variables ## 4) environment variables
## 5) shared credentials file ## 5) shared credentials file
## 6) EC2 Instance Profile ## 6) EC2 Instance Profile
#access_key = "" # access_key = ""
#secret_key = "" # secret_key = ""
#token = "" # token = ""
#role_arn = "" # role_arn = ""
#profile = "" # profile = ""
#shared_credential_file = "" # shared_credential_file = ""
## Endpoint to make request against, the correct endpoint is automatically ## Endpoint to make request against, the correct endpoint is automatically
## determined and this option should only be set if you wish to override the ## determined and this option should only be set if you wish to override the
@ -106,44 +122,155 @@ func (c *CloudWatch) SampleConfig() string {
interval = "5m" interval = "5m"
## Configure the TTL for the internal cache of metrics. ## Configure the TTL for the internal cache of metrics.
## Defaults to 1 hr if not specified # cache_ttl = "1h"
#cache_ttl = "10m"
## Metric Statistic Namespace (required) ## Metric Statistic Namespace (required)
namespace = "AWS/ELB" namespace = "AWS/ELB"
## Maximum requests per second. Note that the global default AWS rate limit is ## Maximum requests per second. Note that the global default AWS rate limit is
## 400 reqs/sec, so if you define multiple namespaces, these should add up to a ## 50 reqs/sec, so if you define multiple namespaces, these should add up to a
## maximum of 400. Optional - default value is 200. ## maximum of 50.
## See http://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/cloudwatch_limits.html ## See http://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/cloudwatch_limits.html
ratelimit = 200 # ratelimit = 25
## Metrics to Pull (optional) ## Namespace-wide statistic filters. These allow fewer queries to be made to
## cloudwatch.
# statistic_include = [ "average", "sum", minimum", "maximum", sample_count" ]
# statistic_exclude = []
## Metrics to Pull
## Defaults to all Metrics in Namespace if nothing is provided ## Defaults to all Metrics in Namespace if nothing is provided
## Refreshes Namespace available metrics every 1h ## Refreshes Namespace available metrics every 1h
#[[inputs.cloudwatch.metrics]] #[[inputs.cloudwatch.metrics]]
# names = ["Latency", "RequestCount"] # names = ["Latency", "RequestCount"]
# #
# ## Dimension filters for Metric. These are optional however all dimensions # ## Statistic filters for Metric. These allow for retrieving specific
# ## defined for the metric names must be specified in order to retrieve # ## statistics for an individual metric.
# ## the metric statistics. # # statistic_include = [ "average", "sum", minimum", "maximum", sample_count" ]
# # statistic_exclude = []
#
# ## Dimension filters for Metric. All dimensions defined for the metric names
# ## must be specified in order to retrieve the metric statistics.
# [[inputs.cloudwatch.metrics.dimensions]] # [[inputs.cloudwatch.metrics.dimensions]]
# name = "LoadBalancerName" # name = "LoadBalancerName"
# value = "p-example" # value = "p-example"
` `
} }
// Description returns a one-sentence description on the Cloudwatch input plugin.
func (c *CloudWatch) Description() string { func (c *CloudWatch) Description() string {
return "Pull Metric Statistics from Amazon CloudWatch" return "Pull Metric Statistics from Amazon CloudWatch"
} }
func SelectMetrics(c *CloudWatch) ([]*cloudwatch.Metric, error) { // Gather takes in an accumulator and adds the metrics that the Input
var metrics []*cloudwatch.Metric // gathers. This is called every "interval".
// Gather implements telegraf.Input: it builds (and caches) the set of
// GetMetricData queries, then fetches them in batches of 100 concurrently,
// bounded by the configured rate limit.
func (c *CloudWatch) Gather(acc telegraf.Accumulator) error {
	if c.statFilter == nil {
		var err error
		// Set config level filter (won't change throughout life of plugin).
		c.statFilter, err = filter.NewIncludeExcludeFilter(c.StatisticInclude, c.StatisticExclude)
		if err != nil {
			return err
		}
	}

	if c.client == nil {
		// Propagate client construction failures instead of silently
		// continuing with a nil client.
		if err := c.initializeCloudWatch(); err != nil {
			return err
		}
	}

	filteredMetrics, err := getFilteredMetrics(c)
	if err != nil {
		return err
	}

	err = c.updateWindow(time.Now())
	if err != nil {
		return err
	}

	// Get all of the possible queries so we can send groups of 100.
	queries, err := c.getDataQueries(filteredMetrics)
	if err != nil {
		return err
	}

	// Limit concurrency or we can easily exhaust user connection limit.
	// See cloudwatch API request limits:
	// http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/cloudwatch_limits.html
	lmtr := limiter.NewRateLimiter(c.RateLimit, time.Second)
	defer lmtr.Stop()

	wg := sync.WaitGroup{}
	rLock := sync.Mutex{}

	results := []*cloudwatch.MetricDataResult{}

	// 100 is the maximum number of metric data queries a `GetMetricData` request can contain.
	batchSize := 100
	var batches [][]*cloudwatch.MetricDataQuery

	for batchSize < len(queries) {
		queries, batches = queries[batchSize:], append(batches, queries[0:batchSize:batchSize])
	}
	batches = append(batches, queries)

	for i := range batches {
		wg.Add(1)
		<-lmtr.C
		go func(inm []*cloudwatch.MetricDataQuery) {
			defer wg.Done()
			result, err := c.gatherMetrics(c.getDataInputs(inm))
			if err != nil {
				acc.AddError(err)
				return
			}

			// Serialize appends from the concurrent workers.
			rLock.Lock()
			results = append(results, result...)
			rLock.Unlock()
		}(batches[i])
	}

	wg.Wait()

	return c.aggregateMetrics(acc, results)
}
// initializeCloudWatch constructs the CloudWatch service client from the
// plugin's credential configuration. It currently cannot fail but keeps an
// error return for interface stability.
func (c *CloudWatch) initializeCloudWatch() error {
	creds := &internalaws.CredentialConfig{
		Region:      c.Region,
		AccessKey:   c.AccessKey,
		SecretKey:   c.SecretKey,
		RoleARN:     c.RoleARN,
		Profile:     c.Profile,
		Filename:    c.CredentialPath,
		Token:       c.Token,
		EndpointURL: c.EndpointURL,
	}

	// SDK logging is disabled; enable via aws.Config if debugging is needed.
	awsCfg := &aws.Config{}
	c.client = cloudwatch.New(creds.Credentials(), awsCfg.WithLogLevel(aws.LogOff))
	return nil
}
// filteredMetric pairs a set of cloudwatch metrics with the statistic
// filter that decides which statistics are requested for them.
type filteredMetric struct {
	// metrics to be queried.
	metrics []*cloudwatch.Metric
	// statFilter selects the statistics (average, sum, ...) to request.
	statFilter filter.Filter
}
// getFilteredMetrics returns metrics specified in the config file or metrics listed from Cloudwatch.
func getFilteredMetrics(c *CloudWatch) ([]filteredMetric, error) {
if c.metricCache != nil && c.metricCache.isValid() {
return c.metricCache.metrics, nil
}
fMetrics := []filteredMetric{}
// check for provided metric filter // check for provided metric filter
if c.Metrics != nil { if c.Metrics != nil {
metrics = []*cloudwatch.Metric{}
for _, m := range c.Metrics { for _, m := range c.Metrics {
metrics := []*cloudwatch.Metric{}
if !hasWilcard(m.Dimensions) { if !hasWilcard(m.Dimensions) {
dimensions := make([]*cloudwatch.Dimension, len(m.Dimensions)) dimensions := make([]*cloudwatch.Dimension, len(m.Dimensions))
for k, d := range m.Dimensions { for k, d := range m.Dimensions {
@ -176,51 +303,71 @@ func SelectMetrics(c *CloudWatch) ([]*cloudwatch.Metric, error) {
} }
} }
} }
if m.StatisticExclude == nil {
m.StatisticExclude = &c.StatisticExclude
} }
} else { if m.StatisticInclude == nil {
var err error m.StatisticInclude = &c.StatisticInclude
metrics, err = c.fetchNamespaceMetrics() }
statFilter, err := filter.NewIncludeExcludeFilter(*m.StatisticInclude, *m.StatisticExclude)
if err != nil { if err != nil {
return nil, err return nil, err
} }
fMetrics = append(fMetrics, filteredMetric{
metrics: metrics,
statFilter: statFilter,
})
} }
return metrics, nil } else {
metrics, err := c.fetchNamespaceMetrics()
if err != nil {
return nil, err
}
fMetrics = []filteredMetric{{
metrics: metrics,
statFilter: c.statFilter,
}}
}
c.metricCache = &metricCache{
metrics: fMetrics,
built: time.Now(),
ttl: c.CacheTTL.Duration,
}
return fMetrics, nil
} }
func (c *CloudWatch) Gather(acc telegraf.Accumulator) error { // fetchNamespaceMetrics retrieves available metrics for a given CloudWatch namespace.
if c.client == nil { func (c *CloudWatch) fetchNamespaceMetrics() ([]*cloudwatch.Metric, error) {
c.initializeCloudWatch() metrics := []*cloudwatch.Metric{}
var token *string
params := &cloudwatch.ListMetricsInput{
Namespace: aws.String(c.Namespace),
Dimensions: []*cloudwatch.DimensionFilter{},
NextToken: token,
MetricName: nil,
} }
metrics, err := SelectMetrics(c) for {
resp, err := c.client.ListMetrics(params)
if err != nil { if err != nil {
return err return nil, err
} }
now := time.Now() metrics = append(metrics, resp.Metrics...)
if resp.NextToken == nil {
err = c.updateWindow(now) break
if err != nil {
return err
} }
// limit concurrency or we can easily exhaust user connection limit params.NextToken = resp.NextToken
// see cloudwatch API request limits:
// http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/cloudwatch_limits.html
lmtr := limiter.NewRateLimiter(c.RateLimit, time.Second)
defer lmtr.Stop()
var wg sync.WaitGroup
wg.Add(len(metrics))
for _, m := range metrics {
<-lmtr.C
go func(inm *cloudwatch.Metric) {
defer wg.Done()
acc.AddError(c.gatherMetric(acc, inm))
}(m)
} }
wg.Wait()
return nil return metrics, nil
} }
func (c *CloudWatch) updateWindow(relativeTo time.Time) error { func (c *CloudWatch) updateWindow(relativeTo time.Time) error {
@ -239,168 +386,197 @@ func (c *CloudWatch) updateWindow(relativeTo time.Time) error {
return nil return nil
} }
// getDataQueries gets all of the possible queries so we can maximize the
// request payload. Results are cached until the metric cache expires.
//
// For every filtered metric it emits one MetricDataQuery per statistic that
// passes the metric's statistic filter, and records the metric's dimensions
// under the query id so aggregateMetrics can tag the resulting points.
func (c *CloudWatch) getDataQueries(filteredMetrics []filteredMetric) ([]*cloudwatch.MetricDataQuery, error) {
	if c.metricCache != nil && c.metricCache.queries != nil && c.metricCache.isValid() {
		return c.metricCache.queries, nil
	}

	// Ordered table of supported statistics; a slice (not a map) keeps the
	// generated query order deterministic.
	statistics := []struct {
		name string // filter key and query-id prefix
		stat string // cloudwatch statistic identifier
	}{
		{"average", cloudwatch.StatisticAverage},
		{"maximum", cloudwatch.StatisticMaximum},
		{"minimum", cloudwatch.StatisticMinimum},
		{"sum", cloudwatch.StatisticSum},
		{"sample_count", cloudwatch.StatisticSampleCount},
	}

	c.queryDimensions = map[string]*map[string]string{}

	dataQueries := []*cloudwatch.MetricDataQuery{}
	for i, filtered := range filteredMetrics {
		for j, m := range filtered.metrics {
			id := strconv.Itoa(j) + "_" + strconv.Itoa(i)
			dimension := ctod(m.Dimensions)

			for _, s := range statistics {
				if !filtered.statFilter.Match(s.name) {
					continue
				}
				// Query ids look like "average_3_0"; aggregateMetrics uses
				// them to look the dimensions back up.
				queryID := s.name + "_" + id
				c.queryDimensions[queryID] = dimension
				dataQueries = append(dataQueries, &cloudwatch.MetricDataQuery{
					Id:    aws.String(queryID),
					Label: aws.String(snakeCase(*m.MetricName + "_" + s.name)),
					MetricStat: &cloudwatch.MetricStat{
						Metric: m,
						Period: aws.Int64(int64(c.Period.Duration.Seconds())),
						Stat:   aws.String(s.stat),
					},
				})
			}
		}
	}

	if len(dataQueries) == 0 {
		return nil, errors.New("no metrics found to collect")
	}

	if c.metricCache == nil {
		c.metricCache = &metricCache{
			queries: dataQueries,
			built:   time.Now(),
			ttl:     c.CacheTTL.Duration,
		}
	} else {
		c.metricCache.queries = dataQueries
	}

	return dataQueries, nil
}
// gatherMetrics gets metric data from Cloudwatch, following pagination
// tokens until all pages have been consumed.
func (c *CloudWatch) gatherMetrics(
	params *cloudwatch.GetMetricDataInput,
) ([]*cloudwatch.MetricDataResult, error) {
	results := []*cloudwatch.MetricDataResult{}

	for {
		resp, err := c.client.GetMetricData(params)
		if err != nil {
			return nil, fmt.Errorf("failed to get metric data: %v", err)
		}
		results = append(results, resp.MetricDataResults...)

		// A nil NextToken marks the final page.
		if resp.NextToken == nil {
			return results, nil
		}
		params.NextToken = resp.NextToken
	}
}
// aggregateMetrics regroups the raw datapoints into telegraf metrics (one
// series per tag-set and timestamp) and hands them to the accumulator.
func (c *CloudWatch) aggregateMetrics(
	acc telegraf.Accumulator,
	metricDataResults []*cloudwatch.MetricDataResult,
) error {
	grouper := metric.NewSeriesGrouper()
	namespace := sanitizeMeasurement(c.Namespace)

	for _, result := range metricDataResults {
		// Recover the dimension tags recorded when the query was built.
		tags := map[string]string{}
		if dimensions, ok := c.queryDimensions[*result.Id]; ok {
			tags = *dimensions
		}
		tags["region"] = c.Region

		for i := range result.Values {
			grouper.Add(namespace, tags, *result.Timestamps[i], *result.Label, *result.Values[i])
		}
	}

	// Loop variable renamed to avoid shadowing the imported metric package.
	for _, groupedMetric := range grouper.Metrics() {
		acc.AddMetric(groupedMetric)
	}

	return nil
}
func init() { func init() {
inputs.Add("cloudwatch", func() telegraf.Input { inputs.Add("cloudwatch", func() telegraf.Input {
ttl, _ := time.ParseDuration("1hr")
return &CloudWatch{ return &CloudWatch{
CacheTTL: internal.Duration{Duration: ttl}, CacheTTL: internal.Duration{Duration: time.Hour},
RateLimit: 200, RateLimit: 25,
} }
}) })
} }
/* func sanitizeMeasurement(namespace string) string {
* Initialize CloudWatch client
*/
func (c *CloudWatch) initializeCloudWatch() error {
credentialConfig := &internalaws.CredentialConfig{
Region: c.Region,
AccessKey: c.AccessKey,
SecretKey: c.SecretKey,
RoleARN: c.RoleARN,
Profile: c.Profile,
Filename: c.Filename,
Token: c.Token,
EndpointURL: c.EndpointURL,
}
configProvider := credentialConfig.Credentials()
c.client = cloudwatch.New(configProvider)
return nil
}
/*
* Fetch available metrics for given CloudWatch Namespace
*/
func (c *CloudWatch) fetchNamespaceMetrics() ([]*cloudwatch.Metric, error) {
if c.metricCache != nil && c.metricCache.IsValid() {
return c.metricCache.Metrics, nil
}
metrics := []*cloudwatch.Metric{}
var token *string
for more := true; more; {
params := &cloudwatch.ListMetricsInput{
Namespace: aws.String(c.Namespace),
Dimensions: []*cloudwatch.DimensionFilter{},
NextToken: token,
MetricName: nil,
}
resp, err := c.client.ListMetrics(params)
if err != nil {
return nil, err
}
metrics = append(metrics, resp.Metrics...)
token = resp.NextToken
more = token != nil
}
c.metricCache = &MetricCache{
Metrics: metrics,
Fetched: time.Now(),
TTL: c.CacheTTL.Duration,
}
return metrics, nil
}
/*
* Gather given Metric and emit any error
*/
func (c *CloudWatch) gatherMetric(
acc telegraf.Accumulator,
metric *cloudwatch.Metric,
) error {
params := c.getStatisticsInput(metric)
resp, err := c.client.GetMetricStatistics(params)
if err != nil {
return err
}
for _, point := range resp.Datapoints {
tags := map[string]string{
"region": c.Region,
"unit": snakeCase(*point.Unit),
}
for _, d := range metric.Dimensions {
tags[snakeCase(*d.Name)] = *d.Value
}
// record field for each statistic
fields := map[string]interface{}{}
if point.Average != nil {
fields[formatField(*metric.MetricName, cloudwatch.StatisticAverage)] = *point.Average
}
if point.Maximum != nil {
fields[formatField(*metric.MetricName, cloudwatch.StatisticMaximum)] = *point.Maximum
}
if point.Minimum != nil {
fields[formatField(*metric.MetricName, cloudwatch.StatisticMinimum)] = *point.Minimum
}
if point.SampleCount != nil {
fields[formatField(*metric.MetricName, cloudwatch.StatisticSampleCount)] = *point.SampleCount
}
if point.Sum != nil {
fields[formatField(*metric.MetricName, cloudwatch.StatisticSum)] = *point.Sum
}
acc.AddFields(formatMeasurement(c.Namespace), fields, tags, *point.Timestamp)
}
return nil
}
/*
* Formatting helpers
*/
func formatField(metricName string, statistic string) string {
return fmt.Sprintf("%s_%s", snakeCase(metricName), snakeCase(statistic))
}
func formatMeasurement(namespace string) string {
namespace = strings.Replace(namespace, "/", "_", -1) namespace = strings.Replace(namespace, "/", "_", -1)
namespace = snakeCase(namespace) namespace = snakeCase(namespace)
return fmt.Sprintf("cloudwatch_%s", namespace) return "cloudwatch_" + namespace
} }
func snakeCase(s string) string { func snakeCase(s string) string {
s = internal.SnakeCase(s) s = internal.SnakeCase(s)
s = strings.Replace(s, " ", "_", -1)
s = strings.Replace(s, "__", "_", -1) s = strings.Replace(s, "__", "_", -1)
return s return s
} }
/* type dimension struct {
* Map Metric to *cloudwatch.GetMetricStatisticsInput for given timeframe name string
*/ value string
func (c *CloudWatch) getStatisticsInput(metric *cloudwatch.Metric) *cloudwatch.GetMetricStatisticsInput {
input := &cloudwatch.GetMetricStatisticsInput{
StartTime: aws.Time(c.windowStart),
EndTime: aws.Time(c.windowEnd),
MetricName: metric.MetricName,
Namespace: metric.Namespace,
Period: aws.Int64(int64(c.Period.Duration.Seconds())),
Dimensions: metric.Dimensions,
Statistics: []*string{
aws.String(cloudwatch.StatisticAverage),
aws.String(cloudwatch.StatisticMaximum),
aws.String(cloudwatch.StatisticMinimum),
aws.String(cloudwatch.StatisticSum),
aws.String(cloudwatch.StatisticSampleCount)},
}
return input
} }
/* // ctod converts cloudwatch dimensions to regular dimensions.
* Check Metric Cache validity func ctod(cDimensions []*cloudwatch.Dimension) *map[string]string {
*/ dimensions := map[string]string{}
func (c *MetricCache) IsValid() bool { for i := range cDimensions {
return c.Metrics != nil && time.Since(c.Fetched) < c.TTL dimensions[snakeCase(*cDimensions[i].Name)] = *cDimensions[i].Value
}
return &dimensions
}
// getDataInputs wraps a set of data queries with the plugin's current
// collection window to form a complete GetMetricData request.
func (c *CloudWatch) getDataInputs(dataQueries []*cloudwatch.MetricDataQuery) *cloudwatch.GetMetricDataInput {
	return &cloudwatch.GetMetricDataInput{
		StartTime: aws.Time(c.windowStart),
		EndTime: aws.Time(c.windowEnd),
		MetricDataQueries: dataQueries,
	}
}
// isValid checks the validity of the metric cache: it reports whether the
// cached metrics are present and have not outlived the configured TTL.
func (f *metricCache) isValid() bool {
	return f.metrics != nil && time.Since(f.built) < f.ttl
}
func hasWilcard(dimensions []*Dimension) bool { func hasWilcard(dimensions []*Dimension) bool {

View File

@ -6,15 +6,20 @@ import (
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/cloudwatch" "github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
) )
type mockGatherCloudWatchClient struct{} type mockGatherCloudWatchClient struct{}
func (m *mockGatherCloudWatchClient) ListMetrics(params *cloudwatch.ListMetricsInput) (*cloudwatch.ListMetricsOutput, error) { func (m *mockGatherCloudWatchClient) ListMetrics(params *cloudwatch.ListMetricsInput) (*cloudwatch.ListMetricsOutput, error) {
metric := &cloudwatch.Metric{ return &cloudwatch.ListMetricsOutput{
Metrics: []*cloudwatch.Metric{
{
Namespace: params.Namespace, Namespace: params.Namespace,
MetricName: aws.String("Latency"), MetricName: aws.String("Latency"),
Dimensions: []*cloudwatch.Dimension{ Dimensions: []*cloudwatch.Dimension{
@ -23,29 +28,76 @@ func (m *mockGatherCloudWatchClient) ListMetrics(params *cloudwatch.ListMetricsI
Value: aws.String("p-example"), Value: aws.String("p-example"),
}, },
}, },
} },
},
result := &cloudwatch.ListMetricsOutput{ }, nil
Metrics: []*cloudwatch.Metric{metric},
}
return result, nil
} }
func (m *mockGatherCloudWatchClient) GetMetricStatistics(params *cloudwatch.GetMetricStatisticsInput) (*cloudwatch.GetMetricStatisticsOutput, error) { func (m *mockGatherCloudWatchClient) GetMetricData(params *cloudwatch.GetMetricDataInput) (*cloudwatch.GetMetricDataOutput, error) {
dataPoint := &cloudwatch.Datapoint{ return &cloudwatch.GetMetricDataOutput{
Timestamp: params.EndTime, MetricDataResults: []*cloudwatch.MetricDataResult{
Minimum: aws.Float64(0.1), {
Maximum: aws.Float64(0.3), Id: aws.String("minimum_0_0"),
Average: aws.Float64(0.2), Label: aws.String("latency_minimum"),
Sum: aws.Float64(123), StatusCode: aws.String("completed"),
SampleCount: aws.Float64(100), Timestamps: []*time.Time{
Unit: aws.String("Seconds"), params.EndTime,
} },
result := &cloudwatch.GetMetricStatisticsOutput{ Values: []*float64{
Label: aws.String("Latency"), aws.Float64(0.1),
Datapoints: []*cloudwatch.Datapoint{dataPoint}, },
} },
return result, nil {
Id: aws.String("maximum_0_0"),
Label: aws.String("latency_maximum"),
StatusCode: aws.String("completed"),
Timestamps: []*time.Time{
params.EndTime,
},
Values: []*float64{
aws.Float64(0.3),
},
},
{
Id: aws.String("average_0_0"),
Label: aws.String("latency_average"),
StatusCode: aws.String("completed"),
Timestamps: []*time.Time{
params.EndTime,
},
Values: []*float64{
aws.Float64(0.2),
},
},
{
Id: aws.String("sum_0_0"),
Label: aws.String("latency_sum"),
StatusCode: aws.String("completed"),
Timestamps: []*time.Time{
params.EndTime,
},
Values: []*float64{
aws.Float64(123),
},
},
{
Id: aws.String("sample_count_0_0"),
Label: aws.String("latency_sample_count"),
StatusCode: aws.String("completed"),
Timestamps: []*time.Time{
params.EndTime,
},
Values: []*float64{
aws.Float64(100),
},
},
},
}, nil
}
func TestSnakeCase(t *testing.T) {
assert.Equal(t, "cluster_name", snakeCase("Cluster Name"))
assert.Equal(t, "broker_id", snakeCase("Broker ID"))
} }
func TestGather(t *testing.T) { func TestGather(t *testing.T) {
@ -64,7 +116,7 @@ func TestGather(t *testing.T) {
var acc testutil.Accumulator var acc testutil.Accumulator
c.client = &mockGatherCloudWatchClient{} c.client = &mockGatherCloudWatchClient{}
acc.GatherError(c.Gather) assert.NoError(t, acc.GatherError(c.Gather))
fields := map[string]interface{}{} fields := map[string]interface{}{}
fields["latency_minimum"] = 0.1 fields["latency_minimum"] = 0.1
@ -74,13 +126,11 @@ func TestGather(t *testing.T) {
fields["latency_sample_count"] = 100.0 fields["latency_sample_count"] = 100.0
tags := map[string]string{} tags := map[string]string{}
tags["unit"] = "seconds"
tags["region"] = "us-east-1" tags["region"] = "us-east-1"
tags["load_balancer_name"] = "p-example" tags["load_balancer_name"] = "p-example"
assert.True(t, acc.HasMeasurement("cloudwatch_aws_elb")) assert.True(t, acc.HasMeasurement("cloudwatch_aws_elb"))
acc.AssertContainsTaggedFields(t, "cloudwatch_aws_elb", fields, tags) acc.AssertContainsTaggedFields(t, "cloudwatch_aws_elb", fields, tags)
} }
type mockSelectMetricsCloudWatchClient struct{} type mockSelectMetricsCloudWatchClient struct{}
@ -132,7 +182,7 @@ func (m *mockSelectMetricsCloudWatchClient) ListMetrics(params *cloudwatch.ListM
return result, nil return result, nil
} }
func (m *mockSelectMetricsCloudWatchClient) GetMetricStatistics(params *cloudwatch.GetMetricStatisticsInput) (*cloudwatch.GetMetricStatisticsOutput, error) { func (m *mockSelectMetricsCloudWatchClient) GetMetricData(params *cloudwatch.GetMetricDataInput) (*cloudwatch.GetMetricDataOutput, error) {
return nil, nil return nil, nil
} }
@ -164,10 +214,10 @@ func TestSelectMetrics(t *testing.T) {
}, },
} }
c.client = &mockSelectMetricsCloudWatchClient{} c.client = &mockSelectMetricsCloudWatchClient{}
metrics, err := SelectMetrics(c) filtered, err := getFilteredMetrics(c)
// We've asked for 2 (out of 4) metrics, over all 3 load balancers in all 2 // We've asked for 2 (out of 4) metrics, over all 3 load balancers in all 2
// AZs. We should get 12 metrics. // AZs. We should get 12 metrics.
assert.Equal(t, 12, len(metrics)) assert.Equal(t, 12, len(filtered[0].metrics))
assert.Nil(t, err) assert.Nil(t, err)
} }
@ -199,25 +249,66 @@ func TestGenerateStatisticsInputParams(t *testing.T) {
c.updateWindow(now) c.updateWindow(now)
params := c.getStatisticsInput(m) statFilter, _ := filter.NewIncludeExcludeFilter(nil, nil)
queries, _ := c.getDataQueries([]filteredMetric{{metrics: []*cloudwatch.Metric{m}, statFilter: statFilter}})
params := c.getDataInputs(queries)
assert.EqualValues(t, *params.EndTime, now.Add(-c.Delay.Duration)) assert.EqualValues(t, *params.EndTime, now.Add(-c.Delay.Duration))
assert.EqualValues(t, *params.StartTime, now.Add(-c.Period.Duration).Add(-c.Delay.Duration)) assert.EqualValues(t, *params.StartTime, now.Add(-c.Period.Duration).Add(-c.Delay.Duration))
assert.Len(t, params.Dimensions, 1) require.Len(t, params.MetricDataQueries, 5)
assert.Len(t, params.Statistics, 5) assert.Len(t, params.MetricDataQueries[0].MetricStat.Metric.Dimensions, 1)
assert.EqualValues(t, *params.Period, 60) assert.EqualValues(t, *params.MetricDataQueries[0].MetricStat.Period, 60)
}
func TestGenerateStatisticsInputParamsFiltered(t *testing.T) {
d := &cloudwatch.Dimension{
Name: aws.String("LoadBalancerName"),
Value: aws.String("p-example"),
}
m := &cloudwatch.Metric{
MetricName: aws.String("Latency"),
Dimensions: []*cloudwatch.Dimension{d},
}
duration, _ := time.ParseDuration("1m")
internalDuration := internal.Duration{
Duration: duration,
}
c := &CloudWatch{
Namespace: "AWS/ELB",
Delay: internalDuration,
Period: internalDuration,
}
c.initializeCloudWatch()
now := time.Now()
c.updateWindow(now)
statFilter, _ := filter.NewIncludeExcludeFilter([]string{"average", "sample_count"}, nil)
queries, _ := c.getDataQueries([]filteredMetric{{metrics: []*cloudwatch.Metric{m}, statFilter: statFilter}})
params := c.getDataInputs(queries)
assert.EqualValues(t, *params.EndTime, now.Add(-c.Delay.Duration))
assert.EqualValues(t, *params.StartTime, now.Add(-c.Period.Duration).Add(-c.Delay.Duration))
require.Len(t, params.MetricDataQueries, 2)
assert.Len(t, params.MetricDataQueries[0].MetricStat.Metric.Dimensions, 1)
assert.EqualValues(t, *params.MetricDataQueries[0].MetricStat.Period, 60)
} }
func TestMetricsCacheTimeout(t *testing.T) { func TestMetricsCacheTimeout(t *testing.T) {
cache := &MetricCache{ cache := &metricCache{
Metrics: []*cloudwatch.Metric{}, metrics: []filteredMetric{},
Fetched: time.Now(), built: time.Now(),
TTL: time.Minute, ttl: time.Minute,
} }
assert.True(t, cache.IsValid()) assert.True(t, cache.isValid())
cache.Fetched = time.Now().Add(-time.Minute) cache.built = time.Now().Add(-time.Minute)
assert.False(t, cache.IsValid()) assert.False(t, cache.isValid())
} }
func TestUpdateWindow(t *testing.T) { func TestUpdateWindow(t *testing.T) {