Parse statsd lines with multiple metric bits

closes #354
Daniel Malon 2015-11-08 10:19:00 +00:00 committed by Cameron Sparr
parent 3761f00062
commit 5c051eb801
4 changed files with 238 additions and 83 deletions

View File

@@ -29,6 +29,7 @@ same type can be specified, like this:
- [#375](https://github.com/influxdb/telegraf/pull/375): kafka_consumer service plugin.
- [#392](https://github.com/influxdb/telegraf/pull/392): Procstat plugin can now accept pgrep -f pattern, thanks @ecarreras!
- [#383](https://github.com/influxdb/telegraf/pull/383): Specify plugins as a list.
- [#354](https://github.com/influxdb/telegraf/pull/354): Add ability to specify multiple metrics in one statsd line. Thanks @MerlinDMC!

### Bugfixes
- [#371](https://github.com/influxdb/telegraf/issues/371): Kafka consumer plugin not functioning.

View File

@@ -26,6 +26,22 @@ implementation. In short, the telegraf statsd listener will accept:
- `load.time.nanoseconds:1|h`
- `load.time:200|ms|@0.1` <- sampled 1/10 of the time
It is possible to omit repetitive names and merge individual stats into a
single line by separating them with additional colons:
- `users.current.den001.myapp:32|g:+10|g:-10|g`
- `deploys.test.myservice:1|c:101|c:1|c|@0.1`
- `users.unique:101|s:101|s:102|s`
- `load.time:320|ms:200|ms|@0.1`
This also allows for mixed types in a single line:
- `foo:1|c:200|ms`
The string `foo:1|c:200|ms` is internally split into the two individual metrics
`foo:1|c` and `foo:200|ms`, which are added to the aggregator separately.
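As a minimal illustration of this splitting rule (not the plugin's actual code; `splitStatsdLine` is a hypothetical helper), the text before the first `:` is the bucket name and every further `:`-separated piece is one metric bit:

```go
package main

import (
	"fmt"
	"strings"
)

// splitStatsdLine is a hypothetical helper mirroring the rule described above:
// the text before the first ":" is the bucket name, and each remaining
// ":"-separated piece is one metric bit ("<value>|<type>[|@<sample rate>]").
func splitStatsdLine(line string) (bucket string, bits []string) {
	parts := strings.Split(line, ":")
	if len(parts) < 2 {
		// Not a parseable statsd line; the real listener logs and rejects it.
		return "", nil
	}
	return parts[0], parts[1:]
}

func main() {
	bucket, bits := splitStatsdLine("foo:1|c:200|ms")
	fmt.Println(bucket, bits) // foo [1|c 200|ms]
}
```

Each resulting bit is then parsed on `|` just like a stand-alone metric would be.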
#### Influx Statsd

In order to take advantage of InfluxDB's tagging system, we have made a couple

View File

@@ -254,10 +254,24 @@ func (s *Statsd) parseStatsdLine(line string) error {
     s.Lock()
     defer s.Unlock()
+    // Validate splitting the line on ":"
+    bits := strings.Split(line, ":")
+    if len(bits) < 2 {
+        log.Printf("Error: splitting ':', Unable to parse metric: %s\n", line)
+        return errors.New("Error Parsing statsd line")
+    }
+    // Extract bucket name from individual metric bits
+    bucketName, bits := bits[0], bits[1:]
+    // Add a metric for each bit available
+    for _, bit := range bits {
         m := metric{}
-        // Validate splitting the line on "|"
-        pipesplit := strings.Split(line, "|")
+        m.bucket = bucketName
+        // Validate splitting the bit on "|"
+        pipesplit := strings.Split(bit, "|")
         if len(pipesplit) < 2 {
             log.Printf("Error: splitting '|', Unable to parse metric: %s\n", line)
             return errors.New("Error Parsing statsd line")
@@ -287,16 +301,8 @@ func (s *Statsd) parseStatsdLine(line string) error {
             return errors.New("Error Parsing statsd line")
         }
-        // Validate splitting the rest of the line on ":"
-        colonsplit := strings.Split(pipesplit[0], ":")
-        if len(colonsplit) != 2 {
-            log.Printf("Error: splitting ':', Unable to parse metric: %s\n", line)
-            return errors.New("Error Parsing statsd line")
-        }
-        m.bucket = colonsplit[0]
         // Parse the value
-        if strings.ContainsAny(colonsplit[1], "-+") {
+        if strings.ContainsAny(pipesplit[0], "-+") {
             if m.mtype != "g" {
                 log.Printf("Error: +- values are only supported for gauges: %s\n", line)
                 return errors.New("Error Parsing statsd line")
@@ -306,14 +312,14 @@ func (s *Statsd) parseStatsdLine(line string) error {
         switch m.mtype {
         case "g", "ms", "h":
-            v, err := strconv.ParseFloat(colonsplit[1], 64)
+            v, err := strconv.ParseFloat(pipesplit[0], 64)
             if err != nil {
                 log.Printf("Error: parsing value to float64: %s\n", line)
                 return errors.New("Error Parsing statsd line")
             }
             m.floatvalue = v
         case "c", "s":
-            v, err := strconv.ParseInt(colonsplit[1], 10, 64)
+            v, err := strconv.ParseInt(pipesplit[0], 10, 64)
             if err != nil {
                 log.Printf("Error: parsing value to int64: %s\n", line)
                 return errors.New("Error Parsing statsd line")
@@ -349,6 +355,8 @@ func (s *Statsd) parseStatsdLine(line string) error {
         m.hash = fmt.Sprintf("%s%s", strings.Join(tg, ""), m.name)
         s.aggregate(m)
+    }
     return nil
 }

View File

@@ -326,6 +326,136 @@ func TestParse_MeasurementsWithSameName(t *testing.T) {
}
}
// Test that measurements with multiple bits are treated as separate metrics
// but produce the same results as their single-measurement representation
func TestParse_MeasurementsWithMultipleValues(t *testing.T) {
single_lines := []string{
"valid.multiple:0|ms|@0.1",
"valid.multiple:0|ms|",
"valid.multiple:1|ms",
"valid.multiple.duplicate:1|c",
"valid.multiple.duplicate:1|c",
"valid.multiple.duplicate:2|c",
"valid.multiple.duplicate:1|c",
"valid.multiple.duplicate:1|h",
"valid.multiple.duplicate:1|h",
"valid.multiple.duplicate:2|h",
"valid.multiple.duplicate:1|h",
"valid.multiple.duplicate:1|s",
"valid.multiple.duplicate:1|s",
"valid.multiple.duplicate:2|s",
"valid.multiple.duplicate:1|s",
"valid.multiple.duplicate:1|g",
"valid.multiple.duplicate:1|g",
"valid.multiple.duplicate:2|g",
"valid.multiple.duplicate:1|g",
"valid.multiple.mixed:1|c",
"valid.multiple.mixed:1|ms",
"valid.multiple.mixed:2|s",
"valid.multiple.mixed:1|g",
}
multiple_lines := []string{
"valid.multiple:0|ms|@0.1:0|ms|:1|ms",
"valid.multiple.duplicate:1|c:1|c:2|c:1|c",
"valid.multiple.duplicate:1|h:1|h:2|h:1|h",
"valid.multiple.duplicate:1|s:1|s:2|s:1|s",
"valid.multiple.duplicate:1|g:1|g:2|g:1|g",
"valid.multiple.mixed:1|c:1|ms:2|s:1|g",
}
s_single := NewStatsd()
s_multiple := NewStatsd()
for _, line := range single_lines {
err := s_single.parseStatsdLine(line)
if err != nil {
t.Errorf("Parsing line %s should not have resulted in an error\n", line)
}
}
for _, line := range multiple_lines {
err := s_multiple.parseStatsdLine(line)
if err != nil {
t.Errorf("Parsing line %s should not have resulted in an error\n", line)
}
}
if len(s_single.timings) != 3 {
t.Errorf("Expected 3 measurement, found %d", len(s_single.timings))
}
if cachedtiming, ok := s_single.timings["metric_type=timingvalid_multiple"]; !ok {
t.Errorf("Expected cached measurement with hash 'metric_type=timingvalid_multiple' not found")
} else {
if cachedtiming.name != "valid_multiple" {
t.Errorf("Expected the name to be 'valid_multiple', got %s", cachedtiming.name)
}
// A 0 at samplerate 0.1 will add 10 values of 0,
// a 0 with an invalid samplerate will add a single 0,
// plus the last bit of value 1,
// which adds up to 12 individual datapoints to be cached
if cachedtiming.stats.n != 12 {
t.Errorf("Expected 12 additions, got %d", cachedtiming.stats.n)
}
if cachedtiming.stats.upper != 1 {
t.Errorf("Expected max input to be 1, got %f", cachedtiming.stats.upper)
}
}
// Test that s_single and s_multiple computed the same stats for valid.multiple.duplicate
if err := test_validate_set("valid_multiple_duplicate", 2, s_single.sets); err != nil {
t.Error(err.Error())
}
if err := test_validate_set("valid_multiple_duplicate", 2, s_multiple.sets); err != nil {
t.Error(err.Error())
}
if err := test_validate_counter("valid_multiple_duplicate", 5, s_single.counters); err != nil {
t.Error(err.Error())
}
if err := test_validate_counter("valid_multiple_duplicate", 5, s_multiple.counters); err != nil {
t.Error(err.Error())
}
if err := test_validate_gauge("valid_multiple_duplicate", 1, s_single.gauges); err != nil {
t.Error(err.Error())
}
if err := test_validate_gauge("valid_multiple_duplicate", 1, s_multiple.gauges); err != nil {
t.Error(err.Error())
}
// Test that s_single and s_multiple computed the same stats for valid.multiple.mixed
if err := test_validate_set("valid_multiple_mixed", 1, s_single.sets); err != nil {
t.Error(err.Error())
}
if err := test_validate_set("valid_multiple_mixed", 1, s_multiple.sets); err != nil {
t.Error(err.Error())
}
if err := test_validate_counter("valid_multiple_mixed", 1, s_single.counters); err != nil {
t.Error(err.Error())
}
if err := test_validate_counter("valid_multiple_mixed", 1, s_multiple.counters); err != nil {
t.Error(err.Error())
}
if err := test_validate_gauge("valid_multiple_mixed", 1, s_single.gauges); err != nil {
t.Error(err.Error())
}
if err := test_validate_gauge("valid_multiple_mixed", 1, s_multiple.gauges); err != nil {
t.Error(err.Error())
}
}
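The expected count of 12 cached datapoints follows from the sample-rate arithmetic in the test comment above. A minimal, hypothetical sketch of that arithmetic (not the plugin's implementation), assuming a timing sampled at rate r stands in for 1/r datapoints and a missing or invalid rate counts once:

```go
package main

import "fmt"

// datapointsFor illustrates the arithmetic in the test comment: a timing
// sampled at rate r is assumed to stand in for 1/r datapoints, while a
// missing or invalid sample rate counts as a single datapoint. Hypothetical
// helper for the test's expectation, not code from the statsd plugin.
func datapointsFor(sampleRate float64) int {
	if sampleRate <= 0 || sampleRate > 1 {
		return 1
	}
	return int(1.0 / sampleRate)
}

func main() {
	total := datapointsFor(0.1) + // "valid.multiple:0|ms|@0.1" -> 10 datapoints
		datapointsFor(0) + // "valid.multiple:0|ms|" (invalid rate) -> 1 datapoint
		datapointsFor(1) // "valid.multiple:1|ms" -> 1 datapoint
	fmt.Println(total) // 12, matching the cachedtiming.stats.n check above
}
```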
// Valid lines should be parsed and their values should be cached
func TestParse_ValidLines(t *testing.T) {
s := NewStatsd()