diff --git a/agent/agent.go b/agent/agent.go index 2df603cd4..47d5336e4 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -300,6 +300,8 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er internal.RandomSleep(a.Config.Agent.FlushJitter.Duration, shutdown) a.flush() case metric := <-metricC: + // NOTE potential bottleneck here as we put each metric through the + // processors serially. mS := []telegraf.Metric{metric} for _, processor := range a.Config.Processors { mS = processor.Apply(mS...) @@ -321,7 +323,7 @@ func (a *Agent) Run(shutdown chan struct{}) error { a.Config.Agent.Hostname, a.Config.Agent.FlushInterval.Duration) // channel shared between all input threads for accumulating metrics - metricC := make(chan telegraf.Metric, 10000) + metricC := make(chan telegraf.Metric, 100) // Start all ServicePlugins for _, input := range a.Config.Inputs { diff --git a/internal/config/config.go b/internal/config/config.go index 676f833d6..0beb4a29c 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -841,7 +841,11 @@ func buildAggregator(name string, tbl *ast.Table) (*models.AggregatorConfig, err } } - conf := &models.AggregatorConfig{Name: name} + conf := &models.AggregatorConfig{ + Name: name, + Delay: time.Millisecond * 100, + Period: time.Second * 30, + } if node, ok := tbl.Fields["period"]; ok { if kv, ok := node.(*ast.KeyValue); ok { diff --git a/internal/models/makemetric.go b/internal/models/makemetric.go index b198109a7..afaa9475e 100644 --- a/internal/models/makemetric.go +++ b/internal/models/makemetric.go @@ -19,6 +19,7 @@ import ( // applyFilter: if false, the above filter is not applied to each metric. // This is used by Aggregators, because aggregators use filters // on incoming metrics instead of on created metrics. +// TODO refactor this to not have such a huge func signature. func makemetric( measurement string, fields map[string]interface{}, diff --git a/internal/models/running_aggregator.go b/internal/models/running_aggregator.go index ca947bb12..9c8403f5f 100644 --- a/internal/models/running_aggregator.go +++ b/internal/models/running_aggregator.go @@ -109,13 +109,6 @@ func (r *RunningAggregator) Run( acc telegraf.Accumulator, shutdown chan struct{}, ) { - if r.Config.Delay == 0 { - r.Config.Delay = time.Millisecond * 100 - } - if r.Config.Period == 0 { - r.Config.Period = time.Second * 30 - } - time.Sleep(r.Config.Delay) periodT := time.NewTicker(r.Config.Period) defer periodT.Stop() diff --git a/internal/models/running_aggregator_test.go b/internal/models/running_aggregator_test.go index f816c0a80..495b8ddda 100644 --- a/internal/models/running_aggregator_test.go +++ b/internal/models/running_aggregator_test.go @@ -20,6 +20,7 @@ func TestAdd(t *testing.T) { Filter: Filter{ NamePass: []string{"*"}, }, + Period: time.Millisecond * 500, }) assert.NoError(t, ra.Config.Filter.Compile()) acc := testutil.Accumulator{} diff --git a/metric.go b/metric.go index d21330ff6..5079ff4f1 100644 --- a/metric.go +++ b/metric.go @@ -35,6 +35,7 @@ type Metric interface { UnixNano() int64 // HashID returns a non-cryptographic hash of the metric (name + tags) + // NOTE: do not persist & depend on this value to disk. HashID() uint64 // Fields returns the fields for the metric diff --git a/plugins/aggregators/minmax/minmax.go b/plugins/aggregators/minmax/minmax.go index a79d7bb01..1c83c0cc2 100644 --- a/plugins/aggregators/minmax/minmax.go +++ b/plugins/aggregators/minmax/minmax.go @@ -6,10 +6,7 @@ import ( ) type MinMax struct { - // caches for metric fields, names, and tags - fieldCache map[uint64]map[string]minmax - nameCache map[uint64]string - tagCache map[uint64]map[string]string + cache map[uint64]aggregate } func NewMinMax() telegraf.Aggregator { @@ -18,6 +15,12 @@ func NewMinMax() telegraf.Aggregator { return mm } +type aggregate struct { + fields map[string]minmax + name string + tags map[string]string +} + type minmax struct { min float64 max float64 @@ -42,41 +45,41 @@ func (m *MinMax) Description() string { func (m *MinMax) Add(in telegraf.Metric) { id := in.HashID() - if _, ok := m.nameCache[id]; !ok { + if _, ok := m.cache[id]; !ok { // hit an uncached metric, create caches for first time: - m.nameCache[id] = in.Name() - m.tagCache[id] = in.Tags() - m.fieldCache[id] = make(map[string]minmax) + a := aggregate{ + name: in.Name(), + tags: in.Tags(), + fields: make(map[string]minmax), + } for k, v := range in.Fields() { if fv, ok := convert(v); ok { - m.fieldCache[id][k] = minmax{ + a.fields[k] = minmax{ min: fv, max: fv, } } } + m.cache[id] = a } else { for k, v := range in.Fields() { if fv, ok := convert(v); ok { - if _, ok := m.fieldCache[id][k]; !ok { + if _, ok := m.cache[id].fields[k]; !ok { // hit an uncached field of a cached metric - m.fieldCache[id][k] = minmax{ + m.cache[id].fields[k] = minmax{ min: fv, max: fv, } continue } - cmpmin := compare(m.fieldCache[id][k].min, fv) - cmpmax := compare(m.fieldCache[id][k].max, fv) - if cmpmin == 1 { - tmp := m.fieldCache[id][k] + if fv < m.cache[id].fields[k].min { + tmp := m.cache[id].fields[k] tmp.min = fv - m.fieldCache[id][k] = tmp - } - if cmpmax == -1 { - tmp := m.fieldCache[id][k] + m.cache[id].fields[k] = tmp + } else if fv > m.cache[id].fields[k].max { + tmp := m.cache[id].fields[k] tmp.max = fv - m.fieldCache[id][k] = tmp + m.cache[id].fields[k] = tmp } } } @@ -84,29 +87,18 @@ func (m *MinMax) Add(in telegraf.Metric) { } func (m *MinMax) Push(acc telegraf.Accumulator) { - for id, _ := range m.nameCache { + for _, aggregate := range m.cache { fields := map[string]interface{}{} - for k, v := range m.fieldCache[id] { + for k, v := range aggregate.fields { fields[k+"_min"] = v.min fields[k+"_max"] = v.max } - acc.AddFields(m.nameCache[id], fields, m.tagCache[id]) + acc.AddFields(aggregate.name, fields, aggregate.tags) } } func (m *MinMax) Reset() { - m.fieldCache = make(map[uint64]map[string]minmax) - m.nameCache = make(map[uint64]string) - m.tagCache = make(map[uint64]map[string]string) -} - -func compare(a, b float64) int { - if a < b { - return -1 - } else if a > b { - return 1 - } - return 0 + m.cache = make(map[uint64]aggregate) } func convert(in interface{}) (float64, bool) {