From bf8a7b4a17c0213550708c96bd98d9a5acdc46af Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Mon, 7 Mar 2016 13:56:10 +0100 Subject: [PATCH] mqtt_consumer: option to set persistent session and client ID closes #797 --- CHANGELOG.md | 1 + plugins/inputs/mqtt_consumer/mqtt_consumer.go | 22 ++++++++- .../mqtt_consumer/mqtt_consumer_test.go | 48 +++++++++++++++++++ 3 files changed, 70 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fe87e41dc..2640d3f21 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ - [#754](https://github.com/influxdata/telegraf/pull/754): docker plugin: adding `docker info` metrics to output. Thanks @titilambert! - [#788](https://github.com/influxdata/telegraf/pull/788): -input-list and -output-list command-line options. Thanks @ebookbug! - [#778](https://github.com/influxdata/telegraf/pull/778): Adding a TCP input listener. +- [#797](https://github.com/influxdata/telegraf/issues/797): Provide option for persistent MQTT consumer client sessions. ### Bugfixes - [#748](https://github.com/influxdata/telegraf/issues/748): Fix sensor plugin split on ":" diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer.go b/plugins/inputs/mqtt_consumer/mqtt_consumer.go index 42cadfd60..e36889703 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer.go @@ -26,6 +26,9 @@ type MQTTConsumer struct { // Legacy metric buffer support MetricBuffer int + PersistentSession bool + ClientID string `toml:"client_id"` + // Path to CA file SSLCA string `toml:"ssl_ca"` // Path to host cert file @@ -57,6 +60,13 @@ var sampleConfig = ` "sensors/#", ] + # if true, messages that can't be delivered while the subscriber is offline + # will be delivered when it comes back (such as on service restart). + # NOTE: if true, client_id MUST be set + persistent_session = false + # If empty, a random client ID will be generated. + client_id = "" + ## username and password to connect MQTT server. # username = "telegraf" # password = "metricsmetricsmetricsmetrics" @@ -91,6 +101,11 @@ func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error { m.Lock() defer m.Unlock() + if m.PersistentSession && m.ClientID == "" { + return fmt.Errorf("ERROR MQTT Consumer: When using persistent_session" + + " = true, you MUST also set client_id") + } + m.acc = acc if m.QoS > 2 || m.QoS < 0 { return fmt.Errorf("MQTT Consumer, invalid QoS value: %d", m.QoS) @@ -166,7 +181,11 @@ func (m *MQTTConsumer) Gather(acc telegraf.Accumulator) error { func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) { opts := mqtt.NewClientOptions() - opts.SetClientID("Telegraf-Consumer-" + internal.RandomString(5)) + if m.ClientID == "" { + opts.SetClientID("Telegraf-Consumer-" + internal.RandomString(5)) + } else { + opts.SetClientID(m.ClientID) + } tlsCfg, err := internal.GetTLSConfig( m.SSLCert, m.SSLKey, m.SSLCA, m.InsecureSkipVerify) @@ -199,6 +218,7 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) { } opts.SetAutoReconnect(true) opts.SetKeepAlive(time.Second * 60) + opts.SetCleanSession(!m.PersistentSession) return opts, nil } diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go b/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go index b1dd59bcf..e926ebbb2 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go @@ -7,6 +7,8 @@ import ( "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" + "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git" ) @@ -28,6 +30,52 @@ func newTestMQTTConsumer() (*MQTTConsumer, chan mqtt.Message) { return n, in } +// Test that default client has random ID +func TestRandomClientID(t *testing.T) { + m1 := &MQTTConsumer{ + Servers: []string{"localhost:1883"}} + opts, err := m1.createOpts() + assert.NoError(t, err) + + m2 := &MQTTConsumer{ + Servers: []string{"localhost:1883"}} + opts2, err2 := m2.createOpts() + assert.NoError(t, err2) + + assert.NotEqual(t, opts.ClientID, opts2.ClientID) +} + +// Test that default client has random ID +func TestClientID(t *testing.T) { + m1 := &MQTTConsumer{ + Servers: []string{"localhost:1883"}, + ClientID: "telegraf-test", + } + opts, err := m1.createOpts() + assert.NoError(t, err) + + m2 := &MQTTConsumer{ + Servers: []string{"localhost:1883"}, + ClientID: "telegraf-test", + } + opts2, err2 := m2.createOpts() + assert.NoError(t, err2) + + assert.Equal(t, "telegraf-test", opts2.ClientID) + assert.Equal(t, "telegraf-test", opts.ClientID) +} + +// Test that Start() fails if client ID is not set but persistent is +func TestPersistentClientIDFail(t *testing.T) { + m1 := &MQTTConsumer{ + Servers: []string{"localhost:1883"}, + PersistentSession: true, + } + acc := testutil.Accumulator{} + err := m1.Start(&acc) + assert.Error(t, err) +} + // Test that the parser parses NATS messages into metrics func TestRunParser(t *testing.T) { n, in := newTestMQTTConsumer()