From 0823eed54651fdcf38ee96b7822fafa9c9e5bce8 Mon Sep 17 00:00:00 2001 From: Jeffrey Allen Date: Thu, 22 Oct 2015 15:50:19 -0600 Subject: [PATCH] Add riemann output Closes #34 --- README.md | 1 + outputs/all/all.go | 1 + outputs/riemann/riemann.go | 96 +++++++++++++++++++++++++++++++++ outputs/riemann/riemann_test.go | 27 ++++++++++ 4 files changed, 125 insertions(+) create mode 100644 outputs/riemann/riemann.go create mode 100644 outputs/riemann/riemann_test.go diff --git a/README.md b/README.md index c14179a74..799a6d375 100644 --- a/README.md +++ b/README.md @@ -219,6 +219,7 @@ found by running `telegraf -sample-config`. * librato * prometheus * amon +* riemann ## Contributing diff --git a/outputs/all/all.go b/outputs/all/all.go index be9c4cf42..2c00f43f9 100644 --- a/outputs/all/all.go +++ b/outputs/all/all.go @@ -11,4 +11,5 @@ import ( _ "github.com/influxdb/telegraf/outputs/nsq" _ "github.com/influxdb/telegraf/outputs/opentsdb" _ "github.com/influxdb/telegraf/outputs/prometheus_client" + _ "github.com/influxdb/telegraf/outputs/riemann" ) diff --git a/outputs/riemann/riemann.go b/outputs/riemann/riemann.go new file mode 100644 index 000000000..789e09691 --- /dev/null +++ b/outputs/riemann/riemann.go @@ -0,0 +1,96 @@ +package riemann + +import ( + "errors" + "fmt" + "os" + + "github.com/amir/raidman" + "github.com/influxdb/influxdb/client/v2" + "github.com/influxdb/telegraf/outputs" +) + +type Riemann struct { + URL string + Transport string + + client raidman.Client +} + +var sampleConfig = ` + # URL of server + url = "localhost:5555" + # transport protocol to use either tcp or udp + transport = "tcp" +` + +func (r *Riemann) Connect() error { + c, err := raidman.Dial(r.Transport, r.URL) + + if err != nil { + return err + } + + r.client = *c + return nil +} + +func (r *Riemann) Close() error { + r.client.Close() + return nil +} + +func (r *Riemann) SampleConfig() string { + return sampleConfig +} + +func (r *Riemann) Description() string { + return "Configuration for the Riemann server to send metrics to" +} + +func (r *Riemann) Write(points []*client.Point) error { + if len(points) == 0 { + return nil + } + + var events []*raidman.Event + for _, p := range points { + ev := buildEvent(p) + events = append(events, &ev) + } + + var senderr = r.client.SendMulti(events) + if senderr != nil { + return errors.New(fmt.Sprintf("FAILED to send riemann message: %s\n", + senderr)) + } + + return nil +} + +func buildEvent(p *client.Point) raidman.Event { + host := p.Tags()["host"] + + if len(host) == 0 { + hostname, err := os.Hostname() + if err != nil { + host = "unknown" + } else { + host = hostname + } + } + + var event = &raidman.Event{ + Host: host, + Service: p.Name(), + Metric: p.Fields()["value"], + } + + return *event +} + +func init() { + outputs.Add("riemann", func() outputs.Output { + return &Riemann{} + }) +} diff --git a/outputs/riemann/riemann_test.go b/outputs/riemann/riemann_test.go new file mode 100644 index 000000000..31e9478b1 --- /dev/null +++ b/outputs/riemann/riemann_test.go @@ -0,0 +1,27 @@ +package riemann + +import ( + "testing" + + "github.com/influxdb/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +func TestConnectAndWrite(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + url := testutil.GetLocalHost() + ":5555" + + r := &Riemann{ + URL: url, + Transport: "tcp", + } + + err := r.Connect() + require.NoError(t, err) + + err = r.Write(testutil.MockBatchPoints().Points()) + require.NoError(t, err) +}