Compare commits

...

15 Commits

Author SHA1 Message Date
Max U
690c0b6673 config file change 2018-07-02 16:18:40 -07:00
Max U
407675c741 additional config information 2018-07-02 15:00:05 -07:00
Max U
b09fcc70c8 update DATA_FORMATS_INPUT.md to reflect gjson 2018-07-02 14:56:31 -07:00
Max U
9836b1eb02 change error messages 2018-07-02 13:18:43 -07:00
Max U
a79f1b7e0d modify test cases 2018-07-02 11:25:02 -07:00
Max U
d4a4ac25bb add option to mark field as int 2018-07-02 11:20:16 -07:00
Max U
92e156c784 add gjson functionality with toml added to internal.config 2018-07-02 09:43:32 -07:00
Max U
342d3d633a initial jsonpath functionality 2018-06-28 14:33:38 -07:00
Daniel Nelson
23523ffd10 Document path tag in tail input 2018-06-21 18:02:34 -07:00
Daniel Nelson
523d761f34 Update changelog 2018-06-21 17:59:31 -07:00
JongHyok Lee
3f28add025 Added path tag to tail input plugin (#4292) 2018-06-21 17:55:54 -07:00
Daniel Nelson
ee6e4b0afd Run windows tests with -short 2018-06-21 17:46:58 -07:00
Patrick Hemmer
16454e25ba Fix postfix input handling of multi-level queues (#4333) 2018-06-21 16:01:38 -07:00
Daniel Nelson
2a1feb6db9 Update changelog 2018-06-21 14:20:35 -07:00
Ayrdrie
61e197d254 Add support for comma in logparser timestamp format (#4311) 2018-06-21 14:19:15 -07:00
14 changed files with 405 additions and 62 deletions

View File

@@ -20,6 +20,8 @@
- [#4259](https://github.com/influxdata/telegraf/pull/4259): Add container status tag to docker input.
- [#3523](https://github.com/influxdata/telegraf/pull/3523): Add valuecounter aggregator plugin.
- [#4307](https://github.com/influxdata/telegraf/pull/4307): Add new measurement with results of pgrep lookup to procstat input.
- [#4311](https://github.com/influxdata/telegraf/pull/4311): Add support for comma in logparser timestamp format.
- [#4292](https://github.com/influxdata/telegraf/pull/4292): Add path tag to tail input plugin.
## v1.7.1 [unreleased]
@@ -27,6 +29,7 @@
- [#4277](https://github.com/influxdata/telegraf/pull/4277): Treat sigterm as a clean shutdown signal.
- [#4284](https://github.com/influxdata/telegraf/pull/4284): Fix selection of tags under nested objects in the JSON parser.
- [#4135](https://github.com/influxdata/telegraf/issues/4135): Fix postfix input handling multi-level queues.
## v1.7 [2018-06-12]

View File

@@ -54,11 +54,11 @@ fmtcheck:
@echo '[INFO] done.'
test-windows:
go test ./plugins/inputs/ping/...
go test ./plugins/inputs/win_perf_counters/...
go test ./plugins/inputs/win_services/...
go test ./plugins/inputs/procstat/...
go test ./plugins/inputs/ntpq/...
go test -short ./plugins/inputs/ping/...
go test -short ./plugins/inputs/win_perf_counters/...
go test -short ./plugins/inputs/win_services/...
go test -short ./plugins/inputs/procstat/...
go test -short ./plugins/inputs/ntpq/...
# vet runs the Go source code static analysis tool `vet` to find
# any common errors.

View File

@@ -4,6 +4,7 @@ Telegraf is able to parse the following input data formats into metrics:
1. [InfluxDB Line Protocol](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#influx)
1. [JSON](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#json)
1. [GJSON](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#gjson)
1. [Graphite](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#graphite)
1. [Value](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#value), ie: 45 or "booyah"
1. [Nagios](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#nagios) (exec input only)
@@ -205,6 +206,69 @@ exec_mycollector,my_tag_1=foo,my_tag_2=baz a=5,b_c=6
exec_mycollector,my_tag_1=bar,my_tag_2=baz a=7,b_c=8
```
# GJSON:
GJSON also parses JSON data, but uses paths to name and identify fields of your choosing.
The GJSON parser supports 5 different configuration fields for json objects:
1.'gjson_tag_paths'
2.'gjson_string_paths'
3.'gjson_int_paths'
4.'gjson_float_paths'
5.'gjson_bool_paths'
Each field is a map type that will map a field_name to a field_path. Path syntax is described below.
Path maps should be configured as:
`toml gjson_tag_paths = {"field_name" = "field.path", "field_name2" = "field.path2"}`
Any paths specified in gjson_tag_paths will be converted to strings and stored as tags.
Any paths otherwise specified will be their marked type and stored as fields.
#### GJSON Configuration:
Paths are a series of keys seperated by a dot, ie "obj.sub_obj".
Paths should not lead to an JSON array, but a single object.
An error message will be thrown if a path describes an array.
Further reading for path syntax can be found here: https://github.com/tidwall/gjson
As an example, if you had the json:
```json
{
"name": {"first": "Tom", "last": "Anderson"},
"age":37,
"children": ["Sara","Alex","Jack"],
"fav.movie": "Deer Hunter",
"friends": [
{"first": "Dale", "last": "Murphy", "age": 44},
{"first": "Roger", "last": "Craig", "age": 68},
{"first": "Jane", "last": "Murphy", "age": 47}
]
}
```
with the config:
```toml
[[inputs.exec]]
## Commands array
commands = ["/usr/bin/mycollector --foo=bar"]
## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "gjson"
name_override = "gjson_sample"
gjson_tag_paths = {"first_name_tag" = "name.first"}
gjson_string_paths = {"last_name" = "name.last"}
gjson_int_paths = {"age" = "age", "Janes_age" = "friends.2.age"}
```
would output the metric:
`gjson_sample, first_name_tag=Tom last_name=Anderson,age=37,Janes_age=47`
# Value:
The "value" data format translates single values into Telegraf metrics. This

View File

@@ -1338,6 +1338,71 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
}
}
c.GJSONTagPaths = make(map[string]string)
if node, ok := tbl.Fields["gjson_tag_paths"]; ok {
if subtbl, ok := node.(*ast.Table); ok {
for name, val := range subtbl.Fields {
if kv, ok := val.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.GJSONTagPaths[name] = str.Value
}
}
}
}
}
c.GJSONBoolPaths = make(map[string]string)
if node, ok := tbl.Fields["gjson_bool_paths"]; ok {
if subtbl, ok := node.(*ast.Table); ok {
for name, val := range subtbl.Fields {
if kv, ok := val.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.GJSONBoolPaths[name] = str.Value
}
}
}
}
}
c.GJSONFloatPaths = make(map[string]string)
if node, ok := tbl.Fields["gjson_float_paths"]; ok {
if subtbl, ok := node.(*ast.Table); ok {
for name, val := range subtbl.Fields {
if kv, ok := val.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.GJSONFloatPaths[name] = str.Value
}
}
}
}
}
c.GJSONStringPaths = make(map[string]string)
if node, ok := tbl.Fields["gjson_string_paths"]; ok {
if subtbl, ok := node.(*ast.Table); ok {
for name, val := range subtbl.Fields {
if kv, ok := val.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.GJSONStringPaths[name] = str.Value
}
}
}
}
}
c.GJSONIntPaths = make(map[string]string)
if node, ok := tbl.Fields["gjson_int_paths"]; ok {
if subtbl, ok := node.(*ast.Table); ok {
for name, val := range subtbl.Fields {
if kv, ok := val.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.GJSONIntPaths[name] = str.Value
}
}
}
}
}
c.MetricName = name
delete(tbl.Fields, "data_format")
@@ -1353,6 +1418,11 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
delete(tbl.Fields, "dropwizard_time_format")
delete(tbl.Fields, "dropwizard_tags_path")
delete(tbl.Fields, "dropwizard_tag_paths")
delete(tbl.Fields, "gjson_tag_paths")
delete(tbl.Fields, "gjson_bool_paths")
delete(tbl.Fields, "gjson_float_paths")
delete(tbl.Fields, "gjson_string_paths")
delete(tbl.Fields, "gjson_int_paths")
return parsers.NewParser(c)
}

View File

@@ -108,7 +108,9 @@ You must capture at least one field per line.
- ts-"CUSTOM"
CUSTOM time layouts must be within quotes and be the representation of the
"reference time", which is `Mon Jan 2 15:04:05 -0700 MST 2006`
"reference time", which is `Mon Jan 2 15:04:05 -0700 MST 2006`.
To match a comma decimal point you can use a period. For example `%{TIMESTAMP:timestamp:ts-"2006-01-02 15:04:05.000"}` can be used to match `"2018-01-02 15:04:05,000"`
To match a comma decimal point you can use a period in the pattern string.
See https://golang.org/pkg/time/#Parse for more details.
Telegraf has many of its own [built-in patterns](./grok/patterns/influx-patterns),

View File

@@ -335,6 +335,9 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
case DROP:
// goodbye!
default:
// Replace commas with dot character
v = strings.Replace(v, ",", ".", -1)
ts, err := time.ParseInLocation(t, v, p.loc)
if err == nil {
timestamp = ts

View File

@@ -982,3 +982,21 @@ func TestSyslogTimestampParser(t *testing.T) {
require.NotNil(t, m)
require.Equal(t, 2018, m.Time().Year())
}
func TestReplaceTimestampComma(t *testing.T) {
p := &Parser{
Patterns: []string{`%{TIMESTAMP_ISO8601:timestamp:ts-"2006-01-02 15:04:05.000"} successfulMatches=%{NUMBER:value:int}`},
}
require.NoError(t, p.Compile())
m, err := p.ParseLine("2018-02-21 13:10:34,555 successfulMatches=1")
require.NoError(t, err)
require.NotNil(t, m)
require.Equal(t, 2018, m.Time().Year())
require.Equal(t, 13, m.Time().Hour())
require.Equal(t, 34, m.Time().Second())
//Convert Nanosecond to milisecond for compare
require.Equal(t, 555, m.Time().Nanosecond()/1000000)
}

View File

@@ -4,7 +4,7 @@ import (
"fmt"
"os"
"os/exec"
"path"
"path/filepath"
"strings"
"time"
@@ -28,36 +28,37 @@ func getQueueDirectory() (string, error) {
return strings.TrimSpace(string(qd)), nil
}
func qScan(path string) (int64, int64, int64, error) {
f, err := os.Open(path)
if err != nil {
return 0, 0, 0, err
}
finfos, err := f.Readdir(-1)
f.Close()
if err != nil {
return 0, 0, 0, err
}
func qScan(path string, acc telegraf.Accumulator) (int64, int64, int64, error) {
var length, size int64
var oldest time.Time
for _, finfo := range finfos {
err := filepath.Walk(path, func(_ string, finfo os.FileInfo, err error) error {
if err != nil {
acc.AddError(fmt.Errorf("error scanning %s: %s", path, err))
return nil
}
if finfo.IsDir() {
return nil
}
length++
size += finfo.Size()
ctime := statCTime(finfo.Sys())
if ctime.IsZero() {
continue
return nil
}
if oldest.IsZero() || ctime.Before(oldest) {
oldest = ctime
}
return nil
})
if err != nil {
return 0, 0, 0, err
}
var age int64
if !oldest.IsZero() {
age = int64(time.Now().Sub(oldest) / time.Second)
} else if len(finfos) != 0 {
} else if length != 0 {
// system doesn't support ctime
age = -1
}
@@ -77,8 +78,8 @@ func (p *Postfix) Gather(acc telegraf.Accumulator) error {
}
}
for _, q := range []string{"active", "hold", "incoming", "maildrop"} {
length, size, age, err := qScan(path.Join(p.QueueDirectory, q))
for _, q := range []string{"active", "hold", "incoming", "maildrop", "deferred"} {
length, size, age, err := qScan(filepath.Join(p.QueueDirectory, q), acc)
if err != nil {
acc.AddError(fmt.Errorf("error scanning queue %s: %s", q, err))
continue
@@ -90,30 +91,6 @@ func (p *Postfix) Gather(acc telegraf.Accumulator) error {
acc.AddFields("postfix_queue", fields, map[string]string{"queue": q})
}
var dLength, dSize int64
dAge := int64(-1)
for _, q := range []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "A", "B", "C", "D", "E", "F"} {
length, size, age, err := qScan(path.Join(p.QueueDirectory, "deferred", q))
if err != nil {
if os.IsNotExist(err) {
// the directories are created on first use
continue
}
acc.AddError(fmt.Errorf("error scanning queue deferred/%s: %s", q, err))
return nil
}
dLength += length
dSize += size
if age > dAge {
dAge = age
}
}
fields := map[string]interface{}{"length": dLength, "size": dSize}
if dAge != -1 {
fields["age"] = dAge
}
acc.AddFields("postfix_queue", fields, map[string]string{"queue": "deferred"})
return nil
}

View File

@@ -3,7 +3,7 @@ package postfix
import (
"io/ioutil"
"os"
"path"
"path/filepath"
"testing"
"github.com/influxdata/telegraf/testutil"
@@ -16,19 +16,16 @@ func TestGather(t *testing.T) {
require.NoError(t, err)
defer os.RemoveAll(td)
for _, q := range []string{"active", "hold", "incoming", "maildrop", "deferred"} {
require.NoError(t, os.Mkdir(path.Join(td, q), 0755))
}
for _, q := range []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "A", "B", "C", "D", "F"} { // "E" deliberately left off
require.NoError(t, os.Mkdir(path.Join(td, "deferred", q), 0755))
for _, q := range []string{"active", "hold", "incoming", "maildrop", "deferred/0/0", "deferred/F/F"} {
require.NoError(t, os.MkdirAll(filepath.FromSlash(td+"/"+q), 0755))
}
require.NoError(t, ioutil.WriteFile(path.Join(td, "active", "01"), []byte("abc"), 0644))
require.NoError(t, ioutil.WriteFile(path.Join(td, "active", "02"), []byte("defg"), 0644))
require.NoError(t, ioutil.WriteFile(path.Join(td, "hold", "01"), []byte("abc"), 0644))
require.NoError(t, ioutil.WriteFile(path.Join(td, "incoming", "01"), []byte("abcd"), 0644))
require.NoError(t, ioutil.WriteFile(path.Join(td, "deferred", "0", "01"), []byte("abc"), 0644))
require.NoError(t, ioutil.WriteFile(path.Join(td, "deferred", "F", "F1"), []byte("abc"), 0644))
require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/active/01"), []byte("abc"), 0644))
require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/active/02"), []byte("defg"), 0644))
require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/hold/01"), []byte("abc"), 0644))
require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/incoming/01"), []byte("abcd"), 0644))
require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/deferred/0/0/01"), []byte("abc"), 0644))
require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/deferred/F/F/F1"), []byte("abc"), 0644))
p := Postfix{
QueueDirectory: td,

View File

@@ -1,4 +1,4 @@
# tail Input Plugin
# Tail Input Plugin
The tail plugin "tails" a logfile and parses each log message.
@@ -49,3 +49,7 @@ The plugin expects messages in one of the
data_format = "influx"
```
### Metrics:
Metrics are produced according to the `data_format` option. Additionally a
tag labeled `path` is added to the metric containing the filename being tailed.

View File

@@ -146,7 +146,11 @@ func (t *Tail) receiver(tailer *tail.Tail) {
m, err = t.parser.ParseLine(text)
if err == nil {
t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
if m != nil {
tags := m.Tags()
tags["path"] = tailer.Filename
t.acc.AddFields(m.Name(), m.Fields(), tags, m.Time())
}
} else {
t.acc.AddError(fmt.Errorf("E! Malformed log line in %s: [%s], Error: %s\n",
tailer.Filename, line.Text, err))

View File

@@ -0,0 +1,96 @@
package gjson
import (
"log"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/tidwall/gjson"
)
type JSONPath struct {
MetricName string
TagPath map[string]string
FloatPath map[string]string
IntPath map[string]string
StrPath map[string]string
BoolPath map[string]string
DefaultTags map[string]string
}
func (j *JSONPath) Parse(buf []byte) ([]telegraf.Metric, error) {
tags := make(map[string]string)
for k, v := range j.DefaultTags {
tags[k] = v
}
fields := make(map[string]interface{})
metrics := make([]telegraf.Metric, 0)
for k, v := range j.TagPath {
c := gjson.GetBytes(buf, v)
if c.IsArray() {
log.Printf("E! GJSON cannot assign array to field on path: %v", v)
continue
}
tags[k] = c.String()
}
for k, v := range j.FloatPath {
c := gjson.GetBytes(buf, v)
if c.IsArray() {
log.Printf("E! GJSON cannot assign array to field on path: %v", v)
continue
}
fields[k] = c.Float()
}
for k, v := range j.IntPath {
c := gjson.GetBytes(buf, v)
if c.IsArray() {
log.Printf("E! GJSON cannot assign array to field on path: %v", v)
continue
}
fields[k] = c.Int()
}
for k, v := range j.BoolPath {
c := gjson.GetBytes(buf, v)
if c.IsArray() {
log.Printf("E! GJSON cannot assign array to field on path: %v", v)
continue
}
if c.String() == "true" {
fields[k] = true
} else if c.String() == "false" {
fields[k] = false
} else {
log.Printf("E! Cannot decode: %v as bool", c.String())
}
}
for k, v := range j.StrPath {
c := gjson.GetBytes(buf, v)
if c.IsArray() {
log.Printf("E! GJSON cannot assign array to field on path: %v", v)
continue
}
fields[k] = c.String()
}
m, err := metric.New(j.MetricName, tags, fields, time.Now())
if err != nil {
return nil, err
}
metrics = append(metrics, m)
return metrics, nil
}
func (j *JSONPath) ParseLine(str string) (telegraf.Metric, error) {
m, err := j.Parse([]byte(str))
return m[0], err
}
func (j *JSONPath) SetDefaultTags(tags map[string]string) {
j.DefaultTags = tags
}

View File

@@ -0,0 +1,72 @@
package gjson
import (
"log"
"reflect"
"testing"
"github.com/stretchr/testify/assert"
)
func TestParseJsonPath(t *testing.T) {
testString := `{
"total_devices": 5,
"total_threads": 10,
"shares": {
"total": 5,
"accepted": 5,
"rejected": 0,
"avg_find_time": 4,
"tester": "work",
"tester2": true,
"tester3": {
"hello":"sup",
"fun":"money",
"break":9
}
}
}`
jsonParser := JSONPath{
MetricName: "jsonpather",
TagPath: map[string]string{"hello": "shares.tester3.hello"},
BoolPath: map[string]string{"bool": "shares.tester2"},
}
metrics, err := jsonParser.Parse([]byte(testString))
assert.NoError(t, err)
log.Printf("m[0] name: %v, tags: %v, fields: %v", metrics[0].Name(), metrics[0].Tags(), metrics[0].Fields())
}
func TestTagTypes(t *testing.T) {
testString := `{
"total_devices": 5,
"total_threads": 10,
"shares": {
"total": 5,
"accepted": 5,
"rejected": 0,
"my_bool": true,
"tester": "work",
"tester2": {
"hello":"sup",
"fun":true,
"break":9.97
}
}
}`
r := JSONPath{
TagPath: map[string]string{"int1": "total_devices", "my_bool": "shares.my_bool"},
FloatPath: map[string]string{"total": "shares.total"},
BoolPath: map[string]string{"fun": "shares.tester2.fun"},
StrPath: map[string]string{"hello": "shares.tester2.hello"},
IntPath: map[string]string{"accepted": "shares.accepted"},
}
metrics, err := r.Parse([]byte(testString))
log.Printf("m[0] name: %v, tags: %v, fields: %v", metrics[0].Name(), metrics[0].Tags(), metrics[0].Fields())
assert.NoError(t, err)
assert.Equal(t, true, reflect.DeepEqual(map[string]interface{}{"total": 5.0, "fun": true, "hello": "sup", "accepted": int64(5)}, metrics[0].Fields()))
}

View File

@@ -7,6 +7,7 @@ import (
"github.com/influxdata/telegraf/plugins/parsers/collectd"
"github.com/influxdata/telegraf/plugins/parsers/dropwizard"
"github.com/influxdata/telegraf/plugins/parsers/gjson"
"github.com/influxdata/telegraf/plugins/parsers/graphite"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/parsers/json"
@@ -87,6 +88,13 @@ type Config struct {
// an optional map containing tag names as keys and json paths to retrieve the tag values from as values
// used if TagsPath is empty or doesn't return any tags
DropwizardTagPathsMap map[string]string
//for gjson format
GJSONTagPaths map[string]string
GJSONBoolPaths map[string]string
GJSONFloatPaths map[string]string
GJSONStringPaths map[string]string
GJSONIntPaths map[string]string
}
// NewParser returns a Parser interface based on the given config.
@@ -120,12 +128,37 @@ func NewParser(config *Config) (Parser, error) {
config.DefaultTags,
config.Separator,
config.Templates)
case "gjson":
parser, err = newGJSONParser(config.MetricName,
config.GJSONTagPaths,
config.GJSONStringPaths,
config.GJSONBoolPaths,
config.GJSONFloatPaths,
config.GJSONIntPaths)
default:
err = fmt.Errorf("Invalid data format: %s", config.DataFormat)
}
return parser, err
}
func newGJSONParser(metricName string,
tagPaths map[string]string,
strPaths map[string]string,
boolPaths map[string]string,
floatPaths map[string]string,
intPaths map[string]string) (Parser, error) {
parser := &gjson.JSONPath{
MetricName: metricName,
TagPath: tagPaths,
StrPath: strPaths,
BoolPath: boolPaths,
FloatPath: floatPaths,
IntPath: intPaths,
}
return parser, nil
}
func NewJSONParser(
metricName string,
tagKeys []string,