diff --git a/plugins/outputs/influxdb/influxdb.go b/plugins/outputs/influxdb/influxdb.go index d2c0523c7..685589348 100644 --- a/plugins/outputs/influxdb/influxdb.go +++ b/plugins/outputs/influxdb/influxdb.go @@ -16,6 +16,11 @@ import ( "github.com/influxdata/influxdb/client/v2" ) +const ( + // Size of the fields in each metrics + metricFieldNum = 25 +) + type InfluxDB struct { // URL is only for backwards compatability URL string @@ -37,6 +42,10 @@ type InfluxDB struct { SSLKey string `toml:"ssl_key"` // Use SSL but skip chain & host verification InsecureSkipVerify bool + // The limit for measurement size + // when the size exceeds given limit, + // points are splitted into multiple parts. + MetricFieldLimit int `toml:"metric_field_limit"` // Precision is only here for legacy support. It will be ignored. Precision string @@ -74,6 +83,11 @@ var sampleConfig = ` # ssl_key = "/etc/telegraf/key.pem" ## Use SSL but skip chain & host verification # insecure_skip_verify = false + + ## The limit for measurement size, when the size exceeds given limit, + ## points are splitted into multiple parts before writing into InfluxDB. + ## To enable this, set it >0 + # metric_field_limit = 0 ` func (i *InfluxDB) Connect() error { @@ -194,30 +208,116 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error { bp.AddPoint(metric.Point()) } - // This will get set to nil if a successful write occurs - err = errors.New("Could not write to any InfluxDB server in cluster") - p := rand.Perm(len(i.conns)) + for _, n := range p { - if e := i.conns[n].Write(bp); e != nil { - // Log write failure - log.Printf("ERROR: %s", e) - // If the database was not found, try to recreate it - if strings.Contains(e.Error(), "database not found") { - if errc := createDatabase(i.conns[n], i.Database); errc != nil { - log.Printf("ERROR: Database %s not found and failed to recreate\n", - i.Database) - } + conn := i.conns[n] + // if the connection is UDP and measurements has over i.MetricFieldLimit fields + // then, write breaking into small portions + if strings.HasPrefix(i.URLs[n], "udp") && i.MetricFieldLimit != 0 { + newbp, err := i.splitPoints(bp) + if err != nil { + return err } + err = i.flush(conn, newbp) } else { - err = nil + err = i.flush(conn, bp) + } + if err == nil { break } } - return err } +// flush writes batch points to the given connection +func (i *InfluxDB) flush(conn client.Client, bp client.BatchPoints) error { + // initial error, if write successes then error will be a nil + err := errors.New("Could not write to any InfluxDB server in cluster") + if e := conn.Write(bp); e != nil { + // Log write failure + log.Printf("ERROR: %s", e) + // If the database was not found, try to recreate it + if strings.Contains(e.Error(), "database not found") { + if errc := createDatabase(conn, i.Database); errc != nil { + log.Printf("ERROR: Database %s not found and failed to recreate\n", + i.Database) + } + } + } else { + err = nil + } + return err +} + +// splitPoints splits all measurements of point into multiple batches, +// the size of each batch will be metricFieldNum +// returns client.BatchPoints with splitted points and error value +func (i *InfluxDB) splitPoints(bp client.BatchPoints) (client.BatchPoints, error) { + // create new BatchPoints + batchPoints, err := client.NewBatchPoints(client.BatchPointsConfig{ + Database: i.Database, + Precision: i.Precision, + RetentionPolicy: i.RetentionPolicy, + WriteConsistency: i.WriteConsistency, + }) + // create new Point slice + // will hold splitted points + points := []*client.Point{} + if err != nil { + return batchPoints, err + } + // range over all points + for _, point := range bp.Points() { + // the length of the fields + length := len(point.Fields()) + // split fields only when its size is over batch size - i.MetricFieldLimit + if length > i.MetricFieldLimit { + // fields of the point + fields := point.Fields() + // from 0 to length with metricFieldNum range + // on each iteration, the points with metricFieldNum size + // are appended to points slice + for j := 0; j < length; j += metricFieldNum { + start := j + stop := j + metricFieldNum + // if the range is over length of the slice + // then, it is equal to length + if stop > length { + stop = length + } + batch := map[string]interface{}{} + // iterate over fields and add to batch map + for k, v := range fields { + // put into the batch + batch[k] = v + // delete the metric that already retrieved + delete(fields, k) + // break the loop if we reached the bound + if start++; start == stop { + break + } + } + // create a new point, it has the same params that + // the parent point(the point that measurements are splitted) has + // skip points if creation is failed + p, err := client.NewPoint(point.Name(), point.Tags(), batch, point.Time()) + if err != nil { + continue + } + // append to the global points slice + points = append(points, p) + } + } else { + points = append(points, point) + } + + } + // add splitted points to new BatchPoints + batchPoints.AddPoints(points) + return batchPoints, nil +} + func init() { outputs.Add("influxdb", func() telegraf.Output { return &InfluxDB{