From 7a44c309b7d05634654647ba9e6c56f33528a276 Mon Sep 17 00:00:00 2001 From: Ildar Svetlov Date: Tue, 30 Jan 2018 00:14:49 +0400 Subject: [PATCH] Add ability to select which queues will be gathered to rabbitmq input (#3702) --- plugins/inputs/rabbitmq/README.md | 5 +++ plugins/inputs/rabbitmq/rabbitmq.go | 48 ++++++++++++++++++++++++----- 2 files changed, 45 insertions(+), 8 deletions(-) diff --git a/plugins/inputs/rabbitmq/README.md b/plugins/inputs/rabbitmq/README.md index 796dfc7bf..5dae5e091 100644 --- a/plugins/inputs/rabbitmq/README.md +++ b/plugins/inputs/rabbitmq/README.md @@ -44,6 +44,11 @@ For additional details reference the [RabbitMQ Management HTTP Stats](https://cd ## A list of exchanges to gather as the rabbitmq_exchange measurement. If not ## specified, metrics for all exchanges are gathered. # exchanges = ["telegraf"] + + ## Queues to include and exclude. Globs accepted. + ## Note that an empty array for both will include all queues + # queue_name_include = [] + # queue_name_exclude = [] ``` ### Measurements & Fields: diff --git a/plugins/inputs/rabbitmq/rabbitmq.go b/plugins/inputs/rabbitmq/rabbitmq.go index 64aa6408d..e0d12c3db 100644 --- a/plugins/inputs/rabbitmq/rabbitmq.go +++ b/plugins/inputs/rabbitmq/rabbitmq.go @@ -9,6 +9,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/filter" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -52,7 +53,14 @@ type RabbitMQ struct { Queues []string Exchanges []string + QueueInclude []string `toml:"queue_name_include"` + QueueExclude []string `toml:"queue_name_exclude"` + Client *http.Client + + filterCreated bool + excludeEveryQueue bool + queueFilter filter.Filter } // OverviewResponse ... @@ -195,6 +203,11 @@ var sampleConfig = ` ## A list of exchanges to gather as the rabbitmq_exchange measurement. If not ## specified, metrics for all exchanges are gathered. # exchanges = ["telegraf"] + + ## Queues to include and exclude. Globs accepted. + ## Note that an empty array for both will include all queues + queue_name_include = [] + queue_name_exclude = [] ` // SampleConfig ... @@ -225,6 +238,15 @@ func (r *RabbitMQ) Gather(acc telegraf.Accumulator) error { } } + // Create queue filter if not already created + if !r.filterCreated { + err := r.createQueueFilter() + if err != nil { + return err + } + r.filterCreated = true + } + var wg sync.WaitGroup wg.Add(len(gatherFunctions)) for _, f := range gatherFunctions { @@ -361,6 +383,9 @@ func gatherNodes(r *RabbitMQ, acc telegraf.Accumulator) { } func gatherQueues(r *RabbitMQ, acc telegraf.Accumulator) { + if r.excludeEveryQueue { + return + } // Gather information about queues queues := make([]Queue, 0) err := r.requestJSON("/api/queues", &queues) @@ -370,7 +395,7 @@ func gatherQueues(r *RabbitMQ, acc telegraf.Accumulator) { } for _, queue := range queues { - if !r.shouldGatherQueue(queue) { + if !r.queueFilter.Match(queue.Name) { continue } tags := map[string]string{ @@ -463,18 +488,25 @@ func (r *RabbitMQ) shouldGatherNode(node Node) bool { return false } -func (r *RabbitMQ) shouldGatherQueue(queue Queue) bool { - if len(r.Queues) == 0 { - return true +func (r *RabbitMQ) createQueueFilter() error { + // Backwards compatibility for deprecated `queues` parameter. + if len(r.Queues) > 0 { + r.QueueInclude = append(r.QueueInclude, r.Queues...) } - for _, name := range r.Queues { - if name == queue.Name { - return true + filter, err := filter.NewIncludeExcludeFilter(r.QueueInclude, r.QueueExclude) + if err != nil { + return err + } + r.queueFilter = filter + + for _, q := range r.QueueExclude { + if q == "*" { + r.excludeEveryQueue = true } } - return false + return nil } func (r *RabbitMQ) shouldGatherExchange(exchange Exchange) bool {