add kafka new consumer support to kafka_consumer

+kafka new consumer support to kafka_consumer
+timestamp selector to json parser, can specify the timestamp field
+timestamp formatter to json parser, can parse the timestamp in timestamp field
This commit is contained in:
Kirk Young
2016-07-27 09:22:02 -07:00
parent 412f5b5acb
commit b6926c36e8
4 changed files with 192 additions and 44 deletions

View File

@@ -11,9 +11,11 @@ import (
)
type JSONParser struct {
MetricName string
TagKeys []string
DefaultTags map[string]string
MetricName string
TagKeys []string
DefaultTags map[string]string
TimestampSelector string
TimestampFormatter string
}
func (p *JSONParser) Parse(buf []byte) ([]telegraf.Metric, error) {
@@ -31,6 +33,25 @@ func (p *JSONParser) Parse(buf []byte) ([]telegraf.Metric, error) {
tags[k] = v
}
timestampStr := ""
timestampData := jsonOut[p.TimestampSelector]
if timestampData != nil {
timestampStr = timestampData.(string)
}
if p.TimestampFormatter == "" {
p.TimestampFormatter = time.RFC3339Nano
}
timestamp := time.Now().UTC()
if timestampStr != "" {
timestampTmp, err := time.Parse(p.TimestampFormatter, timestampStr)
if err != nil {
return nil, err
}
timestamp = timestampTmp
}
for _, tag := range p.TagKeys {
switch v := jsonOut[tag].(type) {
case string:
@@ -45,7 +66,7 @@ func (p *JSONParser) Parse(buf []byte) ([]telegraf.Metric, error) {
return nil, err
}
metric, err := telegraf.NewMetric(p.MetricName, tags, f.Fields, time.Now().UTC())
metric, err := telegraf.NewMetric(p.MetricName, tags, f.Fields, timestamp)
if err != nil {
return nil, err

View File

@@ -58,6 +58,12 @@ type Config struct {
// DefaultTags are the default tags that will be added to all parsed metrics.
DefaultTags map[string]string
// TimestampSelector only applies to JSON
TimestampSelector string
// TimestampFormatter only applies to JSON
TimestampFormatter string
}
// NewParser returns a Parser interface based on the given config.
@@ -67,7 +73,8 @@ func NewParser(config *Config) (Parser, error) {
switch config.DataFormat {
case "json":
parser, err = NewJSONParser(config.MetricName,
config.TagKeys, config.DefaultTags)
config.TagKeys, config.DefaultTags,
config.TimestampSelector, config.TimestampFormatter)
case "value":
parser, err = NewValueParser(config.MetricName,
config.DataType, config.DefaultTags)
@@ -88,11 +95,23 @@ func NewJSONParser(
metricName string,
tagKeys []string,
defaultTags map[string]string,
timestampParameters ...string,
) (Parser, error) {
timestampSelector, timestampFormatter := "", ""
switch len(timestampParameters) {
case 2:
timestampFormatter = timestampParameters[1]
fallthrough
case 1:
timestampSelector = timestampParameters[0]
}
parser := &json.JSONParser{
MetricName: metricName,
TagKeys: tagKeys,
DefaultTags: defaultTags,
MetricName: metricName,
TagKeys: tagKeys,
DefaultTags: defaultTags,
TimestampSelector: timestampSelector,
TimestampFormatter: timestampFormatter,
}
return parser, nil
}