Limit GetMetricStatistics to 10 per second

closes #1197
This commit is contained in:
Cameron Sparr 2016-05-24 14:50:01 +01:00
parent 4f27315720
commit 9ff536d94d
4 changed files with 127 additions and 7 deletions

View File

@ -29,6 +29,7 @@ time before a new metric is included by the plugin.
- [#1275](https://github.com/influxdata/telegraf/pull/1275): Allow wildcard filtering of varnish stats. - [#1275](https://github.com/influxdata/telegraf/pull/1275): Allow wildcard filtering of varnish stats.
- [#1142](https://github.com/influxdata/telegraf/pull/1142): Support for glob patterns in exec plugin commands configuration. - [#1142](https://github.com/influxdata/telegraf/pull/1142): Support for glob patterns in exec plugin commands configuration.
- [#1278](https://github.com/influxdata/telegraf/pull/1278): RabbitMQ input: made url parameter optional by using DefaultURL (http://localhost:15672) if not specified - [#1278](https://github.com/influxdata/telegraf/pull/1278): RabbitMQ input: made url parameter optional by using DefaultURL (http://localhost:15672) if not specified
- [#1197](https://github.com/influxdata/telegraf/pull/1197): Limit AWS GetMetricStatistics requests to 10 per second.
### Bugfixes ### Bugfixes

View File

@ -0,0 +1,59 @@
package limiter
import (
"sync"
"time"
)
// NewRateLimiter returns a rate limiter that will will emit from the C
// channel only 'n' times every 'rate' seconds.
func NewRateLimiter(n int, rate time.Duration) *rateLimiter {
r := &rateLimiter{
C: make(chan bool),
rate: rate,
n: n,
shutdown: make(chan bool),
}
r.wg.Add(1)
go r.limiter()
return r
}
type rateLimiter struct {
C chan bool
rate time.Duration
n int
shutdown chan bool
wg sync.WaitGroup
}
func (r *rateLimiter) Stop() {
close(r.shutdown)
r.wg.Wait()
close(r.C)
}
func (r *rateLimiter) limiter() {
defer r.wg.Done()
ticker := time.NewTicker(r.rate)
defer ticker.Stop()
counter := 0
for {
select {
case <-r.shutdown:
return
case <-ticker.C:
counter = 0
default:
if counter < r.n {
select {
case r.C <- true:
counter++
case <-r.shutdown:
return
}
}
}
}
}

View File

@ -0,0 +1,54 @@
package limiter
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestRateLimiter(t *testing.T) {
r := NewRateLimiter(5, time.Second)
ticker := time.NewTicker(time.Millisecond * 75)
// test that we can only get 5 receives from the rate limiter
counter := 0
outer:
for {
select {
case <-r.C:
counter++
case <-ticker.C:
break outer
}
}
assert.Equal(t, 5, counter)
r.Stop()
// verify that the Stop function closes the channel.
_, ok := <-r.C
assert.False(t, ok)
}
func TestRateLimiterMultipleIterations(t *testing.T) {
r := NewRateLimiter(5, time.Millisecond*50)
ticker := time.NewTicker(time.Millisecond * 250)
// test that we can get 15 receives from the rate limiter
counter := 0
outer:
for {
select {
case <-ticker.C:
break outer
case <-r.C:
counter++
}
}
assert.True(t, counter > 10)
r.Stop()
// verify that the Stop function closes the channel.
_, ok := <-r.C
assert.False(t, ok)
}

View File

@ -12,6 +12,7 @@ import (
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
internalaws "github.com/influxdata/telegraf/internal/config/aws" internalaws "github.com/influxdata/telegraf/internal/config/aws"
"github.com/influxdata/telegraf/internal/limiter"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
) )
@ -170,11 +171,13 @@ func (c *CloudWatch) Gather(acc telegraf.Accumulator) error {
now := time.Now() now := time.Now()
// limit concurrency or we can easily exhaust user connection limit // limit concurrency or we can easily exhaust user connection limit
semaphore := make(chan byte, 64) // see cloudwatch API request limits:
// http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/cloudwatch_limits.html
lmtr := limiter.NewRateLimiter(10, time.Second)
defer lmtr.Stop()
for _, m := range metrics { for _, m := range metrics {
semaphore <- 0x1 <-lmtr.C
go c.gatherMetric(acc, m, now, semaphore, errChan) go c.gatherMetric(acc, m, now, errChan)
} }
for i := 1; i <= metricCount; i++ { for i := 1; i <= metricCount; i++ {
@ -257,12 +260,16 @@ func (c *CloudWatch) fetchNamespaceMetrics() (metrics []*cloudwatch.Metric, err
/* /*
* Gather given Metric and emit any error * Gather given Metric and emit any error
*/ */
func (c *CloudWatch) gatherMetric(acc telegraf.Accumulator, metric *cloudwatch.Metric, now time.Time, semaphore chan byte, errChan chan error) { func (c *CloudWatch) gatherMetric(
acc telegraf.Accumulator,
metric *cloudwatch.Metric,
now time.Time,
errChan chan error,
) {
params := c.getStatisticsInput(metric, now) params := c.getStatisticsInput(metric, now)
resp, err := c.client.GetMetricStatistics(params) resp, err := c.client.GetMetricStatistics(params)
if err != nil { if err != nil {
errChan <- err errChan <- err
<-semaphore
return return
} }
@ -299,7 +306,6 @@ func (c *CloudWatch) gatherMetric(acc telegraf.Accumulator, metric *cloudwatch.M
} }
errChan <- nil errChan <- nil
<-semaphore
} }
/* /*