diff --git a/plugins/outputs/influxdb/influxdb.go b/plugins/outputs/influxdb/influxdb.go index 7183977b7..3d32b3f5c 100644 --- a/plugins/outputs/influxdb/influxdb.go +++ b/plugins/outputs/influxdb/influxdb.go @@ -244,8 +244,10 @@ type Downsampling struct { Aggrations Aggregation } +// Aggregation maps the field names to aggregation function for them type Aggregation map[string]string +// Add appends metrics to the metrics that will be aggregated func (d *Downsampling) Add(metrics ...telegraf.Metric) error { d.Lock() d.Metrics = append(d.Metrics, metrics...) @@ -253,6 +255,8 @@ func (d *Downsampling) Add(metrics ...telegraf.Metric) error { return nil } +// Run starts the downsampler +// it runs periodically func (d *Downsampling) Run() { for { select { @@ -265,9 +269,57 @@ func (d *Downsampling) Run() { // Aggregate calculates the mean value of fields by given time func (d *Downsampling) Aggregate() []telegraf.Metric { + return nil } +// Sum calcuate the sum values of given fields +func (d *Downsampling) Sum(fields ...string) (telegraf.Metric, error) { + var ( + sumMetric telegraf.Metric + sums = make(map[string]interface{}) + ) + + for _, field := range fields { + sums[field] = 0 + } + + d.RLock() + for _, metric := range d.Metrics { + for _, fieldName := range fields { + value, ok := metric.Fields()[fieldName] + if !ok { + continue + } + oldVal := sums[fieldName] + switch value := value.(type) { + case int: + sums[fieldName] = oldVal.(int) + value + case int32: + sums[fieldName] = oldVal.(int32) + value + case int64: + sums[fieldName] = oldVal.(int) + int(value) + case float32: + sums[fieldName] = oldVal.(float32) + value + case float64: + sums[fieldName] = oldVal.(float64) + value + default: + continue + } + } + } + d.RUnlock() + + sumMetric, err := telegraf.NewMetric( + d.Name, + map[string]string{}, + sums, + time.Now(), + ) + return sumMetric, err +} + +// Mean calculates the mean values of given fields func (d *Downsampling) Mean(fields ...string) (telegraf.Metric, error) { var ( aggrMetric telegraf.Metric diff --git a/plugins/outputs/influxdb/influxdb_test.go b/plugins/outputs/influxdb/influxdb_test.go index d1f1acb56..52e0dfe7a 100644 --- a/plugins/outputs/influxdb/influxdb_test.go +++ b/plugins/outputs/influxdb/influxdb_test.go @@ -55,6 +55,7 @@ func TestDownsampling_mean(t *testing.T) { }, time.Now(), ) + require.NoError(t, err) metricB, err := telegraf.NewMetric( "sven", @@ -80,3 +81,42 @@ func TestDownsampling_mean(t *testing.T) { require.Equal(t, int64(100), aggr.Fields()["intelligence"]) require.Equal(t, int64(0), aggr.Fields()["power"]) } + +func TestDownsamling_sum(t *testing.T) { + ds := &Downsampling{} + metricA, err := telegraf.NewMetric( + "earthshaker", + map[string]string{}, + map[string]interface{}{ + "damage": "high", + "agility": 12, + "strength": 120, + "intelligence": 60, + }, + time.Now(), + ) + require.NoError(t, err) + + metricB, err := telegraf.NewMetric( + "sven", + map[string]string{}, + map[string]interface{}{ + "strength": 80, + "intelligence": 140, + }, + time.Now(), + ) + require.NoError(t, err) + + err = ds.Add(metricA) + require.NoError(t, err) + + err = ds.Add(metricB) + require.NoError(t, err) + + aggr, err := ds.Sum("strength", "intelligence", "power") + require.NoError(t, err) + require.Equal(t, int64(200), aggr.Fields()["strength"]) + require.Equal(t, int64(200), aggr.Fields()["intelligence"]) + require.Equal(t, int64(0), aggr.Fields()["power"]) +}