From 63f35fec95893fadd36827b4c642c3360f6d6e29 Mon Sep 17 00:00:00 2001 From: Tait Clarridge Date: Tue, 1 Dec 2015 09:15:28 -0500 Subject: [PATCH] Add support for pass/drop/tagpass/tagdrop for outputs Reuses same logic as the plugins for filtering points, should be only a marginal performance decrease to check all the points before writing to the output. Added examples to the README as well (for generic pass/drop as well as output pass/drop/tagpass/tagdrop). X-Github-Closes #398 --- README.md | 34 ++++++++ accumulator.go | 2 +- agent.go | 5 +- internal/config/config.go | 155 +++++++++++++++++++++++---------- internal/config/config_test.go | 50 ++++++----- 5 files changed, 176 insertions(+), 70 deletions(-) diff --git a/README.md b/README.md index 9d9ef3859..4228f3f7b 100644 --- a/README.md +++ b/README.md @@ -156,6 +156,19 @@ Below is how to configure `tagpass` and `tagdrop` parameters (added in 0.1.5) path = [ "/opt", "/home" ] ``` +Below is how to configure `pass` and `drop` parameters (added in 0.1.5) + +``` +# Drop all metrics for guest CPU usage +[[plugins.cpu]] + drop = [ "cpu_usage_guest" ] + +# Only store inode related metrics for disks +[[plugins.disk]] + pass = [ "disk_inodes" ] +``` + + Additional plugins (or outputs) of the same type can be specified, just define another instance in the config file: @@ -224,6 +237,27 @@ Telegraf also supports specifying multiple output sinks to send data to, configuring each output sink is different, but examples can be found by running `telegraf -sample-config`. +Outputs also support the same configurable options as plugins (pass, drop, tagpass, tagdrop) + +``` +[[outputs.influxdb]] + urls = [ "http://localhost:8086" ] + database = "telegraf" + # Drop all measurements that start with "aerospike" + drop = ["aerospike"] + +# Send to a different database +[[outputs.influxdb]] + urls = [ "http://localhost:8086" ] + database = "mydb" + precision = "s" + +# Only store measurements where the tag "mytag" matches the value "B" +[outputs.influxdb.tagpass] + mytag = ["B"] +``` + + ## Supported Outputs * influxdb diff --git a/accumulator.go b/accumulator.go index 2e8b61d1c..8dbf2e8aa 100644 --- a/accumulator.go +++ b/accumulator.go @@ -107,7 +107,7 @@ func (ac *accumulator) AddFields( } if ac.pluginConfig != nil { - if !ac.pluginConfig.ShouldPass(measurement) || !ac.pluginConfig.ShouldTagsPass(tags) { + if !ac.pluginConfig.Filter.ShouldPass(measurement) || !ac.pluginConfig.Filter.ShouldTagsPass(tags) { return } } diff --git a/agent.go b/agent.go index e1292bffe..6aced134b 100644 --- a/agent.go +++ b/agent.go @@ -226,12 +226,13 @@ func (a *Agent) writeOutput( start := time.Now() for { - err := ro.Output.Write(points) + filtered := ro.FilterPoints(points) + err := ro.Output.Write(filtered) if err == nil { // Write successful elapsed := time.Since(start) log.Printf("Flushed %d metrics to output %s in %s\n", - len(points), ro.Name, elapsed) + len(filtered), ro.Name, elapsed) return } diff --git a/internal/config/config.go b/internal/config/config.go index 5bb77f2c6..d39f6833c 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -16,6 +16,8 @@ import ( "github.com/naoina/toml" "github.com/naoina/toml/ast" + + "github.com/influxdb/influxdb/client/v2" ) // Config specifies the URL/user/password for the database that telegraf @@ -88,6 +90,7 @@ type TagFilter struct { type RunningOutput struct { Name string Output outputs.Output + Config *OutputConfig } type RunningPlugin struct { @@ -96,25 +99,52 @@ type RunningPlugin struct { Config *PluginConfig } -// PluginConfig containing a name, interval, and drop/pass prefix lists -// Also lists the tags to filter -type PluginConfig struct { - Name string - +// Filter containing drop/pass and tagdrop/tagpass rules +type Filter struct { Drop []string Pass []string TagDrop []TagFilter TagPass []TagFilter + IsActive bool +} + +// PluginConfig containing a name, interval, and filter +type PluginConfig struct { + Name string + Filter Filter Interval time.Duration } +// OutputConfig containing name and filter +type OutputConfig struct { + Name string + Filter Filter +} + +// Filter returns filtered slice of client.Points based on whether filters +// are active for this RunningOutput. +func (ro *RunningOutput) FilterPoints(points []*client.Point) []*client.Point { + if !ro.Config.Filter.IsActive { + return points + } + + var filteredPoints []*client.Point + for i := range points { + if !ro.Config.Filter.ShouldPass(points[i].Name()) || !ro.Config.Filter.ShouldTagsPass(points[i].Tags()) { + continue + } + filteredPoints = append(filteredPoints, points[i]) + } + return filteredPoints +} + // ShouldPass returns true if the metric should pass, false if should drop -// based on the drop/pass plugin parameters -func (cp *PluginConfig) ShouldPass(measurement string) bool { - if cp.Pass != nil { - for _, pat := range cp.Pass { +// based on the drop/pass filter parameters +func (f Filter) ShouldPass(measurement string) bool { + if f.Pass != nil { + for _, pat := range f.Pass { if strings.HasPrefix(measurement, pat) { return true } @@ -122,8 +152,8 @@ func (cp *PluginConfig) ShouldPass(measurement string) bool { return false } - if cp.Drop != nil { - for _, pat := range cp.Drop { + if f.Drop != nil { + for _, pat := range f.Drop { if strings.HasPrefix(measurement, pat) { return false } @@ -135,10 +165,10 @@ func (cp *PluginConfig) ShouldPass(measurement string) bool { } // ShouldTagsPass returns true if the metric should pass, false if should drop -// based on the tagdrop/tagpass plugin parameters -func (cp *PluginConfig) ShouldTagsPass(tags map[string]string) bool { - if cp.TagPass != nil { - for _, pat := range cp.TagPass { +// based on the tagdrop/tagpass filter parameters +func (f Filter) ShouldTagsPass(tags map[string]string) bool { + if f.TagPass != nil { + for _, pat := range f.TagPass { if tagval, ok := tags[pat.Name]; ok { for _, filter := range pat.Filter { if filter == tagval { @@ -150,8 +180,8 @@ func (cp *PluginConfig) ShouldTagsPass(tags map[string]string) bool { return false } - if cp.TagDrop != nil { - for _, pat := range cp.TagDrop { + if f.TagDrop != nil { + for _, pat := range f.TagDrop { if tagval, ok := tags[pat.Name]; ok { for _, filter := range pat.Filter { if filter == tagval { @@ -469,15 +499,21 @@ func (c *Config) addOutput(name string, table *ast.Table) error { if !ok { return fmt.Errorf("Undefined but requested output: %s", name) } - o := creator() + output := creator() - if err := toml.UnmarshalTable(table, o); err != nil { + outputConfig, err := buildOutput(name, table) + if err != nil { + return err + } + + if err := toml.UnmarshalTable(table, output); err != nil { return err } ro := &RunningOutput{ Name: name, - Output: o, + Output: output, + Config: outputConfig, } c.Outputs = append(c.Outputs, ro) return nil @@ -493,10 +529,15 @@ func (c *Config) addPlugin(name string, table *ast.Table) error { } plugin := creator() - pluginConfig, err := applyPlugin(name, table, plugin) + pluginConfig, err := buildPlugin(name, table) if err != nil { return err } + + if err := toml.UnmarshalTable(table, plugin); err != nil { + return err + } + rp := &RunningPlugin{ Name: name, Plugin: plugin, @@ -506,18 +547,19 @@ func (c *Config) addPlugin(name string, table *ast.Table) error { return nil } -// applyPlugin takes defined plugin names and applies them to the given -// interface, returning a PluginConfig object in the end that can -// be inserted into a runningPlugin by the agent. -func applyPlugin(name string, tbl *ast.Table, p plugins.Plugin) (*PluginConfig, error) { - cp := &PluginConfig{Name: name} +// buildFilter builds a Filter (tagpass/tagdrop/pass/drop) to +// be inserted into the OutputConfig/PluginConfig to be used for prefix +// filtering on tags and measurements +func buildFilter(tbl *ast.Table) Filter { + f := Filter{} if node, ok := tbl.Fields["pass"]; ok { if kv, ok := node.(*ast.KeyValue); ok { if ary, ok := kv.Value.(*ast.Array); ok { for _, elem := range ary.Value { if str, ok := elem.(*ast.String); ok { - cp.Pass = append(cp.Pass, str.Value) + f.Pass = append(f.Pass, str.Value) + f.IsActive = true } } } @@ -529,26 +571,14 @@ func applyPlugin(name string, tbl *ast.Table, p plugins.Plugin) (*PluginConfig, if ary, ok := kv.Value.(*ast.Array); ok { for _, elem := range ary.Value { if str, ok := elem.(*ast.String); ok { - cp.Drop = append(cp.Drop, str.Value) + f.Drop = append(f.Drop, str.Value) + f.IsActive = true } } } } } - if node, ok := tbl.Fields["interval"]; ok { - if kv, ok := node.(*ast.KeyValue); ok { - if str, ok := kv.Value.(*ast.String); ok { - dur, err := time.ParseDuration(str.Value) - if err != nil { - return nil, err - } - - cp.Interval = dur - } - } - } - if node, ok := tbl.Fields["tagpass"]; ok { if subtbl, ok := node.(*ast.Table); ok { for name, val := range subtbl.Fields { @@ -561,7 +591,8 @@ func applyPlugin(name string, tbl *ast.Table, p plugins.Plugin) (*PluginConfig, } } } - cp.TagPass = append(cp.TagPass, *tagfilter) + f.TagPass = append(f.TagPass, *tagfilter) + f.IsActive = true } } } @@ -579,7 +610,8 @@ func applyPlugin(name string, tbl *ast.Table, p plugins.Plugin) (*PluginConfig, } } } - cp.TagDrop = append(cp.TagDrop, *tagfilter) + f.TagDrop = append(f.TagDrop, *tagfilter) + f.IsActive = true } } } @@ -587,8 +619,41 @@ func applyPlugin(name string, tbl *ast.Table, p plugins.Plugin) (*PluginConfig, delete(tbl.Fields, "drop") delete(tbl.Fields, "pass") - delete(tbl.Fields, "interval") delete(tbl.Fields, "tagdrop") delete(tbl.Fields, "tagpass") - return cp, toml.UnmarshalTable(tbl, p) + return f +} + +// buildPlugin parses plugin specific items from the ast.Table, builds the filter and returns a +// PluginConfig to be inserted into RunningPlugin +func buildPlugin(name string, tbl *ast.Table) (*PluginConfig, error) { + cp := &PluginConfig{Name: name} + if node, ok := tbl.Fields["interval"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + dur, err := time.ParseDuration(str.Value) + if err != nil { + return nil, err + } + + cp.Interval = dur + } + } + } + delete(tbl.Fields, "interval") + cp.Filter = buildFilter(tbl) + return cp, nil + +} + +// buildOutput parses output specific items from the ast.Table, builds the filter and returns an +// OutputConfig to be inserted into RunningPlugin +// Note: error exists in the return for future calls that might require error +func buildOutput(name string, tbl *ast.Table) (*OutputConfig, error) { + oc := &OutputConfig{ + Name: name, + Filter: buildFilter(tbl), + } + return oc, nil + } diff --git a/internal/config/config_test.go b/internal/config/config_test.go index e53911224..067592a86 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -20,19 +20,22 @@ func TestConfig_LoadSinglePlugin(t *testing.T) { mConfig := &PluginConfig{ Name: "memcached", - Drop: []string{"other", "stuff"}, - Pass: []string{"some", "strings"}, - TagDrop: []TagFilter{ - TagFilter{ - Name: "badtag", - Filter: []string{"othertag"}, + Filter: Filter{ + Drop: []string{"other", "stuff"}, + Pass: []string{"some", "strings"}, + TagDrop: []TagFilter{ + TagFilter{ + Name: "badtag", + Filter: []string{"othertag"}, + }, }, - }, - TagPass: []TagFilter{ - TagFilter{ - Name: "goodtag", - Filter: []string{"mytag"}, + TagPass: []TagFilter{ + TagFilter{ + Name: "goodtag", + Filter: []string{"mytag"}, + }, }, + IsActive: true, }, Interval: 5 * time.Second, } @@ -59,19 +62,22 @@ func TestConfig_LoadDirectory(t *testing.T) { mConfig := &PluginConfig{ Name: "memcached", - Drop: []string{"other", "stuff"}, - Pass: []string{"some", "strings"}, - TagDrop: []TagFilter{ - TagFilter{ - Name: "badtag", - Filter: []string{"othertag"}, + Filter: Filter{ + Drop: []string{"other", "stuff"}, + Pass: []string{"some", "strings"}, + TagDrop: []TagFilter{ + TagFilter{ + Name: "badtag", + Filter: []string{"othertag"}, + }, }, - }, - TagPass: []TagFilter{ - TagFilter{ - Name: "goodtag", - Filter: []string{"mytag"}, + TagPass: []TagFilter{ + TagFilter{ + Name: "goodtag", + Filter: []string{"mytag"}, + }, }, + IsActive: true, }, Interval: 5 * time.Second, }