Compare commits
8 Commits
feature/13...feature/js
| Author | SHA1 | Date |
|---|---|---|
| | 690c0b6673 | |
| | 407675c741 | |
| | b09fcc70c8 | |
| | 9836b1eb02 | |
| | a79f1b7e0d | |
| | d4a4ac25bb | |
| | 92e156c784 | |
| | 342d3d633a | |
@@ -4,6 +4,7 @@ Telegraf is able to parse the following input data formats into metrics:

 1. [InfluxDB Line Protocol](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#influx)
 1. [JSON](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#json)
+1. [GJSON](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#gjson)
 1. [Graphite](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#graphite)
 1. [Value](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#value), ie: 45 or "booyah"
 1. [Nagios](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#nagios) (exec input only)
@@ -104,10 +105,9 @@ but can be overridden using the `name_override` config option.

 #### JSON Configuration:

-The JSON data format supports specifying "tag keys" and "field keys". If specified, keys
-will be searched for in the root-level and any nested lists of the JSON blob. If the key(s) exist,
-they will be applied as tags or fields to the Telegraf metrics. If "field_keys" is not specified,
-all int and float values will be set as fields by default.
+The JSON data format supports specifying "tag keys". If specified, keys
+will be searched for in the root-level of the JSON blob. If the key(s) exist,
+they will be applied as tags to the Telegraf metrics.

 For example, if you had this configuration:

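For reference, the `tag_keys` configuration this hunk alludes to (it appears in full further down the README, outside this diff) is along these lines; the command and key names are illustrative:

```toml
[[inputs.exec]]
  ## Commands array
  commands = ["/usr/bin/mycollector --foo=bar"]

  ## Data format to consume.
  data_format = "json"

  ## List of tag names to extract from top-level keys of the JSON output
  tag_keys = [
    "my_tag_1",
    "my_tag_2"
  ]
```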
@@ -174,7 +174,6 @@ For example, if the following configuration:

 "my_tag_1",
 "my_tag_2"
 ]
-field_keys = ["b_c"]
 ```

 with this JSON output from a command:
@@ -200,13 +199,76 @@ with this JSON output from a command:

 ]
 ```

-Your Telegraf metrics would get tagged with "my_tag_1" and "my_tag_2" and fielded with "b_c"
+Your Telegraf metrics would get tagged with "my_tag_1" and "my_tag_2"

 ```
-exec_mycollector,my_tag_1=foo,my_tag_2=baz b_c=6
-exec_mycollector,my_tag_1=bar,my_tag_2=baz b_c=8
+exec_mycollector,my_tag_1=foo,my_tag_2=baz a=5,b_c=6
+exec_mycollector,my_tag_1=bar,my_tag_2=baz a=7,b_c=8
 ```

+
+# GJSON:
+
+GJSON also parses JSON data, but uses paths to name and identify fields of your choosing.
+
+The GJSON parser supports 5 different configuration fields for JSON objects:
+
+1. `gjson_tag_paths`
+2. `gjson_string_paths`
+3. `gjson_int_paths`
+4. `gjson_float_paths`
+5. `gjson_bool_paths`
+
+Each field is a map that maps a field name to a field path. Path syntax is described below.
+Path maps should be configured as:
+
+```toml
+gjson_tag_paths = {"field_name" = "field.path", "field_name2" = "field.path2"}
+```
+
+Any paths specified in gjson_tag_paths will be converted to strings and stored as tags.
+Any paths otherwise specified will be parsed as their marked type and stored as fields.
+
+#### GJSON Configuration:
+
+Paths are a series of keys separated by a dot, i.e. "obj.sub_obj".
+Paths should not lead to a JSON array, but to a single object.
+An error message will be logged and the path skipped if a path describes an array.
+Further reading for path syntax can be found here: https://github.com/tidwall/gjson
+
+As an example, if you had the JSON:
+
+```json
+{
+  "name": {"first": "Tom", "last": "Anderson"},
+  "age": 37,
+  "children": ["Sara","Alex","Jack"],
+  "fav.movie": "Deer Hunter",
+  "friends": [
+    {"first": "Dale", "last": "Murphy", "age": 44},
+    {"first": "Roger", "last": "Craig", "age": 68},
+    {"first": "Jane", "last": "Murphy", "age": 47}
+  ]
+}
+```
+
+with the config:
+
+```toml
+[[inputs.exec]]
+  ## Commands array
+  commands = ["/usr/bin/mycollector --foo=bar"]
+
+  ## Data format to consume.
+  ## Each data format has its own unique set of configuration options, read
+  ## more about them here:
+  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
+  data_format = "gjson"
+
+  name_override = "gjson_sample"
+
+  gjson_tag_paths = {"first_name_tag" = "name.first"}
+  gjson_string_paths = {"last_name" = "name.last"}
+  gjson_int_paths = {"age" = "age", "Janes_age" = "friends.2.age"}
+```
+
+would output the metric:
+
+`gjson_sample,first_name_tag=Tom last_name="Anderson",age=37,Janes_age=47`
+
 # Value:

 The "value" data format translates single values into Telegraf metrics. This
@@ -1261,18 +1261,6 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
 		}
 	}

-	if node, ok := tbl.Fields["field_keys"]; ok {
-		if kv, ok := node.(*ast.KeyValue); ok {
-			if ary, ok := kv.Value.(*ast.Array); ok {
-				for _, elem := range ary.Value {
-					if str, ok := elem.(*ast.String); ok {
-						c.FieldKeys = append(c.FieldKeys, str.Value)
-					}
-				}
-			}
-		}
-	}
-
 	if node, ok := tbl.Fields["data_type"]; ok {
 		if kv, ok := node.(*ast.KeyValue); ok {
 			if str, ok := kv.Value.(*ast.String); ok {
@@ -1350,13 +1338,77 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
 		}
 	}

+	c.GJSONTagPaths = make(map[string]string)
+	if node, ok := tbl.Fields["gjson_tag_paths"]; ok {
+		if subtbl, ok := node.(*ast.Table); ok {
+			for name, val := range subtbl.Fields {
+				if kv, ok := val.(*ast.KeyValue); ok {
+					if str, ok := kv.Value.(*ast.String); ok {
+						c.GJSONTagPaths[name] = str.Value
+					}
+				}
+			}
+		}
+	}
+
+	c.GJSONBoolPaths = make(map[string]string)
+	if node, ok := tbl.Fields["gjson_bool_paths"]; ok {
+		if subtbl, ok := node.(*ast.Table); ok {
+			for name, val := range subtbl.Fields {
+				if kv, ok := val.(*ast.KeyValue); ok {
+					if str, ok := kv.Value.(*ast.String); ok {
+						c.GJSONBoolPaths[name] = str.Value
+					}
+				}
+			}
+		}
+	}
+
+	c.GJSONFloatPaths = make(map[string]string)
+	if node, ok := tbl.Fields["gjson_float_paths"]; ok {
+		if subtbl, ok := node.(*ast.Table); ok {
+			for name, val := range subtbl.Fields {
+				if kv, ok := val.(*ast.KeyValue); ok {
+					if str, ok := kv.Value.(*ast.String); ok {
+						c.GJSONFloatPaths[name] = str.Value
+					}
+				}
+			}
+		}
+	}
+
+	c.GJSONStringPaths = make(map[string]string)
+	if node, ok := tbl.Fields["gjson_string_paths"]; ok {
+		if subtbl, ok := node.(*ast.Table); ok {
+			for name, val := range subtbl.Fields {
+				if kv, ok := val.(*ast.KeyValue); ok {
+					if str, ok := kv.Value.(*ast.String); ok {
+						c.GJSONStringPaths[name] = str.Value
+					}
+				}
+			}
+		}
+	}
+
+	c.GJSONIntPaths = make(map[string]string)
+	if node, ok := tbl.Fields["gjson_int_paths"]; ok {
+		if subtbl, ok := node.(*ast.Table); ok {
+			for name, val := range subtbl.Fields {
+				if kv, ok := val.(*ast.KeyValue); ok {
+					if str, ok := kv.Value.(*ast.String); ok {
+						c.GJSONIntPaths[name] = str.Value
+					}
+				}
+			}
+		}
+	}
+
 	c.MetricName = name

 	delete(tbl.Fields, "data_format")
 	delete(tbl.Fields, "separator")
 	delete(tbl.Fields, "templates")
 	delete(tbl.Fields, "tag_keys")
-	delete(tbl.Fields, "field_keys")
 	delete(tbl.Fields, "data_type")
 	delete(tbl.Fields, "collectd_auth_file")
 	delete(tbl.Fields, "collectd_security_level")
@@ -1366,6 +1418,11 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
 	delete(tbl.Fields, "dropwizard_time_format")
 	delete(tbl.Fields, "dropwizard_tags_path")
 	delete(tbl.Fields, "dropwizard_tag_paths")
+	delete(tbl.Fields, "gjson_tag_paths")
+	delete(tbl.Fields, "gjson_bool_paths")
+	delete(tbl.Fields, "gjson_float_paths")
+	delete(tbl.Fields, "gjson_string_paths")
+	delete(tbl.Fields, "gjson_int_paths")

 	return parsers.NewParser(c)
 }
@@ -143,10 +143,7 @@ func TestConfig_LoadDirectory(t *testing.T) {
 		"Testdata did not produce correct memcached metadata.")

 	ex := inputs.Inputs["exec"]().(*exec.Exec)
-	p, err := parsers.NewParser(&parsers.Config{
-		MetricName: "exec",
-		DataFormat: "json",
-	})
+	p, err := parsers.NewJSONParser("exec", nil, nil)
 	assert.NoError(t, err)
 	ex.SetParser(p)
 	ex.Command = "/usr/bin/myothercollector --foo=bar"
@@ -93,10 +93,7 @@ func (r runnerMock) Run(e *Exec, command string, acc telegraf.Accumulator) ([]by
 }

 func TestExec(t *testing.T) {
-	parser, _ := parsers.NewParser(&parsers.Config{
-		DataFormat: "json",
-		MetricName: "exec",
-	})
+	parser, _ := parsers.NewJSONParser("exec", []string{}, nil)
 	e := &Exec{
 		runner:   newRunnerMock([]byte(validJson), nil),
 		Commands: []string{"testcommand arg1"},
@@ -122,10 +119,7 @@ func TestExec(t *testing.T) {
 }

 func TestExecMalformed(t *testing.T) {
-	parser, _ := parsers.NewParser(&parsers.Config{
-		DataFormat: "json",
-		MetricName: "exec",
-	})
+	parser, _ := parsers.NewJSONParser("exec", []string{}, nil)
 	e := &Exec{
 		runner:   newRunnerMock([]byte(malformedJson), nil),
 		Commands: []string{"badcommand arg1"},
@@ -138,10 +132,7 @@ func TestExecMalformed(t *testing.T) {
 }

 func TestCommandError(t *testing.T) {
-	parser, _ := parsers.NewParser(&parsers.Config{
-		DataFormat: "json",
-		MetricName: "exec",
-	})
+	parser, _ := parsers.NewJSONParser("exec", []string{}, nil)
 	e := &Exec{
 		runner:   newRunnerMock(nil, fmt.Errorf("exit status code 1")),
 		Commands: []string{"badcommand"},
@@ -26,11 +26,7 @@ func TestHTTPwithJSONFormat(t *testing.T) {
 		URLs: []string{url},
 	}
 	metricName := "metricName"
-	p, _ := parsers.NewParser(&parsers.Config{
-		DataFormat: "json",
-		MetricName: "metricName",
-	})
+	p, _ := parsers.NewJSONParser(metricName, nil, nil)
 	plugin.SetParser(p)

 	var acc testutil.Accumulator
@@ -67,11 +63,8 @@ func TestHTTPHeaders(t *testing.T) {
 		URLs:    []string{url},
 		Headers: map[string]string{header: headerValue},
 	}
-	p, _ := parsers.NewParser(&parsers.Config{
-		DataFormat: "json",
-		MetricName: "metricName",
-	})
+	metricName := "metricName"
+	p, _ := parsers.NewJSONParser(metricName, nil, nil)
 	plugin.SetParser(p)

 	var acc testutil.Accumulator
@@ -90,10 +83,7 @@ func TestInvalidStatusCode(t *testing.T) {
 	}

 	metricName := "metricName"
-	p, _ := parsers.NewParser(&parsers.Config{
-		DataFormat: "json",
-		MetricName: metricName,
-	})
+	p, _ := parsers.NewJSONParser(metricName, nil, nil)
 	plugin.SetParser(p)

 	var acc testutil.Accumulator
@@ -115,10 +105,8 @@ func TestMethod(t *testing.T) {
 		Method: "POST",
 	}

-	p, _ := parsers.NewParser(&parsers.Config{
-		DataFormat: "json",
-		MetricName: "metricName",
-	})
+	metricName := "metricName"
+	p, _ := parsers.NewJSONParser(metricName, nil, nil)
 	plugin.SetParser(p)

 	var acc testutil.Accumulator
@@ -181,12 +181,7 @@ func (h *HttpJson) gatherServer(
 		"server": serverURL,
 	}

-	parser, err := parsers.NewParser(&parsers.Config{
-		DataFormat:  "json",
-		MetricName:  msrmnt_name,
-		TagKeys:     h.TagKeys,
-		DefaultTags: tags,
-	})
+	parser, err := parsers.NewJSONParser(msrmnt_name, h.TagKeys, tags)
 	if err != nil {
 		return err
 	}
@@ -125,10 +125,7 @@ func TestRunParserAndGatherJSON(t *testing.T) {
 	k.acc = &acc
 	defer close(k.done)

-	k.parser, _ = parsers.NewParser(&parsers.Config{
-		DataFormat: "json",
-		MetricName: "kafka_json_test",
-	})
+	k.parser, _ = parsers.NewJSONParser("kafka_json_test", []string{}, nil)
 	go k.receiver()
 	in <- saramaMsg(testMsgJSON)
 	acc.Wait(1)
@@ -125,10 +125,7 @@ func TestRunParserAndGatherJSON(t *testing.T) {
 	k.acc = &acc
 	defer close(k.done)

-	k.parser, _ = parsers.NewParser(&parsers.Config{
-		DataFormat: "json",
-		MetricName: "kafka_json_test",
-	})
+	k.parser, _ = parsers.NewJSONParser("kafka_json_test", []string{}, nil)
 	go k.receiver()
 	in <- saramaMsg(testMsgJSON)
 	acc.Wait(1)
@@ -172,10 +172,7 @@ func TestRunParserAndGatherJSON(t *testing.T) {
 	n.acc = &acc
 	defer close(n.done)

-	n.parser, _ = parsers.NewParser(&parsers.Config{
-		DataFormat: "json",
-		MetricName: "nats_json_test",
-	})
+	n.parser, _ = parsers.NewJSONParser("nats_json_test", []string{}, nil)
 	go n.receiver()
 	in <- mqttMsg(testMsgJSON)

@@ -108,10 +108,7 @@ func TestRunParserAndGatherJSON(t *testing.T) {
 	n.acc = &acc
 	defer close(n.done)

-	n.parser, _ = parsers.NewParser(&parsers.Config{
-		DataFormat: "json",
-		MetricName: "nats_json_test",
-	})
+	n.parser, _ = parsers.NewJSONParser("nats_json_test", []string{}, nil)
 	n.wg.Add(1)
 	go n.receiver()
 	in <- natsMsg(testMsgJSON)
@@ -300,10 +300,7 @@ func TestRunParserJSONMsg(t *testing.T) {
 	listener.acc = &acc
 	defer close(listener.done)

-	listener.parser, _ = parsers.NewParser(&parsers.Config{
-		DataFormat: "json",
-		MetricName: "udp_json_test",
-	})
+	listener.parser, _ = parsers.NewJSONParser("udp_json_test", []string{}, nil)
 	listener.wg.Add(1)
 	go listener.tcpParser()

@@ -193,10 +193,7 @@ func TestRunParserJSONMsg(t *testing.T) {
 	listener.acc = &acc
 	defer close(listener.done)

-	listener.parser, _ = parsers.NewParser(&parsers.Config{
-		DataFormat: "json",
-		MetricName: "udp_json_test",
-	})
+	listener.parser, _ = parsers.NewJSONParser("udp_json_test", []string{}, nil)
 	listener.wg.Add(1)
 	go listener.udpParser()

plugins/parsers/gjson/parser.go (new file, 96 lines)
@@ -0,0 +1,96 @@
+package gjson
+
+import (
+	"log"
+	"time"
+
+	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/metric"
+	"github.com/tidwall/gjson"
+)
+
+type JSONPath struct {
+	MetricName  string
+	TagPath     map[string]string
+	FloatPath   map[string]string
+	IntPath     map[string]string
+	StrPath     map[string]string
+	BoolPath    map[string]string
+	DefaultTags map[string]string
+}
+
+func (j *JSONPath) Parse(buf []byte) ([]telegraf.Metric, error) {
+	tags := make(map[string]string)
+	for k, v := range j.DefaultTags {
+		tags[k] = v
+	}
+	fields := make(map[string]interface{})
+	metrics := make([]telegraf.Metric, 0)
+
+	for k, v := range j.TagPath {
+		c := gjson.GetBytes(buf, v)
+		if c.IsArray() {
+			log.Printf("E! GJSON cannot assign array to tag on path: %v", v)
+			continue
+		}
+		tags[k] = c.String()
+	}
+
+	for k, v := range j.FloatPath {
+		c := gjson.GetBytes(buf, v)
+		if c.IsArray() {
+			log.Printf("E! GJSON cannot assign array to field on path: %v", v)
+			continue
+		}
+		fields[k] = c.Float()
+	}
+
+	for k, v := range j.IntPath {
+		c := gjson.GetBytes(buf, v)
+		if c.IsArray() {
+			log.Printf("E! GJSON cannot assign array to field on path: %v", v)
+			continue
+		}
+		fields[k] = c.Int()
+	}
+
+	for k, v := range j.BoolPath {
+		c := gjson.GetBytes(buf, v)
+		if c.IsArray() {
+			log.Printf("E! GJSON cannot assign array to field on path: %v", v)
+			continue
+		}
+		if c.String() == "true" {
+			fields[k] = true
+		} else if c.String() == "false" {
+			fields[k] = false
+		} else {
+			log.Printf("E! Cannot decode: %v as bool", c.String())
+		}
+	}
+
+	for k, v := range j.StrPath {
+		c := gjson.GetBytes(buf, v)
+		if c.IsArray() {
+			log.Printf("E! GJSON cannot assign array to field on path: %v", v)
+			continue
+		}
+		fields[k] = c.String()
+	}
+
+	m, err := metric.New(j.MetricName, tags, fields, time.Now())
+	if err != nil {
+		return nil, err
+	}
+	metrics = append(metrics, m)
+	return metrics, nil
+}
+
+func (j *JSONPath) ParseLine(str string) (telegraf.Metric, error) {
+	m, err := j.Parse([]byte(str))
+	if err != nil {
+		return nil, err
+	}
+	return m[0], nil
+}
+
+func (j *JSONPath) SetDefaultTags(tags map[string]string) {
+	j.DefaultTags = tags
+}
plugins/parsers/gjson/parser_test.go (new file, 72 lines)
@@ -0,0 +1,72 @@
+package gjson
+
+import (
+	"log"
+	"reflect"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestParseJsonPath(t *testing.T) {
+	testString := `{
+		"total_devices": 5,
+		"total_threads": 10,
+		"shares": {
+			"total": 5,
+			"accepted": 5,
+			"rejected": 0,
+			"avg_find_time": 4,
+			"tester": "work",
+			"tester2": true,
+			"tester3": {
+				"hello":"sup",
+				"fun":"money",
+				"break":9
+			}
+		}
+	}`
+
+	jsonParser := JSONPath{
+		MetricName: "jsonpather",
+		TagPath:    map[string]string{"hello": "shares.tester3.hello"},
+		BoolPath:   map[string]string{"bool": "shares.tester2"},
+	}
+
+	metrics, err := jsonParser.Parse([]byte(testString))
+	assert.NoError(t, err)
+	log.Printf("m[0] name: %v, tags: %v, fields: %v", metrics[0].Name(), metrics[0].Tags(), metrics[0].Fields())
+}
+
+func TestTagTypes(t *testing.T) {
+	testString := `{
+		"total_devices": 5,
+		"total_threads": 10,
+		"shares": {
+			"total": 5,
+			"accepted": 5,
+			"rejected": 0,
+			"my_bool": true,
+			"tester": "work",
+			"tester2": {
+				"hello":"sup",
+				"fun":true,
+				"break":9.97
+			}
+		}
+	}`
+
+	r := JSONPath{
+		TagPath:   map[string]string{"int1": "total_devices", "my_bool": "shares.my_bool"},
+		FloatPath: map[string]string{"total": "shares.total"},
+		BoolPath:  map[string]string{"fun": "shares.tester2.fun"},
+		StrPath:   map[string]string{"hello": "shares.tester2.hello"},
+		IntPath:   map[string]string{"accepted": "shares.accepted"},
+	}
+
+	metrics, err := r.Parse([]byte(testString))
+	assert.NoError(t, err)
+	log.Printf("m[0] name: %v, tags: %v, fields: %v", metrics[0].Name(), metrics[0].Tags(), metrics[0].Fields())
+	assert.Equal(t, true, reflect.DeepEqual(map[string]interface{}{"total": 5.0, "fun": true, "hello": "sup", "accepted": int64(5)}, metrics[0].Fields()))
+}
@@ -20,7 +20,6 @@ var (
 type JSONParser struct {
 	MetricName  string
 	TagKeys     []string
-	FieldKeys   []string
 	DefaultTags map[string]string
 }

@@ -87,17 +86,6 @@ func (p *JSONParser) switchFieldToTag(tags map[string]string, fields map[string]
 		}
 	}

-	//if field_keys is specified, only those values should be reported as fields
-	if len(p.FieldKeys) > 0 {
-		nFields := make(map[string]interface{})
-		for _, name := range p.FieldKeys {
-			if fields[name] != nil {
-				nFields[name] = fields[name]
-			}
-		}
-		return tags, nFields
-	}
-
 	//remove any additional string/bool values from fields
 	for k := range fields {
 		switch fields[k].(type) {
@@ -1,7 +1,6 @@
 package json

 import (
-	"log"
 	"testing"

 	"github.com/stretchr/testify/assert"
@@ -449,28 +448,22 @@ func TestJSONParseNestedArray(t *testing.T) {
 		"total_devices": 5,
 		"total_threads": 10,
 		"shares": {
 			"total": 5,
 			"accepted": 5,
 			"rejected": 0,
 			"avg_find_time": 4,
 			"tester": "work",
 			"tester2": "don't want this",
-			"tester3": {
-				"hello":"sup",
-				"fun":"money",
-				"break":9
-			}
+			"tester3": 7.93
 		}
 	}`

 	parser := JSONParser{
 		MetricName: "json_test",
-		TagKeys:    []string{"total_devices", "total_threads", "shares_tester3_fun"},
-		FieldKeys:  []string{"shares_tester", "shares_tester3_break"},
+		TagKeys:    []string{"total_devices", "total_threads", "shares_tester", "shares_tester3"},
 	}

 	metrics, err := parser.Parse([]byte(testString))
-	log.Printf("m[0] name: %v, tags: %v, fields: %v", metrics[0].Name(), metrics[0].Tags(), metrics[0].Fields())
 	require.NoError(t, err)
 	require.Equal(t, len(parser.TagKeys), len(metrics[0].Tags()))
 }
@@ -7,6 +7,7 @@ import (

 	"github.com/influxdata/telegraf/plugins/parsers/collectd"
 	"github.com/influxdata/telegraf/plugins/parsers/dropwizard"
+	"github.com/influxdata/telegraf/plugins/parsers/gjson"
 	"github.com/influxdata/telegraf/plugins/parsers/graphite"
 	"github.com/influxdata/telegraf/plugins/parsers/influx"
 	"github.com/influxdata/telegraf/plugins/parsers/json"
@@ -56,8 +57,6 @@ type Config struct {

 	// TagKeys only apply to JSON data
 	TagKeys []string
-	// FieldKeys only apply to JSON
-	FieldKeys []string
 	// MetricName applies to JSON & value. This will be the name of the measurement.
 	MetricName string

@@ -89,6 +88,13 @@ type Config struct {
 	// an optional map containing tag names as keys and json paths to retrieve the tag values from as values
 	// used if TagsPath is empty or doesn't return any tags
 	DropwizardTagPathsMap map[string]string
+
+	// for the gjson format
+	GJSONTagPaths    map[string]string
+	GJSONBoolPaths   map[string]string
+	GJSONFloatPaths  map[string]string
+	GJSONStringPaths map[string]string
+	GJSONIntPaths    map[string]string
 }

 // NewParser returns a Parser interface based on the given config.
@@ -97,8 +103,8 @@ func NewParser(config *Config) (Parser, error) {
 	var parser Parser
 	switch config.DataFormat {
 	case "json":
-		parser, err = newJSONParser(config.MetricName,
-			config.TagKeys, config.FieldKeys, config.DefaultTags)
+		parser, err = NewJSONParser(config.MetricName,
+			config.TagKeys, config.DefaultTags)
 	case "value":
 		parser, err = NewValueParser(config.MetricName,
 			config.DataType, config.DefaultTags)
@@ -122,28 +128,37 @@ func NewParser(config *Config) (Parser, error) {
 			config.DefaultTags,
 			config.Separator,
 			config.Templates)
+	case "gjson":
+		parser, err = newGJSONParser(config.MetricName,
+			config.GJSONTagPaths,
+			config.GJSONStringPaths,
+			config.GJSONBoolPaths,
+			config.GJSONFloatPaths,
+			config.GJSONIntPaths)
 	default:
 		err = fmt.Errorf("Invalid data format: %s", config.DataFormat)
 	}
 	return parser, err
 }

-func newJSONParser(
-	metricName string,
-	tagKeys []string,
-	fieldKeys []string,
-	defaultTags map[string]string,
-) (Parser, error) {
-	parser := &json.JSONParser{
-		MetricName:  metricName,
-		TagKeys:     tagKeys,
-		FieldKeys:   fieldKeys,
-		DefaultTags: defaultTags,
-	}
+func newGJSONParser(metricName string,
+	tagPaths map[string]string,
+	strPaths map[string]string,
+	boolPaths map[string]string,
+	floatPaths map[string]string,
+	intPaths map[string]string) (Parser, error) {
+	parser := &gjson.JSONPath{
+		MetricName: metricName,
+		TagPath:    tagPaths,
+		StrPath:    strPaths,
+		BoolPath:   boolPaths,
+		FloatPath:  floatPaths,
+		IntPath:    intPaths,
+	}
 	return parser, nil
 }

-//Deprecated: Use NewParser to get a JSONParser object
 func NewJSONParser(
 	metricName string,
 	tagKeys []string,