Add support for multiple field names for timers
This commit is contained in:
parent
8362aa9d66
commit
0e392654f2
|
@ -17,7 +17,11 @@ import (
|
||||||
"github.com/influxdata/telegraf/plugins/inputs"
|
"github.com/influxdata/telegraf/plugins/inputs"
|
||||||
)
|
)
|
||||||
|
|
||||||
const UDP_PACKET_SIZE int = 1500
|
const (
|
||||||
|
UDP_PACKET_SIZE int = 1500
|
||||||
|
|
||||||
|
defaultFieldName = "value"
|
||||||
|
)
|
||||||
|
|
||||||
var dropwarn = "ERROR: Message queue full. Discarding line [%s] " +
|
var dropwarn = "ERROR: Message queue full. Discarding line [%s] " +
|
||||||
"You may want to increase allowed_pending_messages in the config\n"
|
"You may want to increase allowed_pending_messages in the config\n"
|
||||||
|
@ -114,7 +118,7 @@ type cachedcounter struct {
|
||||||
|
|
||||||
type cachedtimings struct {
|
type cachedtimings struct {
|
||||||
name string
|
name string
|
||||||
stats RunningStats
|
fields map[string]RunningStats
|
||||||
tags map[string]string
|
tags map[string]string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -169,16 +173,26 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
|
||||||
for _, metric := range s.timings {
|
for _, metric := range s.timings {
|
||||||
|
// Defining a template to parse field names for timers allows us to split
|
||||||
|
// out multiple fields per timer. In this case we prefix each stat with the
|
||||||
|
// field name and store these all in a single measurement.
|
||||||
fields := make(map[string]interface{})
|
fields := make(map[string]interface{})
|
||||||
fields["mean"] = metric.stats.Mean()
|
for fieldName, stats := range metric.fields {
|
||||||
fields["stddev"] = metric.stats.Stddev()
|
var prefix string
|
||||||
fields["upper"] = metric.stats.Upper()
|
if fieldName != defaultFieldName {
|
||||||
fields["lower"] = metric.stats.Lower()
|
prefix = fieldName + "_"
|
||||||
fields["count"] = metric.stats.Count()
|
|
||||||
for _, percentile := range s.Percentiles {
|
|
||||||
name := fmt.Sprintf("%v_percentile", percentile)
|
|
||||||
fields[name] = metric.stats.Percentile(percentile)
|
|
||||||
}
|
}
|
||||||
|
fields[prefix+"mean"] = stats.Mean()
|
||||||
|
fields[prefix+"stddev"] = stats.Stddev()
|
||||||
|
fields[prefix+"upper"] = stats.Upper()
|
||||||
|
fields[prefix+"lower"] = stats.Lower()
|
||||||
|
fields[prefix+"count"] = stats.Count()
|
||||||
|
for _, percentile := range s.Percentiles {
|
||||||
|
name := fmt.Sprintf("%s%v_percentile", prefix, percentile)
|
||||||
|
fields[name] = stats.Percentile(percentile)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
acc.AddFields(metric.name, fields, metric.tags, now)
|
acc.AddFields(metric.name, fields, metric.tags, now)
|
||||||
}
|
}
|
||||||
if s.DeleteTimings {
|
if s.DeleteTimings {
|
||||||
|
@ -370,11 +384,6 @@ func (s *Statsd) parseStatsdLine(line string) error {
|
||||||
|
|
||||||
// Parse the name & tags from bucket
|
// Parse the name & tags from bucket
|
||||||
m.name, m.field, m.tags = s.parseName(m.bucket)
|
m.name, m.field, m.tags = s.parseName(m.bucket)
|
||||||
// fields are not supported for timings, so if specified combine into
|
|
||||||
// the name
|
|
||||||
if (m.mtype == "ms" || m.mtype == "h") && m.field != "value" {
|
|
||||||
m.name += "_" + m.field
|
|
||||||
}
|
|
||||||
switch m.mtype {
|
switch m.mtype {
|
||||||
case "c":
|
case "c":
|
||||||
m.tags["metric_type"] = "counter"
|
m.tags["metric_type"] = "counter"
|
||||||
|
@ -433,7 +442,7 @@ func (s *Statsd) parseName(bucket string) (string, string, map[string]string) {
|
||||||
name = strings.Replace(name, "-", "__", -1)
|
name = strings.Replace(name, "-", "__", -1)
|
||||||
}
|
}
|
||||||
if field == "" {
|
if field == "" {
|
||||||
field = "value"
|
field = defaultFieldName
|
||||||
}
|
}
|
||||||
|
|
||||||
return name, field, tags
|
return name, field, tags
|
||||||
|
@ -461,26 +470,32 @@ func parseKeyValue(keyvalue string) (string, string) {
|
||||||
func (s *Statsd) aggregate(m metric) {
|
func (s *Statsd) aggregate(m metric) {
|
||||||
switch m.mtype {
|
switch m.mtype {
|
||||||
case "ms", "h":
|
case "ms", "h":
|
||||||
|
// Check if the measurement exists
|
||||||
cached, ok := s.timings[m.hash]
|
cached, ok := s.timings[m.hash]
|
||||||
if !ok {
|
if !ok {
|
||||||
cached = cachedtimings{
|
cached = cachedtimings{
|
||||||
name: m.name,
|
name: m.name,
|
||||||
|
fields: make(map[string]RunningStats),
|
||||||
tags: m.tags,
|
tags: m.tags,
|
||||||
stats: RunningStats{
|
}
|
||||||
|
}
|
||||||
|
// Check if the field exists. If we've not enabled multiple fields per timer
|
||||||
|
// this will be the default field name, eg. "value"
|
||||||
|
field, ok := cached.fields[m.field]
|
||||||
|
if !ok {
|
||||||
|
field = RunningStats{
|
||||||
PercLimit: s.PercentileLimit,
|
PercLimit: s.PercentileLimit,
|
||||||
},
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if m.samplerate > 0 {
|
if m.samplerate > 0 {
|
||||||
for i := 0; i < int(1.0/m.samplerate); i++ {
|
for i := 0; i < int(1.0/m.samplerate); i++ {
|
||||||
cached.stats.AddValue(m.floatvalue)
|
field.AddValue(m.floatvalue)
|
||||||
}
|
}
|
||||||
s.timings[m.hash] = cached
|
|
||||||
} else {
|
} else {
|
||||||
cached.stats.AddValue(m.floatvalue)
|
field.AddValue(m.floatvalue)
|
||||||
s.timings[m.hash] = cached
|
|
||||||
}
|
}
|
||||||
|
cached.fields[m.field] = field
|
||||||
|
s.timings[m.hash] = cached
|
||||||
case "c":
|
case "c":
|
||||||
// check if the measurement exists
|
// check if the measurement exists
|
||||||
_, ok := s.counters[m.hash]
|
_, ok := s.counters[m.hash]
|
||||||
|
|
|
@ -561,12 +561,12 @@ func TestParse_MeasurementsWithMultipleValues(t *testing.T) {
|
||||||
// A 0 with invalid samplerate will add a single 0,
|
// A 0 with invalid samplerate will add a single 0,
|
||||||
// plus the last bit of value 1
|
// plus the last bit of value 1
|
||||||
// which adds up to 12 individual datapoints to be cached
|
// which adds up to 12 individual datapoints to be cached
|
||||||
if cachedtiming.stats.n != 12 {
|
if cachedtiming.fields[defaultFieldName].n != 12 {
|
||||||
t.Errorf("Expected 11 additions, got %d", cachedtiming.stats.n)
|
t.Errorf("Expected 11 additions, got %d", cachedtiming.fields[defaultFieldName].n)
|
||||||
}
|
}
|
||||||
|
|
||||||
if cachedtiming.stats.upper != 1 {
|
if cachedtiming.fields[defaultFieldName].upper != 1 {
|
||||||
t.Errorf("Expected max input to be 1, got %f", cachedtiming.stats.upper)
|
t.Errorf("Expected max input to be 1, got %f", cachedtiming.fields[defaultFieldName].upper)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -842,7 +842,105 @@ func TestParse_Timings(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
acc.AssertContainsFields(t, "test_timing", valid)
|
acc.AssertContainsFields(t, "test_timing", valid)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Tests low-level functionality of timings when multiple fields is enabled
|
||||||
|
// and a measurement template has been defined which can parse field names
|
||||||
|
func TestParse_Timings_MultipleFieldsWithTemplate(t *testing.T) {
|
||||||
|
s := NewStatsd()
|
||||||
|
s.Templates = []string{"measurement.field"}
|
||||||
|
s.Percentiles = []int{90}
|
||||||
|
acc := &testutil.Accumulator{}
|
||||||
|
|
||||||
|
validLines := []string{
|
||||||
|
"test_timing.success:1|ms",
|
||||||
|
"test_timing.success:11|ms",
|
||||||
|
"test_timing.success:1|ms",
|
||||||
|
"test_timing.success:1|ms",
|
||||||
|
"test_timing.success:1|ms",
|
||||||
|
"test_timing.error:2|ms",
|
||||||
|
"test_timing.error:22|ms",
|
||||||
|
"test_timing.error:2|ms",
|
||||||
|
"test_timing.error:2|ms",
|
||||||
|
"test_timing.error:2|ms",
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, line := range validLines {
|
||||||
|
err := s.parseStatsdLine(line)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Parsing line %s should not have resulted in an error\n", line)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
s.Gather(acc)
|
||||||
|
|
||||||
|
valid := map[string]interface{}{
|
||||||
|
"success_90_percentile": float64(11),
|
||||||
|
"success_count": int64(5),
|
||||||
|
"success_lower": float64(1),
|
||||||
|
"success_mean": float64(3),
|
||||||
|
"success_stddev": float64(4),
|
||||||
|
"success_upper": float64(11),
|
||||||
|
|
||||||
|
"error_90_percentile": float64(22),
|
||||||
|
"error_count": int64(5),
|
||||||
|
"error_lower": float64(2),
|
||||||
|
"error_mean": float64(6),
|
||||||
|
"error_stddev": float64(8),
|
||||||
|
"error_upper": float64(22),
|
||||||
|
}
|
||||||
|
|
||||||
|
acc.AssertContainsFields(t, "test_timing", valid)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Tests low-level functionality of timings when multiple fields is enabled
|
||||||
|
// but a measurement template hasn't been defined so we can't parse field names
|
||||||
|
// In this case the behaviour should be the same as normal behaviour
|
||||||
|
func TestParse_Timings_MultipleFieldsWithoutTemplate(t *testing.T) {
|
||||||
|
s := NewStatsd()
|
||||||
|
s.Templates = []string{}
|
||||||
|
s.Percentiles = []int{90}
|
||||||
|
acc := &testutil.Accumulator{}
|
||||||
|
|
||||||
|
validLines := []string{
|
||||||
|
"test_timing.success:1|ms",
|
||||||
|
"test_timing.success:11|ms",
|
||||||
|
"test_timing.success:1|ms",
|
||||||
|
"test_timing.success:1|ms",
|
||||||
|
"test_timing.success:1|ms",
|
||||||
|
"test_timing.error:2|ms",
|
||||||
|
"test_timing.error:22|ms",
|
||||||
|
"test_timing.error:2|ms",
|
||||||
|
"test_timing.error:2|ms",
|
||||||
|
"test_timing.error:2|ms",
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, line := range validLines {
|
||||||
|
err := s.parseStatsdLine(line)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Parsing line %s should not have resulted in an error\n", line)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
s.Gather(acc)
|
||||||
|
|
||||||
|
expectedSuccess := map[string]interface{}{
|
||||||
|
"90_percentile": float64(11),
|
||||||
|
"count": int64(5),
|
||||||
|
"lower": float64(1),
|
||||||
|
"mean": float64(3),
|
||||||
|
"stddev": float64(4),
|
||||||
|
"upper": float64(11),
|
||||||
|
}
|
||||||
|
expectedError := map[string]interface{}{
|
||||||
|
"90_percentile": float64(22),
|
||||||
|
"count": int64(5),
|
||||||
|
"lower": float64(2),
|
||||||
|
"mean": float64(6),
|
||||||
|
"stddev": float64(8),
|
||||||
|
"upper": float64(22),
|
||||||
|
}
|
||||||
|
|
||||||
|
acc.AssertContainsFields(t, "test_timing_success", expectedSuccess)
|
||||||
|
acc.AssertContainsFields(t, "test_timing_error", expectedError)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestParse_Timings_Delete(t *testing.T) {
|
func TestParse_Timings_Delete(t *testing.T) {
|
||||||
|
|
Loading…
Reference in New Issue