Ignore tracking for metrics added to aggregator (#5508)

This commit is contained in:
Daniel Nelson 2019-03-01 11:21:31 -08:00 committed by GitHub
parent 2c09010f72
commit c57f2d9d48
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 35 additions and 14 deletions

View File

@ -5,6 +5,7 @@ import (
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/selfstat" "github.com/influxdata/telegraf/selfstat"
) )
@ -96,39 +97,37 @@ func (r *RunningAggregator) MakeMetric(metric telegraf.Metric) telegraf.Metric {
return m return m
} }
func (r *RunningAggregator) metricFiltered(metric telegraf.Metric) {
r.MetricsFiltered.Incr(1)
metric.Accept()
}
func (r *RunningAggregator) metricDropped(metric telegraf.Metric) { func (r *RunningAggregator) metricDropped(metric telegraf.Metric) {
r.MetricsDropped.Incr(1) r.MetricsDropped.Incr(1)
metric.Accept()
} }
// Add a metric to the aggregator and return true if the original metric // Add a metric to the aggregator and return true if the original metric
// should be dropped. // should be dropped.
func (r *RunningAggregator) Add(metric telegraf.Metric) bool { func (r *RunningAggregator) Add(m telegraf.Metric) bool {
if ok := r.Config.Filter.Select(metric); !ok { if ok := r.Config.Filter.Select(m); !ok {
return false return false
} }
metric = metric.Copy() // Make a copy of the metric but don't retain tracking; it doesn't make
// sense to fail a metric's delivery due to the aggregation not being
// sent because we can't create aggregations of historical data.
m = metric.FromMetric(m)
r.Config.Filter.Modify(metric) r.Config.Filter.Modify(m)
if len(metric.FieldList()) == 0 { if len(m.FieldList()) == 0 {
r.metricDropped(m)
return r.Config.DropOriginal return r.Config.DropOriginal
} }
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
if r.periodStart.IsZero() || metric.Time().After(r.periodEnd) { if r.periodStart.IsZero() || m.Time().After(r.periodEnd) {
r.metricDropped(metric) r.metricDropped(m)
return r.Config.DropOriginal return r.Config.DropOriginal
} }
r.Aggregator.Add(metric) r.Aggregator.Add(m)
return r.Config.DropOriginal return r.Config.DropOriginal
} }

View File

@ -62,6 +62,28 @@ func New(
return m, nil return m, nil
} }
// FromMetric returns a deep copy of the metric with any tracking information
// removed.
func FromMetric(other telegraf.Metric) telegraf.Metric {
m := &metric{
name: other.Name(),
tags: make([]*telegraf.Tag, len(other.TagList())),
fields: make([]*telegraf.Field, len(other.FieldList())),
tm: other.Time(),
tp: other.Type(),
aggregate: other.IsAggregate(),
}
for i, tag := range other.TagList() {
m.tags[i] = &telegraf.Tag{Key: tag.Key, Value: tag.Value}
}
for i, field := range other.FieldList() {
m.fields[i] = &telegraf.Field{Key: field.Key, Value: field.Value}
}
return m
}
func (m *metric) String() string { func (m *metric) String() string {
return fmt.Sprintf("%s %v %v %d", m.name, m.Tags(), m.Fields(), m.tm.UnixNano()) return fmt.Sprintf("%s %v %v %d", m.name, m.Tags(), m.Fields(), m.tm.UnixNano())
} }