This commit is contained in:
Jack Zampolin 2016-09-02 10:16:11 -07:00
parent 37450f125e
commit 917b25cd5f
2 changed files with 112 additions and 213 deletions

View File

@ -1,6 +1,8 @@
package statsd package statsd
import ( import (
"bytes"
"encoding/json"
"fmt" "fmt"
"log" "log"
"strconv" "strconv"
@ -20,9 +22,10 @@ type RabbitMQParser struct {
QueueName string QueueName string
Prefetch int Prefetch int
conn *amqp.Connection conn *amqp.Connection
ch *amqp.Channel ch *amqp.Channel
q amqp.Queue q amqp.Queue
drops int
sync.Mutex sync.Mutex
} }
@ -45,6 +48,8 @@ func (rmq *RabbitMQParser) SampleConfig() string {
// Gather satisfies the telegraf.ServiceInput interface // Gather satisfies the telegraf.ServiceInput interface
// All gathering is done in the Start function // All gathering is done in the Start function
func (rmq *RabbitMQParser) Gather(_ telegraf.Accumulator) error { func (rmq *RabbitMQParser) Gather(_ telegraf.Accumulator) error {
log.Println("Dropped ", rmq.drops, " points in the last 10s")
rmq.drops = 0
return nil return nil
} }
@ -82,7 +87,7 @@ func (rmq *RabbitMQParser) Start(acc telegraf.Accumulator) error {
// Register the RabbitMQ parser as a consumer of the queue // Register the RabbitMQ parser as a consumer of the queue
// And start the lister passing in the Accumulator // And start the lister passing in the Accumulator
msgs := rmq.registerConsumer() msgs := rmq.registerConsumer()
go listen(msgs, acc) go rmq.listen(msgs, acc)
// Log that service has started // Log that service has started
log.Println("Starting RabbitMQ service...") log.Println("Starting RabbitMQ service...")
@ -100,184 +105,125 @@ func (rmq *RabbitMQParser) registerConsumer() <-chan amqp.Delivery {
// Iterate over messages as they are coming in // Iterate over messages as they are coming in
// and launch new goroutine to handle load // and launch new goroutine to handle load
func listen(msgs <-chan amqp.Delivery, acc telegraf.Accumulator) { func (rmq *RabbitMQParser) listen(msgs <-chan amqp.Delivery, acc telegraf.Accumulator) {
for d := range msgs { for d := range msgs {
go handleMessage(d, acc) go rmq.handleMessage(d, acc)
} }
} }
// handleMessage parses the incoming messages into *client.Point // handleMessage parses the incoming messages into *client.Point
// and then adds them to the Accumulator // and then adds them to the Accumulator
func handleMessage(d amqp.Delivery, acc telegraf.Accumulator) { func (rmq *RabbitMQParser) handleMessage(d amqp.Delivery, acc telegraf.Accumulator) {
msg := sanitizeMsg(d) msg := rmq.SanitizeMsg(d)
switch msg.Name() { // If point is not valid then we will drop message
case "proc": if msg == nil {
if _, ok := msg.Fields()["num"].(float64); !ok { // log.Println("droping message", string(d.Body))
d.Ack(false) rmq.drops++
log.Printf("string field instead of float field %v\n", msg)
return
}
d.Ack(false) d.Ack(false)
acc.AddFields(msg.Name(), msg.Fields(), msg.Tags(), msg.Time()) return
default:
d.Ack(false)
acc.AddFields(msg.Name(), msg.Fields(), msg.Tags(), msg.Time())
} }
d.Ack(false)
acc.AddFields(msg.Name(), msg.Fields(), msg.Tags(), msg.Time())
} }
// sanitizeMsg breaks message cleanly into the different parts // SanitizeMsg breaks message cleanly into the different parts
// turns them into an IR and returns a point // turns them into an IR and returns a point
func sanitizeMsg(msg amqp.Delivery) *client.Point { func (rmq *RabbitMQParser) SanitizeMsg(msg amqp.Delivery) *client.Point {
ir := &irMessage{} ir := &IRMessage{}
m := string(msg.Body) data, err := parseBody(msg.Body)
switch {
case strings.Contains(m, "'severity'"):
text := strings.Split(m, "'severity'")
clockSplit := strings.Split(text[1], "'clock'")
ir.severity = clockSplit[0]
tsSplit := strings.Split(clockSplit[1], "'timestamp'")
ir.clock = tsSplit[0]
valueSplit := strings.Split(tsSplit[1], "'value'")
ir.ts = valueSplit[0]
serverSplit := strings.Split(valueSplit[1], "'server'")
ir.value = serverSplit[0]
sourceSplit := strings.Split(serverSplit[1], "'source'")
ir.server = sourceSplit[0]
hostSplit := strings.Split(sourceSplit[1], "'host'")
ir.source = hostSplit[0]
keySplit := strings.Split(hostSplit[1], "'key'")
ir.host = keySplit[0]
logEventSplit := strings.Split(keySplit[1], "'logeventid'")
ir.key = logEventSplit[0]
ir.logeventid = logEventSplit[1]
case strings.Contains(m, `"host"`):
text := strings.Split(m, "\"host\"")
hostSplit := strings.Split(text[1], "\"clock\"")
ir.host = hostSplit[0]
clockSplit := strings.Split(hostSplit[1], "\"value\"")
ir.clock = clockSplit[0]
valueSplit := strings.Split(clockSplit[1], "\"key\"")
ir.value = valueSplit[0]
keySplit := strings.Split(valueSplit[1], "\"server\"")
ir.key = keySplit[0]
ir.server = keySplit[1]
ir.doubleQuoted = true
case strings.Contains(m, "'host'"):
text := strings.Split(m, "'host'")
hostSplit := strings.Split(text[1], "'clock'")
ir.host = hostSplit[0]
clockSplit := strings.Split(hostSplit[1], "'value'")
ir.clock = clockSplit[0]
valueSplit := strings.Split(clockSplit[1], "'key'")
ir.value = valueSplit[0]
keySplit := strings.Split(valueSplit[1], "'server'")
ir.key = keySplit[0]
ir.server = keySplit[1]
ir.doubleQuoted = false
}
return ir.message().point()
}
// Takes the intermediate representation and turns it into a message
func (ir *irMessage) message() message {
var msg message
// trim trailing chars from value
ir.value = string(ir.value[2 : len(ir.value)-2])
// trim trailing chars from key
ir.key = string(ir.key[3 : len(ir.key)-3])
// check what type of value is to be stored
// "'" indicates string messages
if strings.ContainsAny(ir.value, "'") {
msg = ir.toStringMessage()
} else {
msg = ir.toFloatMessage()
}
return msg
}
// irMessage is an intermediate representation of the
// point as it moves through the parser
type irMessage struct {
host string
clock string
value string
key string
server string
severity string
ts string
source string
logeventid string
doubleQuoted bool
}
// cleans host and server names
func cleanHost(str string) string {
c := strings.Split(str, "'")
return c[1]
}
// takes a dirty timestamp string and turns it into time.Time
func cleanClock(str string) time.Time {
c := string(str[2 : len(str)-2])
i, err := strconv.ParseInt(c, 10, 64)
if err != nil { if err != nil {
panic(fmt.Errorf("%v: parsing integer", err)) // log.Println("droping message", string(msg.Body))
rmq.drops++
return nil
} }
return time.Unix(i, 0) for key, val := range data {
} value := fmt.Sprintf("%v", val)
switch {
// irMessage -> *strMessage case key == "host":
func (ir *irMessage) toStringMessage() *strMessage { ir.Host = value
sm := &strMessage{} case key == "value":
if ir.doubleQuoted { i, err := strconv.ParseFloat(value, 64)
sm.host = cleanHost(strings.Replace(ir.host, "\"", "'", -1))
sm.clock = cleanClock(strings.Replace(ir.clock, "\"", "'", -1))
sm.server = cleanHost(strings.Replace(ir.host, "\"", "'", -1))
sm.value = ir.value
sm.key = ir.key
} else {
sm.host = cleanHost(ir.host)
sm.clock = cleanClock(ir.clock)
sm.server = cleanHost(ir.server)
sm.value = ir.value
sm.key = ir.key
}
return sm
}
// irMessage -> *floatMessage
func (ir *irMessage) toFloatMessage() *floatMessage {
fm := &floatMessage{}
if ir.doubleQuoted {
fm.host = cleanHost(strings.Replace(ir.host, "\"", "'", -1))
fm.clock = cleanClock(strings.Replace(ir.clock, "\"", "'", -1))
fm.server = cleanHost(strings.Replace(ir.host, "\"", "'", -1))
i, err := strconv.ParseFloat(ir.value, 64)
if err != nil {
panic(fmt.Errorf("%v: parsing float", err))
}
fm.value = i
fm.key = ir.key
} else {
fm.host = cleanHost(ir.host)
fm.clock = cleanClock(ir.clock)
fm.server = cleanHost(ir.server)
i, err := strconv.ParseFloat(ir.value, 64)
if err != nil {
j, err := strconv.ParseInt(ir.value, 10, 64)
if err != nil { if err != nil {
// if we fail to parse a value out of the string we return 0 return nil
log.Printf("Error parsing %v with key %v setting value to 0\n", ir.value, ir.key)
} }
i = float64(j) ir.Value = i
case key == "key":
ir.Key = value
case key == "server":
ir.Server = value
case key == "clock":
ir.Clock = time.Unix(int64(val.(float64)), 0)
} }
fm.value = i
fm.key = ir.key
} }
return fm m := ir.point()
if m == nil {
log.Println("droping message", string(msg.Body))
rmq.drops++
return nil
}
return m
}
// IRMessage is an intermediate representation of the
// point as it moves through the parser
type IRMessage struct {
Host string
Clock time.Time
Value float64
Key string
Server string
}
func (ir *IRMessage) point() *client.Point {
meas, tags, fields := structureKey(ir.Key, ir.Value)
tags["host"] = ir.Host
tags["server"] = ir.Server
pt, err := client.NewPoint(meas, tags, fields, ir.Clock)
if err != nil {
panic(fmt.Errorf("%v: creating float point", err))
}
return pt
}
func parseBody(msg []byte) (map[string]interface{}, error) {
var data map[string]interface{}
// Try to parse, if not replace single with double quotes
if err := json.Unmarshal(msg, &data); err != nil {
rp := bytes.Replace(msg, []byte("'"), []byte(`"`), -1)
if err := json.Unmarshal(rp, &data); err != nil {
switch err.(type) {
// case *json.UnmarshalFieldError:
// log.Println("UnmarshalFieldError")
// return nil, err
// case *json.UnmarshalTypeError:
// log.Println("UnmarshalTypeError")
// return nil, err
// case *json.UnsupportedTypeError:
// log.Println("UnsupportedTypeError")
// return nil, err
// case *json.UnsupportedValueError:
// log.Println("UnsupportedValueError")
// return nil, err
// case *json.InvalidUnmarshalError:
// log.Println("InvalidUnmarshalError")
// return nil, err
// case *json.InvalidUTF8Error:
// log.Println("InvalidUTF8Error")
// return nil, err
default:
return nil, err
}
}
}
return data, nil
}
func errPoint() *client.Point {
pt, err := client.NewPoint("err", map[string]string{}, map[string]interface{}{"err": "err"}, time.Now())
if err != nil {
panic(err)
}
return pt
} }
// This is awful decision tree parsing, but it works... // This is awful decision tree parsing, but it works...
@ -597,57 +543,10 @@ func trim(s string) string {
return s[0 : len(s)-1] return s[0 : len(s)-1]
} }
// common interface for different datatypes
type message interface {
point() *client.Point
}
// takes an irMessage -> float field
type floatMessage struct {
host string
clock time.Time
value float64
key string
server string
}
func trimS(s string) string { func trimS(s string) string {
return s[1 : len(s)-1] return s[1 : len(s)-1]
} }
// satisfies the message interface
func (fm *floatMessage) point() *client.Point {
meas, tags, fields := structureKey(fm.key, fm.value)
tags["host"] = fm.host
tags["server"] = fm.server
pt, err := client.NewPoint(meas, tags, fields, fm.clock)
if err != nil {
panic(fmt.Errorf("%v: creating float point", err))
}
return pt
}
// takes an irMessage -> string field
type strMessage struct {
host string
clock time.Time
value string
key string
server string
}
// satisfies the message interface
func (sm *strMessage) point() *client.Point {
meas, tags, fields := structureKey(sm.key, sm.value)
tags["host"] = sm.host
tags["server"] = sm.server
pt, err := client.NewPoint(meas, tags, fields, sm.clock)
if err != nil {
panic(fmt.Errorf("%v: creating string point", err))
}
return pt
}
// Stop satisfies the telegraf.ServiceInput interface // Stop satisfies the telegraf.ServiceInput interface
func (rmq *RabbitMQParser) Stop() { func (rmq *RabbitMQParser) Stop() {
rmq.Lock() rmq.Lock()

View File

@ -1,5 +1,5 @@
[agent] [agent]
interval = "1s" interval = "10s"
round_interval = true round_interval = true
metric_batch_size = 10000 metric_batch_size = 10000
metric_buffer_limit = 100000 metric_buffer_limit = 100000