add AddError method to accumulator (#1536)
This commit is contained in:
parent
f4d802d9a8
commit
e9c3882d35
|
@ -16,6 +16,8 @@ type Accumulator interface {
|
||||||
tags map[string]string,
|
tags map[string]string,
|
||||||
t ...time.Time)
|
t ...time.Time)
|
||||||
|
|
||||||
|
AddError(err error)
|
||||||
|
|
||||||
Debug() bool
|
Debug() bool
|
||||||
SetDebug(enabled bool)
|
SetDebug(enabled bool)
|
||||||
|
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"math"
|
"math"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
|
@ -33,6 +34,8 @@ type accumulator struct {
|
||||||
inputConfig *internal_models.InputConfig
|
inputConfig *internal_models.InputConfig
|
||||||
|
|
||||||
precision time.Duration
|
precision time.Duration
|
||||||
|
|
||||||
|
errCount uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ac *accumulator) Add(
|
func (ac *accumulator) Add(
|
||||||
|
@ -155,6 +158,17 @@ func (ac *accumulator) AddFields(
|
||||||
ac.metrics <- m
|
ac.metrics <- m
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AddError passes a runtime error to the accumulator.
|
||||||
|
// The error will be tagged with the plugin name and written to the log.
|
||||||
|
func (ac *accumulator) AddError(err error) {
|
||||||
|
if err == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
atomic.AddUint64(&ac.errCount, 1)
|
||||||
|
//TODO suppress/throttle consecutive duplicate errors?
|
||||||
|
log.Printf("ERROR in input [%s]: %s", ac.inputConfig.Name, err)
|
||||||
|
}
|
||||||
|
|
||||||
func (ac *accumulator) Debug() bool {
|
func (ac *accumulator) Debug() bool {
|
||||||
return ac.debug
|
return ac.debug
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,8 +1,11 @@
|
||||||
package agent
|
package agent
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"log"
|
||||||
"math"
|
"math"
|
||||||
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -10,6 +13,7 @@ import (
|
||||||
"github.com/influxdata/telegraf/internal/models"
|
"github.com/influxdata/telegraf/internal/models"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestAdd(t *testing.T) {
|
func TestAdd(t *testing.T) {
|
||||||
|
@ -454,3 +458,27 @@ func TestAccFilterTags(t *testing.T) {
|
||||||
fmt.Sprintf("acctest value=101 %d", now.UnixNano()),
|
fmt.Sprintf("acctest value=101 %d", now.UnixNano()),
|
||||||
actual)
|
actual)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestAccAddError(t *testing.T) {
|
||||||
|
errBuf := bytes.NewBuffer(nil)
|
||||||
|
log.SetOutput(errBuf)
|
||||||
|
defer log.SetOutput(os.Stderr)
|
||||||
|
|
||||||
|
a := accumulator{}
|
||||||
|
a.inputConfig = &internal_models.InputConfig{}
|
||||||
|
a.inputConfig.Name = "mock_plugin"
|
||||||
|
|
||||||
|
a.AddError(fmt.Errorf("foo"))
|
||||||
|
a.AddError(fmt.Errorf("bar"))
|
||||||
|
a.AddError(fmt.Errorf("baz"))
|
||||||
|
|
||||||
|
errs := bytes.Split(errBuf.Bytes(), []byte{'\n'})
|
||||||
|
assert.EqualValues(t, 3, a.errCount)
|
||||||
|
require.Len(t, errs, 4) // 4 because of trailing newline
|
||||||
|
assert.Contains(t, string(errs[0]), "mock_plugin")
|
||||||
|
assert.Contains(t, string(errs[0]), "foo")
|
||||||
|
assert.Contains(t, string(errs[1]), "mock_plugin")
|
||||||
|
assert.Contains(t, string(errs[1]), "bar")
|
||||||
|
assert.Contains(t, string(errs[2]), "mock_plugin")
|
||||||
|
assert.Contains(t, string(errs[2]), "baz")
|
||||||
|
}
|
||||||
|
|
|
@ -215,6 +215,9 @@ func (a *Agent) Test() error {
|
||||||
if err := input.Input.Gather(acc); err != nil {
|
if err := input.Input.Gather(acc); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
if acc.errCount > 0 {
|
||||||
|
return fmt.Errorf("Errors encountered during processing")
|
||||||
|
}
|
||||||
|
|
||||||
// Special instructions for some inputs. cpu, for example, needs to be
|
// Special instructions for some inputs. cpu, for example, needs to be
|
||||||
// run twice in order to return cpu usage percentages.
|
// run twice in order to return cpu usage percentages.
|
||||||
|
|
|
@ -28,6 +28,7 @@ type Accumulator struct {
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
|
|
||||||
Metrics []*Metric
|
Metrics []*Metric
|
||||||
|
Errors []error
|
||||||
debug bool
|
debug bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -84,6 +85,16 @@ func (a *Accumulator) AddFields(
|
||||||
a.Metrics = append(a.Metrics, p)
|
a.Metrics = append(a.Metrics, p)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AddError appends the given error to Accumulator.Errors.
|
||||||
|
func (a *Accumulator) AddError(err error) {
|
||||||
|
if err == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
a.Lock()
|
||||||
|
a.Errors = append(a.Errors, err)
|
||||||
|
a.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
func (a *Accumulator) SetPrecision(precision, interval time.Duration) {
|
func (a *Accumulator) SetPrecision(precision, interval time.Duration) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue