Add support for exchanges to RabbitMQ input (#3619)
This commit is contained in:
parent
07cb749e04
commit
87f1d45ee0
|
@ -40,6 +40,10 @@ For additional details reference the [RabbitMQ Management HTTP Stats](https://cd
|
||||||
## A list of queues to gather as the rabbitmq_queue measurement. If not
|
## A list of queues to gather as the rabbitmq_queue measurement. If not
|
||||||
## specified, metrics for all queues are gathered.
|
## specified, metrics for all queues are gathered.
|
||||||
# queues = ["telegraf"]
|
# queues = ["telegraf"]
|
||||||
|
|
||||||
|
## A list of exchanges to gather as the rabbitmq_exchange measurement. If not
|
||||||
|
## specified, metrics for all exchanges are gathered.
|
||||||
|
# exchanges = ["telegraf"]
|
||||||
```
|
```
|
||||||
|
|
||||||
### Measurements & Fields:
|
### Measurements & Fields:
|
||||||
|
@ -95,6 +99,10 @@ For additional details reference the [RabbitMQ Management HTTP Stats](https://cd
|
||||||
- messages_redeliver_rate (float, messages per second)
|
- messages_redeliver_rate (float, messages per second)
|
||||||
- messages_unack (integer, count)
|
- messages_unack (integer, count)
|
||||||
|
|
||||||
|
- rabbitmq_exchange
|
||||||
|
- messages_publish_in (int, count)
|
||||||
|
- messages_publish_out (int, count)
|
||||||
|
|
||||||
### Tags:
|
### Tags:
|
||||||
|
|
||||||
- All measurements have the following tags:
|
- All measurements have the following tags:
|
||||||
|
@ -114,6 +122,15 @@ For additional details reference the [RabbitMQ Management HTTP Stats](https://cd
|
||||||
- durable
|
- durable
|
||||||
- auto_delete
|
- auto_delete
|
||||||
|
|
||||||
|
- rabbitmq_exchange
|
||||||
|
- url
|
||||||
|
- exchange
|
||||||
|
- type
|
||||||
|
- vhost
|
||||||
|
- internal
|
||||||
|
- durable
|
||||||
|
- auto_delete
|
||||||
|
|
||||||
### Sample Queries:
|
### Sample Queries:
|
||||||
|
|
||||||
Message rates for the entire node can be calculated from total message counts. For instance, to get the rate of messages published per minute, use this query:
|
Message rates for the entire node can be calculated from total message counts. For instance, to get the rate of messages published per minute, use this query:
|
||||||
|
@ -129,4 +146,5 @@ FROM rabbitmq_overview WHERE time > now() - 10m GROUP BY time(1m)
|
||||||
rabbitmq_queue,url=http://amqp.example.org:15672,queue=telegraf,vhost=influxdb,node=rabbit@amqp.example.org,durable=true,auto_delete=false,host=amqp.example.org messages_deliver_get=0i,messages_publish=329i,messages_publish_rate=0.2,messages_redeliver_rate=0,message_bytes_ready=0i,message_bytes_unacked=0i,messages_deliver=329i,messages_unack=0i,consumers=1i,idle_since="",messages=0i,messages_deliver_rate=0.2,messages_deliver_get_rate=0.2,messages_redeliver=0i,memory=43032i,message_bytes_ram=0i,messages_ack=329i,messages_ready=0i,messages_ack_rate=0.2,consumer_utilisation=1,message_bytes=0i,message_bytes_persist=0i 1493684035000000000
|
rabbitmq_queue,url=http://amqp.example.org:15672,queue=telegraf,vhost=influxdb,node=rabbit@amqp.example.org,durable=true,auto_delete=false,host=amqp.example.org messages_deliver_get=0i,messages_publish=329i,messages_publish_rate=0.2,messages_redeliver_rate=0,message_bytes_ready=0i,message_bytes_unacked=0i,messages_deliver=329i,messages_unack=0i,consumers=1i,idle_since="",messages=0i,messages_deliver_rate=0.2,messages_deliver_get_rate=0.2,messages_redeliver=0i,memory=43032i,message_bytes_ram=0i,messages_ack=329i,messages_ready=0i,messages_ack_rate=0.2,consumer_utilisation=1,message_bytes=0i,message_bytes_persist=0i 1493684035000000000
|
||||||
rabbitmq_overview,url=http://amqp.example.org:15672,host=amqp.example.org channels=2i,consumers=1i,exchanges=17i,messages_acked=329i,messages=0i,messages_ready=0i,messages_unacked=0i,connections=2i,queues=1i,messages_delivered=329i,messages_published=329i 1493684035000000000
|
rabbitmq_overview,url=http://amqp.example.org:15672,host=amqp.example.org channels=2i,consumers=1i,exchanges=17i,messages_acked=329i,messages=0i,messages_ready=0i,messages_unacked=0i,connections=2i,queues=1i,messages_delivered=329i,messages_published=329i 1493684035000000000
|
||||||
rabbitmq_node,url=http://amqp.example.org:15672,node=rabbit@amqp.example.org,host=amqp.example.org fd_total=1024i,fd_used=32i,mem_limit=8363329126i,sockets_total=829i,disk_free=8175935488i,disk_free_limit=50000000i,mem_used=58771080i,proc_total=1048576i,proc_used=267i,run_queue=0i,sockets_used=2i 149368403500000000
|
rabbitmq_node,url=http://amqp.example.org:15672,node=rabbit@amqp.example.org,host=amqp.example.org fd_total=1024i,fd_used=32i,mem_limit=8363329126i,sockets_total=829i,disk_free=8175935488i,disk_free_limit=50000000i,mem_used=58771080i,proc_total=1048576i,proc_used=267i,run_queue=0i,sockets_used=2i 149368403500000000
|
||||||
|
rabbitmq_exchange,url=http://amqp.example.org:15672,exchange=telegraf,type=fanout,vhost=influxdb,internal=false,durable=true,auto_delete=false,host=amqp.example.org messages_publish_in=2i,messages_publish_out=1i 149368403500000000
|
||||||
```
|
```
|
||||||
|
|
|
@ -48,8 +48,9 @@ type RabbitMQ struct {
|
||||||
ResponseHeaderTimeout internal.Duration `toml:"header_timeout"`
|
ResponseHeaderTimeout internal.Duration `toml:"header_timeout"`
|
||||||
ClientTimeout internal.Duration `toml:"client_timeout"`
|
ClientTimeout internal.Duration `toml:"client_timeout"`
|
||||||
|
|
||||||
Nodes []string
|
Nodes []string
|
||||||
Queues []string
|
Queues []string
|
||||||
|
Exchanges []string
|
||||||
|
|
||||||
Client *http.Client
|
Client *http.Client
|
||||||
}
|
}
|
||||||
|
@ -78,6 +79,8 @@ type MessageStats struct {
|
||||||
PublishDetails Details `json:"publish_details"`
|
PublishDetails Details `json:"publish_details"`
|
||||||
Redeliver int64
|
Redeliver int64
|
||||||
RedeliverDetails Details `json:"redeliver_details"`
|
RedeliverDetails Details `json:"redeliver_details"`
|
||||||
|
PublishIn int64 `json:"publish_in"`
|
||||||
|
PublishOut int64 `json:"publish_out"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// ObjectTotals ...
|
// ObjectTotals ...
|
||||||
|
@ -133,10 +136,20 @@ type Node struct {
|
||||||
SocketsUsed int64 `json:"sockets_used"`
|
SocketsUsed int64 `json:"sockets_used"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type Exchange struct {
|
||||||
|
Name string
|
||||||
|
MessageStats `json:"message_stats"`
|
||||||
|
Type string
|
||||||
|
Internal bool
|
||||||
|
Vhost string
|
||||||
|
Durable bool
|
||||||
|
AutoDelete bool `json:"auto_delete"`
|
||||||
|
}
|
||||||
|
|
||||||
// gatherFunc ...
|
// gatherFunc ...
|
||||||
type gatherFunc func(r *RabbitMQ, acc telegraf.Accumulator)
|
type gatherFunc func(r *RabbitMQ, acc telegraf.Accumulator)
|
||||||
|
|
||||||
var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues}
|
var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues, gatherExchanges}
|
||||||
|
|
||||||
var sampleConfig = `
|
var sampleConfig = `
|
||||||
## Management Plugin url. (default: http://localhost:15672)
|
## Management Plugin url. (default: http://localhost:15672)
|
||||||
|
@ -171,6 +184,10 @@ var sampleConfig = `
|
||||||
## A list of queues to gather as the rabbitmq_queue measurement. If not
|
## A list of queues to gather as the rabbitmq_queue measurement. If not
|
||||||
## specified, metrics for all queues are gathered.
|
## specified, metrics for all queues are gathered.
|
||||||
# queues = ["telegraf"]
|
# queues = ["telegraf"]
|
||||||
|
|
||||||
|
## A list of exchanges to gather as the rabbitmq_exchange measurement. If not
|
||||||
|
## specified, metrics for all exchanges are gathered.
|
||||||
|
# exchanges = ["telegraf"]
|
||||||
`
|
`
|
||||||
|
|
||||||
// SampleConfig ...
|
// SampleConfig ...
|
||||||
|
@ -374,6 +391,40 @@ func gatherQueues(r *RabbitMQ, acc telegraf.Accumulator) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func gatherExchanges(r *RabbitMQ, acc telegraf.Accumulator) {
|
||||||
|
// Gather information about exchanges
|
||||||
|
exchanges := make([]Exchange, 0)
|
||||||
|
err := r.requestJSON("/api/exchanges", &exchanges)
|
||||||
|
if err != nil {
|
||||||
|
acc.AddError(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, exchange := range exchanges {
|
||||||
|
if !r.shouldGatherExchange(exchange) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
tags := map[string]string{
|
||||||
|
"url": r.URL,
|
||||||
|
"exchange": exchange.Name,
|
||||||
|
"type": exchange.Type,
|
||||||
|
"vhost": exchange.Vhost,
|
||||||
|
"internal": strconv.FormatBool(exchange.Internal),
|
||||||
|
"durable": strconv.FormatBool(exchange.Durable),
|
||||||
|
"auto_delete": strconv.FormatBool(exchange.AutoDelete),
|
||||||
|
}
|
||||||
|
|
||||||
|
acc.AddFields(
|
||||||
|
"rabbitmq_exchange",
|
||||||
|
map[string]interface{}{
|
||||||
|
"messages_publish_in": exchange.MessageStats.PublishIn,
|
||||||
|
"messages_publish_out": exchange.MessageStats.PublishOut,
|
||||||
|
},
|
||||||
|
tags,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (r *RabbitMQ) shouldGatherNode(node Node) bool {
|
func (r *RabbitMQ) shouldGatherNode(node Node) bool {
|
||||||
if len(r.Nodes) == 0 {
|
if len(r.Nodes) == 0 {
|
||||||
return true
|
return true
|
||||||
|
@ -402,6 +453,20 @@ func (r *RabbitMQ) shouldGatherQueue(queue Queue) bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *RabbitMQ) shouldGatherExchange(exchange Exchange) bool {
|
||||||
|
if len(r.Exchanges) == 0 {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, name := range r.Exchanges {
|
||||||
|
if name == exchange.Name {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
inputs.Add("rabbitmq", func() telegraf.Input {
|
inputs.Add("rabbitmq", func() telegraf.Input {
|
||||||
return &RabbitMQ{
|
return &RabbitMQ{
|
||||||
|
|
|
@ -374,6 +374,102 @@ const sampleQueuesResponse = `
|
||||||
]
|
]
|
||||||
`
|
`
|
||||||
|
|
||||||
|
const sampleExchangesResponse = `
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"arguments": { },
|
||||||
|
"internal": false,
|
||||||
|
"auto_delete": false,
|
||||||
|
"durable": true,
|
||||||
|
"type": "direct",
|
||||||
|
"vhost": "\/",
|
||||||
|
"name": ""
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"message_stats": {
|
||||||
|
"publish_in_details": {
|
||||||
|
"rate": 0
|
||||||
|
},
|
||||||
|
"publish_in": 2,
|
||||||
|
"publish_out_details": {
|
||||||
|
"rate": 0
|
||||||
|
},
|
||||||
|
"publish_out": 1
|
||||||
|
},
|
||||||
|
"arguments": { },
|
||||||
|
"internal": false,
|
||||||
|
"auto_delete": false,
|
||||||
|
"durable": true,
|
||||||
|
"type": "fanout",
|
||||||
|
"vhost": "\/",
|
||||||
|
"name": "telegraf"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"arguments": { },
|
||||||
|
"internal": false,
|
||||||
|
"auto_delete": false,
|
||||||
|
"durable": true,
|
||||||
|
"type": "direct",
|
||||||
|
"vhost": "\/",
|
||||||
|
"name": "amq.direct"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"arguments": { },
|
||||||
|
"internal": false,
|
||||||
|
"auto_delete": false,
|
||||||
|
"durable": true,
|
||||||
|
"type": "fanout",
|
||||||
|
"vhost": "\/",
|
||||||
|
"name": "amq.fanout"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"arguments": { },
|
||||||
|
"internal": false,
|
||||||
|
"auto_delete": false,
|
||||||
|
"durable": true,
|
||||||
|
"type": "headers",
|
||||||
|
"vhost": "\/",
|
||||||
|
"name": "amq.headers"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"arguments": { },
|
||||||
|
"internal": false,
|
||||||
|
"auto_delete": false,
|
||||||
|
"durable": true,
|
||||||
|
"type": "headers",
|
||||||
|
"vhost": "\/",
|
||||||
|
"name": "amq.match"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"arguments": { },
|
||||||
|
"internal": true,
|
||||||
|
"auto_delete": false,
|
||||||
|
"durable": true,
|
||||||
|
"type": "topic",
|
||||||
|
"vhost": "\/",
|
||||||
|
"name": "amq.rabbitmq.log"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"arguments": { },
|
||||||
|
"internal": true,
|
||||||
|
"auto_delete": false,
|
||||||
|
"durable": true,
|
||||||
|
"type": "topic",
|
||||||
|
"vhost": "\/",
|
||||||
|
"name": "amq.rabbitmq.trace"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"arguments": { },
|
||||||
|
"internal": false,
|
||||||
|
"auto_delete": false,
|
||||||
|
"durable": true,
|
||||||
|
"type": "topic",
|
||||||
|
"vhost": "\/",
|
||||||
|
"name": "amq.topic"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
`
|
||||||
|
|
||||||
func TestRabbitMQGeneratesMetrics(t *testing.T) {
|
func TestRabbitMQGeneratesMetrics(t *testing.T) {
|
||||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
var rsp string
|
var rsp string
|
||||||
|
@ -385,6 +481,8 @@ func TestRabbitMQGeneratesMetrics(t *testing.T) {
|
||||||
rsp = sampleNodesResponse
|
rsp = sampleNodesResponse
|
||||||
case "/api/queues":
|
case "/api/queues":
|
||||||
rsp = sampleQueuesResponse
|
rsp = sampleQueuesResponse
|
||||||
|
case "/api/exchanges":
|
||||||
|
rsp = sampleExchangesResponse
|
||||||
default:
|
default:
|
||||||
panic("Cannot handle request")
|
panic("Cannot handle request")
|
||||||
}
|
}
|
||||||
|
@ -441,4 +539,13 @@ func TestRabbitMQGeneratesMetrics(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
assert.True(t, acc.HasMeasurement("rabbitmq_queue"))
|
assert.True(t, acc.HasMeasurement("rabbitmq_queue"))
|
||||||
|
|
||||||
|
exchangeIntMetrics := []string{
|
||||||
|
"messages_publish_in",
|
||||||
|
"messages_publish_out",
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, metric := range exchangeIntMetrics {
|
||||||
|
assert.True(t, acc.HasInt64Field("rabbitmq_exchange", metric))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue