Align metrics window to interval in cloudwatch input (#4667)

This commit is contained in:
Jon McKenzie 2018-09-11 17:59:39 -04:00 committed by Daniel Nelson
parent 51bb937fdd
commit 03a119e322
2 changed files with 64 additions and 9 deletions

View File

@ -36,6 +36,8 @@ type (
RateLimit int `toml:"ratelimit"` RateLimit int `toml:"ratelimit"`
client cloudwatchClient client cloudwatchClient
metricCache *MetricCache metricCache *MetricCache
windowStart time.Time
windowEnd time.Time
} }
Metric struct { Metric struct {
@ -197,6 +199,11 @@ func (c *CloudWatch) Gather(acc telegraf.Accumulator) error {
now := time.Now() now := time.Now()
err = c.updateWindow(now)
if err != nil {
return err
}
// limit concurrency or we can easily exhaust user connection limit // limit concurrency or we can easily exhaust user connection limit
// see cloudwatch API request limits: // see cloudwatch API request limits:
// http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/cloudwatch_limits.html // http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/cloudwatch_limits.html
@ -208,7 +215,7 @@ func (c *CloudWatch) Gather(acc telegraf.Accumulator) error {
<-lmtr.C <-lmtr.C
go func(inm *cloudwatch.Metric) { go func(inm *cloudwatch.Metric) {
defer wg.Done() defer wg.Done()
acc.AddError(c.gatherMetric(acc, inm, now)) acc.AddError(c.gatherMetric(acc, inm))
}(m) }(m)
} }
wg.Wait() wg.Wait()
@ -216,6 +223,22 @@ func (c *CloudWatch) Gather(acc telegraf.Accumulator) error {
return nil return nil
} }
func (c *CloudWatch) updateWindow(relativeTo time.Time) error {
windowEnd := relativeTo.Add(-c.Delay.Duration)
if c.windowEnd.IsZero() {
// this is the first run, no window info, so just get a single period
c.windowStart = windowEnd.Add(-c.Period.Duration)
} else {
// subsequent window, start where last window left off
c.windowStart = c.windowEnd
}
c.windowEnd = windowEnd
return nil
}
func init() { func init() {
inputs.Add("cloudwatch", func() telegraf.Input { inputs.Add("cloudwatch", func() telegraf.Input {
ttl, _ := time.ParseDuration("1hr") ttl, _ := time.ParseDuration("1hr")
@ -291,9 +314,8 @@ func (c *CloudWatch) fetchNamespaceMetrics() ([]*cloudwatch.Metric, error) {
func (c *CloudWatch) gatherMetric( func (c *CloudWatch) gatherMetric(
acc telegraf.Accumulator, acc telegraf.Accumulator,
metric *cloudwatch.Metric, metric *cloudwatch.Metric,
now time.Time,
) error { ) error {
params := c.getStatisticsInput(metric, now) params := c.getStatisticsInput(metric)
resp, err := c.client.GetMetricStatistics(params) resp, err := c.client.GetMetricStatistics(params)
if err != nil { if err != nil {
return err return err
@ -356,12 +378,10 @@ func snakeCase(s string) string {
/* /*
* Map Metric to *cloudwatch.GetMetricStatisticsInput for given timeframe * Map Metric to *cloudwatch.GetMetricStatisticsInput for given timeframe
*/ */
func (c *CloudWatch) getStatisticsInput(metric *cloudwatch.Metric, now time.Time) *cloudwatch.GetMetricStatisticsInput { func (c *CloudWatch) getStatisticsInput(metric *cloudwatch.Metric) *cloudwatch.GetMetricStatisticsInput {
end := now.Add(-c.Delay.Duration)
input := &cloudwatch.GetMetricStatisticsInput{ input := &cloudwatch.GetMetricStatisticsInput{
StartTime: aws.Time(end.Add(-c.Period.Duration)), StartTime: aws.Time(c.windowStart),
EndTime: aws.Time(end), EndTime: aws.Time(c.windowEnd),
MetricName: metric.MetricName, MetricName: metric.MetricName,
Namespace: metric.Namespace, Namespace: metric.Namespace,
Period: aws.Int64(int64(c.Period.Duration.Seconds())), Period: aws.Int64(int64(c.Period.Duration.Seconds())),

View File

@ -197,7 +197,9 @@ func TestGenerateStatisticsInputParams(t *testing.T) {
now := time.Now() now := time.Now()
params := c.getStatisticsInput(m, now) c.updateWindow(now)
params := c.getStatisticsInput(m)
assert.EqualValues(t, *params.EndTime, now.Add(-c.Delay.Duration)) assert.EqualValues(t, *params.EndTime, now.Add(-c.Delay.Duration))
assert.EqualValues(t, *params.StartTime, now.Add(-c.Period.Duration).Add(-c.Delay.Duration)) assert.EqualValues(t, *params.StartTime, now.Add(-c.Period.Duration).Add(-c.Delay.Duration))
@ -217,3 +219,36 @@ func TestMetricsCacheTimeout(t *testing.T) {
cache.Fetched = time.Now().Add(-time.Minute) cache.Fetched = time.Now().Add(-time.Minute)
assert.False(t, cache.IsValid()) assert.False(t, cache.IsValid())
} }
func TestUpdateWindow(t *testing.T) {
duration, _ := time.ParseDuration("1m")
internalDuration := internal.Duration{
Duration: duration,
}
c := &CloudWatch{
Namespace: "AWS/ELB",
Delay: internalDuration,
Period: internalDuration,
}
now := time.Now()
assert.True(t, c.windowEnd.IsZero())
assert.True(t, c.windowStart.IsZero())
c.updateWindow(now)
newStartTime := c.windowEnd
// initial window just has a single period
assert.EqualValues(t, c.windowEnd, now.Add(-c.Delay.Duration))
assert.EqualValues(t, c.windowStart, now.Add(-c.Delay.Duration).Add(-c.Period.Duration))
now = time.Now()
c.updateWindow(now)
// subsequent window uses previous end time as start time
assert.EqualValues(t, c.windowEnd, now.Add(-c.Delay.Duration))
assert.EqualValues(t, c.windowStart, newStartTime)
}