Updated with new code from master

This commit is contained in:
Dennis Bellinger 2016-07-25 10:22:45 -04:00
commit dd549552eb
26 changed files with 1775 additions and 276 deletions

View File

@ -3,12 +3,15 @@
### Features ### Features
- [#1413](https://github.com/influxdata/telegraf/issues/1413): Separate container_version from container_image tag. - [#1413](https://github.com/influxdata/telegraf/issues/1413): Separate container_version from container_image tag.
- [#1525](https://github.com/influxdata/telegraf/pull/1525): Support setting per-device and total metrics for Docker network and blockio.
### Bugfixes ### Bugfixes
- [#1519](https://github.com/influxdata/telegraf/pull/1519): Fix error race conditions and partial failures. - [#1519](https://github.com/influxdata/telegraf/pull/1519): Fix error race conditions and partial failures.
- [#1477](https://github.com/influxdata/telegraf/issues/1477): nstat: fix inaccurate config panic. - [#1477](https://github.com/influxdata/telegraf/issues/1477): nstat: fix inaccurate config panic.
- [#1481](https://github.com/influxdata/telegraf/issues/1481): jolokia: fix handling multiple multi-dimensional attributes. - [#1481](https://github.com/influxdata/telegraf/issues/1481): jolokia: fix handling multiple multi-dimensional attributes.
- [#1430](https://github.com/influxdata/telegraf/issues/1430): Fix prometheus character sanitizing. Sanitize more win_perf_counters characters.
- [#1534](https://github.com/influxdata/telegraf/pull/1534): Add diskio io_time to FreeBSD & report timing metrics as ms (as linux does).
## v1.0 beta 3 [2016-07-18] ## v1.0 beta 3 [2016-07-18]
@ -60,6 +63,7 @@ should now look like:
- [#1434](https://github.com/influxdata/telegraf/pull/1434): Add measurement name arg to logparser plugin. - [#1434](https://github.com/influxdata/telegraf/pull/1434): Add measurement name arg to logparser plugin.
- [#1479](https://github.com/influxdata/telegraf/pull/1479): logparser: change resp_code from a field to a tag. - [#1479](https://github.com/influxdata/telegraf/pull/1479): logparser: change resp_code from a field to a tag.
- [#1466](https://github.com/influxdata/telegraf/pull/1466): MongoDB input plugin: adding per DB stats from db.stats() - [#1466](https://github.com/influxdata/telegraf/pull/1466): MongoDB input plugin: adding per DB stats from db.stats()
- [#1411](https://github.com/influxdata/telegraf/pull/1411): Implement support for fetching hddtemp data
### Bugfixes ### Bugfixes

2
Godeps
View File

@ -44,7 +44,7 @@ github.com/prometheus/client_model fa8ad6fec33561be4280a8f0514318c79d7f6cb6
github.com/prometheus/common e8eabff8812b05acf522b45fdcd725a785188e37 github.com/prometheus/common e8eabff8812b05acf522b45fdcd725a785188e37
github.com/prometheus/procfs 406e5b7bfd8201a36e2bb5f7bdae0b03380c2ce8 github.com/prometheus/procfs 406e5b7bfd8201a36e2bb5f7bdae0b03380c2ce8
github.com/samuel/go-zookeeper 218e9c81c0dd8b3b18172b2bbfad92cc7d6db55f github.com/samuel/go-zookeeper 218e9c81c0dd8b3b18172b2bbfad92cc7d6db55f
github.com/shirou/gopsutil 586bb697f3ec9f8ec08ffefe18f521a64534037c github.com/shirou/gopsutil ee66bc560c366dd33b9a4046ba0b644caba46bed
github.com/soniah/gosnmp b1b4f885b12c5dcbd021c5cee1c904110de6db7d github.com/soniah/gosnmp b1b4f885b12c5dcbd021c5cee1c904110de6db7d
github.com/sparrc/aerospike-client-go d4bb42d2c2d39dae68e054116f4538af189e05d5 github.com/sparrc/aerospike-client-go d4bb42d2c2d39dae68e054116f4538af189e05d5
github.com/streadway/amqp b4f3ceab0337f013208d31348b578d83c0064744 github.com/streadway/amqp b4f3ceab0337f013208d31348b578d83c0064744

View File

@ -156,6 +156,7 @@ Currently implemented sources:
* [exec](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/exec) (generic executable plugin, support JSON, influx, graphite and nagios) * [exec](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/exec) (generic executable plugin, support JSON, influx, graphite and nagios)
* [filestat](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/filestat) * [filestat](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/filestat)
* [haproxy](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/haproxy) * [haproxy](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/haproxy)
* [hddtemp](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/hddtemp)
* [http_response](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/http_response) * [http_response](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/http_response)
* [httpjson](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/httpjson) (generic JSON-emitting http service plugin) * [httpjson](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/httpjson) (generic JSON-emitting http service plugin)
* [influxdb](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/influxdb) * [influxdb](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/influxdb)

View File

@ -16,6 +16,8 @@ type Accumulator interface {
tags map[string]string, tags map[string]string,
t ...time.Time) t ...time.Time)
AddError(err error)
Debug() bool Debug() bool
SetDebug(enabled bool) SetDebug(enabled bool)

View File

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"log" "log"
"math" "math"
"sync/atomic"
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
@ -32,9 +33,9 @@ type accumulator struct {
inputConfig *internal_models.InputConfig inputConfig *internal_models.InputConfig
prefix string
precision time.Duration precision time.Duration
errCount uint64
} }
func (ac *accumulator) Add( func (ac *accumulator) Add(
@ -146,10 +147,6 @@ func (ac *accumulator) AddFields(
} }
timestamp = timestamp.Round(ac.precision) timestamp = timestamp.Round(ac.precision)
if ac.prefix != "" {
measurement = ac.prefix + measurement
}
m, err := telegraf.NewMetric(measurement, tags, result, timestamp) m, err := telegraf.NewMetric(measurement, tags, result, timestamp)
if err != nil { if err != nil {
log.Printf("Error adding point [%s]: %s\n", measurement, err.Error()) log.Printf("Error adding point [%s]: %s\n", measurement, err.Error())
@ -161,6 +158,17 @@ func (ac *accumulator) AddFields(
ac.metrics <- m ac.metrics <- m
} }
// AddError passes a runtime error to the accumulator.
// The error will be tagged with the plugin name and written to the log.
func (ac *accumulator) AddError(err error) {
if err == nil {
return
}
atomic.AddUint64(&ac.errCount, 1)
//TODO suppress/throttle consecutive duplicate errors?
log.Printf("ERROR in input [%s]: %s", ac.inputConfig.Name, err)
}
func (ac *accumulator) Debug() bool { func (ac *accumulator) Debug() bool {
return ac.debug return ac.debug
} }

View File

@ -1,8 +1,11 @@
package agent package agent
import ( import (
"bytes"
"fmt" "fmt"
"log"
"math" "math"
"os"
"testing" "testing"
"time" "time"
@ -10,6 +13,7 @@ import (
"github.com/influxdata/telegraf/internal/models" "github.com/influxdata/telegraf/internal/models"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
) )
func TestAdd(t *testing.T) { func TestAdd(t *testing.T) {
@ -454,3 +458,27 @@ func TestAccFilterTags(t *testing.T) {
fmt.Sprintf("acctest value=101 %d", now.UnixNano()), fmt.Sprintf("acctest value=101 %d", now.UnixNano()),
actual) actual)
} }
func TestAccAddError(t *testing.T) {
errBuf := bytes.NewBuffer(nil)
log.SetOutput(errBuf)
defer log.SetOutput(os.Stderr)
a := accumulator{}
a.inputConfig = &internal_models.InputConfig{}
a.inputConfig.Name = "mock_plugin"
a.AddError(fmt.Errorf("foo"))
a.AddError(fmt.Errorf("bar"))
a.AddError(fmt.Errorf("baz"))
errs := bytes.Split(errBuf.Bytes(), []byte{'\n'})
assert.EqualValues(t, 3, a.errCount)
require.Len(t, errs, 4) // 4 because of trailing newline
assert.Contains(t, string(errs[0]), "mock_plugin")
assert.Contains(t, string(errs[0]), "foo")
assert.Contains(t, string(errs[1]), "mock_plugin")
assert.Contains(t, string(errs[1]), "bar")
assert.Contains(t, string(errs[2]), "mock_plugin")
assert.Contains(t, string(errs[2]), "baz")
}

View File

@ -215,6 +215,9 @@ func (a *Agent) Test() error {
if err := input.Input.Gather(acc); err != nil { if err := input.Input.Gather(acc); err != nil {
return err return err
} }
if acc.errCount > 0 {
return fmt.Errorf("Errors encountered during processing")
}
// Special instructions for some inputs. cpu, for example, needs to be // Special instructions for some inputs. cpu, for example, needs to be
// run twice in order to return cpu usage percentages. // run twice in order to return cpu usage percentages.

View File

@ -197,7 +197,7 @@
# # Configuration for Graphite server to send metrics to # # Configuration for Graphite server to send metrics to
# [[outputs.graphite]] # [[outputs.graphite]]
# ## TCP endpoint for your graphite instance. # ## TCP endpoint for your graphite instance.
# ## If multiple endpoints are configured, the output will be load balanced. # ## If multiple endpoints are configured, output will be load balanced.
# ## Only one of the endpoints will be written to with each iteration. # ## Only one of the endpoints will be written to with each iteration.
# servers = ["localhost:2003"] # servers = ["localhost:2003"]
# ## Prefix metrics name # ## Prefix metrics name
@ -436,8 +436,8 @@
## disk partitions. ## disk partitions.
## Setting devices will restrict the stats to the specified devices. ## Setting devices will restrict the stats to the specified devices.
# devices = ["sda", "sdb"] # devices = ["sda", "sdb"]
## Uncomment the following line if you do not need disk serial numbers. ## Uncomment the following line if you need disk serial numbers.
# skip_serial_number = true # skip_serial_number = false
# Get kernel statistics from /proc/stat # Get kernel statistics from /proc/stat
@ -465,7 +465,7 @@
# no configuration # no configuration
# # Read stats from an aerospike server # # Read stats from aerospike server(s)
# [[inputs.aerospike]] # [[inputs.aerospike]]
# ## Aerospike servers to connect to (with port) # ## Aerospike servers to connect to (with port)
# ## This plugin will query all namespaces the aerospike # ## This plugin will query all namespaces the aerospike
@ -666,6 +666,13 @@
# container_names = [] # container_names = []
# ## Timeout for docker list, info, and stats commands # ## Timeout for docker list, info, and stats commands
# timeout = "5s" # timeout = "5s"
#
# ## Whether to report for each container per-device blkio (8:0, 8:1...) and
# ## network (eth0, eth1, ...) stats or not
# perdevice = true
# ## Whether to report for each container total blkio and network stats or not
# total = false
#
# # Read statistics from one or many dovecot servers # # Read statistics from one or many dovecot servers
@ -782,9 +789,11 @@
# [[inputs.haproxy]] # [[inputs.haproxy]]
# ## An array of address to gather stats about. Specify an ip on hostname # ## An array of address to gather stats about. Specify an ip on hostname
# ## with optional port. ie localhost, 10.10.3.33:1936, etc. # ## with optional port. ie localhost, 10.10.3.33:1936, etc.
# # ## Make sure you specify the complete path to the stats endpoint
# ## If no servers are specified, then default to 127.0.0.1:1936 # ## ie 10.10.3.33:1936/haproxy?stats
# servers = ["http://myhaproxy.com:1936", "http://anotherhaproxy.com:1936"] # #
# ## If no servers are specified, then default to 127.0.0.1:1936/haproxy?stats
# servers = ["http://myhaproxy.com:1936/haproxy?stats"]
# ## Or you can also use local socket # ## Or you can also use local socket
# ## servers = ["socket:/run/haproxy/admin.sock"] # ## servers = ["socket:/run/haproxy/admin.sock"]
@ -970,21 +979,35 @@
# # Telegraf plugin for gathering metrics from N Mesos masters # # Telegraf plugin for gathering metrics from N Mesos masters
# [[inputs.mesos]] # [[inputs.mesos]]
# # Timeout, in ms. # ## Timeout, in ms.
# timeout = 100 # timeout = 100
# # A list of Mesos masters, default value is localhost:5050. # ## A list of Mesos masters.
# masters = ["localhost:5050"] # masters = ["localhost:5050"]
# # Metrics groups to be collected, by default, all enabled. # ## Master metrics groups to be collected, by default, all enabled.
# master_collections = [ # master_collections = [
# "resources", # "resources",
# "master", # "master",
# "system", # "system",
# "slaves", # "agents",
# "frameworks", # "frameworks",
# "tasks",
# "messages", # "messages",
# "evqueue", # "evqueue",
# "registrar", # "registrar",
# ] # ]
# ## A list of Mesos slaves, default is []
# # slaves = []
# ## Slave metrics groups to be collected, by default, all enabled.
# # slave_collections = [
# # "resources",
# # "agent",
# # "system",
# # "executors",
# # "tasks",
# # "messages",
# # ]
# ## Include mesos tasks statistics, default is false
# # slave_tasks = true
# # Read metrics from one or many MongoDB servers # # Read metrics from one or many MongoDB servers
@ -995,6 +1018,7 @@
# ## mongodb://10.10.3.33:18832, # ## mongodb://10.10.3.33:18832,
# ## 10.0.0.1:10000, etc. # ## 10.0.0.1:10000, etc.
# servers = ["127.0.0.1:27017"] # servers = ["127.0.0.1:27017"]
# gather_perdb_stats = false
# # Read metrics from one or many mysql servers # # Read metrics from one or many mysql servers
@ -1101,9 +1125,9 @@
# ## file paths for proc files. If empty default paths will be used: # ## file paths for proc files. If empty default paths will be used:
# ## /proc/net/netstat, /proc/net/snmp, /proc/net/snmp6 # ## /proc/net/netstat, /proc/net/snmp, /proc/net/snmp6
# ## These can also be overridden with env variables, see README. # ## These can also be overridden with env variables, see README.
# proc_net_netstat = "" # proc_net_netstat = "/proc/net/netstat"
# proc_net_snmp = "" # proc_net_snmp = "/proc/net/snmp"
# proc_net_snmp6 = "" # proc_net_snmp6 = "/proc/net/snmp6"
# ## dump metrics with 0 values too # ## dump metrics with 0 values too
# dump_zeros = true # dump_zeros = true
@ -1305,6 +1329,13 @@
# # username = "guest" # # username = "guest"
# # password = "guest" # # password = "guest"
# #
# ## Optional SSL Config
# # ssl_ca = "/etc/telegraf/ca.pem"
# # ssl_cert = "/etc/telegraf/cert.pem"
# # ssl_key = "/etc/telegraf/key.pem"
# ## Use SSL but skip chain & host verification
# # insecure_skip_verify = false
#
# ## A list of nodes to pull metrics about. If not specified, metrics for # ## A list of nodes to pull metrics about. If not specified, metrics for
# ## all nodes are gathered. # ## all nodes are gathered.
# # nodes = ["rabbit@node1", "rabbit@node2"] # # nodes = ["rabbit@node1", "rabbit@node2"]
@ -1323,6 +1354,7 @@
# ## e.g. # ## e.g.
# ## tcp://localhost:6379 # ## tcp://localhost:6379
# ## tcp://:password@192.168.99.100 # ## tcp://:password@192.168.99.100
# ## unix:///var/run/redis.sock
# ## # ##
# ## If no servers are specified, then localhost is used as the host. # ## If no servers are specified, then localhost is used as the host.
# ## If no port is specified, 6379 is used # ## If no port is specified, 6379 is used
@ -1559,6 +1591,8 @@
# ## %{COMMON_LOG_FORMAT} (plain apache & nginx access logs) # ## %{COMMON_LOG_FORMAT} (plain apache & nginx access logs)
# ## %{COMBINED_LOG_FORMAT} (access logs + referrer & agent) # ## %{COMBINED_LOG_FORMAT} (access logs + referrer & agent)
# patterns = ["%{INFLUXDB_HTTPD_LOG}"] # patterns = ["%{INFLUXDB_HTTPD_LOG}"]
# ## Name of the outputted measurement name.
# measurement = "influxdb_log"
# ## Full path(s) to custom pattern files. # ## Full path(s) to custom pattern files.
# custom_pattern_files = [] # custom_pattern_files = []
# ## Custom patterns can also be defined here. Put one pattern per line. # ## Custom patterns can also be defined here. Put one pattern per line.
@ -1622,6 +1656,21 @@
# data_format = "influx" # data_format = "influx"
# # Read NSQ topic for metrics.
# [[inputs.nsq_consumer]]
# ## An string representing the NSQD TCP Endpoint
# server = "localhost:4150"
# topic = "telegraf"
# channel = "consumer"
# max_in_flight = 100
#
# ## Data format to consume.
# ## Each data format has it's own unique set of configuration options, read
# ## more about them here:
# ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
# data_format = "influx"
# # Statsd Server # # Statsd Server
# [[inputs.statsd]] # [[inputs.statsd]]
# ## Address and port to host UDP listener on # ## Address and port to host UDP listener on
@ -1725,6 +1774,9 @@
# [inputs.webhooks.github] # [inputs.webhooks.github]
# path = "/github" # path = "/github"
# #
# [inputs.webhooks.mandrill]
# path = "/mandrill"
#
# [inputs.webhooks.rollbar] # [inputs.webhooks.rollbar]
# path = "/rollbar" # path = "/rollbar"

View File

@ -139,7 +139,7 @@ func (c *Config) InputNames() []string {
return name return name
} }
// Outputs returns a list of strings of the configured inputs. // Outputs returns a list of strings of the configured outputs.
func (c *Config) OutputNames() []string { func (c *Config) OutputNames() []string {
var name []string var name []string
for _, output := range c.Outputs { for _, output := range c.Outputs {

View File

@ -22,6 +22,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/filestat" _ "github.com/influxdata/telegraf/plugins/inputs/filestat"
_ "github.com/influxdata/telegraf/plugins/inputs/graylog" _ "github.com/influxdata/telegraf/plugins/inputs/graylog"
_ "github.com/influxdata/telegraf/plugins/inputs/haproxy" _ "github.com/influxdata/telegraf/plugins/inputs/haproxy"
_ "github.com/influxdata/telegraf/plugins/inputs/hddtemp"
_ "github.com/influxdata/telegraf/plugins/inputs/http_response" _ "github.com/influxdata/telegraf/plugins/inputs/http_response"
_ "github.com/influxdata/telegraf/plugins/inputs/httpjson" _ "github.com/influxdata/telegraf/plugins/inputs/httpjson"
_ "github.com/influxdata/telegraf/plugins/inputs/influxdb" _ "github.com/influxdata/telegraf/plugins/inputs/influxdb"

View File

@ -25,6 +25,8 @@ type Docker struct {
Endpoint string Endpoint string
ContainerNames []string ContainerNames []string
Timeout internal.Duration Timeout internal.Duration
PerDevice bool `toml:"perdevice"`
Total bool `toml:"total"`
client DockerClient client DockerClient
} }
@ -58,6 +60,13 @@ var sampleConfig = `
container_names = [] container_names = []
## Timeout for docker list, info, and stats commands ## Timeout for docker list, info, and stats commands
timeout = "5s" timeout = "5s"
## Whether to report for each container per-device blkio (8:0, 8:1...) and
## network (eth0, eth1, ...) stats or not
perdevice = true
## Whether to report for each container total blkio and network stats or not
total = false
` `
// Description returns input description // Description returns input description
@ -246,7 +255,7 @@ func (d *Docker) gatherContainer(
tags[k] = label tags[k] = label
} }
gatherContainerStats(v, acc, tags, container.ID) gatherContainerStats(v, acc, tags, container.ID, d.PerDevice, d.Total)
return nil return nil
} }
@ -256,6 +265,8 @@ func gatherContainerStats(
acc telegraf.Accumulator, acc telegraf.Accumulator,
tags map[string]string, tags map[string]string,
id string, id string,
perDevice bool,
total bool,
) { ) {
now := stat.Read now := stat.Read
@ -323,6 +334,7 @@ func gatherContainerStats(
acc.AddFields("docker_container_cpu", fields, percputags, now) acc.AddFields("docker_container_cpu", fields, percputags, now)
} }
totalNetworkStatMap := make(map[string]interface{})
for network, netstats := range stat.Networks { for network, netstats := range stat.Networks {
netfields := map[string]interface{}{ netfields := map[string]interface{}{
"rx_dropped": netstats.RxDropped, "rx_dropped": netstats.RxDropped,
@ -336,12 +348,35 @@ func gatherContainerStats(
"container_id": id, "container_id": id,
} }
// Create a new network tag dictionary for the "network" tag // Create a new network tag dictionary for the "network" tag
if perDevice {
nettags := copyTags(tags) nettags := copyTags(tags)
nettags["network"] = network nettags["network"] = network
acc.AddFields("docker_container_net", netfields, nettags, now) acc.AddFields("docker_container_net", netfields, nettags, now)
} }
if total {
for field, value := range netfields {
if field == "container_id" {
continue
}
_, ok := totalNetworkStatMap[field]
if ok {
totalNetworkStatMap[field] = totalNetworkStatMap[field].(uint64) + value.(uint64)
} else {
totalNetworkStatMap[field] = value
}
}
}
}
gatherBlockIOMetrics(stat, acc, tags, now, id) // totalNetworkStatMap could be empty if container is running with --net=host.
if total && len(totalNetworkStatMap) != 0 {
nettags := copyTags(tags)
nettags["network"] = "total"
totalNetworkStatMap["container_id"] = id
acc.AddFields("docker_container_net", totalNetworkStatMap, nettags, now)
}
gatherBlockIOMetrics(stat, acc, tags, now, id, perDevice, total)
} }
func calculateMemPercent(stat *types.StatsJSON) float64 { func calculateMemPercent(stat *types.StatsJSON) float64 {
@ -370,6 +405,8 @@ func gatherBlockIOMetrics(
tags map[string]string, tags map[string]string,
now time.Time, now time.Time,
id string, id string,
perDevice bool,
total bool,
) { ) {
blkioStats := stat.BlkioStats blkioStats := stat.BlkioStats
// Make a map of devices to their block io stats // Make a map of devices to their block io stats
@ -431,12 +468,34 @@ func gatherBlockIOMetrics(
deviceStatMap[device]["sectors_recursive"] = metric.Value deviceStatMap[device]["sectors_recursive"] = metric.Value
} }
totalStatMap := make(map[string]interface{})
for device, fields := range deviceStatMap { for device, fields := range deviceStatMap {
fields["container_id"] = id
if perDevice {
iotags := copyTags(tags) iotags := copyTags(tags)
iotags["device"] = device iotags["device"] = device
fields["container_id"] = id
acc.AddFields("docker_container_blkio", fields, iotags, now) acc.AddFields("docker_container_blkio", fields, iotags, now)
} }
if total {
for field, value := range fields {
if field == "container_id" {
continue
}
_, ok := totalStatMap[field]
if ok {
totalStatMap[field] = totalStatMap[field].(uint64) + value.(uint64)
} else {
totalStatMap[field] = value
}
}
}
}
if total {
totalStatMap["container_id"] = id
iotags := copyTags(tags)
iotags["device"] = "total"
acc.AddFields("docker_container_blkio", totalStatMap, iotags, now)
}
} }
func copyTags(in map[string]string) map[string]string { func copyTags(in map[string]string) map[string]string {
@ -480,6 +539,7 @@ func parseSize(sizeStr string) (int64, error) {
func init() { func init() {
inputs.Add("docker", func() telegraf.Input { inputs.Add("docker", func() telegraf.Input {
return &Docker{ return &Docker{
PerDevice: true,
Timeout: internal.Duration{Duration: time.Second * 5}, Timeout: internal.Duration{Duration: time.Second * 5},
} }
}) })

View File

@ -24,7 +24,7 @@ func TestDockerGatherContainerStats(t *testing.T) {
"container_name": "redis", "container_name": "redis",
"container_image": "redis/image", "container_image": "redis/image",
} }
gatherContainerStats(stats, &acc, tags, "123456789") gatherContainerStats(stats, &acc, tags, "123456789", true, true)
// test docker_container_net measurement // test docker_container_net measurement
netfields := map[string]interface{}{ netfields := map[string]interface{}{
@ -42,6 +42,21 @@ func TestDockerGatherContainerStats(t *testing.T) {
nettags["network"] = "eth0" nettags["network"] = "eth0"
acc.AssertContainsTaggedFields(t, "docker_container_net", netfields, nettags) acc.AssertContainsTaggedFields(t, "docker_container_net", netfields, nettags)
netfields = map[string]interface{}{
"rx_dropped": uint64(6),
"rx_bytes": uint64(8),
"rx_errors": uint64(10),
"tx_packets": uint64(12),
"tx_dropped": uint64(6),
"rx_packets": uint64(8),
"tx_errors": uint64(10),
"tx_bytes": uint64(12),
"container_id": "123456789",
}
nettags = copyTags(tags)
nettags["network"] = "total"
acc.AssertContainsTaggedFields(t, "docker_container_net", netfields, nettags)
// test docker_blkio measurement // test docker_blkio measurement
blkiotags := copyTags(tags) blkiotags := copyTags(tags)
blkiotags["device"] = "6:0" blkiotags["device"] = "6:0"
@ -52,6 +67,15 @@ func TestDockerGatherContainerStats(t *testing.T) {
} }
acc.AssertContainsTaggedFields(t, "docker_container_blkio", blkiofields, blkiotags) acc.AssertContainsTaggedFields(t, "docker_container_blkio", blkiofields, blkiotags)
blkiotags = copyTags(tags)
blkiotags["device"] = "total"
blkiofields = map[string]interface{}{
"io_service_bytes_recursive_read": uint64(100),
"io_serviced_recursive_write": uint64(302),
"container_id": "123456789",
}
acc.AssertContainsTaggedFields(t, "docker_container_blkio", blkiofields, blkiotags)
// test docker_container_mem measurement // test docker_container_mem measurement
memfields := map[string]interface{}{ memfields := map[string]interface{}{
"max_usage": uint64(1001), "max_usage": uint64(1001),
@ -186,6 +210,17 @@ func testStats() *types.StatsJSON {
TxBytes: 4, TxBytes: 4,
} }
stats.Networks["eth1"] = types.NetworkStats{
RxDropped: 5,
RxBytes: 6,
RxErrors: 7,
TxPackets: 8,
TxDropped: 5,
RxPackets: 6,
TxErrors: 7,
TxBytes: 8,
}
sbr := types.BlkioStatEntry{ sbr := types.BlkioStatEntry{
Major: 6, Major: 6,
Minor: 0, Minor: 0,
@ -198,11 +233,19 @@ func testStats() *types.StatsJSON {
Op: "write", Op: "write",
Value: 101, Value: 101,
} }
sr2 := types.BlkioStatEntry{
Major: 6,
Minor: 1,
Op: "write",
Value: 201,
}
stats.BlkioStats.IoServiceBytesRecursive = append( stats.BlkioStats.IoServiceBytesRecursive = append(
stats.BlkioStats.IoServiceBytesRecursive, sbr) stats.BlkioStats.IoServiceBytesRecursive, sbr)
stats.BlkioStats.IoServicedRecursive = append( stats.BlkioStats.IoServicedRecursive = append(
stats.BlkioStats.IoServicedRecursive, sr) stats.BlkioStats.IoServicedRecursive, sr)
stats.BlkioStats.IoServicedRecursive = append(
stats.BlkioStats.IoServicedRecursive, sr2)
return stats return stats
} }

View File

@ -0,0 +1,22 @@
# Hddtemp Input Plugin
This plugin reads data from hddtemp daemon
## Requirements
Hddtemp should be installed and its daemon running
## Configuration
```
[[inputs.hddtemp]]
## By default, telegraf gathers temps data from all disks detected by the
## hddtemp.
##
## Only collect temps from the selected disks.
##
## A * as the device name will return the temperature values of all disks.
##
# address = "127.0.0.1:7634"
# devices = ["sda", "*"]
```

View File

@ -0,0 +1,21 @@
The MIT License (MIT)
Copyright (c) 2016 Mendelson Gusmão
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@ -0,0 +1,61 @@
package hddtemp
import (
"bytes"
"io"
"net"
"strconv"
"strings"
)
type disk struct {
DeviceName string
Model string
Temperature int32
Unit string
Status string
}
func Fetch(address string) ([]disk, error) {
var (
err error
conn net.Conn
buffer bytes.Buffer
disks []disk
)
if conn, err = net.Dial("tcp", address); err != nil {
return nil, err
}
if _, err = io.Copy(&buffer, conn); err != nil {
return nil, err
}
fields := strings.Split(buffer.String(), "|")
for index := 0; index < len(fields)/5; index++ {
status := ""
offset := index * 5
device := fields[offset+1]
device = device[strings.LastIndex(device, "/")+1:]
temperatureField := fields[offset+3]
temperature, err := strconv.ParseInt(temperatureField, 10, 32)
if err != nil {
temperature = 0
status = temperatureField
}
disks = append(disks, disk{
DeviceName: device,
Model: fields[offset+2],
Temperature: int32(temperature),
Unit: fields[offset+4],
Status: status,
})
}
return disks, nil
}

View File

@ -0,0 +1,116 @@
package hddtemp
import (
"net"
"reflect"
"testing"
)
func TestFetch(t *testing.T) {
l := serve(t, []byte("|/dev/sda|foobar|36|C|"))
defer l.Close()
disks, err := Fetch(l.Addr().String())
if err != nil {
t.Error("expecting err to be nil")
}
expected := []disk{
{
DeviceName: "sda",
Model: "foobar",
Temperature: 36,
Unit: "C",
},
}
if !reflect.DeepEqual(expected, disks) {
t.Error("disks' slice is different from expected")
}
}
func TestFetchWrongAddress(t *testing.T) {
_, err := Fetch("127.0.0.1:1")
if err == nil {
t.Error("expecting err to be non-nil")
}
}
func TestFetchStatus(t *testing.T) {
l := serve(t, []byte("|/dev/sda|foobar|SLP|C|"))
defer l.Close()
disks, err := Fetch(l.Addr().String())
if err != nil {
t.Error("expecting err to be nil")
}
expected := []disk{
{
DeviceName: "sda",
Model: "foobar",
Temperature: 0,
Unit: "C",
Status: "SLP",
},
}
if !reflect.DeepEqual(expected, disks) {
t.Error("disks' slice is different from expected")
}
}
func TestFetchTwoDisks(t *testing.T) {
l := serve(t, []byte("|/dev/hda|ST380011A|46|C||/dev/hdd|ST340016A|SLP|*|"))
defer l.Close()
disks, err := Fetch(l.Addr().String())
if err != nil {
t.Error("expecting err to be nil")
}
expected := []disk{
{
DeviceName: "hda",
Model: "ST380011A",
Temperature: 46,
Unit: "C",
},
{
DeviceName: "hdd",
Model: "ST340016A",
Temperature: 0,
Unit: "*",
Status: "SLP",
},
}
if !reflect.DeepEqual(expected, disks) {
t.Error("disks' slice is different from expected")
}
}
func serve(t *testing.T, data []byte) net.Listener {
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
go func(t *testing.T) {
conn, err := l.Accept()
if err != nil {
t.Fatal(err)
}
conn.Write(data)
conn.Close()
}(t)
return l
}

View File

@ -0,0 +1,74 @@
// +build linux
package hddtemp
import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
gohddtemp "github.com/influxdata/telegraf/plugins/inputs/hddtemp/go-hddtemp"
)
const defaultAddress = "127.0.0.1:7634"
type HDDTemp struct {
Address string
Devices []string
}
func (_ *HDDTemp) Description() string {
return "Monitor disks' temperatures using hddtemp"
}
var hddtempSampleConfig = `
## By default, telegraf gathers temps data from all disks detected by the
## hddtemp.
##
## Only collect temps from the selected disks.
##
## A * as the device name will return the temperature values of all disks.
##
# address = "127.0.0.1:7634"
# devices = ["sda", "*"]
`
func (_ *HDDTemp) SampleConfig() string {
return hddtempSampleConfig
}
func (h *HDDTemp) Gather(acc telegraf.Accumulator) error {
disks, err := gohddtemp.Fetch(h.Address)
if err != nil {
return err
}
for _, disk := range disks {
for _, chosenDevice := range h.Devices {
if chosenDevice == "*" || chosenDevice == disk.DeviceName {
tags := map[string]string{
"device": disk.DeviceName,
"model": disk.Model,
"unit": disk.Unit,
"status": disk.Status,
}
fields := map[string]interface{}{
disk.DeviceName: disk.Temperature,
}
acc.AddFields("hddtemp", fields, tags)
}
}
}
return nil
}
func init() {
inputs.Add("hddtemp", func() telegraf.Input {
return &HDDTemp{
Address: defaultAddress,
Devices: []string{"*"},
}
})
}

View File

@ -0,0 +1,3 @@
// +build !linux
package hddtemp

View File

@ -1,6 +1,6 @@
# Mesos Input Plugin # Mesos Input Plugin
This input plugin gathers metrics from Mesos (*currently only Mesos masters*). This input plugin gathers metrics from Mesos.
For more information, please check the [Mesos Observability Metrics](http://mesos.apache.org/documentation/latest/monitoring/) page. For more information, please check the [Mesos Observability Metrics](http://mesos.apache.org/documentation/latest/monitoring/) page.
### Configuration: ### Configuration:
@ -8,14 +8,41 @@ For more information, please check the [Mesos Observability Metrics](http://meso
```toml ```toml
# Telegraf plugin for gathering metrics from N Mesos masters # Telegraf plugin for gathering metrics from N Mesos masters
[[inputs.mesos]] [[inputs.mesos]]
# Timeout, in ms. ## Timeout, in ms.
timeout = 100 timeout = 100
# A list of Mesos masters, default value is localhost:5050. ## A list of Mesos masters.
masters = ["localhost:5050"] masters = ["localhost:5050"]
# Metrics groups to be collected, by default, all enabled. ## Master metrics groups to be collected, by default, all enabled.
master_collections = ["resources","master","system","slaves","frameworks","messages","evqueue","registrar"] master_collections = [
"resources",
"master",
"system",
"agents",
"frameworks",
"tasks",
"messages",
"evqueue",
"registrar",
]
## A list of Mesos slaves, default is []
# slaves = []
## Slave metrics groups to be collected, by default, all enabled.
# slave_collections = [
# "resources",
# "agent",
# "system",
# "executors",
# "tasks",
# "messages",
# ]
## Include mesos tasks statistics, default is false
# slave_tasks = true
``` ```
By dafault this plugin is not configured to gather metrics from mesos. Since mesos cluster can be deployed in numerous ways it does not provide ane default
values in that matter. User needs to specify master/slave nodes this plugin will gather metrics from. Additionally by enabling `slave_tasks` will allow
agthering metrics from takss runing on specified slaves (this options is disabled by default).
### Measurements & Fields: ### Measurements & Fields:
Mesos master metric groups Mesos master metric groups
@ -33,6 +60,12 @@ Mesos master metric groups
- master/disk_revocable_percent - master/disk_revocable_percent
- master/disk_revocable_total - master/disk_revocable_total
- master/disk_revocable_used - master/disk_revocable_used
- master/gpus_percent
- master/gpus_used
- master/gpus_total
- master/gpus_revocable_percent
- master/gpus_revocable_total
- master/gpus_revocable_used
- master/mem_percent - master/mem_percent
- master/mem_used - master/mem_used
- master/mem_total - master/mem_total
@ -136,17 +169,111 @@ Mesos master metric groups
- registrar/state_store_ms/p999 - registrar/state_store_ms/p999
- registrar/state_store_ms/p9999 - registrar/state_store_ms/p9999
Mesos slave metric groups
- resources
- slave/cpus_percent
- slave/cpus_used
- slave/cpus_total
- slave/cpus_revocable_percent
- slave/cpus_revocable_total
- slave/cpus_revocable_used
- slave/disk_percent
- slave/disk_used
- slave/disk_total
- slave/disk_revocable_percent
- slave/disk_revocable_total
- slave/disk_revocable_used
- slave/gpus_percent
- slave/gpus_used
- slave/gpus_total,
- slave/gpus_revocable_percent
- slave/gpus_revocable_total
- slave/gpus_revocable_used
- slave/mem_percent
- slave/mem_used
- slave/mem_total
- slave/mem_revocable_percent
- slave/mem_revocable_total
- slave/mem_revocable_used
- agent
- slave/registered
- slave/uptime_secs
- system
- system/cpus_total
- system/load_15min
- system/load_5min
- system/load_1min
- system/mem_free_bytes
- system/mem_total_bytes
- executors
- containerizer/mesos/container_destroy_errors
- slave/container_launch_errors
- slave/executors_preempted
- slave/frameworks_active
- slave/executor_directory_max_allowed_age_secs
- slave/executors_registering
- slave/executors_running
- slave/executors_terminated
- slave/executors_terminating
- slave/recovery_errors
- tasks
- slave/tasks_failed
- slave/tasks_finished
- slave/tasks_killed
- slave/tasks_lost
- slave/tasks_running
- slave/tasks_staging
- slave/tasks_starting
- messages
- slave/invalid_framework_messages
- slave/invalid_status_updates
- slave/valid_framework_messages
- slave/valid_status_updates
Mesos tasks metric groups
- executor_id
- executor_name
- framework_id
- source
- statistics (all metrics below will have `statistics_` prefix included in their names
- cpus_limit
- cpus_system_time_secs
- cpus_user_time_secs
- mem_anon_bytes
- mem_cache_bytes
- mem_critical_pressure_counter
- mem_file_bytes
- mem_limit_bytes
- mem_low_pressure_counter
- mem_mapped_file_bytes
- mem_medium_pressure_counter
- mem_rss_bytes
- mem_swap_bytes
- mem_total_bytes
- mem_total_memsw_bytes
- mem_unevictable_bytes
- timestamp
### Tags: ### Tags:
- All measurements have the following tags: - All master/slave measurements have the following tags:
- server
- role (master/slave)
- Tasks measurements have the following tags:
- server - server
### Example Output: ### Example Output:
``` ```
$ telegraf -config ~/mesos.conf -input-filter mesos -test $ telegraf -config ~/mesos.conf -input-filter mesos -test
* Plugin: mesos, Collection 1 * Plugin: mesos, Collection 1
mesos,server=172.17.8.101 allocator/event_queue_dispatches=0,master/cpus_percent=0, mesos,host=172.17.8.102,server=172.17.8.101 allocator/event_queue_dispatches=0,master/cpus_percent=0,
master/cpus_revocable_percent=0,master/cpus_revocable_total=0, master/cpus_revocable_percent=0,master/cpus_revocable_total=0,
master/cpus_revocable_used=0,master/cpus_total=2, master/cpus_revocable_used=0,master/cpus_total=2,
master/cpus_used=0,master/disk_percent=0,master/disk_revocable_percent=0, master/cpus_used=0,master/disk_percent=0,master/disk_revocable_percent=0,
@ -163,3 +290,16 @@ master/mem_revocable_used=0,master/mem_total=1002,
master/mem_used=0,master/messages_authenticate=0, master/mem_used=0,master/messages_authenticate=0,
master/messages_deactivate_framework=0 ... master/messages_deactivate_framework=0 ...
``` ```
Meoso tasks metrics (if enabled):
```
mesos-tasks,host=172.17.8.102,server=172.17.8.101,task_id=hello-world.e4b5b497-2ccd-11e6-a659-0242fb222ce2
statistics_cpus_limit=0.2,statistics_cpus_system_time_secs=142.49,statistics_cpus_user_time_secs=388.14,
statistics_mem_anon_bytes=359129088,statistics_mem_cache_bytes=3964928,
statistics_mem_critical_pressure_counter=0,statistics_mem_file_bytes=3964928,
statistics_mem_limit_bytes=767557632,statistics_mem_low_pressure_counter=0,
statistics_mem_mapped_file_bytes=114688,statistics_mem_medium_pressure_counter=0,
statistics_mem_rss_bytes=359129088,statistics_mem_swap_bytes=0,statistics_mem_total_bytes=363094016,
statistics_mem_total_memsw_bytes=363094016,statistics_mem_unevictable_bytes=0,
statistics_timestamp=1465486052.70525 1465486053052811792...
```

View File

@ -17,33 +17,57 @@ import (
jsonparser "github.com/influxdata/telegraf/plugins/parsers/json" jsonparser "github.com/influxdata/telegraf/plugins/parsers/json"
) )
type Role string
const (
MASTER Role = "master"
SLAVE = "slave"
)
type Mesos struct { type Mesos struct {
Timeout int Timeout int
Masters []string Masters []string
MasterCols []string `toml:"master_collections"` MasterCols []string `toml:"master_collections"`
Slaves []string
SlaveCols []string `toml:"slave_collections"`
SlaveTasks bool
} }
var defaultMetrics = []string{ var allMetrics = map[Role][]string{
"resources", "master", "system", "slaves", "frameworks", MASTER: []string{"resources", "master", "system", "agents", "frameworks", "tasks", "messages", "evqueue", "registrar"},
"tasks", "messages", "evqueue", "messages", "registrar", SLAVE: []string{"resources", "agent", "system", "executors", "tasks", "messages"},
} }
var sampleConfig = ` var sampleConfig = `
# Timeout, in ms. ## Timeout, in ms.
timeout = 100 timeout = 100
# A list of Mesos masters, default value is localhost:5050. ## A list of Mesos masters.
masters = ["localhost:5050"] masters = ["localhost:5050"]
# Metrics groups to be collected, by default, all enabled. ## Master metrics groups to be collected, by default, all enabled.
master_collections = [ master_collections = [
"resources", "resources",
"master", "master",
"system", "system",
"slaves", "agents",
"frameworks", "frameworks",
"tasks",
"messages", "messages",
"evqueue", "evqueue",
"registrar", "registrar",
] ]
## A list of Mesos slaves, default is []
# slaves = []
## Slave metrics groups to be collected, by default, all enabled.
# slave_collections = [
# "resources",
# "agent",
# "system",
# "executors",
# "tasks",
# "messages",
# ]
## Include mesos tasks statistics, default is false
# slave_tasks = true
` `
// SampleConfig returns a sample configuration block // SampleConfig returns a sample configuration block
@ -56,21 +80,54 @@ func (m *Mesos) Description() string {
return "Telegraf plugin for gathering metrics from N Mesos masters" return "Telegraf plugin for gathering metrics from N Mesos masters"
} }
func (m *Mesos) SetDefaults() {
if len(m.MasterCols) == 0 {
m.MasterCols = allMetrics[MASTER]
}
if len(m.SlaveCols) == 0 {
m.SlaveCols = allMetrics[SLAVE]
}
if m.Timeout == 0 {
log.Println("[mesos] Missing timeout value, setting default value (100ms)")
m.Timeout = 100
}
}
// Gather() metrics from given list of Mesos Masters // Gather() metrics from given list of Mesos Masters
func (m *Mesos) Gather(acc telegraf.Accumulator) error { func (m *Mesos) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup var wg sync.WaitGroup
var errorChannel chan error var errorChannel chan error
if len(m.Masters) == 0 { m.SetDefaults()
m.Masters = []string{"localhost:5050"}
}
errorChannel = make(chan error, len(m.Masters)*2) errorChannel = make(chan error, len(m.Masters)+2*len(m.Slaves))
for _, v := range m.Masters { for _, v := range m.Masters {
wg.Add(1) wg.Add(1)
go func(c string) { go func(c string) {
errorChannel <- m.gatherMetrics(c, acc) errorChannel <- m.gatherMainMetrics(c, ":5050", MASTER, acc)
wg.Done()
return
}(v)
}
for _, v := range m.Slaves {
wg.Add(1)
go func(c string) {
errorChannel <- m.gatherMainMetrics(c, ":5051", MASTER, acc)
wg.Done()
return
}(v)
if !m.SlaveTasks {
continue
}
wg.Add(1)
go func(c string) {
errorChannel <- m.gatherSlaveTaskMetrics(c, ":5051", acc)
wg.Done() wg.Done()
return return
}(v) }(v)
@ -94,7 +151,7 @@ func (m *Mesos) Gather(acc telegraf.Accumulator) error {
} }
// metricsDiff() returns set names for removal // metricsDiff() returns set names for removal
func metricsDiff(w []string) []string { func metricsDiff(role Role, w []string) []string {
b := []string{} b := []string{}
s := make(map[string]bool) s := make(map[string]bool)
@ -106,7 +163,7 @@ func metricsDiff(w []string) []string {
s[v] = true s[v] = true
} }
for _, d := range defaultMetrics { for _, d := range allMetrics[role] {
if _, ok := s[d]; !ok { if _, ok := s[d]; !ok {
b = append(b, d) b = append(b, d)
} }
@ -116,11 +173,12 @@ func metricsDiff(w []string) []string {
} }
// masterBlocks serves as kind of metrics registry groupping them in sets // masterBlocks serves as kind of metrics registry groupping them in sets
func masterBlocks(g string) []string { func getMetrics(role Role, group string) []string {
var m map[string][]string var m map[string][]string
m = make(map[string][]string) m = make(map[string][]string)
if role == MASTER {
m["resources"] = []string{ m["resources"] = []string{
"master/cpus_percent", "master/cpus_percent",
"master/cpus_used", "master/cpus_used",
@ -134,6 +192,12 @@ func masterBlocks(g string) []string {
"master/disk_revocable_percent", "master/disk_revocable_percent",
"master/disk_revocable_total", "master/disk_revocable_total",
"master/disk_revocable_used", "master/disk_revocable_used",
"master/gpus_percent",
"master/gpus_used",
"master/gpus_total",
"master/gpus_revocable_percent",
"master/gpus_revocable_total",
"master/gpus_revocable_used",
"master/mem_percent", "master/mem_percent",
"master/mem_used", "master/mem_used",
"master/mem_total", "master/mem_total",
@ -156,7 +220,7 @@ func masterBlocks(g string) []string {
"system/mem_total_bytes", "system/mem_total_bytes",
} }
m["slaves"] = []string{ m["agents"] = []string{
"master/slave_registrations", "master/slave_registrations",
"master/slave_removals", "master/slave_removals",
"master/slave_reregistrations", "master/slave_reregistrations",
@ -245,27 +309,103 @@ func masterBlocks(g string) []string {
"registrar/state_store_ms/p999", "registrar/state_store_ms/p999",
"registrar/state_store_ms/p9999", "registrar/state_store_ms/p9999",
} }
} else if role == SLAVE {
m["resources"] = []string{
"slave/cpus_percent",
"slave/cpus_used",
"slave/cpus_total",
"slave/cpus_revocable_percent",
"slave/cpus_revocable_total",
"slave/cpus_revocable_used",
"slave/disk_percent",
"slave/disk_used",
"slave/disk_total",
"slave/disk_revocable_percent",
"slave/disk_revocable_total",
"slave/disk_revocable_used",
"slave/gpus_percent",
"slave/gpus_used",
"slave/gpus_total",
"slave/gpus_revocable_percent",
"slave/gpus_revocable_total",
"slave/gpus_revocable_used",
"slave/mem_percent",
"slave/mem_used",
"slave/mem_total",
"slave/mem_revocable_percent",
"slave/mem_revocable_total",
"slave/mem_revocable_used",
}
ret, ok := m[g] m["agent"] = []string{
"slave/registered",
"slave/uptime_secs",
}
m["system"] = []string{
"system/cpus_total",
"system/load_15min",
"system/load_5min",
"system/load_1min",
"system/mem_free_bytes",
"system/mem_total_bytes",
}
m["executors"] = []string{
"containerizer/mesos/container_destroy_errors",
"slave/container_launch_errors",
"slave/executors_preempted",
"slave/frameworks_active",
"slave/executor_directory_max_allowed_age_secs",
"slave/executors_registering",
"slave/executors_running",
"slave/executors_terminated",
"slave/executors_terminating",
"slave/recovery_errors",
}
m["tasks"] = []string{
"slave/tasks_failed",
"slave/tasks_finished",
"slave/tasks_killed",
"slave/tasks_lost",
"slave/tasks_running",
"slave/tasks_staging",
"slave/tasks_starting",
}
m["messages"] = []string{
"slave/invalid_framework_messages",
"slave/invalid_status_updates",
"slave/valid_framework_messages",
"slave/valid_status_updates",
}
}
ret, ok := m[group]
if !ok { if !ok {
log.Println("[mesos] Unkown metrics group: ", g) log.Printf("[mesos] Unkown %s metrics group: %s\n", role, group)
return []string{} return []string{}
} }
return ret return ret
} }
// removeGroup(), remove unwanted sets func (m *Mesos) filterMetrics(role Role, metrics *map[string]interface{}) {
func (m *Mesos) removeGroup(j *map[string]interface{}) {
var ok bool var ok bool
var selectedMetrics []string
b := metricsDiff(m.MasterCols) if role == MASTER {
selectedMetrics = m.MasterCols
} else if role == SLAVE {
selectedMetrics = m.SlaveCols
}
for _, k := range b { for _, k := range metricsDiff(role, selectedMetrics) {
for _, v := range masterBlocks(k) { for _, v := range getMetrics(role, k) {
if _, ok = (*j)[v]; ok { if _, ok = (*metrics)[v]; ok {
delete((*j), v) delete((*metrics), v)
} }
} }
} }
@ -280,23 +420,66 @@ var client = &http.Client{
Timeout: time.Duration(4 * time.Second), Timeout: time.Duration(4 * time.Second),
} }
// This should not belong to the object func (m *Mesos) gatherSlaveTaskMetrics(address string, defaultPort string, acc telegraf.Accumulator) error {
func (m *Mesos) gatherMetrics(a string, acc telegraf.Accumulator) error { var metrics []map[string]interface{}
var jsonOut map[string]interface{}
host, _, err := net.SplitHostPort(a) host, _, err := net.SplitHostPort(address)
if err != nil { if err != nil {
host = a host = address
a = a + ":5050" address = address + defaultPort
} }
tags := map[string]string{ tags := map[string]string{
"server": host, "server": host,
} }
if m.Timeout == 0 { ts := strconv.Itoa(m.Timeout) + "ms"
log.Println("[mesos] Missing timeout value, setting default value (100ms)")
m.Timeout = 100 resp, err := client.Get("http://" + address + "/monitor/statistics?timeout=" + ts)
if err != nil {
return err
}
data, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
return err
}
if err = json.Unmarshal([]byte(data), &metrics); err != nil {
return errors.New("Error decoding JSON response")
}
for _, task := range metrics {
tags["task_id"] = task["executor_id"].(string)
jf := jsonparser.JSONFlattener{}
err = jf.FlattenJSON("", task)
if err != nil {
return err
}
acc.AddFields("mesos-tasks", jf.Fields, tags)
}
return nil
}
// This should not belong to the object
func (m *Mesos) gatherMainMetrics(a string, defaultPort string, role Role, acc telegraf.Accumulator) error {
var jsonOut map[string]interface{}
host, _, err := net.SplitHostPort(a)
if err != nil {
host = a
a = a + defaultPort
}
tags := map[string]string{
"server": host,
"role": string(role),
} }
ts := strconv.Itoa(m.Timeout) + "ms" ts := strconv.Itoa(m.Timeout) + "ms"
@ -317,7 +500,7 @@ func (m *Mesos) gatherMetrics(a string, acc telegraf.Accumulator) error {
return errors.New("Error decoding JSON response") return errors.New("Error decoding JSON response")
} }
m.removeGroup(&jsonOut) m.filterMetrics(role, &jsonOut)
jf := jsonparser.JSONFlattener{} jf := jsonparser.JSONFlattener{}

View File

@ -2,70 +2,275 @@ package mesos
import ( import (
"encoding/json" "encoding/json"
"fmt"
"math/rand" "math/rand"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"os" "os"
"testing" "testing"
jsonparser "github.com/influxdata/telegraf/plugins/parsers/json"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
) )
var mesosMetrics map[string]interface{} var masterMetrics map[string]interface{}
var ts *httptest.Server var masterTestServer *httptest.Server
var slaveMetrics map[string]interface{}
var slaveTaskMetrics map[string]interface{}
var slaveTestServer *httptest.Server
func randUUID() string {
b := make([]byte, 16)
rand.Read(b)
return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:])
}
func generateMetrics() { func generateMetrics() {
mesosMetrics = make(map[string]interface{}) masterMetrics = make(map[string]interface{})
metricNames := []string{"master/cpus_percent", "master/cpus_used", "master/cpus_total", metricNames := []string{
"master/cpus_revocable_percent", "master/cpus_revocable_total", "master/cpus_revocable_used", // resources
"master/disk_percent", "master/disk_used", "master/disk_total", "master/disk_revocable_percent", "master/cpus_percent",
"master/disk_revocable_total", "master/disk_revocable_used", "master/mem_percent", "master/cpus_used",
"master/mem_used", "master/mem_total", "master/mem_revocable_percent", "master/mem_revocable_total", "master/cpus_total",
"master/mem_revocable_used", "master/elected", "master/uptime_secs", "system/cpus_total", "master/cpus_revocable_percent",
"system/load_15min", "system/load_5min", "system/load_1min", "system/mem_free_bytes", "master/cpus_revocable_total",
"system/mem_total_bytes", "master/slave_registrations", "master/slave_removals", "master/cpus_revocable_used",
"master/slave_reregistrations", "master/slave_shutdowns_scheduled", "master/slave_shutdowns_canceled", "master/disk_percent",
"master/slave_shutdowns_completed", "master/slaves_active", "master/slaves_connected", "master/disk_used",
"master/slaves_disconnected", "master/slaves_inactive", "master/frameworks_active", "master/disk_total",
"master/frameworks_connected", "master/frameworks_disconnected", "master/frameworks_inactive", "master/disk_revocable_percent",
"master/outstanding_offers", "master/tasks_error", "master/tasks_failed", "master/tasks_finished", "master/disk_revocable_total",
"master/tasks_killed", "master/tasks_lost", "master/tasks_running", "master/tasks_staging", "master/disk_revocable_used",
"master/tasks_starting", "master/invalid_executor_to_framework_messages", "master/invalid_framework_to_executor_messages", "master/gpus_percent",
"master/invalid_status_update_acknowledgements", "master/invalid_status_updates", "master/gpus_used",
"master/dropped_messages", "master/messages_authenticate", "master/messages_deactivate_framework", "master/gpus_total",
"master/messages_decline_offers", "master/messages_executor_to_framework", "master/messages_exited_executor", "master/gpus_revocable_percent",
"master/messages_framework_to_executor", "master/messages_kill_task", "master/messages_launch_tasks", "master/gpus_revocable_total",
"master/messages_reconcile_tasks", "master/messages_register_framework", "master/messages_register_slave", "master/gpus_revocable_used",
"master/messages_reregister_framework", "master/messages_reregister_slave", "master/messages_resource_request", "master/mem_percent",
"master/messages_revive_offers", "master/messages_status_update", "master/messages_status_update_acknowledgement", "master/mem_used",
"master/messages_unregister_framework", "master/messages_unregister_slave", "master/messages_update_slave", "master/mem_total",
"master/recovery_slave_removals", "master/slave_removals/reason_registered", "master/slave_removals/reason_unhealthy", "master/mem_revocable_percent",
"master/slave_removals/reason_unregistered", "master/valid_framework_to_executor_messages", "master/valid_status_update_acknowledgements", "master/mem_revocable_total",
"master/valid_status_updates", "master/task_lost/source_master/reason_invalid_offers", "master/mem_revocable_used",
"master/task_lost/source_master/reason_slave_removed", "master/task_lost/source_slave/reason_executor_terminated", // master
"master/valid_executor_to_framework_messages", "master/event_queue_dispatches", "master/elected",
"master/event_queue_http_requests", "master/event_queue_messages", "registrar/state_fetch_ms", "master/uptime_secs",
"registrar/state_store_ms", "registrar/state_store_ms/max", "registrar/state_store_ms/min", // system
"registrar/state_store_ms/p50", "registrar/state_store_ms/p90", "registrar/state_store_ms/p95", "system/cpus_total",
"registrar/state_store_ms/p99", "registrar/state_store_ms/p999", "registrar/state_store_ms/p9999"} "system/load_15min",
"system/load_5min",
"system/load_1min",
"system/mem_free_bytes",
"system/mem_total_bytes",
// agents
"master/slave_registrations",
"master/slave_removals",
"master/slave_reregistrations",
"master/slave_shutdowns_scheduled",
"master/slave_shutdowns_canceled",
"master/slave_shutdowns_completed",
"master/slaves_active",
"master/slaves_connected",
"master/slaves_disconnected",
"master/slaves_inactive",
// frameworks
"master/frameworks_active",
"master/frameworks_connected",
"master/frameworks_disconnected",
"master/frameworks_inactive",
"master/outstanding_offers",
// tasks
"master/tasks_error",
"master/tasks_failed",
"master/tasks_finished",
"master/tasks_killed",
"master/tasks_lost",
"master/tasks_running",
"master/tasks_staging",
"master/tasks_starting",
// messages
"master/invalid_executor_to_framework_messages",
"master/invalid_framework_to_executor_messages",
"master/invalid_status_update_acknowledgements",
"master/invalid_status_updates",
"master/dropped_messages",
"master/messages_authenticate",
"master/messages_deactivate_framework",
"master/messages_decline_offers",
"master/messages_executor_to_framework",
"master/messages_exited_executor",
"master/messages_framework_to_executor",
"master/messages_kill_task",
"master/messages_launch_tasks",
"master/messages_reconcile_tasks",
"master/messages_register_framework",
"master/messages_register_slave",
"master/messages_reregister_framework",
"master/messages_reregister_slave",
"master/messages_resource_request",
"master/messages_revive_offers",
"master/messages_status_update",
"master/messages_status_update_acknowledgement",
"master/messages_unregister_framework",
"master/messages_unregister_slave",
"master/messages_update_slave",
"master/recovery_slave_removals",
"master/slave_removals/reason_registered",
"master/slave_removals/reason_unhealthy",
"master/slave_removals/reason_unregistered",
"master/valid_framework_to_executor_messages",
"master/valid_status_update_acknowledgements",
"master/valid_status_updates",
"master/task_lost/source_master/reason_invalid_offers",
"master/task_lost/source_master/reason_slave_removed",
"master/task_lost/source_slave/reason_executor_terminated",
"master/valid_executor_to_framework_messages",
// evgqueue
"master/event_queue_dispatches",
"master/event_queue_http_requests",
"master/event_queue_messages",
// registrar
"registrar/state_fetch_ms",
"registrar/state_store_ms",
"registrar/state_store_ms/max",
"registrar/state_store_ms/min",
"registrar/state_store_ms/p50",
"registrar/state_store_ms/p90",
"registrar/state_store_ms/p95",
"registrar/state_store_ms/p99",
"registrar/state_store_ms/p999",
"registrar/state_store_ms/p9999",
}
for _, k := range metricNames { for _, k := range metricNames {
mesosMetrics[k] = rand.Float64() masterMetrics[k] = rand.Float64()
}
slaveMetrics = make(map[string]interface{})
metricNames = []string{
// resources
"slave/cpus_percent",
"slave/cpus_used",
"slave/cpus_total",
"slave/cpus_revocable_percent",
"slave/cpus_revocable_total",
"slave/cpus_revocable_used",
"slave/disk_percent",
"slave/disk_used",
"slave/disk_total",
"slave/disk_revocable_percent",
"slave/disk_revocable_total",
"slave/disk_revocable_used",
"slave/gpus_percent",
"slave/gpus_used",
"slave/gpus_total",
"slave/gpus_revocable_percent",
"slave/gpus_revocable_total",
"slave/gpus_revocable_used",
"slave/mem_percent",
"slave/mem_used",
"slave/mem_total",
"slave/mem_revocable_percent",
"slave/mem_revocable_total",
"slave/mem_revocable_used",
// agent
"slave/registered",
"slave/uptime_secs",
// system
"system/cpus_total",
"system/load_15min",
"system/load_5min",
"system/load_1min",
"system/mem_free_bytes",
"system/mem_total_bytes",
// executors
"containerizer/mesos/container_destroy_errors",
"slave/container_launch_errors",
"slave/executors_preempted",
"slave/frameworks_active",
"slave/executor_directory_max_allowed_age_secs",
"slave/executors_registering",
"slave/executors_running",
"slave/executors_terminated",
"slave/executors_terminating",
"slave/recovery_errors",
// tasks
"slave/tasks_failed",
"slave/tasks_finished",
"slave/tasks_killed",
"slave/tasks_lost",
"slave/tasks_running",
"slave/tasks_staging",
"slave/tasks_starting",
// messages
"slave/invalid_framework_messages",
"slave/invalid_status_updates",
"slave/valid_framework_messages",
"slave/valid_status_updates",
}
for _, k := range metricNames {
slaveMetrics[k] = rand.Float64()
}
slaveTaskMetrics = map[string]interface{}{
"executor_id": fmt.Sprintf("task_%s", randUUID()),
"executor_name": "Some task description",
"framework_id": randUUID(),
"source": fmt.Sprintf("task_source_%s", randUUID()),
"statistics": map[string]interface{}{
"cpus_limit": rand.Float64(),
"cpus_system_time_secs": rand.Float64(),
"cpus_user_time_secs": rand.Float64(),
"mem_anon_bytes": float64(rand.Int63()),
"mem_cache_bytes": float64(rand.Int63()),
"mem_critical_pressure_counter": float64(rand.Int63()),
"mem_file_bytes": float64(rand.Int63()),
"mem_limit_bytes": float64(rand.Int63()),
"mem_low_pressure_counter": float64(rand.Int63()),
"mem_mapped_file_bytes": float64(rand.Int63()),
"mem_medium_pressure_counter": float64(rand.Int63()),
"mem_rss_bytes": float64(rand.Int63()),
"mem_swap_bytes": float64(rand.Int63()),
"mem_total_bytes": float64(rand.Int63()),
"mem_total_memsw_bytes": float64(rand.Int63()),
"mem_unevictable_bytes": float64(rand.Int63()),
"timestamp": rand.Float64(),
},
} }
} }
func TestMain(m *testing.M) { func TestMain(m *testing.M) {
generateMetrics() generateMetrics()
r := http.NewServeMux()
r.HandleFunc("/metrics/snapshot", func(w http.ResponseWriter, r *http.Request) { masterRouter := http.NewServeMux()
masterRouter.HandleFunc("/metrics/snapshot", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(mesosMetrics) json.NewEncoder(w).Encode(masterMetrics)
}) })
ts = httptest.NewServer(r) masterTestServer = httptest.NewServer(masterRouter)
slaveRouter := http.NewServeMux()
slaveRouter.HandleFunc("/metrics/snapshot", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(slaveMetrics)
})
slaveRouter.HandleFunc("/monitor/statistics", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode([]map[string]interface{}{slaveTaskMetrics})
})
slaveTestServer = httptest.NewServer(slaveRouter)
rc := m.Run() rc := m.Run()
ts.Close()
masterTestServer.Close()
slaveTestServer.Close()
os.Exit(rc) os.Exit(rc)
} }
@ -73,7 +278,7 @@ func TestMesosMaster(t *testing.T) {
var acc testutil.Accumulator var acc testutil.Accumulator
m := Mesos{ m := Mesos{
Masters: []string{ts.Listener.Addr().String()}, Masters: []string{masterTestServer.Listener.Addr().String()},
Timeout: 10, Timeout: 10,
} }
@ -83,34 +288,88 @@ func TestMesosMaster(t *testing.T) {
t.Errorf(err.Error()) t.Errorf(err.Error())
} }
acc.AssertContainsFields(t, "mesos", mesosMetrics) acc.AssertContainsFields(t, "mesos", masterMetrics)
} }
func TestRemoveGroup(t *testing.T) { func TestMasterFilter(t *testing.T) {
generateMetrics()
m := Mesos{ m := Mesos{
MasterCols: []string{ MasterCols: []string{
"resources", "master", "registrar", "resources", "master", "registrar",
}, },
} }
b := []string{ b := []string{
"system", "slaves", "frameworks", "system", "agents", "frameworks",
"messages", "evqueue", "messages", "evqueue", "tasks",
} }
m.removeGroup(&mesosMetrics) m.filterMetrics(MASTER, &masterMetrics)
for _, v := range b { for _, v := range b {
for _, x := range masterBlocks(v) { for _, x := range getMetrics(MASTER, v) {
if _, ok := mesosMetrics[x]; ok { if _, ok := masterMetrics[x]; ok {
t.Errorf("Found key %s, it should be gone.", x) t.Errorf("Found key %s, it should be gone.", x)
} }
} }
} }
for _, v := range m.MasterCols { for _, v := range m.MasterCols {
for _, x := range masterBlocks(v) { for _, x := range getMetrics(MASTER, v) {
if _, ok := mesosMetrics[x]; !ok { if _, ok := masterMetrics[x]; !ok {
t.Errorf("Didn't find key %s, it should present.", x)
}
}
}
}
func TestMesosSlave(t *testing.T) {
var acc testutil.Accumulator
m := Mesos{
Masters: []string{},
Slaves: []string{slaveTestServer.Listener.Addr().String()},
SlaveTasks: true,
Timeout: 10,
}
err := m.Gather(&acc)
if err != nil {
t.Errorf(err.Error())
}
acc.AssertContainsFields(t, "mesos", slaveMetrics)
jf := jsonparser.JSONFlattener{}
err = jf.FlattenJSON("", slaveTaskMetrics)
if err != nil {
t.Errorf(err.Error())
}
acc.AssertContainsFields(t, "mesos-tasks", jf.Fields)
}
func TestSlaveFilter(t *testing.T) {
m := Mesos{
SlaveCols: []string{
"resources", "agent", "tasks",
},
}
b := []string{
"system", "executors", "messages",
}
m.filterMetrics(SLAVE, &slaveMetrics)
for _, v := range b {
for _, x := range getMetrics(SLAVE, v) {
if _, ok := slaveMetrics[x]; ok {
t.Errorf("Found key %s, it should be gone.", x)
}
}
}
for _, v := range m.MasterCols {
for _, x := range getMetrics(SLAVE, v) {
if _, ok := slaveMetrics[x]; !ok {
t.Errorf("Didn't find key %s, it should present.", x) t.Errorf("Didn't find key %s, it should present.", x)
} }
} }

View File

@ -1,3 +1,210 @@
// +build windows // +build windows
package ping package ping
import (
"errors"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
"os/exec"
"regexp"
"strconv"
"strings"
"sync"
"time"
)
// HostPinger is a function that runs the "ping" function using a list of
// passed arguments. This can be easily switched with a mocked ping function
// for unit test purposes (see ping_test.go)
type HostPinger func(timeout float64, args ...string) (string, error)
type Ping struct {
// Number of pings to send (ping -c <COUNT>)
Count int
// Ping timeout, in seconds. 0 means no timeout (ping -W <TIMEOUT>)
Timeout float64
// URLs to ping
Urls []string
// host ping function
pingHost HostPinger
}
func (s *Ping) Description() string {
return "Ping given url(s) and return statistics"
}
const sampleConfig = `
## urls to ping
urls = ["www.google.com"] # required
## number of pings to send per collection (ping -n <COUNT>)
count = 4 # required
## Ping timeout, in seconds. 0 means default timeout (ping -w <TIMEOUT>)
Timeout = 0
`
func (s *Ping) SampleConfig() string {
return sampleConfig
}
func hostPinger(timeout float64, args ...string) (string, error) {
bin, err := exec.LookPath("ping")
if err != nil {
return "", err
}
c := exec.Command(bin, args...)
out, err := internal.CombinedOutputTimeout(c,
time.Second*time.Duration(timeout+1))
return string(out), err
}
// processPingOutput takes in a string output from the ping command
// based on linux implementation but using regex ( multilanguage support ) ( shouldn't affect the performance of the program )
// It returns (<transmitted packets>, <received packets>, <average response>, <min response>, <max response>)
func processPingOutput(out string) (int, int, int, int, int, error) {
// So find a line contain 3 numbers except reply lines
var stats, aproxs []string = nil, nil
err := errors.New("Fatal error processing ping output")
stat := regexp.MustCompile(`=\W*(\d+)\D*=\W*(\d+)\D*=\W*(\d+)`)
aprox := regexp.MustCompile(`=\W*(\d+)\D*ms\D*=\W*(\d+)\D*ms\D*=\W*(\d+)\D*ms`)
lines := strings.Split(out, "\n")
for _, line := range lines {
if !strings.Contains(line, "TTL") {
if stats == nil {
stats = stat.FindStringSubmatch(line)
}
if stats != nil && aproxs == nil {
aproxs = aprox.FindStringSubmatch(line)
}
}
}
// stats data should contain 4 members: entireExpression + ( Send, Receive, Lost )
if len(stats) != 4 {
return 0, 0, 0, 0, 0, err
}
trans, err := strconv.Atoi(stats[1])
if err != nil {
return 0, 0, 0, 0, 0, err
}
rec, err := strconv.Atoi(stats[2])
if err != nil {
return 0, 0, 0, 0, 0, err
}
// aproxs data should contain 4 members: entireExpression + ( min, max, avg )
if len(aproxs) != 4 {
return trans, rec, 0, 0, 0, err
}
min, err := strconv.Atoi(aproxs[1])
if err != nil {
return trans, rec, 0, 0, 0, err
}
max, err := strconv.Atoi(aproxs[2])
if err != nil {
return trans, rec, 0, 0, 0, err
}
avg, err := strconv.Atoi(aproxs[3])
if err != nil {
return 0, 0, 0, 0, 0, err
}
return trans, rec, avg, min, max, err
}
func (p *Ping) timeout() float64 {
// According to MSDN, default ping timeout for windows is 4 second
// Add also one second interval
if p.Timeout > 0 {
return p.Timeout + 1
}
return 4 + 1
}
// args returns the arguments for the 'ping' executable
func (p *Ping) args(url string) []string {
args := []string{"-n", strconv.Itoa(p.Count)}
if p.Timeout > 0 {
args = append(args, "-w", strconv.FormatFloat(p.Timeout*1000, 'f', 0, 64))
}
args = append(args, url)
return args
}
func (p *Ping) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup
errorChannel := make(chan error, len(p.Urls)*2)
var pendingError error = nil
// Spin off a go routine for each url to ping
for _, url := range p.Urls {
wg.Add(1)
go func(u string) {
defer wg.Done()
args := p.args(u)
totalTimeout := p.timeout() * float64(p.Count)
out, err := p.pingHost(totalTimeout, args...)
// ping host return exitcode != 0 also when there was no response from host
// but command was execute succesfully
if err != nil {
// Combine go err + stderr output
pendingError = errors.New(strings.TrimSpace(out) + ", " + err.Error())
}
tags := map[string]string{"url": u}
trans, rec, avg, min, max, err := processPingOutput(out)
if err != nil {
// fatal error
if pendingError != nil {
errorChannel <- pendingError
}
errorChannel <- err
return
}
// Calculate packet loss percentage
loss := float64(trans-rec) / float64(trans) * 100.0
fields := map[string]interface{}{
"packets_transmitted": trans,
"packets_received": rec,
"percent_packet_loss": loss,
}
if avg > 0 {
fields["average_response_ms"] = avg
}
if min > 0 {
fields["minimum_response_ms"] = min
}
if max > 0 {
fields["maximum_response_ms"] = max
}
acc.AddFields("ping", fields, tags)
}(url)
}
wg.Wait()
close(errorChannel)
// Get all errors and return them as one giant error
errorStrings := []string{}
for err := range errorChannel {
errorStrings = append(errorStrings, err.Error())
}
if len(errorStrings) == 0 {
return nil
}
return errors.New(strings.Join(errorStrings, "\n"))
}
func init() {
inputs.Add("ping", func() telegraf.Input {
return &Ping{pingHost: hostPinger}
})
}

View File

@ -0,0 +1,218 @@
// +build windows
package ping
import (
"errors"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"testing"
)
// Windows ping format ( should support multilanguage ?)
var winPLPingOutput = `
Badanie 8.8.8.8 z 32 bajtami danych:
Odpowiedz z 8.8.8.8: bajtow=32 czas=49ms TTL=43
Odpowiedz z 8.8.8.8: bajtow=32 czas=46ms TTL=43
Odpowiedz z 8.8.8.8: bajtow=32 czas=48ms TTL=43
Odpowiedz z 8.8.8.8: bajtow=32 czas=57ms TTL=43
Statystyka badania ping dla 8.8.8.8:
Pakiety: Wyslane = 4, Odebrane = 4, Utracone = 0
(0% straty),
Szacunkowy czas bladzenia pakietww w millisekundach:
Minimum = 46 ms, Maksimum = 57 ms, Czas sredni = 50 ms
`
// Windows ping format ( should support multilanguage ?)
var winENPingOutput = `
Pinging 8.8.8.8 with 32 bytes of data:
Reply from 8.8.8.8: bytes=32 time=52ms TTL=43
Reply from 8.8.8.8: bytes=32 time=50ms TTL=43
Reply from 8.8.8.8: bytes=32 time=50ms TTL=43
Reply from 8.8.8.8: bytes=32 time=51ms TTL=43
Ping statistics for 8.8.8.8:
Packets: Sent = 4, Received = 4, Lost = 0 (0% loss),
Approximate round trip times in milli-seconds:
Minimum = 50ms, Maximum = 52ms, Average = 50ms
`
func TestHost(t *testing.T) {
trans, rec, avg, min, max, err := processPingOutput(winPLPingOutput)
assert.NoError(t, err)
assert.Equal(t, 4, trans, "4 packets were transmitted")
assert.Equal(t, 4, rec, "4 packets were received")
assert.Equal(t, 50, avg, "Average 50")
assert.Equal(t, 46, min, "Min 46")
assert.Equal(t, 57, max, "max 57")
trans, rec, avg, min, max, err = processPingOutput(winENPingOutput)
assert.NoError(t, err)
assert.Equal(t, 4, trans, "4 packets were transmitted")
assert.Equal(t, 4, rec, "4 packets were received")
assert.Equal(t, 50, avg, "Average 50")
assert.Equal(t, 50, min, "Min 50")
assert.Equal(t, 52, max, "Max 52")
}
func mockHostPinger(timeout float64, args ...string) (string, error) {
return winENPingOutput, nil
}
// Test that Gather function works on a normal ping
func TestPingGather(t *testing.T) {
var acc testutil.Accumulator
p := Ping{
Urls: []string{"www.google.com", "www.reddit.com"},
pingHost: mockHostPinger,
}
p.Gather(&acc)
tags := map[string]string{"url": "www.google.com"}
fields := map[string]interface{}{
"packets_transmitted": 4,
"packets_received": 4,
"percent_packet_loss": 0.0,
"average_response_ms": 50,
"minimum_response_ms": 50,
"maximum_response_ms": 52,
}
acc.AssertContainsTaggedFields(t, "ping", fields, tags)
tags = map[string]string{"url": "www.reddit.com"}
acc.AssertContainsTaggedFields(t, "ping", fields, tags)
}
var errorPingOutput = `
Badanie nask.pl [195.187.242.157] z 32 bajtami danych:
Upłynął limit czasu żądania.
Upłynął limit czasu żądania.
Upłynął limit czasu żądania.
Upłynął limit czasu żądania.
Statystyka badania ping dla 195.187.242.157:
Pakiety: Wysłane = 4, Odebrane = 0, Utracone = 4
(100% straty),
`
func mockErrorHostPinger(timeout float64, args ...string) (string, error) {
return errorPingOutput, errors.New("No packets received")
}
// Test that Gather works on a ping with no transmitted packets, even though the
// command returns an error
func TestBadPingGather(t *testing.T) {
var acc testutil.Accumulator
p := Ping{
Urls: []string{"www.amazon.com"},
pingHost: mockErrorHostPinger,
}
p.Gather(&acc)
tags := map[string]string{"url": "www.amazon.com"}
fields := map[string]interface{}{
"packets_transmitted": 4,
"packets_received": 0,
"percent_packet_loss": 100.0,
}
acc.AssertContainsTaggedFields(t, "ping", fields, tags)
}
var lossyPingOutput = `
Badanie thecodinglove.com [66.6.44.4] z 9800 bajtami danych:
Upłynął limit czasu żądania.
Odpowiedź z 66.6.44.4: bajtów=9800 czas=114ms TTL=48
Odpowiedź z 66.6.44.4: bajtów=9800 czas=114ms TTL=48
Odpowiedź z 66.6.44.4: bajtów=9800 czas=118ms TTL=48
Odpowiedź z 66.6.44.4: bajtów=9800 czas=114ms TTL=48
Odpowiedź z 66.6.44.4: bajtów=9800 czas=114ms TTL=48
Upłynął limit czasu żądania.
Odpowiedź z 66.6.44.4: bajtów=9800 czas=119ms TTL=48
Odpowiedź z 66.6.44.4: bajtów=9800 czas=116ms TTL=48
Statystyka badania ping dla 66.6.44.4:
Pakiety: Wysłane = 9, Odebrane = 7, Utracone = 2
(22% straty),
Szacunkowy czas błądzenia pakietów w millisekundach:
Minimum = 114 ms, Maksimum = 119 ms, Czas średni = 115 ms
`
func mockLossyHostPinger(timeout float64, args ...string) (string, error) {
return lossyPingOutput, nil
}
// Test that Gather works on a ping with lossy packets
func TestLossyPingGather(t *testing.T) {
var acc testutil.Accumulator
p := Ping{
Urls: []string{"www.google.com"},
pingHost: mockLossyHostPinger,
}
p.Gather(&acc)
tags := map[string]string{"url": "www.google.com"}
fields := map[string]interface{}{
"packets_transmitted": 9,
"packets_received": 7,
"percent_packet_loss": 22.22222222222222,
"average_response_ms": 115,
"minimum_response_ms": 114,
"maximum_response_ms": 119,
}
acc.AssertContainsTaggedFields(t, "ping", fields, tags)
}
// Fatal ping output (invalid argument)
var fatalPingOutput = `
Bad option -d.
Usage: ping [-t] [-a] [-n count] [-l size] [-f] [-i TTL] [-v TOS]
[-r count] [-s count] [[-j host-list] | [-k host-list]]
[-w timeout] [-R] [-S srcaddr] [-4] [-6] target_name
Options:
-t Ping the specified host until stopped.
To see statistics and continue - type Control-Break;
To stop - type Control-C.
-a Resolve addresses to hostnames.
-n count Number of echo requests to send.
-l size Send buffer size.
-f Set Don't Fragment flag in packet (IPv4-only).
-i TTL Time To Live.
-v TOS Type Of Service (IPv4-only. This setting has been deprecated
and has no effect on the type of service field in the IP Header).
-r count Record route for count hops (IPv4-only).
-s count Timestamp for count hops (IPv4-only).
-j host-list Loose source route along host-list (IPv4-only).
-k host-list Strict source route along host-list (IPv4-only).
-w timeout Timeout in milliseconds to wait for each reply.
-R Use routing header to test reverse route also (IPv6-only).
-S srcaddr Source address to use.
-4 Force using IPv4.
-6 Force using IPv6.
`
func mockFatalHostPinger(timeout float64, args ...string) (string, error) {
return fatalPingOutput, errors.New("So very bad")
}
// Test that a fatal ping command does not gather any statistics.
func TestFatalPingGather(t *testing.T) {
var acc testutil.Accumulator
p := Ping{
Urls: []string{"www.amazon.com"},
pingHost: mockFatalHostPinger,
}
p.Gather(&acc)
assert.False(t, acc.HasMeasurement("packets_transmitted"),
"Fatal ping should not have packet measurements")
assert.False(t, acc.HasMeasurement("packets_received"),
"Fatal ping should not have packet measurements")
assert.False(t, acc.HasMeasurement("percent_packet_loss"),
"Fatal ping should not have packet measurements")
assert.False(t, acc.HasMeasurement("average_response_ms"),
"Fatal ping should not have packet measurements")
}

View File

@ -107,7 +107,8 @@ type item struct {
counterHandle win.PDH_HCOUNTER counterHandle win.PDH_HCOUNTER
} }
var sanitizedChars = strings.NewReplacer("/sec", "_persec", "/Sec", "_persec", " ", "_") var sanitizedChars = strings.NewReplacer("/sec", "_persec", "/Sec", "_persec",
" ", "_", "%", "Percent", `\`, "")
func (m *Win_PerfCounters) AddItem(metrics *itemList, query string, objectName string, counter string, instance string, func (m *Win_PerfCounters) AddItem(metrics *itemList, query string, objectName string, counter string, instance string,
measurement string, include_total bool) { measurement string, include_total bool) {
@ -299,13 +300,12 @@ func (m *Win_PerfCounters) Gather(acc telegraf.Accumulator) error {
tags["instance"] = s tags["instance"] = s
} }
tags["objectname"] = metric.objectName tags["objectname"] = metric.objectName
fields[sanitizedChars.Replace(string(metric.counter))] = float32(c.FmtValue.DoubleValue) fields[sanitizedChars.Replace(metric.counter)] =
float32(c.FmtValue.DoubleValue)
var measurement string measurement := sanitizedChars.Replace(metric.measurement)
if metric.measurement == "" { if measurement == "" {
measurement = "win_perf_counters" measurement = "win_perf_counters"
} else {
measurement = metric.measurement
} }
acc.AddFields(measurement, fields, tags) acc.AddFields(measurement, fields, tags)
} }

View File

@ -12,17 +12,7 @@ import (
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
) )
var ( var invalidNameCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)
invalidNameCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)
// Prometheus metric names must match this regex
// see https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels
metricName = regexp.MustCompile("^[a-zA-Z_:][a-zA-Z0-9_:]*$")
// Prometheus labels must match this regex
// see https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels
labelName = regexp.MustCompile("^[a-zA-Z_][a-zA-Z0-9_]*$")
)
type PrometheusClient struct { type PrometheusClient struct {
Listen string Listen string
@ -119,9 +109,6 @@ func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
if len(k) == 0 { if len(k) == 0 {
continue continue
} }
if !labelName.MatchString(k) {
continue
}
labels = append(labels, k) labels = append(labels, k)
l[k] = v l[k] = v
} }
@ -144,11 +131,6 @@ func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
mname = fmt.Sprintf("%s_%s", key, n) mname = fmt.Sprintf("%s_%s", key, n)
} }
// verify that it is a valid measurement name
if !metricName.MatchString(mname) {
continue
}
desc := prometheus.NewDesc(mname, "Telegraf collected metric", nil, l) desc := prometheus.NewDesc(mname, "Telegraf collected metric", nil, l)
var metric prometheus.Metric var metric prometheus.Metric
var err error var err error

View File

@ -28,6 +28,7 @@ type Accumulator struct {
sync.Mutex sync.Mutex
Metrics []*Metric Metrics []*Metric
Errors []error
debug bool debug bool
} }
@ -84,6 +85,16 @@ func (a *Accumulator) AddFields(
a.Metrics = append(a.Metrics, p) a.Metrics = append(a.Metrics, p)
} }
// AddError appends the given error to Accumulator.Errors.
func (a *Accumulator) AddError(err error) {
if err == nil {
return
}
a.Lock()
a.Errors = append(a.Errors, err)
a.Unlock()
}
func (a *Accumulator) SetPrecision(precision, interval time.Duration) { func (a *Accumulator) SetPrecision(precision, interval time.Duration) {
return return
} }