468 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			468 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Go
		
	
	
	
package graphite
 | 
						|
 | 
						|
import (
 | 
						|
	"bufio"
 | 
						|
	"bytes"
 | 
						|
	"fmt"
 | 
						|
	"io"
 | 
						|
	"math"
 | 
						|
	"sort"
 | 
						|
	"strconv"
 | 
						|
	"strings"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/influxdata/telegraf"
 | 
						|
)
 | 
						|
 | 
						|
// Minimum and maximum supported dates for timestamps.
 | 
						|
var (
 | 
						|
	MinDate = time.Date(1901, 12, 13, 0, 0, 0, 0, time.UTC)
 | 
						|
	MaxDate = time.Date(2038, 1, 19, 0, 0, 0, 0, time.UTC)
 | 
						|
)
 | 
						|
 | 
						|
// Parser encapsulates a Graphite Parser.
 | 
						|
type GraphiteParser struct {
 | 
						|
	Separator   string
 | 
						|
	Templates   []string
 | 
						|
	DefaultTags map[string]string
 | 
						|
 | 
						|
	matcher *matcher
 | 
						|
}
 | 
						|
 | 
						|
func (p *GraphiteParser) SetDefaultTags(tags map[string]string) {
 | 
						|
	p.DefaultTags = tags
 | 
						|
}
 | 
						|
 | 
						|
func NewGraphiteParser(
 | 
						|
	separator string,
 | 
						|
	templates []string,
 | 
						|
	defaultTags map[string]string,
 | 
						|
) (*GraphiteParser, error) {
 | 
						|
	var err error
 | 
						|
 | 
						|
	if separator == "" {
 | 
						|
		separator = DefaultSeparator
 | 
						|
	}
 | 
						|
	p := &GraphiteParser{
 | 
						|
		Separator: separator,
 | 
						|
		Templates: templates,
 | 
						|
	}
 | 
						|
 | 
						|
	if defaultTags != nil {
 | 
						|
		p.DefaultTags = defaultTags
 | 
						|
	}
 | 
						|
 | 
						|
	matcher := newMatcher()
 | 
						|
	p.matcher = matcher
 | 
						|
	defaultTemplate, _ := NewTemplate("measurement*", nil, p.Separator)
 | 
						|
	matcher.AddDefaultTemplate(defaultTemplate)
 | 
						|
 | 
						|
	for _, pattern := range p.Templates {
 | 
						|
		template := pattern
 | 
						|
		filter := ""
 | 
						|
		// Format is [filter] <template> [tag1=value1,tag2=value2]
 | 
						|
		parts := strings.Fields(pattern)
 | 
						|
		if len(parts) < 1 {
 | 
						|
			continue
 | 
						|
		} else if len(parts) >= 2 {
 | 
						|
			if strings.Contains(parts[1], "=") {
 | 
						|
				template = parts[0]
 | 
						|
			} else {
 | 
						|
				filter = parts[0]
 | 
						|
				template = parts[1]
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		// Parse out the default tags specific to this template
 | 
						|
		tags := map[string]string{}
 | 
						|
		if strings.Contains(parts[len(parts)-1], "=") {
 | 
						|
			tagStrs := strings.Split(parts[len(parts)-1], ",")
 | 
						|
			for _, kv := range tagStrs {
 | 
						|
				parts := strings.Split(kv, "=")
 | 
						|
				tags[parts[0]] = parts[1]
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		tmpl, err1 := NewTemplate(template, tags, p.Separator)
 | 
						|
		if err1 != nil {
 | 
						|
			err = err1
 | 
						|
			break
 | 
						|
		}
 | 
						|
		matcher.Add(filter, tmpl)
 | 
						|
	}
 | 
						|
 | 
						|
	if err != nil {
 | 
						|
		return p, fmt.Errorf("exec input parser config is error: %s ", err.Error())
 | 
						|
	} else {
 | 
						|
		return p, nil
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (p *GraphiteParser) Parse(buf []byte) ([]telegraf.Metric, error) {
 | 
						|
	// parse even if the buffer begins with a newline
 | 
						|
	buf = bytes.TrimPrefix(buf, []byte("\n"))
 | 
						|
	// add newline to end if not exists:
 | 
						|
	if len(buf) > 0 && !bytes.HasSuffix(buf, []byte("\n")) {
 | 
						|
		buf = append(buf, []byte("\n")...)
 | 
						|
	}
 | 
						|
 | 
						|
	metrics := make([]telegraf.Metric, 0)
 | 
						|
 | 
						|
	var errStr string
 | 
						|
	buffer := bytes.NewBuffer(buf)
 | 
						|
	reader := bufio.NewReader(buffer)
 | 
						|
	for {
 | 
						|
		// Read up to the next newline.
 | 
						|
		buf, err := reader.ReadBytes('\n')
 | 
						|
		if err == io.EOF {
 | 
						|
			break
 | 
						|
		}
 | 
						|
		if err != nil && err != io.EOF {
 | 
						|
			return metrics, err
 | 
						|
		}
 | 
						|
 | 
						|
		// Trim the buffer, even though there should be no padding
 | 
						|
		line := strings.TrimSpace(string(buf))
 | 
						|
		metric, err := p.ParseLine(line)
 | 
						|
 | 
						|
		if err == nil {
 | 
						|
			metrics = append(metrics, metric)
 | 
						|
		} else {
 | 
						|
			errStr += err.Error() + "\n"
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	if errStr != "" {
 | 
						|
		return metrics, fmt.Errorf(strings.TrimSpace(errStr))
 | 
						|
	}
 | 
						|
	return metrics, nil
 | 
						|
}
 | 
						|
 | 
						|
// Parse performs Graphite parsing of a single line.
 | 
						|
func (p *GraphiteParser) ParseLine(line string) (telegraf.Metric, error) {
 | 
						|
	// Break into 3 fields (name, value, timestamp).
 | 
						|
	fields := strings.Fields(line)
 | 
						|
	if len(fields) != 2 && len(fields) != 3 {
 | 
						|
		return nil, fmt.Errorf("received %q which doesn't have required fields", line)
 | 
						|
	}
 | 
						|
 | 
						|
	// decode the name and tags
 | 
						|
	template := p.matcher.Match(fields[0])
 | 
						|
	measurement, tags, field, err := template.Apply(fields[0])
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	// Could not extract measurement, use the raw value
 | 
						|
	if measurement == "" {
 | 
						|
		measurement = fields[0]
 | 
						|
	}
 | 
						|
 | 
						|
	// Parse value.
 | 
						|
	v, err := strconv.ParseFloat(fields[1], 64)
 | 
						|
	if err != nil {
 | 
						|
		return nil, fmt.Errorf(`field "%s" value: %s`, fields[0], err)
 | 
						|
	}
 | 
						|
 | 
						|
	if math.IsNaN(v) || math.IsInf(v, 0) {
 | 
						|
		return nil, &UnsupposedValueError{Field: fields[0], Value: v}
 | 
						|
	}
 | 
						|
 | 
						|
	fieldValues := map[string]interface{}{}
 | 
						|
	if field != "" {
 | 
						|
		fieldValues[field] = v
 | 
						|
	} else {
 | 
						|
		fieldValues["value"] = v
 | 
						|
	}
 | 
						|
 | 
						|
	// If no 3rd field, use now as timestamp
 | 
						|
	timestamp := time.Now().UTC()
 | 
						|
 | 
						|
	if len(fields) == 3 {
 | 
						|
		// Parse timestamp.
 | 
						|
		unixTime, err := strconv.ParseFloat(fields[2], 64)
 | 
						|
		if err != nil {
 | 
						|
			return nil, fmt.Errorf(`field "%s" time: %s`, fields[0], err)
 | 
						|
		}
 | 
						|
 | 
						|
		// -1 is a special value that gets converted to current UTC time
 | 
						|
		// See https://github.com/graphite-project/carbon/issues/54
 | 
						|
		if unixTime != float64(-1) {
 | 
						|
			// Check if we have fractional seconds
 | 
						|
			timestamp = time.Unix(int64(unixTime), int64((unixTime-math.Floor(unixTime))*float64(time.Second)))
 | 
						|
			if timestamp.Before(MinDate) || timestamp.After(MaxDate) {
 | 
						|
				return nil, fmt.Errorf("timestamp out of range")
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
	// Set the default tags on the point if they are not already set
 | 
						|
	for k, v := range p.DefaultTags {
 | 
						|
		if _, ok := tags[k]; !ok {
 | 
						|
			tags[k] = v
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return telegraf.NewMetric(measurement, tags, fieldValues, timestamp)
 | 
						|
}
 | 
						|
 | 
						|
// ApplyTemplate extracts the template fields from the given line and
 | 
						|
// returns the measurement name and tags.
 | 
						|
func (p *GraphiteParser) ApplyTemplate(line string) (string, map[string]string, string, error) {
 | 
						|
	// Break line into fields (name, value, timestamp), only name is used
 | 
						|
	fields := strings.Fields(line)
 | 
						|
	if len(fields) == 0 {
 | 
						|
		return "", make(map[string]string), "", nil
 | 
						|
	}
 | 
						|
	// decode the name and tags
 | 
						|
	template := p.matcher.Match(fields[0])
 | 
						|
	name, tags, field, err := template.Apply(fields[0])
 | 
						|
 | 
						|
	// Set the default tags on the point if they are not already set
 | 
						|
	for k, v := range p.DefaultTags {
 | 
						|
		if _, ok := tags[k]; !ok {
 | 
						|
			tags[k] = v
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return name, tags, field, err
 | 
						|
}
 | 
						|
 | 
						|
// template represents a pattern and tags to map a graphite metric string to a influxdb Point
 | 
						|
type template struct {
 | 
						|
	tags              []string
 | 
						|
	defaultTags       map[string]string
 | 
						|
	greedyField       bool
 | 
						|
	greedyMeasurement bool
 | 
						|
	separator         string
 | 
						|
}
 | 
						|
 | 
						|
// NewTemplate returns a new template ensuring it has a measurement
 | 
						|
// specified.
 | 
						|
func NewTemplate(pattern string, defaultTags map[string]string, separator string) (*template, error) {
 | 
						|
	tags := strings.Split(pattern, ".")
 | 
						|
	hasMeasurement := false
 | 
						|
	template := &template{tags: tags, defaultTags: defaultTags, separator: separator}
 | 
						|
 | 
						|
	for _, tag := range tags {
 | 
						|
		if strings.HasPrefix(tag, "measurement") {
 | 
						|
			hasMeasurement = true
 | 
						|
		}
 | 
						|
		if tag == "measurement*" {
 | 
						|
			template.greedyMeasurement = true
 | 
						|
		} else if tag == "field*" {
 | 
						|
			template.greedyField = true
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	if !hasMeasurement {
 | 
						|
		return nil, fmt.Errorf("no measurement specified for template. %q", pattern)
 | 
						|
	}
 | 
						|
 | 
						|
	return template, nil
 | 
						|
}
 | 
						|
 | 
						|
// Apply extracts the template fields from the given line and returns the measurement
 | 
						|
// name and tags
 | 
						|
func (t *template) Apply(line string) (string, map[string]string, string, error) {
 | 
						|
	fields := strings.Split(line, ".")
 | 
						|
	var (
 | 
						|
		measurement []string
 | 
						|
		tags        = make(map[string][]string)
 | 
						|
		field       []string
 | 
						|
	)
 | 
						|
 | 
						|
	// Set any default tags
 | 
						|
	for k, v := range t.defaultTags {
 | 
						|
		tags[k] = append(tags[k], v)
 | 
						|
	}
 | 
						|
 | 
						|
	// See if an invalid combination has been specified in the template:
 | 
						|
	for _, tag := range t.tags {
 | 
						|
		if tag == "measurement*" {
 | 
						|
			t.greedyMeasurement = true
 | 
						|
		} else if tag == "field*" {
 | 
						|
			t.greedyField = true
 | 
						|
		}
 | 
						|
	}
 | 
						|
	if t.greedyField && t.greedyMeasurement {
 | 
						|
		return "", nil, "",
 | 
						|
			fmt.Errorf("either 'field*' or 'measurement*' can be used in each "+
 | 
						|
				"template (but not both together): %q",
 | 
						|
				strings.Join(t.tags, t.separator))
 | 
						|
	}
 | 
						|
 | 
						|
	for i, tag := range t.tags {
 | 
						|
		if i >= len(fields) {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		if tag == "" {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		switch tag {
 | 
						|
		case "measurement":
 | 
						|
			measurement = append(measurement, fields[i])
 | 
						|
		case "field":
 | 
						|
			field = append(field, fields[i])
 | 
						|
		case "field*":
 | 
						|
			field = append(field, fields[i:]...)
 | 
						|
			break
 | 
						|
		case "measurement*":
 | 
						|
			measurement = append(measurement, fields[i:]...)
 | 
						|
			break
 | 
						|
		default:
 | 
						|
			tags[tag] = append(tags[tag], fields[i])
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	// Convert to map of strings.
 | 
						|
	outtags := make(map[string]string)
 | 
						|
	for k, values := range tags {
 | 
						|
		outtags[k] = strings.Join(values, t.separator)
 | 
						|
	}
 | 
						|
 | 
						|
	return strings.Join(measurement, t.separator), outtags, strings.Join(field, t.separator), nil
 | 
						|
}
 | 
						|
 | 
						|
// matcher determines which template should be applied to a given metric
 | 
						|
// based on a filter tree.
 | 
						|
type matcher struct {
 | 
						|
	root            *node
 | 
						|
	defaultTemplate *template
 | 
						|
}
 | 
						|
 | 
						|
func newMatcher() *matcher {
 | 
						|
	return &matcher{
 | 
						|
		root: &node{},
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// Add inserts the template in the filter tree based the given filter
 | 
						|
func (m *matcher) Add(filter string, template *template) {
 | 
						|
	if filter == "" {
 | 
						|
		m.AddDefaultTemplate(template)
 | 
						|
		return
 | 
						|
	}
 | 
						|
	m.root.Insert(filter, template)
 | 
						|
}
 | 
						|
 | 
						|
func (m *matcher) AddDefaultTemplate(template *template) {
 | 
						|
	m.defaultTemplate = template
 | 
						|
}
 | 
						|
 | 
						|
// Match returns the template that matches the given graphite line
 | 
						|
func (m *matcher) Match(line string) *template {
 | 
						|
	tmpl := m.root.Search(line)
 | 
						|
	if tmpl != nil {
 | 
						|
		return tmpl
 | 
						|
	}
 | 
						|
 | 
						|
	return m.defaultTemplate
 | 
						|
}
 | 
						|
 | 
						|
// node is an item in a sorted k-ary tree.  Each child is sorted by its value.
 | 
						|
// The special value of "*", is always last.
 | 
						|
type node struct {
 | 
						|
	value    string
 | 
						|
	children nodes
 | 
						|
	template *template
 | 
						|
}
 | 
						|
 | 
						|
func (n *node) insert(values []string, template *template) {
 | 
						|
	// Add the end, set the template
 | 
						|
	if len(values) == 0 {
 | 
						|
		n.template = template
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	// See if the the current element already exists in the tree. If so, insert the
 | 
						|
	// into that sub-tree
 | 
						|
	for _, v := range n.children {
 | 
						|
		if v.value == values[0] {
 | 
						|
			v.insert(values[1:], template)
 | 
						|
			return
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	// New element, add it to the tree and sort the children
 | 
						|
	newNode := &node{value: values[0]}
 | 
						|
	n.children = append(n.children, newNode)
 | 
						|
	sort.Sort(&n.children)
 | 
						|
 | 
						|
	// Now insert the rest of the tree into the new element
 | 
						|
	newNode.insert(values[1:], template)
 | 
						|
}
 | 
						|
 | 
						|
// Insert inserts the given string template into the tree.  The filter string is separated
 | 
						|
// on "." and each part is used as the path in the tree.
 | 
						|
func (n *node) Insert(filter string, template *template) {
 | 
						|
	n.insert(strings.Split(filter, "."), template)
 | 
						|
}
 | 
						|
 | 
						|
func (n *node) search(lineParts []string) *template {
 | 
						|
	// Nothing to search
 | 
						|
	if len(lineParts) == 0 || len(n.children) == 0 {
 | 
						|
		return n.template
 | 
						|
	}
 | 
						|
 | 
						|
	// If last element is a wildcard, don't include in this search since it's sorted
 | 
						|
	// to the end but lexicographically it would not always be and sort.Search assumes
 | 
						|
	// the slice is sorted.
 | 
						|
	length := len(n.children)
 | 
						|
	if n.children[length-1].value == "*" {
 | 
						|
		length--
 | 
						|
	}
 | 
						|
 | 
						|
	// Find the index of child with an exact match
 | 
						|
	i := sort.Search(length, func(i int) bool {
 | 
						|
		return n.children[i].value >= lineParts[0]
 | 
						|
	})
 | 
						|
 | 
						|
	// Found an exact match, so search that child sub-tree
 | 
						|
	if i < len(n.children) && n.children[i].value == lineParts[0] {
 | 
						|
		return n.children[i].search(lineParts[1:])
 | 
						|
	}
 | 
						|
	// Not an exact match, see if we have a wildcard child to search
 | 
						|
	if n.children[len(n.children)-1].value == "*" {
 | 
						|
		return n.children[len(n.children)-1].search(lineParts[1:])
 | 
						|
	}
 | 
						|
	return n.template
 | 
						|
}
 | 
						|
 | 
						|
func (n *node) Search(line string) *template {
 | 
						|
	return n.search(strings.Split(line, "."))
 | 
						|
}
 | 
						|
 | 
						|
type nodes []*node
 | 
						|
 | 
						|
// Less returns a boolean indicating whether the filter at position j
 | 
						|
// is less than the filter at position k.  Filters are order by string
 | 
						|
// comparison of each component parts.  A wildcard value "*" is never
 | 
						|
// less than a non-wildcard value.
 | 
						|
//
 | 
						|
// For example, the filters:
 | 
						|
//             "*.*"
 | 
						|
//             "servers.*"
 | 
						|
//             "servers.localhost"
 | 
						|
//             "*.localhost"
 | 
						|
//
 | 
						|
// Would be sorted as:
 | 
						|
//             "servers.localhost"
 | 
						|
//             "servers.*"
 | 
						|
//             "*.localhost"
 | 
						|
//             "*.*"
 | 
						|
func (n *nodes) Less(j, k int) bool {
 | 
						|
	if (*n)[j].value == "*" && (*n)[k].value != "*" {
 | 
						|
		return false
 | 
						|
	}
 | 
						|
 | 
						|
	if (*n)[j].value != "*" && (*n)[k].value == "*" {
 | 
						|
		return true
 | 
						|
	}
 | 
						|
 | 
						|
	return (*n)[j].value < (*n)[k].value
 | 
						|
}
 | 
						|
 | 
						|
func (n *nodes) Swap(i, j int) { (*n)[i], (*n)[j] = (*n)[j], (*n)[i] }
 | 
						|
func (n *nodes) Len() int      { return len(*n) }
 |