add gjson functionality with toml added to internal.config

This commit is contained in:
Max U 2018-07-02 09:43:32 -07:00
parent 342d3d633a
commit 92e156c784
5 changed files with 156 additions and 79 deletions

View File

@ -1338,6 +1338,58 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
} }
} }
c.GJSONTagPaths = make(map[string]string)
if node, ok := tbl.Fields["gjson_tag_paths"]; ok {
if subtbl, ok := node.(*ast.Table); ok {
for name, val := range subtbl.Fields {
if kv, ok := val.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.GJSONTagPaths[name] = str.Value
}
}
}
}
}
c.GJSONBoolPaths = make(map[string]string)
if node, ok := tbl.Fields["gjson_bool_paths"]; ok {
if subtbl, ok := node.(*ast.Table); ok {
for name, val := range subtbl.Fields {
if kv, ok := val.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.GJSONBoolPaths[name] = str.Value
}
}
}
}
}
c.GJSONFloatPaths = make(map[string]string)
if node, ok := tbl.Fields["gjson_float_paths"]; ok {
if subtbl, ok := node.(*ast.Table); ok {
for name, val := range subtbl.Fields {
if kv, ok := val.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.GJSONFloatPaths[name] = str.Value
}
}
}
}
}
c.GJSONStringPaths = make(map[string]string)
if node, ok := tbl.Fields["gjson_string_paths"]; ok {
if subtbl, ok := node.(*ast.Table); ok {
for name, val := range subtbl.Fields {
if kv, ok := val.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.GJSONStringPaths[name] = str.Value
}
}
}
}
}
c.MetricName = name c.MetricName = name
delete(tbl.Fields, "data_format") delete(tbl.Fields, "data_format")
@ -1353,6 +1405,10 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
delete(tbl.Fields, "dropwizard_time_format") delete(tbl.Fields, "dropwizard_time_format")
delete(tbl.Fields, "dropwizard_tags_path") delete(tbl.Fields, "dropwizard_tags_path")
delete(tbl.Fields, "dropwizard_tag_paths") delete(tbl.Fields, "dropwizard_tag_paths")
delete(tbl.Fields, "gjson_tag_paths")
delete(tbl.Fields, "gjson_bool_paths")
delete(tbl.Fields, "gjson_float_paths")
delete(tbl.Fields, "gjson_string_paths")
return parsers.NewParser(c) return parsers.NewParser(c)
} }

View File

@ -0,0 +1,67 @@
package gjson
import (
"log"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/tidwall/gjson"
)
type JSONPath struct {
MetricName string
TagPath map[string]string
FloatPath map[string]string
StrPath map[string]string
BoolPath map[string]string
DefaultTags map[string]string
}
func (j *JSONPath) Parse(buf []byte) ([]telegraf.Metric, error) {
tags := j.DefaultTags
fields := make(map[string]interface{})
metrics := make([]telegraf.Metric, 0)
for k, v := range j.TagPath {
c := gjson.GetBytes(buf, v)
tags[k] = c.Str
}
for k, v := range j.FloatPath {
c := gjson.GetBytes(buf, v)
fields[k] = c.Num
}
for k, v := range j.BoolPath {
c := gjson.GetBytes(buf, v)
if c.String() == "true" {
fields[k] = true
} else if c.String() == "false" {
fields[k] = false
} else {
log.Printf("E! Cannot decode: %v as bool", c.String())
}
}
for k, v := range j.StrPath {
c := gjson.GetBytes(buf, v)
fields[k] = c.Str
}
m, err := metric.New(j.MetricName, tags, fields, time.Now())
if err != nil {
return nil, err
}
metrics = append(metrics, m)
return metrics, nil
}
func (j *JSONPath) ParseLine(str string) (telegraf.Metric, error) {
m, err := j.Parse([]byte(str))
return m[0], err
}
func (j *JSONPath) SetDefaultTags(tags map[string]string) {
j.DefaultTags = tags
}

View File

@ -1,4 +1,4 @@
package jsonpath package gjson
import ( import (
"log" "log"
@ -17,7 +17,7 @@ func TestParseJsonPath(t *testing.T) {
"rejected": 0, "rejected": 0,
"avg_find_time": 4, "avg_find_time": 4,
"tester": "work", "tester": "work",
"tester2": "don't want this", "tester2": true,
"tester3": { "tester3": {
"hello":"sup", "hello":"sup",
"fun":"money", "fun":"money",
@ -28,7 +28,8 @@ func TestParseJsonPath(t *testing.T) {
jsonParser := JSONPath{ jsonParser := JSONPath{
MetricName: "jsonpather", MetricName: "jsonpather",
TagPath: map[string]string{"total": "$.shares.tester3"}, TagPath: map[string]string{"hello": "shares.tester3.hello"},
BoolPath: map[string]string{"bool": "shares.tester2"},
} }
metrics, err := jsonParser.Parse([]byte(testString)) metrics, err := jsonParser.Parse([]byte(testString))

View File

@ -1,76 +0,0 @@
package jsonpath
import (
"encoding/json"
"log"
"reflect"
"strconv"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/jsonpath"
)
type JSONPath struct {
MetricName string
TagPath map[string]string
FieldPath map[string]string
}
func (j *JSONPath) Parse(buf []byte) ([]telegraf.Metric, error) {
tags := make(map[string]string)
fields := make(map[string]interface{})
metrics := make([]telegraf.Metric, 0)
//turn buf into json interface
var jsonData interface{}
json.Unmarshal(buf, &jsonData)
log.Printf("unmarshaled jsonData: %v", jsonData)
for k, v := range j.TagPath {
c, err := jsonpath.JsonPathLookup(jsonData, v)
if err != nil {
log.Printf("E! Could not find JSON Path: %v", v)
}
cType := reflect.TypeOf(c)
//if path returns multiple values, split each into a different metric
if cType.Kind() == reflect.Array {
log.Printf("E! Multiple return values for path: %v", v)
continue
}
switch ct := c.(type) {
case string:
tags[k] = ct
case bool:
tags[k] = strconv.FormatBool(ct)
case float64:
tags[k] = strconv.FormatFloat(ct, 'f', -1, 64)
default:
log.Printf("E! [parsers.json] Unrecognized type %T", ct)
}
}
for k, v := range j.FieldPath {
c, err := jsonpath.JsonPathLookup(jsonData, v)
if err != nil {
log.Printf("E! Could not find JSON Path: %v", v)
continue
}
cType := reflect.TypeOf(c)
//if path returns multiple values, split each into a different metric
if cType.Kind() == reflect.Array {
log.Printf("E! Multiple return values for path: %v", v)
continue
}
fields[k] = c
}
m, _ := metric.New(j.MetricName, tags, fields, time.Now())
metrics = append(metrics, m)
return metrics, nil
}

View File

@ -7,6 +7,7 @@ import (
"github.com/influxdata/telegraf/plugins/parsers/collectd" "github.com/influxdata/telegraf/plugins/parsers/collectd"
"github.com/influxdata/telegraf/plugins/parsers/dropwizard" "github.com/influxdata/telegraf/plugins/parsers/dropwizard"
"github.com/influxdata/telegraf/plugins/parsers/gjson"
"github.com/influxdata/telegraf/plugins/parsers/graphite" "github.com/influxdata/telegraf/plugins/parsers/graphite"
"github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/parsers/json" "github.com/influxdata/telegraf/plugins/parsers/json"
@ -87,6 +88,12 @@ type Config struct {
// an optional map containing tag names as keys and json paths to retrieve the tag values from as values // an optional map containing tag names as keys and json paths to retrieve the tag values from as values
// used if TagsPath is empty or doesn't return any tags // used if TagsPath is empty or doesn't return any tags
DropwizardTagPathsMap map[string]string DropwizardTagPathsMap map[string]string
//for gjson format
TagPaths map[string]string
BoolPaths map[string]string
FloatPaths map[string]string
StringPaths map[string]string
} }
// NewParser returns a Parser interface based on the given config. // NewParser returns a Parser interface based on the given config.
@ -120,12 +127,34 @@ func NewParser(config *Config) (Parser, error) {
config.DefaultTags, config.DefaultTags,
config.Separator, config.Separator,
config.Templates) config.Templates)
case "gjson":
parser, err = newGJSONParser(config.MetricName,
config.TagPaths,
config.StringPaths,
config.BoolPaths,
config.FloatPaths)
default: default:
err = fmt.Errorf("Invalid data format: %s", config.DataFormat) err = fmt.Errorf("Invalid data format: %s", config.DataFormat)
} }
return parser, err return parser, err
} }
func newGJSONParser(metricName string,
tagPaths map[string]string,
strPaths map[string]string,
boolPaths map[string]string,
floatPaths map[string]string) (Parser, error) {
parser := &gjson.JSONPath{
MetricName: metricName,
TagPath: tagPaths,
StrPath: strPaths,
BoolPath: boolPaths,
FloatPath: floatPaths,
}
return parser, nil
}
func NewJSONParser( func NewJSONParser(
metricName string, metricName string,
tagKeys []string, tagKeys []string,