Compare commits


10 Commits
1.3.1 ... 1.3.2

Author SHA1 Message Date
Daniel Nelson
0f419e9a0d Update 1.3.2 release date
(cherry picked from commit ca72df5868)
2017-06-14 12:17:44 -07:00
Daniel Nelson
958d689274 Update changelog
(cherry picked from commit ea787b83bf)
2017-06-13 18:07:59 -07:00
Daniel Nelson
2370c04dd7 Ensure prometheus metrics have same set of labels (#2857)
(cherry picked from commit 949072e8dc)
2017-06-13 18:07:34 -07:00
Daniel Nelson
4978c8ccb9 Update changelog
(cherry picked from commit 0c53de6700)
2017-06-08 16:56:10 -07:00
Daniel Nelson
abe5e28575 Fix support for mongodb/leofs urls without scheme (#2900)
This was broken by changes to url.Parse in Go 1.8. This change continues to
accept the scheme-less string but logs a warning prompting the user to move to a full URL.

(cherry picked from commit b277e6e2d7)
2017-06-08 16:55:46 -07:00
Daniel Nelson
34da5a6e45 Update changelog
(cherry picked from commit 84dbf8bb25)
2017-06-07 13:48:58 -07:00
Daniel Nelson
4de1bf29cc Fix metric splitting edge cases (#2896)
Metrics needing one extra byte to fit the output buffer would not be split, so we would emit lines without a line ending. Metrics which overflowed by exactly one field length would be split one field too late, causing truncated fields.
(cherry picked from commit a275e6792a)
2017-06-07 13:47:58 -07:00
Daniel Nelson
f23845afef Update changelog
(cherry picked from commit a47e6e6efe)
2017-06-05 12:51:58 -07:00
Daniel Nelson
721ed8b3f2 Fix udp metric splitting (#2880)
(cherry picked from commit 5bab4616ff)
2017-06-05 12:50:59 -07:00
Daniel Nelson
385c114d00 Update release date
2017-05-31 14:53:01 -07:00
14 changed files with 760 additions and 285 deletions

View File

@@ -1,4 +1,12 @@
## v1.3.1 [unreleased] ## v1.3.2 [2017-06-14]
### Bugfixes
- [#2862](https://github.com/influxdata/telegraf/issues/2862): Fix InfluxDB UDP metric splitting.
- [#2888](https://github.com/influxdata/telegraf/issues/2888): Fix mongodb/leofs urls without scheme.
- [#2822](https://github.com/influxdata/telegraf/issues/2822): Fix inconsistent label dimensions in prometheus output.
## v1.3.1 [2017-05-31]
### Bugfixes ### Bugfixes

View File

@@ -218,7 +218,7 @@ func (m *metric) SerializeTo(dst []byte) int {
} }
func (m *metric) Split(maxSize int) []telegraf.Metric { func (m *metric) Split(maxSize int) []telegraf.Metric {
if m.Len() < maxSize { if m.Len() <= maxSize {
return []telegraf.Metric{m} return []telegraf.Metric{m}
} }
var out []telegraf.Metric var out []telegraf.Metric
@@ -248,7 +248,7 @@ func (m *metric) Split(maxSize int) []telegraf.Metric {
// if true, then we need to create a metric _not_ including the currently // if true, then we need to create a metric _not_ including the currently
// selected field // selected field
if len(m.fields[i:j])+len(fields)+constant > maxSize { if len(m.fields[i:j])+len(fields)+constant >= maxSize {
// if false, then we'll create a metric including the currently // if false, then we'll create a metric including the currently
// selected field anyways. This means that the given maxSize is too // selected field anyways. This means that the given maxSize is too
// small for a single field to fit. // small for a single field to fit.
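
Both comparison changes in this hunk are off-by-one fixes: a metric whose serialized length (including the trailing newline) exactly equals maxSize should be returned unsplit, and a candidate field group that would exactly fill maxSize must still be split so the newline fits. A minimal standalone sketch of the boundary condition (illustrative only, not Telegraf's code):

```go
package main

import "fmt"

// fitsBuffer reports whether a serialized metric of length n, which already
// includes its trailing newline, fits in a buffer of size max. The old code
// used "<" here, which wrongly forced a split when n == max; "<=" accepts
// the exact-fit case.
func fitsBuffer(n, max int) bool {
	return n <= max
}

func main() {
	line := "cpu,host=localhost float=100001 1480940990034083306\n"
	fmt.Println(fitsBuffer(len(line), len(line)))   // true: exact fit, no split needed
	fmt.Println(fitsBuffer(len(line), len(line)-1)) // false: one byte short, must split
}
```

The regression tests added to metric_test.go below (TestSplitMetric_ExactSize and TestSplitMetric_NoRoomForNewline) exercise exactly these two boundaries.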

View File

@@ -10,6 +10,7 @@ import (
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
) )
func TestNewMetric(t *testing.T) { func TestNewMetric(t *testing.T) {
@@ -458,7 +459,7 @@ func TestSplitMetric(t *testing.T) {
assert.Len(t, split70, 3) assert.Len(t, split70, 3)
split60 := m.Split(60) split60 := m.Split(60)
assert.Len(t, split60, 4) assert.Len(t, split60, 5)
} }
// test splitting metric into various max lengths // test splitting metric into various max lengths
@@ -578,6 +579,42 @@ func TestSplitMetric_OneField(t *testing.T) {
assert.Equal(t, "cpu,host=localhost float=100001 1480940990034083306\n", split[0].String()) assert.Equal(t, "cpu,host=localhost float=100001 1480940990034083306\n", split[0].String())
} }
func TestSplitMetric_ExactSize(t *testing.T) {
now := time.Unix(0, 1480940990034083306)
tags := map[string]string{
"host": "localhost",
}
fields := map[string]interface{}{
"float": float64(100001),
"int": int64(100001),
"bool": true,
"false": false,
"string": "test",
}
m, err := New("cpu", tags, fields, now)
assert.NoError(t, err)
actual := m.Split(m.Len())
// check that no copy was made
require.Equal(t, &m, &actual[0])
}
func TestSplitMetric_NoRoomForNewline(t *testing.T) {
now := time.Unix(0, 1480940990034083306)
tags := map[string]string{
"host": "localhost",
}
fields := map[string]interface{}{
"float": float64(100001),
"int": int64(100001),
"bool": true,
"false": false,
}
m, err := New("cpu", tags, fields, now)
assert.NoError(t, err)
actual := m.Split(m.Len() - 1)
require.Equal(t, 2, len(actual))
}
func TestNewMetricAggregate(t *testing.T) { func TestNewMetricAggregate(t *testing.T) {
now := time.Now() now := time.Now()

View File

@@ -57,7 +57,7 @@ func (r *reader) Read(p []byte) (n int, err error) {
// this for-loop is the sunny-day scenario, where we are given a // this for-loop is the sunny-day scenario, where we are given a
// buffer that is large enough to hold at least a single metric. // buffer that is large enough to hold at least a single metric.
// all of the cases below it are edge-cases. // all of the cases below it are edge-cases.
if r.metrics[r.iM].Len() < len(p[i:]) { if r.metrics[r.iM].Len() <= len(p[i:]) {
i += r.metrics[r.iM].SerializeTo(p[i:]) i += r.metrics[r.iM].SerializeTo(p[i:])
} else { } else {
break break
@@ -76,7 +76,7 @@ func (r *reader) Read(p []byte) (n int, err error) {
if len(tmp) > 1 { if len(tmp) > 1 {
r.splitMetrics = tmp r.splitMetrics = tmp
r.state = split r.state = split
if r.splitMetrics[0].Len() < len(p) { if r.splitMetrics[0].Len() <= len(p) {
i += r.splitMetrics[0].SerializeTo(p) i += r.splitMetrics[0].SerializeTo(p)
r.iSM = 1 r.iSM = 1
} else { } else {
@@ -99,7 +99,7 @@ func (r *reader) Read(p []byte) (n int, err error) {
} }
case split: case split:
if r.splitMetrics[r.iSM].Len() < len(p) { if r.splitMetrics[r.iSM].Len() <= len(p) {
// write the current split metric // write the current split metric
i += r.splitMetrics[r.iSM].SerializeTo(p) i += r.splitMetrics[r.iSM].SerializeTo(p)
r.iSM++ r.iSM++
@@ -131,6 +131,10 @@ func (r *reader) Read(p []byte) (n int, err error) {
r.iSM++ r.iSM++
if r.iSM == len(r.splitMetrics) { if r.iSM == len(r.splitMetrics) {
r.iM++ r.iM++
if r.iM == len(r.metrics) {
r.state = done
return i, io.EOF
}
r.state = normal r.state = normal
} else { } else {
r.state = split r.state = split
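
The reader changes mirror the same "<" to "<=" boundary fix and additionally return io.EOF together with the final bytes when the last metric exactly fills the caller's buffer, instead of deferring EOF to the next Read. A hedged usage sketch of the read loop (import paths assume the Telegraf 1.3-era metric package; error handling trimmed):

```go
package main

import (
	"fmt"
	"io"
	"time"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/metric"
)

func main() {
	ts := time.Unix(1481032190, 0)
	m, _ := metric.New("foo", map[string]string{},
		map[string]interface{}{"a": int64(1)}, ts)

	r := metric.NewReader([]telegraf.Metric{m})

	// Buffer exactly the size of the serialized metric: with this fix,
	// Read returns the whole line and io.EOF in the same call.
	buf := make([]byte, m.Len())
	for {
		n, err := r.Read(buf)
		if n > 0 {
			fmt.Printf("%q\n", buf[:n])
		}
		if err == io.EOF {
			break
		}
		if err != nil {
			panic(err)
		}
	}
}
```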

View File

@@ -8,8 +8,8 @@ import (
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
) )
func BenchmarkMetricReader(b *testing.B) { func BenchmarkMetricReader(b *testing.B) {
@@ -116,6 +116,140 @@ func TestMetricReader_OverflowMetric(t *testing.T) {
} }
} }
// Regression test for when a metric is the same size as the buffer.
//
// Previously EOF would not be set until the next call to Read.
func TestMetricReader_MetricSizeEqualsBufferSize(t *testing.T) {
ts := time.Unix(1481032190, 0)
m1, _ := New("foo", map[string]string{},
map[string]interface{}{"a": int64(1)}, ts)
metrics := []telegraf.Metric{m1}
r := NewReader(metrics)
buf := make([]byte, m1.Len())
for {
n, err := r.Read(buf)
// Should never read 0 bytes unless at EOF, unless input buffer is 0 length
if n == 0 {
require.Equal(t, io.EOF, err)
break
}
// Lines should be terminated with a LF
if err == io.EOF {
require.Equal(t, uint8('\n'), buf[n-1])
break
}
require.NoError(t, err)
}
}
// Regression test for when a metric requires to be split and one of the
// split metrics is exactly the size of the buffer.
//
// Previously an empty string would be returned on the next Read without error,
// and then next Read call would panic.
func TestMetricReader_SplitWithExactLengthSplit(t *testing.T) {
ts := time.Unix(1481032190, 0)
m1, _ := New("foo", map[string]string{},
map[string]interface{}{"a": int64(1), "bb": int64(2)}, ts)
metrics := []telegraf.Metric{m1}
r := NewReader(metrics)
buf := make([]byte, 30)
// foo a=1i,bb=2i 1481032190000000000\n // len 35
//
// Requires this specific split order:
// foo a=1i 1481032190000000000\n // len 29
// foo bb=2i 1481032190000000000\n // len 30
for {
n, err := r.Read(buf)
// Should never read 0 bytes unless at EOF, unless input buffer is 0 length
if n == 0 {
require.Equal(t, io.EOF, err)
break
}
// Lines should be terminated with a LF
if err == io.EOF {
require.Equal(t, uint8('\n'), buf[n-1])
break
}
require.NoError(t, err)
}
}
// Regression test for when a metric requires to be split and one of the
// split metrics is larger than the buffer.
//
// Previously the metric index would be set incorrectly causing a panic.
func TestMetricReader_SplitOverflowOversized(t *testing.T) {
ts := time.Unix(1481032190, 0)
m1, _ := New("foo", map[string]string{},
map[string]interface{}{
"a": int64(1),
"bbb": int64(2),
}, ts)
metrics := []telegraf.Metric{m1}
r := NewReader(metrics)
buf := make([]byte, 30)
// foo a=1i,bbb=2i 1481032190000000000\n // len 36
//
// foo a=1i 1481032190000000000\n // len 29
// foo bbb=2i 1481032190000000000\n // len 31
for {
n, err := r.Read(buf)
// Should never read 0 bytes unless at EOF, unless input buffer is 0 length
if n == 0 {
require.Equal(t, io.EOF, err)
break
}
// Lines should be terminated with a LF
if err == io.EOF {
require.Equal(t, uint8('\n'), buf[n-1])
break
}
require.NoError(t, err)
}
}
// Regression test for when a split metric exactly fits in the buffer.
//
// Previously the metric would be overflow split when not required.
func TestMetricReader_SplitOverflowUneeded(t *testing.T) {
ts := time.Unix(1481032190, 0)
m1, _ := New("foo", map[string]string{},
map[string]interface{}{"a": int64(1), "b": int64(2)}, ts)
metrics := []telegraf.Metric{m1}
r := NewReader(metrics)
buf := make([]byte, 29)
// foo a=1i,b=2i 1481032190000000000\n // len 34
//
// foo a=1i 1481032190000000000\n // len 29
// foo b=2i 1481032190000000000\n // len 29
for {
n, err := r.Read(buf)
// Should never read 0 bytes unless at EOF, unless input buffer is 0 length
if n == 0 {
require.Equal(t, io.EOF, err)
break
}
// Lines should be terminated with a LF
if err == io.EOF {
require.Equal(t, uint8('\n'), buf[n-1])
break
}
require.NoError(t, err)
}
}
func TestMetricReader_OverflowMultipleMetrics(t *testing.T) { func TestMetricReader_OverflowMultipleMetrics(t *testing.T) {
ts := time.Unix(1481032190, 0) ts := time.Unix(1481032190, 0)
m, _ := New("foo", map[string]string{}, m, _ := New("foo", map[string]string{},
@@ -485,3 +619,17 @@ func TestMetricReader_SplitMetricChangingBuffer2(t *testing.T) {
assert.Equal(t, test.err, err, test.expRegex) assert.Equal(t, test.err, err, test.expRegex)
} }
} }
func TestMetricRoundtrip(t *testing.T) {
const lp = `nstat,bu=linux,cls=server,dc=cer,env=production,host=hostname,name=netstat,sr=database IpExtInBcastOctets=12570626154i,IpExtInBcastPkts=95541226i,IpExtInCEPkts=0i,IpExtInCsumErrors=0i,IpExtInECT0Pkts=55674i,IpExtInECT1Pkts=0i,IpExtInMcastOctets=5928296i,IpExtInMcastPkts=174365i,IpExtInNoECTPkts=17965863529i,IpExtInNoRoutes=20i,IpExtInOctets=3334866321815i,IpExtInTruncatedPkts=0i,IpExtOutBcastOctets=0i,IpExtOutBcastPkts=0i,IpExtOutMcastOctets=0i,IpExtOutMcastPkts=0i,IpExtOutOctets=31397892391399i,TcpExtArpFilter=0i,TcpExtBusyPollRxPackets=0i,TcpExtDelayedACKLocked=14094i,TcpExtDelayedACKLost=302083i,TcpExtDelayedACKs=55486507i,TcpExtEmbryonicRsts=11879i,TcpExtIPReversePathFilter=0i,TcpExtListenDrops=1736i,TcpExtListenOverflows=0i,TcpExtLockDroppedIcmps=0i,TcpExtOfoPruned=0i,TcpExtOutOfWindowIcmps=8i,TcpExtPAWSActive=0i,TcpExtPAWSEstab=974i,TcpExtPAWSPassive=0i,TcpExtPruneCalled=0i,TcpExtRcvPruned=0i,TcpExtSyncookiesFailed=12593i,TcpExtSyncookiesRecv=0i,TcpExtSyncookiesSent=0i,TcpExtTCPACKSkippedChallenge=0i,TcpExtTCPACKSkippedFinWait2=0i,TcpExtTCPACKSkippedPAWS=806i,TcpExtTCPACKSkippedSeq=519i,TcpExtTCPACKSkippedSynRecv=0i,TcpExtTCPACKSkippedTimeWait=0i,TcpExtTCPAbortFailed=0i,TcpExtTCPAbortOnClose=22i,TcpExtTCPAbortOnData=36593i,TcpExtTCPAbortOnLinger=0i,TcpExtTCPAbortOnMemory=0i,TcpExtTCPAbortOnTimeout=674i,TcpExtTCPAutoCorking=494253233i,TcpExtTCPBacklogDrop=0i,TcpExtTCPChallengeACK=281i,TcpExtTCPDSACKIgnoredNoUndo=93354i,TcpExtTCPDSACKIgnoredOld=336i,TcpExtTCPDSACKOfoRecv=0i,TcpExtTCPDSACKOfoSent=7i,TcpExtTCPDSACKOldSent=302073i,TcpExtTCPDSACKRecv=215884i,TcpExtTCPDSACKUndo=7633i,TcpExtTCPDeferAcceptDrop=0i,TcpExtTCPDirectCopyFromBacklog=0i,TcpExtTCPDirectCopyFromPrequeue=0i,TcpExtTCPFACKReorder=1320i,TcpExtTCPFastOpenActive=0i,TcpExtTCPFastOpenActiveFail=0i,TcpExtTCPFastOpenCookieReqd=0i,TcpExtTCPFastOpenListenOverflow=0i,TcpExtTCPFastOpenPassive=0i,TcpExtTCPFastOpenPassiveFail=0i,TcpExtTCPFastRetrans=350681i,TcpExtTCPForwardRetrans=142168i,TcpExtTCPFromZeroWindowAdv=4317i,TcpExtTCPFullUndo=29502i,TcpExtTCPHPAcks=10267073000i,TcpExtTCPHPHits=5629837098i,TcpExtTCPHPHitsToUser=0i,TcpExtTCPHystartDelayCwnd=285127i,TcpExtTCPHystartDelayDetect=12318i,TcpExtTCPHystartTrainCwnd=69160570i,TcpExtTCPHystartTrainDetect=3315799i,TcpExtTCPLossFailures=109i,TcpExtTCPLossProbeRecovery=110819i,TcpExtTCPLossProbes=233995i,TcpExtTCPLossUndo=5276i,TcpExtTCPLostRetransmit=397i,TcpExtTCPMD5NotFound=0i,TcpExtTCPMD5Unexpected=0i,TcpExtTCPMemoryPressures=0i,TcpExtTCPMinTTLDrop=0i,TcpExtTCPOFODrop=0i,TcpExtTCPOFOMerge=7i,TcpExtTCPOFOQueue=15196i,TcpExtTCPOrigDataSent=29055119435i,TcpExtTCPPartialUndo=21320i,TcpExtTCPPrequeueDropped=0i,TcpExtTCPPrequeued=0i,TcpExtTCPPureAcks=1236441827i,TcpExtTCPRcvCoalesce=225590473i,TcpExtTCPRcvCollapsed=0i,TcpExtTCPRenoFailures=0i,TcpExtTCPRenoRecovery=0i,TcpExtTCPRenoRecoveryFail=0i,TcpExtTCPRenoReorder=0i,TcpExtTCPReqQFullDoCookies=0i,TcpExtTCPReqQFullDrop=0i,TcpExtTCPRetransFail=41i,TcpExtTCPSACKDiscard=0i,TcpExtTCPSACKReneging=0i,TcpExtTCPSACKReorder=4307i,TcpExtTCPSYNChallenge=244i,TcpExtTCPSackFailures=1698i,TcpExtTCPSackMerged=184668i,TcpExtTCPSackRecovery=97369i,TcpExtTCPSackRecoveryFail=381i,TcpExtTCPSackShiftFallback=2697079i,TcpExtTCPSackShifted=760299i,TcpExtTCPSchedulerFailed=0i,TcpExtTCPSlowStartRetrans=9276i,TcpExtTCPSpuriousRTOs=959i,TcpExtTCPSpuriousRtxHostQueues=2973i,TcpExtTCPSynRetrans=200970i,TcpExtTCPTSReorder=15221i,TcpExtTCPTimeWaitOverflow=0i,TcpExtTCPTimeouts=70127i,TcpExtTCPToZeroWindowAdv=4317i,TcpExtTCPWantZeroWindowAdv=2133i,TcpExtTW=
24809813i,TcpExtTWKilled=0i,TcpExtTWRecycled=0i 1496460785000000000
nstat,bu=linux,cls=server,dc=cer,env=production,host=hostname,name=snmp,sr=database IcmpInAddrMaskReps=0i,IcmpInAddrMasks=90i,IcmpInCsumErrors=0i,IcmpInDestUnreachs=284401i,IcmpInEchoReps=9i,IcmpInEchos=1761912i,IcmpInErrors=407i,IcmpInMsgs=2047767i,IcmpInParmProbs=0i,IcmpInRedirects=0i,IcmpInSrcQuenchs=0i,IcmpInTimeExcds=46i,IcmpInTimestampReps=0i,IcmpInTimestamps=1309i,IcmpMsgInType0=9i,IcmpMsgInType11=46i,IcmpMsgInType13=1309i,IcmpMsgInType17=90i,IcmpMsgInType3=284401i,IcmpMsgInType8=1761912i,IcmpMsgOutType0=1761912i,IcmpMsgOutType14=1248i,IcmpMsgOutType3=108709i,IcmpMsgOutType8=9i,IcmpOutAddrMaskReps=0i,IcmpOutAddrMasks=0i,IcmpOutDestUnreachs=108709i,IcmpOutEchoReps=1761912i,IcmpOutEchos=9i,IcmpOutErrors=0i,IcmpOutMsgs=1871878i,IcmpOutParmProbs=0i,IcmpOutRedirects=0i,IcmpOutSrcQuenchs=0i,IcmpOutTimeExcds=0i,IcmpOutTimestampReps=1248i,IcmpOutTimestamps=0i,IpDefaultTTL=64i,IpForwDatagrams=0i,IpForwarding=2i,IpFragCreates=0i,IpFragFails=0i,IpFragOKs=0i,IpInAddrErrors=0i,IpInDelivers=17658795773i,IpInDiscards=0i,IpInHdrErrors=0i,IpInReceives=17659269339i,IpInUnknownProtos=0i,IpOutDiscards=236976i,IpOutNoRoutes=1009i,IpOutRequests=23466783734i,IpReasmFails=0i,IpReasmOKs=0i,IpReasmReqds=0i,IpReasmTimeout=0i,TcpActiveOpens=23308977i,TcpAttemptFails=3757543i,TcpCurrEstab=280i,TcpEstabResets=184792i,TcpInCsumErrors=0i,TcpInErrs=232i,TcpInSegs=17536573089i,TcpMaxConn=-1i,TcpOutRsts=4051451i,TcpOutSegs=29836254873i,TcpPassiveOpens=176546974i,TcpRetransSegs=878085i,TcpRtoAlgorithm=1i,TcpRtoMax=120000i,TcpRtoMin=200i,UdpInCsumErrors=0i,UdpInDatagrams=24441661i,UdpInErrors=0i,UdpLiteInCsumErrors=0i,UdpLiteInDatagrams=0i,UdpLiteInErrors=0i,UdpLiteNoPorts=0i,UdpLiteOutDatagrams=0i,UdpLiteRcvbufErrors=0i,UdpLiteSndbufErrors=0i,UdpNoPorts=17660i,UdpOutDatagrams=51807896i,UdpRcvbufErrors=0i,UdpSndbufErrors=236922i 1496460785000000000
`
metrics, err := Parse([]byte(lp))
require.NoError(t, err)
r := NewReader(metrics)
buf := make([]byte, 128)
_, err = r.Read(buf)
require.NoError(t, err)
metrics, err = Parse(buf)
require.NoError(t, err)
}

View File

@@ -3,6 +3,7 @@ package leofs
import ( import (
"bufio" "bufio"
"fmt" "fmt"
"log"
"net/url" "net/url"
"os/exec" "os/exec"
"strconv" "strconv"
@@ -18,7 +19,7 @@ import (
const oid = ".1.3.6.1.4.1.35450" const oid = ".1.3.6.1.4.1.35450"
// For Manager Master // For Manager Master
const defaultEndpoint = "127.0.0.1:4020" const defaultEndpoint = "udp://127.0.0.1:4020"
type ServerType int type ServerType int
@@ -135,9 +136,9 @@ var serverTypeMapping = map[string]ServerType{
} }
var sampleConfig = ` var sampleConfig = `
## An array of URI to gather stats about LeoFS. ## An array of URLs of the form:
## Specify an ip or hostname with port. ie 127.0.0.1:4020 ## "udp://" host [ ":" port]
servers = ["127.0.0.1:4021"] servers = ["udp://127.0.0.1:4020"]
` `
func (l *LeoFS) SampleConfig() string { func (l *LeoFS) SampleConfig() string {
@@ -154,17 +155,28 @@ func (l *LeoFS) Gather(acc telegraf.Accumulator) error {
return nil return nil
} }
var wg sync.WaitGroup var wg sync.WaitGroup
for _, endpoint := range l.Servers { for i, endpoint := range l.Servers {
_, err := url.Parse(endpoint) if !strings.HasPrefix(endpoint, "udp://") {
// Preserve backwards compatibility for hostnames without a
// scheme, broken in go 1.8. Remove in Telegraf 2.0
endpoint = "udp://" + endpoint
log.Printf("W! [inputs.mongodb] Using %q as connection URL; please update your configuration to use an URL", endpoint)
l.Servers[i] = endpoint
}
u, err := url.Parse(endpoint)
if err != nil { if err != nil {
acc.AddError(fmt.Errorf("Unable to parse the address:%s, err:%s", endpoint, err)) acc.AddError(fmt.Errorf("Unable to parse address %q: %s", endpoint, err))
continue continue
} }
port, err := retrieveTokenAfterColon(endpoint) if u.Host == "" {
if err != nil { acc.AddError(fmt.Errorf("Unable to parse address %q", endpoint))
acc.AddError(err)
continue continue
} }
port := u.Port()
if port == "" {
port = "4020"
}
st, ok := serverTypeMapping[port] st, ok := serverTypeMapping[port]
if !ok { if !ok {
st = ServerTypeStorage st = ServerTypeStorage
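
The Go 1.8 breakage mentioned in the commit message comes from url.Parse no longer yielding a usable Host for scheme-less "host:port" strings (depending on the input it either fails to parse or parses with an empty Host), so the plugin now prefixes "udp://" before parsing; the mongodb input below gets the same treatment with "mongodb://". A small hedged sketch of the difference (exact Parse behavior varies with the input and Go version):

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

func describe(endpoint string) {
	u, err := url.Parse(endpoint)
	if err != nil {
		fmt.Printf("%q parse error: %v\n", endpoint, err)
		return
	}
	fmt.Printf("%q host=%q port=%q\n", endpoint, u.Host, u.Port())
}

func main() {
	endpoint := "127.0.0.1:4021"

	// Scheme-less form: under Go 1.8 this no longer produces a usable Host.
	describe(endpoint)

	// The workaround in the patch: prepend the expected scheme before parsing.
	if !strings.HasPrefix(endpoint, "udp://") {
		endpoint = "udp://" + endpoint
	}
	describe(endpoint)
}
```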

View File

@@ -4,12 +4,12 @@
```toml ```toml
[[inputs.mongodb]] [[inputs.mongodb]]
## An array of URI to gather stats about. Specify an ip or hostname ## An array of URLs of the form:
## with optional port add password. ie, ## "mongodb://" [user ":" pass "@"] host [ ":" port]
## For example:
## mongodb://user:auth_key@10.10.3.30:27017, ## mongodb://user:auth_key@10.10.3.30:27017,
## mongodb://10.10.3.33:18832, ## mongodb://10.10.3.33:18832,
## 10.0.0.1:10000, etc. servers = ["mongodb://127.0.0.1:27017"]
servers = ["127.0.0.1:27017"]
gather_perdb_stats = false gather_perdb_stats = false
## Optional SSL Config ## Optional SSL Config
@@ -19,14 +19,7 @@
## Use SSL but skip chain & host verification ## Use SSL but skip chain & host verification
# insecure_skip_verify = false # insecure_skip_verify = false
``` ```
This connection uri may be different based on your environment and mongodb
For authenticated mongodb instances use `mongodb://` connection URI
```toml
[[inputs.mongodb]]
servers = ["mongodb://username:password@10.XX.XX.XX:27101/mydatabase?authSource=admin"]
```
This connection uri may be different based on your environment and mongodb
setup. If the user doesn't have the required privilege to execute serverStatus setup. If the user doesn't have the required privilege to execute serverStatus
command then you will get this error on telegraf command then you will get this error on telegraf

View File

@@ -4,8 +4,10 @@ import (
"crypto/tls" "crypto/tls"
"crypto/x509" "crypto/x509"
"fmt" "fmt"
"log"
"net" "net"
"net/url" "net/url"
"strings"
"sync" "sync"
"time" "time"
@@ -37,12 +39,12 @@ type Ssl struct {
} }
var sampleConfig = ` var sampleConfig = `
## An array of URI to gather stats about. Specify an ip or hostname ## An array of URLs of the form:
## with optional port add password. ie, ## "mongodb://" [user ":" pass "@"] host [ ":" port]
## For example:
## mongodb://user:auth_key@10.10.3.30:27017, ## mongodb://user:auth_key@10.10.3.30:27017,
## mongodb://10.10.3.33:18832, ## mongodb://10.10.3.33:18832,
## 10.0.0.1:10000, etc. servers = ["mongodb://127.0.0.1:27017"]
servers = ["127.0.0.1:27017"]
gather_perdb_stats = false gather_perdb_stats = false
## Optional SSL Config ## Optional SSL Config
@@ -61,7 +63,7 @@ func (*MongoDB) Description() string {
return "Read metrics from one or many MongoDB servers" return "Read metrics from one or many MongoDB servers"
} }
var localhost = &url.URL{Host: "127.0.0.1:27017"} var localhost = &url.URL{Host: "mongodb://127.0.0.1:27017"}
// Reads stats from all configured servers accumulates stats. // Reads stats from all configured servers accumulates stats.
// Returns one of the errors encountered while gather stats (if any). // Returns one of the errors encountered while gather stats (if any).
@@ -72,19 +74,25 @@ func (m *MongoDB) Gather(acc telegraf.Accumulator) error {
} }
var wg sync.WaitGroup var wg sync.WaitGroup
for _, serv := range m.Servers { for i, serv := range m.Servers {
if !strings.HasPrefix(serv, "mongodb://") {
// Preserve backwards compatibility for hostnames without a
// scheme, broken in go 1.8. Remove in Telegraf 2.0
serv = "mongodb://" + serv
log.Printf("W! [inputs.mongodb] Using %q as connection URL; please update your configuration to use an URL", serv)
m.Servers[i] = serv
}
u, err := url.Parse(serv) u, err := url.Parse(serv)
if err != nil { if err != nil {
acc.AddError(fmt.Errorf("Unable to parse to address '%s': %s", serv, err)) acc.AddError(fmt.Errorf("Unable to parse address %q: %s", serv, err))
continue continue
} else if u.Scheme == "" {
u.Scheme = "mongodb"
// fallback to simple string based address (i.e. "10.0.0.1:10000")
u.Host = serv
if u.Path == u.Host {
u.Path = ""
} }
if u.Host == "" {
acc.AddError(fmt.Errorf("Unable to parse address %q", serv))
continue
} }
wg.Add(1) wg.Add(1)
go func(srv *Server) { go func(srv *Server) {
defer wg.Done() defer wg.Done()

View File

@@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"fmt" "fmt"
"io" "io"
"log"
"net" "net"
"net/url" "net/url"
) )
@@ -82,11 +83,29 @@ func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
if err != io.EOF && err != nil { if err != io.EOF && err != nil {
return totaln, err return totaln, err
} }
if c.buffer[nR-1] == uint8('\n') {
nW, err := c.conn.Write(c.buffer[0:nR]) nW, err := c.conn.Write(c.buffer[0:nR])
totaln += nW totaln += nW
if err != nil { if err != nil {
return totaln, err return totaln, err
} }
} else {
log.Printf("E! Could not fit point into UDP payload; dropping")
// Scan forward until next line break to realign.
for {
nR, err := r.Read(c.buffer)
if nR == 0 {
break
}
if err != io.EOF && err != nil {
return totaln, err
}
if c.buffer[nR-1] == uint8('\n') {
break
}
}
}
} }
return totaln, nil return totaln, nil
} }
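
The effect of the added branch is that a buffer that does not end in a newline (meaning a single serialized point was larger than the UDP payload size) is dropped rather than sent truncated, and the reader is advanced to the next line boundary so later points stay aligned. A standalone sketch of that logic, assuming the reader behaves like Telegraf's metric.Reader (a Read ending in '\n' contains only whole metrics, while a Read that does not is a chunk of one oversized metric):

```go
package udpwrite

import (
	"io"
	"log"
)

// writeAligned is a sketch of the branch added above, not Telegraf's actual
// WriteStream. It drops any chunk that does not end in a newline and then
// realigns the reader on the next line boundary.
func writeAligned(conn io.Writer, r io.Reader, buf []byte) (int, error) {
	var total int
	for {
		n, err := r.Read(buf)
		if n == 0 {
			return total, nil
		}
		if err != nil && err != io.EOF {
			return total, err
		}
		if buf[n-1] == '\n' {
			nw, werr := conn.Write(buf[:n])
			total += nw
			if werr != nil {
				return total, werr
			}
		} else {
			log.Printf("E! Could not fit point into UDP payload; dropping")
			// Scan forward until the next line break to realign.
			for {
				m, rerr := r.Read(buf)
				if m == 0 || buf[m-1] == '\n' {
					break
				}
				if rerr != nil && rerr != io.EOF {
					return total, rerr
				}
			}
		}
		if err == io.EOF {
			return total, nil
		}
	}
}
```

The client4 case added to the test below shows the intended behavior: with PayloadSize 40, the metric with the very long field name is dropped while the following metric is still delivered intact.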

View File

@@ -1,7 +1,6 @@
package client package client
import ( import (
"bytes"
"net" "net"
"testing" "testing"
"time" "time"
@@ -10,6 +9,7 @@ import (
"github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/metric"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
) )
func TestUDPClient(t *testing.T) { func TestUDPClient(t *testing.T) {
@@ -72,63 +72,7 @@ func TestUDPClient_Write(t *testing.T) {
pkt := <-packets pkt := <-packets
assert.Equal(t, "cpu value=99\n", pkt) assert.Equal(t, "cpu value=99\n", pkt)
metrics := `cpu value=99
cpu value=55
cpu value=44
cpu value=101
cpu value=91
cpu value=92
`
// test sending packet with 6 metrics in a stream.
reader := bytes.NewReader([]byte(metrics))
// contentLength is ignored:
n, err = client.WriteStream(reader, 10)
assert.Equal(t, n, len(metrics))
assert.NoError(t, err)
pkt = <-packets
assert.Equal(t, "cpu value=99\ncpu value=55\ncpu value=44\ncpu value=101\ncpu value=91\ncpu value=92\n", pkt)
//
// Test that UDP packets get broken up properly:
config2 := UDPConfig{
URL: "udp://localhost:8199",
PayloadSize: 25,
}
client2, err := NewUDP(config2)
assert.NoError(t, err)
wp := WriteParams{} wp := WriteParams{}
//
// Using Write():
buf := []byte(metrics)
n, err = client2.WriteWithParams(buf, wp)
assert.Equal(t, n, len(metrics))
assert.NoError(t, err)
pkt = <-packets
assert.Equal(t, "cpu value=99\ncpu value=55", pkt)
pkt = <-packets
assert.Equal(t, "\ncpu value=44\ncpu value=1", pkt)
pkt = <-packets
assert.Equal(t, "01\ncpu value=91\ncpu value", pkt)
pkt = <-packets
assert.Equal(t, "=92\n", pkt)
//
// Using WriteStream():
reader = bytes.NewReader([]byte(metrics))
n, err = client2.WriteStreamWithParams(reader, 10, wp)
assert.Equal(t, n, len(metrics))
assert.NoError(t, err)
pkt = <-packets
assert.Equal(t, "cpu value=99\ncpu value=55", pkt)
pkt = <-packets
assert.Equal(t, "\ncpu value=44\ncpu value=1", pkt)
pkt = <-packets
assert.Equal(t, "01\ncpu value=91\ncpu value", pkt)
pkt = <-packets
assert.Equal(t, "=92\n", pkt)
// //
// Using WriteStream() & a metric.Reader: // Using WriteStream() & a metric.Reader:
config3 := UDPConfig{ config3 := UDPConfig{
@@ -159,4 +103,27 @@ cpu value=92
assert.Equal(t, "test value=1.1 1484142942000000000\n", pkt) assert.Equal(t, "test value=1.1 1484142942000000000\n", pkt)
assert.NoError(t, client.Close()) assert.NoError(t, client.Close())
config = UDPConfig{
URL: "udp://localhost:8199",
PayloadSize: 40,
}
client4, err := NewUDP(config)
assert.NoError(t, err)
ts := time.Unix(1484142943, 0)
m1, _ = metric.New("test", map[string]string{},
map[string]interface{}{"this_is_a_very_long_field_name": 1.1}, ts)
m2, _ = metric.New("test", map[string]string{},
map[string]interface{}{"value": 1.1}, ts)
ms = []telegraf.Metric{m1, m2}
reader := metric.NewReader(ms)
n, err = client4.WriteStream(reader, 0)
assert.NoError(t, err)
require.Equal(t, 35, n)
assert.NoError(t, err)
pkt = <-packets
assert.Equal(t, "test value=1.1 1484142943000000000\n", pkt)
assert.NoError(t, client4.Close())
} }

View File

@@ -47,7 +47,6 @@ type InfluxDB struct {
Precision string Precision string
clients []client.Client clients []client.Client
splitPayload bool
} }
var sampleConfig = ` var sampleConfig = `
@@ -115,7 +114,6 @@ func (i *InfluxDB) Connect() error {
return fmt.Errorf("Error creating UDP Client [%s]: %s", u, err) return fmt.Errorf("Error creating UDP Client [%s]: %s", u, err)
} }
i.clients = append(i.clients, c) i.clients = append(i.clients, c)
i.splitPayload = true
default: default:
// If URL doesn't start with "udp", assume HTTP client // If URL doesn't start with "udp", assume HTTP client
config := client.HTTPConfig{ config := client.HTTPConfig{
@@ -166,22 +164,9 @@ func (i *InfluxDB) Description() string {
return "Configuration for influxdb server to send metrics to" return "Configuration for influxdb server to send metrics to"
} }
func (i *InfluxDB) split(metrics []telegraf.Metric) []telegraf.Metric {
if !i.splitPayload {
return metrics
}
split := make([]telegraf.Metric, 0)
for _, m := range metrics {
split = append(split, m.Split(i.UDPPayload)...)
}
return split
}
// Write will choose a random server in the cluster to write to until a successful write // Write will choose a random server in the cluster to write to until a successful write
// occurs, logging each unsuccessful. If all servers fail, return error. // occurs, logging each unsuccessful. If all servers fail, return error.
func (i *InfluxDB) Write(metrics []telegraf.Metric) error { func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
metrics = i.split(metrics)
bufsize := 0 bufsize := 0
for _, m := range metrics { for _, m := range metrics {

View File

@@ -6,10 +6,7 @@ import (
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"testing" "testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/outputs/influxdb/client" "github.com/influxdata/telegraf/plugins/outputs/influxdb/client"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
@@ -63,35 +60,6 @@ func TestUDPInflux(t *testing.T) {
require.NoError(t, i.Close()) require.NoError(t, i.Close())
} }
func TestBasicSplit(t *testing.T) {
c := &MockClient{}
i := InfluxDB{
clients: []client.Client{c},
UDPPayload: 50,
splitPayload: true,
}
// Input metrics:
// test1,tag1=value1 value1=1 value2=2 1257894000000000000\n
//
// Split metrics:
// test1,tag1=value1 value1=1 1257894000000000000\n
// test1,tag1=value1 value2=2 1257894000000000000\n
m, err := metric.New("test1",
map[string]string{"tag1": "value1"},
map[string]interface{}{"value1": 1.0, "value2": 2.0},
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
)
require.NoError(t, err)
metrics := []telegraf.Metric{m}
err = i.Write(metrics)
require.Equal(t, 1, c.writeStreamCalled)
require.Equal(t, 94, c.contentLength)
require.NoError(t, err)
}
func TestHTTPInflux(t *testing.T) { func TestHTTPInflux(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path { switch r.URL.Path {

View File

@@ -6,6 +6,8 @@ import (
"log" "log"
"net/http" "net/http"
"regexp" "regexp"
"sort"
"strings"
"sync" "sync"
"time" "time"
@@ -17,19 +19,40 @@ import (
var invalidNameCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`) var invalidNameCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)
type MetricWithExpiration struct { // SampleID uniquely identifies a Sample
Metric prometheus.Metric type SampleID string
// Sample represents the current value of a series.
type Sample struct {
// Labels are the Prometheus labels.
Labels map[string]string
// Value is the value in the Prometheus output.
Value float64
// Expiration is the deadline that this Sample is valid until.
Expiration time.Time Expiration time.Time
} }
// MetricFamily contains the data required to build valid prometheus Metrics.
type MetricFamily struct {
// Samples are the Sample belonging to this MetricFamily.
Samples map[SampleID]*Sample
// Type of the Value.
ValueType prometheus.ValueType
// LabelSet is the label counts for all Samples.
LabelSet map[string]int
}
type PrometheusClient struct { type PrometheusClient struct {
Listen string Listen string
ExpirationInterval internal.Duration `toml:"expiration_interval"` ExpirationInterval internal.Duration `toml:"expiration_interval"`
server *http.Server server *http.Server
metrics map[string]*MetricWithExpiration
sync.Mutex sync.Mutex
// fam is the non-expired MetricFamily by Prometheus metric name.
fam map[string]*MetricFamily
// now returns the current time.
now func() time.Time
} }
var sampleConfig = ` var sampleConfig = `
@@ -41,7 +64,6 @@ var sampleConfig = `
` `
func (p *PrometheusClient) Start() error { func (p *PrometheusClient) Start() error {
p.metrics = make(map[string]*MetricWithExpiration)
prometheus.Register(p) prometheus.Register(p)
if p.Listen == "" { if p.Listen == "" {
@@ -88,96 +110,153 @@ func (p *PrometheusClient) Describe(ch chan<- *prometheus.Desc) {
prometheus.NewGauge(prometheus.GaugeOpts{Name: "Dummy", Help: "Dummy"}).Describe(ch) prometheus.NewGauge(prometheus.GaugeOpts{Name: "Dummy", Help: "Dummy"}).Describe(ch)
} }
// Implements prometheus.Collector // Expire removes Samples that have expired.
func (p *PrometheusClient) Expire() {
now := p.now()
for name, family := range p.fam {
for key, sample := range family.Samples {
if p.ExpirationInterval.Duration != 0 && now.After(sample.Expiration) {
for k, _ := range sample.Labels {
family.LabelSet[k]--
}
delete(family.Samples, key)
if len(family.Samples) == 0 {
delete(p.fam, name)
}
}
}
}
}
// Collect implements prometheus.Collector
func (p *PrometheusClient) Collect(ch chan<- prometheus.Metric) { func (p *PrometheusClient) Collect(ch chan<- prometheus.Metric) {
p.Lock() p.Lock()
defer p.Unlock() defer p.Unlock()
for key, m := range p.metrics { p.Expire()
if p.ExpirationInterval.Duration != 0 && time.Now().After(m.Expiration) {
delete(p.metrics, key) for name, family := range p.fam {
} else { // Get list of all labels on MetricFamily
ch <- m.Metric var labelNames []string
for k, v := range family.LabelSet {
if v > 0 {
labelNames = append(labelNames, k)
} }
} }
desc := prometheus.NewDesc(name, "Telegraf collected metric", labelNames, nil)
for _, sample := range family.Samples {
// Get labels for this sample; unset labels will be set to the
// empty string
var labels []string
for _, label := range labelNames {
v := sample.Labels[label]
labels = append(labels, v)
}
metric, err := prometheus.NewConstMetric(desc, family.ValueType, sample.Value, labels...)
if err != nil {
log.Printf("E! Error creating prometheus metric, "+
"key: %s, labels: %v,\nerr: %s\n",
name, labels, err.Error())
}
ch <- metric
}
}
}
func sanitize(value string) string {
return invalidNameCharRE.ReplaceAllString(value, "_")
}
func valueType(tt telegraf.ValueType) prometheus.ValueType {
switch tt {
case telegraf.Counter:
return prometheus.CounterValue
case telegraf.Gauge:
return prometheus.GaugeValue
default:
return prometheus.UntypedValue
}
}
// CreateSampleID creates a SampleID based on the tags of a telegraf.Metric.
func CreateSampleID(tags map[string]string) SampleID {
pairs := make([]string, 0, len(tags))
for k, v := range tags {
pairs = append(pairs, fmt.Sprintf("%s=%s", k, v))
}
sort.Strings(pairs)
return SampleID(strings.Join(pairs, ","))
} }
func (p *PrometheusClient) Write(metrics []telegraf.Metric) error { func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
p.Lock() p.Lock()
defer p.Unlock() defer p.Unlock()
if len(metrics) == 0 { now := p.now()
return nil
}
for _, point := range metrics { for _, point := range metrics {
key := point.Name() tags := point.Tags()
key = invalidNameCharRE.ReplaceAllString(key, "_") vt := valueType(point.Type())
sampleID := CreateSampleID(tags)
// convert tags into prometheus labels labels := make(map[string]string)
var labels []string for k, v := range tags {
l := prometheus.Labels{} labels[sanitize(k)] = sanitize(v)
for k, v := range point.Tags() {
k = invalidNameCharRE.ReplaceAllString(k, "_")
if len(k) == 0 {
continue
}
labels = append(labels, k)
l[k] = v
} }
// Get a type if it's available, defaulting to Untyped for fn, fv := range point.Fields() {
var mType prometheus.ValueType
switch point.Type() {
case telegraf.Counter:
mType = prometheus.CounterValue
case telegraf.Gauge:
mType = prometheus.GaugeValue
default:
mType = prometheus.UntypedValue
}
for n, val := range point.Fields() {
// Ignore string and bool fields. // Ignore string and bool fields.
switch val.(type) { var value float64
case string: switch fv := fv.(type) {
continue
case bool:
continue
}
// sanitize the measurement name
n = invalidNameCharRE.ReplaceAllString(n, "_")
var mname string
if n == "value" {
mname = key
} else {
mname = fmt.Sprintf("%s_%s", key, n)
}
desc := prometheus.NewDesc(mname, "Telegraf collected metric", nil, l)
var metric prometheus.Metric
var err error
// switch for field type
switch val := val.(type) {
case int64: case int64:
metric, err = prometheus.NewConstMetric(desc, mType, float64(val)) value = float64(fv)
case float64: case float64:
metric, err = prometheus.NewConstMetric(desc, mType, val) value = fv
default: default:
continue continue
} }
if err != nil {
log.Printf("E! Error creating prometheus metric, "+ sample := &Sample{
"key: %s, labels: %v,\nerr: %s\n", Labels: labels,
mname, l, err.Error()) Value: value,
Expiration: now.Add(p.ExpirationInterval.Duration),
} }
p.metrics[desc.String()] = &MetricWithExpiration{ // Special handling of value field; supports passthrough from
Metric: metric, // the prometheus input.
Expiration: time.Now().Add(p.ExpirationInterval.Duration), var mname string
if fn == "value" {
mname = sanitize(point.Name())
} else {
mname = sanitize(fmt.Sprintf("%s_%s", point.Name(), fn))
} }
var fam *MetricFamily
var ok bool
if fam, ok = p.fam[mname]; !ok {
fam = &MetricFamily{
Samples: make(map[SampleID]*Sample),
ValueType: vt,
LabelSet: make(map[string]int),
}
p.fam[mname] = fam
} else {
if fam.ValueType != vt {
// Don't return an error since this would be a permanent error
log.Printf("Mixed ValueType for measurement %q; dropping point", point.Name())
break
}
}
for k, _ := range sample.Labels {
fam.LabelSet[k]++
}
fam.Samples[sampleID] = sample
} }
} }
return nil return nil
@@ -187,6 +266,8 @@ func init() {
outputs.Add("prometheus_client", func() telegraf.Output { outputs.Add("prometheus_client", func() telegraf.Output {
return &PrometheusClient{ return &PrometheusClient{
ExpirationInterval: internal.Duration{Duration: time.Second * 60}, ExpirationInterval: internal.Duration{Duration: time.Second * 60},
fam: make(map[string]*MetricFamily),
now: time.Now,
} }
}) })
} }
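
The rewritten output groups samples by Prometheus metric name into MetricFamily values, keyed by a canonical SampleID built from the sorted tag pairs, and tracks a per-family LabelSet so every sample in a family is exported with the same label dimensions; tags missing from a given sample are emitted as empty strings. That is what resolves the inconsistent label dimensions called out in the changelog. A small illustrative sketch of the idea (not the plugin code itself):

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// sampleID mirrors CreateSampleID above: an order-independent key built
// from a metric's tags.
func sampleID(tags map[string]string) string {
	pairs := make([]string, 0, len(tags))
	for k, v := range tags {
		pairs = append(pairs, fmt.Sprintf("%s=%s", k, v))
	}
	sort.Strings(pairs)
	return strings.Join(pairs, ",")
}

func main() {
	// Two samples of the same measurement; only one carries the "host" tag.
	tagSets := []map[string]string{
		{},
		{"host": "localhost"},
	}

	// The union of label names across the family, as LabelSet tracks.
	labelNames := []string{"host"}

	// Every sample is exported with the full label set; unset labels become
	// empty strings, so all samples share the same dimensions.
	for _, tags := range tagSets {
		values := make([]string, 0, len(labelNames))
		for _, name := range labelNames {
			values = append(values, tags[name]) // "" when the tag is missing
		}
		fmt.Printf("sample %q -> label values %q\n", sampleID(tags), values)
	}
}
```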

View File

@@ -4,16 +4,314 @@ import (
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/inputs/prometheus" prometheus_input "github.com/influxdata/telegraf/plugins/inputs/prometheus"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
) )
func setUnixTime(client *PrometheusClient, sec int64) {
client.now = func() time.Time {
return time.Unix(sec, 0)
}
}
// NewClient initializes a PrometheusClient.
func NewClient() *PrometheusClient {
return &PrometheusClient{
ExpirationInterval: internal.Duration{Duration: time.Second * 60},
fam: make(map[string]*MetricFamily),
now: time.Now,
}
}
func TestWrite_Basic(t *testing.T) {
now := time.Now()
pt1, err := metric.New(
"foo",
make(map[string]string),
map[string]interface{}{"value": 0.0},
now)
var metrics = []telegraf.Metric{
pt1,
}
client := NewClient()
err = client.Write(metrics)
require.NoError(t, err)
fam, ok := client.fam["foo"]
require.True(t, ok)
require.Equal(t, prometheus.UntypedValue, fam.ValueType)
require.Equal(t, map[string]int{}, fam.LabelSet)
sample, ok := fam.Samples[CreateSampleID(pt1.Tags())]
require.True(t, ok)
require.Equal(t, 0.0, sample.Value)
require.True(t, now.Before(sample.Expiration))
}
func TestWrite_IntField(t *testing.T) {
client := NewClient()
p1, err := metric.New(
"foo",
make(map[string]string),
map[string]interface{}{"value": 42},
time.Now())
err = client.Write([]telegraf.Metric{p1})
require.NoError(t, err)
fam, ok := client.fam["foo"]
require.True(t, ok)
for _, v := range fam.Samples {
require.Equal(t, 42.0, v.Value)
}
}
func TestWrite_FieldNotValue(t *testing.T) {
client := NewClient()
p1, err := metric.New(
"foo",
make(map[string]string),
map[string]interface{}{"howdy": 0.0},
time.Now())
err = client.Write([]telegraf.Metric{p1})
require.NoError(t, err)
fam, ok := client.fam["foo_howdy"]
require.True(t, ok)
for _, v := range fam.Samples {
require.Equal(t, 0.0, v.Value)
}
}
func TestWrite_SkipNonNumberField(t *testing.T) {
client := NewClient()
p1, err := metric.New(
"foo",
make(map[string]string),
map[string]interface{}{"value": "howdy"},
time.Now())
err = client.Write([]telegraf.Metric{p1})
require.NoError(t, err)
_, ok := client.fam["foo"]
require.False(t, ok)
}
func TestWrite_Counter(t *testing.T) {
client := NewClient()
p1, err := metric.New(
"foo",
make(map[string]string),
map[string]interface{}{"value": 42},
time.Now(),
telegraf.Counter)
err = client.Write([]telegraf.Metric{p1})
require.NoError(t, err)
fam, ok := client.fam["foo"]
require.True(t, ok)
require.Equal(t, prometheus.CounterValue, fam.ValueType)
}
func TestWrite_Sanitize(t *testing.T) {
client := NewClient()
p1, err := metric.New(
"foo.bar",
map[string]string{"tag-with-dash": "localhost.local"},
map[string]interface{}{"field-with-dash": 42},
time.Now(),
telegraf.Counter)
err = client.Write([]telegraf.Metric{p1})
require.NoError(t, err)
fam, ok := client.fam["foo_bar_field_with_dash"]
require.True(t, ok)
require.Equal(t, map[string]int{"tag_with_dash": 1}, fam.LabelSet)
sample1, ok := fam.Samples[CreateSampleID(p1.Tags())]
require.True(t, ok)
require.Equal(t, map[string]string{
"tag_with_dash": "localhost_local"}, sample1.Labels)
}
func TestWrite_Gauge(t *testing.T) {
client := NewClient()
p1, err := metric.New(
"foo",
make(map[string]string),
map[string]interface{}{"value": 42},
time.Now(),
telegraf.Gauge)
err = client.Write([]telegraf.Metric{p1})
require.NoError(t, err)
fam, ok := client.fam["foo"]
require.True(t, ok)
require.Equal(t, prometheus.GaugeValue, fam.ValueType)
}
func TestWrite_MixedValueType(t *testing.T) {
now := time.Now()
p1, err := metric.New(
"foo",
make(map[string]string),
map[string]interface{}{"value": 1.0},
now,
telegraf.Counter)
p2, err := metric.New(
"foo",
make(map[string]string),
map[string]interface{}{"value": 2.0},
now,
telegraf.Gauge)
var metrics = []telegraf.Metric{p1, p2}
client := NewClient()
err = client.Write(metrics)
require.NoError(t, err)
fam, ok := client.fam["foo"]
require.True(t, ok)
require.Equal(t, 1, len(fam.Samples))
}
func TestWrite_Tags(t *testing.T) {
now := time.Now()
p1, err := metric.New(
"foo",
make(map[string]string),
map[string]interface{}{"value": 1.0},
now)
p2, err := metric.New(
"foo",
map[string]string{"host": "localhost"},
map[string]interface{}{"value": 2.0},
now)
var metrics = []telegraf.Metric{p1, p2}
client := NewClient()
err = client.Write(metrics)
require.NoError(t, err)
fam, ok := client.fam["foo"]
require.True(t, ok)
require.Equal(t, prometheus.UntypedValue, fam.ValueType)
require.Equal(t, map[string]int{"host": 1}, fam.LabelSet)
sample1, ok := fam.Samples[CreateSampleID(p1.Tags())]
require.True(t, ok)
require.Equal(t, 1.0, sample1.Value)
require.True(t, now.Before(sample1.Expiration))
sample2, ok := fam.Samples[CreateSampleID(p2.Tags())]
require.True(t, ok)
require.Equal(t, 2.0, sample2.Value)
require.True(t, now.Before(sample2.Expiration))
}
func TestExpire(t *testing.T) {
client := NewClient()
p1, err := metric.New(
"foo",
make(map[string]string),
map[string]interface{}{"value": 1.0},
time.Now())
setUnixTime(client, 0)
err = client.Write([]telegraf.Metric{p1})
require.NoError(t, err)
p2, err := metric.New(
"bar",
make(map[string]string),
map[string]interface{}{"value": 2.0},
time.Now())
setUnixTime(client, 1)
err = client.Write([]telegraf.Metric{p2})
setUnixTime(client, 61)
require.Equal(t, 2, len(client.fam))
client.Expire()
require.Equal(t, 1, len(client.fam))
}
func TestExpire_TagsNoDecrement(t *testing.T) {
client := NewClient()
p1, err := metric.New(
"foo",
make(map[string]string),
map[string]interface{}{"value": 1.0},
time.Now())
setUnixTime(client, 0)
err = client.Write([]telegraf.Metric{p1})
require.NoError(t, err)
p2, err := metric.New(
"foo",
map[string]string{"host": "localhost"},
map[string]interface{}{"value": 2.0},
time.Now())
setUnixTime(client, 1)
err = client.Write([]telegraf.Metric{p2})
setUnixTime(client, 61)
fam, ok := client.fam["foo"]
require.True(t, ok)
require.Equal(t, 2, len(fam.Samples))
client.Expire()
require.Equal(t, 1, len(fam.Samples))
require.Equal(t, map[string]int{"host": 1}, fam.LabelSet)
}
func TestExpire_TagsWithDecrement(t *testing.T) {
client := NewClient()
p1, err := metric.New(
"foo",
map[string]string{"host": "localhost"},
map[string]interface{}{"value": 1.0},
time.Now())
setUnixTime(client, 0)
err = client.Write([]telegraf.Metric{p1})
require.NoError(t, err)
p2, err := metric.New(
"foo",
make(map[string]string),
map[string]interface{}{"value": 2.0},
time.Now())
setUnixTime(client, 1)
err = client.Write([]telegraf.Metric{p2})
setUnixTime(client, 61)
fam, ok := client.fam["foo"]
require.True(t, ok)
require.Equal(t, 2, len(fam.Samples))
client.Expire()
require.Equal(t, 1, len(fam.Samples))
require.Equal(t, map[string]int{"host": 0}, fam.LabelSet)
}
var pTesting *PrometheusClient var pTesting *PrometheusClient
func TestPrometheusWritePointEmptyTag(t *testing.T) { func TestPrometheusWritePointEmptyTag(t *testing.T) {
@@ -93,74 +391,21 @@ func TestPrometheusWritePointEmptyTag(t *testing.T) {
} }
} }
func TestPrometheusExpireOldMetrics(t *testing.T) { func setupPrometheus() (*PrometheusClient, *prometheus_input.Prometheus, error) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
pClient, p, err := setupPrometheus()
pClient.ExpirationInterval = internal.Duration{Duration: time.Second * 10}
require.NoError(t, err)
defer pClient.Stop()
now := time.Now()
tags := make(map[string]string)
pt1, _ := metric.New(
"test_point_1",
tags,
map[string]interface{}{"value": 0.0},
now)
var metrics = []telegraf.Metric{pt1}
require.NoError(t, pClient.Write(metrics))
for _, m := range pClient.metrics {
m.Expiration = now.Add(time.Duration(-15) * time.Second)
}
pt2, _ := metric.New(
"test_point_2",
tags,
map[string]interface{}{"value": 1.0},
now)
var metrics2 = []telegraf.Metric{pt2}
require.NoError(t, pClient.Write(metrics2))
expected := []struct {
name string
value float64
tags map[string]string
}{
{"test_point_2", 1.0, tags},
}
var acc testutil.Accumulator
require.NoError(t, p.Gather(&acc))
for _, e := range expected {
acc.AssertContainsFields(t, e.name,
map[string]interface{}{"value": e.value})
}
acc.AssertDoesNotContainMeasurement(t, "test_point_1")
// Confirm that it's not in the PrometheusClient map anymore
assert.Equal(t, 1, len(pClient.metrics))
}
func setupPrometheus() (*PrometheusClient, *prometheus.Prometheus, error) {
if pTesting == nil { if pTesting == nil {
pTesting = &PrometheusClient{Listen: "localhost:9127"} pTesting = NewClient()
pTesting.Listen = "localhost:9127"
err := pTesting.Start() err := pTesting.Start()
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
} else { } else {
pTesting.metrics = make(map[string]*MetricWithExpiration) pTesting.fam = make(map[string]*MetricFamily)
} }
time.Sleep(time.Millisecond * 200) time.Sleep(time.Millisecond * 200)
p := &prometheus.Prometheus{ p := &prometheus_input.Prometheus{
Urls: []string{"http://localhost:9127/metrics"}, Urls: []string{"http://localhost:9127/metrics"},
} }