From 23756077a4769cb63729e28f8d55631331bdbc44 Mon Sep 17 00:00:00 2001 From: Pontus Rydin Date: Wed, 13 May 2020 15:02:39 -0400 Subject: [PATCH] Add truncate_tags setting to wavefront output (#7503) --- plugins/outputs/wavefront/README.md | 4 ++ plugins/outputs/wavefront/wavefront.go | 38 +++++++++++----- plugins/outputs/wavefront/wavefront_test.go | 48 ++++++++++++++++++--- 3 files changed, 74 insertions(+), 16 deletions(-) diff --git a/plugins/outputs/wavefront/README.md b/plugins/outputs/wavefront/README.md index 71a760900..231e1057d 100644 --- a/plugins/outputs/wavefront/README.md +++ b/plugins/outputs/wavefront/README.md @@ -45,6 +45,10 @@ This plugin writes to a [Wavefront](https://www.wavefront.com) proxy, in Wavefro ## whether to convert boolean values to numeric values, with false -> 0.0 and true -> 1.0. default is true #convert_bool = true + + ## Truncate metric tags to a total of 254 characters for the tag name value. Wavefront will reject any + ## data point exceeding this limit if not truncated. Defaults to 'false' to provide backwards compatibility. + #truncate_tags = false ``` diff --git a/plugins/outputs/wavefront/wavefront.go b/plugins/outputs/wavefront/wavefront.go index 65666d627..c455b6fa6 100644 --- a/plugins/outputs/wavefront/wavefront.go +++ b/plugins/outputs/wavefront/wavefront.go @@ -2,7 +2,6 @@ package wavefront import ( "fmt" - "log" "regexp" "strings" @@ -11,6 +10,8 @@ import ( wavefront "github.com/wavefronthq/wavefront-sdk-go/senders" ) +const maxTagLength = 254 + type Wavefront struct { Url string Token string @@ -23,10 +24,12 @@ type Wavefront struct { ConvertBool bool UseRegex bool UseStrict bool + TruncateTags bool SourceOverride []string StringToNumber map[string][]map[string]float64 sender wavefront.Sender + Log telegraf.Logger } // catch many of the invalid chars that could appear in a metric or tag name @@ -94,6 +97,10 @@ var sampleConfig = ` ## whether to convert boolean values to numeric values, with false -> 0.0 and true -> 1.0. default is true #convert_bool = true + ## Truncate metric tags to a total of 254 characters for the tag name value. Wavefront will reject any + ## data point exceeding this limit if not truncated. Defaults to 'false' to provide backwards compatibility. + #truncate_tags = false + ## Define a mapping, namespaced by metric prefix, from string values to numeric values ## deprecated in 1.9; use the enum processor plugin #[[outputs.wavefront.string_to_number.elasticsearch]] @@ -113,11 +120,11 @@ type MetricPoint struct { func (w *Wavefront) Connect() error { if len(w.StringToNumber) > 0 { - log.Print("W! [outputs.wavefront] The string_to_number option is deprecated; please use the enum processor instead") + w.Log.Warn("The string_to_number option is deprecated; please use the enum processor instead") } if w.Url != "" { - log.Printf("D! [outputs.wavefront] connecting over http/https using Url: %s", w.Url) + w.Log.Debug("connecting over http/https using Url: %s", w.Url) sender, err := wavefront.NewDirectSender(&wavefront.DirectConfiguration{ Server: w.Url, Token: w.Token, @@ -128,7 +135,7 @@ func (w *Wavefront) Connect() error { } w.sender = sender } else { - log.Printf("D! Output [wavefront] connecting over tcp using Host: %s and Port: %d", w.Host, w.Port) + w.Log.Debug("connecting over tcp using Host: %s and Port: %d", w.Host, w.Port) sender, err := wavefront.NewProxySender(&wavefront.ProxyConfiguration{ Host: w.Host, MetricsPort: w.Port, @@ -152,18 +159,17 @@ func (w *Wavefront) Connect() error { func (w *Wavefront) Write(metrics []telegraf.Metric) error { for _, m := range metrics { - for _, point := range buildMetrics(m, w) { + for _, point := range w.buildMetrics(m) { err := w.sender.SendMetric(point.Metric, point.Value, point.Timestamp, point.Source, point.Tags) if err != nil { return fmt.Errorf("Wavefront sending error: %s", err.Error()) } } } - return nil } -func buildMetrics(m telegraf.Metric, w *Wavefront) []*MetricPoint { +func (w *Wavefront) buildMetrics(m telegraf.Metric) []*MetricPoint { ret := []*MetricPoint{} for fieldName, value := range m.Fields() { @@ -193,12 +199,12 @@ func buildMetrics(m telegraf.Metric, w *Wavefront) []*MetricPoint { metricValue, buildError := buildValue(value, metric.Metric, w) if buildError != nil { - log.Printf("D! [outputs.wavefront] %s\n", buildError.Error()) + w.Log.Debug("Error building tags: %s\n", buildError.Error()) continue } metric.Value = metricValue - source, tags := buildTags(m.Tags(), w) + source, tags := w.buildTags(m.Tags()) metric.Source = source metric.Tags = tags @@ -207,7 +213,7 @@ func buildMetrics(m telegraf.Metric, w *Wavefront) []*MetricPoint { return ret } -func buildTags(mTags map[string]string, w *Wavefront) (string, map[string]string) { +func (w *Wavefront) buildTags(mTags map[string]string) (string, map[string]string) { // Remove all empty tags. for k, v := range mTags { @@ -259,6 +265,16 @@ func buildTags(mTags map[string]string, w *Wavefront) (string, map[string]string key = sanitizedChars.Replace(k) } val := tagValueReplacer.Replace(v) + if w.TruncateTags { + if len(key) > maxTagLength { + w.Log.Warnf("Tag key length > 254. Skipping tag: %s", key) + continue + } + if len(key)+len(val) > maxTagLength { + w.Log.Debugf("Key+value length > 254: %s", key) + val = val[:maxTagLength-len(key)] + } + } tags[key] = val } @@ -296,7 +312,6 @@ func buildValue(v interface{}, name string, w *Wavefront) (float64, error) { default: return 0, fmt.Errorf("unexpected type: %T, with value: %v, for: %s", v, v, name) } - return 0, fmt.Errorf("unexpected type: %T, with value: %v, for: %s", v, v, name) } @@ -320,6 +335,7 @@ func init() { MetricSeparator: ".", ConvertPaths: true, ConvertBool: true, + TruncateTags: false, } }) } diff --git a/plugins/outputs/wavefront/wavefront_test.go b/plugins/outputs/wavefront/wavefront_test.go index 776c3698f..40707e6d6 100644 --- a/plugins/outputs/wavefront/wavefront_test.go +++ b/plugins/outputs/wavefront/wavefront_test.go @@ -4,6 +4,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" "reflect" "strings" "testing" @@ -21,6 +22,7 @@ func defaultWavefront() *Wavefront { ConvertPaths: true, ConvertBool: true, UseRegex: false, + Log: testutil.Logger{}, } } @@ -64,7 +66,7 @@ func TestBuildMetrics(t *testing.T) { } for _, mt := range metricTests { - ml := buildMetrics(mt.metric, w) + ml := w.buildMetrics(mt.metric) for i, line := range ml { if mt.metricPoints[i].Metric != line.Metric || mt.metricPoints[i].Value != line.Value { t.Errorf("\nexpected\t%+v %+v\nreceived\t%+v %+v\n", mt.metricPoints[i].Metric, mt.metricPoints[i].Value, line.Metric, line.Value) @@ -104,7 +106,7 @@ func TestBuildMetricsStrict(t *testing.T) { } for _, mt := range metricTests { - ml := buildMetrics(mt.metric, w) + ml := w.buildMetrics(mt.metric) for i, line := range ml { if mt.metricPoints[i].Metric != line.Metric || mt.metricPoints[i].Value != line.Value { t.Errorf("\nexpected\t%+v %+v\nreceived\t%+v %+v\n", mt.metricPoints[i].Metric, mt.metricPoints[i].Value, line.Metric, line.Value) @@ -143,7 +145,7 @@ func TestBuildMetricsWithSimpleFields(t *testing.T) { } for _, mt := range metricTests { - ml := buildMetrics(mt.metric, w) + ml := w.buildMetrics(mt.metric) for i, line := range ml { if mt.metricLines[i].Metric != line.Metric || mt.metricLines[i].Value != line.Value { t.Errorf("\nexpected\t%+v %+v\nreceived\t%+v %+v\n", mt.metricLines[i].Metric, mt.metricLines[i].Value, line.Metric, line.Value) @@ -195,7 +197,7 @@ func TestBuildTags(t *testing.T) { } for _, tt := range tagtests { - source, tags := buildTags(tt.ptIn, w) + source, tags := w.buildTags(tt.ptIn) if source != tt.outSource { t.Errorf("\nexpected\t%+v\nreceived\t%+v\n", tt.outSource, source) } @@ -247,7 +249,7 @@ func TestBuildTagsWithSource(t *testing.T) { } for _, tt := range tagtests { - source, tags := buildTags(tt.ptIn, w) + source, tags := w.buildTags(tt.ptIn) if source != tt.outSource { t.Errorf("\nexpected\t%+v\nreceived\t%+v\n", tt.outSource, source) } @@ -316,6 +318,42 @@ func TestBuildValueString(t *testing.T) { } +func TestTagLimits(t *testing.T) { + w := defaultWavefront() + w.TruncateTags = true + + // Should fail (all tags skipped) + template := make(map[string]string) + template[strings.Repeat("x", 255)] = "whatever" + _, tags := w.buildTags(template) + require.Empty(t, tags, "All tags should have been skipped") + + // Should truncate value + template = make(map[string]string) + longKey := strings.Repeat("x", 253) + template[longKey] = "whatever" + _, tags = w.buildTags(template) + require.Contains(t, tags, longKey, "Should contain truncated long key") + require.Equal(t, "w", tags[longKey]) + + // Should not truncate + template = make(map[string]string) + longKey = strings.Repeat("x", 251) + template[longKey] = "Hi!" + _, tags = w.buildTags(template) + require.Contains(t, tags, longKey, "Should contain non truncated long key") + require.Equal(t, "Hi!", tags[longKey]) + + // Turn off truncating and make sure it leaves the tags intact + w.TruncateTags = false + template = make(map[string]string) + longKey = strings.Repeat("x", 255) + template[longKey] = longKey + _, tags = w.buildTags(template) + require.Contains(t, tags, longKey, "Should contain non truncated long key") + require.Equal(t, longKey, tags[longKey]) +} + // Benchmarks to test performance of string replacement via Regex and Replacer var testString = "this_is*my!test/string\\for=replacement"