diff --git a/plugins/outputs/influxdb/influxdb.go b/plugins/outputs/influxdb/influxdb.go index 53ae4f4cc..7183977b7 100644 --- a/plugins/outputs/influxdb/influxdb.go +++ b/plugins/outputs/influxdb/influxdb.go @@ -192,7 +192,10 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error { return err } - i.Downsampler.Add(metrics) + err = i.Downsampler.Add(metrics...) + if err != nil { + return err + } for _, metric := range metrics { bp.AddPoint(metric.Point()) @@ -243,7 +246,7 @@ type Downsampling struct { type Aggregation map[string]string -func (d *Downsampling) Add(metrics []telegraf.Metric) error { +func (d *Downsampling) Add(metrics ...telegraf.Metric) error { d.Lock() d.Metrics = append(d.Metrics, metrics...) d.Unlock() @@ -256,7 +259,6 @@ func (d *Downsampling) Run() { case <-time.After(d.TimeRange): aggrData := d.Aggregate() fmt.Printf("%+v\n", aggrData) - } } } @@ -280,7 +282,11 @@ func (d *Downsampling) Mean(fields ...string) (telegraf.Metric, error) { d.RLock() for _, metric := range d.Metrics { - for fieldName, value := range metric.Fields() { + for _, fieldName := range fields { + value, ok := metric.Fields()[fieldName] + if !ok { + continue + } oldVal := sums[fieldName] switch value := value.(type) { case int: @@ -288,7 +294,7 @@ func (d *Downsampling) Mean(fields ...string) (telegraf.Metric, error) { case int32: sums[fieldName] = oldVal.(int32) + value case int64: - sums[fieldName] = oldVal.(int64) + value + sums[fieldName] = oldVal.(int) + int(value) case float32: sums[fieldName] = oldVal.(float32) + value case float64: @@ -301,13 +307,26 @@ func (d *Downsampling) Mean(fields ...string) (telegraf.Metric, error) { d.RUnlock() for i := range sums { - sums[i] = sums[i] / size + switch value := sums[i].(type) { + case int: + sums[i] = value / int(size) + case int32: + sums[i] = value / int32(size) + case int64: + sums[i] = value / int64(size) + case float32: + sums[i] = value / float32(size) + case float64: + sums[i] = value / float64(size) + default: + continue + } } aggrMetric, err := telegraf.NewMetric( d.Name, map[string]string{}, - fields, + sums, time.Now(), ) return aggrMetric, err diff --git a/plugins/outputs/influxdb/influxdb_test.go b/plugins/outputs/influxdb/influxdb_test.go index 8d5288ede..d1f1acb56 100644 --- a/plugins/outputs/influxdb/influxdb_test.go +++ b/plugins/outputs/influxdb/influxdb_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" @@ -41,19 +42,41 @@ func TestHTTPInflux(t *testing.T) { require.NoError(t, err) } -/* -func TestInfluxDS(t *testing.T) { - downsampler := &DS{ - TimeRange: time.Minute, - } - i := InfluxDB{ - URLs: []string{"udp://localhost:8089"}, - DS: downsampler, - } +func TestDownsampling_mean(t *testing.T) { + ds := &Downsampling{} + metricA, err := telegraf.NewMetric( + "earthshaker", + map[string]string{}, + map[string]interface{}{ + "damage": "high", + "agility": 12, + "strength": 120, + "intelligence": 60, + }, + time.Now(), + ) - err := i.Connect() + metricB, err := telegraf.NewMetric( + "sven", + map[string]string{}, + map[string]interface{}{ + "strength": 80, + "intelligence": 140, + }, + time.Now(), + ) require.NoError(t, err) - i.DS.Add(testutil.MockMetrics()) + err = ds.Add(metricA) + require.NoError(t, err) + + err = ds.Add(metricB) + require.NoError(t, err) + + aggr, err := ds.Mean("strength", "intelligence", "power") + require.NoError(t, err) + + require.Equal(t, int64(100), aggr.Fields()["strength"]) + require.Equal(t, int64(100), aggr.Fields()["intelligence"]) + require.Equal(t, int64(0), aggr.Fields()["power"]) } -*/