diff --git a/plugins/serializers/wavefront/wavefront.go b/plugins/serializers/wavefront/wavefront.go index 70b87512f..67fa1ae3a 100755 --- a/plugins/serializers/wavefront/wavefront.go +++ b/plugins/serializers/wavefront/wavefront.go @@ -1,11 +1,10 @@ package wavefront import ( - "bytes" - "fmt" "log" "strconv" "strings" + "sync" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/outputs/wavefront" @@ -16,6 +15,8 @@ type WavefrontSerializer struct { Prefix string UseStrict bool SourceOverride []string + scratch buffer + mu sync.Mutex // buffer mutex } // catch many of the invalid chars that could appear in a metric or tag name @@ -48,18 +49,16 @@ func NewSerializer(prefix string, useStrict bool, sourceOverride []string) (*Wav return s, nil } -// Serialize : Serialize based on Wavefront format -func (s *WavefrontSerializer) Serialize(m telegraf.Metric) ([]byte, error) { - out := []byte{} - metricSeparator := "." +func (s *WavefrontSerializer) serialize(buf *buffer, m telegraf.Metric) { + const metricSeparator = "." for fieldName, value := range m.Fields() { var name string if fieldName == "value" { - name = fmt.Sprintf("%s%s", s.Prefix, m.Name()) + name = s.Prefix + m.Name() } else { - name = fmt.Sprintf("%s%s%s%s", s.Prefix, m.Name(), metricSeparator, fieldName) + name = s.Prefix + m.Name() + metricSeparator + fieldName } if s.UseStrict { @@ -70,133 +69,150 @@ func (s *WavefrontSerializer) Serialize(m telegraf.Metric) ([]byte, error) { name = pathReplacer.Replace(name) - metric := &wavefront.MetricPoint{ - Metric: name, - Timestamp: m.Time().Unix(), - } - - metricValue, buildError := buildValue(value, metric.Metric) - if buildError != nil { + metricValue, valid := buildValue(value, name) + if !valid { // bad value continue to next metric continue } - metric.Value = metricValue - source, tags := buildTags(m.Tags(), s) - metric.Source = source - metric.Tags = tags - - out = append(out, formatMetricPoint(metric, s)...) + metric := wavefront.MetricPoint{ + Metric: name, + Timestamp: m.Time().Unix(), + Value: metricValue, + Source: source, + Tags: tags, + } + formatMetricPoint(&s.scratch, &metric, s) } +} + +// Serialize : Serialize based on Wavefront format +func (s *WavefrontSerializer) Serialize(m telegraf.Metric) ([]byte, error) { + s.mu.Lock() + s.scratch.Reset() + s.serialize(&s.scratch, m) + out := s.scratch.Copy() + s.mu.Unlock() return out, nil } func (s *WavefrontSerializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) { - var batch bytes.Buffer + s.mu.Lock() + s.scratch.Reset() for _, m := range metrics { - buf, err := s.Serialize(m) - if err != nil { - return nil, err - } - _, err = batch.Write(buf) - if err != nil { - return nil, err + s.serialize(&s.scratch, m) + } + out := s.scratch.Copy() + s.mu.Unlock() + return out, nil +} + +func findSourceTag(mTags map[string]string, s *WavefrontSerializer) string { + if src, ok := mTags["source"]; ok { + delete(mTags, "source") + return src + } + for _, src := range s.SourceOverride { + if source, ok := mTags[src]; ok { + delete(mTags, src) + mTags["telegraf_host"] = mTags["host"] + return source } } - return batch.Bytes(), nil + return mTags["host"] } func buildTags(mTags map[string]string, s *WavefrontSerializer) (string, map[string]string) { - // Remove all empty tags. for k, v := range mTags { if v == "" { delete(mTags, k) } } - - var source string - - if src, ok := mTags["source"]; ok { - source = src - delete(mTags, "source") - } else { - sourceTagFound := false - for _, src := range s.SourceOverride { - for k, v := range mTags { - if k == src { - source = v - mTags["telegraf_host"] = mTags["host"] - sourceTagFound = true - delete(mTags, k) - break - } - } - if sourceTagFound { - break - } - } - - if !sourceTagFound { - source = mTags["host"] - } - } - + source := findSourceTag(mTags, s) delete(mTags, "host") - return tagValueReplacer.Replace(source), mTags } -func buildValue(v interface{}, name string) (float64, error) { +func buildValue(v interface{}, name string) (val float64, valid bool) { switch p := v.(type) { case bool: if p { - return 1, nil - } else { - return 0, nil + return 1, true } + return 0, true case int64: - return float64(v.(int64)), nil + return float64(p), true case uint64: - return float64(v.(uint64)), nil + return float64(p), true case float64: - return v.(float64), nil + return p, true case string: - // return an error but don't log - return 0, fmt.Errorf("string type not supported") + // return false but don't log + return 0, false default: - // return an error and log a debug message - err := fmt.Errorf("unexpected type: %T, with value: %v, for :%s", v, v, name) - log.Printf("D! Serializer [wavefront] %s\n", err.Error()) - return 0, err + // log a debug message + log.Printf("D! Serializer [wavefront] unexpected type: %T, with value: %v, for :%s\n", + v, v, name) + return 0, false } } -func formatMetricPoint(metricPoint *wavefront.MetricPoint, s *WavefrontSerializer) []byte { - var buffer bytes.Buffer - buffer.WriteString("\"") - buffer.WriteString(metricPoint.Metric) - buffer.WriteString("\" ") - buffer.WriteString(strconv.FormatFloat(metricPoint.Value, 'f', 6, 64)) - buffer.WriteString(" ") - buffer.WriteString(strconv.FormatInt(metricPoint.Timestamp, 10)) - buffer.WriteString(" source=\"") - buffer.WriteString(metricPoint.Source) - buffer.WriteString("\"") +func formatMetricPoint(b *buffer, metricPoint *wavefront.MetricPoint, s *WavefrontSerializer) []byte { + b.WriteChar('"') + b.WriteString(metricPoint.Metric) + b.WriteString(`" `) + b.WriteFloat64(metricPoint.Value) + b.WriteChar(' ') + b.WriteUint64(uint64(metricPoint.Timestamp)) + b.WriteString(` source="`) + b.WriteString(metricPoint.Source) + b.WriteChar('"') for k, v := range metricPoint.Tags { - buffer.WriteString(" \"") + b.WriteString(` "`) if s.UseStrict { - buffer.WriteString(strictSanitizedChars.Replace(k)) + b.WriteString(strictSanitizedChars.Replace(k)) } else { - buffer.WriteString(sanitizedChars.Replace(k)) + b.WriteString(sanitizedChars.Replace(k)) } - buffer.WriteString("\"=\"") - buffer.WriteString(tagValueReplacer.Replace(v)) - buffer.WriteString("\"") + b.WriteString(`"="`) + b.WriteString(tagValueReplacer.Replace(v)) + b.WriteChar('"') } - buffer.WriteString("\n") + b.WriteChar('\n') - return buffer.Bytes() + return *b +} + +type buffer []byte + +func (b *buffer) Reset() { *b = (*b)[:0] } + +func (b *buffer) Copy() []byte { + p := make([]byte, len(*b)) + copy(p, *b) + return p +} + +func (b *buffer) WriteString(s string) { + *b = append(*b, s...) +} + +// This is named WriteChar instead of WriteByte because the 'stdmethods' check +// of 'go vet' wants WriteByte to have the signature: +// +// func (b *buffer) WriteByte(c byte) error { ... } +// +func (b *buffer) WriteChar(c byte) { + *b = append(*b, c) +} + +func (b *buffer) WriteUint64(val uint64) { + *b = strconv.AppendUint(*b, val, 10) +} + +func (b *buffer) WriteFloat64(val float64) { + *b = strconv.AppendFloat(*b, val, 'f', 6, 64) } diff --git a/plugins/serializers/wavefront/wavefront_test.go b/plugins/serializers/wavefront/wavefront_test.go index 3230ce515..548326e70 100755 --- a/plugins/serializers/wavefront/wavefront_test.go +++ b/plugins/serializers/wavefront/wavefront_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/plugins/outputs/wavefront" "github.com/stretchr/testify/assert" @@ -132,7 +133,7 @@ func TestFormatMetricPoint(t *testing.T) { s := WavefrontSerializer{} for _, pt := range pointTests { - bout := formatMetricPoint(pt.ptIn, &s) + bout := formatMetricPoint(new(buffer), pt.ptIn, &s) sout := string(bout[:]) if sout != pt.out { t.Errorf("\nexpected\t%s\nreceived\t%s\n", pt.out, sout) @@ -160,7 +161,7 @@ func TestUseStrict(t *testing.T) { s := WavefrontSerializer{UseStrict: true} for _, pt := range pointTests { - bout := formatMetricPoint(pt.ptIn, &s) + bout := formatMetricPoint(new(buffer), pt.ptIn, &s) sout := string(bout[:]) if sout != pt.out { t.Errorf("\nexpected\t%s\nreceived\t%s\n", pt.out, sout) @@ -293,3 +294,47 @@ func TestSerializeMetricPrefix(t *testing.T) { expS := []string{fmt.Sprintf("\"telegraf.cpu.usage.idle\" 91.000000 %d source=\"realHost\" \"cpu\"=\"cpu0\"", now.UnixNano()/1000000000)} assert.Equal(t, expS, mS) } + +func benchmarkMetrics(b *testing.B) [4]telegraf.Metric { + b.Helper() + now := time.Now() + tags := map[string]string{ + "cpu": "cpu0", + "host": "realHost", + } + newMetric := func(v interface{}) telegraf.Metric { + fields := map[string]interface{}{ + "usage_idle": v, + } + m, err := metric.New("cpu", tags, fields, now) + if err != nil { + b.Fatal(err) + } + return m + } + return [4]telegraf.Metric{ + newMetric(91.5), + newMetric(91), + newMetric(true), + newMetric(false), + } +} + +func BenchmarkSerialize(b *testing.B) { + var s WavefrontSerializer + metrics := benchmarkMetrics(b) + b.ResetTimer() + for i := 0; i < b.N; i++ { + s.Serialize(metrics[i%len(metrics)]) + } +} + +func BenchmarkSerializeBatch(b *testing.B) { + var s WavefrontSerializer + m := benchmarkMetrics(b) + metrics := m[:] + b.ResetTimer() + for i := 0; i < b.N; i++ { + s.SerializeBatch(metrics) + } +}