From 7660315e4517003f4821643fc0869d9565d10760 Mon Sep 17 00:00:00 2001 From: Pavel Boev Date: Tue, 22 May 2018 01:59:56 +0300 Subject: [PATCH] Add support for Graphite 1.1.x tags (#4165) --- docs/DATA_FORMATS_OUTPUT.md | 46 +++ plugins/outputs/graphite/README.md | 9 + plugins/outputs/graphite/graphite.go | 7 +- plugins/outputs/graphite/graphite_test.go | 90 +++++ plugins/outputs/instrumental/instrumental.go | 2 +- plugins/serializers/graphite/graphite.go | 94 ++++- plugins/serializers/graphite/graphite_test.go | 345 ++++++++++++++++++ plugins/serializers/registry.go | 12 +- 8 files changed, 581 insertions(+), 24 deletions(-) diff --git a/docs/DATA_FORMATS_OUTPUT.md b/docs/DATA_FORMATS_OUTPUT.md index d21aca91d..f8d8cf466 100644 --- a/docs/DATA_FORMATS_OUTPUT.md +++ b/docs/DATA_FORMATS_OUTPUT.md @@ -89,6 +89,15 @@ tars.cpu-total.us-east-1.cpu.usage_idle 98.09 1455320690 Fields with string values will be skipped. Boolean fields will be converted to 1 (true) or 0 (false). +With enable `graphite_tag_support` option following influx metric -> graphite conversion would happen: + +``` +cpu,cpu=cpu-total,dc=us-east-1,host=tars usage_idle=98.09,usage_user=0.89 1455320660004257758 +=> +cpu.usage_user;cpu=cpu-total;dc=us-east-1;host=tars 0.89 1455320690 +cpu.usage_idle;cpu=cpu-total;dc=us-east-1;host=tars 98.09 1455320690 +``` + ### Graphite Configuration ```toml @@ -106,6 +115,43 @@ to 1 (true) or 0 (false). prefix = "telegraf" # graphite template template = "host.tags.measurement.field" + # Enable Graphite tags support + # Defaults to "false" + graphite_tag_support = true +``` + + +## Graphite 1.1 + +The Graphite11 data format translates Telegraf metrics into Graphite protocol which supports storing data using tags to identify each series. [Graphite Tag Support](http://graphite.readthedocs.io/en/latest/tags.html) + +Which means the following influx metric -> graphite 1.1.x conversion would happen: + +``` +cpu,cpu=cpu-total,dc=us-east-1,host=tars usage_idle=98.09,usage_user=0.89 1455320660004257758 +=> +cpu.usage_user;cpu=cpu-total;dc=us-east-1;host=tars 0.89 1455320690 +cpu.usage_idle;cpu=cpu-total;dc=us-east-1;host=tars 98.09 1455320690 +``` + +Fields with string values will be skipped. Boolean fields will be converted +to 1 (true) or 0 (false). + +### Graphite Configuration + +```toml +[[outputs.file]] + ## Files to write to, "stdout" is a specially handled file. + files = ["stdout", "/tmp/metrics.out"] + + ## Data format to output. + ## Each data format has its own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md + data_format = "graphite11" + + # prefix each graphite bucket + prefix = "telegraf" ``` ## JSON diff --git a/plugins/outputs/graphite/README.md b/plugins/outputs/graphite/README.md index 216c09ca0..c54f07bb1 100644 --- a/plugins/outputs/graphite/README.md +++ b/plugins/outputs/graphite/README.md @@ -3,6 +3,11 @@ This plugin writes to [Graphite](http://graphite.readthedocs.org/en/latest/index.html) via raw TCP. + + + ## Configuration: ```toml @@ -17,6 +22,10 @@ via raw TCP. ## Graphite output template ## see https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md template = "host.tags.measurement.field" + ## Enable Graphite tags support + ## see http://graphite.readthedocs.io/en/latest/tags.html + ## Defaults to "false" + # graphite_tag_support = true ## timeout in seconds for the write connection to graphite timeout = 2 diff --git a/plugins/outputs/graphite/graphite.go b/plugins/outputs/graphite/graphite.go index 4346c50d8..8931dbe0a 100644 --- a/plugins/outputs/graphite/graphite.go +++ b/plugins/outputs/graphite/graphite.go @@ -16,6 +16,7 @@ import ( ) type Graphite struct { + GraphiteTagSupport bool // URL is only for backwards compatibility Servers []string Prefix string @@ -35,6 +36,10 @@ var sampleConfig = ` ## Graphite output template ## see https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md template = "host.tags.measurement.field" + ## Enable Graphite tags support + ## see http://graphite.readthedocs.io/en/latest/tags.html + ## Defaults to "false" + # graphite_tag_support = true ## timeout in seconds for the write connection to graphite timeout = 2 @@ -129,7 +134,7 @@ func checkEOF(conn net.Conn) { func (g *Graphite) Write(metrics []telegraf.Metric) error { // Prepare data var batch []byte - s, err := serializers.NewGraphiteSerializer(g.Prefix, g.Template) + s, err := serializers.NewGraphiteSerializer(g.Prefix, g.Template, g.GraphiteTagSupport) if err != nil { return err } diff --git a/plugins/outputs/graphite/graphite_test.go b/plugins/outputs/graphite/graphite_test.go index 485829fec..3857236e5 100644 --- a/plugins/outputs/graphite/graphite_test.go +++ b/plugins/outputs/graphite/graphite_test.go @@ -98,6 +98,66 @@ func TestGraphiteOK(t *testing.T) { g.Close() } +func TestGraphiteOkWithTags(t *testing.T) { + var wg sync.WaitGroup + // Start TCP server + wg.Add(1) + t.Log("Starting server") + TCPServer1WithTags(t, &wg) + + // Init plugin + g := Graphite{ + Prefix: "my.prefix", + GraphiteTagSupport: true, + } + + // Init metrics + m1, _ := metric.New( + "mymeasurement", + map[string]string{"host": "192.168.0.1"}, + map[string]interface{}{"myfield": float64(3.14)}, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + m2, _ := metric.New( + "mymeasurement", + map[string]string{"host": "192.168.0.1"}, + map[string]interface{}{"value": float64(3.14)}, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + m3, _ := metric.New( + "my_measurement", + map[string]string{"host": "192.168.0.1"}, + map[string]interface{}{"value": float64(3.14)}, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + + // Prepare point list + metrics := []telegraf.Metric{m1} + metrics2 := []telegraf.Metric{m2, m3} + err1 := g.Connect() + require.NoError(t, err1) + // Send Data + t.Log("Send first data") + err2 := g.Write(metrics) + require.NoError(t, err2) + + // Waiting TCPserver, should reconnect and resend + wg.Wait() + t.Log("Finished Waiting for first data") + var wg2 sync.WaitGroup + // Start TCP server + wg2.Add(1) + TCPServer2WithTags(t, &wg2) + //Write but expect an error, but reconnect + err3 := g.Write(metrics2) + t.Log("Finished writing second data, it should have reconnected automatically") + + require.NoError(t, err3) + t.Log("Finished writing third data") + wg2.Wait() + g.Close() +} + func TCPServer1(t *testing.T, wg *sync.WaitGroup) { tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003") go func() { @@ -127,3 +187,33 @@ func TCPServer2(t *testing.T, wg *sync.WaitGroup) { tcpServer.Close() }() } + +func TCPServer1WithTags(t *testing.T, wg *sync.WaitGroup) { + tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003") + go func() { + defer wg.Done() + conn, _ := (tcpServer).Accept() + reader := bufio.NewReader(conn) + tp := textproto.NewReader(reader) + data1, _ := tp.ReadLine() + assert.Equal(t, "my.prefix.mymeasurement.myfield;host=192.168.0.1 3.14 1289430000", data1) + conn.Close() + tcpServer.Close() + }() +} + +func TCPServer2WithTags(t *testing.T, wg *sync.WaitGroup) { + tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003") + go func() { + defer wg.Done() + conn2, _ := (tcpServer).Accept() + reader := bufio.NewReader(conn2) + tp := textproto.NewReader(reader) + data2, _ := tp.ReadLine() + assert.Equal(t, "my.prefix.mymeasurement;host=192.168.0.1 3.14 1289430000", data2) + data3, _ := tp.ReadLine() + assert.Equal(t, "my.prefix.my_measurement;host=192.168.0.1 3.14 1289430000", data3) + conn2.Close() + tcpServer.Close() + }() +} diff --git a/plugins/outputs/instrumental/instrumental.go b/plugins/outputs/instrumental/instrumental.go index 7c52d312b..117c9d434 100644 --- a/plugins/outputs/instrumental/instrumental.go +++ b/plugins/outputs/instrumental/instrumental.go @@ -85,7 +85,7 @@ func (i *Instrumental) Write(metrics []telegraf.Metric) error { } } - s, err := serializers.NewGraphiteSerializer(i.Prefix, i.Template) + s, err := serializers.NewGraphiteSerializer(i.Prefix, i.Template, false) if err != nil { return err } diff --git a/plugins/serializers/graphite/graphite.go b/plugins/serializers/graphite/graphite.go index c06ba67ff..d02b0e26b 100644 --- a/plugins/serializers/graphite/graphite.go +++ b/plugins/serializers/graphite/graphite.go @@ -30,8 +30,9 @@ var ( ) type GraphiteSerializer struct { - Prefix string - Template string + Prefix string + Template string + TagSupport bool } func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]byte, error) { @@ -40,23 +41,42 @@ func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]byte, error) { // Convert UnixNano to Unix timestamps timestamp := metric.Time().UnixNano() / 1000000000 - bucket := SerializeBucketName(metric.Name(), metric.Tags(), s.Template, s.Prefix) - if bucket == "" { - return out, nil - } - - for fieldName, value := range metric.Fields() { - fieldValue := formatValue(value) - if fieldValue == "" { - continue + switch s.TagSupport { + case true: + for fieldName, value := range metric.Fields() { + fieldValue := formatValue(value) + if fieldValue == "" { + continue + } + bucket := SerializeBucketNameWithTags(metric.Name(), metric.Tags(), s.Prefix, fieldName) + metricString := fmt.Sprintf("%s %s %d\n", + // insert "field" section of template + bucket, + //bucket, + fieldValue, + timestamp) + point := []byte(metricString) + out = append(out, point...) + } + default: + bucket := SerializeBucketName(metric.Name(), metric.Tags(), s.Template, s.Prefix) + if bucket == "" { + return out, nil + } + + for fieldName, value := range metric.Fields() { + fieldValue := formatValue(value) + if fieldValue == "" { + continue + } + metricString := fmt.Sprintf("%s %s %d\n", + // insert "field" section of template + sanitize(InsertField(bucket, fieldName)), + fieldValue, + timestamp) + point := []byte(metricString) + out = append(out, point...) } - metricString := fmt.Sprintf("%s %s %d\n", - // insert "field" section of template - sanitize(InsertField(bucket, fieldName)), - fieldValue, - timestamp) - point := []byte(metricString) - out = append(out, point...) } return out, nil } @@ -165,6 +185,44 @@ func SerializeBucketName( return prefix + "." + strings.Join(out, ".") } +// SerializeBucketNameWithTags will take the given measurement name and tags and +// produce a graphite bucket. It will use the Graphite11Serializer. +// http://graphite.readthedocs.io/en/latest/tags.html +func SerializeBucketNameWithTags( + measurement string, + tags map[string]string, + prefix string, + field string, +) string { + var out string + var tagsCopy []string + for k, v := range tags { + if k == "name" { + k = "_name" + } + tagsCopy = append(tagsCopy, sanitize(k+"="+v)) + } + sort.Strings(tagsCopy) + + if prefix != "" { + out = prefix + "." + } + + out += measurement + + if field != "value" { + out += "." + field + } + + out = sanitize(out) + + if len(tagsCopy) > 0 { + out += ";" + strings.Join(tagsCopy, ";") + } + + return out +} + // InsertField takes the bucket string from SerializeBucketName and replaces the // FIELDNAME portion. If fieldName == "value", it will simply delete the // FIELDNAME portion. diff --git a/plugins/serializers/graphite/graphite_test.go b/plugins/serializers/graphite/graphite_test.go index 85e7a6458..e72ed7a30 100644 --- a/plugins/serializers/graphite/graphite_test.go +++ b/plugins/serializers/graphite/graphite_test.go @@ -87,6 +87,35 @@ func TestSerializeMetricNoHost(t *testing.T) { assert.Equal(t, expS, mS) } +func TestSerializeMetricNoHostWithTagSupport(t *testing.T) { + now := time.Now() + tags := map[string]string{ + "cpu": "cpu0", + "datacenter": "us-west-2", + } + fields := map[string]interface{}{ + "usage_idle": float64(91.5), + "usage_busy": float64(8.5), + } + m, err := metric.New("cpu", tags, fields, now) + assert.NoError(t, err) + + s := GraphiteSerializer{ + TagSupport: true, + } + buf, _ := s.Serialize(m) + mS := strings.Split(strings.TrimSpace(string(buf)), "\n") + assert.NoError(t, err) + + expS := []string{ + fmt.Sprintf("cpu.usage_idle;cpu=cpu0;datacenter=us-west-2 91.5 %d", now.Unix()), + fmt.Sprintf("cpu.usage_busy;cpu=cpu0;datacenter=us-west-2 8.5 %d", now.Unix()), + } + sort.Strings(mS) + sort.Strings(expS) + assert.Equal(t, expS, mS) +} + func TestSerializeMetricHost(t *testing.T) { now := time.Now() tags := map[string]string{ @@ -115,6 +144,36 @@ func TestSerializeMetricHost(t *testing.T) { assert.Equal(t, expS, mS) } +func TestSerializeMetricHostWithTagSupport(t *testing.T) { + now := time.Now() + tags := map[string]string{ + "host": "localhost", + "cpu": "cpu0", + "datacenter": "us-west-2", + } + fields := map[string]interface{}{ + "usage_idle": float64(91.5), + "usage_busy": float64(8.5), + } + m, err := metric.New("cpu", tags, fields, now) + assert.NoError(t, err) + + s := GraphiteSerializer{ + TagSupport: true, + } + buf, _ := s.Serialize(m) + mS := strings.Split(strings.TrimSpace(string(buf)), "\n") + assert.NoError(t, err) + + expS := []string{ + fmt.Sprintf("cpu.usage_idle;cpu=cpu0;datacenter=us-west-2;host=localhost 91.5 %d", now.Unix()), + fmt.Sprintf("cpu.usage_busy;cpu=cpu0;datacenter=us-west-2;host=localhost 8.5 %d", now.Unix()), + } + sort.Strings(mS) + sort.Strings(expS) + assert.Equal(t, expS, mS) +} + // test that a field named "value" gets ignored. func TestSerializeValueField(t *testing.T) { now := time.Now() @@ -140,6 +199,32 @@ func TestSerializeValueField(t *testing.T) { assert.Equal(t, expS, mS) } +func TestSerializeValueFieldWithTagSupport(t *testing.T) { + now := time.Now() + tags := map[string]string{ + "host": "localhost", + "cpu": "cpu0", + "datacenter": "us-west-2", + } + fields := map[string]interface{}{ + "value": float64(91.5), + } + m, err := metric.New("cpu", tags, fields, now) + assert.NoError(t, err) + + s := GraphiteSerializer{ + TagSupport: true, + } + buf, _ := s.Serialize(m) + mS := strings.Split(strings.TrimSpace(string(buf)), "\n") + assert.NoError(t, err) + + expS := []string{ + fmt.Sprintf("cpu;cpu=cpu0;datacenter=us-west-2;host=localhost 91.5 %d", now.Unix()), + } + assert.Equal(t, expS, mS) +} + // test that a field named "value" gets ignored in middle of template. func TestSerializeValueField2(t *testing.T) { now := time.Now() @@ -189,6 +274,28 @@ func TestSerializeValueString(t *testing.T) { assert.Equal(t, "", mS[0]) } +func TestSerializeValueStringWithTagSupport(t *testing.T) { + now := time.Now() + tags := map[string]string{ + "host": "localhost", + "cpu": "cpu0", + "datacenter": "us-west-2", + } + fields := map[string]interface{}{ + "value": "asdasd", + } + m, err := metric.New("cpu", tags, fields, now) + assert.NoError(t, err) + + s := GraphiteSerializer{ + TagSupport: true, + } + buf, _ := s.Serialize(m) + mS := strings.Split(strings.TrimSpace(string(buf)), "\n") + assert.NoError(t, err) + assert.Equal(t, "", mS[0]) +} + func TestSerializeValueBoolean(t *testing.T) { now := time.Now() tags := map[string]string{ @@ -219,6 +326,36 @@ func TestSerializeValueBoolean(t *testing.T) { assert.Equal(t, expS, mS) } +func TestSerializeValueBooleanWithTagSupport(t *testing.T) { + now := time.Now() + tags := map[string]string{ + "host": "localhost", + "cpu": "cpu0", + "datacenter": "us-west-2", + } + fields := map[string]interface{}{ + "enabled": true, + "disabled": false, + } + m, err := metric.New("cpu", tags, fields, now) + assert.NoError(t, err) + + s := GraphiteSerializer{ + TagSupport: true, + } + buf, _ := s.Serialize(m) + mS := strings.Split(strings.TrimSpace(string(buf)), "\n") + assert.NoError(t, err) + + expS := []string{ + fmt.Sprintf("cpu.enabled;cpu=cpu0;datacenter=us-west-2;host=localhost 1 %d", now.Unix()), + fmt.Sprintf("cpu.disabled;cpu=cpu0;datacenter=us-west-2;host=localhost 0 %d", now.Unix()), + } + sort.Strings(mS) + sort.Strings(expS) + assert.Equal(t, expS, mS) +} + func TestSerializeValueUnsigned(t *testing.T) { now := time.Unix(0, 0) tags := map[string]string{} @@ -262,6 +399,32 @@ func TestSerializeFieldWithSpaces(t *testing.T) { assert.Equal(t, expS, mS) } +func TestSerializeFieldWithSpacesWithTagSupport(t *testing.T) { + now := time.Now() + tags := map[string]string{ + "host": "localhost", + "cpu": "cpu0", + "datacenter": "us-west-2", + } + fields := map[string]interface{}{ + `field\ with\ spaces`: float64(91.5), + } + m, err := metric.New("cpu", tags, fields, now) + assert.NoError(t, err) + + s := GraphiteSerializer{ + TagSupport: true, + } + buf, _ := s.Serialize(m) + mS := strings.Split(strings.TrimSpace(string(buf)), "\n") + assert.NoError(t, err) + + expS := []string{ + fmt.Sprintf("cpu.field_with_spaces;cpu=cpu0;datacenter=us-west-2;host=localhost 91.5 %d", now.Unix()), + } + assert.Equal(t, expS, mS) +} + // test that tags with spaces get fixed. func TestSerializeTagWithSpaces(t *testing.T) { now := time.Now() @@ -289,6 +452,32 @@ func TestSerializeTagWithSpaces(t *testing.T) { assert.Equal(t, expS, mS) } +func TestSerializeTagWithSpacesWithTagSupport(t *testing.T) { + now := time.Now() + tags := map[string]string{ + "host": "localhost", + "cpu": `cpu\ 0`, + "datacenter": "us-west-2", + } + fields := map[string]interface{}{ + `field_with_spaces`: float64(91.5), + } + m, err := metric.New("cpu", tags, fields, now) + assert.NoError(t, err) + + s := GraphiteSerializer{ + TagSupport: true, + } + buf, _ := s.Serialize(m) + mS := strings.Split(strings.TrimSpace(string(buf)), "\n") + assert.NoError(t, err) + + expS := []string{ + fmt.Sprintf("cpu.field_with_spaces;cpu=cpu_0;datacenter=us-west-2;host=localhost 91.5 %d", now.Unix()), + } + assert.Equal(t, expS, mS) +} + // test that a field named "value" gets ignored at beginning of template. func TestSerializeValueField3(t *testing.T) { now := time.Now() @@ -371,6 +560,37 @@ func TestSerializeMetricPrefix(t *testing.T) { assert.Equal(t, expS, mS) } +func TestSerializeMetricPrefixWithTagSupport(t *testing.T) { + now := time.Now() + tags := map[string]string{ + "host": "localhost", + "cpu": "cpu0", + "datacenter": "us-west-2", + } + fields := map[string]interface{}{ + "usage_idle": float64(91.5), + "usage_busy": float64(8.5), + } + m, err := metric.New("cpu", tags, fields, now) + assert.NoError(t, err) + + s := GraphiteSerializer{ + Prefix: "prefix", + TagSupport: true, + } + buf, _ := s.Serialize(m) + mS := strings.Split(strings.TrimSpace(string(buf)), "\n") + assert.NoError(t, err) + + expS := []string{ + fmt.Sprintf("prefix.cpu.usage_idle;cpu=cpu0;datacenter=us-west-2;host=localhost 91.5 %d", now.Unix()), + fmt.Sprintf("prefix.cpu.usage_busy;cpu=cpu0;datacenter=us-west-2;host=localhost 8.5 %d", now.Unix()), + } + sort.Strings(mS) + sort.Strings(expS) + assert.Equal(t, expS, mS) +} + func TestSerializeBucketNameNoHost(t *testing.T) { now := time.Now() tags := map[string]string{ @@ -579,6 +799,100 @@ func TestClean(t *testing.T) { } } +func TestCleanWithTagsSupport(t *testing.T) { + now := time.Unix(1234567890, 0) + tests := []struct { + name string + metric_name string + tags map[string]string + fields map[string]interface{} + expected string + }{ + { + "Base metric", + "cpu", + map[string]string{"host": "localhost"}, + map[string]interface{}{"usage_busy": float64(8.5)}, + "cpu.usage_busy;host=localhost 8.5 1234567890\n", + }, + { + "Dot and whitespace in tags", + "cpu", + map[string]string{"host": "localhost", "label.dot and space": "value with.dot"}, + map[string]interface{}{"usage_busy": float64(8.5)}, + "cpu.usage_busy;host=localhost;label.dot_and_space=value_with.dot 8.5 1234567890\n", + }, + { + "Field with space", + "system", + map[string]string{"host": "localhost"}, + map[string]interface{}{"uptime_format": "20 days, 23:26"}, + "", // yes nothing. graphite don't serialize string fields + }, + { + "Allowed punct", + "cpu", + map[string]string{"host": "localhost", "tag": "-_:="}, + map[string]interface{}{"usage_busy": float64(10)}, + "cpu.usage_busy;host=localhost;tag=-_:= 10 1234567890\n", + }, + { + "Special conversions to hyphen", + "cpu", + map[string]string{"host": "localhost", "tag": "/@*"}, + map[string]interface{}{"usage_busy": float64(10)}, + "cpu.usage_busy;host=localhost;tag=--- 10 1234567890\n", + }, + { + "Special drop chars", + "cpu", + map[string]string{"host": "localhost", "tag": `\no slash`}, + map[string]interface{}{"usage_busy": float64(10)}, + "cpu.usage_busy;host=localhost;tag=no_slash 10 1234567890\n", + }, + { + "Empty tag & value field", + "cpu", + map[string]string{"host": "localhost"}, + map[string]interface{}{"value": float64(10)}, + "cpu;host=localhost 10 1234567890\n", + }, + { + "Unicode Letters allowed", + "cpu", + map[string]string{"host": "localhost", "tag": "μnicodε_letters"}, + map[string]interface{}{"value": float64(10)}, + "cpu;host=localhost;tag=μnicodε_letters 10 1234567890\n", + }, + { + "Other Unicode not allowed", + "cpu", + map[string]string{"host": "localhost", "tag": "“☢”"}, + map[string]interface{}{"value": float64(10)}, + "cpu;host=localhost;tag=___ 10 1234567890\n", + }, + { + "Newline in tags", + "cpu", + map[string]string{"host": "localhost", "label": "some\nthing\nwith\nnewline"}, + map[string]interface{}{"usage_busy": float64(8.5)}, + "cpu.usage_busy;host=localhost;label=some_thing_with_newline 8.5 1234567890\n", + }, + } + + s := GraphiteSerializer{ + TagSupport: true, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m, err := metric.New(tt.metric_name, tt.tags, tt.fields, now) + assert.NoError(t, err) + actual, _ := s.Serialize(m) + require.Equal(t, tt.expected, string(actual)) + }) + } +} + func TestSerializeBatch(t *testing.T) { now := time.Unix(1234567890, 0) tests := []struct { @@ -607,3 +921,34 @@ func TestSerializeBatch(t *testing.T) { }) } } + +func TestSerializeBatchWithTagsSupport(t *testing.T) { + now := time.Unix(1234567890, 0) + tests := []struct { + name string + metric_name string + tags map[string]string + fields map[string]interface{} + expected string + }{ + { + "Base metric", + "cpu", + map[string]string{"host": "localhost"}, + map[string]interface{}{"usage_busy": float64(8.5)}, + "cpu.usage_busy;host=localhost 8.5 1234567890\ncpu.usage_busy;host=localhost 8.5 1234567890\n", + }, + } + + s := GraphiteSerializer{ + TagSupport: true, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m, err := metric.New(tt.metric_name, tt.tags, tt.fields, now) + assert.NoError(t, err) + actual, _ := s.SerializeBatch([]telegraf.Metric{m, m}) + require.Equal(t, tt.expected, string(actual)) + }) + } +} diff --git a/plugins/serializers/registry.go b/plugins/serializers/registry.go index 59050d089..277d33206 100644 --- a/plugins/serializers/registry.go +++ b/plugins/serializers/registry.go @@ -38,6 +38,9 @@ type Config struct { // Dataformat can be one of: influx, graphite, or json DataFormat string + // Support tags in graphite protocol + GraphiteTagSupport bool + // Maximum line length in bytes; influx format only InfluxMaxLineBytes int @@ -67,7 +70,7 @@ func NewSerializer(config *Config) (Serializer, error) { case "influx": serializer, err = NewInfluxSerializerConfig(config) case "graphite": - serializer, err = NewGraphiteSerializer(config.Prefix, config.Template) + serializer, err = NewGraphiteSerializer(config.Prefix, config.Template, config.GraphiteTagSupport) case "json": serializer, err = NewJsonSerializer(config.TimestampUnits) default: @@ -102,9 +105,10 @@ func NewInfluxSerializer() (Serializer, error) { return influx.NewSerializer(), nil } -func NewGraphiteSerializer(prefix, template string) (Serializer, error) { +func NewGraphiteSerializer(prefix, template string, tag_support bool) (Serializer, error) { return &graphite.GraphiteSerializer{ - Prefix: prefix, - Template: template, + Prefix: prefix, + Template: template, + TagSupport: tag_support, }, nil }