203 lines
4.9 KiB
Go
203 lines
4.9 KiB
Go
|
package wavefront
|
||
|
|
||
|
import (
|
||
|
"bytes"
|
||
|
"fmt"
|
||
|
"log"
|
||
|
"strconv"
|
||
|
"strings"
|
||
|
|
||
|
"github.com/influxdata/telegraf"
|
||
|
"github.com/influxdata/telegraf/plugins/outputs/wavefront"
|
||
|
)
|
||
|
|
||
|
// WavefrontSerializer turns telegraf metrics into Wavefront data lines.
type WavefrontSerializer struct {
	// Prefix is prepended verbatim to every metric name before sanitizing.
	Prefix string

	// UseStrict selects strictSanitizedChars instead of sanitizedChars when
	// cleaning metric names and tag keys.
	UseStrict bool

	// SourceOverride lists tag keys, checked in order, whose value is used as
	// the point source when the metric carries no "source" tag.
	SourceOverride []string
}
|
||
|
|
||
|
// sanitizedChars is the default (non-strict) sanitizer for metric names and
// tag keys: every character listed below is rewritten to "-".
var sanitizedChars = strings.NewReplacer(
	"!", "-", "@", "-", "#", "-", "$", "-",
	"%", "-", "^", "-", "&", "-", "*", "-",
	"(", "-", ")", "-", "+", "-", "`", "-",
	"'", "-", "\"", "-", "[", "-", "]", "-",
	"{", "-", "}", "-", ":", "-", ";", "-",
	"<", "-", ">", "-", ",", "-", "?", "-",
	"/", "-", "\\", "-", "|", "-", " ", "-",
	"=", "-",
)
|
||
|
|
||
|
// strictSanitizedChars is the sanitizer used when UseStrict is set. It
// rewrites the same characters to "-" as sanitizedChars, except that "," and
// "/" are left untouched.
var strictSanitizedChars = strings.NewReplacer(
	"!", "-", "@", "-", "#", "-", "$", "-",
	"%", "-", "^", "-", "&", "-", "*", "-",
	"(", "-", ")", "-", "+", "-", "`", "-",
	"'", "-", "\"", "-", "[", "-", "]", "-",
	"{", "-", "}", "-", ":", "-", ";", "-",
	"<", "-", ">", "-", "?", "-", "\\", "-",
	"|", "-", " ", "-", "=", "-",
)
|
||
|
|
||
|
// tagValueReplacer prepares a tag value (or source) for placement between
// double quotes: embedded double quotes are backslash-escaped and "*" becomes
// "-".
var tagValueReplacer = strings.NewReplacer(
	"\"", "\\\"",
	"*", "-",
)
|
||
|
|
||
|
// pathReplacer turns every "_" in a metric name into ".".
var pathReplacer = strings.NewReplacer(
	"_", ".",
)
|
||
|
|
||
|
func NewSerializer(prefix string, useStrict bool, sourceOverride []string) (*WavefrontSerializer, error) {
|
||
|
s := &WavefrontSerializer{
|
||
|
Prefix: prefix,
|
||
|
UseStrict: useStrict,
|
||
|
SourceOverride: sourceOverride,
|
||
|
}
|
||
|
return s, nil
|
||
|
}
|
||
|
|
||
|
// Serialize : Serialize based on Wavefront format
|
||
|
func (s *WavefrontSerializer) Serialize(m telegraf.Metric) ([]byte, error) {
|
||
|
out := []byte{}
|
||
|
metricSeparator := "."
|
||
|
|
||
|
for fieldName, value := range m.Fields() {
|
||
|
var name string
|
||
|
|
||
|
if fieldName == "value" {
|
||
|
name = fmt.Sprintf("%s%s", s.Prefix, m.Name())
|
||
|
} else {
|
||
|
name = fmt.Sprintf("%s%s%s%s", s.Prefix, m.Name(), metricSeparator, fieldName)
|
||
|
}
|
||
|
|
||
|
if s.UseStrict {
|
||
|
name = strictSanitizedChars.Replace(name)
|
||
|
} else {
|
||
|
name = sanitizedChars.Replace(name)
|
||
|
}
|
||
|
|
||
|
name = pathReplacer.Replace(name)
|
||
|
|
||
|
metric := &wavefront.MetricPoint{
|
||
|
Metric: name,
|
||
|
Timestamp: m.Time().Unix(),
|
||
|
}
|
||
|
|
||
|
metricValue, buildError := buildValue(value, metric.Metric)
|
||
|
if buildError != nil {
|
||
|
// bad value continue to next metric
|
||
|
continue
|
||
|
}
|
||
|
metric.Value = metricValue
|
||
|
|
||
|
source, tags := buildTags(m.Tags(), s)
|
||
|
metric.Source = source
|
||
|
metric.Tags = tags
|
||
|
|
||
|
out = append(out, formatMetricPoint(metric, s)...)
|
||
|
}
|
||
|
return out, nil
|
||
|
}
|
||
|
|
||
|
func (s *WavefrontSerializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
|
||
|
var batch bytes.Buffer
|
||
|
for _, m := range metrics {
|
||
|
buf, err := s.Serialize(m)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
_, err = batch.Write(buf)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
}
|
||
|
return batch.Bytes(), nil
|
||
|
}
|
||
|
|
||
|
func buildTags(mTags map[string]string, s *WavefrontSerializer) (string, map[string]string) {
|
||
|
|
||
|
// Remove all empty tags.
|
||
|
for k, v := range mTags {
|
||
|
if v == "" {
|
||
|
delete(mTags, k)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
var source string
|
||
|
|
||
|
if src, ok := mTags["source"]; ok {
|
||
|
source = src
|
||
|
delete(mTags, "source")
|
||
|
} else {
|
||
|
sourceTagFound := false
|
||
|
for _, src := range s.SourceOverride {
|
||
|
for k, v := range mTags {
|
||
|
if k == src {
|
||
|
source = v
|
||
|
mTags["telegraf_host"] = mTags["host"]
|
||
|
sourceTagFound = true
|
||
|
delete(mTags, k)
|
||
|
break
|
||
|
}
|
||
|
}
|
||
|
if sourceTagFound {
|
||
|
break
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if !sourceTagFound {
|
||
|
source = mTags["host"]
|
||
|
}
|
||
|
}
|
||
|
|
||
|
delete(mTags, "host")
|
||
|
|
||
|
return tagValueReplacer.Replace(source), mTags
|
||
|
}
|
||
|
|
||
|
// buildValue converts a field value into the float64 written to Wavefront.
//
// bool maps to 1 (true) or 0 (false); int64, uint64 and float64 are converted
// directly. A string yields an error without logging. Any other type yields
// an error and is also reported through a "D!" log line that includes name,
// the metric name the value belongs to.
func buildValue(v interface{}, name string) (float64, error) {
	switch p := v.(type) {
	case bool:
		if p {
			return 1, nil
		}
		return 0, nil
	case int64:
		return float64(p), nil
	case uint64:
		return float64(p), nil
	case float64:
		return p, nil
	case string:
		// return an error but don't log
		return 0, fmt.Errorf("string type not supported")
	default:
		// return an error and log a debug message
		err := fmt.Errorf("unexpected type: %T, with value: %v, for :%s", v, v, name)
		log.Printf("D! Serializer [wavefront] %s\n", err.Error())
		return 0, err
	}
}
|
||
|
|
||
|
func formatMetricPoint(metricPoint *wavefront.MetricPoint, s *WavefrontSerializer) []byte {
|
||
|
var buffer bytes.Buffer
|
||
|
buffer.WriteString("\"")
|
||
|
buffer.WriteString(metricPoint.Metric)
|
||
|
buffer.WriteString("\" ")
|
||
|
buffer.WriteString(strconv.FormatFloat(metricPoint.Value, 'f', 6, 64))
|
||
|
buffer.WriteString(" ")
|
||
|
buffer.WriteString(strconv.FormatInt(metricPoint.Timestamp, 10))
|
||
|
buffer.WriteString(" source=\"")
|
||
|
buffer.WriteString(metricPoint.Source)
|
||
|
buffer.WriteString("\"")
|
||
|
|
||
|
for k, v := range metricPoint.Tags {
|
||
|
buffer.WriteString(" \"")
|
||
|
if s.UseStrict {
|
||
|
buffer.WriteString(strictSanitizedChars.Replace(k))
|
||
|
} else {
|
||
|
buffer.WriteString(sanitizedChars.Replace(k))
|
||
|
}
|
||
|
buffer.WriteString("\"=\"")
|
||
|
buffer.WriteString(tagValueReplacer.Replace(v))
|
||
|
buffer.WriteString("\"")
|
||
|
}
|
||
|
|
||
|
buffer.WriteString("\n")
|
||
|
|
||
|
return buffer.Bytes()
|
||
|
}
|