Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3b6ffb344e | ||
|
|
c9d359f21b | ||
|
|
45ea321e4b | ||
|
|
853178c7dc | ||
|
|
b2c1d98cff |
@@ -4,9 +4,9 @@ machine:
|
||||
post:
|
||||
- sudo service zookeeper stop
|
||||
- go version
|
||||
- go version | grep 1.7.4 || sudo rm -rf /usr/local/go
|
||||
- wget https://storage.googleapis.com/golang/go1.7.4.linux-amd64.tar.gz
|
||||
- sudo tar -C /usr/local -xzf go1.7.4.linux-amd64.tar.gz
|
||||
- go version | grep 1.7.5 || sudo rm -rf /usr/local/go
|
||||
- wget https://storage.googleapis.com/golang/go1.7.5.linux-amd64.tar.gz
|
||||
- sudo tar -C /usr/local -xzf go1.7.5.linux-amd64.tar.gz
|
||||
- go version
|
||||
|
||||
dependencies:
|
||||
|
||||
@@ -90,6 +90,9 @@ func NewRunningOutput(
|
||||
// AddMetric adds a metric to the output. This function can also write cached
|
||||
// points if FlushBufferWhenFull is true.
|
||||
func (ro *RunningOutput) AddMetric(m telegraf.Metric) {
|
||||
if m == nil {
|
||||
return
|
||||
}
|
||||
// Filter any tagexclude/taginclude parameters before adding metric
|
||||
if ro.Config.Filter.IsActive() {
|
||||
// In order to filter out tags, we need to create a new metric, since
|
||||
|
||||
@@ -75,6 +75,23 @@ func BenchmarkRunningOutputAddFailWrites(b *testing.B) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddingNilMetric(t *testing.T) {
|
||||
conf := &OutputConfig{
|
||||
Filter: Filter{},
|
||||
}
|
||||
|
||||
m := &mockOutput{}
|
||||
ro := NewRunningOutput("test", m, conf, 1000, 10000)
|
||||
|
||||
ro.AddMetric(nil)
|
||||
ro.AddMetric(nil)
|
||||
ro.AddMetric(nil)
|
||||
|
||||
err := ro.Write()
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, m.Metrics(), 0)
|
||||
}
|
||||
|
||||
// Test that NameDrop filters ger properly applied.
|
||||
func TestRunningOutput_DropFilter(t *testing.T) {
|
||||
conf := &OutputConfig{
|
||||
|
||||
@@ -263,7 +263,7 @@ func (m *metric) Fields() map[string]interface{} {
|
||||
case '"':
|
||||
// string field
|
||||
fieldMap[unescape(string(m.fields[i:][0:i1]), "fieldkey")] = unescape(string(m.fields[i:][i2+1:i3-1]), "fieldval")
|
||||
case '0', '1', '2', '3', '4', '5', '6', '7', '8', '9':
|
||||
case '-', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9':
|
||||
// number field
|
||||
switch m.fields[i:][i3-1] {
|
||||
case 'i':
|
||||
|
||||
@@ -44,6 +44,9 @@ cpu,host=foo,datacenter=us-east idle=99,busy=1i,b=true,s="string"
|
||||
cpu,host=foo,datacenter=us-east idle=99,busy=1i,b=true,s="string"
|
||||
`
|
||||
|
||||
const negMetrics = `weather,host=local temp=-99i,temp_float=-99.4 1465839830100400200
|
||||
`
|
||||
|
||||
// some metrics are invalid
|
||||
const someInvalid = `cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
|
||||
cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
|
||||
@@ -85,6 +88,26 @@ func TestParse(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseNegNumbers(t *testing.T) {
|
||||
metrics, err := Parse([]byte(negMetrics))
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, metrics, 1)
|
||||
|
||||
assert.Equal(t,
|
||||
map[string]interface{}{
|
||||
"temp": int64(-99),
|
||||
"temp_float": float64(-99.4),
|
||||
},
|
||||
metrics[0].Fields(),
|
||||
)
|
||||
assert.Equal(t,
|
||||
map[string]string{
|
||||
"host": "local",
|
||||
},
|
||||
metrics[0].Tags(),
|
||||
)
|
||||
}
|
||||
|
||||
func TestParseErrors(t *testing.T) {
|
||||
start := time.Now()
|
||||
metrics, err := Parse([]byte(someInvalid))
|
||||
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
|
||||
const (
|
||||
testMsg = "cpu_load_short,host=server01 value=23422.0 1422568543702900257\n"
|
||||
testMsgNeg = "cpu_load_short,host=server01 value=-23422.0 1422568543702900257\n"
|
||||
testMsgGraphite = "cpu.load.short.graphite 23422 1454780029"
|
||||
testMsgJSON = "{\"a\": 5, \"b\": {\"c\": 6}}\n"
|
||||
invalidMsg = "cpu_load_short,host=server01 1422568543702900257\n"
|
||||
@@ -76,13 +77,28 @@ func TestPersistentClientIDFail(t *testing.T) {
|
||||
assert.Error(t, err)
|
||||
}
|
||||
|
||||
// Test that the parser parses NATS messages into metrics
|
||||
func TestRunParser(t *testing.T) {
|
||||
n, in := newTestMQTTConsumer()
|
||||
acc := testutil.Accumulator{}
|
||||
n.acc = &acc
|
||||
defer close(n.done)
|
||||
|
||||
n.parser, _ = parsers.NewInfluxParser()
|
||||
go n.receiver()
|
||||
in <- mqttMsg(testMsgNeg)
|
||||
time.Sleep(time.Millisecond * 250)
|
||||
|
||||
if a := acc.NFields(); a != 1 {
|
||||
t.Errorf("got %v, expected %v", a, 1)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunParserNegativeNumber(t *testing.T) {
|
||||
n, in := newTestMQTTConsumer()
|
||||
acc := testutil.Accumulator{}
|
||||
n.acc = &acc
|
||||
defer close(n.done)
|
||||
|
||||
n.parser, _ = parsers.NewInfluxParser()
|
||||
go n.receiver()
|
||||
in <- mqttMsg(testMsg)
|
||||
|
||||
@@ -19,6 +19,7 @@ var (
|
||||
|
||||
const (
|
||||
validInflux = "cpu_load_short,cpu=cpu0 value=10 1257894000000000000\n"
|
||||
negativeFloat = "cpu_load_short,cpu=cpu0 value=-13.4 1257894000000000000\n"
|
||||
validInfluxNewline = "\ncpu_load_short,cpu=cpu0 value=10 1257894000000000000\n"
|
||||
validInfluxNoNewline = "cpu_load_short,cpu=cpu0 value=10 1257894000000000000"
|
||||
invalidInflux = "I don't think this is line protocol\n"
|
||||
@@ -82,6 +83,18 @@ func TestParseValidInflux(t *testing.T) {
|
||||
"cpu": "cpu0",
|
||||
}, metrics[0].Tags())
|
||||
assert.Equal(t, exptime, metrics[0].Time().UnixNano())
|
||||
|
||||
metrics, err = parser.Parse([]byte(negativeFloat))
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, metrics, 1)
|
||||
assert.Equal(t, "cpu_load_short", metrics[0].Name())
|
||||
assert.Equal(t, map[string]interface{}{
|
||||
"value": float64(-13.4),
|
||||
}, metrics[0].Fields())
|
||||
assert.Equal(t, map[string]string{
|
||||
"cpu": "cpu0",
|
||||
}, metrics[0].Tags())
|
||||
assert.Equal(t, exptime, metrics[0].Time().UnixNano())
|
||||
}
|
||||
|
||||
func TestParseLineValidInflux(t *testing.T) {
|
||||
|
||||
@@ -83,9 +83,9 @@ targets = {
|
||||
}
|
||||
|
||||
supported_builds = {
|
||||
"windows": [ "amd64" ],
|
||||
"windows": [ "amd64", "i386" ],
|
||||
"linux": [ "amd64", "i386", "armhf", "armel", "arm64", "static_amd64" ],
|
||||
"freebsd": [ "amd64" ]
|
||||
"freebsd": [ "amd64", "i386" ]
|
||||
}
|
||||
|
||||
supported_packages = {
|
||||
|
||||
Reference in New Issue
Block a user