resolve merge conflicts

This commit is contained in:
JP 2015-08-11 20:41:45 -05:00
commit c5c701d178
11 changed files with 155 additions and 78 deletions

View File

@ -13,6 +13,7 @@
- [#92](https://github.com/influxdb/telegraf/pull/92): Exec plugin. Thanks @alvaromorales! - [#92](https://github.com/influxdb/telegraf/pull/92): Exec plugin. Thanks @alvaromorales!
- [#98](https://github.com/influxdb/telegraf/pull/98): LeoFS plugin. Thanks @mocchira! - [#98](https://github.com/influxdb/telegraf/pull/98): LeoFS plugin. Thanks @mocchira!
- [#103](https://github.com/influxdb/telegraf/pull/103): Filter by metric tags. Thanks @srfraser! - [#103](https://github.com/influxdb/telegraf/pull/103): Filter by metric tags. Thanks @srfraser!
- [#106](https://github.com/influxdb/telegraf/pull/106): Options to filter plugins on startup. Thanks @zepouet!
### Bugfixes ### Bugfixes
- [#85](https://github.com/influxdb/telegraf/pull/85): Fix GetLocalHost testutil function for mac users - [#85](https://github.com/influxdb/telegraf/pull/85): Fix GetLocalHost testutil function for mac users

View File

@ -36,7 +36,20 @@ brew install telegraf
* Edit the configuration to match your needs * Edit the configuration to match your needs
* Run `telegraf -config telegraf.toml -test` to output one full measurement sample to STDOUT * Run `telegraf -config telegraf.toml -test` to output one full measurement sample to STDOUT
* Run `telegraf -config telegraf.toml` to gather and send metrics to InfluxDB * Run `telegraf -config telegraf.toml` to gather and send metrics to InfluxDB
* Run `telegraf -config telegraf.toml -filter system:swap` to enable only two plugins described into config file
### Telegraf Usage
```telegraf --help```
* -config="": configuration file to load
* -debug=false: show metrics as they're generated to stdout
* -filter="": filter the plugins to enable, separator is :
* -httptest.serve="": if non-empty, httptest.NewServer serves on this address and blocks
* -pidfile="": file to write our pid to
* -sample-config=false: print out full sample configuration
* -test=false: gather metrics, print them out, and exit
* -version=false: display the version
## Telegraf Options ## Telegraf Options

View File

@ -71,7 +71,7 @@ func NewAgent(config *Config) (*Agent, error) {
// Connect connects to the agent's config URL // Connect connects to the agent's config URL
func (a *Agent) Connect() error { func (a *Agent) Connect() error {
for _, o := range a.outputs { for _, o := range a.outputs {
err := o.output.Connect(a.Hostname) err := o.output.Connect()
if err != nil { if err != nil {
return err return err
} }
@ -99,26 +99,20 @@ func (a *Agent) LoadOutputs() ([]string, error) {
names = append(names, name) names = append(names, name)
} }
<<<<<<< HEAD
_, err = c.Query(client.Query{
Command: fmt.Sprintf("CREATE DATABASE telegraf"),
})
if err != nil && !strings.Contains(err.Error(), "database already exists") {
log.Fatal(err)
}
a.conn = c
=======
sort.Strings(names) sort.Strings(names)
>>>>>>> jipperinbham-outputs-phase1
return names, nil return names, nil
} }
// LoadPlugins loads the agent's plugins // LoadPlugins loads the agent's plugins
func (a *Agent) LoadPlugins() ([]string, error) { func (a *Agent) LoadPlugins(pluginsFilter string) ([]string, error) {
var names []string var names []string
var filters []string
pluginsFilter = strings.TrimSpace(pluginsFilter)
if pluginsFilter != "" {
filters = strings.Split(":"+pluginsFilter+":", ":")
}
for _, name := range a.Config.PluginsDeclared() { for _, name := range a.Config.PluginsDeclared() {
creator, ok := plugins.Plugins[name] creator, ok := plugins.Plugins[name]
@ -126,8 +120,22 @@ func (a *Agent) LoadPlugins() ([]string, error) {
return nil, fmt.Errorf("Undefined but requested plugin: %s", name) return nil, fmt.Errorf("Undefined but requested plugin: %s", name)
} }
plugin := creator() isPluginEnabled := false
if len(filters) > 0 {
for _, runeValue := range filters {
if runeValue != "" && strings.ToLower(runeValue) == strings.ToLower(name) {
fmt.Printf("plugin [%s] is enabled (filter options)\n", name)
isPluginEnabled = true
break
}
}
} else {
// if no filter, we ALWAYS accept the plugin
isPluginEnabled = true
}
if isPluginEnabled {
plugin := creator()
config, err := a.Config.ApplyPlugin(name, plugin) config, err := a.Config.ApplyPlugin(name, plugin)
if err != nil { if err != nil {
return nil, err return nil, err
@ -136,6 +144,7 @@ func (a *Agent) LoadPlugins() ([]string, error) {
a.plugins = append(a.plugins, &runningPlugin{name, plugin, config}) a.plugins = append(a.plugins, &runningPlugin{name, plugin, config})
names = append(names, name) names = append(names, name)
} }
}
sort.Strings(names) sort.Strings(names)
@ -173,53 +182,54 @@ func (a *Agent) crankParallel() error {
var bp BatchPoints var bp BatchPoints
bp.Time = time.Now() bp.Time = time.Now()
bp.Tags = a.Config.Tags
for sub := range points { for sub := range points {
bp.Points = append(bp.Points, sub.Points...) bp.Points = append(bp.Points, sub.Points...)
} }
return a.flush(&bp) return a.flush(bp)
} }
func (a *Agent) crank() error { func (a *Agent) crank() error {
var acc BatchPoints var bp BatchPoints
acc.Debug = a.Debug bp.Debug = a.Debug
for _, plugin := range a.plugins { for _, plugin := range a.plugins {
acc.Prefix = plugin.name + "_" bp.Prefix = plugin.name + "_"
acc.Config = plugin.config bp.Config = plugin.config
err := plugin.plugin.Gather(&acc) err := plugin.plugin.Gather(&bp)
if err != nil { if err != nil {
return err return err
} }
} }
acc.Time = time.Now() bp.Time = time.Now()
bp.Tags = a.Config.Tags
return a.flush(&acc) return a.flush(bp)
} }
func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) error { func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) error {
ticker := time.NewTicker(plugin.config.Interval) ticker := time.NewTicker(plugin.config.Interval)
for { for {
var acc BatchPoints var bp BatchPoints
acc.Debug = a.Debug bp.Debug = a.Debug
acc.Prefix = plugin.name + "_" bp.Prefix = plugin.name + "_"
acc.Config = plugin.config bp.Config = plugin.config
err := plugin.plugin.Gather(&acc) err := plugin.plugin.Gather(&bp)
if err != nil { if err != nil {
return err return err
} }
acc.Tags = a.Config.Tags bp.Tags = a.Config.Tags
acc.Time = time.Now() bp.Time = time.Now()
acc.Database = a.Config.Database
err = a.flush(&acc) err = a.flush(acc)
if err != nil { if err != nil {
return err return err
} }
@ -233,14 +243,14 @@ func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) err
} }
} }
func (a *Agent) flush(bp *BatchPoints) error { func (a *Agent) flush(bp BatchPoints) error {
var wg sync.WaitGroup var wg sync.WaitGroup
var outerr error var outerr error
for _, o := range a.outputs { for _, o := range a.outputs {
wg.Add(1) wg.Add(1)
go func(ro *runningOutput) { go func(output *runningOutput) {
defer wg.Done() defer wg.Done()
outerr = ro.output.Write(bp.BatchPoints) outerr = o.output.Write(bp.BatchPoints)
}(o) }(o)
} }

View File

@ -1,5 +1,47 @@
package telegraf package telegraf
import (
"github.com/stretchr/testify/assert"
"testing"
// needing to load the plugins
_ "github.com/influxdb/telegraf/plugins/all"
)
func TestAgent_LoadPlugin(t *testing.T) {
// load a dedicated configuration file
config, _ := LoadConfig("./testdata/telegraf-agent.toml")
a, _ := NewAgent(config)
pluginsEnabled, _ := a.LoadPlugins("mysql")
assert.Equal(t, 1, len(pluginsEnabled))
pluginsEnabled, _ = a.LoadPlugins("foo")
assert.Equal(t, 0, len(pluginsEnabled))
pluginsEnabled, _ = a.LoadPlugins("mysql:foo")
assert.Equal(t, 1, len(pluginsEnabled))
pluginsEnabled, _ = a.LoadPlugins("mysql:redis")
assert.Equal(t, 2, len(pluginsEnabled))
pluginsEnabled, _ = a.LoadPlugins(":mysql:foo:redis:bar")
assert.Equal(t, 2, len(pluginsEnabled))
pluginsEnabled, _ = a.LoadPlugins("")
assert.Equal(t, 24, len(pluginsEnabled))
pluginsEnabled, _ = a.LoadPlugins(" ")
assert.Equal(t, 24, len(pluginsEnabled))
pluginsEnabled, _ = a.LoadPlugins(" ")
assert.Equal(t, 24, len(pluginsEnabled))
pluginsEnabled, _ = a.LoadPlugins("\n\t")
assert.Equal(t, 24, len(pluginsEnabled))
}
/* /*
func TestAgent_DrivesMetrics(t *testing.T) { func TestAgent_DrivesMetrics(t *testing.T) {
var ( var (

View File

@ -9,7 +9,6 @@ import (
"strings" "strings"
"github.com/influxdb/telegraf" "github.com/influxdb/telegraf"
_ "github.com/influxdb/telegraf/outputs/all"
_ "github.com/influxdb/telegraf/plugins/all" _ "github.com/influxdb/telegraf/plugins/all"
) )
@ -19,6 +18,7 @@ var fConfig = flag.String("config", "", "configuration file to load")
var fVersion = flag.Bool("version", false, "display the version") var fVersion = flag.Bool("version", false, "display the version")
var fSampleConfig = flag.Bool("sample-config", false, "print out full sample configuration") var fSampleConfig = flag.Bool("sample-config", false, "print out full sample configuration")
var fPidfile = flag.String("pidfile", "", "file to write our pid to") var fPidfile = flag.String("pidfile", "", "file to write our pid to")
var fPLuginsFilter = flag.String("filter", "", "filter the plugins to enable, separator is :")
// Telegraf version // Telegraf version
var Version = "unreleased" var Version = "unreleased"
@ -62,14 +62,13 @@ func main() {
ag.Debug = true ag.Debug = true
} }
outputs, err := ag.LoadOutputs() plugins, err := ag.LoadPlugins(*fPLuginsFilter)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
if len(plugins) == 0 {
plugins, err := ag.LoadPlugins() log.Printf("Error: no plugins found, did you provide a config file?")
if err != nil { os.Exit(1)
log.Fatal(err)
} }
if len(plugins) == 0 { if len(plugins) == 0 {
log.Printf("Error: no plugins found, did you provide a config file?") log.Printf("Error: no plugins found, did you provide a config file?")

View File

@ -30,10 +30,12 @@ func (d *Duration) UnmarshalTOML(b []byte) error {
return nil return nil
} }
// Config specifies the outputs that telegraf // Config specifies the URL/user/password for the database that telegraf
// will be logging to, as well as all the plugins that the user has // will be logging to, as well as all the plugins that the user has
// specified // specified
type Config struct { type Config struct {
Tags map[string]string
agent *ast.Table agent *ast.Table
plugins map[string]*ast.Table plugins map[string]*ast.Table
outputs map[string]*ast.Table outputs map[string]*ast.Table
@ -43,17 +45,12 @@ type Config struct {
func (c *Config) Plugins() map[string]*ast.Table { func (c *Config) Plugins() map[string]*ast.Table {
return c.plugins return c.plugins
} }
type TagFilter struct {
Name string
Filter []string
}
// Outputs returns the configured outputs as a map of name -> output toml // Outputs returns the configured outputs as a map of name -> output toml
func (c *Config) Outputs() map[string]*ast.Table { func (c *Config) Outputs() map[string]*ast.Table {
return c.outputs return c.outputs
} }
// The name of a tag, and the values on which to filter
type TagFilter struct { type TagFilter struct {
Name string Name string
Filter []string Filter []string
@ -66,9 +63,6 @@ type ConfiguredPlugin struct {
Drop []string Drop []string
Pass []string Pass []string
TagDrop []TagFilter
TagPass []TagFilter
TagDrop []TagFilter TagDrop []TagFilter
TagPass []TagFilter TagPass []TagFilter
@ -111,10 +105,6 @@ func (cp *ConfiguredPlugin) ShouldPass(measurement string, tags map[string]strin
return false return false
} }
<<<<<<< HEAD
=======
>>>>>>> jipperinbham-outputs-phase1
if cp.TagDrop != nil { if cp.TagDrop != nil {
for _, pat := range cp.TagDrop { for _, pat := range cp.TagDrop {
if tagval, ok := tags[pat.Name]; ok { if tagval, ok := tags[pat.Name]; ok {
@ -128,10 +118,7 @@ func (cp *ConfiguredPlugin) ShouldPass(measurement string, tags map[string]strin
return true return true
} }
<<<<<<< HEAD
return true return true
=======
return nil
} }
// ApplyOutput loads the toml config into the given interface // ApplyOutput loads the toml config into the given interface
@ -139,7 +126,7 @@ func (c *Config) ApplyOutput(name string, v interface{}) error {
if c.outputs[name] != nil { if c.outputs[name] != nil {
return toml.UnmarshalTable(c.outputs[name], v) return toml.UnmarshalTable(c.outputs[name], v)
} }
>>>>>>> jipperinbham-outputs-phase1 return nil
} }
// ApplyAgent loads the toml config into the given interface // ApplyAgent loads the toml config into the given interface
@ -285,6 +272,7 @@ func LoadConfig(path string) (*Config, error) {
} }
c := &Config{ c := &Config{
Tags: make(map[string]string),
plugins: make(map[string]*ast.Table), plugins: make(map[string]*ast.Table),
outputs: make(map[string]*ast.Table), outputs: make(map[string]*ast.Table),
} }
@ -298,6 +286,10 @@ func LoadConfig(path string) (*Config, error) {
switch name { switch name {
case "agent": case "agent":
c.agent = subtbl c.agent = subtbl
case "tags":
if err := toml.UnmarshalTable(subtbl, c.Tags); err != nil {
return nil, errInvalidConfig
}
case "outputs": case "outputs":
for outputName, outputVal := range subtbl.Fields { for outputName, outputVal := range subtbl.Fields {
outputSubtbl, ok := outputVal.(*ast.Table) outputSubtbl, ok := outputVal.(*ast.Table)
@ -382,10 +374,9 @@ database = "telegraf" # required.
# Set the user agent for the POSTs (can be useful for log differentiation) # Set the user agent for the POSTs (can be useful for log differentiation)
# user_agent = "telegraf" # user_agent = "telegraf"
# Tags can also be specified via a normal map, but only one form at a time: # Tags can be specified via a normal map, but only one form at a time:
[tags]
# [influxdb.tags] dc = "us-east-1"
# tags = { "dc" = "us-east-1" }
# Configuration for telegraf itself # Configuration for telegraf itself
# [agent] # [agent]

10
etc/logrotate.d/telegraf Normal file
View File

@ -0,0 +1,10 @@
/var/log/telegraf/telegraf.log
{
rotate 6
daily
missingok
notifempty
nocreate
compress
}

View File

@ -13,12 +13,12 @@ type InfluxDB struct {
Password string Password string
Database string Database string
UserAgent string UserAgent string
Tags map[string]string Timeout Duration
conn *client.Client conn *client.Client
} }
func (i *InfluxDB) Connect(host string) error { func (i *InfluxDB) Connect() error {
u, err := url.Parse(i.URL) u, err := url.Parse(i.URL)
if err != nil { if err != nil {
return err return err
@ -29,16 +29,20 @@ func (i *InfluxDB) Connect(host string) error {
Username: i.Username, Username: i.Username,
Password: i.Password, Password: i.Password,
UserAgent: i.UserAgent, UserAgent: i.UserAgent,
Timeout: i.Timeout.Duration,
}) })
if err != nil { if err != nil {
return err return err
} }
if i.Tags == nil { _, err = c.Query(client.Query{
i.Tags = make(map[string]string) Command: fmt.Sprintf("CREATE DATABASE telegraf"),
})
if err != nil && !strings.Contains(err.Error(), "database already exists") {
log.Fatal(err)
} }
i.Tags["host"] = host
i.conn = c i.conn = c
return nil return nil

View File

@ -5,7 +5,7 @@ import (
) )
type Output interface { type Output interface {
Connect(string) error Connect() error
Write(client.BatchPoints) error Write(client.BatchPoints) error
} }

View File

@ -167,10 +167,18 @@ do_build() {
for b in ${BINS[*]}; do for b in ${BINS[*]}; do
rm -f $GOPATH_INSTALL/bin/$b rm -f $GOPATH_INSTALL/bin/$b
done done
# If the branch has an upstream, go get switches to master for some reason
# unsetting the upstream causes go get to stay on the current branch, but
# is admittedly a little annoying
git branch --unset-upstream
if [ $? == 0 ]; then
echo "WARNING: upstream branch unset for go get command to work"
fi
go get -u -f ./... go get -u -f ./...
if [ $? -ne 0 ]; then if [ $? -ne 0 ]; then
echo "WARNING: failed to 'go get' packages." echo "WARNING: failed to 'go get' packages."
fi fi
go install -a -ldflags="-X main.Version $version -X main.Commit $commit" ./... go install -a -ldflags="-X main.Version $version -X main.Commit $commit" ./...
if [ $? -ne 0 ]; then if [ $? -ne 0 ]; then
echo "Build failed, unable to create package -- aborting" echo "Build failed, unable to create package -- aborting"

View File

@ -3,13 +3,12 @@ interval = "5s"
http = ":11213" http = ":11213"
debug = true debug = true
[outputs] [influxdb]
[outputs.influxdb]
url = "http://localhost:8086" url = "http://localhost:8086"
username = "root" username = "root"
password = "root" password = "root"
database = "telegraf" database = "telegraf"
tags = { "dc" = "us-phx-1" } tags = { dc = "us-phx-1" }
[redis] [redis]
address = ":6379" address = ":6379"