Combine BatchPoints with the same RoutingTag to one message in amqp output

This commit is contained in:
Eugene Dementiev 2015-10-21 20:25:36 +03:00
parent ac685d19f8
commit a42b6fa53c
2 changed files with 7 additions and 1 deletions

View File

@ -4,5 +4,6 @@ This plugin writes to a AMQP exchange using tag, defined in configuration file
as RoutingTag, as a routing key. as RoutingTag, as a routing key.
If RoutingTag is empty, then empty routing key will be used. If RoutingTag is empty, then empty routing key will be used.
Metrics are grouped in batches by RoutingTag.
This plugin doesn't bind exchange to a queue, so it should be done by consumer. This plugin doesn't bind exchange to a queue, so it should be done by consumer.

View File

@ -1,6 +1,7 @@
package amqp package amqp
import ( import (
"bytes"
"fmt" "fmt"
"log" "log"
"sync" "sync"
@ -88,6 +89,7 @@ func (q *AMQP) Write(points []*client.Point) error {
if len(points) == 0 { if len(points) == 0 {
return nil return nil
} }
var outbuf = make(map[string][][]byte)
for _, p := range points { for _, p := range points {
// Combine tags from Point and BatchPoints and grab the resulting // Combine tags from Point and BatchPoints and grab the resulting
@ -100,7 +102,10 @@ func (q *AMQP) Write(points []*client.Point) error {
key = h key = h
} }
} }
outbuf[key] = append(outbuf[key], []byte(value))
}
for key, buf := range outbuf {
err := q.channel.Publish( err := q.channel.Publish(
q.Exchange, // exchange q.Exchange, // exchange
key, // routing key key, // routing key
@ -108,7 +113,7 @@ func (q *AMQP) Write(points []*client.Point) error {
false, // immediate false, // immediate
amqp.Publishing{ amqp.Publishing{
ContentType: "text/plain", ContentType: "text/plain",
Body: []byte(value), Body: bytes.Join(buf, []byte("\n")),
}) })
if err != nil { if err != nil {
return fmt.Errorf("FAILED to send amqp message: %s", err) return fmt.Errorf("FAILED to send amqp message: %s", err)