Add additional metrics to rabbitmq input

This commit is contained in:
Daniel Nelson 2019-02-26 18:41:18 -08:00 committed by GitHub
commit b805e83b71
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 579 additions and 580 deletions

View File

@ -68,20 +68,42 @@ For additional details reference the [RabbitMQ Management HTTP Stats](https://cd
- queues (int, queues)
- clustering_listeners (int, cluster nodes)
- amqp_listeners (int, amqp nodes up)
- return_unroutable (int, number of unroutable messages)
- return_unroutable_rate (float, number of unroutable messages per second)
- rabbitmq_node
- disk_free (int, bytes)
- disk_free_limit (int, bytes)
- disk_free_alarm (int, disk alarm)
- fd_total (int, file descriptors)
- fd_used (int, file descriptors)
- mem_limit (int, bytes)
- mem_used (int, bytes)
- mem_alarm (int, memory a)
- proc_total (int, erlang processes)
- proc_used (int, erlang processes)
- run_queue (int, erlang processes)
- sockets_total (int, sockets)
- sockets_used (int, sockets)
- running (int, node up)
- uptime (int, milliseconds)
- health_check_status (int, 1 or 0)
- mnesia_disk_tx_count (int, number of disk transaction)
- mnesia_ram_tx_count (int, number of ram transaction)
- mnesia_disk_tx_count_rate (float, number of disk transaction per second)
- mnesia_ram_tx_count_rate (float, number of ram transaction per second)
- gc_num (int, number of garbage collection)
- gc_bytes_reclaimed (int, bytes)
- gc_num_rate (float, number of garbage collection per second)
- gc_bytes_reclaimed_rate (float, bytes per second)
- io_read_avg_time (float, number of read operations)
- io_read_avg_time_rate (int, number of read operations per second)
- io_read_bytes (int, bytes)
- io_read_bytes_rate (float, bytes per second)
- io_write_avg_time (int, milliseconds)
- io_write_avg_time_rate (float, milliseconds per second)
- io_write_bytes (int, bytes)
- io_write_bytes_rate (float, bytes per second)
- rabbitmq_queue
- consumer_utilisation (float, percent)
@ -109,7 +131,9 @@ For additional details reference the [RabbitMQ Management HTTP Stats](https://cd
- rabbitmq_exchange
- messages_publish_in (int, count)
- messages_publish_in_rate (int, messages per second)
- messages_publish_out (int, count)
- messages_publish_out_rate (int, messages per second)
### Tags:
@ -121,6 +145,7 @@ For additional details reference the [RabbitMQ Management HTTP Stats](https://cd
- rabbitmq_node
- node
- url
- rabbitmq_queue
- url

View File

@ -72,23 +72,27 @@ type Listeners struct {
// Details ...
type Details struct {
Rate float64
Rate float64 `json:"rate"`
}
// MessageStats ...
type MessageStats struct {
Ack int64
AckDetails Details `json:"ack_details"`
Deliver int64
DeliverDetails Details `json:"deliver_details"`
DeliverGet int64 `json:"deliver_get"`
DeliverGetDetails Details `json:"deliver_get_details"`
Publish int64
PublishDetails Details `json:"publish_details"`
Redeliver int64
RedeliverDetails Details `json:"redeliver_details"`
PublishIn int64 `json:"publish_in"`
PublishOut int64 `json:"publish_out"`
Ack int64
AckDetails Details `json:"ack_details"`
Deliver int64
DeliverDetails Details `json:"deliver_details"`
DeliverGet int64 `json:"deliver_get"`
DeliverGetDetails Details `json:"deliver_get_details"`
Publish int64
PublishDetails Details `json:"publish_details"`
Redeliver int64
RedeliverDetails Details `json:"redeliver_details"`
PublishIn int64 `json:"publish_in"`
PublishInDetails Details `json:"publish_in_details"`
PublishOut int64 `json:"publish_out"`
PublishOutDetails Details `json:"publish_out_details"`
ReturnUnroutable int64 `json:"return_unroutable"`
ReturnUnroutableDetails Details `json:"return_unroutable_details"`
}
// ObjectTotals ...
@ -131,18 +135,37 @@ type Queue struct {
type Node struct {
Name string
DiskFree int64 `json:"disk_free"`
DiskFreeLimit int64 `json:"disk_free_limit"`
FdTotal int64 `json:"fd_total"`
FdUsed int64 `json:"fd_used"`
MemLimit int64 `json:"mem_limit"`
MemUsed int64 `json:"mem_used"`
ProcTotal int64 `json:"proc_total"`
ProcUsed int64 `json:"proc_used"`
RunQueue int64 `json:"run_queue"`
SocketsTotal int64 `json:"sockets_total"`
SocketsUsed int64 `json:"sockets_used"`
Running bool `json:"running"`
DiskFree int64 `json:"disk_free"`
DiskFreeLimit int64 `json:"disk_free_limit"`
DiskFreeAlarm bool `json:"disk_free_alarm"`
FdTotal int64 `json:"fd_total"`
FdUsed int64 `json:"fd_used"`
MemLimit int64 `json:"mem_limit"`
MemUsed int64 `json:"mem_used"`
MemAlarm bool `json:"mem_alarm"`
ProcTotal int64 `json:"proc_total"`
ProcUsed int64 `json:"proc_used"`
RunQueue int64 `json:"run_queue"`
SocketsTotal int64 `json:"sockets_total"`
SocketsUsed int64 `json:"sockets_used"`
Running bool `json:"running"`
Uptime int64 `json:"uptime"`
MnesiaDiskTxCount int64 `json:"mnesia_disk_tx_count"`
MnesiaDiskTxCountDetails Details `json:"mnesia_disk_tx_count_details"`
MnesiaRamTxCount int64 `json:"mnesia_ram_tx_count"`
MnesiaRamTxCountDetails Details `json:"mnesia_ram_tx_count_details"`
GcNum int64 `json:"gc_num"`
GcNumDetails Details `json:"gc_num_details"`
GcBytesReclaimed int64 `json:"gc_bytes_reclaimed"`
GcBytesReclaimedDetails Details `json:"gc_bytes_reclaimed_details"`
IoReadAvgTime int64 `json:"io_read_avg_time"`
IoReadAvgTimeDetails Details `json:"io_read_avg_time_details"`
IoReadBytes int64 `json:"io_read_bytes"`
IoReadBytesDetails Details `json:"io_read_bytes_details"`
IoWriteAvgTime int64 `json:"io_write_avg_time"`
IoWriteAvgTimeDetails Details `json:"io_write_avg_time_details"`
IoWriteBytes int64 `json:"io_write_bytes"`
IoWriteBytesDetails Details `json:"io_write_bytes_details"`
}
type Exchange struct {
@ -155,6 +178,10 @@ type Exchange struct {
AutoDelete bool `json:"auto_delete"`
}
type HealthCheck struct {
Status string `json:"status"`
}
// gatherFunc ...
type gatherFunc func(r *RabbitMQ, acc telegraf.Accumulator)
@ -204,6 +231,13 @@ var sampleConfig = `
queue_name_exclude = []
`
func boolToInt(b bool) int64 {
if b {
return 1
}
return 0
}
// SampleConfig ...
func (r *RabbitMQ) SampleConfig() string {
return sampleConfig
@ -302,12 +336,12 @@ func gatherOverview(r *RabbitMQ, acc telegraf.Accumulator) {
return
}
var clustering_listeners, amqp_listeners int64 = 0, 0
var clusteringListeners, amqpListeners int64 = 0, 0
for _, listener := range overview.Listeners {
if listener.Protocol == "clustering" {
clustering_listeners++
clusteringListeners++
} else if listener.Protocol == "amqp" {
amqp_listeners++
amqpListeners++
}
}
@ -328,48 +362,109 @@ func gatherOverview(r *RabbitMQ, acc telegraf.Accumulator) {
"messages_delivered": overview.MessageStats.Deliver,
"messages_delivered_get": overview.MessageStats.DeliverGet,
"messages_published": overview.MessageStats.Publish,
"clustering_listeners": clustering_listeners,
"amqp_listeners": amqp_listeners,
"clustering_listeners": clusteringListeners,
"amqp_listeners": amqpListeners,
"return_unroutable": overview.MessageStats.ReturnUnroutable,
"return_unroutable_rate": overview.MessageStats.ReturnUnroutableDetails.Rate,
}
acc.AddFields("rabbitmq_overview", fields, tags)
}
func gatherNodes(r *RabbitMQ, acc telegraf.Accumulator) {
nodes := make([]Node, 0)
allNodes := make([]Node, 0)
// Gather information about nodes
err := r.requestJSON("/api/nodes", &nodes)
err := r.requestJSON("/api/nodes", &allNodes)
if err != nil {
acc.AddError(err)
return
}
now := time.Now()
nodes := make(map[string]Node)
for _, node := range allNodes {
if r.shouldGatherNode(node) {
nodes[node.Name] = node
}
}
numberNodes := len(nodes)
if numberNodes == 0 {
return
}
type NodeHealthCheck struct {
NodeName string
HealthCheck HealthCheck
Error error
}
healthChecksChannel := make(chan NodeHealthCheck, numberNodes)
for _, node := range nodes {
if !r.shouldGatherNode(node) {
continue
go func(nodeName string, healthChecksChannel chan NodeHealthCheck) {
var healthCheck HealthCheck
err := r.requestJSON("/api/healthchecks/node/"+nodeName, &healthCheck)
nodeHealthCheck := NodeHealthCheck{
NodeName: nodeName,
Error: err,
HealthCheck: healthCheck,
}
healthChecksChannel <- nodeHealthCheck
}(node.Name, healthChecksChannel)
}
now := time.Now()
for i := 0; i < len(nodes); i++ {
nodeHealthCheck := <-healthChecksChannel
var healthCheckStatus int64 = 0
if nodeHealthCheck.Error != nil {
acc.AddError(nodeHealthCheck.Error)
} else if nodeHealthCheck.HealthCheck.Status == "ok" {
healthCheckStatus = 1
}
node := nodes[nodeHealthCheck.NodeName]
tags := map[string]string{"url": r.URL}
tags["node"] = node.Name
var running int64 = 0
if node.Running {
running = 1
}
fields := map[string]interface{}{
"disk_free": node.DiskFree,
"disk_free_limit": node.DiskFreeLimit,
"fd_total": node.FdTotal,
"fd_used": node.FdUsed,
"mem_limit": node.MemLimit,
"mem_used": node.MemUsed,
"proc_total": node.ProcTotal,
"proc_used": node.ProcUsed,
"run_queue": node.RunQueue,
"sockets_total": node.SocketsTotal,
"sockets_used": node.SocketsUsed,
"running": running,
"disk_free": node.DiskFree,
"disk_free_limit": node.DiskFreeLimit,
"disk_free_alarm": boolToInt(node.DiskFreeAlarm),
"fd_total": node.FdTotal,
"fd_used": node.FdUsed,
"mem_limit": node.MemLimit,
"mem_used": node.MemUsed,
"mem_alarm": boolToInt(node.MemAlarm),
"proc_total": node.ProcTotal,
"proc_used": node.ProcUsed,
"run_queue": node.RunQueue,
"sockets_total": node.SocketsTotal,
"sockets_used": node.SocketsUsed,
"uptime": node.Uptime,
"mnesia_disk_tx_count": node.MnesiaDiskTxCount,
"mnesia_disk_tx_count_rate": node.MnesiaDiskTxCountDetails.Rate,
"mnesia_ram_tx_count": node.MnesiaRamTxCount,
"mnesia_ram_tx_count_rate": node.MnesiaRamTxCountDetails.Rate,
"gc_num": node.GcNum,
"gc_num_rate": node.GcNumDetails.Rate,
"gc_bytes_reclaimed": node.GcBytesReclaimed,
"gc_bytes_reclaimed_rate": node.GcBytesReclaimedDetails.Rate,
"io_read_avg_time": node.IoReadAvgTime,
"io_read_avg_time_rate": node.IoReadAvgTimeDetails.Rate,
"io_read_bytes": node.IoReadBytes,
"io_read_bytes_rate": node.IoReadBytesDetails.Rate,
"io_write_avg_time": node.IoWriteAvgTime,
"io_write_avg_time_rate": node.IoWriteAvgTimeDetails.Rate,
"io_write_bytes": node.IoWriteBytes,
"io_write_bytes_rate": node.IoWriteBytesDetails.Rate,
"running": boolToInt(node.Running),
"health_check_status": healthCheckStatus,
}
acc.AddFields("rabbitmq_node", fields, tags, now)
}
@ -459,8 +554,10 @@ func gatherExchanges(r *RabbitMQ, acc telegraf.Accumulator) {
acc.AddFields(
"rabbitmq_exchange",
map[string]interface{}{
"messages_publish_in": exchange.MessageStats.PublishIn,
"messages_publish_out": exchange.MessageStats.PublishOut,
"messages_publish_in": exchange.MessageStats.PublishIn,
"messages_publish_in_rate": exchange.MessageStats.PublishInDetails.Rate,
"messages_publish_out": exchange.MessageStats.PublishOut,
"messages_publish_out_rate": exchange.MessageStats.PublishOutDetails.Rate,
},
tags,
)
@ -487,11 +584,11 @@ func (r *RabbitMQ) createQueueFilter() error {
r.QueueInclude = append(r.QueueInclude, r.Queues...)
}
filter, err := filter.NewIncludeExcludeFilter(r.QueueInclude, r.QueueExclude)
queueFilter, err := filter.NewIncludeExcludeFilter(r.QueueInclude, r.QueueExclude)
if err != nil {
return err
}
r.queueFilter = filter
r.queueFilter = queueFilter
for _, q := range r.QueueExclude {
if q == "*" {

View File

@ -9,503 +9,35 @@ import (
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"io/ioutil"
)
const sampleOverviewResponse = `
{
"message_stats": {
"ack": 5246,
"ack_details": {
"rate": 0.0
},
"deliver": 5246,
"deliver_details": {
"rate": 0.0
},
"deliver_get": 5246,
"deliver_get_details": {
"rate": 0.0
},
"publish": 5258,
"publish_details": {
"rate": 0.0
}
},
"object_totals": {
"channels": 44,
"connections": 44,
"consumers": 65,
"exchanges": 43,
"queues": 62
},
"queue_totals": {
"messages": 0,
"messages_details": {
"rate": 0.0
},
"messages_ready": 0,
"messages_ready_details": {
"rate": 0.0
},
"messages_unacknowledged": 0,
"messages_unacknowledged_details": {
"rate": 0.0
}
},
"listeners": [
{
"name": "rabbit@node-a",
"protocol": "amqp"
},
{
"name": "rabbit@node-b",
"protocol": "amqp"
},
{
"name": "rabbit@node-a",
"protocol": "clustering"
},
{
"name": "rabbit@node-b",
"protocol": "clustering"
}
]
}
`
const sampleNodesResponse = `
[
{
"db_dir": "/var/lib/rabbitmq/mnesia/rabbit@vagrant-ubuntu-trusty-64",
"disk_free": 37768282112,
"disk_free_alarm": false,
"disk_free_details": {
"rate": 0.0
},
"disk_free_limit": 50000000,
"enabled_plugins": [
"rabbitmq_management"
],
"fd_total": 1024,
"fd_used": 63,
"fd_used_details": {
"rate": 0.0
},
"io_read_avg_time": 0,
"io_read_avg_time_details": {
"rate": 0.0
},
"io_read_bytes": 1,
"io_read_bytes_details": {
"rate": 0.0
},
"io_read_count": 1,
"io_read_count_details": {
"rate": 0.0
},
"io_sync_avg_time": 0,
"io_sync_avg_time_details": {
"rate": 0.0
},
"io_write_avg_time": 0,
"io_write_avg_time_details": {
"rate": 0.0
},
"log_file": "/var/log/rabbitmq/rabbit@vagrant-ubuntu-trusty-64.log",
"mem_alarm": false,
"mem_limit": 2503771750,
"mem_used": 159707080,
"mem_used_details": {
"rate": 15185.6
},
"mnesia_disk_tx_count": 16,
"mnesia_disk_tx_count_details": {
"rate": 0.0
},
"mnesia_ram_tx_count": 296,
"mnesia_ram_tx_count_details": {
"rate": 0.0
},
"name": "rabbit@vagrant-ubuntu-trusty-64",
"net_ticktime": 60,
"os_pid": "14244",
"partitions": [],
"proc_total": 1048576,
"proc_used": 783,
"proc_used_details": {
"rate": 0.0
},
"processors": 1,
"rates_mode": "basic",
"run_queue": 0,
"running": true,
"sasl_log_file": "/var/log/rabbitmq/rabbit@vagrant-ubuntu-trusty-64-sasl.log",
"sockets_total": 829,
"sockets_used": 45,
"sockets_used_details": {
"rate": 0.0
},
"type": "disc",
"uptime": 7464827
}
]
`
const sampleQueuesResponse = `
[
{
"memory": 21960,
"messages": 0,
"messages_details": {
"rate": 0
},
"messages_ready": 0,
"messages_ready_details": {
"rate": 0
},
"messages_unacknowledged": 0,
"messages_unacknowledged_details": {
"rate": 0
},
"idle_since": "2015-11-01 8:22:15",
"consumer_utilisation": "",
"policy": "federator",
"exclusive_consumer_tag": "",
"consumers": 0,
"recoverable_slaves": "",
"state": "running",
"messages_ram": 0,
"messages_ready_ram": 0,
"messages_unacknowledged_ram": 0,
"messages_persistent": 0,
"message_bytes": 0,
"message_bytes_ready": 0,
"message_bytes_unacknowledged": 0,
"message_bytes_ram": 0,
"message_bytes_persistent": 0,
"disk_reads": 0,
"disk_writes": 0,
"backing_queue_status": {
"q1": 0,
"q2": 0,
"delta": [
"delta",
"undefined",
0,
"undefined"
],
"q3": 0,
"q4": 0,
"len": 0,
"target_ram_count": "infinity",
"next_seq_id": 0,
"avg_ingress_rate": 0,
"avg_egress_rate": 0,
"avg_ack_ingress_rate": 0,
"avg_ack_egress_rate": 0
},
"name": "collectd-queue",
"vhost": "collectd",
"durable": true,
"auto_delete": false,
"arguments": {},
"node": "rabbit@testhost"
},
{
"memory": 55528,
"message_stats": {
"ack": 223654927,
"ack_details": {
"rate": 0
},
"deliver": 224518745,
"deliver_details": {
"rate": 0
},
"deliver_get": 224518829,
"deliver_get_details": {
"rate": 0
},
"get": 19,
"get_details": {
"rate": 0
},
"get_no_ack": 65,
"get_no_ack_details": {
"rate": 0
},
"publish": 223883765,
"publish_details": {
"rate": 0
},
"redeliver": 863805,
"redeliver_details": {
"rate": 0
}
},
"messages": 24,
"messages_details": {
"rate": 0
},
"messages_ready": 24,
"messages_ready_details": {
"rate": 0
},
"messages_unacknowledged": 0,
"messages_unacknowledged_details": {
"rate": 0
},
"idle_since": "2015-11-01 8:22:14",
"consumer_utilisation": "",
"policy": "",
"exclusive_consumer_tag": "",
"consumers": 0,
"recoverable_slaves": "",
"state": "running",
"messages_ram": 24,
"messages_ready_ram": 24,
"messages_unacknowledged_ram": 0,
"messages_persistent": 0,
"message_bytes": 149220,
"message_bytes_ready": 149220,
"message_bytes_unacknowledged": 0,
"message_bytes_ram": 149220,
"message_bytes_persistent": 0,
"disk_reads": 0,
"disk_writes": 0,
"backing_queue_status": {
"q1": 0,
"q2": 0,
"delta": [
"delta",
"undefined",
0,
"undefined"
],
"q3": 0,
"q4": 24,
"len": 24,
"target_ram_count": "infinity",
"next_seq_id": 223883765,
"avg_ingress_rate": 0,
"avg_egress_rate": 0,
"avg_ack_ingress_rate": 0,
"avg_ack_egress_rate": 0
},
"name": "telegraf",
"vhost": "collectd",
"durable": true,
"auto_delete": false,
"arguments": {},
"node": "rabbit@testhost"
},
{
"message_stats": {
"ack": 1296077,
"ack_details": {
"rate": 0
},
"deliver": 1513176,
"deliver_details": {
"rate": 0.4
},
"deliver_get": 1513239,
"deliver_get_details": {
"rate": 0.4
},
"disk_writes": 7976,
"disk_writes_details": {
"rate": 0
},
"get": 40,
"get_details": {
"rate": 0
},
"get_no_ack": 23,
"get_no_ack_details": {
"rate": 0
},
"publish": 1325628,
"publish_details": {
"rate": 0.4
},
"redeliver": 216034,
"redeliver_details": {
"rate": 0
}
},
"messages": 5,
"messages_details": {
"rate": 0.4
},
"messages_ready": 0,
"messages_ready_details": {
"rate": 0
},
"messages_unacknowledged": 5,
"messages_unacknowledged_details": {
"rate": 0.4
},
"policy": "federator",
"exclusive_consumer_tag": "",
"consumers": 1,
"consumer_utilisation": 1,
"memory": 122856,
"recoverable_slaves": "",
"state": "running",
"messages_ram": 5,
"messages_ready_ram": 0,
"messages_unacknowledged_ram": 5,
"messages_persistent": 0,
"message_bytes": 150096,
"message_bytes_ready": 0,
"message_bytes_unacknowledged": 150096,
"message_bytes_ram": 150096,
"message_bytes_persistent": 0,
"disk_reads": 0,
"disk_writes": 7976,
"backing_queue_status": {
"q1": 0,
"q2": 0,
"delta": [
"delta",
"undefined",
0,
"undefined"
],
"q3": 0,
"q4": 0,
"len": 0,
"target_ram_count": "infinity",
"next_seq_id": 1325628,
"avg_ingress_rate": 0.19115840579934168,
"avg_egress_rate": 0.19115840579934168,
"avg_ack_ingress_rate": 0.19115840579934168,
"avg_ack_egress_rate": 0.1492766485341716
},
"name": "telegraf",
"vhost": "metrics",
"durable": true,
"auto_delete": false,
"arguments": {},
"node": "rabbit@testhost"
}
]
`
const sampleExchangesResponse = `
[
{
"arguments": { },
"internal": false,
"auto_delete": false,
"durable": true,
"type": "direct",
"vhost": "\/",
"name": ""
},
{
"message_stats": {
"publish_in_details": {
"rate": 0
},
"publish_in": 2,
"publish_out_details": {
"rate": 0
},
"publish_out": 1
},
"arguments": { },
"internal": false,
"auto_delete": false,
"durable": true,
"type": "fanout",
"vhost": "\/",
"name": "telegraf"
},
{
"arguments": { },
"internal": false,
"auto_delete": false,
"durable": true,
"type": "direct",
"vhost": "\/",
"name": "amq.direct"
},
{
"arguments": { },
"internal": false,
"auto_delete": false,
"durable": true,
"type": "fanout",
"vhost": "\/",
"name": "amq.fanout"
},
{
"arguments": { },
"internal": false,
"auto_delete": false,
"durable": true,
"type": "headers",
"vhost": "\/",
"name": "amq.headers"
},
{
"arguments": { },
"internal": false,
"auto_delete": false,
"durable": true,
"type": "headers",
"vhost": "\/",
"name": "amq.match"
},
{
"arguments": { },
"internal": true,
"auto_delete": false,
"durable": true,
"type": "topic",
"vhost": "\/",
"name": "amq.rabbitmq.log"
},
{
"arguments": { },
"internal": true,
"auto_delete": false,
"durable": true,
"type": "topic",
"vhost": "\/",
"name": "amq.rabbitmq.trace"
},
{
"arguments": { },
"internal": false,
"auto_delete": false,
"durable": true,
"type": "topic",
"vhost": "\/",
"name": "amq.topic"
}
]
`
func TestRabbitMQGeneratesMetrics(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var rsp string
var jsonFilePath string
switch r.URL.Path {
case "/api/overview":
rsp = sampleOverviewResponse
jsonFilePath = "testdata/overview.json"
case "/api/nodes":
rsp = sampleNodesResponse
jsonFilePath = "testdata/nodes.json"
case "/api/queues":
rsp = sampleQueuesResponse
jsonFilePath = "testdata/queues.json"
case "/api/exchanges":
rsp = sampleExchangesResponse
jsonFilePath = "testdata/exchanges.json"
case "/api/healthchecks/node/rabbit@vagrant-ubuntu-trusty-64":
jsonFilePath = "testdata/healthchecks.json"
default:
panic("Cannot handle request")
}
fmt.Fprintln(w, rsp)
data, err := ioutil.ReadFile(jsonFilePath)
if err != nil {
panic(fmt.Sprintf("could not read from data file %s", jsonFilePath))
}
w.Write(data)
}))
defer ts.Close()
@ -513,60 +45,118 @@ func TestRabbitMQGeneratesMetrics(t *testing.T) {
URL: ts.URL,
}
var acc testutil.Accumulator
acc := &testutil.Accumulator{}
err := acc.GatherError(r.Gather)
require.NoError(t, err)
intMetrics := []string{
"messages",
"messages_ready",
"messages_unacked",
"messages_acked",
"messages_delivered",
"messages_published",
"channels",
"connections",
"consumers",
"exchanges",
"queues",
"clustering_listeners",
"amqp_listeners",
overviewMetrics := map[string]interface{}{
"messages": 5,
"messages_ready": 32,
"messages_unacked": 27,
"messages_acked": 5246,
"messages_delivered": 5234,
"messages_delivered_get": 3333,
"messages_published": 5258,
"channels": 44,
"connections": 44,
"consumers": 65,
"exchanges": 43,
"queues": 62,
"clustering_listeners": 2,
"amqp_listeners": 2,
"return_unroutable": 10,
"return_unroutable_rate": 3.3,
}
compareMetrics(t, overviewMetrics, acc, "rabbitmq_overview")
for _, metric := range intMetrics {
assert.True(t, acc.HasInt64Field("rabbitmq_overview", metric))
queuesMetrics := map[string]interface{}{
"consumers": 3,
"consumer_utilisation": 1.0,
"memory": 143776,
"message_bytes": 3,
"message_bytes_ready": 4,
"message_bytes_unacked": 5,
"message_bytes_ram": 6,
"message_bytes_persist": 7,
"messages": 44,
"messages_ready": 32,
"messages_unack": 44,
"messages_ack": 3457,
"messages_ack_rate": 9.9,
"messages_deliver": 22222,
"messages_deliver_rate": 333.4,
"messages_deliver_get": 3457,
"messages_deliver_get_rate": 0.2,
"messages_publish": 3457,
"messages_publish_rate": 11.2,
"messages_redeliver": 33,
"messages_redeliver_rate": 2.5,
"idle_since": "2015-11-01 8:22:14",
}
compareMetrics(t, queuesMetrics, acc, "rabbitmq_queue")
nodeIntMetrics := []string{
"disk_free",
"disk_free_limit",
"fd_total",
"fd_used",
"mem_limit",
"mem_used",
"proc_total",
"proc_used",
"run_queue",
"sockets_total",
"sockets_used",
"running",
nodeMetrics := map[string]interface{}{
"disk_free": 3776,
"disk_free_limit": 50000000,
"disk_free_alarm": 0,
"fd_total": 1024,
"fd_used": 63,
"mem_limit": 2503,
"mem_used": 159707080,
"mem_alarm": 1,
"proc_total": 1048576,
"proc_used": 783,
"run_queue": 0,
"sockets_total": 829,
"sockets_used": 45,
"uptime": 7464827,
"running": 1,
"health_check_status": 1,
"mnesia_disk_tx_count": 16,
"mnesia_ram_tx_count": 296,
"mnesia_disk_tx_count_rate": 1.1,
"mnesia_ram_tx_count_rate": 2.2,
"gc_num": 57280132,
"gc_bytes_reclaimed": 2533,
"gc_num_rate": 274.2,
"gc_bytes_reclaimed_rate": 16490856.3,
"io_read_avg_time": 983,
"io_read_avg_time_rate": 88.77,
"io_read_bytes": 1111,
"io_read_bytes_rate": 99.99,
"io_write_avg_time": 134,
"io_write_avg_time_rate": 4.32,
"io_write_bytes": 823,
"io_write_bytes_rate": 32.8,
}
compareMetrics(t, nodeMetrics, acc, "rabbitmq_node")
for _, metric := range nodeIntMetrics {
assert.True(t, acc.HasInt64Field("rabbitmq_node", metric))
exchangeMetrics := map[string]interface{}{
"messages_publish_in": 3678,
"messages_publish_in_rate": 3.2,
"messages_publish_out": 3677,
"messages_publish_out_rate": 5.1,
}
compareMetrics(t, exchangeMetrics, acc, "rabbitmq_exchange")
}
assert.True(t, acc.HasMeasurement("rabbitmq_queue"))
func compareMetrics(t *testing.T, expectedMetrics map[string]interface{},
accumulator *testutil.Accumulator, measurementKey string) {
measurement, exist := accumulator.Get(measurementKey)
exchangeIntMetrics := []string{
"messages_publish_in",
"messages_publish_out",
}
assert.True(t, exist, "There is measurement %s", measurementKey)
assert.Equal(t, len(expectedMetrics), len(measurement.Fields))
for _, metric := range exchangeIntMetrics {
assert.True(t, acc.HasInt64Field("rabbitmq_exchange", metric))
for metricName, metricValue := range expectedMetrics {
actualMetricValue := measurement.Fields[metricName]
if accumulator.HasStringField(measurementKey, metricName) {
assert.Equal(t, metricValue, actualMetricValue,
"Metric name: %s", metricName)
} else {
assert.InDelta(t, metricValue, actualMetricValue, 0e5,
"Metric name: %s", metricName)
}
}
}

View File

@ -0,0 +1,22 @@
[
{
"message_stats": {
"publish_in_details": {
"rate": 3.2
},
"publish_in": 3678,
"publish_out_details": {
"rate": 5.1
},
"publish_out": 3677
},
"user_who_performed_action": "mistral_testuser_1",
"arguments": {},
"internal": false,
"auto_delete": true,
"durable": false,
"type": "direct",
"vhost": "sorandomsorandom",
"name": "reply_a716f0523cd44941ad2ea6ce4a3869c3"
}
]

View File

@ -0,0 +1 @@
{"status":"ok"}

View File

@ -0,0 +1,87 @@
[
{
"db_dir": "/var/lib/rabbitmq/mnesia/rabbit@vagrant-ubuntu-trusty-64",
"disk_free": 3776,
"disk_free_alarm": false,
"disk_free_details": {
"rate": 0.0
},
"disk_free_limit": 50000000,
"enabled_plugins": [
"rabbitmq_management"
],
"gc_num": 57280132,
"gc_num_details": {
"rate": 274.2
},
"gc_bytes_reclaimed": 2533,
"gc_bytes_reclaimed_details": {
"rate": 16490856.3
},
"fd_total": 1024,
"fd_used": 63,
"fd_used_details": {
"rate": 0.0
},
"io_read_avg_time": 983,
"io_read_avg_time_details": {
"rate": 88.77
},
"io_read_bytes": 1111,
"io_read_bytes_details": {
"rate": 99.99
},
"io_read_count": 1,
"io_read_count_details": {
"rate": 0.0
},
"io_sync_avg_time": 0,
"io_sync_avg_time_details": {
"rate": 0.0
},
"io_write_avg_time": 134,
"io_write_avg_time_details": {
"rate": 4.32
},
"io_write_bytes": 823,
"io_write_bytes_details": {
"rate": 32.8
},
"log_file": "/var/log/rabbitmq/rabbit@vagrant-ubuntu-trusty-64.log",
"mem_alarm": true,
"mem_limit": 2503,
"mem_used": 159707080,
"mem_used_details": {
"rate": 15185.6
},
"mnesia_disk_tx_count": 16,
"mnesia_disk_tx_count_details": {
"rate": 1.1
},
"mnesia_ram_tx_count": 296,
"mnesia_ram_tx_count_details": {
"rate": 2.2
},
"name": "rabbit@vagrant-ubuntu-trusty-64",
"net_ticktime": 60,
"os_pid": "14244",
"partitions": [],
"proc_total": 1048576,
"proc_used": 783,
"proc_used_details": {
"rate": 0.0
},
"processors": 1,
"rates_mode": "basic",
"run_queue": 0,
"running": true,
"sasl_log_file": "/var/log/rabbitmq/rabbit@vagrant-ubuntu-trusty-64-sasl.log",
"sockets_total": 829,
"sockets_used": 45,
"sockets_used_details": {
"rate": 0.0
},
"type": "disc",
"uptime": 7464827
}
]

View File

@ -0,0 +1,63 @@
{
"message_stats": {
"ack": 5246,
"ack_details": {
"rate": 0.0
},
"deliver": 5234,
"deliver_details": {
"rate": 0.0
},
"deliver_get": 3333,
"deliver_get_details": {
"rate": 0.0
},
"publish": 5258,
"publish_details": {
"rate": 0.0
},
"return_unroutable": 10,
"return_unroutable_details": {
"rate": 3.3
}
},
"object_totals": {
"channels": 44,
"connections": 44,
"consumers": 65,
"exchanges": 43,
"queues": 62
},
"queue_totals": {
"messages": 5,
"messages_details": {
"rate": 0.0
},
"messages_ready": 32,
"messages_ready_details": {
"rate": 0.0
},
"messages_unacknowledged": 27,
"messages_unacknowledged_details": {
"rate": 0.0
}
},
"listeners": [
{
"name": "rabbit@node-a",
"protocol": "amqp"
},
{
"name": "rabbit@node-b",
"protocol": "amqp"
},
{
"name": "rabbit@node-a",
"protocol": "clustering"
},
{
"name": "rabbit@node-b",
"protocol": "clustering"
}
]
}

View File

@ -0,0 +1,114 @@
[
{
"messages_details": {
"rate": 0.0
},
"messages": 44,
"messages_unacknowledged_details": {
"rate": 0.0
},
"messages_unacknowledged": 44,
"messages_ready_details": {
"rate": 0.0
},
"messages_ready": 32,
"reductions_details": {
"rate": 223.0
},
"reductions": 15875433,
"message_stats": {
"deliver_get_details": {
"rate": 0.2
},
"deliver_get": 3457,
"ack_details": {
"rate": 9.9
},
"ack": 3457,
"redeliver_details": {
"rate": 2.5
},
"redeliver": 33,
"deliver_no_ack_details": {
"rate": 0.0
},
"deliver_no_ack": 0,
"deliver_details": {
"rate": 333.4
},
"deliver": 22222,
"get_no_ack_details": {
"rate": 0.0
},
"get_no_ack": 0,
"get_details": {
"rate": 0.0
},
"get": 0,
"publish_details": {
"rate": 11.2
},
"publish": 3457
},
"node": "rabbit@rmqlocal-0.rmqlocal.ankorabbitstatefulset3.svc.cluster.local",
"arguments": {
"x-expires": 1800000,
"x-ha-policy": "all"
},
"exclusive": false,
"auto_delete": false,
"durable": false,
"vhost": "sorandomsorandom",
"name": "reply_a716f0523cd44941ad2ea6ce4a3869c3",
"message_bytes_paged_out": 0,
"messages_paged_out": 0,
"idle_since": "2015-11-01 8:22:14",
"backing_queue_status": {
"avg_ack_egress_rate": 0.2374460025857711,
"avg_ack_ingress_rate": 0.2374460025857711,
"avg_egress_rate": 0.2374460025857711,
"avg_ingress_rate": 0.2374460025857711,
"delta": [
"delta",
"undefined",
0,
0,
"undefined"
],
"len": 0,
"mode": "default",
"next_seq_id": 3457,
"q1": 0,
"q2": 0,
"q3": 0,
"q4": 0,
"target_ram_count": 0
},
"head_message_timestamp": null,
"message_bytes_persistent": 7,
"message_bytes_ram": 6,
"message_bytes_unacknowledged": 5,
"message_bytes_ready": 4,
"message_bytes": 3,
"messages_persistent": 0,
"messages_unacknowledged_ram": 0,
"messages_ready_ram": 0,
"messages_ram": 0,
"garbage_collection": {
"minor_gcs": 314,
"fullsweep_after": 65535,
"min_heap_size": 233,
"min_bin_vheap_size": 46422,
"max_heap_size": 0
},
"state": "running",
"recoverable_slaves": null,
"memory": 143776,
"consumer_utilisation": 1.0,
"consumers": 3,
"exclusive_consumer_tag": null,
"effective_policy_definition": [],
"operator_policy": null,
"policy": null
}
]