statsd: allow template parsing fields. Default to value=

closes #602
This commit is contained in:
Cameron Sparr 2016-01-28 16:09:41 -07:00 committed by Ryan Merrick
parent 856745f873
commit 7a5a652e57
3 changed files with 218 additions and 58 deletions

View File

@ -12,6 +12,7 @@
- [#595](https://github.com/influxdata/telegraf/issues/595): graphite output should include tags to separate duplicate measurements. - [#595](https://github.com/influxdata/telegraf/issues/595): graphite output should include tags to separate duplicate measurements.
- [#599](https://github.com/influxdata/telegraf/issues/599): datadog plugin tags not working. - [#599](https://github.com/influxdata/telegraf/issues/599): datadog plugin tags not working.
- [#600](https://github.com/influxdata/telegraf/issues/600): datadog measurement/field name parsing is wrong. - [#600](https://github.com/influxdata/telegraf/issues/600): datadog measurement/field name parsing is wrong.
- [#602](https://github.com/influxdata/telegraf/issues/602): Fix statsd field name templating.
## v0.10.1 [2016-01-27] ## v0.10.1 [2016-01-27]

View File

@ -9,6 +9,7 @@ import (
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"time"
"github.com/influxdata/influxdb/services/graphite" "github.com/influxdata/influxdb/services/graphite"
@ -51,6 +52,8 @@ type Statsd struct {
done chan struct{} done chan struct{}
// Cache gauges, counters & sets so they can be aggregated as they arrive // Cache gauges, counters & sets so they can be aggregated as they arrive
// gauges and counters map measurement/tags hash -> field name -> metrics
// sets and timings map measurement/tags hash -> metrics
gauges map[string]cachedgauge gauges map[string]cachedgauge
counters map[string]cachedcounter counters map[string]cachedcounter
sets map[string]cachedset sets map[string]cachedset
@ -80,6 +83,7 @@ func NewStatsd() *Statsd {
// One statsd metric, form is <bucket>:<value>|<mtype>|@<samplerate> // One statsd metric, form is <bucket>:<value>|<mtype>|@<samplerate>
type metric struct { type metric struct {
name string name string
field string
bucket string bucket string
hash string hash string
intvalue int64 intvalue int64
@ -91,21 +95,21 @@ type metric struct {
} }
type cachedset struct { type cachedset struct {
name string name string
set map[int64]bool fields map[string]map[int64]bool
tags map[string]string tags map[string]string
} }
type cachedgauge struct { type cachedgauge struct {
name string name string
value float64 fields map[string]interface{}
tags map[string]string tags map[string]string
} }
type cachedcounter struct { type cachedcounter struct {
name string name string
value int64 fields map[string]interface{}
tags map[string]string tags map[string]string
} }
type cachedtimings struct { type cachedtimings struct {
@ -160,6 +164,7 @@ func (_ *Statsd) SampleConfig() string {
func (s *Statsd) Gather(acc telegraf.Accumulator) error { func (s *Statsd) Gather(acc telegraf.Accumulator) error {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
now := time.Now()
for _, metric := range s.timings { for _, metric := range s.timings {
fields := make(map[string]interface{}) fields := make(map[string]interface{})
@ -172,28 +177,32 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error {
name := fmt.Sprintf("%v_percentile", percentile) name := fmt.Sprintf("%v_percentile", percentile)
fields[name] = metric.stats.Percentile(percentile) fields[name] = metric.stats.Percentile(percentile)
} }
acc.AddFields(metric.name, fields, metric.tags) acc.AddFields(metric.name, fields, metric.tags, now)
} }
if s.DeleteTimings { if s.DeleteTimings {
s.timings = make(map[string]cachedtimings) s.timings = make(map[string]cachedtimings)
} }
for _, metric := range s.gauges { for _, metric := range s.gauges {
acc.Add(metric.name, metric.value, metric.tags) acc.AddFields(metric.name, metric.fields, metric.tags, now)
} }
if s.DeleteGauges { if s.DeleteGauges {
s.gauges = make(map[string]cachedgauge) s.gauges = make(map[string]cachedgauge)
} }
for _, metric := range s.counters { for _, metric := range s.counters {
acc.Add(metric.name, metric.value, metric.tags) acc.AddFields(metric.name, metric.fields, metric.tags, now)
} }
if s.DeleteCounters { if s.DeleteCounters {
s.counters = make(map[string]cachedcounter) s.counters = make(map[string]cachedcounter)
} }
for _, metric := range s.sets { for _, metric := range s.sets {
acc.Add(metric.name, int64(len(metric.set)), metric.tags) fields := make(map[string]interface{})
for field, set := range metric.fields {
fields[field] = int64(len(set))
}
acc.AddFields(metric.name, fields, metric.tags, now)
} }
if s.DeleteSets { if s.DeleteSets {
s.sets = make(map[string]cachedset) s.sets = make(map[string]cachedset)
@ -358,7 +367,12 @@ func (s *Statsd) parseStatsdLine(line string) error {
} }
// Parse the name & tags from bucket // Parse the name & tags from bucket
m.name, m.tags = s.parseName(m.bucket) m.name, m.field, m.tags = s.parseName(m.bucket)
// fields are not supported for timings, so if specified combine into
// the name
if (m.mtype == "ms" || m.mtype == "h") && m.field != "value" {
m.name += "_" + m.field
}
switch m.mtype { switch m.mtype {
case "c": case "c":
m.tags["metric_type"] = "counter" m.tags["metric_type"] = "counter"
@ -389,8 +403,8 @@ func (s *Statsd) parseStatsdLine(line string) error {
// parseName parses the given bucket name with the list of bucket maps in the // parseName parses the given bucket name with the list of bucket maps in the
// config file. If there is a match, it will parse the name of the metric and // config file. If there is a match, it will parse the name of the metric and
// map of tags. // map of tags.
// Return values are (<name>, <tags>) // Return values are (<name>, <field>, <tags>)
func (s *Statsd) parseName(bucket string) (string, map[string]string) { func (s *Statsd) parseName(bucket string) (string, string, map[string]string) {
tags := make(map[string]string) tags := make(map[string]string)
bucketparts := strings.Split(bucket, ",") bucketparts := strings.Split(bucket, ",")
@ -410,17 +424,21 @@ func (s *Statsd) parseName(bucket string) (string, map[string]string) {
DefaultTags: tags, DefaultTags: tags,
} }
var field string
name := bucketparts[0] name := bucketparts[0]
p, err := graphite.NewParserWithOptions(o) p, err := graphite.NewParserWithOptions(o)
if err == nil { if err == nil {
name, tags, _, _ = p.ApplyTemplate(name) name, tags, field, _ = p.ApplyTemplate(name)
} }
if s.ConvertNames { if s.ConvertNames {
name = strings.Replace(name, ".", "_", -1) name = strings.Replace(name, ".", "_", -1)
name = strings.Replace(name, "-", "__", -1) name = strings.Replace(name, "-", "__", -1)
} }
if field == "" {
field = "value"
}
return name, tags return name, field, tags
} }
// Parse the key,value out of a string that looks like "key=value" // Parse the key,value out of a string that looks like "key=value"
@ -466,46 +484,59 @@ func (s *Statsd) aggregate(m metric) {
s.timings[m.hash] = cached s.timings[m.hash] = cached
} }
case "c": case "c":
cached, ok := s.counters[m.hash] // check if the measurement exists
_, ok := s.counters[m.hash]
if !ok { if !ok {
s.counters[m.hash] = cachedcounter{ s.counters[m.hash] = cachedcounter{
name: m.name, name: m.name,
value: m.intvalue, fields: make(map[string]interface{}),
tags: m.tags, tags: m.tags,
} }
} else {
cached.value += m.intvalue
s.counters[m.hash] = cached
} }
// check if the field exists
_, ok = s.counters[m.hash].fields[m.field]
if !ok {
s.counters[m.hash].fields[m.field] = int64(0)
}
s.counters[m.hash].fields[m.field] =
s.counters[m.hash].fields[m.field].(int64) + m.intvalue
case "g": case "g":
cached, ok := s.gauges[m.hash] // check if the measurement exists
_, ok := s.gauges[m.hash]
if !ok { if !ok {
s.gauges[m.hash] = cachedgauge{ s.gauges[m.hash] = cachedgauge{
name: m.name, name: m.name,
value: m.floatvalue, fields: make(map[string]interface{}),
tags: m.tags, tags: m.tags,
} }
}
// check if the field exists
_, ok = s.gauges[m.hash].fields[m.field]
if !ok {
s.gauges[m.hash].fields[m.field] = float64(0)
}
if m.additive {
s.gauges[m.hash].fields[m.field] =
s.gauges[m.hash].fields[m.field].(float64) + m.floatvalue
} else { } else {
if m.additive { s.gauges[m.hash].fields[m.field] = m.floatvalue
cached.value = cached.value + m.floatvalue
} else {
cached.value = m.floatvalue
}
s.gauges[m.hash] = cached
} }
case "s": case "s":
cached, ok := s.sets[m.hash] // check if the measurement exists
_, ok := s.sets[m.hash]
if !ok { if !ok {
// Completely new metric (initialize with count of 1)
s.sets[m.hash] = cachedset{ s.sets[m.hash] = cachedset{
name: m.name, name: m.name,
tags: m.tags, fields: make(map[string]map[int64]bool),
set: map[int64]bool{m.intvalue: true}, tags: m.tags,
} }
} else {
cached.set[m.intvalue] = true
s.sets[m.hash] = cached
} }
// check if the field exists
_, ok = s.sets[m.hash].fields[m.field]
if !ok {
s.sets[m.hash].fields[m.field] = make(map[int64]bool)
}
s.sets[m.hash].fields[m.field][m.intvalue] = true
} }
} }

View File

@ -243,6 +243,113 @@ func TestParse_TemplateSpecificity(t *testing.T) {
} }
} }
// Test that most specific template is chosen
func TestParse_TemplateFields(t *testing.T) {
s := NewStatsd()
s.Templates = []string{
"* measurement.measurement.field",
}
lines := []string{
"my.counter.f1:1|c",
"my.counter.f1:1|c",
"my.counter.f2:1|c",
"my.counter.f3:10|c",
"my.counter.f3:100|c",
"my.gauge.f1:10.1|g",
"my.gauge.f2:10.1|g",
"my.gauge.f1:0.9|g",
"my.set.f1:1|s",
"my.set.f1:2|s",
"my.set.f1:1|s",
"my.set.f2:100|s",
}
for _, line := range lines {
err := s.parseStatsdLine(line)
if err != nil {
t.Errorf("Parsing line %s should not have resulted in an error\n", line)
}
}
counter_tests := []struct {
name string
value int64
field string
}{
{
"my_counter",
2,
"f1",
},
{
"my_counter",
1,
"f2",
},
{
"my_counter",
110,
"f3",
},
}
// Validate counters
for _, test := range counter_tests {
err := test_validate_counter(test.name, test.value, s.counters, test.field)
if err != nil {
t.Error(err.Error())
}
}
gauge_tests := []struct {
name string
value float64
field string
}{
{
"my_gauge",
0.9,
"f1",
},
{
"my_gauge",
10.1,
"f2",
},
}
// Validate gauges
for _, test := range gauge_tests {
err := test_validate_gauge(test.name, test.value, s.gauges, test.field)
if err != nil {
t.Error(err.Error())
}
}
set_tests := []struct {
name string
value int64
field string
}{
{
"my_set",
2,
"f1",
},
{
"my_set",
1,
"f2",
},
}
// Validate sets
for _, test := range set_tests {
err := test_validate_set(test.name, test.value, s.sets, test.field)
if err != nil {
t.Error(err.Error())
}
}
}
// Test that fields are parsed correctly // Test that fields are parsed correctly
func TestParse_Fields(t *testing.T) { func TestParse_Fields(t *testing.T) {
if false { if false {
@ -286,7 +393,7 @@ func TestParse_Tags(t *testing.T) {
} }
for _, test := range tests { for _, test := range tests {
name, tags := s.parseName(test.bucket) name, _, tags := s.parseName(test.bucket)
if name != test.name { if name != test.name {
t.Errorf("Expected: %s, got %s", test.name, name) t.Errorf("Expected: %s, got %s", test.name, name)
} }
@ -326,7 +433,7 @@ func TestParseName(t *testing.T) {
} }
for _, test := range tests { for _, test := range tests {
name, _ := s.parseName(test.in_name) name, _, _ := s.parseName(test.in_name)
if name != test.out_name { if name != test.out_name {
t.Errorf("Expected: %s, got %s", test.out_name, name) t.Errorf("Expected: %s, got %s", test.out_name, name)
} }
@ -354,7 +461,7 @@ func TestParseName(t *testing.T) {
} }
for _, test := range tests { for _, test := range tests {
name, _ := s.parseName(test.in_name) name, _, _ := s.parseName(test.in_name)
if name != test.out_name { if name != test.out_name {
t.Errorf("Expected: %s, got %s", test.out_name, name) t.Errorf("Expected: %s, got %s", test.out_name, name)
} }
@ -863,7 +970,14 @@ func test_validate_set(
name string, name string,
value int64, value int64,
cache map[string]cachedset, cache map[string]cachedset,
field ...string,
) error { ) error {
var f string
if len(field) > 0 {
f = field[0]
} else {
f = "value"
}
var metric cachedset var metric cachedset
var found bool var found bool
for _, v := range cache { for _, v := range cache {
@ -877,23 +991,30 @@ func test_validate_set(
return errors.New(fmt.Sprintf("Test Error: Metric name %s not found\n", name)) return errors.New(fmt.Sprintf("Test Error: Metric name %s not found\n", name))
} }
if value != int64(len(metric.set)) { if value != int64(len(metric.fields[f])) {
return errors.New(fmt.Sprintf("Measurement: %s, expected %d, actual %d\n", return errors.New(fmt.Sprintf("Measurement: %s, expected %d, actual %d\n",
name, value, len(metric.set))) name, value, len(metric.fields[f])))
} }
return nil return nil
} }
func test_validate_counter( func test_validate_counter(
name string, name string,
value int64, valueExpected int64,
cache map[string]cachedcounter, cache map[string]cachedcounter,
field ...string,
) error { ) error {
var metric cachedcounter var f string
if len(field) > 0 {
f = field[0]
} else {
f = "value"
}
var valueActual int64
var found bool var found bool
for _, v := range cache { for _, v := range cache {
if v.name == name { if v.name == name {
metric = v valueActual = v.fields[f].(int64)
found = true found = true
break break
} }
@ -902,23 +1023,30 @@ func test_validate_counter(
return errors.New(fmt.Sprintf("Test Error: Metric name %s not found\n", name)) return errors.New(fmt.Sprintf("Test Error: Metric name %s not found\n", name))
} }
if value != metric.value { if valueExpected != valueActual {
return errors.New(fmt.Sprintf("Measurement: %s, expected %d, actual %d\n", return errors.New(fmt.Sprintf("Measurement: %s, expected %d, actual %d\n",
name, value, metric.value)) name, valueExpected, valueActual))
} }
return nil return nil
} }
func test_validate_gauge( func test_validate_gauge(
name string, name string,
value float64, valueExpected float64,
cache map[string]cachedgauge, cache map[string]cachedgauge,
field ...string,
) error { ) error {
var metric cachedgauge var f string
if len(field) > 0 {
f = field[0]
} else {
f = "value"
}
var valueActual float64
var found bool var found bool
for _, v := range cache { for _, v := range cache {
if v.name == name { if v.name == name {
metric = v valueActual = v.fields[f].(float64)
found = true found = true
break break
} }
@ -927,9 +1055,9 @@ func test_validate_gauge(
return errors.New(fmt.Sprintf("Test Error: Metric name %s not found\n", name)) return errors.New(fmt.Sprintf("Test Error: Metric name %s not found\n", name))
} }
if value != metric.value { if valueExpected != valueActual {
return errors.New(fmt.Sprintf("Measurement: %s, expected %f, actual %f\n", return errors.New(fmt.Sprintf("Measurement: %s, expected %f, actual %f\n",
name, value, metric.value)) name, valueExpected, valueActual))
} }
return nil return nil
} }