Use wavefront sdk in wavefront output (#5161)
This commit is contained in:
		
							parent
							
								
									675178f915
								
							
						
					
					
						commit
						9cc06702da
					
				|  | @ -1093,6 +1093,17 @@ | |||
|   revision = "e3a01f9611c32b2362366434bcd671516e78955d" | ||||
|   version = "v0.18.0" | ||||
| 
 | ||||
| [[projects]] | ||||
|   digest = "1:c1855527c165f0224708fbc7d76843b4b20bcb74b328f212f8d0e9c855d4c49d" | ||||
|   name = "github.com/wavefronthq/wavefront-sdk-go" | ||||
|   packages = [ | ||||
|     "internal", | ||||
|     "senders", | ||||
|   ] | ||||
|   pruneopts = "" | ||||
|   revision = "12511c8b82654d412b0334768d94dc080b617fd1" | ||||
|   version = "v0.9.0" | ||||
| 
 | ||||
| [[projects]] | ||||
|   branch = "master" | ||||
|   digest = "1:98ed05e9796df287b90c1d96854e3913c8e349dbc546412d3cabb472ecf4b417" | ||||
|  | @ -1545,6 +1556,7 @@ | |||
|     "github.com/vmware/govmomi/vim25/mo", | ||||
|     "github.com/vmware/govmomi/vim25/soap", | ||||
|     "github.com/vmware/govmomi/vim25/types", | ||||
|     "github.com/wavefronthq/wavefront-sdk-go/senders", | ||||
|     "github.com/wvanbergen/kafka/consumergroup", | ||||
|     "golang.org/x/net/context", | ||||
|     "golang.org/x/net/html/charset", | ||||
|  |  | |||
|  | @ -246,6 +246,10 @@ | |||
|   name = "github.com/vishvananda/netlink" | ||||
|   revision = "b2de5d10e38ecce8607e6b438b6d174f389a004e" | ||||
| 
 | ||||
| [[constraint]] | ||||
|   name = "github.com/wavefronthq/wavefront-sdk-go" | ||||
|   version = "v0.9.0" | ||||
| 
 | ||||
| [[constraint]] | ||||
|   name = "github.com/karrick/godirwalk" | ||||
|   version = "1.7.5" | ||||
|  |  | |||
|  | @ -6,25 +6,30 @@ This plugin writes to a [Wavefront](https://www.wavefront.com) proxy, in Wavefro | |||
| ### Configuration: | ||||
| 
 | ||||
| ```toml | ||||
| # Configuration for Wavefront output  | ||||
| [[outputs.wavefront]] | ||||
|   ## DNS name of the wavefront proxy server | ||||
|   host = "wavefront.example.com" | ||||
|   ## Url for Wavefront Direct Ingestion or using HTTP with Wavefront Proxy | ||||
|   ## If using Wavefront Proxy, also specify port. example: http://proxyserver:2878 | ||||
|   url = "https://metrics.wavefront.com" | ||||
| 
 | ||||
|   ## Port that the Wavefront proxy server listens on | ||||
|   port = 2878 | ||||
|   ## Authentication Token for Wavefront. Only required if using Direct Ingestion | ||||
|   #token = "DUMMY_TOKEN"   | ||||
|    | ||||
|   ## DNS name of the wavefront proxy server. Do not use if url is specified | ||||
|   #host = "wavefront.example.com" | ||||
| 
 | ||||
|   ## Port that the Wavefront proxy server listens on. Do not use if url is specified | ||||
|   #port = 2878 | ||||
| 
 | ||||
|   ## prefix for metrics keys | ||||
|   #prefix = "my.specific.prefix." | ||||
| 
 | ||||
|   ## wether to use "value" for name of simple fields. default is false | ||||
|   ## whether to use "value" for name of simple fields. default is false | ||||
|   #simple_fields = false | ||||
| 
 | ||||
|   ## character to use between metric and field name. default is . (dot) | ||||
|   ## character to use between metric and field name.  default is . (dot) | ||||
|   #metric_separator = "." | ||||
| 
 | ||||
|   ## Convert metric name paths to use metricSeperator character | ||||
|   ## When true will convert all _ (underscore) chartacters in final metric name. default is true | ||||
|   ## Convert metric name paths to use metricSeparator character | ||||
|   ## When true will convert all _ (underscore) characters in final metric name. default is true | ||||
|   #convert_paths = true | ||||
| 
 | ||||
|   ## Use Regex to sanitize metric and tag names from invalid characters | ||||
|  | @ -32,18 +37,10 @@ This plugin writes to a [Wavefront](https://www.wavefront.com) proxy, in Wavefro | |||
|   #use_regex = false | ||||
| 
 | ||||
|   ## point tags to use as the source name for Wavefront (if none found, host will be used) | ||||
|   #source_override = ["hostname", "agent_host", "node_host"] | ||||
|   #source_override = ["hostname", "address", "agent_host", "node_host"] | ||||
| 
 | ||||
|   ## whether to convert boolean values to numeric values, with false -> 0.0 and true -> 1.0. default is true | ||||
|   #convert_bool = true | ||||
| 
 | ||||
|   ## Define a mapping, namespaced by metric prefix, from string values to numeric values | ||||
|   ## The example below maps "green" -> 1.0, "yellow" -> 0.5, "red" -> 0.0 for | ||||
|   ## any metrics beginning with "elasticsearch" | ||||
|   #[[outputs.wavefront.string_to_number.elasticsearch]] | ||||
|   #  green = 1.0 | ||||
|   #  yellow = 0.5 | ||||
|   #  red = 0.0 | ||||
| ``` | ||||
| 
 | ||||
| 
 | ||||
|  | @ -76,6 +73,5 @@ More information about the Wavefront data format is available [here](https://com | |||
| 
 | ||||
| 
 | ||||
| ### Allowed values for metrics | ||||
| Wavefront allows `integers` and `floats` as input values.  It will ignore most `strings`, but when configured | ||||
| will map certain `strings` to numeric values.  By default it also maps `bool` values to numeric, false -> 0.0,  | ||||
| true -> 1.0 | ||||
| Wavefront allows `integers` and `floats` as input values.  By default it also maps `bool` values to numeric, false -> 0.0,  | ||||
| true -> 1.0.  To map `strings` use the [enum](../../processors/enum) processor plugin. | ||||
|  |  | |||
|  | @ -1,24 +1,22 @@ | |||
| package wavefront | ||||
| 
 | ||||
| import ( | ||||
| 	"bytes" | ||||
| 	"fmt" | ||||
| 	"log" | ||||
| 	"net" | ||||
| 	"regexp" | ||||
| 	"strconv" | ||||
| 	"strings" | ||||
| 
 | ||||
| 	"time" | ||||
| 
 | ||||
| 	"github.com/influxdata/telegraf" | ||||
| 	"github.com/influxdata/telegraf/plugins/outputs" | ||||
| 	wavefront "github.com/wavefronthq/wavefront-sdk-go/senders" | ||||
| ) | ||||
| 
 | ||||
| type Wavefront struct { | ||||
| 	Prefix          string | ||||
| 	Url             string | ||||
| 	Token           string | ||||
| 	Host            string | ||||
| 	Port            int | ||||
| 	Prefix          string | ||||
| 	SimpleFields    bool | ||||
| 	MetricSeparator string | ||||
| 	ConvertPaths    bool | ||||
|  | @ -26,6 +24,8 @@ type Wavefront struct { | |||
| 	UseRegex        bool | ||||
| 	SourceOverride  []string | ||||
| 	StringToNumber  map[string][]map[string]float64 | ||||
| 
 | ||||
| 	sender wavefront.Sender | ||||
| } | ||||
| 
 | ||||
| // catch many of the invalid chars that could appear in a metric or tag name
 | ||||
|  | @ -40,43 +40,49 @@ var sanitizedChars = strings.NewReplacer( | |||
| // instead of Replacer which may miss some special characters we can use a regex pattern, but this is significantly slower than Replacer
 | ||||
| var sanitizedRegex = regexp.MustCompile("[^a-zA-Z\\d_.-]") | ||||
| 
 | ||||
| var tagValueReplacer = strings.NewReplacer("\"", "\\\"", "*", "-") | ||||
| var tagValueReplacer = strings.NewReplacer("*", "-") | ||||
| 
 | ||||
| var pathReplacer = strings.NewReplacer("_", "_") | ||||
| 
 | ||||
| var sampleConfig = ` | ||||
|   ## DNS name of the wavefront proxy server | ||||
|   host = "wavefront.example.com" | ||||
|   ## Url for Wavefront Direct Ingestion or using HTTP with Wavefront Proxy | ||||
|   ## If using Wavefront Proxy, also specify port. example: http://proxyserver:2878
 | ||||
|   url = "https://metrics.wavefront.com" | ||||
| 
 | ||||
|   ## Port that the Wavefront proxy server listens on | ||||
|   port = 2878 | ||||
|   ## Authentication Token for Wavefront. Only required if using Direct Ingestion | ||||
|   #token = "DUMMY_TOKEN"   | ||||
|    | ||||
|   ## DNS name of the wavefront proxy server. Do not use if url is specified | ||||
|   #host = "wavefront.example.com" | ||||
| 
 | ||||
|   ## Port that the Wavefront proxy server listens on. Do not use if url is specified | ||||
|   #port = 2878 | ||||
| 
 | ||||
|   ## prefix for metrics keys | ||||
|   #prefix = "my.specific.prefix." | ||||
| 
 | ||||
|   ## whether to use "value" for name of simple fields | ||||
|   ## whether to use "value" for name of simple fields. default is false | ||||
|   #simple_fields = false | ||||
| 
 | ||||
|   ## character to use between metric and field name.  defaults to . (dot) | ||||
|   ## character to use between metric and field name.  default is . (dot) | ||||
|   #metric_separator = "." | ||||
| 
 | ||||
|   ## Convert metric name paths to use metricSeperator character | ||||
|   ## When true (default) will convert all _ (underscore) chartacters in final metric name | ||||
|   ## Convert metric name paths to use metricSeparator character | ||||
|   ## When true will convert all _ (underscore) characters in final metric name. default is true | ||||
|   #convert_paths = true | ||||
| 
 | ||||
|   ## Use Regex to sanitize metric and tag names from invalid characters | ||||
|   ## Regex is more thorough, but significantly slower | ||||
|   ## Regex is more thorough, but significantly slower. default is false | ||||
|   #use_regex = false | ||||
| 
 | ||||
|   ## point tags to use as the source name for Wavefront (if none found, host will be used) | ||||
|   #source_override = ["hostname", "agent_host", "node_host"] | ||||
|   #source_override = ["hostname", "address", "agent_host", "node_host"] | ||||
| 
 | ||||
|   ## whether to convert boolean values to numeric values, with false -> 0.0 and true -> 1.0.  default true | ||||
|   ## whether to convert boolean values to numeric values, with false -> 0.0 and true -> 1.0. default is true | ||||
|   #convert_bool = true | ||||
| 
 | ||||
|   ## Define a mapping, namespaced by metric prefix, from string values to numeric values | ||||
|   ## The example below maps "green" -> 1.0, "yellow" -> 0.5, "red" -> 0.0 for | ||||
|   ## any metrics beginning with "elasticsearch" | ||||
|   ##   deprecated in 1.9; use the enum processor plugin | ||||
|   #[[outputs.wavefront.string_to_number.elasticsearch]] | ||||
|   #  green = 1.0 | ||||
|   #  yellow = 0.5 | ||||
|  | @ -92,44 +98,51 @@ type MetricPoint struct { | |||
| } | ||||
| 
 | ||||
| func (w *Wavefront) Connect() error { | ||||
| 
 | ||||
| 	if len(w.StringToNumber) > 0 { | ||||
| 		log.Print("W! [outputs.wavefront] The string_to_number option is deprecated; please use the enum processor instead") | ||||
| 	} | ||||
| 
 | ||||
| 	if w.Url != "" { | ||||
| 		log.Printf("D! [outputs.wavefront] connecting over http/https using Url: %s", w.Url) | ||||
| 		sender, err := wavefront.NewDirectSender(&wavefront.DirectConfiguration{ | ||||
| 			Server:               w.Url, | ||||
| 			Token:                w.Token, | ||||
| 			FlushIntervalSeconds: 5, | ||||
| 		}) | ||||
| 		if err != nil { | ||||
| 			return fmt.Errorf("Wavefront: Could not create Wavefront Sender for Url: %s", w.Url) | ||||
| 		} | ||||
| 		w.sender = sender | ||||
| 	} else { | ||||
| 		log.Printf("D! Output [wavefront] connecting over tcp using Host: %s and Port: %d", w.Host, w.Port) | ||||
| 		sender, err := wavefront.NewProxySender(&wavefront.ProxyConfiguration{ | ||||
| 			Host:                 w.Host, | ||||
| 			MetricsPort:          w.Port, | ||||
| 			FlushIntervalSeconds: 5, | ||||
| 		}) | ||||
| 		if err != nil { | ||||
| 			return fmt.Errorf("Wavefront: Could not create Wavefront Sender for Host: %s and Port: %d", w.Host, w.Port) | ||||
| 		} | ||||
| 		w.sender = sender | ||||
| 	} | ||||
| 
 | ||||
| 	if w.ConvertPaths && w.MetricSeparator == "_" { | ||||
| 		w.ConvertPaths = false | ||||
| 	} | ||||
| 	if w.ConvertPaths { | ||||
| 		pathReplacer = strings.NewReplacer("_", w.MetricSeparator) | ||||
| 	} | ||||
| 
 | ||||
| 	// Test Connection to Wavefront proxy Server
 | ||||
| 	uri := fmt.Sprintf("%s:%d", w.Host, w.Port) | ||||
| 	_, err := net.ResolveTCPAddr("tcp", uri) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("Wavefront: TCP address cannot be resolved %s", err.Error()) | ||||
| 	} | ||||
| 	connection, err := net.Dial("tcp", uri) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("Wavefront: TCP connect fail %s", err.Error()) | ||||
| 	} | ||||
| 	defer connection.Close() | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (w *Wavefront) Write(metrics []telegraf.Metric) error { | ||||
| 
 | ||||
| 	// Send Data to Wavefront proxy Server
 | ||||
| 	uri := fmt.Sprintf("%s:%d", w.Host, w.Port) | ||||
| 	connection, err := net.Dial("tcp", uri) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("Wavefront: TCP connect fail %s", err.Error()) | ||||
| 	} | ||||
| 	defer connection.Close() | ||||
| 	connection.SetWriteDeadline(time.Now().Add(5 * time.Second)) | ||||
| 
 | ||||
| 	for _, m := range metrics { | ||||
| 		for _, metricPoint := range buildMetrics(m, w) { | ||||
| 			metricLine := formatMetricPoint(metricPoint, w) | ||||
| 			_, err := connection.Write([]byte(metricLine)) | ||||
| 		for _, point := range buildMetrics(m, w) { | ||||
| 			err := w.sender.SendMetric(point.Metric, point.Value, point.Timestamp, point.Source, point.Tags) | ||||
| 			if err != nil { | ||||
| 				return fmt.Errorf("Wavefront: TCP writing error %s", err.Error()) | ||||
| 				return fmt.Errorf("Wavefront sending error: %s", err.Error()) | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
|  | @ -165,7 +178,7 @@ func buildMetrics(m telegraf.Metric, w *Wavefront) []*MetricPoint { | |||
| 
 | ||||
| 		metricValue, buildError := buildValue(value, metric.Metric, w) | ||||
| 		if buildError != nil { | ||||
| 			log.Printf("D! Output [wavefront] %s\n", buildError.Error()) | ||||
| 			log.Printf("D! [outputs.wavefront] %s\n", buildError.Error()) | ||||
| 			continue | ||||
| 		} | ||||
| 		metric.Value = metricValue | ||||
|  | @ -188,8 +201,8 @@ func buildTags(mTags map[string]string, w *Wavefront) (string, map[string]string | |||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	// find source, use source_override property if needed
 | ||||
| 	var source string | ||||
| 
 | ||||
| 	if s, ok := mTags["source"]; ok { | ||||
| 		source = s | ||||
| 		delete(mTags, "source") | ||||
|  | @ -214,10 +227,25 @@ func buildTags(mTags map[string]string, w *Wavefront) (string, map[string]string | |||
| 			source = mTags["host"] | ||||
| 		} | ||||
| 	} | ||||
| 	source = tagValueReplacer.Replace(source) | ||||
| 
 | ||||
| 	// remove default host tag
 | ||||
| 	delete(mTags, "host") | ||||
| 
 | ||||
| 	return tagValueReplacer.Replace(source), mTags | ||||
| 	// sanitize tag keys and values
 | ||||
| 	tags := make(map[string]string) | ||||
| 	for k, v := range mTags { | ||||
| 		var key string | ||||
| 		if w.UseRegex { | ||||
| 			key = sanitizedRegex.ReplaceAllLiteralString(k, "-") | ||||
| 		} else { | ||||
| 			key = sanitizedChars.Replace(k) | ||||
| 		} | ||||
| 		val := tagValueReplacer.Replace(v) | ||||
| 		tags[key] = val | ||||
| 	} | ||||
| 
 | ||||
| 	return source, tags | ||||
| } | ||||
| 
 | ||||
| func buildValue(v interface{}, name string, w *Wavefront) (float64, error) { | ||||
|  | @ -255,34 +283,6 @@ func buildValue(v interface{}, name string, w *Wavefront) (float64, error) { | |||
| 	return 0, fmt.Errorf("unexpected type: %T, with value: %v, for: %s", v, v, name) | ||||
| } | ||||
| 
 | ||||
| func formatMetricPoint(metricPoint *MetricPoint, w *Wavefront) string { | ||||
| 	buffer := bytes.NewBufferString("") | ||||
| 	buffer.WriteString(metricPoint.Metric) | ||||
| 	buffer.WriteString(" ") | ||||
| 	buffer.WriteString(strconv.FormatFloat(metricPoint.Value, 'f', 6, 64)) | ||||
| 	buffer.WriteString(" ") | ||||
| 	buffer.WriteString(strconv.FormatInt(metricPoint.Timestamp, 10)) | ||||
| 	buffer.WriteString(" source=\"") | ||||
| 	buffer.WriteString(metricPoint.Source) | ||||
| 	buffer.WriteString("\"") | ||||
| 
 | ||||
| 	for k, v := range metricPoint.Tags { | ||||
| 		buffer.WriteString(" ") | ||||
| 		if w.UseRegex { | ||||
| 			buffer.WriteString(sanitizedRegex.ReplaceAllLiteralString(k, "-")) | ||||
| 		} else { | ||||
| 			buffer.WriteString(sanitizedChars.Replace(k)) | ||||
| 		} | ||||
| 		buffer.WriteString("=\"") | ||||
| 		buffer.WriteString(tagValueReplacer.Replace(v)) | ||||
| 		buffer.WriteString("\"") | ||||
| 	} | ||||
| 
 | ||||
| 	buffer.WriteString("\n") | ||||
| 
 | ||||
| 	return buffer.String() | ||||
| } | ||||
| 
 | ||||
| func (w *Wavefront) SampleConfig() string { | ||||
| 	return sampleConfig | ||||
| } | ||||
|  | @ -292,12 +292,14 @@ func (w *Wavefront) Description() string { | |||
| } | ||||
| 
 | ||||
| func (w *Wavefront) Close() error { | ||||
| 	w.sender.Close() | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func init() { | ||||
| 	outputs.Add("wavefront", func() telegraf.Output { | ||||
| 		return &Wavefront{ | ||||
| 			Token:           "DUMMY_TOKEN", | ||||
| 			MetricSeparator: ".", | ||||
| 			ConvertPaths:    true, | ||||
| 			ConvertBool:     true, | ||||
|  |  | |||
|  | @ -140,6 +140,11 @@ func TestBuildTags(t *testing.T) { | |||
| 			"aaa", | ||||
| 			map[string]string{"dc": "bbb"}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			map[string]string{"host": "aaa", "dc": "a*$a\\abbb\"som/et|hing else", "bad#k%e/y that*sho\\uld work": "value1"}, | ||||
| 			"aaa", | ||||
| 			map[string]string{"dc": "a-$a\\abbb\"som/et|hing else", "bad-k-e-y-that-sho-uld-work": "value1"}, | ||||
| 		}, | ||||
| 	} | ||||
| 
 | ||||
| 	for _, tt := range tagtests { | ||||
|  | @ -189,7 +194,7 @@ func TestBuildTagsWithSource(t *testing.T) { | |||
| 		}, | ||||
| 		{ | ||||
| 			map[string]string{"something": "abc", "host": "r*@l\"Ho/st"}, | ||||
| 			"r-@l\\\"Ho/st", | ||||
| 			"r-@l\"Ho/st", | ||||
| 			map[string]string{"something": "abc"}, | ||||
| 		}, | ||||
| 	} | ||||
|  | @ -264,27 +269,6 @@ func TestBuildValueString(t *testing.T) { | |||
| 
 | ||||
| } | ||||
| 
 | ||||
| func TestFormatMetricPoint(t *testing.T) { | ||||
| 	w := defaultWavefront() | ||||
| 
 | ||||
| 	testpoint := &MetricPoint{ | ||||
| 		Metric:    "test.metric.something", | ||||
| 		Value:     123.456, | ||||
| 		Timestamp: 1257894000, | ||||
| 		Source:    "testSource", | ||||
| 		Tags:      map[string]string{"sp*c!@l\"-ch/rs": "sp*c!@l/ val\"ue"}, | ||||
| 	} | ||||
| 
 | ||||
| 	expected := "test.metric.something 123.456000 1257894000 source=\"testSource\" sp-c--l--ch-rs=\"sp-c!@l/ val\\\"ue\"\n" | ||||
| 
 | ||||
| 	received := formatMetricPoint(testpoint, w) | ||||
| 
 | ||||
| 	if expected != received { | ||||
| 		t.Errorf("\nexpected\t%+v\nreceived\t%+v\n", expected, received) | ||||
| 
 | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| // Benchmarks to test performance of string replacement via Regex and Replacer
 | ||||
| var testString = "this_is*my!test/string\\for=replacement" | ||||
| 
 | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue