Use explicit schemas in mqtt_consumer input (#3401)

(cherry picked from commit fcfcc803b1)
This commit is contained in:
Daniel Nelson 2017-10-30 15:33:20 -07:00 committed by Daniel Nelson
parent f416f429d7
commit f29a994743
No known key found for this signature in database
GPG Key ID: CAAD59C9444F6155
2 changed files with 18 additions and 6 deletions

View File

@ -10,7 +10,9 @@ The plugin expects messages in the
```toml
# Read metrics from MQTT topic(s)
[[inputs.mqtt_consumer]]
servers = ["localhost:1883"]
## MQTT broker URLs to be used. The format should be scheme://host:port,
## schema can be tcp, ssl, or ws.
servers = ["tcp://localhost:1883"]
## MQTT QoS, must be 0, 1, or 2
qos = 0
## Connection timeout for initial connection in seconds

View File

@ -56,7 +56,10 @@ type MQTTConsumer struct {
}
var sampleConfig = `
servers = ["localhost:1883"]
## MQTT broker URLs to be used. The format should be scheme://host:port,
## schema can be tcp, ssl, or ws.
servers = ["tcp://localhost:1883"]
## MQTT QoS, must be 0, 1, or 2
qos = 0
## Connection timeout for initial connection in seconds
@ -239,9 +242,7 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) {
return nil, err
}
scheme := "tcp"
if tlsCfg != nil {
scheme = "ssl"
opts.SetTLSConfig(tlsCfg)
}
@ -257,8 +258,17 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) {
if len(m.Servers) == 0 {
return opts, fmt.Errorf("could not get host infomations")
}
for _, host := range m.Servers {
server := fmt.Sprintf("%s://%s", scheme, host)
for _, server := range m.Servers {
// Preserve support for host:port style servers; deprecated in Telegraf 1.4.4
if !strings.Contains(server, "://") {
log.Printf("W! mqtt_consumer server %q should be updated to use `scheme://host:port` format", server)
if tlsCfg == nil {
server = "tcp://" + server
} else {
server = "ssl://" + server
}
}
opts.AddBroker(server)
}