From 04f5493ccc6148d8f5817ecaa68b741166c1b4bf Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Mon, 22 Jan 2018 11:15:13 -0800 Subject: [PATCH] Limit wait time for writes in mqtt output (#3699) --- plugins/outputs/mqtt/mqtt.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/plugins/outputs/mqtt/mqtt.go b/plugins/outputs/mqtt/mqtt.go index 3bcf1b7d8..05978aa60 100644 --- a/plugins/outputs/mqtt/mqtt.go +++ b/plugins/outputs/mqtt/mqtt.go @@ -4,6 +4,7 @@ import ( "fmt" "strings" "sync" + "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" @@ -25,6 +26,9 @@ var sampleConfig = ` # username = "telegraf" # password = "metricsmetricsmetricsmetrics" + ## Timeout for write operations. default: 5s + # timeout = "5s" + ## client ID, if not set a random ID is generated # client_id = "" @@ -149,7 +153,7 @@ func (m *MQTT) Write(metrics []telegraf.Metric) error { func (m *MQTT) publish(topic string, body []byte) error { token := m.client.Publish(topic, byte(m.QoS), false, body) - token.Wait() + token.WaitTimeout(m.Timeout.Duration) if token.Error() != nil { return token.Error() } @@ -159,6 +163,11 @@ func (m *MQTT) publish(topic string, body []byte) error { func (m *MQTT) createOpts() (*paho.ClientOptions, error) { opts := paho.NewClientOptions() + if m.Timeout.Duration < time.Second { + m.Timeout.Duration = 5 * time.Second + } + opts.WriteTimeout = m.Timeout.Duration + if m.ClientID != "" { opts.SetClientID(m.ClientID) } else {