Add support for dropwizard input format (#2846)
This commit is contained in:
parent
9cfa3b292b
commit
317de40ac4
2
Godeps
2
Godeps
|
@ -72,6 +72,8 @@ github.com/StackExchange/wmi f3e2bae1e0cb5aef83e319133eabfee30013a4a5
|
||||||
github.com/streadway/amqp 63795daa9a446c920826655f26ba31c81c860fd6
|
github.com/streadway/amqp 63795daa9a446c920826655f26ba31c81c860fd6
|
||||||
github.com/stretchr/objx 1a9d0bb9f541897e62256577b352fdbc1fb4fd94
|
github.com/stretchr/objx 1a9d0bb9f541897e62256577b352fdbc1fb4fd94
|
||||||
github.com/stretchr/testify 4d4bfba8f1d1027c4fdbe371823030df51419987
|
github.com/stretchr/testify 4d4bfba8f1d1027c4fdbe371823030df51419987
|
||||||
|
github.com/tidwall/gjson 0623bd8fbdbf97cc62b98d15108832851a658e59
|
||||||
|
github.com/tidwall/match 173748da739a410c5b0b813b956f89ff94730b4c
|
||||||
github.com/vjeantet/grok d73e972b60935c7fec0b4ffbc904ed39ecaf7efe
|
github.com/vjeantet/grok d73e972b60935c7fec0b4ffbc904ed39ecaf7efe
|
||||||
github.com/wvanbergen/kafka bc265fedb9ff5b5c5d3c0fdcef4a819b3523d3ee
|
github.com/wvanbergen/kafka bc265fedb9ff5b5c5d3c0fdcef4a819b3523d3ee
|
||||||
github.com/wvanbergen/kazoo-go 968957352185472eacb69215fa3dbfcfdbac1096
|
github.com/wvanbergen/kazoo-go 968957352185472eacb69215fa3dbfcfdbac1096
|
||||||
|
|
|
@ -256,6 +256,7 @@ formats may be used with input plugins supporting the `data_format` option:
|
||||||
* [Value](./docs/DATA_FORMATS_INPUT.md#value)
|
* [Value](./docs/DATA_FORMATS_INPUT.md#value)
|
||||||
* [Nagios](./docs/DATA_FORMATS_INPUT.md#nagios)
|
* [Nagios](./docs/DATA_FORMATS_INPUT.md#nagios)
|
||||||
* [Collectd](./docs/DATA_FORMATS_INPUT.md#collectd)
|
* [Collectd](./docs/DATA_FORMATS_INPUT.md#collectd)
|
||||||
|
* [Dropwizard](./docs/DATA_FORMATS_INPUT.md#dropwizard)
|
||||||
|
|
||||||
## Processor Plugins
|
## Processor Plugins
|
||||||
|
|
||||||
|
|
|
@ -8,6 +8,7 @@ Telegraf is able to parse the following input data formats into metrics:
|
||||||
1. [Value](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#value), ie: 45 or "booyah"
|
1. [Value](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#value), ie: 45 or "booyah"
|
||||||
1. [Nagios](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#nagios) (exec input only)
|
1. [Nagios](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#nagios) (exec input only)
|
||||||
1. [Collectd](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#collectd)
|
1. [Collectd](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#collectd)
|
||||||
|
1. [Dropwizard](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#dropwizard)
|
||||||
|
|
||||||
Telegraf metrics, like InfluxDB
|
Telegraf metrics, like InfluxDB
|
||||||
[points](https://docs.influxdata.com/influxdb/v0.10/write_protocols/line/),
|
[points](https://docs.influxdata.com/influxdb/v0.10/write_protocols/line/),
|
||||||
|
@ -479,3 +480,176 @@ You can also change the path to the typesdb or add additional typesdb using
|
||||||
## Path of to TypesDB specifications
|
## Path of to TypesDB specifications
|
||||||
collectd_typesdb = ["/usr/share/collectd/types.db"]
|
collectd_typesdb = ["/usr/share/collectd/types.db"]
|
||||||
```
|
```
|
||||||
|
|
||||||
|
# Dropwizard:
|
||||||
|
|
||||||
|
The dropwizard format can parse the JSON representation of a single dropwizard metric registry. By default, tags are parsed from metric names as if they were actual influxdb line protocol keys (`measurement<,tag_set>`) which can be overriden by defining custom [measurement & tag templates](./DATA_FORMATS_INPUT.md#measurement--tag-templates). All field values are collected as float64 fields.
|
||||||
|
|
||||||
|
A typical JSON of a dropwizard metric registry:
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"version": "3.0.0",
|
||||||
|
"counters" : {
|
||||||
|
"measurement,tag1=green" : {
|
||||||
|
"count" : 1
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"meters" : {
|
||||||
|
"measurement" : {
|
||||||
|
"count" : 1,
|
||||||
|
"m15_rate" : 1.0,
|
||||||
|
"m1_rate" : 1.0,
|
||||||
|
"m5_rate" : 1.0,
|
||||||
|
"mean_rate" : 1.0,
|
||||||
|
"units" : "events/second"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"gauges" : {
|
||||||
|
"measurement" : {
|
||||||
|
"value" : 1
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"histograms" : {
|
||||||
|
"measurement" : {
|
||||||
|
"count" : 1,
|
||||||
|
"max" : 1.0,
|
||||||
|
"mean" : 1.0,
|
||||||
|
"min" : 1.0,
|
||||||
|
"p50" : 1.0,
|
||||||
|
"p75" : 1.0,
|
||||||
|
"p95" : 1.0,
|
||||||
|
"p98" : 1.0,
|
||||||
|
"p99" : 1.0,
|
||||||
|
"p999" : 1.0,
|
||||||
|
"stddev" : 1.0
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"timers" : {
|
||||||
|
"measurement" : {
|
||||||
|
"count" : 1,
|
||||||
|
"max" : 1.0,
|
||||||
|
"mean" : 1.0,
|
||||||
|
"min" : 1.0,
|
||||||
|
"p50" : 1.0,
|
||||||
|
"p75" : 1.0,
|
||||||
|
"p95" : 1.0,
|
||||||
|
"p98" : 1.0,
|
||||||
|
"p99" : 1.0,
|
||||||
|
"p999" : 1.0,
|
||||||
|
"stddev" : 1.0,
|
||||||
|
"m15_rate" : 1.0,
|
||||||
|
"m1_rate" : 1.0,
|
||||||
|
"m5_rate" : 1.0,
|
||||||
|
"mean_rate" : 1.0,
|
||||||
|
"duration_units" : "seconds",
|
||||||
|
"rate_units" : "calls/second"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
Would get translated into 4 different measurements:
|
||||||
|
|
||||||
|
```
|
||||||
|
measurement,metric_type=counter,tag1=green count=1
|
||||||
|
measurement,metric_type=meter count=1,m15_rate=1.0,m1_rate=1.0,m5_rate=1.0,mean_rate=1.0
|
||||||
|
measurement,metric_type=gauge value=1
|
||||||
|
measurement,metric_type=histogram count=1,max=1.0,mean=1.0,min=1.0,p50=1.0,p75=1.0,p95=1.0,p98=1.0,p99=1.0,p999=1.0
|
||||||
|
measurement,metric_type=timer count=1,max=1.0,mean=1.0,min=1.0,p50=1.0,p75=1.0,p95=1.0,p98=1.0,p99=1.0,p999=1.0,stddev=1.0,m15_rate=1.0,m1_rate=1.0,m5_rate=1.0,mean_rate=1.0
|
||||||
|
```
|
||||||
|
|
||||||
|
You may also parse a dropwizard registry from any JSON document which contains a dropwizard registry in some inner field.
|
||||||
|
Eg. to parse the following JSON document:
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"time" : "2017-02-22T14:33:03.662+02:00",
|
||||||
|
"tags" : {
|
||||||
|
"tag1" : "green",
|
||||||
|
"tag2" : "yellow"
|
||||||
|
},
|
||||||
|
"metrics" : {
|
||||||
|
"counters" : {
|
||||||
|
"measurement" : {
|
||||||
|
"count" : 1
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"meters" : {},
|
||||||
|
"gauges" : {},
|
||||||
|
"histograms" : {},
|
||||||
|
"timers" : {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
and translate it into:
|
||||||
|
|
||||||
|
```
|
||||||
|
measurement,metric_type=counter,tag1=green,tag2=yellow count=1 1487766783662000000
|
||||||
|
```
|
||||||
|
|
||||||
|
you simply need to use the following additional configuration properties:
|
||||||
|
|
||||||
|
```toml
|
||||||
|
dropwizard_metric_registry_path = "metrics"
|
||||||
|
dropwizard_time_path = "time"
|
||||||
|
dropwizard_time_format = "2006-01-02T15:04:05Z07:00"
|
||||||
|
dropwizard_tags_path = "tags"
|
||||||
|
## tag paths per tag are supported too, eg.
|
||||||
|
#[inputs.yourinput.dropwizard_tag_paths]
|
||||||
|
# tag1 = "tags.tag1"
|
||||||
|
# tag2 = "tags.tag2"
|
||||||
|
```
|
||||||
|
|
||||||
|
|
||||||
|
For more information about the dropwizard json format see
|
||||||
|
[here](http://metrics.dropwizard.io/3.1.0/manual/json/).
|
||||||
|
|
||||||
|
#### Dropwizard Configuration:
|
||||||
|
|
||||||
|
```toml
|
||||||
|
[[inputs.exec]]
|
||||||
|
## Commands array
|
||||||
|
commands = ["curl http://localhost:8080/sys/metrics"]
|
||||||
|
timeout = "5s"
|
||||||
|
|
||||||
|
## Data format to consume.
|
||||||
|
## Each data format has its own unique set of configuration options, read
|
||||||
|
## more about them here:
|
||||||
|
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
|
||||||
|
data_format = "dropwizard"
|
||||||
|
|
||||||
|
## Used by the templating engine to join matched values when cardinality is > 1
|
||||||
|
separator = "_"
|
||||||
|
|
||||||
|
## Each template line requires a template pattern. It can have an optional
|
||||||
|
## filter before the template and separated by spaces. It can also have optional extra
|
||||||
|
## tags following the template. Multiple tags should be separated by commas and no spaces
|
||||||
|
## similar to the line protocol format. There can be only one default template.
|
||||||
|
## Templates support below format:
|
||||||
|
## 1. filter + template
|
||||||
|
## 2. filter + template + extra tag(s)
|
||||||
|
## 3. filter + template with field key
|
||||||
|
## 4. default template
|
||||||
|
## By providing an empty template array, templating is disabled and measurements are parsed as influxdb line protocol keys (measurement<,tag_set>)
|
||||||
|
templates = []
|
||||||
|
|
||||||
|
## You may use an appropriate [gjson path](https://github.com/tidwall/gjson#path-syntax)
|
||||||
|
## to locate the metric registry within the JSON document
|
||||||
|
# dropwizard_metric_registry_path = "metrics"
|
||||||
|
|
||||||
|
## You may use an appropriate [gjson path](https://github.com/tidwall/gjson#path-syntax)
|
||||||
|
## to locate the default time of the measurements within the JSON document
|
||||||
|
# dropwizard_time_path = "time"
|
||||||
|
# dropwizard_time_format = "2006-01-02T15:04:05Z07:00"
|
||||||
|
|
||||||
|
## You may use an appropriate [gjson path](https://github.com/tidwall/gjson#path-syntax)
|
||||||
|
## to locate the tags map within the JSON document
|
||||||
|
# dropwizard_tags_path = "tags"
|
||||||
|
|
||||||
|
## You may even use tag paths per tag
|
||||||
|
# [inputs.exec.dropwizard_tag_paths]
|
||||||
|
# tag1 = "tags.tag1"
|
||||||
|
# tag2 = "tags.tag2"
|
||||||
|
|
||||||
|
```
|
|
@ -1272,6 +1272,47 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if node, ok := tbl.Fields["dropwizard_metric_registry_path"]; ok {
|
||||||
|
if kv, ok := node.(*ast.KeyValue); ok {
|
||||||
|
if str, ok := kv.Value.(*ast.String); ok {
|
||||||
|
c.DropwizardMetricRegistryPath = str.Value
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if node, ok := tbl.Fields["dropwizard_time_path"]; ok {
|
||||||
|
if kv, ok := node.(*ast.KeyValue); ok {
|
||||||
|
if str, ok := kv.Value.(*ast.String); ok {
|
||||||
|
c.DropwizardTimePath = str.Value
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if node, ok := tbl.Fields["dropwizard_time_format"]; ok {
|
||||||
|
if kv, ok := node.(*ast.KeyValue); ok {
|
||||||
|
if str, ok := kv.Value.(*ast.String); ok {
|
||||||
|
c.DropwizardTimeFormat = str.Value
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if node, ok := tbl.Fields["dropwizard_tags_path"]; ok {
|
||||||
|
if kv, ok := node.(*ast.KeyValue); ok {
|
||||||
|
if str, ok := kv.Value.(*ast.String); ok {
|
||||||
|
c.DropwizardTagsPath = str.Value
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
c.DropwizardTagPathsMap = make(map[string]string)
|
||||||
|
if node, ok := tbl.Fields["dropwizard_tag_paths"]; ok {
|
||||||
|
if subtbl, ok := node.(*ast.Table); ok {
|
||||||
|
for name, val := range subtbl.Fields {
|
||||||
|
if kv, ok := val.(*ast.KeyValue); ok {
|
||||||
|
if str, ok := kv.Value.(*ast.String); ok {
|
||||||
|
c.DropwizardTagPathsMap[name] = str.Value
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
c.MetricName = name
|
c.MetricName = name
|
||||||
|
|
||||||
delete(tbl.Fields, "data_format")
|
delete(tbl.Fields, "data_format")
|
||||||
|
@ -1282,6 +1323,11 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
|
||||||
delete(tbl.Fields, "collectd_auth_file")
|
delete(tbl.Fields, "collectd_auth_file")
|
||||||
delete(tbl.Fields, "collectd_security_level")
|
delete(tbl.Fields, "collectd_security_level")
|
||||||
delete(tbl.Fields, "collectd_typesdb")
|
delete(tbl.Fields, "collectd_typesdb")
|
||||||
|
delete(tbl.Fields, "dropwizard_metric_registry_path")
|
||||||
|
delete(tbl.Fields, "dropwizard_time_path")
|
||||||
|
delete(tbl.Fields, "dropwizard_time_format")
|
||||||
|
delete(tbl.Fields, "dropwizard_tags_path")
|
||||||
|
delete(tbl.Fields, "dropwizard_tag_paths")
|
||||||
|
|
||||||
return parsers.NewParser(c)
|
return parsers.NewParser(c)
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,86 @@
|
||||||
|
package templating
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sort"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// DefaultSeparator is the default separation character to use when separating template parts.
|
||||||
|
DefaultSeparator = "."
|
||||||
|
)
|
||||||
|
|
||||||
|
// Engine uses a Matcher to retrieve the appropriate template and applies the template
|
||||||
|
// to the input string
|
||||||
|
type Engine struct {
|
||||||
|
joiner string
|
||||||
|
matcher *matcher
|
||||||
|
}
|
||||||
|
|
||||||
|
// Apply extracts the template fields from the given line and returns the measurement
|
||||||
|
// name, tags and field name
|
||||||
|
func (e *Engine) Apply(line string) (string, map[string]string, string, error) {
|
||||||
|
return e.matcher.match(line).Apply(line, e.joiner)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewEngine creates a new templating engine
|
||||||
|
func NewEngine(joiner string, defaultTemplate *Template, templates []string) (*Engine, error) {
|
||||||
|
engine := Engine{
|
||||||
|
joiner: joiner,
|
||||||
|
matcher: newMatcher(defaultTemplate),
|
||||||
|
}
|
||||||
|
templateSpecs := parseTemplateSpecs(templates)
|
||||||
|
|
||||||
|
for _, templateSpec := range templateSpecs {
|
||||||
|
if err := engine.matcher.addSpec(templateSpec); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &engine, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseTemplateSpecs(templates []string) templateSpecs {
|
||||||
|
tmplts := templateSpecs{}
|
||||||
|
for _, pattern := range templates {
|
||||||
|
tmplt := templateSpec{
|
||||||
|
separator: DefaultSeparator,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Format is [separator] [filter] <template> [tag1=value1,tag2=value2]
|
||||||
|
parts := strings.Fields(pattern)
|
||||||
|
partsLength := len(parts)
|
||||||
|
if partsLength < 1 {
|
||||||
|
// ignore
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if partsLength == 1 {
|
||||||
|
tmplt.template = pattern
|
||||||
|
} else if partsLength == 4 {
|
||||||
|
tmplt.separator = parts[0]
|
||||||
|
tmplt.filter = parts[1]
|
||||||
|
tmplt.template = parts[2]
|
||||||
|
tmplt.tagstring = parts[3]
|
||||||
|
} else {
|
||||||
|
hasTagstring := strings.Contains(parts[partsLength-1], "=")
|
||||||
|
if hasTagstring {
|
||||||
|
tmplt.tagstring = parts[partsLength-1]
|
||||||
|
tmplt.template = parts[partsLength-2]
|
||||||
|
if partsLength == 3 {
|
||||||
|
tmplt.filter = parts[0]
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
tmplt.template = parts[partsLength-1]
|
||||||
|
if partsLength == 2 {
|
||||||
|
tmplt.filter = parts[0]
|
||||||
|
} else { // length == 3
|
||||||
|
tmplt.separator = parts[0]
|
||||||
|
tmplt.filter = parts[1]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tmplts = append(tmplts, tmplt)
|
||||||
|
}
|
||||||
|
sort.Sort(tmplts)
|
||||||
|
return tmplts
|
||||||
|
}
|
|
@ -0,0 +1,58 @@
|
||||||
|
package templating
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
// matcher determines which template should be applied to a given metric
|
||||||
|
// based on a filter tree.
|
||||||
|
type matcher struct {
|
||||||
|
root *node
|
||||||
|
defaultTemplate *Template
|
||||||
|
}
|
||||||
|
|
||||||
|
// newMatcher creates a new matcher.
|
||||||
|
func newMatcher(defaultTemplate *Template) *matcher {
|
||||||
|
return &matcher{
|
||||||
|
root: &node{},
|
||||||
|
defaultTemplate: defaultTemplate,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *matcher) addSpec(tmplt templateSpec) error {
|
||||||
|
// Parse out the default tags specific to this template
|
||||||
|
tags := map[string]string{}
|
||||||
|
if tmplt.tagstring != "" {
|
||||||
|
for _, kv := range strings.Split(tmplt.tagstring, ",") {
|
||||||
|
parts := strings.Split(kv, "=")
|
||||||
|
tags[parts[0]] = parts[1]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tmpl, err := NewTemplate(tmplt.separator, tmplt.template, tags)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
m.add(tmplt.filter, tmpl)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// add inserts the template in the filter tree based the given filter
|
||||||
|
func (m *matcher) add(filter string, template *Template) {
|
||||||
|
if filter == "" {
|
||||||
|
m.defaultTemplate = template
|
||||||
|
m.root.separator = template.separator
|
||||||
|
return
|
||||||
|
}
|
||||||
|
m.root.insert(filter, template)
|
||||||
|
}
|
||||||
|
|
||||||
|
// match returns the template that matches the given measurement line.
|
||||||
|
// If no template matches, the default template is returned.
|
||||||
|
func (m *matcher) match(line string) *Template {
|
||||||
|
tmpl := m.root.search(line)
|
||||||
|
if tmpl != nil {
|
||||||
|
return tmpl
|
||||||
|
}
|
||||||
|
return m.defaultTemplate
|
||||||
|
}
|
|
@ -0,0 +1,122 @@
|
||||||
|
package templating
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sort"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
// node is an item in a sorted k-ary tree of filter parts. Each child is sorted by its part value.
|
||||||
|
// The special value of "*", is always sorted last.
|
||||||
|
type node struct {
|
||||||
|
separator string
|
||||||
|
value string
|
||||||
|
children nodes
|
||||||
|
template *Template
|
||||||
|
}
|
||||||
|
|
||||||
|
// insert inserts the given string template into the tree. The filter string is separated
|
||||||
|
// on the template separator and each part is used as the path in the tree.
|
||||||
|
func (n *node) insert(filter string, template *Template) {
|
||||||
|
n.separator = template.separator
|
||||||
|
n.recursiveInsert(strings.Split(filter, n.separator), template)
|
||||||
|
}
|
||||||
|
|
||||||
|
// recursiveInsert does the actual recursive insertion
|
||||||
|
func (n *node) recursiveInsert(values []string, template *Template) {
|
||||||
|
// Add the end, set the template
|
||||||
|
if len(values) == 0 {
|
||||||
|
n.template = template
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// See if the the current element already exists in the tree. If so, insert the
|
||||||
|
// into that sub-tree
|
||||||
|
for _, v := range n.children {
|
||||||
|
if v.value == values[0] {
|
||||||
|
v.recursiveInsert(values[1:], template)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// New element, add it to the tree and sort the children
|
||||||
|
newNode := &node{value: values[0]}
|
||||||
|
n.children = append(n.children, newNode)
|
||||||
|
sort.Sort(&n.children)
|
||||||
|
|
||||||
|
// Now insert the rest of the tree into the new element
|
||||||
|
newNode.recursiveInsert(values[1:], template)
|
||||||
|
}
|
||||||
|
|
||||||
|
// search searches for a template matching the input string
|
||||||
|
func (n *node) search(line string) *Template {
|
||||||
|
separator := n.separator
|
||||||
|
return n.recursiveSearch(strings.Split(line, separator))
|
||||||
|
}
|
||||||
|
|
||||||
|
// recursiveSearch performs the actual recursive search
|
||||||
|
func (n *node) recursiveSearch(lineParts []string) *Template {
|
||||||
|
// Nothing to search
|
||||||
|
if len(lineParts) == 0 || len(n.children) == 0 {
|
||||||
|
return n.template
|
||||||
|
}
|
||||||
|
|
||||||
|
// If last element is a wildcard, don't include it in this search since it's sorted
|
||||||
|
// to the end but lexicographically it would not always be and sort.Search assumes
|
||||||
|
// the slice is sorted.
|
||||||
|
length := len(n.children)
|
||||||
|
if n.children[length-1].value == "*" {
|
||||||
|
length--
|
||||||
|
}
|
||||||
|
|
||||||
|
// Find the index of child with an exact match
|
||||||
|
i := sort.Search(length, func(i int) bool {
|
||||||
|
return n.children[i].value >= lineParts[0]
|
||||||
|
})
|
||||||
|
|
||||||
|
// Found an exact match, so search that child sub-tree
|
||||||
|
if i < len(n.children) && n.children[i].value == lineParts[0] {
|
||||||
|
return n.children[i].recursiveSearch(lineParts[1:])
|
||||||
|
}
|
||||||
|
// Not an exact match, see if we have a wildcard child to search
|
||||||
|
if n.children[len(n.children)-1].value == "*" {
|
||||||
|
return n.children[len(n.children)-1].recursiveSearch(lineParts[1:])
|
||||||
|
}
|
||||||
|
return n.template
|
||||||
|
}
|
||||||
|
|
||||||
|
// nodes is simply an array of nodes implementing the sorting interface.
|
||||||
|
type nodes []*node
|
||||||
|
|
||||||
|
// Less returns a boolean indicating whether the filter at position j
|
||||||
|
// is less than the filter at position k. Filters are order by string
|
||||||
|
// comparison of each component parts. A wildcard value "*" is never
|
||||||
|
// less than a non-wildcard value.
|
||||||
|
//
|
||||||
|
// For example, the filters:
|
||||||
|
// "*.*"
|
||||||
|
// "servers.*"
|
||||||
|
// "servers.localhost"
|
||||||
|
// "*.localhost"
|
||||||
|
//
|
||||||
|
// Would be sorted as:
|
||||||
|
// "servers.localhost"
|
||||||
|
// "servers.*"
|
||||||
|
// "*.localhost"
|
||||||
|
// "*.*"
|
||||||
|
func (n *nodes) Less(j, k int) bool {
|
||||||
|
if (*n)[j].value == "*" && (*n)[k].value != "*" {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if (*n)[j].value != "*" && (*n)[k].value == "*" {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
return (*n)[j].value < (*n)[k].value
|
||||||
|
}
|
||||||
|
|
||||||
|
// Swap swaps two elements of the array
|
||||||
|
func (n *nodes) Swap(i, j int) { (*n)[i], (*n)[j] = (*n)[j], (*n)[i] }
|
||||||
|
|
||||||
|
// Len returns the length of the array
|
||||||
|
func (n *nodes) Len() int { return len(*n) }
|
|
@ -0,0 +1,148 @@
|
||||||
|
package templating
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Template represents a pattern and tags to map a metric string to a influxdb Point
|
||||||
|
type Template struct {
|
||||||
|
separator string
|
||||||
|
parts []string
|
||||||
|
defaultTags map[string]string
|
||||||
|
greedyField bool
|
||||||
|
greedyMeasurement bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// apply extracts the template fields from the given line and returns the measurement
|
||||||
|
// name, tags and field name
|
||||||
|
func (t *Template) Apply(line string, joiner string) (string, map[string]string, string, error) {
|
||||||
|
fields := strings.Split(line, t.separator)
|
||||||
|
var (
|
||||||
|
measurement []string
|
||||||
|
tags = make(map[string][]string)
|
||||||
|
field []string
|
||||||
|
)
|
||||||
|
|
||||||
|
// Set any default tags
|
||||||
|
for k, v := range t.defaultTags {
|
||||||
|
tags[k] = append(tags[k], v)
|
||||||
|
}
|
||||||
|
|
||||||
|
// See if an invalid combination has been specified in the template:
|
||||||
|
for _, tag := range t.parts {
|
||||||
|
if tag == "measurement*" {
|
||||||
|
t.greedyMeasurement = true
|
||||||
|
} else if tag == "field*" {
|
||||||
|
t.greedyField = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if t.greedyField && t.greedyMeasurement {
|
||||||
|
return "", nil, "",
|
||||||
|
fmt.Errorf("either 'field*' or 'measurement*' can be used in each "+
|
||||||
|
"template (but not both together): %q",
|
||||||
|
strings.Join(t.parts, joiner))
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, tag := range t.parts {
|
||||||
|
if i >= len(fields) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if tag == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
switch tag {
|
||||||
|
case "measurement":
|
||||||
|
measurement = append(measurement, fields[i])
|
||||||
|
case "field":
|
||||||
|
field = append(field, fields[i])
|
||||||
|
case "field*":
|
||||||
|
field = append(field, fields[i:]...)
|
||||||
|
break
|
||||||
|
case "measurement*":
|
||||||
|
measurement = append(measurement, fields[i:]...)
|
||||||
|
break
|
||||||
|
default:
|
||||||
|
tags[tag] = append(tags[tag], fields[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Convert to map of strings.
|
||||||
|
outtags := make(map[string]string)
|
||||||
|
for k, values := range tags {
|
||||||
|
outtags[k] = strings.Join(values, joiner)
|
||||||
|
}
|
||||||
|
|
||||||
|
return strings.Join(measurement, joiner), outtags, strings.Join(field, joiner), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewDefaultTemplateWithPattern(pattern string) (*Template, error) {
|
||||||
|
return NewTemplate(DefaultSeparator, pattern, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewTemplate returns a new template ensuring it has a measurement
|
||||||
|
// specified.
|
||||||
|
func NewTemplate(separator string, pattern string, defaultTags map[string]string) (*Template, error) {
|
||||||
|
parts := strings.Split(pattern, separator)
|
||||||
|
hasMeasurement := false
|
||||||
|
template := &Template{
|
||||||
|
separator: separator,
|
||||||
|
parts: parts,
|
||||||
|
defaultTags: defaultTags,
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, part := range parts {
|
||||||
|
if strings.HasPrefix(part, "measurement") {
|
||||||
|
hasMeasurement = true
|
||||||
|
}
|
||||||
|
if part == "measurement*" {
|
||||||
|
template.greedyMeasurement = true
|
||||||
|
} else if part == "field*" {
|
||||||
|
template.greedyField = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !hasMeasurement {
|
||||||
|
return nil, fmt.Errorf("no measurement specified for template. %q", pattern)
|
||||||
|
}
|
||||||
|
|
||||||
|
return template, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// templateSpec is a template string split in its constituent parts
|
||||||
|
type templateSpec struct {
|
||||||
|
separator string
|
||||||
|
filter string
|
||||||
|
template string
|
||||||
|
tagstring string
|
||||||
|
}
|
||||||
|
|
||||||
|
// templateSpecs is simply an array of template specs implementing the sorting interface
|
||||||
|
type templateSpecs []templateSpec
|
||||||
|
|
||||||
|
// Less reports whether the element with
|
||||||
|
// index j should sort before the element with index k.
|
||||||
|
func (e templateSpecs) Less(j, k int) bool {
|
||||||
|
if len(e[j].filter) == 0 && len(e[k].filter) == 0 {
|
||||||
|
jlength := len(strings.Split(e[j].template, e[j].separator))
|
||||||
|
klength := len(strings.Split(e[k].template, e[k].separator))
|
||||||
|
return jlength < klength
|
||||||
|
}
|
||||||
|
if len(e[j].filter) == 0 {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
if len(e[k].filter) == 0 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
jlength := len(strings.Split(e[j].template, e[j].separator))
|
||||||
|
klength := len(strings.Split(e[k].template, e[k].separator))
|
||||||
|
return jlength < klength
|
||||||
|
}
|
||||||
|
|
||||||
|
// Swap swaps the elements with indexes i and j.
|
||||||
|
func (e templateSpecs) Swap(i, j int) { e[i], e[j] = e[j], e[i] }
|
||||||
|
|
||||||
|
// Len is the number of elements in the collection.
|
||||||
|
func (e templateSpecs) Len() int { return len(e) }
|
|
@ -0,0 +1,253 @@
|
||||||
|
package dropwizard
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/internal/templating"
|
||||||
|
"github.com/influxdata/telegraf/metric"
|
||||||
|
"github.com/tidwall/gjson"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Parser parses json inputs containing dropwizard metrics,
|
||||||
|
// either top-level or embedded inside a json field.
|
||||||
|
// This parser is using gjon for retrieving paths within the json file.
|
||||||
|
type Parser struct {
|
||||||
|
|
||||||
|
// an optional json path containing the metric registry object
|
||||||
|
// if left empty, the whole json object is parsed as a metric registry
|
||||||
|
MetricRegistryPath string
|
||||||
|
|
||||||
|
// an optional json path containing the default time of the metrics
|
||||||
|
// if left empty, or if cannot be parsed the current processing time is used as the time of the metrics
|
||||||
|
TimePath string
|
||||||
|
|
||||||
|
// time format to use for parsing the time field
|
||||||
|
// defaults to time.RFC3339
|
||||||
|
TimeFormat string
|
||||||
|
|
||||||
|
// an optional json path pointing to a json object with tag key/value pairs
|
||||||
|
// takes precedence over TagPathsMap
|
||||||
|
TagsPath string
|
||||||
|
|
||||||
|
// an optional map containing tag names as keys and json paths to retrieve the tag values from as values
|
||||||
|
// used if TagsPath is empty or doesn't return any tags
|
||||||
|
TagPathsMap map[string]string
|
||||||
|
|
||||||
|
// an optional map of default tags to use for metrics
|
||||||
|
DefaultTags map[string]string
|
||||||
|
|
||||||
|
// templating configuration
|
||||||
|
Separator string
|
||||||
|
Templates []string
|
||||||
|
|
||||||
|
templateEngine *templating.Engine
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse parses the input bytes to an array of metrics
|
||||||
|
func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
|
||||||
|
|
||||||
|
metrics := make([]telegraf.Metric, 0)
|
||||||
|
|
||||||
|
metricTime, err := p.parseTime(buf)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
dwr, err := p.unmarshalMetrics(buf)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
metrics = p.readDWMetrics("counter", dwr["counters"], metrics, metricTime)
|
||||||
|
metrics = p.readDWMetrics("meter", dwr["meters"], metrics, metricTime)
|
||||||
|
metrics = p.readDWMetrics("gauge", dwr["gauges"], metrics, metricTime)
|
||||||
|
metrics = p.readDWMetrics("histogram", dwr["histograms"], metrics, metricTime)
|
||||||
|
metrics = p.readDWMetrics("timer", dwr["timers"], metrics, metricTime)
|
||||||
|
|
||||||
|
jsonTags := p.readTags(buf)
|
||||||
|
|
||||||
|
// fill json tags first
|
||||||
|
if len(jsonTags) > 0 {
|
||||||
|
for _, m := range metrics {
|
||||||
|
for k, v := range jsonTags {
|
||||||
|
// only set the tag if it doesn't already exist:
|
||||||
|
if !m.HasTag(k) {
|
||||||
|
m.AddTag(k, v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// fill default tags last
|
||||||
|
if len(p.DefaultTags) > 0 {
|
||||||
|
for _, m := range metrics {
|
||||||
|
for k, v := range p.DefaultTags {
|
||||||
|
// only set the default tag if it doesn't already exist:
|
||||||
|
if !m.HasTag(k) {
|
||||||
|
m.AddTag(k, v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return metrics, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// InitTemplating initializes the templating support
|
||||||
|
func (p *Parser) InitTemplating() error {
|
||||||
|
if len(p.Templates) > 0 {
|
||||||
|
defaultTemplate, _ := templating.NewDefaultTemplateWithPattern("measurement*")
|
||||||
|
templateEngine, err := templating.NewEngine(p.Separator, defaultTemplate, p.Templates)
|
||||||
|
p.templateEngine = templateEngine
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ParseLine is not supported by the dropwizard format
|
||||||
|
func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
|
||||||
|
return nil, fmt.Errorf("ParseLine not supported: %s, for data format: dropwizard", line)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetDefaultTags sets the default tags
|
||||||
|
func (p *Parser) SetDefaultTags(tags map[string]string) {
|
||||||
|
p.DefaultTags = tags
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Parser) readTags(buf []byte) map[string]string {
|
||||||
|
|
||||||
|
if p.TagsPath != "" {
|
||||||
|
var tagsBytes []byte
|
||||||
|
tagsResult := gjson.GetBytes(buf, p.TagsPath)
|
||||||
|
if tagsResult.Index > 0 {
|
||||||
|
tagsBytes = buf[tagsResult.Index : tagsResult.Index+len(tagsResult.Raw)]
|
||||||
|
} else {
|
||||||
|
tagsBytes = []byte(tagsResult.Raw)
|
||||||
|
}
|
||||||
|
var tags map[string]string
|
||||||
|
err := json.Unmarshal(tagsBytes, &tags)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("W! failed to parse tags from JSON path '%s': %s\n", p.TagsPath, err)
|
||||||
|
} else if len(tags) > 0 {
|
||||||
|
return tags
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tags := make(map[string]string)
|
||||||
|
for tagKey, jsonPath := range p.TagPathsMap {
|
||||||
|
tags[tagKey] = gjson.GetBytes(buf, jsonPath).String()
|
||||||
|
}
|
||||||
|
return tags
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Parser) parseTime(buf []byte) (time.Time, error) {
|
||||||
|
|
||||||
|
if p.TimePath != "" {
|
||||||
|
timeFormat := p.TimeFormat
|
||||||
|
if timeFormat == "" {
|
||||||
|
timeFormat = time.RFC3339
|
||||||
|
}
|
||||||
|
timeString := gjson.GetBytes(buf, p.TimePath).String()
|
||||||
|
if timeString == "" {
|
||||||
|
err := fmt.Errorf("time not found in JSON path %s", p.TimePath)
|
||||||
|
return time.Now().UTC(), err
|
||||||
|
}
|
||||||
|
t, err := time.Parse(timeFormat, timeString)
|
||||||
|
if err != nil {
|
||||||
|
err = fmt.Errorf("time %s cannot be parsed with format %s, %s", timeString, timeFormat, err)
|
||||||
|
return time.Now().UTC(), err
|
||||||
|
}
|
||||||
|
return t.UTC(), nil
|
||||||
|
}
|
||||||
|
return time.Now().UTC(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Parser) unmarshalMetrics(buf []byte) (map[string]interface{}, error) {
|
||||||
|
|
||||||
|
var registryBytes []byte
|
||||||
|
if p.MetricRegistryPath != "" {
|
||||||
|
regResult := gjson.GetBytes(buf, p.MetricRegistryPath)
|
||||||
|
if regResult.Index > 0 {
|
||||||
|
registryBytes = buf[regResult.Index : regResult.Index+len(regResult.Raw)]
|
||||||
|
} else {
|
||||||
|
registryBytes = []byte(regResult.Raw)
|
||||||
|
}
|
||||||
|
if len(registryBytes) == 0 {
|
||||||
|
err := fmt.Errorf("metric registry not found in JSON path %s", p.MetricRegistryPath)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
registryBytes = buf
|
||||||
|
}
|
||||||
|
var jsonOut map[string]interface{}
|
||||||
|
err := json.Unmarshal(registryBytes, &jsonOut)
|
||||||
|
if err != nil {
|
||||||
|
err = fmt.Errorf("unable to parse dropwizard metric registry from JSON document, %s", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return jsonOut, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Parser) readDWMetrics(metricType string, dwms interface{}, metrics []telegraf.Metric, time time.Time) []telegraf.Metric {
|
||||||
|
|
||||||
|
switch dwmsTyped := dwms.(type) {
|
||||||
|
case map[string]interface{}:
|
||||||
|
var metricsBuffer bytes.Buffer
|
||||||
|
for dwmName, dwmFields := range dwmsTyped {
|
||||||
|
measurementName := dwmName
|
||||||
|
tags := make(map[string]string)
|
||||||
|
fieldPrefix := ""
|
||||||
|
if p.templateEngine != nil {
|
||||||
|
measurementName, tags, fieldPrefix, _ = p.templateEngine.Apply(dwmName)
|
||||||
|
if len(fieldPrefix) > 0 {
|
||||||
|
fieldPrefix = fmt.Sprintf("%s%s", fieldPrefix, p.Separator)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tags["metric_type"] = metricType
|
||||||
|
|
||||||
|
measurementWithTags := measurementName
|
||||||
|
for tagName, tagValue := range tags {
|
||||||
|
tagKeyValue := fmt.Sprintf("%s=%s", tagName, tagValue)
|
||||||
|
measurementWithTags = fmt.Sprintf("%s,%s", measurementWithTags, tagKeyValue)
|
||||||
|
}
|
||||||
|
|
||||||
|
fields := make([]string, 0)
|
||||||
|
switch t := dwmFields.(type) {
|
||||||
|
case map[string]interface{}: // json object
|
||||||
|
for fieldName, fieldValue := range t {
|
||||||
|
|
||||||
|
switch v := fieldValue.(type) {
|
||||||
|
case float64:
|
||||||
|
fields = append(fields, fmt.Sprintf("%s%s=%f", fieldPrefix, fieldName, v))
|
||||||
|
default: // ignore
|
||||||
|
}
|
||||||
|
}
|
||||||
|
default: // ignore
|
||||||
|
}
|
||||||
|
|
||||||
|
metricsBuffer.WriteString(fmt.Sprintf("%s,metric_type=%s ", measurementWithTags, metricType))
|
||||||
|
metricsBuffer.WriteString(strings.Join(fields, ","))
|
||||||
|
metricsBuffer.WriteString("\n")
|
||||||
|
}
|
||||||
|
newMetrics, err := metric.ParseWithDefaultTime(metricsBuffer.Bytes(), time)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("W! failed to create metric of type '%s': %s\n", metricType, err)
|
||||||
|
}
|
||||||
|
return append(metrics, newMetrics...)
|
||||||
|
default:
|
||||||
|
return metrics
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func arraymap(vs []string, f func(string) string) []string {
|
||||||
|
vsm := make([]string, len(vs))
|
||||||
|
for i, v := range vs {
|
||||||
|
vsm[i] = f(v)
|
||||||
|
}
|
||||||
|
return vsm
|
||||||
|
}
|
|
@ -0,0 +1,485 @@
|
||||||
|
package dropwizard
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
// validEmptyJSON is a valid dropwizard json document, but without any metrics
|
||||||
|
const validEmptyJSON = `
|
||||||
|
{
|
||||||
|
"version": "3.0.0",
|
||||||
|
"counters" : {},
|
||||||
|
"meters" : {},
|
||||||
|
"gauges" : {},
|
||||||
|
"histograms" : {},
|
||||||
|
"timers" : {}
|
||||||
|
}
|
||||||
|
`
|
||||||
|
|
||||||
|
func TestParseValidEmptyJSON(t *testing.T) {
|
||||||
|
parser := Parser{}
|
||||||
|
|
||||||
|
// Most basic vanilla test
|
||||||
|
metrics, err := parser.Parse([]byte(validEmptyJSON))
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Len(t, metrics, 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
// validCounterJSON is a valid dropwizard json document containing one counter
|
||||||
|
const validCounterJSON = `
|
||||||
|
{
|
||||||
|
"version": "3.0.0",
|
||||||
|
"counters" : {
|
||||||
|
"measurement" : {
|
||||||
|
"count" : 1
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"meters" : {},
|
||||||
|
"gauges" : {},
|
||||||
|
"histograms" : {},
|
||||||
|
"timers" : {}
|
||||||
|
}
|
||||||
|
`
|
||||||
|
|
||||||
|
func TestParseValidCounterJSON(t *testing.T) {
|
||||||
|
parser := Parser{}
|
||||||
|
|
||||||
|
metrics, err := parser.Parse([]byte(validCounterJSON))
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Len(t, metrics, 1)
|
||||||
|
assert.Equal(t, "measurement", metrics[0].Name())
|
||||||
|
assert.Equal(t, map[string]interface{}{
|
||||||
|
"count": float64(1),
|
||||||
|
}, metrics[0].Fields())
|
||||||
|
assert.Equal(t, map[string]string{"metric_type": "counter"}, metrics[0].Tags())
|
||||||
|
}
|
||||||
|
|
||||||
|
// validEmbeddedCounterJSON is a valid json document containing separate fields for dropwizard metrics, tags and time override.
|
||||||
|
const validEmbeddedCounterJSON = `
|
||||||
|
{
|
||||||
|
"time" : "2017-02-22T14:33:03.662+02:00",
|
||||||
|
"tags" : {
|
||||||
|
"tag1" : "green",
|
||||||
|
"tag2" : "yellow"
|
||||||
|
},
|
||||||
|
"metrics" : {
|
||||||
|
"counters" : {
|
||||||
|
"measurement" : {
|
||||||
|
"count" : 1
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"meters" : {},
|
||||||
|
"gauges" : {},
|
||||||
|
"histograms" : {},
|
||||||
|
"timers" : {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
`
|
||||||
|
|
||||||
|
func TestParseValidEmbeddedCounterJSON(t *testing.T) {
|
||||||
|
timeFormat := "2006-01-02T15:04:05Z07:00"
|
||||||
|
metricTime, _ := time.Parse(timeFormat, "2017-02-22T15:33:03.662+03:00")
|
||||||
|
parser := Parser{
|
||||||
|
MetricRegistryPath: "metrics",
|
||||||
|
TagsPath: "tags",
|
||||||
|
TimePath: "time",
|
||||||
|
}
|
||||||
|
|
||||||
|
metrics, err := parser.Parse([]byte(validEmbeddedCounterJSON))
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Len(t, metrics, 1)
|
||||||
|
assert.Equal(t, "measurement", metrics[0].Name())
|
||||||
|
assert.Equal(t, map[string]interface{}{
|
||||||
|
"count": float64(1),
|
||||||
|
}, metrics[0].Fields())
|
||||||
|
assert.Equal(t, map[string]string{"metric_type": "counter", "tag1": "green", "tag2": "yellow"}, metrics[0].Tags())
|
||||||
|
assert.True(t, metricTime.Equal(metrics[0].Time()), fmt.Sprintf("%s should be equal to %s", metrics[0].Time(), metricTime))
|
||||||
|
|
||||||
|
// now test json tags through TagPathsMap
|
||||||
|
parser2 := Parser{
|
||||||
|
MetricRegistryPath: "metrics",
|
||||||
|
TagPathsMap: map[string]string{"tag1": "tags.tag1"},
|
||||||
|
TimePath: "time",
|
||||||
|
}
|
||||||
|
metrics2, err2 := parser2.Parse([]byte(validEmbeddedCounterJSON))
|
||||||
|
assert.NoError(t, err2)
|
||||||
|
assert.Equal(t, map[string]string{"metric_type": "counter", "tag1": "green"}, metrics2[0].Tags())
|
||||||
|
}
|
||||||
|
|
||||||
|
// validMeterJSON1 is a valid dropwizard json document containing one meter
|
||||||
|
const validMeterJSON1 = `
|
||||||
|
{
|
||||||
|
"version": "3.0.0",
|
||||||
|
"counters" : {},
|
||||||
|
"meters" : {
|
||||||
|
"measurement1" : {
|
||||||
|
"count" : 1,
|
||||||
|
"m15_rate" : 1.0,
|
||||||
|
"m1_rate" : 1.0,
|
||||||
|
"m5_rate" : 1.0,
|
||||||
|
"mean_rate" : 1.0,
|
||||||
|
"units" : "events/second"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"gauges" : {},
|
||||||
|
"histograms" : {},
|
||||||
|
"timers" : {}
|
||||||
|
}
|
||||||
|
`
|
||||||
|
|
||||||
|
func TestParseValidMeterJSON1(t *testing.T) {
|
||||||
|
parser := Parser{}
|
||||||
|
|
||||||
|
metrics, err := parser.Parse([]byte(validMeterJSON1))
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Len(t, metrics, 1)
|
||||||
|
assert.Equal(t, "measurement1", metrics[0].Name())
|
||||||
|
assert.Equal(t, map[string]interface{}{
|
||||||
|
"count": float64(1),
|
||||||
|
"m15_rate": float64(1),
|
||||||
|
"m1_rate": float64(1),
|
||||||
|
"m5_rate": float64(1),
|
||||||
|
"mean_rate": float64(1),
|
||||||
|
}, metrics[0].Fields())
|
||||||
|
|
||||||
|
assert.Equal(t, map[string]string{"metric_type": "meter"}, metrics[0].Tags())
|
||||||
|
}
|
||||||
|
|
||||||
|
// validMeterJSON2 is a valid dropwizard json document containing one meter with one tag
|
||||||
|
const validMeterJSON2 = `
|
||||||
|
{
|
||||||
|
"version": "3.0.0",
|
||||||
|
"counters" : {},
|
||||||
|
"meters" : {
|
||||||
|
"measurement2,key=value" : {
|
||||||
|
"count" : 2,
|
||||||
|
"m15_rate" : 2.0,
|
||||||
|
"m1_rate" : 2.0,
|
||||||
|
"m5_rate" : 2.0,
|
||||||
|
"mean_rate" : 2.0,
|
||||||
|
"units" : "events/second"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"gauges" : {},
|
||||||
|
"histograms" : {},
|
||||||
|
"timers" : {}
|
||||||
|
}
|
||||||
|
`
|
||||||
|
|
||||||
|
func TestParseValidMeterJSON2(t *testing.T) {
|
||||||
|
parser := Parser{}
|
||||||
|
|
||||||
|
metrics, err := parser.Parse([]byte(validMeterJSON2))
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Len(t, metrics, 1)
|
||||||
|
assert.Equal(t, "measurement2", metrics[0].Name())
|
||||||
|
assert.Equal(t, map[string]interface{}{
|
||||||
|
"count": float64(2),
|
||||||
|
"m15_rate": float64(2),
|
||||||
|
"m1_rate": float64(2),
|
||||||
|
"m5_rate": float64(2),
|
||||||
|
"mean_rate": float64(2),
|
||||||
|
}, metrics[0].Fields())
|
||||||
|
assert.Equal(t, map[string]string{"metric_type": "meter", "key": "value"}, metrics[0].Tags())
|
||||||
|
}
|
||||||
|
|
||||||
|
// validGaugeJSON is a valid dropwizard json document containing one gauge
|
||||||
|
const validGaugeJSON = `
|
||||||
|
{
|
||||||
|
"version": "3.0.0",
|
||||||
|
"counters" : {},
|
||||||
|
"meters" : {},
|
||||||
|
"gauges" : {
|
||||||
|
"measurement" : {
|
||||||
|
"value" : 0
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"histograms" : {},
|
||||||
|
"timers" : {}
|
||||||
|
}
|
||||||
|
`
|
||||||
|
|
||||||
|
func TestParseValidGaugeJSON(t *testing.T) {
|
||||||
|
parser := Parser{}
|
||||||
|
|
||||||
|
metrics, err := parser.Parse([]byte(validGaugeJSON))
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Len(t, metrics, 1)
|
||||||
|
assert.Equal(t, "measurement", metrics[0].Name())
|
||||||
|
assert.Equal(t, map[string]interface{}{
|
||||||
|
"value": float64(0),
|
||||||
|
}, metrics[0].Fields())
|
||||||
|
assert.Equal(t, map[string]string{"metric_type": "gauge"}, metrics[0].Tags())
|
||||||
|
}
|
||||||
|
|
||||||
|
// validHistogramJSON is a valid dropwizard json document containing one histogram
|
||||||
|
const validHistogramJSON = `
|
||||||
|
{
|
||||||
|
"version": "3.0.0",
|
||||||
|
"counters" : {},
|
||||||
|
"meters" : {},
|
||||||
|
"gauges" : {},
|
||||||
|
"histograms" : {
|
||||||
|
"measurement" : {
|
||||||
|
"count" : 1,
|
||||||
|
"max" : 2,
|
||||||
|
"mean" : 3,
|
||||||
|
"min" : 4,
|
||||||
|
"p50" : 5,
|
||||||
|
"p75" : 6,
|
||||||
|
"p95" : 7,
|
||||||
|
"p98" : 8,
|
||||||
|
"p99" : 9,
|
||||||
|
"p999" : 10,
|
||||||
|
"stddev" : 11
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"timers" : {}
|
||||||
|
}
|
||||||
|
`
|
||||||
|
|
||||||
|
func TestParseValidHistogramJSON(t *testing.T) {
|
||||||
|
parser := Parser{}
|
||||||
|
|
||||||
|
metrics, err := parser.Parse([]byte(validHistogramJSON))
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Len(t, metrics, 1)
|
||||||
|
assert.Equal(t, "measurement", metrics[0].Name())
|
||||||
|
assert.Equal(t, map[string]interface{}{
|
||||||
|
"count": float64(1),
|
||||||
|
"max": float64(2),
|
||||||
|
"mean": float64(3),
|
||||||
|
"min": float64(4),
|
||||||
|
"p50": float64(5),
|
||||||
|
"p75": float64(6),
|
||||||
|
"p95": float64(7),
|
||||||
|
"p98": float64(8),
|
||||||
|
"p99": float64(9),
|
||||||
|
"p999": float64(10),
|
||||||
|
"stddev": float64(11),
|
||||||
|
}, metrics[0].Fields())
|
||||||
|
assert.Equal(t, map[string]string{"metric_type": "histogram"}, metrics[0].Tags())
|
||||||
|
}
|
||||||
|
|
||||||
|
// validTimerJSON is a valid dropwizard json document containing one timer
|
||||||
|
const validTimerJSON = `
|
||||||
|
{
|
||||||
|
"version": "3.0.0",
|
||||||
|
"counters" : {},
|
||||||
|
"meters" : {},
|
||||||
|
"gauges" : {},
|
||||||
|
"histograms" : {},
|
||||||
|
"timers" : {
|
||||||
|
"measurement" : {
|
||||||
|
"count" : 1,
|
||||||
|
"max" : 2,
|
||||||
|
"mean" : 3,
|
||||||
|
"min" : 4,
|
||||||
|
"p50" : 5,
|
||||||
|
"p75" : 6,
|
||||||
|
"p95" : 7,
|
||||||
|
"p98" : 8,
|
||||||
|
"p99" : 9,
|
||||||
|
"p999" : 10,
|
||||||
|
"stddev" : 11,
|
||||||
|
"m15_rate" : 12,
|
||||||
|
"m1_rate" : 13,
|
||||||
|
"m5_rate" : 14,
|
||||||
|
"mean_rate" : 15,
|
||||||
|
"duration_units" : "seconds",
|
||||||
|
"rate_units" : "calls/second"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
`
|
||||||
|
|
||||||
|
func TestParseValidTimerJSON(t *testing.T) {
|
||||||
|
parser := Parser{}
|
||||||
|
|
||||||
|
metrics, err := parser.Parse([]byte(validTimerJSON))
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Len(t, metrics, 1)
|
||||||
|
assert.Equal(t, "measurement", metrics[0].Name())
|
||||||
|
assert.Equal(t, map[string]interface{}{
|
||||||
|
"count": float64(1),
|
||||||
|
"max": float64(2),
|
||||||
|
"mean": float64(3),
|
||||||
|
"min": float64(4),
|
||||||
|
"p50": float64(5),
|
||||||
|
"p75": float64(6),
|
||||||
|
"p95": float64(7),
|
||||||
|
"p98": float64(8),
|
||||||
|
"p99": float64(9),
|
||||||
|
"p999": float64(10),
|
||||||
|
"stddev": float64(11),
|
||||||
|
"m15_rate": float64(12),
|
||||||
|
"m1_rate": float64(13),
|
||||||
|
"m5_rate": float64(14),
|
||||||
|
"mean_rate": float64(15),
|
||||||
|
}, metrics[0].Fields())
|
||||||
|
assert.Equal(t, map[string]string{"metric_type": "timer"}, metrics[0].Tags())
|
||||||
|
}
|
||||||
|
|
||||||
|
// validAllJSON is a valid dropwizard json document containing one metric of each type
|
||||||
|
const validAllJSON = `
|
||||||
|
{
|
||||||
|
"version": "3.0.0",
|
||||||
|
"counters" : {
|
||||||
|
"measurement" : {"count" : 1}
|
||||||
|
},
|
||||||
|
"meters" : {
|
||||||
|
"measurement" : {"count" : 1}
|
||||||
|
},
|
||||||
|
"gauges" : {
|
||||||
|
"measurement" : {"value" : 1}
|
||||||
|
},
|
||||||
|
"histograms" : {
|
||||||
|
"measurement" : {"count" : 1}
|
||||||
|
},
|
||||||
|
"timers" : {
|
||||||
|
"measurement" : {"count" : 1}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
`
|
||||||
|
|
||||||
|
func TestParseValidAllJSON(t *testing.T) {
|
||||||
|
parser := Parser{}
|
||||||
|
|
||||||
|
metrics, err := parser.Parse([]byte(validAllJSON))
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Len(t, metrics, 5)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTagParsingProblems(t *testing.T) {
|
||||||
|
// giving a wrong path results in empty tags
|
||||||
|
parser1 := Parser{MetricRegistryPath: "metrics", TagsPath: "tags1"}
|
||||||
|
metrics1, err1 := parser1.Parse([]byte(validEmbeddedCounterJSON))
|
||||||
|
assert.NoError(t, err1)
|
||||||
|
assert.Len(t, metrics1, 1)
|
||||||
|
assert.Equal(t, map[string]string{"metric_type": "counter"}, metrics1[0].Tags())
|
||||||
|
|
||||||
|
// giving a wrong TagsPath falls back to TagPathsMap
|
||||||
|
parser2 := Parser{
|
||||||
|
MetricRegistryPath: "metrics",
|
||||||
|
TagsPath: "tags1",
|
||||||
|
TagPathsMap: map[string]string{"tag1": "tags.tag1"},
|
||||||
|
}
|
||||||
|
metrics2, err2 := parser2.Parse([]byte(validEmbeddedCounterJSON))
|
||||||
|
assert.NoError(t, err2)
|
||||||
|
assert.Len(t, metrics2, 1)
|
||||||
|
assert.Equal(t, map[string]string{"metric_type": "counter", "tag1": "green"}, metrics2[0].Tags())
|
||||||
|
}
|
||||||
|
|
||||||
|
// sampleTemplateJSON is a sample json document containing metrics to be tested against the templating engine.
|
||||||
|
const sampleTemplateJSON = `
|
||||||
|
{
|
||||||
|
"version": "3.0.0",
|
||||||
|
"counters" : {},
|
||||||
|
"meters" : {},
|
||||||
|
"gauges" : {
|
||||||
|
"vm.memory.heap.committed" : { "value" : 1 },
|
||||||
|
"vm.memory.heap.init" : { "value" : 2 },
|
||||||
|
"vm.memory.heap.max" : { "value" : 3 },
|
||||||
|
"vm.memory.heap.usage" : { "value" : 4 },
|
||||||
|
"vm.memory.heap.used" : { "value" : 5 },
|
||||||
|
"vm.memory.non-heap.committed" : { "value" : 6 },
|
||||||
|
"vm.memory.non-heap.init" : { "value" : 7 },
|
||||||
|
"vm.memory.non-heap.max" : { "value" : 8 },
|
||||||
|
"vm.memory.non-heap.usage" : { "value" : 9 },
|
||||||
|
"vm.memory.non-heap.used" : { "value" : 10 }
|
||||||
|
},
|
||||||
|
"histograms" : {
|
||||||
|
"jenkins.job.building.duration" : {
|
||||||
|
"count" : 1,
|
||||||
|
"max" : 2,
|
||||||
|
"mean" : 3,
|
||||||
|
"min" : 4,
|
||||||
|
"p50" : 5,
|
||||||
|
"p75" : 6,
|
||||||
|
"p95" : 7,
|
||||||
|
"p98" : 8,
|
||||||
|
"p99" : 9,
|
||||||
|
"p999" : 10,
|
||||||
|
"stddev" : 11
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"timers" : {}
|
||||||
|
}
|
||||||
|
`
|
||||||
|
|
||||||
|
func TestParseSampleTemplateJSON(t *testing.T) {
|
||||||
|
parser := Parser{
|
||||||
|
Separator: "_",
|
||||||
|
Templates: []string{
|
||||||
|
"jenkins.* measurement.metric.metric.field",
|
||||||
|
"vm.* measurement.measurement.pool.field",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
parser.InitTemplating()
|
||||||
|
|
||||||
|
metrics, err := parser.Parse([]byte(sampleTemplateJSON))
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
assert.Len(t, metrics, 11)
|
||||||
|
|
||||||
|
jenkinsMetric := search(metrics, "jenkins", nil, "")
|
||||||
|
assert.NotNil(t, jenkinsMetric, "the metrics should contain a jenkins measurement")
|
||||||
|
assert.Equal(t, map[string]interface{}{
|
||||||
|
"duration_count": float64(1),
|
||||||
|
"duration_max": float64(2),
|
||||||
|
"duration_mean": float64(3),
|
||||||
|
"duration_min": float64(4),
|
||||||
|
"duration_p50": float64(5),
|
||||||
|
"duration_p75": float64(6),
|
||||||
|
"duration_p95": float64(7),
|
||||||
|
"duration_p98": float64(8),
|
||||||
|
"duration_p99": float64(9),
|
||||||
|
"duration_p999": float64(10),
|
||||||
|
"duration_stddev": float64(11),
|
||||||
|
}, jenkinsMetric.Fields())
|
||||||
|
assert.Equal(t, map[string]string{"metric_type": "histogram", "metric": "job_building"}, jenkinsMetric.Tags())
|
||||||
|
|
||||||
|
vmMemoryHeapCommitted := search(metrics, "vm_memory", map[string]string{"pool": "heap"}, "committed_value")
|
||||||
|
assert.NotNil(t, vmMemoryHeapCommitted)
|
||||||
|
assert.Equal(t, map[string]interface{}{
|
||||||
|
"committed_value": float64(1),
|
||||||
|
}, vmMemoryHeapCommitted.Fields())
|
||||||
|
assert.Equal(t, map[string]string{"metric_type": "gauge", "pool": "heap"}, vmMemoryHeapCommitted.Tags())
|
||||||
|
|
||||||
|
vmMemoryNonHeapCommitted := search(metrics, "vm_memory", map[string]string{"pool": "non-heap"}, "committed_value")
|
||||||
|
assert.NotNil(t, vmMemoryNonHeapCommitted)
|
||||||
|
assert.Equal(t, map[string]interface{}{
|
||||||
|
"committed_value": float64(6),
|
||||||
|
}, vmMemoryNonHeapCommitted.Fields())
|
||||||
|
assert.Equal(t, map[string]string{"metric_type": "gauge", "pool": "non-heap"}, vmMemoryNonHeapCommitted.Tags())
|
||||||
|
}
|
||||||
|
|
||||||
|
func search(metrics []telegraf.Metric, name string, tags map[string]string, fieldName string) telegraf.Metric {
|
||||||
|
for _, v := range metrics {
|
||||||
|
if v.Name() == name && containsAll(v.Tags(), tags) {
|
||||||
|
if len(fieldName) == 0 {
|
||||||
|
return v
|
||||||
|
}
|
||||||
|
if _, ok := v.Fields()[fieldName]; ok {
|
||||||
|
return v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func containsAll(t1 map[string]string, t2 map[string]string) bool {
|
||||||
|
for k, v := range t2 {
|
||||||
|
if foundValue, ok := t1[k]; !ok || v != foundValue {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
|
@ -6,11 +6,12 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"math"
|
"math"
|
||||||
"sort"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf/internal/templating"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/metric"
|
"github.com/influxdata/telegraf/metric"
|
||||||
)
|
)
|
||||||
|
@ -26,8 +27,7 @@ type GraphiteParser struct {
|
||||||
Separator string
|
Separator string
|
||||||
Templates []string
|
Templates []string
|
||||||
DefaultTags map[string]string
|
DefaultTags map[string]string
|
||||||
|
templateEngine *templating.Engine
|
||||||
matcher *matcher
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *GraphiteParser) SetDefaultTags(tags map[string]string) {
|
func (p *GraphiteParser) SetDefaultTags(tags map[string]string) {
|
||||||
|
@ -52,65 +52,13 @@ func NewGraphiteParser(
|
||||||
if defaultTags != nil {
|
if defaultTags != nil {
|
||||||
p.DefaultTags = defaultTags
|
p.DefaultTags = defaultTags
|
||||||
}
|
}
|
||||||
|
defaultTemplate, _ := templating.NewDefaultTemplateWithPattern("measurement*")
|
||||||
matcher := newMatcher()
|
p.templateEngine, err = templating.NewEngine(p.Separator, defaultTemplate, p.Templates)
|
||||||
p.matcher = matcher
|
|
||||||
defaultTemplate, _ := NewTemplate("measurement*", nil, p.Separator)
|
|
||||||
matcher.AddDefaultTemplate(defaultTemplate)
|
|
||||||
|
|
||||||
tmplts := parsedTemplates{}
|
|
||||||
for _, pattern := range p.Templates {
|
|
||||||
tmplt := parsedTemplate{}
|
|
||||||
tmplt.template = pattern
|
|
||||||
// Format is [filter] <template> [tag1=value1,tag2=value2]
|
|
||||||
parts := strings.Fields(pattern)
|
|
||||||
if len(parts) < 1 {
|
|
||||||
continue
|
|
||||||
} else if len(parts) >= 2 {
|
|
||||||
if strings.Contains(parts[1], "=") {
|
|
||||||
tmplt.template = parts[0]
|
|
||||||
tmplt.tagstring = parts[1]
|
|
||||||
} else {
|
|
||||||
tmplt.filter = parts[0]
|
|
||||||
tmplt.template = parts[1]
|
|
||||||
if len(parts) > 2 {
|
|
||||||
tmplt.tagstring = parts[2]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
tmplts = append(tmplts, tmplt)
|
|
||||||
}
|
|
||||||
|
|
||||||
sort.Sort(tmplts)
|
|
||||||
for _, tmplt := range tmplts {
|
|
||||||
if err := p.addToMatcher(tmplt); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return p, fmt.Errorf("exec input parser config is error: %s ", err.Error())
|
return p, fmt.Errorf("exec input parser config is error: %s ", err.Error())
|
||||||
} else {
|
}
|
||||||
return p, nil
|
return p, nil
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *GraphiteParser) addToMatcher(tmplt parsedTemplate) error {
|
|
||||||
// Parse out the default tags specific to this template
|
|
||||||
tags := map[string]string{}
|
|
||||||
if tmplt.tagstring != "" {
|
|
||||||
for _, kv := range strings.Split(tmplt.tagstring, ",") {
|
|
||||||
parts := strings.Split(kv, "=")
|
|
||||||
tags[parts[0]] = parts[1]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
tmpl, err := NewTemplate(tmplt.template, tags, p.Separator)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
p.matcher.Add(tmplt.filter, tmpl)
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *GraphiteParser) Parse(buf []byte) ([]telegraf.Metric, error) {
|
func (p *GraphiteParser) Parse(buf []byte) ([]telegraf.Metric, error) {
|
||||||
|
@ -162,8 +110,7 @@ func (p *GraphiteParser) ParseLine(line string) (telegraf.Metric, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// decode the name and tags
|
// decode the name and tags
|
||||||
template := p.matcher.Match(fields[0])
|
measurement, tags, field, err := p.templateEngine.Apply(fields[0])
|
||||||
measurement, tags, field, err := template.Apply(fields[0])
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -229,8 +176,7 @@ func (p *GraphiteParser) ApplyTemplate(line string) (string, map[string]string,
|
||||||
return "", make(map[string]string), "", nil
|
return "", make(map[string]string), "", nil
|
||||||
}
|
}
|
||||||
// decode the name and tags
|
// decode the name and tags
|
||||||
template := p.matcher.Match(fields[0])
|
name, tags, field, err := p.templateEngine.Apply(fields[0])
|
||||||
name, tags, field, err := template.Apply(fields[0])
|
|
||||||
|
|
||||||
// Set the default tags on the point if they are not already set
|
// Set the default tags on the point if they are not already set
|
||||||
for k, v := range p.DefaultTags {
|
for k, v := range p.DefaultTags {
|
||||||
|
@ -241,269 +187,3 @@ func (p *GraphiteParser) ApplyTemplate(line string) (string, map[string]string,
|
||||||
|
|
||||||
return name, tags, field, err
|
return name, tags, field, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// template represents a pattern and tags to map a graphite metric string to a influxdb Point
|
|
||||||
type template struct {
|
|
||||||
tags []string
|
|
||||||
defaultTags map[string]string
|
|
||||||
greedyField bool
|
|
||||||
greedyMeasurement bool
|
|
||||||
separator string
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewTemplate returns a new template ensuring it has a measurement
|
|
||||||
// specified.
|
|
||||||
func NewTemplate(pattern string, defaultTags map[string]string, separator string) (*template, error) {
|
|
||||||
tags := strings.Split(pattern, ".")
|
|
||||||
hasMeasurement := false
|
|
||||||
template := &template{tags: tags, defaultTags: defaultTags, separator: separator}
|
|
||||||
|
|
||||||
for _, tag := range tags {
|
|
||||||
if strings.HasPrefix(tag, "measurement") {
|
|
||||||
hasMeasurement = true
|
|
||||||
}
|
|
||||||
if tag == "measurement*" {
|
|
||||||
template.greedyMeasurement = true
|
|
||||||
} else if tag == "field*" {
|
|
||||||
template.greedyField = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if !hasMeasurement {
|
|
||||||
return nil, fmt.Errorf("no measurement specified for template. %q", pattern)
|
|
||||||
}
|
|
||||||
|
|
||||||
return template, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Apply extracts the template fields from the given line and returns the measurement
|
|
||||||
// name and tags
|
|
||||||
func (t *template) Apply(line string) (string, map[string]string, string, error) {
|
|
||||||
fields := strings.Split(line, ".")
|
|
||||||
var (
|
|
||||||
measurement []string
|
|
||||||
tags = make(map[string][]string)
|
|
||||||
field []string
|
|
||||||
)
|
|
||||||
|
|
||||||
// Set any default tags
|
|
||||||
for k, v := range t.defaultTags {
|
|
||||||
tags[k] = append(tags[k], v)
|
|
||||||
}
|
|
||||||
|
|
||||||
// See if an invalid combination has been specified in the template:
|
|
||||||
for _, tag := range t.tags {
|
|
||||||
if tag == "measurement*" {
|
|
||||||
t.greedyMeasurement = true
|
|
||||||
} else if tag == "field*" {
|
|
||||||
t.greedyField = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if t.greedyField && t.greedyMeasurement {
|
|
||||||
return "", nil, "",
|
|
||||||
fmt.Errorf("either 'field*' or 'measurement*' can be used in each "+
|
|
||||||
"template (but not both together): %q",
|
|
||||||
strings.Join(t.tags, t.separator))
|
|
||||||
}
|
|
||||||
|
|
||||||
for i, tag := range t.tags {
|
|
||||||
if i >= len(fields) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if tag == "" {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
switch tag {
|
|
||||||
case "measurement":
|
|
||||||
measurement = append(measurement, fields[i])
|
|
||||||
case "field":
|
|
||||||
field = append(field, fields[i])
|
|
||||||
case "field*":
|
|
||||||
field = append(field, fields[i:]...)
|
|
||||||
break
|
|
||||||
case "measurement*":
|
|
||||||
measurement = append(measurement, fields[i:]...)
|
|
||||||
break
|
|
||||||
default:
|
|
||||||
tags[tag] = append(tags[tag], fields[i])
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Convert to map of strings.
|
|
||||||
outtags := make(map[string]string)
|
|
||||||
for k, values := range tags {
|
|
||||||
outtags[k] = strings.Join(values, t.separator)
|
|
||||||
}
|
|
||||||
|
|
||||||
return strings.Join(measurement, t.separator), outtags, strings.Join(field, t.separator), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// matcher determines which template should be applied to a given metric
|
|
||||||
// based on a filter tree.
|
|
||||||
type matcher struct {
|
|
||||||
root *node
|
|
||||||
defaultTemplate *template
|
|
||||||
}
|
|
||||||
|
|
||||||
func newMatcher() *matcher {
|
|
||||||
return &matcher{
|
|
||||||
root: &node{},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add inserts the template in the filter tree based the given filter
|
|
||||||
func (m *matcher) Add(filter string, template *template) {
|
|
||||||
if filter == "" {
|
|
||||||
m.AddDefaultTemplate(template)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
m.root.Insert(filter, template)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *matcher) AddDefaultTemplate(template *template) {
|
|
||||||
m.defaultTemplate = template
|
|
||||||
}
|
|
||||||
|
|
||||||
// Match returns the template that matches the given graphite line
|
|
||||||
func (m *matcher) Match(line string) *template {
|
|
||||||
tmpl := m.root.Search(line)
|
|
||||||
if tmpl != nil {
|
|
||||||
return tmpl
|
|
||||||
}
|
|
||||||
|
|
||||||
return m.defaultTemplate
|
|
||||||
}
|
|
||||||
|
|
||||||
// node is an item in a sorted k-ary tree. Each child is sorted by its value.
|
|
||||||
// The special value of "*", is always last.
|
|
||||||
type node struct {
|
|
||||||
value string
|
|
||||||
children nodes
|
|
||||||
template *template
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *node) insert(values []string, template *template) {
|
|
||||||
// Add the end, set the template
|
|
||||||
if len(values) == 0 {
|
|
||||||
n.template = template
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// See if the the current element already exists in the tree. If so, insert the
|
|
||||||
// into that sub-tree
|
|
||||||
for _, v := range n.children {
|
|
||||||
if v.value == values[0] {
|
|
||||||
v.insert(values[1:], template)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// New element, add it to the tree and sort the children
|
|
||||||
newNode := &node{value: values[0]}
|
|
||||||
n.children = append(n.children, newNode)
|
|
||||||
sort.Sort(&n.children)
|
|
||||||
|
|
||||||
// Now insert the rest of the tree into the new element
|
|
||||||
newNode.insert(values[1:], template)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Insert inserts the given string template into the tree. The filter string is separated
|
|
||||||
// on "." and each part is used as the path in the tree.
|
|
||||||
func (n *node) Insert(filter string, template *template) {
|
|
||||||
n.insert(strings.Split(filter, "."), template)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *node) search(lineParts []string) *template {
|
|
||||||
// Nothing to search
|
|
||||||
if len(lineParts) == 0 || len(n.children) == 0 {
|
|
||||||
return n.template
|
|
||||||
}
|
|
||||||
|
|
||||||
// If last element is a wildcard, don't include in this search since it's sorted
|
|
||||||
// to the end but lexicographically it would not always be and sort.Search assumes
|
|
||||||
// the slice is sorted.
|
|
||||||
length := len(n.children)
|
|
||||||
if n.children[length-1].value == "*" {
|
|
||||||
length--
|
|
||||||
}
|
|
||||||
|
|
||||||
// Find the index of child with an exact match
|
|
||||||
i := sort.Search(length, func(i int) bool {
|
|
||||||
return n.children[i].value >= lineParts[0]
|
|
||||||
})
|
|
||||||
|
|
||||||
// Found an exact match, so search that child sub-tree
|
|
||||||
if i < len(n.children) && n.children[i].value == lineParts[0] {
|
|
||||||
return n.children[i].search(lineParts[1:])
|
|
||||||
}
|
|
||||||
// Not an exact match, see if we have a wildcard child to search
|
|
||||||
if n.children[len(n.children)-1].value == "*" {
|
|
||||||
return n.children[len(n.children)-1].search(lineParts[1:])
|
|
||||||
}
|
|
||||||
return n.template
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *node) Search(line string) *template {
|
|
||||||
return n.search(strings.Split(line, "."))
|
|
||||||
}
|
|
||||||
|
|
||||||
type nodes []*node
|
|
||||||
|
|
||||||
// Less returns a boolean indicating whether the filter at position j
|
|
||||||
// is less than the filter at position k. Filters are order by string
|
|
||||||
// comparison of each component parts. A wildcard value "*" is never
|
|
||||||
// less than a non-wildcard value.
|
|
||||||
//
|
|
||||||
// For example, the filters:
|
|
||||||
// "*.*"
|
|
||||||
// "servers.*"
|
|
||||||
// "servers.localhost"
|
|
||||||
// "*.localhost"
|
|
||||||
//
|
|
||||||
// Would be sorted as:
|
|
||||||
// "servers.localhost"
|
|
||||||
// "servers.*"
|
|
||||||
// "*.localhost"
|
|
||||||
// "*.*"
|
|
||||||
func (n *nodes) Less(j, k int) bool {
|
|
||||||
if (*n)[j].value == "*" && (*n)[k].value != "*" {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
if (*n)[j].value != "*" && (*n)[k].value == "*" {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
return (*n)[j].value < (*n)[k].value
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *nodes) Swap(i, j int) { (*n)[i], (*n)[j] = (*n)[j], (*n)[i] }
|
|
||||||
func (n *nodes) Len() int { return len(*n) }
|
|
||||||
|
|
||||||
type parsedTemplate struct {
|
|
||||||
template string
|
|
||||||
filter string
|
|
||||||
tagstring string
|
|
||||||
}
|
|
||||||
type parsedTemplates []parsedTemplate
|
|
||||||
|
|
||||||
func (e parsedTemplates) Less(j, k int) bool {
|
|
||||||
if len(e[j].filter) == 0 && len(e[k].filter) == 0 {
|
|
||||||
nj := len(strings.Split(e[j].template, "."))
|
|
||||||
nk := len(strings.Split(e[k].template, "."))
|
|
||||||
return nj < nk
|
|
||||||
}
|
|
||||||
if len(e[j].filter) == 0 {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
if len(e[k].filter) == 0 {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
nj := len(strings.Split(e[j].template, "."))
|
|
||||||
nk := len(strings.Split(e[k].template, "."))
|
|
||||||
return nj < nk
|
|
||||||
}
|
|
||||||
func (e parsedTemplates) Swap(i, j int) { e[i], e[j] = e[j], e[i] }
|
|
||||||
func (e parsedTemplates) Len() int { return len(e) }
|
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf/internal/templating"
|
||||||
"github.com/influxdata/telegraf/metric"
|
"github.com/influxdata/telegraf/metric"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
@ -118,7 +119,7 @@ func TestTemplateApply(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
tmpl, err := NewTemplate(test.template, nil, DefaultSeparator)
|
tmpl, err := templating.NewDefaultTemplateWithPattern(test.template)
|
||||||
if errstr(err) != test.err {
|
if errstr(err) != test.err {
|
||||||
t.Fatalf("err does not match. expected %v, got %v", test.err, err)
|
t.Fatalf("err does not match. expected %v, got %v", test.err, err)
|
||||||
}
|
}
|
||||||
|
@ -127,7 +128,7 @@ func TestTemplateApply(t *testing.T) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
measurement, tags, _, _ := tmpl.Apply(test.input)
|
measurement, tags, _, _ := tmpl.Apply(test.input, DefaultSeparator)
|
||||||
if measurement != test.measurement {
|
if measurement != test.measurement {
|
||||||
t.Fatalf("name parse failer. expected %v, got %v", test.measurement, measurement)
|
t.Fatalf("name parse failer. expected %v, got %v", test.measurement, measurement)
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf/plugins/parsers/collectd"
|
"github.com/influxdata/telegraf/plugins/parsers/collectd"
|
||||||
|
"github.com/influxdata/telegraf/plugins/parsers/dropwizard"
|
||||||
"github.com/influxdata/telegraf/plugins/parsers/graphite"
|
"github.com/influxdata/telegraf/plugins/parsers/graphite"
|
||||||
"github.com/influxdata/telegraf/plugins/parsers/influx"
|
"github.com/influxdata/telegraf/plugins/parsers/influx"
|
||||||
"github.com/influxdata/telegraf/plugins/parsers/json"
|
"github.com/influxdata/telegraf/plugins/parsers/json"
|
||||||
|
@ -66,6 +67,22 @@ type Config struct {
|
||||||
|
|
||||||
// DefaultTags are the default tags that will be added to all parsed metrics.
|
// DefaultTags are the default tags that will be added to all parsed metrics.
|
||||||
DefaultTags map[string]string
|
DefaultTags map[string]string
|
||||||
|
|
||||||
|
// an optional json path containing the metric registry object
|
||||||
|
// if left empty, the whole json object is parsed as a metric registry
|
||||||
|
DropwizardMetricRegistryPath string
|
||||||
|
// an optional json path containing the default time of the metrics
|
||||||
|
// if left empty, the processing time is used
|
||||||
|
DropwizardTimePath string
|
||||||
|
// time format to use for parsing the time field
|
||||||
|
// defaults to time.RFC3339
|
||||||
|
DropwizardTimeFormat string
|
||||||
|
// an optional json path pointing to a json object with tag key/value pairs
|
||||||
|
// takes precedence over DropwizardTagPathsMap
|
||||||
|
DropwizardTagsPath string
|
||||||
|
// an optional map containing tag names as keys and json paths to retrieve the tag values from as values
|
||||||
|
// used if TagsPath is empty or doesn't return any tags
|
||||||
|
DropwizardTagPathsMap map[string]string
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewParser returns a Parser interface based on the given config.
|
// NewParser returns a Parser interface based on the given config.
|
||||||
|
@ -89,6 +106,10 @@ func NewParser(config *Config) (Parser, error) {
|
||||||
case "collectd":
|
case "collectd":
|
||||||
parser, err = NewCollectdParser(config.CollectdAuthFile,
|
parser, err = NewCollectdParser(config.CollectdAuthFile,
|
||||||
config.CollectdSecurityLevel, config.CollectdTypesDB)
|
config.CollectdSecurityLevel, config.CollectdTypesDB)
|
||||||
|
case "dropwizard":
|
||||||
|
parser, err = NewDropwizardParser(config.DropwizardMetricRegistryPath,
|
||||||
|
config.DropwizardTimePath, config.DropwizardTimeFormat, config.DropwizardTagsPath, config.DropwizardTagPathsMap, config.DefaultTags,
|
||||||
|
config.Separator, config.Templates)
|
||||||
default:
|
default:
|
||||||
err = fmt.Errorf("Invalid data format: %s", config.DataFormat)
|
err = fmt.Errorf("Invalid data format: %s", config.DataFormat)
|
||||||
}
|
}
|
||||||
|
@ -143,3 +164,29 @@ func NewCollectdParser(
|
||||||
) (Parser, error) {
|
) (Parser, error) {
|
||||||
return collectd.NewCollectdParser(authFile, securityLevel, typesDB)
|
return collectd.NewCollectdParser(authFile, securityLevel, typesDB)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func NewDropwizardParser(
|
||||||
|
metricRegistryPath string,
|
||||||
|
timePath string,
|
||||||
|
timeFormat string,
|
||||||
|
tagsPath string,
|
||||||
|
tagPathsMap map[string]string,
|
||||||
|
defaultTags map[string]string,
|
||||||
|
separator string,
|
||||||
|
templates []string,
|
||||||
|
|
||||||
|
) (Parser, error) {
|
||||||
|
parser := &dropwizard.Parser{
|
||||||
|
MetricRegistryPath: metricRegistryPath,
|
||||||
|
TimePath: timePath,
|
||||||
|
TimeFormat: timeFormat,
|
||||||
|
TagsPath: tagsPath,
|
||||||
|
TagPathsMap: tagPathsMap,
|
||||||
|
DefaultTags: defaultTags,
|
||||||
|
Separator: separator,
|
||||||
|
Templates: templates,
|
||||||
|
}
|
||||||
|
err := parser.InitTemplating()
|
||||||
|
|
||||||
|
return parser, err
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue