From 08042089f9d0a8760d786eae904bfc6d2fb2a770 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Tue, 11 Aug 2015 14:02:04 -0600 Subject: [PATCH] Followup to issue #77, create configured database name from toml file --- agent.go | 62 +++++++++++++-------------------- config.go | 26 ++++++-------- outputs/all/all.go | 5 +++ outputs/influxdb/influxdb.go | 67 ++++++++++++++++++++++++++++++++++++ outputs/registry.go | 18 ++++++++++ testdata/influx.toml | 4 +-- testdata/telegraf-agent.toml | 10 +++--- 7 files changed, 132 insertions(+), 60 deletions(-) create mode 100644 outputs/all/all.go create mode 100644 outputs/influxdb/influxdb.go create mode 100644 outputs/registry.go diff --git a/agent.go b/agent.go index 6540601ea..f86e27920 100644 --- a/agent.go +++ b/agent.go @@ -3,7 +3,6 @@ package telegraf import ( "fmt" "log" - "net/url" "os" "sort" "strings" @@ -71,7 +70,7 @@ func NewAgent(config *Config) (*Agent, error) { // Connect connects to the agent's config URL func (a *Agent) Connect() error { for _, o := range a.outputs { - err := o.output.Connect(a.Hostname) + err := o.output.Connect() if err != nil { return err } @@ -79,6 +78,7 @@ func (a *Agent) Connect() error { return nil } +// LoadOutputs loads the agent's outputs func (a *Agent) LoadOutputs() ([]string, error) { var names []string @@ -99,15 +99,7 @@ func (a *Agent) LoadOutputs() ([]string, error) { names = append(names, name) } - _, err = c.Query(client.Query{ - Command: fmt.Sprintf("CREATE DATABASE telegraf"), - }) - - if err != nil && !strings.Contains(err.Error(), "database already exists") { - log.Fatal(err) - } - - a.conn = c + sort.Strings(names) return names, nil } @@ -128,8 +120,6 @@ func (a *Agent) LoadPlugins(pluginsFilter string) ([]string, error) { return nil, fmt.Errorf("Undefined but requested plugin: %s", name) } - plugin := creator() - isPluginEnabled := false if len(filters) > 0 { for _, runeValue := range filters { @@ -190,60 +180,56 @@ func (a *Agent) crankParallel() error { close(points) - var acc BatchPoints - acc.Tags = a.Config.Tags - acc.Time = time.Now() - acc.Database = a.Config.Database + var bp BatchPoints + bp.Time = time.Now() + bp.Tags = a.Config.Tags for sub := range points { - acc.Points = append(acc.Points, sub.Points...) + bp.Points = append(bp.Points, sub.Points...) } - _, err := a.conn.Write(acc.BatchPoints) - return err + return a.flush(&bp) } func (a *Agent) crank() error { - var acc BatchPoints + var bp BatchPoints - acc.Debug = a.Debug + bp.Debug = a.Debug for _, plugin := range a.plugins { - acc.Prefix = plugin.name + "_" - acc.Config = plugin.config - err := plugin.plugin.Gather(&acc) + bp.Prefix = plugin.name + "_" + bp.Config = plugin.config + err := plugin.plugin.Gather(&bp) if err != nil { return err } } - acc.Tags = a.Config.Tags - acc.Time = time.Now() - acc.Database = a.Config.Database + bp.Time = time.Now() + bp.Tags = a.Config.Tags - return a.flush(&acc) + return a.flush(&bp) } func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) error { ticker := time.NewTicker(plugin.config.Interval) for { - var acc BatchPoints + var bp BatchPoints - acc.Debug = a.Debug + bp.Debug = a.Debug - acc.Prefix = plugin.name + "_" - acc.Config = plugin.config - err := plugin.plugin.Gather(&acc) + bp.Prefix = plugin.name + "_" + bp.Config = plugin.config + err := plugin.plugin.Gather(&bp) if err != nil { return err } - acc.Tags = a.Config.Tags - acc.Time = time.Now() - acc.Database = a.Config.Database + bp.Tags = a.Config.Tags + bp.Time = time.Now() - err = a.flush(&acc) + err = a.flush(&bp) if err != nil { return err } diff --git a/config.go b/config.go index b943f5442..db49dfa2f 100644 --- a/config.go +++ b/config.go @@ -34,6 +34,8 @@ func (d *Duration) UnmarshalTOML(b []byte) error { // will be logging to, as well as all the plugins that the user has // specified type Config struct { + Tags map[string]string + agent *ast.Table plugins map[string]*ast.Table outputs map[string]*ast.Table @@ -43,10 +45,6 @@ type Config struct { func (c *Config) Plugins() map[string]*ast.Table { return c.plugins } -type TagFilter struct { - Name string - Filter []string -} // Outputs returns the configured outputs as a map of name -> output toml func (c *Config) Outputs() map[string]*ast.Table { @@ -66,9 +64,6 @@ type ConfiguredPlugin struct { Drop []string Pass []string - TagDrop []TagFilter - - TagPass []TagFilter TagDrop []TagFilter TagPass []TagFilter @@ -131,7 +126,8 @@ func (cp *ConfiguredPlugin) ShouldPass(measurement string, tags map[string]strin func (c *Config) ApplyOutput(name string, v interface{}) error { if c.outputs[name] != nil { return toml.UnmarshalTable(c.outputs[name], v) - } + } + return nil } // ApplyAgent loads the toml config into the given interface @@ -246,15 +242,15 @@ func (c *Config) OutputsDeclared() []string { } func declared(endpoints map[string]*ast.Table) []string { - var plugins []string + var names []string - for name := range c.plugins { - plugins = append(plugins, name) + for name := range endpoints { + names = append(names, name) } - sort.Strings(plugins) + sort.Strings(names) - return plugins + return names } // DefaultConfig returns an empty default configuration @@ -376,8 +372,8 @@ database = "telegraf" # required. # Tags can also be specified via a normal map, but only one form at a time: -# [influxdb.tags] -# tags = { "dc" = "us-east-1" } +# [tags] +# dc = "us-east-1" } # Configuration for telegraf itself # [agent] diff --git a/outputs/all/all.go b/outputs/all/all.go new file mode 100644 index 000000000..2a8018674 --- /dev/null +++ b/outputs/all/all.go @@ -0,0 +1,5 @@ +package all + +import ( + _ "github.com/influxdb/telegraf/outputs/influxdb" +) diff --git a/outputs/influxdb/influxdb.go b/outputs/influxdb/influxdb.go new file mode 100644 index 000000000..1153f064b --- /dev/null +++ b/outputs/influxdb/influxdb.go @@ -0,0 +1,67 @@ +package influxdb + +import ( + "fmt" + "log" + "net/url" + "strings" + + "github.com/influxdb/influxdb/client" + t "github.com/influxdb/telegraf" + "github.com/influxdb/telegraf/outputs" +) + +type InfluxDB struct { + URL string + Username string + Password string + Database string + UserAgent string + Timeout t.Duration + + conn *client.Client +} + +func (i *InfluxDB) Connect() error { + u, err := url.Parse(i.URL) + if err != nil { + return err + } + + c, err := client.NewClient(client.Config{ + URL: *u, + Username: i.Username, + Password: i.Password, + UserAgent: i.UserAgent, + Timeout: i.Timeout.Duration, + }) + + if err != nil { + return err + } + + _, err = c.Query(client.Query{ + Command: fmt.Sprintf("CREATE DATABASE telegraf"), + }) + + if err != nil && !strings.Contains(err.Error(), "database already exists") { + log.Fatal(err) + } + + i.conn = c + return nil +} + +func (i *InfluxDB) Write(bp client.BatchPoints) error { + bp.Database = i.Database + if _, err := i.conn.Write(bp); err != nil { + return err + } + return nil +} + +func init() { + outputs.Add("influxdb", func() outputs.Output { + return &InfluxDB{} + }) +} diff --git a/outputs/registry.go b/outputs/registry.go new file mode 100644 index 000000000..ccc40f9b2 --- /dev/null +++ b/outputs/registry.go @@ -0,0 +1,18 @@ +package outputs + +import ( + "github.com/influxdb/influxdb/client" +) + +type Output interface { + Connect() error + Write(client.BatchPoints) error +} + +type Creator func() Output + +var Outputs = map[string]Creator{} + +func Add(name string, creator Creator) { + Outputs[name] = creator +} diff --git a/testdata/influx.toml b/testdata/influx.toml index 80d03fa70..492528cae 100644 --- a/testdata/influx.toml +++ b/testdata/influx.toml @@ -10,8 +10,8 @@ username = "root" password = "root" database = "telegraf" -[tags.influxdb] -tags = { "dc" = "us-phx-1" } +[tags] +dc = "us-phx-1" } [redis] address = ":6379" diff --git a/testdata/telegraf-agent.toml b/testdata/telegraf-agent.toml index b58995594..fd0221a47 100644 --- a/testdata/telegraf-agent.toml +++ b/testdata/telegraf-agent.toml @@ -23,7 +23,8 @@ # with 'required'. Be sure to edit those to make this configuration work. # Configuration for influxdb server to send metrics to -[influxdb] +[outputs] +[outputs.influxdb] # The full HTTP endpoint URL for your InfluxDB instance url = "http://localhost:8086" # required. @@ -40,11 +41,10 @@ database = "telegraf" # required. # Set the user agent for the POSTs (can be useful for log differentiation) # user_agent = "telegraf" -# tags = { "dc": "us-east-1" } # Tags can also be specified via a normal map, but only one form at a time: -# [influxdb.tags] +# [tags] # dc = "us-east-1" # Configuration for telegraf itself @@ -204,11 +204,11 @@ urls = ["localhost/status"] # postgres://[pqgotest[:password]]@localhost?sslmode=[disable|verify-ca|verify-full] # or a simple string: # host=localhost user=pqotest password=... sslmode=... -# +# # All connection parameters are optional. By default, the host is localhost # and the user is the currently running user. For localhost, we default # to sslmode=disable as well. -# +# address = "sslmode=disable"