Implement a per-output fixed size metric buffer

Also moved some objects out of config.go and put them in their own
package, internal/models

fixes #568
closes #285
This commit is contained in:
Cameron Sparr 2016-01-22 11:54:12 -07:00
parent f2ab5f61f5
commit 5349a3b6d1
13 changed files with 468 additions and 435 deletions

View File

@ -2,6 +2,9 @@
### Release Notes: ### Release Notes:
- Telegraf now keeps a fixed-length buffer of metrics per-output. This buffer
defaults to 10,000 metrics, and is adjustable. The buffer is cleared when a
successful write to that output occurs.
- The docker plugin has been significantly overhauled to add more metrics - The docker plugin has been significantly overhauled to add more metrics
and allow for docker-machine (incl OSX) support. and allow for docker-machine (incl OSX) support.
[See the readme](https://github.com/influxdata/telegraf/blob/master/plugins/inputs/docker/README.md) [See the readme](https://github.com/influxdata/telegraf/blob/master/plugins/inputs/docker/README.md)
@ -26,6 +29,7 @@ specifying a docker endpoint to get metrics from.
- [#553](https://github.com/influxdata/telegraf/pull/553): Amazon CloudWatch output. thanks @skwong2! - [#553](https://github.com/influxdata/telegraf/pull/553): Amazon CloudWatch output. thanks @skwong2!
- [#503](https://github.com/influxdata/telegraf/pull/503): Support docker endpoint configuration. - [#503](https://github.com/influxdata/telegraf/pull/503): Support docker endpoint configuration.
- [#563](https://github.com/influxdata/telegraf/pull/563): Docker plugin overhaul. - [#563](https://github.com/influxdata/telegraf/pull/563): Docker plugin overhaul.
- [#285](https://github.com/influxdata/telegraf/issues/285): Fixed-size buffer of points.
### Bugfixes ### Bugfixes
- [#506](https://github.com/influxdata/telegraf/pull/506): Ping input doesn't return response time metric when timeout. Thanks @titilambert! - [#506](https://github.com/influxdata/telegraf/pull/506): Ping input doesn't return response time metric when timeout. Thanks @titilambert!
@ -34,6 +38,7 @@ specifying a docker endpoint to get metrics from.
- [#543](https://github.com/influxdata/telegraf/issues/543): Statsd Packet size sometimes truncated. - [#543](https://github.com/influxdata/telegraf/issues/543): Statsd Packet size sometimes truncated.
- [#440](https://github.com/influxdata/telegraf/issues/440): Don't query filtered devices for disk stats. - [#440](https://github.com/influxdata/telegraf/issues/440): Don't query filtered devices for disk stats.
- [#463](https://github.com/influxdata/telegraf/issues/463): Docker plugin not working on AWS Linux - [#463](https://github.com/influxdata/telegraf/issues/463): Docker plugin not working on AWS Linux
- [#568](https://github.com/influxdata/telegraf/issues/568): Multiple output race condition.
## v0.10.0 [2016-01-12] ## v0.10.0 [2016-01-12]

View File

@ -7,7 +7,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/influxdata/telegraf/internal/config" "github.com/influxdata/telegraf/internal/models"
"github.com/influxdata/influxdb/client/v2" "github.com/influxdata/influxdb/client/v2"
) )
@ -29,7 +29,7 @@ type Accumulator interface {
} }
func NewAccumulator( func NewAccumulator(
inputConfig *config.InputConfig, inputConfig *models.InputConfig,
points chan *client.Point, points chan *client.Point,
) Accumulator { ) Accumulator {
acc := accumulator{} acc := accumulator{}
@ -47,7 +47,7 @@ type accumulator struct {
debug bool debug bool
inputConfig *config.InputConfig inputConfig *models.InputConfig
prefix string prefix string
} }

View File

@ -11,6 +11,7 @@ import (
"time" "time"
"github.com/influxdata/telegraf/internal/config" "github.com/influxdata/telegraf/internal/config"
"github.com/influxdata/telegraf/internal/models"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/outputs"
@ -101,7 +102,7 @@ func (a *Agent) gatherParallel(pointChan chan *client.Point) error {
wg.Add(1) wg.Add(1)
counter++ counter++
go func(input *config.RunningInput) { go func(input *models.RunningInput) {
defer wg.Done() defer wg.Done()
acc := NewAccumulator(input.Config, pointChan) acc := NewAccumulator(input.Config, pointChan)
@ -144,7 +145,7 @@ func (a *Agent) gatherParallel(pointChan chan *client.Point) error {
// reporting interval. // reporting interval.
func (a *Agent) gatherSeparate( func (a *Agent) gatherSeparate(
shutdown chan struct{}, shutdown chan struct{},
input *config.RunningInput, input *models.RunningInput,
pointChan chan *client.Point, pointChan chan *client.Point,
) error { ) error {
ticker := time.NewTicker(input.Config.Interval) ticker := time.NewTicker(input.Config.Interval)
@ -202,7 +203,6 @@ func (a *Agent) Test() error {
for _, input := range a.Config.Inputs { for _, input := range a.Config.Inputs {
acc := NewAccumulator(input.Config, pointChan) acc := NewAccumulator(input.Config, pointChan)
acc.SetDebug(true) acc.SetDebug(true)
// acc.SetPrefix(input.Name + "_")
fmt.Printf("* Plugin: %s, Collection 1\n", input.Name) fmt.Printf("* Plugin: %s, Collection 1\n", input.Name)
if input.Config.Interval != 0 { if input.Config.Interval != 0 {
@ -228,93 +228,45 @@ func (a *Agent) Test() error {
return nil return nil
} }
// writeOutput writes a list of points to a single output, with retries.
// Optionally takes a `done` channel to indicate that it is done writing.
func (a *Agent) writeOutput(
points []*client.Point,
ro *config.RunningOutput,
shutdown chan struct{},
wg *sync.WaitGroup,
) {
defer wg.Done()
if len(points) == 0 {
return
}
retry := 0
retries := a.Config.Agent.FlushRetries
start := time.Now()
for {
filtered := ro.FilterPoints(points)
err := ro.Output.Write(filtered)
if err == nil {
// Write successful
elapsed := time.Since(start)
if !a.Config.Agent.Quiet {
log.Printf("Flushed %d metrics to output %s in %s\n",
len(filtered), ro.Name, elapsed)
}
return
}
select {
case <-shutdown:
return
default:
if retry >= retries {
// No more retries
msg := "FATAL: Write to output [%s] failed %d times, dropping" +
" %d metrics\n"
log.Printf(msg, ro.Name, retries+1, len(points))
return
} else if err != nil {
// Sleep for a retry
log.Printf("Error in output [%s]: %s, retrying in %s",
ro.Name, err.Error(), a.Config.Agent.FlushInterval.Duration)
time.Sleep(a.Config.Agent.FlushInterval.Duration)
}
}
retry++
}
}
// flush writes a list of points to all configured outputs // flush writes a list of points to all configured outputs
func (a *Agent) flush( func (a *Agent) flush() {
points []*client.Point,
shutdown chan struct{},
wait bool,
) {
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(len(a.Config.Outputs))
for _, o := range a.Config.Outputs { for _, o := range a.Config.Outputs {
wg.Add(1) go func(output *models.RunningOutput) {
go a.writeOutput(points, o, shutdown, &wg) defer wg.Done()
} err := output.Write()
if wait { if err != nil {
wg.Wait() log.Printf("Error writing to output [%s]: %s\n",
output.Name, err.Error())
}
}(o)
} }
wg.Wait()
} }
// flusher monitors the points input channel and flushes on the minimum interval // flusher monitors the points input channel and flushes on the minimum interval
func (a *Agent) flusher(shutdown chan struct{}, pointChan chan *client.Point) error { func (a *Agent) flusher(shutdown chan struct{}, pointChan chan *client.Point) error {
// Inelegant, but this sleep is to allow the Gather threads to run, so that // Inelegant, but this sleep is to allow the Gather threads to run, so that
// the flusher will flush after metrics are collected. // the flusher will flush after metrics are collected.
time.Sleep(time.Millisecond * 100) time.Sleep(time.Millisecond * 200)
ticker := time.NewTicker(a.Config.Agent.FlushInterval.Duration) ticker := time.NewTicker(a.Config.Agent.FlushInterval.Duration)
points := make([]*client.Point, 0)
for { for {
select { select {
case <-shutdown: case <-shutdown:
log.Println("Hang on, flushing any cached points before shutdown") log.Println("Hang on, flushing any cached points before shutdown")
a.flush(points, shutdown, true) a.flush()
return nil return nil
case <-ticker.C: case <-ticker.C:
a.flush(points, shutdown, false) a.flush()
points = make([]*client.Point, 0)
case pt := <-pointChan: case pt := <-pointChan:
points = append(points, pt) for _, o := range a.Config.Outputs {
o.AddPoint(pt)
}
} }
} }
} }
@ -389,7 +341,7 @@ func (a *Agent) Run(shutdown chan struct{}) error {
// configured. Default intervals are handled below with gatherParallel // configured. Default intervals are handled below with gatherParallel
if input.Config.Interval != 0 { if input.Config.Interval != 0 {
wg.Add(1) wg.Add(1)
go func(input *config.RunningInput) { go func(input *models.RunningInput) {
defer wg.Done() defer wg.Done()
if err := a.gatherSeparate(shutdown, input, pointChan); err != nil { if err := a.gatherSeparate(shutdown, input, pointChan); err != nil {
log.Printf(err.Error()) log.Printf(err.Error())

View File

@ -11,13 +11,12 @@ import (
"time" "time"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/models"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/outputs"
"github.com/naoina/toml" "github.com/naoina/toml"
"github.com/naoina/toml/ast" "github.com/naoina/toml/ast"
"github.com/influxdata/influxdb/client/v2"
) )
// Config specifies the URL/user/password for the database that telegraf // Config specifies the URL/user/password for the database that telegraf
@ -29,8 +28,8 @@ type Config struct {
OutputFilters []string OutputFilters []string
Agent *AgentConfig Agent *AgentConfig
Inputs []*RunningInput Inputs []*models.RunningInput
Outputs []*RunningOutput Outputs []*models.RunningOutput
} }
func NewConfig() *Config { func NewConfig() *Config {
@ -40,13 +39,12 @@ func NewConfig() *Config {
Interval: internal.Duration{Duration: 10 * time.Second}, Interval: internal.Duration{Duration: 10 * time.Second},
RoundInterval: true, RoundInterval: true,
FlushInterval: internal.Duration{Duration: 10 * time.Second}, FlushInterval: internal.Duration{Duration: 10 * time.Second},
FlushRetries: 2,
FlushJitter: internal.Duration{Duration: 5 * time.Second}, FlushJitter: internal.Duration{Duration: 5 * time.Second},
}, },
Tags: make(map[string]string), Tags: make(map[string]string),
Inputs: make([]*RunningInput, 0), Inputs: make([]*models.RunningInput, 0),
Outputs: make([]*RunningOutput, 0), Outputs: make([]*models.RunningOutput, 0),
InputFilters: make([]string, 0), InputFilters: make([]string, 0),
OutputFilters: make([]string, 0), OutputFilters: make([]string, 0),
} }
@ -70,15 +68,17 @@ type AgentConfig struct {
// Interval at which to flush data // Interval at which to flush data
FlushInterval internal.Duration FlushInterval internal.Duration
// FlushRetries is the number of times to retry each data flush
FlushRetries int
// FlushJitter Jitters the flush interval by a random amount. // FlushJitter Jitters the flush interval by a random amount.
// This is primarily to avoid large write spikes for users running a large // This is primarily to avoid large write spikes for users running a large
// number of telegraf instances. // number of telegraf instances.
// ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s // ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
FlushJitter internal.Duration FlushJitter internal.Duration
// MetricBufferLimit is the max number of metrics that each output plugin
// will cache. The buffer is cleared when a successful write occurs. When
// full, the oldest metrics will be overwritten.
MetricBufferLimit int
// TODO(cam): Remove UTC and Precision parameters, they are no longer // TODO(cam): Remove UTC and Precision parameters, they are no longer
// valid for the agent config. Leaving them here for now for backwards- // valid for the agent config. Leaving them here for now for backwards-
// compatability // compatability
@ -93,129 +93,6 @@ type AgentConfig struct {
Hostname string Hostname string
} }
// TagFilter is the name of a tag, and the values on which to filter
type TagFilter struct {
Name string
Filter []string
}
type RunningOutput struct {
Name string
Output outputs.Output
Config *OutputConfig
}
type RunningInput struct {
Name string
Input inputs.Input
Config *InputConfig
}
// Filter containing drop/pass and tagdrop/tagpass rules
type Filter struct {
Drop []string
Pass []string
TagDrop []TagFilter
TagPass []TagFilter
IsActive bool
}
// InputConfig containing a name, interval, and filter
type InputConfig struct {
Name string
NameOverride string
MeasurementPrefix string
MeasurementSuffix string
Tags map[string]string
Filter Filter
Interval time.Duration
}
// OutputConfig containing name and filter
type OutputConfig struct {
Name string
Filter Filter
}
// Filter returns filtered slice of client.Points based on whether filters
// are active for this RunningOutput.
func (ro *RunningOutput) FilterPoints(points []*client.Point) []*client.Point {
if !ro.Config.Filter.IsActive {
return points
}
var filteredPoints []*client.Point
for i := range points {
if !ro.Config.Filter.ShouldPass(points[i].Name()) || !ro.Config.Filter.ShouldTagsPass(points[i].Tags()) {
continue
}
filteredPoints = append(filteredPoints, points[i])
}
return filteredPoints
}
// ShouldPass returns true if the metric should pass, false if should drop
// based on the drop/pass filter parameters
func (f Filter) ShouldPass(fieldkey string) bool {
if f.Pass != nil {
for _, pat := range f.Pass {
// TODO remove HasPrefix check, leaving it for now for legacy support.
// Cam, 2015-12-07
if strings.HasPrefix(fieldkey, pat) || internal.Glob(pat, fieldkey) {
return true
}
}
return false
}
if f.Drop != nil {
for _, pat := range f.Drop {
// TODO remove HasPrefix check, leaving it for now for legacy support.
// Cam, 2015-12-07
if strings.HasPrefix(fieldkey, pat) || internal.Glob(pat, fieldkey) {
return false
}
}
return true
}
return true
}
// ShouldTagsPass returns true if the metric should pass, false if should drop
// based on the tagdrop/tagpass filter parameters
func (f Filter) ShouldTagsPass(tags map[string]string) bool {
if f.TagPass != nil {
for _, pat := range f.TagPass {
if tagval, ok := tags[pat.Name]; ok {
for _, filter := range pat.Filter {
if internal.Glob(filter, tagval) {
return true
}
}
}
}
return false
}
if f.TagDrop != nil {
for _, pat := range f.TagDrop {
if tagval, ok := tags[pat.Name]; ok {
for _, filter := range pat.Filter {
if internal.Glob(filter, tagval) {
return false
}
}
}
}
return true
}
return true
}
// Inputs returns a list of strings of the configured inputs. // Inputs returns a list of strings of the configured inputs.
func (c *Config) InputNames() []string { func (c *Config) InputNames() []string {
var name []string var name []string
@ -251,24 +128,14 @@ func (c *Config) ListTags() string {
var header = `# Telegraf configuration var header = `# Telegraf configuration
# Telegraf is entirely plugin driven. All metrics are gathered from the # Telegraf is entirely plugin driven. All metrics are gathered from the
# declared inputs. # declared inputs, and sent to the declared outputs.
# Even if a plugin has no configuration, it must be declared in here # Plugins must be declared in here to be active.
# to be active. Declaring a plugin means just specifying the name # To deactivate a plugin, comment out the name and any variables.
# as a section with no variables. To deactivate a plugin, comment
# out the name and any variables.
# Use 'telegraf -config telegraf.toml -test' to see what metrics a config # Use 'telegraf -config telegraf.conf -test' to see what metrics a config
# file would generate. # file would generate.
# One rule that plugins conform to is wherever a connection string
# can be passed, the values '' and 'localhost' are treated specially.
# They indicate to the plugin to use their own builtin configuration to
# connect to the local system.
# NOTE: The configuration has a few required parameters. They are marked
# with 'required'. Be sure to edit those to make this configuration work.
# Tags can also be specified via a normal map, but only one form at a time: # Tags can also be specified via a normal map, but only one form at a time:
[tags] [tags]
# dc = "us-east-1" # dc = "us-east-1"
@ -280,6 +147,11 @@ var header = `# Telegraf configuration
# Rounds collection interval to 'interval' # Rounds collection interval to 'interval'
# ie, if interval="10s" then always collect on :00, :10, :20, etc. # ie, if interval="10s" then always collect on :00, :10, :20, etc.
round_interval = true round_interval = true
# Telegraf will cache metric_buffer_limit metrics for each output, and will
# flush this buffer on a successful write.
metric_buffer_limit = 10000
# Collection jitter is used to jitter the collection by a random amount. # Collection jitter is used to jitter the collection by a random amount.
# Each plugin will sleep for a random time within jitter before collecting. # Each plugin will sleep for a random time within jitter before collecting.
# This can be used to avoid many plugins querying things like sysfs at the # This can be used to avoid many plugins querying things like sysfs at the
@ -535,11 +407,11 @@ func (c *Config) addOutput(name string, table *ast.Table) error {
return err return err
} }
ro := &RunningOutput{ ro := models.NewRunningOutput(name, output, outputConfig)
Name: name, if c.Agent.MetricBufferLimit > 0 {
Output: output, ro.PointBufferLimit = c.Agent.MetricBufferLimit
Config: outputConfig,
} }
ro.Quiet = c.Agent.Quiet
c.Outputs = append(c.Outputs, ro) c.Outputs = append(c.Outputs, ro)
return nil return nil
} }
@ -568,7 +440,7 @@ func (c *Config) addInput(name string, table *ast.Table) error {
return err return err
} }
rp := &RunningInput{ rp := &models.RunningInput{
Name: name, Name: name,
Input: input, Input: input,
Config: pluginConfig, Config: pluginConfig,
@ -578,10 +450,10 @@ func (c *Config) addInput(name string, table *ast.Table) error {
} }
// buildFilter builds a Filter (tagpass/tagdrop/pass/drop) to // buildFilter builds a Filter (tagpass/tagdrop/pass/drop) to
// be inserted into the OutputConfig/InputConfig to be used for prefix // be inserted into the models.OutputConfig/models.InputConfig to be used for prefix
// filtering on tags and measurements // filtering on tags and measurements
func buildFilter(tbl *ast.Table) Filter { func buildFilter(tbl *ast.Table) models.Filter {
f := Filter{} f := models.Filter{}
if node, ok := tbl.Fields["pass"]; ok { if node, ok := tbl.Fields["pass"]; ok {
if kv, ok := node.(*ast.KeyValue); ok { if kv, ok := node.(*ast.KeyValue); ok {
@ -613,7 +485,7 @@ func buildFilter(tbl *ast.Table) Filter {
if subtbl, ok := node.(*ast.Table); ok { if subtbl, ok := node.(*ast.Table); ok {
for name, val := range subtbl.Fields { for name, val := range subtbl.Fields {
if kv, ok := val.(*ast.KeyValue); ok { if kv, ok := val.(*ast.KeyValue); ok {
tagfilter := &TagFilter{Name: name} tagfilter := &models.TagFilter{Name: name}
if ary, ok := kv.Value.(*ast.Array); ok { if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value { for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok { if str, ok := elem.(*ast.String); ok {
@ -632,7 +504,7 @@ func buildFilter(tbl *ast.Table) Filter {
if subtbl, ok := node.(*ast.Table); ok { if subtbl, ok := node.(*ast.Table); ok {
for name, val := range subtbl.Fields { for name, val := range subtbl.Fields {
if kv, ok := val.(*ast.KeyValue); ok { if kv, ok := val.(*ast.KeyValue); ok {
tagfilter := &TagFilter{Name: name} tagfilter := &models.TagFilter{Name: name}
if ary, ok := kv.Value.(*ast.Array); ok { if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value { for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok { if str, ok := elem.(*ast.String); ok {
@ -656,9 +528,9 @@ func buildFilter(tbl *ast.Table) Filter {
// buildInput parses input specific items from the ast.Table, // buildInput parses input specific items from the ast.Table,
// builds the filter and returns a // builds the filter and returns a
// InputConfig to be inserted into RunningInput // models.InputConfig to be inserted into models.RunningInput
func buildInput(name string, tbl *ast.Table) (*InputConfig, error) { func buildInput(name string, tbl *ast.Table) (*models.InputConfig, error) {
cp := &InputConfig{Name: name} cp := &models.InputConfig{Name: name}
if node, ok := tbl.Fields["interval"]; ok { if node, ok := tbl.Fields["interval"]; ok {
if kv, ok := node.(*ast.KeyValue); ok { if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok { if str, ok := kv.Value.(*ast.String); ok {
@ -715,10 +587,10 @@ func buildInput(name string, tbl *ast.Table) (*InputConfig, error) {
} }
// buildOutput parses output specific items from the ast.Table, builds the filter and returns an // buildOutput parses output specific items from the ast.Table, builds the filter and returns an
// OutputConfig to be inserted into RunningInput // models.OutputConfig to be inserted into models.RunningInput
// Note: error exists in the return for future calls that might require error // Note: error exists in the return for future calls that might require error
func buildOutput(name string, tbl *ast.Table) (*OutputConfig, error) { func buildOutput(name string, tbl *ast.Table) (*models.OutputConfig, error) {
oc := &OutputConfig{ oc := &models.OutputConfig{
Name: name, Name: name,
Filter: buildFilter(tbl), Filter: buildFilter(tbl),
} }

View File

@ -4,6 +4,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/influxdata/telegraf/internal/models"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/inputs/exec" "github.com/influxdata/telegraf/plugins/inputs/exec"
"github.com/influxdata/telegraf/plugins/inputs/memcached" "github.com/influxdata/telegraf/plugins/inputs/memcached"
@ -18,19 +19,19 @@ func TestConfig_LoadSingleInput(t *testing.T) {
memcached := inputs.Inputs["memcached"]().(*memcached.Memcached) memcached := inputs.Inputs["memcached"]().(*memcached.Memcached)
memcached.Servers = []string{"localhost"} memcached.Servers = []string{"localhost"}
mConfig := &InputConfig{ mConfig := &models.InputConfig{
Name: "memcached", Name: "memcached",
Filter: Filter{ Filter: models.Filter{
Drop: []string{"other", "stuff"}, Drop: []string{"other", "stuff"},
Pass: []string{"some", "strings"}, Pass: []string{"some", "strings"},
TagDrop: []TagFilter{ TagDrop: []models.TagFilter{
TagFilter{ models.TagFilter{
Name: "badtag", Name: "badtag",
Filter: []string{"othertag"}, Filter: []string{"othertag"},
}, },
}, },
TagPass: []TagFilter{ TagPass: []models.TagFilter{
TagFilter{ models.TagFilter{
Name: "goodtag", Name: "goodtag",
Filter: []string{"mytag"}, Filter: []string{"mytag"},
}, },
@ -61,19 +62,19 @@ func TestConfig_LoadDirectory(t *testing.T) {
memcached := inputs.Inputs["memcached"]().(*memcached.Memcached) memcached := inputs.Inputs["memcached"]().(*memcached.Memcached)
memcached.Servers = []string{"localhost"} memcached.Servers = []string{"localhost"}
mConfig := &InputConfig{ mConfig := &models.InputConfig{
Name: "memcached", Name: "memcached",
Filter: Filter{ Filter: models.Filter{
Drop: []string{"other", "stuff"}, Drop: []string{"other", "stuff"},
Pass: []string{"some", "strings"}, Pass: []string{"some", "strings"},
TagDrop: []TagFilter{ TagDrop: []models.TagFilter{
TagFilter{ models.TagFilter{
Name: "badtag", Name: "badtag",
Filter: []string{"othertag"}, Filter: []string{"othertag"},
}, },
}, },
TagPass: []TagFilter{ TagPass: []models.TagFilter{
TagFilter{ models.TagFilter{
Name: "goodtag", Name: "goodtag",
Filter: []string{"mytag"}, Filter: []string{"mytag"},
}, },
@ -91,7 +92,7 @@ func TestConfig_LoadDirectory(t *testing.T) {
ex := inputs.Inputs["exec"]().(*exec.Exec) ex := inputs.Inputs["exec"]().(*exec.Exec)
ex.Command = "/usr/bin/myothercollector --foo=bar" ex.Command = "/usr/bin/myothercollector --foo=bar"
eConfig := &InputConfig{ eConfig := &models.InputConfig{
Name: "exec", Name: "exec",
MeasurementSuffix: "_myothercollector", MeasurementSuffix: "_myothercollector",
} }
@ -110,7 +111,7 @@ func TestConfig_LoadDirectory(t *testing.T) {
pstat := inputs.Inputs["procstat"]().(*procstat.Procstat) pstat := inputs.Inputs["procstat"]().(*procstat.Procstat)
pstat.PidFile = "/var/run/grafana-server.pid" pstat.PidFile = "/var/run/grafana-server.pid"
pConfig := &InputConfig{Name: "procstat"} pConfig := &models.InputConfig{Name: "procstat"}
pConfig.Tags = make(map[string]string) pConfig.Tags = make(map[string]string)
assert.Equal(t, pstat, c.Inputs[3].Input, assert.Equal(t, pstat, c.Inputs[3].Input,
@ -118,175 +119,3 @@ func TestConfig_LoadDirectory(t *testing.T) {
assert.Equal(t, pConfig, c.Inputs[3].Config, assert.Equal(t, pConfig, c.Inputs[3].Config,
"Merged Testdata did not produce correct procstat metadata.") "Merged Testdata did not produce correct procstat metadata.")
} }
func TestFilter_Empty(t *testing.T) {
f := Filter{}
measurements := []string{
"foo",
"bar",
"barfoo",
"foo_bar",
"foo.bar",
"foo-bar",
"supercalifradjulisticexpialidocious",
}
for _, measurement := range measurements {
if !f.ShouldPass(measurement) {
t.Errorf("Expected measurement %s to pass", measurement)
}
}
}
func TestFilter_Pass(t *testing.T) {
f := Filter{
Pass: []string{"foo*", "cpu_usage_idle"},
}
passes := []string{
"foo",
"foo_bar",
"foo.bar",
"foo-bar",
"cpu_usage_idle",
}
drops := []string{
"bar",
"barfoo",
"bar_foo",
"cpu_usage_busy",
}
for _, measurement := range passes {
if !f.ShouldPass(measurement) {
t.Errorf("Expected measurement %s to pass", measurement)
}
}
for _, measurement := range drops {
if f.ShouldPass(measurement) {
t.Errorf("Expected measurement %s to drop", measurement)
}
}
}
func TestFilter_Drop(t *testing.T) {
f := Filter{
Drop: []string{"foo*", "cpu_usage_idle"},
}
drops := []string{
"foo",
"foo_bar",
"foo.bar",
"foo-bar",
"cpu_usage_idle",
}
passes := []string{
"bar",
"barfoo",
"bar_foo",
"cpu_usage_busy",
}
for _, measurement := range passes {
if !f.ShouldPass(measurement) {
t.Errorf("Expected measurement %s to pass", measurement)
}
}
for _, measurement := range drops {
if f.ShouldPass(measurement) {
t.Errorf("Expected measurement %s to drop", measurement)
}
}
}
func TestFilter_TagPass(t *testing.T) {
filters := []TagFilter{
TagFilter{
Name: "cpu",
Filter: []string{"cpu-*"},
},
TagFilter{
Name: "mem",
Filter: []string{"mem_free"},
}}
f := Filter{
TagPass: filters,
}
passes := []map[string]string{
{"cpu": "cpu-total"},
{"cpu": "cpu-0"},
{"cpu": "cpu-1"},
{"cpu": "cpu-2"},
{"mem": "mem_free"},
}
drops := []map[string]string{
{"cpu": "cputotal"},
{"cpu": "cpu0"},
{"cpu": "cpu1"},
{"cpu": "cpu2"},
{"mem": "mem_used"},
}
for _, tags := range passes {
if !f.ShouldTagsPass(tags) {
t.Errorf("Expected tags %v to pass", tags)
}
}
for _, tags := range drops {
if f.ShouldTagsPass(tags) {
t.Errorf("Expected tags %v to drop", tags)
}
}
}
func TestFilter_TagDrop(t *testing.T) {
filters := []TagFilter{
TagFilter{
Name: "cpu",
Filter: []string{"cpu-*"},
},
TagFilter{
Name: "mem",
Filter: []string{"mem_free"},
}}
f := Filter{
TagDrop: filters,
}
drops := []map[string]string{
{"cpu": "cpu-total"},
{"cpu": "cpu-0"},
{"cpu": "cpu-1"},
{"cpu": "cpu-2"},
{"mem": "mem_free"},
}
passes := []map[string]string{
{"cpu": "cputotal"},
{"cpu": "cpu0"},
{"cpu": "cpu1"},
{"cpu": "cpu2"},
{"mem": "mem_used"},
}
for _, tags := range passes {
if !f.ShouldTagsPass(tags) {
t.Errorf("Expected tags %v to pass", tags)
}
}
for _, tags := range drops {
if f.ShouldTagsPass(tags) {
t.Errorf("Expected tags %v to drop", tags)
}
}
}

92
internal/models/filter.go Normal file
View File

@ -0,0 +1,92 @@
package models
import (
"strings"
"github.com/influxdata/influxdb/client/v2"
"github.com/influxdata/telegraf/internal"
)
// TagFilter is the name of a tag, and the values on which to filter
type TagFilter struct {
Name string
Filter []string
}
// Filter containing drop/pass and tagdrop/tagpass rules
type Filter struct {
Drop []string
Pass []string
TagDrop []TagFilter
TagPass []TagFilter
IsActive bool
}
func (f Filter) ShouldPointPass(point *client.Point) bool {
if f.ShouldPass(point.Name()) && f.ShouldTagsPass(point.Tags()) {
return true
}
return false
}
// ShouldPass returns true if the metric should pass, false if should drop
// based on the drop/pass filter parameters
func (f Filter) ShouldPass(key string) bool {
if f.Pass != nil {
for _, pat := range f.Pass {
// TODO remove HasPrefix check, leaving it for now for legacy support.
// Cam, 2015-12-07
if strings.HasPrefix(key, pat) || internal.Glob(pat, key) {
return true
}
}
return false
}
if f.Drop != nil {
for _, pat := range f.Drop {
// TODO remove HasPrefix check, leaving it for now for legacy support.
// Cam, 2015-12-07
if strings.HasPrefix(key, pat) || internal.Glob(pat, key) {
return false
}
}
return true
}
return true
}
// ShouldTagsPass returns true if the metric should pass, false if should drop
// based on the tagdrop/tagpass filter parameters
func (f Filter) ShouldTagsPass(tags map[string]string) bool {
if f.TagPass != nil {
for _, pat := range f.TagPass {
if tagval, ok := tags[pat.Name]; ok {
for _, filter := range pat.Filter {
if internal.Glob(filter, tagval) {
return true
}
}
}
}
return false
}
if f.TagDrop != nil {
for _, pat := range f.TagDrop {
if tagval, ok := tags[pat.Name]; ok {
for _, filter := range pat.Filter {
if internal.Glob(filter, tagval) {
return false
}
}
}
}
return true
}
return true
}

View File

@ -0,0 +1,177 @@
package models
import (
"testing"
)
func TestFilter_Empty(t *testing.T) {
f := Filter{}
measurements := []string{
"foo",
"bar",
"barfoo",
"foo_bar",
"foo.bar",
"foo-bar",
"supercalifradjulisticexpialidocious",
}
for _, measurement := range measurements {
if !f.ShouldPass(measurement) {
t.Errorf("Expected measurement %s to pass", measurement)
}
}
}
func TestFilter_Pass(t *testing.T) {
f := Filter{
Pass: []string{"foo*", "cpu_usage_idle"},
}
passes := []string{
"foo",
"foo_bar",
"foo.bar",
"foo-bar",
"cpu_usage_idle",
}
drops := []string{
"bar",
"barfoo",
"bar_foo",
"cpu_usage_busy",
}
for _, measurement := range passes {
if !f.ShouldPass(measurement) {
t.Errorf("Expected measurement %s to pass", measurement)
}
}
for _, measurement := range drops {
if f.ShouldPass(measurement) {
t.Errorf("Expected measurement %s to drop", measurement)
}
}
}
func TestFilter_Drop(t *testing.T) {
f := Filter{
Drop: []string{"foo*", "cpu_usage_idle"},
}
drops := []string{
"foo",
"foo_bar",
"foo.bar",
"foo-bar",
"cpu_usage_idle",
}
passes := []string{
"bar",
"barfoo",
"bar_foo",
"cpu_usage_busy",
}
for _, measurement := range passes {
if !f.ShouldPass(measurement) {
t.Errorf("Expected measurement %s to pass", measurement)
}
}
for _, measurement := range drops {
if f.ShouldPass(measurement) {
t.Errorf("Expected measurement %s to drop", measurement)
}
}
}
func TestFilter_TagPass(t *testing.T) {
filters := []TagFilter{
TagFilter{
Name: "cpu",
Filter: []string{"cpu-*"},
},
TagFilter{
Name: "mem",
Filter: []string{"mem_free"},
}}
f := Filter{
TagPass: filters,
}
passes := []map[string]string{
{"cpu": "cpu-total"},
{"cpu": "cpu-0"},
{"cpu": "cpu-1"},
{"cpu": "cpu-2"},
{"mem": "mem_free"},
}
drops := []map[string]string{
{"cpu": "cputotal"},
{"cpu": "cpu0"},
{"cpu": "cpu1"},
{"cpu": "cpu2"},
{"mem": "mem_used"},
}
for _, tags := range passes {
if !f.ShouldTagsPass(tags) {
t.Errorf("Expected tags %v to pass", tags)
}
}
for _, tags := range drops {
if f.ShouldTagsPass(tags) {
t.Errorf("Expected tags %v to drop", tags)
}
}
}
func TestFilter_TagDrop(t *testing.T) {
filters := []TagFilter{
TagFilter{
Name: "cpu",
Filter: []string{"cpu-*"},
},
TagFilter{
Name: "mem",
Filter: []string{"mem_free"},
}}
f := Filter{
TagDrop: filters,
}
drops := []map[string]string{
{"cpu": "cpu-total"},
{"cpu": "cpu-0"},
{"cpu": "cpu-1"},
{"cpu": "cpu-2"},
{"mem": "mem_free"},
}
passes := []map[string]string{
{"cpu": "cputotal"},
{"cpu": "cpu0"},
{"cpu": "cpu1"},
{"cpu": "cpu2"},
{"mem": "mem_used"},
}
for _, tags := range passes {
if !f.ShouldTagsPass(tags) {
t.Errorf("Expected tags %v to pass", tags)
}
}
for _, tags := range drops {
if f.ShouldTagsPass(tags) {
t.Errorf("Expected tags %v to drop", tags)
}
}
}

View File

@ -0,0 +1,24 @@
package models
import (
"time"
"github.com/influxdata/telegraf/plugins/inputs"
)
type RunningInput struct {
Name string
Input inputs.Input
Config *InputConfig
}
// InputConfig containing a name, interval, and filter
type InputConfig struct {
Name string
NameOverride string
MeasurementPrefix string
MeasurementSuffix string
Tags map[string]string
Filter Filter
Interval time.Duration
}

View File

@ -0,0 +1,77 @@
package models
import (
"log"
"time"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/influxdb/client/v2"
)
const DEFAULT_POINT_BUFFER_LIMIT = 10000
type RunningOutput struct {
Name string
Output outputs.Output
Config *OutputConfig
Quiet bool
PointBufferLimit int
points []*client.Point
overwriteCounter int
}
func NewRunningOutput(
name string,
output outputs.Output,
conf *OutputConfig,
) *RunningOutput {
ro := &RunningOutput{
Name: name,
points: make([]*client.Point, 0),
Output: output,
Config: conf,
PointBufferLimit: DEFAULT_POINT_BUFFER_LIMIT,
}
return ro
}
func (ro *RunningOutput) AddPoint(point *client.Point) {
if ro.Config.Filter.IsActive {
if !ro.Config.Filter.ShouldPointPass(point) {
return
}
}
if len(ro.points) < ro.PointBufferLimit {
ro.points = append(ro.points, point)
} else {
if ro.overwriteCounter == len(ro.points) {
ro.overwriteCounter = 0
}
ro.points[ro.overwriteCounter] = point
ro.overwriteCounter++
}
}
func (ro *RunningOutput) Write() error {
start := time.Now()
err := ro.Output.Write(ro.points)
elapsed := time.Since(start)
if err == nil {
if !ro.Quiet {
log.Printf("Wrote %d metrics to output %s in %s\n",
len(ro.points), ro.Name, elapsed)
}
ro.points = make([]*client.Point, 0)
ro.overwriteCounter = 0
}
return err
}
// OutputConfig containing name and filter
type OutputConfig struct {
Name string
Filter Filter
}

View File

@ -110,15 +110,12 @@ func (d *Docker) gatherContainer(
Timeout: time.Duration(time.Second * 5), Timeout: time.Duration(time.Second * 5),
} }
var err error
go func() { go func() {
err = d.client.Stats(statOpts) d.client.Stats(statOpts)
}() }()
stat := <-statChan stat := <-statChan
if err != nil { close(done)
return err
}
// Add labels to tags // Add labels to tags
for k, v := range container.Labels { for k, v := range container.Labels {

View File

@ -130,7 +130,7 @@ func (i *InfluxDB) gatherURL(
p.Tags["url"] = url p.Tags["url"] = url
acc.AddFields( acc.AddFields(
p.Name, "influxdb_"+p.Name,
p.Values, p.Values,
p.Tags, p.Tags,
) )

View File

@ -84,7 +84,7 @@ func TestBasic(t *testing.T) {
"id": "ex1", "id": "ex1",
"url": fakeServer.URL + "/endpoint", "url": fakeServer.URL + "/endpoint",
} }
acc.AssertContainsTaggedFields(t, "foo", fields, tags) acc.AssertContainsTaggedFields(t, "influxdb_foo", fields, tags)
fields = map[string]interface{}{ fields = map[string]interface{}{
"x": "x", "x": "x",
@ -93,5 +93,5 @@ func TestBasic(t *testing.T) {
"id": "ex2", "id": "ex2",
"url": fakeServer.URL + "/endpoint", "url": fakeServer.URL + "/endpoint",
} }
acc.AssertContainsTaggedFields(t, "bar", fields, tags) acc.AssertContainsTaggedFields(t, "influxdb_bar", fields, tags)
} }

View File

@ -129,9 +129,13 @@ func pidsFromFile(file string) ([]int32, error) {
func pidsFromExe(exe string) ([]int32, error) { func pidsFromExe(exe string) ([]int32, error) {
var out []int32 var out []int32
var outerr error var outerr error
pgrep, err := exec.Command("pgrep", exe).Output() bin, err := exec.LookPath("pgrep")
if err != nil { if err != nil {
return out, fmt.Errorf("Failed to execute pgrep. Error: '%s'", err) return out, fmt.Errorf("Couldn't find pgrep binary: %s", err)
}
pgrep, err := exec.Command(bin, exe).Output()
if err != nil {
return out, fmt.Errorf("Failed to execute %s. Error: '%s'", bin, err)
} else { } else {
pids := strings.Fields(string(pgrep)) pids := strings.Fields(string(pgrep))
for _, pid := range pids { for _, pid := range pids {
@ -149,9 +153,13 @@ func pidsFromExe(exe string) ([]int32, error) {
func pidsFromPattern(pattern string) ([]int32, error) { func pidsFromPattern(pattern string) ([]int32, error) {
var out []int32 var out []int32
var outerr error var outerr error
pgrep, err := exec.Command("pgrep", "-f", pattern).Output() bin, err := exec.LookPath("pgrep")
if err != nil { if err != nil {
return out, fmt.Errorf("Failed to execute pgrep. Error: '%s'", err) return out, fmt.Errorf("Couldn't find pgrep binary: %s", err)
}
pgrep, err := exec.Command(bin, "-f", pattern).Output()
if err != nil {
return out, fmt.Errorf("Failed to execute %s. Error: '%s'", bin, err)
} else { } else {
pids := strings.Fields(string(pgrep)) pids := strings.Fields(string(pgrep))
for _, pid := range pids { for _, pid := range pids {