From 03a119e322e260568ec86dcd420f9c285756e4bd Mon Sep 17 00:00:00 2001 From: Jon McKenzie Date: Tue, 11 Sep 2018 17:59:39 -0400 Subject: [PATCH] Align metrics window to interval in cloudwatch input (#4667) --- plugins/inputs/cloudwatch/cloudwatch.go | 36 ++++++++++++++----- plugins/inputs/cloudwatch/cloudwatch_test.go | 37 +++++++++++++++++++- 2 files changed, 64 insertions(+), 9 deletions(-) diff --git a/plugins/inputs/cloudwatch/cloudwatch.go b/plugins/inputs/cloudwatch/cloudwatch.go index 9ba15b6ac..626511e2f 100644 --- a/plugins/inputs/cloudwatch/cloudwatch.go +++ b/plugins/inputs/cloudwatch/cloudwatch.go @@ -36,6 +36,8 @@ type ( RateLimit int `toml:"ratelimit"` client cloudwatchClient metricCache *MetricCache + windowStart time.Time + windowEnd time.Time } Metric struct { @@ -197,6 +199,11 @@ func (c *CloudWatch) Gather(acc telegraf.Accumulator) error { now := time.Now() + err = c.updateWindow(now) + if err != nil { + return err + } + // limit concurrency or we can easily exhaust user connection limit // see cloudwatch API request limits: // http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/cloudwatch_limits.html @@ -208,7 +215,7 @@ func (c *CloudWatch) Gather(acc telegraf.Accumulator) error { <-lmtr.C go func(inm *cloudwatch.Metric) { defer wg.Done() - acc.AddError(c.gatherMetric(acc, inm, now)) + acc.AddError(c.gatherMetric(acc, inm)) }(m) } wg.Wait() @@ -216,6 +223,22 @@ func (c *CloudWatch) Gather(acc telegraf.Accumulator) error { return nil } +func (c *CloudWatch) updateWindow(relativeTo time.Time) error { + windowEnd := relativeTo.Add(-c.Delay.Duration) + + if c.windowEnd.IsZero() { + // this is the first run, no window info, so just get a single period + c.windowStart = windowEnd.Add(-c.Period.Duration) + } else { + // subsequent window, start where last window left off + c.windowStart = c.windowEnd + } + + c.windowEnd = windowEnd + + return nil +} + func init() { inputs.Add("cloudwatch", func() telegraf.Input { ttl, _ := time.ParseDuration("1hr") @@ -291,9 +314,8 @@ func (c *CloudWatch) fetchNamespaceMetrics() ([]*cloudwatch.Metric, error) { func (c *CloudWatch) gatherMetric( acc telegraf.Accumulator, metric *cloudwatch.Metric, - now time.Time, ) error { - params := c.getStatisticsInput(metric, now) + params := c.getStatisticsInput(metric) resp, err := c.client.GetMetricStatistics(params) if err != nil { return err @@ -356,12 +378,10 @@ func snakeCase(s string) string { /* * Map Metric to *cloudwatch.GetMetricStatisticsInput for given timeframe */ -func (c *CloudWatch) getStatisticsInput(metric *cloudwatch.Metric, now time.Time) *cloudwatch.GetMetricStatisticsInput { - end := now.Add(-c.Delay.Duration) - +func (c *CloudWatch) getStatisticsInput(metric *cloudwatch.Metric) *cloudwatch.GetMetricStatisticsInput { input := &cloudwatch.GetMetricStatisticsInput{ - StartTime: aws.Time(end.Add(-c.Period.Duration)), - EndTime: aws.Time(end), + StartTime: aws.Time(c.windowStart), + EndTime: aws.Time(c.windowEnd), MetricName: metric.MetricName, Namespace: metric.Namespace, Period: aws.Int64(int64(c.Period.Duration.Seconds())), diff --git a/plugins/inputs/cloudwatch/cloudwatch_test.go b/plugins/inputs/cloudwatch/cloudwatch_test.go index c52b3a353..57c92b3f6 100644 --- a/plugins/inputs/cloudwatch/cloudwatch_test.go +++ b/plugins/inputs/cloudwatch/cloudwatch_test.go @@ -197,7 +197,9 @@ func TestGenerateStatisticsInputParams(t *testing.T) { now := time.Now() - params := c.getStatisticsInput(m, now) + c.updateWindow(now) + + params := c.getStatisticsInput(m) assert.EqualValues(t, *params.EndTime, now.Add(-c.Delay.Duration)) assert.EqualValues(t, *params.StartTime, now.Add(-c.Period.Duration).Add(-c.Delay.Duration)) @@ -217,3 +219,36 @@ func TestMetricsCacheTimeout(t *testing.T) { cache.Fetched = time.Now().Add(-time.Minute) assert.False(t, cache.IsValid()) } + +func TestUpdateWindow(t *testing.T) { + duration, _ := time.ParseDuration("1m") + internalDuration := internal.Duration{ + Duration: duration, + } + + c := &CloudWatch{ + Namespace: "AWS/ELB", + Delay: internalDuration, + Period: internalDuration, + } + + now := time.Now() + + assert.True(t, c.windowEnd.IsZero()) + assert.True(t, c.windowStart.IsZero()) + + c.updateWindow(now) + + newStartTime := c.windowEnd + + // initial window just has a single period + assert.EqualValues(t, c.windowEnd, now.Add(-c.Delay.Duration)) + assert.EqualValues(t, c.windowStart, now.Add(-c.Delay.Duration).Add(-c.Period.Duration)) + + now = time.Now() + c.updateWindow(now) + + // subsequent window uses previous end time as start time + assert.EqualValues(t, c.windowEnd, now.Add(-c.Delay.Duration)) + assert.EqualValues(t, c.windowStart, newStartTime) +}