Limit wait time for writes in mqtt output (#3699)
(cherry picked from commit 91fc2765b1
)
This commit is contained in:
parent
1790b26651
commit
f5894a6a2f
|
@ -4,6 +4,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/internal"
|
"github.com/influxdata/telegraf/internal"
|
||||||
|
@ -25,6 +26,9 @@ var sampleConfig = `
|
||||||
# username = "telegraf"
|
# username = "telegraf"
|
||||||
# password = "metricsmetricsmetricsmetrics"
|
# password = "metricsmetricsmetricsmetrics"
|
||||||
|
|
||||||
|
## Timeout for write operations. default: 5s
|
||||||
|
# timeout = "5s"
|
||||||
|
|
||||||
## client ID, if not set a random ID is generated
|
## client ID, if not set a random ID is generated
|
||||||
# client_id = ""
|
# client_id = ""
|
||||||
|
|
||||||
|
@ -149,7 +153,7 @@ func (m *MQTT) Write(metrics []telegraf.Metric) error {
|
||||||
|
|
||||||
func (m *MQTT) publish(topic string, body []byte) error {
|
func (m *MQTT) publish(topic string, body []byte) error {
|
||||||
token := m.client.Publish(topic, byte(m.QoS), false, body)
|
token := m.client.Publish(topic, byte(m.QoS), false, body)
|
||||||
token.Wait()
|
token.WaitTimeout(m.Timeout.Duration)
|
||||||
if token.Error() != nil {
|
if token.Error() != nil {
|
||||||
return token.Error()
|
return token.Error()
|
||||||
}
|
}
|
||||||
|
@ -159,6 +163,11 @@ func (m *MQTT) publish(topic string, body []byte) error {
|
||||||
func (m *MQTT) createOpts() (*paho.ClientOptions, error) {
|
func (m *MQTT) createOpts() (*paho.ClientOptions, error) {
|
||||||
opts := paho.NewClientOptions()
|
opts := paho.NewClientOptions()
|
||||||
|
|
||||||
|
if m.Timeout.Duration < time.Second {
|
||||||
|
m.Timeout.Duration = 5 * time.Second
|
||||||
|
}
|
||||||
|
opts.WriteTimeout = m.Timeout.Duration
|
||||||
|
|
||||||
if m.ClientID != "" {
|
if m.ClientID != "" {
|
||||||
opts.SetClientID(m.ClientID)
|
opts.SetClientID(m.ClientID)
|
||||||
} else {
|
} else {
|
||||||
|
|
Loading…
Reference in New Issue