From a9c135488e6781dd2cd81260e6e3b255a28cbef6 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Wed, 10 Feb 2016 15:50:07 -0700 Subject: [PATCH] Add Serializer plugins, and 'file' output plugin --- Godeps | 3 +- internal/config/config.go | 43 +++++++ plugins/outputs/all/all.go | 1 + plugins/outputs/amqp/amqp.go | 32 ++++- plugins/outputs/amqp/amqp_test.go | 7 +- plugins/outputs/file/README.md | 1 + plugins/outputs/file/file.go | 109 ++++++++++++++++ plugins/outputs/file/file_test.go | 1 + plugins/outputs/graphite/graphite.go | 84 +++--------- plugins/outputs/graphite/graphite_test.go | 31 +---- plugins/outputs/kafka/kafka.go | 48 ++++--- plugins/outputs/kafka/kafka_test.go | 7 +- plugins/outputs/mqtt/mqtt.go | 72 +++++++---- plugins/outputs/mqtt/mqtt_test.go | 6 +- plugins/outputs/nsq/nsq.go | 36 +++++- plugins/outputs/nsq/nsq_test.go | 7 +- plugins/parsers/influx/parser.go | 2 +- plugins/serializers/graphite/graphite.go | 79 ++++++++++++ plugins/serializers/graphite/graphite_test.go | 121 ++++++++++++++++++ plugins/serializers/influx/influx.go | 12 ++ plugins/serializers/influx/influx_test.go | 68 ++++++++++ plugins/serializers/registry.go | 55 ++++++++ 22 files changed, 665 insertions(+), 160 deletions(-) create mode 100644 plugins/outputs/file/README.md create mode 100644 plugins/outputs/file/file.go create mode 100644 plugins/outputs/file/file_test.go create mode 100644 plugins/serializers/graphite/graphite.go create mode 100644 plugins/serializers/graphite/graphite_test.go create mode 100644 plugins/serializers/influx/influx.go create mode 100644 plugins/serializers/influx/influx_test.go create mode 100644 plugins/serializers/registry.go diff --git a/Godeps b/Godeps index 005aee939..d0d2194c6 100644 --- a/Godeps +++ b/Godeps @@ -19,8 +19,7 @@ github.com/gorilla/context 1c83b3eabd45b6d76072b66b746c20815fb2872d github.com/gorilla/mux 26a6070f849969ba72b72256e9f14cf519751690 github.com/hailocab/go-hostpool e80d13ce29ede4452c43dea11e79b9bc8a15b478 github.com/influxdata/config bae7cb98197d842374d3b8403905924094930f24 -github.com/influxdata/influxdb a9552fdd91361819a792f337e5d9998859732a67 -github.com/influxdb/influxdb a9552fdd91361819a792f337e5d9998859732a67 +github.com/influxdata/influxdb ef571fc104dc24b77cd3710c156cd95e5cfd7aa5 github.com/jmespath/go-jmespath c01cf91b011868172fdcd9f41838e80c9d716264 github.com/klauspost/crc32 999f3125931f6557b991b2f8472172bdfa578d38 github.com/lib/pq 8ad2b298cadd691a77015666a5372eae5dbfac8f diff --git a/internal/config/config.go b/internal/config/config.go index 766ba1189..ffd4f632a 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -16,6 +16,7 @@ import ( "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/plugins/serializers" "github.com/influxdata/config" "github.com/naoina/toml/ast" @@ -398,6 +399,17 @@ func (c *Config) addOutput(name string, table *ast.Table) error { } output := creator() + // If the output has a SetSerializer function, then this means it can write + // arbitrary types of output, so build the serializer and set it. + switch t := output.(type) { + case serializers.SerializerOutput: + serializer, err := buildSerializer(name, table) + if err != nil { + return err + } + t.SetSerializer(serializer) + } + outputConfig, err := buildOutput(name, table) if err != nil { return err @@ -660,6 +672,37 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) { return parsers.NewParser(c) } +// buildSerializer grabs the necessary entries from the ast.Table for creating +// a serializers.Serializer object, and creates it, which can then be added onto +// an Output object. +func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error) { + c := &serializers.Config{} + + if node, ok := tbl.Fields["data_format"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + c.DataFormat = str.Value + } + } + } + + if c.DataFormat == "" { + c.DataFormat = "influx" + } + + if node, ok := tbl.Fields["prefix"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + c.Prefix = str.Value + } + } + } + + delete(tbl.Fields, "data_format") + delete(tbl.Fields, "prefix") + return serializers.NewSerializer(c) +} + // buildOutput parses output specific items from the ast.Table, builds the filter and returns an // internal_models.OutputConfig to be inserted into internal_models.RunningInput // Note: error exists in the return for future calls that might require error diff --git a/plugins/outputs/all/all.go b/plugins/outputs/all/all.go index ac8357c90..18fb1c925 100644 --- a/plugins/outputs/all/all.go +++ b/plugins/outputs/all/all.go @@ -5,6 +5,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/outputs/amqp" _ "github.com/influxdata/telegraf/plugins/outputs/cloudwatch" _ "github.com/influxdata/telegraf/plugins/outputs/datadog" + _ "github.com/influxdata/telegraf/plugins/outputs/file" _ "github.com/influxdata/telegraf/plugins/outputs/graphite" _ "github.com/influxdata/telegraf/plugins/outputs/influxdb" _ "github.com/influxdata/telegraf/plugins/outputs/kafka" diff --git a/plugins/outputs/amqp/amqp.go b/plugins/outputs/amqp/amqp.go index 19d95f512..d826e6d52 100644 --- a/plugins/outputs/amqp/amqp.go +++ b/plugins/outputs/amqp/amqp.go @@ -10,6 +10,8 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/outputs" + "github.com/influxdata/telegraf/plugins/serializers" + "github.com/streadway/amqp" ) @@ -39,6 +41,8 @@ type AMQP struct { channel *amqp.Channel sync.Mutex headers amqp.Table + + serializer serializers.Serializer } const ( @@ -69,8 +73,18 @@ var sampleConfig = ` # ssl_key = "/etc/telegraf/key.pem" ### Use SSL but skip chain & host verification # insecure_skip_verify = false + + ### Data format to output. This can be "influx" or "graphite" + ### Each data format has it's own unique set of configuration options, read + ### more about them here: + ### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_OUTPUT.md + data_format = "influx" ` +func (a *AMQP) SetSerializer(serializer serializers.Serializer) { + a.serializer = serializer +} + func (q *AMQP) Connect() error { q.Lock() defer q.Unlock() @@ -147,18 +161,24 @@ func (q *AMQP) Write(metrics []telegraf.Metric) error { } var outbuf = make(map[string][][]byte) - for _, p := range metrics { - var value, key string - value = p.String() - + for _, metric := range metrics { + var key string if q.RoutingTag != "" { - if h, ok := p.Tags()[q.RoutingTag]; ok { + if h, ok := metric.Tags()[q.RoutingTag]; ok { key = h } } - outbuf[key] = append(outbuf[key], []byte(value)) + values, err := q.serializer.Serialize(metric) + if err != nil { + return err + } + + for _, value := range values { + outbuf[key] = append(outbuf[key], []byte(value)) + } } + for key, buf := range outbuf { err := q.channel.Publish( q.Exchange, // exchange diff --git a/plugins/outputs/amqp/amqp_test.go b/plugins/outputs/amqp/amqp_test.go index 4cecff02e..66a082627 100644 --- a/plugins/outputs/amqp/amqp_test.go +++ b/plugins/outputs/amqp/amqp_test.go @@ -3,6 +3,7 @@ package amqp import ( "testing" + "github.com/influxdata/telegraf/plugins/serializers" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" ) @@ -13,9 +14,11 @@ func TestConnectAndWrite(t *testing.T) { } var url = "amqp://" + testutil.GetLocalHost() + ":5672/" + s, _ := serializers.NewInfluxSerializer() q := &AMQP{ - URL: url, - Exchange: "telegraf_test", + URL: url, + Exchange: "telegraf_test", + serializer: s, } // Verify that we can connect to the AMQP broker diff --git a/plugins/outputs/file/README.md b/plugins/outputs/file/README.md new file mode 100644 index 000000000..6f3b7f513 --- /dev/null +++ b/plugins/outputs/file/README.md @@ -0,0 +1 @@ +# file Output Plugin diff --git a/plugins/outputs/file/file.go b/plugins/outputs/file/file.go new file mode 100644 index 000000000..deae8aaf8 --- /dev/null +++ b/plugins/outputs/file/file.go @@ -0,0 +1,109 @@ +package file + +import ( + "fmt" + "io" + "os" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/outputs" + "github.com/influxdata/telegraf/plugins/serializers" +) + +type File struct { + Files []string + + writer io.Writer + closers []io.Closer + + serializer serializers.Serializer +} + +var sampleConfig = ` + ### Files to write to, "stdout" is a specially handled file. + files = ["stdout", "/tmp/metrics.out"] + + ### Data format to output. This can be "influx" or "graphite" + ### Each data format has it's own unique set of configuration options, read + ### more about them here: + ### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_OUTPUT.md + data_format = "influx" +` + +func (f *File) SetSerializer(serializer serializers.Serializer) { + f.serializer = serializer +} + +func (f *File) Connect() error { + writers := []io.Writer{} + for _, file := range f.Files { + if file == "stdout" { + writers = append(writers, os.Stdout) + f.closers = append(f.closers, os.Stdout) + } else { + var of *os.File + var err error + if _, err := os.Stat(file); os.IsNotExist(err) { + of, err = os.Create(file) + } else { + of, err = os.OpenFile(file, os.O_APPEND|os.O_WRONLY, os.ModeAppend) + } + + if err != nil { + return err + } + writers = append(writers, of) + f.closers = append(f.closers, of) + } + } + f.writer = io.MultiWriter(writers...) + return nil +} + +func (f *File) Close() error { + var errS string + for _, c := range f.closers { + if err := c.Close(); err != nil { + errS += err.Error() + "\n" + } + } + if errS != "" { + return fmt.Errorf(errS) + } + return nil +} + +func (f *File) SampleConfig() string { + return sampleConfig +} + +func (f *File) Description() string { + return "Send telegraf metrics to file(s)" +} + +func (f *File) Write(metrics []telegraf.Metric) error { + if len(metrics) == 0 { + return nil + } + + for _, metric := range metrics { + values, err := f.serializer.Serialize(metric) + if err != nil { + return err + } + + for _, value := range values { + _, err = f.writer.Write([]byte(value + "\n")) + if err != nil { + return fmt.Errorf("FAILED to write message: %s, %s", value, err) + } + } + } + return nil +} + +func init() { + outputs.Add("file", func() telegraf.Output { + return &File{} + }) +} diff --git a/plugins/outputs/file/file_test.go b/plugins/outputs/file/file_test.go new file mode 100644 index 000000000..b691ba57a --- /dev/null +++ b/plugins/outputs/file/file_test.go @@ -0,0 +1 @@ +package file diff --git a/plugins/outputs/graphite/graphite.go b/plugins/outputs/graphite/graphite.go index 7e4414ffc..29ac774f4 100644 --- a/plugins/outputs/graphite/graphite.go +++ b/plugins/outputs/graphite/graphite.go @@ -3,14 +3,15 @@ package graphite import ( "errors" "fmt" - "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/plugins/outputs" "log" "math/rand" "net" - "sort" "strings" "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/outputs" + "github.com/influxdata/telegraf/plugins/serializers" ) type Graphite struct { @@ -71,42 +72,22 @@ func (g *Graphite) Description() string { func (g *Graphite) Write(metrics []telegraf.Metric) error { // Prepare data var bp []string - for _, metric := range metrics { - // Get name - name := metric.Name() - // Convert UnixNano to Unix timestamps - timestamp := metric.UnixNano() / 1000000000 - tag_str := buildTags(metric) - - for field_name, value := range metric.Fields() { - // Convert value - value_str := fmt.Sprintf("%#v", value) - // Write graphite metric - var graphitePoint string - if name == field_name { - graphitePoint = fmt.Sprintf("%s.%s %s %d\n", - tag_str, - strings.Replace(name, ".", "_", -1), - value_str, - timestamp) - } else { - graphitePoint = fmt.Sprintf("%s.%s.%s %s %d\n", - tag_str, - strings.Replace(name, ".", "_", -1), - strings.Replace(field_name, ".", "_", -1), - value_str, - timestamp) - } - if g.Prefix != "" { - graphitePoint = fmt.Sprintf("%s.%s", g.Prefix, graphitePoint) - } - bp = append(bp, graphitePoint) - } + s, err := serializers.NewGraphiteSerializer(g.Prefix) + if err != nil { + return err } - graphitePoints := strings.Join(bp, "") + + for _, metric := range metrics { + gMetrics, err := s.Serialize(metric) + if err != nil { + log.Printf("Error serializing some metrics to graphite: %s", err.Error()) + } + bp = append(bp, gMetrics...) + } + graphitePoints := strings.Join(bp, "\n") + "\n" // This will get set to nil if a successful write occurs - err := errors.New("Could not write to any Graphite server in cluster\n") + err = errors.New("Could not write to any Graphite server in cluster\n") // Send data to a random server p := rand.Perm(len(g.conns)) @@ -128,37 +109,6 @@ func (g *Graphite) Write(metrics []telegraf.Metric) error { return err } -func buildTags(metric telegraf.Metric) string { - var keys []string - tags := metric.Tags() - for k := range tags { - if k == "host" { - continue - } - keys = append(keys, k) - } - sort.Strings(keys) - - var tag_str string - if host, ok := tags["host"]; ok { - if len(keys) > 0 { - tag_str = strings.Replace(host, ".", "_", -1) + "." - } else { - tag_str = strings.Replace(host, ".", "_", -1) - } - } - - for i, k := range keys { - tag_value := strings.Replace(tags[k], ".", "_", -1) - if i == 0 { - tag_str += tag_value - } else { - tag_str += "." + tag_value - } - } - return tag_str -} - func init() { outputs.Add("graphite", func() telegraf.Output { return &Graphite{} diff --git a/plugins/outputs/graphite/graphite_test.go b/plugins/outputs/graphite/graphite_test.go index 2b62750e3..9d9476241 100644 --- a/plugins/outputs/graphite/graphite_test.go +++ b/plugins/outputs/graphite/graphite_test.go @@ -43,6 +43,8 @@ func TestGraphiteOK(t *testing.T) { // Start TCP server wg.Add(1) go TCPServer(t, &wg) + // Give the fake graphite TCP server some time to start: + time.Sleep(time.Millisecond * 100) // Init plugin g := Graphite{ @@ -95,32 +97,3 @@ func TCPServer(t *testing.T, wg *sync.WaitGroup) { assert.Equal(t, "my.prefix.192_168_0_1.my_measurement.value 3.14 1289430000", data3) conn.Close() } - -func TestGraphiteTags(t *testing.T) { - m1, _ := telegraf.NewMetric( - "mymeasurement", - map[string]string{"host": "192.168.0.1"}, - map[string]interface{}{"value": float64(3.14)}, - time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), - ) - m2, _ := telegraf.NewMetric( - "mymeasurement", - map[string]string{"host": "192.168.0.1", "afoo": "first", "bfoo": "second"}, - map[string]interface{}{"value": float64(3.14)}, - time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), - ) - m3, _ := telegraf.NewMetric( - "mymeasurement", - map[string]string{"afoo": "first", "bfoo": "second"}, - map[string]interface{}{"value": float64(3.14)}, - time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), - ) - - tags1 := buildTags(m1) - tags2 := buildTags(m2) - tags3 := buildTags(m3) - - assert.Equal(t, "192_168_0_1", tags1) - assert.Equal(t, "192_168_0_1.first.second", tags2) - assert.Equal(t, "first.second", tags3) -} diff --git a/plugins/outputs/kafka/kafka.go b/plugins/outputs/kafka/kafka.go index a1240dc28..71c2642dd 100644 --- a/plugins/outputs/kafka/kafka.go +++ b/plugins/outputs/kafka/kafka.go @@ -2,12 +2,12 @@ package kafka import ( "crypto/tls" - "errors" "fmt" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/outputs" + "github.com/influxdata/telegraf/plugins/serializers" "github.com/Shopify/sarama" ) @@ -40,6 +40,8 @@ type Kafka struct { tlsConfig tls.Config producer sarama.SyncProducer + + serializer serializers.Serializer } var sampleConfig = ` @@ -57,8 +59,18 @@ var sampleConfig = ` # ssl_key = "/etc/telegraf/key.pem" ### Use SSL but skip chain & host verification # insecure_skip_verify = false + + ### Data format to output. This can be "influx" or "graphite" + ### Each data format has it's own unique set of configuration options, read + ### more about them here: + ### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_OUTPUT.md + data_format = "influx" ` +func (k *Kafka) SetSerializer(serializer serializers.Serializer) { + k.serializer = serializer +} + func (k *Kafka) Connect() error { config := sarama.NewConfig() // Wait for all in-sync replicas to ack the message @@ -109,21 +121,27 @@ func (k *Kafka) Write(metrics []telegraf.Metric) error { return nil } - for _, p := range metrics { - value := p.String() - - m := &sarama.ProducerMessage{ - Topic: k.Topic, - Value: sarama.StringEncoder(value), - } - if h, ok := p.Tags()[k.RoutingTag]; ok { - m.Key = sarama.StringEncoder(h) - } - - _, _, err := k.producer.SendMessage(m) + for _, metric := range metrics { + values, err := k.serializer.Serialize(metric) if err != nil { - return errors.New(fmt.Sprintf("FAILED to send kafka message: %s\n", - err)) + return err + } + + var pubErr error + for _, value := range values { + m := &sarama.ProducerMessage{ + Topic: k.Topic, + Value: sarama.StringEncoder(value), + } + if h, ok := metric.Tags()[k.RoutingTag]; ok { + m.Key = sarama.StringEncoder(h) + } + + _, _, pubErr = k.producer.SendMessage(m) + } + + if pubErr != nil { + return fmt.Errorf("FAILED to send kafka message: %s\n", pubErr) } } return nil diff --git a/plugins/outputs/kafka/kafka_test.go b/plugins/outputs/kafka/kafka_test.go index 103f268cb..f99e0ecea 100644 --- a/plugins/outputs/kafka/kafka_test.go +++ b/plugins/outputs/kafka/kafka_test.go @@ -3,6 +3,7 @@ package kafka import ( "testing" + "github.com/influxdata/telegraf/plugins/serializers" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" ) @@ -13,9 +14,11 @@ func TestConnectAndWrite(t *testing.T) { } brokers := []string{testutil.GetLocalHost() + ":9092"} + s, _ := serializers.NewInfluxSerializer() k := &Kafka{ - Brokers: brokers, - Topic: "Test", + Brokers: brokers, + Topic: "Test", + serializer: s, } // Verify that we can connect to the Kafka broker diff --git a/plugins/outputs/mqtt/mqtt.go b/plugins/outputs/mqtt/mqtt.go index 61f0ef557..efa20944b 100644 --- a/plugins/outputs/mqtt/mqtt.go +++ b/plugins/outputs/mqtt/mqtt.go @@ -9,8 +9,35 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/outputs" + "github.com/influxdata/telegraf/plugins/serializers" ) +var sampleConfig = ` + servers = ["localhost:1883"] # required. + + ### MQTT outputs send metrics to this topic format + ### "///" + ### ex: prefix/host/web01.example.com/mem + topic_prefix = "telegraf" + + ### username and password to connect MQTT server. + # username = "telegraf" + # password = "metricsmetricsmetricsmetrics" + + ### Optional SSL Config + # ssl_ca = "/etc/telegraf/ca.pem" + # ssl_cert = "/etc/telegraf/cert.pem" + # ssl_key = "/etc/telegraf/key.pem" + ### Use SSL but skip chain & host verification + # insecure_skip_verify = false + + ### Data format to output. This can be "influx" or "graphite" + ### Each data format has it's own unique set of configuration options, read + ### more about them here: + ### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_OUTPUT.md + data_format = "influx" +` + type MQTT struct { Servers []string `toml:"servers"` Username string @@ -32,31 +59,11 @@ type MQTT struct { client *paho.Client opts *paho.ClientOptions + serializer serializers.Serializer + sync.Mutex } -var sampleConfig = ` - servers = ["localhost:1883"] # required. - ### MQTT QoS, must be 0, 1, or 2 - qos = 0 - - ### MQTT outputs send metrics to this topic format - ### "///" - ### ex: prefix/host/web01.example.com/mem - topic_prefix = "telegraf" - - ### username and password to connect MQTT server. - # username = "telegraf" - # password = "metricsmetricsmetricsmetrics" - - ### Optional SSL Config - # ssl_ca = "/etc/telegraf/ca.pem" - # ssl_cert = "/etc/telegraf/cert.pem" - # ssl_key = "/etc/telegraf/key.pem" - ### Use SSL but skip chain & host verification - # insecure_skip_verify = false -` - func (m *MQTT) Connect() error { var err error m.Lock() @@ -78,6 +85,10 @@ func (m *MQTT) Connect() error { return nil } +func (m *MQTT) SetSerializer(serializer serializers.Serializer) { + m.serializer = serializer +} + func (m *MQTT) Close() error { if m.client.IsConnected() { m.client.Disconnect(20) @@ -104,7 +115,7 @@ func (m *MQTT) Write(metrics []telegraf.Metric) error { hostname = "" } - for _, p := range metrics { + for _, metric := range metrics { var t []string if m.TopicPrefix != "" { t = append(t, m.TopicPrefix) @@ -113,13 +124,20 @@ func (m *MQTT) Write(metrics []telegraf.Metric) error { t = append(t, hostname) } - t = append(t, p.Name()) + t = append(t, metric.Name()) topic := strings.Join(t, "/") - value := p.String() - err := m.publish(topic, value) + values, err := m.serializer.Serialize(metric) if err != nil { - return fmt.Errorf("Could not write to MQTT server, %s", err) + return fmt.Errorf("MQTT Could not serialize metric: %s", + metric.String()) + } + + for _, value := range values { + err = m.publish(topic, value) + if err != nil { + return fmt.Errorf("Could not write to MQTT server, %s", err) + } } } diff --git a/plugins/outputs/mqtt/mqtt_test.go b/plugins/outputs/mqtt/mqtt_test.go index 25d0ab9e3..260eb0c64 100644 --- a/plugins/outputs/mqtt/mqtt_test.go +++ b/plugins/outputs/mqtt/mqtt_test.go @@ -3,7 +3,9 @@ package mqtt import ( "testing" + "github.com/influxdata/telegraf/plugins/serializers" "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" ) @@ -13,8 +15,10 @@ func TestConnectAndWrite(t *testing.T) { } var url = testutil.GetLocalHost() + ":1883" + s, _ := serializers.NewInfluxSerializer() m := &MQTT{ - Servers: []string{url}, + Servers: []string{url}, + serializer: s, } // Verify that we can connect to the MQTT broker diff --git a/plugins/outputs/nsq/nsq.go b/plugins/outputs/nsq/nsq.go index ce84c77d5..7fe9b2068 100644 --- a/plugins/outputs/nsq/nsq.go +++ b/plugins/outputs/nsq/nsq.go @@ -2,15 +2,20 @@ package nsq import ( "fmt" + + "github.com/nsqio/go-nsq" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/outputs" - "github.com/nsqio/go-nsq" + "github.com/influxdata/telegraf/plugins/serializers" ) type NSQ struct { Server string Topic string producer *nsq.Producer + + serializer serializers.Serializer } var sampleConfig = ` @@ -18,8 +23,18 @@ var sampleConfig = ` server = "localhost:4150" ### NSQ topic for producer messages topic = "telegraf" + + ### Data format to output. This can be "influx" or "graphite" + ### Each data format has it's own unique set of configuration options, read + ### more about them here: + ### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_OUTPUT.md + data_format = "influx" ` +func (n *NSQ) SetSerializer(serializer serializers.Serializer) { + n.serializer = serializer +} + func (n *NSQ) Connect() error { config := nsq.NewConfig() producer, err := nsq.NewProducer(n.Server, config) @@ -50,12 +65,21 @@ func (n *NSQ) Write(metrics []telegraf.Metric) error { return nil } - for _, p := range metrics { - value := p.String() - - err := n.producer.Publish(n.Topic, []byte(value)) - + for _, metric := range metrics { + values, err := n.serializer.Serialize(metric) if err != nil { + return err + } + + var pubErr error + for _, value := range values { + err = n.producer.Publish(n.Topic, []byte(value)) + if err != nil { + pubErr = err + } + } + + if pubErr != nil { return fmt.Errorf("FAILED to send NSQD message: %s", err) } } diff --git a/plugins/outputs/nsq/nsq_test.go b/plugins/outputs/nsq/nsq_test.go index 0880d0252..e2b0fc31d 100644 --- a/plugins/outputs/nsq/nsq_test.go +++ b/plugins/outputs/nsq/nsq_test.go @@ -3,6 +3,7 @@ package nsq import ( "testing" + "github.com/influxdata/telegraf/plugins/serializers" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" ) @@ -13,9 +14,11 @@ func TestConnectAndWrite(t *testing.T) { } server := []string{testutil.GetLocalHost() + ":4150"} + s, _ := serializers.NewInfluxSerializer() n := &NSQ{ - Server: server[0], - Topic: "telegraf", + Server: server[0], + Topic: "telegraf", + serializer: s, } // Verify that we can connect to the NSQ daemon diff --git a/plugins/parsers/influx/parser.go b/plugins/parsers/influx/parser.go index 345e60b2e..8ab783b0d 100644 --- a/plugins/parsers/influx/parser.go +++ b/plugins/parsers/influx/parser.go @@ -15,7 +15,7 @@ type InfluxParser struct { DefaultTags map[string]string } -// ParseMetrics returns a slice of Metrics from a text representation of a +// Parse returns a slice of Metrics from a text representation of a // metric (in line-protocol format) // with each metric separated by newlines. If any metrics fail to parse, // a non-nil error will be returned in addition to the metrics that parsed diff --git a/plugins/serializers/graphite/graphite.go b/plugins/serializers/graphite/graphite.go new file mode 100644 index 000000000..d04f756c1 --- /dev/null +++ b/plugins/serializers/graphite/graphite.go @@ -0,0 +1,79 @@ +package graphite + +import ( + "fmt" + "sort" + "strings" + + "github.com/influxdata/telegraf" +) + +type GraphiteSerializer struct { + Prefix string +} + +func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]string, error) { + out := []string{} + // Get name + name := metric.Name() + // Convert UnixNano to Unix timestamps + timestamp := metric.UnixNano() / 1000000000 + tag_str := buildTags(metric) + + for field_name, value := range metric.Fields() { + // Convert value + value_str := fmt.Sprintf("%#v", value) + // Write graphite metric + var graphitePoint string + if name == field_name { + graphitePoint = fmt.Sprintf("%s.%s %s %d", + tag_str, + strings.Replace(name, ".", "_", -1), + value_str, + timestamp) + } else { + graphitePoint = fmt.Sprintf("%s.%s.%s %s %d", + tag_str, + strings.Replace(name, ".", "_", -1), + strings.Replace(field_name, ".", "_", -1), + value_str, + timestamp) + } + if s.Prefix != "" { + graphitePoint = fmt.Sprintf("%s.%s", s.Prefix, graphitePoint) + } + out = append(out, graphitePoint) + } + return out, nil +} + +func buildTags(metric telegraf.Metric) string { + var keys []string + tags := metric.Tags() + for k := range tags { + if k == "host" { + continue + } + keys = append(keys, k) + } + sort.Strings(keys) + + var tag_str string + if host, ok := tags["host"]; ok { + if len(keys) > 0 { + tag_str = strings.Replace(host, ".", "_", -1) + "." + } else { + tag_str = strings.Replace(host, ".", "_", -1) + } + } + + for i, k := range keys { + tag_value := strings.Replace(tags[k], ".", "_", -1) + if i == 0 { + tag_str += tag_value + } else { + tag_str += "." + tag_value + } + } + return tag_str +} diff --git a/plugins/serializers/graphite/graphite_test.go b/plugins/serializers/graphite/graphite_test.go new file mode 100644 index 000000000..72b203b7a --- /dev/null +++ b/plugins/serializers/graphite/graphite_test.go @@ -0,0 +1,121 @@ +package graphite + +import ( + "fmt" + "sort" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/influxdata/telegraf" +) + +func TestGraphiteTags(t *testing.T) { + m1, _ := telegraf.NewMetric( + "mymeasurement", + map[string]string{"host": "192.168.0.1"}, + map[string]interface{}{"value": float64(3.14)}, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + m2, _ := telegraf.NewMetric( + "mymeasurement", + map[string]string{"host": "192.168.0.1", "afoo": "first", "bfoo": "second"}, + map[string]interface{}{"value": float64(3.14)}, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + m3, _ := telegraf.NewMetric( + "mymeasurement", + map[string]string{"afoo": "first", "bfoo": "second"}, + map[string]interface{}{"value": float64(3.14)}, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + + tags1 := buildTags(m1) + tags2 := buildTags(m2) + tags3 := buildTags(m3) + + assert.Equal(t, "192_168_0_1", tags1) + assert.Equal(t, "192_168_0_1.first.second", tags2) + assert.Equal(t, "first.second", tags3) +} + +func TestSerializeMetricNoHost(t *testing.T) { + now := time.Now() + tags := map[string]string{ + "cpu": "cpu0", + "datacenter": "us-west-2", + } + fields := map[string]interface{}{ + "usage_idle": float64(91.5), + "usage_busy": float64(8.5), + } + m, err := telegraf.NewMetric("cpu", tags, fields, now) + assert.NoError(t, err) + + s := GraphiteSerializer{} + mS, err := s.Serialize(m) + assert.NoError(t, err) + + expS := []string{ + fmt.Sprintf("cpu0.us-west-2.cpu.usage_idle 91.5 %d", now.Unix()), + fmt.Sprintf("cpu0.us-west-2.cpu.usage_busy 8.5 %d", now.Unix()), + } + sort.Strings(mS) + sort.Strings(expS) + assert.Equal(t, expS, mS) +} + +func TestSerializeMetricHost(t *testing.T) { + now := time.Now() + tags := map[string]string{ + "host": "localhost", + "cpu": "cpu0", + "datacenter": "us-west-2", + } + fields := map[string]interface{}{ + "usage_idle": float64(91.5), + "usage_busy": float64(8.5), + } + m, err := telegraf.NewMetric("cpu", tags, fields, now) + assert.NoError(t, err) + + s := GraphiteSerializer{} + mS, err := s.Serialize(m) + assert.NoError(t, err) + + expS := []string{ + fmt.Sprintf("localhost.cpu0.us-west-2.cpu.usage_idle 91.5 %d", now.Unix()), + fmt.Sprintf("localhost.cpu0.us-west-2.cpu.usage_busy 8.5 %d", now.Unix()), + } + sort.Strings(mS) + sort.Strings(expS) + assert.Equal(t, expS, mS) +} + +func TestSerializeMetricPrefix(t *testing.T) { + now := time.Now() + tags := map[string]string{ + "host": "localhost", + "cpu": "cpu0", + "datacenter": "us-west-2", + } + fields := map[string]interface{}{ + "usage_idle": float64(91.5), + "usage_busy": float64(8.5), + } + m, err := telegraf.NewMetric("cpu", tags, fields, now) + assert.NoError(t, err) + + s := GraphiteSerializer{Prefix: "prefix"} + mS, err := s.Serialize(m) + assert.NoError(t, err) + + expS := []string{ + fmt.Sprintf("prefix.localhost.cpu0.us-west-2.cpu.usage_idle 91.5 %d", now.Unix()), + fmt.Sprintf("prefix.localhost.cpu0.us-west-2.cpu.usage_busy 8.5 %d", now.Unix()), + } + sort.Strings(mS) + sort.Strings(expS) + assert.Equal(t, expS, mS) +} diff --git a/plugins/serializers/influx/influx.go b/plugins/serializers/influx/influx.go new file mode 100644 index 000000000..03c53fed2 --- /dev/null +++ b/plugins/serializers/influx/influx.go @@ -0,0 +1,12 @@ +package influx + +import ( + "github.com/influxdata/telegraf" +) + +type InfluxSerializer struct { +} + +func (s *InfluxSerializer) Serialize(metric telegraf.Metric) ([]string, error) { + return []string{metric.String()}, nil +} diff --git a/plugins/serializers/influx/influx_test.go b/plugins/serializers/influx/influx_test.go new file mode 100644 index 000000000..4937800aa --- /dev/null +++ b/plugins/serializers/influx/influx_test.go @@ -0,0 +1,68 @@ +package influx + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/influxdata/telegraf" +) + +func TestSerializeMetricFloat(t *testing.T) { + now := time.Now() + tags := map[string]string{ + "cpu": "cpu0", + } + fields := map[string]interface{}{ + "usage_idle": float64(91.5), + } + m, err := telegraf.NewMetric("cpu", tags, fields, now) + assert.NoError(t, err) + + s := InfluxSerializer{} + mS, err := s.Serialize(m) + assert.NoError(t, err) + + expS := []string{fmt.Sprintf("cpu,cpu=cpu0 usage_idle=91.5 %d", now.UnixNano())} + assert.Equal(t, expS, mS) +} + +func TestSerializeMetricInt(t *testing.T) { + now := time.Now() + tags := map[string]string{ + "cpu": "cpu0", + } + fields := map[string]interface{}{ + "usage_idle": int64(90), + } + m, err := telegraf.NewMetric("cpu", tags, fields, now) + assert.NoError(t, err) + + s := InfluxSerializer{} + mS, err := s.Serialize(m) + assert.NoError(t, err) + + expS := []string{fmt.Sprintf("cpu,cpu=cpu0 usage_idle=90i %d", now.UnixNano())} + assert.Equal(t, expS, mS) +} + +func TestSerializeMetricString(t *testing.T) { + now := time.Now() + tags := map[string]string{ + "cpu": "cpu0", + } + fields := map[string]interface{}{ + "usage_idle": "foobar", + } + m, err := telegraf.NewMetric("cpu", tags, fields, now) + assert.NoError(t, err) + + s := InfluxSerializer{} + mS, err := s.Serialize(m) + assert.NoError(t, err) + + expS := []string{fmt.Sprintf("cpu,cpu=cpu0 usage_idle=\"foobar\" %d", now.UnixNano())} + assert.Equal(t, expS, mS) +} diff --git a/plugins/serializers/registry.go b/plugins/serializers/registry.go new file mode 100644 index 000000000..2fedfbeaf --- /dev/null +++ b/plugins/serializers/registry.go @@ -0,0 +1,55 @@ +package serializers + +import ( + "github.com/influxdata/telegraf" + + "github.com/influxdata/telegraf/plugins/serializers/graphite" + "github.com/influxdata/telegraf/plugins/serializers/influx" +) + +// SerializerOutput is an interface for output plugins that are able to +// serialize telegraf metrics into arbitrary data formats. +type SerializerOutput interface { + // SetSerializer sets the serializer function for the interface. + SetSerializer(serializer Serializer) +} + +// Serializer is an interface defining functions that a serializer plugin must +// satisfy. +type Serializer interface { + // Serialize takes a single telegraf metric and turns it into a string. + Serialize(metric telegraf.Metric) ([]string, error) +} + +// Config is a struct that covers the data types needed for all serializer types, +// and can be used to instantiate _any_ of the serializers. +type Config struct { + // Dataformat can be one of: influx, graphite + DataFormat string + + // Prefix to add to all measurements, only supports Graphite + Prefix string +} + +// NewSerializer a Serializer interface based on the given config. +func NewSerializer(config *Config) (Serializer, error) { + var err error + var serializer Serializer + switch config.DataFormat { + case "influx": + serializer, err = NewInfluxSerializer() + case "graphite": + serializer, err = NewGraphiteSerializer(config.Prefix) + } + return serializer, err +} + +func NewInfluxSerializer() (Serializer, error) { + return &influx.InfluxSerializer{}, nil +} + +func NewGraphiteSerializer(prefix string) (Serializer, error) { + return &graphite.GraphiteSerializer{ + Prefix: prefix, + }, nil +}