From 6c4032071f275a9faca3f2150998c0f039c3006a Mon Sep 17 00:00:00 2001 From: Vitalii Solodilov Date: Tue, 19 Jun 2018 11:19:23 +0400 Subject: [PATCH] Improvement of RabbitMQ plugin #3025 #3252 * new metrics: * unroutable messages * node uptime * gc metrics * mnesia metrics * node healthcheck * IO metrics * refactoring tests: * moved the json examples to a separate files * check metric values Signed-off-by: Vitalii Solodilov --- plugins/inputs/rabbitmq/README.md | 25 + plugins/inputs/rabbitmq/rabbitmq.go | 209 ++++-- plugins/inputs/rabbitmq/rabbitmq_test.go | 638 ++++-------------- .../inputs/rabbitmq/testdata/exchanges.json | 22 + .../rabbitmq/testdata/healthchecks.json | 1 + plugins/inputs/rabbitmq/testdata/nodes.json | 87 +++ .../inputs/rabbitmq/testdata/overview.json | 63 ++ plugins/inputs/rabbitmq/testdata/queues.json | 114 ++++ 8 files changed, 579 insertions(+), 580 deletions(-) create mode 100644 plugins/inputs/rabbitmq/testdata/exchanges.json create mode 100644 plugins/inputs/rabbitmq/testdata/healthchecks.json create mode 100644 plugins/inputs/rabbitmq/testdata/nodes.json create mode 100644 plugins/inputs/rabbitmq/testdata/overview.json create mode 100644 plugins/inputs/rabbitmq/testdata/queues.json diff --git a/plugins/inputs/rabbitmq/README.md b/plugins/inputs/rabbitmq/README.md index ae6dac6f1..0406df700 100644 --- a/plugins/inputs/rabbitmq/README.md +++ b/plugins/inputs/rabbitmq/README.md @@ -68,20 +68,42 @@ For additional details reference the [RabbitMQ Management HTTP Stats](https://cd - queues (int, queues) - clustering_listeners (int, cluster nodes) - amqp_listeners (int, amqp nodes up) + - return_unroutable (int, number of unroutable messages) + - return_unroutable_rate (float, number of unroutable messages per second) - rabbitmq_node - disk_free (int, bytes) - disk_free_limit (int, bytes) + - disk_free_alarm (int, disk alarm) - fd_total (int, file descriptors) - fd_used (int, file descriptors) - mem_limit (int, bytes) - mem_used (int, bytes) + - mem_alarm (int, memory a) - proc_total (int, erlang processes) - proc_used (int, erlang processes) - run_queue (int, erlang processes) - sockets_total (int, sockets) - sockets_used (int, sockets) - running (int, node up) + - uptime (int, milliseconds) + - health_check_status (int, 1 or 0) + - mnesia_disk_tx_count (int, number of disk transaction) + - mnesia_ram_tx_count (int, number of ram transaction) + - mnesia_disk_tx_count_rate (float, number of disk transaction per second) + - mnesia_ram_tx_count_rate (float, number of ram transaction per second) + - gc_num (int, number of garbage collection) + - gc_bytes_reclaimed (int, bytes) + - gc_num_rate (float, number of garbage collection per second) + - gc_bytes_reclaimed_rate (float, bytes per second) + - io_read_avg_time (float, number of read operations) + - io_read_avg_time_rate (int, number of read operations per second) + - io_read_bytes (int, bytes) + - io_read_bytes_rate (float, bytes per second) + - io_write_avg_time (int, milliseconds) + - io_write_avg_time_rate (float, milliseconds per second) + - io_write_bytes (int, bytes) + - io_write_bytes_rate (float, bytes per second) - rabbitmq_queue - consumer_utilisation (float, percent) @@ -109,7 +131,9 @@ For additional details reference the [RabbitMQ Management HTTP Stats](https://cd - rabbitmq_exchange - messages_publish_in (int, count) + - messages_publish_in_rate (int, messages per second) - messages_publish_out (int, count) + - messages_publish_out_rate (int, messages per second) ### Tags: @@ -121,6 +145,7 @@ For additional details reference the [RabbitMQ Management HTTP Stats](https://cd - rabbitmq_node - node + - url - rabbitmq_queue - url diff --git a/plugins/inputs/rabbitmq/rabbitmq.go b/plugins/inputs/rabbitmq/rabbitmq.go index 49dabe1b5..4e7e918da 100644 --- a/plugins/inputs/rabbitmq/rabbitmq.go +++ b/plugins/inputs/rabbitmq/rabbitmq.go @@ -72,23 +72,27 @@ type Listeners struct { // Details ... type Details struct { - Rate float64 + Rate float64 `json:"rate"` } // MessageStats ... type MessageStats struct { - Ack int64 - AckDetails Details `json:"ack_details"` - Deliver int64 - DeliverDetails Details `json:"deliver_details"` - DeliverGet int64 `json:"deliver_get"` - DeliverGetDetails Details `json:"deliver_get_details"` - Publish int64 - PublishDetails Details `json:"publish_details"` - Redeliver int64 - RedeliverDetails Details `json:"redeliver_details"` - PublishIn int64 `json:"publish_in"` - PublishOut int64 `json:"publish_out"` + Ack int64 + AckDetails Details `json:"ack_details"` + Deliver int64 + DeliverDetails Details `json:"deliver_details"` + DeliverGet int64 `json:"deliver_get"` + DeliverGetDetails Details `json:"deliver_get_details"` + Publish int64 + PublishDetails Details `json:"publish_details"` + Redeliver int64 + RedeliverDetails Details `json:"redeliver_details"` + PublishIn int64 `json:"publish_in"` + PublishInDetails Details `json:"publish_in_details"` + PublishOut int64 `json:"publish_out"` + PublishOutDetails Details `json:"publish_out_details"` + ReturnUnroutable int64 `json:"return_unroutable"` + ReturnUnroutableDetails Details `json:"return_unroutable_details"` } // ObjectTotals ... @@ -131,18 +135,37 @@ type Queue struct { type Node struct { Name string - DiskFree int64 `json:"disk_free"` - DiskFreeLimit int64 `json:"disk_free_limit"` - FdTotal int64 `json:"fd_total"` - FdUsed int64 `json:"fd_used"` - MemLimit int64 `json:"mem_limit"` - MemUsed int64 `json:"mem_used"` - ProcTotal int64 `json:"proc_total"` - ProcUsed int64 `json:"proc_used"` - RunQueue int64 `json:"run_queue"` - SocketsTotal int64 `json:"sockets_total"` - SocketsUsed int64 `json:"sockets_used"` - Running bool `json:"running"` + DiskFree int64 `json:"disk_free"` + DiskFreeLimit int64 `json:"disk_free_limit"` + DiskFreeAlarm bool `json:"disk_free_alarm"` + FdTotal int64 `json:"fd_total"` + FdUsed int64 `json:"fd_used"` + MemLimit int64 `json:"mem_limit"` + MemUsed int64 `json:"mem_used"` + MemAlarm bool `json:"mem_alarm"` + ProcTotal int64 `json:"proc_total"` + ProcUsed int64 `json:"proc_used"` + RunQueue int64 `json:"run_queue"` + SocketsTotal int64 `json:"sockets_total"` + SocketsUsed int64 `json:"sockets_used"` + Running bool `json:"running"` + Uptime int64 `json:"uptime"` + MnesiaDiskTxCount int64 `json:"mnesia_disk_tx_count"` + MnesiaDiskTxCountDetails Details `json:"mnesia_disk_tx_count_details"` + MnesiaRamTxCount int64 `json:"mnesia_ram_tx_count"` + MnesiaRamTxCountDetails Details `json:"mnesia_ram_tx_count_details"` + GcNum int64 `json:"gc_num"` + GcNumDetails Details `json:"gc_num_details"` + GcBytesReclaimed int64 `json:"gc_bytes_reclaimed"` + GcBytesReclaimedDetails Details `json:"gc_bytes_reclaimed_details"` + IoReadAvgTime int64 `json:"io_read_avg_time"` + IoReadAvgTimeDetails Details `json:"io_read_avg_time_details"` + IoReadBytes int64 `json:"io_read_bytes"` + IoReadBytesDetails Details `json:"io_read_bytes_details"` + IoWriteAvgTime int64 `json:"io_write_avg_time"` + IoWriteAvgTimeDetails Details `json:"io_write_avg_time_details"` + IoWriteBytes int64 `json:"io_write_bytes"` + IoWriteBytesDetails Details `json:"io_write_bytes_details"` } type Exchange struct { @@ -155,6 +178,10 @@ type Exchange struct { AutoDelete bool `json:"auto_delete"` } +type HealthCheck struct { + Status string `json:"status"` +} + // gatherFunc ... type gatherFunc func(r *RabbitMQ, acc telegraf.Accumulator) @@ -204,6 +231,13 @@ var sampleConfig = ` queue_name_exclude = [] ` +func boolToInt(b bool) int64 { + if b { + return 1 + } + return 0 +} + // SampleConfig ... func (r *RabbitMQ) SampleConfig() string { return sampleConfig @@ -302,12 +336,12 @@ func gatherOverview(r *RabbitMQ, acc telegraf.Accumulator) { return } - var clustering_listeners, amqp_listeners int64 = 0, 0 + var clusteringListeners, amqpListeners int64 = 0, 0 for _, listener := range overview.Listeners { if listener.Protocol == "clustering" { - clustering_listeners++ + clusteringListeners++ } else if listener.Protocol == "amqp" { - amqp_listeners++ + amqpListeners++ } } @@ -328,48 +362,109 @@ func gatherOverview(r *RabbitMQ, acc telegraf.Accumulator) { "messages_delivered": overview.MessageStats.Deliver, "messages_delivered_get": overview.MessageStats.DeliverGet, "messages_published": overview.MessageStats.Publish, - "clustering_listeners": clustering_listeners, - "amqp_listeners": amqp_listeners, + "clustering_listeners": clusteringListeners, + "amqp_listeners": amqpListeners, + "return_unroutable": overview.MessageStats.ReturnUnroutable, + "return_unroutable_rate": overview.MessageStats.ReturnUnroutableDetails.Rate, } acc.AddFields("rabbitmq_overview", fields, tags) } func gatherNodes(r *RabbitMQ, acc telegraf.Accumulator) { - nodes := make([]Node, 0) + allNodes := make([]Node, 0) // Gather information about nodes - err := r.requestJSON("/api/nodes", &nodes) + err := r.requestJSON("/api/nodes", &allNodes) if err != nil { acc.AddError(err) return } - now := time.Now() + + nodes := make(map[string]Node) + for _, node := range allNodes { + if r.shouldGatherNode(node) { + nodes[node.Name] = node + } + } + + numberNodes := len(nodes) + if numberNodes == 0 { + return + } + + type NodeHealthCheck struct { + NodeName string + HealthCheck HealthCheck + Error error + } + + healthChecksChannel := make(chan NodeHealthCheck, numberNodes) for _, node := range nodes { - if !r.shouldGatherNode(node) { - continue + go func(nodeName string, healthChecksChannel chan NodeHealthCheck) { + var healthCheck HealthCheck + + err := r.requestJSON("/api/healthchecks/node/"+nodeName, &healthCheck) + nodeHealthCheck := NodeHealthCheck{ + NodeName: nodeName, + Error: err, + HealthCheck: healthCheck, + } + + healthChecksChannel <- nodeHealthCheck + }(node.Name, healthChecksChannel) + } + + now := time.Now() + + for i := 0; i < len(nodes); i++ { + nodeHealthCheck := <-healthChecksChannel + + var healthCheckStatus int64 = 0 + + if nodeHealthCheck.Error != nil { + acc.AddError(nodeHealthCheck.Error) + } else if nodeHealthCheck.HealthCheck.Status == "ok" { + healthCheckStatus = 1 } + node := nodes[nodeHealthCheck.NodeName] + tags := map[string]string{"url": r.URL} tags["node"] = node.Name - var running int64 = 0 - if node.Running { - running = 1 - } - fields := map[string]interface{}{ - "disk_free": node.DiskFree, - "disk_free_limit": node.DiskFreeLimit, - "fd_total": node.FdTotal, - "fd_used": node.FdUsed, - "mem_limit": node.MemLimit, - "mem_used": node.MemUsed, - "proc_total": node.ProcTotal, - "proc_used": node.ProcUsed, - "run_queue": node.RunQueue, - "sockets_total": node.SocketsTotal, - "sockets_used": node.SocketsUsed, - "running": running, + "disk_free": node.DiskFree, + "disk_free_limit": node.DiskFreeLimit, + "disk_free_alarm": boolToInt(node.DiskFreeAlarm), + "fd_total": node.FdTotal, + "fd_used": node.FdUsed, + "mem_limit": node.MemLimit, + "mem_used": node.MemUsed, + "mem_alarm": boolToInt(node.MemAlarm), + "proc_total": node.ProcTotal, + "proc_used": node.ProcUsed, + "run_queue": node.RunQueue, + "sockets_total": node.SocketsTotal, + "sockets_used": node.SocketsUsed, + "uptime": node.Uptime, + "mnesia_disk_tx_count": node.MnesiaDiskTxCount, + "mnesia_disk_tx_count_rate": node.MnesiaDiskTxCountDetails.Rate, + "mnesia_ram_tx_count": node.MnesiaRamTxCount, + "mnesia_ram_tx_count_rate": node.MnesiaRamTxCountDetails.Rate, + "gc_num": node.GcNum, + "gc_num_rate": node.GcNumDetails.Rate, + "gc_bytes_reclaimed": node.GcBytesReclaimed, + "gc_bytes_reclaimed_rate": node.GcBytesReclaimedDetails.Rate, + "io_read_avg_time": node.IoReadAvgTime, + "io_read_avg_time_rate": node.IoReadAvgTimeDetails.Rate, + "io_read_bytes": node.IoReadBytes, + "io_read_bytes_rate": node.IoReadBytesDetails.Rate, + "io_write_avg_time": node.IoWriteAvgTime, + "io_write_avg_time_rate": node.IoWriteAvgTimeDetails.Rate, + "io_write_bytes": node.IoWriteBytes, + "io_write_bytes_rate": node.IoWriteBytesDetails.Rate, + "running": boolToInt(node.Running), + "health_check_status": healthCheckStatus, } acc.AddFields("rabbitmq_node", fields, tags, now) } @@ -459,8 +554,10 @@ func gatherExchanges(r *RabbitMQ, acc telegraf.Accumulator) { acc.AddFields( "rabbitmq_exchange", map[string]interface{}{ - "messages_publish_in": exchange.MessageStats.PublishIn, - "messages_publish_out": exchange.MessageStats.PublishOut, + "messages_publish_in": exchange.MessageStats.PublishIn, + "messages_publish_in_rate": exchange.MessageStats.PublishInDetails.Rate, + "messages_publish_out": exchange.MessageStats.PublishOut, + "messages_publish_out_rate": exchange.MessageStats.PublishOutDetails.Rate, }, tags, ) @@ -487,11 +584,11 @@ func (r *RabbitMQ) createQueueFilter() error { r.QueueInclude = append(r.QueueInclude, r.Queues...) } - filter, err := filter.NewIncludeExcludeFilter(r.QueueInclude, r.QueueExclude) + queueFilter, err := filter.NewIncludeExcludeFilter(r.QueueInclude, r.QueueExclude) if err != nil { return err } - r.queueFilter = filter + r.queueFilter = queueFilter for _, q := range r.QueueExclude { if q == "*" { diff --git a/plugins/inputs/rabbitmq/rabbitmq_test.go b/plugins/inputs/rabbitmq/rabbitmq_test.go index 5e9829cc0..0f98f95ce 100644 --- a/plugins/inputs/rabbitmq/rabbitmq_test.go +++ b/plugins/inputs/rabbitmq/rabbitmq_test.go @@ -9,503 +9,35 @@ import ( "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "io/ioutil" ) -const sampleOverviewResponse = ` -{ - "message_stats": { - "ack": 5246, - "ack_details": { - "rate": 0.0 - }, - "deliver": 5246, - "deliver_details": { - "rate": 0.0 - }, - "deliver_get": 5246, - "deliver_get_details": { - "rate": 0.0 - }, - "publish": 5258, - "publish_details": { - "rate": 0.0 - } - }, - "object_totals": { - "channels": 44, - "connections": 44, - "consumers": 65, - "exchanges": 43, - "queues": 62 - }, - "queue_totals": { - "messages": 0, - "messages_details": { - "rate": 0.0 - }, - "messages_ready": 0, - "messages_ready_details": { - "rate": 0.0 - }, - "messages_unacknowledged": 0, - "messages_unacknowledged_details": { - "rate": 0.0 - } - }, - "listeners": [ - { - "name": "rabbit@node-a", - "protocol": "amqp" - }, - { - "name": "rabbit@node-b", - "protocol": "amqp" - }, - { - "name": "rabbit@node-a", - "protocol": "clustering" - }, - { - "name": "rabbit@node-b", - "protocol": "clustering" - } - ] -} -` - -const sampleNodesResponse = ` -[ - { - "db_dir": "/var/lib/rabbitmq/mnesia/rabbit@vagrant-ubuntu-trusty-64", - "disk_free": 37768282112, - "disk_free_alarm": false, - "disk_free_details": { - "rate": 0.0 - }, - "disk_free_limit": 50000000, - "enabled_plugins": [ - "rabbitmq_management" - ], - "fd_total": 1024, - "fd_used": 63, - "fd_used_details": { - "rate": 0.0 - }, - "io_read_avg_time": 0, - "io_read_avg_time_details": { - "rate": 0.0 - }, - "io_read_bytes": 1, - "io_read_bytes_details": { - "rate": 0.0 - }, - "io_read_count": 1, - "io_read_count_details": { - "rate": 0.0 - }, - "io_sync_avg_time": 0, - "io_sync_avg_time_details": { - "rate": 0.0 - }, - "io_write_avg_time": 0, - "io_write_avg_time_details": { - "rate": 0.0 - }, - "log_file": "/var/log/rabbitmq/rabbit@vagrant-ubuntu-trusty-64.log", - "mem_alarm": false, - "mem_limit": 2503771750, - "mem_used": 159707080, - "mem_used_details": { - "rate": 15185.6 - }, - "mnesia_disk_tx_count": 16, - "mnesia_disk_tx_count_details": { - "rate": 0.0 - }, - "mnesia_ram_tx_count": 296, - "mnesia_ram_tx_count_details": { - "rate": 0.0 - }, - "name": "rabbit@vagrant-ubuntu-trusty-64", - "net_ticktime": 60, - "os_pid": "14244", - "partitions": [], - "proc_total": 1048576, - "proc_used": 783, - "proc_used_details": { - "rate": 0.0 - }, - "processors": 1, - "rates_mode": "basic", - "run_queue": 0, - "running": true, - "sasl_log_file": "/var/log/rabbitmq/rabbit@vagrant-ubuntu-trusty-64-sasl.log", - "sockets_total": 829, - "sockets_used": 45, - "sockets_used_details": { - "rate": 0.0 - }, - "type": "disc", - "uptime": 7464827 - } -] -` -const sampleQueuesResponse = ` -[ - { - "memory": 21960, - "messages": 0, - "messages_details": { - "rate": 0 - }, - "messages_ready": 0, - "messages_ready_details": { - "rate": 0 - }, - "messages_unacknowledged": 0, - "messages_unacknowledged_details": { - "rate": 0 - }, - "idle_since": "2015-11-01 8:22:15", - "consumer_utilisation": "", - "policy": "federator", - "exclusive_consumer_tag": "", - "consumers": 0, - "recoverable_slaves": "", - "state": "running", - "messages_ram": 0, - "messages_ready_ram": 0, - "messages_unacknowledged_ram": 0, - "messages_persistent": 0, - "message_bytes": 0, - "message_bytes_ready": 0, - "message_bytes_unacknowledged": 0, - "message_bytes_ram": 0, - "message_bytes_persistent": 0, - "disk_reads": 0, - "disk_writes": 0, - "backing_queue_status": { - "q1": 0, - "q2": 0, - "delta": [ - "delta", - "undefined", - 0, - "undefined" - ], - "q3": 0, - "q4": 0, - "len": 0, - "target_ram_count": "infinity", - "next_seq_id": 0, - "avg_ingress_rate": 0, - "avg_egress_rate": 0, - "avg_ack_ingress_rate": 0, - "avg_ack_egress_rate": 0 - }, - "name": "collectd-queue", - "vhost": "collectd", - "durable": true, - "auto_delete": false, - "arguments": {}, - "node": "rabbit@testhost" - }, - { - "memory": 55528, - "message_stats": { - "ack": 223654927, - "ack_details": { - "rate": 0 - }, - "deliver": 224518745, - "deliver_details": { - "rate": 0 - }, - "deliver_get": 224518829, - "deliver_get_details": { - "rate": 0 - }, - "get": 19, - "get_details": { - "rate": 0 - }, - "get_no_ack": 65, - "get_no_ack_details": { - "rate": 0 - }, - "publish": 223883765, - "publish_details": { - "rate": 0 - }, - "redeliver": 863805, - "redeliver_details": { - "rate": 0 - } - }, - "messages": 24, - "messages_details": { - "rate": 0 - }, - "messages_ready": 24, - "messages_ready_details": { - "rate": 0 - }, - "messages_unacknowledged": 0, - "messages_unacknowledged_details": { - "rate": 0 - }, - "idle_since": "2015-11-01 8:22:14", - "consumer_utilisation": "", - "policy": "", - "exclusive_consumer_tag": "", - "consumers": 0, - "recoverable_slaves": "", - "state": "running", - "messages_ram": 24, - "messages_ready_ram": 24, - "messages_unacknowledged_ram": 0, - "messages_persistent": 0, - "message_bytes": 149220, - "message_bytes_ready": 149220, - "message_bytes_unacknowledged": 0, - "message_bytes_ram": 149220, - "message_bytes_persistent": 0, - "disk_reads": 0, - "disk_writes": 0, - "backing_queue_status": { - "q1": 0, - "q2": 0, - "delta": [ - "delta", - "undefined", - 0, - "undefined" - ], - "q3": 0, - "q4": 24, - "len": 24, - "target_ram_count": "infinity", - "next_seq_id": 223883765, - "avg_ingress_rate": 0, - "avg_egress_rate": 0, - "avg_ack_ingress_rate": 0, - "avg_ack_egress_rate": 0 - }, - "name": "telegraf", - "vhost": "collectd", - "durable": true, - "auto_delete": false, - "arguments": {}, - "node": "rabbit@testhost" - }, - { - "message_stats": { - "ack": 1296077, - "ack_details": { - "rate": 0 - }, - "deliver": 1513176, - "deliver_details": { - "rate": 0.4 - }, - "deliver_get": 1513239, - "deliver_get_details": { - "rate": 0.4 - }, - "disk_writes": 7976, - "disk_writes_details": { - "rate": 0 - }, - "get": 40, - "get_details": { - "rate": 0 - }, - "get_no_ack": 23, - "get_no_ack_details": { - "rate": 0 - }, - "publish": 1325628, - "publish_details": { - "rate": 0.4 - }, - "redeliver": 216034, - "redeliver_details": { - "rate": 0 - } - }, - "messages": 5, - "messages_details": { - "rate": 0.4 - }, - "messages_ready": 0, - "messages_ready_details": { - "rate": 0 - }, - "messages_unacknowledged": 5, - "messages_unacknowledged_details": { - "rate": 0.4 - }, - "policy": "federator", - "exclusive_consumer_tag": "", - "consumers": 1, - "consumer_utilisation": 1, - "memory": 122856, - "recoverable_slaves": "", - "state": "running", - "messages_ram": 5, - "messages_ready_ram": 0, - "messages_unacknowledged_ram": 5, - "messages_persistent": 0, - "message_bytes": 150096, - "message_bytes_ready": 0, - "message_bytes_unacknowledged": 150096, - "message_bytes_ram": 150096, - "message_bytes_persistent": 0, - "disk_reads": 0, - "disk_writes": 7976, - "backing_queue_status": { - "q1": 0, - "q2": 0, - "delta": [ - "delta", - "undefined", - 0, - "undefined" - ], - "q3": 0, - "q4": 0, - "len": 0, - "target_ram_count": "infinity", - "next_seq_id": 1325628, - "avg_ingress_rate": 0.19115840579934168, - "avg_egress_rate": 0.19115840579934168, - "avg_ack_ingress_rate": 0.19115840579934168, - "avg_ack_egress_rate": 0.1492766485341716 - }, - "name": "telegraf", - "vhost": "metrics", - "durable": true, - "auto_delete": false, - "arguments": {}, - "node": "rabbit@testhost" - } -] -` - -const sampleExchangesResponse = ` -[ - { - "arguments": { }, - "internal": false, - "auto_delete": false, - "durable": true, - "type": "direct", - "vhost": "\/", - "name": "" - }, - { - "message_stats": { - "publish_in_details": { - "rate": 0 - }, - "publish_in": 2, - "publish_out_details": { - "rate": 0 - }, - "publish_out": 1 - }, - "arguments": { }, - "internal": false, - "auto_delete": false, - "durable": true, - "type": "fanout", - "vhost": "\/", - "name": "telegraf" - }, - { - "arguments": { }, - "internal": false, - "auto_delete": false, - "durable": true, - "type": "direct", - "vhost": "\/", - "name": "amq.direct" - }, - { - "arguments": { }, - "internal": false, - "auto_delete": false, - "durable": true, - "type": "fanout", - "vhost": "\/", - "name": "amq.fanout" - }, - { - "arguments": { }, - "internal": false, - "auto_delete": false, - "durable": true, - "type": "headers", - "vhost": "\/", - "name": "amq.headers" - }, - { - "arguments": { }, - "internal": false, - "auto_delete": false, - "durable": true, - "type": "headers", - "vhost": "\/", - "name": "amq.match" - }, - { - "arguments": { }, - "internal": true, - "auto_delete": false, - "durable": true, - "type": "topic", - "vhost": "\/", - "name": "amq.rabbitmq.log" - }, - { - "arguments": { }, - "internal": true, - "auto_delete": false, - "durable": true, - "type": "topic", - "vhost": "\/", - "name": "amq.rabbitmq.trace" - }, - { - "arguments": { }, - "internal": false, - "auto_delete": false, - "durable": true, - "type": "topic", - "vhost": "\/", - "name": "amq.topic" - } -] -` - func TestRabbitMQGeneratesMetrics(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - var rsp string + var jsonFilePath string switch r.URL.Path { case "/api/overview": - rsp = sampleOverviewResponse + jsonFilePath = "testdata/overview.json" case "/api/nodes": - rsp = sampleNodesResponse + jsonFilePath = "testdata/nodes.json" case "/api/queues": - rsp = sampleQueuesResponse + jsonFilePath = "testdata/queues.json" case "/api/exchanges": - rsp = sampleExchangesResponse + jsonFilePath = "testdata/exchanges.json" + case "/api/healthchecks/node/rabbit@vagrant-ubuntu-trusty-64": + jsonFilePath = "testdata/healthchecks.json" default: panic("Cannot handle request") } - fmt.Fprintln(w, rsp) + data, err := ioutil.ReadFile(jsonFilePath) + + if err != nil { + panic(fmt.Sprintf("could not read from data file %s", jsonFilePath)) + } + + w.Write(data) })) defer ts.Close() @@ -513,60 +45,118 @@ func TestRabbitMQGeneratesMetrics(t *testing.T) { URL: ts.URL, } - var acc testutil.Accumulator + acc := &testutil.Accumulator{} err := acc.GatherError(r.Gather) require.NoError(t, err) - intMetrics := []string{ - "messages", - "messages_ready", - "messages_unacked", - - "messages_acked", - "messages_delivered", - "messages_published", - - "channels", - "connections", - "consumers", - "exchanges", - "queues", - "clustering_listeners", - "amqp_listeners", + overviewMetrics := map[string]interface{}{ + "messages": 5, + "messages_ready": 32, + "messages_unacked": 27, + "messages_acked": 5246, + "messages_delivered": 5234, + "messages_delivered_get": 3333, + "messages_published": 5258, + "channels": 44, + "connections": 44, + "consumers": 65, + "exchanges": 43, + "queues": 62, + "clustering_listeners": 2, + "amqp_listeners": 2, + "return_unroutable": 10, + "return_unroutable_rate": 3.3, } + compareMetrics(t, overviewMetrics, acc, "rabbitmq_overview") - for _, metric := range intMetrics { - assert.True(t, acc.HasInt64Field("rabbitmq_overview", metric)) + queuesMetrics := map[string]interface{}{ + "consumers": 3, + "consumer_utilisation": 1.0, + "memory": 143776, + "message_bytes": 3, + "message_bytes_ready": 4, + "message_bytes_unacked": 5, + "message_bytes_ram": 6, + "message_bytes_persist": 7, + "messages": 44, + "messages_ready": 32, + "messages_unack": 44, + "messages_ack": 3457, + "messages_ack_rate": 9.9, + "messages_deliver": 22222, + "messages_deliver_rate": 333.4, + "messages_deliver_get": 3457, + "messages_deliver_get_rate": 0.2, + "messages_publish": 3457, + "messages_publish_rate": 11.2, + "messages_redeliver": 33, + "messages_redeliver_rate": 2.5, + "idle_since": "2015-11-01 8:22:14", } + compareMetrics(t, queuesMetrics, acc, "rabbitmq_queue") - nodeIntMetrics := []string{ - "disk_free", - "disk_free_limit", - "fd_total", - "fd_used", - "mem_limit", - "mem_used", - "proc_total", - "proc_used", - "run_queue", - "sockets_total", - "sockets_used", - "running", + nodeMetrics := map[string]interface{}{ + "disk_free": 3776, + "disk_free_limit": 50000000, + "disk_free_alarm": 0, + "fd_total": 1024, + "fd_used": 63, + "mem_limit": 2503, + "mem_used": 159707080, + "mem_alarm": 1, + "proc_total": 1048576, + "proc_used": 783, + "run_queue": 0, + "sockets_total": 829, + "sockets_used": 45, + "uptime": 7464827, + "running": 1, + "health_check_status": 1, + "mnesia_disk_tx_count": 16, + "mnesia_ram_tx_count": 296, + "mnesia_disk_tx_count_rate": 1.1, + "mnesia_ram_tx_count_rate": 2.2, + "gc_num": 57280132, + "gc_bytes_reclaimed": 2533, + "gc_num_rate": 274.2, + "gc_bytes_reclaimed_rate": 16490856.3, + "io_read_avg_time": 983, + "io_read_avg_time_rate": 88.77, + "io_read_bytes": 1111, + "io_read_bytes_rate": 99.99, + "io_write_avg_time": 134, + "io_write_avg_time_rate": 4.32, + "io_write_bytes": 823, + "io_write_bytes_rate": 32.8, } + compareMetrics(t, nodeMetrics, acc, "rabbitmq_node") - for _, metric := range nodeIntMetrics { - assert.True(t, acc.HasInt64Field("rabbitmq_node", metric)) + exchangeMetrics := map[string]interface{}{ + "messages_publish_in": 3678, + "messages_publish_in_rate": 3.2, + "messages_publish_out": 3677, + "messages_publish_out_rate": 5.1, } + compareMetrics(t, exchangeMetrics, acc, "rabbitmq_exchange") +} - assert.True(t, acc.HasMeasurement("rabbitmq_queue")) +func compareMetrics(t *testing.T, expectedMetrics map[string]interface{}, + accumulator *testutil.Accumulator, measurementKey string) { + measurement, exist := accumulator.Get(measurementKey) - exchangeIntMetrics := []string{ - "messages_publish_in", - "messages_publish_out", - } + assert.True(t, exist, "There is measurement %s", measurementKey) + assert.Equal(t, len(expectedMetrics), len(measurement.Fields)) - for _, metric := range exchangeIntMetrics { - assert.True(t, acc.HasInt64Field("rabbitmq_exchange", metric)) + for metricName, metricValue := range expectedMetrics { + actualMetricValue := measurement.Fields[metricName] + + if accumulator.HasStringField(measurementKey, metricName) { + assert.Equal(t, metricValue, actualMetricValue, + "Metric name: %s", metricName) + } else { + assert.InDelta(t, metricValue, actualMetricValue, 0e5, + "Metric name: %s", metricName) + } } } diff --git a/plugins/inputs/rabbitmq/testdata/exchanges.json b/plugins/inputs/rabbitmq/testdata/exchanges.json new file mode 100644 index 000000000..203c29a59 --- /dev/null +++ b/plugins/inputs/rabbitmq/testdata/exchanges.json @@ -0,0 +1,22 @@ +[ + { + "message_stats": { + "publish_in_details": { + "rate": 3.2 + }, + "publish_in": 3678, + "publish_out_details": { + "rate": 5.1 + }, + "publish_out": 3677 + }, + "user_who_performed_action": "mistral_testuser_1", + "arguments": {}, + "internal": false, + "auto_delete": true, + "durable": false, + "type": "direct", + "vhost": "sorandomsorandom", + "name": "reply_a716f0523cd44941ad2ea6ce4a3869c3" + } +] \ No newline at end of file diff --git a/plugins/inputs/rabbitmq/testdata/healthchecks.json b/plugins/inputs/rabbitmq/testdata/healthchecks.json new file mode 100644 index 000000000..1a36cf5fc --- /dev/null +++ b/plugins/inputs/rabbitmq/testdata/healthchecks.json @@ -0,0 +1 @@ +{"status":"ok"} \ No newline at end of file diff --git a/plugins/inputs/rabbitmq/testdata/nodes.json b/plugins/inputs/rabbitmq/testdata/nodes.json new file mode 100644 index 000000000..42b7a4b7a --- /dev/null +++ b/plugins/inputs/rabbitmq/testdata/nodes.json @@ -0,0 +1,87 @@ +[ + { + "db_dir": "/var/lib/rabbitmq/mnesia/rabbit@vagrant-ubuntu-trusty-64", + "disk_free": 3776, + "disk_free_alarm": false, + "disk_free_details": { + "rate": 0.0 + }, + "disk_free_limit": 50000000, + "enabled_plugins": [ + "rabbitmq_management" + ], + "gc_num": 57280132, + "gc_num_details": { + "rate": 274.2 + }, + "gc_bytes_reclaimed": 2533, + "gc_bytes_reclaimed_details": { + "rate": 16490856.3 + }, + "fd_total": 1024, + "fd_used": 63, + "fd_used_details": { + "rate": 0.0 + }, + "io_read_avg_time": 983, + "io_read_avg_time_details": { + "rate": 88.77 + }, + "io_read_bytes": 1111, + "io_read_bytes_details": { + "rate": 99.99 + }, + "io_read_count": 1, + "io_read_count_details": { + "rate": 0.0 + }, + "io_sync_avg_time": 0, + "io_sync_avg_time_details": { + "rate": 0.0 + }, + "io_write_avg_time": 134, + "io_write_avg_time_details": { + "rate": 4.32 + }, + "io_write_bytes": 823, + "io_write_bytes_details": { + "rate": 32.8 + }, + "log_file": "/var/log/rabbitmq/rabbit@vagrant-ubuntu-trusty-64.log", + "mem_alarm": true, + "mem_limit": 2503, + "mem_used": 159707080, + "mem_used_details": { + "rate": 15185.6 + }, + "mnesia_disk_tx_count": 16, + "mnesia_disk_tx_count_details": { + "rate": 1.1 + }, + "mnesia_ram_tx_count": 296, + "mnesia_ram_tx_count_details": { + "rate": 2.2 + }, + "name": "rabbit@vagrant-ubuntu-trusty-64", + "net_ticktime": 60, + "os_pid": "14244", + "partitions": [], + "proc_total": 1048576, + "proc_used": 783, + "proc_used_details": { + "rate": 0.0 + }, + "processors": 1, + "rates_mode": "basic", + "run_queue": 0, + "running": true, + "sasl_log_file": "/var/log/rabbitmq/rabbit@vagrant-ubuntu-trusty-64-sasl.log", + "sockets_total": 829, + "sockets_used": 45, + "sockets_used_details": { + "rate": 0.0 + }, + "type": "disc", + "uptime": 7464827 + } +] \ No newline at end of file diff --git a/plugins/inputs/rabbitmq/testdata/overview.json b/plugins/inputs/rabbitmq/testdata/overview.json new file mode 100644 index 000000000..a4cbb2ad6 --- /dev/null +++ b/plugins/inputs/rabbitmq/testdata/overview.json @@ -0,0 +1,63 @@ +{ + "message_stats": { + "ack": 5246, + "ack_details": { + "rate": 0.0 + }, + "deliver": 5234, + "deliver_details": { + "rate": 0.0 + }, + "deliver_get": 3333, + "deliver_get_details": { + "rate": 0.0 + }, + "publish": 5258, + "publish_details": { + "rate": 0.0 + }, + "return_unroutable": 10, + "return_unroutable_details": { + "rate": 3.3 + } + }, + "object_totals": { + "channels": 44, + "connections": 44, + "consumers": 65, + "exchanges": 43, + "queues": 62 + }, + "queue_totals": { + "messages": 5, + "messages_details": { + "rate": 0.0 + }, + "messages_ready": 32, + "messages_ready_details": { + "rate": 0.0 + }, + "messages_unacknowledged": 27, + "messages_unacknowledged_details": { + "rate": 0.0 + } + }, + "listeners": [ + { + "name": "rabbit@node-a", + "protocol": "amqp" + }, + { + "name": "rabbit@node-b", + "protocol": "amqp" + }, + { + "name": "rabbit@node-a", + "protocol": "clustering" + }, + { + "name": "rabbit@node-b", + "protocol": "clustering" + } + ] +} \ No newline at end of file diff --git a/plugins/inputs/rabbitmq/testdata/queues.json b/plugins/inputs/rabbitmq/testdata/queues.json new file mode 100644 index 000000000..356e1a466 --- /dev/null +++ b/plugins/inputs/rabbitmq/testdata/queues.json @@ -0,0 +1,114 @@ +[ + { + "messages_details": { + "rate": 0.0 + }, + "messages": 44, + "messages_unacknowledged_details": { + "rate": 0.0 + }, + "messages_unacknowledged": 44, + "messages_ready_details": { + "rate": 0.0 + }, + "messages_ready": 32, + "reductions_details": { + "rate": 223.0 + }, + "reductions": 15875433, + "message_stats": { + "deliver_get_details": { + "rate": 0.2 + }, + "deliver_get": 3457, + "ack_details": { + "rate": 9.9 + }, + "ack": 3457, + "redeliver_details": { + "rate": 2.5 + }, + "redeliver": 33, + "deliver_no_ack_details": { + "rate": 0.0 + }, + "deliver_no_ack": 0, + "deliver_details": { + "rate": 333.4 + }, + "deliver": 22222, + "get_no_ack_details": { + "rate": 0.0 + }, + "get_no_ack": 0, + "get_details": { + "rate": 0.0 + }, + "get": 0, + "publish_details": { + "rate": 11.2 + }, + "publish": 3457 + }, + "node": "rabbit@rmqlocal-0.rmqlocal.ankorabbitstatefulset3.svc.cluster.local", + "arguments": { + "x-expires": 1800000, + "x-ha-policy": "all" + }, + "exclusive": false, + "auto_delete": false, + "durable": false, + "vhost": "sorandomsorandom", + "name": "reply_a716f0523cd44941ad2ea6ce4a3869c3", + "message_bytes_paged_out": 0, + "messages_paged_out": 0, + "idle_since": "2015-11-01 8:22:14", + "backing_queue_status": { + "avg_ack_egress_rate": 0.2374460025857711, + "avg_ack_ingress_rate": 0.2374460025857711, + "avg_egress_rate": 0.2374460025857711, + "avg_ingress_rate": 0.2374460025857711, + "delta": [ + "delta", + "undefined", + 0, + 0, + "undefined" + ], + "len": 0, + "mode": "default", + "next_seq_id": 3457, + "q1": 0, + "q2": 0, + "q3": 0, + "q4": 0, + "target_ram_count": 0 + }, + "head_message_timestamp": null, + "message_bytes_persistent": 7, + "message_bytes_ram": 6, + "message_bytes_unacknowledged": 5, + "message_bytes_ready": 4, + "message_bytes": 3, + "messages_persistent": 0, + "messages_unacknowledged_ram": 0, + "messages_ready_ram": 0, + "messages_ram": 0, + "garbage_collection": { + "minor_gcs": 314, + "fullsweep_after": 65535, + "min_heap_size": 233, + "min_bin_vheap_size": 46422, + "max_heap_size": 0 + }, + "state": "running", + "recoverable_slaves": null, + "memory": 143776, + "consumer_utilisation": 1.0, + "consumers": 3, + "exclusive_consumer_tag": null, + "effective_policy_definition": [], + "operator_policy": null, + "policy": null + } +] \ No newline at end of file