Add support for Graphite 1.1.x tags (#4165)

This commit is contained in:
Pavel Boev 2018-05-22 01:59:56 +03:00 committed by Daniel Nelson
parent 5ec3229149
commit a0c086300f
8 changed files with 581 additions and 24 deletions

View File

@ -89,6 +89,15 @@ tars.cpu-total.us-east-1.cpu.usage_idle 98.09 1455320690
Fields with string values will be skipped. Boolean fields will be converted
to 1 (true) or 0 (false).
With enable `graphite_tag_support` option following influx metric -> graphite conversion would happen:
```
cpu,cpu=cpu-total,dc=us-east-1,host=tars usage_idle=98.09,usage_user=0.89 1455320660004257758
=>
cpu.usage_user;cpu=cpu-total;dc=us-east-1;host=tars 0.89 1455320690
cpu.usage_idle;cpu=cpu-total;dc=us-east-1;host=tars 98.09 1455320690
```
### Graphite Configuration
```toml
@ -106,6 +115,43 @@ to 1 (true) or 0 (false).
prefix = "telegraf"
# graphite template
template = "host.tags.measurement.field"
# Enable Graphite tags support
# Defaults to "false"
graphite_tag_support = true
```
## Graphite 1.1
The Graphite11 data format translates Telegraf metrics into Graphite protocol which supports storing data using tags to identify each series. [Graphite Tag Support](http://graphite.readthedocs.io/en/latest/tags.html)
Which means the following influx metric -> graphite 1.1.x conversion would happen:
```
cpu,cpu=cpu-total,dc=us-east-1,host=tars usage_idle=98.09,usage_user=0.89 1455320660004257758
=>
cpu.usage_user;cpu=cpu-total;dc=us-east-1;host=tars 0.89 1455320690
cpu.usage_idle;cpu=cpu-total;dc=us-east-1;host=tars 98.09 1455320690
```
Fields with string values will be skipped. Boolean fields will be converted
to 1 (true) or 0 (false).
### Graphite Configuration
```toml
[[outputs.file]]
## Files to write to, "stdout" is a specially handled file.
files = ["stdout", "/tmp/metrics.out"]
## Data format to output.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "graphite11"
# prefix each graphite bucket
prefix = "telegraf"
```
## JSON

View File

@ -3,6 +3,11 @@
This plugin writes to [Graphite](http://graphite.readthedocs.org/en/latest/index.html)
via raw TCP.
<aside class="notice">
When `graphite_tag_support` is enabled, `name` as tag name is reserved for graphite metric and will be replaced with `_name`.
</aside>
## Configuration:
```toml
@ -17,6 +22,10 @@ via raw TCP.
## Graphite output template
## see https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
template = "host.tags.measurement.field"
## Enable Graphite tags support
## see http://graphite.readthedocs.io/en/latest/tags.html
## Defaults to "false"
# graphite_tag_support = true
## timeout in seconds for the write connection to graphite
timeout = 2

View File

@ -16,6 +16,7 @@ import (
)
type Graphite struct {
GraphiteTagSupport bool
// URL is only for backwards compatibility
Servers []string
Prefix string
@ -35,6 +36,10 @@ var sampleConfig = `
## Graphite output template
## see https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
template = "host.tags.measurement.field"
## Enable Graphite tags support
## see http://graphite.readthedocs.io/en/latest/tags.html
## Defaults to "false"
# graphite_tag_support = true
## timeout in seconds for the write connection to graphite
timeout = 2
@ -129,7 +134,7 @@ func checkEOF(conn net.Conn) {
func (g *Graphite) Write(metrics []telegraf.Metric) error {
// Prepare data
var batch []byte
s, err := serializers.NewGraphiteSerializer(g.Prefix, g.Template)
s, err := serializers.NewGraphiteSerializer(g.Prefix, g.Template, g.GraphiteTagSupport)
if err != nil {
return err
}

View File

@ -98,6 +98,66 @@ func TestGraphiteOK(t *testing.T) {
g.Close()
}
func TestGraphiteOkWithTags(t *testing.T) {
var wg sync.WaitGroup
// Start TCP server
wg.Add(1)
t.Log("Starting server")
TCPServer1WithTags(t, &wg)
// Init plugin
g := Graphite{
Prefix: "my.prefix",
GraphiteTagSupport: true,
}
// Init metrics
m1, _ := metric.New(
"mymeasurement",
map[string]string{"host": "192.168.0.1"},
map[string]interface{}{"myfield": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)
m2, _ := metric.New(
"mymeasurement",
map[string]string{"host": "192.168.0.1"},
map[string]interface{}{"value": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)
m3, _ := metric.New(
"my_measurement",
map[string]string{"host": "192.168.0.1"},
map[string]interface{}{"value": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
)
// Prepare point list
metrics := []telegraf.Metric{m1}
metrics2 := []telegraf.Metric{m2, m3}
err1 := g.Connect()
require.NoError(t, err1)
// Send Data
t.Log("Send first data")
err2 := g.Write(metrics)
require.NoError(t, err2)
// Waiting TCPserver, should reconnect and resend
wg.Wait()
t.Log("Finished Waiting for first data")
var wg2 sync.WaitGroup
// Start TCP server
wg2.Add(1)
TCPServer2WithTags(t, &wg2)
//Write but expect an error, but reconnect
err3 := g.Write(metrics2)
t.Log("Finished writing second data, it should have reconnected automatically")
require.NoError(t, err3)
t.Log("Finished writing third data")
wg2.Wait()
g.Close()
}
func TCPServer1(t *testing.T, wg *sync.WaitGroup) {
tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003")
go func() {
@ -127,3 +187,33 @@ func TCPServer2(t *testing.T, wg *sync.WaitGroup) {
tcpServer.Close()
}()
}
func TCPServer1WithTags(t *testing.T, wg *sync.WaitGroup) {
tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003")
go func() {
defer wg.Done()
conn, _ := (tcpServer).Accept()
reader := bufio.NewReader(conn)
tp := textproto.NewReader(reader)
data1, _ := tp.ReadLine()
assert.Equal(t, "my.prefix.mymeasurement.myfield;host=192.168.0.1 3.14 1289430000", data1)
conn.Close()
tcpServer.Close()
}()
}
func TCPServer2WithTags(t *testing.T, wg *sync.WaitGroup) {
tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003")
go func() {
defer wg.Done()
conn2, _ := (tcpServer).Accept()
reader := bufio.NewReader(conn2)
tp := textproto.NewReader(reader)
data2, _ := tp.ReadLine()
assert.Equal(t, "my.prefix.mymeasurement;host=192.168.0.1 3.14 1289430000", data2)
data3, _ := tp.ReadLine()
assert.Equal(t, "my.prefix.my_measurement;host=192.168.0.1 3.14 1289430000", data3)
conn2.Close()
tcpServer.Close()
}()
}

View File

@ -85,7 +85,7 @@ func (i *Instrumental) Write(metrics []telegraf.Metric) error {
}
}
s, err := serializers.NewGraphiteSerializer(i.Prefix, i.Template)
s, err := serializers.NewGraphiteSerializer(i.Prefix, i.Template, false)
if err != nil {
return err
}

View File

@ -32,6 +32,7 @@ var (
type GraphiteSerializer struct {
Prefix string
Template string
TagSupport bool
}
func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]byte, error) {
@ -40,6 +41,24 @@ func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]byte, error) {
// Convert UnixNano to Unix timestamps
timestamp := metric.Time().UnixNano() / 1000000000
switch s.TagSupport {
case true:
for fieldName, value := range metric.Fields() {
fieldValue := formatValue(value)
if fieldValue == "" {
continue
}
bucket := SerializeBucketNameWithTags(metric.Name(), metric.Tags(), s.Prefix, fieldName)
metricString := fmt.Sprintf("%s %s %d\n",
// insert "field" section of template
bucket,
//bucket,
fieldValue,
timestamp)
point := []byte(metricString)
out = append(out, point...)
}
default:
bucket := SerializeBucketName(metric.Name(), metric.Tags(), s.Template, s.Prefix)
if bucket == "" {
return out, nil
@ -58,6 +77,7 @@ func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]byte, error) {
point := []byte(metricString)
out = append(out, point...)
}
}
return out, nil
}
@ -165,6 +185,44 @@ func SerializeBucketName(
return prefix + "." + strings.Join(out, ".")
}
// SerializeBucketNameWithTags will take the given measurement name and tags and
// produce a graphite bucket. It will use the Graphite11Serializer.
// http://graphite.readthedocs.io/en/latest/tags.html
func SerializeBucketNameWithTags(
measurement string,
tags map[string]string,
prefix string,
field string,
) string {
var out string
var tagsCopy []string
for k, v := range tags {
if k == "name" {
k = "_name"
}
tagsCopy = append(tagsCopy, sanitize(k+"="+v))
}
sort.Strings(tagsCopy)
if prefix != "" {
out = prefix + "."
}
out += measurement
if field != "value" {
out += "." + field
}
out = sanitize(out)
if len(tagsCopy) > 0 {
out += ";" + strings.Join(tagsCopy, ";")
}
return out
}
// InsertField takes the bucket string from SerializeBucketName and replaces the
// FIELDNAME portion. If fieldName == "value", it will simply delete the
// FIELDNAME portion.

View File

@ -87,6 +87,35 @@ func TestSerializeMetricNoHost(t *testing.T) {
assert.Equal(t, expS, mS)
}
func TestSerializeMetricNoHostWithTagSupport(t *testing.T) {
now := time.Now()
tags := map[string]string{
"cpu": "cpu0",
"datacenter": "us-west-2",
}
fields := map[string]interface{}{
"usage_idle": float64(91.5),
"usage_busy": float64(8.5),
}
m, err := metric.New("cpu", tags, fields, now)
assert.NoError(t, err)
s := GraphiteSerializer{
TagSupport: true,
}
buf, _ := s.Serialize(m)
mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
assert.NoError(t, err)
expS := []string{
fmt.Sprintf("cpu.usage_idle;cpu=cpu0;datacenter=us-west-2 91.5 %d", now.Unix()),
fmt.Sprintf("cpu.usage_busy;cpu=cpu0;datacenter=us-west-2 8.5 %d", now.Unix()),
}
sort.Strings(mS)
sort.Strings(expS)
assert.Equal(t, expS, mS)
}
func TestSerializeMetricHost(t *testing.T) {
now := time.Now()
tags := map[string]string{
@ -115,6 +144,36 @@ func TestSerializeMetricHost(t *testing.T) {
assert.Equal(t, expS, mS)
}
func TestSerializeMetricHostWithTagSupport(t *testing.T) {
now := time.Now()
tags := map[string]string{
"host": "localhost",
"cpu": "cpu0",
"datacenter": "us-west-2",
}
fields := map[string]interface{}{
"usage_idle": float64(91.5),
"usage_busy": float64(8.5),
}
m, err := metric.New("cpu", tags, fields, now)
assert.NoError(t, err)
s := GraphiteSerializer{
TagSupport: true,
}
buf, _ := s.Serialize(m)
mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
assert.NoError(t, err)
expS := []string{
fmt.Sprintf("cpu.usage_idle;cpu=cpu0;datacenter=us-west-2;host=localhost 91.5 %d", now.Unix()),
fmt.Sprintf("cpu.usage_busy;cpu=cpu0;datacenter=us-west-2;host=localhost 8.5 %d", now.Unix()),
}
sort.Strings(mS)
sort.Strings(expS)
assert.Equal(t, expS, mS)
}
// test that a field named "value" gets ignored.
func TestSerializeValueField(t *testing.T) {
now := time.Now()
@ -140,6 +199,32 @@ func TestSerializeValueField(t *testing.T) {
assert.Equal(t, expS, mS)
}
func TestSerializeValueFieldWithTagSupport(t *testing.T) {
now := time.Now()
tags := map[string]string{
"host": "localhost",
"cpu": "cpu0",
"datacenter": "us-west-2",
}
fields := map[string]interface{}{
"value": float64(91.5),
}
m, err := metric.New("cpu", tags, fields, now)
assert.NoError(t, err)
s := GraphiteSerializer{
TagSupport: true,
}
buf, _ := s.Serialize(m)
mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
assert.NoError(t, err)
expS := []string{
fmt.Sprintf("cpu;cpu=cpu0;datacenter=us-west-2;host=localhost 91.5 %d", now.Unix()),
}
assert.Equal(t, expS, mS)
}
// test that a field named "value" gets ignored in middle of template.
func TestSerializeValueField2(t *testing.T) {
now := time.Now()
@ -189,6 +274,28 @@ func TestSerializeValueString(t *testing.T) {
assert.Equal(t, "", mS[0])
}
func TestSerializeValueStringWithTagSupport(t *testing.T) {
now := time.Now()
tags := map[string]string{
"host": "localhost",
"cpu": "cpu0",
"datacenter": "us-west-2",
}
fields := map[string]interface{}{
"value": "asdasd",
}
m, err := metric.New("cpu", tags, fields, now)
assert.NoError(t, err)
s := GraphiteSerializer{
TagSupport: true,
}
buf, _ := s.Serialize(m)
mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
assert.NoError(t, err)
assert.Equal(t, "", mS[0])
}
func TestSerializeValueBoolean(t *testing.T) {
now := time.Now()
tags := map[string]string{
@ -219,6 +326,36 @@ func TestSerializeValueBoolean(t *testing.T) {
assert.Equal(t, expS, mS)
}
func TestSerializeValueBooleanWithTagSupport(t *testing.T) {
now := time.Now()
tags := map[string]string{
"host": "localhost",
"cpu": "cpu0",
"datacenter": "us-west-2",
}
fields := map[string]interface{}{
"enabled": true,
"disabled": false,
}
m, err := metric.New("cpu", tags, fields, now)
assert.NoError(t, err)
s := GraphiteSerializer{
TagSupport: true,
}
buf, _ := s.Serialize(m)
mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
assert.NoError(t, err)
expS := []string{
fmt.Sprintf("cpu.enabled;cpu=cpu0;datacenter=us-west-2;host=localhost 1 %d", now.Unix()),
fmt.Sprintf("cpu.disabled;cpu=cpu0;datacenter=us-west-2;host=localhost 0 %d", now.Unix()),
}
sort.Strings(mS)
sort.Strings(expS)
assert.Equal(t, expS, mS)
}
func TestSerializeValueUnsigned(t *testing.T) {
now := time.Unix(0, 0)
tags := map[string]string{}
@ -262,6 +399,32 @@ func TestSerializeFieldWithSpaces(t *testing.T) {
assert.Equal(t, expS, mS)
}
func TestSerializeFieldWithSpacesWithTagSupport(t *testing.T) {
now := time.Now()
tags := map[string]string{
"host": "localhost",
"cpu": "cpu0",
"datacenter": "us-west-2",
}
fields := map[string]interface{}{
`field\ with\ spaces`: float64(91.5),
}
m, err := metric.New("cpu", tags, fields, now)
assert.NoError(t, err)
s := GraphiteSerializer{
TagSupport: true,
}
buf, _ := s.Serialize(m)
mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
assert.NoError(t, err)
expS := []string{
fmt.Sprintf("cpu.field_with_spaces;cpu=cpu0;datacenter=us-west-2;host=localhost 91.5 %d", now.Unix()),
}
assert.Equal(t, expS, mS)
}
// test that tags with spaces get fixed.
func TestSerializeTagWithSpaces(t *testing.T) {
now := time.Now()
@ -289,6 +452,32 @@ func TestSerializeTagWithSpaces(t *testing.T) {
assert.Equal(t, expS, mS)
}
func TestSerializeTagWithSpacesWithTagSupport(t *testing.T) {
now := time.Now()
tags := map[string]string{
"host": "localhost",
"cpu": `cpu\ 0`,
"datacenter": "us-west-2",
}
fields := map[string]interface{}{
`field_with_spaces`: float64(91.5),
}
m, err := metric.New("cpu", tags, fields, now)
assert.NoError(t, err)
s := GraphiteSerializer{
TagSupport: true,
}
buf, _ := s.Serialize(m)
mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
assert.NoError(t, err)
expS := []string{
fmt.Sprintf("cpu.field_with_spaces;cpu=cpu_0;datacenter=us-west-2;host=localhost 91.5 %d", now.Unix()),
}
assert.Equal(t, expS, mS)
}
// test that a field named "value" gets ignored at beginning of template.
func TestSerializeValueField3(t *testing.T) {
now := time.Now()
@ -371,6 +560,37 @@ func TestSerializeMetricPrefix(t *testing.T) {
assert.Equal(t, expS, mS)
}
func TestSerializeMetricPrefixWithTagSupport(t *testing.T) {
now := time.Now()
tags := map[string]string{
"host": "localhost",
"cpu": "cpu0",
"datacenter": "us-west-2",
}
fields := map[string]interface{}{
"usage_idle": float64(91.5),
"usage_busy": float64(8.5),
}
m, err := metric.New("cpu", tags, fields, now)
assert.NoError(t, err)
s := GraphiteSerializer{
Prefix: "prefix",
TagSupport: true,
}
buf, _ := s.Serialize(m)
mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
assert.NoError(t, err)
expS := []string{
fmt.Sprintf("prefix.cpu.usage_idle;cpu=cpu0;datacenter=us-west-2;host=localhost 91.5 %d", now.Unix()),
fmt.Sprintf("prefix.cpu.usage_busy;cpu=cpu0;datacenter=us-west-2;host=localhost 8.5 %d", now.Unix()),
}
sort.Strings(mS)
sort.Strings(expS)
assert.Equal(t, expS, mS)
}
func TestSerializeBucketNameNoHost(t *testing.T) {
now := time.Now()
tags := map[string]string{
@ -579,6 +799,100 @@ func TestClean(t *testing.T) {
}
}
func TestCleanWithTagsSupport(t *testing.T) {
now := time.Unix(1234567890, 0)
tests := []struct {
name string
metric_name string
tags map[string]string
fields map[string]interface{}
expected string
}{
{
"Base metric",
"cpu",
map[string]string{"host": "localhost"},
map[string]interface{}{"usage_busy": float64(8.5)},
"cpu.usage_busy;host=localhost 8.5 1234567890\n",
},
{
"Dot and whitespace in tags",
"cpu",
map[string]string{"host": "localhost", "label.dot and space": "value with.dot"},
map[string]interface{}{"usage_busy": float64(8.5)},
"cpu.usage_busy;host=localhost;label.dot_and_space=value_with.dot 8.5 1234567890\n",
},
{
"Field with space",
"system",
map[string]string{"host": "localhost"},
map[string]interface{}{"uptime_format": "20 days, 23:26"},
"", // yes nothing. graphite don't serialize string fields
},
{
"Allowed punct",
"cpu",
map[string]string{"host": "localhost", "tag": "-_:="},
map[string]interface{}{"usage_busy": float64(10)},
"cpu.usage_busy;host=localhost;tag=-_:= 10 1234567890\n",
},
{
"Special conversions to hyphen",
"cpu",
map[string]string{"host": "localhost", "tag": "/@*"},
map[string]interface{}{"usage_busy": float64(10)},
"cpu.usage_busy;host=localhost;tag=--- 10 1234567890\n",
},
{
"Special drop chars",
"cpu",
map[string]string{"host": "localhost", "tag": `\no slash`},
map[string]interface{}{"usage_busy": float64(10)},
"cpu.usage_busy;host=localhost;tag=no_slash 10 1234567890\n",
},
{
"Empty tag & value field",
"cpu",
map[string]string{"host": "localhost"},
map[string]interface{}{"value": float64(10)},
"cpu;host=localhost 10 1234567890\n",
},
{
"Unicode Letters allowed",
"cpu",
map[string]string{"host": "localhost", "tag": "μnicodε_letters"},
map[string]interface{}{"value": float64(10)},
"cpu;host=localhost;tag=μnicodε_letters 10 1234567890\n",
},
{
"Other Unicode not allowed",
"cpu",
map[string]string{"host": "localhost", "tag": "“☢”"},
map[string]interface{}{"value": float64(10)},
"cpu;host=localhost;tag=___ 10 1234567890\n",
},
{
"Newline in tags",
"cpu",
map[string]string{"host": "localhost", "label": "some\nthing\nwith\nnewline"},
map[string]interface{}{"usage_busy": float64(8.5)},
"cpu.usage_busy;host=localhost;label=some_thing_with_newline 8.5 1234567890\n",
},
}
s := GraphiteSerializer{
TagSupport: true,
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m, err := metric.New(tt.metric_name, tt.tags, tt.fields, now)
assert.NoError(t, err)
actual, _ := s.Serialize(m)
require.Equal(t, tt.expected, string(actual))
})
}
}
func TestSerializeBatch(t *testing.T) {
now := time.Unix(1234567890, 0)
tests := []struct {
@ -607,3 +921,34 @@ func TestSerializeBatch(t *testing.T) {
})
}
}
func TestSerializeBatchWithTagsSupport(t *testing.T) {
now := time.Unix(1234567890, 0)
tests := []struct {
name string
metric_name string
tags map[string]string
fields map[string]interface{}
expected string
}{
{
"Base metric",
"cpu",
map[string]string{"host": "localhost"},
map[string]interface{}{"usage_busy": float64(8.5)},
"cpu.usage_busy;host=localhost 8.5 1234567890\ncpu.usage_busy;host=localhost 8.5 1234567890\n",
},
}
s := GraphiteSerializer{
TagSupport: true,
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m, err := metric.New(tt.metric_name, tt.tags, tt.fields, now)
assert.NoError(t, err)
actual, _ := s.SerializeBatch([]telegraf.Metric{m, m})
require.Equal(t, tt.expected, string(actual))
})
}
}

View File

@ -38,6 +38,9 @@ type Config struct {
// Dataformat can be one of: influx, graphite, or json
DataFormat string
// Support tags in graphite protocol
GraphiteTagSupport bool
// Maximum line length in bytes; influx format only
InfluxMaxLineBytes int
@ -67,7 +70,7 @@ func NewSerializer(config *Config) (Serializer, error) {
case "influx":
serializer, err = NewInfluxSerializerConfig(config)
case "graphite":
serializer, err = NewGraphiteSerializer(config.Prefix, config.Template)
serializer, err = NewGraphiteSerializer(config.Prefix, config.Template, config.GraphiteTagSupport)
case "json":
serializer, err = NewJsonSerializer(config.TimestampUnits)
default:
@ -102,9 +105,10 @@ func NewInfluxSerializer() (Serializer, error) {
return influx.NewSerializer(), nil
}
func NewGraphiteSerializer(prefix, template string) (Serializer, error) {
func NewGraphiteSerializer(prefix, template string, tag_support bool) (Serializer, error) {
return &graphite.GraphiteSerializer{
Prefix: prefix,
Template: template,
TagSupport: tag_support,
}, nil
}