Add configurable separator graphite serializer and output (#7545)

This commit is contained in:
ihard 2020-05-21 03:15:18 +03:00 committed by GitHub
parent 10560e5a10
commit 94c75b51a8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 327 additions and 8 deletions

View File

@ -1951,6 +1951,14 @@ func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error
} }
} }
if node, ok := tbl.Fields["graphite_separator"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.GraphiteSeparator = str.Value
}
}
}
if node, ok := tbl.Fields["json_timestamp_units"]; ok { if node, ok := tbl.Fields["json_timestamp_units"]; ok {
if kv, ok := node.(*ast.KeyValue); ok { if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok { if str, ok := kv.Value.(*ast.String); ok {
@ -2055,6 +2063,7 @@ func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error
delete(tbl.Fields, "influx_sort_fields") delete(tbl.Fields, "influx_sort_fields")
delete(tbl.Fields, "influx_uint_support") delete(tbl.Fields, "influx_uint_support")
delete(tbl.Fields, "graphite_tag_support") delete(tbl.Fields, "graphite_tag_support")
delete(tbl.Fields, "graphite_separator")
delete(tbl.Fields, "data_format") delete(tbl.Fields, "data_format")
delete(tbl.Fields, "prefix") delete(tbl.Fields, "prefix")
delete(tbl.Fields, "template") delete(tbl.Fields, "template")

View File

@ -572,6 +572,8 @@
# #
# ## Enable Graphite tags support # ## Enable Graphite tags support
# # graphite_tag_support = false # # graphite_tag_support = false
# ## Character for separating metric name and field for Graphite tags
# # graphite_separator = "."
# #
# ## timeout in seconds for the write connection to graphite # ## timeout in seconds for the write connection to graphite
# timeout = 2 # timeout = 2

View File

@ -34,6 +34,9 @@ see the [Graphite Data Format](../../../docs/DATA_FORMATS_OUTPUT.md)
## Enable Graphite tags support ## Enable Graphite tags support
# graphite_tag_support = false # graphite_tag_support = false
## Character for separating metric name and field for Graphite tags
# graphite_separator = "."
## timeout in seconds for the write connection to graphite ## timeout in seconds for the write connection to graphite
timeout = 2 timeout = 2

View File

@ -17,6 +17,7 @@ import (
type Graphite struct { type Graphite struct {
GraphiteTagSupport bool GraphiteTagSupport bool
GraphiteSeparator string
// URL is only for backwards compatibility // URL is only for backwards compatibility
Servers []string Servers []string
Prefix string Prefix string
@ -41,6 +42,9 @@ var sampleConfig = `
## Enable Graphite tags support ## Enable Graphite tags support
# graphite_tag_support = false # graphite_tag_support = false
## Character for separating metric name and field for Graphite tags
# graphite_separator = "."
## Graphite templates patterns ## Graphite templates patterns
## 1. Template for cpu ## 1. Template for cpu
## 2. Template for disk* ## 2. Template for disk*
@ -145,7 +149,7 @@ func checkEOF(conn net.Conn) {
func (g *Graphite) Write(metrics []telegraf.Metric) error { func (g *Graphite) Write(metrics []telegraf.Metric) error {
// Prepare data // Prepare data
var batch []byte var batch []byte
s, err := serializers.NewGraphiteSerializer(g.Prefix, g.Template, g.GraphiteTagSupport, g.Templates) s, err := serializers.NewGraphiteSerializer(g.Prefix, g.Template, g.GraphiteTagSupport, g.GraphiteSeparator, g.Templates)
if err != nil { if err != nil {
return err return err
} }

View File

@ -98,6 +98,126 @@ func TestGraphiteOK(t *testing.T) {
g.Close() g.Close()
} }
func TestGraphiteOkWithSeparatorDot(t *testing.T) {
var wg sync.WaitGroup
// Start TCP server
wg.Add(1)
t.Log("Starting server")
TCPServer1(t, &wg)
// Init plugin
g := Graphite{
Prefix: "my.prefix",
GraphiteSeparator: ".",
}
// Init metrics
m1, _ := metric.New(
"mymeasurement",
map[string]string{"host": "192.168.0.1"},
map[string]interface{}{"myfield": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)
m2, _ := metric.New(
"mymeasurement",
map[string]string{"host": "192.168.0.1"},
map[string]interface{}{"value": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)
m3, _ := metric.New(
"my_measurement",
map[string]string{"host": "192.168.0.1"},
map[string]interface{}{"value": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)
// Prepare point list
metrics := []telegraf.Metric{m1}
metrics2 := []telegraf.Metric{m2, m3}
err1 := g.Connect()
require.NoError(t, err1)
// Send Data
t.Log("Send first data")
err2 := g.Write(metrics)
require.NoError(t, err2)
// Waiting TCPserver, should reconnect and resend
wg.Wait()
t.Log("Finished Waiting for first data")
var wg2 sync.WaitGroup
// Start TCP server
wg2.Add(1)
TCPServer2(t, &wg2)
//Write but expect an error, but reconnect
err3 := g.Write(metrics2)
t.Log("Finished writing second data, it should have reconnected automatically")
require.NoError(t, err3)
t.Log("Finished writing third data")
wg2.Wait()
g.Close()
}
func TestGraphiteOkWithSeparatorUnderscore(t *testing.T) {
var wg sync.WaitGroup
// Start TCP server
wg.Add(1)
t.Log("Starting server")
TCPServer1(t, &wg)
// Init plugin
g := Graphite{
Prefix: "my.prefix",
GraphiteSeparator: "_",
}
// Init metrics
m1, _ := metric.New(
"mymeasurement",
map[string]string{"host": "192.168.0.1"},
map[string]interface{}{"myfield": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)
m2, _ := metric.New(
"mymeasurement",
map[string]string{"host": "192.168.0.1"},
map[string]interface{}{"value": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)
m3, _ := metric.New(
"my_measurement",
map[string]string{"host": "192.168.0.1"},
map[string]interface{}{"value": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)
// Prepare point list
metrics := []telegraf.Metric{m1}
metrics2 := []telegraf.Metric{m2, m3}
err1 := g.Connect()
require.NoError(t, err1)
// Send Data
t.Log("Send first data")
err2 := g.Write(metrics)
require.NoError(t, err2)
// Waiting TCPserver, should reconnect and resend
wg.Wait()
t.Log("Finished Waiting for first data")
var wg2 sync.WaitGroup
// Start TCP server
wg2.Add(1)
TCPServer2(t, &wg2)
//Write but expect an error, but reconnect
err3 := g.Write(metrics2)
t.Log("Finished writing second data, it should have reconnected automatically")
require.NoError(t, err3)
t.Log("Finished writing third data")
wg2.Wait()
g.Close()
}
func TestGraphiteOKWithMultipleTemplates(t *testing.T) { func TestGraphiteOKWithMultipleTemplates(t *testing.T) {
var wg sync.WaitGroup var wg sync.WaitGroup
// Start TCP server // Start TCP server
@ -222,6 +342,128 @@ func TestGraphiteOkWithTags(t *testing.T) {
g.Close() g.Close()
} }
func TestGraphiteOkWithTagsAndSeparatorDot(t *testing.T) {
var wg sync.WaitGroup
// Start TCP server
wg.Add(1)
t.Log("Starting server")
TCPServer1WithTags(t, &wg)
// Init plugin
g := Graphite{
Prefix: "my.prefix",
GraphiteTagSupport: true,
GraphiteSeparator: ".",
}
// Init metrics
m1, _ := metric.New(
"mymeasurement",
map[string]string{"host": "192.168.0.1"},
map[string]interface{}{"myfield": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)
m2, _ := metric.New(
"mymeasurement",
map[string]string{"host": "192.168.0.1"},
map[string]interface{}{"value": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)
m3, _ := metric.New(
"my_measurement",
map[string]string{"host": "192.168.0.1"},
map[string]interface{}{"value": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)
// Prepare point list
metrics := []telegraf.Metric{m1}
metrics2 := []telegraf.Metric{m2, m3}
err1 := g.Connect()
require.NoError(t, err1)
// Send Data
t.Log("Send first data")
err2 := g.Write(metrics)
require.NoError(t, err2)
// Waiting TCPserver, should reconnect and resend
wg.Wait()
t.Log("Finished Waiting for first data")
var wg2 sync.WaitGroup
// Start TCP server
wg2.Add(1)
TCPServer2WithTags(t, &wg2)
//Write but expect an error, but reconnect
err3 := g.Write(metrics2)
t.Log("Finished writing second data, it should have reconnected automatically")
require.NoError(t, err3)
t.Log("Finished writing third data")
wg2.Wait()
g.Close()
}
func TestGraphiteOkWithTagsAndSeparatorUnderscore(t *testing.T) {
var wg sync.WaitGroup
// Start TCP server
wg.Add(1)
t.Log("Starting server")
TCPServer1WithTagsSeparatorUnderscore(t, &wg)
// Init plugin
g := Graphite{
Prefix: "my_prefix",
GraphiteTagSupport: true,
GraphiteSeparator: "_",
}
// Init metrics
m1, _ := metric.New(
"mymeasurement",
map[string]string{"host": "192.168.0.1"},
map[string]interface{}{"myfield": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)
m2, _ := metric.New(
"mymeasurement",
map[string]string{"host": "192.168.0.1"},
map[string]interface{}{"value": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)
m3, _ := metric.New(
"my_measurement",
map[string]string{"host": "192.168.0.1"},
map[string]interface{}{"value": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)
// Prepare point list
metrics := []telegraf.Metric{m1}
metrics2 := []telegraf.Metric{m2, m3}
err1 := g.Connect()
require.NoError(t, err1)
// Send Data
t.Log("Send first data")
err2 := g.Write(metrics)
require.NoError(t, err2)
// Waiting TCPserver, should reconnect and resend
wg.Wait()
t.Log("Finished Waiting for first data")
var wg2 sync.WaitGroup
// Start TCP server
wg2.Add(1)
TCPServer2WithTagsSeparatorUnderscore(t, &wg2)
//Write but expect an error, but reconnect
err3 := g.Write(metrics2)
t.Log("Finished writing second data, it should have reconnected automatically")
require.NoError(t, err3)
t.Log("Finished writing third data")
wg2.Wait()
g.Close()
}
func TCPServer1(t *testing.T, wg *sync.WaitGroup) { func TCPServer1(t *testing.T, wg *sync.WaitGroup) {
tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003") tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003")
go func() { go func() {
@ -311,3 +553,33 @@ func TCPServer2WithTags(t *testing.T, wg *sync.WaitGroup) {
tcpServer.Close() tcpServer.Close()
}() }()
} }
func TCPServer1WithTagsSeparatorUnderscore(t *testing.T, wg *sync.WaitGroup) {
tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003")
go func() {
defer wg.Done()
conn, _ := (tcpServer).Accept()
reader := bufio.NewReader(conn)
tp := textproto.NewReader(reader)
data1, _ := tp.ReadLine()
assert.Equal(t, "my_prefix_mymeasurement_myfield;host=192.168.0.1 3.14 1289430000", data1)
conn.Close()
tcpServer.Close()
}()
}
func TCPServer2WithTagsSeparatorUnderscore(t *testing.T, wg *sync.WaitGroup) {
tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003")
go func() {
defer wg.Done()
conn2, _ := (tcpServer).Accept()
reader := bufio.NewReader(conn2)
tp := textproto.NewReader(reader)
data2, _ := tp.ReadLine()
assert.Equal(t, "my_prefix_mymeasurement;host=192.168.0.1 3.14 1289430000", data2)
data3, _ := tp.ReadLine()
assert.Equal(t, "my_prefix_my_measurement;host=192.168.0.1 3.14 1289430000", data3)
conn2.Close()
tcpServer.Close()
}()
}

View File

@ -86,7 +86,7 @@ func (i *Instrumental) Write(metrics []telegraf.Metric) error {
} }
} }
s, err := serializers.NewGraphiteSerializer(i.Prefix, i.Template, false, i.Templates) s, err := serializers.NewGraphiteSerializer(i.Prefix, i.Template, false, ".", i.Templates)
if err != nil { if err != nil {
return err return err
} }

View File

@ -22,7 +22,7 @@ method is used, otherwise the [Template Pattern](templates) is used.
prefix = "telegraf" prefix = "telegraf"
## Graphite template pattern ## Graphite template pattern
template = "host.tags.measurement.field" template = "host.tags.measurement.field"
## Graphite templates patterns ## Graphite templates patterns
## 1. Template for cpu ## 1. Template for cpu
## 2. Template for disk* ## 2. Template for disk*
@ -35,6 +35,8 @@ method is used, otherwise the [Template Pattern](templates) is used.
## Support Graphite tags, recommended to enable when using Graphite 1.1 or later. ## Support Graphite tags, recommended to enable when using Graphite 1.1 or later.
# graphite_tag_support = false # graphite_tag_support = false
## Character for separating metric name and field for Graphite tags
# graphite_separator = "."
``` ```
#### graphite_tag_support #### graphite_tag_support
@ -54,5 +56,12 @@ cpu,cpu=cpu-total,dc=us-east-1,host=tars usage_idle=98.09,usage_user=0.89 145532
cpu.usage_user;cpu=cpu-total;dc=us-east-1;host=tars 0.89 1455320690 cpu.usage_user;cpu=cpu-total;dc=us-east-1;host=tars 0.89 1455320690
cpu.usage_idle;cpu=cpu-total;dc=us-east-1;host=tars 98.09 1455320690 cpu.usage_idle;cpu=cpu-total;dc=us-east-1;host=tars 98.09 1455320690
``` ```
With set option `graphite_separator` to "_"
```
cpu,cpu=cpu-total,dc=us-east-1,host=tars usage_idle=98.09,usage_user=0.89 1455320660004257758
=>
cpu_usage_user;cpu=cpu-total;dc=us-east-1;host=tars 0.89 1455320690
cpu_usage_idle;cpu=cpu-total;dc=us-east-1;host=tars 98.09 1455320690
```
[templates]: /docs/TEMPLATE_PATTERN.md [templates]: /docs/TEMPLATE_PATTERN.md

View File

@ -39,6 +39,7 @@ type GraphiteSerializer struct {
Prefix string Prefix string
Template string Template string
TagSupport bool TagSupport bool
Separator string
Templates []*GraphiteTemplate Templates []*GraphiteTemplate
} }
@ -55,7 +56,7 @@ func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]byte, error) {
if fieldValue == "" { if fieldValue == "" {
continue continue
} }
bucket := SerializeBucketNameWithTags(metric.Name(), metric.Tags(), s.Prefix, fieldName) bucket := SerializeBucketNameWithTags(metric.Name(), metric.Tags(), s.Prefix, s.Separator, fieldName)
metricString := fmt.Sprintf("%s %s %d\n", metricString := fmt.Sprintf("%s %s %d\n",
// insert "field" section of template // insert "field" section of template
bucket, bucket,
@ -246,6 +247,7 @@ func SerializeBucketNameWithTags(
measurement string, measurement string,
tags map[string]string, tags map[string]string,
prefix string, prefix string,
separator string,
field string, field string,
) string { ) string {
var out string var out string
@ -259,13 +261,13 @@ func SerializeBucketNameWithTags(
sort.Strings(tagsCopy) sort.Strings(tagsCopy)
if prefix != "" { if prefix != "" {
out = prefix + "." out = prefix + separator
} }
out += measurement out += measurement
if field != "value" { if field != "value" {
out += "." + field out += separator + field
} }
out = sanitize(out) out = sanitize(out)

View File

@ -102,6 +102,7 @@ func TestSerializeMetricNoHostWithTagSupport(t *testing.T) {
s := GraphiteSerializer{ s := GraphiteSerializer{
TagSupport: true, TagSupport: true,
Separator: ".",
} }
buf, _ := s.Serialize(m) buf, _ := s.Serialize(m)
mS := strings.Split(strings.TrimSpace(string(buf)), "\n") mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
@ -251,6 +252,7 @@ func TestSerializeMetricHostWithTagSupport(t *testing.T) {
s := GraphiteSerializer{ s := GraphiteSerializer{
TagSupport: true, TagSupport: true,
Separator: ".",
} }
buf, _ := s.Serialize(m) buf, _ := s.Serialize(m)
mS := strings.Split(strings.TrimSpace(string(buf)), "\n") mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
@ -305,6 +307,7 @@ func TestSerializeValueFieldWithTagSupport(t *testing.T) {
s := GraphiteSerializer{ s := GraphiteSerializer{
TagSupport: true, TagSupport: true,
Separator: ".",
} }
buf, _ := s.Serialize(m) buf, _ := s.Serialize(m)
mS := strings.Split(strings.TrimSpace(string(buf)), "\n") mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
@ -380,6 +383,7 @@ func TestSerializeValueStringWithTagSupport(t *testing.T) {
s := GraphiteSerializer{ s := GraphiteSerializer{
TagSupport: true, TagSupport: true,
Separator: ".",
} }
buf, _ := s.Serialize(m) buf, _ := s.Serialize(m)
mS := strings.Split(strings.TrimSpace(string(buf)), "\n") mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
@ -433,6 +437,7 @@ func TestSerializeValueBooleanWithTagSupport(t *testing.T) {
s := GraphiteSerializer{ s := GraphiteSerializer{
TagSupport: true, TagSupport: true,
Separator: ".",
} }
buf, _ := s.Serialize(m) buf, _ := s.Serialize(m)
mS := strings.Split(strings.TrimSpace(string(buf)), "\n") mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
@ -505,6 +510,7 @@ func TestSerializeFieldWithSpacesWithTagSupport(t *testing.T) {
s := GraphiteSerializer{ s := GraphiteSerializer{
TagSupport: true, TagSupport: true,
Separator: ".",
} }
buf, _ := s.Serialize(m) buf, _ := s.Serialize(m)
mS := strings.Split(strings.TrimSpace(string(buf)), "\n") mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
@ -558,6 +564,7 @@ func TestSerializeTagWithSpacesWithTagSupport(t *testing.T) {
s := GraphiteSerializer{ s := GraphiteSerializer{
TagSupport: true, TagSupport: true,
Separator: ".",
} }
buf, _ := s.Serialize(m) buf, _ := s.Serialize(m)
mS := strings.Split(strings.TrimSpace(string(buf)), "\n") mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
@ -668,6 +675,7 @@ func TestSerializeMetricPrefixWithTagSupport(t *testing.T) {
s := GraphiteSerializer{ s := GraphiteSerializer{
Prefix: "prefix", Prefix: "prefix",
TagSupport: true, TagSupport: true,
Separator: ".",
} }
buf, _ := s.Serialize(m) buf, _ := s.Serialize(m)
mS := strings.Split(strings.TrimSpace(string(buf)), "\n") mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
@ -973,6 +981,7 @@ func TestCleanWithTagsSupport(t *testing.T) {
s := GraphiteSerializer{ s := GraphiteSerializer{
TagSupport: true, TagSupport: true,
Separator: ".",
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
@ -1033,6 +1042,7 @@ func TestSerializeBatchWithTagsSupport(t *testing.T) {
s := GraphiteSerializer{ s := GraphiteSerializer{
TagSupport: true, TagSupport: true,
Separator: ".",
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {

View File

@ -51,6 +51,9 @@ type Config struct {
// Support tags in graphite protocol // Support tags in graphite protocol
GraphiteTagSupport bool `toml:"graphite_tag_support"` GraphiteTagSupport bool `toml:"graphite_tag_support"`
// Character for separating metric name and field for Graphite tags
GraphiteSeparator string `toml:"graphite_separator"`
// Maximum line length in bytes; influx format only // Maximum line length in bytes; influx format only
InfluxMaxLineBytes int `toml:"influx_max_line_bytes"` InfluxMaxLineBytes int `toml:"influx_max_line_bytes"`
@ -107,7 +110,7 @@ func NewSerializer(config *Config) (Serializer, error) {
case "influx": case "influx":
serializer, err = NewInfluxSerializerConfig(config) serializer, err = NewInfluxSerializerConfig(config)
case "graphite": case "graphite":
serializer, err = NewGraphiteSerializer(config.Prefix, config.Template, config.GraphiteTagSupport, config.Templates) serializer, err = NewGraphiteSerializer(config.Prefix, config.Template, config.GraphiteTagSupport, config.GraphiteSeparator, config.Templates)
case "json": case "json":
serializer, err = NewJsonSerializer(config.TimestampUnits) serializer, err = NewJsonSerializer(config.TimestampUnits)
case "splunkmetric": case "splunkmetric":
@ -191,7 +194,7 @@ func NewInfluxSerializer() (Serializer, error) {
return influx.NewSerializer(), nil return influx.NewSerializer(), nil
} }
func NewGraphiteSerializer(prefix, template string, tag_support bool, templates []string) (Serializer, error) { func NewGraphiteSerializer(prefix, template string, tag_support bool, separator string, templates []string) (Serializer, error) {
graphiteTemplates, defaultTemplate, err := graphite.InitGraphiteTemplates(templates) graphiteTemplates, defaultTemplate, err := graphite.InitGraphiteTemplates(templates)
if err != nil { if err != nil {
@ -202,10 +205,15 @@ func NewGraphiteSerializer(prefix, template string, tag_support bool, templates
template = defaultTemplate template = defaultTemplate
} }
if separator == "" {
separator = "."
}
return &graphite.GraphiteSerializer{ return &graphite.GraphiteSerializer{
Prefix: prefix, Prefix: prefix,
Template: template, Template: template,
TagSupport: tag_support, TagSupport: tag_support,
Separator: separator,
Templates: graphiteTemplates, Templates: graphiteTemplates,
}, nil }, nil
} }