139 lines
2.6 KiB
Go
139 lines
2.6 KiB
Go
package models
|
|
|
|
import (
|
|
"time"
|
|
|
|
"github.com/influxdata/telegraf"
|
|
)
|
|
|
|
type RunningAggregator struct {
|
|
a telegraf.Aggregator
|
|
Config *AggregatorConfig
|
|
|
|
metrics chan telegraf.Metric
|
|
}
|
|
|
|
func NewRunningAggregator(
|
|
a telegraf.Aggregator,
|
|
conf *AggregatorConfig,
|
|
) *RunningAggregator {
|
|
return &RunningAggregator{
|
|
a: a,
|
|
Config: conf,
|
|
metrics: make(chan telegraf.Metric, 100),
|
|
}
|
|
}
|
|
|
|
// AggregatorConfig containing configuration parameters for the running
|
|
// aggregator plugin.
|
|
type AggregatorConfig struct {
|
|
Name string
|
|
|
|
DropOriginal bool
|
|
NameOverride string
|
|
MeasurementPrefix string
|
|
MeasurementSuffix string
|
|
Tags map[string]string
|
|
Filter Filter
|
|
|
|
Period time.Duration
|
|
Delay time.Duration
|
|
}
|
|
|
|
func (r *RunningAggregator) Name() string {
|
|
return "aggregators." + r.Config.Name
|
|
}
|
|
|
|
func (r *RunningAggregator) MakeMetric(
|
|
measurement string,
|
|
fields map[string]interface{},
|
|
tags map[string]string,
|
|
mType telegraf.ValueType,
|
|
t time.Time,
|
|
) telegraf.Metric {
|
|
m := makemetric(
|
|
measurement,
|
|
fields,
|
|
tags,
|
|
r.Config.NameOverride,
|
|
r.Config.MeasurementPrefix,
|
|
r.Config.MeasurementSuffix,
|
|
r.Config.Tags,
|
|
nil,
|
|
r.Config.Filter,
|
|
false,
|
|
false,
|
|
mType,
|
|
t,
|
|
)
|
|
|
|
m.SetAggregate(true)
|
|
|
|
return m
|
|
}
|
|
|
|
// Add applies the given metric to the aggregator.
|
|
// Before applying to the plugin, it will run any defined filters on the metric.
|
|
// Apply returns true if the original metric should be dropped.
|
|
func (r *RunningAggregator) Add(in telegraf.Metric) bool {
|
|
if r.Config.Filter.IsActive() {
|
|
// check if the aggregator should apply this metric
|
|
name := in.Name()
|
|
fields := in.Fields()
|
|
tags := in.Tags()
|
|
t := in.Time()
|
|
if ok := r.Config.Filter.Apply(name, fields, tags); !ok {
|
|
// aggregator should not apply this metric
|
|
return false
|
|
}
|
|
|
|
in, _ = telegraf.NewMetric(name, tags, fields, t)
|
|
}
|
|
|
|
r.metrics <- in
|
|
return r.Config.DropOriginal
|
|
}
|
|
func (r *RunningAggregator) add(in telegraf.Metric) {
|
|
r.a.Add(in)
|
|
}
|
|
|
|
func (r *RunningAggregator) push(acc telegraf.Accumulator) {
|
|
r.a.Push(acc)
|
|
}
|
|
|
|
func (r *RunningAggregator) reset() {
|
|
r.a.Reset()
|
|
}
|
|
|
|
func (r *RunningAggregator) Run(
|
|
acc telegraf.Accumulator,
|
|
shutdown chan struct{},
|
|
) {
|
|
if r.Config.Delay == 0 {
|
|
r.Config.Delay = time.Millisecond * 100
|
|
}
|
|
if r.Config.Period == 0 {
|
|
r.Config.Period = time.Second * 30
|
|
}
|
|
|
|
time.Sleep(r.Config.Delay)
|
|
periodT := time.NewTicker(r.Config.Period)
|
|
defer periodT.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-shutdown:
|
|
if len(r.metrics) > 0 {
|
|
// wait until metrics are flushed before exiting
|
|
continue
|
|
}
|
|
return
|
|
case m := <-r.metrics:
|
|
r.add(m)
|
|
case <-periodT.C:
|
|
r.push(acc)
|
|
r.reset()
|
|
}
|
|
}
|
|
}
|