mqtt_consumer: option to set persistent session and client ID

closes #797
This commit is contained in:
Cameron Sparr
2016-03-07 13:56:10 +01:00
parent 6139a69fa8
commit 41534c73f0
3 changed files with 70 additions and 1 deletions

View File

@@ -26,6 +26,9 @@ type MQTTConsumer struct {
// Legacy metric buffer support
MetricBuffer int
PersistentSession bool
ClientID string `toml:"client_id"`
// Path to CA file
SSLCA string `toml:"ssl_ca"`
// Path to host cert file
@@ -57,6 +60,13 @@ var sampleConfig = `
"sensors/#",
]
# if true, messages that can't be delivered while the subscriber is offline
# will be delivered when it comes back (such as on service restart).
# NOTE: if true, client_id MUST be set
persistent_session = false
# If empty, a random client ID will be generated.
client_id = ""
## username and password to connect MQTT server.
# username = "telegraf"
# password = "metricsmetricsmetricsmetrics"
@@ -91,6 +101,11 @@ func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error {
m.Lock()
defer m.Unlock()
if m.PersistentSession && m.ClientID == "" {
return fmt.Errorf("ERROR MQTT Consumer: When using persistent_session" +
" = true, you MUST also set client_id")
}
m.acc = acc
if m.QoS > 2 || m.QoS < 0 {
return fmt.Errorf("MQTT Consumer, invalid QoS value: %d", m.QoS)
@@ -166,7 +181,11 @@ func (m *MQTTConsumer) Gather(acc telegraf.Accumulator) error {
func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) {
opts := mqtt.NewClientOptions()
opts.SetClientID("Telegraf-Consumer-" + internal.RandomString(5))
if m.ClientID == "" {
opts.SetClientID("Telegraf-Consumer-" + internal.RandomString(5))
} else {
opts.SetClientID(m.ClientID)
}
tlsCfg, err := internal.GetTLSConfig(
m.SSLCert, m.SSLKey, m.SSLCA, m.InsecureSkipVerify)
@@ -199,6 +218,7 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) {
}
opts.SetAutoReconnect(true)
opts.SetKeepAlive(time.Second * 60)
opts.SetCleanSession(!m.PersistentSession)
return opts, nil
}