Compare commits
10 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0f419e9a0d | ||
|
|
958d689274 | ||
|
|
2370c04dd7 | ||
|
|
4978c8ccb9 | ||
|
|
abe5e28575 | ||
|
|
34da5a6e45 | ||
|
|
4de1bf29cc | ||
|
|
f23845afef | ||
|
|
721ed8b3f2 | ||
|
|
385c114d00 |
10
CHANGELOG.md
10
CHANGELOG.md
@@ -1,4 +1,12 @@
|
||||
## v1.3.1 [unreleased]
|
||||
## v1.3.2 [2017-06-14]
|
||||
|
||||
### Bugfixes
|
||||
|
||||
- [#2862](https://github.com/influxdata/telegraf/issues/2862): Fix InfluxDB UDP metric splitting.
|
||||
- [#2888](https://github.com/influxdata/telegraf/issues/2888): Fix mongodb/leofs urls without scheme.
|
||||
- [#2822](https://github.com/influxdata/telegraf/issues/2822): Fix inconsistent label dimensions in prometheus output.
|
||||
|
||||
## v1.3.1 [2017-05-31]
|
||||
|
||||
### Bugfixes
|
||||
|
||||
|
||||
@@ -218,7 +218,7 @@ func (m *metric) SerializeTo(dst []byte) int {
|
||||
}
|
||||
|
||||
func (m *metric) Split(maxSize int) []telegraf.Metric {
|
||||
if m.Len() < maxSize {
|
||||
if m.Len() <= maxSize {
|
||||
return []telegraf.Metric{m}
|
||||
}
|
||||
var out []telegraf.Metric
|
||||
@@ -248,7 +248,7 @@ func (m *metric) Split(maxSize int) []telegraf.Metric {
|
||||
|
||||
// if true, then we need to create a metric _not_ including the currently
|
||||
// selected field
|
||||
if len(m.fields[i:j])+len(fields)+constant > maxSize {
|
||||
if len(m.fields[i:j])+len(fields)+constant >= maxSize {
|
||||
// if false, then we'll create a metric including the currently
|
||||
// selected field anyways. This means that the given maxSize is too
|
||||
// small for a single field to fit.
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"github.com/influxdata/telegraf"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestNewMetric(t *testing.T) {
|
||||
@@ -458,7 +459,7 @@ func TestSplitMetric(t *testing.T) {
|
||||
assert.Len(t, split70, 3)
|
||||
|
||||
split60 := m.Split(60)
|
||||
assert.Len(t, split60, 4)
|
||||
assert.Len(t, split60, 5)
|
||||
}
|
||||
|
||||
// test splitting metric into various max lengths
|
||||
@@ -578,6 +579,42 @@ func TestSplitMetric_OneField(t *testing.T) {
|
||||
assert.Equal(t, "cpu,host=localhost float=100001 1480940990034083306\n", split[0].String())
|
||||
}
|
||||
|
||||
func TestSplitMetric_ExactSize(t *testing.T) {
|
||||
now := time.Unix(0, 1480940990034083306)
|
||||
tags := map[string]string{
|
||||
"host": "localhost",
|
||||
}
|
||||
fields := map[string]interface{}{
|
||||
"float": float64(100001),
|
||||
"int": int64(100001),
|
||||
"bool": true,
|
||||
"false": false,
|
||||
"string": "test",
|
||||
}
|
||||
m, err := New("cpu", tags, fields, now)
|
||||
assert.NoError(t, err)
|
||||
actual := m.Split(m.Len())
|
||||
// check that no copy was made
|
||||
require.Equal(t, &m, &actual[0])
|
||||
}
|
||||
|
||||
func TestSplitMetric_NoRoomForNewline(t *testing.T) {
|
||||
now := time.Unix(0, 1480940990034083306)
|
||||
tags := map[string]string{
|
||||
"host": "localhost",
|
||||
}
|
||||
fields := map[string]interface{}{
|
||||
"float": float64(100001),
|
||||
"int": int64(100001),
|
||||
"bool": true,
|
||||
"false": false,
|
||||
}
|
||||
m, err := New("cpu", tags, fields, now)
|
||||
assert.NoError(t, err)
|
||||
actual := m.Split(m.Len() - 1)
|
||||
require.Equal(t, 2, len(actual))
|
||||
}
|
||||
|
||||
func TestNewMetricAggregate(t *testing.T) {
|
||||
now := time.Now()
|
||||
|
||||
|
||||
@@ -57,7 +57,7 @@ func (r *reader) Read(p []byte) (n int, err error) {
|
||||
// this for-loop is the sunny-day scenario, where we are given a
|
||||
// buffer that is large enough to hold at least a single metric.
|
||||
// all of the cases below it are edge-cases.
|
||||
if r.metrics[r.iM].Len() < len(p[i:]) {
|
||||
if r.metrics[r.iM].Len() <= len(p[i:]) {
|
||||
i += r.metrics[r.iM].SerializeTo(p[i:])
|
||||
} else {
|
||||
break
|
||||
@@ -76,7 +76,7 @@ func (r *reader) Read(p []byte) (n int, err error) {
|
||||
if len(tmp) > 1 {
|
||||
r.splitMetrics = tmp
|
||||
r.state = split
|
||||
if r.splitMetrics[0].Len() < len(p) {
|
||||
if r.splitMetrics[0].Len() <= len(p) {
|
||||
i += r.splitMetrics[0].SerializeTo(p)
|
||||
r.iSM = 1
|
||||
} else {
|
||||
@@ -99,7 +99,7 @@ func (r *reader) Read(p []byte) (n int, err error) {
|
||||
}
|
||||
|
||||
case split:
|
||||
if r.splitMetrics[r.iSM].Len() < len(p) {
|
||||
if r.splitMetrics[r.iSM].Len() <= len(p) {
|
||||
// write the current split metric
|
||||
i += r.splitMetrics[r.iSM].SerializeTo(p)
|
||||
r.iSM++
|
||||
@@ -131,6 +131,10 @@ func (r *reader) Read(p []byte) (n int, err error) {
|
||||
r.iSM++
|
||||
if r.iSM == len(r.splitMetrics) {
|
||||
r.iM++
|
||||
if r.iM == len(r.metrics) {
|
||||
r.state = done
|
||||
return i, io.EOF
|
||||
}
|
||||
r.state = normal
|
||||
} else {
|
||||
r.state = split
|
||||
|
||||
@@ -8,8 +8,8 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func BenchmarkMetricReader(b *testing.B) {
|
||||
@@ -116,6 +116,140 @@ func TestMetricReader_OverflowMetric(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// Regression test for when a metric is the same size as the buffer.
|
||||
//
|
||||
// Previously EOF would not be set until the next call to Read.
|
||||
func TestMetricReader_MetricSizeEqualsBufferSize(t *testing.T) {
|
||||
ts := time.Unix(1481032190, 0)
|
||||
m1, _ := New("foo", map[string]string{},
|
||||
map[string]interface{}{"a": int64(1)}, ts)
|
||||
metrics := []telegraf.Metric{m1}
|
||||
|
||||
r := NewReader(metrics)
|
||||
buf := make([]byte, m1.Len())
|
||||
|
||||
for {
|
||||
n, err := r.Read(buf)
|
||||
// Should never read 0 bytes unless at EOF, unless input buffer is 0 length
|
||||
if n == 0 {
|
||||
require.Equal(t, io.EOF, err)
|
||||
break
|
||||
}
|
||||
// Lines should be terminated with a LF
|
||||
if err == io.EOF {
|
||||
require.Equal(t, uint8('\n'), buf[n-1])
|
||||
break
|
||||
}
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Regression test for when a metric requires to be split and one of the
|
||||
// split metrics is exactly the size of the buffer.
|
||||
//
|
||||
// Previously an empty string would be returned on the next Read without error,
|
||||
// and then next Read call would panic.
|
||||
func TestMetricReader_SplitWithExactLengthSplit(t *testing.T) {
|
||||
ts := time.Unix(1481032190, 0)
|
||||
m1, _ := New("foo", map[string]string{},
|
||||
map[string]interface{}{"a": int64(1), "bb": int64(2)}, ts)
|
||||
metrics := []telegraf.Metric{m1}
|
||||
|
||||
r := NewReader(metrics)
|
||||
buf := make([]byte, 30)
|
||||
|
||||
// foo a=1i,bb=2i 1481032190000000000\n // len 35
|
||||
//
|
||||
// Requires this specific split order:
|
||||
// foo a=1i 1481032190000000000\n // len 29
|
||||
// foo bb=2i 1481032190000000000\n // len 30
|
||||
|
||||
for {
|
||||
n, err := r.Read(buf)
|
||||
// Should never read 0 bytes unless at EOF, unless input buffer is 0 length
|
||||
if n == 0 {
|
||||
require.Equal(t, io.EOF, err)
|
||||
break
|
||||
}
|
||||
// Lines should be terminated with a LF
|
||||
if err == io.EOF {
|
||||
require.Equal(t, uint8('\n'), buf[n-1])
|
||||
break
|
||||
}
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Regresssion test for when a metric requires to be split and one of the
|
||||
// split metrics is larger than the buffer.
|
||||
//
|
||||
// Previously the metric index would be set incorrectly causing a panic.
|
||||
func TestMetricReader_SplitOverflowOversized(t *testing.T) {
|
||||
ts := time.Unix(1481032190, 0)
|
||||
m1, _ := New("foo", map[string]string{},
|
||||
map[string]interface{}{
|
||||
"a": int64(1),
|
||||
"bbb": int64(2),
|
||||
}, ts)
|
||||
metrics := []telegraf.Metric{m1}
|
||||
|
||||
r := NewReader(metrics)
|
||||
buf := make([]byte, 30)
|
||||
|
||||
// foo a=1i,bbb=2i 1481032190000000000\n // len 36
|
||||
//
|
||||
// foo a=1i 1481032190000000000\n // len 29
|
||||
// foo bbb=2i 1481032190000000000\n // len 31
|
||||
|
||||
for {
|
||||
n, err := r.Read(buf)
|
||||
// Should never read 0 bytes unless at EOF, unless input buffer is 0 length
|
||||
if n == 0 {
|
||||
require.Equal(t, io.EOF, err)
|
||||
break
|
||||
}
|
||||
// Lines should be terminated with a LF
|
||||
if err == io.EOF {
|
||||
require.Equal(t, uint8('\n'), buf[n-1])
|
||||
break
|
||||
}
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Regresssion test for when a split metric exactly fits in the buffer.
|
||||
//
|
||||
// Previously the metric would be overflow split when not required.
|
||||
func TestMetricReader_SplitOverflowUneeded(t *testing.T) {
|
||||
ts := time.Unix(1481032190, 0)
|
||||
m1, _ := New("foo", map[string]string{},
|
||||
map[string]interface{}{"a": int64(1), "b": int64(2)}, ts)
|
||||
metrics := []telegraf.Metric{m1}
|
||||
|
||||
r := NewReader(metrics)
|
||||
buf := make([]byte, 29)
|
||||
|
||||
// foo a=1i,b=2i 1481032190000000000\n // len 34
|
||||
//
|
||||
// foo a=1i 1481032190000000000\n // len 29
|
||||
// foo b=2i 1481032190000000000\n // len 29
|
||||
|
||||
for {
|
||||
n, err := r.Read(buf)
|
||||
// Should never read 0 bytes unless at EOF, unless input buffer is 0 length
|
||||
if n == 0 {
|
||||
require.Equal(t, io.EOF, err)
|
||||
break
|
||||
}
|
||||
// Lines should be terminated with a LF
|
||||
if err == io.EOF {
|
||||
require.Equal(t, uint8('\n'), buf[n-1])
|
||||
break
|
||||
}
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMetricReader_OverflowMultipleMetrics(t *testing.T) {
|
||||
ts := time.Unix(1481032190, 0)
|
||||
m, _ := New("foo", map[string]string{},
|
||||
@@ -485,3 +619,17 @@ func TestMetricReader_SplitMetricChangingBuffer2(t *testing.T) {
|
||||
assert.Equal(t, test.err, err, test.expRegex)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMetricRoundtrip(t *testing.T) {
|
||||
const lp = `nstat,bu=linux,cls=server,dc=cer,env=production,host=hostname,name=netstat,sr=database IpExtInBcastOctets=12570626154i,IpExtInBcastPkts=95541226i,IpExtInCEPkts=0i,IpExtInCsumErrors=0i,IpExtInECT0Pkts=55674i,IpExtInECT1Pkts=0i,IpExtInMcastOctets=5928296i,IpExtInMcastPkts=174365i,IpExtInNoECTPkts=17965863529i,IpExtInNoRoutes=20i,IpExtInOctets=3334866321815i,IpExtInTruncatedPkts=0i,IpExtOutBcastOctets=0i,IpExtOutBcastPkts=0i,IpExtOutMcastOctets=0i,IpExtOutMcastPkts=0i,IpExtOutOctets=31397892391399i,TcpExtArpFilter=0i,TcpExtBusyPollRxPackets=0i,TcpExtDelayedACKLocked=14094i,TcpExtDelayedACKLost=302083i,TcpExtDelayedACKs=55486507i,TcpExtEmbryonicRsts=11879i,TcpExtIPReversePathFilter=0i,TcpExtListenDrops=1736i,TcpExtListenOverflows=0i,TcpExtLockDroppedIcmps=0i,TcpExtOfoPruned=0i,TcpExtOutOfWindowIcmps=8i,TcpExtPAWSActive=0i,TcpExtPAWSEstab=974i,TcpExtPAWSPassive=0i,TcpExtPruneCalled=0i,TcpExtRcvPruned=0i,TcpExtSyncookiesFailed=12593i,TcpExtSyncookiesRecv=0i,TcpExtSyncookiesSent=0i,TcpExtTCPACKSkippedChallenge=0i,TcpExtTCPACKSkippedFinWait2=0i,TcpExtTCPACKSkippedPAWS=806i,TcpExtTCPACKSkippedSeq=519i,TcpExtTCPACKSkippedSynRecv=0i,TcpExtTCPACKSkippedTimeWait=0i,TcpExtTCPAbortFailed=0i,TcpExtTCPAbortOnClose=22i,TcpExtTCPAbortOnData=36593i,TcpExtTCPAbortOnLinger=0i,TcpExtTCPAbortOnMemory=0i,TcpExtTCPAbortOnTimeout=674i,TcpExtTCPAutoCorking=494253233i,TcpExtTCPBacklogDrop=0i,TcpExtTCPChallengeACK=281i,TcpExtTCPDSACKIgnoredNoUndo=93354i,TcpExtTCPDSACKIgnoredOld=336i,TcpExtTCPDSACKOfoRecv=0i,TcpExtTCPDSACKOfoSent=7i,TcpExtTCPDSACKOldSent=302073i,TcpExtTCPDSACKRecv=215884i,TcpExtTCPDSACKUndo=7633i,TcpExtTCPDeferAcceptDrop=0i,TcpExtTCPDirectCopyFromBacklog=0i,TcpExtTCPDirectCopyFromPrequeue=0i,TcpExtTCPFACKReorder=1320i,TcpExtTCPFastOpenActive=0i,TcpExtTCPFastOpenActiveFail=0i,TcpExtTCPFastOpenCookieReqd=0i,TcpExtTCPFastOpenListenOverflow=0i,TcpExtTCPFastOpenPassive=0i,TcpExtTCPFastOpenPassiveFail=0i,TcpExtTCPFastRetrans=350681i,TcpExtTCPForwardRetrans=142168i,TcpExtTCPFromZeroWindowAdv=4317i,TcpExtTCPFullUndo=29502i,TcpExtTCPHPAcks=10267073000i,TcpExtTCPHPHits=5629837098i,TcpExtTCPHPHitsToUser=0i,TcpExtTCPHystartDelayCwnd=285127i,TcpExtTCPHystartDelayDetect=12318i,TcpExtTCPHystartTrainCwnd=69160570i,TcpExtTCPHystartTrainDetect=3315799i,TcpExtTCPLossFailures=109i,TcpExtTCPLossProbeRecovery=110819i,TcpExtTCPLossProbes=233995i,TcpExtTCPLossUndo=5276i,TcpExtTCPLostRetransmit=397i,TcpExtTCPMD5NotFound=0i,TcpExtTCPMD5Unexpected=0i,TcpExtTCPMemoryPressures=0i,TcpExtTCPMinTTLDrop=0i,TcpExtTCPOFODrop=0i,TcpExtTCPOFOMerge=7i,TcpExtTCPOFOQueue=15196i,TcpExtTCPOrigDataSent=29055119435i,TcpExtTCPPartialUndo=21320i,TcpExtTCPPrequeueDropped=0i,TcpExtTCPPrequeued=0i,TcpExtTCPPureAcks=1236441827i,TcpExtTCPRcvCoalesce=225590473i,TcpExtTCPRcvCollapsed=0i,TcpExtTCPRenoFailures=0i,TcpExtTCPRenoRecovery=0i,TcpExtTCPRenoRecoveryFail=0i,TcpExtTCPRenoReorder=0i,TcpExtTCPReqQFullDoCookies=0i,TcpExtTCPReqQFullDrop=0i,TcpExtTCPRetransFail=41i,TcpExtTCPSACKDiscard=0i,TcpExtTCPSACKReneging=0i,TcpExtTCPSACKReorder=4307i,TcpExtTCPSYNChallenge=244i,TcpExtTCPSackFailures=1698i,TcpExtTCPSackMerged=184668i,TcpExtTCPSackRecovery=97369i,TcpExtTCPSackRecoveryFail=381i,TcpExtTCPSackShiftFallback=2697079i,TcpExtTCPSackShifted=760299i,TcpExtTCPSchedulerFailed=0i,TcpExtTCPSlowStartRetrans=9276i,TcpExtTCPSpuriousRTOs=959i,TcpExtTCPSpuriousRtxHostQueues=2973i,TcpExtTCPSynRetrans=200970i,TcpExtTCPTSReorder=15221i,TcpExtTCPTimeWaitOverflow=0i,TcpExtTCPTimeouts=70127i,TcpExtTCPToZeroWindowAdv=4317i,TcpExtTCPWantZeroWindowAdv=2133i,TcpExtTW=24809813i,TcpExtTWKilled=0i,TcpExtTWRecycled=0i 1496460785000000000
|
||||
nstat,bu=linux,cls=server,dc=cer,env=production,host=hostname,name=snmp,sr=database IcmpInAddrMaskReps=0i,IcmpInAddrMasks=90i,IcmpInCsumErrors=0i,IcmpInDestUnreachs=284401i,IcmpInEchoReps=9i,IcmpInEchos=1761912i,IcmpInErrors=407i,IcmpInMsgs=2047767i,IcmpInParmProbs=0i,IcmpInRedirects=0i,IcmpInSrcQuenchs=0i,IcmpInTimeExcds=46i,IcmpInTimestampReps=0i,IcmpInTimestamps=1309i,IcmpMsgInType0=9i,IcmpMsgInType11=46i,IcmpMsgInType13=1309i,IcmpMsgInType17=90i,IcmpMsgInType3=284401i,IcmpMsgInType8=1761912i,IcmpMsgOutType0=1761912i,IcmpMsgOutType14=1248i,IcmpMsgOutType3=108709i,IcmpMsgOutType8=9i,IcmpOutAddrMaskReps=0i,IcmpOutAddrMasks=0i,IcmpOutDestUnreachs=108709i,IcmpOutEchoReps=1761912i,IcmpOutEchos=9i,IcmpOutErrors=0i,IcmpOutMsgs=1871878i,IcmpOutParmProbs=0i,IcmpOutRedirects=0i,IcmpOutSrcQuenchs=0i,IcmpOutTimeExcds=0i,IcmpOutTimestampReps=1248i,IcmpOutTimestamps=0i,IpDefaultTTL=64i,IpForwDatagrams=0i,IpForwarding=2i,IpFragCreates=0i,IpFragFails=0i,IpFragOKs=0i,IpInAddrErrors=0i,IpInDelivers=17658795773i,IpInDiscards=0i,IpInHdrErrors=0i,IpInReceives=17659269339i,IpInUnknownProtos=0i,IpOutDiscards=236976i,IpOutNoRoutes=1009i,IpOutRequests=23466783734i,IpReasmFails=0i,IpReasmOKs=0i,IpReasmReqds=0i,IpReasmTimeout=0i,TcpActiveOpens=23308977i,TcpAttemptFails=3757543i,TcpCurrEstab=280i,TcpEstabResets=184792i,TcpInCsumErrors=0i,TcpInErrs=232i,TcpInSegs=17536573089i,TcpMaxConn=-1i,TcpOutRsts=4051451i,TcpOutSegs=29836254873i,TcpPassiveOpens=176546974i,TcpRetransSegs=878085i,TcpRtoAlgorithm=1i,TcpRtoMax=120000i,TcpRtoMin=200i,UdpInCsumErrors=0i,UdpInDatagrams=24441661i,UdpInErrors=0i,UdpLiteInCsumErrors=0i,UdpLiteInDatagrams=0i,UdpLiteInErrors=0i,UdpLiteNoPorts=0i,UdpLiteOutDatagrams=0i,UdpLiteRcvbufErrors=0i,UdpLiteSndbufErrors=0i,UdpNoPorts=17660i,UdpOutDatagrams=51807896i,UdpRcvbufErrors=0i,UdpSndbufErrors=236922i 1496460785000000000
|
||||
`
|
||||
metrics, err := Parse([]byte(lp))
|
||||
require.NoError(t, err)
|
||||
r := NewReader(metrics)
|
||||
buf := make([]byte, 128)
|
||||
_, err = r.Read(buf)
|
||||
require.NoError(t, err)
|
||||
metrics, err = Parse(buf)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package leofs
|
||||
import (
|
||||
"bufio"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/url"
|
||||
"os/exec"
|
||||
"strconv"
|
||||
@@ -18,7 +19,7 @@ import (
|
||||
const oid = ".1.3.6.1.4.1.35450"
|
||||
|
||||
// For Manager Master
|
||||
const defaultEndpoint = "127.0.0.1:4020"
|
||||
const defaultEndpoint = "udp://127.0.0.1:4020"
|
||||
|
||||
type ServerType int
|
||||
|
||||
@@ -135,9 +136,9 @@ var serverTypeMapping = map[string]ServerType{
|
||||
}
|
||||
|
||||
var sampleConfig = `
|
||||
## An array of URI to gather stats about LeoFS.
|
||||
## Specify an ip or hostname with port. ie 127.0.0.1:4020
|
||||
servers = ["127.0.0.1:4021"]
|
||||
## An array of URLs of the form:
|
||||
## "udp://" host [ ":" port]
|
||||
servers = ["udp://127.0.0.1:4020"]
|
||||
`
|
||||
|
||||
func (l *LeoFS) SampleConfig() string {
|
||||
@@ -154,17 +155,28 @@ func (l *LeoFS) Gather(acc telegraf.Accumulator) error {
|
||||
return nil
|
||||
}
|
||||
var wg sync.WaitGroup
|
||||
for _, endpoint := range l.Servers {
|
||||
_, err := url.Parse(endpoint)
|
||||
for i, endpoint := range l.Servers {
|
||||
if !strings.HasPrefix(endpoint, "udp://") {
|
||||
// Preserve backwards compatibility for hostnames without a
|
||||
// scheme, broken in go 1.8. Remove in Telegraf 2.0
|
||||
endpoint = "udp://" + endpoint
|
||||
log.Printf("W! [inputs.mongodb] Using %q as connection URL; please update your configuration to use an URL", endpoint)
|
||||
l.Servers[i] = endpoint
|
||||
}
|
||||
u, err := url.Parse(endpoint)
|
||||
if err != nil {
|
||||
acc.AddError(fmt.Errorf("Unable to parse the address:%s, err:%s", endpoint, err))
|
||||
acc.AddError(fmt.Errorf("Unable to parse address %q: %s", endpoint, err))
|
||||
continue
|
||||
}
|
||||
port, err := retrieveTokenAfterColon(endpoint)
|
||||
if err != nil {
|
||||
acc.AddError(err)
|
||||
if u.Host == "" {
|
||||
acc.AddError(fmt.Errorf("Unable to parse address %q", endpoint))
|
||||
continue
|
||||
}
|
||||
|
||||
port := u.Port()
|
||||
if port == "" {
|
||||
port = "4020"
|
||||
}
|
||||
st, ok := serverTypeMapping[port]
|
||||
if !ok {
|
||||
st = ServerTypeStorage
|
||||
|
||||
@@ -4,12 +4,12 @@
|
||||
|
||||
```toml
|
||||
[[inputs.mongodb]]
|
||||
## An array of URI to gather stats about. Specify an ip or hostname
|
||||
## with optional port add password. ie,
|
||||
## An array of URLs of the form:
|
||||
## "mongodb://" [user ":" pass "@"] host [ ":" port]
|
||||
## For example:
|
||||
## mongodb://user:auth_key@10.10.3.30:27017,
|
||||
## mongodb://10.10.3.33:18832,
|
||||
## 10.0.0.1:10000, etc.
|
||||
servers = ["127.0.0.1:27017"]
|
||||
servers = ["mongodb://127.0.0.1:27017"]
|
||||
gather_perdb_stats = false
|
||||
|
||||
## Optional SSL Config
|
||||
@@ -19,15 +19,8 @@
|
||||
## Use SSL but skip chain & host verification
|
||||
# insecure_skip_verify = false
|
||||
```
|
||||
|
||||
For authenticated mongodb instances use `mongodb://` connection URI
|
||||
|
||||
```toml
|
||||
[[inputs.mongodb]]
|
||||
servers = ["mongodb://username:password@10.XX.XX.XX:27101/mydatabase?authSource=admin"]
|
||||
```
|
||||
This connection uri may be different based on your environement and mongodb
|
||||
setup. If the user doesn't have the required privilege to execute serverStatus
|
||||
This connection uri may be different based on your environment and mongodb
|
||||
setup. If the user doesn't have the required privilege to execute serverStatus
|
||||
command the you will get this error on telegraf
|
||||
|
||||
```
|
||||
|
||||
@@ -4,8 +4,10 @@ import (
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"fmt"
|
||||
"log"
|
||||
"net"
|
||||
"net/url"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -37,12 +39,12 @@ type Ssl struct {
|
||||
}
|
||||
|
||||
var sampleConfig = `
|
||||
## An array of URI to gather stats about. Specify an ip or hostname
|
||||
## with optional port add password. ie,
|
||||
## An array of URLs of the form:
|
||||
## "mongodb://" [user ":" pass "@"] host [ ":" port]
|
||||
## For example:
|
||||
## mongodb://user:auth_key@10.10.3.30:27017,
|
||||
## mongodb://10.10.3.33:18832,
|
||||
## 10.0.0.1:10000, etc.
|
||||
servers = ["127.0.0.1:27017"]
|
||||
servers = ["mongodb://127.0.0.1:27017"]
|
||||
gather_perdb_stats = false
|
||||
|
||||
## Optional SSL Config
|
||||
@@ -61,7 +63,7 @@ func (*MongoDB) Description() string {
|
||||
return "Read metrics from one or many MongoDB servers"
|
||||
}
|
||||
|
||||
var localhost = &url.URL{Host: "127.0.0.1:27017"}
|
||||
var localhost = &url.URL{Host: "mongodb://127.0.0.1:27017"}
|
||||
|
||||
// Reads stats from all configured servers accumulates stats.
|
||||
// Returns one of the errors encountered while gather stats (if any).
|
||||
@@ -72,19 +74,25 @@ func (m *MongoDB) Gather(acc telegraf.Accumulator) error {
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for _, serv := range m.Servers {
|
||||
for i, serv := range m.Servers {
|
||||
if !strings.HasPrefix(serv, "mongodb://") {
|
||||
// Preserve backwards compatibility for hostnames without a
|
||||
// scheme, broken in go 1.8. Remove in Telegraf 2.0
|
||||
serv = "mongodb://" + serv
|
||||
log.Printf("W! [inputs.mongodb] Using %q as connection URL; please update your configuration to use an URL", serv)
|
||||
m.Servers[i] = serv
|
||||
}
|
||||
|
||||
u, err := url.Parse(serv)
|
||||
if err != nil {
|
||||
acc.AddError(fmt.Errorf("Unable to parse to address '%s': %s", serv, err))
|
||||
acc.AddError(fmt.Errorf("Unable to parse address %q: %s", serv, err))
|
||||
continue
|
||||
} else if u.Scheme == "" {
|
||||
u.Scheme = "mongodb"
|
||||
// fallback to simple string based address (i.e. "10.0.0.1:10000")
|
||||
u.Host = serv
|
||||
if u.Path == u.Host {
|
||||
u.Path = ""
|
||||
}
|
||||
}
|
||||
if u.Host == "" {
|
||||
acc.AddError(fmt.Errorf("Unable to parse address %q", serv))
|
||||
continue
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
go func(srv *Server) {
|
||||
defer wg.Done()
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"net/url"
|
||||
)
|
||||
@@ -82,10 +83,28 @@ func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
|
||||
if err != io.EOF && err != nil {
|
||||
return totaln, err
|
||||
}
|
||||
nW, err := c.conn.Write(c.buffer[0:nR])
|
||||
totaln += nW
|
||||
if err != nil {
|
||||
return totaln, err
|
||||
|
||||
if c.buffer[nR-1] == uint8('\n') {
|
||||
nW, err := c.conn.Write(c.buffer[0:nR])
|
||||
totaln += nW
|
||||
if err != nil {
|
||||
return totaln, err
|
||||
}
|
||||
} else {
|
||||
log.Printf("E! Could not fit point into UDP payload; dropping")
|
||||
// Scan forward until next line break to realign.
|
||||
for {
|
||||
nR, err := r.Read(c.buffer)
|
||||
if nR == 0 {
|
||||
break
|
||||
}
|
||||
if err != io.EOF && err != nil {
|
||||
return totaln, err
|
||||
}
|
||||
if c.buffer[nR-1] == uint8('\n') {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return totaln, nil
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"net"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -10,6 +9,7 @@ import (
|
||||
"github.com/influxdata/telegraf/metric"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestUDPClient(t *testing.T) {
|
||||
@@ -72,63 +72,7 @@ func TestUDPClient_Write(t *testing.T) {
|
||||
pkt := <-packets
|
||||
assert.Equal(t, "cpu value=99\n", pkt)
|
||||
|
||||
metrics := `cpu value=99
|
||||
cpu value=55
|
||||
cpu value=44
|
||||
cpu value=101
|
||||
cpu value=91
|
||||
cpu value=92
|
||||
`
|
||||
// test sending packet with 6 metrics in a stream.
|
||||
reader := bytes.NewReader([]byte(metrics))
|
||||
// contentLength is ignored:
|
||||
n, err = client.WriteStream(reader, 10)
|
||||
assert.Equal(t, n, len(metrics))
|
||||
assert.NoError(t, err)
|
||||
pkt = <-packets
|
||||
assert.Equal(t, "cpu value=99\ncpu value=55\ncpu value=44\ncpu value=101\ncpu value=91\ncpu value=92\n", pkt)
|
||||
|
||||
//
|
||||
// Test that UDP packets get broken up properly:
|
||||
config2 := UDPConfig{
|
||||
URL: "udp://localhost:8199",
|
||||
PayloadSize: 25,
|
||||
}
|
||||
client2, err := NewUDP(config2)
|
||||
assert.NoError(t, err)
|
||||
|
||||
wp := WriteParams{}
|
||||
|
||||
//
|
||||
// Using Write():
|
||||
buf := []byte(metrics)
|
||||
n, err = client2.WriteWithParams(buf, wp)
|
||||
assert.Equal(t, n, len(metrics))
|
||||
assert.NoError(t, err)
|
||||
pkt = <-packets
|
||||
assert.Equal(t, "cpu value=99\ncpu value=55", pkt)
|
||||
pkt = <-packets
|
||||
assert.Equal(t, "\ncpu value=44\ncpu value=1", pkt)
|
||||
pkt = <-packets
|
||||
assert.Equal(t, "01\ncpu value=91\ncpu value", pkt)
|
||||
pkt = <-packets
|
||||
assert.Equal(t, "=92\n", pkt)
|
||||
|
||||
//
|
||||
// Using WriteStream():
|
||||
reader = bytes.NewReader([]byte(metrics))
|
||||
n, err = client2.WriteStreamWithParams(reader, 10, wp)
|
||||
assert.Equal(t, n, len(metrics))
|
||||
assert.NoError(t, err)
|
||||
pkt = <-packets
|
||||
assert.Equal(t, "cpu value=99\ncpu value=55", pkt)
|
||||
pkt = <-packets
|
||||
assert.Equal(t, "\ncpu value=44\ncpu value=1", pkt)
|
||||
pkt = <-packets
|
||||
assert.Equal(t, "01\ncpu value=91\ncpu value", pkt)
|
||||
pkt = <-packets
|
||||
assert.Equal(t, "=92\n", pkt)
|
||||
|
||||
//
|
||||
// Using WriteStream() & a metric.Reader:
|
||||
config3 := UDPConfig{
|
||||
@@ -159,4 +103,27 @@ cpu value=92
|
||||
assert.Equal(t, "test value=1.1 1484142942000000000\n", pkt)
|
||||
|
||||
assert.NoError(t, client.Close())
|
||||
|
||||
config = UDPConfig{
|
||||
URL: "udp://localhost:8199",
|
||||
PayloadSize: 40,
|
||||
}
|
||||
client4, err := NewUDP(config)
|
||||
assert.NoError(t, err)
|
||||
|
||||
ts := time.Unix(1484142943, 0)
|
||||
m1, _ = metric.New("test", map[string]string{},
|
||||
map[string]interface{}{"this_is_a_very_long_field_name": 1.1}, ts)
|
||||
m2, _ = metric.New("test", map[string]string{},
|
||||
map[string]interface{}{"value": 1.1}, ts)
|
||||
ms = []telegraf.Metric{m1, m2}
|
||||
reader := metric.NewReader(ms)
|
||||
n, err = client4.WriteStream(reader, 0)
|
||||
assert.NoError(t, err)
|
||||
require.Equal(t, 35, n)
|
||||
assert.NoError(t, err)
|
||||
pkt = <-packets
|
||||
assert.Equal(t, "test value=1.1 1484142943000000000\n", pkt)
|
||||
|
||||
assert.NoError(t, client4.Close())
|
||||
}
|
||||
|
||||
@@ -46,8 +46,7 @@ type InfluxDB struct {
|
||||
// Precision is only here for legacy support. It will be ignored.
|
||||
Precision string
|
||||
|
||||
clients []client.Client
|
||||
splitPayload bool
|
||||
clients []client.Client
|
||||
}
|
||||
|
||||
var sampleConfig = `
|
||||
@@ -115,7 +114,6 @@ func (i *InfluxDB) Connect() error {
|
||||
return fmt.Errorf("Error creating UDP Client [%s]: %s", u, err)
|
||||
}
|
||||
i.clients = append(i.clients, c)
|
||||
i.splitPayload = true
|
||||
default:
|
||||
// If URL doesn't start with "udp", assume HTTP client
|
||||
config := client.HTTPConfig{
|
||||
@@ -166,22 +164,9 @@ func (i *InfluxDB) Description() string {
|
||||
return "Configuration for influxdb server to send metrics to"
|
||||
}
|
||||
|
||||
func (i *InfluxDB) split(metrics []telegraf.Metric) []telegraf.Metric {
|
||||
if !i.splitPayload {
|
||||
return metrics
|
||||
}
|
||||
|
||||
split := make([]telegraf.Metric, 0)
|
||||
for _, m := range metrics {
|
||||
split = append(split, m.Split(i.UDPPayload)...)
|
||||
}
|
||||
return split
|
||||
}
|
||||
|
||||
// Write will choose a random server in the cluster to write to until a successful write
|
||||
// occurs, logging each unsuccessful. If all servers fail, return error.
|
||||
func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
|
||||
metrics = i.split(metrics)
|
||||
|
||||
bufsize := 0
|
||||
for _, m := range metrics {
|
||||
|
||||
@@ -6,10 +6,7 @@ import (
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/metric"
|
||||
"github.com/influxdata/telegraf/plugins/outputs/influxdb/client"
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
|
||||
@@ -63,35 +60,6 @@ func TestUDPInflux(t *testing.T) {
|
||||
require.NoError(t, i.Close())
|
||||
}
|
||||
|
||||
func TestBasicSplit(t *testing.T) {
|
||||
c := &MockClient{}
|
||||
i := InfluxDB{
|
||||
clients: []client.Client{c},
|
||||
UDPPayload: 50,
|
||||
splitPayload: true,
|
||||
}
|
||||
|
||||
// Input metrics:
|
||||
// test1,tag1=value1 value1=1 value2=2 1257894000000000000\n
|
||||
//
|
||||
// Split metrics:
|
||||
// test1,tag1=value1 value1=1 1257894000000000000\n
|
||||
// test1,tag1=value1 value2=2 1257894000000000000\n
|
||||
m, err := metric.New("test1",
|
||||
map[string]string{"tag1": "value1"},
|
||||
map[string]interface{}{"value1": 1.0, "value2": 2.0},
|
||||
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
metrics := []telegraf.Metric{m}
|
||||
err = i.Write(metrics)
|
||||
require.Equal(t, 1, c.writeStreamCalled)
|
||||
require.Equal(t, 94, c.contentLength)
|
||||
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestHTTPInflux(t *testing.T) {
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
switch r.URL.Path {
|
||||
|
||||
@@ -6,6 +6,8 @@ import (
|
||||
"log"
|
||||
"net/http"
|
||||
"regexp"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -17,19 +19,40 @@ import (
|
||||
|
||||
var invalidNameCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)
|
||||
|
||||
type MetricWithExpiration struct {
|
||||
Metric prometheus.Metric
|
||||
// SampleID uniquely identifies a Sample
|
||||
type SampleID string
|
||||
|
||||
// Sample represents the current value of a series.
|
||||
type Sample struct {
|
||||
// Labels are the Prometheus labels.
|
||||
Labels map[string]string
|
||||
// Value is the value in the Prometheus output.
|
||||
Value float64
|
||||
// Expiration is the deadline that this Sample is valid until.
|
||||
Expiration time.Time
|
||||
}
|
||||
|
||||
// MetricFamily contains the data required to build valid prometheus Metrics.
|
||||
type MetricFamily struct {
|
||||
// Samples are the Sample belonging to this MetricFamily.
|
||||
Samples map[SampleID]*Sample
|
||||
// Type of the Value.
|
||||
ValueType prometheus.ValueType
|
||||
// LabelSet is the label counts for all Samples.
|
||||
LabelSet map[string]int
|
||||
}
|
||||
|
||||
type PrometheusClient struct {
|
||||
Listen string
|
||||
ExpirationInterval internal.Duration `toml:"expiration_interval"`
|
||||
server *http.Server
|
||||
|
||||
metrics map[string]*MetricWithExpiration
|
||||
server *http.Server
|
||||
|
||||
sync.Mutex
|
||||
// fam is the non-expired MetricFamily by Prometheus metric name.
|
||||
fam map[string]*MetricFamily
|
||||
// now returns the current time.
|
||||
now func() time.Time
|
||||
}
|
||||
|
||||
var sampleConfig = `
|
||||
@@ -41,7 +64,6 @@ var sampleConfig = `
|
||||
`
|
||||
|
||||
func (p *PrometheusClient) Start() error {
|
||||
p.metrics = make(map[string]*MetricWithExpiration)
|
||||
prometheus.Register(p)
|
||||
|
||||
if p.Listen == "" {
|
||||
@@ -88,96 +110,153 @@ func (p *PrometheusClient) Describe(ch chan<- *prometheus.Desc) {
|
||||
prometheus.NewGauge(prometheus.GaugeOpts{Name: "Dummy", Help: "Dummy"}).Describe(ch)
|
||||
}
|
||||
|
||||
// Implements prometheus.Collector
|
||||
// Expire removes Samples that have expired.
|
||||
func (p *PrometheusClient) Expire() {
|
||||
now := p.now()
|
||||
for name, family := range p.fam {
|
||||
for key, sample := range family.Samples {
|
||||
if p.ExpirationInterval.Duration != 0 && now.After(sample.Expiration) {
|
||||
for k, _ := range sample.Labels {
|
||||
family.LabelSet[k]--
|
||||
}
|
||||
delete(family.Samples, key)
|
||||
|
||||
if len(family.Samples) == 0 {
|
||||
delete(p.fam, name)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Collect implements prometheus.Collector
|
||||
func (p *PrometheusClient) Collect(ch chan<- prometheus.Metric) {
|
||||
p.Lock()
|
||||
defer p.Unlock()
|
||||
|
||||
for key, m := range p.metrics {
|
||||
if p.ExpirationInterval.Duration != 0 && time.Now().After(m.Expiration) {
|
||||
delete(p.metrics, key)
|
||||
} else {
|
||||
ch <- m.Metric
|
||||
p.Expire()
|
||||
|
||||
for name, family := range p.fam {
|
||||
// Get list of all labels on MetricFamily
|
||||
var labelNames []string
|
||||
for k, v := range family.LabelSet {
|
||||
if v > 0 {
|
||||
labelNames = append(labelNames, k)
|
||||
}
|
||||
}
|
||||
desc := prometheus.NewDesc(name, "Telegraf collected metric", labelNames, nil)
|
||||
|
||||
for _, sample := range family.Samples {
|
||||
// Get labels for this sample; unset labels will be set to the
|
||||
// empty string
|
||||
var labels []string
|
||||
for _, label := range labelNames {
|
||||
v := sample.Labels[label]
|
||||
labels = append(labels, v)
|
||||
}
|
||||
|
||||
metric, err := prometheus.NewConstMetric(desc, family.ValueType, sample.Value, labels...)
|
||||
if err != nil {
|
||||
log.Printf("E! Error creating prometheus metric, "+
|
||||
"key: %s, labels: %v,\nerr: %s\n",
|
||||
name, labels, err.Error())
|
||||
}
|
||||
|
||||
ch <- metric
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func sanitize(value string) string {
|
||||
return invalidNameCharRE.ReplaceAllString(value, "_")
|
||||
}
|
||||
|
||||
func valueType(tt telegraf.ValueType) prometheus.ValueType {
|
||||
switch tt {
|
||||
case telegraf.Counter:
|
||||
return prometheus.CounterValue
|
||||
case telegraf.Gauge:
|
||||
return prometheus.GaugeValue
|
||||
default:
|
||||
return prometheus.UntypedValue
|
||||
}
|
||||
}
|
||||
|
||||
// CreateSampleID creates a SampleID based on the tags of a telegraf.Metric.
|
||||
func CreateSampleID(tags map[string]string) SampleID {
|
||||
pairs := make([]string, 0, len(tags))
|
||||
for k, v := range tags {
|
||||
pairs = append(pairs, fmt.Sprintf("%s=%s", k, v))
|
||||
}
|
||||
sort.Strings(pairs)
|
||||
return SampleID(strings.Join(pairs, ","))
|
||||
}
|
||||
|
||||
func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
|
||||
p.Lock()
|
||||
defer p.Unlock()
|
||||
|
||||
if len(metrics) == 0 {
|
||||
return nil
|
||||
}
|
||||
now := p.now()
|
||||
|
||||
for _, point := range metrics {
|
||||
key := point.Name()
|
||||
key = invalidNameCharRE.ReplaceAllString(key, "_")
|
||||
tags := point.Tags()
|
||||
vt := valueType(point.Type())
|
||||
sampleID := CreateSampleID(tags)
|
||||
|
||||
// convert tags into prometheus labels
|
||||
var labels []string
|
||||
l := prometheus.Labels{}
|
||||
for k, v := range point.Tags() {
|
||||
k = invalidNameCharRE.ReplaceAllString(k, "_")
|
||||
if len(k) == 0 {
|
||||
continue
|
||||
}
|
||||
labels = append(labels, k)
|
||||
l[k] = v
|
||||
labels := make(map[string]string)
|
||||
for k, v := range tags {
|
||||
labels[sanitize(k)] = sanitize(v)
|
||||
}
|
||||
|
||||
// Get a type if it's available, defaulting to Untyped
|
||||
var mType prometheus.ValueType
|
||||
switch point.Type() {
|
||||
case telegraf.Counter:
|
||||
mType = prometheus.CounterValue
|
||||
case telegraf.Gauge:
|
||||
mType = prometheus.GaugeValue
|
||||
default:
|
||||
mType = prometheus.UntypedValue
|
||||
}
|
||||
|
||||
for n, val := range point.Fields() {
|
||||
for fn, fv := range point.Fields() {
|
||||
// Ignore string and bool fields.
|
||||
switch val.(type) {
|
||||
case string:
|
||||
continue
|
||||
case bool:
|
||||
continue
|
||||
}
|
||||
|
||||
// sanitize the measurement name
|
||||
n = invalidNameCharRE.ReplaceAllString(n, "_")
|
||||
var mname string
|
||||
if n == "value" {
|
||||
mname = key
|
||||
} else {
|
||||
mname = fmt.Sprintf("%s_%s", key, n)
|
||||
}
|
||||
|
||||
desc := prometheus.NewDesc(mname, "Telegraf collected metric", nil, l)
|
||||
var metric prometheus.Metric
|
||||
var err error
|
||||
|
||||
// switch for field type
|
||||
switch val := val.(type) {
|
||||
var value float64
|
||||
switch fv := fv.(type) {
|
||||
case int64:
|
||||
metric, err = prometheus.NewConstMetric(desc, mType, float64(val))
|
||||
value = float64(fv)
|
||||
case float64:
|
||||
metric, err = prometheus.NewConstMetric(desc, mType, val)
|
||||
value = fv
|
||||
default:
|
||||
continue
|
||||
}
|
||||
if err != nil {
|
||||
log.Printf("E! Error creating prometheus metric, "+
|
||||
"key: %s, labels: %v,\nerr: %s\n",
|
||||
mname, l, err.Error())
|
||||
|
||||
sample := &Sample{
|
||||
Labels: labels,
|
||||
Value: value,
|
||||
Expiration: now.Add(p.ExpirationInterval.Duration),
|
||||
}
|
||||
|
||||
p.metrics[desc.String()] = &MetricWithExpiration{
|
||||
Metric: metric,
|
||||
Expiration: time.Now().Add(p.ExpirationInterval.Duration),
|
||||
// Special handling of value field; supports passthrough from
|
||||
// the prometheus input.
|
||||
var mname string
|
||||
if fn == "value" {
|
||||
mname = sanitize(point.Name())
|
||||
} else {
|
||||
mname = sanitize(fmt.Sprintf("%s_%s", point.Name(), fn))
|
||||
}
|
||||
|
||||
var fam *MetricFamily
|
||||
var ok bool
|
||||
if fam, ok = p.fam[mname]; !ok {
|
||||
fam = &MetricFamily{
|
||||
Samples: make(map[SampleID]*Sample),
|
||||
ValueType: vt,
|
||||
LabelSet: make(map[string]int),
|
||||
}
|
||||
p.fam[mname] = fam
|
||||
} else {
|
||||
if fam.ValueType != vt {
|
||||
// Don't return an error since this would be a permanent error
|
||||
log.Printf("Mixed ValueType for measurement %q; dropping point", point.Name())
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
for k, _ := range sample.Labels {
|
||||
fam.LabelSet[k]++
|
||||
}
|
||||
|
||||
fam.Samples[sampleID] = sample
|
||||
}
|
||||
}
|
||||
return nil
|
||||
@@ -187,6 +266,8 @@ func init() {
|
||||
outputs.Add("prometheus_client", func() telegraf.Output {
|
||||
return &PrometheusClient{
|
||||
ExpirationInterval: internal.Duration{Duration: time.Second * 60},
|
||||
fam: make(map[string]*MetricFamily),
|
||||
now: time.Now,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@@ -4,16 +4,314 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
"github.com/influxdata/telegraf/metric"
|
||||
"github.com/influxdata/telegraf/plugins/inputs/prometheus"
|
||||
prometheus_input "github.com/influxdata/telegraf/plugins/inputs/prometheus"
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func setUnixTime(client *PrometheusClient, sec int64) {
|
||||
client.now = func() time.Time {
|
||||
return time.Unix(sec, 0)
|
||||
}
|
||||
}
|
||||
|
||||
// NewClient initializes a PrometheusClient.
|
||||
func NewClient() *PrometheusClient {
|
||||
return &PrometheusClient{
|
||||
ExpirationInterval: internal.Duration{Duration: time.Second * 60},
|
||||
fam: make(map[string]*MetricFamily),
|
||||
now: time.Now,
|
||||
}
|
||||
}
|
||||
|
||||
func TestWrite_Basic(t *testing.T) {
|
||||
now := time.Now()
|
||||
pt1, err := metric.New(
|
||||
"foo",
|
||||
make(map[string]string),
|
||||
map[string]interface{}{"value": 0.0},
|
||||
now)
|
||||
var metrics = []telegraf.Metric{
|
||||
pt1,
|
||||
}
|
||||
|
||||
client := NewClient()
|
||||
err = client.Write(metrics)
|
||||
require.NoError(t, err)
|
||||
|
||||
fam, ok := client.fam["foo"]
|
||||
require.True(t, ok)
|
||||
require.Equal(t, prometheus.UntypedValue, fam.ValueType)
|
||||
require.Equal(t, map[string]int{}, fam.LabelSet)
|
||||
|
||||
sample, ok := fam.Samples[CreateSampleID(pt1.Tags())]
|
||||
require.True(t, ok)
|
||||
|
||||
require.Equal(t, 0.0, sample.Value)
|
||||
require.True(t, now.Before(sample.Expiration))
|
||||
}
|
||||
|
||||
func TestWrite_IntField(t *testing.T) {
|
||||
client := NewClient()
|
||||
|
||||
p1, err := metric.New(
|
||||
"foo",
|
||||
make(map[string]string),
|
||||
map[string]interface{}{"value": 42},
|
||||
time.Now())
|
||||
err = client.Write([]telegraf.Metric{p1})
|
||||
require.NoError(t, err)
|
||||
|
||||
fam, ok := client.fam["foo"]
|
||||
require.True(t, ok)
|
||||
for _, v := range fam.Samples {
|
||||
require.Equal(t, 42.0, v.Value)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestWrite_FieldNotValue(t *testing.T) {
|
||||
client := NewClient()
|
||||
|
||||
p1, err := metric.New(
|
||||
"foo",
|
||||
make(map[string]string),
|
||||
map[string]interface{}{"howdy": 0.0},
|
||||
time.Now())
|
||||
err = client.Write([]telegraf.Metric{p1})
|
||||
require.NoError(t, err)
|
||||
|
||||
fam, ok := client.fam["foo_howdy"]
|
||||
require.True(t, ok)
|
||||
for _, v := range fam.Samples {
|
||||
require.Equal(t, 0.0, v.Value)
|
||||
}
|
||||
}
|
||||
|
||||
func TestWrite_SkipNonNumberField(t *testing.T) {
|
||||
client := NewClient()
|
||||
|
||||
p1, err := metric.New(
|
||||
"foo",
|
||||
make(map[string]string),
|
||||
map[string]interface{}{"value": "howdy"},
|
||||
time.Now())
|
||||
err = client.Write([]telegraf.Metric{p1})
|
||||
require.NoError(t, err)
|
||||
|
||||
_, ok := client.fam["foo"]
|
||||
require.False(t, ok)
|
||||
}
|
||||
|
||||
func TestWrite_Counter(t *testing.T) {
|
||||
client := NewClient()
|
||||
|
||||
p1, err := metric.New(
|
||||
"foo",
|
||||
make(map[string]string),
|
||||
map[string]interface{}{"value": 42},
|
||||
time.Now(),
|
||||
telegraf.Counter)
|
||||
err = client.Write([]telegraf.Metric{p1})
|
||||
require.NoError(t, err)
|
||||
|
||||
fam, ok := client.fam["foo"]
|
||||
require.True(t, ok)
|
||||
require.Equal(t, prometheus.CounterValue, fam.ValueType)
|
||||
}
|
||||
|
||||
func TestWrite_Sanitize(t *testing.T) {
|
||||
client := NewClient()
|
||||
|
||||
p1, err := metric.New(
|
||||
"foo.bar",
|
||||
map[string]string{"tag-with-dash": "localhost.local"},
|
||||
map[string]interface{}{"field-with-dash": 42},
|
||||
time.Now(),
|
||||
telegraf.Counter)
|
||||
err = client.Write([]telegraf.Metric{p1})
|
||||
require.NoError(t, err)
|
||||
|
||||
fam, ok := client.fam["foo_bar_field_with_dash"]
|
||||
require.True(t, ok)
|
||||
require.Equal(t, map[string]int{"tag_with_dash": 1}, fam.LabelSet)
|
||||
|
||||
sample1, ok := fam.Samples[CreateSampleID(p1.Tags())]
|
||||
require.True(t, ok)
|
||||
|
||||
require.Equal(t, map[string]string{
|
||||
"tag_with_dash": "localhost_local"}, sample1.Labels)
|
||||
}
|
||||
|
||||
func TestWrite_Gauge(t *testing.T) {
|
||||
client := NewClient()
|
||||
|
||||
p1, err := metric.New(
|
||||
"foo",
|
||||
make(map[string]string),
|
||||
map[string]interface{}{"value": 42},
|
||||
time.Now(),
|
||||
telegraf.Gauge)
|
||||
err = client.Write([]telegraf.Metric{p1})
|
||||
require.NoError(t, err)
|
||||
|
||||
fam, ok := client.fam["foo"]
|
||||
require.True(t, ok)
|
||||
require.Equal(t, prometheus.GaugeValue, fam.ValueType)
|
||||
}
|
||||
|
||||
func TestWrite_MixedValueType(t *testing.T) {
|
||||
now := time.Now()
|
||||
p1, err := metric.New(
|
||||
"foo",
|
||||
make(map[string]string),
|
||||
map[string]interface{}{"value": 1.0},
|
||||
now,
|
||||
telegraf.Counter)
|
||||
p2, err := metric.New(
|
||||
"foo",
|
||||
make(map[string]string),
|
||||
map[string]interface{}{"value": 2.0},
|
||||
now,
|
||||
telegraf.Gauge)
|
||||
var metrics = []telegraf.Metric{p1, p2}
|
||||
|
||||
client := NewClient()
|
||||
err = client.Write(metrics)
|
||||
require.NoError(t, err)
|
||||
|
||||
fam, ok := client.fam["foo"]
|
||||
require.True(t, ok)
|
||||
require.Equal(t, 1, len(fam.Samples))
|
||||
}
|
||||
|
||||
func TestWrite_Tags(t *testing.T) {
|
||||
now := time.Now()
|
||||
p1, err := metric.New(
|
||||
"foo",
|
||||
make(map[string]string),
|
||||
map[string]interface{}{"value": 1.0},
|
||||
now)
|
||||
p2, err := metric.New(
|
||||
"foo",
|
||||
map[string]string{"host": "localhost"},
|
||||
map[string]interface{}{"value": 2.0},
|
||||
now)
|
||||
var metrics = []telegraf.Metric{p1, p2}
|
||||
|
||||
client := NewClient()
|
||||
err = client.Write(metrics)
|
||||
require.NoError(t, err)
|
||||
|
||||
fam, ok := client.fam["foo"]
|
||||
require.True(t, ok)
|
||||
require.Equal(t, prometheus.UntypedValue, fam.ValueType)
|
||||
|
||||
require.Equal(t, map[string]int{"host": 1}, fam.LabelSet)
|
||||
|
||||
sample1, ok := fam.Samples[CreateSampleID(p1.Tags())]
|
||||
require.True(t, ok)
|
||||
|
||||
require.Equal(t, 1.0, sample1.Value)
|
||||
require.True(t, now.Before(sample1.Expiration))
|
||||
|
||||
sample2, ok := fam.Samples[CreateSampleID(p2.Tags())]
|
||||
require.True(t, ok)
|
||||
|
||||
require.Equal(t, 2.0, sample2.Value)
|
||||
require.True(t, now.Before(sample2.Expiration))
|
||||
}
|
||||
|
||||
func TestExpire(t *testing.T) {
|
||||
client := NewClient()
|
||||
|
||||
p1, err := metric.New(
|
||||
"foo",
|
||||
make(map[string]string),
|
||||
map[string]interface{}{"value": 1.0},
|
||||
time.Now())
|
||||
setUnixTime(client, 0)
|
||||
err = client.Write([]telegraf.Metric{p1})
|
||||
require.NoError(t, err)
|
||||
|
||||
p2, err := metric.New(
|
||||
"bar",
|
||||
make(map[string]string),
|
||||
map[string]interface{}{"value": 2.0},
|
||||
time.Now())
|
||||
setUnixTime(client, 1)
|
||||
err = client.Write([]telegraf.Metric{p2})
|
||||
|
||||
setUnixTime(client, 61)
|
||||
require.Equal(t, 2, len(client.fam))
|
||||
client.Expire()
|
||||
require.Equal(t, 1, len(client.fam))
|
||||
}
|
||||
|
||||
func TestExpire_TagsNoDecrement(t *testing.T) {
|
||||
client := NewClient()
|
||||
|
||||
p1, err := metric.New(
|
||||
"foo",
|
||||
make(map[string]string),
|
||||
map[string]interface{}{"value": 1.0},
|
||||
time.Now())
|
||||
setUnixTime(client, 0)
|
||||
err = client.Write([]telegraf.Metric{p1})
|
||||
require.NoError(t, err)
|
||||
|
||||
p2, err := metric.New(
|
||||
"foo",
|
||||
map[string]string{"host": "localhost"},
|
||||
map[string]interface{}{"value": 2.0},
|
||||
time.Now())
|
||||
setUnixTime(client, 1)
|
||||
err = client.Write([]telegraf.Metric{p2})
|
||||
|
||||
setUnixTime(client, 61)
|
||||
fam, ok := client.fam["foo"]
|
||||
require.True(t, ok)
|
||||
require.Equal(t, 2, len(fam.Samples))
|
||||
client.Expire()
|
||||
require.Equal(t, 1, len(fam.Samples))
|
||||
|
||||
require.Equal(t, map[string]int{"host": 1}, fam.LabelSet)
|
||||
}
|
||||
|
||||
func TestExpire_TagsWithDecrement(t *testing.T) {
|
||||
client := NewClient()
|
||||
|
||||
p1, err := metric.New(
|
||||
"foo",
|
||||
map[string]string{"host": "localhost"},
|
||||
map[string]interface{}{"value": 1.0},
|
||||
time.Now())
|
||||
setUnixTime(client, 0)
|
||||
err = client.Write([]telegraf.Metric{p1})
|
||||
require.NoError(t, err)
|
||||
|
||||
p2, err := metric.New(
|
||||
"foo",
|
||||
make(map[string]string),
|
||||
map[string]interface{}{"value": 2.0},
|
||||
time.Now())
|
||||
setUnixTime(client, 1)
|
||||
err = client.Write([]telegraf.Metric{p2})
|
||||
|
||||
setUnixTime(client, 61)
|
||||
fam, ok := client.fam["foo"]
|
||||
require.True(t, ok)
|
||||
require.Equal(t, 2, len(fam.Samples))
|
||||
client.Expire()
|
||||
require.Equal(t, 1, len(fam.Samples))
|
||||
|
||||
require.Equal(t, map[string]int{"host": 0}, fam.LabelSet)
|
||||
}
|
||||
|
||||
var pTesting *PrometheusClient
|
||||
|
||||
func TestPrometheusWritePointEmptyTag(t *testing.T) {
|
||||
@@ -93,74 +391,21 @@ func TestPrometheusWritePointEmptyTag(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestPrometheusExpireOldMetrics(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("Skipping integration test in short mode")
|
||||
}
|
||||
|
||||
pClient, p, err := setupPrometheus()
|
||||
pClient.ExpirationInterval = internal.Duration{Duration: time.Second * 10}
|
||||
require.NoError(t, err)
|
||||
defer pClient.Stop()
|
||||
|
||||
now := time.Now()
|
||||
tags := make(map[string]string)
|
||||
pt1, _ := metric.New(
|
||||
"test_point_1",
|
||||
tags,
|
||||
map[string]interface{}{"value": 0.0},
|
||||
now)
|
||||
var metrics = []telegraf.Metric{pt1}
|
||||
require.NoError(t, pClient.Write(metrics))
|
||||
|
||||
for _, m := range pClient.metrics {
|
||||
m.Expiration = now.Add(time.Duration(-15) * time.Second)
|
||||
}
|
||||
|
||||
pt2, _ := metric.New(
|
||||
"test_point_2",
|
||||
tags,
|
||||
map[string]interface{}{"value": 1.0},
|
||||
now)
|
||||
var metrics2 = []telegraf.Metric{pt2}
|
||||
require.NoError(t, pClient.Write(metrics2))
|
||||
|
||||
expected := []struct {
|
||||
name string
|
||||
value float64
|
||||
tags map[string]string
|
||||
}{
|
||||
{"test_point_2", 1.0, tags},
|
||||
}
|
||||
|
||||
var acc testutil.Accumulator
|
||||
|
||||
require.NoError(t, p.Gather(&acc))
|
||||
for _, e := range expected {
|
||||
acc.AssertContainsFields(t, e.name,
|
||||
map[string]interface{}{"value": e.value})
|
||||
}
|
||||
|
||||
acc.AssertDoesNotContainMeasurement(t, "test_point_1")
|
||||
|
||||
// Confirm that it's not in the PrometheusClient map anymore
|
||||
assert.Equal(t, 1, len(pClient.metrics))
|
||||
}
|
||||
|
||||
func setupPrometheus() (*PrometheusClient, *prometheus.Prometheus, error) {
|
||||
func setupPrometheus() (*PrometheusClient, *prometheus_input.Prometheus, error) {
|
||||
if pTesting == nil {
|
||||
pTesting = &PrometheusClient{Listen: "localhost:9127"}
|
||||
pTesting = NewClient()
|
||||
pTesting.Listen = "localhost:9127"
|
||||
err := pTesting.Start()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
} else {
|
||||
pTesting.metrics = make(map[string]*MetricWithExpiration)
|
||||
pTesting.fam = make(map[string]*MetricFamily)
|
||||
}
|
||||
|
||||
time.Sleep(time.Millisecond * 200)
|
||||
|
||||
p := &prometheus.Prometheus{
|
||||
p := &prometheus_input.Prometheus{
|
||||
Urls: []string{"http://localhost:9127/metrics"},
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user