Add batch mode to mqtt output (#4094)

This commit is contained in:
jvrahav 2018-05-19 07:25:02 +05:30 committed by Daniel Nelson
parent a688eefd1c
commit 81f5a41bc9
3 changed files with 47 additions and 10 deletions

View File

@ -1791,19 +1791,19 @@
# ## List of metrics collected on above servers
# ## Each metric consists in a name, a jmx path and either
# ## a pass or drop slice attribute.
# ## This collect all heap memory usage metrics.
# ## This collect all heap memory usage metrics.
# [[inputs.jolokia.metrics]]
# name = "heap_memory_usage"
# mbean = "java.lang:type=Memory"
# attribute = "HeapMemoryUsage"
#
# ## This collect thread counts metrics.
# ## This collect thread counts metrics.
# [[inputs.jolokia.metrics]]
# name = "thread_count"
# mbean = "java.lang:type=Threading"
# attribute = "TotalStartedThreadCount,ThreadCount,DaemonThreadCount,PeakThreadCount"
#
# ## This collect number of class loaded/unloaded counts metrics.
# ## This collect number of class loaded/unloaded counts metrics.
# [[inputs.jolokia.metrics]]
# name = "class_count"
# mbean = "java.lang:type=ClassLoading"
@ -2259,7 +2259,7 @@
# reverse_metric_names = true
# # A plugin to collect stats from Opensmtpd - a validating, recursive, and caching DNS resolver
# # A plugin to collect stats from Opensmtpd - a validating, recursive, and caching DNS resolver
# [[inputs.opensmtpd]]
# ## If running as a restricted user you can prepend sudo for additional access:
# #use_sudo = false
@ -3561,4 +3561,3 @@
# [[inputs.zipkin]]
# # path = "/api/v1/spans" # URL path for span data
# # port = 9411 # Port on which Telegraf listens

View File

@ -30,6 +30,15 @@ This plugin writes to a [MQTT Broker](http://http://mqtt.org/) acting as a mqtt
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
## Batch messages in a topic
## batch = false
## Flag to determine if messages sent in a topic in a flush interval,
## need to be batched into one message.
## batch = true, batches the messages in a topic to one messages
## batch = false, default behaviour
# batch = false
## Data format to output.
data_format = "influx"
```

View File

@ -45,6 +45,15 @@ var sampleConfig = `
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "influx"
## Batch messages in a topic
## batch = false
## Flag to determine if messages sent in a topic in a flush interval,
## need to be batched into one message.
## batch = true, batches the messages in a topic to one messages
## batch = false, default behaviour
# batch = false
`
type MQTT struct {
@ -57,6 +66,7 @@ type MQTT struct {
QoS int `toml:"qos"`
ClientID string `toml:"client_id"`
tls.ClientConfig
BatchMessage bool `toml:"batch"`
client paho.Client
opts *paho.ClientOptions
@ -117,6 +127,8 @@ func (m *MQTT) Write(metrics []telegraf.Metric) error {
hostname = ""
}
metricsmap := make(map[string][]telegraf.Metric)
for _, metric := range metrics {
var t []string
if m.TopicPrefix != "" {
@ -129,14 +141,31 @@ func (m *MQTT) Write(metrics []telegraf.Metric) error {
t = append(t, metric.Name())
topic := strings.Join(t, "/")
buf, err := m.serializer.Serialize(metric)
if m.BatchMessage {
metricsmap[topic] = append(metricsmap[topic], metric)
} else {
buf, err := m.serializer.Serialize(metric)
if err != nil {
return err
}
err = m.publish(topic, buf)
if err != nil {
return fmt.Errorf("Could not write to MQTT server, %s", err)
}
}
}
for key := range metricsmap {
buf, err := m.serializer.SerializeBatch(metricsmap[key])
if err != nil {
return err
}
err = m.publish(topic, buf)
if err != nil {
return fmt.Errorf("Could not write to MQTT server, %s", err)
publisherr := m.publish(key, buf)
if publisherr != nil {
return fmt.Errorf("Could not write to MQTT server, %s", publisherr)
}
}