From 97a1e4e706d0d15c8dcc05ac284f9a394eeb66e0 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Mon, 30 Oct 2017 15:33:20 -0700 Subject: [PATCH] Use explicit schemas in mqtt_consumer input (#3401) --- plugins/inputs/mqtt_consumer/README.md | 4 +++- plugins/inputs/mqtt_consumer/mqtt_consumer.go | 20 ++++++++++++++----- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/plugins/inputs/mqtt_consumer/README.md b/plugins/inputs/mqtt_consumer/README.md index 52990ef76..2889bde59 100644 --- a/plugins/inputs/mqtt_consumer/README.md +++ b/plugins/inputs/mqtt_consumer/README.md @@ -10,7 +10,9 @@ The plugin expects messages in the ```toml # Read metrics from MQTT topic(s) [[inputs.mqtt_consumer]] - servers = ["localhost:1883"] + ## MQTT broker URLs to be used. The format should be scheme://host:port, + ## schema can be tcp, ssl, or ws. + servers = ["tcp://localhost:1883"] ## MQTT QoS, must be 0, 1, or 2 qos = 0 ## Connection timeout for initial connection in seconds diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer.go b/plugins/inputs/mqtt_consumer/mqtt_consumer.go index ddffbf258..6903f654d 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer.go @@ -56,7 +56,10 @@ type MQTTConsumer struct { } var sampleConfig = ` - servers = ["localhost:1883"] + ## MQTT broker URLs to be used. The format should be scheme://host:port, + ## schema can be tcp, ssl, or ws. + servers = ["tcp://localhost:1883"] + ## MQTT QoS, must be 0, 1, or 2 qos = 0 ## Connection timeout for initial connection in seconds @@ -239,9 +242,7 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) { return nil, err } - scheme := "tcp" if tlsCfg != nil { - scheme = "ssl" opts.SetTLSConfig(tlsCfg) } @@ -257,8 +258,17 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) { if len(m.Servers) == 0 { return opts, fmt.Errorf("could not get host infomations") } - for _, host := range m.Servers { - server := fmt.Sprintf("%s://%s", scheme, host) + + for _, server := range m.Servers { + // Preserve support for host:port style servers; deprecated in Telegraf 1.4.4 + if !strings.Contains(server, "://") { + log.Printf("W! mqtt_consumer server %q should be updated to use `scheme://host:port` format", server) + if tlsCfg == nil { + server = "tcp://" + server + } else { + server = "ssl://" + server + } + } opts.AddBroker(server) }