From cbb4dacbc3c7fed2c2f3d5fb660a960724e572f6 Mon Sep 17 00:00:00 2001 From: jvrahav Date: Sat, 19 May 2018 07:25:02 +0530 Subject: [PATCH] Add batch mode to mqtt output (#4094) --- etc/telegraf.conf | 9 ++++---- plugins/outputs/mqtt/README.md | 9 ++++++++ plugins/outputs/mqtt/mqtt.go | 39 +++++++++++++++++++++++++++++----- 3 files changed, 47 insertions(+), 10 deletions(-) diff --git a/etc/telegraf.conf b/etc/telegraf.conf index 3a7c6a790..ef0f95caf 100644 --- a/etc/telegraf.conf +++ b/etc/telegraf.conf @@ -1791,19 +1791,19 @@ # ## List of metrics collected on above servers # ## Each metric consists in a name, a jmx path and either # ## a pass or drop slice attribute. -# ## This collect all heap memory usage metrics. +# ## This collect all heap memory usage metrics. # [[inputs.jolokia.metrics]] # name = "heap_memory_usage" # mbean = "java.lang:type=Memory" # attribute = "HeapMemoryUsage" # -# ## This collect thread counts metrics. +# ## This collect thread counts metrics. # [[inputs.jolokia.metrics]] # name = "thread_count" # mbean = "java.lang:type=Threading" # attribute = "TotalStartedThreadCount,ThreadCount,DaemonThreadCount,PeakThreadCount" # -# ## This collect number of class loaded/unloaded counts metrics. +# ## This collect number of class loaded/unloaded counts metrics. # [[inputs.jolokia.metrics]] # name = "class_count" # mbean = "java.lang:type=ClassLoading" @@ -2259,7 +2259,7 @@ # reverse_metric_names = true -# # A plugin to collect stats from Opensmtpd - a validating, recursive, and caching DNS resolver +# # A plugin to collect stats from Opensmtpd - a validating, recursive, and caching DNS resolver # [[inputs.opensmtpd]] # ## If running as a restricted user you can prepend sudo for additional access: # #use_sudo = false @@ -3561,4 +3561,3 @@ # [[inputs.zipkin]] # # path = "/api/v1/spans" # URL path for span data # # port = 9411 # Port on which Telegraf listens - diff --git a/plugins/outputs/mqtt/README.md b/plugins/outputs/mqtt/README.md index 53483d967..716806783 100644 --- a/plugins/outputs/mqtt/README.md +++ b/plugins/outputs/mqtt/README.md @@ -30,6 +30,15 @@ This plugin writes to a [MQTT Broker](http://http://mqtt.org/) acting as a mqtt ## Use TLS but skip chain & host verification # insecure_skip_verify = false + ## Batch messages in a topic + ## batch = false + ## Flag to determine if messages sent in a topic in a flush interval, + ## need to be batched into one message. + ## batch = true, batches the messages in a topic to one messages + ## batch = false, default behaviour + # batch = false + + ## Data format to output. data_format = "influx" ``` diff --git a/plugins/outputs/mqtt/mqtt.go b/plugins/outputs/mqtt/mqtt.go index 1c700332e..5228a083a 100644 --- a/plugins/outputs/mqtt/mqtt.go +++ b/plugins/outputs/mqtt/mqtt.go @@ -45,6 +45,15 @@ var sampleConfig = ` ## more about them here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md data_format = "influx" + + ## Batch messages in a topic + ## batch = false + ## Flag to determine if messages sent in a topic in a flush interval, + ## need to be batched into one message. + ## batch = true, batches the messages in a topic to one messages + ## batch = false, default behaviour + # batch = false + ` type MQTT struct { @@ -57,6 +66,7 @@ type MQTT struct { QoS int `toml:"qos"` ClientID string `toml:"client_id"` tls.ClientConfig + BatchMessage bool `toml:"batch"` client paho.Client opts *paho.ClientOptions @@ -117,6 +127,8 @@ func (m *MQTT) Write(metrics []telegraf.Metric) error { hostname = "" } + metricsmap := make(map[string][]telegraf.Metric) + for _, metric := range metrics { var t []string if m.TopicPrefix != "" { @@ -129,14 +141,31 @@ func (m *MQTT) Write(metrics []telegraf.Metric) error { t = append(t, metric.Name()) topic := strings.Join(t, "/") - buf, err := m.serializer.Serialize(metric) + if m.BatchMessage { + metricsmap[topic] = append(metricsmap[topic], metric) + } else { + buf, err := m.serializer.Serialize(metric) + + if err != nil { + return err + } + + err = m.publish(topic, buf) + if err != nil { + return fmt.Errorf("Could not write to MQTT server, %s", err) + } + } + } + + for key := range metricsmap { + buf, err := m.serializer.SerializeBatch(metricsmap[key]) + if err != nil { return err } - - err = m.publish(topic, buf) - if err != nil { - return fmt.Errorf("Could not write to MQTT server, %s", err) + publisherr := m.publish(key, buf) + if publisherr != nil { + return fmt.Errorf("Could not write to MQTT server, %s", publisherr) } }