sum aggregation function

This commit is contained in:
Maksadbek 2016-07-22 14:15:41 +03:00
parent 7fe84a1313
commit c63c12de42
2 changed files with 92 additions and 0 deletions

View File

@ -244,8 +244,10 @@ type Downsampling struct {
Aggrations Aggregation
}
// Aggregation maps the field names to aggregation function for them
type Aggregation map[string]string
// Add appends metrics to the metrics that will be aggregated
func (d *Downsampling) Add(metrics ...telegraf.Metric) error {
d.Lock()
d.Metrics = append(d.Metrics, metrics...)
@ -253,6 +255,8 @@ func (d *Downsampling) Add(metrics ...telegraf.Metric) error {
return nil
}
// Run starts the downsampler
// it runs periodically
func (d *Downsampling) Run() {
for {
select {
@ -265,9 +269,57 @@ func (d *Downsampling) Run() {
// Aggregate calculates the mean value of fields by given time
func (d *Downsampling) Aggregate() []telegraf.Metric {
return nil
}
// Sum calcuate the sum values of given fields
func (d *Downsampling) Sum(fields ...string) (telegraf.Metric, error) {
var (
sumMetric telegraf.Metric
sums = make(map[string]interface{})
)
for _, field := range fields {
sums[field] = 0
}
d.RLock()
for _, metric := range d.Metrics {
for _, fieldName := range fields {
value, ok := metric.Fields()[fieldName]
if !ok {
continue
}
oldVal := sums[fieldName]
switch value := value.(type) {
case int:
sums[fieldName] = oldVal.(int) + value
case int32:
sums[fieldName] = oldVal.(int32) + value
case int64:
sums[fieldName] = oldVal.(int) + int(value)
case float32:
sums[fieldName] = oldVal.(float32) + value
case float64:
sums[fieldName] = oldVal.(float64) + value
default:
continue
}
}
}
d.RUnlock()
sumMetric, err := telegraf.NewMetric(
d.Name,
map[string]string{},
sums,
time.Now(),
)
return sumMetric, err
}
// Mean calculates the mean values of given fields
func (d *Downsampling) Mean(fields ...string) (telegraf.Metric, error) {
var (
aggrMetric telegraf.Metric

View File

@ -55,6 +55,7 @@ func TestDownsampling_mean(t *testing.T) {
},
time.Now(),
)
require.NoError(t, err)
metricB, err := telegraf.NewMetric(
"sven",
@ -80,3 +81,42 @@ func TestDownsampling_mean(t *testing.T) {
require.Equal(t, int64(100), aggr.Fields()["intelligence"])
require.Equal(t, int64(0), aggr.Fields()["power"])
}
func TestDownsamling_sum(t *testing.T) {
ds := &Downsampling{}
metricA, err := telegraf.NewMetric(
"earthshaker",
map[string]string{},
map[string]interface{}{
"damage": "high",
"agility": 12,
"strength": 120,
"intelligence": 60,
},
time.Now(),
)
require.NoError(t, err)
metricB, err := telegraf.NewMetric(
"sven",
map[string]string{},
map[string]interface{}{
"strength": 80,
"intelligence": 140,
},
time.Now(),
)
require.NoError(t, err)
err = ds.Add(metricA)
require.NoError(t, err)
err = ds.Add(metricB)
require.NoError(t, err)
aggr, err := ds.Sum("strength", "intelligence", "power")
require.NoError(t, err)
require.Equal(t, int64(200), aggr.Fields()["strength"])
require.Equal(t, int64(200), aggr.Fields()["intelligence"])
require.Equal(t, int64(0), aggr.Fields()["power"])
}