165 lines
3.6 KiB
Go
165 lines
3.6 KiB
Go
package models
|
|
|
|
import (
|
|
"time"
|
|
|
|
"github.com/influxdata/telegraf"
|
|
"github.com/influxdata/telegraf/metric"
|
|
)
|
|
|
|
type RunningAggregator struct {
|
|
a telegraf.Aggregator
|
|
Config *AggregatorConfig
|
|
|
|
metrics chan telegraf.Metric
|
|
|
|
periodStart time.Time
|
|
periodEnd time.Time
|
|
}
|
|
|
|
func NewRunningAggregator(
|
|
a telegraf.Aggregator,
|
|
conf *AggregatorConfig,
|
|
) *RunningAggregator {
|
|
return &RunningAggregator{
|
|
a: a,
|
|
Config: conf,
|
|
metrics: make(chan telegraf.Metric, 100),
|
|
}
|
|
}
|
|
|
|
// AggregatorConfig containing configuration parameters for the running
|
|
// aggregator plugin.
|
|
type AggregatorConfig struct {
|
|
Name string
|
|
|
|
DropOriginal bool
|
|
NameOverride string
|
|
MeasurementPrefix string
|
|
MeasurementSuffix string
|
|
Tags map[string]string
|
|
Filter Filter
|
|
|
|
Period time.Duration
|
|
Delay time.Duration
|
|
}
|
|
|
|
func (r *RunningAggregator) Name() string {
|
|
return "aggregators." + r.Config.Name
|
|
}
|
|
|
|
func (r *RunningAggregator) MakeMetric(
|
|
measurement string,
|
|
fields map[string]interface{},
|
|
tags map[string]string,
|
|
mType telegraf.ValueType,
|
|
t time.Time,
|
|
) telegraf.Metric {
|
|
m := makemetric(
|
|
measurement,
|
|
fields,
|
|
tags,
|
|
r.Config.NameOverride,
|
|
r.Config.MeasurementPrefix,
|
|
r.Config.MeasurementSuffix,
|
|
r.Config.Tags,
|
|
nil,
|
|
r.Config.Filter,
|
|
false,
|
|
mType,
|
|
t,
|
|
)
|
|
|
|
m.SetAggregate(true)
|
|
|
|
return m
|
|
}
|
|
|
|
// Add applies the given metric to the aggregator.
|
|
// Before applying to the plugin, it will run any defined filters on the metric.
|
|
// Apply returns true if the original metric should be dropped.
|
|
func (r *RunningAggregator) Add(in telegraf.Metric) bool {
|
|
if r.Config.Filter.IsActive() {
|
|
// check if the aggregator should apply this metric
|
|
name := in.Name()
|
|
fields := in.Fields()
|
|
tags := in.Tags()
|
|
t := in.Time()
|
|
if ok := r.Config.Filter.Apply(name, fields, tags); !ok {
|
|
// aggregator should not apply this metric
|
|
return false
|
|
}
|
|
|
|
in, _ = metric.New(name, tags, fields, t)
|
|
}
|
|
|
|
r.metrics <- in
|
|
return r.Config.DropOriginal
|
|
}
|
|
func (r *RunningAggregator) add(in telegraf.Metric) {
|
|
r.a.Add(in)
|
|
}
|
|
|
|
func (r *RunningAggregator) push(acc telegraf.Accumulator) {
|
|
r.a.Push(acc)
|
|
}
|
|
|
|
func (r *RunningAggregator) reset() {
|
|
r.a.Reset()
|
|
}
|
|
|
|
// Run runs the running aggregator, listens for incoming metrics, and waits
|
|
// for period ticks to tell it when to push and reset the aggregator.
|
|
func (r *RunningAggregator) Run(
|
|
acc telegraf.Accumulator,
|
|
shutdown chan struct{},
|
|
) {
|
|
// The start of the period is truncated to the nearest second.
|
|
//
|
|
// Every metric then gets it's timestamp checked and is dropped if it
|
|
// is not within:
|
|
//
|
|
// start < t < end + truncation + delay
|
|
//
|
|
// So if we start at now = 00:00.2 with a 10s period and 0.3s delay:
|
|
// now = 00:00.2
|
|
// start = 00:00
|
|
// truncation = 00:00.2
|
|
// end = 00:10
|
|
// 1st interval: 00:00 - 00:10.5
|
|
// 2nd interval: 00:10 - 00:20.5
|
|
// etc.
|
|
//
|
|
now := time.Now()
|
|
r.periodStart = now.Truncate(time.Second)
|
|
truncation := now.Sub(r.periodStart)
|
|
r.periodEnd = r.periodStart.Add(r.Config.Period)
|
|
time.Sleep(r.Config.Delay)
|
|
periodT := time.NewTicker(r.Config.Period)
|
|
defer periodT.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-shutdown:
|
|
if len(r.metrics) > 0 {
|
|
// wait until metrics are flushed before exiting
|
|
continue
|
|
}
|
|
return
|
|
case m := <-r.metrics:
|
|
if m.Time().Before(r.periodStart) ||
|
|
m.Time().After(r.periodEnd.Add(truncation).Add(r.Config.Delay)) {
|
|
// the metric is outside the current aggregation period, so
|
|
// skip it.
|
|
continue
|
|
}
|
|
r.add(m)
|
|
case <-periodT.C:
|
|
r.periodStart = r.periodEnd
|
|
r.periodEnd = r.periodStart.Add(r.Config.Period)
|
|
r.push(acc)
|
|
r.reset()
|
|
}
|
|
}
|
|
}
|