From 94ce67cc6789aacb55c63098a31deaf83f4dafb6 Mon Sep 17 00:00:00 2001 From: John Engelman Date: Tue, 15 Nov 2016 12:02:55 -0600 Subject: [PATCH] Add support to parse JSON array. (#1965) --- CHANGELOG.md | 3 + docs/DATA_FORMATS_INPUT.md | 56 +++++++++ plugins/inputs/httpjson/README.md | 52 ++++++++ plugins/inputs/httpjson/httpjson_test.go | 49 ++++++++ plugins/parsers/json/parser.go | 41 ++++++- plugins/parsers/json/parser_test.go | 144 ++++++++++++++++++++++- 6 files changed, 337 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5bf56849a..db0b3a825 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,9 @@ documentation for configuring journald. There is also a [`logfile` config option available in 1.1, which will allow users to easily configure telegraf to continue sending logs to /var/log/telegraf/telegraf.log. +- The JSON parser can now parse JSON data where the root object is an array. +The parsing configuration is applied to each element of the array. + ### Features - [#1726](https://github.com/influxdata/telegraf/issues/1726): Processor & Aggregator plugin support. diff --git a/docs/DATA_FORMATS_INPUT.md b/docs/DATA_FORMATS_INPUT.md index c14752d9c..f2a635d89 100644 --- a/docs/DATA_FORMATS_INPUT.md +++ b/docs/DATA_FORMATS_INPUT.md @@ -147,6 +147,62 @@ Your Telegraf metrics would get tagged with "my_tag_1" exec_mycollector,my_tag_1=foo a=5,b_c=6 ``` +If the JSON data is an array, then each element of the array is parsed with the configured settings. +Each resulting metric will be output with the same timestamp. + +For example, if the following configuration: + +```toml +[[inputs.exec]] + ## Commands array + commands = ["/usr/bin/mycollector --foo=bar"] + + ## measurement name suffix (for separating different commands) + name_suffix = "_mycollector" + + ## Data format to consume. + ## Each data format has it's own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + data_format = "json" + + ## List of tag names to extract from top-level of JSON server response + tag_keys = [ + "my_tag_1", + "my_tag_2" + ] +``` + +with this JSON output from a command: + +```json +[ + { + "a": 5, + "b": { + "c": 6 + }, + "my_tag_1": "foo", + "my_tag_2": "baz" + }, + { + "a": 7, + "b": { + "c": 8 + }, + "my_tag_1": "bar", + "my_tag_2": "baz" + } +] +``` + +Your Telegraf metrics would get tagged with "my_tag_1" and "my_tag_2" + +``` +exec_mycollector,my_tag_1=foo,my_tag_2=baz a=5,b_c=6 +exec_mycollector,my_tag_1=bar,my_tag_2=baz a=7,b_c=8 +``` + # Value: The "value" data format translates single values into Telegraf metrics. This diff --git a/plugins/inputs/httpjson/README.md b/plugins/inputs/httpjson/README.md index 81680e6ec..c7c0e6797 100644 --- a/plugins/inputs/httpjson/README.md +++ b/plugins/inputs/httpjson/README.md @@ -37,6 +37,8 @@ You can also specify which keys from server response should be considered tags: ] ``` +If the JSON response is an array of objects, then each object will be parsed with the same configuration. + You can also specify additional request parameters for the service: ``` @@ -150,3 +152,53 @@ httpjson_mycollector1_b_e,server='http://my.service.com/_stats' value=5 httpjson_mycollector2_load,server='http://service.net/json/stats' value=100 httpjson_mycollector2_users,server='http://service.net/json/stats' value=1335 ``` + +# Example 3, Multiple Metrics in Response: + +The response JSON can be treated as an array of data points that are all parsed with the same configuration. + +``` +[[inputs.httpjson]] + name = "mycollector" + servers = [ + "http://my.service.com/_stats" + ] + # HTTP method to use (case-sensitive) + method = "GET" + tag_keys = ["service"] +``` + +which responds with the following JSON: + +```json +[ + { + "service": "service01", + "a": 0.5, + "b": { + "c": "some text", + "d": 0.1, + "e": 5 + } + }, + { + "service": "service02", + "a": 0.6, + "b": { + "c": "some text", + "d": 0.2, + "e": 6 + } + } +] +``` + +The collected metrics will be: +``` +httpjson_mycollector_a,service='service01',server='http://my.service.com/_stats' value=0.5 +httpjson_mycollector_b_d,service='service01',server='http://my.service.com/_stats' value=0.1 +httpjson_mycollector_b_e,service='service01',server='http://my.service.com/_stats' value=5 +httpjson_mycollector_a,service='service02',server='http://my.service.com/_stats' value=0.6 +httpjson_mycollector_b_d,service='service02',server='http://my.service.com/_stats' value=0.2 +httpjson_mycollector_b_e,service='service02',server='http://my.service.com/_stats' value=6 +``` diff --git a/plugins/inputs/httpjson/httpjson_test.go b/plugins/inputs/httpjson/httpjson_test.go index 31447b307..0029eb3e9 100644 --- a/plugins/inputs/httpjson/httpjson_test.go +++ b/plugins/inputs/httpjson/httpjson_test.go @@ -511,3 +511,52 @@ func TestHttpJson200Tags(t *testing.T) { } } } + +const validJSONArrayTags = ` +[ + { + "value": 15, + "role": "master", + "build": "123" + }, + { + "value": 17, + "role": "slave", + "build": "456" + } +]` + +// Test that array data is collected correctly +func TestHttpJsonArray200Tags(t *testing.T) { + httpjson := genMockHttpJson(validJSONArrayTags, 200) + + for _, service := range httpjson { + if service.Name == "other_webapp" { + var acc testutil.Accumulator + err := service.Gather(&acc) + // Set responsetime + for _, p := range acc.Metrics { + p.Fields["response_time"] = 1.0 + } + require.NoError(t, err) + assert.Equal(t, 8, acc.NFields()) + assert.Equal(t, uint64(4), acc.NMetrics()) + + for _, m := range acc.Metrics { + if m.Tags["role"] == "master" { + assert.Equal(t, "123", m.Tags["build"]) + assert.Equal(t, float64(15), m.Fields["value"]) + assert.Equal(t, float64(1), m.Fields["response_time"]) + assert.Equal(t, "httpjson_"+service.Name, m.Measurement) + } else if m.Tags["role"] == "slave" { + assert.Equal(t, "456", m.Tags["build"]) + assert.Equal(t, float64(17), m.Fields["value"]) + assert.Equal(t, float64(1), m.Fields["response_time"]) + assert.Equal(t, "httpjson_"+service.Name, m.Measurement) + } else { + assert.FailNow(t, "unknown metric") + } + } + } + } +} diff --git a/plugins/parsers/json/parser.go b/plugins/parsers/json/parser.go index 180f2452a..a2c69ec28 100644 --- a/plugins/parsers/json/parser.go +++ b/plugins/parsers/json/parser.go @@ -1,6 +1,7 @@ package json import ( + "bytes" "encoding/json" "fmt" "strconv" @@ -16,15 +17,22 @@ type JSONParser struct { DefaultTags map[string]string } -func (p *JSONParser) Parse(buf []byte) ([]telegraf.Metric, error) { +func (p *JSONParser) parseArray(buf []byte) ([]telegraf.Metric, error) { metrics := make([]telegraf.Metric, 0) - var jsonOut map[string]interface{} + var jsonOut []map[string]interface{} err := json.Unmarshal(buf, &jsonOut) if err != nil { - err = fmt.Errorf("unable to parse out as JSON, %s", err) + err = fmt.Errorf("unable to parse out as JSON Array, %s", err) return nil, err } + for _, item := range jsonOut { + metrics, err = p.parseObject(metrics, item) + } + return metrics, nil +} + +func (p *JSONParser) parseObject(metrics []telegraf.Metric, jsonOut map[string]interface{}) ([]telegraf.Metric, error) { tags := make(map[string]string) for k, v := range p.DefaultTags { @@ -44,7 +52,7 @@ func (p *JSONParser) Parse(buf []byte) ([]telegraf.Metric, error) { } f := JSONFlattener{} - err = f.FlattenJSON("", jsonOut) + err := f.FlattenJSON("", jsonOut) if err != nil { return nil, err } @@ -57,6 +65,21 @@ func (p *JSONParser) Parse(buf []byte) ([]telegraf.Metric, error) { return append(metrics, metric), nil } +func (p *JSONParser) Parse(buf []byte) ([]telegraf.Metric, error) { + + if !isarray(buf) { + metrics := make([]telegraf.Metric, 0) + var jsonOut map[string]interface{} + err := json.Unmarshal(buf, &jsonOut) + if err != nil { + err = fmt.Errorf("unable to parse out as JSON, %s", err) + return nil, err + } + return p.parseObject(metrics, jsonOut) + } + return p.parseArray(buf) +} + func (p *JSONParser) ParseLine(line string) (telegraf.Metric, error) { metrics, err := p.Parse([]byte(line + "\n")) @@ -115,3 +138,13 @@ func (f *JSONFlattener) FlattenJSON( } return nil } + +func isarray(buf []byte) bool { + ia := bytes.IndexByte(buf, '[') + ib := bytes.IndexByte(buf, '{') + if ia > -1 && ia < ib { + return true + } else { + return false + } +} diff --git a/plugins/parsers/json/parser_test.go b/plugins/parsers/json/parser_test.go index f3e6d9404..c8e2be1ad 100644 --- a/plugins/parsers/json/parser_test.go +++ b/plugins/parsers/json/parser_test.go @@ -7,10 +7,12 @@ import ( ) const ( - validJSON = "{\"a\": 5, \"b\": {\"c\": 6}}" - validJSONNewline = "\n{\"d\": 7, \"b\": {\"d\": 8}}\n" - invalidJSON = "I don't think this is JSON" - invalidJSON2 = "{\"a\": 5, \"b\": \"c\": 6}}" + validJSON = "{\"a\": 5, \"b\": {\"c\": 6}}" + validJSONNewline = "\n{\"d\": 7, \"b\": {\"d\": 8}}\n" + validJSONArray = "[{\"a\": 5, \"b\": {\"c\": 6}}]" + validJSONArrayMultiple = "[{\"a\": 5, \"b\": {\"c\": 6}}, {\"a\": 7, \"b\": {\"c\": 8}}]" + invalidJSON = "I don't think this is JSON" + invalidJSON2 = "{\"a\": 5, \"b\": \"c\": 6}}" ) const validJSONTags = ` @@ -24,6 +26,27 @@ const validJSONTags = ` } ` +const validJSONArrayTags = ` +[ +{ + "a": 5, + "b": { + "c": 6 + }, + "mytag": "foo", + "othertag": "baz" +}, +{ + "a": 7, + "b": { + "c": 8 + }, + "mytag": "bar", + "othertag": "baz" +} +] +` + func TestParseValidJSON(t *testing.T) { parser := JSONParser{ MetricName: "json_test", @@ -282,3 +305,116 @@ func TestParseValidJSONDefaultTagsOverride(t *testing.T) { "mytag": "foobar", }, metrics[0].Tags()) } + +// Test that json arrays can be parsed +func TestParseValidJSONArray(t *testing.T) { + parser := JSONParser{ + MetricName: "json_array_test", + } + + // Most basic vanilla test + metrics, err := parser.Parse([]byte(validJSONArray)) + assert.NoError(t, err) + assert.Len(t, metrics, 1) + assert.Equal(t, "json_array_test", metrics[0].Name()) + assert.Equal(t, map[string]interface{}{ + "a": float64(5), + "b_c": float64(6), + }, metrics[0].Fields()) + assert.Equal(t, map[string]string{}, metrics[0].Tags()) + + // Basic multiple datapoints + metrics, err = parser.Parse([]byte(validJSONArrayMultiple)) + assert.NoError(t, err) + assert.Len(t, metrics, 2) + assert.Equal(t, "json_array_test", metrics[0].Name()) + assert.Equal(t, map[string]interface{}{ + "a": float64(5), + "b_c": float64(6), + }, metrics[0].Fields()) + assert.Equal(t, map[string]string{}, metrics[1].Tags()) + assert.Equal(t, "json_array_test", metrics[1].Name()) + assert.Equal(t, map[string]interface{}{ + "a": float64(7), + "b_c": float64(8), + }, metrics[1].Fields()) + assert.Equal(t, map[string]string{}, metrics[1].Tags()) +} + +func TestParseArrayWithTagKeys(t *testing.T) { + // Test that strings not matching tag keys are ignored + parser := JSONParser{ + MetricName: "json_array_test", + TagKeys: []string{"wrongtagkey"}, + } + metrics, err := parser.Parse([]byte(validJSONArrayTags)) + assert.NoError(t, err) + assert.Len(t, metrics, 2) + assert.Equal(t, "json_array_test", metrics[0].Name()) + assert.Equal(t, map[string]interface{}{ + "a": float64(5), + "b_c": float64(6), + }, metrics[0].Fields()) + assert.Equal(t, map[string]string{}, metrics[0].Tags()) + + assert.Equal(t, "json_array_test", metrics[1].Name()) + assert.Equal(t, map[string]interface{}{ + "a": float64(7), + "b_c": float64(8), + }, metrics[1].Fields()) + assert.Equal(t, map[string]string{}, metrics[1].Tags()) + + // Test that single tag key is found and applied + parser = JSONParser{ + MetricName: "json_array_test", + TagKeys: []string{"mytag"}, + } + metrics, err = parser.Parse([]byte(validJSONArrayTags)) + assert.NoError(t, err) + assert.Len(t, metrics, 2) + assert.Equal(t, "json_array_test", metrics[0].Name()) + assert.Equal(t, map[string]interface{}{ + "a": float64(5), + "b_c": float64(6), + }, metrics[0].Fields()) + assert.Equal(t, map[string]string{ + "mytag": "foo", + }, metrics[0].Tags()) + + assert.Equal(t, "json_array_test", metrics[1].Name()) + assert.Equal(t, map[string]interface{}{ + "a": float64(7), + "b_c": float64(8), + }, metrics[1].Fields()) + assert.Equal(t, map[string]string{ + "mytag": "bar", + }, metrics[1].Tags()) + + // Test that both tag keys are found and applied + parser = JSONParser{ + MetricName: "json_array_test", + TagKeys: []string{"mytag", "othertag"}, + } + metrics, err = parser.Parse([]byte(validJSONArrayTags)) + assert.NoError(t, err) + assert.Len(t, metrics, 2) + assert.Equal(t, "json_array_test", metrics[0].Name()) + assert.Equal(t, map[string]interface{}{ + "a": float64(5), + "b_c": float64(6), + }, metrics[0].Fields()) + assert.Equal(t, map[string]string{ + "mytag": "foo", + "othertag": "baz", + }, metrics[0].Tags()) + + assert.Equal(t, "json_array_test", metrics[1].Name()) + assert.Equal(t, map[string]interface{}{ + "a": float64(7), + "b_c": float64(8), + }, metrics[1].Fields()) + assert.Equal(t, map[string]string{ + "mytag": "bar", + "othertag": "baz", + }, metrics[1].Tags()) +}