From f29e6b760ecf330c5cfae1a959d69a518e504dd1 Mon Sep 17 00:00:00 2001 From: Maksadbek Date: Thu, 14 Jul 2016 09:33:21 +0500 Subject: [PATCH] downsampling initial commit --- plugins/outputs/influxdb/influxdb.go | 50 +++++++++++++++++++++-- plugins/outputs/influxdb/influxdb_test.go | 16 ++++++++ 2 files changed, 63 insertions(+), 3 deletions(-) diff --git a/plugins/outputs/influxdb/influxdb.go b/plugins/outputs/influxdb/influxdb.go index 1d6110b34..57f251e5f 100644 --- a/plugins/outputs/influxdb/influxdb.go +++ b/plugins/outputs/influxdb/influxdb.go @@ -7,6 +7,7 @@ import ( "math/rand" "net/url" "strings" + "sync" "time" "github.com/influxdata/telegraf" @@ -28,6 +29,7 @@ type InfluxDB struct { WriteConsistency string Timeout internal.Duration UDPPayload int `toml:"udp_payload"` + DS *DS // Path to CA file SSLCA string `toml:"ssl_ca"` @@ -190,6 +192,8 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error { return err } + i.DS.Add(metrics) + for _, metric := range metrics { bp.AddPoint(metric.Point()) } @@ -218,10 +222,50 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error { return err } +// Downsampling +type Downsampling struct { + sync.RWMutex + Metrics []telegraf.Metric + Since time.Time + TimeRange time.Duration +} + +type Aggregation struct { + Fieldname string +} + +func (d *Downsampling) Add(metrics []telegraf.Metric) error { + d.Lock() + d.Metrics = append(d.Metrics, metrics...) + after := metrics[len(metrics)-1].Time() + if d.Since.Sub(after) >= d.TimeRange { + d.Aggregate() + } + d.Unlock() + return nil +} + +func (d *Downsampling) Run() { + for { + } +} + +// Aggregate calculates the mean value of fields by given time +func (d *Downsampling) Aggregate(fields ...string) []telegraf.Metric { + for _, metric := range d.Metrics { + fmt.Printf("%+v\n", metric.Fields()) + fmt.Printf("%+v\n", metric.Point()) + fmt.Printf("%+v\n", metric.Time()) + } + return nil +} + func init() { + influxdb := &InfluxDB{ + Timeout: internal.Duration{Duration: time.Second * 5}, + DS: new(DS), + } outputs.Add("influxdb", func() telegraf.Output { - return &InfluxDB{ - Timeout: internal.Duration{Duration: time.Second * 5}, - } + return influxdb }) } diff --git a/plugins/outputs/influxdb/influxdb_test.go b/plugins/outputs/influxdb/influxdb_test.go index 1414fa839..4df6e8708 100644 --- a/plugins/outputs/influxdb/influxdb_test.go +++ b/plugins/outputs/influxdb/influxdb_test.go @@ -5,6 +5,7 @@ import ( "net/http" "net/http/httptest" "testing" + "time" "github.com/influxdata/telegraf/testutil" @@ -39,3 +40,18 @@ func TestHTTPInflux(t *testing.T) { err = i.Write(testutil.MockMetrics()) require.NoError(t, err) } + +func TestInfluxDS(t *testing.T) { + downsampler := &DS{ + TimeRange: time.Minute, + } + i := InfluxDB{ + URLs: []string{"udp://localhost:8089"}, + DS: downsampler, + } + + err := i.Connect() + require.NoError(t, err) + + i.DS.Add(testutil.MockMetrics()) +}