Add RabbitMQ cluster and running nodes count and running node status (#3703)
This commit is contained in:
parent
ff63421d0e
commit
782b1336e3
|
@ -61,6 +61,8 @@ For additional details reference the [RabbitMQ Management HTTP Stats](https://cd
|
||||||
- messages_ready (int, messages)
|
- messages_ready (int, messages)
|
||||||
- messages_unacked (int, messages)
|
- messages_unacked (int, messages)
|
||||||
- queues (int, queues)
|
- queues (int, queues)
|
||||||
|
- clustering_listeners (int, cluster nodes)
|
||||||
|
- amqp_listeners (int, amqp nodes up)
|
||||||
|
|
||||||
- rabbitmq_node
|
- rabbitmq_node
|
||||||
- disk_free (int, bytes)
|
- disk_free (int, bytes)
|
||||||
|
@ -74,6 +76,7 @@ For additional details reference the [RabbitMQ Management HTTP Stats](https://cd
|
||||||
- run_queue (int, erlang processes)
|
- run_queue (int, erlang processes)
|
||||||
- sockets_total (int, sockets)
|
- sockets_total (int, sockets)
|
||||||
- sockets_used (int, sockets)
|
- sockets_used (int, sockets)
|
||||||
|
- running (int, node up)
|
||||||
|
|
||||||
- rabbitmq_queue
|
- rabbitmq_queue
|
||||||
- consumer_utilisation (float, percent)
|
- consumer_utilisation (float, percent)
|
||||||
|
@ -144,7 +147,7 @@ FROM rabbitmq_overview WHERE time > now() - 10m GROUP BY time(1m)
|
||||||
|
|
||||||
```
|
```
|
||||||
rabbitmq_queue,url=http://amqp.example.org:15672,queue=telegraf,vhost=influxdb,node=rabbit@amqp.example.org,durable=true,auto_delete=false,host=amqp.example.org messages_deliver_get=0i,messages_publish=329i,messages_publish_rate=0.2,messages_redeliver_rate=0,message_bytes_ready=0i,message_bytes_unacked=0i,messages_deliver=329i,messages_unack=0i,consumers=1i,idle_since="",messages=0i,messages_deliver_rate=0.2,messages_deliver_get_rate=0.2,messages_redeliver=0i,memory=43032i,message_bytes_ram=0i,messages_ack=329i,messages_ready=0i,messages_ack_rate=0.2,consumer_utilisation=1,message_bytes=0i,message_bytes_persist=0i 1493684035000000000
|
rabbitmq_queue,url=http://amqp.example.org:15672,queue=telegraf,vhost=influxdb,node=rabbit@amqp.example.org,durable=true,auto_delete=false,host=amqp.example.org messages_deliver_get=0i,messages_publish=329i,messages_publish_rate=0.2,messages_redeliver_rate=0,message_bytes_ready=0i,message_bytes_unacked=0i,messages_deliver=329i,messages_unack=0i,consumers=1i,idle_since="",messages=0i,messages_deliver_rate=0.2,messages_deliver_get_rate=0.2,messages_redeliver=0i,memory=43032i,message_bytes_ram=0i,messages_ack=329i,messages_ready=0i,messages_ack_rate=0.2,consumer_utilisation=1,message_bytes=0i,message_bytes_persist=0i 1493684035000000000
|
||||||
rabbitmq_overview,url=http://amqp.example.org:15672,host=amqp.example.org channels=2i,consumers=1i,exchanges=17i,messages_acked=329i,messages=0i,messages_ready=0i,messages_unacked=0i,connections=2i,queues=1i,messages_delivered=329i,messages_published=329i 1493684035000000000
|
rabbitmq_overview,url=http://amqp.example.org:15672,host=amqp.example.org channels=2i,consumers=1i,exchanges=17i,messages_acked=329i,messages=0i,messages_ready=0i,messages_unacked=0i,connections=2i,queues=1i,messages_delivered=329i,messages_published=329i,clustering_listeners=2i,amqp_listeners=1i 1493684035000000000
|
||||||
rabbitmq_node,url=http://amqp.example.org:15672,node=rabbit@amqp.example.org,host=amqp.example.org fd_total=1024i,fd_used=32i,mem_limit=8363329126i,sockets_total=829i,disk_free=8175935488i,disk_free_limit=50000000i,mem_used=58771080i,proc_total=1048576i,proc_used=267i,run_queue=0i,sockets_used=2i 149368403500000000
|
rabbitmq_node,url=http://amqp.example.org:15672,node=rabbit@amqp.example.org,host=amqp.example.org fd_total=1024i,fd_used=32i,mem_limit=8363329126i,sockets_total=829i,disk_free=8175935488i,disk_free_limit=50000000i,mem_used=58771080i,proc_total=1048576i,proc_used=267i,run_queue=0i,sockets_used=2i,running=1i 149368403500000000
|
||||||
rabbitmq_exchange,url=http://amqp.example.org:15672,exchange=telegraf,type=fanout,vhost=influxdb,internal=false,durable=true,auto_delete=false,host=amqp.example.org messages_publish_in=2i,messages_publish_out=1i 149368403500000000
|
rabbitmq_exchange,url=http://amqp.example.org:15672,exchange=telegraf,type=fanout,vhost=influxdb,internal=false,durable=true,auto_delete=false,host=amqp.example.org messages_publish_in=2i,messages_publish_out=1i 149368403500000000
|
||||||
```
|
```
|
||||||
|
|
|
@ -60,6 +60,12 @@ type OverviewResponse struct {
|
||||||
MessageStats *MessageStats `json:"message_stats"`
|
MessageStats *MessageStats `json:"message_stats"`
|
||||||
ObjectTotals *ObjectTotals `json:"object_totals"`
|
ObjectTotals *ObjectTotals `json:"object_totals"`
|
||||||
QueueTotals *QueueTotals `json:"queue_totals"`
|
QueueTotals *QueueTotals `json:"queue_totals"`
|
||||||
|
Listeners []Listeners `json:"listeners"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Listeners ...
|
||||||
|
type Listeners struct {
|
||||||
|
Protocol string `json:"protocol"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// Details ...
|
// Details ...
|
||||||
|
@ -134,6 +140,7 @@ type Node struct {
|
||||||
RunQueue int64 `json:"run_queue"`
|
RunQueue int64 `json:"run_queue"`
|
||||||
SocketsTotal int64 `json:"sockets_total"`
|
SocketsTotal int64 `json:"sockets_total"`
|
||||||
SocketsUsed int64 `json:"sockets_used"`
|
SocketsUsed int64 `json:"sockets_used"`
|
||||||
|
Running bool `json:"running"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type Exchange struct {
|
type Exchange struct {
|
||||||
|
@ -186,7 +193,7 @@ var sampleConfig = `
|
||||||
# queues = ["telegraf"]
|
# queues = ["telegraf"]
|
||||||
|
|
||||||
## A list of exchanges to gather as the rabbitmq_exchange measurement. If not
|
## A list of exchanges to gather as the rabbitmq_exchange measurement. If not
|
||||||
## specified, metrics for all exchanges are gathered.
|
## specified, metrics for all exchanges are gathered.
|
||||||
# exchanges = ["telegraf"]
|
# exchanges = ["telegraf"]
|
||||||
`
|
`
|
||||||
|
|
||||||
|
@ -275,11 +282,20 @@ func gatherOverview(r *RabbitMQ, acc telegraf.Accumulator) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if overview.QueueTotals == nil || overview.ObjectTotals == nil || overview.MessageStats == nil {
|
if overview.QueueTotals == nil || overview.ObjectTotals == nil || overview.MessageStats == nil || overview.Listeners == nil {
|
||||||
acc.AddError(fmt.Errorf("Wrong answer from rabbitmq. Probably auth issue"))
|
acc.AddError(fmt.Errorf("Wrong answer from rabbitmq. Probably auth issue"))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var clustering_listeners, amqp_listeners int64 = 0, 0
|
||||||
|
for _, listener := range overview.Listeners {
|
||||||
|
if listener.Protocol == "clustering" {
|
||||||
|
clustering_listeners++
|
||||||
|
} else if listener.Protocol == "amqp" {
|
||||||
|
amqp_listeners++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
tags := map[string]string{"url": r.URL}
|
tags := map[string]string{"url": r.URL}
|
||||||
if r.Name != "" {
|
if r.Name != "" {
|
||||||
tags["name"] = r.Name
|
tags["name"] = r.Name
|
||||||
|
@ -297,6 +313,8 @@ func gatherOverview(r *RabbitMQ, acc telegraf.Accumulator) {
|
||||||
"messages_delivered": overview.MessageStats.Deliver,
|
"messages_delivered": overview.MessageStats.Deliver,
|
||||||
"messages_delivered_get": overview.MessageStats.DeliverGet,
|
"messages_delivered_get": overview.MessageStats.DeliverGet,
|
||||||
"messages_published": overview.MessageStats.Publish,
|
"messages_published": overview.MessageStats.Publish,
|
||||||
|
"clustering_listeners": clustering_listeners,
|
||||||
|
"amqp_listeners": amqp_listeners,
|
||||||
}
|
}
|
||||||
acc.AddFields("rabbitmq_overview", fields, tags)
|
acc.AddFields("rabbitmq_overview", fields, tags)
|
||||||
}
|
}
|
||||||
|
@ -319,6 +337,11 @@ func gatherNodes(r *RabbitMQ, acc telegraf.Accumulator) {
|
||||||
tags := map[string]string{"url": r.URL}
|
tags := map[string]string{"url": r.URL}
|
||||||
tags["node"] = node.Name
|
tags["node"] = node.Name
|
||||||
|
|
||||||
|
var running int64 = 0
|
||||||
|
if node.Running {
|
||||||
|
running = 1
|
||||||
|
}
|
||||||
|
|
||||||
fields := map[string]interface{}{
|
fields := map[string]interface{}{
|
||||||
"disk_free": node.DiskFree,
|
"disk_free": node.DiskFree,
|
||||||
"disk_free_limit": node.DiskFreeLimit,
|
"disk_free_limit": node.DiskFreeLimit,
|
||||||
|
@ -331,6 +354,7 @@ func gatherNodes(r *RabbitMQ, acc telegraf.Accumulator) {
|
||||||
"run_queue": node.RunQueue,
|
"run_queue": node.RunQueue,
|
||||||
"sockets_total": node.SocketsTotal,
|
"sockets_total": node.SocketsTotal,
|
||||||
"sockets_used": node.SocketsUsed,
|
"sockets_used": node.SocketsUsed,
|
||||||
|
"running": running,
|
||||||
}
|
}
|
||||||
acc.AddFields("rabbitmq_node", fields, tags, now)
|
acc.AddFields("rabbitmq_node", fields, tags, now)
|
||||||
}
|
}
|
||||||
|
|
|
@ -51,7 +51,25 @@ const sampleOverviewResponse = `
|
||||||
"messages_unacknowledged_details": {
|
"messages_unacknowledged_details": {
|
||||||
"rate": 0.0
|
"rate": 0.0
|
||||||
}
|
}
|
||||||
}
|
},
|
||||||
|
"listeners": [
|
||||||
|
{
|
||||||
|
"name": "rabbit@node-a",
|
||||||
|
"protocol": "amqp"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "rabbit@node-b",
|
||||||
|
"protocol": "amqp"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "rabbit@node-a",
|
||||||
|
"protocol": "clustering"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "rabbit@node-b",
|
||||||
|
"protocol": "clustering"
|
||||||
|
}
|
||||||
|
]
|
||||||
}
|
}
|
||||||
`
|
`
|
||||||
|
|
||||||
|
@ -514,6 +532,8 @@ func TestRabbitMQGeneratesMetrics(t *testing.T) {
|
||||||
"consumers",
|
"consumers",
|
||||||
"exchanges",
|
"exchanges",
|
||||||
"queues",
|
"queues",
|
||||||
|
"clustering_listeners",
|
||||||
|
"amqp_listeners",
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, metric := range intMetrics {
|
for _, metric := range intMetrics {
|
||||||
|
@ -532,6 +552,7 @@ func TestRabbitMQGeneratesMetrics(t *testing.T) {
|
||||||
"run_queue",
|
"run_queue",
|
||||||
"sockets_total",
|
"sockets_total",
|
||||||
"sockets_used",
|
"sockets_used",
|
||||||
|
"running",
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, metric := range nodeIntMetrics {
|
for _, metric := range nodeIntMetrics {
|
||||||
|
|
Loading…
Reference in New Issue