From 92385a4630eca605d95a0e038bea357f337567fa Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Mon, 19 Aug 2019 19:05:22 -0700 Subject: [PATCH] Add topic_tag option to mqtt_consumer (#6266) --- plugins/inputs/mqtt_consumer/README.md | 4 + plugins/inputs/mqtt_consumer/mqtt_consumer.go | 27 +++- .../mqtt_consumer/mqtt_consumer_test.go | 136 ++++++++++++++++++ 3 files changed, 160 insertions(+), 7 deletions(-) diff --git a/plugins/inputs/mqtt_consumer/README.md b/plugins/inputs/mqtt_consumer/README.md index 53476cb3d..9e60679f6 100644 --- a/plugins/inputs/mqtt_consumer/README.md +++ b/plugins/inputs/mqtt_consumer/README.md @@ -18,6 +18,10 @@ and creates metrics using one of the supported [input data formats][]. "sensors/#", ] + ## The message topic will be stored in a tag specified by this value. If set + ## to the empty string no topic tag will be created. + # topic_tag = "topic" + ## QoS policy for messages ## 0 = at most once ## 1 = at least once diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer.go b/plugins/inputs/mqtt_consumer/mqtt_consumer.go index 8a6d0d4de..7e3b43d44 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer.go @@ -43,10 +43,11 @@ type Client interface { type ClientFactory func(o *mqtt.ClientOptions) Client type MQTTConsumer struct { - Servers []string - Topics []string - Username string - Password string + Servers []string `toml:"servers"` + Topics []string `toml:"topics"` + TopicTag *string `toml:"topic_tag"` + Username string `toml:"username"` + Password string `toml:"password"` QoS int `toml:"qos"` ConnectionTimeout internal.Duration `toml:"connection_timeout"` MaxUndeliveredMessages int `toml:"max_undelivered_messages"` @@ -67,6 +68,7 @@ type MQTTConsumer struct { state ConnectionState sem semaphore messages map[telegraf.TrackingID]bool + topicTag string ctx context.Context cancel context.CancelFunc @@ -84,6 +86,10 @@ var sampleConfig = ` "sensors/#", ] + ## The message topic will be stored in a tag specified by this value. If set + ## to the empty string no topic tag will be created. + # topic_tag = "topic" + ## QoS policy for messages ## 0 = at most once ## 1 = at least once @@ -161,6 +167,11 @@ func (m *MQTTConsumer) Init() error { return fmt.Errorf("connection_timeout must be greater than 1s: %s", m.ConnectionTimeout.Duration) } + m.topicTag = "topic" + if m.TopicTag != nil { + m.topicTag = *m.TopicTag + } + opts, err := m.createOpts() if err != nil { return err @@ -267,9 +278,11 @@ func (m *MQTTConsumer) onMessage(acc telegraf.TrackingAccumulator, msg mqtt.Mess return err } - topic := msg.Topic() - for _, metric := range metrics { - metric.AddTag("topic", topic) + if m.topicTag != "" { + topic := msg.Topic() + for _, metric := range metrics { + metric.AddTag(m.topicTag, topic) + } } id := acc.AddTrackingMetricGroup(metrics) diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go b/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go index 07d2015a8..cbc6ee986 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go @@ -143,6 +143,142 @@ func TestPersistentClientIDFail(t *testing.T) { require.Error(t, err) } +type Message struct { +} + +func (m *Message) Duplicate() bool { + panic("not implemented") +} + +func (m *Message) Qos() byte { + panic("not implemented") +} + +func (m *Message) Retained() bool { + panic("not implemented") +} + +func (m *Message) Topic() string { + return "telegraf" +} + +func (m *Message) MessageID() uint16 { + panic("not implemented") +} + +func (m *Message) Payload() []byte { + return []byte("cpu time_idle=42i") +} + +func (m *Message) Ack() { + panic("not implemented") +} + +func TestTopicTag(t *testing.T) { + tests := []struct { + name string + topicTag func() *string + expected []telegraf.Metric + }{ + { + name: "default topic when topic tag is unset for backwards compatibility", + topicTag: func() *string { + return nil + }, + expected: []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{ + "topic": "telegraf", + }, + map[string]interface{}{ + "time_idle": 42, + }, + time.Unix(0, 0), + ), + }, + }, + { + name: "use topic tag when set", + topicTag: func() *string { + tag := "topic_tag" + return &tag + }, + expected: []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{ + "topic_tag": "telegraf", + }, + map[string]interface{}{ + "time_idle": 42, + }, + time.Unix(0, 0), + ), + }, + }, + { + name: "no topic tag is added when topic tag is set to the empty string", + topicTag: func() *string { + tag := "" + return &tag + }, + expected: []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 42, + }, + time.Unix(0, 0), + ), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var handler mqtt.MessageHandler + client := &FakeClient{ + ConnectF: func() mqtt.Token { + return &FakeToken{} + }, + AddRouteF: func(topic string, callback mqtt.MessageHandler) { + handler = callback + }, + SubscribeMultipleF: func(filters map[string]byte, callback mqtt.MessageHandler) mqtt.Token { + return &FakeToken{} + }, + DisconnectF: func(quiesce uint) { + }, + } + + plugin := New(func(o *mqtt.ClientOptions) Client { + return client + }) + plugin.Topics = []string{"telegraf"} + plugin.TopicTag = tt.topicTag() + + parser, err := parsers.NewInfluxParser() + require.NoError(t, err) + plugin.SetParser(parser) + + err = plugin.Init() + require.NoError(t, err) + + var acc testutil.Accumulator + err = plugin.Start(&acc) + require.NoError(t, err) + + handler(nil, &Message{}) + + plugin.Stop() + + testutil.RequireMetricsEqual(t, tt.expected, acc.GetTelegrafMetrics(), + testutil.IgnoreTime()) + }) + } +} + func TestAddRouteCalledForEachTopic(t *testing.T) { client := &FakeClient{ ConnectF: func() mqtt.Token {