Add Tail service input plugin and LTSV parser.

This commit is contained in:
Hiroaki Nakamura 2016-03-01 19:34:41 +09:00
parent bd3d0c330f
commit b99298f851
12 changed files with 1529 additions and 2 deletions

3
Godeps
View File

@ -18,6 +18,7 @@ github.com/gonuts/go-shellquote e842a11b24c6abfb3dd27af69a17f482e4b483c2
github.com/gorilla/context 1ea25387ff6f684839d82767c1733ff4d4d15d0a
github.com/gorilla/mux c9e326e2bdec29039a3761c07bece13133863e1e
github.com/hailocab/go-hostpool e80d13ce29ede4452c43dea11e79b9bc8a15b478
github.com/hpcloud/tail 1a0242e795eeefe54261ff308dc685f7d29cc58c
github.com/influxdata/config b79f6829346b8d6e78ba73544b1e1038f1f1c9da
github.com/influxdata/influxdb e3fef5593c21644f2b43af55d6e17e70910b0e48
github.com/influxdata/toml af4df43894b16e3fd2b788d01bd27ad0776ef2d0
@ -50,5 +51,7 @@ golang.org/x/net 6acef71eb69611914f7a30939ea9f6e194c78172
golang.org/x/text a71fd10341b064c10f4a81ceac72bcf70f26ea34
gopkg.in/dancannon/gorethink.v1 7d1af5be49cb5ecc7b177bf387d232050299d6ef
gopkg.in/fatih/pool.v2 cba550ebf9bce999a02e963296d4bc7a486cb715
gopkg.in/fsnotify.v1 8611c35ab31c1c28aa903d33cf8b6e44a399b09e
gopkg.in/tomb.v1 dd632973f1e7218eb1089048e0798ec9ae7dceb8
gopkg.in/mgo.v2 d90005c5262a3463800497ea5a89aed5fe22c886
gopkg.in/yaml.v2 a83829b6f1293c91addabc89d0571c246397bbf4

View File

@ -20,6 +20,7 @@ github.com/gonuts/go-shellquote e842a11b24c6abfb3dd27af69a17f482e4b483c2
github.com/gorilla/context 1c83b3eabd45b6d76072b66b746c20815fb2872d
github.com/gorilla/mux 26a6070f849969ba72b72256e9f14cf519751690
github.com/hailocab/go-hostpool e80d13ce29ede4452c43dea11e79b9bc8a15b478
github.com/hpcloud/tail 1a0242e795eeefe54261ff308dc685f7d29cc58c
github.com/influxdata/config bae7cb98197d842374d3b8403905924094930f24
github.com/influxdata/influxdb ef571fc104dc24b77cd3710c156cd95e5cfd7aa5
github.com/jmespath/go-jmespath c01cf91b011868172fdcd9f41838e80c9d716264
@ -52,5 +53,7 @@ golang.org/x/net 04b9de9b512f58addf28c9853d50ebef61c3953e
golang.org/x/text 6d3c22c4525a4da167968fa2479be5524d2e8bd0
gopkg.in/dancannon/gorethink.v1 6f088135ff288deb9d5546f4c71919207f891a70
gopkg.in/fatih/pool.v2 cba550ebf9bce999a02e963296d4bc7a486cb715
gopkg.in/fsnotify.v1 8611c35ab31c1c28aa903d33cf8b6e44a399b09e
gopkg.in/tomb.v1 dd632973f1e7218eb1089048e0798ec9ae7dceb8
gopkg.in/mgo.v2 03c9f3ee4c14c8e51ee521a6a7d0425658dd6f64
gopkg.in/yaml.v2 f7716cbe52baa25d2e9b0d0da546fcf909fc16b4

View File

@ -272,3 +272,156 @@ There are many more options available,
"measurement*"
]
```
## LTSV:
The [Labeled Tab-separated Values (LTSV)](http://ltsv.org/) data format translates an LTSV line into a measurement with a _timestamp_, _fields_ and _tags_. For example, this line:
```
time:2016-03-06T09:24:12Z\tstr1:value1\tint1:23\tint2:34\tfloat1:1.23\tbool1:true\tbool2:false\tignore_field1:foo\ttag1:tval1\tignore_tag1:bar\ttag2:tval2
```
Would be translated into the _timestamp_, _fields_ and _tags_ of a measurement using the example configuration in the following section:
```
ltsv_example str1=value1,int1=23i,int2=34i,float1=1.23,bool1=true,bool2=false tag1=tval1,tag2=tval2,log_host=log.example.com 2016-03-06T09:24:12Z
```
### LTSV Configuration:
The LTSV data format supports the following configuration options.
- metric_name
- time_label
- time_format
- str_field_labels
- int_field_labels
- float_field_labels
- bool_field_labels
- tag_labels
- duplicate_points_modifier_method
- duplicate_points_modifier_uniq_tag
For details, please see the comments in the following configuration example.
```toml
[[inputs.tail]]
## The measurement name
name_override = "nginx_access"
## A LTSV formatted log file path.
## See http://ltsv.org/ for Labeled Tab-separated Values (LTSV)
## Here is an example config for nginx (http://nginx.org/en/).
##
## log_format ltsv 'time:$time_iso8601\t'
## 'host:$host\t'
## 'http_host:$http_host\t'
## 'scheme:$scheme\t'
## 'remote_addr:$remote_addr\t'
## 'remote_user:$remote_user\t'
## 'request:$request\t'
## 'status:$status\t'
## 'body_bytes_sent:$body_bytes_sent\t'
## 'http_referer:$http_referer\t'
## 'http_user_agent:$http_user_agent\t'
## 'http_x_forwarded_for:$http_x_forwarded_for\t'
## 'request_time:$request_time';
## access_log /var/log/nginx/access.ltsv.log ltsv;
##
filename = "/var/log/nginx/access.ltsv.log"
## Seek to this location before tailing
seek_offset = 0
## Seek from whence. See https://golang.org/pkg/os/#File.Seek
seek_whence = 0
## Reopen recreated files (tail -F)
re_open = true
## Fail early if the file does not exist
must_exist = false
## Poll for file changes instead of using inotify
poll = false
## Set this to true if the file is a named pipe (mkfifo)
pipe = false
## Continue looking for new lines (tail -f)
follow = true
## If non-zero, split longer lines into multiple lines
max_line_size = 0
## Set this false to enable logging to stderr, true to disable logging
disable_logging = false
## Data format to consume. Currently only "ltsv" is supported.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "ltsv"
## Time label to be used to create a timestamp for a measurement.
time_label = "time"
## Time format for parsing timestamps.
## Please see https://golang.org/pkg/time/#Parse for the format string.
time_format = "2006-01-02T15:04:05Z07:00"
## Labels for string fields.
str_field_labels = ["str1"]
## Labels for integer (64bit signed decimal integer) fields.
## For acceptable integer values, please refer to:
## https://golang.org/pkg/strconv/#ParseInt
int_field_labels = ["int1", "int2"]
## Labels for float (64bit float) fields.
## For acceptable float values, please refer to:
## https://golang.org/pkg/strconv/#ParseFloat
float_field_labels = ["float1"]
## Labels for boolean fields.
## For acceptable boolean values, please refer to:
## https://golang.org/pkg/strconv/#ParseBool
bool_field_labels = ["bool1", "bool2"]
## Labels for tags to be added
tag_labels = ["tag1", "tag2"]
## Method to modify duplicated measurement points.
## Must be one of "add_uniq_tag", "increment_time", "no_op".
## This will be used to modify duplicated points.
## For detail, please see https://docs.influxdata.com/influxdb/v0.10/troubleshooting/frequently_encountered_issues/#writing-duplicate-points
## NOTE: For modifier methods other than "no_op" to work correctly, the log lines
## MUST be sorted by timestamps in ascending order.
duplicate_points_modifier_method = "add_uniq_tag"
## When duplicate_points_modifier_method is "increment_time",
## this will be added to the time of the previous measurement
## if the time of current time is equal to or less than the
## time of the previous measurement.
##
## NOTE: You need to set this value equal to or greater than
## precisions of your output plugins. Otherwise the times will
## become the same value!
## For the precision of the InfluxDB plugin, please see
## https://github.com/influxdata/telegraf/blob/v0.10.1/plugins/outputs/influxdb/influxdb.go#L40-L42
## For the duration string format, please see
## https://golang.org/pkg/time/#ParseDuration
duplicate_points_increment_duration = "1us"
## When duplicate_points_modifier_method is "add_uniq_tag",
## this will be the label of the tag to be added to ensure uniqueness of points.
## NOTE: The uniq tag will be only added to the successive points of duplicated
## points, it will not be added to the first point of duplicated points.
## If you want to always add the uniq tag, add a tag with the same name as
## duplicate_points_modifier_uniq_tag and the string value "0" to [inputs.tail.tags].
duplicate_points_modifier_uniq_tag = "uniq"
## Default tags to be added to measurements.
[inputs.tail.tags]
log_host = "log.example.com"
```

View File

@ -16,6 +16,7 @@
- github.com/hashicorp/go-msgpack [BSD LICENSE](https://github.com/hashicorp/go-msgpack/blob/master/LICENSE)
- github.com/hashicorp/raft [MPL LICENSE](https://github.com/hashicorp/raft/blob/master/LICENSE)
- github.com/hashicorp/raft-boltdb [MPL LICENSE](https://github.com/hashicorp/raft-boltdb/blob/master/LICENSE)
- github.com/hpcloud/tail [MIT LICENSE](https://github.com/hpcloud/tail/blob/master/LICENSE.txt)
- github.com/lib/pq [MIT LICENSE](https://github.com/lib/pq/blob/master/LICENSE.md)
- github.com/matttproud/golang_protobuf_extensions [APACHE LICENSE](https://github.com/matttproud/golang_protobuf_extensions/blob/master/LICENSE)
- github.com/naoina/go-stringutil [MIT LICENSE](https://github.com/naoina/go-stringutil/blob/master/LICENSE)

View File

@ -701,12 +701,127 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
}
}
if node, ok := tbl.Fields["time_label"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.TimeLabel = str.Value
}
}
}
if node, ok := tbl.Fields["time_format"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.TimeFormat = str.Value
}
}
}
if node, ok := tbl.Fields["str_field_labels"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
c.StrFieldLabels = append(c.StrFieldLabels, str.Value)
}
}
}
}
}
if node, ok := tbl.Fields["int_field_labels"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
c.IntFieldLabels = append(c.IntFieldLabels, str.Value)
}
}
}
}
}
if node, ok := tbl.Fields["float_field_labels"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
c.FloatFieldLabels = append(c.FloatFieldLabels, str.Value)
}
}
}
}
}
if node, ok := tbl.Fields["bool_field_labels"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
c.BoolFieldLabels = append(c.BoolFieldLabels, str.Value)
}
}
}
}
}
if node, ok := tbl.Fields["tag_labels"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
c.TagLabels = append(c.TagLabels, str.Value)
}
}
}
}
}
if node, ok := tbl.Fields["duplicate_points_modifier_method"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.DuplicatePointsModifierMethod = str.Value
}
}
}
if node, ok := tbl.Fields["duplicate_points_increment_duration"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
dur, err := time.ParseDuration(str.Value)
if err != nil {
return nil, err
}
c.DuplicatePointsIncrementDuration = dur
}
}
}
if node, ok := tbl.Fields["duplicate_points_modifier_uniq_tag"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.DuplicatePointsModifierUniqTag = str.Value
}
}
}
c.MetricName = name
delete(tbl.Fields, "data_format")
delete(tbl.Fields, "separator")
delete(tbl.Fields, "templates")
delete(tbl.Fields, "tag_keys")
delete(tbl.Fields, "time_label")
delete(tbl.Fields, "time_format")
delete(tbl.Fields, "str_field_labels")
delete(tbl.Fields, "int_field_labels")
delete(tbl.Fields, "float_field_labels")
delete(tbl.Fields, "bool_field_labels")
delete(tbl.Fields, "tag_labels")
delete(tbl.Fields, "duplicate_points_modifier_method")
delete(tbl.Fields, "duplicate_points_increment_duration")
delete(tbl.Fields, "duplicate_points_modifier_uniq_tag")
return parsers.NewParser(c)
}

View File

@ -47,6 +47,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/sqlserver"
_ "github.com/influxdata/telegraf/plugins/inputs/statsd"
_ "github.com/influxdata/telegraf/plugins/inputs/system"
_ "github.com/influxdata/telegraf/plugins/inputs/tail"
_ "github.com/influxdata/telegraf/plugins/inputs/tcp_listener"
_ "github.com/influxdata/telegraf/plugins/inputs/trig"
_ "github.com/influxdata/telegraf/plugins/inputs/twemproxy"

View File

@ -0,0 +1,187 @@
# Service Input Plugin: Tail
The tail plugin gathers metrics by reading a log file.
It works like the BSD `tail` command and can keep reading when more logs are added.
### Configuration:
```toml
# Read a log file like the BSD tail command
[[inputs.tail]]
## The measurement name
name_override = "nginx_access"
## A LTSV formatted log file path.
## See http://ltsv.org/ for Labeled Tab-separated Values (LTSV)
## Here is an example config for nginx (http://nginx.org/en/).
##
## log_format ltsv 'time:$time_iso8601\t'
## 'host:$host\t'
## 'http_host:$http_host\t'
## 'scheme:$scheme\t'
## 'remote_addr:$remote_addr\t'
## 'remote_user:$remote_user\t'
## 'request:$request\t'
## 'status:$status\t'
## 'body_bytes_sent:$body_bytes_sent\t'
## 'http_referer:$http_referer\t'
## 'http_user_agent:$http_user_agent\t'
## 'http_x_forwarded_for:$http_x_forwarded_for\t'
## 'request_time:$request_time';
## access_log /var/log/nginx/access.ltsv.log ltsv;
##
filename = "/var/log/nginx/access.ltsv.log"
## Seek to this location before tailing
seek_offset = 0
## Seek from whence. See https://golang.org/pkg/os/#File.Seek
seek_whence = 0
## Reopen recreated files (tail -F)
re_open = true
## Fail early if the file does not exist
must_exist = false
## Poll for file changes instead of using inotify
poll = false
## Set this to true if the file is a named pipe (mkfifo)
pipe = false
## Continue looking for new lines (tail -f)
follow = true
## If non-zero, split longer lines into multiple lines
max_line_size = 0
## Set this false to enable logging to stderr, true to disable logging
disable_logging = false
## Data format to consume. Currently only "ltsv" is supported.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "ltsv"
## Time label to be used to create a timestamp for a measurement.
time_label = "time"
## Time format for parsing timestamps.
## Please see https://golang.org/pkg/time/#Parse for the format string.
time_format = "2006-01-02T15:04:05Z07:00"
## Labels for string fields.
str_field_labels = []
## Labels for integer (64bit signed decimal integer) fields.
## For acceptable integer values, please refer to:
## https://golang.org/pkg/strconv/#ParseInt
int_field_labels = ["body_bytes_sent"]
## Labels for float (64bit float) fields.
## For acceptable float values, please refer to:
## https://golang.org/pkg/strconv/#ParseFloat
float_field_labels = ["request_time"]
## Labels for boolean fields.
## For acceptable boolean values, please refer to:
## https://golang.org/pkg/strconv/#ParseBool
bool_field_labels = []
## Labels for tags to be added
tag_labels = ["host", "http_host", "scheme", "remote_addr", "remote_user", "request", "status", "http_referer", "http_user_agent", "http_x_forwarded_for"]
## Method to modify duplicated measurement points.
## Must be one of "add_uniq_tag", "increment_time", "no_op".
## This will be used to modify duplicated points.
## For detail, please see https://docs.influxdata.com/influxdb/v0.10/troubleshooting/frequently_encountered_issues/#writing-duplicate-points
## NOTE: For modifier methods other than "no_op" to work correctly, the log lines
## MUST be sorted by timestamps in ascending order.
duplicate_points_modifier_method = "add_uniq_tag"
## When duplicate_points_modifier_method is "increment_time",
## this will be added to the time of the previous measurement
## if the time of current time is equal to or less than the
## time of the previous measurement.
##
## NOTE: You need to set this value equal to or greater than
## precisions of your output plugins. Otherwise the times will
## become the same value!
## For the precision of the InfluxDB plugin, please see
## https://github.com/influxdata/telegraf/blob/v0.10.1/plugins/outputs/influxdb/influxdb.go#L40-L42
## For the duration string format, please see
## https://golang.org/pkg/time/#ParseDuration
duplicate_points_increment_duration = "1us"
## When duplicate_points_modifier_method is "add_uniq_tag",
## this will be the label of the tag to be added to ensure uniqueness of points.
## NOTE: The uniq tag will be only added to the successive points of duplicated
## points, it will not be added to the first point of duplicated points.
## If you want to always add the uniq tag, add a tag with the same name as
## duplicate_points_modifier_uniq_tag and the string value "0" to [inputs.tail.tags].
duplicate_points_modifier_uniq_tag = "uniq"
## Default tags to be added to measurements.
[inputs.tail.tags]
log_host = "log.example.com"
```
### Tail plugin with LTSV parser
#### Measurements & Fields:
- a measurement with the name specified by the config `name_override` value
- fields specified in the config `int_field_labels`, `float_field_labels`, `bool_field_labels`, and `str_field_labels` values.
#### Tags:
- tags specified in the config `inputs.tail.tags`, `duplicate_points_modifier_uniq_tag`, `tag_labels` values.
#### Example Output:
This is an example output with `duplicate_points_modifier_method = "add_uniq_tag"`.
```
[root@localhost bin]# sudo -u telegraf ./telegraf -config /etc/telegraf/telegraf.conf -input-filter tail -debug & for i in `seq 1 3`; do curl -s -o /dev/null localhost; done && sleep 1 && for i in `seq 1 2`; do curl -s -o /dev/null localhost; done
[1] 2894
2016/03/05 19:04:35 Attempting connection to output: influxdb
2016/03/05 19:04:35 Successfully connected to output: influxdb
2016/03/05 19:04:35 Starting Telegraf (version 0.10.4.1-44-ga2a0d51)
2016/03/05 19:04:35 Loaded outputs: influxdb
2016/03/05 19:04:35 Loaded inputs: tail
2016/03/05 19:04:35 Tags enabled: host=localhost.localdomain
2016/03/05 19:04:35 Agent Config: Interval:5s, Debug:true, Quiet:false, Hostname:"localhost.localdomain", Flush Interval:10s
2016/03/05 19:04:35 Started a tail log reader, filename: /var/log/nginx/access.ltsv.log
2016/03/05 19:04:35 Seeked /var/log/nginx/access.ltsv.log - &{Offset:0 Whence:0}
> nginx_access,host=localhost,http_host=localhost,http_referer=-,http_user_agent=curl/7.29.0,http_x_forwarded_for=-,log_host=log.example.com,remote_addr=127.0.0.1,remote_user=-,request=GET\ /\ HTTP/1.1,scheme=http,status=200 body_bytes_sent=612i,request_time=0 1457172275000000000
> nginx_access,host=localhost,http_host=localhost,http_referer=-,http_user_agent=curl/7.29.0,http_x_forwarded_for=-,log_host=log.example.com,remote_addr=127.0.0.1,remote_user=-,request=GET\ /\ HTTP/1.1,scheme=http,status=200,uniq=1 body_bytes_sent=612i,request_time=0 1457172275000000000
> nginx_access,host=localhost,http_host=localhost,http_referer=-,http_user_agent=curl/7.29.0,http_x_forwarded_for=-,log_host=log.example.com,remote_addr=127.0.0.1,remote_user=-,request=GET\ /\ HTTP/1.1,scheme=http,status=200,uniq=2 body_bytes_sent=612i,request_time=0 1457172275000000000
> nginx_access,host=localhost,http_host=localhost,http_referer=-,http_user_agent=curl/7.29.0,http_x_forwarded_for=-,log_host=log.example.com,remote_addr=127.0.0.1,remote_user=-,request=GET\ /\ HTTP/1.1,scheme=http,status=200 body_bytes_sent=612i,request_time=0 1457172276000000000
> nginx_access,host=localhost,http_host=localhost,http_referer=-,http_user_agent=curl/7.29.0,http_x_forwarded_for=-,log_host=log.example.com,remote_addr=127.0.0.1,remote_user=-,request=GET\ /\ HTTP/1.1,scheme=http,status=200,uniq=1 body_bytes_sent=612i,request_time=0 1457172276000000000
2016/03/05 19:04:40 Gathered metrics, (5s interval), from 1 inputs in 23.904µs
```
This is an example output with `duplicate_points_modifier_method = "increment_time"` and `duplicate_points_increment_duration = "1ms"`.
```
[root@localhost bin]# sudo -u telegraf ./telegraf -config /etc/telegraf/telegraf.conf -input-filter tail -debug & for i in `seq 1 3`; do curl -s -o /dev/null localhost; done && sleep 1 && for i in `seq 1 2`; do curl -s -o /dev/null localhost; done
[1] 2845
2016/03/05 19:03:13 Attempting connection to output: influxdb
2016/03/05 19:03:13 Successfully connected to output: influxdb
2016/03/05 19:03:13 Starting Telegraf (version 0.10.4.1-44-ga2a0d51)
2016/03/05 19:03:13 Loaded outputs: influxdb
2016/03/05 19:03:13 Loaded inputs: tail
2016/03/05 19:03:13 Tags enabled: host=localhost.localdomain
2016/03/05 19:03:13 Agent Config: Interval:5s, Debug:true, Quiet:false, Hostname:"localhost.localdomain", Flush Interval:10s
2016/03/05 19:03:13 Started a tail log reader, filename: /var/log/nginx/access.ltsv.log
2016/03/05 19:03:13 Seeked /var/log/nginx/access.ltsv.log - &{Offset:0 Whence:0}
> nginx_access,host=localhost,http_host=localhost,http_referer=-,http_user_agent=curl/7.29.0,http_x_forwarded_for=-,log_host=log.example.com,remote_addr=127.0.0.1,remote_user=-,request=GET\ /\ HTTP/1.1,scheme=http,status=200 body_bytes_sent=612i,request_time=0 1457172193000000000
> nginx_access,host=localhost,http_host=localhost,http_referer=-,http_user_agent=curl/7.29.0,http_x_forwarded_for=-,log_host=log.example.com,remote_addr=127.0.0.1,remote_user=-,request=GET\ /\ HTTP/1.1,scheme=http,status=200 body_bytes_sent=612i,request_time=0 1457172193001000000
> nginx_access,host=localhost,http_host=localhost,http_referer=-,http_user_agent=curl/7.29.0,http_x_forwarded_for=-,log_host=log.example.com,remote_addr=127.0.0.1,remote_user=-,request=GET\ /\ HTTP/1.1,scheme=http,status=200 body_bytes_sent=612i,request_time=0 1457172193002000000
> nginx_access,host=localhost,http_host=localhost,http_referer=-,http_user_agent=curl/7.29.0,http_x_forwarded_for=-,log_host=log.example.com,remote_addr=127.0.0.1,remote_user=-,request=GET\ /\ HTTP/1.1,scheme=http,status=200 body_bytes_sent=612i,request_time=0 1457172194000000000
> nginx_access,host=localhost,http_host=localhost,http_referer=-,http_user_agent=curl/7.29.0,http_x_forwarded_for=-,log_host=log.example.com,remote_addr=127.0.0.1,remote_user=-,request=GET\ /\ HTTP/1.1,scheme=http,status=200 body_bytes_sent=612i,request_time=0 1457172194001000000
2016/03/05 19:03:15 Gathered metrics, (5s interval), from 1 inputs in 52.911µs
```

247
plugins/inputs/tail/tail.go Normal file
View File

@ -0,0 +1,247 @@
package tail
import (
"sync"
tailfile "github.com/hpcloud/tail"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
)
const sampleConfig = `
## The measurement name
name_override = "nginx_access"
## A LTSV formatted log file path.
## See http://ltsv.org/ for Labeled Tab-separated Values (LTSV)
## Here is an example config for nginx (http://nginx.org/en/).
##
## log_format ltsv 'time:$time_iso8601\t'
## 'host:$host\t'
## 'http_host:$http_host\t'
## 'scheme:$scheme\t'
## 'remote_addr:$remote_addr\t'
## 'remote_user:$remote_user\t'
## 'request:$request\t'
## 'status:$status\t'
## 'body_bytes_sent:$body_bytes_sent\t'
## 'http_referer:$http_referer\t'
## 'http_user_agent:$http_user_agent\t'
## 'http_x_forwarded_for:$http_x_forwarded_for\t'
## 'request_time:$request_time';
## access_log /var/log/nginx/access.ltsv.log ltsv;
##
filename = "/var/log/nginx/access.ltsv.log"
## Seek to this location before tailing
seek_offset = 0
## Seek from whence. See https://golang.org/pkg/os/#File.Seek
seek_whence = 0
## Reopen recreated files (tail -F)
re_open = true
## Fail early if the file does not exist
must_exist = false
## Poll for file changes instead of using inotify
poll = false
## Set this to true if the file is a named pipe (mkfifo)
pipe = false
## Continue looking for new lines (tail -f)
follow = true
## If non-zero, split longer lines into multiple lines
max_line_size = 0
## Set this false to enable logging to stderr, true to disable logging
disable_logging = false
## Data format to consume. Currently only "ltsv" is supported.
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "ltsv"
## Time label to be used to create a timestamp for a measurement.
time_label = "time"
## Time format for parsing timestamps.
## Please see https://golang.org/pkg/time/#Parse for the format string.
time_format = "2006-01-02T15:04:05Z07:00"
## Labels for string fields.
str_field_labels = []
## Labels for integer (64bit signed decimal integer) fields.
## For acceptable integer values, please refer to:
## https://golang.org/pkg/strconv/#ParseInt
int_field_labels = ["body_bytes_sent"]
## Labels for float (64bit float) fields.
## For acceptable float values, please refer to:
## https://golang.org/pkg/strconv/#ParseFloat
float_field_labels = ["request_time"]
## Labels for boolean fields.
## For acceptable boolean values, please refer to:
## https://golang.org/pkg/strconv/#ParseBool
bool_field_labels = []
## Labels for tags to be added
tag_labels = ["host", "http_host", "scheme", "remote_addr", "remote_user", "request", "status", "http_referer", "http_user_agent", "http_x_forwarded_for"]
## Method to modify duplicated measurement points.
## Must be one of "add_uniq_tag", "increment_time", "no_op".
## This will be used to modify duplicated points.
## For detail, please see https://docs.influxdata.com/influxdb/v0.10/troubleshooting/frequently_encountered_issues/#writing-duplicate-points
## NOTE: For modifier methods other than "no_op" to work correctly, the log lines
## MUST be sorted by timestamps in ascending order.
duplicate_points_modifier_method = "add_uniq_tag"
## When duplicate_points_modifier_method is "increment_time",
## this will be added to the time of the previous measurement
## if the time of current time is equal to or less than the
## time of the previous measurement.
##
## NOTE: You need to set this value equal to or greater than
## precisions of your output plugins. Otherwise the times will
## become the same value!
## For the precision of the InfluxDB plugin, please see
## https://github.com/influxdata/telegraf/blob/v0.10.1/plugins/outputs/influxdb/influxdb.go#L40-L42
## For the duration string format, please see
## https://golang.org/pkg/time/#ParseDuration
duplicate_points_increment_duration = "1us"
## When duplicate_points_modifier_method is "add_uniq_tag",
## this will be the label of the tag to be added to ensure uniqueness of points.
## NOTE: The uniq tag will be only added to the successive points of duplicated
## points, it will not be added to the first point of duplicated points.
## If you want to always add the uniq tag, add a tag with the same name as
## duplicate_points_modifier_uniq_tag and the string value "0" to [inputs.tail.tags].
duplicate_points_modifier_uniq_tag = "uniq"
## Defaults tags to be added to measurements.
[inputs.tail.tags]
log_host = "log.example.com"
`
type Tail struct {
Filename string
// File-specfic
SeekOffset int64 // Seek to this location before tailing
SeekWhence int // Seek from whence. See https://golang.org/pkg/os/#File.Seek
ReOpen bool // Reopen recreated files (tail -F)
MustExist bool // Fail early if the file does not exist
Poll bool // Poll for file changes instead of using inotify
Pipe bool // Is a named pipe (mkfifo)
// TODO: Add configs for RateLimiter
// Generic IO
Follow bool // Continue looking for new lines (tail -f)
MaxLineSize int // If non-zero, split longer lines into multiple lines
DisableLogging bool // If false, logs are printed to stderr
sync.Mutex
done chan struct{}
acc telegraf.Accumulator
parser parsers.Parser
tail *tailfile.Tail
}
func (t *Tail) SampleConfig() string {
return sampleConfig
}
func (t *Tail) Description() string {
return "Read a log file like the BSD tail command"
}
func (t *Tail) SetParser(parser parsers.Parser) {
t.parser = parser
}
// Start a tail log reader. Caller must call *Tail.Stop() to clean up.
func (t *Tail) Start(acc telegraf.Accumulator) error {
t.Lock()
defer t.Unlock()
t.acc = acc
t.done = make(chan struct{})
config := tailfile.Config{
Location: &tailfile.SeekInfo{
Offset: t.SeekOffset,
Whence: t.SeekWhence,
},
ReOpen: t.ReOpen,
MustExist: t.MustExist,
Poll: t.Poll,
Pipe: t.Pipe,
Follow: t.Follow,
MaxLineSize: t.MaxLineSize,
}
if t.DisableLogging {
config.Logger = tailfile.DiscardingLogger
}
tf, err := tailfile.TailFile(t.Filename, config)
if err != nil {
return err
}
t.tail = tf
// Start the log file reader
go t.receiver()
t.tail.Logger.Printf("Started a tail log reader, filename: %s\n", t.Filename)
return nil
}
func (t *Tail) receiver() {
for {
for line := range t.tail.Lines {
if err := line.Err; err != nil {
t.tail.Logger.Printf("error while reading from %s, error: %s\n", t.Filename, err.Error())
} else {
metric, err := t.parser.ParseLine(line.Text)
if err != nil {
t.tail.Logger.Printf("error while parsing from %s, error: %s\n", t.Filename, err.Error())
}
t.acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
}
select {
case <-t.done:
t.tail.Done()
return
default:
// Start reading lines again
}
}
}
}
func (t *Tail) Stop() {
t.Lock()
close(t.done)
t.Unlock()
}
// All the work is done in the Start() function, so this is just a dummy
// function.
func (t *Tail) Gather(_ telegraf.Accumulator) error {
return nil
}
func init() {
inputs.Add("tail", func() telegraf.Input {
return &Tail{}
})
}

View File

@ -0,0 +1,72 @@
package tail
import (
"io/ioutil"
"os"
"testing"
"time"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
)
const sampleLog = "time:2016-03-03T13:58:57+00:00\thost:localhost\thttp_host:localhost\tscheme:http\tremote_addr:127.0.0.1\tremote_user:-\ttime_local:03/Mar/2016:13:58:57\t+0000\trequest:GET / HTTP/1.1\tstatus:200\tbody_bytes_sent:612\thttp_referer:-\thttp_user_agent:curl/7.29.0\thttp_x_forwarded_for:-\trequest_time:0.000\tupstream_response_time:-\tupstream_http_content_type:-\tupstream_status:-\tupstream_cache_status:-\n"
func TestLtsvLogGeneratesMetrics(t *testing.T) {
tmpfile, err := ioutil.TempFile("", "access.ltsv.log")
assert.NoError(t, err, "failed to create a temporary file")
defer os.Remove(tmpfile.Name())
_, err = tmpfile.WriteString(sampleLog)
assert.NoError(t, err, "failed to write logs a temporary file")
err = tmpfile.Close()
assert.NoError(t, err, "failed to close the temporary log file")
metricName := "nginx_access"
config := &parsers.Config{
DataFormat: "ltsv",
MetricName: metricName,
TimeLabel: "time",
TimeFormat: "2006-01-02T15:04:05-07:00",
IntFieldLabels: []string{"body_bytes_sent"},
FloatFieldLabels: []string{"request_time"},
BoolFieldLabels: []string{},
StrFieldLabels: []string{},
TagLabels: []string{"host", "http_host", "scheme", "remote_addr", "remote_user", "request", "status", "http_referer", "http_user_agent"},
DuplicatePointsModifierMethod: "add_uniq_tag",
DuplicatePointsModifierUniqTag: "uniq",
}
parser, err := parsers.NewParser(config)
assert.NoError(t, err)
reader := &Tail{
Filename: tmpfile.Name(),
ReOpen: true,
Follow: true,
DisableLogging: true,
parser: parser,
}
var acc testutil.Accumulator
reader.Start(&acc)
// NOTE: Wait for the tail reader process the log line.
time.Sleep(time.Duration(100) * time.Millisecond)
reader.Stop()
fields := map[string]interface{}{
"body_bytes_sent": int64(612),
"request_time": 0.0,
}
tags := map[string]string{
"host": "localhost",
"http_host": "localhost",
"scheme": "http",
"remote_addr": "127.0.0.1",
"remote_user": "-",
"request": "GET / HTTP/1.1",
"status": "200",
"http_referer": "-",
"http_user_agent": "curl/7.29.0",
}
acc.AssertContainsTaggedFields(t, metricName, fields, tags)
}

View File

@ -0,0 +1,218 @@
package ltsv
import (
"bytes"
"fmt"
"strconv"
"strings"
"time"
"github.com/influxdata/telegraf"
)
type LTSVParser struct {
MetricName string
TimeLabel string
TimeFormat string
StrFieldLabels []string
IntFieldLabels []string
FloatFieldLabels []string
BoolFieldLabels []string
TagLabels []string
DefaultTags map[string]string
DuplicatePointsModifierMethod string
DuplicatePointsIncrementDuration time.Duration
DuplicatePointsModifierUniqTag string
initialized bool
fieldLabelSet map[string]string
tagLabelSet map[string]bool
dupPointModifier DuplicatePointModifier
buf bytes.Buffer
}
// Parse splits buf into newline-terminated lines and parses each complete
// line into a telegraf metric with ParseLine.
//
// Data after the last newline is buffered internally and combined with the
// next call's input, so callers may split lines at arbitrary byte
// boundaries. If the final chunk does not end with '\n', call Parse(nil)
// once to flush and parse the buffered remainder.
func (p *LTSVParser) Parse(buf []byte) ([]telegraf.Metric, error) {
	metrics := make([]telegraf.Metric, 0)
	if buf == nil {
		// Flush: parse whatever partial line is still buffered.
		if p.buf.Len() > 0 {
			metric, err := p.ParseLine(p.buf.String())
			if err != nil {
				return nil, err
			}
			metrics = append(metrics, metric)
			// Drain the buffer so a subsequent flush does not re-parse
			// (and duplicate) the same line.
			p.buf.Reset()
		}
	} else {
		for {
			i := bytes.IndexByte(buf, byte('\n'))
			if i == -1 {
				// No complete line left; keep the tail for the next call.
				p.buf.Write(buf)
				break
			}
			p.buf.Write(buf[:i])
			// An empty line leaves p.buf empty and is silently skipped.
			if p.buf.Len() > 0 {
				metric, err := p.ParseLine(p.buf.String())
				if err != nil {
					return nil, err
				}
				metrics = append(metrics, metric)
				p.buf.Reset()
			}
			buf = buf[i+1:]
		}
	}
	return metrics, nil
}
// ParseLine parses one LTSV line (tab-separated "label:value" terms) into a
// telegraf metric.
//
// The value of TimeLabel becomes the metric timestamp (parsed with
// TimeFormat); if the label is absent, the current UTC time is used. Labels
// listed in the *FieldLabels lists become typed fields, labels in TagLabels
// become tags, and any other label is ignored. Before the metric is built,
// the configured duplicate-point modifier may adjust the timestamp or tags.
func (p *LTSVParser) ParseLine(line string) (telegraf.Metric, error) {
	if !p.initialized {
		if err := p.initialize(); err != nil {
			return nil, err
		}
	}
	var t time.Time
	timeLabelFound := false
	fields := make(map[string]interface{})
	tags := make(map[string]string)
	for k, v := range p.DefaultTags {
		tags[k] = v
	}
	terms := strings.Split(line, "\t")
	for _, term := range terms {
		kv := strings.SplitN(term, ":", 2)
		if len(kv) != 2 {
			// Skip a malformed term with no ":" delimiter instead of
			// panicking on the missing value below.
			continue
		}
		k := kv[0]
		if k == p.TimeLabel {
			timeLabelFound = true
			var err error
			t, err = time.Parse(p.TimeFormat, kv[1])
			if err != nil {
				return nil, err
			}
		} else if typ, ok := p.fieldLabelSet[k]; ok {
			switch typ {
			case "string":
				fields[k] = kv[1]
			case "int":
				val, err := strconv.ParseInt(kv[1], 10, 64)
				if err != nil {
					return nil, err
				}
				fields[k] = val
			case "float":
				val, err := strconv.ParseFloat(kv[1], 64)
				if err != nil {
					return nil, err
				}
				fields[k] = val
			case "boolean":
				val, err := strconv.ParseBool(kv[1])
				if err != nil {
					return nil, err
				}
				fields[k] = val
			}
		} else if _, ok := p.tagLabelSet[k]; ok {
			tags[k] = kv[1]
		}
	}
	if !timeLabelFound {
		t = time.Now().UTC()
	}
	p.dupPointModifier.Modify(&t, tags)
	return telegraf.NewMetric(p.MetricName, tags, fields, t)
}
// SetDefaultTags sets the tags that are added to every metric produced by
// this parser. The map is stored by reference, not copied.
func (p *LTSVParser) SetDefaultTags(tags map[string]string) {
	p.DefaultTags = tags
}
// initialize lazily builds the label lookup sets and the duplicate-point
// modifier the first time a line is parsed. It is retried on the next
// parse if it fails (initialized stays false).
func (p *LTSVParser) initialize() error {
	modifier, err := newDupPointModifier(
		p.DuplicatePointsModifierMethod,
		p.DuplicatePointsIncrementDuration,
		p.DuplicatePointsModifierUniqTag)
	if err != nil {
		return err
	}
	p.fieldLabelSet = newFieldLabelSet(p.StrFieldLabels, p.IntFieldLabels, p.FloatFieldLabels, p.BoolFieldLabels)
	p.tagLabelSet = newTagLabelSet(p.TagLabels)
	p.dupPointModifier = modifier
	p.initialized = true
	return nil
}
// newFieldLabelSet builds the label -> type-name lookup table used by
// ParseLine. A label appearing in several lists takes the type of the
// last list that contains it (string < int < float < boolean).
func newFieldLabelSet(strFieldLabels, intFieldLabels, floatFieldLabels, boolFieldLabels []string) map[string]string {
	groups := []struct {
		typ    string
		labels []string
	}{
		{"string", strFieldLabels},
		{"int", intFieldLabels},
		{"float", floatFieldLabels},
		{"boolean", boolFieldLabels},
	}
	set := make(map[string]string, len(strFieldLabels)+len(intFieldLabels)+len(floatFieldLabels)+len(boolFieldLabels))
	for _, g := range groups {
		for _, label := range g.labels {
			set[label] = g.typ
		}
	}
	return set
}
// newTagLabelSet builds a membership set from the given tag labels.
func newTagLabelSet(labels []string) map[string]bool {
	set := make(map[string]bool, len(labels))
	for _, l := range labels {
		set[l] = true
	}
	return set
}
// DuplicatePointModifier disambiguates successive points that share the
// same timestamp (which would otherwise overwrite each other in InfluxDB)
// by mutating the timestamp and/or the tag set in place.
type DuplicatePointModifier interface {
	Modify(t *time.Time, tags map[string]string)
}
// newDupPointModifier returns the DuplicatePointModifier implementation
// selected by method, or an error for an unknown method name.
func newDupPointModifier(method string, incrementDuration time.Duration, uniqTagName string) (DuplicatePointModifier, error) {
	switch method {
	case "no_op":
		return &NoOpDupPointModifier{}, nil
	case "increment_time":
		return &IncTimeDupPointModifier{IncrementDuration: incrementDuration}, nil
	case "add_uniq_tag":
		return &AddTagDupPointModifier{UniqTagName: uniqTagName}, nil
	}
	return nil, fmt.Errorf("invalid duplicate_points_modifier_method: %s", method)
}
// AddTagDupPointModifier implements the "add_uniq_tag" method: each point
// whose timestamp equals the previous point's timestamp gets a UniqTagName
// tag carrying a per-run counter ("1", "2", ...). The first point of a
// duplicate run is left untagged.
type AddTagDupPointModifier struct {
	UniqTagName string
	prevTime    time.Time // timestamp of the previously seen point
	dupCount    int64     // consecutive duplicates seen for prevTime
}
// Modify tags the point with a uniqueness counter when its timestamp
// repeats the previous point's timestamp; otherwise it just records the
// new timestamp and resets the counter.
func (m *AddTagDupPointModifier) Modify(t *time.Time, tags map[string]string) {
	if !t.Equal(m.prevTime) {
		m.prevTime = *t
		m.dupCount = 0
		return
	}
	m.dupCount++
	tags[m.UniqTagName] = strconv.FormatInt(m.dupCount, 10)
}
// IncTimeDupPointModifier implements the "increment_time" method: any point
// whose timestamp is not strictly after the previous (possibly adjusted)
// timestamp is pushed forward by IncrementDuration past it.
type IncTimeDupPointModifier struct {
	IncrementDuration time.Duration
	prevTime          time.Time // timestamp assigned to the previous point
}
// Modify bumps t to prevTime+IncrementDuration whenever t does not strictly
// advance past the previous point, then remembers the (possibly adjusted)
// timestamp for the next call.
func (m *IncTimeDupPointModifier) Modify(t *time.Time, _ map[string]string) {
	if t.After(m.prevTime) {
		m.prevTime = *t
		return
	}
	*t = m.prevTime.Add(m.IncrementDuration)
	m.prevTime = *t
}
// NoOpDupPointModifier implements the "no_op" method: duplicate points are
// left untouched (and will be merged by InfluxDB on write).
type NoOpDupPointModifier struct{}

// Modify does nothing.
func (n *NoOpDupPointModifier) Modify(_ *time.Time, _ map[string]string) {}

View File

@ -0,0 +1,432 @@
package ltsv
import (
"bytes"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
const (
	// validLTSV1 is a single complete line covering every configured field
	// type, two tags, and two labels the parser should ignore.
	validLTSV1 = "time:2016-03-06T09:24:12Z\tstr1:value1\tint1:23\tint2:34\tfloat1:1.23\tbool1:true\tbool2:false\tignore_field1:foo\ttag1:tval1\tignore_tag1:bar\ttag2:tval2"
)

// validLTSV2 simulates streamed input: complete lines are split at
// arbitrary byte boundaries across the chunks fed to Parse.
var validLTSV2 = [][]byte{
	[]byte("time:2016-03-06T09:24:12.012+09:00\tstr1:value1\tint1:23\tint2:34\tfloat1:1.23\tbool1:true\tbool2:fal"),
	[]byte("se\tignore_field1:foo\ttag1:tval1\tignore_tag1:bar\ttag2:tval2\ntime:2016-03-06T09:24:12.125+09:00\ts"),
	// NOTE: validLTSV2[2] contains an empty line, and it is safely ignored.
	[]byte("tr1:value2\ntime:2016-03-06T09:24:13.000+09:00\tstr1:value3\n\ntime:2016-03-06T09:24:15.999+09:00\tst"),
	// NOTE: validLTSV2[3] does not end with a newline, so you need to call Parse(nil) to parse the rest of data.
	[]byte("r1:value4"),
	nil,
}

// validLTSV3 contains three lines sharing one timestamp followed by a later
// line, for exercising the duplicate-point modifiers.
var validLTSV3 = []string{
	"time:2016-03-06T09:24:12.000000000+09:00\tint1:1\ttag1:tval1",
	"time:2016-03-06T09:24:12.000000000+09:00\tint1:2\ttag1:tval1",
	"time:2016-03-06T09:24:12.000000000+09:00\tint1:3\ttag1:tval1",
	"time:2016-03-06T09:24:12.000000002+09:00\tint1:4\ttag1:tval1",
}
// TestParseLineValidLTSV checks that a single valid LTSV line is parsed
// into the configured field types and tags, that the time label sets the
// timestamp, that DefaultTags are merged in, and that labels absent from
// every configuration list are ignored.
func TestParseLineValidLTSV(t *testing.T) {
	parser := LTSVParser{
		MetricName:       "ltsv_test",
		TimeLabel:        "time",
		TimeFormat:       "2006-01-02T15:04:05Z07:00",
		StrFieldLabels:   []string{"str1"},
		IntFieldLabels:   []string{"int1", "int2"},
		FloatFieldLabels: []string{"float1"},
		BoolFieldLabels:  []string{"bool1", "bool2", "bool3", "bool4"},
		TagLabels:        []string{"tag1", "tag2"},
		DuplicatePointsModifierMethod: "no_op",
		DefaultTags: map[string]string{
			"log_host": "log.example.com",
		},
	}
	metric, err := parser.ParseLine(validLTSV1)
	assert.NoError(t, err)
	assert.NotNil(t, metric)
	assert.Equal(t, "ltsv_test", metric.Name())
	fields := metric.Fields()
	assert.Equal(t, map[string]interface{}{
		"str1":   "value1",
		"int1":   int64(23),
		"int2":   int64(34),
		"float1": float64(1.23),
		"bool1":  true,
		"bool2":  false,
	}, fields)
	assert.NotContains(t, fields, "ignore_field1", "ignore_tag1")
	tags := metric.Tags()
	assert.Equal(t, map[string]string{
		"log_host": "log.example.com",
		"tag1":     "tval1",
		"tag2":     "tval2",
	}, tags)
	assert.NotContains(t, tags, "ignore_field1", "ignore_tag1")
}
// TestParseValidLTSV feeds the validLTSV2 chunks to Parse one at a time and
// checks that metrics are emitted only once complete (newline-terminated)
// lines are available, that the embedded empty line is skipped, and that
// Parse(nil) flushes the trailing line that lacks a newline.
func TestParseValidLTSV(t *testing.T) {
	parser := LTSVParser{
		MetricName:       "ltsv_test",
		TimeLabel:        "time",
		TimeFormat:       "2006-01-02T15:04:05Z07:00",
		StrFieldLabels:   []string{"str1"},
		IntFieldLabels:   []string{"int1", "int2"},
		FloatFieldLabels: []string{"float1"},
		BoolFieldLabels:  []string{"bool1", "bool2", "bool3", "bool4"},
		TagLabels:        []string{"tag1", "tag2"},
		DuplicatePointsModifierMethod: "no_op",
		DefaultTags: map[string]string{
			"log_host": "log.example.com",
		},
	}
	// Chunk 0 ends mid-line, so nothing is emitted yet.
	metrics, err := parser.Parse(validLTSV2[0])
	assert.NoError(t, err)
	assert.Len(t, metrics, 0)
	// Chunk 1 completes the first line.
	metrics, err = parser.Parse(validLTSV2[1])
	assert.NoError(t, err)
	assert.Len(t, metrics, 1)
	assert.Equal(t, "ltsv_test", metrics[0].Name())
	fields := metrics[0].Fields()
	assert.Equal(t, map[string]interface{}{
		"str1":   "value1",
		"int1":   int64(23),
		"int2":   int64(34),
		"float1": float64(1.23),
		"bool1":  true,
		"bool2":  false,
	}, fields)
	assert.NotContains(t, fields, "ignore_field1", "ignore_tag1")
	tags := metrics[0].Tags()
	assert.Equal(t, map[string]string{
		"log_host": "log.example.com",
		"tag1":     "tval1",
		"tag2":     "tval2",
	}, tags)
	assert.NotContains(t, tags, "ignore_field1", "ignore_tag1")
	// Chunk 2 completes two more lines; its empty line is ignored.
	metrics, err = parser.Parse(validLTSV2[2])
	assert.NoError(t, err)
	assert.Len(t, metrics, 2)
	assert.Equal(t, "ltsv_test", metrics[0].Name())
	assert.Equal(t, map[string]interface{}{
		"str1": "value2",
	}, metrics[0].Fields())
	assert.Equal(t, map[string]string{
		"log_host": "log.example.com",
	}, metrics[0].Tags())
	assert.Equal(t, map[string]interface{}{
		"str1": "value3",
	}, metrics[1].Fields())
	assert.Equal(t, map[string]string{
		"log_host": "log.example.com",
	}, metrics[1].Tags())
	// Chunk 3 has no trailing newline, so its line stays buffered.
	metrics, err = parser.Parse(validLTSV2[3])
	assert.NoError(t, err)
	assert.Len(t, metrics, 0)
	// Parse(nil) flushes and parses the buffered final line.
	metrics, err = parser.Parse(validLTSV2[4])
	assert.NoError(t, err)
	assert.Len(t, metrics, 1)
	assert.Equal(t, "ltsv_test", metrics[0].Name())
	assert.Equal(t, map[string]interface{}{
		"str1": "value4",
	}, metrics[0].Fields())
	assert.Equal(t, map[string]string{
		"log_host": "log.example.com",
	}, metrics[0].Tags())
}
// TestAlwaysAddTagDuplicatePointModifier exercises the "add_uniq_tag"
// method with a DefaultTags entry "uniq":"0", so that EVERY point carries
// the uniq tag: the first point of a duplicate run keeps the default "0",
// and each following duplicate overrides it with "1", "2", ... . A point
// with a fresh timestamp falls back to the default "0" again.
func TestAlwaysAddTagDuplicatePointModifier(t *testing.T) {
	parser := LTSVParser{
		MetricName:     "ltsv_test",
		TimeLabel:      "time",
		TimeFormat:     "2006-01-02T15:04:05.000000000Z07:00",
		IntFieldLabels: []string{"int1"},
		TagLabels:      []string{"tag1"},
		DuplicatePointsModifierMethod:  "add_uniq_tag",
		DuplicatePointsModifierUniqTag: "uniq",
		DefaultTags: map[string]string{
			"log_host": "log.example.com",
			"uniq":     "0",
		},
	}
	// First point: default uniq "0".
	metric, err := parser.ParseLine(validLTSV3[0])
	assert.NoError(t, err)
	assert.NotNil(t, metric)
	assert.Equal(t, "ltsv_test", metric.Name())
	assert.Equal(t, map[string]interface{}{
		"int1": int64(1),
	}, metric.Fields())
	assert.Equal(t, map[string]string{
		"log_host": "log.example.com",
		"tag1":     "tval1",
		"uniq":     "0",
	}, metric.Tags())
	assert.Equal(t, "2016-03-06T09:24:12.000000000+09:00", metric.Time().Format(parser.TimeFormat))
	// First duplicate: uniq "1".
	metric, err = parser.ParseLine(validLTSV3[1])
	assert.NoError(t, err)
	assert.NotNil(t, metric)
	assert.Equal(t, "ltsv_test", metric.Name())
	assert.Equal(t, map[string]interface{}{
		"int1": int64(2),
	}, metric.Fields())
	assert.Equal(t, map[string]string{
		"log_host": "log.example.com",
		"tag1":     "tval1",
		"uniq":     "1",
	}, metric.Tags())
	assert.Equal(t, "2016-03-06T09:24:12.000000000+09:00", metric.Time().Format(parser.TimeFormat))
	// Second duplicate: uniq "2".
	metric, err = parser.ParseLine(validLTSV3[2])
	assert.NoError(t, err)
	assert.NotNil(t, metric)
	assert.Equal(t, "ltsv_test", metric.Name())
	assert.Equal(t, map[string]interface{}{
		"int1": int64(3),
	}, metric.Fields())
	assert.Equal(t, map[string]string{
		"log_host": "log.example.com",
		"tag1":     "tval1",
		"uniq":     "2",
	}, metric.Tags())
	assert.Equal(t, "2016-03-06T09:24:12.000000000+09:00", metric.Time().Format(parser.TimeFormat))
	// New timestamp: back to the default uniq "0".
	metric, err = parser.ParseLine(validLTSV3[3])
	assert.NoError(t, err)
	assert.NotNil(t, metric)
	assert.Equal(t, "ltsv_test", metric.Name())
	assert.Equal(t, map[string]interface{}{
		"int1": int64(4),
	}, metric.Fields())
	assert.Equal(t, map[string]string{
		"log_host": "log.example.com",
		"tag1":     "tval1",
		"uniq":     "0",
	}, metric.Tags())
	assert.Equal(t, "2016-03-06T09:24:12.000000002+09:00", metric.Time().Format(parser.TimeFormat))
}
// TestAddTagDuplicatePointModifier exercises the "add_uniq_tag" method
// without a default uniq tag: the first point of a duplicate run gets no
// uniq tag at all, the following duplicates get "1", "2", ..., and a point
// with a fresh timestamp again gets no uniq tag.
func TestAddTagDuplicatePointModifier(t *testing.T) {
	parser := LTSVParser{
		MetricName:     "ltsv_test",
		TimeLabel:      "time",
		TimeFormat:     "2006-01-02T15:04:05.000000000Z07:00",
		IntFieldLabels: []string{"int1"},
		TagLabels:      []string{"tag1"},
		DuplicatePointsModifierMethod:  "add_uniq_tag",
		DuplicatePointsModifierUniqTag: "uniq",
		DefaultTags: map[string]string{
			"log_host": "log.example.com",
		},
	}
	// First point: no uniq tag.
	metric, err := parser.ParseLine(validLTSV3[0])
	assert.NoError(t, err)
	assert.NotNil(t, metric)
	assert.Equal(t, "ltsv_test", metric.Name())
	assert.Equal(t, map[string]interface{}{
		"int1": int64(1),
	}, metric.Fields())
	assert.Equal(t, map[string]string{
		"log_host": "log.example.com",
		"tag1":     "tval1",
	}, metric.Tags())
	assert.Equal(t, "2016-03-06T09:24:12.000000000+09:00", metric.Time().Format(parser.TimeFormat))
	// First duplicate: uniq "1".
	metric, err = parser.ParseLine(validLTSV3[1])
	assert.NoError(t, err)
	assert.NotNil(t, metric)
	assert.Equal(t, "ltsv_test", metric.Name())
	assert.Equal(t, map[string]interface{}{
		"int1": int64(2),
	}, metric.Fields())
	assert.Equal(t, map[string]string{
		"log_host": "log.example.com",
		"tag1":     "tval1",
		"uniq":     "1",
	}, metric.Tags())
	assert.Equal(t, "2016-03-06T09:24:12.000000000+09:00", metric.Time().Format(parser.TimeFormat))
	// Second duplicate: uniq "2".
	metric, err = parser.ParseLine(validLTSV3[2])
	assert.NoError(t, err)
	assert.NotNil(t, metric)
	assert.Equal(t, "ltsv_test", metric.Name())
	assert.Equal(t, map[string]interface{}{
		"int1": int64(3),
	}, metric.Fields())
	assert.Equal(t, map[string]string{
		"log_host": "log.example.com",
		"tag1":     "tval1",
		"uniq":     "2",
	}, metric.Tags())
	assert.Equal(t, "2016-03-06T09:24:12.000000000+09:00", metric.Time().Format(parser.TimeFormat))
	// New timestamp: no uniq tag again.
	metric, err = parser.ParseLine(validLTSV3[3])
	assert.NoError(t, err)
	assert.NotNil(t, metric)
	assert.Equal(t, "ltsv_test", metric.Name())
	assert.Equal(t, map[string]interface{}{
		"int1": int64(4),
	}, metric.Fields())
	assert.Equal(t, map[string]string{
		"log_host": "log.example.com",
		"tag1":     "tval1",
	}, metric.Tags())
	assert.Equal(t, "2016-03-06T09:24:12.000000002+09:00", metric.Time().Format(parser.TimeFormat))
}
// TestIncTimeDuplicatePointModifier exercises the "increment_time" method
// with a 1ns step. Each duplicate timestamp is bumped 1ns past the previous
// point; note the last input line (...002) collides with the already-bumped
// previous point (...002) and is therefore pushed to ...003.
func TestIncTimeDuplicatePointModifier(t *testing.T) {
	parser := LTSVParser{
		MetricName:     "ltsv_test",
		TimeLabel:      "time",
		TimeFormat:     "2006-01-02T15:04:05.000000000Z07:00",
		IntFieldLabels: []string{"int1"},
		TagLabels:      []string{"tag1"},
		DuplicatePointsModifierMethod:    "increment_time",
		DuplicatePointsIncrementDuration: time.Nanosecond,
		DefaultTags: map[string]string{
			"log_host": "log.example.com",
		},
	}
	// First point keeps its original timestamp (...000).
	metric, err := parser.ParseLine(validLTSV3[0])
	assert.NoError(t, err)
	assert.NotNil(t, metric)
	assert.Equal(t, "ltsv_test", metric.Name())
	assert.Equal(t, map[string]interface{}{
		"int1": int64(1),
	}, metric.Fields())
	assert.Equal(t, map[string]string{
		"log_host": "log.example.com",
		"tag1":     "tval1",
	}, metric.Tags())
	assert.Equal(t, "2016-03-06T09:24:12.000000000+09:00", metric.Time().Format(parser.TimeFormat))
	// Duplicate bumped to ...001.
	metric, err = parser.ParseLine(validLTSV3[1])
	assert.NoError(t, err)
	assert.NotNil(t, metric)
	assert.Equal(t, "ltsv_test", metric.Name())
	assert.Equal(t, map[string]interface{}{
		"int1": int64(2),
	}, metric.Fields())
	assert.Equal(t, map[string]string{
		"log_host": "log.example.com",
		"tag1":     "tval1",
	}, metric.Tags())
	assert.Equal(t, "2016-03-06T09:24:12.000000001+09:00", metric.Time().Format(parser.TimeFormat))
	// Duplicate bumped to ...002.
	metric, err = parser.ParseLine(validLTSV3[2])
	assert.NoError(t, err)
	assert.NotNil(t, metric)
	assert.Equal(t, "ltsv_test", metric.Name())
	assert.Equal(t, map[string]interface{}{
		"int1": int64(3),
	}, metric.Fields())
	assert.Equal(t, map[string]string{
		"log_host": "log.example.com",
		"tag1":     "tval1",
	}, metric.Tags())
	assert.Equal(t, "2016-03-06T09:24:12.000000002+09:00", metric.Time().Format(parser.TimeFormat))
	// Input ...002 is not after the adjusted previous ...002: bumped to ...003.
	metric, err = parser.ParseLine(validLTSV3[3])
	assert.NoError(t, err)
	assert.NotNil(t, metric)
	assert.Equal(t, "ltsv_test", metric.Name())
	assert.Equal(t, map[string]interface{}{
		"int1": int64(4),
	}, metric.Fields())
	assert.Equal(t, map[string]string{
		"log_host": "log.example.com",
		"tag1":     "tval1",
	}, metric.Tags())
	assert.Equal(t, "2016-03-06T09:24:12.000000003+09:00", metric.Time().Format(parser.TimeFormat))
}
// TestNoOpDuplicatePointModifier exercises the "no_op" method: duplicate
// timestamps are left unmodified, so the first three parsed metrics share
// the exact same timestamp and tags.
func TestNoOpDuplicatePointModifier(t *testing.T) {
	parser := LTSVParser{
		MetricName:     "ltsv_test",
		TimeLabel:      "time",
		TimeFormat:     "2006-01-02T15:04:05.000000000Z07:00",
		IntFieldLabels: []string{"int1"},
		TagLabels:      []string{"tag1"},
		DuplicatePointsModifierMethod: "no_op",
		DefaultTags: map[string]string{
			"log_host": "log.example.com",
		},
	}
	// Join all fixture lines into one newline-terminated byte stream.
	var buf bytes.Buffer
	for _, line := range validLTSV3 {
		buf.WriteString(line)
		buf.WriteByte(byte('\n'))
	}
	metrics, err := parser.Parse(buf.Bytes())
	assert.NoError(t, err)
	// NOTE: Even though 4 metrics are created here, 3 of these will be merged on
	// a InfluxDB database.
	assert.Len(t, metrics, 4)
	assert.Equal(t, "ltsv_test", metrics[0].Name())
	assert.Equal(t, map[string]interface{}{
		"int1": int64(1),
	}, metrics[0].Fields())
	assert.Equal(t, map[string]string{
		"log_host": "log.example.com",
		"tag1":     "tval1",
	}, metrics[0].Tags())
	assert.Equal(t, "2016-03-06T09:24:12.000000000+09:00", metrics[0].Time().Format(parser.TimeFormat))
	assert.Equal(t, "ltsv_test", metrics[1].Name())
	assert.Equal(t, map[string]interface{}{
		"int1": int64(2),
	}, metrics[1].Fields())
	assert.Equal(t, map[string]string{
		"log_host": "log.example.com",
		"tag1":     "tval1",
	}, metrics[1].Tags())
	assert.Equal(t, "2016-03-06T09:24:12.000000000+09:00", metrics[1].Time().Format(parser.TimeFormat))
	assert.Equal(t, "ltsv_test", metrics[2].Name())
	assert.Equal(t, map[string]interface{}{
		"int1": int64(3),
	}, metrics[2].Fields())
	assert.Equal(t, map[string]string{
		"log_host": "log.example.com",
		"tag1":     "tval1",
	}, metrics[2].Tags())
	assert.Equal(t, "2016-03-06T09:24:12.000000000+09:00", metrics[2].Time().Format(parser.TimeFormat))
	assert.Equal(t, "ltsv_test", metrics[3].Name())
	assert.Equal(t, map[string]interface{}{
		"int1": int64(4),
	}, metrics[3].Fields())
	assert.Equal(t, map[string]string{
		"log_host": "log.example.com",
		"tag1":     "tval1",
	}, metrics[3].Tags())
	assert.Equal(t, "2016-03-06T09:24:12.000000002+09:00", metrics[3].Time().Format(parser.TimeFormat))
}
// TestInvalidDuplicatePointsModifierMethod verifies that parsing fails with
// an error (and no metric) when DuplicatePointsModifierMethod is not one of
// the supported method names.
func TestInvalidDuplicatePointsModifierMethod(t *testing.T) {
	p := LTSVParser{DuplicatePointsModifierMethod: ""}
	m, err := p.ParseLine("")
	assert.Nil(t, m)
	assert.Error(t, err)
}

View File

@ -2,12 +2,14 @@ package parsers
import (
"fmt"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/parsers/graphite"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/parsers/json"
"github.com/influxdata/telegraf/plugins/parsers/ltsv"
)
// ParserInput is an interface for input plugins that are able to parse
@ -22,6 +24,9 @@ type Parser interface {
// Parse takes a byte buffer separated by newlines
// ie, `cpu.usage.idle 90\ncpu.usage.busy 10`
// and parses it into telegraf metrics
//
// NOTE: For the LTSV parser, you need to call an additional `Parse(nil)`
// if the last data does not end with the newline `\n`.
Parse(buf []byte) ([]telegraf.Metric, error)
// ParseLine takes a single string metric
@ -38,7 +43,7 @@ type Parser interface {
// Config is a struct that covers the data types needed for all parser types,
// and can be used to instantiate _any_ of the parsers.
type Config struct {
// Dataformat can be one of: json, influx, graphite
// Dataformat can be one of: json, influx, graphite, ltsv
DataFormat string
// Separator only applied to Graphite data.
@ -48,9 +53,53 @@ type Config struct {
// TagKeys only apply to JSON data
TagKeys []string
// MetricName only applies to JSON data. This will be the name of the measurement.
// MetricName only applies to JSON data and LTSV data. This will be the name of the measurement.
MetricName string
// TimeLabel only applies to LTSV data. This will be the label of the timestamp.
// If this label is not found in the measurement, the current time will be used.
TimeLabel string
// TimeFormat only applies to LTSV data. This will be the format of the timestamp.
// Please see https://golang.org/pkg/time/#Parse for the format string.
TimeFormat string
// StrFieldLabels only applies to LTSV data. This will be the labels of string fields.
StrFieldLabels []string
// IntFieldLabels only applies to LTSV data. This will be the labels of integer fields.
IntFieldLabels []string
// FloatFieldLabels only applies to LTSV data. This will be the labels of float fields.
FloatFieldLabels []string
// BoolFieldLabels only applies to LTSV data. This will be the labels of boolean fields.
BoolFieldLabels []string
// TagLabels only applies to LTSV data. This will be the labels of tags.
TagLabels []string
// DuplicatePointsModifierMethod only applies to LTSV data.
// Must be one of "add_uniq_tag", "increment_time", "no_op".
// This will be used to modify duplicated points.
// For detail, please see https://docs.influxdata.com/influxdb/v0.10/troubleshooting/frequently_encountered_issues/#writing-duplicate-points
// NOTE: For modifier methods other than "no_op" to work correctly, the log lines
// MUST be sorted by timestamps in ascending order.
DuplicatePointsModifierMethod string
// DuplicatePointsIncrementDuration only applies to LTSV data.
// When duplicate_points_modifier_method is "increment_time",
// this will be added to the time of the previous measurement
// if the current measurement's time is equal to or earlier than the
// time of the previous measurement.
//
// NOTE: You need to set this value equal to or greater than
// precisions of your output plugins. Otherwise the times will
// become the same value!
// For the precision of the InfluxDB plugin, please see
// https://github.com/influxdata/telegraf/blob/v0.10.1/plugins/outputs/influxdb/influxdb.go#L40-L42
DuplicatePointsIncrementDuration time.Duration
// DuplicatePointsModifierUniqTag only applies to LTSV data.
// When DuplicatePointsModifierMethod is one of "add_uniq_tag",
// this will be the label of the tag to be added to ensure uniqueness of points.
// NOTE: The uniq tag is only added to the second and subsequent points of a
// run of duplicated points; it is not added to the first point of the run.
// If you want to always add the uniq tag, add a tag with the same name as
// DuplicatePointsModifierUniqTag and the string value "0" to DefaultTags.
DuplicatePointsModifierUniqTag string
// DefaultTags are the default tags that will be added to all parsed metrics.
DefaultTags map[string]string
}
@ -68,6 +117,21 @@ func NewParser(config *Config) (Parser, error) {
case "graphite":
parser, err = NewGraphiteParser(config.Separator,
config.Templates, config.DefaultTags)
case "ltsv":
parser, err = NewLTSVParser(
config.MetricName,
config.TimeLabel,
config.TimeFormat,
config.StrFieldLabels,
config.IntFieldLabels,
config.FloatFieldLabels,
config.BoolFieldLabels,
config.TagLabels,
config.DuplicatePointsModifierMethod,
config.DuplicatePointsIncrementDuration,
config.DuplicatePointsModifierUniqTag,
config.DefaultTags,
)
default:
err = fmt.Errorf("Invalid data format: %s", config.DataFormat)
}
@ -98,3 +162,34 @@ func NewGraphiteParser(
) (Parser, error) {
return graphite.NewGraphiteParser(separator, templates, defaultTags)
}
// NewLTSVParser returns a Parser for the LTSV data format, configured with
// the given measurement name, timestamp label/format, field and tag label
// lists, duplicate-point modifier settings, and default tags.
func NewLTSVParser(
	metricName string,
	timeLabel string,
	timeFormat string,
	strFieldLabels []string,
	intFieldLabels []string,
	floatFieldLabels []string,
	boolFieldLabels []string,
	tagLabels []string,
	duplicatePointsModifierMethod string,
	duplicatePointsIncrementDuration time.Duration,
	duplicatePointsModifierUniqTag string,
	defaultTags map[string]string,
) (Parser, error) {
	return &ltsv.LTSVParser{
		MetricName:                       metricName,
		TimeLabel:                        timeLabel,
		TimeFormat:                       timeFormat,
		StrFieldLabels:                   strFieldLabels,
		IntFieldLabels:                   intFieldLabels,
		FloatFieldLabels:                 floatFieldLabels,
		BoolFieldLabels:                  boolFieldLabels,
		TagLabels:                        tagLabels,
		DuplicatePointsModifierMethod:    duplicatePointsModifierMethod,
		DuplicatePointsIncrementDuration: duplicatePointsIncrementDuration,
		DuplicatePointsModifierUniqTag:   duplicatePointsModifierUniqTag,
		DefaultTags:                      defaultTags,
	}, nil
}