Change plugin config to be specified as a list

This makes plugin configuration more similar to output configuration,
where we can specify multiple plugins as a list. The idea behind this is
that the Telegraf agent can handle the multi-processing and error
handling better than each plugin handling that internally. This will
also allow for having different plugin configurations for different
instances of the same type of plugin.
This commit is contained in:
Cameron Sparr 2015-11-19 19:08:02 -07:00
parent 13ccf420d7
commit 78f2ea89f8
3 changed files with 57 additions and 25 deletions

View File

@ -24,9 +24,10 @@ type runningOutput struct {
} }
type runningPlugin struct { type runningPlugin struct {
name string name string
plugin plugins.Plugin filtername string
config *ConfiguredPlugin plugin plugins.Plugin
config *ConfiguredPlugin
} }
// Agent runs telegraf and collects data based on the given config // Agent runs telegraf and collects data based on the given config
@ -176,9 +177,11 @@ func (a *Agent) LoadPlugins(filters []string, config *Config) ([]string, error)
var names []string var names []string
for name, plugin := range config.PluginsDeclared() { for name, plugin := range config.PluginsDeclared() {
if sliceContains(name, filters) || len(filters) == 0 { // Trim the ID off the output name for filtering
filtername := strings.TrimRight(name, "-0123456789")
if sliceContains(filtername, filters) || len(filters) == 0 {
config := config.GetPluginConfig(name) config := config.GetPluginConfig(name)
a.plugins = append(a.plugins, &runningPlugin{name, plugin, config}) a.plugins = append(a.plugins, &runningPlugin{name, filtername, plugin, config})
names = append(names, name) names = append(names, name)
} }
} }
@ -207,7 +210,7 @@ func (a *Agent) gatherParallel(pointChan chan *client.Point) error {
acc := NewAccumulator(plugin.config, pointChan) acc := NewAccumulator(plugin.config, pointChan)
acc.SetDebug(a.Debug) acc.SetDebug(a.Debug)
acc.SetPrefix(plugin.name + "_") acc.SetPrefix(plugin.filtername + "_")
acc.SetDefaultTags(a.Tags) acc.SetDefaultTags(a.Tags)
if err := plugin.plugin.Gather(acc); err != nil { if err := plugin.plugin.Gather(acc); err != nil {
@ -240,7 +243,7 @@ func (a *Agent) gatherSeparate(
acc := NewAccumulator(plugin.config, pointChan) acc := NewAccumulator(plugin.config, pointChan)
acc.SetDebug(a.Debug) acc.SetDebug(a.Debug)
acc.SetPrefix(plugin.name + "_") acc.SetPrefix(plugin.filtername + "_")
acc.SetDefaultTags(a.Tags) acc.SetDefaultTags(a.Tags)
if err := plugin.plugin.Gather(acc); err != nil { if err := plugin.plugin.Gather(acc); err != nil {
@ -286,7 +289,7 @@ func (a *Agent) Test() error {
for _, plugin := range a.plugins { for _, plugin := range a.plugins {
acc := NewAccumulator(plugin.config, pointChan) acc := NewAccumulator(plugin.config, pointChan)
acc.SetDebug(true) acc.SetDebug(true)
acc.SetPrefix(plugin.name + "_") acc.SetPrefix(plugin.filtername + "_")
fmt.Printf("* Plugin: %s, Collection 1\n", plugin.name) fmt.Printf("* Plugin: %s, Collection 1\n", plugin.name)
if plugin.config.Interval != 0 { if plugin.config.Interval != 0 {
@ -299,7 +302,7 @@ func (a *Agent) Test() error {
// Special instructions for some plugins. cpu, for example, needs to be // Special instructions for some plugins. cpu, for example, needs to be
// run twice in order to return cpu usage percentages. // run twice in order to return cpu usage percentages.
switch plugin.name { switch plugin.filtername {
case "cpu", "mongodb": case "cpu", "mongodb":
time.Sleep(500 * time.Millisecond) time.Sleep(500 * time.Millisecond)
fmt.Printf("* Plugin: %s, Collection 2\n", plugin.name) fmt.Printf("* Plugin: %s, Collection 2\n", plugin.name)

View File

@ -236,6 +236,8 @@ var pluginHeader = `
############################################################################### ###############################################################################
# PLUGINS # # PLUGINS #
############################################################################### ###############################################################################
[plugins]
` `
var servicePluginHeader = ` var servicePluginHeader = `
@ -311,7 +313,7 @@ type printer interface {
} }
func printConfig(name string, p printer) { func printConfig(name string, p printer) {
fmt.Printf("\n# %s\n[%s]", p.Description(), name) fmt.Printf("\n# %s\n[[plugins.%s]]", p.Description(), name)
config := p.SampleConfig() config := p.SampleConfig()
if config == "" { if config == "" {
fmt.Printf("\n # no configuration\n") fmt.Printf("\n # no configuration\n")
@ -540,8 +542,34 @@ func LoadConfig(path string) (*Config, error) {
outputName) outputName)
} }
} }
case "plugins":
for pluginName, pluginVal := range subTable.Fields {
switch pluginSubTable := pluginVal.(type) {
case *ast.Table:
err = c.parsePlugin(pluginName, pluginSubTable, 0)
if err != nil {
log.Printf("Could not parse config for plugin: %s\n",
pluginName)
return nil, err
}
case []*ast.Table:
for id, t := range pluginSubTable {
err = c.parsePlugin(pluginName, t, id)
if err != nil {
log.Printf("Could not parse config for plugin: %s\n",
pluginName)
return nil, err
}
}
default:
return nil, fmt.Errorf("Unsupported config format: %s",
pluginName)
}
}
// Assume it's a plugin for legacy config file support if no other
// identifiers are present
default: default:
err = c.parsePlugin(name, subTable) err = c.parsePlugin(name, subTable, 0)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -590,7 +618,7 @@ func (c *Config) parseOutput(name string, outputAst *ast.Table, id int) error {
} }
// Parse a plugin config, plus plugin meta-config, out of the given *ast.Table. // Parse a plugin config, plus plugin meta-config, out of the given *ast.Table.
func (c *Config) parsePlugin(name string, pluginAst *ast.Table) error { func (c *Config) parsePlugin(name string, pluginAst *ast.Table, id int) error {
creator, ok := plugins.Plugins[name] creator, ok := plugins.Plugins[name]
if !ok { if !ok {
return fmt.Errorf("Undefined but requested plugin: %s", name) return fmt.Errorf("Undefined but requested plugin: %s", name)
@ -682,13 +710,14 @@ func (c *Config) parsePlugin(name string, pluginAst *ast.Table) error {
delete(pluginAst.Fields, "interval") delete(pluginAst.Fields, "interval")
delete(pluginAst.Fields, "tagdrop") delete(pluginAst.Fields, "tagdrop")
delete(pluginAst.Fields, "tagpass") delete(pluginAst.Fields, "tagpass")
c.pluginFieldsSet[name] = extractFieldNames(pluginAst) nameID := fmt.Sprintf("%s-%d", name, id)
c.pluginConfigurationFieldsSet[name] = cpFields c.pluginFieldsSet[nameID] = extractFieldNames(pluginAst)
c.pluginConfigurationFieldsSet[nameID] = cpFields
err := toml.UnmarshalTable(pluginAst, plugin) err := toml.UnmarshalTable(pluginAst, plugin)
if err != nil { if err != nil {
return err return err
} }
c.plugins[name] = plugin c.plugins[nameID] = plugin
c.pluginConfigurations[name] = cp c.pluginConfigurations[nameID] = cp
return nil return nil
} }

View File

@ -206,7 +206,7 @@ func TestConfig_parsePlugin(t *testing.T) {
} }
subtbl := tbl.Fields["memcached"].(*ast.Table) subtbl := tbl.Fields["memcached"].(*ast.Table)
err = c.parsePlugin("memcached", subtbl) err = c.parsePlugin("memcached", subtbl, 0)
memcached := plugins.Plugins["memcached"]().(*memcached.Memcached) memcached := plugins.Plugins["memcached"]().(*memcached.Memcached)
memcached.Servers = []string{"localhost"} memcached.Servers = []string{"localhost"}
@ -230,9 +230,9 @@ func TestConfig_parsePlugin(t *testing.T) {
Interval: 5 * time.Second, Interval: 5 * time.Second,
} }
assert.Equal(t, memcached, c.plugins["memcached"], assert.Equal(t, memcached, c.plugins["memcached-0"],
"Testdata did not produce a correct memcached struct.") "Testdata did not produce a correct memcached struct.")
assert.Equal(t, mConfig, c.pluginConfigurations["memcached"], assert.Equal(t, mConfig, c.pluginConfigurations["memcached-0"],
"Testdata did not produce correct memcached metadata.") "Testdata did not produce correct memcached metadata.")
} }
@ -290,18 +290,18 @@ func TestConfig_LoadDirectory(t *testing.T) {
pConfig := &ConfiguredPlugin{Name: "procstat"} pConfig := &ConfiguredPlugin{Name: "procstat"}
assert.Equal(t, memcached, c.plugins["memcached"], assert.Equal(t, memcached, c.plugins["memcached-0"],
"Merged Testdata did not produce a correct memcached struct.") "Merged Testdata did not produce a correct memcached struct.")
assert.Equal(t, mConfig, c.pluginConfigurations["memcached"], assert.Equal(t, mConfig, c.pluginConfigurations["memcached-0"],
"Merged Testdata did not produce correct memcached metadata.") "Merged Testdata did not produce correct memcached metadata.")
assert.Equal(t, ex, c.plugins["exec"], assert.Equal(t, ex, c.plugins["exec-0"],
"Merged Testdata did not produce a correct exec struct.") "Merged Testdata did not produce a correct exec struct.")
assert.Equal(t, eConfig, c.pluginConfigurations["exec"], assert.Equal(t, eConfig, c.pluginConfigurations["exec-0"],
"Merged Testdata did not produce correct exec metadata.") "Merged Testdata did not produce correct exec metadata.")
assert.Equal(t, pstat, c.plugins["procstat"], assert.Equal(t, pstat, c.plugins["procstat-0"],
"Merged Testdata did not produce a correct procstat struct.") "Merged Testdata did not produce a correct procstat struct.")
assert.Equal(t, pConfig, c.pluginConfigurations["procstat"], assert.Equal(t, pConfig, c.pluginConfigurations["procstat-0"],
"Merged Testdata did not produce correct procstat metadata.") "Merged Testdata did not produce correct procstat metadata.")
} }