Add Serializer plugins, and 'file' output plugin

This commit is contained in:
Cameron Sparr 2016-02-10 15:50:07 -07:00 committed by Michele Fadda
parent f53675ab7c
commit a9b91c7daa
22 changed files with 665 additions and 160 deletions

3
Godeps
View File

@ -19,8 +19,7 @@ github.com/gorilla/context 1c83b3eabd45b6d76072b66b746c20815fb2872d
github.com/gorilla/mux 26a6070f849969ba72b72256e9f14cf519751690 github.com/gorilla/mux 26a6070f849969ba72b72256e9f14cf519751690
github.com/hailocab/go-hostpool e80d13ce29ede4452c43dea11e79b9bc8a15b478 github.com/hailocab/go-hostpool e80d13ce29ede4452c43dea11e79b9bc8a15b478
github.com/influxdata/config bae7cb98197d842374d3b8403905924094930f24 github.com/influxdata/config bae7cb98197d842374d3b8403905924094930f24
github.com/influxdata/influxdb a9552fdd91361819a792f337e5d9998859732a67 github.com/influxdata/influxdb ef571fc104dc24b77cd3710c156cd95e5cfd7aa5
github.com/influxdb/influxdb a9552fdd91361819a792f337e5d9998859732a67
github.com/jmespath/go-jmespath c01cf91b011868172fdcd9f41838e80c9d716264 github.com/jmespath/go-jmespath c01cf91b011868172fdcd9f41838e80c9d716264
github.com/klauspost/crc32 999f3125931f6557b991b2f8472172bdfa578d38 github.com/klauspost/crc32 999f3125931f6557b991b2f8472172bdfa578d38
github.com/lib/pq 8ad2b298cadd691a77015666a5372eae5dbfac8f github.com/lib/pq 8ad2b298cadd691a77015666a5372eae5dbfac8f

View File

@ -16,6 +16,7 @@ import (
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/config" "github.com/influxdata/config"
"github.com/naoina/toml/ast" "github.com/naoina/toml/ast"
@ -398,6 +399,17 @@ func (c *Config) addOutput(name string, table *ast.Table) error {
} }
output := creator() output := creator()
// If the output has a SetSerializer function, then this means it can write
// arbitrary types of output, so build the serializer and set it.
switch t := output.(type) {
case serializers.SerializerOutput:
serializer, err := buildSerializer(name, table)
if err != nil {
return err
}
t.SetSerializer(serializer)
}
outputConfig, err := buildOutput(name, table) outputConfig, err := buildOutput(name, table)
if err != nil { if err != nil {
return err return err
@ -660,6 +672,37 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
return parsers.NewParser(c) return parsers.NewParser(c)
} }
// buildSerializer grabs the necessary entries from the ast.Table for creating
// a serializers.Serializer object, and creates it, which can then be added onto
// an Output object.
func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error) {
c := &serializers.Config{}
if node, ok := tbl.Fields["data_format"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.DataFormat = str.Value
}
}
}
if c.DataFormat == "" {
c.DataFormat = "influx"
}
if node, ok := tbl.Fields["prefix"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.Prefix = str.Value
}
}
}
delete(tbl.Fields, "data_format")
delete(tbl.Fields, "prefix")
return serializers.NewSerializer(c)
}
// buildOutput parses output specific items from the ast.Table, builds the filter and returns an // buildOutput parses output specific items from the ast.Table, builds the filter and returns an
// internal_models.OutputConfig to be inserted into internal_models.RunningInput // internal_models.OutputConfig to be inserted into internal_models.RunningInput
// Note: error exists in the return for future calls that might require error // Note: error exists in the return for future calls that might require error

View File

@ -5,6 +5,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/outputs/amqp" _ "github.com/influxdata/telegraf/plugins/outputs/amqp"
_ "github.com/influxdata/telegraf/plugins/outputs/cloudwatch" _ "github.com/influxdata/telegraf/plugins/outputs/cloudwatch"
_ "github.com/influxdata/telegraf/plugins/outputs/datadog" _ "github.com/influxdata/telegraf/plugins/outputs/datadog"
_ "github.com/influxdata/telegraf/plugins/outputs/file"
_ "github.com/influxdata/telegraf/plugins/outputs/graphite" _ "github.com/influxdata/telegraf/plugins/outputs/graphite"
_ "github.com/influxdata/telegraf/plugins/outputs/influxdb" _ "github.com/influxdata/telegraf/plugins/outputs/influxdb"
_ "github.com/influxdata/telegraf/plugins/outputs/kafka" _ "github.com/influxdata/telegraf/plugins/outputs/kafka"

View File

@ -10,6 +10,8 @@ import (
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/streadway/amqp" "github.com/streadway/amqp"
) )
@ -39,6 +41,8 @@ type AMQP struct {
channel *amqp.Channel channel *amqp.Channel
sync.Mutex sync.Mutex
headers amqp.Table headers amqp.Table
serializer serializers.Serializer
} }
const ( const (
@ -69,8 +73,18 @@ var sampleConfig = `
# ssl_key = "/etc/telegraf/key.pem" # ssl_key = "/etc/telegraf/key.pem"
### Use SSL but skip chain & host verification ### Use SSL but skip chain & host verification
# insecure_skip_verify = false # insecure_skip_verify = false
### Data format to output. This can be "influx" or "graphite"
### Each data format has it's own unique set of configuration options, read
### more about them here:
### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_OUTPUT.md
data_format = "influx"
` `
func (a *AMQP) SetSerializer(serializer serializers.Serializer) {
a.serializer = serializer
}
func (q *AMQP) Connect() error { func (q *AMQP) Connect() error {
q.Lock() q.Lock()
defer q.Unlock() defer q.Unlock()
@ -147,18 +161,24 @@ func (q *AMQP) Write(metrics []telegraf.Metric) error {
} }
var outbuf = make(map[string][][]byte) var outbuf = make(map[string][][]byte)
for _, p := range metrics { for _, metric := range metrics {
var value, key string var key string
value = p.String()
if q.RoutingTag != "" { if q.RoutingTag != "" {
if h, ok := p.Tags()[q.RoutingTag]; ok { if h, ok := metric.Tags()[q.RoutingTag]; ok {
key = h key = h
} }
} }
outbuf[key] = append(outbuf[key], []byte(value))
values, err := q.serializer.Serialize(metric)
if err != nil {
return err
} }
for _, value := range values {
outbuf[key] = append(outbuf[key], []byte(value))
}
}
for key, buf := range outbuf { for key, buf := range outbuf {
err := q.channel.Publish( err := q.channel.Publish(
q.Exchange, // exchange q.Exchange, // exchange

View File

@ -3,6 +3,7 @@ package amqp
import ( import (
"testing" "testing"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -13,9 +14,11 @@ func TestConnectAndWrite(t *testing.T) {
} }
var url = "amqp://" + testutil.GetLocalHost() + ":5672/" var url = "amqp://" + testutil.GetLocalHost() + ":5672/"
s, _ := serializers.NewInfluxSerializer()
q := &AMQP{ q := &AMQP{
URL: url, URL: url,
Exchange: "telegraf_test", Exchange: "telegraf_test",
serializer: s,
} }
// Verify that we can connect to the AMQP broker // Verify that we can connect to the AMQP broker

View File

@ -0,0 +1 @@
# file Output Plugin

View File

@ -0,0 +1,109 @@
package file
import (
"fmt"
"io"
"os"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
)
type File struct {
Files []string
writer io.Writer
closers []io.Closer
serializer serializers.Serializer
}
var sampleConfig = `
### Files to write to, "stdout" is a specially handled file.
files = ["stdout", "/tmp/metrics.out"]
### Data format to output. This can be "influx" or "graphite"
### Each data format has it's own unique set of configuration options, read
### more about them here:
### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_OUTPUT.md
data_format = "influx"
`
func (f *File) SetSerializer(serializer serializers.Serializer) {
f.serializer = serializer
}
func (f *File) Connect() error {
writers := []io.Writer{}
for _, file := range f.Files {
if file == "stdout" {
writers = append(writers, os.Stdout)
f.closers = append(f.closers, os.Stdout)
} else {
var of *os.File
var err error
if _, err := os.Stat(file); os.IsNotExist(err) {
of, err = os.Create(file)
} else {
of, err = os.OpenFile(file, os.O_APPEND|os.O_WRONLY, os.ModeAppend)
}
if err != nil {
return err
}
writers = append(writers, of)
f.closers = append(f.closers, of)
}
}
f.writer = io.MultiWriter(writers...)
return nil
}
func (f *File) Close() error {
var errS string
for _, c := range f.closers {
if err := c.Close(); err != nil {
errS += err.Error() + "\n"
}
}
if errS != "" {
return fmt.Errorf(errS)
}
return nil
}
func (f *File) SampleConfig() string {
return sampleConfig
}
func (f *File) Description() string {
return "Send telegraf metrics to file(s)"
}
func (f *File) Write(metrics []telegraf.Metric) error {
if len(metrics) == 0 {
return nil
}
for _, metric := range metrics {
values, err := f.serializer.Serialize(metric)
if err != nil {
return err
}
for _, value := range values {
_, err = f.writer.Write([]byte(value + "\n"))
if err != nil {
return fmt.Errorf("FAILED to write message: %s, %s", value, err)
}
}
}
return nil
}
func init() {
outputs.Add("file", func() telegraf.Output {
return &File{}
})
}

View File

@ -0,0 +1 @@
package file

View File

@ -3,14 +3,15 @@ package graphite
import ( import (
"errors" "errors"
"fmt" "fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/outputs"
"log" "log"
"math/rand" "math/rand"
"net" "net"
"sort"
"strings" "strings"
"time" "time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
) )
type Graphite struct { type Graphite struct {
@ -71,42 +72,22 @@ func (g *Graphite) Description() string {
func (g *Graphite) Write(metrics []telegraf.Metric) error { func (g *Graphite) Write(metrics []telegraf.Metric) error {
// Prepare data // Prepare data
var bp []string var bp []string
for _, metric := range metrics { s, err := serializers.NewGraphiteSerializer(g.Prefix)
// Get name if err != nil {
name := metric.Name() return err
// Convert UnixNano to Unix timestamps }
timestamp := metric.UnixNano() / 1000000000
tag_str := buildTags(metric)
for field_name, value := range metric.Fields() { for _, metric := range metrics {
// Convert value gMetrics, err := s.Serialize(metric)
value_str := fmt.Sprintf("%#v", value) if err != nil {
// Write graphite metric log.Printf("Error serializing some metrics to graphite: %s", err.Error())
var graphitePoint string
if name == field_name {
graphitePoint = fmt.Sprintf("%s.%s %s %d\n",
tag_str,
strings.Replace(name, ".", "_", -1),
value_str,
timestamp)
} else {
graphitePoint = fmt.Sprintf("%s.%s.%s %s %d\n",
tag_str,
strings.Replace(name, ".", "_", -1),
strings.Replace(field_name, ".", "_", -1),
value_str,
timestamp)
} }
if g.Prefix != "" { bp = append(bp, gMetrics...)
graphitePoint = fmt.Sprintf("%s.%s", g.Prefix, graphitePoint)
} }
bp = append(bp, graphitePoint) graphitePoints := strings.Join(bp, "\n") + "\n"
}
}
graphitePoints := strings.Join(bp, "")
// This will get set to nil if a successful write occurs // This will get set to nil if a successful write occurs
err := errors.New("Could not write to any Graphite server in cluster\n") err = errors.New("Could not write to any Graphite server in cluster\n")
// Send data to a random server // Send data to a random server
p := rand.Perm(len(g.conns)) p := rand.Perm(len(g.conns))
@ -128,37 +109,6 @@ func (g *Graphite) Write(metrics []telegraf.Metric) error {
return err return err
} }
func buildTags(metric telegraf.Metric) string {
var keys []string
tags := metric.Tags()
for k := range tags {
if k == "host" {
continue
}
keys = append(keys, k)
}
sort.Strings(keys)
var tag_str string
if host, ok := tags["host"]; ok {
if len(keys) > 0 {
tag_str = strings.Replace(host, ".", "_", -1) + "."
} else {
tag_str = strings.Replace(host, ".", "_", -1)
}
}
for i, k := range keys {
tag_value := strings.Replace(tags[k], ".", "_", -1)
if i == 0 {
tag_str += tag_value
} else {
tag_str += "." + tag_value
}
}
return tag_str
}
func init() { func init() {
outputs.Add("graphite", func() telegraf.Output { outputs.Add("graphite", func() telegraf.Output {
return &Graphite{} return &Graphite{}

View File

@ -43,6 +43,8 @@ func TestGraphiteOK(t *testing.T) {
// Start TCP server // Start TCP server
wg.Add(1) wg.Add(1)
go TCPServer(t, &wg) go TCPServer(t, &wg)
// Give the fake graphite TCP server some time to start:
time.Sleep(time.Millisecond * 100)
// Init plugin // Init plugin
g := Graphite{ g := Graphite{
@ -95,32 +97,3 @@ func TCPServer(t *testing.T, wg *sync.WaitGroup) {
assert.Equal(t, "my.prefix.192_168_0_1.my_measurement.value 3.14 1289430000", data3) assert.Equal(t, "my.prefix.192_168_0_1.my_measurement.value 3.14 1289430000", data3)
conn.Close() conn.Close()
} }
func TestGraphiteTags(t *testing.T) {
m1, _ := telegraf.NewMetric(
"mymeasurement",
map[string]string{"host": "192.168.0.1"},
map[string]interface{}{"value": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)
m2, _ := telegraf.NewMetric(
"mymeasurement",
map[string]string{"host": "192.168.0.1", "afoo": "first", "bfoo": "second"},
map[string]interface{}{"value": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)
m3, _ := telegraf.NewMetric(
"mymeasurement",
map[string]string{"afoo": "first", "bfoo": "second"},
map[string]interface{}{"value": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)
tags1 := buildTags(m1)
tags2 := buildTags(m2)
tags3 := buildTags(m3)
assert.Equal(t, "192_168_0_1", tags1)
assert.Equal(t, "192_168_0_1.first.second", tags2)
assert.Equal(t, "first.second", tags3)
}

View File

@ -2,12 +2,12 @@ package kafka
import ( import (
"crypto/tls" "crypto/tls"
"errors"
"fmt" "fmt"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
) )
@ -40,6 +40,8 @@ type Kafka struct {
tlsConfig tls.Config tlsConfig tls.Config
producer sarama.SyncProducer producer sarama.SyncProducer
serializer serializers.Serializer
} }
var sampleConfig = ` var sampleConfig = `
@ -57,8 +59,18 @@ var sampleConfig = `
# ssl_key = "/etc/telegraf/key.pem" # ssl_key = "/etc/telegraf/key.pem"
### Use SSL but skip chain & host verification ### Use SSL but skip chain & host verification
# insecure_skip_verify = false # insecure_skip_verify = false
### Data format to output. This can be "influx" or "graphite"
### Each data format has it's own unique set of configuration options, read
### more about them here:
### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_OUTPUT.md
data_format = "influx"
` `
func (k *Kafka) SetSerializer(serializer serializers.Serializer) {
k.serializer = serializer
}
func (k *Kafka) Connect() error { func (k *Kafka) Connect() error {
config := sarama.NewConfig() config := sarama.NewConfig()
// Wait for all in-sync replicas to ack the message // Wait for all in-sync replicas to ack the message
@ -109,21 +121,27 @@ func (k *Kafka) Write(metrics []telegraf.Metric) error {
return nil return nil
} }
for _, p := range metrics { for _, metric := range metrics {
value := p.String() values, err := k.serializer.Serialize(metric)
if err != nil {
return err
}
var pubErr error
for _, value := range values {
m := &sarama.ProducerMessage{ m := &sarama.ProducerMessage{
Topic: k.Topic, Topic: k.Topic,
Value: sarama.StringEncoder(value), Value: sarama.StringEncoder(value),
} }
if h, ok := p.Tags()[k.RoutingTag]; ok { if h, ok := metric.Tags()[k.RoutingTag]; ok {
m.Key = sarama.StringEncoder(h) m.Key = sarama.StringEncoder(h)
} }
_, _, err := k.producer.SendMessage(m) _, _, pubErr = k.producer.SendMessage(m)
if err != nil { }
return errors.New(fmt.Sprintf("FAILED to send kafka message: %s\n",
err)) if pubErr != nil {
return fmt.Errorf("FAILED to send kafka message: %s\n", pubErr)
} }
} }
return nil return nil

View File

@ -3,6 +3,7 @@ package kafka
import ( import (
"testing" "testing"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -13,9 +14,11 @@ func TestConnectAndWrite(t *testing.T) {
} }
brokers := []string{testutil.GetLocalHost() + ":9092"} brokers := []string{testutil.GetLocalHost() + ":9092"}
s, _ := serializers.NewInfluxSerializer()
k := &Kafka{ k := &Kafka{
Brokers: brokers, Brokers: brokers,
Topic: "Test", Topic: "Test",
serializer: s,
} }
// Verify that we can connect to the Kafka broker // Verify that we can connect to the Kafka broker

View File

@ -9,8 +9,35 @@ import (
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
) )
var sampleConfig = `
servers = ["localhost:1883"] # required.
### MQTT outputs send metrics to this topic format
### "<topic_prefix>/<hostname>/<pluginname>/"
### ex: prefix/host/web01.example.com/mem
topic_prefix = "telegraf"
### username and password to connect MQTT server.
# username = "telegraf"
# password = "metricsmetricsmetricsmetrics"
### Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
### Use SSL but skip chain & host verification
# insecure_skip_verify = false
### Data format to output. This can be "influx" or "graphite"
### Each data format has it's own unique set of configuration options, read
### more about them here:
### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_OUTPUT.md
data_format = "influx"
`
type MQTT struct { type MQTT struct {
Servers []string `toml:"servers"` Servers []string `toml:"servers"`
Username string Username string
@ -32,31 +59,11 @@ type MQTT struct {
client *paho.Client client *paho.Client
opts *paho.ClientOptions opts *paho.ClientOptions
serializer serializers.Serializer
sync.Mutex sync.Mutex
} }
var sampleConfig = `
servers = ["localhost:1883"] # required.
### MQTT QoS, must be 0, 1, or 2
qos = 0
### MQTT outputs send metrics to this topic format
### "<topic_prefix>/<hostname>/<pluginname>/"
### ex: prefix/host/web01.example.com/mem
topic_prefix = "telegraf"
### username and password to connect MQTT server.
# username = "telegraf"
# password = "metricsmetricsmetricsmetrics"
### Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
### Use SSL but skip chain & host verification
# insecure_skip_verify = false
`
func (m *MQTT) Connect() error { func (m *MQTT) Connect() error {
var err error var err error
m.Lock() m.Lock()
@ -78,6 +85,10 @@ func (m *MQTT) Connect() error {
return nil return nil
} }
func (m *MQTT) SetSerializer(serializer serializers.Serializer) {
m.serializer = serializer
}
func (m *MQTT) Close() error { func (m *MQTT) Close() error {
if m.client.IsConnected() { if m.client.IsConnected() {
m.client.Disconnect(20) m.client.Disconnect(20)
@ -104,7 +115,7 @@ func (m *MQTT) Write(metrics []telegraf.Metric) error {
hostname = "" hostname = ""
} }
for _, p := range metrics { for _, metric := range metrics {
var t []string var t []string
if m.TopicPrefix != "" { if m.TopicPrefix != "" {
t = append(t, m.TopicPrefix) t = append(t, m.TopicPrefix)
@ -113,15 +124,22 @@ func (m *MQTT) Write(metrics []telegraf.Metric) error {
t = append(t, hostname) t = append(t, hostname)
} }
t = append(t, p.Name()) t = append(t, metric.Name())
topic := strings.Join(t, "/") topic := strings.Join(t, "/")
value := p.String() values, err := m.serializer.Serialize(metric)
err := m.publish(topic, value) if err != nil {
return fmt.Errorf("MQTT Could not serialize metric: %s",
metric.String())
}
for _, value := range values {
err = m.publish(topic, value)
if err != nil { if err != nil {
return fmt.Errorf("Could not write to MQTT server, %s", err) return fmt.Errorf("Could not write to MQTT server, %s", err)
} }
} }
}
return nil return nil
} }

View File

@ -3,7 +3,9 @@ package mqtt
import ( import (
"testing" "testing"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -13,8 +15,10 @@ func TestConnectAndWrite(t *testing.T) {
} }
var url = testutil.GetLocalHost() + ":1883" var url = testutil.GetLocalHost() + ":1883"
s, _ := serializers.NewInfluxSerializer()
m := &MQTT{ m := &MQTT{
Servers: []string{url}, Servers: []string{url},
serializer: s,
} }
// Verify that we can connect to the MQTT broker // Verify that we can connect to the MQTT broker

View File

@ -2,15 +2,20 @@ package nsq
import ( import (
"fmt" "fmt"
"github.com/nsqio/go-nsq"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/outputs"
"github.com/nsqio/go-nsq" "github.com/influxdata/telegraf/plugins/serializers"
) )
type NSQ struct { type NSQ struct {
Server string Server string
Topic string Topic string
producer *nsq.Producer producer *nsq.Producer
serializer serializers.Serializer
} }
var sampleConfig = ` var sampleConfig = `
@ -18,8 +23,18 @@ var sampleConfig = `
server = "localhost:4150" server = "localhost:4150"
### NSQ topic for producer messages ### NSQ topic for producer messages
topic = "telegraf" topic = "telegraf"
### Data format to output. This can be "influx" or "graphite"
### Each data format has it's own unique set of configuration options, read
### more about them here:
### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_OUTPUT.md
data_format = "influx"
` `
func (n *NSQ) SetSerializer(serializer serializers.Serializer) {
n.serializer = serializer
}
func (n *NSQ) Connect() error { func (n *NSQ) Connect() error {
config := nsq.NewConfig() config := nsq.NewConfig()
producer, err := nsq.NewProducer(n.Server, config) producer, err := nsq.NewProducer(n.Server, config)
@ -50,12 +65,21 @@ func (n *NSQ) Write(metrics []telegraf.Metric) error {
return nil return nil
} }
for _, p := range metrics { for _, metric := range metrics {
value := p.String() values, err := n.serializer.Serialize(metric)
err := n.producer.Publish(n.Topic, []byte(value))
if err != nil { if err != nil {
return err
}
var pubErr error
for _, value := range values {
err = n.producer.Publish(n.Topic, []byte(value))
if err != nil {
pubErr = err
}
}
if pubErr != nil {
return fmt.Errorf("FAILED to send NSQD message: %s", err) return fmt.Errorf("FAILED to send NSQD message: %s", err)
} }
} }

View File

@ -3,6 +3,7 @@ package nsq
import ( import (
"testing" "testing"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -13,9 +14,11 @@ func TestConnectAndWrite(t *testing.T) {
} }
server := []string{testutil.GetLocalHost() + ":4150"} server := []string{testutil.GetLocalHost() + ":4150"}
s, _ := serializers.NewInfluxSerializer()
n := &NSQ{ n := &NSQ{
Server: server[0], Server: server[0],
Topic: "telegraf", Topic: "telegraf",
serializer: s,
} }
// Verify that we can connect to the NSQ daemon // Verify that we can connect to the NSQ daemon

View File

@ -15,7 +15,7 @@ type InfluxParser struct {
DefaultTags map[string]string DefaultTags map[string]string
} }
// ParseMetrics returns a slice of Metrics from a text representation of a // Parse returns a slice of Metrics from a text representation of a
// metric (in line-protocol format) // metric (in line-protocol format)
// with each metric separated by newlines. If any metrics fail to parse, // with each metric separated by newlines. If any metrics fail to parse,
// a non-nil error will be returned in addition to the metrics that parsed // a non-nil error will be returned in addition to the metrics that parsed

View File

@ -0,0 +1,79 @@
package graphite
import (
"fmt"
"sort"
"strings"
"github.com/influxdata/telegraf"
)
type GraphiteSerializer struct {
Prefix string
}
func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]string, error) {
out := []string{}
// Get name
name := metric.Name()
// Convert UnixNano to Unix timestamps
timestamp := metric.UnixNano() / 1000000000
tag_str := buildTags(metric)
for field_name, value := range metric.Fields() {
// Convert value
value_str := fmt.Sprintf("%#v", value)
// Write graphite metric
var graphitePoint string
if name == field_name {
graphitePoint = fmt.Sprintf("%s.%s %s %d",
tag_str,
strings.Replace(name, ".", "_", -1),
value_str,
timestamp)
} else {
graphitePoint = fmt.Sprintf("%s.%s.%s %s %d",
tag_str,
strings.Replace(name, ".", "_", -1),
strings.Replace(field_name, ".", "_", -1),
value_str,
timestamp)
}
if s.Prefix != "" {
graphitePoint = fmt.Sprintf("%s.%s", s.Prefix, graphitePoint)
}
out = append(out, graphitePoint)
}
return out, nil
}
func buildTags(metric telegraf.Metric) string {
var keys []string
tags := metric.Tags()
for k := range tags {
if k == "host" {
continue
}
keys = append(keys, k)
}
sort.Strings(keys)
var tag_str string
if host, ok := tags["host"]; ok {
if len(keys) > 0 {
tag_str = strings.Replace(host, ".", "_", -1) + "."
} else {
tag_str = strings.Replace(host, ".", "_", -1)
}
}
for i, k := range keys {
tag_value := strings.Replace(tags[k], ".", "_", -1)
if i == 0 {
tag_str += tag_value
} else {
tag_str += "." + tag_value
}
}
return tag_str
}

View File

@ -0,0 +1,121 @@
package graphite
import (
"fmt"
"sort"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/influxdata/telegraf"
)
func TestGraphiteTags(t *testing.T) {
m1, _ := telegraf.NewMetric(
"mymeasurement",
map[string]string{"host": "192.168.0.1"},
map[string]interface{}{"value": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)
m2, _ := telegraf.NewMetric(
"mymeasurement",
map[string]string{"host": "192.168.0.1", "afoo": "first", "bfoo": "second"},
map[string]interface{}{"value": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)
m3, _ := telegraf.NewMetric(
"mymeasurement",
map[string]string{"afoo": "first", "bfoo": "second"},
map[string]interface{}{"value": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)
tags1 := buildTags(m1)
tags2 := buildTags(m2)
tags3 := buildTags(m3)
assert.Equal(t, "192_168_0_1", tags1)
assert.Equal(t, "192_168_0_1.first.second", tags2)
assert.Equal(t, "first.second", tags3)
}
func TestSerializeMetricNoHost(t *testing.T) {
now := time.Now()
tags := map[string]string{
"cpu": "cpu0",
"datacenter": "us-west-2",
}
fields := map[string]interface{}{
"usage_idle": float64(91.5),
"usage_busy": float64(8.5),
}
m, err := telegraf.NewMetric("cpu", tags, fields, now)
assert.NoError(t, err)
s := GraphiteSerializer{}
mS, err := s.Serialize(m)
assert.NoError(t, err)
expS := []string{
fmt.Sprintf("cpu0.us-west-2.cpu.usage_idle 91.5 %d", now.Unix()),
fmt.Sprintf("cpu0.us-west-2.cpu.usage_busy 8.5 %d", now.Unix()),
}
sort.Strings(mS)
sort.Strings(expS)
assert.Equal(t, expS, mS)
}
func TestSerializeMetricHost(t *testing.T) {
now := time.Now()
tags := map[string]string{
"host": "localhost",
"cpu": "cpu0",
"datacenter": "us-west-2",
}
fields := map[string]interface{}{
"usage_idle": float64(91.5),
"usage_busy": float64(8.5),
}
m, err := telegraf.NewMetric("cpu", tags, fields, now)
assert.NoError(t, err)
s := GraphiteSerializer{}
mS, err := s.Serialize(m)
assert.NoError(t, err)
expS := []string{
fmt.Sprintf("localhost.cpu0.us-west-2.cpu.usage_idle 91.5 %d", now.Unix()),
fmt.Sprintf("localhost.cpu0.us-west-2.cpu.usage_busy 8.5 %d", now.Unix()),
}
sort.Strings(mS)
sort.Strings(expS)
assert.Equal(t, expS, mS)
}
func TestSerializeMetricPrefix(t *testing.T) {
now := time.Now()
tags := map[string]string{
"host": "localhost",
"cpu": "cpu0",
"datacenter": "us-west-2",
}
fields := map[string]interface{}{
"usage_idle": float64(91.5),
"usage_busy": float64(8.5),
}
m, err := telegraf.NewMetric("cpu", tags, fields, now)
assert.NoError(t, err)
s := GraphiteSerializer{Prefix: "prefix"}
mS, err := s.Serialize(m)
assert.NoError(t, err)
expS := []string{
fmt.Sprintf("prefix.localhost.cpu0.us-west-2.cpu.usage_idle 91.5 %d", now.Unix()),
fmt.Sprintf("prefix.localhost.cpu0.us-west-2.cpu.usage_busy 8.5 %d", now.Unix()),
}
sort.Strings(mS)
sort.Strings(expS)
assert.Equal(t, expS, mS)
}

View File

@ -0,0 +1,12 @@
package influx
import (
"github.com/influxdata/telegraf"
)
type InfluxSerializer struct {
}
func (s *InfluxSerializer) Serialize(metric telegraf.Metric) ([]string, error) {
return []string{metric.String()}, nil
}

View File

@ -0,0 +1,68 @@
package influx
import (
"fmt"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/influxdata/telegraf"
)
func TestSerializeMetricFloat(t *testing.T) {
now := time.Now()
tags := map[string]string{
"cpu": "cpu0",
}
fields := map[string]interface{}{
"usage_idle": float64(91.5),
}
m, err := telegraf.NewMetric("cpu", tags, fields, now)
assert.NoError(t, err)
s := InfluxSerializer{}
mS, err := s.Serialize(m)
assert.NoError(t, err)
expS := []string{fmt.Sprintf("cpu,cpu=cpu0 usage_idle=91.5 %d", now.UnixNano())}
assert.Equal(t, expS, mS)
}
func TestSerializeMetricInt(t *testing.T) {
now := time.Now()
tags := map[string]string{
"cpu": "cpu0",
}
fields := map[string]interface{}{
"usage_idle": int64(90),
}
m, err := telegraf.NewMetric("cpu", tags, fields, now)
assert.NoError(t, err)
s := InfluxSerializer{}
mS, err := s.Serialize(m)
assert.NoError(t, err)
expS := []string{fmt.Sprintf("cpu,cpu=cpu0 usage_idle=90i %d", now.UnixNano())}
assert.Equal(t, expS, mS)
}
func TestSerializeMetricString(t *testing.T) {
now := time.Now()
tags := map[string]string{
"cpu": "cpu0",
}
fields := map[string]interface{}{
"usage_idle": "foobar",
}
m, err := telegraf.NewMetric("cpu", tags, fields, now)
assert.NoError(t, err)
s := InfluxSerializer{}
mS, err := s.Serialize(m)
assert.NoError(t, err)
expS := []string{fmt.Sprintf("cpu,cpu=cpu0 usage_idle=\"foobar\" %d", now.UnixNano())}
assert.Equal(t, expS, mS)
}

View File

@ -0,0 +1,55 @@
package serializers
import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/serializers/graphite"
"github.com/influxdata/telegraf/plugins/serializers/influx"
)
// SerializerOutput is an interface for output plugins that are able to
// serialize telegraf metrics into arbitrary data formats.
type SerializerOutput interface {
// SetSerializer sets the serializer function for the interface.
SetSerializer(serializer Serializer)
}
// Serializer is an interface defining functions that a serializer plugin must
// satisfy.
type Serializer interface {
// Serialize takes a single telegraf metric and turns it into a string.
Serialize(metric telegraf.Metric) ([]string, error)
}
// Config is a struct that covers the data types needed for all serializer types,
// and can be used to instantiate _any_ of the serializers.
type Config struct {
// Dataformat can be one of: influx, graphite
DataFormat string
// Prefix to add to all measurements, only supports Graphite
Prefix string
}
// NewSerializer a Serializer interface based on the given config.
func NewSerializer(config *Config) (Serializer, error) {
var err error
var serializer Serializer
switch config.DataFormat {
case "influx":
serializer, err = NewInfluxSerializer()
case "graphite":
serializer, err = NewGraphiteSerializer(config.Prefix)
}
return serializer, err
}
func NewInfluxSerializer() (Serializer, error) {
return &influx.InfluxSerializer{}, nil
}
func NewGraphiteSerializer(prefix string) (Serializer, error) {
return &graphite.GraphiteSerializer{
Prefix: prefix,
}, nil
}