From dfe7b5eec2d295eb9a7a33bfcabd27bba2a5d62c Mon Sep 17 00:00:00 2001 From: Piotr Popieluch Date: Wed, 6 Jun 2018 01:30:53 +0200 Subject: [PATCH] Don't skip metrics during startup in aggregate phase (#4230) --- agent/agent.go | 37 ++++++++++++++------------- internal/models/running_aggregator.go | 2 ++ 2 files changed, 21 insertions(+), 18 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index f5f23cd05..cfac6ab78 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -362,24 +362,6 @@ func (a *Agent) Run(shutdown chan struct{}) error { metricC := make(chan telegraf.Metric, 100) aggC := make(chan telegraf.Metric, 100) - // Start all ServicePlugins - for _, input := range a.Config.Inputs { - input.SetDefaultTags(a.Config.Tags) - switch p := input.Input.(type) { - case telegraf.ServiceInput: - acc := NewAccumulator(input, metricC) - // Service input plugins should set their own precision of their - // metrics. - acc.SetPrecision(time.Nanosecond, 0) - if err := p.Start(acc); err != nil { - log.Printf("E! Service for input %s failed to start, exiting\n%s\n", - input.Name(), err.Error()) - return err - } - defer p.Stop() - } - } - // Round collection to nearest interval by sleeping if a.Config.Agent.RoundInterval { i := int64(a.Config.Agent.Interval.Duration) @@ -419,6 +401,25 @@ func (a *Agent) Run(shutdown chan struct{}) error { }(input, interval) } + // Start all ServicePlugins inputs after all other + // plugins are loaded so that no metrics get dropped + for _, input := range a.Config.Inputs { + input.SetDefaultTags(a.Config.Tags) + switch p := input.Input.(type) { + case telegraf.ServiceInput: + acc := NewAccumulator(input, metricC) + // Service input plugins should set their own precision of their + // metrics. + acc.SetPrecision(time.Nanosecond, 0) + if err := p.Start(acc); err != nil { + log.Printf("E! Service for input %s failed to start, exiting\n%s\n", + input.Name(), err.Error()) + return err + } + defer p.Stop() + } + } + wg.Wait() a.Close() return nil diff --git a/internal/models/running_aggregator.go b/internal/models/running_aggregator.go index 8189a6667..8cb04e4f6 100644 --- a/internal/models/running_aggregator.go +++ b/internal/models/running_aggregator.go @@ -1,6 +1,7 @@ package models import ( + "log" "time" "github.com/influxdata/telegraf" @@ -153,6 +154,7 @@ func (r *RunningAggregator) Run( m.Time().After(r.periodEnd.Add(truncation).Add(r.Config.Delay)) { // the metric is outside the current aggregation period, so // skip it. + log.Printf("D! aggregator: metric \"%s\" is not in the current timewindow, skipping", m.Name()) continue } r.add(m)