Add topic_tag option to mqtt_consumer (#6266)
This commit is contained in:
		
							parent
							
								
									f45ba14f73
								
							
						
					
					
						commit
						92385a4630
					
				|  | @ -18,6 +18,10 @@ and creates metrics using one of the supported [input data formats][]. | ||||||
|     "sensors/#", |     "sensors/#", | ||||||
|   ] |   ] | ||||||
| 
 | 
 | ||||||
|  |   ## The message topic will be stored in a tag specified by this value.  If set | ||||||
|  |   ## to the empty string no topic tag will be created. | ||||||
|  |   # topic_tag = "topic" | ||||||
|  | 
 | ||||||
|   ## QoS policy for messages |   ## QoS policy for messages | ||||||
|   ##   0 = at most once |   ##   0 = at most once | ||||||
|   ##   1 = at least once |   ##   1 = at least once | ||||||
|  |  | ||||||
|  | @ -43,10 +43,11 @@ type Client interface { | ||||||
| type ClientFactory func(o *mqtt.ClientOptions) Client | type ClientFactory func(o *mqtt.ClientOptions) Client | ||||||
| 
 | 
 | ||||||
| type MQTTConsumer struct { | type MQTTConsumer struct { | ||||||
| 	Servers                []string | 	Servers                []string          `toml:"servers"` | ||||||
| 	Topics                 []string | 	Topics                 []string          `toml:"topics"` | ||||||
| 	Username               string | 	TopicTag               *string           `toml:"topic_tag"` | ||||||
| 	Password               string | 	Username               string            `toml:"username"` | ||||||
|  | 	Password               string            `toml:"password"` | ||||||
| 	QoS                    int               `toml:"qos"` | 	QoS                    int               `toml:"qos"` | ||||||
| 	ConnectionTimeout      internal.Duration `toml:"connection_timeout"` | 	ConnectionTimeout      internal.Duration `toml:"connection_timeout"` | ||||||
| 	MaxUndeliveredMessages int               `toml:"max_undelivered_messages"` | 	MaxUndeliveredMessages int               `toml:"max_undelivered_messages"` | ||||||
|  | @ -67,6 +68,7 @@ type MQTTConsumer struct { | ||||||
| 	state         ConnectionState | 	state         ConnectionState | ||||||
| 	sem           semaphore | 	sem           semaphore | ||||||
| 	messages      map[telegraf.TrackingID]bool | 	messages      map[telegraf.TrackingID]bool | ||||||
|  | 	topicTag      string | ||||||
| 
 | 
 | ||||||
| 	ctx    context.Context | 	ctx    context.Context | ||||||
| 	cancel context.CancelFunc | 	cancel context.CancelFunc | ||||||
|  | @ -84,6 +86,10 @@ var sampleConfig = ` | ||||||
|     "sensors/#", |     "sensors/#", | ||||||
|   ] |   ] | ||||||
| 
 | 
 | ||||||
|  |   ## The message topic will be stored in a tag specified by this value.  If set | ||||||
|  |   ## to the empty string no topic tag will be created. | ||||||
|  |   # topic_tag = "topic" | ||||||
|  | 
 | ||||||
|   ## QoS policy for messages |   ## QoS policy for messages | ||||||
|   ##   0 = at most once |   ##   0 = at most once | ||||||
|   ##   1 = at least once |   ##   1 = at least once | ||||||
|  | @ -161,6 +167,11 @@ func (m *MQTTConsumer) Init() error { | ||||||
| 		return fmt.Errorf("connection_timeout must be greater than 1s: %s", m.ConnectionTimeout.Duration) | 		return fmt.Errorf("connection_timeout must be greater than 1s: %s", m.ConnectionTimeout.Duration) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | 	m.topicTag = "topic" | ||||||
|  | 	if m.TopicTag != nil { | ||||||
|  | 		m.topicTag = *m.TopicTag | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	opts, err := m.createOpts() | 	opts, err := m.createOpts() | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
|  | @ -267,9 +278,11 @@ func (m *MQTTConsumer) onMessage(acc telegraf.TrackingAccumulator, msg mqtt.Mess | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | 	if m.topicTag != "" { | ||||||
| 		topic := msg.Topic() | 		topic := msg.Topic() | ||||||
| 		for _, metric := range metrics { | 		for _, metric := range metrics { | ||||||
| 		metric.AddTag("topic", topic) | 			metric.AddTag(m.topicTag, topic) | ||||||
|  | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	id := acc.AddTrackingMetricGroup(metrics) | 	id := acc.AddTrackingMetricGroup(metrics) | ||||||
|  |  | ||||||
|  | @ -143,6 +143,142 @@ func TestPersistentClientIDFail(t *testing.T) { | ||||||
| 	require.Error(t, err) | 	require.Error(t, err) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | type Message struct { | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (m *Message) Duplicate() bool { | ||||||
|  | 	panic("not implemented") | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (m *Message) Qos() byte { | ||||||
|  | 	panic("not implemented") | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (m *Message) Retained() bool { | ||||||
|  | 	panic("not implemented") | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (m *Message) Topic() string { | ||||||
|  | 	return "telegraf" | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (m *Message) MessageID() uint16 { | ||||||
|  | 	panic("not implemented") | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (m *Message) Payload() []byte { | ||||||
|  | 	return []byte("cpu time_idle=42i") | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (m *Message) Ack() { | ||||||
|  | 	panic("not implemented") | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func TestTopicTag(t *testing.T) { | ||||||
|  | 	tests := []struct { | ||||||
|  | 		name     string | ||||||
|  | 		topicTag func() *string | ||||||
|  | 		expected []telegraf.Metric | ||||||
|  | 	}{ | ||||||
|  | 		{ | ||||||
|  | 			name: "default topic when topic tag is unset for backwards compatibility", | ||||||
|  | 			topicTag: func() *string { | ||||||
|  | 				return nil | ||||||
|  | 			}, | ||||||
|  | 			expected: []telegraf.Metric{ | ||||||
|  | 				testutil.MustMetric( | ||||||
|  | 					"cpu", | ||||||
|  | 					map[string]string{ | ||||||
|  | 						"topic": "telegraf", | ||||||
|  | 					}, | ||||||
|  | 					map[string]interface{}{ | ||||||
|  | 						"time_idle": 42, | ||||||
|  | 					}, | ||||||
|  | 					time.Unix(0, 0), | ||||||
|  | 				), | ||||||
|  | 			}, | ||||||
|  | 		}, | ||||||
|  | 		{ | ||||||
|  | 			name: "use topic tag when set", | ||||||
|  | 			topicTag: func() *string { | ||||||
|  | 				tag := "topic_tag" | ||||||
|  | 				return &tag | ||||||
|  | 			}, | ||||||
|  | 			expected: []telegraf.Metric{ | ||||||
|  | 				testutil.MustMetric( | ||||||
|  | 					"cpu", | ||||||
|  | 					map[string]string{ | ||||||
|  | 						"topic_tag": "telegraf", | ||||||
|  | 					}, | ||||||
|  | 					map[string]interface{}{ | ||||||
|  | 						"time_idle": 42, | ||||||
|  | 					}, | ||||||
|  | 					time.Unix(0, 0), | ||||||
|  | 				), | ||||||
|  | 			}, | ||||||
|  | 		}, | ||||||
|  | 		{ | ||||||
|  | 			name: "no topic tag is added when topic tag is set to the empty string", | ||||||
|  | 			topicTag: func() *string { | ||||||
|  | 				tag := "" | ||||||
|  | 				return &tag | ||||||
|  | 			}, | ||||||
|  | 			expected: []telegraf.Metric{ | ||||||
|  | 				testutil.MustMetric( | ||||||
|  | 					"cpu", | ||||||
|  | 					map[string]string{}, | ||||||
|  | 					map[string]interface{}{ | ||||||
|  | 						"time_idle": 42, | ||||||
|  | 					}, | ||||||
|  | 					time.Unix(0, 0), | ||||||
|  | 				), | ||||||
|  | 			}, | ||||||
|  | 		}, | ||||||
|  | 	} | ||||||
|  | 	for _, tt := range tests { | ||||||
|  | 		t.Run(tt.name, func(t *testing.T) { | ||||||
|  | 			var handler mqtt.MessageHandler | ||||||
|  | 			client := &FakeClient{ | ||||||
|  | 				ConnectF: func() mqtt.Token { | ||||||
|  | 					return &FakeToken{} | ||||||
|  | 				}, | ||||||
|  | 				AddRouteF: func(topic string, callback mqtt.MessageHandler) { | ||||||
|  | 					handler = callback | ||||||
|  | 				}, | ||||||
|  | 				SubscribeMultipleF: func(filters map[string]byte, callback mqtt.MessageHandler) mqtt.Token { | ||||||
|  | 					return &FakeToken{} | ||||||
|  | 				}, | ||||||
|  | 				DisconnectF: func(quiesce uint) { | ||||||
|  | 				}, | ||||||
|  | 			} | ||||||
|  | 
 | ||||||
|  | 			plugin := New(func(o *mqtt.ClientOptions) Client { | ||||||
|  | 				return client | ||||||
|  | 			}) | ||||||
|  | 			plugin.Topics = []string{"telegraf"} | ||||||
|  | 			plugin.TopicTag = tt.topicTag() | ||||||
|  | 
 | ||||||
|  | 			parser, err := parsers.NewInfluxParser() | ||||||
|  | 			require.NoError(t, err) | ||||||
|  | 			plugin.SetParser(parser) | ||||||
|  | 
 | ||||||
|  | 			err = plugin.Init() | ||||||
|  | 			require.NoError(t, err) | ||||||
|  | 
 | ||||||
|  | 			var acc testutil.Accumulator | ||||||
|  | 			err = plugin.Start(&acc) | ||||||
|  | 			require.NoError(t, err) | ||||||
|  | 
 | ||||||
|  | 			handler(nil, &Message{}) | ||||||
|  | 
 | ||||||
|  | 			plugin.Stop() | ||||||
|  | 
 | ||||||
|  | 			testutil.RequireMetricsEqual(t, tt.expected, acc.GetTelegrafMetrics(), | ||||||
|  | 				testutil.IgnoreTime()) | ||||||
|  | 		}) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
| func TestAddRouteCalledForEachTopic(t *testing.T) { | func TestAddRouteCalledForEachTopic(t *testing.T) { | ||||||
| 	client := &FakeClient{ | 	client := &FakeClient{ | ||||||
| 		ConnectF: func() mqtt.Token { | 		ConnectF: func() mqtt.Token { | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue