Compare commits

...

15 Commits

Author SHA1 Message Date
Max U
91964888c9 comments reflect deprecated NewJsonParser 2018-06-28 16:05:58 -07:00
Max U
e25fbed0de add old exported func NewJSONParser 2018-06-28 16:00:32 -07:00
Max U
674b9578d7 readability tweak 2018-06-28 15:56:36 -07:00
Max U
885193bc8f change test files to reflect unexported newJsonParser func 2018-06-28 15:39:39 -07:00
Max U
4db667573a newJsonParser function is now unexported, must use NewParser(config) 2018-06-28 15:35:20 -07:00
Max U
7f2b2a08ae test case edit 2018-06-28 11:45:26 -07:00
Max U
c50bfc2c71 edit unittests for JSON parser 2018-06-27 16:09:14 -07:00
Max U
420dafd591 add "field_tags" to json parser config 2018-06-27 16:09:05 -07:00
Daniel Nelson
23523ffd10 Document path tag in tail input 2018-06-21 18:02:34 -07:00
Daniel Nelson
523d761f34 Update changelog 2018-06-21 17:59:31 -07:00
JongHyok Lee
3f28add025 Added path tag to tail input plugin (#4292) 2018-06-21 17:55:54 -07:00
Daniel Nelson
ee6e4b0afd Run windows tests with -short 2018-06-21 17:46:58 -07:00
Patrick Hemmer
16454e25ba Fix postfix input handling of multi-level queues (#4333) 2018-06-21 16:01:38 -07:00
Daniel Nelson
2a1feb6db9 Update changelog 2018-06-21 14:20:35 -07:00
Ayrdrie
61e197d254 Add support for comma in logparser timestamp format (#4311) 2018-06-21 14:19:15 -07:00
24 changed files with 202 additions and 95 deletions

View File

@@ -20,6 +20,8 @@
- [#4259](https://github.com/influxdata/telegraf/pull/4259): Add container status tag to docker input.
- [#3523](https://github.com/influxdata/telegraf/pull/3523): Add valuecounter aggregator plugin.
- [#4307](https://github.com/influxdata/telegraf/pull/4307): Add new measurement with results of pgrep lookup to procstat input.
- [#4311](https://github.com/influxdata/telegraf/pull/4311): Add support for comma in logparser timestamp format.
- [#4292](https://github.com/influxdata/telegraf/pull/4292): Add path tag to tail input plugin.
## v1.7.1 [unreleased]
@@ -27,6 +29,7 @@
- [#4277](https://github.com/influxdata/telegraf/pull/4277): Treat sigterm as a clean shutdown signal.
- [#4284](https://github.com/influxdata/telegraf/pull/4284): Fix selection of tags under nested objects in the JSON parser.
- [#4135](https://github.com/influxdata/telegraf/issues/4135): Fix postfix input handling multi-level queues.
## v1.7 [2018-06-12]

View File

@@ -54,11 +54,11 @@ fmtcheck:
@echo '[INFO] done.'
test-windows:
go test ./plugins/inputs/ping/...
go test ./plugins/inputs/win_perf_counters/...
go test ./plugins/inputs/win_services/...
go test ./plugins/inputs/procstat/...
go test ./plugins/inputs/ntpq/...
go test -short ./plugins/inputs/ping/...
go test -short ./plugins/inputs/win_perf_counters/...
go test -short ./plugins/inputs/win_services/...
go test -short ./plugins/inputs/procstat/...
go test -short ./plugins/inputs/ntpq/...
# vet runs the Go source code static analysis tool `vet` to find
# any common errors.

View File

@@ -104,9 +104,10 @@ but can be overridden using the `name_override` config option.
#### JSON Configuration:
The JSON data format supports specifying "tag keys". If specified, keys
will be searched for in the root-level of the JSON blob. If the key(s) exist,
they will be applied as tags to the Telegraf metrics.
The JSON data format supports specifying "tag keys" and "field keys". If specified, keys
will be searched for in the root-level and any nested lists of the JSON blob. If the key(s) exist,
they will be applied as tags or fields to the Telegraf metrics. If "field_keys" is not specified,
all int and float values will be set as fields by default.
For example, if you had this configuration:
@@ -173,6 +174,7 @@ For example, if the following configuration:
"my_tag_1",
"my_tag_2"
]
field_keys = ["b_c"]
```
with this JSON output from a command:
@@ -198,11 +200,11 @@ with this JSON output from a command:
]
```
Your Telegraf metrics would get tagged with "my_tag_1" and "my_tag_2"
Your Telegraf metrics would get tagged with "my_tag_1" and "my_tag_2" and fielded with "b_c"
```
exec_mycollector,my_tag_1=foo,my_tag_2=baz a=5,b_c=6
exec_mycollector,my_tag_1=bar,my_tag_2=baz a=7,b_c=8
exec_mycollector,my_tag_1=foo,my_tag_2=baz b_c=6
exec_mycollector,my_tag_1=bar,my_tag_2=baz b_c=8
```
# Value:

View File

@@ -1261,6 +1261,18 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
}
}
if node, ok := tbl.Fields["field_keys"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
c.FieldKeys = append(c.FieldKeys, str.Value)
}
}
}
}
}
if node, ok := tbl.Fields["data_type"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
@@ -1344,6 +1356,7 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
delete(tbl.Fields, "separator")
delete(tbl.Fields, "templates")
delete(tbl.Fields, "tag_keys")
delete(tbl.Fields, "field_keys")
delete(tbl.Fields, "data_type")
delete(tbl.Fields, "collectd_auth_file")
delete(tbl.Fields, "collectd_security_level")

View File

@@ -143,7 +143,10 @@ func TestConfig_LoadDirectory(t *testing.T) {
"Testdata did not produce correct memcached metadata.")
ex := inputs.Inputs["exec"]().(*exec.Exec)
p, err := parsers.NewJSONParser("exec", nil, nil)
p, err := parsers.NewParser(&parsers.Config{
MetricName: "exec",
DataFormat: "json",
})
assert.NoError(t, err)
ex.SetParser(p)
ex.Command = "/usr/bin/myothercollector --foo=bar"

View File

@@ -93,7 +93,10 @@ func (r runnerMock) Run(e *Exec, command string, acc telegraf.Accumulator) ([]by
}
func TestExec(t *testing.T) {
parser, _ := parsers.NewJSONParser("exec", []string{}, nil)
parser, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "exec",
})
e := &Exec{
runner: newRunnerMock([]byte(validJson), nil),
Commands: []string{"testcommand arg1"},
@@ -119,7 +122,10 @@ func TestExec(t *testing.T) {
}
func TestExecMalformed(t *testing.T) {
parser, _ := parsers.NewJSONParser("exec", []string{}, nil)
parser, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "exec",
})
e := &Exec{
runner: newRunnerMock([]byte(malformedJson), nil),
Commands: []string{"badcommand arg1"},
@@ -132,7 +138,10 @@ func TestExecMalformed(t *testing.T) {
}
func TestCommandError(t *testing.T) {
parser, _ := parsers.NewJSONParser("exec", []string{}, nil)
parser, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "exec",
})
e := &Exec{
runner: newRunnerMock(nil, fmt.Errorf("exit status code 1")),
Commands: []string{"badcommand"},

View File

@@ -26,7 +26,11 @@ func TestHTTPwithJSONFormat(t *testing.T) {
URLs: []string{url},
}
metricName := "metricName"
p, _ := parsers.NewJSONParser(metricName, nil, nil)
p, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "metricName",
})
plugin.SetParser(p)
var acc testutil.Accumulator
@@ -63,8 +67,11 @@ func TestHTTPHeaders(t *testing.T) {
URLs: []string{url},
Headers: map[string]string{header: headerValue},
}
metricName := "metricName"
p, _ := parsers.NewJSONParser(metricName, nil, nil)
p, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "metricName",
})
plugin.SetParser(p)
var acc testutil.Accumulator
@@ -83,7 +90,10 @@ func TestInvalidStatusCode(t *testing.T) {
}
metricName := "metricName"
p, _ := parsers.NewJSONParser(metricName, nil, nil)
p, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: metricName,
})
plugin.SetParser(p)
var acc testutil.Accumulator
@@ -105,8 +115,10 @@ func TestMethod(t *testing.T) {
Method: "POST",
}
metricName := "metricName"
p, _ := parsers.NewJSONParser(metricName, nil, nil)
p, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "metricName",
})
plugin.SetParser(p)
var acc testutil.Accumulator

View File

@@ -181,7 +181,12 @@ func (h *HttpJson) gatherServer(
"server": serverURL,
}
parser, err := parsers.NewJSONParser(msrmnt_name, h.TagKeys, tags)
parser, err := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: msrmnt_name,
TagKeys: h.TagKeys,
DefaultTags: tags,
})
if err != nil {
return err
}

View File

@@ -125,7 +125,10 @@ func TestRunParserAndGatherJSON(t *testing.T) {
k.acc = &acc
defer close(k.done)
k.parser, _ = parsers.NewJSONParser("kafka_json_test", []string{}, nil)
k.parser, _ = parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "kafka_json_test",
})
go k.receiver()
in <- saramaMsg(testMsgJSON)
acc.Wait(1)

View File

@@ -125,7 +125,10 @@ func TestRunParserAndGatherJSON(t *testing.T) {
k.acc = &acc
defer close(k.done)
k.parser, _ = parsers.NewJSONParser("kafka_json_test", []string{}, nil)
k.parser, _ = parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "kafka_json_test",
})
go k.receiver()
in <- saramaMsg(testMsgJSON)
acc.Wait(1)

View File

@@ -108,7 +108,9 @@ You must capture at least one field per line.
- ts-"CUSTOM"
CUSTOM time layouts must be within quotes and be the representation of the
"reference time", which is `Mon Jan 2 15:04:05 -0700 MST 2006`
"reference time", which is `Mon Jan 2 15:04:05 -0700 MST 2006`.
To match a comma decimal point you can use a period. For example `%{TIMESTAMP:timestamp:ts-"2006-01-02 15:04:05.000"}` can be used to match `"2018-01-02 15:04:05,000"`
To match a comma decimal point you can use a period in the pattern string.
See https://golang.org/pkg/time/#Parse for more details.
Telegraf has many of its own [built-in patterns](./grok/patterns/influx-patterns),

View File

@@ -335,6 +335,9 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
case DROP:
// goodbye!
default:
// Replace commas with dot character
v = strings.Replace(v, ",", ".", -1)
ts, err := time.ParseInLocation(t, v, p.loc)
if err == nil {
timestamp = ts

View File

@@ -982,3 +982,21 @@ func TestSyslogTimestampParser(t *testing.T) {
require.NotNil(t, m)
require.Equal(t, 2018, m.Time().Year())
}
func TestReplaceTimestampComma(t *testing.T) {
p := &Parser{
Patterns: []string{`%{TIMESTAMP_ISO8601:timestamp:ts-"2006-01-02 15:04:05.000"} successfulMatches=%{NUMBER:value:int}`},
}
require.NoError(t, p.Compile())
m, err := p.ParseLine("2018-02-21 13:10:34,555 successfulMatches=1")
require.NoError(t, err)
require.NotNil(t, m)
require.Equal(t, 2018, m.Time().Year())
require.Equal(t, 13, m.Time().Hour())
require.Equal(t, 34, m.Time().Second())
//Convert Nanosecond to milisecond for compare
require.Equal(t, 555, m.Time().Nanosecond()/1000000)
}

View File

@@ -172,7 +172,10 @@ func TestRunParserAndGatherJSON(t *testing.T) {
n.acc = &acc
defer close(n.done)
n.parser, _ = parsers.NewJSONParser("nats_json_test", []string{}, nil)
n.parser, _ = parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "nats_json_test",
})
go n.receiver()
in <- mqttMsg(testMsgJSON)

View File

@@ -108,7 +108,10 @@ func TestRunParserAndGatherJSON(t *testing.T) {
n.acc = &acc
defer close(n.done)
n.parser, _ = parsers.NewJSONParser("nats_json_test", []string{}, nil)
n.parser, _ = parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "nats_json_test",
})
n.wg.Add(1)
go n.receiver()
in <- natsMsg(testMsgJSON)

View File

@@ -4,7 +4,7 @@ import (
"fmt"
"os"
"os/exec"
"path"
"path/filepath"
"strings"
"time"
@@ -28,36 +28,37 @@ func getQueueDirectory() (string, error) {
return strings.TrimSpace(string(qd)), nil
}
func qScan(path string) (int64, int64, int64, error) {
f, err := os.Open(path)
if err != nil {
return 0, 0, 0, err
}
finfos, err := f.Readdir(-1)
f.Close()
if err != nil {
return 0, 0, 0, err
}
func qScan(path string, acc telegraf.Accumulator) (int64, int64, int64, error) {
var length, size int64
var oldest time.Time
for _, finfo := range finfos {
err := filepath.Walk(path, func(_ string, finfo os.FileInfo, err error) error {
if err != nil {
acc.AddError(fmt.Errorf("error scanning %s: %s", path, err))
return nil
}
if finfo.IsDir() {
return nil
}
length++
size += finfo.Size()
ctime := statCTime(finfo.Sys())
if ctime.IsZero() {
continue
return nil
}
if oldest.IsZero() || ctime.Before(oldest) {
oldest = ctime
}
return nil
})
if err != nil {
return 0, 0, 0, err
}
var age int64
if !oldest.IsZero() {
age = int64(time.Now().Sub(oldest) / time.Second)
} else if len(finfos) != 0 {
} else if length != 0 {
// system doesn't support ctime
age = -1
}
@@ -77,8 +78,8 @@ func (p *Postfix) Gather(acc telegraf.Accumulator) error {
}
}
for _, q := range []string{"active", "hold", "incoming", "maildrop"} {
length, size, age, err := qScan(path.Join(p.QueueDirectory, q))
for _, q := range []string{"active", "hold", "incoming", "maildrop", "deferred"} {
length, size, age, err := qScan(filepath.Join(p.QueueDirectory, q), acc)
if err != nil {
acc.AddError(fmt.Errorf("error scanning queue %s: %s", q, err))
continue
@@ -90,30 +91,6 @@ func (p *Postfix) Gather(acc telegraf.Accumulator) error {
acc.AddFields("postfix_queue", fields, map[string]string{"queue": q})
}
var dLength, dSize int64
dAge := int64(-1)
for _, q := range []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "A", "B", "C", "D", "E", "F"} {
length, size, age, err := qScan(path.Join(p.QueueDirectory, "deferred", q))
if err != nil {
if os.IsNotExist(err) {
// the directories are created on first use
continue
}
acc.AddError(fmt.Errorf("error scanning queue deferred/%s: %s", q, err))
return nil
}
dLength += length
dSize += size
if age > dAge {
dAge = age
}
}
fields := map[string]interface{}{"length": dLength, "size": dSize}
if dAge != -1 {
fields["age"] = dAge
}
acc.AddFields("postfix_queue", fields, map[string]string{"queue": "deferred"})
return nil
}

View File

@@ -3,7 +3,7 @@ package postfix
import (
"io/ioutil"
"os"
"path"
"path/filepath"
"testing"
"github.com/influxdata/telegraf/testutil"
@@ -16,19 +16,16 @@ func TestGather(t *testing.T) {
require.NoError(t, err)
defer os.RemoveAll(td)
for _, q := range []string{"active", "hold", "incoming", "maildrop", "deferred"} {
require.NoError(t, os.Mkdir(path.Join(td, q), 0755))
}
for _, q := range []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "A", "B", "C", "D", "F"} { // "E" deliberately left off
require.NoError(t, os.Mkdir(path.Join(td, "deferred", q), 0755))
for _, q := range []string{"active", "hold", "incoming", "maildrop", "deferred/0/0", "deferred/F/F"} {
require.NoError(t, os.MkdirAll(filepath.FromSlash(td+"/"+q), 0755))
}
require.NoError(t, ioutil.WriteFile(path.Join(td, "active", "01"), []byte("abc"), 0644))
require.NoError(t, ioutil.WriteFile(path.Join(td, "active", "02"), []byte("defg"), 0644))
require.NoError(t, ioutil.WriteFile(path.Join(td, "hold", "01"), []byte("abc"), 0644))
require.NoError(t, ioutil.WriteFile(path.Join(td, "incoming", "01"), []byte("abcd"), 0644))
require.NoError(t, ioutil.WriteFile(path.Join(td, "deferred", "0", "01"), []byte("abc"), 0644))
require.NoError(t, ioutil.WriteFile(path.Join(td, "deferred", "F", "F1"), []byte("abc"), 0644))
require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/active/01"), []byte("abc"), 0644))
require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/active/02"), []byte("defg"), 0644))
require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/hold/01"), []byte("abc"), 0644))
require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/incoming/01"), []byte("abcd"), 0644))
require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/deferred/0/0/01"), []byte("abc"), 0644))
require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/deferred/F/F/F1"), []byte("abc"), 0644))
p := Postfix{
QueueDirectory: td,

View File

@@ -1,4 +1,4 @@
# tail Input Plugin
# Tail Input Plugin
The tail plugin "tails" a logfile and parses each log message.
@@ -49,3 +49,7 @@ The plugin expects messages in one of the
data_format = "influx"
```
### Metrics:
Metrics are produced according to the `data_format` option. Additionally a
tag labeled `path` is added to the metric containing the filename being tailed.

View File

@@ -146,7 +146,11 @@ func (t *Tail) receiver(tailer *tail.Tail) {
m, err = t.parser.ParseLine(text)
if err == nil {
t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
if m != nil {
tags := m.Tags()
tags["path"] = tailer.Filename
t.acc.AddFields(m.Name(), m.Fields(), tags, m.Time())
}
} else {
t.acc.AddError(fmt.Errorf("E! Malformed log line in %s: [%s], Error: %s\n",
tailer.Filename, line.Text, err))

View File

@@ -300,7 +300,10 @@ func TestRunParserJSONMsg(t *testing.T) {
listener.acc = &acc
defer close(listener.done)
listener.parser, _ = parsers.NewJSONParser("udp_json_test", []string{}, nil)
listener.parser, _ = parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "udp_json_test",
})
listener.wg.Add(1)
go listener.tcpParser()

View File

@@ -193,7 +193,10 @@ func TestRunParserJSONMsg(t *testing.T) {
listener.acc = &acc
defer close(listener.done)
listener.parser, _ = parsers.NewJSONParser("udp_json_test", []string{}, nil)
listener.parser, _ = parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "udp_json_test",
})
listener.wg.Add(1)
go listener.udpParser()

View File

@@ -20,6 +20,7 @@ var (
type JSONParser struct {
MetricName string
TagKeys []string
FieldKeys []string
DefaultTags map[string]string
}
@@ -86,6 +87,17 @@ func (p *JSONParser) switchFieldToTag(tags map[string]string, fields map[string]
}
}
//if field_keys is specified, only those values should be reported as fields
if len(p.FieldKeys) > 0 {
nFields := make(map[string]interface{})
for _, name := range p.FieldKeys {
if fields[name] != nil {
nFields[name] = fields[name]
}
}
return tags, nFields
}
//remove any additional string/bool values from fields
for k := range fields {
switch fields[k].(type) {

View File

@@ -1,6 +1,7 @@
package json
import (
"log"
"testing"
"github.com/stretchr/testify/assert"
@@ -448,22 +449,28 @@ func TestJSONParseNestedArray(t *testing.T) {
"total_devices": 5,
"total_threads": 10,
"shares": {
"total": 5,
"accepted": 5,
"rejected": 0,
"avg_find_time": 4,
"tester": "work",
"tester2": "don't want this",
"tester3": 7.93
"total": 5,
"accepted": 5,
"rejected": 0,
"avg_find_time": 4,
"tester": "work",
"tester2": "don't want this",
"tester3": {
"hello":"sup",
"fun":"money",
"break":9
}
}
}`
parser := JSONParser{
MetricName: "json_test",
TagKeys: []string{"total_devices", "total_threads", "shares_tester", "shares_tester3"},
TagKeys: []string{"total_devices", "total_threads", "shares_tester3_fun"},
FieldKeys: []string{"shares_tester", "shares_tester3_break"},
}
metrics, err := parser.Parse([]byte(testString))
log.Printf("m[0] name: %v, tags: %v, fields: %v", metrics[0].Name(), metrics[0].Tags(), metrics[0].Fields())
require.NoError(t, err)
require.Equal(t, len(parser.TagKeys), len(metrics[0].Tags()))
}

View File

@@ -56,6 +56,8 @@ type Config struct {
// TagKeys only apply to JSON data
TagKeys []string
// FieldKeys only apply to JSON
FieldKeys []string
// MetricName applies to JSON & value. This will be the name of the measurement.
MetricName string
@@ -95,8 +97,8 @@ func NewParser(config *Config) (Parser, error) {
var parser Parser
switch config.DataFormat {
case "json":
parser, err = NewJSONParser(config.MetricName,
config.TagKeys, config.DefaultTags)
parser, err = newJSONParser(config.MetricName,
config.TagKeys, config.FieldKeys, config.DefaultTags)
case "value":
parser, err = NewValueParser(config.MetricName,
config.DataType, config.DefaultTags)
@@ -126,6 +128,22 @@ func NewParser(config *Config) (Parser, error) {
return parser, err
}
func newJSONParser(
metricName string,
tagKeys []string,
fieldKeys []string,
defaultTags map[string]string,
) (Parser, error) {
parser := &json.JSONParser{
MetricName: metricName,
TagKeys: tagKeys,
FieldKeys: fieldKeys,
DefaultTags: defaultTags,
}
return parser, nil
}
//Deprecated: Use NewParser to get a JSONParser object
func NewJSONParser(
metricName string,
tagKeys []string,