diff --git a/plugins/outputs/influxdb/influxdb.go b/plugins/outputs/influxdb/influxdb.go index 3729a1724..5a02ed4c6 100644 --- a/plugins/outputs/influxdb/influxdb.go +++ b/plugins/outputs/influxdb/influxdb.go @@ -29,7 +29,10 @@ type InfluxDB struct { WriteConsistency string Timeout internal.Duration UDPPayload int `toml:"udp_payload"` - Downsampler *Downsampling + + Downsampler *Downsampling + DownsamplingName string `toml:"downsampling_name"` + DownsamplingInterval int `toml:"downsampling_interval"` // Path to CA file SSLCA string `toml:"ssl_ca"` @@ -76,6 +79,11 @@ var sampleConfig = ` # ssl_key = "/etc/telegraf/key.pem" ## Use SSL but skip chain & host verification # insecure_skip_verify = false + + ## Downsampling time interval(in minutes), by default it is 0, and turned off. + ## if the value is greater than 0, then it starts aggregating metrics + ## and writing into separate measurement. + # downsampling_interval = 0 ` func (i *InfluxDB) Connect() error { @@ -211,7 +219,11 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error { return err } - i.Downsampler.Add(metrics...) + // if the downsampling is enabled(the interval is not 0) + // then add metric into the slice + if i.DownsamplingInterval != 0 { + i.Downsampler.Add(metrics...) + } err = i.flush(bp) return err } @@ -234,11 +246,16 @@ func (i *InfluxDB) batchPointsFromMetrics(metrics ...telegraf.Metric) (client.Ba } func (i *InfluxDB) Run() { - tick := time.Tick(i.Downsampler.TimeRange) + // if the DownsamplingInterval interval is not 0 + // then it is enabled, otherwise skip downsampling + if i.DownsamplingInterval == 0 { + return + } + tick := time.Tick(time.Minute * time.Duration(i.DownsamplingInterval)) for { select { case <-tick: - aggrData, err := i.Downsampler.Aggregate() + aggrData, err := i.Downsampler.Aggregate(i.DownsamplingName) if err != nil { continue } @@ -269,10 +286,8 @@ func (i *InfluxDB) Run() { func init() { influxdb := &InfluxDB{ - Timeout: internal.Duration{Duration: time.Second * 5}, - Downsampler: &Downsampling{ - TimeRange: time.Duration(time.Minute * 2), - }, + Timeout: internal.Duration{Duration: time.Second * 5}, + Downsampler: NewDownsampler(), } go influxdb.Run() outputs.Add("influxdb", func() telegraf.Output { @@ -283,16 +298,12 @@ func init() { // Downsampling type Downsampling struct { sync.RWMutex - Name string Metrics []telegraf.Metric - TimeRange time.Duration Aggregations map[string][]Aggregation } -func NewDownsampling(name string, timeRange time.Duration) *Downsampling { +func NewDownsampler() *Downsampling { return &Downsampling{ - Name: name, - TimeRange: timeRange, Aggregations: make(map[string][]Aggregation), } } @@ -329,7 +340,7 @@ func (d *Downsampling) Add(metrics ...telegraf.Metric) { } // Aggregate calculates the mean value of fields by given time -func (d *Downsampling) Aggregate() (telegraf.Metric, error) { +func (d *Downsampling) Aggregate(name string) (telegraf.Metric, error) { metrics := map[string]interface{}{} var ( aggrMetric, sum, mean telegraf.Metric @@ -338,12 +349,12 @@ func (d *Downsampling) Aggregate() (telegraf.Metric, error) { for name, aggr := range d.Aggregations { switch name { case "sum": - sum, err = d.Sum(aggr...) + sum, err = d.Sum(name, aggr...) if err != nil { return aggrMetric, err } case "mean": - mean, err = d.Mean(aggr...) + mean, err = d.Mean(name, aggr...) if err != nil { return aggrMetric, err } @@ -364,7 +375,7 @@ func (d *Downsampling) Aggregate() (telegraf.Metric, error) { } aggrMetric, err = telegraf.NewMetric( - d.Name, + name, map[string]string{}, metrics, time.Now(), @@ -373,7 +384,7 @@ func (d *Downsampling) Aggregate() (telegraf.Metric, error) { } // Sum calculate the sum values of given fields -func (d *Downsampling) Sum(fields ...Aggregation) (telegraf.Metric, error) { +func (d *Downsampling) Sum(name string, fields ...Aggregation) (telegraf.Metric, error) { var ( sumMetric telegraf.Metric sums = make(map[string]interface{}) @@ -412,7 +423,7 @@ func (d *Downsampling) Sum(fields ...Aggregation) (telegraf.Metric, error) { d.RUnlock() sumMetric, err := telegraf.NewMetric( - d.Name, + name, map[string]string{}, sums, time.Now(), @@ -421,7 +432,7 @@ func (d *Downsampling) Sum(fields ...Aggregation) (telegraf.Metric, error) { } // Mean calculates the mean values of given fields -func (d *Downsampling) Mean(fields ...Aggregation) (telegraf.Metric, error) { +func (d *Downsampling) Mean(name string, fields ...Aggregation) (telegraf.Metric, error) { var ( aggrMetric telegraf.Metric sums = make(map[string]interface{}) @@ -478,7 +489,7 @@ func (d *Downsampling) Mean(fields ...Aggregation) (telegraf.Metric, error) { } aggrMetric, err := telegraf.NewMetric( - d.Name, + name, map[string]string{}, sums, time.Now(), diff --git a/plugins/outputs/influxdb/influxdb_test.go b/plugins/outputs/influxdb/influxdb_test.go index 669d947d9..15427d2f4 100644 --- a/plugins/outputs/influxdb/influxdb_test.go +++ b/plugins/outputs/influxdb/influxdb_test.go @@ -49,7 +49,7 @@ func TestHTTPInflux(t *testing.T) { } func TestDownsampling_mean(t *testing.T) { - ds := NewDownsampling("downsampling", time.Minute) + ds := NewDownsampler() ds.Add(testutil.TestMetric(120)) ds.Add(testutil.TestMetric(80)) @@ -61,14 +61,14 @@ func TestDownsampling_mean(t *testing.T) { }, } - aggr, err := ds.Mean(aggregations...) + aggr, err := ds.Mean("downsampled", aggregations...) require.NoError(t, err) require.Equal(t, int64(100), aggr.Fields()["mean_value"]) } func TestDownsampling_sum(t *testing.T) { - ds := NewDownsampling("downsampling", time.Minute) + ds := NewDownsampler() ds.Add(testutil.TestMetric(120)) ds.Add(testutil.TestMetric(80)) @@ -79,14 +79,14 @@ func TestDownsampling_sum(t *testing.T) { Alias: "sum_value", }, } - aggr, err := ds.Sum(aggregations...) + aggr, err := ds.Sum("downsampled", aggregations...) require.NoError(t, err) require.Equal(t, int64(200), aggr.Fields()["sum_value"]) } func TestDownsampling_aggregate(t *testing.T) { - ds := NewDownsampling("downsampling", time.Minute) + ds := NewDownsampler() ds.Add(testutil.TestMetric(120)) ds.Add(testutil.TestMetric(80)) @@ -106,7 +106,7 @@ func TestDownsampling_aggregate(t *testing.T) { ds.AddAggregations(aggregations...) - aggr, err := ds.Aggregate() + aggr, err := ds.Aggregate("downsampled") require.NoError(t, err) require.Equal(t, int64(100), aggr.Fields()["mean_value"]) @@ -158,10 +158,7 @@ func TestDownsampling_run(t *testing.T) { })) defer ts.Close() - downsampler := &Downsampling{ - TimeRange: time.Duration(time.Second * 10), - Name: "downsampling", - } + downsampler := &Downsampling{} downsampler.Aggregations = make(map[string][]Aggregation) downsampler.AddAggregations(Aggregation{ @@ -171,8 +168,10 @@ func TestDownsampling_run(t *testing.T) { }) influxdb := &InfluxDB{ - Downsampler: downsampler, - URLs: []string{ts.URL}, + Downsampler: downsampler, + DownsamplingName: "downsampled", + DownsamplingInterval: 1, + URLs: []string{ts.URL}, } go influxdb.Run()