Refactor handling of MinMax functionality into RunningAggregator

allows for easier addition of a sliding window at a later time.

Also makes `period` be a generic argument for all aggregator plugins.
This commit is contained in:
Cameron Sparr 2016-09-22 18:10:51 +01:00
parent ef885eda62
commit fead80844e
7 changed files with 252 additions and 297 deletions

View File

@ -259,9 +259,6 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er
for {
select {
case <-shutdown:
for _, agg := range a.Config.Aggregators {
agg.Aggregator.Stop()
}
if len(outMetricC) > 0 {
// keep going until outMetricC is flushed
continue
@ -273,7 +270,7 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er
var dropOriginal bool
if !m.IsAggregate() {
for _, agg := range a.Config.Aggregators {
if ok := agg.Apply(copyMetric(m)); ok {
if ok := agg.Add(copyMetric(m)); ok {
dropOriginal = true
}
}
@ -315,22 +312,6 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er
}
}
func copyMetric(m telegraf.Metric) telegraf.Metric {
t := time.Time(m.Time())
tags := make(map[string]string)
fields := make(map[string]interface{})
for k, v := range m.Tags() {
tags[k] = v
}
for k, v := range m.Fields() {
fields[k] = v
}
out, _ := telegraf.NewMetric(m.Name(), tags, fields, t)
return out
}
// Run runs the agent daemon, gathering every Interval
func (a *Agent) Run(shutdown chan struct{}) error {
var wg sync.WaitGroup
@ -367,18 +348,6 @@ func (a *Agent) Run(shutdown chan struct{}) error {
time.Sleep(time.Duration(i - (time.Now().UnixNano() % i)))
}
// Start all Aggregators
for _, aggregator := range a.Config.Aggregators {
acc := NewAccumulator(aggregator, metricC)
acc.SetPrecision(a.Config.Agent.Precision.Duration,
a.Config.Agent.Interval.Duration)
if err := aggregator.Aggregator.Start(acc); err != nil {
log.Printf("[%s] failed to start, exiting\n%s\n",
aggregator.Name(), err.Error())
return err
}
}
wg.Add(1)
go func() {
defer wg.Done()
@ -403,6 +372,33 @@ func (a *Agent) Run(shutdown chan struct{}) error {
}(input, interval)
}
wg.Add(len(a.Config.Aggregators))
for _, aggregator := range a.Config.Aggregators {
go func(agg *models.RunningAggregator) {
defer wg.Done()
acc := NewAccumulator(agg, metricC)
acc.SetPrecision(a.Config.Agent.Precision.Duration,
a.Config.Agent.Interval.Duration)
agg.Run(acc, shutdown)
}(aggregator)
}
wg.Wait()
return nil
}
func copyMetric(m telegraf.Metric) telegraf.Metric {
t := time.Time(m.Time())
tags := make(map[string]string)
fields := make(map[string]interface{})
for k, v := range m.Tags() {
tags[k] = v
}
for k, v := range m.Fields() {
fields[k] = v
}
out, _ := telegraf.NewMetric(m.Name(), tags, fields, t)
return out
}

View File

@ -1,16 +1,22 @@
package telegraf
// Aggregator is an interface for implementing an Aggregator plugin.
// the RunningAggregator wraps this interface and guarantees that
// Add, Push, and Reset can not be called concurrently, so locking is not
// required when implementing an Aggregator plugin.
type Aggregator interface {
// SampleConfig returns the default configuration of the Input
// SampleConfig returns the default configuration of the Input.
SampleConfig() string
// Description returns a one-sentence description on the Input
// Description returns a one-sentence description on the Input.
Description() string
// Apply the metric to the aggregator
Apply(in Metric)
// Add the metric to the aggregator.
Add(in Metric)
// Start starts the service filter with the given accumulator
Start(acc Accumulator) error
Stop()
// Push pushes the current aggregates to the accumulator.
Push(acc Accumulator)
// Reset resets the aggregators caches and aggregates.
Reset()
}

View File

@ -693,7 +693,7 @@ func (c *Config) addAggregator(name string, table *ast.Table) error {
}
aggregator := creator()
aggregatorConfig, err := buildAggregator(name, table)
conf, err := buildAggregator(name, table)
if err != nil {
return err
}
@ -702,12 +702,7 @@ func (c *Config) addAggregator(name string, table *ast.Table) error {
return err
}
rf := &models.RunningAggregator{
Aggregator: aggregator,
Config: aggregatorConfig,
}
c.Aggregators = append(c.Aggregators, rf)
c.Aggregators = append(c.Aggregators, models.NewRunningAggregator(aggregator, conf))
return nil
}
@ -818,7 +813,6 @@ func (c *Config) addInput(name string, table *ast.Table) error {
// buildAggregator TODO doc
func buildAggregator(name string, tbl *ast.Table) (*models.AggregatorConfig, error) {
conf := &models.AggregatorConfig{Name: name}
unsupportedFields := []string{"tagexclude", "taginclude"}
for _, field := range unsupportedFields {
if _, ok := tbl.Fields[field]; ok {
@ -826,6 +820,34 @@ func buildAggregator(name string, tbl *ast.Table) (*models.AggregatorConfig, err
}
}
conf := &models.AggregatorConfig{Name: name}
if node, ok := tbl.Fields["period"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
dur, err := time.ParseDuration(str.Value)
if err != nil {
return nil, err
}
conf.Period = dur
}
}
}
if node, ok := tbl.Fields["delay"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
dur, err := time.ParseDuration(str.Value)
if err != nil {
return nil, err
}
conf.Delay = dur
}
}
}
if node, ok := tbl.Fields["drop_original"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if b, ok := kv.Value.(*ast.Boolean); ok {
@ -871,6 +893,8 @@ func buildAggregator(name string, tbl *ast.Table) (*models.AggregatorConfig, err
}
}
delete(tbl.Fields, "period")
delete(tbl.Fields, "delay")
delete(tbl.Fields, "drop_original")
delete(tbl.Fields, "name_prefix")
delete(tbl.Fields, "name_suffix")

View File

@ -7,8 +7,21 @@ import (
)
type RunningAggregator struct {
Aggregator telegraf.Aggregator
a telegraf.Aggregator
Config *AggregatorConfig
metrics chan telegraf.Metric
}
func NewRunningAggregator(
a telegraf.Aggregator,
conf *AggregatorConfig,
) *RunningAggregator {
return &RunningAggregator{
a: a,
Config: conf,
metrics: make(chan telegraf.Metric, 100),
}
}
// AggregatorConfig containing configuration parameters for the running
@ -22,6 +35,9 @@ type AggregatorConfig struct {
MeasurementSuffix string
Tags map[string]string
Filter Filter
Period time.Duration
Delay time.Duration
}
func (r *RunningAggregator) Name() string {
@ -56,10 +72,10 @@ func (r *RunningAggregator) MakeMetric(
return m
}
// Apply applies the given metric to the aggregator.
// Add applies the given metric to the aggregator.
// Before applying to the plugin, it will run any defined filters on the metric.
// Apply returns true if the original metric should be dropped.
func (r *RunningAggregator) Apply(in telegraf.Metric) bool {
func (r *RunningAggregator) Add(in telegraf.Metric) bool {
if r.Config.Filter.IsActive() {
// check if the aggregator should apply this metric
name := in.Name()
@ -74,6 +90,49 @@ func (r *RunningAggregator) Apply(in telegraf.Metric) bool {
in, _ = telegraf.NewMetric(name, tags, fields, t)
}
r.Aggregator.Apply(in)
r.metrics <- in
return r.Config.DropOriginal
}
func (r *RunningAggregator) add(in telegraf.Metric) {
r.a.Add(in)
}
func (r *RunningAggregator) push(acc telegraf.Accumulator) {
r.a.Push(acc)
}
func (r *RunningAggregator) reset() {
r.a.Reset()
}
func (r *RunningAggregator) Run(
acc telegraf.Accumulator,
shutdown chan struct{},
) {
if r.Config.Delay == 0 {
r.Config.Delay = time.Millisecond * 100
}
if r.Config.Period == 0 {
r.Config.Period = time.Second * 30
}
time.Sleep(r.Config.Delay)
periodT := time.NewTicker(r.Config.Period)
defer periodT.Stop()
for {
select {
case <-shutdown:
if len(r.metrics) > 0 {
// wait until metrics are flushed before exiting
continue
}
return
case m := <-r.metrics:
r.add(m)
case <-periodT.C:
r.push(acc)
r.reset()
}
}
}

View File

@ -2,26 +2,28 @@ package models
import (
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
)
func TestApply(t *testing.T) {
func TestAdd(t *testing.T) {
a := &TestAggregator{}
ra := RunningAggregator{
Config: &AggregatorConfig{
ra := NewRunningAggregator(a, &AggregatorConfig{
Name: "TestRunningAggregator",
Filter: Filter{
NamePass: []string{"*"},
},
},
Aggregator: a,
}
})
assert.NoError(t, ra.Config.Filter.Compile())
acc := testutil.Accumulator{}
go ra.Run(&acc, make(chan struct{}))
m := ra.MakeMetric(
"RITest",
@ -30,21 +32,64 @@ func TestApply(t *testing.T) {
telegraf.Untyped,
time.Now(),
)
assert.False(t, ra.Apply(m))
assert.Equal(t, int64(101), a.sum)
assert.False(t, ra.Add(m))
for {
if atomic.LoadInt64(&a.sum) > 0 {
break
}
}
assert.Equal(t, int64(101), atomic.LoadInt64(&a.sum))
}
func TestApplyDropOriginal(t *testing.T) {
ra := RunningAggregator{
Config: &AggregatorConfig{
func TestAddAndPushOnePeriod(t *testing.T) {
a := &TestAggregator{}
ra := NewRunningAggregator(a, &AggregatorConfig{
Name: "TestRunningAggregator",
Filter: Filter{
NamePass: []string{"*"},
},
Period: time.Millisecond * 500,
})
assert.NoError(t, ra.Config.Filter.Compile())
acc := testutil.Accumulator{}
shutdown := make(chan struct{})
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
ra.Run(&acc, shutdown)
}()
m := ra.MakeMetric(
"RITest",
map[string]interface{}{"value": int(101)},
map[string]string{},
telegraf.Untyped,
time.Now(),
)
assert.False(t, ra.Add(m))
for {
if acc.NMetrics() > 0 {
break
}
}
acc.AssertContainsFields(t, "TestMetric", map[string]interface{}{"sum": int64(101)})
close(shutdown)
wg.Wait()
}
func TestAddDropOriginal(t *testing.T) {
ra := NewRunningAggregator(&TestAggregator{}, &AggregatorConfig{
Name: "TestRunningAggregator",
Filter: Filter{
NamePass: []string{"RI*"},
},
DropOriginal: true,
},
Aggregator: &TestAggregator{},
}
})
assert.NoError(t, ra.Config.Filter.Compile())
m := ra.MakeMetric(
@ -54,9 +99,9 @@ func TestApplyDropOriginal(t *testing.T) {
telegraf.Untyped,
time.Now(),
)
assert.True(t, ra.Apply(m))
assert.True(t, ra.Add(m))
// this metric name doesn't match the filter, so Apply will return false
// this metric name doesn't match the filter, so Add will return false
m2 := ra.MakeMetric(
"foobar",
map[string]interface{}{"value": int(101)},
@ -64,17 +109,15 @@ func TestApplyDropOriginal(t *testing.T) {
telegraf.Untyped,
time.Now(),
)
assert.False(t, ra.Apply(m2))
assert.False(t, ra.Add(m2))
}
// make an untyped, counter, & gauge metric
func TestMakeMetricA(t *testing.T) {
now := time.Now()
ra := RunningAggregator{
Config: &AggregatorConfig{
ra := NewRunningAggregator(&TestAggregator{}, &AggregatorConfig{
Name: "TestRunningAggregator",
},
}
})
assert.Equal(t, "aggregators.TestRunningAggregator", ra.Name())
m := ra.MakeMetric(
@ -138,13 +181,19 @@ type TestAggregator struct {
func (t *TestAggregator) Description() string { return "" }
func (t *TestAggregator) SampleConfig() string { return "" }
func (t *TestAggregator) Start(acc telegraf.Accumulator) error { return nil }
func (t *TestAggregator) Stop() {}
func (t *TestAggregator) Reset() {}
func (t *TestAggregator) Apply(in telegraf.Metric) {
func (t *TestAggregator) Push(acc telegraf.Accumulator) {
acc.AddFields("TestMetric",
map[string]interface{}{"sum": t.sum},
map[string]string{},
)
}
func (t *TestAggregator) Add(in telegraf.Metric) {
for _, v := range in.Fields() {
if vi, ok := v.(int64); ok {
t.sum += vi
atomic.AddInt64(&t.sum, vi)
}
}
}

View File

@ -1,28 +1,21 @@
package minmax
import (
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/aggregators"
)
type MinMax struct {
Period internal.Duration
// metrics waiting to be processed
metrics chan telegraf.Metric
shutdown chan struct{}
wg sync.WaitGroup
// caches for metric fields, names, and tags
fieldCache map[uint64]map[string]minmax
nameCache map[uint64]string
tagCache map[uint64]map[string]string
}
acc telegraf.Accumulator
func NewMinMax() telegraf.Aggregator {
mm := &MinMax{}
mm.Reset()
return mm
}
type minmax struct {
@ -43,11 +36,7 @@ func (m *MinMax) Description() string {
return "Keep the aggregate min/max of each metric passing through."
}
func (m *MinMax) Apply(in telegraf.Metric) {
m.metrics <- in
}
func (m *MinMax) apply(in telegraf.Metric) {
func (m *MinMax) Add(in telegraf.Metric) {
id := in.HashID()
if _, ok := m.nameCache[id]; !ok {
// hit an uncached metric, create caches for first time:
@ -90,84 +79,23 @@ func (m *MinMax) apply(in telegraf.Metric) {
}
}
func (m *MinMax) Start(acc telegraf.Accumulator) error {
m.metrics = make(chan telegraf.Metric, 10)
m.shutdown = make(chan struct{})
m.clearCache()
m.acc = acc
m.wg.Add(1)
if m.Period.Duration > 0 {
go m.periodHandler()
} else {
go m.continuousHandler()
}
return nil
}
func (m *MinMax) Stop() {
close(m.shutdown)
m.wg.Wait()
}
func (m *MinMax) addfields(id uint64) {
func (m *MinMax) Push(acc telegraf.Accumulator) {
for id, _ := range m.nameCache {
fields := map[string]interface{}{}
for k, v := range m.fieldCache[id] {
fields[k+"_min"] = v.min
fields[k+"_max"] = v.max
}
m.acc.AddFields(m.nameCache[id], fields, m.tagCache[id])
acc.AddFields(m.nameCache[id], fields, m.tagCache[id])
}
}
func (m *MinMax) clearCache() {
func (m *MinMax) Reset() {
m.fieldCache = make(map[uint64]map[string]minmax)
m.nameCache = make(map[uint64]string)
m.tagCache = make(map[uint64]map[string]string)
}
// periodHandler only adds the aggregate metrics on the configured Period.
// thus if telegraf's collection interval is 10s, and period is 30s, there
// will only be one aggregate sent every 3 metrics.
func (m *MinMax) periodHandler() {
// TODO make this sleep less of a hack!
time.Sleep(time.Millisecond * 200)
defer m.wg.Done()
ticker := time.NewTicker(m.Period.Duration)
defer ticker.Stop()
for {
select {
case in := <-m.metrics:
m.apply(in)
case <-m.shutdown:
if len(m.metrics) > 0 {
continue
}
return
case <-ticker.C:
for id, _ := range m.nameCache {
m.addfields(id)
}
m.clearCache()
}
}
}
// continuousHandler sends one metric for every metric that passes through it.
func (m *MinMax) continuousHandler() {
defer m.wg.Done()
for {
select {
case in := <-m.metrics:
m.apply(in)
m.addfields(in.HashID())
case <-m.shutdown:
if len(m.metrics) > 0 {
continue
}
return
}
}
}
func compare(a, b float64) int {
if a < b {
return -1
@ -190,6 +118,6 @@ func convert(in interface{}) (float64, bool) {
func init() {
aggregators.Add("minmax", func() telegraf.Aggregator {
return &MinMax{}
return NewMinMax()
})
}

View File

@ -5,10 +5,7 @@ import (
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
)
var m1, _ = telegraf.NewMetric("m1",
@ -48,34 +45,22 @@ var m2, _ = telegraf.NewMetric("m1",
)
func BenchmarkApply(b *testing.B) {
minmax := MinMax{}
minmax.clearCache()
minmax := NewMinMax()
for n := 0; n < b.N; n++ {
minmax.apply(m1)
minmax.apply(m2)
minmax.Add(m1)
minmax.Add(m2)
}
}
// Test two metrics getting added, when running with a period, and the metrics
// are added in the same period.
// Test two metrics getting added.
func TestMinMaxWithPeriod(t *testing.T) {
acc := testutil.Accumulator{}
minmax := MinMax{
Period: internal.Duration{Duration: time.Millisecond * 500},
}
assert.NoError(t, minmax.Start(&acc))
defer minmax.Stop()
minmax := NewMinMax()
minmax.Apply(m1)
minmax.Apply(m2)
for {
if acc.NMetrics() > 0 {
break
}
time.Sleep(time.Millisecond)
}
minmax.Add(m1)
minmax.Add(m2)
minmax.Push(&acc)
expectedFields := map[string]interface{}{
"a_max": float64(1),
@ -107,23 +92,14 @@ func TestMinMaxWithPeriod(t *testing.T) {
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
}
// Test two metrics getting added, when running with a period, and the metrics
// are added in two different periods.
// Test two metrics getting added with a push/reset in between (simulates
// getting added in different periods.)
func TestMinMaxDifferentPeriods(t *testing.T) {
acc := testutil.Accumulator{}
minmax := MinMax{
Period: internal.Duration{Duration: time.Millisecond * 100},
}
assert.NoError(t, minmax.Start(&acc))
defer minmax.Stop()
minmax := NewMinMax()
minmax.Apply(m1)
for {
if acc.NMetrics() > 0 {
break
}
time.Sleep(time.Millisecond)
}
minmax.Add(m1)
minmax.Push(&acc)
expectedFields := map[string]interface{}{
"a_max": float64(1),
"a_min": float64(1),
@ -152,13 +128,9 @@ func TestMinMaxDifferentPeriods(t *testing.T) {
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
acc.ClearMetrics()
minmax.Apply(m2)
for {
if acc.NMetrics() > 0 {
break
}
time.Sleep(time.Millisecond)
}
minmax.Reset()
minmax.Add(m2)
minmax.Push(&acc)
expectedFields = map[string]interface{}{
"a_max": float64(1),
"a_min": float64(1),
@ -188,82 +160,3 @@ func TestMinMaxDifferentPeriods(t *testing.T) {
}
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
}
// Test two metrics getting added, when running without a period.
func TestMinMaxWithoutPeriod(t *testing.T) {
acc := testutil.Accumulator{}
minmax := MinMax{}
assert.NoError(t, minmax.Start(&acc))
defer minmax.Stop()
minmax.Apply(m1)
for {
if acc.NMetrics() > 0 {
break
}
time.Sleep(time.Millisecond)
}
expectedFields := map[string]interface{}{
"a_max": float64(1),
"a_min": float64(1),
"b_max": float64(1),
"b_min": float64(1),
"c_max": float64(1),
"c_min": float64(1),
"d_max": float64(1),
"d_min": float64(1),
"e_max": float64(1),
"e_min": float64(1),
"f_max": float64(2),
"f_min": float64(2),
"g_max": float64(2),
"g_min": float64(2),
"h_max": float64(2),
"h_min": float64(2),
"i_max": float64(2),
"i_min": float64(2),
"j_max": float64(3),
"j_min": float64(3),
}
expectedTags := map[string]string{
"foo": "bar",
}
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
acc.ClearMetrics()
minmax.Apply(m2)
for {
if acc.NMetrics() > 0 {
break
}
time.Sleep(time.Millisecond)
}
expectedFields = map[string]interface{}{
"a_max": float64(1),
"a_min": float64(1),
"b_max": float64(3),
"b_min": float64(1),
"c_max": float64(3),
"c_min": float64(1),
"d_max": float64(3),
"d_min": float64(1),
"e_max": float64(3),
"e_min": float64(1),
"f_max": float64(2),
"f_min": float64(1),
"g_max": float64(2),
"g_min": float64(1),
"h_max": float64(2),
"h_min": float64(1),
"i_max": float64(2),
"i_min": float64(1),
"j_max": float64(3),
"j_min": float64(1),
"k_max": float64(200),
"k_min": float64(200),
}
expectedTags = map[string]string{
"foo": "bar",
}
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
}