Add name, time, path and string field options to JSON parser (#4351)
This commit is contained in:
@@ -11,6 +11,7 @@ import (
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/metric"
|
||||
"github.com/tidwall/gjson"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -18,9 +19,14 @@ var (
|
||||
)
|
||||
|
||||
type JSONParser struct {
|
||||
MetricName string
|
||||
TagKeys []string
|
||||
DefaultTags map[string]string
|
||||
MetricName string
|
||||
TagKeys []string
|
||||
StringFields []string
|
||||
JSONNameKey string
|
||||
JSONQuery string
|
||||
JSONTimeKey string
|
||||
JSONTimeFormat string
|
||||
DefaultTags map[string]string
|
||||
}
|
||||
|
||||
func (p *JSONParser) parseArray(buf []byte) ([]telegraf.Metric, error) {
|
||||
@@ -34,6 +40,9 @@ func (p *JSONParser) parseArray(buf []byte) ([]telegraf.Metric, error) {
|
||||
}
|
||||
for _, item := range jsonOut {
|
||||
metrics, err = p.parseObject(metrics, item)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return metrics, nil
|
||||
}
|
||||
@@ -51,10 +60,42 @@ func (p *JSONParser) parseObject(metrics []telegraf.Metric, jsonOut map[string]i
|
||||
return nil, err
|
||||
}
|
||||
|
||||
//checks if json_name_key is set
|
||||
if p.JSONNameKey != "" {
|
||||
p.MetricName = f.Fields[p.JSONNameKey].(string)
|
||||
}
|
||||
|
||||
//if time key is specified, set it to nTime
|
||||
nTime := time.Now().UTC()
|
||||
if p.JSONTimeKey != "" {
|
||||
if p.JSONTimeFormat == "" {
|
||||
err := fmt.Errorf("use of 'json_time_key' requires 'json_time_format'")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if f.Fields[p.JSONTimeKey] == nil {
|
||||
err := fmt.Errorf("JSON time key could not be found")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
timeStr, ok := f.Fields[p.JSONTimeKey].(string)
|
||||
if !ok {
|
||||
err := fmt.Errorf("time: %v could not be converted to string", f.Fields[p.JSONTimeKey])
|
||||
return nil, err
|
||||
}
|
||||
nTime, err = time.Parse(p.JSONTimeFormat, timeStr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
//if the year is 0, set to current year
|
||||
if nTime.Year() == 0 {
|
||||
nTime = nTime.AddDate(time.Now().Year(), 0, 0)
|
||||
}
|
||||
}
|
||||
|
||||
tags, nFields := p.switchFieldToTag(tags, f.Fields)
|
||||
|
||||
metric, err := metric.New(p.MetricName, tags, nFields, time.Now().UTC())
|
||||
|
||||
metric, err := metric.New(p.MetricName, tags, nFields, nTime)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -88,6 +129,17 @@ func (p *JSONParser) switchFieldToTag(tags map[string]string, fields map[string]
|
||||
|
||||
//remove any additional string/bool values from fields
|
||||
for k := range fields {
|
||||
//check if field is in StringFields
|
||||
sField := false
|
||||
for _, v := range p.StringFields {
|
||||
if v == k {
|
||||
sField = true
|
||||
}
|
||||
}
|
||||
if sField {
|
||||
continue
|
||||
}
|
||||
|
||||
switch fields[k].(type) {
|
||||
case string:
|
||||
delete(fields, k)
|
||||
@@ -99,6 +151,15 @@ func (p *JSONParser) switchFieldToTag(tags map[string]string, fields map[string]
|
||||
}
|
||||
|
||||
func (p *JSONParser) Parse(buf []byte) ([]telegraf.Metric, error) {
|
||||
if p.JSONQuery != "" {
|
||||
result := gjson.GetBytes(buf, p.JSONQuery)
|
||||
buf = []byte(result.Raw)
|
||||
if !result.IsArray() && !result.IsObject() {
|
||||
err := fmt.Errorf("E! Query path must lead to a JSON object or array of objects, but lead to: %v", result.Type)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
buf = bytes.TrimSpace(buf)
|
||||
buf = bytes.TrimPrefix(buf, utf8BOM)
|
||||
if len(buf) == 0 {
|
||||
@@ -126,7 +187,7 @@ func (p *JSONParser) ParseLine(line string) (telegraf.Metric, error) {
|
||||
}
|
||||
|
||||
if len(metrics) < 1 {
|
||||
return nil, fmt.Errorf("Can not parse the line: %s, for data format: influx ", line)
|
||||
return nil, fmt.Errorf("can not parse the line: %s, for data format: json ", line)
|
||||
}
|
||||
|
||||
return metrics[0], nil
|
||||
|
||||
Reference in New Issue
Block a user