From 5c051eb801b4ff790c1108729dd9bb5eb8974346 Mon Sep 17 00:00:00 2001 From: Daniel Malon Date: Sun, 8 Nov 2015 10:19:00 +0000 Subject: [PATCH] Parse statsd lines with multiple metric bits closes #354 --- CHANGELOG.md | 1 + plugins/statsd/README.md | 16 ++++ plugins/statsd/statsd.go | 174 ++++++++++++++++++---------------- plugins/statsd/statsd_test.go | 130 +++++++++++++++++++++++++ 4 files changed, 238 insertions(+), 83 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fd54b575b..305337a5d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,7 @@ same type can be specified, like this: - [#375](https://github.com/influxdb/telegraf/pull/375): kafka_consumer service plugin. - [#392](https://github.com/influxdb/telegraf/pull/392): Procstat plugin can now accept pgrep -f pattern, thanks @ecarreras! - [#383](https://github.com/influxdb/telegraf/pull/383): Specify plugins as a list. +- [#354](https://github.com/influxdb/telegraf/pull/354): Add ability to specify multiple metrics in one statsd line. Thanks @MerlinDMC! ### Bugfixes - [#371](https://github.com/influxdb/telegraf/issues/371): Kafka consumer plugin not functioning. diff --git a/plugins/statsd/README.md b/plugins/statsd/README.md index 6045a5d9f..76255f3b0 100644 --- a/plugins/statsd/README.md +++ b/plugins/statsd/README.md @@ -26,6 +26,22 @@ implementation. In short, the telegraf statsd listener will accept: - `load.time.nanoseconds:1|h` - `load.time:200|ms|@0.1` <- sampled 1/10 of the time +It is possible to omit repetitive names and merge individual stats into a +single line by separating them with additional colons: + + - `users.current.den001.myapp:32|g:+10|g:-10|g` + - `deploys.test.myservice:1|c:101|c:1|c|@0.1` + - `users.unique:101|s:101|s:102|s` + - `load.time:320|ms:200|ms|@0.1` + +This also allows for mixed types in a single line: + + - `foo:1|c:200|ms` + +The string `foo:1|c:200|ms` is internally split into two individual metrics +`foo:1|c` and `foo:200|ms` which are added to the aggregator separately. + + #### Influx Statsd In order to take advantage of InfluxDB's tagging system, we have made a couple diff --git a/plugins/statsd/statsd.go b/plugins/statsd/statsd.go index 2b8b2c218..bd86a56d7 100644 --- a/plugins/statsd/statsd.go +++ b/plugins/statsd/statsd.go @@ -254,101 +254,109 @@ func (s *Statsd) parseStatsdLine(line string) error { s.Lock() defer s.Unlock() - m := metric{} - - // Validate splitting the line on "|" - pipesplit := strings.Split(line, "|") - if len(pipesplit) < 2 { - log.Printf("Error: splitting '|', Unable to parse metric: %s\n", line) - return errors.New("Error Parsing statsd line") - } else if len(pipesplit) > 2 { - sr := pipesplit[2] - errmsg := "Error: parsing sample rate, %s, it must be in format like: " + - "@0.1, @0.5, etc. Ignoring sample rate for line: %s\n" - if strings.Contains(sr, "@") && len(sr) > 1 { - samplerate, err := strconv.ParseFloat(sr[1:], 64) - if err != nil { - log.Printf(errmsg, err.Error(), line) - } else { - // sample rate successfully parsed - m.samplerate = samplerate - } - } else { - log.Printf(errmsg, "", line) - } - } - - // Validate metric type - switch pipesplit[1] { - case "g", "c", "s", "ms", "h": - m.mtype = pipesplit[1] - default: - log.Printf("Error: Statsd Metric type %s unsupported", pipesplit[1]) - return errors.New("Error Parsing statsd line") - } - - // Validate splitting the rest of the line on ":" - colonsplit := strings.Split(pipesplit[0], ":") - if len(colonsplit) != 2 { + // Validate splitting the line on ":" + bits := strings.Split(line, ":") + if len(bits) < 2 { log.Printf("Error: splitting ':', Unable to parse metric: %s\n", line) return errors.New("Error Parsing statsd line") } - m.bucket = colonsplit[0] - // Parse the value - if strings.ContainsAny(colonsplit[1], "-+") { - if m.mtype != "g" { - log.Printf("Error: +- values are only supported for gauges: %s\n", line) + // Extract bucket name from individual metric bits + bucketName, bits := bits[0], bits[1:] + + // Add a metric for each bit available + for _, bit := range bits { + m := metric{} + + m.bucket = bucketName + + // Validate splitting the bit on "|" + pipesplit := strings.Split(bit, "|") + if len(pipesplit) < 2 { + log.Printf("Error: splitting '|', Unable to parse metric: %s\n", line) + return errors.New("Error Parsing statsd line") + } else if len(pipesplit) > 2 { + sr := pipesplit[2] + errmsg := "Error: parsing sample rate, %s, it must be in format like: " + + "@0.1, @0.5, etc. Ignoring sample rate for line: %s\n" + if strings.Contains(sr, "@") && len(sr) > 1 { + samplerate, err := strconv.ParseFloat(sr[1:], 64) + if err != nil { + log.Printf(errmsg, err.Error(), line) + } else { + // sample rate successfully parsed + m.samplerate = samplerate + } + } else { + log.Printf(errmsg, "", line) + } + } + + // Validate metric type + switch pipesplit[1] { + case "g", "c", "s", "ms", "h": + m.mtype = pipesplit[1] + default: + log.Printf("Error: Statsd Metric type %s unsupported", pipesplit[1]) return errors.New("Error Parsing statsd line") } - m.additive = true - } - switch m.mtype { - case "g", "ms", "h": - v, err := strconv.ParseFloat(colonsplit[1], 64) - if err != nil { - log.Printf("Error: parsing value to float64: %s\n", line) - return errors.New("Error Parsing statsd line") + // Parse the value + if strings.ContainsAny(pipesplit[0], "-+") { + if m.mtype != "g" { + log.Printf("Error: +- values are only supported for gauges: %s\n", line) + return errors.New("Error Parsing statsd line") + } + m.additive = true } - m.floatvalue = v - case "c", "s": - v, err := strconv.ParseInt(colonsplit[1], 10, 64) - if err != nil { - log.Printf("Error: parsing value to int64: %s\n", line) - return errors.New("Error Parsing statsd line") + + switch m.mtype { + case "g", "ms", "h": + v, err := strconv.ParseFloat(pipesplit[0], 64) + if err != nil { + log.Printf("Error: parsing value to float64: %s\n", line) + return errors.New("Error Parsing statsd line") + } + m.floatvalue = v + case "c", "s": + v, err := strconv.ParseInt(pipesplit[0], 10, 64) + if err != nil { + log.Printf("Error: parsing value to int64: %s\n", line) + return errors.New("Error Parsing statsd line") + } + // If a sample rate is given with a counter, divide value by the rate + if m.samplerate != 0 && m.mtype == "c" { + v = int64(float64(v) / m.samplerate) + } + m.intvalue = v } - // If a sample rate is given with a counter, divide value by the rate - if m.samplerate != 0 && m.mtype == "c" { - v = int64(float64(v) / m.samplerate) + + // Parse the name & tags from bucket + m.name, m.tags = s.parseName(m.bucket) + switch m.mtype { + case "c": + m.tags["metric_type"] = "counter" + case "g": + m.tags["metric_type"] = "gauge" + case "s": + m.tags["metric_type"] = "set" + case "ms": + m.tags["metric_type"] = "timing" + case "h": + m.tags["metric_type"] = "histogram" } - m.intvalue = v + + // Make a unique key for the measurement name/tags + var tg []string + for k, v := range m.tags { + tg = append(tg, fmt.Sprintf("%s=%s", k, v)) + } + sort.Strings(tg) + m.hash = fmt.Sprintf("%s%s", strings.Join(tg, ""), m.name) + + s.aggregate(m) } - // Parse the name & tags from bucket - m.name, m.tags = s.parseName(m.bucket) - switch m.mtype { - case "c": - m.tags["metric_type"] = "counter" - case "g": - m.tags["metric_type"] = "gauge" - case "s": - m.tags["metric_type"] = "set" - case "ms": - m.tags["metric_type"] = "timing" - case "h": - m.tags["metric_type"] = "histogram" - } - - // Make a unique key for the measurement name/tags - var tg []string - for k, v := range m.tags { - tg = append(tg, fmt.Sprintf("%s=%s", k, v)) - } - sort.Strings(tg) - m.hash = fmt.Sprintf("%s%s", strings.Join(tg, ""), m.name) - - s.aggregate(m) return nil } diff --git a/plugins/statsd/statsd_test.go b/plugins/statsd/statsd_test.go index b7e1f2d93..6c85b0c4b 100644 --- a/plugins/statsd/statsd_test.go +++ b/plugins/statsd/statsd_test.go @@ -326,6 +326,136 @@ func TestParse_MeasurementsWithSameName(t *testing.T) { } } +// Test that measurements with multiple bits, are treated as different outputs +// but are equal to their single-measurement representation +func TestParse_MeasurementsWithMultipleValues(t *testing.T) { + single_lines := []string{ + "valid.multiple:0|ms|@0.1", + "valid.multiple:0|ms|", + "valid.multiple:1|ms", + "valid.multiple.duplicate:1|c", + "valid.multiple.duplicate:1|c", + "valid.multiple.duplicate:2|c", + "valid.multiple.duplicate:1|c", + "valid.multiple.duplicate:1|h", + "valid.multiple.duplicate:1|h", + "valid.multiple.duplicate:2|h", + "valid.multiple.duplicate:1|h", + "valid.multiple.duplicate:1|s", + "valid.multiple.duplicate:1|s", + "valid.multiple.duplicate:2|s", + "valid.multiple.duplicate:1|s", + "valid.multiple.duplicate:1|g", + "valid.multiple.duplicate:1|g", + "valid.multiple.duplicate:2|g", + "valid.multiple.duplicate:1|g", + "valid.multiple.mixed:1|c", + "valid.multiple.mixed:1|ms", + "valid.multiple.mixed:2|s", + "valid.multiple.mixed:1|g", + } + + multiple_lines := []string{ + "valid.multiple:0|ms|@0.1:0|ms|:1|ms", + "valid.multiple.duplicate:1|c:1|c:2|c:1|c", + "valid.multiple.duplicate:1|h:1|h:2|h:1|h", + "valid.multiple.duplicate:1|s:1|s:2|s:1|s", + "valid.multiple.duplicate:1|g:1|g:2|g:1|g", + "valid.multiple.mixed:1|c:1|ms:2|s:1|g", + } + + s_single := NewStatsd() + s_multiple := NewStatsd() + + for _, line := range single_lines { + err := s_single.parseStatsdLine(line) + if err != nil { + t.Errorf("Parsing line %s should not have resulted in an error\n", line) + } + } + + for _, line := range multiple_lines { + err := s_multiple.parseStatsdLine(line) + if err != nil { + t.Errorf("Parsing line %s should not have resulted in an error\n", line) + } + } + + if len(s_single.timings) != 3 { + t.Errorf("Expected 3 measurement, found %d", len(s_single.timings)) + } + + if cachedtiming, ok := s_single.timings["metric_type=timingvalid_multiple"]; !ok { + t.Errorf("Expected cached measurement with hash 'metric_type=timingvalid_multiple' not found") + } else { + if cachedtiming.name != "valid_multiple" { + t.Errorf("Expected the name to be 'valid_multiple', got %s", cachedtiming.name) + } + + // A 0 at samplerate 0.1 will add 10 values of 0, + // A 0 with invalid samplerate will add a single 0, + // plus the last bit of value 1 + // which adds up to 12 individual datapoints to be cached + if cachedtiming.stats.n != 12 { + t.Errorf("Expected 11 additions, got %d", cachedtiming.stats.n) + } + + if cachedtiming.stats.upper != 1 { + t.Errorf("Expected max input to be 1, got %f", cachedtiming.stats.upper) + } + } + + // test if s_single and s_multiple did compute the same stats for valid.multiple.duplicate + if err := test_validate_set("valid_multiple_duplicate", 2, s_single.sets); err != nil { + t.Error(err.Error()) + } + + if err := test_validate_set("valid_multiple_duplicate", 2, s_multiple.sets); err != nil { + t.Error(err.Error()) + } + + if err := test_validate_counter("valid_multiple_duplicate", 5, s_single.counters); err != nil { + t.Error(err.Error()) + } + + if err := test_validate_counter("valid_multiple_duplicate", 5, s_multiple.counters); err != nil { + t.Error(err.Error()) + } + + if err := test_validate_gauge("valid_multiple_duplicate", 1, s_single.gauges); err != nil { + t.Error(err.Error()) + } + + if err := test_validate_gauge("valid_multiple_duplicate", 1, s_multiple.gauges); err != nil { + t.Error(err.Error()) + } + + // test if s_single and s_multiple did compute the same stats for valid.multiple.mixed + if err := test_validate_set("valid_multiple_mixed", 1, s_single.sets); err != nil { + t.Error(err.Error()) + } + + if err := test_validate_set("valid_multiple_mixed", 1, s_multiple.sets); err != nil { + t.Error(err.Error()) + } + + if err := test_validate_counter("valid_multiple_mixed", 1, s_single.counters); err != nil { + t.Error(err.Error()) + } + + if err := test_validate_counter("valid_multiple_mixed", 1, s_multiple.counters); err != nil { + t.Error(err.Error()) + } + + if err := test_validate_gauge("valid_multiple_mixed", 1, s_single.gauges); err != nil { + t.Error(err.Error()) + } + + if err := test_validate_gauge("valid_multiple_mixed", 1, s_multiple.gauges); err != nil { + t.Error(err.Error()) + } +} + // Valid lines should be parsed and their values should be cached func TestParse_ValidLines(t *testing.T) { s := NewStatsd()