Refactoring gauges to support floats, unit tests

This commit is contained in:
Cameron Sparr 2015-10-06 13:33:35 -06:00
parent d84a258b0a
commit d40351286a
3 changed files with 161 additions and 132 deletions

View File

@ -13,7 +13,7 @@ import (
// BatchPoints is used to send a batch of data in a single write from telegraf // BatchPoints is used to send a batch of data in a single write from telegraf
// to influx // to influx
type BatchPoints struct { type BatchPoints struct {
mu sync.Mutex sync.Mutex
client.BatchPoints client.BatchPoints
@ -71,8 +71,8 @@ func (bp *BatchPoints) Add(
val interface{}, val interface{},
tags map[string]string, tags map[string]string,
) { ) {
bp.mu.Lock() bp.Lock()
defer bp.mu.Unlock() defer bp.Unlock()
measurement = bp.Prefix + measurement measurement = bp.Prefix + measurement
@ -113,8 +113,8 @@ func (bp *BatchPoints) AddFieldsWithTime(
// TODO this function should add the fields with the timestamp, but that will // TODO this function should add the fields with the timestamp, but that will
// need to wait for the InfluxDB point precision/unit to be fixed // need to wait for the InfluxDB point precision/unit to be fixed
bp.AddFields(measurement, fields, tags) bp.AddFields(measurement, fields, tags)
// bp.mu.Lock() // bp.Lock()
// defer bp.mu.Unlock() // defer bp.Unlock()
// measurement = bp.Prefix + measurement // measurement = bp.Prefix + measurement
@ -158,8 +158,8 @@ func (bp *BatchPoints) AddFields(
fields map[string]interface{}, fields map[string]interface{},
tags map[string]string, tags map[string]string,
) { ) {
bp.mu.Lock() bp.Lock()
defer bp.mu.Unlock() defer bp.Unlock()
measurement = bp.Prefix + measurement measurement = bp.Prefix + measurement

View File

@ -34,9 +34,9 @@ type Statsd struct {
done chan struct{} done chan struct{}
// Cache gauges, counters & sets so they can be aggregated as they arrive // Cache gauges, counters & sets so they can be aggregated as they arrive
gauges map[string]cachedmetric gauges map[string]cachedgauge
counters map[string]cachedmetric counters map[string]cachedcounter
sets map[string]cachedmetric sets map[string]cachedset
Mappings []struct { Mappings []struct {
Match string Match string
@ -52,9 +52,9 @@ func NewStatsd() *Statsd {
s.done = make(chan struct{}) s.done = make(chan struct{})
s.in = make(chan string, s.AllowedPendingMessages) s.in = make(chan string, s.AllowedPendingMessages)
s.inmetrics = make(chan metric, s.AllowedPendingMessages) s.inmetrics = make(chan metric, s.AllowedPendingMessages)
s.gauges = make(map[string]cachedmetric) s.gauges = make(map[string]cachedgauge)
s.counters = make(map[string]cachedmetric) s.counters = make(map[string]cachedcounter)
s.sets = make(map[string]cachedmetric) s.sets = make(map[string]cachedset)
return &s return &s
} }
@ -63,19 +63,32 @@ func NewStatsd() *Statsd {
type metric struct { type metric struct {
name string name string
bucket string bucket string
value int64 intvalue int64
floatvalue float64
mtype string mtype string
additive bool additive bool
samplerate float64 samplerate float64
tags map[string]string tags map[string]string
} }
// cachedmetric is a subset of metric used specifically for storing cached type cachedset struct {
// gauges and counters, ready for sending to InfluxDB. set map[int64]bool
type cachedmetric struct { tags map[string]string
}
type cachedgauge struct {
value float64
tags map[string]string
}
type cachedcounter struct {
value int64 value int64
tags map[string]string tags map[string]string
set map[int64]bool }
type cachedtiming struct {
timings []float64
tags map[string]string
} }
func (_ *Statsd) Description() string { func (_ *Statsd) Description() string {
@ -105,7 +118,6 @@ func (s *Statsd) Gather(acc plugins.Accumulator) error {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
values := make(map[string]int64)
items := len(s.inmetrics) items := len(s.inmetrics)
for i := 0; i < items; i++ { for i := 0; i < items; i++ {
@ -123,26 +135,23 @@ func (s *Statsd) Gather(acc plugins.Accumulator) error {
acc.Add(name, cmetric.value, cmetric.tags) acc.Add(name, cmetric.value, cmetric.tags)
} }
if s.DeleteGauges { if s.DeleteGauges {
s.gauges = make(map[string]cachedmetric) s.gauges = make(map[string]cachedgauge)
} }
for name, cmetric := range s.counters { for name, cmetric := range s.counters {
acc.Add(name, cmetric.value, cmetric.tags) acc.Add(name, cmetric.value, cmetric.tags)
} }
if s.DeleteCounters { if s.DeleteCounters {
s.counters = make(map[string]cachedmetric) s.counters = make(map[string]cachedcounter)
} }
for name, cmetric := range s.sets { for name, cmetric := range s.sets {
acc.Add(name, cmetric.value, cmetric.tags) acc.Add(name, int64(len(cmetric.set)), cmetric.tags)
} }
if s.DeleteSets { if s.DeleteSets {
s.sets = make(map[string]cachedmetric) s.sets = make(map[string]cachedset)
} }
for name, value := range values {
acc.Add(name, value, nil)
}
return nil return nil
} }
@ -153,9 +162,9 @@ func (s *Statsd) Start() error {
s.done = make(chan struct{}) s.done = make(chan struct{})
s.in = make(chan string, s.AllowedPendingMessages) s.in = make(chan string, s.AllowedPendingMessages)
s.inmetrics = make(chan metric, s.AllowedPendingMessages) s.inmetrics = make(chan metric, s.AllowedPendingMessages)
s.gauges = make(map[string]cachedmetric) s.gauges = make(map[string]cachedgauge)
s.counters = make(map[string]cachedmetric) s.counters = make(map[string]cachedcounter)
s.sets = make(map[string]cachedmetric) s.sets = make(map[string]cachedset)
// Start the UDP listener // Start the UDP listener
go s.udpListen() go s.udpListen()
@ -267,6 +276,16 @@ func (s *Statsd) parseStatsdLine(line string) error {
} }
m.additive = true m.additive = true
} }
switch m.mtype {
case "g", "ms", "h":
v, err := strconv.ParseFloat(parts2[1], 64)
if err != nil {
log.Printf("Error: parsing value to float64: %s\n", line)
return errors.New("Error Parsing statsd line")
}
m.floatvalue = v
case "c", "s":
v, err := strconv.ParseInt(parts2[1], 10, 64) v, err := strconv.ParseInt(parts2[1], 10, 64)
if err != nil { if err != nil {
log.Printf("Error: parsing value to int64: %s\n", line) log.Printf("Error: parsing value to int64: %s\n", line)
@ -276,7 +295,8 @@ func (s *Statsd) parseStatsdLine(line string) error {
if m.samplerate != 0 && m.mtype == "c" { if m.samplerate != 0 && m.mtype == "c" {
v = int64(float64(v) / m.samplerate) v = int64(float64(v) / m.samplerate)
} }
m.value = v m.intvalue = v
}
// Parse the name // Parse the name
m.name, m.tags = s.parseName(m) m.name, m.tags = s.parseName(m)
@ -301,7 +321,7 @@ func (s *Statsd) parseStatsdLine(line string) error {
// map of tags. // map of tags.
// Return values are (<name>, <tags>) // Return values are (<name>, <tags>)
func (s *Statsd) parseName(m metric) (string, map[string]string) { func (s *Statsd) parseName(m metric) (string, map[string]string) {
var tags map[string]string tags := make(map[string]string)
name := strings.Replace(m.bucket, ".", "_", -1) name := strings.Replace(m.bucket, ".", "_", -1)
name = strings.Replace(name, "-", "__", -1) name = strings.Replace(name, "-", "__", -1)
@ -325,13 +345,13 @@ func (s *Statsd) parseName(m metric) (string, map[string]string) {
switch m.mtype { switch m.mtype {
case "c": case "c":
name = name + "_counter" tags["metric_type"] = "counter"
case "g": case "g":
name = name + "_gauge" tags["metric_type"] = "gauge"
case "s": case "s":
name = name + "_set" tags["metric_type"] = "set"
case "ms", "h": case "ms", "h":
name = name + "_timer" tags["metric_type"] = "timer"
} }
return name, tags return name, tags
@ -363,27 +383,27 @@ func (s *Statsd) aggregate(m metric) {
case "c": case "c":
cached, ok := s.counters[m.name] cached, ok := s.counters[m.name]
if !ok { if !ok {
s.counters[m.name] = cachedmetric{ s.counters[m.name] = cachedcounter{
value: m.value, value: m.intvalue,
tags: m.tags, tags: m.tags,
} }
} else { } else {
cached.value += m.value cached.value += m.intvalue
cached.tags = m.tags cached.tags = m.tags
s.counters[m.name] = cached s.counters[m.name] = cached
} }
case "g": case "g":
cached, ok := s.gauges[m.name] cached, ok := s.gauges[m.name]
if !ok { if !ok {
s.gauges[m.name] = cachedmetric{ s.gauges[m.name] = cachedgauge{
value: m.value, value: m.floatvalue,
tags: m.tags, tags: m.tags,
} }
} else { } else {
if m.additive { if m.additive {
cached.value = cached.value + m.value cached.value = cached.value + m.floatvalue
} else { } else {
cached.value = m.value cached.value = m.floatvalue
} }
cached.tags = m.tags cached.tags = m.tags
s.gauges[m.name] = cached s.gauges[m.name] = cached
@ -392,22 +412,16 @@ func (s *Statsd) aggregate(m metric) {
cached, ok := s.sets[m.name] cached, ok := s.sets[m.name]
if !ok { if !ok {
// Completely new metric (initialize with count of 1) // Completely new metric (initialize with count of 1)
s.sets[m.name] = cachedmetric{ s.sets[m.name] = cachedset{
value: 1,
tags: m.tags, tags: m.tags,
set: map[int64]bool{m.value: true}, set: map[int64]bool{m.intvalue: true},
} }
} else { } else {
_, ok := s.sets[m.name].set[m.value] cached.set[m.intvalue] = true
if !ok {
// Metric exists, but value has not been counted
cached.value += 1
cached.set[m.value] = true
s.sets[m.name] = cached s.sets[m.name] = cached
} }
} }
} }
}
func (s *Statsd) Stop() { func (s *Statsd) Stop() {
s.Lock() s.Lock()

View File

@ -23,7 +23,6 @@ func TestParse_InvalidLines(t *testing.T) {
"invalid.value:foobar|c", "invalid.value:foobar|c",
"invalid.value:d11|c", "invalid.value:d11|c",
"invalid.value:1d1|c", "invalid.value:1d1|c",
"invalid.value:1.1|c",
} }
for _, line := range invalid_lines { for _, line := range invalid_lines {
err := s.parseStatsdLine(line) err := s.parseStatsdLine(line)
@ -50,39 +49,39 @@ func TestParse_InvalidSampleRate(t *testing.T) {
} }
} }
validations := []struct { counter_validations := []struct {
name string name string
value int64 value int64
cache map[string]cachedmetric cache map[string]cachedcounter
}{ }{
{ {
"invalid_sample_rate_counter", "invalid_sample_rate",
45, 45,
s.counters, s.counters,
}, },
{ {
"invalid_sample_rate_2_counter", "invalid_sample_rate_2",
45, 45,
s.counters, s.counters,
}, },
{
"invalid_sample_rate_gauge",
45,
s.gauges,
},
{
"invalid_sample_rate_set",
1,
s.sets,
},
} }
for _, test := range validations { for _, test := range counter_validations {
err := test_validate_value(test.name, test.value, test.cache) err := test_validate_counter(test.name, test.value, test.cache)
if err != nil { if err != nil {
t.Error(err.Error()) t.Error(err.Error())
} }
} }
err := test_validate_gauge("invalid_sample_rate", 45, s.gauges)
if err != nil {
t.Error(err.Error())
}
err = test_validate_set("invalid_sample_rate", 1, s.sets)
if err != nil {
t.Error(err.Error())
}
} }
// Names should be parsed like . -> _ and - -> __ // Names should be parsed like . -> _ and - -> __
@ -105,17 +104,17 @@ func TestParse_DefaultNameParsing(t *testing.T) {
value int64 value int64
}{ }{
{ {
"valid_counter", "valid",
1, 1,
}, },
{ {
"valid_foo__bar_counter", "valid_foo__bar",
11, 11,
}, },
} }
for _, test := range validations { for _, test := range validations {
err := test_validate_value(test.name, test.value, s.counters) err := test_validate_counter(test.name, test.value, s.counters)
if err != nil { if err != nil {
t.Error(err.Error()) t.Error(err.Error())
} }
@ -154,34 +153,12 @@ func TestParse_ValidLines(t *testing.T) {
t.Errorf("Parsing line %s should not have resulted in an error\n", line) t.Errorf("Parsing line %s should not have resulted in an error\n", line)
} }
} }
validations := []struct {
name string
value int64
cache map[string]cachedmetric
}{
{
"valid_counter",
45,
s.counters,
},
{
"valid_set",
1,
s.sets,
},
{
"valid_gauge",
45,
s.gauges,
},
} }
for _, test := range validations { // Test that floats are handled as expected for all metric types
err := test_validate_value(test.name, test.value, test.cache) func TestParse_Floats(t *testing.T) {
if err != nil { if false {
t.Error(err.Error()) t.Errorf("TODO")
}
} }
} }
@ -215,36 +192,36 @@ func TestParse_Gauges(t *testing.T) {
validations := []struct { validations := []struct {
name string name string
value int64 value float64
}{ }{
{ {
"plus_minus_gauge", "plus_minus",
120, 120,
}, },
{ {
"plus_plus_gauge", "plus_plus",
300, 300,
}, },
{ {
"minus_minus_gauge", "minus_minus",
-100, -100,
}, },
{ {
"lone_plus_gauge", "lone_plus",
100, 100,
}, },
{ {
"lone_minus_gauge", "lone_minus",
-100, -100,
}, },
{ {
"overwrite_gauge", "overwrite",
300, 300,
}, },
} }
for _, test := range validations { for _, test := range validations {
err := test_validate_value(test.name, test.value, s.gauges) err := test_validate_gauge(test.name, test.value, s.gauges)
if err != nil { if err != nil {
t.Error(err.Error()) t.Error(err.Error())
} }
@ -282,17 +259,17 @@ func TestParse_Sets(t *testing.T) {
value int64 value int64
}{ }{
{ {
"unique_user_ids_set", "unique_user_ids",
4, 4,
}, },
{ {
"oneuser_id_set", "oneuser_id",
1, 1,
}, },
} }
for _, test := range validations { for _, test := range validations {
err := test_validate_value(test.name, test.value, s.sets) err := test_validate_set(test.name, test.value, s.sets)
if err != nil { if err != nil {
t.Error(err.Error()) t.Error(err.Error())
} }
@ -328,25 +305,25 @@ func TestParse_Counters(t *testing.T) {
value int64 value int64
}{ }{
{ {
"small_inc_counter", "small_inc",
2, 2,
}, },
{ {
"big_inc_counter", "big_inc",
1100101, 1100101,
}, },
{ {
"zero_init_counter", "zero_init",
0, 0,
}, },
{ {
"sample_rate_counter", "sample_rate",
11, 11,
}, },
} }
for _, test := range validations { for _, test := range validations {
err := test_validate_value(test.name, test.value, s.counters) err := test_validate_counter(test.name, test.value, s.counters)
if err != nil { if err != nil {
t.Error(err.Error()) t.Error(err.Error())
} }
@ -373,14 +350,14 @@ func TestParse_Gauges_Delete(t *testing.T) {
t.Errorf("Parsing line %s should not have resulted in an error\n", line) t.Errorf("Parsing line %s should not have resulted in an error\n", line)
} }
err = test_validate_value("current_users_gauge", 100, s.gauges) err = test_validate_gauge("current_users", 100, s.gauges)
if err != nil { if err != nil {
t.Error(err.Error()) t.Error(err.Error())
} }
s.Gather(fakeacc) s.Gather(fakeacc)
err = test_validate_value("current_users_gauge", 100, s.gauges) err = test_validate_gauge("current_users", 100, s.gauges)
if err == nil { if err == nil {
t.Error("current_users_gauge metric should have been deleted") t.Error("current_users_gauge metric should have been deleted")
} }
@ -399,14 +376,14 @@ func TestParse_Sets_Delete(t *testing.T) {
t.Errorf("Parsing line %s should not have resulted in an error\n", line) t.Errorf("Parsing line %s should not have resulted in an error\n", line)
} }
err = test_validate_value("unique_user_ids_set", 1, s.sets) err = test_validate_set("unique_user_ids", 1, s.sets)
if err != nil { if err != nil {
t.Error(err.Error()) t.Error(err.Error())
} }
s.Gather(fakeacc) s.Gather(fakeacc)
err = test_validate_value("unique_user_ids_set", 1, s.sets) err = test_validate_set("unique_user_ids", 1, s.sets)
if err == nil { if err == nil {
t.Error("unique_user_ids_set metric should have been deleted") t.Error("unique_user_ids_set metric should have been deleted")
} }
@ -425,14 +402,14 @@ func TestParse_Counters_Delete(t *testing.T) {
t.Errorf("Parsing line %s should not have resulted in an error\n", line) t.Errorf("Parsing line %s should not have resulted in an error\n", line)
} }
err = test_validate_value("total_users_counter", 100, s.counters) err = test_validate_counter("total_users", 100, s.counters)
if err != nil { if err != nil {
t.Error(err.Error()) t.Error(err.Error())
} }
s.Gather(fakeacc) s.Gather(fakeacc)
err = test_validate_value("total_users_counter", 100, s.counters) err = test_validate_counter("total_users", 100, s.counters)
if err == nil { if err == nil {
t.Error("total_users_counter metric should have been deleted") t.Error("total_users_counter metric should have been deleted")
} }
@ -447,14 +424,52 @@ func TestListen(t *testing.T) {
// Test utility functions // Test utility functions
func test_validate_value(name string, value int64, cache map[string]cachedmetric) error { func test_validate_set(
name string,
value int64,
cache map[string]cachedset,
) error {
metric, ok := cache[name]
if !ok {
return errors.New(fmt.Sprintf("Test Error: Metric name %s not found\n", name))
}
if value != int64(len(metric.set)) {
return errors.New(fmt.Sprintf("Measurement: %s, expected %d, actual %d\n",
name, value, len(metric.set)))
}
return nil
}
func test_validate_counter(
name string,
value int64,
cache map[string]cachedcounter,
) error {
metric, ok := cache[name] metric, ok := cache[name]
if !ok { if !ok {
return errors.New(fmt.Sprintf("Test Error: Metric name %s not found\n", name)) return errors.New(fmt.Sprintf("Test Error: Metric name %s not found\n", name))
} }
if value != metric.value { if value != metric.value {
return errors.New(fmt.Sprintf("Measurement: %s, expected %d, actual %d", return errors.New(fmt.Sprintf("Measurement: %s, expected %d, actual %d\n",
name, value, metric.value))
}
return nil
}
func test_validate_gauge(
name string,
value float64,
cache map[string]cachedgauge,
) error {
metric, ok := cache[name]
if !ok {
return errors.New(fmt.Sprintf("Test Error: Metric name %s not found\n", name))
}
if value != metric.value {
return errors.New(fmt.Sprintf("Measurement: %s, expected %f, actual %f\n",
name, value, metric.value)) name, value, metric.value))
} }
return nil return nil