From eeeab5192b21677766474729397fa1406f5f5caa Mon Sep 17 00:00:00 2001 From: vanillahsu Date: Tue, 31 May 2016 17:58:35 +0800 Subject: [PATCH] Add gelf serializer & graylog output filter. (#1167) * add gelf serializer. * change url. * handle fields in correct format. * add graylog. * handle host field of graylog. * 1: Add go-gelf entry to Godeps to fix ci. 2: switch to github.com/Graylog2/go-gelf. * implement Close(). * Deprecated gelf serializer, and back to graylog-golang. * Update graylog-golang's hash. * move gelf related function to graylog.go. * 1: remove uneeded deps on Godeps_windows. 2: add README.md 3: add unittest. * Fix unittest on 'go test -race' --- plugins/outputs/all/all.go | 1 + plugins/outputs/graylog/README.md | 5 + plugins/outputs/graylog/graylog.go | 247 ++++++++++++++++++++++++ plugins/outputs/graylog/graylog_test.go | 55 ++++++ 4 files changed, 308 insertions(+) create mode 100644 plugins/outputs/graylog/README.md create mode 100644 plugins/outputs/graylog/graylog.go create mode 100644 plugins/outputs/graylog/graylog_test.go diff --git a/plugins/outputs/all/all.go b/plugins/outputs/all/all.go index 5b223529c..27f8958fe 100644 --- a/plugins/outputs/all/all.go +++ b/plugins/outputs/all/all.go @@ -7,6 +7,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/outputs/datadog" _ "github.com/influxdata/telegraf/plugins/outputs/file" _ "github.com/influxdata/telegraf/plugins/outputs/graphite" + _ "github.com/influxdata/telegraf/plugins/outputs/graylog" _ "github.com/influxdata/telegraf/plugins/outputs/influxdb" _ "github.com/influxdata/telegraf/plugins/outputs/instrumental" _ "github.com/influxdata/telegraf/plugins/outputs/kafka" diff --git a/plugins/outputs/graylog/README.md b/plugins/outputs/graylog/README.md new file mode 100644 index 000000000..26b8d8fc6 --- /dev/null +++ b/plugins/outputs/graylog/README.md @@ -0,0 +1,5 @@ +# Graylog Output Plugin + +This plugin writes to a Graylog instance using the "gelf" format. + +It requires a `servers` name. diff --git a/plugins/outputs/graylog/graylog.go b/plugins/outputs/graylog/graylog.go new file mode 100644 index 000000000..7f2480134 --- /dev/null +++ b/plugins/outputs/graylog/graylog.go @@ -0,0 +1,247 @@ +package graylog + +import ( + "bytes" + "compress/zlib" + "crypto/rand" + "encoding/binary" + ejson "encoding/json" + "fmt" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/outputs" + "io" + "math" + "net" + "os" +) + +const ( + defaultGraylogEndpoint = "127.0.0.1:12201" + defaultConnection = "wan" + defaultMaxChunkSizeWan = 1420 + defaultMaxChunkSizeLan = 8154 +) + +type GelfConfig struct { + GraylogEndpoint string + Connection string + MaxChunkSizeWan int + MaxChunkSizeLan int +} + +type Gelf struct { + GelfConfig +} + +func NewGelfWriter(config GelfConfig) *Gelf { + if config.GraylogEndpoint == "" { + config.GraylogEndpoint = defaultGraylogEndpoint + } + + if config.Connection == "" { + config.Connection = defaultConnection + } + + if config.MaxChunkSizeWan == 0 { + config.MaxChunkSizeWan = defaultMaxChunkSizeWan + } + + if config.MaxChunkSizeLan == 0 { + config.MaxChunkSizeLan = defaultMaxChunkSizeLan + } + + g := &Gelf{GelfConfig: config} + + return g +} + +func (g *Gelf) Write(message []byte) (n int, err error) { + compressed := g.compress(message) + + chunksize := g.GelfConfig.MaxChunkSizeWan + length := compressed.Len() + + if length > chunksize { + + chunkCountInt := int(math.Ceil(float64(length) / float64(chunksize))) + + id := make([]byte, 8) + rand.Read(id) + + for i, index := 0, 0; i < length; i, index = i+chunksize, index+1 { + packet := g.createChunkedMessage(index, chunkCountInt, id, &compressed) + _, err = g.send(packet.Bytes()) + if err != nil { + return 0, err + } + } + } else { + _, err = g.send(compressed.Bytes()) + if err != nil { + return 0, err + } + } + + n = len(message) + + return +} + +func (g *Gelf) createChunkedMessage(index int, chunkCountInt int, id []byte, compressed *bytes.Buffer) bytes.Buffer { + var packet bytes.Buffer + + chunksize := g.getChunksize() + + packet.Write(g.intToBytes(30)) + packet.Write(g.intToBytes(15)) + packet.Write(id) + + packet.Write(g.intToBytes(index)) + packet.Write(g.intToBytes(chunkCountInt)) + + packet.Write(compressed.Next(chunksize)) + + return packet +} + +func (g *Gelf) getChunksize() int { + if g.GelfConfig.Connection == "wan" { + return g.GelfConfig.MaxChunkSizeWan + } + + if g.GelfConfig.Connection == "lan" { + return g.GelfConfig.MaxChunkSizeLan + } + + return g.GelfConfig.MaxChunkSizeWan +} + +func (g *Gelf) intToBytes(i int) []byte { + buf := new(bytes.Buffer) + + binary.Write(buf, binary.LittleEndian, int8(i)) + return buf.Bytes() +} + +func (g *Gelf) compress(b []byte) bytes.Buffer { + var buf bytes.Buffer + comp := zlib.NewWriter(&buf) + + comp.Write(b) + comp.Close() + + return buf +} + +func (g *Gelf) send(b []byte) (n int, err error) { + udpAddr, err := net.ResolveUDPAddr("udp", g.GelfConfig.GraylogEndpoint) + if err != nil { + return + } + + conn, err := net.DialUDP("udp", nil, udpAddr) + if err != nil { + return + } + + n, err = conn.Write(b) + return +} + +type Graylog struct { + Servers []string + writer io.Writer +} + +var sampleConfig = ` + ## Udp endpoint for your graylog instance. + servers = ["127.0.0.1:12201", "192.168.1.1:12201"] +` + +func (g *Graylog) Connect() error { + writers := []io.Writer{} + + if len(g.Servers) == 0 { + g.Servers = append(g.Servers, "localhost:12201") + } + + for _, server := range g.Servers { + w := NewGelfWriter(GelfConfig{GraylogEndpoint: server}) + writers = append(writers, w) + } + + g.writer = io.MultiWriter(writers...) + return nil +} + +func (g *Graylog) Close() error { + return nil +} + +func (g *Graylog) SampleConfig() string { + return sampleConfig +} + +func (g *Graylog) Description() string { + return "Send telegraf metrics to graylog(s)" +} + +func (g *Graylog) Write(metrics []telegraf.Metric) error { + if len(metrics) == 0 { + return nil + } + + for _, metric := range metrics { + values, err := serialize(metric) + if err != nil { + return err + } + + for _, value := range values { + _, err := g.writer.Write([]byte(value)) + if err != nil { + return fmt.Errorf("FAILED to write message: %s, %s", value, err) + } + } + } + return nil +} + +func serialize(metric telegraf.Metric) ([]string, error) { + out := []string{} + + m := make(map[string]interface{}) + m["version"] = "1.1" + m["timestamp"] = metric.UnixNano() / 1000000000 + m["short_message"] = " " + m["name"] = metric.Name() + + if host, ok := metric.Tags()["host"]; ok { + m["host"] = host + } else { + host, err := os.Hostname() + if err != nil { + return []string{}, err + } + m["host"] = host + } + + for key, value := range metric.Fields() { + nkey := fmt.Sprintf("_%s", key) + m[nkey] = value + } + + serialized, err := ejson.Marshal(m) + if err != nil { + return []string{}, err + } + out = append(out, string(serialized)) + + return out, nil +} + +func init() { + outputs.Add("graylog", func() telegraf.Output { + return &Graylog{} + }) +} diff --git a/plugins/outputs/graylog/graylog_test.go b/plugins/outputs/graylog/graylog_test.go new file mode 100644 index 000000000..521f83dc1 --- /dev/null +++ b/plugins/outputs/graylog/graylog_test.go @@ -0,0 +1,55 @@ +package graylog + +import ( + "bytes" + "compress/zlib" + "encoding/json" + "io" + "net" + "sync" + "testing" + + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" +) + +func TestWrite(t *testing.T) { + var wg sync.WaitGroup + wg.Add(1) + go UDPServer(t, &wg) + + i := Graylog{ + Servers: []string{"127.0.0.1:12201"}, + } + i.Connect() + + metrics := testutil.MockMetrics() + metrics = append(metrics, testutil.TestMetric(int64(1234567890))) + + i.Write(metrics) + + wg.Wait() + i.Close() +} + +type GelfObject map[string]interface{} + +func UDPServer(t *testing.T, wg *sync.WaitGroup) { + serverAddr, _ := net.ResolveUDPAddr("udp", "127.0.0.1:12201") + udpServer, _ := net.ListenUDP("udp", serverAddr) + defer wg.Done() + + bufR := make([]byte, 1024) + n, _, _ := udpServer.ReadFromUDP(bufR) + + b := bytes.NewReader(bufR[0:n]) + r, _ := zlib.NewReader(b) + + bufW := bytes.NewBuffer(nil) + io.Copy(bufW, r) + r.Close() + + var obj GelfObject + json.Unmarshal(bufW.Bytes(), &obj) + assert.Equal(t, obj["_value"], float64(1)) +}