From 917b25cd5f2f1168402fbbbd905405459c931de0 Mon Sep 17 00:00:00 2001 From: Jack Zampolin Date: Fri, 2 Sep 2016 10:16:11 -0700 Subject: [PATCH] foo --- .../rabbit_mq_parser/rabbit_mq_parser.go | 323 ++++++------------ rabbit_mq_parser.conf | 2 +- 2 files changed, 112 insertions(+), 213 deletions(-) diff --git a/plugins/inputs/rabbit_mq_parser/rabbit_mq_parser.go b/plugins/inputs/rabbit_mq_parser/rabbit_mq_parser.go index 9d83ea1c3..b3ee443c2 100644 --- a/plugins/inputs/rabbit_mq_parser/rabbit_mq_parser.go +++ b/plugins/inputs/rabbit_mq_parser/rabbit_mq_parser.go @@ -1,6 +1,8 @@ package statsd import ( + "bytes" + "encoding/json" "fmt" "log" "strconv" @@ -20,9 +22,10 @@ type RabbitMQParser struct { QueueName string Prefetch int - conn *amqp.Connection - ch *amqp.Channel - q amqp.Queue + conn *amqp.Connection + ch *amqp.Channel + q amqp.Queue + drops int sync.Mutex } @@ -45,6 +48,8 @@ func (rmq *RabbitMQParser) SampleConfig() string { // Gather satisfies the telegraf.ServiceInput interface // All gathering is done in the Start function func (rmq *RabbitMQParser) Gather(_ telegraf.Accumulator) error { + log.Println("Dropped ", rmq.drops, " points in the last 10s") + rmq.drops = 0 return nil } @@ -82,7 +87,7 @@ func (rmq *RabbitMQParser) Start(acc telegraf.Accumulator) error { // Register the RabbitMQ parser as a consumer of the queue // And start the lister passing in the Accumulator msgs := rmq.registerConsumer() - go listen(msgs, acc) + go rmq.listen(msgs, acc) // Log that service has started log.Println("Starting RabbitMQ service...") @@ -100,184 +105,125 @@ func (rmq *RabbitMQParser) registerConsumer() <-chan amqp.Delivery { // Iterate over messages as they are coming in // and launch new goroutine to handle load -func listen(msgs <-chan amqp.Delivery, acc telegraf.Accumulator) { +func (rmq *RabbitMQParser) listen(msgs <-chan amqp.Delivery, acc telegraf.Accumulator) { for d := range msgs { - go handleMessage(d, acc) + go rmq.handleMessage(d, acc) } } // handleMessage parses the incoming messages into *client.Point // and then adds them to the Accumulator -func handleMessage(d amqp.Delivery, acc telegraf.Accumulator) { - msg := sanitizeMsg(d) - switch msg.Name() { - case "proc": - if _, ok := msg.Fields()["num"].(float64); !ok { - d.Ack(false) - log.Printf("string field instead of float field %v\n", msg) - return - } +func (rmq *RabbitMQParser) handleMessage(d amqp.Delivery, acc telegraf.Accumulator) { + msg := rmq.SanitizeMsg(d) + // If point is not valid then we will drop message + if msg == nil { + // log.Println("droping message", string(d.Body)) + rmq.drops++ d.Ack(false) - acc.AddFields(msg.Name(), msg.Fields(), msg.Tags(), msg.Time()) - default: - d.Ack(false) - acc.AddFields(msg.Name(), msg.Fields(), msg.Tags(), msg.Time()) + return } + d.Ack(false) + acc.AddFields(msg.Name(), msg.Fields(), msg.Tags(), msg.Time()) } -// sanitizeMsg breaks message cleanly into the different parts +// SanitizeMsg breaks message cleanly into the different parts // turns them into an IR and returns a point -func sanitizeMsg(msg amqp.Delivery) *client.Point { - ir := &irMessage{} - m := string(msg.Body) - switch { - case strings.Contains(m, "'severity'"): - text := strings.Split(m, "'severity'") - clockSplit := strings.Split(text[1], "'clock'") - ir.severity = clockSplit[0] - tsSplit := strings.Split(clockSplit[1], "'timestamp'") - ir.clock = tsSplit[0] - valueSplit := strings.Split(tsSplit[1], "'value'") - ir.ts = valueSplit[0] - serverSplit := strings.Split(valueSplit[1], "'server'") - ir.value = serverSplit[0] - sourceSplit := strings.Split(serverSplit[1], "'source'") - ir.server = sourceSplit[0] - hostSplit := strings.Split(sourceSplit[1], "'host'") - ir.source = hostSplit[0] - keySplit := strings.Split(hostSplit[1], "'key'") - ir.host = keySplit[0] - logEventSplit := strings.Split(keySplit[1], "'logeventid'") - ir.key = logEventSplit[0] - ir.logeventid = logEventSplit[1] - case strings.Contains(m, `"host"`): - text := strings.Split(m, "\"host\"") - hostSplit := strings.Split(text[1], "\"clock\"") - ir.host = hostSplit[0] - clockSplit := strings.Split(hostSplit[1], "\"value\"") - ir.clock = clockSplit[0] - valueSplit := strings.Split(clockSplit[1], "\"key\"") - ir.value = valueSplit[0] - keySplit := strings.Split(valueSplit[1], "\"server\"") - ir.key = keySplit[0] - ir.server = keySplit[1] - ir.doubleQuoted = true - case strings.Contains(m, "'host'"): - text := strings.Split(m, "'host'") - hostSplit := strings.Split(text[1], "'clock'") - ir.host = hostSplit[0] - clockSplit := strings.Split(hostSplit[1], "'value'") - ir.clock = clockSplit[0] - valueSplit := strings.Split(clockSplit[1], "'key'") - ir.value = valueSplit[0] - keySplit := strings.Split(valueSplit[1], "'server'") - ir.key = keySplit[0] - ir.server = keySplit[1] - ir.doubleQuoted = false - } - return ir.message().point() -} - -// Takes the intermediate representation and turns it into a message -func (ir *irMessage) message() message { - var msg message - // trim trailing chars from value - ir.value = string(ir.value[2 : len(ir.value)-2]) - - // trim trailing chars from key - ir.key = string(ir.key[3 : len(ir.key)-3]) - - // check what type of value is to be stored - // "'" indicates string messages - if strings.ContainsAny(ir.value, "'") { - msg = ir.toStringMessage() - } else { - msg = ir.toFloatMessage() - } - - return msg -} - -// irMessage is an intermediate representation of the -// point as it moves through the parser -type irMessage struct { - host string - clock string - value string - key string - server string - severity string - ts string - source string - logeventid string - doubleQuoted bool -} - -// cleans host and server names -func cleanHost(str string) string { - c := strings.Split(str, "'") - return c[1] -} - -// takes a dirty timestamp string and turns it into time.Time -func cleanClock(str string) time.Time { - c := string(str[2 : len(str)-2]) - i, err := strconv.ParseInt(c, 10, 64) +func (rmq *RabbitMQParser) SanitizeMsg(msg amqp.Delivery) *client.Point { + ir := &IRMessage{} + data, err := parseBody(msg.Body) if err != nil { - panic(fmt.Errorf("%v: parsing integer", err)) + // log.Println("droping message", string(msg.Body)) + rmq.drops++ + return nil } - return time.Unix(i, 0) -} - -// irMessage -> *strMessage -func (ir *irMessage) toStringMessage() *strMessage { - sm := &strMessage{} - if ir.doubleQuoted { - sm.host = cleanHost(strings.Replace(ir.host, "\"", "'", -1)) - sm.clock = cleanClock(strings.Replace(ir.clock, "\"", "'", -1)) - sm.server = cleanHost(strings.Replace(ir.host, "\"", "'", -1)) - sm.value = ir.value - sm.key = ir.key - } else { - sm.host = cleanHost(ir.host) - sm.clock = cleanClock(ir.clock) - sm.server = cleanHost(ir.server) - sm.value = ir.value - sm.key = ir.key - } - return sm -} - -// irMessage -> *floatMessage -func (ir *irMessage) toFloatMessage() *floatMessage { - fm := &floatMessage{} - if ir.doubleQuoted { - fm.host = cleanHost(strings.Replace(ir.host, "\"", "'", -1)) - fm.clock = cleanClock(strings.Replace(ir.clock, "\"", "'", -1)) - fm.server = cleanHost(strings.Replace(ir.host, "\"", "'", -1)) - i, err := strconv.ParseFloat(ir.value, 64) - if err != nil { - panic(fmt.Errorf("%v: parsing float", err)) - } - fm.value = i - fm.key = ir.key - } else { - fm.host = cleanHost(ir.host) - fm.clock = cleanClock(ir.clock) - fm.server = cleanHost(ir.server) - i, err := strconv.ParseFloat(ir.value, 64) - if err != nil { - j, err := strconv.ParseInt(ir.value, 10, 64) + for key, val := range data { + value := fmt.Sprintf("%v", val) + switch { + case key == "host": + ir.Host = value + case key == "value": + i, err := strconv.ParseFloat(value, 64) if err != nil { - // if we fail to parse a value out of the string we return 0 - log.Printf("Error parsing %v with key %v setting value to 0\n", ir.value, ir.key) + return nil } - i = float64(j) + ir.Value = i + case key == "key": + ir.Key = value + case key == "server": + ir.Server = value + case key == "clock": + ir.Clock = time.Unix(int64(val.(float64)), 0) } - fm.value = i - fm.key = ir.key } - return fm + m := ir.point() + if m == nil { + log.Println("droping message", string(msg.Body)) + rmq.drops++ + return nil + } + return m +} + +// IRMessage is an intermediate representation of the +// point as it moves through the parser +type IRMessage struct { + Host string + Clock time.Time + Value float64 + Key string + Server string +} + +func (ir *IRMessage) point() *client.Point { + meas, tags, fields := structureKey(ir.Key, ir.Value) + tags["host"] = ir.Host + tags["server"] = ir.Server + pt, err := client.NewPoint(meas, tags, fields, ir.Clock) + if err != nil { + panic(fmt.Errorf("%v: creating float point", err)) + } + return pt +} + +func parseBody(msg []byte) (map[string]interface{}, error) { + var data map[string]interface{} + // Try to parse, if not replace single with double quotes + if err := json.Unmarshal(msg, &data); err != nil { + rp := bytes.Replace(msg, []byte("'"), []byte(`"`), -1) + if err := json.Unmarshal(rp, &data); err != nil { + switch err.(type) { + // case *json.UnmarshalFieldError: + // log.Println("UnmarshalFieldError") + // return nil, err + // case *json.UnmarshalTypeError: + // log.Println("UnmarshalTypeError") + // return nil, err + // case *json.UnsupportedTypeError: + // log.Println("UnsupportedTypeError") + // return nil, err + // case *json.UnsupportedValueError: + // log.Println("UnsupportedValueError") + // return nil, err + // case *json.InvalidUnmarshalError: + // log.Println("InvalidUnmarshalError") + // return nil, err + // case *json.InvalidUTF8Error: + // log.Println("InvalidUTF8Error") + // return nil, err + default: + return nil, err + } + } + } + return data, nil +} + +func errPoint() *client.Point { + pt, err := client.NewPoint("err", map[string]string{}, map[string]interface{}{"err": "err"}, time.Now()) + if err != nil { + panic(err) + } + return pt } // This is awful decision tree parsing, but it works... @@ -597,57 +543,10 @@ func trim(s string) string { return s[0 : len(s)-1] } -// common interface for different datatypes -type message interface { - point() *client.Point -} - -// takes an irMessage -> float field -type floatMessage struct { - host string - clock time.Time - value float64 - key string - server string -} - func trimS(s string) string { return s[1 : len(s)-1] } -// satisfies the message interface -func (fm *floatMessage) point() *client.Point { - meas, tags, fields := structureKey(fm.key, fm.value) - tags["host"] = fm.host - tags["server"] = fm.server - pt, err := client.NewPoint(meas, tags, fields, fm.clock) - if err != nil { - panic(fmt.Errorf("%v: creating float point", err)) - } - return pt -} - -// takes an irMessage -> string field -type strMessage struct { - host string - clock time.Time - value string - key string - server string -} - -// satisfies the message interface -func (sm *strMessage) point() *client.Point { - meas, tags, fields := structureKey(sm.key, sm.value) - tags["host"] = sm.host - tags["server"] = sm.server - pt, err := client.NewPoint(meas, tags, fields, sm.clock) - if err != nil { - panic(fmt.Errorf("%v: creating string point", err)) - } - return pt -} - // Stop satisfies the telegraf.ServiceInput interface func (rmq *RabbitMQParser) Stop() { rmq.Lock() diff --git a/rabbit_mq_parser.conf b/rabbit_mq_parser.conf index ec44c7163..db01ca32f 100644 --- a/rabbit_mq_parser.conf +++ b/rabbit_mq_parser.conf @@ -1,5 +1,5 @@ [agent] - interval = "1s" + interval = "10s" round_interval = true metric_batch_size = 10000 metric_buffer_limit = 100000