run periodically and aggregate data

This commit is contained in:
Maksadbek 2016-07-26 21:27:30 +03:00
parent 7698ea3f86
commit 6f86c5092f
2 changed files with 203 additions and 76 deletions

View File

@ -174,36 +174,9 @@ func (i *InfluxDB) Description() string {
return "Configuration for influxdb server to send metrics to"
}
// Choose a random server in the cluster to write to until a successful write
// occurs, logging each unsuccessful. If all servers fail, return error.
func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
if len(i.conns) == 0 {
err := i.Connect()
if err != nil {
return err
}
}
bp, err := client.NewBatchPoints(client.BatchPointsConfig{
Database: i.Database,
RetentionPolicy: i.RetentionPolicy,
WriteConsistency: i.WriteConsistency,
})
if err != nil {
return err
}
err = i.Downsampler.Add(metrics...)
if err != nil {
return err
}
for _, metric := range metrics {
bp.AddPoint(metric.Point())
}
func (i *InfluxDB) flush(bp client.BatchPoints) error {
// This will get set to nil if a successful write occurs
err = errors.New("Could not write to any InfluxDB server in cluster")
err := errors.New("Could not write to any InfluxDB server in cluster")
p := rand.Perm(len(i.conns))
for _, n := range p {
if e := i.conns[n].Write(bp); e != nil {
@ -221,15 +194,87 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
break
}
}
return err
}
// Choose a random server in the cluster to write to until a successful write
// occurs, logging each unsuccessful. If all servers fail, return error.
func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
if len(i.conns) == 0 {
err := i.Connect()
if err != nil {
return err
}
}
bp, err := i.batchPointsFromMetrics(metrics...)
if err != nil {
return err
}
i.Downsampler.Add(metrics...)
err = i.flush(bp)
return err
}
func (i *InfluxDB) batchPointsFromMetrics(metrics ...telegraf.Metric) (client.BatchPoints, error) {
bp, err := client.NewBatchPoints(client.BatchPointsConfig{
Database: i.Database,
RetentionPolicy: i.RetentionPolicy,
WriteConsistency: i.WriteConsistency,
})
if err != nil {
return bp, err
}
for _, metric := range metrics {
bp.AddPoint(metric.Point())
}
return bp, nil
}
func (i *InfluxDB) Run() {
tick := time.Tick(i.Downsampler.TimeRange)
for {
select {
case <-tick:
aggrData, err := i.Downsampler.Aggregate()
if err != nil {
continue
}
i.Downsampler.Lock()
i.Downsampler.Metrics = nil
i.Downsampler.Unlock()
if len(i.conns) == 0 {
err := i.Connect()
if err != nil {
return
}
}
bp, err := i.batchPointsFromMetrics(aggrData)
if err != nil {
return
}
err = i.flush(bp)
if err != nil {
return
}
}
}
}
func init() {
influxdb := &InfluxDB{
Timeout: internal.Duration{Duration: time.Second * 5},
Downsampler: new(Downsampling),
Timeout: internal.Duration{Duration: time.Second * 5},
Downsampler: &Downsampling{
TimeRange: time.Duration(time.Minute * 2),
},
}
go influxdb.Run()
outputs.Add("influxdb", func() telegraf.Output {
return influxdb
})
@ -244,6 +289,14 @@ type Downsampling struct {
Aggregations map[string][]Aggregation
}
func NewDownsampling(name string, timeRange time.Duration) *Downsampling {
return &Downsampling{
Name: name,
TimeRange: timeRange,
Aggregations: make(map[string][]Aggregation),
}
}
// Aggregation maps the field names to aggregation function for them
type Aggregation struct {
FieldName string
@ -252,6 +305,10 @@ type Aggregation struct {
}
func (d *Downsampling) AddAggregations(aggrs ...Aggregation) {
if d.Aggregations == nil {
d.Aggregations = make(map[string][]Aggregation)
}
for _, aggr := range aggrs {
switch aggr.FuncName {
case "mean":
@ -264,26 +321,11 @@ func (d *Downsampling) AddAggregations(aggrs ...Aggregation) {
}
// Add appends metrics to the metrics that will be aggregated
func (d *Downsampling) Add(metrics ...telegraf.Metric) error {
func (d *Downsampling) Add(metrics ...telegraf.Metric) {
d.Lock()
d.Metrics = append(d.Metrics, metrics...)
d.Unlock()
return nil
}
// Run starts the downsampler
// it runs periodically
func (d *Downsampling) Run() {
for {
select {
case <-time.After(d.TimeRange):
aggrData, err := d.Aggregate()
if err != nil {
continue
}
fmt.Printf("%+v\n", aggrData)
}
}
return
}
// Aggregate calculates the mean value of fields by given time
@ -309,12 +351,16 @@ func (d *Downsampling) Aggregate() (telegraf.Metric, error) {
}
}
for k, v := range sum.Fields() {
metrics[k] = v
if sum != nil && sum.Fields() != nil {
for k, v := range sum.Fields() {
metrics[k] = v
}
}
for k, v := range mean.Fields() {
metrics[k] = v
if mean != nil && mean.Fields() != nil {
for k, v := range mean.Fields() {
metrics[k] = v
}
}
aggrMetric, err = telegraf.NewMetric(

View File

@ -2,10 +2,16 @@ package influxdb
import (
"fmt"
"io/ioutil"
"math/rand"
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
@ -13,7 +19,8 @@ import (
func TestUDPInflux(t *testing.T) {
i := InfluxDB{
URLs: []string{"udp://localhost:8089"},
URLs: []string{"udp://localhost:8089"},
Downsampler: &Downsampling{},
}
err := i.Connect()
@ -31,7 +38,8 @@ func TestHTTPInflux(t *testing.T) {
defer ts.Close()
i := InfluxDB{
URLs: []string{ts.URL},
URLs: []string{ts.URL},
Downsampler: &Downsampling{},
}
err := i.Connect()
@ -41,13 +49,9 @@ func TestHTTPInflux(t *testing.T) {
}
func TestDownsampling_mean(t *testing.T) {
ds := &Downsampling{}
err := ds.Add(testutil.TestMetric(120))
require.NoError(t, err)
err = ds.Add(testutil.TestMetric(80))
require.NoError(t, err)
ds := NewDownsampling("downsampling", time.Minute)
ds.Add(testutil.TestMetric(120))
ds.Add(testutil.TestMetric(80))
aggregations := []Aggregation{
Aggregation{
@ -64,13 +68,9 @@ func TestDownsampling_mean(t *testing.T) {
}
func TestDownsampling_sum(t *testing.T) {
ds := &Downsampling{}
err := ds.Add(testutil.TestMetric(120))
require.NoError(t, err)
err = ds.Add(testutil.TestMetric(80))
require.NoError(t, err)
ds := NewDownsampling("downsampling", time.Minute)
ds.Add(testutil.TestMetric(120))
ds.Add(testutil.TestMetric(80))
aggregations := []Aggregation{
Aggregation{
@ -86,13 +86,10 @@ func TestDownsampling_sum(t *testing.T) {
}
func TestDownsampling_aggregate(t *testing.T) {
ds := &Downsampling{}
ds := NewDownsampling("downsampling", time.Minute)
err := ds.Add(testutil.TestMetric(120))
require.NoError(t, err)
err = ds.Add(testutil.TestMetric(80))
require.NoError(t, err)
ds.Add(testutil.TestMetric(120))
ds.Add(testutil.TestMetric(80))
aggregations := []Aggregation{
Aggregation{
@ -107,7 +104,6 @@ func TestDownsampling_aggregate(t *testing.T) {
},
}
ds.Aggregations = make(map[string][]Aggregation)
ds.AddAggregations(aggregations...)
aggr, err := ds.Aggregate()
@ -117,3 +113,88 @@ func TestDownsampling_aggregate(t *testing.T) {
require.Equal(t, int64(200), aggr.Fields()["sum_value"])
}
func TestDownsampling_run(t *testing.T) {
testCase := struct {
sum int
count int
sync.Mutex
}{}
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var createDatabaseQuery = "CREATE DATABASE IF NOT EXISTS \"\""
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintln(w, `{"results":[{}]}`)
err := r.ParseForm()
require.NoError(t, err)
q := r.Form.Get("q")
if q == createDatabaseQuery {
return
}
body, err := ioutil.ReadAll(r.Body)
require.NoError(t, err)
points, err := models.ParsePoints(body)
require.NoError(t, err)
if len(points) == 0 {
return
}
mean, ok := points[0].Fields()["mean_value"]
if !ok {
return
}
testCase.Lock()
want := testCase.sum / testCase.count
testCase.sum = 0
testCase.count = 0
defer testCase.Unlock()
require.EqualValues(t, want, mean)
}))
defer ts.Close()
downsampler := &Downsampling{
TimeRange: time.Duration(time.Second * 10),
Name: "downsampling",
}
downsampler.Aggregations = make(map[string][]Aggregation)
downsampler.AddAggregations(Aggregation{
FieldName: "value",
FuncName: "mean",
Alias: "mean_value",
})
influxdb := &InfluxDB{
Downsampler: downsampler,
URLs: []string{ts.URL},
}
go influxdb.Run()
rand.Seed(time.Now().Unix())
tick := time.Tick(3 * time.Second)
after := time.After(12 * time.Second)
for {
select {
case <-tick:
testCase.count += 1
val := rand.Intn(120)
testCase.sum += val
err := influxdb.Write([]telegraf.Metric{testutil.TestMetric(val)})
require.NoError(t, err)
case <-after:
return
}
}
}