Add batch mode to mqtt output (#4094)

This commit is contained in:
jvrahav 2018-05-19 07:25:02 +05:30 committed by Daniel Nelson
parent 680173cc4e
commit cbb4dacbc3
3 changed files with 47 additions and 10 deletions

View File

@ -1791,19 +1791,19 @@
# ## List of metrics collected on above servers # ## List of metrics collected on above servers
# ## Each metric consists in a name, a jmx path and either # ## Each metric consists in a name, a jmx path and either
# ## a pass or drop slice attribute. # ## a pass or drop slice attribute.
# ## This collect all heap memory usage metrics. # ## This collect all heap memory usage metrics.
# [[inputs.jolokia.metrics]] # [[inputs.jolokia.metrics]]
# name = "heap_memory_usage" # name = "heap_memory_usage"
# mbean = "java.lang:type=Memory" # mbean = "java.lang:type=Memory"
# attribute = "HeapMemoryUsage" # attribute = "HeapMemoryUsage"
# #
# ## This collect thread counts metrics. # ## This collect thread counts metrics.
# [[inputs.jolokia.metrics]] # [[inputs.jolokia.metrics]]
# name = "thread_count" # name = "thread_count"
# mbean = "java.lang:type=Threading" # mbean = "java.lang:type=Threading"
# attribute = "TotalStartedThreadCount,ThreadCount,DaemonThreadCount,PeakThreadCount" # attribute = "TotalStartedThreadCount,ThreadCount,DaemonThreadCount,PeakThreadCount"
# #
# ## This collect number of class loaded/unloaded counts metrics. # ## This collect number of class loaded/unloaded counts metrics.
# [[inputs.jolokia.metrics]] # [[inputs.jolokia.metrics]]
# name = "class_count" # name = "class_count"
# mbean = "java.lang:type=ClassLoading" # mbean = "java.lang:type=ClassLoading"
@ -3561,4 +3561,3 @@
# [[inputs.zipkin]] # [[inputs.zipkin]]
# # path = "/api/v1/spans" # URL path for span data # # path = "/api/v1/spans" # URL path for span data
# # port = 9411 # Port on which Telegraf listens # # port = 9411 # Port on which Telegraf listens

View File

@ -30,6 +30,15 @@ This plugin writes to a [MQTT Broker](http://http://mqtt.org/) acting as a mqtt
## Use TLS but skip chain & host verification ## Use TLS but skip chain & host verification
# insecure_skip_verify = false # insecure_skip_verify = false
## Batch messages in a topic
## batch = false
## Flag to determine if messages sent in a topic in a flush interval,
## need to be batched into one message.
## batch = true, batches the messages in a topic to one messages
## batch = false, default behaviour
# batch = false
## Data format to output. ## Data format to output.
data_format = "influx" data_format = "influx"
``` ```

View File

@ -45,6 +45,15 @@ var sampleConfig = `
## more about them here: ## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "influx" data_format = "influx"
## Batch messages in a topic
## batch = false
## Flag to determine if messages sent in a topic in a flush interval,
## need to be batched into one message.
## batch = true, batches the messages in a topic to one messages
## batch = false, default behaviour
# batch = false
` `
type MQTT struct { type MQTT struct {
@ -57,6 +66,7 @@ type MQTT struct {
QoS int `toml:"qos"` QoS int `toml:"qos"`
ClientID string `toml:"client_id"` ClientID string `toml:"client_id"`
tls.ClientConfig tls.ClientConfig
BatchMessage bool `toml:"batch"`
client paho.Client client paho.Client
opts *paho.ClientOptions opts *paho.ClientOptions
@ -117,6 +127,8 @@ func (m *MQTT) Write(metrics []telegraf.Metric) error {
hostname = "" hostname = ""
} }
metricsmap := make(map[string][]telegraf.Metric)
for _, metric := range metrics { for _, metric := range metrics {
var t []string var t []string
if m.TopicPrefix != "" { if m.TopicPrefix != "" {
@ -129,7 +141,11 @@ func (m *MQTT) Write(metrics []telegraf.Metric) error {
t = append(t, metric.Name()) t = append(t, metric.Name())
topic := strings.Join(t, "/") topic := strings.Join(t, "/")
if m.BatchMessage {
metricsmap[topic] = append(metricsmap[topic], metric)
} else {
buf, err := m.serializer.Serialize(metric) buf, err := m.serializer.Serialize(metric)
if err != nil { if err != nil {
return err return err
} }
@ -139,6 +155,19 @@ func (m *MQTT) Write(metrics []telegraf.Metric) error {
return fmt.Errorf("Could not write to MQTT server, %s", err) return fmt.Errorf("Could not write to MQTT server, %s", err)
} }
} }
}
for key := range metricsmap {
buf, err := m.serializer.SerializeBatch(metricsmap[key])
if err != nil {
return err
}
publisherr := m.publish(key, buf)
if publisherr != nil {
return fmt.Errorf("Could not write to MQTT server, %s", publisherr)
}
}
return nil return nil
} }