Add template processor (#6494)

This commit is contained in:
RobMalvern 2020-02-06 20:40:03 +00:00 committed by GitHub
parent 62ffd7172f
commit 15d0166922
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 197 additions and 0 deletions

View File

@ -0,0 +1,26 @@
# Template Processor
The `template` processor applies a go template to tag, field, measurement and time values to create a new tag.
Golang [Template Documentation]
### Configuration
```toml
# Concatenate two tags to create a new tag
[[processors.template]]
## Tag to create
tag = "topic"
## Template to create tag
# Note: Single quotes (') are used, so the double quotes (") don't need escaping (\")
template = '{{ .Tag "hostname" }}.{{ .Tag "level" }}'
```
### Example
```diff
- cpu,level=debug,hostname=localhost value=42i
+ cpu,level=debug,hostname=localhost,topic=localhost.debug value=42i
```
[Template Documentation]:https://golang.org/pkg/text/template/

View File

@ -0,0 +1,63 @@
package template
import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/processors"
"strings"
"text/template"
)
type TemplateProcessor struct {
Tag string `toml:"tag"`
Template string `toml:"template"`
tmpl *template.Template
}
const sampleConfig = `
## Concatenate two tags to create a new tag
# [[processors.template]]
# ## Tag to create
# tag = "topic"
# ## Template to create tag
# Note: Single quotes (') are used, so the double quotes (") don't need escaping (\")
# template = '{{.Tag "hostname"}}.{{ .Tag "level" }}'
`
func (r *TemplateProcessor) SampleConfig() string {
return sampleConfig
}
func (r *TemplateProcessor) Description() string {
return "Uses a Go template to create a new tag"
}
func (r *TemplateProcessor) Apply(in ...telegraf.Metric) []telegraf.Metric {
// for each metric in "in" array
for _, metric := range in {
var b strings.Builder
newM := TemplateMetric{metric}
// supply TemplateMetric and Template from configuration to Template.Execute
err := r.tmpl.Execute(&b, &newM)
if err != nil {
panic(err)
}
metric.AddTag(r.Tag, b.String())
}
return in
}
func (r *TemplateProcessor) Init() error {
// create template
t, err := template.New("configured_template").Parse(r.Template)
r.tmpl = t
return err
}
func init() {
processors.Add("printer", func() telegraf.Processor {
return &TemplateProcessor{}
})
}

View File

@ -0,0 +1,28 @@
package template
import (
"github.com/influxdata/telegraf"
"time"
)
type TemplateMetric struct {
metric telegraf.Metric
}
func (m *TemplateMetric) Measurement() string {
return m.Measurement()
}
func (m *TemplateMetric) Tag(key string) string {
tagString, _ := m.metric.GetTag(key)
return tagString
}
func (m *TemplateMetric) Field(key string) interface{} {
field, _ := m.metric.GetField(key)
return field
}
func (m *TemplateMetric) Time() time.Time {
return m.metric.Time()
}

View File

@ -0,0 +1,80 @@
package template
import (
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
)
func TestTagTemplateConcatenate(t *testing.T) {
now := time.Now()
// Create Template processor
tmp := TemplateProcessor{Tag: "topic", Template: `{{.Tag "hostname"}}.{{ .Tag "level" }}`}
// manually init
err := tmp.Init()
if err != nil {
panic(err)
}
// create metric for testing
input := []telegraf.Metric{testutil.MustMetric("Tags", map[string]string{"hostname": "localhost", "level": "debug"}, nil, now)}
// act
actual := tmp.Apply(input[0])
// assert
expected := []telegraf.Metric{testutil.MustMetric("Tags", map[string]string{"hostname": "localhost", "level": "debug", "topic": "localhost.debug"}, nil, now)}
testutil.RequireMetricsEqual(t, expected, actual)
}
func TestMetricMissingTagsIsNotLost(t *testing.T) {
now := time.Now()
// Create Template processor
tmp := TemplateProcessor{Tag: "topic", Template: `{{.Tag "hostname"}}.{{ .Tag "level" }}`}
// manually init
err := tmp.Init()
if err != nil {
panic(err)
}
// create metrics for testing
m1 := testutil.MustMetric("Works", map[string]string{"hostname": "localhost", "level": "debug"}, nil, now)
m2 := testutil.MustMetric("Fails", map[string]string{"hostname": "localhost"}, nil, now)
// act
actual := tmp.Apply(m1, m2)
// assert
// make sure no metrics are lost when a template process fails
assert.Equal(t, 2, len(actual), "Number of metrics input should equal number of metrics output")
}
func TestTagAndFieldConcatenate(t *testing.T) {
now := time.Now()
// Create Template processor
tmp := TemplateProcessor{Tag: "LocalTemp", Template: `{{.Tag "location"}} is {{ .Field "temperature" }}`}
// manually init
err := tmp.Init()
if err != nil {
panic(err)
}
// create metric for testing
m1 := testutil.MustMetric("weather", map[string]string{"location": "us-midwest"}, map[string]interface{}{"temperature": "too warm"}, now)
// act
actual := tmp.Apply(m1)
// assert
expected := []telegraf.Metric{testutil.MustMetric("weather", map[string]string{"location": "us-midwest", "LocalTemp": "us-midwest is too warm"}, map[string]interface{}{"temperature": "too warm"}, now)}
testutil.RequireMetricsEqual(t, expected, actual)
}