diff --git a/CHANGELOG.md b/CHANGELOG.md
index 237183a85..dc83441b2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,31 @@
+## v0.10.5 [unreleased]
+
+### Release Notes
+
+### Features
+
+### Bugfixes
+
+## v0.10.4 [2016-02-24]
+
+### Release Notes
+- The pass/drop parameters have been renamed to fielddrop/fieldpass parameters,
+to more accurately indicate their purpose.
+- There are also now namedrop/namepass parameters for passing/dropping based
+on the metric _name_.
+- Experimental Windows builds are now available.
+
+### Features
+- [#727](https://github.com/influxdata/telegraf/pull/727): riak input, thanks @jcoene!
+- [#694](https://github.com/influxdata/telegraf/pull/694): DNS Query input, thanks @mjasion!
+- [#724](https://github.com/influxdata/telegraf/pull/724): username matching for procstat input, thanks @zorel!
+- [#736](https://github.com/influxdata/telegraf/pull/736): Ignore dummy filesystems from disk plugin. Thanks @PierreF!
+- [#737](https://github.com/influxdata/telegraf/pull/737): Support multiple fields for statsd input. Thanks @mattheath!
+
+### Bugfixes
+- [#701](https://github.com/influxdata/telegraf/pull/701): output write count shouldn't print in quiet mode.
+- [#746](https://github.com/influxdata/telegraf/pull/746): httpjson plugin: Fix HTTP GET parameters.
+
 ## v0.10.3 [2016-02-18]
 
 ### Release Notes
diff --git a/Godeps b/Godeps
index d0d2194c6..d2ac1857f 100644
--- a/Godeps
+++ b/Godeps
@@ -50,3 +50,4 @@ gopkg.in/dancannon/gorethink.v1 6f088135ff288deb9d5546f4c71919207f891a70
 gopkg.in/fatih/pool.v2 cba550ebf9bce999a02e963296d4bc7a486cb715
 gopkg.in/mgo.v2 03c9f3ee4c14c8e51ee521a6a7d0425658dd6f64
 gopkg.in/yaml.v2 f7716cbe52baa25d2e9b0d0da546fcf909fc16b4
+github.com/miekg/dns e0d84d97e59bcb6561eae269c4e94d25b66822cb
\ No newline at end of file
diff --git a/Godeps_windows b/Godeps_windows
index 034fb4fec..dd46184ec 100644
--- a/Godeps_windows
+++ b/Godeps_windows
@@ -1,4 +1,4 @@
-git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git dbd8d5c40a582eb9adacde36b47932b3a3ad0034
+git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git 617c801af238c3af2d9e72c5d4a0f02edad03ce5
 github.com/Shopify/sarama d37c73f2b2bce85f7fa16b6a550d26c5372892ef
 github.com/Sirupsen/logrus f7f79f729e0fbe2fcc061db48a9ba0263f588252
 github.com/StackExchange/wmi f3e2bae1e0cb5aef83e319133eabfee30013a4a5
@@ -21,18 +21,18 @@ github.com/gorilla/context 1c83b3eabd45b6d76072b66b746c20815fb2872d
 github.com/gorilla/mux 26a6070f849969ba72b72256e9f14cf519751690
 github.com/hailocab/go-hostpool e80d13ce29ede4452c43dea11e79b9bc8a15b478
 github.com/influxdata/config bae7cb98197d842374d3b8403905924094930f24
-github.com/influxdata/influxdb a9552fdd91361819a792f337e5d9998859732a67
-github.com/influxdb/influxdb a9552fdd91361819a792f337e5d9998859732a67
+github.com/influxdata/influxdb ef571fc104dc24b77cd3710c156cd95e5cfd7aa5
 github.com/jmespath/go-jmespath c01cf91b011868172fdcd9f41838e80c9d716264
 github.com/klauspost/crc32 999f3125931f6557b991b2f8472172bdfa578d38
 github.com/lib/pq 8ad2b298cadd691a77015666a5372eae5dbfac8f
 github.com/lxn/win 9a7734ea4db26bc593d52f6a8a957afdad39c5c1
 github.com/matttproud/golang_protobuf_extensions d0c3fe89de86839aecf2e0579c40ba3bb336a453
+github.com/miekg/dns e0d84d97e59bcb6561eae269c4e94d25b66822cb
 github.com/mreiferson/go-snappystream 028eae7ab5c4c9e2d1cb4c4ca1e53259bbe7e504
 github.com/naoina/go-stringutil 6b638e95a32d0c1131db0e7fe83775cbea4a0d0b
 github.com/naoina/toml 751171607256bb66e64c9f0220c00662420c38e9
+github.com/nats-io/nats
6a83f1a633cfbfd90aa648ac99fb38c06a8b40df github.com/nsqio/go-nsq 2118015c120962edc5d03325c680daf3163a8b5f -github.com/pborman/uuid dee7705ef7b324f27ceb85a121c61f2c2e8ce988 github.com/pmezard/go-difflib 792786c7400a136282c1664665ae0a8db921c6c2 github.com/prometheus/client_golang 67994f177195311c3ea3d4407ed0175e34a4256f github.com/prometheus/client_model fa8ad6fec33561be4280a8f0514318c79d7f6cb6 diff --git a/Makefile b/Makefile index fc8d71de0..ef316bd03 100644 --- a/Makefile +++ b/Makefile @@ -92,14 +92,17 @@ docker-kill: -docker rm nsq aerospike redis opentsdb rabbitmq postgres memcached mysql kafka mqtt riemann snmp # Run full unit tests using docker containers (includes setup and teardown) -test: docker-kill docker-run +test: vet docker-kill docker-run # Sleeping for kafka leadership election, TSDB setup, etc. sleep 60 # SUCCESS, running tests go test -race ./... # Run "short" unit tests -test-short: +test-short: vet go test -short ./... -.PHONY: test +vet: + go vet ./... + +.PHONY: test test-short vet build default diff --git a/README.md b/README.md index d93494ecb..0f3f81ebf 100644 --- a/README.md +++ b/README.md @@ -27,8 +27,12 @@ the [release blog post](https://influxdata.com/blog/announcing-telegraf-0-10-0/) ### Linux deb and rpm Packages: Latest: -* http://get.influxdb.org/telegraf/telegraf_0.10.3-1_amd64.deb -* http://get.influxdb.org/telegraf/telegraf-0.10.3-1.x86_64.rpm +* http://get.influxdb.org/telegraf/telegraf_0.10.4-1_amd64.deb +* http://get.influxdb.org/telegraf/telegraf-0.10.4-1.x86_64.rpm + +Latest (arm): +* http://get.influxdb.org/telegraf/telegraf_0.10.4-1_arm.deb +* http://get.influxdb.org/telegraf/telegraf-0.10.4-1.arm.rpm 0.2.x: * http://get.influxdb.org/telegraf/telegraf_0.2.4_amd64.deb @@ -52,9 +56,9 @@ for instructions, replacing the `influxdb` package name with `telegraf`. 
### Linux tarballs: Latest: -* http://get.influxdb.org/telegraf/telegraf-0.10.3-1_linux_amd64.tar.gz -* http://get.influxdb.org/telegraf/telegraf-0.10.3-1_linux_i386.tar.gz -* http://get.influxdb.org/telegraf/telegraf-0.10.3-1_linux_arm.tar.gz +* http://get.influxdb.org/telegraf/telegraf-0.10.4-1_linux_amd64.tar.gz +* http://get.influxdb.org/telegraf/telegraf-0.10.4-1_linux_i386.tar.gz +* http://get.influxdb.org/telegraf/telegraf-0.10.4-1_linux_arm.tar.gz 0.2.x: * http://get.influxdb.org/telegraf/telegraf_linux_amd64_0.2.4.tar.gz @@ -66,13 +70,13 @@ Latest: To install the full directory structure with config file, run: ``` -sudo tar -C / -zxvf ./telegraf-0.10.3-1_linux_amd64.tar.gz +sudo tar -C / -zxvf ./telegraf-0.10.4-1_linux_amd64.tar.gz ``` To extract only the binary, run: ``` -tar -zxvf telegraf-0.10.3-1_linux_amd64.tar.gz --strip-components=3 ./usr/bin/telegraf +tar -zxvf telegraf-0.10.4-1_linux_amd64.tar.gz --strip-components=3 ./usr/bin/telegraf ``` ### Ansible Role: @@ -86,6 +90,12 @@ brew update brew install telegraf ``` +### Windows Binaries (EXPERIMENTAL) + +Latest: +* http://get.influxdb.org/telegraf/telegraf-0.10.4-1_windows_amd64.zip +* http://get.influxdb.org/telegraf/telegraf-0.10.4-1_windows_i386.zip + ### From Source: Telegraf manages dependencies via [gdm](https://github.com/sparrc/gdm), @@ -157,6 +167,7 @@ Currently implemented sources: * bcache * couchdb * disque +* dns query time * docker * dovecot * elasticsearch @@ -187,12 +198,13 @@ Currently implemented sources: * raindrops * redis * rethinkdb +* riak +* sensors (only available if built from source) +* snmp * sql server (microsoft) * twemproxy * zfs * zookeeper -* sensors -* snmp * win_perf_counters (windows performance counters) * system * cpu diff --git a/agent/accumulator.go b/agent/accumulator.go index 9361ad82e..b04ff2b53 100644 --- a/agent/accumulator.go +++ b/agent/accumulator.go @@ -43,6 +43,11 @@ func (ac *accumulator) Add( ) { fields := make(map[string]interface{}) fields["value"] = value + + if !ac.inputConfig.Filter.ShouldNamePass(measurement) { + return + } + ac.AddFields(measurement, fields, tags, t...) 
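+
+	// NOTE: AddFields (below) applies the same ShouldNamePass check, so name
+	// filtering also covers metrics that are added via AddFields directly.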
}
@@ -56,6 +61,10 @@ func (ac *accumulator) AddFields(
 		return
 	}
 
+	if !ac.inputConfig.Filter.ShouldNamePass(measurement) {
+		return
+	}
+
 	if !ac.inputConfig.Filter.ShouldTagsPass(tags) {
 		return
 	}
@@ -92,7 +101,7 @@ func (ac *accumulator) AddFields(
 	for k, v := range fields {
 		// Filter out any filtered fields
 		if ac.inputConfig != nil {
-			if !ac.inputConfig.Filter.ShouldPass(k) {
+			if !ac.inputConfig.Filter.ShouldFieldsPass(k) {
 				continue
 			}
 		}
diff --git a/agent/agent.go b/agent/agent.go
index 42ade45f2..8a8800cc2 100644
--- a/agent/agent.go
+++ b/agent/agent.go
@@ -44,6 +44,8 @@ func NewAgent(config *config.Config) (*Agent, error) {
 // Connect connects to all configured outputs
 func (a *Agent) Connect() error {
 	for _, o := range a.Config.Outputs {
+		o.Quiet = a.Config.Agent.Quiet
+
 		switch ot := o.Output.(type) {
 		case telegraf.ServiceOutput:
 			if err := ot.Start(); err != nil {
diff --git a/circle.yml b/circle.yml
index d86d46dba..8fd255a78 100644
--- a/circle.yml
+++ b/circle.yml
@@ -4,14 +4,17 @@ machine:
   post:
     - sudo service zookeeper stop
     - go version
-    - go version | grep 1.5.2 || sudo rm -rf /usr/local/go
-    - wget https://storage.googleapis.com/golang/go1.5.2.linux-amd64.tar.gz
-    - sudo tar -C /usr/local -xzf go1.5.2.linux-amd64.tar.gz
+    - go version | grep 1.5.3 || sudo rm -rf /usr/local/go
+    - wget https://storage.googleapis.com/golang/go1.5.3.linux-amd64.tar.gz
+    - sudo tar -C /usr/local -xzf go1.5.3.linux-amd64.tar.gz
     - go version
 
 dependencies:
   override:
     - docker info
+  post:
+    - gem install fpm
+    - sudo apt-get install -y rpm python-boto
 
 test:
   override:
diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md
index f4214b5d4..58dbdf261 100644
--- a/docs/CONFIGURATION.md
+++ b/docs/CONFIGURATION.md
@@ -58,10 +58,14 @@ you can configure that here.
 
 There are also filters that can be configured per input:
 
-* **pass**: An array of strings that is used to filter metrics generated by the
+* **namepass**: An array of strings that is used to filter metrics generated by the
+current input. Each string in the array is tested as a glob match against
+measurement names and if it matches, the metric is emitted.
+* **namedrop**: The inverse of namepass, if a measurement name matches, it is not emitted.
+* **fieldpass**: An array of strings that is used to filter metrics generated by the
 current input. Each string in the array is tested as a glob match against
 field names and if it matches, the field is emitted.
-* **drop**: The inverse of pass, if a field name matches, it is not emitted.
+* **fielddrop**: The inverse of fieldpass, if a field name matches, it is not emitted.
 * **tagpass**: tag names and arrays of strings that are used to filter
 measurements by the current input. Each string in the array is tested as a glob
 match against the tag name, and if it matches the measurement is emitted.
@@ -117,18 +121,32 @@ fields which begin with `time_`.
path = [ "/opt", "/home*" ] ``` -#### Input Config: pass and drop +#### Input Config: fieldpass and fielddrop ```toml # Drop all metrics for guest & steal CPU usage [[inputs.cpu]] percpu = false totalcpu = true - drop = ["usage_guest", "usage_steal"] + fielddrop = ["usage_guest", "usage_steal"] # Only store inode related metrics for disks [[inputs.disk]] - pass = ["inodes*"] + fieldpass = ["inodes*"] +``` + +#### Input Config: namepass and namedrop + +```toml +# Drop all metrics about containers for kubelet +[[inputs.prometheus]] + urls = ["http://kube-node-1:4194/metrics"] + namedrop = ["container_"] + +# Only store rest client related metrics for kubelet +[[inputs.prometheus]] + urls = ["http://kube-node-1:4194/metrics"] + namepass = ["rest_client_"] ``` #### Input config: prefix, suffix, and override @@ -191,7 +209,7 @@ configuring each output sink is different, but examples can be found by running `telegraf -sample-config`. Outputs also support the same configurable options as inputs -(pass, drop, tagpass, tagdrop) +(namepass, namedrop, tagpass, tagdrop) ```toml [[outputs.influxdb]] @@ -199,14 +217,14 @@ Outputs also support the same configurable options as inputs database = "telegraf" precision = "s" # Drop all measurements that start with "aerospike" - drop = ["aerospike*"] + namedrop = ["aerospike*"] [[outputs.influxdb]] urls = [ "http://localhost:8086" ] database = "telegraf-aerospike-data" precision = "s" # Only accept aerospike data: - pass = ["aerospike*"] + namepass = ["aerospike*"] [[outputs.influxdb]] urls = [ "http://localhost:8086" ] diff --git a/etc/telegraf.conf b/etc/telegraf.conf index db87251d5..d8a295442 100644 --- a/etc/telegraf.conf +++ b/etc/telegraf.conf @@ -67,9 +67,9 @@ # note: using second precision greatly helps InfluxDB compression precision = "s" - # Connection timeout (for the connection with InfluxDB), formatted as a string. - # If not provided, will default to 0 (no timeout) - # timeout = "5s" + ## Write timeout (for the InfluxDB client), formatted as a string. + ## If not provided, will default to 5s. 0s means no timeout (not recommended). + timeout = "5s" # username = "telegraf" # password = "metricsmetricsmetricsmetrics" # Set the user agent for HTTP POSTs (can be useful for log differentiation) @@ -89,7 +89,7 @@ # Whether to report total system cpu stats or not totalcpu = true # Comment this line if you want the raw CPU time metrics - drop = ["time_*"] + fielddrop = ["time_*"] # Read metrics about disk usage by mount point [[inputs.disk]] @@ -97,6 +97,10 @@ # Setting mountpoints will restrict the stats to the specified mountpoints. # mount_points=["/"] + # Ignore some mountpoints by filesystem type. For example (dev)tmpfs (usually + # present on /run, /var/run, /dev/shm or /dev). + ignore_fs = ["tmpfs", "devtmpfs"] + # Read metrics about disk IO by device [[inputs.diskio]] # By default, telegraf will gather stats for all devices including diff --git a/etc/telegraf_windows.conf b/etc/telegraf_windows.conf new file mode 100644 index 000000000..7e66cb209 --- /dev/null +++ b/etc/telegraf_windows.conf @@ -0,0 +1,164 @@ +# Telegraf configuration + +# Telegraf is entirely plugin driven. All metrics are gathered from the +# declared inputs, and sent to the declared outputs. + +# Plugins must be declared in here to be active. +# To deactivate a plugin, comment out the name and any variables. + +# Use 'telegraf -config telegraf.conf -test' to see what metrics a config +# file would generate. + +# Global tags can be specified here in key="value" format. 
+[global_tags] + # dc = "us-east-1" # will tag all metrics with dc=us-east-1 + # rack = "1a" + +# Configuration for telegraf agent +[agent] + ## Default data collection interval for all inputs + interval = "10s" + ## Rounds collection interval to 'interval' + ## ie, if interval="10s" then always collect on :00, :10, :20, etc. + round_interval = true + + ## Telegraf will cache metric_buffer_limit metrics for each output, and will + ## flush this buffer on a successful write. + metric_buffer_limit = 10000 + ## Flush the buffer whenever full, regardless of flush_interval. + flush_buffer_when_full = true + + ## Collection jitter is used to jitter the collection by a random amount. + ## Each plugin will sleep for a random time within jitter before collecting. + ## This can be used to avoid many plugins querying things like sysfs at the + ## same time, which can have a measurable effect on the system. + collection_jitter = "0s" + + ## Default flushing interval for all outputs. You shouldn't set this below + ## interval. Maximum flush_interval will be flush_interval + flush_jitter + flush_interval = "10s" + ## Jitter the flush interval by a random amount. This is primarily to avoid + ## large write spikes for users running a large number of telegraf instances. + ## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s + flush_jitter = "0s" + + ## Run telegraf in debug mode + debug = false + ## Run telegraf in quiet mode + quiet = false + ## Override default hostname, if empty use os.Hostname() + hostname = "" + + +############################################################################### +# OUTPUTS # +############################################################################### + +# Configuration for influxdb server to send metrics to +[[outputs.influxdb]] + # The full HTTP or UDP endpoint URL for your InfluxDB instance. + # Multiple urls can be specified but it is assumed that they are part of the same + # cluster, this means that only ONE of the urls will be written to each interval. + # urls = ["udp://localhost:8089"] # UDP endpoint example + urls = ["http://localhost:8086"] # required + # The target database for metrics (telegraf will create it if not exists) + database = "telegraf" # required + # Precision of writes, valid values are "ns", "us" (or "µs"), "ms", "s", "m", "h". + # note: using second precision greatly helps InfluxDB compression + precision = "s" + + ## Write timeout (for the InfluxDB client), formatted as a string. + ## If not provided, will default to 5s. 0s means no timeout (not recommended). + timeout = "5s" + # username = "telegraf" + # password = "metricsmetricsmetricsmetrics" + # Set the user agent for HTTP POSTs (can be useful for log differentiation) + # user_agent = "telegraf" + # Set UDP payload size, defaults to InfluxDB UDP Client default (512 bytes) + # udp_payload = 512 + + +############################################################################### +# INPUTS # +############################################################################### + +# Windows Performance Counters plugin. +# These are the recommended method of monitoring system metrics on windows, +# as the regular system plugins (inputs.cpu, inputs.mem, etc.) rely on WMI, +# which utilizes a lot of system resources. 
+# +# See more configuration examples at: +# https://github.com/influxdata/telegraf/tree/master/plugins/inputs/win_perf_counters + +[[inputs.win_perf_counters]] + [[inputs.win_perf_counters.object]] + # Processor usage, alternative to native, reports on a per core. + ObjectName = "Processor" + Instances = ["*"] + Counters = ["% Idle Time", "% Interrupt Time", "% Privileged Time", "% User Time", "% Processor Time"] + Measurement = "win_cpu" + #IncludeTotal=false #Set to true to include _Total instance when querying for all (*). + + [[inputs.win_perf_counters.object]] + # Disk times and queues + ObjectName = "LogicalDisk" + Instances = ["*"] + Counters = ["% Idle Time", "% Disk Time","% Disk Read Time", "% Disk Write Time", "% User Time", "Current Disk Queue Length"] + Measurement = "win_disk" + #IncludeTotal=false #Set to true to include _Total instance when querying for all (*). + + [[inputs.win_perf_counters.object]] + ObjectName = "System" + Counters = ["Context Switches/sec","System Calls/sec"] + Instances = ["------"] + Measurement = "win_system" + #IncludeTotal=false #Set to true to include _Total instance when querying for all (*). + + [[inputs.win_perf_counters.object]] + # Example query where the Instance portion must be removed to get data back, such as from the Memory object. + ObjectName = "Memory" + Counters = ["Available Bytes","Cache Faults/sec","Demand Zero Faults/sec","Page Faults/sec","Pages/sec","Transition Faults/sec","Pool Nonpaged Bytes","Pool Paged Bytes"] + Instances = ["------"] # Use 6 x - to remove the Instance bit from the query. + Measurement = "win_mem" + #IncludeTotal=false #Set to true to include _Total instance when querying for all (*). + + +# Windows system plugins using WMI (disabled by default, using +# win_perf_counters over WMI is recommended) + +# Read metrics about cpu usage +#[[inputs.cpu]] + ## Whether to report per-cpu stats or not + #percpu = true + ## Whether to report total system cpu stats or not + #totalcpu = true + ## Comment this line if you want the raw CPU time metrics + #fielddrop = ["time_*"] + +# Read metrics about disk usage by mount point +#[[inputs.disk]] + ## By default, telegraf gather stats for all mountpoints. + ## Setting mountpoints will restrict the stats to the specified mountpoints. + ## mount_points=["/"] + + ## Ignore some mountpoints by filesystem type. For example (dev)tmpfs (usually + ## present on /run, /var/run, /dev/shm or /dev). + #ignore_fs = ["tmpfs", "devtmpfs"] + +# Read metrics about disk IO by device +#[[inputs.diskio]] + ## By default, telegraf will gather stats for all devices including + ## disk partitions. + ## Setting devices will restrict the stats to the specified devices. + ## devices = ["sda", "sdb"] + ## Uncomment the following line if you do not need disk serial numbers. 
+ ## skip_serial_number = true + +# Read metrics about memory usage +#[[inputs.mem]] + # no configuration + +# Read metrics about swap memory usage +#[[inputs.swap]] + # no configuration + diff --git a/internal/config/config.go b/internal/config/config.go index fc374d628..b5b73e06e 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -185,25 +185,22 @@ var header = `# Telegraf Configuration hostname = "" -############################################################################### -# OUTPUTS # -############################################################################### +# +# OUTPUTS: +# ` var pluginHeader = ` - -############################################################################### -# INPUTS # -############################################################################### - +# +# INPUTS: +# ` var serviceInputHeader = ` - -############################################################################### -# SERVICE INPUTS # -############################################################################### +# +# SERVICE INPUTS: +# ` // PrintSampleConfig prints the sample config @@ -429,7 +426,6 @@ func (c *Config) addOutput(name string, table *ast.Table) error { ro.MetricBufferLimit = c.Agent.MetricBufferLimit } ro.FlushBufferWhenFull = c.Agent.FlushBufferWhenFull - ro.Quiet = c.Agent.Quiet c.Outputs = append(c.Outputs, ro) return nil } @@ -478,18 +474,19 @@ func (c *Config) addInput(name string, table *ast.Table) error { return nil } -// buildFilter builds a Filter (tagpass/tagdrop/pass/drop) to +// buildFilter builds a Filter +// (tagpass/tagdrop/namepass/namedrop/fieldpass/fielddrop) to // be inserted into the internal_models.OutputConfig/internal_models.InputConfig to be used for prefix // filtering on tags and measurements func buildFilter(tbl *ast.Table) internal_models.Filter { f := internal_models.Filter{} - if node, ok := tbl.Fields["pass"]; ok { + if node, ok := tbl.Fields["namepass"]; ok { if kv, ok := node.(*ast.KeyValue); ok { if ary, ok := kv.Value.(*ast.Array); ok { for _, elem := range ary.Value { if str, ok := elem.(*ast.String); ok { - f.Pass = append(f.Pass, str.Value) + f.NamePass = append(f.NamePass, str.Value) f.IsActive = true } } @@ -497,12 +494,12 @@ func buildFilter(tbl *ast.Table) internal_models.Filter { } } - if node, ok := tbl.Fields["drop"]; ok { + if node, ok := tbl.Fields["namedrop"]; ok { if kv, ok := node.(*ast.KeyValue); ok { if ary, ok := kv.Value.(*ast.Array); ok { for _, elem := range ary.Value { if str, ok := elem.(*ast.String); ok { - f.Drop = append(f.Drop, str.Value) + f.NameDrop = append(f.NameDrop, str.Value) f.IsActive = true } } @@ -510,6 +507,38 @@ func buildFilter(tbl *ast.Table) internal_models.Filter { } } + fields := []string{"pass", "fieldpass"} + for _, field := range fields { + if node, ok := tbl.Fields[field]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if ary, ok := kv.Value.(*ast.Array); ok { + for _, elem := range ary.Value { + if str, ok := elem.(*ast.String); ok { + f.FieldPass = append(f.FieldPass, str.Value) + f.IsActive = true + } + } + } + } + } + } + + fields = []string{"drop", "fielddrop"} + for _, field := range fields { + if node, ok := tbl.Fields[field]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if ary, ok := kv.Value.(*ast.Array); ok { + for _, elem := range ary.Value { + if str, ok := elem.(*ast.String); ok { + f.FieldDrop = append(f.FieldDrop, str.Value) + f.IsActive = true + } + } + } + } + } + } + if node, ok := tbl.Fields["tagpass"]; ok { if subtbl, ok := 
node.(*ast.Table); ok { for name, val := range subtbl.Fields { @@ -548,6 +577,10 @@ func buildFilter(tbl *ast.Table) internal_models.Filter { } } + delete(tbl.Fields, "namedrop") + delete(tbl.Fields, "namepass") + delete(tbl.Fields, "fielddrop") + delete(tbl.Fields, "fieldpass") delete(tbl.Fields, "drop") delete(tbl.Fields, "pass") delete(tbl.Fields, "tagdrop") @@ -717,5 +750,12 @@ func buildOutput(name string, tbl *ast.Table) (*internal_models.OutputConfig, er Name: name, Filter: buildFilter(tbl), } + // Outputs don't support FieldDrop/FieldPass, so set to NameDrop/NamePass + if len(oc.Filter.FieldDrop) > 0 { + oc.Filter.NameDrop = oc.Filter.FieldDrop + } + if len(oc.Filter.FieldPass) > 0 { + oc.Filter.NamePass = oc.Filter.FieldPass + } return oc, nil } diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 0e9f2c967..f0add8b98 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -23,8 +23,10 @@ func TestConfig_LoadSingleInput(t *testing.T) { mConfig := &internal_models.InputConfig{ Name: "memcached", Filter: internal_models.Filter{ - Drop: []string{"other", "stuff"}, - Pass: []string{"some", "strings"}, + NameDrop: []string{"metricname2"}, + NamePass: []string{"metricname1"}, + FieldDrop: []string{"other", "stuff"}, + FieldPass: []string{"some", "strings"}, TagDrop: []internal_models.TagFilter{ internal_models.TagFilter{ Name: "badtag", @@ -66,8 +68,10 @@ func TestConfig_LoadDirectory(t *testing.T) { mConfig := &internal_models.InputConfig{ Name: "memcached", Filter: internal_models.Filter{ - Drop: []string{"other", "stuff"}, - Pass: []string{"some", "strings"}, + NameDrop: []string{"metricname2"}, + NamePass: []string{"metricname1"}, + FieldDrop: []string{"other", "stuff"}, + FieldPass: []string{"some", "strings"}, TagDrop: []internal_models.TagFilter{ internal_models.TagFilter{ Name: "badtag", diff --git a/internal/config/testdata/single_plugin.toml b/internal/config/testdata/single_plugin.toml index 6670f6b2f..664937b25 100644 --- a/internal/config/testdata/single_plugin.toml +++ b/internal/config/testdata/single_plugin.toml @@ -1,7 +1,9 @@ [[inputs.memcached]] servers = ["localhost"] - pass = ["some", "strings"] - drop = ["other", "stuff"] + namepass = ["metricname1"] + namedrop = ["metricname2"] + fieldpass = ["some", "strings"] + fielddrop = ["other", "stuff"] interval = "5s" [inputs.memcached.tagpass] goodtag = ["mytag"] diff --git a/internal/config/testdata/subconfig/memcached.conf b/internal/config/testdata/subconfig/memcached.conf index 4c43febc7..2cd07d15d 100644 --- a/internal/config/testdata/subconfig/memcached.conf +++ b/internal/config/testdata/subconfig/memcached.conf @@ -1,5 +1,7 @@ [[inputs.memcached]] servers = ["192.168.1.1"] + namepass = ["metricname1"] + namedrop = ["metricname2"] pass = ["some", "strings"] drop = ["other", "stuff"] interval = "5s" diff --git a/internal/models/filter.go b/internal/models/filter.go index 9b4f2ba90..e2b1377f4 100644 --- a/internal/models/filter.go +++ b/internal/models/filter.go @@ -15,8 +15,11 @@ type TagFilter struct { // Filter containing drop/pass and tagdrop/tagpass rules type Filter struct { - Drop []string - Pass []string + NameDrop []string + NamePass []string + + FieldDrop []string + FieldPass []string TagDrop []TagFilter TagPass []TagFilter @@ -25,17 +28,17 @@ type Filter struct { } func (f Filter) ShouldMetricPass(metric telegraf.Metric) bool { - if f.ShouldPass(metric.Name()) && f.ShouldTagsPass(metric.Tags()) { + if f.ShouldNamePass(metric.Name()) && 
f.ShouldTagsPass(metric.Tags()) {
 		return true
 	}
 	return false
 }
 
-// ShouldPass returns true if the metric should pass, false if should drop
+// ShouldNamePass returns true if the metric should pass, false if should drop
 // based on the drop/pass filter parameters
-func (f Filter) ShouldPass(key string) bool {
-	if f.Pass != nil {
-		for _, pat := range f.Pass {
+func (f Filter) ShouldNamePass(key string) bool {
+	if f.NamePass != nil {
+		for _, pat := range f.NamePass {
 			// TODO remove HasPrefix check, leaving it for now for legacy support.
 			// Cam, 2015-12-07
 			if strings.HasPrefix(key, pat) || internal.Glob(pat, key) {
@@ -45,8 +48,36 @@ func (f Filter) ShouldPass(key string) bool {
 		return false
 	}
 
-	if f.Drop != nil {
-		for _, pat := range f.Drop {
+	if f.NameDrop != nil {
+		for _, pat := range f.NameDrop {
+			// TODO remove HasPrefix check, leaving it for now for legacy support.
+			// Cam, 2015-12-07
+			if strings.HasPrefix(key, pat) || internal.Glob(pat, key) {
+				return false
+			}
+		}
+
+		return true
+	}
+	return true
+}
+
+// ShouldFieldsPass returns true if the metric should pass, false if should drop
+// based on the fielddrop/fieldpass filter parameters
+func (f Filter) ShouldFieldsPass(key string) bool {
+	if f.FieldPass != nil {
+		for _, pat := range f.FieldPass {
+			// TODO remove HasPrefix check, leaving it for now for legacy support.
+			// Cam, 2015-12-07
+			if strings.HasPrefix(key, pat) || internal.Glob(pat, key) {
+				return true
+			}
+		}
+		return false
+	}
+
+	if f.FieldDrop != nil {
+		for _, pat := range f.FieldDrop {
 			// TODO remove HasPrefix check, leaving it for now for legacy support.
 			// Cam, 2015-12-07
 			if strings.HasPrefix(key, pat) || internal.Glob(pat, key) {
diff --git a/internal/models/filter_test.go b/internal/models/filter_test.go
index 320c38407..c69398494 100644
--- a/internal/models/filter_test.go
+++ b/internal/models/filter_test.go
@@ -18,15 +18,15 @@ func TestFilter_Empty(t *testing.T) {
 	}
 
 	for _, measurement := range measurements {
-		if !f.ShouldPass(measurement) {
+		if !f.ShouldFieldsPass(measurement) {
 			t.Errorf("Expected measurement %s to pass", measurement)
 		}
 	}
 }
 
-func TestFilter_Pass(t *testing.T) {
+func TestFilter_NamePass(t *testing.T) {
 	f := Filter{
-		Pass: []string{"foo*", "cpu_usage_idle"},
+		NamePass: []string{"foo*", "cpu_usage_idle"},
 	}
 
 	passes := []string{
@@ -45,21 +45,21 @@
 	}
 
 	for _, measurement := range passes {
-		if !f.ShouldPass(measurement) {
+		if !f.ShouldNamePass(measurement) {
 			t.Errorf("Expected measurement %s to pass", measurement)
 		}
 	}
 
 	for _, measurement := range drops {
-		if f.ShouldPass(measurement) {
+		if f.ShouldNamePass(measurement) {
 			t.Errorf("Expected measurement %s to drop", measurement)
 		}
 	}
 }
 
-func TestFilter_Drop(t *testing.T) {
+func TestFilter_NameDrop(t *testing.T) {
 	f := Filter{
-		Drop: []string{"foo*", "cpu_usage_idle"},
+		NameDrop: []string{"foo*", "cpu_usage_idle"},
 	}
 
 	drops := []string{
@@ -78,13 +78,79 @@
 	}
 
 	for _, measurement := range passes {
-		if !f.ShouldPass(measurement) {
+		if !f.ShouldNamePass(measurement) {
 			t.Errorf("Expected measurement %s to pass", measurement)
 		}
 	}
 
 	for _, measurement := range drops {
-		if f.ShouldPass(measurement) {
+		if f.ShouldNamePass(measurement) {
+			t.Errorf("Expected measurement %s to drop", measurement)
+		}
+	}
+}
+
+func TestFilter_FieldPass(t *testing.T) {
+	f := Filter{
+		FieldPass: []string{"foo*", "cpu_usage_idle"},
+	}
+
+	passes := []string{
+		"foo",
+		"foo_bar",
+		"foo.bar",
+		"foo-bar",
+		"cpu_usage_idle",
+	}
+
+	drops := []string{
+		"bar",
+		"barfoo",
+		"bar_foo",
+		"cpu_usage_busy",
+	}
+
+	for _, measurement := range passes {
+		if !f.ShouldFieldsPass(measurement) {
+			t.Errorf("Expected measurement %s to pass", measurement)
+		}
+	}
+
+	for _, measurement := range drops {
+		if f.ShouldFieldsPass(measurement) {
+			t.Errorf("Expected measurement %s to drop", measurement)
+		}
+	}
+}
+
+func TestFilter_FieldDrop(t *testing.T) {
+	f := Filter{
+		FieldDrop: []string{"foo*", "cpu_usage_idle"},
+	}
+
+	drops := []string{
+		"foo",
+		"foo_bar",
+		"foo.bar",
+		"foo-bar",
+		"cpu_usage_idle",
+	}
+
+	passes := []string{
+		"bar",
+		"barfoo",
+		"bar_foo",
+		"cpu_usage_busy",
+	}
+
+	for _, measurement := range passes {
+		if !f.ShouldFieldsPass(measurement) {
+			t.Errorf("Expected measurement %s to pass", measurement)
+		}
+	}
+
+	for _, measurement := range drops {
+		if f.ShouldFieldsPass(measurement) {
 			t.Errorf("Expected measurement %s to drop", measurement)
 		}
 	}
 }
diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go
index 74331e54b..5af18fcff 100644
--- a/plugins/inputs/all/all.go
+++ b/plugins/inputs/all/all.go
@@ -6,6 +6,7 @@ import (
 	_ "github.com/influxdata/telegraf/plugins/inputs/bcache"
 	_ "github.com/influxdata/telegraf/plugins/inputs/couchdb"
 	_ "github.com/influxdata/telegraf/plugins/inputs/disque"
+	_ "github.com/influxdata/telegraf/plugins/inputs/dns_query"
 	_ "github.com/influxdata/telegraf/plugins/inputs/docker"
 	_ "github.com/influxdata/telegraf/plugins/inputs/dovecot"
 	_ "github.com/influxdata/telegraf/plugins/inputs/elasticsearch"
@@ -40,6 +41,7 @@ import (
 	_ "github.com/influxdata/telegraf/plugins/inputs/raindrops"
 	_ "github.com/influxdata/telegraf/plugins/inputs/redis"
 	_ "github.com/influxdata/telegraf/plugins/inputs/rethinkdb"
+	_ "github.com/influxdata/telegraf/plugins/inputs/riak"
 	_ "github.com/influxdata/telegraf/plugins/inputs/sensors"
 	_ "github.com/influxdata/telegraf/plugins/inputs/snmp"
 	_ "github.com/influxdata/telegraf/plugins/inputs/sqlserver"
diff --git a/plugins/inputs/dns_query/README.md b/plugins/inputs/dns_query/README.md
new file mode 100644
index 000000000..34b285c37
--- /dev/null
+++ b/plugins/inputs/dns_query/README.md
@@ -0,0 +1,51 @@
+# DNS Query Input Plugin
+
+The DNS Query plugin gathers DNS query times in milliseconds - like [Dig](https://en.wikipedia.org/wiki/Dig_\(command\))
+
+### Configuration:
+
+```
+# Sample Config:
+[[inputs.dns_query]]
+  ## servers to query
+  servers = ["8.8.8.8"] # required
+
+  ## Domains or subdomains to query. "." (root) is default
+  domains = ["."] # optional
+
+  ## Query record type. Possible values: A, AAAA, ANY, CNAME, MX, NS, PTR, SOA, SPF, SRV, TXT. Default is "NS"
+  record_type = "A" # optional
+
+  ## Dns server port. 53 is default
+  port = 53 # optional
+
+  ## Query timeout in seconds. Default is 2 seconds
+  timeout = 2 # optional
+```
+
+To query more than one record type, define the plugin multiple times:
+
+```
+[[inputs.dns_query]]
+  domains = ["mjasion.pl"]
+  servers = ["8.8.8.8", "8.8.4.4"]
+  record_type = "A"
+
+[[inputs.dns_query]]
+  domains = ["mjasion.pl"]
+  servers = ["8.8.8.8", "8.8.4.4"]
+  record_type = "MX"
+```
+
+### Tags:
+
+- server
+- domain
+- record_type
+
+### Example output:
+
+```
+./telegraf -config telegraf.conf -input-filter dns_query -test
+> dns_query,domain=mjasion.pl,record_type=A,server=8.8.8.8 query_time_ms=67.189842 1456082743585760680
+```
diff --git a/plugins/inputs/dns_query/dns_query.go b/plugins/inputs/dns_query/dns_query.go
new file mode 100644
index 000000000..397482a98
--- /dev/null
+++ b/plugins/inputs/dns_query/dns_query.go
@@ -0,0 +1,159 @@
+package dns_query
+
+import (
+	"fmt"
+	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/plugins/inputs"
+	"github.com/miekg/dns"
+	"net"
+	"strconv"
+	"time"
+)
+
+type DnsQuery struct {
+	// Domains or subdomains to query
+	Domains []string
+
+	// Server to query
+	Servers []string
+
+	// Record type
+	RecordType string `toml:"record_type"`
+
+	// DNS server port number
+	Port int
+
+	// Dns query timeout in seconds. Defaults to 2
+	Timeout int
+}
+
+var sampleConfig = `
+  ## servers to query
+  servers = ["8.8.8.8"] # required
+
+  ## Domains or subdomains to query. "." (root) is default
+  domains = ["."] # optional
+
+  ## Query record type. Possible values: A, AAAA, CNAME, MX, NS, PTR, TXT, SOA, SPF, SRV. Default is "NS"
+  record_type = "A" # optional
+
+  ## Dns server port. 53 is default
+  port = 53 # optional
+
+  ## Query timeout in seconds. Default is 2 seconds
+  timeout = 2 # optional
+`
+
+func (d *DnsQuery) SampleConfig() string {
+	return sampleConfig
+}
+
+func (d *DnsQuery) Description() string {
+	return "Query given DNS servers and gather statistics"
+}
+
+func (d *DnsQuery) Gather(acc telegraf.Accumulator) error {
+	d.setDefaultValues()
+	for _, domain := range d.Domains {
+		for _, server := range d.Servers {
+			dnsQueryTime, err := d.getDnsQueryTime(domain, server)
+			if err != nil {
+				return err
+			}
+			tags := map[string]string{
+				"server":      server,
+				"domain":      domain,
+				"record_type": d.RecordType,
+			}
+
+			fields := map[string]interface{}{"query_time_ms": dnsQueryTime}
+			acc.AddFields("dns_query", fields, tags)
+		}
+	}
+
+	return nil
+}
+
+func (d *DnsQuery) setDefaultValues() {
+	if len(d.RecordType) == 0 {
+		d.RecordType = "NS"
+	}
+
+	if len(d.Domains) == 0 {
+		d.Domains = []string{"."}
+		d.RecordType = "NS"
+	}
+
+	if d.Port == 0 {
+		d.Port = 53
+	}
+
+	if d.Timeout == 0 {
+		d.Timeout = 2
+	}
+}
+
+func (d *DnsQuery) getDnsQueryTime(domain string, server string) (float64, error) {
+	dnsQueryTime := float64(0)
+
+	c := new(dns.Client)
+	c.ReadTimeout = time.Duration(d.Timeout) * time.Second
+
+	m := new(dns.Msg)
+	recordType, err := d.parseRecordType()
+	if err != nil {
+		return dnsQueryTime, err
+	}
+	m.SetQuestion(dns.Fqdn(domain), recordType)
+	m.RecursionDesired = true
+
+	r, rtt, err := c.Exchange(m, net.JoinHostPort(server, strconv.Itoa(d.Port)))
+	if err != nil {
+		return dnsQueryTime, err
+	}
+	if r.Rcode != dns.RcodeSuccess {
+		return dnsQueryTime, fmt.Errorf("Invalid answer name %s after %s query for %s", domain, d.RecordType, domain)
+	}
+	dnsQueryTime = float64(rtt.Nanoseconds()) / 1e6
+	return dnsQueryTime, nil
+}
+
+func (d *DnsQuery) parseRecordType() (uint16, error) {
+	var recordType uint16
+	var err error
+
+	switch d.RecordType {
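+	// Map the configured record-type string onto the corresponding
+	// wire-format constant from the github.com/miekg/dns package.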
+	case "A":
+		recordType = dns.TypeA
+	case "AAAA":
+		recordType = dns.TypeAAAA
+	case "ANY":
+		recordType = dns.TypeANY
+	case "CNAME":
+		recordType = dns.TypeCNAME
+	case "MX":
+		recordType = dns.TypeMX
+	case "NS":
+		recordType = dns.TypeNS
+	case "PTR":
+		recordType = dns.TypePTR
+	case "SOA":
+		recordType = dns.TypeSOA
+	case "SPF":
+		recordType = dns.TypeSPF
+	case "SRV":
+		recordType = dns.TypeSRV
+	case "TXT":
+		recordType = dns.TypeTXT
+	default:
+		err = fmt.Errorf("Record type %s not recognized", d.RecordType)
+	}
+
+	return recordType, err
+}
+
+func init() {
+	inputs.Add("dns_query", func() telegraf.Input {
+		return &DnsQuery{}
+	})
+}
diff --git a/plugins/inputs/dns_query/dns_query_test.go b/plugins/inputs/dns_query/dns_query_test.go
new file mode 100644
index 000000000..076db5fab
--- /dev/null
+++ b/plugins/inputs/dns_query/dns_query_test.go
@@ -0,0 +1,192 @@
+package dns_query
+
+import (
+	"github.com/influxdata/telegraf/testutil"
+	"github.com/miekg/dns"
+	"github.com/stretchr/testify/assert"
+	"testing"
+	"time"
+)
+
+var servers = []string{"8.8.8.8"}
+var domains = []string{"google.com"}
+
+func TestGathering(t *testing.T) {
+	var dnsConfig = DnsQuery{
+		Servers: servers,
+		Domains: domains,
+	}
+	var acc testutil.Accumulator
+
+	err := dnsConfig.Gather(&acc)
+	assert.NoError(t, err)
+	metric, ok := acc.Get("dns_query")
+	assert.True(t, ok)
+	queryTime, _ := metric.Fields["query_time_ms"].(float64)
+
+	assert.NotEqual(t, 0, queryTime)
+}
+
+func TestGatheringMxRecord(t *testing.T) {
+	var dnsConfig = DnsQuery{
+		Servers: servers,
+		Domains: domains,
+	}
+	var acc testutil.Accumulator
+	dnsConfig.RecordType = "MX"
+
+	err := dnsConfig.Gather(&acc)
+	assert.NoError(t, err)
+	metric, ok := acc.Get("dns_query")
+	assert.True(t, ok)
+	queryTime, _ := metric.Fields["query_time_ms"].(float64)
+
+	assert.NotEqual(t, 0, queryTime)
+}
+
+func TestGatheringRootDomain(t *testing.T) {
+	var dnsConfig = DnsQuery{
+		Servers:    servers,
+		Domains:    []string{"."},
+		RecordType: "MX",
+	}
+	var acc testutil.Accumulator
+	tags := map[string]string{
+		"server":      "8.8.8.8",
+		"domain":      ".",
+		"record_type": "MX",
+	}
+	fields := map[string]interface{}{}
+
+	err := dnsConfig.Gather(&acc)
+	assert.NoError(t, err)
+	metric, ok := acc.Get("dns_query")
+	assert.True(t, ok)
+	queryTime, _ := metric.Fields["query_time_ms"].(float64)
+
+	fields["query_time_ms"] = queryTime
+	acc.AssertContainsTaggedFields(t, "dns_query", fields, tags)
+}
+
+func TestMetricContainsServerAndDomainAndRecordTypeTags(t *testing.T) {
+	var dnsConfig = DnsQuery{
+		Servers: servers,
+		Domains: domains,
+	}
+	var acc testutil.Accumulator
+	tags := map[string]string{
+		"server":      "8.8.8.8",
+		"domain":      "google.com",
+		"record_type": "NS",
+	}
+	fields := map[string]interface{}{}
+
+	err := dnsConfig.Gather(&acc)
+	assert.NoError(t, err)
+	metric, ok := acc.Get("dns_query")
+	assert.True(t, ok)
+	queryTime, _ := metric.Fields["query_time_ms"].(float64)
+
+	fields["query_time_ms"] = queryTime
+	acc.AssertContainsTaggedFields(t, "dns_query", fields, tags)
+}
+
+func TestGatheringTimeout(t *testing.T) {
+	var dnsConfig = DnsQuery{
+		Servers: servers,
+		Domains: domains,
+	}
+	var acc testutil.Accumulator
+	dnsConfig.Port = 60054
+	dnsConfig.Timeout = 1
+	var err error
+
+	channel := make(chan error, 1)
+	go func() {
+		channel <- dnsConfig.Gather(&acc)
+	}()
+	select {
+	case res := <-channel:
+		err = res
+	case <-time.After(time.Second * 2):
+		err = nil
+	}
+
+	assert.Error(t, err)
+	assert.Contains(t, err.Error(),
"i/o timeout") +} + +func TestSettingDefaultValues(t *testing.T) { + dnsConfig := DnsQuery{} + + dnsConfig.setDefaultValues() + + assert.Equal(t, []string{"."}, dnsConfig.Domains, "Default domain not equal \".\"") + assert.Equal(t, "NS", dnsConfig.RecordType, "Default record type not equal 'NS'") + assert.Equal(t, 53, dnsConfig.Port, "Default port number not equal 53") + assert.Equal(t, 2, dnsConfig.Timeout, "Default timeout not equal 2") + + dnsConfig = DnsQuery{Domains: []string{"."}} + + dnsConfig.setDefaultValues() + + assert.Equal(t, "NS", dnsConfig.RecordType, "Default record type not equal 'NS'") +} + +func TestRecordTypeParser(t *testing.T) { + var dnsConfig = DnsQuery{} + var recordType uint16 + + dnsConfig.RecordType = "A" + recordType, _ = dnsConfig.parseRecordType() + assert.Equal(t, dns.TypeA, recordType) + + dnsConfig.RecordType = "AAAA" + recordType, _ = dnsConfig.parseRecordType() + assert.Equal(t, dns.TypeAAAA, recordType) + + dnsConfig.RecordType = "ANY" + recordType, _ = dnsConfig.parseRecordType() + assert.Equal(t, dns.TypeANY, recordType) + + dnsConfig.RecordType = "CNAME" + recordType, _ = dnsConfig.parseRecordType() + assert.Equal(t, dns.TypeCNAME, recordType) + + dnsConfig.RecordType = "MX" + recordType, _ = dnsConfig.parseRecordType() + assert.Equal(t, dns.TypeMX, recordType) + + dnsConfig.RecordType = "NS" + recordType, _ = dnsConfig.parseRecordType() + assert.Equal(t, dns.TypeNS, recordType) + + dnsConfig.RecordType = "PTR" + recordType, _ = dnsConfig.parseRecordType() + assert.Equal(t, dns.TypePTR, recordType) + + dnsConfig.RecordType = "SOA" + recordType, _ = dnsConfig.parseRecordType() + assert.Equal(t, dns.TypeSOA, recordType) + + dnsConfig.RecordType = "SPF" + recordType, _ = dnsConfig.parseRecordType() + assert.Equal(t, dns.TypeSPF, recordType) + + dnsConfig.RecordType = "SRV" + recordType, _ = dnsConfig.parseRecordType() + assert.Equal(t, dns.TypeSRV, recordType) + + dnsConfig.RecordType = "TXT" + recordType, _ = dnsConfig.parseRecordType() + assert.Equal(t, dns.TypeTXT, recordType) +} + +func TestRecordTypeParserError(t *testing.T) { + var dnsConfig = DnsQuery{} + var err error + + dnsConfig.RecordType = "nil" + _, err = dnsConfig.parseRecordType() + assert.Error(t, err) +} diff --git a/plugins/inputs/httpjson/httpjson.go b/plugins/inputs/httpjson/httpjson.go index d5dddd7d4..c055f66de 100644 --- a/plugins/inputs/httpjson/httpjson.go +++ b/plugins/inputs/httpjson/httpjson.go @@ -1,7 +1,6 @@ package httpjson import ( - "bytes" "errors" "fmt" "io/ioutil" @@ -23,7 +22,8 @@ type HttpJson struct { TagKeys []string Parameters map[string]string Headers map[string]string - client HTTPClient + + client HTTPClient } type HTTPClient interface { @@ -182,15 +182,14 @@ func (h *HttpJson) sendRequest(serverURL string) (string, float64, error) { return "", -1, fmt.Errorf("Invalid server URL \"%s\"", serverURL) } - params := url.Values{} data := url.Values{} - switch { case h.Method == "GET": - requestURL.RawQuery = params.Encode() + params := requestURL.Query() for k, v := range h.Parameters { params.Add(k, v) } + requestURL.RawQuery = params.Encode() case h.Method == "POST": requestURL.RawQuery = "" @@ -200,7 +199,8 @@ func (h *HttpJson) sendRequest(serverURL string) (string, float64, error) { } // Create + send request - req, err := http.NewRequest(h.Method, requestURL.String(), bytes.NewBufferString(data.Encode())) + req, err := http.NewRequest(h.Method, requestURL.String(), + strings.NewReader(data.Encode())) if err != nil { return "", -1, err } diff --git 
a/plugins/inputs/httpjson/httpjson_test.go b/plugins/inputs/httpjson/httpjson_test.go index f5f81c7c3..b6b57a167 100644 --- a/plugins/inputs/httpjson/httpjson_test.go +++ b/plugins/inputs/httpjson/httpjson_test.go @@ -1,8 +1,10 @@ package httpjson import ( + "fmt" "io/ioutil" "net/http" + "net/http/httptest" "strings" "testing" @@ -27,6 +29,75 @@ const validJSON = ` "another_list": [4] }` +const validJSON2 = `{ + "user":{ + "hash_rate":0, + "expected_24h_rewards":0, + "total_rewards":0.000595109232, + "paid_rewards":0, + "unpaid_rewards":0.000595109232, + "past_24h_rewards":0, + "total_work":"5172625408", + "blocks_found":0 + }, + "workers":{ + "brminer.1":{ + "hash_rate":0, + "hash_rate_24h":0, + "valid_shares":"6176", + "stale_shares":"0", + "invalid_shares":"0", + "rewards":4.5506464e-5, + "rewards_24h":0, + "reset_time":1455409950 + }, + "brminer.2":{ + "hash_rate":0, + "hash_rate_24h":0, + "valid_shares":"0", + "stale_shares":"0", + "invalid_shares":"0", + "rewards":0, + "rewards_24h":0, + "reset_time":1455936726 + }, + "brminer.3":{ + "hash_rate":0, + "hash_rate_24h":0, + "valid_shares":"0", + "stale_shares":"0", + "invalid_shares":"0", + "rewards":0, + "rewards_24h":0, + "reset_time":1455936733 + } + }, + "pool":{ + "hash_rate":114100000, + "active_users":843, + "total_work":"5015346808842682368", + "pps_ratio":1.04, + "pps_rate":7.655e-9 + }, + "network":{ + "hash_rate":1426117703, + "block_number":944895, + "time_per_block":156, + "difficulty":51825.72835216, + "next_difficulty":51916.15249019, + "retarget_time":95053 + }, + "market":{ + "ltc_btc":0.00798, + "ltc_usd":3.37801, + "ltc_eur":3.113, + "ltc_gbp":2.32807, + "ltc_rub":241.796, + "ltc_cny":21.3883, + "btc_usd":422.852 + } +}` + const validJSONTags = ` { "value": 15, @@ -149,6 +220,222 @@ func TestHttpJson200(t *testing.T) { } } +// Test that GET Parameters from the url string are applied properly +func TestHttpJsonGET_URL(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + key := r.FormValue("api_key") + assert.Equal(t, "mykey", key) + w.WriteHeader(http.StatusOK) + fmt.Fprintln(w, validJSON2) + })) + defer ts.Close() + + a := HttpJson{ + Servers: []string{ts.URL + "?api_key=mykey"}, + Name: "", + Method: "GET", + client: RealHTTPClient{client: &http.Client{}}, + } + + var acc testutil.Accumulator + err := a.Gather(&acc) + require.NoError(t, err) + + // remove response_time from gathered fields because it's non-deterministic + delete(acc.Metrics[0].Fields, "response_time") + + fields := map[string]interface{}{ + "market_btc_usd": float64(422.852), + "market_ltc_btc": float64(0.00798), + "market_ltc_cny": float64(21.3883), + "market_ltc_eur": float64(3.113), + "market_ltc_gbp": float64(2.32807), + "market_ltc_rub": float64(241.796), + "market_ltc_usd": float64(3.37801), + "network_block_number": float64(944895), + "network_difficulty": float64(51825.72835216), + "network_hash_rate": float64(1.426117703e+09), + "network_next_difficulty": float64(51916.15249019), + "network_retarget_time": float64(95053), + "network_time_per_block": float64(156), + "pool_active_users": float64(843), + "pool_hash_rate": float64(1.141e+08), + "pool_pps_rate": float64(7.655e-09), + "pool_pps_ratio": float64(1.04), + "user_blocks_found": float64(0), + "user_expected_24h_rewards": float64(0), + "user_hash_rate": float64(0), + "user_paid_rewards": float64(0), + "user_past_24h_rewards": float64(0), + "user_total_rewards": float64(0.000595109232), + "user_unpaid_rewards": 
float64(0.000595109232), + "workers_brminer.1_hash_rate": float64(0), + "workers_brminer.1_hash_rate_24h": float64(0), + "workers_brminer.1_reset_time": float64(1.45540995e+09), + "workers_brminer.1_rewards": float64(4.5506464e-05), + "workers_brminer.1_rewards_24h": float64(0), + "workers_brminer.2_hash_rate": float64(0), + "workers_brminer.2_hash_rate_24h": float64(0), + "workers_brminer.2_reset_time": float64(1.455936726e+09), + "workers_brminer.2_rewards": float64(0), + "workers_brminer.2_rewards_24h": float64(0), + "workers_brminer.3_hash_rate": float64(0), + "workers_brminer.3_hash_rate_24h": float64(0), + "workers_brminer.3_reset_time": float64(1.455936733e+09), + "workers_brminer.3_rewards": float64(0), + "workers_brminer.3_rewards_24h": float64(0), + } + + acc.AssertContainsFields(t, "httpjson", fields) +} + +// Test that GET Parameters are applied properly +func TestHttpJsonGET(t *testing.T) { + params := map[string]string{ + "api_key": "mykey", + } + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + key := r.FormValue("api_key") + assert.Equal(t, "mykey", key) + w.WriteHeader(http.StatusOK) + fmt.Fprintln(w, validJSON2) + })) + defer ts.Close() + + a := HttpJson{ + Servers: []string{ts.URL}, + Name: "", + Method: "GET", + Parameters: params, + client: RealHTTPClient{client: &http.Client{}}, + } + + var acc testutil.Accumulator + err := a.Gather(&acc) + require.NoError(t, err) + + // remove response_time from gathered fields because it's non-deterministic + delete(acc.Metrics[0].Fields, "response_time") + + fields := map[string]interface{}{ + "market_btc_usd": float64(422.852), + "market_ltc_btc": float64(0.00798), + "market_ltc_cny": float64(21.3883), + "market_ltc_eur": float64(3.113), + "market_ltc_gbp": float64(2.32807), + "market_ltc_rub": float64(241.796), + "market_ltc_usd": float64(3.37801), + "network_block_number": float64(944895), + "network_difficulty": float64(51825.72835216), + "network_hash_rate": float64(1.426117703e+09), + "network_next_difficulty": float64(51916.15249019), + "network_retarget_time": float64(95053), + "network_time_per_block": float64(156), + "pool_active_users": float64(843), + "pool_hash_rate": float64(1.141e+08), + "pool_pps_rate": float64(7.655e-09), + "pool_pps_ratio": float64(1.04), + "user_blocks_found": float64(0), + "user_expected_24h_rewards": float64(0), + "user_hash_rate": float64(0), + "user_paid_rewards": float64(0), + "user_past_24h_rewards": float64(0), + "user_total_rewards": float64(0.000595109232), + "user_unpaid_rewards": float64(0.000595109232), + "workers_brminer.1_hash_rate": float64(0), + "workers_brminer.1_hash_rate_24h": float64(0), + "workers_brminer.1_reset_time": float64(1.45540995e+09), + "workers_brminer.1_rewards": float64(4.5506464e-05), + "workers_brminer.1_rewards_24h": float64(0), + "workers_brminer.2_hash_rate": float64(0), + "workers_brminer.2_hash_rate_24h": float64(0), + "workers_brminer.2_reset_time": float64(1.455936726e+09), + "workers_brminer.2_rewards": float64(0), + "workers_brminer.2_rewards_24h": float64(0), + "workers_brminer.3_hash_rate": float64(0), + "workers_brminer.3_hash_rate_24h": float64(0), + "workers_brminer.3_reset_time": float64(1.455936733e+09), + "workers_brminer.3_rewards": float64(0), + "workers_brminer.3_rewards_24h": float64(0), + } + + acc.AssertContainsFields(t, "httpjson", fields) +} + +// Test that POST Parameters are applied properly +func TestHttpJsonPOST(t *testing.T) { + params := map[string]string{ + "api_key": "mykey", + } + ts := 
httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		body, err := ioutil.ReadAll(r.Body)
+		assert.NoError(t, err)
+		assert.Equal(t, "api_key=mykey", string(body))
+		w.WriteHeader(http.StatusOK)
+		fmt.Fprintln(w, validJSON2)
+	}))
+	defer ts.Close()
+
+	a := HttpJson{
+		Servers:    []string{ts.URL},
+		Name:       "",
+		Method:     "POST",
+		Parameters: params,
+		client:     RealHTTPClient{client: &http.Client{}},
+	}
+
+	var acc testutil.Accumulator
+	err := a.Gather(&acc)
+	require.NoError(t, err)
+
+	// remove response_time from gathered fields because it's non-deterministic
+	delete(acc.Metrics[0].Fields, "response_time")
+
+	fields := map[string]interface{}{
+		"market_btc_usd":                  float64(422.852),
+		"market_ltc_btc":                  float64(0.00798),
+		"market_ltc_cny":                  float64(21.3883),
+		"market_ltc_eur":                  float64(3.113),
+		"market_ltc_gbp":                  float64(2.32807),
+		"market_ltc_rub":                  float64(241.796),
+		"market_ltc_usd":                  float64(3.37801),
+		"network_block_number":            float64(944895),
+		"network_difficulty":              float64(51825.72835216),
+		"network_hash_rate":               float64(1.426117703e+09),
+		"network_next_difficulty":         float64(51916.15249019),
+		"network_retarget_time":           float64(95053),
+		"network_time_per_block":          float64(156),
+		"pool_active_users":               float64(843),
+		"pool_hash_rate":                  float64(1.141e+08),
+		"pool_pps_rate":                   float64(7.655e-09),
+		"pool_pps_ratio":                  float64(1.04),
+		"user_blocks_found":               float64(0),
+		"user_expected_24h_rewards":       float64(0),
+		"user_hash_rate":                  float64(0),
+		"user_paid_rewards":               float64(0),
+		"user_past_24h_rewards":           float64(0),
+		"user_total_rewards":              float64(0.000595109232),
+		"user_unpaid_rewards":             float64(0.000595109232),
+		"workers_brminer.1_hash_rate":     float64(0),
+		"workers_brminer.1_hash_rate_24h": float64(0),
+		"workers_brminer.1_reset_time":    float64(1.45540995e+09),
+		"workers_brminer.1_rewards":       float64(4.5506464e-05),
+		"workers_brminer.1_rewards_24h":   float64(0),
+		"workers_brminer.2_hash_rate":     float64(0),
+		"workers_brminer.2_hash_rate_24h": float64(0),
+		"workers_brminer.2_reset_time":    float64(1.455936726e+09),
+		"workers_brminer.2_rewards":       float64(0),
+		"workers_brminer.2_rewards_24h":   float64(0),
+		"workers_brminer.3_hash_rate":     float64(0),
+		"workers_brminer.3_hash_rate_24h": float64(0),
+		"workers_brminer.3_reset_time":    float64(1.455936733e+09),
+		"workers_brminer.3_rewards":       float64(0),
+		"workers_brminer.3_rewards_24h":   float64(0),
+	}
+
+	acc.AssertContainsFields(t, "httpjson", fields)
+}
+
 // Test response to HTTP 500
 func TestHttpJson500(t *testing.T) {
 	httpjson := genMockHttpJson(validJSON, 500)
diff --git a/plugins/inputs/procstat/README.md b/plugins/inputs/procstat/README.md
index 0c37af509..90552c2a6 100644
--- a/plugins/inputs/procstat/README.md
+++ b/plugins/inputs/procstat/README.md
@@ -7,7 +7,8 @@ individual process using their /proc data.
 The plugin will tag processes by their PID and their process name.
 
-Processes can be specified either by pid file or by executable name. Procstat
+Processes can be specified either by pid file, by executable name, by command
+line pattern matching, or by username (in this order of priority). Procstat
 plugin will use `pgrep` when executable name is provided to obtain the pid.
 Procstat plugin will transmit IO, memory, cpu, file descriptor related
 measurements for every process specified.
A prefix can be set to isolate diff --git a/plugins/inputs/procstat/procstat.go b/plugins/inputs/procstat/procstat.go index d3f18d5ea..e5ae207fe 100644 --- a/plugins/inputs/procstat/procstat.go +++ b/plugins/inputs/procstat/procstat.go @@ -19,6 +19,7 @@ type Procstat struct { Exe string Pattern string Prefix string + User string pidmap map[int32]*process.Process } @@ -37,6 +38,8 @@ var sampleConfig = ` # exe = "nginx" ## pattern as argument for pgrep (ie, pgrep -f ) # pattern = "nginx" + ## user as argument for pgrep (ie, pgrep -u ) + # user = "nginx" ## Field name prefix prefix = "" @@ -53,8 +56,8 @@ func (_ *Procstat) Description() string { func (p *Procstat) Gather(acc telegraf.Accumulator) error { err := p.createProcesses() if err != nil { - log.Printf("Error: procstat getting process, exe: [%s] pidfile: [%s] pattern: [%s] %s", - p.Exe, p.PidFile, p.Pattern, err.Error()) + log.Printf("Error: procstat getting process, exe: [%s] pidfile: [%s] pattern: [%s] user: [%s] %s", + p.Exe, p.PidFile, p.Pattern, p.User, err.Error()) } else { for _, proc := range p.pidmap { p := NewSpecProcessor(p.Prefix, acc, proc) @@ -103,6 +106,8 @@ func (p *Procstat) getAllPids() ([]int32, error) { pids, err = pidsFromExe(p.Exe) } else if p.Pattern != "" { pids, err = pidsFromPattern(p.Pattern) + } else if p.User != "" { + pids, err = pidsFromUser(p.User) } else { err = fmt.Errorf("Either exe, pid_file or pattern has to be specified") } @@ -175,6 +180,30 @@ func pidsFromPattern(pattern string) ([]int32, error) { return out, outerr } +func pidsFromUser(user string) ([]int32, error) { + var out []int32 + var outerr error + bin, err := exec.LookPath("pgrep") + if err != nil { + return out, fmt.Errorf("Couldn't find pgrep binary: %s", err) + } + pgrep, err := exec.Command(bin, "-u", user).Output() + if err != nil { + return out, fmt.Errorf("Failed to execute %s. Error: '%s'", bin, err) + } else { + pids := strings.Fields(string(pgrep)) + for _, pid := range pids { + ipid, err := strconv.Atoi(pid) + if err == nil { + out = append(out, int32(ipid)) + } else { + outerr = err + } + } + } + return out, outerr +} + func init() { inputs.Add("procstat", func() telegraf.Input { return NewProcstat() diff --git a/plugins/inputs/riak/README.md b/plugins/inputs/riak/README.md new file mode 100644 index 000000000..07f2eb09d --- /dev/null +++ b/plugins/inputs/riak/README.md @@ -0,0 +1,76 @@ +# Riak Plugin + +The Riak plugin gathers metrics from one or more riak instances. 
+ +### Configuration: + +```toml +# Read metrics from one or many Riak servers +[[inputs.riak]] + # Specify a list of one or more riak http servers + servers = ["http://localhost:8098"] +``` + +### Measurements & Fields: + +Riak provides one measurement named "riak", with the following fields: + +- cpu_avg1 +- cpu_avg15 +- cpu_avg5 +- memory_code +- memory_ets +- memory_processes +- memory_system +- memory_total +- node_get_fsm_objsize_100 +- node_get_fsm_objsize_95 +- node_get_fsm_objsize_99 +- node_get_fsm_objsize_mean +- node_get_fsm_objsize_median +- node_get_fsm_siblings_100 +- node_get_fsm_siblings_95 +- node_get_fsm_siblings_99 +- node_get_fsm_siblings_mean +- node_get_fsm_siblings_median +- node_get_fsm_time_100 +- node_get_fsm_time_95 +- node_get_fsm_time_99 +- node_get_fsm_time_mean +- node_get_fsm_time_median +- node_gets +- node_gets_total +- node_put_fsm_time_100 +- node_put_fsm_time_95 +- node_put_fsm_time_99 +- node_put_fsm_time_mean +- node_put_fsm_time_median +- node_puts +- node_puts_total +- pbc_active +- pbc_connects +- pbc_connects_total +- vnode_gets +- vnode_gets_total +- vnode_index_reads +- vnode_index_reads_total +- vnode_index_writes +- vnode_index_writes_total +- vnode_puts +- vnode_puts_total + +Time measurements (such as node_get_fsm_time_mean) are reported in nanoseconds. + +### Tags: + +All measurements have the following tags: + +- server (the host:port of the given server address, ex. `127.0.0.1:8098`) +- nodename (the internal node name received, ex. `riak@127.0.0.1`) + +### Example Output: + +``` +$ ./telegraf -config telegraf.conf -input-filter riak -test +> riak,nodename=riak@127.0.0.1,server=localhost:8098 cpu_avg1=31i,cpu_avg15=69i,cpu_avg5=51i,memory_code=11563738i,memory_ets=5925872i,memory_processes=30236069i,memory_system=93074971i,memory_total=123311040i,node_get_fsm_objsize_100=0i,node_get_fsm_objsize_95=0i,node_get_fsm_objsize_99=0i,node_get_fsm_objsize_mean=0i,node_get_fsm_objsize_median=0i,node_get_fsm_siblings_100=0i,node_get_fsm_siblings_95=0i,node_get_fsm_siblings_99=0i,node_get_fsm_siblings_mean=0i,node_get_fsm_siblings_median=0i,node_get_fsm_time_100=0i,node_get_fsm_time_95=0i,node_get_fsm_time_99=0i,node_get_fsm_time_mean=0i,node_get_fsm_time_median=0i,node_gets=0i,node_gets_total=19i,node_put_fsm_time_100=0i,node_put_fsm_time_95=0i,node_put_fsm_time_99=0i,node_put_fsm_time_mean=0i,node_put_fsm_time_median=0i,node_puts=0i,node_puts_total=0i,pbc_active=0i,pbc_connects=0i,pbc_connects_total=20i,vnode_gets=0i,vnode_gets_total=57i,vnode_index_reads=0i,vnode_index_reads_total=0i,vnode_index_writes=0i,vnode_index_writes_total=0i,vnode_puts=0i,vnode_puts_total=0i 1455913392622482332 +``` \ No newline at end of file diff --git a/plugins/inputs/riak/riak.go b/plugins/inputs/riak/riak.go new file mode 100644 index 000000000..6750c75a0 --- /dev/null +++ b/plugins/inputs/riak/riak.go @@ -0,0 +1,196 @@ +package riak + +import ( + "encoding/json" + "fmt" + "net/http" + "net/url" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" +) + +// Riak gathers statistics from one or more Riak instances +type Riak struct { + // Servers is a slice of servers as http addresses (ex.
http://127.0.0.1:8098) + Servers []string + + client *http.Client +} + +// NewRiak returns a new instance of Riak with a default http client +func NewRiak() *Riak { + return &Riak{client: http.DefaultClient} +} + +// riakStats represents the data that is received from Riak +type riakStats struct { + CpuAvg1 int64 `json:"cpu_avg1"` + CpuAvg15 int64 `json:"cpu_avg15"` + CpuAvg5 int64 `json:"cpu_avg5"` + MemoryCode int64 `json:"memory_code"` + MemoryEts int64 `json:"memory_ets"` + MemoryProcesses int64 `json:"memory_processes"` + MemorySystem int64 `json:"memory_system"` + MemoryTotal int64 `json:"memory_total"` + NodeGetFsmObjsize100 int64 `json:"node_get_fsm_objsize_100"` + NodeGetFsmObjsize95 int64 `json:"node_get_fsm_objsize_95"` + NodeGetFsmObjsize99 int64 `json:"node_get_fsm_objsize_99"` + NodeGetFsmObjsizeMean int64 `json:"node_get_fsm_objsize_mean"` + NodeGetFsmObjsizeMedian int64 `json:"node_get_fsm_objsize_median"` + NodeGetFsmSiblings100 int64 `json:"node_get_fsm_siblings_100"` + NodeGetFsmSiblings95 int64 `json:"node_get_fsm_siblings_95"` + NodeGetFsmSiblings99 int64 `json:"node_get_fsm_siblings_99"` + NodeGetFsmSiblingsMean int64 `json:"node_get_fsm_siblings_mean"` + NodeGetFsmSiblingsMedian int64 `json:"node_get_fsm_siblings_median"` + NodeGetFsmTime100 int64 `json:"node_get_fsm_time_100"` + NodeGetFsmTime95 int64 `json:"node_get_fsm_time_95"` + NodeGetFsmTime99 int64 `json:"node_get_fsm_time_99"` + NodeGetFsmTimeMean int64 `json:"node_get_fsm_time_mean"` + NodeGetFsmTimeMedian int64 `json:"node_get_fsm_time_median"` + NodeGets int64 `json:"node_gets"` + NodeGetsTotal int64 `json:"node_gets_total"` + Nodename string `json:"nodename"` + NodePutFsmTime100 int64 `json:"node_put_fsm_time_100"` + NodePutFsmTime95 int64 `json:"node_put_fsm_time_95"` + NodePutFsmTime99 int64 `json:"node_put_fsm_time_99"` + NodePutFsmTimeMean int64 `json:"node_put_fsm_time_mean"` + NodePutFsmTimeMedian int64 `json:"node_put_fsm_time_median"` + NodePuts int64 `json:"node_puts"` + NodePutsTotal int64 `json:"node_puts_total"` + PbcActive int64 `json:"pbc_active"` + PbcConnects int64 `json:"pbc_connects"` + PbcConnectsTotal int64 `json:"pbc_connects_total"` + VnodeGets int64 `json:"vnode_gets"` + VnodeGetsTotal int64 `json:"vnode_gets_total"` + VnodeIndexReads int64 `json:"vnode_index_reads"` + VnodeIndexReadsTotal int64 `json:"vnode_index_reads_total"` + VnodeIndexWrites int64 `json:"vnode_index_writes"` + VnodeIndexWritesTotal int64 `json:"vnode_index_writes_total"` + VnodePuts int64 `json:"vnode_puts"` + VnodePutsTotal int64 `json:"vnode_puts_total"` +} + +// A sample configuration to only gather stats from localhost, default port. +const sampleConfig = ` + # Specify a list of one or more riak http servers + servers = ["http://localhost:8098"] +` + +// Returns a sample configuration for the plugin +func (r *Riak) SampleConfig() string { + return sampleConfig +} + +// Returns a description of the plugin +func (r *Riak) Description() string { + return "Read metrics from one or many Riak servers" +} + +// Reads stats from all configured servers. +func (r *Riak) Gather(acc telegraf.Accumulator) error { + // Default to a single server at localhost (default port) if none specified + if len(r.Servers) == 0 { + r.Servers = []string{"http://127.0.0.1:8098"} + } + + // Range over all servers, gathering stats. Returns early in case of any error.
+ for _, s := range r.Servers { + if err := r.gatherServer(s, acc); err != nil { + return err + } + } + + return nil +} + +// Gathers stats from a single server, adding them to the accumulator +func (r *Riak) gatherServer(s string, acc telegraf.Accumulator) error { + // Parse the given URL to extract the server tag + u, err := url.Parse(s) + if err != nil { + return fmt.Errorf("riak unable to parse given server url %s: %s", s, err) + } + + // Perform the GET request to the riak /stats endpoint + resp, err := r.client.Get(s + "/stats") + if err != nil { + return err + } + defer resp.Body.Close() + + // Successful responses will always return status code 200 + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("riak responded with unexpected status code %d", resp.StatusCode) + } + + // Decode the response JSON into a new stats struct + stats := &riakStats{} + if err := json.NewDecoder(resp.Body).Decode(stats); err != nil { + return fmt.Errorf("unable to decode riak response: %s", err) + } + + // Build a map of tags + tags := map[string]string{ + "nodename": stats.Nodename, + "server": u.Host, + } + + // Build a map of field values + fields := map[string]interface{}{ + "cpu_avg1": stats.CpuAvg1, + "cpu_avg15": stats.CpuAvg15, + "cpu_avg5": stats.CpuAvg5, + "memory_code": stats.MemoryCode, + "memory_ets": stats.MemoryEts, + "memory_processes": stats.MemoryProcesses, + "memory_system": stats.MemorySystem, + "memory_total": stats.MemoryTotal, + "node_get_fsm_objsize_100": stats.NodeGetFsmObjsize100, + "node_get_fsm_objsize_95": stats.NodeGetFsmObjsize95, + "node_get_fsm_objsize_99": stats.NodeGetFsmObjsize99, + "node_get_fsm_objsize_mean": stats.NodeGetFsmObjsizeMean, + "node_get_fsm_objsize_median": stats.NodeGetFsmObjsizeMedian, + "node_get_fsm_siblings_100": stats.NodeGetFsmSiblings100, + "node_get_fsm_siblings_95": stats.NodeGetFsmSiblings95, + "node_get_fsm_siblings_99": stats.NodeGetFsmSiblings99, + "node_get_fsm_siblings_mean": stats.NodeGetFsmSiblingsMean, + "node_get_fsm_siblings_median": stats.NodeGetFsmSiblingsMedian, + "node_get_fsm_time_100": stats.NodeGetFsmTime100, + "node_get_fsm_time_95": stats.NodeGetFsmTime95, + "node_get_fsm_time_99": stats.NodeGetFsmTime99, + "node_get_fsm_time_mean": stats.NodeGetFsmTimeMean, + "node_get_fsm_time_median": stats.NodeGetFsmTimeMedian, + "node_gets": stats.NodeGets, + "node_gets_total": stats.NodeGetsTotal, + "node_put_fsm_time_100": stats.NodePutFsmTime100, + "node_put_fsm_time_95": stats.NodePutFsmTime95, + "node_put_fsm_time_99": stats.NodePutFsmTime99, + "node_put_fsm_time_mean": stats.NodePutFsmTimeMean, + "node_put_fsm_time_median": stats.NodePutFsmTimeMedian, + "node_puts": stats.NodePuts, + "node_puts_total": stats.NodePutsTotal, + "pbc_active": stats.PbcActive, + "pbc_connects": stats.PbcConnects, + "pbc_connects_total": stats.PbcConnectsTotal, + "vnode_gets": stats.VnodeGets, + "vnode_gets_total": stats.VnodeGetsTotal, + "vnode_index_reads": stats.VnodeIndexReads, + "vnode_index_reads_total": stats.VnodeIndexReadsTotal, + "vnode_index_writes": stats.VnodeIndexWrites, + "vnode_index_writes_total": stats.VnodeIndexWritesTotal, + "vnode_puts": stats.VnodePuts, + "vnode_puts_total": stats.VnodePutsTotal, + } + + // Accumulate the tags and values + acc.AddFields("riak", fields, tags) + + return nil +} + +func init() { + inputs.Add("riak", func() telegraf.Input { + return NewRiak() + }) +} diff --git a/plugins/inputs/riak/riak_test.go b/plugins/inputs/riak/riak_test.go new file mode 100644 index 000000000..49da4e7ea --- /dev/null
+++ b/plugins/inputs/riak/riak_test.go @@ -0,0 +1,275 @@ +package riak + +import ( + "fmt" + "net/http" + "net/http/httptest" + "net/url" + "testing" + + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +func TestRiak(t *testing.T) { + // Create a test server with the const response JSON + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + fmt.Fprintln(w, response) + })) + defer ts.Close() + + // Parse the URL of the test server, used to verify the expected host + u, err := url.Parse(ts.URL) + require.NoError(t, err) + + // Create a new Riak instance with our given test server + riak := NewRiak() + riak.Servers = []string{ts.URL} + + // Create a test accumulator + acc := &testutil.Accumulator{} + + // Gather data from the test server + err = riak.Gather(acc) + require.NoError(t, err) + + // Expect the correct values for all known keys + expectFields := map[string]interface{}{ + "cpu_avg1": int64(504), + "cpu_avg15": int64(294), + "cpu_avg5": int64(325), + "memory_code": int64(12329143), + "memory_ets": int64(17330176), + "memory_processes": int64(58454730), + "memory_system": int64(120401678), + "memory_total": int64(178856408), + "node_get_fsm_objsize_100": int64(73596), + "node_get_fsm_objsize_95": int64(36663), + "node_get_fsm_objsize_99": int64(51552), + "node_get_fsm_objsize_mean": int64(13241), + "node_get_fsm_objsize_median": int64(10365), + "node_get_fsm_siblings_100": int64(1), + "node_get_fsm_siblings_95": int64(1), + "node_get_fsm_siblings_99": int64(1), + "node_get_fsm_siblings_mean": int64(1), + "node_get_fsm_siblings_median": int64(1), + "node_get_fsm_time_100": int64(230445), + "node_get_fsm_time_95": int64(24259), + "node_get_fsm_time_99": int64(96653), + "node_get_fsm_time_mean": int64(6851), + "node_get_fsm_time_median": int64(2368), + "node_gets": int64(1116), + "node_gets_total": int64(1026058217), + "node_put_fsm_time_100": int64(267390), + "node_put_fsm_time_95": int64(38286), + "node_put_fsm_time_99": int64(84422), + "node_put_fsm_time_mean": int64(10832), + "node_put_fsm_time_median": int64(4085), + "node_puts": int64(1155), + "node_puts_total": int64(444895769), + "pbc_active": int64(360), + "pbc_connects": int64(120), + "pbc_connects_total": int64(66793268), + "vnode_gets": int64(14629), + "vnode_gets_total": int64(3748432761), + "vnode_index_reads": int64(20), + "vnode_index_reads_total": int64(3438296), + "vnode_index_writes": int64(4293), + "vnode_index_writes_total": int64(1515986619), + "vnode_puts": int64(4308), + "vnode_puts_total": int64(1519062272), + } + + // Expect the correct values for all tags + expectTags := map[string]string{ + "nodename": "riak@127.0.0.1", + "server": u.Host, + } + + acc.AssertContainsTaggedFields(t, "riak", expectFields, expectTags) +} + +var response = ` +{ + "riak_kv_stat_ts": 1455908558, + "vnode_gets": 14629, + "vnode_gets_total": 3748432761, + "vnode_puts": 4308, + "vnode_puts_total": 1519062272, + "vnode_index_refreshes": 0, + "vnode_index_refreshes_total": 0, + "vnode_index_reads": 20, + "vnode_index_reads_total": 3438296, + "vnode_index_writes": 4293, + "vnode_index_writes_total": 1515986619, + "vnode_index_writes_postings": 1, + "vnode_index_writes_postings_total": 265613, + "vnode_index_deletes": 0, + "vnode_index_deletes_total": 0, + "vnode_index_deletes_postings": 0, + "vnode_index_deletes_postings_total": 1, + "node_gets": 1116, + "node_gets_total": 1026058217, + "node_get_fsm_siblings_mean": 1, + 
"node_get_fsm_siblings_median": 1, + "node_get_fsm_siblings_95": 1, + "node_get_fsm_siblings_99": 1, + "node_get_fsm_siblings_100": 1, + "node_get_fsm_objsize_mean": 13241, + "node_get_fsm_objsize_median": 10365, + "node_get_fsm_objsize_95": 36663, + "node_get_fsm_objsize_99": 51552, + "node_get_fsm_objsize_100": 73596, + "node_get_fsm_time_mean": 6851, + "node_get_fsm_time_median": 2368, + "node_get_fsm_time_95": 24259, + "node_get_fsm_time_99": 96653, + "node_get_fsm_time_100": 230445, + "node_puts": 1155, + "node_puts_total": 444895769, + "node_put_fsm_time_mean": 10832, + "node_put_fsm_time_median": 4085, + "node_put_fsm_time_95": 38286, + "node_put_fsm_time_99": 84422, + "node_put_fsm_time_100": 267390, + "read_repairs": 2, + "read_repairs_total": 7918375, + "coord_redirs_total": 118238575, + "executing_mappers": 0, + "precommit_fail": 0, + "postcommit_fail": 0, + "index_fsm_create": 0, + "index_fsm_create_error": 0, + "index_fsm_active": 0, + "list_fsm_create": 0, + "list_fsm_create_error": 0, + "list_fsm_active": 0, + "pbc_active": 360, + "pbc_connects": 120, + "pbc_connects_total": 66793268, + "late_put_fsm_coordinator_ack": 152, + "node_get_fsm_active": 1, + "node_get_fsm_active_60s": 1029, + "node_get_fsm_in_rate": 21, + "node_get_fsm_out_rate": 21, + "node_get_fsm_rejected": 0, + "node_get_fsm_rejected_60s": 0, + "node_get_fsm_rejected_total": 0, + "node_put_fsm_active": 69, + "node_put_fsm_active_60s": 1053, + "node_put_fsm_in_rate": 30, + "node_put_fsm_out_rate": 31, + "node_put_fsm_rejected": 0, + "node_put_fsm_rejected_60s": 0, + "node_put_fsm_rejected_total": 0, + "read_repairs_primary_outofdate_one": 4, + "read_repairs_primary_outofdate_count": 14761552, + "read_repairs_primary_notfound_one": 0, + "read_repairs_primary_notfound_count": 65879, + "read_repairs_fallback_outofdate_one": 0, + "read_repairs_fallback_outofdate_count": 23761, + "read_repairs_fallback_notfound_one": 0, + "read_repairs_fallback_notfound_count": 455697, + "leveldb_read_block_error": 0, + "riak_pipe_stat_ts": 1455908558, + "pipeline_active": 0, + "pipeline_create_count": 0, + "pipeline_create_one": 0, + "pipeline_create_error_count": 0, + "pipeline_create_error_one": 0, + "cpu_nprocs": 362, + "cpu_avg1": 504, + "cpu_avg5": 325, + "cpu_avg15": 294, + "mem_total": 33695432704, + "mem_allocated": 33454874624, + "nodename": "riak@127.0.0.1", + "connected_nodes": [], + "sys_driver_version": "2.0", + "sys_global_heaps_size": 0, + "sys_heap_type": "private", + "sys_logical_processors": 8, + "sys_otp_release": "R15B01", + "sys_process_count": 2201, + "sys_smp_support": true, + "sys_system_version": "Erlang R15B01 (erts-5.9.1) [source] [64-bit] [smp:8:8] [async-threads:64] [kernel-poll:true]", + "sys_system_architecture": "x86_64-unknown-linux-gnu", + "sys_threads_enabled": true, + "sys_thread_pool_size": 64, + "sys_wordsize": 8, + "ring_members": [ + "riak@127.0.0.1" + ], + "ring_num_partitions": 256, + "ring_ownership": "[{'riak@127.0.0.1',256}]", + "ring_creation_size": 256, + "storage_backend": "riak_kv_eleveldb_backend", + "erlydtl_version": "0.7.0", + "riak_control_version": "1.4.12-0-g964c5db", + "cluster_info_version": "1.2.4", + "riak_search_version": "1.4.12-0-g7fe0e00", + "merge_index_version": "1.3.2-0-gcb38ee7", + "riak_kv_version": "1.4.12-0-gc6bbd66", + "sidejob_version": "0.2.0", + "riak_api_version": "1.4.12-0-gd9e1cc8", + "riak_pipe_version": "1.4.12-0-g986a226", + "riak_core_version": "1.4.10", + "bitcask_version": "1.6.8-0-gea14cb0", + "basho_stats_version": "1.0.3", + "webmachine_version": 
"1.10.4-0-gfcff795", + "mochiweb_version": "1.5.1p6", + "inets_version": "5.9", + "erlang_js_version": "1.2.2", + "runtime_tools_version": "1.8.8", + "os_mon_version": "2.2.9", + "riak_sysmon_version": "1.1.3", + "ssl_version": "5.0.1", + "public_key_version": "0.15", + "crypto_version": "2.1", + "sasl_version": "2.2.1", + "lager_version": "2.0.1", + "goldrush_version": "0.1.5", + "compiler_version": "4.8.1", + "syntax_tools_version": "1.6.8", + "stdlib_version": "1.18.1", + "kernel_version": "2.15.1", + "memory_total": 178856408, + "memory_processes": 58454730, + "memory_processes_used": 58371238, + "memory_system": 120401678, + "memory_atom": 586345, + "memory_atom_used": 563485, + "memory_binary": 48677920, + "memory_code": 12329143, + "memory_ets": 17330176, + "riak_core_stat_ts": 1455908559, + "ignored_gossip_total": 0, + "rings_reconciled_total": 5459, + "rings_reconciled": 0, + "gossip_received": 6, + "rejected_handoffs": 94, + "handoff_timeouts": 0, + "dropped_vnode_requests_total": 0, + "converge_delay_min": 0, + "converge_delay_max": 0, + "converge_delay_mean": 0, + "converge_delay_last": 0, + "rebalance_delay_min": 0, + "rebalance_delay_max": 0, + "rebalance_delay_mean": 0, + "rebalance_delay_last": 0, + "riak_kv_vnodes_running": 16, + "riak_kv_vnodeq_min": 0, + "riak_kv_vnodeq_median": 0, + "riak_kv_vnodeq_mean": 0, + "riak_kv_vnodeq_max": 0, + "riak_kv_vnodeq_total": 0, + "riak_pipe_vnodes_running": 16, + "riak_pipe_vnodeq_min": 0, + "riak_pipe_vnodeq_median": 0, + "riak_pipe_vnodeq_mean": 0, + "riak_pipe_vnodeq_max": 0, + "riak_pipe_vnodeq_total": 0 +} +` diff --git a/plugins/inputs/snmp/snmp_test.go b/plugins/inputs/snmp/snmp_test.go index 8b3f91380..22414fb79 100644 --- a/plugins/inputs/snmp/snmp_test.go +++ b/plugins/inputs/snmp/snmp_test.go @@ -69,6 +69,9 @@ func TestSNMPErrorBulk(t *testing.T) { } func TestSNMPGet1(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } get1 := Data{ Name: "oid1", Unit: "octets", @@ -104,6 +107,9 @@ func TestSNMPGet1(t *testing.T) { } func TestSNMPGet2(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } get1 := Data{ Name: "oid1", Oid: "ifNumber", @@ -139,6 +145,9 @@ func TestSNMPGet2(t *testing.T) { } func TestSNMPGet3(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } get1 := Data{ Name: "oid1", Unit: "octets", @@ -177,6 +186,9 @@ func TestSNMPGet3(t *testing.T) { } func TestSNMPEasyGet4(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } get1 := Data{ Name: "oid1", Unit: "octets", @@ -227,6 +239,9 @@ func TestSNMPEasyGet4(t *testing.T) { } func TestSNMPEasyGet5(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } get1 := Data{ Name: "oid1", Unit: "octets", @@ -277,6 +292,9 @@ func TestSNMPEasyGet5(t *testing.T) { } func TestSNMPEasyGet6(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } h := Host{ Address: testutil.GetLocalHost() + ":31161", Community: "telegraf", @@ -307,6 +325,9 @@ func TestSNMPEasyGet6(t *testing.T) { } func TestSNMPBulk1(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } bulk1 := Data{ Name: "oid1", Unit: "octets", diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go index 830e9d25c..a16e78b5c 100644 --- a/plugins/inputs/statsd/statsd.go +++ b/plugins/inputs/statsd/statsd.go @@ -17,7 +17,11 @@ 
import ( "github.com/influxdata/telegraf/plugins/inputs" ) -const UDP_PACKET_SIZE int = 1500 +const ( + UDP_PACKET_SIZE int = 1500 + + defaultFieldName = "value" +) var dropwarn = "ERROR: Message queue full. Discarding line [%s] " + "You may want to increase allowed_pending_messages in the config\n" @@ -113,9 +117,9 @@ type cachedcounter struct { } type cachedtimings struct { - name string - stats RunningStats - tags map[string]string + name string + fields map[string]RunningStats + tags map[string]string } func (_ *Statsd) Description() string { @@ -169,16 +173,26 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error { now := time.Now() for _, metric := range s.timings { + // Defining a template to parse field names for timers allows us to split + // out multiple fields per timer. In this case we prefix each stat with the + // field name and store these all in a single measurement. fields := make(map[string]interface{}) - fields["mean"] = metric.stats.Mean() - fields["stddev"] = metric.stats.Stddev() - fields["upper"] = metric.stats.Upper() - fields["lower"] = metric.stats.Lower() - fields["count"] = metric.stats.Count() - for _, percentile := range s.Percentiles { - name := fmt.Sprintf("%v_percentile", percentile) - fields[name] = metric.stats.Percentile(percentile) + for fieldName, stats := range metric.fields { + var prefix string + if fieldName != defaultFieldName { + prefix = fieldName + "_" + } + fields[prefix+"mean"] = stats.Mean() + fields[prefix+"stddev"] = stats.Stddev() + fields[prefix+"upper"] = stats.Upper() + fields[prefix+"lower"] = stats.Lower() + fields[prefix+"count"] = stats.Count() + for _, percentile := range s.Percentiles { + name := fmt.Sprintf("%s%v_percentile", prefix, percentile) + fields[name] = stats.Percentile(percentile) + } } + acc.AddFields(metric.name, fields, metric.tags, now) } if s.DeleteTimings { @@ -370,11 +384,6 @@ func (s *Statsd) parseStatsdLine(line string) error { // Parse the name & tags from bucket m.name, m.field, m.tags = s.parseName(m.bucket) - // fields are not supported for timings, so if specified combine into - // the name - if (m.mtype == "ms" || m.mtype == "h") && m.field != "value" { - m.name += "_" + m.field - } switch m.mtype { case "c": m.tags["metric_type"] = "counter" @@ -433,7 +442,7 @@ func (s *Statsd) parseName(bucket string) (string, string, map[string]string) { name = strings.Replace(name, "-", "__", -1) } if field == "" { - field = "value" + field = defaultFieldName } return name, field, tags @@ -461,26 +470,32 @@ func parseKeyValue(keyvalue string) (string, string) { func (s *Statsd) aggregate(m metric) { switch m.mtype { case "ms", "h": + // Check if the measurement exists cached, ok := s.timings[m.hash] if !ok { cached = cachedtimings{ - name: m.name, - tags: m.tags, - stats: RunningStats{ - PercLimit: s.PercentileLimit, - }, + name: m.name, + fields: make(map[string]RunningStats), + tags: m.tags, + } + } + // Check if the field exists. If we've not enabled multiple fields per timer + // this will be the default field name, eg. 
"value" + field, ok := cached.fields[m.field] + if !ok { + field = RunningStats{ + PercLimit: s.PercentileLimit, } } - if m.samplerate > 0 { for i := 0; i < int(1.0/m.samplerate); i++ { - cached.stats.AddValue(m.floatvalue) + field.AddValue(m.floatvalue) } - s.timings[m.hash] = cached } else { - cached.stats.AddValue(m.floatvalue) - s.timings[m.hash] = cached + field.AddValue(m.floatvalue) } + cached.fields[m.field] = field + s.timings[m.hash] = cached case "c": // check if the measurement exists _, ok := s.counters[m.hash] diff --git a/plugins/inputs/statsd/statsd_test.go b/plugins/inputs/statsd/statsd_test.go index a285467b0..3a87f00aa 100644 --- a/plugins/inputs/statsd/statsd_test.go +++ b/plugins/inputs/statsd/statsd_test.go @@ -561,12 +561,12 @@ func TestParse_MeasurementsWithMultipleValues(t *testing.T) { // A 0 with invalid samplerate will add a single 0, // plus the last bit of value 1 // which adds up to 12 individual datapoints to be cached - if cachedtiming.stats.n != 12 { - t.Errorf("Expected 11 additions, got %d", cachedtiming.stats.n) + if cachedtiming.fields[defaultFieldName].n != 12 { + t.Errorf("Expected 11 additions, got %d", cachedtiming.fields[defaultFieldName].n) } - if cachedtiming.stats.upper != 1 { - t.Errorf("Expected max input to be 1, got %f", cachedtiming.stats.upper) + if cachedtiming.fields[defaultFieldName].upper != 1 { + t.Errorf("Expected max input to be 1, got %f", cachedtiming.fields[defaultFieldName].upper) } } @@ -842,7 +842,105 @@ func TestParse_Timings(t *testing.T) { } acc.AssertContainsFields(t, "test_timing", valid) +} +// Tests low-level functionality of timings when multiple fields is enabled +// and a measurement template has been defined which can parse field names +func TestParse_Timings_MultipleFieldsWithTemplate(t *testing.T) { + s := NewStatsd() + s.Templates = []string{"measurement.field"} + s.Percentiles = []int{90} + acc := &testutil.Accumulator{} + + validLines := []string{ + "test_timing.success:1|ms", + "test_timing.success:11|ms", + "test_timing.success:1|ms", + "test_timing.success:1|ms", + "test_timing.success:1|ms", + "test_timing.error:2|ms", + "test_timing.error:22|ms", + "test_timing.error:2|ms", + "test_timing.error:2|ms", + "test_timing.error:2|ms", + } + + for _, line := range validLines { + err := s.parseStatsdLine(line) + if err != nil { + t.Errorf("Parsing line %s should not have resulted in an error\n", line) + } + } + s.Gather(acc) + + valid := map[string]interface{}{ + "success_90_percentile": float64(11), + "success_count": int64(5), + "success_lower": float64(1), + "success_mean": float64(3), + "success_stddev": float64(4), + "success_upper": float64(11), + + "error_90_percentile": float64(22), + "error_count": int64(5), + "error_lower": float64(2), + "error_mean": float64(6), + "error_stddev": float64(8), + "error_upper": float64(22), + } + + acc.AssertContainsFields(t, "test_timing", valid) +} + +// Tests low-level functionality of timings when multiple fields is enabled +// but a measurement template hasn't been defined so we can't parse field names +// In this case the behaviour should be the same as normal behaviour +func TestParse_Timings_MultipleFieldsWithoutTemplate(t *testing.T) { + s := NewStatsd() + s.Templates = []string{} + s.Percentiles = []int{90} + acc := &testutil.Accumulator{} + + validLines := []string{ + "test_timing.success:1|ms", + "test_timing.success:11|ms", + "test_timing.success:1|ms", + "test_timing.success:1|ms", + "test_timing.success:1|ms", + "test_timing.error:2|ms", + 
"test_timing.error:22|ms", + "test_timing.error:2|ms", + "test_timing.error:2|ms", + "test_timing.error:2|ms", + } + + for _, line := range validLines { + err := s.parseStatsdLine(line) + if err != nil { + t.Errorf("Parsing line %s should not have resulted in an error\n", line) + } + } + s.Gather(acc) + + expectedSuccess := map[string]interface{}{ + "90_percentile": float64(11), + "count": int64(5), + "lower": float64(1), + "mean": float64(3), + "stddev": float64(4), + "upper": float64(11), + } + expectedError := map[string]interface{}{ + "90_percentile": float64(22), + "count": int64(5), + "lower": float64(2), + "mean": float64(6), + "stddev": float64(8), + "upper": float64(22), + } + + acc.AssertContainsFields(t, "test_timing_success", expectedSuccess) + acc.AssertContainsFields(t, "test_timing_error", expectedError) } func TestParse_Timings_Delete(t *testing.T) { diff --git a/plugins/inputs/system/cpu.go b/plugins/inputs/system/cpu.go index bef2a28f4..035b8e1f5 100644 --- a/plugins/inputs/system/cpu.go +++ b/plugins/inputs/system/cpu.go @@ -33,7 +33,7 @@ var sampleConfig = ` ## Whether to report total system cpu stats or not totalcpu = true ## Comment this line if you want the raw CPU time metrics - drop = ["time_*"] + fielddrop = ["time_*"] ` func (_ *CPUStats) SampleConfig() string { @@ -113,6 +113,10 @@ func totalCpuTime(t cpu.CPUTimesStat) float64 { func init() { inputs.Add("cpu", func() telegraf.Input { - return &CPUStats{ps: &systemPS{}} + return &CPUStats{ + PerCPU: true, + TotalCPU: true, + ps: &systemPS{}, + } }) } diff --git a/plugins/inputs/system/disk.go b/plugins/inputs/system/disk.go index 0488c839a..5784a7322 100644 --- a/plugins/inputs/system/disk.go +++ b/plugins/inputs/system/disk.go @@ -14,6 +14,7 @@ type DiskStats struct { Mountpoints []string MountPoints []string + IgnoreFS []string `toml:"ignore_fs"` } func (_ *DiskStats) Description() string { @@ -24,6 +25,10 @@ var diskSampleConfig = ` ## By default, telegraf gather stats for all mountpoints. ## Setting mountpoints will restrict the stats to the specified mountpoints. # mount_points = ["/"] + + ## Ignore some mountpoints by filesystem type. For example (dev)tmpfs (usually + ## present on /run, /var/run, /dev/shm or /dev). + ignore_fs = ["tmpfs", "devtmpfs"] ` func (_ *DiskStats) SampleConfig() string { @@ -36,12 +41,16 @@ func (s *DiskStats) Gather(acc telegraf.Accumulator) error { s.MountPoints = s.Mountpoints } - disks, err := s.ps.DiskUsage(s.MountPoints) + disks, err := s.ps.DiskUsage(s.MountPoints, s.IgnoreFS) if err != nil { return fmt.Errorf("error getting disk usage info: %s", err) } for _, du := range disks { + if du.Total == 0 { + // Skip dummy filesystem (procfs, cgroupfs, ...) + continue + } tags := map[string]string{ "path": du.Path, "fstype": du.Fstype, @@ -79,11 +88,11 @@ func (_ *DiskIOStats) Description() string { } var diskIoSampleConfig = ` - # By default, telegraf will gather stats for all devices including - # disk partitions. - # Setting devices will restrict the stats to the specified devices. + ## By default, telegraf will gather stats for all devices including + ## disk partitions. + ## Setting devices will restrict the stats to the specified devices. # devices = ["sda", "sdb"] - # Uncomment the following line if you do not need disk serial numbers. + ## Uncomment the following line if you do not need disk serial numbers. 
# skip_serial_number = true ` diff --git a/plugins/inputs/system/disk_test.go b/plugins/inputs/system/disk_test.go index 86537be23..0a722148b 100644 --- a/plugins/inputs/system/disk_test.go +++ b/plugins/inputs/system/disk_test.go @@ -50,9 +50,9 @@ func TestDiskStats(t *testing.T) { }, } - mps.On("DiskUsage", []string(nil)).Return(duAll, nil) - mps.On("DiskUsage", []string{"/", "/dev"}).Return(duFiltered, nil) - mps.On("DiskUsage", []string{"/", "/home"}).Return(duAll, nil) + mps.On("DiskUsage", []string(nil), []string(nil)).Return(duAll, nil) + mps.On("DiskUsage", []string{"/", "/dev"}, []string(nil)).Return(duFiltered, nil) + mps.On("DiskUsage", []string{"/", "/home"}, []string(nil)).Return(duAll, nil) err = (&DiskStats{ps: &mps}).Gather(&acc) require.NoError(t, err) diff --git a/plugins/inputs/system/mock_PS.go b/plugins/inputs/system/mock_PS.go index 6e9a5f93e..fd6afda0f 100644 --- a/plugins/inputs/system/mock_PS.go +++ b/plugins/inputs/system/mock_PS.go @@ -33,8 +33,8 @@ func (m *MockPS) CPUTimes(perCPU, totalCPU bool) ([]cpu.CPUTimesStat, error) { return r0, r1 } -func (m *MockPS) DiskUsage(mountPointFilter []string) ([]*disk.DiskUsageStat, error) { - ret := m.Called(mountPointFilter) +func (m *MockPS) DiskUsage(mountPointFilter []string, fstypeExclude []string) ([]*disk.DiskUsageStat, error) { + ret := m.Called(mountPointFilter, fstypeExclude) r0 := ret.Get(0).([]*disk.DiskUsageStat) r1 := ret.Error(1) diff --git a/plugins/inputs/system/ps.go b/plugins/inputs/system/ps.go index 0a505bfc4..f1a1b27d7 100644 --- a/plugins/inputs/system/ps.go +++ b/plugins/inputs/system/ps.go @@ -14,7 +14,7 @@ import ( type PS interface { CPUTimes(perCPU, totalCPU bool) ([]cpu.CPUTimesStat, error) - DiskUsage(mountPointFilter []string) ([]*disk.DiskUsageStat, error) + DiskUsage(mountPointFilter []string, fstypeExclude []string) ([]*disk.DiskUsageStat, error) NetIO() ([]net.NetIOCountersStat, error) NetProto() ([]net.NetProtoCountersStat, error) DiskIO() (map[string]disk.DiskIOCountersStat, error) @@ -53,6 +53,7 @@ func (s *systemPS) CPUTimes(perCPU, totalCPU bool) ([]cpu.CPUTimesStat, error) { func (s *systemPS) DiskUsage( mountPointFilter []string, + fstypeExclude []string, ) ([]*disk.DiskUsageStat, error) { parts, err := disk.DiskPartitions(true) if err != nil { @@ -60,9 +61,13 @@ func (s *systemPS) DiskUsage( } // Make a "set" out of the filter slice - filterSet := make(map[string]bool) + mountPointFilterSet := make(map[string]bool) for _, filter := range mountPointFilter { - filterSet[filter] = true + mountPointFilterSet[filter] = true + } + fstypeExcludeSet := make(map[string]bool) + for _, filter := range fstypeExclude { + fstypeExcludeSet[filter] = true } var usage []*disk.DiskUsageStat @@ -71,7 +76,7 @@ func (s *systemPS) DiskUsage( if len(mountPointFilter) > 0 { // If the mount point is not a member of the filter set, // don't gather info on it. - _, ok := filterSet[p.Mountpoint] + _, ok := mountPointFilterSet[p.Mountpoint] if !ok { continue } @@ -81,6 +86,12 @@ func (s *systemPS) DiskUsage( if err != nil { return nil, err } + // If the filesystem type is a member of the exclude set, + // don't gather info on it.
+ _, ok := fstypeExcludeSet[p.Fstype] + if ok { + continue + } du.Fstype = p.Fstype usage = append(usage, du) } diff --git a/plugins/outputs/file/file.go b/plugins/outputs/file/file.go index e593e3cea..743c0f03f 100644 --- a/plugins/outputs/file/file.go +++ b/plugins/outputs/file/file.go @@ -36,6 +36,11 @@ func (f *File) SetSerializer(serializer serializers.Serializer) { func (f *File) Connect() error { writers := []io.Writer{} + + if len(f.Files) == 0 { + f.Files = []string{"stdout"} + } + for _, file := range f.Files { if file == "stdout" { writers = append(writers, os.Stdout) diff --git a/plugins/outputs/influxdb/influxdb.go b/plugins/outputs/influxdb/influxdb.go index 683227717..60d235511 100644 --- a/plugins/outputs/influxdb/influxdb.go +++ b/plugins/outputs/influxdb/influxdb.go @@ -52,9 +52,9 @@ var sampleConfig = ` ## note: using "s" precision greatly improves InfluxDB compression precision = "s" - ## Connection timeout (for the connection with InfluxDB), formatted as a string. - ## If not provided, will default to 0 (no timeout) - # timeout = "5s" + ## Write timeout (for the InfluxDB client), formatted as a string. + ## If not provided, will default to 5s. 0s means no timeout (not recommended). + timeout = "5s" # username = "telegraf" # password = "metricsmetricsmetricsmetrics" ## Set the user agent for HTTP POSTs (can be useful for log differentiation) @@ -185,6 +185,8 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error { func init() { outputs.Add("influxdb", func() telegraf.Output { - return &InfluxDB{} + return &InfluxDB{ + Timeout: internal.Duration{Duration: time.Second * 5}, + } }) } diff --git a/scripts/build.py b/scripts/build.py index b25b44982..1465e36f3 100755 --- a/scripts/build.py +++ b/scripts/build.py @@ -30,6 +30,7 @@ INIT_SCRIPT = "scripts/init.sh" SYSTEMD_SCRIPT = "scripts/telegraf.service" LOGROTATE_SCRIPT = "etc/logrotate.d/telegraf" DEFAULT_CONFIG = "etc/telegraf.conf" +DEFAULT_WINDOWS_CONFIG = "etc/telegraf_windows.conf" POSTINST_SCRIPT = "scripts/post-install.sh" PREINST_SCRIPT = "scripts/pre-install.sh" @@ -70,13 +71,13 @@ targets = { supported_builds = { 'darwin': [ "amd64", "i386" ], - 'windows': [ "amd64", "i386", "arm" ], + 'windows': [ "amd64", "i386" ], 'linux': [ "amd64", "i386", "arm" ] } supported_packages = { "darwin": [ "tar", "zip" ], "linux": [ "deb", "rpm", "tar", "zip" ], - "windows": [ "tar", "zip" ], + "windows": [ "zip" ], } supported_tags = { # "linux": { @@ -287,6 +288,8 @@ def build(version=None, print("Starting build...") for b, c in targets.items(): + if platform == 'windows': + b = b + '.exe' print("\t- Building '{}'...".format(os.path.join(outdir, b))) build_command = "" build_command += "GOOS={} GOARCH={} ".format(platform, arch) @@ -349,20 +352,25 @@ def create_package_fs(build_root): create_dir(os.path.join(build_root, d)) os.chmod(os.path.join(build_root, d), 0o755) -def package_scripts(build_root): +def package_scripts(build_root, windows=False): print("\t- Copying scripts and sample configuration to build directory") - shutil.copyfile(INIT_SCRIPT, os.path.join(build_root, SCRIPT_DIR[1:], INIT_SCRIPT.split('/')[1])) - os.chmod(os.path.join(build_root, SCRIPT_DIR[1:], INIT_SCRIPT.split('/')[1]), 0o644) - shutil.copyfile(SYSTEMD_SCRIPT, os.path.join(build_root, SCRIPT_DIR[1:], SYSTEMD_SCRIPT.split('/')[1])) - os.chmod(os.path.join(build_root, SCRIPT_DIR[1:], SYSTEMD_SCRIPT.split('/')[1]), 0o644) - shutil.copyfile(LOGROTATE_SCRIPT, os.path.join(build_root, LOGROTATE_DIR[1:], "telegraf")) - os.chmod(os.path.join(build_root, 
LOGROTATE_DIR[1:], "telegraf"), 0o644) - shutil.copyfile(DEFAULT_CONFIG, os.path.join(build_root, CONFIG_DIR[1:], "telegraf.conf")) - os.chmod(os.path.join(build_root, CONFIG_DIR[1:], "telegraf.conf"), 0o644) + if windows: + shutil.copyfile(DEFAULT_WINDOWS_CONFIG, os.path.join(build_root, "telegraf.conf")) + os.chmod(os.path.join(build_root, "telegraf.conf"), 0o644) + else: + shutil.copyfile(INIT_SCRIPT, os.path.join(build_root, SCRIPT_DIR[1:], INIT_SCRIPT.split('/')[1])) + os.chmod(os.path.join(build_root, SCRIPT_DIR[1:], INIT_SCRIPT.split('/')[1]), 0o644) + shutil.copyfile(SYSTEMD_SCRIPT, os.path.join(build_root, SCRIPT_DIR[1:], SYSTEMD_SCRIPT.split('/')[1])) + os.chmod(os.path.join(build_root, SCRIPT_DIR[1:], SYSTEMD_SCRIPT.split('/')[1]), 0o644) + shutil.copyfile(LOGROTATE_SCRIPT, os.path.join(build_root, LOGROTATE_DIR[1:], "telegraf")) + os.chmod(os.path.join(build_root, LOGROTATE_DIR[1:], "telegraf"), 0o644) + shutil.copyfile(DEFAULT_CONFIG, os.path.join(build_root, CONFIG_DIR[1:], "telegraf.conf")) + os.chmod(os.path.join(build_root, CONFIG_DIR[1:], "telegraf.conf"), 0o644) def go_get(): print("Retrieving Go dependencies...") run("go get github.com/sparrc/gdm") + run("gdm restore -f Godeps_windows") run("gdm restore") def generate_md5_from_file(path): @@ -393,15 +401,18 @@ def build_packages(build_output, version, pkg_arch, nightly=False, rc=None, iter build_root = os.path.join(tmp_build_dir, p, a) # Create directory tree to mimic file system of package create_dir(build_root) - create_package_fs(build_root) - # Copy in packaging and miscellaneous scripts - package_scripts(build_root) + if p == 'windows': + package_scripts(build_root, windows=True) + else: + create_package_fs(build_root) + # Copy in packaging and miscellaneous scripts + package_scripts(build_root) # Copy newly-built binaries to packaging directory for b in targets: if p == 'windows': b = b + '.exe' fr = os.path.join(current_location, b) - to = os.path.join(build_root, INSTALL_ROOT_DIR[1:], b) + to = os.path.join(build_root, b) print("\t- [{}][{}] - Moving from '{}' to '{}'".format(p, a, fr, to)) copy_file(fr, to) # Package the directory structure @@ -429,34 +440,44 @@ def build_packages(build_output, version, pkg_arch, nightly=False, rc=None, iter a = pkg_arch if a == '386': a = 'i386' - fpm_command = "fpm {} --name {} -a {} -t {} --version {} --iteration {} -C {} -p {} ".format( - fpm_common_args, - name, - a, - package_type, - package_version, - package_iteration, - build_root, - current_location) - if pkg_arch is not None: - a = saved_a - if package_type == "rpm": - fpm_command += "--depends coreutils " - fpm_command += "--depends lsof" - out = run(fpm_command, shell=True) - matches = re.search(':path=>"(.*)"', out) - outfile = None - if matches is not None: - outfile = matches.groups()[0] - if outfile is None: - print("[ COULD NOT DETERMINE OUTPUT ]") - else: - # Strip nightly version (the unix epoch) from filename - if nightly and package_type in ['deb', 'rpm']: - outfile = rename_file(outfile, outfile.replace("{}-{}".format(version, iteration), "nightly")) - outfiles.append(os.path.join(os.getcwd(), outfile)) - # Display MD5 hash for generated package + if package_type == 'zip': + zip_command = "cd {} && zip {}.zip ./*".format( + build_root, + name) + run(zip_command, shell=True) + run("mv {}.zip {}".format(os.path.join(build_root, name), current_location), shell=True) + outfile = os.path.join(current_location, name+".zip") + outfiles.append(outfile) print("\t\tMD5 = {}".format(generate_md5_from_file(outfile))) 
+ else: + fpm_command = "fpm {} --name {} -a {} -t {} --version {} --iteration {} -C {} -p {} ".format( + fpm_common_args, + name, + a, + package_type, + package_version, + package_iteration, + build_root, + current_location) + if pkg_arch is not None: + a = saved_a + if package_type == "rpm": + fpm_command += "--depends coreutils " + fpm_command += "--depends lsof" + out = run(fpm_command, shell=True) + matches = re.search(':path=>"(.*)"', out) + outfile = None + if matches is not None: + outfile = matches.groups()[0] + if outfile is None: + print("[ COULD NOT DETERMINE OUTPUT ]") + else: + # Strip nightly version (the unix epoch) from filename + if nightly and package_type in ['deb', 'rpm']: + outfile = rename_file(outfile, outfile.replace("{}-{}".format(version, iteration), "nightly")) + outfiles.append(os.path.join(os.getcwd(), outfile)) + # Display MD5 hash for generated package + print("\t\tMD5 = {}".format(generate_md5_from_file(outfile))) print("") if debug: print("[DEBUG] package outfiles: {}".format(outfiles)) diff --git a/scripts/circle-test.sh b/scripts/circle-test.sh index 72f297f9f..91511b050 100755 --- a/scripts/circle-test.sh +++ b/scripts/circle-test.sh @@ -70,4 +70,12 @@ exit_if_fail telegraf -config $tmpdir/config.toml \ mv $GOPATH/bin/telegraf $CIRCLE_ARTIFACTS -exit $rc +eval "git describe --exact-match HEAD" +if [ $? -eq 0 ]; then + unset GOGC + tag=$(git describe --exact-match HEAD) + echo $tag + exit_if_fail ./scripts/build.py --package --version=$tag --platform=linux --arch=all --upload + exit_if_fail ./scripts/build.py --package --version=$tag --platform=windows --arch=all --upload + mv build $CIRCLE_ARTIFACTS +fi diff --git a/testutil/accumulator.go b/testutil/accumulator.go index cb56d8d28..9b6fb2373 100644 --- a/testutil/accumulator.go +++ b/testutil/accumulator.go @@ -133,13 +133,7 @@ func (a *Accumulator) AssertContainsTaggedFields( } if p.Measurement == measurement { - if !reflect.DeepEqual(fields, p.Fields) { - pActual, _ := json.MarshalIndent(p.Fields, "", " ") - pExp, _ := json.MarshalIndent(fields, "", " ") - msg := fmt.Sprintf("Actual:\n%s\n(%T) \nExpected:\n%s\n(%T)", - string(pActual), p.Fields, string(pExp), fields) - assert.Fail(t, msg) - } + assert.Equal(t, fields, p.Fields) return } } @@ -156,13 +150,7 @@ func (a *Accumulator) AssertContainsFields( defer a.Unlock() for _, p := range a.Metrics { if p.Measurement == measurement { - if !reflect.DeepEqual(fields, p.Fields) { - pActual, _ := json.MarshalIndent(p.Fields, "", " ") - pExp, _ := json.MarshalIndent(fields, "", " ") - msg := fmt.Sprintf("Actual:\n%s\n(%T) \nExpected:\n%s\n(%T)", - string(pActual), p.Fields, string(pExp), fields) - assert.Fail(t, msg) - } + assert.Equal(t, fields, p.Fields) return } }
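For context, a sketch of how a plugin test exercises these helpers after the change; the `Example` plugin type and the field values here are hypothetical, the point being that `assert.Equal` now produces the expected/actual diff that the hand-rolled `json.MarshalIndent` formatting used to build:

```go
package example

import (
	"testing"

	"github.com/influxdata/telegraf/testutil"
	"github.com/stretchr/testify/require"
)

func TestExampleGather(t *testing.T) {
	var acc testutil.Accumulator
	plugin := &Example{} // hypothetical input plugin under test
	require.NoError(t, plugin.Gather(&acc))

	// On mismatch, assert.Equal inside AssertContainsFields reports
	// a readable expected/actual diff instead of a hand-built JSON dump
	acc.AssertContainsFields(t, "example", map[string]interface{}{
		"value": int64(42),
	})
}
```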