Fix udp metric splitting (#2880)
This commit is contained in:
parent
37e01808b5
commit
5bab4616ff
|
@ -57,7 +57,7 @@ func (r *reader) Read(p []byte) (n int, err error) {
|
||||||
// this for-loop is the sunny-day scenario, where we are given a
|
// this for-loop is the sunny-day scenario, where we are given a
|
||||||
// buffer that is large enough to hold at least a single metric.
|
// buffer that is large enough to hold at least a single metric.
|
||||||
// all of the cases below it are edge-cases.
|
// all of the cases below it are edge-cases.
|
||||||
if r.metrics[r.iM].Len() < len(p[i:]) {
|
if r.metrics[r.iM].Len() <= len(p[i:]) {
|
||||||
i += r.metrics[r.iM].SerializeTo(p[i:])
|
i += r.metrics[r.iM].SerializeTo(p[i:])
|
||||||
} else {
|
} else {
|
||||||
break
|
break
|
||||||
|
@ -76,7 +76,7 @@ func (r *reader) Read(p []byte) (n int, err error) {
|
||||||
if len(tmp) > 1 {
|
if len(tmp) > 1 {
|
||||||
r.splitMetrics = tmp
|
r.splitMetrics = tmp
|
||||||
r.state = split
|
r.state = split
|
||||||
if r.splitMetrics[0].Len() < len(p) {
|
if r.splitMetrics[0].Len() <= len(p) {
|
||||||
i += r.splitMetrics[0].SerializeTo(p)
|
i += r.splitMetrics[0].SerializeTo(p)
|
||||||
r.iSM = 1
|
r.iSM = 1
|
||||||
} else {
|
} else {
|
||||||
|
@ -99,7 +99,7 @@ func (r *reader) Read(p []byte) (n int, err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
case split:
|
case split:
|
||||||
if r.splitMetrics[r.iSM].Len() < len(p) {
|
if r.splitMetrics[r.iSM].Len() <= len(p) {
|
||||||
// write the current split metric
|
// write the current split metric
|
||||||
i += r.splitMetrics[r.iSM].SerializeTo(p)
|
i += r.splitMetrics[r.iSM].SerializeTo(p)
|
||||||
r.iSM++
|
r.iSM++
|
||||||
|
@ -131,6 +131,10 @@ func (r *reader) Read(p []byte) (n int, err error) {
|
||||||
r.iSM++
|
r.iSM++
|
||||||
if r.iSM == len(r.splitMetrics) {
|
if r.iSM == len(r.splitMetrics) {
|
||||||
r.iM++
|
r.iM++
|
||||||
|
if r.iM == len(r.metrics) {
|
||||||
|
r.state = done
|
||||||
|
return i, io.EOF
|
||||||
|
}
|
||||||
r.state = normal
|
r.state = normal
|
||||||
} else {
|
} else {
|
||||||
r.state = split
|
r.state = split
|
||||||
|
|
|
@ -8,8 +8,8 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
func BenchmarkMetricReader(b *testing.B) {
|
func BenchmarkMetricReader(b *testing.B) {
|
||||||
|
@ -116,6 +116,140 @@ func TestMetricReader_OverflowMetric(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Regression test for when a metric is the same size as the buffer.
|
||||||
|
//
|
||||||
|
// Previously EOF would not be set until the next call to Read.
|
||||||
|
func TestMetricReader_MetricSizeEqualsBufferSize(t *testing.T) {
|
||||||
|
ts := time.Unix(1481032190, 0)
|
||||||
|
m1, _ := New("foo", map[string]string{},
|
||||||
|
map[string]interface{}{"a": int64(1)}, ts)
|
||||||
|
metrics := []telegraf.Metric{m1}
|
||||||
|
|
||||||
|
r := NewReader(metrics)
|
||||||
|
buf := make([]byte, m1.Len())
|
||||||
|
|
||||||
|
for {
|
||||||
|
n, err := r.Read(buf)
|
||||||
|
// Should never read 0 bytes unless at EOF, unless input buffer is 0 length
|
||||||
|
if n == 0 {
|
||||||
|
require.Equal(t, io.EOF, err)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
// Lines should be terminated with a LF
|
||||||
|
if err == io.EOF {
|
||||||
|
require.Equal(t, uint8('\n'), buf[n-1])
|
||||||
|
break
|
||||||
|
}
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Regression test for when a metric requires to be split and one of the
|
||||||
|
// split metrics is exactly the size of the buffer.
|
||||||
|
//
|
||||||
|
// Previously an empty string would be returned on the next Read without error,
|
||||||
|
// and then next Read call would panic.
|
||||||
|
func TestMetricReader_SplitWithExactLengthSplit(t *testing.T) {
|
||||||
|
ts := time.Unix(1481032190, 0)
|
||||||
|
m1, _ := New("foo", map[string]string{},
|
||||||
|
map[string]interface{}{"a": int64(1), "bb": int64(2)}, ts)
|
||||||
|
metrics := []telegraf.Metric{m1}
|
||||||
|
|
||||||
|
r := NewReader(metrics)
|
||||||
|
buf := make([]byte, 30)
|
||||||
|
|
||||||
|
// foo a=1i,bb=2i 1481032190000000000\n // len 35
|
||||||
|
//
|
||||||
|
// Requires this specific split order:
|
||||||
|
// foo a=1i 1481032190000000000\n // len 29
|
||||||
|
// foo bb=2i 1481032190000000000\n // len 30
|
||||||
|
|
||||||
|
for {
|
||||||
|
n, err := r.Read(buf)
|
||||||
|
// Should never read 0 bytes unless at EOF, unless input buffer is 0 length
|
||||||
|
if n == 0 {
|
||||||
|
require.Equal(t, io.EOF, err)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
// Lines should be terminated with a LF
|
||||||
|
if err == io.EOF {
|
||||||
|
require.Equal(t, uint8('\n'), buf[n-1])
|
||||||
|
break
|
||||||
|
}
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Regresssion test for when a metric requires to be split and one of the
|
||||||
|
// split metrics is larger than the buffer.
|
||||||
|
//
|
||||||
|
// Previously the metric index would be set incorrectly causing a panic.
|
||||||
|
func TestMetricReader_SplitOverflowOversized(t *testing.T) {
|
||||||
|
ts := time.Unix(1481032190, 0)
|
||||||
|
m1, _ := New("foo", map[string]string{},
|
||||||
|
map[string]interface{}{
|
||||||
|
"a": int64(1),
|
||||||
|
"bbb": int64(2),
|
||||||
|
}, ts)
|
||||||
|
metrics := []telegraf.Metric{m1}
|
||||||
|
|
||||||
|
r := NewReader(metrics)
|
||||||
|
buf := make([]byte, 30)
|
||||||
|
|
||||||
|
// foo a=1i,bbb=2i 1481032190000000000\n // len 36
|
||||||
|
//
|
||||||
|
// foo a=1i 1481032190000000000\n // len 29
|
||||||
|
// foo bbb=2i 1481032190000000000\n // len 31
|
||||||
|
|
||||||
|
for {
|
||||||
|
n, err := r.Read(buf)
|
||||||
|
// Should never read 0 bytes unless at EOF, unless input buffer is 0 length
|
||||||
|
if n == 0 {
|
||||||
|
require.Equal(t, io.EOF, err)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
// Lines should be terminated with a LF
|
||||||
|
if err == io.EOF {
|
||||||
|
require.Equal(t, uint8('\n'), buf[n-1])
|
||||||
|
break
|
||||||
|
}
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Regresssion test for when a split metric exactly fits in the buffer.
|
||||||
|
//
|
||||||
|
// Previously the metric would be overflow split when not required.
|
||||||
|
func TestMetricReader_SplitOverflowUneeded(t *testing.T) {
|
||||||
|
ts := time.Unix(1481032190, 0)
|
||||||
|
m1, _ := New("foo", map[string]string{},
|
||||||
|
map[string]interface{}{"a": int64(1), "b": int64(2)}, ts)
|
||||||
|
metrics := []telegraf.Metric{m1}
|
||||||
|
|
||||||
|
r := NewReader(metrics)
|
||||||
|
buf := make([]byte, 29)
|
||||||
|
|
||||||
|
// foo a=1i,b=2i 1481032190000000000\n // len 34
|
||||||
|
//
|
||||||
|
// foo a=1i 1481032190000000000\n // len 29
|
||||||
|
// foo b=2i 1481032190000000000\n // len 29
|
||||||
|
|
||||||
|
for {
|
||||||
|
n, err := r.Read(buf)
|
||||||
|
// Should never read 0 bytes unless at EOF, unless input buffer is 0 length
|
||||||
|
if n == 0 {
|
||||||
|
require.Equal(t, io.EOF, err)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
// Lines should be terminated with a LF
|
||||||
|
if err == io.EOF {
|
||||||
|
require.Equal(t, uint8('\n'), buf[n-1])
|
||||||
|
break
|
||||||
|
}
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestMetricReader_OverflowMultipleMetrics(t *testing.T) {
|
func TestMetricReader_OverflowMultipleMetrics(t *testing.T) {
|
||||||
ts := time.Unix(1481032190, 0)
|
ts := time.Unix(1481032190, 0)
|
||||||
m, _ := New("foo", map[string]string{},
|
m, _ := New("foo", map[string]string{},
|
||||||
|
@ -485,3 +619,17 @@ func TestMetricReader_SplitMetricChangingBuffer2(t *testing.T) {
|
||||||
assert.Equal(t, test.err, err, test.expRegex)
|
assert.Equal(t, test.err, err, test.expRegex)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMetricRoundtrip(t *testing.T) {
|
||||||
|
const lp = `nstat,bu=linux,cls=server,dc=cer,env=production,host=hostname,name=netstat,sr=database IpExtInBcastOctets=12570626154i,IpExtInBcastPkts=95541226i,IpExtInCEPkts=0i,IpExtInCsumErrors=0i,IpExtInECT0Pkts=55674i,IpExtInECT1Pkts=0i,IpExtInMcastOctets=5928296i,IpExtInMcastPkts=174365i,IpExtInNoECTPkts=17965863529i,IpExtInNoRoutes=20i,IpExtInOctets=3334866321815i,IpExtInTruncatedPkts=0i,IpExtOutBcastOctets=0i,IpExtOutBcastPkts=0i,IpExtOutMcastOctets=0i,IpExtOutMcastPkts=0i,IpExtOutOctets=31397892391399i,TcpExtArpFilter=0i,TcpExtBusyPollRxPackets=0i,TcpExtDelayedACKLocked=14094i,TcpExtDelayedACKLost=302083i,TcpExtDelayedACKs=55486507i,TcpExtEmbryonicRsts=11879i,TcpExtIPReversePathFilter=0i,TcpExtListenDrops=1736i,TcpExtListenOverflows=0i,TcpExtLockDroppedIcmps=0i,TcpExtOfoPruned=0i,TcpExtOutOfWindowIcmps=8i,TcpExtPAWSActive=0i,TcpExtPAWSEstab=974i,TcpExtPAWSPassive=0i,TcpExtPruneCalled=0i,TcpExtRcvPruned=0i,TcpExtSyncookiesFailed=12593i,TcpExtSyncookiesRecv=0i,TcpExtSyncookiesSent=0i,TcpExtTCPACKSkippedChallenge=0i,TcpExtTCPACKSkippedFinWait2=0i,TcpExtTCPACKSkippedPAWS=806i,TcpExtTCPACKSkippedSeq=519i,TcpExtTCPACKSkippedSynRecv=0i,TcpExtTCPACKSkippedTimeWait=0i,TcpExtTCPAbortFailed=0i,TcpExtTCPAbortOnClose=22i,TcpExtTCPAbortOnData=36593i,TcpExtTCPAbortOnLinger=0i,TcpExtTCPAbortOnMemory=0i,TcpExtTCPAbortOnTimeout=674i,TcpExtTCPAutoCorking=494253233i,TcpExtTCPBacklogDrop=0i,TcpExtTCPChallengeACK=281i,TcpExtTCPDSACKIgnoredNoUndo=93354i,TcpExtTCPDSACKIgnoredOld=336i,TcpExtTCPDSACKOfoRecv=0i,TcpExtTCPDSACKOfoSent=7i,TcpExtTCPDSACKOldSent=302073i,TcpExtTCPDSACKRecv=215884i,TcpExtTCPDSACKUndo=7633i,TcpExtTCPDeferAcceptDrop=0i,TcpExtTCPDirectCopyFromBacklog=0i,TcpExtTCPDirectCopyFromPrequeue=0i,TcpExtTCPFACKReorder=1320i,TcpExtTCPFastOpenActive=0i,TcpExtTCPFastOpenActiveFail=0i,TcpExtTCPFastOpenCookieReqd=0i,TcpExtTCPFastOpenListenOverflow=0i,TcpExtTCPFastOpenPassive=0i,TcpExtTCPFastOpenPassiveFail=0i,TcpExtTCPFastRetrans=350681i,TcpExtTCPForwardRetrans=142168i,TcpExtTCPFromZeroWindowAdv=4317i,TcpExtTCPFullUndo=29502i,TcpExtTCPHPAcks=10267073000i,TcpExtTCPHPHits=5629837098i,TcpExtTCPHPHitsToUser=0i,TcpExtTCPHystartDelayCwnd=285127i,TcpExtTCPHystartDelayDetect=12318i,TcpExtTCPHystartTrainCwnd=69160570i,TcpExtTCPHystartTrainDetect=3315799i,TcpExtTCPLossFailures=109i,TcpExtTCPLossProbeRecovery=110819i,TcpExtTCPLossProbes=233995i,TcpExtTCPLossUndo=5276i,TcpExtTCPLostRetransmit=397i,TcpExtTCPMD5NotFound=0i,TcpExtTCPMD5Unexpected=0i,TcpExtTCPMemoryPressures=0i,TcpExtTCPMinTTLDrop=0i,TcpExtTCPOFODrop=0i,TcpExtTCPOFOMerge=7i,TcpExtTCPOFOQueue=15196i,TcpExtTCPOrigDataSent=29055119435i,TcpExtTCPPartialUndo=21320i,TcpExtTCPPrequeueDropped=0i,TcpExtTCPPrequeued=0i,TcpExtTCPPureAcks=1236441827i,TcpExtTCPRcvCoalesce=225590473i,TcpExtTCPRcvCollapsed=0i,TcpExtTCPRenoFailures=0i,TcpExtTCPRenoRecovery=0i,TcpExtTCPRenoRecoveryFail=0i,TcpExtTCPRenoReorder=0i,TcpExtTCPReqQFullDoCookies=0i,TcpExtTCPReqQFullDrop=0i,TcpExtTCPRetransFail=41i,TcpExtTCPSACKDiscard=0i,TcpExtTCPSACKReneging=0i,TcpExtTCPSACKReorder=4307i,TcpExtTCPSYNChallenge=244i,TcpExtTCPSackFailures=1698i,TcpExtTCPSackMerged=184668i,TcpExtTCPSackRecovery=97369i,TcpExtTCPSackRecoveryFail=381i,TcpExtTCPSackShiftFallback=2697079i,TcpExtTCPSackShifted=760299i,TcpExtTCPSchedulerFailed=0i,TcpExtTCPSlowStartRetrans=9276i,TcpExtTCPSpuriousRTOs=959i,TcpExtTCPSpuriousRtxHostQueues=2973i,TcpExtTCPSynRetrans=200970i,TcpExtTCPTSReorder=15221i,TcpExtTCPTimeWaitOverflow=0i,TcpExtTCPTimeouts=70127i,TcpExtTCPToZeroWindowAdv=4317i,TcpExtTCPWantZeroWindowAdv=2133i,TcpExtTW=24809813i,TcpExtTWKilled=0i,TcpExtTWRecycled=0i 1496460785000000000
|
||||||
|
nstat,bu=linux,cls=server,dc=cer,env=production,host=hostname,name=snmp,sr=database IcmpInAddrMaskReps=0i,IcmpInAddrMasks=90i,IcmpInCsumErrors=0i,IcmpInDestUnreachs=284401i,IcmpInEchoReps=9i,IcmpInEchos=1761912i,IcmpInErrors=407i,IcmpInMsgs=2047767i,IcmpInParmProbs=0i,IcmpInRedirects=0i,IcmpInSrcQuenchs=0i,IcmpInTimeExcds=46i,IcmpInTimestampReps=0i,IcmpInTimestamps=1309i,IcmpMsgInType0=9i,IcmpMsgInType11=46i,IcmpMsgInType13=1309i,IcmpMsgInType17=90i,IcmpMsgInType3=284401i,IcmpMsgInType8=1761912i,IcmpMsgOutType0=1761912i,IcmpMsgOutType14=1248i,IcmpMsgOutType3=108709i,IcmpMsgOutType8=9i,IcmpOutAddrMaskReps=0i,IcmpOutAddrMasks=0i,IcmpOutDestUnreachs=108709i,IcmpOutEchoReps=1761912i,IcmpOutEchos=9i,IcmpOutErrors=0i,IcmpOutMsgs=1871878i,IcmpOutParmProbs=0i,IcmpOutRedirects=0i,IcmpOutSrcQuenchs=0i,IcmpOutTimeExcds=0i,IcmpOutTimestampReps=1248i,IcmpOutTimestamps=0i,IpDefaultTTL=64i,IpForwDatagrams=0i,IpForwarding=2i,IpFragCreates=0i,IpFragFails=0i,IpFragOKs=0i,IpInAddrErrors=0i,IpInDelivers=17658795773i,IpInDiscards=0i,IpInHdrErrors=0i,IpInReceives=17659269339i,IpInUnknownProtos=0i,IpOutDiscards=236976i,IpOutNoRoutes=1009i,IpOutRequests=23466783734i,IpReasmFails=0i,IpReasmOKs=0i,IpReasmReqds=0i,IpReasmTimeout=0i,TcpActiveOpens=23308977i,TcpAttemptFails=3757543i,TcpCurrEstab=280i,TcpEstabResets=184792i,TcpInCsumErrors=0i,TcpInErrs=232i,TcpInSegs=17536573089i,TcpMaxConn=-1i,TcpOutRsts=4051451i,TcpOutSegs=29836254873i,TcpPassiveOpens=176546974i,TcpRetransSegs=878085i,TcpRtoAlgorithm=1i,TcpRtoMax=120000i,TcpRtoMin=200i,UdpInCsumErrors=0i,UdpInDatagrams=24441661i,UdpInErrors=0i,UdpLiteInCsumErrors=0i,UdpLiteInDatagrams=0i,UdpLiteInErrors=0i,UdpLiteNoPorts=0i,UdpLiteOutDatagrams=0i,UdpLiteRcvbufErrors=0i,UdpLiteSndbufErrors=0i,UdpNoPorts=17660i,UdpOutDatagrams=51807896i,UdpRcvbufErrors=0i,UdpSndbufErrors=236922i 1496460785000000000
|
||||||
|
`
|
||||||
|
metrics, err := Parse([]byte(lp))
|
||||||
|
require.NoError(t, err)
|
||||||
|
r := NewReader(metrics)
|
||||||
|
buf := make([]byte, 128)
|
||||||
|
_, err = r.Read(buf)
|
||||||
|
require.NoError(t, err)
|
||||||
|
metrics, err = Parse(buf)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"log"
|
||||||
"net"
|
"net"
|
||||||
"net/url"
|
"net/url"
|
||||||
)
|
)
|
||||||
|
@ -82,10 +83,28 @@ func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
|
||||||
if err != io.EOF && err != nil {
|
if err != io.EOF && err != nil {
|
||||||
return totaln, err
|
return totaln, err
|
||||||
}
|
}
|
||||||
nW, err := c.conn.Write(c.buffer[0:nR])
|
|
||||||
totaln += nW
|
if c.buffer[nR-1] == uint8('\n') {
|
||||||
if err != nil {
|
nW, err := c.conn.Write(c.buffer[0:nR])
|
||||||
return totaln, err
|
totaln += nW
|
||||||
|
if err != nil {
|
||||||
|
return totaln, err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
log.Printf("E! Could not fit point into UDP payload; dropping")
|
||||||
|
// Scan forward until next line break to realign.
|
||||||
|
for {
|
||||||
|
nR, err := r.Read(c.buffer)
|
||||||
|
if nR == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if err != io.EOF && err != nil {
|
||||||
|
return totaln, err
|
||||||
|
}
|
||||||
|
if c.buffer[nR-1] == uint8('\n') {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return totaln, nil
|
return totaln, nil
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
package client
|
package client
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"net"
|
"net"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
@ -10,6 +9,7 @@ import (
|
||||||
"github.com/influxdata/telegraf/metric"
|
"github.com/influxdata/telegraf/metric"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestUDPClient(t *testing.T) {
|
func TestUDPClient(t *testing.T) {
|
||||||
|
@ -72,63 +72,7 @@ func TestUDPClient_Write(t *testing.T) {
|
||||||
pkt := <-packets
|
pkt := <-packets
|
||||||
assert.Equal(t, "cpu value=99\n", pkt)
|
assert.Equal(t, "cpu value=99\n", pkt)
|
||||||
|
|
||||||
metrics := `cpu value=99
|
|
||||||
cpu value=55
|
|
||||||
cpu value=44
|
|
||||||
cpu value=101
|
|
||||||
cpu value=91
|
|
||||||
cpu value=92
|
|
||||||
`
|
|
||||||
// test sending packet with 6 metrics in a stream.
|
|
||||||
reader := bytes.NewReader([]byte(metrics))
|
|
||||||
// contentLength is ignored:
|
|
||||||
n, err = client.WriteStream(reader, 10)
|
|
||||||
assert.Equal(t, n, len(metrics))
|
|
||||||
assert.NoError(t, err)
|
|
||||||
pkt = <-packets
|
|
||||||
assert.Equal(t, "cpu value=99\ncpu value=55\ncpu value=44\ncpu value=101\ncpu value=91\ncpu value=92\n", pkt)
|
|
||||||
|
|
||||||
//
|
|
||||||
// Test that UDP packets get broken up properly:
|
|
||||||
config2 := UDPConfig{
|
|
||||||
URL: "udp://localhost:8199",
|
|
||||||
PayloadSize: 25,
|
|
||||||
}
|
|
||||||
client2, err := NewUDP(config2)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
|
|
||||||
wp := WriteParams{}
|
wp := WriteParams{}
|
||||||
|
|
||||||
//
|
|
||||||
// Using Write():
|
|
||||||
buf := []byte(metrics)
|
|
||||||
n, err = client2.WriteWithParams(buf, wp)
|
|
||||||
assert.Equal(t, n, len(metrics))
|
|
||||||
assert.NoError(t, err)
|
|
||||||
pkt = <-packets
|
|
||||||
assert.Equal(t, "cpu value=99\ncpu value=55", pkt)
|
|
||||||
pkt = <-packets
|
|
||||||
assert.Equal(t, "\ncpu value=44\ncpu value=1", pkt)
|
|
||||||
pkt = <-packets
|
|
||||||
assert.Equal(t, "01\ncpu value=91\ncpu value", pkt)
|
|
||||||
pkt = <-packets
|
|
||||||
assert.Equal(t, "=92\n", pkt)
|
|
||||||
|
|
||||||
//
|
|
||||||
// Using WriteStream():
|
|
||||||
reader = bytes.NewReader([]byte(metrics))
|
|
||||||
n, err = client2.WriteStreamWithParams(reader, 10, wp)
|
|
||||||
assert.Equal(t, n, len(metrics))
|
|
||||||
assert.NoError(t, err)
|
|
||||||
pkt = <-packets
|
|
||||||
assert.Equal(t, "cpu value=99\ncpu value=55", pkt)
|
|
||||||
pkt = <-packets
|
|
||||||
assert.Equal(t, "\ncpu value=44\ncpu value=1", pkt)
|
|
||||||
pkt = <-packets
|
|
||||||
assert.Equal(t, "01\ncpu value=91\ncpu value", pkt)
|
|
||||||
pkt = <-packets
|
|
||||||
assert.Equal(t, "=92\n", pkt)
|
|
||||||
|
|
||||||
//
|
//
|
||||||
// Using WriteStream() & a metric.Reader:
|
// Using WriteStream() & a metric.Reader:
|
||||||
config3 := UDPConfig{
|
config3 := UDPConfig{
|
||||||
|
@ -159,4 +103,27 @@ cpu value=92
|
||||||
assert.Equal(t, "test value=1.1 1484142942000000000\n", pkt)
|
assert.Equal(t, "test value=1.1 1484142942000000000\n", pkt)
|
||||||
|
|
||||||
assert.NoError(t, client.Close())
|
assert.NoError(t, client.Close())
|
||||||
|
|
||||||
|
config = UDPConfig{
|
||||||
|
URL: "udp://localhost:8199",
|
||||||
|
PayloadSize: 40,
|
||||||
|
}
|
||||||
|
client4, err := NewUDP(config)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
ts := time.Unix(1484142943, 0)
|
||||||
|
m1, _ = metric.New("test", map[string]string{},
|
||||||
|
map[string]interface{}{"this_is_a_very_long_field_name": 1.1}, ts)
|
||||||
|
m2, _ = metric.New("test", map[string]string{},
|
||||||
|
map[string]interface{}{"value": 1.1}, ts)
|
||||||
|
ms = []telegraf.Metric{m1, m2}
|
||||||
|
reader := metric.NewReader(ms)
|
||||||
|
n, err = client4.WriteStream(reader, 0)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
require.Equal(t, 35, n)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
pkt = <-packets
|
||||||
|
assert.Equal(t, "test value=1.1 1484142943000000000\n", pkt)
|
||||||
|
|
||||||
|
assert.NoError(t, client4.Close())
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,8 +46,7 @@ type InfluxDB struct {
|
||||||
// Precision is only here for legacy support. It will be ignored.
|
// Precision is only here for legacy support. It will be ignored.
|
||||||
Precision string
|
Precision string
|
||||||
|
|
||||||
clients []client.Client
|
clients []client.Client
|
||||||
splitPayload bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var sampleConfig = `
|
var sampleConfig = `
|
||||||
|
@ -115,7 +114,6 @@ func (i *InfluxDB) Connect() error {
|
||||||
return fmt.Errorf("Error creating UDP Client [%s]: %s", u, err)
|
return fmt.Errorf("Error creating UDP Client [%s]: %s", u, err)
|
||||||
}
|
}
|
||||||
i.clients = append(i.clients, c)
|
i.clients = append(i.clients, c)
|
||||||
i.splitPayload = true
|
|
||||||
default:
|
default:
|
||||||
// If URL doesn't start with "udp", assume HTTP client
|
// If URL doesn't start with "udp", assume HTTP client
|
||||||
config := client.HTTPConfig{
|
config := client.HTTPConfig{
|
||||||
|
@ -166,22 +164,9 @@ func (i *InfluxDB) Description() string {
|
||||||
return "Configuration for influxdb server to send metrics to"
|
return "Configuration for influxdb server to send metrics to"
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i *InfluxDB) split(metrics []telegraf.Metric) []telegraf.Metric {
|
|
||||||
if !i.splitPayload {
|
|
||||||
return metrics
|
|
||||||
}
|
|
||||||
|
|
||||||
split := make([]telegraf.Metric, 0)
|
|
||||||
for _, m := range metrics {
|
|
||||||
split = append(split, m.Split(i.UDPPayload)...)
|
|
||||||
}
|
|
||||||
return split
|
|
||||||
}
|
|
||||||
|
|
||||||
// Write will choose a random server in the cluster to write to until a successful write
|
// Write will choose a random server in the cluster to write to until a successful write
|
||||||
// occurs, logging each unsuccessful. If all servers fail, return error.
|
// occurs, logging each unsuccessful. If all servers fail, return error.
|
||||||
func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
|
func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
|
||||||
metrics = i.split(metrics)
|
|
||||||
|
|
||||||
bufsize := 0
|
bufsize := 0
|
||||||
for _, m := range metrics {
|
for _, m := range metrics {
|
||||||
|
|
|
@ -6,10 +6,7 @@ import (
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
|
||||||
"github.com/influxdata/telegraf/metric"
|
|
||||||
"github.com/influxdata/telegraf/plugins/outputs/influxdb/client"
|
"github.com/influxdata/telegraf/plugins/outputs/influxdb/client"
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
|
|
||||||
|
@ -63,35 +60,6 @@ func TestUDPInflux(t *testing.T) {
|
||||||
require.NoError(t, i.Close())
|
require.NoError(t, i.Close())
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestBasicSplit(t *testing.T) {
|
|
||||||
c := &MockClient{}
|
|
||||||
i := InfluxDB{
|
|
||||||
clients: []client.Client{c},
|
|
||||||
UDPPayload: 50,
|
|
||||||
splitPayload: true,
|
|
||||||
}
|
|
||||||
|
|
||||||
// Input metrics:
|
|
||||||
// test1,tag1=value1 value1=1 value2=2 1257894000000000000\n
|
|
||||||
//
|
|
||||||
// Split metrics:
|
|
||||||
// test1,tag1=value1 value1=1 1257894000000000000\n
|
|
||||||
// test1,tag1=value1 value2=2 1257894000000000000\n
|
|
||||||
m, err := metric.New("test1",
|
|
||||||
map[string]string{"tag1": "value1"},
|
|
||||||
map[string]interface{}{"value1": 1.0, "value2": 2.0},
|
|
||||||
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
|
|
||||||
)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
metrics := []telegraf.Metric{m}
|
|
||||||
err = i.Write(metrics)
|
|
||||||
require.Equal(t, 1, c.writeStreamCalled)
|
|
||||||
require.Equal(t, 94, c.contentLength)
|
|
||||||
|
|
||||||
require.NoError(t, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestHTTPInflux(t *testing.T) {
|
func TestHTTPInflux(t *testing.T) {
|
||||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
switch r.URL.Path {
|
switch r.URL.Path {
|
||||||
|
|
Loading…
Reference in New Issue