Add support for Graphite 1.1.x tags (#4165)
This commit is contained in:
parent
703be4f124
commit
7660315e45
|
@ -89,6 +89,15 @@ tars.cpu-total.us-east-1.cpu.usage_idle 98.09 1455320690
|
||||||
Fields with string values will be skipped. Boolean fields will be converted
|
Fields with string values will be skipped. Boolean fields will be converted
|
||||||
to 1 (true) or 0 (false).
|
to 1 (true) or 0 (false).
|
||||||
|
|
||||||
|
With enable `graphite_tag_support` option following influx metric -> graphite conversion would happen:
|
||||||
|
|
||||||
|
```
|
||||||
|
cpu,cpu=cpu-total,dc=us-east-1,host=tars usage_idle=98.09,usage_user=0.89 1455320660004257758
|
||||||
|
=>
|
||||||
|
cpu.usage_user;cpu=cpu-total;dc=us-east-1;host=tars 0.89 1455320690
|
||||||
|
cpu.usage_idle;cpu=cpu-total;dc=us-east-1;host=tars 98.09 1455320690
|
||||||
|
```
|
||||||
|
|
||||||
### Graphite Configuration
|
### Graphite Configuration
|
||||||
|
|
||||||
```toml
|
```toml
|
||||||
|
@ -106,6 +115,43 @@ to 1 (true) or 0 (false).
|
||||||
prefix = "telegraf"
|
prefix = "telegraf"
|
||||||
# graphite template
|
# graphite template
|
||||||
template = "host.tags.measurement.field"
|
template = "host.tags.measurement.field"
|
||||||
|
# Enable Graphite tags support
|
||||||
|
# Defaults to "false"
|
||||||
|
graphite_tag_support = true
|
||||||
|
```
|
||||||
|
|
||||||
|
|
||||||
|
## Graphite 1.1
|
||||||
|
|
||||||
|
The Graphite11 data format translates Telegraf metrics into Graphite protocol which supports storing data using tags to identify each series. [Graphite Tag Support](http://graphite.readthedocs.io/en/latest/tags.html)
|
||||||
|
|
||||||
|
Which means the following influx metric -> graphite 1.1.x conversion would happen:
|
||||||
|
|
||||||
|
```
|
||||||
|
cpu,cpu=cpu-total,dc=us-east-1,host=tars usage_idle=98.09,usage_user=0.89 1455320660004257758
|
||||||
|
=>
|
||||||
|
cpu.usage_user;cpu=cpu-total;dc=us-east-1;host=tars 0.89 1455320690
|
||||||
|
cpu.usage_idle;cpu=cpu-total;dc=us-east-1;host=tars 98.09 1455320690
|
||||||
|
```
|
||||||
|
|
||||||
|
Fields with string values will be skipped. Boolean fields will be converted
|
||||||
|
to 1 (true) or 0 (false).
|
||||||
|
|
||||||
|
### Graphite Configuration
|
||||||
|
|
||||||
|
```toml
|
||||||
|
[[outputs.file]]
|
||||||
|
## Files to write to, "stdout" is a specially handled file.
|
||||||
|
files = ["stdout", "/tmp/metrics.out"]
|
||||||
|
|
||||||
|
## Data format to output.
|
||||||
|
## Each data format has its own unique set of configuration options, read
|
||||||
|
## more about them here:
|
||||||
|
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
|
||||||
|
data_format = "graphite11"
|
||||||
|
|
||||||
|
# prefix each graphite bucket
|
||||||
|
prefix = "telegraf"
|
||||||
```
|
```
|
||||||
|
|
||||||
## JSON
|
## JSON
|
||||||
|
|
|
@ -3,6 +3,11 @@
|
||||||
This plugin writes to [Graphite](http://graphite.readthedocs.org/en/latest/index.html)
|
This plugin writes to [Graphite](http://graphite.readthedocs.org/en/latest/index.html)
|
||||||
via raw TCP.
|
via raw TCP.
|
||||||
|
|
||||||
|
<aside class="notice">
|
||||||
|
When `graphite_tag_support` is enabled, `name` as tag name is reserved for graphite metric and will be replaced with `_name`.
|
||||||
|
</aside>
|
||||||
|
|
||||||
|
|
||||||
## Configuration:
|
## Configuration:
|
||||||
|
|
||||||
```toml
|
```toml
|
||||||
|
@ -17,6 +22,10 @@ via raw TCP.
|
||||||
## Graphite output template
|
## Graphite output template
|
||||||
## see https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
|
## see https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
|
||||||
template = "host.tags.measurement.field"
|
template = "host.tags.measurement.field"
|
||||||
|
## Enable Graphite tags support
|
||||||
|
## see http://graphite.readthedocs.io/en/latest/tags.html
|
||||||
|
## Defaults to "false"
|
||||||
|
# graphite_tag_support = true
|
||||||
## timeout in seconds for the write connection to graphite
|
## timeout in seconds for the write connection to graphite
|
||||||
timeout = 2
|
timeout = 2
|
||||||
|
|
||||||
|
|
|
@ -16,6 +16,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type Graphite struct {
|
type Graphite struct {
|
||||||
|
GraphiteTagSupport bool
|
||||||
// URL is only for backwards compatibility
|
// URL is only for backwards compatibility
|
||||||
Servers []string
|
Servers []string
|
||||||
Prefix string
|
Prefix string
|
||||||
|
@ -35,6 +36,10 @@ var sampleConfig = `
|
||||||
## Graphite output template
|
## Graphite output template
|
||||||
## see https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
|
## see https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
|
||||||
template = "host.tags.measurement.field"
|
template = "host.tags.measurement.field"
|
||||||
|
## Enable Graphite tags support
|
||||||
|
## see http://graphite.readthedocs.io/en/latest/tags.html
|
||||||
|
## Defaults to "false"
|
||||||
|
# graphite_tag_support = true
|
||||||
## timeout in seconds for the write connection to graphite
|
## timeout in seconds for the write connection to graphite
|
||||||
timeout = 2
|
timeout = 2
|
||||||
|
|
||||||
|
@ -129,7 +134,7 @@ func checkEOF(conn net.Conn) {
|
||||||
func (g *Graphite) Write(metrics []telegraf.Metric) error {
|
func (g *Graphite) Write(metrics []telegraf.Metric) error {
|
||||||
// Prepare data
|
// Prepare data
|
||||||
var batch []byte
|
var batch []byte
|
||||||
s, err := serializers.NewGraphiteSerializer(g.Prefix, g.Template)
|
s, err := serializers.NewGraphiteSerializer(g.Prefix, g.Template, g.GraphiteTagSupport)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -98,6 +98,66 @@ func TestGraphiteOK(t *testing.T) {
|
||||||
g.Close()
|
g.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestGraphiteOkWithTags(t *testing.T) {
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
// Start TCP server
|
||||||
|
wg.Add(1)
|
||||||
|
t.Log("Starting server")
|
||||||
|
TCPServer1WithTags(t, &wg)
|
||||||
|
|
||||||
|
// Init plugin
|
||||||
|
g := Graphite{
|
||||||
|
Prefix: "my.prefix",
|
||||||
|
GraphiteTagSupport: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Init metrics
|
||||||
|
m1, _ := metric.New(
|
||||||
|
"mymeasurement",
|
||||||
|
map[string]string{"host": "192.168.0.1"},
|
||||||
|
map[string]interface{}{"myfield": float64(3.14)},
|
||||||
|
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
|
||||||
|
)
|
||||||
|
m2, _ := metric.New(
|
||||||
|
"mymeasurement",
|
||||||
|
map[string]string{"host": "192.168.0.1"},
|
||||||
|
map[string]interface{}{"value": float64(3.14)},
|
||||||
|
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
|
||||||
|
)
|
||||||
|
m3, _ := metric.New(
|
||||||
|
"my_measurement",
|
||||||
|
map[string]string{"host": "192.168.0.1"},
|
||||||
|
map[string]interface{}{"value": float64(3.14)},
|
||||||
|
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
|
||||||
|
)
|
||||||
|
|
||||||
|
// Prepare point list
|
||||||
|
metrics := []telegraf.Metric{m1}
|
||||||
|
metrics2 := []telegraf.Metric{m2, m3}
|
||||||
|
err1 := g.Connect()
|
||||||
|
require.NoError(t, err1)
|
||||||
|
// Send Data
|
||||||
|
t.Log("Send first data")
|
||||||
|
err2 := g.Write(metrics)
|
||||||
|
require.NoError(t, err2)
|
||||||
|
|
||||||
|
// Waiting TCPserver, should reconnect and resend
|
||||||
|
wg.Wait()
|
||||||
|
t.Log("Finished Waiting for first data")
|
||||||
|
var wg2 sync.WaitGroup
|
||||||
|
// Start TCP server
|
||||||
|
wg2.Add(1)
|
||||||
|
TCPServer2WithTags(t, &wg2)
|
||||||
|
//Write but expect an error, but reconnect
|
||||||
|
err3 := g.Write(metrics2)
|
||||||
|
t.Log("Finished writing second data, it should have reconnected automatically")
|
||||||
|
|
||||||
|
require.NoError(t, err3)
|
||||||
|
t.Log("Finished writing third data")
|
||||||
|
wg2.Wait()
|
||||||
|
g.Close()
|
||||||
|
}
|
||||||
|
|
||||||
func TCPServer1(t *testing.T, wg *sync.WaitGroup) {
|
func TCPServer1(t *testing.T, wg *sync.WaitGroup) {
|
||||||
tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003")
|
tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003")
|
||||||
go func() {
|
go func() {
|
||||||
|
@ -127,3 +187,33 @@ func TCPServer2(t *testing.T, wg *sync.WaitGroup) {
|
||||||
tcpServer.Close()
|
tcpServer.Close()
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TCPServer1WithTags(t *testing.T, wg *sync.WaitGroup) {
|
||||||
|
tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003")
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
conn, _ := (tcpServer).Accept()
|
||||||
|
reader := bufio.NewReader(conn)
|
||||||
|
tp := textproto.NewReader(reader)
|
||||||
|
data1, _ := tp.ReadLine()
|
||||||
|
assert.Equal(t, "my.prefix.mymeasurement.myfield;host=192.168.0.1 3.14 1289430000", data1)
|
||||||
|
conn.Close()
|
||||||
|
tcpServer.Close()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TCPServer2WithTags(t *testing.T, wg *sync.WaitGroup) {
|
||||||
|
tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003")
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
conn2, _ := (tcpServer).Accept()
|
||||||
|
reader := bufio.NewReader(conn2)
|
||||||
|
tp := textproto.NewReader(reader)
|
||||||
|
data2, _ := tp.ReadLine()
|
||||||
|
assert.Equal(t, "my.prefix.mymeasurement;host=192.168.0.1 3.14 1289430000", data2)
|
||||||
|
data3, _ := tp.ReadLine()
|
||||||
|
assert.Equal(t, "my.prefix.my_measurement;host=192.168.0.1 3.14 1289430000", data3)
|
||||||
|
conn2.Close()
|
||||||
|
tcpServer.Close()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
|
@ -85,7 +85,7 @@ func (i *Instrumental) Write(metrics []telegraf.Metric) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
s, err := serializers.NewGraphiteSerializer(i.Prefix, i.Template)
|
s, err := serializers.NewGraphiteSerializer(i.Prefix, i.Template, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,6 +32,7 @@ var (
|
||||||
type GraphiteSerializer struct {
|
type GraphiteSerializer struct {
|
||||||
Prefix string
|
Prefix string
|
||||||
Template string
|
Template string
|
||||||
|
TagSupport bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]byte, error) {
|
func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]byte, error) {
|
||||||
|
@ -40,6 +41,24 @@ func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]byte, error) {
|
||||||
// Convert UnixNano to Unix timestamps
|
// Convert UnixNano to Unix timestamps
|
||||||
timestamp := metric.Time().UnixNano() / 1000000000
|
timestamp := metric.Time().UnixNano() / 1000000000
|
||||||
|
|
||||||
|
switch s.TagSupport {
|
||||||
|
case true:
|
||||||
|
for fieldName, value := range metric.Fields() {
|
||||||
|
fieldValue := formatValue(value)
|
||||||
|
if fieldValue == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
bucket := SerializeBucketNameWithTags(metric.Name(), metric.Tags(), s.Prefix, fieldName)
|
||||||
|
metricString := fmt.Sprintf("%s %s %d\n",
|
||||||
|
// insert "field" section of template
|
||||||
|
bucket,
|
||||||
|
//bucket,
|
||||||
|
fieldValue,
|
||||||
|
timestamp)
|
||||||
|
point := []byte(metricString)
|
||||||
|
out = append(out, point...)
|
||||||
|
}
|
||||||
|
default:
|
||||||
bucket := SerializeBucketName(metric.Name(), metric.Tags(), s.Template, s.Prefix)
|
bucket := SerializeBucketName(metric.Name(), metric.Tags(), s.Template, s.Prefix)
|
||||||
if bucket == "" {
|
if bucket == "" {
|
||||||
return out, nil
|
return out, nil
|
||||||
|
@ -58,6 +77,7 @@ func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]byte, error) {
|
||||||
point := []byte(metricString)
|
point := []byte(metricString)
|
||||||
out = append(out, point...)
|
out = append(out, point...)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
return out, nil
|
return out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -165,6 +185,44 @@ func SerializeBucketName(
|
||||||
return prefix + "." + strings.Join(out, ".")
|
return prefix + "." + strings.Join(out, ".")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SerializeBucketNameWithTags will take the given measurement name and tags and
|
||||||
|
// produce a graphite bucket. It will use the Graphite11Serializer.
|
||||||
|
// http://graphite.readthedocs.io/en/latest/tags.html
|
||||||
|
func SerializeBucketNameWithTags(
|
||||||
|
measurement string,
|
||||||
|
tags map[string]string,
|
||||||
|
prefix string,
|
||||||
|
field string,
|
||||||
|
) string {
|
||||||
|
var out string
|
||||||
|
var tagsCopy []string
|
||||||
|
for k, v := range tags {
|
||||||
|
if k == "name" {
|
||||||
|
k = "_name"
|
||||||
|
}
|
||||||
|
tagsCopy = append(tagsCopy, sanitize(k+"="+v))
|
||||||
|
}
|
||||||
|
sort.Strings(tagsCopy)
|
||||||
|
|
||||||
|
if prefix != "" {
|
||||||
|
out = prefix + "."
|
||||||
|
}
|
||||||
|
|
||||||
|
out += measurement
|
||||||
|
|
||||||
|
if field != "value" {
|
||||||
|
out += "." + field
|
||||||
|
}
|
||||||
|
|
||||||
|
out = sanitize(out)
|
||||||
|
|
||||||
|
if len(tagsCopy) > 0 {
|
||||||
|
out += ";" + strings.Join(tagsCopy, ";")
|
||||||
|
}
|
||||||
|
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
// InsertField takes the bucket string from SerializeBucketName and replaces the
|
// InsertField takes the bucket string from SerializeBucketName and replaces the
|
||||||
// FIELDNAME portion. If fieldName == "value", it will simply delete the
|
// FIELDNAME portion. If fieldName == "value", it will simply delete the
|
||||||
// FIELDNAME portion.
|
// FIELDNAME portion.
|
||||||
|
|
|
@ -87,6 +87,35 @@ func TestSerializeMetricNoHost(t *testing.T) {
|
||||||
assert.Equal(t, expS, mS)
|
assert.Equal(t, expS, mS)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSerializeMetricNoHostWithTagSupport(t *testing.T) {
|
||||||
|
now := time.Now()
|
||||||
|
tags := map[string]string{
|
||||||
|
"cpu": "cpu0",
|
||||||
|
"datacenter": "us-west-2",
|
||||||
|
}
|
||||||
|
fields := map[string]interface{}{
|
||||||
|
"usage_idle": float64(91.5),
|
||||||
|
"usage_busy": float64(8.5),
|
||||||
|
}
|
||||||
|
m, err := metric.New("cpu", tags, fields, now)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
s := GraphiteSerializer{
|
||||||
|
TagSupport: true,
|
||||||
|
}
|
||||||
|
buf, _ := s.Serialize(m)
|
||||||
|
mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
expS := []string{
|
||||||
|
fmt.Sprintf("cpu.usage_idle;cpu=cpu0;datacenter=us-west-2 91.5 %d", now.Unix()),
|
||||||
|
fmt.Sprintf("cpu.usage_busy;cpu=cpu0;datacenter=us-west-2 8.5 %d", now.Unix()),
|
||||||
|
}
|
||||||
|
sort.Strings(mS)
|
||||||
|
sort.Strings(expS)
|
||||||
|
assert.Equal(t, expS, mS)
|
||||||
|
}
|
||||||
|
|
||||||
func TestSerializeMetricHost(t *testing.T) {
|
func TestSerializeMetricHost(t *testing.T) {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
tags := map[string]string{
|
tags := map[string]string{
|
||||||
|
@ -115,6 +144,36 @@ func TestSerializeMetricHost(t *testing.T) {
|
||||||
assert.Equal(t, expS, mS)
|
assert.Equal(t, expS, mS)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSerializeMetricHostWithTagSupport(t *testing.T) {
|
||||||
|
now := time.Now()
|
||||||
|
tags := map[string]string{
|
||||||
|
"host": "localhost",
|
||||||
|
"cpu": "cpu0",
|
||||||
|
"datacenter": "us-west-2",
|
||||||
|
}
|
||||||
|
fields := map[string]interface{}{
|
||||||
|
"usage_idle": float64(91.5),
|
||||||
|
"usage_busy": float64(8.5),
|
||||||
|
}
|
||||||
|
m, err := metric.New("cpu", tags, fields, now)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
s := GraphiteSerializer{
|
||||||
|
TagSupport: true,
|
||||||
|
}
|
||||||
|
buf, _ := s.Serialize(m)
|
||||||
|
mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
expS := []string{
|
||||||
|
fmt.Sprintf("cpu.usage_idle;cpu=cpu0;datacenter=us-west-2;host=localhost 91.5 %d", now.Unix()),
|
||||||
|
fmt.Sprintf("cpu.usage_busy;cpu=cpu0;datacenter=us-west-2;host=localhost 8.5 %d", now.Unix()),
|
||||||
|
}
|
||||||
|
sort.Strings(mS)
|
||||||
|
sort.Strings(expS)
|
||||||
|
assert.Equal(t, expS, mS)
|
||||||
|
}
|
||||||
|
|
||||||
// test that a field named "value" gets ignored.
|
// test that a field named "value" gets ignored.
|
||||||
func TestSerializeValueField(t *testing.T) {
|
func TestSerializeValueField(t *testing.T) {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
@ -140,6 +199,32 @@ func TestSerializeValueField(t *testing.T) {
|
||||||
assert.Equal(t, expS, mS)
|
assert.Equal(t, expS, mS)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSerializeValueFieldWithTagSupport(t *testing.T) {
|
||||||
|
now := time.Now()
|
||||||
|
tags := map[string]string{
|
||||||
|
"host": "localhost",
|
||||||
|
"cpu": "cpu0",
|
||||||
|
"datacenter": "us-west-2",
|
||||||
|
}
|
||||||
|
fields := map[string]interface{}{
|
||||||
|
"value": float64(91.5),
|
||||||
|
}
|
||||||
|
m, err := metric.New("cpu", tags, fields, now)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
s := GraphiteSerializer{
|
||||||
|
TagSupport: true,
|
||||||
|
}
|
||||||
|
buf, _ := s.Serialize(m)
|
||||||
|
mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
expS := []string{
|
||||||
|
fmt.Sprintf("cpu;cpu=cpu0;datacenter=us-west-2;host=localhost 91.5 %d", now.Unix()),
|
||||||
|
}
|
||||||
|
assert.Equal(t, expS, mS)
|
||||||
|
}
|
||||||
|
|
||||||
// test that a field named "value" gets ignored in middle of template.
|
// test that a field named "value" gets ignored in middle of template.
|
||||||
func TestSerializeValueField2(t *testing.T) {
|
func TestSerializeValueField2(t *testing.T) {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
@ -189,6 +274,28 @@ func TestSerializeValueString(t *testing.T) {
|
||||||
assert.Equal(t, "", mS[0])
|
assert.Equal(t, "", mS[0])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSerializeValueStringWithTagSupport(t *testing.T) {
|
||||||
|
now := time.Now()
|
||||||
|
tags := map[string]string{
|
||||||
|
"host": "localhost",
|
||||||
|
"cpu": "cpu0",
|
||||||
|
"datacenter": "us-west-2",
|
||||||
|
}
|
||||||
|
fields := map[string]interface{}{
|
||||||
|
"value": "asdasd",
|
||||||
|
}
|
||||||
|
m, err := metric.New("cpu", tags, fields, now)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
s := GraphiteSerializer{
|
||||||
|
TagSupport: true,
|
||||||
|
}
|
||||||
|
buf, _ := s.Serialize(m)
|
||||||
|
mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, "", mS[0])
|
||||||
|
}
|
||||||
|
|
||||||
func TestSerializeValueBoolean(t *testing.T) {
|
func TestSerializeValueBoolean(t *testing.T) {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
tags := map[string]string{
|
tags := map[string]string{
|
||||||
|
@ -219,6 +326,36 @@ func TestSerializeValueBoolean(t *testing.T) {
|
||||||
assert.Equal(t, expS, mS)
|
assert.Equal(t, expS, mS)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSerializeValueBooleanWithTagSupport(t *testing.T) {
|
||||||
|
now := time.Now()
|
||||||
|
tags := map[string]string{
|
||||||
|
"host": "localhost",
|
||||||
|
"cpu": "cpu0",
|
||||||
|
"datacenter": "us-west-2",
|
||||||
|
}
|
||||||
|
fields := map[string]interface{}{
|
||||||
|
"enabled": true,
|
||||||
|
"disabled": false,
|
||||||
|
}
|
||||||
|
m, err := metric.New("cpu", tags, fields, now)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
s := GraphiteSerializer{
|
||||||
|
TagSupport: true,
|
||||||
|
}
|
||||||
|
buf, _ := s.Serialize(m)
|
||||||
|
mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
expS := []string{
|
||||||
|
fmt.Sprintf("cpu.enabled;cpu=cpu0;datacenter=us-west-2;host=localhost 1 %d", now.Unix()),
|
||||||
|
fmt.Sprintf("cpu.disabled;cpu=cpu0;datacenter=us-west-2;host=localhost 0 %d", now.Unix()),
|
||||||
|
}
|
||||||
|
sort.Strings(mS)
|
||||||
|
sort.Strings(expS)
|
||||||
|
assert.Equal(t, expS, mS)
|
||||||
|
}
|
||||||
|
|
||||||
func TestSerializeValueUnsigned(t *testing.T) {
|
func TestSerializeValueUnsigned(t *testing.T) {
|
||||||
now := time.Unix(0, 0)
|
now := time.Unix(0, 0)
|
||||||
tags := map[string]string{}
|
tags := map[string]string{}
|
||||||
|
@ -262,6 +399,32 @@ func TestSerializeFieldWithSpaces(t *testing.T) {
|
||||||
assert.Equal(t, expS, mS)
|
assert.Equal(t, expS, mS)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSerializeFieldWithSpacesWithTagSupport(t *testing.T) {
|
||||||
|
now := time.Now()
|
||||||
|
tags := map[string]string{
|
||||||
|
"host": "localhost",
|
||||||
|
"cpu": "cpu0",
|
||||||
|
"datacenter": "us-west-2",
|
||||||
|
}
|
||||||
|
fields := map[string]interface{}{
|
||||||
|
`field\ with\ spaces`: float64(91.5),
|
||||||
|
}
|
||||||
|
m, err := metric.New("cpu", tags, fields, now)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
s := GraphiteSerializer{
|
||||||
|
TagSupport: true,
|
||||||
|
}
|
||||||
|
buf, _ := s.Serialize(m)
|
||||||
|
mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
expS := []string{
|
||||||
|
fmt.Sprintf("cpu.field_with_spaces;cpu=cpu0;datacenter=us-west-2;host=localhost 91.5 %d", now.Unix()),
|
||||||
|
}
|
||||||
|
assert.Equal(t, expS, mS)
|
||||||
|
}
|
||||||
|
|
||||||
// test that tags with spaces get fixed.
|
// test that tags with spaces get fixed.
|
||||||
func TestSerializeTagWithSpaces(t *testing.T) {
|
func TestSerializeTagWithSpaces(t *testing.T) {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
@ -289,6 +452,32 @@ func TestSerializeTagWithSpaces(t *testing.T) {
|
||||||
assert.Equal(t, expS, mS)
|
assert.Equal(t, expS, mS)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSerializeTagWithSpacesWithTagSupport(t *testing.T) {
|
||||||
|
now := time.Now()
|
||||||
|
tags := map[string]string{
|
||||||
|
"host": "localhost",
|
||||||
|
"cpu": `cpu\ 0`,
|
||||||
|
"datacenter": "us-west-2",
|
||||||
|
}
|
||||||
|
fields := map[string]interface{}{
|
||||||
|
`field_with_spaces`: float64(91.5),
|
||||||
|
}
|
||||||
|
m, err := metric.New("cpu", tags, fields, now)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
s := GraphiteSerializer{
|
||||||
|
TagSupport: true,
|
||||||
|
}
|
||||||
|
buf, _ := s.Serialize(m)
|
||||||
|
mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
expS := []string{
|
||||||
|
fmt.Sprintf("cpu.field_with_spaces;cpu=cpu_0;datacenter=us-west-2;host=localhost 91.5 %d", now.Unix()),
|
||||||
|
}
|
||||||
|
assert.Equal(t, expS, mS)
|
||||||
|
}
|
||||||
|
|
||||||
// test that a field named "value" gets ignored at beginning of template.
|
// test that a field named "value" gets ignored at beginning of template.
|
||||||
func TestSerializeValueField3(t *testing.T) {
|
func TestSerializeValueField3(t *testing.T) {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
@ -371,6 +560,37 @@ func TestSerializeMetricPrefix(t *testing.T) {
|
||||||
assert.Equal(t, expS, mS)
|
assert.Equal(t, expS, mS)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSerializeMetricPrefixWithTagSupport(t *testing.T) {
|
||||||
|
now := time.Now()
|
||||||
|
tags := map[string]string{
|
||||||
|
"host": "localhost",
|
||||||
|
"cpu": "cpu0",
|
||||||
|
"datacenter": "us-west-2",
|
||||||
|
}
|
||||||
|
fields := map[string]interface{}{
|
||||||
|
"usage_idle": float64(91.5),
|
||||||
|
"usage_busy": float64(8.5),
|
||||||
|
}
|
||||||
|
m, err := metric.New("cpu", tags, fields, now)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
s := GraphiteSerializer{
|
||||||
|
Prefix: "prefix",
|
||||||
|
TagSupport: true,
|
||||||
|
}
|
||||||
|
buf, _ := s.Serialize(m)
|
||||||
|
mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
expS := []string{
|
||||||
|
fmt.Sprintf("prefix.cpu.usage_idle;cpu=cpu0;datacenter=us-west-2;host=localhost 91.5 %d", now.Unix()),
|
||||||
|
fmt.Sprintf("prefix.cpu.usage_busy;cpu=cpu0;datacenter=us-west-2;host=localhost 8.5 %d", now.Unix()),
|
||||||
|
}
|
||||||
|
sort.Strings(mS)
|
||||||
|
sort.Strings(expS)
|
||||||
|
assert.Equal(t, expS, mS)
|
||||||
|
}
|
||||||
|
|
||||||
func TestSerializeBucketNameNoHost(t *testing.T) {
|
func TestSerializeBucketNameNoHost(t *testing.T) {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
tags := map[string]string{
|
tags := map[string]string{
|
||||||
|
@ -579,6 +799,100 @@ func TestClean(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCleanWithTagsSupport(t *testing.T) {
|
||||||
|
now := time.Unix(1234567890, 0)
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
metric_name string
|
||||||
|
tags map[string]string
|
||||||
|
fields map[string]interface{}
|
||||||
|
expected string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
"Base metric",
|
||||||
|
"cpu",
|
||||||
|
map[string]string{"host": "localhost"},
|
||||||
|
map[string]interface{}{"usage_busy": float64(8.5)},
|
||||||
|
"cpu.usage_busy;host=localhost 8.5 1234567890\n",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"Dot and whitespace in tags",
|
||||||
|
"cpu",
|
||||||
|
map[string]string{"host": "localhost", "label.dot and space": "value with.dot"},
|
||||||
|
map[string]interface{}{"usage_busy": float64(8.5)},
|
||||||
|
"cpu.usage_busy;host=localhost;label.dot_and_space=value_with.dot 8.5 1234567890\n",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"Field with space",
|
||||||
|
"system",
|
||||||
|
map[string]string{"host": "localhost"},
|
||||||
|
map[string]interface{}{"uptime_format": "20 days, 23:26"},
|
||||||
|
"", // yes nothing. graphite don't serialize string fields
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"Allowed punct",
|
||||||
|
"cpu",
|
||||||
|
map[string]string{"host": "localhost", "tag": "-_:="},
|
||||||
|
map[string]interface{}{"usage_busy": float64(10)},
|
||||||
|
"cpu.usage_busy;host=localhost;tag=-_:= 10 1234567890\n",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"Special conversions to hyphen",
|
||||||
|
"cpu",
|
||||||
|
map[string]string{"host": "localhost", "tag": "/@*"},
|
||||||
|
map[string]interface{}{"usage_busy": float64(10)},
|
||||||
|
"cpu.usage_busy;host=localhost;tag=--- 10 1234567890\n",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"Special drop chars",
|
||||||
|
"cpu",
|
||||||
|
map[string]string{"host": "localhost", "tag": `\no slash`},
|
||||||
|
map[string]interface{}{"usage_busy": float64(10)},
|
||||||
|
"cpu.usage_busy;host=localhost;tag=no_slash 10 1234567890\n",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"Empty tag & value field",
|
||||||
|
"cpu",
|
||||||
|
map[string]string{"host": "localhost"},
|
||||||
|
map[string]interface{}{"value": float64(10)},
|
||||||
|
"cpu;host=localhost 10 1234567890\n",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"Unicode Letters allowed",
|
||||||
|
"cpu",
|
||||||
|
map[string]string{"host": "localhost", "tag": "μnicodε_letters"},
|
||||||
|
map[string]interface{}{"value": float64(10)},
|
||||||
|
"cpu;host=localhost;tag=μnicodε_letters 10 1234567890\n",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"Other Unicode not allowed",
|
||||||
|
"cpu",
|
||||||
|
map[string]string{"host": "localhost", "tag": "“☢”"},
|
||||||
|
map[string]interface{}{"value": float64(10)},
|
||||||
|
"cpu;host=localhost;tag=___ 10 1234567890\n",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"Newline in tags",
|
||||||
|
"cpu",
|
||||||
|
map[string]string{"host": "localhost", "label": "some\nthing\nwith\nnewline"},
|
||||||
|
map[string]interface{}{"usage_busy": float64(8.5)},
|
||||||
|
"cpu.usage_busy;host=localhost;label=some_thing_with_newline 8.5 1234567890\n",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
s := GraphiteSerializer{
|
||||||
|
TagSupport: true,
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
m, err := metric.New(tt.metric_name, tt.tags, tt.fields, now)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
actual, _ := s.Serialize(m)
|
||||||
|
require.Equal(t, tt.expected, string(actual))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestSerializeBatch(t *testing.T) {
|
func TestSerializeBatch(t *testing.T) {
|
||||||
now := time.Unix(1234567890, 0)
|
now := time.Unix(1234567890, 0)
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
|
@ -607,3 +921,34 @@ func TestSerializeBatch(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSerializeBatchWithTagsSupport(t *testing.T) {
|
||||||
|
now := time.Unix(1234567890, 0)
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
metric_name string
|
||||||
|
tags map[string]string
|
||||||
|
fields map[string]interface{}
|
||||||
|
expected string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
"Base metric",
|
||||||
|
"cpu",
|
||||||
|
map[string]string{"host": "localhost"},
|
||||||
|
map[string]interface{}{"usage_busy": float64(8.5)},
|
||||||
|
"cpu.usage_busy;host=localhost 8.5 1234567890\ncpu.usage_busy;host=localhost 8.5 1234567890\n",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
s := GraphiteSerializer{
|
||||||
|
TagSupport: true,
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
m, err := metric.New(tt.metric_name, tt.tags, tt.fields, now)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
actual, _ := s.SerializeBatch([]telegraf.Metric{m, m})
|
||||||
|
require.Equal(t, tt.expected, string(actual))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -38,6 +38,9 @@ type Config struct {
|
||||||
// Dataformat can be one of: influx, graphite, or json
|
// Dataformat can be one of: influx, graphite, or json
|
||||||
DataFormat string
|
DataFormat string
|
||||||
|
|
||||||
|
// Support tags in graphite protocol
|
||||||
|
GraphiteTagSupport bool
|
||||||
|
|
||||||
// Maximum line length in bytes; influx format only
|
// Maximum line length in bytes; influx format only
|
||||||
InfluxMaxLineBytes int
|
InfluxMaxLineBytes int
|
||||||
|
|
||||||
|
@ -67,7 +70,7 @@ func NewSerializer(config *Config) (Serializer, error) {
|
||||||
case "influx":
|
case "influx":
|
||||||
serializer, err = NewInfluxSerializerConfig(config)
|
serializer, err = NewInfluxSerializerConfig(config)
|
||||||
case "graphite":
|
case "graphite":
|
||||||
serializer, err = NewGraphiteSerializer(config.Prefix, config.Template)
|
serializer, err = NewGraphiteSerializer(config.Prefix, config.Template, config.GraphiteTagSupport)
|
||||||
case "json":
|
case "json":
|
||||||
serializer, err = NewJsonSerializer(config.TimestampUnits)
|
serializer, err = NewJsonSerializer(config.TimestampUnits)
|
||||||
default:
|
default:
|
||||||
|
@ -102,9 +105,10 @@ func NewInfluxSerializer() (Serializer, error) {
|
||||||
return influx.NewSerializer(), nil
|
return influx.NewSerializer(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewGraphiteSerializer(prefix, template string) (Serializer, error) {
|
func NewGraphiteSerializer(prefix, template string, tag_support bool) (Serializer, error) {
|
||||||
return &graphite.GraphiteSerializer{
|
return &graphite.GraphiteSerializer{
|
||||||
Prefix: prefix,
|
Prefix: prefix,
|
||||||
Template: template,
|
Template: template,
|
||||||
|
TagSupport: tag_support,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue