add timestamp selector to JSON parser

allow user to specify timestamp field instead of being forced to use local time
This commit is contained in:
Kirk Young 2016-07-27 10:55:00 -07:00
parent 412f5b5acb
commit b00f993f27
3 changed files with 59 additions and 11 deletions

View File

@ -106,6 +106,8 @@ The JSON data format supports specifying "tag keys". If specified, keys
will be searched for in the root-level of the JSON blob. If the key(s) exist, will be searched for in the root-level of the JSON blob. If the key(s) exist,
they will be applied as tags to the Telegraf metrics. they will be applied as tags to the Telegraf metrics.
JSON data format can specify the timestamp by "timestamp_selector" and then parse it using "timestamp_formatter"; this could be useful when dealing with metrics not generated locally.
For example, if you had this configuration: For example, if you had this configuration:
```toml ```toml
@ -127,6 +129,11 @@ For example, if you had this configuration:
"my_tag_1", "my_tag_1",
"my_tag_2" "my_tag_2"
] ]
timestamp_selector = "@timestamp"
## for more information about timestamp formatter, please refer to:
## https://golang.org/src/time/format.go
timestamp_formatter = "2006-01-02T15:04:05Z07:00"
``` ```
with this JSON output from a command: with this JSON output from a command:
@ -137,14 +144,15 @@ with this JSON output from a command:
"b": { "b": {
"c": 6 "c": 6
}, },
"my_tag_1": "foo" "my_tag_1": "foo",
"@timestamp": "2016-07-27T16:46:00.554Z"
} }
``` ```
Your Telegraf metrics would get tagged with "my_tag_1" Your Telegraf metrics would get tagged with "my_tag_1" and timestamp
``` ```
exec_mycollector,my_tag_1=foo a=5,b_c=6 exec_mycollector,my_tag_1=foo a=5,b_c=6 1469637960554000000
``` ```
# Value: # Value:

View File

@ -11,9 +11,11 @@ import (
) )
type JSONParser struct { type JSONParser struct {
MetricName string MetricName string
TagKeys []string TagKeys []string
DefaultTags map[string]string DefaultTags map[string]string
TimestampSelector string
TimestampFormatter string
} }
func (p *JSONParser) Parse(buf []byte) ([]telegraf.Metric, error) { func (p *JSONParser) Parse(buf []byte) ([]telegraf.Metric, error) {
@ -31,6 +33,25 @@ func (p *JSONParser) Parse(buf []byte) ([]telegraf.Metric, error) {
tags[k] = v tags[k] = v
} }
timestampStr := ""
timestampData := jsonOut[p.TimestampSelector]
if timestampData != nil {
timestampStr = timestampData.(string)
}
if p.TimestampFormatter == "" {
p.TimestampFormatter = time.RFC3339Nano
}
timestamp := time.Now().UTC()
if timestampStr != "" {
timestampTmp, err := time.Parse(p.TimestampFormatter, timestampStr)
if err != nil {
return nil, err
}
timestamp = timestampTmp
}
for _, tag := range p.TagKeys { for _, tag := range p.TagKeys {
switch v := jsonOut[tag].(type) { switch v := jsonOut[tag].(type) {
case string: case string:
@ -45,7 +66,7 @@ func (p *JSONParser) Parse(buf []byte) ([]telegraf.Metric, error) {
return nil, err return nil, err
} }
metric, err := telegraf.NewMetric(p.MetricName, tags, f.Fields, time.Now().UTC()) metric, err := telegraf.NewMetric(p.MetricName, tags, f.Fields, timestamp)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -58,6 +58,12 @@ type Config struct {
// DefaultTags are the default tags that will be added to all parsed metrics. // DefaultTags are the default tags that will be added to all parsed metrics.
DefaultTags map[string]string DefaultTags map[string]string
// TimestampSelector only applies to JSON
TimestampSelector string
// TimestampFormatter only applies to JSON
TimestampFormatter string
} }
// NewParser returns a Parser interface based on the given config. // NewParser returns a Parser interface based on the given config.
@ -67,7 +73,8 @@ func NewParser(config *Config) (Parser, error) {
switch config.DataFormat { switch config.DataFormat {
case "json": case "json":
parser, err = NewJSONParser(config.MetricName, parser, err = NewJSONParser(config.MetricName,
config.TagKeys, config.DefaultTags) config.TagKeys, config.DefaultTags,
config.TimestampSelector, config.TimestampFormatter)
case "value": case "value":
parser, err = NewValueParser(config.MetricName, parser, err = NewValueParser(config.MetricName,
config.DataType, config.DefaultTags) config.DataType, config.DefaultTags)
@ -88,11 +95,23 @@ func NewJSONParser(
metricName string, metricName string,
tagKeys []string, tagKeys []string,
defaultTags map[string]string, defaultTags map[string]string,
timestampParameters ...string,
) (Parser, error) { ) (Parser, error) {
timestampSelector, timestampFormatter := "", ""
switch len(timestampParameters) {
case 2:
timestampFormatter = timestampParameters[1]
fallthrough
case 1:
timestampSelector = timestampParameters[0]
}
parser := &json.JSONParser{ parser := &json.JSONParser{
MetricName: metricName, MetricName: metricName,
TagKeys: tagKeys, TagKeys: tagKeys,
DefaultTags: defaultTags, DefaultTags: defaultTags,
TimestampSelector: timestampSelector,
TimestampFormatter: timestampFormatter,
} }
return parser, nil return parser, nil
} }