Fix InfluxDB output UDP line splitting (#5439)

This commit is contained in:
Olli-Pekka Lehto 2019-02-19 17:08:54 -06:00 committed by Daniel Nelson
parent e4d084fbc3
commit 5dfa3fa769
3 changed files with 39 additions and 2 deletions

View File

@ -1,6 +1,8 @@
package influxdb package influxdb
import ( import (
"bufio"
"bytes"
"context" "context"
"fmt" "fmt"
"log" "log"
@ -45,9 +47,9 @@ func NewUDPClient(config *UDPConfig) (*udpClient, error) {
serializer := config.Serializer serializer := config.Serializer
if serializer == nil { if serializer == nil {
s := influx.NewSerializer() s := influx.NewSerializer()
s.SetMaxLineBytes(config.MaxPayloadSize)
serializer = s serializer = s
} }
serializer.SetMaxLineBytes(size)
dialer := config.Dialer dialer := config.Dialer
if dialer == nil { if dialer == nil {
@ -96,7 +98,11 @@ func (c *udpClient) Write(ctx context.Context, metrics []telegraf.Metric) error
continue continue
} }
_, err = c.conn.Write(octets) scanner := bufio.NewScanner(bytes.NewReader(octets))
scanner.Split(scanLines)
for scanner.Scan() {
_, err = c.conn.Write(scanner.Bytes())
}
if err != nil { if err != nil {
c.conn.Close() c.conn.Close()
c.conn = nil c.conn = nil
@ -118,3 +124,15 @@ type netDialer struct {
func (d *netDialer) DialContext(ctx context.Context, network, address string) (Conn, error) { func (d *netDialer) DialContext(ctx context.Context, network, address string) (Conn, error) {
return d.Dialer.DialContext(ctx, network, address) return d.Dialer.DialContext(ctx, network, address)
} }
func scanLines(data []byte, atEOF bool) (advance int, token []byte, err error) {
if atEOF && len(data) == 0 {
return 0, nil, nil
}
if i := bytes.IndexByte(data, '\n'); i >= 0 {
// We have a full newline-terminated line.
return i + 1, data[0 : i+1], nil
}
return 0, nil, nil
}

View File

@ -240,6 +240,7 @@ func (s *Serializer) writeMetric(w io.Writer, m telegraf.Metric) error {
return err return err
} }
pairsLen = 0
firstField = true firstField = true
bytesNeeded = len(s.header) + len(s.pair) + len(s.footer) bytesNeeded = len(s.header) + len(s.pair) + len(s.footer)

View File

@ -275,6 +275,24 @@ var tests = []struct {
), ),
output: []byte("cpu abc=123i 1519194109000000042\ncpu def=456i 1519194109000000042\n"), output: []byte("cpu abc=123i 1519194109000000042\ncpu def=456i 1519194109000000042\n"),
}, },
{
name: "split_fields_overflow",
maxBytes: 43,
input: MustMetric(
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"abc": 123,
"def": 456,
"ghi": 789,
"jkl": 123,
},
time.Unix(1519194109, 42),
),
),
output: []byte("cpu abc=123i,def=456i 1519194109000000042\ncpu ghi=789i,jkl=123i 1519194109000000042\n"),
},
{ {
name: "name newline", name: "name newline",
input: MustMetric( input: MustMetric(