diff --git a/agent/agent.go b/agent/agent.go index 2687bbc0f..e6e982c02 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -136,7 +136,7 @@ func (a *Agent) Run(ctx context.Context) error { } // Test runs the inputs once and prints the output to stdout in line protocol. -func (a *Agent) Test(ctx context.Context) error { +func (a *Agent) Test(ctx context.Context, waitDuration time.Duration) error { var wg sync.WaitGroup metricC := make(chan telegraf.Metric) nulC := make(chan telegraf.Metric) @@ -156,8 +156,8 @@ func (a *Agent) Test(ctx context.Context) error { octets, err := s.Serialize(metric) if err == nil { fmt.Print("> ", string(octets)) - } + metric.Reject() } }() @@ -168,43 +168,61 @@ func (a *Agent) Test(ctx context.Context) error { } }() + hasServiceInputs := false + for _, input := range a.Config.Inputs { + if _, ok := input.Input.(telegraf.ServiceInput); ok { + hasServiceInputs = true + break + } + } + + if hasServiceInputs { + log.Printf("D! [agent] Starting service inputs") + err := a.startServiceInputs(ctx, metricC) + if err != nil { + return err + } + } + for _, input := range a.Config.Inputs { select { case <-ctx.Done(): return nil default: - if _, ok := input.Input.(telegraf.ServiceInput); ok { - log.Printf("W!: [agent] skipping plugin [[%s]]: service inputs not supported in --test mode", - input.Name()) - continue + break + } + + acc := NewAccumulator(input, metricC) + acc.SetPrecision(a.Precision()) + + // Special instructions for some inputs. cpu, for example, needs to be + // run twice in order to return cpu usage percentages. + switch input.Name() { + case "inputs.cpu", "inputs.mongodb", "inputs.procstat": + nulAcc := NewAccumulator(input, nulC) + nulAcc.SetPrecision(a.Precision()) + if err := input.Input.Gather(nulAcc); err != nil { + acc.AddError(err) } - acc := NewAccumulator(input, metricC) - acc.SetPrecision(a.Precision()) - input.SetDefaultTags(a.Config.Tags) - - // Special instructions for some inputs. cpu, for example, needs to be - // run twice in order to return cpu usage percentages. - switch input.Name() { - case "inputs.cpu", "inputs.mongodb", "inputs.procstat": - nulAcc := NewAccumulator(input, nulC) - nulAcc.SetPrecision(a.Precision()) - if err := input.Input.Gather(nulAcc); err != nil { - return err - } - - time.Sleep(500 * time.Millisecond) - if err := input.Input.Gather(acc); err != nil { - return err - } - default: - if err := input.Input.Gather(acc); err != nil { - return err - } + time.Sleep(500 * time.Millisecond) + if err := input.Input.Gather(acc); err != nil { + acc.AddError(err) + } + default: + if err := input.Input.Gather(acc); err != nil { + acc.AddError(err) } } } + if hasServiceInputs { + log.Printf("D! [agent] Waiting for service inputs") + internal.SleepContext(ctx, waitDuration) + log.Printf("D! [agent] Stopping service inputs") + a.stopServiceInputs() + } + return nil } diff --git a/cmd/telegraf/telegraf.go b/cmd/telegraf/telegraf.go index 3678387cd..4545833a7 100644 --- a/cmd/telegraf/telegraf.go +++ b/cmd/telegraf/telegraf.go @@ -13,6 +13,7 @@ import ( "runtime" "strings" "syscall" + "time" "github.com/influxdata/telegraf/agent" "github.com/influxdata/telegraf/internal" @@ -33,7 +34,8 @@ var pprofAddr = flag.String("pprof-addr", "", "pprof address to listen on, not activate pprof if empty") var fQuiet = flag.Bool("quiet", false, "run in quiet mode") -var fTest = flag.Bool("test", false, "gather metrics, print them out, and exit") +var fTest = flag.Bool("test", false, "enable test mode: gather metrics, print them out, and exit") +var fTestWait = flag.Int("test-wait", 0, "wait up to this many seconds for service inputs to complete in test mode") var fConfig = flag.String("config", "", "configuration file to load") var fConfigDirectory = flag.String("config-directory", "", "directory containing additional *.conf files") @@ -167,8 +169,9 @@ func runAgent(ctx context.Context, logger.SetupLogging(logConfig) - if *fTest { - return ag.Test(ctx) + if *fTest || *fTestWait != 0 { + testWaitDuration := time.Duration(*fTestWait) * time.Second + return ag.Test(ctx, testWaitDuration) } log.Printf("I! Loaded inputs: %s", strings.Join(c.InputNames(), " ")) diff --git a/internal/usage.go b/internal/usage.go index c783da3f4..7909d3558 100644 --- a/internal/usage.go +++ b/internal/usage.go @@ -31,6 +31,8 @@ The commands & flags are: --sample-config print out full sample configuration --test gather metrics, print them out, and exit; processors, aggregators, and outputs are not run + --test-wait wait up to this many seconds for service + inputs to complete in test mode --usage print usage for a plugin, ie, 'telegraf --usage mysql' --version display the version and exit diff --git a/internal/usage_windows.go b/internal/usage_windows.go index 6e3c17835..af2506ec1 100644 --- a/internal/usage_windows.go +++ b/internal/usage_windows.go @@ -31,6 +31,8 @@ The commands & flags are: 'processors', 'aggregators' and 'inputs' --test gather metrics, print them out, and exit; processors, aggregators, and outputs are not run + --test-wait wait up to this many seconds for service + inputs to complete in test mode --usage print usage for a plugin, ie, 'telegraf --usage mysql' --version display the version and exit