add AddError method to accumulator (#1536)

This commit is contained in:
Patrick Hemmer 2016-07-25 08:09:49 -04:00 committed by Cameron Sparr
parent 986735234b
commit e68f251df7
5 changed files with 58 additions and 0 deletions

View File

@ -16,6 +16,8 @@ type Accumulator interface {
tags map[string]string, tags map[string]string,
t ...time.Time) t ...time.Time)
AddError(err error)
Debug() bool Debug() bool
SetDebug(enabled bool) SetDebug(enabled bool)

View File

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"log" "log"
"math" "math"
"sync/atomic"
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
@ -33,6 +34,8 @@ type accumulator struct {
inputConfig *internal_models.InputConfig inputConfig *internal_models.InputConfig
precision time.Duration precision time.Duration
errCount uint64
} }
func (ac *accumulator) Add( func (ac *accumulator) Add(
@ -155,6 +158,17 @@ func (ac *accumulator) AddFields(
ac.metrics <- m ac.metrics <- m
} }
// AddError passes a runtime error to the accumulator.
// The error will be tagged with the plugin name and written to the log.
func (ac *accumulator) AddError(err error) {
if err == nil {
return
}
atomic.AddUint64(&ac.errCount, 1)
//TODO suppress/throttle consecutive duplicate errors?
log.Printf("ERROR in input [%s]: %s", ac.inputConfig.Name, err)
}
func (ac *accumulator) Debug() bool { func (ac *accumulator) Debug() bool {
return ac.debug return ac.debug
} }

View File

@ -1,8 +1,11 @@
package agent package agent
import ( import (
"bytes"
"fmt" "fmt"
"log"
"math" "math"
"os"
"testing" "testing"
"time" "time"
@ -10,6 +13,7 @@ import (
"github.com/influxdata/telegraf/internal/models" "github.com/influxdata/telegraf/internal/models"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
) )
func TestAdd(t *testing.T) { func TestAdd(t *testing.T) {
@ -454,3 +458,27 @@ func TestAccFilterTags(t *testing.T) {
fmt.Sprintf("acctest value=101 %d", now.UnixNano()), fmt.Sprintf("acctest value=101 %d", now.UnixNano()),
actual) actual)
} }
func TestAccAddError(t *testing.T) {
errBuf := bytes.NewBuffer(nil)
log.SetOutput(errBuf)
defer log.SetOutput(os.Stderr)
a := accumulator{}
a.inputConfig = &internal_models.InputConfig{}
a.inputConfig.Name = "mock_plugin"
a.AddError(fmt.Errorf("foo"))
a.AddError(fmt.Errorf("bar"))
a.AddError(fmt.Errorf("baz"))
errs := bytes.Split(errBuf.Bytes(), []byte{'\n'})
assert.EqualValues(t, 3, a.errCount)
require.Len(t, errs, 4) // 4 because of trailing newline
assert.Contains(t, string(errs[0]), "mock_plugin")
assert.Contains(t, string(errs[0]), "foo")
assert.Contains(t, string(errs[1]), "mock_plugin")
assert.Contains(t, string(errs[1]), "bar")
assert.Contains(t, string(errs[2]), "mock_plugin")
assert.Contains(t, string(errs[2]), "baz")
}

View File

@ -215,6 +215,9 @@ func (a *Agent) Test() error {
if err := input.Input.Gather(acc); err != nil { if err := input.Input.Gather(acc); err != nil {
return err return err
} }
if acc.errCount > 0 {
return fmt.Errorf("Errors encountered during processing")
}
// Special instructions for some inputs. cpu, for example, needs to be // Special instructions for some inputs. cpu, for example, needs to be
// run twice in order to return cpu usage percentages. // run twice in order to return cpu usage percentages.

View File

@ -28,6 +28,7 @@ type Accumulator struct {
sync.Mutex sync.Mutex
Metrics []*Metric Metrics []*Metric
Errors []error
debug bool debug bool
} }
@ -84,6 +85,16 @@ func (a *Accumulator) AddFields(
a.Metrics = append(a.Metrics, p) a.Metrics = append(a.Metrics, p)
} }
// AddError appends the given error to Accumulator.Errors.
func (a *Accumulator) AddError(err error) {
if err == nil {
return
}
a.Lock()
a.Errors = append(a.Errors, err)
a.Unlock()
}
func (a *Accumulator) SetPrecision(precision, interval time.Duration) { func (a *Accumulator) SetPrecision(precision, interval time.Duration) {
return return
} }