Compare commits

..

1 Commits

Author SHA1 Message Date
Max U
640ae884ea update readme to include procstat_lookup metric 2018-06-22 10:01:13 -07:00
21 changed files with 128 additions and 369 deletions

View File

@@ -20,8 +20,6 @@
- [#4259](https://github.com/influxdata/telegraf/pull/4259): Add container status tag to docker input. - [#4259](https://github.com/influxdata/telegraf/pull/4259): Add container status tag to docker input.
- [#3523](https://github.com/influxdata/telegraf/pull/3523): Add valuecounter aggregator plugin. - [#3523](https://github.com/influxdata/telegraf/pull/3523): Add valuecounter aggregator plugin.
- [#4307](https://github.com/influxdata/telegraf/pull/4307): Add new measurement with results of pgrep lookup to procstat input. - [#4307](https://github.com/influxdata/telegraf/pull/4307): Add new measurement with results of pgrep lookup to procstat input.
- [#4311](https://github.com/influxdata/telegraf/pull/4311): Add support for comma in logparser timestamp format.
- [#4292](https://github.com/influxdata/telegraf/pull/4292): Add path tag to tail input plugin.
## v1.7.1 [unreleased] ## v1.7.1 [unreleased]
@@ -29,7 +27,6 @@
- [#4277](https://github.com/influxdata/telegraf/pull/4277): Treat sigterm as a clean shutdown signal. - [#4277](https://github.com/influxdata/telegraf/pull/4277): Treat sigterm as a clean shutdown signal.
- [#4284](https://github.com/influxdata/telegraf/pull/4284): Fix selection of tags under nested objects in the JSON parser. - [#4284](https://github.com/influxdata/telegraf/pull/4284): Fix selection of tags under nested objects in the JSON parser.
- [#4135](https://github.com/influxdata/telegraf/issues/4135): Fix postfix input handling multi-level queues.
## v1.7 [2018-06-12] ## v1.7 [2018-06-12]

View File

@@ -9,8 +9,6 @@ Telegraf is able to parse the following input data formats into metrics:
1. [Nagios](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#nagios) (exec input only) 1. [Nagios](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#nagios) (exec input only)
1. [Collectd](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#collectd) 1. [Collectd](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#collectd)
1. [Dropwizard](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#dropwizard) 1. [Dropwizard](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#dropwizard)
1. [Grok](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#grok)
Telegraf metrics, like InfluxDB Telegraf metrics, like InfluxDB
[points](https://docs.influxdata.com/influxdb/v0.10/write_protocols/line/), [points](https://docs.influxdata.com/influxdb/v0.10/write_protocols/line/),
@@ -653,37 +651,5 @@ For more information about the dropwizard json format see
# [inputs.exec.dropwizard_tag_paths] # [inputs.exec.dropwizard_tag_paths]
# tag1 = "tags.tag1" # tag1 = "tags.tag1"
# tag2 = "tags.tag2" # tag2 = "tags.tag2"
```
#### Grok
Parse logstash-style "grok" patterns:
```toml
[inputs.reader]
## This is a list of patterns to check the given log file(s) for.
## Note that adding patterns here increases processing time. The most
## efficient configuration is to have one pattern per logparser.
## Other common built-in patterns are:
## %{COMMON_LOG_FORMAT} (plain apache & nginx access logs)
## %{COMBINED_LOG_FORMAT} (access logs + referrer & agent)
patterns = ["%{COMBINED_LOG_FORMAT}"]
## Name of the output measurement.
name_override = "apache_access_log"
## Full path(s) to custom pattern files.
custom_pattern_files = []
## Custom patterns can also be defined here. Put one pattern per line.
custom_patterns = '''
## Timezone allows you to provide an override for timestamps that
## don't already include an offset
## e.g. 04/06/2016 12:41:45 data one two 5.43µs
##
## Default: "" which renders UTC
## Options are as follows:
## 1. Local -- interpret based on machine localtime
## 2. "Canada/Eastern" -- Unix TZ values like those found in https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
## 3. UTC -- or blank/unspecified, will return timestamp in UTC
timezone = "Canada/Eastern"
``` ```

View File

@@ -211,12 +211,16 @@ var header = `# Telegraf Configuration
# Environment variables can be used anywhere in this config file, simply prepend # Environment variables can be used anywhere in this config file, simply prepend
# them with $. For strings the variable must be within quotes (ie, "$STR_VAR"), # them with $. For strings the variable must be within quotes (ie, "$STR_VAR"),
# for numbers and booleans they should be plain (ie, $INT_VAR, $BOOL_VAR) # for numbers and booleans they should be plain (ie, $INT_VAR, $BOOL_VAR)
# Global tags can be specified here in key="value" format. # Global tags can be specified here in key="value" format.
[global_tags] [global_tags]
# dc = "us-east-1" # will tag all metrics with dc=us-east-1 # dc = "us-east-1" # will tag all metrics with dc=us-east-1
# rack = "1a" # rack = "1a"
## Environment variables can be used as tags, and throughout the config file ## Environment variables can be used as tags, and throughout the config file
# user = "$USER" # user = "$USER"
# Configuration for telegraf agent # Configuration for telegraf agent
[agent] [agent]
## Default data collection interval for all inputs ## Default data collection interval for all inputs
@@ -224,20 +228,24 @@ var header = `# Telegraf Configuration
## Rounds collection interval to 'interval' ## Rounds collection interval to 'interval'
## ie, if interval="10s" then always collect on :00, :10, :20, etc. ## ie, if interval="10s" then always collect on :00, :10, :20, etc.
round_interval = true round_interval = true
## Telegraf will send metrics to outputs in batches of at most ## Telegraf will send metrics to outputs in batches of at most
## metric_batch_size metrics. ## metric_batch_size metrics.
## This controls the size of writes that Telegraf sends to output plugins. ## This controls the size of writes that Telegraf sends to output plugins.
metric_batch_size = 1000 metric_batch_size = 1000
## For failed writes, telegraf will cache metric_buffer_limit metrics for each ## For failed writes, telegraf will cache metric_buffer_limit metrics for each
## output, and will flush this buffer on a successful write. Oldest metrics ## output, and will flush this buffer on a successful write. Oldest metrics
## are dropped first when this buffer fills. ## are dropped first when this buffer fills.
## This buffer only fills when writes fail to output plugin(s). ## This buffer only fills when writes fail to output plugin(s).
metric_buffer_limit = 10000 metric_buffer_limit = 10000
## Collection jitter is used to jitter the collection by a random amount. ## Collection jitter is used to jitter the collection by a random amount.
## Each plugin will sleep for a random time within jitter before collecting. ## Each plugin will sleep for a random time within jitter before collecting.
## This can be used to avoid many plugins querying things like sysfs at the ## This can be used to avoid many plugins querying things like sysfs at the
## same time, which can have a measurable effect on the system. ## same time, which can have a measurable effect on the system.
collection_jitter = "0s" collection_jitter = "0s"
## Default flushing interval for all outputs. You shouldn't set this below ## Default flushing interval for all outputs. You shouldn't set this below
## interval. Maximum flush_interval will be flush_interval + flush_jitter ## interval. Maximum flush_interval will be flush_interval + flush_jitter
flush_interval = "10s" flush_interval = "10s"
@@ -245,6 +253,7 @@ var header = `# Telegraf Configuration
## large write spikes for users running a large number of telegraf instances. ## large write spikes for users running a large number of telegraf instances.
## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s ## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
flush_jitter = "0s" flush_jitter = "0s"
## By default or when set to "0s", precision will be set to the same ## By default or when set to "0s", precision will be set to the same
## timestamp order as the collection interval, with the maximum being 1s. ## timestamp order as the collection interval, with the maximum being 1s.
## ie, when interval = "10s", precision will be "1s" ## ie, when interval = "10s", precision will be "1s"
@@ -253,6 +262,7 @@ var header = `# Telegraf Configuration
## service input to set the timestamp at the appropriate precision. ## service input to set the timestamp at the appropriate precision.
## Valid time units are "ns", "us" (or "µs"), "ms", "s". ## Valid time units are "ns", "us" (or "µs"), "ms", "s".
precision = "" precision = ""
## Logging configuration: ## Logging configuration:
## Run telegraf with debug log messages. ## Run telegraf with debug log messages.
debug = false debug = false
@@ -260,34 +270,41 @@ var header = `# Telegraf Configuration
quiet = false quiet = false
## Specify the log file name. The empty string means to log to stderr. ## Specify the log file name. The empty string means to log to stderr.
logfile = "" logfile = ""
## Override default hostname, if empty use os.Hostname() ## Override default hostname, if empty use os.Hostname()
hostname = "" hostname = ""
## If set to true, do no set the "host" tag in the telegraf agent. ## If set to true, do no set the "host" tag in the telegraf agent.
omit_hostname = false omit_hostname = false
############################################################################### ###############################################################################
# OUTPUT PLUGINS # # OUTPUT PLUGINS #
############################################################################### ###############################################################################
` `
var processorHeader = ` var processorHeader = `
############################################################################### ###############################################################################
# PROCESSOR PLUGINS # # PROCESSOR PLUGINS #
############################################################################### ###############################################################################
` `
var aggregatorHeader = ` var aggregatorHeader = `
############################################################################### ###############################################################################
# AGGREGATOR PLUGINS # # AGGREGATOR PLUGINS #
############################################################################### ###############################################################################
` `
var inputHeader = ` var inputHeader = `
############################################################################### ###############################################################################
# INPUT PLUGINS # # INPUT PLUGINS #
############################################################################### ###############################################################################
` `
var serviceInputHeader = ` var serviceInputHeader = `
############################################################################### ###############################################################################
# SERVICE INPUT PLUGINS # # SERVICE INPUT PLUGINS #
############################################################################### ###############################################################################
@@ -1321,59 +1338,6 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
} }
} }
//for grok data_format
if node, ok := tbl.Fields["named_patterns"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
c.NamedPatterns = append(c.NamedPatterns, str.Value)
}
}
}
}
}
if node, ok := tbl.Fields["patterns"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
c.Patterns = append(c.Patterns, str.Value)
}
}
}
}
}
if node, ok := tbl.Fields["custom_patterns"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.CustomPatterns = str.Value
}
}
}
if node, ok := tbl.Fields["custom_pattern_files"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
c.CustomPatternFiles = append(c.CustomPatternFiles, str.Value)
}
}
}
}
}
if node, ok := tbl.Fields["timezone"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.TimeZone = str.Value
}
}
}
c.MetricName = name c.MetricName = name
delete(tbl.Fields, "data_format") delete(tbl.Fields, "data_format")
@@ -1389,11 +1353,6 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
delete(tbl.Fields, "dropwizard_time_format") delete(tbl.Fields, "dropwizard_time_format")
delete(tbl.Fields, "dropwizard_tags_path") delete(tbl.Fields, "dropwizard_tags_path")
delete(tbl.Fields, "dropwizard_tag_paths") delete(tbl.Fields, "dropwizard_tag_paths")
delete(tbl.Fields, "named_patterns")
delete(tbl.Fields, "patterns")
delete(tbl.Fields, "custom_patterns")
delete(tbl.Fields, "custom_pattern_files")
delete(tbl.Fields, "timezone")
return parsers.NewParser(c) return parsers.NewParser(c)
} }

View File

@@ -85,7 +85,6 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/puppetagent" _ "github.com/influxdata/telegraf/plugins/inputs/puppetagent"
_ "github.com/influxdata/telegraf/plugins/inputs/rabbitmq" _ "github.com/influxdata/telegraf/plugins/inputs/rabbitmq"
_ "github.com/influxdata/telegraf/plugins/inputs/raindrops" _ "github.com/influxdata/telegraf/plugins/inputs/raindrops"
_ "github.com/influxdata/telegraf/plugins/inputs/reader"
_ "github.com/influxdata/telegraf/plugins/inputs/redis" _ "github.com/influxdata/telegraf/plugins/inputs/redis"
_ "github.com/influxdata/telegraf/plugins/inputs/rethinkdb" _ "github.com/influxdata/telegraf/plugins/inputs/rethinkdb"
_ "github.com/influxdata/telegraf/plugins/inputs/riak" _ "github.com/influxdata/telegraf/plugins/inputs/riak"

View File

@@ -108,9 +108,7 @@ You must capture at least one field per line.
- ts-"CUSTOM" - ts-"CUSTOM"
CUSTOM time layouts must be within quotes and be the representation of the CUSTOM time layouts must be within quotes and be the representation of the
"reference time", which is `Mon Jan 2 15:04:05 -0700 MST 2006`. "reference time", which is `Mon Jan 2 15:04:05 -0700 MST 2006`
To match a comma decimal point you can use a period. For example `%{TIMESTAMP:timestamp:ts-"2006-01-02 15:04:05.000"}` can be used to match `"2018-01-02 15:04:05,000"`
To match a comma decimal point you can use a period in the pattern string.
See https://golang.org/pkg/time/#Parse for more details. See https://golang.org/pkg/time/#Parse for more details.
Telegraf has many of its own [built-in patterns](./grok/patterns/influx-patterns), Telegraf has many of its own [built-in patterns](./grok/patterns/influx-patterns),

View File

@@ -335,9 +335,6 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
case DROP: case DROP:
// goodbye! // goodbye!
default: default:
// Replace commas with dot character
v = strings.Replace(v, ",", ".", -1)
ts, err := time.ParseInLocation(t, v, p.loc) ts, err := time.ParseInLocation(t, v, p.loc)
if err == nil { if err == nil {
timestamp = ts timestamp = ts

View File

@@ -982,21 +982,3 @@ func TestSyslogTimestampParser(t *testing.T) {
require.NotNil(t, m) require.NotNil(t, m)
require.Equal(t, 2018, m.Time().Year()) require.Equal(t, 2018, m.Time().Year())
} }
// TestReplaceTimestampComma checks that a grok timestamp layout written with a
// period before the fractional seconds ("15:04:05.000") still parses a log line
// whose timestamp uses a comma there ("13:10:34,555").
func TestReplaceTimestampComma(t *testing.T) {
p := &Parser{
Patterns: []string{`%{TIMESTAMP_ISO8601:timestamp:ts-"2006-01-02 15:04:05.000"} successfulMatches=%{NUMBER:value:int}`},
}
require.NoError(t, p.Compile())
// The input line deliberately uses a comma as the fractional-second separator.
m, err := p.ParseLine("2018-02-21 13:10:34,555 successfulMatches=1")
require.NoError(t, err)
require.NotNil(t, m)
require.Equal(t, 2018, m.Time().Year())
require.Equal(t, 13, m.Time().Hour())
require.Equal(t, 34, m.Time().Second())
// Convert nanoseconds to milliseconds for the comparison.
require.Equal(t, 555, m.Time().Nanosecond()/1000000)
}

View File

@@ -4,7 +4,7 @@ import (
"fmt" "fmt"
"os" "os"
"os/exec" "os/exec"
"path/filepath" "path"
"strings" "strings"
"time" "time"
@@ -28,37 +28,36 @@ func getQueueDirectory() (string, error) {
return strings.TrimSpace(string(qd)), nil return strings.TrimSpace(string(qd)), nil
} }
func qScan(path string, acc telegraf.Accumulator) (int64, int64, int64, error) { func qScan(path string) (int64, int64, int64, error) {
f, err := os.Open(path)
if err != nil {
return 0, 0, 0, err
}
finfos, err := f.Readdir(-1)
f.Close()
if err != nil {
return 0, 0, 0, err
}
var length, size int64 var length, size int64
var oldest time.Time var oldest time.Time
err := filepath.Walk(path, func(_ string, finfo os.FileInfo, err error) error { for _, finfo := range finfos {
if err != nil {
acc.AddError(fmt.Errorf("error scanning %s: %s", path, err))
return nil
}
if finfo.IsDir() {
return nil
}
length++ length++
size += finfo.Size() size += finfo.Size()
ctime := statCTime(finfo.Sys()) ctime := statCTime(finfo.Sys())
if ctime.IsZero() { if ctime.IsZero() {
return nil continue
} }
if oldest.IsZero() || ctime.Before(oldest) { if oldest.IsZero() || ctime.Before(oldest) {
oldest = ctime oldest = ctime
} }
return nil
})
if err != nil {
return 0, 0, 0, err
} }
var age int64 var age int64
if !oldest.IsZero() { if !oldest.IsZero() {
age = int64(time.Now().Sub(oldest) / time.Second) age = int64(time.Now().Sub(oldest) / time.Second)
} else if length != 0 { } else if len(finfos) != 0 {
// system doesn't support ctime // system doesn't support ctime
age = -1 age = -1
} }
@@ -78,8 +77,8 @@ func (p *Postfix) Gather(acc telegraf.Accumulator) error {
} }
} }
for _, q := range []string{"active", "hold", "incoming", "maildrop", "deferred"} { for _, q := range []string{"active", "hold", "incoming", "maildrop"} {
length, size, age, err := qScan(filepath.Join(p.QueueDirectory, q), acc) length, size, age, err := qScan(path.Join(p.QueueDirectory, q))
if err != nil { if err != nil {
acc.AddError(fmt.Errorf("error scanning queue %s: %s", q, err)) acc.AddError(fmt.Errorf("error scanning queue %s: %s", q, err))
continue continue
@@ -91,6 +90,30 @@ func (p *Postfix) Gather(acc telegraf.Accumulator) error {
acc.AddFields("postfix_queue", fields, map[string]string{"queue": q}) acc.AddFields("postfix_queue", fields, map[string]string{"queue": q})
} }
var dLength, dSize int64
dAge := int64(-1)
for _, q := range []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "A", "B", "C", "D", "E", "F"} {
length, size, age, err := qScan(path.Join(p.QueueDirectory, "deferred", q))
if err != nil {
if os.IsNotExist(err) {
// the directories are created on first use
continue
}
acc.AddError(fmt.Errorf("error scanning queue deferred/%s: %s", q, err))
return nil
}
dLength += length
dSize += size
if age > dAge {
dAge = age
}
}
fields := map[string]interface{}{"length": dLength, "size": dSize}
if dAge != -1 {
fields["age"] = dAge
}
acc.AddFields("postfix_queue", fields, map[string]string{"queue": "deferred"})
return nil return nil
} }

View File

@@ -3,7 +3,7 @@ package postfix
import ( import (
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath" "path"
"testing" "testing"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
@@ -16,16 +16,19 @@ func TestGather(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer os.RemoveAll(td) defer os.RemoveAll(td)
for _, q := range []string{"active", "hold", "incoming", "maildrop", "deferred/0/0", "deferred/F/F"} { for _, q := range []string{"active", "hold", "incoming", "maildrop", "deferred"} {
require.NoError(t, os.MkdirAll(filepath.FromSlash(td+"/"+q), 0755)) require.NoError(t, os.Mkdir(path.Join(td, q), 0755))
}
for _, q := range []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "A", "B", "C", "D", "F"} { // "E" deliberately left off
require.NoError(t, os.Mkdir(path.Join(td, "deferred", q), 0755))
} }
require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/active/01"), []byte("abc"), 0644)) require.NoError(t, ioutil.WriteFile(path.Join(td, "active", "01"), []byte("abc"), 0644))
require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/active/02"), []byte("defg"), 0644)) require.NoError(t, ioutil.WriteFile(path.Join(td, "active", "02"), []byte("defg"), 0644))
require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/hold/01"), []byte("abc"), 0644)) require.NoError(t, ioutil.WriteFile(path.Join(td, "hold", "01"), []byte("abc"), 0644))
require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/incoming/01"), []byte("abcd"), 0644)) require.NoError(t, ioutil.WriteFile(path.Join(td, "incoming", "01"), []byte("abcd"), 0644))
require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/deferred/0/0/01"), []byte("abc"), 0644)) require.NoError(t, ioutil.WriteFile(path.Join(td, "deferred", "0", "01"), []byte("abc"), 0644))
require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/deferred/F/F/F1"), []byte("abc"), 0644)) require.NoError(t, ioutil.WriteFile(path.Join(td, "deferred", "F", "F1"), []byte("abc"), 0644))
p := Postfix{ p := Postfix{
QueueDirectory: td, QueueDirectory: td,

View File

@@ -1,6 +1,8 @@
# Procstat Input Plugin # Procstat Input Plugin
The procstat plugin can be used to monitor the system resource usage of one or more processes. The procstat plugin can be used to monitor the system resource usage of one or more processes.
The procstat_lookup metric reports information about the process lookup query,
specifically the number of PIDs returned by the search.
Processes can be selected for monitoring using one of several methods: Processes can be selected for monitoring using one of several methods:
- pidfile - pidfile

View File

@@ -1,23 +0,0 @@
# Reader Input Plugin
The `reader` plugin reads and parses files every interval. Reader will always begin at the top of each file.
Reader supports all `data_format` formats.
### Configuration
```toml
## Files to parse each interval.
## These accept standard unix glob matching rules, but with the addition of
## ** as a "super asterisk". ie:
## /var/log/**.log -> recursively find all .log files in /var/log
## /var/log/*/*.log -> find all .log files with a parent dir in /var/log
## /var/log/apache.log -> only tail the apache log file
files = ["/var/log/apache/access.log"]
## The dataformat to be read from files
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = ""
```

View File

@@ -1,14 +0,0 @@
version: '3'
services:
telegraf:
image: glinton/scratch
volumes:
- ./telegraf.conf:/telegraf.conf
- ../../../../telegraf:/telegraf
- ./json_a.log:/var/log/test.log
entrypoint:
- /telegraf
- --config
- /telegraf.conf

View File

@@ -1,15 +0,0 @@
{
"parent": {
"child": 3.0,
"ignored_child": "hi"
},
"ignored_null": null,
"integer": 4,
"list": [3, 4],
"ignored_parent": {
"another_ignored_null": null,
"ignored_string": "hello, world!"
},
"another_list": [4]
}

View File

@@ -1,8 +0,0 @@
[[inputs.reader]]
files = ["/var/log/test.log"]
data_format = "json"
name_override = "json_reader"
[[outputs.file]]
files = ["stdout"]

View File

@@ -6,19 +6,27 @@ import (
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/globpath" "github.com/influxdata/telegraf/internal/globpath"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers"
) )
type Reader struct { type Reader struct {
Filepaths []string `toml:"files"` Filepaths []string `toml:"files"`
FromBeginning bool FromBeginning bool
parser parsers.Parser DataFormat string `toml:"data_format"`
ParserConfig parsers.Config
Parser parsers.Parser
Tags []string
Filenames []string Filenames []string
//for grok parser
Patterns []string
namedPatterns []string
CustomPatterns string
CustomPatternFiles []string
} }
const sampleConfig = `## Files to parse each interval. const sampleConfig = `## Files to parse.
## These accept standard unix glob matching rules, but with the addition of ## These accept standard unix glob matching rules, but with the addition of
## ** as a "super asterisk". ie: ## ** as a "super asterisk". ie:
## /var/log/**.log -> recursively find all .log files in /var/log ## /var/log/**.log -> recursively find all .log files in /var/log
@@ -30,8 +38,7 @@ files = ["/var/log/apache/access.log"]
## Each data format has its own unique set of configuration options, read ## Each data format has its own unique set of configuration options, read
## more about them here: ## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "" data_format = ""`
`
// SampleConfig returns the default configuration of the Input // SampleConfig returns the default configuration of the Input
func (r *Reader) SampleConfig() string { func (r *Reader) SampleConfig() string {
@@ -50,21 +57,34 @@ func (r *Reader) Gather(acc telegraf.Accumulator) error {
return err return err
} }
for i, m := range metrics { for _, m := range metrics {
//error if m is nil
if m == nil {
log.Printf("E! Metric could not be parsed from: %v, on line %v", k, i)
continue
}
acc.AddFields(m.Name(), m.Fields(), m.Tags()) acc.AddFields(m.Name(), m.Fields(), m.Tags())
} }
} }
return nil return nil
} }
func (r *Reader) SetParser(p parsers.Parser) { func (r *Reader) compileParser() {
r.parser = p if r.DataFormat == "" {
log.Printf("E! No data_format specified")
return
}
r.ParserConfig = parsers.Config{
DataFormat: r.DataFormat,
TagKeys: r.Tags,
//grok settings
Patterns: r.Patterns,
NamedPatterns: r.namedPatterns,
CustomPatterns: r.CustomPatterns,
CustomPatternFiles: r.CustomPatternFiles,
}
nParser, err := parsers.NewParser(&r.ParserConfig)
if err != nil {
log.Printf("E! Error building parser: %v", err)
}
r.Parser = nParser
} }
func (r *Reader) refreshFilePaths() { func (r *Reader) refreshFilePaths() {
@@ -91,12 +111,7 @@ func (r *Reader) readMetric(filename string) ([]telegraf.Metric, error) {
if err != nil { if err != nil {
log.Printf("E! File could not be opened: %v", filename) log.Printf("E! File could not be opened: %v", filename)
} }
return r.parser.Parse(fileContents)
return r.Parser.Parse(fileContents)
} }
func init() {
inputs.Add("reader", func() telegraf.Input {
return &Reader{}
})
}

View File

@@ -1,11 +1,11 @@
package reader package reader
import ( import (
"log"
"runtime" "runtime"
"strings" "strings"
"testing" "testing"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
@@ -17,23 +17,20 @@ func TestRefreshFilePaths(t *testing.T) {
} }
r.refreshFilePaths() r.refreshFilePaths()
//log.Printf("filenames: %v", filenames)
assert.Equal(t, len(r.Filenames), 2) assert.Equal(t, len(r.Filenames), 2)
} }
func TestJSONParserCompile(t *testing.T) { func TestJSONParserCompile(t *testing.T) {
testDir := getPluginDir() testDir := getPluginDir()
var acc testutil.Accumulator var acc testutil.Accumulator
r := Reader{ r := Reader{
Filepaths: []string{testDir + "/reader/testfiles/json_a.log"}, Filepaths: []string{testDir + "/reader/testfiles/json_a.log"},
}
parserConfig := parsers.Config{
DataFormat: "json", DataFormat: "json",
TagKeys: []string{"parent_ignored_child"}, Tags: []string{"parent_ignored_child"},
} }
nParser, err := parsers.NewParser(&parserConfig) r.compileParser()
r.parser = nParser
assert.NoError(t, err)
r.Gather(&acc) r.Gather(&acc)
log.Printf("acc: %v", acc.Metrics[0].Tags)
assert.Equal(t, map[string]string{"parent_ignored_child": "hi"}, acc.Metrics[0].Tags) assert.Equal(t, map[string]string{"parent_ignored_child": "hi"}, acc.Metrics[0].Tags)
assert.Equal(t, 5, len(acc.Metrics[0].Fields)) assert.Equal(t, 5, len(acc.Metrics[0].Fields))
} }
@@ -42,19 +39,16 @@ func TestGrokParser(t *testing.T) {
testDir := getPluginDir() testDir := getPluginDir()
var acc testutil.Accumulator var acc testutil.Accumulator
r := Reader{ r := Reader{
Filepaths: []string{testDir + "/reader/testfiles/grok_a.log"}, Filepaths: []string{testDir + "/reader/testfiles/grok_a.log"},
}
parserConfig := parsers.Config{
DataFormat: "grok", DataFormat: "grok",
Patterns: []string{"%{COMMON_LOG_FORMAT}"}, Patterns: []string{"%{COMMON_LOG_FORMAT}"},
} }
nParser, err := parsers.NewParser(&parserConfig) r.compileParser()
r.parser = nParser err := r.Gather(&acc)
assert.NoError(t, err) log.Printf("err: %v", err)
log.Printf("metric[0]_tags: %v, metric[0]_fields: %v", acc.Metrics[0].Tags, acc.Metrics[0].Fields)
err = r.Gather(&acc) log.Printf("metric[1]_tags: %v, metric[1]_fields: %v", acc.Metrics[1].Tags, acc.Metrics[1].Fields)
assert.Equal(t, 2, len(acc.Metrics)) assert.Equal(t, 2, len(acc.Metrics))
} }

View File

@@ -1,4 +1,4 @@
# Tail Input Plugin # tail Input Plugin
The tail plugin "tails" a logfile and parses each log message. The tail plugin "tails" a logfile and parses each log message.
@@ -49,7 +49,3 @@ The plugin expects messages in one of the
data_format = "influx" data_format = "influx"
``` ```
### Metrics:
Metrics are produced according to the `data_format` option. Additionally a
tag labeled `path` is added to the metric containing the filename being tailed.

View File

@@ -146,11 +146,7 @@ func (t *Tail) receiver(tailer *tail.Tail) {
m, err = t.parser.ParseLine(text) m, err = t.parser.ParseLine(text)
if err == nil { if err == nil {
if m != nil { t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
tags := m.Tags()
tags["path"] = tailer.Filename
t.acc.AddFields(m.Name(), m.Fields(), tags, m.Time())
}
} else { } else {
t.acc.AddError(fmt.Errorf("E! Malformed log line in %s: [%s], Error: %s\n", t.acc.AddError(fmt.Errorf("E! Malformed log line in %s: [%s], Error: %s\n",
tailer.Filename, line.Text, err)) tailer.Filename, line.Text, err))

View File

@@ -94,7 +94,6 @@ type Config struct {
NamedPatterns []string NamedPatterns []string
CustomPatterns string CustomPatterns string
CustomPatternFiles []string CustomPatternFiles []string
TimeZone string
} }
// NewParser returns a Parser interface based on the given config. // NewParser returns a Parser interface based on the given config.
@@ -128,15 +127,13 @@ func NewParser(config *Config) (Parser, error) {
config.DefaultTags, config.DefaultTags,
config.Separator, config.Separator,
config.Templates) config.Templates)
case "grok": case "grok":
parser, err = NewGrokParser( parser, err = NewGrokParser(
config.MetricName, config.MetricName,
config.Patterns, config.Patterns,
config.NamedPatterns, config.NamedPatterns,
config.CustomPatterns, config.CustomPatterns,
config.CustomPatternFiles, config.CustomPatternFiles)
config.TimeZone)
default: default:
err = fmt.Errorf("Invalid data format: %s", config.DataFormat) err = fmt.Errorf("Invalid data format: %s", config.DataFormat)
} }
@@ -147,14 +144,13 @@ func NewGrokParser(metricName string,
patterns []string, patterns []string,
nPatterns []string, nPatterns []string,
cPatterns string, cPatterns string,
cPatternFiles []string, tZone string) (Parser, error) { cPatternFiles []string) (Parser, error) {
parser := grok.Parser{ parser := grok.Parser{
Measurement: metricName, Measurement: metricName,
Patterns: patterns, Patterns: patterns,
NamedPatterns: nPatterns, NamedPatterns: nPatterns,
CustomPatterns: cPatterns, CustomPatterns: cPatterns,
CustomPatternFiles: cPatternFiles, CustomPatternFiles: cPatternFiles,
Timezone: tZone,
} }
parser.Compile() parser.Compile()

View File

@@ -1,104 +0,0 @@
# Global tags can be specified here in key="value" format.
[global_tags]
# dc = "us-east-1" # will tag all metrics with dc=us-east-1
# rack = "1a"
## Environment variables can be used as tags, and throughout the config file
# user = "$USER"
# Configuration for telegraf agent
[agent]
## Default data collection interval for all inputs
interval = "10s"
## Rounds collection interval to 'interval'
## ie, if interval="10s" then always collect on :00, :10, :20, etc.
round_interval = true
## Telegraf will send metrics to outputs in batches of at most
## metric_batch_size metrics.
## This controls the size of writes that Telegraf sends to output plugins.
metric_batch_size = 1000
## For failed writes, telegraf will cache metric_buffer_limit metrics for each
## output, and will flush this buffer on a successful write. Oldest metrics
## are dropped first when this buffer fills.
## This buffer only fills when writes fail to output plugin(s).
metric_buffer_limit = 10000
## Collection jitter is used to jitter the collection by a random amount.
## Each plugin will sleep for a random time within jitter before collecting.
## This can be used to avoid many plugins querying things like sysfs at the
## same time, which can have a measurable effect on the system.
collection_jitter = "0s"
## Default flushing interval for all outputs. You shouldn't set this below
## interval. Maximum flush_interval will be flush_interval + flush_jitter
flush_interval = "10s"
## Jitter the flush interval by a random amount. This is primarily to avoid
## large write spikes for users running a large number of telegraf instances.
## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
flush_jitter = "0s"
## By default or when set to "0s", precision will be set to the same
## timestamp order as the collection interval, with the maximum being 1s.
## ie, when interval = "10s", precision will be "1s"
## when interval = "250ms", precision will be "1ms"
## Precision will NOT be used for service inputs. It is up to each individual
## service input to set the timestamp at the appropriate precision.
## Valid time units are "ns", "us" (or "µs"), "ms", "s".
precision = ""
## Logging configuration:
## Run telegraf with debug log messages.
debug = false
## Run telegraf in quiet mode (error log messages only).
quiet = false
## Specify the log file name. The empty string means to log to stderr.
logfile = ""
## Override default hostname, if empty use os.Hostname()
hostname = ""
## If set to true, do not set the "host" tag in the telegraf agent.
omit_hostname = false
# # reload and gather from file[s] on telegraf's interval
[[inputs.reader]]
# ## These accept standard unix glob matching rules, but with the addition of
# ## ** as a "super asterisk". ie:
# ## /var/log/**.log -> recursively find all .log files in /var/log
# ## /var/log/*/*.log -> find all .log files with a parent dir in /var/log
# ## /var/log/apache.log -> only read the apache log file
files = ["/Users/maxu/go/src/github.com/influxdata/telegraf/plugins/inputs/logparser/grok/testdata/**.log"]
#
# ## The data format to be read from files
# ## Each data format has its own unique set of configuration options, read
# ## more about them here:
# ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "grok"
#
patterns = ["%{TEST_LOG_B}","%{TEST_LOG_A}"]
#
# ## Name of the output measurement.
name_override = "grok_reader"
#
# ## Full path(s) to custom pattern files.
custom_pattern_files = ["/Users/maxu/go/src/github.com/influxdata/telegraf/plugins/inputs/logparser/grok/testdata/test-patterns"]
#
# ## Custom patterns can also be defined here. Put one pattern per line.
# custom_patterns = '''
# '''
#
# ## Timezone allows you to provide an override for timestamps that
# ## don't already include an offset
# ## e.g. 04/06/2016 12:41:45 data one two 5.43µs
# ##
# ## Default: "" which renders UTC
# ## Options are as follows:
# ## 1. Local -- interpret based on machine localtime
# ## 2. "Canada/Eastern" -- Unix TZ values like those found in https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
# ## 3. UTC -- or blank/unspecified, will return timestamp in UTC
# timezone = "Canada/Eastern"