mqtt_consumer: option to set persistent session and client ID
closes #797
This commit is contained in:
parent
7914c34d1f
commit
bf8a7b4a17
|
@ -11,6 +11,7 @@
|
||||||
- [#754](https://github.com/influxdata/telegraf/pull/754): docker plugin: adding `docker info` metrics to output. Thanks @titilambert!
|
- [#754](https://github.com/influxdata/telegraf/pull/754): docker plugin: adding `docker info` metrics to output. Thanks @titilambert!
|
||||||
- [#788](https://github.com/influxdata/telegraf/pull/788): -input-list and -output-list command-line options. Thanks @ebookbug!
|
- [#788](https://github.com/influxdata/telegraf/pull/788): -input-list and -output-list command-line options. Thanks @ebookbug!
|
||||||
- [#778](https://github.com/influxdata/telegraf/pull/778): Adding a TCP input listener.
|
- [#778](https://github.com/influxdata/telegraf/pull/778): Adding a TCP input listener.
|
||||||
|
- [#797](https://github.com/influxdata/telegraf/issues/797): Provide option for persistent MQTT consumer client sessions.
|
||||||
|
|
||||||
### Bugfixes
|
### Bugfixes
|
||||||
- [#748](https://github.com/influxdata/telegraf/issues/748): Fix sensor plugin split on ":"
|
- [#748](https://github.com/influxdata/telegraf/issues/748): Fix sensor plugin split on ":"
|
||||||
|
|
|
@ -26,6 +26,9 @@ type MQTTConsumer struct {
|
||||||
// Legacy metric buffer support
|
// Legacy metric buffer support
|
||||||
MetricBuffer int
|
MetricBuffer int
|
||||||
|
|
||||||
|
PersistentSession bool
|
||||||
|
ClientID string `toml:"client_id"`
|
||||||
|
|
||||||
// Path to CA file
|
// Path to CA file
|
||||||
SSLCA string `toml:"ssl_ca"`
|
SSLCA string `toml:"ssl_ca"`
|
||||||
// Path to host cert file
|
// Path to host cert file
|
||||||
|
@ -57,6 +60,13 @@ var sampleConfig = `
|
||||||
"sensors/#",
|
"sensors/#",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
# if true, messages that can't be delivered while the subscriber is offline
|
||||||
|
# will be delivered when it comes back (such as on service restart).
|
||||||
|
# NOTE: if true, client_id MUST be set
|
||||||
|
persistent_session = false
|
||||||
|
# If empty, a random client ID will be generated.
|
||||||
|
client_id = ""
|
||||||
|
|
||||||
## username and password to connect MQTT server.
|
## username and password to connect MQTT server.
|
||||||
# username = "telegraf"
|
# username = "telegraf"
|
||||||
# password = "metricsmetricsmetricsmetrics"
|
# password = "metricsmetricsmetricsmetrics"
|
||||||
|
@ -91,6 +101,11 @@ func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error {
|
||||||
m.Lock()
|
m.Lock()
|
||||||
defer m.Unlock()
|
defer m.Unlock()
|
||||||
|
|
||||||
|
if m.PersistentSession && m.ClientID == "" {
|
||||||
|
return fmt.Errorf("ERROR MQTT Consumer: When using persistent_session" +
|
||||||
|
" = true, you MUST also set client_id")
|
||||||
|
}
|
||||||
|
|
||||||
m.acc = acc
|
m.acc = acc
|
||||||
if m.QoS > 2 || m.QoS < 0 {
|
if m.QoS > 2 || m.QoS < 0 {
|
||||||
return fmt.Errorf("MQTT Consumer, invalid QoS value: %d", m.QoS)
|
return fmt.Errorf("MQTT Consumer, invalid QoS value: %d", m.QoS)
|
||||||
|
@ -166,7 +181,11 @@ func (m *MQTTConsumer) Gather(acc telegraf.Accumulator) error {
|
||||||
func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) {
|
func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) {
|
||||||
opts := mqtt.NewClientOptions()
|
opts := mqtt.NewClientOptions()
|
||||||
|
|
||||||
opts.SetClientID("Telegraf-Consumer-" + internal.RandomString(5))
|
if m.ClientID == "" {
|
||||||
|
opts.SetClientID("Telegraf-Consumer-" + internal.RandomString(5))
|
||||||
|
} else {
|
||||||
|
opts.SetClientID(m.ClientID)
|
||||||
|
}
|
||||||
|
|
||||||
tlsCfg, err := internal.GetTLSConfig(
|
tlsCfg, err := internal.GetTLSConfig(
|
||||||
m.SSLCert, m.SSLKey, m.SSLCA, m.InsecureSkipVerify)
|
m.SSLCert, m.SSLKey, m.SSLCA, m.InsecureSkipVerify)
|
||||||
|
@ -199,6 +218,7 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) {
|
||||||
}
|
}
|
||||||
opts.SetAutoReconnect(true)
|
opts.SetAutoReconnect(true)
|
||||||
opts.SetKeepAlive(time.Second * 60)
|
opts.SetKeepAlive(time.Second * 60)
|
||||||
|
opts.SetCleanSession(!m.PersistentSession)
|
||||||
return opts, nil
|
return opts, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -7,6 +7,8 @@ import (
|
||||||
"github.com/influxdata/telegraf/plugins/parsers"
|
"github.com/influxdata/telegraf/plugins/parsers"
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
|
||||||
"git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git"
|
"git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -28,6 +30,52 @@ func newTestMQTTConsumer() (*MQTTConsumer, chan mqtt.Message) {
|
||||||
return n, in
|
return n, in
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Test that default client has random ID
|
||||||
|
func TestRandomClientID(t *testing.T) {
|
||||||
|
m1 := &MQTTConsumer{
|
||||||
|
Servers: []string{"localhost:1883"}}
|
||||||
|
opts, err := m1.createOpts()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
m2 := &MQTTConsumer{
|
||||||
|
Servers: []string{"localhost:1883"}}
|
||||||
|
opts2, err2 := m2.createOpts()
|
||||||
|
assert.NoError(t, err2)
|
||||||
|
|
||||||
|
assert.NotEqual(t, opts.ClientID, opts2.ClientID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test that default client has random ID
|
||||||
|
func TestClientID(t *testing.T) {
|
||||||
|
m1 := &MQTTConsumer{
|
||||||
|
Servers: []string{"localhost:1883"},
|
||||||
|
ClientID: "telegraf-test",
|
||||||
|
}
|
||||||
|
opts, err := m1.createOpts()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
m2 := &MQTTConsumer{
|
||||||
|
Servers: []string{"localhost:1883"},
|
||||||
|
ClientID: "telegraf-test",
|
||||||
|
}
|
||||||
|
opts2, err2 := m2.createOpts()
|
||||||
|
assert.NoError(t, err2)
|
||||||
|
|
||||||
|
assert.Equal(t, "telegraf-test", opts2.ClientID)
|
||||||
|
assert.Equal(t, "telegraf-test", opts.ClientID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test that Start() fails if client ID is not set but persistent is
|
||||||
|
func TestPersistentClientIDFail(t *testing.T) {
|
||||||
|
m1 := &MQTTConsumer{
|
||||||
|
Servers: []string{"localhost:1883"},
|
||||||
|
PersistentSession: true,
|
||||||
|
}
|
||||||
|
acc := testutil.Accumulator{}
|
||||||
|
err := m1.Start(&acc)
|
||||||
|
assert.Error(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
// Test that the parser parses NATS messages into metrics
|
// Test that the parser parses NATS messages into metrics
|
||||||
func TestRunParser(t *testing.T) {
|
func TestRunParser(t *testing.T) {
|
||||||
n, in := newTestMQTTConsumer()
|
n, in := newTestMQTTConsumer()
|
||||||
|
|
Loading…
Reference in New Issue