From 0cf94cfe54b7150eda58877d96cf6145f09ab6c1 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Fri, 3 Jan 2020 11:38:20 -0800 Subject: [PATCH] Report rabbitmq_node measurement and return on gather error (#6819) --- plugins/inputs/rabbitmq/rabbitmq.go | 184 ++++++++++++---------------- 1 file changed, 81 insertions(+), 103 deletions(-) diff --git a/plugins/inputs/rabbitmq/rabbitmq.go b/plugins/inputs/rabbitmq/rabbitmq.go index 199b24922..d27c522bf 100644 --- a/plugins/inputs/rabbitmq/rabbitmq.go +++ b/plugins/inputs/rabbitmq/rabbitmq.go @@ -448,134 +448,112 @@ func gatherOverview(r *RabbitMQ, acc telegraf.Accumulator) { } func gatherNodes(r *RabbitMQ, acc telegraf.Accumulator) { - allNodes := make([]Node, 0) - // Gather information about nodes + allNodes := make([]*Node, 0) + err := r.requestJSON("/api/nodes", &allNodes) if err != nil { acc.AddError(err) return } - nodes := make(map[string]Node) + nodes := allNodes[:0] for _, node := range allNodes { if r.shouldGatherNode(node) { - nodes[node.Name] = node + nodes = append(nodes, node) } } - numberNodes := len(nodes) - if numberNodes == 0 { - return - } - - type NodeCheck struct { - NodeName string - HealthCheck HealthCheck - Memory *Memory - } - - nodeChecksChannel := make(chan NodeCheck, numberNodes) - + var wg sync.WaitGroup for _, node := range nodes { - go func(nodeName string, healthChecksChannel chan NodeCheck) { - var healthCheck HealthCheck - var memoryresponse MemoryResponse + wg.Add(1) + go func(node *Node) { + defer wg.Done() - err := r.requestJSON("/api/healthchecks/node/"+nodeName, &healthCheck) - nodeCheck := NodeCheck{ - NodeName: nodeName, - HealthCheck: healthCheck, + tags := map[string]string{"url": r.URL} + tags["node"] = node.Name + + fields := map[string]interface{}{ + "disk_free": node.DiskFree, + "disk_free_limit": node.DiskFreeLimit, + "disk_free_alarm": boolToInt(node.DiskFreeAlarm), + "fd_total": node.FdTotal, + "fd_used": node.FdUsed, + "mem_limit": node.MemLimit, + "mem_used": node.MemUsed, + "mem_alarm": boolToInt(node.MemAlarm), + "proc_total": node.ProcTotal, + "proc_used": node.ProcUsed, + "run_queue": node.RunQueue, + "sockets_total": node.SocketsTotal, + "sockets_used": node.SocketsUsed, + "uptime": node.Uptime, + "mnesia_disk_tx_count": node.MnesiaDiskTxCount, + "mnesia_disk_tx_count_rate": node.MnesiaDiskTxCountDetails.Rate, + "mnesia_ram_tx_count": node.MnesiaRamTxCount, + "mnesia_ram_tx_count_rate": node.MnesiaRamTxCountDetails.Rate, + "gc_num": node.GcNum, + "gc_num_rate": node.GcNumDetails.Rate, + "gc_bytes_reclaimed": node.GcBytesReclaimed, + "gc_bytes_reclaimed_rate": node.GcBytesReclaimedDetails.Rate, + "io_read_avg_time": node.IoReadAvgTime, + "io_read_avg_time_rate": node.IoReadAvgTimeDetails.Rate, + "io_read_bytes": node.IoReadBytes, + "io_read_bytes_rate": node.IoReadBytesDetails.Rate, + "io_write_avg_time": node.IoWriteAvgTime, + "io_write_avg_time_rate": node.IoWriteAvgTimeDetails.Rate, + "io_write_bytes": node.IoWriteBytes, + "io_write_bytes_rate": node.IoWriteBytesDetails.Rate, + "running": boolToInt(node.Running), } + + var health HealthCheck + err := r.requestJSON("/api/healthchecks/node/"+node.Name, &health) if err != nil { acc.AddError(err) return } - err = r.requestJSON("/api/nodes/"+nodeName+"/memory", &memoryresponse) - nodeCheck.Memory = memoryresponse.Memory + if health.Status == "ok" { + fields["health_check_status"] = int64(1) + } else { + fields["health_check_status"] = int64(0) + } + + var memory MemoryResponse + err = r.requestJSON("/api/nodes/"+node.Name+"/memory", &memory) if err != nil { acc.AddError(err) return } - nodeChecksChannel <- nodeCheck - }(node.Name, nodeChecksChannel) + if memory.Memory != nil { + fields["mem_connection_readers"] = memory.Memory.ConnectionReaders + fields["mem_connection_writers"] = memory.Memory.ConnectionWriters + fields["mem_connection_channels"] = memory.Memory.ConnectionChannels + fields["mem_connection_other"] = memory.Memory.ConnectionOther + fields["mem_queue_procs"] = memory.Memory.QueueProcs + fields["mem_queue_slave_procs"] = memory.Memory.QueueSlaveProcs + fields["mem_plugins"] = memory.Memory.Plugins + fields["mem_other_proc"] = memory.Memory.OtherProc + fields["mem_metrics"] = memory.Memory.Metrics + fields["mem_mgmt_db"] = memory.Memory.MgmtDb + fields["mem_mnesia"] = memory.Memory.Mnesia + fields["mem_other_ets"] = memory.Memory.OtherEts + fields["mem_binary"] = memory.Memory.Binary + fields["mem_msg_index"] = memory.Memory.MsgIndex + fields["mem_code"] = memory.Memory.Code + fields["mem_atom"] = memory.Memory.Atom + fields["mem_other_system"] = memory.Memory.OtherSystem + fields["mem_allocated_unused"] = memory.Memory.AllocatedUnused + fields["mem_reserved_unallocated"] = memory.Memory.ReservedUnallocated + fields["mem_total"] = memory.Memory.Total + } + + acc.AddFields("rabbitmq_node", fields, tags) + }(node) } - now := time.Now() - - for i := 0; i < len(nodes); i++ { - nodeCheck := <-nodeChecksChannel - - var healthCheckStatus int64 = 0 - - if nodeCheck.HealthCheck.Status == "ok" { - healthCheckStatus = 1 - } - - node := nodes[nodeCheck.NodeName] - - tags := map[string]string{"url": r.URL} - tags["node"] = node.Name - - fields := map[string]interface{}{ - "disk_free": node.DiskFree, - "disk_free_limit": node.DiskFreeLimit, - "disk_free_alarm": boolToInt(node.DiskFreeAlarm), - "fd_total": node.FdTotal, - "fd_used": node.FdUsed, - "mem_limit": node.MemLimit, - "mem_used": node.MemUsed, - "mem_alarm": boolToInt(node.MemAlarm), - "proc_total": node.ProcTotal, - "proc_used": node.ProcUsed, - "run_queue": node.RunQueue, - "sockets_total": node.SocketsTotal, - "sockets_used": node.SocketsUsed, - "uptime": node.Uptime, - "mnesia_disk_tx_count": node.MnesiaDiskTxCount, - "mnesia_disk_tx_count_rate": node.MnesiaDiskTxCountDetails.Rate, - "mnesia_ram_tx_count": node.MnesiaRamTxCount, - "mnesia_ram_tx_count_rate": node.MnesiaRamTxCountDetails.Rate, - "gc_num": node.GcNum, - "gc_num_rate": node.GcNumDetails.Rate, - "gc_bytes_reclaimed": node.GcBytesReclaimed, - "gc_bytes_reclaimed_rate": node.GcBytesReclaimedDetails.Rate, - "io_read_avg_time": node.IoReadAvgTime, - "io_read_avg_time_rate": node.IoReadAvgTimeDetails.Rate, - "io_read_bytes": node.IoReadBytes, - "io_read_bytes_rate": node.IoReadBytesDetails.Rate, - "io_write_avg_time": node.IoWriteAvgTime, - "io_write_avg_time_rate": node.IoWriteAvgTimeDetails.Rate, - "io_write_bytes": node.IoWriteBytes, - "io_write_bytes_rate": node.IoWriteBytesDetails.Rate, - "running": boolToInt(node.Running), - "health_check_status": healthCheckStatus, - } - if nodeCheck.Memory != nil { - fields["mem_connection_readers"] = nodeCheck.Memory.ConnectionReaders - fields["mem_connection_writers"] = nodeCheck.Memory.ConnectionWriters - fields["mem_connection_channels"] = nodeCheck.Memory.ConnectionChannels - fields["mem_connection_other"] = nodeCheck.Memory.ConnectionOther - fields["mem_queue_procs"] = nodeCheck.Memory.QueueProcs - fields["mem_queue_slave_procs"] = nodeCheck.Memory.QueueSlaveProcs - fields["mem_plugins"] = nodeCheck.Memory.Plugins - fields["mem_other_proc"] = nodeCheck.Memory.OtherProc - fields["mem_metrics"] = nodeCheck.Memory.Metrics - fields["mem_mgmt_db"] = nodeCheck.Memory.MgmtDb - fields["mem_mnesia"] = nodeCheck.Memory.Mnesia - fields["mem_other_ets"] = nodeCheck.Memory.OtherEts - fields["mem_binary"] = nodeCheck.Memory.Binary - fields["mem_msg_index"] = nodeCheck.Memory.MsgIndex - fields["mem_code"] = nodeCheck.Memory.Code - fields["mem_atom"] = nodeCheck.Memory.Atom - fields["mem_other_system"] = nodeCheck.Memory.OtherSystem - fields["mem_allocated_unused"] = nodeCheck.Memory.AllocatedUnused - fields["mem_reserved_unallocated"] = nodeCheck.Memory.ReservedUnallocated - fields["mem_total"] = nodeCheck.Memory.Total - } - acc.AddFields("rabbitmq_node", fields, tags, now) - } + wg.Wait() } func gatherQueues(r *RabbitMQ, acc telegraf.Accumulator) { @@ -718,7 +696,7 @@ func gatherFederationLinks(r *RabbitMQ, acc telegraf.Accumulator) { } } -func (r *RabbitMQ) shouldGatherNode(node Node) bool { +func (r *RabbitMQ) shouldGatherNode(node *Node) bool { if len(r.Nodes) == 0 { return true }