diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index fa1645725..9edd80998 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -129,6 +129,52 @@ func init() { } ``` +## Input Plugins Accepting Arbitrary Data Formats + +Some input plugins (such as +[exec](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/exec)) +accept arbitrary input data formats. An overview of these data formats can +be found +[here](https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md). + +In order to enable this, you must specify a `SetParser(parser parsers.Parser)` +function on the plugin object (see the exec plugin for an example), as well as +defining `parser` as a field of the object. + +You can then utilize the parser internally in your plugin, parsing data as you +see fit. Telegraf's configuration layer will take care of instantiating and +creating the `Parser` object. + +You should also add the following to your SampleConfig() return: + +```toml + ### Data format to consume. This can be "json", "influx" or "graphite" + ### Each data format has it's own unique set of configuration options, read + ### more about them here: + ### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS.md + data_format = "influx" +``` + +Below is the `Parser` interface. + +```go +// Parser is an interface defining functions that a parser plugin must satisfy. +type Parser interface { + // Parse takes a byte buffer separated by newlines + // ie, `cpu.usage.idle 90\ncpu.usage.busy 10` + // and parses it into telegraf metrics + Parse(buf []byte) ([]telegraf.Metric, error) + + // ParseLine takes a single string metric + // ie, "cpu.usage.idle 90" + // and parses it into a telegraf metric. 
+ ParseLine(line string) (telegraf.Metric, error) +} +``` + +And you can view the code +[here.](https://github.com/influxdata/telegraf/blob/henrypfhu-master/plugins/parsers/registry.go) + ## Service Input Plugins This section is for developers who want to create new "service" collection diff --git a/DATA_FORMATS_INPUT.md b/DATA_FORMATS_INPUT.md new file mode 100644 index 000000000..66d049920 --- /dev/null +++ b/DATA_FORMATS_INPUT.md @@ -0,0 +1,274 @@ +# Telegraf Input Data Formats + +Telegraf metrics, like InfluxDB +[points](https://docs.influxdata.com/influxdb/v0.10/write_protocols/line/), +are a combination of four basic parts: + +1. Measurement Name +1. Tags +1. Fields +1. Timestamp + +These four parts are easily defined when using InfluxDB line-protocol as a +data format. But there are other data formats that users may want to use which +require more advanced configuration to create usable Telegraf metrics. + +Plugins such as `exec` and `kafka_consumer` parse textual data. Up until now, +these plugins were statically configured to parse just a single +data format. `exec` mostly only supported parsing JSON, and `kafka_consumer` only +supported data in InfluxDB line-protocol. + +But now we are normalizing the parsing of various data formats across all +plugins that can support it. You will be able to identify a plugin that supports +different data formats by the presence of a `data_format` config option, for +example, in the exec plugin: + +```toml +[[inputs.exec]] + ### Commands array + commands = ["/tmp/test.sh", "/usr/bin/mycollector --foo=bar"] + + ### measurement name suffix (for separating different commands) + name_suffix = "_mycollector" + + ### Data format to consume. 
This can be "json", "influx" or "graphite" + ### Each data format has it's own unique set of configuration options, read + ### more about them here: + ### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS.md + data_format = "json" + + ### Additional configuration options go here +``` + +Each data_format has an additional set of configuration options available, which +I'll go over below. + +## Influx: + +There are no additional configuration options for InfluxDB line-protocol. The +metrics are parsed directly into Telegraf metrics. + +#### Influx Configuration: + +```toml +[[inputs.exec]] + ### Commands array + commands = ["/tmp/test.sh", "/usr/bin/mycollector --foo=bar"] + + ### measurement name suffix (for separating different commands) + name_suffix = "_mycollector" + + ### Data format to consume. This can be "json", "influx" or "graphite" + ### Each data format has it's own unique set of configuration options, read + ### more about them here: + ### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS.md + data_format = "influx" +``` + +## JSON: + +The JSON data format flattens JSON into metric _fields_. For example, this JSON: + +```json +{ + "a": 5, + "b": { + "c": 6 + } +} +``` + +Would get translated into _fields_ of a measurement: + +``` +myjsonmetric a=5,b_c=6 +``` + +The _measurement_ _name_ is usually the name of the plugin, +but can be overridden using the `name_override` config option. + +#### JSON Configuration: + +The JSON data format supports specifying "tag keys". If specified, keys +will be searched for in the root-level of the JSON blob. If the key(s) exist, +they will be applied as tags to the Telegraf metrics. + +For example, if you had this configuration: + +```toml +[[inputs.exec]] + ### Commands array + commands = ["/tmp/test.sh", "/usr/bin/mycollector --foo=bar"] + + ### measurement name suffix (for separating different commands) + name_suffix = "_mycollector" + + ### Data format to consume. 
This can be "json", "influx" or "graphite" + ### Each data format has it's own unique set of configuration options, read + ### more about them here: + ### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS.md + data_format = "json" + + ### List of tag names to extract from top-level of JSON server response + tag_keys = [ + "my_tag_1", + "my_tag_2" + ] +``` + +with this JSON output from a command: + +```json +{ + "a": 5, + "b": { + "c": 6 + }, + "my_tag_1": "foo" +} +``` + +Your Telegraf metrics would get tagged with "my_tag_1" + +``` +exec_mycollector,my_tag_1=foo a=5,b_c=6 +``` + +## Graphite: + +The Graphite data format translates graphite _dot_ buckets directly into +telegraf measurement names, with a single value field, and without any tags. For +more advanced options, Telegraf supports specifying "templates" to translate +graphite buckets into Telegraf metrics. + +#### Separator: + +You can specify a separator to use for the parsed metrics. +By default, it will leave the metrics with a "." separator. +Setting `separator = "_"` will translate: + +``` +cpu.usage.idle 99 +=> cpu_usage_idle value=99 +``` + +#### Measurement/Tag Templates: + +The most basic template is to specify a single transformation to apply to all +incoming metrics. _measurement_ is a special keyword that tells Telegraf which +parts of the graphite bucket to combine into the measurement name. It can have a +trailing `*` to indicate that the remainder of the metric should be used. +Other words are considered tag keys. So the following template: + +```toml +templates = [ + "region.measurement*" +] +``` + +would result in the following Graphite -> Telegraf transformation. + +``` +us-west.cpu.load 100 +=> cpu.load,region=us-west value=100 +``` + +#### Field Templates: + +There is also a _field_ keyword, which can only be specified once. +The field keyword tells Telegraf to give the metric that field name. 
+So the following template: + +```toml +templates = [ + "measurement.measurement.field.region" +] +``` + +would result in the following Graphite -> Telegraf transformation. + +``` +cpu.usage.idle.us-west 100 +=> cpu_usage,region=us-west idle=100 +``` + +#### Filter Templates: + +Users can also filter the template(s) to use based on the name of the bucket, +using glob matching, like so: + +```toml +templates = [ + "cpu.* measurement.measurement.region", + "mem.* measurement.measurement.host" +] +``` + +which would result in the following transformation: + +``` +cpu.load.us-west 100 +=> cpu_load,region=us-west value=100 + +mem.cached.localhost 256 +=> mem_cached,host=localhost value=256 +``` + +#### Adding Tags: + +Additional tags can be added to a metric that don't exist on the received metric. +You can add additional tags by specifying them after the pattern. +Tags have the same format as the line protocol. +Multiple tags are separated by commas. + +```toml +templates = [ + "measurement.measurement.field.region datacenter=1a" +] +``` + +would result in the following Graphite -> Telegraf transformation. + +``` +cpu.usage.idle.us-west 100 +=> cpu_usage,region=us-west,datacenter=1a idle=100 +``` + +There are many more options available, +[More details can be found here](https://github.com/influxdata/influxdb/tree/master/services/graphite#templates) + +#### Graphite Configuration: + +```toml +[[inputs.exec]] + ### Commands array + commands = ["/tmp/test.sh", "/usr/bin/mycollector --foo=bar"] + + ### measurement name suffix (for separating different commands) + name_suffix = "_mycollector" + + ### Data format to consume. This can be "json", "influx" or "graphite" (line-protocol) + ### Each data format has it's own unique set of configuration options, read + ### more about them here: + ### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS.md + data_format = "graphite" + + ### This string will be used to join the matched values. 
+ separator = "_" + + ### Each template line requires a template pattern. It can have an optional + ### filter before the template and separated by spaces. It can also have optional extra + ### tags following the template. Multiple tags should be separated by commas and no spaces + ### similar to the line protocol format. There can be only one default template. + ### Templates support below format: + ### 1. filter + template + ### 2. filter + template + extra tag + ### 3. filter + template with field key + ### 4. default template + templates = [ + "*.app env.service.resource.measurement", + "stats.* .host.measurement* region=us-west,agent=sensu", + "stats2.* .host.measurement.field", + "measurement*" + ] +``` diff --git a/Godeps b/Godeps index 7e43ed610..474356d20 100644 --- a/Godeps +++ b/Godeps @@ -2,10 +2,8 @@ git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git dbd8d5c40a582eb9ad github.com/Shopify/sarama d37c73f2b2bce85f7fa16b6a550d26c5372892ef github.com/Sirupsen/logrus f7f79f729e0fbe2fcc061db48a9ba0263f588252 github.com/amir/raidman 6a8e089bbe32e6b907feae5ba688841974b3c339 -github.com/armon/go-metrics 345426c77237ece5dab0e1605c3e4b35c3f54757 github.com/aws/aws-sdk-go 87b1e60a50b09e4812dee560b33a238f67305804 github.com/beorn7/perks b965b613227fddccbfffe13eae360ed3fa822f8d -github.com/boltdb/bolt ee4a0888a9abe7eefe5a0992ca4cb06864839873 github.com/cenkalti/backoff 4dc77674aceaabba2c7e3da25d4c823edfb73f99 github.com/dancannon/gorethink 6f088135ff288deb9d5546f4c71919207f891a70 github.com/davecgh/go-spew 5215b55f46b2b919f50a1df0eaa5886afe4e3b3d @@ -14,16 +12,12 @@ github.com/eapache/queue ded5959c0d4e360646dc9e9908cff48666781367 github.com/fsouza/go-dockerclient 7b651349f9479f5114913eefbfd3c4eeddd79ab4 github.com/go-ini/ini afbd495e5aaea13597b5e14fe514ddeaa4d76fc3 github.com/go-sql-driver/mysql 7c7f556282622f94213bc028b4d0a7b6151ba239 -github.com/gogo/protobuf e8904f58e872a473a5b91bc9bf3377d223555263 github.com/golang/protobuf 
6aaa8d47701fa6cf07e914ec01fde3d4a1fe79c3 github.com/golang/snappy 723cc1e459b8eea2dea4583200fd60757d40097a github.com/gonuts/go-shellquote e842a11b24c6abfb3dd27af69a17f482e4b483c2 github.com/gorilla/context 1c83b3eabd45b6d76072b66b746c20815fb2872d github.com/gorilla/mux 26a6070f849969ba72b72256e9f14cf519751690 github.com/hailocab/go-hostpool e80d13ce29ede4452c43dea11e79b9bc8a15b478 -github.com/hashicorp/go-msgpack fa3f63826f7c23912c15263591e65d54d080b458 -github.com/hashicorp/raft 057b893fd996696719e98b6c44649ea14968c811 -github.com/hashicorp/raft-boltdb d1e82c1ec3f15ee991f7cc7ffd5b67ff6f5bbaee github.com/influxdata/config bae7cb98197d842374d3b8403905924094930f24 github.com/influxdata/influxdb 697f48b4e62e514e701ffec39978b864a3c666e6 github.com/influxdb/influxdb 697f48b4e62e514e701ffec39978b864a3c666e6 @@ -56,4 +50,4 @@ golang.org/x/text 6d3c22c4525a4da167968fa2479be5524d2e8bd0 gopkg.in/dancannon/gorethink.v1 6f088135ff288deb9d5546f4c71919207f891a70 gopkg.in/fatih/pool.v2 cba550ebf9bce999a02e963296d4bc7a486cb715 gopkg.in/mgo.v2 03c9f3ee4c14c8e51ee521a6a7d0425658dd6f64 -gopkg.in/yaml.v2 f7716cbe52baa25d2e9b0d0da546fcf909fc16b4 \ No newline at end of file +gopkg.in/yaml.v2 f7716cbe52baa25d2e9b0d0da546fcf909fc16b4 diff --git a/Godeps_windows b/Godeps_windows index 829e2cb35..ce6663260 100644 --- a/Godeps_windows +++ b/Godeps_windows @@ -1,34 +1,28 @@ git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git dbd8d5c40a582eb9adacde36b47932b3a3ad0034 -github.com/Shopify/sarama b1da1753dedcf77d053613b7eae907b98a2ddad5 +github.com/Shopify/sarama d37c73f2b2bce85f7fa16b6a550d26c5372892ef github.com/Sirupsen/logrus f7f79f729e0fbe2fcc061db48a9ba0263f588252 github.com/StackExchange/wmi f3e2bae1e0cb5aef83e319133eabfee30013a4a5 github.com/amir/raidman 6a8e089bbe32e6b907feae5ba688841974b3c339 -github.com/armon/go-metrics 345426c77237ece5dab0e1605c3e4b35c3f54757 -github.com/aws/aws-sdk-go 2a34ea8812f32aae75b43400f9424a0559840659 +github.com/aws/aws-sdk-go 
87b1e60a50b09e4812dee560b33a238f67305804 github.com/beorn7/perks b965b613227fddccbfffe13eae360ed3fa822f8d -github.com/boltdb/bolt ee4a0888a9abe7eefe5a0992ca4cb06864839873 github.com/cenkalti/backoff 4dc77674aceaabba2c7e3da25d4c823edfb73f99 github.com/dancannon/gorethink 6f088135ff288deb9d5546f4c71919207f891a70 github.com/davecgh/go-spew 5215b55f46b2b919f50a1df0eaa5886afe4e3b3d github.com/eapache/go-resiliency b86b1ec0dd4209a588dc1285cdd471e73525c0b3 github.com/eapache/queue ded5959c0d4e360646dc9e9908cff48666781367 -github.com/fsouza/go-dockerclient 02a8beb401b20e112cff3ea740545960b667eab1 +github.com/fsouza/go-dockerclient 7b651349f9479f5114913eefbfd3c4eeddd79ab4 github.com/go-ini/ini afbd495e5aaea13597b5e14fe514ddeaa4d76fc3 github.com/go-ole/go-ole 50055884d646dd9434f16bbb5c9801749b9bafe4 github.com/go-sql-driver/mysql 7c7f556282622f94213bc028b4d0a7b6151ba239 -github.com/gogo/protobuf e8904f58e872a473a5b91bc9bf3377d223555263 -github.com/golang/protobuf 45bba206dd5270d96bac4942dcfe515726613249 -github.com/golang/snappy 1963d058044b19e16595f80d5050fa54e2070438 +github.com/golang/protobuf 6aaa8d47701fa6cf07e914ec01fde3d4a1fe79c3 +github.com/golang/snappy 723cc1e459b8eea2dea4583200fd60757d40097a github.com/gonuts/go-shellquote e842a11b24c6abfb3dd27af69a17f482e4b483c2 github.com/gorilla/context 1c83b3eabd45b6d76072b66b746c20815fb2872d github.com/gorilla/mux 26a6070f849969ba72b72256e9f14cf519751690 github.com/hailocab/go-hostpool e80d13ce29ede4452c43dea11e79b9bc8a15b478 -github.com/hashicorp/go-msgpack fa3f63826f7c23912c15263591e65d54d080b458 -github.com/hashicorp/raft 057b893fd996696719e98b6c44649ea14968c811 -github.com/hashicorp/raft-boltdb d1e82c1ec3f15ee991f7cc7ffd5b67ff6f5bbaee github.com/influxdata/config bae7cb98197d842374d3b8403905924094930f24 -github.com/influxdata/influxdb 60df13fb566d07ff2cdd07aa23a4796a02b0df3c -github.com/influxdb/influxdb 60df13fb566d07ff2cdd07aa23a4796a02b0df3c +github.com/influxdata/influxdb 697f48b4e62e514e701ffec39978b864a3c666e6 
+github.com/influxdb/influxdb 697f48b4e62e514e701ffec39978b864a3c666e6 github.com/jmespath/go-jmespath c01cf91b011868172fdcd9f41838e80c9d716264 github.com/klauspost/crc32 999f3125931f6557b991b2f8472172bdfa578d38 github.com/lib/pq 8ad2b298cadd691a77015666a5372eae5dbfac8f @@ -45,7 +39,7 @@ github.com/prometheus/client_model fa8ad6fec33561be4280a8f0514318c79d7f6cb6 github.com/prometheus/common 14ca1097bbe21584194c15e391a9dab95ad42a59 github.com/prometheus/procfs 406e5b7bfd8201a36e2bb5f7bdae0b03380c2ce8 github.com/samuel/go-zookeeper 218e9c81c0dd8b3b18172b2bbfad92cc7d6db55f -github.com/shirou/gopsutil 9d8191d6a6e17dcf43b10a20084a11e8c1aa92e6 +github.com/shirou/gopsutil 85bf0974ed06e4e668595ae2b4de02e772a2819b github.com/shirou/w32 ada3ba68f000aa1b58580e45c9d308fe0b7fc5c5 github.com/soniah/gosnmp b1b4f885b12c5dcbd021c5cee1c904110de6db7d github.com/streadway/amqp b4f3ceab0337f013208d31348b578d83c0064744 @@ -54,10 +48,9 @@ github.com/stretchr/testify f390dcf405f7b83c997eac1b06768bb9f44dec18 github.com/wvanbergen/kafka 1a8639a45164fcc245d5c7b4bd3ccfbd1a0ffbf3 github.com/wvanbergen/kazoo-go 0f768712ae6f76454f987c3356177e138df258f8 github.com/zensqlmonitor/go-mssqldb ffe5510c6fa5e15e6d983210ab501c815b56b363 -golang.org/x/crypto 1f22c0103821b9390939b6776727195525381532 golang.org/x/net 04b9de9b512f58addf28c9853d50ebef61c3953e -golang.org/x/text 6fc2e00a0d64b1f7fc1212dae5b0c939cf6d9ac4 +golang.org/x/text 6d3c22c4525a4da167968fa2479be5524d2e8bd0 gopkg.in/dancannon/gorethink.v1 6f088135ff288deb9d5546f4c71919207f891a70 gopkg.in/fatih/pool.v2 cba550ebf9bce999a02e963296d4bc7a486cb715 gopkg.in/mgo.v2 03c9f3ee4c14c8e51ee521a6a7d0425658dd6f64 -gopkg.in/yaml.v2 f7716cbe52baa25d2e9b0d0da546fcf909fc16b4 \ No newline at end of file +gopkg.in/yaml.v2 f7716cbe52baa25d2e9b0d0da546fcf909fc16b4 diff --git a/cmd/telegraf/telegraf.go b/cmd/telegraf/telegraf.go index 78687d286..a65c5607c 100644 --- a/cmd/telegraf/telegraf.go +++ b/cmd/telegraf/telegraf.go @@ -11,6 +11,7 @@ import ( 
"github.com/influxdata/telegraf/agent" "github.com/influxdata/telegraf/internal/config" + _ "github.com/influxdata/telegraf/plugins/inputs/all" _ "github.com/influxdata/telegraf/plugins/outputs/all" ) diff --git a/internal/config/config.go b/internal/config/config.go index 68d36388e..852a30bb9 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -15,6 +15,7 @@ import ( "github.com/influxdata/telegraf/internal/models" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/outputs" + "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/config" "github.com/naoina/toml/ast" @@ -428,6 +429,17 @@ func (c *Config) addInput(name string, table *ast.Table) error { } input := creator() + // If the input has a SetParser function, then this means it can accept + // arbitrary types of input, so build the parser and set it. + switch t := input.(type) { + case parsers.ParserInput: + parser, err := buildParser(name, table) + if err != nil { + return err + } + t.SetParser(parser) + } + pluginConfig, err := buildInput(name, table) if err != nil { return err @@ -583,6 +595,66 @@ func buildInput(name string, tbl *ast.Table) (*internal_models.InputConfig, erro return cp, nil } +// buildParser grabs the necessary entries from the ast.Table for creating +// a parsers.Parser object, and creates it, which can then be added onto +// an Input object. 
+func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) { + c := &parsers.Config{} + + if node, ok := tbl.Fields["data_format"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + c.DataFormat = str.Value + } + } + } + + if c.DataFormat == "" { + c.DataFormat = "influx" + } + + if node, ok := tbl.Fields["separator"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + c.Separator = str.Value + } + } + } + + if node, ok := tbl.Fields["templates"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if ary, ok := kv.Value.(*ast.Array); ok { + for _, elem := range ary.Value { + if str, ok := elem.(*ast.String); ok { + c.Templates = append(c.Templates, str.Value) + } + } + } + } + } + + if node, ok := tbl.Fields["tag_keys"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if ary, ok := kv.Value.(*ast.Array); ok { + for _, elem := range ary.Value { + if str, ok := elem.(*ast.String); ok { + c.TagKeys = append(c.TagKeys, str.Value) + } + } + } + } + } + + c.MetricName = name + + delete(tbl.Fields, "data_format") + delete(tbl.Fields, "separator") + delete(tbl.Fields, "templates") + delete(tbl.Fields, "tag_keys") + + return parsers.NewParser(c) +} + // buildOutput parses output specific items from the ast.Table, builds the filter and returns an // internal_models.OutputConfig to be inserted into internal_models.RunningInput // Note: error exists in the return for future calls that might require error diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 261057875..c7f48b7c3 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -9,6 +9,7 @@ import ( "github.com/influxdata/telegraf/plugins/inputs/exec" "github.com/influxdata/telegraf/plugins/inputs/memcached" "github.com/influxdata/telegraf/plugins/inputs/procstat" + "github.com/influxdata/telegraf/plugins/parsers" "github.com/stretchr/testify/assert" ) @@ -91,6 +92,9 @@ 
func TestConfig_LoadDirectory(t *testing.T) { "Testdata did not produce correct memcached metadata.") ex := inputs.Inputs["exec"]().(*exec.Exec) + p, err := parsers.NewInfluxParser() + assert.NoError(t, err) + ex.SetParser(p) ex.Command = "/usr/bin/myothercollector --foo=bar" eConfig := &internal_models.InputConfig{ Name: "exec", diff --git a/internal/encoding/encoder.go b/internal/encoding/encoder.go deleted file mode 100644 index 129906ce5..000000000 --- a/internal/encoding/encoder.go +++ /dev/null @@ -1,31 +0,0 @@ -package encoding - -import ( - "fmt" - - "github.com/influxdata/telegraf" -) - -type Parser interface { - InitConfig(configs map[string]interface{}) error - Parse(buf []byte) ([]telegraf.Metric, error) - ParseLine(line string) (telegraf.Metric, error) -} - -type Creator func() Parser - -var Parsers = map[string]Creator{} - -func Add(name string, creator Creator) { - Parsers[name] = creator -} - -func NewParser(dataFormat string, configs map[string]interface{}) (parser Parser, err error) { - creator := Parsers[dataFormat] - if creator == nil { - return nil, fmt.Errorf("Unsupported data format: %s. 
", dataFormat) - } - parser = creator() - err = parser.InitConfig(configs) - return parser, err -} diff --git a/internal/encoding/influx/parser.go b/internal/encoding/influx/parser.go deleted file mode 100644 index 7965b69e2..000000000 --- a/internal/encoding/influx/parser.go +++ /dev/null @@ -1,48 +0,0 @@ -package influx - -import ( - "fmt" - - "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/internal/encoding" -) - -type InfluxParser struct { -} - -func (p *InfluxParser) Parse(buf []byte) ([]telegraf.Metric, error) { - metrics, err := telegraf.ParseMetrics(buf) - - if err != nil { - return nil, err - } - return metrics, nil -} - -func (p *InfluxParser) ParseLine(line string) (telegraf.Metric, error) { - metrics, err := p.Parse([]byte(line + "\n")) - - if err != nil { - return nil, err - } - - if len(metrics) < 1 { - return nil, fmt.Errorf("Can not parse the line: %s, for data format: influx ", line) - } - - return metrics[0], nil -} - -func NewParser() *InfluxParser { - return &InfluxParser{} -} - -func (p *InfluxParser) InitConfig(configs map[string]interface{}) error { - return nil -} - -func init() { - encoding.Add("influx", func() encoding.Parser { - return NewParser() - }) -} diff --git a/internal/encoding/json/parser.go b/internal/encoding/json/parser.go deleted file mode 100644 index 69a91d14d..000000000 --- a/internal/encoding/json/parser.go +++ /dev/null @@ -1,68 +0,0 @@ -package json - -import ( - "encoding/json" - "fmt" - "time" - - "github.com/influxdata/telegraf" - - "github.com/influxdata/telegraf/internal" - "github.com/influxdata/telegraf/internal/encoding" -) - -type JsonParser struct { -} - -func (p *JsonParser) Parse(buf []byte) ([]telegraf.Metric, error) { - - metrics := make([]telegraf.Metric, 0) - - var jsonOut interface{} - err := json.Unmarshal(buf, &jsonOut) - if err != nil { - err = fmt.Errorf("unable to parse out as JSON, %s", err) - return nil, err - } - - f := internal.JSONFlattener{} - err = f.FlattenJSON("", 
jsonOut) - if err != nil { - return nil, err - } - - metric, err := telegraf.NewMetric("exec", nil, f.Fields, time.Now().UTC()) - - if err != nil { - return nil, err - } - return append(metrics, metric), nil -} - -func (p *JsonParser) ParseLine(line string) (telegraf.Metric, error) { - metrics, err := p.Parse([]byte(line + "\n")) - - if err != nil { - return nil, err - } - - if len(metrics) < 1 { - return nil, fmt.Errorf("Can not parse the line: %s, for data format: influx ", line) - } - - return metrics[0], nil -} - -func NewParser() *JsonParser { - return &JsonParser{} -} - -func (p *JsonParser) InitConfig(configs map[string]interface{}) error { - return nil -} - -func init() { - encoding.Add("json", func() encoding.Parser { - return NewParser() - }) -} diff --git a/internal/internal.go b/internal/internal.go index 27c9d664f..82758e5e8 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -9,7 +9,6 @@ import ( "fmt" "io/ioutil" "os" - "strconv" "strings" "time" ) @@ -35,47 +34,6 @@ func (d *Duration) UnmarshalTOML(b []byte) error { var NotImplementedError = errors.New("not implemented yet") -type JSONFlattener struct { - Fields map[string]interface{} -} - -// FlattenJSON flattens nested maps/interfaces into a fields map -func (f *JSONFlattener) FlattenJSON( - fieldname string, - v interface{}, -) error { - if f.Fields == nil { - f.Fields = make(map[string]interface{}) - } - fieldname = strings.Trim(fieldname, "_") - switch t := v.(type) { - case map[string]interface{}: - for k, v := range t { - err := f.FlattenJSON(fieldname+"_"+k+"_", v) - if err != nil { - return err - } - } - case []interface{}: - for i, v := range t { - k := strconv.Itoa(i) - err := f.FlattenJSON(fieldname+"_"+k+"_", v) - if err != nil { - return nil - } - } - case float64: - f.Fields[fieldname] = t - case bool, string, nil: - // ignored types - return nil - default: - return fmt.Errorf("JSON Flattener: got unexpected type %T with value %v (%s)", - t, t, fieldname) - } - return nil 
-} - // ReadLines reads contents from a file and splits them by new lines. // A convenience wrapper to ReadLinesOffsetN(filename, 0, -1). func ReadLines(filename string) ([]string, error) { diff --git a/metric.go b/metric.go index 99ee30369..574565c22 100644 --- a/metric.go +++ b/metric.go @@ -1,11 +1,9 @@ package telegraf import ( - "bytes" "time" "github.com/influxdata/influxdb/client/v2" - "github.com/influxdata/influxdb/models" ) type Metric interface { @@ -63,25 +61,6 @@ func NewMetric( }, nil } -// ParseMetrics returns a slice of Metrics from a text representation of a -// metric (in line-protocol format) -// with each metric separated by newlines. If any metrics fail to parse, -// a non-nil error will be returned in addition to the metrics that parsed -// successfully. -func ParseMetrics(buf []byte) ([]Metric, error) { - // parse even if the buffer begins with a newline - buf = bytes.TrimPrefix(buf, []byte("\n")) - points, err := models.ParsePoints(buf) - metrics := make([]Metric, len(points)) - for i, point := range points { - // Ignore error here because it's impossible that a model.Point - // wouldn't parse into client.Point properly - metrics[i], _ = NewMetric(point.Name(), point.Tags(), - point.Fields(), point.Time()) - } - return metrics, err -} - func (m *metric) Name() string { return m.pt.Name() } diff --git a/metric_test.go b/metric_test.go index acf6dee99..1177ab494 100644 --- a/metric_test.go +++ b/metric_test.go @@ -9,58 +9,6 @@ import ( "github.com/stretchr/testify/assert" ) -const validMs = ` -cpu,cpu=cpu0,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 1454105876344540456 -` - -const invalidMs = ` -cpu, cpu=cpu0,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 -cpu,host=foo usage_idle -cpu,host usage_idle=99 -cpu,host=foo usage_idle=99 very bad metric -` - -const validInvalidMs = ` -cpu,cpu=cpu0,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 -cpu,cpu=cpu1,host=foo,datacenter=us-east usage_idle=51,usage_busy=49 
-cpu,cpu=cpu2,host=foo,datacenter=us-east usage_idle=60,usage_busy=40 -cpu,host usage_idle=99 -` - -func TestParseValidMetrics(t *testing.T) { - metrics, err := ParseMetrics([]byte(validMs)) - assert.NoError(t, err) - assert.Len(t, metrics, 1) - m := metrics[0] - - tags := map[string]string{ - "host": "foo", - "datacenter": "us-east", - "cpu": "cpu0", - } - fields := map[string]interface{}{ - "usage_idle": float64(99), - "usage_busy": float64(1), - } - - assert.Equal(t, tags, m.Tags()) - assert.Equal(t, fields, m.Fields()) - assert.Equal(t, "cpu", m.Name()) - assert.Equal(t, int64(1454105876344540456), m.UnixNano()) -} - -func TestParseInvalidMetrics(t *testing.T) { - metrics, err := ParseMetrics([]byte(invalidMs)) - assert.Error(t, err) - assert.Len(t, metrics, 0) -} - -func TestParseValidAndInvalidMetrics(t *testing.T) { - metrics, err := ParseMetrics([]byte(validInvalidMs)) - assert.Error(t, err) - assert.Len(t, metrics, 3) -} - func TestNewMetric(t *testing.T) { now := time.Now() diff --git a/plugins/inputs/elasticsearch/elasticsearch.go b/plugins/inputs/elasticsearch/elasticsearch.go index 2dbd6f357..f7341686e 100644 --- a/plugins/inputs/elasticsearch/elasticsearch.go +++ b/plugins/inputs/elasticsearch/elasticsearch.go @@ -10,8 +10,8 @@ import ( "time" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" + jsonparser "github.com/influxdata/telegraf/plugins/parsers/json" ) const statsPath = "/_nodes/stats" @@ -168,7 +168,7 @@ func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) er now := time.Now() for p, s := range stats { - f := internal.JSONFlattener{} + f := jsonparser.JSONFlattener{} err := f.FlattenJSON("", s) if err != nil { return err diff --git a/plugins/inputs/exec/README.md b/plugins/inputs/exec/README.md index 4b862b273..daf800db3 100644 --- a/plugins/inputs/exec/README.md +++ b/plugins/inputs/exec/README.md @@ -8,8 +8,8 @@ The exec plugin can execute 
arbitrary commands which output: > Graphite understands messages with this format: -> ``` -metric_path value timestamp\n +> ``` +metric_path value timestamp\n ``` > __metric_path__ is the metric namespace that you want to populate. @@ -28,10 +28,7 @@ and strings will be ignored. # Read flattened metrics from one or more commands that output JSON to stdout [[inputs.exec]] # Shell/commands array - # compatible with old version - # we can still use the old command configuration - # command = "/usr/bin/mycollector --foo=bar" - commands = ["/tmp/test.sh","/tmp/test2.sh"] + commands = ["/tmp/test.sh", "/tmp/test2.sh"] # Data format to consume. This can be "json", "influx" or "graphite" (line-protocol) # NOTE json only reads numerical measurements, strings and booleans are ignored. @@ -128,7 +125,7 @@ and usage_busy. They will receive a timestamp at collection time. We can also change the data_format to "graphite" to use the metrics collecting scripts such as (compatible with graphite): * Nagios [Mertics Plugins] (https://exchange.nagios.org/directory/Plugins) -* Sensu [Mertics Plugins] (https://github.com/sensu-plugins) +* Sensu [Mertics Plugins] (https://github.com/sensu-plugins) #### Configuration ``` @@ -180,4 +177,4 @@ sensu.metric.net.server0.eth0.rx_dropped 0 1444234982 The templates configuration will be used to parse the graphite metrics to support influxdb/opentsdb tagging store engines. 
More detail information about templates, please refer to [The graphite Input] (https://github.com/influxdata/influxdb/blob/master/services/graphite/README.md) - + diff --git a/plugins/inputs/exec/exec.go b/plugins/inputs/exec/exec.go index a53a6f32d..ca7ed2fed 100644 --- a/plugins/inputs/exec/exec.go +++ b/plugins/inputs/exec/exec.go @@ -9,66 +9,40 @@ import ( "github.com/gonuts/go-shellquote" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/internal/encoding" "github.com/influxdata/telegraf/plugins/inputs" - - _ "github.com/influxdata/telegraf/internal/encoding/graphite" - _ "github.com/influxdata/telegraf/internal/encoding/influx" - _ "github.com/influxdata/telegraf/internal/encoding/json" + "github.com/influxdata/telegraf/plugins/parsers" ) const sampleConfig = ` - # Shell/commands array - # compatible with old version - # we can still use the old command configuration - # command = "/usr/bin/mycollector --foo=bar" - commands = ["/tmp/test.sh","/tmp/test2.sh"] + ### Commands array + commands = ["/tmp/test.sh", "/usr/bin/mycollector --foo=bar"] - # Data format to consume. This can be "json", "influx" or "graphite" (line-protocol) - # NOTE json only reads numerical measurements, strings and booleans are ignored. - data_format = "json" - - # measurement name suffix (for separating different commands) + ### measurement name suffix (for separating different commands) name_suffix = "_mycollector" - ### Below configuration will be used for data_format = "graphite", can be ignored for other data_format - ### If matching multiple measurement files, this string will be used to join the matched values. - separator = "." - - ### Each template line requires a template pattern. It can have an optional - ### filter before the template and separated by spaces. It can also have optional extra - ### tags following the template. Multiple tags should be separated by commas and no spaces - ### similar to the line protocol format. 
The can be only one default template. - ### Templates support below format: - ### 1. filter + template - ### 2. filter + template + extra tag - ### 3. filter + template with field key - ### 4. default template - templates = [ - "*.app env.service.resource.measurement", - "stats.* .host.measurement* region=us-west,agent=sensu", - "stats2.* .host.measurement.field", - "measurement*" - ] + ### Data format to consume. This can be "json", "influx" or "graphite" + ### Each data format has it's own unique set of configuration options, read + ### more about them here: + ### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS.md + data_format = "influx" ` type Exec struct { - Commands []string - Command string - DataFormat string + Commands []string + Command string - Separator string - Templates []string - - encodingParser encoding.Parser - - initedConfig bool + parser parsers.Parser wg sync.WaitGroup - sync.Mutex - runner Runner - errc chan error + runner Runner + errChan chan error +} + +func NewExec() *Exec { + return &Exec{ + runner: CommandRunner{}, + } } type Runner interface { @@ -95,22 +69,18 @@ func (c CommandRunner) Run(e *Exec, command string) ([]byte, error) { return out.Bytes(), nil } -func NewExec() *Exec { - return &Exec{runner: CommandRunner{}} -} - func (e *Exec) ProcessCommand(command string, acc telegraf.Accumulator) { defer e.wg.Done() out, err := e.runner.Run(e, command) if err != nil { - e.errc <- err + e.errChan <- err return } - metrics, err := e.encodingParser.Parse(out) + metrics, err := e.parser.Parse(out) if err != nil { - e.errc <- err + e.errChan <- err } else { for _, metric := range metrics { acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time()) @@ -118,66 +88,33 @@ func (e *Exec) ProcessCommand(command string, acc telegraf.Accumulator) { } } -func (e *Exec) initConfig() error { - e.Lock() - defer e.Unlock() - - if e.Command != "" && len(e.Commands) < 1 { - e.Commands = []string{e.Command} - } - - if 
e.DataFormat == "" { - e.DataFormat = "json" - } - - var err error - - configs := make(map[string]interface{}) - configs["Separator"] = e.Separator - configs["Templates"] = e.Templates - - e.encodingParser, err = encoding.NewParser(e.DataFormat, configs) - - if err != nil { - return fmt.Errorf("exec configuration is error: %s ", err.Error()) - } - - return nil -} - func (e *Exec) SampleConfig() string { return sampleConfig } func (e *Exec) Description() string { - return "Read metrics from one or more commands that can output JSON, influx or graphite line protocol to stdout" + return "Read metrics from one or more commands that can output to stdout" +} + +func (e *Exec) SetParser(parser parsers.Parser) { + e.parser = parser } func (e *Exec) Gather(acc telegraf.Accumulator) error { + e.errChan = make(chan error, len(e.Commands)) - if !e.initedConfig { - if err := e.initConfig(); err != nil { - return err - } - e.initedConfig = true - } - - e.Lock() - e.errc = make(chan error, 10) - e.Unlock() - + e.wg.Add(len(e.Commands)) for _, command := range e.Commands { - e.wg.Add(1) go e.ProcessCommand(command, acc) } e.wg.Wait() select { default: - close(e.errc) + close(e.errChan) return nil - case err := <-e.errc: - close(e.errc) + case err := <-e.errChan: + close(e.errChan) return err } diff --git a/plugins/inputs/exec/exec_test.go b/plugins/inputs/exec/exec_test.go index 4be11e611..da55ef9d3 100644 --- a/plugins/inputs/exec/exec_test.go +++ b/plugins/inputs/exec/exec_test.go @@ -4,6 +4,8 @@ import ( "fmt" "testing" + "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -63,9 +65,11 @@ func (r runnerMock) Run(e *Exec, command string) ([]byte, error) { } func TestExec(t *testing.T) { + parser, _ := parsers.NewJSONParser("exec", []string{}, nil) e := &Exec{ runner: newRunnerMock([]byte(validJson), nil), Commands: []string{"testcommand arg1"}, + parser: parser, 
} var acc testutil.Accumulator @@ -87,9 +91,11 @@ func TestExec(t *testing.T) { } func TestExecMalformed(t *testing.T) { + parser, _ := parsers.NewJSONParser("exec", []string{}, nil) e := &Exec{ runner: newRunnerMock([]byte(malformedJson), nil), Commands: []string{"badcommand arg1"}, + parser: parser, } var acc testutil.Accumulator @@ -99,9 +105,11 @@ func TestExecMalformed(t *testing.T) { } func TestCommandError(t *testing.T) { + parser, _ := parsers.NewJSONParser("exec", []string{}, nil) e := &Exec{ runner: newRunnerMock(nil, fmt.Errorf("exit status code 1")), Commands: []string{"badcommand"}, + parser: parser, } var acc testutil.Accumulator @@ -111,10 +119,11 @@ func TestCommandError(t *testing.T) { } func TestLineProtocolParse(t *testing.T) { + parser, _ := parsers.NewInfluxParser() e := &Exec{ - runner: newRunnerMock([]byte(lineProtocol), nil), - Commands: []string{"line-protocol"}, - DataFormat: "influx", + runner: newRunnerMock([]byte(lineProtocol), nil), + Commands: []string{"line-protocol"}, + parser: parser, } var acc testutil.Accumulator @@ -133,10 +142,11 @@ func TestLineProtocolParse(t *testing.T) { } func TestLineProtocolParseMultiple(t *testing.T) { + parser, _ := parsers.NewInfluxParser() e := &Exec{ - runner: newRunnerMock([]byte(lineProtocolMulti), nil), - Commands: []string{"line-protocol"}, - DataFormat: "influx", + runner: newRunnerMock([]byte(lineProtocolMulti), nil), + Commands: []string{"line-protocol"}, + parser: parser, } var acc testutil.Accumulator @@ -158,15 +168,3 @@ func TestLineProtocolParseMultiple(t *testing.T) { acc.AssertContainsTaggedFields(t, "cpu", fields, tags) } } - -func TestInvalidDataFormat(t *testing.T) { - e := &Exec{ - runner: newRunnerMock([]byte(lineProtocol), nil), - Commands: []string{"bad data format"}, - DataFormat: "FooBar", - } - - var acc testutil.Accumulator - err := e.Gather(&acc) - require.Error(t, err) -} diff --git a/plugins/inputs/httpjson/httpjson.go b/plugins/inputs/httpjson/httpjson.go index 
3070e6338..d3955c31a 100644 --- a/plugins/inputs/httpjson/httpjson.go +++ b/plugins/inputs/httpjson/httpjson.go @@ -1,7 +1,6 @@ package httpjson import ( - "encoding/json" "errors" "fmt" "io/ioutil" @@ -12,8 +11,8 @@ import ( "time" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf/plugins/parsers" ) type HttpJson struct { @@ -137,39 +136,34 @@ func (h *HttpJson) gatherServer( return err } - var jsonOut map[string]interface{} - if err = json.Unmarshal([]byte(resp), &jsonOut); err != nil { - return errors.New("Error decoding JSON response") - } - - tags := map[string]string{ - "server": serverURL, - } - - for _, tag := range h.TagKeys { - switch v := jsonOut[tag].(type) { - case string: - tags[tag] = v - } - delete(jsonOut, tag) - } - - if responseTime >= 0 { - jsonOut["response_time"] = responseTime - } - f := internal.JSONFlattener{} - err = f.FlattenJSON("", jsonOut) - if err != nil { - return err - } - var msrmnt_name string if h.Name == "" { msrmnt_name = "httpjson" } else { msrmnt_name = "httpjson_" + h.Name } - acc.AddFields(msrmnt_name, f.Fields, tags) + tags := map[string]string{ + "server": serverURL, + } + + parser, err := parsers.NewJSONParser(msrmnt_name, h.TagKeys, tags) + if err != nil { + return err + } + + metrics, err := parser.Parse([]byte(resp)) + if err != nil { + return err + } + + for _, metric := range metrics { + fields := make(map[string]interface{}) + for k, v := range metric.Fields() { + fields[k] = v + } + fields["response_time"] = responseTime + acc.AddFields(metric.Name(), fields, metric.Tags()) + } return nil } diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index 499b2e50b..e139fa8da 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -1,12 +1,14 @@ package kafka_consumer import ( + "fmt" "log" 
"strings" "sync" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf/plugins/parsers" "github.com/Shopify/sarama" "github.com/wvanbergen/kafka/consumergroup" @@ -20,6 +22,8 @@ type Kafka struct { PointBuffer int Offset string + parser parsers.Parser + sync.Mutex // channel for all incoming kafka messages @@ -36,16 +40,22 @@ type Kafka struct { } var sampleConfig = ` - # topic(s) to consume + ### topic(s) to consume topics = ["telegraf"] - # an array of Zookeeper connection strings + ### an array of Zookeeper connection strings zookeeper_peers = ["localhost:2181"] - # the name of the consumer group + ### the name of the consumer group consumer_group = "telegraf_metrics_consumers" - # Maximum number of points to buffer between collection intervals + ### Maximum number of points to buffer between collection intervals point_buffer = 100000 - # Offset (must be either "oldest" or "newest") + ### Offset (must be either "oldest" or "newest") offset = "oldest" + + ### Data format to consume. 
This can be "json", "influx" or "graphite" + ### Each data format has its own unique set of configuration options, read + ### more about them here: + ### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS.md + data_format = "influx" ` func (k *Kafka) SampleConfig() string { @@ -53,7 +63,11 @@ } func (k *Kafka) Description() string { - return "Read line-protocol metrics from Kafka topic(s)" + return "Read metrics from Kafka topic(s)" +} + +func (k *Kafka) SetParser(parser parsers.Parser) { + k.parser = parser } func (k *Kafka) Start() error { @@ -96,15 +110,15 @@ k.metricC = make(chan telegraf.Metric, k.PointBuffer) // Start the kafka message reader - go k.parser() + go k.receiver() log.Printf("Started the kafka consumer service, peers: %v, topics: %v\n", k.ZookeeperPeers, k.Topics) return nil } -// parser() reads all incoming messages from the consumer, and parses them into +// receiver() reads all incoming messages from the consumer, and parses them into // influxdb metric points. 
-func (k *Kafka) parser() { +func (k *Kafka) receiver() { for { select { case <-k.done: @@ -112,13 +126,14 @@ func (k *Kafka) parser() { case err := <-k.errs: log.Printf("Kafka Consumer Error: %s\n", err.Error()) case msg := <-k.in: - metrics, err := telegraf.ParseMetrics(msg.Value) + metrics, err := k.parser.Parse(msg.Value) if err != nil { log.Printf("Could not parse kafka message: %s, error: %s", string(msg.Value), err.Error()) } for _, metric := range metrics { + fmt.Println(string(metric.Name())) select { case k.metricC <- metric: continue diff --git a/plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go b/plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go index a3a4a6e35..458d43d35 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go @@ -9,6 +9,8 @@ import ( "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/influxdata/telegraf/plugins/parsers" ) func TestReadsMetricsFromKafka(t *testing.T) { @@ -40,6 +42,8 @@ func TestReadsMetricsFromKafka(t *testing.T) { PointBuffer: 100000, Offset: "oldest", } + p, _ := parsers.NewInfluxParser() + k.SetParser(p) if err := k.Start(); err != nil { t.Fatal(err.Error()) } else { diff --git a/plugins/inputs/kafka_consumer/kafka_consumer_test.go b/plugins/inputs/kafka_consumer/kafka_consumer_test.go index be8984300..ec69cb926 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer_test.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer_test.go @@ -5,6 +5,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/testutil" "github.com/Shopify/sarama" @@ -12,9 +13,11 @@ import ( ) const ( - testMsg = "cpu_load_short,host=server01 value=23422.0 1422568543702900257" - invalidMsg = "cpu_load_short,host=server01 1422568543702900257" - pointBuffer = 5 + 
testMsg = "cpu_load_short,host=server01 value=23422.0 1422568543702900257" + testMsgGraphite = "cpu.load.short.graphite 23422 1454780029" + testMsgJSON = "{\"a\": 5, \"b\": {\"c\": 6}}\n" + invalidMsg = "cpu_load_short,host=server01 1422568543702900257" + pointBuffer = 5 ) func NewTestKafka() (*Kafka, chan *sarama.ConsumerMessage) { @@ -39,7 +42,8 @@ func TestRunParser(t *testing.T) { k, in := NewTestKafka() defer close(k.done) - go k.parser() + k.parser, _ = parsers.NewInfluxParser() + go k.receiver() in <- saramaMsg(testMsg) time.Sleep(time.Millisecond) @@ -51,7 +55,8 @@ func TestRunParserInvalidMsg(t *testing.T) { k, in := NewTestKafka() defer close(k.done) - go k.parser() + k.parser, _ = parsers.NewInfluxParser() + go k.receiver() in <- saramaMsg(invalidMsg) time.Sleep(time.Millisecond) @@ -63,7 +68,8 @@ func TestRunParserRespectsBuffer(t *testing.T) { k, in := NewTestKafka() defer close(k.done) - go k.parser() + k.parser, _ = parsers.NewInfluxParser() + go k.receiver() for i := 0; i < pointBuffer+1; i++ { in <- saramaMsg(testMsg) } @@ -77,7 +83,8 @@ func TestRunParserAndGather(t *testing.T) { k, in := NewTestKafka() defer close(k.done) - go k.parser() + k.parser, _ = parsers.NewInfluxParser() + go k.receiver() in <- saramaMsg(testMsg) time.Sleep(time.Millisecond) @@ -89,6 +96,45 @@ func TestRunParserAndGather(t *testing.T) { map[string]interface{}{"value": float64(23422)}) } +// Test that the parser parses kafka messages into points +func TestRunParserAndGatherGraphite(t *testing.T) { + k, in := NewTestKafka() + defer close(k.done) + + k.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil) + go k.receiver() + in <- saramaMsg(testMsgGraphite) + time.Sleep(time.Millisecond) + + acc := testutil.Accumulator{} + k.Gather(&acc) + + assert.Equal(t, len(acc.Metrics), 1) + acc.AssertContainsFields(t, "cpu_load_short_graphite", + map[string]interface{}{"value": float64(23422)}) +} + +// Test that the parser parses kafka messages into points +func 
TestRunParserAndGatherJSON(t *testing.T) { + k, in := NewTestKafka() + defer close(k.done) + + k.parser, _ = parsers.NewJSONParser("kafka_json_test", []string{}, nil) + go k.receiver() + in <- saramaMsg(testMsgJSON) + time.Sleep(time.Millisecond) + + acc := testutil.Accumulator{} + k.Gather(&acc) + + assert.Equal(t, len(acc.Metrics), 1) + acc.AssertContainsFields(t, "kafka_json_test", + map[string]interface{}{ + "a": float64(5), + "b_c": float64(6), + }) +} + func saramaMsg(val string) *sarama.ConsumerMessage { return &sarama.ConsumerMessage{ Key: nil, diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go index 2b80442d6..9bec30f5f 100644 --- a/plugins/inputs/statsd/statsd.go +++ b/plugins/inputs/statsd/statsd.go @@ -11,7 +11,7 @@ import ( "sync" "time" - "github.com/influxdata/influxdb/services/graphite" + "github.com/influxdata/telegraf/plugins/parsers/graphite" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" @@ -123,37 +123,39 @@ func (_ *Statsd) Description() string { } const sampleConfig = ` - # Address and port to host UDP listener on + ### Address and port to host UDP listener on service_address = ":8125" - # Delete gauges every interval (default=false) + ### Delete gauges every interval (default=false) delete_gauges = false - # Delete counters every interval (default=false) + ### Delete counters every interval (default=false) delete_counters = false - # Delete sets every interval (default=false) + ### Delete sets every interval (default=false) delete_sets = false - # Delete timings & histograms every interval (default=true) + ### Delete timings & histograms every interval (default=true) delete_timings = true - # Percentiles to calculate for timing & histogram stats + ### Percentiles to calculate for timing & histogram stats percentiles = [90] - # convert measurement names, "." to "_" and "-" to "__" + ### convert measurement names, "." 
to "_" and "-" to "__" convert_names = true + ### Statsd data translation templates, more info can be read here: + ### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS.md#graphite # templates = [ # "cpu.* measurement*" # ] - # Number of UDP messages allowed to queue up, once filled, - # the statsd server will start dropping packets + ### Number of UDP messages allowed to queue up, once filled, + ### the statsd server will start dropping packets allowed_pending_messages = 10000 - # Number of timing/histogram values to track per-measurement in the - # calculation of percentiles. Raising this limit increases the accuracy - # of percentiles but also increases the memory usage and cpu time. + ### Number of timing/histogram values to track per-measurement in the + ### calculation of percentiles. Raising this limit increases the accuracy + ### of percentiles but also increases the memory usage and cpu time. percentile_limit = 1000 - # UDP packet size for the server to listen for. This will depend on the size - # of the packets that the client is sending, which is usually 1500 bytes. + ### UDP packet size for the server to listen for. This will depend on the size + ### of the packets that the client is sending, which is usually 1500 bytes. 
udp_packet_size = 1500 ` @@ -418,18 +420,14 @@ func (s *Statsd) parseName(bucket string) (string, string, map[string]string) { } } - o := graphite.Options{ - Separator: "_", - Templates: s.Templates, - DefaultTags: tags, - } - var field string name := bucketparts[0] - p, err := graphite.NewParserWithOptions(o) + p, err := graphite.NewGraphiteParser(".", s.Templates, nil) if err == nil { + p.DefaultTags = tags name, tags, field, _ = p.ApplyTemplate(name) } + if s.ConvertNames { name = strings.Replace(name, ".", "_", -1) name = strings.Replace(name, "-", "__", -1) diff --git a/plugins/outputs/graphite/graphite_test.go b/plugins/outputs/graphite/graphite_test.go index 4d8c9f353..1020ee625 100644 --- a/plugins/outputs/graphite/graphite_test.go +++ b/plugins/outputs/graphite/graphite_test.go @@ -71,16 +71,11 @@ func TestGraphiteOK(t *testing.T) { // Start TCP server wg.Add(1) go TCPServer(t, &wg) - wg.Wait() - // Connect - wg.Add(1) err1 := g.Connect() - wg.Wait() require.NoError(t, err1) // Send Data err2 := g.Write(metrics) require.NoError(t, err2) - wg.Add(1) // Waiting TCPserver wg.Wait() g.Close() @@ -88,9 +83,8 @@ func TestGraphiteOK(t *testing.T) { func TCPServer(t *testing.T, wg *sync.WaitGroup) { tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003") - wg.Done() + defer wg.Done() conn, _ := tcpServer.Accept() - wg.Done() reader := bufio.NewReader(conn) tp := textproto.NewReader(reader) data1, _ := tp.ReadLine() @@ -100,7 +94,6 @@ func TCPServer(t *testing.T, wg *sync.WaitGroup) { data3, _ := tp.ReadLine() assert.Equal(t, "my.prefix.192_168_0_1.my_measurement.value 3.14 1289430000", data3) conn.Close() - wg.Done() } func TestGraphiteTags(t *testing.T) { diff --git a/internal/encoding/graphite/config.go b/plugins/parsers/graphite/config.go similarity index 100% rename from internal/encoding/graphite/config.go rename to plugins/parsers/graphite/config.go diff --git a/internal/encoding/graphite/errors.go b/plugins/parsers/graphite/errors.go similarity index 100% 
rename from internal/encoding/graphite/errors.go rename to plugins/parsers/graphite/errors.go diff --git a/internal/encoding/graphite/parser.go b/plugins/parsers/graphite/parser.go similarity index 88% rename from internal/encoding/graphite/parser.go rename to plugins/parsers/graphite/parser.go index f43c76b87..74ccd81cb 100644 --- a/internal/encoding/graphite/parser.go +++ b/plugins/parsers/graphite/parser.go @@ -1,6 +1,7 @@ package graphite import ( + "bufio" "bytes" "fmt" "io" @@ -10,11 +11,7 @@ import ( "strings" "time" - "bufio" - - "github.com/influxdata/influxdb/models" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/internal/encoding" ) // Minimum and maximum supported dates for timestamps. @@ -23,35 +20,40 @@ var ( MaxDate = time.Date(2038, 1, 19, 0, 0, 0, 0, time.UTC) ) -// Options are configurable values that can be provided to a Parser -type Options struct { - Separator string - Templates []string -} - // Parser encapsulates a Graphite Parser. type GraphiteParser struct { + Separator string + Templates []string + DefaultTags map[string]string + matcher *matcher } -func NewParser() *GraphiteParser { - return &GraphiteParser{} -} - -func (p *GraphiteParser) InitConfig(configs map[string]interface{}) error { - +func NewGraphiteParser( + separator string, + templates []string, + defaultTags map[string]string, +) (*GraphiteParser, error) { var err error - options := Options{ - Templates: configs["Templates"].([]string), - Separator: configs["Separator"].(string)} + + if separator == "" { + separator = DefaultSeparator + } + p := &GraphiteParser{ + Separator: separator, + Templates: templates, + } + + if defaultTags != nil { + p.DefaultTags = defaultTags + } matcher := newMatcher() p.matcher = matcher - defaultTemplate, _ := NewTemplate("measurement*", nil, DefaultSeparator) + defaultTemplate, _ := NewTemplate("measurement*", nil, p.Separator) matcher.AddDefaultTemplate(defaultTemplate) - for _, pattern := range options.Templates { - + for 
_, pattern := range p.Templates { template := pattern filter := "" // Format is [filter]