Kafka output fixup
This commit is contained in:
parent
d71a42cd1b
commit
a093ec1eaa
|
@ -154,26 +154,23 @@ func (k *Kafka) Write(metrics []telegraf.Metric) error {
|
|||
}
|
||||
|
||||
for _, metric := range metrics {
|
||||
values, err := k.serializer.Serialize(metric)
|
||||
buf, err := k.serializer.Serialize(metric)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var pubErr error
|
||||
for _, value := range values {
|
||||
m := &sarama.ProducerMessage{
|
||||
Topic: k.Topic,
|
||||
Value: sarama.StringEncoder(value),
|
||||
Value: sarama.ByteEncoder(buf),
|
||||
}
|
||||
if h, ok := metric.Tags()[k.RoutingTag]; ok {
|
||||
m.Key = sarama.StringEncoder(h)
|
||||
}
|
||||
|
||||
_, _, pubErr = k.producer.SendMessage(m)
|
||||
}
|
||||
_, _, err = k.producer.SendMessage(m)
|
||||
|
||||
if pubErr != nil {
|
||||
return fmt.Errorf("FAILED to send kafka message: %s\n", pubErr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("FAILED to send kafka message: %s\n", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
|
Loading…
Reference in New Issue