Clean up agent error handling and logging of outputs/plugins

Closes #145
This commit is contained in:
Cameron Sparr 2015-08-26 17:43:09 -06:00
parent d1f965ae30
commit 5b78b1e548
2 changed files with 26 additions and 21 deletions

View File

@ -1,7 +1,7 @@
## v0.1.7 [unreleased]
### Features
- [#38](https://github.com/influxdb/telegraf/pull/38): Kafka output sink.
- [#38](https://github.com/influxdb/telegraf/pull/38): Kafka output producer.
- [#133](https://github.com/influxdb/telegraf/pull/133): Add plugin.Gather error logging. Thanks @nickscript0!
- [#136](https://github.com/influxdb/telegraf/issues/136): Add a -usage flag for printing usage of a single plugin.
- [#137](https://github.com/influxdb/telegraf/issues/137): Memcached: fix when a value contains a space

View File

@ -1,6 +1,7 @@
package telegraf
import (
"errors"
"fmt"
"log"
"os"
@ -184,8 +185,7 @@ func (a *Agent) crankParallel() error {
acc.Prefix = plugin.name + "_"
acc.Config = plugin.config
err := plugin.plugin.Gather(&acc)
if err != nil {
if err := plugin.plugin.Gather(&acc); err != nil {
log.Printf("Error in plugin [%s]: %s", plugin.name, err)
}
@ -236,22 +236,27 @@ func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) err
for {
var bp BatchPoints
var outerr error
bp.Debug = a.Debug
bp.Prefix = plugin.name + "_"
bp.Config = plugin.config
err := plugin.plugin.Gather(&bp)
if err != nil {
return err
if err := plugin.plugin.Gather(&bp); err != nil {
log.Printf("Error in plugin [%s]: %s", plugin.name, err)
outerr = errors.New("Error encountered processing plugins & outputs")
}
bp.Tags = a.Config.Tags
bp.Time = time.Now()
err = a.flush(&bp)
if err != nil {
return err
if err := a.flush(&bp); err != nil {
outerr = errors.New("Error encountered processing plugins & outputs")
}
if outerr != nil {
return outerr
}
select {
@ -266,16 +271,20 @@ func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) err
func (a *Agent) flush(bp *BatchPoints) error {
var wg sync.WaitGroup
var outerr error
for _, o := range a.outputs {
wg.Add(1)
go func(ro *runningOutput) {
defer wg.Done()
outerr = ro.output.Write(bp.BatchPoints)
// Log all output errors:
if err := ro.output.Write(bp.BatchPoints); err != nil {
log.Printf("Error in output [%s]: %s", ro.name, err)
outerr = errors.New("Error encountered flushing outputs")
}
}(o)
}
wg.Wait()
return outerr
}
@ -301,8 +310,7 @@ func (a *Agent) TestAllPlugins() error {
fmt.Printf("* Plugin: %s\n", name)
acc.Prefix = name + "_"
err := plugin.Gather(&acc)
if err != nil {
if err := plugin.Gather(&acc); err != nil {
return err
}
}
@ -326,8 +334,7 @@ func (a *Agent) Test() error {
fmt.Printf("* Internal: %s\n", plugin.config.Interval)
}
err := plugin.plugin.Gather(&acc)
if err != nil {
if err := plugin.plugin.Gather(&acc); err != nil {
return err
}
}
@ -344,9 +351,8 @@ func (a *Agent) Run(shutdown chan struct{}) error {
wg.Add(1)
go func(plugin *runningPlugin) {
defer wg.Done()
err := a.crankSeparate(shutdown, plugin)
if err != nil {
log.Printf("Error in plugin [%s]: %s", plugin.name, err)
if err := a.crankSeparate(shutdown, plugin); err != nil {
log.Printf(err.Error())
}
}(plugin)
}
@ -357,9 +363,8 @@ func (a *Agent) Run(shutdown chan struct{}) error {
ticker := time.NewTicker(a.Interval.Duration)
for {
err := a.crankParallel()
if err != nil {
log.Printf("Error in plugins: %s", err)
if err := a.crankParallel(); err != nil {
log.Printf(err.Error())
}
select {