Create a template system for the graphite serializer

closes #925
closes #879
This commit is contained in:
Cameron Sparr 2016-04-08 16:04:45 -06:00
parent 27fe4f7062
commit f5878eafb9
12 changed files with 401 additions and 137 deletions

View File

@ -2,6 +2,11 @@
### Release Notes ### Release Notes
- Breaking change in the dovecot input plugin. See Features section below. - Breaking change in the dovecot input plugin. See Features section below.
- Graphite output templates are now supported. See
https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md#graphite
- Possible breaking change for the librato and graphite outputs. Telegraf will
no longer insert field names when the field is simply named `value`. This is
because the `value` field is redundant in the graphite/librato context.
### Features ### Features
- [#976](https://github.com/influxdata/telegraf/pull/976): Reduce allocations in the UDP and statsd inputs. - [#976](https://github.com/influxdata/telegraf/pull/976): Reduce allocations in the UDP and statsd inputs.
@ -11,6 +16,7 @@
- [#943](https://github.com/influxdata/telegraf/pull/943): http_response input plugin. Thanks @Lswith! - [#943](https://github.com/influxdata/telegraf/pull/943): http_response input plugin. Thanks @Lswith!
- [#939](https://github.com/influxdata/telegraf/pull/939): sysstat input plugin. Thanks @zbindenren! - [#939](https://github.com/influxdata/telegraf/pull/939): sysstat input plugin. Thanks @zbindenren!
- [#998](https://github.com/influxdata/telegraf/pull/998): **breaking change** enabled global, user and ip queries in dovecot plugin. Thanks @mikif70! - [#998](https://github.com/influxdata/telegraf/pull/998): **breaking change** enabled global, user and ip queries in dovecot plugin. Thanks @mikif70!
- [#1001](https://github.com/influxdata/telegraf/pull/1001): Graphite serializer templates.
### Bugfixes ### Bugfixes
- [#968](https://github.com/influxdata/telegraf/issues/968): Processes plugin gets unknown state when spaces are in (command name) - [#968](https://github.com/influxdata/telegraf/issues/968): Processes plugin gets unknown state when spaces are in (command name)

View File

@ -1,5 +1,11 @@
# Telegraf Output Data Formats # Telegraf Output Data Formats
Telegraf is able to serialize metrics into the following output data formats:
1. [InfluxDB Line Protocol](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md#influx)
1. [JSON](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md#json)
1. [Graphite](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md#graphite)
Telegraf metrics, like InfluxDB Telegraf metrics, like InfluxDB
[points](https://docs.influxdata.com/influxdb/v0.10/write_protocols/line/), [points](https://docs.influxdata.com/influxdb/v0.10/write_protocols/line/),
are a combination of four basic parts: are a combination of four basic parts:
@ -30,7 +36,6 @@ config option, for example, in the `file` output plugin:
files = ["stdout"] files = ["stdout"]
## Data format to output. ## Data format to output.
## Each data format has it's own unique set of configuration options, read ## Each data format has it's own unique set of configuration options, read
## more about them here: ## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
@ -42,12 +47,12 @@ config option, for example, in the `file` output plugin:
Each data_format has an additional set of configuration options available, which Each data_format has an additional set of configuration options available, which
I'll go over below. I'll go over below.
## Influx: # Influx:
There are no additional configuration options for InfluxDB line-protocol. The There are no additional configuration options for InfluxDB line-protocol. The
metrics are serialized directly into InfluxDB line-protocol. metrics are serialized directly into InfluxDB line-protocol.
#### Influx Configuration: ### Influx Configuration:
```toml ```toml
[[outputs.file]] [[outputs.file]]
@ -55,22 +60,33 @@ metrics are serialized directly into InfluxDB line-protocol.
files = ["stdout", "/tmp/metrics.out"] files = ["stdout", "/tmp/metrics.out"]
## Data format to output. ## Data format to output.
## Each data format has it's own unique set of configuration options, read ## Each data format has it's own unique set of configuration options, read
## more about them here: ## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "influx" data_format = "influx"
``` ```
## Graphite: # Graphite:
The Graphite data format translates Telegraf metrics into _dot_ buckets. The Graphite data format translates Telegraf metrics into _dot_ buckets. A
The format is: template can be specified for the output of Telegraf metrics into Graphite
buckets. The default template is:
``` ```
[prefix].[host tag].[all tags (alphabetical)].[measurement name].[field name] value timestamp template = "host.tags.measurement.field"
``` ```
In the above template, we have four parts:
1. _host_ is a tag key. This can be any tag key that is in the Telegraf
metric(s). If the key doesn't exist, it will be ignored. If it does exist, the
tag value will be filled in.
1. _tags_ is a special keyword that outputs all remaining tag values, separated
by dots and in alphabetical order (by tag key). These will be filled after all
tag keys are filled.
1. _measurement_ is a special keyword that outputs the measurement name.
1. _field_ is a special keyword that outputs the field name.
Which means the following influx metric -> graphite conversion would happen: Which means the following influx metric -> graphite conversion would happen:
``` ```
@ -80,9 +96,7 @@ tars.cpu-total.us-east-1.cpu.usage_user 0.89 1455320690
tars.cpu-total.us-east-1.cpu.usage_idle 98.09 1455320690 tars.cpu-total.us-east-1.cpu.usage_idle 98.09 1455320690
``` ```
`prefix` is a configuration option when using the graphite output data format. ### Graphite Configuration:
#### Graphite Configuration:
```toml ```toml
[[outputs.file]] [[outputs.file]]
@ -90,18 +104,20 @@ tars.cpu-total.us-east-1.cpu.usage_idle 98.09 1455320690
files = ["stdout", "/tmp/metrics.out"] files = ["stdout", "/tmp/metrics.out"]
## Data format to output. ## Data format to output.
## Each data format has it's own unique set of configuration options, read ## Each data format has it's own unique set of configuration options, read
## more about them here: ## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "influx" data_format = "graphite"
# prefix each graphite bucket
prefix = "telegraf" prefix = "telegraf"
# graphite template
template = "host.tags.measurement.field"
``` ```
## Json: # JSON:
The Json data format serialized Telegraf metrics in json format. The format is: The JSON data format serialized Telegraf metrics in json format. The format is:
```json ```json
{ {
@ -119,7 +135,7 @@ The Json data format serialized Telegraf metrics in json format. The format is:
} }
``` ```
#### Json Configuration: ### JSON Configuration:
```toml ```toml
[[outputs.file]] [[outputs.file]]
@ -127,7 +143,6 @@ The Json data format serialized Telegraf metrics in json format. The format is:
files = ["stdout", "/tmp/metrics.out"] files = ["stdout", "/tmp/metrics.out"]
## Data format to output. ## Data format to output.
## Each data format has it's own unique set of configuration options, read ## Each data format has it's own unique set of configuration options, read
## more about them here: ## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md

View File

@ -178,6 +178,9 @@
# servers = ["localhost:2003"] # servers = ["localhost:2003"]
# ## Prefix metrics name # ## Prefix metrics name
# prefix = "" # prefix = ""
# ## Graphite output template
# ## see https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
# template = "host.tags.measurement.field"
# ## timeout in seconds for the write connection to graphite # ## timeout in seconds for the write connection to graphite
# timeout = 2 # timeout = 2
@ -251,22 +254,20 @@
# [[outputs.librato]] # [[outputs.librato]]
# ## Librator API Docs # ## Librator API Docs
# ## http://dev.librato.com/v1/metrics-authentication # ## http://dev.librato.com/v1/metrics-authentication
#
# ## Librato API user # ## Librato API user
# api_user = "telegraf@influxdb.com" # required. # api_user = "telegraf@influxdb.com" # required.
#
# ## Librato API token # ## Librato API token
# api_token = "my-secret-token" # required. # api_token = "my-secret-token" # required.
# # ## Debug
# ### Debug
# # debug = false # # debug = false
# # ## Tag Field to populate source attribute (optional)
# ### Tag Field to populate source attribute (optional) # ## This is typically the _hostname_ from which the metric was obtained.
# ### This is typically the _hostname_ from which the metric was obtained.
# source_tag = "host" # source_tag = "host"
#
# ## Connection timeout. # ## Connection timeout.
# # timeout = "5s" # # timeout = "5s"
# ## Output Name Template (same as graphite buckets)
# ## see https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md#graphite
# template = "host.tags.measurement.field"
# # Configuration for MQTT server to send metrics to # # Configuration for MQTT server to send metrics to

View File

@ -850,8 +850,17 @@ func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error
} }
} }
if node, ok := tbl.Fields["template"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.Template = str.Value
}
}
}
delete(tbl.Fields, "data_format") delete(tbl.Fields, "data_format")
delete(tbl.Fields, "prefix") delete(tbl.Fields, "prefix")
delete(tbl.Fields, "template")
return serializers.NewSerializer(c) return serializers.NewSerializer(c)
} }

View File

@ -1,13 +1,34 @@
# Graphite Output Plugin # Graphite Output Plugin
This plugin writes to [Graphite](http://graphite.readthedocs.org/en/latest/index.html) via raw TCP. This plugin writes to [Graphite](http://graphite.readthedocs.org/en/latest/index.html)
via raw TCP.
## Configuration:
```toml
# Configuration for Graphite server to send metrics to
[[outputs.graphite]]
## TCP endpoint for your graphite instance.
servers = ["localhost:2003"]
## Prefix metrics name
prefix = ""
## Graphite output template
## see https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
template = "host.tags.measurement.field"
## timeout in seconds for the write connection to graphite
timeout = 2
```
Parameters: Parameters:
Servers []string Servers []string
Prefix string Prefix string
Timeout int Timeout int
Template string
* `servers`: List of strings, ["mygraphiteserver:2003"]. * `servers`: List of strings, ["mygraphiteserver:2003"].
* `prefix`: String use to prefix all sent metrics. * `prefix`: String use to prefix all sent metrics.
* `timeout`: Connection timeout in second. * `timeout`: Connection timeout in seconds.
* `template`: Template for graphite output format, see
https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
for more details.

View File

@ -18,6 +18,7 @@ type Graphite struct {
// URL is only for backwards compatability // URL is only for backwards compatability
Servers []string Servers []string
Prefix string Prefix string
Template string
Timeout int Timeout int
conns []net.Conn conns []net.Conn
} }
@ -27,6 +28,9 @@ var sampleConfig = `
servers = ["localhost:2003"] servers = ["localhost:2003"]
## Prefix metrics name ## Prefix metrics name
prefix = "" prefix = ""
## Graphite output template
## see https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
template = "host.tags.measurement.field"
## timeout in seconds for the write connection to graphite ## timeout in seconds for the write connection to graphite
timeout = 2 timeout = 2
` `
@ -72,7 +76,7 @@ func (g *Graphite) Description() string {
func (g *Graphite) Write(metrics []telegraf.Metric) error { func (g *Graphite) Write(metrics []telegraf.Metric) error {
// Prepare data // Prepare data
var bp []string var bp []string
s, err := serializers.NewGraphiteSerializer(g.Prefix) s, err := serializers.NewGraphiteSerializer(g.Prefix, g.Template)
if err != nil { if err != nil {
return err return err
} }

View File

@ -54,7 +54,7 @@ func TestGraphiteOK(t *testing.T) {
m1, _ := telegraf.NewMetric( m1, _ := telegraf.NewMetric(
"mymeasurement", "mymeasurement",
map[string]string{"host": "192.168.0.1"}, map[string]string{"host": "192.168.0.1"},
map[string]interface{}{"mymeasurement": float64(3.14)}, map[string]interface{}{"myfield": float64(3.14)},
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
) )
m2, _ := telegraf.NewMetric( m2, _ := telegraf.NewMetric(
@ -90,10 +90,10 @@ func TCPServer(t *testing.T, wg *sync.WaitGroup) {
reader := bufio.NewReader(conn) reader := bufio.NewReader(conn)
tp := textproto.NewReader(reader) tp := textproto.NewReader(reader)
data1, _ := tp.ReadLine() data1, _ := tp.ReadLine()
assert.Equal(t, "my.prefix.192_168_0_1.mymeasurement 3.14 1289430000", data1) assert.Equal(t, "my.prefix.192_168_0_1.mymeasurement.myfield 3.14 1289430000", data1)
data2, _ := tp.ReadLine() data2, _ := tp.ReadLine()
assert.Equal(t, "my.prefix.192_168_0_1.mymeasurement.value 3.14 1289430000", data2) assert.Equal(t, "my.prefix.192_168_0_1.mymeasurement 3.14 1289430000", data2)
data3, _ := tp.ReadLine() data3, _ := tp.ReadLine()
assert.Equal(t, "my.prefix.192_168_0_1.my_measurement.value 3.14 1289430000", data3) assert.Equal(t, "my.prefix.192_168_0_1.my_measurement 3.14 1289430000", data3)
conn.Close() conn.Close()
} }

View File

@ -21,6 +21,7 @@ type Librato struct {
NameFromTags bool NameFromTags bool
SourceTag string SourceTag string
Timeout internal.Duration Timeout internal.Duration
Template string
apiUrl string apiUrl string
client *http.Client client *http.Client
@ -29,22 +30,20 @@ type Librato struct {
var sampleConfig = ` var sampleConfig = `
## Librator API Docs ## Librator API Docs
## http://dev.librato.com/v1/metrics-authentication ## http://dev.librato.com/v1/metrics-authentication
## Librato API user ## Librato API user
api_user = "telegraf@influxdb.com" # required. api_user = "telegraf@influxdb.com" # required.
## Librato API token ## Librato API token
api_token = "my-secret-token" # required. api_token = "my-secret-token" # required.
## Debug
### Debug
# debug = false # debug = false
## Tag Field to populate source attribute (optional)
### Tag Field to populate source attribute (optional) ## This is typically the _hostname_ from which the metric was obtained.
### This is typically the _hostname_ from which the metric was obtained.
source_tag = "host" source_tag = "host"
## Connection timeout. ## Connection timeout.
# timeout = "5s" # timeout = "5s"
## Output Name Template (same as graphite buckets)
## see https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md#graphite
template = "host.tags.measurement.field"
` `
type LMetrics struct { type LMetrics struct {
@ -152,17 +151,13 @@ func (l *Librato) Description() string {
return "Configuration for Librato API to send metrics to." return "Configuration for Librato API to send metrics to."
} }
func (l *Librato) buildGaugeName(m telegraf.Metric, fieldName string) string {
// Use the GraphiteSerializer
graphiteSerializer := graphite.GraphiteSerializer{}
return graphiteSerializer.SerializeBucketName(m, fieldName)
}
func (l *Librato) buildGauges(m telegraf.Metric) ([]*Gauge, error) { func (l *Librato) buildGauges(m telegraf.Metric) ([]*Gauge, error) {
gauges := []*Gauge{} gauges := []*Gauge{}
serializer := graphite.GraphiteSerializer{Template: l.Template}
bucket := serializer.SerializeBucketName(m.Name(), m.Tags())
for fieldName, value := range m.Fields() { for fieldName, value := range m.Fields() {
gauge := &Gauge{ gauge := &Gauge{
Name: l.buildGaugeName(m, fieldName), Name: graphite.InsertField(bucket, fieldName),
MeasureTime: m.Time().Unix(), MeasureTime: m.Time().Unix(),
} }
if !gauge.verifyValue(value) { if !gauge.verifyValue(value) {

View File

@ -86,7 +86,7 @@ func TestBuildGauge(t *testing.T) {
{ {
testutil.TestMetric(0.0, "test1"), testutil.TestMetric(0.0, "test1"),
&Gauge{ &Gauge{
Name: "value1.test1.value", Name: "value1.test1",
MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(), MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(),
Value: 0.0, Value: 0.0,
}, },
@ -95,7 +95,7 @@ func TestBuildGauge(t *testing.T) {
{ {
testutil.TestMetric(1.0, "test2"), testutil.TestMetric(1.0, "test2"),
&Gauge{ &Gauge{
Name: "value1.test2.value", Name: "value1.test2",
MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(), MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(),
Value: 1.0, Value: 1.0,
}, },
@ -104,7 +104,7 @@ func TestBuildGauge(t *testing.T) {
{ {
testutil.TestMetric(10, "test3"), testutil.TestMetric(10, "test3"),
&Gauge{ &Gauge{
Name: "value1.test3.value", Name: "value1.test3",
MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(), MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(),
Value: 10.0, Value: 10.0,
}, },
@ -113,7 +113,7 @@ func TestBuildGauge(t *testing.T) {
{ {
testutil.TestMetric(int32(112345), "test4"), testutil.TestMetric(int32(112345), "test4"),
&Gauge{ &Gauge{
Name: "value1.test4.value", Name: "value1.test4",
MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(), MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(),
Value: 112345.0, Value: 112345.0,
}, },
@ -122,7 +122,7 @@ func TestBuildGauge(t *testing.T) {
{ {
testutil.TestMetric(int64(112345), "test5"), testutil.TestMetric(int64(112345), "test5"),
&Gauge{ &Gauge{
Name: "value1.test5.value", Name: "value1.test5",
MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(), MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(),
Value: 112345.0, Value: 112345.0,
}, },
@ -131,7 +131,7 @@ func TestBuildGauge(t *testing.T) {
{ {
testutil.TestMetric(float32(11234.5), "test6"), testutil.TestMetric(float32(11234.5), "test6"),
&Gauge{ &Gauge{
Name: "value1.test6.value", Name: "value1.test6",
MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(), MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(),
Value: 11234.5, Value: 11234.5,
}, },
@ -189,7 +189,7 @@ func TestBuildGaugeWithSource(t *testing.T) {
{ {
pt1, pt1,
&Gauge{ &Gauge{
Name: "192_168_0_1.value1.test1.value", Name: "192_168_0_1.value1.test1",
MeasureTime: time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(), MeasureTime: time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(),
Value: 0.0, Value: 0.0,
Source: "192.168.0.1", Source: "192.168.0.1",
@ -199,7 +199,7 @@ func TestBuildGaugeWithSource(t *testing.T) {
{ {
pt2, pt2,
&Gauge{ &Gauge{
Name: "192_168_0_1.value1.test1.value", Name: "192_168_0_1.value1.test1",
MeasureTime: time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC).Unix(), MeasureTime: time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC).Unix(),
Value: 1.0, Value: 1.0,
}, },

View File

@ -8,11 +8,16 @@ import (
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
) )
const DEFAULT_TEMPLATE = "host.tags.measurement.field"
var fieldDeleter = strings.NewReplacer(".FIELDNAME", "", "FIELDNAME.", "")
type GraphiteSerializer struct { type GraphiteSerializer struct {
Prefix string Prefix string
Template string
} }
var sanitizedChars = strings.NewReplacer("/", "-", "@", "-", " ", "_") var sanitizedChars = strings.NewReplacer("/", "-", "@", "-", " ", "_", "..", ".")
func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]string, error) { func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]string, error) {
out := []string{} out := []string{}
@ -20,65 +25,95 @@ func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]string, error)
// Convert UnixNano to Unix timestamps // Convert UnixNano to Unix timestamps
timestamp := metric.UnixNano() / 1000000000 timestamp := metric.UnixNano() / 1000000000
for field_name, value := range metric.Fields() { bucket := s.SerializeBucketName(metric.Name(), metric.Tags())
// Convert value
value_str := fmt.Sprintf("%#v", value) for fieldName, value := range metric.Fields() {
// Write graphite metric // Convert value to string
var graphitePoint string valueS := fmt.Sprintf("%#v", value)
graphitePoint = fmt.Sprintf("%s %s %d", point := fmt.Sprintf("%s %s %d",
s.SerializeBucketName(metric, field_name), // insert "field" section of template
value_str, InsertField(bucket, fieldName),
valueS,
timestamp) timestamp)
out = append(out, graphitePoint) out = append(out, point)
} }
return out, nil return out, nil
} }
func (s *GraphiteSerializer) SerializeBucketName(metric telegraf.Metric, field_name string) string { // SerializeBucketName will take the given measurement name and tags and
// Get the metric name // produce a graphite bucket. It will use the GraphiteSerializer.Template
name := metric.Name() // to generate this, or DEFAULT_TEMPLATE.
//
// Convert UnixNano to Unix timestamps // NOTE: SerializeBucketName replaces the "field" portion of the template with
tag_str := buildTags(metric) // FIELDNAME. It is up to the user to replace this. This is so that
// SerializeBucketName can be called just once per measurement, rather than
// Write graphite metric // once per field. See GraphiteSerializer.InsertField() function.
var serializedBucketName string func (s *GraphiteSerializer) SerializeBucketName(
if name == field_name { measurement string,
serializedBucketName = fmt.Sprintf("%s.%s", tags map[string]string,
tag_str, ) string {
strings.Replace(name, ".", "_", -1)) if s.Template == "" {
} else { s.Template = DEFAULT_TEMPLATE
serializedBucketName = fmt.Sprintf("%s.%s.%s",
tag_str,
strings.Replace(name, ".", "_", -1),
strings.Replace(field_name, ".", "_", -1))
} }
if s.Prefix != "" { tagsCopy := make(map[string]string)
serializedBucketName = fmt.Sprintf("%s.%s", s.Prefix, serializedBucketName) for k, v := range tags {
tagsCopy[k] = v
} }
return serializedBucketName
var out []string
templateParts := strings.Split(s.Template, ".")
for _, templatePart := range templateParts {
switch templatePart {
case "measurement":
out = append(out, measurement)
case "tags":
// we will replace this later
out = append(out, "TAGS")
case "field":
// user of SerializeBucketName needs to replace this
out = append(out, "FIELDNAME")
default:
// This is a tag being applied
if tagvalue, ok := tagsCopy[templatePart]; ok {
out = append(out, strings.Replace(tagvalue, ".", "_", -1))
delete(tagsCopy, templatePart)
}
}
}
// insert remaining tags into output name
for i, templatePart := range out {
if templatePart == "TAGS" {
out[i] = buildTags(tagsCopy)
break
}
}
if s.Prefix == "" {
return sanitizedChars.Replace(strings.Join(out, "."))
}
return sanitizedChars.Replace(s.Prefix + "." + strings.Join(out, "."))
} }
func buildTags(metric telegraf.Metric) string { // InsertField takes the bucket string from SerializeBucketName and replaces the
var keys []string // FIELDNAME portion. If fieldName == "value", it will simply delete the
tags := metric.Tags() // FIELDNAME portion.
for k := range tags { func InsertField(bucket, fieldName string) string {
if k == "host" { // if the field name is "value", then dont use it
continue if fieldName == "value" {
return fieldDeleter.Replace(bucket)
} }
return strings.Replace(bucket, "FIELDNAME", fieldName, 1)
}
func buildTags(tags map[string]string) string {
var keys []string
for k := range tags {
keys = append(keys, k) keys = append(keys, k)
} }
sort.Strings(keys) sort.Strings(keys)
var tag_str string var tag_str string
if host, ok := tags["host"]; ok {
if len(keys) > 0 {
tag_str = strings.Replace(host, ".", "_", -1) + "."
} else {
tag_str = strings.Replace(host, ".", "_", -1)
}
}
for i, k := range keys { for i, k := range keys {
tag_value := strings.Replace(tags[k], ".", "_", -1) tag_value := strings.Replace(tags[k], ".", "_", -1)
if i == 0 { if i == 0 {
@ -87,5 +122,5 @@ func buildTags(metric telegraf.Metric) string {
tag_str += "." + tag_value tag_str += "." + tag_value
} }
} }
return sanitizedChars.Replace(tag_str) return tag_str
} }

View File

@ -11,6 +11,23 @@ import (
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
) )
var defaultTags = map[string]string{
"host": "localhost",
"cpu": "cpu0",
"datacenter": "us-west-2",
}
const (
template1 = "tags.measurement.field"
template2 = "host.measurement.field"
template3 = "host.tags.field"
template4 = "host.tags.measurement"
// this template explicitly uses all tag keys, so "tags" should be empty
template5 = "host.datacenter.cpu.tags.measurement.field"
// this template has non-existent tag keys
template6 = "foo.host.cpu.bar.tags.measurement.field"
)
func TestGraphiteTags(t *testing.T) { func TestGraphiteTags(t *testing.T) {
m1, _ := telegraf.NewMetric( m1, _ := telegraf.NewMetric(
"mymeasurement", "mymeasurement",
@ -31,12 +48,12 @@ func TestGraphiteTags(t *testing.T) {
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
) )
tags1 := buildTags(m1) tags1 := buildTags(m1.Tags())
tags2 := buildTags(m2) tags2 := buildTags(m2.Tags())
tags3 := buildTags(m3) tags3 := buildTags(m3.Tags())
assert.Equal(t, "192_168_0_1", tags1) assert.Equal(t, "192_168_0_1", tags1)
assert.Equal(t, "192_168_0_1.first.second", tags2) assert.Equal(t, "first.second.192_168_0_1", tags2)
assert.Equal(t, "first.second", tags3) assert.Equal(t, "first.second", tags3)
} }
@ -93,6 +110,82 @@ func TestSerializeMetricHost(t *testing.T) {
assert.Equal(t, expS, mS) assert.Equal(t, expS, mS)
} }
// test that a field named "value" gets ignored.
func TestSerializeValueField(t *testing.T) {
now := time.Now()
tags := map[string]string{
"host": "localhost",
"cpu": "cpu0",
"datacenter": "us-west-2",
}
fields := map[string]interface{}{
"value": float64(91.5),
}
m, err := telegraf.NewMetric("cpu", tags, fields, now)
assert.NoError(t, err)
s := GraphiteSerializer{}
mS, err := s.Serialize(m)
assert.NoError(t, err)
expS := []string{
fmt.Sprintf("localhost.cpu0.us-west-2.cpu 91.5 %d", now.Unix()),
}
assert.Equal(t, expS, mS)
}
// test that a field named "value" gets ignored in middle of template.
func TestSerializeValueField2(t *testing.T) {
now := time.Now()
tags := map[string]string{
"host": "localhost",
"cpu": "cpu0",
"datacenter": "us-west-2",
}
fields := map[string]interface{}{
"value": float64(91.5),
}
m, err := telegraf.NewMetric("cpu", tags, fields, now)
assert.NoError(t, err)
s := GraphiteSerializer{
Template: "host.field.tags.measurement",
}
mS, err := s.Serialize(m)
assert.NoError(t, err)
expS := []string{
fmt.Sprintf("localhost.cpu0.us-west-2.cpu 91.5 %d", now.Unix()),
}
assert.Equal(t, expS, mS)
}
// test that a field named "value" gets ignored at beginning of template.
func TestSerializeValueField3(t *testing.T) {
now := time.Now()
tags := map[string]string{
"host": "localhost",
"cpu": "cpu0",
"datacenter": "us-west-2",
}
fields := map[string]interface{}{
"value": float64(91.5),
}
m, err := telegraf.NewMetric("cpu", tags, fields, now)
assert.NoError(t, err)
s := GraphiteSerializer{
Template: "field.host.tags.measurement",
}
mS, err := s.Serialize(m)
assert.NoError(t, err)
expS := []string{
fmt.Sprintf("localhost.cpu0.us-west-2.cpu 91.5 %d", now.Unix()),
}
assert.Equal(t, expS, mS)
}
func TestSerializeMetricPrefix(t *testing.T) { func TestSerializeMetricPrefix(t *testing.T) {
now := time.Now() now := time.Now()
tags := map[string]string{ tags := map[string]string{
@ -133,48 +226,128 @@ func TestSerializeBucketNameNoHost(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
s := GraphiteSerializer{} s := GraphiteSerializer{}
mS := s.SerializeBucketName(m, "usage_idle") mS := s.SerializeBucketName(m.Name(), m.Tags())
expS := fmt.Sprintf("cpu0.us-west-2.cpu.usage_idle") expS := "cpu0.us-west-2.cpu.FIELDNAME"
assert.Equal(t, expS, mS) assert.Equal(t, expS, mS)
} }
func TestSerializeBucketNameHost(t *testing.T) { func TestSerializeBucketNameHost(t *testing.T) {
now := time.Now() now := time.Now()
tags := map[string]string{
"host": "localhost",
"cpu": "cpu0",
"datacenter": "us-west-2",
}
fields := map[string]interface{}{ fields := map[string]interface{}{
"usage_idle": float64(91.5), "usage_idle": float64(91.5),
} }
m, err := telegraf.NewMetric("cpu", tags, fields, now) m, err := telegraf.NewMetric("cpu", defaultTags, fields, now)
assert.NoError(t, err) assert.NoError(t, err)
s := GraphiteSerializer{} s := GraphiteSerializer{}
mS := s.SerializeBucketName(m, "usage_idle") mS := s.SerializeBucketName(m.Name(), m.Tags())
expS := fmt.Sprintf("localhost.cpu0.us-west-2.cpu.usage_idle") expS := "localhost.cpu0.us-west-2.cpu.FIELDNAME"
assert.Equal(t, expS, mS) assert.Equal(t, expS, mS)
} }
func TestSerializeBucketNamePrefix(t *testing.T) { func TestSerializeBucketNamePrefix(t *testing.T) {
now := time.Now() now := time.Now()
tags := map[string]string{
"host": "localhost",
"cpu": "cpu0",
"datacenter": "us-west-2",
}
fields := map[string]interface{}{ fields := map[string]interface{}{
"usage_idle": float64(91.5), "usage_idle": float64(91.5),
} }
m, err := telegraf.NewMetric("cpu", tags, fields, now) m, err := telegraf.NewMetric("cpu", defaultTags, fields, now)
assert.NoError(t, err) assert.NoError(t, err)
s := GraphiteSerializer{Prefix: "prefix"} s := GraphiteSerializer{Prefix: "prefix"}
mS := s.SerializeBucketName(m, "usage_idle") mS := s.SerializeBucketName(m.Name(), m.Tags())
expS := fmt.Sprintf("prefix.localhost.cpu0.us-west-2.cpu.usage_idle") expS := "prefix.localhost.cpu0.us-west-2.cpu.FIELDNAME"
assert.Equal(t, expS, mS)
}
func TestTemplate1(t *testing.T) {
now := time.Now()
fields := map[string]interface{}{
"usage_idle": float64(91.5),
}
m, err := telegraf.NewMetric("cpu", defaultTags, fields, now)
assert.NoError(t, err)
s := GraphiteSerializer{Template: template1}
mS := s.SerializeBucketName(m.Name(), m.Tags())
expS := "cpu0.us-west-2.localhost.cpu.FIELDNAME"
assert.Equal(t, expS, mS)
}
func TestTemplate2(t *testing.T) {
now := time.Now()
fields := map[string]interface{}{
"usage_idle": float64(91.5),
}
m, err := telegraf.NewMetric("cpu", defaultTags, fields, now)
assert.NoError(t, err)
s := GraphiteSerializer{Template: template2}
mS := s.SerializeBucketName(m.Name(), m.Tags())
expS := "localhost.cpu.FIELDNAME"
assert.Equal(t, expS, mS)
}
func TestTemplate3(t *testing.T) {
now := time.Now()
fields := map[string]interface{}{
"usage_idle": float64(91.5),
}
m, err := telegraf.NewMetric("cpu", defaultTags, fields, now)
assert.NoError(t, err)
s := GraphiteSerializer{Template: template3}
mS := s.SerializeBucketName(m.Name(), m.Tags())
expS := "localhost.cpu0.us-west-2.FIELDNAME"
assert.Equal(t, expS, mS)
}
func TestTemplate4(t *testing.T) {
now := time.Now()
fields := map[string]interface{}{
"usage_idle": float64(91.5),
}
m, err := telegraf.NewMetric("cpu", defaultTags, fields, now)
assert.NoError(t, err)
s := GraphiteSerializer{Template: template4}
mS := s.SerializeBucketName(m.Name(), m.Tags())
expS := "localhost.cpu0.us-west-2.cpu"
assert.Equal(t, expS, mS)
}
func TestTemplate5(t *testing.T) {
now := time.Now()
fields := map[string]interface{}{
"usage_idle": float64(91.5),
}
m, err := telegraf.NewMetric("cpu", defaultTags, fields, now)
assert.NoError(t, err)
s := GraphiteSerializer{Template: template5}
mS := s.SerializeBucketName(m.Name(), m.Tags())
expS := "localhost.us-west-2.cpu0.cpu.FIELDNAME"
assert.Equal(t, expS, mS)
}
func TestTemplate6(t *testing.T) {
now := time.Now()
fields := map[string]interface{}{
"usage_idle": float64(91.5),
}
m, err := telegraf.NewMetric("cpu", defaultTags, fields, now)
assert.NoError(t, err)
s := GraphiteSerializer{Template: template6}
mS := s.SerializeBucketName(m.Name(), m.Tags())
expS := "localhost.cpu0.us-west-2.cpu.FIELDNAME"
assert.Equal(t, expS, mS) assert.Equal(t, expS, mS)
} }

View File

@ -30,6 +30,10 @@ type Config struct {
// Prefix to add to all measurements, only supports Graphite // Prefix to add to all measurements, only supports Graphite
Prefix string Prefix string
// Template for converting telegraf metrics into Graphite
// only supports Graphite
Template string
} }
// NewSerializer a Serializer interface based on the given config. // NewSerializer a Serializer interface based on the given config.
@ -40,7 +44,7 @@ func NewSerializer(config *Config) (Serializer, error) {
case "influx": case "influx":
serializer, err = NewInfluxSerializer() serializer, err = NewInfluxSerializer()
case "graphite": case "graphite":
serializer, err = NewGraphiteSerializer(config.Prefix) serializer, err = NewGraphiteSerializer(config.Prefix, config.Template)
case "json": case "json":
serializer, err = NewJsonSerializer() serializer, err = NewJsonSerializer()
} }
@ -55,8 +59,9 @@ func NewInfluxSerializer() (Serializer, error) {
return &influx.InfluxSerializer{}, nil return &influx.InfluxSerializer{}, nil
} }
func NewGraphiteSerializer(prefix string) (Serializer, error) { func NewGraphiteSerializer(prefix, template string) (Serializer, error) {
return &graphite.GraphiteSerializer{ return &graphite.GraphiteSerializer{
Prefix: prefix, Prefix: prefix,
Template: template,
}, nil }, nil
} }