parent
f0498491b2
commit
cd5bef3e14
|
@ -305,7 +305,7 @@ func (m *metric) Fields() map[string]interface{} {
|
||||||
case '"':
|
case '"':
|
||||||
// string field
|
// string field
|
||||||
fieldMap[unescape(string(m.fields[i:][0:i1]), "fieldkey")] = unescape(string(m.fields[i:][i2+1:i3-1]), "fieldval")
|
fieldMap[unescape(string(m.fields[i:][0:i1]), "fieldkey")] = unescape(string(m.fields[i:][i2+1:i3-1]), "fieldval")
|
||||||
case '0', '1', '2', '3', '4', '5', '6', '7', '8', '9':
|
case '-', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9':
|
||||||
// number field
|
// number field
|
||||||
switch m.fields[i:][i3-1] {
|
switch m.fields[i:][i3-1] {
|
||||||
case 'i':
|
case 'i':
|
||||||
|
|
|
@ -44,6 +44,9 @@ cpu,host=foo,datacenter=us-east idle=99,busy=1i,b=true,s="string"
|
||||||
cpu,host=foo,datacenter=us-east idle=99,busy=1i,b=true,s="string"
|
cpu,host=foo,datacenter=us-east idle=99,busy=1i,b=true,s="string"
|
||||||
`
|
`
|
||||||
|
|
||||||
|
const negMetrics = `weather,host=local temp=-99i,temp_float=-99.4 1465839830100400200
|
||||||
|
`
|
||||||
|
|
||||||
// some metrics are invalid
|
// some metrics are invalid
|
||||||
const someInvalid = `cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
|
const someInvalid = `cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
|
||||||
cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
|
cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
|
||||||
|
@ -85,6 +88,26 @@ func TestParse(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestParseNegNumbers(t *testing.T) {
|
||||||
|
metrics, err := Parse([]byte(negMetrics))
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Len(t, metrics, 1)
|
||||||
|
|
||||||
|
assert.Equal(t,
|
||||||
|
map[string]interface{}{
|
||||||
|
"temp": int64(-99),
|
||||||
|
"temp_float": float64(-99.4),
|
||||||
|
},
|
||||||
|
metrics[0].Fields(),
|
||||||
|
)
|
||||||
|
assert.Equal(t,
|
||||||
|
map[string]string{
|
||||||
|
"host": "local",
|
||||||
|
},
|
||||||
|
metrics[0].Tags(),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
func TestParseErrors(t *testing.T) {
|
func TestParseErrors(t *testing.T) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
metrics, err := Parse([]byte(someInvalid))
|
metrics, err := Parse([]byte(someInvalid))
|
||||||
|
|
|
@ -14,6 +14,7 @@ import (
|
||||||
|
|
||||||
const (
|
const (
|
||||||
testMsg = "cpu_load_short,host=server01 value=23422.0 1422568543702900257\n"
|
testMsg = "cpu_load_short,host=server01 value=23422.0 1422568543702900257\n"
|
||||||
|
testMsgNeg = "cpu_load_short,host=server01 value=-23422.0 1422568543702900257\n"
|
||||||
testMsgGraphite = "cpu.load.short.graphite 23422 1454780029"
|
testMsgGraphite = "cpu.load.short.graphite 23422 1454780029"
|
||||||
testMsgJSON = "{\"a\": 5, \"b\": {\"c\": 6}}\n"
|
testMsgJSON = "{\"a\": 5, \"b\": {\"c\": 6}}\n"
|
||||||
invalidMsg = "cpu_load_short,host=server01 1422568543702900257\n"
|
invalidMsg = "cpu_load_short,host=server01 1422568543702900257\n"
|
||||||
|
@ -76,13 +77,28 @@ func TestPersistentClientIDFail(t *testing.T) {
|
||||||
assert.Error(t, err)
|
assert.Error(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test that the parser parses NATS messages into metrics
|
|
||||||
func TestRunParser(t *testing.T) {
|
func TestRunParser(t *testing.T) {
|
||||||
n, in := newTestMQTTConsumer()
|
n, in := newTestMQTTConsumer()
|
||||||
acc := testutil.Accumulator{}
|
acc := testutil.Accumulator{}
|
||||||
n.acc = &acc
|
n.acc = &acc
|
||||||
defer close(n.done)
|
defer close(n.done)
|
||||||
|
|
||||||
|
n.parser, _ = parsers.NewInfluxParser()
|
||||||
|
go n.receiver()
|
||||||
|
in <- mqttMsg(testMsgNeg)
|
||||||
|
time.Sleep(time.Millisecond * 250)
|
||||||
|
|
||||||
|
if a := acc.NFields(); a != 1 {
|
||||||
|
t.Errorf("got %v, expected %v", a, 1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRunParserNegativeNumber(t *testing.T) {
|
||||||
|
n, in := newTestMQTTConsumer()
|
||||||
|
acc := testutil.Accumulator{}
|
||||||
|
n.acc = &acc
|
||||||
|
defer close(n.done)
|
||||||
|
|
||||||
n.parser, _ = parsers.NewInfluxParser()
|
n.parser, _ = parsers.NewInfluxParser()
|
||||||
go n.receiver()
|
go n.receiver()
|
||||||
in <- mqttMsg(testMsg)
|
in <- mqttMsg(testMsg)
|
||||||
|
|
|
@ -19,6 +19,7 @@ var (
|
||||||
|
|
||||||
const (
|
const (
|
||||||
validInflux = "cpu_load_short,cpu=cpu0 value=10 1257894000000000000\n"
|
validInflux = "cpu_load_short,cpu=cpu0 value=10 1257894000000000000\n"
|
||||||
|
negativeFloat = "cpu_load_short,cpu=cpu0 value=-13.4 1257894000000000000\n"
|
||||||
validInfluxNewline = "\ncpu_load_short,cpu=cpu0 value=10 1257894000000000000\n"
|
validInfluxNewline = "\ncpu_load_short,cpu=cpu0 value=10 1257894000000000000\n"
|
||||||
validInfluxNoNewline = "cpu_load_short,cpu=cpu0 value=10 1257894000000000000"
|
validInfluxNoNewline = "cpu_load_short,cpu=cpu0 value=10 1257894000000000000"
|
||||||
invalidInflux = "I don't think this is line protocol\n"
|
invalidInflux = "I don't think this is line protocol\n"
|
||||||
|
@ -82,6 +83,18 @@ func TestParseValidInflux(t *testing.T) {
|
||||||
"cpu": "cpu0",
|
"cpu": "cpu0",
|
||||||
}, metrics[0].Tags())
|
}, metrics[0].Tags())
|
||||||
assert.Equal(t, exptime, metrics[0].Time().UnixNano())
|
assert.Equal(t, exptime, metrics[0].Time().UnixNano())
|
||||||
|
|
||||||
|
metrics, err = parser.Parse([]byte(negativeFloat))
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Len(t, metrics, 1)
|
||||||
|
assert.Equal(t, "cpu_load_short", metrics[0].Name())
|
||||||
|
assert.Equal(t, map[string]interface{}{
|
||||||
|
"value": float64(-13.4),
|
||||||
|
}, metrics[0].Fields())
|
||||||
|
assert.Equal(t, map[string]string{
|
||||||
|
"cpu": "cpu0",
|
||||||
|
}, metrics[0].Tags())
|
||||||
|
assert.Equal(t, exptime, metrics[0].Time().UnixNano())
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestParseLineValidInflux(t *testing.T) {
|
func TestParseLineValidInflux(t *testing.T) {
|
||||||
|
|
Loading…
Reference in New Issue