# Compare commits: plugin/rea...v0.3.0-bet (28 commits)
| Author | SHA1 | Date |
|---|---|---|
| | 069cb9766b | |
| | 8b54c73ae4 | |
| | c9ef073fba | |
| | 15f66d7d1b | |
| | b0f79f43ec | |
| | c584129758 | |
| | d1930c90b5 | |
| | 1e76e36df2 | |
| | a73b5257dc | |
| | c16be04ca7 | |
| | 5513275f2c | |
| | 3a7b1688a3 | |
| | 35d5c7bae3 | |
| | 60b6693ae3 | |
| | c1e1f2ace4 | |
| | 6698d195d8 | |
| | 23b21ca86a | |
| | 56e14e4731 | |
| | 7deb339b76 | |
| | 0e55c371b7 | |
| | f284c8c154 | |
| | e3b314cacb | |
| | 9fce094b36 | |
| | 319c363c8e | |
| | 40d84accee | |
| | 3fc43df84e | |
| | 59f804d77a | |
| | 96d5f0d0de | |
### CHANGELOG.md (22 changed lines)
```diff
@@ -1,3 +1,25 @@
+## v0.3.0 [unreleased]
+
+### Release Notes
+- **breaking change** the `io` plugin has been renamed `diskio`
+- **breaking change** Plugin measurements aggregated into a single measurement.
+- **breaking change** `jolokia` plugin: must use global tag/drop/pass parameters
+for configuration.
+- `twemproxy` plugin: `prefix` option removed.
+- `procstat` cpu measurements are now prepended with `cpu_time_` instead of
+only `cpu_`
+- The prometheus plugin schema has not been changed (measurements have not been
+aggregated).
+
+### Features
+- Plugin measurements aggregated into a single measurement.
+- Added ability to specify per-plugin tags
+- Added ability to specify per-plugin measurement suffix and prefix.
+(`name_prefix` and `name_suffix`)
+- Added ability to override base plugin name. (`name_override`)
+
+### Bugfixes
+
 ## v0.2.5 [unreleased]
 
 ### Features
```
### CONFIGURATION.md (new file, 177 lines)
# Telegraf Configuration

## Plugin Configuration

There are some configuration options that are configurable per plugin:

* **name_override**: Override the base name of the measurement. (Default is the name of the plugin).
* **name_prefix**: Specifies a prefix to attach to the measurement name.
* **name_suffix**: Specifies a suffix to attach to the measurement name.
* **tags**: A map of tags to apply to a specific plugin's measurements.

### Plugin Filters

There are also filters that can be configured per plugin:

* **pass**: An array of strings that is used to filter metrics generated by the current plugin. Each string in the array is tested as a glob match against field names, and if it matches, the field is emitted.
* **drop**: The inverse of pass; if a field name matches, it is not emitted.
* **tagpass**: tag names and arrays of strings that are used to filter measurements by the current plugin. Each string in the array is tested as a glob match against the tag name, and if it matches, the measurement is emitted.
* **tagdrop**: The inverse of tagpass. If a tag matches, the measurement is not emitted. This is tested on measurements that have passed the tagpass test.
* **interval**: How often to gather this metric. Normal plugins use a single global interval, but if one particular plugin should be run less or more often, you can configure that here.
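The pass/drop rule above can be sketched in Go. This is a minimal illustration under stated assumptions, not Telegraf's actual matcher: the standard library's `path.Match` stands in for Telegraf's internal glob helper, and the function name `shouldPass` is illustrative.

```go
package main

import (
	"fmt"
	"path"
)

// shouldPass mirrors the filter semantics described above: when "pass"
// patterns are set, a field is emitted only if it matches one of them;
// otherwise a field matching any "drop" pattern is suppressed.
// path.Match is used here as a stand-in glob matcher (an assumption).
func shouldPass(field string, pass, drop []string) bool {
	if pass != nil {
		for _, pat := range pass {
			if ok, _ := path.Match(pat, field); ok {
				return true
			}
		}
		return false
	}
	for _, pat := range drop {
		if ok, _ := path.Match(pat, field); ok {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(shouldPass("time_user", nil, []string{"time_*"}))    // false: dropped
	fmt.Println(shouldPass("usage_idle", nil, []string{"time_*"}))   // true
	fmt.Println(shouldPass("inodes_free", []string{"inodes*"}, nil)) // true
}
```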
### Plugin Configuration Examples

This is a full working config that will output CPU data to an InfluxDB instance at 192.168.59.103:8086, tagging measurements with dc="denver-1". It will output measurements at a 10s interval and will collect per-cpu data, dropping any fields which begin with `time_`.

```toml
[tags]
dc = "denver-1"

[agent]
interval = "10s"

# OUTPUTS
[outputs]
[[outputs.influxdb]]
url = "http://192.168.59.103:8086" # required.
database = "telegraf" # required.
precision = "s"

# PLUGINS
[plugins]
[[plugins.cpu]]
percpu = true
totalcpu = false
# filter all fields beginning with 'time_'
drop = ["time_*"]
```

### Plugin Config: tagpass and tagdrop

```toml
[plugins]
[[plugins.cpu]]
percpu = true
totalcpu = false
drop = ["cpu_time"]
# Don't collect CPU data for cpu6 & cpu7
[plugins.cpu.tagdrop]
cpu = [ "cpu6", "cpu7" ]

[[plugins.disk]]
[plugins.disk.tagpass]
# tagpass conditions are OR, not AND.
# If the (filesystem is ext4 or xfs) OR (the path is /opt or /home)
# then the metric passes
fstype = [ "ext4", "xfs" ]
# Globs can also be used on the tag values
path = [ "/opt", "/home*" ]
```

### Plugin Config: pass and drop

```toml
# Drop all metrics for guest & steal CPU usage
[[plugins.cpu]]
percpu = false
totalcpu = true
drop = ["usage_guest", "usage_steal"]

# Only store inode related metrics for disks
[[plugins.disk]]
pass = ["inodes*"]
```

### Plugin config: prefix, suffix, and override

This plugin will emit measurements with the name `cpu_total`.

```toml
[[plugins.cpu]]
name_suffix = "_total"
percpu = false
totalcpu = true
```

This will emit measurements with the name `foobar`.

```toml
[[plugins.cpu]]
name_override = "foobar"
percpu = false
totalcpu = true
```

### Plugin config: tags

This plugin will emit measurements with two additional tags: `tag1=foo` and `tag2=bar`.

```toml
[[plugins.cpu]]
percpu = false
totalcpu = true
[plugins.cpu.tags]
tag1 = "foo"
tag2 = "bar"
```

### Multiple plugins of the same type

Additional plugins (or outputs) of the same type can be specified; just define more instances in the config file:

```toml
[[plugins.cpu]]
percpu = false
totalcpu = true

[[plugins.cpu]]
percpu = true
totalcpu = false
drop = ["cpu_time*"]
```

## Output Configuration

Telegraf also supports specifying multiple output sinks to send data to. Configuring each output sink is different, but examples can be found by running `telegraf -sample-config`.

Outputs also support the same configurable options as plugins (pass, drop, tagpass, tagdrop), added in 0.2.4.

```toml
[[outputs.influxdb]]
urls = [ "http://localhost:8086" ]
database = "telegraf"
precision = "s"
# Drop all measurements that start with "aerospike"
drop = ["aerospike*"]

[[outputs.influxdb]]
urls = [ "http://localhost:8086" ]
database = "telegraf-aerospike-data"
precision = "s"
# Only accept aerospike data:
pass = ["aerospike*"]

[[outputs.influxdb]]
urls = [ "http://localhost:8086" ]
database = "telegraf-cpu0-data"
precision = "s"
# Only store measurements where the tag "cpu" matches the value "cpu0"
[outputs.influxdb.tagpass]
cpu = ["cpu0"]
```
### README.md (133 changed lines)
````diff
@@ -90,99 +90,10 @@ unit parser, e.g. "10s" for 10 seconds or "5m" for 5 minutes.
 * **debug**: Set to true to gather and send metrics to STDOUT as well as
 InfluxDB.
 
-## Plugin Options
+## Configuration
 
-There are 5 configuration options that are configurable per plugin:
+See the [configuration guide](CONFIGURATION.md) for a rundown of the more advanced
+configuration options.
-
-* **pass**: An array of strings that is used to filter metrics generated by the
-current plugin. Each string in the array is tested as a glob match against metric names
-and if it matches, the metric is emitted.
-* **drop**: The inverse of pass, if a metric name matches, it is not emitted.
-* **tagpass**: tag names and arrays of strings that are used to filter metrics by the current plugin. Each string in the array is tested as a glob match against
-the tag name, and if it matches the metric is emitted.
-* **tagdrop**: The inverse of tagpass. If a tag matches, the metric is not emitted.
-This is tested on metrics that have passed the tagpass test.
-* **interval**: How often to gather this metric. Normal plugins use a single
-global interval, but if one particular plugin should be run less or more often,
-you can configure that here.
-
-### Plugin Configuration Examples
-
-This is a full working config that will output CPU data to an InfluxDB instance
-at 192.168.59.103:8086, tagging measurements with dc="denver-1". It will output
-measurements at a 10s interval and will collect per-cpu data, dropping any
-measurements which begin with `cpu_time`.
-
-```toml
-[tags]
-dc = "denver-1"
-
-[agent]
-interval = "10s"
-
-# OUTPUTS
-[outputs]
-[[outputs.influxdb]]
-url = "http://192.168.59.103:8086" # required.
-database = "telegraf" # required.
-precision = "s"
-
-# PLUGINS
-[plugins]
-[[plugins.cpu]]
-percpu = true
-totalcpu = false
-drop = ["cpu_time*"]
-```
-
-Below is how to configure `tagpass` and `tagdrop` parameters
-
-```toml
-[plugins]
-[[plugins.cpu]]
-percpu = true
-totalcpu = false
-drop = ["cpu_time"]
-# Don't collect CPU data for cpu6 & cpu7
-[plugins.cpu.tagdrop]
-cpu = [ "cpu6", "cpu7" ]
-
-[[plugins.disk]]
-[plugins.disk.tagpass]
-# tagpass conditions are OR, not AND.
-# If the (filesystem is ext4 or xfs) OR (the path is /opt or /home)
-# then the metric passes
-fstype = [ "ext4", "xfs" ]
-# Globs can also be used on the tag values
-path = [ "/opt", "/home*" ]
-```
-
-Below is how to configure `pass` and `drop` parameters
-
-```toml
-# Drop all metrics for guest CPU usage
-[[plugins.cpu]]
-drop = [ "cpu_usage_guest" ]
-
-# Only store inode related metrics for disks
-[[plugins.disk]]
-pass = [ "disk_inodes*" ]
-```
-
-
-Additional plugins (or outputs) of the same type can be specified,
-just define more instances in the config file:
-
-```toml
-[[plugins.cpu]]
-percpu = false
-totalcpu = true
-
-[[plugins.cpu]]
-percpu = true
-totalcpu = false
-drop = ["cpu_time*"]
-```
-
 ## Supported Plugins
 
@@ -200,7 +111,7 @@ Telegraf currently has support for collecting metrics from:
 * haproxy
 * httpjson (generic JSON-emitting http service plugin)
 * influxdb
-* jolokia (remote JMX with JSON over HTTP)
+* jolokia
 * leofs
 * lustre2
 * mailchimp
@@ -223,10 +134,10 @@ Telegraf currently has support for collecting metrics from:
 * system
 * cpu
 * mem
-* io
 * net
 * netstat
 * disk
+* diskio
 * swap
 
 ## Supported Service Plugins
@@ -239,40 +150,6 @@ Telegraf can collect metrics via the following services:
 We'll be adding support for many more over the coming months. Read on if you
 want to add support for another service or third-party API.
 
-## Output options
-
-Telegraf also supports specifying multiple output sinks to send data to,
-configuring each output sink is different, but examples can be
-found by running `telegraf -sample-config`.
-
-Outputs also support the same configurable options as plugins
-(pass, drop, tagpass, tagdrop), added in 0.2.4
-
-```toml
-[[outputs.influxdb]]
-urls = [ "http://localhost:8086" ]
-database = "telegraf"
-precision = "s"
-# Drop all measurements that start with "aerospike"
-drop = ["aerospike*"]
-
-[[outputs.influxdb]]
-urls = [ "http://localhost:8086" ]
-database = "telegraf-aerospike-data"
-precision = "s"
-# Only accept aerospike data:
-pass = ["aerospike*"]
-
-[[outputs.influxdb]]
-urls = [ "http://localhost:8086" ]
-database = "telegraf-cpu0-data"
-precision = "s"
-# Only store measurements where the tag "cpu" matches the value "cpu0"
-[outputs.influxdb.tagpass]
-cpu = ["cpu0"]
-```
-
 ## Supported Outputs
 
 * influxdb
````
```diff
@@ -69,30 +69,72 @@ func (ac *accumulator) AddFields(
     tags map[string]string,
     t ...time.Time,
 ) {
-    // Validate uint64 and float64 fields
+    if !ac.pluginConfig.Filter.ShouldTagsPass(tags) {
+        return
+    }
+
+    // Override measurement name if set
+    if len(ac.pluginConfig.NameOverride) != 0 {
+        measurement = ac.pluginConfig.NameOverride
+    }
+    // Apply measurement prefix and suffix if set
+    if len(ac.pluginConfig.MeasurementPrefix) != 0 {
+        measurement = ac.pluginConfig.MeasurementPrefix + measurement
+    }
+    if len(ac.pluginConfig.MeasurementSuffix) != 0 {
+        measurement = measurement + ac.pluginConfig.MeasurementSuffix
+    }
+
+    if tags == nil {
+        tags = make(map[string]string)
+    }
+    // Apply plugin-wide tags if set
+    for k, v := range ac.pluginConfig.Tags {
+        if _, ok := tags[k]; !ok {
+            tags[k] = v
+        }
+    }
+    // Apply daemon-wide tags if set
+    for k, v := range ac.defaultTags {
+        if _, ok := tags[k]; !ok {
+            tags[k] = v
+        }
+    }
+
+    result := make(map[string]interface{})
     for k, v := range fields {
+        // Filter out any filtered fields
+        if ac.pluginConfig != nil {
+            if !ac.pluginConfig.Filter.ShouldPass(k) {
+                continue
+            }
+        }
+        result[k] = v
+
+        // Validate uint64 and float64 fields
         switch val := v.(type) {
         case uint64:
             // InfluxDB does not support writing uint64
             if val < uint64(9223372036854775808) {
-                fields[k] = int64(val)
+                result[k] = int64(val)
             } else {
-                fields[k] = int64(9223372036854775807)
+                result[k] = int64(9223372036854775807)
             }
         case float64:
             // NaNs are invalid values in influxdb, skip measurement
             if math.IsNaN(val) || math.IsInf(val, 0) {
                 if ac.debug {
-                    log.Printf("Measurement [%s] has a NaN or Inf field, skipping",
-                        measurement)
+                    log.Printf("Measurement [%s] field [%s] has a NaN or Inf "+
+                        "field, skipping",
+                        measurement, k)
                 }
-                return
+                continue
             }
         }
     }
-    if tags == nil {
-        tags = make(map[string]string)
+    fields = nil
+    if len(result) == 0 {
+        return
     }
 
     var timestamp time.Time
@@ -106,19 +148,7 @@ func (ac *accumulator) AddFields(
         measurement = ac.prefix + measurement
     }
 
-    if ac.pluginConfig != nil {
-        if !ac.pluginConfig.Filter.ShouldPass(measurement) || !ac.pluginConfig.Filter.ShouldTagsPass(tags) {
-            return
-        }
-    }
-
-    for k, v := range ac.defaultTags {
-        if _, ok := tags[k]; !ok {
-            tags[k] = v
-        }
-    }
-
-    pt, err := client.NewPoint(measurement, tags, fields, timestamp)
+    pt, err := client.NewPoint(measurement, tags, result, timestamp)
     if err != nil {
         log.Printf("Error adding point [%s]: %s\n", measurement, err.Error())
         return
```
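The uint64 handling in the accumulator change above can be illustrated in isolation. This is a minimal sketch of the clamping rule only (the function name is illustrative, not part of the diff): values at or above 2^63 are pinned to the largest signed value, because InfluxDB cannot store uint64.

```go
package main

import (
	"fmt"
	"math"
)

// toInfluxInt64 converts a uint64 field value the way the accumulator does:
// values that fit in an int64 are converted directly; anything larger is
// clamped to math.MaxInt64.
func toInfluxInt64(val uint64) int64 {
	if val < uint64(9223372036854775808) { // 2^63
		return int64(val)
	}
	return math.MaxInt64
}

func main() {
	fmt.Println(toInfluxInt64(42))             // 42
	fmt.Println(toInfluxInt64(math.MaxUint64)) // 9223372036854775807
}
```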
### agent.go (6 changed lines)
```diff
@@ -104,7 +104,7 @@ func (a *Agent) gatherParallel(pointChan chan *client.Point) error {
 
             acc := NewAccumulator(plugin.Config, pointChan)
             acc.SetDebug(a.Config.Agent.Debug)
-            acc.SetPrefix(plugin.Name + "_")
+            // acc.SetPrefix(plugin.Name + "_")
             acc.SetDefaultTags(a.Config.Tags)
 
             if err := plugin.Plugin.Gather(acc); err != nil {
@@ -141,7 +141,7 @@ func (a *Agent) gatherSeparate(
 
         acc := NewAccumulator(plugin.Config, pointChan)
         acc.SetDebug(a.Config.Agent.Debug)
-        acc.SetPrefix(plugin.Name + "_")
+        // acc.SetPrefix(plugin.Name + "_")
         acc.SetDefaultTags(a.Config.Tags)
 
         if err := plugin.Plugin.Gather(acc); err != nil {
@@ -187,7 +187,7 @@ func (a *Agent) Test() error {
     for _, plugin := range a.Config.Plugins {
         acc := NewAccumulator(plugin.Config, pointChan)
         acc.SetDebug(true)
-        acc.SetPrefix(plugin.Name + "_")
+        // acc.SetPrefix(plugin.Name + "_")
 
         fmt.Printf("* Plugin: %s, Collection 1\n", plugin.Name)
         if plugin.Config.Interval != 0 {
```
```diff
@@ -97,8 +97,8 @@
 # Mountpoints=["/"]
 
 # Read metrics about disk IO by device
-[[plugins.io]]
+[[plugins.diskio]]
 # By default, telegraf will gather stats for all devices including
 # disk partitions.
 # Setting devices will restrict the stats to the specified devcies.
 # Devices=["sda","sdb"]
```
```diff
@@ -112,9 +112,13 @@ type Filter struct {
 
 // PluginConfig containing a name, interval, and filter
 type PluginConfig struct {
-    Name     string
-    Filter   Filter
-    Interval time.Duration
+    Name              string
+    NameOverride      string
+    MeasurementPrefix string
+    MeasurementSuffix string
+    Tags              map[string]string
+    Filter            Filter
+    Interval          time.Duration
 }
 
 // OutputConfig containing name and filter
@@ -142,12 +146,12 @@ func (ro *RunningOutput) FilterPoints(points []*client.Point) []*client.Point {
 
 // ShouldPass returns true if the metric should pass, false if should drop
 // based on the drop/pass filter parameters
-func (f Filter) ShouldPass(measurement string) bool {
+func (f Filter) ShouldPass(fieldkey string) bool {
     if f.Pass != nil {
         for _, pat := range f.Pass {
             // TODO remove HasPrefix check, leaving it for now for legacy support.
             // Cam, 2015-12-07
-            if strings.HasPrefix(measurement, pat) || internal.Glob(pat, measurement) {
+            if strings.HasPrefix(fieldkey, pat) || internal.Glob(pat, fieldkey) {
                 return true
             }
         }
@@ -158,7 +162,7 @@ func (f Filter) ShouldPass(measurement string) bool {
     for _, pat := range f.Drop {
         // TODO remove HasPrefix check, leaving it for now for legacy support.
         // Cam, 2015-12-07
-        if strings.HasPrefix(measurement, pat) || internal.Glob(pat, measurement) {
+        if strings.HasPrefix(fieldkey, pat) || internal.Glob(pat, fieldkey) {
             return false
         }
     }
@@ -527,6 +531,11 @@ func (c *Config) addPlugin(name string, table *ast.Table) error {
     if len(c.PluginFilters) > 0 && !sliceContains(name, c.PluginFilters) {
         return nil
     }
+    // Legacy support renaming io plugin to diskio
+    if name == "io" {
+        name = "diskio"
+    }
+
     creator, ok := plugins.Plugins[name]
     if !ok {
         return fmt.Errorf("Undefined but requested plugin: %s", name)
@@ -628,7 +637,8 @@ func buildFilter(tbl *ast.Table) Filter {
     return f
 }
 
-// buildPlugin parses plugin specific items from the ast.Table, builds the filter and returns a
+// buildPlugin parses plugin specific items from the ast.Table,
+// builds the filter and returns a
 // PluginConfig to be inserted into RunningPlugin
 func buildPlugin(name string, tbl *ast.Table) (*PluginConfig, error) {
     cp := &PluginConfig{Name: name}
@@ -644,10 +654,47 @@ func buildPlugin(name string, tbl *ast.Table) (*PluginConfig, error) {
             }
         }
     }
+
+    if node, ok := tbl.Fields["name_prefix"]; ok {
+        if kv, ok := node.(*ast.KeyValue); ok {
+            if str, ok := kv.Value.(*ast.String); ok {
+                cp.MeasurementPrefix = str.Value
+            }
+        }
+    }
+
+    if node, ok := tbl.Fields["name_suffix"]; ok {
+        if kv, ok := node.(*ast.KeyValue); ok {
+            if str, ok := kv.Value.(*ast.String); ok {
+                cp.MeasurementSuffix = str.Value
+            }
+        }
+    }
+
+    if node, ok := tbl.Fields["name_override"]; ok {
+        if kv, ok := node.(*ast.KeyValue); ok {
+            if str, ok := kv.Value.(*ast.String); ok {
+                cp.NameOverride = str.Value
+            }
+        }
+    }
+
+    cp.Tags = make(map[string]string)
+    if node, ok := tbl.Fields["tags"]; ok {
+        if subtbl, ok := node.(*ast.Table); ok {
+            if err := toml.UnmarshalTable(subtbl, cp.Tags); err != nil {
+                log.Printf("Could not parse tags for plugin %s\n", name)
+            }
+        }
+    }
+
+    delete(tbl.Fields, "name_prefix")
+    delete(tbl.Fields, "name_suffix")
+    delete(tbl.Fields, "name_override")
     delete(tbl.Fields, "interval")
+    delete(tbl.Fields, "tags")
     cp.Filter = buildFilter(tbl)
     return cp, nil
 
 }
 
 // buildOutput parses output specific items from the ast.Table, builds the filter and returns an
@@ -659,5 +706,4 @@ func buildOutput(name string, tbl *ast.Table) (*OutputConfig, error) {
         Filter: buildFilter(tbl),
     }
     return oc, nil
-
 }
```
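The three naming options parsed above (`name_override`, `name_prefix`, `name_suffix`) combine in a fixed order in the accumulator: override first, then prefix and suffix. A minimal sketch of that rule (the function name is illustrative):

```go
package main

import "fmt"

// measurementName applies the per-plugin naming options in the same order
// as the accumulator change in this compare: override replaces the base
// name, then prefix and suffix are attached around the result.
func measurementName(base, override, prefix, suffix string) string {
	if len(override) != 0 {
		base = override
	}
	if len(prefix) != 0 {
		base = prefix + base
	}
	if len(suffix) != 0 {
		base = base + suffix
	}
	return base
}

func main() {
	fmt.Println(measurementName("cpu", "", "", "_total")) // cpu_total
	fmt.Println(measurementName("cpu", "foobar", "", "")) // foobar
}
```

These two calls reproduce the `cpu_total` and `foobar` examples from CONFIGURATION.md.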
### internal/config/testdata/telegraf-agent.toml (2 changed lines, vendored)
```diff
@@ -105,7 +105,7 @@ urls = ["http://localhost/server-status?auto"]
 drop = ["cpu_time"]
 
 # Read metrics about disk usage by mount point
-[[plugins.disk]]
+[[plugins.diskio]]
 # no configuration
 
 # Read metrics from one or many disque servers
```
```diff
@@ -3,6 +3,7 @@ package internal
 import (
     "bufio"
     "errors"
+    "fmt"
     "os"
     "strings"
     "time"
@@ -27,6 +28,39 @@ func (d *Duration) UnmarshalTOML(b []byte) error {
 
 var NotImplementedError = errors.New("not implemented yet")
 
+type JSONFlattener struct {
+    Fields map[string]interface{}
+}
+
+// FlattenJSON flattens nested maps/interfaces into a fields map
+func (f *JSONFlattener) FlattenJSON(
+    fieldname string,
+    v interface{},
+) error {
+    if f.Fields == nil {
+        f.Fields = make(map[string]interface{})
+    }
+    fieldname = strings.Trim(fieldname, "_")
+    switch t := v.(type) {
+    case map[string]interface{}:
+        for k, v := range t {
+            err := f.FlattenJSON(fieldname+"_"+k+"_", v)
+            if err != nil {
+                return err
+            }
+        }
+    case float64:
+        f.Fields[fieldname] = t
+    case bool, string, []interface{}:
+        // ignored types
+        return nil
+    default:
+        return fmt.Errorf("JSON Flattener: got unexpected type %T with value %v (%s)",
+            t, t, fieldname)
+    }
+    return nil
+}
+
 // ReadLines reads contents from a file and splits them by new lines.
 // A convenience wrapper to ReadLinesOffsetN(filename, 0, -1).
 func ReadLines(filename string) ([]string, error) {
```
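The flattening behavior of the `JSONFlattener` added above is easiest to see on a small input. This sketch condenses the same logic into a standalone function (a copy for illustration, since the original lives in the `internal` package): nested objects become underscore-joined keys, `float64` leaves are kept, and booleans, strings, and arrays are ignored.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// flatten condenses the JSONFlattener recursion: each nested map level
// appends its key with an underscore, and only float64 values (the type
// encoding/json produces for JSON numbers) are recorded as fields.
func flatten(prefix string, v interface{}, fields map[string]interface{}) {
	prefix = strings.Trim(prefix, "_")
	switch t := v.(type) {
	case map[string]interface{}:
		for k, vv := range t {
			flatten(prefix+"_"+k+"_", vv, fields)
		}
	case float64:
		fields[prefix] = t
	}
}

func main() {
	var parsed map[string]interface{}
	if err := json.Unmarshal([]byte(`{"mem":{"used":12.5,"free":3.5},"ok":true}`), &parsed); err != nil {
		panic(err)
	}
	fields := make(map[string]interface{})
	flatten("", parsed, fields)
	fmt.Println(fields["mem_used"], fields["mem_free"]) // 12.5 3.5
}
```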
@@ -58,21 +58,26 @@ func (a *Amon) Write(points []*client.Point) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
ts := TimeSeries{}
|
ts := TimeSeries{}
|
||||||
var tempSeries = make([]*Metric, len(points))
|
tempSeries := []*Metric{}
|
||||||
var acceptablePoints = 0
|
metricCounter := 0
|
||||||
|
|
||||||
for _, pt := range points {
|
for _, pt := range points {
|
||||||
metric := &Metric{
|
mname := strings.Replace(pt.Name(), "_", ".", -1)
|
||||||
Metric: strings.Replace(pt.Name(), "_", ".", -1),
|
if amonPts, err := buildPoints(pt); err == nil {
|
||||||
**amon output** (continuation of the `Write` hunk, then the `buildPoints` hunk):

```diff
-        }
-        if p, err := buildPoint(pt); err == nil {
-            metric.Points[0] = p
-            tempSeries[acceptablePoints] = metric
-            acceptablePoints += 1
+            for fieldName, amonPt := range amonPts {
+                metric := &Metric{
+                    Metric: mname + "_" + strings.Replace(fieldName, "_", ".", -1),
+                }
+                metric.Points[0] = amonPt
+                tempSeries = append(tempSeries, metric)
+                metricCounter++
+            }
         } else {
             log.Printf("unable to build Metric for %s, skipping\n", pt.Name())
         }
     }
-    ts.Series = make([]*Metric, acceptablePoints)
+    ts.Series = make([]*Metric, metricCounter)
     copy(ts.Series, tempSeries[0:])
     tsBytes, err := json.Marshal(ts)
     if err != nil {
@@ -110,13 +115,17 @@ func (a *Amon) authenticatedUrl() string {
     return fmt.Sprintf("%s/api/system/%s", a.AmonInstance, a.ServerKey)
 }
 
-func buildPoint(pt *client.Point) (Point, error) {
-    var p Point
-    if err := p.setValue(pt.Fields()["value"]); err != nil {
-        return p, fmt.Errorf("unable to extract value from Fields, %s", err.Error())
+func buildPoints(pt *client.Point) (map[string]Point, error) {
+    pts := make(map[string]Point)
+    for k, v := range pt.Fields() {
+        var p Point
+        if err := p.setValue(v); err != nil {
+            return pts, fmt.Errorf("unable to extract value from Fields, %s", err.Error())
+        }
+        p[0] = float64(pt.Time().Unix())
+        pts[k] = p
     }
-    p[0] = float64(pt.Time().Unix())
-    return p, nil
+    return pts, nil
 }
 
 func (p *Point) setValue(v interface{}) error {
```
**datadog output**:

```diff
@@ -67,23 +67,26 @@ func (d *Datadog) Write(points []*client.Point) error {
         return nil
     }
     ts := TimeSeries{}
-    var tempSeries = make([]*Metric, len(points))
-    var acceptablePoints = 0
+    tempSeries := []*Metric{}
+    metricCounter := 0
 
     for _, pt := range points {
-        metric := &Metric{
-            Metric: strings.Replace(pt.Name(), "_", ".", -1),
-            Tags:   buildTags(pt.Tags()),
-            Host:   pt.Tags()["host"],
-        }
-        if p, err := buildPoint(pt); err == nil {
-            metric.Points[0] = p
-            tempSeries[acceptablePoints] = metric
-            acceptablePoints += 1
+        mname := strings.Replace(pt.Name(), "_", ".", -1)
+        if amonPts, err := buildPoints(pt); err == nil {
+            for fieldName, amonPt := range amonPts {
+                metric := &Metric{
+                    Metric: mname + strings.Replace(fieldName, "_", ".", -1),
+                }
+                metric.Points[0] = amonPt
+                tempSeries = append(tempSeries, metric)
+                metricCounter++
+            }
         } else {
             log.Printf("unable to build Metric for %s, skipping\n", pt.Name())
         }
     }
-    ts.Series = make([]*Metric, acceptablePoints)
+    ts.Series = make([]*Metric, metricCounter)
     copy(ts.Series, tempSeries[0:])
     tsBytes, err := json.Marshal(ts)
     if err != nil {
@@ -123,13 +126,17 @@ func (d *Datadog) authenticatedUrl() string {
     return fmt.Sprintf("%s?%s", d.apiUrl, q.Encode())
 }
 
-func buildPoint(pt *client.Point) (Point, error) {
-    var p Point
-    if err := p.setValue(pt.Fields()["value"]); err != nil {
-        return p, fmt.Errorf("unable to extract value from Fields, %s", err.Error())
+func buildPoints(pt *client.Point) (map[string]Point, error) {
+    pts := make(map[string]Point)
+    for k, v := range pt.Fields() {
+        var p Point
+        if err := p.setValue(v); err != nil {
+            return pts, fmt.Errorf("unable to extract value from Fields, %s", err.Error())
+        }
+        p[0] = float64(pt.Time().Unix())
+        pts[k] = p
     }
-    p[0] = float64(pt.Time().Unix())
-    return p, nil
+    return pts, nil
 }
 
 func buildTags(ptTags map[string]string) []string {
```
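The pattern repeated across these output plugins is the same: a point that previously contributed only its single `value` field is now fanned out into one named series per field. A minimal standalone sketch of that fan-out (the `splitFields` helper below is illustrative, not part of Telegraf):

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// splitFields fans one measurement's field map out into per-field series
// names, mirroring what buildPoints/buildGauges do in the diffs above:
// measurement "cpu_stats" with fields {user_time, system_time} becomes
// "cpu.stats.user.time" and "cpu.stats.system.time".
func splitFields(name string, fields map[string]interface{}) map[string]interface{} {
	out := make(map[string]interface{})
	mname := strings.Replace(name, "_", ".", -1)
	for fieldName, value := range fields {
		out[mname+"."+strings.Replace(fieldName, "_", ".", -1)] = value
	}
	return out
}

func main() {
	series := splitFields("cpu_stats", map[string]interface{}{
		"user_time":   0.5,
		"system_time": 0.2,
	})
	keys := make([]string, 0, len(series))
	for k := range series {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	fmt.Println(keys) // the two per-field series names
}
```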
**influxdb output**:

```diff
@@ -7,6 +7,7 @@ import (
     "math/rand"
     "net/url"
     "strings"
+    "time"
 
     "github.com/influxdb/influxdb/client/v2"
     "github.com/influxdb/telegraf/internal"
@@ -110,6 +111,7 @@ func (i *InfluxDB) Connect() error {
     }
 
     i.conns = conns
+    rand.Seed(time.Now().UnixNano())
     return nil
 }
 
```
**librato output**:

```diff
@@ -74,17 +74,21 @@ func (l *Librato) Write(points []*client.Point) error {
         return nil
     }
     metrics := Metrics{}
-    var tempGauges = make([]*Gauge, len(points))
-    var acceptablePoints = 0
+    tempGauges := []*Gauge{}
+    metricCounter := 0
 
     for _, pt := range points {
-        if gauge, err := l.buildGauge(pt); err == nil {
-            tempGauges[acceptablePoints] = gauge
-            acceptablePoints += 1
+        if gauges, err := l.buildGauges(pt); err == nil {
+            for _, gauge := range gauges {
+                tempGauges = append(tempGauges, gauge)
+                metricCounter++
+            }
         } else {
             log.Printf("unable to build Gauge for %s, skipping\n", pt.Name())
         }
     }
-    metrics.Gauges = make([]*Gauge, acceptablePoints)
+    metrics.Gauges = make([]*Gauge, metricCounter)
     copy(metrics.Gauges, tempGauges[0:])
     metricsBytes, err := json.Marshal(metrics)
     if err != nil {
@@ -118,22 +122,28 @@ func (l *Librato) Description() string {
     return "Configuration for Librato API to send metrics to."
 }
 
-func (l *Librato) buildGauge(pt *client.Point) (*Gauge, error) {
-    gauge := &Gauge{
-        Name:        pt.Name(),
-        MeasureTime: pt.Time().Unix(),
-    }
-    if err := gauge.setValue(pt.Fields()["value"]); err != nil {
-        return gauge, fmt.Errorf("unable to extract value from Fields, %s\n", err.Error())
-    }
-    if l.SourceTag != "" {
-        if source, ok := pt.Tags()[l.SourceTag]; ok {
-            gauge.Source = source
-        } else {
-            return gauge, fmt.Errorf("undeterminable Source type from Field, %s\n", l.SourceTag)
+func (l *Librato) buildGauges(pt *client.Point) ([]*Gauge, error) {
+    gauges := []*Gauge{}
+    for fieldName, value := range pt.Fields() {
+        gauge := &Gauge{
+            Name:        pt.Name() + "_" + fieldName,
+            MeasureTime: pt.Time().Unix(),
+        }
+        if err := gauge.setValue(value); err != nil {
+            return gauges, fmt.Errorf("unable to extract value from Fields, %s\n",
+                err.Error())
+        }
+        if l.SourceTag != "" {
+            if source, ok := pt.Tags()[l.SourceTag]; ok {
+                gauge.Source = source
+            } else {
+                return gauges,
+                    fmt.Errorf("undeterminable Source type from Field, %s\n",
+                        l.SourceTag)
+            }
         }
     }
-    return gauge, nil
+    return gauges, nil
 }
 
 func (g *Gauge) setValue(v interface{}) error {
```
**opentsdb output**:

```diff
@@ -62,7 +62,8 @@ func (o *OpenTSDB) Write(points []*client.Point) error {
     if len(points) == 0 {
         return nil
     }
-    var timeNow = time.Now()
+    now := time.Now()
+
     // Send Data with telnet / socket communication
     uri := fmt.Sprintf("%s:%d", o.Host, o.Port)
     tcpAddr, _ := net.ResolveTCPAddr("tcp", uri)
@@ -70,32 +71,21 @@ func (o *OpenTSDB) Write(points []*client.Point) error {
     if err != nil {
         return fmt.Errorf("OpenTSDB: Telnet connect fail")
     }
+    defer connection.Close()
 
     for _, pt := range points {
-        metric := &MetricLine{
-            Metric:    fmt.Sprintf("%s%s", o.Prefix, pt.Name()),
-            Timestamp: timeNow.Unix(),
-        }
-
-        metricValue, buildError := buildValue(pt)
-        if buildError != nil {
-            fmt.Printf("OpenTSDB: %s\n", buildError.Error())
-            continue
-        }
-        metric.Value = metricValue
-
-        tagsSlice := buildTags(pt.Tags())
-        metric.Tags = fmt.Sprint(strings.Join(tagsSlice, " "))
-
-        messageLine := fmt.Sprintf("put %s %v %s %s\n", metric.Metric, metric.Timestamp, metric.Value, metric.Tags)
-        if o.Debug {
-            fmt.Print(messageLine)
-        }
-        _, err := connection.Write([]byte(messageLine))
-        if err != nil {
-            return fmt.Errorf("OpenTSDB: Telnet writing error %s", err.Error())
+        for _, metric := range buildMetrics(pt, now, o.Prefix) {
+            messageLine := fmt.Sprintf("put %s %v %s %s\n",
+                metric.Metric, metric.Timestamp, metric.Value, metric.Tags)
+            if o.Debug {
+                fmt.Print(messageLine)
+            }
+            _, err := connection.Write([]byte(messageLine))
+            if err != nil {
+                return fmt.Errorf("OpenTSDB: Telnet writing error %s", err.Error())
+            }
         }
     }
-    defer connection.Close()
 
     return nil
 }
@@ -111,9 +101,29 @@ func buildTags(ptTags map[string]string) []string {
     return tags
 }
 
-func buildValue(pt *client.Point) (string, error) {
+func buildMetrics(pt *client.Point, now time.Time, prefix string) []*MetricLine {
+    ret := []*MetricLine{}
+    for fieldName, value := range pt.Fields() {
+        metric := &MetricLine{
+            Metric:    fmt.Sprintf("%s%s_%s", prefix, pt.Name(), fieldName),
+            Timestamp: now.Unix(),
+        }
+
+        metricValue, buildError := buildValue(value)
+        if buildError != nil {
+            fmt.Printf("OpenTSDB: %s\n", buildError.Error())
+            continue
+        }
+        metric.Value = metricValue
+        tagsSlice := buildTags(pt.Tags())
+        metric.Tags = fmt.Sprint(strings.Join(tagsSlice, " "))
+        ret = append(ret, metric)
+    }
+    return ret
+}
+
+func buildValue(v interface{}) (string, error) {
     var retv string
-    var v = pt.Fields()["value"]
     switch p := v.(type) {
     case int64:
         retv = IntToString(int64(p))
```
**riemann output**:

```diff
@@ -55,8 +55,10 @@ func (r *Riemann) Write(points []*client.Point) error {
 
     var events []*raidman.Event
     for _, p := range points {
-        ev := buildEvent(p)
-        events = append(events, ev)
+        evs := buildEvents(p)
+        for _, ev := range evs {
+            events = append(events, ev)
+        }
     }
 
     var senderr = r.client.SendMulti(events)
@@ -68,24 +70,28 @@ func (r *Riemann) Write(points []*client.Point) error {
     return nil
 }
 
-func buildEvent(p *client.Point) *raidman.Event {
-    host, ok := p.Tags()["host"]
-    if !ok {
-        hostname, err := os.Hostname()
-        if err != nil {
-            host = "unknown"
-        } else {
-            host = hostname
+func buildEvents(p *client.Point) []*raidman.Event {
+    events := []*raidman.Event{}
+    for fieldName, value := range p.Fields() {
+        host, ok := p.Tags()["host"]
+        if !ok {
+            hostname, err := os.Hostname()
+            if err != nil {
+                host = "unknown"
+            } else {
+                host = hostname
+            }
         }
-    }
 
-    var event = &raidman.Event{
-        Host:    host,
-        Service: p.Name(),
-        Metric:  p.Fields()["value"],
-    }
+        event := &raidman.Event{
+            Host:    host,
+            Service: p.Name() + "_" + fieldName,
+            Metric:  value,
+        }
+        events = append(events, event)
+    }
 
-    return event
+    return events
 }
 
 func init() {
```
**aerospike plugin**:

```diff
@@ -247,26 +247,32 @@ func get(key []byte, host string) (map[string]string, error) {
     return data, err
 }
 
-func readAerospikeStats(stats map[string]string, acc plugins.Accumulator, host, namespace string) {
+func readAerospikeStats(
+    stats map[string]string,
+    acc plugins.Accumulator,
+    host string,
+    namespace string,
+) {
+    fields := make(map[string]interface{})
+    tags := map[string]string{
+        "aerospike_host": host,
+        "namespace":      "_service",
+    }
+
+    if namespace != "" {
+        tags["namespace"] = namespace
+    }
     for key, value := range stats {
-        tags := map[string]string{
-            "aerospike_host": host,
-            "namespace":      "_service",
-        }
-
-        if namespace != "" {
-            tags["namespace"] = namespace
-        }
-
         // We are going to ignore all string based keys
         val, err := strconv.ParseInt(value, 10, 64)
         if err == nil {
             if strings.Contains(key, "-") {
                 key = strings.Replace(key, "-", "_", -1)
             }
-            acc.Add(key, val, tags)
+            fields[key] = val
         }
     }
+    acc.AddFields("aerospike", fields, tags)
 }
 
 func unmarshalMapInfo(infoMap map[string]string, key string) (map[string]string, error) {
```
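Every input-plugin change in this release follows the same recipe: numeric values that used to be emitted one `acc.Add(key, value, tags)` call at a time are collected into a single `fields` map and flushed once with `acc.AddFields(measurement, fields, tags)`. A standalone sketch of the collection step (the helper below is illustrative; the real `readAerospikeStats` takes the Telegraf `Accumulator`):

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// gatherStats collects parseable integer stats into one fields map, the
// way readAerospikeStats now does, instead of emitting each key as its
// own measurement.
func gatherStats(stats map[string]string) map[string]interface{} {
	fields := make(map[string]interface{})
	for key, value := range stats {
		// ignore string-based keys, keep only integers
		val, err := strconv.ParseInt(value, 10, 64)
		if err != nil {
			continue
		}
		fields[strings.Replace(key, "-", "_", -1)] = val
	}
	return fields
}

func main() {
	fields := gatherStats(map[string]string{
		"free-memory": "1024",
		"uptime":      "3600",
		"build":       "3.5.4", // non-numeric: dropped
	})
	fmt.Println(len(fields), fields["free_memory"])
}
```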
**apache plugin**:

```diff
@@ -72,32 +72,33 @@ func (n *Apache) gatherUrl(addr *url.URL, acc plugins.Accumulator) error {
     tags := getTags(addr)
 
     sc := bufio.NewScanner(resp.Body)
+    fields := make(map[string]interface{})
     for sc.Scan() {
         line := sc.Text()
         if strings.Contains(line, ":") {
 
             parts := strings.SplitN(line, ":", 2)
             key, part := strings.Replace(parts[0], " ", "", -1), strings.TrimSpace(parts[1])
 
             switch key {
 
             case "Scoreboard":
-                n.gatherScores(part, acc, tags)
+                for field, value := range n.gatherScores(part) {
+                    fields[field] = value
+                }
             default:
                 value, err := strconv.ParseFloat(part, 64)
                 if err != nil {
                     continue
                 }
-                acc.Add(key, value, tags)
+                fields[key] = value
             }
         }
     }
+    acc.AddFields("apache", fields, tags)
 
     return nil
 }
 
-func (n *Apache) gatherScores(data string, acc plugins.Accumulator, tags map[string]string) {
+func (n *Apache) gatherScores(data string) map[string]interface{} {
     var waiting, open int = 0, 0
     var S, R, W, K, D, C, L, G, I int = 0, 0, 0, 0, 0, 0, 0, 0, 0
 
@@ -129,17 +130,20 @@ func (n *Apache) gatherScores(data string) map[string]interface{} {
         }
     }
 
-    acc.Add("scboard_waiting", float64(waiting), tags)
-    acc.Add("scboard_starting", float64(S), tags)
-    acc.Add("scboard_reading", float64(R), tags)
-    acc.Add("scboard_sending", float64(W), tags)
-    acc.Add("scboard_keepalive", float64(K), tags)
-    acc.Add("scboard_dnslookup", float64(D), tags)
-    acc.Add("scboard_closing", float64(C), tags)
-    acc.Add("scboard_logging", float64(L), tags)
-    acc.Add("scboard_finishing", float64(G), tags)
-    acc.Add("scboard_idle_cleanup", float64(I), tags)
-    acc.Add("scboard_open", float64(open), tags)
+    fields := map[string]interface{}{
+        "scboard_waiting":      float64(waiting),
+        "scboard_starting":     float64(S),
+        "scboard_reading":      float64(R),
+        "scboard_sending":      float64(W),
+        "scboard_keepalive":    float64(K),
+        "scboard_dnslookup":    float64(D),
+        "scboard_closing":      float64(C),
+        "scboard_logging":      float64(L),
+        "scboard_finishing":    float64(G),
+        "scboard_idle_cleanup": float64(I),
+        "scboard_open":         float64(open),
+    }
+    return fields
 }
 
 // Get tag(s) for the apache plugin
```
**bcache plugin**:

```diff
@@ -81,7 +81,9 @@ func (b *Bcache) gatherBcache(bdev string, acc plugins.Accumulator) error {
     }
     rawValue := strings.TrimSpace(string(file))
     value := prettyToBytes(rawValue)
-    acc.Add("dirty_data", value, tags)
+
+    fields := make(map[string]interface{})
+    fields["dirty_data"] = value
 
     for _, path := range metrics {
         key := filepath.Base(path)
@@ -92,12 +94,13 @@ func (b *Bcache) gatherBcache(bdev string, acc plugins.Accumulator) error {
         }
         if key == "bypassed" {
             value := prettyToBytes(rawValue)
-            acc.Add(key, value, tags)
+            fields[key] = value
         } else {
             value, _ := strconv.ParseUint(rawValue, 10, 64)
-            acc.Add(key, value, tags)
+            fields[key] = value
         }
     }
+    acc.AddFields("bcache", fields, tags)
     return nil
 }
 
@@ -117,7 +120,7 @@ func (b *Bcache) Gather(acc plugins.Accumulator) error {
     }
     bdevs, _ := filepath.Glob(bcachePath + "/*/bdev*")
     if len(bdevs) < 1 {
-        return errors.New("Can't found any bcache device")
+        return errors.New("Can't find any bcache device")
     }
     for _, bdev := range bdevs {
         if restrictDevs {
```
**disque plugin**:

```diff
@@ -155,6 +155,8 @@ func (g *Disque) gatherServer(addr *url.URL, acc plugins.Accumulator) error {
 
     var read int
 
+    fields := make(map[string]interface{})
+    tags := map[string]string{"host": addr.String()}
     for read < sz {
         line, err := r.ReadString('\n')
         if err != nil {
@@ -176,12 +178,11 @@ func (g *Disque) gatherServer(addr *url.URL, acc plugins.Accumulator) error {
             continue
         }
 
-        tags := map[string]string{"host": addr.String()}
-
         val := strings.TrimSpace(parts[1])
 
         ival, err := strconv.ParseUint(val, 10, 64)
         if err == nil {
-            acc.Add(metric, ival, tags)
+            fields[metric] = ival
             continue
         }
 
@@ -190,9 +191,9 @@ func (g *Disque) gatherServer(addr *url.URL, acc plugins.Accumulator) error {
             return err
         }
 
-        acc.Add(metric, fval, tags)
+        fields[metric] = fval
     }
+    acc.AddFields("disque", fields, tags)
     return nil
 }
 
```
**elasticsearch README**:

```diff
@@ -31,8 +31,9 @@ contains `status`, `timed_out`, `number_of_nodes`, `number_of_data_nodes`,
 `initializing_shards`, `unassigned_shards` fields
 - elasticsearch_cluster_health
 
-contains `status`, `number_of_shards`, `number_of_replicas`, `active_primary_shards`,
-`active_shards`, `relocating_shards`, `initializing_shards`, `unassigned_shards` fields
+contains `status`, `number_of_shards`, `number_of_replicas`,
+`active_primary_shards`, `active_shards`, `relocating_shards`,
+`initializing_shards`, `unassigned_shards` fields
 - elasticsearch_indices
 
 #### node measurements:
@@ -316,4 +317,4 @@ Transport statistics about sent and received bytes in cluster communication meas
 - elasticsearch_transport_rx_count value=6
 - elasticsearch_transport_rx_size_in_bytes value=1380
 - elasticsearch_transport_tx_count value=6
 - elasticsearch_transport_tx_size_in_bytes value=1380
```
**elasticsearch plugin**:

```diff
@@ -6,6 +6,7 @@ import (
     "net/http"
     "time"
 
+    "github.com/influxdb/telegraf/internal"
     "github.com/influxdb/telegraf/plugins"
 )
 
@@ -141,10 +142,14 @@ func (e *Elasticsearch) gatherNodeStats(url string, acc plugins.Accumulator) err
             "breakers":    n.Breakers,
         }
 
+        now := time.Now()
         for p, s := range stats {
-            if err := e.parseInterface(acc, p, tags, s); err != nil {
+            f := internal.JSONFlattener{}
+            err := f.FlattenJSON("", s)
+            if err != nil {
                 return err
             }
+            acc.AddFields("elasticsearch_"+p, f.Fields, tags, now)
         }
     }
     return nil
@@ -168,7 +173,7 @@ func (e *Elasticsearch) gatherClusterStats(url string, acc plugins.Accumulator)
         "unassigned_shards": clusterStats.UnassignedShards,
     }
     acc.AddFields(
-        "cluster_health",
+        "elasticsearch_cluster_health",
         clusterFields,
         map[string]string{"name": clusterStats.ClusterName},
         measurementTime,
@@ -186,7 +191,7 @@ func (e *Elasticsearch) gatherClusterStats(url string, acc plugins.Accumulator)
         "unassigned_shards": health.UnassignedShards,
     }
     acc.AddFields(
-        "indices",
+        "elasticsearch_indices",
         indexFields,
         map[string]string{"index": name},
         measurementTime,
@@ -205,7 +210,8 @@ func (e *Elasticsearch) gatherData(url string, v interface{}) error {
         // NOTE: we are not going to read/discard r.Body under the assumption we'd prefer
         // to let the underlying transport close the connection and re-establish a new one for
         // future calls.
-        return fmt.Errorf("elasticsearch: API responded with status-code %d, expected %d", r.StatusCode, http.StatusOK)
+        return fmt.Errorf("elasticsearch: API responded with status-code %d, expected %d",
+            r.StatusCode, http.StatusOK)
     }
     if err = json.NewDecoder(r.Body).Decode(v); err != nil {
         return err
@@ -213,25 +219,6 @@ func (e *Elasticsearch) gatherData(url string, v interface{}) error {
     return nil
 }
 
-func (e *Elasticsearch) parseInterface(acc plugins.Accumulator, prefix string, tags map[string]string, v interface{}) error {
-    switch t := v.(type) {
-    case map[string]interface{}:
-        for k, v := range t {
-            if err := e.parseInterface(acc, prefix+"_"+k, tags, v); err != nil {
-                return err
-            }
-        }
-    case float64:
-        acc.Add(prefix, t, tags)
-    case bool, string, []interface{}:
-        // ignored types
-        return nil
-    default:
-        return fmt.Errorf("elasticsearch: got unexpected type %T with value %v (%s)", t, t, prefix)
-    }
-    return nil
-}
-
 func init() {
     plugins.Add("elasticsearch", func() plugins.Plugin {
         return NewElasticsearch()
```
|||||||
@@ -3,59 +3,38 @@ package exec
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/gonuts/go-shellquote"
|
|
||||||
"github.com/influxdb/telegraf/plugins"
|
|
||||||
"math"
|
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"strings"
|
|
||||||
"sync"
|
"github.com/gonuts/go-shellquote"
|
||||||
"time"
|
|
||||||
|
"github.com/influxdb/telegraf/internal"
|
||||||
|
"github.com/influxdb/telegraf/plugins"
|
||||||
)
|
)
|
||||||
|
|
||||||
const sampleConfig = `
|
const sampleConfig = `
|
||||||
# specify commands via an array of tables
|
|
||||||
[[plugins.exec.commands]]
|
|
||||||
# the command to run
|
# the command to run
|
||||||
command = "/usr/bin/mycollector --foo=bar"
|
command = "/usr/bin/mycollector --foo=bar"
|
||||||
|
|
||||||
# name of the command (used as a prefix for measurements)
|
# name of the command (used as a prefix for measurements)
|
||||||
name = "mycollector"
|
name = "mycollector"
|
||||||
|
|
||||||
# Only run this command if it has been at least this many
|
|
||||||
# seconds since it last ran
|
|
||||||
interval = 10
|
|
||||||
`
|
`
|
||||||
|
|
||||||
type Exec struct {
|
type Exec struct {
|
||||||
Commands []*Command
|
Command string
|
||||||
runner Runner
|
Name string
|
||||||
clock Clock
|
|
||||||
}
|
|
||||||
|
|
||||||
type Command struct {
|
runner Runner
|
||||||
Command string
|
|
||||||
Name string
|
|
||||||
Interval int
|
|
||||||
lastRunAt time.Time
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type Runner interface {
|
type Runner interface {
|
||||||
Run(*Command) ([]byte, error)
|
Run(*Exec) ([]byte, error)
|
||||||
}
|
|
||||||
|
|
||||||
type Clock interface {
|
|
||||||
Now() time.Time
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type CommandRunner struct{}
|
type CommandRunner struct{}
|
||||||
|
|
||||||
type RealClock struct{}
|
func (c CommandRunner) Run(e *Exec) ([]byte, error) {
|
||||||
|
split_cmd, err := shellquote.Split(e.Command)
|
||||||
func (c CommandRunner) Run(command *Command) ([]byte, error) {
|
|
||||||
command.lastRunAt = time.Now()
|
|
||||||
split_cmd, err := shellquote.Split(command.Command)
|
|
||||||
if err != nil || len(split_cmd) == 0 {
|
if err != nil || len(split_cmd) == 0 {
|
||||||
return nil, fmt.Errorf("exec: unable to parse command, %s", err)
|
return nil, fmt.Errorf("exec: unable to parse command, %s", err)
|
||||||
}
|
}
|
||||||
@@ -65,18 +44,14 @@ func (c CommandRunner) Run(command *Command) ([]byte, error) {
|
|||||||
cmd.Stdout = &out
|
cmd.Stdout = &out
|
||||||
|
|
||||||
if err := cmd.Run(); err != nil {
|
if err := cmd.Run(); err != nil {
|
||||||
return nil, fmt.Errorf("exec: %s for command '%s'", err, command.Command)
|
return nil, fmt.Errorf("exec: %s for command '%s'", err, e.Command)
|
||||||
}
|
}
|
||||||
|
|
||||||
return out.Bytes(), nil
|
return out.Bytes(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c RealClock) Now() time.Time {
|
|
||||||
return time.Now()
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewExec() *Exec {
|
func NewExec() *Exec {
|
||||||
return &Exec{runner: CommandRunner{}, clock: RealClock{}}
|
return &Exec{runner: CommandRunner{}}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Exec) SampleConfig() string {
|
func (e *Exec) SampleConfig() string {
|
||||||
@@ -88,73 +63,34 @@ func (e *Exec) Description() string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (e *Exec) Gather(acc plugins.Accumulator) error {
|
func (e *Exec) Gather(acc plugins.Accumulator) error {
|
||||||
var wg sync.WaitGroup
|
out, err := e.runner.Run(e)
|
||||||
|
if err != nil {
|
||||||
errorChannel := make(chan error, len(e.Commands))
|
return err
|
||||||
|
|
||||||
for _, c := range e.Commands {
|
|
||||||
wg.Add(1)
|
|
||||||
go func(c *Command, acc plugins.Accumulator) {
|
|
||||||
defer wg.Done()
|
|
||||||
err := e.gatherCommand(c, acc)
|
|
||||||
if err != nil {
|
|
||||||
errorChannel <- err
|
|
||||||
}
|
|
||||||
}(c, acc)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
wg.Wait()
|
var jsonOut interface{}
|
||||||
close(errorChannel)
|
err = json.Unmarshal(out, &jsonOut)
|
||||||
|
if err != nil {
|
||||||
// Get all errors and return them as one giant error
|
return fmt.Errorf("exec: unable to parse output of '%s' as JSON, %s",
|
||||||
errorStrings := []string{}
|
e.Command, err)
|
||||||
for err := range errorChannel {
|
|
||||||
errorStrings = append(errorStrings, err.Error())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(errorStrings) == 0 {
|
f := internal.JSONFlattener{}
|
||||||
return nil
|
err = f.FlattenJSON("", jsonOut)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
return errors.New(strings.Join(errorStrings, "\n"))
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *Exec) gatherCommand(c *Command, acc plugins.Accumulator) error {
|
var msrmnt_name string
|
||||||
secondsSinceLastRun := 0.0
|
if e.Name == "" {
|
||||||
|
msrmnt_name = "exec"
|
||||||
if c.lastRunAt.Unix() == 0 { // means time is uninitialized
|
|
||||||
secondsSinceLastRun = math.Inf(1)
|
|
||||||
} else {
|
} else {
|
||||||
secondsSinceLastRun = (e.clock.Now().Sub(c.lastRunAt)).Seconds()
|
msrmnt_name = "exec_" + e.Name
|
||||||
}
|
|
||||||
|
|
||||||
if secondsSinceLastRun >= float64(c.Interval) {
|
|
||||||
out, err := e.runner.Run(c)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
var jsonOut interface{}
|
|
||||||
err = json.Unmarshal(out, &jsonOut)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("exec: unable to parse output of '%s' as JSON, %s", c.Command, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
processResponse(acc, c.Name, map[string]string{}, jsonOut)
|
|
||||||
}
|
}
|
||||||
|
acc.AddFields(msrmnt_name, f.Fields, nil)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func processResponse(acc plugins.Accumulator, prefix string, tags map[string]string, v interface{}) {
|
|
||||||
switch t := v.(type) {
|
|
||||||
case map[string]interface{}:
|
|
||||||
for k, v := range t {
|
|
||||||
processResponse(acc, prefix+"_"+k, tags, v)
|
|
||||||
}
|
|
||||||
case float64:
|
|
||||||
acc.Add(prefix, v, tags)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
plugins.Add("exec", func() plugins.Plugin {
|
plugins.Add("exec", func() plugins.Plugin {
|
||||||
return NewExec()
|
return NewExec()
|
||||||
|
|||||||
**haproxy plugin:**

```diff
@@ -9,6 +9,7 @@ import (
 	"net/url"
 	"strconv"
 	"sync"
+	"time"
 )

 //CSV format: https://cbonte.github.io/haproxy-dconv/configuration-1.5.html#9.1
@@ -152,210 +153,208 @@ func (g *haproxy) gatherServer(addr string, acc plugins.Accumulator) error {
 		return fmt.Errorf("Unable to get valid stat result from '%s': %s", addr, err)
 	}

-	importCsvResult(res.Body, acc, u.Host)
-
-	return nil
+	return importCsvResult(res.Body, acc, u.Host)
 }

-func importCsvResult(r io.Reader, acc plugins.Accumulator, host string) ([][]string, error) {
+func importCsvResult(r io.Reader, acc plugins.Accumulator, host string) error {
 	csv := csv.NewReader(r)
 	result, err := csv.ReadAll()
+	now := time.Now()

 	for _, row := range result {
+		fields := make(map[string]interface{})
+		tags := map[string]string{
+			"server": host,
+			"proxy":  row[HF_PXNAME],
+			"sv":     row[HF_SVNAME],
+		}
 		for field, v := range row {
-			tags := map[string]string{
-				"server": host,
-				"proxy":  row[HF_PXNAME],
-				"sv":     row[HF_SVNAME],
-			}
 			switch field {
 			case HF_QCUR:
 				ival, err := strconv.ParseUint(v, 10, 64)
 				if err == nil {
-					acc.Add("qcur", ival, tags)
+					fields["qcur"] = ival
 				}
 			case HF_QMAX:
 				ival, err := strconv.ParseUint(v, 10, 64)
 				if err == nil {
-					acc.Add("qmax", ival, tags)
+					fields["qmax"] = ival
 				}
 			case HF_SCUR:
 				ival, err := strconv.ParseUint(v, 10, 64)
 				if err == nil {
-					acc.Add("scur", ival, tags)
+					fields["scur"] = ival
 				}
 			case HF_SMAX:
 				ival, err := strconv.ParseUint(v, 10, 64)
 				if err == nil {
-					acc.Add("smax", ival, tags)
+					fields["smax"] = ival
 				}
 			case HF_STOT:
 				ival, err := strconv.ParseUint(v, 10, 64)
 				if err == nil {
-					acc.Add("stot", ival, tags)
+					fields["stot"] = ival
 				}
 			case HF_BIN:
 				ival, err := strconv.ParseUint(v, 10, 64)
 				if err == nil {
-					acc.Add("bin", ival, tags)
+					fields["bin"] = ival
 				}
 			case HF_BOUT:
 				ival, err := strconv.ParseUint(v, 10, 64)
 				if err == nil {
-					acc.Add("bout", ival, tags)
+					fields["bout"] = ival
 				}
 			case HF_DREQ:
 				ival, err := strconv.ParseUint(v, 10, 64)
 				if err == nil {
-					acc.Add("dreq", ival, tags)
+					fields["dreq"] = ival
 				}
 			case HF_DRESP:
 				ival, err := strconv.ParseUint(v, 10, 64)
 				if err == nil {
-					acc.Add("dresp", ival, tags)
+					fields["dresp"] = ival
 				}
 			case HF_EREQ:
 				ival, err := strconv.ParseUint(v, 10, 64)
 				if err == nil {
-					acc.Add("ereq", ival, tags)
+					fields["ereq"] = ival
 				}
 			case HF_ECON:
 				ival, err := strconv.ParseUint(v, 10, 64)
 				if err == nil {
-					acc.Add("econ", ival, tags)
+					fields["econ"] = ival
 				}
 			case HF_ERESP:
 				ival, err := strconv.ParseUint(v, 10, 64)
 				if err == nil {
-					acc.Add("eresp", ival, tags)
+					fields["eresp"] = ival
 				}
 			case HF_WRETR:
 				ival, err := strconv.ParseUint(v, 10, 64)
 				if err == nil {
-					acc.Add("wretr", ival, tags)
+					fields["wretr"] = ival
 				}
 			case HF_WREDIS:
 				ival, err := strconv.ParseUint(v, 10, 64)
 				if err == nil {
-					acc.Add("wredis", ival, tags)
+					fields["wredis"] = ival
 				}
 			case HF_ACT:
 				ival, err := strconv.ParseUint(v, 10, 64)
 				if err == nil {
-					acc.Add("active_servers", ival, tags)
+					fields["active_servers"] = ival
 				}
 			case HF_BCK:
 				ival, err := strconv.ParseUint(v, 10, 64)
 				if err == nil {
-					acc.Add("backup_servers", ival, tags)
+					fields["backup_servers"] = ival
 				}
 			case HF_DOWNTIME:
 				ival, err := strconv.ParseUint(v, 10, 64)
 				if err == nil {
-					acc.Add("downtime", ival, tags)
+					fields["downtime"] = ival
 				}
 			case HF_THROTTLE:
 				ival, err := strconv.ParseUint(v, 10, 64)
 				if err == nil {
-					acc.Add("throttle", ival, tags)
+					fields["throttle"] = ival
 				}
 			case HF_LBTOT:
 				ival, err := strconv.ParseUint(v, 10, 64)
 				if err == nil {
-					acc.Add("lbtot", ival, tags)
+					fields["lbtot"] = ival
 				}
 			case HF_RATE:
 				ival, err := strconv.ParseUint(v, 10, 64)
 				if err == nil {
-					acc.Add("rate", ival, tags)
+					fields["rate"] = ival
 				}
 			case HF_RATE_MAX:
 				ival, err := strconv.ParseUint(v, 10, 64)
 				if err == nil {
-					acc.Add("rate_max", ival, tags)
+					fields["rate_max"] = ival
 				}
 			case HF_CHECK_DURATION:
 				ival, err := strconv.ParseUint(v, 10, 64)
 				if err == nil {
-					acc.Add("check_duration", ival, tags)
+					fields["check_duration"] = ival
 				}
 			case HF_HRSP_1xx:
 				ival, err := strconv.ParseUint(v, 10, 64)
 				if err == nil {
-					acc.Add("http_response.1xx", ival, tags)
+					fields["http_response.1xx"] = ival
 				}
 			case HF_HRSP_2xx:
 				ival, err := strconv.ParseUint(v, 10, 64)
 				if err == nil {
-					acc.Add("http_response.2xx", ival, tags)
+					fields["http_response.2xx"] = ival
 				}
 			case HF_HRSP_3xx:
 				ival, err := strconv.ParseUint(v, 10, 64)
 				if err == nil {
-					acc.Add("http_response.3xx", ival, tags)
+					fields["http_response.3xx"] = ival
 				}
 			case HF_HRSP_4xx:
 				ival, err := strconv.ParseUint(v, 10, 64)
 				if err == nil {
-					acc.Add("http_response.4xx", ival, tags)
+					fields["http_response.4xx"] = ival
 				}
 			case HF_HRSP_5xx:
 				ival, err := strconv.ParseUint(v, 10, 64)
 				if err == nil {
-					acc.Add("http_response.5xx", ival, tags)
+					fields["http_response.5xx"] = ival
 				}
 			case HF_REQ_RATE:
 				ival, err := strconv.ParseUint(v, 10, 64)
 				if err == nil {
-					acc.Add("req_rate", ival, tags)
+					fields["req_rate"] = ival
 				}
 			case HF_REQ_RATE_MAX:
 				ival, err := strconv.ParseUint(v, 10, 64)
 				if err == nil {
-					acc.Add("req_rate_max", ival, tags)
+					fields["req_rate_max"] = ival
 				}
 			case HF_REQ_TOT:
 				ival, err := strconv.ParseUint(v, 10, 64)
 				if err == nil {
-					acc.Add("req_tot", ival, tags)
+					fields["req_tot"] = ival
 				}
 			case HF_CLI_ABRT:
 				ival, err := strconv.ParseUint(v, 10, 64)
 				if err == nil {
-					acc.Add("cli_abort", ival, tags)
+					fields["cli_abort"] = ival
 				}
 			case HF_SRV_ABRT:
 				ival, err := strconv.ParseUint(v, 10, 64)
 				if err == nil {
-					acc.Add("srv_abort", ival, tags)
+					fields["srv_abort"] = ival
 				}
 			case HF_QTIME:
 				ival, err := strconv.ParseUint(v, 10, 64)
 				if err == nil {
-					acc.Add("qtime", ival, tags)
+					fields["qtime"] = ival
 				}
 			case HF_CTIME:
 				ival, err := strconv.ParseUint(v, 10, 64)
 				if err == nil {
-					acc.Add("ctime", ival, tags)
+					fields["ctime"] = ival
 				}
 			case HF_RTIME:
 				ival, err := strconv.ParseUint(v, 10, 64)
 				if err == nil {
-					acc.Add("rtime", ival, tags)
+					fields["rtime"] = ival
 				}
 			case HF_TTIME:
 				ival, err := strconv.ParseUint(v, 10, 64)
 				if err == nil {
-					acc.Add("ttime", ival, tags)
+					fields["ttime"] = ival
 				}

 			}
 		}
+		acc.AddFields("haproxy", fields, tags, now)
 	}
-	return result, err
+	return err
 }

 func init() {
```
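The haproxy change above replaces one `acc.Add` call per statistic with a single fields map that is flushed once per CSV row via `acc.AddFields`. A minimal standalone sketch of that aggregation pattern (the `parseRow` helper and the column indices here are illustrative stand-ins, not the plugin's actual API):

```go
package main

import (
	"fmt"
	"strconv"
)

// Illustrative column indices, playing the role of the HF_* constants.
const (
	colQcur = 0
	colScur = 1
)

// parseRow aggregates one CSV stat row into a single fields map,
// silently skipping values that fail to parse -- the same pattern the
// haproxy plugin adopts instead of one accumulator call per stat.
func parseRow(row []string) map[string]interface{} {
	fields := make(map[string]interface{})
	names := map[int]string{colQcur: "qcur", colScur: "scur"}
	for idx, v := range row {
		name, ok := names[idx]
		if !ok {
			continue
		}
		if ival, err := strconv.ParseUint(v, 10, 64); err == nil {
			fields[name] = ival
		}
	}
	return fields
}

func main() {
	fields := parseRow([]string{"4", "12"})
	fmt.Println(fields["qcur"], fields["scur"])
}
```

Emitting one point with many fields, rather than many single-field points, is what the "plugin measurements aggregated into a single measurement" release note refers to.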
**httpjson plugin:**

```diff
@@ -10,20 +10,17 @@ import (
 	"strings"
 	"sync"

+	"github.com/influxdb/telegraf/internal"
 	"github.com/influxdb/telegraf/plugins"
 )

 type HttpJson struct {
-	Services []Service
-	client   HTTPClient
-}
-
-type Service struct {
 	Name       string
 	Servers    []string
 	Method     string
 	TagKeys    []string
 	Parameters map[string]string
+	client     HTTPClient
 }

 type HTTPClient interface {
@@ -47,31 +44,28 @@ func (c RealHTTPClient) MakeRequest(req *http.Request) (*http.Response, error) {
 }

 var sampleConfig = `
-  # Specify services via an array of tables
-  [[plugins.httpjson.services]]
+  # a name for the service being polled
+  name = "webserver_stats"

-    # a name for the service being polled
-    name = "webserver_stats"
+  # URL of each server in the service's cluster
+  servers = [
+    "http://localhost:9999/stats/",
+    "http://localhost:9998/stats/",
+  ]

-    # URL of each server in the service's cluster
-    servers = [
-      "http://localhost:9999/stats/",
-      "http://localhost:9998/stats/",
-    ]
+  # HTTP method to use (case-sensitive)
+  method = "GET"

-    # HTTP method to use (case-sensitive)
-    method = "GET"
+  # List of tag names to extract from top-level of JSON server response
+  # tag_keys = [
+  #   "my_tag_1",
+  #   "my_tag_2"
+  # ]

-    # List of tag names to extract from top-level of JSON server response
-    # tag_keys = [
-    #   "my_tag_1",
-    #   "my_tag_2"
-    # ]
+  # HTTP parameters (all values must be strings)
+  [plugins.httpjson.parameters]
+    event_type = "cpu_spike"
+    threshold = "0.75"

-    # HTTP parameters (all values must be strings)
-    [plugins.httpjson.services.parameters]
-      event_type = "cpu_spike"
-      threshold = "0.75"
`

 func (h *HttpJson) SampleConfig() string {
@@ -86,22 +80,16 @@ func (h *HttpJson) Description() string {
 func (h *HttpJson) Gather(acc plugins.Accumulator) error {
 	var wg sync.WaitGroup

-	totalServers := 0
-	for _, service := range h.Services {
-		totalServers += len(service.Servers)
-	}
-	errorChannel := make(chan error, totalServers)
-
-	for _, service := range h.Services {
-		for _, server := range service.Servers {
-			wg.Add(1)
-			go func(service Service, server string) {
-				defer wg.Done()
-				if err := h.gatherServer(acc, service, server); err != nil {
-					errorChannel <- err
-				}
-			}(service, server)
-		}
+	errorChannel := make(chan error, len(h.Servers))
+	for _, server := range h.Servers {
+		wg.Add(1)
+		go func(server string) {
+			defer wg.Done()
+			if err := h.gatherServer(acc, server); err != nil {
+				errorChannel <- err
+			}
+		}(server)
 	}

 	wg.Wait()
@@ -129,10 +117,9 @@ func (h *HttpJson) Gather(acc plugins.Accumulator) error {
 // error: Any error that may have occurred
 func (h *HttpJson) gatherServer(
 	acc plugins.Accumulator,
-	service Service,
 	serverURL string,
 ) error {
-	resp, err := h.sendRequest(service, serverURL)
+	resp, err := h.sendRequest(serverURL)
 	if err != nil {
 		return err
 	}
@@ -146,7 +133,7 @@ func (h *HttpJson) gatherServer(
 		"server": serverURL,
 	}

-	for _, tag := range service.TagKeys {
+	for _, tag := range h.TagKeys {
 		switch v := jsonOut[tag].(type) {
 		case string:
 			tags[tag] = v
@@ -154,7 +141,19 @@ func (h *HttpJson) gatherServer(
 		delete(jsonOut, tag)
 	}

-	processResponse(acc, service.Name, tags, jsonOut)
+	f := internal.JSONFlattener{}
+	err = f.FlattenJSON("", jsonOut)
+	if err != nil {
+		return err
+	}
+
+	var msrmnt_name string
+	if h.Name == "" {
+		msrmnt_name = "httpjson"
+	} else {
+		msrmnt_name = "httpjson_" + h.Name
+	}
+	acc.AddFields(msrmnt_name, f.Fields, nil)
 	return nil
 }

@@ -165,7 +164,7 @@ func (h *HttpJson) gatherServer(
 // Returns:
 // string: body of the response
 // error : Any error that may have occurred
-func (h *HttpJson) sendRequest(service Service, serverURL string) (string, error) {
+func (h *HttpJson) sendRequest(serverURL string) (string, error) {
 	// Prepare URL
 	requestURL, err := url.Parse(serverURL)
 	if err != nil {
@@ -173,13 +172,13 @@ func (h *HttpJson) sendRequest(service Service, serverURL string) (string, error
 	}

 	params := url.Values{}
-	for k, v := range service.Parameters {
+	for k, v := range h.Parameters {
 		params.Add(k, v)
 	}
 	requestURL.RawQuery = params.Encode()

 	// Create + send request
-	req, err := http.NewRequest(service.Method, requestURL.String(), nil)
+	req, err := http.NewRequest(h.Method, requestURL.String(), nil)
 	if err != nil {
 		return "", err
 	}
@@ -188,6 +187,7 @@ func (h *HttpJson) sendRequest(service Service, serverURL string) (string, error
 	if err != nil {
 		return "", err
 	}
+	defer resp.Body.Close()

 	defer resp.Body.Close()
 	body, err := ioutil.ReadAll(resp.Body)
@@ -209,23 +209,6 @@ func (h *HttpJson) sendRequest(service Service, serverURL string) (string, error
 	return string(body), err
 }

-// Flattens the map generated from the JSON object and stores its float values using a
-// plugins.Accumulator. It ignores any non-float values.
-// Parameters:
-// acc: the Accumulator to use
-// prefix: What the name of the measurement name should be prefixed by.
-// tags: telegraf tags to
-func processResponse(acc plugins.Accumulator, prefix string, tags map[string]string, v interface{}) {
-	switch t := v.(type) {
-	case map[string]interface{}:
-		for k, v := range t {
-			processResponse(acc, prefix+"_"+k, tags, v)
-		}
-	case float64:
-		acc.Add(prefix, v, tags)
-	}
-}

 func init() {
 	plugins.Add("httpjson", func() plugins.Plugin {
 		return &HttpJson{client: RealHTTPClient{client: &http.Client{}}}
```
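The httpjson rewrite drops its local `processResponse` helper and delegates to `internal.JSONFlattener`. A rough standalone sketch of what such a flattener does (this is an illustrative re-implementation, not the actual `internal` package):

```go
package main

import "fmt"

// flatten walks a decoded JSON value and collects float64 leaves into
// fields, joining nested keys with underscores -- e.g. {"a":{"b":1}}
// becomes {"a_b": 1}. Non-numeric leaves are ignored, just as the
// removed processResponse helper ignored them.
func flatten(prefix string, v interface{}, fields map[string]float64) {
	switch t := v.(type) {
	case map[string]interface{}:
		for k, child := range t {
			key := k
			if prefix != "" {
				key = prefix + "_" + k
			}
			flatten(key, child, fields)
		}
	case float64:
		fields[prefix] = t
	}
}

func main() {
	in := map[string]interface{}{
		"response_time": 0.03,
		"messages":      map[string]interface{}{"sent": 12.0, "failed": 1.0},
	}
	fields := make(map[string]float64)
	flatten("", in, fields)
	fmt.Println(fields["messages_sent"], fields["response_time"])
}
```

The flattened keys then become field names on a single `httpjson` (or `httpjson_<name>`) measurement, rather than one measurement per leaf.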
**jolokia plugin:**

```diff
@@ -7,7 +7,6 @@ import (
 	"io/ioutil"
 	"net/http"
 	"net/url"
-	"strings"

 	"github.com/influxdb/telegraf/plugins"
 )
@@ -23,8 +22,6 @@ type Server struct {
 type Metric struct {
 	Name string
 	Jmx  string
-	Pass []string
-	Drop []string
 }

 type JolokiaClient interface {
@@ -44,7 +41,6 @@ type Jolokia struct {
 	Context string
 	Servers []Server
 	Metrics []Metric
-	Tags    map[string]string
 }

 func (j *Jolokia) SampleConfig() string {
@@ -52,10 +48,6 @@ func (j *Jolokia) SampleConfig() string {
   # This is the context root used to compose the jolokia url
   context = "/jolokia/read"

-  # Tags added to each measurements
-  [jolokia.tags]
-    group = "as"
-
   # List of servers exposing jolokia read service
   [[plugins.jolokia.servers]]
     name = "stable"
@@ -70,23 +62,6 @@ func (j *Jolokia) SampleConfig() string {
   [[plugins.jolokia.metrics]]
     name = "heap_memory_usage"
     jmx  = "/java.lang:type=Memory/HeapMemoryUsage"
-
-  # This drops the 'committed' value from Eden space measurement
-  [[plugins.jolokia.metrics]]
-    name = "memory_eden"
-    jmx  = "/java.lang:type=MemoryPool,name=PS Eden Space/Usage"
-    drop = [ "committed" ]
-
-  # This passes only DaemonThreadCount and ThreadCount
-  [[plugins.jolokia.metrics]]
-    name = "heap_threads"
-    jmx  = "/java.lang:type=Threading"
-    pass = [
-      "DaemonThreadCount",
-      "ThreadCount"
-    ]
`
 }

@@ -100,12 +75,9 @@ func (j *Jolokia) getAttr(requestUrl *url.URL) (map[string]interface{}, error) {
 	if err != nil {
 		return nil, err
 	}
+	defer req.Body.Close()

 	resp, err := j.jClient.MakeRequest(req)
-	if err != nil {
-		return nil, err
-	}
-
 	if err != nil {
 		return nil, err
 	}
@@ -137,65 +109,22 @@ func (j *Jolokia) getAttr(requestUrl *url.URL) (map[string]interface{}, error) {
 	return jsonOut, nil
 }

-func (m *Metric) shouldPass(field string) bool {
-	if m.Pass != nil {
-		for _, pass := range m.Pass {
-			if strings.HasPrefix(field, pass) {
-				return true
-			}
-		}
-		return false
-	}
-	if m.Drop != nil {
-		for _, drop := range m.Drop {
-			if strings.HasPrefix(field, drop) {
-				return false
-			}
-		}
-		return true
-	}
-	return true
-}
-
-func (m *Metric) filterFields(fields map[string]interface{}) map[string]interface{} {
-	for field, _ := range fields {
-		if !m.shouldPass(field) {
-			delete(fields, field)
-		}
-	}
-	return fields
-}

 func (j *Jolokia) Gather(acc plugins.Accumulator) error {
 	context := j.Context //"/jolokia/read"
 	servers := j.Servers
 	metrics := j.Metrics
-	tags := j.Tags
-
-	if tags == nil {
-		tags = map[string]string{}
-	}
+	tags := make(map[string]string)

 	for _, server := range servers {
+		tags["server"] = server.Name
+		tags["port"] = server.Port
+		tags["host"] = server.Host
+		fields := make(map[string]interface{})
 		for _, metric := range metrics {
 			measurement := metric.Name
 			jmxPath := metric.Jmx

-			tags["server"] = server.Name
-			tags["port"] = server.Port
-			tags["host"] = server.Host
-
 			// Prepare URL
 			requestUrl, err := url.Parse("http://" + server.Host + ":" +
 				server.Port + context + jmxPath)
@@ -209,16 +138,20 @@ func (j *Jolokia) Gather(acc plugins.Accumulator) error {
 			out, _ := j.getAttr(requestUrl)

 			if values, ok := out["value"]; ok {
-				switch values.(type) {
+				switch t := values.(type) {
 				case map[string]interface{}:
-					acc.AddFields(measurement, metric.filterFields(values.(map[string]interface{})), tags)
+					for k, v := range t {
+						fields[measurement+"_"+k] = v
+					}
 				case interface{}:
-					acc.Add(measurement, values.(interface{}), tags)
+					fields[measurement] = t
 				}
 			} else {
-				fmt.Printf("Missing key 'value' in '%s' output response\n", requestUrl.String())
+				fmt.Printf("Missing key 'value' in '%s' output response\n",
+					requestUrl.String())
 			}
 		}
+		acc.AddFields("jolokia", fields, tags)
 	}

 	return nil
```
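The rewritten jolokia `Gather` loop folds every metric's `value` payload into one fields map per server, prefixing nested keys with the metric name. A minimal sketch of that type switch (the `mergeValue` helper is illustrative, not part of the plugin):

```go
package main

import "fmt"

// mergeValue adds a Jolokia "value" payload to fields under the metric
// name: nested maps become measurement_key entries, scalar values are
// stored under the metric name directly -- mirroring the type switch
// in the rewritten Gather loop.
func mergeValue(fields map[string]interface{}, measurement string, value interface{}) {
	switch t := value.(type) {
	case map[string]interface{}:
		for k, v := range t {
			fields[measurement+"_"+k] = v
		}
	default:
		fields[measurement] = t
	}
}

func main() {
	fields := make(map[string]interface{})
	mergeValue(fields, "heap_memory_usage",
		map[string]interface{}{"used": 1024.0, "max": 4096.0})
	mergeValue(fields, "thread_count", 42.0)
	fmt.Println(fields["heap_memory_usage_used"], fields["thread_count"])
}
```

This is why the changelog flags jolokia as breaking: per-metric `pass`/`drop` filtering is gone, replaced by the global tag/drop/pass parameters.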
**leofs plugin:**

```diff
@@ -197,6 +197,8 @@ func (l *LeoFS) gatherServer(endpoint string, serverType ServerType, acc plugins
 		"node": nodeNameTrimmed,
 	}
 	i := 0
+
+	fields := make(map[string]interface{})
 	for scanner.Scan() {
 		key := KeyMapping[serverType][i]
 		val, err := retrieveTokenAfterColon(scanner.Text())
@@ -207,9 +209,10 @@ func (l *LeoFS) gatherServer(endpoint string, serverType ServerType, acc plugins
 		if err != nil {
 			return fmt.Errorf("Unable to parse the value:%s, err:%s", val, err)
 		}
-		acc.Add(key, fVal, tags)
+		fields[key] = fVal
 		i++
 	}
+	acc.AddFields("leofs", fields, tags)
 	return nil
 }
```
**lustre2 plugin:**

```diff
@@ -149,19 +149,19 @@ func (l *Lustre2) GetLustreProcStats(fileglob string, wanted_fields []*mapping,
 		return err
 	}

+	fields := make(map[string]interface{})
 	for _, line := range lines {
-		fields := strings.Fields(line)
+		parts := strings.Fields(line)

 		for _, wanted := range wanted_fields {
 			var data uint64
-			if fields[0] == wanted.inProc {
+			if parts[0] == wanted.inProc {
 				wanted_field := wanted.field
 				// if not set, assume field[1]. Shouldn't be field[0], as
 				// that's a string
 				if wanted_field == 0 {
 					wanted_field = 1
 				}
-				data, err = strconv.ParseUint((fields[wanted_field]), 10, 64)
+				data, err = strconv.ParseUint((parts[wanted_field]), 10, 64)
 				if err != nil {
 					return err
 				}
@@ -169,11 +169,11 @@ func (l *Lustre2) GetLustreProcStats(fileglob string, wanted_fields []*mapping,
 				if wanted.reportAs != "" {
 					report_name = wanted.reportAs
 				}
-				acc.Add(report_name, data, tags)
+				fields[report_name] = data

 			}
 		}
 	}
+	acc.AddFields("lustre2", fields, tags)
 	}
 	return nil
 }
```
@@ -75,35 +75,38 @@ func gatherReport(acc plugins.Accumulator, report Report, now time.Time) {
|
|||||||
tags := make(map[string]string)
|
tags := make(map[string]string)
|
||||||
tags["id"] = report.ID
|
tags["id"] = report.ID
|
||||||
tags["campaign_title"] = report.CampaignTitle
|
tags["campaign_title"] = report.CampaignTitle
|
||||||
acc.Add("emails_sent", report.EmailsSent, tags, now)
|
fields := map[string]interface{}{
|
||||||
acc.Add("abuse_reports", report.AbuseReports, tags, now)
|
"emails_sent": report.EmailsSent,
|
||||||
acc.Add("unsubscribed", report.Unsubscribed, tags, now)
|
"abuse_reports": report.AbuseReports,
|
||||||
acc.Add("hard_bounces", report.Bounces.HardBounces, tags, now)
|
"unsubscribed": report.Unsubscribed,
|
||||||
acc.Add("soft_bounces", report.Bounces.SoftBounces, tags, now)
|
"hard_bounces": report.Bounces.HardBounces,
|
||||||
acc.Add("syntax_errors", report.Bounces.SyntaxErrors, tags, now)
|
"soft_bounces": report.Bounces.SoftBounces,
|
||||||
acc.Add("forwards_count", report.Forwards.ForwardsCount, tags, now)
|
"syntax_errors": report.Bounces.SyntaxErrors,
|
||||||
acc.Add("forwards_opens", report.Forwards.ForwardsOpens, tags, now)
|
"forwards_count": report.Forwards.ForwardsCount,
|
||||||
acc.Add("opens_total", report.Opens.OpensTotal, tags, now)
|
"forwards_opens": report.Forwards.ForwardsOpens,
|
||||||
acc.Add("unique_opens", report.Opens.UniqueOpens, tags, now)
|
"opens_total": report.Opens.OpensTotal,
|
||||||
acc.Add("open_rate", report.Opens.OpenRate, tags, now)
|
"unique_opens": report.Opens.UniqueOpens,
|
||||||
acc.Add("clicks_total", report.Clicks.ClicksTotal, tags, now)
|
"open_rate": report.Opens.OpenRate,
|
||||||
acc.Add("unique_clicks", report.Clicks.UniqueClicks, tags, now)
|
"clicks_total": report.Clicks.ClicksTotal,
|
||||||
acc.Add("unique_subscriber_clicks", report.Clicks.UniqueSubscriberClicks, tags, now)
|
"unique_clicks": report.Clicks.UniqueClicks,
|
mailchimp:

```diff
-	acc.Add("click_rate", report.Clicks.ClickRate, tags, now)
-	acc.Add("facebook_recipient_likes", report.FacebookLikes.RecipientLikes, tags, now)
-	acc.Add("facebook_unique_likes", report.FacebookLikes.UniqueLikes, tags, now)
-	acc.Add("facebook_likes", report.FacebookLikes.FacebookLikes, tags, now)
-	acc.Add("industry_type", report.IndustryStats.Type, tags, now)
-	acc.Add("industry_open_rate", report.IndustryStats.OpenRate, tags, now)
-	acc.Add("industry_click_rate", report.IndustryStats.ClickRate, tags, now)
-	acc.Add("industry_bounce_rate", report.IndustryStats.BounceRate, tags, now)
-	acc.Add("industry_unopen_rate", report.IndustryStats.UnopenRate, tags, now)
-	acc.Add("industry_unsub_rate", report.IndustryStats.UnsubRate, tags, now)
-	acc.Add("industry_abuse_rate", report.IndustryStats.AbuseRate, tags, now)
-	acc.Add("list_stats_sub_rate", report.ListStats.SubRate, tags, now)
-	acc.Add("list_stats_unsub_rate", report.ListStats.UnsubRate, tags, now)
-	acc.Add("list_stats_open_rate", report.ListStats.OpenRate, tags, now)
-	acc.Add("list_stats_click_rate", report.ListStats.ClickRate, tags, now)
+		"unique_subscriber_clicks": report.Clicks.UniqueSubscriberClicks,
+		"click_rate":               report.Clicks.ClickRate,
+		"facebook_recipient_likes": report.FacebookLikes.RecipientLikes,
+		"facebook_unique_likes":    report.FacebookLikes.UniqueLikes,
+		"facebook_likes":           report.FacebookLikes.FacebookLikes,
+		"industry_type":            report.IndustryStats.Type,
+		"industry_open_rate":       report.IndustryStats.OpenRate,
+		"industry_click_rate":      report.IndustryStats.ClickRate,
+		"industry_bounce_rate":     report.IndustryStats.BounceRate,
+		"industry_unopen_rate":     report.IndustryStats.UnopenRate,
+		"industry_unsub_rate":      report.IndustryStats.UnsubRate,
+		"industry_abuse_rate":      report.IndustryStats.AbuseRate,
+		"list_stats_sub_rate":      report.ListStats.SubRate,
+		"list_stats_unsub_rate":    report.ListStats.UnsubRate,
+		"list_stats_open_rate":     report.ListStats.OpenRate,
+		"list_stats_click_rate":    report.ListStats.ClickRate,
+	}
+	acc.AddFields("mailchimp", fields, tags, now)
 }

 func init() {
```
memcached:

```diff
@@ -137,16 +137,18 @@ func (m *Memcached) gatherServer(
 	tags := map[string]string{"server": address}

 	// Process values
+	fields := make(map[string]interface{})
 	for _, key := range sendMetrics {
 		if value, ok := values[key]; ok {
 			// Mostly it is the number
 			if iValue, errParse := strconv.ParseInt(value, 10, 64); errParse != nil {
-				acc.Add(key, value, tags)
+				fields[key] = iValue
 			} else {
-				acc.Add(key, iValue, tags)
+				fields[key] = value
 			}
 		}
 	}
+	acc.AddFields("memcached", fields, tags)
 	return nil
 }
```
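The memcached hunk above is the template the whole series follows: collect every stat into one `fields` map, then emit a single measurement with one `AddFields` call. A minimal, self-contained sketch of that parse-and-aggregate step (the stat names and values are made-up stand-ins, and the `errParse == nil` branch ordering is written the conventional way rather than copied from the diff):

```go
package main

import (
	"fmt"
	"strconv"
)

// aggregateFields mirrors the 0.3.0 pattern: instead of one Add() call per
// stat, collect everything into a single fields map and emit it once.
// Numeric-looking strings become int64 fields; everything else stays a string.
func aggregateFields(values map[string]string, keys []string) map[string]interface{} {
	fields := make(map[string]interface{})
	for _, key := range keys {
		if value, ok := values[key]; ok {
			if iValue, err := strconv.ParseInt(value, 10, 64); err == nil {
				fields[key] = iValue
			} else {
				fields[key] = value
			}
		}
	}
	return fields
}

func main() {
	values := map[string]string{"curr_connections": "10", "version": "1.4.25"}
	fields := aggregateFields(values, []string{"curr_connections", "version"})
	fmt.Println(fields["curr_connections"], fields["version"]) // 10 1.4.25
}
```

In the real plugin the map is handed to `acc.AddFields("memcached", fields, tags)`, producing one point per server instead of one point per stat.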
mongodb:

```diff
@@ -98,7 +98,8 @@ func (m *MongoDB) gatherServer(server *Server, acc plugins.Accumulator) error {
 	}
 	dialInfo, err := mgo.ParseURL(dialAddrs[0])
 	if err != nil {
-		return fmt.Errorf("Unable to parse URL (%s), %s\n", dialAddrs[0], err.Error())
+		return fmt.Errorf("Unable to parse URL (%s), %s\n",
+			dialAddrs[0], err.Error())
 	}
 	dialInfo.Direct = true
 	dialInfo.Timeout = time.Duration(10) * time.Second
@@ -10,6 +10,7 @@ import (

 type MongodbData struct {
 	StatLine *StatLine
+	Fields   map[string]interface{}
 	Tags     map[string]string
 }

@@ -20,6 +21,7 @@ func NewMongodbData(statLine *StatLine, tags map[string]string) *MongodbData {
 	return &MongodbData{
 		StatLine: statLine,
 		Tags:     tags,
+		Fields:   make(map[string]interface{}),
 	}
 }

@@ -63,38 +65,44 @@ var WiredTigerStats = map[string]string{
 	"percent_cache_used": "CacheUsedPercent",
 }

-func (d *MongodbData) AddDefaultStats(acc plugins.Accumulator) {
+func (d *MongodbData) AddDefaultStats() {
 	statLine := reflect.ValueOf(d.StatLine).Elem()
-	d.addStat(acc, statLine, DefaultStats)
+	d.addStat(statLine, DefaultStats)
 	if d.StatLine.NodeType != "" {
-		d.addStat(acc, statLine, DefaultReplStats)
+		d.addStat(statLine, DefaultReplStats)
 	}
 	if d.StatLine.StorageEngine == "mmapv1" {
-		d.addStat(acc, statLine, MmapStats)
+		d.addStat(statLine, MmapStats)
 	} else if d.StatLine.StorageEngine == "wiredTiger" {
 		for key, value := range WiredTigerStats {
 			val := statLine.FieldByName(value).Interface()
 			percentVal := fmt.Sprintf("%.1f", val.(float64)*100)
 			floatVal, _ := strconv.ParseFloat(percentVal, 64)
-			d.add(acc, key, floatVal)
+			d.add(key, floatVal)
 		}
 	}
 }

-func (d *MongodbData) addStat(acc plugins.Accumulator, statLine reflect.Value, stats map[string]string) {
+func (d *MongodbData) addStat(
+	statLine reflect.Value,
+	stats map[string]string,
+) {
 	for key, value := range stats {
 		val := statLine.FieldByName(value).Interface()
-		d.add(acc, key, val)
+		d.add(key, val)
 	}
 }

-func (d *MongodbData) add(acc plugins.Accumulator, key string, val interface{}) {
+func (d *MongodbData) add(key string, val interface{}) {
+	d.Fields[key] = val
+}
+
+func (d *MongodbData) flush(acc plugins.Accumulator) {
 	acc.AddFields(
-		key,
-		map[string]interface{}{
-			"value": val,
-		},
+		"mongodb",
+		d.Fields,
 		d.Tags,
 		d.StatLine.Time,
 	)
+	d.Fields = make(map[string]interface{})
 }
@@ -44,7 +44,8 @@ func (s *Server) gatherData(acc plugins.Accumulator) error {
 		NewStatLine(*s.lastResult, *result, s.Url.Host, true, durationInSeconds),
 		s.getDefaultTags(),
 	)
-	data.AddDefaultStats(acc)
+	data.AddDefaultStats()
+	data.flush(acc)
 	}
 	return nil
 }
```
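The `MongodbData` change above turns `add()` into a pure buffer write and defers all accumulator output to a single `flush()`. A toy sketch of that accumulate-then-flush shape (the `Point` type and sink slice stand in for `plugins.Accumulator`, which is not reproduced here):

```go
package main

import "fmt"

// Point stands in for one emitted measurement.
type Point struct {
	Name   string
	Fields map[string]interface{}
}

// Data buffers fields the way MongodbData now does, emitting them in one go.
type Data struct {
	Fields map[string]interface{}
	Sink   *[]Point
}

func (d *Data) add(key string, val interface{}) {
	d.Fields[key] = val // no emit here; just buffer
}

func (d *Data) flush(name string) {
	*d.Sink = append(*d.Sink, Point{Name: name, Fields: d.Fields})
	d.Fields = make(map[string]interface{}) // reset for the next gather cycle
}

func main() {
	var sink []Point
	d := &Data{Fields: make(map[string]interface{}), Sink: &sink}
	d.add("inserts_per_sec", 12)
	d.add("queries_per_sec", 34)
	d.flush("mongodb")
	fmt.Println(len(sink), len(sink[0].Fields)) // one point carrying both fields
}
```

Resetting the map inside `flush` matters: without it, fields from one gather interval would leak into the next point.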
mysql:

```diff
@@ -138,6 +138,8 @@ func (m *Mysql) gatherServer(serv string, acc plugins.Accumulator) error {
 	if err != nil {
 		servtag = "localhost"
 	}
+	tags := map[string]string{"server": servtag}
+	fields := make(map[string]interface{})
 	for rows.Next() {
 		var name string
 		var val interface{}
@@ -149,12 +151,10 @@ func (m *Mysql) gatherServer(serv string, acc plugins.Accumulator) error {

 		var found bool

-		tags := map[string]string{"server": servtag}
-
 		for _, mapped := range mappings {
 			if strings.HasPrefix(name, mapped.onServer) {
 				i, _ := strconv.Atoi(string(val.([]byte)))
-				acc.Add(mapped.inExport+name[len(mapped.onServer):], i, tags)
+				fields[mapped.inExport+name[len(mapped.onServer):]] = i
 				found = true
 			}
 		}
@@ -170,16 +170,17 @@ func (m *Mysql) gatherServer(serv string, acc plugins.Accumulator) error {
 				return err
 			}

-			acc.Add("queries", i, tags)
+			fields["queries"] = i
 		case "Slow_queries":
 			i, err := strconv.ParseInt(string(val.([]byte)), 10, 64)
 			if err != nil {
 				return err
 			}

-			acc.Add("slow_queries", i, tags)
+			fields["slow_queries"] = i
 		}
 	}
+	acc.AddFields("mysql", fields, tags)

 	conn_rows, err := db.Query("SELECT user, sum(1) FROM INFORMATION_SCHEMA.PROCESSLIST GROUP BY user")

@@ -193,11 +194,13 @@ func (m *Mysql) gatherServer(serv string, acc plugins.Accumulator) error {
 		}

 		tags := map[string]string{"server": servtag, "user": user}
+		fields := make(map[string]interface{})

 		if err != nil {
 			return err
 		}
-		acc.Add("connections", connections, tags)
+		fields["connections"] = connections
+		acc.AddFields("mysql_users", fields, tags)
 	}

 	return nil
```
nginx:

```diff
@@ -127,14 +127,16 @@ func (n *Nginx) gatherUrl(addr *url.URL, acc plugins.Accumulator) error {
 	}

 	tags := getTags(addr)
-	acc.Add("active", active, tags)
-	acc.Add("accepts", accepts, tags)
-	acc.Add("handled", handled, tags)
-	acc.Add("requests", requests, tags)
-	acc.Add("reading", reading, tags)
-	acc.Add("writing", writing, tags)
-	acc.Add("waiting", waiting, tags)
+	fields := map[string]interface{}{
+		"active":   active,
+		"accepts":  accepts,
+		"handled":  handled,
+		"requests": requests,
+		"reading":  reading,
+		"writing":  writing,
+		"waiting":  waiting,
+	}
+	acc.AddFields("nginx", fields, tags)

 	return nil
 }
```
phpfpm:

```diff
@@ -198,9 +198,11 @@ func importMetric(r io.Reader, acc plugins.Accumulator, host string) (poolStat,
 			"url":  host,
 			"pool": pool,
 		}
+		fields := make(map[string]interface{})
 		for k, v := range stats[pool] {
-			acc.Add(strings.Replace(k, " ", "_", -1), v, tags)
+			fields[strings.Replace(k, " ", "_", -1)] = v
 		}
+		acc.AddFields("phpfpm", fields, tags)
 	}

 	return stats, nil
```
ping:

```diff
@@ -82,10 +82,13 @@ func (p *Ping) Gather(acc plugins.Accumulator) error {
 			}
 			// Calculate packet loss percentage
 			loss := float64(trans-rec) / float64(trans) * 100.0
-			acc.Add("packets_transmitted", trans, tags)
-			acc.Add("packets_received", rec, tags)
-			acc.Add("percent_packet_loss", loss, tags)
-			acc.Add("average_response_ms", avg, tags)
+			fields := map[string]interface{}{
+				"packets_transmitted": trans,
+				"packets_received":    rec,
+				"percent_packet_loss": loss,
+				"average_response_ms": avg,
+			}
+			acc.AddFields("ping", fields, tags)
 		}(url, acc)
 	}
```
postgresql:

```diff
@@ -11,46 +11,32 @@ import (
 	_ "github.com/lib/pq"
 )

-type Server struct {
+type Postgresql struct {
 	Address        string
 	Databases      []string
 	OrderedColumns []string
 }

-type Postgresql struct {
-	Servers []*Server
-}
-
 var ignoredColumns = map[string]bool{"datid": true, "datname": true, "stats_reset": true}

 var sampleConfig = `
-  # specify servers via an array of tables
-  [[plugins.postgresql.servers]]
-
   # specify address via a url matching:
   #   postgres://[pqgotest[:password]]@localhost[/dbname]?sslmode=[disable|verify-ca|verify-full]
   # or a simple string:
   #   host=localhost user=pqotest password=... sslmode=... dbname=app_production
   #
-  # All connection parameters are optional. By default, the host is localhost
-  # and the user is the currently running user. For localhost, we default
-  # to sslmode=disable as well.
+  # All connection parameters are optional.
   #
   # Without the dbname parameter, the driver will default to a database
   # with the same name as the user. This dbname is just for instantiating a
   # connection with the server and doesn't restrict the databases we are trying
   # to grab metrics for.
   #
-  address = "sslmode=disable"
+  address = "host=localhost user=postgres sslmode=disable"

   # A list of databases to pull metrics about. If not specified, metrics for all
   # databases are gathered.
-  # databases = ["app_production", "blah_testing"]
-
-  # [[plugins.postgresql.servers]]
-  # address = "influx@remoteserver"
+  # databases = ["app_production", "testing"]
 `

 func (p *Postgresql) SampleConfig() string {
@@ -65,42 +51,27 @@ func (p *Postgresql) IgnoredColumns() map[string]bool {
 	return ignoredColumns
 }

-var localhost = &Server{Address: "sslmode=disable"}
+var localhost = "host=localhost sslmode=disable"

 func (p *Postgresql) Gather(acc plugins.Accumulator) error {
-	if len(p.Servers) == 0 {
-		p.gatherServer(localhost, acc)
-		return nil
-	}
-
-	for _, serv := range p.Servers {
-		err := p.gatherServer(serv, acc)
-		if err != nil {
-			return err
-		}
-	}
-
-	return nil
-}
-
-func (p *Postgresql) gatherServer(serv *Server, acc plugins.Accumulator) error {
 	var query string

-	if serv.Address == "" || serv.Address == "localhost" {
-		serv = localhost
+	if p.Address == "" || p.Address == "localhost" {
+		p.Address = localhost
 	}

-	db, err := sql.Open("postgres", serv.Address)
+	db, err := sql.Open("postgres", p.Address)
 	if err != nil {
 		return err
 	}

 	defer db.Close()

-	if len(serv.Databases) == 0 {
+	if len(p.Databases) == 0 {
 		query = `SELECT * FROM pg_stat_database`
 	} else {
-		query = fmt.Sprintf(`SELECT * FROM pg_stat_database WHERE datname IN ('%s')`, strings.Join(serv.Databases, "','"))
+		query = fmt.Sprintf(`SELECT * FROM pg_stat_database WHERE datname IN ('%s')`,
+			strings.Join(p.Databases, "','"))
 	}

 	rows, err := db.Query(query)
@@ -111,13 +82,13 @@ func (p *Postgresql) gatherServer(serv *Server, acc plugins.Accumulator) error {
 	defer rows.Close()

 	// grab the column information from the result
-	serv.OrderedColumns, err = rows.Columns()
+	p.OrderedColumns, err = rows.Columns()
 	if err != nil {
 		return err
 	}

 	for rows.Next() {
-		err = p.accRow(rows, acc, serv)
+		err = p.accRow(rows, acc)
 		if err != nil {
 			return err
 		}
@@ -130,20 +101,20 @@ type scanner interface {
 	Scan(dest ...interface{}) error
 }

-func (p *Postgresql) accRow(row scanner, acc plugins.Accumulator, serv *Server) error {
+func (p *Postgresql) accRow(row scanner, acc plugins.Accumulator) error {
 	var columnVars []interface{}
 	var dbname bytes.Buffer

 	// this is where we'll store the column name with its *interface{}
 	columnMap := make(map[string]*interface{})

-	for _, column := range serv.OrderedColumns {
+	for _, column := range p.OrderedColumns {
 		columnMap[column] = new(interface{})
 	}

 	// populate the array of interface{} with the pointers in the right order
 	for i := 0; i < len(columnMap); i++ {
-		columnVars = append(columnVars, columnMap[serv.OrderedColumns[i]])
+		columnVars = append(columnVars, columnMap[p.OrderedColumns[i]])
 	}

 	// deconstruct array of variables and send to Scan
@@ -159,14 +130,16 @@ func (p *Postgresql) accRow(row scanner, acc plugins.Accumulator, serv *Server)
 		dbname.WriteString(string(dbnameChars[i]))
 	}

-	tags := map[string]string{"server": serv.Address, "db": dbname.String()}
+	tags := map[string]string{"server": p.Address, "db": dbname.String()}

+	fields := make(map[string]interface{})
 	for col, val := range columnMap {
 		_, ignore := ignoredColumns[col]
 		if !ignore {
-			acc.Add(col, *val, tags)
+			fields[col] = *val
 		}
 	}
+	acc.AddFields("postgresql", fields, tags)

 	return nil
 }
```
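In the postgresql rewrite above, `Gather` builds its query from the configured database list: no list means all of `pg_stat_database`, otherwise an `IN (...)` filter is assembled with `strings.Join`. A standalone sketch of that selection logic (the function name `buildQuery` is ours, not the plugin's):

```go
package main

import (
	"fmt"
	"strings"
)

// buildQuery mirrors the gatherServer logic: select every database when none
// are configured, otherwise filter with an IN (...) list built by joining the
// configured names with "','".
func buildQuery(databases []string) string {
	if len(databases) == 0 {
		return `SELECT * FROM pg_stat_database`
	}
	return fmt.Sprintf(`SELECT * FROM pg_stat_database WHERE datname IN ('%s')`,
		strings.Join(databases, "','"))
}

func main() {
	fmt.Println(buildQuery(nil))
	fmt.Println(buildQuery([]string{"app_production", "testing"}))
}
```

Note that joining names straight into SQL is only safe here because the values come from the operator's own config file, not from untrusted input.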
procstat:

```diff
@@ -7,22 +7,17 @@ import (
 	"os/exec"
 	"strconv"
 	"strings"
-	"sync"

 	"github.com/shirou/gopsutil/process"

 	"github.com/influxdb/telegraf/plugins"
 )

-type Specification struct {
+type Procstat struct {
 	PidFile string `toml:"pid_file"`
 	Exe     string
-	Prefix  string
 	Pattern string
-}
-
-type Procstat struct {
-	Specifications []*Specification
+	Prefix  string
 }

 func NewProcstat() *Procstat {
@@ -30,8 +25,6 @@ func NewProcstat() *Procstat {
 }

 var sampleConfig = `
-  [[plugins.procstat.specifications]]
-  prefix = "" # optional string to prefix measurements
   # Must specify one of: pid_file, exe, or pattern
   # PID file to monitor process
   pid_file = "/var/run/nginx.pid"
@@ -39,6 +32,9 @@ var sampleConfig = `
   # exe = "nginx"
   # pattern as argument for pgrep (ie, pgrep -f <pattern>)
   # pattern = "nginx"
+
+  # Field name prefix
+  prefix = ""
 `

 func (_ *Procstat) SampleConfig() string {
@@ -50,35 +46,26 @@ func (_ *Procstat) Description() string {
 }

 func (p *Procstat) Gather(acc plugins.Accumulator) error {
-	var wg sync.WaitGroup
-
-	for _, specification := range p.Specifications {
-		wg.Add(1)
-		go func(spec *Specification, acc plugins.Accumulator) {
-			defer wg.Done()
-			procs, err := spec.createProcesses()
-			if err != nil {
-				log.Printf("Error: procstat getting process, exe: [%s] pidfile: [%s] pattern: [%s] %s",
-					spec.Exe, spec.PidFile, spec.Pattern, err.Error())
-			} else {
-				for _, proc := range procs {
-					p := NewSpecProcessor(spec.Prefix, acc, proc)
-					p.pushMetrics()
-				}
-			}
-		}(specification, acc)
+	procs, err := p.createProcesses()
+	if err != nil {
+		log.Printf("Error: procstat getting process, exe: [%s] pidfile: [%s] pattern: [%s] %s",
+			p.Exe, p.PidFile, p.Pattern, err.Error())
+	} else {
+		for _, proc := range procs {
+			p := NewSpecProcessor(p.Prefix, acc, proc)
+			p.pushMetrics()
+		}
 	}
-	wg.Wait()

 	return nil
 }

-func (spec *Specification) createProcesses() ([]*process.Process, error) {
+func (p *Procstat) createProcesses() ([]*process.Process, error) {
 	var out []*process.Process
 	var errstring string
 	var outerr error

-	pids, err := spec.getAllPids()
+	pids, err := p.getAllPids()
 	if err != nil {
 		errstring += err.Error() + " "
 	}
@@ -99,16 +86,16 @@ func (spec *Specification) createProcesses() ([]*process.Process, error) {
 	return out, outerr
 }

-func (spec *Specification) getAllPids() ([]int32, error) {
+func (p *Procstat) getAllPids() ([]int32, error) {
 	var pids []int32
 	var err error

-	if spec.PidFile != "" {
-		pids, err = pidsFromFile(spec.PidFile)
-	} else if spec.Exe != "" {
-		pids, err = pidsFromExe(spec.Exe)
-	} else if spec.Pattern != "" {
-		pids, err = pidsFromPattern(spec.Pattern)
+	if p.PidFile != "" {
+		pids, err = pidsFromFile(p.PidFile)
+	} else if p.Exe != "" {
+		pids, err = pidsFromExe(p.Exe)
+	} else if p.Pattern != "" {
+		pids, err = pidsFromPattern(p.Pattern)
 	} else {
 		err = fmt.Errorf("Either exe, pid_file or pattern has to be specified")
 	}
```
procstat (SpecProcessor):

```diff
@@ -12,6 +12,7 @@ import (
 type SpecProcessor struct {
 	Prefix string
 	tags   map[string]string
+	fields map[string]interface{}
 	acc    plugins.Accumulator
 	proc   *process.Process
 }
@@ -23,7 +24,12 @@ func (p *SpecProcessor) add(metric string, value interface{}) {
 	} else {
 		mname = p.Prefix + "_" + metric
 	}
-	p.acc.Add(mname, value, p.tags)
+	p.fields[mname] = value
+}
+
+func (p *SpecProcessor) flush() {
+	p.acc.AddFields("procstat", p.fields, p.tags)
+	p.fields = make(map[string]interface{})
 }

 func NewSpecProcessor(
@@ -39,6 +45,7 @@ func NewSpecProcessor(
 	return &SpecProcessor{
 		Prefix: prefix,
 		tags:   tags,
+		fields: make(map[string]interface{}),
 		acc:    acc,
 		proc:   p,
 	}
@@ -60,6 +67,7 @@ func (p *SpecProcessor) pushMetrics() {
 	if err := p.pushMemoryStats(); err != nil {
 		log.Printf("procstat, mem stats not available: %s", err.Error())
 	}
+	p.flush()
 }

 func (p *SpecProcessor) pushFDStats() error {
@@ -94,21 +102,22 @@ func (p *SpecProcessor) pushIOStats() error {
 }

 func (p *SpecProcessor) pushCPUStats() error {
-	cpu, err := p.proc.CPUTimes()
+	cpu_time, err := p.proc.CPUTimes()
 	if err != nil {
 		return err
 	}
-	p.add("cpu_user", cpu.User)
-	p.add("cpu_system", cpu.System)
-	p.add("cpu_idle", cpu.Idle)
-	p.add("cpu_nice", cpu.Nice)
-	p.add("cpu_iowait", cpu.Iowait)
-	p.add("cpu_irq", cpu.Irq)
-	p.add("cpu_soft_irq", cpu.Softirq)
-	p.add("cpu_soft_steal", cpu.Steal)
-	p.add("cpu_soft_stolen", cpu.Stolen)
-	p.add("cpu_soft_guest", cpu.Guest)
-	p.add("cpu_soft_guest_nice", cpu.GuestNice)
+	p.add("cpu_time_user", cpu_time.User)
+	p.add("cpu_time_system", cpu_time.System)
+	p.add("cpu_time_idle", cpu_time.Idle)
+	p.add("cpu_time_nice", cpu_time.Nice)
+	p.add("cpu_time_iowait", cpu_time.Iowait)
+	p.add("cpu_time_irq", cpu_time.Irq)
+	p.add("cpu_time_soft_irq", cpu_time.Softirq)
+	p.add("cpu_time_soft_steal", cpu_time.Steal)
+	p.add("cpu_time_soft_stolen", cpu_time.Stolen)
+	p.add("cpu_time_soft_guest", cpu_time.Guest)
+	p.add("cpu_time_soft_guest_nice", cpu_time.GuestNice)

 	return nil
 }
```
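`SpecProcessor.add` above derives each field name from the optional per-plugin prefix before buffering it, which is how the `cpu_time_*` names in the changelog are produced when a prefix is set. The naming rule in isolation (the helper name `fieldName` is invented for illustration):

```go
package main

import "fmt"

// fieldName mirrors SpecProcessor.add: an optional prefix is joined to the
// metric name with an underscore; an empty prefix leaves the name untouched.
func fieldName(prefix, metric string) string {
	if prefix == "" {
		return metric
	}
	return prefix + "_" + metric
}

func main() {
	fmt.Println(fieldName("", "cpu_time_user"))    // cpu_time_user
	fmt.Println(fieldName("web", "cpu_time_user")) // web_cpu_time_user
}
```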
prometheus:

```diff
@@ -80,14 +80,14 @@ func (g *Prometheus) gatherURL(url string, acc plugins.Accumulator) error {
 		return fmt.Errorf("error getting processing samples for %s: %s", url, err)
 	}
 	for _, sample := range samples {
-		tags := map[string]string{}
+		tags := make(map[string]string)
 		for key, value := range sample.Metric {
 			if key == model.MetricNameLabel {
 				continue
 			}
 			tags[string(key)] = string(value)
 		}
-		acc.Add(string(sample.Metric[model.MetricNameLabel]),
+		acc.Add("prometheus_"+string(sample.Metric[model.MetricNameLabel]),
 			float64(sample.Value), tags)
 	}
 }
```
puppetagent:

```diff
@@ -104,15 +104,16 @@ func (pa *PuppetAgent) Gather(acc plugins.Accumulator) error {
 		return fmt.Errorf("%s", err)
 	}

-	structPrinter(&puppetState, acc)
+	tags := map[string]string{"location": pa.Location}
+	structPrinter(&puppetState, acc, tags)

 	return nil
 }

-func structPrinter(s *State, acc plugins.Accumulator) {
+func structPrinter(s *State, acc plugins.Accumulator, tags map[string]string) {
 	e := reflect.ValueOf(s).Elem()

+	fields := make(map[string]interface{})
 	for tLevelFNum := 0; tLevelFNum < e.NumField(); tLevelFNum++ {
 		name := e.Type().Field(tLevelFNum).Name
 		nameNumField := e.FieldByName(name).NumField()
@@ -123,10 +124,10 @@ func structPrinter(s *State, acc plugins.Accumulator) {

 			lname := strings.ToLower(name)
 			lsName := strings.ToLower(sName)
-			acc.Add(fmt.Sprintf("%s_%s", lname, lsName), sValue, nil)
+			fields[fmt.Sprintf("%s_%s", lname, lsName)] = sValue
 		}
 	}
+	acc.AddFields("puppetagent", fields, tags)
 }

 func init() {
```
rabbitmq:

```diff
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"net/http"
 	"strconv"
+	"time"

 	"github.com/influxdb/telegraf/plugins"
 )
@@ -13,17 +14,13 @@ const DefaultUsername = "guest"
 const DefaultPassword = "guest"
 const DefaultURL = "http://localhost:15672"

-type Server struct {
+type RabbitMQ struct {
 	URL      string
 	Name     string
 	Username string
 	Password string
 	Nodes    []string
 	Queues   []string
-}
-
-type RabbitMQ struct {
-	Servers []*Server

 	Client *http.Client
 }
@@ -94,15 +91,13 @@ type Node struct {
 	SocketsUsed int64 `json:"sockets_used"`
 }

-type gatherFunc func(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan chan error)
+type gatherFunc func(r *RabbitMQ, acc plugins.Accumulator, errChan chan error)

 var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues}

 var sampleConfig = `
-  # Specify servers via an array of tables
-  [[plugins.rabbitmq.servers]]
+  url = "http://localhost:15672" # required
   # name = "rmq-server-1" # optional tag
-  # url = "http://localhost:15672"
   # username = "guest"
   # password = "guest"

@@ -119,27 +114,18 @@ func (r *RabbitMQ) Description() string {
 	return "Read metrics from one or many RabbitMQ servers via the management API"
 }

-var localhost = &Server{URL: DefaultURL}
-
 func (r *RabbitMQ) Gather(acc plugins.Accumulator) error {
 	if r.Client == nil {
 		r.Client = &http.Client{}
 	}

-	var errChan = make(chan error, len(r.Servers))
+	var errChan = make(chan error, len(gatherFunctions))

-	// use localhost is no servers are specified in config
-	if len(r.Servers) == 0 {
-		r.Servers = append(r.Servers, localhost)
+	for _, f := range gatherFunctions {
+		go f(r, acc, errChan)
 	}

-	for _, serv := range r.Servers {
-		for _, f := range gatherFunctions {
-			go f(r, serv, acc, errChan)
-		}
-	}
-
-	for i := 1; i <= len(r.Servers)*len(gatherFunctions); i++ {
+	for i := 1; i <= len(gatherFunctions); i++ {
 		err := <-errChan
 		if err != nil {
 			return err
```
@@ -149,20 +135,20 @@ func (r *RabbitMQ) Gather(acc plugins.Accumulator) error {
 	return nil
 }
 
-func (r *RabbitMQ) requestJSON(serv *Server, u string, target interface{}) error {
-	u = fmt.Sprintf("%s%s", serv.URL, u)
+func (r *RabbitMQ) requestJSON(u string, target interface{}) error {
+	u = fmt.Sprintf("%s%s", r.URL, u)
 
 	req, err := http.NewRequest("GET", u, nil)
 	if err != nil {
 		return err
 	}
 
-	username := serv.Username
+	username := r.Username
 	if username == "" {
 		username = DefaultUsername
 	}
 
-	password := serv.Password
+	password := r.Password
 	if password == "" {
 		password = DefaultPassword
 	}
@@ -181,10 +167,10 @@ func (r *RabbitMQ) requestJSON(serv *Server, u string, target interface{}) error
 	return nil
 }
 
-func gatherOverview(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan chan error) {
+func gatherOverview(r *RabbitMQ, acc plugins.Accumulator, errChan chan error) {
 	overview := &OverviewResponse{}
 
-	err := r.requestJSON(serv, "/api/overview", &overview)
+	err := r.requestJSON("/api/overview", &overview)
 	if err != nil {
 		errChan <- err
 		return
@@ -195,76 +181,80 @@ func gatherOverview(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan
 		return
 	}
 
-	tags := map[string]string{"url": serv.URL}
-	if serv.Name != "" {
-		tags["name"] = serv.Name
+	tags := map[string]string{"url": r.URL}
+	if r.Name != "" {
+		tags["name"] = r.Name
 	}
-
-	acc.Add("messages", overview.QueueTotals.Messages, tags)
-	acc.Add("messages_ready", overview.QueueTotals.MessagesReady, tags)
-	acc.Add("messages_unacked", overview.QueueTotals.MessagesUnacknowledged, tags)
-
-	acc.Add("channels", overview.ObjectTotals.Channels, tags)
-	acc.Add("connections", overview.ObjectTotals.Connections, tags)
-	acc.Add("consumers", overview.ObjectTotals.Consumers, tags)
-	acc.Add("exchanges", overview.ObjectTotals.Exchanges, tags)
-	acc.Add("queues", overview.ObjectTotals.Queues, tags)
-
-	acc.Add("messages_acked", overview.MessageStats.Ack, tags)
-	acc.Add("messages_delivered", overview.MessageStats.Deliver, tags)
-	acc.Add("messages_published", overview.MessageStats.Publish, tags)
+	fields := map[string]interface{}{
+		"messages":           overview.QueueTotals.Messages,
+		"messages_ready":     overview.QueueTotals.MessagesReady,
+		"messages_unacked":   overview.QueueTotals.MessagesUnacknowledged,
+		"channels":           overview.ObjectTotals.Channels,
+		"connections":        overview.ObjectTotals.Connections,
+		"consumers":          overview.ObjectTotals.Consumers,
+		"exchanges":          overview.ObjectTotals.Exchanges,
+		"queues":             overview.ObjectTotals.Queues,
+		"messages_acked":     overview.MessageStats.Ack,
+		"messages_delivered": overview.MessageStats.Deliver,
+		"messages_published": overview.MessageStats.Publish,
+	}
+	acc.AddFields("rabbitmq_overview", fields, tags)
 
 	errChan <- nil
 }
 
-func gatherNodes(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan chan error) {
+func gatherNodes(r *RabbitMQ, acc plugins.Accumulator, errChan chan error) {
 	nodes := make([]Node, 0)
 	// Gather information about nodes
-	err := r.requestJSON(serv, "/api/nodes", &nodes)
+	err := r.requestJSON("/api/nodes", &nodes)
 	if err != nil {
 		errChan <- err
 		return
 	}
+	now := time.Now()
 
 	for _, node := range nodes {
-		if !shouldGatherNode(node, serv) {
+		if !r.shouldGatherNode(node) {
 			continue
 		}
 
-		tags := map[string]string{"url": serv.URL}
+		tags := map[string]string{"url": r.URL}
 		tags["node"] = node.Name
 
-		acc.Add("disk_free", node.DiskFree, tags)
-		acc.Add("disk_free_limit", node.DiskFreeLimit, tags)
-		acc.Add("fd_total", node.FdTotal, tags)
-		acc.Add("fd_used", node.FdUsed, tags)
-		acc.Add("mem_limit", node.MemLimit, tags)
-		acc.Add("mem_used", node.MemUsed, tags)
-		acc.Add("proc_total", node.ProcTotal, tags)
-		acc.Add("proc_used", node.ProcUsed, tags)
-		acc.Add("run_queue", node.RunQueue, tags)
-		acc.Add("sockets_total", node.SocketsTotal, tags)
-		acc.Add("sockets_used", node.SocketsUsed, tags)
+		fields := map[string]interface{}{
+			"disk_free":       node.DiskFree,
+			"disk_free_limit": node.DiskFreeLimit,
+			"fd_total":        node.FdTotal,
+			"fd_used":         node.FdUsed,
+			"mem_limit":       node.MemLimit,
+			"mem_used":        node.MemUsed,
+			"proc_total":      node.ProcTotal,
+			"proc_used":       node.ProcUsed,
+			"run_queue":       node.RunQueue,
+			"sockets_total":   node.SocketsTotal,
+			"sockets_used":    node.SocketsUsed,
+		}
+		acc.AddFields("rabbitmq_node", fields, tags, now)
 	}
 
 	errChan <- nil
 }
 
-func gatherQueues(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan chan error) {
+func gatherQueues(r *RabbitMQ, acc plugins.Accumulator, errChan chan error) {
 	// Gather information about queues
 	queues := make([]Queue, 0)
-	err := r.requestJSON(serv, "/api/queues", &queues)
+	err := r.requestJSON("/api/queues", &queues)
 	if err != nil {
 		errChan <- err
 		return
 	}
 
 	for _, queue := range queues {
-		if !shouldGatherQueue(queue, serv) {
+		if !r.shouldGatherQueue(queue) {
 			continue
 		}
 		tags := map[string]string{
-			"url":   serv.URL,
+			"url":   r.URL,
 			"queue": queue.Name,
 			"vhost": queue.Vhost,
 			"node":  queue.Node,
@@ -273,7 +263,7 @@ func gatherQueues(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan ch
 		}
 
 		acc.AddFields(
-			"queue",
+			"rabbitmq_queue",
 			map[string]interface{}{
 				// common information
 				"consumers": queue.Consumers,
@@ -301,12 +291,12 @@ func gatherQueues(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan ch
 	errChan <- nil
 }
 
-func shouldGatherNode(node Node, serv *Server) bool {
-	if len(serv.Nodes) == 0 {
+func (r *RabbitMQ) shouldGatherNode(node Node) bool {
+	if len(r.Nodes) == 0 {
 		return true
 	}
 
-	for _, name := range serv.Nodes {
+	for _, name := range r.Nodes {
 		if name == node.Name {
 			return true
 		}
@@ -315,12 +305,12 @@ func shouldGatherNode(node Node, serv *Server) bool {
 	return false
 }
 
-func shouldGatherQueue(queue Queue, serv *Server) bool {
-	if len(serv.Queues) == 0 {
+func (r *RabbitMQ) shouldGatherQueue(queue Queue) bool {
+	if len(r.Queues) == 0 {
 		return true
 	}
 
-	for _, name := range serv.Queues {
+	for _, name := range r.Queues {
 		if name == queue.Name {
 			return true
 		}
@@ -164,6 +164,7 @@ func gatherInfoOutput(
 	var keyspace_hits, keyspace_misses uint64 = 0, 0
 
 	scanner := bufio.NewScanner(rdr)
+	fields := make(map[string]interface{})
 	for scanner.Scan() {
 		line := scanner.Text()
 		if strings.Contains(line, "ERR") {
@@ -199,7 +200,7 @@ func gatherInfoOutput(
 		}
 
 		if err == nil {
-			acc.Add(metric, ival, tags)
+			fields[metric] = ival
 			continue
 		}
 
@@ -208,13 +209,14 @@ func gatherInfoOutput(
 			return err
 		}
 
-		acc.Add(metric, fval, tags)
+		fields[metric] = fval
 	}
 	var keyspace_hitrate float64 = 0.0
 	if keyspace_hits != 0 || keyspace_misses != 0 {
 		keyspace_hitrate = float64(keyspace_hits) / float64(keyspace_hits+keyspace_misses)
 	}
-	acc.Add("keyspace_hitrate", keyspace_hitrate, tags)
+	fields["keyspace_hitrate"] = keyspace_hitrate
+	acc.AddFields("redis", fields, tags)
 	return nil
 }
 
||||||
@@ -229,15 +231,17 @@ func gatherKeyspaceLine(
|
|||||||
tags map[string]string,
|
tags map[string]string,
|
||||||
) {
|
) {
|
||||||
if strings.Contains(line, "keys=") {
|
if strings.Contains(line, "keys=") {
|
||||||
|
fields := make(map[string]interface{})
|
||||||
tags["database"] = name
|
tags["database"] = name
|
||||||
dbparts := strings.Split(line, ",")
|
dbparts := strings.Split(line, ",")
|
||||||
for _, dbp := range dbparts {
|
for _, dbp := range dbparts {
|
||||||
kv := strings.Split(dbp, "=")
|
kv := strings.Split(dbp, "=")
|
||||||
ival, err := strconv.ParseUint(kv[1], 10, 64)
|
ival, err := strconv.ParseUint(kv[1], 10, 64)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
acc.Add(kv[0], ival, tags)
|
fields[kv[0]] = ival
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
acc.AddFields("redis_keyspace", fields, tags)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
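The pattern running through these diffs is the v0.3.0 breaking change: instead of emitting each value as its own point via `acc.Add`, a plugin now collects values into a `fields` map and emits a single measurement per tag set via `acc.AddFields`. A rough standalone illustration with a toy accumulator (a sketch, not the real `plugins.Accumulator` interface):

```go
package main

import "fmt"

// Point is one emitted measurement carrying multiple field values.
type Point struct {
	Measurement string
	Fields      map[string]interface{}
	Tags        map[string]string
}

// Accumulator is a toy stand-in that just records points.
type Accumulator struct{ Points []Point }

// AddFields emits all fields as a single point, mirroring the new API shape.
func (a *Accumulator) AddFields(name string, fields map[string]interface{}, tags map[string]string) {
	a.Points = append(a.Points, Point{name, fields, tags})
}

func main() {
	acc := &Accumulator{}
	tags := map[string]string{"url": "http://localhost:15672"}
	fields := map[string]interface{}{
		"messages":       42,
		"messages_ready": 40,
	}
	// One point with two fields, rather than two separate points.
	acc.AddFields("rabbitmq_overview", fields, tags)
	fmt.Println(len(acc.Points)) // 1
}
```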
@@ -86,25 +86,30 @@ var engineStats = map[string]string{
 	"total_writes": "TotalWrites",
 }
 
-func (e *Engine) AddEngineStats(keys []string, acc plugins.Accumulator, tags map[string]string) {
+func (e *Engine) AddEngineStats(
+	keys []string,
+	acc plugins.Accumulator,
+	tags map[string]string,
+) {
 	engine := reflect.ValueOf(e).Elem()
+	fields := make(map[string]interface{})
 	for _, key := range keys {
-		acc.Add(
-			key,
-			engine.FieldByName(engineStats[key]).Interface(),
-			tags,
-		)
+		fields[key] = engine.FieldByName(engineStats[key]).Interface()
 	}
+	acc.AddFields("rethinkdb_engine", fields, tags)
 }
 
 func (s *Storage) AddStats(acc plugins.Accumulator, tags map[string]string) {
-	acc.Add("cache_bytes_in_use", s.Cache.BytesInUse, tags)
-	acc.Add("disk_read_bytes_per_sec", s.Disk.ReadBytesPerSec, tags)
-	acc.Add("disk_read_bytes_total", s.Disk.ReadBytesTotal, tags)
-	acc.Add("disk_written_bytes_per_sec", s.Disk.WriteBytesPerSec, tags)
-	acc.Add("disk_written_bytes_total", s.Disk.WriteBytesTotal, tags)
-	acc.Add("disk_usage_data_bytes", s.Disk.SpaceUsage.Data, tags)
-	acc.Add("disk_usage_garbage_bytes", s.Disk.SpaceUsage.Garbage, tags)
-	acc.Add("disk_usage_metadata_bytes", s.Disk.SpaceUsage.Metadata, tags)
-	acc.Add("disk_usage_preallocated_bytes", s.Disk.SpaceUsage.Prealloc, tags)
+	fields := map[string]interface{}{
+		"cache_bytes_in_use":            s.Cache.BytesInUse,
+		"disk_read_bytes_per_sec":       s.Disk.ReadBytesPerSec,
+		"disk_read_bytes_total":         s.Disk.ReadBytesTotal,
+		"disk_written_bytes_per_sec":    s.Disk.WriteBytesPerSec,
+		"disk_written_bytes_total":      s.Disk.WriteBytesTotal,
+		"disk_usage_data_bytes":         s.Disk.SpaceUsage.Data,
+		"disk_usage_garbage_bytes":      s.Disk.SpaceUsage.Garbage,
+		"disk_usage_metadata_bytes":     s.Disk.SpaceUsage.Metadata,
+		"disk_usage_preallocated_bytes": s.Disk.SpaceUsage.Prealloc,
+	}
+	acc.AddFields("rethinkdb", fields, tags)
 }
@@ -2,6 +2,7 @@ package system
 
 import (
 	"fmt"
+	"time"
 
 	"github.com/influxdb/telegraf/plugins"
 	"github.com/shirou/gopsutil/cpu"
@@ -31,7 +32,7 @@ var sampleConfig = `
   # Whether to report total system cpu stats or not
   totalcpu = true
   # Comment this line if you want the raw CPU time metrics
-  drop = ["cpu_time*"]
+  drop = ["time_*"]
 `
 
 func (_ *CPUStats) SampleConfig() string {
@@ -43,6 +44,7 @@ func (s *CPUStats) Gather(acc plugins.Accumulator) error {
 	if err != nil {
 		return fmt.Errorf("error getting CPU info: %s", err)
 	}
+	now := time.Now()
 
 	for i, cts := range times {
 		tags := map[string]string{
@@ -51,21 +53,24 @@ func (s *CPUStats) Gather(acc plugins.Accumulator) error {
 
 		total := totalCpuTime(cts)
 
-		// Add total cpu numbers
-		add(acc, "time_user", cts.User, tags)
-		add(acc, "time_system", cts.System, tags)
-		add(acc, "time_idle", cts.Idle, tags)
-		add(acc, "time_nice", cts.Nice, tags)
-		add(acc, "time_iowait", cts.Iowait, tags)
-		add(acc, "time_irq", cts.Irq, tags)
-		add(acc, "time_softirq", cts.Softirq, tags)
-		add(acc, "time_steal", cts.Steal, tags)
-		add(acc, "time_guest", cts.Guest, tags)
-		add(acc, "time_guest_nice", cts.GuestNice, tags)
+		// Add cpu time metrics
+		fields := map[string]interface{}{
+			"time_user":       cts.User,
+			"time_system":     cts.System,
+			"time_idle":       cts.Idle,
+			"time_nice":       cts.Nice,
+			"time_iowait":     cts.Iowait,
+			"time_irq":        cts.Irq,
+			"time_softirq":    cts.Softirq,
+			"time_steal":      cts.Steal,
+			"time_guest":      cts.Guest,
+			"time_guest_nice": cts.GuestNice,
+		}
 
 		// Add in percentage
 		if len(s.lastStats) == 0 {
-			// If it's the 1st gather, can't get CPU stats yet
+			acc.AddFields("cpu", fields, tags, now)
+			// If it's the 1st gather, can't get CPU Usage stats yet
 			continue
 		}
 		lastCts := s.lastStats[i]
@@ -81,17 +86,17 @@ func (s *CPUStats) Gather(acc plugins.Accumulator) error {
 			continue
 		}
 
-		add(acc, "usage_user", 100*(cts.User-lastCts.User)/totalDelta, tags)
-		add(acc, "usage_system", 100*(cts.System-lastCts.System)/totalDelta, tags)
-		add(acc, "usage_idle", 100*(cts.Idle-lastCts.Idle)/totalDelta, tags)
-		add(acc, "usage_nice", 100*(cts.Nice-lastCts.Nice)/totalDelta, tags)
-		add(acc, "usage_iowait", 100*(cts.Iowait-lastCts.Iowait)/totalDelta, tags)
-		add(acc, "usage_irq", 100*(cts.Irq-lastCts.Irq)/totalDelta, tags)
-		add(acc, "usage_softirq", 100*(cts.Softirq-lastCts.Softirq)/totalDelta, tags)
-		add(acc, "usage_steal", 100*(cts.Steal-lastCts.Steal)/totalDelta, tags)
-		add(acc, "usage_guest", 100*(cts.Guest-lastCts.Guest)/totalDelta, tags)
-		add(acc, "usage_guest_nice", 100*(cts.GuestNice-lastCts.GuestNice)/totalDelta, tags)
+		fields["usage_user"] = 100 * (cts.User - lastCts.User) / totalDelta
+		fields["usage_system"] = 100 * (cts.System - lastCts.System) / totalDelta
+		fields["usage_idle"] = 100 * (cts.Idle - lastCts.Idle) / totalDelta
+		fields["usage_nice"] = 100 * (cts.Nice - lastCts.Nice) / totalDelta
+		fields["usage_iowait"] = 100 * (cts.Iowait - lastCts.Iowait) / totalDelta
+		fields["usage_irq"] = 100 * (cts.Irq - lastCts.Irq) / totalDelta
+		fields["usage_softirq"] = 100 * (cts.Softirq - lastCts.Softirq) / totalDelta
+		fields["usage_steal"] = 100 * (cts.Steal - lastCts.Steal) / totalDelta
+		fields["usage_guest"] = 100 * (cts.Guest - lastCts.Guest) / totalDelta
+		fields["usage_guest_nice"] = 100 * (cts.GuestNice - lastCts.GuestNice) / totalDelta
+		acc.AddFields("cpu", fields, tags, now)
 	}
 
 	s.lastStats = times
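The `usage_*` fields in the hunk above are deltas between two gathers, scaled to a percentage of the total CPU-time delta. A small sketch of that arithmetic, reduced to a single counter:

```go
package main

import "fmt"

// usagePercent converts the change in one CPU-time counter into a
// percentage of the change in total CPU time between two samples.
func usagePercent(cur, last, totalDelta float64) float64 {
	return 100 * (cur - last) / totalDelta
}

func main() {
	// Values taken from the test fixtures below: user time went
	// 3.1 -> 11.4 while total CPU time increased by 100.
	fmt.Printf("%.1f\n", usagePercent(11.4, 3.1, 100)) // 8.3
}
```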
plugins/system/cpu_test.go (new file, 106 lines)
@@ -0,0 +1,106 @@
+package system
+
+import (
+	"testing"
+
+	"github.com/influxdb/telegraf/testutil"
+	"github.com/shirou/gopsutil/cpu"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestCPUStats(t *testing.T) {
+	var mps MockPS
+	defer mps.AssertExpectations(t)
+	var acc testutil.Accumulator
+
+	cts := cpu.CPUTimesStat{
+		CPU:       "cpu0",
+		User:      3.1,
+		System:    8.2,
+		Idle:      80.1,
+		Nice:      1.3,
+		Iowait:    0.2,
+		Irq:       0.1,
+		Softirq:   0.11,
+		Steal:     0.0511,
+		Guest:     8.1,
+		GuestNice: 0.324,
+	}
+
+	cts2 := cpu.CPUTimesStat{
+		CPU:       "cpu0",
+		User:      11.4,     // increased by 8.3
+		System:    10.9,     // increased by 2.7
+		Idle:      158.8699, // increased by 78.7699 (for total increase of 100)
+		Nice:      2.5,      // increased by 1.2
+		Iowait:    0.7,      // increased by 0.5
+		Irq:       1.2,      // increased by 1.1
+		Softirq:   0.31,     // increased by 0.2
+		Steal:     0.2812,   // increased by 0.2301
+		Guest:     12.9,     // increased by 4.8
+		GuestNice: 2.524,    // increased by 2.2
+	}
+
+	mps.On("CPUTimes").Return([]cpu.CPUTimesStat{cts}, nil)
+
+	cs := NewCPUStats(&mps)
+
+	cputags := map[string]string{
+		"cpu": "cpu0",
+	}
+
+	err := cs.Gather(&acc)
+	require.NoError(t, err)
+	numCPUPoints := len(acc.Points)
+
+	expectedCPUPoints := 10
+	assert.Equal(t, expectedCPUPoints, numCPUPoints)
+
+	// Computed values are checked with delta > 0 because of floating point
+	// arithmetic imprecision
+	assertContainsTaggedFloat(t, &acc, "time_user", 3.1, 0, cputags)
+	assertContainsTaggedFloat(t, &acc, "time_system", 8.2, 0, cputags)
+	assertContainsTaggedFloat(t, &acc, "time_idle", 80.1, 0, cputags)
+	assertContainsTaggedFloat(t, &acc, "time_nice", 1.3, 0, cputags)
+	assertContainsTaggedFloat(t, &acc, "time_iowait", 0.2, 0, cputags)
+	assertContainsTaggedFloat(t, &acc, "time_irq", 0.1, 0, cputags)
+	assertContainsTaggedFloat(t, &acc, "time_softirq", 0.11, 0, cputags)
+	assertContainsTaggedFloat(t, &acc, "time_steal", 0.0511, 0, cputags)
+	assertContainsTaggedFloat(t, &acc, "time_guest", 8.1, 0, cputags)
+	assertContainsTaggedFloat(t, &acc, "time_guest_nice", 0.324, 0, cputags)
+
+	mps2 := MockPS{}
+	mps2.On("CPUTimes").Return([]cpu.CPUTimesStat{cts2}, nil)
+	cs.ps = &mps2
+
+	// Should have added cpu percentages too
+	err = cs.Gather(&acc)
+	require.NoError(t, err)
+
+	numCPUPoints = len(acc.Points) - numCPUPoints
+	expectedCPUPoints = 20
+	assert.Equal(t, expectedCPUPoints, numCPUPoints)
+
+	assertContainsTaggedFloat(t, &acc, "time_user", 11.4, 0, cputags)
+	assertContainsTaggedFloat(t, &acc, "time_system", 10.9, 0, cputags)
+	assertContainsTaggedFloat(t, &acc, "time_idle", 158.8699, 0, cputags)
+	assertContainsTaggedFloat(t, &acc, "time_nice", 2.5, 0, cputags)
+	assertContainsTaggedFloat(t, &acc, "time_iowait", 0.7, 0, cputags)
+	assertContainsTaggedFloat(t, &acc, "time_irq", 1.2, 0, cputags)
+	assertContainsTaggedFloat(t, &acc, "time_softirq", 0.31, 0, cputags)
+	assertContainsTaggedFloat(t, &acc, "time_steal", 0.2812, 0, cputags)
+	assertContainsTaggedFloat(t, &acc, "time_guest", 12.9, 0, cputags)
+	assertContainsTaggedFloat(t, &acc, "time_guest_nice", 2.524, 0, cputags)
+
+	assertContainsTaggedFloat(t, &acc, "usage_user", 8.3, 0.0005, cputags)
+	assertContainsTaggedFloat(t, &acc, "usage_system", 2.7, 0.0005, cputags)
+	assertContainsTaggedFloat(t, &acc, "usage_idle", 78.7699, 0.0005, cputags)
+	assertContainsTaggedFloat(t, &acc, "usage_nice", 1.2, 0.0005, cputags)
+	assertContainsTaggedFloat(t, &acc, "usage_iowait", 0.5, 0.0005, cputags)
+	assertContainsTaggedFloat(t, &acc, "usage_irq", 1.1, 0.0005, cputags)
+	assertContainsTaggedFloat(t, &acc, "usage_softirq", 0.2, 0.0005, cputags)
+	assertContainsTaggedFloat(t, &acc, "usage_steal", 0.2301, 0.0005, cputags)
+	assertContainsTaggedFloat(t, &acc, "usage_guest", 4.8, 0.0005, cputags)
+	assertContainsTaggedFloat(t, &acc, "usage_guest_nice", 2.2, 0.0005, cputags)
+}
@@ -50,12 +50,15 @@ func (s *DiskStats) Gather(acc plugins.Accumulator) error {
 			"path":   du.Path,
 			"fstype": du.Fstype,
 		}
-		acc.Add("total", du.Total, tags)
-		acc.Add("free", du.Free, tags)
-		acc.Add("used", du.Total-du.Free, tags)
-		acc.Add("inodes_total", du.InodesTotal, tags)
-		acc.Add("inodes_free", du.InodesFree, tags)
-		acc.Add("inodes_used", du.InodesTotal-du.InodesFree, tags)
+		fields := map[string]interface{}{
+			"total":        du.Total,
+			"free":         du.Free,
+			"used":         du.Total - du.Free,
+			"inodes_total": du.InodesTotal,
+			"inodes_free":  du.InodesFree,
+			"inodes_used":  du.InodesTotal - du.InodesFree,
+		}
+		acc.AddFields("disk", fields, tags)
 	}
 
 	return nil
@@ -115,13 +118,16 @@ func (s *DiskIOStats) Gather(acc plugins.Accumulator) error {
 			}
 		}
 
-		acc.Add("reads", io.ReadCount, tags)
-		acc.Add("writes", io.WriteCount, tags)
-		acc.Add("read_bytes", io.ReadBytes, tags)
-		acc.Add("write_bytes", io.WriteBytes, tags)
-		acc.Add("read_time", io.ReadTime, tags)
-		acc.Add("write_time", io.WriteTime, tags)
-		acc.Add("io_time", io.IoTime, tags)
+		fields := map[string]interface{}{
+			"reads":       io.ReadCount,
+			"writes":      io.WriteCount,
+			"read_bytes":  io.ReadBytes,
+			"write_bytes": io.WriteBytes,
+			"read_time":   io.ReadTime,
+			"write_time":  io.WriteTime,
+			"io_time":     io.IoTime,
+		}
+		acc.AddFields("diskio", fields, tags)
 	}
 
 	return nil
@@ -132,7 +138,7 @@ func init() {
 		return &DiskStats{ps: &systemPS{}}
 	})
 
-	plugins.Add("io", func() plugins.Plugin {
+	plugins.Add("diskio", func() plugins.Plugin {
 		return &DiskIOStats{ps: &systemPS{}}
 	})
 }
plugins/system/disk_test.go (new file, 161 lines)
@@ -0,0 +1,161 @@
+package system
+
+import (
+	"testing"
+
+	"github.com/influxdb/telegraf/testutil"
+	"github.com/shirou/gopsutil/disk"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestDiskStats(t *testing.T) {
+	var mps MockPS
+	defer mps.AssertExpectations(t)
+	var acc testutil.Accumulator
+	var err error
+
+	du := []*disk.DiskUsageStat{
+		{
+			Path:        "/",
+			Fstype:      "ext4",
+			Total:       128,
+			Free:        23,
+			InodesTotal: 1234,
+			InodesFree:  234,
+		},
+		{
+			Path:        "/home",
+			Fstype:      "ext4",
+			Total:       256,
+			Free:        46,
+			InodesTotal: 2468,
+			InodesFree:  468,
+		},
+	}
+
+	mps.On("DiskUsage").Return(du, nil)
+
+	err = (&DiskStats{ps: &mps}).Gather(&acc)
+	require.NoError(t, err)
+
+	numDiskPoints := len(acc.Points)
+	expectedAllDiskPoints := 12
+	assert.Equal(t, expectedAllDiskPoints, numDiskPoints)
+
+	tags1 := map[string]string{
+		"path":   "/",
+		"fstype": "ext4",
+	}
+	tags2 := map[string]string{
+		"path":   "/home",
+		"fstype": "ext4",
+	}
+
+	assert.True(t, acc.CheckTaggedValue("total", uint64(128), tags1))
+	assert.True(t, acc.CheckTaggedValue("used", uint64(105), tags1))
+	assert.True(t, acc.CheckTaggedValue("free", uint64(23), tags1))
+	assert.True(t, acc.CheckTaggedValue("inodes_total", uint64(1234), tags1))
+	assert.True(t, acc.CheckTaggedValue("inodes_free", uint64(234), tags1))
+	assert.True(t, acc.CheckTaggedValue("inodes_used", uint64(1000), tags1))
+	assert.True(t, acc.CheckTaggedValue("total", uint64(256), tags2))
+	assert.True(t, acc.CheckTaggedValue("used", uint64(210), tags2))
+	assert.True(t, acc.CheckTaggedValue("free", uint64(46), tags2))
+	assert.True(t, acc.CheckTaggedValue("inodes_total", uint64(2468), tags2))
+	assert.True(t, acc.CheckTaggedValue("inodes_free", uint64(468), tags2))
+	assert.True(t, acc.CheckTaggedValue("inodes_used", uint64(2000), tags2))
+
+	// We expect 6 more DiskPoints to show up with an explicit match on "/"
+	// and /home not matching the /dev in Mountpoints
+	err = (&DiskStats{ps: &mps, Mountpoints: []string{"/", "/dev"}}).Gather(&acc)
+	assert.Equal(t, expectedAllDiskPoints+6, len(acc.Points))
+
+	// We should see all the diskpoints as Mountpoints includes both
+	// / and /home
+	err = (&DiskStats{ps: &mps, Mountpoints: []string{"/", "/home"}}).Gather(&acc)
+	assert.Equal(t, 2*expectedAllDiskPoints+6, len(acc.Points))
+}
+
+func TestDiskIOStats(t *testing.T) {
+	var mps MockPS
+	defer mps.AssertExpectations(t)
+	var acc testutil.Accumulator
+	var err error
+
+	diskio1 := disk.DiskIOCountersStat{
+		ReadCount:    888,
+		WriteCount:   5341,
+		ReadBytes:    100000,
+		WriteBytes:   200000,
+		ReadTime:     7123,
+		WriteTime:    9087,
+		Name:         "sda1",
+		IoTime:       123552,
+		SerialNumber: "ab-123-ad",
+	}
+	diskio2 := disk.DiskIOCountersStat{
+		ReadCount:    444,
+		WriteCount:   2341,
+		ReadBytes:    200000,
+		WriteBytes:   400000,
+		ReadTime:     3123,
+		WriteTime:    6087,
+		Name:         "sdb1",
+		IoTime:       246552,
+		SerialNumber: "bb-123-ad",
+	}
+
+	mps.On("DiskIO").Return(
+		map[string]disk.DiskIOCountersStat{"sda1": diskio1, "sdb1": diskio2},
+		nil)
+
+	err = (&DiskIOStats{ps: &mps}).Gather(&acc)
+	require.NoError(t, err)
+
+	numDiskIOPoints := len(acc.Points)
+	expectedAllDiskIOPoints := 14
+	assert.Equal(t, expectedAllDiskIOPoints, numDiskIOPoints)
+
+	dtags1 := map[string]string{
+		"name":   "sda1",
+		"serial": "ab-123-ad",
+	}
+	dtags2 := map[string]string{
+		"name":   "sdb1",
+		"serial": "bb-123-ad",
+	}
+
+	assert.True(t, acc.CheckTaggedValue("reads", uint64(888), dtags1))
+	assert.True(t, acc.CheckTaggedValue("writes", uint64(5341), dtags1))
+	assert.True(t, acc.CheckTaggedValue("read_bytes", uint64(100000), dtags1))
+	assert.True(t, acc.CheckTaggedValue("write_bytes", uint64(200000), dtags1))
+	assert.True(t, acc.CheckTaggedValue("read_time", uint64(7123), dtags1))
+	assert.True(t, acc.CheckTaggedValue("write_time", uint64(9087), dtags1))
+	assert.True(t, acc.CheckTaggedValue("io_time", uint64(123552), dtags1))
+	assert.True(t, acc.CheckTaggedValue("reads", uint64(444), dtags2))
+	assert.True(t, acc.CheckTaggedValue("writes", uint64(2341), dtags2))
+	assert.True(t, acc.CheckTaggedValue("read_bytes", uint64(200000), dtags2))
+	assert.True(t, acc.CheckTaggedValue("write_bytes", uint64(400000), dtags2))
+	assert.True(t, acc.CheckTaggedValue("read_time", uint64(3123), dtags2))
+	assert.True(t, acc.CheckTaggedValue("write_time", uint64(6087), dtags2))
+	assert.True(t, acc.CheckTaggedValue("io_time", uint64(246552), dtags2))
+
+	// We expect 7 more DiskIOPoints to show up with an explicit match on "sdb1"
+	// and serial should be missing from the tags with SkipSerialNumber set
+	err = (&DiskIOStats{ps: &mps, Devices: []string{"sdb1"}, SkipSerialNumber: true}).Gather(&acc)
+	assert.Equal(t, expectedAllDiskIOPoints+7, len(acc.Points))
+
+	dtags3 := map[string]string{
+		"name": "sdb1",
+	}
+
+	assert.True(t, acc.CheckTaggedValue("reads", uint64(444), dtags3))
+	assert.True(t, acc.CheckTaggedValue("writes", uint64(2341), dtags3))
+	assert.True(t, acc.CheckTaggedValue("read_bytes", uint64(200000), dtags3))
+	assert.True(t, acc.CheckTaggedValue("write_bytes", uint64(400000), dtags3))
+	assert.True(t, acc.CheckTaggedValue("read_time", uint64(3123), dtags3))
+	assert.True(t, acc.CheckTaggedValue("write_time", uint64(6087), dtags3))
+	assert.True(t, acc.CheckTaggedValue("io_time", uint64(246552), dtags3))
+}
@@ -36,44 +36,47 @@ func (s *DockerStats) Gather(acc plugins.Accumulator) error {
 		cts := cont.CPU
 
-		acc.Add("user", cts.User, tags)
-		acc.Add("system", cts.System, tags)
-		acc.Add("idle", cts.Idle, tags)
-		acc.Add("nice", cts.Nice, tags)
-		acc.Add("iowait", cts.Iowait, tags)
-		acc.Add("irq", cts.Irq, tags)
-		acc.Add("softirq", cts.Softirq, tags)
-		acc.Add("steal", cts.Steal, tags)
-		acc.Add("guest", cts.Guest, tags)
-		acc.Add("guest_nice", cts.GuestNice, tags)
+		fields := map[string]interface{}{
+			"user":       cts.User,
+			"system":     cts.System,
+			"idle":       cts.Idle,
+			"nice":       cts.Nice,
+			"iowait":     cts.Iowait,
+			"irq":        cts.Irq,
+			"softirq":    cts.Softirq,
+			"steal":      cts.Steal,
+			"guest":      cts.Guest,
+			"guest_nice": cts.GuestNice,
 
-		acc.Add("cache", cont.Mem.Cache, tags)
-		acc.Add("rss", cont.Mem.RSS, tags)
-		acc.Add("rss_huge", cont.Mem.RSSHuge, tags)
-		acc.Add("mapped_file", cont.Mem.MappedFile, tags)
-		acc.Add("swap_in", cont.Mem.Pgpgin, tags)
-		acc.Add("swap_out", cont.Mem.Pgpgout, tags)
-		acc.Add("page_fault", cont.Mem.Pgfault, tags)
-		acc.Add("page_major_fault", cont.Mem.Pgmajfault, tags)
-		acc.Add("inactive_anon", cont.Mem.InactiveAnon, tags)
-		acc.Add("active_anon", cont.Mem.ActiveAnon, tags)
-		acc.Add("inactive_file", cont.Mem.InactiveFile, tags)
-		acc.Add("active_file", cont.Mem.ActiveFile, tags)
-		acc.Add("unevictable", cont.Mem.Unevictable, tags)
-		acc.Add("memory_limit", cont.Mem.HierarchicalMemoryLimit, tags)
-		acc.Add("total_cache", cont.Mem.TotalCache, tags)
-		acc.Add("total_rss", cont.Mem.TotalRSS, tags)
-		acc.Add("total_rss_huge", cont.Mem.TotalRSSHuge, tags)
-		acc.Add("total_mapped_file", cont.Mem.TotalMappedFile, tags)
-		acc.Add("total_swap_in", cont.Mem.TotalPgpgIn, tags)
-		acc.Add("total_swap_out", cont.Mem.TotalPgpgOut, tags)
-		acc.Add("total_page_fault", cont.Mem.TotalPgFault, tags)
-		acc.Add("total_page_major_fault", cont.Mem.TotalPgMajFault, tags)
-		acc.Add("total_inactive_anon", cont.Mem.TotalInactiveAnon, tags)
-		acc.Add("total_active_anon", cont.Mem.TotalActiveAnon, tags)
-		acc.Add("total_inactive_file", cont.Mem.TotalInactiveFile, tags)
-		acc.Add("total_active_file", cont.Mem.TotalActiveFile, tags)
-		acc.Add("total_unevictable", cont.Mem.TotalUnevictable, tags)
+			"cache":                  cont.Mem.Cache,
+			"rss":                    cont.Mem.RSS,
+			"rss_huge":               cont.Mem.RSSHuge,
+			"mapped_file":            cont.Mem.MappedFile,
+			"swap_in":                cont.Mem.Pgpgin,
+			"swap_out":               cont.Mem.Pgpgout,
+			"page_fault":             cont.Mem.Pgfault,
+			"page_major_fault":       cont.Mem.Pgmajfault,
+			"inactive_anon":          cont.Mem.InactiveAnon,
+			"active_anon":            cont.Mem.ActiveAnon,
+			"inactive_file":          cont.Mem.InactiveFile,
+			"active_file":            cont.Mem.ActiveFile,
+			"unevictable":            cont.Mem.Unevictable,
+			"memory_limit":           cont.Mem.HierarchicalMemoryLimit,
+			"total_cache":            cont.Mem.TotalCache,
+			"total_rss":              cont.Mem.TotalRSS,
+			"total_rss_huge":         cont.Mem.TotalRSSHuge,
+			"total_mapped_file":      cont.Mem.TotalMappedFile,
+			"total_swap_in":          cont.Mem.TotalPgpgIn,
+			"total_swap_out":         cont.Mem.TotalPgpgOut,
+			"total_page_fault":       cont.Mem.TotalPgFault,
+			"total_page_major_fault": cont.Mem.TotalPgMajFault,
+			"total_inactive_anon":    cont.Mem.TotalInactiveAnon,
+			"total_active_anon":      cont.Mem.TotalActiveAnon,
+			"total_inactive_file":    cont.Mem.TotalInactiveFile,
+			"total_active_file":      cont.Mem.TotalActiveFile,
+			"total_unevictable":      cont.Mem.TotalUnevictable,
+		}
+		acc.AddFields("docker", fields, tags)
 	}
 
 	return nil
@@ -22,18 +22,17 @@ func (s *MemStats) Gather(acc plugins.Accumulator) error {
 		return fmt.Errorf("error getting virtual memory info: %s", err)
 	}
 
-	vmtags := map[string]string(nil)
-
-	acc.Add("total", vm.Total, vmtags)
-	acc.Add("available", vm.Available, vmtags)
-	acc.Add("used", vm.Used, vmtags)
-	acc.Add("free", vm.Free, vmtags)
-	acc.Add("cached", vm.Cached, vmtags)
-	acc.Add("buffered", vm.Buffers, vmtags)
-	acc.Add("used_percent", 100*float64(vm.Used)/float64(vm.Total), vmtags)
-	acc.Add("available_percent",
-		100*float64(vm.Available)/float64(vm.Total),
-		vmtags)
+	fields := map[string]interface{}{
+		"total":             vm.Total,
+		"available":         vm.Available,
+		"used":              vm.Used,
+		"free":              vm.Free,
+		"cached":            vm.Cached,
+		"buffered":          vm.Buffers,
+		"used_percent":      100 * float64(vm.Used) / float64(vm.Total),
+		"available_percent": 100 * float64(vm.Available) / float64(vm.Total),
+	}
+	acc.AddFields("mem", fields, nil)
 
 	return nil
 }
@@ -54,14 +53,15 @@ func (s *SwapStats) Gather(acc plugins.Accumulator) error {
 		return fmt.Errorf("error getting swap memory info: %s", err)
 	}
 
-	swaptags := map[string]string(nil)
-
-	acc.Add("total", swap.Total, swaptags)
-	acc.Add("used", swap.Used, swaptags)
-	acc.Add("free", swap.Free, swaptags)
-	acc.Add("used_percent", swap.UsedPercent, swaptags)
-	acc.Add("in", swap.Sin, swaptags)
-	acc.Add("out", swap.Sout, swaptags)
+	fields := map[string]interface{}{
+		"total":        swap.Total,
+		"used":         swap.Used,
+		"free":         swap.Free,
+		"used_percent": swap.UsedPercent,
+		"in":           swap.Sin,
+		"out":          swap.Sout,
+	}
+	acc.AddFields("swap", fields, nil)
 
 	return nil
 }
73	plugins/system/memory_test.go	Normal file
@@ -0,0 +1,73 @@
+package system
+
+import (
+	"testing"
+
+	"github.com/influxdb/telegraf/testutil"
+	"github.com/shirou/gopsutil/mem"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestMemStats(t *testing.T) {
+	var mps MockPS
+	var err error
+	defer mps.AssertExpectations(t)
+	var acc testutil.Accumulator
+
+	vms := &mem.VirtualMemoryStat{
+		Total:     12400,
+		Available: 7600,
+		Used:      5000,
+		Free:      1235,
+		// Active:    8134,
+		// Inactive:  1124,
+		// Buffers:   771,
+		// Cached:    4312,
+		// Wired:     134,
+		// Shared:    2142,
+	}
+
+	mps.On("VMStat").Return(vms, nil)
+
+	sms := &mem.SwapMemoryStat{
+		Total:       8123,
+		Used:        1232,
+		Free:        6412,
+		UsedPercent: 12.2,
+		Sin:         7,
+		Sout:        830,
+	}
+
+	mps.On("SwapStat").Return(sms, nil)
+
+	err = (&MemStats{&mps}).Gather(&acc)
+	require.NoError(t, err)
+
+	vmtags := map[string]string(nil)
+
+	assert.True(t, acc.CheckTaggedValue("total", uint64(12400), vmtags))
+	assert.True(t, acc.CheckTaggedValue("available", uint64(7600), vmtags))
+	assert.True(t, acc.CheckTaggedValue("used", uint64(5000), vmtags))
+	assert.True(t, acc.CheckTaggedValue("available_percent",
+		float64(7600)/float64(12400)*100,
+		vmtags))
+	assert.True(t, acc.CheckTaggedValue("used_percent",
+		float64(5000)/float64(12400)*100,
+		vmtags))
+	assert.True(t, acc.CheckTaggedValue("free", uint64(1235), vmtags))
+
+	acc.Points = nil
+
+	err = (&SwapStats{&mps}).Gather(&acc)
+	require.NoError(t, err)
+
+	swaptags := map[string]string(nil)
+
+	assert.NoError(t, acc.ValidateTaggedValue("total", uint64(8123), swaptags))
+	assert.NoError(t, acc.ValidateTaggedValue("used", uint64(1232), swaptags))
+	assert.NoError(t, acc.ValidateTaggedValue("used_percent", float64(12.2), swaptags))
+	assert.NoError(t, acc.ValidateTaggedValue("free", uint64(6412), swaptags))
+	assert.NoError(t, acc.ValidateTaggedValue("in", uint64(7), swaptags))
+	assert.NoError(t, acc.ValidateTaggedValue("out", uint64(830), swaptags))
+}
@@ -70,14 +70,17 @@ func (s *NetIOStats) Gather(acc plugins.Accumulator) error {
 			"interface": io.Name,
 		}
 
-		acc.Add("bytes_sent", io.BytesSent, tags)
-		acc.Add("bytes_recv", io.BytesRecv, tags)
-		acc.Add("packets_sent", io.PacketsSent, tags)
-		acc.Add("packets_recv", io.PacketsRecv, tags)
-		acc.Add("err_in", io.Errin, tags)
-		acc.Add("err_out", io.Errout, tags)
-		acc.Add("drop_in", io.Dropin, tags)
-		acc.Add("drop_out", io.Dropout, tags)
+		fields := map[string]interface{}{
+			"bytes_sent":   io.BytesSent,
+			"bytes_recv":   io.BytesRecv,
+			"packets_sent": io.PacketsSent,
+			"packets_recv": io.PacketsRecv,
+			"err_in":       io.Errin,
+			"err_out":      io.Errout,
+			"drop_in":      io.Dropin,
+			"drop_out":     io.Dropout,
+		}
+		acc.AddFields("net", fields, tags)
 	}
 
 	// Get system wide stats for different network protocols
88	plugins/system/net_test.go	Normal file
@@ -0,0 +1,88 @@
+package system
+
+import (
+	"syscall"
+	"testing"
+
+	"github.com/influxdb/telegraf/testutil"
+	"github.com/shirou/gopsutil/net"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestNetStats(t *testing.T) {
+	var mps MockPS
+	var err error
+	defer mps.AssertExpectations(t)
+	var acc testutil.Accumulator
+
+	netio := net.NetIOCountersStat{
+		Name:        "eth0",
+		BytesSent:   1123,
+		BytesRecv:   8734422,
+		PacketsSent: 781,
+		PacketsRecv: 23456,
+		Errin:       832,
+		Errout:      8,
+		Dropin:      7,
+		Dropout:     1,
+	}
+
+	mps.On("NetIO").Return([]net.NetIOCountersStat{netio}, nil)
+
+	netprotos := []net.NetProtoCountersStat{
+		net.NetProtoCountersStat{
+			Protocol: "Udp",
+			Stats: map[string]int64{
+				"InDatagrams": 4655,
+				"NoPorts":     892592,
+			},
+		},
+	}
+	mps.On("NetProto").Return(netprotos, nil)
+
+	netstats := []net.NetConnectionStat{
+		net.NetConnectionStat{
+			Type: syscall.SOCK_DGRAM,
+		},
+		net.NetConnectionStat{
+			Status: "ESTABLISHED",
+		},
+		net.NetConnectionStat{
+			Status: "ESTABLISHED",
+		},
+		net.NetConnectionStat{
+			Status: "CLOSE",
+		},
+	}
+
+	mps.On("NetConnections").Return(netstats, nil)
+
+	err = (&NetIOStats{ps: &mps, skipChecks: true}).Gather(&acc)
+	require.NoError(t, err)
+
+	ntags := map[string]string{
+		"interface": "eth0",
+	}
+
+	assert.NoError(t, acc.ValidateTaggedValue("bytes_sent", uint64(1123), ntags))
+	assert.NoError(t, acc.ValidateTaggedValue("bytes_recv", uint64(8734422), ntags))
+	assert.NoError(t, acc.ValidateTaggedValue("packets_sent", uint64(781), ntags))
+	assert.NoError(t, acc.ValidateTaggedValue("packets_recv", uint64(23456), ntags))
+	assert.NoError(t, acc.ValidateTaggedValue("err_in", uint64(832), ntags))
+	assert.NoError(t, acc.ValidateTaggedValue("err_out", uint64(8), ntags))
+	assert.NoError(t, acc.ValidateTaggedValue("drop_in", uint64(7), ntags))
+	assert.NoError(t, acc.ValidateTaggedValue("drop_out", uint64(1), ntags))
+	assert.NoError(t, acc.ValidateValue("udp_noports", int64(892592)))
+	assert.NoError(t, acc.ValidateValue("udp_indatagrams", int64(4655)))
+
+	acc.Points = nil
+
+	err = (&NetStats{&mps}).Gather(&acc)
+	require.NoError(t, err)
+	netstattags := map[string]string(nil)
+
+	assert.NoError(t, acc.ValidateTaggedValue("tcp_established", 2, netstattags))
+	assert.NoError(t, acc.ValidateTaggedValue("tcp_close", 1, netstattags))
+	assert.NoError(t, acc.ValidateTaggedValue("udp_socket", 1, netstattags))
+}
@@ -42,19 +42,23 @@ func (s *NetStats) Gather(acc plugins.Accumulator) error {
 		}
 		counts[netcon.Status] = c + 1
 	}
-	acc.Add("tcp_established", counts["ESTABLISHED"], tags)
-	acc.Add("tcp_syn_sent", counts["SYN_SENT"], tags)
-	acc.Add("tcp_syn_recv", counts["SYN_RECV"], tags)
-	acc.Add("tcp_fin_wait1", counts["FIN_WAIT1"], tags)
-	acc.Add("tcp_fin_wait2", counts["FIN_WAIT2"], tags)
-	acc.Add("tcp_time_wait", counts["TIME_WAIT"], tags)
-	acc.Add("tcp_close", counts["CLOSE"], tags)
-	acc.Add("tcp_close_wait", counts["CLOSE_WAIT"], tags)
-	acc.Add("tcp_last_ack", counts["LAST_ACK"], tags)
-	acc.Add("tcp_listen", counts["LISTEN"], tags)
-	acc.Add("tcp_closing", counts["CLOSING"], tags)
-	acc.Add("tcp_none", counts["NONE"], tags)
-	acc.Add("udp_socket", counts["UDP"], tags)
+
+	fields := map[string]interface{}{
+		"tcp_established": counts["ESTABLISHED"],
+		"tcp_syn_sent":    counts["SYN_SENT"],
+		"tcp_syn_recv":    counts["SYN_RECV"],
+		"tcp_fin_wait1":   counts["FIN_WAIT1"],
+		"tcp_fin_wait2":   counts["FIN_WAIT2"],
+		"tcp_time_wait":   counts["TIME_WAIT"],
+		"tcp_close":       counts["CLOSE"],
+		"tcp_close_wait":  counts["CLOSE_WAIT"],
+		"tcp_last_ack":    counts["LAST_ACK"],
+		"tcp_listen":      counts["LISTEN"],
+		"tcp_closing":     counts["CLOSING"],
+		"tcp_none":        counts["NONE"],
+		"udp_socket":      counts["UDP"],
+	}
+	acc.AddFields("netstat", fields, tags)
 
 	return nil
 }
@@ -1,12 +1,16 @@
 package system
 
 import (
+	"fmt"
 	gonet "net"
 	"os"
+	"reflect"
 	"strings"
+	"testing"
 
 	"github.com/influxdb/telegraf/internal"
 	"github.com/influxdb/telegraf/plugins"
+	"github.com/influxdb/telegraf/testutil"
 
 	dc "github.com/fsouza/go-dockerclient"
 	"github.com/shirou/gopsutil/cpu"
@@ -14,6 +18,8 @@ import (
 	"github.com/shirou/gopsutil/docker"
 	"github.com/shirou/gopsutil/mem"
 	"github.com/shirou/gopsutil/net"
+
+	"github.com/stretchr/testify/assert"
 )
 
 type DockerContainerStat struct {
@@ -166,3 +172,49 @@ func (s *systemPS) DockerStat() ([]*DockerContainerStat, error) {
 
 	return stats, nil
 }
+
+// Asserts that a given accumulator contains a measurement of type float64 with
+// specific tags within a certain distance of a given expected value. Asserts a failure
+// if the measurement is of the wrong type, or if no matching measurements are found
+//
+// Parameters:
+//   t *testing.T            : Testing object to use
+//   acc testutil.Accumulator: Accumulator to examine
+//   measurement string      : Name of the measurement to examine
+//   expectedValue float64   : Value to search for within the measurement
+//   delta float64           : Maximum acceptable distance of an accumulated value
+//                             from the expectedValue parameter. Useful when
+//                             floating-point arithmetic imprecision makes looking
+//                             for an exact match impractical
+//   tags map[string]string  : Tag set the found measurement must have. Set to nil to
+//                             ignore the tag set.
+func assertContainsTaggedFloat(
+	t *testing.T,
+	acc *testutil.Accumulator,
+	measurement string,
+	expectedValue float64,
+	delta float64,
+	tags map[string]string,
+) {
+	var actualValue float64
+	for _, pt := range acc.Points {
+		if pt.Measurement == measurement {
+			if (tags == nil) || reflect.DeepEqual(pt.Tags, tags) {
+				if value, ok := pt.Fields["value"].(float64); ok {
+					actualValue = value
+					if (value >= expectedValue-delta) && (value <= expectedValue+delta) {
+						// Found the point, return without failing
+						return
+					}
+				} else {
+					assert.Fail(t, fmt.Sprintf("Measurement \"%s\" does not have type float64",
+						measurement))
+				}
+
+			}
+		}
+	}
+	msg := fmt.Sprintf("Could not find measurement \"%s\" with requested tags within %f of %f, Actual: %f",
+		measurement, delta, expectedValue, actualValue)
+	assert.Fail(t, msg)
+}
@@ -19,13 +19,6 @@ func (_ *SystemStats) Description() string {
 
 func (_ *SystemStats) SampleConfig() string { return "" }
 
-func (_ *SystemStats) add(acc plugins.Accumulator,
-	name string, val float64, tags map[string]string) {
-	if val >= 0 {
-		acc.Add(name, val, tags)
-	}
-}
-
 func (_ *SystemStats) Gather(acc plugins.Accumulator) error {
 	loadavg, err := load.LoadAvg()
 	if err != nil {
@@ -37,11 +30,14 @@ func (_ *SystemStats) Gather(acc plugins.Accumulator) error {
 		return err
 	}
 
-	acc.Add("load1", loadavg.Load1, nil)
-	acc.Add("load5", loadavg.Load5, nil)
-	acc.Add("load15", loadavg.Load15, nil)
-	acc.Add("uptime", float64(hostinfo.Uptime), nil)
-	acc.Add("uptime_format", format_uptime(hostinfo.Uptime), nil)
+	fields := map[string]interface{}{
+		"load1":         loadavg.Load1,
+		"load5":         loadavg.Load5,
+		"load15":        loadavg.Load15,
+		"uptime":        hostinfo.Uptime,
+		"uptime_format": format_uptime(hostinfo.Uptime),
+	}
+	acc.AddFields("system", fields, nil)
 
 	return nil
 }
@@ -1,426 +0,0 @@
-package system
-
-import (
-	"fmt"
-	"reflect"
-	"syscall"
-	"testing"
-
-	"github.com/influxdb/telegraf/testutil"
-	"github.com/shirou/gopsutil/cpu"
-	"github.com/shirou/gopsutil/disk"
-	"github.com/shirou/gopsutil/mem"
-	"github.com/shirou/gopsutil/net"
-	"github.com/stretchr/testify/assert"
-	"github.com/stretchr/testify/require"
-)
-
-func TestSystemStats_GenerateStats(t *testing.T) {
-	var mps MockPS
-
-	defer mps.AssertExpectations(t)
-
-	var acc testutil.Accumulator
-
-	cts := cpu.CPUTimesStat{
-		CPU:       "cpu0",
-		User:      3.1,
-		System:    8.2,
-		Idle:      80.1,
-		Nice:      1.3,
-		Iowait:    0.2,
-		Irq:       0.1,
-		Softirq:   0.11,
-		Steal:     0.0511,
-		Guest:     8.1,
-		GuestNice: 0.324,
-	}
-
-	cts2 := cpu.CPUTimesStat{
-		CPU:       "cpu0",
-		User:      11.4,     // increased by 8.3
-		System:    10.9,     // increased by 2.7
-		Idle:      158.8699, // increased by 78.7699 (for total increase of 100)
-		Nice:      2.5,      // increased by 1.2
-		Iowait:    0.7,      // increased by 0.5
-		Irq:       1.2,      // increased by 1.1
-		Softirq:   0.31,     // increased by 0.2
-		Steal:     0.2812,   // increased by 0.2301
-		Guest:     12.9,     // increased by 4.8
-		GuestNice: 2.524,    // increased by 2.2
-	}
-
-	mps.On("CPUTimes").Return([]cpu.CPUTimesStat{cts}, nil)
-
-	du := []*disk.DiskUsageStat{
-		{
-			Path:        "/",
-			Fstype:      "ext4",
-			Total:       128,
-			Free:        23,
-			InodesTotal: 1234,
-			InodesFree:  234,
-		},
-		{
-			Path:        "/home",
-			Fstype:      "ext4",
-			Total:       256,
-			Free:        46,
-			InodesTotal: 2468,
-			InodesFree:  468,
-		},
-	}
-
-	mps.On("DiskUsage").Return(du, nil)
-
-	diskio1 := disk.DiskIOCountersStat{
-
-		ReadCount:    888,
-		WriteCount:   5341,
-		ReadBytes:    100000,
-		WriteBytes:   200000,
-		ReadTime:     7123,
-		WriteTime:    9087,
-		Name:         "sda1",
-		IoTime:       123552,
-		SerialNumber: "ab-123-ad",
-	}
-	diskio2 := disk.DiskIOCountersStat{
-		ReadCount:    444,
-		WriteCount:   2341,
-		ReadBytes:    200000,
-		WriteBytes:   400000,
-		ReadTime:     3123,
-		WriteTime:    6087,
-		Name:         "sdb1",
-		IoTime:       246552,
-		SerialNumber: "bb-123-ad",
-	}
-
-	mps.On("DiskIO").Return(map[string]disk.DiskIOCountersStat{"sda1": diskio1, "sdb1": diskio2}, nil)
-
-	netio := net.NetIOCountersStat{
-		Name:        "eth0",
-		BytesSent:   1123,
-		BytesRecv:   8734422,
-		PacketsSent: 781,
-		PacketsRecv: 23456,
-		Errin:       832,
-		Errout:      8,
-		Dropin:      7,
-		Dropout:     1,
-	}
-
-	mps.On("NetIO").Return([]net.NetIOCountersStat{netio}, nil)
-
-	netprotos := []net.NetProtoCountersStat{
-		net.NetProtoCountersStat{
-			Protocol: "Udp",
-			Stats: map[string]int64{
-				"InDatagrams": 4655,
-				"NoPorts":     892592,
-			},
-		},
-	}
-	mps.On("NetProto").Return(netprotos, nil)
-
-	vms := &mem.VirtualMemoryStat{
-		Total:     12400,
-		Available: 7600,
-		Used:      5000,
-		Free:      1235,
-		// Active:    8134,
-		// Inactive:  1124,
-		// Buffers:   771,
-		// Cached:    4312,
-		// Wired:     134,
-		// Shared:    2142,
-	}
-
-	mps.On("VMStat").Return(vms, nil)
-
-	sms := &mem.SwapMemoryStat{
-		Total:       8123,
-		Used:        1232,
-		Free:        6412,
-		UsedPercent: 12.2,
-		Sin:         7,
-		Sout:        830,
-	}
-
-	mps.On("SwapStat").Return(sms, nil)
-
-	netstats := []net.NetConnectionStat{
-		net.NetConnectionStat{
-			Type: syscall.SOCK_DGRAM,
-		},
-		net.NetConnectionStat{
-			Status: "ESTABLISHED",
-		},
-		net.NetConnectionStat{
-			Status: "ESTABLISHED",
-		},
-		net.NetConnectionStat{
-			Status: "CLOSE",
-		},
-	}
-
-	mps.On("NetConnections").Return(netstats, nil)
-
-	cs := NewCPUStats(&mps)
-
-	cputags := map[string]string{
-		"cpu": "cpu0",
-	}
-
-	preCPUPoints := len(acc.Points)
-	err := cs.Gather(&acc)
-	require.NoError(t, err)
-	numCPUPoints := len(acc.Points) - preCPUPoints
-
-	expectedCPUPoints := 10
-	assert.Equal(t, expectedCPUPoints, numCPUPoints)
-
-	// Computed values are checked with delta > 0 because of floating point arithmetic
-	// imprecision
-	assertContainsTaggedFloat(t, &acc, "time_user", 3.1, 0, cputags)
-	assertContainsTaggedFloat(t, &acc, "time_system", 8.2, 0, cputags)
-	assertContainsTaggedFloat(t, &acc, "time_idle", 80.1, 0, cputags)
-	assertContainsTaggedFloat(t, &acc, "time_nice", 1.3, 0, cputags)
-	assertContainsTaggedFloat(t, &acc, "time_iowait", 0.2, 0, cputags)
-	assertContainsTaggedFloat(t, &acc, "time_irq", 0.1, 0, cputags)
-	assertContainsTaggedFloat(t, &acc, "time_softirq", 0.11, 0, cputags)
-	assertContainsTaggedFloat(t, &acc, "time_steal", 0.0511, 0, cputags)
-	assertContainsTaggedFloat(t, &acc, "time_guest", 8.1, 0, cputags)
-	assertContainsTaggedFloat(t, &acc, "time_guest_nice", 0.324, 0, cputags)
-
-	mps2 := MockPS{}
-	mps2.On("CPUTimes").Return([]cpu.CPUTimesStat{cts2}, nil)
-	cs.ps = &mps2
-
-	// Should have added cpu percentages too
-	err = cs.Gather(&acc)
-	require.NoError(t, err)
-
-	numCPUPoints = len(acc.Points) - (preCPUPoints + numCPUPoints)
-	expectedCPUPoints = 20
-	assert.Equal(t, expectedCPUPoints, numCPUPoints)
-
-	assertContainsTaggedFloat(t, &acc, "time_user", 11.4, 0, cputags)
-	assertContainsTaggedFloat(t, &acc, "time_system", 10.9, 0, cputags)
-	assertContainsTaggedFloat(t, &acc, "time_idle", 158.8699, 0, cputags)
-	assertContainsTaggedFloat(t, &acc, "time_nice", 2.5, 0, cputags)
-	assertContainsTaggedFloat(t, &acc, "time_iowait", 0.7, 0, cputags)
-	assertContainsTaggedFloat(t, &acc, "time_irq", 1.2, 0, cputags)
-	assertContainsTaggedFloat(t, &acc, "time_softirq", 0.31, 0, cputags)
-	assertContainsTaggedFloat(t, &acc, "time_steal", 0.2812, 0, cputags)
-	assertContainsTaggedFloat(t, &acc, "time_guest", 12.9, 0, cputags)
-	assertContainsTaggedFloat(t, &acc, "time_guest_nice", 2.524, 0, cputags)
-
-	assertContainsTaggedFloat(t, &acc, "usage_user", 8.3, 0.0005, cputags)
-	assertContainsTaggedFloat(t, &acc, "usage_system", 2.7, 0.0005, cputags)
-	assertContainsTaggedFloat(t, &acc, "usage_idle", 78.7699, 0.0005, cputags)
-	assertContainsTaggedFloat(t, &acc, "usage_nice", 1.2, 0.0005, cputags)
-	assertContainsTaggedFloat(t, &acc, "usage_iowait", 0.5, 0.0005, cputags)
-	assertContainsTaggedFloat(t, &acc, "usage_irq", 1.1, 0.0005, cputags)
-	assertContainsTaggedFloat(t, &acc, "usage_softirq", 0.2, 0.0005, cputags)
-	assertContainsTaggedFloat(t, &acc, "usage_steal", 0.2301, 0.0005, cputags)
-	assertContainsTaggedFloat(t, &acc, "usage_guest", 4.8, 0.0005, cputags)
-	assertContainsTaggedFloat(t, &acc, "usage_guest_nice", 2.2, 0.0005, cputags)
-
-	preDiskPoints := len(acc.Points)
-
-	err = (&DiskStats{ps: &mps}).Gather(&acc)
-	require.NoError(t, err)
-
-	numDiskPoints := len(acc.Points) - preDiskPoints
-	expectedAllDiskPoints := 12
-	assert.Equal(t, expectedAllDiskPoints, numDiskPoints)
-
-	tags1 := map[string]string{
-		"path":   "/",
-		"fstype": "ext4",
-	}
-	tags2 := map[string]string{
-		"path":   "/home",
-		"fstype": "ext4",
-	}
-
-	assert.True(t, acc.CheckTaggedValue("total", uint64(128), tags1))
-	assert.True(t, acc.CheckTaggedValue("used", uint64(105), tags1))
-	assert.True(t, acc.CheckTaggedValue("free", uint64(23), tags1))
-	assert.True(t, acc.CheckTaggedValue("inodes_total", uint64(1234), tags1))
-	assert.True(t, acc.CheckTaggedValue("inodes_free", uint64(234), tags1))
-	assert.True(t, acc.CheckTaggedValue("inodes_used", uint64(1000), tags1))
-	assert.True(t, acc.CheckTaggedValue("total", uint64(256), tags2))
-	assert.True(t, acc.CheckTaggedValue("used", uint64(210), tags2))
-	assert.True(t, acc.CheckTaggedValue("free", uint64(46), tags2))
-	assert.True(t, acc.CheckTaggedValue("inodes_total", uint64(2468), tags2))
-	assert.True(t, acc.CheckTaggedValue("inodes_free", uint64(468), tags2))
-	assert.True(t, acc.CheckTaggedValue("inodes_used", uint64(2000), tags2))
-
-	// We expect 6 more DiskPoints to show up with an explicit match on "/"
-	// and /home not matching the /dev in Mountpoints
-	err = (&DiskStats{ps: &mps, Mountpoints: []string{"/", "/dev"}}).Gather(&acc)
-	assert.Equal(t, preDiskPoints+expectedAllDiskPoints+6, len(acc.Points))
-
-	// We should see all the diskpoints as Mountpoints includes both
-	// / and /home
-	err = (&DiskStats{ps: &mps, Mountpoints: []string{"/", "/home"}}).Gather(&acc)
-	assert.Equal(t, preDiskPoints+2*expectedAllDiskPoints+6, len(acc.Points))
-
-	err = (&NetIOStats{ps: &mps, skipChecks: true}).Gather(&acc)
-	require.NoError(t, err)
-
-	ntags := map[string]string{
-		"interface": "eth0",
-	}
-
-	assert.NoError(t, acc.ValidateTaggedValue("bytes_sent", uint64(1123), ntags))
-	assert.NoError(t, acc.ValidateTaggedValue("bytes_recv", uint64(8734422), ntags))
-	assert.NoError(t, acc.ValidateTaggedValue("packets_sent", uint64(781), ntags))
-	assert.NoError(t, acc.ValidateTaggedValue("packets_recv", uint64(23456), ntags))
-	assert.NoError(t, acc.ValidateTaggedValue("err_in", uint64(832), ntags))
-	assert.NoError(t, acc.ValidateTaggedValue("err_out", uint64(8), ntags))
-	assert.NoError(t, acc.ValidateTaggedValue("drop_in", uint64(7), ntags))
-	assert.NoError(t, acc.ValidateTaggedValue("drop_out", uint64(1), ntags))
-	assert.NoError(t, acc.ValidateValue("udp_noports", int64(892592)))
-	assert.NoError(t, acc.ValidateValue("udp_indatagrams", int64(4655)))
-
-	preDiskIOPoints := len(acc.Points)
-
-	err = (&DiskIOStats{ps: &mps}).Gather(&acc)
-	require.NoError(t, err)
-
-	numDiskIOPoints := len(acc.Points) - preDiskIOPoints
-	expectedAllDiskIOPoints := 14
-	assert.Equal(t, expectedAllDiskIOPoints, numDiskIOPoints)
-
-	dtags1 := map[string]string{
-		"name":   "sda1",
-		"serial": "ab-123-ad",
-	}
-	dtags2 := map[string]string{
-		"name":   "sdb1",
-		"serial": "bb-123-ad",
-	}
-
-	assert.True(t, acc.CheckTaggedValue("reads", uint64(888), dtags1))
-	assert.True(t, acc.CheckTaggedValue("writes", uint64(5341), dtags1))
-	assert.True(t, acc.CheckTaggedValue("read_bytes", uint64(100000), dtags1))
-	assert.True(t, acc.CheckTaggedValue("write_bytes", uint64(200000), dtags1))
-	assert.True(t, acc.CheckTaggedValue("read_time", uint64(7123), dtags1))
-	assert.True(t, acc.CheckTaggedValue("write_time", uint64(9087), dtags1))
-	assert.True(t, acc.CheckTaggedValue("io_time", uint64(123552), dtags1))
-	assert.True(t, acc.CheckTaggedValue("reads", uint64(444), dtags2))
-	assert.True(t, acc.CheckTaggedValue("writes", uint64(2341), dtags2))
-	assert.True(t, acc.CheckTaggedValue("read_bytes", uint64(200000), dtags2))
-	assert.True(t, acc.CheckTaggedValue("write_bytes", uint64(400000), dtags2))
-	assert.True(t, acc.CheckTaggedValue("read_time", uint64(3123), dtags2))
-	assert.True(t, acc.CheckTaggedValue("write_time", uint64(6087), dtags2))
-	assert.True(t, acc.CheckTaggedValue("io_time", uint64(246552), dtags2))
-
-	// We expect 7 more DiskIOPoints to show up with an explicit match on "sdb1"
-	// and serial should be missing from the tags with SkipSerialNumber set
-	err = (&DiskIOStats{ps: &mps, Devices: []string{"sdb1"}, SkipSerialNumber: true}).Gather(&acc)
-	assert.Equal(t, preDiskIOPoints+expectedAllDiskIOPoints+7, len(acc.Points))
-
-	dtags3 := map[string]string{
-		"name": "sdb1",
-	}
-
-	assert.True(t, acc.CheckTaggedValue("reads", uint64(444), dtags3))
-	assert.True(t, acc.CheckTaggedValue("writes", uint64(2341), dtags3))
-	assert.True(t, acc.CheckTaggedValue("read_bytes", uint64(200000), dtags3))
-	assert.True(t, acc.CheckTaggedValue("write_bytes", uint64(400000), dtags3))
-	assert.True(t, acc.CheckTaggedValue("read_time", uint64(3123), dtags3))
-	assert.True(t, acc.CheckTaggedValue("write_time", uint64(6087), dtags3))
-	assert.True(t, acc.CheckTaggedValue("io_time", uint64(246552), dtags3))
-
-	err = (&MemStats{&mps}).Gather(&acc)
-	require.NoError(t, err)
-
-	vmtags := map[string]string(nil)
-
-	assert.True(t, acc.CheckTaggedValue("total", uint64(12400), vmtags))
-	assert.True(t, acc.CheckTaggedValue("available", uint64(7600), vmtags))
-	assert.True(t, acc.CheckTaggedValue("used", uint64(5000), vmtags))
-	assert.True(t, acc.CheckTaggedValue("available_percent",
-		float64(7600)/float64(12400)*100,
-		vmtags))
-	assert.True(t, acc.CheckTaggedValue("used_percent",
-		float64(5000)/float64(12400)*100,
-		vmtags))
-	assert.True(t, acc.CheckTaggedValue("free", uint64(1235), vmtags))
-
-	acc.Points = nil
-
-	err = (&SwapStats{&mps}).Gather(&acc)
-	require.NoError(t, err)
-
-	swaptags := map[string]string(nil)
-
-	assert.NoError(t, acc.ValidateTaggedValue("total", uint64(8123), swaptags))
-	assert.NoError(t, acc.ValidateTaggedValue("used", uint64(1232), swaptags))
-	assert.NoError(t, acc.ValidateTaggedValue("used_percent", float64(12.2), swaptags))
-	assert.NoError(t, acc.ValidateTaggedValue("free", uint64(6412), swaptags))
|
|
||||||
assert.NoError(t, acc.ValidateTaggedValue("in", uint64(7), swaptags))
|
|
||||||
assert.NoError(t, acc.ValidateTaggedValue("out", uint64(830), swaptags))
|
|
||||||
|
|
||||||
acc.Points = nil
|
|
||||||
|
|
||||||
err = (&NetStats{&mps}).Gather(&acc)
|
|
||||||
require.NoError(t, err)
|
|
||||||
netstattags := map[string]string(nil)
|
|
||||||
|
|
||||||
assert.NoError(t, acc.ValidateTaggedValue("tcp_established", 2, netstattags))
|
|
||||||
assert.NoError(t, acc.ValidateTaggedValue("tcp_close", 1, netstattags))
|
|
||||||
assert.NoError(t, acc.ValidateTaggedValue("udp_socket", 1, netstattags))
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
// Asserts that a given accumulator contains a measurement of type float64 with
// specific tags within a certain distance of a given expected value. Asserts a
// failure if the measurement is of the wrong type, or if no matching
// measurements are found.
//
// Parameters:
//     t *testing.T             : Testing object to use
//     acc testutil.Accumulator : Accumulator to examine
//     measurement string       : Name of the measurement to examine
//     expectedValue float64    : Value to search for within the measurement
//     delta float64            : Maximum acceptable distance of an accumulated
//                                value from the expectedValue parameter. Useful
//                                when floating-point arithmetic imprecision
//                                makes looking for an exact match impractical
//     tags map[string]string   : Tag set the found measurement must have. Set
//                                to nil to ignore the tag set.
func assertContainsTaggedFloat(
	t *testing.T,
	acc *testutil.Accumulator,
	measurement string,
	expectedValue float64,
	delta float64,
	tags map[string]string,
) {
	var actualValue float64
	for _, pt := range acc.Points {
		if pt.Measurement == measurement {
			if (tags == nil) || reflect.DeepEqual(pt.Tags, tags) {
				if value, ok := pt.Fields["value"].(float64); ok {
					actualValue = value
					if (value >= expectedValue-delta) && (value <= expectedValue+delta) {
						// Found the point, return without failing
						return
					}
				} else {
					assert.Fail(t, fmt.Sprintf("Measurement \"%s\" does not have type float64",
						measurement))
				}
			}
		}
	}
	msg := fmt.Sprintf("Could not find measurement \"%s\" with requested tags within %f of %f, Actual: %f",
		measurement, delta, expectedValue, actualValue)
	assert.Fail(t, msg)
}
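The delta-based comparison in `assertContainsTaggedFloat` exists because derived values such as `used_percent` come from floating-point division and rarely match a literal exactly. A minimal standalone sketch of that tolerance check (the function name `withinDelta` and the sample values are illustrative, not from the test above):

```go
package main

import (
	"fmt"
	"math"
)

// withinDelta mirrors the helper's tolerance check: a value matches when it
// lies within delta of the expected value, which avoids exact float64
// comparison of derived values like used_percent.
func withinDelta(expected, actual, delta float64) bool {
	return math.Abs(actual-expected) <= delta
}

func main() {
	usedPercent := float64(5000) / float64(12400) * 100 // 40.3225...
	fmt.Println(withinDelta(40.32, usedPercent, 0.01))   // true
	fmt.Println(withinDelta(40.32, usedPercent, 0.0001)) // false
}
```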
@@ -41,6 +41,5 @@ func (s *Trig) Gather(acc plugins.Accumulator) error {
 	}
 }
-
 func init() {
 	plugins.Add("Trig", func() plugins.Plugin { return &Trig{x: 0.0} })
 }
@@ -5,28 +5,21 @@ import (
 	"errors"
 	"io/ioutil"
 	"net"
-	"strings"
-	"sync"
 	"time"

 	"github.com/influxdb/telegraf/plugins"
 )

 type Twemproxy struct {
-	Instances []TwemproxyInstance
-}
-
-type TwemproxyInstance struct {
 	Addr  string
 	Pools []string
 }

 var sampleConfig = `
-  [[plugins.twemproxy.instances]]
-    # Twemproxy stats address and port (no scheme)
-    addr = "localhost:22222"
-    # Monitor pool name
-    pools = ["redis_pool", "mc_pool"]
+  # Twemproxy stats address and port (no scheme)
+  addr = "localhost:22222"
+  # Monitor pool name
+  pools = ["redis_pool", "mc_pool"]
 `

 func (t *Twemproxy) SampleConfig() string {
@@ -39,35 +32,7 @@ func (t *Twemproxy) Description() string {

 // Gather data from all Twemproxy instances
 func (t *Twemproxy) Gather(acc plugins.Accumulator) error {
-	var wg sync.WaitGroup
-	errorChan := make(chan error, len(t.Instances))
-	for _, inst := range t.Instances {
-		wg.Add(1)
-		go func(inst TwemproxyInstance) {
-			defer wg.Done()
-			if err := inst.Gather(acc); err != nil {
-				errorChan <- err
-			}
-		}(inst)
-	}
-	wg.Wait()
-
-	close(errorChan)
-	errs := []string{}
-	for err := range errorChan {
-		errs = append(errs, err.Error())
-	}
-	if len(errs) == 0 {
-		return nil
-	}
-	return errors.New(strings.Join(errs, "\n"))
-}
-
-// Gather data from one Twemproxy
-func (ti *TwemproxyInstance) Gather(
-	acc plugins.Accumulator,
-) error {
-	conn, err := net.DialTimeout("tcp", ti.Addr, 1*time.Second)
+	conn, err := net.DialTimeout("tcp", t.Addr, 1*time.Second)
 	if err != nil {
 		return err
 	}
@@ -82,14 +47,14 @@ func (ti *TwemproxyInstance) Gather(
 	}

 	tags := make(map[string]string)
-	tags["twemproxy"] = ti.Addr
-	ti.processStat(acc, tags, stats)
+	tags["twemproxy"] = t.Addr
+	t.processStat(acc, tags, stats)

 	return nil
 }

 // Process Twemproxy server stats
-func (ti *TwemproxyInstance) processStat(
+func (t *Twemproxy) processStat(
 	acc plugins.Accumulator,
 	tags map[string]string,
 	data map[string]interface{},
@@ -100,40 +65,42 @@ func (ti *TwemproxyInstance) processStat(
 		}
 	}

+	fields := make(map[string]interface{})
 	metrics := []string{"total_connections", "curr_connections", "timestamp"}
 	for _, m := range metrics {
 		if value, ok := data[m]; ok {
 			if val, ok := value.(float64); ok {
-				acc.Add(m, val, tags)
+				fields[m] = val
 			}
 		}
 	}
+	acc.AddFields("twemproxy", fields, tags)

-	for _, pool := range ti.Pools {
+	for _, pool := range t.Pools {
 		if poolStat, ok := data[pool]; ok {
 			if data, ok := poolStat.(map[string]interface{}); ok {
 				poolTags := copyTags(tags)
 				poolTags["pool"] = pool
-				ti.processPool(acc, poolTags, pool+"_", data)
+				t.processPool(acc, poolTags, data)
 			}
 		}
 	}
 }

 // Process pool data in Twemproxy stats
-func (ti *TwemproxyInstance) processPool(
+func (t *Twemproxy) processPool(
 	acc plugins.Accumulator,
 	tags map[string]string,
-	prefix string,
 	data map[string]interface{},
 ) {
 	serverTags := make(map[string]map[string]string)

+	fields := make(map[string]interface{})
 	for key, value := range data {
 		switch key {
 		case "client_connections", "forward_error", "client_err", "server_ejects", "fragments", "client_eof":
 			if val, ok := value.(float64); ok {
-				acc.Add(prefix+key, val, tags)
+				fields[key] = val
 			}
 		default:
 			if data, ok := value.(map[string]interface{}); ok {
@@ -141,27 +108,29 @@ func (ti *TwemproxyInstance) processPool(
 					serverTags[key] = copyTags(tags)
 					serverTags[key]["server"] = key
 				}
-				ti.processServer(acc, serverTags[key], prefix, data)
+				t.processServer(acc, serverTags[key], data)
 			}
 		}
 	}
+	acc.AddFields("twemproxy_pool", fields, tags)
 }

 // Process backend server(redis/memcached) stats
-func (ti *TwemproxyInstance) processServer(
+func (t *Twemproxy) processServer(
 	acc plugins.Accumulator,
 	tags map[string]string,
-	prefix string,
 	data map[string]interface{},
 ) {
+	fields := make(map[string]interface{})
 	for key, value := range data {
 		switch key {
 		default:
 			if val, ok := value.(float64); ok {
-				acc.Add(prefix+key, val, tags)
+				fields[key] = val
 			}
 		}
 	}
+	acc.AddFields("twemproxy_pool", fields, tags)
 }

 // Tags is not expected to be mutated after passing to Add.
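The recurring change in these hunks is the accumulator refactor announced in the v0.3.0 release notes: instead of one `acc.Add(name, value, tags)` call per stat (one point each), values are collected into a `fields` map and emitted once with `acc.AddFields`. A minimal sketch of that shape, using a stand-in accumulator type rather than telegraf's real `Accumulator` interface:

```go
package main

import "fmt"

// point and accumulator are stand-ins, just enough to show the shape of
// the change: many values become one measurement carrying many fields.
type point struct {
	Measurement string
	Fields      map[string]interface{}
	Tags        map[string]string
}

type accumulator struct{ points []point }

func (a *accumulator) AddFields(name string, fields map[string]interface{}, tags map[string]string) {
	a.points = append(a.points, point{name, fields, tags})
}

// collectFields pulls the known top-level metrics out of decoded stats,
// mirroring the loop in processStat above.
func collectFields(data map[string]interface{}) map[string]interface{} {
	fields := make(map[string]interface{})
	for _, m := range []string{"total_connections", "curr_connections", "timestamp"} {
		if val, ok := data[m].(float64); ok {
			fields[m] = val
		}
	}
	return fields
}

func main() {
	acc := &accumulator{}
	data := map[string]interface{}{"total_connections": 10.0, "curr_connections": 2.0}
	acc.AddFields("twemproxy", collectFields(data), map[string]string{"twemproxy": "localhost:22222"})
	fmt.Println(len(acc.points), len(acc.points[0].Fields)) // 1 2
}
```

One point per gather instead of one per stat also means downstream queries address a single measurement with multiple fields, which is the aggregation the release notes call a breaking change.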
@@ -88,15 +88,15 @@ func gatherPoolStats(pool poolInfo, acc plugins.Accumulator) error {
 	}

 	tag := map[string]string{"pool": pool.name}
+	fields := make(map[string]interface{})
 	for i := 0; i < keyCount; i++ {
 		value, err := strconv.ParseInt(values[i], 10, 64)
 		if err != nil {
 			return err
 		}
-		acc.Add(keys[i], value, tag)
+		fields[keys[i]] = value
 	}
+	acc.AddFields("zfs_pool", fields, tag)

 	return nil
 }
@@ -124,6 +124,7 @@ func (z *Zfs) Gather(acc plugins.Accumulator) error {
 		}
 	}

+	fields := make(map[string]interface{})
 	for _, metric := range kstatMetrics {
 		lines, err := internal.ReadLines(kstatPath + "/" + metric)
 		if err != nil {
@@ -140,9 +141,10 @@ func (z *Zfs) Gather(acc plugins.Accumulator) error {
 			key := metric + "_" + rawData[0]
 			rawValue := rawData[len(rawData)-1]
 			value, _ := strconv.ParseInt(rawValue, 10, 64)
-			acc.Add(key, value, tags)
+			fields[key] = value
 		}
 	}
+	acc.AddFields("zfs", fields, tags)
 	return nil
 }
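The ZFS hunk above keys each field as `<metric>_<first token>` with the last whitespace-separated token as the value. A self-contained sketch of that parsing step with a hypothetical `parseKstat` helper and made-up sample lines (the real files live under the kstat path, e.g. `/proc/spl/kstat/zfs`):

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseKstat splits each kstat line on whitespace: the first token names the
// stat and the last token is its value, keyed as "<metric>_<name>". Unlike
// the plugin, which ignores the ParseInt error, this sketch skips bad lines.
func parseKstat(metric string, lines []string) map[string]interface{} {
	fields := make(map[string]interface{})
	for _, line := range lines {
		rawData := strings.Fields(line)
		if len(rawData) < 2 {
			continue
		}
		key := metric + "_" + rawData[0]
		value, err := strconv.ParseInt(rawData[len(rawData)-1], 10, 64)
		if err != nil {
			continue
		}
		fields[key] = value
	}
	return fields
}

func main() {
	fields := parseKstat("arcstats", []string{"hits 4 8772612", "misses 4 604635"})
	fmt.Println(fields["arcstats_hits"], fields["arcstats_misses"]) // 8772612 604635
}
```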
@@ -67,35 +67,37 @@ func (z *Zookeeper) gatherServer(address string, acc plugins.Accumulator) error
 	defer c.Close()

 	fmt.Fprintf(c, "%s\n", "mntr")

 	rdr := bufio.NewReader(c)

 	scanner := bufio.NewScanner(rdr)

+	service := strings.Split(address, ":")
+	if len(service) != 2 {
+		return fmt.Errorf("Invalid service address: %s", address)
+	}
+	tags := map[string]string{"server": service[0], "port": service[1]}
+
+	fields := make(map[string]interface{})
 	for scanner.Scan() {
 		line := scanner.Text()

 		re := regexp.MustCompile(`^zk_(\w+)\s+([\w\.\-]+)`)
 		parts := re.FindStringSubmatch(string(line))

-		service := strings.Split(address, ":")
-
-		if len(parts) != 3 || len(service) != 2 {
+		if len(parts) != 3 {
 			return fmt.Errorf("unexpected line in mntr response: %q", line)
 		}
-
-		tags := map[string]string{"server": service[0], "port": service[1]}

 		measurement := strings.TrimPrefix(parts[1], "zk_")
 		sValue := string(parts[2])

 		iVal, err := strconv.ParseInt(sValue, 10, 64)
 		if err == nil {
-			acc.Add(measurement, iVal, tags)
+			fields[measurement] = iVal
 		} else {
-			acc.Add(measurement, sValue, tags)
+			fields[measurement] = sValue
 		}
 	}
+	acc.AddFields("zookeeper", fields, tags)

 	return nil
 }
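The zookeeper hunk parses each `mntr` response line with the regex `^zk_(\w+)\s+([\w\.\-]+)`, storing integer values as `int64` and anything else as a string. A runnable sketch of that parsing with a hypothetical `parseMntr` helper; the sample lines are illustrative, not captured from a real server. Note the capture group already excludes the `zk_` prefix, so the value of `parts[1]` needs no further trimming here:

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
)

var mntrLine = regexp.MustCompile(`^zk_(\w+)\s+([\w\.\-]+)`)

// parseMntr applies the plugin's line regex to mntr output: integers become
// int64 fields, everything else is kept as a string; malformed lines are
// skipped instead of aborting, unlike the plugin, which returns an error.
func parseMntr(lines []string) map[string]interface{} {
	fields := make(map[string]interface{})
	for _, line := range lines {
		parts := mntrLine.FindStringSubmatch(line)
		if len(parts) != 3 {
			continue
		}
		if iVal, err := strconv.ParseInt(parts[2], 10, 64); err == nil {
			fields[parts[1]] = iVal
		} else {
			fields[parts[1]] = parts[2]
		}
	}
	return fields
}

func main() {
	fields := parseMntr([]string{"zk_avg_latency 0", "zk_version 3.4.6-1569965"})
	fmt.Println(fields["avg_latency"], fields["version"])
}
```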