From 553208a96015a080d552c27d3aacaef997a75681 Mon Sep 17 00:00:00 2001 From: Eugene Dementiev Date: Wed, 21 Oct 2015 20:25:36 +0300 Subject: [PATCH] Combine BatchPoints with the same RoutingTag to one message in amqp output closes #287 --- CHANGELOG.md | 1 + outputs/amqp/README.md | 1 + outputs/amqp/amqp.go | 7 ++++++- 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1a6d7d190..495187379 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,7 @@ of metrics collected and from how many plugins. - [#280](https://github.com/influxdb/telegraf/issues/280): Use InfluxDB client v2. - [#281](https://github.com/influxdb/telegraf/issues/281): Eliminate need to deep copy Batch Points. - [#286](https://github.com/influxdb/telegraf/issues/286): bcache plugin, thanks @cornerot! +- [#287](https://github.com/influxdb/telegraf/issues/287): Batch AMQP output, thanks @ekini! ### Bugfixes - [#228](https://github.com/influxdb/telegraf/pull/228): New version of package will replace old one. Thanks @ekini! diff --git a/outputs/amqp/README.md b/outputs/amqp/README.md index e708e3496..2fdedfbf1 100644 --- a/outputs/amqp/README.md +++ b/outputs/amqp/README.md @@ -4,5 +4,6 @@ This plugin writes to a AMQP exchange using tag, defined in configuration file as RoutingTag, as a routing key. If RoutingTag is empty, then empty routing key will be used. +Metrics are grouped in batches by RoutingTag. This plugin doesn't bind exchange to a queue, so it should be done by consumer. diff --git a/outputs/amqp/amqp.go b/outputs/amqp/amqp.go index b8ae0501d..e33aad274 100644 --- a/outputs/amqp/amqp.go +++ b/outputs/amqp/amqp.go @@ -1,6 +1,7 @@ package amqp import ( + "bytes" "fmt" "log" "sync" @@ -88,6 +89,7 @@ func (q *AMQP) Write(points []*client.Point) error { if len(points) == 0 { return nil } + var outbuf = make(map[string][][]byte) for _, p := range points { // Combine tags from Point and BatchPoints and grab the resulting @@ -100,7 +102,10 @@ func (q *AMQP) Write(points []*client.Point) error { key = h } } + outbuf[key] = append(outbuf[key], []byte(value)) + } + for key, buf := range outbuf { err := q.channel.Publish( q.Exchange, // exchange key, // routing key @@ -108,7 +113,7 @@ func (q *AMQP) Write(points []*client.Point) error { false, // immediate amqp.Publishing{ ContentType: "text/plain", - Body: []byte(value), + Body: bytes.Join(buf, []byte("\n")), }) if err != nil { return fmt.Errorf("FAILED to send amqp message: %s", err)