Compare commits
15 Commits
procstat-r
...
feature/13
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
91964888c9 | ||
|
|
e25fbed0de | ||
|
|
674b9578d7 | ||
|
|
885193bc8f | ||
|
|
4db667573a | ||
|
|
7f2b2a08ae | ||
|
|
c50bfc2c71 | ||
|
|
420dafd591 | ||
|
|
23523ffd10 | ||
|
|
523d761f34 | ||
|
|
3f28add025 | ||
|
|
ee6e4b0afd | ||
|
|
16454e25ba | ||
|
|
2a1feb6db9 | ||
|
|
61e197d254 |
@@ -20,6 +20,8 @@
|
|||||||
- [#4259](https://github.com/influxdata/telegraf/pull/4259): Add container status tag to docker input.
|
- [#4259](https://github.com/influxdata/telegraf/pull/4259): Add container status tag to docker input.
|
||||||
- [#3523](https://github.com/influxdata/telegraf/pull/3523): Add valuecounter aggregator plugin.
|
- [#3523](https://github.com/influxdata/telegraf/pull/3523): Add valuecounter aggregator plugin.
|
||||||
- [#4307](https://github.com/influxdata/telegraf/pull/4307): Add new measurement with results of pgrep lookup to procstat input.
|
- [#4307](https://github.com/influxdata/telegraf/pull/4307): Add new measurement with results of pgrep lookup to procstat input.
|
||||||
|
- [#4311](https://github.com/influxdata/telegraf/pull/4311): Add support for comma in logparser timestamp format.
|
||||||
|
- [#4292](https://github.com/influxdata/telegraf/pull/4292): Add path tag to tail input plugin.
|
||||||
|
|
||||||
## v1.7.1 [unreleased]
|
## v1.7.1 [unreleased]
|
||||||
|
|
||||||
@@ -27,6 +29,7 @@
|
|||||||
|
|
||||||
- [#4277](https://github.com/influxdata/telegraf/pull/4277): Treat sigterm as a clean shutdown signal.
|
- [#4277](https://github.com/influxdata/telegraf/pull/4277): Treat sigterm as a clean shutdown signal.
|
||||||
- [#4284](https://github.com/influxdata/telegraf/pull/4284): Fix selection of tags under nested objects in the JSON parser.
|
- [#4284](https://github.com/influxdata/telegraf/pull/4284): Fix selection of tags under nested objects in the JSON parser.
|
||||||
|
- [#4135](https://github.com/influxdata/telegraf/issues/4135): Fix postfix input handling multi-level queues.
|
||||||
|
|
||||||
## v1.7 [2018-06-12]
|
## v1.7 [2018-06-12]
|
||||||
|
|
||||||
|
|||||||
10
Makefile
10
Makefile
@@ -54,11 +54,11 @@ fmtcheck:
|
|||||||
@echo '[INFO] done.'
|
@echo '[INFO] done.'
|
||||||
|
|
||||||
test-windows:
|
test-windows:
|
||||||
go test ./plugins/inputs/ping/...
|
go test -short ./plugins/inputs/ping/...
|
||||||
go test ./plugins/inputs/win_perf_counters/...
|
go test -short ./plugins/inputs/win_perf_counters/...
|
||||||
go test ./plugins/inputs/win_services/...
|
go test -short ./plugins/inputs/win_services/...
|
||||||
go test ./plugins/inputs/procstat/...
|
go test -short ./plugins/inputs/procstat/...
|
||||||
go test ./plugins/inputs/ntpq/...
|
go test -short ./plugins/inputs/ntpq/...
|
||||||
|
|
||||||
# vet runs the Go source code static analysis tool `vet` to find
|
# vet runs the Go source code static analysis tool `vet` to find
|
||||||
# any common errors.
|
# any common errors.
|
||||||
|
|||||||
@@ -104,9 +104,10 @@ but can be overridden using the `name_override` config option.
|
|||||||
|
|
||||||
#### JSON Configuration:
|
#### JSON Configuration:
|
||||||
|
|
||||||
The JSON data format supports specifying "tag keys". If specified, keys
|
The JSON data format supports specifying "tag keys" and "field keys". If specified, keys
|
||||||
will be searched for in the root-level of the JSON blob. If the key(s) exist,
|
will be searched for in the root-level and any nested lists of the JSON blob. If the key(s) exist,
|
||||||
they will be applied as tags to the Telegraf metrics.
|
they will be applied as tags or fields to the Telegraf metrics. If "field_keys" is not specified,
|
||||||
|
all int and float values will be set as fields by default.
|
||||||
|
|
||||||
For example, if you had this configuration:
|
For example, if you had this configuration:
|
||||||
|
|
||||||
@@ -173,6 +174,7 @@ For example, if the following configuration:
|
|||||||
"my_tag_1",
|
"my_tag_1",
|
||||||
"my_tag_2"
|
"my_tag_2"
|
||||||
]
|
]
|
||||||
|
field_keys = ["b_c"]
|
||||||
```
|
```
|
||||||
|
|
||||||
with this JSON output from a command:
|
with this JSON output from a command:
|
||||||
@@ -198,11 +200,11 @@ with this JSON output from a command:
|
|||||||
]
|
]
|
||||||
```
|
```
|
||||||
|
|
||||||
Your Telegraf metrics would get tagged with "my_tag_1" and "my_tag_2"
|
Your Telegraf metrics would get tagged with "my_tag_1" and "my_tag_2" and fielded with "b_c"
|
||||||
|
|
||||||
```
|
```
|
||||||
exec_mycollector,my_tag_1=foo,my_tag_2=baz a=5,b_c=6
|
exec_mycollector,my_tag_1=foo,my_tag_2=baz b_c=6
|
||||||
exec_mycollector,my_tag_1=bar,my_tag_2=baz a=7,b_c=8
|
exec_mycollector,my_tag_1=bar,my_tag_2=baz b_c=8
|
||||||
```
|
```
|
||||||
|
|
||||||
# Value:
|
# Value:
|
||||||
|
|||||||
@@ -1261,6 +1261,18 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if node, ok := tbl.Fields["field_keys"]; ok {
|
||||||
|
if kv, ok := node.(*ast.KeyValue); ok {
|
||||||
|
if ary, ok := kv.Value.(*ast.Array); ok {
|
||||||
|
for _, elem := range ary.Value {
|
||||||
|
if str, ok := elem.(*ast.String); ok {
|
||||||
|
c.FieldKeys = append(c.FieldKeys, str.Value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if node, ok := tbl.Fields["data_type"]; ok {
|
if node, ok := tbl.Fields["data_type"]; ok {
|
||||||
if kv, ok := node.(*ast.KeyValue); ok {
|
if kv, ok := node.(*ast.KeyValue); ok {
|
||||||
if str, ok := kv.Value.(*ast.String); ok {
|
if str, ok := kv.Value.(*ast.String); ok {
|
||||||
@@ -1344,6 +1356,7 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
|
|||||||
delete(tbl.Fields, "separator")
|
delete(tbl.Fields, "separator")
|
||||||
delete(tbl.Fields, "templates")
|
delete(tbl.Fields, "templates")
|
||||||
delete(tbl.Fields, "tag_keys")
|
delete(tbl.Fields, "tag_keys")
|
||||||
|
delete(tbl.Fields, "field_keys")
|
||||||
delete(tbl.Fields, "data_type")
|
delete(tbl.Fields, "data_type")
|
||||||
delete(tbl.Fields, "collectd_auth_file")
|
delete(tbl.Fields, "collectd_auth_file")
|
||||||
delete(tbl.Fields, "collectd_security_level")
|
delete(tbl.Fields, "collectd_security_level")
|
||||||
|
|||||||
@@ -143,7 +143,10 @@ func TestConfig_LoadDirectory(t *testing.T) {
|
|||||||
"Testdata did not produce correct memcached metadata.")
|
"Testdata did not produce correct memcached metadata.")
|
||||||
|
|
||||||
ex := inputs.Inputs["exec"]().(*exec.Exec)
|
ex := inputs.Inputs["exec"]().(*exec.Exec)
|
||||||
p, err := parsers.NewJSONParser("exec", nil, nil)
|
p, err := parsers.NewParser(&parsers.Config{
|
||||||
|
MetricName: "exec",
|
||||||
|
DataFormat: "json",
|
||||||
|
})
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
ex.SetParser(p)
|
ex.SetParser(p)
|
||||||
ex.Command = "/usr/bin/myothercollector --foo=bar"
|
ex.Command = "/usr/bin/myothercollector --foo=bar"
|
||||||
|
|||||||
@@ -93,7 +93,10 @@ func (r runnerMock) Run(e *Exec, command string, acc telegraf.Accumulator) ([]by
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestExec(t *testing.T) {
|
func TestExec(t *testing.T) {
|
||||||
parser, _ := parsers.NewJSONParser("exec", []string{}, nil)
|
parser, _ := parsers.NewParser(&parsers.Config{
|
||||||
|
DataFormat: "json",
|
||||||
|
MetricName: "exec",
|
||||||
|
})
|
||||||
e := &Exec{
|
e := &Exec{
|
||||||
runner: newRunnerMock([]byte(validJson), nil),
|
runner: newRunnerMock([]byte(validJson), nil),
|
||||||
Commands: []string{"testcommand arg1"},
|
Commands: []string{"testcommand arg1"},
|
||||||
@@ -119,7 +122,10 @@ func TestExec(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestExecMalformed(t *testing.T) {
|
func TestExecMalformed(t *testing.T) {
|
||||||
parser, _ := parsers.NewJSONParser("exec", []string{}, nil)
|
parser, _ := parsers.NewParser(&parsers.Config{
|
||||||
|
DataFormat: "json",
|
||||||
|
MetricName: "exec",
|
||||||
|
})
|
||||||
e := &Exec{
|
e := &Exec{
|
||||||
runner: newRunnerMock([]byte(malformedJson), nil),
|
runner: newRunnerMock([]byte(malformedJson), nil),
|
||||||
Commands: []string{"badcommand arg1"},
|
Commands: []string{"badcommand arg1"},
|
||||||
@@ -132,7 +138,10 @@ func TestExecMalformed(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestCommandError(t *testing.T) {
|
func TestCommandError(t *testing.T) {
|
||||||
parser, _ := parsers.NewJSONParser("exec", []string{}, nil)
|
parser, _ := parsers.NewParser(&parsers.Config{
|
||||||
|
DataFormat: "json",
|
||||||
|
MetricName: "exec",
|
||||||
|
})
|
||||||
e := &Exec{
|
e := &Exec{
|
||||||
runner: newRunnerMock(nil, fmt.Errorf("exit status code 1")),
|
runner: newRunnerMock(nil, fmt.Errorf("exit status code 1")),
|
||||||
Commands: []string{"badcommand"},
|
Commands: []string{"badcommand"},
|
||||||
|
|||||||
@@ -26,7 +26,11 @@ func TestHTTPwithJSONFormat(t *testing.T) {
|
|||||||
URLs: []string{url},
|
URLs: []string{url},
|
||||||
}
|
}
|
||||||
metricName := "metricName"
|
metricName := "metricName"
|
||||||
p, _ := parsers.NewJSONParser(metricName, nil, nil)
|
|
||||||
|
p, _ := parsers.NewParser(&parsers.Config{
|
||||||
|
DataFormat: "json",
|
||||||
|
MetricName: "metricName",
|
||||||
|
})
|
||||||
plugin.SetParser(p)
|
plugin.SetParser(p)
|
||||||
|
|
||||||
var acc testutil.Accumulator
|
var acc testutil.Accumulator
|
||||||
@@ -63,8 +67,11 @@ func TestHTTPHeaders(t *testing.T) {
|
|||||||
URLs: []string{url},
|
URLs: []string{url},
|
||||||
Headers: map[string]string{header: headerValue},
|
Headers: map[string]string{header: headerValue},
|
||||||
}
|
}
|
||||||
metricName := "metricName"
|
|
||||||
p, _ := parsers.NewJSONParser(metricName, nil, nil)
|
p, _ := parsers.NewParser(&parsers.Config{
|
||||||
|
DataFormat: "json",
|
||||||
|
MetricName: "metricName",
|
||||||
|
})
|
||||||
plugin.SetParser(p)
|
plugin.SetParser(p)
|
||||||
|
|
||||||
var acc testutil.Accumulator
|
var acc testutil.Accumulator
|
||||||
@@ -83,7 +90,10 @@ func TestInvalidStatusCode(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
metricName := "metricName"
|
metricName := "metricName"
|
||||||
p, _ := parsers.NewJSONParser(metricName, nil, nil)
|
p, _ := parsers.NewParser(&parsers.Config{
|
||||||
|
DataFormat: "json",
|
||||||
|
MetricName: metricName,
|
||||||
|
})
|
||||||
plugin.SetParser(p)
|
plugin.SetParser(p)
|
||||||
|
|
||||||
var acc testutil.Accumulator
|
var acc testutil.Accumulator
|
||||||
@@ -105,8 +115,10 @@ func TestMethod(t *testing.T) {
|
|||||||
Method: "POST",
|
Method: "POST",
|
||||||
}
|
}
|
||||||
|
|
||||||
metricName := "metricName"
|
p, _ := parsers.NewParser(&parsers.Config{
|
||||||
p, _ := parsers.NewJSONParser(metricName, nil, nil)
|
DataFormat: "json",
|
||||||
|
MetricName: "metricName",
|
||||||
|
})
|
||||||
plugin.SetParser(p)
|
plugin.SetParser(p)
|
||||||
|
|
||||||
var acc testutil.Accumulator
|
var acc testutil.Accumulator
|
||||||
|
|||||||
@@ -181,7 +181,12 @@ func (h *HttpJson) gatherServer(
|
|||||||
"server": serverURL,
|
"server": serverURL,
|
||||||
}
|
}
|
||||||
|
|
||||||
parser, err := parsers.NewJSONParser(msrmnt_name, h.TagKeys, tags)
|
parser, err := parsers.NewParser(&parsers.Config{
|
||||||
|
DataFormat: "json",
|
||||||
|
MetricName: msrmnt_name,
|
||||||
|
TagKeys: h.TagKeys,
|
||||||
|
DefaultTags: tags,
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -125,7 +125,10 @@ func TestRunParserAndGatherJSON(t *testing.T) {
|
|||||||
k.acc = &acc
|
k.acc = &acc
|
||||||
defer close(k.done)
|
defer close(k.done)
|
||||||
|
|
||||||
k.parser, _ = parsers.NewJSONParser("kafka_json_test", []string{}, nil)
|
k.parser, _ = parsers.NewParser(&parsers.Config{
|
||||||
|
DataFormat: "json",
|
||||||
|
MetricName: "kafka_json_test",
|
||||||
|
})
|
||||||
go k.receiver()
|
go k.receiver()
|
||||||
in <- saramaMsg(testMsgJSON)
|
in <- saramaMsg(testMsgJSON)
|
||||||
acc.Wait(1)
|
acc.Wait(1)
|
||||||
|
|||||||
@@ -125,7 +125,10 @@ func TestRunParserAndGatherJSON(t *testing.T) {
|
|||||||
k.acc = &acc
|
k.acc = &acc
|
||||||
defer close(k.done)
|
defer close(k.done)
|
||||||
|
|
||||||
k.parser, _ = parsers.NewJSONParser("kafka_json_test", []string{}, nil)
|
k.parser, _ = parsers.NewParser(&parsers.Config{
|
||||||
|
DataFormat: "json",
|
||||||
|
MetricName: "kafka_json_test",
|
||||||
|
})
|
||||||
go k.receiver()
|
go k.receiver()
|
||||||
in <- saramaMsg(testMsgJSON)
|
in <- saramaMsg(testMsgJSON)
|
||||||
acc.Wait(1)
|
acc.Wait(1)
|
||||||
|
|||||||
@@ -108,7 +108,9 @@ You must capture at least one field per line.
|
|||||||
- ts-"CUSTOM"
|
- ts-"CUSTOM"
|
||||||
|
|
||||||
CUSTOM time layouts must be within quotes and be the representation of the
|
CUSTOM time layouts must be within quotes and be the representation of the
|
||||||
"reference time", which is `Mon Jan 2 15:04:05 -0700 MST 2006`
|
"reference time", which is `Mon Jan 2 15:04:05 -0700 MST 2006`.
|
||||||
|
To match a comma decimal point you can use a period. For example `%{TIMESTAMP:timestamp:ts-"2006-01-02 15:04:05.000"}` can be used to match `"2018-01-02 15:04:05,000"`
|
||||||
|
To match a comma decimal point you can use a period in the pattern string.
|
||||||
See https://golang.org/pkg/time/#Parse for more details.
|
See https://golang.org/pkg/time/#Parse for more details.
|
||||||
|
|
||||||
Telegraf has many of its own [built-in patterns](./grok/patterns/influx-patterns),
|
Telegraf has many of its own [built-in patterns](./grok/patterns/influx-patterns),
|
||||||
|
|||||||
@@ -335,6 +335,9 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
|
|||||||
case DROP:
|
case DROP:
|
||||||
// goodbye!
|
// goodbye!
|
||||||
default:
|
default:
|
||||||
|
// Replace commas with dot character
|
||||||
|
v = strings.Replace(v, ",", ".", -1)
|
||||||
|
|
||||||
ts, err := time.ParseInLocation(t, v, p.loc)
|
ts, err := time.ParseInLocation(t, v, p.loc)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
timestamp = ts
|
timestamp = ts
|
||||||
|
|||||||
@@ -982,3 +982,21 @@ func TestSyslogTimestampParser(t *testing.T) {
|
|||||||
require.NotNil(t, m)
|
require.NotNil(t, m)
|
||||||
require.Equal(t, 2018, m.Time().Year())
|
require.Equal(t, 2018, m.Time().Year())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestReplaceTimestampComma(t *testing.T) {
|
||||||
|
|
||||||
|
p := &Parser{
|
||||||
|
Patterns: []string{`%{TIMESTAMP_ISO8601:timestamp:ts-"2006-01-02 15:04:05.000"} successfulMatches=%{NUMBER:value:int}`},
|
||||||
|
}
|
||||||
|
|
||||||
|
require.NoError(t, p.Compile())
|
||||||
|
m, err := p.ParseLine("2018-02-21 13:10:34,555 successfulMatches=1")
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotNil(t, m)
|
||||||
|
|
||||||
|
require.Equal(t, 2018, m.Time().Year())
|
||||||
|
require.Equal(t, 13, m.Time().Hour())
|
||||||
|
require.Equal(t, 34, m.Time().Second())
|
||||||
|
//Convert Nanosecond to milisecond for compare
|
||||||
|
require.Equal(t, 555, m.Time().Nanosecond()/1000000)
|
||||||
|
}
|
||||||
|
|||||||
@@ -172,7 +172,10 @@ func TestRunParserAndGatherJSON(t *testing.T) {
|
|||||||
n.acc = &acc
|
n.acc = &acc
|
||||||
defer close(n.done)
|
defer close(n.done)
|
||||||
|
|
||||||
n.parser, _ = parsers.NewJSONParser("nats_json_test", []string{}, nil)
|
n.parser, _ = parsers.NewParser(&parsers.Config{
|
||||||
|
DataFormat: "json",
|
||||||
|
MetricName: "nats_json_test",
|
||||||
|
})
|
||||||
go n.receiver()
|
go n.receiver()
|
||||||
in <- mqttMsg(testMsgJSON)
|
in <- mqttMsg(testMsgJSON)
|
||||||
|
|
||||||
|
|||||||
@@ -108,7 +108,10 @@ func TestRunParserAndGatherJSON(t *testing.T) {
|
|||||||
n.acc = &acc
|
n.acc = &acc
|
||||||
defer close(n.done)
|
defer close(n.done)
|
||||||
|
|
||||||
n.parser, _ = parsers.NewJSONParser("nats_json_test", []string{}, nil)
|
n.parser, _ = parsers.NewParser(&parsers.Config{
|
||||||
|
DataFormat: "json",
|
||||||
|
MetricName: "nats_json_test",
|
||||||
|
})
|
||||||
n.wg.Add(1)
|
n.wg.Add(1)
|
||||||
go n.receiver()
|
go n.receiver()
|
||||||
in <- natsMsg(testMsgJSON)
|
in <- natsMsg(testMsgJSON)
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"path"
|
"path/filepath"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -28,36 +28,37 @@ func getQueueDirectory() (string, error) {
|
|||||||
return strings.TrimSpace(string(qd)), nil
|
return strings.TrimSpace(string(qd)), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func qScan(path string) (int64, int64, int64, error) {
|
func qScan(path string, acc telegraf.Accumulator) (int64, int64, int64, error) {
|
||||||
f, err := os.Open(path)
|
|
||||||
if err != nil {
|
|
||||||
return 0, 0, 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
finfos, err := f.Readdir(-1)
|
|
||||||
f.Close()
|
|
||||||
if err != nil {
|
|
||||||
return 0, 0, 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
var length, size int64
|
var length, size int64
|
||||||
var oldest time.Time
|
var oldest time.Time
|
||||||
for _, finfo := range finfos {
|
err := filepath.Walk(path, func(_ string, finfo os.FileInfo, err error) error {
|
||||||
|
if err != nil {
|
||||||
|
acc.AddError(fmt.Errorf("error scanning %s: %s", path, err))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if finfo.IsDir() {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
length++
|
length++
|
||||||
size += finfo.Size()
|
size += finfo.Size()
|
||||||
|
|
||||||
ctime := statCTime(finfo.Sys())
|
ctime := statCTime(finfo.Sys())
|
||||||
if ctime.IsZero() {
|
if ctime.IsZero() {
|
||||||
continue
|
return nil
|
||||||
}
|
}
|
||||||
if oldest.IsZero() || ctime.Before(oldest) {
|
if oldest.IsZero() || ctime.Before(oldest) {
|
||||||
oldest = ctime
|
oldest = ctime
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, 0, err
|
||||||
}
|
}
|
||||||
var age int64
|
var age int64
|
||||||
if !oldest.IsZero() {
|
if !oldest.IsZero() {
|
||||||
age = int64(time.Now().Sub(oldest) / time.Second)
|
age = int64(time.Now().Sub(oldest) / time.Second)
|
||||||
} else if len(finfos) != 0 {
|
} else if length != 0 {
|
||||||
// system doesn't support ctime
|
// system doesn't support ctime
|
||||||
age = -1
|
age = -1
|
||||||
}
|
}
|
||||||
@@ -77,8 +78,8 @@ func (p *Postfix) Gather(acc telegraf.Accumulator) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, q := range []string{"active", "hold", "incoming", "maildrop"} {
|
for _, q := range []string{"active", "hold", "incoming", "maildrop", "deferred"} {
|
||||||
length, size, age, err := qScan(path.Join(p.QueueDirectory, q))
|
length, size, age, err := qScan(filepath.Join(p.QueueDirectory, q), acc)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
acc.AddError(fmt.Errorf("error scanning queue %s: %s", q, err))
|
acc.AddError(fmt.Errorf("error scanning queue %s: %s", q, err))
|
||||||
continue
|
continue
|
||||||
@@ -90,30 +91,6 @@ func (p *Postfix) Gather(acc telegraf.Accumulator) error {
|
|||||||
acc.AddFields("postfix_queue", fields, map[string]string{"queue": q})
|
acc.AddFields("postfix_queue", fields, map[string]string{"queue": q})
|
||||||
}
|
}
|
||||||
|
|
||||||
var dLength, dSize int64
|
|
||||||
dAge := int64(-1)
|
|
||||||
for _, q := range []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "A", "B", "C", "D", "E", "F"} {
|
|
||||||
length, size, age, err := qScan(path.Join(p.QueueDirectory, "deferred", q))
|
|
||||||
if err != nil {
|
|
||||||
if os.IsNotExist(err) {
|
|
||||||
// the directories are created on first use
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
acc.AddError(fmt.Errorf("error scanning queue deferred/%s: %s", q, err))
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
dLength += length
|
|
||||||
dSize += size
|
|
||||||
if age > dAge {
|
|
||||||
dAge = age
|
|
||||||
}
|
|
||||||
}
|
|
||||||
fields := map[string]interface{}{"length": dLength, "size": dSize}
|
|
||||||
if dAge != -1 {
|
|
||||||
fields["age"] = dAge
|
|
||||||
}
|
|
||||||
acc.AddFields("postfix_queue", fields, map[string]string{"queue": "deferred"})
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ package postfix
|
|||||||
import (
|
import (
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path/filepath"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
@@ -16,19 +16,16 @@ func TestGather(t *testing.T) {
|
|||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
defer os.RemoveAll(td)
|
defer os.RemoveAll(td)
|
||||||
|
|
||||||
for _, q := range []string{"active", "hold", "incoming", "maildrop", "deferred"} {
|
for _, q := range []string{"active", "hold", "incoming", "maildrop", "deferred/0/0", "deferred/F/F"} {
|
||||||
require.NoError(t, os.Mkdir(path.Join(td, q), 0755))
|
require.NoError(t, os.MkdirAll(filepath.FromSlash(td+"/"+q), 0755))
|
||||||
}
|
|
||||||
for _, q := range []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "A", "B", "C", "D", "F"} { // "E" deliberately left off
|
|
||||||
require.NoError(t, os.Mkdir(path.Join(td, "deferred", q), 0755))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
require.NoError(t, ioutil.WriteFile(path.Join(td, "active", "01"), []byte("abc"), 0644))
|
require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/active/01"), []byte("abc"), 0644))
|
||||||
require.NoError(t, ioutil.WriteFile(path.Join(td, "active", "02"), []byte("defg"), 0644))
|
require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/active/02"), []byte("defg"), 0644))
|
||||||
require.NoError(t, ioutil.WriteFile(path.Join(td, "hold", "01"), []byte("abc"), 0644))
|
require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/hold/01"), []byte("abc"), 0644))
|
||||||
require.NoError(t, ioutil.WriteFile(path.Join(td, "incoming", "01"), []byte("abcd"), 0644))
|
require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/incoming/01"), []byte("abcd"), 0644))
|
||||||
require.NoError(t, ioutil.WriteFile(path.Join(td, "deferred", "0", "01"), []byte("abc"), 0644))
|
require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/deferred/0/0/01"), []byte("abc"), 0644))
|
||||||
require.NoError(t, ioutil.WriteFile(path.Join(td, "deferred", "F", "F1"), []byte("abc"), 0644))
|
require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/deferred/F/F/F1"), []byte("abc"), 0644))
|
||||||
|
|
||||||
p := Postfix{
|
p := Postfix{
|
||||||
QueueDirectory: td,
|
QueueDirectory: td,
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
# tail Input Plugin
|
# Tail Input Plugin
|
||||||
|
|
||||||
The tail plugin "tails" a logfile and parses each log message.
|
The tail plugin "tails" a logfile and parses each log message.
|
||||||
|
|
||||||
@@ -49,3 +49,7 @@ The plugin expects messages in one of the
|
|||||||
data_format = "influx"
|
data_format = "influx"
|
||||||
```
|
```
|
||||||
|
|
||||||
|
### Metrics:
|
||||||
|
|
||||||
|
Metrics are produced according to the `data_format` option. Additionally a
|
||||||
|
tag labeled `path` is added to the metric containing the filename being tailed.
|
||||||
|
|||||||
@@ -146,7 +146,11 @@ func (t *Tail) receiver(tailer *tail.Tail) {
|
|||||||
|
|
||||||
m, err = t.parser.ParseLine(text)
|
m, err = t.parser.ParseLine(text)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
|
if m != nil {
|
||||||
|
tags := m.Tags()
|
||||||
|
tags["path"] = tailer.Filename
|
||||||
|
t.acc.AddFields(m.Name(), m.Fields(), tags, m.Time())
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
t.acc.AddError(fmt.Errorf("E! Malformed log line in %s: [%s], Error: %s\n",
|
t.acc.AddError(fmt.Errorf("E! Malformed log line in %s: [%s], Error: %s\n",
|
||||||
tailer.Filename, line.Text, err))
|
tailer.Filename, line.Text, err))
|
||||||
|
|||||||
@@ -300,7 +300,10 @@ func TestRunParserJSONMsg(t *testing.T) {
|
|||||||
listener.acc = &acc
|
listener.acc = &acc
|
||||||
defer close(listener.done)
|
defer close(listener.done)
|
||||||
|
|
||||||
listener.parser, _ = parsers.NewJSONParser("udp_json_test", []string{}, nil)
|
listener.parser, _ = parsers.NewParser(&parsers.Config{
|
||||||
|
DataFormat: "json",
|
||||||
|
MetricName: "udp_json_test",
|
||||||
|
})
|
||||||
listener.wg.Add(1)
|
listener.wg.Add(1)
|
||||||
go listener.tcpParser()
|
go listener.tcpParser()
|
||||||
|
|
||||||
|
|||||||
@@ -193,7 +193,10 @@ func TestRunParserJSONMsg(t *testing.T) {
|
|||||||
listener.acc = &acc
|
listener.acc = &acc
|
||||||
defer close(listener.done)
|
defer close(listener.done)
|
||||||
|
|
||||||
listener.parser, _ = parsers.NewJSONParser("udp_json_test", []string{}, nil)
|
listener.parser, _ = parsers.NewParser(&parsers.Config{
|
||||||
|
DataFormat: "json",
|
||||||
|
MetricName: "udp_json_test",
|
||||||
|
})
|
||||||
listener.wg.Add(1)
|
listener.wg.Add(1)
|
||||||
go listener.udpParser()
|
go listener.udpParser()
|
||||||
|
|
||||||
|
|||||||
@@ -20,6 +20,7 @@ var (
|
|||||||
type JSONParser struct {
|
type JSONParser struct {
|
||||||
MetricName string
|
MetricName string
|
||||||
TagKeys []string
|
TagKeys []string
|
||||||
|
FieldKeys []string
|
||||||
DefaultTags map[string]string
|
DefaultTags map[string]string
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -86,6 +87,17 @@ func (p *JSONParser) switchFieldToTag(tags map[string]string, fields map[string]
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//if field_keys is specified, only those values should be reported as fields
|
||||||
|
if len(p.FieldKeys) > 0 {
|
||||||
|
nFields := make(map[string]interface{})
|
||||||
|
for _, name := range p.FieldKeys {
|
||||||
|
if fields[name] != nil {
|
||||||
|
nFields[name] = fields[name]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return tags, nFields
|
||||||
|
}
|
||||||
|
|
||||||
//remove any additional string/bool values from fields
|
//remove any additional string/bool values from fields
|
||||||
for k := range fields {
|
for k := range fields {
|
||||||
switch fields[k].(type) {
|
switch fields[k].(type) {
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package json
|
package json
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"log"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
@@ -448,22 +449,28 @@ func TestJSONParseNestedArray(t *testing.T) {
|
|||||||
"total_devices": 5,
|
"total_devices": 5,
|
||||||
"total_threads": 10,
|
"total_threads": 10,
|
||||||
"shares": {
|
"shares": {
|
||||||
"total": 5,
|
"total": 5,
|
||||||
"accepted": 5,
|
"accepted": 5,
|
||||||
"rejected": 0,
|
"rejected": 0,
|
||||||
"avg_find_time": 4,
|
"avg_find_time": 4,
|
||||||
"tester": "work",
|
"tester": "work",
|
||||||
"tester2": "don't want this",
|
"tester2": "don't want this",
|
||||||
"tester3": 7.93
|
"tester3": {
|
||||||
|
"hello":"sup",
|
||||||
|
"fun":"money",
|
||||||
|
"break":9
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}`
|
}`
|
||||||
|
|
||||||
parser := JSONParser{
|
parser := JSONParser{
|
||||||
MetricName: "json_test",
|
MetricName: "json_test",
|
||||||
TagKeys: []string{"total_devices", "total_threads", "shares_tester", "shares_tester3"},
|
TagKeys: []string{"total_devices", "total_threads", "shares_tester3_fun"},
|
||||||
|
FieldKeys: []string{"shares_tester", "shares_tester3_break"},
|
||||||
}
|
}
|
||||||
|
|
||||||
metrics, err := parser.Parse([]byte(testString))
|
metrics, err := parser.Parse([]byte(testString))
|
||||||
|
log.Printf("m[0] name: %v, tags: %v, fields: %v", metrics[0].Name(), metrics[0].Tags(), metrics[0].Fields())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, len(parser.TagKeys), len(metrics[0].Tags()))
|
require.Equal(t, len(parser.TagKeys), len(metrics[0].Tags()))
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -56,6 +56,8 @@ type Config struct {
|
|||||||
|
|
||||||
// TagKeys only apply to JSON data
|
// TagKeys only apply to JSON data
|
||||||
TagKeys []string
|
TagKeys []string
|
||||||
|
// FieldKeys only apply to JSON
|
||||||
|
FieldKeys []string
|
||||||
// MetricName applies to JSON & value. This will be the name of the measurement.
|
// MetricName applies to JSON & value. This will be the name of the measurement.
|
||||||
MetricName string
|
MetricName string
|
||||||
|
|
||||||
@@ -95,8 +97,8 @@ func NewParser(config *Config) (Parser, error) {
|
|||||||
var parser Parser
|
var parser Parser
|
||||||
switch config.DataFormat {
|
switch config.DataFormat {
|
||||||
case "json":
|
case "json":
|
||||||
parser, err = NewJSONParser(config.MetricName,
|
parser, err = newJSONParser(config.MetricName,
|
||||||
config.TagKeys, config.DefaultTags)
|
config.TagKeys, config.FieldKeys, config.DefaultTags)
|
||||||
case "value":
|
case "value":
|
||||||
parser, err = NewValueParser(config.MetricName,
|
parser, err = NewValueParser(config.MetricName,
|
||||||
config.DataType, config.DefaultTags)
|
config.DataType, config.DefaultTags)
|
||||||
@@ -126,6 +128,22 @@ func NewParser(config *Config) (Parser, error) {
|
|||||||
return parser, err
|
return parser, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newJSONParser(
|
||||||
|
metricName string,
|
||||||
|
tagKeys []string,
|
||||||
|
fieldKeys []string,
|
||||||
|
defaultTags map[string]string,
|
||||||
|
) (Parser, error) {
|
||||||
|
parser := &json.JSONParser{
|
||||||
|
MetricName: metricName,
|
||||||
|
TagKeys: tagKeys,
|
||||||
|
FieldKeys: fieldKeys,
|
||||||
|
DefaultTags: defaultTags,
|
||||||
|
}
|
||||||
|
return parser, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//Deprecated: Use NewParser to get a JSONParser object
|
||||||
func NewJSONParser(
|
func NewJSONParser(
|
||||||
metricName string,
|
metricName string,
|
||||||
tagKeys []string,
|
tagKeys []string,
|
||||||
|
|||||||
Reference in New Issue
Block a user