From b99298f851bd1a18b08ddb98f40a1a11209db48f Mon Sep 17 00:00:00 2001 From: Hiroaki Nakamura Date: Tue, 1 Mar 2016 19:34:41 +0900 Subject: [PATCH] Add Tail service input plugin and LTSV parser. --- Godeps | 3 + Godeps_windows | 3 + docs/DATA_FORMATS_INPUT.md | 153 ++++++++++ docs/LICENSE_OF_DEPENDENCIES.md | 1 + internal/config/config.go | 115 ++++++++ plugins/inputs/all/all.go | 1 + plugins/inputs/tail/README.md | 187 ++++++++++++ plugins/inputs/tail/tail.go | 247 ++++++++++++++++ plugins/inputs/tail/tail_test.go | 72 +++++ plugins/parsers/ltsv/parser.go | 218 ++++++++++++++ plugins/parsers/ltsv/parser_test.go | 432 ++++++++++++++++++++++++++++ plugins/parsers/registry.go | 99 ++++++- 12 files changed, 1529 insertions(+), 2 deletions(-) create mode 100644 plugins/inputs/tail/README.md create mode 100644 plugins/inputs/tail/tail.go create mode 100644 plugins/inputs/tail/tail_test.go create mode 100644 plugins/parsers/ltsv/parser.go create mode 100644 plugins/parsers/ltsv/parser_test.go diff --git a/Godeps b/Godeps index 089860ed5..f98a5d1b1 100644 --- a/Godeps +++ b/Godeps @@ -18,6 +18,7 @@ github.com/gonuts/go-shellquote e842a11b24c6abfb3dd27af69a17f482e4b483c2 github.com/gorilla/context 1ea25387ff6f684839d82767c1733ff4d4d15d0a github.com/gorilla/mux c9e326e2bdec29039a3761c07bece13133863e1e github.com/hailocab/go-hostpool e80d13ce29ede4452c43dea11e79b9bc8a15b478 +github.com/hpcloud/tail 1a0242e795eeefe54261ff308dc685f7d29cc58c github.com/influxdata/config b79f6829346b8d6e78ba73544b1e1038f1f1c9da github.com/influxdata/influxdb e3fef5593c21644f2b43af55d6e17e70910b0e48 github.com/influxdata/toml af4df43894b16e3fd2b788d01bd27ad0776ef2d0 @@ -50,5 +51,7 @@ golang.org/x/net 6acef71eb69611914f7a30939ea9f6e194c78172 golang.org/x/text a71fd10341b064c10f4a81ceac72bcf70f26ea34 gopkg.in/dancannon/gorethink.v1 7d1af5be49cb5ecc7b177bf387d232050299d6ef gopkg.in/fatih/pool.v2 cba550ebf9bce999a02e963296d4bc7a486cb715 +gopkg.in/fsnotify.v1 8611c35ab31c1c28aa903d33cf8b6e44a399b09e 
+gopkg.in/tomb.v1 dd632973f1e7218eb1089048e0798ec9ae7dceb8 gopkg.in/mgo.v2 d90005c5262a3463800497ea5a89aed5fe22c886 gopkg.in/yaml.v2 a83829b6f1293c91addabc89d0571c246397bbf4 diff --git a/Godeps_windows b/Godeps_windows index dd46184ec..588c5b292 100644 --- a/Godeps_windows +++ b/Godeps_windows @@ -20,6 +20,7 @@ github.com/gonuts/go-shellquote e842a11b24c6abfb3dd27af69a17f482e4b483c2 github.com/gorilla/context 1c83b3eabd45b6d76072b66b746c20815fb2872d github.com/gorilla/mux 26a6070f849969ba72b72256e9f14cf519751690 github.com/hailocab/go-hostpool e80d13ce29ede4452c43dea11e79b9bc8a15b478 +github.com/hpcloud/tail 1a0242e795eeefe54261ff308dc685f7d29cc58c github.com/influxdata/config bae7cb98197d842374d3b8403905924094930f24 github.com/influxdata/influxdb ef571fc104dc24b77cd3710c156cd95e5cfd7aa5 github.com/jmespath/go-jmespath c01cf91b011868172fdcd9f41838e80c9d716264 @@ -52,5 +53,7 @@ golang.org/x/net 04b9de9b512f58addf28c9853d50ebef61c3953e golang.org/x/text 6d3c22c4525a4da167968fa2479be5524d2e8bd0 gopkg.in/dancannon/gorethink.v1 6f088135ff288deb9d5546f4c71919207f891a70 gopkg.in/fatih/pool.v2 cba550ebf9bce999a02e963296d4bc7a486cb715 +gopkg.in/fsnotify.v1 8611c35ab31c1c28aa903d33cf8b6e44a399b09e +gopkg.in/tomb.v1 dd632973f1e7218eb1089048e0798ec9ae7dceb8 gopkg.in/mgo.v2 03c9f3ee4c14c8e51ee521a6a7d0425658dd6f64 gopkg.in/yaml.v2 f7716cbe52baa25d2e9b0d0da546fcf909fc16b4 diff --git a/docs/DATA_FORMATS_INPUT.md b/docs/DATA_FORMATS_INPUT.md index 79528a962..bd4ab677c 100644 --- a/docs/DATA_FORMATS_INPUT.md +++ b/docs/DATA_FORMATS_INPUT.md @@ -272,3 +272,156 @@ There are many more options available, "measurement*" ] ``` + +## LTSV: + +The [Labeled Tab-separated Values (LTSV)](http://ltsv.org/) data format translate a LTSV line into a measurement with _timestamp_, _fields_ and _tags_. 
For example, this line: + +``` +time:2016-03-06T09:24:12Z\tstr1:value1\tint1:23\tint2:34\tfloat1:1.23\tbool1:true\tbool2:false\tignore_field1:foo\ttag1:tval1\tignore_tag1:bar\ttag2:tval2 +``` + +Would get translated into _timestamp_, _fields_ and _tags_ of a measurement using the example configuration in the following section: + +``` +ltsv_example str1=value1,int1=23i,int2=34i,float1=1.23,bool1=true,bool2=false tag1=tval1,tag2=tval2,log_host=log.example.com 2016-03-06T09:24:12Z +``` + +### LTSV Configuration: + +The LTSV data format supports the following configuration options: + +- metric_name +- time_label +- time_format +- str_field_labels +- int_field_labels +- float_field_labels +- bool_field_labels +- tag_labels +- duplicate_points_modifier_method +- duplicate_points_modifier_uniq_tag + +For details, please see the comments in the following configuration example. + +```toml +[[inputs.tail]] + ## The measurement name + name_override = "nginx_access" + + ## A LTSV formatted log file path. + ## See http://ltsv.org/ for Labeled Tab-separated Values (LTSV) + ## Here is an example config for nginx (http://nginx.org/en/). + ## + ## log_format ltsv 'time:$time_iso8601\t' + ## 'host:$host\t' + ## 'http_host:$http_host\t' + ## 'scheme:$scheme\t' + ## 'remote_addr:$remote_addr\t' + ## 'remote_user:$remote_user\t' + ## 'request:$request\t' + ## 'status:$status\t' + ## 'body_bytes_sent:$body_bytes_sent\t' + ## 'http_referer:$http_referer\t' + ## 'http_user_agent:$http_user_agent\t' + ## 'http_x_forwarded_for:$http_x_forwarded_for\t' + ## 'request_time:$request_time'; + ## access_log /var/log/nginx/access.ltsv.log ltsv; + ## + filename = "/var/log/nginx/access.ltsv.log" + + ## Seek to this location before tailing + seek_offset = 0 + + ## Seek from whence. 
See https://golang.org/pkg/os/#File.Seek + seek_whence = 0 + + ## Reopen recreated files (tail -F) + re_open = true + + ## Fail early if the file does not exist + must_exist = false + + ## Poll for file changes instead of using inotify + poll = false + + ## Set this to true if the file is a named pipe (mkfifo) + pipe = false + + ## Continue looking for new lines (tail -f) + follow = true + + ## If non-zero, split longer lines into multiple lines + max_line_size = 0 + + ## Set this false to enable logging to stderr, true to disable logging + disable_logging = false + + ## Data format to consume. Currently only "ltsv" is supported. + ## Each data format has it's own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + data_format = "ltsv" + + ## Time label to be used to create a timestamp for a measurement. + time_label = "time" + + ## Time format for parsing timestamps. + ## Please see https://golang.org/pkg/time/#Parse for the format string. + time_format = "2006-01-02T15:04:05Z07:00" + + ## Labels for string fields. + str_field_labels = ["str1"] + + ## Labels for integer (64bit signed decimal integer) fields. + ## For acceptable integer values, please refer to: + ## https://golang.org/pkg/strconv/#ParseInt + int_field_labels = ["int1", "int2"] + + ## Labels for float (64bit float) fields. + ## For acceptable float values, please refer to: + ## https://golang.org/pkg/strconv/#ParseFloat + float_field_labels = ["float1"] + + ## Labels for boolean fields. + ## For acceptable boolean values, please refer to: + ## https://golang.org/pkg/strconv/#ParseBool + bool_field_labels = ["bool1", "bool2"] + + ## Labels for tags to be added + tag_labels = ["tag1", "tag2"] + + ## Method to modify duplicated measurement points. + ## Must be one of "add_uniq_tag", "increment_time", "no_op". + ## This will be used to modify duplicated points. 
+ ## For detail, please see https://docs.influxdata.com/influxdb/v0.10/troubleshooting/frequently_encountered_issues/#writing-duplicate-points + ## NOTE: For modifier methods other than "no_op" to work correctly, the log lines + ## MUST be sorted by timestamps in ascending order. + duplicate_points_modifier_method = "add_uniq_tag" + + ## When duplicate_points_modifier_method is "increment_time", + ## this will be added to the time of the previous measurement + ## if the time of current time is equal to or less than the + ## time of the previous measurement. + ## + ## NOTE: You need to set this value equal to or greater than + ## precisions of your output plugins. Otherwise the times will + ## become the same value! + ## For the precision of the InfluxDB plugin, please see + ## https://github.com/influxdata/telegraf/blob/v0.10.1/plugins/outputs/influxdb/influxdb.go#L40-L42 + ## For the duration string format, please see + ## https://golang.org/pkg/time/#ParseDuration + duplicate_points_increment_duration = "1us" + + ## When duplicate_points_modifier_method is "add_uniq_tag", + ## this will be the label of the tag to be added to ensure uniqueness of points. + ## NOTE: The uniq tag will be only added to the successive points of duplicated + ## points, it will not be added to the first point of duplicated points. + ## If you want to always add the uniq tag, add a tag with the same name as + ## duplicate_points_modifier_uniq_tag and the string value "0" to [inputs.tail.tags]. + duplicate_points_modifier_uniq_tag = "uniq" + + ## Defaults tags to be added to measurements. 
+ [inputs.tail.tags] + log_host = "log.example.com" +``` diff --git a/docs/LICENSE_OF_DEPENDENCIES.md b/docs/LICENSE_OF_DEPENDENCIES.md index c8f3b0926..099992379 100644 --- a/docs/LICENSE_OF_DEPENDENCIES.md +++ b/docs/LICENSE_OF_DEPENDENCIES.md @@ -16,6 +16,7 @@ - github.com/hashicorp/go-msgpack [BSD LICENSE](https://github.com/hashicorp/go-msgpack/blob/master/LICENSE) - github.com/hashicorp/raft [MPL LICENSE](https://github.com/hashicorp/raft/blob/master/LICENSE) - github.com/hashicorp/raft-boltdb [MPL LICENSE](https://github.com/hashicorp/raft-boltdb/blob/master/LICENSE) +- github.com/hpcloud/tail [MIT LICENSE](https://github.com/hpcloud/tail/blob/master/LICENSE.txt) - github.com/lib/pq [MIT LICENSE](https://github.com/lib/pq/blob/master/LICENSE.md) - github.com/matttproud/golang_protobuf_extensions [APACHE LICENSE](https://github.com/matttproud/golang_protobuf_extensions/blob/master/LICENSE) - github.com/naoina/go-stringutil [MIT LICENSE](https://github.com/naoina/go-stringutil/blob/master/LICENSE) diff --git a/internal/config/config.go b/internal/config/config.go index f64e0a56a..1076bac49 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -701,12 +701,127 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) { } } + if node, ok := tbl.Fields["time_label"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + c.TimeLabel = str.Value + } + } + } + + if node, ok := tbl.Fields["time_format"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + c.TimeFormat = str.Value + } + } + } + + if node, ok := tbl.Fields["str_field_labels"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if ary, ok := kv.Value.(*ast.Array); ok { + for _, elem := range ary.Value { + if str, ok := elem.(*ast.String); ok { + c.StrFieldLabels = append(c.StrFieldLabels, str.Value) + } + } + } + } + } + + if node, ok := tbl.Fields["int_field_labels"]; ok { + if kv, ok := 
node.(*ast.KeyValue); ok { + if ary, ok := kv.Value.(*ast.Array); ok { + for _, elem := range ary.Value { + if str, ok := elem.(*ast.String); ok { + c.IntFieldLabels = append(c.IntFieldLabels, str.Value) + } + } + } + } + } + + if node, ok := tbl.Fields["float_field_labels"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if ary, ok := kv.Value.(*ast.Array); ok { + for _, elem := range ary.Value { + if str, ok := elem.(*ast.String); ok { + c.FloatFieldLabels = append(c.FloatFieldLabels, str.Value) + } + } + } + } + } + + if node, ok := tbl.Fields["bool_field_labels"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if ary, ok := kv.Value.(*ast.Array); ok { + for _, elem := range ary.Value { + if str, ok := elem.(*ast.String); ok { + c.BoolFieldLabels = append(c.BoolFieldLabels, str.Value) + } + } + } + } + } + + if node, ok := tbl.Fields["tag_labels"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if ary, ok := kv.Value.(*ast.Array); ok { + for _, elem := range ary.Value { + if str, ok := elem.(*ast.String); ok { + c.TagLabels = append(c.TagLabels, str.Value) + } + } + } + } + } + + if node, ok := tbl.Fields["duplicate_points_modifier_method"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + c.DuplicatePointsModifierMethod = str.Value + } + } + } + + if node, ok := tbl.Fields["duplicate_points_increment_duration"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + dur, err := time.ParseDuration(str.Value) + if err != nil { + return nil, err + } + + c.DuplicatePointsIncrementDuration = dur + } + } + } + + if node, ok := tbl.Fields["duplicate_points_modifier_uniq_tag"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + c.DuplicatePointsModifierUniqTag = str.Value + } + } + } + c.MetricName = name delete(tbl.Fields, "data_format") delete(tbl.Fields, "separator") delete(tbl.Fields, "templates") delete(tbl.Fields, "tag_keys") + 
delete(tbl.Fields, "time_label") + delete(tbl.Fields, "time_format") + delete(tbl.Fields, "str_field_labels") + delete(tbl.Fields, "int_field_labels") + delete(tbl.Fields, "float_field_labels") + delete(tbl.Fields, "bool_field_labels") + delete(tbl.Fields, "tag_labels") + delete(tbl.Fields, "duplicate_points_modifier_method") + delete(tbl.Fields, "duplicate_points_increment_duration") + delete(tbl.Fields, "duplicate_points_modifier_uniq_tag") return parsers.NewParser(c) } diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 2808ce2b5..18b899bd4 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -47,6 +47,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/sqlserver" _ "github.com/influxdata/telegraf/plugins/inputs/statsd" _ "github.com/influxdata/telegraf/plugins/inputs/system" + _ "github.com/influxdata/telegraf/plugins/inputs/tail" _ "github.com/influxdata/telegraf/plugins/inputs/tcp_listener" _ "github.com/influxdata/telegraf/plugins/inputs/trig" _ "github.com/influxdata/telegraf/plugins/inputs/twemproxy" diff --git a/plugins/inputs/tail/README.md b/plugins/inputs/tail/README.md new file mode 100644 index 000000000..4aad0bd06 --- /dev/null +++ b/plugins/inputs/tail/README.md @@ -0,0 +1,187 @@ +# Service Input Plugin: Tail + +The tail plugin gathers metrics by reading a log file. +It works like the BSD `tail` command and can keep reading when more logs are added. + +### Configuration: + +```toml +# Read a log file like the BSD tail command +[[inputs.tail]] + ## The measurement name + name_override = "nginx_access" + + ## A LTSV formatted log file path. + ## See http://ltsv.org/ for Labeled Tab-separated Values (LTSV) + ## Here is an example config for nginx (http://nginx.org/en/). 
+ ## + ## log_format ltsv 'time:$time_iso8601\t' + ## 'host:$host\t' + ## 'http_host:$http_host\t' + ## 'scheme:$scheme\t' + ## 'remote_addr:$remote_addr\t' + ## 'remote_user:$remote_user\t' + ## 'request:$request\t' + ## 'status:$status\t' + ## 'body_bytes_sent:$body_bytes_sent\t' + ## 'http_referer:$http_referer\t' + ## 'http_user_agent:$http_user_agent\t' + ## 'http_x_forwarded_for:$http_x_forwarded_for\t' + ## 'request_time:$request_time'; + ## access_log /var/log/nginx/access.ltsv.log ltsv; + ## + filename = "/var/log/nginx/access.ltsv.log" + + ## Seek to this location before tailing + seek_offset = 0 + + ## Seek from whence. See https://golang.org/pkg/os/#File.Seek + seek_whence = 0 + + ## Reopen recreated files (tail -F) + re_open = true + + ## Fail early if the file does not exist + must_exist = false + + ## Poll for file changes instead of using inotify + poll = false + + ## Set this to true if the file is a named pipe (mkfifo) + pipe = false + + ## Continue looking for new lines (tail -f) + follow = true + + ## If non-zero, split longer lines into multiple lines + max_line_size = 0 + + ## Set this false to enable logging to stderr, true to disable logging + disable_logging = false + + ## Data format to consume. Currently only "ltsv" is supported. + ## Each data format has it's own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + data_format = "ltsv" + + ## Time label to be used to create a timestamp for a measurement. + time_label = "time" + + ## Time format for parsing timestamps. + ## Please see https://golang.org/pkg/time/#Parse for the format string. + time_format = "2006-01-02T15:04:05Z07:00" + + ## Labels for string fields. + str_field_labels = [] + + ## Labels for integer (64bit signed decimal integer) fields. 
+ ## For acceptable integer values, please refer to: + ## https://golang.org/pkg/strconv/#ParseInt + int_field_labels = ["body_bytes_sent"] + + ## Labels for float (64bit float) fields. + ## For acceptable float values, please refer to: + ## https://golang.org/pkg/strconv/#ParseFloat + float_field_labels = ["request_time"] + + ## Labels for boolean fields. + ## For acceptable boolean values, please refer to: + ## https://golang.org/pkg/strconv/#ParseBool + bool_field_labels = [] + + ## Labels for tags to be added + tag_labels = ["host", "http_host", "scheme", "remote_addr", "remote_user", "request", "status", "http_referer", "http_user_agent", "http_x_forwarded_for"] + + ## Method to modify duplicated measurement points. + ## Must be one of "add_uniq_tag", "increment_time", "no_op". + ## This will be used to modify duplicated points. + ## For detail, please see https://docs.influxdata.com/influxdb/v0.10/troubleshooting/frequently_encountered_issues/#writing-duplicate-points + ## NOTE: For modifier methods other than "no_op" to work correctly, the log lines + ## MUST be sorted by timestamps in ascending order. + duplicate_points_modifier_method = "add_uniq_tag" + + ## When duplicate_points_modifier_method is "increment_time", + ## this will be added to the time of the previous measurement + ## if the time of current time is equal to or less than the + ## time of the previous measurement. + ## + ## NOTE: You need to set this value equal to or greater than + ## precisions of your output plugins. Otherwise the times will + ## become the same value! 
+ ## For the precision of the InfluxDB plugin, please see + ## https://github.com/influxdata/telegraf/blob/v0.10.1/plugins/outputs/influxdb/influxdb.go#L40-L42 + ## For the duration string format, please see + ## https://golang.org/pkg/time/#ParseDuration + duplicate_points_increment_duration = "1us" + + ## When duplicate_points_modifier_method is "add_uniq_tag", + ## this will be the label of the tag to be added to ensure uniqueness of points. + ## NOTE: The uniq tag will be only added to the successive points of duplicated + ## points, it will not be added to the first point of duplicated points. + ## If you want to always add the uniq tag, add a tag with the same name as + ## duplicate_points_modifier_uniq_tag and the string value "0" to [inputs.tail.tags]. + duplicate_points_modifier_uniq_tag = "uniq" + + ## Default tags to be added to measurements. + [inputs.tail.tags] + log_host = "log.example.com" +``` + +### Tail plugin with LTSV parser + +#### Measurements & Fields: + +- measurement of the name specified in the config `name_override` value +- fields specified in the config `int_field_labels`, `float_field_labels`, `bool_field_labels`, and `str_field_labels` values. + +#### Tags: + +- tags specified in the config `inputs.tail.tags`, `duplicate_points_modifier_uniq_tag`, `tag_labels` values. + +#### Example Output: + +This is an example output with `duplicate_points_modifier_method = "add_uniq_tag"`. 
+ +``` +[root@localhost bin]# sudo -u telegraf ./telegraf -config /etc/telegraf/telegraf.conf -input-filter tail -debug & for i in `seq 1 3`; do curl -s -o /dev/null localhost; done && sleep 1 && for i in `seq 1 2`; do curl -s -o /dv/null localhost; done +[1] 2894 +2016/03/05 19:04:35 Attempting connection to output: influxdb +2016/03/05 19:04:35 Successfully connected to output: influxdb +2016/03/05 19:04:35 Starting Telegraf (version 0.10.4.1-44-ga2a0d51) +2016/03/05 19:04:35 Loaded outputs: influxdb +2016/03/05 19:04:35 Loaded inputs: tail +2016/03/05 19:04:35 Tags enabled: host=localhost.localdomain +2016/03/05 19:04:35 Agent Config: Interval:5s, Debug:true, Quiet:false, Hostname:"localhost.localdomain", Flush Interval:10s +2016/03/05 19:04:35 Started a tail log reader, filename: /var/log/nginx/access.ltsv.log +2016/03/05 19:04:35 Seeked /var/log/nginx/access.ltsv.log - &{Offset:0 Whence:0} +> nginx_access,host=localhost,http_host=localhost,http_referer=-,http_user_agent=curl/7.29.0,http_x_forwarded_for=-,log_host=log.example.com,remote_addr=127.0.0.1,remote_user=-,request=GET\ /\ HTTP/1.1,scheme=http,status=200 body_bytes_sent=612i,request_time=0 1457172275000000000 +> nginx_access,host=localhost,http_host=localhost,http_referer=-,http_user_agent=curl/7.29.0,http_x_forwarded_for=-,log_host=log.example.com,remote_addr=127.0.0.1,remote_user=-,request=GET\ /\ HTTP/1.1,scheme=http,status=200,uniq=1 body_bytes_sent=612i,request_time=0 1457172275000000000 +> nginx_access,host=localhost,http_host=localhost,http_referer=-,http_user_agent=curl/7.29.0,http_x_forwarded_for=-,log_host=log.example.com,remote_addr=127.0.0.1,remote_user=-,request=GET\ /\ HTTP/1.1,scheme=http,status=200,uniq=2 body_bytes_sent=612i,request_time=0 1457172275000000000 +> nginx_access,host=localhost,http_host=localhost,http_referer=-,http_user_agent=curl/7.29.0,http_x_forwarded_for=-,log_host=log.example.com,remote_addr=127.0.0.1,remote_user=-,request=GET\ /\ HTTP/1.1,scheme=http,status=200 
body_bytes_sent=612i,request_time=0 1457172276000000000 +> nginx_access,host=localhost,http_host=localhost,http_referer=-,http_user_agent=curl/7.29.0,http_x_forwarded_for=-,log_host=log.example.com,remote_addr=127.0.0.1,remote_user=-,request=GET\ /\ HTTP/1.1,scheme=http,status=200,uniq=1 body_bytes_sent=612i,request_time=0 1457172276000000000 +2016/03/05 19:04:40 Gathered metrics, (5s interval), from 1 inputs in 23.904µs +``` + + +This is an example output with `duplicate_points_modifier_method = "increment_time"` and `duplicate_points_increment_duration = "1ms"`. + +``` +[root@localhost bin]# sudo -u telegraf ./telegraf -config /etc/telegraf/telegraf.conf -input-filter tail -debug & for i in `seq 1 3`; do curl -s -o /dev/null localhost; done && sleep 1 && for i in `seq 1 2`; do curl -s -o /dv/null localhost; done +[1] 2845 +2016/03/05 19:03:13 Attempting connection to output: influxdb +2016/03/05 19:03:13 Successfully connected to output: influxdb +2016/03/05 19:03:13 Starting Telegraf (version 0.10.4.1-44-ga2a0d51) +2016/03/05 19:03:13 Loaded outputs: influxdb +2016/03/05 19:03:13 Loaded inputs: tail +2016/03/05 19:03:13 Tags enabled: host=localhost.localdomain +2016/03/05 19:03:13 Agent Config: Interval:5s, Debug:true, Quiet:false, Hostname:"localhost.localdomain", Flush Interval:10s +2016/03/05 19:03:13 Started a tail log reader, filename: /var/log/nginx/access.ltsv.log +2016/03/05 19:03:13 Seeked /var/log/nginx/access.ltsv.log - &{Offset:0 Whence:0} +> nginx_access,host=localhost,http_host=localhost,http_referer=-,http_user_agent=curl/7.29.0,http_x_forwarded_for=-,log_host=log.example.com,remote_addr=127.0.0.1,remote_user=-,request=GET\ /\ HTTP/1.1,scheme=http,status=200 body_bytes_sent=612i,request_time=0 1457172193000000000 +> nginx_access,host=localhost,http_host=localhost,http_referer=-,http_user_agent=curl/7.29.0,http_x_forwarded_for=-,log_host=log.example.com,remote_addr=127.0.0.1,remote_user=-,request=GET\ /\ HTTP/1.1,scheme=http,status=200 
body_bytes_sent=612i,request_time=0 1457172193001000000 +> nginx_access,host=localhost,http_host=localhost,http_referer=-,http_user_agent=curl/7.29.0,http_x_forwarded_for=-,log_host=log.example.com,remote_addr=127.0.0.1,remote_user=-,request=GET\ /\ HTTP/1.1,scheme=http,status=200 body_bytes_sent=612i,request_time=0 1457172193002000000 +> nginx_access,host=localhost,http_host=localhost,http_referer=-,http_user_agent=curl/7.29.0,http_x_forwarded_for=-,log_host=log.example.com,remote_addr=127.0.0.1,remote_user=-,request=GET\ /\ HTTP/1.1,scheme=http,status=200 body_bytes_sent=612i,request_time=0 1457172194000000000 +> nginx_access,host=localhost,http_host=localhost,http_referer=-,http_user_agent=curl/7.29.0,http_x_forwarded_for=-,log_host=log.example.com,remote_addr=127.0.0.1,remote_user=-,request=GET\ /\ HTTP/1.1,scheme=http,status=200 body_bytes_sent=612i,request_time=0 1457172194001000000 +2016/03/05 19:03:15 Gathered metrics, (5s interval), from 1 inputs in 52.911µs +``` diff --git a/plugins/inputs/tail/tail.go b/plugins/inputs/tail/tail.go new file mode 100644 index 000000000..ec169c5a8 --- /dev/null +++ b/plugins/inputs/tail/tail.go @@ -0,0 +1,247 @@ +package tail + +import ( + "sync" + + tailfile "github.com/hpcloud/tail" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf/plugins/parsers" +) + +const sampleConfig = ` + ## The measurement name + name_override = "nginx_access" + + ## A LTSV formatted log file path. + ## See http://ltsv.org/ for Labeled Tab-separated Values (LTSV) + ## Here is an example config for nginx (http://nginx.org/en/). 
+ ## + ## log_format ltsv 'time:$time_iso8601\t' + ## 'host:$host\t' + ## 'http_host:$http_host\t' + ## 'scheme:$scheme\t' + ## 'remote_addr:$remote_addr\t' + ## 'remote_user:$remote_user\t' + ## 'request:$request\t' + ## 'status:$status\t' + ## 'body_bytes_sent:$body_bytes_sent\t' + ## 'http_referer:$http_referer\t' + ## 'http_user_agent:$http_user_agent\t' + ## 'http_x_forwarded_for:$http_x_forwarded_for\t' + ## 'request_time:$request_time'; + ## access_log /var/log/nginx/access.ltsv.log ltsv; + ## + filename = "/var/log/nginx/access.ltsv.log" + + ## Seek to this location before tailing + seek_offset = 0 + + ## Seek from whence. See https://golang.org/pkg/os/#File.Seek + seek_whence = 0 + + ## Reopen recreated files (tail -F) + re_open = true + + ## Fail early if the file does not exist + must_exist = false + + ## Poll for file changes instead of using inotify + poll = false + + ## Set this to true if the file is a named pipe (mkfifo) + pipe = false + + ## Continue looking for new lines (tail -f) + follow = true + + ## If non-zero, split longer lines into multiple lines + max_line_size = 0 + + ## Set this false to enable logging to stderr, true to disable logging + disable_logging = false + + ## Data format to consume. Currently only "ltsv" is supported. + ## Each data format has it's own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + data_format = "ltsv" + + ## Time label to be used to create a timestamp for a measurement. + time_label = "time" + + ## Time format for parsing timestamps. + ## Please see https://golang.org/pkg/time/#Parse for the format string. + time_format = "2006-01-02T15:04:05Z07:00" + + ## Labels for string fields. + str_field_labels = [] + + ## Labels for integer (64bit signed decimal integer) fields. 
+ ## For acceptable integer values, please refer to: + ## https://golang.org/pkg/strconv/#ParseInt + int_field_labels = ["body_bytes_sent"] + + ## Labels for float (64bit float) fields. + ## For acceptable float values, please refer to: + ## https://golang.org/pkg/strconv/#ParseFloat + float_field_labels = ["request_time"] + + ## Labels for boolean fields. + ## For acceptable boolean values, please refer to: + ## https://golang.org/pkg/strconv/#ParseBool + bool_field_labels = [] + + ## Labels for tags to be added + tag_labels = ["host", "http_host", "scheme", "remote_addr", "remote_user", "request", "status", "http_referer", "http_user_agent", "http_x_forwarded_for"] + + ## Method to modify duplicated measurement points. + ## Must be one of "add_uniq_tag", "increment_time", "no_op". + ## This will be used to modify duplicated points. + ## For detail, please see https://docs.influxdata.com/influxdb/v0.10/troubleshooting/frequently_encountered_issues/#writing-duplicate-points + ## NOTE: For modifier methods other than "no_op" to work correctly, the log lines + ## MUST be sorted by timestamps in ascending order. + duplicate_points_modifier_method = "add_uniq_tag" + + ## When duplicate_points_modifier_method is "increment_time", + ## this will be added to the time of the previous measurement + ## if the time of current time is equal to or less than the + ## time of the previous measurement. + ## + ## NOTE: You need to set this value equal to or greater than + ## precisions of your output plugins. Otherwise the times will + ## become the same value! 
+ ## For the precision of the InfluxDB plugin, please see + ## https://github.com/influxdata/telegraf/blob/v0.10.1/plugins/outputs/influxdb/influxdb.go#L40-L42 + ## For the duration string format, please see + ## https://golang.org/pkg/time/#ParseDuration + duplicate_points_increment_duration = "1us" + + ## When duplicate_points_modifier_method is "add_uniq_tag", + ## this will be the label of the tag to be added to ensure uniqueness of points. + ## NOTE: The uniq tag will be only added to the successive points of duplicated + ## points, it will not be added to the first point of duplicated points. + ## If you want to always add the uniq tag, add a tag with the same name as + ## duplicate_points_modifier_uniq_tag and the string value "0" to [inputs.tail.tags]. + duplicate_points_modifier_uniq_tag = "uniq" + + ## Defaults tags to be added to measurements. + [inputs.tail.tags] + log_host = "log.example.com" +` + +type Tail struct { + Filename string + + // File-specfic + SeekOffset int64 // Seek to this location before tailing + SeekWhence int // Seek from whence. See https://golang.org/pkg/os/#File.Seek + ReOpen bool // Reopen recreated files (tail -F) + MustExist bool // Fail early if the file does not exist + Poll bool // Poll for file changes instead of using inotify + Pipe bool // Is a named pipe (mkfifo) + // TODO: Add configs for RateLimiter + + // Generic IO + Follow bool // Continue looking for new lines (tail -f) + MaxLineSize int // If non-zero, split longer lines into multiple lines + + DisableLogging bool // If false, logs are printed to stderr + + sync.Mutex + done chan struct{} + + acc telegraf.Accumulator + parser parsers.Parser + tail *tailfile.Tail +} + +func (t *Tail) SampleConfig() string { + return sampleConfig +} + +func (t *Tail) Description() string { + return "Read a log file like the BSD tail command" +} + +func (t *Tail) SetParser(parser parsers.Parser) { + t.parser = parser +} + +// Start a tail log reader. 
Caller must call *Tail.Stop() to clean up. +func (t *Tail) Start(acc telegraf.Accumulator) error { + t.Lock() + defer t.Unlock() + + t.acc = acc + t.done = make(chan struct{}) + + config := tailfile.Config{ + Location: &tailfile.SeekInfo{ + Offset: t.SeekOffset, + Whence: t.SeekWhence, + }, + ReOpen: t.ReOpen, + MustExist: t.MustExist, + Poll: t.Poll, + Pipe: t.Pipe, + Follow: t.Follow, + MaxLineSize: t.MaxLineSize, + } + if t.DisableLogging { + config.Logger = tailfile.DiscardingLogger + } + tf, err := tailfile.TailFile(t.Filename, config) + if err != nil { + return err + } + t.tail = tf + + // Start the log file reader + go t.receiver() + t.tail.Logger.Printf("Started a tail log reader, filename: %s\n", t.Filename) + + return nil +} + +func (t *Tail) receiver() { + for { + for line := range t.tail.Lines { + if err := line.Err; err != nil { + t.tail.Logger.Printf("error while reading from %s, error: %s\n", t.Filename, err.Error()) + } else { + metric, err := t.parser.ParseLine(line.Text) + if err != nil { + t.tail.Logger.Printf("error while parsing from %s, error: %s\n", t.Filename, err.Error()) + } + t.acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time()) + } + + select { + case <-t.done: + t.tail.Done() + return + default: + // Start reading lines again + } + } + } +} + +func (t *Tail) Stop() { + t.Lock() + close(t.done) + t.Unlock() +} + +// All the work is done in the Start() function, so this is just a dummy +// function. 
+func (t *Tail) Gather(_ telegraf.Accumulator) error {
+	return nil
+}
+
+func init() {
+	inputs.Add("tail", func() telegraf.Input {
+		return &Tail{}
+	})
+}
diff --git a/plugins/inputs/tail/tail_test.go b/plugins/inputs/tail/tail_test.go
new file mode 100644
index 000000000..33f01687d
--- /dev/null
+++ b/plugins/inputs/tail/tail_test.go
@@ -0,0 +1,72 @@
+package tail
+
+import (
+	"io/ioutil"
+	"os"
+	"testing"
+	"time"
+
+	"github.com/influxdata/telegraf/plugins/parsers"
+	"github.com/influxdata/telegraf/testutil"
+	"github.com/stretchr/testify/assert"
+)
+
+const sampleLog = "time:2016-03-03T13:58:57+00:00\thost:localhost\thttp_host:localhost\tscheme:http\tremote_addr:127.0.0.1\tremote_user:-\ttime_local:03/Mar/2016:13:58:57\t+0000\trequest:GET / HTTP/1.1\tstatus:200\tbody_bytes_sent:612\thttp_referer:-\thttp_user_agent:curl/7.29.0\thttp_x_forwarded_for:-\trequest_time:0.000\tupstream_response_time:-\tupstream_http_content_type:-\tupstream_status:-\tupstream_cache_status:-\n"
+
+func TestLtsvLogGeneratesMetrics(t *testing.T) {
+	tmpfile, err := ioutil.TempFile("", "access.ltsv.log")
+	assert.NoError(t, err, "failed to create a temporary file")
+	defer os.Remove(tmpfile.Name())
+
+	_, err = tmpfile.WriteString(sampleLog)
+	assert.NoError(t, err, "failed to write logs a temporary file")
+	err = tmpfile.Close()
+	assert.NoError(t, err, "failed to close the temporary log file")
+
+	metricName := "nginx_access"
+	config := &parsers.Config{
+		DataFormat:                     "ltsv",
+		MetricName:                     metricName,
+		TimeLabel:                      "time",
+		TimeFormat:                     "2006-01-02T15:04:05-07:00",
+		IntFieldLabels:                 []string{"body_bytes_sent"},
+		FloatFieldLabels:               []string{"request_time"},
+		BoolFieldLabels:                []string{},
+		StrFieldLabels:                 []string{},
+		TagLabels:                      []string{"host", "http_host", "scheme", "remote_addr", "remote_user", "request", "status", "http_referer", "http_user_agent"},
+		DuplicatePointsModifierMethod:  "add_uniq_tag",
+		DuplicatePointsModifierUniqTag: "uniq",
+	}
+	parser, err := parsers.NewParser(config)
+	assert.NoError(t, err)
+
+	reader := &Tail{
+		Filename:       tmpfile.Name(),
+		ReOpen:         true,
+		Follow:         true,
+		DisableLogging: true,
+		parser:         parser,
+	}
+	var acc testutil.Accumulator
+	assert.NoError(t, reader.Start(&acc))
+	// NOTE: Wait for the tail reader process the log line.
+	time.Sleep(time.Duration(100) * time.Millisecond)
+	reader.Stop()
+
+	fields := map[string]interface{}{
+		"body_bytes_sent": int64(612),
+		"request_time":    0.0,
+	}
+	tags := map[string]string{
+		"host":            "localhost",
+		"http_host":       "localhost",
+		"scheme":          "http",
+		"remote_addr":     "127.0.0.1",
+		"remote_user":     "-",
+		"request":         "GET / HTTP/1.1",
+		"status":          "200",
+		"http_referer":    "-",
+		"http_user_agent": "curl/7.29.0",
+	}
+	acc.AssertContainsTaggedFields(t, metricName, fields, tags)
+}
diff --git a/plugins/parsers/ltsv/parser.go b/plugins/parsers/ltsv/parser.go
new file mode 100644
index 000000000..e12894c47
--- /dev/null
+++ b/plugins/parsers/ltsv/parser.go
@@ -0,0 +1,259 @@
+package ltsv
+
+import (
+	"bytes"
+	"fmt"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/influxdata/telegraf"
+)
+
+// LTSVParser converts Labeled Tab-separated Values (LTSV) lines into metrics.
+// The exported fields select which labels become the timestamp, the typed
+// fields and the tags; the unexported state is built on the first ParseLine.
+type LTSVParser struct {
+	MetricName                       string
+	TimeLabel                        string
+	TimeFormat                       string
+	StrFieldLabels                   []string
+	IntFieldLabels                   []string
+	FloatFieldLabels                 []string
+	BoolFieldLabels                  []string
+	TagLabels                        []string
+	DefaultTags                      map[string]string
+	DuplicatePointsModifierMethod    string
+	DuplicatePointsIncrementDuration time.Duration
+	DuplicatePointsModifierUniqTag   string
+
+	initialized      bool
+	fieldLabelSet    map[string]string
+	tagLabelSet      map[string]bool
+	dupPointModifier DuplicatePointModifier
+	buf              bytes.Buffer
+}
+
+// Parse splits buf on newlines and parses every complete, non-empty line.
+// A trailing partial line is kept in an internal buffer and completed by the
+// next call; call Parse(nil) to flush a final line that has no newline.
+// On a parse error no metrics are returned and the offending line is
+// discarded.
+func (p *LTSVParser) Parse(buf []byte) ([]telegraf.Metric, error) {
+	metrics := make([]telegraf.Metric, 0)
+	if buf == nil {
+		return p.flushLine(metrics)
+	}
+	for {
+		i := bytes.IndexByte(buf, byte('\n'))
+		if i == -1 {
+			p.buf.Write(buf)
+			break
+		}
+
+		p.buf.Write(buf[:i])
+		var err error
+		if metrics, err = p.flushLine(metrics); err != nil {
+			return nil, err
+		}
+		buf = buf[i+1:]
+	}
+	return metrics, nil
+}
+
+// flushLine parses the buffered line, if any, and appends its metric to
+// metrics. The buffer is emptied before parsing so that a malformed line is
+// neither parsed twice nor glued onto the following line.
+func (p *LTSVParser) flushLine(metrics []telegraf.Metric) ([]telegraf.Metric, error) {
+	if p.buf.Len() == 0 {
+		return metrics, nil
+	}
+	line := p.buf.String()
+	p.buf.Reset()
+	metric, err := p.ParseLine(line)
+	if err != nil {
+		return nil, err
+	}
+	return append(metrics, metric), nil
+}
+
+// ParseLine parses one LTSV line (without the trailing newline) into a
+// metric. Terms whose label is not configured, or that have no ":"
+// separator, are ignored. When the time label is absent the current UTC
+// time is used.
+func (p *LTSVParser) ParseLine(line string) (telegraf.Metric, error) {
+	if !p.initialized {
+		err := p.initialize()
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	var t time.Time
+	timeLabelFound := false
+	fields := make(map[string]interface{})
+	tags := make(map[string]string)
+	for k, v := range p.DefaultTags {
+		tags[k] = v
+	}
+	terms := strings.Split(line, "\t")
+	for _, term := range terms {
+		kv := strings.SplitN(term, ":", 2)
+		if len(kv) != 2 {
+			// Not a "label:value" pair; reading kv[1] would panic.
+			continue
+		}
+		k := kv[0]
+		if k == p.TimeLabel {
+			timeLabelFound = true
+			var err error
+			t, err = time.Parse(p.TimeFormat, kv[1])
+			if err != nil {
+				return nil, err
+			}
+		} else if typ, ok := p.fieldLabelSet[k]; ok {
+			switch typ {
+			case "string":
+				fields[k] = kv[1]
+			case "int":
+				val, err := strconv.ParseInt(kv[1], 10, 64)
+				if err != nil {
+					return nil, err
+				}
+				fields[k] = val
+			case "float":
+				val, err := strconv.ParseFloat(kv[1], 64)
+				if err != nil {
+					return nil, err
+				}
+				fields[k] = val
+			case "boolean":
+				val, err := strconv.ParseBool(kv[1])
+				if err != nil {
+					return nil, err
+				}
+				fields[k] = val
+			}
+		} else if _, ok := p.tagLabelSet[k]; ok {
+			tags[k] = kv[1]
+		}
+	}
+	if !timeLabelFound {
+		t = time.Now().UTC()
+	}
+	p.dupPointModifier.Modify(&t, tags)
+	return telegraf.NewMetric(p.MetricName, tags, fields, t)
+}
+
+// SetDefaultTags replaces the tags that are copied into every parsed metric.
+func (p *LTSVParser) SetDefaultTags(tags map[string]string) {
+	p.DefaultTags = tags
+}
+
+// initialize builds the label lookup tables and the duplicate point modifier.
+// It fails when DuplicatePointsModifierMethod is not a known method.
+func (p *LTSVParser) initialize() error {
+	p.fieldLabelSet = newFieldLabelSet(p.StrFieldLabels, p.IntFieldLabels, p.FloatFieldLabels, p.BoolFieldLabels)
+	p.tagLabelSet = newTagLabelSet(p.TagLabels)
+	dupPointModifier, err := newDupPointModifier(
+		p.DuplicatePointsModifierMethod,
+		p.DuplicatePointsIncrementDuration,
+		p.DuplicatePointsModifierUniqTag)
+	if err != nil {
+		return err
+	}
+	p.dupPointModifier = dupPointModifier
+	p.initialized = true
+	return nil
+}
+
+// newFieldLabelSet maps each field label to its type name. A label listed
+// under several types keeps the last one assigned (string, int, float, boolean).
+func newFieldLabelSet(strFieldLabels, intFieldLabels, floatFieldLabels, boolFieldLabels []string) map[string]string {
+	s := make(map[string]string)
+	for _, label := range strFieldLabels {
+		s[label] = "string"
+	}
+	for _, label := range intFieldLabels {
+		s[label] = "int"
+	}
+	for _, label := range floatFieldLabels {
+		s[label] = "float"
+	}
+	for _, label := range boolFieldLabels {
+		s[label] = "boolean"
+	}
+	return s
+}
+
+// newTagLabelSet returns the set of labels that are stored as tags.
+func newTagLabelSet(labels []string) map[string]bool {
+	s := make(map[string]bool)
+	for _, label := range labels {
+		s[label] = true
+	}
+	return s
+}
+
+// DuplicatePointModifier adjusts the time or tags of a point so that
+// successive points with the same timestamp can be told apart.
+type DuplicatePointModifier interface {
+	Modify(t *time.Time, tags map[string]string)
+}
+
+// newDupPointModifier returns the modifier for method, or an error when
+// method is not one of "add_uniq_tag", "increment_time" or "no_op".
+func newDupPointModifier(method string, incrementDuration time.Duration, uniqTagName string) (DuplicatePointModifier, error) {
+	switch method {
+	case "add_uniq_tag":
+		return &AddTagDupPointModifier{UniqTagName: uniqTagName}, nil
+	case "increment_time":
+		return &IncTimeDupPointModifier{IncrementDuration: incrementDuration}, nil
+	case "no_op":
+		return &NoOpDupPointModifier{}, nil
+	default:
+		return nil, fmt.Errorf("invalid duplicate_points_modifier_method: %s", method)
+	}
+}
+
+// AddTagDupPointModifier tags points that repeat the previous timestamp with
+// a running counter stored under UniqTagName.
+type AddTagDupPointModifier struct {
+	UniqTagName string
+	prevTime    time.Time
+	dupCount    int64
+}
+
+// Modify sets the uniq tag to 1, 2, ... on points whose time equals the
+// previous point's time; a new time resets the counter and adds no tag.
+func (m *AddTagDupPointModifier) Modify(t *time.Time, tags map[string]string) {
+	if t.Equal(m.prevTime) {
+		m.dupCount++
+		tags[m.UniqTagName] = strconv.FormatInt(m.dupCount, 10)
+	} else {
+		m.dupCount = 0
+		m.prevTime = *t
+	}
+}
+
+// IncTimeDupPointModifier moves a point forward in time when it is not later
+// than the previous point.
+type IncTimeDupPointModifier struct {
+	IncrementDuration time.Duration
+	prevTime          time.Time
+}
+
+// Modify rewrites *t to the previous time plus IncrementDuration when *t is
+// not after the previous time, then remembers the resulting time.
+func (m *IncTimeDupPointModifier) Modify(t *time.Time, _ map[string]string) {
+	if !t.After(m.prevTime) {
+		*t = m.prevTime.Add(m.IncrementDuration)
+	}
+	m.prevTime = *t
+}
+
+// NoOpDupPointModifier leaves points untouched.
+type NoOpDupPointModifier struct{}
+
+// Modify does nothing.
+func (n *NoOpDupPointModifier) Modify(_ *time.Time, _ map[string]string) {}
diff
--git a/plugins/parsers/ltsv/parser_test.go b/plugins/parsers/ltsv/parser_test.go
new file mode 100644
index 000000000..6c1541eb5
--- /dev/null
+++ b/plugins/parsers/ltsv/parser_test.go
@@ -0,0 +1,436 @@
+package ltsv
+
+import (
+	"bytes"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+)
+
+const (
+	validLTSV1 = "time:2016-03-06T09:24:12Z\tstr1:value1\tint1:23\tint2:34\tfloat1:1.23\tbool1:true\tbool2:false\tignore_field1:foo\ttag1:tval1\tignore_tag1:bar\ttag2:tval2"
+)
+
+var validLTSV2 = [][]byte{
+	[]byte("time:2016-03-06T09:24:12.012+09:00\tstr1:value1\tint1:23\tint2:34\tfloat1:1.23\tbool1:true\tbool2:fal"),
+	[]byte("se\tignore_field1:foo\ttag1:tval1\tignore_tag1:bar\ttag2:tval2\ntime:2016-03-06T09:24:12.125+09:00\ts"),
+	// NOTE: validLTSV2[2] contains an empty line, and it is safely ignored.
+	[]byte("tr1:value2\ntime:2016-03-06T09:24:13.000+09:00\tstr1:value3\n\ntime:2016-03-06T09:24:15.999+09:00\tst"),
+	// NOTE: validLTSV2[3] does not end with a newline, so you need to call Parse(nil) to parse the rest of data.
+	[]byte("r1:value4"),
+	nil,
+}
+
+var validLTSV3 = []string{
+	"time:2016-03-06T09:24:12.000000000+09:00\tint1:1\ttag1:tval1",
+	"time:2016-03-06T09:24:12.000000000+09:00\tint1:2\ttag1:tval1",
+	"time:2016-03-06T09:24:12.000000000+09:00\tint1:3\ttag1:tval1",
+	"time:2016-03-06T09:24:12.000000002+09:00\tint1:4\ttag1:tval1",
+}
+
+func TestParseLineValidLTSV(t *testing.T) {
+	parser := LTSVParser{
+		MetricName:                    "ltsv_test",
+		TimeLabel:                     "time",
+		TimeFormat:                    "2006-01-02T15:04:05Z07:00",
+		StrFieldLabels:                []string{"str1"},
+		IntFieldLabels:                []string{"int1", "int2"},
+		FloatFieldLabels:              []string{"float1"},
+		BoolFieldLabels:               []string{"bool1", "bool2", "bool3", "bool4"},
+		TagLabels:                     []string{"tag1", "tag2"},
+		DuplicatePointsModifierMethod: "no_op",
+		DefaultTags: map[string]string{
+			"log_host": "log.example.com",
+		},
+	}
+	metric, err := parser.ParseLine(validLTSV1)
+	assert.NoError(t, err)
+	assert.NotNil(t, metric)
+	assert.Equal(t, "ltsv_test", metric.Name())
+
+	fields := metric.Fields()
+	assert.Equal(t, map[string]interface{}{
+		"str1":   "value1",
+		"int1":   int64(23),
+		"int2":   int64(34),
+		"float1": float64(1.23),
+		"bool1":  true,
+		"bool2":  false,
+	}, fields)
+	assert.NotContains(t, fields, "ignore_field1")
+	assert.NotContains(t, fields, "ignore_tag1")
+
+	tags := metric.Tags()
+	assert.Equal(t, map[string]string{
+		"log_host": "log.example.com",
+		"tag1":     "tval1",
+		"tag2":     "tval2",
+	}, tags)
+	assert.NotContains(t, tags, "ignore_field1")
+	assert.NotContains(t, tags, "ignore_tag1")
+}
+
+func TestParseValidLTSV(t *testing.T) {
+	parser := LTSVParser{
+		MetricName:                    "ltsv_test",
+		TimeLabel:                     "time",
+		TimeFormat:                    "2006-01-02T15:04:05Z07:00",
+		StrFieldLabels:                []string{"str1"},
+		IntFieldLabels:                []string{"int1", "int2"},
+		FloatFieldLabels:              []string{"float1"},
+		BoolFieldLabels:               []string{"bool1", "bool2", "bool3", "bool4"},
+		TagLabels:                     []string{"tag1", "tag2"},
+		DuplicatePointsModifierMethod: "no_op",
+		DefaultTags: map[string]string{
+			"log_host": "log.example.com",
+		},
+	}
+	metrics, err := parser.Parse(validLTSV2[0])
+	assert.NoError(t, err)
+	assert.Len(t, metrics, 0)
+
+	metrics, err = parser.Parse(validLTSV2[1])
+	assert.NoError(t, err)
+	assert.Len(t, metrics, 1)
+	assert.Equal(t, "ltsv_test", metrics[0].Name())
+
+	fields := metrics[0].Fields()
+	assert.Equal(t, map[string]interface{}{
+		"str1":   "value1",
+		"int1":   int64(23),
+		"int2":   int64(34),
+		"float1": float64(1.23),
+		"bool1":  true,
+		"bool2":  false,
+	}, fields)
+	assert.NotContains(t, fields, "ignore_field1")
+	assert.NotContains(t, fields, "ignore_tag1")
+
+	tags := metrics[0].Tags()
+	assert.Equal(t, map[string]string{
+		"log_host": "log.example.com",
+		"tag1":     "tval1",
+		"tag2":     "tval2",
+	}, tags)
+	assert.NotContains(t, tags, "ignore_field1")
+	assert.NotContains(t, tags, "ignore_tag1")
+
+	metrics, err = parser.Parse(validLTSV2[2])
+	assert.NoError(t, err)
+	assert.Len(t, metrics, 2)
+	assert.Equal(t, "ltsv_test", metrics[0].Name())
+
+	assert.Equal(t, map[string]interface{}{
+		"str1": "value2",
+	}, metrics[0].Fields())
+	assert.Equal(t, map[string]string{
+		"log_host": "log.example.com",
+	}, metrics[0].Tags())
+
+	assert.Equal(t, map[string]interface{}{
+		"str1": "value3",
+	}, metrics[1].Fields())
+	assert.Equal(t, map[string]string{
+		"log_host": "log.example.com",
+	}, metrics[1].Tags())
+
+	metrics, err = parser.Parse(validLTSV2[3])
+	assert.NoError(t, err)
+	assert.Len(t, metrics, 0)
+
+	metrics, err = parser.Parse(validLTSV2[4])
+	assert.NoError(t, err)
+	assert.Len(t, metrics, 1)
+	assert.Equal(t, "ltsv_test", metrics[0].Name())
+
+	assert.Equal(t, map[string]interface{}{
+		"str1": "value4",
+	}, metrics[0].Fields())
+	assert.Equal(t, map[string]string{
+		"log_host": "log.example.com",
+	}, metrics[0].Tags())
+}
+
+func TestAlwaysAddTagDuplicatePointModifier(t *testing.T) {
+	parser := LTSVParser{
+		MetricName:                     "ltsv_test",
+		TimeLabel:                      "time",
+		TimeFormat:                     "2006-01-02T15:04:05.000000000Z07:00",
+		IntFieldLabels:                 []string{"int1"},
+		TagLabels:                      []string{"tag1"},
+		DuplicatePointsModifierMethod:  "add_uniq_tag",
+		DuplicatePointsModifierUniqTag: "uniq",
+		DefaultTags: map[string]string{
+			"log_host": "log.example.com",
+			"uniq":     "0",
+		},
+	}
+
+	metric, err := parser.ParseLine(validLTSV3[0])
+	assert.NoError(t, err)
+	assert.NotNil(t, metric)
+	assert.Equal(t, "ltsv_test", metric.Name())
+	assert.Equal(t, map[string]interface{}{
+		"int1": int64(1),
+	}, metric.Fields())
+	assert.Equal(t, map[string]string{
+		"log_host": "log.example.com",
+		"tag1":     "tval1",
+		"uniq":     "0",
+	}, metric.Tags())
+	assert.Equal(t, "2016-03-06T09:24:12.000000000+09:00", metric.Time().Format(parser.TimeFormat))
+
+	metric, err = parser.ParseLine(validLTSV3[1])
+	assert.NoError(t, err)
+	assert.NotNil(t, metric)
+	assert.Equal(t, "ltsv_test", metric.Name())
+	assert.Equal(t, map[string]interface{}{
+		"int1": int64(2),
+	}, metric.Fields())
+	assert.Equal(t, map[string]string{
+		"log_host": "log.example.com",
+		"tag1":     "tval1",
+		"uniq":     "1",
+	}, metric.Tags())
+	assert.Equal(t, "2016-03-06T09:24:12.000000000+09:00", metric.Time().Format(parser.TimeFormat))
+
+	metric, err = parser.ParseLine(validLTSV3[2])
+	assert.NoError(t, err)
+	assert.NotNil(t, metric)
+	assert.Equal(t, "ltsv_test", metric.Name())
+	assert.Equal(t, map[string]interface{}{
+		"int1": int64(3),
+	}, metric.Fields())
+	assert.Equal(t, map[string]string{
+		"log_host": "log.example.com",
+		"tag1":     "tval1",
+		"uniq":     "2",
+	}, metric.Tags())
+	assert.Equal(t, "2016-03-06T09:24:12.000000000+09:00", metric.Time().Format(parser.TimeFormat))
+
+	metric, err = parser.ParseLine(validLTSV3[3])
+	assert.NoError(t, err)
+	assert.NotNil(t, metric)
+	assert.Equal(t, "ltsv_test", metric.Name())
+	assert.Equal(t, map[string]interface{}{
+		"int1": int64(4),
+	}, metric.Fields())
+	assert.Equal(t, map[string]string{
+		"log_host": "log.example.com",
+		"tag1":     "tval1",
+		"uniq":     "0",
+	}, metric.Tags())
+	assert.Equal(t, "2016-03-06T09:24:12.000000002+09:00", metric.Time().Format(parser.TimeFormat))
+}
+
+func TestAddTagDuplicatePointModifier(t *testing.T) {
+	parser := LTSVParser{
+		MetricName:                     "ltsv_test",
+		TimeLabel:                      "time",
+		TimeFormat:                     "2006-01-02T15:04:05.000000000Z07:00",
+		IntFieldLabels:                 []string{"int1"},
+		TagLabels:                      []string{"tag1"},
+		DuplicatePointsModifierMethod:  "add_uniq_tag",
+		DuplicatePointsModifierUniqTag: "uniq",
+		DefaultTags: map[string]string{
+			"log_host": "log.example.com",
+		},
+	}
+
+	metric, err := parser.ParseLine(validLTSV3[0])
+	assert.NoError(t, err)
+	assert.NotNil(t, metric)
+	assert.Equal(t, "ltsv_test", metric.Name())
+	assert.Equal(t, map[string]interface{}{
+		"int1": int64(1),
+	}, metric.Fields())
+	assert.Equal(t, map[string]string{
+		"log_host": "log.example.com",
+		"tag1":     "tval1",
+	}, metric.Tags())
+	assert.Equal(t, "2016-03-06T09:24:12.000000000+09:00", metric.Time().Format(parser.TimeFormat))
+
+	metric, err = parser.ParseLine(validLTSV3[1])
+	assert.NoError(t, err)
+	assert.NotNil(t, metric)
+	assert.Equal(t, "ltsv_test", metric.Name())
+	assert.Equal(t, map[string]interface{}{
+		"int1": int64(2),
+	}, metric.Fields())
+	assert.Equal(t, map[string]string{
+		"log_host": "log.example.com",
+		"tag1":     "tval1",
+		"uniq":     "1",
+	}, metric.Tags())
+	assert.Equal(t, "2016-03-06T09:24:12.000000000+09:00", metric.Time().Format(parser.TimeFormat))
+
+	metric, err = parser.ParseLine(validLTSV3[2])
+	assert.NoError(t, err)
+	assert.NotNil(t, metric)
+	assert.Equal(t, "ltsv_test", metric.Name())
+	assert.Equal(t, map[string]interface{}{
+		"int1": int64(3),
+	}, metric.Fields())
+	assert.Equal(t, map[string]string{
+		"log_host": "log.example.com",
+		"tag1":     "tval1",
+		"uniq":     "2",
+	}, metric.Tags())
+	assert.Equal(t, "2016-03-06T09:24:12.000000000+09:00", metric.Time().Format(parser.TimeFormat))
+
+	metric, err = parser.ParseLine(validLTSV3[3])
+	assert.NoError(t, err)
+	assert.NotNil(t, metric)
+	assert.Equal(t, "ltsv_test", metric.Name())
+	assert.Equal(t, map[string]interface{}{
+		"int1": int64(4),
+	}, metric.Fields())
+	assert.Equal(t, map[string]string{
+		"log_host": "log.example.com",
+		"tag1":     "tval1",
+	}, metric.Tags())
+	assert.Equal(t, "2016-03-06T09:24:12.000000002+09:00", metric.Time().Format(parser.TimeFormat))
+}
+
+func TestIncTimeDuplicatePointModifier(t *testing.T) {
+	parser := LTSVParser{
+		MetricName:                       "ltsv_test",
+		TimeLabel:                        "time",
+		TimeFormat:                       "2006-01-02T15:04:05.000000000Z07:00",
+		IntFieldLabels:                   []string{"int1"},
+		TagLabels:                        []string{"tag1"},
+		DuplicatePointsModifierMethod:    "increment_time",
+		DuplicatePointsIncrementDuration: time.Nanosecond,
+		DefaultTags: map[string]string{
+			"log_host": "log.example.com",
+		},
+	}
+
+	metric, err := parser.ParseLine(validLTSV3[0])
+	assert.NoError(t, err)
+	assert.NotNil(t, metric)
+	assert.Equal(t, "ltsv_test", metric.Name())
+	assert.Equal(t, map[string]interface{}{
+		"int1": int64(1),
+	}, metric.Fields())
+	assert.Equal(t, map[string]string{
+		"log_host": "log.example.com",
+		"tag1":     "tval1",
+	}, metric.Tags())
+	assert.Equal(t, "2016-03-06T09:24:12.000000000+09:00", metric.Time().Format(parser.TimeFormat))
+
+	metric, err = parser.ParseLine(validLTSV3[1])
+	assert.NoError(t, err)
+	assert.NotNil(t, metric)
+	assert.Equal(t, "ltsv_test", metric.Name())
+	assert.Equal(t, map[string]interface{}{
+		"int1": int64(2),
+	}, metric.Fields())
+	assert.Equal(t, map[string]string{
+		"log_host": "log.example.com",
+		"tag1":     "tval1",
+	}, metric.Tags())
+	assert.Equal(t, "2016-03-06T09:24:12.000000001+09:00", metric.Time().Format(parser.TimeFormat))
+
+	metric, err = parser.ParseLine(validLTSV3[2])
+	assert.NoError(t, err)
+	assert.NotNil(t, metric)
+	assert.Equal(t, "ltsv_test", metric.Name())
+	assert.Equal(t, map[string]interface{}{
+		"int1": int64(3),
+	}, metric.Fields())
+	assert.Equal(t, map[string]string{
+		"log_host": "log.example.com",
+		"tag1":     "tval1",
+	}, metric.Tags())
+	assert.Equal(t, "2016-03-06T09:24:12.000000002+09:00", metric.Time().Format(parser.TimeFormat))
+
+	metric, err = parser.ParseLine(validLTSV3[3])
+	assert.NoError(t, err)
+	assert.NotNil(t, metric)
+	assert.Equal(t, "ltsv_test", metric.Name())
+	assert.Equal(t, map[string]interface{}{
+		"int1": int64(4),
+	}, metric.Fields())
+	assert.Equal(t, map[string]string{
+		"log_host": "log.example.com",
+		"tag1":     "tval1",
+	}, metric.Tags())
+	assert.Equal(t, "2016-03-06T09:24:12.000000003+09:00", metric.Time().Format(parser.TimeFormat))
+}
+
+func TestNoOpDuplicatePointModifier(t *testing.T) {
+	parser := LTSVParser{
+		MetricName:                    "ltsv_test",
+		TimeLabel:                     "time",
+		TimeFormat:                    "2006-01-02T15:04:05.000000000Z07:00",
+		IntFieldLabels:                []string{"int1"},
+		TagLabels:                     []string{"tag1"},
+		DuplicatePointsModifierMethod: "no_op",
+		DefaultTags: map[string]string{
+			"log_host": "log.example.com",
+		},
+	}
+
+	var buf bytes.Buffer
+	for _, line := range validLTSV3 {
+		buf.WriteString(line)
+		buf.WriteByte(byte('\n'))
+	}
+
+	metrics, err := parser.Parse(buf.Bytes())
+	assert.NoError(t, err)
+	// NOTE: Even though 4 metrics are created here, 3 of these will be merged on
+	// a InfluxDB database.
+	assert.Len(t, metrics, 4)
+
+	assert.Equal(t, "ltsv_test", metrics[0].Name())
+	assert.Equal(t, map[string]interface{}{
+		"int1": int64(1),
+	}, metrics[0].Fields())
+	assert.Equal(t, map[string]string{
+		"log_host": "log.example.com",
+		"tag1":     "tval1",
+	}, metrics[0].Tags())
+	assert.Equal(t, "2016-03-06T09:24:12.000000000+09:00", metrics[0].Time().Format(parser.TimeFormat))
+
+	assert.Equal(t, "ltsv_test", metrics[1].Name())
+	assert.Equal(t, map[string]interface{}{
+		"int1": int64(2),
+	}, metrics[1].Fields())
+	assert.Equal(t, map[string]string{
+		"log_host": "log.example.com",
+		"tag1":     "tval1",
+	}, metrics[1].Tags())
+	assert.Equal(t, "2016-03-06T09:24:12.000000000+09:00", metrics[1].Time().Format(parser.TimeFormat))
+
+	assert.Equal(t, "ltsv_test", metrics[2].Name())
+	assert.Equal(t, map[string]interface{}{
+		"int1": int64(3),
+	}, metrics[2].Fields())
+	assert.Equal(t, map[string]string{
+		"log_host": "log.example.com",
+		"tag1":     "tval1",
+	}, metrics[2].Tags())
+	assert.Equal(t, "2016-03-06T09:24:12.000000000+09:00", metrics[2].Time().Format(parser.TimeFormat))
+
+	assert.Equal(t, "ltsv_test", metrics[3].Name())
+	assert.Equal(t, map[string]interface{}{
+		"int1": int64(4),
+	}, metrics[3].Fields())
+	assert.Equal(t, map[string]string{
+		"log_host": "log.example.com",
+		"tag1":     "tval1",
+	}, metrics[3].Tags())
+	assert.Equal(t, "2016-03-06T09:24:12.000000002+09:00", metrics[3].Time().Format(parser.TimeFormat))
+}
+
+func TestInvalidDuplicatePointsModifierMethod(t *testing.T) {
+	parser := LTSVParser{
+		DuplicatePointsModifierMethod: "",
+	}
+	metric, err := parser.ParseLine("")
+	assert.Error(t, err)
+	assert.Nil(t, metric)
+}
diff --git a/plugins/parsers/registry.go b/plugins/parsers/registry.go
index 982b6bb80..cdab87699 100644
--- a/plugins/parsers/registry.go
+++ b/plugins/parsers/registry.go
@@ -2,12 +2,14 @@ package parsers
 
 import (
 	"fmt"
+	"time"
 
 	"github.com/influxdata/telegraf"
 
 	"github.com/influxdata/telegraf/plugins/parsers/graphite"
"github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/plugins/parsers/json" + "github.com/influxdata/telegraf/plugins/parsers/ltsv" ) // ParserInput is an interface for input plugins that are able to parse @@ -22,6 +24,9 @@ type Parser interface { // Parse takes a byte buffer separated by newlines // ie, `cpu.usage.idle 90\ncpu.usage.busy 10` // and parses it into telegraf metrics + // + // NOTE: For the LTSV parser, you need to call an additional `Parse(nil)` + // if the last data does not end with the newline `\n`. Parse(buf []byte) ([]telegraf.Metric, error) // ParseLine takes a single string metric @@ -38,7 +43,7 @@ type Parser interface { // Config is a struct that covers the data types needed for all parser types, // and can be used to instantiate _any_ of the parsers. type Config struct { - // Dataformat can be one of: json, influx, graphite + // Dataformat can be one of: json, influx, graphite, ltsv DataFormat string // Separator only applied to Graphite data. @@ -48,9 +53,53 @@ type Config struct { // TagKeys only apply to JSON data TagKeys []string - // MetricName only applies to JSON data. This will be the name of the measurement. + // MetricName only applies to JSON data and LTSV data. This will be the name of the measurement. MetricName string + // TimeLabel only applies to LTSV data. This will be the label of the timestamp. + // If this label is not found in the measurement, the current time will be used. + TimeLabel string + // TimeFormat only applies to LTSV data. This will be the format of the timestamp. + // Please see https://golang.org/pkg/time/#Parse for the format string. + TimeFormat string + // StrFieldLabels only applies to LTSV data. This will be the labels of string fields. + StrFieldLabels []string + // IntFieldLabels only applies to LTSV data. This will be the labels of integer fields. + IntFieldLabels []string + // FloatFieldLabels only applies to LTSV data. This will be the labels of float fields. 
+ FloatFieldLabels []string + // BoolFieldLabels only applies to LTSV data. This will be the labels of boolean fields. + BoolFieldLabels []string + // TagLabels only applies to LTSV data. This will be the labels of tags. + TagLabels []string + // DuplicatePointsModifierMethod only applies to LTSV data. + // Must be one of "add_uniq_tag", "increment_time", "no_op". + // This will be used to modify duplicated points. + // For details, please see https://docs.influxdata.com/influxdb/v0.10/troubleshooting/frequently_encountered_issues/#writing-duplicate-points + // NOTE: For modifier methods other than "no_op" to work correctly, the log lines + // MUST be sorted by timestamps in ascending order. + DuplicatePointsModifierMethod string + // DuplicatePointsIncrementDuration only applies to LTSV data. + // When duplicate_points_modifier_method is "increment_time", + // this will be added to the time of the previous measurement + // if the time of the current measurement is equal to or less than the + // time of the previous measurement. + // + // NOTE: You need to set this value equal to or greater than + // precisions of your output plugins. Otherwise the times will + // become the same value! + // For the precision of the InfluxDB plugin, please see + // https://github.com/influxdata/telegraf/blob/v0.10.1/plugins/outputs/influxdb/influxdb.go#L40-L42 + DuplicatePointsIncrementDuration time.Duration + // DuplicatePointsModifierUniqTag only applies to LTSV data. + // When DuplicatePointsModifierMethod is "add_uniq_tag", + // this will be the label of the tag to be added to ensure uniqueness of points. + // NOTE: The uniq tag will be only added to the successive points of duplicated + // points, it will not be added to the first point of duplicated points. + // If you want to always add the uniq tag, add a tag with the same name as + // DuplicatePointsModifierUniqTag and the string value "0" to DefaultTags.
+ DuplicatePointsModifierUniqTag string + // DefaultTags are the default tags that will be added to all parsed metrics. DefaultTags map[string]string } @@ -68,6 +117,21 @@ func NewParser(config *Config) (Parser, error) { case "graphite": parser, err = NewGraphiteParser(config.Separator, config.Templates, config.DefaultTags) + case "ltsv": + parser, err = NewLTSVParser( + config.MetricName, + config.TimeLabel, + config.TimeFormat, + config.StrFieldLabels, + config.IntFieldLabels, + config.FloatFieldLabels, + config.BoolFieldLabels, + config.TagLabels, + config.DuplicatePointsModifierMethod, + config.DuplicatePointsIncrementDuration, + config.DuplicatePointsModifierUniqTag, + config.DefaultTags, + ) default: err = fmt.Errorf("Invalid data format: %s", config.DataFormat) } @@ -98,3 +162,34 @@ func NewGraphiteParser( ) (Parser, error) { return graphite.NewGraphiteParser(separator, templates, defaultTags) } + +func NewLTSVParser( + metricName string, + timeLabel string, + timeFormat string, + strFieldLabels []string, + intFieldLabels []string, + floatFieldLabels []string, + boolFieldLabels []string, + tagLabels []string, + duplicatePointsModifierMethod string, + duplicatePointsIncrementDuration time.Duration, + duplicatePointsModifierUniqTag string, + defaultTags map[string]string, +) (Parser, error) { + parser := &ltsv.LTSVParser{ + MetricName: metricName, + TimeLabel: timeLabel, + TimeFormat: timeFormat, + StrFieldLabels: strFieldLabels, + IntFieldLabels: intFieldLabels, + FloatFieldLabels: floatFieldLabels, + BoolFieldLabels: boolFieldLabels, + TagLabels: tagLabels, + DuplicatePointsModifierMethod: duplicatePointsModifierMethod, + DuplicatePointsIncrementDuration: duplicatePointsIncrementDuration, + DuplicatePointsModifierUniqTag: duplicatePointsModifierUniqTag, + DefaultTags: defaultTags, + } + return parser, nil +}