Merge pull request #109 from influxdb/pr-107

Merge of PR #107, Allow Telegraf to output data to multiple locations beyond InfluxDB, such as Riemann or Kafka
This commit is contained in:
Cameron Sparr 2015-08-12 11:21:35 -06:00
commit ed13924c5a
11 changed files with 247 additions and 101 deletions

1
.gitignore vendored
View File

@ -1,3 +1,4 @@
pkg/ pkg/
tivan tivan
.vagrant .vagrant
telegraf

View File

@ -14,6 +14,7 @@
- [#98](https://github.com/influxdb/telegraf/pull/98): LeoFS plugin. Thanks @mocchira! - [#98](https://github.com/influxdb/telegraf/pull/98): LeoFS plugin. Thanks @mocchira!
- [#103](https://github.com/influxdb/telegraf/pull/103): Filter by metric tags. Thanks @srfraser! - [#103](https://github.com/influxdb/telegraf/pull/103): Filter by metric tags. Thanks @srfraser!
- [#106](https://github.com/influxdb/telegraf/pull/106): Options to filter plugins on startup. Thanks @zepouet! - [#106](https://github.com/influxdb/telegraf/pull/106): Options to filter plugins on startup. Thanks @zepouet!
- [#107](https://github.com/influxdb/telegraf/pull/107): Multiple outputs beyong influxdb. Thanks @jipperinbham!
### Bugfixes ### Bugfixes
- [#85](https://github.com/influxdb/telegraf/pull/85): Fix GetLocalHost testutil function for mac users - [#85](https://github.com/influxdb/telegraf/pull/85): Fix GetLocalHost testutil function for mac users

154
agent.go
View File

@ -3,17 +3,21 @@ package telegraf
import ( import (
"fmt" "fmt"
"log" "log"
"net/url"
"os" "os"
"sort" "sort"
"strings" "strings"
"sync" "sync"
"time" "time"
"github.com/influxdb/influxdb/client" "github.com/influxdb/telegraf/outputs"
"github.com/influxdb/telegraf/plugins" "github.com/influxdb/telegraf/plugins"
) )
type runningOutput struct {
name string
output outputs.Output
}
type runningPlugin struct { type runningPlugin struct {
name string name string
plugin plugins.Plugin plugin plugins.Plugin
@ -32,9 +36,8 @@ type Agent struct {
Config *Config Config *Config
outputs []*runningOutput
plugins []*runningPlugin plugins []*runningPlugin
conn *client.Client
} }
// NewAgent returns an Agent struct based off the given Config // NewAgent returns an Agent struct based off the given Config
@ -64,40 +67,52 @@ func NewAgent(config *Config) (*Agent, error) {
return agent, nil return agent, nil
} }
// Connect connects to the agent's config URL // Connect connects to all configured outputs
func (a *Agent) Connect() error { func (a *Agent) Connect() error {
config := a.Config for _, o := range a.outputs {
err := o.output.Connect()
u, err := url.Parse(config.URL) if err != nil {
if err != nil { return err
return err }
} }
c, err := client.NewClient(client.Config{
URL: *u,
Username: config.Username,
Password: config.Password,
UserAgent: config.UserAgent,
Timeout: config.Timeout.Duration,
})
if err != nil {
return err
}
_, err = c.Query(client.Query{
Command: fmt.Sprintf("CREATE DATABASE %s", config.Database),
})
if err != nil && !strings.Contains(err.Error(), "database already exists") {
log.Fatal(err)
}
a.conn = c
return nil return nil
} }
// Close closes the connection to all configured outputs
func (a *Agent) Close() error {
var err error
for _, o := range a.outputs {
err = o.output.Close()
}
return err
}
// LoadOutputs loads the agent's outputs
func (a *Agent) LoadOutputs() ([]string, error) {
var names []string
for _, name := range a.Config.OutputsDeclared() {
creator, ok := outputs.Outputs[name]
if !ok {
return nil, fmt.Errorf("Undefined but requested output: %s", name)
}
output := creator()
err := a.Config.ApplyOutput(name, output)
if err != nil {
return nil, err
}
a.outputs = append(a.outputs, &runningOutput{name, output})
names = append(names, name)
}
sort.Strings(names)
return names, nil
}
// LoadPlugins loads the agent's plugins // LoadPlugins loads the agent's plugins
func (a *Agent) LoadPlugins(pluginsFilter string) ([]string, error) { func (a *Agent) LoadPlugins(pluginsFilter string) ([]string, error) {
var names []string var names []string
@ -174,61 +189,59 @@ func (a *Agent) crankParallel() error {
close(points) close(points)
var acc BatchPoints var bp BatchPoints
acc.Tags = a.Config.Tags bp.Time = time.Now()
acc.Time = time.Now() bp.Tags = a.Config.Tags
acc.Database = a.Config.Database
for sub := range points { for sub := range points {
acc.Points = append(acc.Points, sub.Points...) bp.Points = append(bp.Points, sub.Points...)
} }
_, err := a.conn.Write(acc.BatchPoints) return a.flush(&bp)
return err
} }
func (a *Agent) crank() error { func (a *Agent) crank() error {
var acc BatchPoints var bp BatchPoints
acc.Debug = a.Debug bp.Debug = a.Debug
for _, plugin := range a.plugins { for _, plugin := range a.plugins {
acc.Prefix = plugin.name + "_" bp.Prefix = plugin.name + "_"
acc.Config = plugin.config bp.Config = plugin.config
err := plugin.plugin.Gather(&acc) err := plugin.plugin.Gather(&bp)
if err != nil { if err != nil {
return err return err
} }
} }
acc.Tags = a.Config.Tags bp.Time = time.Now()
acc.Time = time.Now() bp.Tags = a.Config.Tags
acc.Database = a.Config.Database
_, err := a.conn.Write(acc.BatchPoints) return a.flush(&bp)
return err
} }
func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) error { func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) error {
ticker := time.NewTicker(plugin.config.Interval) ticker := time.NewTicker(plugin.config.Interval)
for { for {
var acc BatchPoints var bp BatchPoints
acc.Debug = a.Debug bp.Debug = a.Debug
acc.Prefix = plugin.name + "_" bp.Prefix = plugin.name + "_"
acc.Config = plugin.config bp.Config = plugin.config
err := plugin.plugin.Gather(&acc) err := plugin.plugin.Gather(&bp)
if err != nil { if err != nil {
return err return err
} }
acc.Tags = a.Config.Tags bp.Tags = a.Config.Tags
acc.Time = time.Now() bp.Time = time.Now()
acc.Database = a.Config.Database
a.conn.Write(acc.BatchPoints) err = a.flush(&bp)
if err != nil {
return err
}
select { select {
case <-shutdown: case <-shutdown:
@ -239,6 +252,22 @@ func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) err
} }
} }
func (a *Agent) flush(bp *BatchPoints) error {
var wg sync.WaitGroup
var outerr error
for _, o := range a.outputs {
wg.Add(1)
go func(ro *runningOutput) {
defer wg.Done()
outerr = ro.output.Write(bp.BatchPoints)
}(o)
}
wg.Wait()
return outerr
}
// TestAllPlugins verifies that we can 'Gather' from all plugins with the // TestAllPlugins verifies that we can 'Gather' from all plugins with the
// default configuration // default configuration
func (a *Agent) TestAllPlugins() error { func (a *Agent) TestAllPlugins() error {
@ -297,13 +326,6 @@ func (a *Agent) Test() error {
// Run runs the agent daemon, gathering every Interval // Run runs the agent daemon, gathering every Interval
func (a *Agent) Run(shutdown chan struct{}) error { func (a *Agent) Run(shutdown chan struct{}) error {
if a.conn == nil {
err := a.Connect()
if err != nil {
return err
}
}
var wg sync.WaitGroup var wg sync.WaitGroup
for _, plugin := range a.plugins { for _, plugin := range a.plugins {

View File

@ -59,6 +59,11 @@ func main() {
ag.Debug = true ag.Debug = true
} }
outputs, err := ag.LoadOutputs()
if err != nil {
log.Fatal(err)
}
plugins, err := ag.LoadPlugins(*fPLuginsFilter) plugins, err := ag.LoadPlugins(*fPLuginsFilter)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
@ -99,6 +104,7 @@ func main() {
}() }()
log.Printf("Starting Telegraf (version %s)\n", Version) log.Printf("Starting Telegraf (version %s)\n", Version)
log.Printf("Loaded outputs: %s", strings.Join(outputs, " "))
log.Printf("Loaded plugins: %s", strings.Join(plugins, " ")) log.Printf("Loaded plugins: %s", strings.Join(plugins, " "))
if ag.Debug { if ag.Debug {
log.Printf("Debug: enabled") log.Printf("Debug: enabled")
@ -106,11 +112,6 @@ func main() {
ag.Interval, ag.Debug, ag.Hostname) ag.Interval, ag.Debug, ag.Hostname)
} }
if config.URL != "" {
log.Printf("Sending metrics to: %s", config.URL)
log.Printf("Tags enabled: %v", config.ListTags())
}
if *fPidfile != "" { if *fPidfile != "" {
f, err := os.Create(*fPidfile) f, err := os.Create(*fPidfile)
if err != nil { if err != nil {

View File

@ -34,16 +34,11 @@ func (d *Duration) UnmarshalTOML(b []byte) error {
// will be logging to, as well as all the plugins that the user has // will be logging to, as well as all the plugins that the user has
// specified // specified
type Config struct { type Config struct {
URL string Tags map[string]string
Username string
Password string
Database string
UserAgent string
Timeout Duration
Tags map[string]string
agent *ast.Table agent *ast.Table
plugins map[string]*ast.Table plugins map[string]*ast.Table
outputs map[string]*ast.Table
} }
// Plugins returns the configured plugins as a map of name -> plugin toml // Plugins returns the configured plugins as a map of name -> plugin toml
@ -51,6 +46,11 @@ func (c *Config) Plugins() map[string]*ast.Table {
return c.plugins return c.plugins
} }
// Outputs returns the configured outputs as a map of name -> output toml
func (c *Config) Outputs() map[string]*ast.Table {
return c.outputs
}
// The name of a tag, and the values on which to filter // The name of a tag, and the values on which to filter
type TagFilter struct { type TagFilter struct {
Name string Name string
@ -122,6 +122,14 @@ func (cp *ConfiguredPlugin) ShouldPass(measurement string, tags map[string]strin
return true return true
} }
// ApplyOutput loads the toml config into the given interface
func (c *Config) ApplyOutput(name string, v interface{}) error {
if c.outputs[name] != nil {
return toml.UnmarshalTable(c.outputs[name], v)
}
return nil
}
// ApplyAgent loads the toml config into the given interface // ApplyAgent loads the toml config into the given interface
func (c *Config) ApplyAgent(v interface{}) error { func (c *Config) ApplyAgent(v interface{}) error {
if c.agent != nil { if c.agent != nil {
@ -225,15 +233,24 @@ func (c *Config) ApplyPlugin(name string, v interface{}) (*ConfiguredPlugin, err
// PluginsDeclared returns the name of all plugins declared in the config. // PluginsDeclared returns the name of all plugins declared in the config.
func (c *Config) PluginsDeclared() []string { func (c *Config) PluginsDeclared() []string {
var plugins []string return declared(c.plugins)
}
for name := range c.plugins { // OutputsDeclared returns the name of all outputs declared in the config.
plugins = append(plugins, name) func (c *Config) OutputsDeclared() []string {
return declared(c.outputs)
}
func declared(endpoints map[string]*ast.Table) []string {
var names []string
for name := range endpoints {
names = append(names, name)
} }
sort.Strings(plugins) sort.Strings(names)
return plugins return names
} }
// DefaultConfig returns an empty default configuration // DefaultConfig returns an empty default configuration
@ -257,6 +274,7 @@ func LoadConfig(path string) (*Config, error) {
c := &Config{ c := &Config{
plugins: make(map[string]*ast.Table), plugins: make(map[string]*ast.Table),
outputs: make(map[string]*ast.Table),
} }
for name, val := range tbl.Fields { for name, val := range tbl.Fields {
@ -266,13 +284,16 @@ func LoadConfig(path string) (*Config, error) {
} }
switch name { switch name {
case "influxdb":
err := toml.UnmarshalTable(subtbl, c)
if err != nil {
return nil, err
}
case "agent": case "agent":
c.agent = subtbl c.agent = subtbl
case "outputs":
for outputName, outputVal := range subtbl.Fields {
outputSubtbl, ok := outputVal.(*ast.Table)
if !ok {
return nil, errInvalidConfig
}
c.outputs[outputName] = outputSubtbl
}
default: default:
c.plugins[name] = subtbl c.plugins[name] = subtbl
} }
@ -327,8 +348,11 @@ var header = `# Telegraf configuration
# NOTE: The configuration has a few required parameters. They are marked # NOTE: The configuration has a few required parameters. They are marked
# with 'required'. Be sure to edit those to make this configuration work. # with 'required'. Be sure to edit those to make this configuration work.
# OUTPUTS
[outputs]
# Configuration for influxdb server to send metrics to # Configuration for influxdb server to send metrics to
[influxdb] [outputs.influxdb]
# The full HTTP endpoint URL for your InfluxDB instance # The full HTTP endpoint URL for your InfluxDB instance
url = "http://localhost:8086" # required. url = "http://localhost:8086" # required.
@ -345,12 +369,11 @@ database = "telegraf" # required.
# Set the user agent for the POSTs (can be useful for log differentiation) # Set the user agent for the POSTs (can be useful for log differentiation)
# user_agent = "telegraf" # user_agent = "telegraf"
# tags = { "dc": "us-east-1" }
# Tags can also be specified via a normal map, but only one form at a time: # Tags can also be specified via a normal map, but only one form at a time:
# [influxdb.tags] # [tags]
# dc = "us-east-1" # dc = "us-east-1" }
# Configuration for telegraf itself # Configuration for telegraf itself
# [agent] # [agent]

View File

@ -35,12 +35,11 @@ database = "telegraf" # required.
# Set the user agent for the POSTs (can be useful for log differentiation) # Set the user agent for the POSTs (can be useful for log differentiation)
# user_agent = "telegraf" # user_agent = "telegraf"
# tags = { "dc": "us-east-1" }
# Tags can also be specified via a normal map, but only one form at a time: # Tags can also be specified via a normal map, but only one form at a time:
# [influxdb.tags] # [influxdb.tags]
# dc = "us-east-1" # tags = { "dc" = "us-east-1" }
# Configuration for telegraf itself # Configuration for telegraf itself
# [agent] # [agent]

5
outputs/all/all.go Normal file
View File

@ -0,0 +1,5 @@
package all
import (
_ "github.com/influxdb/telegraf/outputs/influxdb"
)

View File

@ -0,0 +1,72 @@
package influxdb
import (
"fmt"
"log"
"net/url"
"strings"
"github.com/influxdb/influxdb/client"
t "github.com/influxdb/telegraf"
"github.com/influxdb/telegraf/outputs"
)
type InfluxDB struct {
URL string
Username string
Password string
Database string
UserAgent string
Timeout t.Duration
conn *client.Client
}
func (i *InfluxDB) Connect() error {
u, err := url.Parse(i.URL)
if err != nil {
return err
}
c, err := client.NewClient(client.Config{
URL: *u,
Username: i.Username,
Password: i.Password,
UserAgent: i.UserAgent,
Timeout: i.Timeout.Duration,
})
if err != nil {
return err
}
_, err = c.Query(client.Query{
Command: fmt.Sprintf("CREATE DATABASE telegraf"),
})
if err != nil && !strings.Contains(err.Error(), "database already exists") {
log.Fatal(err)
}
i.conn = c
return nil
}
func (i *InfluxDB) Close() error {
// InfluxDB client does not provide a Close() function
return nil
}
func (i *InfluxDB) Write(bp client.BatchPoints) error {
bp.Database = i.Database
if _, err := i.conn.Write(bp); err != nil {
return err
}
return nil
}
func init() {
outputs.Add("influxdb", func() outputs.Output {
return &InfluxDB{}
})
}

19
outputs/registry.go Normal file
View File

@ -0,0 +1,19 @@
package outputs
import (
"github.com/influxdb/influxdb/client"
)
type Output interface {
Connect() error
Close() error
Write(client.BatchPoints) error
}
type Creator func() Output
var Outputs = map[string]Creator{}
func Add(name string, creator Creator) {
Outputs[name] = creator
}

View File

@ -3,12 +3,15 @@ interval = "5s"
http = ":11213" http = ":11213"
debug = true debug = true
[influxdb] [outputs]
[outputs.influxdb]
url = "http://localhost:8086" url = "http://localhost:8086"
username = "root" username = "root"
password = "root" password = "root"
database = "telegraf" database = "telegraf"
tags = { dc = "us-phx-1" }
[tags]
dc = "us-phx-1" }
[redis] [redis]
address = ":6379" address = ":6379"

View File

@ -23,7 +23,8 @@
# with 'required'. Be sure to edit those to make this configuration work. # with 'required'. Be sure to edit those to make this configuration work.
# Configuration for influxdb server to send metrics to # Configuration for influxdb server to send metrics to
[influxdb] [outputs]
[outputs.influxdb]
# The full HTTP endpoint URL for your InfluxDB instance # The full HTTP endpoint URL for your InfluxDB instance
url = "http://localhost:8086" # required. url = "http://localhost:8086" # required.
@ -40,11 +41,10 @@ database = "telegraf" # required.
# Set the user agent for the POSTs (can be useful for log differentiation) # Set the user agent for the POSTs (can be useful for log differentiation)
# user_agent = "telegraf" # user_agent = "telegraf"
# tags = { "dc": "us-east-1" }
# Tags can also be specified via a normal map, but only one form at a time: # Tags can also be specified via a normal map, but only one form at a time:
# [influxdb.tags] # [tags]
# dc = "us-east-1" # dc = "us-east-1"
# Configuration for telegraf itself # Configuration for telegraf itself
@ -204,11 +204,11 @@ urls = ["localhost/status"]
# postgres://[pqgotest[:password]]@localhost?sslmode=[disable|verify-ca|verify-full] # postgres://[pqgotest[:password]]@localhost?sslmode=[disable|verify-ca|verify-full]
# or a simple string: # or a simple string:
# host=localhost user=pqotest password=... sslmode=... # host=localhost user=pqotest password=... sslmode=...
# #
# All connection parameters are optional. By default, the host is localhost # All connection parameters are optional. By default, the host is localhost
# and the user is the currently running user. For localhost, we default # and the user is the currently running user. For localhost, we default
# to sslmode=disable as well. # to sslmode=disable as well.
# #
address = "sslmode=disable" address = "sslmode=disable"