From 979e5f193ac4945edc4da64a1fd462c06fdbcced Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Tue, 24 Nov 2015 14:22:11 -0700 Subject: [PATCH] Overhaul config <-> agent coupling. Put config in it's own package. --- CHANGELOG.md | 16 + README.md | 43 +- accumulator.go | 12 +- agent.go | 175 +++----- agent_test.go | 87 ++-- cmd/telegraf/telegraf.go | 51 +-- config_test.go | 307 --------------- config.go => internal/config/config.go | 372 ++++++++++-------- internal/config/config_test.go | 118 ++++++ .../config/testdata}/single_plugin.toml | 6 +- .../config/testdata}/subconfig/exec.conf | 4 +- .../config/testdata}/subconfig/memcached.conf | 6 +- .../config/testdata/subconfig/procstat.conf | 5 + .../config/testdata}/telegraf-agent.toml | 54 +-- plugins/exec/exec.go | 2 +- plugins/httpjson/httpjson.go | 4 +- plugins/jolokia/jolokia.go | 8 +- plugins/postgresql/postgresql.go | 4 +- plugins/procstat/procstat.go | 2 +- plugins/rabbitmq/rabbitmq.go | 2 +- plugins/twemproxy/twemproxy.go | 2 +- testdata/influx.toml | 17 - testdata/subconfig/procstat.conf | 5 - 23 files changed, 554 insertions(+), 748 deletions(-) delete mode 100644 config_test.go rename config.go => internal/config/config.go (61%) create mode 100644 internal/config/config_test.go rename {testdata => internal/config/testdata}/single_plugin.toml (64%) rename {testdata => internal/config/testdata}/subconfig/exec.conf (82%) rename {testdata => internal/config/testdata}/subconfig/memcached.conf (65%) create mode 100644 internal/config/testdata/subconfig/procstat.conf rename {testdata => internal/config/testdata}/telegraf-agent.toml (95%) delete mode 100644 testdata/influx.toml delete mode 100644 testdata/subconfig/procstat.conf diff --git a/CHANGELOG.md b/CHANGELOG.md index da13902db..6185bca35 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,15 +7,31 @@ This only affects the kafka consumer _plugin_ (not the output). There were a number of problems with the kafka plugin that led to it only collecting data once at startup, so the kafka plugin was basically non- functional. +- Plugins can now be specified as a list, and multiple plugin instances of the +same type can be specified, like this: + +``` +[[plugins.cpu]] + percpu = false + totalcpu = true + +[[plugins.cpu]] + percpu = true + totalcpu = false + drop = ["cpu_time"] +``` + - Riemann output added ### Features - [#379](https://github.com/influxdb/telegraf/pull/379): Riemann output, thanks @allenj! - [#375](https://github.com/influxdb/telegraf/pull/375): kafka_consumer service plugin. - [#392](https://github.com/influxdb/telegraf/pull/392): Procstat plugin can now accept pgrep -f pattern, thanks @ecarreras! +- [#383](https://github.com/influxdb/telegraf/pull/383): Specify plugins as a list. ### Bugfixes - [#371](https://github.com/influxdb/telegraf/issues/371): Kafka consumer plugin not functioning. +- [#389](https://github.com/influxdb/telegraf/issues/389): NaN value panic ## v0.2.2 [2015-11-18] diff --git a/README.md b/README.md index 8fb139aec..9d9ef3859 100644 --- a/README.md +++ b/README.md @@ -110,37 +110,45 @@ you can configure that here. This is a full working config that will output CPU data to an InfluxDB instance at 192.168.59.103:8086, tagging measurements with dc="denver-1". It will output -measurements at a 10s interval and will collect totalcpu & percpu data. +measurements at a 10s interval and will collect per-cpu data, dropping any +measurements which begin with `cpu_time`. ``` [tags] - dc = "denver-1" + dc = "denver-1" [agent] - interval = "10s" + interval = "10s" # OUTPUTS [outputs] [[outputs.influxdb]] - url = "http://192.168.59.103:8086" # required. - database = "telegraf" # required. - precision = "s" + url = "http://192.168.59.103:8086" # required. + database = "telegraf" # required. + precision = "s" # PLUGINS -[cpu] - percpu = true - totalcpu = true +[plugins] +[[plugins.cpu]] + percpu = true + totalcpu = false + drop = ["cpu_time"] ``` Below is how to configure `tagpass` and `tagdrop` parameters (added in 0.1.5) ``` -# Don't collect CPU data for cpu6 & cpu7 -[cpu.tagdrop] +[plugins] +[[plugins.cpu]] + percpu = true + totalcpu = false + drop = ["cpu_time"] + # Don't collect CPU data for cpu6 & cpu7 + [plugins.cpu.tagdrop] cpu = [ "cpu6", "cpu7" ] -[disk] -[disk.tagpass] +[[plugins.disk]] + [plugins.disk.tagpass] # tagpass conditions are OR, not AND. # If the (filesystem is ext4 or xfs) OR (the path is /opt or /home) # then the metric passes @@ -148,6 +156,15 @@ Below is how to configure `tagpass` and `tagdrop` parameters (added in 0.1.5) path = [ "/opt", "/home" ] ``` +Additional plugins (or outputs) of the same type can be specified, +just define another instance in the config file: + +``` +[[plugins.cpu]] + percpu = false + totalcpu = true +``` + ## Supported Plugins **You can view usage instructions for each plugin by running** diff --git a/accumulator.go b/accumulator.go index e489c1dc9..2e8b61d1c 100644 --- a/accumulator.go +++ b/accumulator.go @@ -7,6 +7,8 @@ import ( "sync" "time" + "github.com/influxdb/telegraf/internal/config" + "github.com/influxdb/influxdb/client/v2" ) @@ -27,12 +29,12 @@ type Accumulator interface { } func NewAccumulator( - plugin *ConfiguredPlugin, + pluginConfig *config.PluginConfig, points chan *client.Point, ) Accumulator { acc := accumulator{} acc.points = points - acc.plugin = plugin + acc.pluginConfig = pluginConfig return &acc } @@ -45,7 +47,7 @@ type accumulator struct { debug bool - plugin *ConfiguredPlugin + pluginConfig *config.PluginConfig prefix string } @@ -104,8 +106,8 @@ func (ac *accumulator) AddFields( measurement = ac.prefix + measurement } - if ac.plugin != nil { - if !ac.plugin.ShouldPass(measurement) || !ac.plugin.ShouldTagsPass(tags) { + if ac.pluginConfig != nil { + if !ac.pluginConfig.ShouldPass(measurement) || !ac.pluginConfig.ShouldTagsPass(tags) { return } } diff --git a/agent.go b/agent.go index 564d43dd6..c1e7bb434 100644 --- a/agent.go +++ b/agent.go @@ -6,30 +6,17 @@ import ( "log" "math/big" "os" - "sort" - "strings" "sync" "time" "github.com/influxdb/telegraf/internal" + "github.com/influxdb/telegraf/internal/config" "github.com/influxdb/telegraf/outputs" "github.com/influxdb/telegraf/plugins" "github.com/influxdb/influxdb/client/v2" ) -type runningOutput struct { - name string - output outputs.Output -} - -type runningPlugin struct { - name string - filtername string - plugin plugins.Plugin - config *ConfiguredPlugin -} - // Agent runs telegraf and collects data based on the given config type Agent struct { @@ -66,14 +53,11 @@ type Agent struct { Tags map[string]string - Config *Config - - outputs []*runningOutput - plugins []*runningPlugin + Config *config.Config } // NewAgent returns an Agent struct based off the given Config -func NewAgent(config *Config) (*Agent, error) { +func NewAgent(config *config.Config) (*Agent, error) { agent := &Agent{ Tags: make(map[string]string), Config: config, @@ -110,30 +94,30 @@ func NewAgent(config *Config) (*Agent, error) { // Connect connects to all configured outputs func (a *Agent) Connect() error { - for _, o := range a.outputs { - switch ot := o.output.(type) { + for _, o := range a.Config.Outputs { + switch ot := o.Output.(type) { case outputs.ServiceOutput: if err := ot.Start(); err != nil { log.Printf("Service for output %s failed to start, exiting\n%s\n", - o.name, err.Error()) + o.Name, err.Error()) return err } } if a.Debug { - log.Printf("Attempting connection to output: %s\n", o.name) + log.Printf("Attempting connection to output: %s\n", o.Name) } - err := o.output.Connect() + err := o.Output.Connect() if err != nil { - log.Printf("Failed to connect to output %s, retrying in 15s\n", o.name) + log.Printf("Failed to connect to output %s, retrying in 15s\n", o.Name) time.Sleep(15 * time.Second) - err = o.output.Connect() + err = o.Output.Connect() if err != nil { return err } } if a.Debug { - log.Printf("Successfully connected to output: %s\n", o.name) + log.Printf("Successfully connected to output: %s\n", o.Name) } } return nil @@ -142,9 +126,9 @@ func (a *Agent) Connect() error { // Close closes the connection to all configured outputs func (a *Agent) Close() error { var err error - for _, o := range a.outputs { - err = o.output.Close() - switch ot := o.output.(type) { + for _, o := range a.Config.Outputs { + err = o.Output.Close() + switch ot := o.Output.(type) { case outputs.ServiceOutput: ot.Stop() } @@ -152,67 +136,6 @@ func (a *Agent) Close() error { return err } -// LoadOutputs loads the agent's outputs -func (a *Agent) LoadOutputs(filters []string) ([]string, error) { - var names []string - - for _, name := range a.Config.OutputsDeclared() { - // Trim the ID off the output name for filtering - filtername := strings.TrimRight(name, "-0123456789") - creator, ok := outputs.Outputs[filtername] - if !ok { - return nil, fmt.Errorf("Undefined but requested output: %s", name) - } - - if sliceContains(filtername, filters) || len(filters) == 0 { - output := creator() - - err := a.Config.ApplyOutput(name, output) - if err != nil { - return nil, err - } - - a.outputs = append(a.outputs, &runningOutput{name, output}) - names = append(names, name) - } - } - - sort.Strings(names) - - return names, nil -} - -// LoadPlugins loads the agent's plugins -func (a *Agent) LoadPlugins(filters []string) ([]string, error) { - var names []string - - for _, name := range a.Config.PluginsDeclared() { - // Trim the ID off the output name for filtering - filtername := strings.TrimRight(name, "-0123456789") - creator, ok := plugins.Plugins[filtername] - if !ok { - return nil, fmt.Errorf("Undefined but requested plugin: %s", name) - } - - if sliceContains(filtername, filters) || len(filters) == 0 { - plugin := creator() - - config, err := a.Config.ApplyPlugin(name, plugin) - if err != nil { - return nil, err - } - - a.plugins = append(a.plugins, - &runningPlugin{name, filtername, plugin, config}) - names = append(names, name) - } - } - - sort.Strings(names) - - return names, nil -} - // gatherParallel runs the plugins that are using the same reporting interval // as the telegraf agent. func (a *Agent) gatherParallel(pointChan chan *client.Point) error { @@ -220,23 +143,23 @@ func (a *Agent) gatherParallel(pointChan chan *client.Point) error { start := time.Now() counter := 0 - for _, plugin := range a.plugins { - if plugin.config.Interval != 0 { + for _, plugin := range a.Config.Plugins { + if plugin.Config.Interval != 0 { continue } wg.Add(1) counter++ - go func(plugin *runningPlugin) { + go func(plugin *config.RunningPlugin) { defer wg.Done() - acc := NewAccumulator(plugin.config, pointChan) + acc := NewAccumulator(plugin.Config, pointChan) acc.SetDebug(a.Debug) - acc.SetPrefix(plugin.filtername + "_") + acc.SetPrefix(plugin.Name + "_") acc.SetDefaultTags(a.Tags) - if err := plugin.plugin.Gather(acc); err != nil { - log.Printf("Error in plugin [%s]: %s", plugin.name, err) + if err := plugin.Plugin.Gather(acc); err != nil { + log.Printf("Error in plugin [%s]: %s", plugin.Name, err) } }(plugin) @@ -254,27 +177,27 @@ func (a *Agent) gatherParallel(pointChan chan *client.Point) error { // reporting interval. func (a *Agent) gatherSeparate( shutdown chan struct{}, - plugin *runningPlugin, + plugin *config.RunningPlugin, pointChan chan *client.Point, ) error { - ticker := time.NewTicker(plugin.config.Interval) + ticker := time.NewTicker(plugin.Config.Interval) for { var outerr error start := time.Now() - acc := NewAccumulator(plugin.config, pointChan) + acc := NewAccumulator(plugin.Config, pointChan) acc.SetDebug(a.Debug) - acc.SetPrefix(plugin.filtername + "_") + acc.SetPrefix(plugin.Name + "_") acc.SetDefaultTags(a.Tags) - if err := plugin.plugin.Gather(acc); err != nil { - log.Printf("Error in plugin [%s]: %s", plugin.name, err) + if err := plugin.Plugin.Gather(acc); err != nil { + log.Printf("Error in plugin [%s]: %s", plugin.Name, err) } elapsed := time.Since(start) log.Printf("Gathered metrics, (separate %s interval), from %s in %s\n", - plugin.config.Interval, plugin.name, elapsed) + plugin.Config.Interval, plugin.Name, elapsed) if outerr != nil { return outerr @@ -308,27 +231,27 @@ func (a *Agent) Test() error { } }() - for _, plugin := range a.plugins { - acc := NewAccumulator(plugin.config, pointChan) + for _, plugin := range a.Config.Plugins { + acc := NewAccumulator(plugin.Config, pointChan) acc.SetDebug(true) - acc.SetPrefix(plugin.filtername + "_") + acc.SetPrefix(plugin.Name + "_") - fmt.Printf("* Plugin: %s, Collection 1\n", plugin.name) - if plugin.config.Interval != 0 { - fmt.Printf("* Internal: %s\n", plugin.config.Interval) + fmt.Printf("* Plugin: %s, Collection 1\n", plugin.Name) + if plugin.Config.Interval != 0 { + fmt.Printf("* Internal: %s\n", plugin.Config.Interval) } - if err := plugin.plugin.Gather(acc); err != nil { + if err := plugin.Plugin.Gather(acc); err != nil { return err } // Special instructions for some plugins. cpu, for example, needs to be // run twice in order to return cpu usage percentages. - switch plugin.filtername { + switch plugin.Name { case "cpu", "mongodb": time.Sleep(500 * time.Millisecond) - fmt.Printf("* Plugin: %s, Collection 2\n", plugin.name) - if err := plugin.plugin.Gather(acc); err != nil { + fmt.Printf("* Plugin: %s, Collection 2\n", plugin.Name) + if err := plugin.Plugin.Gather(acc); err != nil { return err } } @@ -341,7 +264,7 @@ func (a *Agent) Test() error { // Optionally takes a `done` channel to indicate that it is done writing. func (a *Agent) writeOutput( points []*client.Point, - ro *runningOutput, + ro *config.RunningOutput, shutdown chan struct{}, wg *sync.WaitGroup, ) { @@ -354,12 +277,12 @@ func (a *Agent) writeOutput( start := time.Now() for { - err := ro.output.Write(points) + err := ro.Output.Write(points) if err == nil { // Write successful elapsed := time.Since(start) log.Printf("Flushed %d metrics to output %s in %s\n", - len(points), ro.name, elapsed) + len(points), ro.Name, elapsed) return } @@ -371,12 +294,12 @@ func (a *Agent) writeOutput( // No more retries msg := "FATAL: Write to output [%s] failed %d times, dropping" + " %d metrics\n" - log.Printf(msg, ro.name, retries+1, len(points)) + log.Printf(msg, ro.Name, retries+1, len(points)) return } else if err != nil { // Sleep for a retry log.Printf("Error in output [%s]: %s, retrying in %s", - ro.name, err.Error(), a.FlushInterval.Duration) + ro.Name, err.Error(), a.FlushInterval.Duration) time.Sleep(a.FlushInterval.Duration) } } @@ -392,7 +315,7 @@ func (a *Agent) flush( wait bool, ) { var wg sync.WaitGroup - for _, o := range a.outputs { + for _, o := range a.Config.Outputs { wg.Add(1) go a.writeOutput(points, o, shutdown, &wg) } @@ -476,14 +399,14 @@ func (a *Agent) Run(shutdown chan struct{}) error { } }() - for _, plugin := range a.plugins { + for _, plugin := range a.Config.Plugins { // Start service of any ServicePlugins - switch p := plugin.plugin.(type) { + switch p := plugin.Plugin.(type) { case plugins.ServicePlugin: if err := p.Start(); err != nil { log.Printf("Service for plugin %s failed to start, exiting\n%s\n", - plugin.name, err.Error()) + plugin.Name, err.Error()) return err } defer p.Stop() @@ -491,9 +414,9 @@ func (a *Agent) Run(shutdown chan struct{}) error { // Special handling for plugins that have their own collection interval // configured. Default intervals are handled below with gatherParallel - if plugin.config.Interval != 0 { + if plugin.Config.Interval != 0 { wg.Add(1) - go func(plugin *runningPlugin) { + go func(plugin *config.RunningPlugin) { defer wg.Done() if err := a.gatherSeparate(shutdown, plugin, pointChan); err != nil { log.Printf(err.Error()) diff --git a/agent_test.go b/agent_test.go index 403b409f3..275ab96ee 100644 --- a/agent_test.go +++ b/agent_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/influxdb/telegraf/internal" + "github.com/influxdb/telegraf/internal/config" // needing to load the plugins _ "github.com/influxdb/telegraf/plugins/all" @@ -14,49 +15,73 @@ import ( ) func TestAgent_LoadPlugin(t *testing.T) { + c := config.NewConfig() + c.PluginFilters = []string{"mysql"} + c.LoadConfig("./internal/config/testdata/telegraf-agent.toml") + a, _ := NewAgent(c) + assert.Equal(t, 1, len(a.Config.Plugins)) - // load a dedicated configuration file - config, _ := LoadConfig("./testdata/telegraf-agent.toml") - a, _ := NewAgent(config) + c = config.NewConfig() + c.PluginFilters = []string{"foo"} + c.LoadConfig("./internal/config/testdata/telegraf-agent.toml") + a, _ = NewAgent(c) + assert.Equal(t, 0, len(a.Config.Plugins)) - pluginsEnabled, _ := a.LoadPlugins([]string{"mysql"}, config) - assert.Equal(t, 1, len(pluginsEnabled)) + c = config.NewConfig() + c.PluginFilters = []string{"mysql", "foo"} + c.LoadConfig("./internal/config/testdata/telegraf-agent.toml") + a, _ = NewAgent(c) + assert.Equal(t, 1, len(a.Config.Plugins)) - pluginsEnabled, _ = a.LoadPlugins([]string{"foo"}, config) - assert.Equal(t, 0, len(pluginsEnabled)) + c = config.NewConfig() + c.PluginFilters = []string{"mysql", "redis"} + c.LoadConfig("./internal/config/testdata/telegraf-agent.toml") + a, _ = NewAgent(c) + assert.Equal(t, 2, len(a.Config.Plugins)) - pluginsEnabled, _ = a.LoadPlugins([]string{"mysql", "foo"}, config) - assert.Equal(t, 1, len(pluginsEnabled)) - - pluginsEnabled, _ = a.LoadPlugins([]string{"mysql", "redis"}, config) - assert.Equal(t, 2, len(pluginsEnabled)) - - pluginsEnabled, _ = a.LoadPlugins([]string{"mysql", "foo", "redis", "bar"}, config) - assert.Equal(t, 2, len(pluginsEnabled)) + c = config.NewConfig() + c.PluginFilters = []string{"mysql", "foo", "redis", "bar"} + c.LoadConfig("./internal/config/testdata/telegraf-agent.toml") + a, _ = NewAgent(c) + assert.Equal(t, 2, len(a.Config.Plugins)) } func TestAgent_LoadOutput(t *testing.T) { - // load a dedicated configuration file - config, _ := LoadConfig("./testdata/telegraf-agent.toml") - a, _ := NewAgent(config) + c := config.NewConfig() + c.OutputFilters = []string{"influxdb"} + c.LoadConfig("./internal/config/testdata/telegraf-agent.toml") + a, _ := NewAgent(c) + assert.Equal(t, 2, len(a.Config.Outputs)) - outputsEnabled, _ := a.LoadOutputs([]string{"influxdb"}, config) - assert.Equal(t, 2, len(outputsEnabled)) + c = config.NewConfig() + c.OutputFilters = []string{} + c.LoadConfig("./internal/config/testdata/telegraf-agent.toml") + a, _ = NewAgent(c) + assert.Equal(t, 3, len(a.Config.Outputs)) - outputsEnabled, _ = a.LoadOutputs([]string{}, config) - assert.Equal(t, 3, len(outputsEnabled)) + c = config.NewConfig() + c.OutputFilters = []string{"foo"} + c.LoadConfig("./internal/config/testdata/telegraf-agent.toml") + a, _ = NewAgent(c) + assert.Equal(t, 0, len(a.Config.Outputs)) - outputsEnabled, _ = a.LoadOutputs([]string{"foo"}, config) - assert.Equal(t, 0, len(outputsEnabled)) + c = config.NewConfig() + c.OutputFilters = []string{"influxdb", "foo"} + c.LoadConfig("./internal/config/testdata/telegraf-agent.toml") + a, _ = NewAgent(c) + assert.Equal(t, 2, len(a.Config.Outputs)) - outputsEnabled, _ = a.LoadOutputs([]string{"influxdb", "foo"}, config) - assert.Equal(t, 2, len(outputsEnabled)) + c = config.NewConfig() + c.OutputFilters = []string{"influxdb", "kafka"} + c.LoadConfig("./internal/config/testdata/telegraf-agent.toml") + a, _ = NewAgent(c) + assert.Equal(t, 3, len(a.Config.Outputs)) - outputsEnabled, _ = a.LoadOutputs([]string{"influxdb", "kafka"}, config) - assert.Equal(t, 3, len(outputsEnabled)) - - outputsEnabled, _ = a.LoadOutputs([]string{"influxdb", "foo", "kafka", "bar"}, config) - assert.Equal(t, 3, len(outputsEnabled)) + c = config.NewConfig() + c.OutputFilters = []string{"influxdb", "foo", "kafka", "bar"} + c.LoadConfig("./internal/config/testdata/telegraf-agent.toml") + a, _ = NewAgent(c) + assert.Equal(t, 3, len(a.Config.Outputs)) } func TestAgent_ZeroJitter(t *testing.T) { diff --git a/cmd/telegraf/telegraf.go b/cmd/telegraf/telegraf.go index 7f25fc7ac..f5f724ad1 100644 --- a/cmd/telegraf/telegraf.go +++ b/cmd/telegraf/telegraf.go @@ -9,6 +9,7 @@ import ( "strings" "github.com/influxdb/telegraf" + "github.com/influxdb/telegraf/internal/config" _ "github.com/influxdb/telegraf/outputs/all" _ "github.com/influxdb/telegraf/plugins/all" ) @@ -56,13 +57,13 @@ func main() { } if *fSampleConfig { - telegraf.PrintSampleConfig(pluginFilters, outputFilters) + config.PrintSampleConfig(pluginFilters, outputFilters) return } if *fUsage != "" { - if err := telegraf.PrintPluginConfig(*fUsage); err != nil { - if err2 := telegraf.PrintOutputConfig(*fUsage); err2 != nil { + if err := config.PrintPluginConfig(*fUsage); err != nil { + if err2 := config.PrintOutputConfig(*fUsage); err2 != nil { log.Fatalf("%s and %s", err, err2) } } @@ -70,13 +71,15 @@ func main() { } var ( - config *telegraf.Config - err error + c *config.Config + err error ) if *fConfig != "" { - config = telegraf.NewConfig() - err = config.LoadConfig(*fConfig) + c = config.NewConfig() + c.OutputFilters = outputFilters + c.PluginFilters = pluginFilters + err = c.LoadConfig(*fConfig) if err != nil { log.Fatal(err) } @@ -87,13 +90,19 @@ func main() { } if *fConfigDirectory != "" { - err = config.LoadDirectory(*fConfigDirectory) + err = c.LoadDirectory(*fConfigDirectory) if err != nil { log.Fatal(err) } } + if len(c.Outputs) == 0 { + log.Fatalf("Error: no outputs found, did you provide a valid config file?") + } + if len(c.Plugins) == 0 { + log.Fatalf("Error: no plugins found, did you provide a valid config file?") + } - ag, err := telegraf.NewAgent(config) + ag, err := telegraf.NewAgent(c) if err != nil { log.Fatal(err) } @@ -102,24 +111,6 @@ func main() { ag.Debug = true } - outputs, err := ag.LoadOutputs(outputFilters) - if err != nil { - log.Fatal(err) - } - if len(outputs) == 0 { - log.Printf("Error: no outputs found, did you provide a valid config file?") - os.Exit(1) - } - - plugins, err := ag.LoadPlugins(pluginFilters) - if err != nil { - log.Fatal(err) - } - if len(plugins) == 0 { - log.Printf("Error: no plugins found, did you provide a valid config file?") - os.Exit(1) - } - if *fTest { err = ag.Test() if err != nil { @@ -142,9 +133,9 @@ func main() { }() log.Printf("Starting Telegraf (version %s)\n", Version) - log.Printf("Loaded outputs: %s", strings.Join(outputs, " ")) - log.Printf("Loaded plugins: %s", strings.Join(plugins, " ")) - log.Printf("Tags enabled: %s", config.ListTags()) + log.Printf("Loaded outputs: %s", strings.Join(c.OutputNames(), " ")) + log.Printf("Loaded plugins: %s", strings.Join(c.PluginNames(), " ")) + log.Printf("Tags enabled: %s", c.ListTags()) if *fPidfile != "" { f, err := os.Create(*fPidfile) diff --git a/config_test.go b/config_test.go deleted file mode 100644 index 89b678b7b..000000000 --- a/config_test.go +++ /dev/null @@ -1,307 +0,0 @@ -package telegraf - -import ( - "fmt" - "io/ioutil" - "testing" - "time" - - "github.com/influxdb/telegraf/plugins" - "github.com/influxdb/telegraf/plugins/exec" - "github.com/influxdb/telegraf/plugins/memcached" - "github.com/influxdb/telegraf/plugins/procstat" - "github.com/naoina/toml" - "github.com/naoina/toml/ast" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/suite" -) - -type subTest struct { - AField string - AnotherField int -} -type test struct { - StringField string - IntegerField int - FloatField float32 - BooleansField bool `toml:"boolean_field"` - DatetimeField time.Time - ArrayField []string - TableArrayField []subTest -} - -type MergeStructSuite struct { - suite.Suite - EmptyStruct *test - FullStruct *test - AnotherFullStruct *test - AllFields []string -} - -func (s *MergeStructSuite) SetupSuite() { - s.AllFields = []string{"string_field", "integer_field", "float_field", - "boolean_field", "date_time_field", "array_field", "table_array_field"} -} - -func (s *MergeStructSuite) SetupTest() { - s.EmptyStruct = &test{ - ArrayField: []string{}, - TableArrayField: []subTest{}, - } - s.FullStruct = &test{ - StringField: "one", - IntegerField: 1, - FloatField: 1.1, - BooleansField: false, - DatetimeField: time.Date(1963, time.August, 28, 17, 0, 0, 0, time.UTC), - ArrayField: []string{"one", "two", "three"}, - TableArrayField: []subTest{ - subTest{ - AField: "one", - AnotherField: 1, - }, - subTest{ - AField: "two", - AnotherField: 2, - }, - }, - } - s.AnotherFullStruct = &test{ - StringField: "two", - IntegerField: 2, - FloatField: 2.2, - BooleansField: true, - DatetimeField: time.Date(1965, time.March, 25, 17, 0, 0, 0, time.UTC), - ArrayField: []string{"four", "five", "six"}, - TableArrayField: []subTest{ - subTest{ - AField: "three", - AnotherField: 3, - }, - subTest{ - AField: "four", - AnotherField: 4, - }, - }, - } -} - -func (s *MergeStructSuite) TestEmptyMerge() { - err := mergeStruct(s.EmptyStruct, s.FullStruct, s.AllFields) - if err != nil { - s.T().Error(err) - } - s.Equal(s.FullStruct, s.EmptyStruct, - fmt.Sprintf("Full merge of %v onto an empty struct failed.", s.FullStruct)) -} - -func (s *MergeStructSuite) TestFullMerge() { - result := &test{ - StringField: "two", - IntegerField: 2, - FloatField: 2.2, - BooleansField: true, - DatetimeField: time.Date(1965, time.March, 25, 17, 0, 0, 0, time.UTC), - ArrayField: []string{"four", "five", "six"}, - TableArrayField: []subTest{ - subTest{ - AField: "three", - AnotherField: 3, - }, - subTest{ - AField: "four", - AnotherField: 4, - }, - }, - } - - err := mergeStruct(s.FullStruct, s.AnotherFullStruct, s.AllFields) - if err != nil { - s.T().Error(err) - } - s.Equal(result, s.FullStruct, - fmt.Sprintf("Full merge of %v onto FullStruct failed.", s.AnotherFullStruct)) -} - -func (s *MergeStructSuite) TestPartialMergeWithoutSlices() { - result := &test{ - StringField: "two", - IntegerField: 1, - FloatField: 2.2, - BooleansField: false, - DatetimeField: time.Date(1965, time.March, 25, 17, 0, 0, 0, time.UTC), - ArrayField: []string{"one", "two", "three"}, - TableArrayField: []subTest{ - subTest{ - AField: "one", - AnotherField: 1, - }, - subTest{ - AField: "two", - AnotherField: 2, - }, - }, - } - - err := mergeStruct(s.FullStruct, s.AnotherFullStruct, - []string{"string_field", "float_field", "date_time_field"}) - if err != nil { - s.T().Error(err) - } - s.Equal(result, s.FullStruct, - fmt.Sprintf("Partial merge without slices of %v onto FullStruct failed.", - s.AnotherFullStruct)) -} - -func (s *MergeStructSuite) TestPartialMergeWithSlices() { - result := &test{ - StringField: "two", - IntegerField: 1, - FloatField: 2.2, - BooleansField: false, - DatetimeField: time.Date(1965, time.March, 25, 17, 0, 0, 0, time.UTC), - ArrayField: []string{"one", "two", "three"}, - TableArrayField: []subTest{ - subTest{ - AField: "three", - AnotherField: 3, - }, - subTest{ - AField: "four", - AnotherField: 4, - }, - }, - } - - err := mergeStruct(s.FullStruct, s.AnotherFullStruct, - []string{"string_field", "float_field", "date_time_field", "table_array_field"}) - if err != nil { - s.T().Error(err) - } - s.Equal(result, s.FullStruct, - fmt.Sprintf("Partial merge with slices of %v onto FullStruct failed.", - s.AnotherFullStruct)) -} - -func TestConfig_mergeStruct(t *testing.T) { - suite.Run(t, new(MergeStructSuite)) -} - -func TestConfig_parsePlugin(t *testing.T) { - data, err := ioutil.ReadFile("./testdata/single_plugin.toml") - if err != nil { - t.Error(err) - } - - tbl, err := toml.Parse(data) - if err != nil { - t.Error(err) - } - - c := &Config{ - plugins: make(map[string]plugins.Plugin), - pluginConfigurations: make(map[string]*ConfiguredPlugin), - pluginFieldsSet: make(map[string][]string), - pluginConfigurationFieldsSet: make(map[string][]string), - } - - subtbl := tbl.Fields["memcached"].(*ast.Table) - err = c.parsePlugin("memcached", subtbl, 0) - - memcached := plugins.Plugins["memcached"]().(*memcached.Memcached) - memcached.Servers = []string{"localhost"} - - mConfig := &ConfiguredPlugin{ - Name: "memcached", - Drop: []string{"other", "stuff"}, - Pass: []string{"some", "strings"}, - TagDrop: []TagFilter{ - TagFilter{ - Name: "badtag", - Filter: []string{"othertag"}, - }, - }, - TagPass: []TagFilter{ - TagFilter{ - Name: "goodtag", - Filter: []string{"mytag"}, - }, - }, - Interval: 5 * time.Second, - } - - assert.Equal(t, memcached, c.plugins["memcached-0"], - "Testdata did not produce a correct memcached struct.") - assert.Equal(t, mConfig, c.pluginConfigurations["memcached-0"], - "Testdata did not produce correct memcached metadata.") -} - -func TestConfig_LoadDirectory(t *testing.T) { - c, err := LoadConfig("./testdata/telegraf-agent.toml") - if err != nil { - t.Error(err) - } - err = c.LoadDirectory("./testdata/subconfig") - if err != nil { - t.Error(err) - } - - memcached := plugins.Plugins["memcached"]().(*memcached.Memcached) - memcached.Servers = []string{"192.168.1.1"} - - mConfig := &ConfiguredPlugin{ - Name: "memcached", - Drop: []string{"other", "stuff"}, - Pass: []string{"some", "strings"}, - TagDrop: []TagFilter{ - TagFilter{ - Name: "badtag", - Filter: []string{"othertag"}, - }, - }, - TagPass: []TagFilter{ - TagFilter{ - Name: "goodtag", - Filter: []string{"mytag"}, - }, - }, - Interval: 5 * time.Second, - } - - ex := plugins.Plugins["exec"]().(*exec.Exec) - ex.Commands = []*exec.Command{ - &exec.Command{ - Command: "/usr/bin/myothercollector --foo=bar", - Name: "myothercollector", - }, - } - - eConfig := &ConfiguredPlugin{Name: "exec"} - - pstat := plugins.Plugins["procstat"]().(*procstat.Procstat) - pstat.Specifications = []*procstat.Specification{ - &procstat.Specification{ - PidFile: "/var/run/grafana-server.pid", - }, - &procstat.Specification{ - PidFile: "/var/run/influxdb/influxd.pid", - }, - } - - pConfig := &ConfiguredPlugin{Name: "procstat"} - - assert.Equal(t, memcached, c.plugins["memcached-0"], - "Merged Testdata did not produce a correct memcached struct.") - assert.Equal(t, mConfig, c.pluginConfigurations["memcached-0"], - "Merged Testdata did not produce correct memcached metadata.") - - assert.Equal(t, ex, c.plugins["exec-0"], - "Merged Testdata did not produce a correct exec struct.") - assert.Equal(t, eConfig, c.pluginConfigurations["exec-0"], - "Merged Testdata did not produce correct exec metadata.") - - assert.Equal(t, pstat, c.plugins["procstat-0"], - "Merged Testdata did not produce a correct procstat struct.") - assert.Equal(t, pConfig, c.pluginConfigurations["procstat-0"], - "Merged Testdata did not produce correct procstat metadata.") -} diff --git a/config.go b/internal/config/config.go similarity index 61% rename from config.go rename to internal/config/config.go index 9ffd0c80f..e0f6b673b 100644 --- a/config.go +++ b/internal/config/config.go @@ -1,4 +1,4 @@ -package telegraf +package config import ( "errors" @@ -12,6 +12,7 @@ import ( "github.com/influxdb/telegraf/outputs" "github.com/influxdb/telegraf/plugins" + "github.com/naoina/toml" "github.com/naoina/toml/ast" ) @@ -20,18 +21,22 @@ import ( // will be logging to, as well as all the plugins that the user has // specified type Config struct { - Tags map[string]string + Tags map[string]string + PluginFilters []string + OutputFilters []string - agent *ast.Table - plugins map[string]*ast.Table - outputs map[string]*ast.Table + Agent *ast.Table + Plugins []*RunningPlugin + Outputs []*RunningOutput } func NewConfig() *Config { c := &Config{ - Tags: make(map[string]string), - plugins: make(map[string]*ast.Table), - outputs: make(map[string]*ast.Table), + Tags: make(map[string]string), + Plugins: make([]*RunningPlugin, 0), + Outputs: make([]*RunningOutput, 0), + PluginFilters: make([]string, 0), + OutputFilters: make([]string, 0), } return c } @@ -42,9 +47,20 @@ type TagFilter struct { Filter []string } -// ConfiguredPlugin containing a name, interval, and drop/pass prefix lists +type RunningOutput struct { + Name string + Output outputs.Output +} + +type RunningPlugin struct { + Name string + Plugin plugins.Plugin + Config *PluginConfig +} + +// PluginConfig containing a name, interval, and drop/pass prefix lists // Also lists the tags to filter -type ConfiguredPlugin struct { +type PluginConfig struct { Name string Drop []string @@ -58,7 +74,7 @@ type ConfiguredPlugin struct { // ShouldPass returns true if the metric should pass, false if should drop // based on the drop/pass plugin parameters -func (cp *ConfiguredPlugin) ShouldPass(measurement string) bool { +func (cp *PluginConfig) ShouldPass(measurement string) bool { if cp.Pass != nil { for _, pat := range cp.Pass { if strings.HasPrefix(measurement, pat) { @@ -82,7 +98,7 @@ func (cp *ConfiguredPlugin) ShouldPass(measurement string) bool { // ShouldTagsPass returns true if the metric should pass, false if should drop // based on the tagdrop/tagpass plugin parameters -func (cp *ConfiguredPlugin) ShouldTagsPass(tags map[string]string) bool { +func (cp *PluginConfig) ShouldTagsPass(tags map[string]string) bool { if cp.TagPass != nil { for _, pat := range cp.TagPass { if tagval, ok := tags[pat.Name]; ok { @@ -112,138 +128,34 @@ func (cp *ConfiguredPlugin) ShouldTagsPass(tags map[string]string) bool { return true } -// ApplyOutput loads the toml config into the given interface -func (c *Config) ApplyOutput(name string, v interface{}) error { - if c.outputs[name] != nil { - return toml.UnmarshalTable(c.outputs[name], v) +// Plugins returns a list of strings of the configured plugins. +func (c *Config) PluginNames() []string { + var name []string + for _, plugin := range c.Plugins { + name = append(name, plugin.Name) } - return nil + return name +} + +// Outputs returns a list of strings of the configured plugins. +func (c *Config) OutputNames() []string { + var name []string + for _, output := range c.Outputs { + name = append(name, output.Name) + } + return name } // ApplyAgent loads the toml config into the given Agent object, overriding // defaults (such as collection duration) with the values from the toml config. -func (c *Config) ApplyAgent(a *Agent) error { - if c.agent != nil { - return toml.UnmarshalTable(c.agent, a) +func (c *Config) ApplyAgent(a interface{}) error { + if c.Agent != nil { + return toml.UnmarshalTable(c.Agent, a) } return nil } -// ApplyPlugin takes defined plugin names and applies them to the given -// interface, returning a ConfiguredPlugin object in the end that can -// be inserted into a runningPlugin by the agent. -func (c *Config) ApplyPlugin(name string, v interface{}) (*ConfiguredPlugin, error) { - cp := &ConfiguredPlugin{Name: name} - - if tbl, ok := c.plugins[name]; ok { - - if node, ok := tbl.Fields["pass"]; ok { - if kv, ok := node.(*ast.KeyValue); ok { - if ary, ok := kv.Value.(*ast.Array); ok { - for _, elem := range ary.Value { - if str, ok := elem.(*ast.String); ok { - cp.Pass = append(cp.Pass, str.Value) - } - } - } - } - } - - if node, ok := tbl.Fields["drop"]; ok { - if kv, ok := node.(*ast.KeyValue); ok { - if ary, ok := kv.Value.(*ast.Array); ok { - for _, elem := range ary.Value { - if str, ok := elem.(*ast.String); ok { - cp.Drop = append(cp.Drop, str.Value) - } - } - } - } - } - - if node, ok := tbl.Fields["interval"]; ok { - if kv, ok := node.(*ast.KeyValue); ok { - if str, ok := kv.Value.(*ast.String); ok { - dur, err := time.ParseDuration(str.Value) - if err != nil { - return nil, err - } - - cp.Interval = dur - } - } - } - - if node, ok := tbl.Fields["tagpass"]; ok { - if subtbl, ok := node.(*ast.Table); ok { - for name, val := range subtbl.Fields { - if kv, ok := val.(*ast.KeyValue); ok { - tagfilter := &TagFilter{Name: name} - if ary, ok := kv.Value.(*ast.Array); ok { - for _, elem := range ary.Value { - if str, ok := elem.(*ast.String); ok { - tagfilter.Filter = append(tagfilter.Filter, str.Value) - } - } - } - cp.TagPass = append(cp.TagPass, *tagfilter) - } - } - } - } - - if node, ok := tbl.Fields["tagdrop"]; ok { - if subtbl, ok := node.(*ast.Table); ok { - for name, val := range subtbl.Fields { - if kv, ok := val.(*ast.KeyValue); ok { - tagfilter := &TagFilter{Name: name} - if ary, ok := kv.Value.(*ast.Array); ok { - for _, elem := range ary.Value { - if str, ok := elem.(*ast.String); ok { - tagfilter.Filter = append(tagfilter.Filter, str.Value) - } - } - } - cp.TagDrop = append(cp.TagDrop, *tagfilter) - } - } - } - } - - delete(tbl.Fields, "drop") - delete(tbl.Fields, "pass") - delete(tbl.Fields, "interval") - delete(tbl.Fields, "tagdrop") - delete(tbl.Fields, "tagpass") - return cp, toml.UnmarshalTable(tbl, v) - } - - return cp, nil -} - -// PluginsDeclared returns the name of all plugins declared in the config. -func (c *Config) PluginsDeclared() []string { - return declared(c.plugins) -} - -// OutputsDeclared returns the name of all outputs declared in the config. -func (c *Config) OutputsDeclared() []string { - return declared(c.outputs) -} - -func declared(endpoints map[string]*ast.Table) []string { - var names []string - - for name := range endpoints { - names = append(names, name) - } - - sort.Strings(names) - - return names -} - // ListTags returns a string of tags specified in the config, // line-protocol style func (c *Config) ListTags() string { @@ -258,14 +170,6 @@ func (c *Config) ListTags() string { return strings.Join(tags, " ") } -type hasConfig interface { - BasicConfig() string -} - -type hasDescr interface { - Description() string -} - var header = `# Telegraf configuration # Telegraf is entirely plugin driven. All metrics are gathered from the @@ -353,15 +257,7 @@ func PrintSampleConfig(pluginFilters []string, outputFilters []string) { for _, oname := range onames { creator := outputs.Outputs[oname] output := creator() - - fmt.Printf("\n# %s\n[[outputs.%s]]", output.Description(), oname) - - config := output.SampleConfig() - if config == "" { - fmt.Printf("\n # no configuration\n") - } else { - fmt.Printf(config) - } + printConfig(oname, output, "outputs") } // Filter plugins @@ -386,13 +282,13 @@ func PrintSampleConfig(pluginFilters []string, outputFilters []string) { continue } - printConfig(pname, plugin) + printConfig(pname, plugin, "plugins") } // Print Service Plugins fmt.Printf(servicePluginHeader) for name, plugin := range servPlugins { - printConfig(name, plugin) + printConfig(name, plugin, "plugins") } } @@ -401,8 +297,8 @@ type printer interface { SampleConfig() string } -func printConfig(name string, p printer) { - fmt.Printf("\n# %s\n[[plugins.%s]]", p.Description(), name) +func printConfig(name string, p printer, op string) { + fmt.Printf("\n# %s\n[[%s.%s]]", p.Description(), op, name) config := p.SampleConfig() if config == "" { fmt.Printf("\n # no configuration\n") @@ -423,7 +319,7 @@ func sliceContains(name string, list []string) bool { // PrintPluginConfig prints the config usage of a single plugin. func PrintPluginConfig(name string) error { if creator, ok := plugins.Plugins[name]; ok { - printConfig(name, creator()) + printConfig(name, creator(), "plugins") } else { return errors.New(fmt.Sprintf("Plugin %s not found", name)) } @@ -433,7 +329,7 @@ func PrintPluginConfig(name string) error { // PrintOutputConfig prints the config usage of a single output. func PrintOutputConfig(name string) error { if creator, ok := outputs.Outputs[name]; ok { - printConfig(name, creator()) + printConfig(name, creator(), "outputs") } else { return errors.New(fmt.Sprintf("Output %s not found", name)) } @@ -461,7 +357,7 @@ func (c *Config) LoadDirectory(path string) error { return nil } -// LoadConfig loads the given config file and returns a *Config pointer +// LoadConfig loads the given config file and applies it to c func (c *Config) LoadConfig(path string) error { data, err := ioutil.ReadFile(path) if err != nil { @@ -481,7 +377,7 @@ func (c *Config) LoadConfig(path string) error { switch name { case "agent": - c.agent = subTable + c.Agent = subTable case "tags": if err = toml.UnmarshalTable(subTable, c.Tags); err != nil { log.Printf("Could not parse [tags] config\n") @@ -491,11 +387,14 @@ func (c *Config) LoadConfig(path string) error { for outputName, outputVal := range subTable.Fields { switch outputSubTable := outputVal.(type) { case *ast.Table: - c.outputs[outputName] = outputSubTable + if err = c.addOutput(outputName, outputSubTable); err != nil { + return err + } case []*ast.Table: - for id, t := range outputSubTable { - nameID := fmt.Sprintf("%s-%d", outputName, id) - c.outputs[nameID] = t + for _, t := range outputSubTable { + if err = c.addOutput(outputName, t); err != nil { + return err + } } default: return fmt.Errorf("Unsupported config format: %s", @@ -506,11 +405,14 @@ func (c *Config) LoadConfig(path string) error { for pluginName, pluginVal := range subTable.Fields { switch pluginSubTable := pluginVal.(type) { case *ast.Table: - c.plugins[pluginName] = pluginSubTable + if err = c.addPlugin(pluginName, pluginSubTable); err != nil { + return err + } case []*ast.Table: - for id, t := range pluginSubTable { - nameID := fmt.Sprintf("%s-%d", pluginName, id) - c.plugins[nameID] = t + for _, t := range pluginSubTable { + if err = c.addPlugin(pluginName, t); err != nil { + return err + } } default: return fmt.Errorf("Unsupported config format: %s", @@ -520,8 +422,142 @@ func (c *Config) LoadConfig(path string) error { // Assume it's a plugin for legacy config file support if no other // identifiers are present default: - c.plugins[name] = subTable + if err = c.addPlugin(name, subTable); err != nil { + return err + } } } return nil } + +func (c *Config) addOutput(name string, table *ast.Table) error { + if len(c.OutputFilters) > 0 && !sliceContains(name, c.OutputFilters) { + return nil + } + creator, ok := outputs.Outputs[name] + if !ok { + return fmt.Errorf("Undefined but requested output: %s", name) + } + o := creator() + + if err := toml.UnmarshalTable(table, o); err != nil { + return err + } + + ro := &RunningOutput{ + Name: name, + Output: o, + } + c.Outputs = append(c.Outputs, ro) + return nil +} + +func (c *Config) addPlugin(name string, table *ast.Table) error { + if len(c.PluginFilters) > 0 && !sliceContains(name, c.PluginFilters) { + return nil + } + creator, ok := plugins.Plugins[name] + if !ok { + return fmt.Errorf("Undefined but requested plugin: %s", name) + } + plugin := creator() + + pluginConfig, err := applyPlugin(name, table, plugin) + if err != nil { + return err + } + rp := &RunningPlugin{ + Name: name, + Plugin: plugin, + Config: pluginConfig, + } + c.Plugins = append(c.Plugins, rp) + return nil +} + +// applyPlugin takes defined plugin names and applies them to the given +// interface, returning a PluginConfig object in the end that can +// be inserted into a runningPlugin by the agent. +func applyPlugin(name string, tbl *ast.Table, p plugins.Plugin) (*PluginConfig, error) { + cp := &PluginConfig{Name: name} + + if node, ok := tbl.Fields["pass"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if ary, ok := kv.Value.(*ast.Array); ok { + for _, elem := range ary.Value { + if str, ok := elem.(*ast.String); ok { + cp.Pass = append(cp.Pass, str.Value) + } + } + } + } + } + + if node, ok := tbl.Fields["drop"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if ary, ok := kv.Value.(*ast.Array); ok { + for _, elem := range ary.Value { + if str, ok := elem.(*ast.String); ok { + cp.Drop = append(cp.Drop, str.Value) + } + } + } + } + } + + if node, ok := tbl.Fields["interval"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + dur, err := time.ParseDuration(str.Value) + if err != nil { + return nil, err + } + + cp.Interval = dur + } + } + } + + if node, ok := tbl.Fields["tagpass"]; ok { + if subtbl, ok := node.(*ast.Table); ok { + for name, val := range subtbl.Fields { + if kv, ok := val.(*ast.KeyValue); ok { + tagfilter := &TagFilter{Name: name} + if ary, ok := kv.Value.(*ast.Array); ok { + for _, elem := range ary.Value { + if str, ok := elem.(*ast.String); ok { + tagfilter.Filter = append(tagfilter.Filter, str.Value) + } + } + } + cp.TagPass = append(cp.TagPass, *tagfilter) + } + } + } + } + + if node, ok := tbl.Fields["tagdrop"]; ok { + if subtbl, ok := node.(*ast.Table); ok { + for name, val := range subtbl.Fields { + if kv, ok := val.(*ast.KeyValue); ok { + tagfilter := &TagFilter{Name: name} + if ary, ok := kv.Value.(*ast.Array); ok { + for _, elem := range ary.Value { + if str, ok := elem.(*ast.String); ok { + tagfilter.Filter = append(tagfilter.Filter, str.Value) + } + } + } + cp.TagDrop = append(cp.TagDrop, *tagfilter) + } + } + } + } + + delete(tbl.Fields, "drop") + delete(tbl.Fields, "pass") + delete(tbl.Fields, "interval") + delete(tbl.Fields, "tagdrop") + delete(tbl.Fields, "tagpass") + return cp, toml.UnmarshalTable(tbl, p) +} diff --git a/internal/config/config_test.go b/internal/config/config_test.go new file mode 100644 index 000000000..e53911224 --- /dev/null +++ b/internal/config/config_test.go @@ -0,0 +1,118 @@ +package config + +import ( + "testing" + "time" + + "github.com/influxdb/telegraf/plugins" + "github.com/influxdb/telegraf/plugins/exec" + "github.com/influxdb/telegraf/plugins/memcached" + "github.com/influxdb/telegraf/plugins/procstat" + "github.com/stretchr/testify/assert" +) + +func TestConfig_LoadSinglePlugin(t *testing.T) { + c := NewConfig() + c.LoadConfig("./testdata/single_plugin.toml") + + memcached := plugins.Plugins["memcached"]().(*memcached.Memcached) + memcached.Servers = []string{"localhost"} + + mConfig := &PluginConfig{ + Name: "memcached", + Drop: []string{"other", "stuff"}, + Pass: []string{"some", "strings"}, + TagDrop: []TagFilter{ + TagFilter{ + Name: "badtag", + Filter: []string{"othertag"}, + }, + }, + TagPass: []TagFilter{ + TagFilter{ + Name: "goodtag", + Filter: []string{"mytag"}, + }, + }, + Interval: 5 * time.Second, + } + + assert.Equal(t, memcached, c.Plugins[0].Plugin, + "Testdata did not produce a correct memcached struct.") + assert.Equal(t, mConfig, c.Plugins[0].Config, + "Testdata did not produce correct memcached metadata.") +} + +func TestConfig_LoadDirectory(t *testing.T) { + c := NewConfig() + err := c.LoadConfig("./testdata/single_plugin.toml") + if err != nil { + t.Error(err) + } + err = c.LoadDirectory("./testdata/subconfig") + if err != nil { + t.Error(err) + } + + memcached := plugins.Plugins["memcached"]().(*memcached.Memcached) + memcached.Servers = []string{"localhost"} + + mConfig := &PluginConfig{ + Name: "memcached", + Drop: []string{"other", "stuff"}, + Pass: []string{"some", "strings"}, + TagDrop: []TagFilter{ + TagFilter{ + Name: "badtag", + Filter: []string{"othertag"}, + }, + }, + TagPass: []TagFilter{ + TagFilter{ + Name: "goodtag", + Filter: []string{"mytag"}, + }, + }, + Interval: 5 * time.Second, + } + assert.Equal(t, memcached, c.Plugins[0].Plugin, + "Testdata did not produce a correct memcached struct.") + assert.Equal(t, mConfig, c.Plugins[0].Config, + "Testdata did not produce correct memcached metadata.") + + ex := plugins.Plugins["exec"]().(*exec.Exec) + ex.Commands = []*exec.Command{ + &exec.Command{ + Command: "/usr/bin/myothercollector --foo=bar", + Name: "myothercollector", + }, + } + eConfig := &PluginConfig{Name: "exec"} + assert.Equal(t, ex, c.Plugins[1].Plugin, + "Merged Testdata did not produce a correct exec struct.") + assert.Equal(t, eConfig, c.Plugins[1].Config, + "Merged Testdata did not produce correct exec metadata.") + + memcached.Servers = []string{"192.168.1.1"} + assert.Equal(t, memcached, c.Plugins[2].Plugin, + "Testdata did not produce a correct memcached struct.") + assert.Equal(t, mConfig, c.Plugins[2].Config, + "Testdata did not produce correct memcached metadata.") + + pstat := plugins.Plugins["procstat"]().(*procstat.Procstat) + pstat.Specifications = []*procstat.Specification{ + &procstat.Specification{ + PidFile: "/var/run/grafana-server.pid", + }, + &procstat.Specification{ + PidFile: "/var/run/influxdb/influxd.pid", + }, + } + + pConfig := &PluginConfig{Name: "procstat"} + + assert.Equal(t, pstat, c.Plugins[3].Plugin, + "Merged Testdata did not produce a correct procstat struct.") + assert.Equal(t, pConfig, c.Plugins[3].Config, + "Merged Testdata did not produce correct procstat metadata.") +} diff --git a/testdata/single_plugin.toml b/internal/config/testdata/single_plugin.toml similarity index 64% rename from testdata/single_plugin.toml rename to internal/config/testdata/single_plugin.toml index 643541126..e591984f1 100644 --- a/testdata/single_plugin.toml +++ b/internal/config/testdata/single_plugin.toml @@ -1,9 +1,9 @@ -[memcached] +[[plugins.memcached]] servers = ["localhost"] pass = ["some", "strings"] drop = ["other", "stuff"] interval = "5s" - [memcached.tagpass] + [plugins.memcached.tagpass] goodtag = ["mytag"] - [memcached.tagdrop] + [plugins.memcached.tagdrop] badtag = ["othertag"] diff --git a/testdata/subconfig/exec.conf b/internal/config/testdata/subconfig/exec.conf similarity index 82% rename from testdata/subconfig/exec.conf rename to internal/config/testdata/subconfig/exec.conf index ec14f7f6e..552441031 100644 --- a/testdata/subconfig/exec.conf +++ b/internal/config/testdata/subconfig/exec.conf @@ -1,6 +1,6 @@ -[exec] +[[plugins.exec]] # specify commands via an array of tables - [[exec.commands]] + [[plugins.exec.commands]] # the command to run command = "/usr/bin/myothercollector --foo=bar" diff --git a/testdata/subconfig/memcached.conf b/internal/config/testdata/subconfig/memcached.conf similarity index 65% rename from testdata/subconfig/memcached.conf rename to internal/config/testdata/subconfig/memcached.conf index 95189469a..8d67886c1 100644 --- a/testdata/subconfig/memcached.conf +++ b/internal/config/testdata/subconfig/memcached.conf @@ -1,9 +1,9 @@ -[memcached] +[[plugins.memcached]] servers = ["192.168.1.1"] pass = ["some", "strings"] drop = ["other", "stuff"] interval = "5s" - [memcached.tagpass] + [plugins.memcached.tagpass] goodtag = ["mytag"] - [memcached.tagdrop] + [plugins.memcached.tagdrop] badtag = ["othertag"] diff --git a/internal/config/testdata/subconfig/procstat.conf b/internal/config/testdata/subconfig/procstat.conf new file mode 100644 index 000000000..33f288d84 --- /dev/null +++ b/internal/config/testdata/subconfig/procstat.conf @@ -0,0 +1,5 @@ +[[plugins.procstat]] + [[plugins.procstat.specifications]] + pid_file = "/var/run/grafana-server.pid" + [[plugins.procstat.specifications]] + pid_file = "/var/run/influxdb/influxd.pid" diff --git a/testdata/telegraf-agent.toml b/internal/config/testdata/telegraf-agent.toml similarity index 95% rename from testdata/telegraf-agent.toml rename to internal/config/testdata/telegraf-agent.toml index 28d086739..e63e47b56 100644 --- a/testdata/telegraf-agent.toml +++ b/internal/config/testdata/telegraf-agent.toml @@ -88,13 +88,15 @@ # PLUGINS # ############################################################################### +[plugins] + # Read Apache status information (mod_status) -[apache] +[[plugins.apache]] # An array of Apache status URI to gather stats. urls = ["http://localhost/server-status?auto"] # Read metrics about cpu usage -[cpu] +[[plugins.cpu]] # Whether to report per-cpu stats or not percpu = true # Whether to report total system cpu stats or not @@ -103,11 +105,11 @@ urls = ["http://localhost/server-status?auto"] drop = ["cpu_time"] # Read metrics about disk usage by mount point -[disk] +[[plugins.disk]] # no configuration # Read metrics from one or many disque servers -[disque] +[[plugins.disque]] # An array of URI to gather stats about. Specify an ip or hostname # with optional port and password. ie disque://localhost, disque://10.10.3.33:18832, # 10.0.0.1:10000, etc. @@ -116,7 +118,7 @@ urls = ["http://localhost/server-status?auto"] servers = ["localhost"] # Read stats from one or more Elasticsearch servers or clusters -[elasticsearch] +[[plugins.elasticsearch]] # specify a list of one or more Elasticsearch servers servers = ["http://localhost:9200"] @@ -125,7 +127,7 @@ urls = ["http://localhost/server-status?auto"] local = true # Read flattened metrics from one or more commands that output JSON to stdout -[exec] +[[plugins.exec]] # specify commands via an array of tables [[exec.commands]] # the command to run @@ -135,7 +137,7 @@ urls = ["http://localhost/server-status?auto"] name = "mycollector" # Read metrics of haproxy, via socket or csv stats page -[haproxy] +[[plugins.haproxy]] # An array of address to gather stats about. Specify an ip on hostname # with optional port. ie localhost, 10.10.3.33:1936, etc. # @@ -145,7 +147,7 @@ urls = ["http://localhost/server-status?auto"] # servers = ["socket:/run/haproxy/admin.sock"] # Read flattened metrics from one or more JSON HTTP endpoints -[httpjson] +[[plugins.httpjson]] # Specify services via an array of tables [[httpjson.services]] @@ -167,11 +169,11 @@ urls = ["http://localhost/server-status?auto"] threshold = "0.75" # Read metrics about disk IO by device -[io] +[[plugins.io]] # no configuration # read metrics from a Kafka topic -[kafka_consumer] +[[plugins.kafka_consumer]] # topic(s) to consume topics = ["telegraf"] # an array of Zookeeper connection strings @@ -184,7 +186,7 @@ urls = ["http://localhost/server-status?auto"] offset = "oldest" # Read metrics from a LeoFS Server via SNMP -[leofs] +[[plugins.leofs]] # An array of URI to gather stats about LeoFS. # Specify an ip or hostname with port. ie 127.0.0.1:4020 # @@ -192,7 +194,7 @@ urls = ["http://localhost/server-status?auto"] servers = ["127.0.0.1:4021"] # Read metrics from local Lustre service on OST, MDS -[lustre2] +[[plugins.lustre2]] # An array of /proc globs to search for Lustre stats # If not specified, the default will work on Lustre 2.5.x # @@ -200,11 +202,11 @@ urls = ["http://localhost/server-status?auto"] # mds_procfiles = ["/proc/fs/lustre/mdt/*/md_stats"] # Read metrics about memory usage -[mem] +[[plugins.mem]] # no configuration # Read metrics from one or many memcached servers -[memcached] +[[plugins.memcached]] # An array of address to gather stats about. Specify an ip on hostname # with optional port. ie localhost, 10.0.0.1:11211, etc. # @@ -212,7 +214,7 @@ urls = ["http://localhost/server-status?auto"] servers = ["localhost"] # Read metrics from one or many MongoDB servers -[mongodb] +[[plugins.mongodb]] # An array of URI to gather stats about. Specify an ip or hostname # with optional port add password. ie mongodb://user:auth_key@10.10.3.30:27017, # mongodb://10.10.3.33:18832, 10.0.0.1:10000, etc. @@ -221,7 +223,7 @@ urls = ["http://localhost/server-status?auto"] servers = ["127.0.0.1:27017"] # Read metrics from one or many mysql servers -[mysql] +[[plugins.mysql]] # specify servers via a url matching: # [username[:password]@][protocol[(address)]]/[?tls=[true|false|skip-verify]] # e.g. @@ -232,7 +234,7 @@ urls = ["http://localhost/server-status?auto"] servers = ["localhost"] # Read metrics about network interface usage -[net] +[[plugins.net]] # By default, telegraf gathers stats from any up interface (excluding loopback) # Setting interfaces will tell it to gather these explicit interfaces, # regardless of status. @@ -240,12 +242,12 @@ urls = ["http://localhost/server-status?auto"] # interfaces = ["eth0", ... ] # Read Nginx's basic status information (ngx_http_stub_status_module) -[nginx] +[[plugins.nginx]] # An array of Nginx stub_status URI to gather stats. urls = ["http://localhost/status"] # Ping given url(s) and return statistics -[ping] +[[plugins.ping]] # urls to ping urls = ["www.google.com"] # required # number of pings to send (ping -c ) @@ -258,7 +260,7 @@ urls = ["http://localhost/server-status?auto"] interface = "" # Read metrics from one or many postgresql servers -[postgresql] +[[plugins.postgresql]] # specify servers via an array of tables [[postgresql.servers]] @@ -288,12 +290,12 @@ urls = ["http://localhost/server-status?auto"] # address = "influx@remoteserver" # Read metrics from one or many prometheus clients -[prometheus] +[[plugins.prometheus]] # An array of urls to scrape metrics from. urls = ["http://localhost:9100/metrics"] # Read metrics from one or many RabbitMQ servers via the management API -[rabbitmq] +[[plugins.rabbitmq]] # Specify servers via an array of tables [[rabbitmq.servers]] # name = "rmq-server-1" # optional tag @@ -306,7 +308,7 @@ urls = ["http://localhost/server-status?auto"] # nodes = ["rabbit@node1", "rabbit@node2"] # Read metrics from one or many redis servers -[redis] +[[plugins.redis]] # An array of URI to gather stats about. Specify an ip or hostname # with optional port add password. ie redis://localhost, redis://10.10.3.33:18832, # 10.0.0.1:10000, etc. @@ -315,7 +317,7 @@ urls = ["http://localhost/server-status?auto"] servers = ["localhost"] # Read metrics from one or many RethinkDB servers -[rethinkdb] +[[plugins.rethinkdb]] # An array of URI to gather stats about. Specify an ip or hostname # with optional port add password. ie rethinkdb://user:auth_key@10.10.3.30:28105, # rethinkdb://10.10.3.33:18832, 10.0.0.1:10000, etc. @@ -324,9 +326,9 @@ urls = ["http://localhost/server-status?auto"] servers = ["127.0.0.1:28015"] # Read metrics about swap memory usage -[swap] +[[plugins.swap]] # no configuration # Read metrics about system load & uptime -[system] +[[plugins.system]] # no configuration diff --git a/plugins/exec/exec.go b/plugins/exec/exec.go index 1fdcbfbf6..d4a42b6c4 100644 --- a/plugins/exec/exec.go +++ b/plugins/exec/exec.go @@ -16,7 +16,7 @@ import ( const sampleConfig = ` # specify commands via an array of tables - [[exec.commands]] + [[plugins.exec.commands]] # the command to run command = "/usr/bin/mycollector --foo=bar" diff --git a/plugins/httpjson/httpjson.go b/plugins/httpjson/httpjson.go index aa8a0d443..f1d2ef927 100644 --- a/plugins/httpjson/httpjson.go +++ b/plugins/httpjson/httpjson.go @@ -48,7 +48,7 @@ func (c RealHTTPClient) MakeRequest(req *http.Request) (*http.Response, error) { var sampleConfig = ` # Specify services via an array of tables - [[httpjson.services]] + [[plugins.httpjson.services]] # a name for the service being polled name = "webserver_stats" @@ -69,7 +69,7 @@ var sampleConfig = ` # ] # HTTP parameters (all values must be strings) - [httpjson.services.parameters] + [plugins.httpjson.services.parameters] event_type = "cpu_spike" threshold = "0.75" ` diff --git a/plugins/jolokia/jolokia.go b/plugins/jolokia/jolokia.go index 0527d9e70..041534b87 100644 --- a/plugins/jolokia/jolokia.go +++ b/plugins/jolokia/jolokia.go @@ -55,7 +55,7 @@ func (j *Jolokia) SampleConfig() string { group = "as" # List of servers exposing jolokia read service - [[jolokia.servers]] + [[plugins.jolokia.servers]] name = "stable" host = "192.168.103.2" port = "8180" @@ -63,20 +63,20 @@ func (j *Jolokia) SampleConfig() string { # List of metrics collected on above servers # Each metric consists in a name, a jmx path and either a pass or drop slice attributes # This collect all heap memory usage metrics - [[jolokia.metrics]] + [[plugins.jolokia.metrics]] name = "heap_memory_usage" jmx = "/java.lang:type=Memory/HeapMemoryUsage" # This drops the 'committed' value from Eden space measurement - [[jolokia.metrics]] + [[plugins.jolokia.metrics]] name = "memory_eden" jmx = "/java.lang:type=MemoryPool,name=PS Eden Space/Usage" drop = [ "committed" ] # This passes only DaemonThreadCount and ThreadCount - [[jolokia.metrics]] + [[plugins.jolokia.metrics]] name = "heap_threads" jmx = "/java.lang:type=Threading" pass = [ diff --git a/plugins/postgresql/postgresql.go b/plugins/postgresql/postgresql.go index 2a550b5a3..a31a9b4d2 100644 --- a/plugins/postgresql/postgresql.go +++ b/plugins/postgresql/postgresql.go @@ -25,7 +25,7 @@ var ignoredColumns = map[string]bool{"datid": true, "datname": true, "stats_rese var sampleConfig = ` # specify servers via an array of tables - [[postgresql.servers]] + [[plugins.postgresql.servers]] # specify address via a url matching: # postgres://[pqgotest[:password]]@localhost[/dbname]?sslmode=[disable|verify-ca|verify-full] @@ -49,7 +49,7 @@ var sampleConfig = ` # databases = ["app_production", "blah_testing"] - # [[postgresql.servers]] + # [[plugins.postgresql.servers]] # address = "influx@remoteserver" ` diff --git a/plugins/procstat/procstat.go b/plugins/procstat/procstat.go index e9191bbc2..1370a0003 100644 --- a/plugins/procstat/procstat.go +++ b/plugins/procstat/procstat.go @@ -30,7 +30,7 @@ func NewProcstat() *Procstat { } var sampleConfig = ` - [[procstat.specifications]] + [[plugins.procstat.specifications]] prefix = "" # optional string to prefix measurements # Must specify one of: pid_file, exe, or pattern # PID file to monitor process diff --git a/plugins/rabbitmq/rabbitmq.go b/plugins/rabbitmq/rabbitmq.go index 8fe5437d3..27580a13a 100644 --- a/plugins/rabbitmq/rabbitmq.go +++ b/plugins/rabbitmq/rabbitmq.go @@ -100,7 +100,7 @@ var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues} var sampleConfig = ` # Specify servers via an array of tables - [[rabbitmq.servers]] + [[plugins.rabbitmq.servers]] # name = "rmq-server-1" # optional tag # url = "http://localhost:15672" # username = "guest" diff --git a/plugins/twemproxy/twemproxy.go b/plugins/twemproxy/twemproxy.go index 520347280..0b1f6139e 100644 --- a/plugins/twemproxy/twemproxy.go +++ b/plugins/twemproxy/twemproxy.go @@ -22,7 +22,7 @@ type TwemproxyInstance struct { } var sampleConfig = ` - [[twemproxy.instances]] + [[plugins.twemproxy.instances]] # Twemproxy stats address and port (no scheme) addr = "localhost:22222" # Monitor pool name diff --git a/testdata/influx.toml b/testdata/influx.toml deleted file mode 100644 index 10684b159..000000000 --- a/testdata/influx.toml +++ /dev/null @@ -1,17 +0,0 @@ -[agent] -interval = "5s" -http = ":11213" -debug = true - -[outputs] -[outputs.influxdb] -url = "http://localhost:8086" -username = "root" -password = "root" -database = "telegraf" - -[tags] -dc = "us-phx-1" - -[redis] -address = ":6379" diff --git a/testdata/subconfig/procstat.conf b/testdata/subconfig/procstat.conf deleted file mode 100644 index 94cfcb694..000000000 --- a/testdata/subconfig/procstat.conf +++ /dev/null @@ -1,5 +0,0 @@ -[procstat] - [[procstat.specifications]] - pid_file = "/var/run/grafana-server.pid" - [[procstat.specifications]] - pid_file = "/var/run/influxdb/influxd.pid"