Followup to issue #77, create configured database name from toml file

This commit is contained in:
Cameron Sparr 2015-08-11 14:02:04 -06:00
parent 53969ae054
commit 08042089f9
7 changed files with 132 additions and 60 deletions

View File

@ -3,7 +3,6 @@ package telegraf
import ( import (
"fmt" "fmt"
"log" "log"
"net/url"
"os" "os"
"sort" "sort"
"strings" "strings"
@ -71,7 +70,7 @@ func NewAgent(config *Config) (*Agent, error) {
// Connect connects to the agent's config URL // Connect connects to the agent's config URL
func (a *Agent) Connect() error { func (a *Agent) Connect() error {
for _, o := range a.outputs { for _, o := range a.outputs {
err := o.output.Connect(a.Hostname) err := o.output.Connect()
if err != nil { if err != nil {
return err return err
} }
@ -79,6 +78,7 @@ func (a *Agent) Connect() error {
return nil return nil
} }
// LoadOutputs loads the agent's outputs
func (a *Agent) LoadOutputs() ([]string, error) { func (a *Agent) LoadOutputs() ([]string, error) {
var names []string var names []string
@ -99,15 +99,7 @@ func (a *Agent) LoadOutputs() ([]string, error) {
names = append(names, name) names = append(names, name)
} }
_, err = c.Query(client.Query{ sort.Strings(names)
Command: fmt.Sprintf("CREATE DATABASE telegraf"),
})
if err != nil && !strings.Contains(err.Error(), "database already exists") {
log.Fatal(err)
}
a.conn = c
return names, nil return names, nil
} }
@ -128,8 +120,6 @@ func (a *Agent) LoadPlugins(pluginsFilter string) ([]string, error) {
return nil, fmt.Errorf("Undefined but requested plugin: %s", name) return nil, fmt.Errorf("Undefined but requested plugin: %s", name)
} }
plugin := creator()
isPluginEnabled := false isPluginEnabled := false
if len(filters) > 0 { if len(filters) > 0 {
for _, runeValue := range filters { for _, runeValue := range filters {
@ -190,60 +180,56 @@ func (a *Agent) crankParallel() error {
close(points) close(points)
var acc BatchPoints var bp BatchPoints
acc.Tags = a.Config.Tags bp.Time = time.Now()
acc.Time = time.Now() bp.Tags = a.Config.Tags
acc.Database = a.Config.Database
for sub := range points { for sub := range points {
acc.Points = append(acc.Points, sub.Points...) bp.Points = append(bp.Points, sub.Points...)
} }
_, err := a.conn.Write(acc.BatchPoints) return a.flush(&bp)
return err
} }
func (a *Agent) crank() error { func (a *Agent) crank() error {
var acc BatchPoints var bp BatchPoints
acc.Debug = a.Debug bp.Debug = a.Debug
for _, plugin := range a.plugins { for _, plugin := range a.plugins {
acc.Prefix = plugin.name + "_" bp.Prefix = plugin.name + "_"
acc.Config = plugin.config bp.Config = plugin.config
err := plugin.plugin.Gather(&acc) err := plugin.plugin.Gather(&bp)
if err != nil { if err != nil {
return err return err
} }
} }
acc.Tags = a.Config.Tags bp.Time = time.Now()
acc.Time = time.Now() bp.Tags = a.Config.Tags
acc.Database = a.Config.Database
return a.flush(&acc) return a.flush(&bp)
} }
func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) error { func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) error {
ticker := time.NewTicker(plugin.config.Interval) ticker := time.NewTicker(plugin.config.Interval)
for { for {
var acc BatchPoints var bp BatchPoints
acc.Debug = a.Debug bp.Debug = a.Debug
acc.Prefix = plugin.name + "_" bp.Prefix = plugin.name + "_"
acc.Config = plugin.config bp.Config = plugin.config
err := plugin.plugin.Gather(&acc) err := plugin.plugin.Gather(&bp)
if err != nil { if err != nil {
return err return err
} }
acc.Tags = a.Config.Tags bp.Tags = a.Config.Tags
acc.Time = time.Now() bp.Time = time.Now()
acc.Database = a.Config.Database
err = a.flush(&acc) err = a.flush(&bp)
if err != nil { if err != nil {
return err return err
} }

View File

@ -34,6 +34,8 @@ func (d *Duration) UnmarshalTOML(b []byte) error {
// will be logging to, as well as all the plugins that the user has // will be logging to, as well as all the plugins that the user has
// specified // specified
type Config struct { type Config struct {
Tags map[string]string
agent *ast.Table agent *ast.Table
plugins map[string]*ast.Table plugins map[string]*ast.Table
outputs map[string]*ast.Table outputs map[string]*ast.Table
@ -43,10 +45,6 @@ type Config struct {
func (c *Config) Plugins() map[string]*ast.Table { func (c *Config) Plugins() map[string]*ast.Table {
return c.plugins return c.plugins
} }
type TagFilter struct {
Name string
Filter []string
}
// Outputs returns the configured outputs as a map of name -> output toml // Outputs returns the configured outputs as a map of name -> output toml
func (c *Config) Outputs() map[string]*ast.Table { func (c *Config) Outputs() map[string]*ast.Table {
@ -66,9 +64,6 @@ type ConfiguredPlugin struct {
Drop []string Drop []string
Pass []string Pass []string
TagDrop []TagFilter
TagPass []TagFilter
TagDrop []TagFilter TagDrop []TagFilter
TagPass []TagFilter TagPass []TagFilter
@ -131,7 +126,8 @@ func (cp *ConfiguredPlugin) ShouldPass(measurement string, tags map[string]strin
func (c *Config) ApplyOutput(name string, v interface{}) error { func (c *Config) ApplyOutput(name string, v interface{}) error {
if c.outputs[name] != nil { if c.outputs[name] != nil {
return toml.UnmarshalTable(c.outputs[name], v) return toml.UnmarshalTable(c.outputs[name], v)
} }
return nil
} }
// ApplyAgent loads the toml config into the given interface // ApplyAgent loads the toml config into the given interface
@ -246,15 +242,15 @@ func (c *Config) OutputsDeclared() []string {
} }
func declared(endpoints map[string]*ast.Table) []string { func declared(endpoints map[string]*ast.Table) []string {
var plugins []string var names []string
for name := range c.plugins { for name := range endpoints {
plugins = append(plugins, name) names = append(names, name)
} }
sort.Strings(plugins) sort.Strings(names)
return plugins return names
} }
// DefaultConfig returns an empty default configuration // DefaultConfig returns an empty default configuration
@ -376,8 +372,8 @@ database = "telegraf" # required.
# Tags can also be specified via a normal map, but only one form at a time: # Tags can also be specified via a normal map, but only one form at a time:
# [influxdb.tags] # [tags]
# tags = { "dc" = "us-east-1" } # dc = "us-east-1" }
# Configuration for telegraf itself # Configuration for telegraf itself
# [agent] # [agent]

5
outputs/all/all.go Normal file
View File

@ -0,0 +1,5 @@
package all
import (
_ "github.com/influxdb/telegraf/outputs/influxdb"
)

View File

@ -0,0 +1,67 @@
package influxdb
import (
"fmt"
"log"
"net/url"
"strings"
"github.com/influxdb/influxdb/client"
t "github.com/influxdb/telegraf"
"github.com/influxdb/telegraf/outputs"
)
type InfluxDB struct {
URL string
Username string
Password string
Database string
UserAgent string
Timeout t.Duration
conn *client.Client
}
func (i *InfluxDB) Connect() error {
u, err := url.Parse(i.URL)
if err != nil {
return err
}
c, err := client.NewClient(client.Config{
URL: *u,
Username: i.Username,
Password: i.Password,
UserAgent: i.UserAgent,
Timeout: i.Timeout.Duration,
})
if err != nil {
return err
}
_, err = c.Query(client.Query{
Command: fmt.Sprintf("CREATE DATABASE telegraf"),
})
if err != nil && !strings.Contains(err.Error(), "database already exists") {
log.Fatal(err)
}
i.conn = c
return nil
}
func (i *InfluxDB) Write(bp client.BatchPoints) error {
bp.Database = i.Database
if _, err := i.conn.Write(bp); err != nil {
return err
}
return nil
}
func init() {
outputs.Add("influxdb", func() outputs.Output {
return &InfluxDB{}
})
}

18
outputs/registry.go Normal file
View File

@ -0,0 +1,18 @@
package outputs
import (
"github.com/influxdb/influxdb/client"
)
type Output interface {
Connect() error
Write(client.BatchPoints) error
}
type Creator func() Output
var Outputs = map[string]Creator{}
func Add(name string, creator Creator) {
Outputs[name] = creator
}

View File

@ -10,8 +10,8 @@ username = "root"
password = "root" password = "root"
database = "telegraf" database = "telegraf"
[tags.influxdb] [tags]
tags = { "dc" = "us-phx-1" } dc = "us-phx-1" }
[redis] [redis]
address = ":6379" address = ":6379"

View File

@ -23,7 +23,8 @@
# with 'required'. Be sure to edit those to make this configuration work. # with 'required'. Be sure to edit those to make this configuration work.
# Configuration for influxdb server to send metrics to # Configuration for influxdb server to send metrics to
[influxdb] [outputs]
[outputs.influxdb]
# The full HTTP endpoint URL for your InfluxDB instance # The full HTTP endpoint URL for your InfluxDB instance
url = "http://localhost:8086" # required. url = "http://localhost:8086" # required.
@ -40,11 +41,10 @@ database = "telegraf" # required.
# Set the user agent for the POSTs (can be useful for log differentiation) # Set the user agent for the POSTs (can be useful for log differentiation)
# user_agent = "telegraf" # user_agent = "telegraf"
# tags = { "dc": "us-east-1" }
# Tags can also be specified via a normal map, but only one form at a time: # Tags can also be specified via a normal map, but only one form at a time:
# [influxdb.tags] # [tags]
# dc = "us-east-1" # dc = "us-east-1"
# Configuration for telegraf itself # Configuration for telegraf itself
@ -204,11 +204,11 @@ urls = ["localhost/status"]
# postgres://[pqgotest[:password]]@localhost?sslmode=[disable|verify-ca|verify-full] # postgres://[pqgotest[:password]]@localhost?sslmode=[disable|verify-ca|verify-full]
# or a simple string: # or a simple string:
# host=localhost user=pqotest password=... sslmode=... # host=localhost user=pqotest password=... sslmode=...
# #
# All connection parameters are optional. By default, the host is localhost # All connection parameters are optional. By default, the host is localhost
# and the user is the currently running user. For localhost, we default # and the user is the currently running user. For localhost, we default
# to sslmode=disable as well. # to sslmode=disable as well.
# #
address = "sslmode=disable" address = "sslmode=disable"