Compare commits
4 Commits
feature/13
...
logparser-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f88f665cd9 | ||
|
|
a2df042d92 | ||
|
|
c7a72b9a9d | ||
|
|
09f884b4f0 |
@@ -104,10 +104,9 @@ but can be overridden using the `name_override` config option.
|
||||
|
||||
#### JSON Configuration:
|
||||
|
||||
The JSON data format supports specifying "tag keys" and "field keys". If specified, keys
|
||||
will be searched for in the root-level and any nested lists of the JSON blob. If the key(s) exist,
|
||||
they will be applied as tags or fields to the Telegraf metrics. If "field_keys" is not specified,
|
||||
all int and float values will be set as fields by default.
|
||||
The JSON data format supports specifying "tag keys". If specified, keys
|
||||
will be searched for in the root-level of the JSON blob. If the key(s) exist,
|
||||
they will be applied as tags to the Telegraf metrics.
|
||||
|
||||
For example, if you had this configuration:
|
||||
|
||||
@@ -174,7 +173,6 @@ For example, if the following configuration:
|
||||
"my_tag_1",
|
||||
"my_tag_2"
|
||||
]
|
||||
field_keys = ["b_c"]
|
||||
```
|
||||
|
||||
with this JSON output from a command:
|
||||
@@ -200,11 +198,11 @@ with this JSON output from a command:
|
||||
]
|
||||
```
|
||||
|
||||
Your Telegraf metrics would get tagged with "my_tag_1" and "my_tag_2" and fielded with "b_c"
|
||||
Your Telegraf metrics would get tagged with "my_tag_1" and "my_tag_2"
|
||||
|
||||
```
|
||||
exec_mycollector,my_tag_1=foo,my_tag_2=baz b_c=6
|
||||
exec_mycollector,my_tag_1=bar,my_tag_2=baz b_c=8
|
||||
exec_mycollector,my_tag_1=foo,my_tag_2=baz a=5,b_c=6
|
||||
exec_mycollector,my_tag_1=bar,my_tag_2=baz a=7,b_c=8
|
||||
```
|
||||
|
||||
# Value:
|
||||
|
||||
@@ -1261,18 +1261,6 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
|
||||
}
|
||||
}
|
||||
|
||||
if node, ok := tbl.Fields["field_keys"]; ok {
|
||||
if kv, ok := node.(*ast.KeyValue); ok {
|
||||
if ary, ok := kv.Value.(*ast.Array); ok {
|
||||
for _, elem := range ary.Value {
|
||||
if str, ok := elem.(*ast.String); ok {
|
||||
c.FieldKeys = append(c.FieldKeys, str.Value)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if node, ok := tbl.Fields["data_type"]; ok {
|
||||
if kv, ok := node.(*ast.KeyValue); ok {
|
||||
if str, ok := kv.Value.(*ast.String); ok {
|
||||
@@ -1356,7 +1344,6 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
|
||||
delete(tbl.Fields, "separator")
|
||||
delete(tbl.Fields, "templates")
|
||||
delete(tbl.Fields, "tag_keys")
|
||||
delete(tbl.Fields, "field_keys")
|
||||
delete(tbl.Fields, "data_type")
|
||||
delete(tbl.Fields, "collectd_auth_file")
|
||||
delete(tbl.Fields, "collectd_security_level")
|
||||
|
||||
@@ -143,10 +143,7 @@ func TestConfig_LoadDirectory(t *testing.T) {
|
||||
"Testdata did not produce correct memcached metadata.")
|
||||
|
||||
ex := inputs.Inputs["exec"]().(*exec.Exec)
|
||||
p, err := parsers.NewParser(&parsers.Config{
|
||||
MetricName: "exec",
|
||||
DataFormat: "json",
|
||||
})
|
||||
p, err := parsers.NewJSONParser("exec", nil, nil)
|
||||
assert.NoError(t, err)
|
||||
ex.SetParser(p)
|
||||
ex.Command = "/usr/bin/myothercollector --foo=bar"
|
||||
|
||||
@@ -93,10 +93,7 @@ func (r runnerMock) Run(e *Exec, command string, acc telegraf.Accumulator) ([]by
|
||||
}
|
||||
|
||||
func TestExec(t *testing.T) {
|
||||
parser, _ := parsers.NewParser(&parsers.Config{
|
||||
DataFormat: "json",
|
||||
MetricName: "exec",
|
||||
})
|
||||
parser, _ := parsers.NewJSONParser("exec", []string{}, nil)
|
||||
e := &Exec{
|
||||
runner: newRunnerMock([]byte(validJson), nil),
|
||||
Commands: []string{"testcommand arg1"},
|
||||
@@ -122,10 +119,7 @@ func TestExec(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestExecMalformed(t *testing.T) {
|
||||
parser, _ := parsers.NewParser(&parsers.Config{
|
||||
DataFormat: "json",
|
||||
MetricName: "exec",
|
||||
})
|
||||
parser, _ := parsers.NewJSONParser("exec", []string{}, nil)
|
||||
e := &Exec{
|
||||
runner: newRunnerMock([]byte(malformedJson), nil),
|
||||
Commands: []string{"badcommand arg1"},
|
||||
@@ -138,10 +132,7 @@ func TestExecMalformed(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestCommandError(t *testing.T) {
|
||||
parser, _ := parsers.NewParser(&parsers.Config{
|
||||
DataFormat: "json",
|
||||
MetricName: "exec",
|
||||
})
|
||||
parser, _ := parsers.NewJSONParser("exec", []string{}, nil)
|
||||
e := &Exec{
|
||||
runner: newRunnerMock(nil, fmt.Errorf("exit status code 1")),
|
||||
Commands: []string{"badcommand"},
|
||||
|
||||
@@ -26,11 +26,7 @@ func TestHTTPwithJSONFormat(t *testing.T) {
|
||||
URLs: []string{url},
|
||||
}
|
||||
metricName := "metricName"
|
||||
|
||||
p, _ := parsers.NewParser(&parsers.Config{
|
||||
DataFormat: "json",
|
||||
MetricName: "metricName",
|
||||
})
|
||||
p, _ := parsers.NewJSONParser(metricName, nil, nil)
|
||||
plugin.SetParser(p)
|
||||
|
||||
var acc testutil.Accumulator
|
||||
@@ -67,11 +63,8 @@ func TestHTTPHeaders(t *testing.T) {
|
||||
URLs: []string{url},
|
||||
Headers: map[string]string{header: headerValue},
|
||||
}
|
||||
|
||||
p, _ := parsers.NewParser(&parsers.Config{
|
||||
DataFormat: "json",
|
||||
MetricName: "metricName",
|
||||
})
|
||||
metricName := "metricName"
|
||||
p, _ := parsers.NewJSONParser(metricName, nil, nil)
|
||||
plugin.SetParser(p)
|
||||
|
||||
var acc testutil.Accumulator
|
||||
@@ -90,10 +83,7 @@ func TestInvalidStatusCode(t *testing.T) {
|
||||
}
|
||||
|
||||
metricName := "metricName"
|
||||
p, _ := parsers.NewParser(&parsers.Config{
|
||||
DataFormat: "json",
|
||||
MetricName: metricName,
|
||||
})
|
||||
p, _ := parsers.NewJSONParser(metricName, nil, nil)
|
||||
plugin.SetParser(p)
|
||||
|
||||
var acc testutil.Accumulator
|
||||
@@ -115,10 +105,8 @@ func TestMethod(t *testing.T) {
|
||||
Method: "POST",
|
||||
}
|
||||
|
||||
p, _ := parsers.NewParser(&parsers.Config{
|
||||
DataFormat: "json",
|
||||
MetricName: "metricName",
|
||||
})
|
||||
metricName := "metricName"
|
||||
p, _ := parsers.NewJSONParser(metricName, nil, nil)
|
||||
plugin.SetParser(p)
|
||||
|
||||
var acc testutil.Accumulator
|
||||
|
||||
@@ -181,12 +181,7 @@ func (h *HttpJson) gatherServer(
|
||||
"server": serverURL,
|
||||
}
|
||||
|
||||
parser, err := parsers.NewParser(&parsers.Config{
|
||||
DataFormat: "json",
|
||||
MetricName: msrmnt_name,
|
||||
TagKeys: h.TagKeys,
|
||||
DefaultTags: tags,
|
||||
})
|
||||
parser, err := parsers.NewJSONParser(msrmnt_name, h.TagKeys, tags)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -125,10 +125,7 @@ func TestRunParserAndGatherJSON(t *testing.T) {
|
||||
k.acc = &acc
|
||||
defer close(k.done)
|
||||
|
||||
k.parser, _ = parsers.NewParser(&parsers.Config{
|
||||
DataFormat: "json",
|
||||
MetricName: "kafka_json_test",
|
||||
})
|
||||
k.parser, _ = parsers.NewJSONParser("kafka_json_test", []string{}, nil)
|
||||
go k.receiver()
|
||||
in <- saramaMsg(testMsgJSON)
|
||||
acc.Wait(1)
|
||||
|
||||
@@ -125,10 +125,7 @@ func TestRunParserAndGatherJSON(t *testing.T) {
|
||||
k.acc = &acc
|
||||
defer close(k.done)
|
||||
|
||||
k.parser, _ = parsers.NewParser(&parsers.Config{
|
||||
DataFormat: "json",
|
||||
MetricName: "kafka_json_test",
|
||||
})
|
||||
k.parser, _ = parsers.NewJSONParser("kafka_json_test", []string{}, nil)
|
||||
go k.receiver()
|
||||
in <- saramaMsg(testMsgJSON)
|
||||
acc.Wait(1)
|
||||
|
||||
@@ -340,6 +340,9 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
|
||||
|
||||
ts, err := time.ParseInLocation(t, v, p.loc)
|
||||
if err == nil {
|
||||
if ts.Year() == 0 {
|
||||
ts = ts.AddDate(timestamp.Year(), 0, 0)
|
||||
}
|
||||
timestamp = ts
|
||||
} else {
|
||||
log.Printf("E! Error parsing %s to time layout [%s]: %s", v, t, err)
|
||||
@@ -469,7 +472,6 @@ func (t *tsModder) tsMod(ts time.Time) time.Time {
|
||||
t.rollover = 0
|
||||
return ts
|
||||
}
|
||||
|
||||
if ts.Equal(t.last) {
|
||||
t.dupe = ts
|
||||
}
|
||||
|
||||
@@ -1000,3 +1000,20 @@ func TestReplaceTimestampComma(t *testing.T) {
|
||||
//Convert Nanosecond to milisecond for compare
|
||||
require.Equal(t, 555, m.Time().Nanosecond()/1000000)
|
||||
}
|
||||
|
||||
func TestEmptyYearInTimestamp(t *testing.T) {
|
||||
p := &Parser{
|
||||
Patterns: []string{`%{APPLE_SYSLOG_TIME_SHORT:timestamp:ts-"Jan 2 15:04:05"} %{HOSTNAME} %{APP_NAME:app_name}\[%{NUMBER:pid:int}\]%{GREEDYDATA:message}`},
|
||||
CustomPatterns: `
|
||||
APPLE_SYSLOG_TIME_SHORT %{MONTH} +%{MONTHDAY} %{TIME}
|
||||
APP_NAME [a-zA-Z0-9\.]+
|
||||
`,
|
||||
}
|
||||
require.NoError(t, p.Compile())
|
||||
p.ParseLine("Nov 6 13:57:03 generic iTunes[6504]: info> Scale factor of main display = 2.0")
|
||||
m, err := p.ParseLine("Nov 6 13:57:03 generic iTunes[6504]: objc[6504]: Object descriptor was null.")
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, m)
|
||||
require.Equal(t, 2018, m.Time().Year())
|
||||
|
||||
}
|
||||
|
||||
@@ -172,10 +172,7 @@ func TestRunParserAndGatherJSON(t *testing.T) {
|
||||
n.acc = &acc
|
||||
defer close(n.done)
|
||||
|
||||
n.parser, _ = parsers.NewParser(&parsers.Config{
|
||||
DataFormat: "json",
|
||||
MetricName: "nats_json_test",
|
||||
})
|
||||
n.parser, _ = parsers.NewJSONParser("nats_json_test", []string{}, nil)
|
||||
go n.receiver()
|
||||
in <- mqttMsg(testMsgJSON)
|
||||
|
||||
|
||||
@@ -108,10 +108,7 @@ func TestRunParserAndGatherJSON(t *testing.T) {
|
||||
n.acc = &acc
|
||||
defer close(n.done)
|
||||
|
||||
n.parser, _ = parsers.NewParser(&parsers.Config{
|
||||
DataFormat: "json",
|
||||
MetricName: "nats_json_test",
|
||||
})
|
||||
n.parser, _ = parsers.NewJSONParser("nats_json_test", []string{}, nil)
|
||||
n.wg.Add(1)
|
||||
go n.receiver()
|
||||
in <- natsMsg(testMsgJSON)
|
||||
|
||||
@@ -300,10 +300,7 @@ func TestRunParserJSONMsg(t *testing.T) {
|
||||
listener.acc = &acc
|
||||
defer close(listener.done)
|
||||
|
||||
listener.parser, _ = parsers.NewParser(&parsers.Config{
|
||||
DataFormat: "json",
|
||||
MetricName: "udp_json_test",
|
||||
})
|
||||
listener.parser, _ = parsers.NewJSONParser("udp_json_test", []string{}, nil)
|
||||
listener.wg.Add(1)
|
||||
go listener.tcpParser()
|
||||
|
||||
|
||||
@@ -193,10 +193,7 @@ func TestRunParserJSONMsg(t *testing.T) {
|
||||
listener.acc = &acc
|
||||
defer close(listener.done)
|
||||
|
||||
listener.parser, _ = parsers.NewParser(&parsers.Config{
|
||||
DataFormat: "json",
|
||||
MetricName: "udp_json_test",
|
||||
})
|
||||
listener.parser, _ = parsers.NewJSONParser("udp_json_test", []string{}, nil)
|
||||
listener.wg.Add(1)
|
||||
go listener.udpParser()
|
||||
|
||||
|
||||
@@ -20,7 +20,6 @@ var (
|
||||
type JSONParser struct {
|
||||
MetricName string
|
||||
TagKeys []string
|
||||
FieldKeys []string
|
||||
DefaultTags map[string]string
|
||||
}
|
||||
|
||||
@@ -87,17 +86,6 @@ func (p *JSONParser) switchFieldToTag(tags map[string]string, fields map[string]
|
||||
}
|
||||
}
|
||||
|
||||
//if field_keys is specified, only those values should be reported as fields
|
||||
if len(p.FieldKeys) > 0 {
|
||||
nFields := make(map[string]interface{})
|
||||
for _, name := range p.FieldKeys {
|
||||
if fields[name] != nil {
|
||||
nFields[name] = fields[name]
|
||||
}
|
||||
}
|
||||
return tags, nFields
|
||||
}
|
||||
|
||||
//remove any additional string/bool values from fields
|
||||
for k := range fields {
|
||||
switch fields[k].(type) {
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package json
|
||||
|
||||
import (
|
||||
"log"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
@@ -449,28 +448,22 @@ func TestJSONParseNestedArray(t *testing.T) {
|
||||
"total_devices": 5,
|
||||
"total_threads": 10,
|
||||
"shares": {
|
||||
"total": 5,
|
||||
"accepted": 5,
|
||||
"rejected": 0,
|
||||
"avg_find_time": 4,
|
||||
"tester": "work",
|
||||
"tester2": "don't want this",
|
||||
"tester3": {
|
||||
"hello":"sup",
|
||||
"fun":"money",
|
||||
"break":9
|
||||
}
|
||||
"total": 5,
|
||||
"accepted": 5,
|
||||
"rejected": 0,
|
||||
"avg_find_time": 4,
|
||||
"tester": "work",
|
||||
"tester2": "don't want this",
|
||||
"tester3": 7.93
|
||||
}
|
||||
}`
|
||||
|
||||
parser := JSONParser{
|
||||
MetricName: "json_test",
|
||||
TagKeys: []string{"total_devices", "total_threads", "shares_tester3_fun"},
|
||||
FieldKeys: []string{"shares_tester", "shares_tester3_break"},
|
||||
TagKeys: []string{"total_devices", "total_threads", "shares_tester", "shares_tester3"},
|
||||
}
|
||||
|
||||
metrics, err := parser.Parse([]byte(testString))
|
||||
log.Printf("m[0] name: %v, tags: %v, fields: %v", metrics[0].Name(), metrics[0].Tags(), metrics[0].Fields())
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, len(parser.TagKeys), len(metrics[0].Tags()))
|
||||
}
|
||||
|
||||
@@ -56,8 +56,6 @@ type Config struct {
|
||||
|
||||
// TagKeys only apply to JSON data
|
||||
TagKeys []string
|
||||
// FieldKeys only apply to JSON
|
||||
FieldKeys []string
|
||||
// MetricName applies to JSON & value. This will be the name of the measurement.
|
||||
MetricName string
|
||||
|
||||
@@ -97,8 +95,8 @@ func NewParser(config *Config) (Parser, error) {
|
||||
var parser Parser
|
||||
switch config.DataFormat {
|
||||
case "json":
|
||||
parser, err = newJSONParser(config.MetricName,
|
||||
config.TagKeys, config.FieldKeys, config.DefaultTags)
|
||||
parser, err = NewJSONParser(config.MetricName,
|
||||
config.TagKeys, config.DefaultTags)
|
||||
case "value":
|
||||
parser, err = NewValueParser(config.MetricName,
|
||||
config.DataType, config.DefaultTags)
|
||||
@@ -128,22 +126,6 @@ func NewParser(config *Config) (Parser, error) {
|
||||
return parser, err
|
||||
}
|
||||
|
||||
func newJSONParser(
|
||||
metricName string,
|
||||
tagKeys []string,
|
||||
fieldKeys []string,
|
||||
defaultTags map[string]string,
|
||||
) (Parser, error) {
|
||||
parser := &json.JSONParser{
|
||||
MetricName: metricName,
|
||||
TagKeys: tagKeys,
|
||||
FieldKeys: fieldKeys,
|
||||
DefaultTags: defaultTags,
|
||||
}
|
||||
return parser, nil
|
||||
}
|
||||
|
||||
//Deprecated: Use NewParser to get a JSONParser object
|
||||
func NewJSONParser(
|
||||
metricName string,
|
||||
tagKeys []string,
|
||||
|
||||
Reference in New Issue
Block a user