telegraf/plugins/outputs/riemann/riemann.go

226 lines
5.2 KiB
Go

package riemann
import (
"fmt"
"log"
"net/url"
"os"
"sort"
"strings"
"time"
"github.com/amir/raidman"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/outputs"
)
type Riemann struct {
URL string
TTL float32
Separator string
MeasurementAsAttribute bool
StringAsState bool
TagKeys []string
Tags []string
DescriptionText string
Timeout internal.Duration
client *raidman.Client
}
var sampleConfig = `
## The full TCP or UDP URL of the Riemann server
url = "tcp://localhost:5555"
## Riemann event TTL, floating-point time in seconds.
## Defines how long that an event is considered valid for in Riemann
# ttl = 30.0
## Separator to use between measurement and field name in Riemann service name
## This does not have any effect if 'measurement_as_attribute' is set to 'true'
separator = "/"
## Set measurement name as Riemann attribute 'measurement', instead of prepending it to the Riemann service name
# measurement_as_attribute = false
## Send string metrics as Riemann event states.
## Unless enabled all string metrics will be ignored
# string_as_state = false
## A list of tag keys whose values get sent as Riemann tags.
## If empty, all Telegraf tag values will be sent as tags
# tag_keys = ["telegraf","custom_tag"]
## Additional Riemann tags to send.
# tags = ["telegraf-output"]
## Description for Riemann event
# description_text = "metrics collected from telegraf"
## Riemann client write timeout, defaults to "5s" if not set.
# timeout = "5s"
`
func (r *Riemann) Connect() error {
parsed_url, err := url.Parse(r.URL)
if err != nil {
return err
}
client, err := raidman.DialWithTimeout(parsed_url.Scheme, parsed_url.Host, r.Timeout.Duration)
if err != nil {
r.client = nil
return err
}
r.client = client
return nil
}
func (r *Riemann) Close() error {
if r.client != nil {
r.client.Close()
r.client = nil
}
return nil
}
func (r *Riemann) SampleConfig() string {
return sampleConfig
}
func (r *Riemann) Description() string {
return "Configuration for the Riemann server to send metrics to"
}
func (r *Riemann) Write(metrics []telegraf.Metric) error {
if len(metrics) == 0 {
return nil
}
if r.client == nil {
if err := r.Connect(); err != nil {
return fmt.Errorf("Failed to (re)connect to Riemann: %s", err.Error())
}
}
// build list of Riemann events to send
var events []*raidman.Event
for _, m := range metrics {
evs := r.buildRiemannEvents(m)
for _, ev := range evs {
events = append(events, ev)
}
}
if err := r.client.SendMulti(events); err != nil {
r.Close()
return fmt.Errorf("Failed to send riemann message: %s", err)
}
return nil
}
func (r *Riemann) buildRiemannEvents(m telegraf.Metric) []*raidman.Event {
events := []*raidman.Event{}
for fieldName, value := range m.Fields() {
// get host for Riemann event
host, ok := m.Tags()["host"]
if !ok {
if hostname, err := os.Hostname(); err == nil {
host = hostname
} else {
host = "unknown"
}
}
event := &raidman.Event{
Host: host,
Ttl: r.TTL,
Description: r.DescriptionText,
Time: m.Time().Unix(),
Attributes: r.attributes(m.Name(), m.Tags()),
Service: r.service(m.Name(), fieldName),
Tags: r.tags(m.Tags()),
}
switch value.(type) {
case string:
// only send string metrics if explicitly enabled, skip otherwise
if !r.StringAsState {
log.Printf("D! Riemann event states disabled, skipping metric value [%s]\n", value)
continue
}
event.State = value.(string)
case int, int64, uint64, float32, float64:
event.Metric = value
default:
log.Printf("D! Riemann does not support metric value [%s]\n", value)
continue
}
events = append(events, event)
}
return events
}
func (r *Riemann) attributes(name string, tags map[string]string) map[string]string {
if r.MeasurementAsAttribute {
tags["measurement"] = name
}
delete(tags, "host") // exclude 'host' tag
return tags
}
func (r *Riemann) service(name string, field string) string {
var serviceStrings []string
// if measurement is not enabled as an attribute then prepend it to service name
if !r.MeasurementAsAttribute {
serviceStrings = append(serviceStrings, name)
}
serviceStrings = append(serviceStrings, field)
return strings.Join(serviceStrings, r.Separator)
}
func (r *Riemann) tags(tags map[string]string) []string {
// always add specified Riemann tags
values := r.Tags
// if tag_keys are specified, add those and return tag list
if len(r.TagKeys) > 0 {
for _, tagName := range r.TagKeys {
value, ok := tags[tagName]
if ok {
values = append(values, value)
}
}
return values
}
// otherwise add all values from telegraf tag key/value pairs
var keys []string
for key := range tags {
keys = append(keys, key)
}
sort.Strings(keys)
for _, key := range keys {
if key != "host" { // exclude 'host' tag
values = append(values, tags[key])
}
}
return values
}
func init() {
outputs.Add("riemann", func() telegraf.Output {
return &Riemann{
Timeout: internal.Duration{Duration: time.Second * 5},
}
})
}