diff --git a/plugins/rabbitmq/rabbitmq.go b/plugins/rabbitmq/rabbitmq.go index 4fad0aa2e..8fe5437d3 100644 --- a/plugins/rabbitmq/rabbitmq.go +++ b/plugins/rabbitmq/rabbitmq.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "net/http" + "strconv" "github.com/influxdb/telegraf/plugins" ) @@ -18,6 +19,7 @@ type Server struct { Username string Password string Nodes []string + Queues []string } type RabbitMQ struct { @@ -32,10 +34,21 @@ type OverviewResponse struct { QueueTotals *QueueTotals `json:"queue_totals"` } +type Details struct { + Rate float64 +} + type MessageStats struct { - Ack int64 - Deliver int64 - Publish int64 + Ack int64 + AckDetails Details `json:"ack_details"` + Deliver int64 + DeliverDetails Details `json:"deliver_details"` + DeliverGet int64 + DeliverGetDetails Details `json:"deliver_get_details"` + Publish int64 + PublishDetails Details `json:"publish_details"` + Redeliver int64 + RedeliverDetails Details `json:"redeliver_details"` } type ObjectTotals struct { @@ -52,6 +65,19 @@ type QueueTotals struct { MessagesUnacknowledged int64 `json:"messages_unacknowledged"` } +type Queue struct { + QueueTotals // just to not repeat the same code + MessageStats `json:"message_stats"` + Memory int64 + Consumers int64 + ConsumerUtilisation float64 `json:"consumer_utilisation"` + Name string + Node string + Vhost string + Durable bool + AutoDelete bool `json:"auto_delete"` +} + type Node struct { Name string @@ -68,6 +94,10 @@ type Node struct { SocketsUsed int64 `json:"sockets_used"` } +type gatherFunc func(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan chan error) + +var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues} + var sampleConfig = ` # Specify servers via an array of tables [[rabbitmq.servers]] @@ -96,13 +126,21 @@ func (r *RabbitMQ) Gather(acc plugins.Accumulator) error { r.Client = &http.Client{} } + var errChan = make(chan error, len(r.Servers)) + + // use localhost is no servers are specified in config if len(r.Servers) == 0 { - r.gatherServer(localhost, acc) - return nil + r.Servers = append(r.Servers, localhost) } for _, serv := range r.Servers { - err := r.gatherServer(serv, acc) + for _, f := range gatherFunctions { + go f(r, serv, acc, errChan) + } + } + + for i := 1; i <= len(r.Servers)*len(gatherFunctions); i++ { + err := <-errChan if err != nil { return err } @@ -111,81 +149,6 @@ func (r *RabbitMQ) Gather(acc plugins.Accumulator) error { return nil } -func (r *RabbitMQ) gatherServer(serv *Server, acc plugins.Accumulator) error { - overview := &OverviewResponse{} - - err := r.requestJSON(serv, "/api/overview", &overview) - if err != nil { - return err - } - - if overview.QueueTotals == nil || overview.ObjectTotals == nil || overview.MessageStats == nil { - return fmt.Errorf("Wrong answer from rabbitmq. Probably auth issue") - } - - tags := map[string]string{"url": serv.URL} - if serv.Name != "" { - tags["name"] = serv.Name - } - - acc.Add("messages", overview.QueueTotals.Messages, tags) - acc.Add("messages_ready", overview.QueueTotals.MessagesReady, tags) - acc.Add("messages_unacked", overview.QueueTotals.MessagesUnacknowledged, tags) - - acc.Add("channels", overview.ObjectTotals.Channels, tags) - acc.Add("connections", overview.ObjectTotals.Connections, tags) - acc.Add("consumers", overview.ObjectTotals.Consumers, tags) - acc.Add("exchanges", overview.ObjectTotals.Exchanges, tags) - acc.Add("queues", overview.ObjectTotals.Queues, tags) - - acc.Add("messages_acked", overview.MessageStats.Ack, tags) - acc.Add("messages_delivered", overview.MessageStats.Deliver, tags) - acc.Add("messages_published", overview.MessageStats.Publish, tags) - - nodes := make([]Node, 0) - - err = r.requestJSON(serv, "/api/nodes", &nodes) - if err != nil { - return err - } - - for _, node := range nodes { - if !shouldGatherNode(node, serv) { - continue - } - - tags["node"] = node.Name - - acc.Add("disk_free", node.DiskFree, tags) - acc.Add("disk_free_limit", node.DiskFreeLimit, tags) - acc.Add("fd_total", node.FdTotal, tags) - acc.Add("fd_used", node.FdUsed, tags) - acc.Add("mem_limit", node.MemLimit, tags) - acc.Add("mem_used", node.MemUsed, tags) - acc.Add("proc_total", node.ProcTotal, tags) - acc.Add("proc_used", node.ProcUsed, tags) - acc.Add("run_queue", node.RunQueue, tags) - acc.Add("sockets_total", node.SocketsTotal, tags) - acc.Add("sockets_used", node.SocketsUsed, tags) - } - - return nil -} - -func shouldGatherNode(node Node, serv *Server) bool { - if len(serv.Nodes) == 0 { - return true - } - - for _, name := range serv.Nodes { - if name == node.Name { - return true - } - } - - return false -} - func (r *RabbitMQ) requestJSON(serv *Server, u string, target interface{}) error { u = fmt.Sprintf("%s%s", serv.URL, u) @@ -218,6 +181,154 @@ func (r *RabbitMQ) requestJSON(serv *Server, u string, target interface{}) error return nil } +func gatherOverview(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan chan error) { + overview := &OverviewResponse{} + + err := r.requestJSON(serv, "/api/overview", &overview) + if err != nil { + errChan <- err + return + } + + if overview.QueueTotals == nil || overview.ObjectTotals == nil || overview.MessageStats == nil { + errChan <- fmt.Errorf("Wrong answer from rabbitmq. Probably auth issue") + return + } + + tags := map[string]string{"url": serv.URL} + if serv.Name != "" { + tags["name"] = serv.Name + } + + acc.Add("messages", overview.QueueTotals.Messages, tags) + acc.Add("messages_ready", overview.QueueTotals.MessagesReady, tags) + acc.Add("messages_unacked", overview.QueueTotals.MessagesUnacknowledged, tags) + + acc.Add("channels", overview.ObjectTotals.Channels, tags) + acc.Add("connections", overview.ObjectTotals.Connections, tags) + acc.Add("consumers", overview.ObjectTotals.Consumers, tags) + acc.Add("exchanges", overview.ObjectTotals.Exchanges, tags) + acc.Add("queues", overview.ObjectTotals.Queues, tags) + + acc.Add("messages_acked", overview.MessageStats.Ack, tags) + acc.Add("messages_delivered", overview.MessageStats.Deliver, tags) + acc.Add("messages_published", overview.MessageStats.Publish, tags) + + errChan <- nil +} + +func gatherNodes(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan chan error) { + nodes := make([]Node, 0) + // Gather information about nodes + err := r.requestJSON(serv, "/api/nodes", &nodes) + if err != nil { + errChan <- err + return + } + + for _, node := range nodes { + if !shouldGatherNode(node, serv) { + continue + } + + tags := map[string]string{"url": serv.URL} + tags["node"] = node.Name + + acc.Add("disk_free", node.DiskFree, tags) + acc.Add("disk_free_limit", node.DiskFreeLimit, tags) + acc.Add("fd_total", node.FdTotal, tags) + acc.Add("fd_used", node.FdUsed, tags) + acc.Add("mem_limit", node.MemLimit, tags) + acc.Add("mem_used", node.MemUsed, tags) + acc.Add("proc_total", node.ProcTotal, tags) + acc.Add("proc_used", node.ProcUsed, tags) + acc.Add("run_queue", node.RunQueue, tags) + acc.Add("sockets_total", node.SocketsTotal, tags) + acc.Add("sockets_used", node.SocketsUsed, tags) + } + + errChan <- nil +} + +func gatherQueues(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan chan error) { + // Gather information about queues + queues := make([]Queue, 0) + err := r.requestJSON(serv, "/api/queues", &queues) + if err != nil { + errChan <- err + return + } + + for _, queue := range queues { + if !shouldGatherQueue(queue, serv) { + continue + } + tags := map[string]string{ + "url": serv.URL, + "queue": queue.Name, + "vhost": queue.Vhost, + "node": queue.Node, + "durable": strconv.FormatBool(queue.Durable), + "auto_delete": strconv.FormatBool(queue.AutoDelete), + } + + acc.AddFields( + "queue", + map[string]interface{}{ + // common information + "consumers": queue.Consumers, + "consumer_utilisation": queue.ConsumerUtilisation, + "memory": queue.Memory, + // messages information + "messages": queue.Messages, + "messages_ready": queue.MessagesReady, + "messages_unack": queue.MessagesUnacknowledged, + "messages_ack": queue.MessageStats.Ack, + "messages_ack_rate": queue.MessageStats.AckDetails.Rate, + "messages_deliver": queue.MessageStats.Deliver, + "messages_deliver_rate": queue.MessageStats.DeliverDetails.Rate, + "messages_deliver_get": queue.MessageStats.DeliverGet, + "messages_deliver_get_rate": queue.MessageStats.DeliverGetDetails.Rate, + "messages_publish": queue.MessageStats.Publish, + "messages_publish_rate": queue.MessageStats.PublishDetails.Rate, + "messages_redeliver": queue.MessageStats.Redeliver, + "messages_redeliver_rate": queue.MessageStats.RedeliverDetails.Rate, + }, + tags, + ) + } + + errChan <- nil +} + +func shouldGatherNode(node Node, serv *Server) bool { + if len(serv.Nodes) == 0 { + return true + } + + for _, name := range serv.Nodes { + if name == node.Name { + return true + } + } + + return false +} + +func shouldGatherQueue(queue Queue, serv *Server) bool { + if len(serv.Queues) == 0 { + return true + } + + for _, name := range serv.Queues { + if name == queue.Name { + return true + } + } + + return false +} + func init() { plugins.Add("rabbitmq", func() plugins.Plugin { return &RabbitMQ{} diff --git a/plugins/rabbitmq/rabbitmq_test.go b/plugins/rabbitmq/rabbitmq_test.go index 689eb71cf..38bfb7a7d 100644 --- a/plugins/rabbitmq/rabbitmq_test.go +++ b/plugins/rabbitmq/rabbitmq_test.go @@ -13,7 +13,7 @@ import ( const sampleOverviewResponse = ` { - "message_stats": { + "message_stats": { "ack": 5246, "ack_details": { "rate": 0.0 @@ -132,16 +132,260 @@ const sampleNodesResponse = ` } ] ` +const sampleQueuesResponse = ` +[ + { + "memory": 21960, + "messages": 0, + "messages_details": { + "rate": 0 + }, + "messages_ready": 0, + "messages_ready_details": { + "rate": 0 + }, + "messages_unacknowledged": 0, + "messages_unacknowledged_details": { + "rate": 0 + }, + "idle_since": "2015-11-01 8:22:15", + "consumer_utilisation": "", + "policy": "federator", + "exclusive_consumer_tag": "", + "consumers": 0, + "recoverable_slaves": "", + "state": "running", + "messages_ram": 0, + "messages_ready_ram": 0, + "messages_unacknowledged_ram": 0, + "messages_persistent": 0, + "message_bytes": 0, + "message_bytes_ready": 0, + "message_bytes_unacknowledged": 0, + "message_bytes_ram": 0, + "message_bytes_persistent": 0, + "disk_reads": 0, + "disk_writes": 0, + "backing_queue_status": { + "q1": 0, + "q2": 0, + "delta": [ + "delta", + "undefined", + 0, + "undefined" + ], + "q3": 0, + "q4": 0, + "len": 0, + "target_ram_count": "infinity", + "next_seq_id": 0, + "avg_ingress_rate": 0, + "avg_egress_rate": 0, + "avg_ack_ingress_rate": 0, + "avg_ack_egress_rate": 0 + }, + "name": "collectd-queue", + "vhost": "collectd", + "durable": true, + "auto_delete": false, + "arguments": {}, + "node": "rabbit@testhost" + }, + { + "memory": 55528, + "message_stats": { + "ack": 223654927, + "ack_details": { + "rate": 0 + }, + "deliver": 224518745, + "deliver_details": { + "rate": 0 + }, + "deliver_get": 224518829, + "deliver_get_details": { + "rate": 0 + }, + "get": 19, + "get_details": { + "rate": 0 + }, + "get_no_ack": 65, + "get_no_ack_details": { + "rate": 0 + }, + "publish": 223883765, + "publish_details": { + "rate": 0 + }, + "redeliver": 863805, + "redeliver_details": { + "rate": 0 + } + }, + "messages": 24, + "messages_details": { + "rate": 0 + }, + "messages_ready": 24, + "messages_ready_details": { + "rate": 0 + }, + "messages_unacknowledged": 0, + "messages_unacknowledged_details": { + "rate": 0 + }, + "idle_since": "2015-11-01 8:22:14", + "consumer_utilisation": "", + "policy": "", + "exclusive_consumer_tag": "", + "consumers": 0, + "recoverable_slaves": "", + "state": "running", + "messages_ram": 24, + "messages_ready_ram": 24, + "messages_unacknowledged_ram": 0, + "messages_persistent": 0, + "message_bytes": 149220, + "message_bytes_ready": 149220, + "message_bytes_unacknowledged": 0, + "message_bytes_ram": 149220, + "message_bytes_persistent": 0, + "disk_reads": 0, + "disk_writes": 0, + "backing_queue_status": { + "q1": 0, + "q2": 0, + "delta": [ + "delta", + "undefined", + 0, + "undefined" + ], + "q3": 0, + "q4": 24, + "len": 24, + "target_ram_count": "infinity", + "next_seq_id": 223883765, + "avg_ingress_rate": 0, + "avg_egress_rate": 0, + "avg_ack_ingress_rate": 0, + "avg_ack_egress_rate": 0 + }, + "name": "telegraf", + "vhost": "collectd", + "durable": true, + "auto_delete": false, + "arguments": {}, + "node": "rabbit@testhost" + }, + { + "message_stats": { + "ack": 1296077, + "ack_details": { + "rate": 0 + }, + "deliver": 1513176, + "deliver_details": { + "rate": 0.4 + }, + "deliver_get": 1513239, + "deliver_get_details": { + "rate": 0.4 + }, + "disk_writes": 7976, + "disk_writes_details": { + "rate": 0 + }, + "get": 40, + "get_details": { + "rate": 0 + }, + "get_no_ack": 23, + "get_no_ack_details": { + "rate": 0 + }, + "publish": 1325628, + "publish_details": { + "rate": 0.4 + }, + "redeliver": 216034, + "redeliver_details": { + "rate": 0 + } + }, + "messages": 5, + "messages_details": { + "rate": 0.4 + }, + "messages_ready": 0, + "messages_ready_details": { + "rate": 0 + }, + "messages_unacknowledged": 5, + "messages_unacknowledged_details": { + "rate": 0.4 + }, + "policy": "federator", + "exclusive_consumer_tag": "", + "consumers": 1, + "consumer_utilisation": 1, + "memory": 122856, + "recoverable_slaves": "", + "state": "running", + "messages_ram": 5, + "messages_ready_ram": 0, + "messages_unacknowledged_ram": 5, + "messages_persistent": 0, + "message_bytes": 150096, + "message_bytes_ready": 0, + "message_bytes_unacknowledged": 150096, + "message_bytes_ram": 150096, + "message_bytes_persistent": 0, + "disk_reads": 0, + "disk_writes": 7976, + "backing_queue_status": { + "q1": 0, + "q2": 0, + "delta": [ + "delta", + "undefined", + 0, + "undefined" + ], + "q3": 0, + "q4": 0, + "len": 0, + "target_ram_count": "infinity", + "next_seq_id": 1325628, + "avg_ingress_rate": 0.19115840579934168, + "avg_egress_rate": 0.19115840579934168, + "avg_ack_ingress_rate": 0.19115840579934168, + "avg_ack_egress_rate": 0.1492766485341716 + }, + "name": "telegraf", + "vhost": "metrics", + "durable": true, + "auto_delete": false, + "arguments": {}, + "node": "rabbit@testhost" + } +] +` func TestRabbitMQGeneratesMetrics(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { var rsp string - if r.URL.Path == "/api/overview" { + switch r.URL.Path { + case "/api/overview": rsp = sampleOverviewResponse - } else if r.URL.Path == "/api/nodes" { + case "/api/nodes": rsp = sampleNodesResponse - } else { + case "/api/queues": + rsp = sampleQueuesResponse + default: panic("Cannot handle request") } @@ -199,4 +443,6 @@ func TestRabbitMQGeneratesMetrics(t *testing.T) { for _, metric := range nodeIntMetrics { assert.True(t, acc.HasIntValue(metric)) } + + assert.True(t, acc.HasMeasurement("queue")) }