From 8aa7e355f6e0e944bffa3f8be194ed3e240dd1d2 Mon Sep 17 00:00:00 2001 From: Evan Phoenix Date: Tue, 19 May 2015 22:19:32 -0700 Subject: [PATCH] Add pass, drop, and interval to the plugin options --- accumulator.go | 8 ++++ agent.go | 108 ++++++++++++++++++++++++++++++++++++++++++--- cmd/tivan/tivan.go | 3 +- config.go | 104 ++++++++++++++++++++++++++++++++++++++++--- 4 files changed, 211 insertions(+), 12 deletions(-) diff --git a/accumulator.go b/accumulator.go index 4428ca8bf..ebe227e46 100644 --- a/accumulator.go +++ b/accumulator.go @@ -14,11 +14,19 @@ type BatchPoints struct { Debug bool Prefix string + + Config *ConfiguredPlugin } func (bp *BatchPoints) Add(name string, val interface{}, tags map[string]string) { name = bp.Prefix + name + if bp.Config != nil { + if !bp.Config.ShouldPass(name) { + return + } + } + if bp.Debug { var tg []string diff --git a/agent.go b/agent.go index 5a87f39e4..dd80adf77 100644 --- a/agent.go +++ b/agent.go @@ -6,6 +6,7 @@ import ( "net/url" "os" "sort" + "sync" "time" "github.com/influxdb/influxdb/client" @@ -15,12 +16,12 @@ import ( type runningPlugin struct { name string plugin plugins.Plugin + config *ConfiguredPlugin } type Agent struct { Interval Duration Debug bool - HTTP string Hostname string Config *Config @@ -31,9 +32,9 @@ type Agent struct { } func NewAgent(config *Config) (*Agent, error) { - agent := &Agent{Config: config} + agent := &Agent{Config: config, Interval: Duration{10 * time.Second}} - err := config.Apply("agent", agent) + err := config.ApplyAgent(agent) if err != nil { return nil, err } @@ -91,12 +92,12 @@ func (a *Agent) LoadPlugins() ([]string, error) { plugin := creator() - err := a.Config.Apply(name, plugin) + config, err := a.Config.ApplyPlugin(name, plugin) if err != nil { return nil, err } - a.plugins = append(a.plugins, &runningPlugin{name, plugin}) + a.plugins = append(a.plugins, &runningPlugin{name, plugin, config}) names = append(names, name) } @@ -105,6 +106,49 @@ func (a *Agent) LoadPlugins() ([]string, error) { return names, nil } +func (a *Agent) crankParallel() error { + points := make(chan *BatchPoints, len(a.plugins)) + + var wg sync.WaitGroup + + for _, plugin := range a.plugins { + if plugin.config.Interval != 0 { + continue + } + + wg.Add(1) + go func(plugin *runningPlugin) { + defer wg.Done() + + var acc BatchPoints + acc.Debug = a.Debug + acc.Prefix = plugin.name + "_" + acc.Config = plugin.config + + plugin.plugin.Gather(&acc) + + points <- &acc + }(plugin) + } + + wg.Wait() + + close(points) + + var acc BatchPoints + acc.Tags = a.Config.Tags + acc.Time = time.Now() + acc.Database = a.Config.Database + + for sub := range points { + acc.Points = append(acc.Points, sub.Points...) + } + + return nil + // _, err := a.conn.Write(acc.BatchPoints) + // return err +} + func (a *Agent) crank() error { var acc BatchPoints @@ -112,6 +156,7 @@ func (a *Agent) crank() error { for _, plugin := range a.plugins { acc.Prefix = plugin.name + "_" + acc.Config = plugin.config err := plugin.plugin.Gather(&acc) if err != nil { return err @@ -126,6 +171,36 @@ func (a *Agent) crank() error { return err } +func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) error { + ticker := time.NewTicker(plugin.config.Interval) + + for { + var acc BatchPoints + + acc.Debug = a.Debug + + acc.Prefix = plugin.name + "_" + acc.Config = plugin.config + err := plugin.plugin.Gather(&acc) + if err != nil { + return err + } + + acc.Tags = a.Config.Tags + acc.Time = time.Now() + acc.Database = a.Config.Database + + a.conn.Write(acc.BatchPoints) + + select { + case <-shutdown: + return nil + case <-ticker.C: + continue + } + } +} + func (a *Agent) TestAllPlugins() error { var names []string @@ -162,6 +237,13 @@ func (a *Agent) Test() error { for _, plugin := range a.plugins { acc.Prefix = plugin.name + "_" + acc.Config = plugin.config + + fmt.Printf("* Plugin: %s\n", plugin.name) + if plugin.config.Interval != 0 { + fmt.Printf("* Internal: %s\n", plugin.config.Interval) + } + err := plugin.plugin.Gather(&acc) if err != nil { return err @@ -179,10 +261,24 @@ func (a *Agent) Run(shutdown chan struct{}) error { } } + var wg sync.WaitGroup + + for _, plugin := range a.plugins { + if plugin.config.Interval != 0 { + wg.Add(1) + go func(plugin *runningPlugin) { + defer wg.Done() + a.crankSeparate(shutdown, plugin) + }(plugin) + } + } + + defer wg.Wait() + ticker := time.NewTicker(a.Interval.Duration) for { - err := a.crank() + err := a.crankParallel() if err != nil { log.Printf("Error in plugins: %s", err) } diff --git a/cmd/tivan/tivan.go b/cmd/tivan/tivan.go index 552df22b1..d77021bad 100644 --- a/cmd/tivan/tivan.go +++ b/cmd/tivan/tivan.go @@ -95,7 +95,8 @@ func main() { log.Printf("Loaded plugins: %s", strings.Join(plugins, " ")) if ag.Debug { log.Printf("Debug: enabled") - log.Printf("Agent Config: %#v", ag) + log.Printf("Agent Config: Interval:%s, Debug:%#v, Hostname:%#v\n", + ag.Interval, ag.Debug, ag.Hostname) } if config.URL != "" { diff --git a/config.go b/config.go index 3399135bc..25c3ef4b1 100644 --- a/config.go +++ b/config.go @@ -36,6 +36,7 @@ type Config struct { UserAgent string Tags map[string]string + agent *ast.Table plugins map[string]*ast.Table } @@ -43,14 +44,98 @@ func (c *Config) Plugins() map[string]*ast.Table { return c.plugins } -func (c *Config) Apply(name string, v interface{}) error { - if tbl, ok := c.plugins[name]; ok { - return toml.UnmarshalTable(tbl, v) +type ConfiguredPlugin struct { + Name string + + Drop []string + Pass []string + + Interval time.Duration +} + +func (cp *ConfiguredPlugin) ShouldPass(name string) bool { + if cp.Pass != nil { + for _, pat := range cp.Pass { + if strings.HasPrefix(name, pat) { + return true + } + } + + return false + } + + if cp.Drop != nil { + for _, pat := range cp.Drop { + if strings.HasPrefix(name, pat) { + return false + } + } + + return true + } + + return true +} + +func (c *Config) ApplyAgent(v interface{}) error { + if c.agent != nil { + return toml.UnmarshalTable(c.agent, v) } return nil } +func (c *Config) ApplyPlugin(name string, v interface{}) (*ConfiguredPlugin, error) { + cp := &ConfiguredPlugin{Name: name} + + if tbl, ok := c.plugins[name]; ok { + + if node, ok := tbl.Fields["pass"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if ary, ok := kv.Value.(*ast.Array); ok { + for _, elem := range ary.Value { + if str, ok := elem.(*ast.String); ok { + cp.Pass = append(cp.Pass, str.Value) + } + } + } + } + } + + if node, ok := tbl.Fields["drop"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if ary, ok := kv.Value.(*ast.Array); ok { + for _, elem := range ary.Value { + if str, ok := elem.(*ast.String); ok { + cp.Drop = append(cp.Drop, str.Value) + } + } + } + } + } + + if node, ok := tbl.Fields["interval"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + dur, err := time.ParseDuration(str.Value) + if err != nil { + return nil, err + } + + cp.Interval = dur + } + } + } + + delete(tbl.Fields, "drop") + delete(tbl.Fields, "pass") + delete(tbl.Fields, "interval") + return cp, toml.UnmarshalTable(tbl, v) + } + + return cp, nil +} + func (c *Config) PluginsDeclared() []string { var plugins []string @@ -90,12 +175,15 @@ func LoadConfig(path string) (*Config, error) { return nil, ErrInvalidConfig } - if name == "influxdb" { + switch name { + case "influxdb": err := toml.UnmarshalTable(subtbl, c) if err != nil { return nil, err } - } else { + case "agent": + c.agent = subtbl + default: c.plugins[name] = subtbl } } @@ -154,6 +242,12 @@ var header = `# Tivan configuration # [influxdb.tags] # dc = "us-east-1" +# Configuration for tivan itself +# [agent] +# interval = "10s" +# debug = false +# hostname = "prod3241" + # PLUGINS `