diff --git a/CHANGELOG.md b/CHANGELOG.md
index baabc1b53..ee69938be 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,10 +1,21 @@
## v0.10.3 [unreleased]

### Release Notes
+- Users of the `exec` and `kafka_consumer` plugins can now specify the incoming
+data format that they would like to parse. Currently supported: "json",
+"influx", and "graphite"
+- More info on parsing arbitrary data formats can be found
+[here](https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md)

### Features
+- [#652](https://github.com/influxdata/telegraf/pull/652): CouchDB Input Plugin
+- [#655](https://github.com/influxdata/telegraf/pull/655): Support parsing arbitrary data formats. Currently limited to kafka_consumer and exec inputs.
+- [#671](https://github.com/influxdata/telegraf/pull/671): Dovecot input plugin.

### Bugfixes
+- [#443](https://github.com/influxdata/telegraf/issues/443): Fix Ping command timeout parameter on Linux.
+- [#662](https://github.com/influxdata/telegraf/pull/667): Change `[tags]` to `[global_tags]` to fix multiple-plugin tags bug.
+- [#642](https://github.com/influxdata/telegraf/issues/642): Riemann output plugin issues.

## v0.10.2 [2016-02-04]

diff --git a/CONFIGURATION.md b/CONFIGURATION.md
index 421fe4f72..f4214b5d4 100644
--- a/CONFIGURATION.md
+++ b/CONFIGURATION.md
@@ -9,9 +9,9 @@ To generate a file with specific inputs and outputs, you can use the
-input-filter and -output-filter flags:
`telegraf -sample-config -input-filter cpu:mem:net:swap -output-filter influxdb:kafka`

-## `[tags]` Configuration
+## `[global_tags]` Configuration

-Global tags can be specific in the `[tags]` section of the config file in
+Global tags can be specified in the `[global_tags]` section of the config file in
key="value" format. All metrics being gathered on this host will be tagged
with the tags specified here.

@@ -76,7 +76,7 @@ measurements at a 10s interval and will collect per-cpu data, dropping any
fields which begin with `time_`.

```toml
-[tags]
+[global_tags]
dc = "denver-1"

[agent]
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index fa1645725..6876cfa7b 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -129,6 +129,52 @@ func init() {
}
```

+## Input Plugins Accepting Arbitrary Data Formats
+
+Some input plugins (such as
+[exec](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/exec))
+accept arbitrary input data formats. An overview of these data formats can
+be found
+[here](https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md).
+
+In order to enable this, you must define a `SetParser(parser parsers.Parser)`
+function on the plugin object (see the exec plugin for an example), as well as
+a `parser` field on the object.
+
+You can then utilize the parser internally in your plugin, parsing data as you
+see fit. Telegraf's configuration layer will take care of instantiating and
+configuring the `Parser` object.
+
+You should also add the following to your SampleConfig() return:
+
+```toml
+  ### Data format to consume. This can be "json", "influx" or "graphite"
+  ### Each data format has its own unique set of configuration options, read
+  ### more about them here:
+  ### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md
+  data_format = "influx"
+```
+
+Below is the `Parser` interface.
+
+```go
+// Parser is an interface defining functions that a parser plugin must satisfy.
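+//
+// A minimal plugin-side sketch (hypothetical `MyPlugin`; the exec plugin is
+// the real reference implementation): storing the parser on a struct field
+// is enough to satisfy the SetParser contract described above.
+//
+//	type MyPlugin struct {
+//		parser parsers.Parser
+//	}
+//
+//	func (p *MyPlugin) SetParser(parser parsers.Parser) {
+//		p.parser = parser
+//	}
+//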
+type Parser interface {
+	// Parse takes a byte buffer separated by newlines
+	// ie, `cpu.usage.idle 90\ncpu.usage.busy 10`
+	// and parses it into telegraf metrics
+	Parse(buf []byte) ([]telegraf.Metric, error)
+
+	// ParseLine takes a single string metric
+	// ie, "cpu.usage.idle 90"
+	// and parses it into a telegraf metric.
+	ParseLine(line string) (telegraf.Metric, error)
+}
+```
+
+And you can view the code
+[here](https://github.com/influxdata/telegraf/blob/henrypfhu-master/plugins/parsers/registry.go).
+
## Service Input Plugins

This section is for developers who want to create new "service" collection
diff --git a/DATA_FORMATS_INPUT.md b/DATA_FORMATS_INPUT.md
new file mode 100644
index 000000000..16870adc7
--- /dev/null
+++ b/DATA_FORMATS_INPUT.md
@@ -0,0 +1,274 @@
+# Telegraf Input Data Formats
+
+Telegraf metrics, like InfluxDB
+[points](https://docs.influxdata.com/influxdb/v0.10/write_protocols/line/),
+are a combination of four basic parts:
+
+1. Measurement Name
+1. Tags
+1. Fields
+1. Timestamp
+
+These four parts are easily defined when using InfluxDB line-protocol as a
+data format. But there are other data formats that users may want to use which
+require more advanced configuration to create usable Telegraf metrics.
+
+Plugins such as `exec` and `kafka_consumer` parse textual data. Up until now,
+these plugins were statically configured to parse just a single
+data format. `exec` mostly supported parsing only JSON, and `kafka_consumer`
+only supported data in InfluxDB line-protocol.
+
+But now we are normalizing the parsing of various data formats across all
+plugins that can support it. You will be able to identify a plugin that supports
+different data formats by the presence of a `data_format` config option, for
+example, in the exec plugin:
+
+```toml
+[[inputs.exec]]
+  ### Commands array
+  commands = ["/tmp/test.sh", "/usr/bin/mycollector --foo=bar"]
+
+  ### measurement name suffix (for separating different commands)
+  name_suffix = "_mycollector"
+
+  ### Data format to consume. This can be "json", "influx" or "graphite"
+  ### Each data format has its own unique set of configuration options, read
+  ### more about them here:
+  ### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md
+  data_format = "json"
+
+  ### Additional configuration options go here
+```
+
+Each data_format has an additional set of configuration options available, which
+I'll go over below.
+
+## Influx:
+
+There are no additional configuration options for InfluxDB line-protocol. The
+metrics are parsed directly into Telegraf metrics.
+
+#### Influx Configuration:
+
+```toml
+[[inputs.exec]]
+  ### Commands array
+  commands = ["/tmp/test.sh", "/usr/bin/mycollector --foo=bar"]
+
+  ### measurement name suffix (for separating different commands)
+  name_suffix = "_mycollector"
+
+  ### Data format to consume. This can be "json", "influx" or "graphite"
+  ### Each data format has its own unique set of configuration options, read
+  ### more about them here:
+  ### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md
+  data_format = "influx"
+```
+
+## JSON:
+
+The JSON data format flattens JSON into metric _fields_. For example, this JSON:
+
+```json
+{
+    "a": 5,
+    "b": {
+        "c": 6
+    }
+}
+```
+
+Would get translated into _fields_ of a measurement:
+
+```
+myjsonmetric a=5,b_c=6
+```
+
+The _measurement_ _name_ is usually the name of the plugin,
+but can be overridden using the `name_override` config option.
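+
+For example (a sketch — the command and the `my_metric` name here are
+arbitrary, not from the plugin defaults):
+
+```toml
+[[inputs.exec]]
+  commands = ["/tmp/test.sh"]
+  name_override = "my_metric"
+  data_format = "json"
+```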
+
+#### JSON Configuration:
+
+The JSON data format supports specifying "tag keys". If specified, keys
+will be searched for in the root-level of the JSON blob. If the key(s) exist,
+they will be applied as tags to the Telegraf metrics.
+
+For example, if you had this configuration:
+
+```toml
+[[inputs.exec]]
+  ### Commands array
+  commands = ["/tmp/test.sh", "/usr/bin/mycollector --foo=bar"]
+
+  ### measurement name suffix (for separating different commands)
+  name_suffix = "_mycollector"
+
+  ### Data format to consume. This can be "json", "influx" or "graphite"
+  ### Each data format has its own unique set of configuration options, read
+  ### more about them here:
+  ### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md
+  data_format = "json"
+
+  ### List of tag names to extract from top-level of JSON server response
+  tag_keys = [
+    "my_tag_1",
+    "my_tag_2"
+  ]
+```
+
+with this JSON output from a command:
+
+```json
+{
+    "a": 5,
+    "b": {
+        "c": 6
+    },
+    "my_tag_1": "foo"
+}
+```
+
+Your Telegraf metrics would get tagged with "my_tag_1"
+
+```
+exec_mycollector,my_tag_1=foo a=5,b_c=6
+```
+
+## Graphite:
+
+The Graphite data format translates graphite _dot_ buckets directly into
+telegraf measurement names, with a single value field, and without any tags. For
+more advanced options, Telegraf supports specifying "templates" to translate
+graphite buckets into Telegraf metrics.
+
+#### Separator:
+
+You can specify a separator to use for the parsed metrics.
+By default, it will leave the metrics with a "." separator.
+Setting `separator = "_"` will translate:
+
+```
+cpu.usage.idle 99
+=> cpu_usage_idle value=99
+```
+
+#### Measurement/Tag Templates:
+
+The most basic template is to specify a single transformation to apply to all
+incoming metrics. _measurement_ is a special keyword that tells Telegraf which
+parts of the graphite bucket to combine into the measurement name. It can have a
+trailing `*` to indicate that the remainder of the metric should be used.
+Other words are considered tag keys. So the following template:
+
+```toml
+templates = [
+    "region.measurement*"
+]
+```
+
+would result in the following Graphite -> Telegraf transformation.
+
+```
+us-west.cpu.load 100
+=> cpu.load,region=us-west value=100
+```
+
+#### Field Templates:
+
+There is also a _field_ keyword, which can only be specified once.
+The field keyword tells Telegraf to give the metric that field name.
+So the following template:
+
+```toml
+templates = [
+    "measurement.measurement.field.region"
+]
+```
+
+would result in the following Graphite -> Telegraf transformation.
+
+```
+cpu.usage.idle.us-west 100
+=> cpu_usage,region=us-west idle=100
+```
+
+#### Filter Templates:
+
+Users can also filter the template(s) to use based on the name of the bucket,
+using glob matching, like so:
+
+```toml
+templates = [
+    "cpu.* measurement.measurement.region",
+    "mem.* measurement.measurement.host"
+]
+```
+
+which would result in the following transformation:
+
+```
+cpu.load.us-west 100
+=> cpu_load,region=us-west value=100
+
+mem.cached.localhost 256
+=> mem_cached,host=localhost value=256
+```
+
+#### Adding Tags:
+
+Tags that don't exist on the received metric can also be added to it.
+You can add these additional tags by specifying them after the pattern.
+Tags have the same format as the line protocol, and multiple tags are
+separated by commas. So the following template:
+
+```toml
+templates = [
+    "measurement.measurement.field.region datacenter=1a"
+]
+```
+
+would result in the following Graphite -> Telegraf transformation.
+
+```
+cpu.usage.idle.us-west 100
+=> cpu_usage,region=us-west,datacenter=1a idle=100
+```
+
+There are many more options available; more details
+[can be found here](https://github.com/influxdata/influxdb/tree/master/services/graphite#templates).
+
+#### Graphite Configuration:
+
+```toml
+[[inputs.exec]]
+  ### Commands array
+  commands = ["/tmp/test.sh", "/usr/bin/mycollector --foo=bar"]
+
+  ### measurement name suffix (for separating different commands)
+  name_suffix = "_mycollector"
+
+  ### Data format to consume. This can be "json", "influx" or "graphite" (line-protocol)
+  ### Each data format has its own unique set of configuration options, read
+  ### more about them here:
+  ### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md
+  data_format = "graphite"
+
+  ### This string will be used to join the matched values.
+  separator = "_"
+
+  ### Each template line requires a template pattern. It can have an optional
+  ### filter before the template, separated by spaces. It can also have
+  ### optional extra tags following the template. Multiple tags should be
+  ### separated by commas and no spaces, similar to the line protocol format.
+  ### There can be only one default template.
+  ### Templates support the formats below:
+  ### 1. filter + template
+  ### 2. filter + template + extra tag
+  ### 3. filter + template with field key
+  ### 4. default template
+  templates = [
+    "*.app env.service.resource.measurement",
+    "stats.* .host.measurement* region=us-west,agent=sensu",
+    "stats2.* .host.measurement.field",
+    "measurement*"
+  ]
+```
diff --git a/Godeps b/Godeps
index 3393a1cee..5cdfecbe7 100644
--- a/Godeps
+++ b/Godeps
@@ -2,10 +2,8 @@ git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git dbd8d5c40a582eb9ad
github.com/Shopify/sarama d37c73f2b2bce85f7fa16b6a550d26c5372892ef
github.com/Sirupsen/logrus f7f79f729e0fbe2fcc061db48a9ba0263f588252
github.com/amir/raidman 6a8e089bbe32e6b907feae5ba688841974b3c339
-github.com/armon/go-metrics 345426c77237ece5dab0e1605c3e4b35c3f54757
github.com/aws/aws-sdk-go 87b1e60a50b09e4812dee560b33a238f67305804
github.com/beorn7/perks b965b613227fddccbfffe13eae360ed3fa822f8d
-github.com/boltdb/bolt ee4a0888a9abe7eefe5a0992ca4cb06864839873
github.com/cenkalti/backoff 4dc77674aceaabba2c7e3da25d4c823edfb73f99
github.com/dancannon/gorethink 6f088135ff288deb9d5546f4c71919207f891a70
github.com/davecgh/go-spew 5215b55f46b2b919f50a1df0eaa5886afe4e3b3d
@@ -14,19 +12,15 @@ github.com/eapache/queue ded5959c0d4e360646dc9e9908cff48666781367
github.com/fsouza/go-dockerclient 7b651349f9479f5114913eefbfd3c4eeddd79ab4
github.com/go-ini/ini afbd495e5aaea13597b5e14fe514ddeaa4d76fc3
github.com/go-sql-driver/mysql 7c7f556282622f94213bc028b4d0a7b6151ba239
-github.com/gogo/protobuf e8904f58e872a473a5b91bc9bf3377d223555263
github.com/golang/protobuf 6aaa8d47701fa6cf07e914ec01fde3d4a1fe79c3
github.com/golang/snappy 723cc1e459b8eea2dea4583200fd60757d40097a
github.com/gonuts/go-shellquote e842a11b24c6abfb3dd27af69a17f482e4b483c2
github.com/gorilla/context 1c83b3eabd45b6d76072b66b746c20815fb2872d
github.com/gorilla/mux 26a6070f849969ba72b72256e9f14cf519751690
github.com/hailocab/go-hostpool e80d13ce29ede4452c43dea11e79b9bc8a15b478
-github.com/hashicorp/go-msgpack fa3f63826f7c23912c15263591e65d54d080b458
-github.com/hashicorp/raft 057b893fd996696719e98b6c44649ea14968c811
-github.com/hashicorp/raft-boltdb
d1e82c1ec3f15ee991f7cc7ffd5b67ff6f5bbaee github.com/influxdata/config bae7cb98197d842374d3b8403905924094930f24 -github.com/influxdata/influxdb 697f48b4e62e514e701ffec39978b864a3c666e6 -github.com/influxdb/influxdb 697f48b4e62e514e701ffec39978b864a3c666e6 +github.com/influxdata/influxdb a9552fdd91361819a792f337e5d9998859732a67 +github.com/influxdb/influxdb a9552fdd91361819a792f337e5d9998859732a67 github.com/jmespath/go-jmespath c01cf91b011868172fdcd9f41838e80c9d716264 github.com/klauspost/crc32 999f3125931f6557b991b2f8472172bdfa578d38 github.com/lib/pq 8ad2b298cadd691a77015666a5372eae5dbfac8f @@ -42,7 +36,7 @@ github.com/prometheus/client_model fa8ad6fec33561be4280a8f0514318c79d7f6cb6 github.com/prometheus/common 14ca1097bbe21584194c15e391a9dab95ad42a59 github.com/prometheus/procfs 406e5b7bfd8201a36e2bb5f7bdae0b03380c2ce8 github.com/samuel/go-zookeeper 218e9c81c0dd8b3b18172b2bbfad92cc7d6db55f -github.com/shirou/gopsutil 85bf0974ed06e4e668595ae2b4de02e772a2819b +github.com/shirou/gopsutil e77438504d45b9985c99a75730fe65220ceea00e github.com/soniah/gosnmp b1b4f885b12c5dcbd021c5cee1c904110de6db7d github.com/streadway/amqp b4f3ceab0337f013208d31348b578d83c0064744 github.com/stretchr/objx 1a9d0bb9f541897e62256577b352fdbc1fb4fd94 diff --git a/Godeps_windows b/Godeps_windows index 8f147ed87..034fb4fec 100644 --- a/Godeps_windows +++ b/Godeps_windows @@ -1,34 +1,28 @@ git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git dbd8d5c40a582eb9adacde36b47932b3a3ad0034 -github.com/Shopify/sarama b1da1753dedcf77d053613b7eae907b98a2ddad5 +github.com/Shopify/sarama d37c73f2b2bce85f7fa16b6a550d26c5372892ef github.com/Sirupsen/logrus f7f79f729e0fbe2fcc061db48a9ba0263f588252 github.com/StackExchange/wmi f3e2bae1e0cb5aef83e319133eabfee30013a4a5 github.com/amir/raidman 6a8e089bbe32e6b907feae5ba688841974b3c339 -github.com/armon/go-metrics 345426c77237ece5dab0e1605c3e4b35c3f54757 -github.com/aws/aws-sdk-go 2a34ea8812f32aae75b43400f9424a0559840659 +github.com/aws/aws-sdk-go 87b1e60a50b09e4812dee560b33a238f67305804 github.com/beorn7/perks b965b613227fddccbfffe13eae360ed3fa822f8d -github.com/boltdb/bolt ee4a0888a9abe7eefe5a0992ca4cb06864839873 github.com/cenkalti/backoff 4dc77674aceaabba2c7e3da25d4c823edfb73f99 github.com/dancannon/gorethink 6f088135ff288deb9d5546f4c71919207f891a70 github.com/davecgh/go-spew 5215b55f46b2b919f50a1df0eaa5886afe4e3b3d github.com/eapache/go-resiliency b86b1ec0dd4209a588dc1285cdd471e73525c0b3 github.com/eapache/queue ded5959c0d4e360646dc9e9908cff48666781367 -github.com/fsouza/go-dockerclient 02a8beb401b20e112cff3ea740545960b667eab1 +github.com/fsouza/go-dockerclient 7b651349f9479f5114913eefbfd3c4eeddd79ab4 github.com/go-ini/ini afbd495e5aaea13597b5e14fe514ddeaa4d76fc3 github.com/go-ole/go-ole 50055884d646dd9434f16bbb5c9801749b9bafe4 github.com/go-sql-driver/mysql 7c7f556282622f94213bc028b4d0a7b6151ba239 -github.com/gogo/protobuf e8904f58e872a473a5b91bc9bf3377d223555263 -github.com/golang/protobuf 45bba206dd5270d96bac4942dcfe515726613249 -github.com/golang/snappy 1963d058044b19e16595f80d5050fa54e2070438 +github.com/golang/protobuf 6aaa8d47701fa6cf07e914ec01fde3d4a1fe79c3 +github.com/golang/snappy 723cc1e459b8eea2dea4583200fd60757d40097a github.com/gonuts/go-shellquote e842a11b24c6abfb3dd27af69a17f482e4b483c2 github.com/gorilla/context 1c83b3eabd45b6d76072b66b746c20815fb2872d github.com/gorilla/mux 26a6070f849969ba72b72256e9f14cf519751690 github.com/hailocab/go-hostpool e80d13ce29ede4452c43dea11e79b9bc8a15b478 -github.com/hashicorp/go-msgpack fa3f63826f7c23912c15263591e65d54d080b458 
-github.com/hashicorp/raft 057b893fd996696719e98b6c44649ea14968c811
-github.com/hashicorp/raft-boltdb d1e82c1ec3f15ee991f7cc7ffd5b67ff6f5bbaee
github.com/influxdata/config bae7cb98197d842374d3b8403905924094930f24
-github.com/influxdata/influxdb 60df13fb566d07ff2cdd07aa23a4796a02b0df3c
-github.com/influxdb/influxdb 60df13fb566d07ff2cdd07aa23a4796a02b0df3c
+github.com/influxdata/influxdb a9552fdd91361819a792f337e5d9998859732a67
+github.com/influxdb/influxdb a9552fdd91361819a792f337e5d9998859732a67
github.com/jmespath/go-jmespath c01cf91b011868172fdcd9f41838e80c9d716264
github.com/klauspost/crc32 999f3125931f6557b991b2f8472172bdfa578d38
github.com/lib/pq 8ad2b298cadd691a77015666a5372eae5dbfac8f
@@ -45,7 +39,7 @@ github.com/prometheus/client_model fa8ad6fec33561be4280a8f0514318c79d7f6cb6
github.com/prometheus/common 14ca1097bbe21584194c15e391a9dab95ad42a59
github.com/prometheus/procfs 406e5b7bfd8201a36e2bb5f7bdae0b03380c2ce8
github.com/samuel/go-zookeeper 218e9c81c0dd8b3b18172b2bbfad92cc7d6db55f
-github.com/shirou/gopsutil 9d8191d6a6e17dcf43b10a20084a11e8c1aa92e6
+github.com/shirou/gopsutil e77438504d45b9985c99a75730fe65220ceea00e
github.com/shirou/w32 ada3ba68f000aa1b58580e45c9d308fe0b7fc5c5
github.com/soniah/gosnmp b1b4f885b12c5dcbd021c5cee1c904110de6db7d
github.com/streadway/amqp b4f3ceab0337f013208d31348b578d83c0064744
@@ -54,9 +48,8 @@ github.com/stretchr/testify f390dcf405f7b83c997eac1b06768bb9f44dec18
github.com/wvanbergen/kafka 1a8639a45164fcc245d5c7b4bd3ccfbd1a0ffbf3
github.com/wvanbergen/kazoo-go 0f768712ae6f76454f987c3356177e138df258f8
github.com/zensqlmonitor/go-mssqldb ffe5510c6fa5e15e6d983210ab501c815b56b363
-golang.org/x/crypto 1f22c0103821b9390939b6776727195525381532
golang.org/x/net 04b9de9b512f58addf28c9853d50ebef61c3953e
-golang.org/x/text 6fc2e00a0d64b1f7fc1212dae5b0c939cf6d9ac4
+golang.org/x/text 6d3c22c4525a4da167968fa2479be5524d2e8bd0
gopkg.in/dancannon/gorethink.v1 6f088135ff288deb9d5546f4c71919207f891a70
gopkg.in/fatih/pool.v2 cba550ebf9bce999a02e963296d4bc7a486cb715
gopkg.in/mgo.v2 03c9f3ee4c14c8e51ee521a6a7d0425658dd6f64
diff --git a/README.md b/README.md
index 160a8fa62..6109e0841 100644
--- a/README.md
+++ b/README.md
@@ -155,10 +155,12 @@ Currently implemented sources:
* aerospike
* apache
* bcache
+* couchdb
* disque
* docker
+* dovecot
* elasticsearch
-* exec (generic JSON-emitting executable plugin)
+* exec (generic executable plugin, supports JSON, influx and graphite)
* haproxy
* httpjson (generic JSON-emitting http service plugin)
* influxdb
diff --git a/build.py b/build.py
index 8efc1e22d..53c3e84e0 100755
--- a/build.py
+++ b/build.py
@@ -412,7 +412,7 @@ def generate_md5_from_file(path):
        m.update(data)
    return m.hexdigest()

-def build_packages(build_output, version, nightly=False, rc=None, iteration=1):
+def build_packages(build_output, version, pkg_arch, nightly=False, rc=None, iteration=1):
    outfiles = []
    tmp_build_dir = create_temp_dir()
    if debug:
@@ -461,6 +461,9 @@ def build_packages(build_output, version, nightly=False, rc=None, iteration=1):
                    current_location = os.path.join(current_location, name + '.tar.gz')
                if rc is not None:
                    package_iteration = "0.rc{}".format(rc)
+                saved_a = a
+                if pkg_arch is not None:
+                    a = pkg_arch
                if a == '386':
                    a = 'i386'
                fpm_command = "fpm {} --name {} -a {} -t {} --version {} --iteration {} -C {} -p {} ".format(
@@ -472,6 +475,8 @@ def build_packages(build_output, version, nightly=False, rc=None, iteration=1):
                    package_iteration,
                    build_root,
                    current_location)
+                if pkg_arch is not None:
+                    a = saved_a
                if package_type == "rpm":
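+                    # rpm packages additionally need coreutils and lsof at
+                    # install time, declared below via fpm's --depends flags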
fpm_command += "--depends coreutils " fpm_command += "--depends lsof" @@ -506,6 +511,7 @@ def print_usage(): print("\t --goarm= \n\t\t- Build for specified ARM version (when building for ARM). Default value is: 6") print("\t --platform= \n\t\t- Build for specified platform. Acceptable values: linux, windows, darwin, or all") print("\t --version= \n\t\t- Version information to apply to build metadata. If not specified, will be pulled from repo tag.") + print("\t --pkgarch= \n\t\t- Package architecture if different from ") print("\t --commit= \n\t\t- Use specific commit for build (currently a NOOP).") print("\t --branch= \n\t\t- Build from a specific branch (currently a NOOP).") print("\t --rc= \n\t\t- Whether or not the build is a release candidate (affects version information).") @@ -532,6 +538,7 @@ def main(): commit = None target_platform = None target_arch = None + package_arch = None nightly = False race = False branch = None @@ -570,6 +577,9 @@ def main(): elif '--version' in arg: # Version to assign to this build (0.9.5, etc) version = arg.split("=")[1] + elif '--pkgarch' in arg: + # Package architecture if different from (armhf, etc) + package_arch = arg.split("=")[1] elif '--rc' in arg: # Signifies that this is a release candidate build. rc = arg.split("=")[1] @@ -703,7 +713,7 @@ def main(): if not check_path_for("fpm"): print("!! Cannot package without command 'fpm'. Stopping.") return 1 - packages = build_packages(build_output, version, nightly=nightly, rc=rc, iteration=iteration) + packages = build_packages(build_output, version, package_arch, nightly=nightly, rc=rc, iteration=iteration) # Optionally upload to S3 if upload: upload_packages(packages, bucket_name=upload_bucket, nightly=nightly) diff --git a/cmd/telegraf/telegraf.go b/cmd/telegraf/telegraf.go index 78687d286..a65c5607c 100644 --- a/cmd/telegraf/telegraf.go +++ b/cmd/telegraf/telegraf.go @@ -11,6 +11,7 @@ import ( "github.com/influxdata/telegraf/agent" "github.com/influxdata/telegraf/internal/config" + _ "github.com/influxdata/telegraf/plugins/inputs/all" _ "github.com/influxdata/telegraf/plugins/outputs/all" ) diff --git a/etc/telegraf.conf b/etc/telegraf.conf index b5b028559..b62e50263 100644 --- a/etc/telegraf.conf +++ b/etc/telegraf.conf @@ -10,7 +10,7 @@ # file would generate. # Global tags can be specified here in key="value" format. -[tags] +[global_tags] # dc = "us-east-1" # will tag all metrics with dc=us-east-1 # rack = "1a" diff --git a/internal/config/config.go b/internal/config/config.go index 9b35cd407..766ba1189 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -15,6 +15,7 @@ import ( "github.com/influxdata/telegraf/internal/models" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/outputs" + "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/config" "github.com/naoina/toml/ast" @@ -126,7 +127,9 @@ func (c *Config) ListTags() string { return strings.Join(tags, " ") } -var header = `# Telegraf configuration +var header = `############################################################################### +# Telegraf Configuration # +############################################################################### # Telegraf is entirely plugin driven. All metrics are gathered from the # declared inputs, and sent to the declared outputs. @@ -138,41 +141,41 @@ var header = `# Telegraf configuration # file would generate. # Global tags can be specified here in key="value" format. 
-[tags] +[global_tags] # dc = "us-east-1" # will tag all metrics with dc=us-east-1 # rack = "1a" # Configuration for telegraf agent [agent] - # Default data collection interval for all inputs + ### Default data collection interval for all inputs interval = "10s" - # Rounds collection interval to 'interval' - # ie, if interval="10s" then always collect on :00, :10, :20, etc. + ### Rounds collection interval to 'interval' + ### ie, if interval="10s" then always collect on :00, :10, :20, etc. round_interval = true - # Telegraf will cache metric_buffer_limit metrics for each output, and will - # flush this buffer on a successful write. + ### Telegraf will cache metric_buffer_limit metrics for each output, and will + ### flush this buffer on a successful write. metric_buffer_limit = 10000 - # Collection jitter is used to jitter the collection by a random amount. - # Each plugin will sleep for a random time within jitter before collecting. - # This can be used to avoid many plugins querying things like sysfs at the - # same time, which can have a measurable effect on the system. + ### Collection jitter is used to jitter the collection by a random amount. + ### Each plugin will sleep for a random time within jitter before collecting. + ### This can be used to avoid many plugins querying things like sysfs at the + ### same time, which can have a measurable effect on the system. collection_jitter = "0s" - # Default data flushing interval for all outputs. You should not set this below - # interval. Maximum flush_interval will be flush_interval + flush_jitter + ### Default flushing interval for all outputs. You shouldn't set this below + ### interval. Maximum flush_interval will be flush_interval + flush_jitter flush_interval = "10s" - # Jitter the flush interval by a random amount. This is primarily to avoid - # large write spikes for users running a large number of telegraf instances. - # ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s + ### Jitter the flush interval by a random amount. This is primarily to avoid + ### large write spikes for users running a large number of telegraf instances. + ### ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s flush_jitter = "0s" - # Run telegraf in debug mode + ### Run telegraf in debug mode debug = false - # Run telegraf in quiet mode + ### Run telegraf in quiet mode quiet = false - # Override default hostname, if empty use os.Hostname() + ### Override default hostname, if empty use os.Hostname() hostname = "" @@ -333,9 +336,9 @@ func (c *Config) LoadConfig(path string) error { log.Printf("Could not parse [agent] config\n") return err } - case "tags": + case "global_tags", "tags": if err = config.UnmarshalTable(subTable, c.Tags); err != nil { - log.Printf("Could not parse [tags] config\n") + log.Printf("Could not parse [global_tags] config\n") return err } case "outputs": @@ -428,6 +431,17 @@ func (c *Config) addInput(name string, table *ast.Table) error { } input := creator() + // If the input has a SetParser function, then this means it can accept + // arbitrary types of input, so build the parser and set it. 
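+	// (parsers.ParserInput is the optional interface checked below; a type
+	// switch lets inputs opt in to parsing without changing telegraf.Input.)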
+ switch t := input.(type) { + case parsers.ParserInput: + parser, err := buildParser(name, table) + if err != nil { + return err + } + t.SetParser(parser) + } + pluginConfig, err := buildInput(name, table) if err != nil { return err @@ -583,6 +597,69 @@ func buildInput(name string, tbl *ast.Table) (*internal_models.InputConfig, erro return cp, nil } +// buildParser grabs the necessary entries from the ast.Table for creating +// a parsers.Parser object, and creates it, which can then be added onto +// an Input object. +func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) { + c := &parsers.Config{} + + if node, ok := tbl.Fields["data_format"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + c.DataFormat = str.Value + } + } + } + + // Legacy support, exec plugin originally parsed JSON by default. + if name == "exec" && c.DataFormat == "" { + c.DataFormat = "json" + } else if c.DataFormat == "" { + c.DataFormat = "influx" + } + + if node, ok := tbl.Fields["separator"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + c.Separator = str.Value + } + } + } + + if node, ok := tbl.Fields["templates"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if ary, ok := kv.Value.(*ast.Array); ok { + for _, elem := range ary.Value { + if str, ok := elem.(*ast.String); ok { + c.Templates = append(c.Templates, str.Value) + } + } + } + } + } + + if node, ok := tbl.Fields["tag_keys"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if ary, ok := kv.Value.(*ast.Array); ok { + for _, elem := range ary.Value { + if str, ok := elem.(*ast.String); ok { + c.TagKeys = append(c.TagKeys, str.Value) + } + } + } + } + } + + c.MetricName = name + + delete(tbl.Fields, "data_format") + delete(tbl.Fields, "separator") + delete(tbl.Fields, "templates") + delete(tbl.Fields, "tag_keys") + + return parsers.NewParser(c) +} + // buildOutput parses output specific items from the ast.Table, builds the filter and returns an // internal_models.OutputConfig to be inserted into internal_models.RunningInput // Note: error exists in the return for future calls that might require error diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 261057875..0e9f2c967 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -9,6 +9,7 @@ import ( "github.com/influxdata/telegraf/plugins/inputs/exec" "github.com/influxdata/telegraf/plugins/inputs/memcached" "github.com/influxdata/telegraf/plugins/inputs/procstat" + "github.com/influxdata/telegraf/plugins/parsers" "github.com/stretchr/testify/assert" ) @@ -91,6 +92,9 @@ func TestConfig_LoadDirectory(t *testing.T) { "Testdata did not produce correct memcached metadata.") ex := inputs.Inputs["exec"]().(*exec.Exec) + p, err := parsers.NewJSONParser("exec", nil, nil) + assert.NoError(t, err) + ex.SetParser(p) ex.Command = "/usr/bin/myothercollector --foo=bar" eConfig := &internal_models.InputConfig{ Name: "exec", diff --git a/internal/config/testdata/telegraf-agent.toml b/internal/config/testdata/telegraf-agent.toml index 5ede47016..b2ffa0cf0 100644 --- a/internal/config/testdata/telegraf-agent.toml +++ b/internal/config/testdata/telegraf-agent.toml @@ -20,7 +20,7 @@ # with 'required'. Be sure to edit those to make this configuration work. 
# Tags can also be specified via a normal map, but only one form at a time: -[tags] +[global_tags] dc = "us-east-1" # Configuration for telegraf agent diff --git a/internal/internal.go b/internal/internal.go index 27c9d664f..82758e5e8 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -9,7 +9,6 @@ import ( "fmt" "io/ioutil" "os" - "strconv" "strings" "time" ) @@ -35,47 +34,6 @@ func (d *Duration) UnmarshalTOML(b []byte) error { var NotImplementedError = errors.New("not implemented yet") -type JSONFlattener struct { - Fields map[string]interface{} -} - -// FlattenJSON flattens nested maps/interfaces into a fields map -func (f *JSONFlattener) FlattenJSON( - fieldname string, - v interface{}, -) error { - if f.Fields == nil { - f.Fields = make(map[string]interface{}) - } - fieldname = strings.Trim(fieldname, "_") - switch t := v.(type) { - case map[string]interface{}: - for k, v := range t { - err := f.FlattenJSON(fieldname+"_"+k+"_", v) - if err != nil { - return err - } - } - case []interface{}: - for i, v := range t { - k := strconv.Itoa(i) - err := f.FlattenJSON(fieldname+"_"+k+"_", v) - if err != nil { - return nil - } - } - case float64: - f.Fields[fieldname] = t - case bool, string, nil: - // ignored types - return nil - default: - return fmt.Errorf("JSON Flattener: got unexpected type %T with value %v (%s)", - t, t, fieldname) - } - return nil -} - // ReadLines reads contents from a file and splits them by new lines. // A convenience wrapper to ReadLinesOffsetN(filename, 0, -1). func ReadLines(filename string) ([]string, error) { diff --git a/metric.go b/metric.go index 99ee30369..574565c22 100644 --- a/metric.go +++ b/metric.go @@ -1,11 +1,9 @@ package telegraf import ( - "bytes" "time" "github.com/influxdata/influxdb/client/v2" - "github.com/influxdata/influxdb/models" ) type Metric interface { @@ -63,25 +61,6 @@ func NewMetric( }, nil } -// ParseMetrics returns a slice of Metrics from a text representation of a -// metric (in line-protocol format) -// with each metric separated by newlines. If any metrics fail to parse, -// a non-nil error will be returned in addition to the metrics that parsed -// successfully. 
-func ParseMetrics(buf []byte) ([]Metric, error) { - // parse even if the buffer begins with a newline - buf = bytes.TrimPrefix(buf, []byte("\n")) - points, err := models.ParsePoints(buf) - metrics := make([]Metric, len(points)) - for i, point := range points { - // Ignore error here because it's impossible that a model.Point - // wouldn't parse into client.Point properly - metrics[i], _ = NewMetric(point.Name(), point.Tags(), - point.Fields(), point.Time()) - } - return metrics, err -} - func (m *metric) Name() string { return m.pt.Name() } diff --git a/metric_test.go b/metric_test.go index acf6dee99..1177ab494 100644 --- a/metric_test.go +++ b/metric_test.go @@ -9,58 +9,6 @@ import ( "github.com/stretchr/testify/assert" ) -const validMs = ` -cpu,cpu=cpu0,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 1454105876344540456 -` - -const invalidMs = ` -cpu, cpu=cpu0,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 -cpu,host=foo usage_idle -cpu,host usage_idle=99 -cpu,host=foo usage_idle=99 very bad metric -` - -const validInvalidMs = ` -cpu,cpu=cpu0,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 -cpu,cpu=cpu1,host=foo,datacenter=us-east usage_idle=51,usage_busy=49 -cpu,cpu=cpu2,host=foo,datacenter=us-east usage_idle=60,usage_busy=40 -cpu,host usage_idle=99 -` - -func TestParseValidMetrics(t *testing.T) { - metrics, err := ParseMetrics([]byte(validMs)) - assert.NoError(t, err) - assert.Len(t, metrics, 1) - m := metrics[0] - - tags := map[string]string{ - "host": "foo", - "datacenter": "us-east", - "cpu": "cpu0", - } - fields := map[string]interface{}{ - "usage_idle": float64(99), - "usage_busy": float64(1), - } - - assert.Equal(t, tags, m.Tags()) - assert.Equal(t, fields, m.Fields()) - assert.Equal(t, "cpu", m.Name()) - assert.Equal(t, int64(1454105876344540456), m.UnixNano()) -} - -func TestParseInvalidMetrics(t *testing.T) { - metrics, err := ParseMetrics([]byte(invalidMs)) - assert.Error(t, err) - assert.Len(t, metrics, 0) -} - -func TestParseValidAndInvalidMetrics(t *testing.T) { - metrics, err := ParseMetrics([]byte(validInvalidMs)) - assert.Error(t, err) - assert.Len(t, metrics, 3) -} - func TestNewMetric(t *testing.T) { now := time.Now() diff --git a/plugins/inputs/aerospike/aerospike.go b/plugins/inputs/aerospike/aerospike.go index 00a396451..e46960101 100644 --- a/plugins/inputs/aerospike/aerospike.go +++ b/plugins/inputs/aerospike/aerospike.go @@ -104,11 +104,9 @@ type Aerospike struct { } var sampleConfig = ` - # Aerospike servers to connect to (with port) - # Default: servers = ["localhost:3000"] - # - # This plugin will query all namespaces the aerospike - # server has configured and get stats for them. + ### Aerospike servers to connect to (with port) + ### This plugin will query all namespaces the aerospike + ### server has configured and get stats for them. 
servers = ["localhost:3000"] ` diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index c0d67025b..e7329b042 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -4,6 +4,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/aerospike" _ "github.com/influxdata/telegraf/plugins/inputs/apache" _ "github.com/influxdata/telegraf/plugins/inputs/bcache" + _ "github.com/influxdata/telegraf/plugins/inputs/couchdb" _ "github.com/influxdata/telegraf/plugins/inputs/disque" _ "github.com/influxdata/telegraf/plugins/inputs/docker" _ "github.com/influxdata/telegraf/plugins/inputs/dovecot" diff --git a/plugins/inputs/apache/apache.go b/plugins/inputs/apache/apache.go index 1256afd3f..faedf7f7d 100644 --- a/plugins/inputs/apache/apache.go +++ b/plugins/inputs/apache/apache.go @@ -20,7 +20,7 @@ type Apache struct { } var sampleConfig = ` - # An array of Apache status URI to gather stats. + ### An array of Apache status URI to gather stats. urls = ["http://localhost/server-status?auto"] ` diff --git a/plugins/inputs/bcache/bcache.go b/plugins/inputs/bcache/bcache.go index 9e0e19665..25005fce3 100644 --- a/plugins/inputs/bcache/bcache.go +++ b/plugins/inputs/bcache/bcache.go @@ -18,14 +18,14 @@ type Bcache struct { } var sampleConfig = ` - # Bcache sets path - # If not specified, then default is: - # bcachePath = "/sys/fs/bcache" - # - # By default, telegraf gather stats for all bcache devices - # Setting devices will restrict the stats to the specified - # bcache devices. - # bcacheDevs = ["bcache0", ...] + ### Bcache sets path + ### If not specified, then default is: + bcachePath = "/sys/fs/bcache" + + ### By default, telegraf gather stats for all bcache devices + ### Setting devices will restrict the stats to the specified + ### bcache devices. + bcacheDevs = ["bcache0"] ` func (b *Bcache) SampleConfig() string { diff --git a/plugins/inputs/couchdb/README.md b/plugins/inputs/couchdb/README.md new file mode 100644 index 000000000..313b57011 --- /dev/null +++ b/plugins/inputs/couchdb/README.md @@ -0,0 +1,255 @@ +# CouchDB Input Plugin +--- + +The CouchDB plugin gathers metrics of CouchDB using [_stats](http://docs.couchdb.org/en/1.6.1/api/server/common.html?highlight=stats#get--_stats) endpoint. 
+ +### Configuration: + +``` +# Sample Config: +[[inputs.couchdb]] + hosts = ["http://localhost:5984/_stats"] +``` + +### Measurements & Fields: + +Statistics specific to the internals of CouchDB: + +- couchdb_auth_cache_misses +- couchdb_database_writes +- couchdb_open_databases +- couchdb_auth_cache_hits +- couchdb_request_time +- couchdb_database_reads +- couchdb_open_os_files + +Statistics of HTTP requests by method: + +- httpd_request_methods_put +- httpd_request_methods_get +- httpd_request_methods_copy +- httpd_request_methods_delete +- httpd_request_methods_post +- httpd_request_methods_head + +Statistics of HTTP requests by response code: + +- httpd_status_codes_200 +- httpd_status_codes_201 +- httpd_status_codes_202 +- httpd_status_codes_301 +- httpd_status_codes_304 +- httpd_status_codes_400 +- httpd_status_codes_401 +- httpd_status_codes_403 +- httpd_status_codes_404 +- httpd_status_codes_405 +- httpd_status_codes_409 +- httpd_status_codes_412 +- httpd_status_codes_500 + +httpd statistics: + +- httpd_clients_requesting_changes +- httpd_temporary_view_reads +- httpd_requests +- httpd_bulk_requests +- httpd_view_reads + +### Tags: + +- server (url of the couchdb _stats endpoint) + +### Example output: + +``` +➜ telegraf git:(master) ✗ ./telegraf -config ./config.conf -input-filter couchdb -test +* Plugin: couchdb, + Collection 1 +> couchdb,server=http://localhost:5984/_stats couchdb_auth_cache_hits_current=0, +couchdb_auth_cache_hits_max=0, +couchdb_auth_cache_hits_mean=0, +couchdb_auth_cache_hits_min=0, +couchdb_auth_cache_hits_stddev=0, +couchdb_auth_cache_hits_sum=0, +couchdb_auth_cache_misses_current=0, +couchdb_auth_cache_misses_max=0, +couchdb_auth_cache_misses_mean=0, +couchdb_auth_cache_misses_min=0, +couchdb_auth_cache_misses_stddev=0, +couchdb_auth_cache_misses_sum=0, +couchdb_database_reads_current=0, +couchdb_database_reads_max=0, +couchdb_database_reads_mean=0, +couchdb_database_reads_min=0, +couchdb_database_reads_stddev=0, +couchdb_database_reads_sum=0, +couchdb_database_writes_current=1102, +couchdb_database_writes_max=131, +couchdb_database_writes_mean=0.116, +couchdb_database_writes_min=0, +couchdb_database_writes_stddev=3.536, +couchdb_database_writes_sum=1102, +couchdb_open_databases_current=1, +couchdb_open_databases_max=1, +couchdb_open_databases_mean=0, +couchdb_open_databases_min=0, +couchdb_open_databases_stddev=0.01, +couchdb_open_databases_sum=1, +couchdb_open_os_files_current=2, +couchdb_open_os_files_max=2, +couchdb_open_os_files_mean=0, +couchdb_open_os_files_min=0, +couchdb_open_os_files_stddev=0.02, +couchdb_open_os_files_sum=2, +couchdb_request_time_current=242.21, +couchdb_request_time_max=102, +couchdb_request_time_mean=5.767, +couchdb_request_time_min=1, +couchdb_request_time_stddev=17.369, +couchdb_request_time_sum=242.21, +httpd_bulk_requests_current=0, +httpd_bulk_requests_max=0, +httpd_bulk_requests_mean=0, +httpd_bulk_requests_min=0, +httpd_bulk_requests_stddev=0, +httpd_bulk_requests_sum=0, +httpd_clients_requesting_changes_current=0, +httpd_clients_requesting_changes_max=0, +httpd_clients_requesting_changes_mean=0, +httpd_clients_requesting_changes_min=0, +httpd_clients_requesting_changes_stddev=0, +httpd_clients_requesting_changes_sum=0, +httpd_request_methods_copy_current=0, +httpd_request_methods_copy_max=0, +httpd_request_methods_copy_mean=0, +httpd_request_methods_copy_min=0, +httpd_request_methods_copy_stddev=0, +httpd_request_methods_copy_sum=0, +httpd_request_methods_delete_current=0, +httpd_request_methods_delete_max=0, 
+httpd_request_methods_delete_mean=0, +httpd_request_methods_delete_min=0, +httpd_request_methods_delete_stddev=0, +httpd_request_methods_delete_sum=0, +httpd_request_methods_get_current=31, +httpd_request_methods_get_max=1, +httpd_request_methods_get_mean=0.003, +httpd_request_methods_get_min=0, +httpd_request_methods_get_stddev=0.057, +httpd_request_methods_get_sum=31, +httpd_request_methods_head_current=0, +httpd_request_methods_head_max=0, +httpd_request_methods_head_mean=0, +httpd_request_methods_head_min=0, +httpd_request_methods_head_stddev=0, +httpd_request_methods_head_sum=0, +httpd_request_methods_post_current=1102, +httpd_request_methods_post_max=131, +httpd_request_methods_post_mean=0.116, +httpd_request_methods_post_min=0, +httpd_request_methods_post_stddev=3.536, +httpd_request_methods_post_sum=1102, +httpd_request_methods_put_current=1, +httpd_request_methods_put_max=1, +httpd_request_methods_put_mean=0, +httpd_request_methods_put_min=0, +httpd_request_methods_put_stddev=0.01, +httpd_request_methods_put_sum=1, +httpd_requests_current=1133, +httpd_requests_max=130, +httpd_requests_mean=0.118, +httpd_requests_min=0, +httpd_requests_stddev=3.512, +httpd_requests_sum=1133, +httpd_status_codes_200_current=31, +httpd_status_codes_200_max=1, +httpd_status_codes_200_mean=0.003, +httpd_status_codes_200_min=0, +httpd_status_codes_200_stddev=0.057, +httpd_status_codes_200_sum=31, +httpd_status_codes_201_current=1103, +httpd_status_codes_201_max=130, +httpd_status_codes_201_mean=0.116, +httpd_status_codes_201_min=0, +httpd_status_codes_201_stddev=3.532, +httpd_status_codes_201_sum=1103, +httpd_status_codes_202_current=0, +httpd_status_codes_202_max=0, +httpd_status_codes_202_mean=0, +httpd_status_codes_202_min=0, +httpd_status_codes_202_stddev=0, +httpd_status_codes_202_sum=0, +httpd_status_codes_301_current=0, +httpd_status_codes_301_max=0, +httpd_status_codes_301_mean=0, +httpd_status_codes_301_min=0, +httpd_status_codes_301_stddev=0, +httpd_status_codes_301_sum=0, +httpd_status_codes_304_current=0, +httpd_status_codes_304_max=0, +httpd_status_codes_304_mean=0, +httpd_status_codes_304_min=0, +httpd_status_codes_304_stddev=0, +httpd_status_codes_304_sum=0, +httpd_status_codes_400_current=0, +httpd_status_codes_400_max=0, +httpd_status_codes_400_mean=0, +httpd_status_codes_400_min=0, +httpd_status_codes_400_stddev=0, +httpd_status_codes_400_sum=0, +httpd_status_codes_401_current=0, +httpd_status_codes_401_max=0, +httpd_status_codes_401_mean=0, +httpd_status_codes_401_min=0, +httpd_status_codes_401_stddev=0, +httpd_status_codes_401_sum=0, +httpd_status_codes_403_current=0, +httpd_status_codes_403_max=0, +httpd_status_codes_403_mean=0, +httpd_status_codes_403_min=0, +httpd_status_codes_403_stddev=0, +httpd_status_codes_403_sum=0, +httpd_status_codes_404_current=0, +httpd_status_codes_404_max=0, +httpd_status_codes_404_mean=0, +httpd_status_codes_404_min=0, +httpd_status_codes_404_stddev=0, +httpd_status_codes_404_sum=0, +httpd_status_codes_405_current=0, +httpd_status_codes_405_max=0, +httpd_status_codes_405_mean=0, +httpd_status_codes_405_min=0, +httpd_status_codes_405_stddev=0, +httpd_status_codes_405_sum=0, +httpd_status_codes_409_current=0, +httpd_status_codes_409_max=0, +httpd_status_codes_409_mean=0, +httpd_status_codes_409_min=0, +httpd_status_codes_409_stddev=0, +httpd_status_codes_409_sum=0, +httpd_status_codes_412_current=0, +httpd_status_codes_412_max=0, +httpd_status_codes_412_mean=0, +httpd_status_codes_412_min=0, +httpd_status_codes_412_stddev=0, 
+httpd_status_codes_412_sum=0,
+httpd_status_codes_500_current=0,
+httpd_status_codes_500_max=0,
+httpd_status_codes_500_mean=0,
+httpd_status_codes_500_min=0,
+httpd_status_codes_500_stddev=0,
+httpd_status_codes_500_sum=0,
+httpd_temporary_view_reads_current=0,
+httpd_temporary_view_reads_max=0,
+httpd_temporary_view_reads_mean=0,
+httpd_temporary_view_reads_min=0,
+httpd_temporary_view_reads_stddev=0,
+httpd_temporary_view_reads_sum=0,
+httpd_view_reads_current=0,
+httpd_view_reads_max=0,
+httpd_view_reads_mean=0,
+httpd_view_reads_min=0,
+httpd_view_reads_stddev=0,
+httpd_view_reads_sum=0 1454692257621938169
+```
diff --git a/plugins/inputs/couchdb/couchdb.go b/plugins/inputs/couchdb/couchdb.go
new file mode 100644
index 000000000..7cec65777
--- /dev/null
+++ b/plugins/inputs/couchdb/couchdb.go
@@ -0,0 +1,205 @@
+package couchdb
+
+import (
+	"encoding/json"
+	"errors"
+	"fmt"
+	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/plugins/inputs"
+	"net/http"
+	"reflect"
+	"strings"
+	"sync"
+)
+
+// Schema:
+type metaData struct {
+	Description string  `json:"description"`
+	Current     float64 `json:"current"`
+	Sum         float64 `json:"sum"`
+	Mean        float64 `json:"mean"`
+	Stddev      float64 `json:"stddev"`
+	Min         float64 `json:"min"`
+	Max         float64 `json:"max"`
+}
+
+type Stats struct {
+	Couchdb struct {
+		AuthCacheMisses metaData `json:"auth_cache_misses"`
+		DatabaseWrites  metaData `json:"database_writes"`
+		OpenDatabases   metaData `json:"open_databases"`
+		AuthCacheHits   metaData `json:"auth_cache_hits"`
+		RequestTime     metaData `json:"request_time"`
+		DatabaseReads   metaData `json:"database_reads"`
+		OpenOsFiles     metaData `json:"open_os_files"`
+	} `json:"couchdb"`
+	HttpdRequestMethods struct {
+		Put    metaData `json:"PUT"`
+		Get    metaData `json:"GET"`
+		Copy   metaData `json:"COPY"`
+		Delete metaData `json:"DELETE"`
+		Post   metaData `json:"POST"`
+		Head   metaData `json:"HEAD"`
+	} `json:"httpd_request_methods"`
+	HttpdStatusCodes struct {
+		Status200 metaData `json:"200"`
+		Status201 metaData `json:"201"`
+		Status202 metaData `json:"202"`
+		Status301 metaData `json:"301"`
+		Status304 metaData `json:"304"`
+		Status400 metaData `json:"400"`
+		Status401 metaData `json:"401"`
+		Status403 metaData `json:"403"`
+		Status404 metaData `json:"404"`
+		Status405 metaData `json:"405"`
+		Status409 metaData `json:"409"`
+		Status412 metaData `json:"412"`
+		Status500 metaData `json:"500"`
+	} `json:"httpd_status_codes"`
+	Httpd struct {
+		ClientsRequestingChanges metaData `json:"clients_requesting_changes"`
+		TemporaryViewReads       metaData `json:"temporary_view_reads"`
+		Requests                 metaData `json:"requests"`
+		BulkRequests             metaData `json:"bulk_requests"`
+		ViewReads                metaData `json:"view_reads"`
+	} `json:"httpd"`
+}
+
+type CouchDB struct {
+	HOSTs []string `toml:"hosts"`
+}
+
+func (*CouchDB) Description() string {
+	return "Read CouchDB Stats from one or more servers"
+}
+
+func (*CouchDB) SampleConfig() string {
+	return `
+  ### Works with CouchDB stats endpoints out of the box
+  ### Multiple HOSTs from which to read CouchDB stats:
+  hosts = ["http://localhost:5984/_stats"]
+`
+}
+
+func (c *CouchDB) Gather(accumulator telegraf.Accumulator) error {
+	errorChannel := make(chan error, len(c.HOSTs))
+	var wg sync.WaitGroup
+	for _, u := range c.HOSTs {
+		wg.Add(1)
+		go func(host string) {
+			defer wg.Done()
+			if err := c.fetchAndInsertData(accumulator, host); err != nil {
+				errorChannel <- fmt.Errorf("[host=%s]: %s", host, err)
+			}
+		}(u)
+	}
+
+	wg.Wait()
+	close(errorChannel)
+
+	// If there weren't any errors,
we can return nil now. + if len(errorChannel) == 0 { + return nil + } + + // There were errors, so join them all together as one big error. + errorStrings := make([]string, 0, len(errorChannel)) + for err := range errorChannel { + errorStrings = append(errorStrings, err.Error()) + } + + return errors.New(strings.Join(errorStrings, "\n")) + +} + +func (c *CouchDB) fetchAndInsertData(accumulator telegraf.Accumulator, host string) error { + + response, error := http.Get(host) + if error != nil { + return error + } + defer response.Body.Close() + + var stats Stats + decoder := json.NewDecoder(response.Body) + decoder.Decode(&stats) + + fields := map[string]interface{}{} + + // CouchDB meta stats: + c.MapCopy(fields, c.generateFields("couchdb_auth_cache_misses", stats.Couchdb.AuthCacheMisses)) + c.MapCopy(fields, c.generateFields("couchdb_database_writes", stats.Couchdb.DatabaseWrites)) + c.MapCopy(fields, c.generateFields("couchdb_open_databases", stats.Couchdb.OpenDatabases)) + c.MapCopy(fields, c.generateFields("couchdb_auth_cache_hits", stats.Couchdb.AuthCacheHits)) + c.MapCopy(fields, c.generateFields("couchdb_request_time", stats.Couchdb.RequestTime)) + c.MapCopy(fields, c.generateFields("couchdb_database_reads", stats.Couchdb.DatabaseReads)) + c.MapCopy(fields, c.generateFields("couchdb_open_os_files", stats.Couchdb.OpenOsFiles)) + + // http request methods stats: + c.MapCopy(fields, c.generateFields("httpd_request_methods_put", stats.HttpdRequestMethods.Put)) + c.MapCopy(fields, c.generateFields("httpd_request_methods_get", stats.HttpdRequestMethods.Get)) + c.MapCopy(fields, c.generateFields("httpd_request_methods_copy", stats.HttpdRequestMethods.Copy)) + c.MapCopy(fields, c.generateFields("httpd_request_methods_delete", stats.HttpdRequestMethods.Delete)) + c.MapCopy(fields, c.generateFields("httpd_request_methods_post", stats.HttpdRequestMethods.Post)) + c.MapCopy(fields, c.generateFields("httpd_request_methods_head", stats.HttpdRequestMethods.Head)) + + // status code stats: + c.MapCopy(fields, c.generateFields("httpd_status_codes_200", stats.HttpdStatusCodes.Status200)) + c.MapCopy(fields, c.generateFields("httpd_status_codes_201", stats.HttpdStatusCodes.Status201)) + c.MapCopy(fields, c.generateFields("httpd_status_codes_202", stats.HttpdStatusCodes.Status202)) + c.MapCopy(fields, c.generateFields("httpd_status_codes_301", stats.HttpdStatusCodes.Status301)) + c.MapCopy(fields, c.generateFields("httpd_status_codes_304", stats.HttpdStatusCodes.Status304)) + c.MapCopy(fields, c.generateFields("httpd_status_codes_400", stats.HttpdStatusCodes.Status400)) + c.MapCopy(fields, c.generateFields("httpd_status_codes_401", stats.HttpdStatusCodes.Status401)) + c.MapCopy(fields, c.generateFields("httpd_status_codes_403", stats.HttpdStatusCodes.Status403)) + c.MapCopy(fields, c.generateFields("httpd_status_codes_404", stats.HttpdStatusCodes.Status404)) + c.MapCopy(fields, c.generateFields("httpd_status_codes_405", stats.HttpdStatusCodes.Status405)) + c.MapCopy(fields, c.generateFields("httpd_status_codes_409", stats.HttpdStatusCodes.Status409)) + c.MapCopy(fields, c.generateFields("httpd_status_codes_412", stats.HttpdStatusCodes.Status412)) + c.MapCopy(fields, c.generateFields("httpd_status_codes_500", stats.HttpdStatusCodes.Status500)) + + // httpd stats: + c.MapCopy(fields, c.generateFields("httpd_clients_requesting_changes", stats.Httpd.ClientsRequestingChanges)) + c.MapCopy(fields, c.generateFields("httpd_temporary_view_reads", stats.Httpd.TemporaryViewReads)) + c.MapCopy(fields, 
c.generateFields("httpd_requests", stats.Httpd.Requests)) + c.MapCopy(fields, c.generateFields("httpd_bulk_requests", stats.Httpd.BulkRequests)) + c.MapCopy(fields, c.generateFields("httpd_view_reads", stats.Httpd.ViewReads)) + + tags := map[string]string{ + "server": host, + } + accumulator.AddFields("couchdb", fields, tags) + return nil +} + +func (*CouchDB) MapCopy(dst, src interface{}) { + dv, sv := reflect.ValueOf(dst), reflect.ValueOf(src) + for _, k := range sv.MapKeys() { + dv.SetMapIndex(k, sv.MapIndex(k)) + } +} + +func (*CouchDB) safeCheck(value interface{}) interface{} { + if value == nil { + return 0.0 + } + return value +} + +func (c *CouchDB) generateFields(prefix string, obj metaData) map[string]interface{} { + fields := map[string]interface{}{ + prefix + "_current": c.safeCheck(obj.Current), + prefix + "_sum": c.safeCheck(obj.Sum), + prefix + "_mean": c.safeCheck(obj.Mean), + prefix + "_stddev": c.safeCheck(obj.Stddev), + prefix + "_min": c.safeCheck(obj.Min), + prefix + "_max": c.safeCheck(obj.Max), + } + return fields +} + +func init() { + inputs.Add("couchdb", func() telegraf.Input { + return &CouchDB{} + }) +} diff --git a/plugins/inputs/couchdb/couchdb_test.go b/plugins/inputs/couchdb/couchdb_test.go new file mode 100644 index 000000000..7b824e748 --- /dev/null +++ b/plugins/inputs/couchdb/couchdb_test.go @@ -0,0 +1,320 @@ +package couchdb_test + +import ( + "github.com/influxdata/telegraf/plugins/inputs/couchdb" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" + "net/http" + "net/http/httptest" + "testing" +) + +func TestBasic(t *testing.T) { + js := ` +{ + "couchdb": { + "auth_cache_misses": { + "description": "number of authentication cache misses", + "current": null, + "sum": null, + "mean": null, + "stddev": null, + "min": null, + "max": null + }, + "database_writes": { + "description": "number of times a database was changed", + "current": null, + "sum": null, + "mean": null, + "stddev": null, + "min": null, + "max": null + }, + "open_databases": { + "description": "number of open databases", + "current": null, + "sum": null, + "mean": null, + "stddev": null, + "min": null, + "max": null + }, + "auth_cache_hits": { + "description": "number of authentication cache hits", + "current": null, + "sum": null, + "mean": null, + "stddev": null, + "min": null, + "max": null + }, + "request_time": { + "description": "length of a request inside CouchDB without MochiWeb", + "current": 18.0, + "sum": 18.0, + "mean": 18.0, + "stddev": null, + "min": 18.0, + "max": 18.0 + }, + "database_reads": { + "description": "number of times a document was read from a database", + "current": null, + "sum": null, + "mean": null, + "stddev": null, + "min": null, + "max": null + }, + "open_os_files": { + "description": "number of file descriptors CouchDB has open", + "current": null, + "sum": null, + "mean": null, + "stddev": null, + "min": null, + "max": null + } + }, + "httpd_request_methods": { + "PUT": { + "description": "number of HTTP PUT requests", + "current": null, + "sum": null, + "mean": null, + "stddev": null, + "min": null, + "max": null + }, + "GET": { + "description": "number of HTTP GET requests", + "current": 2.0, + "sum": 2.0, + "mean": 0.25, + "stddev": 0.70699999999999996181, + "min": 0, + "max": 2 + }, + "COPY": { + "description": "number of HTTP COPY requests", + "current": null, + "sum": null, + "mean": null, + "stddev": null, + "min": null, + "max": null + }, + "DELETE": { + "description": "number of HTTP DELETE requests", + 
"current": null, + "sum": null, + "mean": null, + "stddev": null, + "min": null, + "max": null + }, + "POST": { + "description": "number of HTTP POST requests", + "current": null, + "sum": null, + "mean": null, + "stddev": null, + "min": null, + "max": null + }, + "HEAD": { + "description": "number of HTTP HEAD requests", + "current": null, + "sum": null, + "mean": null, + "stddev": null, + "min": null, + "max": null + } + }, + "httpd_status_codes": { + "403": { + "description": "number of HTTP 403 Forbidden responses", + "current": null, + "sum": null, + "mean": null, + "stddev": null, + "min": null, + "max": null + }, + "202": { + "description": "number of HTTP 202 Accepted responses", + "current": null, + "sum": null, + "mean": null, + "stddev": null, + "min": null, + "max": null + }, + "401": { + "description": "number of HTTP 401 Unauthorized responses", + "current": null, + "sum": null, + "mean": null, + "stddev": null, + "min": null, + "max": null + }, + "409": { + "description": "number of HTTP 409 Conflict responses", + "current": null, + "sum": null, + "mean": null, + "stddev": null, + "min": null, + "max": null + }, + "200": { + "description": "number of HTTP 200 OK responses", + "current": 1.0, + "sum": 1.0, + "mean": 0.125, + "stddev": 0.35399999999999998135, + "min": 0, + "max": 1 + }, + "405": { + "description": "number of HTTP 405 Method Not Allowed responses", + "current": null, + "sum": null, + "mean": null, + "stddev": null, + "min": null, + "max": null + }, + "400": { + "description": "number of HTTP 400 Bad Request responses", + "current": null, + "sum": null, + "mean": null, + "stddev": null, + "min": null, + "max": null + }, + "201": { + "description": "number of HTTP 201 Created responses", + "current": null, + "sum": null, + "mean": null, + "stddev": null, + "min": null, + "max": null + }, + "404": { + "description": "number of HTTP 404 Not Found responses", + "current": null, + "sum": null, + "mean": null, + "stddev": null, + "min": null, + "max": null + }, + "500": { + "description": "number of HTTP 500 Internal Server Error responses", + "current": null, + "sum": null, + "mean": null, + "stddev": null, + "min": null, + "max": null + }, + "412": { + "description": "number of HTTP 412 Precondition Failed responses", + "current": null, + "sum": null, + "mean": null, + "stddev": null, + "min": null, + "max": null + }, + "301": { + "description": "number of HTTP 301 Moved Permanently responses", + "current": null, + "sum": null, + "mean": null, + "stddev": null, + "min": null, + "max": null + }, + "304": { + "description": "number of HTTP 304 Not Modified responses", + "current": null, + "sum": null, + "mean": null, + "stddev": null, + "min": null, + "max": null + } + }, + "httpd": { + "clients_requesting_changes": { + "description": "number of clients for continuous _changes", + "current": null, + "sum": null, + "mean": null, + "stddev": null, + "min": null, + "max": null + }, + "temporary_view_reads": { + "description": "number of temporary view reads", + "current": null, + "sum": null, + "mean": null, + "stddev": null, + "min": null, + "max": null + }, + "requests": { + "description": "number of HTTP requests", + "current": 2.0, + "sum": 2.0, + "mean": 0.25, + "stddev": 0.70699999999999996181, + "min": 0, + "max": 2 + }, + "bulk_requests": { + "description": "number of bulk requests", + "current": null, + "sum": null, + "mean": null, + "stddev": null, + "min": null, + "max": null + }, + "view_reads": { + "description": "number of view reads", + "current": null, + 
"sum": null, + "mean": null, + "stddev": null, + "min": null, + "max": null + } + } +} + +` + fakeServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/_stats" { + _, _ = w.Write([]byte(js)) + } else { + w.WriteHeader(http.StatusNotFound) + } + })) + defer fakeServer.Close() + + plugin := &couchdb.CouchDB{ + HOSTs: []string{fakeServer.URL + "/_stats"}, + } + + var acc testutil.Accumulator + require.NoError(t, plugin.Gather(&acc)) +} diff --git a/plugins/inputs/disque/disque.go b/plugins/inputs/disque/disque.go index f1ca1b800..51457bec1 100644 --- a/plugins/inputs/disque/disque.go +++ b/plugins/inputs/disque/disque.go @@ -22,11 +22,11 @@ type Disque struct { } var sampleConfig = ` - # An array of URI to gather stats about. Specify an ip or hostname - # with optional port and password. ie disque://localhost, disque://10.10.3.33:18832, - # 10.0.0.1:10000, etc. - # - # If no servers are specified, then localhost is used as the host. + ### An array of URI to gather stats about. Specify an ip or hostname + ### with optional port and password. ie disque://localhost, disque://10.10.3.33:18832, + ### 10.0.0.1:10000, etc. + + ### If no servers are specified, then localhost is used as the host. servers = ["localhost"] ` diff --git a/plugins/inputs/docker/docker.go b/plugins/inputs/docker/docker.go index 6814c190a..964c9aa57 100644 --- a/plugins/inputs/docker/docker.go +++ b/plugins/inputs/docker/docker.go @@ -21,11 +21,11 @@ type Docker struct { } var sampleConfig = ` - # Docker Endpoint - # To use TCP, set endpoint = "tcp://[ip]:[port]" - # To use environment variables (ie, docker-machine), set endpoint = "ENV" + ### Docker Endpoint + ### To use TCP, set endpoint = "tcp://[ip]:[port]" + ### To use environment variables (ie, docker-machine), set endpoint = "ENV" endpoint = "unix:///var/run/docker.sock" - # Only collect metrics for these containers, collect all if empty + ### Only collect metrics for these containers, collect all if empty container_names = [] ` diff --git a/plugins/inputs/dovecot/README.md b/plugins/inputs/dovecot/README.md index 5031bec42..b2df0107f 100644 --- a/plugins/inputs/dovecot/README.md +++ b/plugins/inputs/dovecot/README.md @@ -56,10 +56,13 @@ domains. 
You can read Dovecot's documentation ### Example Output: -telegraf -config telegraf.cfg -input-filter dovecot -test +``` +telegraf -config telegraf.cfg -input-filter dovecot -test * Plugin: dovecot, Collection 1 > dovecot,domain=xxxxx.it,server=dovecot--1.mail.sys clock_time=12105746411632.5,disk_input=115285225472i,disk_output=4885067755520i,invol_cs=169701886i,last_update="2016-02-09 08:49:47.000014113 +0100 CET",mail_cache_hits=441828i,mail_lookup_attr=0i,mail_lookup_path=25323i,mail_read_bytes=241188145i,mail_read_count=11719i,maj_faults=3168i,min_faults=321438988i,num_cmds=51635i,num_connected_sessions=2i,num_logins=17149i,read_bytes=7939026951110i,read_count=3716991752i,reset_timestamp="2016-01-28 09:34:36 +0100 CET",sys_cpu=222595.288,user_cpu=267468.08,vol_cs=3288715920i,write_bytes=4483648967059i,write_count=1640646952i 1455004219924838345 > dovecot,domain=yyyyy.com,server=dovecot-1.mail.sys clock_time=6650794455331782,disk_input=61957695569920i,disk_output=2638244004487168i,invol_cs=2004805041i,last_update="2016-02-09 08:49:49.000251296 +0100 CET",mail_cache_hits=2499112513i,mail_lookup_attr=506730i,mail_lookup_path=39128227i,mail_read_bytes=1076496874501i,mail_read_count=32615262i,maj_faults=1643304i,min_faults=4216116325i,num_cmds=85785559i,num_connected_sessions=1177i,num_logins=11658255i,read_bytes=4289150974554145i,read_count=1112000703i,reset_timestamp="2016-01-28 09:31:26 +0100 CET",sys_cpu=121125923.032,user_cpu=145561336.428,vol_cs=205451885i,write_bytes=2420130526835796i,write_count=2991367252i 1455004219925152529 > dovecot,domain=xxxxx.it,server=dovecot-2.mail.sys clock_time=10710826586999.143,disk_input=79792410624i,disk_output=4496066158592i,invol_cs=150426876i,last_update="2016-02-09 08:48:19.000209134 +0100 CET",mail_cache_hits=5480869i,mail_lookup_attr=0i,mail_lookup_path=122563i,mail_read_bytes=340746273i,mail_read_count=44275i,maj_faults=1722i,min_faults=288071875i,num_cmds=50098i,num_connected_sessions=0i,num_logins=16389i,read_bytes=7259551999517i,read_count=3396625369i,reset_timestamp="2016-01-28 09:31:29 +0100 CET",sys_cpu=200762.792,user_cpu=242477.664,vol_cs=2996657358i,write_bytes=4133381575263i,write_count=1497242759i 1455004219924888283 > dovecot,domain=yyyyy.com,server=dovecot-2.mail.sys clock_time=6522131245483702,disk_input=48259150004224i,disk_output=2754333359087616i,invol_cs=2294595260i,last_update="2016-02-09 08:49:49.000251919 +0100 CET",mail_cache_hits=2139113611i,mail_lookup_attr=520276i,mail_lookup_path=37940318i,mail_read_bytes=1088002215022i,mail_read_count=31350271i,maj_faults=994420i,min_faults=1486260543i,num_cmds=40414997i,num_connected_sessions=978i,num_logins=11259672i,read_bytes=4445546612487315i,read_count=1763534543i,reset_timestamp="2016-01-28 09:31:24 +0100 CET",sys_cpu=123655962.668,user_cpu=149259327.032,vol_cs=4215130546i,write_bytes=2531186030222761i,write_count=2186579650i 1455004219925398372 +``` diff --git a/plugins/inputs/dovecot/dovecot.go b/plugins/inputs/dovecot/dovecot.go index 6d283d5e7..de9ef0cfe 100644 --- a/plugins/inputs/dovecot/dovecot.go +++ b/plugins/inputs/dovecot/dovecot.go @@ -24,13 +24,13 @@ func (d *Dovecot) Description() string { } var sampleConfig = ` - # specify dovecot servers via an address:port list - # e.g. - # localhost:24242 - # - # If no servers are specified, then localhost is used as the host. + ### specify dovecot servers via an address:port list + ### e.g. + ### localhost:24242 + ### + ### If no servers are specified, then localhost is used as the host. servers = ["localhost:24242"] - # Only collect metrics for these domains, collect all if empty + ### Only collect metrics for these domains, collect all if empty domains = [] ` diff --git a/plugins/inputs/elasticsearch/elasticsearch.go b/plugins/inputs/elasticsearch/elasticsearch.go index 2dbd6f357..8c2c055cb 100644 --- a/plugins/inputs/elasticsearch/elasticsearch.go +++ b/plugins/inputs/elasticsearch/elasticsearch.go @@ -10,8 +10,8 @@ import ( "time" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" + jsonparser "github.com/influxdata/telegraf/plugins/parsers/json" ) const statsPath = "/_nodes/stats" @@ -59,14 +59,14 @@ type indexHealth struct { } const sampleConfig = ` - # specify a list of one or more Elasticsearch servers + ### specify a list of one or more Elasticsearch servers servers = ["http://localhost:9200"] - # set local to false when you want to read the indices stats from all nodes - # within the cluster + ### set local to false when you want to read the indices stats from all nodes + ### within the cluster local = true - # set cluster_health to true when you want to also obtain cluster level stats + ### set cluster_health to true when you want to also obtain cluster level stats cluster_health = false ` @@ -168,7 +168,7 @@ func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) er now := time.Now() for p, s := range stats { - f := internal.JSONFlattener{} + f := jsonparser.JSONFlattener{} err := f.FlattenJSON("", s) if err != nil { return err
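Note that the elasticsearch plugin now borrows its flattener from the new JSON parser package instead of `internal`. A minimal sketch of what that flattener does, assuming only the `JSONFlattener` type and `FlattenJSON` signature used in the hunk above (the nested stats document is invented for illustration):

```go
package main

import (
	"encoding/json"
	"fmt"

	jsonparser "github.com/influxdata/telegraf/plugins/parsers/json"
)

func main() {
	// A stand-in for one node's entry from /_nodes/stats.
	var doc interface{}
	if err := json.Unmarshal([]byte(`{"jvm": {"mem": {"heap_used_in_bytes": 1024}}}`), &doc); err != nil {
		panic(err)
	}

	// FlattenJSON walks the document, joining nested keys with "_" and
	// collecting numeric leaves into the Fields map.
	f := jsonparser.JSONFlattener{}
	if err := f.FlattenJSON("", doc); err != nil {
		panic(err)
	}
	fmt.Println(f.Fields) // expect something like map[jvm_mem_heap_used_in_bytes:1024]
}
```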
diff --git a/plugins/inputs/exec/README.md b/plugins/inputs/exec/README.md index 1172140c7..daf800db3 100644 --- a/plugins/inputs/exec/README.md +++ b/plugins/inputs/exec/README.md @@ -1,7 +1,23 @@ # Exec Input Plugin -The exec plugin can execute arbitrary commands which output JSON or -InfluxDB [line-protocol](https://docs.influxdata.com/influxdb/v0.9/write_protocols/line/). +The exec plugin can execute arbitrary commands which output: + +* JSON +* InfluxDB [line-protocol](https://docs.influxdata.com/influxdb/v0.9/write_protocols/line/) +* Graphite [graphite-protocol](http://graphite.readthedocs.org/en/latest/feeding-carbon.html) + +> Graphite understands messages with this format: + +> ``` +> metric_path value timestamp\n +> ``` + +> __metric_path__ is the metric namespace that you want to populate. + +> __value__ is the value that you want to assign to the metric at this time. + +> __timestamp__ is the unix epoch time. + If using JSON, only numeric values are parsed and turned into floats. Booleans and strings will be ignored. @@ -11,21 +27,40 @@ and strings will be ignored. ``` # Read flattened metrics from one or more commands that output JSON to stdout [[inputs.exec]] - # the command to run - command = "/usr/bin/mycollector --foo=bar" + # Shell/commands array + commands = ["/tmp/test.sh", "/tmp/test2.sh"] - # Data format to consume. This can be "json" or "influx" (line-protocol) + # Data format to consume. This can be "json", "influx" or "graphite" (line-protocol) # NOTE json only reads numerical measurements, strings and booleans are ignored. data_format = "json" # measurement name suffix (for separating different commands) name_suffix = "_mycollector" + + ### The below configuration is used when data_format = "graphite"; it can be ignored for other data formats. + ### If matching multiple measurement files, this string will be used to join the matched values. + #separator = "." + + ### Each template line requires a template pattern.
It can have an optional + ### filter before the template, separated by spaces. It can also have optional extra + ### tags following the template. Multiple tags should be separated by commas and no spaces, + ### similar to the line protocol format. There can be only one default template. + ### Templates support the following formats: + ### 1. filter + template + ### 2. filter + template + extra tag + ### 3. filter + template with field key + ### 4. default template + #templates = [ + # "*.app env.service.resource.measurement", + # "stats.* .host.measurement* region=us-west,agent=sensu", + # "stats2.* .host.measurement.field", + # "measurement*" + #] ``` Other options for modifying the measurement names are: ``` -name_override = "measurement_name" name_prefix = "prefix_" ``` @@ -57,8 +92,11 @@ Now let's say we have the following configuration: ``` [[inputs.exec]] - # the command to run - command = "/usr/bin/line_protocol_collector" + # Shell/commands array + # compatible with the old version: + # we can still use the old command configuration + # command = "/usr/bin/line_protocol_collector" + commands = ["/usr/bin/line_protocol_collector","/tmp/test2.sh"] # Data format to consume. This can be "json" or "influx" (line-protocol) # NOTE json only reads numerical measurements, strings and booleans are ignored. @@ -80,3 +118,63 @@ cpu,cpu=cpu6,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 You will get data in InfluxDB exactly as it is defined above, tags are cpu=cpuN, host=foo, and datacenter=us-east with fields usage_idle and usage_busy. They will receive a timestamp at collection time. + + +### Example 3 + +We can also change the data_format to "graphite" to use graphite-compatible metric-collecting scripts, such as: + +* Nagios [Metrics Plugins](https://exchange.nagios.org/directory/Plugins) +* Sensu [Metrics Plugins](https://github.com/sensu-plugins) + +#### Configuration +``` +# Read flattened metrics from one or more commands that output JSON to stdout +[[inputs.exec]] + # Shell/commands array + commands = ["/tmp/test.sh","/tmp/test2.sh"] + + # Data format to consume. This can be "json", "influx" or "graphite" (line-protocol) + # NOTE json only reads numerical measurements, strings and booleans are ignored. + data_format = "graphite" + + # measurement name suffix (for separating different commands) + name_suffix = "_mycollector" + + ### The below configuration is used when data_format = "graphite"; it can be ignored for other data formats. + ### If matching multiple measurement files, this string will be used to join the matched values. + separator = "." + + ### Each template line requires a template pattern. It can have an optional + ### filter before the template, separated by spaces. It can also have optional extra + ### tags following the template. Multiple tags should be separated by commas and no spaces, + ### similar to the line protocol format. There can be only one default template. + ### Templates support the following formats: + ### 1. filter + template + ### 2. filter + template + extra tag + ### 3. filter + template with field key + ### 4. default template + templates = [ + "*.app env.service.resource.measurement", + "stats.* .host.measurement* region=us-west,agent=sensu", + "stats2.* .host.measurement.field", + "measurement*" + ] +``` + +And test.sh/test2.sh will output: + +``` +sensu.metric.net.server0.eth0.rx_packets 461295119435 1444234982 +sensu.metric.net.server0.eth0.tx_bytes 1093086493388480 1444234982 +sensu.metric.net.server0.eth0.rx_bytes 1015633926034834 1444234982 +sensu.metric.net.server0.eth0.tx_errors 0 1444234982 +sensu.metric.net.server0.eth0.rx_errors 0 1444234982 +sensu.metric.net.server0.eth0.tx_dropped 0 1444234982 +sensu.metric.net.server0.eth0.rx_dropped 0 1444234982 +``` + +The templates configuration is used to parse the graphite metrics in a way that supports the influxdb/opentsdb tagging store engines. + +For more detailed information about templates, please refer to [The graphite Input](https://github.com/influxdata/influxdb/blob/master/services/graphite/README.md)
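To make the template rules above concrete, here is a minimal, illustrative sketch that pushes the first sensu line through the graphite parser with the same separator and templates as the configuration above (it assumes the `NewGraphiteParser(separator, templates, defaultTags)` constructor and the `Parse`/metric accessors that appear elsewhere in this patch):

```go
package main

import (
	"fmt"

	"github.com/influxdata/telegraf/plugins/parsers/graphite"
)

func main() {
	// The same templates as the exec "graphite" configuration above.
	templates := []string{
		"*.app env.service.resource.measurement",
		"stats.* .host.measurement* region=us-west,agent=sensu",
		"stats2.* .host.measurement.field",
		"measurement*",
	}

	parser, err := graphite.NewGraphiteParser(".", templates, nil)
	if err != nil {
		panic(err)
	}

	// None of the filters match a sensu.metric.* line, so the default
	// "measurement*" template applies and all parts are joined into the
	// measurement name with the configured separator.
	metrics, err := parser.Parse([]byte(
		"sensu.metric.net.server0.eth0.rx_packets 461295119435 1444234982\n"))
	if err != nil {
		panic(err)
	}
	for _, m := range metrics {
		fmt.Println(m.Name(), m.Fields(), m.Tags())
	}
}
```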
diff --git a/plugins/inputs/exec/exec.go b/plugins/inputs/exec/exec.go index c4bb634ba..e297721ba 100644 --- a/plugins/inputs/exec/exec.go +++ b/plugins/inputs/exec/exec.go @@ -2,62 +2,90 @@ package exec import ( "bytes" - "encoding/json" "fmt" "os/exec" - "time" + "sync" "github.com/gonuts/go-shellquote" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf/plugins/parsers" ) const sampleConfig = ` - # the command to run - command = "/usr/bin/mycollector --foo=bar" + ### Commands array + commands = ["/tmp/test.sh", "/usr/bin/mycollector --foo=bar"] - # Data format to consume. This can be "json" or "influx" (line-protocol) - # NOTE json only reads numerical measurements, strings and booleans are ignored. - data_format = "json" - - # measurement name suffix (for separating different commands) + ### measurement name suffix (for separating different commands) name_suffix = "_mycollector" + + ### Data format to consume. This can be "json", "influx" or "graphite" + ### Each data format has its own unique set of configuration options, read + ### more about them here: + ### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md + data_format = "influx" ` type Exec struct { - Command string - DataFormat string + Commands []string + Command string - runner Runner + parser parsers.Parser + + wg sync.WaitGroup + + runner Runner + errChan chan error +} + +func NewExec() *Exec { + return &Exec{ + runner: CommandRunner{}, + } } type Runner interface { - Run(*Exec) ([]byte, error) + Run(*Exec, string) ([]byte, error) } type CommandRunner struct{} -func (c CommandRunner) Run(e *Exec) ([]byte, error) { - split_cmd, err := shellquote.Split(e.Command) +func (c CommandRunner) Run(e *Exec, command string) ([]byte, error) { + split_cmd, err := shellquote.Split(command) if err != nil || len(split_cmd) == 0 { return nil, fmt.Errorf("exec: unable to parse command, %s", err) } cmd := exec.Command(split_cmd[0], split_cmd[1:]...)
+ var out bytes.Buffer cmd.Stdout = &out if err := cmd.Run(); err != nil { - return nil, fmt.Errorf("exec: %s for command '%s'", err, e.Command) + return nil, fmt.Errorf("exec: %s for command '%s'", err, command) } return out.Bytes(), nil } -func NewExec() *Exec { - return &Exec{runner: CommandRunner{}} +func (e *Exec) ProcessCommand(command string, acc telegraf.Accumulator) { + defer e.wg.Done() + + out, err := e.runner.Run(e, command) + if err != nil { + e.errChan <- err + return + } + + metrics, err := e.parser.Parse(out) + if err != nil { + e.errChan <- err + } else { + for _, metric := range metrics { + acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time()) + } + } } func (e *Exec) SampleConfig() string { @@ -65,42 +93,37 @@ func (e *Exec) SampleConfig() string { } func (e *Exec) Description() string { - return "Read flattened metrics from one or more commands that output JSON to stdout" + return "Read metrics from one or more commands that can output to stdout" +} + +func (e *Exec) SetParser(parser parsers.Parser) { + e.parser = parser } func (e *Exec) Gather(acc telegraf.Accumulator) error { - out, err := e.runner.Run(e) - if err != nil { - return err + // Legacy single command support + if e.Command != "" { + e.Commands = append(e.Commands, e.Command) + e.Command = "" } - switch e.DataFormat { - case "", "json": - var jsonOut interface{} - err = json.Unmarshal(out, &jsonOut) - if err != nil { - return fmt.Errorf("exec: unable to parse output of '%s' as JSON, %s", - e.Command, err) - } + e.errChan = make(chan error, len(e.Commands)) - f := internal.JSONFlattener{} - err = f.FlattenJSON("", jsonOut) - if err != nil { - return err - } - acc.AddFields("exec", f.Fields, nil) - case "influx": - now := time.Now() - metrics, err := telegraf.ParseMetrics(out) - for _, metric := range metrics { - acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), now) - } - return err + e.wg.Add(len(e.Commands)) + for _, command := range e.Commands { + go e.ProcessCommand(command, acc) + } + e.wg.Wait() + + select { default: - return fmt.Errorf("Unsupported data format: %s. 
Must be either json "+ - "or influx.", e.DataFormat) + close(e.errChan) + return nil + case err := <-e.errChan: + close(e.errChan) + return err } - return nil + } func init() { diff --git a/plugins/inputs/exec/exec_test.go b/plugins/inputs/exec/exec_test.go index 709308fce..da55ef9d3 100644 --- a/plugins/inputs/exec/exec_test.go +++ b/plugins/inputs/exec/exec_test.go @@ -4,6 +4,8 @@ import ( "fmt" "testing" + "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -55,7 +57,7 @@ func newRunnerMock(out []byte, err error) Runner { } } -func (r runnerMock) Run(e *Exec) ([]byte, error) { +func (r runnerMock) Run(e *Exec, command string) ([]byte, error) { if r.err != nil { return nil, r.err } @@ -63,9 +65,11 @@ func (r runnerMock) Run(e *Exec) ([]byte, error) { } func TestExec(t *testing.T) { + parser, _ := parsers.NewJSONParser("exec", []string{}, nil) e := &Exec{ - runner: newRunnerMock([]byte(validJson), nil), - Command: "testcommand arg1", + runner: newRunnerMock([]byte(validJson), nil), + Commands: []string{"testcommand arg1"}, + parser: parser, } var acc testutil.Accumulator @@ -87,9 +91,11 @@ func TestExec(t *testing.T) { } func TestExecMalformed(t *testing.T) { + parser, _ := parsers.NewJSONParser("exec", []string{}, nil) e := &Exec{ - runner: newRunnerMock([]byte(malformedJson), nil), - Command: "badcommand arg1", + runner: newRunnerMock([]byte(malformedJson), nil), + Commands: []string{"badcommand arg1"}, + parser: parser, } var acc testutil.Accumulator @@ -99,9 +105,11 @@ func TestExecMalformed(t *testing.T) { } func TestCommandError(t *testing.T) { + parser, _ := parsers.NewJSONParser("exec", []string{}, nil) e := &Exec{ - runner: newRunnerMock(nil, fmt.Errorf("exit status code 1")), - Command: "badcommand", + runner: newRunnerMock(nil, fmt.Errorf("exit status code 1")), + Commands: []string{"badcommand"}, + parser: parser, } var acc testutil.Accumulator @@ -111,10 +119,11 @@ func TestCommandError(t *testing.T) { } func TestLineProtocolParse(t *testing.T) { + parser, _ := parsers.NewInfluxParser() e := &Exec{ - runner: newRunnerMock([]byte(lineProtocol), nil), - Command: "line-protocol", - DataFormat: "influx", + runner: newRunnerMock([]byte(lineProtocol), nil), + Commands: []string{"line-protocol"}, + parser: parser, } var acc testutil.Accumulator @@ -133,10 +142,11 @@ func TestLineProtocolParse(t *testing.T) { } func TestLineProtocolParseMultiple(t *testing.T) { + parser, _ := parsers.NewInfluxParser() e := &Exec{ - runner: newRunnerMock([]byte(lineProtocolMulti), nil), - Command: "line-protocol", - DataFormat: "influx", + runner: newRunnerMock([]byte(lineProtocolMulti), nil), + Commands: []string{"line-protocol"}, + parser: parser, } var acc testutil.Accumulator @@ -158,15 +168,3 @@ func TestLineProtocolParseMultiple(t *testing.T) { acc.AssertContainsTaggedFields(t, "cpu", fields, tags) } } - -func TestInvalidDataFormat(t *testing.T) { - e := &Exec{ - runner: newRunnerMock([]byte(lineProtocol), nil), - Command: "bad data format", - DataFormat: "FooBar", - } - - var acc testutil.Accumulator - err := e.Gather(&acc) - require.Error(t, err) -} diff --git a/plugins/inputs/github_webhooks/github_webhooks.go b/plugins/inputs/github_webhooks/github_webhooks.go index 3eeb44c22..a66563add 100644 --- a/plugins/inputs/github_webhooks/github_webhooks.go +++ b/plugins/inputs/github_webhooks/github_webhooks.go @@ -31,7 +31,7 @@ func NewGithubWebhooks() *GithubWebhooks { func (gh 
*GithubWebhooks) SampleConfig() string { return ` - # Address and port to host Webhook listener on + ### Address and port to host Webhook listener on service_address = ":1618" ` } diff --git a/plugins/inputs/haproxy/haproxy.go b/plugins/inputs/haproxy/haproxy.go index 7e02756b8..92969a057 100644 --- a/plugins/inputs/haproxy/haproxy.go +++ b/plugins/inputs/haproxy/haproxy.go @@ -86,13 +86,13 @@ type haproxy struct { } var sampleConfig = ` - # An array of address to gather stats about. Specify an ip on hostname - # with optional port. ie localhost, 10.10.3.33:1936, etc. - # - # If no servers are specified, then default to 127.0.0.1:1936 + ### An array of addresses to gather stats about. Specify an ip or hostname + ### with optional port. ie localhost, 10.10.3.33:1936, etc. + + ### If no servers are specified, then the default is 127.0.0.1:1936 servers = ["http://myhaproxy.com:1936", "http://anotherhaproxy.com:1936"] - # Or you can also use local socket(not work yet) - # servers = ["socket://run/haproxy/admin.sock"] + ### Or you can also use a local socket (not working yet) + ### servers = ["socket://run/haproxy/admin.sock"] ` func (r *haproxy) SampleConfig() string { diff --git a/plugins/inputs/httpjson/httpjson.go b/plugins/inputs/httpjson/httpjson.go index 3070e6338..4c3f4dee5 100644 --- a/plugins/inputs/httpjson/httpjson.go +++ b/plugins/inputs/httpjson/httpjson.go @@ -1,7 +1,6 @@ package httpjson import ( - "encoding/json" "errors" "fmt" "io/ioutil" @@ -12,8 +11,8 @@ import ( "time" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf/plugins/parsers" ) type HttpJson struct { @@ -47,37 +46,36 @@ func (c RealHTTPClient) MakeRequest(req *http.Request) (*http.Response, error) { } var sampleConfig = ` - # NOTE This plugin only reads numerical measurements, strings and booleans - # will be ignored. + ### NOTE This plugin only reads numerical measurements, strings and booleans + ### will be ignored.
- # a name for the service being polled + ### a name for the service being polled name = "webserver_stats" - # URL of each server in the service's cluster + ### URL of each server in the service's cluster servers = [ "http://localhost:9999/stats/", "http://localhost:9998/stats/", ] - # HTTP method to use (case-sensitive) + ### HTTP method to use (case-sensitive) method = "GET" - # List of tag names to extract from top-level of JSON server response + ### List of tag names to extract from top-level of JSON server response # tag_keys = [ # "my_tag_1", # "my_tag_2" # ] - # HTTP parameters (all values must be strings) + ### HTTP parameters (all values must be strings) [inputs.httpjson.parameters] event_type = "cpu_spike" threshold = "0.75" - # HTTP Header parameters (all values must be strings) + ### HTTP Header parameters (all values must be strings) # [inputs.httpjson.headers] # X-Auth-Token = "my-xauth-token" # apiVersion = "v1" - ` func (h *HttpJson) SampleConfig() string { @@ -137,39 +135,34 @@ func (h *HttpJson) gatherServer( return err } - var jsonOut map[string]interface{} - if err = json.Unmarshal([]byte(resp), &jsonOut); err != nil { - return errors.New("Error decoding JSON response") - } - - tags := map[string]string{ - "server": serverURL, - } - - for _, tag := range h.TagKeys { - switch v := jsonOut[tag].(type) { - case string: - tags[tag] = v - } - delete(jsonOut, tag) - } - - if responseTime >= 0 { - jsonOut["response_time"] = responseTime - } - f := internal.JSONFlattener{} - err = f.FlattenJSON("", jsonOut) - if err != nil { - return err - } - var msrmnt_name string if h.Name == "" { msrmnt_name = "httpjson" } else { msrmnt_name = "httpjson_" + h.Name } - acc.AddFields(msrmnt_name, f.Fields, tags) + tags := map[string]string{ + "server": serverURL, + } + + parser, err := parsers.NewJSONParser(msrmnt_name, h.TagKeys, tags) + if err != nil { + return err + } + + metrics, err := parser.Parse([]byte(resp)) + if err != nil { + return err + } + + for _, metric := range metrics { + fields := make(map[string]interface{}) + for k, v := range metric.Fields() { + fields[k] = v + } + fields["response_time"] = responseTime + acc.AddFields(metric.Name(), fields, metric.Tags()) + } return nil } diff --git a/plugins/inputs/influxdb/influxdb.go b/plugins/inputs/influxdb/influxdb.go index e8350ddca..b12990cf1 100644 --- a/plugins/inputs/influxdb/influxdb.go +++ b/plugins/inputs/influxdb/influxdb.go @@ -22,11 +22,11 @@ func (*InfluxDB) Description() string { func (*InfluxDB) SampleConfig() string { return ` - # Works with InfluxDB debug endpoints out of the box, - # but other services can use this format too. - # See the influxdb plugin's README for more details. + ### Works with InfluxDB debug endpoints out of the box, + ### but other services can use this format too. + ### See the influxdb plugin's README for more details. 
- # Multiple URLs from which to read InfluxDB-formatted JSON + ### Multiple URLs from which to read InfluxDB-formatted JSON urls = [ "http://localhost:8086/debug/vars" ] diff --git a/plugins/inputs/jolokia/jolokia.go b/plugins/inputs/jolokia/jolokia.go index fcf5d0bad..77546006f 100644 --- a/plugins/inputs/jolokia/jolokia.go +++ b/plugins/inputs/jolokia/jolokia.go @@ -46,10 +46,10 @@ type Jolokia struct { func (j *Jolokia) SampleConfig() string { return ` - # This is the context root used to compose the jolokia url + ### This is the context root used to compose the jolokia url context = "/jolokia/read" - # List of servers exposing jolokia read service + ### List of servers exposing jolokia read service [[inputs.jolokia.servers]] name = "stable" host = "192.168.103.2" @@ -57,9 +57,10 @@ func (j *Jolokia) SampleConfig() string { # username = "myuser" # password = "mypassword" - # List of metrics collected on above servers - # Each metric consists in a name, a jmx path and either a pass or drop slice attributes - # This collect all heap memory usage metrics + ### List of metrics collected on above servers + ### Each metric consists of a name, a jmx path and either + ### a pass or drop slice attribute. + ### This collects all heap memory usage metrics. [[inputs.jolokia.metrics]] name = "heap_memory_usage" jmx = "/java.lang:type=Memory/HeapMemoryUsage" diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index 499b2e50b..20ce8ef23 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -1,12 +1,13 @@ package kafka_consumer import ( "log" "strings" "sync" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf/plugins/parsers" "github.com/Shopify/sarama" "github.com/wvanbergen/kafka/consumergroup" @@ -20,6 +21,8 @@ type Kafka struct { PointBuffer int Offset string + parser parsers.Parser + sync.Mutex // channel for all incoming kafka messages @@ -36,16 +39,22 @@ type Kafka struct { } var sampleConfig = ` - # topic(s) to consume + ### topic(s) to consume topics = ["telegraf"] - # an array of Zookeeper connection strings + ### an array of Zookeeper connection strings zookeeper_peers = ["localhost:2181"] - # the name of the consumer group + ### the name of the consumer group consumer_group = "telegraf_metrics_consumers" - # Maximum number of points to buffer between collection intervals + ### Maximum number of points to buffer between collection intervals point_buffer = 100000 - # Offset (must be either "oldest" or "newest") + ### Offset (must be either "oldest" or "newest") offset = "oldest" + + ### Data format to consume. This can be "json", "influx" or "graphite" + ### Each data format has its own unique set of configuration options, read + ### more about them here: + ### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md + data_format = "influx" ` func (k *Kafka) SampleConfig() string { @@ -53,7 +62,11 @@ func (k *Kafka) SampleConfig() string { } func (k *Kafka) Description() string { - return "Read line-protocol metrics from Kafka topic(s)" + return "Read metrics from Kafka topic(s)" +} + +func (k *Kafka) SetParser(parser parsers.Parser) { + k.parser = parser } func (k *Kafka) Start() error { @@ -96,15 +109,15 @@ func (k *Kafka) Start() error { k.metricC = make(chan telegraf.Metric, k.PointBuffer) // Start the kafka message reader - go k.parser() + go k.receiver() log.Printf("Started the kafka consumer service, peers: %v, topics: %v\n", k.ZookeeperPeers, k.Topics) return nil } -// parser() reads all incoming messages from the consumer, and parses them into +// receiver() reads all incoming messages from the consumer, and parses them into // influxdb metric points. -func (k *Kafka) parser() { +func (k *Kafka) receiver() { for { select { case <-k.done: @@ -112,13 +125,13 @@ case err := <-k.errs: log.Printf("Kafka Consumer Error: %s\n", err.Error()) case msg := <-k.in: - metrics, err := telegraf.ParseMetrics(msg.Value) + metrics, err := k.parser.Parse(msg.Value) if err != nil { log.Printf("Could not parse kafka message: %s, error: %s", string(msg.Value), err.Error()) } for _, metric := range metrics { select { case k.metricC <- metric: continue
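With the hard-coded line-protocol parsing gone, whatever parser the configuration layer builds from `data_format` is injected into the consumer before it starts. A hypothetical by-hand wiring under those assumptions (the exported field names are inferred from the sample config above; error handling kept minimal):

```go
package main

import (
	"log"

	"github.com/influxdata/telegraf/plugins/inputs/kafka_consumer"
	"github.com/influxdata/telegraf/plugins/parsers"
)

func main() {
	// Mirror the sample config above; in normal operation Telegraf's
	// config layer fills these fields in from the TOML.
	k := &kafka_consumer.Kafka{
		Topics:         []string{"telegraf"},
		ZookeeperPeers: []string{"localhost:2181"},
		ConsumerGroup:  "telegraf_metrics_consumers",
		PointBuffer:    100000,
		Offset:         "oldest",
	}

	// data_format = "influx" corresponds to the influx parser; the parser
	// must be injected before Start() so receiver() can use it.
	p, err := parsers.NewInfluxParser()
	if err != nil {
		log.Fatal(err)
	}
	k.SetParser(p)

	if err := k.Start(); err != nil {
		log.Fatal(err)
	}
}
```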
diff --git a/plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go b/plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go index a3a4a6e35..458d43d35 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go @@ -9,6 +9,8 @@ import ( "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/influxdata/telegraf/plugins/parsers" ) func TestReadsMetricsFromKafka(t *testing.T) { @@ -40,6 +42,8 @@ func TestReadsMetricsFromKafka(t *testing.T) { PointBuffer: 100000, Offset: "oldest", } + p, _ := parsers.NewInfluxParser() + k.SetParser(p) if err := k.Start(); err != nil { t.Fatal(err.Error()) } else { diff --git a/plugins/inputs/kafka_consumer/kafka_consumer_test.go b/plugins/inputs/kafka_consumer/kafka_consumer_test.go index be8984300..ec69cb926 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer_test.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer_test.go @@ -5,6 +5,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/testutil" "github.com/Shopify/sarama" @@ -12,9 +13,11 @@ import ( ) const ( - testMsg = "cpu_load_short,host=server01 value=23422.0 1422568543702900257" - invalidMsg = "cpu_load_short,host=server01 1422568543702900257" - pointBuffer = 5 + testMsg = "cpu_load_short,host=server01 value=23422.0 1422568543702900257" + testMsgGraphite = "cpu.load.short.graphite 23422 1454780029" + testMsgJSON = "{\"a\": 5, \"b\": {\"c\": 6}}\n" + invalidMsg = "cpu_load_short,host=server01 1422568543702900257" + pointBuffer = 5 ) func NewTestKafka() (*Kafka, chan *sarama.ConsumerMessage) { @@ -39,7 +42,8 @@ func TestRunParser(t *testing.T) { k, in := NewTestKafka() defer close(k.done) - go k.parser() + k.parser, _ =
parsers.NewInfluxParser() + go k.receiver() in <- saramaMsg(testMsg) time.Sleep(time.Millisecond) @@ -51,7 +55,8 @@ func TestRunParserInvalidMsg(t *testing.T) { k, in := NewTestKafka() defer close(k.done) - go k.parser() + k.parser, _ = parsers.NewInfluxParser() + go k.receiver() in <- saramaMsg(invalidMsg) time.Sleep(time.Millisecond) @@ -63,7 +68,8 @@ func TestRunParserRespectsBuffer(t *testing.T) { k, in := NewTestKafka() defer close(k.done) - go k.parser() + k.parser, _ = parsers.NewInfluxParser() + go k.receiver() for i := 0; i < pointBuffer+1; i++ { in <- saramaMsg(testMsg) } @@ -77,7 +83,8 @@ func TestRunParserAndGather(t *testing.T) { k, in := NewTestKafka() defer close(k.done) - go k.parser() + k.parser, _ = parsers.NewInfluxParser() + go k.receiver() in <- saramaMsg(testMsg) time.Sleep(time.Millisecond) @@ -89,6 +96,45 @@ func TestRunParserAndGather(t *testing.T) { map[string]interface{}{"value": float64(23422)}) } +// Test that the parser parses kafka messages into points +func TestRunParserAndGatherGraphite(t *testing.T) { + k, in := NewTestKafka() + defer close(k.done) + + k.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil) + go k.receiver() + in <- saramaMsg(testMsgGraphite) + time.Sleep(time.Millisecond) + + acc := testutil.Accumulator{} + k.Gather(&acc) + + assert.Equal(t, len(acc.Metrics), 1) + acc.AssertContainsFields(t, "cpu_load_short_graphite", + map[string]interface{}{"value": float64(23422)}) +} + +// Test that the parser parses kafka messages into points +func TestRunParserAndGatherJSON(t *testing.T) { + k, in := NewTestKafka() + defer close(k.done) + + k.parser, _ = parsers.NewJSONParser("kafka_json_test", []string{}, nil) + go k.receiver() + in <- saramaMsg(testMsgJSON) + time.Sleep(time.Millisecond) + + acc := testutil.Accumulator{} + k.Gather(&acc) + + assert.Equal(t, len(acc.Metrics), 1) + acc.AssertContainsFields(t, "kafka_json_test", + map[string]interface{}{ + "a": float64(5), + "b_c": float64(6), + }) +} + func saramaMsg(val string) *sarama.ConsumerMessage { return &sarama.ConsumerMessage{ Key: nil, diff --git a/plugins/inputs/leofs/leofs.go b/plugins/inputs/leofs/leofs.go index d186b328f..4a52706b3 100644 --- a/plugins/inputs/leofs/leofs.go +++ b/plugins/inputs/leofs/leofs.go @@ -132,10 +132,8 @@ var serverTypeMapping = map[string]ServerType{ } var sampleConfig = ` - # An array of URI to gather stats about LeoFS. - # Specify an ip or hostname with port. ie 127.0.0.1:4020 - # - # If no servers are specified, then 127.0.0.1 is used as the host and 4020 as the port. + ### An array of URI to gather stats about LeoFS. + ### Specify an ip or hostname with port. 
ie 127.0.0.1:4020 servers = ["127.0.0.1:4021"] ` diff --git a/plugins/inputs/lustre2/lustre2.go b/plugins/inputs/lustre2/lustre2.go index cf57c5c65..26d0e3702 100644 --- a/plugins/inputs/lustre2/lustre2.go +++ b/plugins/inputs/lustre2/lustre2.go @@ -29,10 +29,13 @@ type Lustre2 struct { } var sampleConfig = ` - # An array of /proc globs to search for Lustre stats - # If not specified, the default will work on Lustre 2.5.x - # - # ost_procfiles = ["/proc/fs/lustre/obdfilter/*/stats", "/proc/fs/lustre/osd-ldiskfs/*/stats"] + ### An array of /proc globs to search for Lustre stats + ### If not specified, the default will work on Lustre 2.5.x + ### + # ost_procfiles = [ + # "/proc/fs/lustre/obdfilter/*/stats", + # "/proc/fs/lustre/osd-ldiskfs/*/stats" + # ] # mds_procfiles = ["/proc/fs/lustre/mdt/*/md_stats"] ` diff --git a/plugins/inputs/mailchimp/mailchimp.go b/plugins/inputs/mailchimp/mailchimp.go index 35ea38858..290c01bfd 100644 --- a/plugins/inputs/mailchimp/mailchimp.go +++ b/plugins/inputs/mailchimp/mailchimp.go @@ -17,13 +17,13 @@ type MailChimp struct { } var sampleConfig = ` - # MailChimp API key - # get from https://admin.mailchimp.com/account/api/ + ### MailChimp API key + ### get from https://admin.mailchimp.com/account/api/ api_key = "" # required - # Reports for campaigns sent more than days_old ago will not be collected. - # 0 means collect all. + ### Reports for campaigns sent more than days_old ago will not be collected. + ### 0 means collect all. days_old = 0 - # Campaign ID to get, if empty gets all campaigns, this option overrides days_old + ### Campaign ID to get, if empty gets all campaigns, this option overrides days_old # campaign_id = "" ` diff --git a/plugins/inputs/memcached/memcached.go b/plugins/inputs/memcached/memcached.go index d27e6a099..19654937c 100644 --- a/plugins/inputs/memcached/memcached.go +++ b/plugins/inputs/memcached/memcached.go @@ -19,10 +19,8 @@ type Memcached struct { } var sampleConfig = ` - # An array of address to gather stats about. Specify an ip on hostname - # with optional port. ie localhost, 10.0.0.1:11211, etc. - # - # If no servers are specified, then localhost is used as the host. + ### An array of addresses to gather stats about. Specify an ip or hostname + ### with optional port. ie localhost, 10.0.0.1:11211, etc. servers = ["localhost:11211"] # unix_sockets = ["/var/run/memcached.sock"] ` diff --git a/plugins/inputs/mongodb/mongodb.go b/plugins/inputs/mongodb/mongodb.go index b0cf492c0..4054ccd54 100644 --- a/plugins/inputs/mongodb/mongodb.go +++ b/plugins/inputs/mongodb/mongodb.go @@ -26,11 +26,11 @@ type Ssl struct { } var sampleConfig = ` - # An array of URI to gather stats about. Specify an ip or hostname - # with optional port add password. ie mongodb://user:auth_key@10.10.3.30:27017, - # mongodb://10.10.3.33:18832, 10.0.0.1:10000, etc. - # - # If no servers are specified, then 127.0.0.1 is used as the host and 27107 as the port. + ### An array of URIs to gather stats about. Specify an ip or hostname + ### with optional port and password. ie, + ### mongodb://user:auth_key@10.10.3.30:27017, + ### mongodb://10.10.3.33:18832, + ### 10.0.0.1:10000, etc.
servers = ["127.0.0.1:27017"] ` diff --git a/plugins/inputs/mysql/mysql.go b/plugins/inputs/mysql/mysql.go index 07a739d01..272baddb1 100644 --- a/plugins/inputs/mysql/mysql.go +++ b/plugins/inputs/mysql/mysql.go @@ -15,14 +15,14 @@ type Mysql struct { } var sampleConfig = ` - # specify servers via a url matching: - # [username[:password]@][protocol[(address)]]/[?tls=[true|false|skip-verify]] - # see https://github.com/go-sql-driver/mysql#dsn-data-source-name - # e.g. - # root:passwd@tcp(127.0.0.1:3306)/?tls=false - # root@tcp(127.0.0.1:3306)/?tls=false - # - # If no servers are specified, then localhost is used as the host. + ### specify servers via a url matching: + ### [username[:password]@][protocol[(address)]]/[?tls=[true|false|skip-verify]] + ### see https://github.com/go-sql-driver/mysql#dsn-data-source-name + ### e.g. + ### root:passwd@tcp(127.0.0.1:3306)/?tls=false + ### root@tcp(127.0.0.1:3306)/?tls=false + ### + ### If no servers are specified, then localhost is used as the host. servers = ["tcp(127.0.0.1:3306)/"] ` diff --git a/plugins/inputs/nginx/nginx.go b/plugins/inputs/nginx/nginx.go index 126c48f54..4ceca01f2 100644 --- a/plugins/inputs/nginx/nginx.go +++ b/plugins/inputs/nginx/nginx.go @@ -20,7 +20,7 @@ type Nginx struct { } var sampleConfig = ` - # An array of Nginx stub_status URI to gather stats. + ### An array of Nginx stub_status URI to gather stats. urls = ["http://localhost/status"] ` diff --git a/plugins/inputs/nsq/nsq.go b/plugins/inputs/nsq/nsq.go index 82c09a4bb..1cf7d4dcc 100644 --- a/plugins/inputs/nsq/nsq.go +++ b/plugins/inputs/nsq/nsq.go @@ -41,7 +41,7 @@ type NSQ struct { } var sampleConfig = ` - # An array of NSQD HTTP API endpoints + ### An array of NSQD HTTP API endpoints endpoints = ["http://localhost:4151"] ` diff --git a/plugins/inputs/passenger/passenger.go b/plugins/inputs/passenger/passenger.go index a91e8503c..802107f4f 100644 --- a/plugins/inputs/passenger/passenger.go +++ b/plugins/inputs/passenger/passenger.go @@ -126,16 +126,15 @@ func (p *process) getUptime() int64 { } var sampleConfig = ` - # Path of passenger-status. - # - # Plugin gather metric via parsing XML output of passenger-status - # More information about the tool: - # https://www.phusionpassenger.com/library/admin/apache/overall_status_report.html - # - # - # If no path is specified, then the plugin simply execute passenger-status - # hopefully it can be found in your PATH - command = "passenger-status -v --show=xml" + ### Path of passenger-status. + ### + ### Plugin gather metric via parsing XML output of passenger-status + ### More information about the tool: + ### https://www.phusionpassenger.com/library/admin/apache/overall_status_report.html + ### + ### If no path is specified, then the plugin simply execute passenger-status + ### hopefully it can be found in your PATH + command = "passenger-status -v --show=xml" ` func (r *passenger) SampleConfig() string { diff --git a/plugins/inputs/phpfpm/phpfpm.go b/plugins/inputs/phpfpm/phpfpm.go index 2e9bc417a..157f87691 100644 --- a/plugins/inputs/phpfpm/phpfpm.go +++ b/plugins/inputs/phpfpm/phpfpm.go @@ -41,26 +41,25 @@ type phpfpm struct { } var sampleConfig = ` - # An array of addresses to gather stats about. 
Specify an ip or hostname - # with optional port and path - # - # Plugin can be configured in three modes (either can be used): - # - http: the URL must start with http:// or https://, ie: - # "http://localhost/status" - # "http://192.168.130.1/status?full" - # - # - unixsocket: path to fpm socket, ie: - # "/var/run/php5-fpm.sock" - # or using a custom fpm status path: - # "/var/run/php5-fpm.sock:fpm-custom-status-path" - # - # - fcgi: the URL must start with fcgi:// or cgi://, and port must be present, ie: - # "fcgi://10.0.0.12:9000/status" - # "cgi://10.0.10.12:9001/status" - # - # Example of multiple gathering from local socket and remove host - # urls = ["http://192.168.1.20/status", "/tmp/fpm.sock"] - # If no servers are specified, then default to http://127.0.0.1/status + ### An array of addresses to gather stats about. Specify an ip or hostname + ### with optional port and path + ### + ### Plugin can be configured in three modes (either can be used): + ### - http: the URL must start with http:// or https://, ie: + ### "http://localhost/status" + ### "http://192.168.130.1/status?full" + ### + ### - unixsocket: path to fpm socket, ie: + ### "/var/run/php5-fpm.sock" + ### or using a custom fpm status path: + ### "/var/run/php5-fpm.sock:fpm-custom-status-path" + ### + ### - fcgi: the URL must start with fcgi:// or cgi://, and port must be present, ie: + ### "fcgi://10.0.0.12:9000/status" + ### "cgi://10.0.10.12:9001/status" + ### + ### Example of multiple gathering from local socket and remove host + ### urls = ["http://192.168.1.20/status", "/tmp/fpm.sock"] urls = ["http://localhost/status"] ` diff --git a/plugins/inputs/ping/ping.go b/plugins/inputs/ping/ping.go index b415c6d4d..ab5df6e82 100644 --- a/plugins/inputs/ping/ping.go +++ b/plugins/inputs/ping/ping.go @@ -5,6 +5,7 @@ package ping import ( "errors" "os/exec" + "runtime" "strconv" "strings" "sync" @@ -43,18 +44,18 @@ func (_ *Ping) Description() string { } var sampleConfig = ` - # NOTE: this plugin forks the ping command. You may need to set capabilities - # via setcap cap_net_raw+p /bin/ping + ### NOTE: this plugin forks the ping command. You may need to set capabilities + ### via setcap cap_net_raw+p /bin/ping - # urls to ping + ### urls to ping urls = ["www.google.com"] # required - # number of pings to send (ping -c ) + ### number of pings to send (ping -c ) count = 1 # required - # interval, in s, at which to ping. 0 == default (ping -i ) + ### interval, in s, at which to ping. 0 == default (ping -i ) ping_interval = 0.0 - # ping timeout, in s. 0 == no timeout (ping -t ) + ### ping timeout, in s. 
0 == no timeout (ping -t <TIMEOUT>) timeout = 0.0 - # interface to send ping from (ping -I <INTERFACE>) + ### interface to send ping from (ping -I <INTERFACE>) interface = "" ` @@ -128,12 +129,20 @@ func hostPinger(args ...string) (string, error) { // args returns the arguments for the 'ping' executable func (p *Ping) args(url string) []string { // Build the ping command args based on toml config - args := []string{"-c", strconv.Itoa(p.Count)} + args := []string{"-c", strconv.Itoa(p.Count), "-n", "-s", "16"} if p.PingInterval > 0 { args = append(args, "-i", strconv.FormatFloat(p.PingInterval, 'f', 1, 64)) } if p.Timeout > 0 { - args = append(args, "-t", strconv.FormatFloat(p.Timeout, 'f', 1, 64)) + switch runtime.GOOS { + case "darwin", "freebsd": + args = append(args, "-t", strconv.FormatFloat(p.Timeout, 'f', 1, 64)) + case "linux": + args = append(args, "-W", strconv.FormatFloat(p.Timeout, 'f', 1, 64)) + default: + // Not sure the best option here, just assume GNU ping? + args = append(args, "-W", strconv.FormatFloat(p.Timeout, 'f', 1, 64)) + } } if p.Interface != "" { args = append(args, "-I", p.Interface) diff --git a/plugins/inputs/ping/ping_test.go b/plugins/inputs/ping/ping_test.go index 64da15b51..cd61a4fb2 100644 --- a/plugins/inputs/ping/ping_test.go +++ b/plugins/inputs/ping/ping_test.go @@ -5,6 +5,7 @@ package ping import ( "errors" "reflect" + "runtime" "sort" "testing" @@ -76,7 +77,7 @@ func TestArgs(t *testing.T) { // Actual and Expected arg lists must be sorted for reflect.DeepEqual actual := p.args("www.google.com") - expected := []string{"-c", "2", "www.google.com"} + expected := []string{"-c", "2", "-n", "-s", "16", "www.google.com"} sort.Strings(actual) sort.Strings(expected) assert.True(t, reflect.DeepEqual(expected, actual), @@ -84,7 +85,8 @@ func TestArgs(t *testing.T) { p.Interface = "eth0" actual = p.args("www.google.com") - expected = []string{"-c", "2", "-I", "eth0", "www.google.com"} + expected = []string{"-c", "2", "-n", "-s", "16", "-I", "eth0", + "www.google.com"} sort.Strings(actual) sort.Strings(expected) assert.True(t, reflect.DeepEqual(expected, actual), @@ -92,7 +94,15 @@ func TestArgs(t *testing.T) { p.Timeout = 12.0 actual = p.args("www.google.com") - expected = []string{"-c", "2", "-I", "eth0", "-t", "12.0", "www.google.com"} + switch runtime.GOOS { + case "darwin", "freebsd": + expected = []string{"-c", "2", "-n", "-s", "16", "-I", "eth0", "-t", + "12.0", "www.google.com"} + default: + expected = []string{"-c", "2", "-n", "-s", "16", "-I", "eth0", "-W", + "12.0", "www.google.com"} + } + sort.Strings(actual) sort.Strings(expected) assert.True(t, reflect.DeepEqual(expected, actual), @@ -100,8 +110,14 @@ func TestArgs(t *testing.T) { p.PingInterval = 1.2 actual = p.args("www.google.com") - expected = []string{"-c", "2", "-I", "eth0", "-t", "12.0", "-i", "1.2", - "www.google.com"} + switch runtime.GOOS { + case "darwin", "freebsd": + expected = []string{"-c", "2", "-n", "-s", "16", "-I", "eth0", "-t", + "12.0", "-i", "1.2", "www.google.com"} + default: + expected = []string{"-c", "2", "-n", "-s", "16", "-I", "eth0", "-W", + "12.0", "-i", "1.2", "www.google.com"} + } sort.Strings(actual) sort.Strings(expected) assert.True(t, reflect.DeepEqual(expected, actual), diff --git a/plugins/inputs/postgresql/postgresql.go b/plugins/inputs/postgresql/postgresql.go index 3a9815af6..d64cc1099 100644 --- a/plugins/inputs/postgresql/postgresql.go +++ b/plugins/inputs/postgresql/postgresql.go @@ -21,22 +21,22 @@ type Postgresql struct { var ignoredColumns = map[string]bool{"datid": true, "datname":
true, "stats_reset": true} var sampleConfig = ` - # specify address via a url matching: - # postgres://[pqgotest[:password]]@localhost[/dbname]?sslmode=[disable|verify-ca|verify-full] - # or a simple string: - # host=localhost user=pqotest password=... sslmode=... dbname=app_production - # - # All connection parameters are optional. - # - # Without the dbname parameter, the driver will default to a database - # with the same name as the user. This dbname is just for instantiating a - # connection with the server and doesn't restrict the databases we are trying - # to grab metrics for. - # + ### specify address via a url matching: + ### postgres://[pqgotest[:password]]@localhost[/dbname]?sslmode=[disable|verify-ca|verify-full] + ### or a simple string: + ### host=localhost user=pqotest password=... sslmode=... dbname=app_production + ### + ### All connection parameters are optional. + ### + ### Without the dbname parameter, the driver will default to a database + ### with the same name as the user. This dbname is just for instantiating a + ### connection with the server and doesn't restrict the databases we are trying + ### to grab metrics for. + ### address = "host=localhost user=postgres sslmode=disable" - # A list of databases to pull metrics about. If not specified, metrics for all - # databases are gathered. + ### A list of databases to pull metrics about. If not specified, metrics for all + ### databases are gathered. # databases = ["app_production", "testing"] ` diff --git a/plugins/inputs/powerdns/powerdns.go b/plugins/inputs/powerdns/powerdns.go index 0203d916e..f011f8716 100644 --- a/plugins/inputs/powerdns/powerdns.go +++ b/plugins/inputs/powerdns/powerdns.go @@ -18,10 +18,8 @@ type Powerdns struct { } var sampleConfig = ` - # An array of sockets to gather stats about. - # Specify a path to unix socket. - # - # If no servers are specified, then '/var/run/pdns.controlsocket' is used as the path. + ### An array of sockets to gather stats about. + ### Specify a path to unix socket. unix_sockets = ["/var/run/pdns.controlsocket"] ` diff --git a/plugins/inputs/procstat/procstat.go b/plugins/inputs/procstat/procstat.go index eaaea2843..6305416b7 100644 --- a/plugins/inputs/procstat/procstat.go +++ b/plugins/inputs/procstat/procstat.go @@ -30,15 +30,15 @@ func NewProcstat() *Procstat { } var sampleConfig = ` - # Must specify one of: pid_file, exe, or pattern - # PID file to monitor process + ### Must specify one of: pid_file, exe, or pattern + ### PID file to monitor process pid_file = "/var/run/nginx.pid" - # executable name (ie, pgrep ) + ### executable name (ie, pgrep ) # exe = "nginx" - # pattern as argument for pgrep (ie, pgrep -f ) + ### pattern as argument for pgrep (ie, pgrep -f ) # pattern = "nginx" - # Field name prefix + ### Field name prefix prefix = "" ` diff --git a/plugins/inputs/prometheus/prometheus.go b/plugins/inputs/prometheus/prometheus.go index 9db0dd165..aea5c5f95 100644 --- a/plugins/inputs/prometheus/prometheus.go +++ b/plugins/inputs/prometheus/prometheus.go @@ -17,7 +17,7 @@ type Prometheus struct { } var sampleConfig = ` - # An array of urls to scrape metrics from. + ### An array of urls to scrape metrics from. 
urls = ["http://localhost:9100/metrics"] ` diff --git a/plugins/inputs/puppetagent/puppetagent.go b/plugins/inputs/puppetagent/puppetagent.go index 693d0aaff..882b1e3b8 100644 --- a/plugins/inputs/puppetagent/puppetagent.go +++ b/plugins/inputs/puppetagent/puppetagent.go @@ -18,7 +18,7 @@ type PuppetAgent struct { } var sampleConfig = ` - # Location of puppet last run summary file + ### Location of puppet last run summary file location = "/var/lib/puppet/state/last_run_summary.yaml" ` diff --git a/plugins/inputs/rabbitmq/rabbitmq.go b/plugins/inputs/rabbitmq/rabbitmq.go index 87edc1ee9..8b287204f 100644 --- a/plugins/inputs/rabbitmq/rabbitmq.go +++ b/plugins/inputs/rabbitmq/rabbitmq.go @@ -107,8 +107,8 @@ var sampleConfig = ` # username = "guest" # password = "guest" - # A list of nodes to pull metrics about. If not specified, metrics for - # all nodes are gathered. + ### A list of nodes to pull metrics about. If not specified, metrics for + ### all nodes are gathered. # nodes = ["rabbit@node1", "rabbit@node2"] ` diff --git a/plugins/inputs/redis/redis.go b/plugins/inputs/redis/redis.go index 221ed3c15..88420beac 100644 --- a/plugins/inputs/redis/redis.go +++ b/plugins/inputs/redis/redis.go @@ -19,14 +19,14 @@ type Redis struct { } var sampleConfig = ` - # specify servers via a url matching: - # [protocol://][:password]@address[:port] - # e.g. - # tcp://localhost:6379 - # tcp://:password@192.168.99.100 - # - # If no servers are specified, then localhost is used as the host. - # If no port is specified, 6379 is used + ### specify servers via a url matching: + ### [protocol://][:password]@address[:port] + ### e.g. + ### tcp://localhost:6379 + ### tcp://:password@192.168.99.100 + ### + ### If no servers are specified, then localhost is used as the host. + ### If no port is specified, 6379 is used servers = ["tcp://localhost:6379"] ` diff --git a/plugins/inputs/rethinkdb/rethinkdb.go b/plugins/inputs/rethinkdb/rethinkdb.go index 4d72fed55..94d31fe5f 100644 --- a/plugins/inputs/rethinkdb/rethinkdb.go +++ b/plugins/inputs/rethinkdb/rethinkdb.go @@ -16,11 +16,11 @@ type RethinkDB struct { } var sampleConfig = ` - # An array of URI to gather stats about. Specify an ip or hostname - # with optional port add password. ie rethinkdb://user:auth_key@10.10.3.30:28105, - # rethinkdb://10.10.3.33:18832, 10.0.0.1:10000, etc. - # - # If no servers are specified, then 127.0.0.1 is used as the host and 28015 as the port. + ### An array of URI to gather stats about. Specify an ip or hostname + ### with optional port add password. ie, + ### rethinkdb://user:auth_key@10.10.3.30:28105, + ### rethinkdb://10.10.3.33:18832, + ### 10.0.0.1:10000, etc. servers = ["127.0.0.1:28015"] ` diff --git a/plugins/inputs/sensors/sensors.go b/plugins/inputs/sensors/sensors.go index 7cfd02a8a..82cc7df89 100644 --- a/plugins/inputs/sensors/sensors.go +++ b/plugins/inputs/sensors/sensors.go @@ -20,15 +20,15 @@ func (_ *Sensors) Description() string { } var sensorsSampleConfig = ` - # By default, telegraf gathers stats from all sensors detected by the - # lm-sensors module. - # - # Only collect stats from the selected sensors. Sensors are listed as - # :. This information can be found by running the - # sensors command, e.g. sensors -u - # - # A * as the feature name will return all features of the chip - # + ### By default, telegraf gathers stats from all sensors detected by the + ### lm-sensors module. + ### + ### Only collect stats from the selected sensors. Sensors are listed as + ### :. 
This information can be found by running the + ### sensors command, e.g. sensors -u + ### + ### A * as the feature name will return all features of the chip + ### # sensors = ["coretemp-isa-0000:Core 0", "coretemp-isa-0001:*"] ` diff --git a/plugins/inputs/snmp/snmp.go b/plugins/inputs/snmp/snmp.go index 50b78fc9b..1932fed41 100644 --- a/plugins/inputs/snmp/snmp.go +++ b/plugins/inputs/snmp/snmp.go @@ -72,11 +72,11 @@ var initNode = Node{ var NameToOid = make(map[string]string) var sampleConfig = ` - # Use 'oids.txt' file to translate oids to names - # To generate 'oids.txt' you need to run: - # snmptranslate -m all -Tz -On | sed -e 's/"//g' > /tmp/oids.txt - # Or if you have an other MIB folder with custom MIBs - # snmptranslate -M /mycustommibfolder -Tz -On -m all | sed -e 's/"//g' > oids.txt + ### Use 'oids.txt' file to translate oids to names + ### To generate 'oids.txt' you need to run: + ### snmptranslate -m all -Tz -On | sed -e 's/"//g' > /tmp/oids.txt + ### Or if you have another MIB folder with custom MIBs + ### snmptranslate -M /mycustommibfolder -Tz -On -m all | sed -e 's/"//g' > oids.txt snmptranslate_file = "/tmp/oids.txt" [[inputs.snmp.host]] address = "192.168.2.2:161" diff --git a/plugins/inputs/sqlserver/sqlserver.go b/plugins/inputs/sqlserver/sqlserver.go index 3a67f065d..83d88b3c2 100644 --- a/plugins/inputs/sqlserver/sqlserver.go +++ b/plugins/inputs/sqlserver/sqlserver.go @@ -31,14 +31,14 @@ var queries MapQuery var defaultServer = "Server=.;app name=telegraf;log=1;" var sampleConfig = ` - # Specify instances to monitor with a list of connection strings. - # All connection parameters are optional. - # By default, the host is localhost, listening on default port, TCP 1433. - # for Windows, the user is the currently running AD user (SSO). - # See https://github.com/denisenkom/go-mssqldb for detailed connection parameters. - + ### Specify instances to monitor with a list of connection strings. + ### All connection parameters are optional. + ### By default, the host is localhost, listening on default port, TCP 1433. + ### for Windows, the user is the currently running AD user (SSO). + ### See https://github.com/denisenkom/go-mssqldb for detailed connection + ### parameters.
# servers = [ - "Server=192.168.1.10;Port=1433;User Id=telegraf;Password=T$l$gr@f69*;app name=telegraf;log=1;", + "Server=192.168.1.10;Port=1433;User Id=<user>;Password=<pw>;app name=telegraf;log=1;", # ] ` diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go index 2b80442d6..fb8de402e 100644 --- a/plugins/inputs/statsd/statsd.go +++ b/plugins/inputs/statsd/statsd.go @@ -11,7 +11,7 @@ import ( "sync" "time" - "github.com/influxdata/influxdb/services/graphite" + "github.com/influxdata/telegraf/plugins/parsers/graphite" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" @@ -123,37 +123,39 @@ func (_ *Statsd) Description() string { } const sampleConfig = ` - # Address and port to host UDP listener on + ### Address and port to host UDP listener on service_address = ":8125" - # Delete gauges every interval (default=false) + ### Delete gauges every interval (default=false) delete_gauges = false - # Delete counters every interval (default=false) + ### Delete counters every interval (default=false) delete_counters = false - # Delete sets every interval (default=false) + ### Delete sets every interval (default=false) delete_sets = false - # Delete timings & histograms every interval (default=true) + ### Delete timings & histograms every interval (default=true) delete_timings = true - # Percentiles to calculate for timing & histogram stats + ### Percentiles to calculate for timing & histogram stats percentiles = [90] - # convert measurement names, "." to "_" and "-" to "__" + ### convert measurement names, "." to "_" and "-" to "__" convert_names = true + ### Statsd data translation templates, more info can be found here: + ### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md#graphite # templates = [ # "cpu.* measurement*" # ] - # Number of UDP messages allowed to queue up, once filled, - # the statsd server will start dropping packets + ### Number of UDP messages allowed to queue up, once filled, + ### the statsd server will start dropping packets allowed_pending_messages = 10000 - # Number of timing/histogram values to track per-measurement in the - # calculation of percentiles. Raising this limit increases the accuracy - # of percentiles but also increases the memory usage and cpu time. + ### Number of timing/histogram values to track per-measurement in the + ### calculation of percentiles. Raising this limit increases the accuracy + ### of percentiles but also increases the memory usage and cpu time. percentile_limit = 1000 - # UDP packet size for the server to listen for. This will depend on the size - # of the packets that the client is sending, which is usually 1500 bytes. + ### UDP packet size for the server to listen for. This will depend on the size + ### of the packets that the client is sending, which is usually 1500 bytes. udp_packet_size = 1500 ` @@ -418,18 +420,14 @@ func (s *Statsd) parseName(bucket string) (string, string, map[string]string) { } } - o := graphite.Options{ - Separator: "_", - Templates: s.Templates, - DefaultTags: tags, - } - var field string name := bucketparts[0] - p, err := graphite.NewParserWithOptions(o) + p, err := graphite.NewGraphiteParser(".", s.Templates, nil) if err == nil { + p.DefaultTags = tags name, tags, field, _ = p.ApplyTemplate(name) } + if s.ConvertNames { name = strings.Replace(name, ".", "_", -1) name = strings.Replace(name, "-", "__", -1)
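Since parseName above now leans on the shared graphite parser, the template mechanics can be sketched in isolation; this uses the template from the sample config above and the `ApplyTemplate` return values shown in the hunk (output is illustrative):

```go
package main

import (
	"fmt"

	"github.com/influxdata/telegraf/plugins/parsers/graphite"
)

func main() {
	// Same constructor call the statsd plugin now makes, with the
	// "cpu.* measurement*" template from the sample config above.
	p, err := graphite.NewGraphiteParser(".", []string{"cpu.* measurement*"}, nil)
	if err != nil {
		panic(err)
	}

	// ApplyTemplate splits the bucket name on "." and applies the first
	// matching template, returning the measurement name, any tags the
	// template adds, and an optional field name.
	name, tags, field, err := p.ApplyTemplate("cpu.load.short")
	fmt.Println(name, tags, field, err)
}
```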
   udp_packet_size = 1500
 `
@@ -418,18 +420,14 @@ func (s *Statsd) parseName(bucket string) (string, string, map[string]string) {
 		}
 	}

-	o := graphite.Options{
-		Separator:   "_",
-		Templates:   s.Templates,
-		DefaultTags: tags,
-	}
-
 	var field string
 	name := bucketparts[0]
-	p, err := graphite.NewParserWithOptions(o)
+	p, err := graphite.NewGraphiteParser(".", s.Templates, nil)
 	if err == nil {
+		p.DefaultTags = tags
 		name, tags, field, _ = p.ApplyTemplate(name)
 	}
+
 	if s.ConvertNames {
 		name = strings.Replace(name, ".", "_", -1)
 		name = strings.Replace(name, "-", "__", -1)
diff --git a/plugins/inputs/system/cpu.go b/plugins/inputs/system/cpu.go
index b8edfca9e..47b3368a7 100644
--- a/plugins/inputs/system/cpu.go
+++ b/plugins/inputs/system/cpu.go
@@ -28,11 +28,11 @@ func (_ *CPUStats) Description() string {
 }

 var sampleConfig = `
-  # Whether to report per-cpu stats or not
+  ### Whether to report per-cpu stats or not
   percpu = true
-  # Whether to report total system cpu stats or not
+  ### Whether to report total system cpu stats or not
   totalcpu = true
-  # Comment this line if you want the raw CPU time metrics
+  ### Comment this line if you want the raw CPU time metrics
   drop = ["time_*"]
 `
diff --git a/plugins/inputs/system/disk.go b/plugins/inputs/system/disk.go
index 1481f6a91..b8c611427 100644
--- a/plugins/inputs/system/disk.go
+++ b/plugins/inputs/system/disk.go
@@ -21,8 +21,8 @@ func (_ *DiskStats) Description() string {
 }

 var diskSampleConfig = `
-  # By default, telegraf gather stats for all mountpoints.
-  # Setting mountpoints will restrict the stats to the specified mountpoints.
+  ### By default, telegraf gathers stats for all mountpoints.
+  ### Setting mountpoints will restrict the stats to the specified mountpoints.
   # mount_points = ["/"]
 `
diff --git a/plugins/inputs/system/net.go b/plugins/inputs/system/net.go
index 451892d37..ea8b66266 100644
--- a/plugins/inputs/system/net.go
+++ b/plugins/inputs/system/net.go
@@ -21,11 +21,11 @@ func (_ *NetIOStats) Description() string {
 }

 var netSampleConfig = `
-  # By default, telegraf gathers stats from any up interface (excluding loopback)
-  # Setting interfaces will tell it to gather these explicit interfaces,
-  # regardless of status.
-  #
-  # interfaces = ["eth0", ... ]
+  ### By default, telegraf gathers stats from any up interface (excluding loopback)
+  ### Setting interfaces will tell it to gather these explicit interfaces,
+  ### regardless of status.
+  ###
+  # interfaces = ["eth0"]
 `

 func (_ *NetIOStats) SampleConfig() string {
diff --git a/plugins/inputs/system/netstat.go b/plugins/inputs/system/netstat.go
index 4eab80e87..98b729bbe 100644
--- a/plugins/inputs/system/netstat.go
+++ b/plugins/inputs/system/netstat.go
@@ -13,7 +13,7 @@ type NetStats struct {
 }

 func (_ *NetStats) Description() string {
-	return "Read metrics about TCP status such as established, time wait etc and UDP sockets counts."
+	return "Read TCP metrics such as established, time wait and sockets counts."
 }

 var tcpstatSampleConfig = ""
diff --git a/plugins/inputs/trig/trig.go b/plugins/inputs/trig/trig.go
index e879f39ee..51879dfc1 100644
--- a/plugins/inputs/trig/trig.go
+++ b/plugins/inputs/trig/trig.go
@@ -13,7 +13,7 @@ type Trig struct {
 }

 var TrigConfig = `
-  # Set the amplitude
+  ### Set the amplitude
   amplitude = 10.0
 `
@@ -42,5 +42,5 @@ func (s *Trig) Gather(acc telegraf.Accumulator) error {
 }

 func init() {
-	inputs.Add("Trig", func() telegraf.Input { return &Trig{x: 0.0} })
+	inputs.Add("trig", func() telegraf.Input { return &Trig{x: 0.0} })
 }
diff --git a/plugins/inputs/twemproxy/twemproxy.go b/plugins/inputs/twemproxy/twemproxy.go
index 8d8349edb..d5ae12dee 100644
--- a/plugins/inputs/twemproxy/twemproxy.go
+++ b/plugins/inputs/twemproxy/twemproxy.go
@@ -17,9 +17,9 @@ type Twemproxy struct {
 }

 var sampleConfig = `
-  # Twemproxy stats address and port (no scheme)
+  ### Twemproxy stats address and port (no scheme)
   addr = "localhost:22222"
-  # Monitor pool name
+  ### Monitor pool names
   pools = ["redis_pool", "mc_pool"]
 `
diff --git a/plugins/inputs/win_perf_counters/win_perf_counters.go b/plugins/inputs/win_perf_counters/win_perf_counters.go
index cf3ff8af6..e243588a6 100644
--- a/plugins/inputs/win_perf_counters/win_perf_counters.go
+++ b/plugins/inputs/win_perf_counters/win_perf_counters.go
@@ -14,12 +14,12 @@ import (
 )

 var sampleConfig string = `
-  # By default this plugin returns basic CPU and Disk statistics.
-  # See the README file for more examples.
-  # Uncomment examples below or write your own as you see fit. If the system
-  # being polled for data does not have the Object at startup of the Telegraf
-  # agent, it will not be gathered.
-  # Settings:
+  ### By default this plugin returns basic CPU and Disk statistics.
+  ### See the README file for more examples.
+  ### Uncomment examples below or write your own as you see fit. If the system
+  ### being polled for data does not have the Object at startup of the Telegraf
+  ### agent, it will not be gathered.
+  ### Settings:
   # PrintValid = false # Print All matching performance counters

   [[inputs.win_perf_counters.object]]
diff --git a/plugins/inputs/zfs/zfs.go b/plugins/inputs/zfs/zfs.go
index b6075a56f..57d1fece4 100644
--- a/plugins/inputs/zfs/zfs.go
+++ b/plugins/inputs/zfs/zfs.go
@@ -23,16 +23,16 @@ type poolInfo struct {
 }

 var sampleConfig = `
-  # ZFS kstat path
-  # If not specified, then default is:
-  # kstatPath = "/proc/spl/kstat/zfs"
-  #
-  # By default, telegraf gather all zfs stats
-  # If not specified, then default is:
-  # kstatMetrics = ["arcstats", "zfetchstats", "vdev_cache_stats"]
-  #
-  # By default, don't gather zpool stats
-  # poolMetrics = false
+  ### ZFS kstat path
+  ### If not specified, then the default is:
+  kstatPath = "/proc/spl/kstat/zfs"
+
+  ### By default, telegraf gathers all zfs stats
+  ### If not specified, then the default is:
+  kstatMetrics = ["arcstats", "zfetchstats", "vdev_cache_stats"]
+
+  ### By default, don't gather zpool stats
+  poolMetrics = false
 `

 func (z *Zfs) SampleConfig() string {
diff --git a/plugins/inputs/zookeeper/zookeeper.go b/plugins/inputs/zookeeper/zookeeper.go
index bd964d4cc..b18757cd6 100644
--- a/plugins/inputs/zookeeper/zookeeper.go
+++ b/plugins/inputs/zookeeper/zookeeper.go
@@ -20,11 +20,11 @@ type Zookeeper struct {
 }

 var sampleConfig = `
-  # An array of address to gather stats about. Specify an ip or hostname
-  # with port. ie localhost:2181, 10.0.0.1:2181, etc.
+  ### An array of addresses to gather stats about. Specify an ip or hostname
+  ### with port, e.g. localhost:2181, 10.0.0.1:2181, etc.
-  # If no servers are specified, then localhost is used as the host.
-  # If no port is specified, 2181 is used
+  ### If no servers are specified, then localhost is used as the host.
+  ### If no port is specified, 2181 is used
   servers = [":2181"]
 `
diff --git a/plugins/outputs/graphite/graphite_test.go b/plugins/outputs/graphite/graphite_test.go
index 4d8c9f353..2b62750e3 100644
--- a/plugins/outputs/graphite/graphite_test.go
+++ b/plugins/outputs/graphite/graphite_test.go
@@ -40,6 +40,10 @@ func TestGraphiteError(t *testing.T) {
 func TestGraphiteOK(t *testing.T) {
 	var wg sync.WaitGroup

+	// Start TCP server
+	wg.Add(1)
+	go TCPServer(t, &wg)
+
 	// Init plugin
 	g := Graphite{
 		Prefix: "my.prefix",
@@ -63,24 +67,15 @@ func TestGraphiteOK(t *testing.T) {
 		map[string]interface{}{"value": float64(3.14)},
 		time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
 	)
+
 	// Prepare point list
-	var metrics []telegraf.Metric
-	metrics = append(metrics, m1)
-	metrics = append(metrics, m2)
-	metrics = append(metrics, m3)
-	// Start TCP server
-	wg.Add(1)
-	go TCPServer(t, &wg)
-	wg.Wait()
-	// Connect
-	wg.Add(1)
+	metrics := []telegraf.Metric{m1, m2, m3}
 	err1 := g.Connect()
-	wg.Wait()
 	require.NoError(t, err1)
 	// Send Data
 	err2 := g.Write(metrics)
 	require.NoError(t, err2)
-	wg.Add(1)
+
 	// Wait for the TCP server to finish reading
 	wg.Wait()
 	g.Close()
@@ -88,9 +83,8 @@ func TestGraphiteOK(t *testing.T) {

 func TCPServer(t *testing.T, wg *sync.WaitGroup) {
 	tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003")
-	wg.Done()
+	defer wg.Done()
 	conn, _ := tcpServer.Accept()
-	wg.Done()
 	reader := bufio.NewReader(conn)
 	tp := textproto.NewReader(reader)
 	data1, _ := tp.ReadLine()
@@ -100,7 +94,6 @@ func TCPServer(t *testing.T, wg *sync.WaitGroup) {
 	data3, _ := tp.ReadLine()
 	assert.Equal(t, "my.prefix.192_168_0_1.my_measurement.value 3.14 1289430000", data3)
 	conn.Close()
-	wg.Done()
 }

 func TestGraphiteTags(t *testing.T) {
diff --git a/plugins/outputs/influxdb/influxdb.go b/plugins/outputs/influxdb/influxdb.go
index a94c898d0..c11484a48 100644
--- a/plugins/outputs/influxdb/influxdb.go
+++ b/plugins/outputs/influxdb/influxdb.go
@@ -33,14 +33,14 @@ type InfluxDB struct {

 var sampleConfig = `
   ### The full HTTP or UDP endpoint URL for your InfluxDB instance.
-  ### Multiple urls can be specified but it is assumed that they are part of the same
-  ### cluster, this means that only ONE of the urls will be written to each interval.
+  ### Multiple urls can be specified for the same cluster;
+  ### only ONE of the urls will be written to in each interval.
   # urls = ["udp://localhost:8089"] # UDP endpoint example
   urls = ["http://localhost:8086"] # required
   ### The target database for metrics (telegraf will create it if not exists)
   database = "telegraf" # required
   ### Precision of writes, valid values are n, u, ms, s, m, and h
-  ### note: using second precision greatly helps InfluxDB compression
+  ### note: using "s" precision greatly improves InfluxDB compression
   precision = "s"

   ### Connection timeout (for the connection with InfluxDB), formatted as a string.
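For context, the sample config above implies an output section like the following minimal sketch; the two urls are illustrative hosts for a single cluster, not shipped defaults:

```toml
[[outputs.influxdb]]
  ### Only ONE of these urls is written to in each flush interval.
  urls = ["http://influx-a:8086", "http://influx-b:8086"]
  ### The target database; telegraf creates it if it does not exist.
  database = "telegraf" # required
  ### "s" precision greatly improves InfluxDB compression.
  precision = "s"
```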
diff --git a/plugins/outputs/riemann/riemann.go b/plugins/outputs/riemann/riemann.go
index 326bb5705..d20441391 100644
--- a/plugins/outputs/riemann/riemann.go
+++ b/plugins/outputs/riemann/riemann.go
@@ -4,6 +4,8 @@ import (
 	"errors"
 	"fmt"
 	"os"
+	"sort"
+	"strings"

 	"github.com/amir/raidman"
 	"github.com/influxdata/telegraf"
@@ -13,6 +15,7 @@ import (
 type Riemann struct {
 	URL       string
 	Transport string
+	Separator string

 	client *raidman.Client
 }
@@ -22,6 +25,8 @@ var sampleConfig = `
   url = "localhost:5555"
   ### transport protocol to use either tcp or udp
   transport = "tcp"
+  ### separator to use between input name and field name in Riemann service name
+  separator = " "
 `

 func (r *Riemann) Connect() error {
@@ -55,7 +60,7 @@ func (r *Riemann) Write(metrics []telegraf.Metric) error {

 	var events []*raidman.Event
 	for _, p := range metrics {
-		evs := buildEvents(p)
+		evs := buildEvents(p, r.Separator)
 		for _, ev := range evs {
 			events = append(events, ev)
 		}
@@ -70,7 +75,7 @@ func (r *Riemann) Write(metrics []telegraf.Metric) error {
 	return nil
 }

-func buildEvents(p telegraf.Metric) []*raidman.Event {
+func buildEvents(p telegraf.Metric, s string) []*raidman.Event {
 	events := []*raidman.Event{}
 	for fieldName, value := range p.Fields() {
 		host, ok := p.Tags()["host"]
@@ -85,15 +90,48 @@ func buildEvents(p telegraf.Metric) []*raidman.Event {

 		event := &raidman.Event{
 			Host:    host,
-			Service: p.Name() + "_" + fieldName,
-			Metric:  value,
+			Service: serviceName(s, p.Name(), p.Tags(), fieldName),
 		}
+
+		// String field values become the Riemann event state;
+		// everything else is sent as the metric value.
+		switch value := value.(type) {
+		case string:
+			event.State = value
+		default:
+			event.Metric = value
+		}
+
 		events = append(events, event)
 	}

 	return events
 }

+func serviceName(s string, n string, t map[string]string, f string) string {
+	serviceStrings := []string{}
+	serviceStrings = append(serviceStrings, n)
+
+	// we'll skip the 'host' tag
+	tagStrings := []string{}
+	tagNames := []string{}
+
+	for tagName := range t {
+		tagNames = append(tagNames, tagName)
+	}
+	sort.Strings(tagNames)
+
+	for _, tagName := range tagNames {
+		if tagName != "host" {
+			tagStrings = append(tagStrings, t[tagName])
+		}
+	}
+	tagString := strings.Join(tagStrings, s)
+	if tagString != "" {
+		serviceStrings = append(serviceStrings, tagString)
+	}
+	serviceStrings = append(serviceStrings, f)
+	return strings.Join(serviceStrings, s)
+}
+
 func init() {
 	outputs.Add("riemann", func() telegraf.Output {
 		return &Riemann{}
 	})
 }
diff --git a/plugins/parsers/graphite/config.go b/plugins/parsers/graphite/config.go
new file mode 100644
index 000000000..7a5c759e7
--- /dev/null
+++ b/plugins/parsers/graphite/config.go
@@ -0,0 +1,135 @@
+package graphite
+
+import (
+	"fmt"
+	"strings"
+)
+
+const (
+	// DefaultSeparator is the default join character to use when joining multiple
+	// measurement parts in a template.
+	DefaultSeparator = "."
+)
+
+// Config represents the configuration for Graphite endpoints.
+type Config struct {
+	Separator string
+	Templates []string
+}
+
+// Validate validates the config's templates and tags.
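+// A template line has up to three space-separated parts:
+//
+//   [measurement filter] <template> [tag1=value1,tag2=value2]
+//
+// e.g. "cpu.* measurement.measurement.field region=us-west" (an illustrative
+// example, not a shipped default).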
+func (c *Config) Validate() error {
+	if err := c.validateTemplates(); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (c *Config) validateTemplates() error {
+	// map to keep track of filters we see
+	filters := map[string]struct{}{}
+
+	for i, t := range c.Templates {
+		parts := strings.Fields(t)
+		// Ensure template string is non-empty
+		if len(parts) == 0 {
+			return fmt.Errorf("missing template at position: %d", i)
+		}
+		if len(parts) == 1 && parts[0] == "" {
+			return fmt.Errorf("missing template at position: %d", i)
+		}
+
+		if len(parts) > 3 {
+			return fmt.Errorf("invalid template format: '%s'", t)
+		}
+
+		template := t
+		filter := ""
+		tags := ""
+		if len(parts) >= 2 {
+			// We could have