diff --git a/plugins/inputs/rabbit_mq_parser/rabbit_mq_parser.go b/plugins/inputs/rabbit_mq_parser/rabbit_mq_parser.go index b3ee443c2..36a745e3f 100644 --- a/plugins/inputs/rabbit_mq_parser/rabbit_mq_parser.go +++ b/plugins/inputs/rabbit_mq_parser/rabbit_mq_parser.go @@ -1,10 +1,12 @@ -package statsd +package rabbit_mq_parser import ( "bytes" "encoding/json" "fmt" "log" + "os" + "runtime" "strconv" "strings" "sync" @@ -16,16 +18,31 @@ import ( "github.com/streadway/amqp" ) +// init registers the input with telegraf +func init() { + inputs.Add("rabbit_mq_parser", func() telegraf.Input { + return &RabbitMQParser{} + }) +} + +// ################## +// # RabbitMQParser # +// ################## + // RabbitMQParser is the top level struct for this plugin type RabbitMQParser struct { RabbitmqAddress string QueueName string Prefetch int + DroppedLog string conn *amqp.Connection ch *amqp.Channel q amqp.Queue drops int + acks int + log *os.File + cl *ConcurrencyLimiter sync.Mutex } @@ -41,21 +58,34 @@ func (rmq *RabbitMQParser) SampleConfig() string { ## Address and port for the rabbitmq server to pull from rabbitmq_address = "amqp://guest:guest@localhost:5672/" queue_name = "task_queue" - prefetch = 1000 + prefetch = 1000 + dropped_log = "/Users/johnzampolin/.rabbitmq/drops.log" ` } // Gather satisfies the telegraf.ServiceInput interface // All gathering is done in the Start function func (rmq *RabbitMQParser) Gather(_ telegraf.Accumulator) error { - log.Println("Dropped ", rmq.drops, " points in the last 10s") + numMessages := rmq.drops + rmq.acks + percentDrops := (float64(rmq.drops) / float64(numMessages)) * 100.0 + log.Printf("Dropped %.2f%% of %d metrics in the last interval", percentDrops, numMessages) rmq.drops = 0 + rmq.acks = 0 return nil } // Start satisfies the telegraf.ServiceInput interface // Yanked from "https://www.rabbitmq.com/tutorials/tutorial-two-go.html" func (rmq *RabbitMQParser) Start(acc telegraf.Accumulator) error { + // Create drops file + f, err := os.Create(rmq.DroppedLog) + if err != nil { + panic(err) + } + rmq.log = f + + // Limit number of workers to the number of CPU on system + rmq.cl = NewConcurrencyLimiter(runtime.NumCPU() * 2) // Create queue connection and assign it to RabbitMQParser conn, err := amqp.Dial(rmq.RabbitmqAddress) @@ -94,6 +124,15 @@ func (rmq *RabbitMQParser) Start(acc telegraf.Accumulator) error { return nil } +// Stop satisfies the telegraf.ServiceInput interface +func (rmq *RabbitMQParser) Stop() { + rmq.Lock() + defer rmq.Unlock() + rmq.conn.Close() + rmq.log.Close() + rmq.ch.Close() +} + // Yanked from "https://www.rabbitmq.com/tutorials/tutorial-two-go.html" func (rmq *RabbitMQParser) registerConsumer() <-chan amqp.Delivery { messages, err := rmq.ch.Consume(rmq.QueueName, "", false, false, false, false, nil) @@ -107,7 +146,8 @@ func (rmq *RabbitMQParser) registerConsumer() <-chan amqp.Delivery { // and launch new goroutine to handle load func (rmq *RabbitMQParser) listen(msgs <-chan amqp.Delivery, acc telegraf.Accumulator) { for d := range msgs { - go rmq.handleMessage(d, acc) + rmq.cl.Increment() + rmq.handleMessage(d, acc) } } @@ -117,13 +157,19 @@ func (rmq *RabbitMQParser) handleMessage(d amqp.Delivery, acc telegraf.Accumulat msg := rmq.SanitizeMsg(d) // If point is not valid then we will drop message if msg == nil { - // log.Println("droping message", string(d.Body)) + err := rmq.logDropped(string(d.Body)) + if err != nil { + panic(err) + } rmq.drops++ d.Ack(false) + rmq.cl.Decrement() return } d.Ack(false) acc.AddFields(msg.Name(), msg.Fields(), msg.Tags(), msg.Time()) + rmq.acks++ + rmq.cl.Decrement() } // SanitizeMsg breaks message cleanly into the different parts @@ -132,7 +178,10 @@ func (rmq *RabbitMQParser) SanitizeMsg(msg amqp.Delivery) *client.Point { ir := &IRMessage{} data, err := parseBody(msg.Body) if err != nil { - // log.Println("droping message", string(msg.Body)) + err := rmq.logDropped(string(msg.Body)) + if err != nil { + panic(err) + } rmq.drops++ return nil } @@ -157,13 +206,37 @@ func (rmq *RabbitMQParser) SanitizeMsg(msg amqp.Delivery) *client.Point { } m := ir.point() if m == nil { - log.Println("droping message", string(msg.Body)) + err := rmq.logDropped(string(msg.Body)) + if err != nil { + panic(err) + } rmq.drops++ return nil } return m } +// logDropped writes dropped points to a file +func (rmq *RabbitMQParser) logDropped(drop string) error { + rmq.Lock() + defer rmq.Unlock() + // write some text to file + _, err := rmq.log.WriteString(fmt.Sprintf("%v\n", drop)) + if err != nil { + return err + } + // save changes + err = rmq.log.Sync() + if err != nil { + return err + } + return nil +} + +// ############# +// # IRMessage # +// ############# + // IRMessage is an intermediate representation of the // point as it moves through the parser type IRMessage struct { @@ -188,43 +261,19 @@ func (ir *IRMessage) point() *client.Point { func parseBody(msg []byte) (map[string]interface{}, error) { var data map[string]interface{} // Try to parse, if not replace single with double quotes + // then return err if err := json.Unmarshal(msg, &data); err != nil { rp := bytes.Replace(msg, []byte("'"), []byte(`"`), -1) if err := json.Unmarshal(rp, &data); err != nil { - switch err.(type) { - // case *json.UnmarshalFieldError: - // log.Println("UnmarshalFieldError") - // return nil, err - // case *json.UnmarshalTypeError: - // log.Println("UnmarshalTypeError") - // return nil, err - // case *json.UnsupportedTypeError: - // log.Println("UnsupportedTypeError") - // return nil, err - // case *json.UnsupportedValueError: - // log.Println("UnsupportedValueError") - // return nil, err - // case *json.InvalidUnmarshalError: - // log.Println("InvalidUnmarshalError") - // return nil, err - // case *json.InvalidUTF8Error: - // log.Println("InvalidUTF8Error") - // return nil, err - default: - return nil, err - } + return nil, err } } return data, nil } -func errPoint() *client.Point { - pt, err := client.NewPoint("err", map[string]string{}, map[string]interface{}{"err": "err"}, time.Now()) - if err != nil { - panic(err) - } - return pt -} +// ################ +// # Parsing Tree # +// ################ // This is awful decision tree parsing, but it works... // Layout: @@ -523,6 +572,10 @@ func structureKey(key string, value interface{}) (string, map[string]string, map return meas, tags, fields } +// ##################### +// # Parsing Utilities # +// ##################### + // join with period func jwp(s1, s2 string) string { return fmt.Sprintf("%v.%v", s1, s2) @@ -543,20 +596,61 @@ func trim(s string) string { return s[0 : len(s)-1] } +// trimS the first and last char from string func trimS(s string) string { return s[1 : len(s)-1] } -// Stop satisfies the telegraf.ServiceInput interface -func (rmq *RabbitMQParser) Stop() { - rmq.Lock() - defer rmq.Unlock() - rmq.conn.Close() - rmq.ch.Close() +// ###################### +// # ConcurrencyLimiter # +// ###################### + +// ConcurrencyLimiter is a go routine safe struct that can be used to +// ensure that no more than a specifid max number of goroutines are +// executing. +type ConcurrencyLimiter struct { + inc chan chan struct{} + dec chan struct{} + max int + count int } -func init() { - inputs.Add("rabbit_mq_parser", func() telegraf.Input { - return &RabbitMQParser{} - }) +// NewConcurrencyLimiter returns a configured limiter that will +// ensure that calls to Increment will block if the max is hit. +func NewConcurrencyLimiter(max int) *ConcurrencyLimiter { + c := &ConcurrencyLimiter{ + inc: make(chan chan struct{}), + dec: make(chan struct{}, max), + max: max, + } + go c.handleLimits() + return c +} + +// Increment will increase the count of running goroutines by 1. +// if the number is currently at the max, the call to Increment +// will block until another goroutine decrements. +func (c *ConcurrencyLimiter) Increment() { + r := make(chan struct{}) + c.inc <- r + <-r +} + +// Decrement will reduce the count of running goroutines by 1 +func (c *ConcurrencyLimiter) Decrement() { + c.dec <- struct{}{} +} + +// handleLimits runs in a goroutine to manage the count of +// running goroutines. +func (c *ConcurrencyLimiter) handleLimits() { + for { + r := <-c.inc + if c.count >= c.max { + <-c.dec + c.count-- + } + c.count++ + r <- struct{}{} + } } diff --git a/rabbit_mq_parser.conf b/rabbit_mq_parser.conf index db01ca32f..9a9a2e54c 100644 --- a/rabbit_mq_parser.conf +++ b/rabbit_mq_parser.conf @@ -23,4 +23,5 @@ rabbitmq_address = "amqp://guest:guest@localhost:5672/" queue_name = "task_queue" prefetch = 1000 + dropped_log = "/Users/johnzampolin/.rabbitmq/drops.log"