diff --git a/CHANGELOG.md b/CHANGELOG.md index 495187379..3e070a51a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,11 @@ collect data every 2 seconds, and flush every 60 seconds. - `precision` and `utc` are no longer valid agent config values. `precision` has moved to the `influxdb` output config, where it will continue to default to "s" - debug and test output will now print the raw line-protocol string +- Telegraf will now, by default, round the collection interval to the nearest +even interval. This means that `interval="10s"` will collect every :00, :10, etc. +To ease scale concerns, flushing will be "jittered" by a random amount so that +all Telegraf instances do not flush at the same time. Both of these options can +be controlled via the `round_interval` and `flush_jitter` config options. ### Features - [#205](https://github.com/influxdb/telegraf/issues/205): Include per-db redis keyspace info diff --git a/agent.go b/agent.go index 00b0c6ace..00e42d8cd 100644 --- a/agent.go +++ b/agent.go @@ -3,6 +3,7 @@ package telegraf import ( "fmt" "log" + "math/rand" "os" "sort" "sync" @@ -32,12 +33,19 @@ type Agent struct { // Interval at which to gather information Interval duration.Duration + // RoundInterval rounds collection interval to 'interval'. + // ie, if Interval=10s then always collect on :00, :10, :20, etc. + RoundInterval bool + // Interval at which to flush data FlushInterval duration.Duration // FlushRetries is the number of times to retry each data flush FlushRetries int + // FlushJitter tells + FlushJitter duration.Duration + // TODO(cam): Remove UTC and Precision parameters, they are no longer // valid for the agent config. Leaving them here for now for backwards- // compatability @@ -64,10 +72,10 @@ func NewAgent(config *Config) (*Agent, error) { agent := &Agent{ Tags: make(map[string]string), Interval: duration.Duration{10 * time.Second}, + RoundInterval: true, FlushInterval: duration.Duration{10 * time.Second}, FlushRetries: 2, - UTC: true, - Precision: "s", + FlushJitter: duration.Duration{5 * time.Second}, } // Apply the toml table to the agent config, overriding defaults @@ -294,30 +302,37 @@ func (a *Agent) Test() error { return nil } -// writeOutput writes a list of points to a single output, with retries +// writeOutput writes a list of points to a single output, with retries. +// Optionally takes a `done` channel to indicate that it is done writing. func (a *Agent) writeOutput( points []*client.Point, ro *runningOutput, shutdown chan struct{}, + wg *sync.WaitGroup, ) { + defer wg.Done() + if len(points) == 0 { + return + } retry := 0 retries := a.FlushRetries start := time.Now() for { err := ro.output.Write(points) + if err == nil { + // Write successful + elapsed := time.Since(start) + log.Printf("Flushed %d metrics to output %s in %s\n", + len(points), ro.name, elapsed) + return + } select { case <-shutdown: return default: - if err == nil { - // Write successful - elapsed := time.Since(start) - log.Printf("Flushed %d metrics to output %s in %s\n", - len(points), ro.name, elapsed) - return - } else if retry >= retries { + if retry >= retries { // No more retries msg := "FATAL: Write to output [%s] failed %d times, dropping" + " %d metrics\n" @@ -336,13 +351,18 @@ func (a *Agent) writeOutput( } // flush writes a list of points to all configured outputs -func (a *Agent) flush(points []*client.Point, shutdown chan struct{}) { - if len(points) == 0 { - return - } - +func (a *Agent) flush( + points []*client.Point, + shutdown chan struct{}, + wait bool, +) { + var wg sync.WaitGroup for _, o := range a.outputs { - go a.writeOutput(points, o, shutdown) + wg.Add(1) + go a.writeOutput(points, o, shutdown, &wg) + } + if wait { + wg.Wait() } } @@ -353,14 +373,23 @@ func (a *Agent) flusher(shutdown chan struct{}, pointChan chan *client.Point) er time.Sleep(time.Millisecond * 100) ticker := time.NewTicker(a.FlushInterval.Duration) points := make([]*client.Point, 0) + jitter := rand.Int63n(int64(a.FlushJitter.Duration)) for { select { case <-shutdown: log.Println("Hang on, flushing any cached points before shutdown") - a.flush(points, shutdown) + a.flush(points, shutdown, true) return nil case <-ticker.C: - a.flush(points, shutdown) + timer := time.NewTimer(time.Duration(jitter)) + select { + case <-timer.C: + a.flush(points, shutdown, false) + case <-shutdown: + log.Println("Hang on, flushing any cached points before shutdown") + a.flush(points, shutdown, true) + return nil + } points = make([]*client.Point, 0) case pt := <-pointChan: points = append(points, pt) @@ -375,6 +404,13 @@ func (a *Agent) Run(shutdown chan struct{}) error { // channel shared between all plugin threads for accumulating points pointChan := make(chan *client.Point, 1000) + // Round collection to nearest interval by sleeping + if a.RoundInterval { + i := int64(a.Interval.Duration) + time.Sleep(time.Duration(i - (time.Now().UnixNano() % i))) + } + ticker := time.NewTicker(a.Interval.Duration) + wg.Add(1) go func() { defer wg.Done() @@ -412,8 +448,6 @@ func (a *Agent) Run(shutdown chan struct{}) error { defer wg.Wait() - ticker := time.NewTicker(a.Interval.Duration) - for { if err := a.gatherParallel(pointChan); err != nil { log.Printf(err.Error()) diff --git a/agent_test.go b/agent_test.go index 691834ce0..6cd5cddda 100644 --- a/agent_test.go +++ b/agent_test.go @@ -32,7 +32,6 @@ func TestAgent_LoadPlugin(t *testing.T) { assert.Equal(t, 2, len(pluginsEnabled)) } -// TODO enable these unit tests, currently disabled because of a circular import func TestAgent_LoadOutput(t *testing.T) { // load a dedicated configuration file config, _ := LoadConfig("./testdata/telegraf-agent.toml") @@ -56,63 +55,3 @@ func TestAgent_LoadOutput(t *testing.T) { outputsEnabled, _ = a.LoadOutputs([]string{"influxdb", "foo", "kafka", "bar"}, config) assert.Equal(t, 2, len(outputsEnabled)) } - -/* -func TestAgent_DrivesMetrics(t *testing.T) { - var ( - plugin plugins.MockPlugin - ) - - defer plugin.AssertExpectations(t) - defer metrics.AssertExpectations(t) - - a := &Agent{ - plugins: []plugins.Plugin{&plugin}, - Config: &Config{}, - } - - plugin.On("Add", "foo", 1.2, nil).Return(nil) - plugin.On("Add", "bar", 888, nil).Return(nil) - - err := a.gather() - require.NoError(t, err) -} - -func TestAgent_AppliesTags(t *testing.T) { - var ( - plugin plugins.MockPlugin - metrics MockMetrics - ) - - defer plugin.AssertExpectations(t) - defer metrics.AssertExpectations(t) - - a := &Agent{ - plugins: []plugins.Plugin{&plugin}, - metrics: &metrics, - Config: &Config{ - Tags: map[string]string{ - "dc": "us-west-1", - }, - }, - } - - m1 := cypress.Metric() - m1.Add("name", "foo") - m1.Add("value", 1.2) - - msgs := []*cypress.Message{m1} - - m2 := cypress.Metric() - m2.Timestamp = m1.Timestamp - m2.Add("name", "foo") - m2.Add("value", 1.2) - m2.AddTag("dc", "us-west-1") - - plugin.On("Read").Return(msgs, nil) - metrics.On("Receive", m2).Return(nil) - - err := a.gather() - require.NoError(t, err) -} -*/ diff --git a/config.go b/config.go index 9bd095e0b..6853c99a8 100644 --- a/config.go +++ b/config.go @@ -226,11 +226,19 @@ var header = `# Telegraf configuration [agent] # Default data collection interval for all plugins interval = "10s" + # Rounds collection interval to 'interval' + # ie, if interval="10s" then always collect on :00, :10, :20, etc. + round_interval = true + # Default data flushing interval for all outputs flush_interval = "10s" + # Jitter the flush interval by a random range + # ie, a jitter of 5s and interval 10s means flush will happen every 10-15s + flush_jitter = "5s" # Number of times to retry each data flush flush_retries = 2 - # run telegraf in debug mode + + # Run telegraf in debug mode debug = false # Override default hostname, if empty use os.Hostname() hostname = "" diff --git a/etc/config.sample.toml b/etc/config.sample.toml index 160d56630..6e3ee74f6 100644 --- a/etc/config.sample.toml +++ b/etc/config.sample.toml @@ -27,11 +27,19 @@ [agent] # Default data collection interval for all plugins interval = "10s" + # Rounds collection interval to 'interval' + # ie, if interval="10s" then always collect on :00, :10, :20, etc. + round_interval = true + # Default data flushing interval for all outputs flush_interval = "10s" + # Jitter the flush interval by a random range + # ie, a jitter of 5s and interval 10s means flush will happen every 10-15s + flush_jitter = "5s" # Number of times to retry each data flush flush_retries = 2 - # run telegraf in debug mode + + # Run telegraf in debug mode debug = false # Override default hostname, if empty use os.Hostname() hostname = "" diff --git a/outputs/influxdb/influxdb.go b/outputs/influxdb/influxdb.go index f40f4f25b..94b8662a6 100644 --- a/outputs/influxdb/influxdb.go +++ b/outputs/influxdb/influxdb.go @@ -33,10 +33,11 @@ var sampleConfig = ` urls = ["http://localhost:8086"] # required # The target database for metrics (telegraf will create it if not exists) database = "telegraf" # required + # Precision of writes, valid values are n, u, ms, s, m, and h + # note: using second precision greatly helps InfluxDB compression precision = "s" # Connection timeout (for the connection with InfluxDB), formatted as a string. - # Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". # If not provided, will default to 0 (no timeout) # timeout = "5s" # username = "telegraf"