split metrics based on UDPPayload size (#2795)

This commit is contained in:
Sebastian Borza 2017-05-12 16:42:18 -05:00 committed by Daniel Nelson
parent a47aa0dcc2
commit f74687dcc0
No known key found for this signature in database
GPG Key ID: CAAD59C9444F6155
2 changed files with 18 additions and 8 deletions

View File

@ -25,6 +25,7 @@ type UDPConfig struct {
PayloadSize int PayloadSize int
} }
// NewUDP will return an instance of the telegraf UDP output plugin for influxdb
func NewUDP(config UDPConfig) (Client, error) { func NewUDP(config UDPConfig) (Client, error) {
p, err := url.Parse(config.URL) p, err := url.Parse(config.URL)
if err != nil { if err != nil {
@ -55,20 +56,22 @@ type udpClient struct {
buffer []byte buffer []byte
} }
// Query will send the provided query command to the client, returning an error if any issues arise
func (c *udpClient) Query(command string) error { func (c *udpClient) Query(command string) error {
return nil return nil
} }
// Write will send the byte stream to the given UDP client endpoint
func (c *udpClient) Write(b []byte) (int, error) { func (c *udpClient) Write(b []byte) (int, error) {
return c.WriteStream(bytes.NewReader(b), -1) return c.WriteStream(bytes.NewReader(b), -1)
} }
// write params are ignored by the UDP client // WriteWithParams are ignored by the UDP client, will forward to WriteStream
func (c *udpClient) WriteWithParams(b []byte, wp WriteParams) (int, error) { func (c *udpClient) WriteWithParams(b []byte, wp WriteParams) (int, error) {
return c.WriteStream(bytes.NewReader(b), -1) return c.WriteStream(bytes.NewReader(b), -1)
} }
// contentLength is ignored by the UDP client. // WriteStream will send the provided data through to the client, contentLength is ignored by the UDP client
func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) { func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
var totaln int var totaln int
for { for {
@ -88,12 +91,13 @@ func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
return totaln, nil return totaln, nil
} }
// contentLength is ignored by the UDP client. // WriteStreamWithParams will forward the stream to the client backend, contentLength is ignored by the UDP client
// write params are ignored by the UDP client // write params are ignored by the UDP client
func (c *udpClient) WriteStreamWithParams(r io.Reader, contentLength int, wp WriteParams) (int, error) { func (c *udpClient) WriteStreamWithParams(r io.Reader, contentLength int, wp WriteParams) (int, error) {
return c.WriteStream(r, -1) return c.WriteStream(r, -1)
} }
// Close will terminate the provided client connection
func (c *udpClient) Close() error { func (c *udpClient) Close() error {
return c.conn.Close() return c.conn.Close()
} }

View File

@ -15,6 +15,7 @@ import (
"github.com/influxdata/telegraf/plugins/outputs/influxdb/client" "github.com/influxdata/telegraf/plugins/outputs/influxdb/client"
) )
// InfluxDB struct is the primary data structure for the plugin
type InfluxDB struct { type InfluxDB struct {
// URL is only for backwards compatability // URL is only for backwards compatability
URL string URL string
@ -79,11 +80,10 @@ var sampleConfig = `
# insecure_skip_verify = false # insecure_skip_verify = false
` `
// Connect initiates the primary connection to the range of provided URLs
func (i *InfluxDB) Connect() error { func (i *InfluxDB) Connect() error {
var urls []string var urls []string
for _, u := range i.URLs { urls = append(urls, i.URLs...)
urls = append(urls, u)
}
// Backward-compatability with single Influx URL config files // Backward-compatability with single Influx URL config files
// This could eventually be removed in favor of specifying the urls as a list // This could eventually be removed in favor of specifying the urls as a list
@ -144,26 +144,32 @@ func (i *InfluxDB) Connect() error {
return nil return nil
} }
// Close will terminate the session to the backend, returning error if an issue arises
func (i *InfluxDB) Close() error { func (i *InfluxDB) Close() error {
return nil return nil
} }
// SampleConfig returns the formatted sample configuration for the plugin
func (i *InfluxDB) SampleConfig() string { func (i *InfluxDB) SampleConfig() string {
return sampleConfig return sampleConfig
} }
// Description returns the human-readable function definition of the plugin
func (i *InfluxDB) Description() string { func (i *InfluxDB) Description() string {
return "Configuration for influxdb server to send metrics to" return "Configuration for influxdb server to send metrics to"
} }
// Choose a random server in the cluster to write to until a successful write // Write will choose a random server in the cluster to write to until a successful write
// occurs, logging each unsuccessful. If all servers fail, return error. // occurs, logging each unsuccessful. If all servers fail, return error.
func (i *InfluxDB) Write(metrics []telegraf.Metric) error { func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
bufsize := 0 bufsize := 0
splitData := make([]telegraf.Metric, 0)
for _, m := range metrics { for _, m := range metrics {
bufsize += m.Len() bufsize += m.Len()
splitData = append(splitData, m.Split(i.UDPPayload)...)
} }
r := metric.NewReader(metrics) r := metric.NewReader(splitData)
// This will get set to nil if a successful write occurs // This will get set to nil if a successful write occurs
err := fmt.Errorf("Could not write to any InfluxDB server in cluster") err := fmt.Errorf("Could not write to any InfluxDB server in cluster")