Add support for pass/drop/tagpass/tagdrop for outputs

Reuses same logic as the plugins for filtering points, should be only
a marginal performance decrease to check all the points before writing
to the output.

Added examples to the README as well (for generic pass/drop as well as
output pass/drop/tagpass/tagdrop).

X-Github-Closes #398

closes #398
closes #401
This commit is contained in:
Tait Clarridge 2015-12-01 09:15:28 -05:00 committed by Cameron Sparr
parent c83f220fc4
commit 22afc99f1e
6 changed files with 186 additions and 73 deletions

View File

@ -8,6 +8,7 @@
- [#418](https://github.com/influxdb/telegraf/pull/418): memcached plugin additional unit tests. - [#418](https://github.com/influxdb/telegraf/pull/418): memcached plugin additional unit tests.
- [#408](https://github.com/influxdb/telegraf/pull/408): MailChimp plugin. - [#408](https://github.com/influxdb/telegraf/pull/408): MailChimp plugin.
- [#382](https://github.com/influxdb/telegraf/pull/382): Add system wide network protocol stats to `net` plugin. - [#382](https://github.com/influxdb/telegraf/pull/382): Add system wide network protocol stats to `net` plugin.
- [#401](https://github.com/influxdb/telegraf/pull/401): Support pass/drop/tagpass/tagdrop for outputs. Thanks @oldmantaiter!
### Bugfixes ### Bugfixes
- [#405](https://github.com/influxdb/telegraf/issues/405): Prometheus output cardinality issue - [#405](https://github.com/influxdb/telegraf/issues/405): Prometheus output cardinality issue

View File

@ -113,7 +113,7 @@ at 192.168.59.103:8086, tagging measurements with dc="denver-1". It will output
measurements at a 10s interval and will collect per-cpu data, dropping any measurements at a 10s interval and will collect per-cpu data, dropping any
measurements which begin with `cpu_time`. measurements which begin with `cpu_time`.
``` ```toml
[tags] [tags]
dc = "denver-1" dc = "denver-1"
@ -137,7 +137,7 @@ measurements which begin with `cpu_time`.
Below is how to configure `tagpass` and `tagdrop` parameters (added in 0.1.5) Below is how to configure `tagpass` and `tagdrop` parameters (added in 0.1.5)
``` ```toml
[plugins] [plugins]
[[plugins.cpu]] [[plugins.cpu]]
percpu = true percpu = true
@ -156,10 +156,23 @@ Below is how to configure `tagpass` and `tagdrop` parameters (added in 0.1.5)
path = [ "/opt", "/home" ] path = [ "/opt", "/home" ]
``` ```
Below is how to configure `pass` and `drop` parameters (added in 0.1.5)
```toml
# Drop all metrics for guest CPU usage
[[plugins.cpu]]
drop = [ "cpu_usage_guest" ]
# Only store inode related metrics for disks
[[plugins.disk]]
pass = [ "disk_inodes" ]
```
Additional plugins (or outputs) of the same type can be specified, Additional plugins (or outputs) of the same type can be specified,
just define another instance in the config file: just define another instance in the config file:
``` ```toml
[[plugins.cpu]] [[plugins.cpu]]
percpu = false percpu = false
totalcpu = true totalcpu = true
@ -225,6 +238,33 @@ Telegraf also supports specifying multiple output sinks to send data to,
configuring each output sink is different, but examples can be configuring each output sink is different, but examples can be
found by running `telegraf -sample-config`. found by running `telegraf -sample-config`.
Outputs also support the same configurable options as plugins (pass, drop, tagpass, tagdrop)
```toml
[[outputs.influxdb]]
urls = [ "http://localhost:8086" ]
database = "telegraf"
precision = "s"
# Drop all measurements that start with "aerospike"
drop = ["aerospike"]
[[outputs.influxdb]]
urls = [ "http://localhost:8086" ]
database = "telegraf-aerospike-data"
precision = "s"
# Only accept aerospike data:
pass = ["aerospike"]
[[outputs.influxdb]]
urls = [ "http://localhost:8086" ]
database = "telegraf-cpu0-data"
precision = "s"
# Only store measurements where the tag "cpu" matches the value "cpu0"
[outputs.influxdb.tagpass]
cpu = ["cpu0"]
```
## Supported Outputs ## Supported Outputs
* influxdb * influxdb

View File

@ -107,7 +107,7 @@ func (ac *accumulator) AddFields(
} }
if ac.pluginConfig != nil { if ac.pluginConfig != nil {
if !ac.pluginConfig.ShouldPass(measurement) || !ac.pluginConfig.ShouldTagsPass(tags) { if !ac.pluginConfig.Filter.ShouldPass(measurement) || !ac.pluginConfig.Filter.ShouldTagsPass(tags) {
return return
} }
} }

View File

@ -230,12 +230,13 @@ func (a *Agent) writeOutput(
start := time.Now() start := time.Now()
for { for {
err := ro.Output.Write(points) filtered := ro.FilterPoints(points)
err := ro.Output.Write(filtered)
if err == nil { if err == nil {
// Write successful // Write successful
elapsed := time.Since(start) elapsed := time.Since(start)
log.Printf("Flushed %d metrics to output %s in %s\n", log.Printf("Flushed %d metrics to output %s in %s\n",
len(points), ro.Name, elapsed) len(filtered), ro.Name, elapsed)
return return
} }

View File

@ -16,6 +16,8 @@ import (
"github.com/naoina/toml" "github.com/naoina/toml"
"github.com/naoina/toml/ast" "github.com/naoina/toml/ast"
"github.com/influxdb/influxdb/client/v2"
) )
// Config specifies the URL/user/password for the database that telegraf // Config specifies the URL/user/password for the database that telegraf
@ -88,6 +90,7 @@ type TagFilter struct {
type RunningOutput struct { type RunningOutput struct {
Name string Name string
Output outputs.Output Output outputs.Output
Config *OutputConfig
} }
type RunningPlugin struct { type RunningPlugin struct {
@ -96,25 +99,52 @@ type RunningPlugin struct {
Config *PluginConfig Config *PluginConfig
} }
// PluginConfig containing a name, interval, and drop/pass prefix lists // Filter containing drop/pass and tagdrop/tagpass rules
// Also lists the tags to filter type Filter struct {
type PluginConfig struct {
Name string
Drop []string Drop []string
Pass []string Pass []string
TagDrop []TagFilter TagDrop []TagFilter
TagPass []TagFilter TagPass []TagFilter
IsActive bool
}
// PluginConfig containing a name, interval, and filter
type PluginConfig struct {
Name string
Filter Filter
Interval time.Duration Interval time.Duration
} }
// OutputConfig containing name and filter
type OutputConfig struct {
Name string
Filter Filter
}
// Filter returns filtered slice of client.Points based on whether filters
// are active for this RunningOutput.
func (ro *RunningOutput) FilterPoints(points []*client.Point) []*client.Point {
if !ro.Config.Filter.IsActive {
return points
}
var filteredPoints []*client.Point
for i := range points {
if !ro.Config.Filter.ShouldPass(points[i].Name()) || !ro.Config.Filter.ShouldTagsPass(points[i].Tags()) {
continue
}
filteredPoints = append(filteredPoints, points[i])
}
return filteredPoints
}
// ShouldPass returns true if the metric should pass, false if should drop // ShouldPass returns true if the metric should pass, false if should drop
// based on the drop/pass plugin parameters // based on the drop/pass filter parameters
func (cp *PluginConfig) ShouldPass(measurement string) bool { func (f Filter) ShouldPass(measurement string) bool {
if cp.Pass != nil { if f.Pass != nil {
for _, pat := range cp.Pass { for _, pat := range f.Pass {
if strings.HasPrefix(measurement, pat) { if strings.HasPrefix(measurement, pat) {
return true return true
} }
@ -122,8 +152,8 @@ func (cp *PluginConfig) ShouldPass(measurement string) bool {
return false return false
} }
if cp.Drop != nil { if f.Drop != nil {
for _, pat := range cp.Drop { for _, pat := range f.Drop {
if strings.HasPrefix(measurement, pat) { if strings.HasPrefix(measurement, pat) {
return false return false
} }
@ -135,10 +165,10 @@ func (cp *PluginConfig) ShouldPass(measurement string) bool {
} }
// ShouldTagsPass returns true if the metric should pass, false if should drop // ShouldTagsPass returns true if the metric should pass, false if should drop
// based on the tagdrop/tagpass plugin parameters // based on the tagdrop/tagpass filter parameters
func (cp *PluginConfig) ShouldTagsPass(tags map[string]string) bool { func (f Filter) ShouldTagsPass(tags map[string]string) bool {
if cp.TagPass != nil { if f.TagPass != nil {
for _, pat := range cp.TagPass { for _, pat := range f.TagPass {
if tagval, ok := tags[pat.Name]; ok { if tagval, ok := tags[pat.Name]; ok {
for _, filter := range pat.Filter { for _, filter := range pat.Filter {
if filter == tagval { if filter == tagval {
@ -150,8 +180,8 @@ func (cp *PluginConfig) ShouldTagsPass(tags map[string]string) bool {
return false return false
} }
if cp.TagDrop != nil { if f.TagDrop != nil {
for _, pat := range cp.TagDrop { for _, pat := range f.TagDrop {
if tagval, ok := tags[pat.Name]; ok { if tagval, ok := tags[pat.Name]; ok {
for _, filter := range pat.Filter { for _, filter := range pat.Filter {
if filter == tagval { if filter == tagval {
@ -469,15 +499,21 @@ func (c *Config) addOutput(name string, table *ast.Table) error {
if !ok { if !ok {
return fmt.Errorf("Undefined but requested output: %s", name) return fmt.Errorf("Undefined but requested output: %s", name)
} }
o := creator() output := creator()
if err := toml.UnmarshalTable(table, o); err != nil { outputConfig, err := buildOutput(name, table)
if err != nil {
return err
}
if err := toml.UnmarshalTable(table, output); err != nil {
return err return err
} }
ro := &RunningOutput{ ro := &RunningOutput{
Name: name, Name: name,
Output: o, Output: output,
Config: outputConfig,
} }
c.Outputs = append(c.Outputs, ro) c.Outputs = append(c.Outputs, ro)
return nil return nil
@ -493,10 +529,15 @@ func (c *Config) addPlugin(name string, table *ast.Table) error {
} }
plugin := creator() plugin := creator()
pluginConfig, err := applyPlugin(name, table, plugin) pluginConfig, err := buildPlugin(name, table)
if err != nil { if err != nil {
return err return err
} }
if err := toml.UnmarshalTable(table, plugin); err != nil {
return err
}
rp := &RunningPlugin{ rp := &RunningPlugin{
Name: name, Name: name,
Plugin: plugin, Plugin: plugin,
@ -506,18 +547,19 @@ func (c *Config) addPlugin(name string, table *ast.Table) error {
return nil return nil
} }
// applyPlugin takes defined plugin names and applies them to the given // buildFilter builds a Filter (tagpass/tagdrop/pass/drop) to
// interface, returning a PluginConfig object in the end that can // be inserted into the OutputConfig/PluginConfig to be used for prefix
// be inserted into a runningPlugin by the agent. // filtering on tags and measurements
func applyPlugin(name string, tbl *ast.Table, p plugins.Plugin) (*PluginConfig, error) { func buildFilter(tbl *ast.Table) Filter {
cp := &PluginConfig{Name: name} f := Filter{}
if node, ok := tbl.Fields["pass"]; ok { if node, ok := tbl.Fields["pass"]; ok {
if kv, ok := node.(*ast.KeyValue); ok { if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok { if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value { for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok { if str, ok := elem.(*ast.String); ok {
cp.Pass = append(cp.Pass, str.Value) f.Pass = append(f.Pass, str.Value)
f.IsActive = true
} }
} }
} }
@ -529,26 +571,14 @@ func applyPlugin(name string, tbl *ast.Table, p plugins.Plugin) (*PluginConfig,
if ary, ok := kv.Value.(*ast.Array); ok { if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value { for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok { if str, ok := elem.(*ast.String); ok {
cp.Drop = append(cp.Drop, str.Value) f.Drop = append(f.Drop, str.Value)
f.IsActive = true
} }
} }
} }
} }
} }
if node, ok := tbl.Fields["interval"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
dur, err := time.ParseDuration(str.Value)
if err != nil {
return nil, err
}
cp.Interval = dur
}
}
}
if node, ok := tbl.Fields["tagpass"]; ok { if node, ok := tbl.Fields["tagpass"]; ok {
if subtbl, ok := node.(*ast.Table); ok { if subtbl, ok := node.(*ast.Table); ok {
for name, val := range subtbl.Fields { for name, val := range subtbl.Fields {
@ -561,7 +591,8 @@ func applyPlugin(name string, tbl *ast.Table, p plugins.Plugin) (*PluginConfig,
} }
} }
} }
cp.TagPass = append(cp.TagPass, *tagfilter) f.TagPass = append(f.TagPass, *tagfilter)
f.IsActive = true
} }
} }
} }
@ -579,7 +610,8 @@ func applyPlugin(name string, tbl *ast.Table, p plugins.Plugin) (*PluginConfig,
} }
} }
} }
cp.TagDrop = append(cp.TagDrop, *tagfilter) f.TagDrop = append(f.TagDrop, *tagfilter)
f.IsActive = true
} }
} }
} }
@ -587,8 +619,41 @@ func applyPlugin(name string, tbl *ast.Table, p plugins.Plugin) (*PluginConfig,
delete(tbl.Fields, "drop") delete(tbl.Fields, "drop")
delete(tbl.Fields, "pass") delete(tbl.Fields, "pass")
delete(tbl.Fields, "interval")
delete(tbl.Fields, "tagdrop") delete(tbl.Fields, "tagdrop")
delete(tbl.Fields, "tagpass") delete(tbl.Fields, "tagpass")
return cp, toml.UnmarshalTable(tbl, p) return f
}
// buildPlugin parses plugin specific items from the ast.Table, builds the filter and returns a
// PluginConfig to be inserted into RunningPlugin
func buildPlugin(name string, tbl *ast.Table) (*PluginConfig, error) {
cp := &PluginConfig{Name: name}
if node, ok := tbl.Fields["interval"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
dur, err := time.ParseDuration(str.Value)
if err != nil {
return nil, err
}
cp.Interval = dur
}
}
}
delete(tbl.Fields, "interval")
cp.Filter = buildFilter(tbl)
return cp, nil
}
// buildOutput parses output specific items from the ast.Table, builds the filter and returns an
// OutputConfig to be inserted into RunningPlugin
// Note: error exists in the return for future calls that might require error
func buildOutput(name string, tbl *ast.Table) (*OutputConfig, error) {
oc := &OutputConfig{
Name: name,
Filter: buildFilter(tbl),
}
return oc, nil
} }

View File

@ -20,19 +20,22 @@ func TestConfig_LoadSinglePlugin(t *testing.T) {
mConfig := &PluginConfig{ mConfig := &PluginConfig{
Name: "memcached", Name: "memcached",
Drop: []string{"other", "stuff"}, Filter: Filter{
Pass: []string{"some", "strings"}, Drop: []string{"other", "stuff"},
TagDrop: []TagFilter{ Pass: []string{"some", "strings"},
TagFilter{ TagDrop: []TagFilter{
Name: "badtag", TagFilter{
Filter: []string{"othertag"}, Name: "badtag",
Filter: []string{"othertag"},
},
}, },
}, TagPass: []TagFilter{
TagPass: []TagFilter{ TagFilter{
TagFilter{ Name: "goodtag",
Name: "goodtag", Filter: []string{"mytag"},
Filter: []string{"mytag"}, },
}, },
IsActive: true,
}, },
Interval: 5 * time.Second, Interval: 5 * time.Second,
} }
@ -59,19 +62,22 @@ func TestConfig_LoadDirectory(t *testing.T) {
mConfig := &PluginConfig{ mConfig := &PluginConfig{
Name: "memcached", Name: "memcached",
Drop: []string{"other", "stuff"}, Filter: Filter{
Pass: []string{"some", "strings"}, Drop: []string{"other", "stuff"},
TagDrop: []TagFilter{ Pass: []string{"some", "strings"},
TagFilter{ TagDrop: []TagFilter{
Name: "badtag", TagFilter{
Filter: []string{"othertag"}, Name: "badtag",
Filter: []string{"othertag"},
},
}, },
}, TagPass: []TagFilter{
TagPass: []TagFilter{ TagFilter{
TagFilter{ Name: "goodtag",
Name: "goodtag", Filter: []string{"mytag"},
Filter: []string{"mytag"}, },
}, },
IsActive: true,
}, },
Interval: 5 * time.Second, Interval: 5 * time.Second,
} }