From 5bab4616ff3cb01e2f8a65509d1bcf85031de615 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Mon, 5 Jun 2017 12:44:29 -0700 Subject: [PATCH] Fix udp metric splitting (#2880) --- metric/reader.go | 10 +- metric/reader_test.go | 150 +++++++++++++++++++- plugins/outputs/influxdb/client/udp.go | 27 +++- plugins/outputs/influxdb/client/udp_test.go | 81 ++++------- plugins/outputs/influxdb/influxdb.go | 17 +-- plugins/outputs/influxdb/influxdb_test.go | 32 ----- 6 files changed, 204 insertions(+), 113 deletions(-) diff --git a/metric/reader.go b/metric/reader.go index df0729963..ef9d2977d 100644 --- a/metric/reader.go +++ b/metric/reader.go @@ -57,7 +57,7 @@ func (r *reader) Read(p []byte) (n int, err error) { // this for-loop is the sunny-day scenario, where we are given a // buffer that is large enough to hold at least a single metric. // all of the cases below it are edge-cases. - if r.metrics[r.iM].Len() < len(p[i:]) { + if r.metrics[r.iM].Len() <= len(p[i:]) { i += r.metrics[r.iM].SerializeTo(p[i:]) } else { break @@ -76,7 +76,7 @@ func (r *reader) Read(p []byte) (n int, err error) { if len(tmp) > 1 { r.splitMetrics = tmp r.state = split - if r.splitMetrics[0].Len() < len(p) { + if r.splitMetrics[0].Len() <= len(p) { i += r.splitMetrics[0].SerializeTo(p) r.iSM = 1 } else { @@ -99,7 +99,7 @@ func (r *reader) Read(p []byte) (n int, err error) { } case split: - if r.splitMetrics[r.iSM].Len() < len(p) { + if r.splitMetrics[r.iSM].Len() <= len(p) { // write the current split metric i += r.splitMetrics[r.iSM].SerializeTo(p) r.iSM++ @@ -131,6 +131,10 @@ func (r *reader) Read(p []byte) (n int, err error) { r.iSM++ if r.iSM == len(r.splitMetrics) { r.iM++ + if r.iM == len(r.metrics) { + r.state = done + return i, io.EOF + } r.state = normal } else { r.state = split diff --git a/metric/reader_test.go b/metric/reader_test.go index a1c864ad5..1537fc960 100644 --- a/metric/reader_test.go +++ b/metric/reader_test.go @@ -8,8 +8,8 @@ import ( "time" "github.com/influxdata/telegraf" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func BenchmarkMetricReader(b *testing.B) { @@ -116,6 +116,140 @@ func TestMetricReader_OverflowMetric(t *testing.T) { } } +// Regression test for when a metric is the same size as the buffer. +// +// Previously EOF would not be set until the next call to Read. +func TestMetricReader_MetricSizeEqualsBufferSize(t *testing.T) { + ts := time.Unix(1481032190, 0) + m1, _ := New("foo", map[string]string{}, + map[string]interface{}{"a": int64(1)}, ts) + metrics := []telegraf.Metric{m1} + + r := NewReader(metrics) + buf := make([]byte, m1.Len()) + + for { + n, err := r.Read(buf) + // Should never read 0 bytes unless at EOF, unless input buffer is 0 length + if n == 0 { + require.Equal(t, io.EOF, err) + break + } + // Lines should be terminated with a LF + if err == io.EOF { + require.Equal(t, uint8('\n'), buf[n-1]) + break + } + require.NoError(t, err) + } +} + +// Regression test for when a metric requires to be split and one of the +// split metrics is exactly the size of the buffer. +// +// Previously an empty string would be returned on the next Read without error, +// and then next Read call would panic. +func TestMetricReader_SplitWithExactLengthSplit(t *testing.T) { + ts := time.Unix(1481032190, 0) + m1, _ := New("foo", map[string]string{}, + map[string]interface{}{"a": int64(1), "bb": int64(2)}, ts) + metrics := []telegraf.Metric{m1} + + r := NewReader(metrics) + buf := make([]byte, 30) + + // foo a=1i,bb=2i 1481032190000000000\n // len 35 + // + // Requires this specific split order: + // foo a=1i 1481032190000000000\n // len 29 + // foo bb=2i 1481032190000000000\n // len 30 + + for { + n, err := r.Read(buf) + // Should never read 0 bytes unless at EOF, unless input buffer is 0 length + if n == 0 { + require.Equal(t, io.EOF, err) + break + } + // Lines should be terminated with a LF + if err == io.EOF { + require.Equal(t, uint8('\n'), buf[n-1]) + break + } + require.NoError(t, err) + } +} + +// Regresssion test for when a metric requires to be split and one of the +// split metrics is larger than the buffer. +// +// Previously the metric index would be set incorrectly causing a panic. +func TestMetricReader_SplitOverflowOversized(t *testing.T) { + ts := time.Unix(1481032190, 0) + m1, _ := New("foo", map[string]string{}, + map[string]interface{}{ + "a": int64(1), + "bbb": int64(2), + }, ts) + metrics := []telegraf.Metric{m1} + + r := NewReader(metrics) + buf := make([]byte, 30) + + // foo a=1i,bbb=2i 1481032190000000000\n // len 36 + // + // foo a=1i 1481032190000000000\n // len 29 + // foo bbb=2i 1481032190000000000\n // len 31 + + for { + n, err := r.Read(buf) + // Should never read 0 bytes unless at EOF, unless input buffer is 0 length + if n == 0 { + require.Equal(t, io.EOF, err) + break + } + // Lines should be terminated with a LF + if err == io.EOF { + require.Equal(t, uint8('\n'), buf[n-1]) + break + } + require.NoError(t, err) + } +} + +// Regresssion test for when a split metric exactly fits in the buffer. +// +// Previously the metric would be overflow split when not required. +func TestMetricReader_SplitOverflowUneeded(t *testing.T) { + ts := time.Unix(1481032190, 0) + m1, _ := New("foo", map[string]string{}, + map[string]interface{}{"a": int64(1), "b": int64(2)}, ts) + metrics := []telegraf.Metric{m1} + + r := NewReader(metrics) + buf := make([]byte, 29) + + // foo a=1i,b=2i 1481032190000000000\n // len 34 + // + // foo a=1i 1481032190000000000\n // len 29 + // foo b=2i 1481032190000000000\n // len 29 + + for { + n, err := r.Read(buf) + // Should never read 0 bytes unless at EOF, unless input buffer is 0 length + if n == 0 { + require.Equal(t, io.EOF, err) + break + } + // Lines should be terminated with a LF + if err == io.EOF { + require.Equal(t, uint8('\n'), buf[n-1]) + break + } + require.NoError(t, err) + } +} + func TestMetricReader_OverflowMultipleMetrics(t *testing.T) { ts := time.Unix(1481032190, 0) m, _ := New("foo", map[string]string{}, @@ -485,3 +619,17 @@ func TestMetricReader_SplitMetricChangingBuffer2(t *testing.T) { assert.Equal(t, test.err, err, test.expRegex) } } + +func TestMetricRoundtrip(t *testing.T) { + const lp = `nstat,bu=linux,cls=server,dc=cer,env=production,host=hostname,name=netstat,sr=database IpExtInBcastOctets=12570626154i,IpExtInBcastPkts=95541226i,IpExtInCEPkts=0i,IpExtInCsumErrors=0i,IpExtInECT0Pkts=55674i,IpExtInECT1Pkts=0i,IpExtInMcastOctets=5928296i,IpExtInMcastPkts=174365i,IpExtInNoECTPkts=17965863529i,IpExtInNoRoutes=20i,IpExtInOctets=3334866321815i,IpExtInTruncatedPkts=0i,IpExtOutBcastOctets=0i,IpExtOutBcastPkts=0i,IpExtOutMcastOctets=0i,IpExtOutMcastPkts=0i,IpExtOutOctets=31397892391399i,TcpExtArpFilter=0i,TcpExtBusyPollRxPackets=0i,TcpExtDelayedACKLocked=14094i,TcpExtDelayedACKLost=302083i,TcpExtDelayedACKs=55486507i,TcpExtEmbryonicRsts=11879i,TcpExtIPReversePathFilter=0i,TcpExtListenDrops=1736i,TcpExtListenOverflows=0i,TcpExtLockDroppedIcmps=0i,TcpExtOfoPruned=0i,TcpExtOutOfWindowIcmps=8i,TcpExtPAWSActive=0i,TcpExtPAWSEstab=974i,TcpExtPAWSPassive=0i,TcpExtPruneCalled=0i,TcpExtRcvPruned=0i,TcpExtSyncookiesFailed=12593i,TcpExtSyncookiesRecv=0i,TcpExtSyncookiesSent=0i,TcpExtTCPACKSkippedChallenge=0i,TcpExtTCPACKSkippedFinWait2=0i,TcpExtTCPACKSkippedPAWS=806i,TcpExtTCPACKSkippedSeq=519i,TcpExtTCPACKSkippedSynRecv=0i,TcpExtTCPACKSkippedTimeWait=0i,TcpExtTCPAbortFailed=0i,TcpExtTCPAbortOnClose=22i,TcpExtTCPAbortOnData=36593i,TcpExtTCPAbortOnLinger=0i,TcpExtTCPAbortOnMemory=0i,TcpExtTCPAbortOnTimeout=674i,TcpExtTCPAutoCorking=494253233i,TcpExtTCPBacklogDrop=0i,TcpExtTCPChallengeACK=281i,TcpExtTCPDSACKIgnoredNoUndo=93354i,TcpExtTCPDSACKIgnoredOld=336i,TcpExtTCPDSACKOfoRecv=0i,TcpExtTCPDSACKOfoSent=7i,TcpExtTCPDSACKOldSent=302073i,TcpExtTCPDSACKRecv=215884i,TcpExtTCPDSACKUndo=7633i,TcpExtTCPDeferAcceptDrop=0i,TcpExtTCPDirectCopyFromBacklog=0i,TcpExtTCPDirectCopyFromPrequeue=0i,TcpExtTCPFACKReorder=1320i,TcpExtTCPFastOpenActive=0i,TcpExtTCPFastOpenActiveFail=0i,TcpExtTCPFastOpenCookieReqd=0i,TcpExtTCPFastOpenListenOverflow=0i,TcpExtTCPFastOpenPassive=0i,TcpExtTCPFastOpenPassiveFail=0i,TcpExtTCPFastRetrans=350681i,TcpExtTCPForwardRetrans=142168i,TcpExtTCPFromZeroWindowAdv=4317i,TcpExtTCPFullUndo=29502i,TcpExtTCPHPAcks=10267073000i,TcpExtTCPHPHits=5629837098i,TcpExtTCPHPHitsToUser=0i,TcpExtTCPHystartDelayCwnd=285127i,TcpExtTCPHystartDelayDetect=12318i,TcpExtTCPHystartTrainCwnd=69160570i,TcpExtTCPHystartTrainDetect=3315799i,TcpExtTCPLossFailures=109i,TcpExtTCPLossProbeRecovery=110819i,TcpExtTCPLossProbes=233995i,TcpExtTCPLossUndo=5276i,TcpExtTCPLostRetransmit=397i,TcpExtTCPMD5NotFound=0i,TcpExtTCPMD5Unexpected=0i,TcpExtTCPMemoryPressures=0i,TcpExtTCPMinTTLDrop=0i,TcpExtTCPOFODrop=0i,TcpExtTCPOFOMerge=7i,TcpExtTCPOFOQueue=15196i,TcpExtTCPOrigDataSent=29055119435i,TcpExtTCPPartialUndo=21320i,TcpExtTCPPrequeueDropped=0i,TcpExtTCPPrequeued=0i,TcpExtTCPPureAcks=1236441827i,TcpExtTCPRcvCoalesce=225590473i,TcpExtTCPRcvCollapsed=0i,TcpExtTCPRenoFailures=0i,TcpExtTCPRenoRecovery=0i,TcpExtTCPRenoRecoveryFail=0i,TcpExtTCPRenoReorder=0i,TcpExtTCPReqQFullDoCookies=0i,TcpExtTCPReqQFullDrop=0i,TcpExtTCPRetransFail=41i,TcpExtTCPSACKDiscard=0i,TcpExtTCPSACKReneging=0i,TcpExtTCPSACKReorder=4307i,TcpExtTCPSYNChallenge=244i,TcpExtTCPSackFailures=1698i,TcpExtTCPSackMerged=184668i,TcpExtTCPSackRecovery=97369i,TcpExtTCPSackRecoveryFail=381i,TcpExtTCPSackShiftFallback=2697079i,TcpExtTCPSackShifted=760299i,TcpExtTCPSchedulerFailed=0i,TcpExtTCPSlowStartRetrans=9276i,TcpExtTCPSpuriousRTOs=959i,TcpExtTCPSpuriousRtxHostQueues=2973i,TcpExtTCPSynRetrans=200970i,TcpExtTCPTSReorder=15221i,TcpExtTCPTimeWaitOverflow=0i,TcpExtTCPTimeouts=70127i,TcpExtTCPToZeroWindowAdv=4317i,TcpExtTCPWantZeroWindowAdv=2133i,TcpExtTW=24809813i,TcpExtTWKilled=0i,TcpExtTWRecycled=0i 1496460785000000000 +nstat,bu=linux,cls=server,dc=cer,env=production,host=hostname,name=snmp,sr=database IcmpInAddrMaskReps=0i,IcmpInAddrMasks=90i,IcmpInCsumErrors=0i,IcmpInDestUnreachs=284401i,IcmpInEchoReps=9i,IcmpInEchos=1761912i,IcmpInErrors=407i,IcmpInMsgs=2047767i,IcmpInParmProbs=0i,IcmpInRedirects=0i,IcmpInSrcQuenchs=0i,IcmpInTimeExcds=46i,IcmpInTimestampReps=0i,IcmpInTimestamps=1309i,IcmpMsgInType0=9i,IcmpMsgInType11=46i,IcmpMsgInType13=1309i,IcmpMsgInType17=90i,IcmpMsgInType3=284401i,IcmpMsgInType8=1761912i,IcmpMsgOutType0=1761912i,IcmpMsgOutType14=1248i,IcmpMsgOutType3=108709i,IcmpMsgOutType8=9i,IcmpOutAddrMaskReps=0i,IcmpOutAddrMasks=0i,IcmpOutDestUnreachs=108709i,IcmpOutEchoReps=1761912i,IcmpOutEchos=9i,IcmpOutErrors=0i,IcmpOutMsgs=1871878i,IcmpOutParmProbs=0i,IcmpOutRedirects=0i,IcmpOutSrcQuenchs=0i,IcmpOutTimeExcds=0i,IcmpOutTimestampReps=1248i,IcmpOutTimestamps=0i,IpDefaultTTL=64i,IpForwDatagrams=0i,IpForwarding=2i,IpFragCreates=0i,IpFragFails=0i,IpFragOKs=0i,IpInAddrErrors=0i,IpInDelivers=17658795773i,IpInDiscards=0i,IpInHdrErrors=0i,IpInReceives=17659269339i,IpInUnknownProtos=0i,IpOutDiscards=236976i,IpOutNoRoutes=1009i,IpOutRequests=23466783734i,IpReasmFails=0i,IpReasmOKs=0i,IpReasmReqds=0i,IpReasmTimeout=0i,TcpActiveOpens=23308977i,TcpAttemptFails=3757543i,TcpCurrEstab=280i,TcpEstabResets=184792i,TcpInCsumErrors=0i,TcpInErrs=232i,TcpInSegs=17536573089i,TcpMaxConn=-1i,TcpOutRsts=4051451i,TcpOutSegs=29836254873i,TcpPassiveOpens=176546974i,TcpRetransSegs=878085i,TcpRtoAlgorithm=1i,TcpRtoMax=120000i,TcpRtoMin=200i,UdpInCsumErrors=0i,UdpInDatagrams=24441661i,UdpInErrors=0i,UdpLiteInCsumErrors=0i,UdpLiteInDatagrams=0i,UdpLiteInErrors=0i,UdpLiteNoPorts=0i,UdpLiteOutDatagrams=0i,UdpLiteRcvbufErrors=0i,UdpLiteSndbufErrors=0i,UdpNoPorts=17660i,UdpOutDatagrams=51807896i,UdpRcvbufErrors=0i,UdpSndbufErrors=236922i 1496460785000000000 +` + metrics, err := Parse([]byte(lp)) + require.NoError(t, err) + r := NewReader(metrics) + buf := make([]byte, 128) + _, err = r.Read(buf) + require.NoError(t, err) + metrics, err = Parse(buf) + require.NoError(t, err) +} diff --git a/plugins/outputs/influxdb/client/udp.go b/plugins/outputs/influxdb/client/udp.go index 89d6894a1..1dd4d9936 100644 --- a/plugins/outputs/influxdb/client/udp.go +++ b/plugins/outputs/influxdb/client/udp.go @@ -4,6 +4,7 @@ import ( "bytes" "fmt" "io" + "log" "net" "net/url" ) @@ -82,10 +83,28 @@ func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) { if err != io.EOF && err != nil { return totaln, err } - nW, err := c.conn.Write(c.buffer[0:nR]) - totaln += nW - if err != nil { - return totaln, err + + if c.buffer[nR-1] == uint8('\n') { + nW, err := c.conn.Write(c.buffer[0:nR]) + totaln += nW + if err != nil { + return totaln, err + } + } else { + log.Printf("E! Could not fit point into UDP payload; dropping") + // Scan forward until next line break to realign. + for { + nR, err := r.Read(c.buffer) + if nR == 0 { + break + } + if err != io.EOF && err != nil { + return totaln, err + } + if c.buffer[nR-1] == uint8('\n') { + break + } + } } } return totaln, nil diff --git a/plugins/outputs/influxdb/client/udp_test.go b/plugins/outputs/influxdb/client/udp_test.go index 84efe0b22..9308144b5 100644 --- a/plugins/outputs/influxdb/client/udp_test.go +++ b/plugins/outputs/influxdb/client/udp_test.go @@ -1,7 +1,6 @@ package client import ( - "bytes" "net" "testing" "time" @@ -10,6 +9,7 @@ import ( "github.com/influxdata/telegraf/metric" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestUDPClient(t *testing.T) { @@ -72,63 +72,7 @@ func TestUDPClient_Write(t *testing.T) { pkt := <-packets assert.Equal(t, "cpu value=99\n", pkt) - metrics := `cpu value=99 -cpu value=55 -cpu value=44 -cpu value=101 -cpu value=91 -cpu value=92 -` - // test sending packet with 6 metrics in a stream. - reader := bytes.NewReader([]byte(metrics)) - // contentLength is ignored: - n, err = client.WriteStream(reader, 10) - assert.Equal(t, n, len(metrics)) - assert.NoError(t, err) - pkt = <-packets - assert.Equal(t, "cpu value=99\ncpu value=55\ncpu value=44\ncpu value=101\ncpu value=91\ncpu value=92\n", pkt) - - // - // Test that UDP packets get broken up properly: - config2 := UDPConfig{ - URL: "udp://localhost:8199", - PayloadSize: 25, - } - client2, err := NewUDP(config2) - assert.NoError(t, err) - wp := WriteParams{} - - // - // Using Write(): - buf := []byte(metrics) - n, err = client2.WriteWithParams(buf, wp) - assert.Equal(t, n, len(metrics)) - assert.NoError(t, err) - pkt = <-packets - assert.Equal(t, "cpu value=99\ncpu value=55", pkt) - pkt = <-packets - assert.Equal(t, "\ncpu value=44\ncpu value=1", pkt) - pkt = <-packets - assert.Equal(t, "01\ncpu value=91\ncpu value", pkt) - pkt = <-packets - assert.Equal(t, "=92\n", pkt) - - // - // Using WriteStream(): - reader = bytes.NewReader([]byte(metrics)) - n, err = client2.WriteStreamWithParams(reader, 10, wp) - assert.Equal(t, n, len(metrics)) - assert.NoError(t, err) - pkt = <-packets - assert.Equal(t, "cpu value=99\ncpu value=55", pkt) - pkt = <-packets - assert.Equal(t, "\ncpu value=44\ncpu value=1", pkt) - pkt = <-packets - assert.Equal(t, "01\ncpu value=91\ncpu value", pkt) - pkt = <-packets - assert.Equal(t, "=92\n", pkt) - // // Using WriteStream() & a metric.Reader: config3 := UDPConfig{ @@ -159,4 +103,27 @@ cpu value=92 assert.Equal(t, "test value=1.1 1484142942000000000\n", pkt) assert.NoError(t, client.Close()) + + config = UDPConfig{ + URL: "udp://localhost:8199", + PayloadSize: 40, + } + client4, err := NewUDP(config) + assert.NoError(t, err) + + ts := time.Unix(1484142943, 0) + m1, _ = metric.New("test", map[string]string{}, + map[string]interface{}{"this_is_a_very_long_field_name": 1.1}, ts) + m2, _ = metric.New("test", map[string]string{}, + map[string]interface{}{"value": 1.1}, ts) + ms = []telegraf.Metric{m1, m2} + reader := metric.NewReader(ms) + n, err = client4.WriteStream(reader, 0) + assert.NoError(t, err) + require.Equal(t, 35, n) + assert.NoError(t, err) + pkt = <-packets + assert.Equal(t, "test value=1.1 1484142943000000000\n", pkt) + + assert.NoError(t, client4.Close()) } diff --git a/plugins/outputs/influxdb/influxdb.go b/plugins/outputs/influxdb/influxdb.go index df3b34226..fe1adc4bb 100644 --- a/plugins/outputs/influxdb/influxdb.go +++ b/plugins/outputs/influxdb/influxdb.go @@ -46,8 +46,7 @@ type InfluxDB struct { // Precision is only here for legacy support. It will be ignored. Precision string - clients []client.Client - splitPayload bool + clients []client.Client } var sampleConfig = ` @@ -115,7 +114,6 @@ func (i *InfluxDB) Connect() error { return fmt.Errorf("Error creating UDP Client [%s]: %s", u, err) } i.clients = append(i.clients, c) - i.splitPayload = true default: // If URL doesn't start with "udp", assume HTTP client config := client.HTTPConfig{ @@ -166,22 +164,9 @@ func (i *InfluxDB) Description() string { return "Configuration for influxdb server to send metrics to" } -func (i *InfluxDB) split(metrics []telegraf.Metric) []telegraf.Metric { - if !i.splitPayload { - return metrics - } - - split := make([]telegraf.Metric, 0) - for _, m := range metrics { - split = append(split, m.Split(i.UDPPayload)...) - } - return split -} - // Write will choose a random server in the cluster to write to until a successful write // occurs, logging each unsuccessful. If all servers fail, return error. func (i *InfluxDB) Write(metrics []telegraf.Metric) error { - metrics = i.split(metrics) bufsize := 0 for _, m := range metrics { diff --git a/plugins/outputs/influxdb/influxdb_test.go b/plugins/outputs/influxdb/influxdb_test.go index 04019794f..5dfa63c66 100644 --- a/plugins/outputs/influxdb/influxdb_test.go +++ b/plugins/outputs/influxdb/influxdb_test.go @@ -6,10 +6,7 @@ import ( "net/http" "net/http/httptest" "testing" - "time" - "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/plugins/outputs/influxdb/client" "github.com/influxdata/telegraf/testutil" @@ -63,35 +60,6 @@ func TestUDPInflux(t *testing.T) { require.NoError(t, i.Close()) } -func TestBasicSplit(t *testing.T) { - c := &MockClient{} - i := InfluxDB{ - clients: []client.Client{c}, - UDPPayload: 50, - splitPayload: true, - } - - // Input metrics: - // test1,tag1=value1 value1=1 value2=2 1257894000000000000\n - // - // Split metrics: - // test1,tag1=value1 value1=1 1257894000000000000\n - // test1,tag1=value1 value2=2 1257894000000000000\n - m, err := metric.New("test1", - map[string]string{"tag1": "value1"}, - map[string]interface{}{"value1": 1.0, "value2": 2.0}, - time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), - ) - require.NoError(t, err) - - metrics := []telegraf.Metric{m} - err = i.Write(metrics) - require.Equal(t, 1, c.writeStreamCalled) - require.Equal(t, 94, c.contentLength) - - require.NoError(t, err) -} - func TestHTTPInflux(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch r.URL.Path {