Parse statsd lines with multiple metric bits
This commit is contained in:
parent
b705608b04
commit
e9fa7b48a6
|
@ -26,6 +26,26 @@ implementation. In short, the telegraf statsd listener will accept:
|
||||||
- `load.time.nanoseconds:1|h`
|
- `load.time.nanoseconds:1|h`
|
||||||
- `load.time:200|ms|@0.1` <- sampled 1/10 of the time
|
- `load.time:200|ms|@0.1` <- sampled 1/10 of the time
|
||||||
|
|
||||||
|
It is possible to omit repetitive names and merge individual stats into a
|
||||||
|
single line by separating them with additional colons:
|
||||||
|
|
||||||
|
- `users.current.den001.myapp:32|g:+10|g:-10|g`
|
||||||
|
- `deploys.test.myservice:1|c:101|c:1|c|@0.1`
|
||||||
|
- `users.unique:101|s:101|s:102|s`
|
||||||
|
- `load.time:320|ms:200|ms|@0.1`
|
||||||
|
|
||||||
|
This also allows for mixed types in a single line:
|
||||||
|
|
||||||
|
- `foo:1|c:200|ms`
|
||||||
|
|
||||||
|
The internals for this do parse out the metric name and repeativly use it to
|
||||||
|
create new packets for each bit in the line that can be separated by colon.
|
||||||
|
The parser will copy down the metric name to each bit until a newline is found.
|
||||||
|
|
||||||
|
The string `foo:1|c:200|ms` is internally split into two individual metrics
|
||||||
|
`foo:1|c` and `foo:200|ms` which are added to the aggregator separately.
|
||||||
|
|
||||||
|
|
||||||
#### Influx Statsd
|
#### Influx Statsd
|
||||||
|
|
||||||
In order to take advantage of InfluxDB's tagging system, we have made a couple
|
In order to take advantage of InfluxDB's tagging system, we have made a couple
|
||||||
|
|
|
@ -254,101 +254,109 @@ func (s *Statsd) parseStatsdLine(line string) error {
|
||||||
s.Lock()
|
s.Lock()
|
||||||
defer s.Unlock()
|
defer s.Unlock()
|
||||||
|
|
||||||
m := metric{}
|
// Validate splitting the line on ":"
|
||||||
|
bits := strings.Split(line, ":")
|
||||||
// Validate splitting the line on "|"
|
if len(bits) < 2 {
|
||||||
pipesplit := strings.Split(line, "|")
|
|
||||||
if len(pipesplit) < 2 {
|
|
||||||
log.Printf("Error: splitting '|', Unable to parse metric: %s\n", line)
|
|
||||||
return errors.New("Error Parsing statsd line")
|
|
||||||
} else if len(pipesplit) > 2 {
|
|
||||||
sr := pipesplit[2]
|
|
||||||
errmsg := "Error: parsing sample rate, %s, it must be in format like: " +
|
|
||||||
"@0.1, @0.5, etc. Ignoring sample rate for line: %s\n"
|
|
||||||
if strings.Contains(sr, "@") && len(sr) > 1 {
|
|
||||||
samplerate, err := strconv.ParseFloat(sr[1:], 64)
|
|
||||||
if err != nil {
|
|
||||||
log.Printf(errmsg, err.Error(), line)
|
|
||||||
} else {
|
|
||||||
// sample rate successfully parsed
|
|
||||||
m.samplerate = samplerate
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
log.Printf(errmsg, "", line)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Validate metric type
|
|
||||||
switch pipesplit[1] {
|
|
||||||
case "g", "c", "s", "ms", "h":
|
|
||||||
m.mtype = pipesplit[1]
|
|
||||||
default:
|
|
||||||
log.Printf("Error: Statsd Metric type %s unsupported", pipesplit[1])
|
|
||||||
return errors.New("Error Parsing statsd line")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Validate splitting the rest of the line on ":"
|
|
||||||
colonsplit := strings.Split(pipesplit[0], ":")
|
|
||||||
if len(colonsplit) != 2 {
|
|
||||||
log.Printf("Error: splitting ':', Unable to parse metric: %s\n", line)
|
log.Printf("Error: splitting ':', Unable to parse metric: %s\n", line)
|
||||||
return errors.New("Error Parsing statsd line")
|
return errors.New("Error Parsing statsd line")
|
||||||
}
|
}
|
||||||
m.bucket = colonsplit[0]
|
|
||||||
|
|
||||||
// Parse the value
|
// Extract bucket name from individual metric bits
|
||||||
if strings.ContainsAny(colonsplit[1], "-+") {
|
bucketName, bits := bits[0], bits[1:]
|
||||||
if m.mtype != "g" {
|
|
||||||
log.Printf("Error: +- values are only supported for gauges: %s\n", line)
|
// Add a metric for each bit available
|
||||||
|
for _, bit := range bits {
|
||||||
|
m := metric{}
|
||||||
|
|
||||||
|
m.bucket = bucketName
|
||||||
|
|
||||||
|
// Validate splitting the bit on "|"
|
||||||
|
pipesplit := strings.Split(bit, "|")
|
||||||
|
if len(pipesplit) < 2 {
|
||||||
|
log.Printf("Error: splitting '|', Unable to parse metric: %s\n", line)
|
||||||
|
return errors.New("Error Parsing statsd line")
|
||||||
|
} else if len(pipesplit) > 2 {
|
||||||
|
sr := pipesplit[2]
|
||||||
|
errmsg := "Error: parsing sample rate, %s, it must be in format like: " +
|
||||||
|
"@0.1, @0.5, etc. Ignoring sample rate for line: %s\n"
|
||||||
|
if strings.Contains(sr, "@") && len(sr) > 1 {
|
||||||
|
samplerate, err := strconv.ParseFloat(sr[1:], 64)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf(errmsg, err.Error(), line)
|
||||||
|
} else {
|
||||||
|
// sample rate successfully parsed
|
||||||
|
m.samplerate = samplerate
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
log.Printf(errmsg, "", line)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate metric type
|
||||||
|
switch pipesplit[1] {
|
||||||
|
case "g", "c", "s", "ms", "h":
|
||||||
|
m.mtype = pipesplit[1]
|
||||||
|
default:
|
||||||
|
log.Printf("Error: Statsd Metric type %s unsupported", pipesplit[1])
|
||||||
return errors.New("Error Parsing statsd line")
|
return errors.New("Error Parsing statsd line")
|
||||||
}
|
}
|
||||||
m.additive = true
|
|
||||||
}
|
|
||||||
|
|
||||||
switch m.mtype {
|
// Parse the value
|
||||||
case "g", "ms", "h":
|
if strings.ContainsAny(pipesplit[0], "-+") {
|
||||||
v, err := strconv.ParseFloat(colonsplit[1], 64)
|
if m.mtype != "g" {
|
||||||
if err != nil {
|
log.Printf("Error: +- values are only supported for gauges: %s\n", line)
|
||||||
log.Printf("Error: parsing value to float64: %s\n", line)
|
return errors.New("Error Parsing statsd line")
|
||||||
return errors.New("Error Parsing statsd line")
|
}
|
||||||
|
m.additive = true
|
||||||
}
|
}
|
||||||
m.floatvalue = v
|
|
||||||
case "c", "s":
|
switch m.mtype {
|
||||||
v, err := strconv.ParseInt(colonsplit[1], 10, 64)
|
case "g", "ms", "h":
|
||||||
if err != nil {
|
v, err := strconv.ParseFloat(pipesplit[0], 64)
|
||||||
log.Printf("Error: parsing value to int64: %s\n", line)
|
if err != nil {
|
||||||
return errors.New("Error Parsing statsd line")
|
log.Printf("Error: parsing value to float64: %s\n", line)
|
||||||
|
return errors.New("Error Parsing statsd line")
|
||||||
|
}
|
||||||
|
m.floatvalue = v
|
||||||
|
case "c", "s":
|
||||||
|
v, err := strconv.ParseInt(pipesplit[0], 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Error: parsing value to int64: %s\n", line)
|
||||||
|
return errors.New("Error Parsing statsd line")
|
||||||
|
}
|
||||||
|
// If a sample rate is given with a counter, divide value by the rate
|
||||||
|
if m.samplerate != 0 && m.mtype == "c" {
|
||||||
|
v = int64(float64(v) / m.samplerate)
|
||||||
|
}
|
||||||
|
m.intvalue = v
|
||||||
}
|
}
|
||||||
// If a sample rate is given with a counter, divide value by the rate
|
|
||||||
if m.samplerate != 0 && m.mtype == "c" {
|
// Parse the name & tags from bucket
|
||||||
v = int64(float64(v) / m.samplerate)
|
m.name, m.tags = s.parseName(m.bucket)
|
||||||
|
switch m.mtype {
|
||||||
|
case "c":
|
||||||
|
m.tags["metric_type"] = "counter"
|
||||||
|
case "g":
|
||||||
|
m.tags["metric_type"] = "gauge"
|
||||||
|
case "s":
|
||||||
|
m.tags["metric_type"] = "set"
|
||||||
|
case "ms":
|
||||||
|
m.tags["metric_type"] = "timing"
|
||||||
|
case "h":
|
||||||
|
m.tags["metric_type"] = "histogram"
|
||||||
}
|
}
|
||||||
m.intvalue = v
|
|
||||||
|
// Make a unique key for the measurement name/tags
|
||||||
|
var tg []string
|
||||||
|
for k, v := range m.tags {
|
||||||
|
tg = append(tg, fmt.Sprintf("%s=%s", k, v))
|
||||||
|
}
|
||||||
|
sort.Strings(tg)
|
||||||
|
m.hash = fmt.Sprintf("%s%s", strings.Join(tg, ""), m.name)
|
||||||
|
|
||||||
|
s.aggregate(m)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Parse the name & tags from bucket
|
|
||||||
m.name, m.tags = s.parseName(m.bucket)
|
|
||||||
switch m.mtype {
|
|
||||||
case "c":
|
|
||||||
m.tags["metric_type"] = "counter"
|
|
||||||
case "g":
|
|
||||||
m.tags["metric_type"] = "gauge"
|
|
||||||
case "s":
|
|
||||||
m.tags["metric_type"] = "set"
|
|
||||||
case "ms":
|
|
||||||
m.tags["metric_type"] = "timing"
|
|
||||||
case "h":
|
|
||||||
m.tags["metric_type"] = "histogram"
|
|
||||||
}
|
|
||||||
|
|
||||||
// Make a unique key for the measurement name/tags
|
|
||||||
var tg []string
|
|
||||||
for k, v := range m.tags {
|
|
||||||
tg = append(tg, fmt.Sprintf("%s=%s", k, v))
|
|
||||||
}
|
|
||||||
sort.Strings(tg)
|
|
||||||
m.hash = fmt.Sprintf("%s%s", strings.Join(tg, ""), m.name)
|
|
||||||
|
|
||||||
s.aggregate(m)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -326,6 +326,136 @@ func TestParse_MeasurementsWithSameName(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Test that measurements with multiple bits, are treated as different outputs
|
||||||
|
// but are equal to their single-measurement representation
|
||||||
|
func TestParse_MeasurementsWithMultipleValues(t *testing.T) {
|
||||||
|
single_lines := []string{
|
||||||
|
"valid.multiple:0|ms|@0.1",
|
||||||
|
"valid.multiple:0|ms|",
|
||||||
|
"valid.multiple:1|ms",
|
||||||
|
"valid.multiple.duplicate:1|c",
|
||||||
|
"valid.multiple.duplicate:1|c",
|
||||||
|
"valid.multiple.duplicate:2|c",
|
||||||
|
"valid.multiple.duplicate:1|c",
|
||||||
|
"valid.multiple.duplicate:1|h",
|
||||||
|
"valid.multiple.duplicate:1|h",
|
||||||
|
"valid.multiple.duplicate:2|h",
|
||||||
|
"valid.multiple.duplicate:1|h",
|
||||||
|
"valid.multiple.duplicate:1|s",
|
||||||
|
"valid.multiple.duplicate:1|s",
|
||||||
|
"valid.multiple.duplicate:2|s",
|
||||||
|
"valid.multiple.duplicate:1|s",
|
||||||
|
"valid.multiple.duplicate:1|g",
|
||||||
|
"valid.multiple.duplicate:1|g",
|
||||||
|
"valid.multiple.duplicate:2|g",
|
||||||
|
"valid.multiple.duplicate:1|g",
|
||||||
|
"valid.multiple.mixed:1|c",
|
||||||
|
"valid.multiple.mixed:1|ms",
|
||||||
|
"valid.multiple.mixed:2|s",
|
||||||
|
"valid.multiple.mixed:1|g",
|
||||||
|
}
|
||||||
|
|
||||||
|
multiple_lines := []string{
|
||||||
|
"valid.multiple:0|ms|@0.1:0|ms|:1|ms",
|
||||||
|
"valid.multiple.duplicate:1|c:1|c:2|c:1|c",
|
||||||
|
"valid.multiple.duplicate:1|h:1|h:2|h:1|h",
|
||||||
|
"valid.multiple.duplicate:1|s:1|s:2|s:1|s",
|
||||||
|
"valid.multiple.duplicate:1|g:1|g:2|g:1|g",
|
||||||
|
"valid.multiple.mixed:1|c:1|ms:2|s:1|g",
|
||||||
|
}
|
||||||
|
|
||||||
|
s_single := NewStatsd()
|
||||||
|
s_multiple := NewStatsd()
|
||||||
|
|
||||||
|
for _, line := range single_lines {
|
||||||
|
err := s_single.parseStatsdLine(line)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Parsing line %s should not have resulted in an error\n", line)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, line := range multiple_lines {
|
||||||
|
err := s_multiple.parseStatsdLine(line)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Parsing line %s should not have resulted in an error\n", line)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(s_single.timings) != 3 {
|
||||||
|
t.Errorf("Expected 3 measurement, found %d", len(s_single.timings))
|
||||||
|
}
|
||||||
|
|
||||||
|
if cachedtiming, ok := s_single.timings["metric_type=timingvalid_multiple"]; !ok {
|
||||||
|
t.Errorf("Expected cached measurement with hash 'metric_type=timingvalid_multiple' not found")
|
||||||
|
} else {
|
||||||
|
if cachedtiming.name != "valid_multiple" {
|
||||||
|
t.Errorf("Expected the name to be 'valid_multiple', got %s", cachedtiming.name)
|
||||||
|
}
|
||||||
|
|
||||||
|
// A 0 at samplerate 0.1 will add 10 values of 0,
|
||||||
|
// A 0 with invalid samplerate will add a single 0,
|
||||||
|
// plus the last bit of value 1
|
||||||
|
// which adds up to 12 individual datapoints to be cached
|
||||||
|
if cachedtiming.stats.n != 12 {
|
||||||
|
t.Errorf("Expected 11 additions, got %d", cachedtiming.stats.n)
|
||||||
|
}
|
||||||
|
|
||||||
|
if cachedtiming.stats.upper != 1 {
|
||||||
|
t.Errorf("Expected max input to be 1, got %f", cachedtiming.stats.upper)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// test if s_single and s_multiple did compute the same stats for valid.multiple.duplicate
|
||||||
|
if err := test_validate_set("valid_multiple_duplicate", 2, s_single.sets); err != nil {
|
||||||
|
t.Error(err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := test_validate_set("valid_multiple_duplicate", 2, s_multiple.sets); err != nil {
|
||||||
|
t.Error(err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := test_validate_counter("valid_multiple_duplicate", 5, s_single.counters); err != nil {
|
||||||
|
t.Error(err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := test_validate_counter("valid_multiple_duplicate", 5, s_multiple.counters); err != nil {
|
||||||
|
t.Error(err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := test_validate_gauge("valid_multiple_duplicate", 1, s_single.gauges); err != nil {
|
||||||
|
t.Error(err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := test_validate_gauge("valid_multiple_duplicate", 1, s_multiple.gauges); err != nil {
|
||||||
|
t.Error(err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
// test if s_single and s_multiple did compute the same stats for valid.multiple.mixed
|
||||||
|
if err := test_validate_set("valid_multiple_mixed", 1, s_single.sets); err != nil {
|
||||||
|
t.Error(err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := test_validate_set("valid_multiple_mixed", 1, s_multiple.sets); err != nil {
|
||||||
|
t.Error(err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := test_validate_counter("valid_multiple_mixed", 1, s_single.counters); err != nil {
|
||||||
|
t.Error(err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := test_validate_counter("valid_multiple_mixed", 1, s_multiple.counters); err != nil {
|
||||||
|
t.Error(err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := test_validate_gauge("valid_multiple_mixed", 1, s_single.gauges); err != nil {
|
||||||
|
t.Error(err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := test_validate_gauge("valid_multiple_mixed", 1, s_multiple.gauges); err != nil {
|
||||||
|
t.Error(err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Valid lines should be parsed and their values should be cached
|
// Valid lines should be parsed and their values should be cached
|
||||||
func TestParse_ValidLines(t *testing.T) {
|
func TestParse_ValidLines(t *testing.T) {
|
||||||
s := NewStatsd()
|
s := NewStatsd()
|
||||||
|
|
Loading…
Reference in New Issue