Add gelf serializer & graylog output filter. (#1167)

* add gelf serializer.

* change url.

* handle fields in correct format.

* add graylog.

* handle host field of graylog.

* 1: Add go-gelf entry to Godeps to fix ci.
2: switch to github.com/Graylog2/go-gelf.

* implement Close().

* Deprecated gelf serializer, and back to graylog-golang.

* Update graylog-golang's hash.

* move gelf related function to graylog.go.

* 1: remove uneeded deps on Godeps_windows.
2: add README.md
3: add unittest.

* Fix unittest on 'go test -race'
This commit is contained in:
vanillahsu 2016-05-31 17:58:35 +08:00 committed by Cameron Sparr
parent a7dfbce3d3
commit eeeab5192b
4 changed files with 308 additions and 0 deletions

View File

@ -7,6 +7,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/outputs/datadog"
_ "github.com/influxdata/telegraf/plugins/outputs/file"
_ "github.com/influxdata/telegraf/plugins/outputs/graphite"
_ "github.com/influxdata/telegraf/plugins/outputs/graylog"
_ "github.com/influxdata/telegraf/plugins/outputs/influxdb"
_ "github.com/influxdata/telegraf/plugins/outputs/instrumental"
_ "github.com/influxdata/telegraf/plugins/outputs/kafka"

View File

@ -0,0 +1,5 @@
# Graylog Output Plugin
This plugin writes to a Graylog instance using the "gelf" format.
It requires a `servers` name.

View File

@ -0,0 +1,247 @@
package graylog
import (
"bytes"
"compress/zlib"
"crypto/rand"
"encoding/binary"
ejson "encoding/json"
"fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/outputs"
"io"
"math"
"net"
"os"
)
const (
defaultGraylogEndpoint = "127.0.0.1:12201"
defaultConnection = "wan"
defaultMaxChunkSizeWan = 1420
defaultMaxChunkSizeLan = 8154
)
type GelfConfig struct {
GraylogEndpoint string
Connection string
MaxChunkSizeWan int
MaxChunkSizeLan int
}
type Gelf struct {
GelfConfig
}
func NewGelfWriter(config GelfConfig) *Gelf {
if config.GraylogEndpoint == "" {
config.GraylogEndpoint = defaultGraylogEndpoint
}
if config.Connection == "" {
config.Connection = defaultConnection
}
if config.MaxChunkSizeWan == 0 {
config.MaxChunkSizeWan = defaultMaxChunkSizeWan
}
if config.MaxChunkSizeLan == 0 {
config.MaxChunkSizeLan = defaultMaxChunkSizeLan
}
g := &Gelf{GelfConfig: config}
return g
}
func (g *Gelf) Write(message []byte) (n int, err error) {
compressed := g.compress(message)
chunksize := g.GelfConfig.MaxChunkSizeWan
length := compressed.Len()
if length > chunksize {
chunkCountInt := int(math.Ceil(float64(length) / float64(chunksize)))
id := make([]byte, 8)
rand.Read(id)
for i, index := 0, 0; i < length; i, index = i+chunksize, index+1 {
packet := g.createChunkedMessage(index, chunkCountInt, id, &compressed)
_, err = g.send(packet.Bytes())
if err != nil {
return 0, err
}
}
} else {
_, err = g.send(compressed.Bytes())
if err != nil {
return 0, err
}
}
n = len(message)
return
}
func (g *Gelf) createChunkedMessage(index int, chunkCountInt int, id []byte, compressed *bytes.Buffer) bytes.Buffer {
var packet bytes.Buffer
chunksize := g.getChunksize()
packet.Write(g.intToBytes(30))
packet.Write(g.intToBytes(15))
packet.Write(id)
packet.Write(g.intToBytes(index))
packet.Write(g.intToBytes(chunkCountInt))
packet.Write(compressed.Next(chunksize))
return packet
}
func (g *Gelf) getChunksize() int {
if g.GelfConfig.Connection == "wan" {
return g.GelfConfig.MaxChunkSizeWan
}
if g.GelfConfig.Connection == "lan" {
return g.GelfConfig.MaxChunkSizeLan
}
return g.GelfConfig.MaxChunkSizeWan
}
func (g *Gelf) intToBytes(i int) []byte {
buf := new(bytes.Buffer)
binary.Write(buf, binary.LittleEndian, int8(i))
return buf.Bytes()
}
func (g *Gelf) compress(b []byte) bytes.Buffer {
var buf bytes.Buffer
comp := zlib.NewWriter(&buf)
comp.Write(b)
comp.Close()
return buf
}
func (g *Gelf) send(b []byte) (n int, err error) {
udpAddr, err := net.ResolveUDPAddr("udp", g.GelfConfig.GraylogEndpoint)
if err != nil {
return
}
conn, err := net.DialUDP("udp", nil, udpAddr)
if err != nil {
return
}
n, err = conn.Write(b)
return
}
type Graylog struct {
Servers []string
writer io.Writer
}
var sampleConfig = `
## Udp endpoint for your graylog instance.
servers = ["127.0.0.1:12201", "192.168.1.1:12201"]
`
func (g *Graylog) Connect() error {
writers := []io.Writer{}
if len(g.Servers) == 0 {
g.Servers = append(g.Servers, "localhost:12201")
}
for _, server := range g.Servers {
w := NewGelfWriter(GelfConfig{GraylogEndpoint: server})
writers = append(writers, w)
}
g.writer = io.MultiWriter(writers...)
return nil
}
func (g *Graylog) Close() error {
return nil
}
func (g *Graylog) SampleConfig() string {
return sampleConfig
}
func (g *Graylog) Description() string {
return "Send telegraf metrics to graylog(s)"
}
func (g *Graylog) Write(metrics []telegraf.Metric) error {
if len(metrics) == 0 {
return nil
}
for _, metric := range metrics {
values, err := serialize(metric)
if err != nil {
return err
}
for _, value := range values {
_, err := g.writer.Write([]byte(value))
if err != nil {
return fmt.Errorf("FAILED to write message: %s, %s", value, err)
}
}
}
return nil
}
func serialize(metric telegraf.Metric) ([]string, error) {
out := []string{}
m := make(map[string]interface{})
m["version"] = "1.1"
m["timestamp"] = metric.UnixNano() / 1000000000
m["short_message"] = " "
m["name"] = metric.Name()
if host, ok := metric.Tags()["host"]; ok {
m["host"] = host
} else {
host, err := os.Hostname()
if err != nil {
return []string{}, err
}
m["host"] = host
}
for key, value := range metric.Fields() {
nkey := fmt.Sprintf("_%s", key)
m[nkey] = value
}
serialized, err := ejson.Marshal(m)
if err != nil {
return []string{}, err
}
out = append(out, string(serialized))
return out, nil
}
func init() {
outputs.Add("graylog", func() telegraf.Output {
return &Graylog{}
})
}

View File

@ -0,0 +1,55 @@
package graylog
import (
"bytes"
"compress/zlib"
"encoding/json"
"io"
"net"
"sync"
"testing"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
)
func TestWrite(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
go UDPServer(t, &wg)
i := Graylog{
Servers: []string{"127.0.0.1:12201"},
}
i.Connect()
metrics := testutil.MockMetrics()
metrics = append(metrics, testutil.TestMetric(int64(1234567890)))
i.Write(metrics)
wg.Wait()
i.Close()
}
type GelfObject map[string]interface{}
func UDPServer(t *testing.T, wg *sync.WaitGroup) {
serverAddr, _ := net.ResolveUDPAddr("udp", "127.0.0.1:12201")
udpServer, _ := net.ListenUDP("udp", serverAddr)
defer wg.Done()
bufR := make([]byte, 1024)
n, _, _ := udpServer.ReadFromUDP(bufR)
b := bytes.NewReader(bufR[0:n])
r, _ := zlib.NewReader(b)
bufW := bytes.NewBuffer(nil)
io.Copy(bufW, r)
r.Close()
var obj GelfObject
json.Unmarshal(bufW.Bytes(), &obj)
assert.Equal(t, obj["_value"], float64(1))
}