diff --git a/outputs/amqp/README.md b/outputs/amqp/README.md new file mode 100644 index 000000000..e708e3496 --- /dev/null +++ b/outputs/amqp/README.md @@ -0,0 +1,8 @@ +# AMQP Output Plugin + +This plugin writes to a AMQP exchange using tag, defined in configuration file +as RoutingTag, as a routing key. + +If RoutingTag is empty, then empty routing key will be used. + +This plugin doesn't bind exchange to a queue, so it should be done by consumer. diff --git a/outputs/amqp/amqp.go b/outputs/amqp/amqp.go index 9a657311c..2d3f1c399 100644 --- a/outputs/amqp/amqp.go +++ b/outputs/amqp/amqp.go @@ -14,7 +14,7 @@ type AMQP struct { // AMQP exchange Exchange string // Routing key - RoutingKey string + RoutingTag string channel *amqp.Channel } @@ -24,6 +24,9 @@ var sampleConfig = ` url = "amqp://localhost:5672/influxdb" # AMQP exchange exchange = "telegraf" + # AMQP tag name used as a routing key + # If there's no tag in a point, empty routing key will be used + routing_tag = "dc" ` func (q *AMQP) Connect() error { @@ -71,7 +74,7 @@ func (q *AMQP) Write(bp client.BatchPoints) error { for _, p := range bp.Points { // Combine tags from Point and BatchPoints and grab the resulting - // line-protocol output string to write to Kafka + // line-protocol output string to write to AMQP var value, key string if p.Raw != "" { value = p.Raw @@ -85,8 +88,10 @@ func (q *AMQP) Write(bp client.BatchPoints) error { value = p.MarshalString() } - if h, ok := p.Tags["dc"]; ok { - key = h + if q.RoutingTag != "" { + if h, ok := p.Tags[q.RoutingTag]; ok { + key = h + } } err := q.channel.Publish( @@ -99,7 +104,7 @@ func (q *AMQP) Write(bp client.BatchPoints) error { Body: []byte(value), }) if err != nil { - return fmt.Errorf("FAILED to send amqp message: %s\n", err) + return fmt.Errorf("FAILED to send amqp message: %s", err) } } return nil diff --git a/outputs/amqp/amqp_test.go b/outputs/amqp/amqp_test.go index 15781fbc9..247801f9e 100644 --- a/outputs/amqp/amqp_test.go +++ b/outputs/amqp/amqp_test.go @@ -18,11 +18,11 @@ func TestConnectAndWrite(t *testing.T) { Exchange: "telegraf_test", } - // Verify that we can connect to the Kafka broker + // Verify that we can connect to the AMQP broker err := q.Connect() require.NoError(t, err) - // Verify that we can successfully write data to the kafka broker + // Verify that we can successfully write data to the amqp broker err = q.Write(testutil.MockBatchPoints()) require.NoError(t, err) } diff --git a/scripts/docker-compose.yml b/scripts/docker-compose.yml index a41cb67f4..bf1a40ef3 100644 --- a/scripts/docker-compose.yml +++ b/scripts/docker-compose.yml @@ -26,8 +26,14 @@ kafka: ADVERTISED_HOST: ADVERTISED_PORT: 9092 +rabbitmq: + image: rabbitmq:3-management + hostname: docker_rabbit + ports: + - "15672:15672" + - "5672:5672" + opentsdb: image: lancope/opentsdb ports: - "24242:4242" -