Updated with new code from master

Dennis Bellinger 2016-07-20 16:57:47 -04:00
commit c2a4df306e
25 changed files with 378 additions and 346 deletions

View File

@@ -1,5 +1,15 @@
 ## v1.0 [unreleased]
 
+### Features
+
+- [#1413](https://github.com/influxdata/telegraf/issues/1413): Separate container_version from container_image tag.
+
+### Bugfixes
+
+- [#1519](https://github.com/influxdata/telegraf/pull/1519): Fix error race conditions and partial failures.
+- [#1477](https://github.com/influxdata/telegraf/issues/1477): nstat: fix inaccurate config panic.
+- [#1481](https://github.com/influxdata/telegraf/issues/1481): jolokia: fix handling multiple multi-dimensional attributes.
+
 ## v1.0 beta 3 [2016-07-18]
 
 ### Release Notes
@@ -36,6 +46,7 @@ should now look like:
 ### Features
+- [#1503](https://github.com/influxdata/telegraf/pull/1503): Add tls support for certs to RabbitMQ input plugin
 - [#1289](https://github.com/influxdata/telegraf/pull/1289): webhooks input plugin. Thanks @francois2metz and @cduez!
 - [#1247](https://github.com/influxdata/telegraf/pull/1247): rollbar webhook plugin.
 - [#1408](https://github.com/influxdata/telegraf/pull/1408): mandrill webhook plugin.
@@ -48,6 +59,7 @@ should now look like:
 - [#1500](https://github.com/influxdata/telegraf/pull/1500): Aerospike plugin refactored to use official client lib.
 - [#1434](https://github.com/influxdata/telegraf/pull/1434): Add measurement name arg to logparser plugin.
 - [#1479](https://github.com/influxdata/telegraf/pull/1479): logparser: change resp_code from a field to a tag.
+- [#1466](https://github.com/influxdata/telegraf/pull/1466): MongoDB input plugin: adding per DB stats from db.stats()
 
 ### Bugfixes
@@ -67,6 +79,8 @@ should now look like:
 - [#1463](https://github.com/influxdata/telegraf/issues/1463): Shared WaitGroup in Exec plugin
 - [#1436](https://github.com/influxdata/telegraf/issues/1436): logparser: honor modifiers in "pattern" config.
 - [#1418](https://github.com/influxdata/telegraf/issues/1418): logparser: error and exit on file permissions/missing errors.
+- [#1499](https://github.com/influxdata/telegraf/pull/1499): Make the user able to specify full path for HAproxy stats
+- [#1521](https://github.com/influxdata/telegraf/pull/1521): Fix Redis url, an extra "tcp://" was added.
 
 ## v1.0 beta 2 [2016-06-21]

File diff suppressed because one or more lines are too long

View File

@@ -72,18 +72,17 @@ func (a *Aerospike) gatherServer(hostport string, acc telegraf.Accumulator) erro
 	nodes := c.GetNodes()
 	for _, n := range nodes {
 		tags := map[string]string{
-			"node_name":      n.GetName(),
 			"aerospike_host": hostport,
 		}
-		fields := make(map[string]interface{})
+		fields := map[string]interface{}{
+			"node_name": n.GetName(),
+		}
 		stats, err := as.RequestNodeStats(n)
 		if err != nil {
 			return err
 		}
 		for k, v := range stats {
-			if iv, err := strconv.ParseInt(v, 10, 64); err == nil {
-				fields[strings.Replace(k, "-", "_", -1)] = iv
-			}
+			fields[strings.Replace(k, "-", "_", -1)] = parseValue(v)
 		}
 		acc.AddFields("aerospike_node", fields, tags, time.Now())
@@ -94,9 +93,13 @@ func (a *Aerospike) gatherServer(hostport string, acc telegraf.Accumulator) erro
 		namespaces := strings.Split(info["namespaces"], ";")
 		for _, namespace := range namespaces {
-			nTags := copyTags(tags)
+			nTags := map[string]string{
+				"aerospike_host": hostport,
+			}
 			nTags["namespace"] = namespace
-			nFields := make(map[string]interface{})
+			nFields := map[string]interface{}{
+				"node_name": n.GetName(),
+			}
 			info, err := as.RequestNodeInfo(n, "namespace/"+namespace)
 			if err != nil {
 				continue
@@ -107,9 +110,7 @@ func (a *Aerospike) gatherServer(hostport string, acc telegraf.Accumulator) erro
 				if len(parts) < 2 {
 					continue
 				}
-				if iv, err := strconv.ParseInt(parts[1], 10, 64); err == nil {
-					nFields[strings.Replace(parts[0], "-", "_", -1)] = iv
-				}
+				nFields[strings.Replace(parts[0], "-", "_", -1)] = parseValue(parts[1])
 			}
 			acc.AddFields("aerospike_namespace", nFields, nTags, time.Now())
 		}
@@ -117,6 +118,16 @@ func (a *Aerospike) gatherServer(hostport string, acc telegraf.Accumulator) erro
 	return nil
 }
 
+func parseValue(v string) interface{} {
+	if parsed, err := strconv.ParseInt(v, 10, 64); err == nil {
+		return parsed
+	} else if parsed, err := strconv.ParseBool(v); err == nil {
+		return parsed
+	} else {
+		return v
+	}
+}
+
 func copyTags(m map[string]string) map[string]string {
 	out := make(map[string]string)
 	for k, v := range m {
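For context on the new parseValue helper: it tries int64, then bool, and otherwise keeps the raw string, so non-numeric stats are no longer silently dropped. A tiny standalone demonstration of that behavior (sample values invented for illustration):

```go
package main

import (
	"fmt"
	"strconv"
)

// parseValue mirrors the helper added above: try int64, then bool,
// otherwise keep the raw string value.
func parseValue(v string) interface{} {
	if parsed, err := strconv.ParseInt(v, 10, 64); err == nil {
		return parsed
	} else if parsed, err := strconv.ParseBool(v); err == nil {
		return parsed
	}
	return v
}

func main() {
	// sample stat values invented for the example
	for _, v := range []string{"12345", "true", "BB9020011AC4202"} {
		fmt.Printf("%q -> %T %v\n", v, parseValue(v), parseValue(v))
	}
}
```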

View File

@@ -1,18 +1,18 @@
 # Ceph Storage Input Plugin
 
 Collects performance metrics from the MON and OSD nodes in a Ceph storage cluster.
 
 The plugin works by scanning the configured SocketDir for OSD and MON socket files. When it finds
 a MON socket, it runs **ceph --admin-daemon $file perfcounters_dump**. For OSDs it runs **ceph --admin-daemon $file perf dump**
 
 The resulting JSON is parsed and grouped into collections, based on top-level key. Top-level keys are
 used as collection tags, and all sub-keys are flattened. For example:
 
 ```
 {
   "paxos": {
     "refresh": 9363435,
     "refresh_latency": {
       "avgcount": 9363435,
       "sum": 5378.794002000
     }
@@ -50,7 +50,7 @@ Would be parsed into the following metrics, all of which would be tagged with co
 ### Measurements & Fields:
 
 All fields are collected under the **ceph** measurement and stored as float64s. For a full list of fields, see the sample perf dumps in ceph_test.go.
 
 ### Tags:
@@ -95,7 +95,7 @@ All measurements will have the following tags:
 - throttle-objecter_ops
 - throttle-osd_client_bytes
 - throttle-osd_client_messages
 
 ### Example Output:
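Since the README describes the flattening only in prose, here is a compact, hypothetical Go illustration of grouping by top-level key and flattening sub-keys. It is a sketch based on the example JSON above, not the plugin's actual decoder:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// flatten walks a decoded JSON object and joins nested keys with ".",
// which is roughly what "all sub-keys are flattened" means above.
func flatten(prefix string, in map[string]interface{}, out map[string]float64) {
	for k, v := range in {
		key := k
		if prefix != "" {
			key = prefix + "." + k
		}
		switch val := v.(type) {
		case map[string]interface{}:
			flatten(key, val, out)
		case float64: // encoding/json decodes all JSON numbers to float64
			out[key] = val
		}
	}
}

func main() {
	raw := `{"paxos": {"refresh": 9363435, "refresh_latency": {"avgcount": 9363435, "sum": 5378.794002}}}`
	var doc map[string]interface{}
	json.Unmarshal([]byte(raw), &doc)

	// The top-level key ("paxos") would become the collection tag;
	// the flattened sub-keys become the field names.
	for collection, sub := range doc {
		fields := map[string]float64{}
		flatten("", sub.(map[string]interface{}), fields)
		fmt.Println(collection, fields)
	}
}
```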

View File

@@ -3,12 +3,14 @@ package dns_query
 import (
 	"errors"
 	"fmt"
-	"github.com/influxdata/telegraf"
-	"github.com/influxdata/telegraf/plugins/inputs"
 	"github.com/miekg/dns"
 	"net"
 	"strconv"
 	"time"
+
+	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/internal/errchan"
+	"github.com/influxdata/telegraf/plugins/inputs"
 )
 
 type DnsQuery struct {
@@ -55,12 +57,12 @@ func (d *DnsQuery) Description() string {
 }
 
 func (d *DnsQuery) Gather(acc telegraf.Accumulator) error {
 	d.setDefaultValues()
+
+	errChan := errchan.New(len(d.Domains) * len(d.Servers))
 	for _, domain := range d.Domains {
 		for _, server := range d.Servers {
 			dnsQueryTime, err := d.getDnsQueryTime(domain, server)
-			if err != nil {
-				return err
-			}
+			errChan.C <- err
 			tags := map[string]string{
 				"server": server,
 				"domain": domain,
@@ -72,7 +74,7 @@ func (d *DnsQuery) Gather(acc telegraf.Accumulator) error {
 		}
 	}
 
-	return nil
+	return errChan.Error()
 }
 
 func (d *DnsQuery) setDefaultValues() {
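The errchan change above (repeated in the dovecot, memcached, mongodb, mysql, nginx and nsq diffs below) swaps early returns for a buffered error channel, so one failing server no longer hides results from the others. A self-contained sketch of how such a helper could work; this is an assumption about telegraf's internal/errchan package, inferred only from the New / C / Error() calls visible in the diff:

```go
package main

import (
	"fmt"
	"strings"
)

// ErrChan is a sketch of the internal/errchan idea: a channel buffered
// for one send per unit of work, plus an Error() that drains it and
// reports every failure at once instead of aborting on the first one.
type ErrChan struct {
	C chan error
}

func New(n int) *ErrChan {
	return &ErrChan{C: make(chan error, n)}
}

func (e *ErrChan) Error() error {
	close(e.C)
	var msgs []string
	for err := range e.C {
		if err != nil {
			msgs = append(msgs, err.Error())
		}
	}
	if len(msgs) == 0 {
		return nil
	}
	return fmt.Errorf("%s", strings.Join(msgs, ", "))
}

func gather(s string) error {
	if s == "b" {
		return fmt.Errorf("server %s unreachable", s)
	}
	return nil
}

func main() {
	servers := []string{"a", "b", "c"}
	errChan := New(len(servers))
	for _, s := range servers {
		// every attempt reports into the channel, nil or not,
		// so partial failures no longer abort the whole Gather
		errChan.C <- gather(s)
	}
	fmt.Println(errChan.Error())
}
```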

View File

@@ -207,9 +207,18 @@ func (d *Docker) gatherContainer(
 		cname = strings.TrimPrefix(container.Names[0], "/")
 	}
 
+	// the image name sometimes has a version part.
+	//   ie, rabbitmq:3-management
+	imageParts := strings.Split(container.Image, ":")
+	imageName := imageParts[0]
+	imageVersion := "unknown"
+	if len(imageParts) > 1 {
+		imageVersion = imageParts[1]
+	}
+
 	tags := map[string]string{
 		"container_name": cname,
-		"container_image": container.Image,
+		"container_image": imageName,
+		"container_version": imageVersion,
 	}
 	if len(d.ContainerNames) > 0 {
 		if !sliceContains(cname, d.ContainerNames) {
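For reference, a quick standalone check of the image splitting introduced above, including the "unknown" fallback when an image has no tag (sample image names only):

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	for _, image := range []string{"rabbitmq:3-management", "quay.io/coreos/etcd:v2.2.2", "redis"} {
		parts := strings.Split(image, ":")
		name := parts[0]
		version := "unknown" // same fallback the plugin uses when no tag is present
		if len(parts) > 1 {
			version = parts[1]
		}
		fmt.Printf("container_image=%s container_version=%s\n", name, version)
	}
}
```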

View File

@@ -378,9 +378,10 @@ func TestDockerGatherInfo(t *testing.T) {
 			"container_id": "b7dfbb9478a6ae55e237d4d74f8bbb753f0817192b5081334dc78476296e2173",
 		},
 		map[string]string{
 			"container_name": "etcd2",
-			"container_image": "quay.io/coreos/etcd:v2.2.2",
+			"container_image": "quay.io/coreos/etcd",
 			"cpu": "cpu3",
+			"container_version": "v2.2.2",
 		},
 	)
 	acc.AssertContainsTaggedFields(t,
@@ -423,8 +424,9 @@ func TestDockerGatherInfo(t *testing.T) {
 			"container_id": "b7dfbb9478a6ae55e237d4d74f8bbb753f0817192b5081334dc78476296e2173",
 		},
 		map[string]string{
 			"container_name": "etcd2",
-			"container_image": "quay.io/coreos/etcd:v2.2.2",
+			"container_image": "quay.io/coreos/etcd",
+			"container_version": "v2.2.2",
 		},
 	)

View File

@@ -12,6 +12,7 @@ import (
 	"time"
 
 	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/internal/errchan"
 	"github.com/influxdata/telegraf/plugins/inputs"
 )
 
@@ -51,7 +52,6 @@ const defaultPort = "24242"
 // Reads stats from all configured servers.
 func (d *Dovecot) Gather(acc telegraf.Accumulator) error {
-
 	if !validQuery[d.Type] {
 		return fmt.Errorf("Error: %s is not a valid query type\n",
 			d.Type)
@@ -61,31 +61,27 @@
 		d.Servers = append(d.Servers, "127.0.0.1:24242")
 	}
 
-	var wg sync.WaitGroup
-	var outerr error
-
 	if len(d.Filters) <= 0 {
 		d.Filters = append(d.Filters, "")
 	}
 
-	for _, serv := range d.Servers {
+	var wg sync.WaitGroup
+	errChan := errchan.New(len(d.Servers) * len(d.Filters))
+	for _, server := range d.Servers {
 		for _, filter := range d.Filters {
 			wg.Add(1)
-			go func(serv string, filter string) {
+			go func(s string, f string) {
 				defer wg.Done()
-				outerr = d.gatherServer(serv, acc, d.Type, filter)
-			}(serv, filter)
+				errChan.C <- d.gatherServer(s, acc, d.Type, f)
+			}(server, filter)
 		}
 	}
 
 	wg.Wait()
-
-	return outerr
+	return errChan.Error()
 }
 
 func (d *Dovecot) gatherServer(addr string, acc telegraf.Accumulator, qtype string, filter string) error {
 	_, _, err := net.SplitHostPort(addr)
 	if err != nil {
 		return fmt.Errorf("Error: %s on url %s\n", err, addr)

View File

@@ -92,9 +92,11 @@ type haproxy struct {
 var sampleConfig = `
   ## An array of address to gather stats about. Specify an ip on hostname
   ## with optional port. ie localhost, 10.10.3.33:1936, etc.
-  ## If no servers are specified, then default to 127.0.0.1:1936
-  servers = ["http://myhaproxy.com:1936", "http://anotherhaproxy.com:1936"]
+  ## Make sure you specify the complete path to the stats endpoint
+  ## ie 10.10.3.33:1936/haproxy?stats
+  #
+  ## If no servers are specified, then default to 127.0.0.1:1936/haproxy?stats
+  servers = ["http://myhaproxy.com:1936/haproxy?stats"]
   ## Or you can also use local socket
   ## servers = ["socket:/run/haproxy/admin.sock"]
 `
@@ -111,7 +113,7 @@ func (r *haproxy) Description() string {
 // Returns one of the errors encountered while gather stats (if any).
 func (g *haproxy) Gather(acc telegraf.Accumulator) error {
 	if len(g.Servers) == 0 {
-		return g.gatherServer("http://127.0.0.1:1936", acc)
+		return g.gatherServer("http://127.0.0.1:1936/haproxy?stats", acc)
 	}
 
 	var wg sync.WaitGroup
@@ -167,12 +169,16 @@ func (g *haproxy) gatherServer(addr string, acc telegraf.Accumulator) error {
 		g.client = client
 	}
 
+	if !strings.HasSuffix(addr, ";csv") {
+		addr += "/;csv"
+	}
+
 	u, err := url.Parse(addr)
 	if err != nil {
 		return fmt.Errorf("Unable parse server address '%s': %s", addr, err)
 	}
 
-	req, err := http.NewRequest("GET", fmt.Sprintf("%s://%s%s/;csv", u.Scheme, u.Host, u.Path), nil)
+	req, err := http.NewRequest("GET", addr, nil)
 	if u.User != nil {
 		p, _ := u.User.Password()
 		req.SetBasicAuth(u.User.Username(), p)
@@ -184,7 +190,7 @@
 	}
 
 	if res.StatusCode != 200 {
-		return fmt.Errorf("Unable to get valid stat result from '%s': %s", addr, err)
+		return fmt.Errorf("Unable to get valid stat result from '%s', http response code : %d", addr, res.StatusCode)
 	}
 
 	return importCsvResult(res.Body, acc, u.Host)
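With this change the configured address is used as-is and a "/;csv" suffix is appended only when it is missing, which is what lets a full stats path like "/haproxy?stats" survive. A small illustration of the resulting request URLs (example hostnames are hypothetical):

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	servers := []string{
		"http://myhaproxy.com:1936/haproxy?stats",
		"http://127.0.0.1:1936/haproxy?stats/;csv",
	}
	for _, addr := range servers {
		// same suffix handling as gatherServer above
		if !strings.HasSuffix(addr, ";csv") {
			addr += "/;csv"
		}
		fmt.Println("GET", addr)
	}
}
```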

View File

@@ -243,7 +243,7 @@ func TestHaproxyDefaultGetFromLocalhost(t *testing.T) {
 	err := r.Gather(&acc)
 	require.Error(t, err)
-	assert.Contains(t, err.Error(), "127.0.0.1:1936/;csv")
+	assert.Contains(t, err.Error(), "127.0.0.1:1936/haproxy?stats/;csv")
 }
 
 const csvOutputSample = `

View File

@@ -249,7 +249,14 @@ func (j *Jolokia) Gather(acc telegraf.Accumulator) error {
 			switch t := values.(type) {
 			case map[string]interface{}:
 				for k, v := range t {
-					fields[measurement+"_"+k] = v
+					switch t2 := v.(type) {
+					case map[string]interface{}:
+						for k2, v2 := range t2 {
+							fields[measurement+"_"+k+"_"+k2] = v2
+						}
+					case interface{}:
+						fields[measurement+"_"+k] = t2
+					}
 				}
 			case interface{}:
 				fields[measurement] = t
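The added inner switch flattens one extra level for attributes whose value is itself a map (multi-dimensional attributes such as memory-usage beans). A standalone sketch of the resulting field naming, with made-up sample data; the default branch here plays the role of the diff's `case interface{}`:

```go
package main

import "fmt"

func main() {
	measurement := "java_memory"
	fields := map[string]interface{}{}

	// shape of a Jolokia "read" value: scalars mixed with nested maps
	// (sample data invented for illustration)
	values := map[string]interface{}{
		"Uptime": 1234567,
		"HeapMemoryUsage": map[string]interface{}{
			"used": 96519032,
			"max":  477626368,
		},
	}

	for k, v := range values {
		switch t2 := v.(type) {
		case map[string]interface{}:
			// one extra level is flattened into the field name
			for k2, v2 := range t2 {
				fields[measurement+"_"+k+"_"+k2] = v2
			}
		default:
			fields[measurement+"_"+k] = t2
		}
	}
	fmt.Println(fields)
	// java_memory_Uptime, java_memory_HeapMemoryUsage_used, java_memory_HeapMemoryUsage_max
}
```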

View File

@@ -9,6 +9,7 @@ import (
 	"time"
 
 	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/internal/errchan"
 	"github.com/influxdata/telegraf/plugins/inputs"
 )
 
@@ -73,19 +74,16 @@
 		return m.gatherServer(":11211", false, acc)
 	}
 
+	errChan := errchan.New(len(m.Servers) + len(m.UnixSockets))
 	for _, serverAddress := range m.Servers {
-		if err := m.gatherServer(serverAddress, false, acc); err != nil {
-			return err
-		}
+		errChan.C <- m.gatherServer(serverAddress, false, acc)
 	}
 
 	for _, unixAddress := range m.UnixSockets {
-		if err := m.gatherServer(unixAddress, true, acc); err != nil {
-			return err
-		}
+		errChan.C <- m.gatherServer(unixAddress, true, acc)
 	}
 
-	return nil
+	return errChan.Error()
 }
 
 func (m *Memcached) gatherServer(
func (m *Memcached) gatherServer( func (m *Memcached) gatherServer(

View File

@@ -10,6 +10,7 @@
   ##   mongodb://10.10.3.33:18832,
   ##   10.0.0.1:10000, etc.
   servers = ["127.0.0.1:27017"]
+  gather_perdb_stats = false
 ```
 
 For authenticated mongodb instances use a mongodb connection URI
@@ -52,3 +53,15 @@ and create a single measurement containing values e.g.
 * ttl_passes_per_sec
 * repl_lag
 * jumbo_chunks (only if mongos or mongo config)
+
+If gather_perdb_stats is set to true, it will also collect per-database stats exposed by db.stats(),
+creating another measurement called mongodb_db_stats and containing values:
+* collections
+* objects
+* avg_obj_size
+* data_size
+* storage_size
+* num_extents
+* indexes
+* index_size
+* ok
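Under the hood (see the mongodb server.go changes later in this commit) the per-database collection boils down to running the dbStats command against every database returned by DatabaseNames(). A rough standalone sketch using the same mgo calls, with a hypothetical local mongod:

```go
package main

import (
	"fmt"
	"log"

	"gopkg.in/mgo.v2"
	"gopkg.in/mgo.v2/bson"
)

func main() {
	// hypothetical local mongod; adjust the address for your setup
	session, err := mgo.Dial("127.0.0.1:27017")
	if err != nil {
		log.Fatal(err)
	}
	defer session.Close()

	names, err := session.DatabaseNames()
	if err != nil {
		log.Fatal(err)
	}
	for _, name := range names {
		// same command the plugin issues once per database
		result := bson.M{}
		if err := session.DB(name).Run(bson.D{{"dbStats", 1}}, &result); err != nil {
			log.Println("dbStats failed for", name, ":", err)
			continue
		}
		fmt.Println(name, "objects:", result["objects"], "dataSize:", result["dataSize"])
	}
}
```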

View File

@@ -10,14 +10,16 @@ import (
 	"time"
 
 	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/internal/errchan"
 	"github.com/influxdata/telegraf/plugins/inputs"
 	"gopkg.in/mgo.v2"
 )
 
 type MongoDB struct {
 	Servers []string
 	Ssl     Ssl
 	mongos  map[string]*Server
+	GatherPerdbStats bool
 }
 
 type Ssl struct {
@@ -32,6 +34,7 @@ var sampleConfig = `
   ##   mongodb://10.10.3.33:18832,
   ##   10.0.0.1:10000, etc.
   servers = ["127.0.0.1:27017"]
+  gather_perdb_stats = false
 `
 
 func (m *MongoDB) SampleConfig() string {
@@ -53,9 +56,7 @@ func (m *MongoDB) Gather(acc telegraf.Accumulator) error {
 	}
 
 	var wg sync.WaitGroup
-
-	var outerr error
-
+	errChan := errchan.New(len(m.Servers))
 	for _, serv := range m.Servers {
 		u, err := url.Parse(serv)
 		if err != nil {
@@ -71,13 +72,12 @@
 		wg.Add(1)
 		go func(srv *Server) {
 			defer wg.Done()
-			outerr = m.gatherServer(srv, acc)
+			errChan.C <- m.gatherServer(srv, acc)
 		}(m.getMongoServer(u))
 	}
 
 	wg.Wait()
-
-	return outerr
+	return errChan.Error()
 }
 
 func (m *MongoDB) getMongoServer(url *url.URL) *Server {
@@ -135,7 +135,7 @@ func (m *MongoDB) gatherServer(server *Server, acc telegraf.Accumulator) error {
 		}
 		server.Session = sess
 	}
-	return server.gatherData(acc)
+	return server.gatherData(acc, m.GatherPerdbStats)
 }
 
 func init() {

View File

@@ -12,6 +12,12 @@ type MongodbData struct {
 	StatLine *StatLine
 	Fields   map[string]interface{}
 	Tags     map[string]string
+	DbData   []DbData
+}
+
+type DbData struct {
+	Name   string
+	Fields map[string]interface{}
 }
 
 func NewMongodbData(statLine *StatLine, tags map[string]string) *MongodbData {
@@ -22,6 +28,7 @@
 		StatLine: statLine,
 		Tags:     tags,
 		Fields:   make(map[string]interface{}),
+		DbData:   []DbData{},
 	}
 }
 
@@ -72,6 +79,34 @@ var WiredTigerStats = map[string]string{
 	"percent_cache_used": "CacheUsedPercent",
 }
 
+var DbDataStats = map[string]string{
+	"collections":  "Collections",
+	"objects":      "Objects",
+	"avg_obj_size": "AvgObjSize",
+	"data_size":    "DataSize",
+	"storage_size": "StorageSize",
+	"num_extents":  "NumExtents",
+	"indexes":      "Indexes",
+	"index_size":   "IndexSize",
+	"ok":           "Ok",
+}
+
+func (d *MongodbData) AddDbStats() {
+	for _, dbstat := range d.StatLine.DbStatsLines {
+		dbStatLine := reflect.ValueOf(&dbstat).Elem()
+		newDbData := &DbData{
+			Name:   dbstat.Name,
+			Fields: make(map[string]interface{}),
+		}
+		newDbData.Fields["type"] = "db_stat"
+		for key, value := range DbDataStats {
+			val := dbStatLine.FieldByName(value).Interface()
+			newDbData.Fields[key] = val
+		}
+		d.DbData = append(d.DbData, *newDbData)
+	}
+}
+
 func (d *MongodbData) AddDefaultStats() {
 	statLine := reflect.ValueOf(d.StatLine).Elem()
 	d.addStat(statLine, DefaultStats)
@@ -113,4 +148,15 @@ func (d *MongodbData) flush(acc telegraf.Accumulator) {
 		d.StatLine.Time,
 	)
 	d.Fields = make(map[string]interface{})
+
+	for _, db := range d.DbData {
+		d.Tags["db_name"] = db.Name
+		acc.AddFields(
+			"mongodb_db_stats",
+			db.Fields,
+			d.Tags,
+			d.StatLine.Time,
+		)
+		db.Fields = make(map[string]interface{})
+	}
 }
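A note on the technique in AddDbStats: the DbDataStats map pairs output field names with DbStatLine struct field names, and reflection pulls each value out by name. A minimal standalone illustration of that lookup pattern (struct trimmed down and sample numbers invented):

```go
package main

import (
	"fmt"
	"reflect"
)

type DbStatLine struct {
	Collections int64
	Objects     int64
	DataSize    int64
}

// maps metric keys to struct field names, as DbDataStats does above
var statFields = map[string]string{
	"collections": "Collections",
	"objects":     "Objects",
	"data_size":   "DataSize",
}

func main() {
	line := DbStatLine{Collections: 5, Objects: 1200, DataSize: 44121}
	v := reflect.ValueOf(&line).Elem()

	fields := map[string]interface{}{}
	for key, fieldName := range statFields {
		// FieldByName returns the struct field's value by its Go identifier
		fields[key] = v.FieldByName(fieldName).Interface()
	}
	fmt.Println(fields)
}
```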

View File

@@ -22,7 +22,7 @@ func (s *Server) getDefaultTags() map[string]string {
 	return tags
 }
 
-func (s *Server) gatherData(acc telegraf.Accumulator) error {
+func (s *Server) gatherData(acc telegraf.Accumulator, gatherDbStats bool) error {
 	s.Session.SetMode(mgo.Eventual, true)
 	s.Session.SetSocketTimeout(0)
 	result_server := &ServerStatus{}
@@ -42,10 +42,34 @@
 		JumboChunksCount: int64(jumbo_chunks),
 	}
 
+	result_db_stats := &DbStats{}
+
+	if gatherDbStats == true {
+		names := []string{}
+		names, err = s.Session.DatabaseNames()
+		if err != nil {
+			log.Println("Error getting database names (" + err.Error() + ")")
+		}
+		for _, db_name := range names {
+			db_stat_line := &DbStatsData{}
+			err = s.Session.DB(db_name).Run(bson.D{{"dbStats", 1}}, db_stat_line)
+			if err != nil {
+				log.Println("Error getting db stats from " + db_name + "(" + err.Error() + ")")
+			}
+			db := &Db{
+				Name:        db_name,
+				DbStatsData: db_stat_line,
+			}
+			result_db_stats.Dbs = append(result_db_stats.Dbs, *db)
+		}
+	}
+
 	result := &MongoStatus{
 		ServerStatus:  result_server,
 		ReplSetStatus: result_repl,
 		ClusterStatus: result_cluster,
+		DbStats:       result_db_stats,
 	}
 
 	defer func() {
@@ -64,6 +88,7 @@
 			s.getDefaultTags(),
 		)
 		data.AddDefaultStats()
+		data.AddDbStats()
 		data.flush(acc)
 	}
 	return nil

View File

@@ -29,12 +29,12 @@ func TestGetDefaultTags(t *testing.T) {
 func TestAddDefaultStats(t *testing.T) {
 	var acc testutil.Accumulator
 
-	err := server.gatherData(&acc)
+	err := server.gatherData(&acc, false)
 	require.NoError(t, err)
 
 	time.Sleep(time.Duration(1) * time.Second)
 	// need to call this twice so it can perform the diff
-	err = server.gatherData(&acc)
+	err = server.gatherData(&acc, false)
 	require.NoError(t, err)
 
 	for key, _ := range DefaultStats {

View File

@@ -35,6 +35,7 @@ type MongoStatus struct {
 	ServerStatus  *ServerStatus
 	ReplSetStatus *ReplSetStatus
 	ClusterStatus *ClusterStatus
+	DbStats       *DbStats
 }
 
 type ServerStatus struct {
@@ -65,6 +66,32 @@ type ServerStatus struct {
 	Metrics *MetricsStats `bson:"metrics"`
 }
 
+// DbStats stores stats from all dbs
+type DbStats struct {
+	Dbs []Db
+}
+
+// Db represent a single DB
+type Db struct {
+	Name        string
+	DbStatsData *DbStatsData
+}
+
+// DbStatsData stores stats from a db
+type DbStatsData struct {
+	Db          string      `bson:"db"`
+	Collections int64       `bson:"collections"`
+	Objects     int64       `bson:"objects"`
+	AvgObjSize  float64     `bson:"avgObjSize"`
+	DataSize    int64       `bson:"dataSize"`
+	StorageSize int64       `bson:"storageSize"`
+	NumExtents  int64       `bson:"numExtents"`
+	Indexes     int64       `bson:"indexes"`
+	IndexSize   int64       `bson:"indexSize"`
+	Ok          int64       `bson:"ok"`
+	GleStats    interface{} `bson:"gleStats"`
+}
+
 // ClusterStatus stores information related to the whole cluster
 type ClusterStatus struct {
 	JumboChunksCount int64
@@ -396,6 +423,22 @@ type StatLine struct {
 	// Cluster fields
 	JumboChunksCount int64
+
+	// DB stats field
+	DbStatsLines []DbStatLine
+}
+
+type DbStatLine struct {
+	Name        string
+	Collections int64
+	Objects     int64
+	AvgObjSize  float64
+	DataSize    int64
+	StorageSize int64
+	NumExtents  int64
+	Indexes     int64
+	IndexSize   int64
+	Ok          int64
 }
 
 func parseLocks(stat ServerStatus) map[string]LockUsage {
@@ -677,5 +720,27 @@ func NewStatLine(oldMongo, newMongo MongoStatus, key string, all bool, sampleSec
 	newClusterStat := *newMongo.ClusterStatus
 	returnVal.JumboChunksCount = newClusterStat.JumboChunksCount
 
+	newDbStats := *newMongo.DbStats
+	for _, db := range newDbStats.Dbs {
+		dbStatsData := db.DbStatsData
+		// mongos doesn't have the db key, so setting the db name
+		if dbStatsData.Db == "" {
+			dbStatsData.Db = db.Name
+		}
+		dbStatLine := &DbStatLine{
+			Name:        dbStatsData.Db,
+			Collections: dbStatsData.Collections,
+			Objects:     dbStatsData.Objects,
+			AvgObjSize:  dbStatsData.AvgObjSize,
+			DataSize:    dbStatsData.DataSize,
+			StorageSize: dbStatsData.StorageSize,
+			NumExtents:  dbStatsData.NumExtents,
+			Indexes:     dbStatsData.Indexes,
+			IndexSize:   dbStatsData.IndexSize,
+			Ok:          dbStatsData.Ok,
+		}
+		returnVal.DbStatsLines = append(returnVal.DbStatsLines, *dbStatLine)
+	}
+
 	return returnVal
 }

View File

@@ -7,10 +7,12 @@ import (
 	"net/url"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 
 	_ "github.com/go-sql-driver/mysql"
 	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/internal/errchan"
 	"github.com/influxdata/telegraf/plugins/inputs"
 )
 
@@ -118,26 +120,27 @@ func (m *Mysql) InitMysql() {
 func (m *Mysql) Gather(acc telegraf.Accumulator) error {
 	if len(m.Servers) == 0 {
-		// if we can't get stats in this case, thats fine, don't report
-		// an error.
-		m.gatherServer(localhost, acc)
-		return nil
+		// default to localhost if nothing specified.
+		return m.gatherServer(localhost, acc)
 	}
 
 	// Initialise additional query intervals
 	if !initDone {
 		m.InitMysql()
 	}
+
+	var wg sync.WaitGroup
+	errChan := errchan.New(len(m.Servers))
 	// Loop through each server and collect metrics
-	for _, serv := range m.Servers {
-		err := m.gatherServer(serv, acc)
-		if err != nil {
-			return err
-		}
+	for _, server := range m.Servers {
+		wg.Add(1)
+		go func(s string) {
+			defer wg.Done()
+			errChan.C <- m.gatherServer(s, acc)
+		}(server)
 	}
 
-	return nil
+	wg.Wait()
+	return errChan.Error()
 }
 
 type mapping struct {

View File

@@ -20,7 +20,6 @@ func TestMysqlDefaultsToLocal(t *testing.T) {
 	}
 
 	var acc testutil.Accumulator
 	err := m.Gather(&acc)
-
 	require.NoError(t, err)

View File

@@ -12,6 +12,7 @@ import (
 	"time"
 
 	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/internal/errchan"
 	"github.com/influxdata/telegraf/plugins/inputs"
 )
 
@@ -34,7 +35,7 @@ func (n *Nginx) Description() string {
 func (n *Nginx) Gather(acc telegraf.Accumulator) error {
 	var wg sync.WaitGroup
-	var outerr error
+	errChan := errchan.New(len(n.Urls))
 
 	for _, u := range n.Urls {
 		addr, err := url.Parse(u)
@@ -45,13 +46,12 @@
 		wg.Add(1)
 		go func(addr *url.URL) {
 			defer wg.Done()
-			outerr = n.gatherUrl(addr, acc)
+			errChan.C <- n.gatherUrl(addr, acc)
 		}(addr)
 	}
 
 	wg.Wait()
-
-	return outerr
+	return errChan.Error()
 }
 
 var tr = &http.Transport{

View File

@@ -32,6 +32,7 @@ import (
 	"time"
 
 	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/internal/errchan"
 	"github.com/influxdata/telegraf/plugins/inputs"
 )
 
@@ -65,19 +66,17 @@ func (n *NSQ) Description() string {
 func (n *NSQ) Gather(acc telegraf.Accumulator) error {
 	var wg sync.WaitGroup
-	var outerr error
-
+	errChan := errchan.New(len(n.Endpoints))
 	for _, e := range n.Endpoints {
 		wg.Add(1)
 		go func(e string) {
 			defer wg.Done()
-			outerr = n.gatherEndpoint(e, acc)
+			errChan.C <- n.gatherEndpoint(e, acc)
 		}(e)
 	}
 
 	wg.Wait()
-
-	return outerr
+	return errChan.Error()
 }
 
 var tr = &http.Transport{

View File

@@ -43,9 +43,9 @@ var sampleConfig = `
   ## file paths for proc files. If empty default paths will be used:
   ##    /proc/net/netstat, /proc/net/snmp, /proc/net/snmp6
   ## These can also be overridden with env variables, see README.
-  proc_net_netstat = ""
-  proc_net_snmp = ""
-  proc_net_snmp6 = ""
+  proc_net_netstat = "/proc/net/netstat"
+  proc_net_snmp = "/proc/net/snmp"
+  proc_net_snmp6 = "/proc/net/snmp6"
   ## dump metrics with 0 values too
   dump_zeros = true
 `
@@ -141,7 +141,7 @@ func (ns *Nstat) loadPaths() {
 		ns.ProcNetSNMP = proc(ENV_SNMP, NET_SNMP)
 	}
 	if ns.ProcNetSNMP6 == "" {
-		ns.ProcNetSNMP = proc(ENV_SNMP6, NET_SNMP6)
+		ns.ProcNetSNMP6 = proc(ENV_SNMP6, NET_SNMP6)
 	}
 }

View File

@@ -9,35 +9,59 @@ import (
 	"time"
 
 	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/internal"
 	"github.com/influxdata/telegraf/internal/errchan"
 	"github.com/influxdata/telegraf/plugins/inputs"
 )
 
+// DefaultUsername will set a default value that corresponds to the default
+// value used by Rabbitmq
 const DefaultUsername = "guest"
+
+// DefaultPassword will set a default value that corresponds to the default
+// value used by Rabbitmq
 const DefaultPassword = "guest"
+
+// DefaultURL will set a default value that corresponds to the default value
+// used by Rabbitmq
 const DefaultURL = "http://localhost:15672"
 
+// RabbitMQ defines the configuration necessary for gathering metrics,
+// see the sample config for further details
 type RabbitMQ struct {
 	URL      string
 	Name     string
 	Username string
 	Password string
-	Nodes    []string
-	Queues   []string
+	// Path to CA file
+	SSLCA string `toml:"ssl_ca"`
+	// Path to host cert file
+	SSLCert string `toml:"ssl_cert"`
+	// Path to cert key file
+	SSLKey string `toml:"ssl_key"`
+	// Use SSL but skip chain & host verification
+	InsecureSkipVerify bool
+	// InsecureSkipVerify bool
+	Nodes  []string
+	Queues []string
 
 	Client *http.Client
 }
 
+// OverviewResponse ...
 type OverviewResponse struct {
 	MessageStats *MessageStats `json:"message_stats"`
 	ObjectTotals *ObjectTotals `json:"object_totals"`
 	QueueTotals  *QueueTotals  `json:"queue_totals"`
 }
 
+// Details ...
 type Details struct {
 	Rate float64
 }
 
+// MessageStats ...
 type MessageStats struct {
 	Ack        int64
 	AckDetails Details `json:"ack_details"`
@@ -51,6 +75,7 @@ type MessageStats struct {
 	RedeliverDetails Details `json:"redeliver_details"`
 }
 
+// ObjectTotals ...
 type ObjectTotals struct {
 	Channels    int64
 	Connections int64
@@ -59,6 +84,7 @@ type ObjectTotals struct {
 	Queues int64
 }
 
+// QueueTotals ...
 type QueueTotals struct {
 	Messages      int64
 	MessagesReady int64 `json:"messages_ready"`
@@ -66,10 +92,11 @@ type QueueTotals struct {
 	MessageBytes               int64 `json:"message_bytes"`
 	MessageBytesReady          int64 `json:"message_bytes_ready"`
 	MessageBytesUnacknowledged int64 `json:"message_bytes_unacknowledged"`
-	MessageRam                 int64 `json:"message_bytes_ram"`
+	MessageRAM                 int64 `json:"message_bytes_ram"`
 	MessagePersistent          int64 `json:"message_bytes_persistent"`
 }
 
+// Queue ...
 type Queue struct {
 	QueueTotals  // just to not repeat the same code
 	MessageStats `json:"message_stats"`
@@ -83,6 +110,7 @@ type Queue struct {
 	AutoDelete bool `json:"auto_delete"`
 }
 
+// Node ...
 type Node struct {
 	Name string
 
@@ -99,6 +127,7 @@ type Node struct {
 	SocketsUsed int64 `json:"sockets_used"`
 }
 
+// gatherFunc ...
 type gatherFunc func(r *RabbitMQ, acc telegraf.Accumulator, errChan chan error)
 
 var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues}
@@ -109,22 +138,40 @@ var sampleConfig = `
   # username = "guest"
   # password = "guest"
 
+  ## Optional SSL Config
+  # ssl_ca = "/etc/telegraf/ca.pem"
+  # ssl_cert = "/etc/telegraf/cert.pem"
+  # ssl_key = "/etc/telegraf/key.pem"
+  ## Use SSL but skip chain & host verification
+  # insecure_skip_verify = false
+
   ## A list of nodes to pull metrics about. If not specified, metrics for
   ## all nodes are gathered.
   # nodes = ["rabbit@node1", "rabbit@node2"]
 `
 
+// SampleConfig ...
 func (r *RabbitMQ) SampleConfig() string {
 	return sampleConfig
 }
 
+// Description ...
 func (r *RabbitMQ) Description() string {
 	return "Read metrics from one or many RabbitMQ servers via the management API"
 }
 
+// Gather ...
 func (r *RabbitMQ) Gather(acc telegraf.Accumulator) error {
 	if r.Client == nil {
-		tr := &http.Transport{ResponseHeaderTimeout: time.Duration(3 * time.Second)}
+		tlsCfg, err := internal.GetTLSConfig(
+			r.SSLCert, r.SSLKey, r.SSLCA, r.InsecureSkipVerify)
+		if err != nil {
+			return err
+		}
+		tr := &http.Transport{
+			ResponseHeaderTimeout: time.Duration(3 * time.Second),
+			TLSClientConfig:       tlsCfg,
+		}
 		r.Client = &http.Client{
 			Transport: tr,
 			Timeout:   time.Duration(4 * time.Second),
@@ -286,7 +333,7 @@ func gatherQueues(r *RabbitMQ, acc telegraf.Accumulator, errChan chan error) {
 			"message_bytes":         queue.MessageBytes,
 			"message_bytes_ready":   queue.MessageBytesReady,
 			"message_bytes_unacked": queue.MessageBytesUnacknowledged,
-			"message_bytes_ram":     queue.MessageRam,
+			"message_bytes_ram":     queue.MessageRAM,
 			"message_bytes_persist": queue.MessagePersistent,
 			"messages":              queue.Messages,
 			"messages_ready":        queue.MessagesReady,

View File

@@ -99,7 +99,7 @@ func (r *Redis) Gather(acc telegraf.Accumulator) error {
 	var wg sync.WaitGroup
 	errChan := errchan.New(len(r.Servers))
 	for _, serv := range r.Servers {
-		if !strings.HasPrefix(serv, "tcp://") || !strings.HasPrefix(serv, "unix://") {
+		if !strings.HasPrefix(serv, "tcp://") && !strings.HasPrefix(serv, "unix://") {
 			serv = "tcp://" + serv
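The one-character fix above matters because the old || condition was true for every input, so "tcp://" was prepended even to addresses that already had a scheme (the extra "tcp://" mentioned in the changelog). A quick standalone check of the corrected && logic:

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	for _, serv := range []string{"localhost:6379", "tcp://localhost:6379", "unix:///var/run/redis.sock"} {
		// old condition: !HasPrefix(serv, "tcp://") || !HasPrefix(serv, "unix://")
		// was true for every input, doubling the scheme on tcp:// and unix:// URLs
		if !strings.HasPrefix(serv, "tcp://") && !strings.HasPrefix(serv, "unix://") {
			serv = "tcp://" + serv
		}
		fmt.Println(serv)
	}
}
```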
} }