mean calculation, covered with tests

This commit is contained in:
Maksadbek 2016-07-22 12:17:16 +03:00
parent b3f52e84e4
commit 7fe84a1313
2 changed files with 61 additions and 19 deletions

View File

@ -192,7 +192,10 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
return err return err
} }
i.Downsampler.Add(metrics) err = i.Downsampler.Add(metrics...)
if err != nil {
return err
}
for _, metric := range metrics { for _, metric := range metrics {
bp.AddPoint(metric.Point()) bp.AddPoint(metric.Point())
@ -243,7 +246,7 @@ type Downsampling struct {
type Aggregation map[string]string type Aggregation map[string]string
func (d *Downsampling) Add(metrics []telegraf.Metric) error { func (d *Downsampling) Add(metrics ...telegraf.Metric) error {
d.Lock() d.Lock()
d.Metrics = append(d.Metrics, metrics...) d.Metrics = append(d.Metrics, metrics...)
d.Unlock() d.Unlock()
@ -256,7 +259,6 @@ func (d *Downsampling) Run() {
case <-time.After(d.TimeRange): case <-time.After(d.TimeRange):
aggrData := d.Aggregate() aggrData := d.Aggregate()
fmt.Printf("%+v\n", aggrData) fmt.Printf("%+v\n", aggrData)
} }
} }
} }
@ -280,7 +282,11 @@ func (d *Downsampling) Mean(fields ...string) (telegraf.Metric, error) {
d.RLock() d.RLock()
for _, metric := range d.Metrics { for _, metric := range d.Metrics {
for fieldName, value := range metric.Fields() { for _, fieldName := range fields {
value, ok := metric.Fields()[fieldName]
if !ok {
continue
}
oldVal := sums[fieldName] oldVal := sums[fieldName]
switch value := value.(type) { switch value := value.(type) {
case int: case int:
@ -288,7 +294,7 @@ func (d *Downsampling) Mean(fields ...string) (telegraf.Metric, error) {
case int32: case int32:
sums[fieldName] = oldVal.(int32) + value sums[fieldName] = oldVal.(int32) + value
case int64: case int64:
sums[fieldName] = oldVal.(int64) + value sums[fieldName] = oldVal.(int) + int(value)
case float32: case float32:
sums[fieldName] = oldVal.(float32) + value sums[fieldName] = oldVal.(float32) + value
case float64: case float64:
@ -301,13 +307,26 @@ func (d *Downsampling) Mean(fields ...string) (telegraf.Metric, error) {
d.RUnlock() d.RUnlock()
for i := range sums { for i := range sums {
sums[i] = sums[i] / size switch value := sums[i].(type) {
case int:
sums[i] = value / int(size)
case int32:
sums[i] = value / int32(size)
case int64:
sums[i] = value / int64(size)
case float32:
sums[i] = value / float32(size)
case float64:
sums[i] = value / float64(size)
default:
continue
}
} }
aggrMetric, err := telegraf.NewMetric( aggrMetric, err := telegraf.NewMetric(
d.Name, d.Name,
map[string]string{}, map[string]string{},
fields, sums,
time.Now(), time.Now(),
) )
return aggrMetric, err return aggrMetric, err

View File

@ -7,6 +7,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -41,19 +42,41 @@ func TestHTTPInflux(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
} }
/* func TestDownsampling_mean(t *testing.T) {
func TestInfluxDS(t *testing.T) { ds := &Downsampling{}
downsampler := &DS{ metricA, err := telegraf.NewMetric(
TimeRange: time.Minute, "earthshaker",
} map[string]string{},
i := InfluxDB{ map[string]interface{}{
URLs: []string{"udp://localhost:8089"}, "damage": "high",
DS: downsampler, "agility": 12,
} "strength": 120,
"intelligence": 60,
},
time.Now(),
)
err := i.Connect() metricB, err := telegraf.NewMetric(
"sven",
map[string]string{},
map[string]interface{}{
"strength": 80,
"intelligence": 140,
},
time.Now(),
)
require.NoError(t, err) require.NoError(t, err)
i.DS.Add(testutil.MockMetrics()) err = ds.Add(metricA)
require.NoError(t, err)
err = ds.Add(metricB)
require.NoError(t, err)
aggr, err := ds.Mean("strength", "intelligence", "power")
require.NoError(t, err)
require.Equal(t, int64(100), aggr.Fields()["strength"])
require.Equal(t, int64(100), aggr.Fields()["intelligence"])
require.Equal(t, int64(0), aggr.Fields()["power"])
} }
*/