adds ability to parse datadog-formatted tags in the statsd input

This commit is contained in:
Adam Argo 2016-03-24 14:31:23 -07:00
parent a95710ed0c
commit aa4a62ea5d
3 changed files with 131 additions and 0 deletions

View File

@ -21,6 +21,9 @@
## convert measurement names, "." to "_" and "-" to "__"
convert_names = true
## parses tags in the datadog statsd format
parse_data_dog_tags = false
## Statsd data translation templates, more info can be read here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#graphite
# templates = [

View File

@ -47,6 +47,10 @@ type Statsd struct {
DeleteTimings bool
ConvertNames bool
// This flag enables parsing of tags in the dogstatsd extention to the
// statsd protocol (http://docs.datadoghq.com/guides/dogstatsd/)
ParseDataDogTags bool
// UDPPacketSize is the size of the read packets for the server listening
// for statsd UDP packets. This will default to 1500 bytes.
UDPPacketSize int `toml:"udp_packet_size"`
@ -148,6 +152,9 @@ const sampleConfig = `
## convert measurement names, "." to "_" and "-" to "__"
convert_names = true
## parses tags in the datadog statsd format
parse_data_dog_tags = false
## Statsd data translation templates, more info can be read here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#graphite
# templates = [
@ -318,6 +325,43 @@ func (s *Statsd) parseStatsdLine(line string) error {
s.Lock()
defer s.Unlock()
lineTags := make(map[string]string)
if s.ParseDataDogTags {
recombinedSegments := make([]string, 0)
// datadog tags look like this:
// users.online:1|c|@0.5|#country:china,environment:production
// users.online:1|c|#sometagwithnovalue
// we will split on the pipe and remove any elements that are datadog
// tags, parse them, and rebuild the line sans the datadog tags
pipesplit := strings.Split(line, "|")
for _, segment := range pipesplit {
if len(segment) > 0 && segment[0] == '#' {
// we have ourselves a tag; they are comma serated
tagstr := segment[1:]
tags := strings.Split(tagstr, ",")
for _, tag := range tags {
ts := strings.Split(tag, ":")
var k, v string
switch len(ts) {
case 1:
// just a tag
k = ts[0]
v = ""
case 2:
k = ts[0]
v = ts[1]
}
if k != "" {
lineTags[k] = v
}
}
} else {
recombinedSegments = append(recombinedSegments, segment)
}
}
line = strings.Join(recombinedSegments, "|")
}
// Validate splitting the line on ":"
bits := strings.Split(line, ":")
if len(bits) < 2 {
@ -415,6 +459,12 @@ func (s *Statsd) parseStatsdLine(line string) error {
m.tags["metric_type"] = "histogram"
}
if len(lineTags) > 0 {
for k, v := range lineTags {
m.tags[k] = v
}
}
// Make a unique key for the measurement name/tags
var tg []string
for k, v := range m.tags {

View File

@ -410,6 +410,84 @@ func TestParse_Tags(t *testing.T) {
}
}
// Test that DataDog tags are parsed
func TestParse_DataDogTags(t *testing.T) {
s := NewStatsd()
s.ParseDataDogTags = true
lines := []string{
"my_counter:1|c|#host:localhost,environment:prod",
"my_gauge:10.1|g|#live",
"my_set:1|s|#host:localhost",
"my_timer:3|ms|@0.1|#live,host:localhost",
}
testTags := map[string]map[string]string{
"my_counter": map[string]string{
"host": "localhost",
"environment": "prod",
},
"my_gauge": map[string]string{
"live": "",
},
"my_set": map[string]string{
"host": "localhost",
},
"my_timer": map[string]string{
"live": "",
"host": "localhost",
},
}
for _, line := range lines {
err := s.parseStatsdLine(line)
if err != nil {
t.Errorf("Parsing line %s should not have resulted in an error\n", line)
}
}
sourceTags := map[string]map[string]string{
"my_gauge": tagsForItem(s.gauges),
"my_counter": tagsForItem(s.counters),
"my_set": tagsForItem(s.sets),
"my_timer": tagsForItem(s.timings),
}
for statName, tags := range testTags {
for k, v := range tags {
otherValue := sourceTags[statName][k]
if sourceTags[statName][k] != v {
t.Errorf("Error with %s, tag %s: %s != %s", statName, k, v, otherValue)
}
}
}
}
func tagsForItem(m interface{}) map[string]string {
switch m.(type) {
case map[string]cachedcounter:
for _, v := range m.(map[string]cachedcounter) {
return v.tags
}
case map[string]cachedgauge:
for _, v := range m.(map[string]cachedgauge) {
return v.tags
}
case map[string]cachedset:
for _, v := range m.(map[string]cachedset) {
return v.tags
}
case map[string]cachedtimings:
for _, v := range m.(map[string]cachedtimings) {
return v.tags
}
}
return nil
}
// Test that statsd buckets are parsed to measurement names properly
func TestParseName(t *testing.T) {
s := NewStatsd()