Add support to httpjson input plugin to parse nested objects and arrays of objects
This commit is contained in:
parent
f862c6585d
commit
2ab56eb61c
1
Godeps
1
Godeps
|
@ -29,6 +29,7 @@ github.com/influxdata/config b79f6829346b8d6e78ba73544b1e1038f1f1c9da
|
|||
github.com/influxdata/influxdb fc57c0f7c635df3873f3d64f0ed2100ddc94d5ae
|
||||
github.com/influxdata/toml af4df43894b16e3fd2b788d01bd27ad0776ef2d0
|
||||
github.com/influxdata/wlog 7c63b0a71ef8300adc255344d275e10e5c3a71ec
|
||||
github.com/jmoiron/jsonq e874b168d07ecc7808bc950a17998a8aa3141d82
|
||||
github.com/kardianos/osext 29ae4ffbc9a6fe9fb2bc5029050ce6996ea1d3bc
|
||||
github.com/kardianos/service 5e335590050d6d00f3aa270217d288dda1c94d0a
|
||||
github.com/kballard/go-shellquote d8ec1a69a250a17bb0e419c386eac1f3711dc142
|
||||
|
|
|
@ -1221,6 +1221,18 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
|
|||
}
|
||||
}
|
||||
|
||||
if node, ok := tbl.Fields["json_paths"]; ok {
|
||||
if kv, ok := node.(*ast.KeyValue); ok {
|
||||
if ary, ok := kv.Value.(*ast.Array); ok {
|
||||
for _, elem := range ary.Value {
|
||||
if str, ok := elem.(*ast.String); ok {
|
||||
c.JSONPaths = append(c.JSONPaths, str.Value)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if node, ok := tbl.Fields["data_type"]; ok {
|
||||
if kv, ok := node.(*ast.KeyValue); ok {
|
||||
if str, ok := kv.Value.(*ast.String); ok {
|
||||
|
|
|
@ -39,6 +39,18 @@ You can also specify which keys from server response should be considered tags:
|
|||
|
||||
If the JSON response is an array of objects, then each object will be parsed with the same configuration.
|
||||
|
||||
You can also specify paths to nested JSON objects and arrays of objects using dot-notation such that each object will be parsed with the same configuration.
|
||||
|
||||
```
|
||||
[[inputs.httpjson]]
|
||||
...
|
||||
|
||||
json_paths = [
|
||||
"path.to.my.metricsArr",
|
||||
"path.to.my.metricsObj"
|
||||
]
|
||||
```
|
||||
|
||||
You can also specify additional request parameters for the service:
|
||||
|
||||
```
|
||||
|
@ -202,3 +214,34 @@ httpjson_mycollector_a,service='service02',server='http://my.service.com/_stats'
|
|||
httpjson_mycollector_b_d,service='service02',server='http://my.service.com/_stats' value=0.2
|
||||
httpjson_mycollector_b_e,service='service02',server='http://my.service.com/_stats' value=6
|
||||
```
|
||||
|
||||
# Example 4, nested arrays with local and global tag keys in Response:
|
||||
|
||||
The response JSON can be parsed to treat nested objects and arrays of objects as unique points with the top-level and object specific tags:
|
||||
|
||||
```
|
||||
[[inputs.httpjson]]
|
||||
name = "mycollector"
|
||||
servers = ["http://my.service.com/_stats"]
|
||||
method = "GET"
|
||||
tag_keys = ["service", "tagA", "tagB"]
|
||||
json_paths = ["metrics.myMetricsArr"]
|
||||
|
||||
```
|
||||
|
||||
which responds with the following JSON:
|
||||
|
||||
```json
|
||||
{"service":"myservice",
|
||||
"metrics":{"myMetricsArr":
|
||||
[{"tagA":"ABC", "tagB":"XYZ", "value":1.0},
|
||||
{"tagA":"DEF", "tagB":"UVW", "value":2.0}]
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
The collected metrics will be:
|
||||
```
|
||||
httpjson_mycollector,service=myservice,server=http://my.service.com/_stats,tagA=ABC,tagB=XYZ value=1.0
|
||||
httpjson_mycollector,service=myservice,server=http://my.service.com/_stats,tagA=DEF,tagB=UVW value=2.0
|
||||
```
|
||||
|
|
|
@ -22,6 +22,7 @@ type HttpJson struct {
|
|||
Servers []string
|
||||
Method string
|
||||
TagKeys []string
|
||||
JSONPaths []string
|
||||
ResponseTimeout internal.Duration
|
||||
Parameters map[string]string
|
||||
Headers map[string]string
|
||||
|
@ -199,7 +200,12 @@ func (h *HttpJson) gatherServer(
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(h.JSONPaths) > 0 {
|
||||
parser, err = parsers.NewJSONQParser(msrmnt_name, h.TagKeys, tags, h.JSONPaths)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
metrics, err := parser.Parse([]byte(resp))
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
@ -201,6 +201,33 @@ func genMockHttpJson(response string, statusCode int) []*HttpJson {
|
|||
"build",
|
||||
},
|
||||
},
|
||||
&HttpJson{
|
||||
client: &mockHTTPClient{responseBody: response, statusCode: statusCode},
|
||||
Servers: []string{
|
||||
"http://server5.example.com/metrics/",
|
||||
"http://server6.example.com/metrics/",
|
||||
},
|
||||
Name: "other_other_webapp",
|
||||
Method: "POST",
|
||||
Parameters: map[string]string{
|
||||
"httpParam1": "12",
|
||||
"httpParam2": "the second parameter",
|
||||
},
|
||||
Headers: map[string]string{
|
||||
"X-Auth-Token": "the-first-parameter",
|
||||
"apiVersion": "v1",
|
||||
},
|
||||
TagKeys: []string{
|
||||
"service",
|
||||
"tagA",
|
||||
"tagB",
|
||||
"tagC",
|
||||
},
|
||||
JSONPaths: []string{
|
||||
"metrics.myMetricsArr",
|
||||
"metrics.myMetricsObj",
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -560,3 +587,58 @@ func TestHttpJsonArray200Tags(t *testing.T) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
const validJSON3 = `
|
||||
{"service":"myservice",
|
||||
"notATag":"abcd",
|
||||
"notATag2":1.234,
|
||||
"metrics":{"myMetricsArr": [{"tagA":"ABC", "tagB":"XYZ", "value":1.0}, {"tagA":"DEF", "tagB":"UVW", "value":2.0}],
|
||||
"myMetricsObj": {"tagA": "HIJ", "tagC":"LMN", "value":3.0, "anotherValue": 4.0}
|
||||
}
|
||||
}
|
||||
`
|
||||
|
||||
// Test that nested array data is collected correctly
|
||||
func TestHttpJsonNestedArrayTags(t *testing.T) {
|
||||
httpjson := genMockHttpJson(validJSON3, 200)
|
||||
|
||||
for _, service := range httpjson {
|
||||
if service.Name == "other_other_webapp" {
|
||||
var acc testutil.Accumulator
|
||||
err := service.Gather(&acc)
|
||||
// Set responsetime
|
||||
for _, p := range acc.Metrics {
|
||||
p.Fields["response_time"] = 1.0
|
||||
}
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 18, acc.NFields())
|
||||
assert.Equal(t, uint64(8), acc.NMetrics())
|
||||
|
||||
for _, m := range acc.Metrics {
|
||||
if m.Tags["tagA"] == "ABC" {
|
||||
assert.Equal(t, "XYZ", m.Tags["tagB"])
|
||||
assert.Equal(t, "myservice", m.Tags["service"])
|
||||
assert.Equal(t, float64(1), m.Fields["value"])
|
||||
// assert.Equal(t, "httpjson_"+service.Name, m.Measurement)
|
||||
} else if m.Tags["tagA"] == "DEF" {
|
||||
assert.Equal(t, "UVW", m.Tags["tagB"])
|
||||
assert.Equal(t, "myservice", m.Tags["service"])
|
||||
assert.Equal(t, float64(2), m.Fields["value"])
|
||||
// assert.Equal(t, "httpjson_"+service.Name, m.Measurement)
|
||||
} else if m.Tags["tagA"] == "HIJ" {
|
||||
assert.Equal(t, "LMN", m.Tags["tagC"])
|
||||
assert.Equal(t, "myservice", m.Tags["service"])
|
||||
assert.Equal(t, float64(3.0), m.Fields["value"])
|
||||
// assert.Equal(t, "httpjson_"+service.Name, m.Measurement)
|
||||
} else if _, ok := m.Tags["tagA"]; !ok {
|
||||
assert.Equal(t, "myservice", m.Tags["service"])
|
||||
assert.Equal(t, float64(1.234), m.Fields["notATag2"])
|
||||
// assert.Equal(t, "httpjson_"+service.Name, m.Measurement)
|
||||
} else {
|
||||
fmt.Printf("tags: %v\nfields: %v\nmeasurement: %v\n", m.Tags, m.Fields, m.Measurement)
|
||||
assert.FailNow(t, "unknown metric")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -9,12 +9,14 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/jmoiron/jsonq"
|
||||
)
|
||||
|
||||
type JSONParser struct {
|
||||
MetricName string
|
||||
TagKeys []string
|
||||
DefaultTags map[string]string
|
||||
JSONPaths []string
|
||||
}
|
||||
|
||||
func (p *JSONParser) parseArray(buf []byte) ([]telegraf.Metric, error) {
|
||||
|
@ -65,6 +67,77 @@ func (p *JSONParser) parseObject(metrics []telegraf.Metric, jsonOut map[string]i
|
|||
return append(metrics, metric), nil
|
||||
}
|
||||
|
||||
func (p *JSONParser) parsePath(metrics []telegraf.Metric, jsonOut map[string]interface{}) ([]telegraf.Metric, error) {
|
||||
jq := jsonq.NewQuery(jsonOut)
|
||||
tags := make(map[string]string)
|
||||
for k, v := range p.DefaultTags {
|
||||
tags[k] = v
|
||||
}
|
||||
for _, tag := range p.TagKeys {
|
||||
if v, err := jq.String(tag); err == nil {
|
||||
tags[tag] = v
|
||||
}
|
||||
}
|
||||
|
||||
for _, path := range p.JSONPaths {
|
||||
pathByNode := strings.Split(path, ".")
|
||||
if a, err := jq.ArrayOfObjects(pathByNode...); err == nil {
|
||||
metrics, err = p.parseArrayPath(metrics, a, tags)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
parentObj, err := jq.Object(pathByNode[:len(pathByNode)-1]...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
delete(parentObj, pathByNode[len(pathByNode)-1])
|
||||
}
|
||||
if o, err := jq.Object(pathByNode...); err == nil {
|
||||
metrics, err = p.parseObjectPath(metrics, o, tags)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
parentObj, err := jq.Object(pathByNode[:len(pathByNode)-1]...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
delete(parentObj, pathByNode[len(pathByNode)-1])
|
||||
}
|
||||
}
|
||||
|
||||
return metrics, nil
|
||||
}
|
||||
|
||||
func (p *JSONParser) parseArrayPath(metrics []telegraf.Metric, jsonOut []map[string]interface{}, tags map[string]string) ([]telegraf.Metric, error) {
|
||||
var err error
|
||||
for _, doc := range jsonOut {
|
||||
metrics, err = p.parseObjectPath(metrics, doc, tags)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return metrics, nil
|
||||
}
|
||||
|
||||
func (p *JSONParser) parseObjectPath(metrics []telegraf.Metric, jsonOut map[string]interface{}, tags map[string]string) ([]telegraf.Metric, error) {
|
||||
jq := jsonq.NewQuery(jsonOut)
|
||||
for _, tag := range p.TagKeys {
|
||||
if v, err := jq.String(tag); err == nil {
|
||||
tags[tag] = v
|
||||
}
|
||||
}
|
||||
f := JSONFlattener{}
|
||||
err := f.FlattenJSON("", jsonOut)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
metric, err := telegraf.NewMetric(p.MetricName, tags, f.Fields, time.Now().UTC())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return append(metrics, metric), nil
|
||||
}
|
||||
|
||||
func (p *JSONParser) Parse(buf []byte) ([]telegraf.Metric, error) {
|
||||
|
||||
if !isarray(buf) {
|
||||
|
@ -75,6 +148,9 @@ func (p *JSONParser) Parse(buf []byte) ([]telegraf.Metric, error) {
|
|||
err = fmt.Errorf("unable to parse out as JSON, %s", err)
|
||||
return nil, err
|
||||
}
|
||||
if len(p.JSONPaths) > 0 {
|
||||
metrics, err = p.parsePath(metrics, jsonOut)
|
||||
}
|
||||
return p.parseObject(metrics, jsonOut)
|
||||
}
|
||||
return p.parseArray(buf)
|
||||
|
|
|
@ -52,6 +52,8 @@ type Config struct {
|
|||
TagKeys []string
|
||||
// MetricName applies to JSON & value. This will be the name of the measurement.
|
||||
MetricName string
|
||||
// JSONPaths only appear in JSON data
|
||||
JSONPaths []string
|
||||
|
||||
// DataType only applies to value, this will be the type to parse value to
|
||||
DataType string
|
||||
|
@ -66,8 +68,16 @@ func NewParser(config *Config) (Parser, error) {
|
|||
var parser Parser
|
||||
switch config.DataFormat {
|
||||
case "json":
|
||||
parser, err = NewJSONParser(config.MetricName,
|
||||
config.TagKeys, config.DefaultTags)
|
||||
switch len(config.JSONPaths) {
|
||||
case 0:
|
||||
parser, err = NewJSONParser(config.MetricName,
|
||||
config.TagKeys, config.DefaultTags)
|
||||
default:
|
||||
parser, err = NewJSONQParser(config.MetricName,
|
||||
config.TagKeys, config.DefaultTags,
|
||||
config.JSONPaths)
|
||||
}
|
||||
|
||||
case "value":
|
||||
parser, err = NewValueParser(config.MetricName,
|
||||
config.DataType, config.DefaultTags)
|
||||
|
@ -97,6 +107,21 @@ func NewJSONParser(
|
|||
return parser, nil
|
||||
}
|
||||
|
||||
func NewJSONQParser(
|
||||
metricName string,
|
||||
tagKeys []string,
|
||||
defaultTags map[string]string,
|
||||
jsonPaths []string,
|
||||
) (Parser, error) {
|
||||
parser := &json.JSONParser{
|
||||
MetricName: metricName,
|
||||
TagKeys: tagKeys,
|
||||
DefaultTags: defaultTags,
|
||||
JSONPaths: jsonPaths,
|
||||
}
|
||||
return parser, nil
|
||||
}
|
||||
|
||||
func NewNagiosParser() (Parser, error) {
|
||||
return &nagios.NagiosParser{}, nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue