Improvement of RabbitMQ plugin #3025 #3252

* new metrics:
  * unroutable messages
  * node uptime
  * gc metrics
  * mnesia metrics
  * node healthcheck
  * IO metrics
* refactoring tests:
  * moved the JSON examples to separate files
  * check metric values

Signed-off-by: Vitalii Solodilov <mcdkr@yandex.ru>
This commit is contained in:
Vitalii Solodilov
2018-06-19 11:19:23 +04:00
parent b66eb2fec7
commit 6c4032071f
8 changed files with 579 additions and 580 deletions

View File

@@ -72,23 +72,27 @@ type Listeners struct {
// Details carries the "rate" value found in the *_details JSON objects
// (for example "ack_details") that accompany a counter.
type Details struct {
Rate float64
Rate float64 `json:"rate"`
}
// MessageStats groups message counters (ack, deliver, publish, redeliver,
// publish in/out, unroutable returns) with the Details value decoded from
// each counter's matching *_details JSON key.
type MessageStats struct {
Ack int64
AckDetails Details `json:"ack_details"`
Deliver int64
DeliverDetails Details `json:"deliver_details"`
DeliverGet int64 `json:"deliver_get"`
DeliverGetDetails Details `json:"deliver_get_details"`
Publish int64
PublishDetails Details `json:"publish_details"`
Redeliver int64
RedeliverDetails Details `json:"redeliver_details"`
PublishIn int64 `json:"publish_in"`
PublishOut int64 `json:"publish_out"`
Ack int64
AckDetails Details `json:"ack_details"`
Deliver int64
DeliverDetails Details `json:"deliver_details"`
DeliverGet int64 `json:"deliver_get"`
DeliverGetDetails Details `json:"deliver_get_details"`
Publish int64
PublishDetails Details `json:"publish_details"`
Redeliver int64
RedeliverDetails Details `json:"redeliver_details"`
PublishIn int64 `json:"publish_in"`
PublishInDetails Details `json:"publish_in_details"`
PublishOut int64 `json:"publish_out"`
PublishOutDetails Details `json:"publish_out_details"`
ReturnUnroutable int64 `json:"return_unroutable"`
ReturnUnroutableDetails Details `json:"return_unroutable_details"`
}
// ObjectTotals ...
@@ -131,18 +135,37 @@ type Queue struct {
// Node describes one element of the list decoded from the "/api/nodes"
// response: disk, file-descriptor, memory, process and socket figures, alarm
// and running flags, uptime, and mnesia/GC/IO counters, each counter paired
// with the Details value from its *_details key.
type Node struct {
Name string
DiskFree int64 `json:"disk_free"`
DiskFreeLimit int64 `json:"disk_free_limit"`
FdTotal int64 `json:"fd_total"`
FdUsed int64 `json:"fd_used"`
MemLimit int64 `json:"mem_limit"`
MemUsed int64 `json:"mem_used"`
ProcTotal int64 `json:"proc_total"`
ProcUsed int64 `json:"proc_used"`
RunQueue int64 `json:"run_queue"`
SocketsTotal int64 `json:"sockets_total"`
SocketsUsed int64 `json:"sockets_used"`
Running bool `json:"running"`
DiskFree int64 `json:"disk_free"`
DiskFreeLimit int64 `json:"disk_free_limit"`
DiskFreeAlarm bool `json:"disk_free_alarm"`
FdTotal int64 `json:"fd_total"`
FdUsed int64 `json:"fd_used"`
MemLimit int64 `json:"mem_limit"`
MemUsed int64 `json:"mem_used"`
MemAlarm bool `json:"mem_alarm"`
ProcTotal int64 `json:"proc_total"`
ProcUsed int64 `json:"proc_used"`
RunQueue int64 `json:"run_queue"`
SocketsTotal int64 `json:"sockets_total"`
SocketsUsed int64 `json:"sockets_used"`
Running bool `json:"running"`
Uptime int64 `json:"uptime"`
MnesiaDiskTxCount int64 `json:"mnesia_disk_tx_count"`
MnesiaDiskTxCountDetails Details `json:"mnesia_disk_tx_count_details"`
MnesiaRamTxCount int64 `json:"mnesia_ram_tx_count"`
MnesiaRamTxCountDetails Details `json:"mnesia_ram_tx_count_details"`
GcNum int64 `json:"gc_num"`
GcNumDetails Details `json:"gc_num_details"`
GcBytesReclaimed int64 `json:"gc_bytes_reclaimed"`
GcBytesReclaimedDetails Details `json:"gc_bytes_reclaimed_details"`
IoReadAvgTime int64 `json:"io_read_avg_time"`
IoReadAvgTimeDetails Details `json:"io_read_avg_time_details"`
IoReadBytes int64 `json:"io_read_bytes"`
IoReadBytesDetails Details `json:"io_read_bytes_details"`
IoWriteAvgTime int64 `json:"io_write_avg_time"`
IoWriteAvgTimeDetails Details `json:"io_write_avg_time_details"`
IoWriteBytes int64 `json:"io_write_bytes"`
IoWriteBytesDetails Details `json:"io_write_bytes_details"`
}
type Exchange struct {
@@ -155,6 +178,10 @@ type Exchange struct {
AutoDelete bool `json:"auto_delete"`
}
// HealthCheck is the decoded body of a per-node
// "/api/healthchecks/node/<name>" request. gatherNodes reports a Status of
// "ok" as health_check_status 1 and anything else as 0.
type HealthCheck struct {
Status string `json:"status"`
}
// gatherFunc is the signature shared by the gather* collectors in this file
// (gatherOverview, gatherNodes, gatherExchanges, ...): each receives the
// plugin instance and the accumulator it reports fields and errors to.
type gatherFunc func(r *RabbitMQ, acc telegraf.Accumulator)
@@ -204,6 +231,13 @@ var sampleConfig = `
queue_name_exclude = []
`
// boolToInt maps a boolean onto an int64 so it can be emitted as a numeric
// field: true becomes 1 and false becomes 0.
func boolToInt(b bool) int64 {
	var n int64 // zero value covers the false case
	if b {
		n = 1
	}
	return n
}
// SampleConfig ...
func (r *RabbitMQ) SampleConfig() string {
return sampleConfig
@@ -302,12 +336,12 @@ func gatherOverview(r *RabbitMQ, acc telegraf.Accumulator) {
return
}
var clustering_listeners, amqp_listeners int64 = 0, 0
var clusteringListeners, amqpListeners int64 = 0, 0
for _, listener := range overview.Listeners {
if listener.Protocol == "clustering" {
clustering_listeners++
clusteringListeners++
} else if listener.Protocol == "amqp" {
amqp_listeners++
amqpListeners++
}
}
@@ -328,48 +362,109 @@ func gatherOverview(r *RabbitMQ, acc telegraf.Accumulator) {
"messages_delivered": overview.MessageStats.Deliver,
"messages_delivered_get": overview.MessageStats.DeliverGet,
"messages_published": overview.MessageStats.Publish,
"clustering_listeners": clustering_listeners,
"amqp_listeners": amqp_listeners,
"clustering_listeners": clusteringListeners,
"amqp_listeners": amqpListeners,
"return_unroutable": overview.MessageStats.ReturnUnroutable,
"return_unroutable_rate": overview.MessageStats.ReturnUnroutableDetails.Rate,
}
acc.AddFields("rabbitmq_overview", fields, tags)
}
func gatherNodes(r *RabbitMQ, acc telegraf.Accumulator) {
nodes := make([]Node, 0)
allNodes := make([]Node, 0)
// Gather information about nodes
err := r.requestJSON("/api/nodes", &nodes)
err := r.requestJSON("/api/nodes", &allNodes)
if err != nil {
acc.AddError(err)
return
}
now := time.Now()
nodes := make(map[string]Node)
for _, node := range allNodes {
if r.shouldGatherNode(node) {
nodes[node.Name] = node
}
}
numberNodes := len(nodes)
if numberNodes == 0 {
return
}
type NodeHealthCheck struct {
NodeName string
HealthCheck HealthCheck
Error error
}
healthChecksChannel := make(chan NodeHealthCheck, numberNodes)
for _, node := range nodes {
if !r.shouldGatherNode(node) {
continue
go func(nodeName string, healthChecksChannel chan NodeHealthCheck) {
var healthCheck HealthCheck
err := r.requestJSON("/api/healthchecks/node/"+nodeName, &healthCheck)
nodeHealthCheck := NodeHealthCheck{
NodeName: nodeName,
Error: err,
HealthCheck: healthCheck,
}
healthChecksChannel <- nodeHealthCheck
}(node.Name, healthChecksChannel)
}
now := time.Now()
for i := 0; i < len(nodes); i++ {
nodeHealthCheck := <-healthChecksChannel
var healthCheckStatus int64 = 0
if nodeHealthCheck.Error != nil {
acc.AddError(nodeHealthCheck.Error)
} else if nodeHealthCheck.HealthCheck.Status == "ok" {
healthCheckStatus = 1
}
node := nodes[nodeHealthCheck.NodeName]
tags := map[string]string{"url": r.URL}
tags["node"] = node.Name
var running int64 = 0
if node.Running {
running = 1
}
fields := map[string]interface{}{
"disk_free": node.DiskFree,
"disk_free_limit": node.DiskFreeLimit,
"fd_total": node.FdTotal,
"fd_used": node.FdUsed,
"mem_limit": node.MemLimit,
"mem_used": node.MemUsed,
"proc_total": node.ProcTotal,
"proc_used": node.ProcUsed,
"run_queue": node.RunQueue,
"sockets_total": node.SocketsTotal,
"sockets_used": node.SocketsUsed,
"running": running,
"disk_free": node.DiskFree,
"disk_free_limit": node.DiskFreeLimit,
"disk_free_alarm": boolToInt(node.DiskFreeAlarm),
"fd_total": node.FdTotal,
"fd_used": node.FdUsed,
"mem_limit": node.MemLimit,
"mem_used": node.MemUsed,
"mem_alarm": boolToInt(node.MemAlarm),
"proc_total": node.ProcTotal,
"proc_used": node.ProcUsed,
"run_queue": node.RunQueue,
"sockets_total": node.SocketsTotal,
"sockets_used": node.SocketsUsed,
"uptime": node.Uptime,
"mnesia_disk_tx_count": node.MnesiaDiskTxCount,
"mnesia_disk_tx_count_rate": node.MnesiaDiskTxCountDetails.Rate,
"mnesia_ram_tx_count": node.MnesiaRamTxCount,
"mnesia_ram_tx_count_rate": node.MnesiaRamTxCountDetails.Rate,
"gc_num": node.GcNum,
"gc_num_rate": node.GcNumDetails.Rate,
"gc_bytes_reclaimed": node.GcBytesReclaimed,
"gc_bytes_reclaimed_rate": node.GcBytesReclaimedDetails.Rate,
"io_read_avg_time": node.IoReadAvgTime,
"io_read_avg_time_rate": node.IoReadAvgTimeDetails.Rate,
"io_read_bytes": node.IoReadBytes,
"io_read_bytes_rate": node.IoReadBytesDetails.Rate,
"io_write_avg_time": node.IoWriteAvgTime,
"io_write_avg_time_rate": node.IoWriteAvgTimeDetails.Rate,
"io_write_bytes": node.IoWriteBytes,
"io_write_bytes_rate": node.IoWriteBytesDetails.Rate,
"running": boolToInt(node.Running),
"health_check_status": healthCheckStatus,
}
acc.AddFields("rabbitmq_node", fields, tags, now)
}
@@ -459,8 +554,10 @@ func gatherExchanges(r *RabbitMQ, acc telegraf.Accumulator) {
acc.AddFields(
"rabbitmq_exchange",
map[string]interface{}{
"messages_publish_in": exchange.MessageStats.PublishIn,
"messages_publish_out": exchange.MessageStats.PublishOut,
"messages_publish_in": exchange.MessageStats.PublishIn,
"messages_publish_in_rate": exchange.MessageStats.PublishInDetails.Rate,
"messages_publish_out": exchange.MessageStats.PublishOut,
"messages_publish_out_rate": exchange.MessageStats.PublishOutDetails.Rate,
},
tags,
)
@@ -487,11 +584,11 @@ func (r *RabbitMQ) createQueueFilter() error {
r.QueueInclude = append(r.QueueInclude, r.Queues...)
}
filter, err := filter.NewIncludeExcludeFilter(r.QueueInclude, r.QueueExclude)
queueFilter, err := filter.NewIncludeExcludeFilter(r.QueueInclude, r.QueueExclude)
if err != nil {
return err
}
r.queueFilter = filter
r.queueFilter = queueFilter
for _, q := range r.QueueExclude {
if q == "*" {