AMQP routing tag doc & add routing tag for Kafka

closes #200
This commit is contained in:
Cameron Sparr 2015-09-16 12:10:26 -07:00
parent c843b53c30
commit d979ee5573
2 changed files with 11 additions and 6 deletions

View File

@ -13,8 +13,8 @@ type AMQP struct {
URL string URL string
// AMQP exchange // AMQP exchange
Exchange string Exchange string
// Routing key // Routing Key Tag
RoutingTag string RoutingTag string `toml:"routing_tag"`
channel *amqp.Channel channel *amqp.Channel
} }
@ -24,9 +24,9 @@ var sampleConfig = `
url = "amqp://localhost:5672/influxdb" url = "amqp://localhost:5672/influxdb"
# AMQP exchange # AMQP exchange
exchange = "telegraf" exchange = "telegraf"
# AMQP tag name used as a routing key # Telegraf tag to use as a routing key
# If there's no tag in a point, empty routing key will be used # ie, if this tag exists, it's value will be used as the routing key
routing_tag = "dc" routing_tag = "host"
` `
func (q *AMQP) Connect() error { func (q *AMQP) Connect() error {

View File

@ -14,6 +14,8 @@ type Kafka struct {
Brokers []string Brokers []string
// Kafka topic // Kafka topic
Topic string Topic string
// Routing Key Tag
RoutingTag string `toml:"routing_tag"`
producer sarama.SyncProducer producer sarama.SyncProducer
} }
@ -23,6 +25,9 @@ var sampleConfig = `
brokers = ["localhost:9092"] brokers = ["localhost:9092"]
# Kafka topic for producer messages # Kafka topic for producer messages
topic = "telegraf" topic = "telegraf"
# Telegraf tag to use as a routing key
# ie, if this tag exists, it's value will be used as the routing key
routing_tag = "host"
` `
func (k *Kafka) Connect() error { func (k *Kafka) Connect() error {
@ -71,7 +76,7 @@ func (k *Kafka) Write(bp client.BatchPoints) error {
Topic: k.Topic, Topic: k.Topic,
Value: sarama.StringEncoder(value), Value: sarama.StringEncoder(value),
} }
if h, ok := p.Tags["host"]; ok { if h, ok := p.Tags[k.RoutingTag]; ok {
m.Key = sarama.StringEncoder(h) m.Key = sarama.StringEncoder(h)
} }