Only split metrics if there is an udp output (#2799)

This commit is contained in:
Daniel Nelson 2017-05-12 15:34:05 -07:00 committed by Daniel Nelson
parent b16eb6eae6
commit 27b89dff48
No known key found for this signature in database
GPG Key ID: CAAD59C9444F6155
1 changed files with 17 additions and 5 deletions

View File

@ -2,6 +2,7 @@ package influxdb
import ( import (
"fmt" "fmt"
"io"
"log" "log"
"math/rand" "math/rand"
"strings" "strings"
@ -41,7 +42,8 @@ type InfluxDB struct {
// Precision is only here for legacy support. It will be ignored. // Precision is only here for legacy support. It will be ignored.
Precision string Precision string
clients []client.Client clients []client.Client
splitPayload bool
} }
var sampleConfig = ` var sampleConfig = `
@ -109,6 +111,7 @@ func (i *InfluxDB) Connect() error {
return fmt.Errorf("Error creating UDP Client [%s]: %s", u, err) return fmt.Errorf("Error creating UDP Client [%s]: %s", u, err)
} }
i.clients = append(i.clients, c) i.clients = append(i.clients, c)
i.splitPayload = true
default: default:
// If URL doesn't start with "udp", assume HTTP client // If URL doesn't start with "udp", assume HTTP client
config := client.HTTPConfig{ config := client.HTTPConfig{
@ -159,17 +162,26 @@ func (i *InfluxDB) Description() string {
return "Configuration for influxdb server to send metrics to" return "Configuration for influxdb server to send metrics to"
} }
func (i *InfluxDB) getReader(metrics []telegraf.Metric) io.Reader {
if !i.splitPayload {
return metric.NewReader(metrics)
}
splitData := make([]telegraf.Metric, 0)
for _, m := range metrics {
splitData = append(splitData, m.Split(i.UDPPayload)...)
}
return metric.NewReader(splitData)
}
// Write will choose a random server in the cluster to write to until a successful write // Write will choose a random server in the cluster to write to until a successful write
// occurs, logging each unsuccessful. If all servers fail, return error. // occurs, logging each unsuccessful. If all servers fail, return error.
func (i *InfluxDB) Write(metrics []telegraf.Metric) error { func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
bufsize := 0 bufsize := 0
splitData := make([]telegraf.Metric, 0)
for _, m := range metrics { for _, m := range metrics {
bufsize += m.Len() bufsize += m.Len()
splitData = append(splitData, m.Split(i.UDPPayload)...)
} }
r := metric.NewReader(splitData) r := i.getReader(metrics)
// This will get set to nil if a successful write occurs // This will get set to nil if a successful write occurs
err := fmt.Errorf("Could not write to any InfluxDB server in cluster") err := fmt.Errorf("Could not write to any InfluxDB server in cluster")