telegraf/agent/agent.go

773 lines
17 KiB
Go

package agent
import (
"context"
"fmt"
"log"
"os"
"runtime"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/config"
"github.com/influxdata/telegraf/internal/models"
"github.com/influxdata/telegraf/plugins/serializers/influx"
)
// Agent runs a set of plugins.
type Agent struct {
Config *config.Config
}
// NewAgent returns an Agent for the given Config.
func NewAgent(config *config.Config) (*Agent, error) {
a := &Agent{
Config: config,
}
return a, nil
}
// Run starts and runs the Agent until the context is done.
func (a *Agent) Run(ctx context.Context) error {
log.Printf("I! [agent] Config: Interval:%s, Quiet:%#v, Hostname:%#v, "+
"Flush Interval:%s",
a.Config.Agent.Interval.Duration, a.Config.Agent.Quiet,
a.Config.Agent.Hostname, a.Config.Agent.FlushInterval.Duration)
if ctx.Err() != nil {
return ctx.Err()
}
log.Printf("D! [agent] Initializing plugins")
err := a.initPlugins()
if err != nil {
return err
}
log.Printf("D! [agent] Connecting outputs")
err = a.connectOutputs(ctx)
if err != nil {
return err
}
inputC := make(chan telegraf.Metric, 100)
procC := make(chan telegraf.Metric, 100)
outputC := make(chan telegraf.Metric, 100)
startTime := time.Now()
log.Printf("D! [agent] Starting service inputs")
err = a.startServiceInputs(ctx, inputC)
if err != nil {
return err
}
var wg sync.WaitGroup
src := inputC
dst := inputC
wg.Add(1)
go func(dst chan telegraf.Metric) {
defer wg.Done()
err := a.runInputs(ctx, startTime, dst)
if err != nil {
log.Printf("E! [agent] Error running inputs: %v", err)
}
log.Printf("D! [agent] Stopping service inputs")
a.stopServiceInputs()
close(dst)
log.Printf("D! [agent] Input channel closed")
}(dst)
src = dst
if len(a.Config.Processors) > 0 {
dst = procC
wg.Add(1)
go func(src, dst chan telegraf.Metric) {
defer wg.Done()
err := a.runProcessors(src, dst)
if err != nil {
log.Printf("E! [agent] Error running processors: %v", err)
}
close(dst)
log.Printf("D! [agent] Processor channel closed")
}(src, dst)
src = dst
}
if len(a.Config.Aggregators) > 0 {
dst = outputC
wg.Add(1)
go func(src, dst chan telegraf.Metric) {
defer wg.Done()
err := a.runAggregators(startTime, src, dst)
if err != nil {
log.Printf("E! [agent] Error running aggregators: %v", err)
}
close(dst)
log.Printf("D! [agent] Output channel closed")
}(src, dst)
src = dst
}
wg.Add(1)
go func(src chan telegraf.Metric) {
defer wg.Done()
err := a.runOutputs(startTime, src)
if err != nil {
log.Printf("E! [agent] Error running outputs: %v", err)
}
}(src)
wg.Wait()
log.Printf("D! [agent] Closing outputs")
a.closeOutputs()
log.Printf("D! [agent] Stopped Successfully")
return nil
}
// Test runs the inputs once and prints the output to stdout in line protocol.
func (a *Agent) Test(ctx context.Context, waitDuration time.Duration) error {
var wg sync.WaitGroup
metricC := make(chan telegraf.Metric)
nulC := make(chan telegraf.Metric)
defer func() {
close(metricC)
close(nulC)
wg.Wait()
}()
wg.Add(1)
go func() {
defer wg.Done()
s := influx.NewSerializer()
s.SetFieldSortOrder(influx.SortFields)
for metric := range metricC {
octets, err := s.Serialize(metric)
if err == nil {
fmt.Print("> ", string(octets))
}
metric.Reject()
}
}()
wg.Add(1)
go func() {
defer wg.Done()
for range nulC {
}
}()
hasServiceInputs := false
for _, input := range a.Config.Inputs {
if _, ok := input.Input.(telegraf.ServiceInput); ok {
hasServiceInputs = true
break
}
}
log.Printf("D! [agent] Initializing plugins")
err := a.initPlugins()
if err != nil {
return err
}
if hasServiceInputs {
log.Printf("D! [agent] Starting service inputs")
err := a.startServiceInputs(ctx, metricC)
if err != nil {
return err
}
}
hasErrors := false
for _, input := range a.Config.Inputs {
select {
case <-ctx.Done():
return nil
default:
break
}
acc := NewAccumulator(input, metricC)
acc.SetPrecision(a.Precision())
// Special instructions for some inputs. cpu, for example, needs to be
// run twice in order to return cpu usage percentages.
switch input.Config.Name {
case "cpu", "mongodb", "procstat":
nulAcc := NewAccumulator(input, nulC)
nulAcc.SetPrecision(a.Precision())
if err := input.Input.Gather(nulAcc); err != nil {
acc.AddError(err)
hasErrors = true
}
time.Sleep(500 * time.Millisecond)
if err := input.Input.Gather(acc); err != nil {
acc.AddError(err)
hasErrors = true
}
default:
if err := input.Input.Gather(acc); err != nil {
acc.AddError(err)
hasErrors = true
}
}
}
if hasServiceInputs {
log.Printf("D! [agent] Waiting for service inputs")
internal.SleepContext(ctx, waitDuration)
log.Printf("D! [agent] Stopping service inputs")
a.stopServiceInputs()
}
if hasErrors {
return fmt.Errorf("One or more input plugins had an error")
}
return nil
}
// runInputs starts and triggers the periodic gather for Inputs.
//
// When the context is done the timers are stopped and this function returns
// after all ongoing Gather calls complete.
func (a *Agent) runInputs(
ctx context.Context,
startTime time.Time,
dst chan<- telegraf.Metric,
) error {
var wg sync.WaitGroup
for _, input := range a.Config.Inputs {
interval := a.Config.Agent.Interval.Duration
jitter := a.Config.Agent.CollectionJitter.Duration
// Overwrite agent interval if this plugin has its own.
if input.Config.Interval != 0 {
interval = input.Config.Interval
}
acc := NewAccumulator(input, dst)
acc.SetPrecision(a.Precision())
wg.Add(1)
go func(input *models.RunningInput) {
defer wg.Done()
if a.Config.Agent.RoundInterval {
err := internal.SleepContext(
ctx, internal.AlignDuration(startTime, interval))
if err != nil {
return
}
}
a.gatherOnInterval(ctx, acc, input, interval, jitter)
}(input)
}
wg.Wait()
return nil
}
// gather runs an input's gather function periodically until the context is
// done.
func (a *Agent) gatherOnInterval(
ctx context.Context,
acc telegraf.Accumulator,
input *models.RunningInput,
interval time.Duration,
jitter time.Duration,
) {
defer panicRecover(input)
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
err := internal.SleepContext(ctx, internal.RandomDuration(jitter))
if err != nil {
return
}
err = a.gatherOnce(acc, input, interval)
if err != nil {
acc.AddError(err)
}
select {
case <-ticker.C:
continue
case <-ctx.Done():
return
}
}
}
// gatherOnce runs the input's Gather function once, logging a warning each
// interval it fails to complete before.
func (a *Agent) gatherOnce(
acc telegraf.Accumulator,
input *models.RunningInput,
timeout time.Duration,
) error {
ticker := time.NewTicker(timeout)
defer ticker.Stop()
done := make(chan error)
go func() {
done <- input.Gather(acc)
}()
for {
select {
case err := <-done:
return err
case <-ticker.C:
log.Printf("W! [agent] [%s] did not complete within its interval",
input.LogName())
}
}
}
// runProcessors applies processors to metrics.
func (a *Agent) runProcessors(
src <-chan telegraf.Metric,
agg chan<- telegraf.Metric,
) error {
for metric := range src {
metrics := a.applyProcessors(metric)
for _, metric := range metrics {
agg <- metric
}
}
return nil
}
// applyProcessors applies all processors to a metric.
func (a *Agent) applyProcessors(m telegraf.Metric) []telegraf.Metric {
metrics := []telegraf.Metric{m}
for _, processor := range a.Config.Processors {
metrics = processor.Apply(metrics...)
}
return metrics
}
func updateWindow(start time.Time, roundInterval bool, period time.Duration) (time.Time, time.Time) {
var until time.Time
if roundInterval {
until = internal.AlignTime(start, period)
if until == start {
until = internal.AlignTime(start.Add(time.Nanosecond), period)
}
} else {
until = start.Add(period)
}
since := until.Add(-period)
return since, until
}
// runAggregators adds metrics to the aggregators and triggers their periodic
// push call.
//
// Runs until src is closed and all metrics have been processed. Will call
// push one final time before returning.
func (a *Agent) runAggregators(
startTime time.Time,
src <-chan telegraf.Metric,
dst chan<- telegraf.Metric,
) error {
ctx, cancel := context.WithCancel(context.Background())
// Before calling Add, initialize the aggregation window. This ensures
// that any metric created after start time will be aggregated.
for _, agg := range a.Config.Aggregators {
since, until := updateWindow(startTime, a.Config.Agent.RoundInterval, agg.Period())
agg.UpdateWindow(since, until)
}
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for metric := range src {
var dropOriginal bool
for _, agg := range a.Config.Aggregators {
if ok := agg.Add(metric); ok {
dropOriginal = true
}
}
if !dropOriginal {
dst <- metric
} else {
metric.Drop()
}
}
cancel()
}()
aggregations := make(chan telegraf.Metric, 100)
wg.Add(1)
go func() {
defer wg.Done()
var aggWg sync.WaitGroup
for _, agg := range a.Config.Aggregators {
aggWg.Add(1)
go func(agg *models.RunningAggregator) {
defer aggWg.Done()
acc := NewAccumulator(agg, aggregations)
acc.SetPrecision(a.Precision())
a.push(ctx, agg, acc)
}(agg)
}
aggWg.Wait()
close(aggregations)
}()
for metric := range aggregations {
metrics := a.applyProcessors(metric)
for _, metric := range metrics {
dst <- metric
}
}
wg.Wait()
return nil
}
// push runs the push for a single aggregator every period.
func (a *Agent) push(
ctx context.Context,
aggregator *models.RunningAggregator,
acc telegraf.Accumulator,
) {
for {
// Ensures that Push will be called for each period, even if it has
// already elapsed before this function is called. This is guaranteed
// because so long as only Push updates the EndPeriod. This method
// also avoids drift by not using a ticker.
until := time.Until(aggregator.EndPeriod())
select {
case <-time.After(until):
aggregator.Push(acc)
break
case <-ctx.Done():
aggregator.Push(acc)
return
}
}
}
// runOutputs triggers the periodic write for Outputs.
//
// Runs until src is closed and all metrics have been processed. Will call
// Write one final time before returning.
func (a *Agent) runOutputs(
startTime time.Time,
src <-chan telegraf.Metric,
) error {
interval := a.Config.Agent.FlushInterval.Duration
jitter := a.Config.Agent.FlushJitter.Duration
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
for _, output := range a.Config.Outputs {
interval := interval
// Overwrite agent flush_interval if this plugin has its own.
if output.Config.FlushInterval != 0 {
interval = output.Config.FlushInterval
}
jitter := jitter
// Overwrite agent flush_jitter if this plugin has its own.
if output.Config.FlushJitter != nil {
jitter = *output.Config.FlushJitter
}
wg.Add(1)
go func(output *models.RunningOutput) {
defer wg.Done()
a.flushLoop(ctx, startTime, output, interval, jitter)
}(output)
}
for metric := range src {
for i, output := range a.Config.Outputs {
if i == len(a.Config.Outputs)-1 {
output.AddMetric(metric)
} else {
output.AddMetric(metric.Copy())
}
}
}
log.Println("I! [agent] Hang on, flushing any cached metrics before shutdown")
cancel()
wg.Wait()
return nil
}
// flushLoop runs an output's flush function periodically until the context is
// done.
func (a *Agent) flushLoop(
ctx context.Context,
startTime time.Time,
output *models.RunningOutput,
interval time.Duration,
jitter time.Duration,
) {
logError := func(err error) {
if err != nil {
log.Printf("E! [agent] Error writing to %s: %v", output.LogName(), err)
}
}
// watch for flush requests
flushRequested := make(chan os.Signal, 1)
watchForFlushSignal(flushRequested)
// align to round interval
if a.Config.Agent.RoundInterval {
err := internal.SleepContext(
ctx, internal.AlignDuration(startTime, interval))
if err != nil {
return
}
}
// since we are watching two channels we need a ticker with the jitter
// integrated.
ticker := NewTicker(interval, jitter)
defer ticker.Stop()
for {
// Favor shutdown over other methods.
select {
case <-ctx.Done():
logError(a.flushOnce(output, interval, output.Write))
return
default:
}
select {
case <-ctx.Done():
logError(a.flushOnce(output, interval, output.Write))
return
case <-ticker.C:
logError(a.flushOnce(output, interval, output.Write))
case <-flushRequested:
logError(a.flushOnce(output, interval, output.Write))
case <-output.BatchReady:
// Favor the ticker over batch ready
select {
case <-ticker.C:
logError(a.flushOnce(output, interval, output.Write))
default:
logError(a.flushOnce(output, interval, output.WriteBatch))
}
}
}
}
// flushOnce runs the output's Write function once, logging a warning each
// interval it fails to complete before.
func (a *Agent) flushOnce(
output *models.RunningOutput,
timeout time.Duration,
writeFunc func() error,
) error {
ticker := time.NewTicker(timeout)
defer ticker.Stop()
done := make(chan error)
go func() {
done <- writeFunc()
}()
for {
select {
case err := <-done:
output.LogBufferStatus()
return err
case <-ticker.C:
log.Printf("W! [agent] [%q] did not complete within its flush interval",
output.LogName())
output.LogBufferStatus()
}
}
}
// initPlugins runs the Init function on plugins.
func (a *Agent) initPlugins() error {
for _, input := range a.Config.Inputs {
err := input.Init()
if err != nil {
return fmt.Errorf("could not initialize input %s: %v",
input.LogName(), err)
}
}
for _, processor := range a.Config.Processors {
err := processor.Init()
if err != nil {
return fmt.Errorf("could not initialize processor %s: %v",
processor.Config.Name, err)
}
}
for _, aggregator := range a.Config.Aggregators {
err := aggregator.Init()
if err != nil {
return fmt.Errorf("could not initialize aggregator %s: %v",
aggregator.Config.Name, err)
}
}
for _, output := range a.Config.Outputs {
err := output.Init()
if err != nil {
return fmt.Errorf("could not initialize output %s: %v",
output.Config.Name, err)
}
}
return nil
}
// connectOutputs connects to all outputs.
func (a *Agent) connectOutputs(ctx context.Context) error {
for _, output := range a.Config.Outputs {
log.Printf("D! [agent] Attempting connection to [%s]", output.LogName())
err := output.Output.Connect()
if err != nil {
log.Printf("E! [agent] Failed to connect to [%s], retrying in 15s, "+
"error was '%s'", output.LogName(), err)
err := internal.SleepContext(ctx, 15*time.Second)
if err != nil {
return err
}
err = output.Output.Connect()
if err != nil {
return err
}
}
log.Printf("D! [agent] Successfully connected to %s", output.LogName())
}
return nil
}
// closeOutputs closes all outputs.
func (a *Agent) closeOutputs() {
for _, output := range a.Config.Outputs {
output.Close()
}
}
// startServiceInputs starts all service inputs.
func (a *Agent) startServiceInputs(
ctx context.Context,
dst chan<- telegraf.Metric,
) error {
started := []telegraf.ServiceInput{}
for _, input := range a.Config.Inputs {
if si, ok := input.Input.(telegraf.ServiceInput); ok {
// Service input plugins are not subject to timestamp rounding.
// This only applies to the accumulator passed to Start(), the
// Gather() accumulator does apply rounding according to the
// precision agent setting.
acc := NewAccumulator(input, dst)
acc.SetPrecision(time.Nanosecond)
err := si.Start(acc)
if err != nil {
log.Printf("E! [agent] Service for [%s] failed to start: %v",
input.LogName(), err)
for _, si := range started {
si.Stop()
}
return err
}
started = append(started, si)
}
}
return nil
}
// stopServiceInputs stops all service inputs.
func (a *Agent) stopServiceInputs() {
for _, input := range a.Config.Inputs {
if si, ok := input.Input.(telegraf.ServiceInput); ok {
si.Stop()
}
}
}
// Returns the rounding precision for metrics.
func (a *Agent) Precision() time.Duration {
precision := a.Config.Agent.Precision.Duration
interval := a.Config.Agent.Interval.Duration
if precision > 0 {
return precision
}
switch {
case interval >= time.Second:
return time.Second
case interval >= time.Millisecond:
return time.Millisecond
case interval >= time.Microsecond:
return time.Microsecond
default:
return time.Nanosecond
}
}
// panicRecover displays an error if an input panics.
func panicRecover(input *models.RunningInput) {
if err := recover(); err != nil {
trace := make([]byte, 2048)
runtime.Stack(trace, true)
log.Printf("E! FATAL: [%s] panicked: %s, Stack:\n%s",
input.LogName(), err, trace)
log.Println("E! PLEASE REPORT THIS PANIC ON GITHUB with " +
"stack trace, configuration, and OS information: " +
"https://github.com/influxdata/telegraf/issues/new/choose")
}
}