adds ability to parse datadog-formatted tags in the statsd input
This commit is contained in:
parent
474d6db42f
commit
59085f072a
|
@ -21,6 +21,9 @@
|
|||
## convert measurement names, "." to "_" and "-" to "__"
|
||||
convert_names = true
|
||||
|
||||
## parses tags in the datadog statsd format
|
||||
parse_data_dog_tags = false
|
||||
|
||||
## Statsd data translation templates, more info can be read here:
|
||||
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#graphite
|
||||
# templates = [
|
||||
|
|
|
@ -47,6 +47,10 @@ type Statsd struct {
|
|||
DeleteTimings bool
|
||||
ConvertNames bool
|
||||
|
||||
// This flag enables parsing of tags in the dogstatsd extention to the
|
||||
// statsd protocol (http://docs.datadoghq.com/guides/dogstatsd/)
|
||||
ParseDataDogTags bool
|
||||
|
||||
// UDPPacketSize is the size of the read packets for the server listening
|
||||
// for statsd UDP packets. This will default to 1500 bytes.
|
||||
UDPPacketSize int `toml:"udp_packet_size"`
|
||||
|
@ -148,6 +152,9 @@ const sampleConfig = `
|
|||
## convert measurement names, "." to "_" and "-" to "__"
|
||||
convert_names = true
|
||||
|
||||
## parses tags in the datadog statsd format
|
||||
parse_data_dog_tags = false
|
||||
|
||||
## Statsd data translation templates, more info can be read here:
|
||||
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#graphite
|
||||
# templates = [
|
||||
|
@ -318,6 +325,43 @@ func (s *Statsd) parseStatsdLine(line string) error {
|
|||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
lineTags := make(map[string]string)
|
||||
if s.ParseDataDogTags {
|
||||
recombinedSegments := make([]string, 0)
|
||||
// datadog tags look like this:
|
||||
// users.online:1|c|@0.5|#country:china,environment:production
|
||||
// users.online:1|c|#sometagwithnovalue
|
||||
// we will split on the pipe and remove any elements that are datadog
|
||||
// tags, parse them, and rebuild the line sans the datadog tags
|
||||
pipesplit := strings.Split(line, "|")
|
||||
for _, segment := range pipesplit {
|
||||
if len(segment) > 0 && segment[0] == '#' {
|
||||
// we have ourselves a tag; they are comma serated
|
||||
tagstr := segment[1:]
|
||||
tags := strings.Split(tagstr, ",")
|
||||
for _, tag := range tags {
|
||||
ts := strings.Split(tag, ":")
|
||||
var k, v string
|
||||
switch len(ts) {
|
||||
case 1:
|
||||
// just a tag
|
||||
k = ts[0]
|
||||
v = ""
|
||||
case 2:
|
||||
k = ts[0]
|
||||
v = ts[1]
|
||||
}
|
||||
if k != "" {
|
||||
lineTags[k] = v
|
||||
}
|
||||
}
|
||||
} else {
|
||||
recombinedSegments = append(recombinedSegments, segment)
|
||||
}
|
||||
}
|
||||
line = strings.Join(recombinedSegments, "|")
|
||||
}
|
||||
|
||||
// Validate splitting the line on ":"
|
||||
bits := strings.Split(line, ":")
|
||||
if len(bits) < 2 {
|
||||
|
@ -415,6 +459,12 @@ func (s *Statsd) parseStatsdLine(line string) error {
|
|||
m.tags["metric_type"] = "histogram"
|
||||
}
|
||||
|
||||
if len(lineTags) > 0 {
|
||||
for k, v := range lineTags {
|
||||
m.tags[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
// Make a unique key for the measurement name/tags
|
||||
var tg []string
|
||||
for k, v := range m.tags {
|
||||
|
|
|
@ -410,6 +410,84 @@ func TestParse_Tags(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Test that DataDog tags are parsed
|
||||
func TestParse_DataDogTags(t *testing.T) {
|
||||
s := NewStatsd()
|
||||
s.ParseDataDogTags = true
|
||||
|
||||
lines := []string{
|
||||
"my_counter:1|c|#host:localhost,environment:prod",
|
||||
"my_gauge:10.1|g|#live",
|
||||
"my_set:1|s|#host:localhost",
|
||||
"my_timer:3|ms|@0.1|#live,host:localhost",
|
||||
}
|
||||
|
||||
testTags := map[string]map[string]string{
|
||||
"my_counter": map[string]string{
|
||||
"host": "localhost",
|
||||
"environment": "prod",
|
||||
},
|
||||
|
||||
"my_gauge": map[string]string{
|
||||
"live": "",
|
||||
},
|
||||
|
||||
"my_set": map[string]string{
|
||||
"host": "localhost",
|
||||
},
|
||||
|
||||
"my_timer": map[string]string{
|
||||
"live": "",
|
||||
"host": "localhost",
|
||||
},
|
||||
}
|
||||
|
||||
for _, line := range lines {
|
||||
err := s.parseStatsdLine(line)
|
||||
if err != nil {
|
||||
t.Errorf("Parsing line %s should not have resulted in an error\n", line)
|
||||
}
|
||||
}
|
||||
|
||||
sourceTags := map[string]map[string]string{
|
||||
"my_gauge": tagsForItem(s.gauges),
|
||||
"my_counter": tagsForItem(s.counters),
|
||||
"my_set": tagsForItem(s.sets),
|
||||
"my_timer": tagsForItem(s.timings),
|
||||
}
|
||||
|
||||
for statName, tags := range testTags {
|
||||
for k, v := range tags {
|
||||
otherValue := sourceTags[statName][k]
|
||||
if sourceTags[statName][k] != v {
|
||||
t.Errorf("Error with %s, tag %s: %s != %s", statName, k, v, otherValue)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func tagsForItem(m interface{}) map[string]string {
|
||||
switch m.(type) {
|
||||
case map[string]cachedcounter:
|
||||
for _, v := range m.(map[string]cachedcounter) {
|
||||
return v.tags
|
||||
}
|
||||
case map[string]cachedgauge:
|
||||
for _, v := range m.(map[string]cachedgauge) {
|
||||
return v.tags
|
||||
}
|
||||
case map[string]cachedset:
|
||||
for _, v := range m.(map[string]cachedset) {
|
||||
return v.tags
|
||||
}
|
||||
case map[string]cachedtimings:
|
||||
for _, v := range m.(map[string]cachedtimings) {
|
||||
return v.tags
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Test that statsd buckets are parsed to measurement names properly
|
||||
func TestParseName(t *testing.T) {
|
||||
s := NewStatsd()
|
||||
|
|
Loading…
Reference in New Issue