Updating system plugins for 0.3.0

This commit is contained in:
Cameron Sparr 2015-12-11 13:07:32 -07:00
parent 5bf7c4d241
commit 5f3a1fcb86
21 changed files with 928 additions and 712 deletions

View File

@ -1,3 +1,16 @@
## v0.3.0 [unreleased]
### Release Notes
- **breaking change** the `io` plugin has been renamed `diskio`
- **breaking change** Plugin measurements aggregated into a single measurement.
### Features
- Plugin measurements aggregated into a single measurement.
- Added ability to specify per-plugin tags
- Added ability to specify per-plugin measurement suffix and prefix
### Bugfixes
## v0.2.5 [unreleased] ## v0.2.5 [unreleased]
### Features ### Features

177
CONFIGURATION.md Normal file
View File

@ -0,0 +1,177 @@
# Telegraf Configuration
## Plugin Configuration
There are some configuration options that are configurable per plugin:
* **name_override**: Override the base name of the measurement.
(Default is the name of the plugin).
* **name_prefix**: Specifies a prefix to attach to the measurement name.
* **name_suffix**: Specifies a suffix to attach to the measurement name.
* **tags**: A map of tags to apply to a specific plugin's measurements.
### Plugin Filters
There are also filters that can be configured per plugin:
* **pass**: An array of strings that is used to filter metrics generated by the
current plugin. Each string in the array is tested as a glob match against field names
and if it matches, the field is emitted.
* **drop**: The inverse of pass, if a field name matches, it is not emitted.
* **tagpass**: tag names and arrays of strings that are used to filter
measurements by the current plugin. Each string in the array is tested as a glob
match against the tag name, and if it matches the measurement is emitted.
* **tagdrop**: The inverse of tagpass. If a tag matches, the measurement is not emitted.
This is tested on measurements that have passed the tagpass test.
* **interval**: How often to gather this metric. Normal plugins use a single
global interval, but if one particular plugin should be run less or more often,
you can configure that here.
### Plugin Configuration Examples
This is a full working config that will output CPU data to an InfluxDB instance
at 192.168.59.103:8086, tagging measurements with dc="denver-1". It will output
measurements at a 10s interval and will collect per-cpu data, dropping any
fields which begin with `time_`.
```toml
[tags]
dc = "denver-1"
[agent]
interval = "10s"
# OUTPUTS
[outputs]
[[outputs.influxdb]]
url = "http://192.168.59.103:8086" # required.
database = "telegraf" # required.
precision = "s"
# PLUGINS
[plugins]
[[plugins.cpu]]
percpu = true
totalcpu = false
# filter all fields beginning with 'time_'
drop = ["time_*"]
```
### Plugin Config: tagpass and tagdrop
```toml
[plugins]
[[plugins.cpu]]
percpu = true
totalcpu = false
drop = ["cpu_time"]
# Don't collect CPU data for cpu6 & cpu7
[plugins.cpu.tagdrop]
cpu = [ "cpu6", "cpu7" ]
[[plugins.disk]]
[plugins.disk.tagpass]
# tagpass conditions are OR, not AND.
# If the (filesystem is ext4 or xfs) OR (the path is /opt or /home)
# then the metric passes
fstype = [ "ext4", "xfs" ]
# Globs can also be used on the tag values
path = [ "/opt", "/home*" ]
```
### Plugin Config: pass and drop
```toml
# Drop all metrics for guest & steal CPU usage
[[plugins.cpu]]
percpu = false
totalcpu = true
drop = ["usage_guest", "usage_steal"]
# Only store inode related metrics for disks
[[plugins.disk]]
pass = ["inodes*"]
```
### Plugin config: prefix, suffix, and override
This plugin will emit measurements with the name `cpu_total`
```toml
[[plugins.cpu]]
name_suffix = "_total"
percpu = false
totalcpu = true
```
This will emit measurements with the name `foobar`
```toml
[[plugins.cpu]]
name_override = "foobar"
percpu = false
totalcpu = true
```
### Plugin config: tags
This plugin will emit measurements with two additional tags: `tag1=foo` and
`tag2=bar`
```toml
[[plugins.cpu]]
percpu = false
totalcpu = true
[plugins.cpu.tags]
tag1 = "foo"
tag2 = "bar"
```
### Multiple plugins of the same type
Additional plugins (or outputs) of the same type can be specified,
just define more instances in the config file:
```toml
[[plugins.cpu]]
percpu = false
totalcpu = true
[[plugins.cpu]]
percpu = true
totalcpu = false
drop = ["cpu_time*"]
```
## Output Configuration
Telegraf also supports specifying multiple output sinks to send data to,
configuring each output sink is different, but examples can be
found by running `telegraf -sample-config`.
Outputs also support the same configurable options as plugins
(pass, drop, tagpass, tagdrop), added in 0.2.4
```toml
[[outputs.influxdb]]
urls = [ "http://localhost:8086" ]
database = "telegraf"
precision = "s"
# Drop all measurements that start with "aerospike"
drop = ["aerospike*"]
[[outputs.influxdb]]
urls = [ "http://localhost:8086" ]
database = "telegraf-aerospike-data"
precision = "s"
# Only accept aerospike data:
pass = ["aerospike*"]
[[outputs.influxdb]]
urls = [ "http://localhost:8086" ]
database = "telegraf-cpu0-data"
precision = "s"
# Only store measurements where the tag "cpu" matches the value "cpu0"
[outputs.influxdb.tagpass]
cpu = ["cpu0"]
```

133
README.md
View File

@ -116,99 +116,10 @@ unit parser, e.g. "10s" for 10 seconds or "5m" for 5 minutes.
* **debug**: Set to true to gather and send metrics to STDOUT as well as * **debug**: Set to true to gather and send metrics to STDOUT as well as
InfluxDB. InfluxDB.
## Plugin Options ## Configuration
There are 5 configuration options that are configurable per plugin: See the [configuration guide](CONFIGURATION.md) for a rundown of the more advanced
configuration options.
* **pass**: An array of strings that is used to filter metrics generated by the
current plugin. Each string in the array is tested as a glob match against metric names
and if it matches, the metric is emitted.
* **drop**: The inverse of pass, if a metric name matches, it is not emitted.
* **tagpass**: tag names and arrays of strings that are used to filter metrics by the current plugin. Each string in the array is tested as a glob match against
the tag name, and if it matches the metric is emitted.
* **tagdrop**: The inverse of tagpass. If a tag matches, the metric is not emitted.
This is tested on metrics that have passed the tagpass test.
* **interval**: How often to gather this metric. Normal plugins use a single
global interval, but if one particular plugin should be run less or more often,
you can configure that here.
### Plugin Configuration Examples
This is a full working config that will output CPU data to an InfluxDB instance
at 192.168.59.103:8086, tagging measurements with dc="denver-1". It will output
measurements at a 10s interval and will collect per-cpu data, dropping any
measurements which begin with `cpu_time`.
```toml
[tags]
dc = "denver-1"
[agent]
interval = "10s"
# OUTPUTS
[outputs]
[[outputs.influxdb]]
url = "http://192.168.59.103:8086" # required.
database = "telegraf" # required.
precision = "s"
# PLUGINS
[plugins]
[[plugins.cpu]]
percpu = true
totalcpu = false
drop = ["cpu_time*"]
```
Below is how to configure `tagpass` and `tagdrop` parameters
```toml
[plugins]
[[plugins.cpu]]
percpu = true
totalcpu = false
drop = ["cpu_time"]
# Don't collect CPU data for cpu6 & cpu7
[plugins.cpu.tagdrop]
cpu = [ "cpu6", "cpu7" ]
[[plugins.disk]]
[plugins.disk.tagpass]
# tagpass conditions are OR, not AND.
# If the (filesystem is ext4 or xfs) OR (the path is /opt or /home)
# then the metric passes
fstype = [ "ext4", "xfs" ]
# Globs can also be used on the tag values
path = [ "/opt", "/home*" ]
```
Below is how to configure `pass` and `drop` parameters
```toml
# Drop all metrics for guest CPU usage
[[plugins.cpu]]
drop = [ "cpu_usage_guest" ]
# Only store inode related metrics for disks
[[plugins.disk]]
pass = [ "disk_inodes*" ]
```
Additional plugins (or outputs) of the same type can be specified,
just define more instances in the config file:
```toml
[[plugins.cpu]]
percpu = false
totalcpu = true
[[plugins.cpu]]
percpu = true
totalcpu = false
drop = ["cpu_time*"]
```
## Supported Plugins ## Supported Plugins
@ -226,7 +137,7 @@ Telegraf currently has support for collecting metrics from:
* haproxy * haproxy
* httpjson (generic JSON-emitting http service plugin) * httpjson (generic JSON-emitting http service plugin)
* influxdb * influxdb
* jolokia (remote JMX with JSON over HTTP) * jolokia
* leofs * leofs
* lustre2 * lustre2
* mailchimp * mailchimp
@ -249,10 +160,10 @@ Telegraf currently has support for collecting metrics from:
* system * system
* cpu * cpu
* mem * mem
* io
* net * net
* netstat * netstat
* disk * disk
* diskio
* swap * swap
## Supported Service Plugins ## Supported Service Plugins
@ -265,40 +176,6 @@ Telegraf can collect metrics via the following services:
We'll be adding support for many more over the coming months. Read on if you We'll be adding support for many more over the coming months. Read on if you
want to add support for another service or third-party API. want to add support for another service or third-party API.
## Output options
Telegraf also supports specifying multiple output sinks to send data to,
configuring each output sink is different, but examples can be
found by running `telegraf -sample-config`.
Outputs also support the same configurable options as plugins
(pass, drop, tagpass, tagdrop), added in 0.2.4
```toml
[[outputs.influxdb]]
urls = [ "http://localhost:8086" ]
database = "telegraf"
precision = "s"
# Drop all measurements that start with "aerospike"
drop = ["aerospike*"]
[[outputs.influxdb]]
urls = [ "http://localhost:8086" ]
database = "telegraf-aerospike-data"
precision = "s"
# Only accept aerospike data:
pass = ["aerospike*"]
[[outputs.influxdb]]
urls = [ "http://localhost:8086" ]
database = "telegraf-cpu0-data"
precision = "s"
# Only store measurements where the tag "cpu" matches the value "cpu0"
[outputs.influxdb.tagpass]
cpu = ["cpu0"]
```
## Supported Outputs ## Supported Outputs
* influxdb * influxdb

View File

@ -69,30 +69,72 @@ func (ac *accumulator) AddFields(
tags map[string]string, tags map[string]string,
t ...time.Time, t ...time.Time,
) { ) {
// Validate uint64 and float64 fields if !ac.pluginConfig.Filter.ShouldTagsPass(tags) {
return
}
// Override measurement name if set
if len(ac.pluginConfig.NameOverride) != 0 {
measurement = ac.pluginConfig.NameOverride
}
// Apply measurement prefix and suffix if set
if len(ac.pluginConfig.MeasurementPrefix) != 0 {
measurement = ac.pluginConfig.MeasurementPrefix + measurement
}
if len(ac.pluginConfig.MeasurementSuffix) != 0 {
measurement = measurement + ac.pluginConfig.MeasurementSuffix
}
if tags == nil {
tags = make(map[string]string)
}
// Apply plugin-wide tags if set
for k, v := range ac.pluginConfig.Tags {
if _, ok := tags[k]; !ok {
tags[k] = v
}
}
// Apply daemon-wide tags if set
for k, v := range ac.defaultTags {
if _, ok := tags[k]; !ok {
tags[k] = v
}
}
result := make(map[string]interface{})
for k, v := range fields { for k, v := range fields {
// Filter out any filtered fields
if ac.pluginConfig != nil {
if !ac.pluginConfig.Filter.ShouldPass(k) {
continue
}
}
result[k] = v
// Validate uint64 and float64 fields
switch val := v.(type) { switch val := v.(type) {
case uint64: case uint64:
// InfluxDB does not support writing uint64 // InfluxDB does not support writing uint64
if val < uint64(9223372036854775808) { if val < uint64(9223372036854775808) {
fields[k] = int64(val) result[k] = int64(val)
} else { } else {
fields[k] = int64(9223372036854775807) result[k] = int64(9223372036854775807)
} }
case float64: case float64:
// NaNs are invalid values in influxdb, skip measurement // NaNs are invalid values in influxdb, skip measurement
if math.IsNaN(val) || math.IsInf(val, 0) { if math.IsNaN(val) || math.IsInf(val, 0) {
if ac.debug { if ac.debug {
log.Printf("Measurement [%s] has a NaN or Inf field, skipping", log.Printf("Measurement [%s] field [%s] has a NaN or Inf "+
measurement) "field, skipping",
measurement, k)
} }
return continue
} }
} }
} }
fields = nil
if tags == nil { if len(result) == 0 {
tags = make(map[string]string) return
} }
var timestamp time.Time var timestamp time.Time
@ -106,19 +148,7 @@ func (ac *accumulator) AddFields(
measurement = ac.prefix + measurement measurement = ac.prefix + measurement
} }
if ac.pluginConfig != nil { pt, err := client.NewPoint(measurement, tags, result, timestamp)
if !ac.pluginConfig.Filter.ShouldPass(measurement) || !ac.pluginConfig.Filter.ShouldTagsPass(tags) {
return
}
}
for k, v := range ac.defaultTags {
if _, ok := tags[k]; !ok {
tags[k] = v
}
}
pt, err := client.NewPoint(measurement, tags, fields, timestamp)
if err != nil { if err != nil {
log.Printf("Error adding point [%s]: %s\n", measurement, err.Error()) log.Printf("Error adding point [%s]: %s\n", measurement, err.Error())
return return

View File

@ -104,7 +104,7 @@ func (a *Agent) gatherParallel(pointChan chan *client.Point) error {
acc := NewAccumulator(plugin.Config, pointChan) acc := NewAccumulator(plugin.Config, pointChan)
acc.SetDebug(a.Config.Agent.Debug) acc.SetDebug(a.Config.Agent.Debug)
acc.SetPrefix(plugin.Name + "_") // acc.SetPrefix(plugin.Name + "_")
acc.SetDefaultTags(a.Config.Tags) acc.SetDefaultTags(a.Config.Tags)
if err := plugin.Plugin.Gather(acc); err != nil { if err := plugin.Plugin.Gather(acc); err != nil {
@ -141,7 +141,7 @@ func (a *Agent) gatherSeparate(
acc := NewAccumulator(plugin.Config, pointChan) acc := NewAccumulator(plugin.Config, pointChan)
acc.SetDebug(a.Config.Agent.Debug) acc.SetDebug(a.Config.Agent.Debug)
acc.SetPrefix(plugin.Name + "_") // acc.SetPrefix(plugin.Name + "_")
acc.SetDefaultTags(a.Config.Tags) acc.SetDefaultTags(a.Config.Tags)
if err := plugin.Plugin.Gather(acc); err != nil { if err := plugin.Plugin.Gather(acc); err != nil {
@ -187,7 +187,7 @@ func (a *Agent) Test() error {
for _, plugin := range a.Config.Plugins { for _, plugin := range a.Config.Plugins {
acc := NewAccumulator(plugin.Config, pointChan) acc := NewAccumulator(plugin.Config, pointChan)
acc.SetDebug(true) acc.SetDebug(true)
acc.SetPrefix(plugin.Name + "_") // acc.SetPrefix(plugin.Name + "_")
fmt.Printf("* Plugin: %s, Collection 1\n", plugin.Name) fmt.Printf("* Plugin: %s, Collection 1\n", plugin.Name)
if plugin.Config.Interval != 0 { if plugin.Config.Interval != 0 {

View File

@ -97,7 +97,7 @@
# Mountpoints=["/"] # Mountpoints=["/"]
# Read metrics about disk IO by device # Read metrics about disk IO by device
[[plugins.io]] [[plugins.diskio]]
# By default, telegraf will gather stats for all devices including # By default, telegraf will gather stats for all devices including
# disk partitions. # disk partitions.
# Setting devices will restrict the stats to the specified devices. # Setting devices will restrict the stats to the specified devices.

View File

@ -112,9 +112,13 @@ type Filter struct {
// PluginConfig containing a name, interval, and filter // PluginConfig containing a name, interval, and filter
type PluginConfig struct { type PluginConfig struct {
Name string Name string
Filter Filter NameOverride string
Interval time.Duration MeasurementPrefix string
MeasurementSuffix string
Tags map[string]string
Filter Filter
Interval time.Duration
} }
// OutputConfig containing name and filter // OutputConfig containing name and filter
@ -142,12 +146,12 @@ func (ro *RunningOutput) FilterPoints(points []*client.Point) []*client.Point {
// ShouldPass returns true if the metric should pass, false if should drop // ShouldPass returns true if the metric should pass, false if should drop
// based on the drop/pass filter parameters // based on the drop/pass filter parameters
func (f Filter) ShouldPass(measurement string) bool { func (f Filter) ShouldPass(fieldkey string) bool {
if f.Pass != nil { if f.Pass != nil {
for _, pat := range f.Pass { for _, pat := range f.Pass {
// TODO remove HasPrefix check, leaving it for now for legacy support. // TODO remove HasPrefix check, leaving it for now for legacy support.
// Cam, 2015-12-07 // Cam, 2015-12-07
if strings.HasPrefix(measurement, pat) || internal.Glob(pat, measurement) { if strings.HasPrefix(fieldkey, pat) || internal.Glob(pat, fieldkey) {
return true return true
} }
} }
@ -158,7 +162,7 @@ func (f Filter) ShouldPass(measurement string) bool {
for _, pat := range f.Drop { for _, pat := range f.Drop {
// TODO remove HasPrefix check, leaving it for now for legacy support. // TODO remove HasPrefix check, leaving it for now for legacy support.
// Cam, 2015-12-07 // Cam, 2015-12-07
if strings.HasPrefix(measurement, pat) || internal.Glob(pat, measurement) { if strings.HasPrefix(fieldkey, pat) || internal.Glob(pat, fieldkey) {
return false return false
} }
} }
@ -628,7 +632,8 @@ func buildFilter(tbl *ast.Table) Filter {
return f return f
} }
// buildPlugin parses plugin specific items from the ast.Table, builds the filter and returns a // buildPlugin parses plugin specific items from the ast.Table,
// builds the filter and returns a
// PluginConfig to be inserted into RunningPlugin // PluginConfig to be inserted into RunningPlugin
func buildPlugin(name string, tbl *ast.Table) (*PluginConfig, error) { func buildPlugin(name string, tbl *ast.Table) (*PluginConfig, error) {
cp := &PluginConfig{Name: name} cp := &PluginConfig{Name: name}
@ -644,10 +649,47 @@ func buildPlugin(name string, tbl *ast.Table) (*PluginConfig, error) {
} }
} }
} }
if node, ok := tbl.Fields["name_prefix"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
cp.MeasurementPrefix = str.Value
}
}
}
if node, ok := tbl.Fields["name_suffix"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
cp.MeasurementSuffix = str.Value
}
}
}
if node, ok := tbl.Fields["name_override"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
cp.NameOverride = str.Value
}
}
}
cp.Tags = make(map[string]string)
if node, ok := tbl.Fields["tags"]; ok {
if subtbl, ok := node.(*ast.Table); ok {
if err := toml.UnmarshalTable(subtbl, cp.Tags); err != nil {
log.Printf("Could not parse tags for plugin %s\n", name)
}
}
}
delete(tbl.Fields, "name_prefix")
delete(tbl.Fields, "name_suffix")
delete(tbl.Fields, "name_override")
delete(tbl.Fields, "interval") delete(tbl.Fields, "interval")
delete(tbl.Fields, "tags")
cp.Filter = buildFilter(tbl) cp.Filter = buildFilter(tbl)
return cp, nil return cp, nil
} }
// buildOutput parses output specific items from the ast.Table, builds the filter and returns an // buildOutput parses output specific items from the ast.Table, builds the filter and returns an
@ -659,5 +701,4 @@ func buildOutput(name string, tbl *ast.Table) (*OutputConfig, error) {
Filter: buildFilter(tbl), Filter: buildFilter(tbl),
} }
return oc, nil return oc, nil
} }

View File

@ -105,7 +105,7 @@ urls = ["http://localhost/server-status?auto"]
drop = ["cpu_time"] drop = ["cpu_time"]
# Read metrics about disk usage by mount point # Read metrics about disk usage by mount point
[[plugins.disk]] [[plugins.diskio]]
# no configuration # no configuration
# Read metrics from one or many disque servers # Read metrics from one or many disque servers

View File

@ -2,6 +2,7 @@ package system
import ( import (
"fmt" "fmt"
"time"
"github.com/influxdb/telegraf/plugins" "github.com/influxdb/telegraf/plugins"
"github.com/shirou/gopsutil/cpu" "github.com/shirou/gopsutil/cpu"
@ -31,7 +32,7 @@ var sampleConfig = `
# Whether to report total system cpu stats or not # Whether to report total system cpu stats or not
totalcpu = true totalcpu = true
# Comment this line if you want the raw CPU time metrics # Comment this line if you want the raw CPU time metrics
drop = ["cpu_time*"] drop = ["time_*"]
` `
func (_ *CPUStats) SampleConfig() string { func (_ *CPUStats) SampleConfig() string {
@ -43,6 +44,7 @@ func (s *CPUStats) Gather(acc plugins.Accumulator) error {
if err != nil { if err != nil {
return fmt.Errorf("error getting CPU info: %s", err) return fmt.Errorf("error getting CPU info: %s", err)
} }
now := time.Now()
for i, cts := range times { for i, cts := range times {
tags := map[string]string{ tags := map[string]string{
@ -51,21 +53,24 @@ func (s *CPUStats) Gather(acc plugins.Accumulator) error {
total := totalCpuTime(cts) total := totalCpuTime(cts)
// Add total cpu numbers // Add cpu time metrics
add(acc, "time_user", cts.User, tags) fields := map[string]interface{}{
add(acc, "time_system", cts.System, tags) "time_user": cts.User,
add(acc, "time_idle", cts.Idle, tags) "time_system": cts.System,
add(acc, "time_nice", cts.Nice, tags) "time_idle": cts.Idle,
add(acc, "time_iowait", cts.Iowait, tags) "time_nice": cts.Nice,
add(acc, "time_irq", cts.Irq, tags) "time_iowait": cts.Iowait,
add(acc, "time_softirq", cts.Softirq, tags) "time_irq": cts.Irq,
add(acc, "time_steal", cts.Steal, tags) "time_softirq": cts.Softirq,
add(acc, "time_guest", cts.Guest, tags) "time_steal": cts.Steal,
add(acc, "time_guest_nice", cts.GuestNice, tags) "time_guest": cts.Guest,
"time_guest_nice": cts.GuestNice,
}
// Add in percentage // Add in percentage
if len(s.lastStats) == 0 { if len(s.lastStats) == 0 {
// If it's the 1st gather, can't get CPU stats yet acc.AddFields("cpu", fields, tags, now)
// If it's the 1st gather, can't get CPU Usage stats yet
continue continue
} }
lastCts := s.lastStats[i] lastCts := s.lastStats[i]
@ -81,17 +86,17 @@ func (s *CPUStats) Gather(acc plugins.Accumulator) error {
continue continue
} }
add(acc, "usage_user", 100*(cts.User-lastCts.User)/totalDelta, tags) fields["usage_user"] = 100 * (cts.User - lastCts.User) / totalDelta
add(acc, "usage_system", 100*(cts.System-lastCts.System)/totalDelta, tags) fields["usage_system"] = 100 * (cts.System - lastCts.System) / totalDelta
add(acc, "usage_idle", 100*(cts.Idle-lastCts.Idle)/totalDelta, tags) fields["usage_idle"] = 100 * (cts.Idle - lastCts.Idle) / totalDelta
add(acc, "usage_nice", 100*(cts.Nice-lastCts.Nice)/totalDelta, tags) fields["usage_nice"] = 100 * (cts.Nice - lastCts.Nice) / totalDelta
add(acc, "usage_iowait", 100*(cts.Iowait-lastCts.Iowait)/totalDelta, tags) fields["usage_iowait"] = 100 * (cts.Iowait - lastCts.Iowait) / totalDelta
add(acc, "usage_irq", 100*(cts.Irq-lastCts.Irq)/totalDelta, tags) fields["usage_irq"] = 100 * (cts.Irq - lastCts.Irq) / totalDelta
add(acc, "usage_softirq", 100*(cts.Softirq-lastCts.Softirq)/totalDelta, tags) fields["usage_softirq"] = 100 * (cts.Softirq - lastCts.Softirq) / totalDelta
add(acc, "usage_steal", 100*(cts.Steal-lastCts.Steal)/totalDelta, tags) fields["usage_steal"] = 100 * (cts.Steal - lastCts.Steal) / totalDelta
add(acc, "usage_guest", 100*(cts.Guest-lastCts.Guest)/totalDelta, tags) fields["usage_guest"] = 100 * (cts.Guest - lastCts.Guest) / totalDelta
add(acc, "usage_guest_nice", 100*(cts.GuestNice-lastCts.GuestNice)/totalDelta, tags) fields["usage_guest_nice"] = 100 * (cts.GuestNice - lastCts.GuestNice) / totalDelta
acc.AddFields("cpu", fields, tags, now)
} }
s.lastStats = times s.lastStats = times

106
plugins/system/cpu_test.go Normal file
View File

@ -0,0 +1,106 @@
package system
import (
"testing"
"github.com/influxdb/telegraf/testutil"
"github.com/shirou/gopsutil/cpu"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestCPUStats(t *testing.T) {
var mps MockPS
defer mps.AssertExpectations(t)
var acc testutil.Accumulator
cts := cpu.CPUTimesStat{
CPU: "cpu0",
User: 3.1,
System: 8.2,
Idle: 80.1,
Nice: 1.3,
Iowait: 0.2,
Irq: 0.1,
Softirq: 0.11,
Steal: 0.0511,
Guest: 8.1,
GuestNice: 0.324,
}
cts2 := cpu.CPUTimesStat{
CPU: "cpu0",
User: 11.4, // increased by 8.3
System: 10.9, // increased by 2.7
Idle: 158.8699, // increased by 78.7699 (for total increase of 100)
Nice: 2.5, // increased by 1.2
Iowait: 0.7, // increased by 0.5
Irq: 1.2, // increased by 1.1
Softirq: 0.31, // increased by 0.2
Steal: 0.2812, // increased by 0.0001
Guest: 12.9, // increased by 4.8
GuestNice: 2.524, // increased by 2.2
}
mps.On("CPUTimes").Return([]cpu.CPUTimesStat{cts}, nil)
cs := NewCPUStats(&mps)
cputags := map[string]string{
"cpu": "cpu0",
}
err := cs.Gather(&acc)
require.NoError(t, err)
numCPUPoints := len(acc.Points)
expectedCPUPoints := 10
assert.Equal(t, expectedCPUPoints, numCPUPoints)
// Computed values are checked with delta > 0 becasue of floating point arithmatic
// imprecision
assertContainsTaggedFloat(t, &acc, "time_user", 3.1, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_system", 8.2, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_idle", 80.1, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_nice", 1.3, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_iowait", 0.2, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_irq", 0.1, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_softirq", 0.11, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_steal", 0.0511, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_guest", 8.1, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_guest_nice", 0.324, 0, cputags)
mps2 := MockPS{}
mps2.On("CPUTimes").Return([]cpu.CPUTimesStat{cts2}, nil)
cs.ps = &mps2
// Should have added cpu percentages too
err = cs.Gather(&acc)
require.NoError(t, err)
numCPUPoints = len(acc.Points) - numCPUPoints
expectedCPUPoints = 20
assert.Equal(t, expectedCPUPoints, numCPUPoints)
assertContainsTaggedFloat(t, &acc, "time_user", 11.4, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_system", 10.9, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_idle", 158.8699, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_nice", 2.5, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_iowait", 0.7, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_irq", 1.2, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_softirq", 0.31, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_steal", 0.2812, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_guest", 12.9, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_guest_nice", 2.524, 0, cputags)
assertContainsTaggedFloat(t, &acc, "usage_user", 8.3, 0.0005, cputags)
assertContainsTaggedFloat(t, &acc, "usage_system", 2.7, 0.0005, cputags)
assertContainsTaggedFloat(t, &acc, "usage_idle", 78.7699, 0.0005, cputags)
assertContainsTaggedFloat(t, &acc, "usage_nice", 1.2, 0.0005, cputags)
assertContainsTaggedFloat(t, &acc, "usage_iowait", 0.5, 0.0005, cputags)
assertContainsTaggedFloat(t, &acc, "usage_irq", 1.1, 0.0005, cputags)
assertContainsTaggedFloat(t, &acc, "usage_softirq", 0.2, 0.0005, cputags)
assertContainsTaggedFloat(t, &acc, "usage_steal", 0.2301, 0.0005, cputags)
assertContainsTaggedFloat(t, &acc, "usage_guest", 4.8, 0.0005, cputags)
assertContainsTaggedFloat(t, &acc, "usage_guest_nice", 2.2, 0.0005, cputags)
}

View File

@ -50,12 +50,15 @@ func (s *DiskStats) Gather(acc plugins.Accumulator) error {
"path": du.Path, "path": du.Path,
"fstype": du.Fstype, "fstype": du.Fstype,
} }
acc.Add("total", du.Total, tags) fields := map[string]interface{}{
acc.Add("free", du.Free, tags) "total": du.Total,
acc.Add("used", du.Total-du.Free, tags) "free": du.Free,
acc.Add("inodes_total", du.InodesTotal, tags) "used": du.Total - du.Free,
acc.Add("inodes_free", du.InodesFree, tags) "inodes_total": du.InodesTotal,
acc.Add("inodes_used", du.InodesTotal-du.InodesFree, tags) "inodes_free": du.InodesFree,
"inodes_used": du.InodesTotal - du.InodesFree,
}
acc.AddFields("disk", fields, tags)
} }
return nil return nil
@ -115,13 +118,16 @@ func (s *DiskIOStats) Gather(acc plugins.Accumulator) error {
} }
} }
acc.Add("reads", io.ReadCount, tags) fields := map[string]interface{}{
acc.Add("writes", io.WriteCount, tags) "reads": io.ReadCount,
acc.Add("read_bytes", io.ReadBytes, tags) "writes": io.WriteCount,
acc.Add("write_bytes", io.WriteBytes, tags) "read_bytes": io.ReadBytes,
acc.Add("read_time", io.ReadTime, tags) "write_bytes": io.WriteBytes,
acc.Add("write_time", io.WriteTime, tags) "read_time": io.ReadTime,
acc.Add("io_time", io.IoTime, tags) "write_time": io.WriteTime,
"io_time": io.IoTime,
}
acc.AddFields("diskio", fields, tags)
} }
return nil return nil
@ -132,7 +138,7 @@ func init() {
return &DiskStats{ps: &systemPS{}} return &DiskStats{ps: &systemPS{}}
}) })
plugins.Add("io", func() plugins.Plugin { plugins.Add("diskio", func() plugins.Plugin {
return &DiskIOStats{ps: &systemPS{}} return &DiskIOStats{ps: &systemPS{}}
}) })
} }

161
plugins/system/disk_test.go Normal file
View File

@ -0,0 +1,161 @@
package system
import (
"testing"
"github.com/influxdb/telegraf/testutil"
"github.com/shirou/gopsutil/disk"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestDiskStats(t *testing.T) {
var mps MockPS
defer mps.AssertExpectations(t)
var acc testutil.Accumulator
var err error
du := []*disk.DiskUsageStat{
{
Path: "/",
Fstype: "ext4",
Total: 128,
Free: 23,
InodesTotal: 1234,
InodesFree: 234,
},
{
Path: "/home",
Fstype: "ext4",
Total: 256,
Free: 46,
InodesTotal: 2468,
InodesFree: 468,
},
}
mps.On("DiskUsage").Return(du, nil)
err = (&DiskStats{ps: &mps}).Gather(&acc)
require.NoError(t, err)
numDiskPoints := len(acc.Points)
expectedAllDiskPoints := 12
assert.Equal(t, expectedAllDiskPoints, numDiskPoints)
tags1 := map[string]string{
"path": "/",
"fstype": "ext4",
}
tags2 := map[string]string{
"path": "/home",
"fstype": "ext4",
}
assert.True(t, acc.CheckTaggedValue("total", uint64(128), tags1))
assert.True(t, acc.CheckTaggedValue("used", uint64(105), tags1))
assert.True(t, acc.CheckTaggedValue("free", uint64(23), tags1))
assert.True(t, acc.CheckTaggedValue("inodes_total", uint64(1234), tags1))
assert.True(t, acc.CheckTaggedValue("inodes_free", uint64(234), tags1))
assert.True(t, acc.CheckTaggedValue("inodes_used", uint64(1000), tags1))
assert.True(t, acc.CheckTaggedValue("total", uint64(256), tags2))
assert.True(t, acc.CheckTaggedValue("used", uint64(210), tags2))
assert.True(t, acc.CheckTaggedValue("free", uint64(46), tags2))
assert.True(t, acc.CheckTaggedValue("inodes_total", uint64(2468), tags2))
assert.True(t, acc.CheckTaggedValue("inodes_free", uint64(468), tags2))
assert.True(t, acc.CheckTaggedValue("inodes_used", uint64(2000), tags2))
// We expect 6 more DiskPoints to show up with an explicit match on "/"
// and /home not matching the /dev in Mountpoints
err = (&DiskStats{ps: &mps, Mountpoints: []string{"/", "/dev"}}).Gather(&acc)
assert.Equal(t, expectedAllDiskPoints+6, len(acc.Points))
// We should see all the diskpoints as Mountpoints includes both
// / and /home
err = (&DiskStats{ps: &mps, Mountpoints: []string{"/", "/home"}}).Gather(&acc)
assert.Equal(t, 2*expectedAllDiskPoints+6, len(acc.Points))
}
func TestDiskIOStats(t *testing.T) {
var mps MockPS
defer mps.AssertExpectations(t)
var acc testutil.Accumulator
var err error
diskio1 := disk.DiskIOCountersStat{
ReadCount: 888,
WriteCount: 5341,
ReadBytes: 100000,
WriteBytes: 200000,
ReadTime: 7123,
WriteTime: 9087,
Name: "sda1",
IoTime: 123552,
SerialNumber: "ab-123-ad",
}
diskio2 := disk.DiskIOCountersStat{
ReadCount: 444,
WriteCount: 2341,
ReadBytes: 200000,
WriteBytes: 400000,
ReadTime: 3123,
WriteTime: 6087,
Name: "sdb1",
IoTime: 246552,
SerialNumber: "bb-123-ad",
}
mps.On("DiskIO").Return(
map[string]disk.DiskIOCountersStat{"sda1": diskio1, "sdb1": diskio2},
nil)
err = (&DiskIOStats{ps: &mps}).Gather(&acc)
require.NoError(t, err)
numDiskIOPoints := len(acc.Points)
expectedAllDiskIOPoints := 14
assert.Equal(t, expectedAllDiskIOPoints, numDiskIOPoints)
dtags1 := map[string]string{
"name": "sda1",
"serial": "ab-123-ad",
}
dtags2 := map[string]string{
"name": "sdb1",
"serial": "bb-123-ad",
}
assert.True(t, acc.CheckTaggedValue("reads", uint64(888), dtags1))
assert.True(t, acc.CheckTaggedValue("writes", uint64(5341), dtags1))
assert.True(t, acc.CheckTaggedValue("read_bytes", uint64(100000), dtags1))
assert.True(t, acc.CheckTaggedValue("write_bytes", uint64(200000), dtags1))
assert.True(t, acc.CheckTaggedValue("read_time", uint64(7123), dtags1))
assert.True(t, acc.CheckTaggedValue("write_time", uint64(9087), dtags1))
assert.True(t, acc.CheckTaggedValue("io_time", uint64(123552), dtags1))
assert.True(t, acc.CheckTaggedValue("reads", uint64(444), dtags2))
assert.True(t, acc.CheckTaggedValue("writes", uint64(2341), dtags2))
assert.True(t, acc.CheckTaggedValue("read_bytes", uint64(200000), dtags2))
assert.True(t, acc.CheckTaggedValue("write_bytes", uint64(400000), dtags2))
assert.True(t, acc.CheckTaggedValue("read_time", uint64(3123), dtags2))
assert.True(t, acc.CheckTaggedValue("write_time", uint64(6087), dtags2))
assert.True(t, acc.CheckTaggedValue("io_time", uint64(246552), dtags2))
// We expect 7 more DiskIOPoints to show up with an explicit match on "sdb1"
// and serial should be missing from the tags with SkipSerialNumber set
err = (&DiskIOStats{ps: &mps, Devices: []string{"sdb1"}, SkipSerialNumber: true}).Gather(&acc)
assert.Equal(t, expectedAllDiskIOPoints+7, len(acc.Points))
dtags3 := map[string]string{
"name": "sdb1",
}
assert.True(t, acc.CheckTaggedValue("reads", uint64(444), dtags3))
assert.True(t, acc.CheckTaggedValue("writes", uint64(2341), dtags3))
assert.True(t, acc.CheckTaggedValue("read_bytes", uint64(200000), dtags3))
assert.True(t, acc.CheckTaggedValue("write_bytes", uint64(400000), dtags3))
assert.True(t, acc.CheckTaggedValue("read_time", uint64(3123), dtags3))
assert.True(t, acc.CheckTaggedValue("write_time", uint64(6087), dtags3))
assert.True(t, acc.CheckTaggedValue("io_time", uint64(246552), dtags3))
}

View File

@ -36,44 +36,47 @@ func (s *DockerStats) Gather(acc plugins.Accumulator) error {
cts := cont.CPU cts := cont.CPU
acc.Add("user", cts.User, tags) fields := map[string]interface{}{
acc.Add("system", cts.System, tags) "user": cts.User,
acc.Add("idle", cts.Idle, tags) "system": cts.System,
acc.Add("nice", cts.Nice, tags) "idle": cts.Idle,
acc.Add("iowait", cts.Iowait, tags) "nice": cts.Nice,
acc.Add("irq", cts.Irq, tags) "iowait": cts.Iowait,
acc.Add("softirq", cts.Softirq, tags) "irq": cts.Irq,
acc.Add("steal", cts.Steal, tags) "softirq": cts.Softirq,
acc.Add("guest", cts.Guest, tags) "steal": cts.Steal,
acc.Add("guest_nice", cts.GuestNice, tags) "guest": cts.Guest,
"guest_nice": cts.GuestNice,
acc.Add("cache", cont.Mem.Cache, tags) "cache": cont.Mem.Cache,
acc.Add("rss", cont.Mem.RSS, tags) "rss": cont.Mem.RSS,
acc.Add("rss_huge", cont.Mem.RSSHuge, tags) "rss_huge": cont.Mem.RSSHuge,
acc.Add("mapped_file", cont.Mem.MappedFile, tags) "mapped_file": cont.Mem.MappedFile,
acc.Add("swap_in", cont.Mem.Pgpgin, tags) "swap_in": cont.Mem.Pgpgin,
acc.Add("swap_out", cont.Mem.Pgpgout, tags) "swap_out": cont.Mem.Pgpgout,
acc.Add("page_fault", cont.Mem.Pgfault, tags) "page_fault": cont.Mem.Pgfault,
acc.Add("page_major_fault", cont.Mem.Pgmajfault, tags) "page_major_fault": cont.Mem.Pgmajfault,
acc.Add("inactive_anon", cont.Mem.InactiveAnon, tags) "inactive_anon": cont.Mem.InactiveAnon,
acc.Add("active_anon", cont.Mem.ActiveAnon, tags) "active_anon": cont.Mem.ActiveAnon,
acc.Add("inactive_file", cont.Mem.InactiveFile, tags) "inactive_file": cont.Mem.InactiveFile,
acc.Add("active_file", cont.Mem.ActiveFile, tags) "active_file": cont.Mem.ActiveFile,
acc.Add("unevictable", cont.Mem.Unevictable, tags) "unevictable": cont.Mem.Unevictable,
acc.Add("memory_limit", cont.Mem.HierarchicalMemoryLimit, tags) "memory_limit": cont.Mem.HierarchicalMemoryLimit,
acc.Add("total_cache", cont.Mem.TotalCache, tags) "total_cache": cont.Mem.TotalCache,
acc.Add("total_rss", cont.Mem.TotalRSS, tags) "total_rss": cont.Mem.TotalRSS,
acc.Add("total_rss_huge", cont.Mem.TotalRSSHuge, tags) "total_rss_huge": cont.Mem.TotalRSSHuge,
acc.Add("total_mapped_file", cont.Mem.TotalMappedFile, tags) "total_mapped_file": cont.Mem.TotalMappedFile,
acc.Add("total_swap_in", cont.Mem.TotalPgpgIn, tags) "total_swap_in": cont.Mem.TotalPgpgIn,
acc.Add("total_swap_out", cont.Mem.TotalPgpgOut, tags) "total_swap_out": cont.Mem.TotalPgpgOut,
acc.Add("total_page_fault", cont.Mem.TotalPgFault, tags) "total_page_fault": cont.Mem.TotalPgFault,
acc.Add("total_page_major_fault", cont.Mem.TotalPgMajFault, tags) "total_page_major_fault": cont.Mem.TotalPgMajFault,
acc.Add("total_inactive_anon", cont.Mem.TotalInactiveAnon, tags) "total_inactive_anon": cont.Mem.TotalInactiveAnon,
acc.Add("total_active_anon", cont.Mem.TotalActiveAnon, tags) "total_active_anon": cont.Mem.TotalActiveAnon,
acc.Add("total_inactive_file", cont.Mem.TotalInactiveFile, tags) "total_inactive_file": cont.Mem.TotalInactiveFile,
acc.Add("total_active_file", cont.Mem.TotalActiveFile, tags) "total_active_file": cont.Mem.TotalActiveFile,
acc.Add("total_unevictable", cont.Mem.TotalUnevictable, tags) "total_unevictable": cont.Mem.TotalUnevictable,
}
acc.AddFields("docker", fields, tags)
} }
return nil return nil

View File

@ -22,18 +22,17 @@ func (s *MemStats) Gather(acc plugins.Accumulator) error {
return fmt.Errorf("error getting virtual memory info: %s", err) return fmt.Errorf("error getting virtual memory info: %s", err)
} }
vmtags := map[string]string(nil) fields := map[string]interface{}{
"total": vm.Total,
acc.Add("total", vm.Total, vmtags) "available": vm.Available,
acc.Add("available", vm.Available, vmtags) "used": vm.Used,
acc.Add("used", vm.Used, vmtags) "free": vm.Free,
acc.Add("free", vm.Free, vmtags) "cached": vm.Cached,
acc.Add("cached", vm.Cached, vmtags) "buffered": vm.Buffers,
acc.Add("buffered", vm.Buffers, vmtags) "used_percent": 100 * float64(vm.Used) / float64(vm.Total),
acc.Add("used_percent", 100*float64(vm.Used)/float64(vm.Total), vmtags) "available_percent": 100 * float64(vm.Available) / float64(vm.Total),
acc.Add("available_percent", }
100*float64(vm.Available)/float64(vm.Total), acc.AddFields("mem", fields, nil)
vmtags)
return nil return nil
} }
@ -54,14 +53,15 @@ func (s *SwapStats) Gather(acc plugins.Accumulator) error {
return fmt.Errorf("error getting swap memory info: %s", err) return fmt.Errorf("error getting swap memory info: %s", err)
} }
swaptags := map[string]string(nil) fields := map[string]interface{}{
"total": swap.Total,
acc.Add("total", swap.Total, swaptags) "used": swap.Used,
acc.Add("used", swap.Used, swaptags) "free": swap.Free,
acc.Add("free", swap.Free, swaptags) "used_percent": swap.UsedPercent,
acc.Add("used_percent", swap.UsedPercent, swaptags) "in": swap.Sin,
acc.Add("in", swap.Sin, swaptags) "out": swap.Sout,
acc.Add("out", swap.Sout, swaptags) }
acc.AddFields("swap", fields, nil)
return nil return nil
} }

View File

@ -0,0 +1,73 @@
package system
import (
"testing"
"github.com/influxdb/telegraf/testutil"
"github.com/shirou/gopsutil/mem"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestMemStats(t *testing.T) {
var mps MockPS
var err error
defer mps.AssertExpectations(t)
var acc testutil.Accumulator
vms := &mem.VirtualMemoryStat{
Total: 12400,
Available: 7600,
Used: 5000,
Free: 1235,
// Active: 8134,
// Inactive: 1124,
// Buffers: 771,
// Cached: 4312,
// Wired: 134,
// Shared: 2142,
}
mps.On("VMStat").Return(vms, nil)
sms := &mem.SwapMemoryStat{
Total: 8123,
Used: 1232,
Free: 6412,
UsedPercent: 12.2,
Sin: 7,
Sout: 830,
}
mps.On("SwapStat").Return(sms, nil)
err = (&MemStats{&mps}).Gather(&acc)
require.NoError(t, err)
vmtags := map[string]string(nil)
assert.True(t, acc.CheckTaggedValue("total", uint64(12400), vmtags))
assert.True(t, acc.CheckTaggedValue("available", uint64(7600), vmtags))
assert.True(t, acc.CheckTaggedValue("used", uint64(5000), vmtags))
assert.True(t, acc.CheckTaggedValue("available_percent",
float64(7600)/float64(12400)*100,
vmtags))
assert.True(t, acc.CheckTaggedValue("used_percent",
float64(5000)/float64(12400)*100,
vmtags))
assert.True(t, acc.CheckTaggedValue("free", uint64(1235), vmtags))
acc.Points = nil
err = (&SwapStats{&mps}).Gather(&acc)
require.NoError(t, err)
swaptags := map[string]string(nil)
assert.NoError(t, acc.ValidateTaggedValue("total", uint64(8123), swaptags))
assert.NoError(t, acc.ValidateTaggedValue("used", uint64(1232), swaptags))
assert.NoError(t, acc.ValidateTaggedValue("used_percent", float64(12.2), swaptags))
assert.NoError(t, acc.ValidateTaggedValue("free", uint64(6412), swaptags))
assert.NoError(t, acc.ValidateTaggedValue("in", uint64(7), swaptags))
assert.NoError(t, acc.ValidateTaggedValue("out", uint64(830), swaptags))
}

View File

@ -70,14 +70,17 @@ func (s *NetIOStats) Gather(acc plugins.Accumulator) error {
"interface": io.Name, "interface": io.Name,
} }
acc.Add("bytes_sent", io.BytesSent, tags) fields := map[string]interface{}{
acc.Add("bytes_recv", io.BytesRecv, tags) "bytes_sent": io.BytesSent,
acc.Add("packets_sent", io.PacketsSent, tags) "bytes_recv": io.BytesRecv,
acc.Add("packets_recv", io.PacketsRecv, tags) "packets_sent": io.PacketsSent,
acc.Add("err_in", io.Errin, tags) "packets_recv": io.PacketsRecv,
acc.Add("err_out", io.Errout, tags) "err_in": io.Errin,
acc.Add("drop_in", io.Dropin, tags) "err_out": io.Errout,
acc.Add("drop_out", io.Dropout, tags) "drop_in": io.Dropin,
"drop_out": io.Dropout,
}
acc.AddFields("net", fields, tags)
} }
// Get system wide stats for different network protocols // Get system wide stats for different network protocols

View File

@ -0,0 +1,88 @@
package system
import (
"syscall"
"testing"
"github.com/influxdb/telegraf/testutil"
"github.com/shirou/gopsutil/net"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestNetStats(t *testing.T) {
var mps MockPS
var err error
defer mps.AssertExpectations(t)
var acc testutil.Accumulator
netio := net.NetIOCountersStat{
Name: "eth0",
BytesSent: 1123,
BytesRecv: 8734422,
PacketsSent: 781,
PacketsRecv: 23456,
Errin: 832,
Errout: 8,
Dropin: 7,
Dropout: 1,
}
mps.On("NetIO").Return([]net.NetIOCountersStat{netio}, nil)
netprotos := []net.NetProtoCountersStat{
net.NetProtoCountersStat{
Protocol: "Udp",
Stats: map[string]int64{
"InDatagrams": 4655,
"NoPorts": 892592,
},
},
}
mps.On("NetProto").Return(netprotos, nil)
netstats := []net.NetConnectionStat{
net.NetConnectionStat{
Type: syscall.SOCK_DGRAM,
},
net.NetConnectionStat{
Status: "ESTABLISHED",
},
net.NetConnectionStat{
Status: "ESTABLISHED",
},
net.NetConnectionStat{
Status: "CLOSE",
},
}
mps.On("NetConnections").Return(netstats, nil)
err = (&NetIOStats{ps: &mps, skipChecks: true}).Gather(&acc)
require.NoError(t, err)
ntags := map[string]string{
"interface": "eth0",
}
assert.NoError(t, acc.ValidateTaggedValue("bytes_sent", uint64(1123), ntags))
assert.NoError(t, acc.ValidateTaggedValue("bytes_recv", uint64(8734422), ntags))
assert.NoError(t, acc.ValidateTaggedValue("packets_sent", uint64(781), ntags))
assert.NoError(t, acc.ValidateTaggedValue("packets_recv", uint64(23456), ntags))
assert.NoError(t, acc.ValidateTaggedValue("err_in", uint64(832), ntags))
assert.NoError(t, acc.ValidateTaggedValue("err_out", uint64(8), ntags))
assert.NoError(t, acc.ValidateTaggedValue("drop_in", uint64(7), ntags))
assert.NoError(t, acc.ValidateTaggedValue("drop_out", uint64(1), ntags))
assert.NoError(t, acc.ValidateValue("udp_noports", int64(892592)))
assert.NoError(t, acc.ValidateValue("udp_indatagrams", int64(4655)))
acc.Points = nil
err = (&NetStats{&mps}).Gather(&acc)
require.NoError(t, err)
netstattags := map[string]string(nil)
assert.NoError(t, acc.ValidateTaggedValue("tcp_established", 2, netstattags))
assert.NoError(t, acc.ValidateTaggedValue("tcp_close", 1, netstattags))
assert.NoError(t, acc.ValidateTaggedValue("udp_socket", 1, netstattags))
}

View File

@ -42,19 +42,23 @@ func (s *NetStats) Gather(acc plugins.Accumulator) error {
} }
counts[netcon.Status] = c + 1 counts[netcon.Status] = c + 1
} }
acc.Add("tcp_established", counts["ESTABLISHED"], tags)
acc.Add("tcp_syn_sent", counts["SYN_SENT"], tags) fields := map[string]interface{}{
acc.Add("tcp_syn_recv", counts["SYN_RECV"], tags) "tcp_established": counts["ESTABLISHED"],
acc.Add("tcp_fin_wait1", counts["FIN_WAIT1"], tags) "tcp_syn_sent": counts["SYN_SENT"],
acc.Add("tcp_fin_wait2", counts["FIN_WAIT2"], tags) "tcp_syn_recv": counts["SYN_RECV"],
acc.Add("tcp_time_wait", counts["TIME_WAIT"], tags) "tcp_fin_wait1": counts["FIN_WAIT1"],
acc.Add("tcp_close", counts["CLOSE"], tags) "tcp_fin_wait2": counts["FIN_WAIT2"],
acc.Add("tcp_close_wait", counts["CLOSE_WAIT"], tags) "tcp_time_wait": counts["TIME_WAIT"],
acc.Add("tcp_last_ack", counts["LAST_ACK"], tags) "tcp_close": counts["CLOSE"],
acc.Add("tcp_listen", counts["LISTEN"], tags) "tcp_close_wait": counts["CLOSE_WAIT"],
acc.Add("tcp_closing", counts["CLOSING"], tags) "tcp_last_ack": counts["LAST_ACK"],
acc.Add("tcp_none", counts["NONE"], tags) "tcp_listen": counts["LISTEN"],
acc.Add("udp_socket", counts["UDP"], tags) "tcp_closing": counts["CLOSING"],
"tcp_none": counts["NONE"],
"udp_socket": counts["UDP"],
}
acc.AddFields("netstat", fields, tags)
return nil return nil
} }

View File

@ -1,12 +1,16 @@
package system package system
import ( import (
"fmt"
gonet "net" gonet "net"
"os" "os"
"reflect"
"strings" "strings"
"testing"
"github.com/influxdb/telegraf/internal" "github.com/influxdb/telegraf/internal"
"github.com/influxdb/telegraf/plugins" "github.com/influxdb/telegraf/plugins"
"github.com/influxdb/telegraf/testutil"
dc "github.com/fsouza/go-dockerclient" dc "github.com/fsouza/go-dockerclient"
"github.com/shirou/gopsutil/cpu" "github.com/shirou/gopsutil/cpu"
@ -14,6 +18,8 @@ import (
"github.com/shirou/gopsutil/docker" "github.com/shirou/gopsutil/docker"
"github.com/shirou/gopsutil/mem" "github.com/shirou/gopsutil/mem"
"github.com/shirou/gopsutil/net" "github.com/shirou/gopsutil/net"
"github.com/stretchr/testify/assert"
) )
type DockerContainerStat struct { type DockerContainerStat struct {
@ -166,3 +172,49 @@ func (s *systemPS) DockerStat() ([]*DockerContainerStat, error) {
return stats, nil return stats, nil
} }
// Asserts that a given accumulator contains a measurment of type float64 with
// specific tags within a certain distance of a given expected value. Asserts a failure
// if the measurement is of the wrong type, or if no matching measurements are found
//
// Paramaters:
// t *testing.T : Testing object to use
// acc testutil.Accumulator: Accumulator to examine
// measurement string : Name of the measurement to examine
// expectedValue float64 : Value to search for within the measurement
// delta float64 : Maximum acceptable distance of an accumulated value
// from the expectedValue parameter. Useful when
// floating-point arithmatic imprecision makes looking
// for an exact match impractical
// tags map[string]string : Tag set the found measurement must have. Set to nil to
// ignore the tag set.
func assertContainsTaggedFloat(
t *testing.T,
acc *testutil.Accumulator,
measurement string,
expectedValue float64,
delta float64,
tags map[string]string,
) {
var actualValue float64
for _, pt := range acc.Points {
if pt.Measurement == measurement {
if (tags == nil) || reflect.DeepEqual(pt.Tags, tags) {
if value, ok := pt.Fields["value"].(float64); ok {
actualValue = value
if (value >= expectedValue-delta) && (value <= expectedValue+delta) {
// Found the point, return without failing
return
}
} else {
assert.Fail(t, fmt.Sprintf("Measurement \"%s\" does not have type float64",
measurement))
}
}
}
}
msg := fmt.Sprintf("Could not find measurement \"%s\" with requested tags within %f of %f, Actual: %f",
measurement, delta, expectedValue, actualValue)
assert.Fail(t, msg)
}

View File

@ -37,11 +37,14 @@ func (_ *SystemStats) Gather(acc plugins.Accumulator) error {
return err return err
} }
acc.Add("load1", loadavg.Load1, nil) fields := map[string]interface{}{
acc.Add("load5", loadavg.Load5, nil) "load1": loadavg.Load1,
acc.Add("load15", loadavg.Load15, nil) "load5": loadavg.Load5,
acc.Add("uptime", float64(hostinfo.Uptime), nil) "load15": loadavg.Load15,
acc.Add("uptime_format", format_uptime(hostinfo.Uptime), nil) "uptime": float64(hostinfo.Uptime),
"uptime_format": format_uptime(hostinfo.Uptime),
}
acc.AddFields("system", fields, nil)
return nil return nil
} }

View File

@ -1,426 +0,0 @@
package system
import (
"fmt"
"reflect"
"syscall"
"testing"
"github.com/influxdb/telegraf/testutil"
"github.com/shirou/gopsutil/cpu"
"github.com/shirou/gopsutil/disk"
"github.com/shirou/gopsutil/mem"
"github.com/shirou/gopsutil/net"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestSystemStats_GenerateStats(t *testing.T) {
var mps MockPS
defer mps.AssertExpectations(t)
var acc testutil.Accumulator
cts := cpu.CPUTimesStat{
CPU: "cpu0",
User: 3.1,
System: 8.2,
Idle: 80.1,
Nice: 1.3,
Iowait: 0.2,
Irq: 0.1,
Softirq: 0.11,
Steal: 0.0511,
Guest: 8.1,
GuestNice: 0.324,
}
cts2 := cpu.CPUTimesStat{
CPU: "cpu0",
User: 11.4, // increased by 8.3
System: 10.9, // increased by 2.7
Idle: 158.8699, // increased by 78.7699 (for total increase of 100)
Nice: 2.5, // increased by 1.2
Iowait: 0.7, // increased by 0.5
Irq: 1.2, // increased by 1.1
Softirq: 0.31, // increased by 0.2
Steal: 0.2812, // increased by 0.0001
Guest: 12.9, // increased by 4.8
GuestNice: 2.524, // increased by 2.2
}
mps.On("CPUTimes").Return([]cpu.CPUTimesStat{cts}, nil)
du := []*disk.DiskUsageStat{
{
Path: "/",
Fstype: "ext4",
Total: 128,
Free: 23,
InodesTotal: 1234,
InodesFree: 234,
},
{
Path: "/home",
Fstype: "ext4",
Total: 256,
Free: 46,
InodesTotal: 2468,
InodesFree: 468,
},
}
mps.On("DiskUsage").Return(du, nil)
diskio1 := disk.DiskIOCountersStat{
ReadCount: 888,
WriteCount: 5341,
ReadBytes: 100000,
WriteBytes: 200000,
ReadTime: 7123,
WriteTime: 9087,
Name: "sda1",
IoTime: 123552,
SerialNumber: "ab-123-ad",
}
diskio2 := disk.DiskIOCountersStat{
ReadCount: 444,
WriteCount: 2341,
ReadBytes: 200000,
WriteBytes: 400000,
ReadTime: 3123,
WriteTime: 6087,
Name: "sdb1",
IoTime: 246552,
SerialNumber: "bb-123-ad",
}
mps.On("DiskIO").Return(map[string]disk.DiskIOCountersStat{"sda1": diskio1, "sdb1": diskio2}, nil)
netio := net.NetIOCountersStat{
Name: "eth0",
BytesSent: 1123,
BytesRecv: 8734422,
PacketsSent: 781,
PacketsRecv: 23456,
Errin: 832,
Errout: 8,
Dropin: 7,
Dropout: 1,
}
mps.On("NetIO").Return([]net.NetIOCountersStat{netio}, nil)
netprotos := []net.NetProtoCountersStat{
net.NetProtoCountersStat{
Protocol: "Udp",
Stats: map[string]int64{
"InDatagrams": 4655,
"NoPorts": 892592,
},
},
}
mps.On("NetProto").Return(netprotos, nil)
vms := &mem.VirtualMemoryStat{
Total: 12400,
Available: 7600,
Used: 5000,
Free: 1235,
// Active: 8134,
// Inactive: 1124,
// Buffers: 771,
// Cached: 4312,
// Wired: 134,
// Shared: 2142,
}
mps.On("VMStat").Return(vms, nil)
sms := &mem.SwapMemoryStat{
Total: 8123,
Used: 1232,
Free: 6412,
UsedPercent: 12.2,
Sin: 7,
Sout: 830,
}
mps.On("SwapStat").Return(sms, nil)
netstats := []net.NetConnectionStat{
net.NetConnectionStat{
Type: syscall.SOCK_DGRAM,
},
net.NetConnectionStat{
Status: "ESTABLISHED",
},
net.NetConnectionStat{
Status: "ESTABLISHED",
},
net.NetConnectionStat{
Status: "CLOSE",
},
}
mps.On("NetConnections").Return(netstats, nil)
cs := NewCPUStats(&mps)
cputags := map[string]string{
"cpu": "cpu0",
}
preCPUPoints := len(acc.Points)
err := cs.Gather(&acc)
require.NoError(t, err)
numCPUPoints := len(acc.Points) - preCPUPoints
expectedCPUPoints := 10
assert.Equal(t, expectedCPUPoints, numCPUPoints)
// Computed values are checked with delta > 0 becasue of floating point arithmatic
// imprecision
assertContainsTaggedFloat(t, &acc, "time_user", 3.1, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_system", 8.2, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_idle", 80.1, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_nice", 1.3, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_iowait", 0.2, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_irq", 0.1, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_softirq", 0.11, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_steal", 0.0511, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_guest", 8.1, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_guest_nice", 0.324, 0, cputags)
mps2 := MockPS{}
mps2.On("CPUTimes").Return([]cpu.CPUTimesStat{cts2}, nil)
cs.ps = &mps2
// Should have added cpu percentages too
err = cs.Gather(&acc)
require.NoError(t, err)
numCPUPoints = len(acc.Points) - (preCPUPoints + numCPUPoints)
expectedCPUPoints = 20
assert.Equal(t, expectedCPUPoints, numCPUPoints)
assertContainsTaggedFloat(t, &acc, "time_user", 11.4, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_system", 10.9, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_idle", 158.8699, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_nice", 2.5, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_iowait", 0.7, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_irq", 1.2, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_softirq", 0.31, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_steal", 0.2812, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_guest", 12.9, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_guest_nice", 2.524, 0, cputags)
assertContainsTaggedFloat(t, &acc, "usage_user", 8.3, 0.0005, cputags)
assertContainsTaggedFloat(t, &acc, "usage_system", 2.7, 0.0005, cputags)
assertContainsTaggedFloat(t, &acc, "usage_idle", 78.7699, 0.0005, cputags)
assertContainsTaggedFloat(t, &acc, "usage_nice", 1.2, 0.0005, cputags)
assertContainsTaggedFloat(t, &acc, "usage_iowait", 0.5, 0.0005, cputags)
assertContainsTaggedFloat(t, &acc, "usage_irq", 1.1, 0.0005, cputags)
assertContainsTaggedFloat(t, &acc, "usage_softirq", 0.2, 0.0005, cputags)
assertContainsTaggedFloat(t, &acc, "usage_steal", 0.2301, 0.0005, cputags)
assertContainsTaggedFloat(t, &acc, "usage_guest", 4.8, 0.0005, cputags)
assertContainsTaggedFloat(t, &acc, "usage_guest_nice", 2.2, 0.0005, cputags)
preDiskPoints := len(acc.Points)
err = (&DiskStats{ps: &mps}).Gather(&acc)
require.NoError(t, err)
numDiskPoints := len(acc.Points) - preDiskPoints
expectedAllDiskPoints := 12
assert.Equal(t, expectedAllDiskPoints, numDiskPoints)
tags1 := map[string]string{
"path": "/",
"fstype": "ext4",
}
tags2 := map[string]string{
"path": "/home",
"fstype": "ext4",
}
assert.True(t, acc.CheckTaggedValue("total", uint64(128), tags1))
assert.True(t, acc.CheckTaggedValue("used", uint64(105), tags1))
assert.True(t, acc.CheckTaggedValue("free", uint64(23), tags1))
assert.True(t, acc.CheckTaggedValue("inodes_total", uint64(1234), tags1))
assert.True(t, acc.CheckTaggedValue("inodes_free", uint64(234), tags1))
assert.True(t, acc.CheckTaggedValue("inodes_used", uint64(1000), tags1))
assert.True(t, acc.CheckTaggedValue("total", uint64(256), tags2))
assert.True(t, acc.CheckTaggedValue("used", uint64(210), tags2))
assert.True(t, acc.CheckTaggedValue("free", uint64(46), tags2))
assert.True(t, acc.CheckTaggedValue("inodes_total", uint64(2468), tags2))
assert.True(t, acc.CheckTaggedValue("inodes_free", uint64(468), tags2))
assert.True(t, acc.CheckTaggedValue("inodes_used", uint64(2000), tags2))
// We expect 6 more DiskPoints to show up with an explicit match on "/"
// and /home not matching the /dev in Mountpoints
err = (&DiskStats{ps: &mps, Mountpoints: []string{"/", "/dev"}}).Gather(&acc)
assert.Equal(t, preDiskPoints+expectedAllDiskPoints+6, len(acc.Points))
// We should see all the diskpoints as Mountpoints includes both
// / and /home
err = (&DiskStats{ps: &mps, Mountpoints: []string{"/", "/home"}}).Gather(&acc)
assert.Equal(t, preDiskPoints+2*expectedAllDiskPoints+6, len(acc.Points))
err = (&NetIOStats{ps: &mps, skipChecks: true}).Gather(&acc)
require.NoError(t, err)
ntags := map[string]string{
"interface": "eth0",
}
assert.NoError(t, acc.ValidateTaggedValue("bytes_sent", uint64(1123), ntags))
assert.NoError(t, acc.ValidateTaggedValue("bytes_recv", uint64(8734422), ntags))
assert.NoError(t, acc.ValidateTaggedValue("packets_sent", uint64(781), ntags))
assert.NoError(t, acc.ValidateTaggedValue("packets_recv", uint64(23456), ntags))
assert.NoError(t, acc.ValidateTaggedValue("err_in", uint64(832), ntags))
assert.NoError(t, acc.ValidateTaggedValue("err_out", uint64(8), ntags))
assert.NoError(t, acc.ValidateTaggedValue("drop_in", uint64(7), ntags))
assert.NoError(t, acc.ValidateTaggedValue("drop_out", uint64(1), ntags))
assert.NoError(t, acc.ValidateValue("udp_noports", int64(892592)))
assert.NoError(t, acc.ValidateValue("udp_indatagrams", int64(4655)))
preDiskIOPoints := len(acc.Points)
err = (&DiskIOStats{ps: &mps}).Gather(&acc)
require.NoError(t, err)
numDiskIOPoints := len(acc.Points) - preDiskIOPoints
expectedAllDiskIOPoints := 14
assert.Equal(t, expectedAllDiskIOPoints, numDiskIOPoints)
dtags1 := map[string]string{
"name": "sda1",
"serial": "ab-123-ad",
}
dtags2 := map[string]string{
"name": "sdb1",
"serial": "bb-123-ad",
}
assert.True(t, acc.CheckTaggedValue("reads", uint64(888), dtags1))
assert.True(t, acc.CheckTaggedValue("writes", uint64(5341), dtags1))
assert.True(t, acc.CheckTaggedValue("read_bytes", uint64(100000), dtags1))
assert.True(t, acc.CheckTaggedValue("write_bytes", uint64(200000), dtags1))
assert.True(t, acc.CheckTaggedValue("read_time", uint64(7123), dtags1))
assert.True(t, acc.CheckTaggedValue("write_time", uint64(9087), dtags1))
assert.True(t, acc.CheckTaggedValue("io_time", uint64(123552), dtags1))
assert.True(t, acc.CheckTaggedValue("reads", uint64(444), dtags2))
assert.True(t, acc.CheckTaggedValue("writes", uint64(2341), dtags2))
assert.True(t, acc.CheckTaggedValue("read_bytes", uint64(200000), dtags2))
assert.True(t, acc.CheckTaggedValue("write_bytes", uint64(400000), dtags2))
assert.True(t, acc.CheckTaggedValue("read_time", uint64(3123), dtags2))
assert.True(t, acc.CheckTaggedValue("write_time", uint64(6087), dtags2))
assert.True(t, acc.CheckTaggedValue("io_time", uint64(246552), dtags2))
// We expect 7 more DiskIOPoints to show up with an explicit match on "sdb1"
// and serial should be missing from the tags with SkipSerialNumber set
err = (&DiskIOStats{ps: &mps, Devices: []string{"sdb1"}, SkipSerialNumber: true}).Gather(&acc)
assert.Equal(t, preDiskIOPoints+expectedAllDiskIOPoints+7, len(acc.Points))
dtags3 := map[string]string{
"name": "sdb1",
}
assert.True(t, acc.CheckTaggedValue("reads", uint64(444), dtags3))
assert.True(t, acc.CheckTaggedValue("writes", uint64(2341), dtags3))
assert.True(t, acc.CheckTaggedValue("read_bytes", uint64(200000), dtags3))
assert.True(t, acc.CheckTaggedValue("write_bytes", uint64(400000), dtags3))
assert.True(t, acc.CheckTaggedValue("read_time", uint64(3123), dtags3))
assert.True(t, acc.CheckTaggedValue("write_time", uint64(6087), dtags3))
assert.True(t, acc.CheckTaggedValue("io_time", uint64(246552), dtags3))
err = (&MemStats{&mps}).Gather(&acc)
require.NoError(t, err)
vmtags := map[string]string(nil)
assert.True(t, acc.CheckTaggedValue("total", uint64(12400), vmtags))
assert.True(t, acc.CheckTaggedValue("available", uint64(7600), vmtags))
assert.True(t, acc.CheckTaggedValue("used", uint64(5000), vmtags))
assert.True(t, acc.CheckTaggedValue("available_percent",
float64(7600)/float64(12400)*100,
vmtags))
assert.True(t, acc.CheckTaggedValue("used_percent",
float64(5000)/float64(12400)*100,
vmtags))
assert.True(t, acc.CheckTaggedValue("free", uint64(1235), vmtags))
acc.Points = nil
err = (&SwapStats{&mps}).Gather(&acc)
require.NoError(t, err)
swaptags := map[string]string(nil)
assert.NoError(t, acc.ValidateTaggedValue("total", uint64(8123), swaptags))
assert.NoError(t, acc.ValidateTaggedValue("used", uint64(1232), swaptags))
assert.NoError(t, acc.ValidateTaggedValue("used_percent", float64(12.2), swaptags))
assert.NoError(t, acc.ValidateTaggedValue("free", uint64(6412), swaptags))
assert.NoError(t, acc.ValidateTaggedValue("in", uint64(7), swaptags))
assert.NoError(t, acc.ValidateTaggedValue("out", uint64(830), swaptags))
acc.Points = nil
err = (&NetStats{&mps}).Gather(&acc)
require.NoError(t, err)
netstattags := map[string]string(nil)
assert.NoError(t, acc.ValidateTaggedValue("tcp_established", 2, netstattags))
assert.NoError(t, acc.ValidateTaggedValue("tcp_close", 1, netstattags))
assert.NoError(t, acc.ValidateTaggedValue("udp_socket", 1, netstattags))
}
// Asserts that a given accumulator contains a measurment of type float64 with
// specific tags within a certain distance of a given expected value. Asserts a failure
// if the measurement is of the wrong type, or if no matching measurements are found
//
// Paramaters:
// t *testing.T : Testing object to use
// acc testutil.Accumulator: Accumulator to examine
// measurement string : Name of the measurement to examine
// expectedValue float64 : Value to search for within the measurement
// delta float64 : Maximum acceptable distance of an accumulated value
// from the expectedValue parameter. Useful when
// floating-point arithmatic imprecision makes looking
// for an exact match impractical
// tags map[string]string : Tag set the found measurement must have. Set to nil to
// ignore the tag set.
func assertContainsTaggedFloat(
t *testing.T,
acc *testutil.Accumulator,
measurement string,
expectedValue float64,
delta float64,
tags map[string]string,
) {
var actualValue float64
for _, pt := range acc.Points {
if pt.Measurement == measurement {
if (tags == nil) || reflect.DeepEqual(pt.Tags, tags) {
if value, ok := pt.Fields["value"].(float64); ok {
actualValue = value
if (value >= expectedValue-delta) && (value <= expectedValue+delta) {
// Found the point, return without failing
return
}
} else {
assert.Fail(t, fmt.Sprintf("Measurement \"%s\" does not have type float64",
measurement))
}
}
}
}
msg := fmt.Sprintf("Could not find measurement \"%s\" with requested tags within %f of %f, Actual: %f",
measurement, delta, expectedValue, actualValue)
assert.Fail(t, msg)
}