From 9cc06702da192675e7905c83ff5d3f6a19dcc147 Mon Sep 17 00:00:00 2001 From: Pierre Tessier Date: Fri, 21 Dec 2018 14:26:07 -0500 Subject: [PATCH] Use wavefront sdk in wavefront output (#5161) --- Gopkg.lock | 12 ++ Gopkg.toml | 4 + plugins/outputs/wavefront/README.md | 40 +++-- plugins/outputs/wavefront/wavefront.go | 154 ++++++++++---------- plugins/outputs/wavefront/wavefront_test.go | 28 +--- 5 files changed, 118 insertions(+), 120 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 27d81c89f..2d9883f04 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1093,6 +1093,17 @@ revision = "e3a01f9611c32b2362366434bcd671516e78955d" version = "v0.18.0" +[[projects]] + digest = "1:c1855527c165f0224708fbc7d76843b4b20bcb74b328f212f8d0e9c855d4c49d" + name = "github.com/wavefronthq/wavefront-sdk-go" + packages = [ + "internal", + "senders", + ] + pruneopts = "" + revision = "12511c8b82654d412b0334768d94dc080b617fd1" + version = "v0.9.0" + [[projects]] branch = "master" digest = "1:98ed05e9796df287b90c1d96854e3913c8e349dbc546412d3cabb472ecf4b417" @@ -1545,6 +1556,7 @@ "github.com/vmware/govmomi/vim25/mo", "github.com/vmware/govmomi/vim25/soap", "github.com/vmware/govmomi/vim25/types", + "github.com/wavefronthq/wavefront-sdk-go/senders", "github.com/wvanbergen/kafka/consumergroup", "golang.org/x/net/context", "golang.org/x/net/html/charset", diff --git a/Gopkg.toml b/Gopkg.toml index 3b5c1b917..3e430b4c3 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -246,6 +246,10 @@ name = "github.com/vishvananda/netlink" revision = "b2de5d10e38ecce8607e6b438b6d174f389a004e" +[[constraint]] + name = "github.com/wavefronthq/wavefront-sdk-go" + version = "v0.9.0" + [[constraint]] name = "github.com/karrick/godirwalk" version = "1.7.5" diff --git a/plugins/outputs/wavefront/README.md b/plugins/outputs/wavefront/README.md index be8fcd7dc..bc2156b13 100644 --- a/plugins/outputs/wavefront/README.md +++ b/plugins/outputs/wavefront/README.md @@ -6,25 +6,30 @@ This plugin writes to a [Wavefront](https://www.wavefront.com) proxy, in Wavefro ### Configuration: ```toml -# Configuration for Wavefront output -[[outputs.wavefront]] - ## DNS name of the wavefront proxy server - host = "wavefront.example.com" + ## Url for Wavefront Direct Ingestion or using HTTP with Wavefront Proxy + ## If using Wavefront Proxy, also specify port. example: http://proxyserver:2878 + url = "https://metrics.wavefront.com" - ## Port that the Wavefront proxy server listens on - port = 2878 + ## Authentication Token for Wavefront. Only required if using Direct Ingestion + #token = "DUMMY_TOKEN" + + ## DNS name of the wavefront proxy server. Do not use if url is specified + #host = "wavefront.example.com" + + ## Port that the Wavefront proxy server listens on. Do not use if url is specified + #port = 2878 ## prefix for metrics keys #prefix = "my.specific.prefix." - ## wether to use "value" for name of simple fields. default is false + ## whether to use "value" for name of simple fields. default is false #simple_fields = false - ## character to use between metric and field name. default is . (dot) + ## character to use between metric and field name. default is . (dot) #metric_separator = "." - ## Convert metric name paths to use metricSeperator character - ## When true will convert all _ (underscore) chartacters in final metric name. default is true + ## Convert metric name paths to use metricSeparator character + ## When true will convert all _ (underscore) characters in final metric name. default is true #convert_paths = true ## Use Regex to sanitize metric and tag names from invalid characters @@ -32,18 +37,10 @@ This plugin writes to a [Wavefront](https://www.wavefront.com) proxy, in Wavefro #use_regex = false ## point tags to use as the source name for Wavefront (if none found, host will be used) - #source_override = ["hostname", "agent_host", "node_host"] + #source_override = ["hostname", "address", "agent_host", "node_host"] ## whether to convert boolean values to numeric values, with false -> 0.0 and true -> 1.0. default is true #convert_bool = true - - ## Define a mapping, namespaced by metric prefix, from string values to numeric values - ## The example below maps "green" -> 1.0, "yellow" -> 0.5, "red" -> 0.0 for - ## any metrics beginning with "elasticsearch" - #[[outputs.wavefront.string_to_number.elasticsearch]] - # green = 1.0 - # yellow = 0.5 - # red = 0.0 ``` @@ -76,6 +73,5 @@ More information about the Wavefront data format is available [here](https://com ### Allowed values for metrics -Wavefront allows `integers` and `floats` as input values. It will ignore most `strings`, but when configured -will map certain `strings` to numeric values. By default it also maps `bool` values to numeric, false -> 0.0, -true -> 1.0 \ No newline at end of file +Wavefront allows `integers` and `floats` as input values. By default it also maps `bool` values to numeric, false -> 0.0, +true -> 1.0. To map `strings` use the [enum](../../processors/enum) processor plugin. diff --git a/plugins/outputs/wavefront/wavefront.go b/plugins/outputs/wavefront/wavefront.go index ef36d1804..257c5512e 100644 --- a/plugins/outputs/wavefront/wavefront.go +++ b/plugins/outputs/wavefront/wavefront.go @@ -1,24 +1,22 @@ package wavefront import ( - "bytes" "fmt" "log" - "net" "regexp" - "strconv" "strings" - "time" - "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/outputs" + wavefront "github.com/wavefronthq/wavefront-sdk-go/senders" ) type Wavefront struct { - Prefix string + Url string + Token string Host string Port int + Prefix string SimpleFields bool MetricSeparator string ConvertPaths bool @@ -26,6 +24,8 @@ type Wavefront struct { UseRegex bool SourceOverride []string StringToNumber map[string][]map[string]float64 + + sender wavefront.Sender } // catch many of the invalid chars that could appear in a metric or tag name @@ -40,43 +40,49 @@ var sanitizedChars = strings.NewReplacer( // instead of Replacer which may miss some special characters we can use a regex pattern, but this is significantly slower than Replacer var sanitizedRegex = regexp.MustCompile("[^a-zA-Z\\d_.-]") -var tagValueReplacer = strings.NewReplacer("\"", "\\\"", "*", "-") +var tagValueReplacer = strings.NewReplacer("*", "-") var pathReplacer = strings.NewReplacer("_", "_") var sampleConfig = ` - ## DNS name of the wavefront proxy server - host = "wavefront.example.com" + ## Url for Wavefront Direct Ingestion or using HTTP with Wavefront Proxy + ## If using Wavefront Proxy, also specify port. example: http://proxyserver:2878 + url = "https://metrics.wavefront.com" - ## Port that the Wavefront proxy server listens on - port = 2878 + ## Authentication Token for Wavefront. Only required if using Direct Ingestion + #token = "DUMMY_TOKEN" + + ## DNS name of the wavefront proxy server. Do not use if url is specified + #host = "wavefront.example.com" + + ## Port that the Wavefront proxy server listens on. Do not use if url is specified + #port = 2878 ## prefix for metrics keys #prefix = "my.specific.prefix." - ## whether to use "value" for name of simple fields + ## whether to use "value" for name of simple fields. default is false #simple_fields = false - ## character to use between metric and field name. defaults to . (dot) + ## character to use between metric and field name. default is . (dot) #metric_separator = "." - ## Convert metric name paths to use metricSeperator character - ## When true (default) will convert all _ (underscore) chartacters in final metric name + ## Convert metric name paths to use metricSeparator character + ## When true will convert all _ (underscore) characters in final metric name. default is true #convert_paths = true ## Use Regex to sanitize metric and tag names from invalid characters - ## Regex is more thorough, but significantly slower + ## Regex is more thorough, but significantly slower. default is false #use_regex = false ## point tags to use as the source name for Wavefront (if none found, host will be used) - #source_override = ["hostname", "agent_host", "node_host"] + #source_override = ["hostname", "address", "agent_host", "node_host"] - ## whether to convert boolean values to numeric values, with false -> 0.0 and true -> 1.0. default true + ## whether to convert boolean values to numeric values, with false -> 0.0 and true -> 1.0. default is true #convert_bool = true ## Define a mapping, namespaced by metric prefix, from string values to numeric values - ## The example below maps "green" -> 1.0, "yellow" -> 0.5, "red" -> 0.0 for - ## any metrics beginning with "elasticsearch" + ## deprecated in 1.9; use the enum processor plugin #[[outputs.wavefront.string_to_number.elasticsearch]] # green = 1.0 # yellow = 0.5 @@ -92,44 +98,51 @@ type MetricPoint struct { } func (w *Wavefront) Connect() error { + + if len(w.StringToNumber) > 0 { + log.Print("W! [outputs.wavefront] The string_to_number option is deprecated; please use the enum processor instead") + } + + if w.Url != "" { + log.Printf("D! [outputs.wavefront] connecting over http/https using Url: %s", w.Url) + sender, err := wavefront.NewDirectSender(&wavefront.DirectConfiguration{ + Server: w.Url, + Token: w.Token, + FlushIntervalSeconds: 5, + }) + if err != nil { + return fmt.Errorf("Wavefront: Could not create Wavefront Sender for Url: %s", w.Url) + } + w.sender = sender + } else { + log.Printf("D! Output [wavefront] connecting over tcp using Host: %s and Port: %d", w.Host, w.Port) + sender, err := wavefront.NewProxySender(&wavefront.ProxyConfiguration{ + Host: w.Host, + MetricsPort: w.Port, + FlushIntervalSeconds: 5, + }) + if err != nil { + return fmt.Errorf("Wavefront: Could not create Wavefront Sender for Host: %s and Port: %d", w.Host, w.Port) + } + w.sender = sender + } + if w.ConvertPaths && w.MetricSeparator == "_" { w.ConvertPaths = false } if w.ConvertPaths { pathReplacer = strings.NewReplacer("_", w.MetricSeparator) } - - // Test Connection to Wavefront proxy Server - uri := fmt.Sprintf("%s:%d", w.Host, w.Port) - _, err := net.ResolveTCPAddr("tcp", uri) - if err != nil { - return fmt.Errorf("Wavefront: TCP address cannot be resolved %s", err.Error()) - } - connection, err := net.Dial("tcp", uri) - if err != nil { - return fmt.Errorf("Wavefront: TCP connect fail %s", err.Error()) - } - defer connection.Close() return nil } func (w *Wavefront) Write(metrics []telegraf.Metric) error { - // Send Data to Wavefront proxy Server - uri := fmt.Sprintf("%s:%d", w.Host, w.Port) - connection, err := net.Dial("tcp", uri) - if err != nil { - return fmt.Errorf("Wavefront: TCP connect fail %s", err.Error()) - } - defer connection.Close() - connection.SetWriteDeadline(time.Now().Add(5 * time.Second)) - for _, m := range metrics { - for _, metricPoint := range buildMetrics(m, w) { - metricLine := formatMetricPoint(metricPoint, w) - _, err := connection.Write([]byte(metricLine)) + for _, point := range buildMetrics(m, w) { + err := w.sender.SendMetric(point.Metric, point.Value, point.Timestamp, point.Source, point.Tags) if err != nil { - return fmt.Errorf("Wavefront: TCP writing error %s", err.Error()) + return fmt.Errorf("Wavefront sending error: %s", err.Error()) } } } @@ -165,7 +178,7 @@ func buildMetrics(m telegraf.Metric, w *Wavefront) []*MetricPoint { metricValue, buildError := buildValue(value, metric.Metric, w) if buildError != nil { - log.Printf("D! Output [wavefront] %s\n", buildError.Error()) + log.Printf("D! [outputs.wavefront] %s\n", buildError.Error()) continue } metric.Value = metricValue @@ -188,8 +201,8 @@ func buildTags(mTags map[string]string, w *Wavefront) (string, map[string]string } } + // find source, use source_override property if needed var source string - if s, ok := mTags["source"]; ok { source = s delete(mTags, "source") @@ -214,10 +227,25 @@ func buildTags(mTags map[string]string, w *Wavefront) (string, map[string]string source = mTags["host"] } } + source = tagValueReplacer.Replace(source) + // remove default host tag delete(mTags, "host") - return tagValueReplacer.Replace(source), mTags + // sanitize tag keys and values + tags := make(map[string]string) + for k, v := range mTags { + var key string + if w.UseRegex { + key = sanitizedRegex.ReplaceAllLiteralString(k, "-") + } else { + key = sanitizedChars.Replace(k) + } + val := tagValueReplacer.Replace(v) + tags[key] = val + } + + return source, tags } func buildValue(v interface{}, name string, w *Wavefront) (float64, error) { @@ -255,34 +283,6 @@ func buildValue(v interface{}, name string, w *Wavefront) (float64, error) { return 0, fmt.Errorf("unexpected type: %T, with value: %v, for: %s", v, v, name) } -func formatMetricPoint(metricPoint *MetricPoint, w *Wavefront) string { - buffer := bytes.NewBufferString("") - buffer.WriteString(metricPoint.Metric) - buffer.WriteString(" ") - buffer.WriteString(strconv.FormatFloat(metricPoint.Value, 'f', 6, 64)) - buffer.WriteString(" ") - buffer.WriteString(strconv.FormatInt(metricPoint.Timestamp, 10)) - buffer.WriteString(" source=\"") - buffer.WriteString(metricPoint.Source) - buffer.WriteString("\"") - - for k, v := range metricPoint.Tags { - buffer.WriteString(" ") - if w.UseRegex { - buffer.WriteString(sanitizedRegex.ReplaceAllLiteralString(k, "-")) - } else { - buffer.WriteString(sanitizedChars.Replace(k)) - } - buffer.WriteString("=\"") - buffer.WriteString(tagValueReplacer.Replace(v)) - buffer.WriteString("\"") - } - - buffer.WriteString("\n") - - return buffer.String() -} - func (w *Wavefront) SampleConfig() string { return sampleConfig } @@ -292,12 +292,14 @@ func (w *Wavefront) Description() string { } func (w *Wavefront) Close() error { + w.sender.Close() return nil } func init() { outputs.Add("wavefront", func() telegraf.Output { return &Wavefront{ + Token: "DUMMY_TOKEN", MetricSeparator: ".", ConvertPaths: true, ConvertBool: true, diff --git a/plugins/outputs/wavefront/wavefront_test.go b/plugins/outputs/wavefront/wavefront_test.go index f1722e668..1fda6c7ae 100644 --- a/plugins/outputs/wavefront/wavefront_test.go +++ b/plugins/outputs/wavefront/wavefront_test.go @@ -140,6 +140,11 @@ func TestBuildTags(t *testing.T) { "aaa", map[string]string{"dc": "bbb"}, }, + { + map[string]string{"host": "aaa", "dc": "a*$a\\abbb\"som/et|hing else", "bad#k%e/y that*sho\\uld work": "value1"}, + "aaa", + map[string]string{"dc": "a-$a\\abbb\"som/et|hing else", "bad-k-e-y-that-sho-uld-work": "value1"}, + }, } for _, tt := range tagtests { @@ -189,7 +194,7 @@ func TestBuildTagsWithSource(t *testing.T) { }, { map[string]string{"something": "abc", "host": "r*@l\"Ho/st"}, - "r-@l\\\"Ho/st", + "r-@l\"Ho/st", map[string]string{"something": "abc"}, }, } @@ -264,27 +269,6 @@ func TestBuildValueString(t *testing.T) { } -func TestFormatMetricPoint(t *testing.T) { - w := defaultWavefront() - - testpoint := &MetricPoint{ - Metric: "test.metric.something", - Value: 123.456, - Timestamp: 1257894000, - Source: "testSource", - Tags: map[string]string{"sp*c!@l\"-ch/rs": "sp*c!@l/ val\"ue"}, - } - - expected := "test.metric.something 123.456000 1257894000 source=\"testSource\" sp-c--l--ch-rs=\"sp-c!@l/ val\\\"ue\"\n" - - received := formatMetricPoint(testpoint, w) - - if expected != received { - t.Errorf("\nexpected\t%+v\nreceived\t%+v\n", expected, received) - - } -} - // Benchmarks to test performance of string replacement via Regex and Replacer var testString = "this_is*my!test/string\\for=replacement"