From 5f3a1fcb8619a46dfbc9ce882693dbcb05a43d0c Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Fri, 11 Dec 2015 13:07:32 -0700 Subject: [PATCH] Updating system plugins for 0.3.0 --- CHANGELOG.md | 13 + CONFIGURATION.md | 177 ++++++++ README.md | 133 +----- accumulator.go | 74 +++- agent.go | 6 +- etc/telegraf.conf | 4 +- internal/config/config.go | 59 ++- internal/config/testdata/telegraf-agent.toml | 2 +- plugins/system/cpu.go | 53 +-- plugins/system/cpu_test.go | 106 +++++ plugins/system/disk.go | 34 +- plugins/system/disk_test.go | 161 +++++++ plugins/system/docker.go | 77 ++-- plugins/system/memory.go | 40 +- plugins/system/memory_test.go | 73 ++++ plugins/system/net.go | 19 +- plugins/system/net_test.go | 88 ++++ plugins/system/netstat.go | 30 +- plugins/system/ps.go | 52 +++ plugins/system/system.go | 13 +- plugins/system/system_test.go | 426 ------------------- 21 files changed, 928 insertions(+), 712 deletions(-) create mode 100644 CONFIGURATION.md create mode 100644 plugins/system/cpu_test.go create mode 100644 plugins/system/disk_test.go create mode 100644 plugins/system/memory_test.go create mode 100644 plugins/system/net_test.go delete mode 100644 plugins/system/system_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 7f562c3ea..d3ba9574c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,16 @@ +## v0.3.0 [unreleased] + +### Release Notes +- **breaking change** the `io` plugin has been renamed `diskio` +- **breaking change** Plugin measurements aggregated into a single measurement. + +### Features +- Plugin measurements aggregated into a single measurement. +- Added ability to specify per-plugin tags +- Added ability to specify per-plugin measurement suffix and prefix + +### Bugfixes + ## v0.2.5 [unreleased] ### Features diff --git a/CONFIGURATION.md b/CONFIGURATION.md new file mode 100644 index 000000000..c1040df2d --- /dev/null +++ b/CONFIGURATION.md @@ -0,0 +1,177 @@ +# Telegraf Configuration + +## Plugin Configuration + +There are some configuration options that are configurable per plugin: + +* **name_override**: Override the base name of the measurement. +(Default is the name of the plugin). +* **name_prefix**: Specifies a prefix to attach to the measurement name. +* **name_suffix**: Specifies a suffix to attach to the measurement name. +* **tags**: A map of tags to apply to a specific plugin's measurements. + +### Plugin Filters + +There are also filters that can be configured per plugin: + +* **pass**: An array of strings that is used to filter metrics generated by the +current plugin. Each string in the array is tested as a glob match against field names +and if it matches, the field is emitted. +* **drop**: The inverse of pass, if a field name matches, it is not emitted. +* **tagpass**: tag names and arrays of strings that are used to filter +measurements by the current plugin. Each string in the array is tested as a glob +match against the tag name, and if it matches the measurement is emitted. +* **tagdrop**: The inverse of tagpass. If a tag matches, the measurement is not emitted. +This is tested on measurements that have passed the tagpass test. +* **interval**: How often to gather this metric. Normal plugins use a single +global interval, but if one particular plugin should be run less or more often, +you can configure that here. + +### Plugin Configuration Examples + +This is a full working config that will output CPU data to an InfluxDB instance +at 192.168.59.103:8086, tagging measurements with dc="denver-1". It will output +measurements at a 10s interval and will collect per-cpu data, dropping any +fields which begin with `time_`. + +```toml +[tags] + dc = "denver-1" + +[agent] + interval = "10s" + +# OUTPUTS +[outputs] +[[outputs.influxdb]] + url = "http://192.168.59.103:8086" # required. + database = "telegraf" # required. + precision = "s" + +# PLUGINS +[plugins] +[[plugins.cpu]] + percpu = true + totalcpu = false + # filter all fields beginning with 'time_' + drop = ["time_*"] +``` + +### Plugin Config: tagpass and tagdrop + +```toml +[plugins] +[[plugins.cpu]] + percpu = true + totalcpu = false + drop = ["cpu_time"] + # Don't collect CPU data for cpu6 & cpu7 + [plugins.cpu.tagdrop] + cpu = [ "cpu6", "cpu7" ] + +[[plugins.disk]] + [plugins.disk.tagpass] + # tagpass conditions are OR, not AND. + # If the (filesystem is ext4 or xfs) OR (the path is /opt or /home) + # then the metric passes + fstype = [ "ext4", "xfs" ] + # Globs can also be used on the tag values + path = [ "/opt", "/home*" ] +``` + +### Plugin Config: pass and drop + +```toml +# Drop all metrics for guest & steal CPU usage +[[plugins.cpu]] + percpu = false + totalcpu = true + drop = ["usage_guest", "usage_steal"] + +# Only store inode related metrics for disks +[[plugins.disk]] + pass = ["inodes*"] +``` + +### Plugin config: prefix, suffix, and override + +This plugin will emit measurements with the name `cpu_total` + +```toml +[[plugins.cpu]] + name_suffix = "_total" + percpu = false + totalcpu = true +``` + +This will emit measurements with the name `foobar` + +```toml +[[plugins.cpu]] + name_override = "foobar" + percpu = false + totalcpu = true +``` + +### Plugin config: tags + +This plugin will emit measurements with two additional tags: `tag1=foo` and +`tag2=bar` + +```toml +[[plugins.cpu]] + percpu = false + totalcpu = true + [plugins.cpu.tags] + tag1 = "foo" + tag2 = "bar" +``` + +### Multiple plugins of the same type + +Additional plugins (or outputs) of the same type can be specified, +just define more instances in the config file: + +```toml +[[plugins.cpu]] + percpu = false + totalcpu = true + +[[plugins.cpu]] + percpu = true + totalcpu = false + drop = ["cpu_time*"] +``` + +## Output Configuration + +Telegraf also supports specifying multiple output sinks to send data to, +configuring each output sink is different, but examples can be +found by running `telegraf -sample-config`. + +Outputs also support the same configurable options as plugins +(pass, drop, tagpass, tagdrop), added in 0.2.4 + +```toml +[[outputs.influxdb]] + urls = [ "http://localhost:8086" ] + database = "telegraf" + precision = "s" + # Drop all measurements that start with "aerospike" + drop = ["aerospike*"] + +[[outputs.influxdb]] + urls = [ "http://localhost:8086" ] + database = "telegraf-aerospike-data" + precision = "s" + # Only accept aerospike data: + pass = ["aerospike*"] + +[[outputs.influxdb]] + urls = [ "http://localhost:8086" ] + database = "telegraf-cpu0-data" + precision = "s" + # Only store measurements where the tag "cpu" matches the value "cpu0" + [outputs.influxdb.tagpass] + cpu = ["cpu0"] +``` diff --git a/README.md b/README.md index 4130d55bb..aefcece44 100644 --- a/README.md +++ b/README.md @@ -116,99 +116,10 @@ unit parser, e.g. "10s" for 10 seconds or "5m" for 5 minutes. * **debug**: Set to true to gather and send metrics to STDOUT as well as InfluxDB. -## Plugin Options +## Configuration -There are 5 configuration options that are configurable per plugin: - -* **pass**: An array of strings that is used to filter metrics generated by the -current plugin. Each string in the array is tested as a glob match against metric names -and if it matches, the metric is emitted. -* **drop**: The inverse of pass, if a metric name matches, it is not emitted. -* **tagpass**: tag names and arrays of strings that are used to filter metrics by the current plugin. Each string in the array is tested as a glob match against -the tag name, and if it matches the metric is emitted. -* **tagdrop**: The inverse of tagpass. If a tag matches, the metric is not emitted. -This is tested on metrics that have passed the tagpass test. -* **interval**: How often to gather this metric. Normal plugins use a single -global interval, but if one particular plugin should be run less or more often, -you can configure that here. - -### Plugin Configuration Examples - -This is a full working config that will output CPU data to an InfluxDB instance -at 192.168.59.103:8086, tagging measurements with dc="denver-1". It will output -measurements at a 10s interval and will collect per-cpu data, dropping any -measurements which begin with `cpu_time`. - -```toml -[tags] - dc = "denver-1" - -[agent] - interval = "10s" - -# OUTPUTS -[outputs] -[[outputs.influxdb]] - url = "http://192.168.59.103:8086" # required. - database = "telegraf" # required. - precision = "s" - -# PLUGINS -[plugins] -[[plugins.cpu]] - percpu = true - totalcpu = false - drop = ["cpu_time*"] -``` - -Below is how to configure `tagpass` and `tagdrop` parameters - -```toml -[plugins] -[[plugins.cpu]] - percpu = true - totalcpu = false - drop = ["cpu_time"] - # Don't collect CPU data for cpu6 & cpu7 - [plugins.cpu.tagdrop] - cpu = [ "cpu6", "cpu7" ] - -[[plugins.disk]] - [plugins.disk.tagpass] - # tagpass conditions are OR, not AND. - # If the (filesystem is ext4 or xfs) OR (the path is /opt or /home) - # then the metric passes - fstype = [ "ext4", "xfs" ] - # Globs can also be used on the tag values - path = [ "/opt", "/home*" ] -``` - -Below is how to configure `pass` and `drop` parameters - -```toml -# Drop all metrics for guest CPU usage -[[plugins.cpu]] - drop = [ "cpu_usage_guest" ] - -# Only store inode related metrics for disks -[[plugins.disk]] - pass = [ "disk_inodes*" ] -``` - - -Additional plugins (or outputs) of the same type can be specified, -just define more instances in the config file: - -```toml -[[plugins.cpu]] - percpu = false - totalcpu = true - -[[plugins.cpu]] - percpu = true - totalcpu = false - drop = ["cpu_time*"] -``` +See the [configuration guide](CONFIGURATION.md) for a rundown of the more advanced +configuration options. ## Supported Plugins @@ -226,7 +137,7 @@ Telegraf currently has support for collecting metrics from: * haproxy * httpjson (generic JSON-emitting http service plugin) * influxdb -* jolokia (remote JMX with JSON over HTTP) +* jolokia * leofs * lustre2 * mailchimp @@ -249,10 +160,10 @@ Telegraf currently has support for collecting metrics from: * system * cpu * mem - * io * net * netstat * disk + * diskio * swap ## Supported Service Plugins @@ -265,40 +176,6 @@ Telegraf can collect metrics via the following services: We'll be adding support for many more over the coming months. Read on if you want to add support for another service or third-party API. -## Output options - -Telegraf also supports specifying multiple output sinks to send data to, -configuring each output sink is different, but examples can be -found by running `telegraf -sample-config`. - -Outputs also support the same configurable options as plugins -(pass, drop, tagpass, tagdrop), added in 0.2.4 - -```toml -[[outputs.influxdb]] - urls = [ "http://localhost:8086" ] - database = "telegraf" - precision = "s" - # Drop all measurements that start with "aerospike" - drop = ["aerospike*"] - -[[outputs.influxdb]] - urls = [ "http://localhost:8086" ] - database = "telegraf-aerospike-data" - precision = "s" - # Only accept aerospike data: - pass = ["aerospike*"] - -[[outputs.influxdb]] - urls = [ "http://localhost:8086" ] - database = "telegraf-cpu0-data" - precision = "s" - # Only store measurements where the tag "cpu" matches the value "cpu0" - [outputs.influxdb.tagpass] - cpu = ["cpu0"] -``` - - ## Supported Outputs * influxdb diff --git a/accumulator.go b/accumulator.go index 8dbf2e8aa..2defc8c7b 100644 --- a/accumulator.go +++ b/accumulator.go @@ -69,30 +69,72 @@ func (ac *accumulator) AddFields( tags map[string]string, t ...time.Time, ) { - // Validate uint64 and float64 fields + if !ac.pluginConfig.Filter.ShouldTagsPass(tags) { + return + } + + // Override measurement name if set + if len(ac.pluginConfig.NameOverride) != 0 { + measurement = ac.pluginConfig.NameOverride + } + // Apply measurement prefix and suffix if set + if len(ac.pluginConfig.MeasurementPrefix) != 0 { + measurement = ac.pluginConfig.MeasurementPrefix + measurement + } + if len(ac.pluginConfig.MeasurementSuffix) != 0 { + measurement = measurement + ac.pluginConfig.MeasurementSuffix + } + + if tags == nil { + tags = make(map[string]string) + } + // Apply plugin-wide tags if set + for k, v := range ac.pluginConfig.Tags { + if _, ok := tags[k]; !ok { + tags[k] = v + } + } + // Apply daemon-wide tags if set + for k, v := range ac.defaultTags { + if _, ok := tags[k]; !ok { + tags[k] = v + } + } + + result := make(map[string]interface{}) for k, v := range fields { + // Filter out any filtered fields + if ac.pluginConfig != nil { + if !ac.pluginConfig.Filter.ShouldPass(k) { + continue + } + } + result[k] = v + + // Validate uint64 and float64 fields switch val := v.(type) { case uint64: // InfluxDB does not support writing uint64 if val < uint64(9223372036854775808) { - fields[k] = int64(val) + result[k] = int64(val) } else { - fields[k] = int64(9223372036854775807) + result[k] = int64(9223372036854775807) } case float64: // NaNs are invalid values in influxdb, skip measurement if math.IsNaN(val) || math.IsInf(val, 0) { if ac.debug { - log.Printf("Measurement [%s] has a NaN or Inf field, skipping", - measurement) + log.Printf("Measurement [%s] field [%s] has a NaN or Inf "+ + "field, skipping", + measurement, k) } - return + continue } } } - - if tags == nil { - tags = make(map[string]string) + fields = nil + if len(result) == 0 { + return } var timestamp time.Time @@ -106,19 +148,7 @@ func (ac *accumulator) AddFields( measurement = ac.prefix + measurement } - if ac.pluginConfig != nil { - if !ac.pluginConfig.Filter.ShouldPass(measurement) || !ac.pluginConfig.Filter.ShouldTagsPass(tags) { - return - } - } - - for k, v := range ac.defaultTags { - if _, ok := tags[k]; !ok { - tags[k] = v - } - } - - pt, err := client.NewPoint(measurement, tags, fields, timestamp) + pt, err := client.NewPoint(measurement, tags, result, timestamp) if err != nil { log.Printf("Error adding point [%s]: %s\n", measurement, err.Error()) return diff --git a/agent.go b/agent.go index 68b1b5f16..1658027a7 100644 --- a/agent.go +++ b/agent.go @@ -104,7 +104,7 @@ func (a *Agent) gatherParallel(pointChan chan *client.Point) error { acc := NewAccumulator(plugin.Config, pointChan) acc.SetDebug(a.Config.Agent.Debug) - acc.SetPrefix(plugin.Name + "_") + // acc.SetPrefix(plugin.Name + "_") acc.SetDefaultTags(a.Config.Tags) if err := plugin.Plugin.Gather(acc); err != nil { @@ -141,7 +141,7 @@ func (a *Agent) gatherSeparate( acc := NewAccumulator(plugin.Config, pointChan) acc.SetDebug(a.Config.Agent.Debug) - acc.SetPrefix(plugin.Name + "_") + // acc.SetPrefix(plugin.Name + "_") acc.SetDefaultTags(a.Config.Tags) if err := plugin.Plugin.Gather(acc); err != nil { @@ -187,7 +187,7 @@ func (a *Agent) Test() error { for _, plugin := range a.Config.Plugins { acc := NewAccumulator(plugin.Config, pointChan) acc.SetDebug(true) - acc.SetPrefix(plugin.Name + "_") + // acc.SetPrefix(plugin.Name + "_") fmt.Printf("* Plugin: %s, Collection 1\n", plugin.Name) if plugin.Config.Interval != 0 { diff --git a/etc/telegraf.conf b/etc/telegraf.conf index 0781d3028..9460cef25 100644 --- a/etc/telegraf.conf +++ b/etc/telegraf.conf @@ -97,8 +97,8 @@ # Mountpoints=["/"] # Read metrics about disk IO by device -[[plugins.io]] - # By default, telegraf will gather stats for all devices including +[[plugins.diskio]] + # By default, telegraf will gather stats for all devices including # disk partitions. # Setting devices will restrict the stats to the specified devices. # Devices=["sda","sdb"] diff --git a/internal/config/config.go b/internal/config/config.go index 348496f0a..0270a3913 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -112,9 +112,13 @@ type Filter struct { // PluginConfig containing a name, interval, and filter type PluginConfig struct { - Name string - Filter Filter - Interval time.Duration + Name string + NameOverride string + MeasurementPrefix string + MeasurementSuffix string + Tags map[string]string + Filter Filter + Interval time.Duration } // OutputConfig containing name and filter @@ -142,12 +146,12 @@ func (ro *RunningOutput) FilterPoints(points []*client.Point) []*client.Point { // ShouldPass returns true if the metric should pass, false if should drop // based on the drop/pass filter parameters -func (f Filter) ShouldPass(measurement string) bool { +func (f Filter) ShouldPass(fieldkey string) bool { if f.Pass != nil { for _, pat := range f.Pass { // TODO remove HasPrefix check, leaving it for now for legacy support. // Cam, 2015-12-07 - if strings.HasPrefix(measurement, pat) || internal.Glob(pat, measurement) { + if strings.HasPrefix(fieldkey, pat) || internal.Glob(pat, fieldkey) { return true } } @@ -158,7 +162,7 @@ func (f Filter) ShouldPass(measurement string) bool { for _, pat := range f.Drop { // TODO remove HasPrefix check, leaving it for now for legacy support. // Cam, 2015-12-07 - if strings.HasPrefix(measurement, pat) || internal.Glob(pat, measurement) { + if strings.HasPrefix(fieldkey, pat) || internal.Glob(pat, fieldkey) { return false } } @@ -628,7 +632,8 @@ func buildFilter(tbl *ast.Table) Filter { return f } -// buildPlugin parses plugin specific items from the ast.Table, builds the filter and returns a +// buildPlugin parses plugin specific items from the ast.Table, +// builds the filter and returns a // PluginConfig to be inserted into RunningPlugin func buildPlugin(name string, tbl *ast.Table) (*PluginConfig, error) { cp := &PluginConfig{Name: name} @@ -644,10 +649,47 @@ func buildPlugin(name string, tbl *ast.Table) (*PluginConfig, error) { } } } + + if node, ok := tbl.Fields["name_prefix"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + cp.MeasurementPrefix = str.Value + } + } + } + + if node, ok := tbl.Fields["name_suffix"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + cp.MeasurementSuffix = str.Value + } + } + } + + if node, ok := tbl.Fields["name_override"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + cp.NameOverride = str.Value + } + } + } + + cp.Tags = make(map[string]string) + if node, ok := tbl.Fields["tags"]; ok { + if subtbl, ok := node.(*ast.Table); ok { + if err := toml.UnmarshalTable(subtbl, cp.Tags); err != nil { + log.Printf("Could not parse tags for plugin %s\n", name) + } + } + } + + delete(tbl.Fields, "name_prefix") + delete(tbl.Fields, "name_suffix") + delete(tbl.Fields, "name_override") delete(tbl.Fields, "interval") + delete(tbl.Fields, "tags") cp.Filter = buildFilter(tbl) return cp, nil - } // buildOutput parses output specific items from the ast.Table, builds the filter and returns an @@ -659,5 +701,4 @@ func buildOutput(name string, tbl *ast.Table) (*OutputConfig, error) { Filter: buildFilter(tbl), } return oc, nil - } diff --git a/internal/config/testdata/telegraf-agent.toml b/internal/config/testdata/telegraf-agent.toml index e63e47b56..42ce89cd8 100644 --- a/internal/config/testdata/telegraf-agent.toml +++ b/internal/config/testdata/telegraf-agent.toml @@ -105,7 +105,7 @@ urls = ["http://localhost/server-status?auto"] drop = ["cpu_time"] # Read metrics about disk usage by mount point -[[plugins.disk]] +[[plugins.diskio]] # no configuration # Read metrics from one or many disque servers diff --git a/plugins/system/cpu.go b/plugins/system/cpu.go index 837a1bc23..24350fc6c 100644 --- a/plugins/system/cpu.go +++ b/plugins/system/cpu.go @@ -2,6 +2,7 @@ package system import ( "fmt" + "time" "github.com/influxdb/telegraf/plugins" "github.com/shirou/gopsutil/cpu" @@ -31,7 +32,7 @@ var sampleConfig = ` # Whether to report total system cpu stats or not totalcpu = true # Comment this line if you want the raw CPU time metrics - drop = ["cpu_time*"] + drop = ["time_*"] ` func (_ *CPUStats) SampleConfig() string { @@ -43,6 +44,7 @@ func (s *CPUStats) Gather(acc plugins.Accumulator) error { if err != nil { return fmt.Errorf("error getting CPU info: %s", err) } + now := time.Now() for i, cts := range times { tags := map[string]string{ @@ -51,21 +53,24 @@ func (s *CPUStats) Gather(acc plugins.Accumulator) error { total := totalCpuTime(cts) - // Add total cpu numbers - add(acc, "time_user", cts.User, tags) - add(acc, "time_system", cts.System, tags) - add(acc, "time_idle", cts.Idle, tags) - add(acc, "time_nice", cts.Nice, tags) - add(acc, "time_iowait", cts.Iowait, tags) - add(acc, "time_irq", cts.Irq, tags) - add(acc, "time_softirq", cts.Softirq, tags) - add(acc, "time_steal", cts.Steal, tags) - add(acc, "time_guest", cts.Guest, tags) - add(acc, "time_guest_nice", cts.GuestNice, tags) + // Add cpu time metrics + fields := map[string]interface{}{ + "time_user": cts.User, + "time_system": cts.System, + "time_idle": cts.Idle, + "time_nice": cts.Nice, + "time_iowait": cts.Iowait, + "time_irq": cts.Irq, + "time_softirq": cts.Softirq, + "time_steal": cts.Steal, + "time_guest": cts.Guest, + "time_guest_nice": cts.GuestNice, + } // Add in percentage if len(s.lastStats) == 0 { - // If it's the 1st gather, can't get CPU stats yet + acc.AddFields("cpu", fields, tags, now) + // If it's the 1st gather, can't get CPU Usage stats yet continue } lastCts := s.lastStats[i] @@ -81,17 +86,17 @@ func (s *CPUStats) Gather(acc plugins.Accumulator) error { continue } - add(acc, "usage_user", 100*(cts.User-lastCts.User)/totalDelta, tags) - add(acc, "usage_system", 100*(cts.System-lastCts.System)/totalDelta, tags) - add(acc, "usage_idle", 100*(cts.Idle-lastCts.Idle)/totalDelta, tags) - add(acc, "usage_nice", 100*(cts.Nice-lastCts.Nice)/totalDelta, tags) - add(acc, "usage_iowait", 100*(cts.Iowait-lastCts.Iowait)/totalDelta, tags) - add(acc, "usage_irq", 100*(cts.Irq-lastCts.Irq)/totalDelta, tags) - add(acc, "usage_softirq", 100*(cts.Softirq-lastCts.Softirq)/totalDelta, tags) - add(acc, "usage_steal", 100*(cts.Steal-lastCts.Steal)/totalDelta, tags) - add(acc, "usage_guest", 100*(cts.Guest-lastCts.Guest)/totalDelta, tags) - add(acc, "usage_guest_nice", 100*(cts.GuestNice-lastCts.GuestNice)/totalDelta, tags) - + fields["usage_user"] = 100 * (cts.User - lastCts.User) / totalDelta + fields["usage_system"] = 100 * (cts.System - lastCts.System) / totalDelta + fields["usage_idle"] = 100 * (cts.Idle - lastCts.Idle) / totalDelta + fields["usage_nice"] = 100 * (cts.Nice - lastCts.Nice) / totalDelta + fields["usage_iowait"] = 100 * (cts.Iowait - lastCts.Iowait) / totalDelta + fields["usage_irq"] = 100 * (cts.Irq - lastCts.Irq) / totalDelta + fields["usage_softirq"] = 100 * (cts.Softirq - lastCts.Softirq) / totalDelta + fields["usage_steal"] = 100 * (cts.Steal - lastCts.Steal) / totalDelta + fields["usage_guest"] = 100 * (cts.Guest - lastCts.Guest) / totalDelta + fields["usage_guest_nice"] = 100 * (cts.GuestNice - lastCts.GuestNice) / totalDelta + acc.AddFields("cpu", fields, tags, now) } s.lastStats = times diff --git a/plugins/system/cpu_test.go b/plugins/system/cpu_test.go new file mode 100644 index 000000000..843d166cb --- /dev/null +++ b/plugins/system/cpu_test.go @@ -0,0 +1,106 @@ +package system + +import ( + "testing" + + "github.com/influxdb/telegraf/testutil" + "github.com/shirou/gopsutil/cpu" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCPUStats(t *testing.T) { + var mps MockPS + defer mps.AssertExpectations(t) + var acc testutil.Accumulator + + cts := cpu.CPUTimesStat{ + CPU: "cpu0", + User: 3.1, + System: 8.2, + Idle: 80.1, + Nice: 1.3, + Iowait: 0.2, + Irq: 0.1, + Softirq: 0.11, + Steal: 0.0511, + Guest: 8.1, + GuestNice: 0.324, + } + + cts2 := cpu.CPUTimesStat{ + CPU: "cpu0", + User: 11.4, // increased by 8.3 + System: 10.9, // increased by 2.7 + Idle: 158.8699, // increased by 78.7699 (for total increase of 100) + Nice: 2.5, // increased by 1.2 + Iowait: 0.7, // increased by 0.5 + Irq: 1.2, // increased by 1.1 + Softirq: 0.31, // increased by 0.2 + Steal: 0.2812, // increased by 0.0001 + Guest: 12.9, // increased by 4.8 + GuestNice: 2.524, // increased by 2.2 + } + + mps.On("CPUTimes").Return([]cpu.CPUTimesStat{cts}, nil) + + cs := NewCPUStats(&mps) + + cputags := map[string]string{ + "cpu": "cpu0", + } + + err := cs.Gather(&acc) + require.NoError(t, err) + numCPUPoints := len(acc.Points) + + expectedCPUPoints := 10 + assert.Equal(t, expectedCPUPoints, numCPUPoints) + + // Computed values are checked with delta > 0 becasue of floating point arithmatic + // imprecision + assertContainsTaggedFloat(t, &acc, "time_user", 3.1, 0, cputags) + assertContainsTaggedFloat(t, &acc, "time_system", 8.2, 0, cputags) + assertContainsTaggedFloat(t, &acc, "time_idle", 80.1, 0, cputags) + assertContainsTaggedFloat(t, &acc, "time_nice", 1.3, 0, cputags) + assertContainsTaggedFloat(t, &acc, "time_iowait", 0.2, 0, cputags) + assertContainsTaggedFloat(t, &acc, "time_irq", 0.1, 0, cputags) + assertContainsTaggedFloat(t, &acc, "time_softirq", 0.11, 0, cputags) + assertContainsTaggedFloat(t, &acc, "time_steal", 0.0511, 0, cputags) + assertContainsTaggedFloat(t, &acc, "time_guest", 8.1, 0, cputags) + assertContainsTaggedFloat(t, &acc, "time_guest_nice", 0.324, 0, cputags) + + mps2 := MockPS{} + mps2.On("CPUTimes").Return([]cpu.CPUTimesStat{cts2}, nil) + cs.ps = &mps2 + + // Should have added cpu percentages too + err = cs.Gather(&acc) + require.NoError(t, err) + + numCPUPoints = len(acc.Points) - numCPUPoints + expectedCPUPoints = 20 + assert.Equal(t, expectedCPUPoints, numCPUPoints) + + assertContainsTaggedFloat(t, &acc, "time_user", 11.4, 0, cputags) + assertContainsTaggedFloat(t, &acc, "time_system", 10.9, 0, cputags) + assertContainsTaggedFloat(t, &acc, "time_idle", 158.8699, 0, cputags) + assertContainsTaggedFloat(t, &acc, "time_nice", 2.5, 0, cputags) + assertContainsTaggedFloat(t, &acc, "time_iowait", 0.7, 0, cputags) + assertContainsTaggedFloat(t, &acc, "time_irq", 1.2, 0, cputags) + assertContainsTaggedFloat(t, &acc, "time_softirq", 0.31, 0, cputags) + assertContainsTaggedFloat(t, &acc, "time_steal", 0.2812, 0, cputags) + assertContainsTaggedFloat(t, &acc, "time_guest", 12.9, 0, cputags) + assertContainsTaggedFloat(t, &acc, "time_guest_nice", 2.524, 0, cputags) + + assertContainsTaggedFloat(t, &acc, "usage_user", 8.3, 0.0005, cputags) + assertContainsTaggedFloat(t, &acc, "usage_system", 2.7, 0.0005, cputags) + assertContainsTaggedFloat(t, &acc, "usage_idle", 78.7699, 0.0005, cputags) + assertContainsTaggedFloat(t, &acc, "usage_nice", 1.2, 0.0005, cputags) + assertContainsTaggedFloat(t, &acc, "usage_iowait", 0.5, 0.0005, cputags) + assertContainsTaggedFloat(t, &acc, "usage_irq", 1.1, 0.0005, cputags) + assertContainsTaggedFloat(t, &acc, "usage_softirq", 0.2, 0.0005, cputags) + assertContainsTaggedFloat(t, &acc, "usage_steal", 0.2301, 0.0005, cputags) + assertContainsTaggedFloat(t, &acc, "usage_guest", 4.8, 0.0005, cputags) + assertContainsTaggedFloat(t, &acc, "usage_guest_nice", 2.2, 0.0005, cputags) +} diff --git a/plugins/system/disk.go b/plugins/system/disk.go index 2e202f8d2..410044d2c 100644 --- a/plugins/system/disk.go +++ b/plugins/system/disk.go @@ -50,12 +50,15 @@ func (s *DiskStats) Gather(acc plugins.Accumulator) error { "path": du.Path, "fstype": du.Fstype, } - acc.Add("total", du.Total, tags) - acc.Add("free", du.Free, tags) - acc.Add("used", du.Total-du.Free, tags) - acc.Add("inodes_total", du.InodesTotal, tags) - acc.Add("inodes_free", du.InodesFree, tags) - acc.Add("inodes_used", du.InodesTotal-du.InodesFree, tags) + fields := map[string]interface{}{ + "total": du.Total, + "free": du.Free, + "used": du.Total - du.Free, + "inodes_total": du.InodesTotal, + "inodes_free": du.InodesFree, + "inodes_used": du.InodesTotal - du.InodesFree, + } + acc.AddFields("disk", fields, tags) } return nil @@ -115,13 +118,16 @@ func (s *DiskIOStats) Gather(acc plugins.Accumulator) error { } } - acc.Add("reads", io.ReadCount, tags) - acc.Add("writes", io.WriteCount, tags) - acc.Add("read_bytes", io.ReadBytes, tags) - acc.Add("write_bytes", io.WriteBytes, tags) - acc.Add("read_time", io.ReadTime, tags) - acc.Add("write_time", io.WriteTime, tags) - acc.Add("io_time", io.IoTime, tags) + fields := map[string]interface{}{ + "reads": io.ReadCount, + "writes": io.WriteCount, + "read_bytes": io.ReadBytes, + "write_bytes": io.WriteBytes, + "read_time": io.ReadTime, + "write_time": io.WriteTime, + "io_time": io.IoTime, + } + acc.AddFields("diskio", fields, tags) } return nil @@ -132,7 +138,7 @@ func init() { return &DiskStats{ps: &systemPS{}} }) - plugins.Add("io", func() plugins.Plugin { + plugins.Add("diskio", func() plugins.Plugin { return &DiskIOStats{ps: &systemPS{}} }) } diff --git a/plugins/system/disk_test.go b/plugins/system/disk_test.go new file mode 100644 index 000000000..abeba736b --- /dev/null +++ b/plugins/system/disk_test.go @@ -0,0 +1,161 @@ +package system + +import ( + "testing" + + "github.com/influxdb/telegraf/testutil" + "github.com/shirou/gopsutil/disk" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestDiskStats(t *testing.T) { + var mps MockPS + defer mps.AssertExpectations(t) + var acc testutil.Accumulator + var err error + + du := []*disk.DiskUsageStat{ + { + Path: "/", + Fstype: "ext4", + Total: 128, + Free: 23, + InodesTotal: 1234, + InodesFree: 234, + }, + { + Path: "/home", + Fstype: "ext4", + Total: 256, + Free: 46, + InodesTotal: 2468, + InodesFree: 468, + }, + } + + mps.On("DiskUsage").Return(du, nil) + + err = (&DiskStats{ps: &mps}).Gather(&acc) + require.NoError(t, err) + + numDiskPoints := len(acc.Points) + expectedAllDiskPoints := 12 + assert.Equal(t, expectedAllDiskPoints, numDiskPoints) + + tags1 := map[string]string{ + "path": "/", + "fstype": "ext4", + } + tags2 := map[string]string{ + "path": "/home", + "fstype": "ext4", + } + + assert.True(t, acc.CheckTaggedValue("total", uint64(128), tags1)) + assert.True(t, acc.CheckTaggedValue("used", uint64(105), tags1)) + assert.True(t, acc.CheckTaggedValue("free", uint64(23), tags1)) + assert.True(t, acc.CheckTaggedValue("inodes_total", uint64(1234), tags1)) + assert.True(t, acc.CheckTaggedValue("inodes_free", uint64(234), tags1)) + assert.True(t, acc.CheckTaggedValue("inodes_used", uint64(1000), tags1)) + assert.True(t, acc.CheckTaggedValue("total", uint64(256), tags2)) + assert.True(t, acc.CheckTaggedValue("used", uint64(210), tags2)) + assert.True(t, acc.CheckTaggedValue("free", uint64(46), tags2)) + assert.True(t, acc.CheckTaggedValue("inodes_total", uint64(2468), tags2)) + assert.True(t, acc.CheckTaggedValue("inodes_free", uint64(468), tags2)) + assert.True(t, acc.CheckTaggedValue("inodes_used", uint64(2000), tags2)) + + // We expect 6 more DiskPoints to show up with an explicit match on "/" + // and /home not matching the /dev in Mountpoints + err = (&DiskStats{ps: &mps, Mountpoints: []string{"/", "/dev"}}).Gather(&acc) + assert.Equal(t, expectedAllDiskPoints+6, len(acc.Points)) + + // We should see all the diskpoints as Mountpoints includes both + // / and /home + err = (&DiskStats{ps: &mps, Mountpoints: []string{"/", "/home"}}).Gather(&acc) + assert.Equal(t, 2*expectedAllDiskPoints+6, len(acc.Points)) + +} + +func TestDiskIOStats(t *testing.T) { + var mps MockPS + defer mps.AssertExpectations(t) + var acc testutil.Accumulator + var err error + + diskio1 := disk.DiskIOCountersStat{ + + ReadCount: 888, + WriteCount: 5341, + ReadBytes: 100000, + WriteBytes: 200000, + ReadTime: 7123, + WriteTime: 9087, + Name: "sda1", + IoTime: 123552, + SerialNumber: "ab-123-ad", + } + diskio2 := disk.DiskIOCountersStat{ + ReadCount: 444, + WriteCount: 2341, + ReadBytes: 200000, + WriteBytes: 400000, + ReadTime: 3123, + WriteTime: 6087, + Name: "sdb1", + IoTime: 246552, + SerialNumber: "bb-123-ad", + } + + mps.On("DiskIO").Return( + map[string]disk.DiskIOCountersStat{"sda1": diskio1, "sdb1": diskio2}, + nil) + + err = (&DiskIOStats{ps: &mps}).Gather(&acc) + require.NoError(t, err) + + numDiskIOPoints := len(acc.Points) + expectedAllDiskIOPoints := 14 + assert.Equal(t, expectedAllDiskIOPoints, numDiskIOPoints) + + dtags1 := map[string]string{ + "name": "sda1", + "serial": "ab-123-ad", + } + dtags2 := map[string]string{ + "name": "sdb1", + "serial": "bb-123-ad", + } + + assert.True(t, acc.CheckTaggedValue("reads", uint64(888), dtags1)) + assert.True(t, acc.CheckTaggedValue("writes", uint64(5341), dtags1)) + assert.True(t, acc.CheckTaggedValue("read_bytes", uint64(100000), dtags1)) + assert.True(t, acc.CheckTaggedValue("write_bytes", uint64(200000), dtags1)) + assert.True(t, acc.CheckTaggedValue("read_time", uint64(7123), dtags1)) + assert.True(t, acc.CheckTaggedValue("write_time", uint64(9087), dtags1)) + assert.True(t, acc.CheckTaggedValue("io_time", uint64(123552), dtags1)) + assert.True(t, acc.CheckTaggedValue("reads", uint64(444), dtags2)) + assert.True(t, acc.CheckTaggedValue("writes", uint64(2341), dtags2)) + assert.True(t, acc.CheckTaggedValue("read_bytes", uint64(200000), dtags2)) + assert.True(t, acc.CheckTaggedValue("write_bytes", uint64(400000), dtags2)) + assert.True(t, acc.CheckTaggedValue("read_time", uint64(3123), dtags2)) + assert.True(t, acc.CheckTaggedValue("write_time", uint64(6087), dtags2)) + assert.True(t, acc.CheckTaggedValue("io_time", uint64(246552), dtags2)) + + // We expect 7 more DiskIOPoints to show up with an explicit match on "sdb1" + // and serial should be missing from the tags with SkipSerialNumber set + err = (&DiskIOStats{ps: &mps, Devices: []string{"sdb1"}, SkipSerialNumber: true}).Gather(&acc) + assert.Equal(t, expectedAllDiskIOPoints+7, len(acc.Points)) + + dtags3 := map[string]string{ + "name": "sdb1", + } + + assert.True(t, acc.CheckTaggedValue("reads", uint64(444), dtags3)) + assert.True(t, acc.CheckTaggedValue("writes", uint64(2341), dtags3)) + assert.True(t, acc.CheckTaggedValue("read_bytes", uint64(200000), dtags3)) + assert.True(t, acc.CheckTaggedValue("write_bytes", uint64(400000), dtags3)) + assert.True(t, acc.CheckTaggedValue("read_time", uint64(3123), dtags3)) + assert.True(t, acc.CheckTaggedValue("write_time", uint64(6087), dtags3)) + assert.True(t, acc.CheckTaggedValue("io_time", uint64(246552), dtags3)) +} diff --git a/plugins/system/docker.go b/plugins/system/docker.go index 94f8ad059..2d6146a59 100644 --- a/plugins/system/docker.go +++ b/plugins/system/docker.go @@ -36,44 +36,47 @@ func (s *DockerStats) Gather(acc plugins.Accumulator) error { cts := cont.CPU - acc.Add("user", cts.User, tags) - acc.Add("system", cts.System, tags) - acc.Add("idle", cts.Idle, tags) - acc.Add("nice", cts.Nice, tags) - acc.Add("iowait", cts.Iowait, tags) - acc.Add("irq", cts.Irq, tags) - acc.Add("softirq", cts.Softirq, tags) - acc.Add("steal", cts.Steal, tags) - acc.Add("guest", cts.Guest, tags) - acc.Add("guest_nice", cts.GuestNice, tags) + fields := map[string]interface{}{ + "user": cts.User, + "system": cts.System, + "idle": cts.Idle, + "nice": cts.Nice, + "iowait": cts.Iowait, + "irq": cts.Irq, + "softirq": cts.Softirq, + "steal": cts.Steal, + "guest": cts.Guest, + "guest_nice": cts.GuestNice, - acc.Add("cache", cont.Mem.Cache, tags) - acc.Add("rss", cont.Mem.RSS, tags) - acc.Add("rss_huge", cont.Mem.RSSHuge, tags) - acc.Add("mapped_file", cont.Mem.MappedFile, tags) - acc.Add("swap_in", cont.Mem.Pgpgin, tags) - acc.Add("swap_out", cont.Mem.Pgpgout, tags) - acc.Add("page_fault", cont.Mem.Pgfault, tags) - acc.Add("page_major_fault", cont.Mem.Pgmajfault, tags) - acc.Add("inactive_anon", cont.Mem.InactiveAnon, tags) - acc.Add("active_anon", cont.Mem.ActiveAnon, tags) - acc.Add("inactive_file", cont.Mem.InactiveFile, tags) - acc.Add("active_file", cont.Mem.ActiveFile, tags) - acc.Add("unevictable", cont.Mem.Unevictable, tags) - acc.Add("memory_limit", cont.Mem.HierarchicalMemoryLimit, tags) - acc.Add("total_cache", cont.Mem.TotalCache, tags) - acc.Add("total_rss", cont.Mem.TotalRSS, tags) - acc.Add("total_rss_huge", cont.Mem.TotalRSSHuge, tags) - acc.Add("total_mapped_file", cont.Mem.TotalMappedFile, tags) - acc.Add("total_swap_in", cont.Mem.TotalPgpgIn, tags) - acc.Add("total_swap_out", cont.Mem.TotalPgpgOut, tags) - acc.Add("total_page_fault", cont.Mem.TotalPgFault, tags) - acc.Add("total_page_major_fault", cont.Mem.TotalPgMajFault, tags) - acc.Add("total_inactive_anon", cont.Mem.TotalInactiveAnon, tags) - acc.Add("total_active_anon", cont.Mem.TotalActiveAnon, tags) - acc.Add("total_inactive_file", cont.Mem.TotalInactiveFile, tags) - acc.Add("total_active_file", cont.Mem.TotalActiveFile, tags) - acc.Add("total_unevictable", cont.Mem.TotalUnevictable, tags) + "cache": cont.Mem.Cache, + "rss": cont.Mem.RSS, + "rss_huge": cont.Mem.RSSHuge, + "mapped_file": cont.Mem.MappedFile, + "swap_in": cont.Mem.Pgpgin, + "swap_out": cont.Mem.Pgpgout, + "page_fault": cont.Mem.Pgfault, + "page_major_fault": cont.Mem.Pgmajfault, + "inactive_anon": cont.Mem.InactiveAnon, + "active_anon": cont.Mem.ActiveAnon, + "inactive_file": cont.Mem.InactiveFile, + "active_file": cont.Mem.ActiveFile, + "unevictable": cont.Mem.Unevictable, + "memory_limit": cont.Mem.HierarchicalMemoryLimit, + "total_cache": cont.Mem.TotalCache, + "total_rss": cont.Mem.TotalRSS, + "total_rss_huge": cont.Mem.TotalRSSHuge, + "total_mapped_file": cont.Mem.TotalMappedFile, + "total_swap_in": cont.Mem.TotalPgpgIn, + "total_swap_out": cont.Mem.TotalPgpgOut, + "total_page_fault": cont.Mem.TotalPgFault, + "total_page_major_fault": cont.Mem.TotalPgMajFault, + "total_inactive_anon": cont.Mem.TotalInactiveAnon, + "total_active_anon": cont.Mem.TotalActiveAnon, + "total_inactive_file": cont.Mem.TotalInactiveFile, + "total_active_file": cont.Mem.TotalActiveFile, + "total_unevictable": cont.Mem.TotalUnevictable, + } + acc.AddFields("docker", fields, tags) } return nil diff --git a/plugins/system/memory.go b/plugins/system/memory.go index 11e7afbb1..23ce94608 100644 --- a/plugins/system/memory.go +++ b/plugins/system/memory.go @@ -22,18 +22,17 @@ func (s *MemStats) Gather(acc plugins.Accumulator) error { return fmt.Errorf("error getting virtual memory info: %s", err) } - vmtags := map[string]string(nil) - - acc.Add("total", vm.Total, vmtags) - acc.Add("available", vm.Available, vmtags) - acc.Add("used", vm.Used, vmtags) - acc.Add("free", vm.Free, vmtags) - acc.Add("cached", vm.Cached, vmtags) - acc.Add("buffered", vm.Buffers, vmtags) - acc.Add("used_percent", 100*float64(vm.Used)/float64(vm.Total), vmtags) - acc.Add("available_percent", - 100*float64(vm.Available)/float64(vm.Total), - vmtags) + fields := map[string]interface{}{ + "total": vm.Total, + "available": vm.Available, + "used": vm.Used, + "free": vm.Free, + "cached": vm.Cached, + "buffered": vm.Buffers, + "used_percent": 100 * float64(vm.Used) / float64(vm.Total), + "available_percent": 100 * float64(vm.Available) / float64(vm.Total), + } + acc.AddFields("mem", fields, nil) return nil } @@ -54,14 +53,15 @@ func (s *SwapStats) Gather(acc plugins.Accumulator) error { return fmt.Errorf("error getting swap memory info: %s", err) } - swaptags := map[string]string(nil) - - acc.Add("total", swap.Total, swaptags) - acc.Add("used", swap.Used, swaptags) - acc.Add("free", swap.Free, swaptags) - acc.Add("used_percent", swap.UsedPercent, swaptags) - acc.Add("in", swap.Sin, swaptags) - acc.Add("out", swap.Sout, swaptags) + fields := map[string]interface{}{ + "total": swap.Total, + "used": swap.Used, + "free": swap.Free, + "used_percent": swap.UsedPercent, + "in": swap.Sin, + "out": swap.Sout, + } + acc.AddFields("swap", fields, nil) return nil } diff --git a/plugins/system/memory_test.go b/plugins/system/memory_test.go new file mode 100644 index 000000000..4b97501a9 --- /dev/null +++ b/plugins/system/memory_test.go @@ -0,0 +1,73 @@ +package system + +import ( + "testing" + + "github.com/influxdb/telegraf/testutil" + "github.com/shirou/gopsutil/mem" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestMemStats(t *testing.T) { + var mps MockPS + var err error + defer mps.AssertExpectations(t) + var acc testutil.Accumulator + + vms := &mem.VirtualMemoryStat{ + Total: 12400, + Available: 7600, + Used: 5000, + Free: 1235, + // Active: 8134, + // Inactive: 1124, + // Buffers: 771, + // Cached: 4312, + // Wired: 134, + // Shared: 2142, + } + + mps.On("VMStat").Return(vms, nil) + + sms := &mem.SwapMemoryStat{ + Total: 8123, + Used: 1232, + Free: 6412, + UsedPercent: 12.2, + Sin: 7, + Sout: 830, + } + + mps.On("SwapStat").Return(sms, nil) + + err = (&MemStats{&mps}).Gather(&acc) + require.NoError(t, err) + + vmtags := map[string]string(nil) + + assert.True(t, acc.CheckTaggedValue("total", uint64(12400), vmtags)) + assert.True(t, acc.CheckTaggedValue("available", uint64(7600), vmtags)) + assert.True(t, acc.CheckTaggedValue("used", uint64(5000), vmtags)) + assert.True(t, acc.CheckTaggedValue("available_percent", + float64(7600)/float64(12400)*100, + vmtags)) + assert.True(t, acc.CheckTaggedValue("used_percent", + float64(5000)/float64(12400)*100, + vmtags)) + assert.True(t, acc.CheckTaggedValue("free", uint64(1235), vmtags)) + + acc.Points = nil + + err = (&SwapStats{&mps}).Gather(&acc) + require.NoError(t, err) + + swaptags := map[string]string(nil) + + assert.NoError(t, acc.ValidateTaggedValue("total", uint64(8123), swaptags)) + assert.NoError(t, acc.ValidateTaggedValue("used", uint64(1232), swaptags)) + assert.NoError(t, acc.ValidateTaggedValue("used_percent", float64(12.2), swaptags)) + assert.NoError(t, acc.ValidateTaggedValue("free", uint64(6412), swaptags)) + assert.NoError(t, acc.ValidateTaggedValue("in", uint64(7), swaptags)) + assert.NoError(t, acc.ValidateTaggedValue("out", uint64(830), swaptags)) +} diff --git a/plugins/system/net.go b/plugins/system/net.go index 9dbcc4577..23f856d6d 100644 --- a/plugins/system/net.go +++ b/plugins/system/net.go @@ -70,14 +70,17 @@ func (s *NetIOStats) Gather(acc plugins.Accumulator) error { "interface": io.Name, } - acc.Add("bytes_sent", io.BytesSent, tags) - acc.Add("bytes_recv", io.BytesRecv, tags) - acc.Add("packets_sent", io.PacketsSent, tags) - acc.Add("packets_recv", io.PacketsRecv, tags) - acc.Add("err_in", io.Errin, tags) - acc.Add("err_out", io.Errout, tags) - acc.Add("drop_in", io.Dropin, tags) - acc.Add("drop_out", io.Dropout, tags) + fields := map[string]interface{}{ + "bytes_sent": io.BytesSent, + "bytes_recv": io.BytesRecv, + "packets_sent": io.PacketsSent, + "packets_recv": io.PacketsRecv, + "err_in": io.Errin, + "err_out": io.Errout, + "drop_in": io.Dropin, + "drop_out": io.Dropout, + } + acc.AddFields("net", fields, tags) } // Get system wide stats for different network protocols diff --git a/plugins/system/net_test.go b/plugins/system/net_test.go new file mode 100644 index 000000000..042b6a2fb --- /dev/null +++ b/plugins/system/net_test.go @@ -0,0 +1,88 @@ +package system + +import ( + "syscall" + "testing" + + "github.com/influxdb/telegraf/testutil" + "github.com/shirou/gopsutil/net" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNetStats(t *testing.T) { + var mps MockPS + var err error + defer mps.AssertExpectations(t) + var acc testutil.Accumulator + + netio := net.NetIOCountersStat{ + Name: "eth0", + BytesSent: 1123, + BytesRecv: 8734422, + PacketsSent: 781, + PacketsRecv: 23456, + Errin: 832, + Errout: 8, + Dropin: 7, + Dropout: 1, + } + + mps.On("NetIO").Return([]net.NetIOCountersStat{netio}, nil) + + netprotos := []net.NetProtoCountersStat{ + net.NetProtoCountersStat{ + Protocol: "Udp", + Stats: map[string]int64{ + "InDatagrams": 4655, + "NoPorts": 892592, + }, + }, + } + mps.On("NetProto").Return(netprotos, nil) + + netstats := []net.NetConnectionStat{ + net.NetConnectionStat{ + Type: syscall.SOCK_DGRAM, + }, + net.NetConnectionStat{ + Status: "ESTABLISHED", + }, + net.NetConnectionStat{ + Status: "ESTABLISHED", + }, + net.NetConnectionStat{ + Status: "CLOSE", + }, + } + + mps.On("NetConnections").Return(netstats, nil) + + err = (&NetIOStats{ps: &mps, skipChecks: true}).Gather(&acc) + require.NoError(t, err) + + ntags := map[string]string{ + "interface": "eth0", + } + + assert.NoError(t, acc.ValidateTaggedValue("bytes_sent", uint64(1123), ntags)) + assert.NoError(t, acc.ValidateTaggedValue("bytes_recv", uint64(8734422), ntags)) + assert.NoError(t, acc.ValidateTaggedValue("packets_sent", uint64(781), ntags)) + assert.NoError(t, acc.ValidateTaggedValue("packets_recv", uint64(23456), ntags)) + assert.NoError(t, acc.ValidateTaggedValue("err_in", uint64(832), ntags)) + assert.NoError(t, acc.ValidateTaggedValue("err_out", uint64(8), ntags)) + assert.NoError(t, acc.ValidateTaggedValue("drop_in", uint64(7), ntags)) + assert.NoError(t, acc.ValidateTaggedValue("drop_out", uint64(1), ntags)) + assert.NoError(t, acc.ValidateValue("udp_noports", int64(892592))) + assert.NoError(t, acc.ValidateValue("udp_indatagrams", int64(4655))) + + acc.Points = nil + + err = (&NetStats{&mps}).Gather(&acc) + require.NoError(t, err) + netstattags := map[string]string(nil) + + assert.NoError(t, acc.ValidateTaggedValue("tcp_established", 2, netstattags)) + assert.NoError(t, acc.ValidateTaggedValue("tcp_close", 1, netstattags)) + assert.NoError(t, acc.ValidateTaggedValue("udp_socket", 1, netstattags)) +} diff --git a/plugins/system/netstat.go b/plugins/system/netstat.go index 9fe512ddd..bd28971bc 100644 --- a/plugins/system/netstat.go +++ b/plugins/system/netstat.go @@ -42,19 +42,23 @@ func (s *NetStats) Gather(acc plugins.Accumulator) error { } counts[netcon.Status] = c + 1 } - acc.Add("tcp_established", counts["ESTABLISHED"], tags) - acc.Add("tcp_syn_sent", counts["SYN_SENT"], tags) - acc.Add("tcp_syn_recv", counts["SYN_RECV"], tags) - acc.Add("tcp_fin_wait1", counts["FIN_WAIT1"], tags) - acc.Add("tcp_fin_wait2", counts["FIN_WAIT2"], tags) - acc.Add("tcp_time_wait", counts["TIME_WAIT"], tags) - acc.Add("tcp_close", counts["CLOSE"], tags) - acc.Add("tcp_close_wait", counts["CLOSE_WAIT"], tags) - acc.Add("tcp_last_ack", counts["LAST_ACK"], tags) - acc.Add("tcp_listen", counts["LISTEN"], tags) - acc.Add("tcp_closing", counts["CLOSING"], tags) - acc.Add("tcp_none", counts["NONE"], tags) - acc.Add("udp_socket", counts["UDP"], tags) + + fields := map[string]interface{}{ + "tcp_established": counts["ESTABLISHED"], + "tcp_syn_sent": counts["SYN_SENT"], + "tcp_syn_recv": counts["SYN_RECV"], + "tcp_fin_wait1": counts["FIN_WAIT1"], + "tcp_fin_wait2": counts["FIN_WAIT2"], + "tcp_time_wait": counts["TIME_WAIT"], + "tcp_close": counts["CLOSE"], + "tcp_close_wait": counts["CLOSE_WAIT"], + "tcp_last_ack": counts["LAST_ACK"], + "tcp_listen": counts["LISTEN"], + "tcp_closing": counts["CLOSING"], + "tcp_none": counts["NONE"], + "udp_socket": counts["UDP"], + } + acc.AddFields("netstat", fields, tags) return nil } diff --git a/plugins/system/ps.go b/plugins/system/ps.go index d0c35c62c..0b7a38527 100644 --- a/plugins/system/ps.go +++ b/plugins/system/ps.go @@ -1,12 +1,16 @@ package system import ( + "fmt" gonet "net" "os" + "reflect" "strings" + "testing" "github.com/influxdb/telegraf/internal" "github.com/influxdb/telegraf/plugins" + "github.com/influxdb/telegraf/testutil" dc "github.com/fsouza/go-dockerclient" "github.com/shirou/gopsutil/cpu" @@ -14,6 +18,8 @@ import ( "github.com/shirou/gopsutil/docker" "github.com/shirou/gopsutil/mem" "github.com/shirou/gopsutil/net" + + "github.com/stretchr/testify/assert" ) type DockerContainerStat struct { @@ -166,3 +172,49 @@ func (s *systemPS) DockerStat() ([]*DockerContainerStat, error) { return stats, nil } + +// Asserts that a given accumulator contains a measurment of type float64 with +// specific tags within a certain distance of a given expected value. Asserts a failure +// if the measurement is of the wrong type, or if no matching measurements are found +// +// Paramaters: +// t *testing.T : Testing object to use +// acc testutil.Accumulator: Accumulator to examine +// measurement string : Name of the measurement to examine +// expectedValue float64 : Value to search for within the measurement +// delta float64 : Maximum acceptable distance of an accumulated value +// from the expectedValue parameter. Useful when +// floating-point arithmatic imprecision makes looking +// for an exact match impractical +// tags map[string]string : Tag set the found measurement must have. Set to nil to +// ignore the tag set. +func assertContainsTaggedFloat( + t *testing.T, + acc *testutil.Accumulator, + measurement string, + expectedValue float64, + delta float64, + tags map[string]string, +) { + var actualValue float64 + for _, pt := range acc.Points { + if pt.Measurement == measurement { + if (tags == nil) || reflect.DeepEqual(pt.Tags, tags) { + if value, ok := pt.Fields["value"].(float64); ok { + actualValue = value + if (value >= expectedValue-delta) && (value <= expectedValue+delta) { + // Found the point, return without failing + return + } + } else { + assert.Fail(t, fmt.Sprintf("Measurement \"%s\" does not have type float64", + measurement)) + } + + } + } + } + msg := fmt.Sprintf("Could not find measurement \"%s\" with requested tags within %f of %f, Actual: %f", + measurement, delta, expectedValue, actualValue) + assert.Fail(t, msg) +} diff --git a/plugins/system/system.go b/plugins/system/system.go index 4481ac0a3..82d4f4f24 100644 --- a/plugins/system/system.go +++ b/plugins/system/system.go @@ -37,11 +37,14 @@ func (_ *SystemStats) Gather(acc plugins.Accumulator) error { return err } - acc.Add("load1", loadavg.Load1, nil) - acc.Add("load5", loadavg.Load5, nil) - acc.Add("load15", loadavg.Load15, nil) - acc.Add("uptime", float64(hostinfo.Uptime), nil) - acc.Add("uptime_format", format_uptime(hostinfo.Uptime), nil) + fields := map[string]interface{}{ + "load1": loadavg.Load1, + "load5": loadavg.Load5, + "load15": loadavg.Load15, + "uptime": float64(hostinfo.Uptime), + "uptime_format": format_uptime(hostinfo.Uptime), + } + acc.AddFields("system", fields, nil) return nil } diff --git a/plugins/system/system_test.go b/plugins/system/system_test.go deleted file mode 100644 index fca1d2c35..000000000 --- a/plugins/system/system_test.go +++ /dev/null @@ -1,426 +0,0 @@ -package system - -import ( - "fmt" - "reflect" - "syscall" - "testing" - - "github.com/influxdb/telegraf/testutil" - "github.com/shirou/gopsutil/cpu" - "github.com/shirou/gopsutil/disk" - "github.com/shirou/gopsutil/mem" - "github.com/shirou/gopsutil/net" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestSystemStats_GenerateStats(t *testing.T) { - var mps MockPS - - defer mps.AssertExpectations(t) - - var acc testutil.Accumulator - - cts := cpu.CPUTimesStat{ - CPU: "cpu0", - User: 3.1, - System: 8.2, - Idle: 80.1, - Nice: 1.3, - Iowait: 0.2, - Irq: 0.1, - Softirq: 0.11, - Steal: 0.0511, - Guest: 8.1, - GuestNice: 0.324, - } - - cts2 := cpu.CPUTimesStat{ - CPU: "cpu0", - User: 11.4, // increased by 8.3 - System: 10.9, // increased by 2.7 - Idle: 158.8699, // increased by 78.7699 (for total increase of 100) - Nice: 2.5, // increased by 1.2 - Iowait: 0.7, // increased by 0.5 - Irq: 1.2, // increased by 1.1 - Softirq: 0.31, // increased by 0.2 - Steal: 0.2812, // increased by 0.0001 - Guest: 12.9, // increased by 4.8 - GuestNice: 2.524, // increased by 2.2 - } - - mps.On("CPUTimes").Return([]cpu.CPUTimesStat{cts}, nil) - - du := []*disk.DiskUsageStat{ - { - Path: "/", - Fstype: "ext4", - Total: 128, - Free: 23, - InodesTotal: 1234, - InodesFree: 234, - }, - { - Path: "/home", - Fstype: "ext4", - Total: 256, - Free: 46, - InodesTotal: 2468, - InodesFree: 468, - }, - } - - mps.On("DiskUsage").Return(du, nil) - - diskio1 := disk.DiskIOCountersStat{ - - ReadCount: 888, - WriteCount: 5341, - ReadBytes: 100000, - WriteBytes: 200000, - ReadTime: 7123, - WriteTime: 9087, - Name: "sda1", - IoTime: 123552, - SerialNumber: "ab-123-ad", - } - diskio2 := disk.DiskIOCountersStat{ - ReadCount: 444, - WriteCount: 2341, - ReadBytes: 200000, - WriteBytes: 400000, - ReadTime: 3123, - WriteTime: 6087, - Name: "sdb1", - IoTime: 246552, - SerialNumber: "bb-123-ad", - } - - mps.On("DiskIO").Return(map[string]disk.DiskIOCountersStat{"sda1": diskio1, "sdb1": diskio2}, nil) - - netio := net.NetIOCountersStat{ - Name: "eth0", - BytesSent: 1123, - BytesRecv: 8734422, - PacketsSent: 781, - PacketsRecv: 23456, - Errin: 832, - Errout: 8, - Dropin: 7, - Dropout: 1, - } - - mps.On("NetIO").Return([]net.NetIOCountersStat{netio}, nil) - - netprotos := []net.NetProtoCountersStat{ - net.NetProtoCountersStat{ - Protocol: "Udp", - Stats: map[string]int64{ - "InDatagrams": 4655, - "NoPorts": 892592, - }, - }, - } - mps.On("NetProto").Return(netprotos, nil) - - vms := &mem.VirtualMemoryStat{ - Total: 12400, - Available: 7600, - Used: 5000, - Free: 1235, - // Active: 8134, - // Inactive: 1124, - // Buffers: 771, - // Cached: 4312, - // Wired: 134, - // Shared: 2142, - } - - mps.On("VMStat").Return(vms, nil) - - sms := &mem.SwapMemoryStat{ - Total: 8123, - Used: 1232, - Free: 6412, - UsedPercent: 12.2, - Sin: 7, - Sout: 830, - } - - mps.On("SwapStat").Return(sms, nil) - - netstats := []net.NetConnectionStat{ - net.NetConnectionStat{ - Type: syscall.SOCK_DGRAM, - }, - net.NetConnectionStat{ - Status: "ESTABLISHED", - }, - net.NetConnectionStat{ - Status: "ESTABLISHED", - }, - net.NetConnectionStat{ - Status: "CLOSE", - }, - } - - mps.On("NetConnections").Return(netstats, nil) - - cs := NewCPUStats(&mps) - - cputags := map[string]string{ - "cpu": "cpu0", - } - - preCPUPoints := len(acc.Points) - err := cs.Gather(&acc) - require.NoError(t, err) - numCPUPoints := len(acc.Points) - preCPUPoints - - expectedCPUPoints := 10 - assert.Equal(t, expectedCPUPoints, numCPUPoints) - - // Computed values are checked with delta > 0 becasue of floating point arithmatic - // imprecision - assertContainsTaggedFloat(t, &acc, "time_user", 3.1, 0, cputags) - assertContainsTaggedFloat(t, &acc, "time_system", 8.2, 0, cputags) - assertContainsTaggedFloat(t, &acc, "time_idle", 80.1, 0, cputags) - assertContainsTaggedFloat(t, &acc, "time_nice", 1.3, 0, cputags) - assertContainsTaggedFloat(t, &acc, "time_iowait", 0.2, 0, cputags) - assertContainsTaggedFloat(t, &acc, "time_irq", 0.1, 0, cputags) - assertContainsTaggedFloat(t, &acc, "time_softirq", 0.11, 0, cputags) - assertContainsTaggedFloat(t, &acc, "time_steal", 0.0511, 0, cputags) - assertContainsTaggedFloat(t, &acc, "time_guest", 8.1, 0, cputags) - assertContainsTaggedFloat(t, &acc, "time_guest_nice", 0.324, 0, cputags) - - mps2 := MockPS{} - mps2.On("CPUTimes").Return([]cpu.CPUTimesStat{cts2}, nil) - cs.ps = &mps2 - - // Should have added cpu percentages too - err = cs.Gather(&acc) - require.NoError(t, err) - - numCPUPoints = len(acc.Points) - (preCPUPoints + numCPUPoints) - expectedCPUPoints = 20 - assert.Equal(t, expectedCPUPoints, numCPUPoints) - - assertContainsTaggedFloat(t, &acc, "time_user", 11.4, 0, cputags) - assertContainsTaggedFloat(t, &acc, "time_system", 10.9, 0, cputags) - assertContainsTaggedFloat(t, &acc, "time_idle", 158.8699, 0, cputags) - assertContainsTaggedFloat(t, &acc, "time_nice", 2.5, 0, cputags) - assertContainsTaggedFloat(t, &acc, "time_iowait", 0.7, 0, cputags) - assertContainsTaggedFloat(t, &acc, "time_irq", 1.2, 0, cputags) - assertContainsTaggedFloat(t, &acc, "time_softirq", 0.31, 0, cputags) - assertContainsTaggedFloat(t, &acc, "time_steal", 0.2812, 0, cputags) - assertContainsTaggedFloat(t, &acc, "time_guest", 12.9, 0, cputags) - assertContainsTaggedFloat(t, &acc, "time_guest_nice", 2.524, 0, cputags) - - assertContainsTaggedFloat(t, &acc, "usage_user", 8.3, 0.0005, cputags) - assertContainsTaggedFloat(t, &acc, "usage_system", 2.7, 0.0005, cputags) - assertContainsTaggedFloat(t, &acc, "usage_idle", 78.7699, 0.0005, cputags) - assertContainsTaggedFloat(t, &acc, "usage_nice", 1.2, 0.0005, cputags) - assertContainsTaggedFloat(t, &acc, "usage_iowait", 0.5, 0.0005, cputags) - assertContainsTaggedFloat(t, &acc, "usage_irq", 1.1, 0.0005, cputags) - assertContainsTaggedFloat(t, &acc, "usage_softirq", 0.2, 0.0005, cputags) - assertContainsTaggedFloat(t, &acc, "usage_steal", 0.2301, 0.0005, cputags) - assertContainsTaggedFloat(t, &acc, "usage_guest", 4.8, 0.0005, cputags) - assertContainsTaggedFloat(t, &acc, "usage_guest_nice", 2.2, 0.0005, cputags) - - preDiskPoints := len(acc.Points) - - err = (&DiskStats{ps: &mps}).Gather(&acc) - require.NoError(t, err) - - numDiskPoints := len(acc.Points) - preDiskPoints - expectedAllDiskPoints := 12 - assert.Equal(t, expectedAllDiskPoints, numDiskPoints) - - tags1 := map[string]string{ - "path": "/", - "fstype": "ext4", - } - tags2 := map[string]string{ - "path": "/home", - "fstype": "ext4", - } - - assert.True(t, acc.CheckTaggedValue("total", uint64(128), tags1)) - assert.True(t, acc.CheckTaggedValue("used", uint64(105), tags1)) - assert.True(t, acc.CheckTaggedValue("free", uint64(23), tags1)) - assert.True(t, acc.CheckTaggedValue("inodes_total", uint64(1234), tags1)) - assert.True(t, acc.CheckTaggedValue("inodes_free", uint64(234), tags1)) - assert.True(t, acc.CheckTaggedValue("inodes_used", uint64(1000), tags1)) - assert.True(t, acc.CheckTaggedValue("total", uint64(256), tags2)) - assert.True(t, acc.CheckTaggedValue("used", uint64(210), tags2)) - assert.True(t, acc.CheckTaggedValue("free", uint64(46), tags2)) - assert.True(t, acc.CheckTaggedValue("inodes_total", uint64(2468), tags2)) - assert.True(t, acc.CheckTaggedValue("inodes_free", uint64(468), tags2)) - assert.True(t, acc.CheckTaggedValue("inodes_used", uint64(2000), tags2)) - - // We expect 6 more DiskPoints to show up with an explicit match on "/" - // and /home not matching the /dev in Mountpoints - err = (&DiskStats{ps: &mps, Mountpoints: []string{"/", "/dev"}}).Gather(&acc) - assert.Equal(t, preDiskPoints+expectedAllDiskPoints+6, len(acc.Points)) - - // We should see all the diskpoints as Mountpoints includes both - // / and /home - err = (&DiskStats{ps: &mps, Mountpoints: []string{"/", "/home"}}).Gather(&acc) - assert.Equal(t, preDiskPoints+2*expectedAllDiskPoints+6, len(acc.Points)) - - err = (&NetIOStats{ps: &mps, skipChecks: true}).Gather(&acc) - require.NoError(t, err) - - ntags := map[string]string{ - "interface": "eth0", - } - - assert.NoError(t, acc.ValidateTaggedValue("bytes_sent", uint64(1123), ntags)) - assert.NoError(t, acc.ValidateTaggedValue("bytes_recv", uint64(8734422), ntags)) - assert.NoError(t, acc.ValidateTaggedValue("packets_sent", uint64(781), ntags)) - assert.NoError(t, acc.ValidateTaggedValue("packets_recv", uint64(23456), ntags)) - assert.NoError(t, acc.ValidateTaggedValue("err_in", uint64(832), ntags)) - assert.NoError(t, acc.ValidateTaggedValue("err_out", uint64(8), ntags)) - assert.NoError(t, acc.ValidateTaggedValue("drop_in", uint64(7), ntags)) - assert.NoError(t, acc.ValidateTaggedValue("drop_out", uint64(1), ntags)) - assert.NoError(t, acc.ValidateValue("udp_noports", int64(892592))) - assert.NoError(t, acc.ValidateValue("udp_indatagrams", int64(4655))) - - preDiskIOPoints := len(acc.Points) - - err = (&DiskIOStats{ps: &mps}).Gather(&acc) - require.NoError(t, err) - - numDiskIOPoints := len(acc.Points) - preDiskIOPoints - expectedAllDiskIOPoints := 14 - assert.Equal(t, expectedAllDiskIOPoints, numDiskIOPoints) - - dtags1 := map[string]string{ - "name": "sda1", - "serial": "ab-123-ad", - } - dtags2 := map[string]string{ - "name": "sdb1", - "serial": "bb-123-ad", - } - - assert.True(t, acc.CheckTaggedValue("reads", uint64(888), dtags1)) - assert.True(t, acc.CheckTaggedValue("writes", uint64(5341), dtags1)) - assert.True(t, acc.CheckTaggedValue("read_bytes", uint64(100000), dtags1)) - assert.True(t, acc.CheckTaggedValue("write_bytes", uint64(200000), dtags1)) - assert.True(t, acc.CheckTaggedValue("read_time", uint64(7123), dtags1)) - assert.True(t, acc.CheckTaggedValue("write_time", uint64(9087), dtags1)) - assert.True(t, acc.CheckTaggedValue("io_time", uint64(123552), dtags1)) - assert.True(t, acc.CheckTaggedValue("reads", uint64(444), dtags2)) - assert.True(t, acc.CheckTaggedValue("writes", uint64(2341), dtags2)) - assert.True(t, acc.CheckTaggedValue("read_bytes", uint64(200000), dtags2)) - assert.True(t, acc.CheckTaggedValue("write_bytes", uint64(400000), dtags2)) - assert.True(t, acc.CheckTaggedValue("read_time", uint64(3123), dtags2)) - assert.True(t, acc.CheckTaggedValue("write_time", uint64(6087), dtags2)) - assert.True(t, acc.CheckTaggedValue("io_time", uint64(246552), dtags2)) - - // We expect 7 more DiskIOPoints to show up with an explicit match on "sdb1" - // and serial should be missing from the tags with SkipSerialNumber set - err = (&DiskIOStats{ps: &mps, Devices: []string{"sdb1"}, SkipSerialNumber: true}).Gather(&acc) - assert.Equal(t, preDiskIOPoints+expectedAllDiskIOPoints+7, len(acc.Points)) - - dtags3 := map[string]string{ - "name": "sdb1", - } - - assert.True(t, acc.CheckTaggedValue("reads", uint64(444), dtags3)) - assert.True(t, acc.CheckTaggedValue("writes", uint64(2341), dtags3)) - assert.True(t, acc.CheckTaggedValue("read_bytes", uint64(200000), dtags3)) - assert.True(t, acc.CheckTaggedValue("write_bytes", uint64(400000), dtags3)) - assert.True(t, acc.CheckTaggedValue("read_time", uint64(3123), dtags3)) - assert.True(t, acc.CheckTaggedValue("write_time", uint64(6087), dtags3)) - assert.True(t, acc.CheckTaggedValue("io_time", uint64(246552), dtags3)) - - err = (&MemStats{&mps}).Gather(&acc) - require.NoError(t, err) - - vmtags := map[string]string(nil) - - assert.True(t, acc.CheckTaggedValue("total", uint64(12400), vmtags)) - assert.True(t, acc.CheckTaggedValue("available", uint64(7600), vmtags)) - assert.True(t, acc.CheckTaggedValue("used", uint64(5000), vmtags)) - assert.True(t, acc.CheckTaggedValue("available_percent", - float64(7600)/float64(12400)*100, - vmtags)) - assert.True(t, acc.CheckTaggedValue("used_percent", - float64(5000)/float64(12400)*100, - vmtags)) - assert.True(t, acc.CheckTaggedValue("free", uint64(1235), vmtags)) - - acc.Points = nil - - err = (&SwapStats{&mps}).Gather(&acc) - require.NoError(t, err) - - swaptags := map[string]string(nil) - - assert.NoError(t, acc.ValidateTaggedValue("total", uint64(8123), swaptags)) - assert.NoError(t, acc.ValidateTaggedValue("used", uint64(1232), swaptags)) - assert.NoError(t, acc.ValidateTaggedValue("used_percent", float64(12.2), swaptags)) - assert.NoError(t, acc.ValidateTaggedValue("free", uint64(6412), swaptags)) - assert.NoError(t, acc.ValidateTaggedValue("in", uint64(7), swaptags)) - assert.NoError(t, acc.ValidateTaggedValue("out", uint64(830), swaptags)) - - acc.Points = nil - - err = (&NetStats{&mps}).Gather(&acc) - require.NoError(t, err) - netstattags := map[string]string(nil) - - assert.NoError(t, acc.ValidateTaggedValue("tcp_established", 2, netstattags)) - assert.NoError(t, acc.ValidateTaggedValue("tcp_close", 1, netstattags)) - assert.NoError(t, acc.ValidateTaggedValue("udp_socket", 1, netstattags)) - -} - -// Asserts that a given accumulator contains a measurment of type float64 with -// specific tags within a certain distance of a given expected value. Asserts a failure -// if the measurement is of the wrong type, or if no matching measurements are found -// -// Paramaters: -// t *testing.T : Testing object to use -// acc testutil.Accumulator: Accumulator to examine -// measurement string : Name of the measurement to examine -// expectedValue float64 : Value to search for within the measurement -// delta float64 : Maximum acceptable distance of an accumulated value -// from the expectedValue parameter. Useful when -// floating-point arithmatic imprecision makes looking -// for an exact match impractical -// tags map[string]string : Tag set the found measurement must have. Set to nil to -// ignore the tag set. -func assertContainsTaggedFloat( - t *testing.T, - acc *testutil.Accumulator, - measurement string, - expectedValue float64, - delta float64, - tags map[string]string, -) { - var actualValue float64 - for _, pt := range acc.Points { - if pt.Measurement == measurement { - if (tags == nil) || reflect.DeepEqual(pt.Tags, tags) { - if value, ok := pt.Fields["value"].(float64); ok { - actualValue = value - if (value >= expectedValue-delta) && (value <= expectedValue+delta) { - // Found the point, return without failing - return - } - } else { - assert.Fail(t, fmt.Sprintf("Measurement \"%s\" does not have type float64", - measurement)) - } - - } - } - } - msg := fmt.Sprintf("Could not find measurement \"%s\" with requested tags within %f of %f, Actual: %f", - measurement, delta, expectedValue, actualValue) - assert.Fail(t, msg) -}