package mqtt_consumer import ( "fmt" "log" "sync" "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git" ) type MQTTConsumer struct { Servers []string Topics []string Username string Password string MetricBuffer int QoS int `toml:"qos"` // Path to CA file SSLCA string `toml:"ssl_ca"` // Path to host cert file SSLCert string `toml:"ssl_cert"` // Path to cert key file SSLKey string `toml:"ssl_key"` // Use SSL but skip chain & host verification InsecureSkipVerify bool sync.Mutex client *mqtt.Client // channel for all incoming parsed mqtt metrics metricC chan telegraf.Metric done chan struct{} in chan mqtt.Message } var sampleConfig = ` servers = ["localhost:1883"] ### MQTT QoS, must be 0, 1, or 2 qos = 0 ### Topics to subscribe to topics = [ "telegraf/host01/cpu", "telegraf/+/mem", "sensors/#", ] ### Maximum number of metrics to buffer between collection intervals metric_buffer = 100000 ### username and password to connect MQTT server. # username = "telegraf" # password = "metricsmetricsmetricsmetrics" ### Optional SSL Config # ssl_ca = "/etc/telegraf/ca.pem" # ssl_cert = "/etc/telegraf/cert.pem" # ssl_key = "/etc/telegraf/key.pem" ### Use SSL but skip chain & host verification # insecure_skip_verify = false ` func (m *MQTTConsumer) SampleConfig() string { return sampleConfig } func (m *MQTTConsumer) Description() string { return "Read line-protocol metrics from MQTT topic(s)" } func (m *MQTTConsumer) Start() error { m.Lock() defer m.Unlock() if m.QoS > 2 || m.QoS < 0 { return fmt.Errorf("MQTT Consumer, invalid QoS value: %d", m.QoS) } opts, err := m.createOpts() if err != nil { return err } m.client = mqtt.NewClient(opts) if token := m.client.Connect(); token.Wait() && token.Error() != nil { return token.Error() } m.in = make(chan mqtt.Message, m.MetricBuffer) m.done = make(chan struct{}) if m.MetricBuffer == 0 { m.MetricBuffer = 100000 } m.metricC = make(chan telegraf.Metric, m.MetricBuffer) topics := make(map[string]byte) for _, topic := range m.Topics { topics[topic] = byte(m.QoS) } subscribeToken := m.client.SubscribeMultiple(topics, m.recvMessage) subscribeToken.Wait() if subscribeToken.Error() != nil { return subscribeToken.Error() } go m.parser() return nil } func (m *MQTTConsumer) parser() { for { select { case <-m.done: return case msg := <-m.in: parser := telegraf.NewMetricParser() parser.DefaultTags = make(map[string]string) parser.DefaultTags["topic"] = msg.Topic() metrics, err := parser.Parse(msg.Payload()) if err != nil { log.Printf("Could not parse MQTT message: %s, error: %s", string(msg.Payload()), err.Error()) } for _, metric := range metrics { select { case m.metricC <- metric: continue default: log.Printf("MQTT Consumer buffer is full, dropping a metric." + " You may want to increase the metric_buffer setting") } } } } } func (m *MQTTConsumer) recvMessage(_ *mqtt.Client, msg mqtt.Message) { m.in <- msg } func (m *MQTTConsumer) Stop() { m.Lock() defer m.Unlock() close(m.done) m.client.Disconnect(200) } func (m *MQTTConsumer) Gather(acc telegraf.Accumulator) error { m.Lock() defer m.Unlock() nmetrics := len(m.metricC) for i := 0; i < nmetrics; i++ { metric := <-m.metricC acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time()) } return nil } func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) { opts := mqtt.NewClientOptions() opts.SetClientID("Telegraf-Consumer-" + internal.RandomString(5)) tlsCfg, err := internal.GetTLSConfig( m.SSLCert, m.SSLKey, m.SSLCA, m.InsecureSkipVerify) if err != nil { return nil, err } scheme := "tcp" if tlsCfg != nil { scheme = "ssl" opts.SetTLSConfig(tlsCfg) } user := m.Username if user == "" { opts.SetUsername(user) } password := m.Password if password != "" { opts.SetPassword(password) } if len(m.Servers) == 0 { return opts, fmt.Errorf("could not get host infomations") } for _, host := range m.Servers { server := fmt.Sprintf("%s://%s", scheme, host) opts.AddBroker(server) } opts.SetAutoReconnect(true) // Setting KeepAlive to 0 disables it. // TODO set KeepAlive to a real value (60s?) when this change is merged: // https://git.eclipse.org/r/#/c/65850/ opts.SetKeepAlive(time.Duration(0)) return opts, nil } func init() { inputs.Add("mqtt_consumer", func() telegraf.Input { return &MQTTConsumer{} }) }