diff --git a/plugins/all/all.go b/plugins/all/all.go index 1a46d5054..ff2e807a8 100644 --- a/plugins/all/all.go +++ b/plugins/all/all.go @@ -6,6 +6,7 @@ import ( _ "github.com/influxdb/telegraf/plugins/mysql" _ "github.com/influxdb/telegraf/plugins/postgresql" _ "github.com/influxdb/telegraf/plugins/prometheus" + _ "github.com/influxdb/telegraf/plugins/rabbitmq" _ "github.com/influxdb/telegraf/plugins/redis" _ "github.com/influxdb/telegraf/plugins/system" ) diff --git a/plugins/rabbitmq/rabbitmq.go b/plugins/rabbitmq/rabbitmq.go new file mode 100644 index 000000000..cd5ec6dc2 --- /dev/null +++ b/plugins/rabbitmq/rabbitmq.go @@ -0,0 +1,218 @@ +package rabbitmq + +import ( + "encoding/json" + "fmt" + "net/http" + + "github.com/influxdb/telegraf/plugins" +) + +const DefaultUsername = "guest" +const DefaultPassword = "guest" +const DefaultURL = "http://localhost:15672" + +type Server struct { + URL string + Username string + Password string + Nodes []string +} + +type RabbitMQ struct { + Servers []*Server + + Client *http.Client +} + +type OverviewResponse struct { + MessageStats *MessageStats `json:"message_stats"` + ObjectTotals *ObjectTotals `json:"object_totals"` + QueueTotals *QueueTotals `json:"queue_totals"` +} + +type MessageStats struct { + Ack int64 + Deliver int64 + Publish int64 +} + +type ObjectTotals struct { + Channels int64 + Connections int64 + Consumers int64 + Exchanges int64 + Queues int64 +} + +type QueueTotals struct { + Messages int64 + MessagesReady int64 `json:"messages_ready"` + MessagesUnacknowledged int64 `json:"messages_unacknowledged"` +} + +type Node struct { + Name string + + DiskFree int64 `json:"disk_free"` + DiskFreeLimit int64 `json:"disk_free_limit"` + FdTotal int64 `json:"fd_total"` + FdUsed int64 `json:"fd_used"` + MemLimit int64 `json:"mem_limit"` + MemUsed int64 `json:"mem_used"` + ProcTotal int64 `json:"proc_total"` + ProcUsed int64 `json:"proc_used"` + RunQueue int64 `json:"run_queue"` + SocketsTotal int64 `json:"sockets_total"` + SocketsUsed int64 `json:"sockets_used"` +} + +var sampleConfig = ` +# Specify servers via an array of tables +[[rabbitmq.servers]] +# url = "http://localhost:15672" +# username = "guest" +# password = "guest" + +# A list of nodes to pull metrics about. If not specified, metrics for +# all nodes are gathered. +# nodes = ["rabbit@node1", "rabbit@node2"] +` + +func (r *RabbitMQ) SampleConfig() string { + return sampleConfig +} + +func (r *RabbitMQ) Description() string { + return "Read metrics from one or many RabbitMQ servers via the management API" +} + +var localhost = &Server{URL: DefaultURL} + +func (r *RabbitMQ) Gather(acc plugins.Accumulator) error { + if r.Client == nil { + r.Client = &http.Client{} + } + + if len(r.Servers) == 0 { + r.gatherServer(localhost, acc) + return nil + } + + for _, serv := range r.Servers { + err := r.gatherServer(serv, acc) + if err != nil { + return err + } + } + + return nil +} + +func (r *RabbitMQ) gatherServer(serv *Server, acc plugins.Accumulator) error { + overview := &OverviewResponse{} + + err := r.requestJSON(serv, "/api/overview", &overview) + if err != nil { + return err + } + + tags := map[string]string{} + + acc.Add("messages", overview.QueueTotals.Messages, tags) + acc.Add("messages_ready", overview.QueueTotals.MessagesReady, tags) + acc.Add("messages_unacked", overview.QueueTotals.MessagesUnacknowledged, tags) + + acc.Add("channels", overview.ObjectTotals.Channels, tags) + acc.Add("connections", overview.ObjectTotals.Connections, tags) + acc.Add("consumers", overview.ObjectTotals.Consumers, tags) + acc.Add("exchanges", overview.ObjectTotals.Exchanges, tags) + acc.Add("queues", overview.ObjectTotals.Queues, tags) + + if overview.MessageStats != nil { + acc.Add("messages_acked", overview.MessageStats.Ack, tags) + acc.Add("messages_delivered", overview.MessageStats.Deliver, tags) + acc.Add("messages_published", overview.MessageStats.Publish, tags) + } + + nodes := make([]Node, 0) + + err = r.requestJSON(serv, "/api/nodes", &nodes) + if err != nil { + return err + } + + for _, node := range nodes { + if !shouldGatherNode(node, serv) { + continue + } + + tags = map[string]string{"node": node.Name} + + acc.Add("disk_free", node.DiskFree, tags) + acc.Add("disk_free_limit", node.DiskFreeLimit, tags) + acc.Add("fd_total", node.FdTotal, tags) + acc.Add("fd_used", node.FdUsed, tags) + acc.Add("mem_limit", node.MemLimit, tags) + acc.Add("mem_used", node.MemUsed, tags) + acc.Add("proc_total", node.ProcTotal, tags) + acc.Add("proc_used", node.ProcUsed, tags) + acc.Add("run_queue", node.RunQueue, tags) + acc.Add("sockets_total", node.SocketsTotal, tags) + acc.Add("sockets_used", node.SocketsUsed, tags) + } + + return nil +} + +func shouldGatherNode(node Node, serv *Server) bool { + if len(serv.Nodes) == 0 { + return true + } + + for _, name := range serv.Nodes { + if name == node.Name { + return true + } + } + + return false +} + +func (r *RabbitMQ) requestJSON(serv *Server, u string, target interface{}) error { + u = fmt.Sprintf("%s%s", serv.URL, u) + + req, err := http.NewRequest("GET", u, nil) + if err != nil { + return err + } + + username := serv.Username + if username == "" { + username = DefaultUsername + } + + password := serv.Password + if password == "" { + password = DefaultPassword + } + + req.SetBasicAuth(username, password) + + resp, err := r.Client.Do(req) + if err != nil { + return err + } + + defer resp.Body.Close() + + json.NewDecoder(resp.Body).Decode(target) + + return nil +} + +func init() { + plugins.Add("rabbitmq", func() plugins.Plugin { + return &RabbitMQ{} + }) +} diff --git a/plugins/rabbitmq/rabbitmq_test.go b/plugins/rabbitmq/rabbitmq_test.go new file mode 100644 index 000000000..689eb71cf --- /dev/null +++ b/plugins/rabbitmq/rabbitmq_test.go @@ -0,0 +1,202 @@ +package rabbitmq + +import ( + "fmt" + "net/http" + "net/http/httptest" + "testing" + + "github.com/influxdb/telegraf/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const sampleOverviewResponse = ` +{ + "message_stats": { + "ack": 5246, + "ack_details": { + "rate": 0.0 + }, + "deliver": 5246, + "deliver_details": { + "rate": 0.0 + }, + "deliver_get": 5246, + "deliver_get_details": { + "rate": 0.0 + }, + "publish": 5258, + "publish_details": { + "rate": 0.0 + } + }, + "object_totals": { + "channels": 44, + "connections": 44, + "consumers": 65, + "exchanges": 43, + "queues": 62 + }, + "queue_totals": { + "messages": 0, + "messages_details": { + "rate": 0.0 + }, + "messages_ready": 0, + "messages_ready_details": { + "rate": 0.0 + }, + "messages_unacknowledged": 0, + "messages_unacknowledged_details": { + "rate": 0.0 + } + } +} +` + +const sampleNodesResponse = ` +[ + { + "db_dir": "/var/lib/rabbitmq/mnesia/rabbit@vagrant-ubuntu-trusty-64", + "disk_free": 37768282112, + "disk_free_alarm": false, + "disk_free_details": { + "rate": 0.0 + }, + "disk_free_limit": 50000000, + "enabled_plugins": [ + "rabbitmq_management" + ], + "fd_total": 1024, + "fd_used": 63, + "fd_used_details": { + "rate": 0.0 + }, + "io_read_avg_time": 0, + "io_read_avg_time_details": { + "rate": 0.0 + }, + "io_read_bytes": 1, + "io_read_bytes_details": { + "rate": 0.0 + }, + "io_read_count": 1, + "io_read_count_details": { + "rate": 0.0 + }, + "io_sync_avg_time": 0, + "io_sync_avg_time_details": { + "rate": 0.0 + }, + "io_write_avg_time": 0, + "io_write_avg_time_details": { + "rate": 0.0 + }, + "log_file": "/var/log/rabbitmq/rabbit@vagrant-ubuntu-trusty-64.log", + "mem_alarm": false, + "mem_limit": 2503771750, + "mem_used": 159707080, + "mem_used_details": { + "rate": 15185.6 + }, + "mnesia_disk_tx_count": 16, + "mnesia_disk_tx_count_details": { + "rate": 0.0 + }, + "mnesia_ram_tx_count": 296, + "mnesia_ram_tx_count_details": { + "rate": 0.0 + }, + "name": "rabbit@vagrant-ubuntu-trusty-64", + "net_ticktime": 60, + "os_pid": "14244", + "partitions": [], + "proc_total": 1048576, + "proc_used": 783, + "proc_used_details": { + "rate": 0.0 + }, + "processors": 1, + "rates_mode": "basic", + "run_queue": 0, + "running": true, + "sasl_log_file": "/var/log/rabbitmq/rabbit@vagrant-ubuntu-trusty-64-sasl.log", + "sockets_total": 829, + "sockets_used": 45, + "sockets_used_details": { + "rate": 0.0 + }, + "type": "disc", + "uptime": 7464827 + } +] +` + +func TestRabbitMQGeneratesMetrics(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var rsp string + + if r.URL.Path == "/api/overview" { + rsp = sampleOverviewResponse + } else if r.URL.Path == "/api/nodes" { + rsp = sampleNodesResponse + } else { + panic("Cannot handle request") + } + + fmt.Fprintln(w, rsp) + })) + defer ts.Close() + + r := &RabbitMQ{ + Servers: []*Server{ + { + URL: ts.URL, + }, + }, + } + + var acc testutil.Accumulator + + err := r.Gather(&acc) + require.NoError(t, err) + + intMetrics := []string{ + "messages", + "messages_ready", + "messages_unacked", + + "messages_acked", + "messages_delivered", + "messages_published", + + "channels", + "connections", + "consumers", + "exchanges", + "queues", + } + + for _, metric := range intMetrics { + assert.True(t, acc.HasIntValue(metric)) + } + + nodeIntMetrics := []string{ + "disk_free", + "disk_free_limit", + "fd_total", + "fd_used", + "mem_limit", + "mem_used", + "proc_total", + "proc_used", + "run_queue", + "sockets_total", + "sockets_used", + } + + for _, metric := range nodeIntMetrics { + assert.True(t, acc.HasIntValue(metric)) + } +}