commit 739e0af4f7
Author: ncohensm
Date:   2016-08-04 10:43:41 -07:00

18 changed files with 349 additions and 184 deletions

View File

@@ -4,6 +4,7 @@
 - [#1413](https://github.com/influxdata/telegraf/issues/1413): Separate container_version from container_image tag.
 - [#1525](https://github.com/influxdata/telegraf/pull/1525): Support setting per-device and total metrics for Docker network and blockio.
+- [#1466](https://github.com/influxdata/telegraf/pull/1466): MongoDB input plugin: adding per DB stats from db.stats()

 ### Bugfixes

@@ -12,6 +13,7 @@
 - [#1481](https://github.com/influxdata/telegraf/issues/1481): jolokia: fix handling multiple multi-dimensional attributes.
 - [#1430](https://github.com/influxdata/telegraf/issues/1430): Fix prometheus character sanitizing. Sanitize more win_perf_counters characters.
 - [#1534](https://github.com/influxdata/telegraf/pull/1534): Add diskio io_time to FreeBSD & report timing metrics as ms (as linux does).
+- [#1379](https://github.com/influxdata/telegraf/issues/1379): Fix covering Amazon Linux for post remove flow.

 ## v1.0 beta 3 [2016-07-18]

@@ -63,7 +65,6 @@ should now look like:
 - [#1500](https://github.com/influxdata/telegraf/pull/1500): Aerospike plugin refactored to use official client lib.
 - [#1434](https://github.com/influxdata/telegraf/pull/1434): Add measurement name arg to logparser plugin.
 - [#1479](https://github.com/influxdata/telegraf/pull/1479): logparser: change resp_code from a field to a tag.
-- [#1466](https://github.com/influxdata/telegraf/pull/1466): MongoDB input plugin: adding per DB stats from db.stats()
 - [#1411](https://github.com/influxdata/telegraf/pull/1411): Implement support for fetching hddtemp data

 ### Bugfixes

View File

@@ -11,6 +11,7 @@ Output plugins READMEs are less structured,
 but any information you can provide on how the data will look is appreciated.
 See the [OpenTSDB output](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/opentsdb)
 for a good example.
+1. **Optional:** Write a [tickscript](https://docs.influxdata.com/kapacitor/v1.0/tick/syntax/) for your plugin and add it to [Kapacitor](https://github.com/influxdata/kapacitor/tree/master/examples/telegraf). Or mention @jackzampolin in a PR comment with some common queries that you would want to alert on and he will write one for you.

 ## GoDoc

View File

@@ -39,12 +39,6 @@ var fOutputList = flag.Bool("output-list", false,
     "print available output plugins.")
 var fUsage = flag.String("usage", "",
     "print usage for a plugin, ie, 'telegraf -usage mysql'")
-var fInputFiltersLegacy = flag.String("filter", "",
-    "filter the inputs to enable, separator is :")
-var fOutputFiltersLegacy = flag.String("outputfilter", "",
-    "filter the outputs to enable, separator is :")
-var fConfigDirectoryLegacy = flag.String("configdirectory", "",
-    "directory containing additional *.conf files")

 // Telegraf version, populated linker.
 // ie, -ldflags "-X main.version=`git describe --always --tags`"
@@ -110,24 +104,11 @@ func main() {
     args := flag.Args()

     var inputFilters []string
-    if *fInputFiltersLegacy != "" {
-        fmt.Printf("WARNING '--filter' flag is deprecated, please use" +
-            " '--input-filter'")
-        inputFilter := strings.TrimSpace(*fInputFiltersLegacy)
-        inputFilters = strings.Split(":"+inputFilter+":", ":")
-    }
     if *fInputFilters != "" {
         inputFilter := strings.TrimSpace(*fInputFilters)
         inputFilters = strings.Split(":"+inputFilter+":", ":")
     }

     var outputFilters []string
-    if *fOutputFiltersLegacy != "" {
-        fmt.Printf("WARNING '--outputfilter' flag is deprecated, please use" +
-            " '--output-filter'")
-        outputFilter := strings.TrimSpace(*fOutputFiltersLegacy)
-        outputFilters = strings.Split(":"+outputFilter+":", ":")
-    }
     if *fOutputFilters != "" {
         outputFilter := strings.TrimSpace(*fOutputFilters)
         outputFilters = strings.Split(":"+outputFilter+":", ":")
@@ -145,34 +126,28 @@ func main() {
         }
     }

-    if *fOutputList {
+    // switch for flags which just do something and exit immediately
+    switch {
+    case *fOutputList:
         fmt.Println("Available Output Plugins:")
         for k, _ := range outputs.Outputs {
             fmt.Printf(" %s\n", k)
         }
         return
-    }
-    if *fInputList {
+    case *fInputList:
         fmt.Println("Available Input Plugins:")
         for k, _ := range inputs.Inputs {
             fmt.Printf(" %s\n", k)
         }
         return
-    }
-    if *fVersion {
+    case *fVersion:
         v := fmt.Sprintf("Telegraf - version %s", version)
         fmt.Println(v)
         return
-    }
-    if *fSampleConfig {
+    case *fSampleConfig:
         config.PrintSampleConfig(inputFilters, outputFilters)
         return
-    }
-    if *fUsage != "" {
+    case *fUsage != "":
         if err := config.PrintInputConfig(*fUsage); err != nil {
             if err2 := config.PrintOutputConfig(*fUsage); err2 != nil {
                 log.Fatalf("%s and %s", err, err2)
@@ -191,15 +166,6 @@ func main() {
         os.Exit(1)
     }

-    if *fConfigDirectoryLegacy != "" {
-        fmt.Printf("WARNING '--configdirectory' flag is deprecated, please use" +
-            " '--config-directory'")
-        err = c.LoadDirectory(*fConfigDirectoryLegacy)
-        if err != nil {
-            log.Fatal(err)
-        }
-    }
     if *fConfigDirectory != "" {
         err = c.LoadDirectory(*fConfigDirectory)
         if err != nil {

View File

@@ -1577,7 +1577,7 @@
 #   ## /var/log/**.log -> recursively find all .log files in /var/log
 #   ## /var/log/*/*.log -> find all .log files with a parent dir in /var/log
 #   ## /var/log/apache.log -> only tail the apache log file
-#   files = ["/var/log/influxdb/influxdb.log"]
+#   files = ["/var/log/apache/access.log"]
 #   ## Read file from beginning.
 #   from_beginning = false
 #
@@ -1590,9 +1590,9 @@
 #     ## Other common built-in patterns are:
 #     ##   %{COMMON_LOG_FORMAT} (plain apache & nginx access logs)
 #     ##   %{COMBINED_LOG_FORMAT} (access logs + referrer & agent)
-#     patterns = ["%{INFLUXDB_HTTPD_LOG}"]
+#     patterns = ["%{COMBINED_LOG_FORMAT}"]
 #     ## Name of the outputted measurement name.
-#     measurement = "influxdb_log"
+#     measurement = "apache_access_log"
 #     ## Full path(s) to custom pattern files.
 #     custom_pattern_files = []
 #     ## Custom patterns can also be defined here. Put one pattern per line.

View File

@@ -14,17 +14,22 @@ regex patterns.
   ## /var/log/**.log -> recursively find all .log files in /var/log
   ## /var/log/*/*.log -> find all .log files with a parent dir in /var/log
   ## /var/log/apache.log -> only tail the apache log file
-  files = ["/var/log/influxdb/influxdb.log"]
+  files = ["/var/log/apache/access.log"]
   ## Read file from beginning.
   from_beginning = false

   ## Parse logstash-style "grok" patterns:
-  ##   Telegraf builtin parsing patterns: https://goo.gl/dkay10
+  ##   Telegraf built-in parsing patterns: https://goo.gl/dkay10
   [inputs.logparser.grok]
     ## This is a list of patterns to check the given log file(s) for.
     ## Note that adding patterns here increases processing time. The most
-    ## efficient configuration is to have one file & pattern per logparser.
-    patterns = ["%{INFLUXDB_HTTPD_LOG}"]
+    ## efficient configuration is to have one pattern per logparser.
+    ## Other common built-in patterns are:
+    ##   %{COMMON_LOG_FORMAT} (plain apache & nginx access logs)
+    ##   %{COMBINED_LOG_FORMAT} (access logs + referrer & agent)
+    patterns = ["%{COMBINED_LOG_FORMAT}"]
+    ## Name of the outputted measurement name.
+    measurement = "apache_access_log"
     ## Full path(s) to custom pattern files.
     custom_pattern_files = []
     ## Custom patterns can also be defined here. Put one pattern per line.
@@ -32,8 +37,6 @@ regex patterns.
     '''
 ```

-> **Note:** The InfluxDB log pattern in the default configuration only works for Influx versions 1.0.0-beta1 or higher.
-
 ## Grok Parser

 The grok parser uses a slightly modified version of logstash "grok" patterns,
@@ -69,6 +72,7 @@ Timestamp modifiers can be used to convert captures to the timestamp of the
   - tag (converts the field into a tag)
   - drop (drops the field completely)
 - Timestamp modifiers:
+  - ts (This will auto-learn the timestamp format)
   - ts-ansic ("Mon Jan _2 15:04:05 2006")
   - ts-unix ("Mon Jan _2 15:04:05 MST 2006")
   - ts-ruby ("Mon Jan 02 15:04:05 -0700 2006")

View File

@@ -15,7 +15,7 @@ import (
     "github.com/influxdata/telegraf"
 )

-var timeFormats = map[string]string{
+var timeLayouts = map[string]string{
     "ts-ansic": "Mon Jan _2 15:04:05 2006",
     "ts-unix": "Mon Jan _2 15:04:05 MST 2006",
     "ts-ruby": "Mon Jan 02 15:04:05 -0700 2006",
@@ -27,8 +27,11 @@ var timeLayouts = map[string]string{
     "ts-rfc3339": "2006-01-02T15:04:05Z07:00",
     "ts-rfc3339nano": "2006-01-02T15:04:05.999999999Z07:00",
     "ts-httpd": "02/Jan/2006:15:04:05 -0700",
+    // These three are not exactly "layouts", but they are special cases that
+    // will get handled in the ParseLine function.
     "ts-epoch": "EPOCH",
     "ts-epochnano": "EPOCH_NANO",
+    "ts": "GENERIC_TIMESTAMP", // try parsing all known timestamp layouts.
 }

 const (
@@ -38,16 +41,19 @@ const (
     STRING = "string"
     DURATION = "duration"
     DROP = "drop"
+    EPOCH = "EPOCH"
+    EPOCH_NANO = "EPOCH_NANO"
+    GENERIC_TIMESTAMP = "GENERIC_TIMESTAMP"
 )

 var (
-    // matches named captures that contain a type.
+    // matches named captures that contain a modifier.
     // ie,
     //   %{NUMBER:bytes:int}
     //   %{IPORHOST:clientip:tag}
     //   %{HTTPDATE:ts1:ts-http}
     //   %{HTTPDATE:ts2:ts-"02 Jan 06 15:04"}
-    typedRe = regexp.MustCompile(`%{\w+:(\w+):(ts-".+"|t?s?-?\w+)}`)
+    modifierRe = regexp.MustCompile(`%{\w+:(\w+):(ts-".+"|t?s?-?\w+)}`)
     // matches a plain pattern name. ie, %{NUMBER}
     patternOnlyRe = regexp.MustCompile(`%{(\w+)}`)
 )
@@ -87,6 +93,12 @@ type Parser struct {
     //   "RESPONSE_CODE": "%{NUMBER:rc:tag}"
     // }
     patterns map[string]string
+    // foundTsLayouts is a slice of timestamp patterns that have been found
+    // in the log lines. This slice gets updated if the user uses the generic
+    // 'ts' modifier for timestamps. This slice is checked first for matches,
+    // so that previously-matched layouts get priority over all other timestamp
+    // layouts.
+    foundTsLayouts []string

     g *grok.Grok
     tsModder *tsModder
@@ -140,6 +152,7 @@ func (p *Parser) Compile() error {

 func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
     var err error
+    // values are the parsed fields from the log line
     var values map[string]string
     // the matching pattern string
     var patternName string
@@ -165,6 +178,7 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
             continue
         }

+        // t is the modifier of the field
         var t string
         // check if pattern has some modifiers
         if types, ok := p.typeMap[patternName]; ok {
@@ -210,20 +224,50 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
             tags[k] = v
         case STRING:
             fields[k] = strings.Trim(v, `"`)
-        case "EPOCH":
+        case EPOCH:
             iv, err := strconv.ParseInt(v, 10, 64)
             if err != nil {
                 log.Printf("ERROR parsing %s to int: %s", v, err)
             } else {
                 timestamp = time.Unix(iv, 0)
             }
-        case "EPOCH_NANO":
+        case EPOCH_NANO:
             iv, err := strconv.ParseInt(v, 10, 64)
             if err != nil {
                 log.Printf("ERROR parsing %s to int: %s", v, err)
             } else {
                 timestamp = time.Unix(0, iv)
             }
+        case GENERIC_TIMESTAMP:
+            var foundTs bool
+            // first try timestamp layouts that we've already found
+            for _, layout := range p.foundTsLayouts {
+                ts, err := time.Parse(layout, v)
+                if err == nil {
+                    timestamp = ts
+                    foundTs = true
+                    break
+                }
+            }
+            // if we haven't found a timestamp layout yet, try all timestamp
+            // layouts.
+            if !foundTs {
+                for _, layout := range timeLayouts {
+                    ts, err := time.Parse(layout, v)
+                    if err == nil {
+                        timestamp = ts
+                        foundTs = true
+                        p.foundTsLayouts = append(p.foundTsLayouts, layout)
+                        break
+                    }
+                }
+            }
+            // if we still haven't found a timestamp layout, log it and we will
+            // just use time.Now()
+            if !foundTs {
+                log.Printf("ERROR parsing timestamp [%s], could not find any "+
+                    "suitable time layouts.", v)
+            }
         case DROP:
         // goodbye!
         default:
@@ -267,7 +311,7 @@ func (p *Parser) compileCustomPatterns() error {
     // check if pattern contains modifiers. Parse them out if it does.
     for name, pattern := range p.patterns {
-        if typedRe.MatchString(pattern) {
+        if modifierRe.MatchString(pattern) {
             // this pattern has modifiers, so parse out the modifiers
             pattern, err = p.parseTypedCaptures(name, pattern)
             if err != nil {
@@ -280,13 +324,13 @@ func (p *Parser) compileCustomPatterns() error {
     return p.g.AddPatternsFromMap(p.patterns)
 }

-// parseTypedCaptures parses the capture types, and then deletes the type from
-// the line so that it is a valid "grok" pattern again.
+// parseTypedCaptures parses the capture modifiers, and then deletes the
+// modifier from the line so that it is a valid "grok" pattern again.
 // ie,
 //   %{NUMBER:bytes:int} => %{NUMBER:bytes} (stores %{NUMBER}->bytes->int)
 //   %{IPORHOST:clientip:tag} => %{IPORHOST:clientip} (stores %{IPORHOST}->clientip->tag)
 func (p *Parser) parseTypedCaptures(name, pattern string) (string, error) {
-    matches := typedRe.FindAllStringSubmatch(pattern, -1)
+    matches := modifierRe.FindAllStringSubmatch(pattern, -1)

     // grab the name of the capture pattern
     patternName := "%{" + name + "}"
@@ -298,16 +342,18 @@ func (p *Parser) parseTypedCaptures(name, pattern string) (string, error) {
     hasTimestamp := false
     for _, match := range matches {
         // regex capture 1 is the name of the capture
-        // regex capture 2 is the type of the capture
-        if strings.HasPrefix(match[2], "ts-") {
+        // regex capture 2 is the modifier of the capture
+        if strings.HasPrefix(match[2], "ts") {
             if hasTimestamp {
                 return pattern, fmt.Errorf("logparser pattern compile error: "+
                     "Each pattern is allowed only one named "+
                     "timestamp data type. pattern: %s", pattern)
             }
-            if f, ok := timeFormats[match[2]]; ok {
-                p.tsMap[patternName][match[1]] = f
+            if layout, ok := timeLayouts[match[2]]; ok {
+                // built-in time format
+                p.tsMap[patternName][match[1]] = layout
             } else {
+                // custom time format
                 p.tsMap[patternName][match[1]] = strings.TrimSuffix(strings.TrimPrefix(match[2], `ts-"`), `"`)
             }
             hasTimestamp = true
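A quick way to see what the modifier-stripping in parseTypedCaptures does: the sketch below uses a variant of the commit's modifierRe with an extra capture group for the pattern name (that group is my addition for this illustration, not the parser's actual regex), so ReplaceAllString can rebuild the plain grok capture:

```go
package main

import (
	"fmt"
	"regexp"
)

// Like modifierRe above, but also captures the pattern name ($1) so the
// modifier can be stripped with a single regex replacement.
var demoRe = regexp.MustCompile(`%{(\w+):(\w+):(ts-".+"|t?s?-?\w+)}`)

func main() {
	pattern := `%{NUMBER:bytes:int} rc=%{NUMBER:rc:tag}`

	// record each field's modifier, as the parser does in p.typeMap/p.tsMap
	for _, m := range demoRe.FindAllStringSubmatch(pattern, -1) {
		fmt.Printf("field %q -> modifier %q\n", m[2], m[3])
	}

	// drop the modifier so the pattern is valid "grok" again
	fmt.Println(demoRe.ReplaceAllString(pattern, "%{$1:$2}"))
	// Output:
	// field "bytes" -> modifier "int"
	// field "rc" -> modifier "tag"
	// %{NUMBER:bytes} rc=%{NUMBER:rc}
}
```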

View File

@@ -38,32 +38,6 @@ func Benchmark_ParseLine_CombinedLogFormat(b *testing.B) {
     benchM = m
 }

-func Benchmark_ParseLine_InfluxLog(b *testing.B) {
-    p := &Parser{
-        Patterns: []string{"%{INFLUXDB_HTTPD_LOG}"},
-    }
-    p.Compile()
-
-    var m telegraf.Metric
-    for n := 0; n < b.N; n++ {
-        m, _ = p.ParseLine(`[httpd] 192.168.1.1 - - [14/Jun/2016:11:33:29 +0100] "POST /write?consistency=any&db=telegraf&precision=ns&rp= HTTP/1.1" 204 0 "-" "InfluxDBClient" 6f61bc44-321b-11e6-8050-000000000000 2513`)
-    }
-    benchM = m
-}
-
-func Benchmark_ParseLine_InfluxLog_NoMatch(b *testing.B) {
-    p := &Parser{
-        Patterns: []string{"%{INFLUXDB_HTTPD_LOG}"},
-    }
-    p.Compile()
-
-    var m telegraf.Metric
-    for n := 0; n < b.N; n++ {
-        m, _ = p.ParseLine(`[retention] 2016/06/14 14:38:24 retention policy shard deletion check commencing`)
-    }
-    benchM = m
-}
-
 func Benchmark_ParseLine_CustomPattern(b *testing.B) {
     p := &Parser{
         Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"},
@@ -108,9 +82,9 @@ func TestMeasurementName(t *testing.T) {
     assert.Equal(t, "my_web_log", m.Name())
 }

-func TestBuiltinInfluxdbHttpd(t *testing.T) {
+func TestCustomInfluxdbHttpd(t *testing.T) {
     p := &Parser{
-        Patterns: []string{"%{INFLUXDB_HTTPD_LOG}"},
+        Patterns: []string{`\[httpd\] %{COMBINED_LOG_FORMAT} %{UUID:uuid:drop} %{NUMBER:response_time_us:int}`},
     }
     assert.NoError(t, p.Compile())

@@ -333,6 +307,55 @@ func TestParseEpochErrors(t *testing.T) {
     assert.NoError(t, err)
 }

+func TestParseGenericTimestamp(t *testing.T) {
+    p := &Parser{
+        Patterns: []string{`\[%{HTTPDATE:ts:ts}\] response_time=%{POSINT:response_time:int} mymetric=%{NUMBER:metric:float}`},
+    }
+    assert.NoError(t, p.Compile())
+
+    metricA, err := p.ParseLine(`[09/Jun/2016:03:37:03 +0000] response_time=20821 mymetric=10890.645`)
+    require.NotNil(t, metricA)
+    assert.NoError(t, err)
+    assert.Equal(t,
+        map[string]interface{}{
+            "response_time": int64(20821),
+            "metric": float64(10890.645),
+        },
+        metricA.Fields())
+    assert.Equal(t, map[string]string{}, metricA.Tags())
+    assert.Equal(t, time.Unix(1465443423, 0).UTC(), metricA.Time().UTC())
+
+    metricB, err := p.ParseLine(`[09/Jun/2016:03:37:04 +0000] response_time=20821 mymetric=10890.645`)
+    require.NotNil(t, metricB)
+    assert.NoError(t, err)
+    assert.Equal(t,
+        map[string]interface{}{
+            "response_time": int64(20821),
+            "metric": float64(10890.645),
+        },
+        metricB.Fields())
+    assert.Equal(t, map[string]string{}, metricB.Tags())
+    assert.Equal(t, time.Unix(1465443424, 0).UTC(), metricB.Time().UTC())
+}
+
+func TestParseGenericTimestampNotFound(t *testing.T) {
+    p := &Parser{
+        Patterns: []string{`\[%{NOTSPACE:ts:ts}\] response_time=%{POSINT:response_time:int} mymetric=%{NUMBER:metric:float}`},
+    }
+    assert.NoError(t, p.Compile())
+
+    metricA, err := p.ParseLine(`[foobar] response_time=20821 mymetric=10890.645`)
+    require.NotNil(t, metricA)
+    assert.NoError(t, err)
+    assert.Equal(t,
+        map[string]interface{}{
+            "response_time": int64(20821),
+            "metric": float64(10890.645),
+        },
+        metricA.Fields())
+    assert.Equal(t, map[string]string{}, metricA.Tags())
+}
+
 func TestCompileFileAndParse(t *testing.T) {
     p := &Parser{
         Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"},

View File

@@ -55,15 +55,13 @@ EXAMPLE_LOG \[%{HTTPDATE:ts:ts-httpd}\] %{NUMBER:myfloat:float} %{RESPONSE_CODE}
 # Wider-ranging username matching vs. logstash built-in %{USER}
 NGUSERNAME [a-zA-Z\.\@\-\+_%]+
 NGUSER %{NGUSERNAME}
+# Wider-ranging client IP matching
+CLIENT (?:%{IPORHOST}|%{HOSTPORT}|::1)

 ##
 ## COMMON LOG PATTERNS
 ##

-# InfluxDB log patterns
-CLIENT (?:%{IPORHOST}|%{HOSTPORT}|::1)
-INFLUXDB_HTTPD_LOG \[httpd\] %{COMBINED_LOG_FORMAT} %{UUID:uuid:drop} %{NUMBER:response_time_us:int}
-
 # apache & nginx logs, this is also known as the "common log format"
 # see https://en.wikipedia.org/wiki/Common_Log_Format
 COMMON_LOG_FORMAT %{CLIENT:client_ip} %{NGUSER:ident} %{NGUSER:auth} \[%{HTTPDATE:ts:ts-httpd}\] "(?:%{WORD:verb:tag} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version:float})?|%{DATA})" %{NUMBER:resp_code:tag} (?:%{NUMBER:resp_bytes:int}|-)

View File

@@ -51,15 +51,13 @@ EXAMPLE_LOG \[%{HTTPDATE:ts:ts-httpd}\] %{NUMBER:myfloat:float} %{RESPONSE_CODE}
 # Wider-ranging username matching vs. logstash built-in %{USER}
 NGUSERNAME [a-zA-Z\.\@\-\+_%]+
 NGUSER %{NGUSERNAME}
+# Wider-ranging client IP matching
+CLIENT (?:%{IPORHOST}|%{HOSTPORT}|::1)

 ##
 ## COMMON LOG PATTERNS
 ##

-# InfluxDB log patterns
-CLIENT (?:%{IPORHOST}|%{HOSTPORT}|::1)
-INFLUXDB_HTTPD_LOG \[httpd\] %{COMBINED_LOG_FORMAT} %{UUID:uuid:drop} %{NUMBER:response_time_us:int}
-
 # apache & nginx logs, this is also known as the "common log format"
 # see https://en.wikipedia.org/wiki/Common_Log_Format
 COMMON_LOG_FORMAT %{CLIENT:client_ip} %{NGUSER:ident} %{NGUSER:auth} \[%{HTTPDATE:ts:ts-httpd}\] "(?:%{WORD:verb:tag} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version:float})?|%{DATA})" %{NUMBER:resp_code:tag} (?:%{NUMBER:resp_bytes:int}|-)

View File

@@ -45,7 +45,7 @@ const sampleConfig = `
   ## /var/log/**.log -> recursively find all .log files in /var/log
   ## /var/log/*/*.log -> find all .log files with a parent dir in /var/log
   ## /var/log/apache.log -> only tail the apache log file
-  files = ["/var/log/influxdb/influxdb.log"]
+  files = ["/var/log/apache/access.log"]
   ## Read file from beginning.
   from_beginning = false

@@ -58,9 +58,9 @@ const sampleConfig = `
     ## Other common built-in patterns are:
     ##   %{COMMON_LOG_FORMAT} (plain apache & nginx access logs)
     ##   %{COMBINED_LOG_FORMAT} (access logs + referrer & agent)
-    patterns = ["%{INFLUXDB_HTTPD_LOG}"]
+    patterns = ["%{COMBINED_LOG_FORMAT}"]
     ## Name of the outputted measurement name.
-    measurement = "influxdb_log"
+    measurement = "apache_access_log"
     ## Full path(s) to custom pattern files.
     custom_pattern_files = []
     ## Custom patterns can also be defined here. Put one pattern per line.

View File

@@ -17,6 +17,8 @@ func TestTailFromBeginning(t *testing.T) {
     tmpfile, err := ioutil.TempFile("", "")
     require.NoError(t, err)
     defer os.Remove(tmpfile.Name())
+    _, err = tmpfile.WriteString("cpu,mytag=foo usage_idle=100\n")
+    require.NoError(t, err)

     tt := NewTail()
     tt.FromBeginning = true
@@ -28,12 +30,10 @@ func TestTailFromBeginning(t *testing.T) {
     acc := testutil.Accumulator{}
     require.NoError(t, tt.Start(&acc))
-    time.Sleep(time.Millisecond * 100)
-    _, err = tmpfile.WriteString("cpu,mytag=foo usage_idle=100\n")
-    require.NoError(t, err)
     require.NoError(t, tt.Gather(&acc))
     // arbitrary sleep to wait for message to show up
-    time.Sleep(time.Millisecond * 250)
+    time.Sleep(time.Millisecond * 150)

     acc.AssertContainsTaggedFields(t, "cpu",
         map[string]interface{}{

View File

@@ -158,7 +158,6 @@ func (t *TcpListener) tcpListen() error {
         if err != nil {
             return err
         }
-        // log.Printf("Received TCP Connection from %s", conn.RemoteAddr())

         select {
         case <-t.accept:
@@ -194,7 +193,6 @@ func (t *TcpListener) handler(conn *net.TCPConn, id string) {
     defer func() {
         t.wg.Done()
         conn.Close()
-        // log.Printf("Closed TCP Connection from %s", conn.RemoteAddr())
         // Add one connection potential back to channel when this one closes
         t.accept <- true
         t.forget(id)
@@ -239,14 +237,19 @@ func (t *TcpListener) tcpParser() error {
     for {
         select {
         case <-t.done:
+            // drain input packets before finishing:
+            if len(t.in) == 0 {
                 return nil
+            }
         case packet = <-t.in:
             if len(packet) == 0 {
                 continue
             }
             metrics, err = t.parser.Parse(packet)
             if err == nil {
-                t.storeMetrics(metrics)
+                for _, m := range metrics {
+                    t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
+                }
             } else {
                 t.malformed++
                 if t.malformed == 1 || t.malformed%1000 == 0 {
@@ -257,15 +260,6 @@ func (t *TcpListener) tcpParser() error {
         }
     }

-func (t *TcpListener) storeMetrics(metrics []telegraf.Metric) error {
-    t.Lock()
-    defer t.Unlock()
-    for _, m := range metrics {
-        t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
-    }
-    return nil
-}
-
 // forget a TCP connection
 func (t *TcpListener) forget(id string) {
     t.cleanup.Lock()

View File

@@ -37,6 +37,62 @@ func newTestTcpListener() (*TcpListener, chan []byte) {
     return listener, in
 }

+// benchmark how long it takes to accept & process 100,000 metrics:
+func BenchmarkTCP(b *testing.B) {
+    listener := TcpListener{
+        ServiceAddress: ":8198",
+        AllowedPendingMessages: 100000,
+        MaxTCPConnections: 250,
+    }
+    listener.parser, _ = parsers.NewInfluxParser()
+    acc := &testutil.Accumulator{Discard: true}
+
+    // send multiple messages to socket
+    for n := 0; n < b.N; n++ {
+        err := listener.Start(acc)
+        if err != nil {
+            panic(err)
+        }
+        time.Sleep(time.Millisecond * 25)
+
+        conn, err := net.Dial("tcp", "127.0.0.1:8198")
+        if err != nil {
+            panic(err)
+        }
+        for i := 0; i < 100000; i++ {
+            fmt.Fprintf(conn, testMsg)
+        }
+        // wait for 100,000 metrics to get added to accumulator
+        time.Sleep(time.Millisecond)
+        listener.Stop()
+    }
+}
+
+func TestHighTrafficTCP(t *testing.T) {
+    listener := TcpListener{
+        ServiceAddress: ":8199",
+        AllowedPendingMessages: 100000,
+        MaxTCPConnections: 250,
+    }
+    listener.parser, _ = parsers.NewInfluxParser()
+    acc := &testutil.Accumulator{}
+
+    // send multiple messages to socket
+    err := listener.Start(acc)
+    require.NoError(t, err)
+    time.Sleep(time.Millisecond * 25)
+
+    conn, err := net.Dial("tcp", "127.0.0.1:8199")
+    require.NoError(t, err)
+    for i := 0; i < 100000; i++ {
+        fmt.Fprintf(conn, testMsg)
+    }
+    time.Sleep(time.Millisecond)
+    listener.Stop()
+
+    assert.Equal(t, 100000, len(acc.Metrics))
+}
+
 func TestConnectTCP(t *testing.T) {
     listener := TcpListener{
         ServiceAddress: ":8194",

View File

@@ -3,8 +3,8 @@ package udp_listener
 import (
     "log"
     "net"
-    "strings"
     "sync"
+    "time"

     "github.com/influxdata/telegraf"
     "github.com/influxdata/telegraf/plugins/inputs"
@@ -99,9 +99,11 @@ func (u *UdpListener) Start(acc telegraf.Accumulator) error {
 }

 func (u *UdpListener) Stop() {
+    u.Lock()
+    defer u.Unlock()
     close(u.done)
-    u.listener.Close()
     u.wg.Wait()
+    u.listener.Close()
     close(u.in)
     log.Println("Stopped UDP listener service on ", u.ServiceAddress)
 }
@@ -122,9 +124,13 @@ func (u *UdpListener) udpListen() error {
         case <-u.done:
             return nil
         default:
+            u.listener.SetReadDeadline(time.Now().Add(time.Second))
             n, _, err := u.listener.ReadFromUDP(buf)
-            if err != nil && !strings.Contains(err.Error(), "closed network") {
+            if err != nil {
+                if err, ok := err.(net.Error); ok && err.Timeout() {
+                } else {
                     log.Printf("ERROR: %s\n", err.Error())
+                }
                 continue
             }
             bufCopy := make([]byte, n)
@@ -151,11 +157,6 @@ func (u *UdpListener) udpParser() error {
     for {
         select {
         case <-u.done:
+            if len(u.in) == 0 {
                 return nil
+            }
         case packet = <-u.in:
             metrics, err = u.parser.Parse(packet)
             if err == nil {
-                u.storeMetrics(metrics)
+                for _, m := range metrics {
+                    u.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
+                }
             } else {
                 u.malformed++
                 if u.malformed == 1 || u.malformed%1000 == 0 {
@@ -166,15 +176,6 @@ func (u *UdpListener) udpParser() error {
         }
     }
 }

-func (u *UdpListener) storeMetrics(metrics []telegraf.Metric) error {
-    u.Lock()
-    defer u.Unlock()
-    for _, m := range metrics {
-        u.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
-    }
-    return nil
-}
-
 func init() {
     inputs.Add("udp_listener", func() telegraf.Input {
         return &UdpListener{}
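The shutdown choreography above (close done, then wg.Wait(), then listener.Close()) only works because the read deadline guarantees the reader goroutine wakes up at least once a second to re-check the done channel instead of blocking in ReadFromUDP forever. A minimal standalone sketch of the same pattern, with assumed names and no Telegraf types:

```go
package main

import (
	"log"
	"net"
	"sync"
	"time"
)

func main() {
	addr, _ := net.ResolveUDPAddr("udp", "127.0.0.1:0")
	conn, err := net.ListenUDP("udp", addr)
	if err != nil {
		log.Fatal(err)
	}

	done := make(chan struct{})
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		buf := make([]byte, 1500)
		for {
			select {
			case <-done:
				return
			default:
				// deadline makes ReadFromUDP return a timeout error
				// periodically, so the loop can re-check done
				conn.SetReadDeadline(time.Now().Add(time.Second))
				n, _, err := conn.ReadFromUDP(buf)
				if err != nil {
					if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
						continue // expected; just loop again
					}
					log.Printf("ERROR: %s", err)
					continue
				}
				log.Printf("got %d bytes", n)
			}
		}
	}()

	time.Sleep(10 * time.Millisecond)
	close(done)  // signal the reader...
	wg.Wait()    // ...wait for it to exit...
	conn.Close() // ...then close the socket, never out from under a live read
}
```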

View File

@@ -1,20 +1,36 @@
 package udp_listener

 import (
+    "fmt"
     "io/ioutil"
     "log"
+    "net"
     "testing"
     "time"

     "github.com/influxdata/telegraf/plugins/parsers"
     "github.com/influxdata/telegraf/testutil"
+
+    "github.com/stretchr/testify/assert"
+    "github.com/stretchr/testify/require"
 )

+const (
+    testMsg = "cpu_load_short,host=server01 value=12.0 1422568543702900257\n"
+
+    testMsgs = `
+cpu_load_short,host=server02 value=12.0 1422568543702900257
+cpu_load_short,host=server03 value=12.0 1422568543702900257
+cpu_load_short,host=server04 value=12.0 1422568543702900257
+cpu_load_short,host=server05 value=12.0 1422568543702900257
+cpu_load_short,host=server06 value=12.0 1422568543702900257
+`
+)
+
 func newTestUdpListener() (*UdpListener, chan []byte) {
     in := make(chan []byte, 1500)
     listener := &UdpListener{
         ServiceAddress: ":8125",
-        UDPPacketSize: 1500,
         AllowedPendingMessages: 10000,
         in: in,
         done: make(chan struct{}),
@@ -22,6 +38,72 @@ func newTestUdpListener() (*UdpListener, chan []byte) {
     return listener, in
 }

+func TestHighTrafficUDP(t *testing.T) {
+    listener := UdpListener{
+        ServiceAddress: ":8126",
+        AllowedPendingMessages: 100000,
+    }
+    listener.parser, _ = parsers.NewInfluxParser()
+    acc := &testutil.Accumulator{}
+
+    // send multiple messages to socket
+    err := listener.Start(acc)
+    require.NoError(t, err)
+    time.Sleep(time.Millisecond * 25)
+
+    conn, err := net.Dial("udp", "127.0.0.1:8126")
+    require.NoError(t, err)
+    for i := 0; i < 20000; i++ {
+        // arbitrary, just to give the OS buffer some slack handling the
+        // packet storm.
+        time.Sleep(time.Microsecond)
+        fmt.Fprintf(conn, testMsgs)
+    }
+    time.Sleep(time.Millisecond)
+    listener.Stop()
+
+    // this is not an exact science, since UDP packets can easily get lost or
+    // dropped, but assume that the OS will be able to
+    // handle at least 90% of the sent UDP packets.
+    assert.InDelta(t, 100000, len(acc.Metrics), 10000)
+}
+
+func TestConnectUDP(t *testing.T) {
+    listener := UdpListener{
+        ServiceAddress: ":8127",
+        AllowedPendingMessages: 10000,
+    }
+    listener.parser, _ = parsers.NewInfluxParser()
+
+    acc := &testutil.Accumulator{}
+    require.NoError(t, listener.Start(acc))
+    defer listener.Stop()
+
+    time.Sleep(time.Millisecond * 25)
+    conn, err := net.Dial("udp", "127.0.0.1:8127")
+    require.NoError(t, err)
+
+    // send single message to socket
+    fmt.Fprintf(conn, testMsg)
+    time.Sleep(time.Millisecond * 15)
+    acc.AssertContainsTaggedFields(t, "cpu_load_short",
+        map[string]interface{}{"value": float64(12)},
+        map[string]string{"host": "server01"},
+    )
+
+    // send multiple messages to socket
+    fmt.Fprintf(conn, testMsgs)
+    time.Sleep(time.Millisecond * 15)
+    hostTags := []string{"server02", "server03",
+        "server04", "server05", "server06"}
+    for _, hostTag := range hostTags {
+        acc.AssertContainsTaggedFields(t, "cpu_load_short",
+            map[string]interface{}{"value": float64(12)},
+            map[string]string{"host": hostTag},
+        )
+    }
+}
+
 func TestRunParser(t *testing.T) {
     log.SetOutput(ioutil.Discard)
     var testmsg = []byte("cpu_load_short,host=server01 value=12.0 1422568543702900257")

View File

@@ -83,29 +83,17 @@ targets = {
 }

 supported_builds = {
-    "darwin": [ "amd64" ],
     "windows": [ "amd64" ],
     "linux": [ "amd64", "i386", "armhf", "armel", "arm64", "static_amd64" ],
     "freebsd": [ "amd64" ]
 }

 supported_packages = {
-    "darwin": [ "tar" ],
     "linux": [ "deb", "rpm", "tar" ],
     "windows": [ "zip" ],
     "freebsd": [ "tar" ]
 }

-supported_tags = {
-#    "linux": {
-#        "amd64": ["sensors"]
-#    }
-}
-
-prereq_cmds = {
-#    "linux": "sudo apt-get install lm-sensors libsensors4-dev"
-}
-
 ################
 #### Telegraf Functions
 ################

View File

@@ -15,10 +15,9 @@ function disable_chkconfig {
     rm -f /etc/init.d/telegraf
 }

-if [[ -f /etc/redhat-release ]]; then
-    # RHEL-variant logic
-    if [[ "$1" = "0" ]]; then
-        # InfluxDB is no longer installed, remove from init system
+if [[ "$1" == "0" ]]; then
+    # RHEL and any distribution that follow RHEL, Amazon Linux covered
+    # telegraf is no longer installed, remove from init system
     rm -f /etc/default/telegraf

     which systemctl &>/dev/null
@@ -28,10 +27,8 @@ if [[ -f /etc/redhat-release ]]; then
         # Assuming sysv
         disable_chkconfig
     fi
-fi
-elif [[ -f /etc/debian_version ]]; then
+elif [ "$1" == "remove" -o "$1" == "purge" ]; then
     # Debian/Ubuntu logic
-    if [[ "$1" != "upgrade" ]]; then
     # Remove/purge
     rm -f /etc/default/telegraf
@@ -43,4 +40,3 @@ elif [ "$1" == "remove" -o "$1" == "purge" ]; then
         disable_update_rcd
     fi
 fi
-fi

View File

@@ -5,6 +5,7 @@ import (
     "fmt"
     "reflect"
     "sync"
+    "sync/atomic"
     "testing"
     "time"

@@ -28,6 +29,8 @@ type Accumulator struct {
     sync.Mutex

     Metrics []*Metric
+    nMetrics uint64
+    Discard bool
     Errors []error
     debug bool
 }
@@ -43,6 +46,10 @@ func (a *Accumulator) Add(
     a.AddFields(measurement, fields, tags, t...)
 }

+func (a *Accumulator) NMetrics() uint64 {
+    return atomic.LoadUint64(&a.nMetrics)
+}
+
 // AddFields adds a measurement point with a specified timestamp.
 func (a *Accumulator) AddFields(
     measurement string,
@@ -50,6 +57,10 @@ func (a *Accumulator) AddFields(
     tags map[string]string,
     timestamp ...time.Time,
 ) {
+    atomic.AddUint64(&a.nMetrics, 1)
+    if a.Discard {
+        return
+    }
     a.Lock()
     defer a.Unlock()
     if tags == nil {
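Discard and the atomic nMetrics counter are designed to work together: a benchmark can throw away metric bodies to keep memory flat while still observing progress. A hypothetical helper (not part of this commit) suggesting how NMetrics() could replace the fixed sleeps used in the tests above:

```go
package example

import (
	"fmt"
	"time"

	"github.com/influxdata/telegraf/testutil"
)

// waitForNMetrics is a hypothetical helper showing how Discard and
// NMetrics() combine: the accumulator can discard metric bodies to keep
// memory flat, yet a test can poll the atomic counter instead of sleeping
// for an arbitrary duration and hoping the metrics have arrived.
func waitForNMetrics(acc *testutil.Accumulator, n uint64, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for acc.NMetrics() < n {
		if time.Now().After(deadline) {
			return fmt.Errorf("timed out: have %d of %d metrics", acc.NMetrics(), n)
		}
		time.Sleep(time.Millisecond)
	}
	return nil
}
```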