Add performance fixes

This commit is contained in:
Jack Zampolin 2016-09-02 12:19:31 -07:00
parent 917b25cd5f
commit 522c45eb8a
2 changed files with 141 additions and 46 deletions

View File

@ -1,10 +1,12 @@
package statsd package rabbit_mq_parser
import ( import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"log" "log"
"os"
"runtime"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
@ -16,16 +18,31 @@ import (
"github.com/streadway/amqp" "github.com/streadway/amqp"
) )
// init registers the input with telegraf
func init() {
inputs.Add("rabbit_mq_parser", func() telegraf.Input {
return &RabbitMQParser{}
})
}
// ##################
// # RabbitMQParser #
// ##################
// RabbitMQParser is the top level struct for this plugin // RabbitMQParser is the top level struct for this plugin
type RabbitMQParser struct { type RabbitMQParser struct {
RabbitmqAddress string RabbitmqAddress string
QueueName string QueueName string
Prefetch int Prefetch int
DroppedLog string
conn *amqp.Connection conn *amqp.Connection
ch *amqp.Channel ch *amqp.Channel
q amqp.Queue q amqp.Queue
drops int drops int
acks int
log *os.File
cl *ConcurrencyLimiter
sync.Mutex sync.Mutex
} }
@ -41,21 +58,34 @@ func (rmq *RabbitMQParser) SampleConfig() string {
## Address and port for the rabbitmq server to pull from ## Address and port for the rabbitmq server to pull from
rabbitmq_address = "amqp://guest:guest@localhost:5672/" rabbitmq_address = "amqp://guest:guest@localhost:5672/"
queue_name = "task_queue" queue_name = "task_queue"
prefetch = 1000 prefetch = 1000
dropped_log = "/Users/johnzampolin/.rabbitmq/drops.log"
` `
} }
// Gather satisfies the telegraf.ServiceInput interface // Gather satisfies the telegraf.ServiceInput interface
// All gathering is done in the Start function // All gathering is done in the Start function
func (rmq *RabbitMQParser) Gather(_ telegraf.Accumulator) error { func (rmq *RabbitMQParser) Gather(_ telegraf.Accumulator) error {
log.Println("Dropped ", rmq.drops, " points in the last 10s") numMessages := rmq.drops + rmq.acks
percentDrops := (float64(rmq.drops) / float64(numMessages)) * 100.0
log.Printf("Dropped %.2f%% of %d metrics in the last interval", percentDrops, numMessages)
rmq.drops = 0 rmq.drops = 0
rmq.acks = 0
return nil return nil
} }
// Start satisfies the telegraf.ServiceInput interface // Start satisfies the telegraf.ServiceInput interface
// Yanked from "https://www.rabbitmq.com/tutorials/tutorial-two-go.html" // Yanked from "https://www.rabbitmq.com/tutorials/tutorial-two-go.html"
func (rmq *RabbitMQParser) Start(acc telegraf.Accumulator) error { func (rmq *RabbitMQParser) Start(acc telegraf.Accumulator) error {
// Create drops file
f, err := os.Create(rmq.DroppedLog)
if err != nil {
panic(err)
}
rmq.log = f
// Limit the number of workers to twice the number of CPUs on the system
rmq.cl = NewConcurrencyLimiter(runtime.NumCPU() * 2)
// Create queue connection and assign it to RabbitMQParser // Create queue connection and assign it to RabbitMQParser
conn, err := amqp.Dial(rmq.RabbitmqAddress) conn, err := amqp.Dial(rmq.RabbitmqAddress)
@ -94,6 +124,15 @@ func (rmq *RabbitMQParser) Start(acc telegraf.Accumulator) error {
return nil return nil
} }
// Stop satisfies the telegraf.ServiceInput interface.
//
// It releases, under the parser's mutex, the resources held on the
// struct: the AMQP channel, the AMQP connection and the dropped-message
// log file. The channel is closed before the connection that owns it so
// that its Close is not issued against an already-closed connection, and
// the log file is closed last. Fields that were never assigned are
// skipped, and any Close error is logged rather than discarded.
func (rmq *RabbitMQParser) Stop() {
	rmq.Lock()
	defer rmq.Unlock()

	if rmq.ch != nil {
		if err := rmq.ch.Close(); err != nil {
			log.Printf("rabbit_mq_parser: closing channel: %v", err)
		}
	}
	if rmq.conn != nil {
		if err := rmq.conn.Close(); err != nil {
			log.Printf("rabbit_mq_parser: closing connection: %v", err)
		}
	}
	if rmq.log != nil {
		if err := rmq.log.Close(); err != nil {
			log.Printf("rabbit_mq_parser: closing dropped log: %v", err)
		}
	}
}
// Yanked from "https://www.rabbitmq.com/tutorials/tutorial-two-go.html" // Yanked from "https://www.rabbitmq.com/tutorials/tutorial-two-go.html"
func (rmq *RabbitMQParser) registerConsumer() <-chan amqp.Delivery { func (rmq *RabbitMQParser) registerConsumer() <-chan amqp.Delivery {
messages, err := rmq.ch.Consume(rmq.QueueName, "", false, false, false, false, nil) messages, err := rmq.ch.Consume(rmq.QueueName, "", false, false, false, false, nil)
@ -107,7 +146,8 @@ func (rmq *RabbitMQParser) registerConsumer() <-chan amqp.Delivery {
// and launch new goroutine to handle load // and launch new goroutine to handle load
func (rmq *RabbitMQParser) listen(msgs <-chan amqp.Delivery, acc telegraf.Accumulator) { func (rmq *RabbitMQParser) listen(msgs <-chan amqp.Delivery, acc telegraf.Accumulator) {
for d := range msgs { for d := range msgs {
go rmq.handleMessage(d, acc) rmq.cl.Increment()
rmq.handleMessage(d, acc)
} }
} }
@ -117,13 +157,19 @@ func (rmq *RabbitMQParser) handleMessage(d amqp.Delivery, acc telegraf.Accumulat
msg := rmq.SanitizeMsg(d) msg := rmq.SanitizeMsg(d)
// If point is not valid then we will drop message // If point is not valid then we will drop message
if msg == nil { if msg == nil {
// log.Println("droping message", string(d.Body)) err := rmq.logDropped(string(d.Body))
if err != nil {
panic(err)
}
rmq.drops++ rmq.drops++
d.Ack(false) d.Ack(false)
rmq.cl.Decrement()
return return
} }
d.Ack(false) d.Ack(false)
acc.AddFields(msg.Name(), msg.Fields(), msg.Tags(), msg.Time()) acc.AddFields(msg.Name(), msg.Fields(), msg.Tags(), msg.Time())
rmq.acks++
rmq.cl.Decrement()
} }
// SanitizeMsg breaks message cleanly into the different parts // SanitizeMsg breaks message cleanly into the different parts
@ -132,7 +178,10 @@ func (rmq *RabbitMQParser) SanitizeMsg(msg amqp.Delivery) *client.Point {
ir := &IRMessage{} ir := &IRMessage{}
data, err := parseBody(msg.Body) data, err := parseBody(msg.Body)
if err != nil { if err != nil {
// log.Println("droping message", string(msg.Body)) err := rmq.logDropped(string(msg.Body))
if err != nil {
panic(err)
}
rmq.drops++ rmq.drops++
return nil return nil
} }
@ -157,13 +206,37 @@ func (rmq *RabbitMQParser) SanitizeMsg(msg amqp.Delivery) *client.Point {
} }
m := ir.point() m := ir.point()
if m == nil { if m == nil {
log.Println("droping message", string(msg.Body)) err := rmq.logDropped(string(msg.Body))
if err != nil {
panic(err)
}
rmq.drops++ rmq.drops++
return nil return nil
} }
return m return m
} }
// logDropped appends one dropped message, followed by a newline, to the
// parser's drop log and then syncs the file. The parser's mutex is held
// for the duration of the write and sync. The first error encountered
// (write or sync) is returned; nil means both succeeded.
func (rmq *RabbitMQParser) logDropped(drop string) error {
	rmq.Lock()
	defer rmq.Unlock()

	// append the message as a single line
	if _, writeErr := rmq.log.WriteString(drop + "\n"); writeErr != nil {
		return writeErr
	}
	// flush to stable storage
	return rmq.log.Sync()
}
// #############
// # IRMessage #
// #############
// IRMessage is an intermediate representation of the // IRMessage is an intermediate representation of the
// point as it moves through the parser // point as it moves through the parser
type IRMessage struct { type IRMessage struct {
@ -188,43 +261,19 @@ func (ir *IRMessage) point() *client.Point {
func parseBody(msg []byte) (map[string]interface{}, error) { func parseBody(msg []byte) (map[string]interface{}, error) {
var data map[string]interface{} var data map[string]interface{}
// Try to parse, if not replace single with double quotes // Try to parse, if not replace single with double quotes
// then return err
if err := json.Unmarshal(msg, &data); err != nil { if err := json.Unmarshal(msg, &data); err != nil {
rp := bytes.Replace(msg, []byte("'"), []byte(`"`), -1) rp := bytes.Replace(msg, []byte("'"), []byte(`"`), -1)
if err := json.Unmarshal(rp, &data); err != nil { if err := json.Unmarshal(rp, &data); err != nil {
switch err.(type) { return nil, err
// case *json.UnmarshalFieldError:
// log.Println("UnmarshalFieldError")
// return nil, err
// case *json.UnmarshalTypeError:
// log.Println("UnmarshalTypeError")
// return nil, err
// case *json.UnsupportedTypeError:
// log.Println("UnsupportedTypeError")
// return nil, err
// case *json.UnsupportedValueError:
// log.Println("UnsupportedValueError")
// return nil, err
// case *json.InvalidUnmarshalError:
// log.Println("InvalidUnmarshalError")
// return nil, err
// case *json.InvalidUTF8Error:
// log.Println("InvalidUTF8Error")
// return nil, err
default:
return nil, err
}
} }
} }
return data, nil return data, nil
} }
func errPoint() *client.Point { // ################
pt, err := client.NewPoint("err", map[string]string{}, map[string]interface{}{"err": "err"}, time.Now()) // # Parsing Tree #
if err != nil { // ################
panic(err)
}
return pt
}
// This is awful decision tree parsing, but it works... // This is awful decision tree parsing, but it works...
// Layout: // Layout:
@ -523,6 +572,10 @@ func structureKey(key string, value interface{}) (string, map[string]string, map
return meas, tags, fields return meas, tags, fields
} }
// #####################
// # Parsing Utilities #
// #####################
// join with period // join with period
func jwp(s1, s2 string) string { func jwp(s1, s2 string) string {
return fmt.Sprintf("%v.%v", s1, s2) return fmt.Sprintf("%v.%v", s1, s2)
@ -543,20 +596,61 @@ func trim(s string) string {
return s[0 : len(s)-1] return s[0 : len(s)-1]
} }
// trimS the first and last char from string
func trimS(s string) string { func trimS(s string) string {
return s[1 : len(s)-1] return s[1 : len(s)-1]
} }
// Stop satisfies the telegraf.ServiceInput interface // ######################
func (rmq *RabbitMQParser) Stop() { // # ConcurrencyLimiter #
rmq.Lock() // ######################
defer rmq.Unlock()
rmq.conn.Close() // ConcurrencyLimiter is a go routine safe struct that can be used to
rmq.ch.Close() // ensure that no more than a specified max number of goroutines are
// executing.
type ConcurrencyLimiter struct {
// inc carries one reply channel per Increment call; handleLimits
// receives it and answers on it once the caller may proceed.
inc chan chan struct{}
// dec receives one token per Decrement call. NewConcurrencyLimiter
// creates it with a buffer of max entries.
dec chan struct{}
// max is the limit passed to NewConcurrencyLimiter.
max int
// count is the running tally maintained by handleLimits; no other
// method in this file reads or writes it.
count int
}
func init() { // NewConcurrencyLimiter returns a configured limiter that will
inputs.Add("rabbit_mq_parser", func() telegraf.Input { // ensure that calls to Increment will block if the max is hit.
return &RabbitMQParser{} func NewConcurrencyLimiter(max int) *ConcurrencyLimiter {
}) c := &ConcurrencyLimiter{
inc: make(chan chan struct{}),
dec: make(chan struct{}, max),
max: max,
}
go c.handleLimits()
return c
}
// Increment registers one more running goroutine with the limiter.
// It hands a private reply channel to the handleLimits loop and waits
// for the answer, so when the limiter is already at its max the call
// blocks until a Decrement has been consumed.
func (c *ConcurrencyLimiter) Increment() {
	granted := make(chan struct{})
	c.inc <- granted
	<-granted
}
// Decrement signals that one running goroutine has finished by placing
// a single token on the limiter's dec channel.
func (c *ConcurrencyLimiter) Decrement() {
	var token struct{}
	c.dec <- token
}
// handleLimits runs in a goroutine to manage the count of
// running goroutines.
func (c *ConcurrencyLimiter) handleLimits() {
for {
r := <-c.inc
if c.count >= c.max {
<-c.dec
c.count--
}
c.count++
r <- struct{}{}
}
} }

View File

@ -23,4 +23,5 @@
rabbitmq_address = "amqp://guest:guest@localhost:5672/" rabbitmq_address = "amqp://guest:guest@localhost:5672/"
queue_name = "task_queue" queue_name = "task_queue"
prefetch = 1000 prefetch = 1000
dropped_log = "/Users/johnzampolin/.rabbitmq/drops.log"