From b312e48d318a6abe43b33be6fdde07a8883467c8 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Tue, 11 Aug 2015 10:34:00 -0600 Subject: [PATCH] Revert "PR #59, implementation of multiple outputs" This reverts commit 48a075529adaa5b40d8a1c0cfcd3fece0a2ecbbf, reversing changes made to 924700f381c2575e6bc9e86e00dbc7d881d7000c. --- .gitignore | 1 - agent.go | 102 ++++++++++++++--------------------- cmd/telegraf/telegraf.go | 14 +++-- config.go | 79 ++++++++------------------- etc/config.sample.toml | 11 ++-- outputs/all/all.go | 5 -- outputs/influxdb/influxdb.go | 60 --------------------- outputs/registry.go | 18 ------- testdata/influx.toml | 5 +- 9 files changed, 75 insertions(+), 220 deletions(-) delete mode 100644 outputs/all/all.go delete mode 100644 outputs/influxdb/influxdb.go delete mode 100644 outputs/registry.go diff --git a/.gitignore b/.gitignore index a471ffe03..a127b89f7 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,3 @@ pkg/ tivan .vagrant -telegraf diff --git a/agent.go b/agent.go index 631524c9f..1d704323c 100644 --- a/agent.go +++ b/agent.go @@ -10,15 +10,10 @@ import ( "sync" "time" - "github.com/influxdb/telegraf/outputs" + "github.com/influxdb/influxdb/client" "github.com/influxdb/telegraf/plugins" ) -type runningOutput struct { - name string - output outputs.Output -} - type runningPlugin struct { name string plugin plugins.Plugin @@ -37,8 +32,9 @@ type Agent struct { Config *Config - outputs []*runningOutput plugins []*runningPlugin + + conn *client.Client } // NewAgent returns an Agent struct based off the given Config @@ -70,36 +66,25 @@ func NewAgent(config *Config) (*Agent, error) { // Connect connects to the agent's config URL func (a *Agent) Connect() error { - for _, o := range a.outputs { - err := o.output.Connect(a.Hostname) - if err != nil { - return err - } - } - return nil -} + config := a.Config -func (a *Agent) LoadOutputs() ([]string, error) { - var names []string - - for _, name := range a.Config.OutputsDeclared() { - creator, ok := outputs.Outputs[name] - if !ok { - return nil, fmt.Errorf("Undefined but requested output: %s", name) - } - - output := creator() - - err := a.Config.ApplyOutput(name, output) - if err != nil { - return nil, err - } - - a.outputs = append(a.outputs, &runningOutput{name, output}) - names = append(names, name) + u, err := url.Parse(config.URL) + if err != nil { + return err + } + + c, err := client.NewClient(client.Config{ + URL: *u, + Username: config.Username, + Password: config.Password, + UserAgent: config.UserAgent, + Timeout: config.Timeout.Duration, + }) + + if err != nil { + return err } -<<<<<<< HEAD _, err = c.Query(client.Query{ Command: fmt.Sprintf("CREATE DATABASE telegraf"), }) @@ -109,11 +94,8 @@ func (a *Agent) LoadOutputs() ([]string, error) { } a.conn = c -======= - sort.Strings(names) ->>>>>>> jipperinbham-outputs-phase1 - return names, nil + return nil } // LoadPlugins loads the agent's plugins @@ -171,14 +153,17 @@ func (a *Agent) crankParallel() error { close(points) - var bp BatchPoints - bp.Time = time.Now() + var acc BatchPoints + acc.Tags = a.Config.Tags + acc.Time = time.Now() + acc.Database = a.Config.Database for sub := range points { - bp.Points = append(bp.Points, sub.Points...) + acc.Points = append(acc.Points, sub.Points...) } - return a.flush(&bp) + _, err := a.conn.Write(acc.BatchPoints) + return err } func (a *Agent) crank() error { @@ -195,9 +180,12 @@ func (a *Agent) crank() error { } } + acc.Tags = a.Config.Tags acc.Time = time.Now() + acc.Database = a.Config.Database - return a.flush(&acc) + _, err := a.conn.Write(acc.BatchPoints) + return err } func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) error { @@ -219,10 +207,7 @@ func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) err acc.Time = time.Now() acc.Database = a.Config.Database - err = a.flush(&acc) - if err != nil { - return err - } + a.conn.Write(acc.BatchPoints) select { case <-shutdown: @@ -233,22 +218,6 @@ func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) err } } -func (a *Agent) flush(bp *BatchPoints) error { - var wg sync.WaitGroup - var outerr error - for _, o := range a.outputs { - wg.Add(1) - go func(ro *runningOutput) { - defer wg.Done() - outerr = ro.output.Write(bp.BatchPoints) - }(o) - } - - wg.Wait() - - return outerr -} - // TestAllPlugins verifies that we can 'Gather' from all plugins with the // default configuration func (a *Agent) TestAllPlugins() error { @@ -307,6 +276,13 @@ func (a *Agent) Test() error { // Run runs the agent daemon, gathering every Interval func (a *Agent) Run(shutdown chan struct{}) error { + if a.conn == nil { + err := a.Connect() + if err != nil { + return err + } + } + var wg sync.WaitGroup for _, plugin := range a.plugins { diff --git a/cmd/telegraf/telegraf.go b/cmd/telegraf/telegraf.go index b207712ae..365dd20d3 100644 --- a/cmd/telegraf/telegraf.go +++ b/cmd/telegraf/telegraf.go @@ -9,7 +9,6 @@ import ( "strings" "github.com/influxdb/telegraf" - _ "github.com/influxdb/telegraf/outputs/all" _ "github.com/influxdb/telegraf/plugins/all" ) @@ -62,11 +61,6 @@ func main() { ag.Debug = true } - outputs, err := ag.LoadOutputs() - if err != nil { - log.Fatal(err) - } - plugins, err := ag.LoadPlugins() if err != nil { log.Fatal(err) @@ -106,8 +100,7 @@ func main() { close(shutdown) }() - log.Print("Telegraf Agent running") - log.Printf("Loaded outputs: %s", strings.Join(outputs, " ")) + log.Print("InfluxDB Agent running") log.Printf("Loaded plugins: %s", strings.Join(plugins, " ")) if ag.Debug { log.Printf("Debug: enabled") @@ -115,6 +108,11 @@ func main() { ag.Interval, ag.Debug, ag.Hostname) } + if config.URL != "" { + log.Printf("Sending metrics to: %s", config.URL) + log.Printf("Tags enabled: %v", config.ListTags()) + } + if *fPidfile != "" { f, err := os.Create(*fPidfile) if err != nil { diff --git a/config.go b/config.go index 4ae2ba36d..48df238ac 100644 --- a/config.go +++ b/config.go @@ -30,28 +30,26 @@ func (d *Duration) UnmarshalTOML(b []byte) error { return nil } -// Config specifies the outputs that telegraf +// Config specifies the URL/user/password for the database that telegraf // will be logging to, as well as all the plugins that the user has // specified type Config struct { + URL string + Username string + Password string + Database string + UserAgent string + Timeout Duration + Tags map[string]string + agent *ast.Table plugins map[string]*ast.Table - outputs map[string]*ast.Table } // Plugins returns the configured plugins as a map of name -> plugin toml func (c *Config) Plugins() map[string]*ast.Table { return c.plugins } -type TagFilter struct { - Name string - Filter []string -} - -// Outputs returns the configured outputs as a map of name -> output toml -func (c *Config) Outputs() map[string]*ast.Table { - return c.outputs -} // The name of a tag, and the values on which to filter type TagFilter struct { @@ -66,9 +64,6 @@ type ConfiguredPlugin struct { Drop []string Pass []string - TagDrop []TagFilter - - TagPass []TagFilter TagDrop []TagFilter TagPass []TagFilter @@ -111,10 +106,6 @@ func (cp *ConfiguredPlugin) ShouldPass(measurement string, tags map[string]strin return false } -<<<<<<< HEAD -======= - ->>>>>>> jipperinbham-outputs-phase1 if cp.TagDrop != nil { for _, pat := range cp.TagDrop { if tagval, ok := tags[pat.Name]; ok { @@ -128,18 +119,7 @@ func (cp *ConfiguredPlugin) ShouldPass(measurement string, tags map[string]strin return true } -<<<<<<< HEAD return true -======= - return nil -} - -// ApplyOutput loads the toml config into the given interface -func (c *Config) ApplyOutput(name string, v interface{}) error { - if c.outputs[name] != nil { - return toml.UnmarshalTable(c.outputs[name], v) - } ->>>>>>> jipperinbham-outputs-phase1 } // ApplyAgent loads the toml config into the given interface @@ -245,24 +225,15 @@ func (c *Config) ApplyPlugin(name string, v interface{}) (*ConfiguredPlugin, err // PluginsDeclared returns the name of all plugins declared in the config. func (c *Config) PluginsDeclared() []string { - return declared(c.plugins) -} + var plugins []string -// OutputsDeclared returns the name of all outputs declared in the config. -func (c *Config) OutputsDeclared() []string { - return declared(c.outputs) -} - -func declared(endpoints map[string]*ast.Table) []string { - var names []string - - for name, _ := range endpoints { - names = append(names, name) + for name := range c.plugins { + plugins = append(plugins, name) } - sort.Strings(names) + sort.Strings(plugins) - return names + return plugins } // DefaultConfig returns an empty default configuration @@ -286,7 +257,6 @@ func LoadConfig(path string) (*Config, error) { c := &Config{ plugins: make(map[string]*ast.Table), - outputs: make(map[string]*ast.Table), } for name, val := range tbl.Fields { @@ -296,16 +266,13 @@ func LoadConfig(path string) (*Config, error) { } switch name { + case "influxdb": + err := toml.UnmarshalTable(subtbl, c) + if err != nil { + return nil, err + } case "agent": c.agent = subtbl - case "outputs": - for outputName, outputVal := range subtbl.Fields { - outputSubtbl, ok := outputVal.(*ast.Table) - if !ok { - return nil, errInvalidConfig - } - c.outputs[outputName] = outputSubtbl - } default: c.plugins[name] = subtbl } @@ -360,11 +327,8 @@ var header = `# Telegraf configuration # NOTE: The configuration has a few required parameters. They are marked # with 'required'. Be sure to edit those to make this configuration work. -# OUTPUTS -[outputs] - # Configuration for influxdb server to send metrics to -[outputs.influxdb] +[influxdb] # The full HTTP endpoint URL for your InfluxDB instance url = "http://localhost:8086" # required. @@ -381,11 +345,12 @@ database = "telegraf" # required. # Set the user agent for the POSTs (can be useful for log differentiation) # user_agent = "telegraf" +# tags = { "dc": "us-east-1" } # Tags can also be specified via a normal map, but only one form at a time: # [influxdb.tags] -# tags = { "dc" = "us-east-1" } +# dc = "us-east-1" # Configuration for telegraf itself # [agent] diff --git a/etc/config.sample.toml b/etc/config.sample.toml index e19eb09c2..38cfeba68 100644 --- a/etc/config.sample.toml +++ b/etc/config.sample.toml @@ -22,11 +22,8 @@ # NOTE: The configuration has a few required parameters. They are marked # with 'required'. Be sure to edit those to make this configuration work. -# OUTPUTS -[outputs] - # Configuration for influxdb server to send metrics to -[outputs.influxdb] +[influxdb] # The full HTTP endpoint URL for your InfluxDB instance url = "http://localhost:8086" # required. @@ -38,8 +35,12 @@ database = "telegraf" # required. # Set the user agent for the POSTs (can be useful for log differentiation) # user_agent = "telegraf" +# tags = { "dc": "us-east-1" } -# tags = { "dc" = "us-east-1" } +# Tags can also be specified via a normal map, but only one form at a time: + +# [influxdb.tags] +# dc = "us-east-1" # Configuration for telegraf itself # [agent] diff --git a/outputs/all/all.go b/outputs/all/all.go deleted file mode 100644 index 2a8018674..000000000 --- a/outputs/all/all.go +++ /dev/null @@ -1,5 +0,0 @@ -package all - -import ( - _ "github.com/influxdb/telegraf/outputs/influxdb" -) diff --git a/outputs/influxdb/influxdb.go b/outputs/influxdb/influxdb.go deleted file mode 100644 index 13cd4ea63..000000000 --- a/outputs/influxdb/influxdb.go +++ /dev/null @@ -1,60 +0,0 @@ -package influxdb - -import ( - "net/url" - - "github.com/influxdb/influxdb/client" - "github.com/influxdb/telegraf/outputs" -) - -type InfluxDB struct { - URL string - Username string - Password string - Database string - UserAgent string - Tags map[string]string - - conn *client.Client -} - -func (i *InfluxDB) Connect(host string) error { - u, err := url.Parse(i.URL) - if err != nil { - return err - } - - c, err := client.NewClient(client.Config{ - URL: *u, - Username: i.Username, - Password: i.Password, - UserAgent: i.UserAgent, - }) - - if err != nil { - return err - } - - if i.Tags == nil { - i.Tags = make(map[string]string) - } - i.Tags["host"] = host - - i.conn = c - return nil -} - -func (i *InfluxDB) Write(bp client.BatchPoints) error { - bp.Database = i.Database - bp.Tags = i.Tags - if _, err := i.conn.Write(bp); err != nil { - return err - } - return nil -} - -func init() { - outputs.Add("influxdb", func() outputs.Output { - return &InfluxDB{} - }) -} diff --git a/outputs/registry.go b/outputs/registry.go deleted file mode 100644 index a18ede493..000000000 --- a/outputs/registry.go +++ /dev/null @@ -1,18 +0,0 @@ -package outputs - -import ( - "github.com/influxdb/influxdb/client" -) - -type Output interface { - Connect(string) error - Write(client.BatchPoints) error -} - -type Creator func() Output - -var Outputs = map[string]Creator{} - -func Add(name string, creator Creator) { - Outputs[name] = creator -} diff --git a/testdata/influx.toml b/testdata/influx.toml index fe64bf17c..fc5d3c493 100644 --- a/testdata/influx.toml +++ b/testdata/influx.toml @@ -3,13 +3,12 @@ interval = "5s" http = ":11213" debug = true -[outputs] -[outputs.influxdb] +[influxdb] url = "http://localhost:8086" username = "root" password = "root" database = "telegraf" -tags = { "dc" = "us-phx-1" } +tags = { dc = "us-phx-1" } [redis] address = ":6379"