From 5b78b1e5484553ab42094181a2cbf2b2a11541d0 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Wed, 26 Aug 2015 17:43:09 -0600 Subject: [PATCH] Clean up agent error handling and logging of outputs/plugins Closes #145 --- CHANGELOG.md | 2 +- agent.go | 45 +++++++++++++++++++++++++-------------------- 2 files changed, 26 insertions(+), 21 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 71852ce96..5db15d9dc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,7 @@ ## v0.1.7 [unreleased] ### Features -- [#38](https://github.com/influxdb/telegraf/pull/38): Kafka output sink. +- [#38](https://github.com/influxdb/telegraf/pull/38): Kafka output producer. - [#133](https://github.com/influxdb/telegraf/pull/133): Add plugin.Gather error logging. Thanks @nickscript0! - [#136](https://github.com/influxdb/telegraf/issues/136): Add a -usage flag for printing usage of a single plugin. - [#137](https://github.com/influxdb/telegraf/issues/137): Memcached: fix when a value contains a space diff --git a/agent.go b/agent.go index e5871b5b7..e54b7e863 100644 --- a/agent.go +++ b/agent.go @@ -1,6 +1,7 @@ package telegraf import ( + "errors" "fmt" "log" "os" @@ -184,8 +185,7 @@ func (a *Agent) crankParallel() error { acc.Prefix = plugin.name + "_" acc.Config = plugin.config - err := plugin.plugin.Gather(&acc) - if err != nil { + if err := plugin.plugin.Gather(&acc); err != nil { log.Printf("Error in plugin [%s]: %s", plugin.name, err) } @@ -236,22 +236,27 @@ func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) err for { var bp BatchPoints + var outerr error bp.Debug = a.Debug bp.Prefix = plugin.name + "_" bp.Config = plugin.config - err := plugin.plugin.Gather(&bp) - if err != nil { - return err + + if err := plugin.plugin.Gather(&bp); err != nil { + log.Printf("Error in plugin [%s]: %s", plugin.name, err) + outerr = errors.New("Error encountered processing plugins & outputs") } bp.Tags = a.Config.Tags bp.Time = time.Now() - err = a.flush(&bp) - if err != nil { - return err + if err := a.flush(&bp); err != nil { + outerr = errors.New("Error encountered processing plugins & outputs") + } + + if outerr != nil { + return outerr } select { @@ -266,16 +271,20 @@ func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) err func (a *Agent) flush(bp *BatchPoints) error { var wg sync.WaitGroup var outerr error + for _, o := range a.outputs { wg.Add(1) go func(ro *runningOutput) { defer wg.Done() - outerr = ro.output.Write(bp.BatchPoints) + // Log all output errors: + if err := ro.output.Write(bp.BatchPoints); err != nil { + log.Printf("Error in output [%s]: %s", ro.name, err) + outerr = errors.New("Error encountered flushing outputs") + } }(o) } wg.Wait() - return outerr } @@ -301,8 +310,7 @@ func (a *Agent) TestAllPlugins() error { fmt.Printf("* Plugin: %s\n", name) acc.Prefix = name + "_" - err := plugin.Gather(&acc) - if err != nil { + if err := plugin.Gather(&acc); err != nil { return err } } @@ -326,8 +334,7 @@ func (a *Agent) Test() error { fmt.Printf("* Internal: %s\n", plugin.config.Interval) } - err := plugin.plugin.Gather(&acc) - if err != nil { + if err := plugin.plugin.Gather(&acc); err != nil { return err } } @@ -344,9 +351,8 @@ func (a *Agent) Run(shutdown chan struct{}) error { wg.Add(1) go func(plugin *runningPlugin) { defer wg.Done() - err := a.crankSeparate(shutdown, plugin) - if err != nil { - log.Printf("Error in plugin [%s]: %s", plugin.name, err) + if err := a.crankSeparate(shutdown, plugin); err != nil { + log.Printf(err.Error()) } }(plugin) } @@ -357,9 +363,8 @@ func (a *Agent) Run(shutdown chan struct{}) error { ticker := time.NewTicker(a.Interval.Duration) for { - err := a.crankParallel() - if err != nil { - log.Printf("Error in plugins: %s", err) + if err := a.crankParallel(); err != nil { + log.Printf(err.Error()) } select {