mean calculation

This commit is contained in:
Maksadbek 2016-07-15 12:17:20 +05:00
parent f29e6b760e
commit b3f52e84e4
2 changed files with 70 additions and 25 deletions

View File

@ -29,7 +29,7 @@ type InfluxDB struct {
WriteConsistency string
Timeout internal.Duration
UDPPayload int `toml:"udp_payload"`
DS *DS
Downsampler *Downsampling
// Path to CA file
SSLCA string `toml:"ssl_ca"`
@ -192,7 +192,7 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
return err
}
i.DS.Add(metrics)
i.Downsampler.Add(metrics)
for _, metric := range metrics {
bp.AddPoint(metric.Point())
@ -222,50 +222,93 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
return err
}
func init() {
influxdb := &InfluxDB{
Timeout: internal.Duration{Duration: time.Second * 5},
Downsampler: new(Downsampling),
}
outputs.Add("influxdb", func() telegraf.Output {
return influxdb
})
}
// Downsampling
type Downsampling struct {
sync.RWMutex
Metrics []telegraf.Metric
Since time.Time
TimeRange time.Duration
Name string
Metrics []telegraf.Metric
TimeRange time.Duration
Aggrations Aggregation
}
type Aggregation struct {
Fieldname string
}
type Aggregation map[string]string
func (d *Downsampling) Add(metrics []telegraf.Metric) error {
d.Lock()
d.Metrics = append(d.Metrics, metrics...)
after := metrics[len(metrics)-1].Time()
if d.Since.Sub(after) >= d.TimeRange {
d.Aggregate()
}
d.Unlock()
return nil
}
func (d *Downsampling) Run() {
for {
select {
case <-time.After(d.TimeRange):
aggrData := d.Aggregate()
fmt.Printf("%+v\n", aggrData)
}
}
}
// Aggregate calculates the mean value of fields by given time
func (d *Downsampling) Aggregate(fields ...string) []telegraf.Metric {
for _, metric := range d.Metrics {
fmt.Printf("%+v\n", metric.Fields())
fmt.Printf("%+v\n", metric.Point())
fmt.Printf("%+v\n", metric.Time())
}
func (d *Downsampling) Aggregate() []telegraf.Metric {
return nil
}
func init() {
influxdb := &InfluxDB{
Timeout: internal.Duration{Duration: time.Second * 5},
DS: new(DS),
func (d *Downsampling) Mean(fields ...string) (telegraf.Metric, error) {
var (
aggrMetric telegraf.Metric
sums = make(map[string]interface{})
size = len(d.Metrics)
)
// initialize sums map
for _, field := range fields {
sums[field] = 0
}
outputs.Add("influxdb", func() telegraf.Output {
return influxdb
})
d.RLock()
for _, metric := range d.Metrics {
for fieldName, value := range metric.Fields() {
oldVal := sums[fieldName]
switch value := value.(type) {
case int:
sums[fieldName] = oldVal.(int) + value
case int32:
sums[fieldName] = oldVal.(int32) + value
case int64:
sums[fieldName] = oldVal.(int64) + value
case float32:
sums[fieldName] = oldVal.(float32) + value
case float64:
sums[fieldName] = oldVal.(float64) + value
default:
continue
}
}
}
d.RUnlock()
for i := range sums {
sums[i] = sums[i] / size
}
aggrMetric, err := telegraf.NewMetric(
d.Name,
map[string]string{},
fields,
time.Now(),
)
return aggrMetric, err
}

View File

@ -41,6 +41,7 @@ func TestHTTPInflux(t *testing.T) {
require.NoError(t, err)
}
/*
func TestInfluxDS(t *testing.T) {
downsampler := &DS{
TimeRange: time.Minute,
@ -55,3 +56,4 @@ func TestInfluxDS(t *testing.T) {
i.DS.Add(testutil.MockMetrics())
}
*/