diff --git a/CHANGELOG.md b/CHANGELOG.md index d45e57168..17444a34b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,23 @@ +## v1.1 [unreleased] + +### Release Notes + +### Features + +- [#1606](https://github.com/influxdata/telegraf/pull/1606): Remove carraige returns from exec plugin output on Windows +- [#1674](https://github.com/influxdata/telegraf/issues/1674): elasticsearch input: configurable timeout. +- [#1607](https://github.com/influxdata/telegraf/pull/1607): Massage metric names in Instrumental output plugin +- [#1572](https://github.com/influxdata/telegraf/pull/1572): mesos improvements. +- [#1513](https://github.com/influxdata/telegraf/issues/1513): Add Ceph Cluster Performance Statistics +- [#1650](https://github.com/influxdata/telegraf/issues/1650): Ability to configure response_timeout in httpjson input. +- [#1685](https://github.com/influxdata/telegraf/issues/1685): Add additional redis metrics. +- [#1539](https://github.com/influxdata/telegraf/pull/1539): Added capability to send metrics through Http API for OpenTSDB. +- [#1471](https://github.com/influxdata/telegraf/pull/1471): iptables input plugin. +- [#1542](https://github.com/influxdata/telegraf/pull/1542): Add filestack webhook plugin. + +### Bugfixes + + ## v1.0 [unreleased] ### Release Notes @@ -10,10 +30,6 @@ plugin, you will need to change your config file from `[[inputs.snmp]]` to `[[inputs.snmp_legacy]]`. The configuration of the new SNMP plugin is _not_ backwards-compatible. -- Telegraf now supports being installed as an official windows service, -which can be installed via -`> C:\Program Files\Telegraf\telegraf.exe --service install` - **Breaking Change**: Aerospike main server node measurements have been renamed aerospike_node. Aerospike namespace measurements have been renamed to aerospike_namespace. They will also now be tagged with the node_name @@ -44,6 +60,10 @@ should now look like: path = "/" ``` +- Telegraf now supports being installed as an official windows service, +which can be installed via +`> C:\Program Files\Telegraf\telegraf.exe --service install` + - `flush_jitter` behavior has been changed. The random jitter will now be evaluated at every flush interval, rather than once at startup. This makes it consistent with the behavior of `collection_jitter`. @@ -90,6 +110,7 @@ consistent with the behavior of `collection_jitter`. - [#1213](https://github.com/influxdata/telegraf/issues/1213): Add inactive & active memory to mem plugin. - [#1543](https://github.com/influxdata/telegraf/pull/1543): Official Windows service. - [#1414](https://github.com/influxdata/telegraf/pull/1414): Forking sensors command to remove C package dependency. +- [#1389](https://github.com/influxdata/telegraf/pull/1389): Add a new SNMP plugin. ### Bugfixes diff --git a/Godeps b/Godeps index 3c70bcaf8..fc94b59c0 100644 --- a/Godeps +++ b/Godeps @@ -1,6 +1,6 @@ github.com/Shopify/sarama 8aadb476e66ca998f2f6bb3c993e9a2daa3666b9 github.com/Sirupsen/logrus 219c8cb75c258c552e999735be6df753ffc7afdc -github.com/aerospike/aerospike-client-go 45863b7fd8640dc12f7fdd397104d97e1986f25a +github.com/aerospike/aerospike-client-go 7f3a312c3b2a60ac083ec6da296091c52c795c63 github.com/amir/raidman 53c1b967405155bfc8758557863bf2e14f814687 github.com/aws/aws-sdk-go 13a12060f716145019378a10e2806c174356b857 github.com/beorn7/perks 3ac7bf7a47d159a033b107610db8a1b6575507a4 @@ -47,8 +47,7 @@ github.com/prometheus/common e8eabff8812b05acf522b45fdcd725a785188e37 github.com/prometheus/procfs 406e5b7bfd8201a36e2bb5f7bdae0b03380c2ce8 github.com/samuel/go-zookeeper 218e9c81c0dd8b3b18172b2bbfad92cc7d6db55f github.com/shirou/gopsutil 4d0c402af66c78735c5ccf820dc2ca7de5e4ff08 -github.com/soniah/gosnmp b1b4f885b12c5dcbd021c5cee1c904110de6db7d -github.com/sparrc/aerospike-client-go d4bb42d2c2d39dae68e054116f4538af189e05d5 +github.com/soniah/gosnmp eb32571c2410868d85849ad67d1e51d01273eb84 github.com/streadway/amqp b4f3ceab0337f013208d31348b578d83c0064744 github.com/stretchr/testify 1f4a1643a57e798696635ea4c126e9127adb7d3c github.com/vjeantet/grok 83bfdfdfd1a8146795b28e547a8e3c8b28a466c2 diff --git a/Makefile b/Makefile index 2951e175a..10ddbef6b 100644 --- a/Makefile +++ b/Makefile @@ -42,6 +42,7 @@ prepare-windows: # Run all docker containers necessary for unit tests docker-run: + docker run --name aerospike -p "3000:3000" -d aerospike/aerospike-server:3.9.0 docker run --name kafka \ -e ADVERTISED_HOST=localhost \ -e ADVERTISED_PORT=9092 \ @@ -52,29 +53,26 @@ docker-run: docker run --name postgres -p "5432:5432" -d postgres docker run --name rabbitmq -p "15672:15672" -p "5672:5672" -d rabbitmq:3-management docker run --name redis -p "6379:6379" -d redis - docker run --name aerospike -p "3000:3000" -d aerospike/aerospike-server docker run --name nsq -p "4150:4150" -d nsqio/nsq /nsqd docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt docker run --name riemann -p "5555:5555" -d blalor/riemann - docker run --name snmp -p "31161:31161/udp" -d titilambert/snmpsim # Run docker containers necessary for CircleCI unit tests docker-run-circle: + docker run --name aerospike -p "3000:3000" -d aerospike/aerospike-server:3.9.0 docker run --name kafka \ -e ADVERTISED_HOST=localhost \ -e ADVERTISED_PORT=9092 \ -p "2181:2181" -p "9092:9092" \ -d spotify/kafka - docker run --name aerospike -p "3000:3000" -d aerospike/aerospike-server docker run --name nsq -p "4150:4150" -d nsqio/nsq /nsqd docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt docker run --name riemann -p "5555:5555" -d blalor/riemann - docker run --name snmp -p "31161:31161/udp" -d titilambert/snmpsim # Kill all docker containers, ignore errors docker-kill: - -docker kill nsq aerospike redis rabbitmq postgres memcached mysql kafka mqtt riemann snmp - -docker rm nsq aerospike redis rabbitmq postgres memcached mysql kafka mqtt riemann snmp + -docker kill nsq aerospike redis rabbitmq postgres memcached mysql kafka mqtt riemann + -docker rm nsq aerospike redis rabbitmq postgres memcached mysql kafka mqtt riemann # Run full unit tests using docker containers (includes setup and teardown) test: vet docker-kill docker-run diff --git a/README.md b/README.md index 5fbf1a1d7..68a34d283 100644 --- a/README.md +++ b/README.md @@ -20,12 +20,12 @@ new plugins. ### Linux deb and rpm Packages: Latest: -* https://dl.influxdata.com/telegraf/releases/telegraf_1.0.0-beta3_amd64.deb -* https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0_beta3.x86_64.rpm +* https://dl.influxdata.com/telegraf/releases/telegraf_1.0.0-rc1_amd64.deb +* https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0_rc1.x86_64.rpm Latest (arm): -* https://dl.influxdata.com/telegraf/releases/telegraf_1.0.0-beta3_armhf.deb -* https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0_beta3.armhf.rpm +* https://dl.influxdata.com/telegraf/releases/telegraf_1.0.0-rc1_armhf.deb +* https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0_rc1.armhf.rpm ##### Package Instructions: @@ -46,14 +46,14 @@ to use this repo to install & update telegraf. ### Linux tarballs: Latest: -* https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0-beta3_linux_amd64.tar.gz -* https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0-beta3_linux_i386.tar.gz -* https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0-beta3_linux_armhf.tar.gz +* https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0-rc1_linux_amd64.tar.gz +* https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0-rc1_linux_i386.tar.gz +* https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0-rc1_linux_armhf.tar.gz ### FreeBSD tarball: Latest: -* https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0-beta3_freebsd_amd64.tar.gz +* https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0-rc1_freebsd_amd64.tar.gz ### Ansible Role: @@ -69,7 +69,7 @@ brew install telegraf ### Windows Binaries (EXPERIMENTAL) Latest: -* https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0-beta3_windows_amd64.zip +* https://dl.influxdata.com/telegraf/releases/telegraf-1.0.0-rc1_windows_amd64.zip ### From Source: @@ -161,6 +161,7 @@ Currently implemented sources: * [httpjson](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/httpjson) (generic JSON-emitting http service plugin) * [influxdb](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/influxdb) * [ipmi_sensor](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/ipmi_sensor) +* [iptables](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/iptables) * [jolokia](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/jolokia) * [leofs](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/leofs) * [lustre2](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/lustre2) @@ -190,6 +191,7 @@ Currently implemented sources: * [riak](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/riak) * [sensors](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/sensors) * [snmp](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/snmp) +* [snmp_legacy](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/snmp_legacy) * [sql server](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/sqlserver) (microsoft) * [twemproxy](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/twemproxy) * [varnish](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/varnish) @@ -219,6 +221,7 @@ Telegraf can also collect metrics via the following service plugins: * [kafka_consumer](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/kafka_consumer) * [nats_consumer](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/nats_consumer) * [webhooks](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/webhooks) + * [filestack](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/webhooks/filestack) * [github](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/webhooks/github) * [mandrill](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/webhooks/mandrill) * [rollbar](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/webhooks/rollbar) diff --git a/docs/WINDOWS_SERVICE.md b/docs/WINDOWS_SERVICE.md index 0ef218350..66046cdbb 100644 --- a/docs/WINDOWS_SERVICE.md +++ b/docs/WINDOWS_SERVICE.md @@ -6,19 +6,18 @@ the general steps to set it up. 1. Obtain the telegraf windows distribution 2. Create the directory `C:\Program Files\Telegraf` (if you install in a different location simply specify the `-config` parameter with the desired location) -3. Place the telegraf.exe and the config file into `C:\Program Files\Telegraf` -4. To install the service into the Windows Service Manager, run (as an - administrator): +3. Place the telegraf.exe and the telegraf.conf config file into `C:\Program Files\Telegraf` +4. To install the service into the Windows Service Manager, run the following in PowerShell as an administrator (If necessary, you can wrap any spaces in the file paths in double quotes ""): ``` - > C:\Program Files\Telegraf\telegraf.exe --service install + > C:\"Program Files"\Telegraf\telegraf.exe --service install ``` 5. Edit the configuration file to meet your needs 6. To check that it works, run: ``` - > C:\Program Files\Telegraf\telegraf.exe --config C:\Program Files\Telegraf\telegraf.conf --test + > C:\"Program Files"\Telegraf\telegraf.exe --config C:\"Program Files"\Telegraf\telegraf.conf --test ``` 7. To start collecting data, run: diff --git a/etc/telegraf.conf b/etc/telegraf.conf index 902c7f7fb..f6e9b2ffe 100644 --- a/etc/telegraf.conf +++ b/etc/telegraf.conf @@ -511,6 +511,10 @@ # # Collects performance metrics from the MON and OSD nodes in a Ceph storage cluster. # [[inputs.ceph]] +# ## This is the recommended interval to poll. Too frequent and you will lose +# ## data points due to timeouts during rebalancing and recovery +# interval = '1m' +# # ## All configuration values are optional, defaults are shown below # # ## location of ceph binary @@ -525,6 +529,26 @@ # # ## suffix used to identify socket files # socket_suffix = "asok" +# +# ## Ceph user to authenticate as, ceph will search for the corresponding keyring +# ## e.g. client.admin.keyring in /etc/ceph, or the explicit path defined in the +# ## client section of ceph.conf for example: +# ## +# ## [client.telegraf] +# ## keyring = /etc/ceph/client.telegraf.keyring +# ## +# ## Consult the ceph documentation for more detail on keyring generation. +# ceph_user = "client.admin" +# +# ## Ceph configuration to use to locate the cluster +# ceph_config = "/etc/ceph/ceph.conf" +# +# ## Whether to gather statistics via the admin socket +# gather_admin_socket_stats = true +# +# ## Whether to gather statistics via ceph commands, requires ceph_user and ceph_config +# ## to be specified +# gather_cluster_stats = true # # Read specific statistics per cgroup @@ -886,6 +910,18 @@ # ## # servers = ["USERID:PASSW0RD@lan(192.168.1.1)"] +# # Gather packets and bytes throughput from iptables +# [[inputs.iptables]] +# ## iptables require root access on most systems. +# ## Setting 'use_sudo' to true will make use of sudo to run iptables. +# ## Users must configure sudo to allow telegraf user to run iptables. +# ## iptables can be restricted to only use list command "iptables -nvL" +# use_sudo = false +# ## define the table to monitor: +# table = "filter" +# ## Defines the chains to monitor: +# chains = [ "INPUT" ] + # # Read JMX metrics through Jolokia # [[inputs.jolokia]] diff --git a/plugins/inputs/aerospike/aerospike.go b/plugins/inputs/aerospike/aerospike.go index eb608723e..477772774 100644 --- a/plugins/inputs/aerospike/aerospike.go +++ b/plugins/inputs/aerospike/aerospike.go @@ -11,7 +11,7 @@ import ( "github.com/influxdata/telegraf/internal/errchan" "github.com/influxdata/telegraf/plugins/inputs" - as "github.com/sparrc/aerospike-client-go" + as "github.com/aerospike/aerospike-client-go" ) type Aerospike struct { diff --git a/plugins/inputs/aerospike/aerospike_test.go b/plugins/inputs/aerospike/aerospike_test.go index 8463432f5..5f80b5196 100644 --- a/plugins/inputs/aerospike/aerospike_test.go +++ b/plugins/inputs/aerospike/aerospike_test.go @@ -10,7 +10,7 @@ import ( func TestAerospikeStatistics(t *testing.T) { if testing.Short() { - t.Skip("Skipping integration test in short mode") + t.Skip("Skipping aerospike integration tests.") } a := &Aerospike{ @@ -29,7 +29,7 @@ func TestAerospikeStatistics(t *testing.T) { func TestAerospikeStatisticsPartialErr(t *testing.T) { if testing.Short() { - t.Skip("Skipping integration test in short mode") + t.Skip("Skipping aerospike integration tests.") } a := &Aerospike{ diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index af759aac8..96fbdffe1 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -27,6 +27,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/httpjson" _ "github.com/influxdata/telegraf/plugins/inputs/influxdb" _ "github.com/influxdata/telegraf/plugins/inputs/ipmi_sensor" + _ "github.com/influxdata/telegraf/plugins/inputs/iptables" _ "github.com/influxdata/telegraf/plugins/inputs/jolokia" _ "github.com/influxdata/telegraf/plugins/inputs/kafka_consumer" _ "github.com/influxdata/telegraf/plugins/inputs/leofs" @@ -60,6 +61,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/rethinkdb" _ "github.com/influxdata/telegraf/plugins/inputs/riak" _ "github.com/influxdata/telegraf/plugins/inputs/sensors" + _ "github.com/influxdata/telegraf/plugins/inputs/snmp" _ "github.com/influxdata/telegraf/plugins/inputs/snmp_legacy" _ "github.com/influxdata/telegraf/plugins/inputs/sqlserver" _ "github.com/influxdata/telegraf/plugins/inputs/statsd" diff --git a/plugins/inputs/ceph/README.md b/plugins/inputs/ceph/README.md index ab358daaa..49ae09e73 100644 --- a/plugins/inputs/ceph/README.md +++ b/plugins/inputs/ceph/README.md @@ -2,7 +2,9 @@ Collects performance metrics from the MON and OSD nodes in a Ceph storage cluster. -The plugin works by scanning the configured SocketDir for OSD and MON socket files. When it finds +*Admin Socket Stats* + +This gatherer works by scanning the configured SocketDir for OSD and MON socket files. When it finds a MON socket, it runs **ceph --admin-daemon $file perfcounters_dump**. For OSDs it runs **ceph --admin-daemon $file perf dump** The resulting JSON is parsed and grouped into collections, based on top-level key. Top-level keys are @@ -27,11 +29,26 @@ Would be parsed into the following metrics, all of which would be tagged with co - refresh_latency.sum: 5378.794002000 +*Cluster Stats* + +This gatherer works by invoking ceph commands against the cluster thus only requires the ceph client, valid +ceph configuration and an access key to function (the ceph_config and ceph_user configuration variables work +in conjunction to specify these prerequisites). It may be run on any server you wish which has access to +the cluster. The currently supported commands are: + +* ceph status +* ceph df +* ceph osd pool stats + ### Configuration: ``` # Collects performance metrics from the MON and OSD nodes in a Ceph storage cluster. [[inputs.ceph]] + ## This is the recommended interval to poll. Too frequent and you will lose + ## data points due to timeouts during rebalancing and recovery + interval = '1m' + ## All configuration values are optional, defaults are shown below ## location of ceph binary @@ -46,15 +63,86 @@ Would be parsed into the following metrics, all of which would be tagged with co ## suffix used to identify socket files socket_suffix = "asok" + + ## Ceph user to authenticate as, ceph will search for the corresponding keyring + ## e.g. client.admin.keyring in /etc/ceph, or the explicit path defined in the + ## client section of ceph.conf for example: + ## + ## [client.telegraf] + ## keyring = /etc/ceph/client.telegraf.keyring + ## + ## Consult the ceph documentation for more detail on keyring generation. + ceph_user = "client.admin" + + ## Ceph configuration to use to locate the cluster + ceph_config = "/etc/ceph/ceph.conf" + + ## Whether to gather statistics via the admin socket + gather_admin_socket_stats = true + + ## Whether to gather statistics via ceph commands, requires ceph_user and ceph_config + ## to be specified + gather_cluster_stats = true ``` ### Measurements & Fields: +*Admin Socket Stats* + All fields are collected under the **ceph** measurement and stored as float64s. For a full list of fields, see the sample perf dumps in ceph_test.go. +*Cluster Stats* + +* ceph\_osdmap + * epoch (float) + * full (boolean) + * nearfull (boolean) + * num\_in\_osds (float) + * num\_osds (float) + * num\_remremapped\_pgs (float) + * num\_up\_osds (float) + +* ceph\_pgmap + * bytes\_avail (float) + * bytes\_total (float) + * bytes\_used (float) + * data\_bytes (float) + * num\_pgs (float) + * op\_per\_sec (float) + * read\_bytes\_sec (float) + * version (float) + * write\_bytes\_sec (float) + * recovering\_bytes\_per\_sec (float) + * recovering\_keys\_per\_sec (float) + * recovering\_objects\_per\_sec (float) + +* ceph\_pgmap\_state + * state name e.g. active+clean (float) + +* ceph\_usage + * bytes\_used (float) + * kb\_used (float) + * max\_avail (float) + * objects (float) + +* ceph\_pool\_usage + * bytes\_used (float) + * kb\_used (float) + * max\_avail (float) + * objects (float) + +* ceph\_pool\_stats + * op\_per\_sec (float) + * read\_bytes\_sec (float) + * write\_bytes\_sec (float) + * recovering\_object\_per\_sec (float) + * recovering\_bytes\_per\_sec (float) + * recovering\_keys\_per\_sec (float) ### Tags: +*Admin Socket Stats* + All measurements will have the following tags: - type: either 'osd' or 'mon' to indicate which type of node was queried @@ -96,9 +184,21 @@ All measurements will have the following tags: - throttle-osd_client_bytes - throttle-osd_client_messages +*Cluster Stats* + +* ceph\_pg\_state has the following tags: + * state (state for which the value applies e.g. active+clean, active+remapped+backfill) +* ceph\_pool\_usage has the following tags: + * id + * name +* ceph\_pool\_stats has the following tags: + * id + * name ### Example Output: +*Admin Socket Stats* +
telegraf -test -config /etc/telegraf/telegraf.conf -config-directory /etc/telegraf/telegraf.d -input-filter ceph * Plugin: ceph, Collection 1 @@ -107,3 +207,16 @@ telegraf -test -config /etc/telegraf/telegraf.conf -config-directory /etc/telegr > ceph,collection=throttle-mon_daemon_bytes,id=node-2,type=mon get=4058121,get_or_fail_fail=0,get_or_fail_success=0,get_sum=6027348117,max=419430400,put=4058121,put_sum=6027348117,take=0,take_sum=0,val=0,wait.avgcount=0,wait.sum=0 1462821234814815661 > ceph,collection=throttle-msgr_dispatch_throttler-mon,id=node-2,type=mon get=54276277,get_or_fail_fail=0,get_or_fail_success=0,get_sum=370232877040,max=104857600,put=54276277,put_sum=370232877040,take=0,take_sum=0,val=0,wait.avgcount=0,wait.sum=0 1462821234814872064+ +*Cluster Stats* + +
+> ceph_osdmap,host=ceph-mon-0 epoch=170772,full=false,nearfull=false,num_in_osds=340,num_osds=340,num_remapped_pgs=0,num_up_osds=340 1468841037000000000 +> ceph_pgmap,host=ceph-mon-0 bytes_avail=634895531270144,bytes_total=812117151809536,bytes_used=177221620539392,data_bytes=56979991615058,num_pgs=22952,op_per_sec=15869,read_bytes_sec=43956026,version=39387592,write_bytes_sec=165344818 1468841037000000000 +> ceph_pgmap_state,host=ceph-mon-0 active+clean=22952 1468928660000000000 +> ceph_usage,host=ceph-mon-0 total_avail_bytes=634895514791936,total_bytes=812117151809536,total_used_bytes=177221637017600 1468841037000000000 +> ceph_pool_usage,host=ceph-mon-0,id=150,name=cinder.volumes bytes_used=12648553794802,kb_used=12352103316,max_avail=154342562489244,objects=3026295 1468841037000000000 +> ceph_pool_usage,host=ceph-mon-0,id=182,name=cinder.volumes.flash bytes_used=8541308223964,kb_used=8341121313,max_avail=39388593563936,objects=2075066 1468841037000000000 +> ceph_pool_stats,host=ceph-mon-0,id=150,name=cinder.volumes op_per_sec=1706,read_bytes_sec=28671674,write_bytes_sec=29994541 1468841037000000000 +> ceph_pool_stats,host=ceph-mon-0,id=182,name=cinder.volumes.flash op_per_sec=9748,read_bytes_sec=9605524,write_bytes_sec=45593310 1468841037000000000 +diff --git a/plugins/inputs/ceph/ceph.go b/plugins/inputs/ceph/ceph.go index d8ebf5017..d5ed464fa 100644 --- a/plugins/inputs/ceph/ceph.go +++ b/plugins/inputs/ceph/ceph.go @@ -23,33 +23,15 @@ const ( ) type Ceph struct { - CephBinary string - OsdPrefix string - MonPrefix string - SocketDir string - SocketSuffix string -} - -func (c *Ceph) setDefaults() { - if c.CephBinary == "" { - c.CephBinary = "/usr/bin/ceph" - } - - if c.OsdPrefix == "" { - c.OsdPrefix = osdPrefix - } - - if c.MonPrefix == "" { - c.MonPrefix = monPrefix - } - - if c.SocketDir == "" { - c.SocketDir = "/var/run/ceph" - } - - if c.SocketSuffix == "" { - c.SocketSuffix = sockSuffix - } + CephBinary string + OsdPrefix string + MonPrefix string + SocketDir string + SocketSuffix string + CephUser string + CephConfig string + GatherAdminSocketStats bool + GatherClusterStats bool } func (c *Ceph) Description() string { @@ -57,6 +39,10 @@ func (c *Ceph) Description() string { } var sampleConfig = ` + ## This is the recommended interval to poll. Too frequent and you will lose + ## data points due to timeouts during rebalancing and recovery + interval = '1m' + ## All configuration values are optional, defaults are shown below ## location of ceph binary @@ -71,6 +57,18 @@ var sampleConfig = ` ## suffix used to identify socket files socket_suffix = "asok" + + ## Ceph user to authenticate as + ceph_user = "client.admin" + + ## Ceph configuration to use to locate the cluster + ceph_config = "/etc/ceph/ceph.conf" + + ## Whether to gather statistics via the admin socket + gather_admin_socket_stats = true + + ## Whether to gather statistics via ceph commands + gather_cluster_stats = true ` func (c *Ceph) SampleConfig() string { @@ -78,7 +76,22 @@ func (c *Ceph) SampleConfig() string { } func (c *Ceph) Gather(acc telegraf.Accumulator) error { - c.setDefaults() + if c.GatherAdminSocketStats { + if err := c.gatherAdminSocketStats(acc); err != nil { + return err + } + } + + if c.GatherClusterStats { + if err := c.gatherClusterStats(acc); err != nil { + return err + } + } + + return nil +} + +func (c *Ceph) gatherAdminSocketStats(acc telegraf.Accumulator) error { sockets, err := findSockets(c) if err != nil { return fmt.Errorf("failed to find sockets at path '%s': %v", c.SocketDir, err) @@ -104,8 +117,46 @@ func (c *Ceph) Gather(acc telegraf.Accumulator) error { return nil } +func (c *Ceph) gatherClusterStats(acc telegraf.Accumulator) error { + jobs := []struct { + command string + parser func(telegraf.Accumulator, string) error + }{ + {"status", decodeStatus}, + {"df", decodeDf}, + {"osd pool stats", decodeOsdPoolStats}, + } + + // For each job, execute against the cluster, parse and accumulate the data points + for _, job := range jobs { + output, err := c.exec(job.command) + if err != nil { + return fmt.Errorf("error executing command: %v", err) + } + err = job.parser(acc, output) + if err != nil { + return fmt.Errorf("error parsing output: %v", err) + } + } + + return nil +} + func init() { - inputs.Add(measurement, func() telegraf.Input { return &Ceph{} }) + c := Ceph{ + CephBinary: "/usr/bin/ceph", + OsdPrefix: osdPrefix, + MonPrefix: monPrefix, + SocketDir: "/var/run/ceph", + SocketSuffix: sockSuffix, + CephUser: "client.admin", + CephConfig: "/etc/ceph/ceph.conf", + GatherAdminSocketStats: true, + GatherClusterStats: false, + } + + inputs.Add(measurement, func() telegraf.Input { return &c }) + } var perfDump = func(binary string, socket *socket) (string, error) { @@ -247,3 +298,192 @@ func flatten(data interface{}) []*metric { return metrics } + +func (c *Ceph) exec(command string) (string, error) { + cmdArgs := []string{"--conf", c.CephConfig, "--name", c.CephUser, "--format", "json"} + cmdArgs = append(cmdArgs, strings.Split(command, " ")...) + + cmd := exec.Command(c.CephBinary, cmdArgs...) + + var out bytes.Buffer + cmd.Stdout = &out + err := cmd.Run() + if err != nil { + return "", fmt.Errorf("error running ceph %v: %s", command, err) + } + + output := out.String() + + // Ceph doesn't sanitize its output, and may return invalid JSON. Patch this + // up for them, as having some inaccurate data is better than none. + output = strings.Replace(output, "-inf", "0", -1) + output = strings.Replace(output, "inf", "0", -1) + + return output, nil +} + +func decodeStatus(acc telegraf.Accumulator, input string) error { + data := make(map[string]interface{}) + err := json.Unmarshal([]byte(input), &data) + if err != nil { + return fmt.Errorf("failed to parse json: '%s': %v", input, err) + } + + err = decodeStatusOsdmap(acc, data) + if err != nil { + return err + } + + err = decodeStatusPgmap(acc, data) + if err != nil { + return err + } + + err = decodeStatusPgmapState(acc, data) + if err != nil { + return err + } + + return nil +} + +func decodeStatusOsdmap(acc telegraf.Accumulator, data map[string]interface{}) error { + osdmap, ok := data["osdmap"].(map[string]interface{}) + if !ok { + return fmt.Errorf("WARNING %s - unable to decode osdmap", measurement) + } + fields, ok := osdmap["osdmap"].(map[string]interface{}) + if !ok { + return fmt.Errorf("WARNING %s - unable to decode osdmap", measurement) + } + acc.AddFields("ceph_osdmap", fields, map[string]string{}) + return nil +} + +func decodeStatusPgmap(acc telegraf.Accumulator, data map[string]interface{}) error { + pgmap, ok := data["pgmap"].(map[string]interface{}) + if !ok { + return fmt.Errorf("WARNING %s - unable to decode pgmap", measurement) + } + fields := make(map[string]interface{}) + for key, value := range pgmap { + switch value.(type) { + case float64: + fields[key] = value + } + } + acc.AddFields("ceph_pgmap", fields, map[string]string{}) + return nil +} + +func decodeStatusPgmapState(acc telegraf.Accumulator, data map[string]interface{}) error { + pgmap, ok := data["pgmap"].(map[string]interface{}) + if !ok { + return fmt.Errorf("WARNING %s - unable to decode pgmap", measurement) + } + fields := make(map[string]interface{}) + for key, value := range pgmap { + switch value.(type) { + case []interface{}: + if key != "pgs_by_state" { + continue + } + for _, state := range value.([]interface{}) { + state_map, ok := state.(map[string]interface{}) + if !ok { + return fmt.Errorf("WARNING %s - unable to decode pg state", measurement) + } + state_name, ok := state_map["state_name"].(string) + if !ok { + return fmt.Errorf("WARNING %s - unable to decode pg state name", measurement) + } + state_count, ok := state_map["count"].(float64) + if !ok { + return fmt.Errorf("WARNING %s - unable to decode pg state count", measurement) + } + fields[state_name] = state_count + } + } + } + acc.AddFields("ceph_pgmap_state", fields, map[string]string{}) + return nil +} + +func decodeDf(acc telegraf.Accumulator, input string) error { + data := make(map[string]interface{}) + err := json.Unmarshal([]byte(input), &data) + if err != nil { + return fmt.Errorf("failed to parse json: '%s': %v", input, err) + } + + // ceph.usage: records global utilization and number of objects + stats_fields, ok := data["stats"].(map[string]interface{}) + if !ok { + return fmt.Errorf("WARNING %s - unable to decode df stats", measurement) + } + acc.AddFields("ceph_usage", stats_fields, map[string]string{}) + + // ceph.pool.usage: records per pool utilization and number of objects + pools, ok := data["pools"].([]interface{}) + if !ok { + return fmt.Errorf("WARNING %s - unable to decode df pools", measurement) + } + + for _, pool := range pools { + pool_map, ok := pool.(map[string]interface{}) + if !ok { + return fmt.Errorf("WARNING %s - unable to decode df pool", measurement) + } + pool_name, ok := pool_map["name"].(string) + if !ok { + return fmt.Errorf("WARNING %s - unable to decode df pool name", measurement) + } + fields, ok := pool_map["stats"].(map[string]interface{}) + if !ok { + return fmt.Errorf("WARNING %s - unable to decode df pool stats", measurement) + } + tags := map[string]string{ + "name": pool_name, + } + acc.AddFields("ceph_pool_usage", fields, tags) + } + + return nil +} + +func decodeOsdPoolStats(acc telegraf.Accumulator, input string) error { + data := make([]map[string]interface{}, 0) + err := json.Unmarshal([]byte(input), &data) + if err != nil { + return fmt.Errorf("failed to parse json: '%s': %v", input, err) + } + + // ceph.pool.stats: records pre pool IO and recovery throughput + for _, pool := range data { + pool_name, ok := pool["pool_name"].(string) + if !ok { + return fmt.Errorf("WARNING %s - unable to decode osd pool stats name", measurement) + } + // Note: the 'recovery' object looks broken (in hammer), so it's omitted + objects := []string{ + "client_io_rate", + "recovery_rate", + } + fields := make(map[string]interface{}) + for _, object := range objects { + perfdata, ok := pool[object].(map[string]interface{}) + if !ok { + return fmt.Errorf("WARNING %s - unable to decode osd pool stats", measurement) + } + for key, value := range perfdata { + fields[key] = value + } + } + tags := map[string]string{ + "name": pool_name, + } + acc.AddFields("ceph_pool_stats", fields, tags) + } + + return nil +} diff --git a/plugins/inputs/ceph/ceph_test.go b/plugins/inputs/ceph/ceph_test.go index ce96943be..f7b17ece3 100644 --- a/plugins/inputs/ceph/ceph_test.go +++ b/plugins/inputs/ceph/ceph_test.go @@ -65,12 +65,17 @@ func TestFindSockets(t *testing.T) { assert.NoError(t, err) }() c := &Ceph{ - CephBinary: "foo", - SocketDir: tmpdir, + CephBinary: "foo", + OsdPrefix: "ceph-osd", + MonPrefix: "ceph-mon", + SocketDir: tmpdir, + SocketSuffix: "asok", + CephUser: "client.admin", + CephConfig: "/etc/ceph/ceph.conf", + GatherAdminSocketStats: true, + GatherClusterStats: false, } - c.setDefaults() - for _, st := range sockTestParams { createTestFiles(tmpdir, st) diff --git a/plugins/inputs/cloudwatch/README.md b/plugins/inputs/cloudwatch/README.md index df62e62bc..4430e48fd 100644 --- a/plugins/inputs/cloudwatch/README.md +++ b/plugins/inputs/cloudwatch/README.md @@ -34,6 +34,11 @@ API endpoint. In the following order the plugin will attempt to authenticate. ## Metric Statistic Namespace (required) namespace = 'AWS/ELB' + ## Maximum requests per second. Note that the global default AWS rate limit is + ## 10 reqs/sec, so if you define multiple namespaces, these should add up to a + ## maximum of 10. Optional - default value is 10. + ratelimit = 10 + ## Metrics to Pull (optional) ## Defaults to all Metrics in Namespace if nothing is provided ## Refreshes Namespace available metrics every 1h diff --git a/plugins/inputs/cloudwatch/cloudwatch.go b/plugins/inputs/cloudwatch/cloudwatch.go index f3019eb4b..ebc4147d8 100644 --- a/plugins/inputs/cloudwatch/cloudwatch.go +++ b/plugins/inputs/cloudwatch/cloudwatch.go @@ -33,6 +33,7 @@ type ( Namespace string `toml:"namespace"` Metrics []*Metric `toml:"metrics"` CacheTTL internal.Duration `toml:"cache_ttl"` + RateLimit int `toml:"ratelimit"` client cloudwatchClient metricCache *MetricCache } @@ -96,6 +97,11 @@ func (c *CloudWatch) SampleConfig() string { ## Metric Statistic Namespace (required) namespace = 'AWS/ELB' + ## Maximum requests per second. Note that the global default AWS rate limit is + ## 10 reqs/sec, so if you define multiple namespaces, these should add up to a + ## maximum of 10. Optional - default value is 10. + ratelimit = 10 + ## Metrics to Pull (optional) ## Defaults to all Metrics in Namespace if nothing is provided ## Refreshes Namespace available metrics every 1h @@ -175,7 +181,7 @@ func (c *CloudWatch) Gather(acc telegraf.Accumulator) error { // limit concurrency or we can easily exhaust user connection limit // see cloudwatch API request limits: // http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/cloudwatch_limits.html - lmtr := limiter.NewRateLimiter(10, time.Second) + lmtr := limiter.NewRateLimiter(c.RateLimit, time.Second) defer lmtr.Stop() var wg sync.WaitGroup wg.Add(len(metrics)) @@ -195,7 +201,8 @@ func init() { inputs.Add("cloudwatch", func() telegraf.Input { ttl, _ := time.ParseDuration("1hr") return &CloudWatch{ - CacheTTL: internal.Duration{Duration: ttl}, + CacheTTL: internal.Duration{Duration: ttl}, + RateLimit: 10, } }) } diff --git a/plugins/inputs/cloudwatch/cloudwatch_test.go b/plugins/inputs/cloudwatch/cloudwatch_test.go index 8f8a3ad0b..73fca9253 100644 --- a/plugins/inputs/cloudwatch/cloudwatch_test.go +++ b/plugins/inputs/cloudwatch/cloudwatch_test.go @@ -58,6 +58,7 @@ func TestGather(t *testing.T) { Namespace: "AWS/ELB", Delay: internalDuration, Period: internalDuration, + RateLimit: 10, } var acc testutil.Accumulator diff --git a/plugins/inputs/elasticsearch/README.md b/plugins/inputs/elasticsearch/README.md index 526bc3f39..2cf6f4d77 100644 --- a/plugins/inputs/elasticsearch/README.md +++ b/plugins/inputs/elasticsearch/README.md @@ -8,9 +8,18 @@ and optionally [cluster](https://www.elastic.co/guide/en/elasticsearch/reference ``` [[inputs.elasticsearch]] + ## specify a list of one or more Elasticsearch servers servers = ["http://localhost:9200"] + + ## Timeout for HTTP requests to the elastic search server(s) + http_timeout = "5s" + + ## set local to false when you want to read the indices stats from all nodes + ## within the cluster local = true - cluster_health = true + + ## set cluster_health to true when you want to also obtain cluster level stats + cluster_health = false ## Optional SSL Config # ssl_ca = "/etc/telegraf/ca.pem" diff --git a/plugins/inputs/elasticsearch/elasticsearch.go b/plugins/inputs/elasticsearch/elasticsearch.go index ef0a4c199..896e03f2e 100644 --- a/plugins/inputs/elasticsearch/elasticsearch.go +++ b/plugins/inputs/elasticsearch/elasticsearch.go @@ -62,6 +62,9 @@ const sampleConfig = ` ## specify a list of one or more Elasticsearch servers servers = ["http://localhost:9200"] + ## Timeout for HTTP requests to the elastic search server(s) + http_timeout = "5s" + ## set local to false when you want to read the indices stats from all nodes ## within the cluster local = true @@ -82,6 +85,7 @@ const sampleConfig = ` type Elasticsearch struct { Local bool Servers []string + HttpTimeout internal.Duration ClusterHealth bool SSLCA string `toml:"ssl_ca"` // Path to CA file SSLCert string `toml:"ssl_cert"` // Path to host cert file @@ -92,7 +96,9 @@ type Elasticsearch struct { // NewElasticsearch return a new instance of Elasticsearch func NewElasticsearch() *Elasticsearch { - return &Elasticsearch{} + return &Elasticsearch{ + HttpTimeout: internal.Duration{Duration: time.Second * 5}, + } } // SampleConfig returns sample configuration for this plugin. @@ -150,12 +156,12 @@ func (e *Elasticsearch) createHttpClient() (*http.Client, error) { return nil, err } tr := &http.Transport{ - ResponseHeaderTimeout: time.Duration(3 * time.Second), + ResponseHeaderTimeout: e.HttpTimeout.Duration, TLSClientConfig: tlsCfg, } client := &http.Client{ Transport: tr, - Timeout: time.Duration(4 * time.Second), + Timeout: e.HttpTimeout.Duration, } return client, nil diff --git a/plugins/inputs/exec/exec.go b/plugins/inputs/exec/exec.go index 060a4f308..f2fc60e5c 100644 --- a/plugins/inputs/exec/exec.go +++ b/plugins/inputs/exec/exec.go @@ -5,6 +5,7 @@ import ( "fmt" "os/exec" "path/filepath" + "runtime" "strings" "sync" "syscall" @@ -114,9 +115,36 @@ func (c CommandRunner) Run( } } + out = removeCarriageReturns(out) return out.Bytes(), nil } +// removeCarriageReturns removes all carriage returns from the input if the +// OS is Windows. It does not return any errors. +func removeCarriageReturns(b bytes.Buffer) bytes.Buffer { + if runtime.GOOS == "windows" { + var buf bytes.Buffer + for { + byt, er := b.ReadBytes(0x0D) + end := len(byt) + if nil == er { + end -= 1 + } + if nil != byt { + buf.Write(byt[:end]) + } else { + break + } + if nil != er { + break + } + } + b = buf + } + return b + +} + func (e *Exec) ProcessCommand(command string, acc telegraf.Accumulator, wg *sync.WaitGroup) { defer wg.Done() diff --git a/plugins/inputs/exec/exec_test.go b/plugins/inputs/exec/exec_test.go index cd9c9eaef..ac527a12f 100644 --- a/plugins/inputs/exec/exec_test.go +++ b/plugins/inputs/exec/exec_test.go @@ -1,7 +1,9 @@ package exec import ( + "bytes" "fmt" + "runtime" "testing" "github.com/influxdata/telegraf" @@ -46,6 +48,29 @@ cpu,cpu=cpu5,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 cpu,cpu=cpu6,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 ` +type CarriageReturnTest struct { + input []byte + output []byte +} + +var crTests = []CarriageReturnTest{ + {[]byte{0x4c, 0x69, 0x6e, 0x65, 0x20, 0x31, 0x0d, 0x0a, 0x4c, 0x69, + 0x6e, 0x65, 0x20, 0x32, 0x0d, 0x0a, 0x4c, 0x69, 0x6e, 0x65, + 0x20, 0x33}, + []byte{0x4c, 0x69, 0x6e, 0x65, 0x20, 0x31, 0x0a, 0x4c, 0x69, 0x6e, + 0x65, 0x20, 0x32, 0x0a, 0x4c, 0x69, 0x6e, 0x65, 0x20, 0x33}}, + {[]byte{0x4c, 0x69, 0x6e, 0x65, 0x20, 0x31, 0x0a, 0x4c, 0x69, 0x6e, + 0x65, 0x20, 0x32, 0x0a, 0x4c, 0x69, 0x6e, 0x65, 0x20, 0x33}, + []byte{0x4c, 0x69, 0x6e, 0x65, 0x20, 0x31, 0x0a, 0x4c, 0x69, 0x6e, + 0x65, 0x20, 0x32, 0x0a, 0x4c, 0x69, 0x6e, 0x65, 0x20, 0x33}}, + {[]byte{0x54, 0x68, 0x69, 0x73, 0x20, 0x69, 0x73, 0x20, 0x61, 0x6c, + 0x6c, 0x20, 0x6f, 0x6e, 0x65, 0x20, 0x62, 0x69, 0x67, 0x20, + 0x6c, 0x69, 0x6e, 0x65}, + []byte{0x54, 0x68, 0x69, 0x73, 0x20, 0x69, 0x73, 0x20, 0x61, 0x6c, + 0x6c, 0x20, 0x6f, 0x6e, 0x65, 0x20, 0x62, 0x69, 0x67, 0x20, + 0x6c, 0x69, 0x6e, 0x65}}, +} + type runnerMock struct { out []byte err error @@ -217,3 +242,21 @@ func TestExecCommandWithoutGlobAndPath(t *testing.T) { } acc.AssertContainsFields(t, "metric", fields) } + +func TestRemoveCarriageReturns(t *testing.T) { + if runtime.GOOS == "windows" { + // Test that all carriage returns are removed + for _, test := range crTests { + b := bytes.NewBuffer(test.input) + out := removeCarriageReturns(*b) + assert.True(t, bytes.Equal(test.output, out.Bytes())) + } + } else { + // Test that the buffer is returned unaltered + for _, test := range crTests { + b := bytes.NewBuffer(test.input) + out := removeCarriageReturns(*b) + assert.True(t, bytes.Equal(test.input, out.Bytes())) + } + } +} diff --git a/plugins/inputs/httpjson/README.md b/plugins/inputs/httpjson/README.md index 707b256df..81680e6ec 100644 --- a/plugins/inputs/httpjson/README.md +++ b/plugins/inputs/httpjson/README.md @@ -2,8 +2,7 @@ The httpjson plugin can collect data from remote URLs which respond with JSON. Then it flattens JSON and finds all numeric values, treating them as floats. -For example, if you have a service called _mycollector_, which has HTTP endpoint for gathering stats at http://my.service.com/_stats, you would configure the HTTP JSON -plugin like this: +For example, if you have a service called _mycollector_, which has HTTP endpoint for gathering stats at http://my.service.com/_stats, you would configure the HTTP JSON plugin like this: ``` [[inputs.httpjson]] @@ -15,12 +14,17 @@ plugin like this: # HTTP method to use (case-sensitive) method = "GET" + + # Set response_timeout (default 5 seconds) + response_timeout = "5s" ``` `name` is used as a prefix for the measurements. `method` specifies HTTP method to use for requests. +`response_timeout` specifies timeout to wait to get the response + You can also specify which keys from server response should be considered tags: ``` @@ -94,8 +98,7 @@ httpjson_mycollector_b_e,service='service01',server='http://my.service.com/_stat # Example 2, Multiple Services: -There is also the option to collect JSON from multiple services, here is an -example doing that. +There is also the option to collect JSON from multiple services, here is an example doing that. ``` [[inputs.httpjson]] diff --git a/plugins/inputs/httpjson/httpjson.go b/plugins/inputs/httpjson/httpjson.go index 6fe4da1e5..89bfccf77 100644 --- a/plugins/inputs/httpjson/httpjson.go +++ b/plugins/inputs/httpjson/httpjson.go @@ -16,13 +16,15 @@ import ( "github.com/influxdata/telegraf/plugins/parsers" ) +// HttpJson struct type HttpJson struct { - Name string - Servers []string - Method string - TagKeys []string - Parameters map[string]string - Headers map[string]string + Name string + Servers []string + Method string + TagKeys []string + ResponseTimeout internal.Duration + Parameters map[string]string + Headers map[string]string // Path to CA file SSLCA string `toml:"ssl_ca"` @@ -79,6 +81,8 @@ var sampleConfig = ` "http://localhost:9999/stats/", "http://localhost:9998/stats/", ] + ## Set response_timeout (default 5 seconds) + response_timeout = "5s" ## HTTP method to use: GET or POST (case-sensitive) method = "GET" @@ -126,12 +130,12 @@ func (h *HttpJson) Gather(acc telegraf.Accumulator) error { return err } tr := &http.Transport{ - ResponseHeaderTimeout: time.Duration(3 * time.Second), + ResponseHeaderTimeout: h.ResponseTimeout.Duration, TLSClientConfig: tlsCfg, } client := &http.Client{ Transport: tr, - Timeout: time.Duration(4 * time.Second), + Timeout: h.ResponseTimeout.Duration, } h.client.SetHTTPClient(client) } @@ -291,6 +295,9 @@ func init() { inputs.Add("httpjson", func() telegraf.Input { return &HttpJson{ client: &RealHTTPClient{}, + ResponseTimeout: internal.Duration{ + Duration: 5 * time.Second, + }, } }) } diff --git a/plugins/inputs/iptables/README.md b/plugins/inputs/iptables/README.md new file mode 100644 index 000000000..f5ebd4780 --- /dev/null +++ b/plugins/inputs/iptables/README.md @@ -0,0 +1,74 @@ +# Iptables Plugin + +The iptables plugin gathers packets and bytes counters for rules within a set of table and chain from the Linux's iptables firewall. + +Rules are identified through associated comment. Rules without comment are ignored. + +The iptables command requires CAP_NET_ADMIN and CAP_NET_RAW capabilities. You have several options to grant telegraf to run iptables: + +* Run telegraf as root. This is strongly discouraged. +* Configure systemd to run telegraf with CAP_NET_ADMIN and CAP_NET_RAW. This is the simplest and recommended option. +* Configure sudo to grant telegraf to run iptables. This is the most restrictive option, but require sudo setup. + +### Using systemd capabilities + +You may run `systemctl edit telegraf.service` and add the following: + +``` +[Service] +CapabilityBoundingSet=CAP_NET_RAW CAP_NET_ADMIN +AmbientCapabilities=CAP_NET_RAW CAP_NET_ADMIN +``` + +Since telegraf will fork a process to run iptables, `AmbientCapabilities` is required to transmit the capabilities bounding set to the forked process. + +### Using sudo + +You may edit your sudo configuration with the following: + +```sudo +telegraf ALL=(root) NOPASSWD: /usr/bin/iptables -nvL * +``` + +### Configuration: + +```toml + # use sudo to run iptables + use_sudo = false + # defines the table to monitor: + table = "filter" + # defines the chains to monitor: + chains = [ "INPUT" ] +``` + +### Measurements & Fields: + + +- iptables + - pkts (integer, count) + - bytes (integer, bytes) + +### Tags: + +- All measurements have the following tags: + - table + - chain + - ruleid + +The `ruleid` is the comment associated to the rule. + +### Example Output: + +``` +$ iptables -nvL INPUT +Chain INPUT (policy DROP 0 packets, 0 bytes) +pkts bytes target prot opt in out source destination +100 1024 ACCEPT tcp -- * * 192.168.0.0/24 0.0.0.0/0 tcp dpt:22 /* ssh */ + 42 2048 ACCEPT tcp -- * * 192.168.0.0/24 0.0.0.0/0 tcp dpt:80 /* httpd */ +``` + +``` +$ ./telegraf -config telegraf.conf -input-filter iptables -test +iptables,table=filter,chain=INPUT,ruleid=ssh pkts=100i,bytes=1024i 1453831884664956455 +iptables,table=filter,chain=INPUT,ruleid=httpd pkts=42i,bytes=2048i 1453831884664956455 +``` diff --git a/plugins/inputs/iptables/iptables.go b/plugins/inputs/iptables/iptables.go new file mode 100644 index 000000000..4ceb45230 --- /dev/null +++ b/plugins/inputs/iptables/iptables.go @@ -0,0 +1,128 @@ +// +build linux + +package iptables + +import ( + "errors" + "os/exec" + "regexp" + "strconv" + "strings" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" +) + +// Iptables is a telegraf plugin to gather packets and bytes throughput from Linux's iptables packet filter. +type Iptables struct { + UseSudo bool + Table string + Chains []string + lister chainLister +} + +// Description returns a short description of the plugin. +func (ipt *Iptables) Description() string { + return "Gather packets and bytes throughput from iptables" +} + +// SampleConfig returns sample configuration options. +func (ipt *Iptables) SampleConfig() string { + return ` + ## iptables require root access on most systems. + ## Setting 'use_sudo' to true will make use of sudo to run iptables. + ## Users must configure sudo to allow telegraf user to run iptables with no password. + ## iptables can be restricted to only list command "iptables -nvL" + use_sudo = false + ## defines the table to monitor: + table = "filter" + ## defines the chains to monitor: + chains = [ "INPUT" ] +` +} + +// Gather gathers iptables packets and bytes throughput from the configured tables and chains. +func (ipt *Iptables) Gather(acc telegraf.Accumulator) error { + if ipt.Table == "" || len(ipt.Chains) == 0 { + return nil + } + // best effort : we continue through the chains even if an error is encountered, + // but we keep track of the last error. + var err error + for _, chain := range ipt.Chains { + data, e := ipt.lister(ipt.Table, chain) + if e != nil { + err = e + continue + } + e = ipt.parseAndGather(data, acc) + if e != nil { + err = e + continue + } + } + return err +} + +func (ipt *Iptables) chainList(table, chain string) (string, error) { + iptablePath, err := exec.LookPath("iptables") + if err != nil { + return "", err + } + var args []string + name := iptablePath + if ipt.UseSudo { + name = "sudo" + args = append(args, iptablePath) + } + args = append(args, "-nvL", chain, "-t", table, "-x") + c := exec.Command(name, args...) + out, err := c.Output() + return string(out), err +} + +const measurement = "iptables" + +var errParse = errors.New("Cannot parse iptables list information") +var chainNameRe = regexp.MustCompile(`^Chain\s+(\S+)`) +var fieldsHeaderRe = regexp.MustCompile(`^\s*pkts\s+bytes\s+`) +var valuesRe = regexp.MustCompile(`^\s*([0-9]+)\s+([0-9]+)\s+.*?(/\*\s(.*)\s\*/)?$`) + +func (ipt *Iptables) parseAndGather(data string, acc telegraf.Accumulator) error { + lines := strings.Split(data, "\n") + if len(lines) < 3 { + return nil + } + mchain := chainNameRe.FindStringSubmatch(lines[0]) + if mchain == nil { + return errParse + } + if !fieldsHeaderRe.MatchString(lines[1]) { + return errParse + } + for _, line := range lines[2:] { + mv := valuesRe.FindAllStringSubmatch(line, -1) + // best effort : if line does not match or rule is not commented forget about it + if len(mv) == 0 || len(mv[0]) != 5 || mv[0][4] == "" { + continue + } + tags := map[string]string{"table": ipt.Table, "chain": mchain[1], "ruleid": mv[0][4]} + fields := make(map[string]interface{}) + // since parse error is already catched by the regexp, + // we never enter ther error case here => no error check (but still need a test to cover the case) + fields["pkts"], _ = strconv.ParseUint(mv[0][1], 10, 64) + fields["bytes"], _ = strconv.ParseUint(mv[0][2], 10, 64) + acc.AddFields(measurement, fields, tags) + } + return nil +} + +type chainLister func(table, chain string) (string, error) + +func init() { + inputs.Add("iptables", func() telegraf.Input { + ipt := new(Iptables) + ipt.lister = ipt.chainList + return ipt + }) +} diff --git a/plugins/inputs/iptables/iptables_nocompile.go b/plugins/inputs/iptables/iptables_nocompile.go new file mode 100644 index 000000000..f71b4208e --- /dev/null +++ b/plugins/inputs/iptables/iptables_nocompile.go @@ -0,0 +1,3 @@ +// +build !linux + +package iptables diff --git a/plugins/inputs/iptables/iptables_test.go b/plugins/inputs/iptables/iptables_test.go new file mode 100644 index 000000000..bd8a2a726 --- /dev/null +++ b/plugins/inputs/iptables/iptables_test.go @@ -0,0 +1,206 @@ +// +build linux + +package iptables + +import ( + "errors" + "reflect" + "testing" + + "github.com/influxdata/telegraf/testutil" +) + +func TestIptables_Gather(t *testing.T) { + tests := []struct { + table string + chains []string + values []string + tags []map[string]string + fields [][]map[string]interface{} + err error + }{ + { // 1 - no configured table => no results + values: []string{ + `Chain INPUT (policy ACCEPT 58 packets, 5096 bytes) + pkts bytes target prot opt in out source destination + 57 4520 RETURN tcp -- * * 0.0.0.0/0 0.0.0.0/0 + `}, + }, + { // 2 - no configured chains => no results + table: "filter", + values: []string{ + `Chain INPUT (policy ACCEPT 58 packets, 5096 bytes) + pkts bytes target prot opt in out source destination + 57 4520 RETURN tcp -- * * 0.0.0.0/0 0.0.0.0/0 + `}, + }, + { // 3 - pkts and bytes are gathered as integers + table: "filter", + chains: []string{"INPUT"}, + values: []string{ + `Chain INPUT (policy ACCEPT 58 packets, 5096 bytes) + pkts bytes target prot opt in out source destination + 57 4520 RETURN tcp -- * * 0.0.0.0/0 0.0.0.0/0 /* foobar */ + `}, + tags: []map[string]string{map[string]string{"table": "filter", "chain": "INPUT", "ruleid": "foobar"}}, + fields: [][]map[string]interface{}{ + {map[string]interface{}{"pkts": uint64(57), "bytes": uint64(4520)}}, + }, + }, + { // 4 - missing fields header => no results + table: "filter", + chains: []string{"INPUT"}, + values: []string{`Chain INPUT (policy ACCEPT 58 packets, 5096 bytes)`}, + }, + { // 5 - invalid chain header => error + table: "filter", + chains: []string{"INPUT"}, + values: []string{ + `INPUT (policy ACCEPT 58 packets, 5096 bytes) + pkts bytes target prot opt in out source destination + 57 4520 RETURN tcp -- * * 0.0.0.0/0 0.0.0.0/0 + `}, + err: errParse, + }, + { // 6 - invalid fields header => error + table: "filter", + chains: []string{"INPUT"}, + values: []string{ + `Chain INPUT (policy ACCEPT 58 packets, 5096 bytes) + + 57 4520 RETURN tcp -- * * 0.0.0.0/0 0.0.0.0/0 + `}, + err: errParse, + }, + { // 7 - invalid integer value => best effort, no error + table: "filter", + chains: []string{"INPUT"}, + values: []string{ + `Chain INPUT (policy ACCEPT 58 packets, 5096 bytes) + pkts bytes target prot opt in out source destination + K 4520 RETURN tcp -- * * 0.0.0.0/0 0.0.0.0/0 + `}, + }, + { // 8 - Multiple rows, multipe chains => no error + table: "filter", + chains: []string{"INPUT", "FORWARD"}, + values: []string{ + `Chain INPUT (policy ACCEPT 58 packets, 5096 bytes) + pkts bytes target prot opt in out source destination + 100 4520 RETURN tcp -- * * 0.0.0.0/0 0.0.0.0/0 + 200 4520 RETURN tcp -- * * 0.0.0.0/0 0.0.0.0/0 /* foo */ + `, + `Chain FORWARD (policy ACCEPT 58 packets, 5096 bytes) + pkts bytes target prot opt in out source destination + 300 4520 RETURN tcp -- * * 0.0.0.0/0 0.0.0.0/0 /* bar */ + 400 4520 RETURN tcp -- * * 0.0.0.0/0 0.0.0.0/0 + 500 4520 RETURN tcp -- * * 0.0.0.0/0 0.0.0.0/0 /* foobar */ + `, + }, + tags: []map[string]string{ + map[string]string{"table": "filter", "chain": "INPUT", "ruleid": "foo"}, + map[string]string{"table": "filter", "chain": "FORWARD", "ruleid": "bar"}, + map[string]string{"table": "filter", "chain": "FORWARD", "ruleid": "foobar"}, + }, + fields: [][]map[string]interface{}{ + {map[string]interface{}{"pkts": uint64(200), "bytes": uint64(4520)}}, + {map[string]interface{}{"pkts": uint64(300), "bytes": uint64(4520)}}, + {map[string]interface{}{"pkts": uint64(500), "bytes": uint64(4520)}}, + }, + }, + { // 9 - comments are used as ruleid if any + table: "filter", + chains: []string{"INPUT"}, + values: []string{ + `Chain INPUT (policy ACCEPT 58 packets, 5096 bytes) + pkts bytes target prot opt in out source destination + 57 4520 RETURN tcp -- * * 0.0.0.0/0 0.0.0.0/0 tcp dpt:22 /* foobar */ + 100 4520 RETURN tcp -- * * 0.0.0.0/0 0.0.0.0/0 tcp dpt:80 + `}, + tags: []map[string]string{ + map[string]string{"table": "filter", "chain": "INPUT", "ruleid": "foobar"}, + }, + fields: [][]map[string]interface{}{ + {map[string]interface{}{"pkts": uint64(57), "bytes": uint64(4520)}}, + }, + }, + } + + for i, tt := range tests { + i++ + ipt := &Iptables{ + Table: tt.table, + Chains: tt.chains, + lister: func(table, chain string) (string, error) { + if len(tt.values) > 0 { + v := tt.values[0] + tt.values = tt.values[1:] + return v, nil + } + return "", nil + }, + } + acc := new(testutil.Accumulator) + err := ipt.Gather(acc) + if !reflect.DeepEqual(tt.err, err) { + t.Errorf("%d: expected error '%#v' got '%#v'", i, tt.err, err) + } + if tt.table == "" { + n := acc.NFields() + if n != 0 { + t.Errorf("%d: expected 0 fields if empty table got %d", i, n) + } + continue + } + if len(tt.chains) == 0 { + n := acc.NFields() + if n != 0 { + t.Errorf("%d: expected 0 fields if empty chains got %d", i, n) + } + continue + } + if len(tt.tags) == 0 { + n := acc.NFields() + if n != 0 { + t.Errorf("%d: expected 0 values got %d", i, n) + } + continue + } + n := 0 + for j, tags := range tt.tags { + for k, fields := range tt.fields[j] { + if len(acc.Metrics) < n+1 { + t.Errorf("%d: expected at least %d values got %d", i, n+1, len(acc.Metrics)) + break + } + m := acc.Metrics[n] + if !reflect.DeepEqual(m.Measurement, measurement) { + t.Errorf("%d %d %d: expected measurement '%#v' got '%#v'\n", i, j, k, measurement, m.Measurement) + } + if !reflect.DeepEqual(m.Tags, tags) { + t.Errorf("%d %d %d: expected tags\n%#v got\n%#v\n", i, j, k, tags, m.Tags) + } + if !reflect.DeepEqual(m.Fields, fields) { + t.Errorf("%d %d %d: expected fields\n%#v got\n%#v\n", i, j, k, fields, m.Fields) + } + n++ + } + } + } +} + +func TestIptables_Gather_listerError(t *testing.T) { + errFoo := errors.New("error foobar") + ipt := &Iptables{ + Table: "nat", + Chains: []string{"foo", "bar"}, + lister: func(table, chain string) (string, error) { + return "", errFoo + }, + } + acc := new(testutil.Accumulator) + err := ipt.Gather(acc) + if !reflect.DeepEqual(err, errFoo) { + t.Errorf("Expected error %#v got\n%#v\n", errFoo, err) + } +} diff --git a/plugins/inputs/mesos/README.md b/plugins/inputs/mesos/README.md index 1d3a5f7bf..9151ff9a2 100644 --- a/plugins/inputs/mesos/README.md +++ b/plugins/inputs/mesos/README.md @@ -241,7 +241,7 @@ Mesos tasks metric groups - executor_name - framework_id - source -- statistics (all metrics below will have `statistics_` prefix included in their names +- statistics - cpus_limit - cpus_system_time_secs - cpus_user_time_secs @@ -266,14 +266,20 @@ Mesos tasks metric groups - server - role (master/slave) -- Tasks measurements have the following tags: +- All master measurements have the extra tags: + - state (leader/follower) + +- Tasks measurements have the following tags: - server + - framework_id + - task_id ### Example Output: ``` $ telegraf -config ~/mesos.conf -input-filter mesos -test * Plugin: mesos, Collection 1 -mesos,host=172.17.8.102,server=172.17.8.101 allocator/event_queue_dispatches=0,master/cpus_percent=0, +mesos,role=master,state=leader,host=172.17.8.102,server=172.17.8.101 +allocator/event_queue_dispatches=0,master/cpus_percent=0, master/cpus_revocable_percent=0,master/cpus_revocable_total=0, master/cpus_revocable_used=0,master/cpus_total=2, master/cpus_used=0,master/disk_percent=0,master/disk_revocable_percent=0, @@ -293,13 +299,13 @@ master/messages_deactivate_framework=0 ... Meoso tasks metrics (if enabled): ``` -mesos-tasks,host=172.17.8.102,server=172.17.8.101,task_id=hello-world.e4b5b497-2ccd-11e6-a659-0242fb222ce2 -statistics_cpus_limit=0.2,statistics_cpus_system_time_secs=142.49,statistics_cpus_user_time_secs=388.14, -statistics_mem_anon_bytes=359129088,statistics_mem_cache_bytes=3964928, -statistics_mem_critical_pressure_counter=0,statistics_mem_file_bytes=3964928, -statistics_mem_limit_bytes=767557632,statistics_mem_low_pressure_counter=0, -statistics_mem_mapped_file_bytes=114688,statistics_mem_medium_pressure_counter=0, -statistics_mem_rss_bytes=359129088,statistics_mem_swap_bytes=0,statistics_mem_total_bytes=363094016, -statistics_mem_total_memsw_bytes=363094016,statistics_mem_unevictable_bytes=0, -statistics_timestamp=1465486052.70525 1465486053052811792... +mesos-tasks,host=172.17.8.102,server=172.17.8.101,framework_id=e3060235-c4ed-4765-9d36-784e3beca07f-0000,task_id=hello-world.e4b5b497-2ccd-11e6-a659-0242fb222ce2 +cpus_limit=0.2,cpus_system_time_secs=142.49,cpus_user_time_secs=388.14, +mem_anon_bytes=359129088,mem_cache_bytes=3964928, +mem_critical_pressure_counter=0,mem_file_bytes=3964928, +mem_limit_bytes=767557632,mem_low_pressure_counter=0, +mem_mapped_file_bytes=114688,mem_medium_pressure_counter=0, +mem_rss_bytes=359129088,mem_swap_bytes=0,mem_total_bytes=363094016, +mem_total_memsw_bytes=363094016,mem_unevictable_bytes=0, +timestamp=1465486052.70525 1465486053052811792... ``` diff --git a/plugins/inputs/mesos/mesos.go b/plugins/inputs/mesos/mesos.go index a719dc9f4..ffcd5969b 100644 --- a/plugins/inputs/mesos/mesos.go +++ b/plugins/inputs/mesos/mesos.go @@ -116,7 +116,7 @@ func (m *Mesos) Gather(acc telegraf.Accumulator) error { for _, v := range m.Slaves { wg.Add(1) go func(c string) { - errorChannel <- m.gatherMainMetrics(c, ":5051", MASTER, acc) + errorChannel <- m.gatherMainMetrics(c, ":5051", SLAVE, acc) wg.Done() return }(v) @@ -420,8 +420,15 @@ var client = &http.Client{ Timeout: time.Duration(4 * time.Second), } +// TaskStats struct for JSON API output /monitor/statistics +type TaskStats struct { + ExecutorID string `json:"executor_id"` + FrameworkID string `json:"framework_id"` + Statistics map[string]interface{} `json:"statistics"` +} + func (m *Mesos) gatherSlaveTaskMetrics(address string, defaultPort string, acc telegraf.Accumulator) error { - var metrics []map[string]interface{} + var metrics []TaskStats host, _, err := net.SplitHostPort(address) if err != nil { @@ -452,16 +459,18 @@ func (m *Mesos) gatherSlaveTaskMetrics(address string, defaultPort string, acc t } for _, task := range metrics { - tags["task_id"] = task["executor_id"].(string) + tags["task_id"] = task.ExecutorID + tags["framework_id"] = task.FrameworkID jf := jsonparser.JSONFlattener{} - err = jf.FlattenJSON("", task) + err = jf.FlattenJSON("", task.Statistics) if err != nil { return err } + timestamp := time.Unix(int64(jf.Fields["timestamp"].(float64)), 0) - acc.AddFields("mesos-tasks", jf.Fields, tags) + acc.AddFields("mesos_tasks", jf.Fields, tags, timestamp) } return nil @@ -510,6 +519,14 @@ func (m *Mesos) gatherMainMetrics(a string, defaultPort string, role Role, acc t return err } + if role == MASTER { + if jf.Fields["master/elected"] != 0.0 { + tags["state"] = "leader" + } else { + tags["state"] = "standby" + } + } + acc.AddFields("mesos", jf.Fields, tags) return nil diff --git a/plugins/inputs/mesos/mesos_test.go b/plugins/inputs/mesos/mesos_test.go index 062e23e4a..4ea6f6e16 100644 --- a/plugins/inputs/mesos/mesos_test.go +++ b/plugins/inputs/mesos/mesos_test.go @@ -345,7 +345,10 @@ func TestMesosSlave(t *testing.T) { t.Errorf(err.Error()) } - acc.AssertContainsFields(t, "mesos-tasks", jf.Fields) + acc.AssertContainsFields( + t, + "mesos_tasks", + slaveTaskMetrics["statistics"].(map[string]interface{})) } func TestSlaveFilter(t *testing.T) { diff --git a/plugins/inputs/mysql/mysql.go b/plugins/inputs/mysql/mysql.go index 10b8c2f75..63744d88c 100644 --- a/plugins/inputs/mysql/mysql.go +++ b/plugins/inputs/mysql/mysql.go @@ -1376,6 +1376,7 @@ func (m *Mysql) gatherPerfEventsStatements(db *sql.DB, serv string, acc telegraf &rowsAffected, &rowsSent, &rowsExamined, &tmpTables, &tmpDiskTables, &sortMergePasses, &sortRows, + &noIndexUsed, ) if err != nil { diff --git a/plugins/inputs/redis/redis.go b/plugins/inputs/redis/redis.go index b08eedee3..98a6bc659 100644 --- a/plugins/inputs/redis/redis.go +++ b/plugins/inputs/redis/redis.go @@ -44,40 +44,9 @@ func (r *Redis) Description() string { } var Tracking = map[string]string{ - "uptime_in_seconds": "uptime", - "connected_clients": "clients", - "used_memory": "used_memory", - "used_memory_rss": "used_memory_rss", - "used_memory_peak": "used_memory_peak", - "used_memory_lua": "used_memory_lua", - "rdb_changes_since_last_save": "rdb_changes_since_last_save", - "total_connections_received": "total_connections_received", - "total_commands_processed": "total_commands_processed", - "instantaneous_ops_per_sec": "instantaneous_ops_per_sec", - "instantaneous_input_kbps": "instantaneous_input_kbps", - "instantaneous_output_kbps": "instantaneous_output_kbps", - "sync_full": "sync_full", - "sync_partial_ok": "sync_partial_ok", - "sync_partial_err": "sync_partial_err", - "expired_keys": "expired_keys", - "evicted_keys": "evicted_keys", - "keyspace_hits": "keyspace_hits", - "keyspace_misses": "keyspace_misses", - "pubsub_channels": "pubsub_channels", - "pubsub_patterns": "pubsub_patterns", - "latest_fork_usec": "latest_fork_usec", - "connected_slaves": "connected_slaves", - "master_repl_offset": "master_repl_offset", - "master_last_io_seconds_ago": "master_last_io_seconds_ago", - "repl_backlog_active": "repl_backlog_active", - "repl_backlog_size": "repl_backlog_size", - "repl_backlog_histlen": "repl_backlog_histlen", - "mem_fragmentation_ratio": "mem_fragmentation_ratio", - "used_cpu_sys": "used_cpu_sys", - "used_cpu_user": "used_cpu_user", - "used_cpu_sys_children": "used_cpu_sys_children", - "used_cpu_user_children": "used_cpu_user_children", - "role": "replication_role", + "uptime_in_seconds": "uptime", + "connected_clients": "clients", + "role": "replication_role", } var ErrProtocolError = errors.New("redis protocol error") @@ -188,6 +157,7 @@ func gatherInfoOutput( acc telegraf.Accumulator, tags map[string]string, ) error { + var section string var keyspace_hits, keyspace_misses uint64 = 0, 0 scanner := bufio.NewScanner(rdr) @@ -198,7 +168,13 @@ func gatherInfoOutput( break } - if len(line) == 0 || line[0] == '#' { + if len(line) == 0 { + continue + } + if line[0] == '#' { + if len(line) > 2 { + section = line[2:] + } continue } @@ -206,42 +182,69 @@ func gatherInfoOutput( if len(parts) < 2 { continue } - name := string(parts[0]) - metric, ok := Tracking[name] - if !ok { - kline := strings.TrimSpace(string(parts[1])) - gatherKeyspaceLine(name, kline, acc, tags) + + if section == "Server" { + if name != "lru_clock" && name != "uptime_in_seconds" { + continue + } + } + + if name == "mem_allocator" { continue } + if strings.HasSuffix(name, "_human") { + continue + } + + metric, ok := Tracking[name] + if !ok { + if section == "Keyspace" { + kline := strings.TrimSpace(string(parts[1])) + gatherKeyspaceLine(name, kline, acc, tags) + continue + } + metric = name + } + val := strings.TrimSpace(parts[1]) - ival, err := strconv.ParseUint(val, 10, 64) - if name == "keyspace_hits" { - keyspace_hits = ival + // Try parsing as a uint + if ival, err := strconv.ParseUint(val, 10, 64); err == nil { + switch name { + case "keyspace_hits": + keyspace_hits = ival + case "keyspace_misses": + keyspace_misses = ival + case "rdb_last_save_time": + // influxdb can't calculate this, so we have to do it + fields["rdb_last_save_time_elapsed"] = uint64(time.Now().Unix()) - ival + } + fields[metric] = ival + continue } - if name == "keyspace_misses" { - keyspace_misses = ival + // Try parsing as an int + if ival, err := strconv.ParseInt(val, 10, 64); err == nil { + fields[metric] = ival + continue } + // Try parsing as a float + if fval, err := strconv.ParseFloat(val, 64); err == nil { + fields[metric] = fval + continue + } + + // Treat it as a string + if name == "role" { tags["replication_role"] = val continue } - if err == nil { - fields[metric] = ival - continue - } - - fval, err := strconv.ParseFloat(val, 64) - if err != nil { - return err - } - - fields[metric] = fval + fields[metric] = val } var keyspace_hitrate float64 = 0.0 if keyspace_hits != 0 || keyspace_misses != 0 { diff --git a/plugins/inputs/redis/redis_test.go b/plugins/inputs/redis/redis_test.go index 2e2fc1e37..cf62da0bd 100644 --- a/plugins/inputs/redis/redis_test.go +++ b/plugins/inputs/redis/redis_test.go @@ -5,8 +5,10 @@ import ( "fmt" "strings" "testing" + "time" "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -37,40 +39,73 @@ func TestRedis_ParseMetrics(t *testing.T) { tags = map[string]string{"host": "redis.net", "replication_role": "master"} fields := map[string]interface{}{ - "uptime": uint64(238), - "clients": uint64(1), - "used_memory": uint64(1003936), - "used_memory_rss": uint64(811008), - "used_memory_peak": uint64(1003936), - "used_memory_lua": uint64(33792), - "rdb_changes_since_last_save": uint64(0), - "total_connections_received": uint64(2), - "total_commands_processed": uint64(1), - "instantaneous_ops_per_sec": uint64(0), - "sync_full": uint64(0), - "sync_partial_ok": uint64(0), - "sync_partial_err": uint64(0), - "expired_keys": uint64(0), - "evicted_keys": uint64(0), - "keyspace_hits": uint64(1), - "keyspace_misses": uint64(1), - "pubsub_channels": uint64(0), - "pubsub_patterns": uint64(0), - "latest_fork_usec": uint64(0), - "connected_slaves": uint64(0), - "master_repl_offset": uint64(0), - "repl_backlog_active": uint64(0), - "repl_backlog_size": uint64(1048576), - "repl_backlog_histlen": uint64(0), - "mem_fragmentation_ratio": float64(0.81), - "instantaneous_input_kbps": float64(876.16), - "instantaneous_output_kbps": float64(3010.23), - "used_cpu_sys": float64(0.14), - "used_cpu_user": float64(0.05), - "used_cpu_sys_children": float64(0.00), - "used_cpu_user_children": float64(0.00), - "keyspace_hitrate": float64(0.50), + "uptime": uint64(238), + "lru_clock": uint64(2364819), + "clients": uint64(1), + "client_longest_output_list": uint64(0), + "client_biggest_input_buf": uint64(0), + "blocked_clients": uint64(0), + "used_memory": uint64(1003936), + "used_memory_rss": uint64(811008), + "used_memory_peak": uint64(1003936), + "used_memory_lua": uint64(33792), + "mem_fragmentation_ratio": float64(0.81), + "loading": uint64(0), + "rdb_changes_since_last_save": uint64(0), + "rdb_bgsave_in_progress": uint64(0), + "rdb_last_save_time": uint64(1428427941), + "rdb_last_bgsave_status": "ok", + "rdb_last_bgsave_time_sec": int64(-1), + "rdb_current_bgsave_time_sec": int64(-1), + "aof_enabled": uint64(0), + "aof_rewrite_in_progress": uint64(0), + "aof_rewrite_scheduled": uint64(0), + "aof_last_rewrite_time_sec": int64(-1), + "aof_current_rewrite_time_sec": int64(-1), + "aof_last_bgrewrite_status": "ok", + "aof_last_write_status": "ok", + "total_connections_received": uint64(2), + "total_commands_processed": uint64(1), + "instantaneous_ops_per_sec": uint64(0), + "instantaneous_input_kbps": float64(876.16), + "instantaneous_output_kbps": float64(3010.23), + "rejected_connections": uint64(0), + "sync_full": uint64(0), + "sync_partial_ok": uint64(0), + "sync_partial_err": uint64(0), + "expired_keys": uint64(0), + "evicted_keys": uint64(0), + "keyspace_hits": uint64(1), + "keyspace_misses": uint64(1), + "pubsub_channels": uint64(0), + "pubsub_patterns": uint64(0), + "latest_fork_usec": uint64(0), + "connected_slaves": uint64(0), + "master_repl_offset": uint64(0), + "repl_backlog_active": uint64(0), + "repl_backlog_size": uint64(1048576), + "repl_backlog_first_byte_offset": uint64(0), + "repl_backlog_histlen": uint64(0), + "used_cpu_sys": float64(0.14), + "used_cpu_user": float64(0.05), + "used_cpu_sys_children": float64(0.00), + "used_cpu_user_children": float64(0.00), + "keyspace_hitrate": float64(0.50), } + + // We have to test rdb_last_save_time_offset manually because the value is based on the time when gathered + for _, m := range acc.Metrics { + for k, v := range m.Fields { + if k == "rdb_last_save_time_elapsed" { + fields[k] = v + } + } + } + assert.InDelta(t, + uint64(time.Now().Unix())-fields["rdb_last_save_time"].(uint64), + fields["rdb_last_save_time_elapsed"].(uint64), + 2) // allow for 2 seconds worth of offset + keyspaceTags := map[string]string{"host": "redis.net", "replication_role": "master", "database": "db0"} keyspaceFields := map[string]interface{}{ "avg_ttl": uint64(0), diff --git a/plugins/inputs/snmp/README.md b/plugins/inputs/snmp/README.md new file mode 100644 index 000000000..b5a694abd --- /dev/null +++ b/plugins/inputs/snmp/README.md @@ -0,0 +1,167 @@ +# SNMP Plugin + +The SNMP input plugin gathers metrics from SNMP agents. + +## Configuration: + +### Example: + +SNMP data: +``` +.1.0.0.0.1.1.0 octet_str "foo" +.1.0.0.0.1.1.1 octet_str "bar" +.1.0.0.0.1.102 octet_str "bad" +.1.0.0.0.1.2.0 integer 1 +.1.0.0.0.1.2.1 integer 2 +.1.0.0.0.1.3.0 octet_str "0.123" +.1.0.0.0.1.3.1 octet_str "0.456" +.1.0.0.0.1.3.2 octet_str "9.999" +.1.0.0.1.1 octet_str "baz" +.1.0.0.1.2 uinteger 54321 +.1.0.0.1.3 uinteger 234 +``` + +Telegraf config: +```toml +[[inputs.snmp]] + agents = [ "127.0.0.1:161" ] + version = 2 + community = "public" + + name = "system" + [[inputs.snmp.field]] + name = "hostname" + oid = ".1.0.0.1.1" + is_tag = true + [[inputs.snmp.field]] + name = "uptime" + oid = ".1.0.0.1.2" + [[inputs.snmp.field]] + name = "loadavg" + oid = ".1.0.0.1.3" + conversion = "float(2)" + + [[inputs.snmp.table]] + name = "remote_servers" + inherit_tags = [ "hostname" ] + [[inputs.snmp.table.field]] + name = "server" + oid = ".1.0.0.0.1.1" + is_tag = true + [[inputs.snmp.table.field]] + name = "connections" + oid = ".1.0.0.0.1.2" + [[inputs.snmp.table.field]] + name = "latency" + oid = ".1.0.0.0.1.3" + conversion = "float" +``` + +Resulting output: +``` +* Plugin: snmp, Collection 1 +> system,agent_host=127.0.0.1,host=mylocalhost,hostname=baz loadavg=2.34,uptime=54321i 1468953135000000000 +> remote_servers,agent_host=127.0.0.1,host=mylocalhost,hostname=baz,server=foo connections=1i,latency=0.123 1468953135000000000 +> remote_servers,agent_host=127.0.0.1,host=mylocalhost,hostname=baz,server=bar connections=2i,latency=0.456 1468953135000000000 +``` + +#### Configuration via MIB: + +This example uses the SNMP data above, but is configured via the MIB. +The example MIB file can be found in the `testdata` directory. See the [MIB lookups](#mib-lookups) section for more information. + +Telegraf config: +```toml +[[inputs.snmp]] + agents = [ "127.0.0.1:161" ] + version = 2 + community = "public" + + [[inputs.snmp.field]] + oid = "TEST::hostname" + is_tag = true + + [[inputs.snmp.table]] + oid = "TEST::testTable" + inherit_tags = "hostname" +``` + +Resulting output: +``` +* Plugin: snmp, Collection 1 +> testTable,agent_host=127.0.0.1,host=mylocalhost,hostname=baz,server=foo connections=1i,latency="0.123" 1468953135000000000 +> testTable,agent_host=127.0.0.1,host=mylocalhost,hostname=baz,server=bar connections=2i,latency="0.456" 1468953135000000000 +``` + +### Config parameters + +* `agents`: Default: `[]` +List of SNMP agents to connect to in the form of `IP[:PORT]`. If `:PORT` is unspecified, it defaults to `161`. + +* `version`: Default: `2` +SNMP protocol version to use. + +* `community`: Default: `"public"` +SNMP community to use. + +* `max_repetitions`: Default: `50` +Maximum number of iterations for repeating variables. + +* `sec_name`: +Security name for authenticated SNMPv3 requests. + +* `auth_protocol`: Values: `"MD5"`,`"SHA"`,`""`. Default: `""` +Authentication protocol for authenticated SNMPv3 requests. + +* `auth_password`: +Authentication password for authenticated SNMPv3 requests. + +* `sec_level`: Values: `"noAuthNoPriv"`,`"authNoPriv"`,`"authPriv"`. Default: `"noAuthNoPriv"` +Security level used for SNMPv3 messages. + +* `context_name`: +Context name used for SNMPv3 requests. + +* `priv_protocol`: Values: `"DES"`,`"AES"`,`""`. Default: `""` +Privacy protocol used for encrypted SNMPv3 messages. + +* `priv_password`: +Privacy password used for encrypted SNMPv3 messages. + + +* `name`: +Output measurement name. + +#### Field parameters: +* `oid`: +OID to get. May be a numeric or textual OID. + +* `name`: +Output field/tag name. +If not specified, it defaults to the value of `oid`. If `oid` is numeric, an attempt to translate the numeric OID into a texual OID will be made. + +* `is_tag`: +Output this field as a tag. + +* `conversion`: Values: `"float(X)"`,`"float"`,`"int"`,`""`. Default: `""` +Converts the value according to the given specification. + + - `float(X)`: Converts the input value into a float and divides by the Xth power of 10. Efficively just moves the decimal left X places. For example a value of `123` with `float(2)` will result in `1.23`. + - `float`: Converts the value into a float with no adjustment. Same as `float(0)`. + - `int`: Convertes the value into an integer. + +#### Table parameters: +* `oid`: +Automatically populates the table's fields using data from the MIB. + +* `name`: +Output measurement name. +If not specified, it defaults to the value of `oid`. If `oid` is numeric, an attempt to translate the numeric OID into a texual OID will be made. + +* `inherit_tags`: +Which tags to inherit from the top-level config and to use in the output of this table's measurement. + +### MIB lookups +If the plugin is configured such that it needs to perform lookups from the MIB, it will use the net-snmp utilities `snmptranslate` and `snmptable`. + +When performing the lookups, the plugin will load all available MIBs. If your MIB files are in a custom path, you may add the path using the `MIBDIRS` environment variable. See [`man 1 snmpcmd`](http://net-snmp.sourceforge.net/docs/man/snmpcmd.html#lbAK) for more information on the variable. diff --git a/plugins/inputs/snmp/snmp.go b/plugins/inputs/snmp/snmp.go new file mode 100644 index 000000000..3cd8968b4 --- /dev/null +++ b/plugins/inputs/snmp/snmp.go @@ -0,0 +1,791 @@ +package snmp + +import ( + "bytes" + "fmt" + "math" + "net" + "os/exec" + "strconv" + "strings" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/inputs" + + "github.com/soniah/gosnmp" +) + +const description = `Retrieves SNMP values from remote agents` +const sampleConfig = ` + agents = [ "127.0.0.1:161" ] + timeout = "5s" + version = 2 + + # SNMPv1 & SNMPv2 parameters + community = "public" + + # SNMPv2 & SNMPv3 parameters + max_repetitions = 50 + + # SNMPv3 parameters + #sec_name = "myuser" + #auth_protocol = "md5" # Values: "MD5", "SHA", "" + #auth_password = "password123" + #sec_level = "authNoPriv" # Values: "noAuthNoPriv", "authNoPriv", "authPriv" + #context_name = "" + #priv_protocol = "" # Values: "DES", "AES", "" + #priv_password = "" + + # measurement name + name = "system" + [[inputs.snmp.field]] + name = "hostname" + oid = ".1.0.0.1.1" + [[inputs.snmp.field]] + name = "uptime" + oid = ".1.0.0.1.2" + [[inputs.snmp.field]] + name = "load" + oid = ".1.0.0.1.3" + [[inputs.snmp.field]] + oid = "HOST-RESOURCES-MIB::hrMemorySize" + + [[inputs.snmp.table]] + # measurement name + name = "remote_servers" + inherit_tags = [ "hostname" ] + [[inputs.snmp.table.field]] + name = "server" + oid = ".1.0.0.0.1.0" + is_tag = true + [[inputs.snmp.table.field]] + name = "connections" + oid = ".1.0.0.0.1.1" + [[inputs.snmp.table.field]] + name = "latency" + oid = ".1.0.0.0.1.2" + + [[inputs.snmp.table]] + # auto populate table's fields using the MIB + oid = "HOST-RESOURCES-MIB::hrNetworkTable" +` + +// execCommand is so tests can mock out exec.Command usage. +var execCommand = exec.Command + +// execCmd executes the specified command, returning the STDOUT content. +// If command exits with error status, the output is captured into the returned error. +func execCmd(arg0 string, args ...string) ([]byte, error) { + out, err := execCommand(arg0, args...).Output() + if err != nil { + if err, ok := err.(*exec.ExitError); ok { + return nil, NestedError{ + Err: err, + NestedErr: fmt.Errorf("%s", bytes.TrimRight(err.Stderr, "\n")), + } + } + return nil, err + } + return out, nil +} + +// Snmp holds the configuration for the plugin. +type Snmp struct { + // The SNMP agent to query. Format is ADDR[:PORT] (e.g. 1.2.3.4:161). + Agents []string + // Timeout to wait for a response. + Timeout internal.Duration + Retries int + // Values: 1, 2, 3 + Version uint8 + + // Parameters for Version 1 & 2 + Community string + + // Parameters for Version 2 & 3 + MaxRepetitions uint + + // Parameters for Version 3 + ContextName string + // Values: "noAuthNoPriv", "authNoPriv", "authPriv" + SecLevel string + SecName string + // Values: "MD5", "SHA", "". Default: "" + AuthProtocol string + AuthPassword string + // Values: "DES", "AES", "". Default: "" + PrivProtocol string + PrivPassword string + EngineID string + EngineBoots uint32 + EngineTime uint32 + + Tables []Table `toml:"table"` + + // Name & Fields are the elements of a Table. + // Telegraf chokes if we try to embed a Table. So instead we have to embed the + // fields of a Table, and construct a Table during runtime. + Name string + Fields []Field `toml:"field"` + + connectionCache map[string]snmpConnection + initialized bool +} + +func (s *Snmp) init() error { + if s.initialized { + return nil + } + + for i := range s.Tables { + if err := s.Tables[i].init(); err != nil { + return err + } + } + + for i := range s.Fields { + if err := s.Fields[i].init(); err != nil { + return err + } + } + + s.initialized = true + return nil +} + +// Table holds the configuration for a SNMP table. +type Table struct { + // Name will be the name of the measurement. + Name string + + // Which tags to inherit from the top-level config. + InheritTags []string + + // Fields is the tags and values to look up. + Fields []Field `toml:"field"` + + // OID for automatic field population. + // If provided, init() will populate Fields with all the table columns of the + // given OID. + Oid string + + initialized bool +} + +// init() populates Fields if a table OID is provided. +func (t *Table) init() error { + if t.initialized { + return nil + } + if t.Oid == "" { + t.initialized = true + return nil + } + + mibPrefix := "" + if err := snmpTranslate(&mibPrefix, &t.Oid, &t.Name); err != nil { + return err + } + + // first attempt to get the table's tags + tagOids := map[string]struct{}{} + // We have to guess that the "entry" oid is `t.Oid+".1"`. snmptable and snmptranslate don't seem to have a way to provide the info. + if out, err := execCmd("snmptranslate", "-m", "all", "-Td", t.Oid+".1"); err == nil { + lines := bytes.Split(out, []byte{'\n'}) + // get the MIB name if we didn't get it above + if mibPrefix == "" { + if i := bytes.Index(lines[0], []byte("::")); i != -1 { + mibPrefix = string(lines[0][:i+2]) + } + } + + for _, line := range lines { + if !bytes.HasPrefix(line, []byte(" INDEX")) { + continue + } + + i := bytes.Index(line, []byte("{ ")) + if i == -1 { // parse error + continue + } + line = line[i+2:] + i = bytes.Index(line, []byte(" }")) + if i == -1 { // parse error + continue + } + line = line[:i] + for _, col := range bytes.Split(line, []byte(", ")) { + tagOids[mibPrefix+string(col)] = struct{}{} + } + } + } + + // this won't actually try to run a query. The `-Ch` will just cause it to dump headers. + out, err := execCmd("snmptable", "-m", "all", "-Ch", "-Cl", "-c", "public", "127.0.0.1", t.Oid) + if err != nil { + return Errorf(err, "getting table columns for %s", t.Oid) + } + cols := bytes.SplitN(out, []byte{'\n'}, 2)[0] + if len(cols) == 0 { + return fmt.Errorf("unable to get columns for table %s", t.Oid) + } + for _, col := range bytes.Split(cols, []byte{' '}) { + if len(col) == 0 { + continue + } + col := string(col) + _, isTag := tagOids[mibPrefix+col] + t.Fields = append(t.Fields, Field{Name: col, Oid: mibPrefix + col, IsTag: isTag}) + } + + // initialize all the nested fields + for i := range t.Fields { + if err := t.Fields[i].init(); err != nil { + return err + } + } + + t.initialized = true + return nil +} + +// Field holds the configuration for a Field to look up. +type Field struct { + // Name will be the name of the field. + Name string + // OID is prefix for this field. The plugin will perform a walk through all + // OIDs with this as their parent. For each value found, the plugin will strip + // off the OID prefix, and use the remainder as the index. For multiple fields + // to show up in the same row, they must share the same index. + Oid string + // IsTag controls whether this OID is output as a tag or a value. + IsTag bool + // Conversion controls any type conversion that is done on the value. + // "float"/"float(0)" will convert the value into a float. + // "float(X)" will convert the value into a float, and then move the decimal before Xth right-most digit. + // "int" will conver the value into an integer. + Conversion string + + initialized bool +} + +// init() converts OID names to numbers, and sets the .Name attribute if unset. +func (f *Field) init() error { + if f.initialized { + return nil + } + + if err := snmpTranslate(nil, &f.Oid, &f.Name); err != nil { + return err + } + + //TODO use textual convention conversion from the MIB + + f.initialized = true + return nil +} + +// RTable is the resulting table built from a Table. +type RTable struct { + // Name is the name of the field, copied from Table.Name. + Name string + // Time is the time the table was built. + Time time.Time + // Rows are the rows that were found, one row for each table OID index found. + Rows []RTableRow +} + +// RTableRow is the resulting row containing all the OID values which shared +// the same index. +type RTableRow struct { + // Tags are all the Field values which had IsTag=true. + Tags map[string]string + // Fields are all the Field values which had IsTag=false. + Fields map[string]interface{} +} + +// NestedError wraps an error returned from deeper in the code. +type NestedError struct { + // Err is the error from where the NestedError was constructed. + Err error + // NestedError is the error that was passed back from the called function. + NestedErr error +} + +// Error returns a concatenated string of all the nested errors. +func (ne NestedError) Error() string { + return ne.Err.Error() + ": " + ne.NestedErr.Error() +} + +// Errorf is a convenience function for constructing a NestedError. +func Errorf(err error, msg string, format ...interface{}) error { + return NestedError{ + NestedErr: err, + Err: fmt.Errorf(msg, format...), + } +} + +func init() { + inputs.Add("snmp", func() telegraf.Input { + return &Snmp{ + Retries: 5, + MaxRepetitions: 50, + Timeout: internal.Duration{Duration: 5 * time.Second}, + Version: 2, + Community: "public", + } + }) +} + +// SampleConfig returns the default configuration of the input. +func (s *Snmp) SampleConfig() string { + return sampleConfig +} + +// Description returns a one-sentence description on the input. +func (s *Snmp) Description() string { + return description +} + +// Gather retrieves all the configured fields and tables. +// Any error encountered does not halt the process. The errors are accumulated +// and returned at the end. +func (s *Snmp) Gather(acc telegraf.Accumulator) error { + if err := s.init(); err != nil { + return err + } + + for _, agent := range s.Agents { + gs, err := s.getConnection(agent) + if err != nil { + acc.AddError(Errorf(err, "agent %s", agent)) + continue + } + + // First is the top-level fields. We treat the fields as table prefixes with an empty index. + t := Table{ + Name: s.Name, + Fields: s.Fields, + } + topTags := map[string]string{} + if err := s.gatherTable(acc, gs, t, topTags, false); err != nil { + acc.AddError(Errorf(err, "agent %s", agent)) + } + + // Now is the real tables. + for _, t := range s.Tables { + if err := s.gatherTable(acc, gs, t, topTags, true); err != nil { + acc.AddError(Errorf(err, "agent %s", agent)) + } + } + } + + return nil +} + +func (s *Snmp) gatherTable(acc telegraf.Accumulator, gs snmpConnection, t Table, topTags map[string]string, walk bool) error { + rt, err := t.Build(gs, walk) + if err != nil { + return err + } + + for _, tr := range rt.Rows { + if !walk { + // top-level table. Add tags to topTags. + for k, v := range tr.Tags { + topTags[k] = v + } + } else { + // real table. Inherit any specified tags. + for _, k := range t.InheritTags { + if v, ok := topTags[k]; ok { + tr.Tags[k] = v + } + } + } + if _, ok := tr.Tags["agent_host"]; !ok { + tr.Tags["agent_host"] = gs.Host() + } + acc.AddFields(rt.Name, tr.Fields, tr.Tags, rt.Time) + } + + return nil +} + +// Build retrieves all the fields specified in the table and constructs the RTable. +func (t Table) Build(gs snmpConnection, walk bool) (*RTable, error) { + rows := map[string]RTableRow{} + + tagCount := 0 + for _, f := range t.Fields { + if f.IsTag { + tagCount++ + } + + if len(f.Oid) == 0 { + return nil, fmt.Errorf("cannot have empty OID") + } + var oid string + if f.Oid[0] == '.' { + oid = f.Oid + } else { + // make sure OID has "." because the BulkWalkAll results do, and the prefix needs to match + oid = "." + f.Oid + } + + // ifv contains a mapping of table OID index to field value + ifv := map[string]interface{}{} + + if !walk { + // This is used when fetching non-table fields. Fields configured a the top + // scope of the plugin. + // We fetch the fields directly, and add them to ifv as if the index were an + // empty string. This results in all the non-table fields sharing the same + // index, and being added on the same row. + if pkt, err := gs.Get([]string{oid}); err != nil { + return nil, Errorf(err, "performing get") + } else if pkt != nil && len(pkt.Variables) > 0 && pkt.Variables[0].Type != gosnmp.NoSuchObject { + ent := pkt.Variables[0] + ifv[ent.Name[len(oid):]] = fieldConvert(f.Conversion, ent.Value) + } + } else { + err := gs.Walk(oid, func(ent gosnmp.SnmpPDU) error { + if len(ent.Name) <= len(oid) || ent.Name[:len(oid)+1] != oid+"." { + return NestedError{} // break the walk + } + ifv[ent.Name[len(oid):]] = fieldConvert(f.Conversion, ent.Value) + return nil + }) + if err != nil { + if _, ok := err.(NestedError); !ok { + return nil, Errorf(err, "performing bulk walk") + } + } + } + + for i, v := range ifv { + rtr, ok := rows[i] + if !ok { + rtr = RTableRow{} + rtr.Tags = map[string]string{} + rtr.Fields = map[string]interface{}{} + rows[i] = rtr + } + if f.IsTag { + if vs, ok := v.(string); ok { + rtr.Tags[f.Name] = vs + } else { + rtr.Tags[f.Name] = fmt.Sprintf("%v", v) + } + } else { + rtr.Fields[f.Name] = v + } + } + } + + rt := RTable{ + Name: t.Name, + Time: time.Now(), //TODO record time at start + Rows: make([]RTableRow, 0, len(rows)), + } + for _, r := range rows { + if len(r.Tags) < tagCount { + // don't add rows which are missing tags, as without tags you can't filter + continue + } + rt.Rows = append(rt.Rows, r) + } + return &rt, nil +} + +// snmpConnection is an interface which wraps a *gosnmp.GoSNMP object. +// We interact through an interface so we can mock it out in tests. +type snmpConnection interface { + Host() string + //BulkWalkAll(string) ([]gosnmp.SnmpPDU, error) + Walk(string, gosnmp.WalkFunc) error + Get(oids []string) (*gosnmp.SnmpPacket, error) +} + +// gosnmpWrapper wraps a *gosnmp.GoSNMP object so we can use it as a snmpConnection. +type gosnmpWrapper struct { + *gosnmp.GoSNMP +} + +// Host returns the value of GoSNMP.Target. +func (gsw gosnmpWrapper) Host() string { + return gsw.Target +} + +// Walk wraps GoSNMP.Walk() or GoSNMP.BulkWalk(), depending on whether the +// connection is using SNMPv1 or newer. +// Also, if any error is encountered, it will just once reconnect and try again. +func (gsw gosnmpWrapper) Walk(oid string, fn gosnmp.WalkFunc) error { + var err error + // On error, retry once. + // Unfortunately we can't distinguish between an error returned by gosnmp, and one returned by the walk function. + for i := 0; i < 2; i++ { + if gsw.Version == gosnmp.Version1 { + err = gsw.GoSNMP.Walk(oid, fn) + } else { + err = gsw.GoSNMP.BulkWalk(oid, fn) + } + if err == nil { + return nil + } + if err := gsw.GoSNMP.Connect(); err != nil { + return Errorf(err, "reconnecting") + } + } + return err +} + +// Get wraps GoSNMP.GET(). +// If any error is encountered, it will just once reconnect and try again. +func (gsw gosnmpWrapper) Get(oids []string) (*gosnmp.SnmpPacket, error) { + var err error + var pkt *gosnmp.SnmpPacket + for i := 0; i < 2; i++ { + pkt, err = gsw.GoSNMP.Get(oids) + if err == nil { + return pkt, nil + } + if err := gsw.GoSNMP.Connect(); err != nil { + return nil, Errorf(err, "reconnecting") + } + } + return nil, err +} + +// getConnection creates a snmpConnection (*gosnmp.GoSNMP) object and caches the +// result using `agent` as the cache key. +func (s *Snmp) getConnection(agent string) (snmpConnection, error) { + if s.connectionCache == nil { + s.connectionCache = map[string]snmpConnection{} + } + if gs, ok := s.connectionCache[agent]; ok { + return gs, nil + } + + gs := gosnmpWrapper{&gosnmp.GoSNMP{}} + + host, portStr, err := net.SplitHostPort(agent) + if err != nil { + if err, ok := err.(*net.AddrError); !ok || err.Err != "missing port in address" { + return nil, Errorf(err, "parsing host") + } + host = agent + portStr = "161" + } + gs.Target = host + + port, err := strconv.ParseUint(portStr, 10, 16) + if err != nil { + return nil, Errorf(err, "parsing port") + } + gs.Port = uint16(port) + + gs.Timeout = s.Timeout.Duration + + gs.Retries = s.Retries + + switch s.Version { + case 3: + gs.Version = gosnmp.Version3 + case 2, 0: + gs.Version = gosnmp.Version2c + case 1: + gs.Version = gosnmp.Version1 + default: + return nil, fmt.Errorf("invalid version") + } + + if s.Version < 3 { + if s.Community == "" { + gs.Community = "public" + } else { + gs.Community = s.Community + } + } + + gs.MaxRepetitions = int(s.MaxRepetitions) + + if s.Version == 3 { + gs.ContextName = s.ContextName + + sp := &gosnmp.UsmSecurityParameters{} + gs.SecurityParameters = sp + gs.SecurityModel = gosnmp.UserSecurityModel + + switch strings.ToLower(s.SecLevel) { + case "noauthnopriv", "": + gs.MsgFlags = gosnmp.NoAuthNoPriv + case "authnopriv": + gs.MsgFlags = gosnmp.AuthNoPriv + case "authpriv": + gs.MsgFlags = gosnmp.AuthPriv + default: + return nil, fmt.Errorf("invalid secLevel") + } + + sp.UserName = s.SecName + + switch strings.ToLower(s.AuthProtocol) { + case "md5": + sp.AuthenticationProtocol = gosnmp.MD5 + case "sha": + sp.AuthenticationProtocol = gosnmp.SHA + case "": + sp.AuthenticationProtocol = gosnmp.NoAuth + default: + return nil, fmt.Errorf("invalid authProtocol") + } + + sp.AuthenticationPassphrase = s.AuthPassword + + switch strings.ToLower(s.PrivProtocol) { + case "des": + sp.PrivacyProtocol = gosnmp.DES + case "aes": + sp.PrivacyProtocol = gosnmp.AES + case "": + sp.PrivacyProtocol = gosnmp.NoPriv + default: + return nil, fmt.Errorf("invalid privProtocol") + } + + sp.PrivacyPassphrase = s.PrivPassword + + sp.AuthoritativeEngineID = s.EngineID + + sp.AuthoritativeEngineBoots = s.EngineBoots + + sp.AuthoritativeEngineTime = s.EngineTime + } + + if err := gs.Connect(); err != nil { + return nil, Errorf(err, "setting up connection") + } + + s.connectionCache[agent] = gs + return gs, nil +} + +// fieldConvert converts from any type according to the conv specification +// "float"/"float(0)" will convert the value into a float. +// "float(X)" will convert the value into a float, and then move the decimal before Xth right-most digit. +// "int" will convert the value into an integer. +// "" will convert a byte slice into a string. +// Any other conv will return the input value unchanged. +func fieldConvert(conv string, v interface{}) interface{} { + if conv == "" { + if bs, ok := v.([]byte); ok { + return string(bs) + } + return v + } + + var d int + if _, err := fmt.Sscanf(conv, "float(%d)", &d); err == nil || conv == "float" { + switch vt := v.(type) { + case float32: + v = float64(vt) / math.Pow10(d) + case float64: + v = float64(vt) / math.Pow10(d) + case int: + v = float64(vt) / math.Pow10(d) + case int8: + v = float64(vt) / math.Pow10(d) + case int16: + v = float64(vt) / math.Pow10(d) + case int32: + v = float64(vt) / math.Pow10(d) + case int64: + v = float64(vt) / math.Pow10(d) + case uint: + v = float64(vt) / math.Pow10(d) + case uint8: + v = float64(vt) / math.Pow10(d) + case uint16: + v = float64(vt) / math.Pow10(d) + case uint32: + v = float64(vt) / math.Pow10(d) + case uint64: + v = float64(vt) / math.Pow10(d) + case []byte: + vf, _ := strconv.ParseFloat(string(vt), 64) + v = vf / math.Pow10(d) + case string: + vf, _ := strconv.ParseFloat(vt, 64) + v = vf / math.Pow10(d) + } + } + if conv == "int" { + switch vt := v.(type) { + case float32: + v = int64(vt) + case float64: + v = int64(vt) + case int: + v = int64(vt) + case int8: + v = int64(vt) + case int16: + v = int64(vt) + case int32: + v = int64(vt) + case int64: + v = int64(vt) + case uint: + v = int64(vt) + case uint8: + v = int64(vt) + case uint16: + v = int64(vt) + case uint32: + v = int64(vt) + case uint64: + v = int64(vt) + case []byte: + v, _ = strconv.Atoi(string(vt)) + case string: + v, _ = strconv.Atoi(vt) + } + } + + return v +} + +// snmpTranslate resolves the given OID. +// The contents of the oid parameter will be replaced with the numeric oid value. +// If name is empty, the textual OID value is stored in it. If the textual OID cannot be translated, the numeric OID is stored instead. +// If mibPrefix is non-nil, the MIB in which the OID was found is stored, with a suffix of "::". +func snmpTranslate(mibPrefix *string, oid *string, name *string) error { + if strings.ContainsAny(*oid, ":abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") { + out, err := execCmd("snmptranslate", "-m", "all", "-On", *oid) + if err != nil { + return Errorf(err, "translating %s", *oid) + } + *oid = string(bytes.TrimSuffix(out, []byte{'\n'})) + } + + if *name == "" { + out, err := execCmd("snmptranslate", "-m", "all", *oid) + if err != nil { + //TODO debug message + *name = *oid + } else { + if i := bytes.Index(out, []byte("::")); i != -1 { + if mibPrefix != nil { + *mibPrefix = string(out[:i+2]) + } + out = out[i+2:] + } + *name = string(bytes.TrimSuffix(out, []byte{'\n'})) + } + } + + return nil +} diff --git a/plugins/inputs/snmp/snmp_test.go b/plugins/inputs/snmp/snmp_test.go new file mode 100644 index 000000000..62f3e6c2f --- /dev/null +++ b/plugins/inputs/snmp/snmp_test.go @@ -0,0 +1,641 @@ +package snmp + +import ( + "fmt" + "net" + "os" + "os/exec" + "strings" + "sync" + "testing" + "time" + + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/testutil" + "github.com/influxdata/toml" + "github.com/soniah/gosnmp" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func mockExecCommand(arg0 string, args ...string) *exec.Cmd { + args = append([]string{"-test.run=TestMockExecCommand", "--", arg0}, args...) + cmd := exec.Command(os.Args[0], args...) + cmd.Stderr = os.Stderr // so the test output shows errors + return cmd +} +func TestMockExecCommand(t *testing.T) { + var cmd []string + for _, arg := range os.Args { + if string(arg) == "--" { + cmd = []string{} + continue + } + if cmd == nil { + continue + } + cmd = append(cmd, string(arg)) + } + if cmd == nil { + return + } + + // will not properly handle args with spaces, but it's good enough + cmdStr := strings.Join(cmd, " ") + switch cmdStr { + case "snmptranslate -m all .1.0.0.0": + fmt.Printf("TEST::testTable\n") + case "snmptranslate -m all .1.0.0.0.1.1": + fmt.Printf("server\n") + case "snmptranslate -m all .1.0.0.0.1.1.0": + fmt.Printf("server.0\n") + case "snmptranslate -m all .1.0.0.1.1": + fmt.Printf("hostname\n") + case "snmptranslate -m all .999": + fmt.Printf(".999\n") + case "snmptranslate -m all -On TEST::testTable": + fmt.Printf(".1.0.0.0\n") + case "snmptranslate -m all -On TEST::hostname": + fmt.Printf(".1.0.0.1.1\n") + case "snmptranslate -m all -On TEST::server": + fmt.Printf(".1.0.0.0.1.1\n") + case "snmptranslate -m all -On TEST::connections": + fmt.Printf(".1.0.0.0.1.2\n") + case "snmptranslate -m all -On TEST::latency": + fmt.Printf(".1.0.0.0.1.3\n") + case "snmptranslate -m all -On TEST::server.0": + fmt.Printf(".1.0.0.0.1.1.0\n") + case "snmptranslate -m all -Td .1.0.0.0.1": + fmt.Printf(`TEST::testTableEntry +testTableEntry OBJECT-TYPE + -- FROM TEST + MAX-ACCESS not-accessible + STATUS current + INDEX { server } +::= { iso(1) 2 testOID(3) testTable(0) 1 } +`) + case "snmptable -m all -Ch -Cl -c public 127.0.0.1 .1.0.0.0": + fmt.Printf(`server connections latency +TEST::testTable: No entries +`) + default: + fmt.Fprintf(os.Stderr, "Command not mocked: `%s`\n", cmdStr) + // you get the expected output by running the missing command with `-M testdata` in the plugin directory. + os.Exit(1) + } + os.Exit(0) +} +func init() { + execCommand = mockExecCommand +} + +type testSNMPConnection struct { + host string + values map[string]interface{} +} + +func (tsc *testSNMPConnection) Host() string { + return tsc.host +} + +func (tsc *testSNMPConnection) Get(oids []string) (*gosnmp.SnmpPacket, error) { + sp := &gosnmp.SnmpPacket{} + for _, oid := range oids { + v, ok := tsc.values[oid] + if !ok { + sp.Variables = append(sp.Variables, gosnmp.SnmpPDU{ + Name: oid, + Type: gosnmp.NoSuchObject, + }) + continue + } + sp.Variables = append(sp.Variables, gosnmp.SnmpPDU{ + Name: oid, + Value: v, + }) + } + return sp, nil +} +func (tsc *testSNMPConnection) Walk(oid string, wf gosnmp.WalkFunc) error { + for void, v := range tsc.values { + if void == oid || (len(void) > len(oid) && void[:len(oid)+1] == oid+".") { + if err := wf(gosnmp.SnmpPDU{ + Name: void, + Value: v, + }); err != nil { + return err + } + } + } + return nil +} + +var tsc = &testSNMPConnection{ + host: "tsc", + values: map[string]interface{}{ + ".1.0.0.0.1.1.0": "foo", + ".1.0.0.0.1.1.1": []byte("bar"), + ".1.0.0.0.1.102": "bad", + ".1.0.0.0.1.2.0": 1, + ".1.0.0.0.1.2.1": 2, + ".1.0.0.0.1.3.0": "0.123", + ".1.0.0.0.1.3.1": "0.456", + ".1.0.0.0.1.3.2": "9.999", + ".1.0.0.0.1.4.0": 123456, + ".1.0.0.1.1": "baz", + ".1.0.0.1.2": 234, + ".1.0.0.1.3": []byte("byte slice"), + }, +} + +func TestSampleConfig(t *testing.T) { + conf := struct { + Inputs struct { + Snmp []*Snmp + } + }{} + err := toml.Unmarshal([]byte("[[inputs.snmp]]\n"+(*Snmp)(nil).SampleConfig()), &conf) + assert.NoError(t, err) + + s := Snmp{ + Agents: []string{"127.0.0.1:161"}, + Timeout: internal.Duration{Duration: 5 * time.Second}, + Version: 2, + Community: "public", + MaxRepetitions: 50, + + Name: "system", + Fields: []Field{ + {Name: "hostname", Oid: ".1.0.0.1.1"}, + {Name: "uptime", Oid: ".1.0.0.1.2"}, + {Name: "load", Oid: ".1.0.0.1.3"}, + {Oid: "HOST-RESOURCES-MIB::hrMemorySize"}, + }, + Tables: []Table{ + { + Name: "remote_servers", + InheritTags: []string{"hostname"}, + Fields: []Field{ + {Name: "server", Oid: ".1.0.0.0.1.0", IsTag: true}, + {Name: "connections", Oid: ".1.0.0.0.1.1"}, + {Name: "latency", Oid: ".1.0.0.0.1.2"}, + }, + }, + { + Oid: "HOST-RESOURCES-MIB::hrNetworkTable", + }, + }, + } + assert.Equal(t, s, *conf.Inputs.Snmp[0]) +} + +func TestFieldInit(t *testing.T) { + translations := []struct { + inputOid string + inputName string + expectedOid string + expectedName string + }{ + {".1.0.0.0.1.1", "", ".1.0.0.0.1.1", "server"}, + {".1.0.0.0.1.1.0", "", ".1.0.0.0.1.1.0", "server.0"}, + {".999", "", ".999", ".999"}, + {"TEST::server", "", ".1.0.0.0.1.1", "server"}, + {"TEST::server.0", "", ".1.0.0.0.1.1.0", "server.0"}, + {"TEST::server", "foo", ".1.0.0.0.1.1", "foo"}, + } + + for _, txl := range translations { + f := Field{Oid: txl.inputOid, Name: txl.inputName} + err := f.init() + if !assert.NoError(t, err, "inputOid='%s' inputName='%s'", txl.inputOid, txl.inputName) { + continue + } + assert.Equal(t, txl.expectedOid, f.Oid, "inputOid='%s' inputName='%s'", txl.inputOid, txl.inputName) + assert.Equal(t, txl.expectedName, f.Name, "inputOid='%s' inputName='%s'", txl.inputOid, txl.inputName) + } +} + +func TestTableInit(t *testing.T) { + tbl := Table{ + Oid: ".1.0.0.0", + Fields: []Field{{Oid: ".999", Name: "foo"}}, + } + err := tbl.init() + require.NoError(t, err) + + assert.Equal(t, "testTable", tbl.Name) + + assert.Len(t, tbl.Fields, 4) + assert.Contains(t, tbl.Fields, Field{Oid: ".999", Name: "foo", initialized: true}) + assert.Contains(t, tbl.Fields, Field{Oid: ".1.0.0.0.1.1", Name: "server", IsTag: true, initialized: true}) + assert.Contains(t, tbl.Fields, Field{Oid: ".1.0.0.0.1.2", Name: "connections", initialized: true}) + assert.Contains(t, tbl.Fields, Field{Oid: ".1.0.0.0.1.3", Name: "latency", initialized: true}) +} + +func TestSnmpInit(t *testing.T) { + s := &Snmp{ + Tables: []Table{ + {Oid: "TEST::testTable"}, + }, + Fields: []Field{ + {Oid: "TEST::hostname"}, + }, + } + + err := s.init() + require.NoError(t, err) + + assert.Len(t, s.Tables[0].Fields, 3) + assert.Contains(t, s.Tables[0].Fields, Field{Oid: ".1.0.0.0.1.1", Name: "server", IsTag: true, initialized: true}) + assert.Contains(t, s.Tables[0].Fields, Field{Oid: ".1.0.0.0.1.2", Name: "connections", initialized: true}) + assert.Contains(t, s.Tables[0].Fields, Field{Oid: ".1.0.0.0.1.3", Name: "latency", initialized: true}) + + assert.Equal(t, Field{ + Oid: ".1.0.0.1.1", + Name: "hostname", + initialized: true, + }, s.Fields[0]) +} + +func TestGetSNMPConnection_v2(t *testing.T) { + s := &Snmp{ + Timeout: internal.Duration{Duration: 3 * time.Second}, + Retries: 4, + Version: 2, + Community: "foo", + } + + gsc, err := s.getConnection("1.2.3.4:567") + require.NoError(t, err) + gs := gsc.(gosnmpWrapper) + assert.Equal(t, "1.2.3.4", gs.Target) + assert.EqualValues(t, 567, gs.Port) + assert.Equal(t, gosnmp.Version2c, gs.Version) + assert.Equal(t, "foo", gs.Community) + + gsc, err = s.getConnection("1.2.3.4") + require.NoError(t, err) + gs = gsc.(gosnmpWrapper) + assert.Equal(t, "1.2.3.4", gs.Target) + assert.EqualValues(t, 161, gs.Port) +} + +func TestGetSNMPConnection_v3(t *testing.T) { + s := &Snmp{ + Version: 3, + MaxRepetitions: 20, + ContextName: "mycontext", + SecLevel: "authPriv", + SecName: "myuser", + AuthProtocol: "md5", + AuthPassword: "password123", + PrivProtocol: "des", + PrivPassword: "321drowssap", + EngineID: "myengineid", + EngineBoots: 1, + EngineTime: 2, + } + + gsc, err := s.getConnection("1.2.3.4") + require.NoError(t, err) + gs := gsc.(gosnmpWrapper) + assert.Equal(t, gs.Version, gosnmp.Version3) + sp := gs.SecurityParameters.(*gosnmp.UsmSecurityParameters) + assert.Equal(t, "1.2.3.4", gsc.Host()) + assert.Equal(t, 20, gs.MaxRepetitions) + assert.Equal(t, "mycontext", gs.ContextName) + assert.Equal(t, gosnmp.AuthPriv, gs.MsgFlags&gosnmp.AuthPriv) + assert.Equal(t, "myuser", sp.UserName) + assert.Equal(t, gosnmp.MD5, sp.AuthenticationProtocol) + assert.Equal(t, "password123", sp.AuthenticationPassphrase) + assert.Equal(t, gosnmp.DES, sp.PrivacyProtocol) + assert.Equal(t, "321drowssap", sp.PrivacyPassphrase) + assert.Equal(t, "myengineid", sp.AuthoritativeEngineID) + assert.EqualValues(t, 1, sp.AuthoritativeEngineBoots) + assert.EqualValues(t, 2, sp.AuthoritativeEngineTime) +} + +func TestGetSNMPConnection_caching(t *testing.T) { + s := &Snmp{} + gs1, err := s.getConnection("1.2.3.4") + require.NoError(t, err) + gs2, err := s.getConnection("1.2.3.4") + require.NoError(t, err) + gs3, err := s.getConnection("1.2.3.5") + require.NoError(t, err) + assert.True(t, gs1 == gs2) + assert.False(t, gs2 == gs3) +} + +func TestGosnmpWrapper_walk_retry(t *testing.T) { + srvr, err := net.ListenUDP("udp4", &net.UDPAddr{}) + defer srvr.Close() + require.NoError(t, err) + reqCount := 0 + // Set up a WaitGroup to wait for the server goroutine to exit and protect + // reqCount. + // Even though simultaneous access is impossible because the server will be + // blocked on ReadFrom, without this the race detector gets unhappy. + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + buf := make([]byte, 256) + for { + _, addr, err := srvr.ReadFrom(buf) + if err != nil { + return + } + reqCount++ + + srvr.WriteTo([]byte{'X'}, addr) // will cause decoding error + } + }() + + gs := &gosnmp.GoSNMP{ + Target: srvr.LocalAddr().(*net.UDPAddr).IP.String(), + Port: uint16(srvr.LocalAddr().(*net.UDPAddr).Port), + Version: gosnmp.Version2c, + Community: "public", + Timeout: time.Millisecond * 10, + Retries: 1, + } + err = gs.Connect() + require.NoError(t, err) + conn := gs.Conn + + gsw := gosnmpWrapper{gs} + err = gsw.Walk(".1.0.0", func(_ gosnmp.SnmpPDU) error { return nil }) + srvr.Close() + wg.Wait() + assert.Error(t, err) + assert.False(t, gs.Conn == conn) + assert.Equal(t, (gs.Retries+1)*2, reqCount) +} + +func TestGosnmpWrapper_get_retry(t *testing.T) { + srvr, err := net.ListenUDP("udp4", &net.UDPAddr{}) + defer srvr.Close() + require.NoError(t, err) + reqCount := 0 + // Set up a WaitGroup to wait for the server goroutine to exit and protect + // reqCount. + // Even though simultaneous access is impossible because the server will be + // blocked on ReadFrom, without this the race detector gets unhappy. + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + buf := make([]byte, 256) + for { + _, addr, err := srvr.ReadFrom(buf) + if err != nil { + return + } + reqCount++ + + srvr.WriteTo([]byte{'X'}, addr) // will cause decoding error + } + }() + + gs := &gosnmp.GoSNMP{ + Target: srvr.LocalAddr().(*net.UDPAddr).IP.String(), + Port: uint16(srvr.LocalAddr().(*net.UDPAddr).Port), + Version: gosnmp.Version2c, + Community: "public", + Timeout: time.Millisecond * 10, + Retries: 1, + } + err = gs.Connect() + require.NoError(t, err) + conn := gs.Conn + + gsw := gosnmpWrapper{gs} + _, err = gsw.Get([]string{".1.0.0"}) + srvr.Close() + wg.Wait() + assert.Error(t, err) + assert.False(t, gs.Conn == conn) + assert.Equal(t, (gs.Retries+1)*2, reqCount) +} + +func TestTableBuild_walk(t *testing.T) { + tbl := Table{ + Name: "mytable", + Fields: []Field{ + { + Name: "myfield1", + Oid: ".1.0.0.0.1.1", + IsTag: true, + }, + { + Name: "myfield2", + Oid: ".1.0.0.0.1.2", + }, + { + Name: "myfield3", + Oid: ".1.0.0.0.1.3", + Conversion: "float", + }, + }, + } + + tb, err := tbl.Build(tsc, true) + require.NoError(t, err) + + assert.Equal(t, tb.Name, "mytable") + rtr1 := RTableRow{ + Tags: map[string]string{"myfield1": "foo"}, + Fields: map[string]interface{}{"myfield2": 1, "myfield3": float64(0.123)}, + } + rtr2 := RTableRow{ + Tags: map[string]string{"myfield1": "bar"}, + Fields: map[string]interface{}{"myfield2": 2, "myfield3": float64(0.456)}, + } + assert.Len(t, tb.Rows, 2) + assert.Contains(t, tb.Rows, rtr1) + assert.Contains(t, tb.Rows, rtr2) +} + +func TestTableBuild_noWalk(t *testing.T) { + tbl := Table{ + Name: "mytable", + Fields: []Field{ + { + Name: "myfield1", + Oid: ".1.0.0.1.1", + IsTag: true, + }, + { + Name: "myfield2", + Oid: ".1.0.0.1.2", + }, + { + Name: "myfield3", + Oid: ".1.0.0.1.2", + IsTag: true, + }, + }, + } + + tb, err := tbl.Build(tsc, false) + require.NoError(t, err) + + rtr := RTableRow{ + Tags: map[string]string{"myfield1": "baz", "myfield3": "234"}, + Fields: map[string]interface{}{"myfield2": 234}, + } + assert.Len(t, tb.Rows, 1) + assert.Contains(t, tb.Rows, rtr) +} + +func TestGather(t *testing.T) { + s := &Snmp{ + Agents: []string{"TestGather"}, + Name: "mytable", + Fields: []Field{ + { + Name: "myfield1", + Oid: ".1.0.0.1.1", + IsTag: true, + }, + { + Name: "myfield2", + Oid: ".1.0.0.1.2", + }, + { + Name: "myfield3", + Oid: "1.0.0.1.1", + }, + }, + Tables: []Table{ + { + Name: "myOtherTable", + InheritTags: []string{"myfield1"}, + Fields: []Field{ + { + Name: "myOtherField", + Oid: ".1.0.0.0.1.4", + }, + }, + }, + }, + + connectionCache: map[string]snmpConnection{ + "TestGather": tsc, + }, + } + + acc := &testutil.Accumulator{} + + tstart := time.Now() + s.Gather(acc) + tstop := time.Now() + + require.Len(t, acc.Metrics, 2) + + m := acc.Metrics[0] + assert.Equal(t, "mytable", m.Measurement) + assert.Equal(t, "tsc", m.Tags["agent_host"]) + assert.Equal(t, "baz", m.Tags["myfield1"]) + assert.Len(t, m.Fields, 2) + assert.Equal(t, 234, m.Fields["myfield2"]) + assert.Equal(t, "baz", m.Fields["myfield3"]) + assert.True(t, tstart.Before(m.Time)) + assert.True(t, tstop.After(m.Time)) + + m2 := acc.Metrics[1] + assert.Equal(t, "myOtherTable", m2.Measurement) + assert.Equal(t, "tsc", m2.Tags["agent_host"]) + assert.Equal(t, "baz", m2.Tags["myfield1"]) + assert.Len(t, m2.Fields, 1) + assert.Equal(t, 123456, m2.Fields["myOtherField"]) +} + +func TestGather_host(t *testing.T) { + s := &Snmp{ + Agents: []string{"TestGather"}, + Name: "mytable", + Fields: []Field{ + { + Name: "host", + Oid: ".1.0.0.1.1", + IsTag: true, + }, + { + Name: "myfield2", + Oid: ".1.0.0.1.2", + }, + }, + + connectionCache: map[string]snmpConnection{ + "TestGather": tsc, + }, + } + + acc := &testutil.Accumulator{} + + s.Gather(acc) + + require.Len(t, acc.Metrics, 1) + m := acc.Metrics[0] + assert.Equal(t, "baz", m.Tags["host"]) +} + +func TestFieldConvert(t *testing.T) { + testTable := []struct { + input interface{} + conv string + expected interface{} + }{ + {[]byte("foo"), "", string("foo")}, + {"0.123", "float", float64(0.123)}, + {[]byte("0.123"), "float", float64(0.123)}, + {float32(0.123), "float", float64(float32(0.123))}, + {float64(0.123), "float", float64(0.123)}, + {123, "float", float64(123)}, + {123, "float(0)", float64(123)}, + {123, "float(4)", float64(0.0123)}, + {int8(123), "float(3)", float64(0.123)}, + {int16(123), "float(3)", float64(0.123)}, + {int32(123), "float(3)", float64(0.123)}, + {int64(123), "float(3)", float64(0.123)}, + {uint(123), "float(3)", float64(0.123)}, + {uint8(123), "float(3)", float64(0.123)}, + {uint16(123), "float(3)", float64(0.123)}, + {uint32(123), "float(3)", float64(0.123)}, + {uint64(123), "float(3)", float64(0.123)}, + {"123", "int", int64(123)}, + {[]byte("123"), "int", int64(123)}, + {float32(12.3), "int", int64(12)}, + {float64(12.3), "int", int64(12)}, + {int(123), "int", int64(123)}, + {int8(123), "int", int64(123)}, + {int16(123), "int", int64(123)}, + {int32(123), "int", int64(123)}, + {int64(123), "int", int64(123)}, + {uint(123), "int", int64(123)}, + {uint8(123), "int", int64(123)}, + {uint16(123), "int", int64(123)}, + {uint32(123), "int", int64(123)}, + {uint64(123), "int", int64(123)}, + } + + for _, tc := range testTable { + act := fieldConvert(tc.conv, tc.input) + assert.EqualValues(t, tc.expected, act, "input=%T(%v) conv=%s expected=%T(%v)", tc.input, tc.input, tc.conv, tc.expected, tc.expected) + } +} + +func TestError(t *testing.T) { + e := fmt.Errorf("nested error") + err := Errorf(e, "top error %d", 123) + require.Error(t, err) + + ne, ok := err.(NestedError) + require.True(t, ok) + assert.Equal(t, e, ne.NestedErr) + + assert.Contains(t, err.Error(), "top error 123") + assert.Contains(t, err.Error(), "nested error") +} diff --git a/plugins/inputs/snmp/testdata/snmpd.conf b/plugins/inputs/snmp/testdata/snmpd.conf new file mode 100644 index 000000000..3f3151a65 --- /dev/null +++ b/plugins/inputs/snmp/testdata/snmpd.conf @@ -0,0 +1,17 @@ +# This config provides the data represented in the plugin documentation +# Requires net-snmp >= 5.7 + +#agentaddress UDP:127.0.0.1:1161 +rocommunity public + +override .1.0.0.0.1.1.0 octet_str "foo" +override .1.0.0.0.1.1.1 octet_str "bar" +override .1.0.0.0.1.102 octet_str "bad" +override .1.0.0.0.1.2.0 integer 1 +override .1.0.0.0.1.2.1 integer 2 +override .1.0.0.0.1.3.0 octet_str "0.123" +override .1.0.0.0.1.3.1 octet_str "0.456" +override .1.0.0.0.1.3.2 octet_str "9.999" +override .1.0.0.1.1 octet_str "baz" +override .1.0.0.1.2 uinteger 54321 +override .1.0.0.1.3 uinteger 234 diff --git a/plugins/inputs/snmp/testdata/test.mib b/plugins/inputs/snmp/testdata/test.mib new file mode 100644 index 000000000..d3246673b --- /dev/null +++ b/plugins/inputs/snmp/testdata/test.mib @@ -0,0 +1,51 @@ +TEST DEFINITIONS ::= BEGIN + +testOID ::= { 1 0 0 } + +testTable OBJECT-TYPE + SYNTAX SEQUENCE OF testTableEntry + MAX-ACCESS not-accessible + STATUS current + ::= { testOID 0 } + +testTableEntry OBJECT-TYPE + SYNTAX TestTableEntry + MAX-ACCESS not-accessible + STATUS current + INDEX { + server + } + ::= { testTable 1 } + +TestTableEntry ::= + SEQUENCE { + server OCTET STRING, + connections INTEGER, + latency OCTET STRING, + } + +server OBJECT-TYPE + SYNTAX OCTET STRING + MAX-ACCESS read-only + STATUS current + ::= { testTableEntry 1 } + +connections OBJECT-TYPE + SYNTAX INTEGER + MAX-ACCESS read-only + STATUS current + ::= { testTableEntry 2 } + +latency OBJECT-TYPE + SYNTAX OCTET STRING + MAX-ACCESS read-only + STATUS current + ::= { testTableEntry 3 } + +hostname OBJECT-TYPE + SYNTAX OCTET STRING + MAX-ACCESS read-only + STATUS current + ::= { testOID 1 1 } + +END diff --git a/plugins/inputs/snmp_legacy/snmp_legacy_test.go b/plugins/inputs/snmp_legacy/snmp_legacy_test.go deleted file mode 100644 index a6bf2922b..000000000 --- a/plugins/inputs/snmp_legacy/snmp_legacy_test.go +++ /dev/null @@ -1,482 +0,0 @@ -package snmp_legacy - -import ( - "testing" - - "github.com/influxdata/telegraf/testutil" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestSNMPErrorGet1(t *testing.T) { - get1 := Data{ - Name: "oid1", - Unit: "octets", - Oid: ".1.3.6.1.2.1.2.2.1.16.1", - } - h := Host{ - Collect: []string{"oid1"}, - } - s := Snmp{ - SnmptranslateFile: "bad_oid.txt", - Host: []Host{h}, - Get: []Data{get1}, - } - - var acc testutil.Accumulator - err := s.Gather(&acc) - require.Error(t, err) -} - -func TestSNMPErrorGet2(t *testing.T) { - get1 := Data{ - Name: "oid1", - Unit: "octets", - Oid: ".1.3.6.1.2.1.2.2.1.16.1", - } - h := Host{ - Collect: []string{"oid1"}, - } - s := Snmp{ - Host: []Host{h}, - Get: []Data{get1}, - } - - var acc testutil.Accumulator - err := s.Gather(&acc) - require.NoError(t, err) - assert.Equal(t, 0, len(acc.Metrics)) -} - -func TestSNMPErrorBulk(t *testing.T) { - bulk1 := Data{ - Name: "oid1", - Unit: "octets", - Oid: ".1.3.6.1.2.1.2.2.1.16", - } - h := Host{ - Address: testutil.GetLocalHost(), - Collect: []string{"oid1"}, - } - s := Snmp{ - Host: []Host{h}, - Bulk: []Data{bulk1}, - } - - var acc testutil.Accumulator - err := s.Gather(&acc) - require.NoError(t, err) - assert.Equal(t, 0, len(acc.Metrics)) -} - -func TestSNMPGet1(t *testing.T) { - if testing.Short() { - t.Skip("Skipping integration test in short mode") - } - get1 := Data{ - Name: "oid1", - Unit: "octets", - Oid: ".1.3.6.1.2.1.2.2.1.16.1", - } - h := Host{ - Address: testutil.GetLocalHost() + ":31161", - Community: "telegraf", - Version: 2, - Timeout: 2.0, - Retries: 2, - Collect: []string{"oid1"}, - } - s := Snmp{ - Host: []Host{h}, - Get: []Data{get1}, - } - - var acc testutil.Accumulator - err := s.Gather(&acc) - require.NoError(t, err) - - acc.AssertContainsTaggedFields(t, - "oid1", - map[string]interface{}{ - "oid1": uint(543846), - }, - map[string]string{ - "unit": "octets", - "snmp_host": testutil.GetLocalHost(), - }, - ) -} - -func TestSNMPGet2(t *testing.T) { - if testing.Short() { - t.Skip("Skipping integration test in short mode") - } - get1 := Data{ - Name: "oid1", - Oid: "ifNumber", - } - h := Host{ - Address: testutil.GetLocalHost() + ":31161", - Community: "telegraf", - Version: 2, - Timeout: 2.0, - Retries: 2, - Collect: []string{"oid1"}, - } - s := Snmp{ - SnmptranslateFile: "./testdata/oids.txt", - Host: []Host{h}, - Get: []Data{get1}, - } - - var acc testutil.Accumulator - err := s.Gather(&acc) - require.NoError(t, err) - - acc.AssertContainsTaggedFields(t, - "ifNumber", - map[string]interface{}{ - "ifNumber": int(4), - }, - map[string]string{ - "instance": "0", - "snmp_host": testutil.GetLocalHost(), - }, - ) -} - -func TestSNMPGet3(t *testing.T) { - if testing.Short() { - t.Skip("Skipping integration test in short mode") - } - get1 := Data{ - Name: "oid1", - Unit: "octets", - Oid: "ifSpeed", - Instance: "1", - } - h := Host{ - Address: testutil.GetLocalHost() + ":31161", - Community: "telegraf", - Version: 2, - Timeout: 2.0, - Retries: 2, - Collect: []string{"oid1"}, - } - s := Snmp{ - SnmptranslateFile: "./testdata/oids.txt", - Host: []Host{h}, - Get: []Data{get1}, - } - - var acc testutil.Accumulator - err := s.Gather(&acc) - require.NoError(t, err) - - acc.AssertContainsTaggedFields(t, - "ifSpeed", - map[string]interface{}{ - "ifSpeed": uint(10000000), - }, - map[string]string{ - "unit": "octets", - "instance": "1", - "snmp_host": testutil.GetLocalHost(), - }, - ) -} - -func TestSNMPEasyGet4(t *testing.T) { - if testing.Short() { - t.Skip("Skipping integration test in short mode") - } - get1 := Data{ - Name: "oid1", - Unit: "octets", - Oid: "ifSpeed", - Instance: "1", - } - h := Host{ - Address: testutil.GetLocalHost() + ":31161", - Community: "telegraf", - Version: 2, - Timeout: 2.0, - Retries: 2, - Collect: []string{"oid1"}, - GetOids: []string{"ifNumber"}, - } - s := Snmp{ - SnmptranslateFile: "./testdata/oids.txt", - Host: []Host{h}, - Get: []Data{get1}, - } - - var acc testutil.Accumulator - err := s.Gather(&acc) - require.NoError(t, err) - - acc.AssertContainsTaggedFields(t, - "ifSpeed", - map[string]interface{}{ - "ifSpeed": uint(10000000), - }, - map[string]string{ - "unit": "octets", - "instance": "1", - "snmp_host": testutil.GetLocalHost(), - }, - ) - - acc.AssertContainsTaggedFields(t, - "ifNumber", - map[string]interface{}{ - "ifNumber": int(4), - }, - map[string]string{ - "instance": "0", - "snmp_host": testutil.GetLocalHost(), - }, - ) -} - -func TestSNMPEasyGet5(t *testing.T) { - if testing.Short() { - t.Skip("Skipping integration test in short mode") - } - get1 := Data{ - Name: "oid1", - Unit: "octets", - Oid: "ifSpeed", - Instance: "1", - } - h := Host{ - Address: testutil.GetLocalHost() + ":31161", - Community: "telegraf", - Version: 2, - Timeout: 2.0, - Retries: 2, - Collect: []string{"oid1"}, - GetOids: []string{".1.3.6.1.2.1.2.1.0"}, - } - s := Snmp{ - SnmptranslateFile: "./testdata/oids.txt", - Host: []Host{h}, - Get: []Data{get1}, - } - - var acc testutil.Accumulator - err := s.Gather(&acc) - require.NoError(t, err) - - acc.AssertContainsTaggedFields(t, - "ifSpeed", - map[string]interface{}{ - "ifSpeed": uint(10000000), - }, - map[string]string{ - "unit": "octets", - "instance": "1", - "snmp_host": testutil.GetLocalHost(), - }, - ) - - acc.AssertContainsTaggedFields(t, - "ifNumber", - map[string]interface{}{ - "ifNumber": int(4), - }, - map[string]string{ - "instance": "0", - "snmp_host": testutil.GetLocalHost(), - }, - ) -} - -func TestSNMPEasyGet6(t *testing.T) { - if testing.Short() { - t.Skip("Skipping integration test in short mode") - } - h := Host{ - Address: testutil.GetLocalHost() + ":31161", - Community: "telegraf", - Version: 2, - Timeout: 2.0, - Retries: 2, - GetOids: []string{"1.3.6.1.2.1.2.1.0"}, - } - s := Snmp{ - SnmptranslateFile: "./testdata/oids.txt", - Host: []Host{h}, - } - - var acc testutil.Accumulator - err := s.Gather(&acc) - require.NoError(t, err) - - acc.AssertContainsTaggedFields(t, - "ifNumber", - map[string]interface{}{ - "ifNumber": int(4), - }, - map[string]string{ - "instance": "0", - "snmp_host": testutil.GetLocalHost(), - }, - ) -} - -func TestSNMPBulk1(t *testing.T) { - if testing.Short() { - t.Skip("Skipping integration test in short mode") - } - bulk1 := Data{ - Name: "oid1", - Unit: "octets", - Oid: ".1.3.6.1.2.1.2.2.1.16", - MaxRepetition: 2, - } - h := Host{ - Address: testutil.GetLocalHost() + ":31161", - Community: "telegraf", - Version: 2, - Timeout: 2.0, - Retries: 2, - Collect: []string{"oid1"}, - } - s := Snmp{ - SnmptranslateFile: "./testdata/oids.txt", - Host: []Host{h}, - Bulk: []Data{bulk1}, - } - - var acc testutil.Accumulator - err := s.Gather(&acc) - require.NoError(t, err) - - acc.AssertContainsTaggedFields(t, - "ifOutOctets", - map[string]interface{}{ - "ifOutOctets": uint(543846), - }, - map[string]string{ - "unit": "octets", - "instance": "1", - "snmp_host": testutil.GetLocalHost(), - }, - ) - - acc.AssertContainsTaggedFields(t, - "ifOutOctets", - map[string]interface{}{ - "ifOutOctets": uint(26475179), - }, - map[string]string{ - "unit": "octets", - "instance": "2", - "snmp_host": testutil.GetLocalHost(), - }, - ) - - acc.AssertContainsTaggedFields(t, - "ifOutOctets", - map[string]interface{}{ - "ifOutOctets": uint(108963968), - }, - map[string]string{ - "unit": "octets", - "instance": "3", - "snmp_host": testutil.GetLocalHost(), - }, - ) - - acc.AssertContainsTaggedFields(t, - "ifOutOctets", - map[string]interface{}{ - "ifOutOctets": uint(12991453), - }, - map[string]string{ - "unit": "octets", - "instance": "36", - "snmp_host": testutil.GetLocalHost(), - }, - ) -} - -// TODO find why, if this test is active -// Circle CI stops with the following error... -// bash scripts/circle-test.sh died unexpectedly -// Maybe the test is too long ?? -func dTestSNMPBulk2(t *testing.T) { - bulk1 := Data{ - Name: "oid1", - Unit: "octets", - Oid: "ifOutOctets", - MaxRepetition: 2, - } - h := Host{ - Address: testutil.GetLocalHost() + ":31161", - Community: "telegraf", - Version: 2, - Timeout: 2.0, - Retries: 2, - Collect: []string{"oid1"}, - } - s := Snmp{ - SnmptranslateFile: "./testdata/oids.txt", - Host: []Host{h}, - Bulk: []Data{bulk1}, - } - - var acc testutil.Accumulator - err := s.Gather(&acc) - require.NoError(t, err) - - acc.AssertContainsTaggedFields(t, - "ifOutOctets", - map[string]interface{}{ - "ifOutOctets": uint(543846), - }, - map[string]string{ - "unit": "octets", - "instance": "1", - "snmp_host": testutil.GetLocalHost(), - }, - ) - - acc.AssertContainsTaggedFields(t, - "ifOutOctets", - map[string]interface{}{ - "ifOutOctets": uint(26475179), - }, - map[string]string{ - "unit": "octets", - "instance": "2", - "snmp_host": testutil.GetLocalHost(), - }, - ) - - acc.AssertContainsTaggedFields(t, - "ifOutOctets", - map[string]interface{}{ - "ifOutOctets": uint(108963968), - }, - map[string]string{ - "unit": "octets", - "instance": "3", - "snmp_host": testutil.GetLocalHost(), - }, - ) - - acc.AssertContainsTaggedFields(t, - "ifOutOctets", - map[string]interface{}{ - "ifOutOctets": uint(12991453), - }, - map[string]string{ - "unit": "octets", - "instance": "36", - "snmp_host": testutil.GetLocalHost(), - }, - ) -} diff --git a/plugins/inputs/snmp_legacy/testdata/oids.txt b/plugins/inputs/snmp_legacy/testdata/oids.txt deleted file mode 100644 index 1a351be90..000000000 --- a/plugins/inputs/snmp_legacy/testdata/oids.txt +++ /dev/null @@ -1,32 +0,0 @@ -org 1.3 -dod 1.3.6 -internet 1.3.6.1 -directory 1.3.6.1.1 -mgmt 1.3.6.1.2 -mib-2 1.3.6.1.2.1 -interfaces 1.3.6.1.2.1.2 -ifNumber 1.3.6.1.2.1.2.1 -ifTable 1.3.6.1.2.1.2.2 -ifEntry 1.3.6.1.2.1.2.2.1 -ifIndex 1.3.6.1.2.1.2.2.1.1 -ifDescr 1.3.6.1.2.1.2.2.1.2 -ifType 1.3.6.1.2.1.2.2.1.3 -ifMtu 1.3.6.1.2.1.2.2.1.4 -ifSpeed 1.3.6.1.2.1.2.2.1.5 -ifPhysAddress 1.3.6.1.2.1.2.2.1.6 -ifAdminStatus 1.3.6.1.2.1.2.2.1.7 -ifOperStatus 1.3.6.1.2.1.2.2.1.8 -ifLastChange 1.3.6.1.2.1.2.2.1.9 -ifInOctets 1.3.6.1.2.1.2.2.1.10 -ifInUcastPkts 1.3.6.1.2.1.2.2.1.11 -ifInNUcastPkts 1.3.6.1.2.1.2.2.1.12 -ifInDiscards 1.3.6.1.2.1.2.2.1.13 -ifInErrors 1.3.6.1.2.1.2.2.1.14 -ifInUnknownProtos 1.3.6.1.2.1.2.2.1.15 -ifOutOctets 1.3.6.1.2.1.2.2.1.16 -ifOutUcastPkts 1.3.6.1.2.1.2.2.1.17 -ifOutNUcastPkts 1.3.6.1.2.1.2.2.1.18 -ifOutDiscards 1.3.6.1.2.1.2.2.1.19 -ifOutErrors 1.3.6.1.2.1.2.2.1.20 -ifOutQLen 1.3.6.1.2.1.2.2.1.21 -ifSpecific 1.3.6.1.2.1.2.2.1.22 diff --git a/plugins/inputs/webhooks/README.md b/plugins/inputs/webhooks/README.md index 86e6685b8..bc7714e9e 100644 --- a/plugins/inputs/webhooks/README.md +++ b/plugins/inputs/webhooks/README.md @@ -15,6 +15,7 @@ $ sudo service telegraf start ## Available webhooks +- [Filestack](filestack/) - [Github](github/) - [Mandrill](mandrill/) - [Rollbar](rollbar/) diff --git a/plugins/inputs/webhooks/filestack/README.md b/plugins/inputs/webhooks/filestack/README.md new file mode 100644 index 000000000..585e6f202 --- /dev/null +++ b/plugins/inputs/webhooks/filestack/README.md @@ -0,0 +1,17 @@ +# Filestack webhook + +You should configure your Filestack's Webhooks to point at the `webhooks` service. To do this go to `filestack.com/`, select your app and click `Credentials > Webhooks`. In the resulting page, set the `URL` to `http://