Add support for multiple field names for timers

closes #737
This commit is contained in:
Matt Heath 2016-02-22 15:58:06 +00:00 committed by Michele Fadda
parent 677e922dbe
commit d8c0668fd8
3 changed files with 146 additions and 32 deletions

View File

@ -5,6 +5,7 @@
- [#694](https://github.com/influxdata/telegraf/pull/694): DNS Query input, thanks @mjasion! - [#694](https://github.com/influxdata/telegraf/pull/694): DNS Query input, thanks @mjasion!
- [#724](https://github.com/influxdata/telegraf/pull/724): username matching for procstat input, thanks @zorel! - [#724](https://github.com/influxdata/telegraf/pull/724): username matching for procstat input, thanks @zorel!
- [#736](https://github.com/influxdata/telegraf/pull/736): Ignore dummy filesystems from disk plugin. Thanks @PierreF! - [#736](https://github.com/influxdata/telegraf/pull/736): Ignore dummy filesystems from disk plugin. Thanks @PierreF!
- [#737](https://github.com/influxdata/telegraf/pull/737): Support multiple fields for statsd input. Thanks @mattheath!
### Bugfixes ### Bugfixes
- [#701](https://github.com/influxdata/telegraf/pull/701): output write count shouldnt print in quiet mode. - [#701](https://github.com/influxdata/telegraf/pull/701): output write count shouldnt print in quiet mode.

View File

@ -17,7 +17,11 @@ import (
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
) )
const UDP_PACKET_SIZE int = 1500 const (
UDP_PACKET_SIZE int = 1500
defaultFieldName = "value"
)
var dropwarn = "ERROR: Message queue full. Discarding line [%s] " + var dropwarn = "ERROR: Message queue full. Discarding line [%s] " +
"You may want to increase allowed_pending_messages in the config\n" "You may want to increase allowed_pending_messages in the config\n"
@ -113,9 +117,9 @@ type cachedcounter struct {
} }
type cachedtimings struct { type cachedtimings struct {
name string name string
stats RunningStats fields map[string]RunningStats
tags map[string]string tags map[string]string
} }
func (_ *Statsd) Description() string { func (_ *Statsd) Description() string {
@ -169,16 +173,26 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error {
now := time.Now() now := time.Now()
for _, metric := range s.timings { for _, metric := range s.timings {
// Defining a template to parse field names for timers allows us to split
// out multiple fields per timer. In this case we prefix each stat with the
// field name and store these all in a single measurement.
fields := make(map[string]interface{}) fields := make(map[string]interface{})
fields["mean"] = metric.stats.Mean() for fieldName, stats := range metric.fields {
fields["stddev"] = metric.stats.Stddev() var prefix string
fields["upper"] = metric.stats.Upper() if fieldName != defaultFieldName {
fields["lower"] = metric.stats.Lower() prefix = fieldName + "_"
fields["count"] = metric.stats.Count() }
for _, percentile := range s.Percentiles { fields[prefix+"mean"] = stats.Mean()
name := fmt.Sprintf("%v_percentile", percentile) fields[prefix+"stddev"] = stats.Stddev()
fields[name] = metric.stats.Percentile(percentile) fields[prefix+"upper"] = stats.Upper()
fields[prefix+"lower"] = stats.Lower()
fields[prefix+"count"] = stats.Count()
for _, percentile := range s.Percentiles {
name := fmt.Sprintf("%s%v_percentile", prefix, percentile)
fields[name] = stats.Percentile(percentile)
}
} }
acc.AddFields(metric.name, fields, metric.tags, now) acc.AddFields(metric.name, fields, metric.tags, now)
} }
if s.DeleteTimings { if s.DeleteTimings {
@ -370,11 +384,6 @@ func (s *Statsd) parseStatsdLine(line string) error {
// Parse the name & tags from bucket // Parse the name & tags from bucket
m.name, m.field, m.tags = s.parseName(m.bucket) m.name, m.field, m.tags = s.parseName(m.bucket)
// fields are not supported for timings, so if specified combine into
// the name
if (m.mtype == "ms" || m.mtype == "h") && m.field != "value" {
m.name += "_" + m.field
}
switch m.mtype { switch m.mtype {
case "c": case "c":
m.tags["metric_type"] = "counter" m.tags["metric_type"] = "counter"
@ -433,7 +442,7 @@ func (s *Statsd) parseName(bucket string) (string, string, map[string]string) {
name = strings.Replace(name, "-", "__", -1) name = strings.Replace(name, "-", "__", -1)
} }
if field == "" { if field == "" {
field = "value" field = defaultFieldName
} }
return name, field, tags return name, field, tags
@ -461,26 +470,32 @@ func parseKeyValue(keyvalue string) (string, string) {
func (s *Statsd) aggregate(m metric) { func (s *Statsd) aggregate(m metric) {
switch m.mtype { switch m.mtype {
case "ms", "h": case "ms", "h":
// Check if the measurement exists
cached, ok := s.timings[m.hash] cached, ok := s.timings[m.hash]
if !ok { if !ok {
cached = cachedtimings{ cached = cachedtimings{
name: m.name, name: m.name,
tags: m.tags, fields: make(map[string]RunningStats),
stats: RunningStats{ tags: m.tags,
PercLimit: s.PercentileLimit, }
}, }
// Check if the field exists. If we've not enabled multiple fields per timer
// this will be the default field name, eg. "value"
field, ok := cached.fields[m.field]
if !ok {
field = RunningStats{
PercLimit: s.PercentileLimit,
} }
} }
if m.samplerate > 0 { if m.samplerate > 0 {
for i := 0; i < int(1.0/m.samplerate); i++ { for i := 0; i < int(1.0/m.samplerate); i++ {
cached.stats.AddValue(m.floatvalue) field.AddValue(m.floatvalue)
} }
s.timings[m.hash] = cached
} else { } else {
cached.stats.AddValue(m.floatvalue) field.AddValue(m.floatvalue)
s.timings[m.hash] = cached
} }
cached.fields[m.field] = field
s.timings[m.hash] = cached
case "c": case "c":
// check if the measurement exists // check if the measurement exists
_, ok := s.counters[m.hash] _, ok := s.counters[m.hash]

View File

@ -561,12 +561,12 @@ func TestParse_MeasurementsWithMultipleValues(t *testing.T) {
// A 0 with invalid samplerate will add a single 0, // A 0 with invalid samplerate will add a single 0,
// plus the last bit of value 1 // plus the last bit of value 1
// which adds up to 12 individual datapoints to be cached // which adds up to 12 individual datapoints to be cached
if cachedtiming.stats.n != 12 { if cachedtiming.fields[defaultFieldName].n != 12 {
t.Errorf("Expected 11 additions, got %d", cachedtiming.stats.n) t.Errorf("Expected 11 additions, got %d", cachedtiming.fields[defaultFieldName].n)
} }
if cachedtiming.stats.upper != 1 { if cachedtiming.fields[defaultFieldName].upper != 1 {
t.Errorf("Expected max input to be 1, got %f", cachedtiming.stats.upper) t.Errorf("Expected max input to be 1, got %f", cachedtiming.fields[defaultFieldName].upper)
} }
} }
@ -842,7 +842,105 @@ func TestParse_Timings(t *testing.T) {
} }
acc.AssertContainsFields(t, "test_timing", valid) acc.AssertContainsFields(t, "test_timing", valid)
}
// Tests low-level functionality of timings when multiple fields is enabled
// and a measurement template has been defined which can parse field names
func TestParse_Timings_MultipleFieldsWithTemplate(t *testing.T) {
s := NewStatsd()
s.Templates = []string{"measurement.field"}
s.Percentiles = []int{90}
acc := &testutil.Accumulator{}
validLines := []string{
"test_timing.success:1|ms",
"test_timing.success:11|ms",
"test_timing.success:1|ms",
"test_timing.success:1|ms",
"test_timing.success:1|ms",
"test_timing.error:2|ms",
"test_timing.error:22|ms",
"test_timing.error:2|ms",
"test_timing.error:2|ms",
"test_timing.error:2|ms",
}
for _, line := range validLines {
err := s.parseStatsdLine(line)
if err != nil {
t.Errorf("Parsing line %s should not have resulted in an error\n", line)
}
}
s.Gather(acc)
valid := map[string]interface{}{
"success_90_percentile": float64(11),
"success_count": int64(5),
"success_lower": float64(1),
"success_mean": float64(3),
"success_stddev": float64(4),
"success_upper": float64(11),
"error_90_percentile": float64(22),
"error_count": int64(5),
"error_lower": float64(2),
"error_mean": float64(6),
"error_stddev": float64(8),
"error_upper": float64(22),
}
acc.AssertContainsFields(t, "test_timing", valid)
}
// Tests low-level functionality of timings when multiple fields is enabled
// but a measurement template hasn't been defined so we can't parse field names
// In this case the behaviour should be the same as normal behaviour
func TestParse_Timings_MultipleFieldsWithoutTemplate(t *testing.T) {
s := NewStatsd()
s.Templates = []string{}
s.Percentiles = []int{90}
acc := &testutil.Accumulator{}
validLines := []string{
"test_timing.success:1|ms",
"test_timing.success:11|ms",
"test_timing.success:1|ms",
"test_timing.success:1|ms",
"test_timing.success:1|ms",
"test_timing.error:2|ms",
"test_timing.error:22|ms",
"test_timing.error:2|ms",
"test_timing.error:2|ms",
"test_timing.error:2|ms",
}
for _, line := range validLines {
err := s.parseStatsdLine(line)
if err != nil {
t.Errorf("Parsing line %s should not have resulted in an error\n", line)
}
}
s.Gather(acc)
expectedSuccess := map[string]interface{}{
"90_percentile": float64(11),
"count": int64(5),
"lower": float64(1),
"mean": float64(3),
"stddev": float64(4),
"upper": float64(11),
}
expectedError := map[string]interface{}{
"90_percentile": float64(22),
"count": int64(5),
"lower": float64(2),
"mean": float64(6),
"stddev": float64(8),
"upper": float64(22),
}
acc.AssertContainsFields(t, "test_timing_success", expectedSuccess)
acc.AssertContainsFields(t, "test_timing_error", expectedError)
} }
func TestParse_Timings_Delete(t *testing.T) { func TestParse_Timings_Delete(t *testing.T) {