Add database_routing_tag for InfluxDB output plugin
Allows user to configure a tag that, if it exists, will be used as the database name for the metric. X-Github-Closes #397
This commit is contained in:
parent
13ccf420d7
commit
bec0b9ca53
|
@ -17,6 +17,7 @@ type InfluxDB struct {
|
||||||
// URL is only for backwards compatability
|
// URL is only for backwards compatability
|
||||||
URL string
|
URL string
|
||||||
URLs []string `toml:"urls"`
|
URLs []string `toml:"urls"`
|
||||||
|
DBRoutingTag string `toml:"database_routing_tag"`
|
||||||
Username string
|
Username string
|
||||||
Password string
|
Password string
|
||||||
Database string
|
Database string
|
||||||
|
@ -49,6 +50,10 @@ var sampleConfig = `
|
||||||
# user_agent = "telegraf"
|
# user_agent = "telegraf"
|
||||||
# Set UDP payload size, defaults to InfluxDB UDP Client default (512 bytes)
|
# Set UDP payload size, defaults to InfluxDB UDP Client default (512 bytes)
|
||||||
# udp_payload = 512
|
# udp_payload = 512
|
||||||
|
|
||||||
|
# Route metrics to an InfluxDB database based on value of a tag
|
||||||
|
# note: if this tag has many unique values, write performance may suffer
|
||||||
|
# database_routing_tag = "mytag"
|
||||||
`
|
`
|
||||||
|
|
||||||
func (i *InfluxDB) Connect() error {
|
func (i *InfluxDB) Connect() error {
|
||||||
|
@ -129,27 +134,53 @@ func (i *InfluxDB) Description() string {
|
||||||
// Choose a random server in the cluster to write to until a successful write
|
// Choose a random server in the cluster to write to until a successful write
|
||||||
// occurs, logging each unsuccessful. If all servers fail, return error.
|
// occurs, logging each unsuccessful. If all servers fail, return error.
|
||||||
func (i *InfluxDB) Write(points []*client.Point) error {
|
func (i *InfluxDB) Write(points []*client.Point) error {
|
||||||
bp, _ := client.NewBatchPoints(client.BatchPointsConfig{
|
var bps map[string]client.BatchPoints = make(map[string]client.BatchPoints)
|
||||||
|
|
||||||
|
// Default database
|
||||||
|
bps["___default"], _ = client.NewBatchPoints(client.BatchPointsConfig{
|
||||||
Database: i.Database,
|
Database: i.Database,
|
||||||
Precision: i.Precision,
|
Precision: i.Precision,
|
||||||
})
|
})
|
||||||
|
|
||||||
for _, point := range points {
|
for _, point := range points {
|
||||||
bp.AddPoint(point)
|
if i.DBRoutingTag != "" {
|
||||||
|
if h, ok := point.Tags()[i.DBRoutingTag]; ok {
|
||||||
|
// Create a new batch point config for this tag value
|
||||||
|
if _, ok := bps[h]; !ok {
|
||||||
|
bps[h], _ = client.NewBatchPoints(client.BatchPointsConfig{
|
||||||
|
Database: h,
|
||||||
|
Precision: i.Precision,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
bps[h].AddPoint(point)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Nothing found in overrides, lets push this into the default bucket
|
||||||
|
bps["___default"].AddPoint(point)
|
||||||
}
|
}
|
||||||
|
|
||||||
// This will get set to nil if a successful write occurs
|
// This will get set to nil if a successful write occurs
|
||||||
err := errors.New("Could not write to any InfluxDB server in cluster")
|
err := errors.New("Could not write to any InfluxDB server in cluster")
|
||||||
|
|
||||||
|
for k, bp := range bps {
|
||||||
p := rand.Perm(len(i.conns))
|
p := rand.Perm(len(i.conns))
|
||||||
for _, n := range p {
|
for _, n := range p {
|
||||||
if e := i.conns[n].Write(bp); e != nil {
|
if e := i.conns[n].Write(bp); e != nil {
|
||||||
log.Println("ERROR: " + e.Error())
|
log.Println("ERROR: " + e.Error())
|
||||||
|
|
||||||
|
// Stop trying immediately if the error is for a missing database
|
||||||
|
// and we are trying a database routing tag
|
||||||
|
if k != "___default" && strings.HasPrefix(e.Error(), "database not found") {
|
||||||
|
err = nil
|
||||||
|
break
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
err = nil
|
err = nil
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue