Revert "Add gelf serializer & graylog output filter." (#1299)
This commit is contained in:
parent
069764f05e
commit
958ef2f872
|
@ -7,7 +7,6 @@ import (
|
||||||
_ "github.com/influxdata/telegraf/plugins/outputs/datadog"
|
_ "github.com/influxdata/telegraf/plugins/outputs/datadog"
|
||||||
_ "github.com/influxdata/telegraf/plugins/outputs/file"
|
_ "github.com/influxdata/telegraf/plugins/outputs/file"
|
||||||
_ "github.com/influxdata/telegraf/plugins/outputs/graphite"
|
_ "github.com/influxdata/telegraf/plugins/outputs/graphite"
|
||||||
_ "github.com/influxdata/telegraf/plugins/outputs/graylog"
|
|
||||||
_ "github.com/influxdata/telegraf/plugins/outputs/influxdb"
|
_ "github.com/influxdata/telegraf/plugins/outputs/influxdb"
|
||||||
_ "github.com/influxdata/telegraf/plugins/outputs/instrumental"
|
_ "github.com/influxdata/telegraf/plugins/outputs/instrumental"
|
||||||
_ "github.com/influxdata/telegraf/plugins/outputs/kafka"
|
_ "github.com/influxdata/telegraf/plugins/outputs/kafka"
|
||||||
|
|
|
@ -1,5 +0,0 @@
|
||||||
# Graylog Output Plugin
|
|
||||||
|
|
||||||
This plugin writes to a Graylog instance using the "gelf" format.
|
|
||||||
|
|
||||||
It requires a `servers` name.
|
|
|
@ -1,247 +0,0 @@
|
||||||
package graylog
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
"compress/zlib"
|
|
||||||
"crypto/rand"
|
|
||||||
"encoding/binary"
|
|
||||||
ejson "encoding/json"
|
|
||||||
"fmt"
|
|
||||||
"github.com/influxdata/telegraf"
|
|
||||||
"github.com/influxdata/telegraf/plugins/outputs"
|
|
||||||
"io"
|
|
||||||
"math"
|
|
||||||
"net"
|
|
||||||
"os"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
defaultGraylogEndpoint = "127.0.0.1:12201"
|
|
||||||
defaultConnection = "wan"
|
|
||||||
defaultMaxChunkSizeWan = 1420
|
|
||||||
defaultMaxChunkSizeLan = 8154
|
|
||||||
)
|
|
||||||
|
|
||||||
type GelfConfig struct {
|
|
||||||
GraylogEndpoint string
|
|
||||||
Connection string
|
|
||||||
MaxChunkSizeWan int
|
|
||||||
MaxChunkSizeLan int
|
|
||||||
}
|
|
||||||
|
|
||||||
type Gelf struct {
|
|
||||||
GelfConfig
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewGelfWriter(config GelfConfig) *Gelf {
|
|
||||||
if config.GraylogEndpoint == "" {
|
|
||||||
config.GraylogEndpoint = defaultGraylogEndpoint
|
|
||||||
}
|
|
||||||
|
|
||||||
if config.Connection == "" {
|
|
||||||
config.Connection = defaultConnection
|
|
||||||
}
|
|
||||||
|
|
||||||
if config.MaxChunkSizeWan == 0 {
|
|
||||||
config.MaxChunkSizeWan = defaultMaxChunkSizeWan
|
|
||||||
}
|
|
||||||
|
|
||||||
if config.MaxChunkSizeLan == 0 {
|
|
||||||
config.MaxChunkSizeLan = defaultMaxChunkSizeLan
|
|
||||||
}
|
|
||||||
|
|
||||||
g := &Gelf{GelfConfig: config}
|
|
||||||
|
|
||||||
return g
|
|
||||||
}
|
|
||||||
|
|
||||||
func (g *Gelf) Write(message []byte) (n int, err error) {
|
|
||||||
compressed := g.compress(message)
|
|
||||||
|
|
||||||
chunksize := g.GelfConfig.MaxChunkSizeWan
|
|
||||||
length := compressed.Len()
|
|
||||||
|
|
||||||
if length > chunksize {
|
|
||||||
|
|
||||||
chunkCountInt := int(math.Ceil(float64(length) / float64(chunksize)))
|
|
||||||
|
|
||||||
id := make([]byte, 8)
|
|
||||||
rand.Read(id)
|
|
||||||
|
|
||||||
for i, index := 0, 0; i < length; i, index = i+chunksize, index+1 {
|
|
||||||
packet := g.createChunkedMessage(index, chunkCountInt, id, &compressed)
|
|
||||||
_, err = g.send(packet.Bytes())
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
_, err = g.send(compressed.Bytes())
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
n = len(message)
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func (g *Gelf) createChunkedMessage(index int, chunkCountInt int, id []byte, compressed *bytes.Buffer) bytes.Buffer {
|
|
||||||
var packet bytes.Buffer
|
|
||||||
|
|
||||||
chunksize := g.getChunksize()
|
|
||||||
|
|
||||||
packet.Write(g.intToBytes(30))
|
|
||||||
packet.Write(g.intToBytes(15))
|
|
||||||
packet.Write(id)
|
|
||||||
|
|
||||||
packet.Write(g.intToBytes(index))
|
|
||||||
packet.Write(g.intToBytes(chunkCountInt))
|
|
||||||
|
|
||||||
packet.Write(compressed.Next(chunksize))
|
|
||||||
|
|
||||||
return packet
|
|
||||||
}
|
|
||||||
|
|
||||||
func (g *Gelf) getChunksize() int {
|
|
||||||
if g.GelfConfig.Connection == "wan" {
|
|
||||||
return g.GelfConfig.MaxChunkSizeWan
|
|
||||||
}
|
|
||||||
|
|
||||||
if g.GelfConfig.Connection == "lan" {
|
|
||||||
return g.GelfConfig.MaxChunkSizeLan
|
|
||||||
}
|
|
||||||
|
|
||||||
return g.GelfConfig.MaxChunkSizeWan
|
|
||||||
}
|
|
||||||
|
|
||||||
func (g *Gelf) intToBytes(i int) []byte {
|
|
||||||
buf := new(bytes.Buffer)
|
|
||||||
|
|
||||||
binary.Write(buf, binary.LittleEndian, int8(i))
|
|
||||||
return buf.Bytes()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (g *Gelf) compress(b []byte) bytes.Buffer {
|
|
||||||
var buf bytes.Buffer
|
|
||||||
comp := zlib.NewWriter(&buf)
|
|
||||||
|
|
||||||
comp.Write(b)
|
|
||||||
comp.Close()
|
|
||||||
|
|
||||||
return buf
|
|
||||||
}
|
|
||||||
|
|
||||||
func (g *Gelf) send(b []byte) (n int, err error) {
|
|
||||||
udpAddr, err := net.ResolveUDPAddr("udp", g.GelfConfig.GraylogEndpoint)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
conn, err := net.DialUDP("udp", nil, udpAddr)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
n, err = conn.Write(b)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
type Graylog struct {
|
|
||||||
Servers []string
|
|
||||||
writer io.Writer
|
|
||||||
}
|
|
||||||
|
|
||||||
var sampleConfig = `
|
|
||||||
## Udp endpoint for your graylog instance.
|
|
||||||
servers = ["127.0.0.1:12201", "192.168.1.1:12201"]
|
|
||||||
`
|
|
||||||
|
|
||||||
func (g *Graylog) Connect() error {
|
|
||||||
writers := []io.Writer{}
|
|
||||||
|
|
||||||
if len(g.Servers) == 0 {
|
|
||||||
g.Servers = append(g.Servers, "localhost:12201")
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, server := range g.Servers {
|
|
||||||
w := NewGelfWriter(GelfConfig{GraylogEndpoint: server})
|
|
||||||
writers = append(writers, w)
|
|
||||||
}
|
|
||||||
|
|
||||||
g.writer = io.MultiWriter(writers...)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (g *Graylog) Close() error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (g *Graylog) SampleConfig() string {
|
|
||||||
return sampleConfig
|
|
||||||
}
|
|
||||||
|
|
||||||
func (g *Graylog) Description() string {
|
|
||||||
return "Send telegraf metrics to graylog(s)"
|
|
||||||
}
|
|
||||||
|
|
||||||
func (g *Graylog) Write(metrics []telegraf.Metric) error {
|
|
||||||
if len(metrics) == 0 {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, metric := range metrics {
|
|
||||||
values, err := serialize(metric)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, value := range values {
|
|
||||||
_, err := g.writer.Write([]byte(value))
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("FAILED to write message: %s, %s", value, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func serialize(metric telegraf.Metric) ([]string, error) {
|
|
||||||
out := []string{}
|
|
||||||
|
|
||||||
m := make(map[string]interface{})
|
|
||||||
m["version"] = "1.1"
|
|
||||||
m["timestamp"] = metric.UnixNano() / 1000000000
|
|
||||||
m["short_message"] = " "
|
|
||||||
m["name"] = metric.Name()
|
|
||||||
|
|
||||||
if host, ok := metric.Tags()["host"]; ok {
|
|
||||||
m["host"] = host
|
|
||||||
} else {
|
|
||||||
host, err := os.Hostname()
|
|
||||||
if err != nil {
|
|
||||||
return []string{}, err
|
|
||||||
}
|
|
||||||
m["host"] = host
|
|
||||||
}
|
|
||||||
|
|
||||||
for key, value := range metric.Fields() {
|
|
||||||
nkey := fmt.Sprintf("_%s", key)
|
|
||||||
m[nkey] = value
|
|
||||||
}
|
|
||||||
|
|
||||||
serialized, err := ejson.Marshal(m)
|
|
||||||
if err != nil {
|
|
||||||
return []string{}, err
|
|
||||||
}
|
|
||||||
out = append(out, string(serialized))
|
|
||||||
|
|
||||||
return out, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func init() {
|
|
||||||
outputs.Add("graylog", func() telegraf.Output {
|
|
||||||
return &Graylog{}
|
|
||||||
})
|
|
||||||
}
|
|
|
@ -1,55 +0,0 @@
|
||||||
package graylog
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
"compress/zlib"
|
|
||||||
"encoding/json"
|
|
||||||
"io"
|
|
||||||
"net"
|
|
||||||
"sync"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/influxdata/telegraf/testutil"
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestWrite(t *testing.T) {
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
wg.Add(1)
|
|
||||||
go UDPServer(t, &wg)
|
|
||||||
|
|
||||||
i := Graylog{
|
|
||||||
Servers: []string{"127.0.0.1:12201"},
|
|
||||||
}
|
|
||||||
i.Connect()
|
|
||||||
|
|
||||||
metrics := testutil.MockMetrics()
|
|
||||||
metrics = append(metrics, testutil.TestMetric(int64(1234567890)))
|
|
||||||
|
|
||||||
i.Write(metrics)
|
|
||||||
|
|
||||||
wg.Wait()
|
|
||||||
i.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
type GelfObject map[string]interface{}
|
|
||||||
|
|
||||||
func UDPServer(t *testing.T, wg *sync.WaitGroup) {
|
|
||||||
serverAddr, _ := net.ResolveUDPAddr("udp", "127.0.0.1:12201")
|
|
||||||
udpServer, _ := net.ListenUDP("udp", serverAddr)
|
|
||||||
defer wg.Done()
|
|
||||||
|
|
||||||
bufR := make([]byte, 1024)
|
|
||||||
n, _, _ := udpServer.ReadFromUDP(bufR)
|
|
||||||
|
|
||||||
b := bytes.NewReader(bufR[0:n])
|
|
||||||
r, _ := zlib.NewReader(b)
|
|
||||||
|
|
||||||
bufW := bytes.NewBuffer(nil)
|
|
||||||
io.Copy(bufW, r)
|
|
||||||
r.Close()
|
|
||||||
|
|
||||||
var obj GelfObject
|
|
||||||
json.Unmarshal(bufW.Bytes(), &obj)
|
|
||||||
assert.Equal(t, obj["_value"], float64(1))
|
|
||||||
}
|
|
Loading…
Reference in New Issue