From 5c8d0e3ac9d888ef2851406db72c014d9cbbe974 Mon Sep 17 00:00:00 2001 From: Greg <2653109+glinton@users.noreply.github.com> Date: Wed, 21 Aug 2019 17:49:07 -0600 Subject: [PATCH] Add ability to label inputs for logging (#6207) --- agent/accumulator.go | 4 +- agent/accumulator_test.go | 4 + agent/agent.go | 30 +++---- input.go | 9 -- internal/config/config.go | 46 ++++++++-- internal/models/log.go | 87 +++++++++++++++++++ internal/models/log_test.go | 70 +++++++++++++++ internal/models/running_aggregator.go | 35 +++++--- internal/models/running_input.go | 23 +++-- internal/models/running_output.go | 78 ++++++++++------- internal/models/running_processor.go | 25 +++++- plugin.go | 30 +++++++ .../cloud_pubsub_push/pubsub_push_test.go | 4 + plugins/inputs/exec/exec.go | 8 +- plugins/inputs/exec/exec_test.go | 4 +- plugins/outputs/influxdb/http.go | 17 ++-- plugins/outputs/influxdb/http_test.go | 2 + plugins/outputs/influxdb/influxdb.go | 13 ++- plugins/outputs/influxdb/influxdb_test.go | 22 ++++- plugins/outputs/influxdb/udp.go | 10 ++- plugins/outputs/influxdb/udp_test.go | 7 ++ testutil/log.go | 50 +++++++++++ 22 files changed, 475 insertions(+), 103 deletions(-) create mode 100644 internal/models/log.go create mode 100644 internal/models/log_test.go create mode 100644 plugin.go create mode 100644 testutil/log.go diff --git a/agent/accumulator.go b/agent/accumulator.go index 9e0bb11ca..21146e3e2 100644 --- a/agent/accumulator.go +++ b/agent/accumulator.go @@ -14,7 +14,7 @@ var ( ) type MetricMaker interface { - Name() string + LogName() string MakeMetric(metric telegraf.Metric) telegraf.Metric } @@ -111,7 +111,7 @@ func (ac *accumulator) AddError(err error) { return } NErrors.Incr(1) - log.Printf("E! [%s]: Error in plugin: %v", ac.maker.Name(), err) + log.Printf("E! [%s] Error in plugin: %v", ac.maker.LogName(), err) } func (ac *accumulator) SetPrecision(precision time.Duration) { diff --git a/agent/accumulator_test.go b/agent/accumulator_test.go index 933821701..c84948ba9 100644 --- a/agent/accumulator_test.go +++ b/agent/accumulator_test.go @@ -147,6 +147,10 @@ func (tm *TestMetricMaker) Name() string { return "TestPlugin" } +func (tm *TestMetricMaker) LogName() string { + return tm.Name() +} + func (tm *TestMetricMaker) MakeMetric(metric telegraf.Metric) telegraf.Metric { return metric } diff --git a/agent/agent.go b/agent/agent.go index 636c4ba68..700bccb05 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -209,7 +209,7 @@ func (a *Agent) Test(ctx context.Context, waitDuration time.Duration) error { // Special instructions for some inputs. cpu, for example, needs to be // run twice in order to return cpu usage percentages. - switch input.Name() { + switch input.Config.Name { case "inputs.cpu", "inputs.mongodb", "inputs.procstat": nulAcc := NewAccumulator(input, nulC) nulAcc.SetPrecision(a.Precision()) @@ -337,8 +337,8 @@ func (a *Agent) gatherOnce( case err := <-done: return err case <-ticker.C: - log.Printf("W! [agent] input %q did not complete within its interval", - input.Name()) + log.Printf("W! [agent] [%s] did not complete within its interval", + input.LogName()) } } } @@ -551,7 +551,7 @@ func (a *Agent) flush( logError := func(err error) { if err != nil { - log.Printf("E! [agent] Error writing to output [%s]: %v", output.Name, err) + log.Printf("E! [agent] Error writing to %s: %v", output.LogName(), err) } } @@ -603,8 +603,8 @@ func (a *Agent) flushOnce( output.LogBufferStatus() return err case <-ticker.C: - log.Printf("W! [agent] output %q did not complete within its flush interval", - output.Name) + log.Printf("W! [agent] [%q] did not complete within its flush interval", + output.LogName()) output.LogBufferStatus() } } @@ -617,7 +617,7 @@ func (a *Agent) initPlugins() error { err := input.Init() if err != nil { return fmt.Errorf("could not initialize input %s: %v", - input.Config.Name, err) + input.LogName(), err) } } for _, processor := range a.Config.Processors { @@ -647,11 +647,11 @@ func (a *Agent) initPlugins() error { // connectOutputs connects to all outputs. func (a *Agent) connectOutputs(ctx context.Context) error { for _, output := range a.Config.Outputs { - log.Printf("D! [agent] Attempting connection to output: %s\n", output.Name) + log.Printf("D! [agent] Attempting connection to [%s]", output.LogName()) err := output.Output.Connect() if err != nil { - log.Printf("E! [agent] Failed to connect to output %s, retrying in 15s, "+ - "error was '%s' \n", output.Name, err) + log.Printf("E! [agent] Failed to connect to [%s], retrying in 15s, "+ + "error was '%s'", output.LogName(), err) err := internal.SleepContext(ctx, 15*time.Second) if err != nil { @@ -663,7 +663,7 @@ func (a *Agent) connectOutputs(ctx context.Context) error { return err } } - log.Printf("D! [agent] Successfully connected to output: %s\n", output.Name) + log.Printf("D! [agent] Successfully connected to %s", output.LogName()) } return nil } @@ -693,8 +693,8 @@ func (a *Agent) startServiceInputs( err := si.Start(acc) if err != nil { - log.Printf("E! [agent] Service for input %s failed to start: %v", - input.Name(), err) + log.Printf("E! [agent] Service for [%s] failed to start: %v", + input.LogName(), err) for _, si := range started { si.Stop() @@ -745,8 +745,8 @@ func panicRecover(input *models.RunningInput) { if err := recover(); err != nil { trace := make([]byte, 2048) runtime.Stack(trace, true) - log.Printf("E! FATAL: Input [%s] panicked: %s, Stack:\n%s\n", - input.Name(), err, trace) + log.Printf("E! FATAL: [%s] panicked: %s, Stack:\n%s", + input.LogName(), err, trace) log.Println("E! PLEASE REPORT THIS PANIC ON GITHUB with " + "stack trace, configuration, and OS information: " + "https://github.com/influxdata/telegraf/issues/new/choose") diff --git a/input.go b/input.go index ee47bc347..071ab7d9d 100644 --- a/input.go +++ b/input.go @@ -1,14 +1,5 @@ package telegraf -// Initializer is an interface that all plugin types: Inputs, Outputs, -// Processors, and Aggregators can optionally implement to initialize the -// plugin. -type Initializer interface { - // Init performs one time setup of the plugin and returns an error if the - // configuration is invalid. - Init() error -} - type Input interface { // SampleConfig returns the default configuration of the Input SampleConfig() string diff --git a/internal/config/config.go b/internal/config/config.go index 802e3152e..f2617e8b3 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -187,7 +187,7 @@ func (c *Config) AggregatorNames() []string { func (c *Config) ProcessorNames() []string { var name []string for _, processor := range c.Processors { - name = append(name, processor.Name) + name = append(name, processor.Config.Name) } return name } @@ -196,7 +196,7 @@ func (c *Config) ProcessorNames() []string { func (c *Config) OutputNames() []string { var name []string for _, output := range c.Outputs { - name = append(name, output.Name) + name = append(name, output.Config.Name) } return name } @@ -920,11 +920,7 @@ func (c *Config) addProcessor(name string, table *ast.Table) error { return err } - rf := &models.RunningProcessor{ - Name: name, - Processor: processor, - Config: processorConfig, - } + rf := models.NewRunningProcessor(processor, processorConfig) c.Processors = append(c.Processors, rf) return nil @@ -1103,6 +1099,14 @@ func buildAggregator(name string, tbl *ast.Table) (*models.AggregatorConfig, err } } + if node, ok := tbl.Fields["alias"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + conf.Alias = str.Value + } + } + } + conf.Tags = make(map[string]string) if node, ok := tbl.Fields["tags"]; ok { if subtbl, ok := node.(*ast.Table); ok { @@ -1119,6 +1123,7 @@ func buildAggregator(name string, tbl *ast.Table) (*models.AggregatorConfig, err delete(tbl.Fields, "name_prefix") delete(tbl.Fields, "name_suffix") delete(tbl.Fields, "name_override") + delete(tbl.Fields, "alias") delete(tbl.Fields, "tags") var err error conf.Filter, err = buildFilter(tbl) @@ -1146,6 +1151,15 @@ func buildProcessor(name string, tbl *ast.Table) (*models.ProcessorConfig, error } } + if node, ok := tbl.Fields["alias"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + conf.Alias = str.Value + } + } + } + + delete(tbl.Fields, "alias") delete(tbl.Fields, "order") var err error conf.Filter, err = buildFilter(tbl) @@ -1334,6 +1348,14 @@ func buildInput(name string, tbl *ast.Table) (*models.InputConfig, error) { } } + if node, ok := tbl.Fields["alias"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + cp.Alias = str.Value + } + } + } + cp.Tags = make(map[string]string) if node, ok := tbl.Fields["tags"]; ok { if subtbl, ok := node.(*ast.Table); ok { @@ -1346,6 +1368,7 @@ func buildInput(name string, tbl *ast.Table) (*models.InputConfig, error) { delete(tbl.Fields, "name_prefix") delete(tbl.Fields, "name_suffix") delete(tbl.Fields, "name_override") + delete(tbl.Fields, "alias") delete(tbl.Fields, "interval") delete(tbl.Fields, "tags") var err error @@ -2007,9 +2030,18 @@ func buildOutput(name string, tbl *ast.Table) (*models.OutputConfig, error) { } } + if node, ok := tbl.Fields["alias"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + oc.Alias = str.Value + } + } + } + delete(tbl.Fields, "flush_interval") delete(tbl.Fields, "metric_buffer_limit") delete(tbl.Fields, "metric_batch_size") + delete(tbl.Fields, "alias") return oc, nil } diff --git a/internal/models/log.go b/internal/models/log.go new file mode 100644 index 000000000..a99eb3212 --- /dev/null +++ b/internal/models/log.go @@ -0,0 +1,87 @@ +package models + +import ( + "log" + "reflect" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/selfstat" +) + +// Logger defines a logging structure for plugins. +type Logger struct { + Errs selfstat.Stat + Name string // Name is the plugin name, will be printed in the `[]`. +} + +// Errorf logs an error message, patterned after log.Printf. +func (l *Logger) Errorf(format string, args ...interface{}) { + l.Errs.Incr(1) + log.Printf("E! ["+l.Name+"] "+format, args...) +} + +// Error logs an error message, patterned after log.Print. +func (l *Logger) Error(args ...interface{}) { + l.Errs.Incr(1) + log.Print(append([]interface{}{"E! [" + l.Name + "] "}, args...)...) +} + +// Debugf logs a debug message, patterned after log.Printf. +func (l *Logger) Debugf(format string, args ...interface{}) { + log.Printf("D! ["+l.Name+"] "+format, args...) +} + +// Debug logs a debug message, patterned after log.Print. +func (l *Logger) Debug(args ...interface{}) { + log.Print(append([]interface{}{"D! [" + l.Name + "] "}, args...)...) +} + +// Warnf logs a warning message, patterned after log.Printf. +func (l *Logger) Warnf(format string, args ...interface{}) { + log.Printf("W! ["+l.Name+"] "+format, args...) +} + +// Warn logs a warning message, patterned after log.Print. +func (l *Logger) Warn(args ...interface{}) { + log.Print(append([]interface{}{"W! [" + l.Name + "] "}, args...)...) +} + +// Infof logs an information message, patterned after log.Printf. +func (l *Logger) Infof(format string, args ...interface{}) { + log.Printf("I! ["+l.Name+"] "+format, args...) +} + +// Info logs an information message, patterned after log.Print. +func (l *Logger) Info(args ...interface{}) { + log.Print(append([]interface{}{"I! [" + l.Name + "] "}, args...)...) +} + +// logName returns the log-friendly name/type. +func logName(pluginType, name, alias string) string { + if alias == "" { + return pluginType + "." + name + } + return pluginType + "." + name + "::" + alias +} + +func setLogIfExist(i interface{}, log telegraf.Logger) { + valI := reflect.ValueOf(i) + + if valI.Type().Kind() != reflect.Ptr { + valI = reflect.New(reflect.TypeOf(i)) + } + + field := valI.Elem().FieldByName("Log") + if !field.IsValid() { + return + } + + switch field.Type().String() { + case "telegraf.Logger": + if field.CanSet() { + field.Set(reflect.ValueOf(log)) + } + } + + return +} diff --git a/internal/models/log_test.go b/internal/models/log_test.go new file mode 100644 index 000000000..d4bb6ca09 --- /dev/null +++ b/internal/models/log_test.go @@ -0,0 +1,70 @@ +package models + +import ( + "testing" + + "github.com/influxdata/telegraf/selfstat" + "github.com/stretchr/testify/require" +) + +func TestErrorCounting(t *testing.T) { + iLog := Logger{Name: "inputs.test", Errs: selfstat.Register( + "gather", + "errors", + map[string]string{"input": "test"}, + )} + iLog.Error("something went wrong") + iLog.Errorf("something went wrong") + + aLog := Logger{Name: "aggregators.test", Errs: selfstat.Register( + "aggregate", + "errors", + map[string]string{"aggregator": "test"}, + )} + aLog.Name = "aggregators.test" + aLog.Error("another thing happened") + + oLog := Logger{Name: "outputs.test", Errs: selfstat.Register( + "write", + "errors", + map[string]string{"output": "test"}, + )} + oLog.Error("another thing happened") + + pLog := Logger{Name: "processors.test", Errs: selfstat.Register( + "process", + "errors", + map[string]string{"processor": "test"}, + )} + pLog.Error("another thing happened") + + require.Equal(t, int64(2), iLog.Errs.Get()) + require.Equal(t, int64(1), aLog.Errs.Get()) + require.Equal(t, int64(1), oLog.Errs.Get()) + require.Equal(t, int64(1), pLog.Errs.Get()) +} + +func TestLogging(t *testing.T) { + log := Logger{Name: "inputs.test", Errs: selfstat.Register( + "gather", + "errors", + map[string]string{"input": "test"}, + )} + + log.Errs.Set(0) + + log.Debugf("something happened") + log.Debug("something happened") + + log.Warnf("something happened") + log.Warn("something happened") + require.Equal(t, int64(0), log.Errs.Get()) + + log.Infof("something happened") + log.Info("something happened") + require.Equal(t, int64(0), log.Errs.Get()) + + log.Errorf("something happened") + log.Error("something happened") + require.Equal(t, int64(2), log.Errs.Get()) +} diff --git a/internal/models/running_aggregator.go b/internal/models/running_aggregator.go index e029aad56..ee46e5b13 100644 --- a/internal/models/running_aggregator.go +++ b/internal/models/running_aggregator.go @@ -1,7 +1,6 @@ package models import ( - "log" "sync" "time" @@ -16,6 +15,7 @@ type RunningAggregator struct { Config *AggregatorConfig periodStart time.Time periodEnd time.Time + log telegraf.Logger MetricsPushed selfstat.Stat MetricsFiltered selfstat.Stat @@ -23,39 +23,46 @@ type RunningAggregator struct { PushTime selfstat.Stat } -func NewRunningAggregator( - aggregator telegraf.Aggregator, - config *AggregatorConfig, -) *RunningAggregator { +func NewRunningAggregator(aggregator telegraf.Aggregator, config *AggregatorConfig) *RunningAggregator { + logger := &Logger{ + Name: logName("aggregators", config.Name, config.Alias), + Errs: selfstat.Register("aggregate", "errors", + map[string]string{"input": config.Name, "alias": config.Alias}), + } + + setLogIfExist(aggregator, logger) + return &RunningAggregator{ Aggregator: aggregator, Config: config, MetricsPushed: selfstat.Register( "aggregate", "metrics_pushed", - map[string]string{"aggregator": config.Name}, + map[string]string{"aggregator": config.Name, "alias": config.Alias}, ), MetricsFiltered: selfstat.Register( "aggregate", "metrics_filtered", - map[string]string{"aggregator": config.Name}, + map[string]string{"aggregator": config.Name, "alias": config.Alias}, ), MetricsDropped: selfstat.Register( "aggregate", "metrics_dropped", - map[string]string{"aggregator": config.Name}, + map[string]string{"aggregator": config.Name, "alias": config.Alias}, ), PushTime: selfstat.Register( "aggregate", "push_time_ns", - map[string]string{"aggregator": config.Name}, + map[string]string{"aggregator": config.Name, "alias": config.Alias}, ), + log: logger, } } // AggregatorConfig is the common config for all aggregators. type AggregatorConfig struct { Name string + Alias string DropOriginal bool Period time.Duration Delay time.Duration @@ -68,8 +75,8 @@ type AggregatorConfig struct { Filter Filter } -func (r *RunningAggregator) Name() string { - return "aggregators." + r.Config.Name +func (r *RunningAggregator) LogName() string { + return logName("aggregators", r.Config.Name, r.Config.Alias) } func (r *RunningAggregator) Init() error { @@ -93,7 +100,7 @@ func (r *RunningAggregator) EndPeriod() time.Time { func (r *RunningAggregator) UpdateWindow(start, until time.Time) { r.periodStart = start r.periodEnd = until - log.Printf("D! [%s] Updated aggregation range [%s, %s]", r.Name(), start, until) + r.log.Debugf("Updated aggregation range [%s, %s]", start, until) } func (r *RunningAggregator) MakeMetric(metric telegraf.Metric) telegraf.Metric { @@ -137,8 +144,8 @@ func (r *RunningAggregator) Add(m telegraf.Metric) bool { defer r.Unlock() if m.Time().Before(r.periodStart.Add(-r.Config.Grace)) || m.Time().After(r.periodEnd.Add(r.Config.Delay)) { - log.Printf("D! [%s] metric is outside aggregation window; discarding. %s: m: %s e: %s g: %s", - r.Name(), m.Time(), r.periodStart, r.periodEnd, r.Config.Grace) + r.log.Debugf("metric is outside aggregation window; discarding. %s: m: %s e: %s g: %s", + m.Time(), r.periodStart, r.periodEnd, r.Config.Grace) r.MetricsDropped.Incr(1) return r.Config.DropOriginal } diff --git a/internal/models/running_input.go b/internal/models/running_input.go index 73c14fc0f..85f0afb81 100644 --- a/internal/models/running_input.go +++ b/internal/models/running_input.go @@ -13,6 +13,7 @@ type RunningInput struct { Input telegraf.Input Config *InputConfig + log telegraf.Logger defaultTags map[string]string MetricsGathered selfstat.Stat @@ -20,25 +21,35 @@ type RunningInput struct { } func NewRunningInput(input telegraf.Input, config *InputConfig) *RunningInput { + logger := &Logger{ + Name: logName("inputs", config.Name, config.Alias), + Errs: selfstat.Register("gather", "errors", + map[string]string{"input": config.Name, "alias": config.Alias}), + } + + setLogIfExist(input, logger) + return &RunningInput{ Input: input, Config: config, MetricsGathered: selfstat.Register( "gather", "metrics_gathered", - map[string]string{"input": config.Name}, + map[string]string{"input": config.Name, "alias": config.Alias}, ), GatherTime: selfstat.RegisterTiming( "gather", "gather_time_ns", - map[string]string{"input": config.Name}, + map[string]string{"input": config.Name, "alias": config.Alias}, ), + log: logger, } } // InputConfig is the common config for all inputs. type InputConfig struct { Name string + Alias string Interval time.Duration NameOverride string @@ -48,14 +59,14 @@ type InputConfig struct { Filter Filter } -func (r *RunningInput) Name() string { - return "inputs." + r.Config.Name -} - func (r *RunningInput) metricFiltered(metric telegraf.Metric) { metric.Drop() } +func (r *RunningInput) LogName() string { + return logName("inputs", r.Config.Name, r.Config.Alias) +} + func (r *RunningInput) Init() error { if p, ok := r.Input.(telegraf.Initializer); ok { err := p.Init() diff --git a/internal/models/running_output.go b/internal/models/running_output.go index 438ecd480..86e68f057 100644 --- a/internal/models/running_output.go +++ b/internal/models/running_output.go @@ -1,7 +1,6 @@ package models import ( - "log" "sync" "sync/atomic" "time" @@ -21,6 +20,7 @@ const ( // OutputConfig containing name and filter type OutputConfig struct { Name string + Alias string Filter Filter FlushInterval time.Duration @@ -34,7 +34,6 @@ type RunningOutput struct { newMetricsCount int64 droppedMetrics int64 - Name string Output telegraf.Output Config *OutputConfig MetricBufferLimit int @@ -46,6 +45,7 @@ type RunningOutput struct { BatchReady chan time.Time buffer *Buffer + log telegraf.Logger aggMutex sync.Mutex } @@ -53,56 +53,77 @@ type RunningOutput struct { func NewRunningOutput( name string, output telegraf.Output, - conf *OutputConfig, + config *OutputConfig, batchSize int, bufferLimit int, ) *RunningOutput { - if conf.MetricBufferLimit > 0 { - bufferLimit = conf.MetricBufferLimit + logger := &Logger{ + Name: logName("outputs", config.Name, config.Alias), + Errs: selfstat.Register("gather", "errors", + map[string]string{"output": config.Name, "alias": config.Alias}), + } + + setLogIfExist(output, logger) + + if config.MetricBufferLimit > 0 { + bufferLimit = config.MetricBufferLimit } if bufferLimit == 0 { bufferLimit = DEFAULT_METRIC_BUFFER_LIMIT } - if conf.MetricBatchSize > 0 { - batchSize = conf.MetricBatchSize + if config.MetricBatchSize > 0 { + batchSize = config.MetricBatchSize } if batchSize == 0 { batchSize = DEFAULT_METRIC_BATCH_SIZE } + ro := &RunningOutput{ - Name: name, - buffer: NewBuffer(name, bufferLimit), + buffer: NewBuffer(config.LogName(), bufferLimit), BatchReady: make(chan time.Time, 1), Output: output, - Config: conf, + Config: config, MetricBufferLimit: bufferLimit, MetricBatchSize: batchSize, MetricsFiltered: selfstat.Register( "write", "metrics_filtered", - map[string]string{"output": name}, + map[string]string{"output": config.Name, "alias": config.Alias}, ), WriteTime: selfstat.RegisterTiming( "write", "write_time_ns", - map[string]string{"output": name}, + map[string]string{"output": config.Name, "alias": config.Alias}, ), + log: logger, } return ro } +func (c *OutputConfig) LogName() string { + if c.Alias == "" { + return c.Name + } + return c.Name + "::" + c.Alias +} + +func (r *RunningOutput) LogName() string { + return logName("outputs", r.Config.Name, r.Config.Alias) +} + func (ro *RunningOutput) metricFiltered(metric telegraf.Metric) { ro.MetricsFiltered.Incr(1) metric.Drop() } -func (ro *RunningOutput) Init() error { - if p, ok := ro.Output.(telegraf.Initializer); ok { +func (r *RunningOutput) Init() error { + if p, ok := r.Output.(telegraf.Initializer); ok { err := p.Init() if err != nil { return err } + } return nil } @@ -192,35 +213,32 @@ func (ro *RunningOutput) WriteBatch() error { return nil } -func (ro *RunningOutput) Close() { - err := ro.Output.Close() +func (r *RunningOutput) Close() { + err := r.Output.Close() if err != nil { - log.Printf("E! [outputs.%s] Error closing output: %v", ro.Name, err) + r.log.Errorf("Error closing output: %v", err) } } -func (ro *RunningOutput) write(metrics []telegraf.Metric) error { - dropped := atomic.LoadInt64(&ro.droppedMetrics) +func (r *RunningOutput) write(metrics []telegraf.Metric) error { + dropped := atomic.LoadInt64(&r.droppedMetrics) if dropped > 0 { - log.Printf("W! [outputs.%s] Metric buffer overflow; %d metrics have been dropped", - ro.Name, dropped) - atomic.StoreInt64(&ro.droppedMetrics, 0) + r.log.Warnf("Metric buffer overflow; %d metrics have been dropped", dropped) + atomic.StoreInt64(&r.droppedMetrics, 0) } start := time.Now() - err := ro.Output.Write(metrics) + err := r.Output.Write(metrics) elapsed := time.Since(start) - ro.WriteTime.Incr(elapsed.Nanoseconds()) + r.WriteTime.Incr(elapsed.Nanoseconds()) if err == nil { - log.Printf("D! [outputs.%s] wrote batch of %d metrics in %s\n", - ro.Name, len(metrics), elapsed) + r.log.Debugf("Wrote batch of %d metrics in %s", len(metrics), elapsed) } return err } -func (ro *RunningOutput) LogBufferStatus() { - nBuffer := ro.buffer.Len() - log.Printf("D! [outputs.%s] buffer fullness: %d / %d metrics. ", - ro.Name, nBuffer, ro.MetricBufferLimit) +func (r *RunningOutput) LogBufferStatus() { + nBuffer := r.buffer.Len() + r.log.Debugf("Buffer fullness: %d / %d metrics", nBuffer, r.MetricBufferLimit) } diff --git a/internal/models/running_processor.go b/internal/models/running_processor.go index 90d32fde5..5a12716e5 100644 --- a/internal/models/running_processor.go +++ b/internal/models/running_processor.go @@ -4,12 +4,12 @@ import ( "sync" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/selfstat" ) type RunningProcessor struct { - Name string - sync.Mutex + log telegraf.Logger Processor telegraf.Processor Config *ProcessorConfig } @@ -23,10 +23,27 @@ func (rp RunningProcessors) Less(i, j int) bool { return rp[i].Config.Order < rp // FilterConfig containing a name and filter type ProcessorConfig struct { Name string + Alias string Order int64 Filter Filter } +func NewRunningProcessor(processor telegraf.Processor, config *ProcessorConfig) *RunningProcessor { + logger := &Logger{ + Name: logName("processors", config.Name, config.Alias), + Errs: selfstat.Register("process", "errors", + map[string]string{"input": config.Name, "alias": config.Alias}), + } + + setLogIfExist(processor, logger) + + return &RunningProcessor{ + Processor: processor, + Config: config, + log: logger, + } +} + func (rp *RunningProcessor) metricFiltered(metric telegraf.Metric) { metric.Drop() } @@ -40,8 +57,8 @@ func containsMetric(item telegraf.Metric, metrics []telegraf.Metric) bool { return false } -func (rp *RunningProcessor) Init() error { - if p, ok := rp.Processor.(telegraf.Initializer); ok { +func (r *RunningProcessor) Init() error { + if p, ok := r.Processor.(telegraf.Initializer); ok { err := p.Init() if err != nil { return err diff --git a/plugin.go b/plugin.go new file mode 100644 index 000000000..f79721958 --- /dev/null +++ b/plugin.go @@ -0,0 +1,30 @@ +package telegraf + +// Initializer is an interface that all plugin types: Inputs, Outputs, +// Processors, and Aggregators can optionally implement to initialize the +// plugin. +type Initializer interface { + // Init performs one time setup of the plugin and returns an error if the + // configuration is invalid. + Init() error +} + +// Logger defines an interface for logging. +type Logger interface { + // Errorf logs an error message, patterned after log.Printf. + Errorf(format string, args ...interface{}) + // Error logs an error message, patterned after log.Print. + Error(args ...interface{}) + // Debugf logs a debug message, patterned after log.Printf. + Debugf(format string, args ...interface{}) + // Debug logs a debug message, patterned after log.Print. + Debug(args ...interface{}) + // Warnf logs a warning message, patterned after log.Printf. + Warnf(format string, args ...interface{}) + // Warn logs a warning message, patterned after log.Print. + Warn(args ...interface{}) + // Infof logs an information message, patterned after log.Printf. + Infof(format string, args ...interface{}) + // Info logs an information message, patterned after log.Print. + Info(args ...interface{}) +} diff --git a/plugins/inputs/cloud_pubsub_push/pubsub_push_test.go b/plugins/inputs/cloud_pubsub_push/pubsub_push_test.go index 57734c705..45a304e60 100644 --- a/plugins/inputs/cloud_pubsub_push/pubsub_push_test.go +++ b/plugins/inputs/cloud_pubsub_push/pubsub_push_test.go @@ -183,6 +183,10 @@ func (tm *testMetricMaker) Name() string { return "TestPlugin" } +func (tm *testMetricMaker) LogName() string { + return tm.Name() +} + func (tm *testMetricMaker) MakeMetric(metric telegraf.Metric) telegraf.Metric { return metric } diff --git a/plugins/inputs/exec/exec.go b/plugins/inputs/exec/exec.go index 615736b3c..2d3643ad0 100644 --- a/plugins/inputs/exec/exec.go +++ b/plugins/inputs/exec/exec.go @@ -3,7 +3,6 @@ package exec import ( "bytes" "fmt" - "log" "os/exec" "path/filepath" "runtime" @@ -51,6 +50,7 @@ type Exec struct { parser parsers.Parser runner Runner + log telegraf.Logger } func NewExec() *Exec { @@ -161,7 +161,7 @@ func (e *Exec) ProcessCommand(command string, acc telegraf.Accumulator, wg *sync if isNagios { metrics, err = nagios.TryAddState(runErr, metrics) if err != nil { - log.Printf("E! [inputs.exec] failed to add nagios state: %s", err) + e.log.Errorf("failed to add nagios state: %s", err) } } @@ -229,6 +229,10 @@ func (e *Exec) Gather(acc telegraf.Accumulator) error { return nil } +func (e *Exec) Init() error { + return nil +} + func init() { inputs.Add("exec", func() telegraf.Input { return NewExec() diff --git a/plugins/inputs/exec/exec_test.go b/plugins/inputs/exec/exec_test.go index 5aaef8961..0523a181d 100644 --- a/plugins/inputs/exec/exec_test.go +++ b/plugins/inputs/exec/exec_test.go @@ -8,7 +8,6 @@ import ( "time" "github.com/influxdata/telegraf/plugins/parsers" - "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -97,6 +96,7 @@ func TestExec(t *testing.T) { MetricName: "exec", }) e := &Exec{ + log: testutil.Logger{}, runner: newRunnerMock([]byte(validJson), nil, nil), Commands: []string{"testcommand arg1"}, parser: parser, @@ -126,6 +126,7 @@ func TestExecMalformed(t *testing.T) { MetricName: "exec", }) e := &Exec{ + log: testutil.Logger{}, runner: newRunnerMock([]byte(malformedJson), nil, nil), Commands: []string{"badcommand arg1"}, parser: parser, @@ -142,6 +143,7 @@ func TestCommandError(t *testing.T) { MetricName: "exec", }) e := &Exec{ + log: testutil.Logger{}, runner: newRunnerMock(nil, nil, fmt.Errorf("exit status code 1")), Commands: []string{"badcommand"}, parser: parser, diff --git a/plugins/outputs/influxdb/http.go b/plugins/outputs/influxdb/http.go index 56576082f..c7c29a638 100644 --- a/plugins/outputs/influxdb/http.go +++ b/plugins/outputs/influxdb/http.go @@ -6,7 +6,6 @@ import ( "encoding/json" "fmt" "io" - "log" "net" "net/http" "net/url" @@ -101,12 +100,15 @@ type HTTPConfig struct { InfluxUintSupport bool `toml:"influx_uint_support"` Serializer *influx.Serializer + log telegraf.Logger } type httpClient struct { client *http.Client config HTTPConfig createdDatabases map[string]bool + + log telegraf.Logger } func NewHTTPClient(config HTTPConfig) (*httpClient, error) { @@ -174,6 +176,7 @@ func NewHTTPClient(config HTTPConfig) (*httpClient, error) { }, createdDatabases: make(map[string]bool), config: config, + log: config.log, } return client, nil } @@ -183,6 +186,10 @@ func (c *httpClient) URL() string { return c.config.URL.String() } +func (c *httpClient) SetLogger(log telegraf.Logger) { + c.log = log +} + // Database returns the default database that this client connects too. func (c *httpClient) Database() string { return c.config.Database @@ -262,7 +269,7 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error if !c.config.SkipDatabaseCreation && !c.createdDatabases[db] { err := c.CreateDatabase(ctx, db) if err != nil { - log.Printf("W! [outputs.influxdb] when writing to [%s]: database %q creation failed: %v", + c.log.Warnf("when writing to [%s]: database %q creation failed: %v", c.config.URL, db, err) } } @@ -328,7 +335,7 @@ func (c *httpClient) writeBatch(ctx context.Context, db string, metrics []telegr // discarded for being older than the retention policy. Usually this not // a cause for concern and we don't want to retry. if strings.Contains(desc, errStringPointsBeyondRP) { - log.Printf("W! [outputs.influxdb]: when writing to [%s]: received error %v", + c.log.Warnf("when writing to [%s]: received error %v", c.URL(), desc) return nil } @@ -337,7 +344,7 @@ func (c *httpClient) writeBatch(ctx context.Context, db string, metrics []telegr // correctable at this point and so the point is dropped instead of // retrying. if strings.Contains(desc, errStringPartialWrite) { - log.Printf("E! [outputs.influxdb]: when writing to [%s]: received error %v; discarding points", + c.log.Errorf("when writing to [%s]: received error %v; discarding points", c.URL(), desc) return nil } @@ -345,7 +352,7 @@ func (c *httpClient) writeBatch(ctx context.Context, db string, metrics []telegr // This error indicates a bug in either Telegraf line protocol // serialization, retries would not be successful. if strings.Contains(desc, errStringUnableToParse) { - log.Printf("E! [outputs.influxdb]: when writing to [%s]: received error %v; discarding points", + c.log.Errorf("when writing to [%s]: received error %v; discarding points", c.URL(), desc) return nil } diff --git a/plugins/outputs/influxdb/http_test.go b/plugins/outputs/influxdb/http_test.go index 2b6b45eef..98ec4ef5b 100644 --- a/plugins/outputs/influxdb/http_test.go +++ b/plugins/outputs/influxdb/http_test.go @@ -21,6 +21,7 @@ import ( "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/plugins/outputs/influxdb" + "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" ) @@ -471,6 +472,7 @@ func TestHTTP_Write(t *testing.T) { client, err := influxdb.NewHTTPClient(tt.config) require.NoError(t, err) + client.SetLogger(testutil.Logger{}) err = client.Write(ctx, metrics) if tt.errFunc != nil { tt.errFunc(t, err) diff --git a/plugins/outputs/influxdb/influxdb.go b/plugins/outputs/influxdb/influxdb.go index b07d58fc2..0a6f66696 100644 --- a/plugins/outputs/influxdb/influxdb.go +++ b/plugins/outputs/influxdb/influxdb.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "log" "math/rand" "net/url" "time" @@ -28,6 +27,7 @@ type Client interface { Database() string URL() string Close() + SetLogger(telegraf.Logger) } // InfluxDB struct is the primary data structure for the plugin @@ -59,6 +59,7 @@ type InfluxDB struct { CreateUDPClientF func(config *UDPConfig) (Client, error) serializer *influx.Serializer + Log telegraf.Logger } var sampleConfig = ` @@ -171,6 +172,8 @@ func (i *InfluxDB) Connect() error { return err } + c.SetLogger(i.Log) + i.clients = append(i.clients, c) case "http", "https", "unix": c, err := i.httpClient(ctx, parts, proxy) @@ -178,6 +181,8 @@ func (i *InfluxDB) Connect() error { return err } + c.SetLogger(i.Log) + i.clients = append(i.clients, c) default: return fmt.Errorf("unsupported scheme [%q]: %q", u, parts.Scheme) @@ -221,13 +226,13 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error { if !i.SkipDatabaseCreation { err := client.CreateDatabase(ctx, apiError.Database) if err != nil { - log.Printf("E! [outputs.influxdb] when writing to [%s]: database %q not found and failed to recreate", + i.Log.Errorf("when writing to [%s]: database %q not found and failed to recreate", client.URL(), apiError.Database) } } } - log.Printf("E! [outputs.influxdb] when writing to [%s]: %v", client.URL(), err) + i.Log.Errorf("when writing to [%s]: %v", client.URL(), err) } return errors.New("could not write any address") @@ -281,7 +286,7 @@ func (i *InfluxDB) httpClient(ctx context.Context, url *url.URL, proxy *url.URL) if !i.SkipDatabaseCreation { err = c.CreateDatabase(ctx, c.Database()) if err != nil { - log.Printf("W! [outputs.influxdb] when writing to [%s]: database %q creation failed: %v", + i.Log.Warnf("when writing to [%s]: database %q creation failed: %v", c.URL(), i.Database, err) } } diff --git a/plugins/outputs/influxdb/influxdb_test.go b/plugins/outputs/influxdb/influxdb_test.go index 73f481e9a..4b86de4de 100644 --- a/plugins/outputs/influxdb/influxdb_test.go +++ b/plugins/outputs/influxdb/influxdb_test.go @@ -11,6 +11,7 @@ import ( "github.com/influxdata/telegraf/internal/tls" "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/plugins/outputs/influxdb" + "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" ) @@ -20,6 +21,8 @@ type MockClient struct { CreateDatabaseF func(ctx context.Context, database string) error DatabaseF func() string CloseF func() + + log telegraf.Logger } func (c *MockClient) URL() string { @@ -42,6 +45,10 @@ func (c *MockClient) Close() { c.CloseF() } +func (c *MockClient) SetLogger(log telegraf.Logger) { + c.log = log +} + func TestDeprecatedURLSupport(t *testing.T) { var actual *influxdb.UDPConfig output := influxdb.InfluxDB{ @@ -52,6 +59,9 @@ func TestDeprecatedURLSupport(t *testing.T) { return &MockClient{}, nil }, } + + output.Log = testutil.Logger{} + err := output.Connect() require.NoError(t, err) require.Equal(t, "udp://localhost:8089", actual.URL.String()) @@ -72,6 +82,9 @@ func TestDefaultURL(t *testing.T) { }, nil }, } + + output.Log = testutil.Logger{} + err := output.Connect() require.NoError(t, err) require.Equal(t, "http://localhost:8086", actual.URL.String()) @@ -89,6 +102,8 @@ func TestConnectUDPConfig(t *testing.T) { return &MockClient{}, nil }, } + output.Log = testutil.Logger{} + err := output.Connect() require.NoError(t, err) @@ -130,6 +145,9 @@ func TestConnectHTTPConfig(t *testing.T) { }, nil }, } + + output.Log = testutil.Logger{} + err := output.Connect() require.NoError(t, err) @@ -153,7 +171,6 @@ func TestConnectHTTPConfig(t *testing.T) { func TestWriteRecreateDatabaseIfDatabaseNotFound(t *testing.T) { output := influxdb.InfluxDB{ URLs: []string{"http://localhost:8086"}, - CreateHTTPClientF: func(config *influxdb.HTTPConfig) (influxdb.Client, error) { return &MockClient{ DatabaseF: func() string { @@ -173,12 +190,13 @@ func TestWriteRecreateDatabaseIfDatabaseNotFound(t *testing.T) { }, URLF: func() string { return "http://localhost:8086" - }, }, nil }, } + output.Log = testutil.Logger{} + err := output.Connect() require.NoError(t, err) diff --git a/plugins/outputs/influxdb/udp.go b/plugins/outputs/influxdb/udp.go index a33b98563..76fdb7862 100644 --- a/plugins/outputs/influxdb/udp.go +++ b/plugins/outputs/influxdb/udp.go @@ -5,7 +5,6 @@ import ( "bytes" "context" "fmt" - "log" "net" "net/url" @@ -32,6 +31,7 @@ type UDPConfig struct { URL *url.URL Serializer *influx.Serializer Dialer Dialer + Log telegraf.Logger } func NewUDPClient(config UDPConfig) (*udpClient, error) { @@ -69,12 +69,18 @@ type udpClient struct { dialer Dialer serializer *influx.Serializer url *url.URL + + log telegraf.Logger } func (c *udpClient) URL() string { return c.url.String() } +func (c *udpClient) SetLogger(log telegraf.Logger) { + c.log = log +} + func (c *udpClient) Database() string { return "" } @@ -93,7 +99,7 @@ func (c *udpClient) Write(ctx context.Context, metrics []telegraf.Metric) error if err != nil { // Since we are serializing multiple metrics, don't fail the // entire batch just because of one unserializable metric. - log.Printf("E! [outputs.influxdb] when writing to [%s] could not serialize metric: %v", + c.log.Errorf("when writing to [%s] could not serialize metric: %v", c.URL(), err) continue } diff --git a/plugins/outputs/influxdb/udp_test.go b/plugins/outputs/influxdb/udp_test.go index 136ebb787..61c64ff77 100644 --- a/plugins/outputs/influxdb/udp_test.go +++ b/plugins/outputs/influxdb/udp_test.go @@ -14,6 +14,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/plugins/outputs/influxdb" + "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" ) @@ -78,6 +79,7 @@ func TestUDP_URL(t *testing.T) { } client, err := influxdb.NewUDPClient(config) + client.SetLogger(testutil.Logger{}) require.NoError(t, err) require.Equal(t, u.String(), client.URL()) @@ -101,6 +103,7 @@ func TestUDP_Simple(t *testing.T) { }, } client, err := influxdb.NewUDPClient(config) + client.SetLogger(testutil.Logger{}) require.NoError(t, err) ctx := context.Background() @@ -127,6 +130,7 @@ func TestUDP_DialError(t *testing.T) { }, } client, err := influxdb.NewUDPClient(config) + client.SetLogger(testutil.Logger{}) require.NoError(t, err) ctx := context.Background() @@ -156,6 +160,7 @@ func TestUDP_WriteError(t *testing.T) { }, } client, err := influxdb.NewUDPClient(config) + client.SetLogger(testutil.Logger{}) require.NoError(t, err) ctx := context.Background() @@ -219,6 +224,7 @@ func TestUDP_ErrorLogging(t *testing.T) { log.SetOutput(&b) client, err := influxdb.NewUDPClient(tt.config) + client.SetLogger(testutil.Logger{}) require.NoError(t, err) ctx := context.Background() @@ -262,6 +268,7 @@ func TestUDP_WriteWithRealConn(t *testing.T) { URL: u, } client, err := influxdb.NewUDPClient(config) + client.SetLogger(testutil.Logger{}) require.NoError(t, err) ctx := context.Background() diff --git a/testutil/log.go b/testutil/log.go new file mode 100644 index 000000000..5e0458dc7 --- /dev/null +++ b/testutil/log.go @@ -0,0 +1,50 @@ +package testutil + +import ( + "log" +) + +// Logger defines a logging structure for plugins. +type Logger struct { + Name string // Name is the plugin name, will be printed in the `[]`. +} + +// Errorf logs an error message, patterned after log.Printf. +func (l Logger) Errorf(format string, args ...interface{}) { + log.Printf("E! ["+l.Name+"] "+format, args...) +} + +// Error logs an error message, patterned after log.Print. +func (l Logger) Error(args ...interface{}) { + log.Print(append([]interface{}{"E! [" + l.Name + "] "}, args...)...) +} + +// Debugf logs a debug message, patterned after log.Printf. +func (l Logger) Debugf(format string, args ...interface{}) { + log.Printf("D! ["+l.Name+"] "+format, args...) +} + +// Debug logs a debug message, patterned after log.Print. +func (l Logger) Debug(args ...interface{}) { + log.Print(append([]interface{}{"D! [" + l.Name + "] "}, args...)...) +} + +// Warnf logs a warning message, patterned after log.Printf. +func (l Logger) Warnf(format string, args ...interface{}) { + log.Printf("W! ["+l.Name+"] "+format, args...) +} + +// Warn logs a warning message, patterned after log.Print. +func (l Logger) Warn(args ...interface{}) { + log.Print(append([]interface{}{"W! [" + l.Name + "] "}, args...)...) +} + +// Infof logs an information message, patterned after log.Printf. +func (l Logger) Infof(format string, args ...interface{}) { + log.Printf("I! ["+l.Name+"] "+format, args...) +} + +// Info logs an information message, patterned after log.Print. +func (l Logger) Info(args ...interface{}) { + log.Print(append([]interface{}{"I! [" + l.Name + "] "}, args...)...) +}