diff --git a/outputs/amqp/amqp.go b/outputs/amqp/amqp.go index 2d3f1c399..070793fdb 100644 --- a/outputs/amqp/amqp.go +++ b/outputs/amqp/amqp.go @@ -13,8 +13,8 @@ type AMQP struct { URL string // AMQP exchange Exchange string - // Routing key - RoutingTag string + // Routing Key Tag + RoutingTag string `toml:"routing_tag"` channel *amqp.Channel } @@ -24,9 +24,9 @@ var sampleConfig = ` url = "amqp://localhost:5672/influxdb" # AMQP exchange exchange = "telegraf" - # AMQP tag name used as a routing key - # If there's no tag in a point, empty routing key will be used - routing_tag = "dc" + # Telegraf tag to use as a routing key + # ie, if this tag exists, it's value will be used as the routing key + routing_tag = "host" ` func (q *AMQP) Connect() error { diff --git a/outputs/kafka/kafka.go b/outputs/kafka/kafka.go index 49a729b42..370f1ba95 100644 --- a/outputs/kafka/kafka.go +++ b/outputs/kafka/kafka.go @@ -14,6 +14,8 @@ type Kafka struct { Brokers []string // Kafka topic Topic string + // Routing Key Tag + RoutingTag string `toml:"routing_tag"` producer sarama.SyncProducer } @@ -23,6 +25,9 @@ var sampleConfig = ` brokers = ["localhost:9092"] # Kafka topic for producer messages topic = "telegraf" + # Telegraf tag to use as a routing key + # ie, if this tag exists, it's value will be used as the routing key + routing_tag = "host" ` func (k *Kafka) Connect() error { @@ -71,7 +76,7 @@ func (k *Kafka) Write(bp client.BatchPoints) error { Topic: k.Topic, Value: sarama.StringEncoder(value), } - if h, ok := p.Tags["host"]; ok { + if h, ok := p.Tags[k.RoutingTag]; ok { m.Key = sarama.StringEncoder(h) }