Limit wait time for writes in mqtt output (#3699)

This commit is contained in:
Daniel Nelson 2018-01-22 11:15:13 -08:00 committed by GitHub
parent ef776f120b
commit 91fc2765b1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 10 additions and 1 deletions

View File

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"strings" "strings"
"sync" "sync"
"time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
@ -25,6 +26,9 @@ var sampleConfig = `
# username = "telegraf" # username = "telegraf"
# password = "metricsmetricsmetricsmetrics" # password = "metricsmetricsmetricsmetrics"
## Timeout for write operations. default: 5s
# timeout = "5s"
## client ID, if not set a random ID is generated ## client ID, if not set a random ID is generated
# client_id = "" # client_id = ""
@ -149,7 +153,7 @@ func (m *MQTT) Write(metrics []telegraf.Metric) error {
func (m *MQTT) publish(topic string, body []byte) error { func (m *MQTT) publish(topic string, body []byte) error {
token := m.client.Publish(topic, byte(m.QoS), false, body) token := m.client.Publish(topic, byte(m.QoS), false, body)
token.Wait() token.WaitTimeout(m.Timeout.Duration)
if token.Error() != nil { if token.Error() != nil {
return token.Error() return token.Error()
} }
@ -159,6 +163,11 @@ func (m *MQTT) publish(topic string, body []byte) error {
func (m *MQTT) createOpts() (*paho.ClientOptions, error) { func (m *MQTT) createOpts() (*paho.ClientOptions, error) {
opts := paho.NewClientOptions() opts := paho.NewClientOptions()
if m.Timeout.Duration < time.Second {
m.Timeout.Duration = 5 * time.Second
}
opts.WriteTimeout = m.Timeout.Duration
if m.ClientID != "" { if m.ClientID != "" {
opts.SetClientID(m.ClientID) opts.SetClientID(m.ClientID)
} else { } else {