From aa4a62ea5d8277cc993fb835cf61ce91c0160ddf Mon Sep 17 00:00:00 2001 From: Adam Argo Date: Thu, 24 Mar 2016 14:31:23 -0700 Subject: [PATCH] adds ability to parse datadog-formatted tags in the statsd input --- plugins/inputs/statsd/README.md | 3 ++ plugins/inputs/statsd/statsd.go | 50 ++++++++++++++++++ plugins/inputs/statsd/statsd_test.go | 78 ++++++++++++++++++++++++++++ 3 files changed, 131 insertions(+) diff --git a/plugins/inputs/statsd/README.md b/plugins/inputs/statsd/README.md index 5bb18657c..5156f90df 100644 --- a/plugins/inputs/statsd/README.md +++ b/plugins/inputs/statsd/README.md @@ -21,6 +21,9 @@ ## convert measurement names, "." to "_" and "-" to "__" convert_names = true + ## parses tags in the datadog statsd format + parse_data_dog_tags = false + ## Statsd data translation templates, more info can be read here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#graphite # templates = [ diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go index 5e1e85667..b113faa6d 100644 --- a/plugins/inputs/statsd/statsd.go +++ b/plugins/inputs/statsd/statsd.go @@ -47,6 +47,10 @@ type Statsd struct { DeleteTimings bool ConvertNames bool + // This flag enables parsing of tags in the dogstatsd extention to the + // statsd protocol (http://docs.datadoghq.com/guides/dogstatsd/) + ParseDataDogTags bool + // UDPPacketSize is the size of the read packets for the server listening // for statsd UDP packets. This will default to 1500 bytes. UDPPacketSize int `toml:"udp_packet_size"` @@ -148,6 +152,9 @@ const sampleConfig = ` ## convert measurement names, "." to "_" and "-" to "__" convert_names = true + ## parses tags in the datadog statsd format + parse_data_dog_tags = false + ## Statsd data translation templates, more info can be read here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#graphite # templates = [ @@ -318,6 +325,43 @@ func (s *Statsd) parseStatsdLine(line string) error { s.Lock() defer s.Unlock() + lineTags := make(map[string]string) + if s.ParseDataDogTags { + recombinedSegments := make([]string, 0) + // datadog tags look like this: + // users.online:1|c|@0.5|#country:china,environment:production + // users.online:1|c|#sometagwithnovalue + // we will split on the pipe and remove any elements that are datadog + // tags, parse them, and rebuild the line sans the datadog tags + pipesplit := strings.Split(line, "|") + for _, segment := range pipesplit { + if len(segment) > 0 && segment[0] == '#' { + // we have ourselves a tag; they are comma serated + tagstr := segment[1:] + tags := strings.Split(tagstr, ",") + for _, tag := range tags { + ts := strings.Split(tag, ":") + var k, v string + switch len(ts) { + case 1: + // just a tag + k = ts[0] + v = "" + case 2: + k = ts[0] + v = ts[1] + } + if k != "" { + lineTags[k] = v + } + } + } else { + recombinedSegments = append(recombinedSegments, segment) + } + } + line = strings.Join(recombinedSegments, "|") + } + // Validate splitting the line on ":" bits := strings.Split(line, ":") if len(bits) < 2 { @@ -415,6 +459,12 @@ func (s *Statsd) parseStatsdLine(line string) error { m.tags["metric_type"] = "histogram" } + if len(lineTags) > 0 { + for k, v := range lineTags { + m.tags[k] = v + } + } + // Make a unique key for the measurement name/tags var tg []string for k, v := range m.tags { diff --git a/plugins/inputs/statsd/statsd_test.go b/plugins/inputs/statsd/statsd_test.go index 3a87f00aa..5dffdc9cd 100644 --- a/plugins/inputs/statsd/statsd_test.go +++ b/plugins/inputs/statsd/statsd_test.go @@ -410,6 +410,84 @@ func TestParse_Tags(t *testing.T) { } } +// Test that DataDog tags are parsed +func TestParse_DataDogTags(t *testing.T) { + s := NewStatsd() + s.ParseDataDogTags = true + + lines := []string{ + "my_counter:1|c|#host:localhost,environment:prod", + "my_gauge:10.1|g|#live", + "my_set:1|s|#host:localhost", + "my_timer:3|ms|@0.1|#live,host:localhost", + } + + testTags := map[string]map[string]string{ + "my_counter": map[string]string{ + "host": "localhost", + "environment": "prod", + }, + + "my_gauge": map[string]string{ + "live": "", + }, + + "my_set": map[string]string{ + "host": "localhost", + }, + + "my_timer": map[string]string{ + "live": "", + "host": "localhost", + }, + } + + for _, line := range lines { + err := s.parseStatsdLine(line) + if err != nil { + t.Errorf("Parsing line %s should not have resulted in an error\n", line) + } + } + + sourceTags := map[string]map[string]string{ + "my_gauge": tagsForItem(s.gauges), + "my_counter": tagsForItem(s.counters), + "my_set": tagsForItem(s.sets), + "my_timer": tagsForItem(s.timings), + } + + for statName, tags := range testTags { + for k, v := range tags { + otherValue := sourceTags[statName][k] + if sourceTags[statName][k] != v { + t.Errorf("Error with %s, tag %s: %s != %s", statName, k, v, otherValue) + } + } + } +} + +func tagsForItem(m interface{}) map[string]string { + switch m.(type) { + case map[string]cachedcounter: + for _, v := range m.(map[string]cachedcounter) { + return v.tags + } + case map[string]cachedgauge: + for _, v := range m.(map[string]cachedgauge) { + return v.tags + } + case map[string]cachedset: + for _, v := range m.(map[string]cachedset) { + return v.tags + } + case map[string]cachedtimings: + for _, v := range m.(map[string]cachedtimings) { + return v.tags + } + } + return nil +} + // Test that statsd buckets are parsed to measurement names properly func TestParseName(t *testing.T) { s := NewStatsd()