Rewriting Riemann output plugin (#1900)
* rename to riemann_legacy Signed-off-by: Fabio Berchtold <fabio.berchtold@swisscom.com> * initial draft for Riemann output plugin rewrite Signed-off-by: Fabio Berchtold <fabio.berchtold@swisscom.com> * add unit tests Signed-off-by: Fabio Berchtold <fabio.berchtold@swisscom.com> * add option to send string metrics as states Signed-off-by: Fabio Berchtold <fabio.berchtold@swisscom.com> * add integration tests Signed-off-by: Fabio Berchtold <fabio.berchtold@swisscom.com> * add plugin README.md Signed-off-by: Fabio Berchtold <fabio.berchtold@swisscom.com> * bump riemann library * clarify settings description Signed-off-by: Fabio Berchtold <fabio.berchtold@swisscom.com> * update Readme.md with updated description Signed-off-by: Fabio Berchtold <fabio.berchtold@swisscom.com> * add Riemann event examples Signed-off-by: Fabio Berchtold <fabio.berchtold@swisscom.com> * use full URL for Riemann server address Signed-off-by: Fabio Berchtold <fabio.berchtold@swisscom.com> closes #1878
This commit is contained in:
parent
c9e87a39f8
commit
3fa37a9212
2
Godeps
2
Godeps
|
@ -1,7 +1,7 @@
|
|||
github.com/Shopify/sarama 8aadb476e66ca998f2f6bb3c993e9a2daa3666b9
|
||||
github.com/Sirupsen/logrus 219c8cb75c258c552e999735be6df753ffc7afdc
|
||||
github.com/aerospike/aerospike-client-go 7f3a312c3b2a60ac083ec6da296091c52c795c63
|
||||
github.com/amir/raidman 53c1b967405155bfc8758557863bf2e14f814687
|
||||
github.com/amir/raidman c74861fe6a7bb8ede0a010ce4485bdbb4fc4c985
|
||||
github.com/aws/aws-sdk-go 13a12060f716145019378a10e2806c174356b857
|
||||
github.com/beorn7/perks 3ac7bf7a47d159a033b107610db8a1b6575507a4
|
||||
github.com/cenkalti/backoff 4dc77674aceaabba2c7e3da25d4c823edfb73f99
|
||||
|
|
4
Makefile
4
Makefile
|
@ -58,7 +58,7 @@ docker-run:
|
|||
docker run --name redis -p "6379:6379" -d redis
|
||||
docker run --name nsq -p "4150:4150" -d nsqio/nsq /nsqd
|
||||
docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt
|
||||
docker run --name riemann -p "5555:5555" -d blalor/riemann
|
||||
docker run --name riemann -p "5555:5555" -d stealthly/docker-riemann
|
||||
docker run --name nats -p "4222:4222" -d nats
|
||||
|
||||
# Run docker containers necessary for CircleCI unit tests
|
||||
|
@ -71,7 +71,7 @@ docker-run-circle:
|
|||
-d spotify/kafka
|
||||
docker run --name nsq -p "4150:4150" -d nsqio/nsq /nsqd
|
||||
docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt
|
||||
docker run --name riemann -p "5555:5555" -d blalor/riemann
|
||||
docker run --name riemann -p "5555:5555" -d stealthly/docker-riemann
|
||||
docker run --name nats -p "4222:4222" -d nats
|
||||
|
||||
# Kill all docker containers, ignore errors
|
||||
|
|
|
@ -219,6 +219,7 @@ Telegraf can also collect metrics via the following service plugins:
|
|||
* [opentsdb](./plugins/outputs/opentsdb)
|
||||
* [prometheus](./plugins/outputs/prometheus_client)
|
||||
* [riemann](./plugins/outputs/riemann)
|
||||
* [riemann_legacy](./plugins/outputs/riemann_legacy)
|
||||
|
||||
## Contributing
|
||||
|
||||
|
|
|
@ -443,8 +443,39 @@
|
|||
# # expiration_interval = "60s"
|
||||
|
||||
|
||||
# # Configuration for the Riemann server to send metrics to
|
||||
# # Configuration for Riemann server to send metrics to
|
||||
# [[outputs.riemann]]
|
||||
# ## The full TCP or UDP URL of the Riemann server
|
||||
# url = "tcp://localhost:5555"
|
||||
#
|
||||
# ## Riemann event TTL, floating-point time in seconds.
|
||||
# ## Defines how long that an event is considered valid for in Riemann
|
||||
# # ttl = 30.0
|
||||
#
|
||||
# ## Separator to use between measurement and field name in Riemann service name
|
||||
# ## This does not have any effect if 'measurement_as_attribute' is set to 'true'
|
||||
# separator = "/"
|
||||
#
|
||||
# ## Set measurement name as Riemann attribute 'measurement', instead of prepending it to the Riemann service name
|
||||
# # measurement_as_attribute = false
|
||||
#
|
||||
# ## Send string metrics as Riemann event states.
|
||||
# ## Unless enabled all string metrics will be ignored
|
||||
# # string_as_state = false
|
||||
#
|
||||
# ## A list of tag keys whose values get sent as Riemann tags.
|
||||
# ## If empty, all Telegraf tag values will be sent as tags
|
||||
# # tag_keys = ["telegraf","custom_tag"]
|
||||
#
|
||||
# ## Additional Riemann tags to send.
|
||||
# # tags = ["telegraf-output"]
|
||||
#
|
||||
# ## Description for Riemann event
|
||||
# # description_text = "metrics collected from telegraf"
|
||||
|
||||
|
||||
# # Configuration for the legacy Riemann plugin
|
||||
# [[outputs.riemann_legacy]]
|
||||
# ## URL of server
|
||||
# url = "localhost:5555"
|
||||
# ## transport protocol to use either tcp or udp
|
||||
|
|
|
@ -20,4 +20,5 @@ import (
|
|||
_ "github.com/influxdata/telegraf/plugins/outputs/opentsdb"
|
||||
_ "github.com/influxdata/telegraf/plugins/outputs/prometheus_client"
|
||||
_ "github.com/influxdata/telegraf/plugins/outputs/riemann"
|
||||
_ "github.com/influxdata/telegraf/plugins/outputs/riemann_legacy"
|
||||
)
|
||||
|
|
|
@ -0,0 +1,83 @@
|
|||
# Riemann Output Plugin
|
||||
|
||||
This plugin writes to [Riemann](http://riemann.io/) via TCP or UDP.
|
||||
|
||||
### Configuration:
|
||||
|
||||
```toml
|
||||
# Configuration for Riemann to send metrics to
|
||||
[[outputs.riemann]]
|
||||
## The full TCP or UDP URL of the Riemann server
|
||||
url = "tcp://localhost:5555"
|
||||
|
||||
## Riemann event TTL, floating-point time in seconds.
|
||||
## Defines how long that an event is considered valid for in Riemann
|
||||
# ttl = 30.0
|
||||
|
||||
## Separator to use between measurement and field name in Riemann service name
|
||||
## This does not have any effect if 'measurement_as_attribute' is set to 'true'
|
||||
separator = "/"
|
||||
|
||||
## Set measurement name as Riemann attribute 'measurement', instead of prepending it to the Riemann service name
|
||||
# measurement_as_attribute = false
|
||||
|
||||
## Send string metrics as Riemann event states.
|
||||
## Unless enabled all string metrics will be ignored
|
||||
# string_as_state = false
|
||||
|
||||
## A list of tag keys whose values get sent as Riemann tags.
|
||||
## If empty, all Telegraf tag values will be sent as tags
|
||||
# tag_keys = ["telegraf","custom_tag"]
|
||||
|
||||
## Additional Riemann tags to send.
|
||||
# tags = ["telegraf-output"]
|
||||
|
||||
## Description for Riemann event
|
||||
# description_text = "metrics collected from telegraf"
|
||||
```
|
||||
|
||||
### Required parameters:
|
||||
|
||||
* `url`: The full TCP or UDP URL of the Riemann server to send events to.
|
||||
|
||||
### Optional parameters:
|
||||
|
||||
* `ttl`: Riemann event TTL, floating-point time in seconds. Defines how long that an event is considered valid for in Riemann.
|
||||
* `separator`: Separator to use between measurement and field name in Riemann service name.
|
||||
* `measurement_as_attribute`: Set measurement name as a Riemann attribute, instead of prepending it to the Riemann service name.
|
||||
* `string_as_state`: Send string metrics as Riemann event states. If this is not enabled then all string metrics will be ignored.
|
||||
* `tag_keys`: A list of tag keys whose values get sent as Riemann tags. If empty, all Telegraf tag values will be sent as tags.
|
||||
* `tags`: Additional Riemann tags that will be sent.
|
||||
* `description_text`: Description text for Riemann event.
|
||||
|
||||
### Example Events:
|
||||
|
||||
Riemann event emitted by Telegraf with default configuration:
|
||||
```
|
||||
#riemann.codec.Event{
|
||||
:host "postgresql-1e612b44-e92f-4d27-9f30-5e2f53947870", :state nil, :description nil, :ttl 30.0,
|
||||
:service "disk/used_percent", :metric 73.16736001949994, :path "/boot", :fstype "ext4", :time 1475605021}
|
||||
```
|
||||
|
||||
Telegraf emitting the same Riemann event with `measurement_as_attribute` set to `true`:
|
||||
```
|
||||
#riemann.codec.Event{ ...
|
||||
:measurement "disk", :service "used_percent", :metric 73.16736001949994,
|
||||
... :time 1475605021}
|
||||
```
|
||||
|
||||
Telegraf emitting the same Riemann event with additional Riemann tags defined:
|
||||
```
|
||||
#riemann.codec.Event{
|
||||
:host "postgresql-1e612b44-e92f-4d27-9f30-5e2f53947870", :state nil, :description nil, :ttl 30.0,
|
||||
:service "disk/used_percent", :metric 73.16736001949994, :path "/boot", :fstype "ext4", :time 1475605021,
|
||||
:tags ["telegraf" "postgres_cluster"]}
|
||||
```
|
||||
|
||||
Telegraf emitting a Riemann event with a status text and `string_as_state` set to `true`, and a `description_text` defined:
|
||||
```
|
||||
#riemann.codec.Event{
|
||||
:host "postgresql-1e612b44-e92f-4d27-9f30-5e2f53947870", :state "Running", :ttl 30.0,
|
||||
:description "PostgreSQL master node is up and running",
|
||||
:service "status", :time 1475605021}
|
||||
```
|
|
@ -3,6 +3,7 @@ package riemann
|
|||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"net/url"
|
||||
"os"
|
||||
"sort"
|
||||
"strings"
|
||||
|
@ -12,44 +13,70 @@ import (
|
|||
"github.com/influxdata/telegraf/plugins/outputs"
|
||||
)
|
||||
|
||||
const deprecationMsg = "I! WARNING: this Riemann output plugin will be deprecated in a future release, see https://github.com/influxdata/telegraf/issues/1878 for more details & discussion."
|
||||
|
||||
type Riemann struct {
|
||||
URL string
|
||||
Transport string
|
||||
TTL float32
|
||||
Separator string
|
||||
MeasurementAsAttribute bool
|
||||
StringAsState bool
|
||||
TagKeys []string
|
||||
Tags []string
|
||||
DescriptionText string
|
||||
|
||||
client *raidman.Client
|
||||
}
|
||||
|
||||
var sampleConfig = `
|
||||
## URL of server
|
||||
url = "localhost:5555"
|
||||
## transport protocol to use either tcp or udp
|
||||
transport = "tcp"
|
||||
## separator to use between input name and field name in Riemann service name
|
||||
separator = " "
|
||||
## The full TCP or UDP URL of the Riemann server
|
||||
url = "tcp://localhost:5555"
|
||||
|
||||
## Riemann event TTL, floating-point time in seconds.
|
||||
## Defines how long that an event is considered valid for in Riemann
|
||||
# ttl = 30.0
|
||||
|
||||
## Separator to use between measurement and field name in Riemann service name
|
||||
## This does not have any effect if 'measurement_as_attribute' is set to 'true'
|
||||
separator = "/"
|
||||
|
||||
## Set measurement name as Riemann attribute 'measurement', instead of prepending it to the Riemann service name
|
||||
# measurement_as_attribute = false
|
||||
|
||||
## Send string metrics as Riemann event states.
|
||||
## Unless enabled all string metrics will be ignored
|
||||
# string_as_state = false
|
||||
|
||||
## A list of tag keys whose values get sent as Riemann tags.
|
||||
## If empty, all Telegraf tag values will be sent as tags
|
||||
# tag_keys = ["telegraf","custom_tag"]
|
||||
|
||||
## Additional Riemann tags to send.
|
||||
# tags = ["telegraf-output"]
|
||||
|
||||
## Description for Riemann event
|
||||
# description_text = "metrics collected from telegraf"
|
||||
`
|
||||
|
||||
func (r *Riemann) Connect() error {
|
||||
log.Printf(deprecationMsg)
|
||||
c, err := raidman.Dial(r.Transport, r.URL)
|
||||
parsed_url, err := url.Parse(r.URL)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
client, err := raidman.Dial(parsed_url.Scheme, parsed_url.Host)
|
||||
if err != nil {
|
||||
r.client = nil
|
||||
return err
|
||||
}
|
||||
|
||||
r.client = c
|
||||
r.client = client
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Riemann) Close() error {
|
||||
if r.client == nil {
|
||||
return nil
|
||||
}
|
||||
if r.client != nil {
|
||||
r.client.Close()
|
||||
r.client = nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -62,91 +89,125 @@ func (r *Riemann) Description() string {
|
|||
}
|
||||
|
||||
func (r *Riemann) Write(metrics []telegraf.Metric) error {
|
||||
log.Printf(deprecationMsg)
|
||||
if len(metrics) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
if r.client == nil {
|
||||
err := r.Connect()
|
||||
if err != nil {
|
||||
return fmt.Errorf("FAILED to (re)connect to Riemann. Error: %s\n", err)
|
||||
if err := r.Connect(); err != nil {
|
||||
return fmt.Errorf("Failed to (re)connect to Riemann: %s", err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
// build list of Riemann events to send
|
||||
var events []*raidman.Event
|
||||
for _, p := range metrics {
|
||||
evs := buildEvents(p, r.Separator)
|
||||
for _, m := range metrics {
|
||||
evs := r.buildRiemannEvents(m)
|
||||
for _, ev := range evs {
|
||||
events = append(events, ev)
|
||||
}
|
||||
}
|
||||
|
||||
var senderr = r.client.SendMulti(events)
|
||||
if senderr != nil {
|
||||
r.Close() // always retuns nil
|
||||
return fmt.Errorf("FAILED to send riemann message (will try to reconnect). Error: %s\n",
|
||||
senderr)
|
||||
if err := r.client.SendMulti(events); err != nil {
|
||||
r.Close()
|
||||
return fmt.Errorf("Failed to send riemann message: %s", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func buildEvents(p telegraf.Metric, s string) []*raidman.Event {
|
||||
func (r *Riemann) buildRiemannEvents(m telegraf.Metric) []*raidman.Event {
|
||||
events := []*raidman.Event{}
|
||||
for fieldName, value := range p.Fields() {
|
||||
host, ok := p.Tags()["host"]
|
||||
for fieldName, value := range m.Fields() {
|
||||
// get host for Riemann event
|
||||
host, ok := m.Tags()["host"]
|
||||
if !ok {
|
||||
hostname, err := os.Hostname()
|
||||
if err != nil {
|
||||
host = "unknown"
|
||||
} else {
|
||||
if hostname, err := os.Hostname(); err == nil {
|
||||
host = hostname
|
||||
} else {
|
||||
host = "unknown"
|
||||
}
|
||||
}
|
||||
|
||||
event := &raidman.Event{
|
||||
Host: host,
|
||||
Service: serviceName(s, p.Name(), p.Tags(), fieldName),
|
||||
Ttl: r.TTL,
|
||||
Description: r.DescriptionText,
|
||||
Time: m.Time().Unix(),
|
||||
|
||||
Attributes: r.attributes(m.Name(), m.Tags()),
|
||||
Service: r.service(m.Name(), fieldName),
|
||||
Tags: r.tags(m.Tags()),
|
||||
}
|
||||
|
||||
switch value.(type) {
|
||||
case string:
|
||||
// only send string metrics if explicitly enabled, skip otherwise
|
||||
if !r.StringAsState {
|
||||
log.Printf("D! Riemann event states disabled, skipping metric value [%s]\n", value)
|
||||
continue
|
||||
}
|
||||
event.State = value.(string)
|
||||
default:
|
||||
case int, int64, uint64, float32, float64:
|
||||
event.Metric = value
|
||||
default:
|
||||
log.Printf("D! Riemann does not support metric value [%s]\n", value)
|
||||
continue
|
||||
}
|
||||
|
||||
events = append(events, event)
|
||||
}
|
||||
|
||||
return events
|
||||
}
|
||||
|
||||
func serviceName(s string, n string, t map[string]string, f string) string {
|
||||
serviceStrings := []string{}
|
||||
serviceStrings = append(serviceStrings, n)
|
||||
func (r *Riemann) attributes(name string, tags map[string]string) map[string]string {
|
||||
if r.MeasurementAsAttribute {
|
||||
tags["measurement"] = name
|
||||
}
|
||||
|
||||
// we'll skip the 'host' tag
|
||||
tagStrings := []string{}
|
||||
tagNames := []string{}
|
||||
delete(tags, "host") // exclude 'host' tag
|
||||
return tags
|
||||
}
|
||||
|
||||
for tagName := range t {
|
||||
tagNames = append(tagNames, tagName)
|
||||
}
|
||||
sort.Strings(tagNames)
|
||||
func (r *Riemann) service(name string, field string) string {
|
||||
var serviceStrings []string
|
||||
|
||||
for _, tagName := range tagNames {
|
||||
if tagName != "host" {
|
||||
tagStrings = append(tagStrings, t[tagName])
|
||||
// if measurement is not enabled as an attribute then prepend it to service name
|
||||
if !r.MeasurementAsAttribute {
|
||||
serviceStrings = append(serviceStrings, name)
|
||||
}
|
||||
serviceStrings = append(serviceStrings, field)
|
||||
|
||||
return strings.Join(serviceStrings, r.Separator)
|
||||
}
|
||||
|
||||
func (r *Riemann) tags(tags map[string]string) []string {
|
||||
// always add specified Riemann tags
|
||||
values := r.Tags
|
||||
|
||||
// if tag_keys are specified, add those and return tag list
|
||||
if len(r.TagKeys) > 0 {
|
||||
for _, tagName := range r.TagKeys {
|
||||
value, ok := tags[tagName]
|
||||
if ok {
|
||||
values = append(values, value)
|
||||
}
|
||||
}
|
||||
var tagString string = strings.Join(tagStrings, s)
|
||||
if tagString != "" {
|
||||
serviceStrings = append(serviceStrings, tagString)
|
||||
return values
|
||||
}
|
||||
serviceStrings = append(serviceStrings, f)
|
||||
return strings.Join(serviceStrings, s)
|
||||
|
||||
// otherwise add all values from telegraf tag key/value pairs
|
||||
var keys []string
|
||||
for key := range tags {
|
||||
keys = append(keys, key)
|
||||
}
|
||||
sort.Strings(keys)
|
||||
|
||||
for _, key := range keys {
|
||||
if key != "host" { // exclude 'host' tag
|
||||
values = append(values, tags[key])
|
||||
}
|
||||
}
|
||||
return values
|
||||
}
|
||||
|
||||
func init() {
|
||||
|
|
|
@ -1,22 +1,180 @@
|
|||
package riemann
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/amir/raidman"
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestAttributes(t *testing.T) {
|
||||
tags := map[string]string{"tag1": "value1", "tag2": "value2"}
|
||||
|
||||
r := &Riemann{}
|
||||
require.Equal(t,
|
||||
map[string]string{"tag1": "value1", "tag2": "value2"},
|
||||
r.attributes("test", tags))
|
||||
|
||||
// enable measurement as attribute, should now be included
|
||||
r.MeasurementAsAttribute = true
|
||||
require.Equal(t,
|
||||
map[string]string{"tag1": "value1", "tag2": "value2", "measurement": "test"},
|
||||
r.attributes("test", tags))
|
||||
}
|
||||
|
||||
func TestService(t *testing.T) {
|
||||
r := &Riemann{
|
||||
Separator: "/",
|
||||
}
|
||||
require.Equal(t, "test/value", r.service("test", "value"))
|
||||
|
||||
// enable measurement as attribute, should not be part of service name anymore
|
||||
r.MeasurementAsAttribute = true
|
||||
require.Equal(t, "value", r.service("test", "value"))
|
||||
}
|
||||
|
||||
func TestTags(t *testing.T) {
|
||||
tags := map[string]string{"tag1": "value1", "tag2": "value2"}
|
||||
|
||||
// all tag values plus additional tag should be present
|
||||
r := &Riemann{
|
||||
Tags: []string{"test"},
|
||||
}
|
||||
require.Equal(t,
|
||||
[]string{"test", "value1", "value2"},
|
||||
r.tags(tags))
|
||||
|
||||
// only tag2 value plus additional tag should be present
|
||||
r.TagKeys = []string{"tag2"}
|
||||
require.Equal(t,
|
||||
[]string{"test", "value2"},
|
||||
r.tags(tags))
|
||||
|
||||
// only tag1 value should be present
|
||||
r.Tags = nil
|
||||
r.TagKeys = []string{"tag1"}
|
||||
require.Equal(t,
|
||||
[]string{"value1"},
|
||||
r.tags(tags))
|
||||
}
|
||||
|
||||
func TestMetricEvents(t *testing.T) {
|
||||
r := &Riemann{
|
||||
TTL: 20.0,
|
||||
Separator: "/",
|
||||
MeasurementAsAttribute: false,
|
||||
DescriptionText: "metrics from telegraf",
|
||||
Tags: []string{"telegraf"},
|
||||
}
|
||||
|
||||
// build a single event
|
||||
metric, _ := telegraf.NewMetric(
|
||||
"test1",
|
||||
map[string]string{"tag1": "value1", "host": "abc123"},
|
||||
map[string]interface{}{"value": 5.6},
|
||||
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
|
||||
)
|
||||
|
||||
events := r.buildRiemannEvents(metric)
|
||||
require.Len(t, events, 1)
|
||||
|
||||
// is event as expected?
|
||||
expectedEvent := &raidman.Event{
|
||||
Ttl: 20.0,
|
||||
Time: 1257894000,
|
||||
Tags: []string{"telegraf", "value1"},
|
||||
Host: "abc123",
|
||||
State: "",
|
||||
Service: "test1/value",
|
||||
Metric: 5.6,
|
||||
Description: "metrics from telegraf",
|
||||
Attributes: map[string]string{"tag1": "value1"},
|
||||
}
|
||||
require.Equal(t, expectedEvent, events[0])
|
||||
|
||||
// build 2 events
|
||||
metric, _ = telegraf.NewMetric(
|
||||
"test2",
|
||||
map[string]string{"host": "xyz987"},
|
||||
map[string]interface{}{"point": 1},
|
||||
time.Date(2012, time.November, 2, 3, 0, 0, 0, time.UTC),
|
||||
)
|
||||
|
||||
events = append(events, r.buildRiemannEvents(metric)...)
|
||||
require.Len(t, events, 2)
|
||||
|
||||
// first event should still be the same
|
||||
require.Equal(t, expectedEvent, events[0])
|
||||
|
||||
// second event
|
||||
expectedEvent = &raidman.Event{
|
||||
Ttl: 20.0,
|
||||
Time: 1351825200,
|
||||
Tags: []string{"telegraf"},
|
||||
Host: "xyz987",
|
||||
State: "",
|
||||
Service: "test2/point",
|
||||
Metric: int64(1),
|
||||
Description: "metrics from telegraf",
|
||||
Attributes: map[string]string{},
|
||||
}
|
||||
require.Equal(t, expectedEvent, events[1])
|
||||
}
|
||||
|
||||
func TestStateEvents(t *testing.T) {
|
||||
r := &Riemann{
|
||||
MeasurementAsAttribute: true,
|
||||
}
|
||||
|
||||
// string metrics will be skipped unless explicitly enabled
|
||||
metric, _ := telegraf.NewMetric(
|
||||
"test",
|
||||
map[string]string{"host": "host"},
|
||||
map[string]interface{}{"value": "running"},
|
||||
time.Date(2015, time.November, 9, 22, 0, 0, 0, time.UTC),
|
||||
)
|
||||
|
||||
events := r.buildRiemannEvents(metric)
|
||||
// no event should be present
|
||||
require.Len(t, events, 0)
|
||||
|
||||
// enable string metrics as event states
|
||||
r.StringAsState = true
|
||||
events = r.buildRiemannEvents(metric)
|
||||
require.Len(t, events, 1)
|
||||
|
||||
// is event as expected?
|
||||
expectedEvent := &raidman.Event{
|
||||
Ttl: 0,
|
||||
Time: 1447106400,
|
||||
Tags: nil,
|
||||
Host: "host",
|
||||
State: "running",
|
||||
Service: "value",
|
||||
Metric: nil,
|
||||
Description: "",
|
||||
Attributes: map[string]string{"measurement": "test"},
|
||||
}
|
||||
require.Equal(t, expectedEvent, events[0])
|
||||
}
|
||||
|
||||
func TestConnectAndWrite(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("Skipping integration test in short mode")
|
||||
}
|
||||
|
||||
url := testutil.GetLocalHost() + ":5555"
|
||||
|
||||
r := &Riemann{
|
||||
URL: url,
|
||||
Transport: "tcp",
|
||||
URL: fmt.Sprintf("tcp://%s:5555", testutil.GetLocalHost()),
|
||||
TTL: 15.0,
|
||||
Separator: "/",
|
||||
MeasurementAsAttribute: false,
|
||||
StringAsState: true,
|
||||
DescriptionText: "metrics from telegraf",
|
||||
Tags: []string{"docker"},
|
||||
}
|
||||
|
||||
err := r.Connect()
|
||||
|
@ -24,4 +182,32 @@ func TestConnectAndWrite(t *testing.T) {
|
|||
|
||||
err = r.Write(testutil.MockMetrics())
|
||||
require.NoError(t, err)
|
||||
|
||||
metrics := make([]telegraf.Metric, 0)
|
||||
metrics = append(metrics, testutil.TestMetric(2))
|
||||
metrics = append(metrics, testutil.TestMetric(3.456789))
|
||||
metrics = append(metrics, testutil.TestMetric(uint(0)))
|
||||
metrics = append(metrics, testutil.TestMetric("ok"))
|
||||
metrics = append(metrics, testutil.TestMetric("running"))
|
||||
err = r.Write(metrics)
|
||||
require.NoError(t, err)
|
||||
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
// are there any "docker" tagged events in Riemann?
|
||||
events, err := r.client.Query(`tagged "docker"`)
|
||||
require.NoError(t, err)
|
||||
require.NotZero(t, len(events))
|
||||
|
||||
// get Riemann events with state = "running", should be 1 event
|
||||
events, err = r.client.Query(`state = "running"`)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, events, 1)
|
||||
|
||||
// is event as expected?
|
||||
require.Equal(t, []string{"docker", "value1"}, events[0].Tags)
|
||||
require.Equal(t, "running", events[0].State)
|
||||
require.Equal(t, "test1/value", events[0].Service)
|
||||
require.Equal(t, "metrics from telegraf", events[0].Description)
|
||||
require.Equal(t, map[string]string{"tag1": "value1"}, events[0].Attributes)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,156 @@
|
|||
package riemann_legacy
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
"github.com/amir/raidman"
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/outputs"
|
||||
)
|
||||
|
||||
const deprecationMsg = "E! Error: this Riemann output plugin will be deprecated in a future release, see https://github.com/influxdata/telegraf/issues/1878 for more details & discussion."
|
||||
|
||||
type Riemann struct {
|
||||
URL string
|
||||
Transport string
|
||||
Separator string
|
||||
|
||||
client *raidman.Client
|
||||
}
|
||||
|
||||
var sampleConfig = `
|
||||
## URL of server
|
||||
url = "localhost:5555"
|
||||
## transport protocol to use either tcp or udp
|
||||
transport = "tcp"
|
||||
## separator to use between input name and field name in Riemann service name
|
||||
separator = " "
|
||||
`
|
||||
|
||||
func (r *Riemann) Connect() error {
|
||||
log.Printf(deprecationMsg)
|
||||
c, err := raidman.Dial(r.Transport, r.URL)
|
||||
|
||||
if err != nil {
|
||||
r.client = nil
|
||||
return err
|
||||
}
|
||||
|
||||
r.client = c
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Riemann) Close() error {
|
||||
if r.client == nil {
|
||||
return nil
|
||||
}
|
||||
r.client.Close()
|
||||
r.client = nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Riemann) SampleConfig() string {
|
||||
return sampleConfig
|
||||
}
|
||||
|
||||
func (r *Riemann) Description() string {
|
||||
return "Configuration for the Riemann server to send metrics to"
|
||||
}
|
||||
|
||||
func (r *Riemann) Write(metrics []telegraf.Metric) error {
|
||||
log.Printf(deprecationMsg)
|
||||
if len(metrics) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
if r.client == nil {
|
||||
err := r.Connect()
|
||||
if err != nil {
|
||||
return fmt.Errorf("FAILED to (re)connect to Riemann. Error: %s\n", err)
|
||||
}
|
||||
}
|
||||
|
||||
var events []*raidman.Event
|
||||
for _, p := range metrics {
|
||||
evs := buildEvents(p, r.Separator)
|
||||
for _, ev := range evs {
|
||||
events = append(events, ev)
|
||||
}
|
||||
}
|
||||
|
||||
var senderr = r.client.SendMulti(events)
|
||||
if senderr != nil {
|
||||
r.Close() // always retuns nil
|
||||
return fmt.Errorf("FAILED to send riemann message (will try to reconnect). Error: %s\n",
|
||||
senderr)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func buildEvents(p telegraf.Metric, s string) []*raidman.Event {
|
||||
events := []*raidman.Event{}
|
||||
for fieldName, value := range p.Fields() {
|
||||
host, ok := p.Tags()["host"]
|
||||
if !ok {
|
||||
hostname, err := os.Hostname()
|
||||
if err != nil {
|
||||
host = "unknown"
|
||||
} else {
|
||||
host = hostname
|
||||
}
|
||||
}
|
||||
|
||||
event := &raidman.Event{
|
||||
Host: host,
|
||||
Service: serviceName(s, p.Name(), p.Tags(), fieldName),
|
||||
}
|
||||
|
||||
switch value.(type) {
|
||||
case string:
|
||||
event.State = value.(string)
|
||||
default:
|
||||
event.Metric = value
|
||||
}
|
||||
|
||||
events = append(events, event)
|
||||
}
|
||||
|
||||
return events
|
||||
}
|
||||
|
||||
func serviceName(s string, n string, t map[string]string, f string) string {
|
||||
serviceStrings := []string{}
|
||||
serviceStrings = append(serviceStrings, n)
|
||||
|
||||
// we'll skip the 'host' tag
|
||||
tagStrings := []string{}
|
||||
tagNames := []string{}
|
||||
|
||||
for tagName := range t {
|
||||
tagNames = append(tagNames, tagName)
|
||||
}
|
||||
sort.Strings(tagNames)
|
||||
|
||||
for _, tagName := range tagNames {
|
||||
if tagName != "host" {
|
||||
tagStrings = append(tagStrings, t[tagName])
|
||||
}
|
||||
}
|
||||
var tagString string = strings.Join(tagStrings, s)
|
||||
if tagString != "" {
|
||||
serviceStrings = append(serviceStrings, tagString)
|
||||
}
|
||||
serviceStrings = append(serviceStrings, f)
|
||||
return strings.Join(serviceStrings, s)
|
||||
}
|
||||
|
||||
func init() {
|
||||
outputs.Add("riemann_legacy", func() telegraf.Output {
|
||||
return &Riemann{}
|
||||
})
|
||||
}
|
|
@ -0,0 +1,27 @@
|
|||
package riemann_legacy
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestConnectAndWrite(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("Skipping integration test in short mode")
|
||||
}
|
||||
|
||||
url := testutil.GetLocalHost() + ":5555"
|
||||
|
||||
r := &Riemann{
|
||||
URL: url,
|
||||
Transport: "tcp",
|
||||
}
|
||||
|
||||
err := r.Connect()
|
||||
require.NoError(t, err)
|
||||
|
||||
err = r.Write(testutil.MockMetrics())
|
||||
require.NoError(t, err)
|
||||
}
|
Loading…
Reference in New Issue