Rabbitmq plugin: connection-related metrics. (#1908)

* Rabbitmq plugin: connection-related metrics.

* Run go fmt.
This commit is contained in:
Kishore Nallan 2016-12-13 19:47:20 +05:30 committed by Cameron Sparr
parent a61148904d
commit af850b8854
2 changed files with 130 additions and 3 deletions

View File

@ -52,6 +52,7 @@ type RabbitMQ struct {
// InsecureSkipVerify bool // InsecureSkipVerify bool
Nodes []string Nodes []string
Queues []string Queues []string
Connections []string
Client *http.Client Client *http.Client
} }
@ -135,10 +136,22 @@ type Node struct {
SocketsUsed int64 `json:"sockets_used"` SocketsUsed int64 `json:"sockets_used"`
} }
// Connection ...
type Connection struct {
Name string
State string
Vhost string
Host string
Node string
ReceiveCount int64 `json:"recv_cnt"`
SendCount int64 `json:"send_cnt"`
SendPend int64 `json:"send_pend"`
}
// gatherFunc ... // gatherFunc ...
type gatherFunc func(r *RabbitMQ, acc telegraf.Accumulator, errChan chan error) type gatherFunc func(r *RabbitMQ, acc telegraf.Accumulator, errChan chan error)
var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues} var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues, gatherConnections}
var sampleConfig = ` var sampleConfig = `
# url = "http://localhost:15672" # url = "http://localhost:15672"
@ -380,6 +393,42 @@ func gatherQueues(r *RabbitMQ, acc telegraf.Accumulator, errChan chan error) {
errChan <- nil errChan <- nil
} }
func gatherConnections(r *RabbitMQ, acc telegraf.Accumulator, errChan chan error) {
// Gather information about connections
connections := make([]Connection, 0)
err := r.requestJSON("/api/connections", &connections)
if err != nil {
errChan <- err
return
}
for _, connection := range connections {
if !r.shouldGatherConnection(connection) {
continue
}
tags := map[string]string{
"url": r.URL,
"connection": connection.Name,
"vhost": connection.Vhost,
"host": connection.Host,
"node": connection.Node,
}
acc.AddFields(
"rabbitmq_connection",
map[string]interface{}{
"recv_cnt": connection.ReceiveCount,
"send_cnt": connection.SendCount,
"send_pend": connection.SendPend,
"state": connection.State,
},
tags,
)
}
errChan <- nil
}
func (r *RabbitMQ) shouldGatherNode(node Node) bool { func (r *RabbitMQ) shouldGatherNode(node Node) bool {
if len(r.Nodes) == 0 { if len(r.Nodes) == 0 {
return true return true
@ -408,6 +457,20 @@ func (r *RabbitMQ) shouldGatherQueue(queue Queue) bool {
return false return false
} }
func (r *RabbitMQ) shouldGatherConnection(connection Connection) bool {
if len(r.Connections) == 0 {
return true
}
for _, name := range r.Connections {
if name == connection.Name {
return true
}
}
return false
}
func init() { func init() {
inputs.Add("rabbitmq", func() telegraf.Input { inputs.Add("rabbitmq", func() telegraf.Input {
return &RabbitMQ{ return &RabbitMQ{

View File

@ -374,6 +374,57 @@ const sampleQueuesResponse = `
] ]
` `
const sampleConnectionsResponse = `
[
{
"recv_oct": 166055,
"recv_oct_details": {
"rate": 0
},
"send_oct": 589,
"send_oct_details": {
"rate": 0
},
"recv_cnt": 124,
"send_cnt": 7,
"send_pend": 0,
"state": "running",
"channels": 1,
"type": "network",
"node": "rabbit@ip-10-0-12-133",
"name": "10.0.10.8:32774 -> 10.0.12.131:5672",
"port": 5672,
"peer_port": 32774,
"host": "10.0.12.131",
"peer_host": "10.0.10.8",
"ssl": false,
"peer_cert_subject": null,
"peer_cert_issuer": null,
"peer_cert_validity": null,
"auth_mechanism": "AMQPLAIN",
"ssl_protocol": null,
"ssl_key_exchange": null,
"ssl_cipher": null,
"ssl_hash": null,
"protocol": "AMQP 0-9-1",
"user": "workers",
"vhost": "main",
"timeout": 0,
"frame_max": 131072,
"channel_max": 65535,
"client_properties": {
"product": "py-amqp",
"product_version": "1.4.7",
"capabilities": {
"connection.blocked": true,
"consumer_cancel_notify": true
}
},
"connected_at": 1476647837266
}
]
`
func TestRabbitMQGeneratesMetrics(t *testing.T) { func TestRabbitMQGeneratesMetrics(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var rsp string var rsp string
@ -385,6 +436,8 @@ func TestRabbitMQGeneratesMetrics(t *testing.T) {
rsp = sampleNodesResponse rsp = sampleNodesResponse
case "/api/queues": case "/api/queues":
rsp = sampleQueuesResponse rsp = sampleQueuesResponse
case "/api/connections":
rsp = sampleConnectionsResponse
default: default:
panic("Cannot handle request") panic("Cannot handle request")
} }
@ -441,4 +494,15 @@ func TestRabbitMQGeneratesMetrics(t *testing.T) {
} }
assert.True(t, acc.HasMeasurement("rabbitmq_queue")) assert.True(t, acc.HasMeasurement("rabbitmq_queue"))
assert.True(t, acc.HasMeasurement("rabbitmq_connection"))
connection_fields := map[string]interface{}{
"recv_cnt": int64(124),
"send_cnt": int64(7),
"send_pend": int64(0),
"state": "running",
}
acc.AssertContainsFields(t, "rabbitmq_connection", connection_fields)
} }