aggregate data depending on the aggregation function

This commit is contained in:
Maksadbek 2016-07-23 15:41:58 +03:00
parent c63c12de42
commit 6ef341193f
2 changed files with 148 additions and 96 deletions
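In summary, the commit turns Aggregation from a map[string]string into a struct (FieldName, FuncName, Alias), has Downsampling group registered aggregations by function name, and makes Aggregate() merge the Sum and Mean results into a single metric. A minimal usage sketch of the new API follows; the measurement name, interval, and sample metrics are illustrative, and it assumes the plugin package's existing imports of telegraf, testutil, and time:

func exampleDownsampling() (telegraf.Metric, error) {
	ds := &Downsampling{
		Name:         "downsampled",    // assumed measurement name
		TimeRange:    10 * time.Second, // assumed aggregation interval
		Aggregations: make(map[string][]Aggregation),
	}
	// Register the requested aggregations, grouped internally by function name.
	ds.AddAggregations(
		Aggregation{FieldName: "value", FuncName: "mean", Alias: "mean_value"},
		Aggregation{FieldName: "value", FuncName: "sum", Alias: "sum_value"},
	)
	// Collect a couple of sample metrics, then combine them into one aggregated metric.
	if err := ds.Add(testutil.TestMetric(120), testutil.TestMetric(80)); err != nil {
		return nil, err
	}
	return ds.Aggregate()
}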

@@ -238,14 +238,30 @@ func init() {
 // Downsampling
 type Downsampling struct {
 	sync.RWMutex
 	Name      string
 	Metrics   []telegraf.Metric
 	TimeRange time.Duration
-	Aggrations Aggregation
+	Aggregations map[string][]Aggregation
 }

 // Aggregation maps the field names to aggregation function for them
-type Aggregation map[string]string
+type Aggregation struct {
+	FieldName string
+	FuncName  string
+	Alias     string
+}
+
+func (d *Downsampling) AddAggregations(aggrs ...Aggregation) {
+	for _, aggr := range aggrs {
+		switch aggr.FuncName {
+		case "mean":
+			d.Aggregations["mean"] = append(d.Aggregations["mean"], aggr)
+		case "sum":
+			d.Aggregations["sum"] = append(d.Aggregations["sum"], aggr)
+		default:
+		}
+	}
+}

 // Add appends metrics to the metrics that will be aggregated
 func (d *Downsampling) Add(metrics ...telegraf.Metric) error {
@@ -261,48 +277,86 @@ func (d *Downsampling) Run() {
 	for {
 		select {
 		case <-time.After(d.TimeRange):
-			aggrData := d.Aggregate()
+			aggrData, err := d.Aggregate()
+			if err != nil {
+				continue
+			}
 			fmt.Printf("%+v\n", aggrData)
 		}
 	}
 }

 // Aggregate calculates the mean value of fields by given time
-func (d *Downsampling) Aggregate() []telegraf.Metric {
-	return nil
+func (d *Downsampling) Aggregate() (telegraf.Metric, error) {
+	metrics := map[string]interface{}{}
+	var (
+		aggrMetric, sum, mean telegraf.Metric
+		err                   error
+	)
+	for name, aggr := range d.Aggregations {
+		switch name {
+		case "sum":
+			sum, err = d.Sum(aggr...)
+			if err != nil {
+				return aggrMetric, err
+			}
+		case "mean":
+			mean, err = d.Mean(aggr...)
+			if err != nil {
+				return aggrMetric, err
+			}
+		default:
+		}
+	}
+
+	for k, v := range sum.Fields() {
+		metrics[k] = v
+	}
+	for k, v := range mean.Fields() {
+		metrics[k] = v
+	}
+
+	aggrMetric, err = telegraf.NewMetric(
+		d.Name,
+		map[string]string{},
+		metrics,
+		time.Now(),
+	)
+	return aggrMetric, err
 }

-// Sum calcuate the sum values of given fields
-func (d *Downsampling) Sum(fields ...string) (telegraf.Metric, error) {
+// Sum calculate the sum values of given fields
+func (d *Downsampling) Sum(fields ...Aggregation) (telegraf.Metric, error) {
 	var (
 		sumMetric telegraf.Metric
 		sums      = make(map[string]interface{})
 	)

 	for _, field := range fields {
-		sums[field] = 0
+		sums[field.Alias] = 0
 	}

 	d.RLock()
 	for _, metric := range d.Metrics {
-		for _, fieldName := range fields {
-			value, ok := metric.Fields()[fieldName]
+		for _, field := range fields {
+			value, ok := metric.Fields()[field.FieldName]
 			if !ok {
 				continue
 			}
-			oldVal := sums[fieldName]
+			oldVal := sums[field.Alias]
 			switch value := value.(type) {
 			case int:
-				sums[fieldName] = oldVal.(int) + value
+				sums[field.Alias] = oldVal.(int) + value
 			case int32:
-				sums[fieldName] = oldVal.(int32) + value
+				sums[field.Alias] = oldVal.(int32) + value
 			case int64:
-				sums[fieldName] = oldVal.(int) + int(value)
+				// TODO fix this
+				sums[field.Alias] = oldVal.(int) + int(value)
 			case float32:
-				sums[fieldName] = oldVal.(float32) + value
+				sums[field.Alias] = oldVal.(float32) + value
 			case float64:
-				sums[fieldName] = oldVal.(float64) + value
+				sums[field.Alias] = oldVal.(float64) + value
 			default:
 				continue
 			}
@@ -320,7 +374,7 @@ func (d *Downsampling) Sum(fields ...string) (telegraf.Metric, error) {
 }

 // Mean calculates the mean values of given fields
-func (d *Downsampling) Mean(fields ...string) (telegraf.Metric, error) {
+func (d *Downsampling) Mean(fields ...Aggregation) (telegraf.Metric, error) {
 	var (
 		aggrMetric telegraf.Metric
 		sums       = make(map[string]interface{})
@@ -329,28 +383,29 @@ func (d *Downsampling) Mean(fields ...string) (telegraf.Metric, error) {
 	// initialize sums map
 	for _, field := range fields {
-		sums[field] = 0
+		sums[field.Alias] = 0
 	}

 	d.RLock()
 	for _, metric := range d.Metrics {
-		for _, fieldName := range fields {
-			value, ok := metric.Fields()[fieldName]
+		for _, field := range fields {
+			value, ok := metric.Fields()[field.FieldName]
 			if !ok {
 				continue
 			}
-			oldVal := sums[fieldName]
+			oldVal := sums[field.Alias]
 			switch value := value.(type) {
 			case int:
-				sums[fieldName] = oldVal.(int) + value
+				sums[field.Alias] = oldVal.(int) + value
 			case int32:
-				sums[fieldName] = oldVal.(int32) + value
+				sums[field.Alias] = oldVal.(int32) + value
 			case int64:
-				sums[fieldName] = oldVal.(int) + int(value)
+				// TODO: fix this
+				sums[field.Alias] = oldVal.(int) + int(value)
 			case float32:
-				sums[fieldName] = oldVal.(float32) + value
+				sums[field.Alias] = oldVal.(float32) + value
 			case float64:
-				sums[fieldName] = oldVal.(float64) + value
+				sums[field.Alias] = oldVal.(float64) + value
 			default:
 				continue
 			}

@@ -5,9 +5,7 @@ import (
 	"net/http"
 	"net/http/httptest"
 	"testing"
-	"time"

-	"github.com/influxdata/telegraf"
 	"github.com/influxdata/telegraf/testutil"
 	"github.com/stretchr/testify/require"
@@ -44,79 +42,78 @@ func TestHTTPInflux(t *testing.T) {
 func TestDownsampling_mean(t *testing.T) {
 	ds := &Downsampling{}
-	metricA, err := telegraf.NewMetric(
-		"earthshaker",
-		map[string]string{},
-		map[string]interface{}{
-			"damage":       "high",
-			"agility":      12,
-			"strength":     120,
-			"intelligence": 60,
-		},
-		time.Now(),
-	)
+
+	err := ds.Add(testutil.TestMetric(120))
 	require.NoError(t, err)

-	metricB, err := telegraf.NewMetric(
-		"sven",
-		map[string]string{},
-		map[string]interface{}{
-			"strength":     80,
-			"intelligence": 140,
-		},
-		time.Now(),
-	)
+	err = ds.Add(testutil.TestMetric(80))
 	require.NoError(t, err)

-	err = ds.Add(metricA)
-	require.NoError(t, err)
-
-	err = ds.Add(metricB)
-	require.NoError(t, err)
-
-	aggr, err := ds.Mean("strength", "intelligence", "power")
+	aggregations := []Aggregation{
+		Aggregation{
+			FieldName: "value",
+			FuncName:  "mean",
+			Alias:     "mean_value",
+		},
+	}
+
+	aggr, err := ds.Mean(aggregations...)
 	require.NoError(t, err)
-	require.Equal(t, int64(100), aggr.Fields()["strength"])
-	require.Equal(t, int64(100), aggr.Fields()["intelligence"])
-	require.Equal(t, int64(0), aggr.Fields()["power"])
+	require.Equal(t, int64(100), aggr.Fields()["mean_value"])
 }
 func TestDownsamling_sum(t *testing.T) {
 	ds := &Downsampling{}
-	metricA, err := telegraf.NewMetric(
-		"earthshaker",
-		map[string]string{},
-		map[string]interface{}{
-			"damage":       "high",
-			"agility":      12,
-			"strength":     120,
-			"intelligence": 60,
-		},
-		time.Now(),
-	)
+
+	err := ds.Add(testutil.TestMetric(120))
 	require.NoError(t, err)

-	metricB, err := telegraf.NewMetric(
-		"sven",
-		map[string]string{},
-		map[string]interface{}{
-			"strength":     80,
-			"intelligence": 140,
-		},
-		time.Now(),
-	)
-	require.NoError(t, err)
-
-	err = ds.Add(metricA)
+	err = ds.Add(testutil.TestMetric(80))
 	require.NoError(t, err)

-	err = ds.Add(metricB)
-	require.NoError(t, err)
-
-	aggr, err := ds.Sum("strength", "intelligence", "power")
+	aggregations := []Aggregation{
+		Aggregation{
+			FieldName: "value",
+			FuncName:  "mean",
+			Alias:     "sum_value",
+		},
+	}
+	aggr, err := ds.Sum(aggregations...)
 	require.NoError(t, err)
-	require.Equal(t, int64(200), aggr.Fields()["strength"])
-	require.Equal(t, int64(200), aggr.Fields()["intelligence"])
-	require.Equal(t, int64(0), aggr.Fields()["power"])
+	require.Equal(t, int64(200), aggr.Fields()["sum_value"])
+}
+
+func TestDownsampling_aggregate(t *testing.T) {
+	ds := &Downsampling{}
+
+	err := ds.Add(testutil.TestMetric(120))
+	require.NoError(t, err)
+
+	err = ds.Add(testutil.TestMetric(80))
+	require.NoError(t, err)
+
+	aggregations := []Aggregation{
+		Aggregation{
+			FieldName: "value",
+			FuncName:  "mean",
+			Alias:     "mean_value",
+		},
+		Aggregation{
+			FieldName: "value",
+			FuncName:  "sum",
+			Alias:     "sum_value",
+		},
+	}
+	ds.Aggregations = make(map[string][]Aggregation)
+	ds.AddAggregations(aggregations...)
+
+	aggr, err := ds.Aggregate()
+	require.NoError(t, err)
+	require.Equal(t, int64(100), aggr.Fields()["mean_value"])
+	require.Equal(t, int64(200), aggr.Fields()["sum_value"])
 }