From 6ef341193f6d188c7e27dabe8822f1089180a4cb Mon Sep 17 00:00:00 2001 From: Maksadbek Date: Sat, 23 Jul 2016 15:41:58 +0300 Subject: [PATCH] aggregation data depending on function --- plugins/outputs/influxdb/influxdb.go | 113 ++++++++++++++----- plugins/outputs/influxdb/influxdb_test.go | 131 +++++++++++----------- 2 files changed, 148 insertions(+), 96 deletions(-) diff --git a/plugins/outputs/influxdb/influxdb.go b/plugins/outputs/influxdb/influxdb.go index 3d32b3f5c..818bed28b 100644 --- a/plugins/outputs/influxdb/influxdb.go +++ b/plugins/outputs/influxdb/influxdb.go @@ -238,14 +238,30 @@ func init() { // Downsampling type Downsampling struct { sync.RWMutex - Name string - Metrics []telegraf.Metric - TimeRange time.Duration - Aggrations Aggregation + Name string + Metrics []telegraf.Metric + TimeRange time.Duration + Aggregations map[string][]Aggregation } // Aggregation maps the field names to aggregation function for them -type Aggregation map[string]string +type Aggregation struct { + FieldName string + FuncName string + Alias string +} + +func (d *Downsampling) AddAggregations(aggrs ...Aggregation) { + for _, aggr := range aggrs { + switch aggr.FuncName { + case "mean": + d.Aggregations["mean"] = append(d.Aggregations["mean"], aggr) + case "sum": + d.Aggregations["sum"] = append(d.Aggregations["sum"], aggr) + default: + } + } +} // Add appends metrics to the metrics that will be aggregated func (d *Downsampling) Add(metrics ...telegraf.Metric) error { @@ -261,48 +277,86 @@ func (d *Downsampling) Run() { for { select { case <-time.After(d.TimeRange): - aggrData := d.Aggregate() + aggrData, err := d.Aggregate() + if err != nil { + continue + } fmt.Printf("%+v\n", aggrData) } } } // Aggregate calculates the mean value of fields by given time -func (d *Downsampling) Aggregate() []telegraf.Metric { +func (d *Downsampling) Aggregate() (telegraf.Metric, error) { + metrics := map[string]interface{}{} + var ( + aggrMetric, sum, mean telegraf.Metric + err error + ) + for name, aggr := range d.Aggregations { + switch name { + case "sum": + sum, err = d.Sum(aggr...) + if err != nil { + return aggrMetric, err + } + case "mean": + mean, err = d.Mean(aggr...) + if err != nil { + return aggrMetric, err + } + default: + } + } - return nil + for k, v := range sum.Fields() { + metrics[k] = v + } + + for k, v := range mean.Fields() { + metrics[k] = v + } + + aggrMetric, err = telegraf.NewMetric( + d.Name, + map[string]string{}, + metrics, + time.Now(), + ) + return aggrMetric, err } -// Sum calcuate the sum values of given fields -func (d *Downsampling) Sum(fields ...string) (telegraf.Metric, error) { +// Sum calculate the sum values of given fields +func (d *Downsampling) Sum(fields ...Aggregation) (telegraf.Metric, error) { var ( sumMetric telegraf.Metric sums = make(map[string]interface{}) ) for _, field := range fields { - sums[field] = 0 + sums[field.Alias] = 0 } d.RLock() for _, metric := range d.Metrics { - for _, fieldName := range fields { - value, ok := metric.Fields()[fieldName] + for _, field := range fields { + value, ok := metric.Fields()[field.FieldName] if !ok { continue } - oldVal := sums[fieldName] + oldVal := sums[field.Alias] switch value := value.(type) { case int: - sums[fieldName] = oldVal.(int) + value + sums[field.Alias] = oldVal.(int) + value case int32: - sums[fieldName] = oldVal.(int32) + value + sums[field.Alias] = oldVal.(int32) + value case int64: - sums[fieldName] = oldVal.(int) + int(value) + // TODO fix this + sums[field.Alias] = oldVal.(int) + int(value) case float32: - sums[fieldName] = oldVal.(float32) + value + sums[field.Alias] = oldVal.(float32) + value case float64: - sums[fieldName] = oldVal.(float64) + value + sums[field.Alias] = oldVal.(float64) + value default: continue } @@ -320,7 +374,7 @@ func (d *Downsampling) Sum(fields ...string) (telegraf.Metric, error) { } // Mean calculates the mean values of given fields -func (d *Downsampling) Mean(fields ...string) (telegraf.Metric, error) { +func (d *Downsampling) Mean(fields ...Aggregation) (telegraf.Metric, error) { var ( aggrMetric telegraf.Metric sums = make(map[string]interface{}) @@ -329,28 +383,29 @@ func (d *Downsampling) Mean(fields ...string) (telegraf.Metric, error) { // initialize sums map for _, field := range fields { - sums[field] = 0 + sums[field.Alias] = 0 } d.RLock() for _, metric := range d.Metrics { - for _, fieldName := range fields { - value, ok := metric.Fields()[fieldName] + for _, field := range fields { + value, ok := metric.Fields()[field.FieldName] if !ok { continue } - oldVal := sums[fieldName] + oldVal := sums[field.Alias] switch value := value.(type) { case int: - sums[fieldName] = oldVal.(int) + value + sums[field.Alias] = oldVal.(int) + value case int32: - sums[fieldName] = oldVal.(int32) + value + sums[field.Alias] = oldVal.(int32) + value case int64: - sums[fieldName] = oldVal.(int) + int(value) + // TODO: fix this + sums[field.Alias] = oldVal.(int) + int(value) case float32: - sums[fieldName] = oldVal.(float32) + value + sums[field.Alias] = oldVal.(float32) + value case float64: - sums[fieldName] = oldVal.(float64) + value + sums[field.Alias] = oldVal.(float64) + value default: continue } diff --git a/plugins/outputs/influxdb/influxdb_test.go b/plugins/outputs/influxdb/influxdb_test.go index 52e0dfe7a..87121b324 100644 --- a/plugins/outputs/influxdb/influxdb_test.go +++ b/plugins/outputs/influxdb/influxdb_test.go @@ -5,9 +5,7 @@ import ( "net/http" "net/http/httptest" "testing" - "time" - "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" @@ -44,79 +42,78 @@ func TestHTTPInflux(t *testing.T) { func TestDownsampling_mean(t *testing.T) { ds := &Downsampling{} - metricA, err := telegraf.NewMetric( - "earthshaker", - map[string]string{}, - map[string]interface{}{ - "damage": "high", - "agility": 12, - "strength": 120, - "intelligence": 60, + + err := ds.Add(testutil.TestMetric(120)) + require.NoError(t, err) + + err = ds.Add(testutil.TestMetric(80)) + require.NoError(t, err) + + aggregations := []Aggregation{ + Aggregation{ + FieldName: "value", + FuncName: "mean", + Alias: "mean_value", }, - time.Now(), - ) + } + + aggr, err := ds.Mean(aggregations...) require.NoError(t, err) - metricB, err := telegraf.NewMetric( - "sven", - map[string]string{}, - map[string]interface{}{ - "strength": 80, - "intelligence": 140, - }, - time.Now(), - ) - require.NoError(t, err) - - err = ds.Add(metricA) - require.NoError(t, err) - - err = ds.Add(metricB) - require.NoError(t, err) - - aggr, err := ds.Mean("strength", "intelligence", "power") - require.NoError(t, err) - - require.Equal(t, int64(100), aggr.Fields()["strength"]) - require.Equal(t, int64(100), aggr.Fields()["intelligence"]) - require.Equal(t, int64(0), aggr.Fields()["power"]) + require.Equal(t, int64(100), aggr.Fields()["mean_value"]) } func TestDownsamling_sum(t *testing.T) { ds := &Downsampling{} - metricA, err := telegraf.NewMetric( - "earthshaker", - map[string]string{}, - map[string]interface{}{ - "damage": "high", - "agility": 12, - "strength": 120, - "intelligence": 60, + + err := ds.Add(testutil.TestMetric(120)) + require.NoError(t, err) + + err = ds.Add(testutil.TestMetric(80)) + require.NoError(t, err) + + aggregations := []Aggregation{ + Aggregation{ + FieldName: "value", + FuncName: "mean", + Alias: "sum_value", }, - time.Now(), - ) + } + aggr, err := ds.Sum(aggregations...) require.NoError(t, err) - metricB, err := telegraf.NewMetric( - "sven", - map[string]string{}, - map[string]interface{}{ - "strength": 80, - "intelligence": 140, - }, - time.Now(), - ) - require.NoError(t, err) - - err = ds.Add(metricA) - require.NoError(t, err) - - err = ds.Add(metricB) - require.NoError(t, err) - - aggr, err := ds.Sum("strength", "intelligence", "power") - require.NoError(t, err) - require.Equal(t, int64(200), aggr.Fields()["strength"]) - require.Equal(t, int64(200), aggr.Fields()["intelligence"]) - require.Equal(t, int64(0), aggr.Fields()["power"]) + require.Equal(t, int64(200), aggr.Fields()["sum_value"]) +} + +func TestDownsampling_aggregate(t *testing.T) { + ds := &Downsampling{} + + err := ds.Add(testutil.TestMetric(120)) + require.NoError(t, err) + + err = ds.Add(testutil.TestMetric(80)) + require.NoError(t, err) + + aggregations := []Aggregation{ + Aggregation{ + FieldName: "value", + FuncName: "mean", + Alias: "mean_value", + }, + Aggregation{ + FieldName: "value", + FuncName: "sum", + Alias: "sum_value", + }, + } + + ds.Aggregations = make(map[string][]Aggregation) + ds.AddAggregations(aggregations...) + + aggr, err := ds.Aggregate() + require.NoError(t, err) + + require.Equal(t, int64(100), aggr.Fields()["mean_value"]) + require.Equal(t, int64(200), aggr.Fields()["sum_value"]) + }