From 0e392654f24fa51110372cea0bdbc22987d7ffe5 Mon Sep 17 00:00:00 2001 From: Matt Heath Date: Mon, 22 Feb 2016 15:58:06 +0000 Subject: [PATCH] Add support for multiple field names for timers --- plugins/inputs/statsd/statsd.go | 71 +++++++++++------- plugins/inputs/statsd/statsd_test.go | 106 ++++++++++++++++++++++++++- 2 files changed, 145 insertions(+), 32 deletions(-) diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go index 830e9d25c..a16e78b5c 100644 --- a/plugins/inputs/statsd/statsd.go +++ b/plugins/inputs/statsd/statsd.go @@ -17,7 +17,11 @@ import ( "github.com/influxdata/telegraf/plugins/inputs" ) -const UDP_PACKET_SIZE int = 1500 +const ( + UDP_PACKET_SIZE int = 1500 + + defaultFieldName = "value" +) var dropwarn = "ERROR: Message queue full. Discarding line [%s] " + "You may want to increase allowed_pending_messages in the config\n" @@ -113,9 +117,9 @@ type cachedcounter struct { } type cachedtimings struct { - name string - stats RunningStats - tags map[string]string + name string + fields map[string]RunningStats + tags map[string]string } func (_ *Statsd) Description() string { @@ -169,16 +173,26 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error { now := time.Now() for _, metric := range s.timings { + // Defining a template to parse field names for timers allows us to split + // out multiple fields per timer. In this case we prefix each stat with the + // field name and store these all in a single measurement. fields := make(map[string]interface{}) - fields["mean"] = metric.stats.Mean() - fields["stddev"] = metric.stats.Stddev() - fields["upper"] = metric.stats.Upper() - fields["lower"] = metric.stats.Lower() - fields["count"] = metric.stats.Count() - for _, percentile := range s.Percentiles { - name := fmt.Sprintf("%v_percentile", percentile) - fields[name] = metric.stats.Percentile(percentile) + for fieldName, stats := range metric.fields { + var prefix string + if fieldName != defaultFieldName { + prefix = fieldName + "_" + } + fields[prefix+"mean"] = stats.Mean() + fields[prefix+"stddev"] = stats.Stddev() + fields[prefix+"upper"] = stats.Upper() + fields[prefix+"lower"] = stats.Lower() + fields[prefix+"count"] = stats.Count() + for _, percentile := range s.Percentiles { + name := fmt.Sprintf("%s%v_percentile", prefix, percentile) + fields[name] = stats.Percentile(percentile) + } } + acc.AddFields(metric.name, fields, metric.tags, now) } if s.DeleteTimings { @@ -370,11 +384,6 @@ func (s *Statsd) parseStatsdLine(line string) error { // Parse the name & tags from bucket m.name, m.field, m.tags = s.parseName(m.bucket) - // fields are not supported for timings, so if specified combine into - // the name - if (m.mtype == "ms" || m.mtype == "h") && m.field != "value" { - m.name += "_" + m.field - } switch m.mtype { case "c": m.tags["metric_type"] = "counter" @@ -433,7 +442,7 @@ func (s *Statsd) parseName(bucket string) (string, string, map[string]string) { name = strings.Replace(name, "-", "__", -1) } if field == "" { - field = "value" + field = defaultFieldName } return name, field, tags @@ -461,26 +470,32 @@ func parseKeyValue(keyvalue string) (string, string) { func (s *Statsd) aggregate(m metric) { switch m.mtype { case "ms", "h": + // Check if the measurement exists cached, ok := s.timings[m.hash] if !ok { cached = cachedtimings{ - name: m.name, - tags: m.tags, - stats: RunningStats{ - PercLimit: s.PercentileLimit, - }, + name: m.name, + fields: make(map[string]RunningStats), + tags: m.tags, + } + } + // Check if the field exists. If we've not enabled multiple fields per timer + // this will be the default field name, eg. "value" + field, ok := cached.fields[m.field] + if !ok { + field = RunningStats{ + PercLimit: s.PercentileLimit, } } - if m.samplerate > 0 { for i := 0; i < int(1.0/m.samplerate); i++ { - cached.stats.AddValue(m.floatvalue) + field.AddValue(m.floatvalue) } - s.timings[m.hash] = cached } else { - cached.stats.AddValue(m.floatvalue) - s.timings[m.hash] = cached + field.AddValue(m.floatvalue) } + cached.fields[m.field] = field + s.timings[m.hash] = cached case "c": // check if the measurement exists _, ok := s.counters[m.hash] diff --git a/plugins/inputs/statsd/statsd_test.go b/plugins/inputs/statsd/statsd_test.go index a285467b0..3a87f00aa 100644 --- a/plugins/inputs/statsd/statsd_test.go +++ b/plugins/inputs/statsd/statsd_test.go @@ -561,12 +561,12 @@ func TestParse_MeasurementsWithMultipleValues(t *testing.T) { // A 0 with invalid samplerate will add a single 0, // plus the last bit of value 1 // which adds up to 12 individual datapoints to be cached - if cachedtiming.stats.n != 12 { - t.Errorf("Expected 11 additions, got %d", cachedtiming.stats.n) + if cachedtiming.fields[defaultFieldName].n != 12 { + t.Errorf("Expected 11 additions, got %d", cachedtiming.fields[defaultFieldName].n) } - if cachedtiming.stats.upper != 1 { - t.Errorf("Expected max input to be 1, got %f", cachedtiming.stats.upper) + if cachedtiming.fields[defaultFieldName].upper != 1 { + t.Errorf("Expected max input to be 1, got %f", cachedtiming.fields[defaultFieldName].upper) } } @@ -842,7 +842,105 @@ func TestParse_Timings(t *testing.T) { } acc.AssertContainsFields(t, "test_timing", valid) +} +// Tests low-level functionality of timings when multiple fields is enabled +// and a measurement template has been defined which can parse field names +func TestParse_Timings_MultipleFieldsWithTemplate(t *testing.T) { + s := NewStatsd() + s.Templates = []string{"measurement.field"} + s.Percentiles = []int{90} + acc := &testutil.Accumulator{} + + validLines := []string{ + "test_timing.success:1|ms", + "test_timing.success:11|ms", + "test_timing.success:1|ms", + "test_timing.success:1|ms", + "test_timing.success:1|ms", + "test_timing.error:2|ms", + "test_timing.error:22|ms", + "test_timing.error:2|ms", + "test_timing.error:2|ms", + "test_timing.error:2|ms", + } + + for _, line := range validLines { + err := s.parseStatsdLine(line) + if err != nil { + t.Errorf("Parsing line %s should not have resulted in an error\n", line) + } + } + s.Gather(acc) + + valid := map[string]interface{}{ + "success_90_percentile": float64(11), + "success_count": int64(5), + "success_lower": float64(1), + "success_mean": float64(3), + "success_stddev": float64(4), + "success_upper": float64(11), + + "error_90_percentile": float64(22), + "error_count": int64(5), + "error_lower": float64(2), + "error_mean": float64(6), + "error_stddev": float64(8), + "error_upper": float64(22), + } + + acc.AssertContainsFields(t, "test_timing", valid) +} + +// Tests low-level functionality of timings when multiple fields is enabled +// but a measurement template hasn't been defined so we can't parse field names +// In this case the behaviour should be the same as normal behaviour +func TestParse_Timings_MultipleFieldsWithoutTemplate(t *testing.T) { + s := NewStatsd() + s.Templates = []string{} + s.Percentiles = []int{90} + acc := &testutil.Accumulator{} + + validLines := []string{ + "test_timing.success:1|ms", + "test_timing.success:11|ms", + "test_timing.success:1|ms", + "test_timing.success:1|ms", + "test_timing.success:1|ms", + "test_timing.error:2|ms", + "test_timing.error:22|ms", + "test_timing.error:2|ms", + "test_timing.error:2|ms", + "test_timing.error:2|ms", + } + + for _, line := range validLines { + err := s.parseStatsdLine(line) + if err != nil { + t.Errorf("Parsing line %s should not have resulted in an error\n", line) + } + } + s.Gather(acc) + + expectedSuccess := map[string]interface{}{ + "90_percentile": float64(11), + "count": int64(5), + "lower": float64(1), + "mean": float64(3), + "stddev": float64(4), + "upper": float64(11), + } + expectedError := map[string]interface{}{ + "90_percentile": float64(22), + "count": int64(5), + "lower": float64(2), + "mean": float64(6), + "stddev": float64(8), + "upper": float64(22), + } + + acc.AssertContainsFields(t, "test_timing_success", expectedSuccess) + acc.AssertContainsFields(t, "test_timing_error", expectedError) } func TestParse_Timings_Delete(t *testing.T) {