downsampling initial commit

This commit is contained in:
Maksadbek 2016-07-14 09:33:21 +05:00
parent 26315bfbea
commit f29e6b760e
2 changed files with 63 additions and 3 deletions

View File

@ -7,6 +7,7 @@ import (
"math/rand" "math/rand"
"net/url" "net/url"
"strings" "strings"
"sync"
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
@ -28,6 +29,7 @@ type InfluxDB struct {
WriteConsistency string WriteConsistency string
Timeout internal.Duration Timeout internal.Duration
UDPPayload int `toml:"udp_payload"` UDPPayload int `toml:"udp_payload"`
DS *DS
// Path to CA file // Path to CA file
SSLCA string `toml:"ssl_ca"` SSLCA string `toml:"ssl_ca"`
@ -190,6 +192,8 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
return err return err
} }
i.DS.Add(metrics)
for _, metric := range metrics { for _, metric := range metrics {
bp.AddPoint(metric.Point()) bp.AddPoint(metric.Point())
} }
@ -218,10 +222,50 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
return err return err
} }
// Downsampling
type Downsampling struct {
sync.RWMutex
Metrics []telegraf.Metric
Since time.Time
TimeRange time.Duration
}
type Aggregation struct {
Fieldname string
}
func (d *Downsampling) Add(metrics []telegraf.Metric) error {
d.Lock()
d.Metrics = append(d.Metrics, metrics...)
after := metrics[len(metrics)-1].Time()
if d.Since.Sub(after) >= d.TimeRange {
d.Aggregate()
}
d.Unlock()
return nil
}
func (d *Downsampling) Run() {
for {
}
}
// Aggregate calculates the mean value of fields by given time
func (d *Downsampling) Aggregate(fields ...string) []telegraf.Metric {
for _, metric := range d.Metrics {
fmt.Printf("%+v\n", metric.Fields())
fmt.Printf("%+v\n", metric.Point())
fmt.Printf("%+v\n", metric.Time())
}
return nil
}
func init() { func init() {
influxdb := &InfluxDB{
Timeout: internal.Duration{Duration: time.Second * 5},
DS: new(DS),
}
outputs.Add("influxdb", func() telegraf.Output { outputs.Add("influxdb", func() telegraf.Output {
return &InfluxDB{ return influxdb
Timeout: internal.Duration{Duration: time.Second * 5},
}
}) })
} }

View File

@ -5,6 +5,7 @@ import (
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"testing" "testing"
"time"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
@ -39,3 +40,18 @@ func TestHTTPInflux(t *testing.T) {
err = i.Write(testutil.MockMetrics()) err = i.Write(testutil.MockMetrics())
require.NoError(t, err) require.NoError(t, err)
} }
func TestInfluxDS(t *testing.T) {
downsampler := &DS{
TimeRange: time.Minute,
}
i := InfluxDB{
URLs: []string{"udp://localhost:8089"},
DS: downsampler,
}
err := i.Connect()
require.NoError(t, err)
i.DS.Add(testutil.MockMetrics())
}