By the time aggregator.run() was called, about 600ms had already passed since `now` was set, which skewed the aggregation intervals and caused metrics to be skipped.
167 lines
3.6 KiB
Go
167 lines
3.6 KiB
Go
package models
|
|
|
|
import (
|
|
"time"
|
|
|
|
"github.com/influxdata/telegraf"
|
|
"github.com/influxdata/telegraf/metric"
|
|
)
|
|
|
|
type RunningAggregator struct {
|
|
a telegraf.Aggregator
|
|
Config *AggregatorConfig
|
|
|
|
metrics chan telegraf.Metric
|
|
|
|
periodStart time.Time
|
|
periodEnd time.Time
|
|
}
|
|
|
|
func NewRunningAggregator(
|
|
a telegraf.Aggregator,
|
|
conf *AggregatorConfig,
|
|
) *RunningAggregator {
|
|
return &RunningAggregator{
|
|
a: a,
|
|
Config: conf,
|
|
metrics: make(chan telegraf.Metric, 100),
|
|
}
|
|
}
|
|
|
|
// AggregatorConfig containing configuration parameters for the running
|
|
// aggregator plugin.
|
|
type AggregatorConfig struct {
|
|
Name string
|
|
|
|
DropOriginal bool
|
|
NameOverride string
|
|
MeasurementPrefix string
|
|
MeasurementSuffix string
|
|
Tags map[string]string
|
|
Filter Filter
|
|
|
|
Period time.Duration
|
|
Delay time.Duration
|
|
}
|
|
|
|
func (r *RunningAggregator) Name() string {
|
|
return "aggregators." + r.Config.Name
|
|
}
|
|
|
|
func (r *RunningAggregator) MakeMetric(
|
|
measurement string,
|
|
fields map[string]interface{},
|
|
tags map[string]string,
|
|
mType telegraf.ValueType,
|
|
t time.Time,
|
|
) telegraf.Metric {
|
|
m := makemetric(
|
|
measurement,
|
|
fields,
|
|
tags,
|
|
r.Config.NameOverride,
|
|
r.Config.MeasurementPrefix,
|
|
r.Config.MeasurementSuffix,
|
|
r.Config.Tags,
|
|
nil,
|
|
r.Config.Filter,
|
|
false,
|
|
mType,
|
|
t,
|
|
)
|
|
|
|
if m != nil {
|
|
m.SetAggregate(true)
|
|
}
|
|
|
|
return m
|
|
}
|
|
|
|
// Add applies the given metric to the aggregator.
|
|
// Before applying to the plugin, it will run any defined filters on the metric.
|
|
// Apply returns true if the original metric should be dropped.
|
|
func (r *RunningAggregator) Add(in telegraf.Metric) bool {
|
|
if r.Config.Filter.IsActive() {
|
|
// check if the aggregator should apply this metric
|
|
name := in.Name()
|
|
fields := in.Fields()
|
|
tags := in.Tags()
|
|
t := in.Time()
|
|
if ok := r.Config.Filter.Apply(name, fields, tags); !ok {
|
|
// aggregator should not apply this metric
|
|
return false
|
|
}
|
|
|
|
in, _ = metric.New(name, tags, fields, t)
|
|
}
|
|
|
|
r.metrics <- in
|
|
return r.Config.DropOriginal
|
|
}
|
|
func (r *RunningAggregator) add(in telegraf.Metric) {
|
|
r.a.Add(in)
|
|
}
|
|
|
|
func (r *RunningAggregator) push(acc telegraf.Accumulator) {
|
|
r.a.Push(acc)
|
|
}
|
|
|
|
func (r *RunningAggregator) reset() {
|
|
r.a.Reset()
|
|
}
|
|
|
|
// Run runs the running aggregator, listens for incoming metrics, and waits
|
|
// for period ticks to tell it when to push and reset the aggregator.
|
|
func (r *RunningAggregator) Run(
|
|
acc telegraf.Accumulator,
|
|
shutdown chan struct{},
|
|
) {
|
|
// The start of the period is truncated to the nearest second.
|
|
//
|
|
// Every metric then gets it's timestamp checked and is dropped if it
|
|
// is not within:
|
|
//
|
|
// start < t < end + truncation + delay
|
|
//
|
|
// So if we start at now = 00:00.2 with a 10s period and 0.3s delay:
|
|
// now = 00:00.2
|
|
// start = 00:00
|
|
// truncation = 00:00.2
|
|
// end = 00:10
|
|
// 1st interval: 00:00 - 00:10.5
|
|
// 2nd interval: 00:10 - 00:20.5
|
|
// etc.
|
|
//
|
|
now := time.Now()
|
|
r.periodStart = now.Truncate(time.Second)
|
|
truncation := now.Sub(r.periodStart)
|
|
r.periodEnd = r.periodStart.Add(r.Config.Period)
|
|
time.Sleep(r.Config.Delay)
|
|
periodT := time.NewTicker(r.Config.Period)
|
|
defer periodT.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-shutdown:
|
|
if len(r.metrics) > 0 {
|
|
// wait until metrics are flushed before exiting
|
|
continue
|
|
}
|
|
return
|
|
case m := <-r.metrics:
|
|
if m.Time().Before(r.periodStart) ||
|
|
m.Time().After(r.periodEnd.Add(truncation).Add(r.Config.Delay)) {
|
|
// the metric is outside the current aggregation period, so
|
|
// skip it.
|
|
continue
|
|
}
|
|
r.add(m)
|
|
case <-periodT.C:
|
|
r.periodStart = r.periodEnd
|
|
r.periodEnd = r.periodStart.Add(r.Config.Period)
|
|
r.push(acc)
|
|
r.reset()
|
|
}
|
|
}
|
|
}
|