adds ability to parse datadog-formatted tags in the statsd input
This commit is contained in:
parent
1cc65cfa12
commit
e863201300
|
@ -21,6 +21,9 @@
|
||||||
## convert measurement names, "." to "_" and "-" to "__"
|
## convert measurement names, "." to "_" and "-" to "__"
|
||||||
convert_names = true
|
convert_names = true
|
||||||
|
|
||||||
|
## parses tags in the datadog statsd format
|
||||||
|
parse_data_dog_tags = false
|
||||||
|
|
||||||
## Statsd data translation templates, more info can be read here:
|
## Statsd data translation templates, more info can be read here:
|
||||||
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#graphite
|
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#graphite
|
||||||
# templates = [
|
# templates = [
|
||||||
|
|
|
@ -47,6 +47,10 @@ type Statsd struct {
|
||||||
DeleteTimings bool
|
DeleteTimings bool
|
||||||
ConvertNames bool
|
ConvertNames bool
|
||||||
|
|
||||||
|
// This flag enables parsing of tags in the dogstatsd extention to the
|
||||||
|
// statsd protocol (http://docs.datadoghq.com/guides/dogstatsd/)
|
||||||
|
ParseDataDogTags bool
|
||||||
|
|
||||||
// UDPPacketSize is the size of the read packets for the server listening
|
// UDPPacketSize is the size of the read packets for the server listening
|
||||||
// for statsd UDP packets. This will default to 1500 bytes.
|
// for statsd UDP packets. This will default to 1500 bytes.
|
||||||
UDPPacketSize int `toml:"udp_packet_size"`
|
UDPPacketSize int `toml:"udp_packet_size"`
|
||||||
|
@ -148,6 +152,9 @@ const sampleConfig = `
|
||||||
## convert measurement names, "." to "_" and "-" to "__"
|
## convert measurement names, "." to "_" and "-" to "__"
|
||||||
convert_names = true
|
convert_names = true
|
||||||
|
|
||||||
|
## parses tags in the datadog statsd format
|
||||||
|
parse_data_dog_tags = false
|
||||||
|
|
||||||
## Statsd data translation templates, more info can be read here:
|
## Statsd data translation templates, more info can be read here:
|
||||||
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#graphite
|
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#graphite
|
||||||
# templates = [
|
# templates = [
|
||||||
|
@ -318,6 +325,43 @@ func (s *Statsd) parseStatsdLine(line string) error {
|
||||||
s.Lock()
|
s.Lock()
|
||||||
defer s.Unlock()
|
defer s.Unlock()
|
||||||
|
|
||||||
|
lineTags := make(map[string]string)
|
||||||
|
if s.ParseDataDogTags {
|
||||||
|
recombinedSegments := make([]string, 0)
|
||||||
|
// datadog tags look like this:
|
||||||
|
// users.online:1|c|@0.5|#country:china,environment:production
|
||||||
|
// users.online:1|c|#sometagwithnovalue
|
||||||
|
// we will split on the pipe and remove any elements that are datadog
|
||||||
|
// tags, parse them, and rebuild the line sans the datadog tags
|
||||||
|
pipesplit := strings.Split(line, "|")
|
||||||
|
for _, segment := range pipesplit {
|
||||||
|
if len(segment) > 0 && segment[0] == '#' {
|
||||||
|
// we have ourselves a tag; they are comma serated
|
||||||
|
tagstr := segment[1:]
|
||||||
|
tags := strings.Split(tagstr, ",")
|
||||||
|
for _, tag := range tags {
|
||||||
|
ts := strings.Split(tag, ":")
|
||||||
|
var k, v string
|
||||||
|
switch len(ts) {
|
||||||
|
case 1:
|
||||||
|
// just a tag
|
||||||
|
k = ts[0]
|
||||||
|
v = ""
|
||||||
|
case 2:
|
||||||
|
k = ts[0]
|
||||||
|
v = ts[1]
|
||||||
|
}
|
||||||
|
if k != "" {
|
||||||
|
lineTags[k] = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
recombinedSegments = append(recombinedSegments, segment)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
line = strings.Join(recombinedSegments, "|")
|
||||||
|
}
|
||||||
|
|
||||||
// Validate splitting the line on ":"
|
// Validate splitting the line on ":"
|
||||||
bits := strings.Split(line, ":")
|
bits := strings.Split(line, ":")
|
||||||
if len(bits) < 2 {
|
if len(bits) < 2 {
|
||||||
|
@ -415,6 +459,12 @@ func (s *Statsd) parseStatsdLine(line string) error {
|
||||||
m.tags["metric_type"] = "histogram"
|
m.tags["metric_type"] = "histogram"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(lineTags) > 0 {
|
||||||
|
for k, v := range lineTags {
|
||||||
|
m.tags[k] = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Make a unique key for the measurement name/tags
|
// Make a unique key for the measurement name/tags
|
||||||
var tg []string
|
var tg []string
|
||||||
for k, v := range m.tags {
|
for k, v := range m.tags {
|
||||||
|
|
|
@ -410,6 +410,84 @@ func TestParse_Tags(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Test that DataDog tags are parsed
|
||||||
|
func TestParse_DataDogTags(t *testing.T) {
|
||||||
|
s := NewStatsd()
|
||||||
|
s.ParseDataDogTags = true
|
||||||
|
|
||||||
|
lines := []string{
|
||||||
|
"my_counter:1|c|#host:localhost,environment:prod",
|
||||||
|
"my_gauge:10.1|g|#live",
|
||||||
|
"my_set:1|s|#host:localhost",
|
||||||
|
"my_timer:3|ms|@0.1|#live,host:localhost",
|
||||||
|
}
|
||||||
|
|
||||||
|
testTags := map[string]map[string]string{
|
||||||
|
"my_counter": map[string]string{
|
||||||
|
"host": "localhost",
|
||||||
|
"environment": "prod",
|
||||||
|
},
|
||||||
|
|
||||||
|
"my_gauge": map[string]string{
|
||||||
|
"live": "",
|
||||||
|
},
|
||||||
|
|
||||||
|
"my_set": map[string]string{
|
||||||
|
"host": "localhost",
|
||||||
|
},
|
||||||
|
|
||||||
|
"my_timer": map[string]string{
|
||||||
|
"live": "",
|
||||||
|
"host": "localhost",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, line := range lines {
|
||||||
|
err := s.parseStatsdLine(line)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Parsing line %s should not have resulted in an error\n", line)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sourceTags := map[string]map[string]string{
|
||||||
|
"my_gauge": tagsForItem(s.gauges),
|
||||||
|
"my_counter": tagsForItem(s.counters),
|
||||||
|
"my_set": tagsForItem(s.sets),
|
||||||
|
"my_timer": tagsForItem(s.timings),
|
||||||
|
}
|
||||||
|
|
||||||
|
for statName, tags := range testTags {
|
||||||
|
for k, v := range tags {
|
||||||
|
otherValue := sourceTags[statName][k]
|
||||||
|
if sourceTags[statName][k] != v {
|
||||||
|
t.Errorf("Error with %s, tag %s: %s != %s", statName, k, v, otherValue)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func tagsForItem(m interface{}) map[string]string {
|
||||||
|
switch m.(type) {
|
||||||
|
case map[string]cachedcounter:
|
||||||
|
for _, v := range m.(map[string]cachedcounter) {
|
||||||
|
return v.tags
|
||||||
|
}
|
||||||
|
case map[string]cachedgauge:
|
||||||
|
for _, v := range m.(map[string]cachedgauge) {
|
||||||
|
return v.tags
|
||||||
|
}
|
||||||
|
case map[string]cachedset:
|
||||||
|
for _, v := range m.(map[string]cachedset) {
|
||||||
|
return v.tags
|
||||||
|
}
|
||||||
|
case map[string]cachedtimings:
|
||||||
|
for _, v := range m.(map[string]cachedtimings) {
|
||||||
|
return v.tags
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Test that statsd buckets are parsed to measurement names properly
|
// Test that statsd buckets are parsed to measurement names properly
|
||||||
func TestParseName(t *testing.T) {
|
func TestParseName(t *testing.T) {
|
||||||
s := NewStatsd()
|
s := NewStatsd()
|
||||||
|
|
Loading…
Reference in New Issue