From 92e156c784a65aece29a4cf6a21008f9d8e190df Mon Sep 17 00:00:00 2001 From: Max U Date: Mon, 2 Jul 2018 09:43:32 -0700 Subject: [PATCH] add gjson functionality with toml added to internal.config --- internal/config/config.go | 56 ++++++++++++++ plugins/parsers/gjson/parser.go | 67 ++++++++++++++++ .../{jsonpath => gjson}/parser_test.go | 7 +- plugins/parsers/jsonpath/parser.go | 76 ------------------- plugins/parsers/registry.go | 29 +++++++ 5 files changed, 156 insertions(+), 79 deletions(-) create mode 100644 plugins/parsers/gjson/parser.go rename plugins/parsers/{jsonpath => gjson}/parser_test.go (79%) delete mode 100644 plugins/parsers/jsonpath/parser.go diff --git a/internal/config/config.go b/internal/config/config.go index 8a31c271e..aa09e66e0 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -1338,6 +1338,58 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) { } } + c.GJSONTagPaths = make(map[string]string) + if node, ok := tbl.Fields["gjson_tag_paths"]; ok { + if subtbl, ok := node.(*ast.Table); ok { + for name, val := range subtbl.Fields { + if kv, ok := val.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + c.GJSONTagPaths[name] = str.Value + } + } + } + } + } + + c.GJSONBoolPaths = make(map[string]string) + if node, ok := tbl.Fields["gjson_bool_paths"]; ok { + if subtbl, ok := node.(*ast.Table); ok { + for name, val := range subtbl.Fields { + if kv, ok := val.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + c.GJSONBoolPaths[name] = str.Value + } + } + } + } + } + + c.GJSONFloatPaths = make(map[string]string) + if node, ok := tbl.Fields["gjson_float_paths"]; ok { + if subtbl, ok := node.(*ast.Table); ok { + for name, val := range subtbl.Fields { + if kv, ok := val.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + c.GJSONFloatPaths[name] = str.Value + } + } + } + } + } + + c.GJSONStringPaths = make(map[string]string) + if node, ok := tbl.Fields["gjson_string_paths"]; ok { + if subtbl, ok := node.(*ast.Table); ok { + for name, val := range subtbl.Fields { + if kv, ok := val.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + c.GJSONStringPaths[name] = str.Value + } + } + } + } + } + c.MetricName = name delete(tbl.Fields, "data_format") @@ -1353,6 +1405,10 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) { delete(tbl.Fields, "dropwizard_time_format") delete(tbl.Fields, "dropwizard_tags_path") delete(tbl.Fields, "dropwizard_tag_paths") + delete(tbl.Fields, "gjson_tag_paths") + delete(tbl.Fields, "gjson_bool_paths") + delete(tbl.Fields, "gjson_float_paths") + delete(tbl.Fields, "gjson_string_paths") return parsers.NewParser(c) } diff --git a/plugins/parsers/gjson/parser.go b/plugins/parsers/gjson/parser.go new file mode 100644 index 000000000..56badb61c --- /dev/null +++ b/plugins/parsers/gjson/parser.go @@ -0,0 +1,67 @@ +package gjson + +import ( + "log" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" + "github.com/tidwall/gjson" +) + +type JSONPath struct { + MetricName string + TagPath map[string]string + FloatPath map[string]string + StrPath map[string]string + BoolPath map[string]string + DefaultTags map[string]string +} + +func (j *JSONPath) Parse(buf []byte) ([]telegraf.Metric, error) { + tags := j.DefaultTags + fields := make(map[string]interface{}) + metrics := make([]telegraf.Metric, 0) + + for k, v := range j.TagPath { + c := gjson.GetBytes(buf, v) + tags[k] = c.Str + } + + for k, v := range j.FloatPath { + c := gjson.GetBytes(buf, v) + fields[k] = c.Num + } + + for k, v := range j.BoolPath { + c := gjson.GetBytes(buf, v) + if c.String() == "true" { + fields[k] = true + } else if c.String() == "false" { + fields[k] = false + } else { + log.Printf("E! Cannot decode: %v as bool", c.String()) + } + } + + for k, v := range j.StrPath { + c := gjson.GetBytes(buf, v) + fields[k] = c.Str + } + + m, err := metric.New(j.MetricName, tags, fields, time.Now()) + if err != nil { + return nil, err + } + metrics = append(metrics, m) + return metrics, nil +} + +func (j *JSONPath) ParseLine(str string) (telegraf.Metric, error) { + m, err := j.Parse([]byte(str)) + return m[0], err +} + +func (j *JSONPath) SetDefaultTags(tags map[string]string) { + j.DefaultTags = tags +} diff --git a/plugins/parsers/jsonpath/parser_test.go b/plugins/parsers/gjson/parser_test.go similarity index 79% rename from plugins/parsers/jsonpath/parser_test.go rename to plugins/parsers/gjson/parser_test.go index c404758f8..c56288556 100644 --- a/plugins/parsers/jsonpath/parser_test.go +++ b/plugins/parsers/gjson/parser_test.go @@ -1,4 +1,4 @@ -package jsonpath +package gjson import ( "log" @@ -17,7 +17,7 @@ func TestParseJsonPath(t *testing.T) { "rejected": 0, "avg_find_time": 4, "tester": "work", - "tester2": "don't want this", + "tester2": true, "tester3": { "hello":"sup", "fun":"money", @@ -28,7 +28,8 @@ func TestParseJsonPath(t *testing.T) { jsonParser := JSONPath{ MetricName: "jsonpather", - TagPath: map[string]string{"total": "$.shares.tester3"}, + TagPath: map[string]string{"hello": "shares.tester3.hello"}, + BoolPath: map[string]string{"bool": "shares.tester2"}, } metrics, err := jsonParser.Parse([]byte(testString)) diff --git a/plugins/parsers/jsonpath/parser.go b/plugins/parsers/jsonpath/parser.go deleted file mode 100644 index 608c5907e..000000000 --- a/plugins/parsers/jsonpath/parser.go +++ /dev/null @@ -1,76 +0,0 @@ -package jsonpath - -import ( - "encoding/json" - "log" - "reflect" - "strconv" - "time" - - "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/metric" - "github.com/jsonpath" -) - -type JSONPath struct { - MetricName string - TagPath map[string]string - FieldPath map[string]string -} - -func (j *JSONPath) Parse(buf []byte) ([]telegraf.Metric, error) { - tags := make(map[string]string) - fields := make(map[string]interface{}) - metrics := make([]telegraf.Metric, 0) - - //turn buf into json interface - var jsonData interface{} - json.Unmarshal(buf, &jsonData) - log.Printf("unmarshaled jsonData: %v", jsonData) - - for k, v := range j.TagPath { - c, err := jsonpath.JsonPathLookup(jsonData, v) - if err != nil { - log.Printf("E! Could not find JSON Path: %v", v) - } - cType := reflect.TypeOf(c) - - //if path returns multiple values, split each into a different metric - if cType.Kind() == reflect.Array { - log.Printf("E! Multiple return values for path: %v", v) - continue - } - - switch ct := c.(type) { - case string: - tags[k] = ct - case bool: - tags[k] = strconv.FormatBool(ct) - case float64: - tags[k] = strconv.FormatFloat(ct, 'f', -1, 64) - default: - log.Printf("E! [parsers.json] Unrecognized type %T", ct) - } - } - - for k, v := range j.FieldPath { - c, err := jsonpath.JsonPathLookup(jsonData, v) - if err != nil { - log.Printf("E! Could not find JSON Path: %v", v) - continue - } - - cType := reflect.TypeOf(c) - - //if path returns multiple values, split each into a different metric - if cType.Kind() == reflect.Array { - log.Printf("E! Multiple return values for path: %v", v) - continue - } - fields[k] = c - } - - m, _ := metric.New(j.MetricName, tags, fields, time.Now()) - metrics = append(metrics, m) - return metrics, nil -} diff --git a/plugins/parsers/registry.go b/plugins/parsers/registry.go index 58fce1722..4ff13e770 100644 --- a/plugins/parsers/registry.go +++ b/plugins/parsers/registry.go @@ -7,6 +7,7 @@ import ( "github.com/influxdata/telegraf/plugins/parsers/collectd" "github.com/influxdata/telegraf/plugins/parsers/dropwizard" + "github.com/influxdata/telegraf/plugins/parsers/gjson" "github.com/influxdata/telegraf/plugins/parsers/graphite" "github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/plugins/parsers/json" @@ -87,6 +88,12 @@ type Config struct { // an optional map containing tag names as keys and json paths to retrieve the tag values from as values // used if TagsPath is empty or doesn't return any tags DropwizardTagPathsMap map[string]string + + //for gjson format + TagPaths map[string]string + BoolPaths map[string]string + FloatPaths map[string]string + StringPaths map[string]string } // NewParser returns a Parser interface based on the given config. @@ -120,12 +127,34 @@ func NewParser(config *Config) (Parser, error) { config.DefaultTags, config.Separator, config.Templates) + + case "gjson": + parser, err = newGJSONParser(config.MetricName, + config.TagPaths, + config.StringPaths, + config.BoolPaths, + config.FloatPaths) default: err = fmt.Errorf("Invalid data format: %s", config.DataFormat) } return parser, err } +func newGJSONParser(metricName string, + tagPaths map[string]string, + strPaths map[string]string, + boolPaths map[string]string, + floatPaths map[string]string) (Parser, error) { + parser := &gjson.JSONPath{ + MetricName: metricName, + TagPath: tagPaths, + StrPath: strPaths, + BoolPath: boolPaths, + FloatPath: floatPaths, + } + return parser, nil +} + func NewJSONParser( metricName string, tagKeys []string,