From 4f5d5926d9b3f0dfd0b61c9ab5ba9f2551556aa7 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Thu, 19 May 2016 16:36:58 +0100 Subject: [PATCH] Set a timeout for calls to input.Gather Changing the internal behavior around running plugins. Each plugin will now have its own goroutine with its own ticker. This means that a hung plugin will not block any other plugins. When a plugin is hung, we will log an error message every interval, letting users know which plugin is hung. Currently the input interface does not have any methods for killing a running Gather call, so there is nothing we can do but log an "ERROR" and move on. This will give some visibility into the plugin that is acting up. closes #1230 fixes #479 --- CHANGELOG.md | 10 +++ agent/accumulator.go | 15 ++-- agent/agent.go | 158 +++++++++++++++++++------------------------ 3 files changed, 91 insertions(+), 92 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d01cabe47..cb0261f79 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,11 +1,20 @@ ## v0.13.1 [unreleased] +### Release Notes + +- Input plugin Gathers will no longer be logged by default, but a Gather for +_each_ plugin will be logged in Debug mode. +- Debug mode will no longer print every point added to the accumulator. This +functionality can be duplicated using the `file` output plugin and printing +to "stdout". + ### Features - [#1138](https://github.com/influxdata/telegraf/pull/1138): nstat input plugin. Thanks @Maksadbek! - [#1139](https://github.com/influxdata/telegraf/pull/1139): instrumental output plugin. Thanks @jasonroelofs! - [#1172](https://github.com/influxdata/telegraf/pull/1172): Ceph storage stats. Thanks @robinpercy! - [#1233](https://github.com/influxdata/telegraf/pull/1233): Updated golint gopsutil dependency. +- [#479](https://github.com/influxdata/telegraf/issues/479): per-plugin execution time added to debug output. 
### Bugfixes @@ -14,6 +23,7 @@ - [#1215](https://github.com/influxdata/telegraf/pull/1215): Fix for possible gopsutil-dependent plugin hangs. - [#1228](https://github.com/influxdata/telegraf/pull/1228): Fix service plugin host tag overwrite. - [#1198](https://github.com/influxdata/telegraf/pull/1198): http_response: override request Host header properly +- [#1230](https://github.com/influxdata/telegraf/issues/1230): Fix Telegraf process hangup due to a single plugin hanging. ## v0.13 [2016-05-11] diff --git a/agent/accumulator.go b/agent/accumulator.go index 6b2ffde2d..d6ff8de60 100644 --- a/agent/accumulator.go +++ b/agent/accumulator.go @@ -4,7 +4,6 @@ import ( "fmt" "log" "math" - "sync" "time" "github.com/influxdata/telegraf" @@ -22,13 +21,13 @@ func NewAccumulator( } type accumulator struct { - sync.Mutex - metrics chan telegraf.Metric defaultTags map[string]string debug bool + // print every point added to the accumulator + trace bool inputConfig *internal_models.InputConfig @@ -152,7 +151,7 @@ func (ac *accumulator) AddFields( log.Printf("Error adding point [%s]: %s\n", measurement, err.Error()) return } - if ac.debug { + if ac.trace { fmt.Println("> " + m.String()) } ac.metrics <- m @@ -166,6 +165,14 @@ func (ac *accumulator) SetDebug(debug bool) { ac.debug = debug } +func (ac *accumulator) Trace() bool { + return ac.trace +} + +func (ac *accumulator) SetTrace(trace bool) { + ac.trace = trace +} + func (ac *accumulator) setDefaultTags(tags map[string]string) { ac.defaultTags = tags } diff --git a/agent/agent.go b/agent/agent.go index 60f2d63c6..7dedb70ee 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -102,71 +102,20 @@ func panicRecover(input *internal_models.RunningInput) { } } -// gatherParallel runs the inputs that are using the same reporting interval -// as the telegraf agent. 
-func (a *Agent) gatherParallel(metricC chan telegraf.Metric) error { - var wg sync.WaitGroup - - start := time.Now() - counter := 0 - jitter := a.Config.Agent.CollectionJitter.Duration.Nanoseconds() - for _, input := range a.Config.Inputs { - if input.Config.Interval != 0 { - continue - } - - wg.Add(1) - counter++ - go func(input *internal_models.RunningInput) { - defer panicRecover(input) - defer wg.Done() - - acc := NewAccumulator(input.Config, metricC) - acc.SetDebug(a.Config.Agent.Debug) - acc.setDefaultTags(a.Config.Tags) - - if jitter != 0 { - nanoSleep := rand.Int63n(jitter) - d, err := time.ParseDuration(fmt.Sprintf("%dns", nanoSleep)) - if err != nil { - log.Printf("Jittering collection interval failed for plugin %s", - input.Name) - } else { - time.Sleep(d) - } - } - - if err := input.Input.Gather(acc); err != nil { - log.Printf("Error in input [%s]: %s", input.Name, err) - } - - }(input) - } - - if counter == 0 { - return nil - } - - wg.Wait() - - elapsed := time.Since(start) - if !a.Config.Agent.Quiet { - log.Printf("Gathered metrics, (%s interval), from %d inputs in %s\n", - a.Config.Agent.Interval.Duration, counter, elapsed) - } - return nil -} - -// gatherSeparate runs the inputs that have been configured with their own +// gatherer runs the inputs that have been configured with their own // reporting interval. 
-func (a *Agent) gatherSeparate( +func (a *Agent) gatherer( shutdown chan struct{}, input *internal_models.RunningInput, + interval time.Duration, metricC chan telegraf.Metric, ) error { defer panicRecover(input) - ticker := time.NewTicker(input.Config.Interval) + ticker := time.NewTicker(interval) + defer ticker.Stop() + + jitter := a.Config.Agent.CollectionJitter.Duration.Nanoseconds() for { var outerr error @@ -176,14 +125,23 @@ func (a *Agent) gatherSeparate( acc.SetDebug(a.Config.Agent.Debug) acc.setDefaultTags(a.Config.Tags) - if err := input.Input.Gather(acc); err != nil { - log.Printf("Error in input [%s]: %s", input.Name, err) + if jitter != 0 { + nanoSleep := rand.Int63n(jitter) + d, err := time.ParseDuration(fmt.Sprintf("%dns", nanoSleep)) + if err != nil { + log.Printf("Jittering collection interval failed for plugin %s", + input.Name) + } else { + time.Sleep(d) + } } + gatherWithTimeout(shutdown, input, acc, interval) + elapsed := time.Since(start) - if !a.Config.Agent.Quiet { - log.Printf("Gathered metrics, (separate %s interval), from %s in %s\n", - input.Config.Interval, input.Name, elapsed) + if a.Config.Agent.Debug { + log.Printf("Input [%s] gathered metrics, (%s interval) in %s\n", + input.Name, interval, elapsed) } if outerr != nil { @@ -199,6 +157,42 @@ func (a *Agent) gatherSeparate( } } +// gatherWithTimeout gathers from the given input, with the given timeout. +// when the given timeout is reached, gatherWithTimeout logs an error message +// but continues waiting for it to return. This is to avoid leaving behind +// hung processes, and to prevent re-calling the same hung process over and +// over. 
+func gatherWithTimeout( + shutdown chan struct{}, + input *internal_models.RunningInput, + acc *accumulator, + timeout time.Duration, +) { + ticker := time.NewTicker(timeout) + defer ticker.Stop() + done := make(chan error) + go func() { + done <- input.Input.Gather(acc) + }() + + for { + select { + case err := <-done: + if err != nil { + log.Printf("ERROR in input [%s]: %s", input.Name, err) + } + return + case <-ticker.C: + log.Printf("ERROR: input [%s] took longer to collect than "+ + "collection interval (%s)", + input.Name, timeout) + continue + case <-shutdown: + return + } + } +} + // Test verifies that we can 'Gather' from all inputs with their configured // Config struct func (a *Agent) Test() error { @@ -220,7 +214,7 @@ func (a *Agent) Test() error { for _, input := range a.Config.Inputs { acc := NewAccumulator(input.Config, metricC) - acc.SetDebug(true) + acc.SetTrace(true) acc.setDefaultTags(a.Config.Tags) fmt.Printf("* Plugin: %s, Collection 1\n", input.Name) @@ -348,7 +342,6 @@ func (a *Agent) Run(shutdown chan struct{}) error { i := int64(a.Config.Agent.Interval.Duration) time.Sleep(time.Duration(i - (time.Now().UnixNano() % i))) } - ticker := time.NewTicker(a.Config.Agent.Interval.Duration) wg.Add(1) go func() { @@ -359,32 +352,21 @@ func (a *Agent) Run(shutdown chan struct{}) error { } }() + wg.Add(len(a.Config.Inputs)) for _, input := range a.Config.Inputs { - // Special handling for inputs that have their own collection interval - // configured. Default intervals are handled below with gatherParallel + interval := a.Config.Agent.Interval.Duration + // overwrite global interval if this plugin has it's own. 
if input.Config.Interval != 0 { - wg.Add(1) - go func(input *internal_models.RunningInput) { - defer wg.Done() - if err := a.gatherSeparate(shutdown, input, metricC); err != nil { - log.Printf(err.Error()) - } - }(input) + interval = input.Config.Interval } + go func(in *internal_models.RunningInput, interv time.Duration) { + defer wg.Done() + if err := a.gatherer(shutdown, in, interv, metricC); err != nil { + log.Printf(err.Error()) + } + }(input, interval) } - defer wg.Wait() - - for { - if err := a.gatherParallel(metricC); err != nil { - log.Printf(err.Error()) - } - - select { - case <-shutdown: - return nil - case <-ticker.C: - continue - } - } + wg.Wait() + return nil }