diff --git a/plugins/outputs/influxdb/influxdb.go b/plugins/outputs/influxdb/influxdb.go index 24c053b66..18d2074fd 100644 --- a/plugins/outputs/influxdb/influxdb.go +++ b/plugins/outputs/influxdb/influxdb.go @@ -174,36 +174,9 @@ func (i *InfluxDB) Description() string { return "Configuration for influxdb server to send metrics to" } -// Choose a random server in the cluster to write to until a successful write -// occurs, logging each unsuccessful. If all servers fail, return error. -func (i *InfluxDB) Write(metrics []telegraf.Metric) error { - if len(i.conns) == 0 { - err := i.Connect() - if err != nil { - return err - } - } - bp, err := client.NewBatchPoints(client.BatchPointsConfig{ - Database: i.Database, - RetentionPolicy: i.RetentionPolicy, - WriteConsistency: i.WriteConsistency, - }) - if err != nil { - return err - } - - err = i.Downsampler.Add(metrics...) - if err != nil { - return err - } - - for _, metric := range metrics { - bp.AddPoint(metric.Point()) - } - +func (i *InfluxDB) flush(bp client.BatchPoints) error { // This will get set to nil if a successful write occurs - err = errors.New("Could not write to any InfluxDB server in cluster") - + err := errors.New("Could not write to any InfluxDB server in cluster") p := rand.Perm(len(i.conns)) for _, n := range p { if e := i.conns[n].Write(bp); e != nil { @@ -221,15 +194,87 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error { break } } - return err } +// Choose a random server in the cluster to write to until a successful write +// occurs, logging each unsuccessful. If all servers fail, return error. +func (i *InfluxDB) Write(metrics []telegraf.Metric) error { + if len(i.conns) == 0 { + err := i.Connect() + if err != nil { + return err + } + } + bp, err := i.batchPointsFromMetrics(metrics...) + if err != nil { + return err + } + + i.Downsampler.Add(metrics...) + err = i.flush(bp) + return err +} + +func (i *InfluxDB) batchPointsFromMetrics(metrics ...telegraf.Metric) (client.BatchPoints, error) { + bp, err := client.NewBatchPoints(client.BatchPointsConfig{ + Database: i.Database, + RetentionPolicy: i.RetentionPolicy, + WriteConsistency: i.WriteConsistency, + }) + if err != nil { + return bp, err + } + + for _, metric := range metrics { + bp.AddPoint(metric.Point()) + } + + return bp, nil +} + +func (i *InfluxDB) Run() { + tick := time.Tick(i.Downsampler.TimeRange) + for { + select { + case <-tick: + aggrData, err := i.Downsampler.Aggregate() + if err != nil { + continue + } + + i.Downsampler.Lock() + i.Downsampler.Metrics = nil + i.Downsampler.Unlock() + + if len(i.conns) == 0 { + err := i.Connect() + if err != nil { + return + } + } + + bp, err := i.batchPointsFromMetrics(aggrData) + if err != nil { + return + } + + err = i.flush(bp) + if err != nil { + return + } + } + } +} + func init() { influxdb := &InfluxDB{ - Timeout: internal.Duration{Duration: time.Second * 5}, - Downsampler: new(Downsampling), + Timeout: internal.Duration{Duration: time.Second * 5}, + Downsampler: &Downsampling{ + TimeRange: time.Duration(time.Minute * 2), + }, } + go influxdb.Run() outputs.Add("influxdb", func() telegraf.Output { return influxdb }) @@ -244,6 +289,14 @@ type Downsampling struct { Aggregations map[string][]Aggregation } +func NewDownsampling(name string, timeRange time.Duration) *Downsampling { + return &Downsampling{ + Name: name, + TimeRange: timeRange, + Aggregations: make(map[string][]Aggregation), + } +} + // Aggregation maps the field names to aggregation function for them type Aggregation struct { FieldName string @@ -252,6 +305,10 @@ type Aggregation struct { } func (d *Downsampling) AddAggregations(aggrs ...Aggregation) { + if d.Aggregations == nil { + d.Aggregations = make(map[string][]Aggregation) + } + for _, aggr := range aggrs { switch aggr.FuncName { case "mean": @@ -264,26 +321,11 @@ func (d *Downsampling) AddAggregations(aggrs ...Aggregation) { } // Add appends metrics to the metrics that will be aggregated -func (d *Downsampling) Add(metrics ...telegraf.Metric) error { +func (d *Downsampling) Add(metrics ...telegraf.Metric) { d.Lock() d.Metrics = append(d.Metrics, metrics...) d.Unlock() - return nil -} - -// Run starts the downsampler -// it runs periodically -func (d *Downsampling) Run() { - for { - select { - case <-time.After(d.TimeRange): - aggrData, err := d.Aggregate() - if err != nil { - continue - } - fmt.Printf("%+v\n", aggrData) - } - } + return } // Aggregate calculates the mean value of fields by given time @@ -309,12 +351,16 @@ func (d *Downsampling) Aggregate() (telegraf.Metric, error) { } } - for k, v := range sum.Fields() { - metrics[k] = v + if sum != nil && sum.Fields() != nil { + for k, v := range sum.Fields() { + metrics[k] = v + } } - for k, v := range mean.Fields() { - metrics[k] = v + if mean != nil && mean.Fields() != nil { + for k, v := range mean.Fields() { + metrics[k] = v + } } aggrMetric, err = telegraf.NewMetric( diff --git a/plugins/outputs/influxdb/influxdb_test.go b/plugins/outputs/influxdb/influxdb_test.go index fb880fea3..e8e411b50 100644 --- a/plugins/outputs/influxdb/influxdb_test.go +++ b/plugins/outputs/influxdb/influxdb_test.go @@ -2,10 +2,16 @@ package influxdb import ( "fmt" + "io/ioutil" + "math/rand" "net/http" "net/http/httptest" + "sync" "testing" + "time" + "github.com/influxdata/influxdb/models" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" @@ -13,7 +19,8 @@ import ( func TestUDPInflux(t *testing.T) { i := InfluxDB{ - URLs: []string{"udp://localhost:8089"}, + URLs: []string{"udp://localhost:8089"}, + Downsampler: &Downsampling{}, } err := i.Connect() @@ -31,7 +38,8 @@ func TestHTTPInflux(t *testing.T) { defer ts.Close() i := InfluxDB{ - URLs: []string{ts.URL}, + URLs: []string{ts.URL}, + Downsampler: &Downsampling{}, } err := i.Connect() @@ -41,13 +49,9 @@ func TestHTTPInflux(t *testing.T) { } func TestDownsampling_mean(t *testing.T) { - ds := &Downsampling{} - - err := ds.Add(testutil.TestMetric(120)) - require.NoError(t, err) - - err = ds.Add(testutil.TestMetric(80)) - require.NoError(t, err) + ds := NewDownsampling("downsampling", time.Minute) + ds.Add(testutil.TestMetric(120)) + ds.Add(testutil.TestMetric(80)) aggregations := []Aggregation{ Aggregation{ @@ -64,13 +68,9 @@ func TestDownsampling_mean(t *testing.T) { } func TestDownsampling_sum(t *testing.T) { - ds := &Downsampling{} - - err := ds.Add(testutil.TestMetric(120)) - require.NoError(t, err) - - err = ds.Add(testutil.TestMetric(80)) - require.NoError(t, err) + ds := NewDownsampling("downsampling", time.Minute) + ds.Add(testutil.TestMetric(120)) + ds.Add(testutil.TestMetric(80)) aggregations := []Aggregation{ Aggregation{ @@ -86,13 +86,10 @@ func TestDownsampling_sum(t *testing.T) { } func TestDownsampling_aggregate(t *testing.T) { - ds := &Downsampling{} + ds := NewDownsampling("downsampling", time.Minute) - err := ds.Add(testutil.TestMetric(120)) - require.NoError(t, err) - - err = ds.Add(testutil.TestMetric(80)) - require.NoError(t, err) + ds.Add(testutil.TestMetric(120)) + ds.Add(testutil.TestMetric(80)) aggregations := []Aggregation{ Aggregation{ @@ -107,7 +104,6 @@ func TestDownsampling_aggregate(t *testing.T) { }, } - ds.Aggregations = make(map[string][]Aggregation) ds.AddAggregations(aggregations...) aggr, err := ds.Aggregate() @@ -117,3 +113,88 @@ func TestDownsampling_aggregate(t *testing.T) { require.Equal(t, int64(200), aggr.Fields()["sum_value"]) } + +func TestDownsampling_run(t *testing.T) { + testCase := struct { + sum int + count int + sync.Mutex + }{} + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var createDatabaseQuery = "CREATE DATABASE IF NOT EXISTS \"\"" + + w.WriteHeader(http.StatusOK) + w.Header().Set("Content-Type", "application/json") + fmt.Fprintln(w, `{"results":[{}]}`) + + err := r.ParseForm() + require.NoError(t, err) + + q := r.Form.Get("q") + if q == createDatabaseQuery { + return + } + + body, err := ioutil.ReadAll(r.Body) + require.NoError(t, err) + + points, err := models.ParsePoints(body) + require.NoError(t, err) + + if len(points) == 0 { + return + } + + mean, ok := points[0].Fields()["mean_value"] + if !ok { + return + } + + testCase.Lock() + want := testCase.sum / testCase.count + testCase.sum = 0 + testCase.count = 0 + defer testCase.Unlock() + + require.EqualValues(t, want, mean) + + })) + defer ts.Close() + + downsampler := &Downsampling{ + TimeRange: time.Duration(time.Second * 10), + Name: "downsampling", + } + + downsampler.Aggregations = make(map[string][]Aggregation) + downsampler.AddAggregations(Aggregation{ + FieldName: "value", + FuncName: "mean", + Alias: "mean_value", + }) + + influxdb := &InfluxDB{ + Downsampler: downsampler, + URLs: []string{ts.URL}, + } + go influxdb.Run() + + rand.Seed(time.Now().Unix()) + + tick := time.Tick(3 * time.Second) + after := time.After(12 * time.Second) + + for { + select { + case <-tick: + testCase.count += 1 + val := rand.Intn(120) + testCase.sum += val + err := influxdb.Write([]telegraf.Metric{testutil.TestMetric(val)}) + require.NoError(t, err) + case <-after: + return + } + } +}