From 8355f941f9ee16b1667e0ea6268911e0925fa1b8 Mon Sep 17 00:00:00 2001 From: Pierre Tessier Date: Fri, 29 Sep 2017 19:13:08 -0400 Subject: [PATCH] Add Wavefront output plugin (#3160) --- plugins/outputs/all/all.go | 1 + plugins/outputs/wavefront/README.md | 81 ++++++ plugins/outputs/wavefront/wavefront.go | 288 ++++++++++++++++++ plugins/outputs/wavefront/wavefront_test.go | 307 ++++++++++++++++++++ 4 files changed, 677 insertions(+) create mode 100644 plugins/outputs/wavefront/README.md create mode 100644 plugins/outputs/wavefront/wavefront.go create mode 100644 plugins/outputs/wavefront/wavefront_test.go diff --git a/plugins/outputs/all/all.go b/plugins/outputs/all/all.go index 089a56909..53d1cb8ca 100644 --- a/plugins/outputs/all/all.go +++ b/plugins/outputs/all/all.go @@ -23,4 +23,5 @@ import ( _ "github.com/influxdata/telegraf/plugins/outputs/riemann" _ "github.com/influxdata/telegraf/plugins/outputs/riemann_legacy" _ "github.com/influxdata/telegraf/plugins/outputs/socket_writer" + _ "github.com/influxdata/telegraf/plugins/outputs/wavefront" ) diff --git a/plugins/outputs/wavefront/README.md b/plugins/outputs/wavefront/README.md new file mode 100644 index 000000000..6b673b12a --- /dev/null +++ b/plugins/outputs/wavefront/README.md @@ -0,0 +1,81 @@ +# Wavefront Output Plugin + +This plugin writes to a [Wavefront](https://www.wavefront.com) proxy, in Wavefront data format over TCP. + + +### Configuration: + +```toml +# Configuration for Wavefront output +[[outputs.wavefront]] + ## DNS name of the wavefront proxy server + host = "wavefront.example.com" + + ## Port that the Wavefront proxy server listens on + port = 2878 + + ## prefix for metrics keys + #prefix = "my.specific.prefix." + + ## wether to use "value" for name of simple fields. default is false + #simple_fields = false + + ## character to use between metric and field name. default is . (dot) + #metric_separator = "." + + ## Convert metric name paths to use metricSeperator character + ## When true will convert all _ (underscore) chartacters in final metric name. default is true + #convert_paths = true + + ## Use Regex to sanitize metric and tag names from invalid characters + ## Regex is more thorough, but significantly slower. default is false + #use_regex = false + + ## point tags to use as the source name for Wavefront (if none found, host will be used) + #source_override = ["hostname", "snmp_host", "node_host"] + + ## whether to convert boolean values to numeric values, with false -> 0.0 and true -> 1.0. default is true + #convert_bool = true + + ## Define a mapping, namespaced by metric prefix, from string values to numeric values + ## The example below maps "green" -> 1.0, "yellow" -> 0.5, "red" -> 0.0 for + ## any metrics beginning with "elasticsearch" + #[[outputs.wavefront.string_to_number.elasticsearch]] + # green = 1.0 + # yellow = 0.5 + # red = 0.0 +``` + + +### Convert Path & Metric Separator +If the `convert_path` option is true any `_` in metric and field names will be converted to the `metric_separator` value. +By default, to ease metrics browsing in the Wavefront UI, the `convert_path` option is true, and `metric_separator` is `.` (dot). +Default integrations within Wavefront expect these values to be set to their defaults, however if converting from another platform +it may be desirable to change these defaults. + + +### Use Regex +Most illegal characters in the metric name are automatically converted to `-`. +The `use_regex` setting can be used to ensure all illegal characters are properly handled, but can lead to performance degradation. + + +### Source Override +Often when collecting metrics from another system, you want to use the target system as the source, not the one running Telegraf. +Many Telegraf plugins will identify the target source with a tag. The tag name can vary for different plugins. The `source_override` +option will use the value specified in any of the listed tags if found. The tag names are checked in the same order as listed, +and if found, the other tags will not be checked. If no tags specified are found, the default host tag will be used to identify the +source of the metric. + + +### Wavefront Data format +The expected input for Wavefront is specified in the following way: +``` + [] = [tagk1=tagv1 ...tagkN=tagvN] +``` +More information about the Wavefront data format is available [here](https://community.wavefront.com/docs/DOC-1031) + + +### Allowed values for metrics +Wavefront allows `integers` and `floats` as input values. It will ignore most `strings`, but when configured +will map certain `strings` to numeric values. By default it also maps `bool` values to numeric, false -> 0.0, +true -> 1.0 \ No newline at end of file diff --git a/plugins/outputs/wavefront/wavefront.go b/plugins/outputs/wavefront/wavefront.go new file mode 100644 index 000000000..773e970bc --- /dev/null +++ b/plugins/outputs/wavefront/wavefront.go @@ -0,0 +1,288 @@ +package wavefront + +import ( + "bytes" + "fmt" + "log" + "net" + "regexp" + "strconv" + "strings" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/outputs" +) + +type Wavefront struct { + Prefix string + Host string + Port int + SimpleFields bool + MetricSeparator string + ConvertPaths bool + ConvertBool bool + UseRegex bool + SourceOverride []string + StringToNumber map[string][]map[string]float64 +} + +// catch many of the invalid chars that could appear in a metric or tag name +var sanitizedChars = strings.NewReplacer( + "!", "-", "@", "-", "#", "-", "$", "-", "%", "-", "^", "-", "&", "-", + "*", "-", "(", "-", ")", "-", "+", "-", "`", "-", "'", "-", "\"", "-", + "[", "-", "]", "-", "{", "-", "}", "-", ":", "-", ";", "-", "<", "-", + ">", "-", ",", "-", "?", "-", "/", "-", "\\", "-", "|", "-", " ", "-", + "=", "-", +) + +// instead of Replacer which may miss some special characters we can use a regex pattern, but this is significantly slower than Replacer +var sanitizedRegex = regexp.MustCompile("[^a-zA-Z\\d_.-]") + +var tagValueReplacer = strings.NewReplacer("\"", "\\\"", "*", "-") + +var pathReplacer = strings.NewReplacer("_", "_") + +var sampleConfig = ` + ## DNS name of the wavefront proxy server + host = "wavefront.example.com" + + ## Port that the Wavefront proxy server listens on + port = 2878 + + ## prefix for metrics keys + #prefix = "my.specific.prefix." + + ## whether to use "value" for name of simple fields + #simple_fields = false + + ## character to use between metric and field name. defaults to . (dot) + #metric_separator = "." + + ## Convert metric name paths to use metricSeperator character + ## When true (default) will convert all _ (underscore) chartacters in final metric name + #convert_paths = true + + ## Use Regex to sanitize metric and tag names from invalid characters + ## Regex is more thorough, but significantly slower + #use_regex = false + + ## point tags to use as the source name for Wavefront (if none found, host will be used) + #source_override = ["hostname", "snmp_host", "node_host"] + + ## whether to convert boolean values to numeric values, with false -> 0.0 and true -> 1.0. default true + #convert_bool = true + + ## Define a mapping, namespaced by metric prefix, from string values to numeric values + ## The example below maps "green" -> 1.0, "yellow" -> 0.5, "red" -> 0.0 for + ## any metrics beginning with "elasticsearch" + #[[outputs.wavefront.string_to_number.elasticsearch]] + # green = 1.0 + # yellow = 0.5 + # red = 0.0 +` + +type MetricPoint struct { + Metric string + Value float64 + Timestamp int64 + Source string + Tags map[string]string +} + +func (w *Wavefront) Connect() error { + if w.ConvertPaths && w.MetricSeparator == "_" { + w.ConvertPaths = false + } + if w.ConvertPaths { + pathReplacer = strings.NewReplacer("_", w.MetricSeparator) + } + + // Test Connection to Wavefront proxy Server + uri := fmt.Sprintf("%s:%d", w.Host, w.Port) + _, err := net.ResolveTCPAddr("tcp", uri) + if err != nil { + return fmt.Errorf("Wavefront: TCP address cannot be resolved %s", err.Error()) + } + connection, err := net.Dial("tcp", uri) + if err != nil { + return fmt.Errorf("Wavefront: TCP connect fail %s", err.Error()) + } + defer connection.Close() + return nil +} + +func (w *Wavefront) Write(metrics []telegraf.Metric) error { + + // Send Data to Wavefront proxy Server + uri := fmt.Sprintf("%s:%d", w.Host, w.Port) + connection, err := net.Dial("tcp", uri) + if err != nil { + return fmt.Errorf("Wavefront: TCP connect fail %s", err.Error()) + } + defer connection.Close() + + for _, m := range metrics { + for _, metricPoint := range buildMetrics(m, w) { + metricLine := formatMetricPoint(metricPoint, w) + //log.Printf("D! Output [wavefront] %s", metricLine) + _, err := connection.Write([]byte(metricLine)) + if err != nil { + return fmt.Errorf("Wavefront: TCP writing error %s", err.Error()) + } + } + } + + return nil +} + +func buildMetrics(m telegraf.Metric, w *Wavefront) []*MetricPoint { + ret := []*MetricPoint{} + + for fieldName, value := range m.Fields() { + var name string + if !w.SimpleFields && fieldName == "value" { + name = fmt.Sprintf("%s%s", w.Prefix, m.Name()) + } else { + name = fmt.Sprintf("%s%s%s%s", w.Prefix, m.Name(), w.MetricSeparator, fieldName) + } + + if w.UseRegex { + name = sanitizedRegex.ReplaceAllLiteralString(name, "-") + } else { + name = sanitizedChars.Replace(name) + } + + if w.ConvertPaths { + name = pathReplacer.Replace(name) + } + + metric := &MetricPoint{ + Metric: name, + Timestamp: m.UnixNano() / 1000000000, + } + + metricValue, buildError := buildValue(value, metric.Metric, w) + if buildError != nil { + log.Printf("D! Output [wavefront] %s\n", buildError.Error()) + continue + } + metric.Value = metricValue + + source, tags := buildTags(m.Tags(), w) + metric.Source = source + metric.Tags = tags + + ret = append(ret, metric) + } + return ret +} + +func buildTags(mTags map[string]string, w *Wavefront) (string, map[string]string) { + var source string + sourceTagFound := false + + for _, s := range w.SourceOverride { + for k, v := range mTags { + if k == s { + source = v + mTags["telegraf_host"] = mTags["host"] + sourceTagFound = true + delete(mTags, k) + break + } + } + if sourceTagFound { + break + } + } + + if !sourceTagFound { + source = mTags["host"] + } + delete(mTags, "host") + + return tagValueReplacer.Replace(source), mTags +} + +func buildValue(v interface{}, name string, w *Wavefront) (float64, error) { + switch p := v.(type) { + case bool: + if w.ConvertBool { + if p { + return 1, nil + } else { + return 0, nil + } + } + case int64: + return float64(v.(int64)), nil + case uint64: + return float64(v.(uint64)), nil + case float64: + return v.(float64), nil + case string: + for prefix, mappings := range w.StringToNumber { + if strings.HasPrefix(name, prefix) { + for _, mapping := range mappings { + val, hasVal := mapping[string(p)] + if hasVal { + return val, nil + } + } + } + } + return 0, fmt.Errorf("unexpected type: %T, with value: %v, for: %s", v, v, name) + default: + return 0, fmt.Errorf("unexpected type: %T, with value: %v, for: %s", v, v, name) + } + + return 0, fmt.Errorf("unexpected type: %T, with value: %v, for: %s", v, v, name) +} + +func formatMetricPoint(metricPoint *MetricPoint, w *Wavefront) string { + buffer := bytes.NewBufferString("") + buffer.WriteString(metricPoint.Metric) + buffer.WriteString(" ") + buffer.WriteString(strconv.FormatFloat(metricPoint.Value, 'f', 6, 64)) + buffer.WriteString(" ") + buffer.WriteString(strconv.FormatInt(metricPoint.Timestamp, 10)) + buffer.WriteString(" source=\"") + buffer.WriteString(metricPoint.Source) + buffer.WriteString("\"") + + for k, v := range metricPoint.Tags { + buffer.WriteString(" ") + if w.UseRegex { + buffer.WriteString(sanitizedRegex.ReplaceAllLiteralString(k, "-")) + } else { + buffer.WriteString(sanitizedChars.Replace(k)) + } + buffer.WriteString("=\"") + buffer.WriteString(tagValueReplacer.Replace(v)) + buffer.WriteString("\"") + } + + return buffer.String() +} + +func (w *Wavefront) SampleConfig() string { + return sampleConfig +} + +func (w *Wavefront) Description() string { + return "Configuration for Wavefront server to send metrics to" +} + +func (w *Wavefront) Close() error { + return nil +} + +func init() { + outputs.Add("wavefront", func() telegraf.Output { + return &Wavefront{ + MetricSeparator: ".", + ConvertPaths: true, + ConvertBool: true, + } + }) +} diff --git a/plugins/outputs/wavefront/wavefront_test.go b/plugins/outputs/wavefront/wavefront_test.go new file mode 100644 index 000000000..1dd4d7078 --- /dev/null +++ b/plugins/outputs/wavefront/wavefront_test.go @@ -0,0 +1,307 @@ +package wavefront + +import ( + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/testutil" + "reflect" + "strings" + "testing" + "time" +) + +// default config used by Tests +func defaultWavefront() *Wavefront { + return &Wavefront{ + Host: "localhost", + Port: 2878, + Prefix: "testWF.", + SimpleFields: false, + MetricSeparator: ".", + ConvertPaths: true, + ConvertBool: true, + UseRegex: false, + } +} + +func TestBuildMetrics(t *testing.T) { + w := defaultWavefront() + w.Prefix = "testthis." + + pathReplacer = strings.NewReplacer("_", w.MetricSeparator) + + testMetric1, _ := metric.New( + "test.simple.metric", + map[string]string{"tag1": "value1", "host": "testHost"}, + map[string]interface{}{"value": 123}, + time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + + var timestamp int64 = 1257894000 + + var metricTests = []struct { + metric telegraf.Metric + metricPoints []MetricPoint + }{ + { + testutil.TestMetric(float64(1), "testing_just*a%metric:float", "metric2"), + []MetricPoint{ + {Metric: w.Prefix + "testing.just-a-metric-float", Value: 1, Timestamp: timestamp, Tags: map[string]string{"tag1": "value1"}}, + {Metric: w.Prefix + "testing.metric2", Value: 1, Timestamp: timestamp, Tags: map[string]string{"tag1": "value1"}}, + }, + }, + { + testMetric1, + []MetricPoint{{Metric: w.Prefix + "test.simple.metric", Value: 123, Timestamp: timestamp, Source: "testHost", Tags: map[string]string{"tag1": "value1"}}}, + }, + } + + for _, mt := range metricTests { + ml := buildMetrics(mt.metric, w) + for i, line := range ml { + if mt.metricPoints[i].Metric != line.Metric || mt.metricPoints[i].Value != line.Value { + t.Errorf("\nexpected\t%+v %+v\nreceived\t%+v %+v\n", mt.metricPoints[i].Metric, mt.metricPoints[i].Value, line.Metric, line.Value) + } + } + } + +} + +func TestBuildMetricsWithSimpleFields(t *testing.T) { + w := defaultWavefront() + w.Prefix = "testthis." + w.SimpleFields = true + + pathReplacer = strings.NewReplacer("_", w.MetricSeparator) + + testMetric1, _ := metric.New( + "test.simple.metric", + map[string]string{"tag1": "value1"}, + map[string]interface{}{"value": 123}, + time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + + var metricTests = []struct { + metric telegraf.Metric + metricLines []MetricPoint + }{ + { + testutil.TestMetric(float64(1), "testing_just*a%metric:float"), + []MetricPoint{{Metric: w.Prefix + "testing.just-a-metric-float.value", Value: 1}}, + }, + { + testMetric1, + []MetricPoint{{Metric: w.Prefix + "test.simple.metric.value", Value: 123}}, + }, + } + + for _, mt := range metricTests { + ml := buildMetrics(mt.metric, w) + for i, line := range ml { + if mt.metricLines[i].Metric != line.Metric || mt.metricLines[i].Value != line.Value { + t.Errorf("\nexpected\t%+v %+v\nreceived\t%+v %+v\n", mt.metricLines[i].Metric, mt.metricLines[i].Value, line.Metric, line.Value) + } + } + } + +} + +func TestBuildTags(t *testing.T) { + + w := defaultWavefront() + + var tagtests = []struct { + ptIn map[string]string + outSource string + outTags map[string]string + }{ + { + map[string]string{}, + "", + map[string]string{}, + }, + { + map[string]string{"one": "two", "three": "four", "host": "testHost"}, + "testHost", + map[string]string{"one": "two", "three": "four"}, + }, + { + map[string]string{"aaa": "bbb", "host": "testHost"}, + "testHost", + map[string]string{"aaa": "bbb"}, + }, + { + map[string]string{"bbb": "789", "aaa": "123", "host": "testHost"}, + "testHost", + map[string]string{"aaa": "123", "bbb": "789"}, + }, + { + map[string]string{"host": "aaa", "dc": "bbb"}, + "aaa", + map[string]string{"dc": "bbb"}, + }, + } + + for _, tt := range tagtests { + source, tags := buildTags(tt.ptIn, w) + if source != tt.outSource { + t.Errorf("\nexpected\t%+v\nreceived\t%+v\n", tt.outSource, source) + } + if !reflect.DeepEqual(tags, tt.outTags) { + t.Errorf("\nexpected\t%+v\nreceived\t%+v\n", tt.outTags, tags) + } + } +} + +func TestBuildTagsWithSource(t *testing.T) { + w := defaultWavefront() + w.SourceOverride = []string{"snmp_host", "hostagent"} + + var tagtests = []struct { + ptIn map[string]string + outSource string + outTags map[string]string + }{ + { + map[string]string{"host": "realHost"}, + "realHost", + map[string]string{}, + }, + { + map[string]string{"tag1": "value1", "host": "realHost"}, + "realHost", + map[string]string{"tag1": "value1"}, + }, + { + map[string]string{"snmp_host": "realHost", "host": "origHost"}, + "realHost", + map[string]string{"telegraf_host": "origHost"}, + }, + { + map[string]string{"hostagent": "realHost", "host": "origHost"}, + "realHost", + map[string]string{"telegraf_host": "origHost"}, + }, + { + map[string]string{"hostagent": "abc", "snmp_host": "realHost", "host": "origHost"}, + "realHost", + map[string]string{"hostagent": "abc", "telegraf_host": "origHost"}, + }, + { + map[string]string{"something": "abc", "host": "r*@l\"Ho/st"}, + "r-@l\\\"Ho/st", + map[string]string{"something": "abc"}, + }, + } + + for _, tt := range tagtests { + source, tags := buildTags(tt.ptIn, w) + if source != tt.outSource { + t.Errorf("\nexpected\t%+v\nreceived\t%+v\n", tt.outSource, source) + } + if !reflect.DeepEqual(tags, tt.outTags) { + t.Errorf("\nexpected\t%+v\nreceived\t%+v\n", tt.outTags, tags) + } + } +} + +func TestBuildValue(t *testing.T) { + w := defaultWavefront() + + var valuetests = []struct { + value interface{} + name string + out float64 + isErr bool + }{ + {value: int64(123), out: 123}, + {value: uint64(456), out: 456}, + {value: float64(789), out: 789}, + {value: true, out: 1}, + {value: false, out: 0}, + {value: "bad", out: 0, isErr: true}, + } + + for _, vt := range valuetests { + value, err := buildValue(vt.value, vt.name, w) + if vt.isErr && err == nil { + t.Errorf("\nexpected error with\t%+v\nreceived\t%+v\n", vt.out, value) + } else if value != vt.out { + t.Errorf("\nexpected\t%+v\nreceived\t%+v\n", vt.out, value) + } + } + +} + +func TestBuildValueString(t *testing.T) { + w := defaultWavefront() + w.StringToNumber = map[string][]map[string]float64{ + "test1": {{"green": 1, "red": 10}}, + "test2": {{"active": 1, "hidden": 2}}, + } + + var valuetests = []struct { + value interface{} + name string + out float64 + isErr bool + }{ + {value: int64(123), name: "", out: 123}, + {value: "green", name: "test1", out: 1}, + {value: "red", name: "test1", out: 10}, + {value: "hidden", name: "test2", out: 2}, + {value: "bad", name: "test1", out: 0, isErr: true}, + } + + for _, vt := range valuetests { + value, err := buildValue(vt.value, vt.name, w) + if vt.isErr && err == nil { + t.Errorf("\nexpected error with\t%+v\nreceived\t%+v\n", vt.out, value) + } else if value != vt.out { + t.Errorf("\nexpected\t%+v\nreceived\t%+v\n", vt.out, value) + } + } + +} + +func TestFormatMetricPoint(t *testing.T) { + w := defaultWavefront() + + testpoint := &MetricPoint{ + Metric: "test.metric.something", + Value: 123.456, + Timestamp: 1257894000, + Source: "testSource", + Tags: map[string]string{"sp*c!@l\"-ch/rs": "sp*c!@l/ val\"ue"}, + } + + expected := "test.metric.something 123.456000 1257894000 source=\"testSource\" sp-c--l--ch-rs=\"sp-c!@l/ val\\\"ue\"" + + received := formatMetricPoint(testpoint, w) + + if expected != received { + t.Errorf("\nexpected\t%+v\nreceived\t%+v\n", expected, received) + + } +} + +// Benchmarks to test performance of string replacement via Regex and Replacer +var testString = "this_is*my!test/string\\for=replacement" + +func BenchmarkReplaceAllString(b *testing.B) { + for n := 0; n < b.N; n++ { + sanitizedRegex.ReplaceAllString(testString, "-") + } +} + +func BenchmarkReplaceAllLiteralString(b *testing.B) { + for n := 0; n < b.N; n++ { + sanitizedRegex.ReplaceAllLiteralString(testString, "-") + } +} + +func BenchmarkReplacer(b *testing.B) { + for n := 0; n < b.N; n++ { + sanitizedChars.Replace(testString) + } +}