From 267a9f182bc1a4cf95dd5e8e3873514e7776a4c6 Mon Sep 17 00:00:00 2001 From: Pierre Tessier Date: Fri, 5 Apr 2019 17:46:12 -0400 Subject: [PATCH] Add wavefront serializer plugin (#5670) --- docs/DATA_FORMATS_OUTPUT.md | 1 + internal/config/config.go | 26 ++ plugins/serializers/registry.go | 14 + plugins/serializers/wavefront/README.md | 47 +++ plugins/serializers/wavefront/wavefront.go | 202 ++++++++++++ .../serializers/wavefront/wavefront_test.go | 295 ++++++++++++++++++ 6 files changed, 585 insertions(+) create mode 100644 plugins/serializers/wavefront/README.md create mode 100755 plugins/serializers/wavefront/wavefront.go create mode 100755 plugins/serializers/wavefront/wavefront_test.go diff --git a/docs/DATA_FORMATS_OUTPUT.md b/docs/DATA_FORMATS_OUTPUT.md index 3ee16524d..f3ac028b9 100644 --- a/docs/DATA_FORMATS_OUTPUT.md +++ b/docs/DATA_FORMATS_OUTPUT.md @@ -9,6 +9,7 @@ plugins. 1. [Graphite](/plugins/serializers/graphite) 1. [SplunkMetric](/plugins/serializers/splunkmetric) 1. [Carbon2](/plugins/serializers/carbon2) +1. 
[Wavefront](/plugins/serializers/wavefront) You will be able to identify the plugins with support by the presence of a `data_format` config option, for example, in the `file` output plugin: diff --git a/internal/config/config.go b/internal/config/config.go index 939cb4c75..a0fc45a3c 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -1810,6 +1810,30 @@ func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error } } + if node, ok := tbl.Fields["wavefront_source_override"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if ary, ok := kv.Value.(*ast.Array); ok { + for _, elem := range ary.Value { + if str, ok := elem.(*ast.String); ok { + c.WavefrontSourceOverride = append(c.WavefrontSourceOverride, str.Value) + } + } + } + } + } + + if node, ok := tbl.Fields["wavefront_use_strict"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if b, ok := kv.Value.(*ast.Boolean); ok { + var err error + c.WavefrontUseStrict, err = b.Boolean() + if err != nil { + return nil, err + } + } + } + } + delete(tbl.Fields, "influx_max_line_bytes") delete(tbl.Fields, "influx_sort_fields") delete(tbl.Fields, "influx_uint_support") @@ -1819,6 +1843,8 @@ func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error delete(tbl.Fields, "template") delete(tbl.Fields, "json_timestamp_units") delete(tbl.Fields, "splunkmetric_hec_routing") + delete(tbl.Fields, "wavefront_source_override") + delete(tbl.Fields, "wavefront_use_strict") return serializers.NewSerializer(c) } diff --git a/plugins/serializers/registry.go b/plugins/serializers/registry.go index cbc5981a6..ecac63323 100644 --- a/plugins/serializers/registry.go +++ b/plugins/serializers/registry.go @@ -12,6 +12,7 @@ import ( "github.com/influxdata/telegraf/plugins/serializers/json" "github.com/influxdata/telegraf/plugins/serializers/nowmetric" "github.com/influxdata/telegraf/plugins/serializers/splunkmetric" + "github.com/influxdata/telegraf/plugins/serializers/wavefront" ) // 
SerializerOutput is an interface for output plugins that are able to @@ -66,6 +67,13 @@ type Config struct { // Include HEC routing fields for splunkmetric output HecRouting bool + + // Point tags to use as the source name for Wavefront (if none found, host will be used). + WavefrontSourceOverride []string + + // Use Strict rules to sanitize metric and tag names from invalid characters for Wavefront + // When enabled forward slash (/) and comma (,) will be accepted + WavefrontUseStrict bool } // NewSerializer a Serializer interface based on the given config. @@ -85,12 +93,18 @@ func NewSerializer(config *Config) (Serializer, error) { serializer, err = NewNowSerializer() case "carbon2": serializer, err = NewCarbon2Serializer() + case "wavefront": + serializer, err = NewWavefrontSerializer(config.Prefix, config.WavefrontUseStrict, config.WavefrontSourceOverride) default: err = fmt.Errorf("Invalid data format: %s", config.DataFormat) } return serializer, err } +func NewWavefrontSerializer(prefix string, useStrict bool, sourceOverride []string) (Serializer, error) { + return wavefront.NewSerializer(prefix, useStrict, sourceOverride) +} + func NewJsonSerializer(timestampUnits time.Duration) (Serializer, error) { return json.NewSerializer(timestampUnits) } diff --git a/plugins/serializers/wavefront/README.md b/plugins/serializers/wavefront/README.md new file mode 100644 index 000000000..2b3be1f78 --- /dev/null +++ b/plugins/serializers/wavefront/README.md @@ -0,0 +1,47 @@ +# Example + +The `wavefront` serializer translates the Telegraf metric format to the [Wavefront Data Format](https://docs.wavefront.com/wavefront_data_format.html). 
+ +### Configuration + +```toml +[[outputs.file]] + files = ["stdout"] + + ## Use Strict rules to sanitize metric and tag names from invalid characters + ## When enabled forward slash (/) and comma (,) will be accepted + # wavefront_use_strict = false + + ## point tags to use as the source name for Wavefront (if none found, host will be used) + # wavefront_source_override = ["hostname", "address", "agent_host", "node_host"] + + ## Data format to output. + ## Each data format has its own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md + data_format = "wavefront" +``` + +### Metrics + +A Wavefront metric is equivalent to a single field value of a Telegraf measurement. +The Wavefront metric name will be: `<measurement_name>.<field_name>` +If a prefix is specified it will be honored. +Only boolean and numeric field values are serialized; fields of any other type +are skipped. + +### Example + +The following Telegraf metric + +``` +cpu,cpu=cpu0,host=testHost user=12,idle=88,system=0 1234567890 +``` + +will serialize into the following Wavefront metrics + +``` +"cpu.user" 12.000000 1234567890 source="testHost" "cpu"="cpu0" +"cpu.idle" 88.000000 1234567890 source="testHost" "cpu"="cpu0" +"cpu.system" 0.000000 1234567890 source="testHost" "cpu"="cpu0" +``` diff --git a/plugins/serializers/wavefront/wavefront.go b/plugins/serializers/wavefront/wavefront.go new file mode 100755 index 000000000..70b87512f --- /dev/null +++ b/plugins/serializers/wavefront/wavefront.go @@ -0,0 +1,202 @@ +package wavefront + +import ( + "bytes" + "fmt" + "log" + "strconv" + "strings" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/outputs/wavefront" +) + +// WavefrontSerializer : WavefrontSerializer struct +type WavefrontSerializer struct { + Prefix string + UseStrict bool + SourceOverride []string +} + +// catch many of the invalid chars that could appear in a metric or tag name +var sanitizedChars = 
strings.NewReplacer( + "!", "-", "@", "-", "#", "-", "$", "-", "%", "-", "^", "-", "&", "-", + "*", "-", "(", "-", ")", "-", "+", "-", "`", "-", "'", "-", "\"", "-", + "[", "-", "]", "-", "{", "-", "}", "-", ":", "-", ";", "-", "<", "-", + ">", "-", ",", "-", "?", "-", "/", "-", "\\", "-", "|", "-", " ", "-", + "=", "-", +) + +// catch many of the invalid chars that could appear in a metric or tag name +var strictSanitizedChars = strings.NewReplacer( + "!", "-", "@", "-", "#", "-", "$", "-", "%", "-", "^", "-", "&", "-", + "*", "-", "(", "-", ")", "-", "+", "-", "`", "-", "'", "-", "\"", "-", + "[", "-", "]", "-", "{", "-", "}", "-", ":", "-", ";", "-", "<", "-", + ">", "-", "?", "-", "\\", "-", "|", "-", " ", "-", "=", "-", +) + +var tagValueReplacer = strings.NewReplacer("\"", "\\\"", "*", "-") + +var pathReplacer = strings.NewReplacer("_", ".") + +func NewSerializer(prefix string, useStrict bool, sourceOverride []string) (*WavefrontSerializer, error) { + s := &WavefrontSerializer{ + Prefix: prefix, + UseStrict: useStrict, + SourceOverride: sourceOverride, + } + return s, nil +} + +// Serialize : Serialize based on Wavefront format +func (s *WavefrontSerializer) Serialize(m telegraf.Metric) ([]byte, error) { + out := []byte{} + metricSeparator := "." 
+ + for fieldName, value := range m.Fields() { + var name string + + if fieldName == "value" { + name = fmt.Sprintf("%s%s", s.Prefix, m.Name()) + } else { + name = fmt.Sprintf("%s%s%s%s", s.Prefix, m.Name(), metricSeparator, fieldName) + } + + if s.UseStrict { + name = strictSanitizedChars.Replace(name) + } else { + name = sanitizedChars.Replace(name) + } + + name = pathReplacer.Replace(name) + + metric := &wavefront.MetricPoint{ + Metric: name, + Timestamp: m.Time().Unix(), + } + + metricValue, buildError := buildValue(value, metric.Metric) + if buildError != nil { + // bad value continue to next metric + continue + } + metric.Value = metricValue + + source, tags := buildTags(m.Tags(), s) + metric.Source = source + metric.Tags = tags + + out = append(out, formatMetricPoint(metric, s)...) + } + return out, nil +} + +func (s *WavefrontSerializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) { + var batch bytes.Buffer + for _, m := range metrics { + buf, err := s.Serialize(m) + if err != nil { + return nil, err + } + _, err = batch.Write(buf) + if err != nil { + return nil, err + } + } + return batch.Bytes(), nil +} + +func buildTags(mTags map[string]string, s *WavefrontSerializer) (string, map[string]string) { + + // Remove all empty tags. 
+ for k, v := range mTags { + if v == "" { + delete(mTags, k) + } + } + + var source string + + if src, ok := mTags["source"]; ok { + source = src + delete(mTags, "source") + } else { + sourceTagFound := false + for _, src := range s.SourceOverride { + for k, v := range mTags { + if k == src { + source = v + mTags["telegraf_host"] = mTags["host"] + sourceTagFound = true + delete(mTags, k) + break + } + } + if sourceTagFound { + break + } + } + + if !sourceTagFound { + source = mTags["host"] + } + } + + delete(mTags, "host") + + return tagValueReplacer.Replace(source), mTags +} + +func buildValue(v interface{}, name string) (float64, error) { + switch p := v.(type) { + case bool: + if p { + return 1, nil + } else { + return 0, nil + } + case int64: + return float64(v.(int64)), nil + case uint64: + return float64(v.(uint64)), nil + case float64: + return v.(float64), nil + case string: + // return an error but don't log + return 0, fmt.Errorf("string type not supported") + default: + // return an error and log a debug message + err := fmt.Errorf("unexpected type: %T, with value: %v, for :%s", v, v, name) + log.Printf("D! 
Serializer [wavefront] %s\n", err.Error()) + return 0, err + } +} + +func formatMetricPoint(metricPoint *wavefront.MetricPoint, s *WavefrontSerializer) []byte { + var buffer bytes.Buffer + buffer.WriteString("\"") + buffer.WriteString(metricPoint.Metric) + buffer.WriteString("\" ") + buffer.WriteString(strconv.FormatFloat(metricPoint.Value, 'f', 6, 64)) + buffer.WriteString(" ") + buffer.WriteString(strconv.FormatInt(metricPoint.Timestamp, 10)) + buffer.WriteString(" source=\"") + buffer.WriteString(metricPoint.Source) + buffer.WriteString("\"") + + for k, v := range metricPoint.Tags { + buffer.WriteString(" \"") + if s.UseStrict { + buffer.WriteString(strictSanitizedChars.Replace(k)) + } else { + buffer.WriteString(sanitizedChars.Replace(k)) + } + buffer.WriteString("\"=\"") + buffer.WriteString(tagValueReplacer.Replace(v)) + buffer.WriteString("\"") + } + + buffer.WriteString("\n") + + return buffer.Bytes() +} diff --git a/plugins/serializers/wavefront/wavefront_test.go b/plugins/serializers/wavefront/wavefront_test.go new file mode 100755 index 000000000..3230ce515 --- /dev/null +++ b/plugins/serializers/wavefront/wavefront_test.go @@ -0,0 +1,295 @@ +package wavefront + +import ( + "fmt" + "reflect" + "strings" + "testing" + "time" + + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/plugins/outputs/wavefront" + "github.com/stretchr/testify/assert" +) + +func TestBuildTags(t *testing.T) { + var tagTests = []struct { + ptIn map[string]string + outTags map[string]string + outSource string + }{ + { + map[string]string{"one": "two", "three": "four", "host": "testHost"}, + map[string]string{"one": "two", "three": "four"}, + "testHost", + }, + { + map[string]string{"aaa": "bbb", "host": "testHost"}, + map[string]string{"aaa": "bbb"}, + "testHost", + }, + { + map[string]string{"bbb": "789", "aaa": "123", "host": "testHost"}, + map[string]string{"aaa": "123", "bbb": "789"}, + "testHost", + }, + { + map[string]string{"host": "aaa", "dc": "bbb"}, 
+ map[string]string{"dc": "bbb"}, + "aaa", + }, + { + map[string]string{"instanceid": "i-0123456789", "host": "aaa", "dc": "bbb"}, + map[string]string{"dc": "bbb", "telegraf_host": "aaa"}, + "i-0123456789", + }, + { + map[string]string{"instance-id": "i-0123456789", "host": "aaa", "dc": "bbb"}, + map[string]string{"dc": "bbb", "telegraf_host": "aaa"}, + "i-0123456789", + }, + { + map[string]string{"instanceid": "i-0123456789", "host": "aaa", "hostname": "ccc", "dc": "bbb"}, + map[string]string{"dc": "bbb", "hostname": "ccc", "telegraf_host": "aaa"}, + "i-0123456789", + }, + { + map[string]string{"instanceid": "i-0123456789", "host": "aaa", "snmp_host": "ccc", "dc": "bbb"}, + map[string]string{"dc": "bbb", "snmp_host": "ccc", "telegraf_host": "aaa"}, + "i-0123456789", + }, + { + map[string]string{"host": "aaa", "snmp_host": "ccc", "dc": "bbb"}, + map[string]string{"dc": "bbb", "telegraf_host": "aaa"}, + "ccc", + }, + } + s := WavefrontSerializer{SourceOverride: []string{"instanceid", "instance-id", "hostname", "snmp_host", "node_host"}} + + for _, tt := range tagTests { + source, tags := buildTags(tt.ptIn, &s) + if !reflect.DeepEqual(tags, tt.outTags) { + t.Errorf("\nexpected\t%+v\nreceived\t%+v\n", tt.outTags, tags) + } + if source != tt.outSource { + t.Errorf("\nexpected\t%s\nreceived\t%s\n", tt.outSource, source) + } + } +} + +func TestBuildTagsHostTag(t *testing.T) { + var tagTests = []struct { + ptIn map[string]string + outTags map[string]string + outSource string + }{ + { + map[string]string{"one": "two", "host": "testHost", "snmp_host": "snmpHost"}, + map[string]string{"telegraf_host": "testHost", "one": "two"}, + "snmpHost", + }, + } + s := WavefrontSerializer{SourceOverride: []string{"snmp_host"}} + + for _, tt := range tagTests { + source, tags := buildTags(tt.ptIn, &s) + if !reflect.DeepEqual(tags, tt.outTags) { + t.Errorf("\nexpected\t%+v\nreceived\t%+v\n", tt.outTags, tags) + } + if source != tt.outSource { + t.Errorf("\nexpected\t%s\nreceived\t%s\n", 
tt.outSource, source) + } + } +} + +func TestFormatMetricPoint(t *testing.T) { + var pointTests = []struct { + ptIn *wavefront.MetricPoint + out string + }{ + { + &wavefront.MetricPoint{ + Metric: "cpu.idle", + Value: 1, + Timestamp: 1554172967, + Source: "testHost", + Tags: map[string]string{"aaa": "bbb"}, + }, + "\"cpu.idle\" 1.000000 1554172967 source=\"testHost\" \"aaa\"=\"bbb\"\n", + }, + { + &wavefront.MetricPoint{ + Metric: "cpu.idle", + Value: 1, + Timestamp: 1554172967, + Source: "testHost", + Tags: map[string]string{"sp&c!al/chars,": "get*replaced"}, + }, + "\"cpu.idle\" 1.000000 1554172967 source=\"testHost\" \"sp-c-al-chars-\"=\"get-replaced\"\n", + }, + } + + s := WavefrontSerializer{} + + for _, pt := range pointTests { + bout := formatMetricPoint(pt.ptIn, &s) + sout := string(bout[:]) + if sout != pt.out { + t.Errorf("\nexpected\t%s\nreceived\t%s\n", pt.out, sout) + } + } +} + +func TestUseStrict(t *testing.T) { + var pointTests = []struct { + ptIn *wavefront.MetricPoint + out string + }{ + { + &wavefront.MetricPoint{ + Metric: "cpu.idle", + Value: 1, + Timestamp: 1554172967, + Source: "testHost", + Tags: map[string]string{"sp&c!al/chars,": "get*replaced"}, + }, + "\"cpu.idle\" 1.000000 1554172967 source=\"testHost\" \"sp-c-al/chars,\"=\"get-replaced\"\n", + }, + } + + s := WavefrontSerializer{UseStrict: true} + + for _, pt := range pointTests { + bout := formatMetricPoint(pt.ptIn, &s) + sout := string(bout[:]) + if sout != pt.out { + t.Errorf("\nexpected\t%s\nreceived\t%s\n", pt.out, sout) + } + } +} + +func TestSerializeMetricFloat(t *testing.T) { + now := time.Now() + tags := map[string]string{ + "cpu": "cpu0", + "host": "realHost", + } + fields := map[string]interface{}{ + "usage_idle": float64(91.5), + } + m, err := metric.New("cpu", tags, fields, now) + assert.NoError(t, err) + + s := WavefrontSerializer{} + buf, _ := s.Serialize(m) + mS := strings.Split(strings.TrimSpace(string(buf)), "\n") + assert.NoError(t, err) + + expS := 
[]string{fmt.Sprintf("\"cpu.usage.idle\" 91.500000 %d source=\"realHost\" \"cpu\"=\"cpu0\"", now.UnixNano()/1000000000)} + assert.Equal(t, expS, mS) +} + +func TestSerializeMetricInt(t *testing.T) { + now := time.Now() + tags := map[string]string{ + "cpu": "cpu0", + "host": "realHost", + } + fields := map[string]interface{}{ + "usage_idle": int64(91), + } + m, err := metric.New("cpu", tags, fields, now) + assert.NoError(t, err) + + s := WavefrontSerializer{} + buf, _ := s.Serialize(m) + mS := strings.Split(strings.TrimSpace(string(buf)), "\n") + assert.NoError(t, err) + + expS := []string{fmt.Sprintf("\"cpu.usage.idle\" 91.000000 %d source=\"realHost\" \"cpu\"=\"cpu0\"", now.UnixNano()/1000000000)} + assert.Equal(t, expS, mS) +} + +func TestSerializeMetricBoolTrue(t *testing.T) { + now := time.Now() + tags := map[string]string{ + "cpu": "cpu0", + "host": "realHost", + } + fields := map[string]interface{}{ + "usage_idle": true, + } + m, err := metric.New("cpu", tags, fields, now) + assert.NoError(t, err) + + s := WavefrontSerializer{} + buf, _ := s.Serialize(m) + mS := strings.Split(strings.TrimSpace(string(buf)), "\n") + assert.NoError(t, err) + + expS := []string{fmt.Sprintf("\"cpu.usage.idle\" 1.000000 %d source=\"realHost\" \"cpu\"=\"cpu0\"", now.UnixNano()/1000000000)} + assert.Equal(t, expS, mS) +} + +func TestSerializeMetricBoolFalse(t *testing.T) { + now := time.Now() + tags := map[string]string{ + "cpu": "cpu0", + "host": "realHost", + } + fields := map[string]interface{}{ + "usage_idle": false, + } + m, err := metric.New("cpu", tags, fields, now) + assert.NoError(t, err) + + s := WavefrontSerializer{} + buf, _ := s.Serialize(m) + mS := strings.Split(strings.TrimSpace(string(buf)), "\n") + assert.NoError(t, err) + + expS := []string{fmt.Sprintf("\"cpu.usage.idle\" 0.000000 %d source=\"realHost\" \"cpu\"=\"cpu0\"", now.UnixNano()/1000000000)} + assert.Equal(t, expS, mS) +} + +func TestSerializeMetricFieldValue(t *testing.T) { + now := time.Now() + tags := 
map[string]string{ + "cpu": "cpu0", + "host": "realHost", + } + fields := map[string]interface{}{ + "value": int64(91), + } + m, err := metric.New("cpu", tags, fields, now) + assert.NoError(t, err) + + s := WavefrontSerializer{} + buf, _ := s.Serialize(m) + mS := strings.Split(strings.TrimSpace(string(buf)), "\n") + assert.NoError(t, err) + + expS := []string{fmt.Sprintf("\"cpu\" 91.000000 %d source=\"realHost\" \"cpu\"=\"cpu0\"", now.UnixNano()/1000000000)} + assert.Equal(t, expS, mS) +} + +func TestSerializeMetricPrefix(t *testing.T) { + now := time.Now() + tags := map[string]string{ + "cpu": "cpu0", + "host": "realHost", + } + fields := map[string]interface{}{ + "usage_idle": int64(91), + } + m, err := metric.New("cpu", tags, fields, now) + assert.NoError(t, err) + + s := WavefrontSerializer{Prefix: "telegraf."} + buf, _ := s.Serialize(m) + mS := strings.Split(strings.TrimSpace(string(buf)), "\n") + assert.NoError(t, err) + + expS := []string{fmt.Sprintf("\"telegraf.cpu.usage.idle\" 91.000000 %d source=\"realHost\" \"cpu\"=\"cpu0\"", now.UnixNano()/1000000000)} + assert.Equal(t, expS, mS) +}