parent
							
								
									85c4f753ad
								
							
						
					
					
						commit
						e983d35c25
					
				|  | @ -5,6 +5,7 @@ | ||||||
| - [#694](https://github.com/influxdata/telegraf/pull/694): DNS Query input, thanks @mjasion! | - [#694](https://github.com/influxdata/telegraf/pull/694): DNS Query input, thanks @mjasion! | ||||||
| - [#724](https://github.com/influxdata/telegraf/pull/724): username matching for procstat input, thanks @zorel! | - [#724](https://github.com/influxdata/telegraf/pull/724): username matching for procstat input, thanks @zorel! | ||||||
| - [#736](https://github.com/influxdata/telegraf/pull/736): Ignore dummy filesystems from disk plugin. Thanks @PierreF! | - [#736](https://github.com/influxdata/telegraf/pull/736): Ignore dummy filesystems from disk plugin. Thanks @PierreF! | ||||||
|  | - [#737](https://github.com/influxdata/telegraf/pull/737): Support multiple fields for statsd input. Thanks @mattheath! | ||||||
| 
 | 
 | ||||||
| ### Bugfixes | ### Bugfixes | ||||||
| - [#701](https://github.com/influxdata/telegraf/pull/701): output write count shouldnt print in quiet mode. | - [#701](https://github.com/influxdata/telegraf/pull/701): output write count shouldnt print in quiet mode. | ||||||
|  |  | ||||||
|  | @ -17,7 +17,11 @@ import ( | ||||||
| 	"github.com/influxdata/telegraf/plugins/inputs" | 	"github.com/influxdata/telegraf/plugins/inputs" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| const UDP_PACKET_SIZE int = 1500 | const ( | ||||||
|  | 	UDP_PACKET_SIZE int = 1500 | ||||||
|  | 
 | ||||||
|  | 	defaultFieldName = "value" | ||||||
|  | ) | ||||||
| 
 | 
 | ||||||
| var dropwarn = "ERROR: Message queue full. Discarding line [%s] " + | var dropwarn = "ERROR: Message queue full. Discarding line [%s] " + | ||||||
| 	"You may want to increase allowed_pending_messages in the config\n" | 	"You may want to increase allowed_pending_messages in the config\n" | ||||||
|  | @ -114,7 +118,7 @@ type cachedcounter struct { | ||||||
| 
 | 
 | ||||||
| type cachedtimings struct { | type cachedtimings struct { | ||||||
| 	name   string | 	name   string | ||||||
| 	stats RunningStats | 	fields map[string]RunningStats | ||||||
| 	tags   map[string]string | 	tags   map[string]string | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | @ -169,16 +173,26 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error { | ||||||
| 	now := time.Now() | 	now := time.Now() | ||||||
| 
 | 
 | ||||||
| 	for _, metric := range s.timings { | 	for _, metric := range s.timings { | ||||||
|  | 		// Defining a template to parse field names for timers allows us to split
 | ||||||
|  | 		// out multiple fields per timer. In this case we prefix each stat with the
 | ||||||
|  | 		// field name and store these all in a single measurement.
 | ||||||
| 		fields := make(map[string]interface{}) | 		fields := make(map[string]interface{}) | ||||||
| 		fields["mean"] = metric.stats.Mean() | 		for fieldName, stats := range metric.fields { | ||||||
| 		fields["stddev"] = metric.stats.Stddev() | 			var prefix string | ||||||
| 		fields["upper"] = metric.stats.Upper() | 			if fieldName != defaultFieldName { | ||||||
| 		fields["lower"] = metric.stats.Lower() | 				prefix = fieldName + "_" | ||||||
| 		fields["count"] = metric.stats.Count() |  | ||||||
| 		for _, percentile := range s.Percentiles { |  | ||||||
| 			name := fmt.Sprintf("%v_percentile", percentile) |  | ||||||
| 			fields[name] = metric.stats.Percentile(percentile) |  | ||||||
| 			} | 			} | ||||||
|  | 			fields[prefix+"mean"] = stats.Mean() | ||||||
|  | 			fields[prefix+"stddev"] = stats.Stddev() | ||||||
|  | 			fields[prefix+"upper"] = stats.Upper() | ||||||
|  | 			fields[prefix+"lower"] = stats.Lower() | ||||||
|  | 			fields[prefix+"count"] = stats.Count() | ||||||
|  | 			for _, percentile := range s.Percentiles { | ||||||
|  | 				name := fmt.Sprintf("%s%v_percentile", prefix, percentile) | ||||||
|  | 				fields[name] = stats.Percentile(percentile) | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
| 		acc.AddFields(metric.name, fields, metric.tags, now) | 		acc.AddFields(metric.name, fields, metric.tags, now) | ||||||
| 	} | 	} | ||||||
| 	if s.DeleteTimings { | 	if s.DeleteTimings { | ||||||
|  | @ -370,11 +384,6 @@ func (s *Statsd) parseStatsdLine(line string) error { | ||||||
| 
 | 
 | ||||||
| 		// Parse the name & tags from bucket
 | 		// Parse the name & tags from bucket
 | ||||||
| 		m.name, m.field, m.tags = s.parseName(m.bucket) | 		m.name, m.field, m.tags = s.parseName(m.bucket) | ||||||
| 		// fields are not supported for timings, so if specified combine into
 |  | ||||||
| 		// the name
 |  | ||||||
| 		if (m.mtype == "ms" || m.mtype == "h") && m.field != "value" { |  | ||||||
| 			m.name += "_" + m.field |  | ||||||
| 		} |  | ||||||
| 		switch m.mtype { | 		switch m.mtype { | ||||||
| 		case "c": | 		case "c": | ||||||
| 			m.tags["metric_type"] = "counter" | 			m.tags["metric_type"] = "counter" | ||||||
|  | @ -433,7 +442,7 @@ func (s *Statsd) parseName(bucket string) (string, string, map[string]string) { | ||||||
| 		name = strings.Replace(name, "-", "__", -1) | 		name = strings.Replace(name, "-", "__", -1) | ||||||
| 	} | 	} | ||||||
| 	if field == "" { | 	if field == "" { | ||||||
| 		field = "value" | 		field = defaultFieldName | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	return name, field, tags | 	return name, field, tags | ||||||
|  | @ -461,26 +470,32 @@ func parseKeyValue(keyvalue string) (string, string) { | ||||||
| func (s *Statsd) aggregate(m metric) { | func (s *Statsd) aggregate(m metric) { | ||||||
| 	switch m.mtype { | 	switch m.mtype { | ||||||
| 	case "ms", "h": | 	case "ms", "h": | ||||||
|  | 		// Check if the measurement exists
 | ||||||
| 		cached, ok := s.timings[m.hash] | 		cached, ok := s.timings[m.hash] | ||||||
| 		if !ok { | 		if !ok { | ||||||
| 			cached = cachedtimings{ | 			cached = cachedtimings{ | ||||||
| 				name:   m.name, | 				name:   m.name, | ||||||
|  | 				fields: make(map[string]RunningStats), | ||||||
| 				tags:   m.tags, | 				tags:   m.tags, | ||||||
| 				stats: RunningStats{ | 			} | ||||||
|  | 		} | ||||||
|  | 		// Check if the field exists. If we've not enabled multiple fields per timer
 | ||||||
|  | 		// this will be the default field name, eg. "value"
 | ||||||
|  | 		field, ok := cached.fields[m.field] | ||||||
|  | 		if !ok { | ||||||
|  | 			field = RunningStats{ | ||||||
| 				PercLimit: s.PercentileLimit, | 				PercLimit: s.PercentileLimit, | ||||||
| 				}, |  | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 
 |  | ||||||
| 		if m.samplerate > 0 { | 		if m.samplerate > 0 { | ||||||
| 			for i := 0; i < int(1.0/m.samplerate); i++ { | 			for i := 0; i < int(1.0/m.samplerate); i++ { | ||||||
| 				cached.stats.AddValue(m.floatvalue) | 				field.AddValue(m.floatvalue) | ||||||
| 			} | 			} | ||||||
| 			s.timings[m.hash] = cached |  | ||||||
| 		} else { | 		} else { | ||||||
| 			cached.stats.AddValue(m.floatvalue) | 			field.AddValue(m.floatvalue) | ||||||
| 			s.timings[m.hash] = cached |  | ||||||
| 		} | 		} | ||||||
|  | 		cached.fields[m.field] = field | ||||||
|  | 		s.timings[m.hash] = cached | ||||||
| 	case "c": | 	case "c": | ||||||
| 		// check if the measurement exists
 | 		// check if the measurement exists
 | ||||||
| 		_, ok := s.counters[m.hash] | 		_, ok := s.counters[m.hash] | ||||||
|  |  | ||||||
|  | @ -561,12 +561,12 @@ func TestParse_MeasurementsWithMultipleValues(t *testing.T) { | ||||||
| 		// A 0 with invalid samplerate will add a single 0,
 | 		// A 0 with invalid samplerate will add a single 0,
 | ||||||
| 		// plus the last bit of value 1
 | 		// plus the last bit of value 1
 | ||||||
| 		// which adds up to 12 individual datapoints to be cached
 | 		// which adds up to 12 individual datapoints to be cached
 | ||||||
| 		if cachedtiming.stats.n != 12 { | 		if cachedtiming.fields[defaultFieldName].n != 12 { | ||||||
| 			t.Errorf("Expected 11 additions, got %d", cachedtiming.stats.n) | 			t.Errorf("Expected 11 additions, got %d", cachedtiming.fields[defaultFieldName].n) | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		if cachedtiming.stats.upper != 1 { | 		if cachedtiming.fields[defaultFieldName].upper != 1 { | ||||||
| 			t.Errorf("Expected max input to be 1, got %f", cachedtiming.stats.upper) | 			t.Errorf("Expected max input to be 1, got %f", cachedtiming.fields[defaultFieldName].upper) | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | @ -842,7 +842,105 @@ func TestParse_Timings(t *testing.T) { | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	acc.AssertContainsFields(t, "test_timing", valid) | 	acc.AssertContainsFields(t, "test_timing", valid) | ||||||
|  | } | ||||||
| 
 | 
 | ||||||
|  | // Tests low-level functionality of timings when multiple fields is enabled
 | ||||||
|  | // and a measurement template has been defined which can parse field names
 | ||||||
|  | func TestParse_Timings_MultipleFieldsWithTemplate(t *testing.T) { | ||||||
|  | 	s := NewStatsd() | ||||||
|  | 	s.Templates = []string{"measurement.field"} | ||||||
|  | 	s.Percentiles = []int{90} | ||||||
|  | 	acc := &testutil.Accumulator{} | ||||||
|  | 
 | ||||||
|  | 	validLines := []string{ | ||||||
|  | 		"test_timing.success:1|ms", | ||||||
|  | 		"test_timing.success:11|ms", | ||||||
|  | 		"test_timing.success:1|ms", | ||||||
|  | 		"test_timing.success:1|ms", | ||||||
|  | 		"test_timing.success:1|ms", | ||||||
|  | 		"test_timing.error:2|ms", | ||||||
|  | 		"test_timing.error:22|ms", | ||||||
|  | 		"test_timing.error:2|ms", | ||||||
|  | 		"test_timing.error:2|ms", | ||||||
|  | 		"test_timing.error:2|ms", | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	for _, line := range validLines { | ||||||
|  | 		err := s.parseStatsdLine(line) | ||||||
|  | 		if err != nil { | ||||||
|  | 			t.Errorf("Parsing line %s should not have resulted in an error\n", line) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	s.Gather(acc) | ||||||
|  | 
 | ||||||
|  | 	valid := map[string]interface{}{ | ||||||
|  | 		"success_90_percentile": float64(11), | ||||||
|  | 		"success_count":         int64(5), | ||||||
|  | 		"success_lower":         float64(1), | ||||||
|  | 		"success_mean":          float64(3), | ||||||
|  | 		"success_stddev":        float64(4), | ||||||
|  | 		"success_upper":         float64(11), | ||||||
|  | 
 | ||||||
|  | 		"error_90_percentile": float64(22), | ||||||
|  | 		"error_count":         int64(5), | ||||||
|  | 		"error_lower":         float64(2), | ||||||
|  | 		"error_mean":          float64(6), | ||||||
|  | 		"error_stddev":        float64(8), | ||||||
|  | 		"error_upper":         float64(22), | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	acc.AssertContainsFields(t, "test_timing", valid) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // Tests low-level functionality of timings when multiple fields is enabled
 | ||||||
|  | // but a measurement template hasn't been defined so we can't parse field names
 | ||||||
|  | // In this case the behaviour should be the same as normal behaviour
 | ||||||
|  | func TestParse_Timings_MultipleFieldsWithoutTemplate(t *testing.T) { | ||||||
|  | 	s := NewStatsd() | ||||||
|  | 	s.Templates = []string{} | ||||||
|  | 	s.Percentiles = []int{90} | ||||||
|  | 	acc := &testutil.Accumulator{} | ||||||
|  | 
 | ||||||
|  | 	validLines := []string{ | ||||||
|  | 		"test_timing.success:1|ms", | ||||||
|  | 		"test_timing.success:11|ms", | ||||||
|  | 		"test_timing.success:1|ms", | ||||||
|  | 		"test_timing.success:1|ms", | ||||||
|  | 		"test_timing.success:1|ms", | ||||||
|  | 		"test_timing.error:2|ms", | ||||||
|  | 		"test_timing.error:22|ms", | ||||||
|  | 		"test_timing.error:2|ms", | ||||||
|  | 		"test_timing.error:2|ms", | ||||||
|  | 		"test_timing.error:2|ms", | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	for _, line := range validLines { | ||||||
|  | 		err := s.parseStatsdLine(line) | ||||||
|  | 		if err != nil { | ||||||
|  | 			t.Errorf("Parsing line %s should not have resulted in an error\n", line) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	s.Gather(acc) | ||||||
|  | 
 | ||||||
|  | 	expectedSuccess := map[string]interface{}{ | ||||||
|  | 		"90_percentile": float64(11), | ||||||
|  | 		"count":         int64(5), | ||||||
|  | 		"lower":         float64(1), | ||||||
|  | 		"mean":          float64(3), | ||||||
|  | 		"stddev":        float64(4), | ||||||
|  | 		"upper":         float64(11), | ||||||
|  | 	} | ||||||
|  | 	expectedError := map[string]interface{}{ | ||||||
|  | 		"90_percentile": float64(22), | ||||||
|  | 		"count":         int64(5), | ||||||
|  | 		"lower":         float64(2), | ||||||
|  | 		"mean":          float64(6), | ||||||
|  | 		"stddev":        float64(8), | ||||||
|  | 		"upper":         float64(22), | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	acc.AssertContainsFields(t, "test_timing_success", expectedSuccess) | ||||||
|  | 	acc.AssertContainsFields(t, "test_timing_error", expectedError) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func TestParse_Timings_Delete(t *testing.T) { | func TestParse_Timings_Delete(t *testing.T) { | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue