Report rabbitmq_node measurement and return on gather error (#6819)

This commit is contained in:
Daniel Nelson 2020-01-03 11:38:20 -08:00 committed by GitHub
parent 7e498ede6d
commit 0cf94cfe54
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 81 additions and 103 deletions

View File

@ -448,134 +448,112 @@ func gatherOverview(r *RabbitMQ, acc telegraf.Accumulator) {
}
func gatherNodes(r *RabbitMQ, acc telegraf.Accumulator) {
allNodes := make([]Node, 0)
// Gather information about nodes
allNodes := make([]*Node, 0)
err := r.requestJSON("/api/nodes", &allNodes)
if err != nil {
acc.AddError(err)
return
}
nodes := make(map[string]Node)
nodes := allNodes[:0]
for _, node := range allNodes {
if r.shouldGatherNode(node) {
nodes[node.Name] = node
nodes = append(nodes, node)
}
}
numberNodes := len(nodes)
if numberNodes == 0 {
return
}
type NodeCheck struct {
NodeName string
HealthCheck HealthCheck
Memory *Memory
}
nodeChecksChannel := make(chan NodeCheck, numberNodes)
var wg sync.WaitGroup
for _, node := range nodes {
go func(nodeName string, healthChecksChannel chan NodeCheck) {
var healthCheck HealthCheck
var memoryresponse MemoryResponse
wg.Add(1)
go func(node *Node) {
defer wg.Done()
err := r.requestJSON("/api/healthchecks/node/"+nodeName, &healthCheck)
nodeCheck := NodeCheck{
NodeName: nodeName,
HealthCheck: healthCheck,
tags := map[string]string{"url": r.URL}
tags["node"] = node.Name
fields := map[string]interface{}{
"disk_free": node.DiskFree,
"disk_free_limit": node.DiskFreeLimit,
"disk_free_alarm": boolToInt(node.DiskFreeAlarm),
"fd_total": node.FdTotal,
"fd_used": node.FdUsed,
"mem_limit": node.MemLimit,
"mem_used": node.MemUsed,
"mem_alarm": boolToInt(node.MemAlarm),
"proc_total": node.ProcTotal,
"proc_used": node.ProcUsed,
"run_queue": node.RunQueue,
"sockets_total": node.SocketsTotal,
"sockets_used": node.SocketsUsed,
"uptime": node.Uptime,
"mnesia_disk_tx_count": node.MnesiaDiskTxCount,
"mnesia_disk_tx_count_rate": node.MnesiaDiskTxCountDetails.Rate,
"mnesia_ram_tx_count": node.MnesiaRamTxCount,
"mnesia_ram_tx_count_rate": node.MnesiaRamTxCountDetails.Rate,
"gc_num": node.GcNum,
"gc_num_rate": node.GcNumDetails.Rate,
"gc_bytes_reclaimed": node.GcBytesReclaimed,
"gc_bytes_reclaimed_rate": node.GcBytesReclaimedDetails.Rate,
"io_read_avg_time": node.IoReadAvgTime,
"io_read_avg_time_rate": node.IoReadAvgTimeDetails.Rate,
"io_read_bytes": node.IoReadBytes,
"io_read_bytes_rate": node.IoReadBytesDetails.Rate,
"io_write_avg_time": node.IoWriteAvgTime,
"io_write_avg_time_rate": node.IoWriteAvgTimeDetails.Rate,
"io_write_bytes": node.IoWriteBytes,
"io_write_bytes_rate": node.IoWriteBytesDetails.Rate,
"running": boolToInt(node.Running),
}
var health HealthCheck
err := r.requestJSON("/api/healthchecks/node/"+node.Name, &health)
if err != nil {
acc.AddError(err)
return
}
err = r.requestJSON("/api/nodes/"+nodeName+"/memory", &memoryresponse)
nodeCheck.Memory = memoryresponse.Memory
if health.Status == "ok" {
fields["health_check_status"] = int64(1)
} else {
fields["health_check_status"] = int64(0)
}
var memory MemoryResponse
err = r.requestJSON("/api/nodes/"+node.Name+"/memory", &memory)
if err != nil {
acc.AddError(err)
return
}
nodeChecksChannel <- nodeCheck
}(node.Name, nodeChecksChannel)
if memory.Memory != nil {
fields["mem_connection_readers"] = memory.Memory.ConnectionReaders
fields["mem_connection_writers"] = memory.Memory.ConnectionWriters
fields["mem_connection_channels"] = memory.Memory.ConnectionChannels
fields["mem_connection_other"] = memory.Memory.ConnectionOther
fields["mem_queue_procs"] = memory.Memory.QueueProcs
fields["mem_queue_slave_procs"] = memory.Memory.QueueSlaveProcs
fields["mem_plugins"] = memory.Memory.Plugins
fields["mem_other_proc"] = memory.Memory.OtherProc
fields["mem_metrics"] = memory.Memory.Metrics
fields["mem_mgmt_db"] = memory.Memory.MgmtDb
fields["mem_mnesia"] = memory.Memory.Mnesia
fields["mem_other_ets"] = memory.Memory.OtherEts
fields["mem_binary"] = memory.Memory.Binary
fields["mem_msg_index"] = memory.Memory.MsgIndex
fields["mem_code"] = memory.Memory.Code
fields["mem_atom"] = memory.Memory.Atom
fields["mem_other_system"] = memory.Memory.OtherSystem
fields["mem_allocated_unused"] = memory.Memory.AllocatedUnused
fields["mem_reserved_unallocated"] = memory.Memory.ReservedUnallocated
fields["mem_total"] = memory.Memory.Total
}
acc.AddFields("rabbitmq_node", fields, tags)
}(node)
}
now := time.Now()
for i := 0; i < len(nodes); i++ {
nodeCheck := <-nodeChecksChannel
var healthCheckStatus int64 = 0
if nodeCheck.HealthCheck.Status == "ok" {
healthCheckStatus = 1
}
node := nodes[nodeCheck.NodeName]
tags := map[string]string{"url": r.URL}
tags["node"] = node.Name
fields := map[string]interface{}{
"disk_free": node.DiskFree,
"disk_free_limit": node.DiskFreeLimit,
"disk_free_alarm": boolToInt(node.DiskFreeAlarm),
"fd_total": node.FdTotal,
"fd_used": node.FdUsed,
"mem_limit": node.MemLimit,
"mem_used": node.MemUsed,
"mem_alarm": boolToInt(node.MemAlarm),
"proc_total": node.ProcTotal,
"proc_used": node.ProcUsed,
"run_queue": node.RunQueue,
"sockets_total": node.SocketsTotal,
"sockets_used": node.SocketsUsed,
"uptime": node.Uptime,
"mnesia_disk_tx_count": node.MnesiaDiskTxCount,
"mnesia_disk_tx_count_rate": node.MnesiaDiskTxCountDetails.Rate,
"mnesia_ram_tx_count": node.MnesiaRamTxCount,
"mnesia_ram_tx_count_rate": node.MnesiaRamTxCountDetails.Rate,
"gc_num": node.GcNum,
"gc_num_rate": node.GcNumDetails.Rate,
"gc_bytes_reclaimed": node.GcBytesReclaimed,
"gc_bytes_reclaimed_rate": node.GcBytesReclaimedDetails.Rate,
"io_read_avg_time": node.IoReadAvgTime,
"io_read_avg_time_rate": node.IoReadAvgTimeDetails.Rate,
"io_read_bytes": node.IoReadBytes,
"io_read_bytes_rate": node.IoReadBytesDetails.Rate,
"io_write_avg_time": node.IoWriteAvgTime,
"io_write_avg_time_rate": node.IoWriteAvgTimeDetails.Rate,
"io_write_bytes": node.IoWriteBytes,
"io_write_bytes_rate": node.IoWriteBytesDetails.Rate,
"running": boolToInt(node.Running),
"health_check_status": healthCheckStatus,
}
if nodeCheck.Memory != nil {
fields["mem_connection_readers"] = nodeCheck.Memory.ConnectionReaders
fields["mem_connection_writers"] = nodeCheck.Memory.ConnectionWriters
fields["mem_connection_channels"] = nodeCheck.Memory.ConnectionChannels
fields["mem_connection_other"] = nodeCheck.Memory.ConnectionOther
fields["mem_queue_procs"] = nodeCheck.Memory.QueueProcs
fields["mem_queue_slave_procs"] = nodeCheck.Memory.QueueSlaveProcs
fields["mem_plugins"] = nodeCheck.Memory.Plugins
fields["mem_other_proc"] = nodeCheck.Memory.OtherProc
fields["mem_metrics"] = nodeCheck.Memory.Metrics
fields["mem_mgmt_db"] = nodeCheck.Memory.MgmtDb
fields["mem_mnesia"] = nodeCheck.Memory.Mnesia
fields["mem_other_ets"] = nodeCheck.Memory.OtherEts
fields["mem_binary"] = nodeCheck.Memory.Binary
fields["mem_msg_index"] = nodeCheck.Memory.MsgIndex
fields["mem_code"] = nodeCheck.Memory.Code
fields["mem_atom"] = nodeCheck.Memory.Atom
fields["mem_other_system"] = nodeCheck.Memory.OtherSystem
fields["mem_allocated_unused"] = nodeCheck.Memory.AllocatedUnused
fields["mem_reserved_unallocated"] = nodeCheck.Memory.ReservedUnallocated
fields["mem_total"] = nodeCheck.Memory.Total
}
acc.AddFields("rabbitmq_node", fields, tags, now)
}
wg.Wait()
}
func gatherQueues(r *RabbitMQ, acc telegraf.Accumulator) {
@ -718,7 +696,7 @@ func gatherFederationLinks(r *RabbitMQ, acc telegraf.Accumulator) {
}
}
func (r *RabbitMQ) shouldGatherNode(node Node) bool {
func (r *RabbitMQ) shouldGatherNode(node *Node) bool {
if len(r.Nodes) == 0 {
return true
}