commit f715c17bea
ncohensm 2016-07-25 11:41:19 -07:00
48 changed files with 2153 additions and 622 deletions

View File

@ -1,5 +1,18 @@
## v1.0 [unreleased] ## v1.0 [unreleased]
### Features
- [#1413](https://github.com/influxdata/telegraf/issues/1413): Separate container_version from container_image tag.
- [#1525](https://github.com/influxdata/telegraf/pull/1525): Support setting per-device and total metrics for Docker network and blockio.
### Bugfixes
- [#1519](https://github.com/influxdata/telegraf/pull/1519): Fix error race conditions and partial failures.
- [#1477](https://github.com/influxdata/telegraf/issues/1477): nstat: fix inaccurate config panic.
- [#1481](https://github.com/influxdata/telegraf/issues/1481): jolokia: fix handling multiple multi-dimensional attributes.
- [#1430](https://github.com/influxdata/telegraf/issues/1430): Fix prometheus character sanitizing. Sanitize more win_perf_counters characters.
- [#1534](https://github.com/influxdata/telegraf/pull/1534): Add diskio io_time to FreeBSD & report timing metrics as ms (as linux does).
## v1.0 beta 3 [2016-07-18] ## v1.0 beta 3 [2016-07-18]
### Release Notes ### Release Notes
@ -36,6 +49,7 @@ should now look like:
### Features ### Features
- [#1503](https://github.com/influxdata/telegraf/pull/1503): Add TLS support for certs to the RabbitMQ input plugin
- [#1289](https://github.com/influxdata/telegraf/pull/1289): webhooks input plugin. Thanks @francois2metz and @cduez! - [#1289](https://github.com/influxdata/telegraf/pull/1289): webhooks input plugin. Thanks @francois2metz and @cduez!
- [#1247](https://github.com/influxdata/telegraf/pull/1247): rollbar webhook plugin. - [#1247](https://github.com/influxdata/telegraf/pull/1247): rollbar webhook plugin.
- [#1408](https://github.com/influxdata/telegraf/pull/1408): mandrill webhook plugin. - [#1408](https://github.com/influxdata/telegraf/pull/1408): mandrill webhook plugin.
@ -49,6 +63,8 @@ should now look like:
- [#1500](https://github.com/influxdata/telegraf/pull/1500): Aerospike plugin refactored to use official client lib. - [#1500](https://github.com/influxdata/telegraf/pull/1500): Aerospike plugin refactored to use official client lib.
- [#1434](https://github.com/influxdata/telegraf/pull/1434): Add measurement name arg to logparser plugin. - [#1434](https://github.com/influxdata/telegraf/pull/1434): Add measurement name arg to logparser plugin.
- [#1479](https://github.com/influxdata/telegraf/pull/1479): logparser: change resp_code from a field to a tag. - [#1479](https://github.com/influxdata/telegraf/pull/1479): logparser: change resp_code from a field to a tag.
- [#1466](https://github.com/influxdata/telegraf/pull/1466): MongoDB input plugin: adding per DB stats from db.stats()
- [#1411](https://github.com/influxdata/telegraf/pull/1411): Implement support for fetching hddtemp data
### Bugfixes ### Bugfixes
@ -68,6 +84,8 @@ should now look like:
- [#1463](https://github.com/influxdata/telegraf/issues/1463): Shared WaitGroup in Exec plugin - [#1463](https://github.com/influxdata/telegraf/issues/1463): Shared WaitGroup in Exec plugin
- [#1436](https://github.com/influxdata/telegraf/issues/1436): logparser: honor modifiers in "pattern" config. - [#1436](https://github.com/influxdata/telegraf/issues/1436): logparser: honor modifiers in "pattern" config.
- [#1418](https://github.com/influxdata/telegraf/issues/1418): logparser: error and exit on file permissions/missing errors. - [#1418](https://github.com/influxdata/telegraf/issues/1418): logparser: error and exit on file permissions/missing errors.
- [#1499](https://github.com/influxdata/telegraf/pull/1499): Allow specifying the full path to the HAProxy stats endpoint.
- [#1521](https://github.com/influxdata/telegraf/pull/1521): Fix Redis URL parsing; an extra "tcp://" prefix was being added.
## v1.0 beta 2 [2016-06-21] ## v1.0 beta 2 [2016-06-21]

Godeps
View File

@ -44,7 +44,7 @@ github.com/prometheus/client_model fa8ad6fec33561be4280a8f0514318c79d7f6cb6
github.com/prometheus/common e8eabff8812b05acf522b45fdcd725a785188e37 github.com/prometheus/common e8eabff8812b05acf522b45fdcd725a785188e37
github.com/prometheus/procfs 406e5b7bfd8201a36e2bb5f7bdae0b03380c2ce8 github.com/prometheus/procfs 406e5b7bfd8201a36e2bb5f7bdae0b03380c2ce8
github.com/samuel/go-zookeeper 218e9c81c0dd8b3b18172b2bbfad92cc7d6db55f github.com/samuel/go-zookeeper 218e9c81c0dd8b3b18172b2bbfad92cc7d6db55f
github.com/shirou/gopsutil 586bb697f3ec9f8ec08ffefe18f521a64534037c github.com/shirou/gopsutil ee66bc560c366dd33b9a4046ba0b644caba46bed
github.com/soniah/gosnmp b1b4f885b12c5dcbd021c5cee1c904110de6db7d github.com/soniah/gosnmp b1b4f885b12c5dcbd021c5cee1c904110de6db7d
github.com/sparrc/aerospike-client-go d4bb42d2c2d39dae68e054116f4538af189e05d5 github.com/sparrc/aerospike-client-go d4bb42d2c2d39dae68e054116f4538af189e05d5
github.com/streadway/amqp b4f3ceab0337f013208d31348b578d83c0064744 github.com/streadway/amqp b4f3ceab0337f013208d31348b578d83c0064744

View File

@ -156,6 +156,7 @@ Currently implemented sources:
* [exec](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/exec) (generic executable plugin, support JSON, influx, graphite and nagios) * [exec](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/exec) (generic executable plugin, support JSON, influx, graphite and nagios)
* [filestat](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/filestat) * [filestat](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/filestat)
* [haproxy](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/haproxy) * [haproxy](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/haproxy)
* [hddtemp](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/hddtemp)
* [http_response](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/http_response) * [http_response](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/http_response)
* [httpjson](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/httpjson) (generic JSON-emitting http service plugin) * [httpjson](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/httpjson) (generic JSON-emitting http service plugin)
* [influxdb](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/influxdb) * [influxdb](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/influxdb)

View File

@ -16,6 +16,8 @@ type Accumulator interface {
tags map[string]string, tags map[string]string,
t ...time.Time) t ...time.Time)
AddError(err error)
Debug() bool Debug() bool
SetDebug(enabled bool) SetDebug(enabled bool)

View File

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"log" "log"
"math" "math"
"sync/atomic"
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
@ -32,9 +33,9 @@ type accumulator struct {
inputConfig *internal_models.InputConfig inputConfig *internal_models.InputConfig
prefix string
precision time.Duration precision time.Duration
errCount uint64
} }
func (ac *accumulator) Add( func (ac *accumulator) Add(
@ -146,10 +147,6 @@ func (ac *accumulator) AddFields(
} }
timestamp = timestamp.Round(ac.precision) timestamp = timestamp.Round(ac.precision)
if ac.prefix != "" {
measurement = ac.prefix + measurement
}
m, err := telegraf.NewMetric(measurement, tags, result, timestamp) m, err := telegraf.NewMetric(measurement, tags, result, timestamp)
if err != nil { if err != nil {
log.Printf("Error adding point [%s]: %s\n", measurement, err.Error()) log.Printf("Error adding point [%s]: %s\n", measurement, err.Error())
@ -161,6 +158,17 @@ func (ac *accumulator) AddFields(
ac.metrics <- m ac.metrics <- m
} }
// AddError passes a runtime error to the accumulator.
// The error will be tagged with the plugin name and written to the log.
func (ac *accumulator) AddError(err error) {
if err == nil {
return
}
atomic.AddUint64(&ac.errCount, 1)
//TODO suppress/throttle consecutive duplicate errors?
log.Printf("ERROR in input [%s]: %s", ac.inputConfig.Name, err)
}
func (ac *accumulator) Debug() bool { func (ac *accumulator) Debug() bool {
return ac.debug return ac.debug
} }
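The new AddError hook lets an input record a failure and keep going instead of aborting the whole gather cycle. A minimal sketch of how a plugin might use it; the Example type and fetch helper below are hypothetical, and only acc.AddError and acc.AddFields come from the interfaces in this diff:

```go
package example

import (
	"fmt"

	"github.com/influxdata/telegraf"
)

// Example is a hypothetical input used only to illustrate AddError.
type Example struct {
	Servers []string
}

func (e *Example) Gather(acc telegraf.Accumulator) error {
	for _, server := range e.Servers {
		fields, err := fetch(server) // hypothetical per-server query
		if err != nil {
			// Log the failure (the accumulator prefixes it with the
			// plugin name) and continue with the remaining servers.
			acc.AddError(fmt.Errorf("gathering %s: %s", server, err))
			continue
		}
		acc.AddFields("example", fields, map[string]string{"server": server})
	}
	return nil
}

// fetch stands in for whatever request a real plugin would make.
func fetch(server string) (map[string]interface{}, error) {
	if server == "" {
		return nil, fmt.Errorf("no address configured")
	}
	return map[string]interface{}{"up": 1}, nil
}
```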

View File

@ -1,8 +1,11 @@
package agent package agent
import ( import (
"bytes"
"fmt" "fmt"
"log"
"math" "math"
"os"
"testing" "testing"
"time" "time"
@ -10,6 +13,7 @@ import (
"github.com/influxdata/telegraf/internal/models" "github.com/influxdata/telegraf/internal/models"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
) )
func TestAdd(t *testing.T) { func TestAdd(t *testing.T) {
@ -454,3 +458,27 @@ func TestAccFilterTags(t *testing.T) {
fmt.Sprintf("acctest value=101 %d", now.UnixNano()), fmt.Sprintf("acctest value=101 %d", now.UnixNano()),
actual) actual)
} }
func TestAccAddError(t *testing.T) {
errBuf := bytes.NewBuffer(nil)
log.SetOutput(errBuf)
defer log.SetOutput(os.Stderr)
a := accumulator{}
a.inputConfig = &internal_models.InputConfig{}
a.inputConfig.Name = "mock_plugin"
a.AddError(fmt.Errorf("foo"))
a.AddError(fmt.Errorf("bar"))
a.AddError(fmt.Errorf("baz"))
errs := bytes.Split(errBuf.Bytes(), []byte{'\n'})
assert.EqualValues(t, 3, a.errCount)
require.Len(t, errs, 4) // 4 because of trailing newline
assert.Contains(t, string(errs[0]), "mock_plugin")
assert.Contains(t, string(errs[0]), "foo")
assert.Contains(t, string(errs[1]), "mock_plugin")
assert.Contains(t, string(errs[1]), "bar")
assert.Contains(t, string(errs[2]), "mock_plugin")
assert.Contains(t, string(errs[2]), "baz")
}

View File

@ -215,6 +215,9 @@ func (a *Agent) Test() error {
if err := input.Input.Gather(acc); err != nil { if err := input.Input.Gather(acc); err != nil {
return err return err
} }
if acc.errCount > 0 {
return fmt.Errorf("Errors encountered during processing")
}
// Special instructions for some inputs. cpu, for example, needs to be // Special instructions for some inputs. cpu, for example, needs to be
// run twice in order to return cpu usage percentages. // run twice in order to return cpu usage percentages.

View File

@ -197,7 +197,7 @@
# # Configuration for Graphite server to send metrics to # # Configuration for Graphite server to send metrics to
# [[outputs.graphite]] # [[outputs.graphite]]
# ## TCP endpoint for your graphite instance. # ## TCP endpoint for your graphite instance.
# ## If multiple endpoints are configured, the output will be load balanced. # ## If multiple endpoints are configured, output will be load balanced.
# ## Only one of the endpoints will be written to with each iteration. # ## Only one of the endpoints will be written to with each iteration.
# servers = ["localhost:2003"] # servers = ["localhost:2003"]
# ## Prefix metrics name # ## Prefix metrics name
@ -436,8 +436,8 @@
## disk partitions. ## disk partitions.
## Setting devices will restrict the stats to the specified devices. ## Setting devices will restrict the stats to the specified devices.
# devices = ["sda", "sdb"] # devices = ["sda", "sdb"]
## Uncomment the following line if you do not need disk serial numbers. ## Uncomment the following line if you need disk serial numbers.
# skip_serial_number = true # skip_serial_number = false
# Get kernel statistics from /proc/stat # Get kernel statistics from /proc/stat
@ -465,7 +465,7 @@
# no configuration # no configuration
# # Read stats from an aerospike server # # Read stats from aerospike server(s)
# [[inputs.aerospike]] # [[inputs.aerospike]]
# ## Aerospike servers to connect to (with port) # ## Aerospike servers to connect to (with port)
# ## This plugin will query all namespaces the aerospike # ## This plugin will query all namespaces the aerospike
@ -666,6 +666,13 @@
# container_names = [] # container_names = []
# ## Timeout for docker list, info, and stats commands # ## Timeout for docker list, info, and stats commands
# timeout = "5s" # timeout = "5s"
#
# ## Whether to report for each container per-device blkio (8:0, 8:1...) and
# ## network (eth0, eth1, ...) stats or not
# perdevice = true
# ## Whether to report for each container total blkio and network stats or not
# total = false
#
# # Read statistics from one or many dovecot servers # # Read statistics from one or many dovecot servers
@ -782,9 +789,11 @@
# [[inputs.haproxy]] # [[inputs.haproxy]]
# ## An array of address to gather stats about. Specify an ip on hostname # ## An array of address to gather stats about. Specify an ip on hostname
# ## with optional port. ie localhost, 10.10.3.33:1936, etc. # ## with optional port. ie localhost, 10.10.3.33:1936, etc.
# # ## Make sure you specify the complete path to the stats endpoint
# ## If no servers are specified, then default to 127.0.0.1:1936 # ## ie 10.10.3.33:1936/haproxy?stats
# servers = ["http://myhaproxy.com:1936", "http://anotherhaproxy.com:1936"] # #
# ## If no servers are specified, then default to 127.0.0.1:1936/haproxy?stats
# servers = ["http://myhaproxy.com:1936/haproxy?stats"]
# ## Or you can also use local socket # ## Or you can also use local socket
# ## servers = ["socket:/run/haproxy/admin.sock"] # ## servers = ["socket:/run/haproxy/admin.sock"]
@ -970,21 +979,35 @@
# # Telegraf plugin for gathering metrics from N Mesos masters # # Telegraf plugin for gathering metrics from N Mesos masters
# [[inputs.mesos]] # [[inputs.mesos]]
# # Timeout, in ms. # ## Timeout, in ms.
# timeout = 100 # timeout = 100
# # A list of Mesos masters, default value is localhost:5050. # ## A list of Mesos masters.
# masters = ["localhost:5050"] # masters = ["localhost:5050"]
# # Metrics groups to be collected, by default, all enabled. # ## Master metrics groups to be collected, by default, all enabled.
# master_collections = [ # master_collections = [
# "resources", # "resources",
# "master", # "master",
# "system", # "system",
# "slaves", # "agents",
# "frameworks", # "frameworks",
# "tasks",
# "messages", # "messages",
# "evqueue", # "evqueue",
# "registrar", # "registrar",
# ] # ]
# ## A list of Mesos slaves, default is []
# # slaves = []
# ## Slave metrics groups to be collected, by default, all enabled.
# # slave_collections = [
# # "resources",
# # "agent",
# # "system",
# # "executors",
# # "tasks",
# # "messages",
# # ]
# ## Include mesos tasks statistics, default is false
# # slave_tasks = true
# # Read metrics from one or many MongoDB servers # # Read metrics from one or many MongoDB servers
@ -995,6 +1018,7 @@
# ## mongodb://10.10.3.33:18832, # ## mongodb://10.10.3.33:18832,
# ## 10.0.0.1:10000, etc. # ## 10.0.0.1:10000, etc.
# servers = ["127.0.0.1:27017"] # servers = ["127.0.0.1:27017"]
# gather_perdb_stats = false
# # Read metrics from one or many mysql servers # # Read metrics from one or many mysql servers
@ -1101,9 +1125,9 @@
# ## file paths for proc files. If empty default paths will be used: # ## file paths for proc files. If empty default paths will be used:
# ## /proc/net/netstat, /proc/net/snmp, /proc/net/snmp6 # ## /proc/net/netstat, /proc/net/snmp, /proc/net/snmp6
# ## These can also be overridden with env variables, see README. # ## These can also be overridden with env variables, see README.
# proc_net_netstat = "" # proc_net_netstat = "/proc/net/netstat"
# proc_net_snmp = "" # proc_net_snmp = "/proc/net/snmp"
# proc_net_snmp6 = "" # proc_net_snmp6 = "/proc/net/snmp6"
# ## dump metrics with 0 values too # ## dump metrics with 0 values too
# dump_zeros = true # dump_zeros = true
@ -1305,6 +1329,13 @@
# # username = "guest" # # username = "guest"
# # password = "guest" # # password = "guest"
# #
# ## Optional SSL Config
# # ssl_ca = "/etc/telegraf/ca.pem"
# # ssl_cert = "/etc/telegraf/cert.pem"
# # ssl_key = "/etc/telegraf/key.pem"
# ## Use SSL but skip chain & host verification
# # insecure_skip_verify = false
#
# ## A list of nodes to pull metrics about. If not specified, metrics for # ## A list of nodes to pull metrics about. If not specified, metrics for
# ## all nodes are gathered. # ## all nodes are gathered.
# # nodes = ["rabbit@node1", "rabbit@node2"] # # nodes = ["rabbit@node1", "rabbit@node2"]
@ -1323,6 +1354,7 @@
# ## e.g. # ## e.g.
# ## tcp://localhost:6379 # ## tcp://localhost:6379
# ## tcp://:password@192.168.99.100 # ## tcp://:password@192.168.99.100
# ## unix:///var/run/redis.sock
# ## # ##
# ## If no servers are specified, then localhost is used as the host. # ## If no servers are specified, then localhost is used as the host.
# ## If no port is specified, 6379 is used # ## If no port is specified, 6379 is used
@ -1559,6 +1591,8 @@
# ## %{COMMON_LOG_FORMAT} (plain apache & nginx access logs) # ## %{COMMON_LOG_FORMAT} (plain apache & nginx access logs)
# ## %{COMBINED_LOG_FORMAT} (access logs + referrer & agent) # ## %{COMBINED_LOG_FORMAT} (access logs + referrer & agent)
# patterns = ["%{INFLUXDB_HTTPD_LOG}"] # patterns = ["%{INFLUXDB_HTTPD_LOG}"]
# ## Name of the output measurement.
# measurement = "influxdb_log"
# ## Full path(s) to custom pattern files. # ## Full path(s) to custom pattern files.
# custom_pattern_files = [] # custom_pattern_files = []
# ## Custom patterns can also be defined here. Put one pattern per line. # ## Custom patterns can also be defined here. Put one pattern per line.
@ -1622,6 +1656,21 @@
# data_format = "influx" # data_format = "influx"
# # Read NSQ topic for metrics.
# [[inputs.nsq_consumer]]
# ## A string representing the NSQD TCP endpoint
# server = "localhost:4150"
# topic = "telegraf"
# channel = "consumer"
# max_in_flight = 100
#
# ## Data format to consume.
# ## Each data format has its own unique set of configuration options, read
# ## more about them here:
# ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
# data_format = "influx"
# # Statsd Server # # Statsd Server
# [[inputs.statsd]] # [[inputs.statsd]]
# ## Address and port to host UDP listener on # ## Address and port to host UDP listener on
@ -1725,6 +1774,9 @@
# [inputs.webhooks.github] # [inputs.webhooks.github]
# path = "/github" # path = "/github"
# #
# [inputs.webhooks.mandrill]
# path = "/mandrill"
#
# [inputs.webhooks.rollbar] # [inputs.webhooks.rollbar]
# path = "/rollbar" # path = "/rollbar"

View File

@ -139,7 +139,7 @@ func (c *Config) InputNames() []string {
return name return name
} }
// Outputs returns a list of strings of the configured inputs. // Outputs returns a list of strings of the configured outputs.
func (c *Config) OutputNames() []string { func (c *Config) OutputNames() []string {
var name []string var name []string
for _, output := range c.Outputs { for _, output := range c.Outputs {

File diff suppressed because one or more lines are too long

View File

@ -72,18 +72,17 @@ func (a *Aerospike) gatherServer(hostport string, acc telegraf.Accumulator) erro
nodes := c.GetNodes() nodes := c.GetNodes()
for _, n := range nodes { for _, n := range nodes {
tags := map[string]string{ tags := map[string]string{
"node_name": n.GetName(),
"aerospike_host": hostport, "aerospike_host": hostport,
} }
fields := make(map[string]interface{}) fields := map[string]interface{}{
"node_name": n.GetName(),
}
stats, err := as.RequestNodeStats(n) stats, err := as.RequestNodeStats(n)
if err != nil { if err != nil {
return err return err
} }
for k, v := range stats { for k, v := range stats {
if iv, err := strconv.ParseInt(v, 10, 64); err == nil { fields[strings.Replace(k, "-", "_", -1)] = parseValue(v)
fields[strings.Replace(k, "-", "_", -1)] = iv
}
} }
acc.AddFields("aerospike_node", fields, tags, time.Now()) acc.AddFields("aerospike_node", fields, tags, time.Now())
@ -94,9 +93,13 @@ func (a *Aerospike) gatherServer(hostport string, acc telegraf.Accumulator) erro
namespaces := strings.Split(info["namespaces"], ";") namespaces := strings.Split(info["namespaces"], ";")
for _, namespace := range namespaces { for _, namespace := range namespaces {
nTags := copyTags(tags) nTags := map[string]string{
"aerospike_host": hostport,
}
nTags["namespace"] = namespace nTags["namespace"] = namespace
nFields := make(map[string]interface{}) nFields := map[string]interface{}{
"node_name": n.GetName(),
}
info, err := as.RequestNodeInfo(n, "namespace/"+namespace) info, err := as.RequestNodeInfo(n, "namespace/"+namespace)
if err != nil { if err != nil {
continue continue
@ -107,9 +110,7 @@ func (a *Aerospike) gatherServer(hostport string, acc telegraf.Accumulator) erro
if len(parts) < 2 { if len(parts) < 2 {
continue continue
} }
if iv, err := strconv.ParseInt(parts[1], 10, 64); err == nil { nFields[strings.Replace(parts[0], "-", "_", -1)] = parseValue(parts[1])
nFields[strings.Replace(parts[0], "-", "_", -1)] = iv
}
} }
acc.AddFields("aerospike_namespace", nFields, nTags, time.Now()) acc.AddFields("aerospike_namespace", nFields, nTags, time.Now())
} }
@ -117,6 +118,16 @@ func (a *Aerospike) gatherServer(hostport string, acc telegraf.Accumulator) erro
return nil return nil
} }
func parseValue(v string) interface{} {
if parsed, err := strconv.ParseInt(v, 10, 64); err == nil {
return parsed
} else if parsed, err := strconv.ParseBool(v); err == nil {
return parsed
} else {
return v
}
}
func copyTags(m map[string]string) map[string]string { func copyTags(m map[string]string) map[string]string {
out := make(map[string]string) out := make(map[string]string)
for k, v := range m { for k, v := range m {

View File

@ -23,6 +23,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/graylog" _ "github.com/influxdata/telegraf/plugins/inputs/graylog"
_ "github.com/influxdata/telegraf/plugins/inputs/haproxy" _ "github.com/influxdata/telegraf/plugins/inputs/haproxy"
_ "github.com/influxdata/telegraf/plugins/inputs/http_listener" _ "github.com/influxdata/telegraf/plugins/inputs/http_listener"
_ "github.com/influxdata/telegraf/plugins/inputs/hddtemp"
_ "github.com/influxdata/telegraf/plugins/inputs/http_response" _ "github.com/influxdata/telegraf/plugins/inputs/http_response"
_ "github.com/influxdata/telegraf/plugins/inputs/httpjson" _ "github.com/influxdata/telegraf/plugins/inputs/httpjson"
_ "github.com/influxdata/telegraf/plugins/inputs/influxdb" _ "github.com/influxdata/telegraf/plugins/inputs/influxdb"

View File

@ -3,12 +3,14 @@ package dns_query
import ( import (
"errors" "errors"
"fmt" "fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/miekg/dns" "github.com/miekg/dns"
"net" "net"
"strconv" "strconv"
"time" "time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/errchan"
"github.com/influxdata/telegraf/plugins/inputs"
) )
type DnsQuery struct { type DnsQuery struct {
@ -55,12 +57,12 @@ func (d *DnsQuery) Description() string {
} }
func (d *DnsQuery) Gather(acc telegraf.Accumulator) error { func (d *DnsQuery) Gather(acc telegraf.Accumulator) error {
d.setDefaultValues() d.setDefaultValues()
errChan := errchan.New(len(d.Domains) * len(d.Servers))
for _, domain := range d.Domains { for _, domain := range d.Domains {
for _, server := range d.Servers { for _, server := range d.Servers {
dnsQueryTime, err := d.getDnsQueryTime(domain, server) dnsQueryTime, err := d.getDnsQueryTime(domain, server)
if err != nil { errChan.C <- err
return err
}
tags := map[string]string{ tags := map[string]string{
"server": server, "server": server,
"domain": domain, "domain": domain,
@ -72,7 +74,7 @@ func (d *DnsQuery) Gather(acc telegraf.Accumulator) error {
} }
} }
return nil return errChan.Error()
} }
func (d *DnsQuery) setDefaultValues() { func (d *DnsQuery) setDefaultValues() {
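Several inputs in this commit (dns_query above, plus dovecot and memcached below) move from returning on the first failure to funnelling every result through internal/errchan. A rough usage sketch of the pattern, inferred only from the calls visible in this diff (errchan.New sized to the number of sends, the C channel, and Error()); the gatherServer helper is hypothetical:

```go
package example

import (
	"fmt"
	"sync"

	"github.com/influxdata/telegraf/internal/errchan"
)

// gatherAll fans out one goroutine per server and sends every result,
// nil or not, into the errchan buffer sized to the number of sends.
func gatherAll(servers []string) error {
	var wg sync.WaitGroup
	errChan := errchan.New(len(servers))
	for _, server := range servers {
		wg.Add(1)
		go func(s string) {
			defer wg.Done()
			errChan.C <- gatherServer(s)
		}(server)
	}
	wg.Wait()
	// Error() is assumed to collapse the buffered non-nil errors into a
	// single error, or return nil when everything succeeded.
	return errChan.Error()
}

// gatherServer is a hypothetical per-server worker that may fail.
func gatherServer(server string) error {
	if server == "" {
		return fmt.Errorf("empty server address")
	}
	return nil
}
```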

View File

@ -25,6 +25,8 @@ type Docker struct {
Endpoint string Endpoint string
ContainerNames []string ContainerNames []string
Timeout internal.Duration Timeout internal.Duration
PerDevice bool `toml:"perdevice"`
Total bool `toml:"total"`
client DockerClient client DockerClient
} }
@ -58,6 +60,13 @@ var sampleConfig = `
container_names = [] container_names = []
## Timeout for docker list, info, and stats commands ## Timeout for docker list, info, and stats commands
timeout = "5s" timeout = "5s"
## Whether to report for each container per-device blkio (8:0, 8:1...) and
## network (eth0, eth1, ...) stats or not
perdevice = true
## Whether to report for each container total blkio and network stats or not
total = false
` `
// Description returns input description // Description returns input description
@ -207,9 +216,18 @@ func (d *Docker) gatherContainer(
cname = strings.TrimPrefix(container.Names[0], "/") cname = strings.TrimPrefix(container.Names[0], "/")
} }
// the image name sometimes has a version part.
// ie, rabbitmq:3-management
imageParts := strings.Split(container.Image, ":")
imageName := imageParts[0]
imageVersion := "unknown"
if len(imageParts) > 1 {
imageVersion = imageParts[1]
}
tags := map[string]string{ tags := map[string]string{
"container_name": cname, "container_name": cname,
"container_image": container.Image, "container_image": imageName,
"container_version": imageVersion,
} }
if len(d.ContainerNames) > 0 { if len(d.ContainerNames) > 0 {
if !sliceContains(cname, d.ContainerNames) { if !sliceContains(cname, d.ContainerNames) {
@ -237,7 +255,7 @@ func (d *Docker) gatherContainer(
tags[k] = label tags[k] = label
} }
gatherContainerStats(v, acc, tags, container.ID) gatherContainerStats(v, acc, tags, container.ID, d.PerDevice, d.Total)
return nil return nil
} }
@ -247,6 +265,8 @@ func gatherContainerStats(
acc telegraf.Accumulator, acc telegraf.Accumulator,
tags map[string]string, tags map[string]string,
id string, id string,
perDevice bool,
total bool,
) { ) {
now := stat.Read now := stat.Read
@ -314,6 +334,7 @@ func gatherContainerStats(
acc.AddFields("docker_container_cpu", fields, percputags, now) acc.AddFields("docker_container_cpu", fields, percputags, now)
} }
totalNetworkStatMap := make(map[string]interface{})
for network, netstats := range stat.Networks { for network, netstats := range stat.Networks {
netfields := map[string]interface{}{ netfields := map[string]interface{}{
"rx_dropped": netstats.RxDropped, "rx_dropped": netstats.RxDropped,
@ -327,12 +348,35 @@ func gatherContainerStats(
"container_id": id, "container_id": id,
} }
// Create a new network tag dictionary for the "network" tag // Create a new network tag dictionary for the "network" tag
nettags := copyTags(tags) if perDevice {
nettags["network"] = network nettags := copyTags(tags)
acc.AddFields("docker_container_net", netfields, nettags, now) nettags["network"] = network
acc.AddFields("docker_container_net", netfields, nettags, now)
}
if total {
for field, value := range netfields {
if field == "container_id" {
continue
}
_, ok := totalNetworkStatMap[field]
if ok {
totalNetworkStatMap[field] = totalNetworkStatMap[field].(uint64) + value.(uint64)
} else {
totalNetworkStatMap[field] = value
}
}
}
} }
gatherBlockIOMetrics(stat, acc, tags, now, id) // totalNetworkStatMap could be empty if container is running with --net=host.
if total && len(totalNetworkStatMap) != 0 {
nettags := copyTags(tags)
nettags["network"] = "total"
totalNetworkStatMap["container_id"] = id
acc.AddFields("docker_container_net", totalNetworkStatMap, nettags, now)
}
gatherBlockIOMetrics(stat, acc, tags, now, id, perDevice, total)
} }
func calculateMemPercent(stat *types.StatsJSON) float64 { func calculateMemPercent(stat *types.StatsJSON) float64 {
@ -361,6 +405,8 @@ func gatherBlockIOMetrics(
tags map[string]string, tags map[string]string,
now time.Time, now time.Time,
id string, id string,
perDevice bool,
total bool,
) { ) {
blkioStats := stat.BlkioStats blkioStats := stat.BlkioStats
// Make a map of devices to their block io stats // Make a map of devices to their block io stats
@ -422,11 +468,33 @@ func gatherBlockIOMetrics(
deviceStatMap[device]["sectors_recursive"] = metric.Value deviceStatMap[device]["sectors_recursive"] = metric.Value
} }
totalStatMap := make(map[string]interface{})
for device, fields := range deviceStatMap { for device, fields := range deviceStatMap {
iotags := copyTags(tags)
iotags["device"] = device
fields["container_id"] = id fields["container_id"] = id
acc.AddFields("docker_container_blkio", fields, iotags, now) if perDevice {
iotags := copyTags(tags)
iotags["device"] = device
acc.AddFields("docker_container_blkio", fields, iotags, now)
}
if total {
for field, value := range fields {
if field == "container_id" {
continue
}
_, ok := totalStatMap[field]
if ok {
totalStatMap[field] = totalStatMap[field].(uint64) + value.(uint64)
} else {
totalStatMap[field] = value
}
}
}
}
if total {
totalStatMap["container_id"] = id
iotags := copyTags(tags)
iotags["device"] = "total"
acc.AddFields("docker_container_blkio", totalStatMap, iotags, now)
} }
} }
@ -471,7 +539,8 @@ func parseSize(sizeStr string) (int64, error) {
func init() { func init() {
inputs.Add("docker", func() telegraf.Input { inputs.Add("docker", func() telegraf.Input {
return &Docker{ return &Docker{
Timeout: internal.Duration{Duration: time.Second * 5}, PerDevice: true,
Timeout: internal.Duration{Duration: time.Second * 5},
} }
}) })
} }

View File

@ -24,7 +24,7 @@ func TestDockerGatherContainerStats(t *testing.T) {
"container_name": "redis", "container_name": "redis",
"container_image": "redis/image", "container_image": "redis/image",
} }
gatherContainerStats(stats, &acc, tags, "123456789") gatherContainerStats(stats, &acc, tags, "123456789", true, true)
// test docker_container_net measurement // test docker_container_net measurement
netfields := map[string]interface{}{ netfields := map[string]interface{}{
@ -42,6 +42,21 @@ func TestDockerGatherContainerStats(t *testing.T) {
nettags["network"] = "eth0" nettags["network"] = "eth0"
acc.AssertContainsTaggedFields(t, "docker_container_net", netfields, nettags) acc.AssertContainsTaggedFields(t, "docker_container_net", netfields, nettags)
netfields = map[string]interface{}{
"rx_dropped": uint64(6),
"rx_bytes": uint64(8),
"rx_errors": uint64(10),
"tx_packets": uint64(12),
"tx_dropped": uint64(6),
"rx_packets": uint64(8),
"tx_errors": uint64(10),
"tx_bytes": uint64(12),
"container_id": "123456789",
}
nettags = copyTags(tags)
nettags["network"] = "total"
acc.AssertContainsTaggedFields(t, "docker_container_net", netfields, nettags)
// test docker_blkio measurement // test docker_blkio measurement
blkiotags := copyTags(tags) blkiotags := copyTags(tags)
blkiotags["device"] = "6:0" blkiotags["device"] = "6:0"
@ -52,6 +67,15 @@ func TestDockerGatherContainerStats(t *testing.T) {
} }
acc.AssertContainsTaggedFields(t, "docker_container_blkio", blkiofields, blkiotags) acc.AssertContainsTaggedFields(t, "docker_container_blkio", blkiofields, blkiotags)
blkiotags = copyTags(tags)
blkiotags["device"] = "total"
blkiofields = map[string]interface{}{
"io_service_bytes_recursive_read": uint64(100),
"io_serviced_recursive_write": uint64(302),
"container_id": "123456789",
}
acc.AssertContainsTaggedFields(t, "docker_container_blkio", blkiofields, blkiotags)
// test docker_container_mem measurement // test docker_container_mem measurement
memfields := map[string]interface{}{ memfields := map[string]interface{}{
"max_usage": uint64(1001), "max_usage": uint64(1001),
@ -186,6 +210,17 @@ func testStats() *types.StatsJSON {
TxBytes: 4, TxBytes: 4,
} }
stats.Networks["eth1"] = types.NetworkStats{
RxDropped: 5,
RxBytes: 6,
RxErrors: 7,
TxPackets: 8,
TxDropped: 5,
RxPackets: 6,
TxErrors: 7,
TxBytes: 8,
}
sbr := types.BlkioStatEntry{ sbr := types.BlkioStatEntry{
Major: 6, Major: 6,
Minor: 0, Minor: 0,
@ -198,11 +233,19 @@ func testStats() *types.StatsJSON {
Op: "write", Op: "write",
Value: 101, Value: 101,
} }
sr2 := types.BlkioStatEntry{
Major: 6,
Minor: 1,
Op: "write",
Value: 201,
}
stats.BlkioStats.IoServiceBytesRecursive = append( stats.BlkioStats.IoServiceBytesRecursive = append(
stats.BlkioStats.IoServiceBytesRecursive, sbr) stats.BlkioStats.IoServiceBytesRecursive, sbr)
stats.BlkioStats.IoServicedRecursive = append( stats.BlkioStats.IoServicedRecursive = append(
stats.BlkioStats.IoServicedRecursive, sr) stats.BlkioStats.IoServicedRecursive, sr)
stats.BlkioStats.IoServicedRecursive = append(
stats.BlkioStats.IoServicedRecursive, sr2)
return stats return stats
} }
@ -378,9 +421,10 @@ func TestDockerGatherInfo(t *testing.T) {
"container_id": "b7dfbb9478a6ae55e237d4d74f8bbb753f0817192b5081334dc78476296e2173", "container_id": "b7dfbb9478a6ae55e237d4d74f8bbb753f0817192b5081334dc78476296e2173",
}, },
map[string]string{ map[string]string{
"container_name": "etcd2", "container_name": "etcd2",
"container_image": "quay.io/coreos/etcd:v2.2.2", "container_image": "quay.io/coreos/etcd",
"cpu": "cpu3", "cpu": "cpu3",
"container_version": "v2.2.2",
}, },
) )
acc.AssertContainsTaggedFields(t, acc.AssertContainsTaggedFields(t,
@ -423,8 +467,9 @@ func TestDockerGatherInfo(t *testing.T) {
"container_id": "b7dfbb9478a6ae55e237d4d74f8bbb753f0817192b5081334dc78476296e2173", "container_id": "b7dfbb9478a6ae55e237d4d74f8bbb753f0817192b5081334dc78476296e2173",
}, },
map[string]string{ map[string]string{
"container_name": "etcd2", "container_name": "etcd2",
"container_image": "quay.io/coreos/etcd:v2.2.2", "container_image": "quay.io/coreos/etcd",
"container_version": "v2.2.2",
}, },
) )

View File

@ -12,6 +12,7 @@ import (
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/errchan"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
) )
@ -51,7 +52,6 @@ const defaultPort = "24242"
// Reads stats from all configured servers. // Reads stats from all configured servers.
func (d *Dovecot) Gather(acc telegraf.Accumulator) error { func (d *Dovecot) Gather(acc telegraf.Accumulator) error {
if !validQuery[d.Type] { if !validQuery[d.Type] {
return fmt.Errorf("Error: %s is not a valid query type\n", return fmt.Errorf("Error: %s is not a valid query type\n",
d.Type) d.Type)
@ -61,31 +61,27 @@ func (d *Dovecot) Gather(acc telegraf.Accumulator) error {
d.Servers = append(d.Servers, "127.0.0.1:24242") d.Servers = append(d.Servers, "127.0.0.1:24242")
} }
var wg sync.WaitGroup
var outerr error
if len(d.Filters) <= 0 { if len(d.Filters) <= 0 {
d.Filters = append(d.Filters, "") d.Filters = append(d.Filters, "")
} }
for _, serv := range d.Servers { var wg sync.WaitGroup
errChan := errchan.New(len(d.Servers) * len(d.Filters))
for _, server := range d.Servers {
for _, filter := range d.Filters { for _, filter := range d.Filters {
wg.Add(1) wg.Add(1)
go func(serv string, filter string) { go func(s string, f string) {
defer wg.Done() defer wg.Done()
outerr = d.gatherServer(serv, acc, d.Type, filter) errChan.C <- d.gatherServer(s, acc, d.Type, f)
}(serv, filter) }(server, filter)
} }
} }
wg.Wait() wg.Wait()
return errChan.Error()
return outerr
} }
func (d *Dovecot) gatherServer(addr string, acc telegraf.Accumulator, qtype string, filter string) error { func (d *Dovecot) gatherServer(addr string, acc telegraf.Accumulator, qtype string, filter string) error {
_, _, err := net.SplitHostPort(addr) _, _, err := net.SplitHostPort(addr)
if err != nil { if err != nil {
return fmt.Errorf("Error: %s on url %s\n", err, addr) return fmt.Errorf("Error: %s on url %s\n", err, addr)

View File

@ -92,9 +92,11 @@ type haproxy struct {
var sampleConfig = ` var sampleConfig = `
## An array of address to gather stats about. Specify an ip on hostname ## An array of address to gather stats about. Specify an ip on hostname
## with optional port. ie localhost, 10.10.3.33:1936, etc. ## with optional port. ie localhost, 10.10.3.33:1936, etc.
## Make sure you specify the complete path to the stats endpoint
## If no servers are specified, then default to 127.0.0.1:1936 ## ie 10.10.3.33:1936/haproxy?stats
servers = ["http://myhaproxy.com:1936", "http://anotherhaproxy.com:1936"] #
## If no servers are specified, then default to 127.0.0.1:1936/haproxy?stats
servers = ["http://myhaproxy.com:1936/haproxy?stats"]
## Or you can also use local socket ## Or you can also use local socket
## servers = ["socket:/run/haproxy/admin.sock"] ## servers = ["socket:/run/haproxy/admin.sock"]
` `
@ -111,7 +113,7 @@ func (r *haproxy) Description() string {
// Returns one of the errors encountered while gather stats (if any). // Returns one of the errors encountered while gather stats (if any).
func (g *haproxy) Gather(acc telegraf.Accumulator) error { func (g *haproxy) Gather(acc telegraf.Accumulator) error {
if len(g.Servers) == 0 { if len(g.Servers) == 0 {
return g.gatherServer("http://127.0.0.1:1936", acc) return g.gatherServer("http://127.0.0.1:1936/haproxy?stats", acc)
} }
var wg sync.WaitGroup var wg sync.WaitGroup
@ -167,12 +169,16 @@ func (g *haproxy) gatherServer(addr string, acc telegraf.Accumulator) error {
g.client = client g.client = client
} }
if !strings.HasSuffix(addr, ";csv") {
addr += "/;csv"
}
u, err := url.Parse(addr) u, err := url.Parse(addr)
if err != nil { if err != nil {
return fmt.Errorf("Unable parse server address '%s': %s", addr, err) return fmt.Errorf("Unable parse server address '%s': %s", addr, err)
} }
req, err := http.NewRequest("GET", fmt.Sprintf("%s://%s%s/;csv", u.Scheme, u.Host, u.Path), nil) req, err := http.NewRequest("GET", addr, nil)
if u.User != nil { if u.User != nil {
p, _ := u.User.Password() p, _ := u.User.Password()
req.SetBasicAuth(u.User.Username(), p) req.SetBasicAuth(u.User.Username(), p)
@ -184,7 +190,7 @@ func (g *haproxy) gatherServer(addr string, acc telegraf.Accumulator) error {
} }
if res.StatusCode != 200 { if res.StatusCode != 200 {
return fmt.Errorf("Unable to get valid stat result from '%s': %s", addr, err) return fmt.Errorf("Unable to get valid stat result from '%s', http response code : %d", addr, res.StatusCode)
} }
return importCsvResult(res.Body, acc, u.Host) return importCsvResult(res.Body, acc, u.Host)

View File

@ -243,7 +243,7 @@ func TestHaproxyDefaultGetFromLocalhost(t *testing.T) {
err := r.Gather(&acc) err := r.Gather(&acc)
require.Error(t, err) require.Error(t, err)
assert.Contains(t, err.Error(), "127.0.0.1:1936/;csv") assert.Contains(t, err.Error(), "127.0.0.1:1936/haproxy?stats/;csv")
} }
const csvOutputSample = ` const csvOutputSample = `

View File

@ -0,0 +1,22 @@
# Hddtemp Input Plugin
This plugin reads data from the hddtemp daemon.
## Requirements
Hddtemp should be installed and its daemon running.
## Configuration
```
[[inputs.hddtemp]]
## By default, telegraf gathers temperature data from all disks detected by
## the hddtemp daemon.
##
## Only collect temperatures from the selected disks.
##
## A * as the device name will return the temperature values of all disks.
##
# address = "127.0.0.1:7634"
# devices = ["sda", "*"]
```
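## Example Output
Based on the tags and fields this plugin emits (`device`, `model`, `unit`, and `status` tags, plus one temperature field named after the device; the `status` tag is empty when the drive reports a numeric temperature), a single disk would produce a line roughly like the following. The values are illustrative, not captured output:
```
hddtemp,device=sda,model=ST380011A,unit=C sda=46i 1468928660000000000
```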

View File

@ -0,0 +1,21 @@
The MIT License (MIT)
Copyright (c) 2016 Mendelson Gusmão
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@ -0,0 +1,61 @@
package hddtemp
import (
"bytes"
"io"
"net"
"strconv"
"strings"
)
type disk struct {
DeviceName string
Model string
Temperature int32
Unit string
Status string
}
func Fetch(address string) ([]disk, error) {
var (
err error
conn net.Conn
buffer bytes.Buffer
disks []disk
)
if conn, err = net.Dial("tcp", address); err != nil {
return nil, err
}
if _, err = io.Copy(&buffer, conn); err != nil {
return nil, err
}
fields := strings.Split(buffer.String(), "|")
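// The daemon replies with pipe-delimited five-field records, e.g.
// "|/dev/sda|foobar|36|C|" (see the tests), so after splitting on "|"
// every group of five fields describes one disk:
// [empty separator, device path, model, temperature, unit].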
for index := 0; index < len(fields)/5; index++ {
status := ""
offset := index * 5
device := fields[offset+1]
device = device[strings.LastIndex(device, "/")+1:]
temperatureField := fields[offset+3]
temperature, err := strconv.ParseInt(temperatureField, 10, 32)
if err != nil {
temperature = 0
status = temperatureField
}
disks = append(disks, disk{
DeviceName: device,
Model: fields[offset+2],
Temperature: int32(temperature),
Unit: fields[offset+4],
Status: status,
})
}
return disks, nil
}

View File

@ -0,0 +1,116 @@
package hddtemp
import (
"net"
"reflect"
"testing"
)
func TestFetch(t *testing.T) {
l := serve(t, []byte("|/dev/sda|foobar|36|C|"))
defer l.Close()
disks, err := Fetch(l.Addr().String())
if err != nil {
t.Error("expecting err to be nil")
}
expected := []disk{
{
DeviceName: "sda",
Model: "foobar",
Temperature: 36,
Unit: "C",
},
}
if !reflect.DeepEqual(expected, disks) {
t.Error("disks' slice is different from expected")
}
}
func TestFetchWrongAddress(t *testing.T) {
_, err := Fetch("127.0.0.1:1")
if err == nil {
t.Error("expecting err to be non-nil")
}
}
func TestFetchStatus(t *testing.T) {
l := serve(t, []byte("|/dev/sda|foobar|SLP|C|"))
defer l.Close()
disks, err := Fetch(l.Addr().String())
if err != nil {
t.Error("expecting err to be nil")
}
expected := []disk{
{
DeviceName: "sda",
Model: "foobar",
Temperature: 0,
Unit: "C",
Status: "SLP",
},
}
if !reflect.DeepEqual(expected, disks) {
t.Error("disks' slice is different from expected")
}
}
func TestFetchTwoDisks(t *testing.T) {
l := serve(t, []byte("|/dev/hda|ST380011A|46|C||/dev/hdd|ST340016A|SLP|*|"))
defer l.Close()
disks, err := Fetch(l.Addr().String())
if err != nil {
t.Error("expecting err to be nil")
}
expected := []disk{
{
DeviceName: "hda",
Model: "ST380011A",
Temperature: 46,
Unit: "C",
},
{
DeviceName: "hdd",
Model: "ST340016A",
Temperature: 0,
Unit: "*",
Status: "SLP",
},
}
if !reflect.DeepEqual(expected, disks) {
t.Error("disks' slice is different from expected")
}
}
func serve(t *testing.T, data []byte) net.Listener {
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
go func(t *testing.T) {
conn, err := l.Accept()
if err != nil {
t.Fatal(err)
}
conn.Write(data)
conn.Close()
}(t)
return l
}

View File

@ -0,0 +1,74 @@
// +build linux
package hddtemp
import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
gohddtemp "github.com/influxdata/telegraf/plugins/inputs/hddtemp/go-hddtemp"
)
const defaultAddress = "127.0.0.1:7634"
type HDDTemp struct {
Address string
Devices []string
}
func (_ *HDDTemp) Description() string {
return "Monitor disks' temperatures using hddtemp"
}
var hddtempSampleConfig = `
## By default, telegraf gathers temperature data from all disks detected by
## the hddtemp daemon.
##
## Only collect temperatures from the selected disks.
##
## A * as the device name will return the temperature values of all disks.
##
# address = "127.0.0.1:7634"
# devices = ["sda", "*"]
`
func (_ *HDDTemp) SampleConfig() string {
return hddtempSampleConfig
}
func (h *HDDTemp) Gather(acc telegraf.Accumulator) error {
disks, err := gohddtemp.Fetch(h.Address)
if err != nil {
return err
}
for _, disk := range disks {
for _, chosenDevice := range h.Devices {
if chosenDevice == "*" || chosenDevice == disk.DeviceName {
tags := map[string]string{
"device": disk.DeviceName,
"model": disk.Model,
"unit": disk.Unit,
"status": disk.Status,
}
fields := map[string]interface{}{
disk.DeviceName: disk.Temperature,
}
acc.AddFields("hddtemp", fields, tags)
}
}
}
return nil
}
func init() {
inputs.Add("hddtemp", func() telegraf.Input {
return &HDDTemp{
Address: defaultAddress,
Devices: []string{"*"},
}
})
}

View File

@ -0,0 +1,3 @@
// +build !linux
package hddtemp

View File

@ -249,7 +249,14 @@ func (j *Jolokia) Gather(acc telegraf.Accumulator) error {
switch t := values.(type) { switch t := values.(type) {
case map[string]interface{}: case map[string]interface{}:
for k, v := range t { for k, v := range t {
fields[measurement+"_"+k] = v switch t2 := v.(type) {
case map[string]interface{}:
for k2, v2 := range t2 {
fields[measurement+"_"+k+"_"+k2] = v2
}
case interface{}:
fields[measurement+"_"+k] = t2
}
} }
case interface{}: case interface{}:
fields[measurement] = t fields[measurement] = t
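The new inner switch flattens one extra level of nesting when a Jolokia attribute is itself a map, joining the keys with underscores. A standalone sketch of that flattening, with a made-up measurement name and input map:

```go
package main

import "fmt"

func main() {
	measurement := "heap_memory_usage" // hypothetical measurement name
	values := map[string]interface{}{
		"init": 134217728,
		"committed": map[string]interface{}{ // a multi-dimensional attribute
			"bytes": 251658240,
		},
	}

	fields := map[string]interface{}{}
	for k, v := range values {
		switch t2 := v.(type) {
		case map[string]interface{}:
			// one more level of nesting: join the keys with "_"
			for k2, v2 := range t2 {
				fields[measurement+"_"+k+"_"+k2] = v2
			}
		case interface{}:
			fields[measurement+"_"+k] = t2
		}
	}

	// Prints heap_memory_usage_init and heap_memory_usage_committed_bytes.
	fmt.Println(fields)
}
```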

View File

@ -9,6 +9,7 @@ import (
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/errchan"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
) )
@ -73,19 +74,16 @@ func (m *Memcached) Gather(acc telegraf.Accumulator) error {
return m.gatherServer(":11211", false, acc) return m.gatherServer(":11211", false, acc)
} }
errChan := errchan.New(len(m.Servers) + len(m.UnixSockets))
for _, serverAddress := range m.Servers { for _, serverAddress := range m.Servers {
if err := m.gatherServer(serverAddress, false, acc); err != nil { errChan.C <- m.gatherServer(serverAddress, false, acc)
return err
}
} }
for _, unixAddress := range m.UnixSockets { for _, unixAddress := range m.UnixSockets {
if err := m.gatherServer(unixAddress, true, acc); err != nil { errChan.C <- m.gatherServer(unixAddress, true, acc)
return err
}
} }
return nil return errChan.Error()
} }
func (m *Memcached) gatherServer( func (m *Memcached) gatherServer(

View File

@ -1,6 +1,6 @@
# Mesos Input Plugin # Mesos Input Plugin
This input plugin gathers metrics from Mesos (*currently only Mesos masters*). This input plugin gathers metrics from Mesos.
For more information, please check the [Mesos Observability Metrics](http://mesos.apache.org/documentation/latest/monitoring/) page. For more information, please check the [Mesos Observability Metrics](http://mesos.apache.org/documentation/latest/monitoring/) page.
### Configuration: ### Configuration:
@ -8,14 +8,41 @@ For more information, please check the [Mesos Observability Metrics](http://meso
```toml ```toml
# Telegraf plugin for gathering metrics from N Mesos masters # Telegraf plugin for gathering metrics from N Mesos masters
[[inputs.mesos]] [[inputs.mesos]]
# Timeout, in ms. ## Timeout, in ms.
timeout = 100 timeout = 100
# A list of Mesos masters, default value is localhost:5050. ## A list of Mesos masters.
masters = ["localhost:5050"] masters = ["localhost:5050"]
# Metrics groups to be collected, by default, all enabled. ## Master metrics groups to be collected, by default, all enabled.
master_collections = ["resources","master","system","slaves","frameworks","messages","evqueue","registrar"] master_collections = [
"resources",
"master",
"system",
"agents",
"frameworks",
"tasks",
"messages",
"evqueue",
"registrar",
]
## A list of Mesos slaves, default is []
# slaves = []
## Slave metrics groups to be collected, by default, all enabled.
# slave_collections = [
# "resources",
# "agent",
# "system",
# "executors",
# "tasks",
# "messages",
# ]
## Include mesos tasks statistics, default is false
# slave_tasks = true
``` ```
By default this plugin is not configured to gather metrics from Mesos. Since a Mesos cluster can be deployed in numerous ways, the plugin does not provide any default
values for this; the user needs to specify the master/slave nodes it will gather metrics from. Additionally, enabling `slave_tasks` allows
gathering metrics from tasks running on the specified slaves (this option is disabled by default).
### Measurements & Fields: ### Measurements & Fields:
Mesos master metric groups Mesos master metric groups
@ -33,6 +60,12 @@ Mesos master metric groups
- master/disk_revocable_percent - master/disk_revocable_percent
- master/disk_revocable_total - master/disk_revocable_total
- master/disk_revocable_used - master/disk_revocable_used
- master/gpus_percent
- master/gpus_used
- master/gpus_total
- master/gpus_revocable_percent
- master/gpus_revocable_total
- master/gpus_revocable_used
- master/mem_percent - master/mem_percent
- master/mem_used - master/mem_used
- master/mem_total - master/mem_total
@ -136,17 +169,111 @@ Mesos master metric groups
- registrar/state_store_ms/p999 - registrar/state_store_ms/p999
- registrar/state_store_ms/p9999 - registrar/state_store_ms/p9999
Mesos slave metric groups
- resources
- slave/cpus_percent
- slave/cpus_used
- slave/cpus_total
- slave/cpus_revocable_percent
- slave/cpus_revocable_total
- slave/cpus_revocable_used
- slave/disk_percent
- slave/disk_used
- slave/disk_total
- slave/disk_revocable_percent
- slave/disk_revocable_total
- slave/disk_revocable_used
- slave/gpus_percent
- slave/gpus_used
- slave/gpus_total
- slave/gpus_revocable_percent
- slave/gpus_revocable_total
- slave/gpus_revocable_used
- slave/mem_percent
- slave/mem_used
- slave/mem_total
- slave/mem_revocable_percent
- slave/mem_revocable_total
- slave/mem_revocable_used
- agent
- slave/registered
- slave/uptime_secs
- system
- system/cpus_total
- system/load_15min
- system/load_5min
- system/load_1min
- system/mem_free_bytes
- system/mem_total_bytes
- executors
- containerizer/mesos/container_destroy_errors
- slave/container_launch_errors
- slave/executors_preempted
- slave/frameworks_active
- slave/executor_directory_max_allowed_age_secs
- slave/executors_registering
- slave/executors_running
- slave/executors_terminated
- slave/executors_terminating
- slave/recovery_errors
- tasks
- slave/tasks_failed
- slave/tasks_finished
- slave/tasks_killed
- slave/tasks_lost
- slave/tasks_running
- slave/tasks_staging
- slave/tasks_starting
- messages
- slave/invalid_framework_messages
- slave/invalid_status_updates
- slave/valid_framework_messages
- slave/valid_status_updates
Mesos tasks metric groups
- executor_id
- executor_name
- framework_id
- source
- statistics (all metrics below will have the `statistics_` prefix included in their names)
- cpus_limit
- cpus_system_time_secs
- cpus_user_time_secs
- mem_anon_bytes
- mem_cache_bytes
- mem_critical_pressure_counter
- mem_file_bytes
- mem_limit_bytes
- mem_low_pressure_counter
- mem_mapped_file_bytes
- mem_medium_pressure_counter
- mem_rss_bytes
- mem_swap_bytes
- mem_total_bytes
- mem_total_memsw_bytes
- mem_unevictable_bytes
- timestamp
### Tags: ### Tags:
- All measurements have the following tags: - All master/slave measurements have the following tags:
- server
- role (master/slave)
- Tasks measurements have the following tags:
- server - server
### Example Output: ### Example Output:
``` ```
$ telegraf -config ~/mesos.conf -input-filter mesos -test $ telegraf -config ~/mesos.conf -input-filter mesos -test
* Plugin: mesos, Collection 1 * Plugin: mesos, Collection 1
mesos,server=172.17.8.101 allocator/event_queue_dispatches=0,master/cpus_percent=0, mesos,host=172.17.8.102,server=172.17.8.101 allocator/event_queue_dispatches=0,master/cpus_percent=0,
master/cpus_revocable_percent=0,master/cpus_revocable_total=0, master/cpus_revocable_percent=0,master/cpus_revocable_total=0,
master/cpus_revocable_used=0,master/cpus_total=2, master/cpus_revocable_used=0,master/cpus_total=2,
master/cpus_used=0,master/disk_percent=0,master/disk_revocable_percent=0, master/cpus_used=0,master/disk_percent=0,master/disk_revocable_percent=0,
@ -163,3 +290,16 @@ master/mem_revocable_used=0,master/mem_total=1002,
master/mem_used=0,master/messages_authenticate=0, master/mem_used=0,master/messages_authenticate=0,
master/messages_deactivate_framework=0 ... master/messages_deactivate_framework=0 ...
``` ```
Mesos tasks metrics (if enabled):
```
mesos-tasks,host=172.17.8.102,server=172.17.8.101,task_id=hello-world.e4b5b497-2ccd-11e6-a659-0242fb222ce2
statistics_cpus_limit=0.2,statistics_cpus_system_time_secs=142.49,statistics_cpus_user_time_secs=388.14,
statistics_mem_anon_bytes=359129088,statistics_mem_cache_bytes=3964928,
statistics_mem_critical_pressure_counter=0,statistics_mem_file_bytes=3964928,
statistics_mem_limit_bytes=767557632,statistics_mem_low_pressure_counter=0,
statistics_mem_mapped_file_bytes=114688,statistics_mem_medium_pressure_counter=0,
statistics_mem_rss_bytes=359129088,statistics_mem_swap_bytes=0,statistics_mem_total_bytes=363094016,
statistics_mem_total_memsw_bytes=363094016,statistics_mem_unevictable_bytes=0,
statistics_timestamp=1465486052.70525 1465486053052811792...
```

View File

@ -17,33 +17,57 @@ import (
jsonparser "github.com/influxdata/telegraf/plugins/parsers/json" jsonparser "github.com/influxdata/telegraf/plugins/parsers/json"
) )
type Role string
const (
MASTER Role = "master"
SLAVE = "slave"
)
type Mesos struct { type Mesos struct {
Timeout int Timeout int
Masters []string Masters []string
MasterCols []string `toml:"master_collections"` MasterCols []string `toml:"master_collections"`
Slaves []string
SlaveCols []string `toml:"slave_collections"`
SlaveTasks bool
} }
var defaultMetrics = []string{ var allMetrics = map[Role][]string{
"resources", "master", "system", "slaves", "frameworks", MASTER: []string{"resources", "master", "system", "agents", "frameworks", "tasks", "messages", "evqueue", "registrar"},
"tasks", "messages", "evqueue", "messages", "registrar", SLAVE: []string{"resources", "agent", "system", "executors", "tasks", "messages"},
} }
var sampleConfig = ` var sampleConfig = `
# Timeout, in ms. ## Timeout, in ms.
timeout = 100 timeout = 100
# A list of Mesos masters, default value is localhost:5050. ## A list of Mesos masters.
masters = ["localhost:5050"] masters = ["localhost:5050"]
# Metrics groups to be collected, by default, all enabled. ## Master metrics groups to be collected, by default, all enabled.
master_collections = [ master_collections = [
"resources", "resources",
"master", "master",
"system", "system",
"slaves", "agents",
"frameworks", "frameworks",
"tasks",
"messages", "messages",
"evqueue", "evqueue",
"registrar", "registrar",
] ]
## A list of Mesos slaves, default is []
# slaves = []
## Slave metrics groups to be collected, by default, all enabled.
# slave_collections = [
# "resources",
# "agent",
# "system",
# "executors",
# "tasks",
# "messages",
# ]
## Include mesos tasks statistics, default is false
# slave_tasks = true
` `
// SampleConfig returns a sample configuration block // SampleConfig returns a sample configuration block
@ -56,21 +80,54 @@ func (m *Mesos) Description() string {
return "Telegraf plugin for gathering metrics from N Mesos masters" return "Telegraf plugin for gathering metrics from N Mesos masters"
} }
func (m *Mesos) SetDefaults() {
if len(m.MasterCols) == 0 {
m.MasterCols = allMetrics[MASTER]
}
if len(m.SlaveCols) == 0 {
m.SlaveCols = allMetrics[SLAVE]
}
if m.Timeout == 0 {
log.Println("[mesos] Missing timeout value, setting default value (100ms)")
m.Timeout = 100
}
}
// Gather() metrics from given list of Mesos Masters // Gather() metrics from given list of Mesos Masters
func (m *Mesos) Gather(acc telegraf.Accumulator) error { func (m *Mesos) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup var wg sync.WaitGroup
var errorChannel chan error var errorChannel chan error
if len(m.Masters) == 0 { m.SetDefaults()
m.Masters = []string{"localhost:5050"}
}
errorChannel = make(chan error, len(m.Masters)*2) errorChannel = make(chan error, len(m.Masters)+2*len(m.Slaves))
for _, v := range m.Masters { for _, v := range m.Masters {
wg.Add(1) wg.Add(1)
go func(c string) { go func(c string) {
errorChannel <- m.gatherMetrics(c, acc) errorChannel <- m.gatherMainMetrics(c, ":5050", MASTER, acc)
wg.Done()
return
}(v)
}
for _, v := range m.Slaves {
wg.Add(1)
go func(c string) {
errorChannel <- m.gatherMainMetrics(c, ":5051", MASTER, acc)
wg.Done()
return
}(v)
if !m.SlaveTasks {
continue
}
wg.Add(1)
go func(c string) {
errorChannel <- m.gatherSlaveTaskMetrics(c, ":5051", acc)
wg.Done() wg.Done()
return return
}(v) }(v)
@ -94,7 +151,7 @@ func (m *Mesos) Gather(acc telegraf.Accumulator) error {
} }
// metricsDiff() returns set names for removal // metricsDiff() returns set names for removal
func metricsDiff(w []string) []string { func metricsDiff(role Role, w []string) []string {
b := []string{} b := []string{}
s := make(map[string]bool) s := make(map[string]bool)
@ -106,7 +163,7 @@ func metricsDiff(w []string) []string {
s[v] = true s[v] = true
} }
for _, d := range defaultMetrics { for _, d := range allMetrics[role] {
if _, ok := s[d]; !ok { if _, ok := s[d]; !ok {
b = append(b, d) b = append(b, d)
} }
@ -116,156 +173,239 @@ func metricsDiff(w []string) []string {
} }
// masterBlocks serves as kind of metrics registry groupping them in sets // masterBlocks serves as kind of metrics registry groupping them in sets
func masterBlocks(g string) []string { func getMetrics(role Role, group string) []string {
var m map[string][]string var m map[string][]string
m = make(map[string][]string) m = make(map[string][]string)
m["resources"] = []string{ if role == MASTER {
"master/cpus_percent", m["resources"] = []string{
"master/cpus_used", "master/cpus_percent",
"master/cpus_total", "master/cpus_used",
"master/cpus_revocable_percent", "master/cpus_total",
"master/cpus_revocable_total", "master/cpus_revocable_percent",
"master/cpus_revocable_used", "master/cpus_revocable_total",
"master/disk_percent", "master/cpus_revocable_used",
"master/disk_used", "master/disk_percent",
"master/disk_total", "master/disk_used",
"master/disk_revocable_percent", "master/disk_total",
"master/disk_revocable_total", "master/disk_revocable_percent",
"master/disk_revocable_used", "master/disk_revocable_total",
"master/mem_percent", "master/disk_revocable_used",
"master/mem_used", "master/gpus_percent",
"master/mem_total", "master/gpus_used",
"master/mem_revocable_percent", "master/gpus_total",
"master/mem_revocable_total", "master/gpus_revocable_percent",
"master/mem_revocable_used", "master/gpus_revocable_total",
"master/gpus_revocable_used",
"master/mem_percent",
"master/mem_used",
"master/mem_total",
"master/mem_revocable_percent",
"master/mem_revocable_total",
"master/mem_revocable_used",
}
m["master"] = []string{
"master/elected",
"master/uptime_secs",
}
m["system"] = []string{
"system/cpus_total",
"system/load_15min",
"system/load_5min",
"system/load_1min",
"system/mem_free_bytes",
"system/mem_total_bytes",
}
m["agents"] = []string{
"master/slave_registrations",
"master/slave_removals",
"master/slave_reregistrations",
"master/slave_shutdowns_scheduled",
"master/slave_shutdowns_canceled",
"master/slave_shutdowns_completed",
"master/slaves_active",
"master/slaves_connected",
"master/slaves_disconnected",
"master/slaves_inactive",
}
m["frameworks"] = []string{
"master/frameworks_active",
"master/frameworks_connected",
"master/frameworks_disconnected",
"master/frameworks_inactive",
"master/outstanding_offers",
}
m["tasks"] = []string{
"master/tasks_error",
"master/tasks_failed",
"master/tasks_finished",
"master/tasks_killed",
"master/tasks_lost",
"master/tasks_running",
"master/tasks_staging",
"master/tasks_starting",
}
m["messages"] = []string{
"master/invalid_executor_to_framework_messages",
"master/invalid_framework_to_executor_messages",
"master/invalid_status_update_acknowledgements",
"master/invalid_status_updates",
"master/dropped_messages",
"master/messages_authenticate",
"master/messages_deactivate_framework",
"master/messages_decline_offers",
"master/messages_executor_to_framework",
"master/messages_exited_executor",
"master/messages_framework_to_executor",
"master/messages_kill_task",
"master/messages_launch_tasks",
"master/messages_reconcile_tasks",
"master/messages_register_framework",
"master/messages_register_slave",
"master/messages_reregister_framework",
"master/messages_reregister_slave",
"master/messages_resource_request",
"master/messages_revive_offers",
"master/messages_status_update",
"master/messages_status_update_acknowledgement",
"master/messages_unregister_framework",
"master/messages_unregister_slave",
"master/messages_update_slave",
"master/recovery_slave_removals",
"master/slave_removals/reason_registered",
"master/slave_removals/reason_unhealthy",
"master/slave_removals/reason_unregistered",
"master/valid_framework_to_executor_messages",
"master/valid_status_update_acknowledgements",
"master/valid_status_updates",
"master/task_lost/source_master/reason_invalid_offers",
"master/task_lost/source_master/reason_slave_removed",
"master/task_lost/source_slave/reason_executor_terminated",
"master/valid_executor_to_framework_messages",
}
m["evqueue"] = []string{
"master/event_queue_dispatches",
"master/event_queue_http_requests",
"master/event_queue_messages",
}
m["registrar"] = []string{
"registrar/state_fetch_ms",
"registrar/state_store_ms",
"registrar/state_store_ms/max",
"registrar/state_store_ms/min",
"registrar/state_store_ms/p50",
"registrar/state_store_ms/p90",
"registrar/state_store_ms/p95",
"registrar/state_store_ms/p99",
"registrar/state_store_ms/p999",
"registrar/state_store_ms/p9999",
}
} else if role == SLAVE {
m["resources"] = []string{
"slave/cpus_percent",
"slave/cpus_used",
"slave/cpus_total",
"slave/cpus_revocable_percent",
"slave/cpus_revocable_total",
"slave/cpus_revocable_used",
"slave/disk_percent",
"slave/disk_used",
"slave/disk_total",
"slave/disk_revocable_percent",
"slave/disk_revocable_total",
"slave/disk_revocable_used",
"slave/gpus_percent",
"slave/gpus_used",
"slave/gpus_total",
"slave/gpus_revocable_percent",
"slave/gpus_revocable_total",
"slave/gpus_revocable_used",
"slave/mem_percent",
"slave/mem_used",
"slave/mem_total",
"slave/mem_revocable_percent",
"slave/mem_revocable_total",
"slave/mem_revocable_used",
}
m["agent"] = []string{
"slave/registered",
"slave/uptime_secs",
}
m["system"] = []string{
"system/cpus_total",
"system/load_15min",
"system/load_5min",
"system/load_1min",
"system/mem_free_bytes",
"system/mem_total_bytes",
}
m["executors"] = []string{
"containerizer/mesos/container_destroy_errors",
"slave/container_launch_errors",
"slave/executors_preempted",
"slave/frameworks_active",
"slave/executor_directory_max_allowed_age_secs",
"slave/executors_registering",
"slave/executors_running",
"slave/executors_terminated",
"slave/executors_terminating",
"slave/recovery_errors",
}
m["tasks"] = []string{
"slave/tasks_failed",
"slave/tasks_finished",
"slave/tasks_killed",
"slave/tasks_lost",
"slave/tasks_running",
"slave/tasks_staging",
"slave/tasks_starting",
}
m["messages"] = []string{
"slave/invalid_framework_messages",
"slave/invalid_status_updates",
"slave/valid_framework_messages",
"slave/valid_status_updates",
}
} }
m["master"] = []string{ ret, ok := m[group]
"master/elected",
"master/uptime_secs",
}
m["system"] = []string{
"system/cpus_total",
"system/load_15min",
"system/load_5min",
"system/load_1min",
"system/mem_free_bytes",
"system/mem_total_bytes",
}
m["slaves"] = []string{
"master/slave_registrations",
"master/slave_removals",
"master/slave_reregistrations",
"master/slave_shutdowns_scheduled",
"master/slave_shutdowns_canceled",
"master/slave_shutdowns_completed",
"master/slaves_active",
"master/slaves_connected",
"master/slaves_disconnected",
"master/slaves_inactive",
}
m["frameworks"] = []string{
"master/frameworks_active",
"master/frameworks_connected",
"master/frameworks_disconnected",
"master/frameworks_inactive",
"master/outstanding_offers",
}
m["tasks"] = []string{
"master/tasks_error",
"master/tasks_failed",
"master/tasks_finished",
"master/tasks_killed",
"master/tasks_lost",
"master/tasks_running",
"master/tasks_staging",
"master/tasks_starting",
}
m["messages"] = []string{
"master/invalid_executor_to_framework_messages",
"master/invalid_framework_to_executor_messages",
"master/invalid_status_update_acknowledgements",
"master/invalid_status_updates",
"master/dropped_messages",
"master/messages_authenticate",
"master/messages_deactivate_framework",
"master/messages_decline_offers",
"master/messages_executor_to_framework",
"master/messages_exited_executor",
"master/messages_framework_to_executor",
"master/messages_kill_task",
"master/messages_launch_tasks",
"master/messages_reconcile_tasks",
"master/messages_register_framework",
"master/messages_register_slave",
"master/messages_reregister_framework",
"master/messages_reregister_slave",
"master/messages_resource_request",
"master/messages_revive_offers",
"master/messages_status_update",
"master/messages_status_update_acknowledgement",
"master/messages_unregister_framework",
"master/messages_unregister_slave",
"master/messages_update_slave",
"master/recovery_slave_removals",
"master/slave_removals/reason_registered",
"master/slave_removals/reason_unhealthy",
"master/slave_removals/reason_unregistered",
"master/valid_framework_to_executor_messages",
"master/valid_status_update_acknowledgements",
"master/valid_status_updates",
"master/task_lost/source_master/reason_invalid_offers",
"master/task_lost/source_master/reason_slave_removed",
"master/task_lost/source_slave/reason_executor_terminated",
"master/valid_executor_to_framework_messages",
}
m["evqueue"] = []string{
"master/event_queue_dispatches",
"master/event_queue_http_requests",
"master/event_queue_messages",
}
m["registrar"] = []string{
"registrar/state_fetch_ms",
"registrar/state_store_ms",
"registrar/state_store_ms/max",
"registrar/state_store_ms/min",
"registrar/state_store_ms/p50",
"registrar/state_store_ms/p90",
"registrar/state_store_ms/p95",
"registrar/state_store_ms/p99",
"registrar/state_store_ms/p999",
"registrar/state_store_ms/p9999",
}
ret, ok := m[g]
if !ok { if !ok {
log.Println("[mesos] Unkown metrics group: ", g) log.Printf("[mesos] Unkown %s metrics group: %s\n", role, group)
return []string{} return []string{}
} }
return ret return ret
} }
// removeGroup(), remove unwanted sets func (m *Mesos) filterMetrics(role Role, metrics *map[string]interface{}) {
func (m *Mesos) removeGroup(j *map[string]interface{}) {
var ok bool var ok bool
var selectedMetrics []string
b := metricsDiff(m.MasterCols) if role == MASTER {
selectedMetrics = m.MasterCols
} else if role == SLAVE {
selectedMetrics = m.SlaveCols
}
for _, k := range b { for _, k := range metricsDiff(role, selectedMetrics) {
for _, v := range masterBlocks(k) { for _, v := range getMetrics(role, k) {
if _, ok = (*j)[v]; ok { if _, ok = (*metrics)[v]; ok {
delete((*j), v) delete((*metrics), v)
} }
} }
} }
@ -280,23 +420,66 @@ var client = &http.Client{
Timeout: time.Duration(4 * time.Second), Timeout: time.Duration(4 * time.Second),
} }
// This should not belong to the object func (m *Mesos) gatherSlaveTaskMetrics(address string, defaultPort string, acc telegraf.Accumulator) error {
func (m *Mesos) gatherMetrics(a string, acc telegraf.Accumulator) error { var metrics []map[string]interface{}
var jsonOut map[string]interface{}
host, _, err := net.SplitHostPort(a) host, _, err := net.SplitHostPort(address)
if err != nil { if err != nil {
host = a host = address
a = a + ":5050" address = address + defaultPort
} }
tags := map[string]string{ tags := map[string]string{
"server": host, "server": host,
} }
if m.Timeout == 0 { ts := strconv.Itoa(m.Timeout) + "ms"
log.Println("[mesos] Missing timeout value, setting default value (100ms)")
m.Timeout = 100 resp, err := client.Get("http://" + address + "/monitor/statistics?timeout=" + ts)
if err != nil {
return err
}
data, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
return err
}
if err = json.Unmarshal([]byte(data), &metrics); err != nil {
return errors.New("Error decoding JSON response")
}
for _, task := range metrics {
tags["task_id"] = task["executor_id"].(string)
jf := jsonparser.JSONFlattener{}
err = jf.FlattenJSON("", task)
if err != nil {
return err
}
acc.AddFields("mesos-tasks", jf.Fields, tags)
}
return nil
}
// This should not belong to the object
func (m *Mesos) gatherMainMetrics(a string, defaultPort string, role Role, acc telegraf.Accumulator) error {
var jsonOut map[string]interface{}
host, _, err := net.SplitHostPort(a)
if err != nil {
host = a
a = a + defaultPort
}
tags := map[string]string{
"server": host,
"role": string(role),
} }
ts := strconv.Itoa(m.Timeout) + "ms" ts := strconv.Itoa(m.Timeout) + "ms"
@ -317,7 +500,7 @@ func (m *Mesos) gatherMetrics(a string, acc telegraf.Accumulator) error {
return errors.New("Error decoding JSON response") return errors.New("Error decoding JSON response")
} }
m.removeGroup(&jsonOut) m.filterMetrics(role, &jsonOut)
jf := jsonparser.JSONFlattener{} jf := jsonparser.JSONFlattener{}

View File

@ -2,70 +2,275 @@ package mesos
import ( import (
"encoding/json" "encoding/json"
"fmt"
"math/rand" "math/rand"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"os" "os"
"testing" "testing"
jsonparser "github.com/influxdata/telegraf/plugins/parsers/json"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
) )
var mesosMetrics map[string]interface{} var masterMetrics map[string]interface{}
var ts *httptest.Server var masterTestServer *httptest.Server
var slaveMetrics map[string]interface{}
var slaveTaskMetrics map[string]interface{}
var slaveTestServer *httptest.Server
func randUUID() string {
b := make([]byte, 16)
rand.Read(b)
return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:])
}
func generateMetrics() { func generateMetrics() {
mesosMetrics = make(map[string]interface{}) masterMetrics = make(map[string]interface{})
metricNames := []string{"master/cpus_percent", "master/cpus_used", "master/cpus_total", metricNames := []string{
"master/cpus_revocable_percent", "master/cpus_revocable_total", "master/cpus_revocable_used", // resources
"master/disk_percent", "master/disk_used", "master/disk_total", "master/disk_revocable_percent", "master/cpus_percent",
"master/disk_revocable_total", "master/disk_revocable_used", "master/mem_percent", "master/cpus_used",
"master/mem_used", "master/mem_total", "master/mem_revocable_percent", "master/mem_revocable_total", "master/cpus_total",
"master/mem_revocable_used", "master/elected", "master/uptime_secs", "system/cpus_total", "master/cpus_revocable_percent",
"system/load_15min", "system/load_5min", "system/load_1min", "system/mem_free_bytes", "master/cpus_revocable_total",
"system/mem_total_bytes", "master/slave_registrations", "master/slave_removals", "master/cpus_revocable_used",
"master/slave_reregistrations", "master/slave_shutdowns_scheduled", "master/slave_shutdowns_canceled", "master/disk_percent",
"master/slave_shutdowns_completed", "master/slaves_active", "master/slaves_connected", "master/disk_used",
"master/slaves_disconnected", "master/slaves_inactive", "master/frameworks_active", "master/disk_total",
"master/frameworks_connected", "master/frameworks_disconnected", "master/frameworks_inactive", "master/disk_revocable_percent",
"master/outstanding_offers", "master/tasks_error", "master/tasks_failed", "master/tasks_finished", "master/disk_revocable_total",
"master/tasks_killed", "master/tasks_lost", "master/tasks_running", "master/tasks_staging", "master/disk_revocable_used",
"master/tasks_starting", "master/invalid_executor_to_framework_messages", "master/invalid_framework_to_executor_messages", "master/gpus_percent",
"master/invalid_status_update_acknowledgements", "master/invalid_status_updates", "master/gpus_used",
"master/dropped_messages", "master/messages_authenticate", "master/messages_deactivate_framework", "master/gpus_total",
"master/messages_decline_offers", "master/messages_executor_to_framework", "master/messages_exited_executor", "master/gpus_revocable_percent",
"master/messages_framework_to_executor", "master/messages_kill_task", "master/messages_launch_tasks", "master/gpus_revocable_total",
"master/messages_reconcile_tasks", "master/messages_register_framework", "master/messages_register_slave", "master/gpus_revocable_used",
"master/messages_reregister_framework", "master/messages_reregister_slave", "master/messages_resource_request", "master/mem_percent",
"master/messages_revive_offers", "master/messages_status_update", "master/messages_status_update_acknowledgement", "master/mem_used",
"master/messages_unregister_framework", "master/messages_unregister_slave", "master/messages_update_slave", "master/mem_total",
"master/recovery_slave_removals", "master/slave_removals/reason_registered", "master/slave_removals/reason_unhealthy", "master/mem_revocable_percent",
"master/slave_removals/reason_unregistered", "master/valid_framework_to_executor_messages", "master/valid_status_update_acknowledgements", "master/mem_revocable_total",
"master/valid_status_updates", "master/task_lost/source_master/reason_invalid_offers", "master/mem_revocable_used",
"master/task_lost/source_master/reason_slave_removed", "master/task_lost/source_slave/reason_executor_terminated", // master
"master/valid_executor_to_framework_messages", "master/event_queue_dispatches", "master/elected",
"master/event_queue_http_requests", "master/event_queue_messages", "registrar/state_fetch_ms", "master/uptime_secs",
"registrar/state_store_ms", "registrar/state_store_ms/max", "registrar/state_store_ms/min", // system
"registrar/state_store_ms/p50", "registrar/state_store_ms/p90", "registrar/state_store_ms/p95", "system/cpus_total",
"registrar/state_store_ms/p99", "registrar/state_store_ms/p999", "registrar/state_store_ms/p9999"} "system/load_15min",
"system/load_5min",
"system/load_1min",
"system/mem_free_bytes",
"system/mem_total_bytes",
// agents
"master/slave_registrations",
"master/slave_removals",
"master/slave_reregistrations",
"master/slave_shutdowns_scheduled",
"master/slave_shutdowns_canceled",
"master/slave_shutdowns_completed",
"master/slaves_active",
"master/slaves_connected",
"master/slaves_disconnected",
"master/slaves_inactive",
// frameworks
"master/frameworks_active",
"master/frameworks_connected",
"master/frameworks_disconnected",
"master/frameworks_inactive",
"master/outstanding_offers",
// tasks
"master/tasks_error",
"master/tasks_failed",
"master/tasks_finished",
"master/tasks_killed",
"master/tasks_lost",
"master/tasks_running",
"master/tasks_staging",
"master/tasks_starting",
// messages
"master/invalid_executor_to_framework_messages",
"master/invalid_framework_to_executor_messages",
"master/invalid_status_update_acknowledgements",
"master/invalid_status_updates",
"master/dropped_messages",
"master/messages_authenticate",
"master/messages_deactivate_framework",
"master/messages_decline_offers",
"master/messages_executor_to_framework",
"master/messages_exited_executor",
"master/messages_framework_to_executor",
"master/messages_kill_task",
"master/messages_launch_tasks",
"master/messages_reconcile_tasks",
"master/messages_register_framework",
"master/messages_register_slave",
"master/messages_reregister_framework",
"master/messages_reregister_slave",
"master/messages_resource_request",
"master/messages_revive_offers",
"master/messages_status_update",
"master/messages_status_update_acknowledgement",
"master/messages_unregister_framework",
"master/messages_unregister_slave",
"master/messages_update_slave",
"master/recovery_slave_removals",
"master/slave_removals/reason_registered",
"master/slave_removals/reason_unhealthy",
"master/slave_removals/reason_unregistered",
"master/valid_framework_to_executor_messages",
"master/valid_status_update_acknowledgements",
"master/valid_status_updates",
"master/task_lost/source_master/reason_invalid_offers",
"master/task_lost/source_master/reason_slave_removed",
"master/task_lost/source_slave/reason_executor_terminated",
"master/valid_executor_to_framework_messages",
// evqueue
"master/event_queue_dispatches",
"master/event_queue_http_requests",
"master/event_queue_messages",
// registrar
"registrar/state_fetch_ms",
"registrar/state_store_ms",
"registrar/state_store_ms/max",
"registrar/state_store_ms/min",
"registrar/state_store_ms/p50",
"registrar/state_store_ms/p90",
"registrar/state_store_ms/p95",
"registrar/state_store_ms/p99",
"registrar/state_store_ms/p999",
"registrar/state_store_ms/p9999",
}
for _, k := range metricNames { for _, k := range metricNames {
mesosMetrics[k] = rand.Float64() masterMetrics[k] = rand.Float64()
}
slaveMetrics = make(map[string]interface{})
metricNames = []string{
// resources
"slave/cpus_percent",
"slave/cpus_used",
"slave/cpus_total",
"slave/cpus_revocable_percent",
"slave/cpus_revocable_total",
"slave/cpus_revocable_used",
"slave/disk_percent",
"slave/disk_used",
"slave/disk_total",
"slave/disk_revocable_percent",
"slave/disk_revocable_total",
"slave/disk_revocable_used",
"slave/gpus_percent",
"slave/gpus_used",
"slave/gpus_total",
"slave/gpus_revocable_percent",
"slave/gpus_revocable_total",
"slave/gpus_revocable_used",
"slave/mem_percent",
"slave/mem_used",
"slave/mem_total",
"slave/mem_revocable_percent",
"slave/mem_revocable_total",
"slave/mem_revocable_used",
// agent
"slave/registered",
"slave/uptime_secs",
// system
"system/cpus_total",
"system/load_15min",
"system/load_5min",
"system/load_1min",
"system/mem_free_bytes",
"system/mem_total_bytes",
// executors
"containerizer/mesos/container_destroy_errors",
"slave/container_launch_errors",
"slave/executors_preempted",
"slave/frameworks_active",
"slave/executor_directory_max_allowed_age_secs",
"slave/executors_registering",
"slave/executors_running",
"slave/executors_terminated",
"slave/executors_terminating",
"slave/recovery_errors",
// tasks
"slave/tasks_failed",
"slave/tasks_finished",
"slave/tasks_killed",
"slave/tasks_lost",
"slave/tasks_running",
"slave/tasks_staging",
"slave/tasks_starting",
// messages
"slave/invalid_framework_messages",
"slave/invalid_status_updates",
"slave/valid_framework_messages",
"slave/valid_status_updates",
}
for _, k := range metricNames {
slaveMetrics[k] = rand.Float64()
}
slaveTaskMetrics = map[string]interface{}{
"executor_id": fmt.Sprintf("task_%s", randUUID()),
"executor_name": "Some task description",
"framework_id": randUUID(),
"source": fmt.Sprintf("task_source_%s", randUUID()),
"statistics": map[string]interface{}{
"cpus_limit": rand.Float64(),
"cpus_system_time_secs": rand.Float64(),
"cpus_user_time_secs": rand.Float64(),
"mem_anon_bytes": float64(rand.Int63()),
"mem_cache_bytes": float64(rand.Int63()),
"mem_critical_pressure_counter": float64(rand.Int63()),
"mem_file_bytes": float64(rand.Int63()),
"mem_limit_bytes": float64(rand.Int63()),
"mem_low_pressure_counter": float64(rand.Int63()),
"mem_mapped_file_bytes": float64(rand.Int63()),
"mem_medium_pressure_counter": float64(rand.Int63()),
"mem_rss_bytes": float64(rand.Int63()),
"mem_swap_bytes": float64(rand.Int63()),
"mem_total_bytes": float64(rand.Int63()),
"mem_total_memsw_bytes": float64(rand.Int63()),
"mem_unevictable_bytes": float64(rand.Int63()),
"timestamp": rand.Float64(),
},
} }
} }
func TestMain(m *testing.M) { func TestMain(m *testing.M) {
generateMetrics() generateMetrics()
r := http.NewServeMux()
r.HandleFunc("/metrics/snapshot", func(w http.ResponseWriter, r *http.Request) { masterRouter := http.NewServeMux()
masterRouter.HandleFunc("/metrics/snapshot", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(mesosMetrics) json.NewEncoder(w).Encode(masterMetrics)
}) })
ts = httptest.NewServer(r) masterTestServer = httptest.NewServer(masterRouter)
slaveRouter := http.NewServeMux()
slaveRouter.HandleFunc("/metrics/snapshot", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(slaveMetrics)
})
slaveRouter.HandleFunc("/monitor/statistics", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode([]map[string]interface{}{slaveTaskMetrics})
})
slaveTestServer = httptest.NewServer(slaveRouter)
rc := m.Run() rc := m.Run()
ts.Close()
masterTestServer.Close()
slaveTestServer.Close()
os.Exit(rc) os.Exit(rc)
} }
@ -73,7 +278,7 @@ func TestMesosMaster(t *testing.T) {
var acc testutil.Accumulator var acc testutil.Accumulator
m := Mesos{ m := Mesos{
Masters: []string{ts.Listener.Addr().String()}, Masters: []string{masterTestServer.Listener.Addr().String()},
Timeout: 10, Timeout: 10,
} }
@ -83,34 +288,88 @@ func TestMesosMaster(t *testing.T) {
t.Errorf(err.Error()) t.Errorf(err.Error())
} }
acc.AssertContainsFields(t, "mesos", mesosMetrics) acc.AssertContainsFields(t, "mesos", masterMetrics)
} }
func TestRemoveGroup(t *testing.T) { func TestMasterFilter(t *testing.T) {
generateMetrics()
m := Mesos{ m := Mesos{
MasterCols: []string{ MasterCols: []string{
"resources", "master", "registrar", "resources", "master", "registrar",
}, },
} }
b := []string{ b := []string{
"system", "slaves", "frameworks", "system", "agents", "frameworks",
"messages", "evqueue", "messages", "evqueue", "tasks",
} }
m.removeGroup(&mesosMetrics) m.filterMetrics(MASTER, &masterMetrics)
for _, v := range b { for _, v := range b {
for _, x := range masterBlocks(v) { for _, x := range getMetrics(MASTER, v) {
if _, ok := mesosMetrics[x]; ok { if _, ok := masterMetrics[x]; ok {
t.Errorf("Found key %s, it should be gone.", x) t.Errorf("Found key %s, it should be gone.", x)
} }
} }
} }
for _, v := range m.MasterCols { for _, v := range m.MasterCols {
for _, x := range masterBlocks(v) { for _, x := range getMetrics(MASTER, v) {
if _, ok := mesosMetrics[x]; !ok { if _, ok := masterMetrics[x]; !ok {
t.Errorf("Didn't find key %s, it should present.", x)
}
}
}
}
func TestMesosSlave(t *testing.T) {
var acc testutil.Accumulator
m := Mesos{
Masters: []string{},
Slaves: []string{slaveTestServer.Listener.Addr().String()},
SlaveTasks: true,
Timeout: 10,
}
err := m.Gather(&acc)
if err != nil {
t.Errorf(err.Error())
}
acc.AssertContainsFields(t, "mesos", slaveMetrics)
jf := jsonparser.JSONFlattener{}
err = jf.FlattenJSON("", slaveTaskMetrics)
if err != nil {
t.Errorf(err.Error())
}
acc.AssertContainsFields(t, "mesos-tasks", jf.Fields)
}
func TestSlaveFilter(t *testing.T) {
m := Mesos{
SlaveCols: []string{
"resources", "agent", "tasks",
},
}
b := []string{
"system", "executors", "messages",
}
m.filterMetrics(SLAVE, &slaveMetrics)
for _, v := range b {
for _, x := range getMetrics(SLAVE, v) {
if _, ok := slaveMetrics[x]; ok {
t.Errorf("Found key %s, it should be gone.", x)
}
}
}
for _, v := range m.SlaveCols {
for _, x := range getMetrics(SLAVE, v) {
if _, ok := slaveMetrics[x]; !ok {
t.Errorf("Didn't find key %s, it should present.", x) t.Errorf("Didn't find key %s, it should present.", x)
} }
} }

View File

@ -10,6 +10,7 @@
## mongodb://10.10.3.33:18832, ## mongodb://10.10.3.33:18832,
## 10.0.0.1:10000, etc. ## 10.0.0.1:10000, etc.
servers = ["127.0.0.1:27017"] servers = ["127.0.0.1:27017"]
gather_perdb_stats = false
``` ```
For authenticated mongodb instances use a mongodb connection URI For authenticated mongodb instances use a mongodb connection URI
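As a sketch, an authenticated setup could be configured with a connection URI like the one below (host, credentials, and auth database are placeholders, not values from this commit):

```
[[inputs.mongodb]]
  ## Hypothetical authenticated instance; authSource names the database holding the user.
  servers = ["mongodb://telegraf:examplePassword@10.10.3.30:27017/?authSource=admin"]
  gather_perdb_stats = false
```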
@ -52,3 +53,15 @@ and create a single measurement containing values e.g.
* ttl_passes_per_sec * ttl_passes_per_sec
* repl_lag * repl_lag
* jumbo_chunks (only if mongos or mongo config) * jumbo_chunks (only if mongos or mongo config)
If gather_perdb_stats is set to true, it will also collect per-database stats exposed by db.stats(),
creating another measurement called mongodb_db_stats containing the following values (an illustrative sample point follows the list):
* collections
* objects
* avg_obj_size
* data_size
* storage_size
* num_extents
* indexes
* index_size
* ok
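To give a rough sense of the output shape, a single mongodb_db_stats point might look like this in line protocol (all values are invented; the db_name tag and the type field are added by the AddDbStats/flush code later in this commit, and the hostname tag comes from the server's default tags):

```
mongodb_db_stats,db_name=local,hostname=127.0.0.1:27017 type="db_stat",collections=4i,objects=971i,avg_obj_size=832.91,data_size=808840i,storage_size=860160i,num_extents=9i,indexes=4i,index_size=81920i,ok=1i 1469000000000000000
```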

View File

@ -10,14 +10,16 @@ import (
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/errchan"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
"gopkg.in/mgo.v2" "gopkg.in/mgo.v2"
) )
type MongoDB struct { type MongoDB struct {
Servers []string Servers []string
Ssl Ssl Ssl Ssl
mongos map[string]*Server mongos map[string]*Server
GatherPerdbStats bool
} }
type Ssl struct { type Ssl struct {
@ -32,6 +34,7 @@ var sampleConfig = `
## mongodb://10.10.3.33:18832, ## mongodb://10.10.3.33:18832,
## 10.0.0.1:10000, etc. ## 10.0.0.1:10000, etc.
servers = ["127.0.0.1:27017"] servers = ["127.0.0.1:27017"]
gather_perdb_stats = false
` `
func (m *MongoDB) SampleConfig() string { func (m *MongoDB) SampleConfig() string {
@ -53,9 +56,7 @@ func (m *MongoDB) Gather(acc telegraf.Accumulator) error {
} }
var wg sync.WaitGroup var wg sync.WaitGroup
errChan := errchan.New(len(m.Servers))
var outerr error
for _, serv := range m.Servers { for _, serv := range m.Servers {
u, err := url.Parse(serv) u, err := url.Parse(serv)
if err != nil { if err != nil {
@ -71,13 +72,12 @@ func (m *MongoDB) Gather(acc telegraf.Accumulator) error {
wg.Add(1) wg.Add(1)
go func(srv *Server) { go func(srv *Server) {
defer wg.Done() defer wg.Done()
outerr = m.gatherServer(srv, acc) errChan.C <- m.gatherServer(srv, acc)
}(m.getMongoServer(u)) }(m.getMongoServer(u))
} }
wg.Wait() wg.Wait()
return errChan.Error()
return outerr
} }
func (m *MongoDB) getMongoServer(url *url.URL) *Server { func (m *MongoDB) getMongoServer(url *url.URL) *Server {
@ -135,7 +135,7 @@ func (m *MongoDB) gatherServer(server *Server, acc telegraf.Accumulator) error {
} }
server.Session = sess server.Session = sess
} }
return server.gatherData(acc) return server.gatherData(acc, m.GatherPerdbStats)
} }
func init() { func init() {

View File

@ -12,6 +12,12 @@ type MongodbData struct {
StatLine *StatLine StatLine *StatLine
Fields map[string]interface{} Fields map[string]interface{}
Tags map[string]string Tags map[string]string
DbData []DbData
}
type DbData struct {
Name string
Fields map[string]interface{}
} }
func NewMongodbData(statLine *StatLine, tags map[string]string) *MongodbData { func NewMongodbData(statLine *StatLine, tags map[string]string) *MongodbData {
@ -22,6 +28,7 @@ func NewMongodbData(statLine *StatLine, tags map[string]string) *MongodbData {
StatLine: statLine, StatLine: statLine,
Tags: tags, Tags: tags,
Fields: make(map[string]interface{}), Fields: make(map[string]interface{}),
DbData: []DbData{},
} }
} }
@ -72,6 +79,34 @@ var WiredTigerStats = map[string]string{
"percent_cache_used": "CacheUsedPercent", "percent_cache_used": "CacheUsedPercent",
} }
var DbDataStats = map[string]string{
"collections": "Collections",
"objects": "Objects",
"avg_obj_size": "AvgObjSize",
"data_size": "DataSize",
"storage_size": "StorageSize",
"num_extents": "NumExtents",
"indexes": "Indexes",
"index_size": "IndexSize",
"ok": "Ok",
}
func (d *MongodbData) AddDbStats() {
for _, dbstat := range d.StatLine.DbStatsLines {
dbStatLine := reflect.ValueOf(&dbstat).Elem()
newDbData := &DbData{
Name: dbstat.Name,
Fields: make(map[string]interface{}),
}
newDbData.Fields["type"] = "db_stat"
for key, value := range DbDataStats {
val := dbStatLine.FieldByName(value).Interface()
newDbData.Fields[key] = val
}
d.DbData = append(d.DbData, *newDbData)
}
}
func (d *MongodbData) AddDefaultStats() { func (d *MongodbData) AddDefaultStats() {
statLine := reflect.ValueOf(d.StatLine).Elem() statLine := reflect.ValueOf(d.StatLine).Elem()
d.addStat(statLine, DefaultStats) d.addStat(statLine, DefaultStats)
@ -113,4 +148,15 @@ func (d *MongodbData) flush(acc telegraf.Accumulator) {
d.StatLine.Time, d.StatLine.Time,
) )
d.Fields = make(map[string]interface{}) d.Fields = make(map[string]interface{})
for _, db := range d.DbData {
d.Tags["db_name"] = db.Name
acc.AddFields(
"mongodb_db_stats",
db.Fields,
d.Tags,
d.StatLine.Time,
)
db.Fields = make(map[string]interface{})
}
} }

View File

@ -22,7 +22,7 @@ func (s *Server) getDefaultTags() map[string]string {
return tags return tags
} }
func (s *Server) gatherData(acc telegraf.Accumulator) error { func (s *Server) gatherData(acc telegraf.Accumulator, gatherDbStats bool) error {
s.Session.SetMode(mgo.Eventual, true) s.Session.SetMode(mgo.Eventual, true)
s.Session.SetSocketTimeout(0) s.Session.SetSocketTimeout(0)
result_server := &ServerStatus{} result_server := &ServerStatus{}
@ -42,10 +42,34 @@ func (s *Server) gatherData(acc telegraf.Accumulator) error {
JumboChunksCount: int64(jumbo_chunks), JumboChunksCount: int64(jumbo_chunks),
} }
result_db_stats := &DbStats{}
if gatherDbStats == true {
names := []string{}
names, err = s.Session.DatabaseNames()
if err != nil {
log.Println("Error getting database names (" + err.Error() + ")")
}
for _, db_name := range names {
db_stat_line := &DbStatsData{}
err = s.Session.DB(db_name).Run(bson.D{{"dbStats", 1}}, db_stat_line)
if err != nil {
log.Println("Error getting db stats from " + db_name + "(" + err.Error() + ")")
}
db := &Db{
Name: db_name,
DbStatsData: db_stat_line,
}
result_db_stats.Dbs = append(result_db_stats.Dbs, *db)
}
}
result := &MongoStatus{ result := &MongoStatus{
ServerStatus: result_server, ServerStatus: result_server,
ReplSetStatus: result_repl, ReplSetStatus: result_repl,
ClusterStatus: result_cluster, ClusterStatus: result_cluster,
DbStats: result_db_stats,
} }
defer func() { defer func() {
@ -64,6 +88,7 @@ func (s *Server) gatherData(acc telegraf.Accumulator) error {
s.getDefaultTags(), s.getDefaultTags(),
) )
data.AddDefaultStats() data.AddDefaultStats()
data.AddDbStats()
data.flush(acc) data.flush(acc)
} }
return nil return nil

View File

@ -29,12 +29,12 @@ func TestGetDefaultTags(t *testing.T) {
func TestAddDefaultStats(t *testing.T) { func TestAddDefaultStats(t *testing.T) {
var acc testutil.Accumulator var acc testutil.Accumulator
err := server.gatherData(&acc) err := server.gatherData(&acc, false)
require.NoError(t, err) require.NoError(t, err)
time.Sleep(time.Duration(1) * time.Second) time.Sleep(time.Duration(1) * time.Second)
// need to call this twice so it can perform the diff // need to call this twice so it can perform the diff
err = server.gatherData(&acc) err = server.gatherData(&acc, false)
require.NoError(t, err) require.NoError(t, err)
for key, _ := range DefaultStats { for key, _ := range DefaultStats {

View File

@ -35,6 +35,7 @@ type MongoStatus struct {
ServerStatus *ServerStatus ServerStatus *ServerStatus
ReplSetStatus *ReplSetStatus ReplSetStatus *ReplSetStatus
ClusterStatus *ClusterStatus ClusterStatus *ClusterStatus
DbStats *DbStats
} }
type ServerStatus struct { type ServerStatus struct {
@ -65,6 +66,32 @@ type ServerStatus struct {
Metrics *MetricsStats `bson:"metrics"` Metrics *MetricsStats `bson:"metrics"`
} }
// DbStats stores stats from all dbs
type DbStats struct {
Dbs []Db
}
// Db represent a single DB
type Db struct {
Name string
DbStatsData *DbStatsData
}
// DbStatsData stores stats from a db
type DbStatsData struct {
Db string `bson:"db"`
Collections int64 `bson:"collections"`
Objects int64 `bson:"objects"`
AvgObjSize float64 `bson:"avgObjSize"`
DataSize int64 `bson:"dataSize"`
StorageSize int64 `bson:"storageSize"`
NumExtents int64 `bson:"numExtents"`
Indexes int64 `bson:"indexes"`
IndexSize int64 `bson:"indexSize"`
Ok int64 `bson:"ok"`
GleStats interface{} `bson:"gleStats"`
}
// ClusterStatus stores information related to the whole cluster // ClusterStatus stores information related to the whole cluster
type ClusterStatus struct { type ClusterStatus struct {
JumboChunksCount int64 JumboChunksCount int64
@ -396,6 +423,22 @@ type StatLine struct {
// Cluster fields // Cluster fields
JumboChunksCount int64 JumboChunksCount int64
// DB stats field
DbStatsLines []DbStatLine
}
type DbStatLine struct {
Name string
Collections int64
Objects int64
AvgObjSize float64
DataSize int64
StorageSize int64
NumExtents int64
Indexes int64
IndexSize int64
Ok int64
} }
func parseLocks(stat ServerStatus) map[string]LockUsage { func parseLocks(stat ServerStatus) map[string]LockUsage {
@ -677,5 +720,27 @@ func NewStatLine(oldMongo, newMongo MongoStatus, key string, all bool, sampleSec
newClusterStat := *newMongo.ClusterStatus newClusterStat := *newMongo.ClusterStatus
returnVal.JumboChunksCount = newClusterStat.JumboChunksCount returnVal.JumboChunksCount = newClusterStat.JumboChunksCount
newDbStats := *newMongo.DbStats
for _, db := range newDbStats.Dbs {
dbStatsData := db.DbStatsData
// mongos doesn't have the db key, so setting the db name
if dbStatsData.Db == "" {
dbStatsData.Db = db.Name
}
dbStatLine := &DbStatLine{
Name: dbStatsData.Db,
Collections: dbStatsData.Collections,
Objects: dbStatsData.Objects,
AvgObjSize: dbStatsData.AvgObjSize,
DataSize: dbStatsData.DataSize,
StorageSize: dbStatsData.StorageSize,
NumExtents: dbStatsData.NumExtents,
Indexes: dbStatsData.Indexes,
IndexSize: dbStatsData.IndexSize,
Ok: dbStatsData.Ok,
}
returnVal.DbStatsLines = append(returnVal.DbStatsLines, *dbStatLine)
}
return returnVal return returnVal
} }

View File

@ -7,10 +7,12 @@ import (
"net/url" "net/url"
"strconv" "strconv"
"strings" "strings"
"sync"
"time" "time"
_ "github.com/go-sql-driver/mysql" _ "github.com/go-sql-driver/mysql"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/errchan"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
) )
@ -118,26 +120,27 @@ func (m *Mysql) InitMysql() {
func (m *Mysql) Gather(acc telegraf.Accumulator) error { func (m *Mysql) Gather(acc telegraf.Accumulator) error {
if len(m.Servers) == 0 { if len(m.Servers) == 0 {
// if we can't get stats in this case, thats fine, don't report // default to localhost if nothing specified.
// an error. return m.gatherServer(localhost, acc)
m.gatherServer(localhost, acc)
return nil
} }
// Initialise additional query intervals // Initialise additional query intervals
if !initDone { if !initDone {
m.InitMysql() m.InitMysql()
} }
var wg sync.WaitGroup
errChan := errchan.New(len(m.Servers))
// Loop through each server and collect metrics // Loop through each server and collect metrics
for _, serv := range m.Servers { for _, server := range m.Servers {
err := m.gatherServer(serv, acc) wg.Add(1)
if err != nil { go func(s string) {
return err defer wg.Done()
} errChan.C <- m.gatherServer(s, acc)
}(server)
} }
return nil wg.Wait()
return errChan.Error()
} }
type mapping struct { type mapping struct {

View File

@ -20,7 +20,6 @@ func TestMysqlDefaultsToLocal(t *testing.T) {
} }
var acc testutil.Accumulator var acc testutil.Accumulator
err := m.Gather(&acc) err := m.Gather(&acc)
require.NoError(t, err) require.NoError(t, err)

View File

@ -12,6 +12,7 @@ import (
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/errchan"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
) )
@ -34,7 +35,7 @@ func (n *Nginx) Description() string {
func (n *Nginx) Gather(acc telegraf.Accumulator) error { func (n *Nginx) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup var wg sync.WaitGroup
var outerr error errChan := errchan.New(len(n.Urls))
for _, u := range n.Urls { for _, u := range n.Urls {
addr, err := url.Parse(u) addr, err := url.Parse(u)
@ -45,13 +46,12 @@ func (n *Nginx) Gather(acc telegraf.Accumulator) error {
wg.Add(1) wg.Add(1)
go func(addr *url.URL) { go func(addr *url.URL) {
defer wg.Done() defer wg.Done()
outerr = n.gatherUrl(addr, acc) errChan.C <- n.gatherUrl(addr, acc)
}(addr) }(addr)
} }
wg.Wait() wg.Wait()
return errChan.Error()
return outerr
} }
var tr = &http.Transport{ var tr = &http.Transport{

View File

@ -32,6 +32,7 @@ import (
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/errchan"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
) )
@ -65,19 +66,17 @@ func (n *NSQ) Description() string {
func (n *NSQ) Gather(acc telegraf.Accumulator) error { func (n *NSQ) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup var wg sync.WaitGroup
var outerr error errChan := errchan.New(len(n.Endpoints))
for _, e := range n.Endpoints { for _, e := range n.Endpoints {
wg.Add(1) wg.Add(1)
go func(e string) { go func(e string) {
defer wg.Done() defer wg.Done()
outerr = n.gatherEndpoint(e, acc) errChan.C <- n.gatherEndpoint(e, acc)
}(e) }(e)
} }
wg.Wait() wg.Wait()
return errChan.Error()
return outerr
} }
var tr = &http.Transport{ var tr = &http.Transport{

View File

@ -43,9 +43,9 @@ var sampleConfig = `
## file paths for proc files. If empty default paths will be used: ## file paths for proc files. If empty default paths will be used:
## /proc/net/netstat, /proc/net/snmp, /proc/net/snmp6 ## /proc/net/netstat, /proc/net/snmp, /proc/net/snmp6
## These can also be overridden with env variables, see README. ## These can also be overridden with env variables, see README.
proc_net_netstat = "" proc_net_netstat = "/proc/net/netstat"
proc_net_snmp = "" proc_net_snmp = "/proc/net/snmp"
proc_net_snmp6 = "" proc_net_snmp6 = "/proc/net/snmp6"
## dump metrics with 0 values too ## dump metrics with 0 values too
dump_zeros = true dump_zeros = true
` `
@ -141,7 +141,7 @@ func (ns *Nstat) loadPaths() {
ns.ProcNetSNMP = proc(ENV_SNMP, NET_SNMP) ns.ProcNetSNMP = proc(ENV_SNMP, NET_SNMP)
} }
if ns.ProcNetSNMP6 == "" { if ns.ProcNetSNMP6 == "" {
ns.ProcNetSNMP = proc(ENV_SNMP6, NET_SNMP6) ns.ProcNetSNMP6 = proc(ENV_SNMP6, NET_SNMP6)
} }
} }

View File

@ -1,3 +1,210 @@
// +build windows // +build windows
package ping package ping
import (
"errors"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
"os/exec"
"regexp"
"strconv"
"strings"
"sync"
"time"
)
// HostPinger is a function that runs the "ping" function using a list of
// passed arguments. This can be easily switched with a mocked ping function
// for unit test purposes (see ping_test.go)
type HostPinger func(timeout float64, args ...string) (string, error)
type Ping struct {
// Number of pings to send (ping -n <COUNT>)
Count int
// Ping timeout, in seconds. 0 means the default timeout is used (ping -w <TIMEOUT>)
Timeout float64
// URLs to ping
Urls []string
// host ping function
pingHost HostPinger
}
func (s *Ping) Description() string {
return "Ping given url(s) and return statistics"
}
const sampleConfig = `
## urls to ping
urls = ["www.google.com"] # required
## number of pings to send per collection (ping -n <COUNT>)
count = 4 # required
## Ping timeout, in seconds. 0 means default timeout (ping -w <TIMEOUT>)
Timeout = 0
`
func (s *Ping) SampleConfig() string {
return sampleConfig
}
func hostPinger(timeout float64, args ...string) (string, error) {
bin, err := exec.LookPath("ping")
if err != nil {
return "", err
}
c := exec.Command(bin, args...)
out, err := internal.CombinedOutputTimeout(c,
time.Second*time.Duration(timeout+1))
return string(out), err
}
// processPingOutput takes in a string output from the ping command
// based on the linux implementation, but using regexes for multi-language support (this shouldn't noticeably affect performance)
// It returns (<transmitted packets>, <received packets>, <average response>, <min response>, <max response>)
func processPingOutput(out string) (int, int, int, int, int, error) {
// Find a line containing 3 numbers, excluding the reply lines
var stats, aproxs []string = nil, nil
err := errors.New("Fatal error processing ping output")
stat := regexp.MustCompile(`=\W*(\d+)\D*=\W*(\d+)\D*=\W*(\d+)`)
aprox := regexp.MustCompile(`=\W*(\d+)\D*ms\D*=\W*(\d+)\D*ms\D*=\W*(\d+)\D*ms`)
lines := strings.Split(out, "\n")
for _, line := range lines {
if !strings.Contains(line, "TTL") {
if stats == nil {
stats = stat.FindStringSubmatch(line)
}
if stats != nil && aproxs == nil {
aproxs = aprox.FindStringSubmatch(line)
}
}
}
// stats data should contain 4 members: entireExpression + ( Send, Receive, Lost )
if len(stats) != 4 {
return 0, 0, 0, 0, 0, err
}
trans, err := strconv.Atoi(stats[1])
if err != nil {
return 0, 0, 0, 0, 0, err
}
rec, err := strconv.Atoi(stats[2])
if err != nil {
return 0, 0, 0, 0, 0, err
}
// aproxs data should contain 4 members: entireExpression + ( min, max, avg )
if len(aproxs) != 4 {
return trans, rec, 0, 0, 0, err
}
min, err := strconv.Atoi(aproxs[1])
if err != nil {
return trans, rec, 0, 0, 0, err
}
max, err := strconv.Atoi(aproxs[2])
if err != nil {
return trans, rec, 0, 0, 0, err
}
avg, err := strconv.Atoi(aproxs[3])
if err != nil {
return 0, 0, 0, 0, 0, err
}
return trans, rec, avg, min, max, err
}
func (p *Ping) timeout() float64 {
// According to MSDN, the default ping timeout on Windows is 4 seconds.
// Add one extra second of slack on top of that.
if p.Timeout > 0 {
return p.Timeout + 1
}
return 4 + 1
}
// args returns the arguments for the 'ping' executable
func (p *Ping) args(url string) []string {
args := []string{"-n", strconv.Itoa(p.Count)}
if p.Timeout > 0 {
args = append(args, "-w", strconv.FormatFloat(p.Timeout*1000, 'f', 0, 64))
}
args = append(args, url)
return args
}
func (p *Ping) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup
errorChannel := make(chan error, len(p.Urls)*2)
var pendingError error = nil
// Spin off a go routine for each url to ping
for _, url := range p.Urls {
wg.Add(1)
go func(u string) {
defer wg.Done()
args := p.args(u)
totalTimeout := p.timeout() * float64(p.Count)
out, err := p.pingHost(totalTimeout, args...)
// ping exits with a non-zero code when there was no response from the host,
// even though the command itself executed successfully
if err != nil {
// Combine go err + stderr output
pendingError = errors.New(strings.TrimSpace(out) + ", " + err.Error())
}
tags := map[string]string{"url": u}
trans, rec, avg, min, max, err := processPingOutput(out)
if err != nil {
// fatal error
if pendingError != nil {
errorChannel <- pendingError
}
errorChannel <- err
return
}
// Calculate packet loss percentage
loss := float64(trans-rec) / float64(trans) * 100.0
fields := map[string]interface{}{
"packets_transmitted": trans,
"packets_received": rec,
"percent_packet_loss": loss,
}
if avg > 0 {
fields["average_response_ms"] = avg
}
if min > 0 {
fields["minimum_response_ms"] = min
}
if max > 0 {
fields["maximum_response_ms"] = max
}
acc.AddFields("ping", fields, tags)
}(url)
}
wg.Wait()
close(errorChannel)
// Get all errors and return them as one giant error
errorStrings := []string{}
for err := range errorChannel {
errorStrings = append(errorStrings, err.Error())
}
if len(errorStrings) == 0 {
return nil
}
return errors.New(strings.Join(errorStrings, "\n"))
}
func init() {
inputs.Add("ping", func() telegraf.Input {
return &Ping{pingHost: hostPinger}
})
}

View File

@ -0,0 +1,218 @@
// +build windows
package ping
import (
"errors"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"testing"
)
// Windows ping output (Polish locale), used to exercise multi-language support
var winPLPingOutput = `
Badanie 8.8.8.8 z 32 bajtami danych:
Odpowiedz z 8.8.8.8: bajtow=32 czas=49ms TTL=43
Odpowiedz z 8.8.8.8: bajtow=32 czas=46ms TTL=43
Odpowiedz z 8.8.8.8: bajtow=32 czas=48ms TTL=43
Odpowiedz z 8.8.8.8: bajtow=32 czas=57ms TTL=43
Statystyka badania ping dla 8.8.8.8:
Pakiety: Wyslane = 4, Odebrane = 4, Utracone = 0
(0% straty),
Szacunkowy czas bladzenia pakietww w millisekundach:
Minimum = 46 ms, Maksimum = 57 ms, Czas sredni = 50 ms
`
// Windows ping output (English locale)
var winENPingOutput = `
Pinging 8.8.8.8 with 32 bytes of data:
Reply from 8.8.8.8: bytes=32 time=52ms TTL=43
Reply from 8.8.8.8: bytes=32 time=50ms TTL=43
Reply from 8.8.8.8: bytes=32 time=50ms TTL=43
Reply from 8.8.8.8: bytes=32 time=51ms TTL=43
Ping statistics for 8.8.8.8:
Packets: Sent = 4, Received = 4, Lost = 0 (0% loss),
Approximate round trip times in milli-seconds:
Minimum = 50ms, Maximum = 52ms, Average = 50ms
`
func TestHost(t *testing.T) {
trans, rec, avg, min, max, err := processPingOutput(winPLPingOutput)
assert.NoError(t, err)
assert.Equal(t, 4, trans, "4 packets were transmitted")
assert.Equal(t, 4, rec, "4 packets were received")
assert.Equal(t, 50, avg, "Average 50")
assert.Equal(t, 46, min, "Min 46")
assert.Equal(t, 57, max, "max 57")
trans, rec, avg, min, max, err = processPingOutput(winENPingOutput)
assert.NoError(t, err)
assert.Equal(t, 4, trans, "4 packets were transmitted")
assert.Equal(t, 4, rec, "4 packets were received")
assert.Equal(t, 50, avg, "Average 50")
assert.Equal(t, 50, min, "Min 50")
assert.Equal(t, 52, max, "Max 52")
}
func mockHostPinger(timeout float64, args ...string) (string, error) {
return winENPingOutput, nil
}
// Test that Gather function works on a normal ping
func TestPingGather(t *testing.T) {
var acc testutil.Accumulator
p := Ping{
Urls: []string{"www.google.com", "www.reddit.com"},
pingHost: mockHostPinger,
}
p.Gather(&acc)
tags := map[string]string{"url": "www.google.com"}
fields := map[string]interface{}{
"packets_transmitted": 4,
"packets_received": 4,
"percent_packet_loss": 0.0,
"average_response_ms": 50,
"minimum_response_ms": 50,
"maximum_response_ms": 52,
}
acc.AssertContainsTaggedFields(t, "ping", fields, tags)
tags = map[string]string{"url": "www.reddit.com"}
acc.AssertContainsTaggedFields(t, "ping", fields, tags)
}
var errorPingOutput = `
Badanie nask.pl [195.187.242.157] z 32 bajtami danych:
Upłynął limit czasu żądania.
Upłynął limit czasu żądania.
Upłynął limit czasu żądania.
Upłynął limit czasu żądania.
Statystyka badania ping dla 195.187.242.157:
Pakiety: Wysłane = 4, Odebrane = 0, Utracone = 4
(100% straty),
`
func mockErrorHostPinger(timeout float64, args ...string) (string, error) {
return errorPingOutput, errors.New("No packets received")
}
// Test that Gather works on a ping where no packets are received, even though the
// command returns an error
func TestBadPingGather(t *testing.T) {
var acc testutil.Accumulator
p := Ping{
Urls: []string{"www.amazon.com"},
pingHost: mockErrorHostPinger,
}
p.Gather(&acc)
tags := map[string]string{"url": "www.amazon.com"}
fields := map[string]interface{}{
"packets_transmitted": 4,
"packets_received": 0,
"percent_packet_loss": 100.0,
}
acc.AssertContainsTaggedFields(t, "ping", fields, tags)
}
var lossyPingOutput = `
Badanie thecodinglove.com [66.6.44.4] z 9800 bajtami danych:
Upłynął limit czasu żądania.
Odpowiedź z 66.6.44.4: bajtów=9800 czas=114ms TTL=48
Odpowiedź z 66.6.44.4: bajtów=9800 czas=114ms TTL=48
Odpowiedź z 66.6.44.4: bajtów=9800 czas=118ms TTL=48
Odpowiedź z 66.6.44.4: bajtów=9800 czas=114ms TTL=48
Odpowiedź z 66.6.44.4: bajtów=9800 czas=114ms TTL=48
Upłynął limit czasu żądania.
Odpowiedź z 66.6.44.4: bajtów=9800 czas=119ms TTL=48
Odpowiedź z 66.6.44.4: bajtów=9800 czas=116ms TTL=48
Statystyka badania ping dla 66.6.44.4:
Pakiety: Wysłane = 9, Odebrane = 7, Utracone = 2
(22% straty),
Szacunkowy czas błądzenia pakietów w millisekundach:
Minimum = 114 ms, Maksimum = 119 ms, Czas średni = 115 ms
`
func mockLossyHostPinger(timeout float64, args ...string) (string, error) {
return lossyPingOutput, nil
}
// Test that Gather works on a ping with lossy packets
func TestLossyPingGather(t *testing.T) {
var acc testutil.Accumulator
p := Ping{
Urls: []string{"www.google.com"},
pingHost: mockLossyHostPinger,
}
p.Gather(&acc)
tags := map[string]string{"url": "www.google.com"}
fields := map[string]interface{}{
"packets_transmitted": 9,
"packets_received": 7,
"percent_packet_loss": 22.22222222222222,
"average_response_ms": 115,
"minimum_response_ms": 114,
"maximum_response_ms": 119,
}
acc.AssertContainsTaggedFields(t, "ping", fields, tags)
}
// Fatal ping output (invalid argument)
var fatalPingOutput = `
Bad option -d.
Usage: ping [-t] [-a] [-n count] [-l size] [-f] [-i TTL] [-v TOS]
[-r count] [-s count] [[-j host-list] | [-k host-list]]
[-w timeout] [-R] [-S srcaddr] [-4] [-6] target_name
Options:
-t Ping the specified host until stopped.
To see statistics and continue - type Control-Break;
To stop - type Control-C.
-a Resolve addresses to hostnames.
-n count Number of echo requests to send.
-l size Send buffer size.
-f Set Don't Fragment flag in packet (IPv4-only).
-i TTL Time To Live.
-v TOS Type Of Service (IPv4-only. This setting has been deprecated
and has no effect on the type of service field in the IP Header).
-r count Record route for count hops (IPv4-only).
-s count Timestamp for count hops (IPv4-only).
-j host-list Loose source route along host-list (IPv4-only).
-k host-list Strict source route along host-list (IPv4-only).
-w timeout Timeout in milliseconds to wait for each reply.
-R Use routing header to test reverse route also (IPv6-only).
-S srcaddr Source address to use.
-4 Force using IPv4.
-6 Force using IPv6.
`
func mockFatalHostPinger(timeout float64, args ...string) (string, error) {
return fatalPingOutput, errors.New("So very bad")
}
// Test that a fatal ping command does not gather any statistics.
func TestFatalPingGather(t *testing.T) {
var acc testutil.Accumulator
p := Ping{
Urls: []string{"www.amazon.com"},
pingHost: mockFatalHostPinger,
}
p.Gather(&acc)
assert.False(t, acc.HasMeasurement("packets_transmitted"),
"Fatal ping should not have packet measurements")
assert.False(t, acc.HasMeasurement("packets_received"),
"Fatal ping should not have packet measurements")
assert.False(t, acc.HasMeasurement("percent_packet_loss"),
"Fatal ping should not have packet measurements")
assert.False(t, acc.HasMeasurement("average_response_ms"),
"Fatal ping should not have packet measurements")
}

View File

@ -9,35 +9,59 @@ import (
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/errchan" "github.com/influxdata/telegraf/internal/errchan"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
) )
// DefaultUsername will set a default value that corresponds to the default
// value used by RabbitMQ
const DefaultUsername = "guest" const DefaultUsername = "guest"
// DefaultPassword will set a default value that corresponds to the default
// value used by RabbitMQ
const DefaultPassword = "guest" const DefaultPassword = "guest"
// DefaultURL will set a default value that corresponds to the default value
// used by RabbitMQ
const DefaultURL = "http://localhost:15672" const DefaultURL = "http://localhost:15672"
// RabbitMQ defines the configuration necessary for gathering metrics,
// see the sample config for further details
type RabbitMQ struct { type RabbitMQ struct {
URL string URL string
Name string Name string
Username string Username string
Password string Password string
Nodes []string // Path to CA file
Queues []string SSLCA string `toml:"ssl_ca"`
// Path to host cert file
SSLCert string `toml:"ssl_cert"`
// Path to cert key file
SSLKey string `toml:"ssl_key"`
// Use SSL but skip chain & host verification
InsecureSkipVerify bool
Nodes []string
Queues []string
Client *http.Client Client *http.Client
} }
// OverviewResponse ...
type OverviewResponse struct { type OverviewResponse struct {
MessageStats *MessageStats `json:"message_stats"` MessageStats *MessageStats `json:"message_stats"`
ObjectTotals *ObjectTotals `json:"object_totals"` ObjectTotals *ObjectTotals `json:"object_totals"`
QueueTotals *QueueTotals `json:"queue_totals"` QueueTotals *QueueTotals `json:"queue_totals"`
} }
// Details ...
type Details struct { type Details struct {
Rate float64 Rate float64
} }
// MessageStats ...
type MessageStats struct { type MessageStats struct {
Ack int64 Ack int64
AckDetails Details `json:"ack_details"` AckDetails Details `json:"ack_details"`
@ -51,6 +75,7 @@ type MessageStats struct {
RedeliverDetails Details `json:"redeliver_details"` RedeliverDetails Details `json:"redeliver_details"`
} }
// ObjectTotals ...
type ObjectTotals struct { type ObjectTotals struct {
Channels int64 Channels int64
Connections int64 Connections int64
@ -59,6 +84,7 @@ type ObjectTotals struct {
Queues int64 Queues int64
} }
// QueueTotals ...
type QueueTotals struct { type QueueTotals struct {
Messages int64 Messages int64
MessagesReady int64 `json:"messages_ready"` MessagesReady int64 `json:"messages_ready"`
@ -66,10 +92,11 @@ type QueueTotals struct {
MessageBytes int64 `json:"message_bytes"` MessageBytes int64 `json:"message_bytes"`
MessageBytesReady int64 `json:"message_bytes_ready"` MessageBytesReady int64 `json:"message_bytes_ready"`
MessageBytesUnacknowledged int64 `json:"message_bytes_unacknowledged"` MessageBytesUnacknowledged int64 `json:"message_bytes_unacknowledged"`
MessageRam int64 `json:"message_bytes_ram"` MessageRAM int64 `json:"message_bytes_ram"`
MessagePersistent int64 `json:"message_bytes_persistent"` MessagePersistent int64 `json:"message_bytes_persistent"`
} }
// Queue ...
type Queue struct { type Queue struct {
QueueTotals // just to not repeat the same code QueueTotals // just to not repeat the same code
MessageStats `json:"message_stats"` MessageStats `json:"message_stats"`
@ -83,6 +110,7 @@ type Queue struct {
AutoDelete bool `json:"auto_delete"` AutoDelete bool `json:"auto_delete"`
} }
// Node ...
type Node struct { type Node struct {
Name string Name string
@ -99,6 +127,7 @@ type Node struct {
SocketsUsed int64 `json:"sockets_used"` SocketsUsed int64 `json:"sockets_used"`
} }
// gatherFunc ...
type gatherFunc func(r *RabbitMQ, acc telegraf.Accumulator, errChan chan error) type gatherFunc func(r *RabbitMQ, acc telegraf.Accumulator, errChan chan error)
var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues} var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues}
@ -109,22 +138,40 @@ var sampleConfig = `
# username = "guest" # username = "guest"
# password = "guest" # password = "guest"
## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
## Use SSL but skip chain & host verification
# insecure_skip_verify = false
## A list of nodes to pull metrics about. If not specified, metrics for ## A list of nodes to pull metrics about. If not specified, metrics for
## all nodes are gathered. ## all nodes are gathered.
# nodes = ["rabbit@node1", "rabbit@node2"] # nodes = ["rabbit@node1", "rabbit@node2"]
` `
// SampleConfig ...
func (r *RabbitMQ) SampleConfig() string { func (r *RabbitMQ) SampleConfig() string {
return sampleConfig return sampleConfig
} }
// Description ...
func (r *RabbitMQ) Description() string {
return "Read metrics from one or many RabbitMQ servers via the management API"
}
// Gather ...
func (r *RabbitMQ) Gather(acc telegraf.Accumulator) error {
if r.Client == nil {
tr := &http.Transport{ResponseHeaderTimeout: time.Duration(3 * time.Second)}
tlsCfg, err := internal.GetTLSConfig(
r.SSLCert, r.SSLKey, r.SSLCA, r.InsecureSkipVerify)
if err != nil {
return err
}
tr := &http.Transport{
ResponseHeaderTimeout: time.Duration(3 * time.Second),
TLSClientConfig: tlsCfg,
}
r.Client = &http.Client{
Transport: tr,
Timeout: time.Duration(4 * time.Second),
@@ -286,7 +333,7 @@ func gatherQueues(r *RabbitMQ, acc telegraf.Accumulator, errChan chan error) {
"message_bytes": queue.MessageBytes,
"message_bytes_ready": queue.MessageBytesReady,
"message_bytes_unacked": queue.MessageBytesUnacknowledged,
"message_bytes_ram": queue.MessageRam,
"message_bytes_ram": queue.MessageRAM,
"message_bytes_persist": queue.MessagePersistent,
"messages": queue.Messages,
"messages_ready": queue.MessagesReady,
View File
@@ -99,7 +99,7 @@ func (r *Redis) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup
errChan := errchan.New(len(r.Servers))
for _, serv := range r.Servers {
if !strings.HasPrefix(serv, "tcp://") || !strings.HasPrefix(serv, "unix://") {
if !strings.HasPrefix(serv, "tcp://") && !strings.HasPrefix(serv, "unix://") {
serv = "tcp://" + serv
}
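
The operator change above is worth spelling out: with ||, any address fails at least one of the two prefix checks, so the condition was always true and even unix:// sockets were rewritten to tcp://. With &&, the default scheme is added only when neither scheme is present. A small self-contained sketch of the corrected check; the server strings are made up for illustration:

package main

import (
	"fmt"
	"strings"
)

func normalize(serv string) string {
	// Prepend the default scheme only when neither supported scheme is present.
	if !strings.HasPrefix(serv, "tcp://") && !strings.HasPrefix(serv, "unix://") {
		serv = "tcp://" + serv
	}
	return serv
}

func main() {
	fmt.Println(normalize("localhost:6379"))         // tcp://localhost:6379
	fmt.Println(normalize("unix:///tmp/redis.sock")) // left alone
}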
View File
@@ -107,7 +107,8 @@ type item struct {
counterHandle win.PDH_HCOUNTER
}

var sanitizedChars = strings.NewReplacer("/sec", "_persec", "/Sec", "_persec", " ", "_")
var sanitizedChars = strings.NewReplacer("/sec", "_persec", "/Sec", "_persec",
" ", "_", "%", "Percent", `\`, "")
func (m *Win_PerfCounters) AddItem(metrics *itemList, query string, objectName string, counter string, instance string,
measurement string, include_total bool) {
@@ -299,13 +300,12 @@ func (m *Win_PerfCounters) Gather(acc telegraf.Accumulator) error {
tags["instance"] = s
}
tags["objectname"] = metric.objectName
fields[sanitizedChars.Replace(string(metric.counter))] = float32(c.FmtValue.DoubleValue)
fields[sanitizedChars.Replace(metric.counter)] =
float32(c.FmtValue.DoubleValue)

var measurement string
if metric.measurement == "" {
measurement = "win_perf_counters"
} else {
measurement = metric.measurement
}
measurement := sanitizedChars.Replace(metric.measurement)
if measurement == "" {
measurement = "win_perf_counters"
}
acc.AddFields(measurement, fields, tags)
}
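
To make the sanitizer change concrete, here is a tiny sketch of what the extended replacer does to some typical Windows counter names; the counter names are illustrative and not taken from this commit.

package main

import (
	"fmt"
	"strings"
)

// Same replacer as above: "%" becomes "Percent" and backslashes are dropped.
var sanitizedChars = strings.NewReplacer("/sec", "_persec", "/Sec", "_persec",
	" ", "_", "%", "Percent", `\`, "")

func main() {
	fmt.Println(sanitizedChars.Replace("% Processor Time"))             // Percent_Processor_Time
	fmt.Println(sanitizedChars.Replace("Bytes Sent/sec"))               // Bytes_Sent_persec
	fmt.Println(sanitizedChars.Replace(`LogicalDisk\% Free Space`))     // LogicalDiskPercent_Free_Space
}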
View File
@@ -12,17 +12,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
)
var (
invalidNameCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)
// Prometheus metric names must match this regex
// see https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels
metricName = regexp.MustCompile("^[a-zA-Z_:][a-zA-Z0-9_:]*$")
// Prometheus labels must match this regex
// see https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels
labelName = regexp.MustCompile("^[a-zA-Z_][a-zA-Z0-9_]*$")
)
var invalidNameCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)
type PrometheusClient struct {
Listen string
@@ -119,9 +109,6 @@ func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
if len(k) == 0 {
continue
}
if !labelName.MatchString(k) {
continue
}
labels = append(labels, k)
l[k] = v
}
@@ -144,11 +131,6 @@ func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
mname = fmt.Sprintf("%s_%s", key, n)
}
// verify that it is a valid measurement name
if !metricName.MatchString(mname) {
continue
}
desc := prometheus.NewDesc(mname, "Telegraf collected metric", nil, l)
var metric prometheus.Metric
var err error
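
With the metricName and labelName gatekeeping removed, names that fail Prometheus validation are no longer silently dropped; invalid characters are instead substituted via invalidNameCharRE. A rough illustration of that substitution follows, assuming a "_" replacement string and hypothetical input names (whether the plugin replaces with "_" exactly as shown is an assumption here, not something visible in this hunk):

package main

import (
	"fmt"
	"regexp"
)

var invalidNameCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)

func main() {
	// Replace every character Prometheus disallows instead of skipping the metric.
	fmt.Println(invalidNameCharRE.ReplaceAllString("disk.used-percent", "_")) // disk_used_percent
	fmt.Println(invalidNameCharRE.ReplaceAllString("host:name", "_"))         // host_name
}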
View File
@@ -28,6 +28,7 @@ type Accumulator struct {
sync.Mutex
Metrics []*Metric
Errors []error
debug bool
}
@@ -84,6 +85,16 @@ func (a *Accumulator) AddFields(
a.Metrics = append(a.Metrics, p)
}
// AddError appends the given error to Accumulator.Errors.
func (a *Accumulator) AddError(err error) {
if err == nil {
return
}
a.Lock()
a.Errors = append(a.Errors, err)
a.Unlock()
}
func (a *Accumulator) SetPrecision(precision, interval time.Duration) {
return
}
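
A minimal sketch of how a plugin test might exercise the new Errors field and AddError helper; the test name and the errors reported are made up for illustration, and only the testutil accumulator shown above is relied on.

package example

import (
	"errors"
	"testing"

	"github.com/influxdata/telegraf/testutil"
)

func TestGatherRecordsErrors(t *testing.T) {
	var acc testutil.Accumulator

	// A plugin (or the test itself) can now report partial failures
	// without aborting the whole Gather call.
	acc.AddError(errors.New("node2: connection refused"))
	acc.AddError(nil) // nil errors are ignored, per AddError above

	if len(acc.Errors) != 1 {
		t.Fatalf("expected 1 accumulated error, got %d", len(acc.Errors))
	}
}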