Add ability to select which queues will be gathered to rabbitmq input (#3702)
This commit is contained in:
parent
95c9b81397
commit
7a44c309b7
|
@ -44,6 +44,11 @@ For additional details reference the [RabbitMQ Management HTTP Stats](https://cd
|
|||
## A list of exchanges to gather as the rabbitmq_exchange measurement. If not
|
||||
## specified, metrics for all exchanges are gathered.
|
||||
# exchanges = ["telegraf"]
|
||||
|
||||
## Queues to include and exclude. Globs accepted.
|
||||
## Note that an empty array for both will include all queues
|
||||
# queue_name_include = []
|
||||
# queue_name_exclude = []
|
||||
```
|
||||
|
||||
### Measurements & Fields:
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/filter"
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
)
|
||||
|
@ -52,7 +53,14 @@ type RabbitMQ struct {
|
|||
Queues []string
|
||||
Exchanges []string
|
||||
|
||||
QueueInclude []string `toml:"queue_name_include"`
|
||||
QueueExclude []string `toml:"queue_name_exclude"`
|
||||
|
||||
Client *http.Client
|
||||
|
||||
filterCreated bool
|
||||
excludeEveryQueue bool
|
||||
queueFilter filter.Filter
|
||||
}
|
||||
|
||||
// OverviewResponse ...
|
||||
|
@ -195,6 +203,11 @@ var sampleConfig = `
|
|||
## A list of exchanges to gather as the rabbitmq_exchange measurement. If not
|
||||
## specified, metrics for all exchanges are gathered.
|
||||
# exchanges = ["telegraf"]
|
||||
|
||||
## Queues to include and exclude. Globs accepted.
|
||||
## Note that an empty array for both will include all queues
|
||||
queue_name_include = []
|
||||
queue_name_exclude = []
|
||||
`
|
||||
|
||||
// SampleConfig ...
|
||||
|
@ -225,6 +238,15 @@ func (r *RabbitMQ) Gather(acc telegraf.Accumulator) error {
|
|||
}
|
||||
}
|
||||
|
||||
// Create queue filter if not already created
|
||||
if !r.filterCreated {
|
||||
err := r.createQueueFilter()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r.filterCreated = true
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(len(gatherFunctions))
|
||||
for _, f := range gatherFunctions {
|
||||
|
@ -361,6 +383,9 @@ func gatherNodes(r *RabbitMQ, acc telegraf.Accumulator) {
|
|||
}
|
||||
|
||||
func gatherQueues(r *RabbitMQ, acc telegraf.Accumulator) {
|
||||
if r.excludeEveryQueue {
|
||||
return
|
||||
}
|
||||
// Gather information about queues
|
||||
queues := make([]Queue, 0)
|
||||
err := r.requestJSON("/api/queues", &queues)
|
||||
|
@ -370,7 +395,7 @@ func gatherQueues(r *RabbitMQ, acc telegraf.Accumulator) {
|
|||
}
|
||||
|
||||
for _, queue := range queues {
|
||||
if !r.shouldGatherQueue(queue) {
|
||||
if !r.queueFilter.Match(queue.Name) {
|
||||
continue
|
||||
}
|
||||
tags := map[string]string{
|
||||
|
@ -463,18 +488,25 @@ func (r *RabbitMQ) shouldGatherNode(node Node) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
func (r *RabbitMQ) shouldGatherQueue(queue Queue) bool {
|
||||
if len(r.Queues) == 0 {
|
||||
return true
|
||||
func (r *RabbitMQ) createQueueFilter() error {
|
||||
// Backwards compatibility for deprecated `queues` parameter.
|
||||
if len(r.Queues) > 0 {
|
||||
r.QueueInclude = append(r.QueueInclude, r.Queues...)
|
||||
}
|
||||
|
||||
for _, name := range r.Queues {
|
||||
if name == queue.Name {
|
||||
return true
|
||||
filter, err := filter.NewIncludeExcludeFilter(r.QueueInclude, r.QueueExclude)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r.queueFilter = filter
|
||||
|
||||
for _, q := range r.QueueExclude {
|
||||
if q == "*" {
|
||||
r.excludeEveryQueue = true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *RabbitMQ) shouldGatherExchange(exchange Exchange) bool {
|
||||
|
|
Loading…
Reference in New Issue