Add pass, drop, and interval to the plugin options

This commit is contained in:
Evan Phoenix 2015-05-19 22:19:32 -07:00
parent 203d3695b4
commit 8aa7e355f6
4 changed files with 211 additions and 12 deletions

View File

@ -14,11 +14,19 @@ type BatchPoints struct {
Debug bool
Prefix string
Config *ConfiguredPlugin
}
func (bp *BatchPoints) Add(name string, val interface{}, tags map[string]string) {
name = bp.Prefix + name
if bp.Config != nil {
if !bp.Config.ShouldPass(name) {
return
}
}
if bp.Debug {
var tg []string

108
agent.go
View File

@ -6,6 +6,7 @@ import (
"net/url"
"os"
"sort"
"sync"
"time"
"github.com/influxdb/influxdb/client"
@ -15,12 +16,12 @@ import (
type runningPlugin struct {
name string
plugin plugins.Plugin
config *ConfiguredPlugin
}
type Agent struct {
Interval Duration
Debug bool
HTTP string
Hostname string
Config *Config
@ -31,9 +32,9 @@ type Agent struct {
}
func NewAgent(config *Config) (*Agent, error) {
agent := &Agent{Config: config}
agent := &Agent{Config: config, Interval: Duration{10 * time.Second}}
err := config.Apply("agent", agent)
err := config.ApplyAgent(agent)
if err != nil {
return nil, err
}
@ -91,12 +92,12 @@ func (a *Agent) LoadPlugins() ([]string, error) {
plugin := creator()
err := a.Config.Apply(name, plugin)
config, err := a.Config.ApplyPlugin(name, plugin)
if err != nil {
return nil, err
}
a.plugins = append(a.plugins, &runningPlugin{name, plugin})
a.plugins = append(a.plugins, &runningPlugin{name, plugin, config})
names = append(names, name)
}
@ -105,6 +106,49 @@ func (a *Agent) LoadPlugins() ([]string, error) {
return names, nil
}
func (a *Agent) crankParallel() error {
points := make(chan *BatchPoints, len(a.plugins))
var wg sync.WaitGroup
for _, plugin := range a.plugins {
if plugin.config.Interval != 0 {
continue
}
wg.Add(1)
go func(plugin *runningPlugin) {
defer wg.Done()
var acc BatchPoints
acc.Debug = a.Debug
acc.Prefix = plugin.name + "_"
acc.Config = plugin.config
plugin.plugin.Gather(&acc)
points <- &acc
}(plugin)
}
wg.Wait()
close(points)
var acc BatchPoints
acc.Tags = a.Config.Tags
acc.Time = time.Now()
acc.Database = a.Config.Database
for sub := range points {
acc.Points = append(acc.Points, sub.Points...)
}
return nil
// _, err := a.conn.Write(acc.BatchPoints)
// return err
}
func (a *Agent) crank() error {
var acc BatchPoints
@ -112,6 +156,7 @@ func (a *Agent) crank() error {
for _, plugin := range a.plugins {
acc.Prefix = plugin.name + "_"
acc.Config = plugin.config
err := plugin.plugin.Gather(&acc)
if err != nil {
return err
@ -126,6 +171,36 @@ func (a *Agent) crank() error {
return err
}
func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) error {
ticker := time.NewTicker(plugin.config.Interval)
for {
var acc BatchPoints
acc.Debug = a.Debug
acc.Prefix = plugin.name + "_"
acc.Config = plugin.config
err := plugin.plugin.Gather(&acc)
if err != nil {
return err
}
acc.Tags = a.Config.Tags
acc.Time = time.Now()
acc.Database = a.Config.Database
a.conn.Write(acc.BatchPoints)
select {
case <-shutdown:
return nil
case <-ticker.C:
continue
}
}
}
func (a *Agent) TestAllPlugins() error {
var names []string
@ -162,6 +237,13 @@ func (a *Agent) Test() error {
for _, plugin := range a.plugins {
acc.Prefix = plugin.name + "_"
acc.Config = plugin.config
fmt.Printf("* Plugin: %s\n", plugin.name)
if plugin.config.Interval != 0 {
fmt.Printf("* Internal: %s\n", plugin.config.Interval)
}
err := plugin.plugin.Gather(&acc)
if err != nil {
return err
@ -179,10 +261,24 @@ func (a *Agent) Run(shutdown chan struct{}) error {
}
}
var wg sync.WaitGroup
for _, plugin := range a.plugins {
if plugin.config.Interval != 0 {
wg.Add(1)
go func(plugin *runningPlugin) {
defer wg.Done()
a.crankSeparate(shutdown, plugin)
}(plugin)
}
}
defer wg.Wait()
ticker := time.NewTicker(a.Interval.Duration)
for {
err := a.crank()
err := a.crankParallel()
if err != nil {
log.Printf("Error in plugins: %s", err)
}

View File

@ -95,7 +95,8 @@ func main() {
log.Printf("Loaded plugins: %s", strings.Join(plugins, " "))
if ag.Debug {
log.Printf("Debug: enabled")
log.Printf("Agent Config: %#v", ag)
log.Printf("Agent Config: Interval:%s, Debug:%#v, Hostname:%#v\n",
ag.Interval, ag.Debug, ag.Hostname)
}
if config.URL != "" {

104
config.go
View File

@ -36,6 +36,7 @@ type Config struct {
UserAgent string
Tags map[string]string
agent *ast.Table
plugins map[string]*ast.Table
}
@ -43,14 +44,98 @@ func (c *Config) Plugins() map[string]*ast.Table {
return c.plugins
}
func (c *Config) Apply(name string, v interface{}) error {
if tbl, ok := c.plugins[name]; ok {
return toml.UnmarshalTable(tbl, v)
type ConfiguredPlugin struct {
Name string
Drop []string
Pass []string
Interval time.Duration
}
func (cp *ConfiguredPlugin) ShouldPass(name string) bool {
if cp.Pass != nil {
for _, pat := range cp.Pass {
if strings.HasPrefix(name, pat) {
return true
}
}
return false
}
if cp.Drop != nil {
for _, pat := range cp.Drop {
if strings.HasPrefix(name, pat) {
return false
}
}
return true
}
return true
}
func (c *Config) ApplyAgent(v interface{}) error {
if c.agent != nil {
return toml.UnmarshalTable(c.agent, v)
}
return nil
}
func (c *Config) ApplyPlugin(name string, v interface{}) (*ConfiguredPlugin, error) {
cp := &ConfiguredPlugin{Name: name}
if tbl, ok := c.plugins[name]; ok {
if node, ok := tbl.Fields["pass"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
cp.Pass = append(cp.Pass, str.Value)
}
}
}
}
}
if node, ok := tbl.Fields["drop"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
cp.Drop = append(cp.Drop, str.Value)
}
}
}
}
}
if node, ok := tbl.Fields["interval"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
dur, err := time.ParseDuration(str.Value)
if err != nil {
return nil, err
}
cp.Interval = dur
}
}
}
delete(tbl.Fields, "drop")
delete(tbl.Fields, "pass")
delete(tbl.Fields, "interval")
return cp, toml.UnmarshalTable(tbl, v)
}
return cp, nil
}
func (c *Config) PluginsDeclared() []string {
var plugins []string
@ -90,12 +175,15 @@ func LoadConfig(path string) (*Config, error) {
return nil, ErrInvalidConfig
}
if name == "influxdb" {
switch name {
case "influxdb":
err := toml.UnmarshalTable(subtbl, c)
if err != nil {
return nil, err
}
} else {
case "agent":
c.agent = subtbl
default:
c.plugins[name] = subtbl
}
}
@ -154,6 +242,12 @@ var header = `# Tivan configuration
# [influxdb.tags]
# dc = "us-east-1"
# Configuration for tivan itself
# [agent]
# interval = "10s"
# debug = false
# hostname = "prod3241"
# PLUGINS
`