Improve performance of wavefront serializer (#5842)

This commit is contained in:
Charlie Vieth 2019-06-12 14:59:51 -04:00 committed by Daniel Nelson
parent 1d682b847c
commit 1a647fb6ba
2 changed files with 155 additions and 94 deletions

View File

@ -1,11 +1,10 @@
package wavefront package wavefront
import ( import (
"bytes"
"fmt"
"log" "log"
"strconv" "strconv"
"strings" "strings"
"sync"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/outputs/wavefront" "github.com/influxdata/telegraf/plugins/outputs/wavefront"
@ -16,6 +15,8 @@ type WavefrontSerializer struct {
Prefix string Prefix string
UseStrict bool UseStrict bool
SourceOverride []string SourceOverride []string
scratch buffer
mu sync.Mutex // buffer mutex
} }
// catch many of the invalid chars that could appear in a metric or tag name // catch many of the invalid chars that could appear in a metric or tag name
@ -48,18 +49,16 @@ func NewSerializer(prefix string, useStrict bool, sourceOverride []string) (*Wav
return s, nil return s, nil
} }
// Serialize : Serialize based on Wavefront format func (s *WavefrontSerializer) serialize(buf *buffer, m telegraf.Metric) {
func (s *WavefrontSerializer) Serialize(m telegraf.Metric) ([]byte, error) { const metricSeparator = "."
out := []byte{}
metricSeparator := "."
for fieldName, value := range m.Fields() { for fieldName, value := range m.Fields() {
var name string var name string
if fieldName == "value" { if fieldName == "value" {
name = fmt.Sprintf("%s%s", s.Prefix, m.Name()) name = s.Prefix + m.Name()
} else { } else {
name = fmt.Sprintf("%s%s%s%s", s.Prefix, m.Name(), metricSeparator, fieldName) name = s.Prefix + m.Name() + metricSeparator + fieldName
} }
if s.UseStrict { if s.UseStrict {
@ -70,133 +69,150 @@ func (s *WavefrontSerializer) Serialize(m telegraf.Metric) ([]byte, error) {
name = pathReplacer.Replace(name) name = pathReplacer.Replace(name)
metric := &wavefront.MetricPoint{ metricValue, valid := buildValue(value, name)
Metric: name, if !valid {
Timestamp: m.Time().Unix(),
}
metricValue, buildError := buildValue(value, metric.Metric)
if buildError != nil {
// bad value continue to next metric // bad value continue to next metric
continue continue
} }
metric.Value = metricValue
source, tags := buildTags(m.Tags(), s) source, tags := buildTags(m.Tags(), s)
metric.Source = source metric := wavefront.MetricPoint{
metric.Tags = tags Metric: name,
Timestamp: m.Time().Unix(),
out = append(out, formatMetricPoint(metric, s)...) Value: metricValue,
Source: source,
Tags: tags,
} }
formatMetricPoint(&s.scratch, &metric, s)
}
}
// Serialize : Serialize based on Wavefront format
func (s *WavefrontSerializer) Serialize(m telegraf.Metric) ([]byte, error) {
s.mu.Lock()
s.scratch.Reset()
s.serialize(&s.scratch, m)
out := s.scratch.Copy()
s.mu.Unlock()
return out, nil return out, nil
} }
func (s *WavefrontSerializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) { func (s *WavefrontSerializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
var batch bytes.Buffer s.mu.Lock()
s.scratch.Reset()
for _, m := range metrics { for _, m := range metrics {
buf, err := s.Serialize(m) s.serialize(&s.scratch, m)
if err != nil {
return nil, err
} }
_, err = batch.Write(buf) out := s.scratch.Copy()
if err != nil { s.mu.Unlock()
return nil, err return out, nil
}
func findSourceTag(mTags map[string]string, s *WavefrontSerializer) string {
if src, ok := mTags["source"]; ok {
delete(mTags, "source")
return src
}
for _, src := range s.SourceOverride {
if source, ok := mTags[src]; ok {
delete(mTags, src)
mTags["telegraf_host"] = mTags["host"]
return source
} }
} }
return batch.Bytes(), nil return mTags["host"]
} }
func buildTags(mTags map[string]string, s *WavefrontSerializer) (string, map[string]string) { func buildTags(mTags map[string]string, s *WavefrontSerializer) (string, map[string]string) {
// Remove all empty tags. // Remove all empty tags.
for k, v := range mTags { for k, v := range mTags {
if v == "" { if v == "" {
delete(mTags, k) delete(mTags, k)
} }
} }
source := findSourceTag(mTags, s)
var source string
if src, ok := mTags["source"]; ok {
source = src
delete(mTags, "source")
} else {
sourceTagFound := false
for _, src := range s.SourceOverride {
for k, v := range mTags {
if k == src {
source = v
mTags["telegraf_host"] = mTags["host"]
sourceTagFound = true
delete(mTags, k)
break
}
}
if sourceTagFound {
break
}
}
if !sourceTagFound {
source = mTags["host"]
}
}
delete(mTags, "host") delete(mTags, "host")
return tagValueReplacer.Replace(source), mTags return tagValueReplacer.Replace(source), mTags
} }
func buildValue(v interface{}, name string) (float64, error) { func buildValue(v interface{}, name string) (val float64, valid bool) {
switch p := v.(type) { switch p := v.(type) {
case bool: case bool:
if p { if p {
return 1, nil return 1, true
} else {
return 0, nil
} }
return 0, true
case int64: case int64:
return float64(v.(int64)), nil return float64(p), true
case uint64: case uint64:
return float64(v.(uint64)), nil return float64(p), true
case float64: case float64:
return v.(float64), nil return p, true
case string: case string:
// return an error but don't log // return false but don't log
return 0, fmt.Errorf("string type not supported") return 0, false
default: default:
// return an error and log a debug message // log a debug message
err := fmt.Errorf("unexpected type: %T, with value: %v, for :%s", v, v, name) log.Printf("D! Serializer [wavefront] unexpected type: %T, with value: %v, for :%s\n",
log.Printf("D! Serializer [wavefront] %s\n", err.Error()) v, v, name)
return 0, err return 0, false
} }
} }
func formatMetricPoint(metricPoint *wavefront.MetricPoint, s *WavefrontSerializer) []byte { func formatMetricPoint(b *buffer, metricPoint *wavefront.MetricPoint, s *WavefrontSerializer) []byte {
var buffer bytes.Buffer b.WriteChar('"')
buffer.WriteString("\"") b.WriteString(metricPoint.Metric)
buffer.WriteString(metricPoint.Metric) b.WriteString(`" `)
buffer.WriteString("\" ") b.WriteFloat64(metricPoint.Value)
buffer.WriteString(strconv.FormatFloat(metricPoint.Value, 'f', 6, 64)) b.WriteChar(' ')
buffer.WriteString(" ") b.WriteUint64(uint64(metricPoint.Timestamp))
buffer.WriteString(strconv.FormatInt(metricPoint.Timestamp, 10)) b.WriteString(` source="`)
buffer.WriteString(" source=\"") b.WriteString(metricPoint.Source)
buffer.WriteString(metricPoint.Source) b.WriteChar('"')
buffer.WriteString("\"")
for k, v := range metricPoint.Tags { for k, v := range metricPoint.Tags {
buffer.WriteString(" \"") b.WriteString(` "`)
if s.UseStrict { if s.UseStrict {
buffer.WriteString(strictSanitizedChars.Replace(k)) b.WriteString(strictSanitizedChars.Replace(k))
} else { } else {
buffer.WriteString(sanitizedChars.Replace(k)) b.WriteString(sanitizedChars.Replace(k))
} }
buffer.WriteString("\"=\"") b.WriteString(`"="`)
buffer.WriteString(tagValueReplacer.Replace(v)) b.WriteString(tagValueReplacer.Replace(v))
buffer.WriteString("\"") b.WriteChar('"')
} }
buffer.WriteString("\n") b.WriteChar('\n')
return buffer.Bytes() return *b
}
type buffer []byte
func (b *buffer) Reset() { *b = (*b)[:0] }
func (b *buffer) Copy() []byte {
p := make([]byte, len(*b))
copy(p, *b)
return p
}
func (b *buffer) WriteString(s string) {
*b = append(*b, s...)
}
// This is named WriteChar instead of WriteByte because the 'stdmethods' check
// of 'go vet' wants WriteByte to have the signature:
//
// func (b *buffer) WriteByte(c byte) error { ... }
//
func (b *buffer) WriteChar(c byte) {
*b = append(*b, c)
}
func (b *buffer) WriteUint64(val uint64) {
*b = strconv.AppendUint(*b, val, 10)
}
func (b *buffer) WriteFloat64(val float64) {
*b = strconv.AppendFloat(*b, val, 'f', 6, 64)
} }

View File

@ -7,6 +7,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/outputs/wavefront" "github.com/influxdata/telegraf/plugins/outputs/wavefront"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
@ -132,7 +133,7 @@ func TestFormatMetricPoint(t *testing.T) {
s := WavefrontSerializer{} s := WavefrontSerializer{}
for _, pt := range pointTests { for _, pt := range pointTests {
bout := formatMetricPoint(pt.ptIn, &s) bout := formatMetricPoint(new(buffer), pt.ptIn, &s)
sout := string(bout[:]) sout := string(bout[:])
if sout != pt.out { if sout != pt.out {
t.Errorf("\nexpected\t%s\nreceived\t%s\n", pt.out, sout) t.Errorf("\nexpected\t%s\nreceived\t%s\n", pt.out, sout)
@ -160,7 +161,7 @@ func TestUseStrict(t *testing.T) {
s := WavefrontSerializer{UseStrict: true} s := WavefrontSerializer{UseStrict: true}
for _, pt := range pointTests { for _, pt := range pointTests {
bout := formatMetricPoint(pt.ptIn, &s) bout := formatMetricPoint(new(buffer), pt.ptIn, &s)
sout := string(bout[:]) sout := string(bout[:])
if sout != pt.out { if sout != pt.out {
t.Errorf("\nexpected\t%s\nreceived\t%s\n", pt.out, sout) t.Errorf("\nexpected\t%s\nreceived\t%s\n", pt.out, sout)
@ -293,3 +294,47 @@ func TestSerializeMetricPrefix(t *testing.T) {
expS := []string{fmt.Sprintf("\"telegraf.cpu.usage.idle\" 91.000000 %d source=\"realHost\" \"cpu\"=\"cpu0\"", now.UnixNano()/1000000000)} expS := []string{fmt.Sprintf("\"telegraf.cpu.usage.idle\" 91.000000 %d source=\"realHost\" \"cpu\"=\"cpu0\"", now.UnixNano()/1000000000)}
assert.Equal(t, expS, mS) assert.Equal(t, expS, mS)
} }
func benchmarkMetrics(b *testing.B) [4]telegraf.Metric {
b.Helper()
now := time.Now()
tags := map[string]string{
"cpu": "cpu0",
"host": "realHost",
}
newMetric := func(v interface{}) telegraf.Metric {
fields := map[string]interface{}{
"usage_idle": v,
}
m, err := metric.New("cpu", tags, fields, now)
if err != nil {
b.Fatal(err)
}
return m
}
return [4]telegraf.Metric{
newMetric(91.5),
newMetric(91),
newMetric(true),
newMetric(false),
}
}
func BenchmarkSerialize(b *testing.B) {
var s WavefrontSerializer
metrics := benchmarkMetrics(b)
b.ResetTimer()
for i := 0; i < b.N; i++ {
s.Serialize(metrics[i%len(metrics)])
}
}
func BenchmarkSerializeBatch(b *testing.B) {
var s WavefrontSerializer
m := benchmarkMetrics(b)
metrics := m[:]
b.ResetTimer()
for i := 0; i < b.N; i++ {
s.SerializeBatch(metrics)
}
}