AMQP output plugin typo fixes and added README and RoutingTag

This commit is contained in:
Eugene Dementiev 2015-09-16 03:25:56 +03:00 committed by Cameron Sparr
parent f00d43aa09
commit 5d280e4d25
4 changed files with 27 additions and 8 deletions

8
outputs/amqp/README.md Normal file
View File

@ -0,0 +1,8 @@
# AMQP Output Plugin
This plugin writes to a AMQP exchange using tag, defined in configuration file
as RoutingTag, as a routing key.
If RoutingTag is empty, then empty routing key will be used.
This plugin doesn't bind exchange to a queue, so it should be done by consumer.

View File

@ -14,7 +14,7 @@ type AMQP struct {
// AMQP exchange // AMQP exchange
Exchange string Exchange string
// Routing key // Routing key
RoutingKey string RoutingTag string
channel *amqp.Channel channel *amqp.Channel
} }
@ -24,6 +24,9 @@ var sampleConfig = `
url = "amqp://localhost:5672/influxdb" url = "amqp://localhost:5672/influxdb"
# AMQP exchange # AMQP exchange
exchange = "telegraf" exchange = "telegraf"
# AMQP tag name used as a routing key
# If there's no tag in a point, empty routing key will be used
routing_tag = "dc"
` `
func (q *AMQP) Connect() error { func (q *AMQP) Connect() error {
@ -71,7 +74,7 @@ func (q *AMQP) Write(bp client.BatchPoints) error {
for _, p := range bp.Points { for _, p := range bp.Points {
// Combine tags from Point and BatchPoints and grab the resulting // Combine tags from Point and BatchPoints and grab the resulting
// line-protocol output string to write to Kafka // line-protocol output string to write to AMQP
var value, key string var value, key string
if p.Raw != "" { if p.Raw != "" {
value = p.Raw value = p.Raw
@ -85,8 +88,10 @@ func (q *AMQP) Write(bp client.BatchPoints) error {
value = p.MarshalString() value = p.MarshalString()
} }
if h, ok := p.Tags["dc"]; ok { if q.RoutingTag != "" {
key = h if h, ok := p.Tags[q.RoutingTag]; ok {
key = h
}
} }
err := q.channel.Publish( err := q.channel.Publish(
@ -99,7 +104,7 @@ func (q *AMQP) Write(bp client.BatchPoints) error {
Body: []byte(value), Body: []byte(value),
}) })
if err != nil { if err != nil {
return fmt.Errorf("FAILED to send amqp message: %s\n", err) return fmt.Errorf("FAILED to send amqp message: %s", err)
} }
} }
return nil return nil

View File

@ -18,11 +18,11 @@ func TestConnectAndWrite(t *testing.T) {
Exchange: "telegraf_test", Exchange: "telegraf_test",
} }
// Verify that we can connect to the Kafka broker // Verify that we can connect to the AMQP broker
err := q.Connect() err := q.Connect()
require.NoError(t, err) require.NoError(t, err)
// Verify that we can successfully write data to the kafka broker // Verify that we can successfully write data to the amqp broker
err = q.Write(testutil.MockBatchPoints()) err = q.Write(testutil.MockBatchPoints())
require.NoError(t, err) require.NoError(t, err)
} }

View File

@ -26,8 +26,14 @@ kafka:
ADVERTISED_HOST: ADVERTISED_HOST:
ADVERTISED_PORT: 9092 ADVERTISED_PORT: 9092
rabbitmq:
image: rabbitmq:3-management
hostname: docker_rabbit
ports:
- "15672:15672"
- "5672:5672"
opentsdb: opentsdb:
image: lancope/opentsdb image: lancope/opentsdb
ports: ports:
- "24242:4242" - "24242:4242"