Don't skip metrics during startup in aggregate phase (#4230)
This commit is contained in:
parent
92a8f795f5
commit
dfe7b5eec2
|
@ -362,24 +362,6 @@ func (a *Agent) Run(shutdown chan struct{}) error {
|
||||||
metricC := make(chan telegraf.Metric, 100)
|
metricC := make(chan telegraf.Metric, 100)
|
||||||
aggC := make(chan telegraf.Metric, 100)
|
aggC := make(chan telegraf.Metric, 100)
|
||||||
|
|
||||||
// Start all ServicePlugins
|
|
||||||
for _, input := range a.Config.Inputs {
|
|
||||||
input.SetDefaultTags(a.Config.Tags)
|
|
||||||
switch p := input.Input.(type) {
|
|
||||||
case telegraf.ServiceInput:
|
|
||||||
acc := NewAccumulator(input, metricC)
|
|
||||||
// Service input plugins should set their own precision of their
|
|
||||||
// metrics.
|
|
||||||
acc.SetPrecision(time.Nanosecond, 0)
|
|
||||||
if err := p.Start(acc); err != nil {
|
|
||||||
log.Printf("E! Service for input %s failed to start, exiting\n%s\n",
|
|
||||||
input.Name(), err.Error())
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer p.Stop()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Round collection to nearest interval by sleeping
|
// Round collection to nearest interval by sleeping
|
||||||
if a.Config.Agent.RoundInterval {
|
if a.Config.Agent.RoundInterval {
|
||||||
i := int64(a.Config.Agent.Interval.Duration)
|
i := int64(a.Config.Agent.Interval.Duration)
|
||||||
|
@ -419,6 +401,25 @@ func (a *Agent) Run(shutdown chan struct{}) error {
|
||||||
}(input, interval)
|
}(input, interval)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Start all ServicePlugins inputs after all other
|
||||||
|
// plugins are loaded so that no metrics get dropped
|
||||||
|
for _, input := range a.Config.Inputs {
|
||||||
|
input.SetDefaultTags(a.Config.Tags)
|
||||||
|
switch p := input.Input.(type) {
|
||||||
|
case telegraf.ServiceInput:
|
||||||
|
acc := NewAccumulator(input, metricC)
|
||||||
|
// Service input plugins should set their own precision of their
|
||||||
|
// metrics.
|
||||||
|
acc.SetPrecision(time.Nanosecond, 0)
|
||||||
|
if err := p.Start(acc); err != nil {
|
||||||
|
log.Printf("E! Service for input %s failed to start, exiting\n%s\n",
|
||||||
|
input.Name(), err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer p.Stop()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
a.Close()
|
a.Close()
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package models
|
package models
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"log"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
|
@ -153,6 +154,7 @@ func (r *RunningAggregator) Run(
|
||||||
m.Time().After(r.periodEnd.Add(truncation).Add(r.Config.Delay)) {
|
m.Time().After(r.periodEnd.Add(truncation).Add(r.Config.Delay)) {
|
||||||
// the metric is outside the current aggregation period, so
|
// the metric is outside the current aggregation period, so
|
||||||
// skip it.
|
// skip it.
|
||||||
|
log.Printf("D! aggregator: metric \"%s\" is not in the current timewindow, skipping", m.Name())
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
r.add(m)
|
r.add(m)
|
||||||
|
|
Loading…
Reference in New Issue