Rabbitmq plugin: connection-related metrics. (#1908)
* Rabbitmq plugin: connection-related metrics. * Run go fmt.
This commit is contained in:
parent
7558081873
commit
dede3e70ad
|
@ -50,8 +50,9 @@ type RabbitMQ struct {
|
|||
ClientTimeout internal.Duration `toml:"client_timeout"`
|
||||
|
||||
// InsecureSkipVerify bool
|
||||
Nodes []string
|
||||
Queues []string
|
||||
Nodes []string
|
||||
Queues []string
|
||||
Connections []string
|
||||
|
||||
Client *http.Client
|
||||
}
|
||||
|
@ -135,10 +136,22 @@ type Node struct {
|
|||
SocketsUsed int64 `json:"sockets_used"`
|
||||
}
|
||||
|
||||
// Connection ...
|
||||
type Connection struct {
|
||||
Name string
|
||||
State string
|
||||
Vhost string
|
||||
Host string
|
||||
Node string
|
||||
ReceiveCount int64 `json:"recv_cnt"`
|
||||
SendCount int64 `json:"send_cnt"`
|
||||
SendPend int64 `json:"send_pend"`
|
||||
}
|
||||
|
||||
// gatherFunc ...
|
||||
type gatherFunc func(r *RabbitMQ, acc telegraf.Accumulator, errChan chan error)
|
||||
|
||||
var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues}
|
||||
var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues, gatherConnections}
|
||||
|
||||
var sampleConfig = `
|
||||
# url = "http://localhost:15672"
|
||||
|
@ -380,6 +393,42 @@ func gatherQueues(r *RabbitMQ, acc telegraf.Accumulator, errChan chan error) {
|
|||
errChan <- nil
|
||||
}
|
||||
|
||||
func gatherConnections(r *RabbitMQ, acc telegraf.Accumulator, errChan chan error) {
|
||||
// Gather information about connections
|
||||
connections := make([]Connection, 0)
|
||||
err := r.requestJSON("/api/connections", &connections)
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
return
|
||||
}
|
||||
|
||||
for _, connection := range connections {
|
||||
if !r.shouldGatherConnection(connection) {
|
||||
continue
|
||||
}
|
||||
tags := map[string]string{
|
||||
"url": r.URL,
|
||||
"connection": connection.Name,
|
||||
"vhost": connection.Vhost,
|
||||
"host": connection.Host,
|
||||
"node": connection.Node,
|
||||
}
|
||||
|
||||
acc.AddFields(
|
||||
"rabbitmq_connection",
|
||||
map[string]interface{}{
|
||||
"recv_cnt": connection.ReceiveCount,
|
||||
"send_cnt": connection.SendCount,
|
||||
"send_pend": connection.SendPend,
|
||||
"state": connection.State,
|
||||
},
|
||||
tags,
|
||||
)
|
||||
}
|
||||
|
||||
errChan <- nil
|
||||
}
|
||||
|
||||
func (r *RabbitMQ) shouldGatherNode(node Node) bool {
|
||||
if len(r.Nodes) == 0 {
|
||||
return true
|
||||
|
@ -408,6 +457,20 @@ func (r *RabbitMQ) shouldGatherQueue(queue Queue) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
func (r *RabbitMQ) shouldGatherConnection(connection Connection) bool {
|
||||
if len(r.Connections) == 0 {
|
||||
return true
|
||||
}
|
||||
|
||||
for _, name := range r.Connections {
|
||||
if name == connection.Name {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func init() {
|
||||
inputs.Add("rabbitmq", func() telegraf.Input {
|
||||
return &RabbitMQ{
|
||||
|
|
|
@ -374,6 +374,57 @@ const sampleQueuesResponse = `
|
|||
]
|
||||
`
|
||||
|
||||
const sampleConnectionsResponse = `
|
||||
[
|
||||
{
|
||||
"recv_oct": 166055,
|
||||
"recv_oct_details": {
|
||||
"rate": 0
|
||||
},
|
||||
"send_oct": 589,
|
||||
"send_oct_details": {
|
||||
"rate": 0
|
||||
},
|
||||
"recv_cnt": 124,
|
||||
"send_cnt": 7,
|
||||
"send_pend": 0,
|
||||
"state": "running",
|
||||
"channels": 1,
|
||||
"type": "network",
|
||||
"node": "rabbit@ip-10-0-12-133",
|
||||
"name": "10.0.10.8:32774 -> 10.0.12.131:5672",
|
||||
"port": 5672,
|
||||
"peer_port": 32774,
|
||||
"host": "10.0.12.131",
|
||||
"peer_host": "10.0.10.8",
|
||||
"ssl": false,
|
||||
"peer_cert_subject": null,
|
||||
"peer_cert_issuer": null,
|
||||
"peer_cert_validity": null,
|
||||
"auth_mechanism": "AMQPLAIN",
|
||||
"ssl_protocol": null,
|
||||
"ssl_key_exchange": null,
|
||||
"ssl_cipher": null,
|
||||
"ssl_hash": null,
|
||||
"protocol": "AMQP 0-9-1",
|
||||
"user": "workers",
|
||||
"vhost": "main",
|
||||
"timeout": 0,
|
||||
"frame_max": 131072,
|
||||
"channel_max": 65535,
|
||||
"client_properties": {
|
||||
"product": "py-amqp",
|
||||
"product_version": "1.4.7",
|
||||
"capabilities": {
|
||||
"connection.blocked": true,
|
||||
"consumer_cancel_notify": true
|
||||
}
|
||||
},
|
||||
"connected_at": 1476647837266
|
||||
}
|
||||
]
|
||||
`
|
||||
|
||||
func TestRabbitMQGeneratesMetrics(t *testing.T) {
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
var rsp string
|
||||
|
@ -385,6 +436,8 @@ func TestRabbitMQGeneratesMetrics(t *testing.T) {
|
|||
rsp = sampleNodesResponse
|
||||
case "/api/queues":
|
||||
rsp = sampleQueuesResponse
|
||||
case "/api/connections":
|
||||
rsp = sampleConnectionsResponse
|
||||
default:
|
||||
panic("Cannot handle request")
|
||||
}
|
||||
|
@ -441,4 +494,15 @@ func TestRabbitMQGeneratesMetrics(t *testing.T) {
|
|||
}
|
||||
|
||||
assert.True(t, acc.HasMeasurement("rabbitmq_queue"))
|
||||
|
||||
assert.True(t, acc.HasMeasurement("rabbitmq_connection"))
|
||||
|
||||
connection_fields := map[string]interface{}{
|
||||
"recv_cnt": int64(124),
|
||||
"send_cnt": int64(7),
|
||||
"send_pend": int64(0),
|
||||
"state": "running",
|
||||
}
|
||||
|
||||
acc.AssertContainsFields(t, "rabbitmq_connection", connection_fields)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue