Add carbon2 serializer (#5345)
This commit is contained in:
parent
7d64620440
commit
a15305385f
|
@ -303,6 +303,7 @@ For documentation on the latest development code see the [documentation index][d
|
||||||
- [Graphite](/plugins/serializers/graphite)
|
- [Graphite](/plugins/serializers/graphite)
|
||||||
- [ServiceNow](/plugins/serializers/nowmetric)
|
- [ServiceNow](/plugins/serializers/nowmetric)
|
||||||
- [SplunkMetric](/plugins/serializers/splunkmetric)
|
- [SplunkMetric](/plugins/serializers/splunkmetric)
|
||||||
|
- [Carbon2](/plugins/serializers/carbon2)
|
||||||
|
|
||||||
## Processor Plugins
|
## Processor Plugins
|
||||||
|
|
||||||
|
|
|
@ -8,6 +8,7 @@ plugins.
|
||||||
1. [JSON](/plugins/serializers/json)
|
1. [JSON](/plugins/serializers/json)
|
||||||
1. [Graphite](/plugins/serializers/graphite)
|
1. [Graphite](/plugins/serializers/graphite)
|
||||||
1. [SplunkMetric](/plugins/serializers/splunkmetric)
|
1. [SplunkMetric](/plugins/serializers/splunkmetric)
|
||||||
|
1. [Carbon2](/plugins/serializers/carbon2)
|
||||||
|
|
||||||
You will be able to identify the plugins with support by the presence of a
|
You will be able to identify the plugins with support by the presence of a
|
||||||
`data_format` config option, for example, in the `file` output plugin:
|
`data_format` config option, for example, in the `file` output plugin:
|
||||||
|
|
|
@ -0,0 +1,49 @@
|
||||||
|
# Carbon2
|
||||||
|
|
||||||
|
The `carbon2` serializer translates the Telegraf metric format to the [Carbon2 format](http://metrics20.org/implementations/).
|
||||||
|
|
||||||
|
### Configuration
|
||||||
|
|
||||||
|
```toml
|
||||||
|
[[outputs.file]]
|
||||||
|
## Files to write to, "stdout" is a specially handled file.
|
||||||
|
files = ["stdout", "/tmp/metrics.out"]
|
||||||
|
|
||||||
|
## Data format to output.
|
||||||
|
## Each data format has its own unique set of configuration options, read
|
||||||
|
## more about them here:
|
||||||
|
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
|
||||||
|
data_format = "carbon2"
|
||||||
|
```
|
||||||
|
|
||||||
|
Standard form:
|
||||||
|
```
|
||||||
|
metric=name field=field_1 host=foo 30 1234567890
|
||||||
|
metric=name field=field_2 host=foo 4 1234567890
|
||||||
|
metric=name field=field_N host=foo 59 1234567890
|
||||||
|
```
|
||||||
|
|
||||||
|
### Metrics
|
||||||
|
|
||||||
|
The serializer converts the metrics by creating `intrinsic_tags` using the combination of metric name and fields. So, if one Telegraf metric has 4 fields, the `carbon2` output will be 4 separate metrics. There will be a `metric` tag that represents the name of the metric and a `field` tag to represent the field.
|
||||||
|
|
||||||
|
### Example
|
||||||
|
|
||||||
|
If we take the following InfluxDB Line Protocol:
|
||||||
|
|
||||||
|
```
|
||||||
|
weather,location=us-midwest,season=summer temperature=82,wind=100 1234567890
|
||||||
|
```
|
||||||
|
|
||||||
|
after serializing in Carbon2, the result would be:
|
||||||
|
|
||||||
|
```
|
||||||
|
metric=weather field=temperature location=us-midwest season=summer 82 1234567890
|
||||||
|
metric=weather field=wind location=us-midwest season=summer 100 1234567890
|
||||||
|
```
|
||||||
|
|
||||||
|
### Fields and Tags with spaces
|
||||||
|
When a field key or tag key/value have spaces, spaces will be replaced with `_`.
|
||||||
|
|
||||||
|
### Tags with empty values
|
||||||
|
When a tag's value is empty, it will be replaced with `null`
|
|
@ -0,0 +1,67 @@
|
||||||
|
package carbon2
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"fmt"
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
type serializer struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewSerializer() (*serializer, error) {
|
||||||
|
s := &serializer{}
|
||||||
|
return s, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *serializer) Serialize(metric telegraf.Metric) ([]byte, error) {
|
||||||
|
return s.createObject(metric), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
|
||||||
|
var batch bytes.Buffer
|
||||||
|
for _, metric := range metrics {
|
||||||
|
batch.Write(s.createObject(metric))
|
||||||
|
}
|
||||||
|
return batch.Bytes(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *serializer) createObject(metric telegraf.Metric) []byte {
|
||||||
|
var m bytes.Buffer
|
||||||
|
for fieldName, fieldValue := range metric.Fields() {
|
||||||
|
if isNumeric(fieldValue) {
|
||||||
|
m.WriteString("metric=")
|
||||||
|
m.WriteString(strings.Replace(metric.Name(), " ", "_", -1))
|
||||||
|
m.WriteString(" field=")
|
||||||
|
m.WriteString(strings.Replace(fieldName, " ", "_", -1))
|
||||||
|
m.WriteString(" ")
|
||||||
|
for _, tag := range metric.TagList() {
|
||||||
|
m.WriteString(strings.Replace(tag.Key, " ", "_", -1))
|
||||||
|
m.WriteString("=")
|
||||||
|
value := tag.Value
|
||||||
|
if len(value) == 0 {
|
||||||
|
value = "null"
|
||||||
|
}
|
||||||
|
m.WriteString(strings.Replace(value, " ", "_", -1))
|
||||||
|
m.WriteString(" ")
|
||||||
|
}
|
||||||
|
m.WriteString(" ")
|
||||||
|
m.WriteString(fmt.Sprintf("%v", fieldValue))
|
||||||
|
m.WriteString(" ")
|
||||||
|
m.WriteString(strconv.FormatInt(metric.Time().Unix(), 10))
|
||||||
|
m.WriteString("\n")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return m.Bytes()
|
||||||
|
}
|
||||||
|
|
||||||
|
func isNumeric(v interface{}) bool {
|
||||||
|
switch v.(type) {
|
||||||
|
case string:
|
||||||
|
return false
|
||||||
|
default:
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,138 @@
|
||||||
|
package carbon2
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/metric"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func MustMetric(v telegraf.Metric, err error) telegraf.Metric {
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
return v
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSerializeMetricFloat(t *testing.T) {
|
||||||
|
now := time.Now()
|
||||||
|
tags := map[string]string{
|
||||||
|
"cpu": "cpu0",
|
||||||
|
}
|
||||||
|
fields := map[string]interface{}{
|
||||||
|
"usage_idle": float64(91.5),
|
||||||
|
}
|
||||||
|
m, err := metric.New("cpu", tags, fields, now)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
s, _ := NewSerializer()
|
||||||
|
var buf []byte
|
||||||
|
buf, err = s.Serialize(m)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
expS := []byte(fmt.Sprintf(`metric=cpu field=usage_idle cpu=cpu0 91.5 %d`, now.Unix()) + "\n")
|
||||||
|
assert.Equal(t, string(expS), string(buf))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSerializeMetricWithEmptyStringTag(t *testing.T) {
|
||||||
|
now := time.Now()
|
||||||
|
tags := map[string]string{
|
||||||
|
"cpu": "",
|
||||||
|
}
|
||||||
|
fields := map[string]interface{}{
|
||||||
|
"usage_idle": float64(91.5),
|
||||||
|
}
|
||||||
|
m, err := metric.New("cpu", tags, fields, now)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
s, _ := NewSerializer()
|
||||||
|
var buf []byte
|
||||||
|
buf, err = s.Serialize(m)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
expS := []byte(fmt.Sprintf(`metric=cpu field=usage_idle cpu=null 91.5 %d`, now.Unix()) + "\n")
|
||||||
|
assert.Equal(t, string(expS), string(buf))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSerializeWithSpaces(t *testing.T) {
|
||||||
|
now := time.Now()
|
||||||
|
tags := map[string]string{
|
||||||
|
"cpu 0": "cpu 0",
|
||||||
|
}
|
||||||
|
fields := map[string]interface{}{
|
||||||
|
"usage_idle 1": float64(91.5),
|
||||||
|
}
|
||||||
|
m, err := metric.New("cpu metric", tags, fields, now)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
s, _ := NewSerializer()
|
||||||
|
var buf []byte
|
||||||
|
buf, err = s.Serialize(m)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
expS := []byte(fmt.Sprintf(`metric=cpu_metric field=usage_idle_1 cpu_0=cpu_0 91.5 %d`, now.Unix()) + "\n")
|
||||||
|
assert.Equal(t, string(expS), string(buf))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSerializeMetricInt(t *testing.T) {
|
||||||
|
now := time.Now()
|
||||||
|
tags := map[string]string{
|
||||||
|
"cpu": "cpu0",
|
||||||
|
}
|
||||||
|
fields := map[string]interface{}{
|
||||||
|
"usage_idle": int64(90),
|
||||||
|
}
|
||||||
|
m, err := metric.New("cpu", tags, fields, now)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
s, _ := NewSerializer()
|
||||||
|
var buf []byte
|
||||||
|
buf, err = s.Serialize(m)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
expS := []byte(fmt.Sprintf(`metric=cpu field=usage_idle cpu=cpu0 90 %d`, now.Unix()) + "\n")
|
||||||
|
assert.Equal(t, string(expS), string(buf))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSerializeMetricString(t *testing.T) {
|
||||||
|
now := time.Now()
|
||||||
|
tags := map[string]string{
|
||||||
|
"cpu": "cpu0",
|
||||||
|
}
|
||||||
|
fields := map[string]interface{}{
|
||||||
|
"usage_idle": "foobar",
|
||||||
|
}
|
||||||
|
m, err := metric.New("cpu", tags, fields, now)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
s, _ := NewSerializer()
|
||||||
|
var buf []byte
|
||||||
|
buf, err = s.Serialize(m)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
expS := []byte("")
|
||||||
|
assert.Equal(t, string(expS), string(buf))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSerializeBatch(t *testing.T) {
|
||||||
|
m := MustMetric(
|
||||||
|
metric.New(
|
||||||
|
"cpu",
|
||||||
|
map[string]string{},
|
||||||
|
map[string]interface{}{
|
||||||
|
"value": 42,
|
||||||
|
},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
metrics := []telegraf.Metric{m, m}
|
||||||
|
s, _ := NewSerializer()
|
||||||
|
buf, err := s.SerializeBatch(metrics)
|
||||||
|
require.NoError(t, err)
|
||||||
|
expS := []byte(`metric=cpu field=value 42 0
|
||||||
|
metric=cpu field=value 42 0
|
||||||
|
`)
|
||||||
|
assert.Equal(t, string(expS), string(buf))
|
||||||
|
}
|
|
@ -6,6 +6,7 @@ import (
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf/plugins/serializers/carbon2"
|
||||||
"github.com/influxdata/telegraf/plugins/serializers/graphite"
|
"github.com/influxdata/telegraf/plugins/serializers/graphite"
|
||||||
"github.com/influxdata/telegraf/plugins/serializers/influx"
|
"github.com/influxdata/telegraf/plugins/serializers/influx"
|
||||||
"github.com/influxdata/telegraf/plugins/serializers/json"
|
"github.com/influxdata/telegraf/plugins/serializers/json"
|
||||||
|
@ -82,6 +83,8 @@ func NewSerializer(config *Config) (Serializer, error) {
|
||||||
serializer, err = NewSplunkmetricSerializer(config.HecRouting)
|
serializer, err = NewSplunkmetricSerializer(config.HecRouting)
|
||||||
case "nowmetric":
|
case "nowmetric":
|
||||||
serializer, err = NewNowSerializer()
|
serializer, err = NewNowSerializer()
|
||||||
|
case "carbon2":
|
||||||
|
serializer, err = NewCarbon2Serializer()
|
||||||
default:
|
default:
|
||||||
err = fmt.Errorf("Invalid data format: %s", config.DataFormat)
|
err = fmt.Errorf("Invalid data format: %s", config.DataFormat)
|
||||||
}
|
}
|
||||||
|
@ -92,6 +95,10 @@ func NewJsonSerializer(timestampUnits time.Duration) (Serializer, error) {
|
||||||
return json.NewSerializer(timestampUnits)
|
return json.NewSerializer(timestampUnits)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func NewCarbon2Serializer() (Serializer, error) {
|
||||||
|
return carbon2.NewSerializer()
|
||||||
|
}
|
||||||
|
|
||||||
func NewSplunkmetricSerializer(splunkmetric_hec_routing bool) (Serializer, error) {
|
func NewSplunkmetricSerializer(splunkmetric_hec_routing bool) (Serializer, error) {
|
||||||
return splunkmetric.NewSerializer(splunkmetric_hec_routing)
|
return splunkmetric.NewSerializer(splunkmetric_hec_routing)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue