Don't skip metrics during startup in aggregate phase (#4230)

This commit is contained in:
Piotr Popieluch 2018-06-06 01:30:53 +02:00 committed by Daniel Nelson
parent b8b139678e
commit 9a7b088839
2 changed files with 21 additions and 18 deletions

View File

@ -362,24 +362,6 @@ func (a *Agent) Run(shutdown chan struct{}) error {
metricC := make(chan telegraf.Metric, 100) metricC := make(chan telegraf.Metric, 100)
aggC := make(chan telegraf.Metric, 100) aggC := make(chan telegraf.Metric, 100)
// Start all ServicePlugins
for _, input := range a.Config.Inputs {
input.SetDefaultTags(a.Config.Tags)
switch p := input.Input.(type) {
case telegraf.ServiceInput:
acc := NewAccumulator(input, metricC)
// Service input plugins should set their own precision of their
// metrics.
acc.SetPrecision(time.Nanosecond, 0)
if err := p.Start(acc); err != nil {
log.Printf("E! Service for input %s failed to start, exiting\n%s\n",
input.Name(), err.Error())
return err
}
defer p.Stop()
}
}
// Round collection to nearest interval by sleeping // Round collection to nearest interval by sleeping
if a.Config.Agent.RoundInterval { if a.Config.Agent.RoundInterval {
i := int64(a.Config.Agent.Interval.Duration) i := int64(a.Config.Agent.Interval.Duration)
@ -419,6 +401,25 @@ func (a *Agent) Run(shutdown chan struct{}) error {
}(input, interval) }(input, interval)
} }
// Start all ServicePlugins inputs after all other
// plugins are loaded so that no metrics get dropped
for _, input := range a.Config.Inputs {
input.SetDefaultTags(a.Config.Tags)
switch p := input.Input.(type) {
case telegraf.ServiceInput:
acc := NewAccumulator(input, metricC)
// Service input plugins should set their own precision of their
// metrics.
acc.SetPrecision(time.Nanosecond, 0)
if err := p.Start(acc); err != nil {
log.Printf("E! Service for input %s failed to start, exiting\n%s\n",
input.Name(), err.Error())
return err
}
defer p.Stop()
}
}
wg.Wait() wg.Wait()
a.Close() a.Close()
return nil return nil

View File

@ -1,6 +1,7 @@
package models package models
import ( import (
"log"
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
@ -153,6 +154,7 @@ func (r *RunningAggregator) Run(
m.Time().After(r.periodEnd.Add(truncation).Add(r.Config.Delay)) { m.Time().After(r.periodEnd.Add(truncation).Add(r.Config.Delay)) {
// the metric is outside the current aggregation period, so // the metric is outside the current aggregation period, so
// skip it. // skip it.
log.Printf("D! aggregator: metric \"%s\" is not in the current timewindow, skipping", m.Name())
continue continue
} }
r.add(m) r.add(m)