Add support for once mode; run processors and aggregators during test (#7474)

This commit is contained in:
Daniel Nelson 2020-06-01 15:26:20 -07:00 committed by GitHub
parent 573f144607
commit 4e93b87085
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 235 additions and 85 deletions

View File

@ -142,108 +142,250 @@ func (a *Agent) Run(ctx context.Context) error {
return nil return nil
} }
// Test runs the inputs once and prints the output to stdout in line protocol. // Test runs the inputs, processors and aggregators for a single gather and
func (a *Agent) Test(ctx context.Context, waitDuration time.Duration) error { // writes the metrics to stdout.
var wg sync.WaitGroup func (a *Agent) Test(ctx context.Context, wait time.Duration) error {
metricC := make(chan telegraf.Metric) outputF := func(src <-chan telegraf.Metric) {
nulC := make(chan telegraf.Metric)
defer func() {
close(metricC)
close(nulC)
wg.Wait()
}()
wg.Add(1)
go func() {
defer wg.Done()
s := influx.NewSerializer() s := influx.NewSerializer()
s.SetFieldSortOrder(influx.SortFields) s.SetFieldSortOrder(influx.SortFields)
for metric := range metricC {
for metric := range src {
octets, err := s.Serialize(metric) octets, err := s.Serialize(metric)
if err == nil { if err == nil {
fmt.Print("> ", string(octets)) fmt.Print("> ", string(octets))
} }
metric.Reject() metric.Reject()
} }
}() }
err := a.test(ctx, wait, outputF)
if err != nil {
return err
}
if models.GlobalGatherErrors.Get() != 0 {
return fmt.Errorf("input plugins recorded %d errors", models.GlobalGatherErrors.Get())
}
return nil
}
// Once runs the full agent for a single gather.
func (a *Agent) Once(ctx context.Context, wait time.Duration) error {
outputF := func(src <-chan telegraf.Metric) {
interval := a.Config.Agent.FlushInterval.Duration
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
for _, output := range a.Config.Outputs {
interval := interval
// Overwrite agent flush_interval if this plugin has its own.
if output.Config.FlushInterval != 0 {
interval = output.Config.FlushInterval
}
jitter := 0 * time.Second
ticker := NewRollingTicker(interval, jitter)
defer ticker.Stop()
wg.Add(1) wg.Add(1)
go func() { go func(output *models.RunningOutput) {
defer wg.Done() defer wg.Done()
for range nulC { a.flushLoop(ctx, output, ticker)
}(output)
} }
}()
hasServiceInputs := false for metric := range src {
for _, input := range a.Config.Inputs { for i, output := range a.Config.Outputs {
if _, ok := input.Input.(telegraf.ServiceInput); ok { if i == len(a.Config.Outputs)-1 {
hasServiceInputs = true output.AddMetric(metric)
break } else {
output.AddMetric(metric.Copy())
}
} }
} }
cancel()
wg.Wait()
}
err := a.test(ctx, wait, outputF)
if err != nil {
return err
}
if models.GlobalGatherErrors.Get() != 0 {
return fmt.Errorf("input plugins recorded %d errors", models.GlobalGatherErrors.Get())
}
unsent := 0
for _, output := range a.Config.Outputs {
unsent += output.BufferLength()
}
if unsent != 0 {
return fmt.Errorf("output plugins unable to send %d metrics", unsent)
}
return nil
}
// Test runs the agent and performs a single gather sending output to the
// outputF. After gathering pauses for the wait duration to allow service
// inputs to run.
func (a *Agent) test(ctx context.Context, wait time.Duration, outputF func(<-chan telegraf.Metric)) error {
log.Printf("D! [agent] Initializing plugins") log.Printf("D! [agent] Initializing plugins")
err := a.initPlugins() err := a.initPlugins()
if err != nil { if err != nil {
return err return err
} }
if hasServiceInputs { log.Printf("D! [agent] Connecting outputs")
log.Printf("D! [agent] Starting service inputs") err = a.connectOutputs(ctx)
err := a.startServiceInputs(ctx, metricC)
if err != nil { if err != nil {
return err return err
} }
inputC := make(chan telegraf.Metric, 100)
procC := make(chan telegraf.Metric, 100)
outputC := make(chan telegraf.Metric, 100)
startTime := time.Now()
var wg sync.WaitGroup
src := inputC
dst := inputC
wg.Add(1)
go func(dst chan telegraf.Metric) {
defer wg.Done()
a.testRunInputs(ctx, wait, dst)
close(dst)
log.Printf("D! [agent] Input channel closed")
}(dst)
src = dst
if len(a.Config.Processors) > 0 {
dst = procC
wg.Add(1)
go func(src, dst chan telegraf.Metric) {
defer wg.Done()
err := a.runProcessors(src, dst)
if err != nil {
log.Printf("E! [agent] Error running processors: %v", err)
}
close(dst)
log.Printf("D! [agent] Processor channel closed")
}(src, dst)
src = dst
} }
hasErrors := false if len(a.Config.Aggregators) > 0 {
for _, input := range a.Config.Inputs { dst = outputC
select {
case <-ctx.Done(): wg.Add(1)
go func(src, dst chan telegraf.Metric) {
defer wg.Done()
err := a.runAggregators(startTime, src, dst)
if err != nil {
log.Printf("E! [agent] Error running aggregators: %v", err)
}
close(dst)
log.Printf("D! [agent] Output channel closed")
}(src, dst)
src = dst
}
wg.Add(1)
go func(src <-chan telegraf.Metric) {
defer wg.Done()
outputF(src)
}(src)
wg.Wait()
log.Printf("D! [agent] Closing outputs")
a.closeOutputs()
log.Printf("D! [agent] Stopped Successfully")
return nil return nil
default: }
break
func (a *Agent) testRunInputs(
ctx context.Context,
wait time.Duration,
dst chan<- telegraf.Metric,
) {
log.Printf("D! [agent] Starting service inputs")
for _, input := range a.Config.Inputs {
if si, ok := input.Input.(telegraf.ServiceInput); ok {
// Service input plugins are not subject to timestamp rounding.
// This only applies to the accumulator passed to Start(), the
// Gather() accumulator does apply rounding according to the
// precision agent setting.
acc := NewAccumulator(input, dst)
acc.SetPrecision(time.Nanosecond)
err := si.Start(acc)
if err != nil {
acc.AddError(err)
si.Stop()
continue
}
}
} }
acc := NewAccumulator(input, metricC) nul := make(chan telegraf.Metric)
acc.SetPrecision(a.Precision()) go func() {
for range nul {
}
}()
// Special instructions for some inputs. cpu, for example, needs to be var wg sync.WaitGroup
// run twice in order to return cpu usage percentages. for _, input := range a.Config.Inputs {
wg.Add(1)
go func(input *models.RunningInput) {
defer wg.Done()
// Run plugins that require multiple gathers to calculate rate
// and delta metrics twice.
switch input.Config.Name { switch input.Config.Name {
case "cpu", "mongodb", "procstat": case "cpu", "mongodb", "procstat":
nulAcc := NewAccumulator(input, nulC) nulAcc := NewAccumulator(input, nul)
nulAcc.SetPrecision(a.Precision()) nulAcc.SetPrecision(a.Precision())
if err := input.Input.Gather(nulAcc); err != nil { if err := input.Input.Gather(nulAcc); err != nil {
acc.AddError(err) nulAcc.AddError(err)
hasErrors = true
} }
time.Sleep(500 * time.Millisecond) time.Sleep(500 * time.Millisecond)
if err := input.Input.Gather(acc); err != nil {
acc.AddError(err)
hasErrors = true
}
default:
if err := input.Input.Gather(acc); err != nil {
acc.AddError(err)
hasErrors = true
}
}
} }
if hasServiceInputs { acc := NewAccumulator(input, dst)
log.Printf("D! [agent] Waiting for service inputs") acc.SetPrecision(a.Precision())
internal.SleepContext(ctx, waitDuration)
if err := input.Input.Gather(acc); err != nil {
acc.AddError(err)
}
}(input)
}
wg.Wait()
close(nul)
internal.SleepContext(ctx, wait)
log.Printf("D! [agent] Stopping service inputs") log.Printf("D! [agent] Stopping service inputs")
a.stopServiceInputs() a.stopServiceInputs()
}
if hasErrors {
return fmt.Errorf("One or more input plugins had an error")
}
return nil
} }
// runInputs starts and triggers the periodic gather for Inputs. // runInputs starts and triggers the periodic gather for Inputs.

View File

@ -67,6 +67,7 @@ var fServiceDisplayName = flag.String("service-display-name", "Telegraf Data Col
var fRunAsConsole = flag.Bool("console", false, "run as console application (windows only)") var fRunAsConsole = flag.Bool("console", false, "run as console application (windows only)")
var fPlugins = flag.String("plugin-directory", "", var fPlugins = flag.String("plugin-directory", "",
"path to directory containing external plugins") "path to directory containing external plugins")
var fRunOnce = flag.Bool("once", false, "run one gather and exit")
var ( var (
version string version string
@ -169,9 +170,14 @@ func runAgent(ctx context.Context,
logger.SetupLogging(logConfig) logger.SetupLogging(logConfig)
if *fRunOnce {
wait := time.Duration(*fTestWait) * time.Second
return ag.Once(ctx, wait)
}
if *fTest || *fTestWait != 0 { if *fTest || *fTestWait != 0 {
testWaitDuration := time.Duration(*fTestWait) * time.Second wait := time.Duration(*fTestWait) * time.Second
return ag.Test(ctx, testWaitDuration) return ag.Test(ctx, wait)
} }
log.Printf("I! Loaded inputs: %s", strings.Join(c.InputNames(), " ")) log.Printf("I! Loaded inputs: %s", strings.Join(c.InputNames(), " "))

View File

@ -32,11 +32,10 @@ The commands & flags are:
Valid values are 'agent', 'global_tags', 'outputs', Valid values are 'agent', 'global_tags', 'outputs',
'processors', 'aggregators' and 'inputs' 'processors', 'aggregators' and 'inputs'
--sample-config print out full sample configuration --sample-config print out full sample configuration
--test enable test mode: gather metrics, print them out, --once enable once mode: gather metrics once, write them, and exit
and exit. Note: Test mode only runs inputs, not --test enable test mode: gather metrics once and print them
processors, aggregators, or outputs
--test-wait wait up to this many seconds for service --test-wait wait up to this many seconds for service
inputs to complete in test mode inputs to complete in test or once mode
--usage <plugin> print usage for a plugin, ie, 'telegraf --usage mysql' --usage <plugin> print usage for a plugin, ie, 'telegraf --usage mysql'
--version display the version and exit --version display the version and exit

View File

@ -29,11 +29,10 @@ The commands & flags are:
--section-filter filter config sections to output, separator is : --section-filter filter config sections to output, separator is :
Valid values are 'agent', 'global_tags', 'outputs', Valid values are 'agent', 'global_tags', 'outputs',
'processors', 'aggregators' and 'inputs' 'processors', 'aggregators' and 'inputs'
--test enable test mode: gather metrics, print them out, --once enable once mode: gather metrics once, write them, and exit
and exit. Note: Test mode only runs inputs, not --test enable test mode: gather metrics once and print them
processors, aggregators, or outputs
--test-wait wait up to this many seconds for service --test-wait wait up to this many seconds for service
inputs to complete in test mode inputs to complete in test or once mode
--usage <plugin> print usage for a plugin, ie, 'telegraf --usage mysql' --usage <plugin> print usage for a plugin, ie, 'telegraf --usage mysql'
--version display the version and exit --version display the version and exit

View File

@ -261,3 +261,7 @@ func (r *RunningOutput) LogBufferStatus() {
func (r *RunningOutput) Log() telegraf.Logger { func (r *RunningOutput) Log() telegraf.Logger {
return r.log return r.log
} }
func (r *RunningOutput) BufferLength() int {
return r.buffer.Len()
}