From e9c3882d353ba97d0e005d807ba95cf1e3fca709 Mon Sep 17 00:00:00 2001 From: Patrick Hemmer Date: Mon, 25 Jul 2016 08:09:49 -0400 Subject: [PATCH] add AddError method to accumulator (#1536) --- accumulator.go | 2 ++ agent/accumulator.go | 14 ++++++++++++++ agent/accumulator_test.go | 28 ++++++++++++++++++++++++++++ agent/agent.go | 3 +++ testutil/accumulator.go | 11 +++++++++++ 5 files changed, 58 insertions(+) diff --git a/accumulator.go b/accumulator.go index 15c5485f8..1fdba8f99 100644 --- a/accumulator.go +++ b/accumulator.go @@ -16,6 +16,8 @@ type Accumulator interface { tags map[string]string, t ...time.Time) + AddError(err error) + Debug() bool SetDebug(enabled bool) diff --git a/agent/accumulator.go b/agent/accumulator.go index 8b0987c41..d80affe68 100644 --- a/agent/accumulator.go +++ b/agent/accumulator.go @@ -4,6 +4,7 @@ import ( "fmt" "log" "math" + "sync/atomic" "time" "github.com/influxdata/telegraf" @@ -33,6 +34,8 @@ type accumulator struct { inputConfig *internal_models.InputConfig precision time.Duration + + errCount uint64 } func (ac *accumulator) Add( @@ -155,6 +158,17 @@ func (ac *accumulator) AddFields( ac.metrics <- m } +// AddError passes a runtime error to the accumulator. +// The error will be tagged with the plugin name and written to the log. +func (ac *accumulator) AddError(err error) { + if err == nil { + return + } + atomic.AddUint64(&ac.errCount, 1) + //TODO suppress/throttle consecutive duplicate errors? + log.Printf("ERROR in input [%s]: %s", ac.inputConfig.Name, err) +} + func (ac *accumulator) Debug() bool { return ac.debug } diff --git a/agent/accumulator_test.go b/agent/accumulator_test.go index 9bf681192..8618d327d 100644 --- a/agent/accumulator_test.go +++ b/agent/accumulator_test.go @@ -1,8 +1,11 @@ package agent import ( + "bytes" "fmt" + "log" "math" + "os" "testing" "time" @@ -10,6 +13,7 @@ import ( "github.com/influxdata/telegraf/internal/models" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestAdd(t *testing.T) { @@ -454,3 +458,27 @@ func TestAccFilterTags(t *testing.T) { fmt.Sprintf("acctest value=101 %d", now.UnixNano()), actual) } + +func TestAccAddError(t *testing.T) { + errBuf := bytes.NewBuffer(nil) + log.SetOutput(errBuf) + defer log.SetOutput(os.Stderr) + + a := accumulator{} + a.inputConfig = &internal_models.InputConfig{} + a.inputConfig.Name = "mock_plugin" + + a.AddError(fmt.Errorf("foo")) + a.AddError(fmt.Errorf("bar")) + a.AddError(fmt.Errorf("baz")) + + errs := bytes.Split(errBuf.Bytes(), []byte{'\n'}) + assert.EqualValues(t, 3, a.errCount) + require.Len(t, errs, 4) // 4 because of trailing newline + assert.Contains(t, string(errs[0]), "mock_plugin") + assert.Contains(t, string(errs[0]), "foo") + assert.Contains(t, string(errs[1]), "mock_plugin") + assert.Contains(t, string(errs[1]), "bar") + assert.Contains(t, string(errs[2]), "mock_plugin") + assert.Contains(t, string(errs[2]), "baz") +} diff --git a/agent/agent.go b/agent/agent.go index ae520b89e..5ee73512b 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -215,6 +215,9 @@ func (a *Agent) Test() error { if err := input.Input.Gather(acc); err != nil { return err } + if acc.errCount > 0 { + return fmt.Errorf("Errors encountered during processing") + } // Special instructions for some inputs. cpu, for example, needs to be // run twice in order to return cpu usage percentages. diff --git a/testutil/accumulator.go b/testutil/accumulator.go index 1058faf83..598aa3155 100644 --- a/testutil/accumulator.go +++ b/testutil/accumulator.go @@ -28,6 +28,7 @@ type Accumulator struct { sync.Mutex Metrics []*Metric + Errors []error debug bool } @@ -84,6 +85,16 @@ func (a *Accumulator) AddFields( a.Metrics = append(a.Metrics, p) } +// AddError appends the given error to Accumulator.Errors. +func (a *Accumulator) AddError(err error) { + if err == nil { + return + } + a.Lock() + a.Errors = append(a.Errors, err) + a.Unlock() +} + func (a *Accumulator) SetPrecision(precision, interval time.Duration) { return }