654 lines
14 KiB
Go
654 lines
14 KiB
Go
package agent
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log"
|
|
"runtime"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/influxdata/telegraf"
|
|
"github.com/influxdata/telegraf/internal"
|
|
"github.com/influxdata/telegraf/internal/config"
|
|
"github.com/influxdata/telegraf/internal/models"
|
|
"github.com/influxdata/telegraf/plugins/serializers/influx"
|
|
)
|
|
|
|
// Agent runs a set of plugins.
|
|
type Agent struct {
|
|
Config *config.Config
|
|
}
|
|
|
|
// NewAgent returns an Agent for the given Config.
|
|
func NewAgent(config *config.Config) (*Agent, error) {
|
|
a := &Agent{
|
|
Config: config,
|
|
}
|
|
return a, nil
|
|
}
|
|
|
|
// Run starts and runs the Agent until the context is done.
|
|
func (a *Agent) Run(ctx context.Context) error {
|
|
log.Printf("I! [agent] Config: Interval:%s, Quiet:%#v, Hostname:%#v, "+
|
|
"Flush Interval:%s",
|
|
a.Config.Agent.Interval.Duration, a.Config.Agent.Quiet,
|
|
a.Config.Agent.Hostname, a.Config.Agent.FlushInterval.Duration)
|
|
|
|
if ctx.Err() != nil {
|
|
return ctx.Err()
|
|
}
|
|
|
|
log.Printf("D! [agent] Connecting outputs")
|
|
err := a.connectOutputs(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
inputC := make(chan telegraf.Metric, 100)
|
|
procC := make(chan telegraf.Metric, 100)
|
|
outputC := make(chan telegraf.Metric, 100)
|
|
|
|
startTime := time.Now()
|
|
|
|
log.Printf("D! [agent] Starting service inputs")
|
|
err = a.startServiceInputs(ctx, inputC)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
src := inputC
|
|
dst := inputC
|
|
|
|
wg.Add(1)
|
|
go func(dst chan telegraf.Metric) {
|
|
defer wg.Done()
|
|
|
|
err := a.runInputs(ctx, startTime, dst)
|
|
if err != nil {
|
|
log.Printf("E! [agent] Error running inputs: %v", err)
|
|
}
|
|
|
|
log.Printf("D! [agent] Stopping service inputs")
|
|
a.stopServiceInputs()
|
|
|
|
close(dst)
|
|
log.Printf("D! [agent] Input channel closed")
|
|
}(dst)
|
|
|
|
src = dst
|
|
|
|
if len(a.Config.Processors) > 0 {
|
|
dst = procC
|
|
|
|
wg.Add(1)
|
|
go func(src, dst chan telegraf.Metric) {
|
|
defer wg.Done()
|
|
|
|
err := a.runProcessors(src, dst)
|
|
if err != nil {
|
|
log.Printf("E! [agent] Error running processors: %v", err)
|
|
}
|
|
close(dst)
|
|
log.Printf("D! [agent] Processor channel closed")
|
|
}(src, dst)
|
|
|
|
src = dst
|
|
}
|
|
|
|
if len(a.Config.Aggregators) > 0 {
|
|
dst = outputC
|
|
|
|
wg.Add(1)
|
|
go func(src, dst chan telegraf.Metric) {
|
|
defer wg.Done()
|
|
|
|
err := a.runAggregators(startTime, src, dst)
|
|
if err != nil {
|
|
log.Printf("E! [agent] Error running aggregators: %v", err)
|
|
}
|
|
close(dst)
|
|
log.Printf("D! [agent] Output channel closed")
|
|
}(src, dst)
|
|
|
|
src = dst
|
|
}
|
|
|
|
wg.Add(1)
|
|
go func(src chan telegraf.Metric) {
|
|
defer wg.Done()
|
|
|
|
err := a.runOutputs(startTime, src)
|
|
if err != nil {
|
|
log.Printf("E! [agent] Error running outputs: %v", err)
|
|
}
|
|
}(src)
|
|
|
|
wg.Wait()
|
|
|
|
log.Printf("D! [agent] Closing outputs")
|
|
err = a.closeOutputs()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
log.Printf("D! [agent] Stopped Successfully")
|
|
return nil
|
|
}
|
|
|
|
// Test runs the inputs once and prints the output to stdout in line protocol.
|
|
func (a *Agent) Test(ctx context.Context) error {
|
|
var wg sync.WaitGroup
|
|
metricC := make(chan telegraf.Metric)
|
|
nulC := make(chan telegraf.Metric)
|
|
defer func() {
|
|
close(metricC)
|
|
close(nulC)
|
|
wg.Wait()
|
|
}()
|
|
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
|
|
s := influx.NewSerializer()
|
|
s.SetFieldSortOrder(influx.SortFields)
|
|
for metric := range metricC {
|
|
octets, err := s.Serialize(metric)
|
|
if err == nil {
|
|
fmt.Print("> ", string(octets))
|
|
|
|
}
|
|
}
|
|
}()
|
|
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
for range nulC {
|
|
}
|
|
}()
|
|
|
|
for _, input := range a.Config.Inputs {
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil
|
|
default:
|
|
if _, ok := input.Input.(telegraf.ServiceInput); ok {
|
|
log.Printf("W!: [agent] skipping plugin [[%s]]: service inputs not supported in --test mode",
|
|
input.Name())
|
|
continue
|
|
}
|
|
|
|
acc := NewAccumulator(input, metricC)
|
|
acc.SetPrecision(a.Config.Agent.Precision.Duration,
|
|
a.Config.Agent.Interval.Duration)
|
|
input.SetDefaultTags(a.Config.Tags)
|
|
|
|
// Special instructions for some inputs. cpu, for example, needs to be
|
|
// run twice in order to return cpu usage percentages.
|
|
switch input.Name() {
|
|
case "inputs.cpu", "inputs.mongodb", "inputs.procstat":
|
|
nulAcc := NewAccumulator(input, nulC)
|
|
nulAcc.SetPrecision(a.Config.Agent.Precision.Duration,
|
|
a.Config.Agent.Interval.Duration)
|
|
if err := input.Input.Gather(nulAcc); err != nil {
|
|
return err
|
|
}
|
|
|
|
time.Sleep(500 * time.Millisecond)
|
|
if err := input.Input.Gather(acc); err != nil {
|
|
return err
|
|
}
|
|
default:
|
|
if err := input.Input.Gather(acc); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// runInputs starts and triggers the periodic gather for Inputs.
|
|
//
|
|
// When the context is done the timers are stopped and this function returns
|
|
// after all ongoing Gather calls complete.
|
|
func (a *Agent) runInputs(
|
|
ctx context.Context,
|
|
startTime time.Time,
|
|
dst chan<- telegraf.Metric,
|
|
) error {
|
|
var wg sync.WaitGroup
|
|
for _, input := range a.Config.Inputs {
|
|
interval := a.Config.Agent.Interval.Duration
|
|
precision := a.Config.Agent.Precision.Duration
|
|
jitter := a.Config.Agent.CollectionJitter.Duration
|
|
|
|
// Overwrite agent interval if this plugin has its own.
|
|
if input.Config.Interval != 0 {
|
|
interval = input.Config.Interval
|
|
}
|
|
|
|
acc := NewAccumulator(input, dst)
|
|
acc.SetPrecision(precision, interval)
|
|
|
|
wg.Add(1)
|
|
go func(input *models.RunningInput) {
|
|
defer wg.Done()
|
|
|
|
if a.Config.Agent.RoundInterval {
|
|
err := internal.SleepContext(
|
|
ctx, internal.AlignDuration(startTime, interval))
|
|
if err != nil {
|
|
return
|
|
}
|
|
}
|
|
|
|
a.gatherOnInterval(ctx, acc, input, interval, jitter)
|
|
}(input)
|
|
}
|
|
wg.Wait()
|
|
|
|
return nil
|
|
}
|
|
|
|
// gather runs an input's gather function periodically until the context is
|
|
// done.
|
|
func (a *Agent) gatherOnInterval(
|
|
ctx context.Context,
|
|
acc telegraf.Accumulator,
|
|
input *models.RunningInput,
|
|
interval time.Duration,
|
|
jitter time.Duration,
|
|
) {
|
|
defer panicRecover(input)
|
|
|
|
ticker := time.NewTicker(interval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
err := internal.SleepContext(ctx, internal.RandomDuration(jitter))
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
err = a.gatherOnce(acc, input, interval)
|
|
if err != nil {
|
|
acc.AddError(err)
|
|
}
|
|
|
|
select {
|
|
case <-ticker.C:
|
|
continue
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// gatherOnce runs the input's Gather function once, logging a warning each
|
|
// interval it fails to complete before.
|
|
func (a *Agent) gatherOnce(
|
|
acc telegraf.Accumulator,
|
|
input *models.RunningInput,
|
|
timeout time.Duration,
|
|
) error {
|
|
ticker := time.NewTicker(timeout)
|
|
defer ticker.Stop()
|
|
|
|
done := make(chan error)
|
|
go func() {
|
|
done <- input.Gather(acc)
|
|
}()
|
|
|
|
for {
|
|
select {
|
|
case err := <-done:
|
|
return err
|
|
case <-ticker.C:
|
|
log.Printf("W! [agent] input %q did not complete within its interval",
|
|
input.Name())
|
|
}
|
|
}
|
|
}
|
|
|
|
// runProcessors applies processors to metrics.
|
|
func (a *Agent) runProcessors(
|
|
src <-chan telegraf.Metric,
|
|
agg chan<- telegraf.Metric,
|
|
) error {
|
|
for metric := range src {
|
|
metrics := a.applyProcessors(metric)
|
|
|
|
for _, metric := range metrics {
|
|
agg <- metric
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// applyProcessors applies all processors to a metric.
|
|
func (a *Agent) applyProcessors(m telegraf.Metric) []telegraf.Metric {
|
|
metrics := []telegraf.Metric{m}
|
|
for _, processor := range a.Config.Processors {
|
|
metrics = processor.Apply(metrics...)
|
|
}
|
|
|
|
return metrics
|
|
}
|
|
|
|
// runAggregators triggers the periodic push for Aggregators.
|
|
//
|
|
// When the context is done a final push will occur and then this function
|
|
// will return.
|
|
func (a *Agent) runAggregators(
|
|
startTime time.Time,
|
|
src <-chan telegraf.Metric,
|
|
dst chan<- telegraf.Metric,
|
|
) error {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
var wg sync.WaitGroup
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
for metric := range src {
|
|
var dropOriginal bool
|
|
for _, agg := range a.Config.Aggregators {
|
|
if ok := agg.Add(metric); ok {
|
|
dropOriginal = true
|
|
}
|
|
}
|
|
|
|
if !dropOriginal {
|
|
dst <- metric
|
|
}
|
|
}
|
|
cancel()
|
|
}()
|
|
|
|
precision := a.Config.Agent.Precision.Duration
|
|
interval := a.Config.Agent.Interval.Duration
|
|
aggregations := make(chan telegraf.Metric, 100)
|
|
for _, agg := range a.Config.Aggregators {
|
|
wg.Add(1)
|
|
go func(agg *models.RunningAggregator) {
|
|
defer wg.Done()
|
|
|
|
if a.Config.Agent.RoundInterval {
|
|
// Aggregators are aligned to the agent interval regardless of
|
|
// their period.
|
|
err := internal.SleepContext(ctx, internal.AlignDuration(startTime, interval))
|
|
if err != nil {
|
|
return
|
|
}
|
|
}
|
|
|
|
agg.SetPeriodStart(startTime)
|
|
|
|
acc := NewAccumulator(agg, aggregations)
|
|
acc.SetPrecision(precision, interval)
|
|
a.push(ctx, agg, acc)
|
|
close(aggregations)
|
|
}(agg)
|
|
}
|
|
|
|
for metric := range aggregations {
|
|
metrics := a.applyProcessors(metric)
|
|
for _, metric := range metrics {
|
|
dst <- metric
|
|
}
|
|
}
|
|
|
|
wg.Wait()
|
|
return nil
|
|
}
|
|
|
|
// push runs the push for a single aggregator every period. More simple than
|
|
// the output/input version as timeout should be less likely.... not really
|
|
// because the output channel can block for now.
|
|
func (a *Agent) push(
|
|
ctx context.Context,
|
|
aggregator *models.RunningAggregator,
|
|
acc telegraf.Accumulator,
|
|
) {
|
|
ticker := time.NewTicker(aggregator.Period())
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
break
|
|
case <-ctx.Done():
|
|
aggregator.Push(acc)
|
|
return
|
|
}
|
|
|
|
aggregator.Push(acc)
|
|
}
|
|
}
|
|
|
|
// runOutputs triggers the periodic write for Outputs.
|
|
//
|
|
// When the context is done, outputs continue to run until their buffer is
|
|
// closed, afterwich they run flush once more.
|
|
func (a *Agent) runOutputs(
|
|
startTime time.Time,
|
|
src <-chan telegraf.Metric,
|
|
) error {
|
|
interval := a.Config.Agent.FlushInterval.Duration
|
|
jitter := a.Config.Agent.FlushJitter.Duration
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
var wg sync.WaitGroup
|
|
for _, output := range a.Config.Outputs {
|
|
interval := interval
|
|
// Overwrite agent flush_interval if this plugin has its own.
|
|
if output.Config.FlushInterval != 0 {
|
|
interval = output.Config.FlushInterval
|
|
}
|
|
|
|
wg.Add(1)
|
|
go func(output *models.RunningOutput) {
|
|
defer wg.Done()
|
|
|
|
if a.Config.Agent.RoundInterval {
|
|
err := internal.SleepContext(
|
|
ctx, internal.AlignDuration(startTime, interval))
|
|
if err != nil {
|
|
return
|
|
}
|
|
}
|
|
|
|
a.flush(ctx, output, interval, jitter)
|
|
}(output)
|
|
}
|
|
|
|
for metric := range src {
|
|
for i, output := range a.Config.Outputs {
|
|
if i == len(a.Config.Outputs)-1 {
|
|
output.AddMetric(metric)
|
|
} else {
|
|
output.AddMetric(metric.Copy())
|
|
}
|
|
}
|
|
}
|
|
|
|
log.Println("I! [agent] Hang on, flushing any cached metrics before shutdown")
|
|
cancel()
|
|
wg.Wait()
|
|
|
|
return nil
|
|
}
|
|
|
|
// flush runs an output's flush function periodically until the context is
|
|
// done.
|
|
func (a *Agent) flush(
|
|
ctx context.Context,
|
|
output *models.RunningOutput,
|
|
interval time.Duration,
|
|
jitter time.Duration,
|
|
) {
|
|
// since we are watching two channels we need a ticker with the jitter
|
|
// integrated.
|
|
ticker := NewTicker(interval, jitter)
|
|
defer ticker.Stop()
|
|
|
|
logError := func(err error) {
|
|
if err != nil {
|
|
log.Printf("E! [agent] Error writing to output [%s]: %v", output.Name, err)
|
|
}
|
|
}
|
|
|
|
for {
|
|
// Favor shutdown over other methods.
|
|
select {
|
|
case <-ctx.Done():
|
|
logError(a.flushOnce(output, interval, output.Write))
|
|
return
|
|
default:
|
|
}
|
|
|
|
select {
|
|
case <-ticker.C:
|
|
logError(a.flushOnce(output, interval, output.Write))
|
|
case <-output.BatchReady:
|
|
// Favor the ticker over batch ready
|
|
select {
|
|
case <-ticker.C:
|
|
logError(a.flushOnce(output, interval, output.Write))
|
|
default:
|
|
logError(a.flushOnce(output, interval, output.WriteBatch))
|
|
}
|
|
case <-ctx.Done():
|
|
logError(a.flushOnce(output, interval, output.Write))
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// flushOnce runs the output's Write function once, logging a warning each
|
|
// interval it fails to complete before.
|
|
func (a *Agent) flushOnce(
|
|
output *models.RunningOutput,
|
|
timeout time.Duration,
|
|
writeFunc func() error,
|
|
) error {
|
|
ticker := time.NewTicker(timeout)
|
|
defer ticker.Stop()
|
|
|
|
done := make(chan error)
|
|
go func() {
|
|
done <- writeFunc()
|
|
}()
|
|
|
|
for {
|
|
select {
|
|
case err := <-done:
|
|
output.LogBufferStatus()
|
|
return err
|
|
case <-ticker.C:
|
|
log.Printf("W! [agent] output %q did not complete within its flush interval",
|
|
output.Name)
|
|
output.LogBufferStatus()
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
// connectOutputs connects to all outputs.
|
|
func (a *Agent) connectOutputs(ctx context.Context) error {
|
|
for _, output := range a.Config.Outputs {
|
|
log.Printf("D! [agent] Attempting connection to output: %s\n", output.Name)
|
|
err := output.Output.Connect()
|
|
if err != nil {
|
|
log.Printf("E! [agent] Failed to connect to output %s, retrying in 15s, "+
|
|
"error was '%s' \n", output.Name, err)
|
|
|
|
err := internal.SleepContext(ctx, 15*time.Second)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = output.Output.Connect()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
log.Printf("D! [agent] Successfully connected to output: %s\n", output.Name)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// closeOutputs closes all outputs.
|
|
func (a *Agent) closeOutputs() error {
|
|
var err error
|
|
for _, output := range a.Config.Outputs {
|
|
err = output.Output.Close()
|
|
}
|
|
return err
|
|
}
|
|
|
|
// startServiceInputs starts all service inputs.
|
|
func (a *Agent) startServiceInputs(
|
|
ctx context.Context,
|
|
dst chan<- telegraf.Metric,
|
|
) error {
|
|
started := []telegraf.ServiceInput{}
|
|
|
|
for _, input := range a.Config.Inputs {
|
|
if si, ok := input.Input.(telegraf.ServiceInput); ok {
|
|
// Service input plugins are not subject to timestamp rounding.
|
|
// This only applies to the accumulator passed to Start(), the
|
|
// Gather() accumulator does apply rounding according to the
|
|
// precision agent setting.
|
|
acc := NewAccumulator(input, dst)
|
|
acc.SetPrecision(time.Nanosecond, 0)
|
|
|
|
err := si.Start(acc)
|
|
if err != nil {
|
|
log.Printf("E! [agent] Service for input %s failed to start: %v",
|
|
input.Name(), err)
|
|
|
|
for _, si := range started {
|
|
si.Stop()
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
started = append(started, si)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// stopServiceInputs stops all service inputs.
|
|
func (a *Agent) stopServiceInputs() {
|
|
for _, input := range a.Config.Inputs {
|
|
if si, ok := input.Input.(telegraf.ServiceInput); ok {
|
|
si.Stop()
|
|
}
|
|
}
|
|
}
|
|
|
|
// panicRecover displays an error if an input panics.
|
|
func panicRecover(input *models.RunningInput) {
|
|
if err := recover(); err != nil {
|
|
trace := make([]byte, 2048)
|
|
runtime.Stack(trace, true)
|
|
log.Printf("E! FATAL: Input [%s] panicked: %s, Stack:\n%s\n",
|
|
input.Name(), err, trace)
|
|
log.Println("E! PLEASE REPORT THIS PANIC ON GITHUB with " +
|
|
"stack trace, configuration, and OS information: " +
|
|
"https://github.com/influxdata/telegraf/issues/new/choose")
|
|
}
|
|
}
|