Use explicit schemas in mqtt_consumer input (#3401)

This commit is contained in:
Daniel Nelson 2017-10-30 15:33:20 -07:00 committed by GitHub
parent 4d5de8698b
commit fcfcc803b1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 18 additions and 6 deletions

View File

@ -10,7 +10,9 @@ The plugin expects messages in the
```toml ```toml
# Read metrics from MQTT topic(s) # Read metrics from MQTT topic(s)
[[inputs.mqtt_consumer]] [[inputs.mqtt_consumer]]
servers = ["localhost:1883"] ## MQTT broker URLs to be used. The format should be scheme://host:port,
## schema can be tcp, ssl, or ws.
servers = ["tcp://localhost:1883"]
## MQTT QoS, must be 0, 1, or 2 ## MQTT QoS, must be 0, 1, or 2
qos = 0 qos = 0
## Connection timeout for initial connection in seconds ## Connection timeout for initial connection in seconds

View File

@ -56,7 +56,10 @@ type MQTTConsumer struct {
} }
var sampleConfig = ` var sampleConfig = `
servers = ["localhost:1883"] ## MQTT broker URLs to be used. The format should be scheme://host:port,
## schema can be tcp, ssl, or ws.
servers = ["tcp://localhost:1883"]
## MQTT QoS, must be 0, 1, or 2 ## MQTT QoS, must be 0, 1, or 2
qos = 0 qos = 0
## Connection timeout for initial connection in seconds ## Connection timeout for initial connection in seconds
@ -239,9 +242,7 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) {
return nil, err return nil, err
} }
scheme := "tcp"
if tlsCfg != nil { if tlsCfg != nil {
scheme = "ssl"
opts.SetTLSConfig(tlsCfg) opts.SetTLSConfig(tlsCfg)
} }
@ -257,8 +258,17 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) {
if len(m.Servers) == 0 { if len(m.Servers) == 0 {
return opts, fmt.Errorf("could not get host infomations") return opts, fmt.Errorf("could not get host infomations")
} }
for _, host := range m.Servers {
server := fmt.Sprintf("%s://%s", scheme, host) for _, server := range m.Servers {
// Preserve support for host:port style servers; deprecated in Telegraf 1.4.4
if !strings.Contains(server, "://") {
log.Printf("W! mqtt_consumer server %q should be updated to use `scheme://host:port` format", server)
if tlsCfg == nil {
server = "tcp://" + server
} else {
server = "ssl://" + server
}
}
opts.AddBroker(server) opts.AddBroker(server)
} }