diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index 5cd6a9771..952f50d99 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -6,9 +6,11 @@ import ( "log" "strings" "sync" + "time" "github.com/Shopify/sarama" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal/tls" "github.com/influxdata/telegraf/plugins/common/kafka" "github.com/influxdata/telegraf/plugins/inputs" @@ -83,6 +85,7 @@ const ( defaultMaxUndeliveredMessages = 1000 defaultMaxMessageLen = 1000000 defaultConsumerGroup = "telegraf_metrics_consumers" + reconnectDelay = 5 * time.Second ) type empty struct{} @@ -259,6 +262,7 @@ func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error { err := k.consumer.Consume(ctx, k.Topics, handler) if err != nil { acc.AddError(err) + internal.SleepContext(ctx, reconnectDelay) } } err = k.consumer.Close() diff --git a/plugins/outputs/kafka/README.md b/plugins/outputs/kafka/README.md index 7ced4c5c6..d1cc9f0cb 100644 --- a/plugins/outputs/kafka/README.md +++ b/plugins/outputs/kafka/README.md @@ -10,6 +10,13 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm ## Kafka topic for producer messages topic = "telegraf" + ## The value of this tag will be used as the topic. If not set the 'topic' + ## option is used. + # topic_tag = "" + + ## If true, the 'topic_tag' will be removed from to the metric. + # exclude_topic_tag = false + ## Optional Client id # client_id = "Telegraf" diff --git a/plugins/outputs/kafka/kafka.go b/plugins/outputs/kafka/kafka.go index a877b334b..406febc28 100644 --- a/plugins/outputs/kafka/kafka.go +++ b/plugins/outputs/kafka/kafka.go @@ -26,16 +26,18 @@ var zeroTime = time.Unix(0, 0) type ( Kafka struct { - Brokers []string - Topic string + Brokers []string `toml:"brokers"` + Topic string `toml:"topic"` + TopicTag string `toml:"topic_tag"` + ExcludeTopicTag bool `toml:"exclude_topic_tag"` ClientID string `toml:"client_id"` TopicSuffix TopicSuffix `toml:"topic_suffix"` RoutingTag string `toml:"routing_tag"` RoutingKey string `toml:"routing_key"` - CompressionCodec int - RequiredAcks int - MaxRetry int - MaxMessageBytes int `toml:"max_message_bytes"` + CompressionCodec int `toml:"compression_codec"` + RequiredAcks int `toml:"required_acks"` + MaxRetry int `toml:"max_retry"` + MaxMessageBytes int `toml:"max_message_bytes"` Version string `toml:"version"` @@ -57,7 +59,9 @@ type ( Log telegraf.Logger `toml:"-"` tlsConfig tls.Config - producer sarama.SyncProducer + + producerFunc func(addrs []string, config *sarama.Config) (sarama.SyncProducer, error) + producer sarama.SyncProducer serializer serializers.Serializer } @@ -94,6 +98,13 @@ var sampleConfig = ` ## Kafka topic for producer messages topic = "telegraf" + ## The value of this tag will be used as the topic. If not set the 'topic' + ## option is used. + # topic_tag = "" + + ## If true, the 'topic_tag' will be removed from to the metric. + # exclude_topic_tag = false + ## Optional Client id # client_id = "Telegraf" @@ -212,14 +223,29 @@ func ValidateTopicSuffixMethod(method string) error { return fmt.Errorf("Unknown topic suffix method provided: %s", method) } -func (k *Kafka) GetTopicName(metric telegraf.Metric) string { +func (k *Kafka) GetTopicName(metric telegraf.Metric) (telegraf.Metric, string) { + topic := k.Topic + if k.TopicTag != "" { + if t, ok := metric.GetTag(k.TopicTag); ok { + topic = t + + // If excluding the topic tag, a copy is required to avoid modifying + // the metric buffer. + if k.ExcludeTopicTag { + metric = metric.Copy() + metric.Accept() + metric.RemoveTag(k.TopicTag) + } + } + } + var topicName string switch k.TopicSuffix.Method { case "measurement": - topicName = k.Topic + k.TopicSuffix.Separator + metric.Name() + topicName = topic + k.TopicSuffix.Separator + metric.Name() case "tags": var topicNameComponents []string - topicNameComponents = append(topicNameComponents, k.Topic) + topicNameComponents = append(topicNameComponents, topic) for _, tag := range k.TopicSuffix.Keys { tagValue := metric.Tags()[tag] if tagValue != "" { @@ -228,9 +254,9 @@ func (k *Kafka) GetTopicName(metric telegraf.Metric) string { } topicName = strings.Join(topicNameComponents, k.TopicSuffix.Separator) default: - topicName = k.Topic + topicName = topic } - return topicName + return metric, topicName } func (k *Kafka) SetSerializer(serializer serializers.Serializer) { @@ -306,7 +332,7 @@ func (k *Kafka) Connect() error { config.Net.SASL.Version = version } - producer, err := sarama.NewSyncProducer(k.Brokers, config) + producer, err := k.producerFunc(k.Brokers, config) if err != nil { return err } @@ -348,6 +374,8 @@ func (k *Kafka) routingKey(metric telegraf.Metric) (string, error) { func (k *Kafka) Write(metrics []telegraf.Metric) error { msgs := make([]*sarama.ProducerMessage, 0, len(metrics)) for _, metric := range metrics { + metric, topic := k.GetTopicName(metric) + buf, err := k.serializer.Serialize(metric) if err != nil { k.Log.Debugf("Could not serialize metric: %v", err) @@ -355,7 +383,7 @@ func (k *Kafka) Write(metrics []telegraf.Metric) error { } m := &sarama.ProducerMessage{ - Topic: k.GetTopicName(metric), + Topic: topic, Value: sarama.ByteEncoder(buf), } @@ -403,6 +431,7 @@ func init() { return &Kafka{ MaxRetry: 3, RequiredAcks: -1, + producerFunc: sarama.NewSyncProducer, } }) } diff --git a/plugins/outputs/kafka/kafka_test.go b/plugins/outputs/kafka/kafka_test.go index bac51c28d..070eea3f9 100644 --- a/plugins/outputs/kafka/kafka_test.go +++ b/plugins/outputs/kafka/kafka_test.go @@ -4,6 +4,7 @@ import ( "testing" "time" + "github.com/Shopify/sarama" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/plugins/serializers" @@ -81,7 +82,7 @@ func TestTopicSuffixes(t *testing.T) { TopicSuffix: topicSuffix, } - topic := k.GetTopicName(metric) + _, topic := k.GetTopicName(metric) require.Equal(t, expectedTopic, topic) } } @@ -156,3 +157,146 @@ func TestRoutingKey(t *testing.T) { }) } } + +type MockProducer struct { + sent []*sarama.ProducerMessage +} + +func (p *MockProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) { + p.sent = append(p.sent, msg) + return 0, 0, nil +} + +func (p *MockProducer) SendMessages(msgs []*sarama.ProducerMessage) error { + p.sent = append(p.sent, msgs...) + return nil +} + +func (p *MockProducer) Close() error { + return nil +} + +func NewMockProducer(addrs []string, config *sarama.Config) (sarama.SyncProducer, error) { + return &MockProducer{}, nil +} + +func TestTopicTag(t *testing.T) { + tests := []struct { + name string + plugin *Kafka + input []telegraf.Metric + topic string + value string + }{ + { + name: "static topic", + plugin: &Kafka{ + Brokers: []string{"127.0.0.1"}, + Topic: "telegraf", + producerFunc: NewMockProducer, + }, + input: []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 42.0, + }, + time.Unix(0, 0), + ), + }, + topic: "telegraf", + value: "cpu time_idle=42 0\n", + }, + { + name: "topic tag overrides static topic", + plugin: &Kafka{ + Brokers: []string{"127.0.0.1"}, + Topic: "telegraf", + TopicTag: "topic", + producerFunc: NewMockProducer, + }, + input: []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{ + "topic": "xyzzy", + }, + map[string]interface{}{ + "time_idle": 42.0, + }, + time.Unix(0, 0), + ), + }, + topic: "xyzzy", + value: "cpu,topic=xyzzy time_idle=42 0\n", + }, + { + name: "missing topic tag falls back to static topic", + plugin: &Kafka{ + Brokers: []string{"127.0.0.1"}, + Topic: "telegraf", + TopicTag: "topic", + producerFunc: NewMockProducer, + }, + input: []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 42.0, + }, + time.Unix(0, 0), + ), + }, + topic: "telegraf", + value: "cpu time_idle=42 0\n", + }, + { + name: "exclude topic tag removes tag", + plugin: &Kafka{ + Brokers: []string{"127.0.0.1"}, + Topic: "telegraf", + TopicTag: "topic", + ExcludeTopicTag: true, + producerFunc: NewMockProducer, + }, + input: []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{ + "topic": "xyzzy", + }, + map[string]interface{}{ + "time_idle": 42.0, + }, + time.Unix(0, 0), + ), + }, + topic: "xyzzy", + value: "cpu time_idle=42 0\n", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s, err := serializers.NewInfluxSerializer() + require.NoError(t, err) + tt.plugin.SetSerializer(s) + + err = tt.plugin.Connect() + require.NoError(t, err) + + producer := &MockProducer{} + tt.plugin.producer = producer + + err = tt.plugin.Write(tt.input) + require.NoError(t, err) + + require.Equal(t, tt.topic, producer.sent[0].Topic) + + encoded, err := producer.sent[0].Value.Encode() + require.NoError(t, err) + require.Equal(t, tt.value, string(encoded)) + }) + } +}