Compare commits

...

28 Commits

Author SHA1 Message Date
Cameron Sparr
069cb9766b Fix httpjson panic for nil request body 2015-12-21 13:42:32 -08:00
Cameron Sparr
8b54c73ae4 0.3.0 Removing internal parallelism: twemproxy and rabbitmq 2015-12-19 20:26:18 -07:00
Cameron Sparr
c9ef073fba 0.3.0 Removing internal parallelism: procstat 2015-12-19 16:06:21 -07:00
Cameron Sparr
15f66d7d1b 0.3.0 Removing internal parallelism: postgresql 2015-12-19 15:58:57 -07:00
Cameron Sparr
b0f79f43ec 0.3.0 Removing internal parallelism: httpjson and exec 2015-12-19 15:37:16 -07:00
Cameron Sparr
c584129758 0.3.0 outputs: riemann 2015-12-19 14:55:44 -07:00
Cameron Sparr
d1930c90b5 CHANGELOG update 2015-12-19 14:46:33 -07:00
Cameron Sparr
1e76e36df2 0.3.0 outputs: opentsdb 2015-12-19 14:30:37 -07:00
Cameron Sparr
a73b5257dc 0.3.0 output: librato 2015-12-19 14:19:43 -07:00
Cameron Sparr
c16be04ca7 0.3.0 output: datadog and amon 2015-12-19 14:08:31 -07:00
Cameron Sparr
5513275f2c 0.3.0: mongodb and jolokia 2015-12-19 13:31:22 -07:00
Cameron Sparr
3a7b1688a3 0.3.0: postgresql and phpfpm 2015-12-18 17:09:01 -07:00
Cameron Sparr
35d5c7bae3 0.3.0 HAProxy rebase 2015-12-18 16:39:45 -07:00
Cameron Sparr
60b6693ae3 0.3.0: rethinkdb 2015-12-18 16:39:45 -07:00
Cameron Sparr
c1e1f2ace4 0.3.0: zookeeper and zfs 2015-12-18 16:39:45 -07:00
Cameron Sparr
6698d195d8 backwards compatability for io->diskio change 2015-12-18 16:39:45 -07:00
Cameron Sparr
23b21ca86a 0.3.0: trig and twemproxy 2015-12-18 16:39:45 -07:00
Cameron Sparr
56e14e4731 0.3.0 redis & rabbitmq 2015-12-18 16:39:45 -07:00
Cameron Sparr
7deb339b76 0.3.0: prometheus & puppetagent 2015-12-18 16:39:45 -07:00
Cameron Sparr
0e55c371b7 0.3.0: procstat 2015-12-18 16:39:45 -07:00
Cameron Sparr
f284c8c154 0.3.0: ping, mysql, nginx 2015-12-18 16:39:45 -07:00
Cameron Sparr
e3b314cacb 0.3.0: mailchimp & memcached 2015-12-18 16:39:45 -07:00
Cameron Sparr
9fce094b36 0.3.0: leofs & lustre2 2015-12-18 16:39:45 -07:00
Cameron Sparr
319c363c8e 0.3.0 httpjson 2015-12-18 16:39:45 -07:00
Cameron Sparr
40d84accee 0.3.0: HAProxy 2015-12-18 16:39:45 -07:00
Cameron Sparr
3fc43df84e Breakout JSON flattening into internal package, exec & elasticsearch aggregation 2015-12-18 16:39:45 -07:00
Cameron Sparr
59f804d77a Updating aerospike & apache plugins for 0.3.0 2015-12-18 16:39:45 -07:00
Cameron Sparr
96d5f0d0de Updating system plugins for 0.3.0 2015-12-18 16:39:45 -07:00
61 changed files with 1621 additions and 1498 deletions

View File

@@ -1,3 +1,25 @@
## v0.3.0 [unreleased]
### Release Notes
- **breaking change** the `io` plugin has been renamed `diskio`
- **breaking change** Plugin measurements aggregated into a single measurement.
- **breaking change** `jolokia` plugin: must use global tag/drop/pass parameters
for configuration.
- `twemproxy` plugin: `prefix` option removed.
- `procstat` cpu measurements are now prepended with `cpu_time_` instead of
only `cpu_`
- The prometheus plugin schema has not been changed (measurements have not been
aggregated).
### Features
- Plugin measurements aggregated into a single measurement.
- Added ability to specify per-plugin tags
- Added ability to specify per-plugin measurement suffix and prefix.
(`name_prefix` and `name_suffix`)
- Added ability to override base plugin name. (`name_override`)
### Bugfixes
## v0.2.5 [unreleased] ## v0.2.5 [unreleased]
### Features ### Features

177
CONFIGURATION.md Normal file
View File

@@ -0,0 +1,177 @@
# Telegraf Configuration
## Plugin Configuration
There are some configuration options that are configurable per plugin:
* **name_override**: Override the base name of the measurement.
(Default is the name of the plugin).
* **name_prefix**: Specifies a prefix to attach to the measurement name.
* **name_suffix**: Specifies a suffix to attach to the measurement name.
* **tags**: A map of tags to apply to a specific plugin's measurements.
### Plugin Filters
There are also filters that can be configured per plugin:
* **pass**: An array of strings that is used to filter metrics generated by the
current plugin. Each string in the array is tested as a glob match against field names
and if it matches, the field is emitted.
* **drop**: The inverse of pass, if a field name matches, it is not emitted.
* **tagpass**: tag names and arrays of strings that are used to filter
measurements by the current plugin. Each string in the array is tested as a glob
match against the tag name, and if it matches the measurement is emitted.
* **tagdrop**: The inverse of tagpass. If a tag matches, the measurement is not emitted.
This is tested on measurements that have passed the tagpass test.
* **interval**: How often to gather this metric. Normal plugins use a single
global interval, but if one particular plugin should be run less or more often,
you can configure that here.
### Plugin Configuration Examples
This is a full working config that will output CPU data to an InfluxDB instance
at 192.168.59.103:8086, tagging measurements with dc="denver-1". It will output
measurements at a 10s interval and will collect per-cpu data, dropping any
fields which begin with `time_`.
```toml
[tags]
dc = "denver-1"
[agent]
interval = "10s"
# OUTPUTS
[outputs]
[[outputs.influxdb]]
url = "http://192.168.59.103:8086" # required.
database = "telegraf" # required.
precision = "s"
# PLUGINS
[plugins]
[[plugins.cpu]]
percpu = true
totalcpu = false
# filter all fields beginning with 'time_'
drop = ["time_*"]
```
### Plugin Config: tagpass and tagdrop
```toml
[plugins]
[[plugins.cpu]]
percpu = true
totalcpu = false
drop = ["cpu_time"]
# Don't collect CPU data for cpu6 & cpu7
[plugins.cpu.tagdrop]
cpu = [ "cpu6", "cpu7" ]
[[plugins.disk]]
[plugins.disk.tagpass]
# tagpass conditions are OR, not AND.
# If the (filesystem is ext4 or xfs) OR (the path is /opt or /home)
# then the metric passes
fstype = [ "ext4", "xfs" ]
# Globs can also be used on the tag values
path = [ "/opt", "/home*" ]
```
### Plugin Config: pass and drop
```toml
# Drop all metrics for guest & steal CPU usage
[[plugins.cpu]]
percpu = false
totalcpu = true
drop = ["usage_guest", "usage_steal"]
# Only store inode related metrics for disks
[[plugins.disk]]
pass = ["inodes*"]
```
### Plugin config: prefix, suffix, and override
This plugin will emit measurements with the name `cpu_total`
```toml
[[plugins.cpu]]
name_suffix = "_total"
percpu = false
totalcpu = true
```
This will emit measurements with the name `foobar`
```toml
[[plugins.cpu]]
name_override = "foobar"
percpu = false
totalcpu = true
```
### Plugin config: tags
This plugin will emit measurements with two additional tags: `tag1=foo` and
`tag2=bar`
```toml
[[plugins.cpu]]
percpu = false
totalcpu = true
[plugins.cpu.tags]
tag1 = "foo"
tag2 = "bar"
```
### Multiple plugins of the same type
Additional plugins (or outputs) of the same type can be specified,
just define more instances in the config file:
```toml
[[plugins.cpu]]
percpu = false
totalcpu = true
[[plugins.cpu]]
percpu = true
totalcpu = false
drop = ["cpu_time*"]
```
## Output Configuration
Telegraf also supports specifying multiple output sinks to send data to,
configuring each output sink is different, but examples can be
found by running `telegraf -sample-config`.
Outputs also support the same configurable options as plugins
(pass, drop, tagpass, tagdrop), added in 0.2.4
```toml
[[outputs.influxdb]]
urls = [ "http://localhost:8086" ]
database = "telegraf"
precision = "s"
# Drop all measurements that start with "aerospike"
drop = ["aerospike*"]
[[outputs.influxdb]]
urls = [ "http://localhost:8086" ]
database = "telegraf-aerospike-data"
precision = "s"
# Only accept aerospike data:
pass = ["aerospike*"]
[[outputs.influxdb]]
urls = [ "http://localhost:8086" ]
database = "telegraf-cpu0-data"
precision = "s"
# Only store measurements where the tag "cpu" matches the value "cpu0"
[outputs.influxdb.tagpass]
cpu = ["cpu0"]
```

133
README.md
View File

@@ -90,99 +90,10 @@ unit parser, e.g. "10s" for 10 seconds or "5m" for 5 minutes.
* **debug**: Set to true to gather and send metrics to STDOUT as well as * **debug**: Set to true to gather and send metrics to STDOUT as well as
InfluxDB. InfluxDB.
## Plugin Options ## Configuration
There are 5 configuration options that are configurable per plugin: See the [configuration guide](CONFIGURATION.md) for a rundown of the more advanced
configuration options.
* **pass**: An array of strings that is used to filter metrics generated by the
current plugin. Each string in the array is tested as a glob match against metric names
and if it matches, the metric is emitted.
* **drop**: The inverse of pass, if a metric name matches, it is not emitted.
* **tagpass**: tag names and arrays of strings that are used to filter metrics by the current plugin. Each string in the array is tested as a glob match against
the tag name, and if it matches the metric is emitted.
* **tagdrop**: The inverse of tagpass. If a tag matches, the metric is not emitted.
This is tested on metrics that have passed the tagpass test.
* **interval**: How often to gather this metric. Normal plugins use a single
global interval, but if one particular plugin should be run less or more often,
you can configure that here.
### Plugin Configuration Examples
This is a full working config that will output CPU data to an InfluxDB instance
at 192.168.59.103:8086, tagging measurements with dc="denver-1". It will output
measurements at a 10s interval and will collect per-cpu data, dropping any
measurements which begin with `cpu_time`.
```toml
[tags]
dc = "denver-1"
[agent]
interval = "10s"
# OUTPUTS
[outputs]
[[outputs.influxdb]]
url = "http://192.168.59.103:8086" # required.
database = "telegraf" # required.
precision = "s"
# PLUGINS
[plugins]
[[plugins.cpu]]
percpu = true
totalcpu = false
drop = ["cpu_time*"]
```
Below is how to configure `tagpass` and `tagdrop` parameters
```toml
[plugins]
[[plugins.cpu]]
percpu = true
totalcpu = false
drop = ["cpu_time"]
# Don't collect CPU data for cpu6 & cpu7
[plugins.cpu.tagdrop]
cpu = [ "cpu6", "cpu7" ]
[[plugins.disk]]
[plugins.disk.tagpass]
# tagpass conditions are OR, not AND.
# If the (filesystem is ext4 or xfs) OR (the path is /opt or /home)
# then the metric passes
fstype = [ "ext4", "xfs" ]
# Globs can also be used on the tag values
path = [ "/opt", "/home*" ]
```
Below is how to configure `pass` and `drop` parameters
```toml
# Drop all metrics for guest CPU usage
[[plugins.cpu]]
drop = [ "cpu_usage_guest" ]
# Only store inode related metrics for disks
[[plugins.disk]]
pass = [ "disk_inodes*" ]
```
Additional plugins (or outputs) of the same type can be specified,
just define more instances in the config file:
```toml
[[plugins.cpu]]
percpu = false
totalcpu = true
[[plugins.cpu]]
percpu = true
totalcpu = false
drop = ["cpu_time*"]
```
## Supported Plugins ## Supported Plugins
@@ -200,7 +111,7 @@ Telegraf currently has support for collecting metrics from:
* haproxy * haproxy
* httpjson (generic JSON-emitting http service plugin) * httpjson (generic JSON-emitting http service plugin)
* influxdb * influxdb
* jolokia (remote JMX with JSON over HTTP) * jolokia
* leofs * leofs
* lustre2 * lustre2
* mailchimp * mailchimp
@@ -223,10 +134,10 @@ Telegraf currently has support for collecting metrics from:
* system * system
* cpu * cpu
* mem * mem
* io
* net * net
* netstat * netstat
* disk * disk
* diskio
* swap * swap
## Supported Service Plugins ## Supported Service Plugins
@@ -239,40 +150,6 @@ Telegraf can collect metrics via the following services:
We'll be adding support for many more over the coming months. Read on if you We'll be adding support for many more over the coming months. Read on if you
want to add support for another service or third-party API. want to add support for another service or third-party API.
## Output options
Telegraf also supports specifying multiple output sinks to send data to,
configuring each output sink is different, but examples can be
found by running `telegraf -sample-config`.
Outputs also support the same configurable options as plugins
(pass, drop, tagpass, tagdrop), added in 0.2.4
```toml
[[outputs.influxdb]]
urls = [ "http://localhost:8086" ]
database = "telegraf"
precision = "s"
# Drop all measurements that start with "aerospike"
drop = ["aerospike*"]
[[outputs.influxdb]]
urls = [ "http://localhost:8086" ]
database = "telegraf-aerospike-data"
precision = "s"
# Only accept aerospike data:
pass = ["aerospike*"]
[[outputs.influxdb]]
urls = [ "http://localhost:8086" ]
database = "telegraf-cpu0-data"
precision = "s"
# Only store measurements where the tag "cpu" matches the value "cpu0"
[outputs.influxdb.tagpass]
cpu = ["cpu0"]
```
## Supported Outputs ## Supported Outputs
* influxdb * influxdb

View File

@@ -69,30 +69,72 @@ func (ac *accumulator) AddFields(
tags map[string]string, tags map[string]string,
t ...time.Time, t ...time.Time,
) { ) {
// Validate uint64 and float64 fields if !ac.pluginConfig.Filter.ShouldTagsPass(tags) {
return
}
// Override measurement name if set
if len(ac.pluginConfig.NameOverride) != 0 {
measurement = ac.pluginConfig.NameOverride
}
// Apply measurement prefix and suffix if set
if len(ac.pluginConfig.MeasurementPrefix) != 0 {
measurement = ac.pluginConfig.MeasurementPrefix + measurement
}
if len(ac.pluginConfig.MeasurementSuffix) != 0 {
measurement = measurement + ac.pluginConfig.MeasurementSuffix
}
if tags == nil {
tags = make(map[string]string)
}
// Apply plugin-wide tags if set
for k, v := range ac.pluginConfig.Tags {
if _, ok := tags[k]; !ok {
tags[k] = v
}
}
// Apply daemon-wide tags if set
for k, v := range ac.defaultTags {
if _, ok := tags[k]; !ok {
tags[k] = v
}
}
result := make(map[string]interface{})
for k, v := range fields { for k, v := range fields {
// Filter out any filtered fields
if ac.pluginConfig != nil {
if !ac.pluginConfig.Filter.ShouldPass(k) {
continue
}
}
result[k] = v
// Validate uint64 and float64 fields
switch val := v.(type) { switch val := v.(type) {
case uint64: case uint64:
// InfluxDB does not support writing uint64 // InfluxDB does not support writing uint64
if val < uint64(9223372036854775808) { if val < uint64(9223372036854775808) {
fields[k] = int64(val) result[k] = int64(val)
} else { } else {
fields[k] = int64(9223372036854775807) result[k] = int64(9223372036854775807)
} }
case float64: case float64:
// NaNs are invalid values in influxdb, skip measurement // NaNs are invalid values in influxdb, skip measurement
if math.IsNaN(val) || math.IsInf(val, 0) { if math.IsNaN(val) || math.IsInf(val, 0) {
if ac.debug { if ac.debug {
log.Printf("Measurement [%s] has a NaN or Inf field, skipping", log.Printf("Measurement [%s] field [%s] has a NaN or Inf "+
measurement) "field, skipping",
measurement, k)
} }
return continue
} }
} }
} }
fields = nil
if tags == nil { if len(result) == 0 {
tags = make(map[string]string) return
} }
var timestamp time.Time var timestamp time.Time
@@ -106,19 +148,7 @@ func (ac *accumulator) AddFields(
measurement = ac.prefix + measurement measurement = ac.prefix + measurement
} }
if ac.pluginConfig != nil { pt, err := client.NewPoint(measurement, tags, result, timestamp)
if !ac.pluginConfig.Filter.ShouldPass(measurement) || !ac.pluginConfig.Filter.ShouldTagsPass(tags) {
return
}
}
for k, v := range ac.defaultTags {
if _, ok := tags[k]; !ok {
tags[k] = v
}
}
pt, err := client.NewPoint(measurement, tags, fields, timestamp)
if err != nil { if err != nil {
log.Printf("Error adding point [%s]: %s\n", measurement, err.Error()) log.Printf("Error adding point [%s]: %s\n", measurement, err.Error())
return return

View File

@@ -104,7 +104,7 @@ func (a *Agent) gatherParallel(pointChan chan *client.Point) error {
acc := NewAccumulator(plugin.Config, pointChan) acc := NewAccumulator(plugin.Config, pointChan)
acc.SetDebug(a.Config.Agent.Debug) acc.SetDebug(a.Config.Agent.Debug)
acc.SetPrefix(plugin.Name + "_") // acc.SetPrefix(plugin.Name + "_")
acc.SetDefaultTags(a.Config.Tags) acc.SetDefaultTags(a.Config.Tags)
if err := plugin.Plugin.Gather(acc); err != nil { if err := plugin.Plugin.Gather(acc); err != nil {
@@ -141,7 +141,7 @@ func (a *Agent) gatherSeparate(
acc := NewAccumulator(plugin.Config, pointChan) acc := NewAccumulator(plugin.Config, pointChan)
acc.SetDebug(a.Config.Agent.Debug) acc.SetDebug(a.Config.Agent.Debug)
acc.SetPrefix(plugin.Name + "_") // acc.SetPrefix(plugin.Name + "_")
acc.SetDefaultTags(a.Config.Tags) acc.SetDefaultTags(a.Config.Tags)
if err := plugin.Plugin.Gather(acc); err != nil { if err := plugin.Plugin.Gather(acc); err != nil {
@@ -187,7 +187,7 @@ func (a *Agent) Test() error {
for _, plugin := range a.Config.Plugins { for _, plugin := range a.Config.Plugins {
acc := NewAccumulator(plugin.Config, pointChan) acc := NewAccumulator(plugin.Config, pointChan)
acc.SetDebug(true) acc.SetDebug(true)
acc.SetPrefix(plugin.Name + "_") // acc.SetPrefix(plugin.Name + "_")
fmt.Printf("* Plugin: %s, Collection 1\n", plugin.Name) fmt.Printf("* Plugin: %s, Collection 1\n", plugin.Name)
if plugin.Config.Interval != 0 { if plugin.Config.Interval != 0 {

View File

@@ -97,8 +97,8 @@
# Mountpoints=["/"] # Mountpoints=["/"]
# Read metrics about disk IO by device # Read metrics about disk IO by device
[[plugins.io]] [[plugins.diskio]]
# By default, telegraf will gather stats for all devices including # By default, telegraf will gather stats for all devices including
# disk partitions. # disk partitions.
# Setting devices will restrict the stats to the specified devcies. # Setting devices will restrict the stats to the specified devcies.
# Devices=["sda","sdb"] # Devices=["sda","sdb"]

View File

@@ -112,9 +112,13 @@ type Filter struct {
// PluginConfig containing a name, interval, and filter // PluginConfig containing a name, interval, and filter
type PluginConfig struct { type PluginConfig struct {
Name string Name string
Filter Filter NameOverride string
Interval time.Duration MeasurementPrefix string
MeasurementSuffix string
Tags map[string]string
Filter Filter
Interval time.Duration
} }
// OutputConfig containing name and filter // OutputConfig containing name and filter
@@ -142,12 +146,12 @@ func (ro *RunningOutput) FilterPoints(points []*client.Point) []*client.Point {
// ShouldPass returns true if the metric should pass, false if should drop // ShouldPass returns true if the metric should pass, false if should drop
// based on the drop/pass filter parameters // based on the drop/pass filter parameters
func (f Filter) ShouldPass(measurement string) bool { func (f Filter) ShouldPass(fieldkey string) bool {
if f.Pass != nil { if f.Pass != nil {
for _, pat := range f.Pass { for _, pat := range f.Pass {
// TODO remove HasPrefix check, leaving it for now for legacy support. // TODO remove HasPrefix check, leaving it for now for legacy support.
// Cam, 2015-12-07 // Cam, 2015-12-07
if strings.HasPrefix(measurement, pat) || internal.Glob(pat, measurement) { if strings.HasPrefix(fieldkey, pat) || internal.Glob(pat, fieldkey) {
return true return true
} }
} }
@@ -158,7 +162,7 @@ func (f Filter) ShouldPass(measurement string) bool {
for _, pat := range f.Drop { for _, pat := range f.Drop {
// TODO remove HasPrefix check, leaving it for now for legacy support. // TODO remove HasPrefix check, leaving it for now for legacy support.
// Cam, 2015-12-07 // Cam, 2015-12-07
if strings.HasPrefix(measurement, pat) || internal.Glob(pat, measurement) { if strings.HasPrefix(fieldkey, pat) || internal.Glob(pat, fieldkey) {
return false return false
} }
} }
@@ -527,6 +531,11 @@ func (c *Config) addPlugin(name string, table *ast.Table) error {
if len(c.PluginFilters) > 0 && !sliceContains(name, c.PluginFilters) { if len(c.PluginFilters) > 0 && !sliceContains(name, c.PluginFilters) {
return nil return nil
} }
// Legacy support renaming io plugin to diskio
if name == "io" {
name = "diskio"
}
creator, ok := plugins.Plugins[name] creator, ok := plugins.Plugins[name]
if !ok { if !ok {
return fmt.Errorf("Undefined but requested plugin: %s", name) return fmt.Errorf("Undefined but requested plugin: %s", name)
@@ -628,7 +637,8 @@ func buildFilter(tbl *ast.Table) Filter {
return f return f
} }
// buildPlugin parses plugin specific items from the ast.Table, builds the filter and returns a // buildPlugin parses plugin specific items from the ast.Table,
// builds the filter and returns a
// PluginConfig to be inserted into RunningPlugin // PluginConfig to be inserted into RunningPlugin
func buildPlugin(name string, tbl *ast.Table) (*PluginConfig, error) { func buildPlugin(name string, tbl *ast.Table) (*PluginConfig, error) {
cp := &PluginConfig{Name: name} cp := &PluginConfig{Name: name}
@@ -644,10 +654,47 @@ func buildPlugin(name string, tbl *ast.Table) (*PluginConfig, error) {
} }
} }
} }
if node, ok := tbl.Fields["name_prefix"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
cp.MeasurementPrefix = str.Value
}
}
}
if node, ok := tbl.Fields["name_suffix"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
cp.MeasurementSuffix = str.Value
}
}
}
if node, ok := tbl.Fields["name_override"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
cp.NameOverride = str.Value
}
}
}
cp.Tags = make(map[string]string)
if node, ok := tbl.Fields["tags"]; ok {
if subtbl, ok := node.(*ast.Table); ok {
if err := toml.UnmarshalTable(subtbl, cp.Tags); err != nil {
log.Printf("Could not parse tags for plugin %s\n", name)
}
}
}
delete(tbl.Fields, "name_prefix")
delete(tbl.Fields, "name_suffix")
delete(tbl.Fields, "name_override")
delete(tbl.Fields, "interval") delete(tbl.Fields, "interval")
delete(tbl.Fields, "tags")
cp.Filter = buildFilter(tbl) cp.Filter = buildFilter(tbl)
return cp, nil return cp, nil
} }
// buildOutput parses output specific items from the ast.Table, builds the filter and returns an // buildOutput parses output specific items from the ast.Table, builds the filter and returns an
@@ -659,5 +706,4 @@ func buildOutput(name string, tbl *ast.Table) (*OutputConfig, error) {
Filter: buildFilter(tbl), Filter: buildFilter(tbl),
} }
return oc, nil return oc, nil
} }

View File

@@ -105,7 +105,7 @@ urls = ["http://localhost/server-status?auto"]
drop = ["cpu_time"] drop = ["cpu_time"]
# Read metrics about disk usage by mount point # Read metrics about disk usage by mount point
[[plugins.disk]] [[plugins.diskio]]
# no configuration # no configuration
# Read metrics from one or many disque servers # Read metrics from one or many disque servers

View File

@@ -3,6 +3,7 @@ package internal
import ( import (
"bufio" "bufio"
"errors" "errors"
"fmt"
"os" "os"
"strings" "strings"
"time" "time"
@@ -27,6 +28,39 @@ func (d *Duration) UnmarshalTOML(b []byte) error {
var NotImplementedError = errors.New("not implemented yet") var NotImplementedError = errors.New("not implemented yet")
type JSONFlattener struct {
Fields map[string]interface{}
}
// FlattenJSON flattens nested maps/interfaces into a fields map
func (f *JSONFlattener) FlattenJSON(
fieldname string,
v interface{},
) error {
if f.Fields == nil {
f.Fields = make(map[string]interface{})
}
fieldname = strings.Trim(fieldname, "_")
switch t := v.(type) {
case map[string]interface{}:
for k, v := range t {
err := f.FlattenJSON(fieldname+"_"+k+"_", v)
if err != nil {
return err
}
}
case float64:
f.Fields[fieldname] = t
case bool, string, []interface{}:
// ignored types
return nil
default:
return fmt.Errorf("JSON Flattener: got unexpected type %T with value %v (%s)",
t, t, fieldname)
}
return nil
}
// ReadLines reads contents from a file and splits them by new lines. // ReadLines reads contents from a file and splits them by new lines.
// A convenience wrapper to ReadLinesOffsetN(filename, 0, -1). // A convenience wrapper to ReadLinesOffsetN(filename, 0, -1).
func ReadLines(filename string) ([]string, error) { func ReadLines(filename string) ([]string, error) {

View File

@@ -58,21 +58,26 @@ func (a *Amon) Write(points []*client.Point) error {
return nil return nil
} }
ts := TimeSeries{} ts := TimeSeries{}
var tempSeries = make([]*Metric, len(points)) tempSeries := []*Metric{}
var acceptablePoints = 0 metricCounter := 0
for _, pt := range points { for _, pt := range points {
metric := &Metric{ mname := strings.Replace(pt.Name(), "_", ".", -1)
Metric: strings.Replace(pt.Name(), "_", ".", -1), if amonPts, err := buildPoints(pt); err == nil {
} for fieldName, amonPt := range amonPts {
if p, err := buildPoint(pt); err == nil { metric := &Metric{
metric.Points[0] = p Metric: mname + "_" + strings.Replace(fieldName, "_", ".", -1),
tempSeries[acceptablePoints] = metric }
acceptablePoints += 1 metric.Points[0] = amonPt
tempSeries = append(tempSeries, metric)
metricCounter++
}
} else { } else {
log.Printf("unable to build Metric for %s, skipping\n", pt.Name()) log.Printf("unable to build Metric for %s, skipping\n", pt.Name())
} }
} }
ts.Series = make([]*Metric, acceptablePoints)
ts.Series = make([]*Metric, metricCounter)
copy(ts.Series, tempSeries[0:]) copy(ts.Series, tempSeries[0:])
tsBytes, err := json.Marshal(ts) tsBytes, err := json.Marshal(ts)
if err != nil { if err != nil {
@@ -110,13 +115,17 @@ func (a *Amon) authenticatedUrl() string {
return fmt.Sprintf("%s/api/system/%s", a.AmonInstance, a.ServerKey) return fmt.Sprintf("%s/api/system/%s", a.AmonInstance, a.ServerKey)
} }
func buildPoint(pt *client.Point) (Point, error) { func buildPoints(pt *client.Point) (map[string]Point, error) {
var p Point pts := make(map[string]Point)
if err := p.setValue(pt.Fields()["value"]); err != nil { for k, v := range pt.Fields() {
return p, fmt.Errorf("unable to extract value from Fields, %s", err.Error()) var p Point
if err := p.setValue(v); err != nil {
return pts, fmt.Errorf("unable to extract value from Fields, %s", err.Error())
}
p[0] = float64(pt.Time().Unix())
pts[k] = p
} }
p[0] = float64(pt.Time().Unix()) return pts, nil
return p, nil
} }
func (p *Point) setValue(v interface{}) error { func (p *Point) setValue(v interface{}) error {

View File

@@ -67,23 +67,26 @@ func (d *Datadog) Write(points []*client.Point) error {
return nil return nil
} }
ts := TimeSeries{} ts := TimeSeries{}
var tempSeries = make([]*Metric, len(points)) tempSeries := []*Metric{}
var acceptablePoints = 0 metricCounter := 0
for _, pt := range points { for _, pt := range points {
metric := &Metric{ mname := strings.Replace(pt.Name(), "_", ".", -1)
Metric: strings.Replace(pt.Name(), "_", ".", -1), if amonPts, err := buildPoints(pt); err == nil {
Tags: buildTags(pt.Tags()), for fieldName, amonPt := range amonPts {
Host: pt.Tags()["host"], metric := &Metric{
} Metric: mname + strings.Replace(fieldName, "_", ".", -1),
if p, err := buildPoint(pt); err == nil { }
metric.Points[0] = p metric.Points[0] = amonPt
tempSeries[acceptablePoints] = metric tempSeries = append(tempSeries, metric)
acceptablePoints += 1 metricCounter++
}
} else { } else {
log.Printf("unable to build Metric for %s, skipping\n", pt.Name()) log.Printf("unable to build Metric for %s, skipping\n", pt.Name())
} }
} }
ts.Series = make([]*Metric, acceptablePoints)
ts.Series = make([]*Metric, metricCounter)
copy(ts.Series, tempSeries[0:]) copy(ts.Series, tempSeries[0:])
tsBytes, err := json.Marshal(ts) tsBytes, err := json.Marshal(ts)
if err != nil { if err != nil {
@@ -123,13 +126,17 @@ func (d *Datadog) authenticatedUrl() string {
return fmt.Sprintf("%s?%s", d.apiUrl, q.Encode()) return fmt.Sprintf("%s?%s", d.apiUrl, q.Encode())
} }
func buildPoint(pt *client.Point) (Point, error) { func buildPoints(pt *client.Point) (map[string]Point, error) {
var p Point pts := make(map[string]Point)
if err := p.setValue(pt.Fields()["value"]); err != nil { for k, v := range pt.Fields() {
return p, fmt.Errorf("unable to extract value from Fields, %s", err.Error()) var p Point
if err := p.setValue(v); err != nil {
return pts, fmt.Errorf("unable to extract value from Fields, %s", err.Error())
}
p[0] = float64(pt.Time().Unix())
pts[k] = p
} }
p[0] = float64(pt.Time().Unix()) return pts, nil
return p, nil
} }
func buildTags(ptTags map[string]string) []string { func buildTags(ptTags map[string]string) []string {

View File

@@ -7,6 +7,7 @@ import (
"math/rand" "math/rand"
"net/url" "net/url"
"strings" "strings"
"time"
"github.com/influxdb/influxdb/client/v2" "github.com/influxdb/influxdb/client/v2"
"github.com/influxdb/telegraf/internal" "github.com/influxdb/telegraf/internal"
@@ -110,6 +111,7 @@ func (i *InfluxDB) Connect() error {
} }
i.conns = conns i.conns = conns
rand.Seed(time.Now().UnixNano())
return nil return nil
} }

View File

@@ -74,17 +74,21 @@ func (l *Librato) Write(points []*client.Point) error {
return nil return nil
} }
metrics := Metrics{} metrics := Metrics{}
var tempGauges = make([]*Gauge, len(points)) tempGauges := []*Gauge{}
var acceptablePoints = 0 metricCounter := 0
for _, pt := range points { for _, pt := range points {
if gauge, err := l.buildGauge(pt); err == nil { if gauges, err := l.buildGauges(pt); err == nil {
tempGauges[acceptablePoints] = gauge for _, gauge := range gauges {
acceptablePoints += 1 tempGauges = append(tempGauges, gauge)
metricCounter++
}
} else { } else {
log.Printf("unable to build Gauge for %s, skipping\n", pt.Name()) log.Printf("unable to build Gauge for %s, skipping\n", pt.Name())
} }
} }
metrics.Gauges = make([]*Gauge, acceptablePoints)
metrics.Gauges = make([]*Gauge, metricCounter)
copy(metrics.Gauges, tempGauges[0:]) copy(metrics.Gauges, tempGauges[0:])
metricsBytes, err := json.Marshal(metrics) metricsBytes, err := json.Marshal(metrics)
if err != nil { if err != nil {
@@ -118,22 +122,28 @@ func (l *Librato) Description() string {
return "Configuration for Librato API to send metrics to." return "Configuration for Librato API to send metrics to."
} }
func (l *Librato) buildGauge(pt *client.Point) (*Gauge, error) { func (l *Librato) buildGauges(pt *client.Point) ([]*Gauge, error) {
gauge := &Gauge{ gauges := []*Gauge{}
Name: pt.Name(), for fieldName, value := range pt.Fields() {
MeasureTime: pt.Time().Unix(), gauge := &Gauge{
} Name: pt.Name() + "_" + fieldName,
if err := gauge.setValue(pt.Fields()["value"]); err != nil { MeasureTime: pt.Time().Unix(),
return gauge, fmt.Errorf("unable to extract value from Fields, %s\n", err.Error()) }
} if err := gauge.setValue(value); err != nil {
if l.SourceTag != "" { return gauges, fmt.Errorf("unable to extract value from Fields, %s\n",
if source, ok := pt.Tags()[l.SourceTag]; ok { err.Error())
gauge.Source = source }
} else { if l.SourceTag != "" {
return gauge, fmt.Errorf("undeterminable Source type from Field, %s\n", l.SourceTag) if source, ok := pt.Tags()[l.SourceTag]; ok {
gauge.Source = source
} else {
return gauges,
fmt.Errorf("undeterminable Source type from Field, %s\n",
l.SourceTag)
}
} }
} }
return gauge, nil return gauges, nil
} }
func (g *Gauge) setValue(v interface{}) error { func (g *Gauge) setValue(v interface{}) error {

View File

@@ -62,7 +62,8 @@ func (o *OpenTSDB) Write(points []*client.Point) error {
if len(points) == 0 { if len(points) == 0 {
return nil return nil
} }
var timeNow = time.Now() now := time.Now()
// Send Data with telnet / socket communication // Send Data with telnet / socket communication
uri := fmt.Sprintf("%s:%d", o.Host, o.Port) uri := fmt.Sprintf("%s:%d", o.Host, o.Port)
tcpAddr, _ := net.ResolveTCPAddr("tcp", uri) tcpAddr, _ := net.ResolveTCPAddr("tcp", uri)
@@ -70,32 +71,21 @@ func (o *OpenTSDB) Write(points []*client.Point) error {
if err != nil { if err != nil {
return fmt.Errorf("OpenTSDB: Telnet connect fail") return fmt.Errorf("OpenTSDB: Telnet connect fail")
} }
defer connection.Close()
for _, pt := range points { for _, pt := range points {
metric := &MetricLine{ for _, metric := range buildMetrics(pt, now, o.Prefix) {
Metric: fmt.Sprintf("%s%s", o.Prefix, pt.Name()), messageLine := fmt.Sprintf("put %s %v %s %s\n",
Timestamp: timeNow.Unix(), metric.Metric, metric.Timestamp, metric.Value, metric.Tags)
} if o.Debug {
fmt.Print(messageLine)
metricValue, buildError := buildValue(pt) }
if buildError != nil { _, err := connection.Write([]byte(messageLine))
fmt.Printf("OpenTSDB: %s\n", buildError.Error()) if err != nil {
continue return fmt.Errorf("OpenTSDB: Telnet writing error %s", err.Error())
} }
metric.Value = metricValue
tagsSlice := buildTags(pt.Tags())
metric.Tags = fmt.Sprint(strings.Join(tagsSlice, " "))
messageLine := fmt.Sprintf("put %s %v %s %s\n", metric.Metric, metric.Timestamp, metric.Value, metric.Tags)
if o.Debug {
fmt.Print(messageLine)
}
_, err := connection.Write([]byte(messageLine))
if err != nil {
return fmt.Errorf("OpenTSDB: Telnet writing error %s", err.Error())
} }
} }
defer connection.Close()
return nil return nil
} }
@@ -111,9 +101,29 @@ func buildTags(ptTags map[string]string) []string {
return tags return tags
} }
func buildValue(pt *client.Point) (string, error) { func buildMetrics(pt *client.Point, now time.Time, prefix string) []*MetricLine {
ret := []*MetricLine{}
for fieldName, value := range pt.Fields() {
metric := &MetricLine{
Metric: fmt.Sprintf("%s%s_%s", prefix, pt.Name(), fieldName),
Timestamp: now.Unix(),
}
metricValue, buildError := buildValue(value)
if buildError != nil {
fmt.Printf("OpenTSDB: %s\n", buildError.Error())
continue
}
metric.Value = metricValue
tagsSlice := buildTags(pt.Tags())
metric.Tags = fmt.Sprint(strings.Join(tagsSlice, " "))
ret = append(ret, metric)
}
return ret
}
func buildValue(v interface{}) (string, error) {
var retv string var retv string
var v = pt.Fields()["value"]
switch p := v.(type) { switch p := v.(type) {
case int64: case int64:
retv = IntToString(int64(p)) retv = IntToString(int64(p))

View File

@@ -55,8 +55,10 @@ func (r *Riemann) Write(points []*client.Point) error {
var events []*raidman.Event var events []*raidman.Event
for _, p := range points { for _, p := range points {
ev := buildEvent(p) evs := buildEvents(p)
events = append(events, ev) for _, ev := range evs {
events = append(events, ev)
}
} }
var senderr = r.client.SendMulti(events) var senderr = r.client.SendMulti(events)
@@ -68,24 +70,28 @@ func (r *Riemann) Write(points []*client.Point) error {
return nil return nil
} }
func buildEvent(p *client.Point) *raidman.Event { func buildEvents(p *client.Point) []*raidman.Event {
host, ok := p.Tags()["host"] events := []*raidman.Event{}
if !ok { for fieldName, value := range p.Fields() {
hostname, err := os.Hostname() host, ok := p.Tags()["host"]
if err != nil { if !ok {
host = "unknown" hostname, err := os.Hostname()
} else { if err != nil {
host = hostname host = "unknown"
} else {
host = hostname
}
} }
event := &raidman.Event{
Host: host,
Service: p.Name() + "_" + fieldName,
Metric: value,
}
events = append(events, event)
} }
var event = &raidman.Event{ return events
Host: host,
Service: p.Name(),
Metric: p.Fields()["value"],
}
return event
} }
func init() { func init() {

View File

@@ -247,26 +247,32 @@ func get(key []byte, host string) (map[string]string, error) {
return data, err return data, err
} }
func readAerospikeStats(stats map[string]string, acc plugins.Accumulator, host, namespace string) { func readAerospikeStats(
stats map[string]string,
acc plugins.Accumulator,
host string,
namespace string,
) {
fields := make(map[string]interface{})
tags := map[string]string{
"aerospike_host": host,
"namespace": "_service",
}
if namespace != "" {
tags["namespace"] = namespace
}
for key, value := range stats { for key, value := range stats {
tags := map[string]string{
"aerospike_host": host,
"namespace": "_service",
}
if namespace != "" {
tags["namespace"] = namespace
}
// We are going to ignore all string based keys // We are going to ignore all string based keys
val, err := strconv.ParseInt(value, 10, 64) val, err := strconv.ParseInt(value, 10, 64)
if err == nil { if err == nil {
if strings.Contains(key, "-") { if strings.Contains(key, "-") {
key = strings.Replace(key, "-", "_", -1) key = strings.Replace(key, "-", "_", -1)
} }
acc.Add(key, val, tags) fields[key] = val
} }
} }
acc.AddFields("aerospike", fields, tags)
} }
func unmarshalMapInfo(infoMap map[string]string, key string) (map[string]string, error) { func unmarshalMapInfo(infoMap map[string]string, key string) (map[string]string, error) {

View File

@@ -72,32 +72,33 @@ func (n *Apache) gatherUrl(addr *url.URL, acc plugins.Accumulator) error {
tags := getTags(addr) tags := getTags(addr)
sc := bufio.NewScanner(resp.Body) sc := bufio.NewScanner(resp.Body)
fields := make(map[string]interface{})
for sc.Scan() { for sc.Scan() {
line := sc.Text() line := sc.Text()
if strings.Contains(line, ":") { if strings.Contains(line, ":") {
parts := strings.SplitN(line, ":", 2) parts := strings.SplitN(line, ":", 2)
key, part := strings.Replace(parts[0], " ", "", -1), strings.TrimSpace(parts[1]) key, part := strings.Replace(parts[0], " ", "", -1), strings.TrimSpace(parts[1])
switch key { switch key {
case "Scoreboard": case "Scoreboard":
n.gatherScores(part, acc, tags) for field, value := range n.gatherScores(part) {
fields[field] = value
}
default: default:
value, err := strconv.ParseFloat(part, 64) value, err := strconv.ParseFloat(part, 64)
if err != nil { if err != nil {
continue continue
} }
acc.Add(key, value, tags) fields[key] = value
} }
} }
} }
acc.AddFields("apache", fields, tags)
return nil return nil
} }
func (n *Apache) gatherScores(data string, acc plugins.Accumulator, tags map[string]string) { func (n *Apache) gatherScores(data string) map[string]interface{} {
var waiting, open int = 0, 0 var waiting, open int = 0, 0
var S, R, W, K, D, C, L, G, I int = 0, 0, 0, 0, 0, 0, 0, 0, 0 var S, R, W, K, D, C, L, G, I int = 0, 0, 0, 0, 0, 0, 0, 0, 0
@@ -129,17 +130,20 @@ func (n *Apache) gatherScores(data string, acc plugins.Accumulator, tags map[str
} }
} }
acc.Add("scboard_waiting", float64(waiting), tags) fields := map[string]interface{}{
acc.Add("scboard_starting", float64(S), tags) "scboard_waiting": float64(waiting),
acc.Add("scboard_reading", float64(R), tags) "scboard_starting": float64(S),
acc.Add("scboard_sending", float64(W), tags) "scboard_reading": float64(R),
acc.Add("scboard_keepalive", float64(K), tags) "scboard_sending": float64(W),
acc.Add("scboard_dnslookup", float64(D), tags) "scboard_keepalive": float64(K),
acc.Add("scboard_closing", float64(C), tags) "scboard_dnslookup": float64(D),
acc.Add("scboard_logging", float64(L), tags) "scboard_closing": float64(C),
acc.Add("scboard_finishing", float64(G), tags) "scboard_logging": float64(L),
acc.Add("scboard_idle_cleanup", float64(I), tags) "scboard_finishing": float64(G),
acc.Add("scboard_open", float64(open), tags) "scboard_idle_cleanup": float64(I),
"scboard_open": float64(open),
}
return fields
} }
// Get tag(s) for the apache plugin // Get tag(s) for the apache plugin

View File

@@ -81,7 +81,9 @@ func (b *Bcache) gatherBcache(bdev string, acc plugins.Accumulator) error {
} }
rawValue := strings.TrimSpace(string(file)) rawValue := strings.TrimSpace(string(file))
value := prettyToBytes(rawValue) value := prettyToBytes(rawValue)
acc.Add("dirty_data", value, tags)
fields := make(map[string]interface{})
fields["dirty_data"] = value
for _, path := range metrics { for _, path := range metrics {
key := filepath.Base(path) key := filepath.Base(path)
@@ -92,12 +94,13 @@ func (b *Bcache) gatherBcache(bdev string, acc plugins.Accumulator) error {
} }
if key == "bypassed" { if key == "bypassed" {
value := prettyToBytes(rawValue) value := prettyToBytes(rawValue)
acc.Add(key, value, tags) fields[key] = value
} else { } else {
value, _ := strconv.ParseUint(rawValue, 10, 64) value, _ := strconv.ParseUint(rawValue, 10, 64)
acc.Add(key, value, tags) fields[key] = value
} }
} }
acc.AddFields("bcache", fields, tags)
return nil return nil
} }
@@ -117,7 +120,7 @@ func (b *Bcache) Gather(acc plugins.Accumulator) error {
} }
bdevs, _ := filepath.Glob(bcachePath + "/*/bdev*") bdevs, _ := filepath.Glob(bcachePath + "/*/bdev*")
if len(bdevs) < 1 { if len(bdevs) < 1 {
return errors.New("Can't found any bcache device") return errors.New("Can't find any bcache device")
} }
for _, bdev := range bdevs { for _, bdev := range bdevs {
if restrictDevs { if restrictDevs {

View File

@@ -155,6 +155,8 @@ func (g *Disque) gatherServer(addr *url.URL, acc plugins.Accumulator) error {
var read int var read int
fields := make(map[string]interface{})
tags := map[string]string{"host": addr.String()}
for read < sz { for read < sz {
line, err := r.ReadString('\n') line, err := r.ReadString('\n')
if err != nil { if err != nil {
@@ -176,12 +178,11 @@ func (g *Disque) gatherServer(addr *url.URL, acc plugins.Accumulator) error {
continue continue
} }
tags := map[string]string{"host": addr.String()}
val := strings.TrimSpace(parts[1]) val := strings.TrimSpace(parts[1])
ival, err := strconv.ParseUint(val, 10, 64) ival, err := strconv.ParseUint(val, 10, 64)
if err == nil { if err == nil {
acc.Add(metric, ival, tags) fields[metric] = ival
continue continue
} }
@@ -190,9 +191,9 @@ func (g *Disque) gatherServer(addr *url.URL, acc plugins.Accumulator) error {
return err return err
} }
acc.Add(metric, fval, tags) fields[metric] = fval
} }
acc.AddFields("disque", fields, tags)
return nil return nil
} }

View File

@@ -31,8 +31,9 @@ contains `status`, `timed_out`, `number_of_nodes`, `number_of_data_nodes`,
`initializing_shards`, `unassigned_shards` fields `initializing_shards`, `unassigned_shards` fields
- elasticsearch_cluster_health - elasticsearch_cluster_health
contains `status`, `number_of_shards`, `number_of_replicas`, `active_primary_shards`, contains `status`, `number_of_shards`, `number_of_replicas`,
`active_shards`, `relocating_shards`, `initializing_shards`, `unassigned_shards` fields `active_primary_shards`, `active_shards`, `relocating_shards`,
`initializing_shards`, `unassigned_shards` fields
- elasticsearch_indices - elasticsearch_indices
#### node measurements: #### node measurements:
@@ -316,4 +317,4 @@ Transport statistics about sent and received bytes in cluster communication meas
- elasticsearch_transport_rx_count value=6 - elasticsearch_transport_rx_count value=6
- elasticsearch_transport_rx_size_in_bytes value=1380 - elasticsearch_transport_rx_size_in_bytes value=1380
- elasticsearch_transport_tx_count value=6 - elasticsearch_transport_tx_count value=6
- elasticsearch_transport_tx_size_in_bytes value=1380 - elasticsearch_transport_tx_size_in_bytes value=1380

View File

@@ -6,6 +6,7 @@ import (
"net/http" "net/http"
"time" "time"
"github.com/influxdb/telegraf/internal"
"github.com/influxdb/telegraf/plugins" "github.com/influxdb/telegraf/plugins"
) )
@@ -141,10 +142,14 @@ func (e *Elasticsearch) gatherNodeStats(url string, acc plugins.Accumulator) err
"breakers": n.Breakers, "breakers": n.Breakers,
} }
now := time.Now()
for p, s := range stats { for p, s := range stats {
if err := e.parseInterface(acc, p, tags, s); err != nil { f := internal.JSONFlattener{}
err := f.FlattenJSON("", s)
if err != nil {
return err return err
} }
acc.AddFields("elasticsearch_"+p, f.Fields, tags, now)
} }
} }
return nil return nil
@@ -168,7 +173,7 @@ func (e *Elasticsearch) gatherClusterStats(url string, acc plugins.Accumulator)
"unassigned_shards": clusterStats.UnassignedShards, "unassigned_shards": clusterStats.UnassignedShards,
} }
acc.AddFields( acc.AddFields(
"cluster_health", "elasticsearch_cluster_health",
clusterFields, clusterFields,
map[string]string{"name": clusterStats.ClusterName}, map[string]string{"name": clusterStats.ClusterName},
measurementTime, measurementTime,
@@ -186,7 +191,7 @@ func (e *Elasticsearch) gatherClusterStats(url string, acc plugins.Accumulator)
"unassigned_shards": health.UnassignedShards, "unassigned_shards": health.UnassignedShards,
} }
acc.AddFields( acc.AddFields(
"indices", "elasticsearch_indices",
indexFields, indexFields,
map[string]string{"index": name}, map[string]string{"index": name},
measurementTime, measurementTime,
@@ -205,7 +210,8 @@ func (e *Elasticsearch) gatherData(url string, v interface{}) error {
// NOTE: we are not going to read/discard r.Body under the assumption we'd prefer // NOTE: we are not going to read/discard r.Body under the assumption we'd prefer
// to let the underlying transport close the connection and re-establish a new one for // to let the underlying transport close the connection and re-establish a new one for
// future calls. // future calls.
return fmt.Errorf("elasticsearch: API responded with status-code %d, expected %d", r.StatusCode, http.StatusOK) return fmt.Errorf("elasticsearch: API responded with status-code %d, expected %d",
r.StatusCode, http.StatusOK)
} }
if err = json.NewDecoder(r.Body).Decode(v); err != nil { if err = json.NewDecoder(r.Body).Decode(v); err != nil {
return err return err
@@ -213,25 +219,6 @@ func (e *Elasticsearch) gatherData(url string, v interface{}) error {
return nil return nil
} }
func (e *Elasticsearch) parseInterface(acc plugins.Accumulator, prefix string, tags map[string]string, v interface{}) error {
switch t := v.(type) {
case map[string]interface{}:
for k, v := range t {
if err := e.parseInterface(acc, prefix+"_"+k, tags, v); err != nil {
return err
}
}
case float64:
acc.Add(prefix, t, tags)
case bool, string, []interface{}:
// ignored types
return nil
default:
return fmt.Errorf("elasticsearch: got unexpected type %T with value %v (%s)", t, t, prefix)
}
return nil
}
func init() { func init() {
plugins.Add("elasticsearch", func() plugins.Plugin { plugins.Add("elasticsearch", func() plugins.Plugin {
return NewElasticsearch() return NewElasticsearch()

View File

@@ -3,59 +3,38 @@ package exec
import ( import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"github.com/gonuts/go-shellquote"
"github.com/influxdb/telegraf/plugins"
"math"
"os/exec" "os/exec"
"strings"
"sync" "github.com/gonuts/go-shellquote"
"time"
"github.com/influxdb/telegraf/internal"
"github.com/influxdb/telegraf/plugins"
) )
const sampleConfig = ` const sampleConfig = `
# specify commands via an array of tables
[[plugins.exec.commands]]
# the command to run # the command to run
command = "/usr/bin/mycollector --foo=bar" command = "/usr/bin/mycollector --foo=bar"
# name of the command (used as a prefix for measurements) # name of the command (used as a prefix for measurements)
name = "mycollector" name = "mycollector"
# Only run this command if it has been at least this many
# seconds since it last ran
interval = 10
` `
type Exec struct { type Exec struct {
Commands []*Command Command string
runner Runner Name string
clock Clock
}
type Command struct { runner Runner
Command string
Name string
Interval int
lastRunAt time.Time
} }
type Runner interface { type Runner interface {
Run(*Command) ([]byte, error) Run(*Exec) ([]byte, error)
}
type Clock interface {
Now() time.Time
} }
type CommandRunner struct{} type CommandRunner struct{}
type RealClock struct{} func (c CommandRunner) Run(e *Exec) ([]byte, error) {
split_cmd, err := shellquote.Split(e.Command)
func (c CommandRunner) Run(command *Command) ([]byte, error) {
command.lastRunAt = time.Now()
split_cmd, err := shellquote.Split(command.Command)
if err != nil || len(split_cmd) == 0 { if err != nil || len(split_cmd) == 0 {
return nil, fmt.Errorf("exec: unable to parse command, %s", err) return nil, fmt.Errorf("exec: unable to parse command, %s", err)
} }
@@ -65,18 +44,14 @@ func (c CommandRunner) Run(command *Command) ([]byte, error) {
cmd.Stdout = &out cmd.Stdout = &out
if err := cmd.Run(); err != nil { if err := cmd.Run(); err != nil {
return nil, fmt.Errorf("exec: %s for command '%s'", err, command.Command) return nil, fmt.Errorf("exec: %s for command '%s'", err, e.Command)
} }
return out.Bytes(), nil return out.Bytes(), nil
} }
func (c RealClock) Now() time.Time {
return time.Now()
}
func NewExec() *Exec { func NewExec() *Exec {
return &Exec{runner: CommandRunner{}, clock: RealClock{}} return &Exec{runner: CommandRunner{}}
} }
func (e *Exec) SampleConfig() string { func (e *Exec) SampleConfig() string {
@@ -88,73 +63,34 @@ func (e *Exec) Description() string {
} }
func (e *Exec) Gather(acc plugins.Accumulator) error { func (e *Exec) Gather(acc plugins.Accumulator) error {
var wg sync.WaitGroup out, err := e.runner.Run(e)
if err != nil {
errorChannel := make(chan error, len(e.Commands)) return err
for _, c := range e.Commands {
wg.Add(1)
go func(c *Command, acc plugins.Accumulator) {
defer wg.Done()
err := e.gatherCommand(c, acc)
if err != nil {
errorChannel <- err
}
}(c, acc)
} }
wg.Wait() var jsonOut interface{}
close(errorChannel) err = json.Unmarshal(out, &jsonOut)
if err != nil {
// Get all errors and return them as one giant error return fmt.Errorf("exec: unable to parse output of '%s' as JSON, %s",
errorStrings := []string{} e.Command, err)
for err := range errorChannel {
errorStrings = append(errorStrings, err.Error())
} }
if len(errorStrings) == 0 { f := internal.JSONFlattener{}
return nil err = f.FlattenJSON("", jsonOut)
if err != nil {
return err
} }
return errors.New(strings.Join(errorStrings, "\n"))
}
func (e *Exec) gatherCommand(c *Command, acc plugins.Accumulator) error { var msrmnt_name string
secondsSinceLastRun := 0.0 if e.Name == "" {
msrmnt_name = "exec"
if c.lastRunAt.Unix() == 0 { // means time is uninitialized
secondsSinceLastRun = math.Inf(1)
} else { } else {
secondsSinceLastRun = (e.clock.Now().Sub(c.lastRunAt)).Seconds() msrmnt_name = "exec_" + e.Name
}
if secondsSinceLastRun >= float64(c.Interval) {
out, err := e.runner.Run(c)
if err != nil {
return err
}
var jsonOut interface{}
err = json.Unmarshal(out, &jsonOut)
if err != nil {
return fmt.Errorf("exec: unable to parse output of '%s' as JSON, %s", c.Command, err)
}
processResponse(acc, c.Name, map[string]string{}, jsonOut)
} }
acc.AddFields(msrmnt_name, f.Fields, nil)
return nil return nil
} }
func processResponse(acc plugins.Accumulator, prefix string, tags map[string]string, v interface{}) {
switch t := v.(type) {
case map[string]interface{}:
for k, v := range t {
processResponse(acc, prefix+"_"+k, tags, v)
}
case float64:
acc.Add(prefix, v, tags)
}
}
func init() { func init() {
plugins.Add("exec", func() plugins.Plugin { plugins.Add("exec", func() plugins.Plugin {
return NewExec() return NewExec()

View File

@@ -9,6 +9,7 @@ import (
"net/url" "net/url"
"strconv" "strconv"
"sync" "sync"
"time"
) )
//CSV format: https://cbonte.github.io/haproxy-dconv/configuration-1.5.html#9.1 //CSV format: https://cbonte.github.io/haproxy-dconv/configuration-1.5.html#9.1
@@ -152,210 +153,208 @@ func (g *haproxy) gatherServer(addr string, acc plugins.Accumulator) error {
return fmt.Errorf("Unable to get valid stat result from '%s': %s", addr, err) return fmt.Errorf("Unable to get valid stat result from '%s': %s", addr, err)
} }
importCsvResult(res.Body, acc, u.Host) return importCsvResult(res.Body, acc, u.Host)
return nil
} }
func importCsvResult(r io.Reader, acc plugins.Accumulator, host string) ([][]string, error) { func importCsvResult(r io.Reader, acc plugins.Accumulator, host string) error {
csv := csv.NewReader(r) csv := csv.NewReader(r)
result, err := csv.ReadAll() result, err := csv.ReadAll()
now := time.Now()
for _, row := range result { for _, row := range result {
fields := make(map[string]interface{})
tags := map[string]string{
"server": host,
"proxy": row[HF_PXNAME],
"sv": row[HF_SVNAME],
}
for field, v := range row { for field, v := range row {
tags := map[string]string{
"server": host,
"proxy": row[HF_PXNAME],
"sv": row[HF_SVNAME],
}
switch field { switch field {
case HF_QCUR: case HF_QCUR:
ival, err := strconv.ParseUint(v, 10, 64) ival, err := strconv.ParseUint(v, 10, 64)
if err == nil { if err == nil {
acc.Add("qcur", ival, tags) fields["qcur"] = ival
} }
case HF_QMAX: case HF_QMAX:
ival, err := strconv.ParseUint(v, 10, 64) ival, err := strconv.ParseUint(v, 10, 64)
if err == nil { if err == nil {
acc.Add("qmax", ival, tags) fields["qmax"] = ival
} }
case HF_SCUR: case HF_SCUR:
ival, err := strconv.ParseUint(v, 10, 64) ival, err := strconv.ParseUint(v, 10, 64)
if err == nil { if err == nil {
acc.Add("scur", ival, tags) fields["scur"] = ival
} }
case HF_SMAX: case HF_SMAX:
ival, err := strconv.ParseUint(v, 10, 64) ival, err := strconv.ParseUint(v, 10, 64)
if err == nil { if err == nil {
acc.Add("smax", ival, tags) fields["smax"] = ival
} }
case HF_STOT: case HF_STOT:
ival, err := strconv.ParseUint(v, 10, 64) ival, err := strconv.ParseUint(v, 10, 64)
if err == nil { if err == nil {
acc.Add("stot", ival, tags) fields["stot"] = ival
} }
case HF_BIN: case HF_BIN:
ival, err := strconv.ParseUint(v, 10, 64) ival, err := strconv.ParseUint(v, 10, 64)
if err == nil { if err == nil {
acc.Add("bin", ival, tags) fields["bin"] = ival
} }
case HF_BOUT: case HF_BOUT:
ival, err := strconv.ParseUint(v, 10, 64) ival, err := strconv.ParseUint(v, 10, 64)
if err == nil { if err == nil {
acc.Add("bout", ival, tags) fields["bout"] = ival
} }
case HF_DREQ: case HF_DREQ:
ival, err := strconv.ParseUint(v, 10, 64) ival, err := strconv.ParseUint(v, 10, 64)
if err == nil { if err == nil {
acc.Add("dreq", ival, tags) fields["dreq"] = ival
} }
case HF_DRESP: case HF_DRESP:
ival, err := strconv.ParseUint(v, 10, 64) ival, err := strconv.ParseUint(v, 10, 64)
if err == nil { if err == nil {
acc.Add("dresp", ival, tags) fields["dresp"] = ival
} }
case HF_EREQ: case HF_EREQ:
ival, err := strconv.ParseUint(v, 10, 64) ival, err := strconv.ParseUint(v, 10, 64)
if err == nil { if err == nil {
acc.Add("ereq", ival, tags) fields["ereq"] = ival
} }
case HF_ECON: case HF_ECON:
ival, err := strconv.ParseUint(v, 10, 64) ival, err := strconv.ParseUint(v, 10, 64)
if err == nil { if err == nil {
acc.Add("econ", ival, tags) fields["econ"] = ival
} }
case HF_ERESP: case HF_ERESP:
ival, err := strconv.ParseUint(v, 10, 64) ival, err := strconv.ParseUint(v, 10, 64)
if err == nil { if err == nil {
acc.Add("eresp", ival, tags) fields["eresp"] = ival
} }
case HF_WRETR: case HF_WRETR:
ival, err := strconv.ParseUint(v, 10, 64) ival, err := strconv.ParseUint(v, 10, 64)
if err == nil { if err == nil {
acc.Add("wretr", ival, tags) fields["wretr"] = ival
} }
case HF_WREDIS: case HF_WREDIS:
ival, err := strconv.ParseUint(v, 10, 64) ival, err := strconv.ParseUint(v, 10, 64)
if err == nil { if err == nil {
acc.Add("wredis", ival, tags) fields["wredis"] = ival
} }
case HF_ACT: case HF_ACT:
ival, err := strconv.ParseUint(v, 10, 64) ival, err := strconv.ParseUint(v, 10, 64)
if err == nil { if err == nil {
acc.Add("active_servers", ival, tags) fields["active_servers"] = ival
} }
case HF_BCK: case HF_BCK:
ival, err := strconv.ParseUint(v, 10, 64) ival, err := strconv.ParseUint(v, 10, 64)
if err == nil { if err == nil {
acc.Add("backup_servers", ival, tags) fields["backup_servers"] = ival
} }
case HF_DOWNTIME: case HF_DOWNTIME:
ival, err := strconv.ParseUint(v, 10, 64) ival, err := strconv.ParseUint(v, 10, 64)
if err == nil { if err == nil {
acc.Add("downtime", ival, tags) fields["downtime"] = ival
} }
case HF_THROTTLE: case HF_THROTTLE:
ival, err := strconv.ParseUint(v, 10, 64) ival, err := strconv.ParseUint(v, 10, 64)
if err == nil { if err == nil {
acc.Add("throttle", ival, tags) fields["throttle"] = ival
} }
case HF_LBTOT: case HF_LBTOT:
ival, err := strconv.ParseUint(v, 10, 64) ival, err := strconv.ParseUint(v, 10, 64)
if err == nil { if err == nil {
acc.Add("lbtot", ival, tags) fields["lbtot"] = ival
} }
case HF_RATE: case HF_RATE:
ival, err := strconv.ParseUint(v, 10, 64) ival, err := strconv.ParseUint(v, 10, 64)
if err == nil { if err == nil {
acc.Add("rate", ival, tags) fields["rate"] = ival
} }
case HF_RATE_MAX: case HF_RATE_MAX:
ival, err := strconv.ParseUint(v, 10, 64) ival, err := strconv.ParseUint(v, 10, 64)
if err == nil { if err == nil {
acc.Add("rate_max", ival, tags) fields["rate_max"] = ival
} }
case HF_CHECK_DURATION: case HF_CHECK_DURATION:
ival, err := strconv.ParseUint(v, 10, 64) ival, err := strconv.ParseUint(v, 10, 64)
if err == nil { if err == nil {
acc.Add("check_duration", ival, tags) fields["check_duration"] = ival
} }
case HF_HRSP_1xx: case HF_HRSP_1xx:
ival, err := strconv.ParseUint(v, 10, 64) ival, err := strconv.ParseUint(v, 10, 64)
if err == nil { if err == nil {
acc.Add("http_response.1xx", ival, tags) fields["http_response.1xx"] = ival
} }
case HF_HRSP_2xx: case HF_HRSP_2xx:
ival, err := strconv.ParseUint(v, 10, 64) ival, err := strconv.ParseUint(v, 10, 64)
if err == nil { if err == nil {
acc.Add("http_response.2xx", ival, tags) fields["http_response.2xx"] = ival
} }
case HF_HRSP_3xx: case HF_HRSP_3xx:
ival, err := strconv.ParseUint(v, 10, 64) ival, err := strconv.ParseUint(v, 10, 64)
if err == nil { if err == nil {
acc.Add("http_response.3xx", ival, tags) fields["http_response.3xx"] = ival
} }
case HF_HRSP_4xx: case HF_HRSP_4xx:
ival, err := strconv.ParseUint(v, 10, 64) ival, err := strconv.ParseUint(v, 10, 64)
if err == nil { if err == nil {
acc.Add("http_response.4xx", ival, tags) fields["http_response.4xx"] = ival
} }
case HF_HRSP_5xx: case HF_HRSP_5xx:
ival, err := strconv.ParseUint(v, 10, 64) ival, err := strconv.ParseUint(v, 10, 64)
if err == nil { if err == nil {
acc.Add("http_response.5xx", ival, tags) fields["http_response.5xx"] = ival
} }
case HF_REQ_RATE: case HF_REQ_RATE:
ival, err := strconv.ParseUint(v, 10, 64) ival, err := strconv.ParseUint(v, 10, 64)
if err == nil { if err == nil {
acc.Add("req_rate", ival, tags) fields["req_rate"] = ival
} }
case HF_REQ_RATE_MAX: case HF_REQ_RATE_MAX:
ival, err := strconv.ParseUint(v, 10, 64) ival, err := strconv.ParseUint(v, 10, 64)
if err == nil { if err == nil {
acc.Add("req_rate_max", ival, tags) fields["req_rate_max"] = ival
} }
case HF_REQ_TOT: case HF_REQ_TOT:
ival, err := strconv.ParseUint(v, 10, 64) ival, err := strconv.ParseUint(v, 10, 64)
if err == nil { if err == nil {
acc.Add("req_tot", ival, tags) fields["req_tot"] = ival
} }
case HF_CLI_ABRT: case HF_CLI_ABRT:
ival, err := strconv.ParseUint(v, 10, 64) ival, err := strconv.ParseUint(v, 10, 64)
if err == nil { if err == nil {
acc.Add("cli_abort", ival, tags) fields["cli_abort"] = ival
} }
case HF_SRV_ABRT: case HF_SRV_ABRT:
ival, err := strconv.ParseUint(v, 10, 64) ival, err := strconv.ParseUint(v, 10, 64)
if err == nil { if err == nil {
acc.Add("srv_abort", ival, tags) fields["srv_abort"] = ival
} }
case HF_QTIME: case HF_QTIME:
ival, err := strconv.ParseUint(v, 10, 64) ival, err := strconv.ParseUint(v, 10, 64)
if err == nil { if err == nil {
acc.Add("qtime", ival, tags) fields["qtime"] = ival
} }
case HF_CTIME: case HF_CTIME:
ival, err := strconv.ParseUint(v, 10, 64) ival, err := strconv.ParseUint(v, 10, 64)
if err == nil { if err == nil {
acc.Add("ctime", ival, tags) fields["ctime"] = ival
} }
case HF_RTIME: case HF_RTIME:
ival, err := strconv.ParseUint(v, 10, 64) ival, err := strconv.ParseUint(v, 10, 64)
if err == nil { if err == nil {
acc.Add("rtime", ival, tags) fields["rtime"] = ival
} }
case HF_TTIME: case HF_TTIME:
ival, err := strconv.ParseUint(v, 10, 64) ival, err := strconv.ParseUint(v, 10, 64)
if err == nil { if err == nil {
acc.Add("ttime", ival, tags) fields["ttime"] = ival
} }
} }
} }
acc.AddFields("haproxy", fields, tags, now)
} }
return result, err return err
} }
func init() { func init() {

View File

@@ -10,20 +10,17 @@ import (
"strings" "strings"
"sync" "sync"
"github.com/influxdb/telegraf/internal"
"github.com/influxdb/telegraf/plugins" "github.com/influxdb/telegraf/plugins"
) )
type HttpJson struct { type HttpJson struct {
Services []Service
client HTTPClient
}
type Service struct {
Name string Name string
Servers []string Servers []string
Method string Method string
TagKeys []string TagKeys []string
Parameters map[string]string Parameters map[string]string
client HTTPClient
} }
type HTTPClient interface { type HTTPClient interface {
@@ -47,31 +44,28 @@ func (c RealHTTPClient) MakeRequest(req *http.Request) (*http.Response, error) {
} }
var sampleConfig = ` var sampleConfig = `
# Specify services via an array of tables # a name for the service being polled
[[plugins.httpjson.services]] name = "webserver_stats"
# a name for the service being polled # URL of each server in the service's cluster
name = "webserver_stats" servers = [
"http://localhost:9999/stats/",
"http://localhost:9998/stats/",
]
# URL of each server in the service's cluster # HTTP method to use (case-sensitive)
servers = [ method = "GET"
"http://localhost:9999/stats/",
"http://localhost:9998/stats/",
]
# HTTP method to use (case-sensitive) # List of tag names to extract from top-level of JSON server response
method = "GET" # tag_keys = [
# "my_tag_1",
# "my_tag_2"
# ]
# List of tag names to extract from top-level of JSON server response # HTTP parameters (all values must be strings)
# tag_keys = [ [plugins.httpjson.parameters]
# "my_tag_1", event_type = "cpu_spike"
# "my_tag_2" threshold = "0.75"
# ]
# HTTP parameters (all values must be strings)
[plugins.httpjson.services.parameters]
event_type = "cpu_spike"
threshold = "0.75"
` `
func (h *HttpJson) SampleConfig() string { func (h *HttpJson) SampleConfig() string {
@@ -86,22 +80,16 @@ func (h *HttpJson) Description() string {
func (h *HttpJson) Gather(acc plugins.Accumulator) error { func (h *HttpJson) Gather(acc plugins.Accumulator) error {
var wg sync.WaitGroup var wg sync.WaitGroup
totalServers := 0 errorChannel := make(chan error, len(h.Servers))
for _, service := range h.Services {
totalServers += len(service.Servers)
}
errorChannel := make(chan error, totalServers)
for _, service := range h.Services { for _, server := range h.Servers {
for _, server := range service.Servers { wg.Add(1)
wg.Add(1) go func(server string) {
go func(service Service, server string) { defer wg.Done()
defer wg.Done() if err := h.gatherServer(acc, server); err != nil {
if err := h.gatherServer(acc, service, server); err != nil { errorChannel <- err
errorChannel <- err }
} }(server)
}(service, server)
}
} }
wg.Wait() wg.Wait()
@@ -129,10 +117,9 @@ func (h *HttpJson) Gather(acc plugins.Accumulator) error {
// error: Any error that may have occurred // error: Any error that may have occurred
func (h *HttpJson) gatherServer( func (h *HttpJson) gatherServer(
acc plugins.Accumulator, acc plugins.Accumulator,
service Service,
serverURL string, serverURL string,
) error { ) error {
resp, err := h.sendRequest(service, serverURL) resp, err := h.sendRequest(serverURL)
if err != nil { if err != nil {
return err return err
} }
@@ -146,7 +133,7 @@ func (h *HttpJson) gatherServer(
"server": serverURL, "server": serverURL,
} }
for _, tag := range service.TagKeys { for _, tag := range h.TagKeys {
switch v := jsonOut[tag].(type) { switch v := jsonOut[tag].(type) {
case string: case string:
tags[tag] = v tags[tag] = v
@@ -154,7 +141,19 @@ func (h *HttpJson) gatherServer(
delete(jsonOut, tag) delete(jsonOut, tag)
} }
processResponse(acc, service.Name, tags, jsonOut) f := internal.JSONFlattener{}
err = f.FlattenJSON("", jsonOut)
if err != nil {
return err
}
var msrmnt_name string
if h.Name == "" {
msrmnt_name = "httpjson"
} else {
msrmnt_name = "httpjson_" + h.Name
}
acc.AddFields(msrmnt_name, f.Fields, nil)
return nil return nil
} }
@@ -165,7 +164,7 @@ func (h *HttpJson) gatherServer(
// Returns: // Returns:
// string: body of the response // string: body of the response
// error : Any error that may have occurred // error : Any error that may have occurred
func (h *HttpJson) sendRequest(service Service, serverURL string) (string, error) { func (h *HttpJson) sendRequest(serverURL string) (string, error) {
// Prepare URL // Prepare URL
requestURL, err := url.Parse(serverURL) requestURL, err := url.Parse(serverURL)
if err != nil { if err != nil {
@@ -173,13 +172,13 @@ func (h *HttpJson) sendRequest(service Service, serverURL string) (string, error
} }
params := url.Values{} params := url.Values{}
for k, v := range service.Parameters { for k, v := range h.Parameters {
params.Add(k, v) params.Add(k, v)
} }
requestURL.RawQuery = params.Encode() requestURL.RawQuery = params.Encode()
// Create + send request // Create + send request
req, err := http.NewRequest(service.Method, requestURL.String(), nil) req, err := http.NewRequest(h.Method, requestURL.String(), nil)
if err != nil { if err != nil {
return "", err return "", err
} }
@@ -188,6 +187,7 @@ func (h *HttpJson) sendRequest(service Service, serverURL string) (string, error
if err != nil { if err != nil {
return "", err return "", err
} }
defer resp.Body.Close()
defer resp.Body.Close() defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body) body, err := ioutil.ReadAll(resp.Body)
@@ -209,23 +209,6 @@ func (h *HttpJson) sendRequest(service Service, serverURL string) (string, error
return string(body), err return string(body), err
} }
// Flattens the map generated from the JSON object and stores its float values using a
// plugins.Accumulator. It ignores any non-float values.
// Parameters:
// acc: the Accumulator to use
// prefix: What the name of the measurement name should be prefixed by.
// tags: telegraf tags to
func processResponse(acc plugins.Accumulator, prefix string, tags map[string]string, v interface{}) {
switch t := v.(type) {
case map[string]interface{}:
for k, v := range t {
processResponse(acc, prefix+"_"+k, tags, v)
}
case float64:
acc.Add(prefix, v, tags)
}
}
func init() { func init() {
plugins.Add("httpjson", func() plugins.Plugin { plugins.Add("httpjson", func() plugins.Plugin {
return &HttpJson{client: RealHTTPClient{client: &http.Client{}}} return &HttpJson{client: RealHTTPClient{client: &http.Client{}}}

View File

@@ -7,7 +7,6 @@ import (
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"net/url" "net/url"
"strings"
"github.com/influxdb/telegraf/plugins" "github.com/influxdb/telegraf/plugins"
) )
@@ -23,8 +22,6 @@ type Server struct {
type Metric struct { type Metric struct {
Name string Name string
Jmx string Jmx string
Pass []string
Drop []string
} }
type JolokiaClient interface { type JolokiaClient interface {
@@ -44,7 +41,6 @@ type Jolokia struct {
Context string Context string
Servers []Server Servers []Server
Metrics []Metric Metrics []Metric
Tags map[string]string
} }
func (j *Jolokia) SampleConfig() string { func (j *Jolokia) SampleConfig() string {
@@ -52,10 +48,6 @@ func (j *Jolokia) SampleConfig() string {
# This is the context root used to compose the jolokia url # This is the context root used to compose the jolokia url
context = "/jolokia/read" context = "/jolokia/read"
# Tags added to each measurements
[jolokia.tags]
group = "as"
# List of servers exposing jolokia read service # List of servers exposing jolokia read service
[[plugins.jolokia.servers]] [[plugins.jolokia.servers]]
name = "stable" name = "stable"
@@ -70,23 +62,6 @@ func (j *Jolokia) SampleConfig() string {
[[plugins.jolokia.metrics]] [[plugins.jolokia.metrics]]
name = "heap_memory_usage" name = "heap_memory_usage"
jmx = "/java.lang:type=Memory/HeapMemoryUsage" jmx = "/java.lang:type=Memory/HeapMemoryUsage"
# This drops the 'committed' value from Eden space measurement
[[plugins.jolokia.metrics]]
name = "memory_eden"
jmx = "/java.lang:type=MemoryPool,name=PS Eden Space/Usage"
drop = [ "committed" ]
# This passes only DaemonThreadCount and ThreadCount
[[plugins.jolokia.metrics]]
name = "heap_threads"
jmx = "/java.lang:type=Threading"
pass = [
"DaemonThreadCount",
"ThreadCount"
]
` `
} }
@@ -100,12 +75,9 @@ func (j *Jolokia) getAttr(requestUrl *url.URL) (map[string]interface{}, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer req.Body.Close()
resp, err := j.jClient.MakeRequest(req) resp, err := j.jClient.MakeRequest(req)
if err != nil {
return nil, err
}
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -137,65 +109,22 @@ func (j *Jolokia) getAttr(requestUrl *url.URL) (map[string]interface{}, error) {
return jsonOut, nil return jsonOut, nil
} }
func (m *Metric) shouldPass(field string) bool {
if m.Pass != nil {
for _, pass := range m.Pass {
if strings.HasPrefix(field, pass) {
return true
}
}
return false
}
if m.Drop != nil {
for _, drop := range m.Drop {
if strings.HasPrefix(field, drop) {
return false
}
}
return true
}
return true
}
func (m *Metric) filterFields(fields map[string]interface{}) map[string]interface{} {
for field, _ := range fields {
if !m.shouldPass(field) {
delete(fields, field)
}
}
return fields
}
func (j *Jolokia) Gather(acc plugins.Accumulator) error { func (j *Jolokia) Gather(acc plugins.Accumulator) error {
context := j.Context //"/jolokia/read" context := j.Context //"/jolokia/read"
servers := j.Servers servers := j.Servers
metrics := j.Metrics metrics := j.Metrics
tags := j.Tags tags := make(map[string]string)
if tags == nil {
tags = map[string]string{}
}
for _, server := range servers { for _, server := range servers {
tags["server"] = server.Name
tags["port"] = server.Port
tags["host"] = server.Host
fields := make(map[string]interface{})
for _, metric := range metrics { for _, metric := range metrics {
measurement := metric.Name measurement := metric.Name
jmxPath := metric.Jmx jmxPath := metric.Jmx
tags["server"] = server.Name
tags["port"] = server.Port
tags["host"] = server.Host
// Prepare URL // Prepare URL
requestUrl, err := url.Parse("http://" + server.Host + ":" + requestUrl, err := url.Parse("http://" + server.Host + ":" +
server.Port + context + jmxPath) server.Port + context + jmxPath)
@@ -209,16 +138,20 @@ func (j *Jolokia) Gather(acc plugins.Accumulator) error {
out, _ := j.getAttr(requestUrl) out, _ := j.getAttr(requestUrl)
if values, ok := out["value"]; ok { if values, ok := out["value"]; ok {
switch values.(type) { switch t := values.(type) {
case map[string]interface{}: case map[string]interface{}:
acc.AddFields(measurement, metric.filterFields(values.(map[string]interface{})), tags) for k, v := range t {
fields[measurement+"_"+k] = v
}
case interface{}: case interface{}:
acc.Add(measurement, values.(interface{}), tags) fields[measurement] = t
} }
} else { } else {
fmt.Printf("Missing key 'value' in '%s' output response\n", requestUrl.String()) fmt.Printf("Missing key 'value' in '%s' output response\n",
requestUrl.String())
} }
} }
acc.AddFields("jolokia", fields, tags)
} }
return nil return nil

View File

@@ -197,6 +197,8 @@ func (l *LeoFS) gatherServer(endpoint string, serverType ServerType, acc plugins
"node": nodeNameTrimmed, "node": nodeNameTrimmed,
} }
i := 0 i := 0
fields := make(map[string]interface{})
for scanner.Scan() { for scanner.Scan() {
key := KeyMapping[serverType][i] key := KeyMapping[serverType][i]
val, err := retrieveTokenAfterColon(scanner.Text()) val, err := retrieveTokenAfterColon(scanner.Text())
@@ -207,9 +209,10 @@ func (l *LeoFS) gatherServer(endpoint string, serverType ServerType, acc plugins
if err != nil { if err != nil {
return fmt.Errorf("Unable to parse the value:%s, err:%s", val, err) return fmt.Errorf("Unable to parse the value:%s, err:%s", val, err)
} }
acc.Add(key, fVal, tags) fields[key] = fVal
i++ i++
} }
acc.AddFields("leofs", fields, tags)
return nil return nil
} }

View File

@@ -149,19 +149,19 @@ func (l *Lustre2) GetLustreProcStats(fileglob string, wanted_fields []*mapping,
return err return err
} }
fields := make(map[string]interface{})
for _, line := range lines { for _, line := range lines {
fields := strings.Fields(line) parts := strings.Fields(line)
for _, wanted := range wanted_fields { for _, wanted := range wanted_fields {
var data uint64 var data uint64
if fields[0] == wanted.inProc { if parts[0] == wanted.inProc {
wanted_field := wanted.field wanted_field := wanted.field
// if not set, assume field[1]. Shouldn't be field[0], as // if not set, assume field[1]. Shouldn't be field[0], as
// that's a string // that's a string
if wanted_field == 0 { if wanted_field == 0 {
wanted_field = 1 wanted_field = 1
} }
data, err = strconv.ParseUint((fields[wanted_field]), 10, 64) data, err = strconv.ParseUint((parts[wanted_field]), 10, 64)
if err != nil { if err != nil {
return err return err
} }
@@ -169,11 +169,11 @@ func (l *Lustre2) GetLustreProcStats(fileglob string, wanted_fields []*mapping,
if wanted.reportAs != "" { if wanted.reportAs != "" {
report_name = wanted.reportAs report_name = wanted.reportAs
} }
acc.Add(report_name, data, tags) fields[report_name] = data
} }
} }
} }
acc.AddFields("lustre2", fields, tags)
} }
return nil return nil
} }

View File

@@ -75,35 +75,38 @@ func gatherReport(acc plugins.Accumulator, report Report, now time.Time) {
tags := make(map[string]string) tags := make(map[string]string)
tags["id"] = report.ID tags["id"] = report.ID
tags["campaign_title"] = report.CampaignTitle tags["campaign_title"] = report.CampaignTitle
acc.Add("emails_sent", report.EmailsSent, tags, now) fields := map[string]interface{}{
acc.Add("abuse_reports", report.AbuseReports, tags, now) "emails_sent": report.EmailsSent,
acc.Add("unsubscribed", report.Unsubscribed, tags, now) "abuse_reports": report.AbuseReports,
acc.Add("hard_bounces", report.Bounces.HardBounces, tags, now) "unsubscribed": report.Unsubscribed,
acc.Add("soft_bounces", report.Bounces.SoftBounces, tags, now) "hard_bounces": report.Bounces.HardBounces,
acc.Add("syntax_errors", report.Bounces.SyntaxErrors, tags, now) "soft_bounces": report.Bounces.SoftBounces,
acc.Add("forwards_count", report.Forwards.ForwardsCount, tags, now) "syntax_errors": report.Bounces.SyntaxErrors,
acc.Add("forwards_opens", report.Forwards.ForwardsOpens, tags, now) "forwards_count": report.Forwards.ForwardsCount,
acc.Add("opens_total", report.Opens.OpensTotal, tags, now) "forwards_opens": report.Forwards.ForwardsOpens,
acc.Add("unique_opens", report.Opens.UniqueOpens, tags, now) "opens_total": report.Opens.OpensTotal,
acc.Add("open_rate", report.Opens.OpenRate, tags, now) "unique_opens": report.Opens.UniqueOpens,
acc.Add("clicks_total", report.Clicks.ClicksTotal, tags, now) "open_rate": report.Opens.OpenRate,
acc.Add("unique_clicks", report.Clicks.UniqueClicks, tags, now) "clicks_total": report.Clicks.ClicksTotal,
acc.Add("unique_subscriber_clicks", report.Clicks.UniqueSubscriberClicks, tags, now) "unique_clicks": report.Clicks.UniqueClicks,
acc.Add("click_rate", report.Clicks.ClickRate, tags, now) "unique_subscriber_clicks": report.Clicks.UniqueSubscriberClicks,
acc.Add("facebook_recipient_likes", report.FacebookLikes.RecipientLikes, tags, now) "click_rate": report.Clicks.ClickRate,
acc.Add("facebook_unique_likes", report.FacebookLikes.UniqueLikes, tags, now) "facebook_recipient_likes": report.FacebookLikes.RecipientLikes,
acc.Add("facebook_likes", report.FacebookLikes.FacebookLikes, tags, now) "facebook_unique_likes": report.FacebookLikes.UniqueLikes,
acc.Add("industry_type", report.IndustryStats.Type, tags, now) "facebook_likes": report.FacebookLikes.FacebookLikes,
acc.Add("industry_open_rate", report.IndustryStats.OpenRate, tags, now) "industry_type": report.IndustryStats.Type,
acc.Add("industry_click_rate", report.IndustryStats.ClickRate, tags, now) "industry_open_rate": report.IndustryStats.OpenRate,
acc.Add("industry_bounce_rate", report.IndustryStats.BounceRate, tags, now) "industry_click_rate": report.IndustryStats.ClickRate,
acc.Add("industry_unopen_rate", report.IndustryStats.UnopenRate, tags, now) "industry_bounce_rate": report.IndustryStats.BounceRate,
acc.Add("industry_unsub_rate", report.IndustryStats.UnsubRate, tags, now) "industry_unopen_rate": report.IndustryStats.UnopenRate,
acc.Add("industry_abuse_rate", report.IndustryStats.AbuseRate, tags, now) "industry_unsub_rate": report.IndustryStats.UnsubRate,
acc.Add("list_stats_sub_rate", report.ListStats.SubRate, tags, now) "industry_abuse_rate": report.IndustryStats.AbuseRate,
acc.Add("list_stats_unsub_rate", report.ListStats.UnsubRate, tags, now) "list_stats_sub_rate": report.ListStats.SubRate,
acc.Add("list_stats_open_rate", report.ListStats.OpenRate, tags, now) "list_stats_unsub_rate": report.ListStats.UnsubRate,
acc.Add("list_stats_click_rate", report.ListStats.ClickRate, tags, now) "list_stats_open_rate": report.ListStats.OpenRate,
"list_stats_click_rate": report.ListStats.ClickRate,
}
acc.AddFields("mailchimp", fields, tags, now)
} }
func init() { func init() {

View File

@@ -137,16 +137,18 @@ func (m *Memcached) gatherServer(
tags := map[string]string{"server": address} tags := map[string]string{"server": address}
// Process values // Process values
fields := make(map[string]interface{})
for _, key := range sendMetrics { for _, key := range sendMetrics {
if value, ok := values[key]; ok { if value, ok := values[key]; ok {
// Mostly it is the number // Mostly it is the number
if iValue, errParse := strconv.ParseInt(value, 10, 64); errParse != nil { if iValue, errParse := strconv.ParseInt(value, 10, 64); errParse != nil {
acc.Add(key, value, tags) fields[key] = iValue
} else { } else {
acc.Add(key, iValue, tags) fields[key] = value
} }
} }
} }
acc.AddFields("memcached", fields, tags)
return nil return nil
} }

View File

@@ -98,7 +98,8 @@ func (m *MongoDB) gatherServer(server *Server, acc plugins.Accumulator) error {
} }
dialInfo, err := mgo.ParseURL(dialAddrs[0]) dialInfo, err := mgo.ParseURL(dialAddrs[0])
if err != nil { if err != nil {
return fmt.Errorf("Unable to parse URL (%s), %s\n", dialAddrs[0], err.Error()) return fmt.Errorf("Unable to parse URL (%s), %s\n",
dialAddrs[0], err.Error())
} }
dialInfo.Direct = true dialInfo.Direct = true
dialInfo.Timeout = time.Duration(10) * time.Second dialInfo.Timeout = time.Duration(10) * time.Second

View File

@@ -10,6 +10,7 @@ import (
type MongodbData struct { type MongodbData struct {
StatLine *StatLine StatLine *StatLine
Fields map[string]interface{}
Tags map[string]string Tags map[string]string
} }
@@ -20,6 +21,7 @@ func NewMongodbData(statLine *StatLine, tags map[string]string) *MongodbData {
return &MongodbData{ return &MongodbData{
StatLine: statLine, StatLine: statLine,
Tags: tags, Tags: tags,
Fields: make(map[string]interface{}),
} }
} }
@@ -63,38 +65,44 @@ var WiredTigerStats = map[string]string{
"percent_cache_used": "CacheUsedPercent", "percent_cache_used": "CacheUsedPercent",
} }
func (d *MongodbData) AddDefaultStats(acc plugins.Accumulator) { func (d *MongodbData) AddDefaultStats() {
statLine := reflect.ValueOf(d.StatLine).Elem() statLine := reflect.ValueOf(d.StatLine).Elem()
d.addStat(acc, statLine, DefaultStats) d.addStat(statLine, DefaultStats)
if d.StatLine.NodeType != "" { if d.StatLine.NodeType != "" {
d.addStat(acc, statLine, DefaultReplStats) d.addStat(statLine, DefaultReplStats)
} }
if d.StatLine.StorageEngine == "mmapv1" { if d.StatLine.StorageEngine == "mmapv1" {
d.addStat(acc, statLine, MmapStats) d.addStat(statLine, MmapStats)
} else if d.StatLine.StorageEngine == "wiredTiger" { } else if d.StatLine.StorageEngine == "wiredTiger" {
for key, value := range WiredTigerStats { for key, value := range WiredTigerStats {
val := statLine.FieldByName(value).Interface() val := statLine.FieldByName(value).Interface()
percentVal := fmt.Sprintf("%.1f", val.(float64)*100) percentVal := fmt.Sprintf("%.1f", val.(float64)*100)
floatVal, _ := strconv.ParseFloat(percentVal, 64) floatVal, _ := strconv.ParseFloat(percentVal, 64)
d.add(acc, key, floatVal) d.add(key, floatVal)
} }
} }
} }
func (d *MongodbData) addStat(acc plugins.Accumulator, statLine reflect.Value, stats map[string]string) { func (d *MongodbData) addStat(
statLine reflect.Value,
stats map[string]string,
) {
for key, value := range stats { for key, value := range stats {
val := statLine.FieldByName(value).Interface() val := statLine.FieldByName(value).Interface()
d.add(acc, key, val) d.add(key, val)
} }
} }
func (d *MongodbData) add(acc plugins.Accumulator, key string, val interface{}) { func (d *MongodbData) add(key string, val interface{}) {
d.Fields[key] = val
}
func (d *MongodbData) flush(acc plugins.Accumulator) {
acc.AddFields( acc.AddFields(
key, "mongodb",
map[string]interface{}{ d.Fields,
"value": val,
},
d.Tags, d.Tags,
d.StatLine.Time, d.StatLine.Time,
) )
d.Fields = make(map[string]interface{})
} }

View File

@@ -44,7 +44,8 @@ func (s *Server) gatherData(acc plugins.Accumulator) error {
NewStatLine(*s.lastResult, *result, s.Url.Host, true, durationInSeconds), NewStatLine(*s.lastResult, *result, s.Url.Host, true, durationInSeconds),
s.getDefaultTags(), s.getDefaultTags(),
) )
data.AddDefaultStats(acc) data.AddDefaultStats()
data.flush(acc)
} }
return nil return nil
} }

View File

@@ -138,6 +138,8 @@ func (m *Mysql) gatherServer(serv string, acc plugins.Accumulator) error {
if err != nil { if err != nil {
servtag = "localhost" servtag = "localhost"
} }
tags := map[string]string{"server": servtag}
fields := make(map[string]interface{})
for rows.Next() { for rows.Next() {
var name string var name string
var val interface{} var val interface{}
@@ -149,12 +151,10 @@ func (m *Mysql) gatherServer(serv string, acc plugins.Accumulator) error {
var found bool var found bool
tags := map[string]string{"server": servtag}
for _, mapped := range mappings { for _, mapped := range mappings {
if strings.HasPrefix(name, mapped.onServer) { if strings.HasPrefix(name, mapped.onServer) {
i, _ := strconv.Atoi(string(val.([]byte))) i, _ := strconv.Atoi(string(val.([]byte)))
acc.Add(mapped.inExport+name[len(mapped.onServer):], i, tags) fields[mapped.inExport+name[len(mapped.onServer):]] = i
found = true found = true
} }
} }
@@ -170,16 +170,17 @@ func (m *Mysql) gatherServer(serv string, acc plugins.Accumulator) error {
return err return err
} }
acc.Add("queries", i, tags) fields["queries"] = i
case "Slow_queries": case "Slow_queries":
i, err := strconv.ParseInt(string(val.([]byte)), 10, 64) i, err := strconv.ParseInt(string(val.([]byte)), 10, 64)
if err != nil { if err != nil {
return err return err
} }
acc.Add("slow_queries", i, tags) fields["slow_queries"] = i
} }
} }
acc.AddFields("mysql", fields, tags)
conn_rows, err := db.Query("SELECT user, sum(1) FROM INFORMATION_SCHEMA.PROCESSLIST GROUP BY user") conn_rows, err := db.Query("SELECT user, sum(1) FROM INFORMATION_SCHEMA.PROCESSLIST GROUP BY user")
@@ -193,11 +194,13 @@ func (m *Mysql) gatherServer(serv string, acc plugins.Accumulator) error {
} }
tags := map[string]string{"server": servtag, "user": user} tags := map[string]string{"server": servtag, "user": user}
fields := make(map[string]interface{})
if err != nil { if err != nil {
return err return err
} }
acc.Add("connections", connections, tags) fields["connections"] = connections
acc.AddFields("mysql_users", fields, tags)
} }
return nil return nil

View File

@@ -127,14 +127,16 @@ func (n *Nginx) gatherUrl(addr *url.URL, acc plugins.Accumulator) error {
} }
tags := getTags(addr) tags := getTags(addr)
fields := map[string]interface{}{
acc.Add("active", active, tags) "active": active,
acc.Add("accepts", accepts, tags) "accepts": accepts,
acc.Add("handled", handled, tags) "handled": handled,
acc.Add("requests", requests, tags) "requests": requests,
acc.Add("reading", reading, tags) "reading": reading,
acc.Add("writing", writing, tags) "writing": writing,
acc.Add("waiting", waiting, tags) "waiting": waiting,
}
acc.AddFields("nginx", fields, tags)
return nil return nil
} }

View File

@@ -198,9 +198,11 @@ func importMetric(r io.Reader, acc plugins.Accumulator, host string) (poolStat,
"url": host, "url": host,
"pool": pool, "pool": pool,
} }
fields := make(map[string]interface{})
for k, v := range stats[pool] { for k, v := range stats[pool] {
acc.Add(strings.Replace(k, " ", "_", -1), v, tags) fields[strings.Replace(k, " ", "_", -1)] = v
} }
acc.AddFields("phpfpm", fields, tags)
} }
return stats, nil return stats, nil

View File

@@ -82,10 +82,13 @@ func (p *Ping) Gather(acc plugins.Accumulator) error {
} }
// Calculate packet loss percentage // Calculate packet loss percentage
loss := float64(trans-rec) / float64(trans) * 100.0 loss := float64(trans-rec) / float64(trans) * 100.0
acc.Add("packets_transmitted", trans, tags) fields := map[string]interface{}{
acc.Add("packets_received", rec, tags) "packets_transmitted": trans,
acc.Add("percent_packet_loss", loss, tags) "packets_received": rec,
acc.Add("average_response_ms", avg, tags) "percent_packet_loss": loss,
"average_response_ms": avg,
}
acc.AddFields("ping", fields, tags)
}(url, acc) }(url, acc)
} }

View File

@@ -11,46 +11,32 @@ import (
_ "github.com/lib/pq" _ "github.com/lib/pq"
) )
type Server struct { type Postgresql struct {
Address string Address string
Databases []string Databases []string
OrderedColumns []string OrderedColumns []string
} }
type Postgresql struct {
Servers []*Server
}
var ignoredColumns = map[string]bool{"datid": true, "datname": true, "stats_reset": true} var ignoredColumns = map[string]bool{"datid": true, "datname": true, "stats_reset": true}
var sampleConfig = ` var sampleConfig = `
# specify servers via an array of tables
[[plugins.postgresql.servers]]
# specify address via a url matching: # specify address via a url matching:
# postgres://[pqgotest[:password]]@localhost[/dbname]?sslmode=[disable|verify-ca|verify-full] # postgres://[pqgotest[:password]]@localhost[/dbname]?sslmode=[disable|verify-ca|verify-full]
# or a simple string: # or a simple string:
# host=localhost user=pqotest password=... sslmode=... dbname=app_production # host=localhost user=pqotest password=... sslmode=... dbname=app_production
# #
# All connection parameters are optional. By default, the host is localhost # All connection parameters are optional.
# and the user is the currently running user. For localhost, we default
# to sslmode=disable as well.
# #
# Without the dbname parameter, the driver will default to a database # Without the dbname parameter, the driver will default to a database
# with the same name as the user. This dbname is just for instantiating a # with the same name as the user. This dbname is just for instantiating a
# connection with the server and doesn't restrict the databases we are trying # connection with the server and doesn't restrict the databases we are trying
# to grab metrics for. # to grab metrics for.
# #
address = "host=localhost user=postgres sslmode=disable"
address = "sslmode=disable"
# A list of databases to pull metrics about. If not specified, metrics for all # A list of databases to pull metrics about. If not specified, metrics for all
# databases are gathered. # databases are gathered.
# databases = ["app_production", "testing"]
# databases = ["app_production", "blah_testing"]
# [[plugins.postgresql.servers]]
# address = "influx@remoteserver"
` `
func (p *Postgresql) SampleConfig() string { func (p *Postgresql) SampleConfig() string {
@@ -65,42 +51,27 @@ func (p *Postgresql) IgnoredColumns() map[string]bool {
return ignoredColumns return ignoredColumns
} }
var localhost = &Server{Address: "sslmode=disable"} var localhost = "host=localhost sslmode=disable"
func (p *Postgresql) Gather(acc plugins.Accumulator) error { func (p *Postgresql) Gather(acc plugins.Accumulator) error {
if len(p.Servers) == 0 {
p.gatherServer(localhost, acc)
return nil
}
for _, serv := range p.Servers {
err := p.gatherServer(serv, acc)
if err != nil {
return err
}
}
return nil
}
func (p *Postgresql) gatherServer(serv *Server, acc plugins.Accumulator) error {
var query string var query string
if serv.Address == "" || serv.Address == "localhost" { if p.Address == "" || p.Address == "localhost" {
serv = localhost p.Address = localhost
} }
db, err := sql.Open("postgres", serv.Address) db, err := sql.Open("postgres", p.Address)
if err != nil { if err != nil {
return err return err
} }
defer db.Close() defer db.Close()
if len(serv.Databases) == 0 { if len(p.Databases) == 0 {
query = `SELECT * FROM pg_stat_database` query = `SELECT * FROM pg_stat_database`
} else { } else {
query = fmt.Sprintf(`SELECT * FROM pg_stat_database WHERE datname IN ('%s')`, strings.Join(serv.Databases, "','")) query = fmt.Sprintf(`SELECT * FROM pg_stat_database WHERE datname IN ('%s')`,
strings.Join(p.Databases, "','"))
} }
rows, err := db.Query(query) rows, err := db.Query(query)
@@ -111,13 +82,13 @@ func (p *Postgresql) gatherServer(serv *Server, acc plugins.Accumulator) error {
defer rows.Close() defer rows.Close()
// grab the column information from the result // grab the column information from the result
serv.OrderedColumns, err = rows.Columns() p.OrderedColumns, err = rows.Columns()
if err != nil { if err != nil {
return err return err
} }
for rows.Next() { for rows.Next() {
err = p.accRow(rows, acc, serv) err = p.accRow(rows, acc)
if err != nil { if err != nil {
return err return err
} }
@@ -130,20 +101,20 @@ type scanner interface {
Scan(dest ...interface{}) error Scan(dest ...interface{}) error
} }
func (p *Postgresql) accRow(row scanner, acc plugins.Accumulator, serv *Server) error { func (p *Postgresql) accRow(row scanner, acc plugins.Accumulator) error {
var columnVars []interface{} var columnVars []interface{}
var dbname bytes.Buffer var dbname bytes.Buffer
// this is where we'll store the column name with its *interface{} // this is where we'll store the column name with its *interface{}
columnMap := make(map[string]*interface{}) columnMap := make(map[string]*interface{})
for _, column := range serv.OrderedColumns { for _, column := range p.OrderedColumns {
columnMap[column] = new(interface{}) columnMap[column] = new(interface{})
} }
// populate the array of interface{} with the pointers in the right order // populate the array of interface{} with the pointers in the right order
for i := 0; i < len(columnMap); i++ { for i := 0; i < len(columnMap); i++ {
columnVars = append(columnVars, columnMap[serv.OrderedColumns[i]]) columnVars = append(columnVars, columnMap[p.OrderedColumns[i]])
} }
// deconstruct array of variables and send to Scan // deconstruct array of variables and send to Scan
@@ -159,14 +130,16 @@ func (p *Postgresql) accRow(row scanner, acc plugins.Accumulator, serv *Server)
dbname.WriteString(string(dbnameChars[i])) dbname.WriteString(string(dbnameChars[i]))
} }
tags := map[string]string{"server": serv.Address, "db": dbname.String()} tags := map[string]string{"server": p.Address, "db": dbname.String()}
fields := make(map[string]interface{})
for col, val := range columnMap { for col, val := range columnMap {
_, ignore := ignoredColumns[col] _, ignore := ignoredColumns[col]
if !ignore { if !ignore {
acc.Add(col, *val, tags) fields[col] = *val
} }
} }
acc.AddFields("postgresql", fields, tags)
return nil return nil
} }

View File

@@ -7,22 +7,17 @@ import (
"os/exec" "os/exec"
"strconv" "strconv"
"strings" "strings"
"sync"
"github.com/shirou/gopsutil/process" "github.com/shirou/gopsutil/process"
"github.com/influxdb/telegraf/plugins" "github.com/influxdb/telegraf/plugins"
) )
type Specification struct { type Procstat struct {
PidFile string `toml:"pid_file"` PidFile string `toml:"pid_file"`
Exe string Exe string
Prefix string
Pattern string Pattern string
} Prefix string
type Procstat struct {
Specifications []*Specification
} }
func NewProcstat() *Procstat { func NewProcstat() *Procstat {
@@ -30,8 +25,6 @@ func NewProcstat() *Procstat {
} }
var sampleConfig = ` var sampleConfig = `
[[plugins.procstat.specifications]]
prefix = "" # optional string to prefix measurements
# Must specify one of: pid_file, exe, or pattern # Must specify one of: pid_file, exe, or pattern
# PID file to monitor process # PID file to monitor process
pid_file = "/var/run/nginx.pid" pid_file = "/var/run/nginx.pid"
@@ -39,6 +32,9 @@ var sampleConfig = `
# exe = "nginx" # exe = "nginx"
# pattern as argument for pgrep (ie, pgrep -f <pattern>) # pattern as argument for pgrep (ie, pgrep -f <pattern>)
# pattern = "nginx" # pattern = "nginx"
# Field name prefix
prefix = ""
` `
func (_ *Procstat) SampleConfig() string { func (_ *Procstat) SampleConfig() string {
@@ -50,35 +46,26 @@ func (_ *Procstat) Description() string {
} }
func (p *Procstat) Gather(acc plugins.Accumulator) error { func (p *Procstat) Gather(acc plugins.Accumulator) error {
var wg sync.WaitGroup procs, err := p.createProcesses()
if err != nil {
for _, specification := range p.Specifications { log.Printf("Error: procstat getting process, exe: [%s] pidfile: [%s] pattern: [%s] %s",
wg.Add(1) p.Exe, p.PidFile, p.Pattern, err.Error())
go func(spec *Specification, acc plugins.Accumulator) { } else {
defer wg.Done() for _, proc := range procs {
procs, err := spec.createProcesses() p := NewSpecProcessor(p.Prefix, acc, proc)
if err != nil { p.pushMetrics()
log.Printf("Error: procstat getting process, exe: [%s] pidfile: [%s] pattern: [%s] %s", }
spec.Exe, spec.PidFile, spec.Pattern, err.Error())
} else {
for _, proc := range procs {
p := NewSpecProcessor(spec.Prefix, acc, proc)
p.pushMetrics()
}
}
}(specification, acc)
} }
wg.Wait()
return nil return nil
} }
func (spec *Specification) createProcesses() ([]*process.Process, error) { func (p *Procstat) createProcesses() ([]*process.Process, error) {
var out []*process.Process var out []*process.Process
var errstring string var errstring string
var outerr error var outerr error
pids, err := spec.getAllPids() pids, err := p.getAllPids()
if err != nil { if err != nil {
errstring += err.Error() + " " errstring += err.Error() + " "
} }
@@ -99,16 +86,16 @@ func (spec *Specification) createProcesses() ([]*process.Process, error) {
return out, outerr return out, outerr
} }
func (spec *Specification) getAllPids() ([]int32, error) { func (p *Procstat) getAllPids() ([]int32, error) {
var pids []int32 var pids []int32
var err error var err error
if spec.PidFile != "" { if p.PidFile != "" {
pids, err = pidsFromFile(spec.PidFile) pids, err = pidsFromFile(p.PidFile)
} else if spec.Exe != "" { } else if p.Exe != "" {
pids, err = pidsFromExe(spec.Exe) pids, err = pidsFromExe(p.Exe)
} else if spec.Pattern != "" { } else if p.Pattern != "" {
pids, err = pidsFromPattern(spec.Pattern) pids, err = pidsFromPattern(p.Pattern)
} else { } else {
err = fmt.Errorf("Either exe, pid_file or pattern has to be specified") err = fmt.Errorf("Either exe, pid_file or pattern has to be specified")
} }

View File

@@ -12,6 +12,7 @@ import (
type SpecProcessor struct { type SpecProcessor struct {
Prefix string Prefix string
tags map[string]string tags map[string]string
fields map[string]interface{}
acc plugins.Accumulator acc plugins.Accumulator
proc *process.Process proc *process.Process
} }
@@ -23,7 +24,12 @@ func (p *SpecProcessor) add(metric string, value interface{}) {
} else { } else {
mname = p.Prefix + "_" + metric mname = p.Prefix + "_" + metric
} }
p.acc.Add(mname, value, p.tags) p.fields[mname] = value
}
func (p *SpecProcessor) flush() {
p.acc.AddFields("procstat", p.fields, p.tags)
p.fields = make(map[string]interface{})
} }
func NewSpecProcessor( func NewSpecProcessor(
@@ -39,6 +45,7 @@ func NewSpecProcessor(
return &SpecProcessor{ return &SpecProcessor{
Prefix: prefix, Prefix: prefix,
tags: tags, tags: tags,
fields: make(map[string]interface{}),
acc: acc, acc: acc,
proc: p, proc: p,
} }
@@ -60,6 +67,7 @@ func (p *SpecProcessor) pushMetrics() {
if err := p.pushMemoryStats(); err != nil { if err := p.pushMemoryStats(); err != nil {
log.Printf("procstat, mem stats not available: %s", err.Error()) log.Printf("procstat, mem stats not available: %s", err.Error())
} }
p.flush()
} }
func (p *SpecProcessor) pushFDStats() error { func (p *SpecProcessor) pushFDStats() error {
@@ -94,21 +102,22 @@ func (p *SpecProcessor) pushIOStats() error {
} }
func (p *SpecProcessor) pushCPUStats() error { func (p *SpecProcessor) pushCPUStats() error {
cpu, err := p.proc.CPUTimes() cpu_time, err := p.proc.CPUTimes()
if err != nil { if err != nil {
return err return err
} }
p.add("cpu_user", cpu.User) p.add("cpu_time_user", cpu_time.User)
p.add("cpu_system", cpu.System) p.add("cpu_time_system", cpu_time.System)
p.add("cpu_idle", cpu.Idle) p.add("cpu_time_idle", cpu_time.Idle)
p.add("cpu_nice", cpu.Nice) p.add("cpu_time_nice", cpu_time.Nice)
p.add("cpu_iowait", cpu.Iowait) p.add("cpu_time_iowait", cpu_time.Iowait)
p.add("cpu_irq", cpu.Irq) p.add("cpu_time_irq", cpu_time.Irq)
p.add("cpu_soft_irq", cpu.Softirq) p.add("cpu_time_soft_irq", cpu_time.Softirq)
p.add("cpu_soft_steal", cpu.Steal) p.add("cpu_time_soft_steal", cpu_time.Steal)
p.add("cpu_soft_stolen", cpu.Stolen) p.add("cpu_time_soft_stolen", cpu_time.Stolen)
p.add("cpu_soft_guest", cpu.Guest) p.add("cpu_time_soft_guest", cpu_time.Guest)
p.add("cpu_soft_guest_nice", cpu.GuestNice) p.add("cpu_time_soft_guest_nice", cpu_time.GuestNice)
return nil return nil
} }

View File

@@ -80,14 +80,14 @@ func (g *Prometheus) gatherURL(url string, acc plugins.Accumulator) error {
return fmt.Errorf("error getting processing samples for %s: %s", url, err) return fmt.Errorf("error getting processing samples for %s: %s", url, err)
} }
for _, sample := range samples { for _, sample := range samples {
tags := map[string]string{} tags := make(map[string]string)
for key, value := range sample.Metric { for key, value := range sample.Metric {
if key == model.MetricNameLabel { if key == model.MetricNameLabel {
continue continue
} }
tags[string(key)] = string(value) tags[string(key)] = string(value)
} }
acc.Add(string(sample.Metric[model.MetricNameLabel]), acc.Add("prometheus_"+string(sample.Metric[model.MetricNameLabel]),
float64(sample.Value), tags) float64(sample.Value), tags)
} }
} }

View File

@@ -104,15 +104,16 @@ func (pa *PuppetAgent) Gather(acc plugins.Accumulator) error {
return fmt.Errorf("%s", err) return fmt.Errorf("%s", err)
} }
structPrinter(&puppetState, acc) tags := map[string]string{"location": pa.Location}
structPrinter(&puppetState, acc, tags)
return nil return nil
} }
func structPrinter(s *State, acc plugins.Accumulator) { func structPrinter(s *State, acc plugins.Accumulator, tags map[string]string) {
e := reflect.ValueOf(s).Elem() e := reflect.ValueOf(s).Elem()
fields := make(map[string]interface{})
for tLevelFNum := 0; tLevelFNum < e.NumField(); tLevelFNum++ { for tLevelFNum := 0; tLevelFNum < e.NumField(); tLevelFNum++ {
name := e.Type().Field(tLevelFNum).Name name := e.Type().Field(tLevelFNum).Name
nameNumField := e.FieldByName(name).NumField() nameNumField := e.FieldByName(name).NumField()
@@ -123,10 +124,10 @@ func structPrinter(s *State, acc plugins.Accumulator) {
lname := strings.ToLower(name) lname := strings.ToLower(name)
lsName := strings.ToLower(sName) lsName := strings.ToLower(sName)
acc.Add(fmt.Sprintf("%s_%s", lname, lsName), sValue, nil) fields[fmt.Sprintf("%s_%s", lname, lsName)] = sValue
} }
} }
acc.AddFields("puppetagent", fields, tags)
} }
func init() { func init() {

View File

@@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"net/http" "net/http"
"strconv" "strconv"
"time"
"github.com/influxdb/telegraf/plugins" "github.com/influxdb/telegraf/plugins"
) )
@@ -13,17 +14,13 @@ const DefaultUsername = "guest"
const DefaultPassword = "guest" const DefaultPassword = "guest"
const DefaultURL = "http://localhost:15672" const DefaultURL = "http://localhost:15672"
type Server struct { type RabbitMQ struct {
URL string URL string
Name string Name string
Username string Username string
Password string Password string
Nodes []string Nodes []string
Queues []string Queues []string
}
type RabbitMQ struct {
Servers []*Server
Client *http.Client Client *http.Client
} }
@@ -94,15 +91,13 @@ type Node struct {
SocketsUsed int64 `json:"sockets_used"` SocketsUsed int64 `json:"sockets_used"`
} }
type gatherFunc func(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan chan error) type gatherFunc func(r *RabbitMQ, acc plugins.Accumulator, errChan chan error)
var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues} var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues}
var sampleConfig = ` var sampleConfig = `
# Specify servers via an array of tables url = "http://localhost:15672" # required
[[plugins.rabbitmq.servers]]
# name = "rmq-server-1" # optional tag # name = "rmq-server-1" # optional tag
# url = "http://localhost:15672"
# username = "guest" # username = "guest"
# password = "guest" # password = "guest"
@@ -119,27 +114,18 @@ func (r *RabbitMQ) Description() string {
return "Read metrics from one or many RabbitMQ servers via the management API" return "Read metrics from one or many RabbitMQ servers via the management API"
} }
var localhost = &Server{URL: DefaultURL}
func (r *RabbitMQ) Gather(acc plugins.Accumulator) error { func (r *RabbitMQ) Gather(acc plugins.Accumulator) error {
if r.Client == nil { if r.Client == nil {
r.Client = &http.Client{} r.Client = &http.Client{}
} }
var errChan = make(chan error, len(r.Servers)) var errChan = make(chan error, len(gatherFunctions))
// use localhost is no servers are specified in config for _, f := range gatherFunctions {
if len(r.Servers) == 0 { go f(r, acc, errChan)
r.Servers = append(r.Servers, localhost)
} }
for _, serv := range r.Servers { for i := 1; i <= len(gatherFunctions); i++ {
for _, f := range gatherFunctions {
go f(r, serv, acc, errChan)
}
}
for i := 1; i <= len(r.Servers)*len(gatherFunctions); i++ {
err := <-errChan err := <-errChan
if err != nil { if err != nil {
return err return err
@@ -149,20 +135,20 @@ func (r *RabbitMQ) Gather(acc plugins.Accumulator) error {
return nil return nil
} }
func (r *RabbitMQ) requestJSON(serv *Server, u string, target interface{}) error { func (r *RabbitMQ) requestJSON(u string, target interface{}) error {
u = fmt.Sprintf("%s%s", serv.URL, u) u = fmt.Sprintf("%s%s", r.URL, u)
req, err := http.NewRequest("GET", u, nil) req, err := http.NewRequest("GET", u, nil)
if err != nil { if err != nil {
return err return err
} }
username := serv.Username username := r.Username
if username == "" { if username == "" {
username = DefaultUsername username = DefaultUsername
} }
password := serv.Password password := r.Password
if password == "" { if password == "" {
password = DefaultPassword password = DefaultPassword
} }
@@ -181,10 +167,10 @@ func (r *RabbitMQ) requestJSON(serv *Server, u string, target interface{}) error
return nil return nil
} }
func gatherOverview(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan chan error) { func gatherOverview(r *RabbitMQ, acc plugins.Accumulator, errChan chan error) {
overview := &OverviewResponse{} overview := &OverviewResponse{}
err := r.requestJSON(serv, "/api/overview", &overview) err := r.requestJSON("/api/overview", &overview)
if err != nil { if err != nil {
errChan <- err errChan <- err
return return
@@ -195,76 +181,80 @@ func gatherOverview(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan
return return
} }
tags := map[string]string{"url": serv.URL} tags := map[string]string{"url": r.URL}
if serv.Name != "" { if r.Name != "" {
tags["name"] = serv.Name tags["name"] = r.Name
} }
fields := map[string]interface{}{
acc.Add("messages", overview.QueueTotals.Messages, tags) "messages": overview.QueueTotals.Messages,
acc.Add("messages_ready", overview.QueueTotals.MessagesReady, tags) "messages_ready": overview.QueueTotals.MessagesReady,
acc.Add("messages_unacked", overview.QueueTotals.MessagesUnacknowledged, tags) "messages_unacked": overview.QueueTotals.MessagesUnacknowledged,
"channels": overview.ObjectTotals.Channels,
acc.Add("channels", overview.ObjectTotals.Channels, tags) "connections": overview.ObjectTotals.Connections,
acc.Add("connections", overview.ObjectTotals.Connections, tags) "consumers": overview.ObjectTotals.Consumers,
acc.Add("consumers", overview.ObjectTotals.Consumers, tags) "exchanges": overview.ObjectTotals.Exchanges,
acc.Add("exchanges", overview.ObjectTotals.Exchanges, tags) "queues": overview.ObjectTotals.Queues,
acc.Add("queues", overview.ObjectTotals.Queues, tags) "messages_acked": overview.MessageStats.Ack,
"messages_delivered": overview.MessageStats.Deliver,
acc.Add("messages_acked", overview.MessageStats.Ack, tags) "messages_published": overview.MessageStats.Publish,
acc.Add("messages_delivered", overview.MessageStats.Deliver, tags) }
acc.Add("messages_published", overview.MessageStats.Publish, tags) acc.AddFields("rabbitmq_overview", fields, tags)
errChan <- nil errChan <- nil
} }
func gatherNodes(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan chan error) { func gatherNodes(r *RabbitMQ, acc plugins.Accumulator, errChan chan error) {
nodes := make([]Node, 0) nodes := make([]Node, 0)
// Gather information about nodes // Gather information about nodes
err := r.requestJSON(serv, "/api/nodes", &nodes) err := r.requestJSON("/api/nodes", &nodes)
if err != nil { if err != nil {
errChan <- err errChan <- err
return return
} }
now := time.Now()
for _, node := range nodes { for _, node := range nodes {
if !shouldGatherNode(node, serv) { if !r.shouldGatherNode(node) {
continue continue
} }
tags := map[string]string{"url": serv.URL} tags := map[string]string{"url": r.URL}
tags["node"] = node.Name tags["node"] = node.Name
acc.Add("disk_free", node.DiskFree, tags) fields := map[string]interface{}{
acc.Add("disk_free_limit", node.DiskFreeLimit, tags) "disk_free": node.DiskFree,
acc.Add("fd_total", node.FdTotal, tags) "disk_free_limit": node.DiskFreeLimit,
acc.Add("fd_used", node.FdUsed, tags) "fd_total": node.FdTotal,
acc.Add("mem_limit", node.MemLimit, tags) "fd_used": node.FdUsed,
acc.Add("mem_used", node.MemUsed, tags) "mem_limit": node.MemLimit,
acc.Add("proc_total", node.ProcTotal, tags) "mem_used": node.MemUsed,
acc.Add("proc_used", node.ProcUsed, tags) "proc_total": node.ProcTotal,
acc.Add("run_queue", node.RunQueue, tags) "proc_used": node.ProcUsed,
acc.Add("sockets_total", node.SocketsTotal, tags) "run_queue": node.RunQueue,
acc.Add("sockets_used", node.SocketsUsed, tags) "sockets_total": node.SocketsTotal,
"sockets_used": node.SocketsUsed,
}
acc.AddFields("rabbitmq_node", fields, tags, now)
} }
errChan <- nil errChan <- nil
} }
func gatherQueues(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan chan error) { func gatherQueues(r *RabbitMQ, acc plugins.Accumulator, errChan chan error) {
// Gather information about queues // Gather information about queues
queues := make([]Queue, 0) queues := make([]Queue, 0)
err := r.requestJSON(serv, "/api/queues", &queues) err := r.requestJSON("/api/queues", &queues)
if err != nil { if err != nil {
errChan <- err errChan <- err
return return
} }
for _, queue := range queues { for _, queue := range queues {
if !shouldGatherQueue(queue, serv) { if !r.shouldGatherQueue(queue) {
continue continue
} }
tags := map[string]string{ tags := map[string]string{
"url": serv.URL, "url": r.URL,
"queue": queue.Name, "queue": queue.Name,
"vhost": queue.Vhost, "vhost": queue.Vhost,
"node": queue.Node, "node": queue.Node,
@@ -273,7 +263,7 @@ func gatherQueues(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan ch
} }
acc.AddFields( acc.AddFields(
"queue", "rabbitmq_queue",
map[string]interface{}{ map[string]interface{}{
// common information // common information
"consumers": queue.Consumers, "consumers": queue.Consumers,
@@ -301,12 +291,12 @@ func gatherQueues(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan ch
errChan <- nil errChan <- nil
} }
func shouldGatherNode(node Node, serv *Server) bool { func (r *RabbitMQ) shouldGatherNode(node Node) bool {
if len(serv.Nodes) == 0 { if len(r.Nodes) == 0 {
return true return true
} }
for _, name := range serv.Nodes { for _, name := range r.Nodes {
if name == node.Name { if name == node.Name {
return true return true
} }
@@ -315,12 +305,12 @@ func shouldGatherNode(node Node, serv *Server) bool {
return false return false
} }
func shouldGatherQueue(queue Queue, serv *Server) bool { func (r *RabbitMQ) shouldGatherQueue(queue Queue) bool {
if len(serv.Queues) == 0 { if len(r.Queues) == 0 {
return true return true
} }
for _, name := range serv.Queues { for _, name := range r.Queues {
if name == queue.Name { if name == queue.Name {
return true return true
} }

View File

@@ -164,6 +164,7 @@ func gatherInfoOutput(
var keyspace_hits, keyspace_misses uint64 = 0, 0 var keyspace_hits, keyspace_misses uint64 = 0, 0
scanner := bufio.NewScanner(rdr) scanner := bufio.NewScanner(rdr)
fields := make(map[string]interface{})
for scanner.Scan() { for scanner.Scan() {
line := scanner.Text() line := scanner.Text()
if strings.Contains(line, "ERR") { if strings.Contains(line, "ERR") {
@@ -199,7 +200,7 @@ func gatherInfoOutput(
} }
if err == nil { if err == nil {
acc.Add(metric, ival, tags) fields[metric] = ival
continue continue
} }
@@ -208,13 +209,14 @@ func gatherInfoOutput(
return err return err
} }
acc.Add(metric, fval, tags) fields[metric] = fval
} }
var keyspace_hitrate float64 = 0.0 var keyspace_hitrate float64 = 0.0
if keyspace_hits != 0 || keyspace_misses != 0 { if keyspace_hits != 0 || keyspace_misses != 0 {
keyspace_hitrate = float64(keyspace_hits) / float64(keyspace_hits+keyspace_misses) keyspace_hitrate = float64(keyspace_hits) / float64(keyspace_hits+keyspace_misses)
} }
acc.Add("keyspace_hitrate", keyspace_hitrate, tags) fields["keyspace_hitrate"] = keyspace_hitrate
acc.AddFields("redis", fields, tags)
return nil return nil
} }
@@ -229,15 +231,17 @@ func gatherKeyspaceLine(
tags map[string]string, tags map[string]string,
) { ) {
if strings.Contains(line, "keys=") { if strings.Contains(line, "keys=") {
fields := make(map[string]interface{})
tags["database"] = name tags["database"] = name
dbparts := strings.Split(line, ",") dbparts := strings.Split(line, ",")
for _, dbp := range dbparts { for _, dbp := range dbparts {
kv := strings.Split(dbp, "=") kv := strings.Split(dbp, "=")
ival, err := strconv.ParseUint(kv[1], 10, 64) ival, err := strconv.ParseUint(kv[1], 10, 64)
if err == nil { if err == nil {
acc.Add(kv[0], ival, tags) fields[kv[0]] = ival
} }
} }
acc.AddFields("redis_keyspace", fields, tags)
} }
} }

View File

@@ -86,25 +86,30 @@ var engineStats = map[string]string{
"total_writes": "TotalWrites", "total_writes": "TotalWrites",
} }
func (e *Engine) AddEngineStats(keys []string, acc plugins.Accumulator, tags map[string]string) { func (e *Engine) AddEngineStats(
keys []string,
acc plugins.Accumulator,
tags map[string]string,
) {
engine := reflect.ValueOf(e).Elem() engine := reflect.ValueOf(e).Elem()
fields := make(map[string]interface{})
for _, key := range keys { for _, key := range keys {
acc.Add( fields[key] = engine.FieldByName(engineStats[key]).Interface()
key,
engine.FieldByName(engineStats[key]).Interface(),
tags,
)
} }
acc.AddFields("rethinkdb_engine", fields, tags)
} }
func (s *Storage) AddStats(acc plugins.Accumulator, tags map[string]string) { func (s *Storage) AddStats(acc plugins.Accumulator, tags map[string]string) {
acc.Add("cache_bytes_in_use", s.Cache.BytesInUse, tags) fields := map[string]interface{}{
acc.Add("disk_read_bytes_per_sec", s.Disk.ReadBytesPerSec, tags) "cache_bytes_in_use": s.Cache.BytesInUse,
acc.Add("disk_read_bytes_total", s.Disk.ReadBytesTotal, tags) "disk_read_bytes_per_sec": s.Disk.ReadBytesPerSec,
acc.Add("disk_written_bytes_per_sec", s.Disk.WriteBytesPerSec, tags) "disk_read_bytes_total": s.Disk.ReadBytesTotal,
acc.Add("disk_written_bytes_total", s.Disk.WriteBytesTotal, tags) "disk_written_bytes_per_sec": s.Disk.WriteBytesPerSec,
acc.Add("disk_usage_data_bytes", s.Disk.SpaceUsage.Data, tags) "disk_written_bytes_total": s.Disk.WriteBytesTotal,
acc.Add("disk_usage_garbage_bytes", s.Disk.SpaceUsage.Garbage, tags) "disk_usage_data_bytes": s.Disk.SpaceUsage.Data,
acc.Add("disk_usage_metadata_bytes", s.Disk.SpaceUsage.Metadata, tags) "disk_usage_garbage_bytes": s.Disk.SpaceUsage.Garbage,
acc.Add("disk_usage_preallocated_bytes", s.Disk.SpaceUsage.Prealloc, tags) "disk_usage_metadata_bytes": s.Disk.SpaceUsage.Metadata,
"disk_usage_preallocated_bytes": s.Disk.SpaceUsage.Prealloc,
}
acc.AddFields("rethinkdb", fields, tags)
} }

View File

@@ -2,6 +2,7 @@ package system
import ( import (
"fmt" "fmt"
"time"
"github.com/influxdb/telegraf/plugins" "github.com/influxdb/telegraf/plugins"
"github.com/shirou/gopsutil/cpu" "github.com/shirou/gopsutil/cpu"
@@ -31,7 +32,7 @@ var sampleConfig = `
# Whether to report total system cpu stats or not # Whether to report total system cpu stats or not
totalcpu = true totalcpu = true
# Comment this line if you want the raw CPU time metrics # Comment this line if you want the raw CPU time metrics
drop = ["cpu_time*"] drop = ["time_*"]
` `
func (_ *CPUStats) SampleConfig() string { func (_ *CPUStats) SampleConfig() string {
@@ -43,6 +44,7 @@ func (s *CPUStats) Gather(acc plugins.Accumulator) error {
if err != nil { if err != nil {
return fmt.Errorf("error getting CPU info: %s", err) return fmt.Errorf("error getting CPU info: %s", err)
} }
now := time.Now()
for i, cts := range times { for i, cts := range times {
tags := map[string]string{ tags := map[string]string{
@@ -51,21 +53,24 @@ func (s *CPUStats) Gather(acc plugins.Accumulator) error {
total := totalCpuTime(cts) total := totalCpuTime(cts)
// Add total cpu numbers // Add cpu time metrics
add(acc, "time_user", cts.User, tags) fields := map[string]interface{}{
add(acc, "time_system", cts.System, tags) "time_user": cts.User,
add(acc, "time_idle", cts.Idle, tags) "time_system": cts.System,
add(acc, "time_nice", cts.Nice, tags) "time_idle": cts.Idle,
add(acc, "time_iowait", cts.Iowait, tags) "time_nice": cts.Nice,
add(acc, "time_irq", cts.Irq, tags) "time_iowait": cts.Iowait,
add(acc, "time_softirq", cts.Softirq, tags) "time_irq": cts.Irq,
add(acc, "time_steal", cts.Steal, tags) "time_softirq": cts.Softirq,
add(acc, "time_guest", cts.Guest, tags) "time_steal": cts.Steal,
add(acc, "time_guest_nice", cts.GuestNice, tags) "time_guest": cts.Guest,
"time_guest_nice": cts.GuestNice,
}
// Add in percentage // Add in percentage
if len(s.lastStats) == 0 { if len(s.lastStats) == 0 {
// If it's the 1st gather, can't get CPU stats yet acc.AddFields("cpu", fields, tags, now)
// If it's the 1st gather, can't get CPU Usage stats yet
continue continue
} }
lastCts := s.lastStats[i] lastCts := s.lastStats[i]
@@ -81,17 +86,17 @@ func (s *CPUStats) Gather(acc plugins.Accumulator) error {
continue continue
} }
add(acc, "usage_user", 100*(cts.User-lastCts.User)/totalDelta, tags) fields["usage_user"] = 100 * (cts.User - lastCts.User) / totalDelta
add(acc, "usage_system", 100*(cts.System-lastCts.System)/totalDelta, tags) fields["usage_system"] = 100 * (cts.System - lastCts.System) / totalDelta
add(acc, "usage_idle", 100*(cts.Idle-lastCts.Idle)/totalDelta, tags) fields["usage_idle"] = 100 * (cts.Idle - lastCts.Idle) / totalDelta
add(acc, "usage_nice", 100*(cts.Nice-lastCts.Nice)/totalDelta, tags) fields["usage_nice"] = 100 * (cts.Nice - lastCts.Nice) / totalDelta
add(acc, "usage_iowait", 100*(cts.Iowait-lastCts.Iowait)/totalDelta, tags) fields["usage_iowait"] = 100 * (cts.Iowait - lastCts.Iowait) / totalDelta
add(acc, "usage_irq", 100*(cts.Irq-lastCts.Irq)/totalDelta, tags) fields["usage_irq"] = 100 * (cts.Irq - lastCts.Irq) / totalDelta
add(acc, "usage_softirq", 100*(cts.Softirq-lastCts.Softirq)/totalDelta, tags) fields["usage_softirq"] = 100 * (cts.Softirq - lastCts.Softirq) / totalDelta
add(acc, "usage_steal", 100*(cts.Steal-lastCts.Steal)/totalDelta, tags) fields["usage_steal"] = 100 * (cts.Steal - lastCts.Steal) / totalDelta
add(acc, "usage_guest", 100*(cts.Guest-lastCts.Guest)/totalDelta, tags) fields["usage_guest"] = 100 * (cts.Guest - lastCts.Guest) / totalDelta
add(acc, "usage_guest_nice", 100*(cts.GuestNice-lastCts.GuestNice)/totalDelta, tags) fields["usage_guest_nice"] = 100 * (cts.GuestNice - lastCts.GuestNice) / totalDelta
acc.AddFields("cpu", fields, tags, now)
} }
s.lastStats = times s.lastStats = times

106
plugins/system/cpu_test.go Normal file
View File

@@ -0,0 +1,106 @@
package system
import (
"testing"
"github.com/influxdb/telegraf/testutil"
"github.com/shirou/gopsutil/cpu"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestCPUStats(t *testing.T) {
var mps MockPS
defer mps.AssertExpectations(t)
var acc testutil.Accumulator
cts := cpu.CPUTimesStat{
CPU: "cpu0",
User: 3.1,
System: 8.2,
Idle: 80.1,
Nice: 1.3,
Iowait: 0.2,
Irq: 0.1,
Softirq: 0.11,
Steal: 0.0511,
Guest: 8.1,
GuestNice: 0.324,
}
cts2 := cpu.CPUTimesStat{
CPU: "cpu0",
User: 11.4, // increased by 8.3
System: 10.9, // increased by 2.7
Idle: 158.8699, // increased by 78.7699 (for total increase of 100)
Nice: 2.5, // increased by 1.2
Iowait: 0.7, // increased by 0.5
Irq: 1.2, // increased by 1.1
Softirq: 0.31, // increased by 0.2
Steal: 0.2812, // increased by 0.0001
Guest: 12.9, // increased by 4.8
GuestNice: 2.524, // increased by 2.2
}
mps.On("CPUTimes").Return([]cpu.CPUTimesStat{cts}, nil)
cs := NewCPUStats(&mps)
cputags := map[string]string{
"cpu": "cpu0",
}
err := cs.Gather(&acc)
require.NoError(t, err)
numCPUPoints := len(acc.Points)
expectedCPUPoints := 10
assert.Equal(t, expectedCPUPoints, numCPUPoints)
// Computed values are checked with delta > 0 becasue of floating point arithmatic
// imprecision
assertContainsTaggedFloat(t, &acc, "time_user", 3.1, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_system", 8.2, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_idle", 80.1, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_nice", 1.3, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_iowait", 0.2, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_irq", 0.1, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_softirq", 0.11, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_steal", 0.0511, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_guest", 8.1, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_guest_nice", 0.324, 0, cputags)
mps2 := MockPS{}
mps2.On("CPUTimes").Return([]cpu.CPUTimesStat{cts2}, nil)
cs.ps = &mps2
// Should have added cpu percentages too
err = cs.Gather(&acc)
require.NoError(t, err)
numCPUPoints = len(acc.Points) - numCPUPoints
expectedCPUPoints = 20
assert.Equal(t, expectedCPUPoints, numCPUPoints)
assertContainsTaggedFloat(t, &acc, "time_user", 11.4, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_system", 10.9, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_idle", 158.8699, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_nice", 2.5, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_iowait", 0.7, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_irq", 1.2, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_softirq", 0.31, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_steal", 0.2812, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_guest", 12.9, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_guest_nice", 2.524, 0, cputags)
assertContainsTaggedFloat(t, &acc, "usage_user", 8.3, 0.0005, cputags)
assertContainsTaggedFloat(t, &acc, "usage_system", 2.7, 0.0005, cputags)
assertContainsTaggedFloat(t, &acc, "usage_idle", 78.7699, 0.0005, cputags)
assertContainsTaggedFloat(t, &acc, "usage_nice", 1.2, 0.0005, cputags)
assertContainsTaggedFloat(t, &acc, "usage_iowait", 0.5, 0.0005, cputags)
assertContainsTaggedFloat(t, &acc, "usage_irq", 1.1, 0.0005, cputags)
assertContainsTaggedFloat(t, &acc, "usage_softirq", 0.2, 0.0005, cputags)
assertContainsTaggedFloat(t, &acc, "usage_steal", 0.2301, 0.0005, cputags)
assertContainsTaggedFloat(t, &acc, "usage_guest", 4.8, 0.0005, cputags)
assertContainsTaggedFloat(t, &acc, "usage_guest_nice", 2.2, 0.0005, cputags)
}

View File

@@ -50,12 +50,15 @@ func (s *DiskStats) Gather(acc plugins.Accumulator) error {
"path": du.Path, "path": du.Path,
"fstype": du.Fstype, "fstype": du.Fstype,
} }
acc.Add("total", du.Total, tags) fields := map[string]interface{}{
acc.Add("free", du.Free, tags) "total": du.Total,
acc.Add("used", du.Total-du.Free, tags) "free": du.Free,
acc.Add("inodes_total", du.InodesTotal, tags) "used": du.Total - du.Free,
acc.Add("inodes_free", du.InodesFree, tags) "inodes_total": du.InodesTotal,
acc.Add("inodes_used", du.InodesTotal-du.InodesFree, tags) "inodes_free": du.InodesFree,
"inodes_used": du.InodesTotal - du.InodesFree,
}
acc.AddFields("disk", fields, tags)
} }
return nil return nil
@@ -115,13 +118,16 @@ func (s *DiskIOStats) Gather(acc plugins.Accumulator) error {
} }
} }
acc.Add("reads", io.ReadCount, tags) fields := map[string]interface{}{
acc.Add("writes", io.WriteCount, tags) "reads": io.ReadCount,
acc.Add("read_bytes", io.ReadBytes, tags) "writes": io.WriteCount,
acc.Add("write_bytes", io.WriteBytes, tags) "read_bytes": io.ReadBytes,
acc.Add("read_time", io.ReadTime, tags) "write_bytes": io.WriteBytes,
acc.Add("write_time", io.WriteTime, tags) "read_time": io.ReadTime,
acc.Add("io_time", io.IoTime, tags) "write_time": io.WriteTime,
"io_time": io.IoTime,
}
acc.AddFields("diskio", fields, tags)
} }
return nil return nil
@@ -132,7 +138,7 @@ func init() {
return &DiskStats{ps: &systemPS{}} return &DiskStats{ps: &systemPS{}}
}) })
plugins.Add("io", func() plugins.Plugin { plugins.Add("diskio", func() plugins.Plugin {
return &DiskIOStats{ps: &systemPS{}} return &DiskIOStats{ps: &systemPS{}}
}) })
} }

161
plugins/system/disk_test.go Normal file
View File

@@ -0,0 +1,161 @@
package system
import (
"testing"
"github.com/influxdb/telegraf/testutil"
"github.com/shirou/gopsutil/disk"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestDiskStats(t *testing.T) {
var mps MockPS
defer mps.AssertExpectations(t)
var acc testutil.Accumulator
var err error
du := []*disk.DiskUsageStat{
{
Path: "/",
Fstype: "ext4",
Total: 128,
Free: 23,
InodesTotal: 1234,
InodesFree: 234,
},
{
Path: "/home",
Fstype: "ext4",
Total: 256,
Free: 46,
InodesTotal: 2468,
InodesFree: 468,
},
}
mps.On("DiskUsage").Return(du, nil)
err = (&DiskStats{ps: &mps}).Gather(&acc)
require.NoError(t, err)
numDiskPoints := len(acc.Points)
expectedAllDiskPoints := 12
assert.Equal(t, expectedAllDiskPoints, numDiskPoints)
tags1 := map[string]string{
"path": "/",
"fstype": "ext4",
}
tags2 := map[string]string{
"path": "/home",
"fstype": "ext4",
}
assert.True(t, acc.CheckTaggedValue("total", uint64(128), tags1))
assert.True(t, acc.CheckTaggedValue("used", uint64(105), tags1))
assert.True(t, acc.CheckTaggedValue("free", uint64(23), tags1))
assert.True(t, acc.CheckTaggedValue("inodes_total", uint64(1234), tags1))
assert.True(t, acc.CheckTaggedValue("inodes_free", uint64(234), tags1))
assert.True(t, acc.CheckTaggedValue("inodes_used", uint64(1000), tags1))
assert.True(t, acc.CheckTaggedValue("total", uint64(256), tags2))
assert.True(t, acc.CheckTaggedValue("used", uint64(210), tags2))
assert.True(t, acc.CheckTaggedValue("free", uint64(46), tags2))
assert.True(t, acc.CheckTaggedValue("inodes_total", uint64(2468), tags2))
assert.True(t, acc.CheckTaggedValue("inodes_free", uint64(468), tags2))
assert.True(t, acc.CheckTaggedValue("inodes_used", uint64(2000), tags2))
// We expect 6 more DiskPoints to show up with an explicit match on "/"
// and /home not matching the /dev in Mountpoints
err = (&DiskStats{ps: &mps, Mountpoints: []string{"/", "/dev"}}).Gather(&acc)
assert.Equal(t, expectedAllDiskPoints+6, len(acc.Points))
// We should see all the diskpoints as Mountpoints includes both
// / and /home
err = (&DiskStats{ps: &mps, Mountpoints: []string{"/", "/home"}}).Gather(&acc)
assert.Equal(t, 2*expectedAllDiskPoints+6, len(acc.Points))
}
func TestDiskIOStats(t *testing.T) {
var mps MockPS
defer mps.AssertExpectations(t)
var acc testutil.Accumulator
var err error
diskio1 := disk.DiskIOCountersStat{
ReadCount: 888,
WriteCount: 5341,
ReadBytes: 100000,
WriteBytes: 200000,
ReadTime: 7123,
WriteTime: 9087,
Name: "sda1",
IoTime: 123552,
SerialNumber: "ab-123-ad",
}
diskio2 := disk.DiskIOCountersStat{
ReadCount: 444,
WriteCount: 2341,
ReadBytes: 200000,
WriteBytes: 400000,
ReadTime: 3123,
WriteTime: 6087,
Name: "sdb1",
IoTime: 246552,
SerialNumber: "bb-123-ad",
}
mps.On("DiskIO").Return(
map[string]disk.DiskIOCountersStat{"sda1": diskio1, "sdb1": diskio2},
nil)
err = (&DiskIOStats{ps: &mps}).Gather(&acc)
require.NoError(t, err)
numDiskIOPoints := len(acc.Points)
expectedAllDiskIOPoints := 14
assert.Equal(t, expectedAllDiskIOPoints, numDiskIOPoints)
dtags1 := map[string]string{
"name": "sda1",
"serial": "ab-123-ad",
}
dtags2 := map[string]string{
"name": "sdb1",
"serial": "bb-123-ad",
}
assert.True(t, acc.CheckTaggedValue("reads", uint64(888), dtags1))
assert.True(t, acc.CheckTaggedValue("writes", uint64(5341), dtags1))
assert.True(t, acc.CheckTaggedValue("read_bytes", uint64(100000), dtags1))
assert.True(t, acc.CheckTaggedValue("write_bytes", uint64(200000), dtags1))
assert.True(t, acc.CheckTaggedValue("read_time", uint64(7123), dtags1))
assert.True(t, acc.CheckTaggedValue("write_time", uint64(9087), dtags1))
assert.True(t, acc.CheckTaggedValue("io_time", uint64(123552), dtags1))
assert.True(t, acc.CheckTaggedValue("reads", uint64(444), dtags2))
assert.True(t, acc.CheckTaggedValue("writes", uint64(2341), dtags2))
assert.True(t, acc.CheckTaggedValue("read_bytes", uint64(200000), dtags2))
assert.True(t, acc.CheckTaggedValue("write_bytes", uint64(400000), dtags2))
assert.True(t, acc.CheckTaggedValue("read_time", uint64(3123), dtags2))
assert.True(t, acc.CheckTaggedValue("write_time", uint64(6087), dtags2))
assert.True(t, acc.CheckTaggedValue("io_time", uint64(246552), dtags2))
// We expect 7 more DiskIOPoints to show up with an explicit match on "sdb1"
// and serial should be missing from the tags with SkipSerialNumber set
err = (&DiskIOStats{ps: &mps, Devices: []string{"sdb1"}, SkipSerialNumber: true}).Gather(&acc)
assert.Equal(t, expectedAllDiskIOPoints+7, len(acc.Points))
dtags3 := map[string]string{
"name": "sdb1",
}
assert.True(t, acc.CheckTaggedValue("reads", uint64(444), dtags3))
assert.True(t, acc.CheckTaggedValue("writes", uint64(2341), dtags3))
assert.True(t, acc.CheckTaggedValue("read_bytes", uint64(200000), dtags3))
assert.True(t, acc.CheckTaggedValue("write_bytes", uint64(400000), dtags3))
assert.True(t, acc.CheckTaggedValue("read_time", uint64(3123), dtags3))
assert.True(t, acc.CheckTaggedValue("write_time", uint64(6087), dtags3))
assert.True(t, acc.CheckTaggedValue("io_time", uint64(246552), dtags3))
}

View File

@@ -36,44 +36,47 @@ func (s *DockerStats) Gather(acc plugins.Accumulator) error {
cts := cont.CPU cts := cont.CPU
acc.Add("user", cts.User, tags) fields := map[string]interface{}{
acc.Add("system", cts.System, tags) "user": cts.User,
acc.Add("idle", cts.Idle, tags) "system": cts.System,
acc.Add("nice", cts.Nice, tags) "idle": cts.Idle,
acc.Add("iowait", cts.Iowait, tags) "nice": cts.Nice,
acc.Add("irq", cts.Irq, tags) "iowait": cts.Iowait,
acc.Add("softirq", cts.Softirq, tags) "irq": cts.Irq,
acc.Add("steal", cts.Steal, tags) "softirq": cts.Softirq,
acc.Add("guest", cts.Guest, tags) "steal": cts.Steal,
acc.Add("guest_nice", cts.GuestNice, tags) "guest": cts.Guest,
"guest_nice": cts.GuestNice,
acc.Add("cache", cont.Mem.Cache, tags) "cache": cont.Mem.Cache,
acc.Add("rss", cont.Mem.RSS, tags) "rss": cont.Mem.RSS,
acc.Add("rss_huge", cont.Mem.RSSHuge, tags) "rss_huge": cont.Mem.RSSHuge,
acc.Add("mapped_file", cont.Mem.MappedFile, tags) "mapped_file": cont.Mem.MappedFile,
acc.Add("swap_in", cont.Mem.Pgpgin, tags) "swap_in": cont.Mem.Pgpgin,
acc.Add("swap_out", cont.Mem.Pgpgout, tags) "swap_out": cont.Mem.Pgpgout,
acc.Add("page_fault", cont.Mem.Pgfault, tags) "page_fault": cont.Mem.Pgfault,
acc.Add("page_major_fault", cont.Mem.Pgmajfault, tags) "page_major_fault": cont.Mem.Pgmajfault,
acc.Add("inactive_anon", cont.Mem.InactiveAnon, tags) "inactive_anon": cont.Mem.InactiveAnon,
acc.Add("active_anon", cont.Mem.ActiveAnon, tags) "active_anon": cont.Mem.ActiveAnon,
acc.Add("inactive_file", cont.Mem.InactiveFile, tags) "inactive_file": cont.Mem.InactiveFile,
acc.Add("active_file", cont.Mem.ActiveFile, tags) "active_file": cont.Mem.ActiveFile,
acc.Add("unevictable", cont.Mem.Unevictable, tags) "unevictable": cont.Mem.Unevictable,
acc.Add("memory_limit", cont.Mem.HierarchicalMemoryLimit, tags) "memory_limit": cont.Mem.HierarchicalMemoryLimit,
acc.Add("total_cache", cont.Mem.TotalCache, tags) "total_cache": cont.Mem.TotalCache,
acc.Add("total_rss", cont.Mem.TotalRSS, tags) "total_rss": cont.Mem.TotalRSS,
acc.Add("total_rss_huge", cont.Mem.TotalRSSHuge, tags) "total_rss_huge": cont.Mem.TotalRSSHuge,
acc.Add("total_mapped_file", cont.Mem.TotalMappedFile, tags) "total_mapped_file": cont.Mem.TotalMappedFile,
acc.Add("total_swap_in", cont.Mem.TotalPgpgIn, tags) "total_swap_in": cont.Mem.TotalPgpgIn,
acc.Add("total_swap_out", cont.Mem.TotalPgpgOut, tags) "total_swap_out": cont.Mem.TotalPgpgOut,
acc.Add("total_page_fault", cont.Mem.TotalPgFault, tags) "total_page_fault": cont.Mem.TotalPgFault,
acc.Add("total_page_major_fault", cont.Mem.TotalPgMajFault, tags) "total_page_major_fault": cont.Mem.TotalPgMajFault,
acc.Add("total_inactive_anon", cont.Mem.TotalInactiveAnon, tags) "total_inactive_anon": cont.Mem.TotalInactiveAnon,
acc.Add("total_active_anon", cont.Mem.TotalActiveAnon, tags) "total_active_anon": cont.Mem.TotalActiveAnon,
acc.Add("total_inactive_file", cont.Mem.TotalInactiveFile, tags) "total_inactive_file": cont.Mem.TotalInactiveFile,
acc.Add("total_active_file", cont.Mem.TotalActiveFile, tags) "total_active_file": cont.Mem.TotalActiveFile,
acc.Add("total_unevictable", cont.Mem.TotalUnevictable, tags) "total_unevictable": cont.Mem.TotalUnevictable,
}
acc.AddFields("docker", fields, tags)
} }
return nil return nil

View File

@@ -22,18 +22,17 @@ func (s *MemStats) Gather(acc plugins.Accumulator) error {
return fmt.Errorf("error getting virtual memory info: %s", err) return fmt.Errorf("error getting virtual memory info: %s", err)
} }
vmtags := map[string]string(nil) fields := map[string]interface{}{
"total": vm.Total,
acc.Add("total", vm.Total, vmtags) "available": vm.Available,
acc.Add("available", vm.Available, vmtags) "used": vm.Used,
acc.Add("used", vm.Used, vmtags) "free": vm.Free,
acc.Add("free", vm.Free, vmtags) "cached": vm.Cached,
acc.Add("cached", vm.Cached, vmtags) "buffered": vm.Buffers,
acc.Add("buffered", vm.Buffers, vmtags) "used_percent": 100 * float64(vm.Used) / float64(vm.Total),
acc.Add("used_percent", 100*float64(vm.Used)/float64(vm.Total), vmtags) "available_percent": 100 * float64(vm.Available) / float64(vm.Total),
acc.Add("available_percent", }
100*float64(vm.Available)/float64(vm.Total), acc.AddFields("mem", fields, nil)
vmtags)
return nil return nil
} }
@@ -54,14 +53,15 @@ func (s *SwapStats) Gather(acc plugins.Accumulator) error {
return fmt.Errorf("error getting swap memory info: %s", err) return fmt.Errorf("error getting swap memory info: %s", err)
} }
swaptags := map[string]string(nil) fields := map[string]interface{}{
"total": swap.Total,
acc.Add("total", swap.Total, swaptags) "used": swap.Used,
acc.Add("used", swap.Used, swaptags) "free": swap.Free,
acc.Add("free", swap.Free, swaptags) "used_percent": swap.UsedPercent,
acc.Add("used_percent", swap.UsedPercent, swaptags) "in": swap.Sin,
acc.Add("in", swap.Sin, swaptags) "out": swap.Sout,
acc.Add("out", swap.Sout, swaptags) }
acc.AddFields("swap", fields, nil)
return nil return nil
} }

View File

@@ -0,0 +1,73 @@
package system
import (
"testing"
"github.com/influxdb/telegraf/testutil"
"github.com/shirou/gopsutil/mem"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestMemStats(t *testing.T) {
var mps MockPS
var err error
defer mps.AssertExpectations(t)
var acc testutil.Accumulator
vms := &mem.VirtualMemoryStat{
Total: 12400,
Available: 7600,
Used: 5000,
Free: 1235,
// Active: 8134,
// Inactive: 1124,
// Buffers: 771,
// Cached: 4312,
// Wired: 134,
// Shared: 2142,
}
mps.On("VMStat").Return(vms, nil)
sms := &mem.SwapMemoryStat{
Total: 8123,
Used: 1232,
Free: 6412,
UsedPercent: 12.2,
Sin: 7,
Sout: 830,
}
mps.On("SwapStat").Return(sms, nil)
err = (&MemStats{&mps}).Gather(&acc)
require.NoError(t, err)
vmtags := map[string]string(nil)
assert.True(t, acc.CheckTaggedValue("total", uint64(12400), vmtags))
assert.True(t, acc.CheckTaggedValue("available", uint64(7600), vmtags))
assert.True(t, acc.CheckTaggedValue("used", uint64(5000), vmtags))
assert.True(t, acc.CheckTaggedValue("available_percent",
float64(7600)/float64(12400)*100,
vmtags))
assert.True(t, acc.CheckTaggedValue("used_percent",
float64(5000)/float64(12400)*100,
vmtags))
assert.True(t, acc.CheckTaggedValue("free", uint64(1235), vmtags))
acc.Points = nil
err = (&SwapStats{&mps}).Gather(&acc)
require.NoError(t, err)
swaptags := map[string]string(nil)
assert.NoError(t, acc.ValidateTaggedValue("total", uint64(8123), swaptags))
assert.NoError(t, acc.ValidateTaggedValue("used", uint64(1232), swaptags))
assert.NoError(t, acc.ValidateTaggedValue("used_percent", float64(12.2), swaptags))
assert.NoError(t, acc.ValidateTaggedValue("free", uint64(6412), swaptags))
assert.NoError(t, acc.ValidateTaggedValue("in", uint64(7), swaptags))
assert.NoError(t, acc.ValidateTaggedValue("out", uint64(830), swaptags))
}

View File

@@ -70,14 +70,17 @@ func (s *NetIOStats) Gather(acc plugins.Accumulator) error {
"interface": io.Name, "interface": io.Name,
} }
acc.Add("bytes_sent", io.BytesSent, tags) fields := map[string]interface{}{
acc.Add("bytes_recv", io.BytesRecv, tags) "bytes_sent": io.BytesSent,
acc.Add("packets_sent", io.PacketsSent, tags) "bytes_recv": io.BytesRecv,
acc.Add("packets_recv", io.PacketsRecv, tags) "packets_sent": io.PacketsSent,
acc.Add("err_in", io.Errin, tags) "packets_recv": io.PacketsRecv,
acc.Add("err_out", io.Errout, tags) "err_in": io.Errin,
acc.Add("drop_in", io.Dropin, tags) "err_out": io.Errout,
acc.Add("drop_out", io.Dropout, tags) "drop_in": io.Dropin,
"drop_out": io.Dropout,
}
acc.AddFields("net", fields, tags)
} }
// Get system wide stats for different network protocols // Get system wide stats for different network protocols

View File

@@ -0,0 +1,88 @@
package system
import (
"syscall"
"testing"
"github.com/influxdb/telegraf/testutil"
"github.com/shirou/gopsutil/net"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestNetStats(t *testing.T) {
var mps MockPS
var err error
defer mps.AssertExpectations(t)
var acc testutil.Accumulator
netio := net.NetIOCountersStat{
Name: "eth0",
BytesSent: 1123,
BytesRecv: 8734422,
PacketsSent: 781,
PacketsRecv: 23456,
Errin: 832,
Errout: 8,
Dropin: 7,
Dropout: 1,
}
mps.On("NetIO").Return([]net.NetIOCountersStat{netio}, nil)
netprotos := []net.NetProtoCountersStat{
net.NetProtoCountersStat{
Protocol: "Udp",
Stats: map[string]int64{
"InDatagrams": 4655,
"NoPorts": 892592,
},
},
}
mps.On("NetProto").Return(netprotos, nil)
netstats := []net.NetConnectionStat{
net.NetConnectionStat{
Type: syscall.SOCK_DGRAM,
},
net.NetConnectionStat{
Status: "ESTABLISHED",
},
net.NetConnectionStat{
Status: "ESTABLISHED",
},
net.NetConnectionStat{
Status: "CLOSE",
},
}
mps.On("NetConnections").Return(netstats, nil)
err = (&NetIOStats{ps: &mps, skipChecks: true}).Gather(&acc)
require.NoError(t, err)
ntags := map[string]string{
"interface": "eth0",
}
assert.NoError(t, acc.ValidateTaggedValue("bytes_sent", uint64(1123), ntags))
assert.NoError(t, acc.ValidateTaggedValue("bytes_recv", uint64(8734422), ntags))
assert.NoError(t, acc.ValidateTaggedValue("packets_sent", uint64(781), ntags))
assert.NoError(t, acc.ValidateTaggedValue("packets_recv", uint64(23456), ntags))
assert.NoError(t, acc.ValidateTaggedValue("err_in", uint64(832), ntags))
assert.NoError(t, acc.ValidateTaggedValue("err_out", uint64(8), ntags))
assert.NoError(t, acc.ValidateTaggedValue("drop_in", uint64(7), ntags))
assert.NoError(t, acc.ValidateTaggedValue("drop_out", uint64(1), ntags))
assert.NoError(t, acc.ValidateValue("udp_noports", int64(892592)))
assert.NoError(t, acc.ValidateValue("udp_indatagrams", int64(4655)))
acc.Points = nil
err = (&NetStats{&mps}).Gather(&acc)
require.NoError(t, err)
netstattags := map[string]string(nil)
assert.NoError(t, acc.ValidateTaggedValue("tcp_established", 2, netstattags))
assert.NoError(t, acc.ValidateTaggedValue("tcp_close", 1, netstattags))
assert.NoError(t, acc.ValidateTaggedValue("udp_socket", 1, netstattags))
}

View File

@@ -42,19 +42,23 @@ func (s *NetStats) Gather(acc plugins.Accumulator) error {
} }
counts[netcon.Status] = c + 1 counts[netcon.Status] = c + 1
} }
acc.Add("tcp_established", counts["ESTABLISHED"], tags)
acc.Add("tcp_syn_sent", counts["SYN_SENT"], tags) fields := map[string]interface{}{
acc.Add("tcp_syn_recv", counts["SYN_RECV"], tags) "tcp_established": counts["ESTABLISHED"],
acc.Add("tcp_fin_wait1", counts["FIN_WAIT1"], tags) "tcp_syn_sent": counts["SYN_SENT"],
acc.Add("tcp_fin_wait2", counts["FIN_WAIT2"], tags) "tcp_syn_recv": counts["SYN_RECV"],
acc.Add("tcp_time_wait", counts["TIME_WAIT"], tags) "tcp_fin_wait1": counts["FIN_WAIT1"],
acc.Add("tcp_close", counts["CLOSE"], tags) "tcp_fin_wait2": counts["FIN_WAIT2"],
acc.Add("tcp_close_wait", counts["CLOSE_WAIT"], tags) "tcp_time_wait": counts["TIME_WAIT"],
acc.Add("tcp_last_ack", counts["LAST_ACK"], tags) "tcp_close": counts["CLOSE"],
acc.Add("tcp_listen", counts["LISTEN"], tags) "tcp_close_wait": counts["CLOSE_WAIT"],
acc.Add("tcp_closing", counts["CLOSING"], tags) "tcp_last_ack": counts["LAST_ACK"],
acc.Add("tcp_none", counts["NONE"], tags) "tcp_listen": counts["LISTEN"],
acc.Add("udp_socket", counts["UDP"], tags) "tcp_closing": counts["CLOSING"],
"tcp_none": counts["NONE"],
"udp_socket": counts["UDP"],
}
acc.AddFields("netstat", fields, tags)
return nil return nil
} }

View File

@@ -1,12 +1,16 @@
package system package system
import ( import (
"fmt"
gonet "net" gonet "net"
"os" "os"
"reflect"
"strings" "strings"
"testing"
"github.com/influxdb/telegraf/internal" "github.com/influxdb/telegraf/internal"
"github.com/influxdb/telegraf/plugins" "github.com/influxdb/telegraf/plugins"
"github.com/influxdb/telegraf/testutil"
dc "github.com/fsouza/go-dockerclient" dc "github.com/fsouza/go-dockerclient"
"github.com/shirou/gopsutil/cpu" "github.com/shirou/gopsutil/cpu"
@@ -14,6 +18,8 @@ import (
"github.com/shirou/gopsutil/docker" "github.com/shirou/gopsutil/docker"
"github.com/shirou/gopsutil/mem" "github.com/shirou/gopsutil/mem"
"github.com/shirou/gopsutil/net" "github.com/shirou/gopsutil/net"
"github.com/stretchr/testify/assert"
) )
type DockerContainerStat struct { type DockerContainerStat struct {
@@ -166,3 +172,49 @@ func (s *systemPS) DockerStat() ([]*DockerContainerStat, error) {
return stats, nil return stats, nil
} }
// Asserts that a given accumulator contains a measurment of type float64 with
// specific tags within a certain distance of a given expected value. Asserts a failure
// if the measurement is of the wrong type, or if no matching measurements are found
//
// Paramaters:
// t *testing.T : Testing object to use
// acc testutil.Accumulator: Accumulator to examine
// measurement string : Name of the measurement to examine
// expectedValue float64 : Value to search for within the measurement
// delta float64 : Maximum acceptable distance of an accumulated value
// from the expectedValue parameter. Useful when
// floating-point arithmatic imprecision makes looking
// for an exact match impractical
// tags map[string]string : Tag set the found measurement must have. Set to nil to
// ignore the tag set.
func assertContainsTaggedFloat(
t *testing.T,
acc *testutil.Accumulator,
measurement string,
expectedValue float64,
delta float64,
tags map[string]string,
) {
var actualValue float64
for _, pt := range acc.Points {
if pt.Measurement == measurement {
if (tags == nil) || reflect.DeepEqual(pt.Tags, tags) {
if value, ok := pt.Fields["value"].(float64); ok {
actualValue = value
if (value >= expectedValue-delta) && (value <= expectedValue+delta) {
// Found the point, return without failing
return
}
} else {
assert.Fail(t, fmt.Sprintf("Measurement \"%s\" does not have type float64",
measurement))
}
}
}
}
msg := fmt.Sprintf("Could not find measurement \"%s\" with requested tags within %f of %f, Actual: %f",
measurement, delta, expectedValue, actualValue)
assert.Fail(t, msg)
}

View File

@@ -19,13 +19,6 @@ func (_ *SystemStats) Description() string {
func (_ *SystemStats) SampleConfig() string { return "" } func (_ *SystemStats) SampleConfig() string { return "" }
func (_ *SystemStats) add(acc plugins.Accumulator,
name string, val float64, tags map[string]string) {
if val >= 0 {
acc.Add(name, val, tags)
}
}
func (_ *SystemStats) Gather(acc plugins.Accumulator) error { func (_ *SystemStats) Gather(acc plugins.Accumulator) error {
loadavg, err := load.LoadAvg() loadavg, err := load.LoadAvg()
if err != nil { if err != nil {
@@ -37,11 +30,14 @@ func (_ *SystemStats) Gather(acc plugins.Accumulator) error {
return err return err
} }
acc.Add("load1", loadavg.Load1, nil) fields := map[string]interface{}{
acc.Add("load5", loadavg.Load5, nil) "load1": loadavg.Load1,
acc.Add("load15", loadavg.Load15, nil) "load5": loadavg.Load5,
acc.Add("uptime", float64(hostinfo.Uptime), nil) "load15": loadavg.Load15,
acc.Add("uptime_format", format_uptime(hostinfo.Uptime), nil) "uptime": hostinfo.Uptime,
"uptime_format": format_uptime(hostinfo.Uptime),
}
acc.AddFields("system", fields, nil)
return nil return nil
} }

View File

@@ -1,426 +0,0 @@
package system
import (
"fmt"
"reflect"
"syscall"
"testing"
"github.com/influxdb/telegraf/testutil"
"github.com/shirou/gopsutil/cpu"
"github.com/shirou/gopsutil/disk"
"github.com/shirou/gopsutil/mem"
"github.com/shirou/gopsutil/net"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestSystemStats_GenerateStats(t *testing.T) {
var mps MockPS
defer mps.AssertExpectations(t)
var acc testutil.Accumulator
cts := cpu.CPUTimesStat{
CPU: "cpu0",
User: 3.1,
System: 8.2,
Idle: 80.1,
Nice: 1.3,
Iowait: 0.2,
Irq: 0.1,
Softirq: 0.11,
Steal: 0.0511,
Guest: 8.1,
GuestNice: 0.324,
}
cts2 := cpu.CPUTimesStat{
CPU: "cpu0",
User: 11.4, // increased by 8.3
System: 10.9, // increased by 2.7
Idle: 158.8699, // increased by 78.7699 (for total increase of 100)
Nice: 2.5, // increased by 1.2
Iowait: 0.7, // increased by 0.5
Irq: 1.2, // increased by 1.1
Softirq: 0.31, // increased by 0.2
Steal: 0.2812, // increased by 0.0001
Guest: 12.9, // increased by 4.8
GuestNice: 2.524, // increased by 2.2
}
mps.On("CPUTimes").Return([]cpu.CPUTimesStat{cts}, nil)
du := []*disk.DiskUsageStat{
{
Path: "/",
Fstype: "ext4",
Total: 128,
Free: 23,
InodesTotal: 1234,
InodesFree: 234,
},
{
Path: "/home",
Fstype: "ext4",
Total: 256,
Free: 46,
InodesTotal: 2468,
InodesFree: 468,
},
}
mps.On("DiskUsage").Return(du, nil)
diskio1 := disk.DiskIOCountersStat{
ReadCount: 888,
WriteCount: 5341,
ReadBytes: 100000,
WriteBytes: 200000,
ReadTime: 7123,
WriteTime: 9087,
Name: "sda1",
IoTime: 123552,
SerialNumber: "ab-123-ad",
}
diskio2 := disk.DiskIOCountersStat{
ReadCount: 444,
WriteCount: 2341,
ReadBytes: 200000,
WriteBytes: 400000,
ReadTime: 3123,
WriteTime: 6087,
Name: "sdb1",
IoTime: 246552,
SerialNumber: "bb-123-ad",
}
mps.On("DiskIO").Return(map[string]disk.DiskIOCountersStat{"sda1": diskio1, "sdb1": diskio2}, nil)
netio := net.NetIOCountersStat{
Name: "eth0",
BytesSent: 1123,
BytesRecv: 8734422,
PacketsSent: 781,
PacketsRecv: 23456,
Errin: 832,
Errout: 8,
Dropin: 7,
Dropout: 1,
}
mps.On("NetIO").Return([]net.NetIOCountersStat{netio}, nil)
netprotos := []net.NetProtoCountersStat{
net.NetProtoCountersStat{
Protocol: "Udp",
Stats: map[string]int64{
"InDatagrams": 4655,
"NoPorts": 892592,
},
},
}
mps.On("NetProto").Return(netprotos, nil)
vms := &mem.VirtualMemoryStat{
Total: 12400,
Available: 7600,
Used: 5000,
Free: 1235,
// Active: 8134,
// Inactive: 1124,
// Buffers: 771,
// Cached: 4312,
// Wired: 134,
// Shared: 2142,
}
mps.On("VMStat").Return(vms, nil)
sms := &mem.SwapMemoryStat{
Total: 8123,
Used: 1232,
Free: 6412,
UsedPercent: 12.2,
Sin: 7,
Sout: 830,
}
mps.On("SwapStat").Return(sms, nil)
netstats := []net.NetConnectionStat{
net.NetConnectionStat{
Type: syscall.SOCK_DGRAM,
},
net.NetConnectionStat{
Status: "ESTABLISHED",
},
net.NetConnectionStat{
Status: "ESTABLISHED",
},
net.NetConnectionStat{
Status: "CLOSE",
},
}
mps.On("NetConnections").Return(netstats, nil)
cs := NewCPUStats(&mps)
cputags := map[string]string{
"cpu": "cpu0",
}
preCPUPoints := len(acc.Points)
err := cs.Gather(&acc)
require.NoError(t, err)
numCPUPoints := len(acc.Points) - preCPUPoints
expectedCPUPoints := 10
assert.Equal(t, expectedCPUPoints, numCPUPoints)
// Computed values are checked with delta > 0 becasue of floating point arithmatic
// imprecision
assertContainsTaggedFloat(t, &acc, "time_user", 3.1, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_system", 8.2, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_idle", 80.1, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_nice", 1.3, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_iowait", 0.2, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_irq", 0.1, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_softirq", 0.11, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_steal", 0.0511, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_guest", 8.1, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_guest_nice", 0.324, 0, cputags)
mps2 := MockPS{}
mps2.On("CPUTimes").Return([]cpu.CPUTimesStat{cts2}, nil)
cs.ps = &mps2
// Should have added cpu percentages too
err = cs.Gather(&acc)
require.NoError(t, err)
numCPUPoints = len(acc.Points) - (preCPUPoints + numCPUPoints)
expectedCPUPoints = 20
assert.Equal(t, expectedCPUPoints, numCPUPoints)
assertContainsTaggedFloat(t, &acc, "time_user", 11.4, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_system", 10.9, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_idle", 158.8699, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_nice", 2.5, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_iowait", 0.7, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_irq", 1.2, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_softirq", 0.31, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_steal", 0.2812, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_guest", 12.9, 0, cputags)
assertContainsTaggedFloat(t, &acc, "time_guest_nice", 2.524, 0, cputags)
assertContainsTaggedFloat(t, &acc, "usage_user", 8.3, 0.0005, cputags)
assertContainsTaggedFloat(t, &acc, "usage_system", 2.7, 0.0005, cputags)
assertContainsTaggedFloat(t, &acc, "usage_idle", 78.7699, 0.0005, cputags)
assertContainsTaggedFloat(t, &acc, "usage_nice", 1.2, 0.0005, cputags)
assertContainsTaggedFloat(t, &acc, "usage_iowait", 0.5, 0.0005, cputags)
assertContainsTaggedFloat(t, &acc, "usage_irq", 1.1, 0.0005, cputags)
assertContainsTaggedFloat(t, &acc, "usage_softirq", 0.2, 0.0005, cputags)
assertContainsTaggedFloat(t, &acc, "usage_steal", 0.2301, 0.0005, cputags)
assertContainsTaggedFloat(t, &acc, "usage_guest", 4.8, 0.0005, cputags)
assertContainsTaggedFloat(t, &acc, "usage_guest_nice", 2.2, 0.0005, cputags)
preDiskPoints := len(acc.Points)
err = (&DiskStats{ps: &mps}).Gather(&acc)
require.NoError(t, err)
numDiskPoints := len(acc.Points) - preDiskPoints
expectedAllDiskPoints := 12
assert.Equal(t, expectedAllDiskPoints, numDiskPoints)
tags1 := map[string]string{
"path": "/",
"fstype": "ext4",
}
tags2 := map[string]string{
"path": "/home",
"fstype": "ext4",
}
assert.True(t, acc.CheckTaggedValue("total", uint64(128), tags1))
assert.True(t, acc.CheckTaggedValue("used", uint64(105), tags1))
assert.True(t, acc.CheckTaggedValue("free", uint64(23), tags1))
assert.True(t, acc.CheckTaggedValue("inodes_total", uint64(1234), tags1))
assert.True(t, acc.CheckTaggedValue("inodes_free", uint64(234), tags1))
assert.True(t, acc.CheckTaggedValue("inodes_used", uint64(1000), tags1))
assert.True(t, acc.CheckTaggedValue("total", uint64(256), tags2))
assert.True(t, acc.CheckTaggedValue("used", uint64(210), tags2))
assert.True(t, acc.CheckTaggedValue("free", uint64(46), tags2))
assert.True(t, acc.CheckTaggedValue("inodes_total", uint64(2468), tags2))
assert.True(t, acc.CheckTaggedValue("inodes_free", uint64(468), tags2))
assert.True(t, acc.CheckTaggedValue("inodes_used", uint64(2000), tags2))
// We expect 6 more DiskPoints to show up with an explicit match on "/"
// and /home not matching the /dev in Mountpoints
err = (&DiskStats{ps: &mps, Mountpoints: []string{"/", "/dev"}}).Gather(&acc)
assert.Equal(t, preDiskPoints+expectedAllDiskPoints+6, len(acc.Points))
// We should see all the diskpoints as Mountpoints includes both
// / and /home
err = (&DiskStats{ps: &mps, Mountpoints: []string{"/", "/home"}}).Gather(&acc)
assert.Equal(t, preDiskPoints+2*expectedAllDiskPoints+6, len(acc.Points))
err = (&NetIOStats{ps: &mps, skipChecks: true}).Gather(&acc)
require.NoError(t, err)
ntags := map[string]string{
"interface": "eth0",
}
assert.NoError(t, acc.ValidateTaggedValue("bytes_sent", uint64(1123), ntags))
assert.NoError(t, acc.ValidateTaggedValue("bytes_recv", uint64(8734422), ntags))
assert.NoError(t, acc.ValidateTaggedValue("packets_sent", uint64(781), ntags))
assert.NoError(t, acc.ValidateTaggedValue("packets_recv", uint64(23456), ntags))
assert.NoError(t, acc.ValidateTaggedValue("err_in", uint64(832), ntags))
assert.NoError(t, acc.ValidateTaggedValue("err_out", uint64(8), ntags))
assert.NoError(t, acc.ValidateTaggedValue("drop_in", uint64(7), ntags))
assert.NoError(t, acc.ValidateTaggedValue("drop_out", uint64(1), ntags))
assert.NoError(t, acc.ValidateValue("udp_noports", int64(892592)))
assert.NoError(t, acc.ValidateValue("udp_indatagrams", int64(4655)))
preDiskIOPoints := len(acc.Points)
err = (&DiskIOStats{ps: &mps}).Gather(&acc)
require.NoError(t, err)
numDiskIOPoints := len(acc.Points) - preDiskIOPoints
expectedAllDiskIOPoints := 14
assert.Equal(t, expectedAllDiskIOPoints, numDiskIOPoints)
dtags1 := map[string]string{
"name": "sda1",
"serial": "ab-123-ad",
}
dtags2 := map[string]string{
"name": "sdb1",
"serial": "bb-123-ad",
}
assert.True(t, acc.CheckTaggedValue("reads", uint64(888), dtags1))
assert.True(t, acc.CheckTaggedValue("writes", uint64(5341), dtags1))
assert.True(t, acc.CheckTaggedValue("read_bytes", uint64(100000), dtags1))
assert.True(t, acc.CheckTaggedValue("write_bytes", uint64(200000), dtags1))
assert.True(t, acc.CheckTaggedValue("read_time", uint64(7123), dtags1))
assert.True(t, acc.CheckTaggedValue("write_time", uint64(9087), dtags1))
assert.True(t, acc.CheckTaggedValue("io_time", uint64(123552), dtags1))
assert.True(t, acc.CheckTaggedValue("reads", uint64(444), dtags2))
assert.True(t, acc.CheckTaggedValue("writes", uint64(2341), dtags2))
assert.True(t, acc.CheckTaggedValue("read_bytes", uint64(200000), dtags2))
assert.True(t, acc.CheckTaggedValue("write_bytes", uint64(400000), dtags2))
assert.True(t, acc.CheckTaggedValue("read_time", uint64(3123), dtags2))
assert.True(t, acc.CheckTaggedValue("write_time", uint64(6087), dtags2))
assert.True(t, acc.CheckTaggedValue("io_time", uint64(246552), dtags2))
// We expect 7 more DiskIOPoints to show up with an explicit match on "sdb1"
// and serial should be missing from the tags with SkipSerialNumber set
err = (&DiskIOStats{ps: &mps, Devices: []string{"sdb1"}, SkipSerialNumber: true}).Gather(&acc)
assert.Equal(t, preDiskIOPoints+expectedAllDiskIOPoints+7, len(acc.Points))
dtags3 := map[string]string{
"name": "sdb1",
}
assert.True(t, acc.CheckTaggedValue("reads", uint64(444), dtags3))
assert.True(t, acc.CheckTaggedValue("writes", uint64(2341), dtags3))
assert.True(t, acc.CheckTaggedValue("read_bytes", uint64(200000), dtags3))
assert.True(t, acc.CheckTaggedValue("write_bytes", uint64(400000), dtags3))
assert.True(t, acc.CheckTaggedValue("read_time", uint64(3123), dtags3))
assert.True(t, acc.CheckTaggedValue("write_time", uint64(6087), dtags3))
assert.True(t, acc.CheckTaggedValue("io_time", uint64(246552), dtags3))
err = (&MemStats{&mps}).Gather(&acc)
require.NoError(t, err)
vmtags := map[string]string(nil)
assert.True(t, acc.CheckTaggedValue("total", uint64(12400), vmtags))
assert.True(t, acc.CheckTaggedValue("available", uint64(7600), vmtags))
assert.True(t, acc.CheckTaggedValue("used", uint64(5000), vmtags))
assert.True(t, acc.CheckTaggedValue("available_percent",
float64(7600)/float64(12400)*100,
vmtags))
assert.True(t, acc.CheckTaggedValue("used_percent",
float64(5000)/float64(12400)*100,
vmtags))
assert.True(t, acc.CheckTaggedValue("free", uint64(1235), vmtags))
acc.Points = nil
err = (&SwapStats{&mps}).Gather(&acc)
require.NoError(t, err)
swaptags := map[string]string(nil)
assert.NoError(t, acc.ValidateTaggedValue("total", uint64(8123), swaptags))
assert.NoError(t, acc.ValidateTaggedValue("used", uint64(1232), swaptags))
assert.NoError(t, acc.ValidateTaggedValue("used_percent", float64(12.2), swaptags))
assert.NoError(t, acc.ValidateTaggedValue("free", uint64(6412), swaptags))
assert.NoError(t, acc.ValidateTaggedValue("in", uint64(7), swaptags))
assert.NoError(t, acc.ValidateTaggedValue("out", uint64(830), swaptags))
acc.Points = nil
err = (&NetStats{&mps}).Gather(&acc)
require.NoError(t, err)
netstattags := map[string]string(nil)
assert.NoError(t, acc.ValidateTaggedValue("tcp_established", 2, netstattags))
assert.NoError(t, acc.ValidateTaggedValue("tcp_close", 1, netstattags))
assert.NoError(t, acc.ValidateTaggedValue("udp_socket", 1, netstattags))
}
// Asserts that a given accumulator contains a measurment of type float64 with
// specific tags within a certain distance of a given expected value. Asserts a failure
// if the measurement is of the wrong type, or if no matching measurements are found
//
// Paramaters:
// t *testing.T : Testing object to use
// acc testutil.Accumulator: Accumulator to examine
// measurement string : Name of the measurement to examine
// expectedValue float64 : Value to search for within the measurement
// delta float64 : Maximum acceptable distance of an accumulated value
// from the expectedValue parameter. Useful when
// floating-point arithmatic imprecision makes looking
// for an exact match impractical
// tags map[string]string : Tag set the found measurement must have. Set to nil to
// ignore the tag set.
func assertContainsTaggedFloat(
t *testing.T,
acc *testutil.Accumulator,
measurement string,
expectedValue float64,
delta float64,
tags map[string]string,
) {
var actualValue float64
for _, pt := range acc.Points {
if pt.Measurement == measurement {
if (tags == nil) || reflect.DeepEqual(pt.Tags, tags) {
if value, ok := pt.Fields["value"].(float64); ok {
actualValue = value
if (value >= expectedValue-delta) && (value <= expectedValue+delta) {
// Found the point, return without failing
return
}
} else {
assert.Fail(t, fmt.Sprintf("Measurement \"%s\" does not have type float64",
measurement))
}
}
}
}
msg := fmt.Sprintf("Could not find measurement \"%s\" with requested tags within %f of %f, Actual: %f",
measurement, delta, expectedValue, actualValue)
assert.Fail(t, msg)
}

View File

@@ -41,6 +41,5 @@ func (s *Trig) Gather(acc plugins.Accumulator) error {
} }
func init() { func init() {
plugins.Add("Trig", func() plugins.Plugin { return &Trig{x: 0.0} }) plugins.Add("Trig", func() plugins.Plugin { return &Trig{x: 0.0} })
} }

View File

@@ -5,28 +5,21 @@ import (
"errors" "errors"
"io/ioutil" "io/ioutil"
"net" "net"
"strings"
"sync"
"time" "time"
"github.com/influxdb/telegraf/plugins" "github.com/influxdb/telegraf/plugins"
) )
type Twemproxy struct { type Twemproxy struct {
Instances []TwemproxyInstance
}
type TwemproxyInstance struct {
Addr string Addr string
Pools []string Pools []string
} }
var sampleConfig = ` var sampleConfig = `
[[plugins.twemproxy.instances]] # Twemproxy stats address and port (no scheme)
# Twemproxy stats address and port (no scheme) addr = "localhost:22222"
addr = "localhost:22222" # Monitor pool name
# Monitor pool name pools = ["redis_pool", "mc_pool"]
pools = ["redis_pool", "mc_pool"]
` `
func (t *Twemproxy) SampleConfig() string { func (t *Twemproxy) SampleConfig() string {
@@ -39,35 +32,7 @@ func (t *Twemproxy) Description() string {
// Gather data from all Twemproxy instances // Gather data from all Twemproxy instances
func (t *Twemproxy) Gather(acc plugins.Accumulator) error { func (t *Twemproxy) Gather(acc plugins.Accumulator) error {
var wg sync.WaitGroup conn, err := net.DialTimeout("tcp", t.Addr, 1*time.Second)
errorChan := make(chan error, len(t.Instances))
for _, inst := range t.Instances {
wg.Add(1)
go func(inst TwemproxyInstance) {
defer wg.Done()
if err := inst.Gather(acc); err != nil {
errorChan <- err
}
}(inst)
}
wg.Wait()
close(errorChan)
errs := []string{}
for err := range errorChan {
errs = append(errs, err.Error())
}
if len(errs) == 0 {
return nil
}
return errors.New(strings.Join(errs, "\n"))
}
// Gather data from one Twemproxy
func (ti *TwemproxyInstance) Gather(
acc plugins.Accumulator,
) error {
conn, err := net.DialTimeout("tcp", ti.Addr, 1*time.Second)
if err != nil { if err != nil {
return err return err
} }
@@ -82,14 +47,14 @@ func (ti *TwemproxyInstance) Gather(
} }
tags := make(map[string]string) tags := make(map[string]string)
tags["twemproxy"] = ti.Addr tags["twemproxy"] = t.Addr
ti.processStat(acc, tags, stats) t.processStat(acc, tags, stats)
return nil return nil
} }
// Process Twemproxy server stats // Process Twemproxy server stats
func (ti *TwemproxyInstance) processStat( func (t *Twemproxy) processStat(
acc plugins.Accumulator, acc plugins.Accumulator,
tags map[string]string, tags map[string]string,
data map[string]interface{}, data map[string]interface{},
@@ -100,40 +65,42 @@ func (ti *TwemproxyInstance) processStat(
} }
} }
fields := make(map[string]interface{})
metrics := []string{"total_connections", "curr_connections", "timestamp"} metrics := []string{"total_connections", "curr_connections", "timestamp"}
for _, m := range metrics { for _, m := range metrics {
if value, ok := data[m]; ok { if value, ok := data[m]; ok {
if val, ok := value.(float64); ok { if val, ok := value.(float64); ok {
acc.Add(m, val, tags) fields[m] = val
} }
} }
} }
acc.AddFields("twemproxy", fields, tags)
for _, pool := range ti.Pools { for _, pool := range t.Pools {
if poolStat, ok := data[pool]; ok { if poolStat, ok := data[pool]; ok {
if data, ok := poolStat.(map[string]interface{}); ok { if data, ok := poolStat.(map[string]interface{}); ok {
poolTags := copyTags(tags) poolTags := copyTags(tags)
poolTags["pool"] = pool poolTags["pool"] = pool
ti.processPool(acc, poolTags, pool+"_", data) t.processPool(acc, poolTags, data)
} }
} }
} }
} }
// Process pool data in Twemproxy stats // Process pool data in Twemproxy stats
func (ti *TwemproxyInstance) processPool( func (t *Twemproxy) processPool(
acc plugins.Accumulator, acc plugins.Accumulator,
tags map[string]string, tags map[string]string,
prefix string,
data map[string]interface{}, data map[string]interface{},
) { ) {
serverTags := make(map[string]map[string]string) serverTags := make(map[string]map[string]string)
fields := make(map[string]interface{})
for key, value := range data { for key, value := range data {
switch key { switch key {
case "client_connections", "forward_error", "client_err", "server_ejects", "fragments", "client_eof": case "client_connections", "forward_error", "client_err", "server_ejects", "fragments", "client_eof":
if val, ok := value.(float64); ok { if val, ok := value.(float64); ok {
acc.Add(prefix+key, val, tags) fields[key] = val
} }
default: default:
if data, ok := value.(map[string]interface{}); ok { if data, ok := value.(map[string]interface{}); ok {
@@ -141,27 +108,29 @@ func (ti *TwemproxyInstance) processPool(
serverTags[key] = copyTags(tags) serverTags[key] = copyTags(tags)
serverTags[key]["server"] = key serverTags[key]["server"] = key
} }
ti.processServer(acc, serverTags[key], prefix, data) t.processServer(acc, serverTags[key], data)
} }
} }
} }
acc.AddFields("twemproxy_pool", fields, tags)
} }
// Process backend server(redis/memcached) stats // Process backend server(redis/memcached) stats
func (ti *TwemproxyInstance) processServer( func (t *Twemproxy) processServer(
acc plugins.Accumulator, acc plugins.Accumulator,
tags map[string]string, tags map[string]string,
prefix string,
data map[string]interface{}, data map[string]interface{},
) { ) {
fields := make(map[string]interface{})
for key, value := range data { for key, value := range data {
switch key { switch key {
default: default:
if val, ok := value.(float64); ok { if val, ok := value.(float64); ok {
acc.Add(prefix+key, val, tags) fields[key] = val
} }
} }
} }
acc.AddFields("twemproxy_pool", fields, tags)
} }
// Tags is not expected to be mutated after passing to Add. // Tags is not expected to be mutated after passing to Add.

View File

@@ -88,15 +88,15 @@ func gatherPoolStats(pool poolInfo, acc plugins.Accumulator) error {
} }
tag := map[string]string{"pool": pool.name} tag := map[string]string{"pool": pool.name}
fields := make(map[string]interface{})
for i := 0; i < keyCount; i++ { for i := 0; i < keyCount; i++ {
value, err := strconv.ParseInt(values[i], 10, 64) value, err := strconv.ParseInt(values[i], 10, 64)
if err != nil { if err != nil {
return err return err
} }
fields[keys[i]] = value
acc.Add(keys[i], value, tag)
} }
acc.AddFields("zfs_pool", fields, tag)
return nil return nil
} }
@@ -124,6 +124,7 @@ func (z *Zfs) Gather(acc plugins.Accumulator) error {
} }
} }
fields := make(map[string]interface{})
for _, metric := range kstatMetrics { for _, metric := range kstatMetrics {
lines, err := internal.ReadLines(kstatPath + "/" + metric) lines, err := internal.ReadLines(kstatPath + "/" + metric)
if err != nil { if err != nil {
@@ -140,9 +141,10 @@ func (z *Zfs) Gather(acc plugins.Accumulator) error {
key := metric + "_" + rawData[0] key := metric + "_" + rawData[0]
rawValue := rawData[len(rawData)-1] rawValue := rawData[len(rawData)-1]
value, _ := strconv.ParseInt(rawValue, 10, 64) value, _ := strconv.ParseInt(rawValue, 10, 64)
acc.Add(key, value, tags) fields[key] = value
} }
} }
acc.AddFields("zfs", fields, tags)
return nil return nil
} }

View File

@@ -67,35 +67,37 @@ func (z *Zookeeper) gatherServer(address string, acc plugins.Accumulator) error
defer c.Close() defer c.Close()
fmt.Fprintf(c, "%s\n", "mntr") fmt.Fprintf(c, "%s\n", "mntr")
rdr := bufio.NewReader(c) rdr := bufio.NewReader(c)
scanner := bufio.NewScanner(rdr) scanner := bufio.NewScanner(rdr)
service := strings.Split(address, ":")
if len(service) != 2 {
return fmt.Errorf("Invalid service address: %s", address)
}
tags := map[string]string{"server": service[0], "port": service[1]}
fields := make(map[string]interface{})
for scanner.Scan() { for scanner.Scan() {
line := scanner.Text() line := scanner.Text()
re := regexp.MustCompile(`^zk_(\w+)\s+([\w\.\-]+)`) re := regexp.MustCompile(`^zk_(\w+)\s+([\w\.\-]+)`)
parts := re.FindStringSubmatch(string(line)) parts := re.FindStringSubmatch(string(line))
service := strings.Split(address, ":") if len(parts) != 3 {
if len(parts) != 3 || len(service) != 2 {
return fmt.Errorf("unexpected line in mntr response: %q", line) return fmt.Errorf("unexpected line in mntr response: %q", line)
} }
tags := map[string]string{"server": service[0], "port": service[1]}
measurement := strings.TrimPrefix(parts[1], "zk_") measurement := strings.TrimPrefix(parts[1], "zk_")
sValue := string(parts[2]) sValue := string(parts[2])
iVal, err := strconv.ParseInt(sValue, 10, 64) iVal, err := strconv.ParseInt(sValue, 10, 64)
if err == nil { if err == nil {
acc.Add(measurement, iVal, tags) fields[measurement] = iVal
} else { } else {
acc.Add(measurement, sValue, tags) fields[measurement] = sValue
} }
} }
acc.AddFields("zookeeper", fields, tags)
return nil return nil
} }