Compare commits

...

8 Commits

Author SHA1 Message Date
Max U 91964888c9 comments reflect deprecated NewJsonParser 2018-06-28 16:05:58 -07:00
Max U e25fbed0de add old exported func NewJSONParser 2018-06-28 16:00:32 -07:00
Max U 674b9578d7 readability tweak 2018-06-28 15:56:36 -07:00
Max U 885193bc8f change test files to reflect unexported newJsonParser func 2018-06-28 15:39:39 -07:00
Max U 4db667573a newJsonParser function is now unexported, must use NewParser(config) 2018-06-28 15:35:20 -07:00
Max U 7f2b2a08ae test case edit 2018-06-28 11:45:26 -07:00
Max U c50bfc2c71 edit unittests for JSON parser 2018-06-27 16:09:14 -07:00
Max U 420dafd591 add "field_tags" to json parser config 2018-06-27 16:09:05 -07:00
15 changed files with 132 additions and 33 deletions

View File

@ -104,9 +104,10 @@ but can be overridden using the `name_override` config option.
#### JSON Configuration: #### JSON Configuration:
The JSON data format supports specifying "tag keys". If specified, keys The JSON data format supports specifying "tag keys" and "field keys". If specified, keys
will be searched for in the root-level of the JSON blob. If the key(s) exist, will be searched for in the root-level and any nested lists of the JSON blob. If the key(s) exist,
they will be applied as tags to the Telegraf metrics. they will be applied as tags or fields to the Telegraf metrics. If "field_keys" is not specified,
all int and float values will be set as fields by default.
For example, if you had this configuration: For example, if you had this configuration:
@ -173,6 +174,7 @@ For example, if the following configuration:
"my_tag_1", "my_tag_1",
"my_tag_2" "my_tag_2"
] ]
field_keys = ["b_c"]
``` ```
with this JSON output from a command: with this JSON output from a command:
@ -198,11 +200,11 @@ with this JSON output from a command:
] ]
``` ```
Your Telegraf metrics would get tagged with "my_tag_1" and "my_tag_2" Your Telegraf metrics would get tagged with "my_tag_1" and "my_tag_2" and fielded with "b_c"
``` ```
exec_mycollector,my_tag_1=foo,my_tag_2=baz a=5,b_c=6 exec_mycollector,my_tag_1=foo,my_tag_2=baz b_c=6
exec_mycollector,my_tag_1=bar,my_tag_2=baz a=7,b_c=8 exec_mycollector,my_tag_1=bar,my_tag_2=baz b_c=8
``` ```
# Value: # Value:

View File

@ -1261,6 +1261,18 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
} }
} }
if node, ok := tbl.Fields["field_keys"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
c.FieldKeys = append(c.FieldKeys, str.Value)
}
}
}
}
}
if node, ok := tbl.Fields["data_type"]; ok { if node, ok := tbl.Fields["data_type"]; ok {
if kv, ok := node.(*ast.KeyValue); ok { if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok { if str, ok := kv.Value.(*ast.String); ok {
@ -1344,6 +1356,7 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
delete(tbl.Fields, "separator") delete(tbl.Fields, "separator")
delete(tbl.Fields, "templates") delete(tbl.Fields, "templates")
delete(tbl.Fields, "tag_keys") delete(tbl.Fields, "tag_keys")
delete(tbl.Fields, "field_keys")
delete(tbl.Fields, "data_type") delete(tbl.Fields, "data_type")
delete(tbl.Fields, "collectd_auth_file") delete(tbl.Fields, "collectd_auth_file")
delete(tbl.Fields, "collectd_security_level") delete(tbl.Fields, "collectd_security_level")

View File

@ -143,7 +143,10 @@ func TestConfig_LoadDirectory(t *testing.T) {
"Testdata did not produce correct memcached metadata.") "Testdata did not produce correct memcached metadata.")
ex := inputs.Inputs["exec"]().(*exec.Exec) ex := inputs.Inputs["exec"]().(*exec.Exec)
p, err := parsers.NewJSONParser("exec", nil, nil) p, err := parsers.NewParser(&parsers.Config{
MetricName: "exec",
DataFormat: "json",
})
assert.NoError(t, err) assert.NoError(t, err)
ex.SetParser(p) ex.SetParser(p)
ex.Command = "/usr/bin/myothercollector --foo=bar" ex.Command = "/usr/bin/myothercollector --foo=bar"

View File

@ -93,7 +93,10 @@ func (r runnerMock) Run(e *Exec, command string, acc telegraf.Accumulator) ([]by
} }
func TestExec(t *testing.T) { func TestExec(t *testing.T) {
parser, _ := parsers.NewJSONParser("exec", []string{}, nil) parser, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "exec",
})
e := &Exec{ e := &Exec{
runner: newRunnerMock([]byte(validJson), nil), runner: newRunnerMock([]byte(validJson), nil),
Commands: []string{"testcommand arg1"}, Commands: []string{"testcommand arg1"},
@ -119,7 +122,10 @@ func TestExec(t *testing.T) {
} }
func TestExecMalformed(t *testing.T) { func TestExecMalformed(t *testing.T) {
parser, _ := parsers.NewJSONParser("exec", []string{}, nil) parser, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "exec",
})
e := &Exec{ e := &Exec{
runner: newRunnerMock([]byte(malformedJson), nil), runner: newRunnerMock([]byte(malformedJson), nil),
Commands: []string{"badcommand arg1"}, Commands: []string{"badcommand arg1"},
@ -132,7 +138,10 @@ func TestExecMalformed(t *testing.T) {
} }
func TestCommandError(t *testing.T) { func TestCommandError(t *testing.T) {
parser, _ := parsers.NewJSONParser("exec", []string{}, nil) parser, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "exec",
})
e := &Exec{ e := &Exec{
runner: newRunnerMock(nil, fmt.Errorf("exit status code 1")), runner: newRunnerMock(nil, fmt.Errorf("exit status code 1")),
Commands: []string{"badcommand"}, Commands: []string{"badcommand"},

View File

@ -26,7 +26,11 @@ func TestHTTPwithJSONFormat(t *testing.T) {
URLs: []string{url}, URLs: []string{url},
} }
metricName := "metricName" metricName := "metricName"
p, _ := parsers.NewJSONParser(metricName, nil, nil)
p, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "metricName",
})
plugin.SetParser(p) plugin.SetParser(p)
var acc testutil.Accumulator var acc testutil.Accumulator
@ -63,8 +67,11 @@ func TestHTTPHeaders(t *testing.T) {
URLs: []string{url}, URLs: []string{url},
Headers: map[string]string{header: headerValue}, Headers: map[string]string{header: headerValue},
} }
metricName := "metricName"
p, _ := parsers.NewJSONParser(metricName, nil, nil) p, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "metricName",
})
plugin.SetParser(p) plugin.SetParser(p)
var acc testutil.Accumulator var acc testutil.Accumulator
@ -83,7 +90,10 @@ func TestInvalidStatusCode(t *testing.T) {
} }
metricName := "metricName" metricName := "metricName"
p, _ := parsers.NewJSONParser(metricName, nil, nil) p, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: metricName,
})
plugin.SetParser(p) plugin.SetParser(p)
var acc testutil.Accumulator var acc testutil.Accumulator
@ -105,8 +115,10 @@ func TestMethod(t *testing.T) {
Method: "POST", Method: "POST",
} }
metricName := "metricName" p, _ := parsers.NewParser(&parsers.Config{
p, _ := parsers.NewJSONParser(metricName, nil, nil) DataFormat: "json",
MetricName: "metricName",
})
plugin.SetParser(p) plugin.SetParser(p)
var acc testutil.Accumulator var acc testutil.Accumulator

View File

@ -181,7 +181,12 @@ func (h *HttpJson) gatherServer(
"server": serverURL, "server": serverURL,
} }
parser, err := parsers.NewJSONParser(msrmnt_name, h.TagKeys, tags) parser, err := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: msrmnt_name,
TagKeys: h.TagKeys,
DefaultTags: tags,
})
if err != nil { if err != nil {
return err return err
} }

View File

@ -125,7 +125,10 @@ func TestRunParserAndGatherJSON(t *testing.T) {
k.acc = &acc k.acc = &acc
defer close(k.done) defer close(k.done)
k.parser, _ = parsers.NewJSONParser("kafka_json_test", []string{}, nil) k.parser, _ = parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "kafka_json_test",
})
go k.receiver() go k.receiver()
in <- saramaMsg(testMsgJSON) in <- saramaMsg(testMsgJSON)
acc.Wait(1) acc.Wait(1)

View File

@ -125,7 +125,10 @@ func TestRunParserAndGatherJSON(t *testing.T) {
k.acc = &acc k.acc = &acc
defer close(k.done) defer close(k.done)
k.parser, _ = parsers.NewJSONParser("kafka_json_test", []string{}, nil) k.parser, _ = parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "kafka_json_test",
})
go k.receiver() go k.receiver()
in <- saramaMsg(testMsgJSON) in <- saramaMsg(testMsgJSON)
acc.Wait(1) acc.Wait(1)

View File

@ -172,7 +172,10 @@ func TestRunParserAndGatherJSON(t *testing.T) {
n.acc = &acc n.acc = &acc
defer close(n.done) defer close(n.done)
n.parser, _ = parsers.NewJSONParser("nats_json_test", []string{}, nil) n.parser, _ = parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "nats_json_test",
})
go n.receiver() go n.receiver()
in <- mqttMsg(testMsgJSON) in <- mqttMsg(testMsgJSON)

View File

@ -108,7 +108,10 @@ func TestRunParserAndGatherJSON(t *testing.T) {
n.acc = &acc n.acc = &acc
defer close(n.done) defer close(n.done)
n.parser, _ = parsers.NewJSONParser("nats_json_test", []string{}, nil) n.parser, _ = parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "nats_json_test",
})
n.wg.Add(1) n.wg.Add(1)
go n.receiver() go n.receiver()
in <- natsMsg(testMsgJSON) in <- natsMsg(testMsgJSON)

View File

@ -300,7 +300,10 @@ func TestRunParserJSONMsg(t *testing.T) {
listener.acc = &acc listener.acc = &acc
defer close(listener.done) defer close(listener.done)
listener.parser, _ = parsers.NewJSONParser("udp_json_test", []string{}, nil) listener.parser, _ = parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "udp_json_test",
})
listener.wg.Add(1) listener.wg.Add(1)
go listener.tcpParser() go listener.tcpParser()

View File

@ -193,7 +193,10 @@ func TestRunParserJSONMsg(t *testing.T) {
listener.acc = &acc listener.acc = &acc
defer close(listener.done) defer close(listener.done)
listener.parser, _ = parsers.NewJSONParser("udp_json_test", []string{}, nil) listener.parser, _ = parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "udp_json_test",
})
listener.wg.Add(1) listener.wg.Add(1)
go listener.udpParser() go listener.udpParser()

View File

@ -20,6 +20,7 @@ var (
type JSONParser struct { type JSONParser struct {
MetricName string MetricName string
TagKeys []string TagKeys []string
FieldKeys []string
DefaultTags map[string]string DefaultTags map[string]string
} }
@ -86,6 +87,17 @@ func (p *JSONParser) switchFieldToTag(tags map[string]string, fields map[string]
} }
} }
//if field_keys is specified, only those values should be reported as fields
if len(p.FieldKeys) > 0 {
nFields := make(map[string]interface{})
for _, name := range p.FieldKeys {
if fields[name] != nil {
nFields[name] = fields[name]
}
}
return tags, nFields
}
//remove any additional string/bool values from fields //remove any additional string/bool values from fields
for k := range fields { for k := range fields {
switch fields[k].(type) { switch fields[k].(type) {

View File

@ -1,6 +1,7 @@
package json package json
import ( import (
"log"
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
@ -454,16 +455,22 @@ func TestJSONParseNestedArray(t *testing.T) {
"avg_find_time": 4, "avg_find_time": 4,
"tester": "work", "tester": "work",
"tester2": "don't want this", "tester2": "don't want this",
"tester3": 7.93 "tester3": {
"hello":"sup",
"fun":"money",
"break":9
}
} }
}` }`
parser := JSONParser{ parser := JSONParser{
MetricName: "json_test", MetricName: "json_test",
TagKeys: []string{"total_devices", "total_threads", "shares_tester", "shares_tester3"}, TagKeys: []string{"total_devices", "total_threads", "shares_tester3_fun"},
FieldKeys: []string{"shares_tester", "shares_tester3_break"},
} }
metrics, err := parser.Parse([]byte(testString)) metrics, err := parser.Parse([]byte(testString))
log.Printf("m[0] name: %v, tags: %v, fields: %v", metrics[0].Name(), metrics[0].Tags(), metrics[0].Fields())
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, len(parser.TagKeys), len(metrics[0].Tags())) require.Equal(t, len(parser.TagKeys), len(metrics[0].Tags()))
} }

View File

@ -56,6 +56,8 @@ type Config struct {
// TagKeys only apply to JSON data // TagKeys only apply to JSON data
TagKeys []string TagKeys []string
// FieldKeys only apply to JSON
FieldKeys []string
// MetricName applies to JSON & value. This will be the name of the measurement. // MetricName applies to JSON & value. This will be the name of the measurement.
MetricName string MetricName string
@ -95,8 +97,8 @@ func NewParser(config *Config) (Parser, error) {
var parser Parser var parser Parser
switch config.DataFormat { switch config.DataFormat {
case "json": case "json":
parser, err = NewJSONParser(config.MetricName, parser, err = newJSONParser(config.MetricName,
config.TagKeys, config.DefaultTags) config.TagKeys, config.FieldKeys, config.DefaultTags)
case "value": case "value":
parser, err = NewValueParser(config.MetricName, parser, err = NewValueParser(config.MetricName,
config.DataType, config.DefaultTags) config.DataType, config.DefaultTags)
@ -126,6 +128,22 @@ func NewParser(config *Config) (Parser, error) {
return parser, err return parser, err
} }
func newJSONParser(
metricName string,
tagKeys []string,
fieldKeys []string,
defaultTags map[string]string,
) (Parser, error) {
parser := &json.JSONParser{
MetricName: metricName,
TagKeys: tagKeys,
FieldKeys: fieldKeys,
DefaultTags: defaultTags,
}
return parser, nil
}
//Deprecated: Use NewParser to get a JSONParser object
func NewJSONParser( func NewJSONParser(
metricName string, metricName string,
tagKeys []string, tagKeys []string,