2015-05-22 23:45:14 +00:00
|
|
|
package telegraf
|
2015-04-06 16:32:10 +00:00
|
|
|
|
2015-04-07 00:24:24 +00:00
|
|
|
import (
|
|
|
|
"fmt"
|
|
|
|
"sort"
|
|
|
|
"strings"
|
2015-06-22 01:16:46 +00:00
|
|
|
"sync"
|
2015-05-27 05:14:42 +00:00
|
|
|
"time"
|
2015-04-07 00:24:24 +00:00
|
|
|
|
|
|
|
"github.com/influxdb/influxdb/client"
|
|
|
|
)
|
2015-04-06 16:32:10 +00:00
|
|
|
|
2015-08-04 14:58:32 +00:00
|
|
|
// BatchPoints is used to send a batch of data in a single write from telegraf
|
|
|
|
// to influx
|
2015-04-06 16:32:10 +00:00
|
|
|
type BatchPoints struct {
|
2015-06-22 01:16:46 +00:00
|
|
|
mu sync.Mutex
|
|
|
|
|
2015-04-06 16:32:10 +00:00
|
|
|
client.BatchPoints
|
2015-04-07 00:24:24 +00:00
|
|
|
|
|
|
|
Debug bool
|
2015-05-18 19:15:15 +00:00
|
|
|
|
|
|
|
Prefix string
|
2015-05-20 05:19:32 +00:00
|
|
|
|
|
|
|
Config *ConfiguredPlugin
|
2015-04-06 16:32:10 +00:00
|
|
|
}
|
|
|
|
|
2015-08-04 14:58:32 +00:00
|
|
|
// Add adds a measurement
|
2015-05-29 20:25:48 +00:00
|
|
|
func (bp *BatchPoints) Add(measurement string, val interface{}, tags map[string]string) {
|
2015-06-22 01:16:46 +00:00
|
|
|
bp.mu.Lock()
|
|
|
|
defer bp.mu.Unlock()
|
|
|
|
|
2015-05-29 20:25:48 +00:00
|
|
|
measurement = bp.Prefix + measurement
|
2015-05-18 19:15:15 +00:00
|
|
|
|
2015-05-20 05:19:32 +00:00
|
|
|
if bp.Config != nil {
|
2015-05-29 20:25:48 +00:00
|
|
|
if !bp.Config.ShouldPass(measurement) {
|
2015-05-20 05:19:32 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2015-04-07 00:24:24 +00:00
|
|
|
if bp.Debug {
|
|
|
|
var tg []string
|
|
|
|
|
|
|
|
for k, v := range tags {
|
|
|
|
tg = append(tg, fmt.Sprintf("%s=\"%s\"", k, v))
|
|
|
|
}
|
|
|
|
|
|
|
|
sort.Strings(tg)
|
|
|
|
|
2015-05-29 20:25:48 +00:00
|
|
|
fmt.Printf("> [%s] %s value=%v\n", strings.Join(tg, " "), measurement, val)
|
2015-04-07 00:24:24 +00:00
|
|
|
}
|
|
|
|
|
2015-04-06 16:32:10 +00:00
|
|
|
bp.Points = append(bp.Points, client.Point{
|
2015-05-29 20:25:48 +00:00
|
|
|
Measurement: measurement,
|
|
|
|
Tags: tags,
|
2015-04-06 16:32:10 +00:00
|
|
|
Fields: map[string]interface{}{
|
|
|
|
"value": val,
|
|
|
|
},
|
|
|
|
})
|
|
|
|
}
|
2015-05-27 05:14:42 +00:00
|
|
|
|
2015-08-04 14:58:32 +00:00
|
|
|
// AddValuesWithTime adds a measurement with a provided timestamp
|
2015-05-27 05:14:42 +00:00
|
|
|
func (bp *BatchPoints) AddValuesWithTime(
|
2015-05-29 20:25:48 +00:00
|
|
|
measurement string,
|
2015-05-27 05:14:42 +00:00
|
|
|
values map[string]interface{},
|
|
|
|
tags map[string]string,
|
|
|
|
timestamp time.Time,
|
|
|
|
) {
|
2015-06-22 01:16:46 +00:00
|
|
|
bp.mu.Lock()
|
|
|
|
defer bp.mu.Unlock()
|
|
|
|
|
2015-05-29 20:25:48 +00:00
|
|
|
measurement = bp.Prefix + measurement
|
2015-05-27 05:14:42 +00:00
|
|
|
|
|
|
|
if bp.Config != nil {
|
2015-05-29 20:25:48 +00:00
|
|
|
if !bp.Config.ShouldPass(measurement) {
|
2015-05-27 05:14:42 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if bp.Debug {
|
|
|
|
var tg []string
|
|
|
|
|
|
|
|
for k, v := range tags {
|
|
|
|
tg = append(tg, fmt.Sprintf("%s=\"%s\"", k, v))
|
|
|
|
}
|
|
|
|
|
|
|
|
var vals []string
|
|
|
|
|
|
|
|
for k, v := range values {
|
|
|
|
vals = append(vals, fmt.Sprintf("%s=%v", k, v))
|
|
|
|
}
|
|
|
|
|
|
|
|
sort.Strings(tg)
|
|
|
|
sort.Strings(vals)
|
|
|
|
|
2015-05-29 20:25:48 +00:00
|
|
|
fmt.Printf("> [%s] %s %s\n", strings.Join(tg, " "), measurement, strings.Join(vals, " "))
|
2015-05-27 05:14:42 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
bp.Points = append(bp.Points, client.Point{
|
2015-05-29 20:25:48 +00:00
|
|
|
Measurement: measurement,
|
|
|
|
Tags: tags,
|
|
|
|
Fields: values,
|
|
|
|
Time: timestamp,
|
2015-05-27 05:14:42 +00:00
|
|
|
})
|
|
|
|
}
|