InfluxDB output plugin: option to write metrics breaking into batches
This commit is contained in:
parent
5b43901bd8
commit
bf57f1e74c
|
@ -16,6 +16,11 @@ import (
|
||||||
"github.com/influxdata/influxdb/client/v2"
|
"github.com/influxdata/influxdb/client/v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// Size of the fields in each metrics
|
||||||
|
metricFieldNum = 25
|
||||||
|
)
|
||||||
|
|
||||||
type InfluxDB struct {
|
type InfluxDB struct {
|
||||||
// URL is only for backwards compatability
|
// URL is only for backwards compatability
|
||||||
URL string
|
URL string
|
||||||
|
@ -37,6 +42,10 @@ type InfluxDB struct {
|
||||||
SSLKey string `toml:"ssl_key"`
|
SSLKey string `toml:"ssl_key"`
|
||||||
// Use SSL but skip chain & host verification
|
// Use SSL but skip chain & host verification
|
||||||
InsecureSkipVerify bool
|
InsecureSkipVerify bool
|
||||||
|
// The limit for measurement size
|
||||||
|
// when the size exceeds given limit,
|
||||||
|
// points are splitted into multiple parts.
|
||||||
|
MetricFieldLimit int `toml:"metric_field_limit"`
|
||||||
|
|
||||||
// Precision is only here for legacy support. It will be ignored.
|
// Precision is only here for legacy support. It will be ignored.
|
||||||
Precision string
|
Precision string
|
||||||
|
@ -74,6 +83,11 @@ var sampleConfig = `
|
||||||
# ssl_key = "/etc/telegraf/key.pem"
|
# ssl_key = "/etc/telegraf/key.pem"
|
||||||
## Use SSL but skip chain & host verification
|
## Use SSL but skip chain & host verification
|
||||||
# insecure_skip_verify = false
|
# insecure_skip_verify = false
|
||||||
|
|
||||||
|
## The limit for measurement size, when the size exceeds given limit,
|
||||||
|
## points are splitted into multiple parts before writing into InfluxDB.
|
||||||
|
## To enable this, set it >0
|
||||||
|
# metric_field_limit = 0
|
||||||
`
|
`
|
||||||
|
|
||||||
func (i *InfluxDB) Connect() error {
|
func (i *InfluxDB) Connect() error {
|
||||||
|
@ -194,30 +208,116 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
|
||||||
bp.AddPoint(metric.Point())
|
bp.AddPoint(metric.Point())
|
||||||
}
|
}
|
||||||
|
|
||||||
// This will get set to nil if a successful write occurs
|
|
||||||
err = errors.New("Could not write to any InfluxDB server in cluster")
|
|
||||||
|
|
||||||
p := rand.Perm(len(i.conns))
|
p := rand.Perm(len(i.conns))
|
||||||
|
|
||||||
for _, n := range p {
|
for _, n := range p {
|
||||||
if e := i.conns[n].Write(bp); e != nil {
|
conn := i.conns[n]
|
||||||
// Log write failure
|
// if the connection is UDP and measurements has over i.MetricFieldLimit fields
|
||||||
log.Printf("ERROR: %s", e)
|
// then, write breaking into small portions
|
||||||
// If the database was not found, try to recreate it
|
if strings.HasPrefix(i.URLs[n], "udp") && i.MetricFieldLimit != 0 {
|
||||||
if strings.Contains(e.Error(), "database not found") {
|
newbp, err := i.splitPoints(bp)
|
||||||
if errc := createDatabase(i.conns[n], i.Database); errc != nil {
|
if err != nil {
|
||||||
log.Printf("ERROR: Database %s not found and failed to recreate\n",
|
return err
|
||||||
i.Database)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
err = i.flush(conn, newbp)
|
||||||
} else {
|
} else {
|
||||||
err = nil
|
err = i.flush(conn, bp)
|
||||||
|
}
|
||||||
|
if err == nil {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// flush writes batch points to the given connection
|
||||||
|
func (i *InfluxDB) flush(conn client.Client, bp client.BatchPoints) error {
|
||||||
|
// initial error, if write successes then error will be a nil
|
||||||
|
err := errors.New("Could not write to any InfluxDB server in cluster")
|
||||||
|
if e := conn.Write(bp); e != nil {
|
||||||
|
// Log write failure
|
||||||
|
log.Printf("ERROR: %s", e)
|
||||||
|
// If the database was not found, try to recreate it
|
||||||
|
if strings.Contains(e.Error(), "database not found") {
|
||||||
|
if errc := createDatabase(conn, i.Database); errc != nil {
|
||||||
|
log.Printf("ERROR: Database %s not found and failed to recreate\n",
|
||||||
|
i.Database)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
err = nil
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// splitPoints splits all measurements of point into multiple batches,
|
||||||
|
// the size of each batch will be metricFieldNum
|
||||||
|
// returns client.BatchPoints with splitted points and error value
|
||||||
|
func (i *InfluxDB) splitPoints(bp client.BatchPoints) (client.BatchPoints, error) {
|
||||||
|
// create new BatchPoints
|
||||||
|
batchPoints, err := client.NewBatchPoints(client.BatchPointsConfig{
|
||||||
|
Database: i.Database,
|
||||||
|
Precision: i.Precision,
|
||||||
|
RetentionPolicy: i.RetentionPolicy,
|
||||||
|
WriteConsistency: i.WriteConsistency,
|
||||||
|
})
|
||||||
|
// create new Point slice
|
||||||
|
// will hold splitted points
|
||||||
|
points := []*client.Point{}
|
||||||
|
if err != nil {
|
||||||
|
return batchPoints, err
|
||||||
|
}
|
||||||
|
// range over all points
|
||||||
|
for _, point := range bp.Points() {
|
||||||
|
// the length of the fields
|
||||||
|
length := len(point.Fields())
|
||||||
|
// split fields only when its size is over batch size - i.MetricFieldLimit
|
||||||
|
if length > i.MetricFieldLimit {
|
||||||
|
// fields of the point
|
||||||
|
fields := point.Fields()
|
||||||
|
// from 0 to length with metricFieldNum range
|
||||||
|
// on each iteration, the points with metricFieldNum size
|
||||||
|
// are appended to points slice
|
||||||
|
for j := 0; j < length; j += metricFieldNum {
|
||||||
|
start := j
|
||||||
|
stop := j + metricFieldNum
|
||||||
|
// if the range is over length of the slice
|
||||||
|
// then, it is equal to length
|
||||||
|
if stop > length {
|
||||||
|
stop = length
|
||||||
|
}
|
||||||
|
batch := map[string]interface{}{}
|
||||||
|
// iterate over fields and add to batch map
|
||||||
|
for k, v := range fields {
|
||||||
|
// put into the batch
|
||||||
|
batch[k] = v
|
||||||
|
// delete the metric that already retrieved
|
||||||
|
delete(fields, k)
|
||||||
|
// break the loop if we reached the bound
|
||||||
|
if start++; start == stop {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// create a new point, it has the same params that
|
||||||
|
// the parent point(the point that measurements are splitted) has
|
||||||
|
// skip points if creation is failed
|
||||||
|
p, err := client.NewPoint(point.Name(), point.Tags(), batch, point.Time())
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// append to the global points slice
|
||||||
|
points = append(points, p)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
points = append(points, point)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
// add splitted points to new BatchPoints
|
||||||
|
batchPoints.AddPoints(points)
|
||||||
|
return batchPoints, nil
|
||||||
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
outputs.Add("influxdb", func() telegraf.Output {
|
outputs.Add("influxdb", func() telegraf.Output {
|
||||||
return &InfluxDB{
|
return &InfluxDB{
|
||||||
|
|
Loading…
Reference in New Issue