From 5349a3b6d16aad1a2163d7b582b508deb9768fb7 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Fri, 22 Jan 2016 11:54:12 -0700 Subject: [PATCH] Implement a per-output fixed size metric buffer Also moved some objects out of config.go and put them in their own package, internal/models fixes #568 closes #285 --- CHANGELOG.md | 5 + accumulator.go | 6 +- agent.go | 94 +++-------- internal/config/config.go | 198 ++++------------------ internal/config/config_test.go | 201 ++--------------------- internal/models/filter.go | 92 +++++++++++ internal/models/filter_test.go | 177 ++++++++++++++++++++ internal/models/running_input.go | 24 +++ internal/models/running_output.go | 77 +++++++++ plugins/inputs/docker/docker.go | 7 +- plugins/inputs/influxdb/influxdb.go | 2 +- plugins/inputs/influxdb/influxdb_test.go | 4 +- plugins/inputs/procstat/procstat.go | 16 +- 13 files changed, 468 insertions(+), 435 deletions(-) create mode 100644 internal/models/filter.go create mode 100644 internal/models/filter_test.go create mode 100644 internal/models/running_input.go create mode 100644 internal/models/running_output.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 28b47fe20..2a8dee99d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,9 @@ ### Release Notes: +- Telegraf now keeps a fixed-length buffer of metrics per-output. This buffer +defaults to 10,000 metrics, and is adjustable. The buffer is cleared when a +successful write to that output occurs. - The docker plugin has been significantly overhauled to add more metrics and allow for docker-machine (incl OSX) support. [See the readme](https://github.com/influxdata/telegraf/blob/master/plugins/inputs/docker/README.md) @@ -26,6 +29,7 @@ specifying a docker endpoint to get metrics from. - [#553](https://github.com/influxdata/telegraf/pull/553): Amazon CloudWatch output. thanks @skwong2! - [#503](https://github.com/influxdata/telegraf/pull/503): Support docker endpoint configuration. 
- [#563](https://github.com/influxdata/telegraf/pull/563): Docker plugin overhaul. +- [#285](https://github.com/influxdata/telegraf/issues/285): Fixed-size buffer of points. ### Bugfixes - [#506](https://github.com/influxdata/telegraf/pull/506): Ping input doesn't return response time metric when timeout. Thanks @titilambert! @@ -34,6 +38,7 @@ specifying a docker endpoint to get metrics from. - [#543](https://github.com/influxdata/telegraf/issues/543): Statsd Packet size sometimes truncated. - [#440](https://github.com/influxdata/telegraf/issues/440): Don't query filtered devices for disk stats. - [#463](https://github.com/influxdata/telegraf/issues/463): Docker plugin not working on AWS Linux +- [#568](https://github.com/influxdata/telegraf/issues/568): Multiple output race condition. ## v0.10.0 [2016-01-12] diff --git a/accumulator.go b/accumulator.go index c628907d7..83f61ae99 100644 --- a/accumulator.go +++ b/accumulator.go @@ -7,7 +7,7 @@ import ( "sync" "time" - "github.com/influxdata/telegraf/internal/config" + "github.com/influxdata/telegraf/internal/models" "github.com/influxdata/influxdb/client/v2" ) @@ -29,7 +29,7 @@ type Accumulator interface { } func NewAccumulator( - inputConfig *config.InputConfig, + inputConfig *models.InputConfig, points chan *client.Point, ) Accumulator { acc := accumulator{} @@ -47,7 +47,7 @@ type accumulator struct { debug bool - inputConfig *config.InputConfig + inputConfig *models.InputConfig prefix string } diff --git a/agent.go b/agent.go index 5425fba33..d0f82145e 100644 --- a/agent.go +++ b/agent.go @@ -11,6 +11,7 @@ import ( "time" "github.com/influxdata/telegraf/internal/config" + "github.com/influxdata/telegraf/internal/models" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/outputs" @@ -101,7 +102,7 @@ func (a *Agent) gatherParallel(pointChan chan *client.Point) error { wg.Add(1) counter++ - go func(input *config.RunningInput) { + go func(input *models.RunningInput) { defer 
wg.Done() acc := NewAccumulator(input.Config, pointChan) @@ -144,7 +145,7 @@ func (a *Agent) gatherParallel(pointChan chan *client.Point) error { // reporting interval. func (a *Agent) gatherSeparate( shutdown chan struct{}, - input *config.RunningInput, + input *models.RunningInput, pointChan chan *client.Point, ) error { ticker := time.NewTicker(input.Config.Interval) @@ -202,7 +203,6 @@ func (a *Agent) Test() error { for _, input := range a.Config.Inputs { acc := NewAccumulator(input.Config, pointChan) acc.SetDebug(true) - // acc.SetPrefix(input.Name + "_") fmt.Printf("* Plugin: %s, Collection 1\n", input.Name) if input.Config.Interval != 0 { @@ -228,93 +228,45 @@ func (a *Agent) Test() error { return nil } -// writeOutput writes a list of points to a single output, with retries. -// Optionally takes a `done` channel to indicate that it is done writing. -func (a *Agent) writeOutput( - points []*client.Point, - ro *config.RunningOutput, - shutdown chan struct{}, - wg *sync.WaitGroup, -) { - defer wg.Done() - if len(points) == 0 { - return - } - retry := 0 - retries := a.Config.Agent.FlushRetries - start := time.Now() - - for { - filtered := ro.FilterPoints(points) - err := ro.Output.Write(filtered) - if err == nil { - // Write successful - elapsed := time.Since(start) - if !a.Config.Agent.Quiet { - log.Printf("Flushed %d metrics to output %s in %s\n", - len(filtered), ro.Name, elapsed) - } - return - } - - select { - case <-shutdown: - return - default: - if retry >= retries { - // No more retries - msg := "FATAL: Write to output [%s] failed %d times, dropping" + - " %d metrics\n" - log.Printf(msg, ro.Name, retries+1, len(points)) - return - } else if err != nil { - // Sleep for a retry - log.Printf("Error in output [%s]: %s, retrying in %s", - ro.Name, err.Error(), a.Config.Agent.FlushInterval.Duration) - time.Sleep(a.Config.Agent.FlushInterval.Duration) - } - } - - retry++ - } -} - // flush writes a list of points to all configured outputs -func (a *Agent) 
flush( - points []*client.Point, - shutdown chan struct{}, - wait bool, -) { +func (a *Agent) flush() { var wg sync.WaitGroup + + wg.Add(len(a.Config.Outputs)) for _, o := range a.Config.Outputs { - wg.Add(1) - go a.writeOutput(points, o, shutdown, &wg) - } - if wait { - wg.Wait() + go func(output *models.RunningOutput) { + defer wg.Done() + err := output.Write() + if err != nil { + log.Printf("Error writing to output [%s]: %s\n", + output.Name, err.Error()) + } + }(o) } + + wg.Wait() } // flusher monitors the points input channel and flushes on the minimum interval func (a *Agent) flusher(shutdown chan struct{}, pointChan chan *client.Point) error { // Inelegant, but this sleep is to allow the Gather threads to run, so that // the flusher will flush after metrics are collected. - time.Sleep(time.Millisecond * 100) + time.Sleep(time.Millisecond * 200) ticker := time.NewTicker(a.Config.Agent.FlushInterval.Duration) - points := make([]*client.Point, 0) for { select { case <-shutdown: log.Println("Hang on, flushing any cached points before shutdown") - a.flush(points, shutdown, true) + a.flush() return nil case <-ticker.C: - a.flush(points, shutdown, false) - points = make([]*client.Point, 0) + a.flush() case pt := <-pointChan: - points = append(points, pt) + for _, o := range a.Config.Outputs { + o.AddPoint(pt) + } } } } @@ -389,7 +341,7 @@ func (a *Agent) Run(shutdown chan struct{}) error { // configured. 
Default intervals are handled below with gatherParallel if input.Config.Interval != 0 { wg.Add(1) - go func(input *config.RunningInput) { + go func(input *models.RunningInput) { defer wg.Done() if err := a.gatherSeparate(shutdown, input, pointChan); err != nil { log.Printf(err.Error()) diff --git a/internal/config/config.go b/internal/config/config.go index 3b5e4ff17..005290fb6 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -11,13 +11,12 @@ import ( "time" "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/internal/models" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/outputs" "github.com/naoina/toml" "github.com/naoina/toml/ast" - - "github.com/influxdata/influxdb/client/v2" ) // Config specifies the URL/user/password for the database that telegraf @@ -29,8 +28,8 @@ type Config struct { OutputFilters []string Agent *AgentConfig - Inputs []*RunningInput - Outputs []*RunningOutput + Inputs []*models.RunningInput + Outputs []*models.RunningOutput } func NewConfig() *Config { @@ -40,13 +39,12 @@ func NewConfig() *Config { Interval: internal.Duration{Duration: 10 * time.Second}, RoundInterval: true, FlushInterval: internal.Duration{Duration: 10 * time.Second}, - FlushRetries: 2, FlushJitter: internal.Duration{Duration: 5 * time.Second}, }, Tags: make(map[string]string), - Inputs: make([]*RunningInput, 0), - Outputs: make([]*RunningOutput, 0), + Inputs: make([]*models.RunningInput, 0), + Outputs: make([]*models.RunningOutput, 0), InputFilters: make([]string, 0), OutputFilters: make([]string, 0), } @@ -70,15 +68,17 @@ type AgentConfig struct { // Interval at which to flush data FlushInterval internal.Duration - // FlushRetries is the number of times to retry each data flush - FlushRetries int - // FlushJitter Jitters the flush interval by a random amount. // This is primarily to avoid large write spikes for users running a large // number of telegraf instances. 
// ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s FlushJitter internal.Duration + // MetricBufferLimit is the max number of metrics that each output plugin + // will cache. The buffer is cleared when a successful write occurs. When + // full, the oldest metrics will be overwritten. + MetricBufferLimit int + // TODO(cam): Remove UTC and Precision parameters, they are no longer // valid for the agent config. Leaving them here for now for backwards- // compatability @@ -93,129 +93,6 @@ type AgentConfig struct { Hostname string } -// TagFilter is the name of a tag, and the values on which to filter -type TagFilter struct { - Name string - Filter []string -} - -type RunningOutput struct { - Name string - Output outputs.Output - Config *OutputConfig -} - -type RunningInput struct { - Name string - Input inputs.Input - Config *InputConfig -} - -// Filter containing drop/pass and tagdrop/tagpass rules -type Filter struct { - Drop []string - Pass []string - - TagDrop []TagFilter - TagPass []TagFilter - - IsActive bool -} - -// InputConfig containing a name, interval, and filter -type InputConfig struct { - Name string - NameOverride string - MeasurementPrefix string - MeasurementSuffix string - Tags map[string]string - Filter Filter - Interval time.Duration -} - -// OutputConfig containing name and filter -type OutputConfig struct { - Name string - Filter Filter -} - -// Filter returns filtered slice of client.Points based on whether filters -// are active for this RunningOutput. 
-func (ro *RunningOutput) FilterPoints(points []*client.Point) []*client.Point { - if !ro.Config.Filter.IsActive { - return points - } - - var filteredPoints []*client.Point - for i := range points { - if !ro.Config.Filter.ShouldPass(points[i].Name()) || !ro.Config.Filter.ShouldTagsPass(points[i].Tags()) { - continue - } - filteredPoints = append(filteredPoints, points[i]) - } - return filteredPoints -} - -// ShouldPass returns true if the metric should pass, false if should drop -// based on the drop/pass filter parameters -func (f Filter) ShouldPass(fieldkey string) bool { - if f.Pass != nil { - for _, pat := range f.Pass { - // TODO remove HasPrefix check, leaving it for now for legacy support. - // Cam, 2015-12-07 - if strings.HasPrefix(fieldkey, pat) || internal.Glob(pat, fieldkey) { - return true - } - } - return false - } - - if f.Drop != nil { - for _, pat := range f.Drop { - // TODO remove HasPrefix check, leaving it for now for legacy support. - // Cam, 2015-12-07 - if strings.HasPrefix(fieldkey, pat) || internal.Glob(pat, fieldkey) { - return false - } - } - - return true - } - return true -} - -// ShouldTagsPass returns true if the metric should pass, false if should drop -// based on the tagdrop/tagpass filter parameters -func (f Filter) ShouldTagsPass(tags map[string]string) bool { - if f.TagPass != nil { - for _, pat := range f.TagPass { - if tagval, ok := tags[pat.Name]; ok { - for _, filter := range pat.Filter { - if internal.Glob(filter, tagval) { - return true - } - } - } - } - return false - } - - if f.TagDrop != nil { - for _, pat := range f.TagDrop { - if tagval, ok := tags[pat.Name]; ok { - for _, filter := range pat.Filter { - if internal.Glob(filter, tagval) { - return false - } - } - } - } - return true - } - - return true -} - // Inputs returns a list of strings of the configured inputs. 
func (c *Config) InputNames() []string { var name []string @@ -251,24 +128,14 @@ func (c *Config) ListTags() string { var header = `# Telegraf configuration # Telegraf is entirely plugin driven. All metrics are gathered from the -# declared inputs. +# declared inputs, and sent to the declared outputs. -# Even if a plugin has no configuration, it must be declared in here -# to be active. Declaring a plugin means just specifying the name -# as a section with no variables. To deactivate a plugin, comment -# out the name and any variables. +# Plugins must be declared in here to be active. +# To deactivate a plugin, comment out the name and any variables. -# Use 'telegraf -config telegraf.toml -test' to see what metrics a config +# Use 'telegraf -config telegraf.conf -test' to see what metrics a config # file would generate. -# One rule that plugins conform to is wherever a connection string -# can be passed, the values '' and 'localhost' are treated specially. -# They indicate to the plugin to use their own builtin configuration to -# connect to the local system. - -# NOTE: The configuration has a few required parameters. They are marked -# with 'required'. Be sure to edit those to make this configuration work. - # Tags can also be specified via a normal map, but only one form at a time: [tags] # dc = "us-east-1" @@ -280,6 +147,11 @@ var header = `# Telegraf configuration # Rounds collection interval to 'interval' # ie, if interval="10s" then always collect on :00, :10, :20, etc. round_interval = true + + # Telegraf will cache metric_buffer_limit metrics for each output, and will + # flush this buffer on a successful write. + metric_buffer_limit = 10000 + # Collection jitter is used to jitter the collection by a random amount. # Each plugin will sleep for a random time within jitter before collecting. 
# This can be used to avoid many plugins querying things like sysfs at the @@ -535,11 +407,11 @@ func (c *Config) addOutput(name string, table *ast.Table) error { return err } - ro := &RunningOutput{ - Name: name, - Output: output, - Config: outputConfig, + ro := models.NewRunningOutput(name, output, outputConfig) + if c.Agent.MetricBufferLimit > 0 { + ro.PointBufferLimit = c.Agent.MetricBufferLimit } + ro.Quiet = c.Agent.Quiet c.Outputs = append(c.Outputs, ro) return nil } @@ -568,7 +440,7 @@ func (c *Config) addInput(name string, table *ast.Table) error { return err } - rp := &RunningInput{ + rp := &models.RunningInput{ Name: name, Input: input, Config: pluginConfig, @@ -578,10 +450,10 @@ func (c *Config) addInput(name string, table *ast.Table) error { } // buildFilter builds a Filter (tagpass/tagdrop/pass/drop) to -// be inserted into the OutputConfig/InputConfig to be used for prefix +// be inserted into the models.OutputConfig/models.InputConfig to be used for prefix // filtering on tags and measurements -func buildFilter(tbl *ast.Table) Filter { - f := Filter{} +func buildFilter(tbl *ast.Table) models.Filter { + f := models.Filter{} if node, ok := tbl.Fields["pass"]; ok { if kv, ok := node.(*ast.KeyValue); ok { @@ -613,7 +485,7 @@ func buildFilter(tbl *ast.Table) Filter { if subtbl, ok := node.(*ast.Table); ok { for name, val := range subtbl.Fields { if kv, ok := val.(*ast.KeyValue); ok { - tagfilter := &TagFilter{Name: name} + tagfilter := &models.TagFilter{Name: name} if ary, ok := kv.Value.(*ast.Array); ok { for _, elem := range ary.Value { if str, ok := elem.(*ast.String); ok { @@ -632,7 +504,7 @@ func buildFilter(tbl *ast.Table) Filter { if subtbl, ok := node.(*ast.Table); ok { for name, val := range subtbl.Fields { if kv, ok := val.(*ast.KeyValue); ok { - tagfilter := &TagFilter{Name: name} + tagfilter := &models.TagFilter{Name: name} if ary, ok := kv.Value.(*ast.Array); ok { for _, elem := range ary.Value { if str, ok := elem.(*ast.String); ok { @@ 
-656,9 +528,9 @@ func buildFilter(tbl *ast.Table) Filter { // buildInput parses input specific items from the ast.Table, // builds the filter and returns a -// InputConfig to be inserted into RunningInput -func buildInput(name string, tbl *ast.Table) (*InputConfig, error) { - cp := &InputConfig{Name: name} +// models.InputConfig to be inserted into models.RunningInput +func buildInput(name string, tbl *ast.Table) (*models.InputConfig, error) { + cp := &models.InputConfig{Name: name} if node, ok := tbl.Fields["interval"]; ok { if kv, ok := node.(*ast.KeyValue); ok { if str, ok := kv.Value.(*ast.String); ok { @@ -715,10 +587,10 @@ func buildInput(name string, tbl *ast.Table) (*InputConfig, error) { } // buildOutput parses output specific items from the ast.Table, builds the filter and returns an -// OutputConfig to be inserted into RunningInput +// models.OutputConfig to be inserted into models.RunningOutput // Note: error exists in the return for future calls that might require error -func buildOutput(name string, tbl *ast.Table) (*OutputConfig, error) { - oc := &OutputConfig{ +func buildOutput(name string, tbl *ast.Table) (*models.OutputConfig, error) { + oc := &models.OutputConfig{ Name: name, Filter: buildFilter(tbl), } diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 40af30c1e..92f45ad0a 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -4,6 +4,7 @@ import ( "testing" "time" + "github.com/influxdata/telegraf/internal/models" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs/exec" "github.com/influxdata/telegraf/plugins/inputs/memcached" @@ -18,19 +19,19 @@ func TestConfig_LoadSingleInput(t *testing.T) { memcached := inputs.Inputs["memcached"]().(*memcached.Memcached) memcached.Servers = []string{"localhost"} - mConfig := &InputConfig{ + mConfig := &models.InputConfig{ Name: "memcached", - Filter: Filter{ + Filter: models.Filter{ Drop: []string{"other", 
"stuff"}, Pass: []string{"some", "strings"}, - TagDrop: []TagFilter{ - TagFilter{ + TagDrop: []models.TagFilter{ + models.TagFilter{ Name: "badtag", Filter: []string{"othertag"}, }, }, - TagPass: []TagFilter{ - TagFilter{ + TagPass: []models.TagFilter{ + models.TagFilter{ Name: "goodtag", Filter: []string{"mytag"}, }, @@ -61,19 +62,19 @@ func TestConfig_LoadDirectory(t *testing.T) { memcached := inputs.Inputs["memcached"]().(*memcached.Memcached) memcached.Servers = []string{"localhost"} - mConfig := &InputConfig{ + mConfig := &models.InputConfig{ Name: "memcached", - Filter: Filter{ + Filter: models.Filter{ Drop: []string{"other", "stuff"}, Pass: []string{"some", "strings"}, - TagDrop: []TagFilter{ - TagFilter{ + TagDrop: []models.TagFilter{ + models.TagFilter{ Name: "badtag", Filter: []string{"othertag"}, }, }, - TagPass: []TagFilter{ - TagFilter{ + TagPass: []models.TagFilter{ + models.TagFilter{ Name: "goodtag", Filter: []string{"mytag"}, }, @@ -91,7 +92,7 @@ func TestConfig_LoadDirectory(t *testing.T) { ex := inputs.Inputs["exec"]().(*exec.Exec) ex.Command = "/usr/bin/myothercollector --foo=bar" - eConfig := &InputConfig{ + eConfig := &models.InputConfig{ Name: "exec", MeasurementSuffix: "_myothercollector", } @@ -110,7 +111,7 @@ func TestConfig_LoadDirectory(t *testing.T) { pstat := inputs.Inputs["procstat"]().(*procstat.Procstat) pstat.PidFile = "/var/run/grafana-server.pid" - pConfig := &InputConfig{Name: "procstat"} + pConfig := &models.InputConfig{Name: "procstat"} pConfig.Tags = make(map[string]string) assert.Equal(t, pstat, c.Inputs[3].Input, @@ -118,175 +119,3 @@ func TestConfig_LoadDirectory(t *testing.T) { assert.Equal(t, pConfig, c.Inputs[3].Config, "Merged Testdata did not produce correct procstat metadata.") } - -func TestFilter_Empty(t *testing.T) { - f := Filter{} - - measurements := []string{ - "foo", - "bar", - "barfoo", - "foo_bar", - "foo.bar", - "foo-bar", - "supercalifradjulisticexpialidocious", - } - - for _, measurement := range 
measurements { - if !f.ShouldPass(measurement) { - t.Errorf("Expected measurement %s to pass", measurement) - } - } -} - -func TestFilter_Pass(t *testing.T) { - f := Filter{ - Pass: []string{"foo*", "cpu_usage_idle"}, - } - - passes := []string{ - "foo", - "foo_bar", - "foo.bar", - "foo-bar", - "cpu_usage_idle", - } - - drops := []string{ - "bar", - "barfoo", - "bar_foo", - "cpu_usage_busy", - } - - for _, measurement := range passes { - if !f.ShouldPass(measurement) { - t.Errorf("Expected measurement %s to pass", measurement) - } - } - - for _, measurement := range drops { - if f.ShouldPass(measurement) { - t.Errorf("Expected measurement %s to drop", measurement) - } - } -} - -func TestFilter_Drop(t *testing.T) { - f := Filter{ - Drop: []string{"foo*", "cpu_usage_idle"}, - } - - drops := []string{ - "foo", - "foo_bar", - "foo.bar", - "foo-bar", - "cpu_usage_idle", - } - - passes := []string{ - "bar", - "barfoo", - "bar_foo", - "cpu_usage_busy", - } - - for _, measurement := range passes { - if !f.ShouldPass(measurement) { - t.Errorf("Expected measurement %s to pass", measurement) - } - } - - for _, measurement := range drops { - if f.ShouldPass(measurement) { - t.Errorf("Expected measurement %s to drop", measurement) - } - } -} - -func TestFilter_TagPass(t *testing.T) { - filters := []TagFilter{ - TagFilter{ - Name: "cpu", - Filter: []string{"cpu-*"}, - }, - TagFilter{ - Name: "mem", - Filter: []string{"mem_free"}, - }} - f := Filter{ - TagPass: filters, - } - - passes := []map[string]string{ - {"cpu": "cpu-total"}, - {"cpu": "cpu-0"}, - {"cpu": "cpu-1"}, - {"cpu": "cpu-2"}, - {"mem": "mem_free"}, - } - - drops := []map[string]string{ - {"cpu": "cputotal"}, - {"cpu": "cpu0"}, - {"cpu": "cpu1"}, - {"cpu": "cpu2"}, - {"mem": "mem_used"}, - } - - for _, tags := range passes { - if !f.ShouldTagsPass(tags) { - t.Errorf("Expected tags %v to pass", tags) - } - } - - for _, tags := range drops { - if f.ShouldTagsPass(tags) { - t.Errorf("Expected tags %v to drop", tags) - 
} - } -} - -func TestFilter_TagDrop(t *testing.T) { - filters := []TagFilter{ - TagFilter{ - Name: "cpu", - Filter: []string{"cpu-*"}, - }, - TagFilter{ - Name: "mem", - Filter: []string{"mem_free"}, - }} - f := Filter{ - TagDrop: filters, - } - - drops := []map[string]string{ - {"cpu": "cpu-total"}, - {"cpu": "cpu-0"}, - {"cpu": "cpu-1"}, - {"cpu": "cpu-2"}, - {"mem": "mem_free"}, - } - - passes := []map[string]string{ - {"cpu": "cputotal"}, - {"cpu": "cpu0"}, - {"cpu": "cpu1"}, - {"cpu": "cpu2"}, - {"mem": "mem_used"}, - } - - for _, tags := range passes { - if !f.ShouldTagsPass(tags) { - t.Errorf("Expected tags %v to pass", tags) - } - } - - for _, tags := range drops { - if f.ShouldTagsPass(tags) { - t.Errorf("Expected tags %v to drop", tags) - } - } -} diff --git a/internal/models/filter.go b/internal/models/filter.go new file mode 100644 index 000000000..3f171ccac --- /dev/null +++ b/internal/models/filter.go @@ -0,0 +1,92 @@ +package models + +import ( + "strings" + + "github.com/influxdata/influxdb/client/v2" + "github.com/influxdata/telegraf/internal" +) + +// TagFilter is the name of a tag, and the values on which to filter +type TagFilter struct { + Name string + Filter []string +} + +// Filter containing drop/pass and tagdrop/tagpass rules +type Filter struct { + Drop []string + Pass []string + + TagDrop []TagFilter + TagPass []TagFilter + + IsActive bool +} + +func (f Filter) ShouldPointPass(point *client.Point) bool { + if f.ShouldPass(point.Name()) && f.ShouldTagsPass(point.Tags()) { + return true + } + return false +} + +// ShouldPass returns true if the metric should pass, false if should drop +// based on the drop/pass filter parameters +func (f Filter) ShouldPass(key string) bool { + if f.Pass != nil { + for _, pat := range f.Pass { + // TODO remove HasPrefix check, leaving it for now for legacy support. 
+ // Cam, 2015-12-07 + if strings.HasPrefix(key, pat) || internal.Glob(pat, key) { + return true + } + } + return false + } + + if f.Drop != nil { + for _, pat := range f.Drop { + // TODO remove HasPrefix check, leaving it for now for legacy support. + // Cam, 2015-12-07 + if strings.HasPrefix(key, pat) || internal.Glob(pat, key) { + return false + } + } + + return true + } + return true +} + +// ShouldTagsPass returns true if the metric should pass, false if should drop +// based on the tagdrop/tagpass filter parameters +func (f Filter) ShouldTagsPass(tags map[string]string) bool { + if f.TagPass != nil { + for _, pat := range f.TagPass { + if tagval, ok := tags[pat.Name]; ok { + for _, filter := range pat.Filter { + if internal.Glob(filter, tagval) { + return true + } + } + } + } + return false + } + + if f.TagDrop != nil { + for _, pat := range f.TagDrop { + if tagval, ok := tags[pat.Name]; ok { + for _, filter := range pat.Filter { + if internal.Glob(filter, tagval) { + return false + } + } + } + } + return true + } + + return true +} diff --git a/internal/models/filter_test.go b/internal/models/filter_test.go new file mode 100644 index 000000000..9e962e420 --- /dev/null +++ b/internal/models/filter_test.go @@ -0,0 +1,177 @@ +package models + +import ( + "testing" +) + +func TestFilter_Empty(t *testing.T) { + f := Filter{} + + measurements := []string{ + "foo", + "bar", + "barfoo", + "foo_bar", + "foo.bar", + "foo-bar", + "supercalifradjulisticexpialidocious", + } + + for _, measurement := range measurements { + if !f.ShouldPass(measurement) { + t.Errorf("Expected measurement %s to pass", measurement) + } + } +} + +func TestFilter_Pass(t *testing.T) { + f := Filter{ + Pass: []string{"foo*", "cpu_usage_idle"}, + } + + passes := []string{ + "foo", + "foo_bar", + "foo.bar", + "foo-bar", + "cpu_usage_idle", + } + + drops := []string{ + "bar", + "barfoo", + "bar_foo", + "cpu_usage_busy", + } + + for _, measurement := range passes { + if !f.ShouldPass(measurement) { 
+ t.Errorf("Expected measurement %s to pass", measurement) + } + } + + for _, measurement := range drops { + if f.ShouldPass(measurement) { + t.Errorf("Expected measurement %s to drop", measurement) + } + } +} + +func TestFilter_Drop(t *testing.T) { + f := Filter{ + Drop: []string{"foo*", "cpu_usage_idle"}, + } + + drops := []string{ + "foo", + "foo_bar", + "foo.bar", + "foo-bar", + "cpu_usage_idle", + } + + passes := []string{ + "bar", + "barfoo", + "bar_foo", + "cpu_usage_busy", + } + + for _, measurement := range passes { + if !f.ShouldPass(measurement) { + t.Errorf("Expected measurement %s to pass", measurement) + } + } + + for _, measurement := range drops { + if f.ShouldPass(measurement) { + t.Errorf("Expected measurement %s to drop", measurement) + } + } +} + +func TestFilter_TagPass(t *testing.T) { + filters := []TagFilter{ + TagFilter{ + Name: "cpu", + Filter: []string{"cpu-*"}, + }, + TagFilter{ + Name: "mem", + Filter: []string{"mem_free"}, + }} + f := Filter{ + TagPass: filters, + } + + passes := []map[string]string{ + {"cpu": "cpu-total"}, + {"cpu": "cpu-0"}, + {"cpu": "cpu-1"}, + {"cpu": "cpu-2"}, + {"mem": "mem_free"}, + } + + drops := []map[string]string{ + {"cpu": "cputotal"}, + {"cpu": "cpu0"}, + {"cpu": "cpu1"}, + {"cpu": "cpu2"}, + {"mem": "mem_used"}, + } + + for _, tags := range passes { + if !f.ShouldTagsPass(tags) { + t.Errorf("Expected tags %v to pass", tags) + } + } + + for _, tags := range drops { + if f.ShouldTagsPass(tags) { + t.Errorf("Expected tags %v to drop", tags) + } + } +} + +func TestFilter_TagDrop(t *testing.T) { + filters := []TagFilter{ + TagFilter{ + Name: "cpu", + Filter: []string{"cpu-*"}, + }, + TagFilter{ + Name: "mem", + Filter: []string{"mem_free"}, + }} + f := Filter{ + TagDrop: filters, + } + + drops := []map[string]string{ + {"cpu": "cpu-total"}, + {"cpu": "cpu-0"}, + {"cpu": "cpu-1"}, + {"cpu": "cpu-2"}, + {"mem": "mem_free"}, + } + + passes := []map[string]string{ + {"cpu": "cputotal"}, + {"cpu": "cpu0"}, + 
{"cpu": "cpu1"}, + {"cpu": "cpu2"}, + {"mem": "mem_used"}, + } + + for _, tags := range passes { + if !f.ShouldTagsPass(tags) { + t.Errorf("Expected tags %v to pass", tags) + } + } + + for _, tags := range drops { + if f.ShouldTagsPass(tags) { + t.Errorf("Expected tags %v to drop", tags) + } + } +} diff --git a/internal/models/running_input.go b/internal/models/running_input.go new file mode 100644 index 000000000..17c0d2129 --- /dev/null +++ b/internal/models/running_input.go @@ -0,0 +1,24 @@ +package models + +import ( + "time" + + "github.com/influxdata/telegraf/plugins/inputs" +) + +type RunningInput struct { + Name string + Input inputs.Input + Config *InputConfig +} + +// InputConfig containing a name, interval, and filter +type InputConfig struct { + Name string + NameOverride string + MeasurementPrefix string + MeasurementSuffix string + Tags map[string]string + Filter Filter + Interval time.Duration +} diff --git a/internal/models/running_output.go b/internal/models/running_output.go new file mode 100644 index 000000000..196ebdc8a --- /dev/null +++ b/internal/models/running_output.go @@ -0,0 +1,77 @@ +package models + +import ( + "log" + "time" + + "github.com/influxdata/telegraf/plugins/outputs" + + "github.com/influxdata/influxdb/client/v2" +) + +const DEFAULT_POINT_BUFFER_LIMIT = 10000 + +type RunningOutput struct { + Name string + Output outputs.Output + Config *OutputConfig + Quiet bool + PointBufferLimit int + + points []*client.Point + overwriteCounter int +} + +func NewRunningOutput( + name string, + output outputs.Output, + conf *OutputConfig, +) *RunningOutput { + ro := &RunningOutput{ + Name: name, + points: make([]*client.Point, 0), + Output: output, + Config: conf, + PointBufferLimit: DEFAULT_POINT_BUFFER_LIMIT, + } + return ro +} + +func (ro *RunningOutput) AddPoint(point *client.Point) { + if ro.Config.Filter.IsActive { + if !ro.Config.Filter.ShouldPointPass(point) { + return + } + } + + if len(ro.points) < ro.PointBufferLimit { + ro.points 
= append(ro.points, point) + } else { + if ro.overwriteCounter == len(ro.points) { + ro.overwriteCounter = 0 + } + ro.points[ro.overwriteCounter] = point + ro.overwriteCounter++ + } +} + +func (ro *RunningOutput) Write() error { + start := time.Now() + err := ro.Output.Write(ro.points) + elapsed := time.Since(start) + if err == nil { + if !ro.Quiet { + log.Printf("Wrote %d metrics to output %s in %s\n", + len(ro.points), ro.Name, elapsed) + } + ro.points = make([]*client.Point, 0) + ro.overwriteCounter = 0 + } + return err +} + +// OutputConfig containing name and filter +type OutputConfig struct { + Name string + Filter Filter +} diff --git a/plugins/inputs/docker/docker.go b/plugins/inputs/docker/docker.go index 70fcaa19a..0e96dd176 100644 --- a/plugins/inputs/docker/docker.go +++ b/plugins/inputs/docker/docker.go @@ -110,15 +110,12 @@ func (d *Docker) gatherContainer( Timeout: time.Duration(time.Second * 5), } - var err error go func() { - err = d.client.Stats(statOpts) + d.client.Stats(statOpts) }() stat := <-statChan - if err != nil { - return err - } + close(done) // Add labels to tags for k, v := range container.Labels { diff --git a/plugins/inputs/influxdb/influxdb.go b/plugins/inputs/influxdb/influxdb.go index e65c8afd2..311f6ba0c 100644 --- a/plugins/inputs/influxdb/influxdb.go +++ b/plugins/inputs/influxdb/influxdb.go @@ -130,7 +130,7 @@ func (i *InfluxDB) gatherURL( p.Tags["url"] = url acc.AddFields( - p.Name, + "influxdb_"+p.Name, p.Values, p.Tags, ) diff --git a/plugins/inputs/influxdb/influxdb_test.go b/plugins/inputs/influxdb/influxdb_test.go index e7b43e7bc..ef6c1a97a 100644 --- a/plugins/inputs/influxdb/influxdb_test.go +++ b/plugins/inputs/influxdb/influxdb_test.go @@ -84,7 +84,7 @@ func TestBasic(t *testing.T) { "id": "ex1", "url": fakeServer.URL + "/endpoint", } - acc.AssertContainsTaggedFields(t, "foo", fields, tags) + acc.AssertContainsTaggedFields(t, "influxdb_foo", fields, tags) fields = map[string]interface{}{ "x": "x", @@ -93,5 +93,5 @@ 
func TestBasic(t *testing.T) { "id": "ex2", "url": fakeServer.URL + "/endpoint", } - acc.AssertContainsTaggedFields(t, "bar", fields, tags) + acc.AssertContainsTaggedFields(t, "influxdb_bar", fields, tags) } diff --git a/plugins/inputs/procstat/procstat.go b/plugins/inputs/procstat/procstat.go index aa56bd501..fd8158ec7 100644 --- a/plugins/inputs/procstat/procstat.go +++ b/plugins/inputs/procstat/procstat.go @@ -129,9 +129,13 @@ func pidsFromFile(file string) ([]int32, error) { func pidsFromExe(exe string) ([]int32, error) { var out []int32 var outerr error - pgrep, err := exec.Command("pgrep", exe).Output() + bin, err := exec.LookPath("pgrep") if err != nil { - return out, fmt.Errorf("Failed to execute pgrep. Error: '%s'", err) + return out, fmt.Errorf("Couldn't find pgrep binary: %s", err) + } + pgrep, err := exec.Command(bin, exe).Output() + if err != nil { + return out, fmt.Errorf("Failed to execute %s. Error: '%s'", bin, err) } else { pids := strings.Fields(string(pgrep)) for _, pid := range pids { @@ -149,9 +153,13 @@ func pidsFromExe(exe string) ([]int32, error) { func pidsFromPattern(pattern string) ([]int32, error) { var out []int32 var outerr error - pgrep, err := exec.Command("pgrep", "-f", pattern).Output() + bin, err := exec.LookPath("pgrep") if err != nil { - return out, fmt.Errorf("Failed to execute pgrep. Error: '%s'", err) + return out, fmt.Errorf("Couldn't find pgrep binary: %s", err) + } + pgrep, err := exec.Command(bin, "-f", pattern).Output() + if err != nil { + return out, fmt.Errorf("Failed to execute %s. Error: '%s'", bin, err) } else { pids := strings.Fields(string(pgrep)) for _, pid := range pids {