diff --git a/CHANGELOG.md b/CHANGELOG.md index da2b41a5f..f81375479 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,7 @@ time before a new metric is included by the plugin. - [#1275](https://github.com/influxdata/telegraf/pull/1275): Allow wildcard filtering of varnish stats. - [#1142](https://github.com/influxdata/telegraf/pull/1142): Support for glob patterns in exec plugin commands configuration. - [#1278](https://github.com/influxdata/telegraf/pull/1278): RabbitMQ input: made url parameter optional by using DefaultURL (http://localhost:15672) if not specified +- [#1197](https://github.com/influxdata/telegraf/pull/1197): Limit AWS GetMetricStatistics requests to 10 per second. ### Bugfixes diff --git a/internal/limiter/limiter.go b/internal/limiter/limiter.go new file mode 100644 index 000000000..c5689751d --- /dev/null +++ b/internal/limiter/limiter.go @@ -0,0 +1,59 @@ +package limiter + +import ( + "sync" + "time" +) + +// NewRateLimiter returns a rate limiter that will will emit from the C +// channel only 'n' times every 'rate' seconds. +func NewRateLimiter(n int, rate time.Duration) *rateLimiter { + r := &rateLimiter{ + C: make(chan bool), + rate: rate, + n: n, + shutdown: make(chan bool), + } + r.wg.Add(1) + go r.limiter() + return r +} + +type rateLimiter struct { + C chan bool + rate time.Duration + n int + + shutdown chan bool + wg sync.WaitGroup +} + +func (r *rateLimiter) Stop() { + close(r.shutdown) + r.wg.Wait() + close(r.C) +} + +func (r *rateLimiter) limiter() { + defer r.wg.Done() + ticker := time.NewTicker(r.rate) + defer ticker.Stop() + counter := 0 + for { + select { + case <-r.shutdown: + return + case <-ticker.C: + counter = 0 + default: + if counter < r.n { + select { + case r.C <- true: + counter++ + case <-r.shutdown: + return + } + } + } + } +} diff --git a/internal/limiter/limiter_test.go b/internal/limiter/limiter_test.go new file mode 100644 index 000000000..83c9d86f1 --- /dev/null +++ b/internal/limiter/limiter_test.go @@ -0,0 +1,54 @@ +package limiter + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestRateLimiter(t *testing.T) { + r := NewRateLimiter(5, time.Second) + ticker := time.NewTicker(time.Millisecond * 75) + + // test that we can only get 5 receives from the rate limiter + counter := 0 +outer: + for { + select { + case <-r.C: + counter++ + case <-ticker.C: + break outer + } + } + + assert.Equal(t, 5, counter) + r.Stop() + // verify that the Stop function closes the channel. + _, ok := <-r.C + assert.False(t, ok) +} + +func TestRateLimiterMultipleIterations(t *testing.T) { + r := NewRateLimiter(5, time.Millisecond*50) + ticker := time.NewTicker(time.Millisecond * 250) + + // test that we can get 15 receives from the rate limiter + counter := 0 +outer: + for { + select { + case <-ticker.C: + break outer + case <-r.C: + counter++ + } + } + + assert.True(t, counter > 10) + r.Stop() + // verify that the Stop function closes the channel. + _, ok := <-r.C + assert.False(t, ok) +} diff --git a/plugins/inputs/cloudwatch/cloudwatch.go b/plugins/inputs/cloudwatch/cloudwatch.go index e6671a3bf..1bd2d5c07 100644 --- a/plugins/inputs/cloudwatch/cloudwatch.go +++ b/plugins/inputs/cloudwatch/cloudwatch.go @@ -12,6 +12,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" internalaws "github.com/influxdata/telegraf/internal/config/aws" + "github.com/influxdata/telegraf/internal/limiter" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -170,11 +171,13 @@ func (c *CloudWatch) Gather(acc telegraf.Accumulator) error { now := time.Now() // limit concurrency or we can easily exhaust user connection limit - semaphore := make(chan byte, 64) - + // see cloudwatch API request limits: + // http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/cloudwatch_limits.html + lmtr := limiter.NewRateLimiter(10, time.Second) + defer lmtr.Stop() for _, m := range metrics { - semaphore <- 0x1 - go c.gatherMetric(acc, m, now, semaphore, errChan) + <-lmtr.C + go c.gatherMetric(acc, m, now, errChan) } for i := 1; i <= metricCount; i++ { @@ -257,12 +260,16 @@ func (c *CloudWatch) fetchNamespaceMetrics() (metrics []*cloudwatch.Metric, err /* * Gather given Metric and emit any error */ -func (c *CloudWatch) gatherMetric(acc telegraf.Accumulator, metric *cloudwatch.Metric, now time.Time, semaphore chan byte, errChan chan error) { +func (c *CloudWatch) gatherMetric( + acc telegraf.Accumulator, + metric *cloudwatch.Metric, + now time.Time, + errChan chan error, +) { params := c.getStatisticsInput(metric, now) resp, err := c.client.GetMetricStatistics(params) if err != nil { errChan <- err - <-semaphore return } @@ -299,7 +306,6 @@ func (c *CloudWatch) gatherMetric(acc telegraf.Accumulator, metric *cloudwatch.M } errChan <- nil - <-semaphore } /*