diff --git a/plugins/inputs/rabbitmq/README.md b/plugins/inputs/rabbitmq/README.md index 5d500afd1..d52a760f2 100644 --- a/plugins/inputs/rabbitmq/README.md +++ b/plugins/inputs/rabbitmq/README.md @@ -107,6 +107,26 @@ For additional details reference the [RabbitMQ Management HTTP Stats][management - io_write_avg_time_rate (float, milliseconds per second) - io_write_bytes (int, bytes) - io_write_bytes_rate (float, bytes per second) + - mem_connection_readers (int, bytes) + - mem_connection_writers (int, bytes) + - mem_connection_channels (int, bytes) + - mem_connection_other (int, bytes) + - mem_queue_procs (int, bytes) + - mem_queue_slave_procs (int, bytes) + - mem_plugins (int, bytes) + - mem_other_proc (int, bytes) + - mem_metrics (int, bytes) + - mem_mgmt_db (int, bytes) + - mem_mnesia (int, bytes) + - mem_other_ets (int, bytes) + - mem_binary (int, bytes) + - mem_msg_index (int, bytes) + - mem_code (int, bytes) + - mem_atom (int, bytes) + - mem_other_system (int, bytes) + - mem_allocated_unused (int, bytes) + - mem_reserved_unallocated (int, bytes) + - mem_total (int, bytes) - rabbitmq_queue - consumer_utilisation (float, percent) diff --git a/plugins/inputs/rabbitmq/rabbitmq.go b/plugins/inputs/rabbitmq/rabbitmq.go index 4e7e918da..168a340b0 100644 --- a/plugins/inputs/rabbitmq/rabbitmq.go +++ b/plugins/inputs/rabbitmq/rabbitmq.go @@ -182,6 +182,35 @@ type HealthCheck struct { Status string `json:"status"` } +// MemoryResponse ... +type MemoryResponse struct { + Memory *Memory `json:"memory"` +} + +// Memory details +type Memory struct { + ConnectionReaders int64 `json:"connection_readers"` + ConnectionWriters int64 `json:"connection_writers"` + ConnectionChannels int64 `json:"connection_channels"` + ConnectionOther int64 `json:"connection_other"` + QueueProcs int64 `json:"queue_procs"` + QueueSlaveProcs int64 `json:"queue_slave_procs"` + Plugins int64 `json:"plugins"` + OtherProc int64 `json:"other_proc"` + Metrics int64 `json:"metrics"` + MgmtDb int64 `json:"mgmt_db"` + Mnesia int64 `json:"mnesia"` + OtherEts int64 `json:"other_ets"` + Binary int64 `json:"binary"` + MsgIndex int64 `json:"msg_index"` + Code int64 `json:"code"` + Atom int64 `json:"atom"` + OtherSystem int64 `json:"other_system"` + AllocatedUnused int64 `json:"allocated_unused"` + ReservedUnallocated int64 `json:"reserved_unallocated"` + Total int64 `json:"total"` +} + // gatherFunc ... type gatherFunc func(r *RabbitMQ, acc telegraf.Accumulator) @@ -391,43 +420,52 @@ func gatherNodes(r *RabbitMQ, acc telegraf.Accumulator) { return } - type NodeHealthCheck struct { + type NodeCheck struct { NodeName string HealthCheck HealthCheck - Error error + Memory *Memory } - healthChecksChannel := make(chan NodeHealthCheck, numberNodes) + nodeChecksChannel := make(chan NodeCheck, numberNodes) for _, node := range nodes { - go func(nodeName string, healthChecksChannel chan NodeHealthCheck) { + go func(nodeName string, healthChecksChannel chan NodeCheck) { var healthCheck HealthCheck + var memoryresponse MemoryResponse err := r.requestJSON("/api/healthchecks/node/"+nodeName, &healthCheck) - nodeHealthCheck := NodeHealthCheck{ + nodeCheck := NodeCheck{ NodeName: nodeName, - Error: err, HealthCheck: healthCheck, } + if err != nil { + acc.AddError(err) + return + } - healthChecksChannel <- nodeHealthCheck - }(node.Name, healthChecksChannel) + err = r.requestJSON("/api/nodes/"+nodeName+"/memory", &memoryresponse) + nodeCheck.Memory = memoryresponse.Memory + if err != nil { + acc.AddError(err) + return + } + + nodeChecksChannel <- nodeCheck + }(node.Name, nodeChecksChannel) } now := time.Now() for i := 0; i < len(nodes); i++ { - nodeHealthCheck := <-healthChecksChannel + nodeCheck := <-nodeChecksChannel var healthCheckStatus int64 = 0 - if nodeHealthCheck.Error != nil { - acc.AddError(nodeHealthCheck.Error) - } else if nodeHealthCheck.HealthCheck.Status == "ok" { + if nodeCheck.HealthCheck.Status == "ok" { healthCheckStatus = 1 } - node := nodes[nodeHealthCheck.NodeName] + node := nodes[nodeCheck.NodeName] tags := map[string]string{"url": r.URL} tags["node"] = node.Name @@ -466,6 +504,28 @@ func gatherNodes(r *RabbitMQ, acc telegraf.Accumulator) { "running": boolToInt(node.Running), "health_check_status": healthCheckStatus, } + if nodeCheck.Memory != nil { + fields["mem_connection_readers"] = nodeCheck.Memory.ConnectionReaders + fields["mem_connection_writers"] = nodeCheck.Memory.ConnectionWriters + fields["mem_connection_channels"] = nodeCheck.Memory.ConnectionChannels + fields["mem_connection_other"] = nodeCheck.Memory.ConnectionOther + fields["mem_queue_procs"] = nodeCheck.Memory.QueueProcs + fields["mem_queue_slave_procs"] = nodeCheck.Memory.QueueSlaveProcs + fields["mem_plugins"] = nodeCheck.Memory.Plugins + fields["mem_other_proc"] = nodeCheck.Memory.OtherProc + fields["mem_metrics"] = nodeCheck.Memory.Metrics + fields["mem_mgmt_db"] = nodeCheck.Memory.MgmtDb + fields["mem_mnesia"] = nodeCheck.Memory.Mnesia + fields["mem_other_ets"] = nodeCheck.Memory.OtherEts + fields["mem_binary"] = nodeCheck.Memory.Binary + fields["mem_msg_index"] = nodeCheck.Memory.MsgIndex + fields["mem_code"] = nodeCheck.Memory.Code + fields["mem_atom"] = nodeCheck.Memory.Atom + fields["mem_other_system"] = nodeCheck.Memory.OtherSystem + fields["mem_allocated_unused"] = nodeCheck.Memory.AllocatedUnused + fields["mem_reserved_unallocated"] = nodeCheck.Memory.ReservedUnallocated + fields["mem_total"] = nodeCheck.Memory.Total + } acc.AddFields("rabbitmq_node", fields, tags, now) } } diff --git a/plugins/inputs/rabbitmq/rabbitmq_test.go b/plugins/inputs/rabbitmq/rabbitmq_test.go index 0f98f95ce..9d35718d9 100644 --- a/plugins/inputs/rabbitmq/rabbitmq_test.go +++ b/plugins/inputs/rabbitmq/rabbitmq_test.go @@ -6,10 +6,11 @@ import ( "net/http/httptest" "testing" + "io/ioutil" + "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "io/ioutil" ) func TestRabbitMQGeneratesMetrics(t *testing.T) { @@ -27,6 +28,8 @@ func TestRabbitMQGeneratesMetrics(t *testing.T) { jsonFilePath = "testdata/exchanges.json" case "/api/healthchecks/node/rabbit@vagrant-ubuntu-trusty-64": jsonFilePath = "testdata/healthchecks.json" + case "/api/nodes/rabbit@vagrant-ubuntu-trusty-64/memory": + jsonFilePath = "testdata/memory.json" default: panic("Cannot handle request") } @@ -129,6 +132,26 @@ func TestRabbitMQGeneratesMetrics(t *testing.T) { "io_write_avg_time_rate": 4.32, "io_write_bytes": 823, "io_write_bytes_rate": 32.8, + "mem_connection_readers": 1234, + "mem_connection_writers": 5678, + "mem_connection_channels": 1133, + "mem_connection_other": 2840, + "mem_queue_procs": 2840, + "mem_queue_slave_procs": 0, + "mem_plugins": 1755976, + "mem_other_proc": 23056584, + "mem_metrics": 196536, + "mem_mgmt_db": 491272, + "mem_mnesia": 115600, + "mem_other_ets": 2121872, + "mem_binary": 418848, + "mem_msg_index": 42848, + "mem_code": 25179322, + "mem_atom": 1041593, + "mem_other_system": 14741981, + "mem_allocated_unused": 38208528, + "mem_reserved_unallocated": 0, + "mem_total": 83025920, } compareMetrics(t, nodeMetrics, acc, "rabbitmq_node") diff --git a/plugins/inputs/rabbitmq/testdata/memory.json b/plugins/inputs/rabbitmq/testdata/memory.json new file mode 100644 index 000000000..da252eb61 --- /dev/null +++ b/plugins/inputs/rabbitmq/testdata/memory.json @@ -0,0 +1,24 @@ +{ + "memory": { + "connection_readers": 1234, + "connection_writers": 5678, + "connection_channels": 1133, + "connection_other": 2840, + "queue_procs": 2840, + "queue_slave_procs": 0, + "plugins": 1755976, + "other_proc": 23056584, + "metrics": 196536, + "mgmt_db": 491272, + "mnesia": 115600, + "other_ets": 2121872, + "binary": 418848, + "msg_index": 42848, + "code": 25179322, + "atom": 1041593, + "other_system": 14741981, + "allocated_unused": 38208528, + "reserved_unallocated": 0, + "total": 83025920 + } +} \ No newline at end of file