Combine BatchPoints with the same RoutingTag to one message in amqp output

closes #287
This commit is contained in:
Eugene Dementiev 2015-10-21 20:25:36 +03:00 committed by Cameron Sparr
parent dfc59866e8
commit 553208a960
3 changed files with 8 additions and 1 deletions

View File

@ -28,6 +28,7 @@ of metrics collected and from how many plugins.
- [#280](https://github.com/influxdb/telegraf/issues/280): Use InfluxDB client v2. - [#280](https://github.com/influxdb/telegraf/issues/280): Use InfluxDB client v2.
- [#281](https://github.com/influxdb/telegraf/issues/281): Eliminate need to deep copy Batch Points. - [#281](https://github.com/influxdb/telegraf/issues/281): Eliminate need to deep copy Batch Points.
- [#286](https://github.com/influxdb/telegraf/issues/286): bcache plugin, thanks @cornerot! - [#286](https://github.com/influxdb/telegraf/issues/286): bcache plugin, thanks @cornerot!
- [#287](https://github.com/influxdb/telegraf/issues/287): Batch AMQP output, thanks @ekini!
### Bugfixes ### Bugfixes
- [#228](https://github.com/influxdb/telegraf/pull/228): New version of package will replace old one. Thanks @ekini! - [#228](https://github.com/influxdb/telegraf/pull/228): New version of package will replace old one. Thanks @ekini!

View File

@ -4,5 +4,6 @@ This plugin writes to a AMQP exchange using tag, defined in configuration file
as RoutingTag, as a routing key. as RoutingTag, as a routing key.
If RoutingTag is empty, then empty routing key will be used. If RoutingTag is empty, then empty routing key will be used.
Metrics are grouped in batches by RoutingTag.
This plugin doesn't bind exchange to a queue, so it should be done by consumer. This plugin doesn't bind exchange to a queue, so it should be done by consumer.

View File

@ -1,6 +1,7 @@
package amqp package amqp
import ( import (
"bytes"
"fmt" "fmt"
"log" "log"
"sync" "sync"
@ -88,6 +89,7 @@ func (q *AMQP) Write(points []*client.Point) error {
if len(points) == 0 { if len(points) == 0 {
return nil return nil
} }
var outbuf = make(map[string][][]byte)
for _, p := range points { for _, p := range points {
// Combine tags from Point and BatchPoints and grab the resulting // Combine tags from Point and BatchPoints and grab the resulting
@ -100,7 +102,10 @@ func (q *AMQP) Write(points []*client.Point) error {
key = h key = h
} }
} }
outbuf[key] = append(outbuf[key], []byte(value))
}
for key, buf := range outbuf {
err := q.channel.Publish( err := q.channel.Publish(
q.Exchange, // exchange q.Exchange, // exchange
key, // routing key key, // routing key
@ -108,7 +113,7 @@ func (q *AMQP) Write(points []*client.Point) error {
false, // immediate false, // immediate
amqp.Publishing{ amqp.Publishing{
ContentType: "text/plain", ContentType: "text/plain",
Body: []byte(value), Body: bytes.Join(buf, []byte("\n")),
}) })
if err != nil { if err != nil {
return fmt.Errorf("FAILED to send amqp message: %s", err) return fmt.Errorf("FAILED to send amqp message: %s", err)