diff --git a/agent/agent.go b/agent/agent.go index 9ac51471a..5795eb0d4 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -142,110 +142,252 @@ func (a *Agent) Run(ctx context.Context) error { return nil } -// Test runs the inputs once and prints the output to stdout in line protocol. -func (a *Agent) Test(ctx context.Context, waitDuration time.Duration) error { - var wg sync.WaitGroup - metricC := make(chan telegraf.Metric) - nulC := make(chan telegraf.Metric) - defer func() { - close(metricC) - close(nulC) - wg.Wait() - }() - - wg.Add(1) - go func() { - defer wg.Done() - +// Test runs the inputs, processors and aggregators for a single gather and +// writes the metrics to stdout. +func (a *Agent) Test(ctx context.Context, wait time.Duration) error { + outputF := func(src <-chan telegraf.Metric) { s := influx.NewSerializer() s.SetFieldSortOrder(influx.SortFields) - for metric := range metricC { + + for metric := range src { octets, err := s.Serialize(metric) if err == nil { fmt.Print("> ", string(octets)) } metric.Reject() } - }() - - wg.Add(1) - go func() { - defer wg.Done() - for range nulC { - } - }() - - hasServiceInputs := false - for _, input := range a.Config.Inputs { - if _, ok := input.Input.(telegraf.ServiceInput); ok { - hasServiceInputs = true - break - } } + err := a.test(ctx, wait, outputF) + if err != nil { + return err + } + + if models.GlobalGatherErrors.Get() != 0 { + return fmt.Errorf("input plugins recorded %d errors", models.GlobalGatherErrors.Get()) + } + return nil + +} + +// Once runs the full agent for a single gather. +func (a *Agent) Once(ctx context.Context, wait time.Duration) error { + outputF := func(src <-chan telegraf.Metric) { + interval := a.Config.Agent.FlushInterval.Duration + + ctx, cancel := context.WithCancel(context.Background()) + + var wg sync.WaitGroup + for _, output := range a.Config.Outputs { + interval := interval + // Overwrite agent flush_interval if this plugin has its own. + if output.Config.FlushInterval != 0 { + interval = output.Config.FlushInterval + } + + jitter := 0 * time.Second + + ticker := NewRollingTicker(interval, jitter) + defer ticker.Stop() + + wg.Add(1) + go func(output *models.RunningOutput) { + defer wg.Done() + a.flushLoop(ctx, output, ticker) + }(output) + } + + for metric := range src { + for i, output := range a.Config.Outputs { + if i == len(a.Config.Outputs)-1 { + output.AddMetric(metric) + } else { + output.AddMetric(metric.Copy()) + } + } + } + + cancel() + wg.Wait() + } + + err := a.test(ctx, wait, outputF) + if err != nil { + return err + } + + if models.GlobalGatherErrors.Get() != 0 { + return fmt.Errorf("input plugins recorded %d errors", models.GlobalGatherErrors.Get()) + } + + unsent := 0 + for _, output := range a.Config.Outputs { + unsent += output.BufferLength() + } + if unsent != 0 { + return fmt.Errorf("output plugins unable to send %d metrics", unsent) + } + return nil +} + +// Test runs the agent and performs a single gather sending output to the +// outputF. After gathering pauses for the wait duration to allow service +// inputs to run. +func (a *Agent) test(ctx context.Context, wait time.Duration, outputF func(<-chan telegraf.Metric)) error { log.Printf("D! [agent] Initializing plugins") err := a.initPlugins() if err != nil { return err } - if hasServiceInputs { - log.Printf("D! [agent] Starting service inputs") - err := a.startServiceInputs(ctx, metricC) - if err != nil { - return err - } + log.Printf("D! [agent] Connecting outputs") + err = a.connectOutputs(ctx) + if err != nil { + return err } - hasErrors := false - for _, input := range a.Config.Inputs { - select { - case <-ctx.Done(): - return nil - default: - break - } + inputC := make(chan telegraf.Metric, 100) + procC := make(chan telegraf.Metric, 100) + outputC := make(chan telegraf.Metric, 100) - acc := NewAccumulator(input, metricC) - acc.SetPrecision(a.Precision()) + startTime := time.Now() - // Special instructions for some inputs. cpu, for example, needs to be - // run twice in order to return cpu usage percentages. - switch input.Config.Name { - case "cpu", "mongodb", "procstat": - nulAcc := NewAccumulator(input, nulC) - nulAcc.SetPrecision(a.Precision()) - if err := input.Input.Gather(nulAcc); err != nil { - acc.AddError(err) - hasErrors = true + var wg sync.WaitGroup + + src := inputC + dst := inputC + + wg.Add(1) + go func(dst chan telegraf.Metric) { + defer wg.Done() + + a.testRunInputs(ctx, wait, dst) + + close(dst) + log.Printf("D! [agent] Input channel closed") + }(dst) + + src = dst + + if len(a.Config.Processors) > 0 { + dst = procC + + wg.Add(1) + go func(src, dst chan telegraf.Metric) { + defer wg.Done() + + err := a.runProcessors(src, dst) + if err != nil { + log.Printf("E! [agent] Error running processors: %v", err) } + close(dst) + log.Printf("D! [agent] Processor channel closed") + }(src, dst) - time.Sleep(500 * time.Millisecond) - if err := input.Input.Gather(acc); err != nil { - acc.AddError(err) - hasErrors = true + src = dst + } + + if len(a.Config.Aggregators) > 0 { + dst = outputC + + wg.Add(1) + go func(src, dst chan telegraf.Metric) { + defer wg.Done() + + err := a.runAggregators(startTime, src, dst) + if err != nil { + log.Printf("E! [agent] Error running aggregators: %v", err) } - default: - if err := input.Input.Gather(acc); err != nil { - acc.AddError(err) - hasErrors = true - } - } + close(dst) + log.Printf("D! [agent] Output channel closed") + }(src, dst) + + src = dst } - if hasServiceInputs { - log.Printf("D! [agent] Waiting for service inputs") - internal.SleepContext(ctx, waitDuration) - log.Printf("D! [agent] Stopping service inputs") - a.stopServiceInputs() - } + wg.Add(1) + go func(src <-chan telegraf.Metric) { + defer wg.Done() + outputF(src) + }(src) + + wg.Wait() + + log.Printf("D! [agent] Closing outputs") + a.closeOutputs() + + log.Printf("D! [agent] Stopped Successfully") - if hasErrors { - return fmt.Errorf("One or more input plugins had an error") - } return nil } +func (a *Agent) testRunInputs( + ctx context.Context, + wait time.Duration, + dst chan<- telegraf.Metric, +) { + log.Printf("D! [agent] Starting service inputs") + for _, input := range a.Config.Inputs { + if si, ok := input.Input.(telegraf.ServiceInput); ok { + // Service input plugins are not subject to timestamp rounding. + // This only applies to the accumulator passed to Start(), the + // Gather() accumulator does apply rounding according to the + // precision agent setting. + acc := NewAccumulator(input, dst) + acc.SetPrecision(time.Nanosecond) + + err := si.Start(acc) + if err != nil { + acc.AddError(err) + si.Stop() + continue + } + } + } + + nul := make(chan telegraf.Metric) + go func() { + for range nul { + } + }() + + var wg sync.WaitGroup + for _, input := range a.Config.Inputs { + wg.Add(1) + go func(input *models.RunningInput) { + defer wg.Done() + + // Run plugins that require multiple gathers to calculate rate + // and delta metrics twice. + switch input.Config.Name { + case "cpu", "mongodb", "procstat": + nulAcc := NewAccumulator(input, nul) + nulAcc.SetPrecision(a.Precision()) + if err := input.Input.Gather(nulAcc); err != nil { + nulAcc.AddError(err) + } + + time.Sleep(500 * time.Millisecond) + } + + acc := NewAccumulator(input, dst) + acc.SetPrecision(a.Precision()) + + if err := input.Input.Gather(acc); err != nil { + acc.AddError(err) + } + }(input) + } + wg.Wait() + close(nul) + + internal.SleepContext(ctx, wait) + + log.Printf("D! [agent] Stopping service inputs") + a.stopServiceInputs() + +} + // runInputs starts and triggers the periodic gather for Inputs. // // When the context is done the timers are stopped and this function returns diff --git a/cmd/telegraf/telegraf.go b/cmd/telegraf/telegraf.go index 4f51bc2e1..7e0b4ec1c 100644 --- a/cmd/telegraf/telegraf.go +++ b/cmd/telegraf/telegraf.go @@ -67,6 +67,7 @@ var fServiceDisplayName = flag.String("service-display-name", "Telegraf Data Col var fRunAsConsole = flag.Bool("console", false, "run as console application (windows only)") var fPlugins = flag.String("plugin-directory", "", "path to directory containing external plugins") +var fRunOnce = flag.Bool("once", false, "run one gather and exit") var ( version string @@ -169,9 +170,14 @@ func runAgent(ctx context.Context, logger.SetupLogging(logConfig) + if *fRunOnce { + wait := time.Duration(*fTestWait) * time.Second + return ag.Once(ctx, wait) + } + if *fTest || *fTestWait != 0 { - testWaitDuration := time.Duration(*fTestWait) * time.Second - return ag.Test(ctx, testWaitDuration) + wait := time.Duration(*fTestWait) * time.Second + return ag.Test(ctx, wait) } log.Printf("I! Loaded inputs: %s", strings.Join(c.InputNames(), " ")) diff --git a/internal/usage.go b/internal/usage.go index b0df62a6f..6eff30e6b 100644 --- a/internal/usage.go +++ b/internal/usage.go @@ -32,11 +32,10 @@ The commands & flags are: Valid values are 'agent', 'global_tags', 'outputs', 'processors', 'aggregators' and 'inputs' --sample-config print out full sample configuration - --test enable test mode: gather metrics, print them out, - and exit. Note: Test mode only runs inputs, not - processors, aggregators, or outputs + --once enable once mode: gather metrics once, write them, and exit + --test enable test mode: gather metrics once and print them --test-wait wait up to this many seconds for service - inputs to complete in test mode + inputs to complete in test or once mode --usage print usage for a plugin, ie, 'telegraf --usage mysql' --version display the version and exit diff --git a/internal/usage_windows.go b/internal/usage_windows.go index e205d6c1f..7fee6a1f1 100644 --- a/internal/usage_windows.go +++ b/internal/usage_windows.go @@ -29,11 +29,10 @@ The commands & flags are: --section-filter filter config sections to output, separator is : Valid values are 'agent', 'global_tags', 'outputs', 'processors', 'aggregators' and 'inputs' - --test enable test mode: gather metrics, print them out, - and exit. Note: Test mode only runs inputs, not - processors, aggregators, or outputs + --once enable once mode: gather metrics once, write them, and exit + --test enable test mode: gather metrics once and print them --test-wait wait up to this many seconds for service - inputs to complete in test mode + inputs to complete in test or once mode --usage print usage for a plugin, ie, 'telegraf --usage mysql' --version display the version and exit diff --git a/models/running_output.go b/models/running_output.go index 256c18715..452ab796b 100644 --- a/models/running_output.go +++ b/models/running_output.go @@ -261,3 +261,7 @@ func (r *RunningOutput) LogBufferStatus() { func (r *RunningOutput) Log() telegraf.Logger { return r.log } + +func (r *RunningOutput) BufferLength() int { + return r.buffer.Len() +}