Add per node memory stats to rabbitmq input (#6326)
This commit is contained in:
committed by
Daniel Nelson
parent
80f38ae352
commit
32de8bb459
@@ -182,6 +182,35 @@ type HealthCheck struct {
|
||||
Status string `json:"status"`
|
||||
}
|
||||
|
||||
// MemoryResponse ...
|
||||
type MemoryResponse struct {
|
||||
Memory *Memory `json:"memory"`
|
||||
}
|
||||
|
||||
// Memory details
|
||||
type Memory struct {
|
||||
ConnectionReaders int64 `json:"connection_readers"`
|
||||
ConnectionWriters int64 `json:"connection_writers"`
|
||||
ConnectionChannels int64 `json:"connection_channels"`
|
||||
ConnectionOther int64 `json:"connection_other"`
|
||||
QueueProcs int64 `json:"queue_procs"`
|
||||
QueueSlaveProcs int64 `json:"queue_slave_procs"`
|
||||
Plugins int64 `json:"plugins"`
|
||||
OtherProc int64 `json:"other_proc"`
|
||||
Metrics int64 `json:"metrics"`
|
||||
MgmtDb int64 `json:"mgmt_db"`
|
||||
Mnesia int64 `json:"mnesia"`
|
||||
OtherEts int64 `json:"other_ets"`
|
||||
Binary int64 `json:"binary"`
|
||||
MsgIndex int64 `json:"msg_index"`
|
||||
Code int64 `json:"code"`
|
||||
Atom int64 `json:"atom"`
|
||||
OtherSystem int64 `json:"other_system"`
|
||||
AllocatedUnused int64 `json:"allocated_unused"`
|
||||
ReservedUnallocated int64 `json:"reserved_unallocated"`
|
||||
Total int64 `json:"total"`
|
||||
}
|
||||
|
||||
// gatherFunc ...
|
||||
type gatherFunc func(r *RabbitMQ, acc telegraf.Accumulator)
|
||||
|
||||
@@ -391,43 +420,52 @@ func gatherNodes(r *RabbitMQ, acc telegraf.Accumulator) {
|
||||
return
|
||||
}
|
||||
|
||||
type NodeHealthCheck struct {
|
||||
type NodeCheck struct {
|
||||
NodeName string
|
||||
HealthCheck HealthCheck
|
||||
Error error
|
||||
Memory *Memory
|
||||
}
|
||||
|
||||
healthChecksChannel := make(chan NodeHealthCheck, numberNodes)
|
||||
nodeChecksChannel := make(chan NodeCheck, numberNodes)
|
||||
|
||||
for _, node := range nodes {
|
||||
go func(nodeName string, healthChecksChannel chan NodeHealthCheck) {
|
||||
go func(nodeName string, healthChecksChannel chan NodeCheck) {
|
||||
var healthCheck HealthCheck
|
||||
var memoryresponse MemoryResponse
|
||||
|
||||
err := r.requestJSON("/api/healthchecks/node/"+nodeName, &healthCheck)
|
||||
nodeHealthCheck := NodeHealthCheck{
|
||||
nodeCheck := NodeCheck{
|
||||
NodeName: nodeName,
|
||||
Error: err,
|
||||
HealthCheck: healthCheck,
|
||||
}
|
||||
if err != nil {
|
||||
acc.AddError(err)
|
||||
return
|
||||
}
|
||||
|
||||
healthChecksChannel <- nodeHealthCheck
|
||||
}(node.Name, healthChecksChannel)
|
||||
err = r.requestJSON("/api/nodes/"+nodeName+"/memory", &memoryresponse)
|
||||
nodeCheck.Memory = memoryresponse.Memory
|
||||
if err != nil {
|
||||
acc.AddError(err)
|
||||
return
|
||||
}
|
||||
|
||||
nodeChecksChannel <- nodeCheck
|
||||
}(node.Name, nodeChecksChannel)
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
|
||||
for i := 0; i < len(nodes); i++ {
|
||||
nodeHealthCheck := <-healthChecksChannel
|
||||
nodeCheck := <-nodeChecksChannel
|
||||
|
||||
var healthCheckStatus int64 = 0
|
||||
|
||||
if nodeHealthCheck.Error != nil {
|
||||
acc.AddError(nodeHealthCheck.Error)
|
||||
} else if nodeHealthCheck.HealthCheck.Status == "ok" {
|
||||
if nodeCheck.HealthCheck.Status == "ok" {
|
||||
healthCheckStatus = 1
|
||||
}
|
||||
|
||||
node := nodes[nodeHealthCheck.NodeName]
|
||||
node := nodes[nodeCheck.NodeName]
|
||||
|
||||
tags := map[string]string{"url": r.URL}
|
||||
tags["node"] = node.Name
|
||||
@@ -466,6 +504,28 @@ func gatherNodes(r *RabbitMQ, acc telegraf.Accumulator) {
|
||||
"running": boolToInt(node.Running),
|
||||
"health_check_status": healthCheckStatus,
|
||||
}
|
||||
if nodeCheck.Memory != nil {
|
||||
fields["mem_connection_readers"] = nodeCheck.Memory.ConnectionReaders
|
||||
fields["mem_connection_writers"] = nodeCheck.Memory.ConnectionWriters
|
||||
fields["mem_connection_channels"] = nodeCheck.Memory.ConnectionChannels
|
||||
fields["mem_connection_other"] = nodeCheck.Memory.ConnectionOther
|
||||
fields["mem_queue_procs"] = nodeCheck.Memory.QueueProcs
|
||||
fields["mem_queue_slave_procs"] = nodeCheck.Memory.QueueSlaveProcs
|
||||
fields["mem_plugins"] = nodeCheck.Memory.Plugins
|
||||
fields["mem_other_proc"] = nodeCheck.Memory.OtherProc
|
||||
fields["mem_metrics"] = nodeCheck.Memory.Metrics
|
||||
fields["mem_mgmt_db"] = nodeCheck.Memory.MgmtDb
|
||||
fields["mem_mnesia"] = nodeCheck.Memory.Mnesia
|
||||
fields["mem_other_ets"] = nodeCheck.Memory.OtherEts
|
||||
fields["mem_binary"] = nodeCheck.Memory.Binary
|
||||
fields["mem_msg_index"] = nodeCheck.Memory.MsgIndex
|
||||
fields["mem_code"] = nodeCheck.Memory.Code
|
||||
fields["mem_atom"] = nodeCheck.Memory.Atom
|
||||
fields["mem_other_system"] = nodeCheck.Memory.OtherSystem
|
||||
fields["mem_allocated_unused"] = nodeCheck.Memory.AllocatedUnused
|
||||
fields["mem_reserved_unallocated"] = nodeCheck.Memory.ReservedUnallocated
|
||||
fields["mem_total"] = nodeCheck.Memory.Total
|
||||
}
|
||||
acc.AddFields("rabbitmq_node", fields, tags, now)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user