diff --git a/CHANGELOG.md b/CHANGELOG.md index 350e444ff..8bc84768f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ - [#418](https://github.com/influxdb/telegraf/pull/418): memcached plugin additional unit tests. - [#408](https://github.com/influxdb/telegraf/pull/408): MailChimp plugin. - [#382](https://github.com/influxdb/telegraf/pull/382): Add system wide network protocol stats to `net` plugin. +- [#401](https://github.com/influxdb/telegraf/pull/401): Support pass/drop/tagpass/tagdrop for outputs. Thanks @oldmantaiter! ### Bugfixes - [#405](https://github.com/influxdb/telegraf/issues/405): Prometheus output cardinality issue diff --git a/README.md b/README.md index e6cf28e25..138b8023d 100644 --- a/README.md +++ b/README.md @@ -113,7 +113,7 @@ at 192.168.59.103:8086, tagging measurements with dc="denver-1". It will output measurements at a 10s interval and will collect per-cpu data, dropping any measurements which begin with `cpu_time`. -``` +```toml [tags] dc = "denver-1" @@ -137,7 +137,7 @@ measurements which begin with `cpu_time`. Below is how to configure `tagpass` and `tagdrop` parameters (added in 0.1.5) -``` +```toml [plugins] [[plugins.cpu]] percpu = true @@ -156,10 +156,23 @@ Below is how to configure `tagpass` and `tagdrop` parameters (added in 0.1.5) path = [ "/opt", "/home" ] ``` +Below is how to configure `pass` and `drop` parameters (added in 0.1.5) + +```toml +# Drop all metrics for guest CPU usage +[[plugins.cpu]] + drop = [ "cpu_usage_guest" ] + +# Only store inode related metrics for disks +[[plugins.disk]] + pass = [ "disk_inodes" ] +``` + + Additional plugins (or outputs) of the same type can be specified, just define another instance in the config file: -``` +```toml [[plugins.cpu]] percpu = false totalcpu = true @@ -225,6 +238,33 @@ Telegraf also supports specifying multiple output sinks to send data to, configuring each output sink is different, but examples can be found by running `telegraf -sample-config`. +Outputs also support the same configurable options as plugins (pass, drop, tagpass, tagdrop) + +```toml +[[outputs.influxdb]] + urls = [ "http://localhost:8086" ] + database = "telegraf" + precision = "s" + # Drop all measurements that start with "aerospike" + drop = ["aerospike"] + +[[outputs.influxdb]] + urls = [ "http://localhost:8086" ] + database = "telegraf-aerospike-data" + precision = "s" + # Only accept aerospike data: + pass = ["aerospike"] + +[[outputs.influxdb]] + urls = [ "http://localhost:8086" ] + database = "telegraf-cpu0-data" + precision = "s" + # Only store measurements where the tag "cpu" matches the value "cpu0" + [outputs.influxdb.tagpass] + cpu = ["cpu0"] +``` + + ## Supported Outputs * influxdb diff --git a/accumulator.go b/accumulator.go index 2e8b61d1c..8dbf2e8aa 100644 --- a/accumulator.go +++ b/accumulator.go @@ -107,7 +107,7 @@ func (ac *accumulator) AddFields( } if ac.pluginConfig != nil { - if !ac.pluginConfig.ShouldPass(measurement) || !ac.pluginConfig.ShouldTagsPass(tags) { + if !ac.pluginConfig.Filter.ShouldPass(measurement) || !ac.pluginConfig.Filter.ShouldTagsPass(tags) { return } } diff --git a/agent.go b/agent.go index 4bc001182..68b1b5f16 100644 --- a/agent.go +++ b/agent.go @@ -230,12 +230,13 @@ func (a *Agent) writeOutput( start := time.Now() for { - err := ro.Output.Write(points) + filtered := ro.FilterPoints(points) + err := ro.Output.Write(filtered) if err == nil { // Write successful elapsed := time.Since(start) log.Printf("Flushed %d metrics to output %s in %s\n", - len(points), ro.Name, elapsed) + len(filtered), ro.Name, elapsed) return } diff --git a/internal/config/config.go b/internal/config/config.go index f16803142..26a1eb640 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -16,6 +16,8 @@ import ( "github.com/naoina/toml" "github.com/naoina/toml/ast" + + "github.com/influxdb/influxdb/client/v2" ) // Config specifies the URL/user/password for the database that telegraf @@ -88,6 +90,7 @@ type TagFilter struct { type RunningOutput struct { Name string Output outputs.Output + Config *OutputConfig } type RunningPlugin struct { @@ -96,25 +99,52 @@ type RunningPlugin struct { Config *PluginConfig } -// PluginConfig containing a name, interval, and drop/pass prefix lists -// Also lists the tags to filter -type PluginConfig struct { - Name string - +// Filter containing drop/pass and tagdrop/tagpass rules +type Filter struct { Drop []string Pass []string TagDrop []TagFilter TagPass []TagFilter + IsActive bool +} + +// PluginConfig containing a name, interval, and filter +type PluginConfig struct { + Name string + Filter Filter Interval time.Duration } +// OutputConfig containing name and filter +type OutputConfig struct { + Name string + Filter Filter +} + +// Filter returns filtered slice of client.Points based on whether filters +// are active for this RunningOutput. +func (ro *RunningOutput) FilterPoints(points []*client.Point) []*client.Point { + if !ro.Config.Filter.IsActive { + return points + } + + var filteredPoints []*client.Point + for i := range points { + if !ro.Config.Filter.ShouldPass(points[i].Name()) || !ro.Config.Filter.ShouldTagsPass(points[i].Tags()) { + continue + } + filteredPoints = append(filteredPoints, points[i]) + } + return filteredPoints +} + // ShouldPass returns true if the metric should pass, false if should drop -// based on the drop/pass plugin parameters -func (cp *PluginConfig) ShouldPass(measurement string) bool { - if cp.Pass != nil { - for _, pat := range cp.Pass { +// based on the drop/pass filter parameters +func (f Filter) ShouldPass(measurement string) bool { + if f.Pass != nil { + for _, pat := range f.Pass { if strings.HasPrefix(measurement, pat) { return true } @@ -122,8 +152,8 @@ func (cp *PluginConfig) ShouldPass(measurement string) bool { return false } - if cp.Drop != nil { - for _, pat := range cp.Drop { + if f.Drop != nil { + for _, pat := range f.Drop { if strings.HasPrefix(measurement, pat) { return false } @@ -135,10 +165,10 @@ func (cp *PluginConfig) ShouldPass(measurement string) bool { } // ShouldTagsPass returns true if the metric should pass, false if should drop -// based on the tagdrop/tagpass plugin parameters -func (cp *PluginConfig) ShouldTagsPass(tags map[string]string) bool { - if cp.TagPass != nil { - for _, pat := range cp.TagPass { +// based on the tagdrop/tagpass filter parameters +func (f Filter) ShouldTagsPass(tags map[string]string) bool { + if f.TagPass != nil { + for _, pat := range f.TagPass { if tagval, ok := tags[pat.Name]; ok { for _, filter := range pat.Filter { if filter == tagval { @@ -150,8 +180,8 @@ func (cp *PluginConfig) ShouldTagsPass(tags map[string]string) bool { return false } - if cp.TagDrop != nil { - for _, pat := range cp.TagDrop { + if f.TagDrop != nil { + for _, pat := range f.TagDrop { if tagval, ok := tags[pat.Name]; ok { for _, filter := range pat.Filter { if filter == tagval { @@ -469,15 +499,21 @@ func (c *Config) addOutput(name string, table *ast.Table) error { if !ok { return fmt.Errorf("Undefined but requested output: %s", name) } - o := creator() + output := creator() - if err := toml.UnmarshalTable(table, o); err != nil { + outputConfig, err := buildOutput(name, table) + if err != nil { + return err + } + + if err := toml.UnmarshalTable(table, output); err != nil { return err } ro := &RunningOutput{ Name: name, - Output: o, + Output: output, + Config: outputConfig, } c.Outputs = append(c.Outputs, ro) return nil @@ -493,10 +529,15 @@ func (c *Config) addPlugin(name string, table *ast.Table) error { } plugin := creator() - pluginConfig, err := applyPlugin(name, table, plugin) + pluginConfig, err := buildPlugin(name, table) if err != nil { return err } + + if err := toml.UnmarshalTable(table, plugin); err != nil { + return err + } + rp := &RunningPlugin{ Name: name, Plugin: plugin, @@ -506,18 +547,19 @@ func (c *Config) addPlugin(name string, table *ast.Table) error { return nil } -// applyPlugin takes defined plugin names and applies them to the given -// interface, returning a PluginConfig object in the end that can -// be inserted into a runningPlugin by the agent. -func applyPlugin(name string, tbl *ast.Table, p plugins.Plugin) (*PluginConfig, error) { - cp := &PluginConfig{Name: name} +// buildFilter builds a Filter (tagpass/tagdrop/pass/drop) to +// be inserted into the OutputConfig/PluginConfig to be used for prefix +// filtering on tags and measurements +func buildFilter(tbl *ast.Table) Filter { + f := Filter{} if node, ok := tbl.Fields["pass"]; ok { if kv, ok := node.(*ast.KeyValue); ok { if ary, ok := kv.Value.(*ast.Array); ok { for _, elem := range ary.Value { if str, ok := elem.(*ast.String); ok { - cp.Pass = append(cp.Pass, str.Value) + f.Pass = append(f.Pass, str.Value) + f.IsActive = true } } } @@ -529,26 +571,14 @@ func applyPlugin(name string, tbl *ast.Table, p plugins.Plugin) (*PluginConfig, if ary, ok := kv.Value.(*ast.Array); ok { for _, elem := range ary.Value { if str, ok := elem.(*ast.String); ok { - cp.Drop = append(cp.Drop, str.Value) + f.Drop = append(f.Drop, str.Value) + f.IsActive = true } } } } } - if node, ok := tbl.Fields["interval"]; ok { - if kv, ok := node.(*ast.KeyValue); ok { - if str, ok := kv.Value.(*ast.String); ok { - dur, err := time.ParseDuration(str.Value) - if err != nil { - return nil, err - } - - cp.Interval = dur - } - } - } - if node, ok := tbl.Fields["tagpass"]; ok { if subtbl, ok := node.(*ast.Table); ok { for name, val := range subtbl.Fields { @@ -561,7 +591,8 @@ func applyPlugin(name string, tbl *ast.Table, p plugins.Plugin) (*PluginConfig, } } } - cp.TagPass = append(cp.TagPass, *tagfilter) + f.TagPass = append(f.TagPass, *tagfilter) + f.IsActive = true } } } @@ -579,7 +610,8 @@ func applyPlugin(name string, tbl *ast.Table, p plugins.Plugin) (*PluginConfig, } } } - cp.TagDrop = append(cp.TagDrop, *tagfilter) + f.TagDrop = append(f.TagDrop, *tagfilter) + f.IsActive = true } } } @@ -587,8 +619,41 @@ func applyPlugin(name string, tbl *ast.Table, p plugins.Plugin) (*PluginConfig, delete(tbl.Fields, "drop") delete(tbl.Fields, "pass") - delete(tbl.Fields, "interval") delete(tbl.Fields, "tagdrop") delete(tbl.Fields, "tagpass") - return cp, toml.UnmarshalTable(tbl, p) + return f +} + +// buildPlugin parses plugin specific items from the ast.Table, builds the filter and returns a +// PluginConfig to be inserted into RunningPlugin +func buildPlugin(name string, tbl *ast.Table) (*PluginConfig, error) { + cp := &PluginConfig{Name: name} + if node, ok := tbl.Fields["interval"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + dur, err := time.ParseDuration(str.Value) + if err != nil { + return nil, err + } + + cp.Interval = dur + } + } + } + delete(tbl.Fields, "interval") + cp.Filter = buildFilter(tbl) + return cp, nil + +} + +// buildOutput parses output specific items from the ast.Table, builds the filter and returns an +// OutputConfig to be inserted into RunningPlugin +// Note: error exists in the return for future calls that might require error +func buildOutput(name string, tbl *ast.Table) (*OutputConfig, error) { + oc := &OutputConfig{ + Name: name, + Filter: buildFilter(tbl), + } + return oc, nil + } diff --git a/internal/config/config_test.go b/internal/config/config_test.go index e53911224..067592a86 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -20,19 +20,22 @@ func TestConfig_LoadSinglePlugin(t *testing.T) { mConfig := &PluginConfig{ Name: "memcached", - Drop: []string{"other", "stuff"}, - Pass: []string{"some", "strings"}, - TagDrop: []TagFilter{ - TagFilter{ - Name: "badtag", - Filter: []string{"othertag"}, + Filter: Filter{ + Drop: []string{"other", "stuff"}, + Pass: []string{"some", "strings"}, + TagDrop: []TagFilter{ + TagFilter{ + Name: "badtag", + Filter: []string{"othertag"}, + }, }, - }, - TagPass: []TagFilter{ - TagFilter{ - Name: "goodtag", - Filter: []string{"mytag"}, + TagPass: []TagFilter{ + TagFilter{ + Name: "goodtag", + Filter: []string{"mytag"}, + }, }, + IsActive: true, }, Interval: 5 * time.Second, } @@ -59,19 +62,22 @@ func TestConfig_LoadDirectory(t *testing.T) { mConfig := &PluginConfig{ Name: "memcached", - Drop: []string{"other", "stuff"}, - Pass: []string{"some", "strings"}, - TagDrop: []TagFilter{ - TagFilter{ - Name: "badtag", - Filter: []string{"othertag"}, + Filter: Filter{ + Drop: []string{"other", "stuff"}, + Pass: []string{"some", "strings"}, + TagDrop: []TagFilter{ + TagFilter{ + Name: "badtag", + Filter: []string{"othertag"}, + }, }, - }, - TagPass: []TagFilter{ - TagFilter{ - Name: "goodtag", - Filter: []string{"mytag"}, + TagPass: []TagFilter{ + TagFilter{ + Name: "goodtag", + Filter: []string{"mytag"}, + }, }, + IsActive: true, }, Interval: 5 * time.Second, }