Add Copy() function to Metric interface

This commit is contained in:
Cameron Sparr 2016-11-23 12:30:31 +00:00
parent 536dbfb724
commit 2b0cd2037b
2 changed files with 21 additions and 18 deletions

View File

@ -269,7 +269,7 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er
var dropOriginal bool
if !m.IsAggregate() {
for _, agg := range a.Config.Aggregators {
if ok := agg.Add(copyMetric(m)); ok {
if ok := agg.Add(m.Copy()); ok {
dropOriginal = true
}
}
@ -279,7 +279,7 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er
if i == len(a.Config.Outputs)-1 {
o.AddMetric(m)
} else {
o.AddMetric(copyMetric(m))
o.AddMetric(m.Copy())
}
}
}
@ -385,19 +385,3 @@ func (a *Agent) Run(shutdown chan struct{}) error {
wg.Wait()
return nil
}
func copyMetric(m telegraf.Metric) telegraf.Metric {
t := time.Time(m.Time())
tags := make(map[string]string)
fields := make(map[string]interface{})
for k, v := range m.Tags() {
tags[k] = v
}
for k, v := range m.Fields() {
fields[k] = v
}
out, _ := telegraf.NewMetric(m.Name(), tags, fields, t)
return out
}

View File

@ -55,6 +55,9 @@ type Metric interface {
SetAggregate(bool)
// IsAggregate returns true if the metric is an aggregate
IsAggregate() bool
// Copy copies the metric
Copy() Metric
}
// metric is a wrapper of the influxdb client.Point struct
@ -175,3 +178,19 @@ func (m *metric) IsAggregate() bool {
func (m *metric) SetAggregate(b bool) {
m.isaggregate = b
}
func (m *metric) Copy() Metric {
t := time.Time(m.Time())
tags := make(map[string]string)
fields := make(map[string]interface{})
for k, v := range m.Tags() {
tags[k] = v
}
for k, v := range m.Fields() {
fields[k] = v
}
out, _ := NewMetric(m.Name(), tags, fields, t)
return out
}