diff --git a/CHANGELOG.md b/CHANGELOG.md index fe9606e7d..492bfc561 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ - [#1413](https://github.com/influxdata/telegraf/issues/1413): Separate container_version from container_image tag. - [#1525](https://github.com/influxdata/telegraf/pull/1525): Support setting per-device and total metrics for Docker network and blockio. +- [#1466](https://github.com/influxdata/telegraf/pull/1466): MongoDB input plugin: adding per DB stats from db.stats() ### Bugfixes @@ -12,6 +13,7 @@ - [#1481](https://github.com/influxdata/telegraf/issues/1481): jolokia: fix handling multiple multi-dimensional attributes. - [#1430](https://github.com/influxdata/telegraf/issues/1430): Fix prometheus character sanitizing. Sanitize more win_perf_counters characters. - [#1534](https://github.com/influxdata/telegraf/pull/1534): Add diskio io_time to FreeBSD & report timing metrics as ms (as linux does). +- [#1379](https://github.com/influxdata/telegraf/issues/1379): Fix covering Amazon Linux for post remove flow. ## v1.0 beta 3 [2016-07-18] @@ -63,7 +65,6 @@ should now look like: - [#1500](https://github.com/influxdata/telegraf/pull/1500): Aerospike plugin refactored to use official client lib. - [#1434](https://github.com/influxdata/telegraf/pull/1434): Add measurement name arg to logparser plugin. - [#1479](https://github.com/influxdata/telegraf/pull/1479): logparser: change resp_code from a field to a tag. -- [#1466](https://github.com/influxdata/telegraf/pull/1466): MongoDB input plugin: adding per DB stats from db.stats() - [#1411](https://github.com/influxdata/telegraf/pull/1411): Implement support for fetching hddtemp data ### Bugfixes diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index f02f109fd..a639e91f9 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -11,6 +11,7 @@ Output plugins READMEs are less structured, but any information you can provide on how the data will look is appreciated. See the [OpenTSDB output](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/opentsdb) for a good example. +1. **Optional:** Write a [tickscript](https://docs.influxdata.com/kapacitor/v1.0/tick/syntax/) for your plugin and add it to [Kapacitor](https://github.com/influxdata/kapacitor/tree/master/examples/telegraf). Or mention @jackzampolin in a PR comment with some common queries that you would want to alert on and he will write one for you. ## GoDoc diff --git a/cmd/telegraf/telegraf.go b/cmd/telegraf/telegraf.go index 6681ad073..e3398511a 100644 --- a/cmd/telegraf/telegraf.go +++ b/cmd/telegraf/telegraf.go @@ -39,12 +39,6 @@ var fOutputList = flag.Bool("output-list", false, "print available output plugins.") var fUsage = flag.String("usage", "", "print usage for a plugin, ie, 'telegraf -usage mysql'") -var fInputFiltersLegacy = flag.String("filter", "", - "filter the inputs to enable, separator is :") -var fOutputFiltersLegacy = flag.String("outputfilter", "", - "filter the outputs to enable, separator is :") -var fConfigDirectoryLegacy = flag.String("configdirectory", "", - "directory containing additional *.conf files") // Telegraf version, populated linker. // ie, -ldflags "-X main.version=`git describe --always --tags`" @@ -110,24 +104,11 @@ func main() { args := flag.Args() var inputFilters []string - if *fInputFiltersLegacy != "" { - fmt.Printf("WARNING '--filter' flag is deprecated, please use" + - " '--input-filter'") - inputFilter := strings.TrimSpace(*fInputFiltersLegacy) - inputFilters = strings.Split(":"+inputFilter+":", ":") - } if *fInputFilters != "" { inputFilter := strings.TrimSpace(*fInputFilters) inputFilters = strings.Split(":"+inputFilter+":", ":") } - var outputFilters []string - if *fOutputFiltersLegacy != "" { - fmt.Printf("WARNING '--outputfilter' flag is deprecated, please use" + - " '--output-filter'") - outputFilter := strings.TrimSpace(*fOutputFiltersLegacy) - outputFilters = strings.Split(":"+outputFilter+":", ":") - } if *fOutputFilters != "" { outputFilter := strings.TrimSpace(*fOutputFilters) outputFilters = strings.Split(":"+outputFilter+":", ":") @@ -145,34 +126,28 @@ func main() { } } - if *fOutputList { + // switch for flags which just do something and exit immediately + switch { + case *fOutputList: fmt.Println("Available Output Plugins:") for k, _ := range outputs.Outputs { fmt.Printf(" %s\n", k) } return - } - - if *fInputList { + case *fInputList: fmt.Println("Available Input Plugins:") for k, _ := range inputs.Inputs { fmt.Printf(" %s\n", k) } return - } - - if *fVersion { + case *fVersion: v := fmt.Sprintf("Telegraf - version %s", version) fmt.Println(v) return - } - - if *fSampleConfig { + case *fSampleConfig: config.PrintSampleConfig(inputFilters, outputFilters) return - } - - if *fUsage != "" { + case *fUsage != "": if err := config.PrintInputConfig(*fUsage); err != nil { if err2 := config.PrintOutputConfig(*fUsage); err2 != nil { log.Fatalf("%s and %s", err, err2) @@ -191,15 +166,6 @@ func main() { os.Exit(1) } - if *fConfigDirectoryLegacy != "" { - fmt.Printf("WARNING '--configdirectory' flag is deprecated, please use" + - " '--config-directory'") - err = c.LoadDirectory(*fConfigDirectoryLegacy) - if err != nil { - log.Fatal(err) - } - } - if *fConfigDirectory != "" { err = c.LoadDirectory(*fConfigDirectory) if err != nil { diff --git a/etc/telegraf.conf b/etc/telegraf.conf index 5189d2e3f..338aa1b68 100644 --- a/etc/telegraf.conf +++ b/etc/telegraf.conf @@ -1577,7 +1577,7 @@ # ## /var/log/**.log -> recursively find all .log files in /var/log # ## /var/log/*/*.log -> find all .log files with a parent dir in /var/log # ## /var/log/apache.log -> only tail the apache log file -# files = ["/var/log/influxdb/influxdb.log"] +# files = ["/var/log/apache/access.log"] # ## Read file from beginning. # from_beginning = false # @@ -1590,9 +1590,9 @@ # ## Other common built-in patterns are: # ## %{COMMON_LOG_FORMAT} (plain apache & nginx access logs) # ## %{COMBINED_LOG_FORMAT} (access logs + referrer & agent) -# patterns = ["%{INFLUXDB_HTTPD_LOG}"] +# patterns = ["%{COMBINED_LOG_FORMAT}"] # ## Name of the outputted measurement name. -# measurement = "influxdb_log" +# measurement = "apache_access_log" # ## Full path(s) to custom pattern files. # custom_pattern_files = [] # ## Custom patterns can also be defined here. Put one pattern per line. diff --git a/plugins/inputs/logparser/README.md b/plugins/inputs/logparser/README.md index 64e8909f5..1affcd811 100644 --- a/plugins/inputs/logparser/README.md +++ b/plugins/inputs/logparser/README.md @@ -14,17 +14,22 @@ regex patterns. ## /var/log/**.log -> recursively find all .log files in /var/log ## /var/log/*/*.log -> find all .log files with a parent dir in /var/log ## /var/log/apache.log -> only tail the apache log file - files = ["/var/log/influxdb/influxdb.log"] + files = ["/var/log/apache/access.log"] ## Read file from beginning. from_beginning = false ## Parse logstash-style "grok" patterns: - ## Telegraf builtin parsing patterns: https://goo.gl/dkay10 + ## Telegraf built-in parsing patterns: https://goo.gl/dkay10 [inputs.logparser.grok] ## This is a list of patterns to check the given log file(s) for. ## Note that adding patterns here increases processing time. The most - ## efficient configuration is to have one file & pattern per logparser. - patterns = ["%{INFLUXDB_HTTPD_LOG}"] + ## efficient configuration is to have one pattern per logparser. + ## Other common built-in patterns are: + ## %{COMMON_LOG_FORMAT} (plain apache & nginx access logs) + ## %{COMBINED_LOG_FORMAT} (access logs + referrer & agent) + patterns = ["%{COMBINED_LOG_FORMAT}"] + ## Name of the outputted measurement name. + measurement = "apache_access_log" ## Full path(s) to custom pattern files. custom_pattern_files = [] ## Custom patterns can also be defined here. Put one pattern per line. @@ -32,8 +37,6 @@ regex patterns. ''' ``` -> **Note:** The InfluxDB log pattern in the default configuration only works for Influx versions 1.0.0-beta1 or higher. - ## Grok Parser The grok parser uses a slightly modified version of logstash "grok" patterns, @@ -69,6 +72,7 @@ Timestamp modifiers can be used to convert captures to the timestamp of the - tag (converts the field into a tag) - drop (drops the field completely) - Timestamp modifiers: + - ts (This will auto-learn the timestamp format) - ts-ansic ("Mon Jan _2 15:04:05 2006") - ts-unix ("Mon Jan _2 15:04:05 MST 2006") - ts-ruby ("Mon Jan 02 15:04:05 -0700 2006") diff --git a/plugins/inputs/logparser/grok/grok.go b/plugins/inputs/logparser/grok/grok.go index d8691d7b9..70b759826 100644 --- a/plugins/inputs/logparser/grok/grok.go +++ b/plugins/inputs/logparser/grok/grok.go @@ -15,7 +15,7 @@ import ( "github.com/influxdata/telegraf" ) -var timeFormats = map[string]string{ +var timeLayouts = map[string]string{ "ts-ansic": "Mon Jan _2 15:04:05 2006", "ts-unix": "Mon Jan _2 15:04:05 MST 2006", "ts-ruby": "Mon Jan 02 15:04:05 -0700 2006", @@ -27,27 +27,33 @@ var timeFormats = map[string]string{ "ts-rfc3339": "2006-01-02T15:04:05Z07:00", "ts-rfc3339nano": "2006-01-02T15:04:05.999999999Z07:00", "ts-httpd": "02/Jan/2006:15:04:05 -0700", - "ts-epoch": "EPOCH", - "ts-epochnano": "EPOCH_NANO", + // These three are not exactly "layouts", but they are special cases that + // will get handled in the ParseLine function. + "ts-epoch": "EPOCH", + "ts-epochnano": "EPOCH_NANO", + "ts": "GENERIC_TIMESTAMP", // try parsing all known timestamp layouts. } const ( - INT = "int" - TAG = "tag" - FLOAT = "float" - STRING = "string" - DURATION = "duration" - DROP = "drop" + INT = "int" + TAG = "tag" + FLOAT = "float" + STRING = "string" + DURATION = "duration" + DROP = "drop" + EPOCH = "EPOCH" + EPOCH_NANO = "EPOCH_NANO" + GENERIC_TIMESTAMP = "GENERIC_TIMESTAMP" ) var ( - // matches named captures that contain a type. + // matches named captures that contain a modifier. // ie, // %{NUMBER:bytes:int} // %{IPORHOST:clientip:tag} // %{HTTPDATE:ts1:ts-http} // %{HTTPDATE:ts2:ts-"02 Jan 06 15:04"} - typedRe = regexp.MustCompile(`%{\w+:(\w+):(ts-".+"|t?s?-?\w+)}`) + modifierRe = regexp.MustCompile(`%{\w+:(\w+):(ts-".+"|t?s?-?\w+)}`) // matches a plain pattern name. ie, %{NUMBER} patternOnlyRe = regexp.MustCompile(`%{(\w+)}`) ) @@ -87,6 +93,12 @@ type Parser struct { // "RESPONSE_CODE": "%{NUMBER:rc:tag}" // } patterns map[string]string + // foundTsLayouts is a slice of timestamp patterns that have been found + // in the log lines. This slice gets updated if the user uses the generic + // 'ts' modifier for timestamps. This slice is checked first for matches, + // so that previously-matched layouts get priority over all other timestamp + // layouts. + foundTsLayouts []string g *grok.Grok tsModder *tsModder @@ -140,6 +152,7 @@ func (p *Parser) Compile() error { func (p *Parser) ParseLine(line string) (telegraf.Metric, error) { var err error + // values are the parsed fields from the log line var values map[string]string // the matching pattern string var patternName string @@ -165,6 +178,7 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) { continue } + // t is the modifier of the field var t string // check if pattern has some modifiers if types, ok := p.typeMap[patternName]; ok { @@ -210,20 +224,50 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) { tags[k] = v case STRING: fields[k] = strings.Trim(v, `"`) - case "EPOCH": + case EPOCH: iv, err := strconv.ParseInt(v, 10, 64) if err != nil { log.Printf("ERROR parsing %s to int: %s", v, err) } else { timestamp = time.Unix(iv, 0) } - case "EPOCH_NANO": + case EPOCH_NANO: iv, err := strconv.ParseInt(v, 10, 64) if err != nil { log.Printf("ERROR parsing %s to int: %s", v, err) } else { timestamp = time.Unix(0, iv) } + case GENERIC_TIMESTAMP: + var foundTs bool + // first try timestamp layouts that we've already found + for _, layout := range p.foundTsLayouts { + ts, err := time.Parse(layout, v) + if err == nil { + timestamp = ts + foundTs = true + break + } + } + // if we haven't found a timestamp layout yet, try all timestamp + // layouts. + if !foundTs { + for _, layout := range timeLayouts { + ts, err := time.Parse(layout, v) + if err == nil { + timestamp = ts + foundTs = true + p.foundTsLayouts = append(p.foundTsLayouts, layout) + break + } + } + } + // if we still haven't found a timestamp layout, log it and we will + // just use time.Now() + if !foundTs { + log.Printf("ERROR parsing timestamp [%s], could not find any "+ + "suitable time layouts.", v) + } case DROP: // goodbye! default: @@ -267,7 +311,7 @@ func (p *Parser) compileCustomPatterns() error { // check if pattern contains modifiers. Parse them out if it does. for name, pattern := range p.patterns { - if typedRe.MatchString(pattern) { + if modifierRe.MatchString(pattern) { // this pattern has modifiers, so parse out the modifiers pattern, err = p.parseTypedCaptures(name, pattern) if err != nil { @@ -280,13 +324,13 @@ func (p *Parser) compileCustomPatterns() error { return p.g.AddPatternsFromMap(p.patterns) } -// parseTypedCaptures parses the capture types, and then deletes the type from -// the line so that it is a valid "grok" pattern again. +// parseTypedCaptures parses the capture modifiers, and then deletes the +// modifier from the line so that it is a valid "grok" pattern again. // ie, // %{NUMBER:bytes:int} => %{NUMBER:bytes} (stores %{NUMBER}->bytes->int) // %{IPORHOST:clientip:tag} => %{IPORHOST:clientip} (stores %{IPORHOST}->clientip->tag) func (p *Parser) parseTypedCaptures(name, pattern string) (string, error) { - matches := typedRe.FindAllStringSubmatch(pattern, -1) + matches := modifierRe.FindAllStringSubmatch(pattern, -1) // grab the name of the capture pattern patternName := "%{" + name + "}" @@ -298,16 +342,18 @@ func (p *Parser) parseTypedCaptures(name, pattern string) (string, error) { hasTimestamp := false for _, match := range matches { // regex capture 1 is the name of the capture - // regex capture 2 is the type of the capture - if strings.HasPrefix(match[2], "ts-") { + // regex capture 2 is the modifier of the capture + if strings.HasPrefix(match[2], "ts") { if hasTimestamp { return pattern, fmt.Errorf("logparser pattern compile error: "+ "Each pattern is allowed only one named "+ "timestamp data type. pattern: %s", pattern) } - if f, ok := timeFormats[match[2]]; ok { - p.tsMap[patternName][match[1]] = f + if layout, ok := timeLayouts[match[2]]; ok { + // built-in time format + p.tsMap[patternName][match[1]] = layout } else { + // custom time format p.tsMap[patternName][match[1]] = strings.TrimSuffix(strings.TrimPrefix(match[2], `ts-"`), `"`) } hasTimestamp = true diff --git a/plugins/inputs/logparser/grok/grok_test.go b/plugins/inputs/logparser/grok/grok_test.go index 295f32609..bc8d980f2 100644 --- a/plugins/inputs/logparser/grok/grok_test.go +++ b/plugins/inputs/logparser/grok/grok_test.go @@ -38,32 +38,6 @@ func Benchmark_ParseLine_CombinedLogFormat(b *testing.B) { benchM = m } -func Benchmark_ParseLine_InfluxLog(b *testing.B) { - p := &Parser{ - Patterns: []string{"%{INFLUXDB_HTTPD_LOG}"}, - } - p.Compile() - - var m telegraf.Metric - for n := 0; n < b.N; n++ { - m, _ = p.ParseLine(`[httpd] 192.168.1.1 - - [14/Jun/2016:11:33:29 +0100] "POST /write?consistency=any&db=telegraf&precision=ns&rp= HTTP/1.1" 204 0 "-" "InfluxDBClient" 6f61bc44-321b-11e6-8050-000000000000 2513`) - } - benchM = m -} - -func Benchmark_ParseLine_InfluxLog_NoMatch(b *testing.B) { - p := &Parser{ - Patterns: []string{"%{INFLUXDB_HTTPD_LOG}"}, - } - p.Compile() - - var m telegraf.Metric - for n := 0; n < b.N; n++ { - m, _ = p.ParseLine(`[retention] 2016/06/14 14:38:24 retention policy shard deletion check commencing`) - } - benchM = m -} - func Benchmark_ParseLine_CustomPattern(b *testing.B) { p := &Parser{ Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"}, @@ -108,9 +82,9 @@ func TestMeasurementName(t *testing.T) { assert.Equal(t, "my_web_log", m.Name()) } -func TestBuiltinInfluxdbHttpd(t *testing.T) { +func TestCustomInfluxdbHttpd(t *testing.T) { p := &Parser{ - Patterns: []string{"%{INFLUXDB_HTTPD_LOG}"}, + Patterns: []string{`\[httpd\] %{COMBINED_LOG_FORMAT} %{UUID:uuid:drop} %{NUMBER:response_time_us:int}`}, } assert.NoError(t, p.Compile()) @@ -333,6 +307,55 @@ func TestParseEpochErrors(t *testing.T) { assert.NoError(t, err) } +func TestParseGenericTimestamp(t *testing.T) { + p := &Parser{ + Patterns: []string{`\[%{HTTPDATE:ts:ts}\] response_time=%{POSINT:response_time:int} mymetric=%{NUMBER:metric:float}`}, + } + assert.NoError(t, p.Compile()) + + metricA, err := p.ParseLine(`[09/Jun/2016:03:37:03 +0000] response_time=20821 mymetric=10890.645`) + require.NotNil(t, metricA) + assert.NoError(t, err) + assert.Equal(t, + map[string]interface{}{ + "response_time": int64(20821), + "metric": float64(10890.645), + }, + metricA.Fields()) + assert.Equal(t, map[string]string{}, metricA.Tags()) + assert.Equal(t, time.Unix(1465443423, 0).UTC(), metricA.Time().UTC()) + + metricB, err := p.ParseLine(`[09/Jun/2016:03:37:04 +0000] response_time=20821 mymetric=10890.645`) + require.NotNil(t, metricB) + assert.NoError(t, err) + assert.Equal(t, + map[string]interface{}{ + "response_time": int64(20821), + "metric": float64(10890.645), + }, + metricB.Fields()) + assert.Equal(t, map[string]string{}, metricB.Tags()) + assert.Equal(t, time.Unix(1465443424, 0).UTC(), metricB.Time().UTC()) +} + +func TestParseGenericTimestampNotFound(t *testing.T) { + p := &Parser{ + Patterns: []string{`\[%{NOTSPACE:ts:ts}\] response_time=%{POSINT:response_time:int} mymetric=%{NUMBER:metric:float}`}, + } + assert.NoError(t, p.Compile()) + + metricA, err := p.ParseLine(`[foobar] response_time=20821 mymetric=10890.645`) + require.NotNil(t, metricA) + assert.NoError(t, err) + assert.Equal(t, + map[string]interface{}{ + "response_time": int64(20821), + "metric": float64(10890.645), + }, + metricA.Fields()) + assert.Equal(t, map[string]string{}, metricA.Tags()) +} + func TestCompileFileAndParse(t *testing.T) { p := &Parser{ Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"}, diff --git a/plugins/inputs/logparser/grok/influx_patterns.go b/plugins/inputs/logparser/grok/influx_patterns.go index 53be0e20d..ff9d60ebf 100644 --- a/plugins/inputs/logparser/grok/influx_patterns.go +++ b/plugins/inputs/logparser/grok/influx_patterns.go @@ -55,15 +55,13 @@ EXAMPLE_LOG \[%{HTTPDATE:ts:ts-httpd}\] %{NUMBER:myfloat:float} %{RESPONSE_CODE} # Wider-ranging username matching vs. logstash built-in %{USER} NGUSERNAME [a-zA-Z\.\@\-\+_%]+ NGUSER %{NGUSERNAME} +# Wider-ranging client IP matching +CLIENT (?:%{IPORHOST}|%{HOSTPORT}|::1) ## ## COMMON LOG PATTERNS ## -# InfluxDB log patterns -CLIENT (?:%{IPORHOST}|%{HOSTPORT}|::1) -INFLUXDB_HTTPD_LOG \[httpd\] %{COMBINED_LOG_FORMAT} %{UUID:uuid:drop} %{NUMBER:response_time_us:int} - # apache & nginx logs, this is also known as the "common log format" # see https://en.wikipedia.org/wiki/Common_Log_Format COMMON_LOG_FORMAT %{CLIENT:client_ip} %{NGUSER:ident} %{NGUSER:auth} \[%{HTTPDATE:ts:ts-httpd}\] "(?:%{WORD:verb:tag} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version:float})?|%{DATA})" %{NUMBER:resp_code:tag} (?:%{NUMBER:resp_bytes:int}|-) diff --git a/plugins/inputs/logparser/grok/patterns/influx-patterns b/plugins/inputs/logparser/grok/patterns/influx-patterns index 1db74a17a..6f4d81f89 100644 --- a/plugins/inputs/logparser/grok/patterns/influx-patterns +++ b/plugins/inputs/logparser/grok/patterns/influx-patterns @@ -51,15 +51,13 @@ EXAMPLE_LOG \[%{HTTPDATE:ts:ts-httpd}\] %{NUMBER:myfloat:float} %{RESPONSE_CODE} # Wider-ranging username matching vs. logstash built-in %{USER} NGUSERNAME [a-zA-Z\.\@\-\+_%]+ NGUSER %{NGUSERNAME} +# Wider-ranging client IP matching +CLIENT (?:%{IPORHOST}|%{HOSTPORT}|::1) ## ## COMMON LOG PATTERNS ## -# InfluxDB log patterns -CLIENT (?:%{IPORHOST}|%{HOSTPORT}|::1) -INFLUXDB_HTTPD_LOG \[httpd\] %{COMBINED_LOG_FORMAT} %{UUID:uuid:drop} %{NUMBER:response_time_us:int} - # apache & nginx logs, this is also known as the "common log format" # see https://en.wikipedia.org/wiki/Common_Log_Format COMMON_LOG_FORMAT %{CLIENT:client_ip} %{NGUSER:ident} %{NGUSER:auth} \[%{HTTPDATE:ts:ts-httpd}\] "(?:%{WORD:verb:tag} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version:float})?|%{DATA})" %{NUMBER:resp_code:tag} (?:%{NUMBER:resp_bytes:int}|-) diff --git a/plugins/inputs/logparser/logparser.go b/plugins/inputs/logparser/logparser.go index 6b29ea031..8ded03edc 100644 --- a/plugins/inputs/logparser/logparser.go +++ b/plugins/inputs/logparser/logparser.go @@ -45,7 +45,7 @@ const sampleConfig = ` ## /var/log/**.log -> recursively find all .log files in /var/log ## /var/log/*/*.log -> find all .log files with a parent dir in /var/log ## /var/log/apache.log -> only tail the apache log file - files = ["/var/log/influxdb/influxdb.log"] + files = ["/var/log/apache/access.log"] ## Read file from beginning. from_beginning = false @@ -58,9 +58,9 @@ const sampleConfig = ` ## Other common built-in patterns are: ## %{COMMON_LOG_FORMAT} (plain apache & nginx access logs) ## %{COMBINED_LOG_FORMAT} (access logs + referrer & agent) - patterns = ["%{INFLUXDB_HTTPD_LOG}"] + patterns = ["%{COMBINED_LOG_FORMAT}"] ## Name of the outputted measurement name. - measurement = "influxdb_log" + measurement = "apache_access_log" ## Full path(s) to custom pattern files. custom_pattern_files = [] ## Custom patterns can also be defined here. Put one pattern per line. diff --git a/plugins/inputs/tail/tail_test.go b/plugins/inputs/tail/tail_test.go index f9f6bff28..31ecfbf30 100644 --- a/plugins/inputs/tail/tail_test.go +++ b/plugins/inputs/tail/tail_test.go @@ -17,6 +17,8 @@ func TestTailFromBeginning(t *testing.T) { tmpfile, err := ioutil.TempFile("", "") require.NoError(t, err) defer os.Remove(tmpfile.Name()) + _, err = tmpfile.WriteString("cpu,mytag=foo usage_idle=100\n") + require.NoError(t, err) tt := NewTail() tt.FromBeginning = true @@ -28,12 +30,10 @@ func TestTailFromBeginning(t *testing.T) { acc := testutil.Accumulator{} require.NoError(t, tt.Start(&acc)) - - _, err = tmpfile.WriteString("cpu,mytag=foo usage_idle=100\n") - require.NoError(t, err) + time.Sleep(time.Millisecond * 100) require.NoError(t, tt.Gather(&acc)) // arbitrary sleep to wait for message to show up - time.Sleep(time.Millisecond * 250) + time.Sleep(time.Millisecond * 150) acc.AssertContainsTaggedFields(t, "cpu", map[string]interface{}{ diff --git a/plugins/inputs/tcp_listener/tcp_listener.go b/plugins/inputs/tcp_listener/tcp_listener.go index 4688e008b..b8bea2bd6 100644 --- a/plugins/inputs/tcp_listener/tcp_listener.go +++ b/plugins/inputs/tcp_listener/tcp_listener.go @@ -158,7 +158,6 @@ func (t *TcpListener) tcpListen() error { if err != nil { return err } - // log.Printf("Received TCP Connection from %s", conn.RemoteAddr()) select { case <-t.accept: @@ -194,7 +193,6 @@ func (t *TcpListener) handler(conn *net.TCPConn, id string) { defer func() { t.wg.Done() conn.Close() - // log.Printf("Closed TCP Connection from %s", conn.RemoteAddr()) // Add one connection potential back to channel when this one closes t.accept <- true t.forget(id) @@ -239,14 +237,19 @@ func (t *TcpListener) tcpParser() error { for { select { case <-t.done: - return nil + // drain input packets before finishing: + if len(t.in) == 0 { + return nil + } case packet = <-t.in: if len(packet) == 0 { continue } metrics, err = t.parser.Parse(packet) if err == nil { - t.storeMetrics(metrics) + for _, m := range metrics { + t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) + } } else { t.malformed++ if t.malformed == 1 || t.malformed%1000 == 0 { @@ -257,15 +260,6 @@ func (t *TcpListener) tcpParser() error { } } -func (t *TcpListener) storeMetrics(metrics []telegraf.Metric) error { - t.Lock() - defer t.Unlock() - for _, m := range metrics { - t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) - } - return nil -} - // forget a TCP connection func (t *TcpListener) forget(id string) { t.cleanup.Lock() diff --git a/plugins/inputs/tcp_listener/tcp_listener_test.go b/plugins/inputs/tcp_listener/tcp_listener_test.go index b4aec9dd2..f7e5784d3 100644 --- a/plugins/inputs/tcp_listener/tcp_listener_test.go +++ b/plugins/inputs/tcp_listener/tcp_listener_test.go @@ -37,6 +37,62 @@ func newTestTcpListener() (*TcpListener, chan []byte) { return listener, in } +// benchmark how long it takes to accept & process 100,000 metrics: +func BenchmarkTCP(b *testing.B) { + listener := TcpListener{ + ServiceAddress: ":8198", + AllowedPendingMessages: 100000, + MaxTCPConnections: 250, + } + listener.parser, _ = parsers.NewInfluxParser() + acc := &testutil.Accumulator{Discard: true} + + // send multiple messages to socket + for n := 0; n < b.N; n++ { + err := listener.Start(acc) + if err != nil { + panic(err) + } + + time.Sleep(time.Millisecond * 25) + conn, err := net.Dial("tcp", "127.0.0.1:8198") + if err != nil { + panic(err) + } + for i := 0; i < 100000; i++ { + fmt.Fprintf(conn, testMsg) + } + // wait for 100,000 metrics to get added to accumulator + time.Sleep(time.Millisecond) + listener.Stop() + } +} + +func TestHighTrafficTCP(t *testing.T) { + listener := TcpListener{ + ServiceAddress: ":8199", + AllowedPendingMessages: 100000, + MaxTCPConnections: 250, + } + listener.parser, _ = parsers.NewInfluxParser() + acc := &testutil.Accumulator{} + + // send multiple messages to socket + err := listener.Start(acc) + require.NoError(t, err) + + time.Sleep(time.Millisecond * 25) + conn, err := net.Dial("tcp", "127.0.0.1:8199") + require.NoError(t, err) + for i := 0; i < 100000; i++ { + fmt.Fprintf(conn, testMsg) + } + time.Sleep(time.Millisecond) + listener.Stop() + + assert.Equal(t, 100000, len(acc.Metrics)) +} + func TestConnectTCP(t *testing.T) { listener := TcpListener{ ServiceAddress: ":8194", diff --git a/plugins/inputs/udp_listener/udp_listener.go b/plugins/inputs/udp_listener/udp_listener.go index 120ee50e5..fa773f624 100644 --- a/plugins/inputs/udp_listener/udp_listener.go +++ b/plugins/inputs/udp_listener/udp_listener.go @@ -3,8 +3,8 @@ package udp_listener import ( "log" "net" - "strings" "sync" + "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" @@ -99,9 +99,11 @@ func (u *UdpListener) Start(acc telegraf.Accumulator) error { } func (u *UdpListener) Stop() { + u.Lock() + defer u.Unlock() close(u.done) - u.listener.Close() u.wg.Wait() + u.listener.Close() close(u.in) log.Println("Stopped UDP listener service on ", u.ServiceAddress) } @@ -122,9 +124,13 @@ func (u *UdpListener) udpListen() error { case <-u.done: return nil default: + u.listener.SetReadDeadline(time.Now().Add(time.Second)) n, _, err := u.listener.ReadFromUDP(buf) - if err != nil && !strings.Contains(err.Error(), "closed network") { - log.Printf("ERROR: %s\n", err.Error()) + if err != nil { + if err, ok := err.(net.Error); ok && err.Timeout() { + } else { + log.Printf("ERROR: %s\n", err.Error()) + } continue } bufCopy := make([]byte, n) @@ -151,11 +157,15 @@ func (u *UdpListener) udpParser() error { for { select { case <-u.done: - return nil + if len(u.in) == 0 { + return nil + } case packet = <-u.in: metrics, err = u.parser.Parse(packet) if err == nil { - u.storeMetrics(metrics) + for _, m := range metrics { + u.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) + } } else { u.malformed++ if u.malformed == 1 || u.malformed%1000 == 0 { @@ -166,15 +176,6 @@ func (u *UdpListener) udpParser() error { } } -func (u *UdpListener) storeMetrics(metrics []telegraf.Metric) error { - u.Lock() - defer u.Unlock() - for _, m := range metrics { - u.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) - } - return nil -} - func init() { inputs.Add("udp_listener", func() telegraf.Input { return &UdpListener{} diff --git a/plugins/inputs/udp_listener/udp_listener_test.go b/plugins/inputs/udp_listener/udp_listener_test.go index bdbab318b..fa9980682 100644 --- a/plugins/inputs/udp_listener/udp_listener_test.go +++ b/plugins/inputs/udp_listener/udp_listener_test.go @@ -1,20 +1,36 @@ package udp_listener import ( + "fmt" "io/ioutil" "log" + "net" "testing" "time" "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/testutil" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const ( + testMsg = "cpu_load_short,host=server01 value=12.0 1422568543702900257\n" + + testMsgs = ` +cpu_load_short,host=server02 value=12.0 1422568543702900257 +cpu_load_short,host=server03 value=12.0 1422568543702900257 +cpu_load_short,host=server04 value=12.0 1422568543702900257 +cpu_load_short,host=server05 value=12.0 1422568543702900257 +cpu_load_short,host=server06 value=12.0 1422568543702900257 +` ) func newTestUdpListener() (*UdpListener, chan []byte) { in := make(chan []byte, 1500) listener := &UdpListener{ ServiceAddress: ":8125", - UDPPacketSize: 1500, AllowedPendingMessages: 10000, in: in, done: make(chan struct{}), @@ -22,6 +38,72 @@ func newTestUdpListener() (*UdpListener, chan []byte) { return listener, in } +func TestHighTrafficUDP(t *testing.T) { + listener := UdpListener{ + ServiceAddress: ":8126", + AllowedPendingMessages: 100000, + } + listener.parser, _ = parsers.NewInfluxParser() + acc := &testutil.Accumulator{} + + // send multiple messages to socket + err := listener.Start(acc) + require.NoError(t, err) + + time.Sleep(time.Millisecond * 25) + conn, err := net.Dial("udp", "127.0.0.1:8126") + require.NoError(t, err) + for i := 0; i < 20000; i++ { + // arbitrary, just to give the OS buffer some slack handling the + // packet storm. + time.Sleep(time.Microsecond) + fmt.Fprintf(conn, testMsgs) + } + time.Sleep(time.Millisecond) + listener.Stop() + + // this is not an exact science, since UDP packets can easily get lost or + // dropped, but assume that the OS will be able to + // handle at least 90% of the sent UDP packets. + assert.InDelta(t, 100000, len(acc.Metrics), 10000) +} + +func TestConnectUDP(t *testing.T) { + listener := UdpListener{ + ServiceAddress: ":8127", + AllowedPendingMessages: 10000, + } + listener.parser, _ = parsers.NewInfluxParser() + + acc := &testutil.Accumulator{} + require.NoError(t, listener.Start(acc)) + defer listener.Stop() + + time.Sleep(time.Millisecond * 25) + conn, err := net.Dial("udp", "127.0.0.1:8127") + require.NoError(t, err) + + // send single message to socket + fmt.Fprintf(conn, testMsg) + time.Sleep(time.Millisecond * 15) + acc.AssertContainsTaggedFields(t, "cpu_load_short", + map[string]interface{}{"value": float64(12)}, + map[string]string{"host": "server01"}, + ) + + // send multiple messages to socket + fmt.Fprintf(conn, testMsgs) + time.Sleep(time.Millisecond * 15) + hostTags := []string{"server02", "server03", + "server04", "server05", "server06"} + for _, hostTag := range hostTags { + acc.AssertContainsTaggedFields(t, "cpu_load_short", + map[string]interface{}{"value": float64(12)}, + map[string]string{"host": hostTag}, + ) + } +} + func TestRunParser(t *testing.T) { log.SetOutput(ioutil.Discard) var testmsg = []byte("cpu_load_short,host=server01 value=12.0 1422568543702900257") diff --git a/scripts/build.py b/scripts/build.py index 426aa87bb..77befd599 100755 --- a/scripts/build.py +++ b/scripts/build.py @@ -83,29 +83,17 @@ targets = { } supported_builds = { - "darwin": [ "amd64" ], "windows": [ "amd64" ], "linux": [ "amd64", "i386", "armhf", "armel", "arm64", "static_amd64" ], "freebsd": [ "amd64" ] } supported_packages = { - "darwin": [ "tar" ], "linux": [ "deb", "rpm", "tar" ], "windows": [ "zip" ], "freebsd": [ "tar" ] } -supported_tags = { - # "linux": { - # "amd64": ["sensors"] - # } -} - -prereq_cmds = { - # "linux": "sudo apt-get install lm-sensors libsensors4-dev" -} - ################ #### Telegraf Functions ################ diff --git a/scripts/post-remove.sh b/scripts/post-remove.sh index 96b178f4d..0f262d225 100644 --- a/scripts/post-remove.sh +++ b/scripts/post-remove.sh @@ -15,32 +15,28 @@ function disable_chkconfig { rm -f /etc/init.d/telegraf } -if [[ -f /etc/redhat-release ]]; then - # RHEL-variant logic - if [[ "$1" = "0" ]]; then - # InfluxDB is no longer installed, remove from init system - rm -f /etc/default/telegraf - - which systemctl &>/dev/null - if [[ $? -eq 0 ]]; then - disable_systemd - else - # Assuming sysv - disable_chkconfig - fi +if [[ "$1" == "0" ]]; then + # RHEL and any distribution that follow RHEL, Amazon Linux covered + # telegraf is no longer installed, remove from init system + rm -f /etc/default/telegraf + + which systemctl &>/dev/null + if [[ $? -eq 0 ]]; then + disable_systemd + else + # Assuming sysv + disable_chkconfig fi -elif [[ -f /etc/debian_version ]]; then +elif [ "$1" == "remove" -o "$1" == "purge" ]; then # Debian/Ubuntu logic - if [[ "$1" != "upgrade" ]]; then - # Remove/purge - rm -f /etc/default/telegraf - - which systemctl &>/dev/null - if [[ $? -eq 0 ]]; then - disable_systemd - else - # Assuming sysv - disable_update_rcd - fi + # Remove/purge + rm -f /etc/default/telegraf + + which systemctl &>/dev/null + if [[ $? -eq 0 ]]; then + disable_systemd + else + # Assuming sysv + disable_update_rcd fi fi diff --git a/testutil/accumulator.go b/testutil/accumulator.go index 598aa3155..62b765a3c 100644 --- a/testutil/accumulator.go +++ b/testutil/accumulator.go @@ -5,6 +5,7 @@ import ( "fmt" "reflect" "sync" + "sync/atomic" "testing" "time" @@ -27,9 +28,11 @@ func (p *Metric) String() string { type Accumulator struct { sync.Mutex - Metrics []*Metric - Errors []error - debug bool + Metrics []*Metric + nMetrics uint64 + Discard bool + Errors []error + debug bool } // Add adds a measurement point to the accumulator @@ -43,6 +46,10 @@ func (a *Accumulator) Add( a.AddFields(measurement, fields, tags, t...) } +func (a *Accumulator) NMetrics() uint64 { + return atomic.LoadUint64(&a.nMetrics) +} + // AddFields adds a measurement point with a specified timestamp. func (a *Accumulator) AddFields( measurement string, @@ -50,6 +57,10 @@ func (a *Accumulator) AddFields( tags map[string]string, timestamp ...time.Time, ) { + atomic.AddUint64(&a.nMetrics, 1) + if a.Discard { + return + } a.Lock() defer a.Unlock() if tags == nil {