Add support for dropwizard input format (#2846)

This commit is contained in:
atzoum
2018-01-09 01:11:36 +02:00
committed by Daniel Nelson
parent 5207b32b25
commit 05d691aa81
13 changed files with 1436 additions and 333 deletions

View File

@@ -6,11 +6,12 @@ import (
"fmt"
"io"
"math"
"sort"
"strconv"
"strings"
"time"
"github.com/influxdata/telegraf/internal/templating"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
)
@@ -23,11 +24,10 @@ var (
// Parser encapsulates a Graphite Parser.
type GraphiteParser struct {
Separator string
Templates []string
DefaultTags map[string]string
matcher *matcher
Separator string
Templates []string
DefaultTags map[string]string
templateEngine *templating.Engine
}
func (p *GraphiteParser) SetDefaultTags(tags map[string]string) {
@@ -52,65 +52,13 @@ func NewGraphiteParser(
if defaultTags != nil {
p.DefaultTags = defaultTags
}
matcher := newMatcher()
p.matcher = matcher
defaultTemplate, _ := NewTemplate("measurement*", nil, p.Separator)
matcher.AddDefaultTemplate(defaultTemplate)
tmplts := parsedTemplates{}
for _, pattern := range p.Templates {
tmplt := parsedTemplate{}
tmplt.template = pattern
// Format is [filter] <template> [tag1=value1,tag2=value2]
parts := strings.Fields(pattern)
if len(parts) < 1 {
continue
} else if len(parts) >= 2 {
if strings.Contains(parts[1], "=") {
tmplt.template = parts[0]
tmplt.tagstring = parts[1]
} else {
tmplt.filter = parts[0]
tmplt.template = parts[1]
if len(parts) > 2 {
tmplt.tagstring = parts[2]
}
}
}
tmplts = append(tmplts, tmplt)
}
sort.Sort(tmplts)
for _, tmplt := range tmplts {
if err := p.addToMatcher(tmplt); err != nil {
return nil, err
}
}
defaultTemplate, _ := templating.NewDefaultTemplateWithPattern("measurement*")
p.templateEngine, err = templating.NewEngine(p.Separator, defaultTemplate, p.Templates)
if err != nil {
return p, fmt.Errorf("exec input parser config is error: %s ", err.Error())
} else {
return p, nil
}
}
func (p *GraphiteParser) addToMatcher(tmplt parsedTemplate) error {
// Parse out the default tags specific to this template
tags := map[string]string{}
if tmplt.tagstring != "" {
for _, kv := range strings.Split(tmplt.tagstring, ",") {
parts := strings.Split(kv, "=")
tags[parts[0]] = parts[1]
}
}
tmpl, err := NewTemplate(tmplt.template, tags, p.Separator)
if err != nil {
return err
}
p.matcher.Add(tmplt.filter, tmpl)
return nil
return p, nil
}
func (p *GraphiteParser) Parse(buf []byte) ([]telegraf.Metric, error) {
@@ -162,8 +110,7 @@ func (p *GraphiteParser) ParseLine(line string) (telegraf.Metric, error) {
}
// decode the name and tags
template := p.matcher.Match(fields[0])
measurement, tags, field, err := template.Apply(fields[0])
measurement, tags, field, err := p.templateEngine.Apply(fields[0])
if err != nil {
return nil, err
}
@@ -229,8 +176,7 @@ func (p *GraphiteParser) ApplyTemplate(line string) (string, map[string]string,
return "", make(map[string]string), "", nil
}
// decode the name and tags
template := p.matcher.Match(fields[0])
name, tags, field, err := template.Apply(fields[0])
name, tags, field, err := p.templateEngine.Apply(fields[0])
// Set the default tags on the point if they are not already set
for k, v := range p.DefaultTags {
@@ -241,269 +187,3 @@ func (p *GraphiteParser) ApplyTemplate(line string) (string, map[string]string,
return name, tags, field, err
}
// template represents a pattern and tags to map a graphite metric string to a influxdb Point
type template struct {
tags []string
defaultTags map[string]string
greedyField bool
greedyMeasurement bool
separator string
}
// NewTemplate returns a new template ensuring it has a measurement
// specified.
func NewTemplate(pattern string, defaultTags map[string]string, separator string) (*template, error) {
tags := strings.Split(pattern, ".")
hasMeasurement := false
template := &template{tags: tags, defaultTags: defaultTags, separator: separator}
for _, tag := range tags {
if strings.HasPrefix(tag, "measurement") {
hasMeasurement = true
}
if tag == "measurement*" {
template.greedyMeasurement = true
} else if tag == "field*" {
template.greedyField = true
}
}
if !hasMeasurement {
return nil, fmt.Errorf("no measurement specified for template. %q", pattern)
}
return template, nil
}
// Apply extracts the template fields from the given line and returns the measurement
// name and tags
func (t *template) Apply(line string) (string, map[string]string, string, error) {
fields := strings.Split(line, ".")
var (
measurement []string
tags = make(map[string][]string)
field []string
)
// Set any default tags
for k, v := range t.defaultTags {
tags[k] = append(tags[k], v)
}
// See if an invalid combination has been specified in the template:
for _, tag := range t.tags {
if tag == "measurement*" {
t.greedyMeasurement = true
} else if tag == "field*" {
t.greedyField = true
}
}
if t.greedyField && t.greedyMeasurement {
return "", nil, "",
fmt.Errorf("either 'field*' or 'measurement*' can be used in each "+
"template (but not both together): %q",
strings.Join(t.tags, t.separator))
}
for i, tag := range t.tags {
if i >= len(fields) {
continue
}
if tag == "" {
continue
}
switch tag {
case "measurement":
measurement = append(measurement, fields[i])
case "field":
field = append(field, fields[i])
case "field*":
field = append(field, fields[i:]...)
break
case "measurement*":
measurement = append(measurement, fields[i:]...)
break
default:
tags[tag] = append(tags[tag], fields[i])
}
}
// Convert to map of strings.
outtags := make(map[string]string)
for k, values := range tags {
outtags[k] = strings.Join(values, t.separator)
}
return strings.Join(measurement, t.separator), outtags, strings.Join(field, t.separator), nil
}
// matcher determines which template should be applied to a given metric
// based on a filter tree.
type matcher struct {
root *node
defaultTemplate *template
}
func newMatcher() *matcher {
return &matcher{
root: &node{},
}
}
// Add inserts the template in the filter tree based the given filter
func (m *matcher) Add(filter string, template *template) {
if filter == "" {
m.AddDefaultTemplate(template)
return
}
m.root.Insert(filter, template)
}
func (m *matcher) AddDefaultTemplate(template *template) {
m.defaultTemplate = template
}
// Match returns the template that matches the given graphite line
func (m *matcher) Match(line string) *template {
tmpl := m.root.Search(line)
if tmpl != nil {
return tmpl
}
return m.defaultTemplate
}
// node is an item in a sorted k-ary tree. Each child is sorted by its value.
// The special value of "*", is always last.
type node struct {
value string
children nodes
template *template
}
func (n *node) insert(values []string, template *template) {
// Add the end, set the template
if len(values) == 0 {
n.template = template
return
}
// See if the the current element already exists in the tree. If so, insert the
// into that sub-tree
for _, v := range n.children {
if v.value == values[0] {
v.insert(values[1:], template)
return
}
}
// New element, add it to the tree and sort the children
newNode := &node{value: values[0]}
n.children = append(n.children, newNode)
sort.Sort(&n.children)
// Now insert the rest of the tree into the new element
newNode.insert(values[1:], template)
}
// Insert inserts the given string template into the tree. The filter string is separated
// on "." and each part is used as the path in the tree.
func (n *node) Insert(filter string, template *template) {
n.insert(strings.Split(filter, "."), template)
}
func (n *node) search(lineParts []string) *template {
// Nothing to search
if len(lineParts) == 0 || len(n.children) == 0 {
return n.template
}
// If last element is a wildcard, don't include in this search since it's sorted
// to the end but lexicographically it would not always be and sort.Search assumes
// the slice is sorted.
length := len(n.children)
if n.children[length-1].value == "*" {
length--
}
// Find the index of child with an exact match
i := sort.Search(length, func(i int) bool {
return n.children[i].value >= lineParts[0]
})
// Found an exact match, so search that child sub-tree
if i < len(n.children) && n.children[i].value == lineParts[0] {
return n.children[i].search(lineParts[1:])
}
// Not an exact match, see if we have a wildcard child to search
if n.children[len(n.children)-1].value == "*" {
return n.children[len(n.children)-1].search(lineParts[1:])
}
return n.template
}
func (n *node) Search(line string) *template {
return n.search(strings.Split(line, "."))
}
type nodes []*node
// Less returns a boolean indicating whether the filter at position j
// is less than the filter at position k. Filters are order by string
// comparison of each component parts. A wildcard value "*" is never
// less than a non-wildcard value.
//
// For example, the filters:
// "*.*"
// "servers.*"
// "servers.localhost"
// "*.localhost"
//
// Would be sorted as:
// "servers.localhost"
// "servers.*"
// "*.localhost"
// "*.*"
func (n *nodes) Less(j, k int) bool {
if (*n)[j].value == "*" && (*n)[k].value != "*" {
return false
}
if (*n)[j].value != "*" && (*n)[k].value == "*" {
return true
}
return (*n)[j].value < (*n)[k].value
}
func (n *nodes) Swap(i, j int) { (*n)[i], (*n)[j] = (*n)[j], (*n)[i] }
func (n *nodes) Len() int { return len(*n) }
type parsedTemplate struct {
template string
filter string
tagstring string
}
type parsedTemplates []parsedTemplate
func (e parsedTemplates) Less(j, k int) bool {
if len(e[j].filter) == 0 && len(e[k].filter) == 0 {
nj := len(strings.Split(e[j].template, "."))
nk := len(strings.Split(e[k].template, "."))
return nj < nk
}
if len(e[j].filter) == 0 {
return true
}
if len(e[k].filter) == 0 {
return false
}
nj := len(strings.Split(e[j].template, "."))
nk := len(strings.Split(e[k].template, "."))
return nj < nk
}
func (e parsedTemplates) Swap(i, j int) { e[i], e[j] = e[j], e[i] }
func (e parsedTemplates) Len() int { return len(e) }

View File

@@ -6,6 +6,7 @@ import (
"testing"
"time"
"github.com/influxdata/telegraf/internal/templating"
"github.com/influxdata/telegraf/metric"
"github.com/stretchr/testify/assert"
@@ -118,7 +119,7 @@ func TestTemplateApply(t *testing.T) {
}
for _, test := range tests {
tmpl, err := NewTemplate(test.template, nil, DefaultSeparator)
tmpl, err := templating.NewDefaultTemplateWithPattern(test.template)
if errstr(err) != test.err {
t.Fatalf("err does not match. expected %v, got %v", test.err, err)
}
@@ -127,7 +128,7 @@ func TestTemplateApply(t *testing.T) {
continue
}
measurement, tags, _, _ := tmpl.Apply(test.input)
measurement, tags, _, _ := tmpl.Apply(test.input, DefaultSeparator)
if measurement != test.measurement {
t.Fatalf("name parse failer. expected %v, got %v", test.measurement, measurement)
}