156 lines
3.0 KiB
Go
156 lines
3.0 KiB
Go
package telegraf
|
|
|
|
import (
|
|
"fmt"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/influxdb/influxdb/client"
|
|
)
|
|
|
|
// BatchPoints is used to send a batch of data in a single write from telegraf
|
|
// to influx
|
|
type BatchPoints struct {
|
|
mu sync.Mutex
|
|
|
|
client.BatchPoints
|
|
|
|
Debug bool
|
|
|
|
Prefix string
|
|
|
|
Config *ConfiguredPlugin
|
|
}
|
|
|
|
// Add adds a measurement
|
|
func (bp *BatchPoints) Add(
|
|
measurement string,
|
|
val interface{},
|
|
tags map[string]string,
|
|
) {
|
|
bp.mu.Lock()
|
|
defer bp.mu.Unlock()
|
|
|
|
measurement = bp.Prefix + measurement
|
|
|
|
if bp.Config != nil {
|
|
if !bp.Config.ShouldPass(measurement, tags) {
|
|
return
|
|
}
|
|
}
|
|
|
|
if bp.Debug {
|
|
var tg []string
|
|
|
|
for k, v := range tags {
|
|
tg = append(tg, fmt.Sprintf("%s=\"%s\"", k, v))
|
|
}
|
|
|
|
sort.Strings(tg)
|
|
|
|
fmt.Printf("> [%s] %s value=%v\n", strings.Join(tg, " "), measurement, val)
|
|
}
|
|
|
|
bp.Points = append(bp.Points, client.Point{
|
|
Measurement: measurement,
|
|
Tags: tags,
|
|
Fields: map[string]interface{}{
|
|
"value": val,
|
|
},
|
|
})
|
|
}
|
|
|
|
// AddFieldsWithTime adds a measurement with a provided timestamp
|
|
func (bp *BatchPoints) AddFieldsWithTime(
|
|
measurement string,
|
|
fields map[string]interface{},
|
|
tags map[string]string,
|
|
timestamp time.Time,
|
|
) {
|
|
// TODO this function should add the fields with the timestamp, but that will
|
|
// need to wait for the InfluxDB point precision/unit to be fixed
|
|
bp.AddFields(measurement, fields, tags)
|
|
// bp.mu.Lock()
|
|
// defer bp.mu.Unlock()
|
|
|
|
// measurement = bp.Prefix + measurement
|
|
|
|
// if bp.Config != nil {
|
|
// if !bp.Config.ShouldPass(measurement, tags) {
|
|
// return
|
|
// }
|
|
// }
|
|
|
|
// if bp.Debug {
|
|
// var tg []string
|
|
|
|
// for k, v := range tags {
|
|
// tg = append(tg, fmt.Sprintf("%s=\"%s\"", k, v))
|
|
// }
|
|
|
|
// var vals []string
|
|
|
|
// for k, v := range fields {
|
|
// vals = append(vals, fmt.Sprintf("%s=%v", k, v))
|
|
// }
|
|
|
|
// sort.Strings(tg)
|
|
// sort.Strings(vals)
|
|
|
|
// fmt.Printf("> [%s] %s %s\n", strings.Join(tg, " "), measurement, strings.Join(vals, " "))
|
|
// }
|
|
|
|
// bp.Points = append(bp.Points, client.Point{
|
|
// Measurement: measurement,
|
|
// Tags: tags,
|
|
// Fields: fields,
|
|
// Time: timestamp,
|
|
// })
|
|
}
|
|
|
|
// AddFields will eventually replace the Add function, once we move to having a
|
|
// single plugin as a single measurement with multiple fields
|
|
func (bp *BatchPoints) AddFields(
|
|
measurement string,
|
|
fields map[string]interface{},
|
|
tags map[string]string,
|
|
) {
|
|
bp.mu.Lock()
|
|
defer bp.mu.Unlock()
|
|
|
|
measurement = bp.Prefix + measurement
|
|
|
|
if bp.Config != nil {
|
|
if !bp.Config.ShouldPass(measurement, tags) {
|
|
return
|
|
}
|
|
}
|
|
|
|
if bp.Debug {
|
|
var tg []string
|
|
|
|
for k, v := range tags {
|
|
tg = append(tg, fmt.Sprintf("%s=\"%s\"", k, v))
|
|
}
|
|
|
|
var vals []string
|
|
|
|
for k, v := range fields {
|
|
vals = append(vals, fmt.Sprintf("%s=%v", k, v))
|
|
}
|
|
|
|
sort.Strings(tg)
|
|
sort.Strings(vals)
|
|
|
|
fmt.Printf("> [%s] %s %s\n", strings.Join(tg, " "), measurement, strings.Join(vals, " "))
|
|
}
|
|
|
|
bp.Points = append(bp.Points, client.Point{
|
|
Measurement: measurement,
|
|
Tags: tags,
|
|
Fields: fields,
|
|
})
|
|
}
|