getting measurement name and interval for downsampling from configurations

This commit is contained in:
Maksadbek 2016-08-07 19:12:17 +03:00
parent 573c062704
commit 740b9eef6f
2 changed files with 43 additions and 33 deletions

View File

@ -29,7 +29,10 @@ type InfluxDB struct {
WriteConsistency string WriteConsistency string
Timeout internal.Duration Timeout internal.Duration
UDPPayload int `toml:"udp_payload"` UDPPayload int `toml:"udp_payload"`
Downsampler *Downsampling
Downsampler *Downsampling
DownsamplingName string `toml:"downsampling_name"`
DownsamplingInterval int `toml:"downsampling_interval"`
// Path to CA file // Path to CA file
SSLCA string `toml:"ssl_ca"` SSLCA string `toml:"ssl_ca"`
@ -76,6 +79,11 @@ var sampleConfig = `
# ssl_key = "/etc/telegraf/key.pem" # ssl_key = "/etc/telegraf/key.pem"
## Use SSL but skip chain & host verification ## Use SSL but skip chain & host verification
# insecure_skip_verify = false # insecure_skip_verify = false
## Downsampling time interval (in minutes); by default it is 0 and downsampling is turned off.
## If the value is greater than 0, metrics are aggregated
## and written into a separate measurement.
# downsampling_interval = 0
` `
func (i *InfluxDB) Connect() error { func (i *InfluxDB) Connect() error {
@ -211,7 +219,11 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
return err return err
} }
i.Downsampler.Add(metrics...) // if downsampling is enabled (the interval is not 0),
// then add the metrics to the slice
if i.DownsamplingInterval != 0 {
i.Downsampler.Add(metrics...)
}
err = i.flush(bp) err = i.flush(bp)
return err return err
} }
@ -234,11 +246,16 @@ func (i *InfluxDB) batchPointsFromMetrics(metrics ...telegraf.Metric) (client.Ba
} }
func (i *InfluxDB) Run() { func (i *InfluxDB) Run() {
tick := time.Tick(i.Downsampler.TimeRange) // if DownsamplingInterval is not 0,
// then downsampling is enabled; otherwise skip it
if i.DownsamplingInterval == 0 {
return
}
tick := time.Tick(time.Minute * time.Duration(i.DownsamplingInterval))
for { for {
select { select {
case <-tick: case <-tick:
aggrData, err := i.Downsampler.Aggregate() aggrData, err := i.Downsampler.Aggregate(i.DownsamplingName)
if err != nil { if err != nil {
continue continue
} }
@ -269,10 +286,8 @@ func (i *InfluxDB) Run() {
func init() { func init() {
influxdb := &InfluxDB{ influxdb := &InfluxDB{
Timeout: internal.Duration{Duration: time.Second * 5}, Timeout: internal.Duration{Duration: time.Second * 5},
Downsampler: &Downsampling{ Downsampler: NewDownsampler(),
TimeRange: time.Duration(time.Minute * 2),
},
} }
go influxdb.Run() go influxdb.Run()
outputs.Add("influxdb", func() telegraf.Output { outputs.Add("influxdb", func() telegraf.Output {
@ -283,16 +298,12 @@ func init() {
// Downsampling // Downsampling
type Downsampling struct { type Downsampling struct {
sync.RWMutex sync.RWMutex
Name string
Metrics []telegraf.Metric Metrics []telegraf.Metric
TimeRange time.Duration
Aggregations map[string][]Aggregation Aggregations map[string][]Aggregation
} }
func NewDownsampling(name string, timeRange time.Duration) *Downsampling { func NewDownsampler() *Downsampling {
return &Downsampling{ return &Downsampling{
Name: name,
TimeRange: timeRange,
Aggregations: make(map[string][]Aggregation), Aggregations: make(map[string][]Aggregation),
} }
} }
@ -329,7 +340,7 @@ func (d *Downsampling) Add(metrics ...telegraf.Metric) {
} }
// Aggregate calculates the mean value of fields by given time // Aggregate calculates the mean value of fields by given time
func (d *Downsampling) Aggregate() (telegraf.Metric, error) { func (d *Downsampling) Aggregate(name string) (telegraf.Metric, error) {
metrics := map[string]interface{}{} metrics := map[string]interface{}{}
var ( var (
aggrMetric, sum, mean telegraf.Metric aggrMetric, sum, mean telegraf.Metric
@ -338,12 +349,12 @@ func (d *Downsampling) Aggregate() (telegraf.Metric, error) {
for name, aggr := range d.Aggregations { for name, aggr := range d.Aggregations {
switch name { switch name {
case "sum": case "sum":
sum, err = d.Sum(aggr...) sum, err = d.Sum(name, aggr...)
if err != nil { if err != nil {
return aggrMetric, err return aggrMetric, err
} }
case "mean": case "mean":
mean, err = d.Mean(aggr...) mean, err = d.Mean(name, aggr...)
if err != nil { if err != nil {
return aggrMetric, err return aggrMetric, err
} }
@ -364,7 +375,7 @@ func (d *Downsampling) Aggregate() (telegraf.Metric, error) {
} }
aggrMetric, err = telegraf.NewMetric( aggrMetric, err = telegraf.NewMetric(
d.Name, name,
map[string]string{}, map[string]string{},
metrics, metrics,
time.Now(), time.Now(),
@ -373,7 +384,7 @@ func (d *Downsampling) Aggregate() (telegraf.Metric, error) {
} }
// Sum calculate the sum values of given fields // Sum calculate the sum values of given fields
func (d *Downsampling) Sum(fields ...Aggregation) (telegraf.Metric, error) { func (d *Downsampling) Sum(name string, fields ...Aggregation) (telegraf.Metric, error) {
var ( var (
sumMetric telegraf.Metric sumMetric telegraf.Metric
sums = make(map[string]interface{}) sums = make(map[string]interface{})
@ -412,7 +423,7 @@ func (d *Downsampling) Sum(fields ...Aggregation) (telegraf.Metric, error) {
d.RUnlock() d.RUnlock()
sumMetric, err := telegraf.NewMetric( sumMetric, err := telegraf.NewMetric(
d.Name, name,
map[string]string{}, map[string]string{},
sums, sums,
time.Now(), time.Now(),
@ -421,7 +432,7 @@ func (d *Downsampling) Sum(fields ...Aggregation) (telegraf.Metric, error) {
} }
// Mean calculates the mean values of given fields // Mean calculates the mean values of given fields
func (d *Downsampling) Mean(fields ...Aggregation) (telegraf.Metric, error) { func (d *Downsampling) Mean(name string, fields ...Aggregation) (telegraf.Metric, error) {
var ( var (
aggrMetric telegraf.Metric aggrMetric telegraf.Metric
sums = make(map[string]interface{}) sums = make(map[string]interface{})
@ -478,7 +489,7 @@ func (d *Downsampling) Mean(fields ...Aggregation) (telegraf.Metric, error) {
} }
aggrMetric, err := telegraf.NewMetric( aggrMetric, err := telegraf.NewMetric(
d.Name, name,
map[string]string{}, map[string]string{},
sums, sums,
time.Now(), time.Now(),

View File

@ -49,7 +49,7 @@ func TestHTTPInflux(t *testing.T) {
} }
func TestDownsampling_mean(t *testing.T) { func TestDownsampling_mean(t *testing.T) {
ds := NewDownsampling("downsampling", time.Minute) ds := NewDownsampler()
ds.Add(testutil.TestMetric(120)) ds.Add(testutil.TestMetric(120))
ds.Add(testutil.TestMetric(80)) ds.Add(testutil.TestMetric(80))
@ -61,14 +61,14 @@ func TestDownsampling_mean(t *testing.T) {
}, },
} }
aggr, err := ds.Mean(aggregations...) aggr, err := ds.Mean("downsampled", aggregations...)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, int64(100), aggr.Fields()["mean_value"]) require.Equal(t, int64(100), aggr.Fields()["mean_value"])
} }
func TestDownsampling_sum(t *testing.T) { func TestDownsampling_sum(t *testing.T) {
ds := NewDownsampling("downsampling", time.Minute) ds := NewDownsampler()
ds.Add(testutil.TestMetric(120)) ds.Add(testutil.TestMetric(120))
ds.Add(testutil.TestMetric(80)) ds.Add(testutil.TestMetric(80))
@ -79,14 +79,14 @@ func TestDownsampling_sum(t *testing.T) {
Alias: "sum_value", Alias: "sum_value",
}, },
} }
aggr, err := ds.Sum(aggregations...) aggr, err := ds.Sum("downsampled", aggregations...)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, int64(200), aggr.Fields()["sum_value"]) require.Equal(t, int64(200), aggr.Fields()["sum_value"])
} }
func TestDownsampling_aggregate(t *testing.T) { func TestDownsampling_aggregate(t *testing.T) {
ds := NewDownsampling("downsampling", time.Minute) ds := NewDownsampler()
ds.Add(testutil.TestMetric(120)) ds.Add(testutil.TestMetric(120))
ds.Add(testutil.TestMetric(80)) ds.Add(testutil.TestMetric(80))
@ -106,7 +106,7 @@ func TestDownsampling_aggregate(t *testing.T) {
ds.AddAggregations(aggregations...) ds.AddAggregations(aggregations...)
aggr, err := ds.Aggregate() aggr, err := ds.Aggregate("downsampled")
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, int64(100), aggr.Fields()["mean_value"]) require.Equal(t, int64(100), aggr.Fields()["mean_value"])
@ -158,10 +158,7 @@ func TestDownsampling_run(t *testing.T) {
})) }))
defer ts.Close() defer ts.Close()
downsampler := &Downsampling{ downsampler := &Downsampling{}
TimeRange: time.Duration(time.Second * 10),
Name: "downsampling",
}
downsampler.Aggregations = make(map[string][]Aggregation) downsampler.Aggregations = make(map[string][]Aggregation)
downsampler.AddAggregations(Aggregation{ downsampler.AddAggregations(Aggregation{
@ -171,8 +168,10 @@ func TestDownsampling_run(t *testing.T) {
}) })
influxdb := &InfluxDB{ influxdb := &InfluxDB{
Downsampler: downsampler, Downsampler: downsampler,
URLs: []string{ts.URL}, DownsamplingName: "downsampled",
DownsamplingInterval: 1,
URLs: []string{ts.URL},
} }
go influxdb.Run() go influxdb.Run()