[rabbitmq plugin] Add support for per-queue metrics

Also metrics now are gathered concurrently across servers. Fixes #185

fixes #185
closes #334
This commit is contained in:
Eugene Dementiev
2015-11-01 11:22:54 +03:00
committed by Cameron Sparr
parent 688ffd024b
commit 5592738603
2 changed files with 442 additions and 85 deletions

View File

@@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"net/http"
"strconv"
"github.com/influxdb/telegraf/plugins"
)
@@ -18,6 +19,7 @@ type Server struct {
Username string
Password string
Nodes []string
Queues []string
}
type RabbitMQ struct {
@@ -32,10 +34,21 @@ type OverviewResponse struct {
QueueTotals *QueueTotals `json:"queue_totals"`
}
type Details struct {
Rate float64
}
type MessageStats struct {
Ack int64
Deliver int64
Publish int64
Ack int64
AckDetails Details `json:"ack_details"`
Deliver int64
DeliverDetails Details `json:"deliver_details"`
DeliverGet int64
DeliverGetDetails Details `json:"deliver_get_details"`
Publish int64
PublishDetails Details `json:"publish_details"`
Redeliver int64
RedeliverDetails Details `json:"redeliver_details"`
}
type ObjectTotals struct {
@@ -52,6 +65,19 @@ type QueueTotals struct {
MessagesUnacknowledged int64 `json:"messages_unacknowledged"`
}
type Queue struct {
QueueTotals // just to not repeat the same code
MessageStats `json:"message_stats"`
Memory int64
Consumers int64
ConsumerUtilisation float64 `json:"consumer_utilisation"`
Name string
Node string
Vhost string
Durable bool
AutoDelete bool `json:"auto_delete"`
}
type Node struct {
Name string
@@ -68,6 +94,10 @@ type Node struct {
SocketsUsed int64 `json:"sockets_used"`
}
type gatherFunc func(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan chan error)
var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues}
var sampleConfig = `
# Specify servers via an array of tables
[[rabbitmq.servers]]
@@ -96,13 +126,21 @@ func (r *RabbitMQ) Gather(acc plugins.Accumulator) error {
r.Client = &http.Client{}
}
var errChan = make(chan error, len(r.Servers))
// use localhost is no servers are specified in config
if len(r.Servers) == 0 {
r.gatherServer(localhost, acc)
return nil
r.Servers = append(r.Servers, localhost)
}
for _, serv := range r.Servers {
err := r.gatherServer(serv, acc)
for _, f := range gatherFunctions {
go f(r, serv, acc, errChan)
}
}
for i := 1; i <= len(r.Servers)*len(gatherFunctions); i++ {
err := <-errChan
if err != nil {
return err
}
@@ -111,81 +149,6 @@ func (r *RabbitMQ) Gather(acc plugins.Accumulator) error {
return nil
}
func (r *RabbitMQ) gatherServer(serv *Server, acc plugins.Accumulator) error {
overview := &OverviewResponse{}
err := r.requestJSON(serv, "/api/overview", &overview)
if err != nil {
return err
}
if overview.QueueTotals == nil || overview.ObjectTotals == nil || overview.MessageStats == nil {
return fmt.Errorf("Wrong answer from rabbitmq. Probably auth issue")
}
tags := map[string]string{"url": serv.URL}
if serv.Name != "" {
tags["name"] = serv.Name
}
acc.Add("messages", overview.QueueTotals.Messages, tags)
acc.Add("messages_ready", overview.QueueTotals.MessagesReady, tags)
acc.Add("messages_unacked", overview.QueueTotals.MessagesUnacknowledged, tags)
acc.Add("channels", overview.ObjectTotals.Channels, tags)
acc.Add("connections", overview.ObjectTotals.Connections, tags)
acc.Add("consumers", overview.ObjectTotals.Consumers, tags)
acc.Add("exchanges", overview.ObjectTotals.Exchanges, tags)
acc.Add("queues", overview.ObjectTotals.Queues, tags)
acc.Add("messages_acked", overview.MessageStats.Ack, tags)
acc.Add("messages_delivered", overview.MessageStats.Deliver, tags)
acc.Add("messages_published", overview.MessageStats.Publish, tags)
nodes := make([]Node, 0)
err = r.requestJSON(serv, "/api/nodes", &nodes)
if err != nil {
return err
}
for _, node := range nodes {
if !shouldGatherNode(node, serv) {
continue
}
tags["node"] = node.Name
acc.Add("disk_free", node.DiskFree, tags)
acc.Add("disk_free_limit", node.DiskFreeLimit, tags)
acc.Add("fd_total", node.FdTotal, tags)
acc.Add("fd_used", node.FdUsed, tags)
acc.Add("mem_limit", node.MemLimit, tags)
acc.Add("mem_used", node.MemUsed, tags)
acc.Add("proc_total", node.ProcTotal, tags)
acc.Add("proc_used", node.ProcUsed, tags)
acc.Add("run_queue", node.RunQueue, tags)
acc.Add("sockets_total", node.SocketsTotal, tags)
acc.Add("sockets_used", node.SocketsUsed, tags)
}
return nil
}
func shouldGatherNode(node Node, serv *Server) bool {
if len(serv.Nodes) == 0 {
return true
}
for _, name := range serv.Nodes {
if name == node.Name {
return true
}
}
return false
}
func (r *RabbitMQ) requestJSON(serv *Server, u string, target interface{}) error {
u = fmt.Sprintf("%s%s", serv.URL, u)
@@ -218,6 +181,154 @@ func (r *RabbitMQ) requestJSON(serv *Server, u string, target interface{}) error
return nil
}
func gatherOverview(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan chan error) {
overview := &OverviewResponse{}
err := r.requestJSON(serv, "/api/overview", &overview)
if err != nil {
errChan <- err
return
}
if overview.QueueTotals == nil || overview.ObjectTotals == nil || overview.MessageStats == nil {
errChan <- fmt.Errorf("Wrong answer from rabbitmq. Probably auth issue")
return
}
tags := map[string]string{"url": serv.URL}
if serv.Name != "" {
tags["name"] = serv.Name
}
acc.Add("messages", overview.QueueTotals.Messages, tags)
acc.Add("messages_ready", overview.QueueTotals.MessagesReady, tags)
acc.Add("messages_unacked", overview.QueueTotals.MessagesUnacknowledged, tags)
acc.Add("channels", overview.ObjectTotals.Channels, tags)
acc.Add("connections", overview.ObjectTotals.Connections, tags)
acc.Add("consumers", overview.ObjectTotals.Consumers, tags)
acc.Add("exchanges", overview.ObjectTotals.Exchanges, tags)
acc.Add("queues", overview.ObjectTotals.Queues, tags)
acc.Add("messages_acked", overview.MessageStats.Ack, tags)
acc.Add("messages_delivered", overview.MessageStats.Deliver, tags)
acc.Add("messages_published", overview.MessageStats.Publish, tags)
errChan <- nil
}
func gatherNodes(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan chan error) {
nodes := make([]Node, 0)
// Gather information about nodes
err := r.requestJSON(serv, "/api/nodes", &nodes)
if err != nil {
errChan <- err
return
}
for _, node := range nodes {
if !shouldGatherNode(node, serv) {
continue
}
tags := map[string]string{"url": serv.URL}
tags["node"] = node.Name
acc.Add("disk_free", node.DiskFree, tags)
acc.Add("disk_free_limit", node.DiskFreeLimit, tags)
acc.Add("fd_total", node.FdTotal, tags)
acc.Add("fd_used", node.FdUsed, tags)
acc.Add("mem_limit", node.MemLimit, tags)
acc.Add("mem_used", node.MemUsed, tags)
acc.Add("proc_total", node.ProcTotal, tags)
acc.Add("proc_used", node.ProcUsed, tags)
acc.Add("run_queue", node.RunQueue, tags)
acc.Add("sockets_total", node.SocketsTotal, tags)
acc.Add("sockets_used", node.SocketsUsed, tags)
}
errChan <- nil
}
func gatherQueues(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan chan error) {
// Gather information about queues
queues := make([]Queue, 0)
err := r.requestJSON(serv, "/api/queues", &queues)
if err != nil {
errChan <- err
return
}
for _, queue := range queues {
if !shouldGatherQueue(queue, serv) {
continue
}
tags := map[string]string{
"url": serv.URL,
"queue": queue.Name,
"vhost": queue.Vhost,
"node": queue.Node,
"durable": strconv.FormatBool(queue.Durable),
"auto_delete": strconv.FormatBool(queue.AutoDelete),
}
acc.AddFields(
"queue",
map[string]interface{}{
// common information
"consumers": queue.Consumers,
"consumer_utilisation": queue.ConsumerUtilisation,
"memory": queue.Memory,
// messages information
"messages": queue.Messages,
"messages_ready": queue.MessagesReady,
"messages_unack": queue.MessagesUnacknowledged,
"messages_ack": queue.MessageStats.Ack,
"messages_ack_rate": queue.MessageStats.AckDetails.Rate,
"messages_deliver": queue.MessageStats.Deliver,
"messages_deliver_rate": queue.MessageStats.DeliverDetails.Rate,
"messages_deliver_get": queue.MessageStats.DeliverGet,
"messages_deliver_get_rate": queue.MessageStats.DeliverGetDetails.Rate,
"messages_publish": queue.MessageStats.Publish,
"messages_publish_rate": queue.MessageStats.PublishDetails.Rate,
"messages_redeliver": queue.MessageStats.Redeliver,
"messages_redeliver_rate": queue.MessageStats.RedeliverDetails.Rate,
},
tags,
)
}
errChan <- nil
}
func shouldGatherNode(node Node, serv *Server) bool {
if len(serv.Nodes) == 0 {
return true
}
for _, name := range serv.Nodes {
if name == node.Name {
return true
}
}
return false
}
func shouldGatherQueue(queue Queue, serv *Server) bool {
if len(serv.Queues) == 0 {
return true
}
for _, name := range serv.Queues {
if name == queue.Name {
return true
}
}
return false
}
func init() {
plugins.Add("rabbitmq", func() plugins.Plugin {
return &RabbitMQ{}