0.3.0 redis & rabbitmq

This commit is contained in:
Cameron Sparr 2015-12-15 11:08:13 -06:00
parent 7746a2b3cd
commit 6fcd05b855
3 changed files with 41 additions and 30 deletions

View File

@ -7,6 +7,7 @@ import (
"math/rand" "math/rand"
"net/url" "net/url"
"strings" "strings"
"time"
"github.com/influxdb/influxdb/client/v2" "github.com/influxdb/influxdb/client/v2"
"github.com/influxdb/telegraf/internal" "github.com/influxdb/telegraf/internal"
@ -110,6 +111,7 @@ func (i *InfluxDB) Connect() error {
} }
i.conns = conns i.conns = conns
rand.Seed(time.Now().UnixNano())
return nil return nil
} }

View File

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"net/http" "net/http"
"strconv" "strconv"
"time"
"github.com/influxdb/telegraf/plugins" "github.com/influxdb/telegraf/plugins"
) )
@ -199,20 +200,20 @@ func gatherOverview(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan
if serv.Name != "" { if serv.Name != "" {
tags["name"] = serv.Name tags["name"] = serv.Name
} }
fields := map[string]interface{}{
acc.Add("messages", overview.QueueTotals.Messages, tags) "messages": overview.QueueTotals.Messages,
acc.Add("messages_ready", overview.QueueTotals.MessagesReady, tags) "messages_ready": overview.QueueTotals.MessagesReady,
acc.Add("messages_unacked", overview.QueueTotals.MessagesUnacknowledged, tags) "messages_unacked": overview.QueueTotals.MessagesUnacknowledged,
"channels": overview.ObjectTotals.Channels,
acc.Add("channels", overview.ObjectTotals.Channels, tags) "connections": overview.ObjectTotals.Connections,
acc.Add("connections", overview.ObjectTotals.Connections, tags) "consumers": overview.ObjectTotals.Consumers,
acc.Add("consumers", overview.ObjectTotals.Consumers, tags) "exchanges": overview.ObjectTotals.Exchanges,
acc.Add("exchanges", overview.ObjectTotals.Exchanges, tags) "queues": overview.ObjectTotals.Queues,
acc.Add("queues", overview.ObjectTotals.Queues, tags) "messages_acked": overview.MessageStats.Ack,
"messages_delivered": overview.MessageStats.Deliver,
acc.Add("messages_acked", overview.MessageStats.Ack, tags) "messages_published": overview.MessageStats.Publish,
acc.Add("messages_delivered", overview.MessageStats.Deliver, tags) }
acc.Add("messages_published", overview.MessageStats.Publish, tags) acc.AddFields("rabbitmq_overview", fields, tags)
errChan <- nil errChan <- nil
} }
@ -225,6 +226,7 @@ func gatherNodes(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan cha
errChan <- err errChan <- err
return return
} }
now := time.Now()
for _, node := range nodes { for _, node := range nodes {
if !shouldGatherNode(node, serv) { if !shouldGatherNode(node, serv) {
@ -234,17 +236,20 @@ func gatherNodes(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan cha
tags := map[string]string{"url": serv.URL} tags := map[string]string{"url": serv.URL}
tags["node"] = node.Name tags["node"] = node.Name
acc.Add("disk_free", node.DiskFree, tags) fields := map[string]interface{}{
acc.Add("disk_free_limit", node.DiskFreeLimit, tags) "disk_free": node.DiskFree,
acc.Add("fd_total", node.FdTotal, tags) "disk_free_limit": node.DiskFreeLimit,
acc.Add("fd_used", node.FdUsed, tags) "fd_total": node.FdTotal,
acc.Add("mem_limit", node.MemLimit, tags) "fd_used": node.FdUsed,
acc.Add("mem_used", node.MemUsed, tags) "mem_limit": node.MemLimit,
acc.Add("proc_total", node.ProcTotal, tags) "mem_used": node.MemUsed,
acc.Add("proc_used", node.ProcUsed, tags) "proc_total": node.ProcTotal,
acc.Add("run_queue", node.RunQueue, tags) "proc_used": node.ProcUsed,
acc.Add("sockets_total", node.SocketsTotal, tags) "run_queue": node.RunQueue,
acc.Add("sockets_used", node.SocketsUsed, tags) "sockets_total": node.SocketsTotal,
"sockets_used": node.SocketsUsed,
}
acc.AddFields("rabbitmq_node", fields, tags, now)
} }
errChan <- nil errChan <- nil
@ -273,7 +278,7 @@ func gatherQueues(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan ch
} }
acc.AddFields( acc.AddFields(
"queue", "rabbitmq_queue",
map[string]interface{}{ map[string]interface{}{
// common information // common information
"consumers": queue.Consumers, "consumers": queue.Consumers,

View File

@ -164,6 +164,7 @@ func gatherInfoOutput(
var keyspace_hits, keyspace_misses uint64 = 0, 0 var keyspace_hits, keyspace_misses uint64 = 0, 0
scanner := bufio.NewScanner(rdr) scanner := bufio.NewScanner(rdr)
fields := make(map[string]interface{})
for scanner.Scan() { for scanner.Scan() {
line := scanner.Text() line := scanner.Text()
if strings.Contains(line, "ERR") { if strings.Contains(line, "ERR") {
@ -199,7 +200,7 @@ func gatherInfoOutput(
} }
if err == nil { if err == nil {
acc.Add(metric, ival, tags) fields[metric] = ival
continue continue
} }
@ -208,13 +209,14 @@ func gatherInfoOutput(
return err return err
} }
acc.Add(metric, fval, tags) fields[metric] = fval
} }
var keyspace_hitrate float64 = 0.0 var keyspace_hitrate float64 = 0.0
if keyspace_hits != 0 || keyspace_misses != 0 { if keyspace_hits != 0 || keyspace_misses != 0 {
keyspace_hitrate = float64(keyspace_hits) / float64(keyspace_hits+keyspace_misses) keyspace_hitrate = float64(keyspace_hits) / float64(keyspace_hits+keyspace_misses)
} }
acc.Add("keyspace_hitrate", keyspace_hitrate, tags) fields["keyspace_hitrate"] = keyspace_hitrate
acc.AddFields("redis", fields, tags)
return nil return nil
} }
@ -229,15 +231,17 @@ func gatherKeyspaceLine(
tags map[string]string, tags map[string]string,
) { ) {
if strings.Contains(line, "keys=") { if strings.Contains(line, "keys=") {
fields := make(map[string]interface{})
tags["database"] = name tags["database"] = name
dbparts := strings.Split(line, ",") dbparts := strings.Split(line, ",")
for _, dbp := range dbparts { for _, dbp := range dbparts {
kv := strings.Split(dbp, "=") kv := strings.Split(dbp, "=")
ival, err := strconv.ParseUint(kv[1], 10, 64) ival, err := strconv.ParseUint(kv[1], 10, 64)
if err == nil { if err == nil {
acc.Add(kv[0], ival, tags) fields[kv[0]] = ival
} }
} }
acc.AddFields("redis_keyspace", fields, tags)
} }
} }