Fix inconsistency with input error counting (#7077)

This commit is contained in:
Steven Soroka 2020-02-25 13:40:29 -05:00 committed by GitHub
parent fc2486f24c
commit 2e32f894b6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 115 additions and 85 deletions

View File

@ -1,21 +1,16 @@
package agent package agent
import ( import (
"log"
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/selfstat"
)
var (
NErrors = selfstat.Register("agent", "gather_errors", map[string]string{})
) )
type MetricMaker interface { type MetricMaker interface {
LogName() string LogName() string
MakeMetric(metric telegraf.Metric) telegraf.Metric MakeMetric(metric telegraf.Metric) telegraf.Metric
Log() telegraf.Logger
} }
type accumulator struct { type accumulator struct {
@ -110,8 +105,7 @@ func (ac *accumulator) AddError(err error) {
if err == nil { if err == nil {
return return
} }
NErrors.Incr(1) ac.maker.Log().Errorf("Error in plugin: %v", err)
log.Printf("E! [%s] Error in plugin: %v", ac.maker.LogName(), err)
} }
func (ac *accumulator) SetPrecision(precision time.Duration) { func (ac *accumulator) SetPrecision(precision time.Duration) {

View File

@ -9,6 +9,7 @@ import (
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/models"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -59,7 +60,6 @@ func TestAccAddError(t *testing.T) {
a.AddError(fmt.Errorf("baz")) a.AddError(fmt.Errorf("baz"))
errs := bytes.Split(errBuf.Bytes(), []byte{'\n'}) errs := bytes.Split(errBuf.Bytes(), []byte{'\n'})
assert.EqualValues(t, int64(3), NErrors.Get())
require.Len(t, errs, 4) // 4 because of trailing newline require.Len(t, errs, 4) // 4 because of trailing newline
assert.Contains(t, string(errs[0]), "TestPlugin") assert.Contains(t, string(errs[0]), "TestPlugin")
assert.Contains(t, string(errs[0]), "foo") assert.Contains(t, string(errs[0]), "foo")
@ -154,3 +154,7 @@ func (tm *TestMetricMaker) LogName() string {
func (tm *TestMetricMaker) MakeMetric(metric telegraf.Metric) telegraf.Metric { func (tm *TestMetricMaker) MakeMetric(metric telegraf.Metric) telegraf.Metric {
return metric return metric
} }
func (tm *TestMetricMaker) Log() telegraf.Logger {
return models.NewLogger("TestPlugin", "test", "")
}

View File

@ -196,6 +196,7 @@ func (a *Agent) Test(ctx context.Context, waitDuration time.Duration) error {
} }
} }
hasErrors := false
for _, input := range a.Config.Inputs { for _, input := range a.Config.Inputs {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -215,15 +216,18 @@ func (a *Agent) Test(ctx context.Context, waitDuration time.Duration) error {
nulAcc.SetPrecision(a.Precision()) nulAcc.SetPrecision(a.Precision())
if err := input.Input.Gather(nulAcc); err != nil { if err := input.Input.Gather(nulAcc); err != nil {
acc.AddError(err) acc.AddError(err)
hasErrors = true
} }
time.Sleep(500 * time.Millisecond) time.Sleep(500 * time.Millisecond)
if err := input.Input.Gather(acc); err != nil { if err := input.Input.Gather(acc); err != nil {
acc.AddError(err) acc.AddError(err)
hasErrors = true
} }
default: default:
if err := input.Input.Gather(acc); err != nil { if err := input.Input.Gather(acc); err != nil {
acc.AddError(err) acc.AddError(err)
hasErrors = true
} }
} }
} }
@ -235,7 +239,7 @@ func (a *Agent) Test(ctx context.Context, waitDuration time.Duration) error {
a.stopServiceInputs() a.stopServiceInputs()
} }
if NErrors.Get() > 0 { if hasErrors {
return fmt.Errorf("One or more input plugins had an error") return fmt.Errorf("One or more input plugins had an error")
} }
return nil return nil

View File

@ -5,24 +5,39 @@ import (
"reflect" "reflect"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/selfstat"
) )
// Logger defines a logging structure for plugins. // Logger defines a logging structure for plugins.
type Logger struct { type Logger struct {
Errs selfstat.Stat OnErrs []func()
Name string // Name is the plugin name, will be printed in the `[]`. Name string // Name is the plugin name, will be printed in the `[]`.
} }
// NewLogger creates a new logger instance
func NewLogger(pluginType, name, alias string) *Logger {
return &Logger{
Name: logName(pluginType, name, alias),
}
}
// OnErr defines a callback that triggers only when errors are about to be written to the log
func (l *Logger) OnErr(f func()) {
l.OnErrs = append(l.OnErrs, f)
}
// Errorf logs an error message, patterned after log.Printf. // Errorf logs an error message, patterned after log.Printf.
func (l *Logger) Errorf(format string, args ...interface{}) { func (l *Logger) Errorf(format string, args ...interface{}) {
l.Errs.Incr(1) for _, f := range l.OnErrs {
f()
}
log.Printf("E! ["+l.Name+"] "+format, args...) log.Printf("E! ["+l.Name+"] "+format, args...)
} }
// Error logs an error message, patterned after log.Print. // Error logs an error message, patterned after log.Print.
func (l *Logger) Error(args ...interface{}) { func (l *Logger) Error(args ...interface{}) {
l.Errs.Incr(1) for _, f := range l.OnErrs {
f()
}
log.Print(append([]interface{}{"E! [" + l.Name + "] "}, args...)...) log.Print(append([]interface{}{"E! [" + l.Name + "] "}, args...)...)
} }

View File

@ -8,63 +8,17 @@ import (
) )
func TestErrorCounting(t *testing.T) { func TestErrorCounting(t *testing.T) {
iLog := Logger{Name: "inputs.test", Errs: selfstat.Register( reg := selfstat.Register(
"gather", "gather",
"errors", "errors",
map[string]string{"input": "test"}, map[string]string{"input": "test"},
)} )
iLog := Logger{Name: "inputs.test"}
iLog.OnErr(func() {
reg.Incr(1)
})
iLog.Error("something went wrong") iLog.Error("something went wrong")
iLog.Errorf("something went wrong") iLog.Errorf("something went wrong")
aLog := Logger{Name: "aggregators.test", Errs: selfstat.Register( require.Equal(t, int64(2), reg.Get())
"aggregate",
"errors",
map[string]string{"aggregator": "test"},
)}
aLog.Name = "aggregators.test"
aLog.Error("another thing happened")
oLog := Logger{Name: "outputs.test", Errs: selfstat.Register(
"write",
"errors",
map[string]string{"output": "test"},
)}
oLog.Error("another thing happened")
pLog := Logger{Name: "processors.test", Errs: selfstat.Register(
"process",
"errors",
map[string]string{"processor": "test"},
)}
pLog.Error("another thing happened")
require.Equal(t, int64(2), iLog.Errs.Get())
require.Equal(t, int64(1), aLog.Errs.Get())
require.Equal(t, int64(1), oLog.Errs.Get())
require.Equal(t, int64(1), pLog.Errs.Get())
}
func TestLogging(t *testing.T) {
log := Logger{Name: "inputs.test", Errs: selfstat.Register(
"gather",
"errors",
map[string]string{"input": "test"},
)}
log.Errs.Set(0)
log.Debugf("something happened")
log.Debug("something happened")
log.Warnf("something happened")
log.Warn("something happened")
require.Equal(t, int64(0), log.Errs.Get())
log.Infof("something happened")
log.Info("something happened")
require.Equal(t, int64(0), log.Errs.Get())
log.Errorf("something happened")
log.Error("something happened")
require.Equal(t, int64(2), log.Errs.Get())
} }

View File

@ -29,10 +29,11 @@ func NewRunningAggregator(aggregator telegraf.Aggregator, config *AggregatorConf
tags["alias"] = config.Alias tags["alias"] = config.Alias
} }
logger := &Logger{ aggErrorsRegister := selfstat.Register("aggregate", "errors", tags)
Name: logName("aggregators", config.Name, config.Alias), logger := NewLogger("aggregators", config.Name, config.Alias)
Errs: selfstat.Register("aggregate", "errors", tags), logger.OnErr(func() {
} aggErrorsRegister.Incr(1)
})
setLogIfExist(aggregator, logger) setLogIfExist(aggregator, logger)
@ -176,3 +177,7 @@ func (r *RunningAggregator) push(acc telegraf.Accumulator) {
elapsed := time.Since(start) elapsed := time.Since(start)
r.PushTime.Incr(elapsed.Nanoseconds()) r.PushTime.Incr(elapsed.Nanoseconds())
} }
func (r *RunningAggregator) Log() telegraf.Logger {
return r.log
}

View File

@ -7,7 +7,10 @@ import (
"github.com/influxdata/telegraf/selfstat" "github.com/influxdata/telegraf/selfstat"
) )
var GlobalMetricsGathered = selfstat.Register("agent", "metrics_gathered", map[string]string{}) var (
GlobalMetricsGathered = selfstat.Register("agent", "metrics_gathered", map[string]string{})
GlobalGatherErrors = selfstat.Register("agent", "gather_errors", map[string]string{})
)
type RunningInput struct { type RunningInput struct {
Input telegraf.Input Input telegraf.Input
@ -26,10 +29,12 @@ func NewRunningInput(input telegraf.Input, config *InputConfig) *RunningInput {
tags["alias"] = config.Alias tags["alias"] = config.Alias
} }
logger := &Logger{ inputErrorsRegister := selfstat.Register("gather", "errors", tags)
Name: logName("inputs", config.Name, config.Alias), logger := NewLogger("inputs", config.Name, config.Alias)
Errs: selfstat.Register("gather", "errors", tags), logger.OnErr(func() {
} inputErrorsRegister.Incr(1)
GlobalGatherErrors.Incr(1)
})
setLogIfExist(input, logger) setLogIfExist(input, logger)
return &RunningInput{ return &RunningInput{
@ -116,3 +121,7 @@ func (r *RunningInput) Gather(acc telegraf.Accumulator) error {
func (r *RunningInput) SetDefaultTags(tags map[string]string) { func (r *RunningInput) SetDefaultTags(tags map[string]string) {
r.defaultTags = tags r.defaultTags = tags
} }
func (r *RunningInput) Log() telegraf.Logger {
return r.log
}

View File

@ -4,6 +4,8 @@ import (
"testing" "testing"
"time" "time"
"github.com/influxdata/telegraf/selfstat"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
@ -256,6 +258,35 @@ func TestMakeMetricNameSuffix(t *testing.T) {
require.Equal(t, expected, m) require.Equal(t, expected, m)
} }
func TestMetricErrorCounters(t *testing.T) {
ri := NewRunningInput(&testInput{}, &InputConfig{
Name: "TestMetricErrorCounters",
})
getGatherErrors := func() int64 {
for _, r := range selfstat.Metrics() {
tag, hasTag := r.GetTag("input")
if r.Name() == "internal_gather" && hasTag && tag == "TestMetricErrorCounters" {
errCount, ok := r.GetField("errors")
if !ok {
t.Fatal("Expected error field")
}
return errCount.(int64)
}
}
return 0
}
before := getGatherErrors()
ri.Log().Error("Oh no")
after := getGatherErrors()
require.Greater(t, after, before)
require.GreaterOrEqual(t, int64(1), GlobalGatherErrors.Get())
}
type testInput struct{} type testInput struct{}
func (t *testInput) Description() string { return "" } func (t *testInput) Description() string { return "" }

View File

@ -63,10 +63,11 @@ func NewRunningOutput(
tags["alias"] = config.Alias tags["alias"] = config.Alias
} }
logger := &Logger{ writeErrorsRegister := selfstat.Register("write", "errors", tags)
Name: logName("outputs", config.Name, config.Alias), logger := NewLogger("outputs", config.Name, config.Alias)
Errs: selfstat.Register("write", "errors", tags), logger.OnErr(func() {
} writeErrorsRegister.Incr(1)
})
setLogIfExist(output, logger) setLogIfExist(output, logger)
if config.MetricBufferLimit > 0 { if config.MetricBufferLimit > 0 {
@ -240,3 +241,7 @@ func (r *RunningOutput) LogBufferStatus() {
nBuffer := r.buffer.Len() nBuffer := r.buffer.Len()
r.log.Debugf("Buffer fullness: %d / %d metrics", nBuffer, r.MetricBufferLimit) r.log.Debugf("Buffer fullness: %d / %d metrics", nBuffer, r.MetricBufferLimit)
} }
func (r *RunningOutput) Log() telegraf.Logger {
return r.log
}

View File

@ -34,10 +34,11 @@ func NewRunningProcessor(processor telegraf.Processor, config *ProcessorConfig)
tags["alias"] = config.Alias tags["alias"] = config.Alias
} }
logger := &Logger{ processErrorsRegister := selfstat.Register("process", "errors", tags)
Name: logName("processors", config.Name, config.Alias), logger := NewLogger("processors", config.Name, config.Alias)
Errs: selfstat.Register("process", "errors", tags), logger.OnErr(func() {
} processErrorsRegister.Incr(1)
})
setLogIfExist(processor, logger) setLogIfExist(processor, logger)
return &RunningProcessor{ return &RunningProcessor{
@ -97,3 +98,7 @@ func (rp *RunningProcessor) Apply(in ...telegraf.Metric) []telegraf.Metric {
return ret return ret
} }
func (r *RunningProcessor) Log() telegraf.Logger {
return r.log
}

View File

@ -193,6 +193,10 @@ func (tm *testMetricMaker) MakeMetric(metric telegraf.Metric) telegraf.Metric {
return metric return metric
} }
func (tm *testMetricMaker) Log() telegraf.Logger {
return models.NewLogger("test", "test", "")
}
type testOutput struct { type testOutput struct {
// if true, mock a write failure // if true, mock a write failure
failWrite bool failWrite bool