From 741ea839d2cd014d91c18412bd5f6d2d3e25f4ec Mon Sep 17 00:00:00 2001
From: Steven Soroka
Date: Fri, 5 Jun 2020 10:43:43 -0400
Subject: [PATCH] add support for streaming processors (#7634)

---
 agent/agent.go                           | 1158 +++++++++++++---
 aggregator.go                            |    6 +-
 config/config.go                         |  109 +-
 config/config_test.go                    |    8 +-
 input.go                                 |    6 +-
 models/running_processor.go              |   74 +-
 models/running_processor_test.go         |   20 +-
 output.go                                |    6 +-
 plugin.go                                |   10 +
 plugins/processors/registry.go           |   17 +-
 plugins/processors/streamingprocessor.go |   49 +
 processor.go                             |   29 +-
 12 files changed, 913 insertions(+), 579 deletions(-)
 create mode 100644 plugins/processors/streamingprocessor.go

diff --git a/agent/agent.go b/agent/agent.go
index 5795eb0d4..72e906a59 100644
--- a/agent/agent.go
+++ b/agent/agent.go
@@ -6,6 +6,7 @@ import (
     "log"
     "os"
     "runtime"
+    "sort"
     "sync"
     "time"
 
@@ -29,6 +30,70 @@ func NewAgent(config *config.Config) (*Agent, error) {
     return a, nil
 }
 
+// inputUnit is a group of input plugins and the shared channel they write to.
+//
+// ┌───────┐
+// │ Input │───┐
+// └───────┘   │
+// ┌───────┐   │      ______
+// │ Input │───┼──▶ ()_____)
+// └───────┘   │
+// ┌───────┐   │
+// │ Input │───┘
+// └───────┘
+type inputUnit struct {
+    dst    chan<- telegraf.Metric
+    inputs []*models.RunningInput
+}
+
+//  ______     ┌───────────┐      ______
+// ()_____)──▶ │ Processor │──▶ ()_____)
+//             └───────────┘
+type processorUnit struct {
+    src       <-chan telegraf.Metric
+    dst       chan<- telegraf.Metric
+    processor *models.RunningProcessor
+}
+
+// aggregatorUnit is a group of Aggregators and their source and sink channels.
+// Typically the aggregators write to a processor channel and pass the original
+// metrics to the output channel. The sink channels may be the same channel.
+//
+//                 ┌────────────┐
+//            ┌──▶ │ Aggregator │───┐
+//            │    └────────────┘   │
+//  ______    │    ┌────────────┐   │      ______
+// ()_____)───┼──▶ │ Aggregator │───┼──▶ ()_____)
+//            │    └────────────┘   │
+//            │    ┌────────────┐   │
+//            ├──▶ │ Aggregator │───┘
+//            │    └────────────┘
+//            │                            ______
+//            └────────────────────────▶ ()_____)
+type aggregatorUnit struct {
+    src         <-chan telegraf.Metric
+    aggC        chan<- telegraf.Metric
+    outputC     chan<- telegraf.Metric
+    aggregators []*models.RunningAggregator
+}
+
+// outputUnit is a group of Outputs and their source channel. Metrics on the
+// channel are written to all outputs.
+//
+//                            ┌────────┐
+//                       ┌──▶ │ Output │
+//                       │    └────────┘
+//  ______     ┌─────┐   │    ┌────────┐
+// ()_____)──▶ │ Fan │───┼──▶ │ Output │
+//             └─────┘   │    └────────┘
+//                       │    ┌────────┐
+//                       └──▶ │ Output │
+//                            └────────┘
+type outputUnit struct {
+    src     <-chan telegraf.Metric
+    outputs []*models.RunningOutput
+}
+
 // Run starts and runs the Agent until the context is done.
 func (a *Agent) Run(ctx context.Context) error {
     log.Printf("I! [agent] Config: Interval:%s, Quiet:%#v, Hostname:%#v, "+
@@ -36,298 +101,150 @@ func (a *Agent) Run(ctx context.Context) error {
         a.Config.Agent.Interval.Duration, a.Config.Agent.Quiet,
         a.Config.Agent.Hostname, a.Config.Agent.FlushInterval.Duration)
 
-    if ctx.Err() != nil {
-        return ctx.Err()
-    }
-
     log.Printf("D! [agent] Initializing plugins")
     err := a.initPlugins()
     if err != nil {
         return err
     }
 
+    startTime := time.Now()
+
+    log.Printf("D! [agent] Connecting outputs")
-    err = a.connectOutputs(ctx)
+    next, ou, err := a.startOutputs(ctx, a.Config.Outputs)
     if err != nil {
         return err
     }
 
-    inputC := make(chan telegraf.Metric, 100)
-    procC := make(chan telegraf.Metric, 100)
-    outputC := make(chan telegraf.Metric, 100)
+    var apu []*processorUnit
+    var au *aggregatorUnit
+    if len(a.Config.Aggregators) != 0 {
+        aggC := next
+        if len(a.Config.AggProcessors) != 0 {
+            aggC, apu, err = a.startProcessors(next, a.Config.AggProcessors)
+            if err != nil {
+                return err
+            }
+        }
 
-    startTime := time.Now()
+        next, au, err = a.startAggregators(aggC, next, a.Config.Aggregators)
+        if err != nil {
+            return err
+        }
+    }
 
-    log.Printf("D! [agent] Starting service inputs")
-    err = a.startServiceInputs(ctx, inputC)
+    var pu []*processorUnit
+    if len(a.Config.Processors) != 0 {
+        next, pu, err = a.startProcessors(next, a.Config.Processors)
+        if err != nil {
+            return err
+        }
+    }
+
+    iu, err := a.startInputs(next, a.Config.Inputs)
     if err != nil {
         return err
     }
 
     var wg sync.WaitGroup
-
-    src := inputC
-    dst := inputC
-
     wg.Add(1)
-    go func(dst chan telegraf.Metric) {
+    go func() {
         defer wg.Done()
-
-        err := a.runInputs(ctx, startTime, dst)
-        if err != nil {
-            log.Printf("E! [agent] Error running inputs: %v", err)
-        }
-
-        log.Printf("D! [agent] Stopping service inputs")
-        a.stopServiceInputs()
-
-        close(dst)
-        log.Printf("D! [agent] Input channel closed")
-    }(dst)
-
-    src = dst
-
-    if len(a.Config.Processors) > 0 {
-        dst = procC
-
-        wg.Add(1)
-        go func(src, dst chan telegraf.Metric) {
-            defer wg.Done()
-
-            err := a.runProcessors(src, dst)
-            if err != nil {
-                log.Printf("E! [agent] Error running processors: %v", err)
-            }
-            close(dst)
-            log.Printf("D! [agent] Processor channel closed")
-        }(src, dst)
-
-        src = dst
-    }
-
-    if len(a.Config.Aggregators) > 0 {
-        dst = outputC
-
-        wg.Add(1)
-        go func(src, dst chan telegraf.Metric) {
-            defer wg.Done()
-
-            err := a.runAggregators(startTime, src, dst)
-            if err != nil {
-                log.Printf("E! [agent] Error running aggregators: %v", err)
-            }
-            close(dst)
-            log.Printf("D! [agent] Output channel closed")
-        }(src, dst)
-
-        src = dst
-    }
-
-    wg.Add(1)
-    go func(src chan telegraf.Metric) {
-        defer wg.Done()
-
-        err := a.runOutputs(startTime, src)
+        err = a.runOutputs(ou)
         if err != nil {
             log.Printf("E! [agent] Error running outputs: %v", err)
         }
-    }(src)
-
-    wg.Wait()
-
-    log.Printf("D! [agent] Closing outputs")
-    a.closeOutputs()
-
-    log.Printf("D! [agent] Stopped Successfully")
-    return nil
-}
-
-// Test runs the inputs, processors and aggregators for a single gather and
-// writes the metrics to stdout.
-func (a *Agent) Test(ctx context.Context, wait time.Duration) error {
-    outputF := func(src <-chan telegraf.Metric) {
-        s := influx.NewSerializer()
-        s.SetFieldSortOrder(influx.SortFields)
-
-        for metric := range src {
-            octets, err := s.Serialize(metric)
-            if err == nil {
-                fmt.Print("> ", string(octets))
-            }
-            metric.Reject()
-        }
-    }
-
-    err := a.test(ctx, wait, outputF)
-    if err != nil {
-        return err
-    }
-
-    if models.GlobalGatherErrors.Get() != 0 {
-        return fmt.Errorf("input plugins recorded %d errors", models.GlobalGatherErrors.Get())
-    }
-    return nil
-
-}
-
-// Once runs the full agent for a single gather.
-func (a *Agent) Once(ctx context.Context, wait time.Duration) error {
-    outputF := func(src <-chan telegraf.Metric) {
-        interval := a.Config.Agent.FlushInterval.Duration
-
-        ctx, cancel := context.WithCancel(context.Background())
-
-        var wg sync.WaitGroup
-        for _, output := range a.Config.Outputs {
-            interval := interval
-            // Overwrite agent flush_interval if this plugin has its own.
-            if output.Config.FlushInterval != 0 {
-                interval = output.Config.FlushInterval
-            }
-
-            jitter := 0 * time.Second
-
-            ticker := NewRollingTicker(interval, jitter)
-            defer ticker.Stop()
-
-            wg.Add(1)
-            go func(output *models.RunningOutput) {
-                defer wg.Done()
-                a.flushLoop(ctx, output, ticker)
-            }(output)
-        }
-
-        for metric := range src {
-            for i, output := range a.Config.Outputs {
-                if i == len(a.Config.Outputs)-1 {
-                    output.AddMetric(metric)
-                } else {
-                    output.AddMetric(metric.Copy())
-                }
-            }
-        }
-
-        cancel()
-        wg.Wait()
-    }
-
-    err := a.test(ctx, wait, outputF)
-    if err != nil {
-        return err
-    }
-
-    if models.GlobalGatherErrors.Get() != 0 {
-        return fmt.Errorf("input plugins recorded %d errors", models.GlobalGatherErrors.Get())
-    }
-
-    unsent := 0
-    for _, output := range a.Config.Outputs {
-        unsent += output.BufferLength()
-    }
-    if unsent != 0 {
-        return fmt.Errorf("output plugins unable to send %d metrics", unsent)
-    }
-    return nil
-}
-
-// Test runs the agent and performs a single gather sending output to the
-// outputF. After gathering pauses for the wait duration to allow service
-// inputs to run.
-func (a *Agent) test(ctx context.Context, wait time.Duration, outputF func(<-chan telegraf.Metric)) error {
-    log.Printf("D! [agent] Initializing plugins")
-    err := a.initPlugins()
-    if err != nil {
-        return err
-    }
-
-    log.Printf("D! [agent] Connecting outputs")
-    err = a.connectOutputs(ctx)
-    if err != nil {
-        return err
-    }
-
-    inputC := make(chan telegraf.Metric, 100)
-    procC := make(chan telegraf.Metric, 100)
-    outputC := make(chan telegraf.Metric, 100)
-
-    startTime := time.Now()
-
-    var wg sync.WaitGroup
-
-    src := inputC
-    dst := inputC
-
-    wg.Add(1)
-    go func(dst chan telegraf.Metric) {
-        defer wg.Done()
-
-        a.testRunInputs(ctx, wait, dst)
-
-        close(dst)
-        log.Printf("D! [agent] Input channel closed")
-    }(dst)
-
-    src = dst
-
-    if len(a.Config.Processors) > 0 {
-        dst = procC
+    }()
 
+    if au != nil {
         wg.Add(1)
-        go func(src, dst chan telegraf.Metric) {
+        go func() {
             defer wg.Done()
-
-            err := a.runProcessors(src, dst)
+            err = a.runProcessors(apu)
             if err != nil {
                 log.Printf("E! [agent] Error running processors: %v", err)
             }
-            close(dst)
-            log.Printf("D! [agent] Processor channel closed")
-        }(src, dst)
-
-        src = dst
-    }
-
-    if len(a.Config.Aggregators) > 0 {
-        dst = outputC
+        }()
 
         wg.Add(1)
-        go func(src, dst chan telegraf.Metric) {
+        go func() {
             defer wg.Done()
-
-            err := a.runAggregators(startTime, src, dst)
+            err = a.runAggregators(startTime, au)
             if err != nil {
                 log.Printf("E! [agent] Error running aggregators: %v", err)
             }
-            close(dst)
-            log.Printf("D! [agent] Output channel closed")
-        }(src, dst)
+        }()
+    }
 
-        src = dst
+    if pu != nil {
+        wg.Add(1)
+        go func() {
+            defer wg.Done()
+            err = a.runProcessors(pu)
+            if err != nil {
+                log.Printf("E! [agent] Error running processors: %v", err)
+            }
+        }()
     }
 
     wg.Add(1)
-    go func(src <-chan telegraf.Metric) {
+    go func() {
         defer wg.Done()
-        outputF(src)
-    }(src)
+        err = a.runInputs(ctx, startTime, iu)
+        if err != nil {
+            log.Printf("E! [agent] Error running inputs: %v", err)
+        }
+    }()
 
     wg.Wait()
 
-    log.Printf("D! [agent] Closing outputs")
-    a.closeOutputs()
-
-    log.Printf("D! [agent] Stopped Successfully")
+    log.Printf("D! [agent] Stopped Successfully")
+
+    return err
+}
 
+// initPlugins runs the Init function on plugins.
+func (a *Agent) initPlugins() error {
+    for _, input := range a.Config.Inputs {
+        err := input.Init()
+        if err != nil {
+            return fmt.Errorf("could not initialize input %s: %v",
+                input.LogName(), err)
+        }
+    }
+    for _, processor := range a.Config.Processors {
+        err := processor.Init()
+        if err != nil {
+            return fmt.Errorf("could not initialize processor %s: %v",
+                processor.Config.Name, err)
+        }
+    }
+    for _, aggregator := range a.Config.Aggregators {
+        err := aggregator.Init()
+        if err != nil {
+            return fmt.Errorf("could not initialize aggregator %s: %v",
+                aggregator.Config.Name, err)
+        }
+    }
+    for _, output := range a.Config.Outputs {
+        err := output.Init()
+        if err != nil {
+            return fmt.Errorf("could not initialize output %s: %v",
+                output.Config.Name, err)
+        }
+    }
     return nil
 }
 
-func (a *Agent) testRunInputs(
-    ctx context.Context,
-    wait time.Duration,
+func (a *Agent) startInputs(
     dst chan<- telegraf.Metric,
-) {
+    inputs []*models.RunningInput,
+) (*inputUnit, error) {
     log.Printf("D! [agent] Starting service inputs")
-    for _, input := range a.Config.Inputs {
+
+    unit := &inputUnit{
+        dst: dst,
+    }
+
+    for _, input := range inputs {
         if si, ok := input.Input.(telegraf.ServiceInput); ok {
             // Service input plugins are not subject to timestamp rounding.
             // This only applies to the accumulator passed to Start(), the
@@ -338,21 +255,115 @@ func (a *Agent) testRunInputs(
 
             err := si.Start(acc)
             if err != nil {
-                acc.AddError(err)
-                si.Stop()
-                continue
+                stopServiceInputs(unit.inputs)
+                return nil, fmt.Errorf("starting input %s: %w", input.LogName(), err)
             }
         }
+        unit.inputs = append(unit.inputs, input)
     }
+
+    return unit, nil
+}
+
+// runInputs starts and triggers the periodic gather for Inputs.
+//
+// When the context is done the timers are stopped and this function returns
+// after all ongoing Gather calls complete.
+func (a *Agent) runInputs(
+    ctx context.Context,
+    startTime time.Time,
+    unit *inputUnit,
+) error {
+    var wg sync.WaitGroup
+    for _, input := range unit.inputs {
+        interval := a.Config.Agent.Interval.Duration
+        jitter := a.Config.Agent.CollectionJitter.Duration
+
+        // Overwrite agent interval if this plugin has its own.
+        if input.Config.Interval != 0 {
+            interval = input.Config.Interval
+        }
+
+        var ticker Ticker
+        if a.Config.Agent.RoundInterval {
+            ticker = NewAlignedTicker(startTime, interval, jitter)
+        } else {
+            ticker = NewUnalignedTicker(interval, jitter)
+        }
+        defer ticker.Stop()
+
+        acc := NewAccumulator(input, unit.dst)
+        acc.SetPrecision(a.Precision())
+
+        wg.Add(1)
+        go func(input *models.RunningInput) {
+            defer wg.Done()
+            a.gatherLoop(ctx, acc, input, ticker)
+        }(input)
+    }
+
+    wg.Wait()
+
+    log.Printf("D! [agent] Stopping service inputs")
+    stopServiceInputs(unit.inputs)
+
+    close(unit.dst)
+    log.Printf("D! [agent] Input channel closed")
+
+    return nil
+}
+
+// testStartInputs is a variation of startInputs for use in --test and --once
+// mode. It differs by logging Start errors and continuing with the remaining
+// inputs instead of aborting the run.
+func (a *Agent) testStartInputs(
+    dst chan<- telegraf.Metric,
+    inputs []*models.RunningInput,
+) (*inputUnit, error) {
+    log.Printf("D! [agent] Starting service inputs")
+
+    unit := &inputUnit{
+        dst: dst,
+    }
+
+    for _, input := range inputs {
+        if si, ok := input.Input.(telegraf.ServiceInput); ok {
+            // Service input plugins are not subject to timestamp rounding.
+            // This only applies to the accumulator passed to Start(), the
+            // Gather() accumulator does apply rounding according to the
+            // precision agent setting.
+            acc := NewAccumulator(input, dst)
+            acc.SetPrecision(time.Nanosecond)
+
+            err := si.Start(acc)
+            if err != nil {
+                log.Printf("E! [agent] Starting input %s: %v", input.LogName(), err)
+            }
+
+        }
+
+        unit.inputs = append(unit.inputs, input)
+    }
+
+    return unit, nil
+}
+
+// testRunInputs is a variation of runInputs for use in --test and --once mode.
+// Instead of using a ticker to run the inputs they are called once immediately.
+func (a *Agent) testRunInputs(
+    ctx context.Context,
+    wait time.Duration,
+    unit *inputUnit,
+) error {
+    var wg sync.WaitGroup
+
     nul := make(chan telegraf.Metric)
     go func() {
         for range nul {
         }
     }()
 
-    var wg sync.WaitGroup
-    for _, input := range a.Config.Inputs {
+    for _, input := range unit.inputs {
         wg.Add(1)
         go func(input *models.RunningInput) {
             defer wg.Done()
@@ -370,7 +381,7 @@ func (a *Agent) testRunInputs(
                 time.Sleep(500 * time.Millisecond)
             }
 
-            acc := NewAccumulator(input, dst)
+            acc := NewAccumulator(input, unit.dst)
             acc.SetPrecision(a.Precision())
 
             if err := input.Input.Gather(acc); err != nil {
@@ -379,54 +390,24 @@ func (a *Agent) testRunInputs(
         }(input)
     }
     wg.Wait()
-
     close(nul)
 
     internal.SleepContext(ctx, wait)
 
     log.Printf("D! [agent] Stopping service inputs")
-    a.stopServiceInputs()
+    stopServiceInputs(unit.inputs)
+
+    close(unit.dst)
+    log.Printf("D! [agent] Input channel closed")
+
+    return nil
 }
 
-// runInputs starts and triggers the periodic gather for Inputs.
-//
-// When the context is done the timers are stopped and this function returns
-// after all ongoing Gather calls complete.
-func (a *Agent) runInputs(
-    ctx context.Context,
-    startTime time.Time,
-    dst chan<- telegraf.Metric,
-) error {
-    var wg sync.WaitGroup
-    for _, input := range a.Config.Inputs {
-        interval := a.Config.Agent.Interval.Duration
-        jitter := a.Config.Agent.CollectionJitter.Duration
-
-        // Overwrite agent interval if this plugin has its own.
-        if input.Config.Interval != 0 {
-            interval = input.Config.Interval
+// stopServiceInputs stops all service inputs.
+func stopServiceInputs(inputs []*models.RunningInput) {
+    for _, input := range inputs {
+        if si, ok := input.Input.(telegraf.ServiceInput); ok {
+            si.Stop()
         }
-
-        var ticker Ticker
-        if a.Config.Agent.RoundInterval {
-            ticker = NewAlignedTicker(startTime, interval, jitter)
-        } else {
-            ticker = NewUnalignedTicker(interval, jitter)
-        }
-        defer ticker.Stop()
-
-        acc := NewAccumulator(input, dst)
-        acc.SetPrecision(a.Precision())
-
-        wg.Add(1)
-        go func(input *models.RunningInput) {
-            defer wg.Done()
-            a.gatherLoop(ctx, acc, input, ticker)
-        }(input)
     }
-
-    wg.Wait()
-    return nil
 }
 
 // gather runs an input's gather function periodically until the context is
@@ -475,30 +456,142 @@ func (a *Agent) gatherOnce(
     }
 }
 
-// runProcessors applies processors to metrics.
-func (a *Agent) runProcessors(
-    src <-chan telegraf.Metric,
-    agg chan<- telegraf.Metric,
-) error {
-    for metric := range src {
-        metrics := a.applyProcessors(metric)
+// startProcessors sets up the processor chain and calls Start on all
+// processors. If an error occurs any started processors are Stopped.
+func (a *Agent) startProcessors(
+    dst chan<- telegraf.Metric,
+    processors models.RunningProcessors,
+) (chan<- telegraf.Metric, []*processorUnit, error) {
+    var units []*processorUnit
 
-        for _, metric := range metrics {
-            agg <- metric
+    // Sort from last to first
+    sort.SliceStable(processors, func(i, j int) bool {
+        return processors[i].Config.Order > processors[j].Config.Order
+    })
+
+    var src chan telegraf.Metric
+    for _, processor := range processors {
+        src = make(chan telegraf.Metric, 100)
+        acc := NewAccumulator(processor, dst)
+
+        err := processor.Start(acc)
+        if err != nil {
+            for _, u := range units {
+                u.processor.Stop()
+                close(u.dst)
+            }
+            return nil, nil, fmt.Errorf("starting processor %s: %w", processor.LogName(), err)
         }
+
+        units = append(units, &processorUnit{
+            src:       src,
+            dst:       dst,
+            processor: processor,
+        })
+
+        dst = src
     }
 
+    return src, units, nil
+}
+
+// runProcessors begins processing metrics and runs until the source channel is
+// closed and all metrics have been written.
+func (a *Agent) runProcessors(
+    units []*processorUnit,
+) error {
+    var wg sync.WaitGroup
+    for _, unit := range units {
+        wg.Add(1)
+        go func(unit *processorUnit) {
+            defer wg.Done()
+
+            acc := NewAccumulator(unit.processor, unit.dst)
+            for m := range unit.src {
+                unit.processor.Add(m, acc)
+            }
+            unit.processor.Stop()
+            close(unit.dst)
+            log.Printf("D! [agent] Processor channel closed")
+        }(unit)
+    }
+    wg.Wait()
 
     return nil
 }
 
-// applyProcessors applies all processors to a metric.
-func (a *Agent) applyProcessors(m telegraf.Metric) []telegraf.Metric {
-    metrics := []telegraf.Metric{m}
-    for _, processor := range a.Config.Processors {
-        metrics = processor.Apply(metrics...)
+// startAggregators sets up the aggregator unit and returns the source channel.
+func (a *Agent) startAggregators(
+    aggC chan<- telegraf.Metric,
+    outputC chan<- telegraf.Metric,
+    aggregators []*models.RunningAggregator,
+) (chan<- telegraf.Metric, *aggregatorUnit, error) {
+    src := make(chan telegraf.Metric, 100)
+    unit := &aggregatorUnit{
+        src:         src,
+        aggC:        aggC,
+        outputC:     outputC,
+        aggregators: aggregators,
+    }
+    return src, unit, nil
+}
+
+// runAggregators begins aggregating metrics and runs until the source channel
+// is closed and all metrics have been written.
+func (a *Agent) runAggregators(
+    startTime time.Time,
+    unit *aggregatorUnit,
+) error {
+    ctx, cancel := context.WithCancel(context.Background())
+
+    // Before calling Add, initialize the aggregation window. This ensures
+    // that any metric created after start time will be aggregated.
+    for _, agg := range a.Config.Aggregators {
+        since, until := updateWindow(startTime, a.Config.Agent.RoundInterval, agg.Period())
+        agg.UpdateWindow(since, until)
     }
 
-    return metrics
+    var wg sync.WaitGroup
+    wg.Add(1)
+    go func() {
+        defer wg.Done()
+        for metric := range unit.src {
+            var dropOriginal bool
+            for _, agg := range a.Config.Aggregators {
+                if ok := agg.Add(metric); ok {
+                    dropOriginal = true
+                }
+            }
+
+            if !dropOriginal {
+                unit.outputC <- metric // keep original.
+            } else {
+                metric.Drop()
+            }
+        }
+        cancel()
+    }()
+
+    for _, agg := range a.Config.Aggregators {
+        wg.Add(1)
+        go func(agg *models.RunningAggregator) {
+            defer wg.Done()
+
+            acc := NewAccumulator(agg, unit.aggC)
+            acc.SetPrecision(a.Precision())
+            a.push(ctx, agg, acc)
+        }(agg)
+    }
+
+    wg.Wait()
+
+    // In the case that there are no processors, both aggC and outputC are the
+    // same channel. If there are processors, we close the aggC and the
+    // processor chain will close the outputC when it finishes processing.
+    close(unit.aggC)
+    log.Printf("D! [agent] Aggregator channel closed")
+
+    return nil
 }
 
 func updateWindow(start time.Time, roundInterval bool, period time.Duration) (time.Time, time.Time) {
@@ -517,78 +610,6 @@ func updateWindow(start time.Time, roundInterval bool, period time.Duration) (ti
     return since, until
 }
 
-// runAggregators adds metrics to the aggregators and triggers their periodic
-// push call.
-//
-// Runs until src is closed and all metrics have been processed. Will call
-// push one final time before returning.
-func (a *Agent) runAggregators(
-    startTime time.Time,
-    src <-chan telegraf.Metric,
-    dst chan<- telegraf.Metric,
-) error {
-    ctx, cancel := context.WithCancel(context.Background())
-
-    // Before calling Add, initialize the aggregation window. This ensures
-    // that any metric created after start time will be aggregated.
-    for _, agg := range a.Config.Aggregators {
-        since, until := updateWindow(startTime, a.Config.Agent.RoundInterval, agg.Period())
-        agg.UpdateWindow(since, until)
-    }
-
-    var wg sync.WaitGroup
-    wg.Add(1)
-    go func() {
-        defer wg.Done()
-        for metric := range src {
-            var dropOriginal bool
-            for _, agg := range a.Config.Aggregators {
-                if ok := agg.Add(metric); ok {
-                    dropOriginal = true
-                }
-            }
-
-            if !dropOriginal {
-                dst <- metric
-            } else {
-                metric.Drop()
-            }
-        }
-        cancel()
-    }()
-
-    aggregations := make(chan telegraf.Metric, 100)
-    wg.Add(1)
-    go func() {
-        defer wg.Done()
-
-        var aggWg sync.WaitGroup
-        for _, agg := range a.Config.Aggregators {
-            aggWg.Add(1)
-            go func(agg *models.RunningAggregator) {
-                defer aggWg.Done()
-
-                acc := NewAccumulator(agg, aggregations)
-                acc.SetPrecision(a.Precision())
-                a.push(ctx, agg, acc)
-            }(agg)
-        }
-
-        aggWg.Wait()
-        close(aggregations)
-    }()
-
-    for metric := range aggregations {
-        metrics := a.applyProcessors(metric)
-        for _, metric := range metrics {
-            dst <- metric
-        }
-    }
-
-    wg.Wait()
-    return nil
-}
-
 // push runs the push for a single aggregator every period.
 func (a *Agent) push(
     ctx context.Context,
@@ -613,22 +634,66 @@ func (a *Agent) push(
     }
 }
 
-// runOutputs triggers the periodic write for Outputs.
-//
+// startOutputs calls Connect on all outputs and returns the source channel.
+// If an error occurs calling Connect all started plugins have Close called.
+func (a *Agent) startOutputs(
+    ctx context.Context,
+    outputs []*models.RunningOutput,
+) (chan<- telegraf.Metric, *outputUnit, error) {
+    src := make(chan telegraf.Metric, 100)
+    unit := &outputUnit{src: src}
+    for _, output := range outputs {
+        err := a.connectOutput(ctx, output)
+        if err != nil {
+            for _, output := range unit.outputs {
+                output.Close()
+            }
+            return nil, nil, fmt.Errorf("connecting output %s: %w", output.LogName(), err)
+        }
 
-// Runs until src is closed and all metrics have been processed. Will call
-// Write one final time before returning.
+        unit.outputs = append(unit.outputs, output)
+    }
+
+    return src, unit, nil
+}
+
+// connectOutput connects a single output, retrying the connection once after
+// 15 seconds if the first attempt fails.
+func (a *Agent) connectOutput(ctx context.Context, output *models.RunningOutput) error {
+    log.Printf("D! [agent] Attempting connection to [%s]", output.LogName())
+    err := output.Output.Connect()
+    if err != nil {
+        log.Printf("E! [agent] Failed to connect to [%s], retrying in 15s, "+
+            "error was '%s'", output.LogName(), err)
+
+        err := internal.SleepContext(ctx, 15*time.Second)
+        if err != nil {
+            return err
+        }
+
+        err = output.Output.Connect()
+        if err != nil {
+            return fmt.Errorf("Error connecting to output %q: %w", output.LogName(), err)
+        }
+    }
+    log.Printf("D! [agent] Successfully connected to %s", output.LogName())
+    return nil
+}
+
+// runOutputs begins processing metrics and runs until the source channel is
+// closed and all metrics have been written. On shutdown metrics will be
+// written one last time and dropped if unsuccessful.
 func (a *Agent) runOutputs(
-    startTime time.Time,
-    src <-chan telegraf.Metric,
+    unit *outputUnit,
 ) error {
+    var wg sync.WaitGroup
+
+    // Start flush loop
     interval := a.Config.Agent.FlushInterval.Duration
     jitter := a.Config.Agent.FlushJitter.Duration
 
     ctx, cancel := context.WithCancel(context.Background())
 
-    var wg sync.WaitGroup
-    for _, output := range a.Config.Outputs {
+    for _, output := range unit.outputs {
         interval := interval
         // Overwrite agent flush_interval if this plugin has its own.
         if output.Config.FlushInterval != 0 {
@@ -641,18 +706,19 @@ func (a *Agent) runOutputs(
             jitter = *output.Config.FlushJitter
         }
 
-        ticker := NewRollingTicker(interval, jitter)
-        defer ticker.Stop()
-
         wg.Add(1)
         go func(output *models.RunningOutput) {
             defer wg.Done()
+
+            ticker := NewRollingTicker(interval, jitter)
+            defer ticker.Stop()
+
             a.flushLoop(ctx, output, ticker)
         }(output)
     }
 
-    for metric := range src {
-        for i, output := range a.Config.Outputs {
+    for metric := range unit.src {
+        for i, output := range unit.outputs {
             if i == len(a.Config.Outputs)-1 {
                 output.AddMetric(metric)
             } else {
                 output.AddMetric(metric.Copy())
             }
         }
     }
 
@@ -738,115 +804,259 @@ func (a *Agent) flushOnce(
             output.LogBufferStatus()
         }
     }
-
 }
 
-// initPlugins runs the Init function on plugins.
-func (a *Agent) initPlugins() error {
-    for _, input := range a.Config.Inputs {
-        err := input.Init()
-        if err != nil {
-            return fmt.Errorf("could not initialize input %s: %v",
-                input.LogName(), err)
+
+// Test runs the inputs, processors and aggregators for a single gather and
+// writes the metrics to stdout.
+func (a *Agent) Test(ctx context.Context, wait time.Duration) error {
+    src := make(chan telegraf.Metric, 100)
+
+    var wg sync.WaitGroup
+    wg.Add(1)
+    go func() {
+        defer wg.Done()
+        s := influx.NewSerializer()
+        s.SetFieldSortOrder(influx.SortFields)
+
+        for metric := range src {
+            octets, err := s.Serialize(metric)
+            if err == nil {
+                fmt.Print("> ", string(octets))
+            }
+            metric.Reject()
         }
+    }()
+
+    err := a.test(ctx, wait, src)
+    if err != nil {
+        return err
     }
-    for _, processor := range a.Config.Processors {
-        err := processor.Init()
-        if err != nil {
-            return fmt.Errorf("could not initialize processor %s: %v",
-                processor.Config.Name, err)
-        }
-    }
-    for _, aggregator := range a.Config.Aggregators {
-        err := aggregator.Init()
-        if err != nil {
-            return fmt.Errorf("could not initialize aggregator %s: %v",
-                aggregator.Config.Name, err)
-        }
-    }
-    for _, output := range a.Config.Outputs {
-        err := output.Init()
-        if err != nil {
-            return fmt.Errorf("could not initialize output %s: %v",
-                output.Config.Name, err)
-        }
+
+    wg.Wait()
+
+    if models.GlobalGatherErrors.Get() != 0 {
+        return fmt.Errorf("input plugins recorded %d errors", models.GlobalGatherErrors.Get())
     }
     return nil
 }
 
-// connectOutputs connects to all outputs.
-func (a *Agent) connectOutputs(ctx context.Context) error {
-    for _, output := range a.Config.Outputs {
-        log.Printf("D! [agent] Attempting connection to [%s]", output.LogName())
-        err := output.Output.Connect()
+// test runs the agent and performs a single gather, sending output to
+// outputC. After gathering it pauses for the wait duration to allow service
+// inputs to run.
+func (a *Agent) test(ctx context.Context, wait time.Duration, outputC chan<- telegraf.Metric) error {
+    log.Printf("D! [agent] Initializing plugins")
+    err := a.initPlugins()
+    if err != nil {
+        return err
+    }
+
+    startTime := time.Now()
+
+    next := outputC
+
+    var apu []*processorUnit
+    var au *aggregatorUnit
+    if len(a.Config.Aggregators) != 0 {
+        procC := next
+        if len(a.Config.AggProcessors) != 0 {
+            procC, apu, err = a.startProcessors(next, a.Config.AggProcessors)
+            if err != nil {
+                return err
+            }
+        }
+
+        next, au, err = a.startAggregators(procC, next, a.Config.Aggregators)
         if err != nil {
-            log.Printf("E! [agent] Failed to connect to [%s], retrying in 15s, "+
-                "error was '%s'", output.LogName(), err)
-
-            err := internal.SleepContext(ctx, 15*time.Second)
-            if err != nil {
-                return err
-            }
-
-            err = output.Output.Connect()
-            if err != nil {
-                return err
-            }
+            return err
         }
-        log.Printf("D! [agent] Successfully connected to %s", output.LogName())
     }
+
+    var pu []*processorUnit
+    if len(a.Config.Processors) != 0 {
+        next, pu, err = a.startProcessors(next, a.Config.Processors)
+        if err != nil {
+            return err
+        }
+    }
+
+    iu, err := a.testStartInputs(next, a.Config.Inputs)
+    if err != nil {
+        return err
+    }
+
+    var wg sync.WaitGroup
+
+    if au != nil {
+        wg.Add(1)
+        go func() {
+            defer wg.Done()
+            err = a.runProcessors(apu)
+            if err != nil {
+                log.Printf("E! [agent] Error running processors: %v", err)
+            }
+        }()
+
+        wg.Add(1)
+        go func() {
+            defer wg.Done()
+            err = a.runAggregators(startTime, au)
+            if err != nil {
+                log.Printf("E! [agent] Error running aggregators: %v", err)
+            }
+        }()
+    }
+
+    if pu != nil {
+        wg.Add(1)
+        go func() {
+            defer wg.Done()
+            err = a.runProcessors(pu)
+            if err != nil {
+                log.Printf("E! [agent] Error running processors: %v", err)
+            }
+        }()
+    }
+
+    wg.Add(1)
+    go func() {
+        defer wg.Done()
+        err = a.testRunInputs(ctx, wait, iu)
+        if err != nil {
+            log.Printf("E! [agent] Error running inputs: %v", err)
+        }
+    }()
+
+    wg.Wait()
+
+    log.Printf("D! [agent] Stopped Successfully")
+
     return nil
 }
 
-// closeOutputs closes all outputs.
-func (a *Agent) closeOutputs() {
+// Once runs the full agent for a single gather.
+func (a *Agent) Once(ctx context.Context, wait time.Duration) error {
+    err := a.once(ctx, wait)
+    if err != nil {
+        return err
+    }
+
+    if models.GlobalGatherErrors.Get() != 0 {
+        return fmt.Errorf("input plugins recorded %d errors", models.GlobalGatherErrors.Get())
+    }
+
+    unsent := 0
     for _, output := range a.Config.Outputs {
-        output.Close()
+        unsent += output.BufferLength()
     }
-}
-
-// startServiceInputs starts all service inputs.
-func (a *Agent) startServiceInputs(
-    ctx context.Context,
-    dst chan<- telegraf.Metric,
-) error {
-    started := []telegraf.ServiceInput{}
-
-    for _, input := range a.Config.Inputs {
-        if si, ok := input.Input.(telegraf.ServiceInput); ok {
-            // Service input plugins are not subject to timestamp rounding.
-            // This only applies to the accumulator passed to Start(), the
-            // Gather() accumulator does apply rounding according to the
-            // precision agent setting.
-            acc := NewAccumulator(input, dst)
-            acc.SetPrecision(time.Nanosecond)
-
-            err := si.Start(acc)
-            if err != nil {
-                log.Printf("E! [agent] Service for [%s] failed to start: %v",
-                    input.LogName(), err)
-
-                for _, si := range started {
-                    si.Stop()
-                }
-
-                return err
-            }
-
-            started = append(started, si)
-        }
+    if unsent != 0 {
+        return fmt.Errorf("output plugins unable to send %d metrics", unsent)
     }
     return nil
 }
 
-// stopServiceInputs stops all service inputs.
-func (a *Agent) stopServiceInputs() {
-    for _, input := range a.Config.Inputs {
-        if si, ok := input.Input.(telegraf.ServiceInput); ok {
-            si.Stop()
+// once runs the agent and performs a single gather. After gathering it pauses
+// for the wait duration to allow service inputs to run.
+func (a *Agent) once(ctx context.Context, wait time.Duration) error {
+    log.Printf("D! [agent] Initializing plugins")
+    err := a.initPlugins()
+    if err != nil {
+        return err
+    }
+
+    startTime := time.Now()
+
+    log.Printf("D! [agent] Connecting outputs")
+    next, ou, err := a.startOutputs(ctx, a.Config.Outputs)
+    if err != nil {
+        return err
+    }
+
+    var apu []*processorUnit
+    var au *aggregatorUnit
+    if len(a.Config.Aggregators) != 0 {
+        procC := next
+        if len(a.Config.AggProcessors) != 0 {
+            procC, apu, err = a.startProcessors(next, a.Config.AggProcessors)
+            if err != nil {
+                return err
+            }
+        }
+
+        next, au, err = a.startAggregators(procC, next, a.Config.Aggregators)
+        if err != nil {
+            return err
         }
     }
+
+    var pu []*processorUnit
+    if len(a.Config.Processors) != 0 {
+        next, pu, err = a.startProcessors(next, a.Config.Processors)
+        if err != nil {
+            return err
+        }
+    }
+
+    iu, err := a.testStartInputs(next, a.Config.Inputs)
+    if err != nil {
+        return err
+    }
+
+    var wg sync.WaitGroup
+    wg.Add(1)
+    go func() {
+        defer wg.Done()
+        err = a.runOutputs(ou)
+        if err != nil {
+            log.Printf("E! [agent] Error running outputs: %v", err)
+        }
+    }()
+
+    if au != nil {
+        wg.Add(1)
+        go func() {
+            defer wg.Done()
+            err = a.runProcessors(apu)
+            if err != nil {
+                log.Printf("E! [agent] Error running processors: %v", err)
+            }
+        }()
+
+        wg.Add(1)
+        go func() {
+            defer wg.Done()
+            err = a.runAggregators(startTime, au)
+            if err != nil {
+                log.Printf("E! [agent] Error running aggregators: %v", err)
+            }
+        }()
+    }
+
+    if pu != nil {
+        wg.Add(1)
+        go func() {
+            defer wg.Done()
+            err = a.runProcessors(pu)
+            if err != nil {
+                log.Printf("E! [agent] Error running processors: %v", err)
+            }
+        }()
+    }
+
+    wg.Add(1)
+    go func() {
+        defer wg.Done()
+        err = a.testRunInputs(ctx, wait, iu)
+        if err != nil {
+            log.Printf("E! [agent] Error running inputs: %v", err)
+        }
+    }()
+
+    wg.Wait()
+
+    log.Printf("D! [agent] Stopped Successfully")
+
+    return nil
 }
 
 // Returns the rounding precision for metrics.
diff --git a/aggregator.go b/aggregator.go
index 48aa8e4bf..f168b04d0 100644
--- a/aggregator.go
+++ b/aggregator.go
@@ -5,11 +5,7 @@ package telegraf
 // Add, Push, and Reset can not be called concurrently, so locking is not
 // required when implementing an Aggregator plugin.
 type Aggregator interface {
-    // SampleConfig returns the default configuration of the Input.
-    SampleConfig() string
-
-    // Description returns a one-sentence description on the Input.
-    Description() string
+    PluginDescriber
 
     // Add the metric to the aggregator.
     Add(in Metric)
 
diff --git a/config/config.go b/config/config.go
index 23ba1b5b3..bca178cb0 100644
--- a/config/config.go
+++ b/config/config.go
@@ -65,7 +65,8 @@ type Config struct {
     Outputs     []*models.RunningOutput
     Aggregators []*models.RunningAggregator
     // Processors have a slice wrapper type because they need to be sorted
-    Processors models.RunningProcessors
+    Processors    models.RunningProcessors
+    AggProcessors models.RunningProcessors
 }
 
 func NewConfig() *Config {
@@ -83,6 +84,7 @@ func NewConfig() *Config {
         Inputs:        make([]*models.RunningInput, 0),
         Outputs:       make([]*models.RunningOutput, 0),
         Processors:    make([]*models.RunningProcessor, 0),
+        AggProcessors: make([]*models.RunningProcessor, 0),
         InputFilters:  make([]string, 0),
         OutputFilters: make([]string, 0),
     }
@@ -561,12 +563,7 @@ func printFilteredGlobalSections(sectionFilters []string) {
     }
 }
 
-type printer interface {
-    Description() string
-    SampleConfig() string
-}
-
-func printConfig(name string, p printer, op string, commented bool) {
+func printConfig(name string, p telegraf.PluginDescriber, op string, commented bool) {
     comment := ""
     if commented {
         comment = "# "
@@ -684,12 +681,20 @@ func (c *Config) LoadConfig(path string) error {
     }
     data, err := loadConfig(path)
     if err != nil {
-        return fmt.Errorf("Error loading %s, %s", path, err)
+        return fmt.Errorf("Error loading config file %s: %w", path, err)
     }
 
+    if err = c.LoadConfigData(data); err != nil {
+        return fmt.Errorf("Error loading config file %s: %w", path, err)
+    }
+    return nil
+}
+
+// LoadConfigData loads TOML-formatted config data
+func (c *Config) LoadConfigData(data []byte) error {
     tbl, err := parseConfig(data)
     if err != nil {
-        return fmt.Errorf("Error parsing %s, %s", path, err)
+        return fmt.Errorf("Error parsing data: %s", err)
     }
 
     // Parse tags tables first:
@@ -697,11 +702,10 @@ func (c *Config) LoadConfig(path string) error {
         if val, ok := tbl.Fields[tableName]; ok {
             subTable, ok := val.(*ast.Table)
             if !ok {
-                return fmt.Errorf("%s: invalid configuration", path)
+                return fmt.Errorf("invalid configuration, bad table name %q", tableName)
             }
             if err = toml.UnmarshalTable(subTable, c.Tags); err != nil {
-                log.Printf("E! Could not parse [global_tags] config\n")
-                return fmt.Errorf("Error parsing %s, %s", path, err)
+                return fmt.Errorf("error parsing table name %q: %w", tableName, err)
             }
         }
     }
 
     // Parse agent table:
@@ -710,11 +714,10 @@ func (c *Config) LoadConfig(path string) error {
     if val, ok := tbl.Fields["agent"]; ok {
         subTable, ok := val.(*ast.Table)
         if !ok {
-            return fmt.Errorf("%s: invalid configuration", path)
+            return fmt.Errorf("invalid configuration, error parsing agent table")
         }
         if err = toml.UnmarshalTable(subTable, c.Agent); err != nil {
-            log.Printf("E! Could not parse [agent] config\n")
-            return fmt.Errorf("Error parsing %s, %s", path, err)
+            return fmt.Errorf("error parsing agent table: %w", err)
         }
     }
 
@@ -735,7 +738,7 @@ func (c *Config) LoadConfig(path string) error {
     for name, val := range tbl.Fields {
         subTable, ok := val.(*ast.Table)
         if !ok {
-            return fmt.Errorf("%s: invalid configuration", path)
+            return fmt.Errorf("invalid configuration, error parsing field %q as table", name)
         }
 
         switch name {
@@ -746,17 +749,17 @@ func (c *Config) LoadConfig(path string) error {
         case "outputs":
             for pluginName, pluginVal := range subTable.Fields {
                 switch pluginSubTable := pluginVal.(type) {
                 // legacy [outputs.influxdb] support
                 case *ast.Table:
                     if err = c.addOutput(pluginName, pluginSubTable); err != nil {
-                        return fmt.Errorf("Error parsing %s, %s", path, err)
+                        return fmt.Errorf("Error parsing %s, %s", pluginName, err)
                    }
                 case []*ast.Table:
                     for _, t := range pluginSubTable {
                         if err = c.addOutput(pluginName, t); err != nil {
-                            return fmt.Errorf("Error parsing %s, %s", path, err)
+                            return fmt.Errorf("Error parsing %s array, %s", pluginName, err)
                         }
                     }
                 default:
-                    return fmt.Errorf("Unsupported config format: %s, file %s",
-                        pluginName, path)
+                    return fmt.Errorf("Unsupported config format: %s",
+                        pluginName)
                 }
             }
         case "inputs", "plugins":
             for pluginName, pluginVal := range subTable.Fields {
                 switch pluginSubTable := pluginVal.(type) {
                 // legacy [inputs.cpu] support
                 case *ast.Table:
                     if err = c.addInput(pluginName, pluginSubTable); err != nil {
-                        return fmt.Errorf("Error parsing %s, %s", path, err)
+                        return fmt.Errorf("Error parsing %s, %s", pluginName, err)
                     }
                 case []*ast.Table:
                     for _, t := range pluginSubTable {
                         if err = c.addInput(pluginName, t); err != nil {
-                            return fmt.Errorf("Error parsing %s, %s", path, err)
+                            return fmt.Errorf("Error parsing %s, %s", pluginName, err)
                         }
                     }
                 default:
-                    return fmt.Errorf("Unsupported config format: %s, file %s",
-                        pluginName, path)
+                    return fmt.Errorf("Unsupported config format: %s",
+                        pluginName)
                 }
             }
         case "processors":
             for pluginName, pluginVal := range subTable.Fields {
                 switch pluginSubTable := pluginVal.(type) {
                 case []*ast.Table:
                     for _, t := range pluginSubTable {
                         if err = c.addProcessor(pluginName, t); err != nil {
-                            return fmt.Errorf("Error parsing %s, %s", path, err)
+                            return fmt.Errorf("Error parsing %s, %s", pluginName, err)
                         }
                     }
                 default:
-                    return fmt.Errorf("Unsupported config format: %s, file %s",
-                        pluginName, path)
+                    return fmt.Errorf("Unsupported config format: %s",
+                        pluginName)
                 }
             }
         case "aggregators":
             for pluginName, pluginVal := range subTable.Fields {
                 switch pluginSubTable := pluginVal.(type) {
                 case []*ast.Table:
                     for _, t := range pluginSubTable {
                         if err = c.addAggregator(pluginName, t); err != nil {
-                            return fmt.Errorf("Error parsing %s, %s", path, err)
+                            return fmt.Errorf("Error parsing %s, %s", pluginName, err)
                         }
                     }
                 default:
-                    return fmt.Errorf("Unsupported config format: %s, file %s",
-                        pluginName, path)
+                    return fmt.Errorf("Unsupported config format: %s",
+                        pluginName)
                 }
             }
         // Assume it's an input input for legacy config file support if no other
         // identifiers are present
         default:
             if err = c.addInput(name, subTable); err != nil {
-                return fmt.Errorf("Error parsing %s, %s", path, err)
+                return fmt.Errorf("Error parsing %s, %s", name, err)
             }
         }
     }
@@ -929,21 +932,48 @@ func (c *Config) addProcessor(name string, table *ast.Table) error {
     if !ok {
         return fmt.Errorf("Undefined but requested processor: %s", name)
     }
-    processor := creator()
 
     processorConfig, err := buildProcessor(name, table)
     if err != nil {
         return err
     }
 
-    if err := toml.UnmarshalTable(table, processor); err != nil {
+    rf, err := c.newRunningProcessor(creator, processorConfig, name, table)
+    if err != nil {
         return err
     }
+    c.Processors = append(c.Processors, rf)
+
+    // save a copy for the aggregator
+    rf, err = c.newRunningProcessor(creator, processorConfig, name, table)
+    if err != nil {
+        return err
+    }
+    c.AggProcessors = append(c.AggProcessors, rf)
+
+    return nil
+}
+
+func (c *Config) newRunningProcessor(
+    creator processors.StreamingCreator,
+    processorConfig *models.ProcessorConfig,
+    name string,
+    table *ast.Table,
+) (*models.RunningProcessor, error) {
+    processor := creator()
+
+    if p, ok := processor.(unwrappable); ok {
+        if err := toml.UnmarshalTable(table, p.Unwrap()); err != nil {
+            return nil, err
+        }
+    } else {
+        if err := toml.UnmarshalTable(table, processor); err != nil {
+            return nil, err
+        }
+    }
 
     rf := models.NewRunningProcessor(processor, processorConfig)
-
-    c.Processors = append(c.Processors, rf)
-    return nil
+    return rf, nil
 }
 
 func (c *Config) addOutput(name string, table *ast.Table) error {
@@ -2195,3 +2225,10 @@ func buildOutput(name string, tbl *ast.Table) (*models.OutputConfig, error) {
 
     return oc, nil
 }
+
+// unwrappable lets you retrieve the original telegraf.Processor from the
+// StreamingProcessor. This is necessary because the toml Unmarshaller won't
+// look inside composed types.
+type unwrappable interface {
+    Unwrap() telegraf.Processor
+}
diff --git a/config/config_test.go b/config/config_test.go
index c4a960265..6c5e3662a 100644
--- a/config/config_test.go
+++ b/config/config_test.go
@@ -207,7 +207,7 @@ func TestConfig_FieldNotDefined(t *testing.T) {
     c := NewConfig()
     err := c.LoadConfig("./testdata/invalid_field.toml")
     require.Error(t, err, "invalid field name")
-    assert.Equal(t, "Error parsing ./testdata/invalid_field.toml, line 2: field corresponding to `not_a_field' is not defined in http_listener_v2.HTTPListenerV2", err.Error())
+    assert.Equal(t, "Error loading config file ./testdata/invalid_field.toml: Error parsing http_listener_v2, line 2: field corresponding to `not_a_field' is not defined in http_listener_v2.HTTPListenerV2", err.Error())
 
 }
 
@@ -215,12 +215,12 @@ func TestConfig_WrongFieldType(t *testing.T) {
     c := NewConfig()
     err := c.LoadConfig("./testdata/wrong_field_type.toml")
     require.Error(t, err, "invalid field type")
-    assert.Equal(t, "Error parsing ./testdata/wrong_field_type.toml, line 2: (http_listener_v2.HTTPListenerV2.Port) cannot unmarshal TOML string into int", err.Error())
+    assert.Equal(t, "Error loading config file ./testdata/wrong_field_type.toml: Error parsing http_listener_v2, line 2: (http_listener_v2.HTTPListenerV2.Port) cannot unmarshal TOML string into int", err.Error())
 
     c = NewConfig()
     err = c.LoadConfig("./testdata/wrong_field_type2.toml")
     require.Error(t, err, "invalid field type2")
-    assert.Equal(t, "Error parsing ./testdata/wrong_field_type2.toml, line 2: (http_listener_v2.HTTPListenerV2.Methods) cannot unmarshal TOML string into []string", err.Error())
+    assert.Equal(t, "Error loading config file ./testdata/wrong_field_type2.toml: Error parsing http_listener_v2, line 2: (http_listener_v2.HTTPListenerV2.Methods) cannot unmarshal TOML string into []string", err.Error())
 }
 
 func TestConfig_InlineTables(t *testing.T) {
@@ -255,5 +255,5 @@ func TestConfig_BadOrdering(t *testing.T) {
     c := NewConfig()
     err := c.LoadConfig("./testdata/non_slice_slice.toml")
     require.Error(t, err, "bad ordering")
-    assert.Equal(t, "Error parsing ./testdata/non_slice_slice.toml, line 4: cannot unmarshal TOML array into string (need slice)", err.Error())
+    assert.Equal(t, "Error loading config file ./testdata/non_slice_slice.toml: Error parsing http array, line 4: cannot unmarshal TOML array into string (need slice)", err.Error())
 }
diff --git a/input.go b/input.go
index 071ab7d9d..08cfd75b9 100644
--- a/input.go
+++ b/input.go
@@ -1,11 +1,7 @@
 package telegraf
 
 type Input interface {
-    // SampleConfig returns the default configuration of the Input
-    SampleConfig() string
-
-    // Description returns a one-sentence description on the Input
-    Description() string
+    PluginDescriber
 
     // Gather takes in an accumulator and adds the metrics that the Input
     // gathers. This is called every "interval"
diff --git a/models/running_processor.go b/models/running_processor.go
index a7871b3e8..86b1887a1 100644
--- a/models/running_processor.go
+++ b/models/running_processor.go
@@ -10,7 +10,7 @@ import (
 type RunningProcessor struct {
     sync.Mutex
     log       telegraf.Logger
-    Processor telegraf.Processor
+    Processor telegraf.StreamingProcessor
     Config    *ProcessorConfig
 }
 
@@ -28,7 +28,7 @@ type ProcessorConfig struct {
     Filter Filter
 }
 
-func NewRunningProcessor(processor telegraf.Processor, config *ProcessorConfig) *RunningProcessor {
+func NewRunningProcessor(processor telegraf.StreamingProcessor, config *ProcessorConfig) *RunningProcessor {
     tags := map[string]string{"processor": config.Name}
     if config.Alias != "" {
         tags["alias"] = config.Alias
@@ -52,15 +52,6 @@ func (rp *RunningProcessor) metricFiltered(metric telegraf.Metric) {
     metric.Drop()
 }
 
-func containsMetric(item telegraf.Metric, metrics []telegraf.Metric) bool {
-    for _, m := range metrics {
-        if item == m {
-            return true
-        }
-    }
-    return false
-}
-
 func (r *RunningProcessor) Init() error {
     if p, ok := r.Processor.(telegraf.Initializer); ok {
         err := p.Init()
@@ -71,34 +62,39 @@ func (r *RunningProcessor) Init() error {
     return nil
 }
 
-func (rp *RunningProcessor) Apply(in ...telegraf.Metric) []telegraf.Metric {
-    rp.Lock()
-    defer rp.Unlock()
-
-    ret := []telegraf.Metric{}
-
-    for _, metric := range in {
-        // In processors when a filter selects a metric it is sent through the
-        // processor. Otherwise the metric continues downstream unmodified.
-        if ok := rp.Config.Filter.Select(metric); !ok {
-            ret = append(ret, metric)
-            continue
-        }
-
-        rp.Config.Filter.Modify(metric)
-        if len(metric.FieldList()) == 0 {
-            rp.metricFiltered(metric)
-            continue
-        }
-
-        // This metric should pass through the filter, so call the filter Apply
-        // function and append results to the output slice.
-        ret = append(ret, rp.Processor.Apply(metric)...)
-    }
-
-    return ret
-}
-
 func (r *RunningProcessor) Log() telegraf.Logger {
     return r.log
 }
+
+func (r *RunningProcessor) LogName() string {
+    return logName("processors", r.Config.Name, r.Config.Alias)
+}
+
+func (r *RunningProcessor) MakeMetric(metric telegraf.Metric) telegraf.Metric {
+    return metric
+}
+
+func (r *RunningProcessor) Start(acc telegraf.Accumulator) error {
+    return r.Processor.Start(acc)
+}
+
+func (r *RunningProcessor) Add(m telegraf.Metric, acc telegraf.Accumulator) {
+    if ok := r.Config.Filter.Select(m); !ok {
+        // pass downstream
+        acc.AddMetric(m)
+        return
+    }
+
+    r.Config.Filter.Modify(m)
+    if len(m.FieldList()) == 0 {
+        // drop metric
+        r.metricFiltered(m)
+        return
+    }
+
+    r.Processor.Add(m, acc)
+}
+
+func (r *RunningProcessor) Stop() {
+    r.Processor.Stop()
+}
diff --git a/models/running_processor_test.go b/models/running_processor_test.go
index 4ac4743a7..ee1d50ef2 100644
--- a/models/running_processor_test.go
+++ b/models/running_processor_test.go
@@ -6,8 +6,8 @@ import (
     "time"
 
     "github.com/influxdata/telegraf"
+    "github.com/influxdata/telegraf/plugins/processors"
     "github.com/influxdata/telegraf/testutil"
-
     "github.com/stretchr/testify/require"
 )
 
@@ -43,7 +43,7 @@ func TagProcessor(key, value string) *MockProcessor {
 
 func TestRunningProcessor_Apply(t *testing.T) {
     type args struct {
-        Processor telegraf.Processor
+        Processor telegraf.StreamingProcessor
         Config    *ProcessorConfig
     }
 
@@ -56,7 +56,7 @@ func TestRunningProcessor_Apply(t *testing.T) {
         {
             name: "inactive filter applies metrics",
             args: args{
-                Processor: TagProcessor("apply", "true"),
+                Processor: processors.NewStreamingProcessorFromProcessor(TagProcessor("apply", "true")),
                 Config: &ProcessorConfig{
                     Filter: Filter{},
                 },
@@ -87,7 +87,7 @@ func TestRunningProcessor_Apply(t *testing.T) {
         {
             name: "filter applies",
             args: args{
-                Processor: TagProcessor("apply", "true"),
+                Processor: processors.NewStreamingProcessorFromProcessor(TagProcessor("apply", "true")),
                 Config: &ProcessorConfig{
                     Filter: Filter{
                         NamePass: []string{"cpu"},
@@ -120,7 +120,7 @@ func TestRunningProcessor_Apply(t *testing.T) {
         {
             name: "filter doesn't apply",
             args: args{
-                Processor: TagProcessor("apply", "true"),
+                Processor: processors.NewStreamingProcessorFromProcessor(TagProcessor("apply", "true")),
                 Config: &ProcessorConfig{
                     Filter: Filter{
                         NameDrop: []string{"cpu"},
@@ -158,7 +158,15 @@ func TestRunningProcessor_Apply(t *testing.T) {
             }
             rp.Config.Filter.Compile()
 
-            actual := rp.Apply(tt.input...)
+            acc := testutil.Accumulator{}
+            err := rp.Start(&acc)
+            require.NoError(t, err)
+            for _, m := range tt.input {
+                rp.Add(m, &acc)
+            }
+            rp.Stop()
+
+            actual := acc.GetTelegrafMetrics()
             require.Equal(t, tt.expected, actual)
         })
     }
diff --git a/output.go b/output.go
index 3c4a85ddb..0045b2ca6 100644
--- a/output.go
+++ b/output.go
@@ -1,14 +1,12 @@
 package telegraf
 
 type Output interface {
+    PluginDescriber
+
     // Connect to the Output
     Connect() error
     // Close any connections to the Output
     Close() error
-    // Description returns a one-sentence description on the Output
-    Description() string
-    // SampleConfig returns the default configuration of the Output
-    SampleConfig() string
     // Write takes in group of points to be written to the Output
     Write(metrics []Metric) error
 }
diff --git a/plugin.go b/plugin.go
index f79721958..29e8bb683 100644
--- a/plugin.go
+++ b/plugin.go
@@ -9,6 +9,16 @@ type Initializer interface {
     Init() error
 }
 
+// PluginDescriber contains the functions all plugins must implement to describe
+// themselves to Telegraf
+type PluginDescriber interface {
+    // SampleConfig returns the default configuration of the Processor
+    SampleConfig() string
+
+    // Description returns a one-sentence description on the Processor
+    Description() string
+}
+
 // Logger defines an interface for logging.
 type Logger interface {
     // Errorf logs an error message, patterned after log.Printf.
diff --git a/plugins/processors/registry.go b/plugins/processors/registry.go
index 592c688f3..efade2966 100644
--- a/plugins/processors/registry.go
+++ b/plugins/processors/registry.go
@@ -3,9 +3,24 @@ package processors
 import "github.com/influxdata/telegraf"
 
 type Creator func() telegraf.Processor
+type StreamingCreator func() telegraf.StreamingProcessor
 
-var Processors = map[string]Creator{}
+// All processors are streaming processors.
+// telegraf.Processor processors are upgraded to telegraf.StreamingProcessor.
+var Processors = map[string]StreamingCreator{}
 
+// Add adds a telegraf.Processor processor
 func Add(name string, creator Creator) {
+    Processors[name] = upgradeToStreamingProcessor(creator)
+}
+
+// AddStreaming adds a telegraf.StreamingProcessor streaming processor
+func AddStreaming(name string, creator StreamingCreator) {
     Processors[name] = creator
 }
+
+func upgradeToStreamingProcessor(oldCreator Creator) StreamingCreator {
+    return func() telegraf.StreamingProcessor {
+        return NewStreamingProcessorFromProcessor(oldCreator())
+    }
+}
diff --git a/plugins/processors/streamingprocessor.go b/plugins/processors/streamingprocessor.go
new file mode 100644
index 000000000..4078ac26c
--- /dev/null
+++ b/plugins/processors/streamingprocessor.go
@@ -0,0 +1,49 @@
+package processors
+
+import (
+    "github.com/influxdata/telegraf"
+)
+
+// NewStreamingProcessorFromProcessor is a converter that turns a standard
+// processor into a streaming processor
+func NewStreamingProcessorFromProcessor(p telegraf.Processor) telegraf.StreamingProcessor {
+    sp := &streamingProcessor{
+        processor: p,
+    }
+    return sp
+}
+
+type streamingProcessor struct {
+    processor telegraf.Processor
+    acc       telegraf.Accumulator
+}
+
+func (sp *streamingProcessor) SampleConfig() string {
+    return sp.processor.SampleConfig()
+}
+
+func (sp *streamingProcessor) Description() string {
+    return sp.processor.Description()
+}
+
+func (sp *streamingProcessor) Start(acc telegraf.Accumulator) error {
+    sp.acc = acc
+    return nil
+}
+
+func (sp *streamingProcessor) Add(m telegraf.Metric, acc telegraf.Accumulator) {
+    for _, m := range sp.processor.Apply(m) {
+        acc.AddMetric(m)
+    }
+}
+
+func (sp *streamingProcessor) Stop() error {
+    return nil
+}
+
+// Unwrap lets you retrieve the original telegraf.Processor from the
+// StreamingProcessor. This is necessary because the toml Unmarshaller won't
+// look inside composed types.
+func (sp *streamingProcessor) Unwrap() telegraf.Processor {
+    return sp.processor
+}
diff --git a/processor.go b/processor.go
index e084adab7..5e2d46914 100644
--- a/processor.go
+++ b/processor.go
@@ -1,12 +1,31 @@
 package telegraf
 
+// Processor is a processor plugin interface for defining new inline processors.
+// These are extremely efficient and should be used over StreamingProcessor if
+// you do not need asynchronous metric writes.
 type Processor interface {
-    // SampleConfig returns the default configuration of the Input
-    SampleConfig() string
-
-    // Description returns a one-sentence description on the Input
-    Description() string
+    PluginDescriber
 
     // Apply the filter to the given metric.
     Apply(in ...Metric) []Metric
 }
+
+// StreamingProcessor is a processor that can take in a stream of messages
+type StreamingProcessor interface {
+    PluginDescriber
+
+    // Start is the initializer for the processor.
+    // Start is only called once per plugin instance, and never in parallel.
+    // Start should exit immediately after setup.
+    Start(acc Accumulator) error
+
+    // Add is called for each metric to be processed.
+    Add(metric Metric, acc Accumulator)
+
+    // Stop gives you a callback to free resources.
+    // By the time Stop is called, the input stream will have already been
+    // closed and Add will not be called anymore.
+    // When Stop returns, you should no longer be writing metrics to the
+    // accumulator.
+    Stop() error
+}
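--
Illustrative sketches (not from the diff above; plugin and package names are
hypothetical).

A plugin can opt into the new interface directly by implementing
telegraf.StreamingProcessor and registering through AddStreaming. A minimal
sketch of what that might look like, assuming a hypothetical "delay" plugin;
it shows the Start/Add/Stop lifecycle and the key contract from processor.go:
Add may return before the metric is written, but no writes may happen after
Stop returns, because the agent closes the downstream channel right after
calling Stop.

package delay

import (
    "sync"
    "time"

    "github.com/influxdata/telegraf"
    "github.com/influxdata/telegraf/plugins/processors"
)

// Delay is a hypothetical streaming processor that re-emits each metric
// asynchronously after a fixed interval; an Apply-based processor could only
// do this by blocking the whole pipeline.
type Delay struct {
    wg sync.WaitGroup
}

func (*Delay) SampleConfig() string { return "" }
func (*Delay) Description() string  { return "Emit each metric after a short delay" }

// Start is called once, before any Add; nothing to set up here.
func (d *Delay) Start(acc telegraf.Accumulator) error { return nil }

// Add schedules the metric to be written later and returns immediately.
func (d *Delay) Add(m telegraf.Metric, acc telegraf.Accumulator) {
    d.wg.Add(1)
    go func() {
        defer d.wg.Done()
        time.Sleep(100 * time.Millisecond)
        acc.AddMetric(m)
    }()
}

// Stop must not return until all pending writes are done; the agent closes
// the downstream channel as soon as Stop returns.
func (d *Delay) Stop() error {
    d.wg.Wait()
    return nil
}

func init() {
    processors.AddStreaming("delay", func() telegraf.StreamingProcessor {
        return &Delay{}
    })
}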
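Existing Apply-based processors need no changes: registering through the old
Add hook wraps the creator with upgradeToStreamingProcessor, so the agent only
ever deals in StreamingProcessor and the wrapper in streamingprocessor.go
forwards each metric through Apply synchronously. A sketch with a hypothetical
"marker" plugin:

package marker

import (
    "github.com/influxdata/telegraf"
    "github.com/influxdata/telegraf/plugins/processors"
)

// Marker is a hypothetical classic (inline) processor; Apply is synchronous.
type Marker struct{}

func (*Marker) SampleConfig() string { return "" }
func (*Marker) Description() string  { return "Tag every metric that passes through" }

func (*Marker) Apply(in ...telegraf.Metric) []telegraf.Metric {
    for _, m := range in {
        m.AddTag("marked", "true")
    }
    return in
}

func init() {
    // Add (not AddStreaming) upgrades the creator to a StreamingCreator,
    // so no existing plugin has to change.
    processors.Add("marker", func() telegraf.Processor {
        return &Marker{}
    })
}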
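The unwrappable interface in config.go exists because toml.UnmarshalTable
reflects over the concrete value it is handed; for an upgraded plugin that
value would be the streamingProcessor wrapper, which has no configurable
fields, so newRunningProcessor unwraps first and unmarshals into the inner
plugin. A sketch of a config-bearing classic plugin that depends on this
behavior, again with hypothetical names:

package retitle

import (
    "github.com/influxdata/telegraf"
    "github.com/influxdata/telegraf/plugins/processors"
)

// Retitle is a hypothetical classic processor with TOML-configurable fields.
// Because Add wraps it in a streamingProcessor, config loading must call
// Unwrap() to reach From and To; see newRunningProcessor in config/config.go.
type Retitle struct {
    From string `toml:"from"`
    To   string `toml:"to"`
}

func (*Retitle) SampleConfig() string {
    return "  from = \"\"\n  to = \"\"\n"
}

func (*Retitle) Description() string { return "Rename matching measurements" }

func (r *Retitle) Apply(in ...telegraf.Metric) []telegraf.Metric {
    for _, m := range in {
        if m.Name() == r.From {
            m.SetName(r.To)
        }
    }
    return in
}

func init() {
    processors.Add("retitle", func() telegraf.Processor { return &Retitle{} })
}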