Merge branch 'master' into master

This commit is contained in:
Leo Zhang 2016-03-29 22:45:18 +08:00
commit 361e4b8318
50 changed files with 2325 additions and 251 deletions

View File

@ -1,10 +1,28 @@
## v0.11.2 [unreleased] ## v0.11.2 [unreleased]
### Features ### Features
- [#927](https://github.com/influxdata/telegraf/pull/927): Adds parsing of tags to the statsd input when using DataDog's dogstatsd extension
- [#863](https://github.com/influxdata/telegraf/pull/863): AMQP output: allow external auth. Thanks @ekini! - [#863](https://github.com/influxdata/telegraf/pull/863): AMQP output: allow external auth. Thanks @ekini!
- [#707](https://github.com/influxdata/telegraf/pull/707): Improved prometheus plugin. Thanks @titilambert! - [#707](https://github.com/influxdata/telegraf/pull/707): Improved prometheus plugin. Thanks @titilambert!
- [#878](https://github.com/influxdata/telegraf/pull/878): Added json serializer. Thanks @ch3lo!
- [#880](https://github.com/influxdata/telegraf/pull/880): Add the ability to specify the bearer token to the prometheus plugin. Thanks @jchauncey!
- [#882](https://github.com/influxdata/telegraf/pull/882): Fixed SQL Server Plugin issues
- [#849](https://github.com/influxdata/telegraf/issues/849): Adding ability to parse single values as an input data type.
- [#844](https://github.com/influxdata/telegraf/pull/844): postgres_extensible plugin added. Thanks @menardorama!
- [#866](https://github.com/influxdata/telegraf/pull/866): couchbase input plugin. Thanks @ljosa!
- [#789](https://github.com/influxdata/telegraf/pull/789): Support multiple field specification and `field*` in graphite templates. Thanks @chrusty!
- [#762](https://github.com/influxdata/telegraf/pull/762): Nagios parser for the exec plugin. Thanks @titilambert!
- [#848](https://github.com/influxdata/telegraf/issues/848): Provide option to omit host tag from telegraf agent.
- [#928](https://github.com/influxdata/telegraf/pull/928): Deprecating the statsd "convert_names" options, expose separator config.
### Bugfixes ### Bugfixes
- [#890](https://github.com/influxdata/telegraf/issues/890): Create TLS config even if only ssl_ca is provided.
- [#884](https://github.com/influxdata/telegraf/issues/884): Do not call write method if there are 0 metrics to write.
- [#898](https://github.com/influxdata/telegraf/issues/898): Put database name in quotes, fixes special characters in the database name.
- [#656](https://github.com/influxdata/telegraf/issues/656): No longer run `lsof` on linux to get netstat data, fixes permissions issue.
- [#907](https://github.com/influxdata/telegraf/issues/907): Fix prometheus invalid label/measurement name key.
- [#841](https://github.com/influxdata/telegraf/issues/841): Fix memcached unix socket panic.
- [#873](https://github.com/influxdata/telegraf/issues/873): Fix SNMP plugin sometimes not returning metrics. Thanks @titiliambert!
## v0.11.1 [2016-03-17] ## v0.11.1 [2016-03-17]

9
Godeps
View File

@ -5,12 +5,14 @@ github.com/amir/raidman 53c1b967405155bfc8758557863bf2e14f814687
github.com/aws/aws-sdk-go 13a12060f716145019378a10e2806c174356b857 github.com/aws/aws-sdk-go 13a12060f716145019378a10e2806c174356b857
github.com/beorn7/perks 3ac7bf7a47d159a033b107610db8a1b6575507a4 github.com/beorn7/perks 3ac7bf7a47d159a033b107610db8a1b6575507a4
github.com/cenkalti/backoff 4dc77674aceaabba2c7e3da25d4c823edfb73f99 github.com/cenkalti/backoff 4dc77674aceaabba2c7e3da25d4c823edfb73f99
github.com/couchbase/go-couchbase cb664315a324d87d19c879d9cc67fda6be8c2ac1
github.com/couchbase/gomemcached a5ea6356f648fec6ab89add00edd09151455b4b2
github.com/couchbase/goutils 5823a0cbaaa9008406021dc5daf80125ea30bba6
github.com/dancannon/gorethink e7cac92ea2bc52638791a021f212145acfedb1fc github.com/dancannon/gorethink e7cac92ea2bc52638791a021f212145acfedb1fc
github.com/davecgh/go-spew 5215b55f46b2b919f50a1df0eaa5886afe4e3b3d github.com/davecgh/go-spew 5215b55f46b2b919f50a1df0eaa5886afe4e3b3d
github.com/eapache/go-resiliency b86b1ec0dd4209a588dc1285cdd471e73525c0b3 github.com/eapache/go-resiliency b86b1ec0dd4209a588dc1285cdd471e73525c0b3
github.com/eapache/queue ded5959c0d4e360646dc9e9908cff48666781367 github.com/eapache/queue ded5959c0d4e360646dc9e9908cff48666781367
github.com/fsouza/go-dockerclient a49c8269a6899cae30da1f8a4b82e0ce945f9967 github.com/fsouza/go-dockerclient a49c8269a6899cae30da1f8a4b82e0ce945f9967
github.com/go-ini/ini 776aa739ce9373377cd16f526cdf06cb4c89b40f
github.com/go-sql-driver/mysql 1fca743146605a172a266e1654e01e5cd5669bee github.com/go-sql-driver/mysql 1fca743146605a172a266e1654e01e5cd5669bee
github.com/golang/protobuf 552c7b9542c194800fd493123b3798ef0a832032 github.com/golang/protobuf 552c7b9542c194800fd493123b3798ef0a832032
github.com/golang/snappy 427fb6fc07997f43afa32f35e850833760e489a7 github.com/golang/snappy 427fb6fc07997f43afa32f35e850833760e489a7
@ -21,7 +23,6 @@ github.com/hailocab/go-hostpool e80d13ce29ede4452c43dea11e79b9bc8a15b478
github.com/influxdata/config b79f6829346b8d6e78ba73544b1e1038f1f1c9da github.com/influxdata/config b79f6829346b8d6e78ba73544b1e1038f1f1c9da
github.com/influxdata/influxdb e3fef5593c21644f2b43af55d6e17e70910b0e48 github.com/influxdata/influxdb e3fef5593c21644f2b43af55d6e17e70910b0e48
github.com/influxdata/toml af4df43894b16e3fd2b788d01bd27ad0776ef2d0 github.com/influxdata/toml af4df43894b16e3fd2b788d01bd27ad0776ef2d0
github.com/jmespath/go-jmespath 0b12d6b521d83fc7f755e7cfc1b1fbdd35a01a74
github.com/klauspost/crc32 19b0b332c9e4516a6370a0456e6182c3b5036720 github.com/klauspost/crc32 19b0b332c9e4516a6370a0456e6182c3b5036720
github.com/lib/pq e182dc4027e2ded4b19396d638610f2653295f36 github.com/lib/pq e182dc4027e2ded4b19396d638610f2653295f36
github.com/matttproud/golang_protobuf_extensions d0c3fe89de86839aecf2e0579c40ba3bb336a453 github.com/matttproud/golang_protobuf_extensions d0c3fe89de86839aecf2e0579c40ba3bb336a453
@ -31,16 +32,14 @@ github.com/naoina/go-stringutil 6b638e95a32d0c1131db0e7fe83775cbea4a0d0b
github.com/nats-io/nats b13fc9d12b0b123ebc374e6b808c6228ae4234a3 github.com/nats-io/nats b13fc9d12b0b123ebc374e6b808c6228ae4234a3
github.com/nats-io/nuid 4f84f5f3b2786224e336af2e13dba0a0a80b76fa github.com/nats-io/nuid 4f84f5f3b2786224e336af2e13dba0a0a80b76fa
github.com/nsqio/go-nsq 0b80d6f05e15ca1930e0c5e1d540ed627e299980 github.com/nsqio/go-nsq 0b80d6f05e15ca1930e0c5e1d540ed627e299980
github.com/pmezard/go-difflib 792786c7400a136282c1664665ae0a8db921c6c2
github.com/prometheus/client_golang 18acf9993a863f4c4b40612e19cdd243e7c86831 github.com/prometheus/client_golang 18acf9993a863f4c4b40612e19cdd243e7c86831
github.com/prometheus/client_model fa8ad6fec33561be4280a8f0514318c79d7f6cb6 github.com/prometheus/client_model fa8ad6fec33561be4280a8f0514318c79d7f6cb6
github.com/prometheus/common e8eabff8812b05acf522b45fdcd725a785188e37 github.com/prometheus/common e8eabff8812b05acf522b45fdcd725a785188e37
github.com/prometheus/procfs 406e5b7bfd8201a36e2bb5f7bdae0b03380c2ce8 github.com/prometheus/procfs 406e5b7bfd8201a36e2bb5f7bdae0b03380c2ce8
github.com/samuel/go-zookeeper 218e9c81c0dd8b3b18172b2bbfad92cc7d6db55f github.com/samuel/go-zookeeper 218e9c81c0dd8b3b18172b2bbfad92cc7d6db55f
github.com/shirou/gopsutil 1de1357e7737a536c7f4ff6be7bd27977db4d2cb github.com/shirou/gopsutil 1f32ce1bb380845be7f5d174ac641a2c592c0c42
github.com/soniah/gosnmp b1b4f885b12c5dcbd021c5cee1c904110de6db7d github.com/soniah/gosnmp b1b4f885b12c5dcbd021c5cee1c904110de6db7d
github.com/streadway/amqp b4f3ceab0337f013208d31348b578d83c0064744 github.com/streadway/amqp b4f3ceab0337f013208d31348b578d83c0064744
github.com/stretchr/objx 1a9d0bb9f541897e62256577b352fdbc1fb4fd94
github.com/stretchr/testify 1f4a1643a57e798696635ea4c126e9127adb7d3c github.com/stretchr/testify 1f4a1643a57e798696635ea4c126e9127adb7d3c
github.com/wvanbergen/kafka 1a8639a45164fcc245d5c7b4bd3ccfbd1a0ffbf3 github.com/wvanbergen/kafka 1a8639a45164fcc245d5c7b4bd3ccfbd1a0ffbf3
github.com/wvanbergen/kazoo-go 0f768712ae6f76454f987c3356177e138df258f8 github.com/wvanbergen/kazoo-go 0f768712ae6f76454f987c3356177e138df258f8

View File

@ -17,13 +17,6 @@ new plugins.
## Installation: ## Installation:
NOTE: Telegraf 0.10.x is **not** backwards-compatible with previous versions
of telegraf, both in the database layout and the configuration file. 0.2.x
will continue to be supported, see below for download links.
For more details on the differences between Telegraf 0.2.x and 0.10.x, see
the [release blog post](https://influxdata.com/blog/announcing-telegraf-0-10-0/).
### Linux deb and rpm Packages: ### Linux deb and rpm Packages:
Latest: Latest:
@ -34,10 +27,6 @@ Latest (arm):
* http://get.influxdb.org/telegraf/telegraf_0.11.1-1_armhf.deb * http://get.influxdb.org/telegraf/telegraf_0.11.1-1_armhf.deb
* http://get.influxdb.org/telegraf/telegraf-0.11.1-1.armhf.rpm * http://get.influxdb.org/telegraf/telegraf-0.11.1-1.armhf.rpm
0.2.x:
* http://get.influxdb.org/telegraf/telegraf_0.2.4_amd64.deb
* http://get.influxdb.org/telegraf/telegraf-0.2.4-1.x86_64.rpm
##### Package Instructions: ##### Package Instructions:
* Telegraf binary is installed in `/usr/bin/telegraf` * Telegraf binary is installed in `/usr/bin/telegraf`
@ -50,8 +39,9 @@ controlled via `systemctl [action] telegraf`
### yum/apt Repositories: ### yum/apt Repositories:
There is a yum/apt repo available for the whole InfluxData stack, see There is a yum/apt repo available for the whole InfluxData stack, see
[here](https://docs.influxdata.com/influxdb/v0.9/introduction/installation/#installation) [here](https://docs.influxdata.com/influxdb/v0.10/introduction/installation/#installation)
for instructions, replacing the `influxdb` package name with `telegraf`. for instructions on setting up the repo. Once it is configured, you will be able
to use this repo to install & update telegraf.
### Linux tarballs: ### Linux tarballs:
@ -60,11 +50,6 @@ Latest:
* http://get.influxdb.org/telegraf/telegraf-0.11.1-1_linux_i386.tar.gz * http://get.influxdb.org/telegraf/telegraf-0.11.1-1_linux_i386.tar.gz
* http://get.influxdb.org/telegraf/telegraf-0.11.1-1_linux_armhf.tar.gz * http://get.influxdb.org/telegraf/telegraf-0.11.1-1_linux_armhf.tar.gz
0.2.x:
* http://get.influxdb.org/telegraf/telegraf_linux_amd64_0.2.4.tar.gz
* http://get.influxdb.org/telegraf/telegraf_linux_386_0.2.4.tar.gz
* http://get.influxdb.org/telegraf/telegraf_linux_arm_0.2.4.tar.gz
##### tarball Instructions: ##### tarball Instructions:
To install the full directory structure with config file, run: To install the full directory structure with config file, run:
@ -79,6 +64,15 @@ To extract only the binary, run:
tar -zxvf telegraf-0.11.1-1_linux_amd64.tar.gz --strip-components=3 ./usr/bin/telegraf tar -zxvf telegraf-0.11.1-1_linux_amd64.tar.gz --strip-components=3 ./usr/bin/telegraf
``` ```
### FreeBSD tarball:
Latest:
* http://get.influxdb.org/telegraf/telegraf-0.11.1-1_freebsd_amd64.tar.gz
##### tarball Instructions:
See linux instructions above.
### Ansible Role: ### Ansible Role:
Ansible role: https://github.com/rossmcdonald/telegraf Ansible role: https://github.com/rossmcdonald/telegraf
@ -165,13 +159,14 @@ Currently implemented sources:
* aerospike * aerospike
* apache * apache
* bcache * bcache
* couchbase
* couchdb * couchdb
* disque * disque
* dns query time * dns query time
* docker * docker
* dovecot * dovecot
* elasticsearch * elasticsearch
* exec (generic executable plugin, support JSON, influx and graphite) * exec (generic executable plugin, support JSON, influx, graphite and nagios)
* haproxy * haproxy
* httpjson (generic JSON-emitting http service plugin) * httpjson (generic JSON-emitting http service plugin)
* influxdb * influxdb
@ -191,6 +186,7 @@ Currently implemented sources:
* phusion passenger * phusion passenger
* ping * ping
* postgresql * postgresql
* postgresql_extensible
* powerdns * powerdns
* procstat * procstat
* prometheus * prometheus

View File

@ -27,6 +27,7 @@ func NewAgent(config *config.Config) (*Agent, error) {
Config: config, Config: config,
} }
if !a.Config.Agent.OmitHostname {
if a.Config.Agent.Hostname == "" { if a.Config.Agent.Hostname == "" {
hostname, err := os.Hostname() hostname, err := os.Hostname()
if err != nil { if err != nil {
@ -37,6 +38,7 @@ func NewAgent(config *config.Config) (*Agent, error) {
} }
config.Tags["host"] = a.Config.Agent.Hostname config.Tags["host"] = a.Config.Agent.Hostname
}
return a, nil return a, nil
} }

View File

@ -1,7 +1,6 @@
package agent package agent
import ( import (
"github.com/stretchr/testify/assert"
"testing" "testing"
"time" "time"
@ -11,8 +10,18 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/all" _ "github.com/influxdata/telegraf/plugins/inputs/all"
// needing to load the outputs // needing to load the outputs
_ "github.com/influxdata/telegraf/plugins/outputs/all" _ "github.com/influxdata/telegraf/plugins/outputs/all"
"github.com/stretchr/testify/assert"
) )
func TestAgent_OmitHostname(t *testing.T) {
c := config.NewConfig()
c.Agent.OmitHostname = true
_, err := NewAgent(c)
assert.NoError(t, err)
assert.NotContains(t, c.Tags, "host")
}
func TestAgent_LoadPlugin(t *testing.T) { func TestAgent_LoadPlugin(t *testing.T) {
c := config.NewConfig() c := config.NewConfig()
c.InputFilters = []string{"mysql"} c.InputFilters = []string{"mysql"}

View File

@ -4,9 +4,9 @@ machine:
post: post:
- sudo service zookeeper stop - sudo service zookeeper stop
- go version - go version
- go version | grep 1.5.3 || sudo rm -rf /usr/local/go - go version | grep 1.6 || sudo rm -rf /usr/local/go
- wget https://storage.googleapis.com/golang/go1.5.3.linux-amd64.tar.gz - wget https://storage.googleapis.com/golang/go1.6.linux-amd64.tar.gz
- sudo tar -C /usr/local -xzf go1.5.3.linux-amd64.tar.gz - sudo tar -C /usr/local -xzf go1.6.linux-amd64.tar.gz
- go version - go version
dependencies: dependencies:

View File

@ -32,7 +32,7 @@ var fPidfile = flag.String("pidfile", "", "file to write our pid to")
var fInputFilters = flag.String("input-filter", "", var fInputFilters = flag.String("input-filter", "",
"filter the inputs to enable, separator is :") "filter the inputs to enable, separator is :")
var fInputList = flag.Bool("input-list", false, var fInputList = flag.Bool("input-list", false,
"print available output plugins.") "print available input plugins.")
var fOutputFilters = flag.String("output-filter", "", var fOutputFilters = flag.String("output-filter", "",
"filter the outputs to enable, separator is :") "filter the outputs to enable, separator is :")
var fOutputList = flag.Bool("output-list", false, var fOutputList = flag.Bool("output-list", false,

View File

@ -1,5 +1,12 @@
# Telegraf Input Data Formats # Telegraf Input Data Formats
Telegraf is able to parse the following input data formats into metrics:
1. InfluxDB Line Protocol
1. JSON
1. Graphite
1. Value, ie 45 or "booyah"
Telegraf metrics, like InfluxDB Telegraf metrics, like InfluxDB
[points](https://docs.influxdata.com/influxdb/v0.10/write_protocols/line/), [points](https://docs.influxdata.com/influxdb/v0.10/write_protocols/line/),
are a combination of four basic parts: are a combination of four basic parts:
@ -134,6 +141,38 @@ Your Telegraf metrics would get tagged with "my_tag_1"
exec_mycollector,my_tag_1=foo a=5,b_c=6 exec_mycollector,my_tag_1=foo a=5,b_c=6
``` ```
## Value:
The "value" data format translates single values into Telegraf metrics. This
is done by assigning a measurement name (which can be overridden using the
`name_override` config option), and setting a single field ("value") as the
parsed metric.
#### Value Configuration:
You can tell Telegraf what type of metric to collect by using the `data_type`
configuration option.
It is also recommended that you set `name_override` to a measurement name that
makes sense for your metric, otherwise it will just be set to the name of the
plugin.
```toml
[[inputs.exec]]
## Commands array
commands = ["cat /proc/sys/kernel/random/entropy_avail"]
## override the default metric name of "exec"
name_override = "entropy_available"
## Data format to consume. This can be "json", "value", influx" or "graphite"
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "value"
data_type = "integer"
```
## Graphite: ## Graphite:
The Graphite data format translates graphite _dot_ buckets directly into The Graphite data format translates graphite _dot_ buckets directly into
@ -181,17 +220,32 @@ So the following template:
```toml ```toml
templates = [ templates = [
"measurement.measurement.field.region" "measurement.measurement.field.field.region"
] ]
``` ```
would result in the following Graphite -> Telegraf transformation. would result in the following Graphite -> Telegraf transformation.
``` ```
cpu.usage.idle.us-west 100 cpu.usage.idle.percent.us-west 100
=> cpu_usage,region=us-west idle=100 => cpu_usage,region=us-west idle_percent=100
``` ```
The field key can also be derived from the second "half" of the input metric-name by specifying ```field*```:
```toml
templates = [
"measurement.measurement.region.field*"
]
```
would result in the following Graphite -> Telegraf transformation.
```
cpu.usage.us-west.idle.percentage 100
=> cpu_usage,region=us-west idle_percentage=100
```
(This cannot be used in conjunction with "measurement*"!)
#### Filter Templates: #### Filter Templates:
Users can also filter the template(s) to use based on the name of the bucket, Users can also filter the template(s) to use based on the name of the bucket,
@ -272,3 +326,27 @@ There are many more options available,
"measurement*" "measurement*"
] ]
``` ```
## Nagios:
There are no additional configuration options for Nagios line-protocol. The
metrics are parsed directly into Telegraf metrics.
Note: Nagios Input Data Formats is only supported in `exec` input plugin.
#### Nagios Configuration:
```toml
[[inputs.exec]]
## Commands array
commands = ["/usr/lib/nagios/plugins/check_load", "-w 5,6,7 -c 7,8,9"]
## measurement name suffix (for separating different commands)
name_suffix = "_mycollector"
## Data format to consume. This can be "json", "influx", "graphite" or "nagios"
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "nagios"
```

View File

@ -53,7 +53,7 @@ metrics are serialized directly into InfluxDB line-protocol.
## Files to write to, "stdout" is a specially handled file. ## Files to write to, "stdout" is a specially handled file.
files = ["stdout", "/tmp/metrics.out"] files = ["stdout", "/tmp/metrics.out"]
## Data format to output. This can be "influx" or "graphite" ## Data format to output. This can be "influx", "json" or "graphite"
## Each data format has it's own unique set of configuration options, read ## Each data format has it's own unique set of configuration options, read
## more about them here: ## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
@ -87,7 +87,7 @@ tars.cpu-total.us-east-1.cpu.usage_idle 98.09 1455320690
## Files to write to, "stdout" is a specially handled file. ## Files to write to, "stdout" is a specially handled file.
files = ["stdout", "/tmp/metrics.out"] files = ["stdout", "/tmp/metrics.out"]
## Data format to output. This can be "influx" or "graphite" ## Data format to output. This can be "influx", "json" or "graphite"
## Each data format has it's own unique set of configuration options, read ## Each data format has it's own unique set of configuration options, read
## more about them here: ## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
@ -95,3 +95,37 @@ tars.cpu-total.us-east-1.cpu.usage_idle 98.09 1455320690
prefix = "telegraf" prefix = "telegraf"
``` ```
## Json:
The Json data format serialized Telegraf metrics in json format. The format is:
```json
{
"fields":{
"field_1":30,
"field_2":4,
"field_N":59,
"n_images":660
},
"name":"docker",
"tags":{
"host":"raynor"
},
"timestamp":1458229140
}
```
#### Json Configuration:
```toml
[[outputs.file]]
## Files to write to, "stdout" is a specially handled file.
files = ["stdout", "/tmp/metrics.out"]
## Data format to output. This can be "influx", "json" or "graphite"
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "json"
```

View File

@ -99,6 +99,7 @@ type AgentConfig struct {
// Quiet is the option for running in quiet mode // Quiet is the option for running in quiet mode
Quiet bool Quiet bool
Hostname string Hostname string
OmitHostname bool
} }
// Inputs returns a list of strings of the configured inputs. // Inputs returns a list of strings of the configured inputs.
@ -183,6 +184,8 @@ var header = `# Telegraf Configuration
quiet = false quiet = false
## Override default hostname, if empty use os.Hostname() ## Override default hostname, if empty use os.Hostname()
hostname = "" hostname = ""
## If set to true, do no set the "host" tag in the telegraf agent.
omit_hostname = false
# #
@ -701,12 +704,21 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
} }
} }
if node, ok := tbl.Fields["data_type"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.DataType = str.Value
}
}
}
c.MetricName = name c.MetricName = name
delete(tbl.Fields, "data_format") delete(tbl.Fields, "data_format")
delete(tbl.Fields, "separator") delete(tbl.Fields, "separator")
delete(tbl.Fields, "templates") delete(tbl.Fields, "templates")
delete(tbl.Fields, "tag_keys") delete(tbl.Fields, "tag_keys")
delete(tbl.Fields, "data_type")
return parsers.NewParser(c) return parsers.NewParser(c)
} }

View File

@ -86,15 +86,15 @@ func GetTLSConfig(
SSLCert, SSLKey, SSLCA string, SSLCert, SSLKey, SSLCA string,
InsecureSkipVerify bool, InsecureSkipVerify bool,
) (*tls.Config, error) { ) (*tls.Config, error) {
t := &tls.Config{} if SSLCert == "" && SSLKey == "" && SSLCA == "" && !InsecureSkipVerify {
if SSLCert != "" && SSLKey != "" && SSLCA != "" { return nil, nil
cert, err := tls.LoadX509KeyPair(SSLCert, SSLKey)
if err != nil {
return nil, errors.New(fmt.Sprintf(
"Could not load TLS client key/certificate: %s",
err))
} }
t := &tls.Config{
InsecureSkipVerify: InsecureSkipVerify,
}
if SSLCA != "" {
caCert, err := ioutil.ReadFile(SSLCA) caCert, err := ioutil.ReadFile(SSLCA)
if err != nil { if err != nil {
return nil, errors.New(fmt.Sprintf("Could not load TLS CA: %s", return nil, errors.New(fmt.Sprintf("Could not load TLS CA: %s",
@ -103,20 +103,21 @@ func GetTLSConfig(
caCertPool := x509.NewCertPool() caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert) caCertPool.AppendCertsFromPEM(caCert)
t.RootCAs = caCertPool
}
t = &tls.Config{ if SSLCert != "" && SSLKey != "" {
Certificates: []tls.Certificate{cert}, cert, err := tls.LoadX509KeyPair(SSLCert, SSLKey)
RootCAs: caCertPool, if err != nil {
InsecureSkipVerify: InsecureSkipVerify, return nil, errors.New(fmt.Sprintf(
"Could not load TLS client key/certificate: %s",
err))
} }
t.Certificates = []tls.Certificate{cert}
t.BuildNameToCertificate() t.BuildNameToCertificate()
} else {
if InsecureSkipVerify {
t.InsecureSkipVerify = true
} else {
return nil, nil
}
} }
// will be nil by default if nothing is provided // will be nil by default if nothing is provided
return t, nil return t, nil
} }

View File

@ -121,6 +121,9 @@ func (ro *RunningOutput) Write() error {
} }
func (ro *RunningOutput) write(metrics []telegraf.Metric) error { func (ro *RunningOutput) write(metrics []telegraf.Metric) error {
if len(metrics) == 0 {
return nil
}
start := time.Now() start := time.Now()
err := ro.Output.Write(metrics) err := ro.Output.Write(metrics)
elapsed := time.Since(start) elapsed := time.Since(start)

View File

@ -4,6 +4,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/aerospike" _ "github.com/influxdata/telegraf/plugins/inputs/aerospike"
_ "github.com/influxdata/telegraf/plugins/inputs/apache" _ "github.com/influxdata/telegraf/plugins/inputs/apache"
_ "github.com/influxdata/telegraf/plugins/inputs/bcache" _ "github.com/influxdata/telegraf/plugins/inputs/bcache"
_ "github.com/influxdata/telegraf/plugins/inputs/couchbase"
_ "github.com/influxdata/telegraf/plugins/inputs/couchdb" _ "github.com/influxdata/telegraf/plugins/inputs/couchdb"
_ "github.com/influxdata/telegraf/plugins/inputs/disque" _ "github.com/influxdata/telegraf/plugins/inputs/disque"
_ "github.com/influxdata/telegraf/plugins/inputs/dns_query" _ "github.com/influxdata/telegraf/plugins/inputs/dns_query"
@ -35,6 +36,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/phpfpm" _ "github.com/influxdata/telegraf/plugins/inputs/phpfpm"
_ "github.com/influxdata/telegraf/plugins/inputs/ping" _ "github.com/influxdata/telegraf/plugins/inputs/ping"
_ "github.com/influxdata/telegraf/plugins/inputs/postgresql" _ "github.com/influxdata/telegraf/plugins/inputs/postgresql"
_ "github.com/influxdata/telegraf/plugins/inputs/postgresql_extensible"
_ "github.com/influxdata/telegraf/plugins/inputs/powerdns" _ "github.com/influxdata/telegraf/plugins/inputs/powerdns"
_ "github.com/influxdata/telegraf/plugins/inputs/procstat" _ "github.com/influxdata/telegraf/plugins/inputs/procstat"
_ "github.com/influxdata/telegraf/plugins/inputs/prometheus" _ "github.com/influxdata/telegraf/plugins/inputs/prometheus"

View File

@ -0,0 +1,63 @@
# Telegraf Plugin: Couchbase
## Configuration:
```
# Read per-node and per-bucket metrics from Couchbase
[[inputs.couchbase]]
## specify servers via a url matching:
## [protocol://][:password]@address[:port]
## e.g.
## http://couchbase-0.example.com/
## http://admin:secret@couchbase-0.example.com:8091/
##
## If no servers are specified, then localhost is used as the host.
## If no protocol is specifed, HTTP is used.
## If no port is specified, 8091 is used.
servers = ["http://localhost:8091"]
```
## Measurements:
### couchbase_node
Tags:
- cluster: whatever you called it in `servers` in the configuration, e.g.: `http://couchbase-0.example.com/`
- hostname: Couchbase's name for the node and port, e.g., `172.16.10.187:8091`
Fields:
- memory_free (unit: bytes, example: 23181365248.0)
- memory_total (unit: bytes, example: 64424656896.0)
### couchbase_bucket
Tags:
- cluster: whatever you called it in `servers` in the configuration, e.g.: `http://couchbase-0.example.com/`)
- bucket: the name of the couchbase bucket, e.g., `blastro-df`
Fields:
- quota_percent_used (unit: percent, example: 68.85424936294555)
- ops_per_sec (unit: count, example: 5686.789686789687)
- disk_fetches (unit: count, example: 0.0)
- item_count (unit: count, example: 943239752.0)
- disk_used (unit: bytes, example: 409178772321.0)
- data_used (unit: bytes, example: 212179309111.0)
- mem_used (unit: bytes, example: 202156957464.0)
## Example output
```
$ telegraf -config telegraf.conf -input-filter couchbase -test
* Plugin: couchbase, Collection 1
> couchbase_node,cluster=https://couchbase-0.example.com/,hostname=172.16.10.187:8091 memory_free=22927384576,memory_total=64424656896 1458381183695864929
> couchbase_node,cluster=https://couchbase-0.example.com/,hostname=172.16.10.65:8091 memory_free=23520161792,memory_total=64424656896 1458381183695972112
> couchbase_node,cluster=https://couchbase-0.example.com/,hostname=172.16.13.105:8091 memory_free=23531704320,memory_total=64424656896 1458381183695995259
> couchbase_node,cluster=https://couchbase-0.example.com/,hostname=172.16.13.173:8091 memory_free=23628767232,memory_total=64424656896 1458381183696010870
> couchbase_node,cluster=https://couchbase-0.example.com/,hostname=172.16.15.120:8091 memory_free=23616692224,memory_total=64424656896 1458381183696027406
> couchbase_node,cluster=https://couchbase-0.example.com/,hostname=172.16.8.127:8091 memory_free=23431770112,memory_total=64424656896 1458381183696041040
> couchbase_node,cluster=https://couchbase-0.example.com/,hostname=172.16.8.148:8091 memory_free=23811371008,memory_total=64424656896 1458381183696059060
> couchbase_bucket,bucket=default,cluster=https://couchbase-0.example.com/ data_used=25743360,disk_fetches=0,disk_used=31744886,item_count=0,mem_used=77729224,ops_per_sec=0,quota_percent_used=10.58976636614118 1458381183696210074
> couchbase_bucket,bucket=demoncat,cluster=https://couchbase-0.example.com/ data_used=38157584951,disk_fetches=0,disk_used=62730302441,item_count=14662532,mem_used=24015304256,ops_per_sec=1207.753207753208,quota_percent_used=79.87855353525707 1458381183696242695
> couchbase_bucket,bucket=blastro-df,cluster=https://couchbase-0.example.com/ data_used=212552491622,disk_fetches=0,disk_used=413323157621,item_count=944655680,mem_used=202421103760,ops_per_sec=1692.176692176692,quota_percent_used=68.9442170551845 1458381183696272206
```

View File

@ -0,0 +1,104 @@
package couchbase
import (
couchbase "github.com/couchbase/go-couchbase"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"sync"
)
type Couchbase struct {
Servers []string
}
var sampleConfig = `
## specify servers via a url matching:
## [protocol://][:password]@address[:port]
## e.g.
## http://couchbase-0.example.com/
## http://admin:secret@couchbase-0.example.com:8091/
##
## If no servers are specified, then localhost is used as the host.
## If no protocol is specifed, HTTP is used.
## If no port is specified, 8091 is used.
servers = ["http://localhost:8091"]
`
func (r *Couchbase) SampleConfig() string {
return sampleConfig
}
func (r *Couchbase) Description() string {
return "Read metrics from one or many couchbase clusters"
}
// Reads stats from all configured clusters. Accumulates stats.
// Returns one of the errors encountered while gathering stats (if any).
func (r *Couchbase) Gather(acc telegraf.Accumulator) error {
if len(r.Servers) == 0 {
r.gatherServer("http://localhost:8091/", acc, nil)
return nil
}
var wg sync.WaitGroup
var outerr error
for _, serv := range r.Servers {
wg.Add(1)
go func(serv string) {
defer wg.Done()
outerr = r.gatherServer(serv, acc, nil)
}(serv)
}
wg.Wait()
return outerr
}
func (r *Couchbase) gatherServer(addr string, acc telegraf.Accumulator, pool *couchbase.Pool) error {
if pool == nil {
client, err := couchbase.Connect(addr)
if err != nil {
return err
}
// `default` is the only possible pool name. It's a
// placeholder for a possible future Couchbase feature. See
// http://stackoverflow.com/a/16990911/17498.
p, err := client.GetPool("default")
if err != nil {
return err
}
pool = &p
}
for i := 0; i < len(pool.Nodes); i++ {
node := pool.Nodes[i]
tags := map[string]string{"cluster": addr, "hostname": node.Hostname}
fields := make(map[string]interface{})
fields["memory_free"] = node.MemoryFree
fields["memory_total"] = node.MemoryTotal
acc.AddFields("couchbase_node", fields, tags)
}
for bucketName, _ := range pool.BucketMap {
tags := map[string]string{"cluster": addr, "bucket": bucketName}
bs := pool.BucketMap[bucketName].BasicStats
fields := make(map[string]interface{})
fields["quota_percent_used"] = bs["quotaPercentUsed"]
fields["ops_per_sec"] = bs["opsPerSec"]
fields["disk_fetches"] = bs["diskFetches"]
fields["item_count"] = bs["itemCount"]
fields["disk_used"] = bs["diskUsed"]
fields["data_used"] = bs["dataUsed"]
fields["mem_used"] = bs["memUsed"]
acc.AddFields("couchbase_bucket", fields, tags)
}
return nil
}
func init() {
inputs.Add("couchbase", func() telegraf.Input {
return &Couchbase{}
})
}

File diff suppressed because one or more lines are too long

View File

@ -5,12 +5,14 @@ import (
"fmt" "fmt"
"os/exec" "os/exec"
"sync" "sync"
"syscall"
"github.com/gonuts/go-shellquote" "github.com/gonuts/go-shellquote"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/nagios"
) )
const sampleConfig = ` const sampleConfig = `
@ -20,7 +22,7 @@ const sampleConfig = `
## measurement name suffix (for separating different commands) ## measurement name suffix (for separating different commands)
name_suffix = "_mycollector" name_suffix = "_mycollector"
## Data format to consume. This can be "json", "influx" or "graphite" ## Data format to consume. This can be "json", "influx", "graphite" or "nagios
## Each data format has it's own unique set of configuration options, read ## Each data format has it's own unique set of configuration options, read
## more about them here: ## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
@ -46,12 +48,32 @@ func NewExec() *Exec {
} }
type Runner interface { type Runner interface {
Run(*Exec, string) ([]byte, error) Run(*Exec, string, telegraf.Accumulator) ([]byte, error)
} }
type CommandRunner struct{} type CommandRunner struct{}
func (c CommandRunner) Run(e *Exec, command string) ([]byte, error) { func AddNagiosState(exitCode error, acc telegraf.Accumulator) error {
nagiosState := 0
if exitCode != nil {
exiterr, ok := exitCode.(*exec.ExitError)
if ok {
status, ok := exiterr.Sys().(syscall.WaitStatus)
if ok {
nagiosState = status.ExitStatus()
} else {
return fmt.Errorf("exec: unable to get nagios plugin exit code")
}
} else {
return fmt.Errorf("exec: unable to get nagios plugin exit code")
}
}
fields := map[string]interface{}{"state": nagiosState}
acc.AddFields("nagios_state", fields, nil)
return nil
}
func (c CommandRunner) Run(e *Exec, command string, acc telegraf.Accumulator) ([]byte, error) {
split_cmd, err := shellquote.Split(command) split_cmd, err := shellquote.Split(command)
if err != nil || len(split_cmd) == 0 { if err != nil || len(split_cmd) == 0 {
return nil, fmt.Errorf("exec: unable to parse command, %s", err) return nil, fmt.Errorf("exec: unable to parse command, %s", err)
@ -63,8 +85,18 @@ func (c CommandRunner) Run(e *Exec, command string) ([]byte, error) {
cmd.Stdout = &out cmd.Stdout = &out
if err := cmd.Run(); err != nil { if err := cmd.Run(); err != nil {
switch e.parser.(type) {
case *nagios.NagiosParser:
AddNagiosState(err, acc)
default:
return nil, fmt.Errorf("exec: %s for command '%s'", err, command) return nil, fmt.Errorf("exec: %s for command '%s'", err, command)
} }
} else {
switch e.parser.(type) {
case *nagios.NagiosParser:
AddNagiosState(nil, acc)
}
}
return out.Bytes(), nil return out.Bytes(), nil
} }
@ -72,7 +104,7 @@ func (c CommandRunner) Run(e *Exec, command string) ([]byte, error) {
func (e *Exec) ProcessCommand(command string, acc telegraf.Accumulator) { func (e *Exec) ProcessCommand(command string, acc telegraf.Accumulator) {
defer e.wg.Done() defer e.wg.Done()
out, err := e.runner.Run(e, command) out, err := e.runner.Run(e, command, acc)
if err != nil { if err != nil {
e.errChan <- err e.errChan <- err
return return

View File

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"testing" "testing"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
@ -57,7 +58,7 @@ func newRunnerMock(out []byte, err error) Runner {
} }
} }
func (r runnerMock) Run(e *Exec, command string) ([]byte, error) { func (r runnerMock) Run(e *Exec, command string, acc telegraf.Accumulator) ([]byte, error) {
if r.err != nil { if r.err != nil {
return nil, r.err return nil, r.err
} }

View File

@ -94,14 +94,15 @@ func (m *Memcached) gatherServer(
acc telegraf.Accumulator, acc telegraf.Accumulator,
) error { ) error {
var conn net.Conn var conn net.Conn
var err error
if unix { if unix {
conn, err := net.DialTimeout("unix", address, defaultTimeout) conn, err = net.DialTimeout("unix", address, defaultTimeout)
if err != nil { if err != nil {
return err return err
} }
defer conn.Close() defer conn.Close()
} else { } else {
_, _, err := net.SplitHostPort(address) _, _, err = net.SplitHostPort(address)
if err != nil { if err != nil {
address = address + ":11211" address = address + ":11211"
} }
@ -113,6 +114,10 @@ func (m *Memcached) gatherServer(
defer conn.Close() defer conn.Close()
} }
if conn == nil {
return fmt.Errorf("Failed to create net connection")
}
// Extend connection // Extend connection
conn.SetDeadline(time.Now().Add(defaultTimeout)) conn.SetDeadline(time.Now().Add(defaultTimeout))

View File

@ -0,0 +1,231 @@
# PostgreSQL plugin
This postgresql plugin provides metrics for your postgres database. It has been
designed to parse ithe sql queries in the plugin section of your telegraf.conf.
For now only two queries are specified and it's up to you to add more; some per
query parameters have been added :
* The SQl query itself
* The minimum version supported (here in numeric display visible in pg_settings)
* A boolean to define if the query have to be run against some specific
* variables (defined in the databaes variable of the plugin section)
* The list of the column that have to be defined has tags
```
[[inputs.postgresql_extensible]]
# specify address via a url matching:
# postgres://[pqgotest[:password]]@localhost[/dbname]?sslmode=...
# or a simple string:
# host=localhost user=pqotest password=... sslmode=... dbname=app_production
#
# All connection parameters are optional. #
# Without the dbname parameter, the driver will default to a database
# with the same name as the user. This dbname is just for instantiating a
# connection with the server and doesn't restrict the databases we are trying
# to grab metrics for.
#
address = "host=localhost user=postgres sslmode=disable"
# A list of databases to pull metrics about. If not specified, metrics for all
# databases are gathered.
# databases = ["app_production", "testing"]
#
# Define the toml config where the sql queries are stored
# New queries can be added, if the withdbname is set to true and there is no
# databases defined in the 'databases field', the sql query is ended by a 'is
# not null' in order to make the query succeed.
# Be careful that the sqlquery must contain the where clause with a part of
# the filtering, the plugin will add a 'IN (dbname list)' clause if the
# withdbname is set to true
# Example :
# The sqlquery : "SELECT * FROM pg_stat_database where datname" become
# "SELECT * FROM pg_stat_database where datname IN ('postgres', 'pgbench')"
# because the databases variable was set to ['postgres', 'pgbench' ] and the
# withdbname was true.
# Be careful that if the withdbname is set to false you d'ont have to define
# the where clause (aka with the dbname)
# the tagvalue field is used to define custom tags (separated by comas)
#
# Structure :
# [[inputs.postgresql_extensible.query]]
# sqlquery string
# version string
# withdbname boolean
# tagvalue string (coma separated)
[[inputs.postgresql_extensible.query]]
sqlquery="SELECT * FROM pg_stat_database where datname"
version=901
withdbname=false
tagvalue=""
[[inputs.postgresql_extensible.query]]
sqlquery="SELECT * FROM pg_stat_bgwriter"
version=901
withdbname=false
tagvalue=""
```
The system can be easily extended using homemade metrics collection tools or
using postgreql extensions ([pg_stat_statements](http://www.postgresql.org/docs/current/static/pgstatstatements.html), [pg_proctab](https://github.com/markwkm/pg_proctab),[powa](http://dalibo.github.io/powa/)...)
# Sample Queries :
- telegraf.conf postgresql_extensible queries (assuming that you have configured
correctly your connection)
```
[[inputs.postgresql_extensible.query]]
sqlquery="SELECT * FROM pg_stat_database"
version=901
withdbname=false
tagvalue=""
[[inputs.postgresql_extensible.query]]
sqlquery="SELECT * FROM pg_stat_bgwriter"
version=901
withdbname=false
tagvalue=""
[[inputs.postgresql_extensible.query]]
sqlquery="select * from sessions"
version=901
withdbname=false
tagvalue="db,username,state"
[[inputs.postgresql_extensible.query]]
sqlquery="select setting as max_connections from pg_settings where \
name='max_connections'"
version=801
withdbname=false
tagvalue=""
[[inputs.postgresql_extensible.query]]
sqlquery="select * from pg_stat_kcache"
version=901
withdbname=false
tagvalue=""
[[inputs.postgresql_extensible.query]]
sqlquery="select setting as shared_buffers from pg_settings where \
name='shared_buffers'"
version=801
withdbname=false
tagvalue=""
[[inputs.postgresql_extensible.query]]
sqlquery="SELECT db, count( distinct blocking_pid ) AS num_blocking_sessions,\
count( distinct blocked_pid) AS num_blocked_sessions FROM \
public.blocking_procs group by db"
version=901
withdbname=false
tagvalue="db"
```
# Postgresql Side
postgresql.conf :
```
shared_preload_libraries = 'pg_stat_statements,pg_stat_kcache'
```
Please follow the requirements to setup those extensions.
In the database (can be a specific monitoring db)
```
create extension pg_stat_statements;
create extension pg_stat_kcache;
create extension pg_proctab;
```
(assuming that the extension is installed on the OS Layer)
- pg_stat_kcache is available on the postgresql.org yum repo
- pg_proctab is available at : https://github.com/markwkm/pg_proctab
##Views
- Blocking sessions
```
CREATE OR REPLACE VIEW public.blocking_procs AS
SELECT a.datname AS db,
kl.pid AS blocking_pid,
ka.usename AS blocking_user,
ka.query AS blocking_query,
bl.pid AS blocked_pid,
a.usename AS blocked_user,
a.query AS blocked_query,
to_char(age(now(), a.query_start), 'HH24h:MIm:SSs'::text) AS age
FROM pg_locks bl
JOIN pg_stat_activity a ON bl.pid = a.pid
JOIN pg_locks kl ON bl.locktype = kl.locktype AND NOT bl.database IS
DISTINCT FROM kl.database AND NOT bl.relation IS DISTINCT FROM kl.relation
AND NOT bl.page IS DISTINCT FROM kl.page AND NOT bl.tuple IS DISTINCT FROM
kl.tuple AND NOT bl.virtualxid IS DISTINCT FROM kl.virtualxid AND NOT
bl.transactionid IS DISTINCT FROM kl.transactionid AND NOT bl.classid IS
DISTINCT FROM kl.classid AND NOT bl.objid IS DISTINCT FROM kl.objid AND
NOT bl.objsubid IS DISTINCT FROM kl.objsubid AND bl.pid <> kl.pid
JOIN pg_stat_activity ka ON kl.pid = ka.pid
WHERE kl.granted AND NOT bl.granted
ORDER BY a.query_start;
```
- Sessions Statistics
```
CREATE OR REPLACE VIEW public.sessions AS
WITH proctab AS (
SELECT pg_proctab.pid,
CASE
WHEN pg_proctab.state::text = 'R'::bpchar::text
THEN 'running'::text
WHEN pg_proctab.state::text = 'D'::bpchar::text
THEN 'sleep-io'::text
WHEN pg_proctab.state::text = 'S'::bpchar::text
THEN 'sleep-waiting'::text
WHEN pg_proctab.state::text = 'Z'::bpchar::text
THEN 'zombie'::text
WHEN pg_proctab.state::text = 'T'::bpchar::text
THEN 'stopped'::text
ELSE NULL::text
END AS proc_state,
pg_proctab.ppid,
pg_proctab.utime,
pg_proctab.stime,
pg_proctab.vsize,
pg_proctab.rss,
pg_proctab.processor,
pg_proctab.rchar,
pg_proctab.wchar,
pg_proctab.syscr,
pg_proctab.syscw,
pg_proctab.reads,
pg_proctab.writes,
pg_proctab.cwrites
FROM pg_proctab() pg_proctab(pid, comm, fullcomm, state, ppid, pgrp,
session, tty_nr, tpgid, flags, minflt, cminflt, majflt, cmajflt,
utime, stime, cutime, cstime, priority, nice, num_threads,
itrealvalue, starttime, vsize, rss, exit_signal, processor,
rt_priority, policy, delayacct_blkio_ticks, uid, username, rchar,
wchar, syscr, syscw, reads, writes, cwrites)
), stat_activity AS (
SELECT pg_stat_activity.datname,
pg_stat_activity.pid,
pg_stat_activity.usename,
CASE
WHEN pg_stat_activity.query IS NULL THEN 'no query'::text
WHEN pg_stat_activity.query IS NOT NULL AND
pg_stat_activity.state = 'idle'::text THEN 'no query'::text
ELSE regexp_replace(pg_stat_activity.query, '[\n\r]+'::text,
' '::text, 'g'::text)
END AS query
FROM pg_stat_activity
)
SELECT stat.datname::name AS db,
stat.usename::name AS username,
stat.pid,
proc.proc_state::text AS state,
('"'::text || stat.query) || '"'::text AS query,
(proc.utime/1000)::bigint AS session_usertime,
(proc.stime/1000)::bigint AS session_systemtime,
proc.vsize AS session_virtual_memory_size,
proc.rss AS session_resident_memory_size,
proc.processor AS session_processor_number,
proc.rchar AS session_bytes_read,
proc.rchar-proc.reads AS session_logical_bytes_read,
proc.wchar AS session_bytes_written,
proc.wchar-proc.writes AS session_logical_bytes_writes,
proc.syscr AS session_read_io,
proc.syscw AS session_write_io,
proc.reads AS session_physical_reads,
proc.writes AS session_physical_writes,
proc.cwrites AS session_cancel_writes
FROM proctab proc,
stat_activity stat
WHERE proc.pid = stat.pid;
```

View File

@ -0,0 +1,275 @@
package postgresql_extensible
import (
"bytes"
"database/sql"
"fmt"
"regexp"
"strings"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/lib/pq"
)
type Postgresql struct {
Address string
Databases []string
OrderedColumns []string
AllColumns []string
AdditionalTags []string
sanitizedAddress string
Query []struct {
Sqlquery string
Version int
Withdbname bool
Tagvalue string
}
}
type query []struct {
Sqlquery string
Version int
Withdbname bool
Tagvalue string
}
var ignoredColumns = map[string]bool{"datid": true, "datname": true, "stats_reset": true}
var sampleConfig = `
# specify address via a url matching:
# postgres://[pqgotest[:password]]@localhost[/dbname]?sslmode=[disable|verify-ca|verify-full]
# or a simple string:
# host=localhost user=pqotest password=... sslmode=... dbname=app_production
#
# All connection parameters are optional. #
# Without the dbname parameter, the driver will default to a database
# with the same name as the user. This dbname is just for instantiating a
# connection with the server and doesn't restrict the databases we are trying
# to grab metrics for.
#
address = "host=localhost user=postgres sslmode=disable"
# A list of databases to pull metrics about. If not specified, metrics for all
# databases are gathered.
# databases = ["app_production", "testing"]
#
# Define the toml config where the sql queries are stored
# New queries can be added, if the withdbname is set to true and there is no databases defined
# in the 'databases field', the sql query is ended by a 'is not null' in order to make the query
# succeed.
# Example :
# The sqlquery : "SELECT * FROM pg_stat_database where datname" become "SELECT * FROM pg_stat_database where datname IN ('postgres', 'pgbench')"
# because the databases variable was set to ['postgres', 'pgbench' ] and the withdbname was true.
# Be careful that if the withdbname is set to false you d'ont have to define the where clause (aka with the dbname)
# the tagvalue field is used to define custom tags (separated by comas)
#
# Structure :
# [[inputs.postgresql_extensible.query]]
# sqlquery string
# version string
# withdbname boolean
# tagvalue string (coma separated)
[[inputs.postgresql_extensible.query]]
sqlquery="SELECT * FROM pg_stat_database"
version=901
withdbname=false
tagvalue=""
[[inputs.postgresql_extensible.query]]
sqlquery="SELECT * FROM pg_stat_bgwriter"
version=901
withdbname=false
tagvalue=""
`
func (p *Postgresql) SampleConfig() string {
return sampleConfig
}
func (p *Postgresql) Description() string {
return "Read metrics from one or many postgresql servers"
}
func (p *Postgresql) IgnoredColumns() map[string]bool {
return ignoredColumns
}
var localhost = "host=localhost sslmode=disable"
func (p *Postgresql) Gather(acc telegraf.Accumulator) error {
var sql_query string
var query_addon string
var db_version int
var query string
var tag_value string
if p.Address == "" || p.Address == "localhost" {
p.Address = localhost
}
db, err := sql.Open("postgres", p.Address)
if err != nil {
return err
}
defer db.Close()
// Retreiving the database version
query = `select substring(setting from 1 for 3) as version from pg_settings where name='server_version_num'`
err = db.QueryRow(query).Scan(&db_version)
if err != nil {
return err
}
// We loop in order to process each query
// Query is not run if Database version does not match the query version.
for i := range p.Query {
sql_query = p.Query[i].Sqlquery
tag_value = p.Query[i].Tagvalue
if p.Query[i].Withdbname {
if len(p.Databases) != 0 {
query_addon = fmt.Sprintf(` IN ('%s')`,
strings.Join(p.Databases, "','"))
} else {
query_addon = " is not null"
}
} else {
query_addon = ""
}
sql_query += query_addon
if p.Query[i].Version <= db_version {
rows, err := db.Query(sql_query)
if err != nil {
return err
}
defer rows.Close()
// grab the column information from the result
p.OrderedColumns, err = rows.Columns()
if err != nil {
return err
} else {
for _, v := range p.OrderedColumns {
p.AllColumns = append(p.AllColumns, v)
}
}
p.AdditionalTags = nil
if tag_value != "" {
tag_list := strings.Split(tag_value, ",")
for t := range tag_list {
p.AdditionalTags = append(p.AdditionalTags, tag_list[t])
}
}
for rows.Next() {
err = p.accRow(rows, acc)
if err != nil {
return err
}
}
}
}
return nil
}
type scanner interface {
Scan(dest ...interface{}) error
}
var passwordKVMatcher, _ = regexp.Compile("password=\\S+ ?")
func (p *Postgresql) SanitizedAddress() (_ string, err error) {
var canonicalizedAddress string
if strings.HasPrefix(p.Address, "postgres://") || strings.HasPrefix(p.Address, "postgresql://") {
canonicalizedAddress, err = pq.ParseURL(p.Address)
if err != nil {
return p.sanitizedAddress, err
}
} else {
canonicalizedAddress = p.Address
}
p.sanitizedAddress = passwordKVMatcher.ReplaceAllString(canonicalizedAddress, "")
return p.sanitizedAddress, err
}
func (p *Postgresql) accRow(row scanner, acc telegraf.Accumulator) error {
var columnVars []interface{}
var dbname bytes.Buffer
// this is where we'll store the column name with its *interface{}
columnMap := make(map[string]*interface{})
for _, column := range p.OrderedColumns {
columnMap[column] = new(interface{})
}
// populate the array of interface{} with the pointers in the right order
for i := 0; i < len(columnMap); i++ {
columnVars = append(columnVars, columnMap[p.OrderedColumns[i]])
}
// deconstruct array of variables and send to Scan
err := row.Scan(columnVars...)
if err != nil {
return err
}
if columnMap["datname"] != nil {
// extract the database name from the column map
dbnameChars := (*columnMap["datname"]).([]uint8)
for i := 0; i < len(dbnameChars); i++ {
dbname.WriteString(string(dbnameChars[i]))
}
} else {
dbname.WriteString("postgres")
}
var tagAddress string
tagAddress, err = p.SanitizedAddress()
if err != nil {
return err
}
// Process the additional tags
tags := map[string]string{}
tags["server"] = tagAddress
tags["db"] = dbname.String()
var isATag int
fields := make(map[string]interface{})
for col, val := range columnMap {
_, ignore := ignoredColumns[col]
//if !ignore && *val != "" {
if !ignore {
isATag = 0
for tag := range p.AdditionalTags {
if col == p.AdditionalTags[tag] {
isATag = 1
value_type_p := fmt.Sprintf(`%T`, *val)
if value_type_p == "[]uint8" {
tags[col] = fmt.Sprintf(`%s`, *val)
} else if value_type_p == "int64" {
tags[col] = fmt.Sprintf(`%v`, *val)
}
}
}
if isATag == 0 {
fields[col] = *val
}
}
}
acc.AddFields("postgresql", fields, tags)
return nil
}
func init() {
inputs.Add("postgresql_extensible", func() telegraf.Input {
return &Postgresql{}
})
}

View File

@ -0,0 +1,98 @@
package postgresql_extensible
import (
"fmt"
"testing"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestPostgresqlGeneratesMetrics(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
p := &Postgresql{
Address: fmt.Sprintf("host=%s user=postgres sslmode=disable",
testutil.GetLocalHost()),
Databases: []string{"postgres"},
Query: query{
{Sqlquery: "select * from pg_stat_database",
Version: 901,
Withdbname: false,
Tagvalue: ""},
},
}
var acc testutil.Accumulator
err := p.Gather(&acc)
require.NoError(t, err)
availableColumns := make(map[string]bool)
for _, col := range p.AllColumns {
availableColumns[col] = true
}
intMetrics := []string{
"xact_commit",
"xact_rollback",
"blks_read",
"blks_hit",
"tup_returned",
"tup_fetched",
"tup_inserted",
"tup_updated",
"tup_deleted",
"conflicts",
"temp_files",
"temp_bytes",
"deadlocks",
"numbackends",
}
floatMetrics := []string{
"blk_read_time",
"blk_write_time",
}
metricsCounted := 0
for _, metric := range intMetrics {
_, ok := availableColumns[metric]
if ok {
assert.True(t, acc.HasIntField("postgresql", metric))
metricsCounted++
}
}
for _, metric := range floatMetrics {
_, ok := availableColumns[metric]
if ok {
assert.True(t, acc.HasFloatField("postgresql", metric))
metricsCounted++
}
}
assert.True(t, metricsCounted > 0)
assert.Equal(t, len(availableColumns)-len(p.IgnoredColumns()), metricsCounted)
}
func TestPostgresqlIgnoresUnwantedColumns(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
p := &Postgresql{
Address: fmt.Sprintf("host=%s user=postgres sslmode=disable",
testutil.GetLocalHost()),
}
var acc testutil.Accumulator
err := p.Gather(&acc)
require.NoError(t, err)
for col := range p.IgnoredColumns() {
assert.False(t, acc.HasMeasurement(col))
}
}

View File

@ -1,11 +1,13 @@
package prometheus package prometheus
import ( import (
"crypto/tls"
"errors" "errors"
"fmt" "fmt"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
"io/ioutil" "io/ioutil"
"net"
"net/http" "net/http"
"sync" "sync"
"time" "time"
@ -13,18 +15,28 @@ import (
type Prometheus struct { type Prometheus struct {
Urls []string Urls []string
// Use SSL but skip chain & host verification
InsecureSkipVerify bool
// Bearer Token authorization file path
BearerToken string `toml:"bearer_token"`
} }
var sampleConfig = ` var sampleConfig = `
## An array of urls to scrape metrics from. ## An array of urls to scrape metrics from.
urls = ["http://localhost:9100/metrics"] urls = ["http://localhost:9100/metrics"]
### Use SSL but skip chain & host verification
# insecure_skip_verify = false
### Use bearer token for authorization
# bearer_token = /path/to/bearer/token
` `
func (r *Prometheus) SampleConfig() string { func (p *Prometheus) SampleConfig() string {
return sampleConfig return sampleConfig
} }
func (r *Prometheus) Description() string { func (p *Prometheus) Description() string {
return "Read metrics from one or many prometheus clients" return "Read metrics from one or many prometheus clients"
} }
@ -32,16 +44,16 @@ var ErrProtocolError = errors.New("prometheus protocol error")
// Reads stats from all configured servers accumulates stats. // Reads stats from all configured servers accumulates stats.
// Returns one of the errors encountered while gather stats (if any). // Returns one of the errors encountered while gather stats (if any).
func (g *Prometheus) Gather(acc telegraf.Accumulator) error { func (p *Prometheus) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup var wg sync.WaitGroup
var outerr error var outerr error
for _, serv := range g.Urls { for _, serv := range p.Urls {
wg.Add(1) wg.Add(1)
go func(serv string) { go func(serv string) {
defer wg.Done() defer wg.Done()
outerr = g.gatherURL(serv, acc) outerr = p.gatherURL(serv, acc)
}(serv) }(serv)
} }
@ -59,9 +71,34 @@ var client = &http.Client{
Timeout: time.Duration(4 * time.Second), Timeout: time.Duration(4 * time.Second),
} }
func (g *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error { func (p *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error {
collectDate := time.Now() collectDate := time.Now()
resp, err := client.Get(url) var req, err = http.NewRequest("GET", url, nil)
req.Header = make(http.Header)
var token []byte
var resp *http.Response
var rt http.RoundTripper = &http.Transport{
Dial: (&net.Dialer{
Timeout: 10 * time.Second,
KeepAlive: 30 * time.Second,
}).Dial,
TLSHandshakeTimeout: 10 * time.Second,
TLSClientConfig: &tls.Config{
InsecureSkipVerify: p.InsecureSkipVerify,
},
ResponseHeaderTimeout: time.Duration(3 * time.Second),
}
if p.BearerToken != "" {
token, err = ioutil.ReadFile(p.BearerToken)
if err != nil {
return err
}
req.Header.Set("Authorization", "Bearer "+string(token))
}
resp, err = rt.RoundTrip(req)
if err != nil { if err != nil {
return fmt.Errorf("error making HTTP request to %s: %s", url, err) return fmt.Errorf("error making HTTP request to %s: %s", url, err)
} }

View File

@ -492,12 +492,12 @@ Note: the plugin will add instance name as tag *instance*
# oid attribute is useless # oid attribute is useless
# SNMP SUBTABLES # SNMP SUBTABLES
[[plugins.snmp.subtable]] [[inputs.snmp.subtable]]
name = "bytes_recv" name = "bytes_recv"
oid = ".1.3.6.1.2.1.31.1.1.1.6" oid = ".1.3.6.1.2.1.31.1.1.1.6"
unit = "octets" unit = "octets"
[[plugins.snmp.subtable]] [[inputs.snmp.subtable]]
name = "bytes_send" name = "bytes_send"
oid = ".1.3.6.1.2.1.31.1.1.1.10" oid = ".1.3.6.1.2.1.31.1.1.1.10"
unit = "octets" unit = "octets"
@ -505,10 +505,10 @@ Note: the plugin will add instance name as tag *instance*
#### Configuration notes #### Configuration notes
- In **plugins.snmp.table** section, the `oid` attribute is useless if - In **inputs.snmp.table** section, the `oid` attribute is useless if
the `sub_tables` attributes is defined the `sub_tables` attributes is defined
- In **plugins.snmp.subtable** section, you can put a name from `snmptranslate_file` - In **inputs.snmp.subtable** section, you can put a name from `snmptranslate_file`
as `oid` attribute instead of a valid OID as `oid` attribute instead of a valid OID
### Measurements & Fields: ### Measurements & Fields:

View File

@ -4,7 +4,6 @@ import (
"io/ioutil" "io/ioutil"
"log" "log"
"net" "net"
"regexp"
"strconv" "strconv"
"strings" "strings"
"time" "time"
@ -308,11 +307,10 @@ func (s *Snmp) Gather(acc telegraf.Accumulator) error {
return err return err
} else { } else {
for _, line := range strings.Split(string(data), "\n") { for _, line := range strings.Split(string(data), "\n") {
oidsRegEx := regexp.MustCompile(`([^\t]*)\t*([^\t]*)`) oids := strings.Fields(string(line))
oids := oidsRegEx.FindStringSubmatch(string(line)) if len(oids) == 2 && oids[1] != "" {
if oids[2] != "" { oid_name := oids[0]
oid_name := oids[1] oid := oids[1]
oid := oids[2]
fillnode(s.initNode, oid_name, strings.Split(string(oid), ".")) fillnode(s.initNode, oid_name, strings.Split(string(oid), "."))
s.nameToOid[oid_name] = oid s.nameToOid[oid_name] = oid
} }

View File

@ -283,30 +283,75 @@ EXEC sp_executesql @DynamicPivotQuery;
const sqlMemoryClerk string = `SET NOCOUNT ON; const sqlMemoryClerk string = `SET NOCOUNT ON;
SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED; SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;
DECLARE @w TABLE (ClerkCategory nvarchar(64) NOT NULL, UsedPercent decimal(9,2), UsedBytes bigint) DECLARE @sqlVers numeric(4,2)
INSERT @w (ClerkCategory, UsedPercent, UsedBytes) SELECT @sqlVers = LEFT(CAST(SERVERPROPERTY('productversion') as varchar), 4)
SELECT ClerkCategory
, UsedPercent = SUM(UsedPercent)
, UsedBytes = SUM(UsedBytes)
FROM
(
SELECT ClerkCategory = CASE MC.[type]
WHEN 'MEMORYCLERK_SQLBUFFERPOOL' THEN 'Buffer pool'
WHEN 'CACHESTORE_SQLCP' THEN 'Cache (sql plans)'
WHEN 'CACHESTORE_OBJCP' THEN 'Cache (objects)'
ELSE 'Other' END
, SUM(pages_kb * 1024) AS UsedBytes
, Cast(100 * Sum(pages_kb)*1.0/(Select Sum(pages_kb) From sys.dm_os_memory_clerks) as Decimal(7, 4)) UsedPercent
FROM sys.dm_os_memory_clerks MC
WHERE pages_kb > 0
GROUP BY CASE MC.[type]
WHEN 'MEMORYCLERK_SQLBUFFERPOOL' THEN 'Buffer pool'
WHEN 'CACHESTORE_SQLCP' THEN 'Cache (sql plans)'
WHEN 'CACHESTORE_OBJCP' THEN 'Cache (objects)'
ELSE 'Other' END
) as T
GROUP BY ClerkCategory
IF OBJECT_ID('tempdb..#clerk') IS NOT NULL
DROP TABLE #clerk;
CREATE TABLE #clerk (
ClerkCategory nvarchar(64) NOT NULL,
UsedPercent decimal(9,2),
UsedBytes bigint
);
DECLARE @DynamicClerkQuery AS NVARCHAR(MAX)
IF @sqlVers < 11
BEGIN
SET @DynamicClerkQuery = N'
INSERT #clerk (ClerkCategory, UsedPercent, UsedBytes)
SELECT ClerkCategory
, UsedPercent = SUM(UsedPercent)
, UsedBytes = SUM(UsedBytes)
FROM
(
SELECT ClerkCategory = CASE MC.[type]
WHEN ''MEMORYCLERK_SQLBUFFERPOOL'' THEN ''Buffer pool''
WHEN ''CACHESTORE_SQLCP'' THEN ''Cache (sql plans)''
WHEN ''CACHESTORE_OBJCP'' THEN ''Cache (objects)''
ELSE ''Other'' END
, SUM((single_pages_kb + multi_pages_kb) * 1024) AS UsedBytes
, Cast(100 * Sum((single_pages_kb + multi_pages_kb))*1.0/(Select Sum((single_pages_kb + multi_pages_kb)) From sys.dm_os_memory_clerks) as Decimal(7, 4)) UsedPercent
FROM sys.dm_os_memory_clerks MC
WHERE (single_pages_kb + multi_pages_kb) > 0
GROUP BY CASE MC.[type]
WHEN ''MEMORYCLERK_SQLBUFFERPOOL'' THEN ''Buffer pool''
WHEN ''CACHESTORE_SQLCP'' THEN ''Cache (sql plans)''
WHEN ''CACHESTORE_OBJCP'' THEN ''Cache (objects)''
ELSE ''Other'' END
) as T
GROUP BY ClerkCategory;
'
END
ELSE
BEGIN
SET @DynamicClerkQuery = N'
INSERT #clerk (ClerkCategory, UsedPercent, UsedBytes)
SELECT ClerkCategory
, UsedPercent = SUM(UsedPercent)
, UsedBytes = SUM(UsedBytes)
FROM
(
SELECT ClerkCategory = CASE MC.[type]
WHEN ''MEMORYCLERK_SQLBUFFERPOOL'' THEN ''Buffer pool''
WHEN ''CACHESTORE_SQLCP'' THEN ''Cache (sql plans)''
WHEN ''CACHESTORE_OBJCP'' THEN ''Cache (objects)''
ELSE ''Other'' END
, SUM(pages_kb * 1024) AS UsedBytes
, Cast(100 * Sum(pages_kb)*1.0/(Select Sum(pages_kb) From sys.dm_os_memory_clerks) as Decimal(7, 4)) UsedPercent
FROM sys.dm_os_memory_clerks MC
WHERE pages_kb > 0
GROUP BY CASE MC.[type]
WHEN ''MEMORYCLERK_SQLBUFFERPOOL'' THEN ''Buffer pool''
WHEN ''CACHESTORE_SQLCP'' THEN ''Cache (sql plans)''
WHEN ''CACHESTORE_OBJCP'' THEN ''Cache (objects)''
ELSE ''Other'' END
) as T
GROUP BY ClerkCategory;
'
END
EXEC sp_executesql @DynamicClerkQuery;
SELECT SELECT
-- measurement -- measurement
measurement measurement
@ -325,7 +370,7 @@ SELECT measurement = 'Memory breakdown (%)'
, [Cache (objects)] = ISNULL(ROUND([Cache (objects)], 1), 0) , [Cache (objects)] = ISNULL(ROUND([Cache (objects)], 1), 0)
, [Cache (sql plans)] = ISNULL(ROUND([Cache (sql plans)], 1), 0) , [Cache (sql plans)] = ISNULL(ROUND([Cache (sql plans)], 1), 0)
, [Other] = ISNULL(ROUND([Other], 1), 0) , [Other] = ISNULL(ROUND([Other], 1), 0)
FROM (SELECT ClerkCategory, UsedPercent FROM @w) as G1 FROM (SELECT ClerkCategory, UsedPercent FROM #clerk) as G1
PIVOT PIVOT
( (
SUM(UsedPercent) SUM(UsedPercent)
@ -339,7 +384,7 @@ SELECT measurement = 'Memory breakdown (bytes)'
, [Cache (objects)] = ISNULL(ROUND([Cache (objects)], 1), 0) , [Cache (objects)] = ISNULL(ROUND([Cache (objects)], 1), 0)
, [Cache (sql plans)] = ISNULL(ROUND([Cache (sql plans)], 1), 0) , [Cache (sql plans)] = ISNULL(ROUND([Cache (sql plans)], 1), 0)
, [Other] = ISNULL(ROUND([Other], 1), 0) , [Other] = ISNULL(ROUND([Other], 1), 0)
FROM (SELECT ClerkCategory, UsedBytes FROM @w) as G2 FROM (SELECT ClerkCategory, UsedBytes FROM #clerk) as G2
PIVOT PIVOT
( (
SUM(UsedBytes) SUM(UsedBytes)
@ -698,7 +743,7 @@ IF OBJECT_ID('tempdb..#Databases') IS NOT NULL
CREATE TABLE #Databases CREATE TABLE #Databases
( (
Measurement nvarchar(64) NOT NULL, Measurement nvarchar(64) NOT NULL,
DatabaseName nvarchar(64) NOT NULL, DatabaseName nvarchar(128) NOT NULL,
Value tinyint NOT NULL Value tinyint NOT NULL
Primary Key(DatabaseName, Measurement) Primary Key(DatabaseName, Measurement)
); );

View File

@ -21,6 +21,10 @@
## convert measurement names, "." to "_" and "-" to "__" ## convert measurement names, "." to "_" and "-" to "__"
convert_names = true convert_names = true
## Parses tags in DataDog's dogstatsd format
## http://docs.datadoghq.com/guides/dogstatsd/
parse_data_dog_tags = false
## Statsd data translation templates, more info can be read here: ## Statsd data translation templates, more info can be read here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#graphite ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#graphite
# templates = [ # templates = [
@ -155,6 +159,7 @@ per-measurement in the calculation of percentiles. Raising this limit increases
the accuracy of percentiles but also increases the memory usage and cpu time. the accuracy of percentiles but also increases the memory usage and cpu time.
- **templates** []string: Templates for transforming statsd buckets into influx - **templates** []string: Templates for transforming statsd buckets into influx
measurements and tags. measurements and tags.
- **parse_data_dog_tags** boolean: Enable parsing of tags in DataDog's dogstatsd format (http://docs.datadoghq.com/guides/dogstatsd/)
### Statsd bucket -> InfluxDB line-protocol Templates ### Statsd bucket -> InfluxDB line-protocol Templates
@ -198,4 +203,4 @@ mem.cached.localhost:256|g
``` ```
There are many more options available, There are many more options available,
[More details can be found here](https://github.com/influxdata/influxdb/tree/master/services/graphite#templates) [More details can be found here](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#graphite)

View File

@ -21,11 +21,15 @@ const (
UDP_PACKET_SIZE int = 1500 UDP_PACKET_SIZE int = 1500
defaultFieldName = "value" defaultFieldName = "value"
defaultSeparator = "_"
) )
var dropwarn = "ERROR: Message queue full. Discarding line [%s] " + var dropwarn = "ERROR: Message queue full. Discarding line [%s] " +
"You may want to increase allowed_pending_messages in the config\n" "You may want to increase allowed_pending_messages in the config\n"
var prevInstance *Statsd
type Statsd struct { type Statsd struct {
// Address & Port to serve from // Address & Port to serve from
ServiceAddress string ServiceAddress string
@ -45,11 +49,18 @@ type Statsd struct {
DeleteTimings bool DeleteTimings bool
ConvertNames bool ConvertNames bool
// MetricSeparator is the separator between parts of the metric name.
MetricSeparator string
// This flag enables parsing of tags in the dogstatsd extention to the
// statsd protocol (http://docs.datadoghq.com/guides/dogstatsd/)
ParseDataDogTags bool
// UDPPacketSize is the size of the read packets for the server listening // UDPPacketSize is the size of the read packets for the server listening
// for statsd UDP packets. This will default to 1500 bytes. // for statsd UDP packets. This will default to 1500 bytes.
UDPPacketSize int `toml:"udp_packet_size"` UDPPacketSize int `toml:"udp_packet_size"`
sync.Mutex sync.Mutex
wg sync.WaitGroup
// Channel for all incoming statsd packets // Channel for all incoming statsd packets
in chan []byte in chan []byte
@ -65,23 +76,8 @@ type Statsd struct {
// bucket -> influx templates // bucket -> influx templates
Templates []string Templates []string
}
func NewStatsd() *Statsd { listener *net.UDPConn
s := Statsd{}
// Make data structures
s.done = make(chan struct{})
s.in = make(chan []byte, s.AllowedPendingMessages)
s.gauges = make(map[string]cachedgauge)
s.counters = make(map[string]cachedcounter)
s.sets = make(map[string]cachedset)
s.timings = make(map[string]cachedtimings)
s.ConvertNames = true
s.UDPPacketSize = UDP_PACKET_SIZE
return &s
} }
// One statsd metric, form is <bucket>:<value>|<mtype>|@<samplerate> // One statsd metric, form is <bucket>:<value>|<mtype>|@<samplerate>
@ -140,8 +136,12 @@ const sampleConfig = `
## Percentiles to calculate for timing & histogram stats ## Percentiles to calculate for timing & histogram stats
percentiles = [90] percentiles = [90]
## convert measurement names, "." to "_" and "-" to "__" ## separator to use between elements of a statsd metric
convert_names = true metric_separator = "_"
## Parses tags in the datadog statsd format
## http://docs.datadoghq.com/guides/dogstatsd/
parse_data_dog_tags = false
## Statsd data translation templates, more info can be read here: ## Statsd data translation templates, more info can be read here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#graphite ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#graphite
@ -231,28 +231,48 @@ func (s *Statsd) Start(_ telegraf.Accumulator) error {
// Make data structures // Make data structures
s.done = make(chan struct{}) s.done = make(chan struct{})
s.in = make(chan []byte, s.AllowedPendingMessages) s.in = make(chan []byte, s.AllowedPendingMessages)
if prevInstance == nil {
s.gauges = make(map[string]cachedgauge) s.gauges = make(map[string]cachedgauge)
s.counters = make(map[string]cachedcounter) s.counters = make(map[string]cachedcounter)
s.sets = make(map[string]cachedset) s.sets = make(map[string]cachedset)
s.timings = make(map[string]cachedtimings) s.timings = make(map[string]cachedtimings)
} else {
s.gauges = prevInstance.gauges
s.counters = prevInstance.counters
s.sets = prevInstance.sets
s.timings = prevInstance.timings
}
if s.ConvertNames {
log.Printf("WARNING statsd: convert_names config option is deprecated," +
" please use metric_separator instead")
}
if s.MetricSeparator == "" {
s.MetricSeparator = defaultSeparator
}
s.wg.Add(2)
// Start the UDP listener // Start the UDP listener
go s.udpListen() go s.udpListen()
// Start the line parser // Start the line parser
go s.parser() go s.parser()
log.Printf("Started the statsd service on %s\n", s.ServiceAddress) log.Printf("Started the statsd service on %s\n", s.ServiceAddress)
prevInstance = s
return nil return nil
} }
// udpListen starts listening for udp packets on the configured port. // udpListen starts listening for udp packets on the configured port.
func (s *Statsd) udpListen() error { func (s *Statsd) udpListen() error {
defer s.wg.Done()
var err error
address, _ := net.ResolveUDPAddr("udp", s.ServiceAddress) address, _ := net.ResolveUDPAddr("udp", s.ServiceAddress)
listener, err := net.ListenUDP("udp", address) s.listener, err = net.ListenUDP("udp", address)
if err != nil { if err != nil {
log.Fatalf("ERROR: ListenUDP - %s", err) log.Fatalf("ERROR: ListenUDP - %s", err)
} }
defer listener.Close() log.Println("Statsd listener listening on: ", s.listener.LocalAddr().String())
log.Println("Statsd listener listening on: ", listener.LocalAddr().String())
for { for {
select { select {
@ -260,9 +280,10 @@ func (s *Statsd) udpListen() error {
return nil return nil
default: default:
buf := make([]byte, s.UDPPacketSize) buf := make([]byte, s.UDPPacketSize)
n, _, err := listener.ReadFromUDP(buf) n, _, err := s.listener.ReadFromUDP(buf)
if err != nil { if err != nil && !strings.Contains(err.Error(), "closed network") {
log.Printf("ERROR: %s\n", err.Error()) log.Printf("ERROR READ: %s\n", err.Error())
continue
} }
select { select {
@ -278,6 +299,7 @@ func (s *Statsd) udpListen() error {
// packet into statsd strings and then calls parseStatsdLine, which parses a // packet into statsd strings and then calls parseStatsdLine, which parses a
// single statsd metric into a struct. // single statsd metric into a struct.
func (s *Statsd) parser() error { func (s *Statsd) parser() error {
defer s.wg.Done()
for { for {
select { select {
case <-s.done: case <-s.done:
@ -300,6 +322,43 @@ func (s *Statsd) parseStatsdLine(line string) error {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
lineTags := make(map[string]string)
if s.ParseDataDogTags {
recombinedSegments := make([]string, 0)
// datadog tags look like this:
// users.online:1|c|@0.5|#country:china,environment:production
// users.online:1|c|#sometagwithnovalue
// we will split on the pipe and remove any elements that are datadog
// tags, parse them, and rebuild the line sans the datadog tags
pipesplit := strings.Split(line, "|")
for _, segment := range pipesplit {
if len(segment) > 0 && segment[0] == '#' {
// we have ourselves a tag; they are comma separated
tagstr := segment[1:]
tags := strings.Split(tagstr, ",")
for _, tag := range tags {
ts := strings.Split(tag, ":")
var k, v string
switch len(ts) {
case 1:
// just a tag
k = ts[0]
v = ""
case 2:
k = ts[0]
v = ts[1]
}
if k != "" {
lineTags[k] = v
}
}
} else {
recombinedSegments = append(recombinedSegments, segment)
}
}
line = strings.Join(recombinedSegments, "|")
}
// Validate splitting the line on ":" // Validate splitting the line on ":"
bits := strings.Split(line, ":") bits := strings.Split(line, ":")
if len(bits) < 2 { if len(bits) < 2 {
@ -397,6 +456,12 @@ func (s *Statsd) parseStatsdLine(line string) error {
m.tags["metric_type"] = "histogram" m.tags["metric_type"] = "histogram"
} }
if len(lineTags) > 0 {
for k, v := range lineTags {
m.tags[k] = v
}
}
// Make a unique key for the measurement name/tags // Make a unique key for the measurement name/tags
var tg []string var tg []string
for k, v := range m.tags { for k, v := range m.tags {
@ -431,7 +496,7 @@ func (s *Statsd) parseName(bucket string) (string, string, map[string]string) {
var field string var field string
name := bucketparts[0] name := bucketparts[0]
p, err := graphite.NewGraphiteParser(".", s.Templates, nil) p, err := graphite.NewGraphiteParser(s.MetricSeparator, s.Templates, nil)
if err == nil { if err == nil {
p.DefaultTags = tags p.DefaultTags = tags
name, tags, field, _ = p.ApplyTemplate(name) name, tags, field, _ = p.ApplyTemplate(name)
@ -558,6 +623,8 @@ func (s *Statsd) Stop() {
defer s.Unlock() defer s.Unlock()
log.Println("Stopping the statsd service") log.Println("Stopping the statsd service")
close(s.done) close(s.done)
s.listener.Close()
s.wg.Wait()
close(s.in) close(s.in)
} }

View File

@ -8,9 +8,26 @@ import (
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
) )
func NewTestStatsd() *Statsd {
s := Statsd{}
// Make data structures
s.done = make(chan struct{})
s.in = make(chan []byte, s.AllowedPendingMessages)
s.gauges = make(map[string]cachedgauge)
s.counters = make(map[string]cachedcounter)
s.sets = make(map[string]cachedset)
s.timings = make(map[string]cachedtimings)
s.MetricSeparator = "_"
s.UDPPacketSize = UDP_PACKET_SIZE
return &s
}
// Invalid lines should return an error // Invalid lines should return an error
func TestParse_InvalidLines(t *testing.T) { func TestParse_InvalidLines(t *testing.T) {
s := NewStatsd() s := NewTestStatsd()
invalid_lines := []string{ invalid_lines := []string{
"i.dont.have.a.pipe:45g", "i.dont.have.a.pipe:45g",
"i.dont.have.a.colon45|c", "i.dont.have.a.colon45|c",
@ -34,7 +51,7 @@ func TestParse_InvalidLines(t *testing.T) {
// Invalid sample rates should be ignored and not applied // Invalid sample rates should be ignored and not applied
func TestParse_InvalidSampleRate(t *testing.T) { func TestParse_InvalidSampleRate(t *testing.T) {
s := NewStatsd() s := NewTestStatsd()
invalid_lines := []string{ invalid_lines := []string{
"invalid.sample.rate:45|c|0.1", "invalid.sample.rate:45|c|0.1",
"invalid.sample.rate.2:45|c|@foo", "invalid.sample.rate.2:45|c|@foo",
@ -84,9 +101,9 @@ func TestParse_InvalidSampleRate(t *testing.T) {
} }
} }
// Names should be parsed like . -> _ and - -> __ // Names should be parsed like . -> _
func TestParse_DefaultNameParsing(t *testing.T) { func TestParse_DefaultNameParsing(t *testing.T) {
s := NewStatsd() s := NewTestStatsd()
valid_lines := []string{ valid_lines := []string{
"valid:1|c", "valid:1|c",
"valid.foo-bar:11|c", "valid.foo-bar:11|c",
@ -108,7 +125,7 @@ func TestParse_DefaultNameParsing(t *testing.T) {
1, 1,
}, },
{ {
"valid_foo__bar", "valid_foo-bar",
11, 11,
}, },
} }
@ -123,7 +140,7 @@ func TestParse_DefaultNameParsing(t *testing.T) {
// Test that template name transformation works // Test that template name transformation works
func TestParse_Template(t *testing.T) { func TestParse_Template(t *testing.T) {
s := NewStatsd() s := NewTestStatsd()
s.Templates = []string{ s.Templates = []string{
"measurement.measurement.host.service", "measurement.measurement.host.service",
} }
@ -165,7 +182,7 @@ func TestParse_Template(t *testing.T) {
// Test that template filters properly // Test that template filters properly
func TestParse_TemplateFilter(t *testing.T) { func TestParse_TemplateFilter(t *testing.T) {
s := NewStatsd() s := NewTestStatsd()
s.Templates = []string{ s.Templates = []string{
"cpu.idle.* measurement.measurement.host", "cpu.idle.* measurement.measurement.host",
} }
@ -207,7 +224,7 @@ func TestParse_TemplateFilter(t *testing.T) {
// Test that most specific template is chosen // Test that most specific template is chosen
func TestParse_TemplateSpecificity(t *testing.T) { func TestParse_TemplateSpecificity(t *testing.T) {
s := NewStatsd() s := NewTestStatsd()
s.Templates = []string{ s.Templates = []string{
"cpu.* measurement.foo.host", "cpu.* measurement.foo.host",
"cpu.idle.* measurement.measurement.host", "cpu.idle.* measurement.measurement.host",
@ -245,7 +262,7 @@ func TestParse_TemplateSpecificity(t *testing.T) {
// Test that most specific template is chosen // Test that most specific template is chosen
func TestParse_TemplateFields(t *testing.T) { func TestParse_TemplateFields(t *testing.T) {
s := NewStatsd() s := NewTestStatsd()
s.Templates = []string{ s.Templates = []string{
"* measurement.measurement.field", "* measurement.measurement.field",
} }
@ -359,7 +376,7 @@ func TestParse_Fields(t *testing.T) {
// Test that tags within the bucket are parsed correctly // Test that tags within the bucket are parsed correctly
func TestParse_Tags(t *testing.T) { func TestParse_Tags(t *testing.T) {
s := NewStatsd() s := NewTestStatsd()
tests := []struct { tests := []struct {
bucket string bucket string
@ -410,9 +427,87 @@ func TestParse_Tags(t *testing.T) {
} }
} }
// Test that DataDog tags are parsed
func TestParse_DataDogTags(t *testing.T) {
s := NewTestStatsd()
s.ParseDataDogTags = true
lines := []string{
"my_counter:1|c|#host:localhost,environment:prod",
"my_gauge:10.1|g|#live",
"my_set:1|s|#host:localhost",
"my_timer:3|ms|@0.1|#live,host:localhost",
}
testTags := map[string]map[string]string{
"my_counter": map[string]string{
"host": "localhost",
"environment": "prod",
},
"my_gauge": map[string]string{
"live": "",
},
"my_set": map[string]string{
"host": "localhost",
},
"my_timer": map[string]string{
"live": "",
"host": "localhost",
},
}
for _, line := range lines {
err := s.parseStatsdLine(line)
if err != nil {
t.Errorf("Parsing line %s should not have resulted in an error\n", line)
}
}
sourceTags := map[string]map[string]string{
"my_gauge": tagsForItem(s.gauges),
"my_counter": tagsForItem(s.counters),
"my_set": tagsForItem(s.sets),
"my_timer": tagsForItem(s.timings),
}
for statName, tags := range testTags {
for k, v := range tags {
otherValue := sourceTags[statName][k]
if sourceTags[statName][k] != v {
t.Errorf("Error with %s, tag %s: %s != %s", statName, k, v, otherValue)
}
}
}
}
func tagsForItem(m interface{}) map[string]string {
switch m.(type) {
case map[string]cachedcounter:
for _, v := range m.(map[string]cachedcounter) {
return v.tags
}
case map[string]cachedgauge:
for _, v := range m.(map[string]cachedgauge) {
return v.tags
}
case map[string]cachedset:
for _, v := range m.(map[string]cachedset) {
return v.tags
}
case map[string]cachedtimings:
for _, v := range m.(map[string]cachedtimings) {
return v.tags
}
}
return nil
}
// Test that statsd buckets are parsed to measurement names properly // Test that statsd buckets are parsed to measurement names properly
func TestParseName(t *testing.T) { func TestParseName(t *testing.T) {
s := NewStatsd() s := NewTestStatsd()
tests := []struct { tests := []struct {
in_name string in_name string
@ -428,7 +523,7 @@ func TestParseName(t *testing.T) {
}, },
{ {
"foo.bar-baz", "foo.bar-baz",
"foo_bar__baz", "foo_bar-baz",
}, },
} }
@ -439,8 +534,8 @@ func TestParseName(t *testing.T) {
} }
} }
// Test with ConvertNames = false // Test with separator == "."
s.ConvertNames = false s.MetricSeparator = "."
tests = []struct { tests = []struct {
in_name string in_name string
@ -471,7 +566,7 @@ func TestParseName(t *testing.T) {
// Test that measurements with the same name, but different tags, are treated // Test that measurements with the same name, but different tags, are treated
// as different outputs // as different outputs
func TestParse_MeasurementsWithSameName(t *testing.T) { func TestParse_MeasurementsWithSameName(t *testing.T) {
s := NewStatsd() s := NewTestStatsd()
// Test that counters work // Test that counters work
valid_lines := []string{ valid_lines := []string{
@ -529,8 +624,8 @@ func TestParse_MeasurementsWithMultipleValues(t *testing.T) {
"valid.multiple.mixed:1|c:1|ms:2|s:1|g", "valid.multiple.mixed:1|c:1|ms:2|s:1|g",
} }
s_single := NewStatsd() s_single := NewTestStatsd()
s_multiple := NewStatsd() s_multiple := NewTestStatsd()
for _, line := range single_lines { for _, line := range single_lines {
err := s_single.parseStatsdLine(line) err := s_single.parseStatsdLine(line)
@ -623,7 +718,7 @@ func TestParse_MeasurementsWithMultipleValues(t *testing.T) {
// Valid lines should be parsed and their values should be cached // Valid lines should be parsed and their values should be cached
func TestParse_ValidLines(t *testing.T) { func TestParse_ValidLines(t *testing.T) {
s := NewStatsd() s := NewTestStatsd()
valid_lines := []string{ valid_lines := []string{
"valid:45|c", "valid:45|c",
"valid:45|s", "valid:45|s",
@ -642,7 +737,7 @@ func TestParse_ValidLines(t *testing.T) {
// Tests low-level functionality of gauges // Tests low-level functionality of gauges
func TestParse_Gauges(t *testing.T) { func TestParse_Gauges(t *testing.T) {
s := NewStatsd() s := NewTestStatsd()
// Test that gauge +- values work // Test that gauge +- values work
valid_lines := []string{ valid_lines := []string{
@ -708,7 +803,7 @@ func TestParse_Gauges(t *testing.T) {
// Tests low-level functionality of sets // Tests low-level functionality of sets
func TestParse_Sets(t *testing.T) { func TestParse_Sets(t *testing.T) {
s := NewStatsd() s := NewTestStatsd()
// Test that sets work // Test that sets work
valid_lines := []string{ valid_lines := []string{
@ -756,7 +851,7 @@ func TestParse_Sets(t *testing.T) {
// Tests low-level functionality of counters // Tests low-level functionality of counters
func TestParse_Counters(t *testing.T) { func TestParse_Counters(t *testing.T) {
s := NewStatsd() s := NewTestStatsd()
// Test that counters work // Test that counters work
valid_lines := []string{ valid_lines := []string{
@ -810,7 +905,7 @@ func TestParse_Counters(t *testing.T) {
// Tests low-level functionality of timings // Tests low-level functionality of timings
func TestParse_Timings(t *testing.T) { func TestParse_Timings(t *testing.T) {
s := NewStatsd() s := NewTestStatsd()
s.Percentiles = []int{90} s.Percentiles = []int{90}
acc := &testutil.Accumulator{} acc := &testutil.Accumulator{}
@ -847,7 +942,7 @@ func TestParse_Timings(t *testing.T) {
// Tests low-level functionality of timings when multiple fields is enabled // Tests low-level functionality of timings when multiple fields is enabled
// and a measurement template has been defined which can parse field names // and a measurement template has been defined which can parse field names
func TestParse_Timings_MultipleFieldsWithTemplate(t *testing.T) { func TestParse_Timings_MultipleFieldsWithTemplate(t *testing.T) {
s := NewStatsd() s := NewTestStatsd()
s.Templates = []string{"measurement.field"} s.Templates = []string{"measurement.field"}
s.Percentiles = []int{90} s.Percentiles = []int{90}
acc := &testutil.Accumulator{} acc := &testutil.Accumulator{}
@ -896,7 +991,7 @@ func TestParse_Timings_MultipleFieldsWithTemplate(t *testing.T) {
// but a measurement template hasn't been defined so we can't parse field names // but a measurement template hasn't been defined so we can't parse field names
// In this case the behaviour should be the same as normal behaviour // In this case the behaviour should be the same as normal behaviour
func TestParse_Timings_MultipleFieldsWithoutTemplate(t *testing.T) { func TestParse_Timings_MultipleFieldsWithoutTemplate(t *testing.T) {
s := NewStatsd() s := NewTestStatsd()
s.Templates = []string{} s.Templates = []string{}
s.Percentiles = []int{90} s.Percentiles = []int{90}
acc := &testutil.Accumulator{} acc := &testutil.Accumulator{}
@ -944,7 +1039,7 @@ func TestParse_Timings_MultipleFieldsWithoutTemplate(t *testing.T) {
} }
func TestParse_Timings_Delete(t *testing.T) { func TestParse_Timings_Delete(t *testing.T) {
s := NewStatsd() s := NewTestStatsd()
s.DeleteTimings = true s.DeleteTimings = true
fakeacc := &testutil.Accumulator{} fakeacc := &testutil.Accumulator{}
var err error var err error
@ -968,7 +1063,7 @@ func TestParse_Timings_Delete(t *testing.T) {
// Tests the delete_gauges option // Tests the delete_gauges option
func TestParse_Gauges_Delete(t *testing.T) { func TestParse_Gauges_Delete(t *testing.T) {
s := NewStatsd() s := NewTestStatsd()
s.DeleteGauges = true s.DeleteGauges = true
fakeacc := &testutil.Accumulator{} fakeacc := &testutil.Accumulator{}
var err error var err error
@ -994,7 +1089,7 @@ func TestParse_Gauges_Delete(t *testing.T) {
// Tests the delete_sets option // Tests the delete_sets option
func TestParse_Sets_Delete(t *testing.T) { func TestParse_Sets_Delete(t *testing.T) {
s := NewStatsd() s := NewTestStatsd()
s.DeleteSets = true s.DeleteSets = true
fakeacc := &testutil.Accumulator{} fakeacc := &testutil.Accumulator{}
var err error var err error
@ -1020,7 +1115,7 @@ func TestParse_Sets_Delete(t *testing.T) {
// Tests the delete_counters option // Tests the delete_counters option
func TestParse_Counters_Delete(t *testing.T) { func TestParse_Counters_Delete(t *testing.T) {
s := NewStatsd() s := NewTestStatsd()
s.DeleteCounters = true s.DeleteCounters = true
fakeacc := &testutil.Accumulator{} fakeacc := &testutil.Accumulator{}
var err error var err error

View File

@ -3,6 +3,7 @@ package udp_listener
import ( import (
"log" "log"
"net" "net"
"strings"
"sync" "sync"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
@ -14,7 +15,9 @@ type UdpListener struct {
ServiceAddress string ServiceAddress string
UDPPacketSize int `toml:"udp_packet_size"` UDPPacketSize int `toml:"udp_packet_size"`
AllowedPendingMessages int AllowedPendingMessages int
sync.Mutex sync.Mutex
wg sync.WaitGroup
in chan []byte in chan []byte
done chan struct{} done chan struct{}
@ -23,6 +26,8 @@ type UdpListener struct {
// Keep the accumulator in this struct // Keep the accumulator in this struct
acc telegraf.Accumulator acc telegraf.Accumulator
listener *net.UDPConn
} }
const UDP_PACKET_SIZE int = 1500 const UDP_PACKET_SIZE int = 1500
@ -76,6 +81,7 @@ func (u *UdpListener) Start(acc telegraf.Accumulator) error {
u.in = make(chan []byte, u.AllowedPendingMessages) u.in = make(chan []byte, u.AllowedPendingMessages)
u.done = make(chan struct{}) u.done = make(chan struct{})
u.wg.Add(2)
go u.udpListen() go u.udpListen()
go u.udpParser() go u.udpParser()
@ -84,21 +90,22 @@ func (u *UdpListener) Start(acc telegraf.Accumulator) error {
} }
func (u *UdpListener) Stop() { func (u *UdpListener) Stop() {
u.Lock()
defer u.Unlock()
close(u.done) close(u.done)
u.listener.Close()
u.wg.Wait()
close(u.in) close(u.in)
log.Println("Stopped UDP listener service on ", u.ServiceAddress) log.Println("Stopped UDP listener service on ", u.ServiceAddress)
} }
func (u *UdpListener) udpListen() error { func (u *UdpListener) udpListen() error {
defer u.wg.Done()
var err error
address, _ := net.ResolveUDPAddr("udp", u.ServiceAddress) address, _ := net.ResolveUDPAddr("udp", u.ServiceAddress)
listener, err := net.ListenUDP("udp", address) u.listener, err = net.ListenUDP("udp", address)
if err != nil { if err != nil {
log.Fatalf("ERROR: ListenUDP - %s", err) log.Fatalf("ERROR: ListenUDP - %s", err)
} }
defer listener.Close() log.Println("UDP server listening on: ", u.listener.LocalAddr().String())
log.Println("UDP server listening on: ", listener.LocalAddr().String())
for { for {
select { select {
@ -106,9 +113,10 @@ func (u *UdpListener) udpListen() error {
return nil return nil
default: default:
buf := make([]byte, u.UDPPacketSize) buf := make([]byte, u.UDPPacketSize)
n, _, err := listener.ReadFromUDP(buf) n, _, err := u.listener.ReadFromUDP(buf)
if err != nil { if err != nil && !strings.Contains(err.Error(), "closed network") {
log.Printf("ERROR: %s\n", err.Error()) log.Printf("ERROR: %s\n", err.Error())
continue
} }
select { select {
@ -121,6 +129,7 @@ func (u *UdpListener) udpListen() error {
} }
func (u *UdpListener) udpParser() error { func (u *UdpListener) udpParser() error {
defer u.wg.Done()
for { for {
select { select {
case <-u.done: case <-u.done:

View File

@ -32,6 +32,7 @@ func TestRunParser(t *testing.T) {
defer close(listener.done) defer close(listener.done)
listener.parser, _ = parsers.NewInfluxParser() listener.parser, _ = parsers.NewInfluxParser()
listener.wg.Add(1)
go listener.udpParser() go listener.udpParser()
in <- testmsg in <- testmsg
@ -58,6 +59,7 @@ func TestRunParserInvalidMsg(t *testing.T) {
defer close(listener.done) defer close(listener.done)
listener.parser, _ = parsers.NewInfluxParser() listener.parser, _ = parsers.NewInfluxParser()
listener.wg.Add(1)
go listener.udpParser() go listener.udpParser()
in <- testmsg in <- testmsg
@ -78,6 +80,7 @@ func TestRunParserGraphiteMsg(t *testing.T) {
defer close(listener.done) defer close(listener.done)
listener.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil) listener.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil)
listener.wg.Add(1)
go listener.udpParser() go listener.udpParser()
in <- testmsg in <- testmsg
@ -98,6 +101,7 @@ func TestRunParserJSONMsg(t *testing.T) {
defer close(listener.done) defer close(listener.done)
listener.parser, _ = parsers.NewJSONParser("udp_json_test", []string{}, nil) listener.parser, _ = parsers.NewJSONParser("udp_json_test", []string{}, nil)
listener.wg.Add(1)
go listener.udpParser() go listener.udpParser()
in <- testmsg in <- testmsg

View File

@ -212,22 +212,22 @@ func TestZfsGeneratesMetrics(t *testing.T) {
} }
z = &Zfs{KstatPath: testKstatPath} z = &Zfs{KstatPath: testKstatPath}
acc = testutil.Accumulator{} acc2 := testutil.Accumulator{}
err = z.Gather(&acc) err = z.Gather(&acc2)
require.NoError(t, err) require.NoError(t, err)
acc.AssertContainsTaggedFields(t, "zfs", intMetrics, tags) acc2.AssertContainsTaggedFields(t, "zfs", intMetrics, tags)
acc.Metrics = nil acc2.Metrics = nil
intMetrics = getKstatMetricsArcOnly() intMetrics = getKstatMetricsArcOnly()
//two pools, one metric //two pools, one metric
z = &Zfs{KstatPath: testKstatPath, KstatMetrics: []string{"arcstats"}} z = &Zfs{KstatPath: testKstatPath, KstatMetrics: []string{"arcstats"}}
acc = testutil.Accumulator{} acc3 := testutil.Accumulator{}
err = z.Gather(&acc) err = z.Gather(&acc3)
require.NoError(t, err) require.NoError(t, err)
acc.AssertContainsTaggedFields(t, "zfs", intMetrics, tags) acc3.AssertContainsTaggedFields(t, "zfs", intMetrics, tags)
err = os.RemoveAll(os.TempDir() + "/telegraf") err = os.RemoveAll(os.TempDir() + "/telegraf")
require.NoError(t, err) require.NoError(t, err)

View File

@ -139,6 +139,9 @@ func (d *Datadog) authenticatedUrl() string {
func buildMetrics(m telegraf.Metric) (map[string]Point, error) { func buildMetrics(m telegraf.Metric) (map[string]Point, error) {
ms := make(map[string]Point) ms := make(map[string]Point)
for k, v := range m.Fields() { for k, v := range m.Fields() {
if !verifyValue(v) {
continue
}
var p Point var p Point
if err := p.setValue(v); err != nil { if err := p.setValue(v); err != nil {
return ms, fmt.Errorf("unable to extract value from Fields, %s", err.Error()) return ms, fmt.Errorf("unable to extract value from Fields, %s", err.Error())
@ -160,6 +163,14 @@ func buildTags(mTags map[string]string) []string {
return tags return tags
} }
func verifyValue(v interface{}) bool {
switch v.(type) {
case string:
return false
}
return true
}
func (p *Point) setValue(v interface{}) error { func (p *Point) setValue(v interface{}) error {
switch d := v.(type) { switch d := v.(type) {
case int: case int:

View File

@ -152,14 +152,6 @@ func TestBuildPoint(t *testing.T) {
}, },
nil, nil,
}, },
{
testutil.TestMetric("11234.5", "test7"),
Point{
float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()),
11234.5,
},
fmt.Errorf("unable to extract value from Fields, undeterminable type"),
},
} }
for _, tt := range tagtests { for _, tt := range tagtests {
pt, err := buildMetrics(tt.ptIn) pt, err := buildMetrics(tt.ptIn)
@ -175,3 +167,25 @@ func TestBuildPoint(t *testing.T) {
} }
} }
} }
func TestVerifyValue(t *testing.T) {
var tagtests = []struct {
ptIn telegraf.Metric
validMetric bool
}{
{
testutil.TestMetric(float32(11234.5), "test1"),
true,
},
{
testutil.TestMetric("11234.5", "test2"),
false,
},
}
for _, tt := range tagtests {
ok := verifyValue(tt.ptIn.Fields()["value"])
if tt.validMetric != ok {
t.Errorf("%s: verification failed\n", tt.ptIn.Name())
}
}
}

View File

@ -2,7 +2,7 @@
This plugin writes to [InfluxDB](https://www.influxdb.com) via HTTP or UDP. This plugin writes to [InfluxDB](https://www.influxdb.com) via HTTP or UDP.
Required parameters: ### Required parameters:
* `urls`: List of strings, this is for InfluxDB clustering * `urls`: List of strings, this is for InfluxDB clustering
support. On each flush interval, Telegraf will randomly choose one of the urls support. On each flush interval, Telegraf will randomly choose one of the urls
@ -10,3 +10,17 @@ to write to. Each URL should start with either `http://` or `udp://`
* `database`: The name of the database to write to. * `database`: The name of the database to write to.
### Optional parameters:
* `retention_policy`: Retention policy to write to.
* `precision`: Precision of writes, valid values are "ns", "us" (or "µs"), "ms", "s", "m", "h". note: using "s" precision greatly improves InfluxDB compression.
* `timeout`: Write timeout (for the InfluxDB client), formatted as a string. If not provided, will default to 5s. 0s means no timeout (not recommended).
* `username`: Username for influxdb
* `password`: Password for influxdb
* `user_agent`: Set the user agent for HTTP POSTs (can be useful for log differentiation)
* `udp_payload`: Set UDP payload size, defaults to InfluxDB UDP Client default (512 bytes)
## Optional SSL Config
* `ssl_ca`: SSL CA
* `ssl_cert`: SSL CERT
* `ssl_key`: SSL key
* `insecure_skip_verify`: Use SSL but skip chain & host verification (default: false)

View File

@ -127,7 +127,7 @@ func (i *InfluxDB) Connect() error {
// Create Database if it doesn't exist // Create Database if it doesn't exist
_, e := c.Query(client.Query{ _, e := c.Query(client.Query{
Command: fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", i.Database), Command: fmt.Sprintf("CREATE DATABASE IF NOT EXISTS \"%s\"", i.Database),
}) })
if e != nil { if e != nil {

View File

@ -7,7 +7,6 @@ import (
"io/ioutil" "io/ioutil"
"log" "log"
"net/http" "net/http"
"strings"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
@ -156,10 +155,7 @@ func (l *Librato) Description() string {
func (l *Librato) buildGaugeName(m telegraf.Metric, fieldName string) string { func (l *Librato) buildGaugeName(m telegraf.Metric, fieldName string) string {
// Use the GraphiteSerializer // Use the GraphiteSerializer
graphiteSerializer := graphite.GraphiteSerializer{} graphiteSerializer := graphite.GraphiteSerializer{}
serializedMetric := graphiteSerializer.SerializeBucketName(m, fieldName) return graphiteSerializer.SerializeBucketName(m, fieldName)
// Deal with slash characters:
return strings.Replace(serializedMetric, "/", "-", -1)
} }
func (l *Librato) buildGauges(m telegraf.Metric) ([]*Gauge, error) { func (l *Librato) buildGauges(m telegraf.Metric) ([]*Gauge, error) {
@ -169,6 +165,9 @@ func (l *Librato) buildGauges(m telegraf.Metric) ([]*Gauge, error) {
Name: l.buildGaugeName(m, fieldName), Name: l.buildGaugeName(m, fieldName),
MeasureTime: m.Time().Unix(), MeasureTime: m.Time().Unix(),
} }
if !gauge.verifyValue(value) {
continue
}
if err := gauge.setValue(value); err != nil { if err := gauge.setValue(value); err != nil {
return gauges, fmt.Errorf("unable to extract value from Fields, %s\n", return gauges, fmt.Errorf("unable to extract value from Fields, %s\n",
err.Error()) err.Error())
@ -190,6 +189,14 @@ func (l *Librato) buildGauges(m telegraf.Metric) ([]*Gauge, error) {
return gauges, nil return gauges, nil
} }
func (g *Gauge) verifyValue(v interface{}) bool {
switch v.(type) {
case string:
return false
}
return true
}
func (g *Gauge) setValue(v interface{}) error { func (g *Gauge) setValue(v interface{}) error {
switch d := v.(type) { switch d := v.(type) {
case int: case int:

View File

@ -139,12 +139,8 @@ func TestBuildGauge(t *testing.T) {
}, },
{ {
testutil.TestMetric("11234.5", "test7"), testutil.TestMetric("11234.5", "test7"),
&Gauge{ nil,
Name: "value1.test7.value", nil,
MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(),
Value: 11234.5,
},
fmt.Errorf("unable to extract value from Fields, undeterminable type"),
}, },
} }
@ -158,6 +154,9 @@ func TestBuildGauge(t *testing.T) {
t.Errorf("%s: expected an error (%s) but none returned", t.Errorf("%s: expected an error (%s) but none returned",
gt.ptIn.Name(), gt.err.Error()) gt.ptIn.Name(), gt.err.Error())
} }
if len(gauges) != 0 && gt.outGauge == nil {
t.Errorf("%s: unexpected gauge, %+v\n", gt.ptIn.Name(), gt.outGauge)
}
if len(gauges) == 0 { if len(gauges) == 0 {
continue continue
} }

View File

@ -4,12 +4,26 @@ import (
"fmt" "fmt"
"log" "log"
"net/http" "net/http"
"regexp"
"strings"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/outputs"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
) )
var (
sanitizedChars = strings.NewReplacer("/", "_", "@", "_", " ", "_", "-", "_", ".", "_")
// Prometheus metric names must match this regex
// see https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels
metricName = regexp.MustCompile("^[a-zA-Z_:][a-zA-Z0-9_:]*$")
// Prometheus labels must match this regex
// see https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels
labelName = regexp.MustCompile("^[a-zA-Z_][a-zA-Z0-9_]*$")
)
type PrometheusClient struct { type PrometheusClient struct {
Listen string Listen string
metrics map[string]*prometheus.UntypedVec metrics map[string]*prometheus.UntypedVec
@ -64,54 +78,82 @@ func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
} }
for _, point := range metrics { for _, point := range metrics {
var labels []string
key := point.Name() key := point.Name()
key = sanitizedChars.Replace(key)
for k, _ := range point.Tags() { var labels []string
if len(k) > 0 { l := prometheus.Labels{}
for k, v := range point.Tags() {
k = sanitizedChars.Replace(k)
if len(k) == 0 {
continue
}
if !labelName.MatchString(k) {
continue
}
labels = append(labels, k) labels = append(labels, k)
} l[k] = v
} }
if _, ok := p.metrics[key]; !ok { for n, val := range point.Fields() {
p.metrics[key] = prometheus.NewUntypedVec( // Ignore string and bool fields.
switch val.(type) {
case string:
continue
case bool:
continue
}
// sanitize the measurement name
n = sanitizedChars.Replace(n)
var mname string
if n == "value" {
mname = key
} else {
mname = fmt.Sprintf("%s_%s", key, n)
}
// verify that it is a valid measurement name
if !metricName.MatchString(mname) {
continue
}
// Create a new metric if it hasn't been created yet.
if _, ok := p.metrics[mname]; !ok {
p.metrics[mname] = prometheus.NewUntypedVec(
prometheus.UntypedOpts{ prometheus.UntypedOpts{
Name: key, Name: mname,
Help: fmt.Sprintf("Telegraf collected point '%s'", key), Help: "Telegraf collected metric",
}, },
labels, labels,
) )
prometheus.MustRegister(p.metrics[key]) if err := prometheus.Register(p.metrics[mname]); err != nil {
log.Printf("prometheus_client: Metric failed to register with prometheus, %s", err)
continue
}
} }
l := prometheus.Labels{}
for tk, tv := range point.Tags() {
l[tk] = tv
}
for _, val := range point.Fields() {
switch val := val.(type) { switch val := val.(type) {
default:
log.Printf("Prometheus output, unsupported type. key: %s, type: %T\n",
key, val)
case int64: case int64:
m, err := p.metrics[key].GetMetricWith(l) m, err := p.metrics[mname].GetMetricWith(l)
if err != nil { if err != nil {
log.Printf("ERROR Getting metric in Prometheus output, "+ log.Printf("ERROR Getting metric in Prometheus output, "+
"key: %s, labels: %v,\nerr: %s\n", "key: %s, labels: %v,\nerr: %s\n",
key, l, err.Error()) mname, l, err.Error())
continue continue
} }
m.Set(float64(val)) m.Set(float64(val))
case float64: case float64:
m, err := p.metrics[key].GetMetricWith(l) m, err := p.metrics[mname].GetMetricWith(l)
if err != nil { if err != nil {
log.Printf("ERROR Getting metric in Prometheus output, "+ log.Printf("ERROR Getting metric in Prometheus output, "+
"key: %s, labels: %v,\nerr: %s\n", "key: %s, labels: %v,\nerr: %s\n",
key, l, err.Error()) mname, l, err.Error())
continue continue
} }
m.Set(val) m.Set(val)
default:
continue
} }
} }
} }

View File

@ -231,6 +231,7 @@ func (p *GraphiteParser) ApplyTemplate(line string) (string, map[string]string,
type template struct { type template struct {
tags []string tags []string
defaultTags map[string]string defaultTags map[string]string
greedyField bool
greedyMeasurement bool greedyMeasurement bool
separator string separator string
} }
@ -248,6 +249,8 @@ func NewTemplate(pattern string, defaultTags map[string]string, separator string
} }
if tag == "measurement*" { if tag == "measurement*" {
template.greedyMeasurement = true template.greedyMeasurement = true
} else if tag == "field*" {
template.greedyField = true
} }
} }
@ -265,7 +268,7 @@ func (t *template) Apply(line string) (string, map[string]string, string, error)
var ( var (
measurement []string measurement []string
tags = make(map[string]string) tags = make(map[string]string)
field string field []string
) )
// Set any default tags // Set any default tags
@ -273,6 +276,18 @@ func (t *template) Apply(line string) (string, map[string]string, string, error)
tags[k] = v tags[k] = v
} }
// See if an invalid combination has been specified in the template:
for _, tag := range t.tags {
if tag == "measurement*" {
t.greedyMeasurement = true
} else if tag == "field*" {
t.greedyField = true
}
}
if t.greedyField && t.greedyMeasurement {
return "", nil, "", fmt.Errorf("either 'field*' or 'measurement*' can be used in each template (but not both together): %q", strings.Join(t.tags, t.separator))
}
for i, tag := range t.tags { for i, tag := range t.tags {
if i >= len(fields) { if i >= len(fields) {
continue continue
@ -281,10 +296,10 @@ func (t *template) Apply(line string) (string, map[string]string, string, error)
if tag == "measurement" { if tag == "measurement" {
measurement = append(measurement, fields[i]) measurement = append(measurement, fields[i])
} else if tag == "field" { } else if tag == "field" {
if len(field) != 0 { field = append(field, fields[i])
return "", nil, "", fmt.Errorf("'field' can only be used once in each template: %q", line) } else if tag == "field*" {
} field = append(field, fields[i:]...)
field = fields[i] break
} else if tag == "measurement*" { } else if tag == "measurement*" {
measurement = append(measurement, fields[i:]...) measurement = append(measurement, fields[i:]...)
break break
@ -293,7 +308,7 @@ func (t *template) Apply(line string) (string, map[string]string, string, error)
} }
} }
return strings.Join(measurement, t.separator), tags, field, nil return strings.Join(measurement, t.separator), tags, strings.Join(field, t.separator), nil
} }
// matcher determines which template should be applied to a given metric // matcher determines which template should be applied to a given metric

View File

@ -94,6 +94,20 @@ func TestTemplateApply(t *testing.T) {
measurement: "cpu.load", measurement: "cpu.load",
tags: map[string]string{"zone": "us-west"}, tags: map[string]string{"zone": "us-west"},
}, },
{
test: "conjoined fields",
input: "prod.us-west.server01.cpu.util.idle.percent",
template: "env.zone.host.measurement.measurement.field*",
measurement: "cpu.util",
tags: map[string]string{"env": "prod", "zone": "us-west", "host": "server01"},
},
{
test: "multiple fields",
input: "prod.us-west.server01.cpu.util.idle.percent.free",
template: "env.zone.host.measurement.measurement.field.field.reading",
measurement: "cpu.util",
tags: map[string]string{"env": "prod", "zone": "us-west", "host": "server01", "reading": "free"},
},
} }
for _, test := range tests { for _, test := range tests {
@ -187,6 +201,12 @@ func TestParse(t *testing.T) {
template: "measurement", template: "measurement",
err: `field "cpu" time: strconv.ParseFloat: parsing "14199724z57825": invalid syntax`, err: `field "cpu" time: strconv.ParseFloat: parsing "14199724z57825": invalid syntax`,
}, },
{
test: "measurement* and field* (invalid)",
input: `prod.us-west.server01.cpu.util.idle.percent 99.99 1419972457825`,
template: "env.zone.host.measurement*.field*",
err: `either 'field*' or 'measurement*' can be used in each template (but not both together): "env.zone.host.measurement*.field*"`,
},
} }
for _, test := range tests { for _, test := range tests {
@ -574,15 +594,48 @@ func TestApplyTemplateField(t *testing.T) {
} }
} }
func TestApplyTemplateFieldError(t *testing.T) { func TestApplyTemplateMultipleFieldsTogether(t *testing.T) {
p, err := NewGraphiteParser("_", p, err := NewGraphiteParser("_",
[]string{"current.* measurement.field.field"}, nil) []string{"current.* measurement.measurement.field.field"}, nil)
assert.NoError(t, err) assert.NoError(t, err)
_, _, _, err = p.ApplyTemplate("current.users.logged_in") measurement, _, field, err := p.ApplyTemplate("current.users.logged_in.ssh")
if err == nil {
t.Errorf("Parser.ApplyTemplate unexpected result. got %s, exp %s", err, assert.Equal(t, "current_users", measurement)
"'field' can only be used once in each template: current.users.logged_in")
if field != "logged_in_ssh" {
t.Errorf("Parser.ApplyTemplate unexpected result. got %s, exp %s",
field, "logged_in_ssh")
}
}
func TestApplyTemplateMultipleFieldsApart(t *testing.T) {
p, err := NewGraphiteParser("_",
[]string{"current.* measurement.measurement.field.method.field"}, nil)
assert.NoError(t, err)
measurement, _, field, err := p.ApplyTemplate("current.users.logged_in.ssh.total")
assert.Equal(t, "current_users", measurement)
if field != "logged_in_total" {
t.Errorf("Parser.ApplyTemplate unexpected result. got %s, exp %s",
field, "logged_in_total")
}
}
func TestApplyTemplateGreedyField(t *testing.T) {
p, err := NewGraphiteParser("_",
[]string{"current.* measurement.measurement.field*"}, nil)
assert.NoError(t, err)
measurement, _, field, err := p.ApplyTemplate("current.users.logged_in")
assert.Equal(t, "current_users", measurement)
if field != "logged_in" {
t.Errorf("Parser.ApplyTemplate unexpected result. got %s, exp %s",
field, "logged_in")
} }
} }

View File

@ -0,0 +1,102 @@
package nagios
import (
"regexp"
"strings"
"time"
"github.com/influxdata/telegraf"
)
type NagiosParser struct {
MetricName string
DefaultTags map[string]string
}
// Got from Alignak
// https://github.com/Alignak-monitoring/alignak/blob/develop/alignak/misc/perfdata.py
var perfSplitRegExp, _ = regexp.Compile(`([^=]+=\S+)`)
var nagiosRegExp, _ = regexp.Compile(`^([^=]+)=([\d\.\-\+eE]+)([\w\/%]*);?([\d\.\-\+eE:~@]+)?;?([\d\.\-\+eE:~@]+)?;?([\d\.\-\+eE]+)?;?([\d\.\-\+eE]+)?;?\s*`)
func (p *NagiosParser) ParseLine(line string) (telegraf.Metric, error) {
metrics, err := p.Parse([]byte(line))
return metrics[0], err
}
func (p *NagiosParser) SetDefaultTags(tags map[string]string) {
p.DefaultTags = tags
}
//> rta,host=absol,unit=ms critical=6000,min=0,value=0.332,warning=4000 1456374625003628099
//> pl,host=absol,unit=% critical=90,min=0,value=0,warning=80 1456374625003693967
func (p *NagiosParser) Parse(buf []byte) ([]telegraf.Metric, error) {
metrics := make([]telegraf.Metric, 0)
// Convert to string
out := string(buf)
// Prepare output for splitting
// Delete escaped pipes
out = strings.Replace(out, `\|`, "___PROTECT_PIPE___", -1)
// Split lines and get the first one
lines := strings.Split(out, "\n")
// Split output and perfdatas
data_splitted := strings.Split(lines[0], "|")
if len(data_splitted) <= 1 {
// No pipe == no perf data
return nil, nil
}
// Get perfdatas
perfdatas := data_splitted[1]
// Add escaped pipes
perfdatas = strings.Replace(perfdatas, "___PROTECT_PIPE___", `\|`, -1)
// Split perfs
unParsedPerfs := perfSplitRegExp.FindAllSubmatch([]byte(perfdatas), -1)
// Iterate on all perfs
for _, unParsedPerfs := range unParsedPerfs {
// Get metrics
// Trim perf
trimedPerf := strings.Trim(string(unParsedPerfs[0]), " ")
// Parse perf
perf := nagiosRegExp.FindAllSubmatch([]byte(trimedPerf), -1)
// Bad string
if len(perf) == 0 {
continue
}
if len(perf[0]) <= 2 {
continue
}
if perf[0][1] == nil || perf[0][2] == nil {
continue
}
fieldName := string(perf[0][1])
tags := make(map[string]string)
if perf[0][3] != nil {
tags["unit"] = string(perf[0][3])
}
fields := make(map[string]interface{})
fields["value"] = perf[0][2]
// TODO should we set empty field
// if metric if there is no data ?
if perf[0][4] != nil {
fields["warning"] = perf[0][4]
}
if perf[0][5] != nil {
fields["critical"] = perf[0][5]
}
if perf[0][6] != nil {
fields["min"] = perf[0][6]
}
if perf[0][7] != nil {
fields["max"] = perf[0][7]
}
// Create metric
metric, err := telegraf.NewMetric(fieldName, tags, fields, time.Now().UTC())
if err != nil {
return nil, err
}
// Add Metric
metrics = append(metrics, metric)
}
return metrics, nil
}

View File

@ -0,0 +1,89 @@
package nagios
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const validOutput1 = `PING OK - Packet loss = 0%, RTA = 0.30 ms|rta=0.298000ms;4000.000000;6000.000000;0.000000 pl=0%;80;90;0;100
This is a long output
with three lines
`
const validOutput2 = "TCP OK - 0.008 second response time on port 80|time=0.008457s;;;0.000000;10.000000"
const validOutput3 = "TCP OK - 0.008 second response time on port 80|time=0.008457"
const invalidOutput3 = "PING OK - Packet loss = 0%, RTA = 0.30 ms"
const invalidOutput4 = "PING OK - Packet loss = 0%, RTA = 0.30 ms| =3;;;; dgasdg =;;;; sff=;;;;"
func TestParseValidOutput(t *testing.T) {
parser := NagiosParser{
MetricName: "nagios_test",
}
// Output1
metrics, err := parser.Parse([]byte(validOutput1))
require.NoError(t, err)
assert.Len(t, metrics, 2)
// rta
assert.Equal(t, "rta", metrics[0].Name())
assert.Equal(t, map[string]interface{}{
"value": float64(0.298),
"warning": float64(4000),
"critical": float64(6000),
"min": float64(0),
}, metrics[0].Fields())
assert.Equal(t, map[string]string{"unit": "ms"}, metrics[0].Tags())
// pl
assert.Equal(t, "pl", metrics[1].Name())
assert.Equal(t, map[string]interface{}{
"value": float64(0),
"warning": float64(80),
"critical": float64(90),
"min": float64(0),
"max": float64(100),
}, metrics[1].Fields())
assert.Equal(t, map[string]string{"unit": "%"}, metrics[1].Tags())
// Output2
metrics, err = parser.Parse([]byte(validOutput2))
require.NoError(t, err)
assert.Len(t, metrics, 1)
// time
assert.Equal(t, "time", metrics[0].Name())
assert.Equal(t, map[string]interface{}{
"value": float64(0.008457),
"min": float64(0),
"max": float64(10),
}, metrics[0].Fields())
assert.Equal(t, map[string]string{"unit": "s"}, metrics[0].Tags())
// Output3
metrics, err = parser.Parse([]byte(validOutput3))
require.NoError(t, err)
assert.Len(t, metrics, 1)
// time
assert.Equal(t, "time", metrics[0].Name())
assert.Equal(t, map[string]interface{}{
"value": float64(0.008457),
}, metrics[0].Fields())
assert.Equal(t, map[string]string{}, metrics[0].Tags())
}
func TestParseInvalidOutput(t *testing.T) {
parser := NagiosParser{
MetricName: "nagios_test",
}
// invalidOutput3
metrics, err := parser.Parse([]byte(invalidOutput3))
require.NoError(t, err)
assert.Len(t, metrics, 0)
// invalidOutput4
metrics, err = parser.Parse([]byte(invalidOutput4))
require.NoError(t, err)
assert.Len(t, metrics, 0)
}

View File

@ -8,6 +8,8 @@ import (
"github.com/influxdata/telegraf/plugins/parsers/graphite" "github.com/influxdata/telegraf/plugins/parsers/graphite"
"github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/parsers/json" "github.com/influxdata/telegraf/plugins/parsers/json"
"github.com/influxdata/telegraf/plugins/parsers/nagios"
"github.com/influxdata/telegraf/plugins/parsers/value"
) )
// ParserInput is an interface for input plugins that are able to parse // ParserInput is an interface for input plugins that are able to parse
@ -38,7 +40,7 @@ type Parser interface {
// Config is a struct that covers the data types needed for all parser types, // Config is a struct that covers the data types needed for all parser types,
// and can be used to instantiate _any_ of the parsers. // and can be used to instantiate _any_ of the parsers.
type Config struct { type Config struct {
// Dataformat can be one of: json, influx, graphite // Dataformat can be one of: json, influx, graphite, value, nagios
DataFormat string DataFormat string
// Separator only applied to Graphite data. // Separator only applied to Graphite data.
@ -48,9 +50,12 @@ type Config struct {
// TagKeys only apply to JSON data // TagKeys only apply to JSON data
TagKeys []string TagKeys []string
// MetricName only applies to JSON data. This will be the name of the measurement. // MetricName applies to JSON & value. This will be the name of the measurement.
MetricName string MetricName string
// DataType only applies to value, this will be the type to parse value to
DataType string
// DefaultTags are the default tags that will be added to all parsed metrics. // DefaultTags are the default tags that will be added to all parsed metrics.
DefaultTags map[string]string DefaultTags map[string]string
} }
@ -63,8 +68,13 @@ func NewParser(config *Config) (Parser, error) {
case "json": case "json":
parser, err = NewJSONParser(config.MetricName, parser, err = NewJSONParser(config.MetricName,
config.TagKeys, config.DefaultTags) config.TagKeys, config.DefaultTags)
case "value":
parser, err = NewValueParser(config.MetricName,
config.DataType, config.DefaultTags)
case "influx": case "influx":
parser, err = NewInfluxParser() parser, err = NewInfluxParser()
case "nagios":
parser, err = NewNagiosParser()
case "graphite": case "graphite":
parser, err = NewGraphiteParser(config.Separator, parser, err = NewGraphiteParser(config.Separator,
config.Templates, config.DefaultTags) config.Templates, config.DefaultTags)
@ -87,6 +97,10 @@ func NewJSONParser(
return parser, nil return parser, nil
} }
func NewNagiosParser() (Parser, error) {
return &nagios.NagiosParser{}, nil
}
func NewInfluxParser() (Parser, error) { func NewInfluxParser() (Parser, error) {
return &influx.InfluxParser{}, nil return &influx.InfluxParser{}, nil
} }
@ -98,3 +112,15 @@ func NewGraphiteParser(
) (Parser, error) { ) (Parser, error) {
return graphite.NewGraphiteParser(separator, templates, defaultTags) return graphite.NewGraphiteParser(separator, templates, defaultTags)
} }
func NewValueParser(
metricName string,
dataType string,
defaultTags map[string]string,
) (Parser, error) {
return &value.ValueParser{
MetricName: metricName,
DataType: dataType,
DefaultTags: defaultTags,
}, nil
}

View File

@ -0,0 +1,68 @@
package value
import (
"bytes"
"fmt"
"strconv"
"time"
"github.com/influxdata/telegraf"
)
type ValueParser struct {
MetricName string
DataType string
DefaultTags map[string]string
}
func (v *ValueParser) Parse(buf []byte) ([]telegraf.Metric, error) {
// separate out any fields in the buffer, ignore anything but the last.
values := bytes.Fields(buf)
if len(values) < 1 {
return []telegraf.Metric{}, nil
}
valueStr := string(values[len(values)-1])
var value interface{}
var err error
switch v.DataType {
case "", "int", "integer":
value, err = strconv.Atoi(valueStr)
case "float", "long":
value, err = strconv.ParseFloat(valueStr, 64)
case "str", "string":
value = valueStr
case "bool", "boolean":
value, err = strconv.ParseBool(valueStr)
}
if err != nil {
return nil, err
}
fields := map[string]interface{}{"value": value}
metric, err := telegraf.NewMetric(v.MetricName, v.DefaultTags,
fields, time.Now().UTC())
if err != nil {
return nil, err
}
return []telegraf.Metric{metric}, nil
}
func (v *ValueParser) ParseLine(line string) (telegraf.Metric, error) {
metrics, err := v.Parse([]byte(line))
if err != nil {
return nil, err
}
if len(metrics) < 1 {
return nil, fmt.Errorf("Can not parse the line: %s, for data format: value", line)
}
return metrics[0], nil
}
func (v *ValueParser) SetDefaultTags(tags map[string]string) {
v.DefaultTags = tags
}

View File

@ -0,0 +1,238 @@
package value
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestParseValidValues(t *testing.T) {
parser := ValueParser{
MetricName: "value_test",
DataType: "integer",
}
metrics, err := parser.Parse([]byte("55"))
assert.NoError(t, err)
assert.Len(t, metrics, 1)
assert.Equal(t, "value_test", metrics[0].Name())
assert.Equal(t, map[string]interface{}{
"value": int64(55),
}, metrics[0].Fields())
assert.Equal(t, map[string]string{}, metrics[0].Tags())
parser = ValueParser{
MetricName: "value_test",
DataType: "float",
}
metrics, err = parser.Parse([]byte("64"))
assert.NoError(t, err)
assert.Len(t, metrics, 1)
assert.Equal(t, "value_test", metrics[0].Name())
assert.Equal(t, map[string]interface{}{
"value": float64(64),
}, metrics[0].Fields())
assert.Equal(t, map[string]string{}, metrics[0].Tags())
parser = ValueParser{
MetricName: "value_test",
DataType: "string",
}
metrics, err = parser.Parse([]byte("foobar"))
assert.NoError(t, err)
assert.Len(t, metrics, 1)
assert.Equal(t, "value_test", metrics[0].Name())
assert.Equal(t, map[string]interface{}{
"value": "foobar",
}, metrics[0].Fields())
assert.Equal(t, map[string]string{}, metrics[0].Tags())
parser = ValueParser{
MetricName: "value_test",
DataType: "boolean",
}
metrics, err = parser.Parse([]byte("true"))
assert.NoError(t, err)
assert.Len(t, metrics, 1)
assert.Equal(t, "value_test", metrics[0].Name())
assert.Equal(t, map[string]interface{}{
"value": true,
}, metrics[0].Fields())
assert.Equal(t, map[string]string{}, metrics[0].Tags())
}
func TestParseMultipleValues(t *testing.T) {
parser := ValueParser{
MetricName: "value_test",
DataType: "integer",
}
metrics, err := parser.Parse([]byte(`55
45
223
12
999
`))
assert.NoError(t, err)
assert.Len(t, metrics, 1)
assert.Equal(t, "value_test", metrics[0].Name())
assert.Equal(t, map[string]interface{}{
"value": int64(999),
}, metrics[0].Fields())
assert.Equal(t, map[string]string{}, metrics[0].Tags())
}
func TestParseLineValidValues(t *testing.T) {
parser := ValueParser{
MetricName: "value_test",
DataType: "integer",
}
metric, err := parser.ParseLine("55")
assert.NoError(t, err)
assert.Equal(t, "value_test", metric.Name())
assert.Equal(t, map[string]interface{}{
"value": int64(55),
}, metric.Fields())
assert.Equal(t, map[string]string{}, metric.Tags())
parser = ValueParser{
MetricName: "value_test",
DataType: "float",
}
metric, err = parser.ParseLine("64")
assert.NoError(t, err)
assert.Equal(t, "value_test", metric.Name())
assert.Equal(t, map[string]interface{}{
"value": float64(64),
}, metric.Fields())
assert.Equal(t, map[string]string{}, metric.Tags())
parser = ValueParser{
MetricName: "value_test",
DataType: "string",
}
metric, err = parser.ParseLine("foobar")
assert.NoError(t, err)
assert.Equal(t, "value_test", metric.Name())
assert.Equal(t, map[string]interface{}{
"value": "foobar",
}, metric.Fields())
assert.Equal(t, map[string]string{}, metric.Tags())
parser = ValueParser{
MetricName: "value_test",
DataType: "boolean",
}
metric, err = parser.ParseLine("true")
assert.NoError(t, err)
assert.Equal(t, "value_test", metric.Name())
assert.Equal(t, map[string]interface{}{
"value": true,
}, metric.Fields())
assert.Equal(t, map[string]string{}, metric.Tags())
}
func TestParseInvalidValues(t *testing.T) {
parser := ValueParser{
MetricName: "value_test",
DataType: "integer",
}
metrics, err := parser.Parse([]byte("55.0"))
assert.Error(t, err)
assert.Len(t, metrics, 0)
parser = ValueParser{
MetricName: "value_test",
DataType: "float",
}
metrics, err = parser.Parse([]byte("foobar"))
assert.Error(t, err)
assert.Len(t, metrics, 0)
parser = ValueParser{
MetricName: "value_test",
DataType: "boolean",
}
metrics, err = parser.Parse([]byte("213"))
assert.Error(t, err)
assert.Len(t, metrics, 0)
}
func TestParseLineInvalidValues(t *testing.T) {
parser := ValueParser{
MetricName: "value_test",
DataType: "integer",
}
_, err := parser.ParseLine("55.0")
assert.Error(t, err)
parser = ValueParser{
MetricName: "value_test",
DataType: "float",
}
_, err = parser.ParseLine("foobar")
assert.Error(t, err)
parser = ValueParser{
MetricName: "value_test",
DataType: "boolean",
}
_, err = parser.ParseLine("213")
assert.Error(t, err)
}
func TestParseValidValuesDefaultTags(t *testing.T) {
parser := ValueParser{
MetricName: "value_test",
DataType: "integer",
}
parser.SetDefaultTags(map[string]string{"test": "tag"})
metrics, err := parser.Parse([]byte("55"))
assert.NoError(t, err)
assert.Len(t, metrics, 1)
assert.Equal(t, "value_test", metrics[0].Name())
assert.Equal(t, map[string]interface{}{
"value": int64(55),
}, metrics[0].Fields())
assert.Equal(t, map[string]string{"test": "tag"}, metrics[0].Tags())
parser = ValueParser{
MetricName: "value_test",
DataType: "float",
}
parser.SetDefaultTags(map[string]string{"test": "tag"})
metrics, err = parser.Parse([]byte("64"))
assert.NoError(t, err)
assert.Len(t, metrics, 1)
assert.Equal(t, "value_test", metrics[0].Name())
assert.Equal(t, map[string]interface{}{
"value": float64(64),
}, metrics[0].Fields())
assert.Equal(t, map[string]string{"test": "tag"}, metrics[0].Tags())
parser = ValueParser{
MetricName: "value_test",
DataType: "string",
}
parser.SetDefaultTags(map[string]string{"test": "tag"})
metrics, err = parser.Parse([]byte("foobar"))
assert.NoError(t, err)
assert.Len(t, metrics, 1)
assert.Equal(t, "value_test", metrics[0].Name())
assert.Equal(t, map[string]interface{}{
"value": "foobar",
}, metrics[0].Fields())
assert.Equal(t, map[string]string{"test": "tag"}, metrics[0].Tags())
parser = ValueParser{
MetricName: "value_test",
DataType: "boolean",
}
parser.SetDefaultTags(map[string]string{"test": "tag"})
metrics, err = parser.Parse([]byte("true"))
assert.NoError(t, err)
assert.Len(t, metrics, 1)
assert.Equal(t, "value_test", metrics[0].Name())
assert.Equal(t, map[string]interface{}{
"value": true,
}, metrics[0].Fields())
assert.Equal(t, map[string]string{"test": "tag"}, metrics[0].Tags())
}

View File

@ -12,6 +12,8 @@ type GraphiteSerializer struct {
Prefix string Prefix string
} }
var sanitizedChars = strings.NewReplacer("/", "-", "@", "-", " ", "_")
func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]string, error) { func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]string, error) {
out := []string{} out := []string{}
@ -85,5 +87,5 @@ func buildTags(metric telegraf.Metric) string {
tag_str += "." + tag_value tag_str += "." + tag_value
} }
} }
return tag_str return sanitizedChars.Replace(tag_str)
} }

View File

@ -0,0 +1,27 @@
package json
import (
ejson "encoding/json"
"github.com/influxdata/telegraf"
)
type JsonSerializer struct {
}
func (s *JsonSerializer) Serialize(metric telegraf.Metric) ([]string, error) {
out := []string{}
m := make(map[string]interface{})
m["tags"] = metric.Tags()
m["fields"] = metric.Fields()
m["name"] = metric.Name()
m["timestamp"] = metric.UnixNano() / 1000000000
serialized, err := ejson.Marshal(m)
if err != nil {
return []string{}, err
}
out = append(out, string(serialized))
return out, nil
}

View File

@ -0,0 +1,87 @@
package json
import (
"fmt"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/influxdata/telegraf"
)
func TestSerializeMetricFloat(t *testing.T) {
now := time.Now()
tags := map[string]string{
"cpu": "cpu0",
}
fields := map[string]interface{}{
"usage_idle": float64(91.5),
}
m, err := telegraf.NewMetric("cpu", tags, fields, now)
assert.NoError(t, err)
s := JsonSerializer{}
mS, err := s.Serialize(m)
assert.NoError(t, err)
expS := []string{fmt.Sprintf("{\"fields\":{\"usage_idle\":91.5},\"name\":\"cpu\",\"tags\":{\"cpu\":\"cpu0\"},\"timestamp\":%d}", now.Unix())}
assert.Equal(t, expS, mS)
}
func TestSerializeMetricInt(t *testing.T) {
now := time.Now()
tags := map[string]string{
"cpu": "cpu0",
}
fields := map[string]interface{}{
"usage_idle": int64(90),
}
m, err := telegraf.NewMetric("cpu", tags, fields, now)
assert.NoError(t, err)
s := JsonSerializer{}
mS, err := s.Serialize(m)
assert.NoError(t, err)
expS := []string{fmt.Sprintf("{\"fields\":{\"usage_idle\":90},\"name\":\"cpu\",\"tags\":{\"cpu\":\"cpu0\"},\"timestamp\":%d}", now.Unix())}
assert.Equal(t, expS, mS)
}
func TestSerializeMetricString(t *testing.T) {
now := time.Now()
tags := map[string]string{
"cpu": "cpu0",
}
fields := map[string]interface{}{
"usage_idle": "foobar",
}
m, err := telegraf.NewMetric("cpu", tags, fields, now)
assert.NoError(t, err)
s := JsonSerializer{}
mS, err := s.Serialize(m)
assert.NoError(t, err)
expS := []string{fmt.Sprintf("{\"fields\":{\"usage_idle\":\"foobar\"},\"name\":\"cpu\",\"tags\":{\"cpu\":\"cpu0\"},\"timestamp\":%d}", now.Unix())}
assert.Equal(t, expS, mS)
}
func TestSerializeMultiFields(t *testing.T) {
now := time.Now()
tags := map[string]string{
"cpu": "cpu0",
}
fields := map[string]interface{}{
"usage_idle": int64(90),
"usage_total": 8559615,
}
m, err := telegraf.NewMetric("cpu", tags, fields, now)
assert.NoError(t, err)
s := JsonSerializer{}
mS, err := s.Serialize(m)
assert.NoError(t, err)
expS := []string{fmt.Sprintf("{\"fields\":{\"usage_idle\":90,\"usage_total\":8559615},\"name\":\"cpu\",\"tags\":{\"cpu\":\"cpu0\"},\"timestamp\":%d}", now.Unix())}
assert.Equal(t, expS, mS)
}

View File

@ -5,6 +5,7 @@ import (
"github.com/influxdata/telegraf/plugins/serializers/graphite" "github.com/influxdata/telegraf/plugins/serializers/graphite"
"github.com/influxdata/telegraf/plugins/serializers/influx" "github.com/influxdata/telegraf/plugins/serializers/influx"
"github.com/influxdata/telegraf/plugins/serializers/json"
) )
// SerializerOutput is an interface for output plugins that are able to // SerializerOutput is an interface for output plugins that are able to
@ -40,10 +41,16 @@ func NewSerializer(config *Config) (Serializer, error) {
serializer, err = NewInfluxSerializer() serializer, err = NewInfluxSerializer()
case "graphite": case "graphite":
serializer, err = NewGraphiteSerializer(config.Prefix) serializer, err = NewGraphiteSerializer(config.Prefix)
case "json":
serializer, err = NewJsonSerializer()
} }
return serializer, err return serializer, err
} }
func NewJsonSerializer() (Serializer, error) {
return &json.JsonSerializer{}, nil
}
func NewInfluxSerializer() (Serializer, error) { func NewInfluxSerializer() (Serializer, error) {
return &influx.InfluxSerializer{}, nil return &influx.InfluxSerializer{}, nil
} }