Use explicit schemas in mqtt_consumer input (#3401)
This commit is contained in:
parent
ed52baf5dd
commit
97a1e4e706
|
@ -10,7 +10,9 @@ The plugin expects messages in the
|
||||||
```toml
|
```toml
|
||||||
# Read metrics from MQTT topic(s)
|
# Read metrics from MQTT topic(s)
|
||||||
[[inputs.mqtt_consumer]]
|
[[inputs.mqtt_consumer]]
|
||||||
servers = ["localhost:1883"]
|
## MQTT broker URLs to be used. The format should be scheme://host:port,
|
||||||
|
## schema can be tcp, ssl, or ws.
|
||||||
|
servers = ["tcp://localhost:1883"]
|
||||||
## MQTT QoS, must be 0, 1, or 2
|
## MQTT QoS, must be 0, 1, or 2
|
||||||
qos = 0
|
qos = 0
|
||||||
## Connection timeout for initial connection in seconds
|
## Connection timeout for initial connection in seconds
|
||||||
|
|
|
@ -56,7 +56,10 @@ type MQTTConsumer struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
var sampleConfig = `
|
var sampleConfig = `
|
||||||
servers = ["localhost:1883"]
|
## MQTT broker URLs to be used. The format should be scheme://host:port,
|
||||||
|
## schema can be tcp, ssl, or ws.
|
||||||
|
servers = ["tcp://localhost:1883"]
|
||||||
|
|
||||||
## MQTT QoS, must be 0, 1, or 2
|
## MQTT QoS, must be 0, 1, or 2
|
||||||
qos = 0
|
qos = 0
|
||||||
## Connection timeout for initial connection in seconds
|
## Connection timeout for initial connection in seconds
|
||||||
|
@ -239,9 +242,7 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
scheme := "tcp"
|
|
||||||
if tlsCfg != nil {
|
if tlsCfg != nil {
|
||||||
scheme = "ssl"
|
|
||||||
opts.SetTLSConfig(tlsCfg)
|
opts.SetTLSConfig(tlsCfg)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -257,8 +258,17 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) {
|
||||||
if len(m.Servers) == 0 {
|
if len(m.Servers) == 0 {
|
||||||
return opts, fmt.Errorf("could not get host infomations")
|
return opts, fmt.Errorf("could not get host infomations")
|
||||||
}
|
}
|
||||||
for _, host := range m.Servers {
|
|
||||||
server := fmt.Sprintf("%s://%s", scheme, host)
|
for _, server := range m.Servers {
|
||||||
|
// Preserve support for host:port style servers; deprecated in Telegraf 1.4.4
|
||||||
|
if !strings.Contains(server, "://") {
|
||||||
|
log.Printf("W! mqtt_consumer server %q should be updated to use `scheme://host:port` format", server)
|
||||||
|
if tlsCfg == nil {
|
||||||
|
server = "tcp://" + server
|
||||||
|
} else {
|
||||||
|
server = "ssl://" + server
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
opts.AddBroker(server)
|
opts.AddBroker(server)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue