diff --git a/agent/accumulator.go b/agent/accumulator.go index 21146e3e2..65000fd98 100644 --- a/agent/accumulator.go +++ b/agent/accumulator.go @@ -1,21 +1,16 @@ package agent import ( - "log" "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" - "github.com/influxdata/telegraf/selfstat" -) - -var ( - NErrors = selfstat.Register("agent", "gather_errors", map[string]string{}) ) type MetricMaker interface { LogName() string MakeMetric(metric telegraf.Metric) telegraf.Metric + Log() telegraf.Logger } type accumulator struct { @@ -110,8 +105,7 @@ func (ac *accumulator) AddError(err error) { if err == nil { return } - NErrors.Incr(1) - log.Printf("E! [%s] Error in plugin: %v", ac.maker.LogName(), err) + ac.maker.Log().Errorf("Error in plugin: %v", err) } func (ac *accumulator) SetPrecision(precision time.Duration) { diff --git a/agent/accumulator_test.go b/agent/accumulator_test.go index c84948ba9..496d131f4 100644 --- a/agent/accumulator_test.go +++ b/agent/accumulator_test.go @@ -9,6 +9,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal/models" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -59,7 +60,6 @@ func TestAccAddError(t *testing.T) { a.AddError(fmt.Errorf("baz")) errs := bytes.Split(errBuf.Bytes(), []byte{'\n'}) - assert.EqualValues(t, int64(3), NErrors.Get()) require.Len(t, errs, 4) // 4 because of trailing newline assert.Contains(t, string(errs[0]), "TestPlugin") assert.Contains(t, string(errs[0]), "foo") @@ -154,3 +154,7 @@ func (tm *TestMetricMaker) LogName() string { func (tm *TestMetricMaker) MakeMetric(metric telegraf.Metric) telegraf.Metric { return metric } + +func (tm *TestMetricMaker) Log() telegraf.Logger { + return models.NewLogger("TestPlugin", "test", "") +} diff --git a/agent/agent.go b/agent/agent.go index aa8d07e67..66fc140ae 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -196,6 +196,7 @@ func (a *Agent) Test(ctx context.Context, waitDuration time.Duration) error { } } + hasErrors := false for _, input := range a.Config.Inputs { select { case <-ctx.Done(): @@ -215,15 +216,18 @@ func (a *Agent) Test(ctx context.Context, waitDuration time.Duration) error { nulAcc.SetPrecision(a.Precision()) if err := input.Input.Gather(nulAcc); err != nil { acc.AddError(err) + hasErrors = true } time.Sleep(500 * time.Millisecond) if err := input.Input.Gather(acc); err != nil { acc.AddError(err) + hasErrors = true } default: if err := input.Input.Gather(acc); err != nil { acc.AddError(err) + hasErrors = true } } } @@ -235,7 +239,7 @@ func (a *Agent) Test(ctx context.Context, waitDuration time.Duration) error { a.stopServiceInputs() } - if NErrors.Get() > 0 { + if hasErrors { return fmt.Errorf("One or more input plugins had an error") } return nil diff --git a/internal/models/log.go b/internal/models/log.go index a99eb3212..a89b17763 100644 --- a/internal/models/log.go +++ b/internal/models/log.go @@ -5,24 +5,39 @@ import ( "reflect" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/selfstat" ) // Logger defines a logging structure for plugins. type Logger struct { - Errs selfstat.Stat - Name string // Name is the plugin name, will be printed in the `[]`. + OnErrs []func() + Name string // Name is the plugin name, will be printed in the `[]`. +} + +// NewLogger creates a new logger instance +func NewLogger(pluginType, name, alias string) *Logger { + return &Logger{ + Name: logName(pluginType, name, alias), + } +} + +// OnErr defines a callback that triggers only when errors are about to be written to the log +func (l *Logger) OnErr(f func()) { + l.OnErrs = append(l.OnErrs, f) } // Errorf logs an error message, patterned after log.Printf. func (l *Logger) Errorf(format string, args ...interface{}) { - l.Errs.Incr(1) + for _, f := range l.OnErrs { + f() + } log.Printf("E! ["+l.Name+"] "+format, args...) } // Error logs an error message, patterned after log.Print. func (l *Logger) Error(args ...interface{}) { - l.Errs.Incr(1) + for _, f := range l.OnErrs { + f() + } log.Print(append([]interface{}{"E! [" + l.Name + "] "}, args...)...) } diff --git a/internal/models/log_test.go b/internal/models/log_test.go index d4bb6ca09..2b5ec39c6 100644 --- a/internal/models/log_test.go +++ b/internal/models/log_test.go @@ -8,63 +8,17 @@ import ( ) func TestErrorCounting(t *testing.T) { - iLog := Logger{Name: "inputs.test", Errs: selfstat.Register( + reg := selfstat.Register( "gather", "errors", map[string]string{"input": "test"}, - )} + ) + iLog := Logger{Name: "inputs.test"} + iLog.OnErr(func() { + reg.Incr(1) + }) iLog.Error("something went wrong") iLog.Errorf("something went wrong") - aLog := Logger{Name: "aggregators.test", Errs: selfstat.Register( - "aggregate", - "errors", - map[string]string{"aggregator": "test"}, - )} - aLog.Name = "aggregators.test" - aLog.Error("another thing happened") - - oLog := Logger{Name: "outputs.test", Errs: selfstat.Register( - "write", - "errors", - map[string]string{"output": "test"}, - )} - oLog.Error("another thing happened") - - pLog := Logger{Name: "processors.test", Errs: selfstat.Register( - "process", - "errors", - map[string]string{"processor": "test"}, - )} - pLog.Error("another thing happened") - - require.Equal(t, int64(2), iLog.Errs.Get()) - require.Equal(t, int64(1), aLog.Errs.Get()) - require.Equal(t, int64(1), oLog.Errs.Get()) - require.Equal(t, int64(1), pLog.Errs.Get()) -} - -func TestLogging(t *testing.T) { - log := Logger{Name: "inputs.test", Errs: selfstat.Register( - "gather", - "errors", - map[string]string{"input": "test"}, - )} - - log.Errs.Set(0) - - log.Debugf("something happened") - log.Debug("something happened") - - log.Warnf("something happened") - log.Warn("something happened") - require.Equal(t, int64(0), log.Errs.Get()) - - log.Infof("something happened") - log.Info("something happened") - require.Equal(t, int64(0), log.Errs.Get()) - - log.Errorf("something happened") - log.Error("something happened") - require.Equal(t, int64(2), log.Errs.Get()) + require.Equal(t, int64(2), reg.Get()) } diff --git a/internal/models/running_aggregator.go b/internal/models/running_aggregator.go index b8957e30a..d0ad944b1 100644 --- a/internal/models/running_aggregator.go +++ b/internal/models/running_aggregator.go @@ -29,10 +29,11 @@ func NewRunningAggregator(aggregator telegraf.Aggregator, config *AggregatorConf tags["alias"] = config.Alias } - logger := &Logger{ - Name: logName("aggregators", config.Name, config.Alias), - Errs: selfstat.Register("aggregate", "errors", tags), - } + aggErrorsRegister := selfstat.Register("aggregate", "errors", tags) + logger := NewLogger("aggregators", config.Name, config.Alias) + logger.OnErr(func() { + aggErrorsRegister.Incr(1) + }) setLogIfExist(aggregator, logger) @@ -176,3 +177,7 @@ func (r *RunningAggregator) push(acc telegraf.Accumulator) { elapsed := time.Since(start) r.PushTime.Incr(elapsed.Nanoseconds()) } + +func (r *RunningAggregator) Log() telegraf.Logger { + return r.log +} diff --git a/internal/models/running_input.go b/internal/models/running_input.go index c09fb1409..bb1033fdd 100644 --- a/internal/models/running_input.go +++ b/internal/models/running_input.go @@ -7,7 +7,10 @@ import ( "github.com/influxdata/telegraf/selfstat" ) -var GlobalMetricsGathered = selfstat.Register("agent", "metrics_gathered", map[string]string{}) +var ( + GlobalMetricsGathered = selfstat.Register("agent", "metrics_gathered", map[string]string{}) + GlobalGatherErrors = selfstat.Register("agent", "gather_errors", map[string]string{}) +) type RunningInput struct { Input telegraf.Input @@ -26,10 +29,12 @@ func NewRunningInput(input telegraf.Input, config *InputConfig) *RunningInput { tags["alias"] = config.Alias } - logger := &Logger{ - Name: logName("inputs", config.Name, config.Alias), - Errs: selfstat.Register("gather", "errors", tags), - } + inputErrorsRegister := selfstat.Register("gather", "errors", tags) + logger := NewLogger("inputs", config.Name, config.Alias) + logger.OnErr(func() { + inputErrorsRegister.Incr(1) + GlobalGatherErrors.Incr(1) + }) setLogIfExist(input, logger) return &RunningInput{ @@ -116,3 +121,7 @@ func (r *RunningInput) Gather(acc telegraf.Accumulator) error { func (r *RunningInput) SetDefaultTags(tags map[string]string) { r.defaultTags = tags } + +func (r *RunningInput) Log() telegraf.Logger { + return r.log +} diff --git a/internal/models/running_input_test.go b/internal/models/running_input_test.go index 5978a0061..ff3747116 100644 --- a/internal/models/running_input_test.go +++ b/internal/models/running_input_test.go @@ -4,6 +4,8 @@ import ( "testing" "time" + "github.com/influxdata/telegraf/selfstat" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/testutil" @@ -256,6 +258,35 @@ func TestMakeMetricNameSuffix(t *testing.T) { require.Equal(t, expected, m) } +func TestMetricErrorCounters(t *testing.T) { + ri := NewRunningInput(&testInput{}, &InputConfig{ + Name: "TestMetricErrorCounters", + }) + + getGatherErrors := func() int64 { + for _, r := range selfstat.Metrics() { + tag, hasTag := r.GetTag("input") + if r.Name() == "internal_gather" && hasTag && tag == "TestMetricErrorCounters" { + errCount, ok := r.GetField("errors") + if !ok { + t.Fatal("Expected error field") + } + return errCount.(int64) + } + } + return 0 + } + + before := getGatherErrors() + + ri.Log().Error("Oh no") + + after := getGatherErrors() + + require.Greater(t, after, before) + require.GreaterOrEqual(t, int64(1), GlobalGatherErrors.Get()) +} + type testInput struct{} func (t *testInput) Description() string { return "" } diff --git a/internal/models/running_output.go b/internal/models/running_output.go index c48bccd3c..13f2a94d6 100644 --- a/internal/models/running_output.go +++ b/internal/models/running_output.go @@ -63,10 +63,11 @@ func NewRunningOutput( tags["alias"] = config.Alias } - logger := &Logger{ - Name: logName("outputs", config.Name, config.Alias), - Errs: selfstat.Register("write", "errors", tags), - } + writeErrorsRegister := selfstat.Register("write", "errors", tags) + logger := NewLogger("outputs", config.Name, config.Alias) + logger.OnErr(func() { + writeErrorsRegister.Incr(1) + }) setLogIfExist(output, logger) if config.MetricBufferLimit > 0 { @@ -240,3 +241,7 @@ func (r *RunningOutput) LogBufferStatus() { nBuffer := r.buffer.Len() r.log.Debugf("Buffer fullness: %d / %d metrics", nBuffer, r.MetricBufferLimit) } + +func (r *RunningOutput) Log() telegraf.Logger { + return r.log +} diff --git a/internal/models/running_processor.go b/internal/models/running_processor.go index 22a7d0198..a7871b3e8 100644 --- a/internal/models/running_processor.go +++ b/internal/models/running_processor.go @@ -34,10 +34,11 @@ func NewRunningProcessor(processor telegraf.Processor, config *ProcessorConfig) tags["alias"] = config.Alias } - logger := &Logger{ - Name: logName("processors", config.Name, config.Alias), - Errs: selfstat.Register("process", "errors", tags), - } + processErrorsRegister := selfstat.Register("process", "errors", tags) + logger := NewLogger("processors", config.Name, config.Alias) + logger.OnErr(func() { + processErrorsRegister.Incr(1) + }) setLogIfExist(processor, logger) return &RunningProcessor{ @@ -97,3 +98,7 @@ func (rp *RunningProcessor) Apply(in ...telegraf.Metric) []telegraf.Metric { return ret } + +func (r *RunningProcessor) Log() telegraf.Logger { + return r.log +} diff --git a/plugins/inputs/cloud_pubsub_push/pubsub_push_test.go b/plugins/inputs/cloud_pubsub_push/pubsub_push_test.go index a0d71da94..308a8181d 100644 --- a/plugins/inputs/cloud_pubsub_push/pubsub_push_test.go +++ b/plugins/inputs/cloud_pubsub_push/pubsub_push_test.go @@ -193,6 +193,10 @@ func (tm *testMetricMaker) MakeMetric(metric telegraf.Metric) telegraf.Metric { return metric } +func (tm *testMetricMaker) Log() telegraf.Logger { + return models.NewLogger("test", "test", "") +} + type testOutput struct { // if true, mock a write failure failWrite bool