Rewriting Riemann output plugin (#1900)

* rename to riemann_legacy

Signed-off-by: Fabio Berchtold <fabio.berchtold@swisscom.com>

* initial draft for Riemann output plugin rewrite

Signed-off-by: Fabio Berchtold <fabio.berchtold@swisscom.com>

* add unit tests

Signed-off-by: Fabio Berchtold <fabio.berchtold@swisscom.com>

* add option to send string metrics as states

Signed-off-by: Fabio Berchtold <fabio.berchtold@swisscom.com>

* add integration tests

Signed-off-by: Fabio Berchtold <fabio.berchtold@swisscom.com>

* add plugin README.md

Signed-off-by: Fabio Berchtold <fabio.berchtold@swisscom.com>

* bump riemann library

* clarify settings description

Signed-off-by: Fabio Berchtold <fabio.berchtold@swisscom.com>

* update Readme.md with updated description

Signed-off-by: Fabio Berchtold <fabio.berchtold@swisscom.com>

* add Riemann event examples

Signed-off-by: Fabio Berchtold <fabio.berchtold@swisscom.com>

* use full URL for Riemann server address

Signed-off-by: Fabio Berchtold <fabio.berchtold@swisscom.com>

closes #1878
This commit is contained in:
Fabio Berchtold 2017-01-27 23:54:59 +01:00 committed by Cameron Sparr
parent a36fd375de
commit fc76f47e43
10 changed files with 615 additions and 69 deletions

2
Godeps
View File

@ -1,7 +1,7 @@
github.com/Shopify/sarama 8aadb476e66ca998f2f6bb3c993e9a2daa3666b9
github.com/Sirupsen/logrus 219c8cb75c258c552e999735be6df753ffc7afdc
github.com/aerospike/aerospike-client-go 7f3a312c3b2a60ac083ec6da296091c52c795c63
github.com/amir/raidman 53c1b967405155bfc8758557863bf2e14f814687
github.com/amir/raidman c74861fe6a7bb8ede0a010ce4485bdbb4fc4c985
github.com/aws/aws-sdk-go 13a12060f716145019378a10e2806c174356b857
github.com/beorn7/perks 3ac7bf7a47d159a033b107610db8a1b6575507a4
github.com/cenkalti/backoff 4dc77674aceaabba2c7e3da25d4c823edfb73f99

View File

@ -58,7 +58,7 @@ docker-run:
docker run --name redis -p "6379:6379" -d redis
docker run --name nsq -p "4150:4150" -d nsqio/nsq /nsqd
docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt
docker run --name riemann -p "5555:5555" -d blalor/riemann
docker run --name riemann -p "5555:5555" -d stealthly/docker-riemann
docker run --name nats -p "4222:4222" -d nats
# Run docker containers necessary for CircleCI unit tests
@ -71,7 +71,7 @@ docker-run-circle:
-d spotify/kafka
docker run --name nsq -p "4150:4150" -d nsqio/nsq /nsqd
docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt
docker run --name riemann -p "5555:5555" -d blalor/riemann
docker run --name riemann -p "5555:5555" -d stealthly/docker-riemann
docker run --name nats -p "4222:4222" -d nats
# Kill all docker containers, ignore errors

View File

@ -219,6 +219,7 @@ Telegraf can also collect metrics via the following service plugins:
* [opentsdb](./plugins/outputs/opentsdb)
* [prometheus](./plugins/outputs/prometheus_client)
* [riemann](./plugins/outputs/riemann)
* [riemann_legacy](./plugins/outputs/riemann_legacy)
## Contributing

View File

@ -443,8 +443,39 @@
# # expiration_interval = "60s"
# # Configuration for the Riemann server to send metrics to
# # Configuration for Riemann server to send metrics to
# [[outputs.riemann]]
# ## The full TCP or UDP URL of the Riemann server
# url = "tcp://localhost:5555"
#
# ## Riemann event TTL, floating-point time in seconds.
# ## Defines how long that an event is considered valid for in Riemann
# # ttl = 30.0
#
# ## Separator to use between measurement and field name in Riemann service name
# ## This does not have any effect if 'measurement_as_attribute' is set to 'true'
# separator = "/"
#
# ## Set measurement name as Riemann attribute 'measurement', instead of prepending it to the Riemann service name
# # measurement_as_attribute = false
#
# ## Send string metrics as Riemann event states.
# ## Unless enabled all string metrics will be ignored
# # string_as_state = false
#
# ## A list of tag keys whose values get sent as Riemann tags.
# ## If empty, all Telegraf tag values will be sent as tags
# # tag_keys = ["telegraf","custom_tag"]
#
# ## Additional Riemann tags to send.
# # tags = ["telegraf-output"]
#
# ## Description for Riemann event
# # description_text = "metrics collected from telegraf"
# # Configuration for the legacy Riemann plugin
# [[outputs.riemann_legacy]]
# ## URL of server
# url = "localhost:5555"
# ## transport protocol to use either tcp or udp

View File

@ -20,4 +20,5 @@ import (
_ "github.com/influxdata/telegraf/plugins/outputs/opentsdb"
_ "github.com/influxdata/telegraf/plugins/outputs/prometheus_client"
_ "github.com/influxdata/telegraf/plugins/outputs/riemann"
_ "github.com/influxdata/telegraf/plugins/outputs/riemann_legacy"
)

View File

@ -0,0 +1,83 @@
# Riemann Output Plugin
This plugin writes to [Riemann](http://riemann.io/) via TCP or UDP.
### Configuration:
```toml
# Configuration for Riemann to send metrics to
[[outputs.riemann]]
## The full TCP or UDP URL of the Riemann server
url = "tcp://localhost:5555"
## Riemann event TTL, floating-point time in seconds.
## Defines how long that an event is considered valid for in Riemann
# ttl = 30.0
## Separator to use between measurement and field name in Riemann service name
## This does not have any effect if 'measurement_as_attribute' is set to 'true'
separator = "/"
## Set measurement name as Riemann attribute 'measurement', instead of prepending it to the Riemann service name
# measurement_as_attribute = false
## Send string metrics as Riemann event states.
## Unless enabled all string metrics will be ignored
# string_as_state = false
## A list of tag keys whose values get sent as Riemann tags.
## If empty, all Telegraf tag values will be sent as tags
# tag_keys = ["telegraf","custom_tag"]
## Additional Riemann tags to send.
# tags = ["telegraf-output"]
## Description for Riemann event
# description_text = "metrics collected from telegraf"
```
### Required parameters:
* `url`: The full TCP or UDP URL of the Riemann server to send events to.
### Optional parameters:
* `ttl`: Riemann event TTL, floating-point time in seconds. Defines how long that an event is considered valid for in Riemann.
* `separator`: Separator to use between measurement and field name in Riemann service name.
* `measurement_as_attribute`: Set measurement name as a Riemann attribute, instead of prepending it to the Riemann service name.
* `string_as_state`: Send string metrics as Riemann event states. If this is not enabled then all string metrics will be ignored.
* `tag_keys`: A list of tag keys whose values get sent as Riemann tags. If empty, all Telegraf tag values will be sent as tags.
* `tags`: Additional Riemann tags that will be sent.
* `description_text`: Description text for Riemann event.
### Example Events:
Riemann event emitted by Telegraf with default configuration:
```
#riemann.codec.Event{
:host "postgresql-1e612b44-e92f-4d27-9f30-5e2f53947870", :state nil, :description nil, :ttl 30.0,
:service "disk/used_percent", :metric 73.16736001949994, :path "/boot", :fstype "ext4", :time 1475605021}
```
Telegraf emitting the same Riemann event with `measurement_as_attribute` set to `true`:
```
#riemann.codec.Event{ ...
:measurement "disk", :service "used_percent", :metric 73.16736001949994,
... :time 1475605021}
```
Telegraf emitting the same Riemann event with additional Riemann tags defined:
```
#riemann.codec.Event{
:host "postgresql-1e612b44-e92f-4d27-9f30-5e2f53947870", :state nil, :description nil, :ttl 30.0,
:service "disk/used_percent", :metric 73.16736001949994, :path "/boot", :fstype "ext4", :time 1475605021,
:tags ["telegraf" "postgres_cluster"]}
```
Telegraf emitting a Riemann event with a status text and `string_as_state` set to `true`, and a `description_text` defined:
```
#riemann.codec.Event{
:host "postgresql-1e612b44-e92f-4d27-9f30-5e2f53947870", :state "Running", :ttl 30.0,
:description "PostgreSQL master node is up and running",
:service "status", :time 1475605021}
```

View File

@ -3,6 +3,7 @@ package riemann
import (
"fmt"
"log"
"net/url"
"os"
"sort"
"strings"
@ -12,44 +13,70 @@ import (
"github.com/influxdata/telegraf/plugins/outputs"
)
const deprecationMsg = "I! WARNING: this Riemann output plugin will be deprecated in a future release, see https://github.com/influxdata/telegraf/issues/1878 for more details & discussion."
type Riemann struct {
URL string
Transport string
TTL float32
Separator string
MeasurementAsAttribute bool
StringAsState bool
TagKeys []string
Tags []string
DescriptionText string
client *raidman.Client
}
var sampleConfig = `
## URL of server
url = "localhost:5555"
## transport protocol to use either tcp or udp
transport = "tcp"
## separator to use between input name and field name in Riemann service name
separator = " "
## The full TCP or UDP URL of the Riemann server
url = "tcp://localhost:5555"
## Riemann event TTL, floating-point time in seconds.
## Defines how long that an event is considered valid for in Riemann
# ttl = 30.0
## Separator to use between measurement and field name in Riemann service name
## This does not have any effect if 'measurement_as_attribute' is set to 'true'
separator = "/"
## Set measurement name as Riemann attribute 'measurement', instead of prepending it to the Riemann service name
# measurement_as_attribute = false
## Send string metrics as Riemann event states.
## Unless enabled all string metrics will be ignored
# string_as_state = false
## A list of tag keys whose values get sent as Riemann tags.
## If empty, all Telegraf tag values will be sent as tags
# tag_keys = ["telegraf","custom_tag"]
## Additional Riemann tags to send.
# tags = ["telegraf-output"]
## Description for Riemann event
# description_text = "metrics collected from telegraf"
`
func (r *Riemann) Connect() error {
log.Printf(deprecationMsg)
c, err := raidman.Dial(r.Transport, r.URL)
parsed_url, err := url.Parse(r.URL)
if err != nil {
return err
}
client, err := raidman.Dial(parsed_url.Scheme, parsed_url.Host)
if err != nil {
r.client = nil
return err
}
r.client = c
r.client = client
return nil
}
func (r *Riemann) Close() error {
if r.client == nil {
return nil
}
if r.client != nil {
r.client.Close()
r.client = nil
}
return nil
}
@ -62,91 +89,125 @@ func (r *Riemann) Description() string {
}
func (r *Riemann) Write(metrics []telegraf.Metric) error {
log.Printf(deprecationMsg)
if len(metrics) == 0 {
return nil
}
if r.client == nil {
err := r.Connect()
if err != nil {
return fmt.Errorf("FAILED to (re)connect to Riemann. Error: %s\n", err)
if err := r.Connect(); err != nil {
return fmt.Errorf("Failed to (re)connect to Riemann: %s", err.Error())
}
}
// build list of Riemann events to send
var events []*raidman.Event
for _, p := range metrics {
evs := buildEvents(p, r.Separator)
for _, m := range metrics {
evs := r.buildRiemannEvents(m)
for _, ev := range evs {
events = append(events, ev)
}
}
var senderr = r.client.SendMulti(events)
if senderr != nil {
r.Close() // always retuns nil
return fmt.Errorf("FAILED to send riemann message (will try to reconnect). Error: %s\n",
senderr)
if err := r.client.SendMulti(events); err != nil {
r.Close()
return fmt.Errorf("Failed to send riemann message: %s", err)
}
return nil
}
func buildEvents(p telegraf.Metric, s string) []*raidman.Event {
func (r *Riemann) buildRiemannEvents(m telegraf.Metric) []*raidman.Event {
events := []*raidman.Event{}
for fieldName, value := range p.Fields() {
host, ok := p.Tags()["host"]
for fieldName, value := range m.Fields() {
// get host for Riemann event
host, ok := m.Tags()["host"]
if !ok {
hostname, err := os.Hostname()
if err != nil {
host = "unknown"
} else {
if hostname, err := os.Hostname(); err == nil {
host = hostname
} else {
host = "unknown"
}
}
event := &raidman.Event{
Host: host,
Service: serviceName(s, p.Name(), p.Tags(), fieldName),
Ttl: r.TTL,
Description: r.DescriptionText,
Time: m.Time().Unix(),
Attributes: r.attributes(m.Name(), m.Tags()),
Service: r.service(m.Name(), fieldName),
Tags: r.tags(m.Tags()),
}
switch value.(type) {
case string:
// only send string metrics if explicitly enabled, skip otherwise
if !r.StringAsState {
log.Printf("D! Riemann event states disabled, skipping metric value [%s]\n", value)
continue
}
event.State = value.(string)
default:
case int, int64, uint64, float32, float64:
event.Metric = value
default:
log.Printf("D! Riemann does not support metric value [%s]\n", value)
continue
}
events = append(events, event)
}
return events
}
func serviceName(s string, n string, t map[string]string, f string) string {
serviceStrings := []string{}
serviceStrings = append(serviceStrings, n)
func (r *Riemann) attributes(name string, tags map[string]string) map[string]string {
if r.MeasurementAsAttribute {
tags["measurement"] = name
}
// we'll skip the 'host' tag
tagStrings := []string{}
tagNames := []string{}
delete(tags, "host") // exclude 'host' tag
return tags
}
for tagName := range t {
tagNames = append(tagNames, tagName)
}
sort.Strings(tagNames)
func (r *Riemann) service(name string, field string) string {
var serviceStrings []string
for _, tagName := range tagNames {
if tagName != "host" {
tagStrings = append(tagStrings, t[tagName])
// if measurement is not enabled as an attribute then prepend it to service name
if !r.MeasurementAsAttribute {
serviceStrings = append(serviceStrings, name)
}
serviceStrings = append(serviceStrings, field)
return strings.Join(serviceStrings, r.Separator)
}
func (r *Riemann) tags(tags map[string]string) []string {
// always add specified Riemann tags
values := r.Tags
// if tag_keys are specified, add those and return tag list
if len(r.TagKeys) > 0 {
for _, tagName := range r.TagKeys {
value, ok := tags[tagName]
if ok {
values = append(values, value)
}
}
var tagString string = strings.Join(tagStrings, s)
if tagString != "" {
serviceStrings = append(serviceStrings, tagString)
return values
}
serviceStrings = append(serviceStrings, f)
return strings.Join(serviceStrings, s)
// otherwise add all values from telegraf tag key/value pairs
var keys []string
for key := range tags {
keys = append(keys, key)
}
sort.Strings(keys)
for _, key := range keys {
if key != "host" { // exclude 'host' tag
values = append(values, tags[key])
}
}
return values
}
func init() {

View File

@ -1,22 +1,180 @@
package riemann
import (
"fmt"
"testing"
"time"
"github.com/amir/raidman"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
func TestAttributes(t *testing.T) {
tags := map[string]string{"tag1": "value1", "tag2": "value2"}
r := &Riemann{}
require.Equal(t,
map[string]string{"tag1": "value1", "tag2": "value2"},
r.attributes("test", tags))
// enable measurement as attribute, should now be included
r.MeasurementAsAttribute = true
require.Equal(t,
map[string]string{"tag1": "value1", "tag2": "value2", "measurement": "test"},
r.attributes("test", tags))
}
func TestService(t *testing.T) {
r := &Riemann{
Separator: "/",
}
require.Equal(t, "test/value", r.service("test", "value"))
// enable measurement as attribute, should not be part of service name anymore
r.MeasurementAsAttribute = true
require.Equal(t, "value", r.service("test", "value"))
}
func TestTags(t *testing.T) {
tags := map[string]string{"tag1": "value1", "tag2": "value2"}
// all tag values plus additional tag should be present
r := &Riemann{
Tags: []string{"test"},
}
require.Equal(t,
[]string{"test", "value1", "value2"},
r.tags(tags))
// only tag2 value plus additional tag should be present
r.TagKeys = []string{"tag2"}
require.Equal(t,
[]string{"test", "value2"},
r.tags(tags))
// only tag1 value should be present
r.Tags = nil
r.TagKeys = []string{"tag1"}
require.Equal(t,
[]string{"value1"},
r.tags(tags))
}
func TestMetricEvents(t *testing.T) {
r := &Riemann{
TTL: 20.0,
Separator: "/",
MeasurementAsAttribute: false,
DescriptionText: "metrics from telegraf",
Tags: []string{"telegraf"},
}
// build a single event
metric, _ := telegraf.NewMetric(
"test1",
map[string]string{"tag1": "value1", "host": "abc123"},
map[string]interface{}{"value": 5.6},
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
)
events := r.buildRiemannEvents(metric)
require.Len(t, events, 1)
// is event as expected?
expectedEvent := &raidman.Event{
Ttl: 20.0,
Time: 1257894000,
Tags: []string{"telegraf", "value1"},
Host: "abc123",
State: "",
Service: "test1/value",
Metric: 5.6,
Description: "metrics from telegraf",
Attributes: map[string]string{"tag1": "value1"},
}
require.Equal(t, expectedEvent, events[0])
// build 2 events
metric, _ = telegraf.NewMetric(
"test2",
map[string]string{"host": "xyz987"},
map[string]interface{}{"point": 1},
time.Date(2012, time.November, 2, 3, 0, 0, 0, time.UTC),
)
events = append(events, r.buildRiemannEvents(metric)...)
require.Len(t, events, 2)
// first event should still be the same
require.Equal(t, expectedEvent, events[0])
// second event
expectedEvent = &raidman.Event{
Ttl: 20.0,
Time: 1351825200,
Tags: []string{"telegraf"},
Host: "xyz987",
State: "",
Service: "test2/point",
Metric: int64(1),
Description: "metrics from telegraf",
Attributes: map[string]string{},
}
require.Equal(t, expectedEvent, events[1])
}
func TestStateEvents(t *testing.T) {
r := &Riemann{
MeasurementAsAttribute: true,
}
// string metrics will be skipped unless explicitly enabled
metric, _ := telegraf.NewMetric(
"test",
map[string]string{"host": "host"},
map[string]interface{}{"value": "running"},
time.Date(2015, time.November, 9, 22, 0, 0, 0, time.UTC),
)
events := r.buildRiemannEvents(metric)
// no event should be present
require.Len(t, events, 0)
// enable string metrics as event states
r.StringAsState = true
events = r.buildRiemannEvents(metric)
require.Len(t, events, 1)
// is event as expected?
expectedEvent := &raidman.Event{
Ttl: 0,
Time: 1447106400,
Tags: nil,
Host: "host",
State: "running",
Service: "value",
Metric: nil,
Description: "",
Attributes: map[string]string{"measurement": "test"},
}
require.Equal(t, expectedEvent, events[0])
}
func TestConnectAndWrite(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
url := testutil.GetLocalHost() + ":5555"
r := &Riemann{
URL: url,
Transport: "tcp",
URL: fmt.Sprintf("tcp://%s:5555", testutil.GetLocalHost()),
TTL: 15.0,
Separator: "/",
MeasurementAsAttribute: false,
StringAsState: true,
DescriptionText: "metrics from telegraf",
Tags: []string{"docker"},
}
err := r.Connect()
@ -24,4 +182,32 @@ func TestConnectAndWrite(t *testing.T) {
err = r.Write(testutil.MockMetrics())
require.NoError(t, err)
metrics := make([]telegraf.Metric, 0)
metrics = append(metrics, testutil.TestMetric(2))
metrics = append(metrics, testutil.TestMetric(3.456789))
metrics = append(metrics, testutil.TestMetric(uint(0)))
metrics = append(metrics, testutil.TestMetric("ok"))
metrics = append(metrics, testutil.TestMetric("running"))
err = r.Write(metrics)
require.NoError(t, err)
time.Sleep(200 * time.Millisecond)
// are there any "docker" tagged events in Riemann?
events, err := r.client.Query(`tagged "docker"`)
require.NoError(t, err)
require.NotZero(t, len(events))
// get Riemann events with state = "running", should be 1 event
events, err = r.client.Query(`state = "running"`)
require.NoError(t, err)
require.Len(t, events, 1)
// is event as expected?
require.Equal(t, []string{"docker", "value1"}, events[0].Tags)
require.Equal(t, "running", events[0].State)
require.Equal(t, "test1/value", events[0].Service)
require.Equal(t, "metrics from telegraf", events[0].Description)
require.Equal(t, map[string]string{"tag1": "value1"}, events[0].Attributes)
}

View File

@ -0,0 +1,156 @@
package riemann_legacy
import (
"fmt"
"log"
"os"
"sort"
"strings"
"github.com/amir/raidman"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/outputs"
)
const deprecationMsg = "E! Error: this Riemann output plugin will be deprecated in a future release, see https://github.com/influxdata/telegraf/issues/1878 for more details & discussion."
type Riemann struct {
URL string
Transport string
Separator string
client *raidman.Client
}
var sampleConfig = `
## URL of server
url = "localhost:5555"
## transport protocol to use either tcp or udp
transport = "tcp"
## separator to use between input name and field name in Riemann service name
separator = " "
`
func (r *Riemann) Connect() error {
log.Printf(deprecationMsg)
c, err := raidman.Dial(r.Transport, r.URL)
if err != nil {
r.client = nil
return err
}
r.client = c
return nil
}
func (r *Riemann) Close() error {
if r.client == nil {
return nil
}
r.client.Close()
r.client = nil
return nil
}
func (r *Riemann) SampleConfig() string {
return sampleConfig
}
func (r *Riemann) Description() string {
return "Configuration for the Riemann server to send metrics to"
}
func (r *Riemann) Write(metrics []telegraf.Metric) error {
log.Printf(deprecationMsg)
if len(metrics) == 0 {
return nil
}
if r.client == nil {
err := r.Connect()
if err != nil {
return fmt.Errorf("FAILED to (re)connect to Riemann. Error: %s\n", err)
}
}
var events []*raidman.Event
for _, p := range metrics {
evs := buildEvents(p, r.Separator)
for _, ev := range evs {
events = append(events, ev)
}
}
var senderr = r.client.SendMulti(events)
if senderr != nil {
r.Close() // always retuns nil
return fmt.Errorf("FAILED to send riemann message (will try to reconnect). Error: %s\n",
senderr)
}
return nil
}
func buildEvents(p telegraf.Metric, s string) []*raidman.Event {
events := []*raidman.Event{}
for fieldName, value := range p.Fields() {
host, ok := p.Tags()["host"]
if !ok {
hostname, err := os.Hostname()
if err != nil {
host = "unknown"
} else {
host = hostname
}
}
event := &raidman.Event{
Host: host,
Service: serviceName(s, p.Name(), p.Tags(), fieldName),
}
switch value.(type) {
case string:
event.State = value.(string)
default:
event.Metric = value
}
events = append(events, event)
}
return events
}
func serviceName(s string, n string, t map[string]string, f string) string {
serviceStrings := []string{}
serviceStrings = append(serviceStrings, n)
// we'll skip the 'host' tag
tagStrings := []string{}
tagNames := []string{}
for tagName := range t {
tagNames = append(tagNames, tagName)
}
sort.Strings(tagNames)
for _, tagName := range tagNames {
if tagName != "host" {
tagStrings = append(tagStrings, t[tagName])
}
}
var tagString string = strings.Join(tagStrings, s)
if tagString != "" {
serviceStrings = append(serviceStrings, tagString)
}
serviceStrings = append(serviceStrings, f)
return strings.Join(serviceStrings, s)
}
func init() {
outputs.Add("riemann_legacy", func() telegraf.Output {
return &Riemann{}
})
}

View File

@ -0,0 +1,27 @@
package riemann_legacy
import (
"testing"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
func TestConnectAndWrite(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
url := testutil.GetLocalHost() + ":5555"
r := &Riemann{
URL: url,
Transport: "tcp",
}
err := r.Connect()
require.NoError(t, err)
err = r.Write(testutil.MockMetrics())
require.NoError(t, err)
}