From 958ef2f87238afc5eb8a2601af738a2f3e7bd5c8 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Tue, 31 May 2016 11:21:20 +0100 Subject: [PATCH] Revert "Add gelf serializer & graylog output filter." (#1299) --- plugins/outputs/all/all.go | 1 - plugins/outputs/graylog/README.md | 5 - plugins/outputs/graylog/graylog.go | 247 ------------------------ plugins/outputs/graylog/graylog_test.go | 55 ------ 4 files changed, 308 deletions(-) delete mode 100644 plugins/outputs/graylog/README.md delete mode 100644 plugins/outputs/graylog/graylog.go delete mode 100644 plugins/outputs/graylog/graylog_test.go diff --git a/plugins/outputs/all/all.go b/plugins/outputs/all/all.go index 27f8958fe..5b223529c 100644 --- a/plugins/outputs/all/all.go +++ b/plugins/outputs/all/all.go @@ -7,7 +7,6 @@ import ( _ "github.com/influxdata/telegraf/plugins/outputs/datadog" _ "github.com/influxdata/telegraf/plugins/outputs/file" _ "github.com/influxdata/telegraf/plugins/outputs/graphite" - _ "github.com/influxdata/telegraf/plugins/outputs/graylog" _ "github.com/influxdata/telegraf/plugins/outputs/influxdb" _ "github.com/influxdata/telegraf/plugins/outputs/instrumental" _ "github.com/influxdata/telegraf/plugins/outputs/kafka" diff --git a/plugins/outputs/graylog/README.md b/plugins/outputs/graylog/README.md deleted file mode 100644 index 26b8d8fc6..000000000 --- a/plugins/outputs/graylog/README.md +++ /dev/null @@ -1,5 +0,0 @@ -# Graylog Output Plugin - -This plugin writes to a Graylog instance using the "gelf" format. - -It requires a `servers` name. diff --git a/plugins/outputs/graylog/graylog.go b/plugins/outputs/graylog/graylog.go deleted file mode 100644 index 7f2480134..000000000 --- a/plugins/outputs/graylog/graylog.go +++ /dev/null @@ -1,247 +0,0 @@ -package graylog - -import ( - "bytes" - "compress/zlib" - "crypto/rand" - "encoding/binary" - ejson "encoding/json" - "fmt" - "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/plugins/outputs" - "io" - "math" - "net" - "os" -) - -const ( - defaultGraylogEndpoint = "127.0.0.1:12201" - defaultConnection = "wan" - defaultMaxChunkSizeWan = 1420 - defaultMaxChunkSizeLan = 8154 -) - -type GelfConfig struct { - GraylogEndpoint string - Connection string - MaxChunkSizeWan int - MaxChunkSizeLan int -} - -type Gelf struct { - GelfConfig -} - -func NewGelfWriter(config GelfConfig) *Gelf { - if config.GraylogEndpoint == "" { - config.GraylogEndpoint = defaultGraylogEndpoint - } - - if config.Connection == "" { - config.Connection = defaultConnection - } - - if config.MaxChunkSizeWan == 0 { - config.MaxChunkSizeWan = defaultMaxChunkSizeWan - } - - if config.MaxChunkSizeLan == 0 { - config.MaxChunkSizeLan = defaultMaxChunkSizeLan - } - - g := &Gelf{GelfConfig: config} - - return g -} - -func (g *Gelf) Write(message []byte) (n int, err error) { - compressed := g.compress(message) - - chunksize := g.GelfConfig.MaxChunkSizeWan - length := compressed.Len() - - if length > chunksize { - - chunkCountInt := int(math.Ceil(float64(length) / float64(chunksize))) - - id := make([]byte, 8) - rand.Read(id) - - for i, index := 0, 0; i < length; i, index = i+chunksize, index+1 { - packet := g.createChunkedMessage(index, chunkCountInt, id, &compressed) - _, err = g.send(packet.Bytes()) - if err != nil { - return 0, err - } - } - } else { - _, err = g.send(compressed.Bytes()) - if err != nil { - return 0, err - } - } - - n = len(message) - - return -} - -func (g *Gelf) createChunkedMessage(index int, chunkCountInt int, id []byte, compressed *bytes.Buffer) bytes.Buffer { - var packet bytes.Buffer - - chunksize := g.getChunksize() - - packet.Write(g.intToBytes(30)) - packet.Write(g.intToBytes(15)) - packet.Write(id) - - packet.Write(g.intToBytes(index)) - packet.Write(g.intToBytes(chunkCountInt)) - - packet.Write(compressed.Next(chunksize)) - - return packet -} - -func (g *Gelf) getChunksize() int { - if g.GelfConfig.Connection == "wan" { - return g.GelfConfig.MaxChunkSizeWan - } - - if g.GelfConfig.Connection == "lan" { - return g.GelfConfig.MaxChunkSizeLan - } - - return g.GelfConfig.MaxChunkSizeWan -} - -func (g *Gelf) intToBytes(i int) []byte { - buf := new(bytes.Buffer) - - binary.Write(buf, binary.LittleEndian, int8(i)) - return buf.Bytes() -} - -func (g *Gelf) compress(b []byte) bytes.Buffer { - var buf bytes.Buffer - comp := zlib.NewWriter(&buf) - - comp.Write(b) - comp.Close() - - return buf -} - -func (g *Gelf) send(b []byte) (n int, err error) { - udpAddr, err := net.ResolveUDPAddr("udp", g.GelfConfig.GraylogEndpoint) - if err != nil { - return - } - - conn, err := net.DialUDP("udp", nil, udpAddr) - if err != nil { - return - } - - n, err = conn.Write(b) - return -} - -type Graylog struct { - Servers []string - writer io.Writer -} - -var sampleConfig = ` - ## Udp endpoint for your graylog instance. - servers = ["127.0.0.1:12201", "192.168.1.1:12201"] -` - -func (g *Graylog) Connect() error { - writers := []io.Writer{} - - if len(g.Servers) == 0 { - g.Servers = append(g.Servers, "localhost:12201") - } - - for _, server := range g.Servers { - w := NewGelfWriter(GelfConfig{GraylogEndpoint: server}) - writers = append(writers, w) - } - - g.writer = io.MultiWriter(writers...) - return nil -} - -func (g *Graylog) Close() error { - return nil -} - -func (g *Graylog) SampleConfig() string { - return sampleConfig -} - -func (g *Graylog) Description() string { - return "Send telegraf metrics to graylog(s)" -} - -func (g *Graylog) Write(metrics []telegraf.Metric) error { - if len(metrics) == 0 { - return nil - } - - for _, metric := range metrics { - values, err := serialize(metric) - if err != nil { - return err - } - - for _, value := range values { - _, err := g.writer.Write([]byte(value)) - if err != nil { - return fmt.Errorf("FAILED to write message: %s, %s", value, err) - } - } - } - return nil -} - -func serialize(metric telegraf.Metric) ([]string, error) { - out := []string{} - - m := make(map[string]interface{}) - m["version"] = "1.1" - m["timestamp"] = metric.UnixNano() / 1000000000 - m["short_message"] = " " - m["name"] = metric.Name() - - if host, ok := metric.Tags()["host"]; ok { - m["host"] = host - } else { - host, err := os.Hostname() - if err != nil { - return []string{}, err - } - m["host"] = host - } - - for key, value := range metric.Fields() { - nkey := fmt.Sprintf("_%s", key) - m[nkey] = value - } - - serialized, err := ejson.Marshal(m) - if err != nil { - return []string{}, err - } - out = append(out, string(serialized)) - - return out, nil -} - -func init() { - outputs.Add("graylog", func() telegraf.Output { - return &Graylog{} - }) -} diff --git a/plugins/outputs/graylog/graylog_test.go b/plugins/outputs/graylog/graylog_test.go deleted file mode 100644 index 521f83dc1..000000000 --- a/plugins/outputs/graylog/graylog_test.go +++ /dev/null @@ -1,55 +0,0 @@ -package graylog - -import ( - "bytes" - "compress/zlib" - "encoding/json" - "io" - "net" - "sync" - "testing" - - "github.com/influxdata/telegraf/testutil" - "github.com/stretchr/testify/assert" -) - -func TestWrite(t *testing.T) { - var wg sync.WaitGroup - wg.Add(1) - go UDPServer(t, &wg) - - i := Graylog{ - Servers: []string{"127.0.0.1:12201"}, - } - i.Connect() - - metrics := testutil.MockMetrics() - metrics = append(metrics, testutil.TestMetric(int64(1234567890))) - - i.Write(metrics) - - wg.Wait() - i.Close() -} - -type GelfObject map[string]interface{} - -func UDPServer(t *testing.T, wg *sync.WaitGroup) { - serverAddr, _ := net.ResolveUDPAddr("udp", "127.0.0.1:12201") - udpServer, _ := net.ListenUDP("udp", serverAddr) - defer wg.Done() - - bufR := make([]byte, 1024) - n, _, _ := udpServer.ReadFromUDP(bufR) - - b := bytes.NewReader(bufR[0:n]) - r, _ := zlib.NewReader(b) - - bufW := bytes.NewBuffer(nil) - io.Copy(bufW, r) - r.Close() - - var obj GelfObject - json.Unmarshal(bufW.Bytes(), &obj) - assert.Equal(t, obj["_value"], float64(1)) -}