diff --git a/Godeps b/Godeps index 6a0a17df1..f3356ad11 100644 --- a/Godeps +++ b/Godeps @@ -29,6 +29,7 @@ github.com/influxdata/config b79f6829346b8d6e78ba73544b1e1038f1f1c9da github.com/influxdata/influxdb fc57c0f7c635df3873f3d64f0ed2100ddc94d5ae github.com/influxdata/toml af4df43894b16e3fd2b788d01bd27ad0776ef2d0 github.com/influxdata/wlog 7c63b0a71ef8300adc255344d275e10e5c3a71ec +github.com/jmoiron/jsonq e874b168d07ecc7808bc950a17998a8aa3141d82 github.com/kardianos/osext 29ae4ffbc9a6fe9fb2bc5029050ce6996ea1d3bc github.com/kardianos/service 5e335590050d6d00f3aa270217d288dda1c94d0a github.com/kballard/go-shellquote d8ec1a69a250a17bb0e419c386eac1f3711dc142 diff --git a/internal/config/config.go b/internal/config/config.go index 2c2199dac..b35935893 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -1221,6 +1221,18 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) { } } + if node, ok := tbl.Fields["json_paths"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if ary, ok := kv.Value.(*ast.Array); ok { + for _, elem := range ary.Value { + if str, ok := elem.(*ast.String); ok { + c.JSONPaths = append(c.JSONPaths, str.Value) + } + } + } + } + } + if node, ok := tbl.Fields["data_type"]; ok { if kv, ok := node.(*ast.KeyValue); ok { if str, ok := kv.Value.(*ast.String); ok { diff --git a/plugins/inputs/httpjson/README.md b/plugins/inputs/httpjson/README.md index c7c0e6797..cec7bf46a 100644 --- a/plugins/inputs/httpjson/README.md +++ b/plugins/inputs/httpjson/README.md @@ -39,6 +39,18 @@ You can also specify which keys from server response should be considered tags: If the JSON response is an array of objects, then each object will be parsed with the same configuration. +You can also specify paths to nested JSON objects and arrays of objects using dot-notation such that each object will be parsed with the same configuration. + +``` +[[inputs.httpjson]] + ... + + json_paths = [ + "path.to.my.metricsArr", + "path.to.my.metricsObj" + ] +``` + You can also specify additional request parameters for the service: ``` @@ -202,3 +214,34 @@ httpjson_mycollector_a,service='service02',server='http://my.service.com/_stats' httpjson_mycollector_b_d,service='service02',server='http://my.service.com/_stats' value=0.2 httpjson_mycollector_b_e,service='service02',server='http://my.service.com/_stats' value=6 ``` + +# Example 4, nested arrays with local and global tag keys in Response: + +The response JSON can be parsed to treat nested objects and arrays of objects as unique points with the top-level and object specific tags: + +``` +[[inputs.httpjson]] + name = "mycollector" + servers = ["http://my.service.com/_stats"] + method = "GET" + tag_keys = ["service", "tagA", "tagB"] + json_paths = ["metrics.myMetricsArr"] + +``` + +which responds with the following JSON: + +```json +{"service":"myservice", + "metrics":{"myMetricsArr": + [{"tagA":"ABC", "tagB":"XYZ", "value":1.0}, + {"tagA":"DEF", "tagB":"UVW", "value":2.0}] + } +} +``` + +The collected metrics will be: +``` +httpjson_mycollector,service=myservice,server=http://my.service.com/_stats,tagA=ABC,tagB=XYZ value=1.0 +httpjson_mycollector,service=myservice,server=http://my.service.com/_stats,tagA=DEF,tagB=UVW value=2.0 +``` diff --git a/plugins/inputs/httpjson/httpjson.go b/plugins/inputs/httpjson/httpjson.go index 89bfccf77..b5c57ca22 100644 --- a/plugins/inputs/httpjson/httpjson.go +++ b/plugins/inputs/httpjson/httpjson.go @@ -22,6 +22,7 @@ type HttpJson struct { Servers []string Method string TagKeys []string + JSONPaths []string ResponseTimeout internal.Duration Parameters map[string]string Headers map[string]string @@ -199,7 +200,12 @@ func (h *HttpJson) gatherServer( if err != nil { return err } - + if len(h.JSONPaths) > 0 { + parser, err = parsers.NewJSONQParser(msrmnt_name, h.TagKeys, tags, h.JSONPaths) + if err != nil { + return err + } + } metrics, err := parser.Parse([]byte(resp)) if err != nil { return err diff --git a/plugins/inputs/httpjson/httpjson_test.go b/plugins/inputs/httpjson/httpjson_test.go index 0029eb3e9..8a1ddef52 100644 --- a/plugins/inputs/httpjson/httpjson_test.go +++ b/plugins/inputs/httpjson/httpjson_test.go @@ -201,6 +201,33 @@ func genMockHttpJson(response string, statusCode int) []*HttpJson { "build", }, }, + &HttpJson{ + client: &mockHTTPClient{responseBody: response, statusCode: statusCode}, + Servers: []string{ + "http://server5.example.com/metrics/", + "http://server6.example.com/metrics/", + }, + Name: "other_other_webapp", + Method: "POST", + Parameters: map[string]string{ + "httpParam1": "12", + "httpParam2": "the second parameter", + }, + Headers: map[string]string{ + "X-Auth-Token": "the-first-parameter", + "apiVersion": "v1", + }, + TagKeys: []string{ + "service", + "tagA", + "tagB", + "tagC", + }, + JSONPaths: []string{ + "metrics.myMetricsArr", + "metrics.myMetricsObj", + }, + }, } } @@ -560,3 +587,58 @@ func TestHttpJsonArray200Tags(t *testing.T) { } } } + +const validJSON3 = ` +{"service":"myservice", + "notATag":"abcd", + "notATag2":1.234, + "metrics":{"myMetricsArr": [{"tagA":"ABC", "tagB":"XYZ", "value":1.0}, {"tagA":"DEF", "tagB":"UVW", "value":2.0}], + "myMetricsObj": {"tagA": "HIJ", "tagC":"LMN", "value":3.0, "anotherValue": 4.0} + } +} +` + +// Test that nested array data is collected correctly +func TestHttpJsonNestedArrayTags(t *testing.T) { + httpjson := genMockHttpJson(validJSON3, 200) + + for _, service := range httpjson { + if service.Name == "other_other_webapp" { + var acc testutil.Accumulator + err := service.Gather(&acc) + // Set responsetime + for _, p := range acc.Metrics { + p.Fields["response_time"] = 1.0 + } + require.NoError(t, err) + assert.Equal(t, 18, acc.NFields()) + assert.Equal(t, uint64(8), acc.NMetrics()) + + for _, m := range acc.Metrics { + if m.Tags["tagA"] == "ABC" { + assert.Equal(t, "XYZ", m.Tags["tagB"]) + assert.Equal(t, "myservice", m.Tags["service"]) + assert.Equal(t, float64(1), m.Fields["value"]) + // assert.Equal(t, "httpjson_"+service.Name, m.Measurement) + } else if m.Tags["tagA"] == "DEF" { + assert.Equal(t, "UVW", m.Tags["tagB"]) + assert.Equal(t, "myservice", m.Tags["service"]) + assert.Equal(t, float64(2), m.Fields["value"]) + // assert.Equal(t, "httpjson_"+service.Name, m.Measurement) + } else if m.Tags["tagA"] == "HIJ" { + assert.Equal(t, "LMN", m.Tags["tagC"]) + assert.Equal(t, "myservice", m.Tags["service"]) + assert.Equal(t, float64(3.0), m.Fields["value"]) + // assert.Equal(t, "httpjson_"+service.Name, m.Measurement) + } else if _, ok := m.Tags["tagA"]; !ok { + assert.Equal(t, "myservice", m.Tags["service"]) + assert.Equal(t, float64(1.234), m.Fields["notATag2"]) + // assert.Equal(t, "httpjson_"+service.Name, m.Measurement) + } else { + fmt.Printf("tags: %v\nfields: %v\nmeasurement: %v\n", m.Tags, m.Fields, m.Measurement) + assert.FailNow(t, "unknown metric") + } + } + } + } +} diff --git a/plugins/parsers/json/parser.go b/plugins/parsers/json/parser.go index a2c69ec28..1833d2193 100644 --- a/plugins/parsers/json/parser.go +++ b/plugins/parsers/json/parser.go @@ -9,12 +9,14 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/jmoiron/jsonq" ) type JSONParser struct { MetricName string TagKeys []string DefaultTags map[string]string + JSONPaths []string } func (p *JSONParser) parseArray(buf []byte) ([]telegraf.Metric, error) { @@ -65,6 +67,77 @@ func (p *JSONParser) parseObject(metrics []telegraf.Metric, jsonOut map[string]i return append(metrics, metric), nil } +func (p *JSONParser) parsePath(metrics []telegraf.Metric, jsonOut map[string]interface{}) ([]telegraf.Metric, error) { + jq := jsonq.NewQuery(jsonOut) + tags := make(map[string]string) + for k, v := range p.DefaultTags { + tags[k] = v + } + for _, tag := range p.TagKeys { + if v, err := jq.String(tag); err == nil { + tags[tag] = v + } + } + + for _, path := range p.JSONPaths { + pathByNode := strings.Split(path, ".") + if a, err := jq.ArrayOfObjects(pathByNode...); err == nil { + metrics, err = p.parseArrayPath(metrics, a, tags) + if err != nil { + return nil, err + } + parentObj, err := jq.Object(pathByNode[:len(pathByNode)-1]...) + if err != nil { + return nil, err + } + delete(parentObj, pathByNode[len(pathByNode)-1]) + } + if o, err := jq.Object(pathByNode...); err == nil { + metrics, err = p.parseObjectPath(metrics, o, tags) + if err != nil { + return nil, err + } + parentObj, err := jq.Object(pathByNode[:len(pathByNode)-1]...) + if err != nil { + return nil, err + } + delete(parentObj, pathByNode[len(pathByNode)-1]) + } + } + + return metrics, nil +} + +func (p *JSONParser) parseArrayPath(metrics []telegraf.Metric, jsonOut []map[string]interface{}, tags map[string]string) ([]telegraf.Metric, error) { + var err error + for _, doc := range jsonOut { + metrics, err = p.parseObjectPath(metrics, doc, tags) + if err != nil { + return nil, err + } + } + return metrics, nil +} + +func (p *JSONParser) parseObjectPath(metrics []telegraf.Metric, jsonOut map[string]interface{}, tags map[string]string) ([]telegraf.Metric, error) { + jq := jsonq.NewQuery(jsonOut) + for _, tag := range p.TagKeys { + if v, err := jq.String(tag); err == nil { + tags[tag] = v + } + } + f := JSONFlattener{} + err := f.FlattenJSON("", jsonOut) + if err != nil { + return nil, err + } + metric, err := telegraf.NewMetric(p.MetricName, tags, f.Fields, time.Now().UTC()) + if err != nil { + return nil, err + } + return append(metrics, metric), nil +} + func (p *JSONParser) Parse(buf []byte) ([]telegraf.Metric, error) { if !isarray(buf) { @@ -75,6 +148,9 @@ func (p *JSONParser) Parse(buf []byte) ([]telegraf.Metric, error) { err = fmt.Errorf("unable to parse out as JSON, %s", err) return nil, err } + if len(p.JSONPaths) > 0 { + metrics, err = p.parsePath(metrics, jsonOut) + } return p.parseObject(metrics, jsonOut) } return p.parseArray(buf) diff --git a/plugins/parsers/registry.go b/plugins/parsers/registry.go index 360d795bc..32216031b 100644 --- a/plugins/parsers/registry.go +++ b/plugins/parsers/registry.go @@ -52,6 +52,8 @@ type Config struct { TagKeys []string // MetricName applies to JSON & value. This will be the name of the measurement. MetricName string + // JSONPaths only appear in JSON data + JSONPaths []string // DataType only applies to value, this will be the type to parse value to DataType string @@ -66,8 +68,16 @@ func NewParser(config *Config) (Parser, error) { var parser Parser switch config.DataFormat { case "json": - parser, err = NewJSONParser(config.MetricName, - config.TagKeys, config.DefaultTags) + switch len(config.JSONPaths) { + case 0: + parser, err = NewJSONParser(config.MetricName, + config.TagKeys, config.DefaultTags) + default: + parser, err = NewJSONQParser(config.MetricName, + config.TagKeys, config.DefaultTags, + config.JSONPaths) + } + case "value": parser, err = NewValueParser(config.MetricName, config.DataType, config.DefaultTags) @@ -97,6 +107,21 @@ func NewJSONParser( return parser, nil } +func NewJSONQParser( + metricName string, + tagKeys []string, + defaultTags map[string]string, + jsonPaths []string, +) (Parser, error) { + parser := &json.JSONParser{ + MetricName: metricName, + TagKeys: tagKeys, + DefaultTags: defaultTags, + JSONPaths: jsonPaths, + } + return parser, nil +} + func NewNagiosParser() (Parser, error) { return &nagios.NagiosParser{}, nil }