diff --git a/config/config.go b/config/config.go index 0ebb9e29b..23ba1b5b3 100644 --- a/config/config.go +++ b/config/config.go @@ -1951,6 +1951,14 @@ func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error } } + if node, ok := tbl.Fields["graphite_separator"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + c.GraphiteSeparator = str.Value + } + } + } + if node, ok := tbl.Fields["json_timestamp_units"]; ok { if kv, ok := node.(*ast.KeyValue); ok { if str, ok := kv.Value.(*ast.String); ok { @@ -2055,6 +2063,7 @@ func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error delete(tbl.Fields, "influx_sort_fields") delete(tbl.Fields, "influx_uint_support") delete(tbl.Fields, "graphite_tag_support") + delete(tbl.Fields, "graphite_separator") delete(tbl.Fields, "data_format") delete(tbl.Fields, "prefix") delete(tbl.Fields, "template") diff --git a/etc/telegraf.conf b/etc/telegraf.conf index 05d7daadb..239f77c60 100644 --- a/etc/telegraf.conf +++ b/etc/telegraf.conf @@ -572,6 +572,8 @@ # # ## Enable Graphite tags support # # graphite_tag_support = false +# ## Character for separating metric name and field for Graphite tags +# # graphite_separator = "." # # ## timeout in seconds for the write connection to graphite # timeout = 2 diff --git a/plugins/outputs/graphite/README.md b/plugins/outputs/graphite/README.md index b7ffd361b..b6b36cfca 100644 --- a/plugins/outputs/graphite/README.md +++ b/plugins/outputs/graphite/README.md @@ -34,6 +34,9 @@ see the [Graphite Data Format](../../../docs/DATA_FORMATS_OUTPUT.md) ## Enable Graphite tags support # graphite_tag_support = false + ## Character for separating metric name and field for Graphite tags + # graphite_separator = "." + ## timeout in seconds for the write connection to graphite timeout = 2 diff --git a/plugins/outputs/graphite/graphite.go b/plugins/outputs/graphite/graphite.go index e7d192662..4e284609d 100644 --- a/plugins/outputs/graphite/graphite.go +++ b/plugins/outputs/graphite/graphite.go @@ -17,6 +17,7 @@ import ( type Graphite struct { GraphiteTagSupport bool + GraphiteSeparator string // URL is only for backwards compatibility Servers []string Prefix string @@ -41,6 +42,9 @@ var sampleConfig = ` ## Enable Graphite tags support # graphite_tag_support = false + ## Character for separating metric name and field for Graphite tags + # graphite_separator = "." + ## Graphite templates patterns ## 1. Template for cpu ## 2. Template for disk* @@ -145,7 +149,7 @@ func checkEOF(conn net.Conn) { func (g *Graphite) Write(metrics []telegraf.Metric) error { // Prepare data var batch []byte - s, err := serializers.NewGraphiteSerializer(g.Prefix, g.Template, g.GraphiteTagSupport, g.Templates) + s, err := serializers.NewGraphiteSerializer(g.Prefix, g.Template, g.GraphiteTagSupport, g.GraphiteSeparator, g.Templates) if err != nil { return err } diff --git a/plugins/outputs/graphite/graphite_test.go b/plugins/outputs/graphite/graphite_test.go index ad76d45b5..82aad0d7d 100644 --- a/plugins/outputs/graphite/graphite_test.go +++ b/plugins/outputs/graphite/graphite_test.go @@ -98,6 +98,126 @@ func TestGraphiteOK(t *testing.T) { g.Close() } +func TestGraphiteOkWithSeparatorDot(t *testing.T) { + var wg sync.WaitGroup + // Start TCP server + wg.Add(1) + t.Log("Starting server") + TCPServer1(t, &wg) + + // Init plugin + g := Graphite{ + Prefix: "my.prefix", + GraphiteSeparator: ".", + } + + // Init metrics + m1, _ := metric.New( + "mymeasurement", + map[string]string{"host": "192.168.0.1"}, + map[string]interface{}{"myfield": float64(3.14)}, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + m2, _ := metric.New( + "mymeasurement", + map[string]string{"host": "192.168.0.1"}, + map[string]interface{}{"value": float64(3.14)}, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + m3, _ := metric.New( + "my_measurement", + map[string]string{"host": "192.168.0.1"}, + map[string]interface{}{"value": float64(3.14)}, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + + // Prepare point list + metrics := []telegraf.Metric{m1} + metrics2 := []telegraf.Metric{m2, m3} + err1 := g.Connect() + require.NoError(t, err1) + // Send Data + t.Log("Send first data") + err2 := g.Write(metrics) + require.NoError(t, err2) + + // Waiting TCPserver, should reconnect and resend + wg.Wait() + t.Log("Finished Waiting for first data") + var wg2 sync.WaitGroup + // Start TCP server + wg2.Add(1) + TCPServer2(t, &wg2) + //Write but expect an error, but reconnect + err3 := g.Write(metrics2) + t.Log("Finished writing second data, it should have reconnected automatically") + + require.NoError(t, err3) + t.Log("Finished writing third data") + wg2.Wait() + g.Close() +} + +func TestGraphiteOkWithSeparatorUnderscore(t *testing.T) { + var wg sync.WaitGroup + // Start TCP server + wg.Add(1) + t.Log("Starting server") + TCPServer1(t, &wg) + + // Init plugin + g := Graphite{ + Prefix: "my.prefix", + GraphiteSeparator: "_", + } + + // Init metrics + m1, _ := metric.New( + "mymeasurement", + map[string]string{"host": "192.168.0.1"}, + map[string]interface{}{"myfield": float64(3.14)}, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + m2, _ := metric.New( + "mymeasurement", + map[string]string{"host": "192.168.0.1"}, + map[string]interface{}{"value": float64(3.14)}, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + m3, _ := metric.New( + "my_measurement", + map[string]string{"host": "192.168.0.1"}, + map[string]interface{}{"value": float64(3.14)}, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + + // Prepare point list + metrics := []telegraf.Metric{m1} + metrics2 := []telegraf.Metric{m2, m3} + err1 := g.Connect() + require.NoError(t, err1) + // Send Data + t.Log("Send first data") + err2 := g.Write(metrics) + require.NoError(t, err2) + + // Waiting TCPserver, should reconnect and resend + wg.Wait() + t.Log("Finished Waiting for first data") + var wg2 sync.WaitGroup + // Start TCP server + wg2.Add(1) + TCPServer2(t, &wg2) + //Write but expect an error, but reconnect + err3 := g.Write(metrics2) + t.Log("Finished writing second data, it should have reconnected automatically") + + require.NoError(t, err3) + t.Log("Finished writing third data") + wg2.Wait() + g.Close() +} + func TestGraphiteOKWithMultipleTemplates(t *testing.T) { var wg sync.WaitGroup // Start TCP server @@ -222,6 +342,128 @@ func TestGraphiteOkWithTags(t *testing.T) { g.Close() } +func TestGraphiteOkWithTagsAndSeparatorDot(t *testing.T) { + var wg sync.WaitGroup + // Start TCP server + wg.Add(1) + t.Log("Starting server") + TCPServer1WithTags(t, &wg) + + // Init plugin + g := Graphite{ + Prefix: "my.prefix", + GraphiteTagSupport: true, + GraphiteSeparator: ".", + } + + // Init metrics + m1, _ := metric.New( + "mymeasurement", + map[string]string{"host": "192.168.0.1"}, + map[string]interface{}{"myfield": float64(3.14)}, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + m2, _ := metric.New( + "mymeasurement", + map[string]string{"host": "192.168.0.1"}, + map[string]interface{}{"value": float64(3.14)}, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + m3, _ := metric.New( + "my_measurement", + map[string]string{"host": "192.168.0.1"}, + map[string]interface{}{"value": float64(3.14)}, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + + // Prepare point list + metrics := []telegraf.Metric{m1} + metrics2 := []telegraf.Metric{m2, m3} + err1 := g.Connect() + require.NoError(t, err1) + // Send Data + t.Log("Send first data") + err2 := g.Write(metrics) + require.NoError(t, err2) + + // Waiting TCPserver, should reconnect and resend + wg.Wait() + t.Log("Finished Waiting for first data") + var wg2 sync.WaitGroup + // Start TCP server + wg2.Add(1) + TCPServer2WithTags(t, &wg2) + //Write but expect an error, but reconnect + err3 := g.Write(metrics2) + t.Log("Finished writing second data, it should have reconnected automatically") + + require.NoError(t, err3) + t.Log("Finished writing third data") + wg2.Wait() + g.Close() +} + +func TestGraphiteOkWithTagsAndSeparatorUnderscore(t *testing.T) { + var wg sync.WaitGroup + // Start TCP server + wg.Add(1) + t.Log("Starting server") + TCPServer1WithTagsSeparatorUnderscore(t, &wg) + + // Init plugin + g := Graphite{ + Prefix: "my_prefix", + GraphiteTagSupport: true, + GraphiteSeparator: "_", + } + + // Init metrics + m1, _ := metric.New( + "mymeasurement", + map[string]string{"host": "192.168.0.1"}, + map[string]interface{}{"myfield": float64(3.14)}, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + m2, _ := metric.New( + "mymeasurement", + map[string]string{"host": "192.168.0.1"}, + map[string]interface{}{"value": float64(3.14)}, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + m3, _ := metric.New( + "my_measurement", + map[string]string{"host": "192.168.0.1"}, + map[string]interface{}{"value": float64(3.14)}, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + + // Prepare point list + metrics := []telegraf.Metric{m1} + metrics2 := []telegraf.Metric{m2, m3} + err1 := g.Connect() + require.NoError(t, err1) + // Send Data + t.Log("Send first data") + err2 := g.Write(metrics) + require.NoError(t, err2) + + // Waiting TCPserver, should reconnect and resend + wg.Wait() + t.Log("Finished Waiting for first data") + var wg2 sync.WaitGroup + // Start TCP server + wg2.Add(1) + TCPServer2WithTagsSeparatorUnderscore(t, &wg2) + //Write but expect an error, but reconnect + err3 := g.Write(metrics2) + t.Log("Finished writing second data, it should have reconnected automatically") + + require.NoError(t, err3) + t.Log("Finished writing third data") + wg2.Wait() + g.Close() +} + func TCPServer1(t *testing.T, wg *sync.WaitGroup) { tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003") go func() { @@ -311,3 +553,33 @@ func TCPServer2WithTags(t *testing.T, wg *sync.WaitGroup) { tcpServer.Close() }() } + +func TCPServer1WithTagsSeparatorUnderscore(t *testing.T, wg *sync.WaitGroup) { + tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003") + go func() { + defer wg.Done() + conn, _ := (tcpServer).Accept() + reader := bufio.NewReader(conn) + tp := textproto.NewReader(reader) + data1, _ := tp.ReadLine() + assert.Equal(t, "my_prefix_mymeasurement_myfield;host=192.168.0.1 3.14 1289430000", data1) + conn.Close() + tcpServer.Close() + }() +} + +func TCPServer2WithTagsSeparatorUnderscore(t *testing.T, wg *sync.WaitGroup) { + tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003") + go func() { + defer wg.Done() + conn2, _ := (tcpServer).Accept() + reader := bufio.NewReader(conn2) + tp := textproto.NewReader(reader) + data2, _ := tp.ReadLine() + assert.Equal(t, "my_prefix_mymeasurement;host=192.168.0.1 3.14 1289430000", data2) + data3, _ := tp.ReadLine() + assert.Equal(t, "my_prefix_my_measurement;host=192.168.0.1 3.14 1289430000", data3) + conn2.Close() + tcpServer.Close() + }() +} diff --git a/plugins/outputs/instrumental/instrumental.go b/plugins/outputs/instrumental/instrumental.go index 7284c0ca1..e5decbf7f 100644 --- a/plugins/outputs/instrumental/instrumental.go +++ b/plugins/outputs/instrumental/instrumental.go @@ -86,7 +86,7 @@ func (i *Instrumental) Write(metrics []telegraf.Metric) error { } } - s, err := serializers.NewGraphiteSerializer(i.Prefix, i.Template, false, i.Templates) + s, err := serializers.NewGraphiteSerializer(i.Prefix, i.Template, false, ".", i.Templates) if err != nil { return err } diff --git a/plugins/serializers/graphite/README.md b/plugins/serializers/graphite/README.md index 74bde2b5d..f6fd0c2cc 100644 --- a/plugins/serializers/graphite/README.md +++ b/plugins/serializers/graphite/README.md @@ -22,7 +22,7 @@ method is used, otherwise the [Template Pattern](templates) is used. prefix = "telegraf" ## Graphite template pattern template = "host.tags.measurement.field" - + ## Graphite templates patterns ## 1. Template for cpu ## 2. Template for disk* @@ -35,6 +35,8 @@ method is used, otherwise the [Template Pattern](templates) is used. ## Support Graphite tags, recommended to enable when using Graphite 1.1 or later. # graphite_tag_support = false + ## Character for separating metric name and field for Graphite tags + # graphite_separator = "." ``` #### graphite_tag_support @@ -54,5 +56,12 @@ cpu,cpu=cpu-total,dc=us-east-1,host=tars usage_idle=98.09,usage_user=0.89 145532 cpu.usage_user;cpu=cpu-total;dc=us-east-1;host=tars 0.89 1455320690 cpu.usage_idle;cpu=cpu-total;dc=us-east-1;host=tars 98.09 1455320690 ``` +With set option `graphite_separator` to "_" +``` +cpu,cpu=cpu-total,dc=us-east-1,host=tars usage_idle=98.09,usage_user=0.89 1455320660004257758 +=> +cpu_usage_user;cpu=cpu-total;dc=us-east-1;host=tars 0.89 1455320690 +cpu_usage_idle;cpu=cpu-total;dc=us-east-1;host=tars 98.09 1455320690 +``` [templates]: /docs/TEMPLATE_PATTERN.md diff --git a/plugins/serializers/graphite/graphite.go b/plugins/serializers/graphite/graphite.go index 2f6cd8da5..e580409fe 100644 --- a/plugins/serializers/graphite/graphite.go +++ b/plugins/serializers/graphite/graphite.go @@ -39,6 +39,7 @@ type GraphiteSerializer struct { Prefix string Template string TagSupport bool + Separator string Templates []*GraphiteTemplate } @@ -55,7 +56,7 @@ func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]byte, error) { if fieldValue == "" { continue } - bucket := SerializeBucketNameWithTags(metric.Name(), metric.Tags(), s.Prefix, fieldName) + bucket := SerializeBucketNameWithTags(metric.Name(), metric.Tags(), s.Prefix, s.Separator, fieldName) metricString := fmt.Sprintf("%s %s %d\n", // insert "field" section of template bucket, @@ -246,6 +247,7 @@ func SerializeBucketNameWithTags( measurement string, tags map[string]string, prefix string, + separator string, field string, ) string { var out string @@ -259,13 +261,13 @@ func SerializeBucketNameWithTags( sort.Strings(tagsCopy) if prefix != "" { - out = prefix + "." + out = prefix + separator } out += measurement if field != "value" { - out += "." + field + out += separator + field } out = sanitize(out) diff --git a/plugins/serializers/graphite/graphite_test.go b/plugins/serializers/graphite/graphite_test.go index e50b7292b..b6fcad696 100644 --- a/plugins/serializers/graphite/graphite_test.go +++ b/plugins/serializers/graphite/graphite_test.go @@ -102,6 +102,7 @@ func TestSerializeMetricNoHostWithTagSupport(t *testing.T) { s := GraphiteSerializer{ TagSupport: true, + Separator: ".", } buf, _ := s.Serialize(m) mS := strings.Split(strings.TrimSpace(string(buf)), "\n") @@ -251,6 +252,7 @@ func TestSerializeMetricHostWithTagSupport(t *testing.T) { s := GraphiteSerializer{ TagSupport: true, + Separator: ".", } buf, _ := s.Serialize(m) mS := strings.Split(strings.TrimSpace(string(buf)), "\n") @@ -305,6 +307,7 @@ func TestSerializeValueFieldWithTagSupport(t *testing.T) { s := GraphiteSerializer{ TagSupport: true, + Separator: ".", } buf, _ := s.Serialize(m) mS := strings.Split(strings.TrimSpace(string(buf)), "\n") @@ -380,6 +383,7 @@ func TestSerializeValueStringWithTagSupport(t *testing.T) { s := GraphiteSerializer{ TagSupport: true, + Separator: ".", } buf, _ := s.Serialize(m) mS := strings.Split(strings.TrimSpace(string(buf)), "\n") @@ -433,6 +437,7 @@ func TestSerializeValueBooleanWithTagSupport(t *testing.T) { s := GraphiteSerializer{ TagSupport: true, + Separator: ".", } buf, _ := s.Serialize(m) mS := strings.Split(strings.TrimSpace(string(buf)), "\n") @@ -505,6 +510,7 @@ func TestSerializeFieldWithSpacesWithTagSupport(t *testing.T) { s := GraphiteSerializer{ TagSupport: true, + Separator: ".", } buf, _ := s.Serialize(m) mS := strings.Split(strings.TrimSpace(string(buf)), "\n") @@ -558,6 +564,7 @@ func TestSerializeTagWithSpacesWithTagSupport(t *testing.T) { s := GraphiteSerializer{ TagSupport: true, + Separator: ".", } buf, _ := s.Serialize(m) mS := strings.Split(strings.TrimSpace(string(buf)), "\n") @@ -668,6 +675,7 @@ func TestSerializeMetricPrefixWithTagSupport(t *testing.T) { s := GraphiteSerializer{ Prefix: "prefix", TagSupport: true, + Separator: ".", } buf, _ := s.Serialize(m) mS := strings.Split(strings.TrimSpace(string(buf)), "\n") @@ -973,6 +981,7 @@ func TestCleanWithTagsSupport(t *testing.T) { s := GraphiteSerializer{ TagSupport: true, + Separator: ".", } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -1033,6 +1042,7 @@ func TestSerializeBatchWithTagsSupport(t *testing.T) { s := GraphiteSerializer{ TagSupport: true, + Separator: ".", } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/plugins/serializers/registry.go b/plugins/serializers/registry.go index 17de980fd..e5065a93c 100644 --- a/plugins/serializers/registry.go +++ b/plugins/serializers/registry.go @@ -51,6 +51,9 @@ type Config struct { // Support tags in graphite protocol GraphiteTagSupport bool `toml:"graphite_tag_support"` + // Character for separating metric name and field for Graphite tags + GraphiteSeparator string `toml:"graphite_separator"` + // Maximum line length in bytes; influx format only InfluxMaxLineBytes int `toml:"influx_max_line_bytes"` @@ -107,7 +110,7 @@ func NewSerializer(config *Config) (Serializer, error) { case "influx": serializer, err = NewInfluxSerializerConfig(config) case "graphite": - serializer, err = NewGraphiteSerializer(config.Prefix, config.Template, config.GraphiteTagSupport, config.Templates) + serializer, err = NewGraphiteSerializer(config.Prefix, config.Template, config.GraphiteTagSupport, config.GraphiteSeparator, config.Templates) case "json": serializer, err = NewJsonSerializer(config.TimestampUnits) case "splunkmetric": @@ -191,7 +194,7 @@ func NewInfluxSerializer() (Serializer, error) { return influx.NewSerializer(), nil } -func NewGraphiteSerializer(prefix, template string, tag_support bool, templates []string) (Serializer, error) { +func NewGraphiteSerializer(prefix, template string, tag_support bool, separator string, templates []string) (Serializer, error) { graphiteTemplates, defaultTemplate, err := graphite.InitGraphiteTemplates(templates) if err != nil { @@ -202,10 +205,15 @@ func NewGraphiteSerializer(prefix, template string, tag_support bool, templates template = defaultTemplate } + if separator == "" { + separator = "." + } + return &graphite.GraphiteSerializer{ Prefix: prefix, Template: template, TagSupport: tag_support, + Separator: separator, Templates: graphiteTemplates, }, nil }