Compare commits
15 Commits
procstat-r
...
feature/js
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
690c0b6673 | ||
|
|
407675c741 | ||
|
|
b09fcc70c8 | ||
|
|
9836b1eb02 | ||
|
|
a79f1b7e0d | ||
|
|
d4a4ac25bb | ||
|
|
92e156c784 | ||
|
|
342d3d633a | ||
|
|
23523ffd10 | ||
|
|
523d761f34 | ||
|
|
3f28add025 | ||
|
|
ee6e4b0afd | ||
|
|
16454e25ba | ||
|
|
2a1feb6db9 | ||
|
|
61e197d254 |
@@ -20,6 +20,8 @@
|
||||
- [#4259](https://github.com/influxdata/telegraf/pull/4259): Add container status tag to docker input.
|
||||
- [#3523](https://github.com/influxdata/telegraf/pull/3523): Add valuecounter aggregator plugin.
|
||||
- [#4307](https://github.com/influxdata/telegraf/pull/4307): Add new measurement with results of pgrep lookup to procstat input.
|
||||
- [#4311](https://github.com/influxdata/telegraf/pull/4311): Add support for comma in logparser timestamp format.
|
||||
- [#4292](https://github.com/influxdata/telegraf/pull/4292): Add path tag to tail input plugin.
|
||||
|
||||
## v1.7.1 [unreleased]
|
||||
|
||||
@@ -27,6 +29,7 @@
|
||||
|
||||
- [#4277](https://github.com/influxdata/telegraf/pull/4277): Treat sigterm as a clean shutdown signal.
|
||||
- [#4284](https://github.com/influxdata/telegraf/pull/4284): Fix selection of tags under nested objects in the JSON parser.
|
||||
- [#4135](https://github.com/influxdata/telegraf/issues/4135): Fix postfix input handling multi-level queues.
|
||||
|
||||
## v1.7 [2018-06-12]
|
||||
|
||||
|
||||
10
Makefile
10
Makefile
@@ -54,11 +54,11 @@ fmtcheck:
|
||||
@echo '[INFO] done.'
|
||||
|
||||
test-windows:
|
||||
go test ./plugins/inputs/ping/...
|
||||
go test ./plugins/inputs/win_perf_counters/...
|
||||
go test ./plugins/inputs/win_services/...
|
||||
go test ./plugins/inputs/procstat/...
|
||||
go test ./plugins/inputs/ntpq/...
|
||||
go test -short ./plugins/inputs/ping/...
|
||||
go test -short ./plugins/inputs/win_perf_counters/...
|
||||
go test -short ./plugins/inputs/win_services/...
|
||||
go test -short ./plugins/inputs/procstat/...
|
||||
go test -short ./plugins/inputs/ntpq/...
|
||||
|
||||
# vet runs the Go source code static analysis tool `vet` to find
|
||||
# any common errors.
|
||||
|
||||
@@ -4,6 +4,7 @@ Telegraf is able to parse the following input data formats into metrics:
|
||||
|
||||
1. [InfluxDB Line Protocol](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#influx)
|
||||
1. [JSON](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#json)
|
||||
1. [GJSON](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#gjson)
|
||||
1. [Graphite](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#graphite)
|
||||
1. [Value](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#value), ie: 45 or "booyah"
|
||||
1. [Nagios](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#nagios) (exec input only)
|
||||
@@ -205,6 +206,69 @@ exec_mycollector,my_tag_1=foo,my_tag_2=baz a=5,b_c=6
|
||||
exec_mycollector,my_tag_1=bar,my_tag_2=baz a=7,b_c=8
|
||||
```
|
||||
|
||||
# GJSON:
|
||||
GJSON also parses JSON data, but uses paths to name and identify fields of your choosing.
|
||||
|
||||
The GJSON parser supports 5 different configuration fields for json objects:
|
||||
|
||||
1.'gjson_tag_paths'
|
||||
2.'gjson_string_paths'
|
||||
3.'gjson_int_paths'
|
||||
4.'gjson_float_paths'
|
||||
5.'gjson_bool_paths'
|
||||
|
||||
Each field is a map type that will map a field_name to a field_path. Path syntax is described below.
|
||||
Path maps should be configured as:
|
||||
`toml gjson_tag_paths = {"field_name" = "field.path", "field_name2" = "field.path2"}`
|
||||
|
||||
Any paths specified in gjson_tag_paths will be converted to strings and stored as tags.
|
||||
Any paths otherwise specified will be their marked type and stored as fields.
|
||||
|
||||
#### GJSON Configuration:
|
||||
Paths are a series of keys seperated by a dot, ie "obj.sub_obj".
|
||||
Paths should not lead to an JSON array, but a single object.
|
||||
An error message will be thrown if a path describes an array.
|
||||
Further reading for path syntax can be found here: https://github.com/tidwall/gjson
|
||||
|
||||
As an example, if you had the json:
|
||||
|
||||
```json
|
||||
{
|
||||
"name": {"first": "Tom", "last": "Anderson"},
|
||||
"age":37,
|
||||
"children": ["Sara","Alex","Jack"],
|
||||
"fav.movie": "Deer Hunter",
|
||||
"friends": [
|
||||
{"first": "Dale", "last": "Murphy", "age": 44},
|
||||
{"first": "Roger", "last": "Craig", "age": 68},
|
||||
{"first": "Jane", "last": "Murphy", "age": 47}
|
||||
]
|
||||
}
|
||||
```
|
||||
with the config:
|
||||
|
||||
```toml
|
||||
[[inputs.exec]]
|
||||
## Commands array
|
||||
commands = ["/usr/bin/mycollector --foo=bar"]
|
||||
|
||||
## Data format to consume.
|
||||
## Each data format has its own unique set of configuration options, read
|
||||
## more about them here:
|
||||
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
|
||||
data_format = "gjson"
|
||||
|
||||
name_override = "gjson_sample"
|
||||
|
||||
gjson_tag_paths = {"first_name_tag" = "name.first"}
|
||||
gjson_string_paths = {"last_name" = "name.last"}
|
||||
gjson_int_paths = {"age" = "age", "Janes_age" = "friends.2.age"}
|
||||
```
|
||||
|
||||
would output the metric:
|
||||
`gjson_sample, first_name_tag=Tom last_name=Anderson,age=37,Janes_age=47`
|
||||
|
||||
|
||||
# Value:
|
||||
|
||||
The "value" data format translates single values into Telegraf metrics. This
|
||||
|
||||
@@ -1338,6 +1338,71 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
|
||||
}
|
||||
}
|
||||
|
||||
c.GJSONTagPaths = make(map[string]string)
|
||||
if node, ok := tbl.Fields["gjson_tag_paths"]; ok {
|
||||
if subtbl, ok := node.(*ast.Table); ok {
|
||||
for name, val := range subtbl.Fields {
|
||||
if kv, ok := val.(*ast.KeyValue); ok {
|
||||
if str, ok := kv.Value.(*ast.String); ok {
|
||||
c.GJSONTagPaths[name] = str.Value
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
c.GJSONBoolPaths = make(map[string]string)
|
||||
if node, ok := tbl.Fields["gjson_bool_paths"]; ok {
|
||||
if subtbl, ok := node.(*ast.Table); ok {
|
||||
for name, val := range subtbl.Fields {
|
||||
if kv, ok := val.(*ast.KeyValue); ok {
|
||||
if str, ok := kv.Value.(*ast.String); ok {
|
||||
c.GJSONBoolPaths[name] = str.Value
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
c.GJSONFloatPaths = make(map[string]string)
|
||||
if node, ok := tbl.Fields["gjson_float_paths"]; ok {
|
||||
if subtbl, ok := node.(*ast.Table); ok {
|
||||
for name, val := range subtbl.Fields {
|
||||
if kv, ok := val.(*ast.KeyValue); ok {
|
||||
if str, ok := kv.Value.(*ast.String); ok {
|
||||
c.GJSONFloatPaths[name] = str.Value
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
c.GJSONStringPaths = make(map[string]string)
|
||||
if node, ok := tbl.Fields["gjson_string_paths"]; ok {
|
||||
if subtbl, ok := node.(*ast.Table); ok {
|
||||
for name, val := range subtbl.Fields {
|
||||
if kv, ok := val.(*ast.KeyValue); ok {
|
||||
if str, ok := kv.Value.(*ast.String); ok {
|
||||
c.GJSONStringPaths[name] = str.Value
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
c.GJSONIntPaths = make(map[string]string)
|
||||
if node, ok := tbl.Fields["gjson_int_paths"]; ok {
|
||||
if subtbl, ok := node.(*ast.Table); ok {
|
||||
for name, val := range subtbl.Fields {
|
||||
if kv, ok := val.(*ast.KeyValue); ok {
|
||||
if str, ok := kv.Value.(*ast.String); ok {
|
||||
c.GJSONIntPaths[name] = str.Value
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
c.MetricName = name
|
||||
|
||||
delete(tbl.Fields, "data_format")
|
||||
@@ -1353,6 +1418,11 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
|
||||
delete(tbl.Fields, "dropwizard_time_format")
|
||||
delete(tbl.Fields, "dropwizard_tags_path")
|
||||
delete(tbl.Fields, "dropwizard_tag_paths")
|
||||
delete(tbl.Fields, "gjson_tag_paths")
|
||||
delete(tbl.Fields, "gjson_bool_paths")
|
||||
delete(tbl.Fields, "gjson_float_paths")
|
||||
delete(tbl.Fields, "gjson_string_paths")
|
||||
delete(tbl.Fields, "gjson_int_paths")
|
||||
|
||||
return parsers.NewParser(c)
|
||||
}
|
||||
|
||||
@@ -108,7 +108,9 @@ You must capture at least one field per line.
|
||||
- ts-"CUSTOM"
|
||||
|
||||
CUSTOM time layouts must be within quotes and be the representation of the
|
||||
"reference time", which is `Mon Jan 2 15:04:05 -0700 MST 2006`
|
||||
"reference time", which is `Mon Jan 2 15:04:05 -0700 MST 2006`.
|
||||
To match a comma decimal point you can use a period. For example `%{TIMESTAMP:timestamp:ts-"2006-01-02 15:04:05.000"}` can be used to match `"2018-01-02 15:04:05,000"`
|
||||
To match a comma decimal point you can use a period in the pattern string.
|
||||
See https://golang.org/pkg/time/#Parse for more details.
|
||||
|
||||
Telegraf has many of its own [built-in patterns](./grok/patterns/influx-patterns),
|
||||
|
||||
@@ -335,6 +335,9 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
|
||||
case DROP:
|
||||
// goodbye!
|
||||
default:
|
||||
// Replace commas with dot character
|
||||
v = strings.Replace(v, ",", ".", -1)
|
||||
|
||||
ts, err := time.ParseInLocation(t, v, p.loc)
|
||||
if err == nil {
|
||||
timestamp = ts
|
||||
|
||||
@@ -982,3 +982,21 @@ func TestSyslogTimestampParser(t *testing.T) {
|
||||
require.NotNil(t, m)
|
||||
require.Equal(t, 2018, m.Time().Year())
|
||||
}
|
||||
|
||||
func TestReplaceTimestampComma(t *testing.T) {
|
||||
|
||||
p := &Parser{
|
||||
Patterns: []string{`%{TIMESTAMP_ISO8601:timestamp:ts-"2006-01-02 15:04:05.000"} successfulMatches=%{NUMBER:value:int}`},
|
||||
}
|
||||
|
||||
require.NoError(t, p.Compile())
|
||||
m, err := p.ParseLine("2018-02-21 13:10:34,555 successfulMatches=1")
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, m)
|
||||
|
||||
require.Equal(t, 2018, m.Time().Year())
|
||||
require.Equal(t, 13, m.Time().Hour())
|
||||
require.Equal(t, 34, m.Time().Second())
|
||||
//Convert Nanosecond to milisecond for compare
|
||||
require.Equal(t, 555, m.Time().Nanosecond()/1000000)
|
||||
}
|
||||
|
||||
@@ -4,7 +4,7 @@ import (
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@@ -28,36 +28,37 @@ func getQueueDirectory() (string, error) {
|
||||
return strings.TrimSpace(string(qd)), nil
|
||||
}
|
||||
|
||||
func qScan(path string) (int64, int64, int64, error) {
|
||||
f, err := os.Open(path)
|
||||
if err != nil {
|
||||
return 0, 0, 0, err
|
||||
}
|
||||
|
||||
finfos, err := f.Readdir(-1)
|
||||
f.Close()
|
||||
if err != nil {
|
||||
return 0, 0, 0, err
|
||||
}
|
||||
|
||||
func qScan(path string, acc telegraf.Accumulator) (int64, int64, int64, error) {
|
||||
var length, size int64
|
||||
var oldest time.Time
|
||||
for _, finfo := range finfos {
|
||||
err := filepath.Walk(path, func(_ string, finfo os.FileInfo, err error) error {
|
||||
if err != nil {
|
||||
acc.AddError(fmt.Errorf("error scanning %s: %s", path, err))
|
||||
return nil
|
||||
}
|
||||
if finfo.IsDir() {
|
||||
return nil
|
||||
}
|
||||
|
||||
length++
|
||||
size += finfo.Size()
|
||||
|
||||
ctime := statCTime(finfo.Sys())
|
||||
if ctime.IsZero() {
|
||||
continue
|
||||
return nil
|
||||
}
|
||||
if oldest.IsZero() || ctime.Before(oldest) {
|
||||
oldest = ctime
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return 0, 0, 0, err
|
||||
}
|
||||
var age int64
|
||||
if !oldest.IsZero() {
|
||||
age = int64(time.Now().Sub(oldest) / time.Second)
|
||||
} else if len(finfos) != 0 {
|
||||
} else if length != 0 {
|
||||
// system doesn't support ctime
|
||||
age = -1
|
||||
}
|
||||
@@ -77,8 +78,8 @@ func (p *Postfix) Gather(acc telegraf.Accumulator) error {
|
||||
}
|
||||
}
|
||||
|
||||
for _, q := range []string{"active", "hold", "incoming", "maildrop"} {
|
||||
length, size, age, err := qScan(path.Join(p.QueueDirectory, q))
|
||||
for _, q := range []string{"active", "hold", "incoming", "maildrop", "deferred"} {
|
||||
length, size, age, err := qScan(filepath.Join(p.QueueDirectory, q), acc)
|
||||
if err != nil {
|
||||
acc.AddError(fmt.Errorf("error scanning queue %s: %s", q, err))
|
||||
continue
|
||||
@@ -90,30 +91,6 @@ func (p *Postfix) Gather(acc telegraf.Accumulator) error {
|
||||
acc.AddFields("postfix_queue", fields, map[string]string{"queue": q})
|
||||
}
|
||||
|
||||
var dLength, dSize int64
|
||||
dAge := int64(-1)
|
||||
for _, q := range []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "A", "B", "C", "D", "E", "F"} {
|
||||
length, size, age, err := qScan(path.Join(p.QueueDirectory, "deferred", q))
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
// the directories are created on first use
|
||||
continue
|
||||
}
|
||||
acc.AddError(fmt.Errorf("error scanning queue deferred/%s: %s", q, err))
|
||||
return nil
|
||||
}
|
||||
dLength += length
|
||||
dSize += size
|
||||
if age > dAge {
|
||||
dAge = age
|
||||
}
|
||||
}
|
||||
fields := map[string]interface{}{"length": dLength, "size": dSize}
|
||||
if dAge != -1 {
|
||||
fields["age"] = dAge
|
||||
}
|
||||
acc.AddFields("postfix_queue", fields, map[string]string{"queue": "deferred"})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -3,7 +3,7 @@ package postfix
|
||||
import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
@@ -16,19 +16,16 @@ func TestGather(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
defer os.RemoveAll(td)
|
||||
|
||||
for _, q := range []string{"active", "hold", "incoming", "maildrop", "deferred"} {
|
||||
require.NoError(t, os.Mkdir(path.Join(td, q), 0755))
|
||||
}
|
||||
for _, q := range []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "A", "B", "C", "D", "F"} { // "E" deliberately left off
|
||||
require.NoError(t, os.Mkdir(path.Join(td, "deferred", q), 0755))
|
||||
for _, q := range []string{"active", "hold", "incoming", "maildrop", "deferred/0/0", "deferred/F/F"} {
|
||||
require.NoError(t, os.MkdirAll(filepath.FromSlash(td+"/"+q), 0755))
|
||||
}
|
||||
|
||||
require.NoError(t, ioutil.WriteFile(path.Join(td, "active", "01"), []byte("abc"), 0644))
|
||||
require.NoError(t, ioutil.WriteFile(path.Join(td, "active", "02"), []byte("defg"), 0644))
|
||||
require.NoError(t, ioutil.WriteFile(path.Join(td, "hold", "01"), []byte("abc"), 0644))
|
||||
require.NoError(t, ioutil.WriteFile(path.Join(td, "incoming", "01"), []byte("abcd"), 0644))
|
||||
require.NoError(t, ioutil.WriteFile(path.Join(td, "deferred", "0", "01"), []byte("abc"), 0644))
|
||||
require.NoError(t, ioutil.WriteFile(path.Join(td, "deferred", "F", "F1"), []byte("abc"), 0644))
|
||||
require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/active/01"), []byte("abc"), 0644))
|
||||
require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/active/02"), []byte("defg"), 0644))
|
||||
require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/hold/01"), []byte("abc"), 0644))
|
||||
require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/incoming/01"), []byte("abcd"), 0644))
|
||||
require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/deferred/0/0/01"), []byte("abc"), 0644))
|
||||
require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/deferred/F/F/F1"), []byte("abc"), 0644))
|
||||
|
||||
p := Postfix{
|
||||
QueueDirectory: td,
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
# tail Input Plugin
|
||||
# Tail Input Plugin
|
||||
|
||||
The tail plugin "tails" a logfile and parses each log message.
|
||||
|
||||
@@ -49,3 +49,7 @@ The plugin expects messages in one of the
|
||||
data_format = "influx"
|
||||
```
|
||||
|
||||
### Metrics:
|
||||
|
||||
Metrics are produced according to the `data_format` option. Additionally a
|
||||
tag labeled `path` is added to the metric containing the filename being tailed.
|
||||
|
||||
@@ -146,7 +146,11 @@ func (t *Tail) receiver(tailer *tail.Tail) {
|
||||
|
||||
m, err = t.parser.ParseLine(text)
|
||||
if err == nil {
|
||||
t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
|
||||
if m != nil {
|
||||
tags := m.Tags()
|
||||
tags["path"] = tailer.Filename
|
||||
t.acc.AddFields(m.Name(), m.Fields(), tags, m.Time())
|
||||
}
|
||||
} else {
|
||||
t.acc.AddError(fmt.Errorf("E! Malformed log line in %s: [%s], Error: %s\n",
|
||||
tailer.Filename, line.Text, err))
|
||||
|
||||
96
plugins/parsers/gjson/parser.go
Normal file
96
plugins/parsers/gjson/parser.go
Normal file
@@ -0,0 +1,96 @@
|
||||
package gjson
|
||||
|
||||
import (
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/metric"
|
||||
"github.com/tidwall/gjson"
|
||||
)
|
||||
|
||||
type JSONPath struct {
|
||||
MetricName string
|
||||
TagPath map[string]string
|
||||
FloatPath map[string]string
|
||||
IntPath map[string]string
|
||||
StrPath map[string]string
|
||||
BoolPath map[string]string
|
||||
DefaultTags map[string]string
|
||||
}
|
||||
|
||||
func (j *JSONPath) Parse(buf []byte) ([]telegraf.Metric, error) {
|
||||
tags := make(map[string]string)
|
||||
for k, v := range j.DefaultTags {
|
||||
tags[k] = v
|
||||
}
|
||||
fields := make(map[string]interface{})
|
||||
metrics := make([]telegraf.Metric, 0)
|
||||
|
||||
for k, v := range j.TagPath {
|
||||
c := gjson.GetBytes(buf, v)
|
||||
if c.IsArray() {
|
||||
log.Printf("E! GJSON cannot assign array to field on path: %v", v)
|
||||
continue
|
||||
}
|
||||
tags[k] = c.String()
|
||||
}
|
||||
|
||||
for k, v := range j.FloatPath {
|
||||
c := gjson.GetBytes(buf, v)
|
||||
if c.IsArray() {
|
||||
log.Printf("E! GJSON cannot assign array to field on path: %v", v)
|
||||
continue
|
||||
}
|
||||
fields[k] = c.Float()
|
||||
}
|
||||
|
||||
for k, v := range j.IntPath {
|
||||
c := gjson.GetBytes(buf, v)
|
||||
if c.IsArray() {
|
||||
log.Printf("E! GJSON cannot assign array to field on path: %v", v)
|
||||
continue
|
||||
}
|
||||
fields[k] = c.Int()
|
||||
}
|
||||
|
||||
for k, v := range j.BoolPath {
|
||||
c := gjson.GetBytes(buf, v)
|
||||
if c.IsArray() {
|
||||
log.Printf("E! GJSON cannot assign array to field on path: %v", v)
|
||||
continue
|
||||
}
|
||||
if c.String() == "true" {
|
||||
fields[k] = true
|
||||
} else if c.String() == "false" {
|
||||
fields[k] = false
|
||||
} else {
|
||||
log.Printf("E! Cannot decode: %v as bool", c.String())
|
||||
}
|
||||
}
|
||||
|
||||
for k, v := range j.StrPath {
|
||||
c := gjson.GetBytes(buf, v)
|
||||
if c.IsArray() {
|
||||
log.Printf("E! GJSON cannot assign array to field on path: %v", v)
|
||||
continue
|
||||
}
|
||||
fields[k] = c.String()
|
||||
}
|
||||
|
||||
m, err := metric.New(j.MetricName, tags, fields, time.Now())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
metrics = append(metrics, m)
|
||||
return metrics, nil
|
||||
}
|
||||
|
||||
func (j *JSONPath) ParseLine(str string) (telegraf.Metric, error) {
|
||||
m, err := j.Parse([]byte(str))
|
||||
return m[0], err
|
||||
}
|
||||
|
||||
func (j *JSONPath) SetDefaultTags(tags map[string]string) {
|
||||
j.DefaultTags = tags
|
||||
}
|
||||
72
plugins/parsers/gjson/parser_test.go
Normal file
72
plugins/parsers/gjson/parser_test.go
Normal file
@@ -0,0 +1,72 @@
|
||||
package gjson
|
||||
|
||||
import (
|
||||
"log"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestParseJsonPath(t *testing.T) {
|
||||
testString := `{
|
||||
"total_devices": 5,
|
||||
"total_threads": 10,
|
||||
"shares": {
|
||||
"total": 5,
|
||||
"accepted": 5,
|
||||
"rejected": 0,
|
||||
"avg_find_time": 4,
|
||||
"tester": "work",
|
||||
"tester2": true,
|
||||
"tester3": {
|
||||
"hello":"sup",
|
||||
"fun":"money",
|
||||
"break":9
|
||||
}
|
||||
}
|
||||
}`
|
||||
|
||||
jsonParser := JSONPath{
|
||||
MetricName: "jsonpather",
|
||||
TagPath: map[string]string{"hello": "shares.tester3.hello"},
|
||||
BoolPath: map[string]string{"bool": "shares.tester2"},
|
||||
}
|
||||
|
||||
metrics, err := jsonParser.Parse([]byte(testString))
|
||||
assert.NoError(t, err)
|
||||
log.Printf("m[0] name: %v, tags: %v, fields: %v", metrics[0].Name(), metrics[0].Tags(), metrics[0].Fields())
|
||||
|
||||
}
|
||||
|
||||
func TestTagTypes(t *testing.T) {
|
||||
testString := `{
|
||||
"total_devices": 5,
|
||||
"total_threads": 10,
|
||||
"shares": {
|
||||
"total": 5,
|
||||
"accepted": 5,
|
||||
"rejected": 0,
|
||||
"my_bool": true,
|
||||
"tester": "work",
|
||||
"tester2": {
|
||||
"hello":"sup",
|
||||
"fun":true,
|
||||
"break":9.97
|
||||
}
|
||||
}
|
||||
}`
|
||||
|
||||
r := JSONPath{
|
||||
TagPath: map[string]string{"int1": "total_devices", "my_bool": "shares.my_bool"},
|
||||
FloatPath: map[string]string{"total": "shares.total"},
|
||||
BoolPath: map[string]string{"fun": "shares.tester2.fun"},
|
||||
StrPath: map[string]string{"hello": "shares.tester2.hello"},
|
||||
IntPath: map[string]string{"accepted": "shares.accepted"},
|
||||
}
|
||||
|
||||
metrics, err := r.Parse([]byte(testString))
|
||||
log.Printf("m[0] name: %v, tags: %v, fields: %v", metrics[0].Name(), metrics[0].Tags(), metrics[0].Fields())
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, true, reflect.DeepEqual(map[string]interface{}{"total": 5.0, "fun": true, "hello": "sup", "accepted": int64(5)}, metrics[0].Fields()))
|
||||
}
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
|
||||
"github.com/influxdata/telegraf/plugins/parsers/collectd"
|
||||
"github.com/influxdata/telegraf/plugins/parsers/dropwizard"
|
||||
"github.com/influxdata/telegraf/plugins/parsers/gjson"
|
||||
"github.com/influxdata/telegraf/plugins/parsers/graphite"
|
||||
"github.com/influxdata/telegraf/plugins/parsers/influx"
|
||||
"github.com/influxdata/telegraf/plugins/parsers/json"
|
||||
@@ -87,6 +88,13 @@ type Config struct {
|
||||
// an optional map containing tag names as keys and json paths to retrieve the tag values from as values
|
||||
// used if TagsPath is empty or doesn't return any tags
|
||||
DropwizardTagPathsMap map[string]string
|
||||
|
||||
//for gjson format
|
||||
GJSONTagPaths map[string]string
|
||||
GJSONBoolPaths map[string]string
|
||||
GJSONFloatPaths map[string]string
|
||||
GJSONStringPaths map[string]string
|
||||
GJSONIntPaths map[string]string
|
||||
}
|
||||
|
||||
// NewParser returns a Parser interface based on the given config.
|
||||
@@ -120,12 +128,37 @@ func NewParser(config *Config) (Parser, error) {
|
||||
config.DefaultTags,
|
||||
config.Separator,
|
||||
config.Templates)
|
||||
|
||||
case "gjson":
|
||||
parser, err = newGJSONParser(config.MetricName,
|
||||
config.GJSONTagPaths,
|
||||
config.GJSONStringPaths,
|
||||
config.GJSONBoolPaths,
|
||||
config.GJSONFloatPaths,
|
||||
config.GJSONIntPaths)
|
||||
default:
|
||||
err = fmt.Errorf("Invalid data format: %s", config.DataFormat)
|
||||
}
|
||||
return parser, err
|
||||
}
|
||||
|
||||
func newGJSONParser(metricName string,
|
||||
tagPaths map[string]string,
|
||||
strPaths map[string]string,
|
||||
boolPaths map[string]string,
|
||||
floatPaths map[string]string,
|
||||
intPaths map[string]string) (Parser, error) {
|
||||
parser := &gjson.JSONPath{
|
||||
MetricName: metricName,
|
||||
TagPath: tagPaths,
|
||||
StrPath: strPaths,
|
||||
BoolPath: boolPaths,
|
||||
FloatPath: floatPaths,
|
||||
IntPath: intPaths,
|
||||
}
|
||||
return parser, nil
|
||||
}
|
||||
|
||||
func NewJSONParser(
|
||||
metricName string,
|
||||
tagKeys []string,
|
||||
|
||||
Reference in New Issue
Block a user