diff --git a/plugins/inputs/rabbit_mq_parser/rabbit_mq_parser.go b/plugins/inputs/rabbit_mq_parser/rabbit_mq_parser.go index 36a745e3f..9395deba0 100644 --- a/plugins/inputs/rabbit_mq_parser/rabbit_mq_parser.go +++ b/plugins/inputs/rabbit_mq_parser/rabbit_mq_parser.go @@ -36,13 +36,13 @@ type RabbitMQParser struct { Prefetch int DroppedLog string - conn *amqp.Connection - ch *amqp.Channel - q amqp.Queue - drops int - acks int - log *os.File - cl *ConcurrencyLimiter + conn *amqp.Connection + ch *amqp.Channel + q amqp.Queue + drops int + parsed int + log *os.File + cl *ConcurrencyLimiter sync.Mutex } @@ -66,11 +66,11 @@ func (rmq *RabbitMQParser) SampleConfig() string { // Gather satisfies the telegraf.ServiceInput interface // All gathering is done in the Start function func (rmq *RabbitMQParser) Gather(_ telegraf.Accumulator) error { - numMessages := rmq.drops + rmq.acks + numMessages := rmq.drops + rmq.parsed percentDrops := (float64(rmq.drops) / float64(numMessages)) * 100.0 log.Printf("Dropped %.2f%% of %d metrics in the last interval", percentDrops, numMessages) rmq.drops = 0 - rmq.acks = 0 + rmq.parsed = 0 return nil } @@ -85,7 +85,7 @@ func (rmq *RabbitMQParser) Start(acc telegraf.Accumulator) error { rmq.log = f // Limit number of workers to the number of CPU on system - rmq.cl = NewConcurrencyLimiter(runtime.NumCPU() * 2) + rmq.cl = NewConcurrencyLimiter(runtime.NumCPU()) // Create queue connection and assign it to RabbitMQParser conn, err := amqp.Dial(rmq.RabbitmqAddress) @@ -147,7 +147,7 @@ func (rmq *RabbitMQParser) registerConsumer() <-chan amqp.Delivery { func (rmq *RabbitMQParser) listen(msgs <-chan amqp.Delivery, acc telegraf.Accumulator) { for d := range msgs { rmq.cl.Increment() - rmq.handleMessage(d, acc) + go rmq.handleMessage(d, acc) } } @@ -168,7 +168,7 @@ func (rmq *RabbitMQParser) handleMessage(d amqp.Delivery, acc telegraf.Accumulat } d.Ack(false) acc.AddFields(msg.Name(), msg.Fields(), msg.Tags(), msg.Time()) - rmq.acks++ + rmq.parsed++ rmq.cl.Decrement() }