From bec0b9ca536878d2fa33383b7954f1148b765391 Mon Sep 17 00:00:00 2001 From: Tait Clarridge Date: Sun, 29 Nov 2015 08:29:26 -0500 Subject: [PATCH] Add database_routing_tag for InfluxDB output plugin Allows user to configure a tag that, if it exists, will be used as the database name for the metric. X-Github-Closes #397 --- outputs/influxdb/influxdb.go | 67 ++++++++++++++++++++++++++---------- 1 file changed, 49 insertions(+), 18 deletions(-) diff --git a/outputs/influxdb/influxdb.go b/outputs/influxdb/influxdb.go index a9fa2edc3..4263dc457 100644 --- a/outputs/influxdb/influxdb.go +++ b/outputs/influxdb/influxdb.go @@ -15,15 +15,16 @@ import ( type InfluxDB struct { // URL is only for backwards compatability - URL string - URLs []string `toml:"urls"` - Username string - Password string - Database string - UserAgent string - Precision string - Timeout internal.Duration - UDPPayload int `toml:"udp_payload"` + URL string + URLs []string `toml:"urls"` + DBRoutingTag string `toml:"database_routing_tag"` + Username string + Password string + Database string + UserAgent string + Precision string + Timeout internal.Duration + UDPPayload int `toml:"udp_payload"` conns []client.Client } @@ -49,6 +50,10 @@ var sampleConfig = ` # user_agent = "telegraf" # Set UDP payload size, defaults to InfluxDB UDP Client default (512 bytes) # udp_payload = 512 + + # Route metrics to an InfluxDB database based on value of a tag + # note: if this tag has many unique values, write performance may suffer + # database_routing_tag = "mytag" ` func (i *InfluxDB) Connect() error { @@ -129,25 +134,51 @@ func (i *InfluxDB) Description() string { // Choose a random server in the cluster to write to until a successful write // occurs, logging each unsuccessful. If all servers fail, return error. func (i *InfluxDB) Write(points []*client.Point) error { - bp, _ := client.NewBatchPoints(client.BatchPointsConfig{ + var bps map[string]client.BatchPoints = make(map[string]client.BatchPoints) + + // Default database + bps["___default"], _ = client.NewBatchPoints(client.BatchPointsConfig{ Database: i.Database, Precision: i.Precision, }) for _, point := range points { - bp.AddPoint(point) + if i.DBRoutingTag != "" { + if h, ok := point.Tags()[i.DBRoutingTag]; ok { + // Create a new batch point config for this tag value + if _, ok := bps[h]; !ok { + bps[h], _ = client.NewBatchPoints(client.BatchPointsConfig{ + Database: h, + Precision: i.Precision, + }) + } + bps[h].AddPoint(point) + } + } + + // Nothing found in overrides, lets push this into the default bucket + bps["___default"].AddPoint(point) } // This will get set to nil if a successful write occurs err := errors.New("Could not write to any InfluxDB server in cluster") - p := rand.Perm(len(i.conns)) - for _, n := range p { - if e := i.conns[n].Write(bp); e != nil { - log.Println("ERROR: " + e.Error()) - } else { - err = nil - break + for k, bp := range bps { + p := rand.Perm(len(i.conns)) + for _, n := range p { + if e := i.conns[n].Write(bp); e != nil { + log.Println("ERROR: " + e.Error()) + + // Stop trying immediately if the error is for a missing database + // and we are trying a database routing tag + if k != "___default" && strings.HasPrefix(e.Error(), "database not found") { + err = nil + break + } + } else { + err = nil + break + } } } return err