diff --git a/agent.go b/agent.go index 7ac924f63..6b31e397d 100644 --- a/agent.go +++ b/agent.go @@ -3,16 +3,20 @@ package telegraf import ( "fmt" "log" - "net/url" "os" "sort" "sync" "time" - "github.com/influxdb/influxdb/client" + "github.com/influxdb/telegraf/outputs" "github.com/influxdb/telegraf/plugins" ) +type runningOutput struct { + name string + output outputs.Output +} + type runningPlugin struct { name string plugin plugins.Plugin @@ -26,9 +30,8 @@ type Agent struct { Config *Config + outputs []*runningOutput plugins []*runningPlugin - - conn *client.Client } func NewAgent(config *Config) (*Agent, error) { @@ -57,30 +60,41 @@ func NewAgent(config *Config) (*Agent, error) { return agent, nil } -func (agent *Agent) Connect() error { - config := agent.Config - - u, err := url.Parse(config.URL) - if err != nil { - return err +func (a *Agent) Connect() error { + for _, o := range a.outputs { + err := o.output.Connect() + if err != nil { + return err + } } - - c, err := client.NewClient(client.Config{ - URL: *u, - Username: config.Username, - Password: config.Password, - UserAgent: config.UserAgent, - }) - - if err != nil { - return err - } - - agent.conn = c - return nil } +func (a *Agent) LoadOutputs() ([]string, error) { + var names []string + + for _, name := range a.Config.OutputsDeclared() { + creator, ok := outputs.Outputs[name] + if !ok { + return nil, fmt.Errorf("Undefined but requested output: %s", name) + } + + output := creator() + + err := a.Config.ApplyOutput(name, output) + if err != nil { + return nil, err + } + + a.outputs = append(a.outputs, &runningOutput{name, output}) + names = append(names, name) + } + + sort.Strings(names) + + return names, nil +} + func (a *Agent) LoadPlugins() ([]string, error) { var names []string @@ -135,17 +149,15 @@ func (a *Agent) crankParallel() error { close(points) - var acc BatchPoints - acc.Tags = a.Config.Tags - acc.Time = time.Now() - acc.Database = a.Config.Database + var bp BatchPoints + bp.Tags = a.Config.Tags + bp.Time = time.Now() for sub := range points { - acc.Points = append(acc.Points, sub.Points...) + bp.Points = append(bp.Points, sub.Points...) } - _, err := a.conn.Write(acc.BatchPoints) - return err + return a.flush(bp) } func (a *Agent) crank() error { @@ -164,10 +176,8 @@ func (a *Agent) crank() error { acc.Tags = a.Config.Tags acc.Time = time.Now() - acc.Database = a.Config.Database - _, err := a.conn.Write(acc.BatchPoints) - return err + return a.flush(acc) } func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) error { @@ -187,9 +197,11 @@ func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) err acc.Tags = a.Config.Tags acc.Time = time.Now() - acc.Database = a.Config.Database - a.conn.Write(acc.BatchPoints) + err = a.flush(acc) + if err != nil { + return err + } select { case <-shutdown: @@ -200,6 +212,22 @@ func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) err } } +func (a *Agent) flush(bp BatchPoints) error { + var wg sync.WaitGroup + var outerr error + for _, o := range a.outputs { + wg.Add(1) + go func(output *runningOutput) { + defer wg.Done() + outerr = o.output.Write(bp.BatchPoints) + }(o) + } + + wg.Wait() + + return outerr +} + func (a *Agent) TestAllPlugins() error { var names []string @@ -253,13 +281,6 @@ func (a *Agent) Test() error { } func (a *Agent) Run(shutdown chan struct{}) error { - if a.conn == nil { - err := a.Connect() - if err != nil { - return err - } - } - var wg sync.WaitGroup for _, plugin := range a.plugins { diff --git a/cmd/telegraf/telegraf.go b/cmd/telegraf/telegraf.go index f36693ebc..ea78ef6de 100644 --- a/cmd/telegraf/telegraf.go +++ b/cmd/telegraf/telegraf.go @@ -9,6 +9,7 @@ import ( "strings" "github.com/influxdb/telegraf" + _ "github.com/influxdb/telegraf/outputs/all" _ "github.com/influxdb/telegraf/plugins/all" ) @@ -58,6 +59,11 @@ func main() { ag.Debug = true } + outputs, err := ag.LoadOutputs() + if err != nil { + log.Fatal(err) + } + plugins, err := ag.LoadPlugins() if err != nil { log.Fatal(err) @@ -94,6 +100,7 @@ func main() { }() log.Print("InfluxDB Agent running") + log.Printf("Loaded outputs: %s", strings.Join(outputs, " ")) log.Printf("Loaded plugins: %s", strings.Join(plugins, " ")) if ag.Debug { log.Printf("Debug: enabled") @@ -101,8 +108,7 @@ func main() { ag.Interval, ag.Debug, ag.Hostname) } - if config.URL != "" { - log.Printf("Sending metrics to: %s", config.URL) + if len(outputs) > 0 { log.Printf("Tags enabled: %v", config.ListTags()) } diff --git a/config.go b/config.go index 013b8d281..c82fb0bb0 100644 --- a/config.go +++ b/config.go @@ -29,21 +29,21 @@ func (d *Duration) UnmarshalTOML(b []byte) error { } type Config struct { - URL string - Username string - Password string - Database string - UserAgent string - Tags map[string]string + Tags map[string]string agent *ast.Table plugins map[string]*ast.Table + outputs map[string]*ast.Table } func (c *Config) Plugins() map[string]*ast.Table { return c.plugins } +func (c *Config) Outputs() map[string]*ast.Table { + return c.outputs +} + type ConfiguredPlugin struct { Name string @@ -77,6 +77,14 @@ func (cp *ConfiguredPlugin) ShouldPass(measurement string) bool { return true } +func (c *Config) ApplyOutput(name string, v interface{}) error { + if c.outputs[name] != nil { + return toml.UnmarshalTable(c.outputs[name], v) + } + + return nil +} + func (c *Config) ApplyAgent(v interface{}) error { if c.agent != nil { return toml.UnmarshalTable(c.agent, v) @@ -137,15 +145,23 @@ func (c *Config) ApplyPlugin(name string, v interface{}) (*ConfiguredPlugin, err } func (c *Config) PluginsDeclared() []string { - var plugins []string + return declared(c.plugins) +} - for name, _ := range c.plugins { - plugins = append(plugins, name) +func (c *Config) OutputsDeclared() []string { + return declared(c.outputs) +} + +func declared(endpoints map[string]*ast.Table) []string { + var names []string + + for name, _ := range endpoints { + names = append(names, name) } - sort.Strings(plugins) + sort.Strings(names) - return plugins + return names } func DefaultConfig() *Config { @@ -167,6 +183,7 @@ func LoadConfig(path string) (*Config, error) { c := &Config{ plugins: make(map[string]*ast.Table), + outputs: make(map[string]*ast.Table), } for name, val := range tbl.Fields { @@ -176,13 +193,16 @@ func LoadConfig(path string) (*Config, error) { } switch name { - case "influxdb": - err := toml.UnmarshalTable(subtbl, c) - if err != nil { - return nil, err - } case "agent": c.agent = subtbl + case "outputs": + for outputName, outputVal := range subtbl.Fields { + outputSubtbl, ok := outputVal.(*ast.Table) + if !ok { + return nil, ErrInvalidConfig + } + c.outputs[outputName] = outputSubtbl + } default: c.plugins[name] = subtbl } diff --git a/outputs/all/all.go b/outputs/all/all.go new file mode 100644 index 000000000..2a8018674 --- /dev/null +++ b/outputs/all/all.go @@ -0,0 +1,5 @@ +package all + +import ( + _ "github.com/influxdb/telegraf/outputs/influxdb" +) diff --git a/outputs/influxdb/influxdb.go b/outputs/influxdb/influxdb.go new file mode 100644 index 000000000..4a672ba5e --- /dev/null +++ b/outputs/influxdb/influxdb.go @@ -0,0 +1,53 @@ +package influxdb + +import ( + "net/url" + + "github.com/influxdb/influxdb/client" + "github.com/influxdb/telegraf/outputs" +) + +type InfluxDB struct { + URL string + Username string + Password string + Database string + UserAgent string + + conn *client.Client +} + +func (i *InfluxDB) Connect() error { + u, err := url.Parse(i.URL) + if err != nil { + return err + } + + c, err := client.NewClient(client.Config{ + URL: *u, + Username: i.Username, + Password: i.Password, + UserAgent: i.UserAgent, + }) + + if err != nil { + return err + } + + i.conn = c + return nil +} + +func (i *InfluxDB) Write(bp client.BatchPoints) error { + bp.Database = i.Database + if _, err := i.conn.Write(bp); err != nil { + return err + } + return nil +} + +func init() { + outputs.Add("influxdb", func() outputs.Output { + return &InfluxDB{} + }) +} diff --git a/outputs/registry.go b/outputs/registry.go new file mode 100644 index 000000000..ccc40f9b2 --- /dev/null +++ b/outputs/registry.go @@ -0,0 +1,18 @@ +package outputs + +import ( + "github.com/influxdb/influxdb/client" +) + +type Output interface { + Connect() error + Write(client.BatchPoints) error +} + +type Creator func() Output + +var Outputs = map[string]Creator{} + +func Add(name string, creator Creator) { + Outputs[name] = creator +}