Overhaul config <-> agent coupling. Put config in its own package.

This commit is contained in:
Cameron Sparr 2015-11-24 14:22:11 -07:00
parent 8dde60e869
commit 979e5f193a
23 changed files with 554 additions and 748 deletions

View File

@ -7,15 +7,31 @@ This only affects the kafka consumer _plugin_ (not the
output). There were a number of problems with the kafka plugin that led to it
only collecting data once at startup, so the kafka plugin was basically
non-functional.
- Plugins can now be specified as a list, and multiple plugin instances of the
same type can be specified, like this:
```
[[plugins.cpu]]
percpu = false
totalcpu = true
[[plugins.cpu]]
percpu = true
totalcpu = false
drop = ["cpu_time"]
```
- Riemann output added
### Features
- [#379](https://github.com/influxdb/telegraf/pull/379): Riemann output, thanks @allenj!
- [#375](https://github.com/influxdb/telegraf/pull/375): kafka_consumer service plugin.
- [#392](https://github.com/influxdb/telegraf/pull/392): Procstat plugin can now accept pgrep -f pattern, thanks @ecarreras!
- [#383](https://github.com/influxdb/telegraf/pull/383): Specify plugins as a list.
### Bugfixes
- [#371](https://github.com/influxdb/telegraf/issues/371): Kafka consumer plugin not functioning.
- [#389](https://github.com/influxdb/telegraf/issues/389): NaN value panic
## v0.2.2 [2015-11-18]

View File

@ -110,37 +110,45 @@ you can configure that here.
This is a full working config that will output CPU data to an InfluxDB instance
at 192.168.59.103:8086, tagging measurements with dc="denver-1". It will output
measurements at a 10s interval and will collect totalcpu & percpu data.
measurements at a 10s interval and will collect per-cpu data, dropping any
measurements which begin with `cpu_time`.
```
[tags]
dc = "denver-1"
dc = "denver-1"
[agent]
interval = "10s"
interval = "10s"
# OUTPUTS
[outputs]
[[outputs.influxdb]]
url = "http://192.168.59.103:8086" # required.
database = "telegraf" # required.
precision = "s"
url = "http://192.168.59.103:8086" # required.
database = "telegraf" # required.
precision = "s"
# PLUGINS
[cpu]
percpu = true
totalcpu = true
[plugins]
[[plugins.cpu]]
percpu = true
totalcpu = false
drop = ["cpu_time"]
```
Below is how to configure `tagpass` and `tagdrop` parameters (added in 0.1.5)
```
# Don't collect CPU data for cpu6 & cpu7
[cpu.tagdrop]
[plugins]
[[plugins.cpu]]
percpu = true
totalcpu = false
drop = ["cpu_time"]
# Don't collect CPU data for cpu6 & cpu7
[plugins.cpu.tagdrop]
cpu = [ "cpu6", "cpu7" ]
[disk]
[disk.tagpass]
[[plugins.disk]]
[plugins.disk.tagpass]
# tagpass conditions are OR, not AND.
# If the (filesystem is ext4 or xfs) OR (the path is /opt or /home)
# then the metric passes
@ -148,6 +156,15 @@ Below is how to configure `tagpass` and `tagdrop` parameters (added in 0.1.5)
path = [ "/opt", "/home" ]
```
Additional plugins (or outputs) of the same type can be specified,
just define another instance in the config file:
```
[[plugins.cpu]]
percpu = false
totalcpu = true
```
## Supported Plugins
**You can view usage instructions for each plugin by running**

View File

@ -7,6 +7,8 @@ import (
"sync"
"time"
"github.com/influxdb/telegraf/internal/config"
"github.com/influxdb/influxdb/client/v2"
)
@ -27,12 +29,12 @@ type Accumulator interface {
}
func NewAccumulator(
plugin *ConfiguredPlugin,
pluginConfig *config.PluginConfig,
points chan *client.Point,
) Accumulator {
acc := accumulator{}
acc.points = points
acc.plugin = plugin
acc.pluginConfig = pluginConfig
return &acc
}
@ -45,7 +47,7 @@ type accumulator struct {
debug bool
plugin *ConfiguredPlugin
pluginConfig *config.PluginConfig
prefix string
}
@ -104,8 +106,8 @@ func (ac *accumulator) AddFields(
measurement = ac.prefix + measurement
}
if ac.plugin != nil {
if !ac.plugin.ShouldPass(measurement) || !ac.plugin.ShouldTagsPass(tags) {
if ac.pluginConfig != nil {
if !ac.pluginConfig.ShouldPass(measurement) || !ac.pluginConfig.ShouldTagsPass(tags) {
return
}
}

175
agent.go
View File

@ -6,30 +6,17 @@ import (
"log"
"math/big"
"os"
"sort"
"strings"
"sync"
"time"
"github.com/influxdb/telegraf/internal"
"github.com/influxdb/telegraf/internal/config"
"github.com/influxdb/telegraf/outputs"
"github.com/influxdb/telegraf/plugins"
"github.com/influxdb/influxdb/client/v2"
)
// runningOutput pairs an instantiated output with the name it was
// declared under in the config file.
type runningOutput struct {
name string
output outputs.Output
}
// runningPlugin pairs an instantiated plugin with its declared name,
// its filter name (the declared name with any trailing "-<digits>" ID
// stripped, see LoadPlugins), and its parsed per-plugin configuration.
type runningPlugin struct {
name string
filtername string
plugin plugins.Plugin
config *ConfiguredPlugin
}
// Agent runs telegraf and collects data based on the given config
type Agent struct {
@ -66,14 +53,11 @@ type Agent struct {
Tags map[string]string
Config *Config
outputs []*runningOutput
plugins []*runningPlugin
Config *config.Config
}
// NewAgent returns an Agent struct based off the given Config
func NewAgent(config *Config) (*Agent, error) {
func NewAgent(config *config.Config) (*Agent, error) {
agent := &Agent{
Tags: make(map[string]string),
Config: config,
@ -110,30 +94,30 @@ func NewAgent(config *Config) (*Agent, error) {
// Connect connects to all configured outputs
func (a *Agent) Connect() error {
for _, o := range a.outputs {
switch ot := o.output.(type) {
for _, o := range a.Config.Outputs {
switch ot := o.Output.(type) {
case outputs.ServiceOutput:
if err := ot.Start(); err != nil {
log.Printf("Service for output %s failed to start, exiting\n%s\n",
o.name, err.Error())
o.Name, err.Error())
return err
}
}
if a.Debug {
log.Printf("Attempting connection to output: %s\n", o.name)
log.Printf("Attempting connection to output: %s\n", o.Name)
}
err := o.output.Connect()
err := o.Output.Connect()
if err != nil {
log.Printf("Failed to connect to output %s, retrying in 15s\n", o.name)
log.Printf("Failed to connect to output %s, retrying in 15s\n", o.Name)
time.Sleep(15 * time.Second)
err = o.output.Connect()
err = o.Output.Connect()
if err != nil {
return err
}
}
if a.Debug {
log.Printf("Successfully connected to output: %s\n", o.name)
log.Printf("Successfully connected to output: %s\n", o.Name)
}
}
return nil
@ -142,9 +126,9 @@ func (a *Agent) Connect() error {
// Close closes the connection to all configured outputs
func (a *Agent) Close() error {
var err error
for _, o := range a.outputs {
err = o.output.Close()
switch ot := o.output.(type) {
for _, o := range a.Config.Outputs {
err = o.Output.Close()
switch ot := o.Output.(type) {
case outputs.ServiceOutput:
ot.Stop()
}
@ -152,67 +136,6 @@ func (a *Agent) Close() error {
return err
}
// LoadOutputs instantiates every output declared in the config whose
// filter name matches one of the given filters (an empty filter list
// matches all declared outputs), applies its TOML configuration, and
// registers it on the agent. It returns the sorted names of the
// outputs that were loaded.
func (a *Agent) LoadOutputs(filters []string) ([]string, error) {
var names []string
for _, name := range a.Config.OutputsDeclared() {
// Trim the ID off the output name for filtering
// (declared names carry a numeric suffix such as "influxdb-0";
// the outputs registry is keyed without it).
filtername := strings.TrimRight(name, "-0123456789")
creator, ok := outputs.Outputs[filtername]
if !ok {
return nil, fmt.Errorf("Undefined but requested output: %s", name)
}
if sliceContains(filtername, filters) || len(filters) == 0 {
output := creator()
// Unmarshal this output's config-file table onto the new instance.
err := a.Config.ApplyOutput(name, output)
if err != nil {
return nil, err
}
a.outputs = append(a.outputs, &runningOutput{name, output})
names = append(names, name)
}
}
sort.Strings(names)
return names, nil
}
// LoadPlugins instantiates every plugin declared in the config whose
// filter name matches one of the given filters (an empty filter list
// matches all declared plugins), applies its TOML configuration, and
// registers it on the agent. It returns the sorted names of the
// plugins that were loaded.
func (a *Agent) LoadPlugins(filters []string) ([]string, error) {
var names []string
for _, name := range a.Config.PluginsDeclared() {
// Trim the ID off the output name for filtering
// (declared names carry a numeric suffix such as "memcached-0";
// the plugins registry is keyed without it).
filtername := strings.TrimRight(name, "-0123456789")
creator, ok := plugins.Plugins[filtername]
if !ok {
return nil, fmt.Errorf("Undefined but requested plugin: %s", name)
}
if sliceContains(filtername, filters) || len(filters) == 0 {
plugin := creator()
// ApplyPlugin both configures the instance and returns the
// parsed filter/interval metadata kept alongside it.
config, err := a.Config.ApplyPlugin(name, plugin)
if err != nil {
return nil, err
}
a.plugins = append(a.plugins,
&runningPlugin{name, filtername, plugin, config})
names = append(names, name)
}
}
sort.Strings(names)
return names, nil
}
// gatherParallel runs the plugins that are using the same reporting interval
// as the telegraf agent.
func (a *Agent) gatherParallel(pointChan chan *client.Point) error {
@ -220,23 +143,23 @@ func (a *Agent) gatherParallel(pointChan chan *client.Point) error {
start := time.Now()
counter := 0
for _, plugin := range a.plugins {
if plugin.config.Interval != 0 {
for _, plugin := range a.Config.Plugins {
if plugin.Config.Interval != 0 {
continue
}
wg.Add(1)
counter++
go func(plugin *runningPlugin) {
go func(plugin *config.RunningPlugin) {
defer wg.Done()
acc := NewAccumulator(plugin.config, pointChan)
acc := NewAccumulator(plugin.Config, pointChan)
acc.SetDebug(a.Debug)
acc.SetPrefix(plugin.filtername + "_")
acc.SetPrefix(plugin.Name + "_")
acc.SetDefaultTags(a.Tags)
if err := plugin.plugin.Gather(acc); err != nil {
log.Printf("Error in plugin [%s]: %s", plugin.name, err)
if err := plugin.Plugin.Gather(acc); err != nil {
log.Printf("Error in plugin [%s]: %s", plugin.Name, err)
}
}(plugin)
@ -254,27 +177,27 @@ func (a *Agent) gatherParallel(pointChan chan *client.Point) error {
// reporting interval.
func (a *Agent) gatherSeparate(
shutdown chan struct{},
plugin *runningPlugin,
plugin *config.RunningPlugin,
pointChan chan *client.Point,
) error {
ticker := time.NewTicker(plugin.config.Interval)
ticker := time.NewTicker(plugin.Config.Interval)
for {
var outerr error
start := time.Now()
acc := NewAccumulator(plugin.config, pointChan)
acc := NewAccumulator(plugin.Config, pointChan)
acc.SetDebug(a.Debug)
acc.SetPrefix(plugin.filtername + "_")
acc.SetPrefix(plugin.Name + "_")
acc.SetDefaultTags(a.Tags)
if err := plugin.plugin.Gather(acc); err != nil {
log.Printf("Error in plugin [%s]: %s", plugin.name, err)
if err := plugin.Plugin.Gather(acc); err != nil {
log.Printf("Error in plugin [%s]: %s", plugin.Name, err)
}
elapsed := time.Since(start)
log.Printf("Gathered metrics, (separate %s interval), from %s in %s\n",
plugin.config.Interval, plugin.name, elapsed)
plugin.Config.Interval, plugin.Name, elapsed)
if outerr != nil {
return outerr
@ -308,27 +231,27 @@ func (a *Agent) Test() error {
}
}()
for _, plugin := range a.plugins {
acc := NewAccumulator(plugin.config, pointChan)
for _, plugin := range a.Config.Plugins {
acc := NewAccumulator(plugin.Config, pointChan)
acc.SetDebug(true)
acc.SetPrefix(plugin.filtername + "_")
acc.SetPrefix(plugin.Name + "_")
fmt.Printf("* Plugin: %s, Collection 1\n", plugin.name)
if plugin.config.Interval != 0 {
fmt.Printf("* Internal: %s\n", plugin.config.Interval)
fmt.Printf("* Plugin: %s, Collection 1\n", plugin.Name)
if plugin.Config.Interval != 0 {
fmt.Printf("* Internal: %s\n", plugin.Config.Interval)
}
if err := plugin.plugin.Gather(acc); err != nil {
if err := plugin.Plugin.Gather(acc); err != nil {
return err
}
// Special instructions for some plugins. cpu, for example, needs to be
// run twice in order to return cpu usage percentages.
switch plugin.filtername {
switch plugin.Name {
case "cpu", "mongodb":
time.Sleep(500 * time.Millisecond)
fmt.Printf("* Plugin: %s, Collection 2\n", plugin.name)
if err := plugin.plugin.Gather(acc); err != nil {
fmt.Printf("* Plugin: %s, Collection 2\n", plugin.Name)
if err := plugin.Plugin.Gather(acc); err != nil {
return err
}
}
@ -341,7 +264,7 @@ func (a *Agent) Test() error {
// Optionally takes a `done` channel to indicate that it is done writing.
func (a *Agent) writeOutput(
points []*client.Point,
ro *runningOutput,
ro *config.RunningOutput,
shutdown chan struct{},
wg *sync.WaitGroup,
) {
@ -354,12 +277,12 @@ func (a *Agent) writeOutput(
start := time.Now()
for {
err := ro.output.Write(points)
err := ro.Output.Write(points)
if err == nil {
// Write successful
elapsed := time.Since(start)
log.Printf("Flushed %d metrics to output %s in %s\n",
len(points), ro.name, elapsed)
len(points), ro.Name, elapsed)
return
}
@ -371,12 +294,12 @@ func (a *Agent) writeOutput(
// No more retries
msg := "FATAL: Write to output [%s] failed %d times, dropping" +
" %d metrics\n"
log.Printf(msg, ro.name, retries+1, len(points))
log.Printf(msg, ro.Name, retries+1, len(points))
return
} else if err != nil {
// Sleep for a retry
log.Printf("Error in output [%s]: %s, retrying in %s",
ro.name, err.Error(), a.FlushInterval.Duration)
ro.Name, err.Error(), a.FlushInterval.Duration)
time.Sleep(a.FlushInterval.Duration)
}
}
@ -392,7 +315,7 @@ func (a *Agent) flush(
wait bool,
) {
var wg sync.WaitGroup
for _, o := range a.outputs {
for _, o := range a.Config.Outputs {
wg.Add(1)
go a.writeOutput(points, o, shutdown, &wg)
}
@ -476,14 +399,14 @@ func (a *Agent) Run(shutdown chan struct{}) error {
}
}()
for _, plugin := range a.plugins {
for _, plugin := range a.Config.Plugins {
// Start service of any ServicePlugins
switch p := plugin.plugin.(type) {
switch p := plugin.Plugin.(type) {
case plugins.ServicePlugin:
if err := p.Start(); err != nil {
log.Printf("Service for plugin %s failed to start, exiting\n%s\n",
plugin.name, err.Error())
plugin.Name, err.Error())
return err
}
defer p.Stop()
@ -491,9 +414,9 @@ func (a *Agent) Run(shutdown chan struct{}) error {
// Special handling for plugins that have their own collection interval
// configured. Default intervals are handled below with gatherParallel
if plugin.config.Interval != 0 {
if plugin.Config.Interval != 0 {
wg.Add(1)
go func(plugin *runningPlugin) {
go func(plugin *config.RunningPlugin) {
defer wg.Done()
if err := a.gatherSeparate(shutdown, plugin, pointChan); err != nil {
log.Printf(err.Error())

View File

@ -6,6 +6,7 @@ import (
"time"
"github.com/influxdb/telegraf/internal"
"github.com/influxdb/telegraf/internal/config"
// needing to load the plugins
_ "github.com/influxdb/telegraf/plugins/all"
@ -14,49 +15,73 @@ import (
)
func TestAgent_LoadPlugin(t *testing.T) {
c := config.NewConfig()
c.PluginFilters = []string{"mysql"}
c.LoadConfig("./internal/config/testdata/telegraf-agent.toml")
a, _ := NewAgent(c)
assert.Equal(t, 1, len(a.Config.Plugins))
// load a dedicated configuration file
config, _ := LoadConfig("./testdata/telegraf-agent.toml")
a, _ := NewAgent(config)
c = config.NewConfig()
c.PluginFilters = []string{"foo"}
c.LoadConfig("./internal/config/testdata/telegraf-agent.toml")
a, _ = NewAgent(c)
assert.Equal(t, 0, len(a.Config.Plugins))
pluginsEnabled, _ := a.LoadPlugins([]string{"mysql"}, config)
assert.Equal(t, 1, len(pluginsEnabled))
c = config.NewConfig()
c.PluginFilters = []string{"mysql", "foo"}
c.LoadConfig("./internal/config/testdata/telegraf-agent.toml")
a, _ = NewAgent(c)
assert.Equal(t, 1, len(a.Config.Plugins))
pluginsEnabled, _ = a.LoadPlugins([]string{"foo"}, config)
assert.Equal(t, 0, len(pluginsEnabled))
c = config.NewConfig()
c.PluginFilters = []string{"mysql", "redis"}
c.LoadConfig("./internal/config/testdata/telegraf-agent.toml")
a, _ = NewAgent(c)
assert.Equal(t, 2, len(a.Config.Plugins))
pluginsEnabled, _ = a.LoadPlugins([]string{"mysql", "foo"}, config)
assert.Equal(t, 1, len(pluginsEnabled))
pluginsEnabled, _ = a.LoadPlugins([]string{"mysql", "redis"}, config)
assert.Equal(t, 2, len(pluginsEnabled))
pluginsEnabled, _ = a.LoadPlugins([]string{"mysql", "foo", "redis", "bar"}, config)
assert.Equal(t, 2, len(pluginsEnabled))
c = config.NewConfig()
c.PluginFilters = []string{"mysql", "foo", "redis", "bar"}
c.LoadConfig("./internal/config/testdata/telegraf-agent.toml")
a, _ = NewAgent(c)
assert.Equal(t, 2, len(a.Config.Plugins))
}
func TestAgent_LoadOutput(t *testing.T) {
// load a dedicated configuration file
config, _ := LoadConfig("./testdata/telegraf-agent.toml")
a, _ := NewAgent(config)
c := config.NewConfig()
c.OutputFilters = []string{"influxdb"}
c.LoadConfig("./internal/config/testdata/telegraf-agent.toml")
a, _ := NewAgent(c)
assert.Equal(t, 2, len(a.Config.Outputs))
outputsEnabled, _ := a.LoadOutputs([]string{"influxdb"}, config)
assert.Equal(t, 2, len(outputsEnabled))
c = config.NewConfig()
c.OutputFilters = []string{}
c.LoadConfig("./internal/config/testdata/telegraf-agent.toml")
a, _ = NewAgent(c)
assert.Equal(t, 3, len(a.Config.Outputs))
outputsEnabled, _ = a.LoadOutputs([]string{}, config)
assert.Equal(t, 3, len(outputsEnabled))
c = config.NewConfig()
c.OutputFilters = []string{"foo"}
c.LoadConfig("./internal/config/testdata/telegraf-agent.toml")
a, _ = NewAgent(c)
assert.Equal(t, 0, len(a.Config.Outputs))
outputsEnabled, _ = a.LoadOutputs([]string{"foo"}, config)
assert.Equal(t, 0, len(outputsEnabled))
c = config.NewConfig()
c.OutputFilters = []string{"influxdb", "foo"}
c.LoadConfig("./internal/config/testdata/telegraf-agent.toml")
a, _ = NewAgent(c)
assert.Equal(t, 2, len(a.Config.Outputs))
outputsEnabled, _ = a.LoadOutputs([]string{"influxdb", "foo"}, config)
assert.Equal(t, 2, len(outputsEnabled))
c = config.NewConfig()
c.OutputFilters = []string{"influxdb", "kafka"}
c.LoadConfig("./internal/config/testdata/telegraf-agent.toml")
a, _ = NewAgent(c)
assert.Equal(t, 3, len(a.Config.Outputs))
outputsEnabled, _ = a.LoadOutputs([]string{"influxdb", "kafka"}, config)
assert.Equal(t, 3, len(outputsEnabled))
outputsEnabled, _ = a.LoadOutputs([]string{"influxdb", "foo", "kafka", "bar"}, config)
assert.Equal(t, 3, len(outputsEnabled))
c = config.NewConfig()
c.OutputFilters = []string{"influxdb", "foo", "kafka", "bar"}
c.LoadConfig("./internal/config/testdata/telegraf-agent.toml")
a, _ = NewAgent(c)
assert.Equal(t, 3, len(a.Config.Outputs))
}
func TestAgent_ZeroJitter(t *testing.T) {

View File

@ -9,6 +9,7 @@ import (
"strings"
"github.com/influxdb/telegraf"
"github.com/influxdb/telegraf/internal/config"
_ "github.com/influxdb/telegraf/outputs/all"
_ "github.com/influxdb/telegraf/plugins/all"
)
@ -56,13 +57,13 @@ func main() {
}
if *fSampleConfig {
telegraf.PrintSampleConfig(pluginFilters, outputFilters)
config.PrintSampleConfig(pluginFilters, outputFilters)
return
}
if *fUsage != "" {
if err := telegraf.PrintPluginConfig(*fUsage); err != nil {
if err2 := telegraf.PrintOutputConfig(*fUsage); err2 != nil {
if err := config.PrintPluginConfig(*fUsage); err != nil {
if err2 := config.PrintOutputConfig(*fUsage); err2 != nil {
log.Fatalf("%s and %s", err, err2)
}
}
@ -70,13 +71,15 @@ func main() {
}
var (
config *telegraf.Config
err error
c *config.Config
err error
)
if *fConfig != "" {
config = telegraf.NewConfig()
err = config.LoadConfig(*fConfig)
c = config.NewConfig()
c.OutputFilters = outputFilters
c.PluginFilters = pluginFilters
err = c.LoadConfig(*fConfig)
if err != nil {
log.Fatal(err)
}
@ -87,13 +90,19 @@ func main() {
}
if *fConfigDirectory != "" {
err = config.LoadDirectory(*fConfigDirectory)
err = c.LoadDirectory(*fConfigDirectory)
if err != nil {
log.Fatal(err)
}
}
if len(c.Outputs) == 0 {
log.Fatalf("Error: no outputs found, did you provide a valid config file?")
}
if len(c.Plugins) == 0 {
log.Fatalf("Error: no plugins found, did you provide a valid config file?")
}
ag, err := telegraf.NewAgent(config)
ag, err := telegraf.NewAgent(c)
if err != nil {
log.Fatal(err)
}
@ -102,24 +111,6 @@ func main() {
ag.Debug = true
}
outputs, err := ag.LoadOutputs(outputFilters)
if err != nil {
log.Fatal(err)
}
if len(outputs) == 0 {
log.Printf("Error: no outputs found, did you provide a valid config file?")
os.Exit(1)
}
plugins, err := ag.LoadPlugins(pluginFilters)
if err != nil {
log.Fatal(err)
}
if len(plugins) == 0 {
log.Printf("Error: no plugins found, did you provide a valid config file?")
os.Exit(1)
}
if *fTest {
err = ag.Test()
if err != nil {
@ -142,9 +133,9 @@ func main() {
}()
log.Printf("Starting Telegraf (version %s)\n", Version)
log.Printf("Loaded outputs: %s", strings.Join(outputs, " "))
log.Printf("Loaded plugins: %s", strings.Join(plugins, " "))
log.Printf("Tags enabled: %s", config.ListTags())
log.Printf("Loaded outputs: %s", strings.Join(c.OutputNames(), " "))
log.Printf("Loaded plugins: %s", strings.Join(c.PluginNames(), " "))
log.Printf("Tags enabled: %s", c.ListTags())
if *fPidfile != "" {
f, err := os.Create(*fPidfile)

View File

@ -1,307 +0,0 @@
package telegraf
import (
"fmt"
"io/ioutil"
"testing"
"time"
"github.com/influxdb/telegraf/plugins"
"github.com/influxdb/telegraf/plugins/exec"
"github.com/influxdb/telegraf/plugins/memcached"
"github.com/influxdb/telegraf/plugins/procstat"
"github.com/naoina/toml"
"github.com/naoina/toml/ast"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
)
// subTest is a nested fixture struct used to exercise merging of
// table-array ([]struct) fields in the mergeStruct tests.
type subTest struct {
AField string
AnotherField int
}
// test is the fixture struct merged by MergeStructSuite; it covers
// string, integer, float, boolean (with an explicit toml tag), time,
// string-slice, and table-array fields.
type test struct {
StringField string
IntegerField int
FloatField float32
BooleansField bool `toml:"boolean_field"`
DatetimeField time.Time
ArrayField []string
TableArrayField []subTest
}
// MergeStructSuite is a testify suite holding the fixture structs for
// the mergeStruct tests.
type MergeStructSuite struct {
suite.Suite
EmptyStruct *test // target with empty (non-nil) slices, rebuilt per test
FullStruct *test // fully populated struct, rebuilt per test
AnotherFullStruct *test // second fully populated struct, all values differ
AllFields []string // toml names of every mergeable field, set in SetupSuite
}
// SetupSuite runs once and records the toml field names of the test
// struct, for merges that are meant to cover every field.
func (s *MergeStructSuite) SetupSuite() {
s.AllFields = []string{"string_field", "integer_field", "float_field",
"boolean_field", "date_time_field", "array_field", "table_array_field"}
}
// SetupTest rebuilds the three fixture structs before every test so an
// in-place merge performed by one test cannot leak into the next.
func (s *MergeStructSuite) SetupTest() {
// Empty target: slices initialized but zero-length.
s.EmptyStruct = &test{
ArrayField: []string{},
TableArrayField: []subTest{},
}
// First fully populated struct.
s.FullStruct = &test{
StringField: "one",
IntegerField: 1,
FloatField: 1.1,
BooleansField: false,
DatetimeField: time.Date(1963, time.August, 28, 17, 0, 0, 0, time.UTC),
ArrayField: []string{"one", "two", "three"},
TableArrayField: []subTest{
subTest{
AField: "one",
AnotherField: 1,
},
subTest{
AField: "two",
AnotherField: 2,
},
},
}
// Second fully populated struct; every field differs from FullStruct.
s.AnotherFullStruct = &test{
StringField: "two",
IntegerField: 2,
FloatField: 2.2,
BooleansField: true,
DatetimeField: time.Date(1965, time.March, 25, 17, 0, 0, 0, time.UTC),
ArrayField: []string{"four", "five", "six"},
TableArrayField: []subTest{
subTest{
AField: "three",
AnotherField: 3,
},
subTest{
AField: "four",
AnotherField: 4,
},
},
}
}
// TestEmptyMerge checks that merging a full struct onto an empty one
// over all fields copies every field.
func (s *MergeStructSuite) TestEmptyMerge() {
err := mergeStruct(s.EmptyStruct, s.FullStruct, s.AllFields)
if err != nil {
s.T().Error(err)
}
s.Equal(s.FullStruct, s.EmptyStruct,
fmt.Sprintf("Full merge of %v onto an empty struct failed.", s.FullStruct))
}
// TestFullMerge checks that merging AnotherFullStruct onto FullStruct
// over all fields overwrites every field with AnotherFullStruct's
// values (result mirrors AnotherFullStruct as built in SetupTest).
func (s *MergeStructSuite) TestFullMerge() {
result := &test{
StringField: "two",
IntegerField: 2,
FloatField: 2.2,
BooleansField: true,
DatetimeField: time.Date(1965, time.March, 25, 17, 0, 0, 0, time.UTC),
ArrayField: []string{"four", "five", "six"},
TableArrayField: []subTest{
subTest{
AField: "three",
AnotherField: 3,
},
subTest{
AField: "four",
AnotherField: 4,
},
},
}
err := mergeStruct(s.FullStruct, s.AnotherFullStruct, s.AllFields)
if err != nil {
s.T().Error(err)
}
s.Equal(result, s.FullStruct,
fmt.Sprintf("Full merge of %v onto FullStruct failed.", s.AnotherFullStruct))
}
// TestPartialMergeWithoutSlices merges only the listed scalar fields
// (string, float, datetime); all other fields — including both slice
// fields — must keep FullStruct's original values.
func (s *MergeStructSuite) TestPartialMergeWithoutSlices() {
result := &test{
StringField: "two",
IntegerField: 1,
FloatField: 2.2,
BooleansField: false,
DatetimeField: time.Date(1965, time.March, 25, 17, 0, 0, 0, time.UTC),
ArrayField: []string{"one", "two", "three"},
TableArrayField: []subTest{
subTest{
AField: "one",
AnotherField: 1,
},
subTest{
AField: "two",
AnotherField: 2,
},
},
}
err := mergeStruct(s.FullStruct, s.AnotherFullStruct,
[]string{"string_field", "float_field", "date_time_field"})
if err != nil {
s.T().Error(err)
}
s.Equal(result, s.FullStruct,
fmt.Sprintf("Partial merge without slices of %v onto FullStruct failed.",
s.AnotherFullStruct))
}
// TestPartialMergeWithSlices adds table_array_field to the merged field
// list: the table array must be replaced by AnotherFullStruct's, while
// array_field (not listed) keeps FullStruct's values.
func (s *MergeStructSuite) TestPartialMergeWithSlices() {
result := &test{
StringField: "two",
IntegerField: 1,
FloatField: 2.2,
BooleansField: false,
DatetimeField: time.Date(1965, time.March, 25, 17, 0, 0, 0, time.UTC),
ArrayField: []string{"one", "two", "three"},
TableArrayField: []subTest{
subTest{
AField: "three",
AnotherField: 3,
},
subTest{
AField: "four",
AnotherField: 4,
},
},
}
err := mergeStruct(s.FullStruct, s.AnotherFullStruct,
[]string{"string_field", "float_field", "date_time_field", "table_array_field"})
if err != nil {
s.T().Error(err)
}
s.Equal(result, s.FullStruct,
fmt.Sprintf("Partial merge with slices of %v onto FullStruct failed.",
s.AnotherFullStruct))
}
func TestConfig_mergeStruct(t *testing.T) {
suite.Run(t, new(MergeStructSuite))
}
// TestConfig_parsePlugin verifies that parsing a single-plugin TOML
// file populates both the plugin struct itself and its ConfiguredPlugin
// metadata (pass/drop lists, tag filters, interval).
func TestConfig_parsePlugin(t *testing.T) {
	data, err := ioutil.ReadFile("./testdata/single_plugin.toml")
	if err != nil {
		t.Error(err)
	}

	tbl, err := toml.Parse(data)
	if err != nil {
		t.Error(err)
	}

	c := &Config{
		plugins:                      make(map[string]plugins.Plugin),
		pluginConfigurations:         make(map[string]*ConfiguredPlugin),
		pluginFieldsSet:              make(map[string][]string),
		pluginConfigurationFieldsSet: make(map[string][]string),
	}

	subtbl := tbl.Fields["memcached"].(*ast.Table)
	err = c.parsePlugin("memcached", subtbl, 0)
	// Bug fix: this error was previously assigned and silently dropped.
	// Surface a parse failure here instead of as confusing assertion
	// mismatches below.
	if err != nil {
		t.Error(err)
	}

	memcached := plugins.Plugins["memcached"]().(*memcached.Memcached)
	memcached.Servers = []string{"localhost"}

	mConfig := &ConfiguredPlugin{
		Name: "memcached",
		Drop: []string{"other", "stuff"},
		Pass: []string{"some", "strings"},
		TagDrop: []TagFilter{
			TagFilter{
				Name:   "badtag",
				Filter: []string{"othertag"},
			},
		},
		TagPass: []TagFilter{
			TagFilter{
				Name:   "goodtag",
				Filter: []string{"mytag"},
			},
		},
		Interval: 5 * time.Second,
	}

	assert.Equal(t, memcached, c.plugins["memcached-0"],
		"Testdata did not produce a correct memcached struct.")
	assert.Equal(t, mConfig, c.pluginConfigurations["memcached-0"],
		"Testdata did not produce correct memcached metadata.")
}
// TestConfig_LoadDirectory loads a base config file, overlays a
// directory of sub-config files, and verifies the merged plugin structs
// (memcached, exec, procstat) and their ConfiguredPlugin metadata.
func TestConfig_LoadDirectory(t *testing.T) {
c, err := LoadConfig("./testdata/telegraf-agent.toml")
if err != nil {
t.Error(err)
}
err = c.LoadDirectory("./testdata/subconfig")
if err != nil {
t.Error(err)
}
// Expected memcached state: the subconfig's server list replaces the
// base file's, while the filter metadata is retained.
memcached := plugins.Plugins["memcached"]().(*memcached.Memcached)
memcached.Servers = []string{"192.168.1.1"}
mConfig := &ConfiguredPlugin{
Name: "memcached",
Drop: []string{"other", "stuff"},
Pass: []string{"some", "strings"},
TagDrop: []TagFilter{
TagFilter{
Name: "badtag",
Filter: []string{"othertag"},
},
},
TagPass: []TagFilter{
TagFilter{
Name: "goodtag",
Filter: []string{"mytag"},
},
},
Interval: 5 * time.Second,
}
// Expected exec plugin: a single command, no filter metadata.
ex := plugins.Plugins["exec"]().(*exec.Exec)
ex.Commands = []*exec.Command{
&exec.Command{
Command: "/usr/bin/myothercollector --foo=bar",
Name: "myothercollector",
},
}
eConfig := &ConfiguredPlugin{Name: "exec"}
// Expected procstat plugin: two PID-file specifications.
pstat := plugins.Plugins["procstat"]().(*procstat.Procstat)
pstat.Specifications = []*procstat.Specification{
&procstat.Specification{
PidFile: "/var/run/grafana-server.pid",
},
&procstat.Specification{
PidFile: "/var/run/influxdb/influxd.pid",
},
}
pConfig := &ConfiguredPlugin{Name: "procstat"}
assert.Equal(t, memcached, c.plugins["memcached-0"],
"Merged Testdata did not produce a correct memcached struct.")
assert.Equal(t, mConfig, c.pluginConfigurations["memcached-0"],
"Merged Testdata did not produce correct memcached metadata.")
assert.Equal(t, ex, c.plugins["exec-0"],
"Merged Testdata did not produce a correct exec struct.")
assert.Equal(t, eConfig, c.pluginConfigurations["exec-0"],
"Merged Testdata did not produce correct exec metadata.")
assert.Equal(t, pstat, c.plugins["procstat-0"],
"Merged Testdata did not produce a correct procstat struct.")
assert.Equal(t, pConfig, c.pluginConfigurations["procstat-0"],
"Merged Testdata did not produce correct procstat metadata.")
}

View File

@ -1,4 +1,4 @@
package telegraf
package config
import (
"errors"
@ -12,6 +12,7 @@ import (
"github.com/influxdb/telegraf/outputs"
"github.com/influxdb/telegraf/plugins"
"github.com/naoina/toml"
"github.com/naoina/toml/ast"
)
@ -20,18 +21,22 @@ import (
// will be logging to, as well as all the plugins that the user has
// specified
type Config struct {
Tags map[string]string
Tags map[string]string
PluginFilters []string
OutputFilters []string
agent *ast.Table
plugins map[string]*ast.Table
outputs map[string]*ast.Table
Agent *ast.Table
Plugins []*RunningPlugin
Outputs []*RunningOutput
}
func NewConfig() *Config {
c := &Config{
Tags: make(map[string]string),
plugins: make(map[string]*ast.Table),
outputs: make(map[string]*ast.Table),
Tags: make(map[string]string),
Plugins: make([]*RunningPlugin, 0),
Outputs: make([]*RunningOutput, 0),
PluginFilters: make([]string, 0),
OutputFilters: make([]string, 0),
}
return c
}
@ -42,9 +47,20 @@ type TagFilter struct {
Filter []string
}
// ConfiguredPlugin containing a name, interval, and drop/pass prefix lists
type RunningOutput struct {
Name string
Output outputs.Output
}
type RunningPlugin struct {
Name string
Plugin plugins.Plugin
Config *PluginConfig
}
// PluginConfig containing a name, interval, and drop/pass prefix lists
// Also lists the tags to filter
type ConfiguredPlugin struct {
type PluginConfig struct {
Name string
Drop []string
@ -58,7 +74,7 @@ type ConfiguredPlugin struct {
// ShouldPass returns true if the metric should pass, false if should drop
// based on the drop/pass plugin parameters
func (cp *ConfiguredPlugin) ShouldPass(measurement string) bool {
func (cp *PluginConfig) ShouldPass(measurement string) bool {
if cp.Pass != nil {
for _, pat := range cp.Pass {
if strings.HasPrefix(measurement, pat) {
@ -82,7 +98,7 @@ func (cp *ConfiguredPlugin) ShouldPass(measurement string) bool {
// ShouldTagsPass returns true if the metric should pass, false if should drop
// based on the tagdrop/tagpass plugin parameters
func (cp *ConfiguredPlugin) ShouldTagsPass(tags map[string]string) bool {
func (cp *PluginConfig) ShouldTagsPass(tags map[string]string) bool {
if cp.TagPass != nil {
for _, pat := range cp.TagPass {
if tagval, ok := tags[pat.Name]; ok {
@ -112,138 +128,34 @@ func (cp *ConfiguredPlugin) ShouldTagsPass(tags map[string]string) bool {
return true
}
// ApplyOutput loads the toml config into the given interface
func (c *Config) ApplyOutput(name string, v interface{}) error {
if c.outputs[name] != nil {
return toml.UnmarshalTable(c.outputs[name], v)
// PluginNames returns a list of strings of the configured plugins.
func (c *Config) PluginNames() []string {
var name []string
for _, plugin := range c.Plugins {
name = append(name, plugin.Name)
}
return nil
return name
}
// OutputNames returns the names of all outputs configured on c.
func (c *Config) OutputNames() []string {
	var names []string
	for _, o := range c.Outputs {
		names = append(names, o.Name)
	}
	return names
}
// ApplyAgent loads the toml config into the given Agent object, overriding
// defaults (such as collection duration) with the values from the toml config.
func (c *Config) ApplyAgent(a *Agent) error {
if c.agent != nil {
return toml.UnmarshalTable(c.agent, a)
func (c *Config) ApplyAgent(a interface{}) error {
if c.Agent != nil {
return toml.UnmarshalTable(c.Agent, a)
}
return nil
}
// ApplyPlugin takes defined plugin names and applies them to the given
// interface, returning a ConfiguredPlugin object in the end that can
// be inserted into a runningPlugin by the agent.
func (c *Config) ApplyPlugin(name string, v interface{}) (*ConfiguredPlugin, error) {
cp := &ConfiguredPlugin{Name: name}
if tbl, ok := c.plugins[name]; ok {
if node, ok := tbl.Fields["pass"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
cp.Pass = append(cp.Pass, str.Value)
}
}
}
}
}
if node, ok := tbl.Fields["drop"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
cp.Drop = append(cp.Drop, str.Value)
}
}
}
}
}
if node, ok := tbl.Fields["interval"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
dur, err := time.ParseDuration(str.Value)
if err != nil {
return nil, err
}
cp.Interval = dur
}
}
}
if node, ok := tbl.Fields["tagpass"]; ok {
if subtbl, ok := node.(*ast.Table); ok {
for name, val := range subtbl.Fields {
if kv, ok := val.(*ast.KeyValue); ok {
tagfilter := &TagFilter{Name: name}
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
tagfilter.Filter = append(tagfilter.Filter, str.Value)
}
}
}
cp.TagPass = append(cp.TagPass, *tagfilter)
}
}
}
}
if node, ok := tbl.Fields["tagdrop"]; ok {
if subtbl, ok := node.(*ast.Table); ok {
for name, val := range subtbl.Fields {
if kv, ok := val.(*ast.KeyValue); ok {
tagfilter := &TagFilter{Name: name}
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
tagfilter.Filter = append(tagfilter.Filter, str.Value)
}
}
}
cp.TagDrop = append(cp.TagDrop, *tagfilter)
}
}
}
}
delete(tbl.Fields, "drop")
delete(tbl.Fields, "pass")
delete(tbl.Fields, "interval")
delete(tbl.Fields, "tagdrop")
delete(tbl.Fields, "tagpass")
return cp, toml.UnmarshalTable(tbl, v)
}
return cp, nil
}
// PluginsDeclared returns the names of all plugins declared in the config,
// sorted alphabetically.
func (c *Config) PluginsDeclared() []string {
	names := declared(c.plugins)
	return names
}
// OutputsDeclared returns the names of all outputs declared in the config,
// sorted alphabetically.
func (c *Config) OutputsDeclared() []string {
	names := declared(c.outputs)
	return names
}
// declared returns the sorted key names of the given endpoint table map.
// The result slice is pre-sized to avoid repeated growth during append.
func declared(endpoints map[string]*ast.Table) []string {
	names := make([]string, 0, len(endpoints))
	for name := range endpoints {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}
// ListTags returns a string of tags specified in the config,
// line-protocol style
func (c *Config) ListTags() string {
@ -258,14 +170,6 @@ func (c *Config) ListTags() string {
return strings.Join(tags, " ")
}
// hasConfig is satisfied by plugins/outputs that expose an additional
// basic configuration snippet.
type hasConfig interface {
	BasicConfig() string
}

// hasDescr is satisfied by plugins/outputs that expose a one-line
// human-readable description of themselves.
type hasDescr interface {
	Description() string
}
var header = `# Telegraf configuration
# Telegraf is entirely plugin driven. All metrics are gathered from the
@ -353,15 +257,7 @@ func PrintSampleConfig(pluginFilters []string, outputFilters []string) {
for _, oname := range onames {
creator := outputs.Outputs[oname]
output := creator()
fmt.Printf("\n# %s\n[[outputs.%s]]", output.Description(), oname)
config := output.SampleConfig()
if config == "" {
fmt.Printf("\n # no configuration\n")
} else {
fmt.Printf(config)
}
printConfig(oname, output, "outputs")
}
// Filter plugins
@ -386,13 +282,13 @@ func PrintSampleConfig(pluginFilters []string, outputFilters []string) {
continue
}
printConfig(pname, plugin)
printConfig(pname, plugin, "plugins")
}
// Print Service Plugins
fmt.Printf(servicePluginHeader)
for name, plugin := range servPlugins {
printConfig(name, plugin)
printConfig(name, plugin, "plugins")
}
}
@ -401,8 +297,8 @@ type printer interface {
SampleConfig() string
}
func printConfig(name string, p printer) {
fmt.Printf("\n# %s\n[[plugins.%s]]", p.Description(), name)
func printConfig(name string, p printer, op string) {
fmt.Printf("\n# %s\n[[%s.%s]]", p.Description(), op, name)
config := p.SampleConfig()
if config == "" {
fmt.Printf("\n # no configuration\n")
@ -423,7 +319,7 @@ func sliceContains(name string, list []string) bool {
// PrintPluginConfig prints the config usage of a single plugin.
func PrintPluginConfig(name string) error {
if creator, ok := plugins.Plugins[name]; ok {
printConfig(name, creator())
printConfig(name, creator(), "plugins")
} else {
return errors.New(fmt.Sprintf("Plugin %s not found", name))
}
@ -433,7 +329,7 @@ func PrintPluginConfig(name string) error {
// PrintOutputConfig prints the config usage of a single output.
func PrintOutputConfig(name string) error {
if creator, ok := outputs.Outputs[name]; ok {
printConfig(name, creator())
printConfig(name, creator(), "outputs")
} else {
return errors.New(fmt.Sprintf("Output %s not found", name))
}
@ -461,7 +357,7 @@ func (c *Config) LoadDirectory(path string) error {
return nil
}
// LoadConfig loads the given config file and returns a *Config pointer
// LoadConfig loads the given config file and applies it to c
func (c *Config) LoadConfig(path string) error {
data, err := ioutil.ReadFile(path)
if err != nil {
@ -481,7 +377,7 @@ func (c *Config) LoadConfig(path string) error {
switch name {
case "agent":
c.agent = subTable
c.Agent = subTable
case "tags":
if err = toml.UnmarshalTable(subTable, c.Tags); err != nil {
log.Printf("Could not parse [tags] config\n")
@ -491,11 +387,14 @@ func (c *Config) LoadConfig(path string) error {
for outputName, outputVal := range subTable.Fields {
switch outputSubTable := outputVal.(type) {
case *ast.Table:
c.outputs[outputName] = outputSubTable
if err = c.addOutput(outputName, outputSubTable); err != nil {
return err
}
case []*ast.Table:
for id, t := range outputSubTable {
nameID := fmt.Sprintf("%s-%d", outputName, id)
c.outputs[nameID] = t
for _, t := range outputSubTable {
if err = c.addOutput(outputName, t); err != nil {
return err
}
}
default:
return fmt.Errorf("Unsupported config format: %s",
@ -506,11 +405,14 @@ func (c *Config) LoadConfig(path string) error {
for pluginName, pluginVal := range subTable.Fields {
switch pluginSubTable := pluginVal.(type) {
case *ast.Table:
c.plugins[pluginName] = pluginSubTable
if err = c.addPlugin(pluginName, pluginSubTable); err != nil {
return err
}
case []*ast.Table:
for id, t := range pluginSubTable {
nameID := fmt.Sprintf("%s-%d", pluginName, id)
c.plugins[nameID] = t
for _, t := range pluginSubTable {
if err = c.addPlugin(pluginName, t); err != nil {
return err
}
}
default:
return fmt.Errorf("Unsupported config format: %s",
@ -520,8 +422,142 @@ func (c *Config) LoadConfig(path string) error {
// Assume it's a plugin for legacy config file support if no other
// identifiers are present
default:
c.plugins[name] = subTable
if err = c.addPlugin(name, subTable); err != nil {
return err
}
}
}
return nil
}
// addOutput instantiates the named output, applies its TOML table to it, and
// registers it on the config as a RunningOutput. When OutputFilters is
// non-empty, names not in the filter list are silently skipped.
func (c *Config) addOutput(name string, table *ast.Table) error {
	// Honor the output filter list, if one was given.
	if len(c.OutputFilters) > 0 && !sliceContains(name, c.OutputFilters) {
		return nil
	}
	create, known := outputs.Outputs[name]
	if !known {
		return fmt.Errorf("Undefined but requested output: %s", name)
	}
	output := create()
	if err := toml.UnmarshalTable(table, output); err != nil {
		return err
	}
	c.Outputs = append(c.Outputs, &RunningOutput{
		Name:   name,
		Output: output,
	})
	return nil
}
// addPlugin instantiates the named plugin, splits its TOML table into the
// agent-level PluginConfig plus plugin-specific settings, and registers the
// pair on the config as a RunningPlugin. When PluginFilters is non-empty,
// names not in the filter list are silently skipped.
func (c *Config) addPlugin(name string, table *ast.Table) error {
	// Honor the plugin filter list, if one was given.
	if len(c.PluginFilters) > 0 && !sliceContains(name, c.PluginFilters) {
		return nil
	}
	create, known := plugins.Plugins[name]
	if !known {
		return fmt.Errorf("Undefined but requested plugin: %s", name)
	}
	p := create()
	conf, err := applyPlugin(name, table, p)
	if err != nil {
		return err
	}
	c.Plugins = append(c.Plugins, &RunningPlugin{
		Name:   name,
		Plugin: p,
		Config: conf,
	})
	return nil
}
// applyPlugin strips the agent-level settings (pass/drop, tagpass/tagdrop,
// interval) out of tbl, collects them into a PluginConfig, and unmarshals
// the remaining fields onto the plugin instance p.
//
// The handled keys are deleted from tbl before unmarshaling so that
// toml.UnmarshalTable only sees plugin-specific fields. The previously
// duplicated array/sub-table walking is factored into stringArrayField and
// tagFilterField below.
func applyPlugin(name string, tbl *ast.Table, p plugins.Plugin) (*PluginConfig, error) {
	cp := &PluginConfig{Name: name}

	cp.Pass = stringArrayField(tbl, "pass")
	cp.Drop = stringArrayField(tbl, "drop")

	// "interval": per-plugin collection interval as a Go duration string
	// (e.g. "5s"). A malformed duration aborts the whole call.
	if node, ok := tbl.Fields["interval"]; ok {
		if kv, ok := node.(*ast.KeyValue); ok {
			if str, ok := kv.Value.(*ast.String); ok {
				dur, err := time.ParseDuration(str.Value)
				if err != nil {
					return nil, err
				}
				cp.Interval = dur
			}
		}
	}

	cp.TagPass = tagFilterField(tbl, "tagpass")
	cp.TagDrop = tagFilterField(tbl, "tagdrop")

	delete(tbl.Fields, "drop")
	delete(tbl.Fields, "pass")
	delete(tbl.Fields, "interval")
	delete(tbl.Fields, "tagdrop")
	delete(tbl.Fields, "tagpass")
	return cp, toml.UnmarshalTable(tbl, p)
}

// stringArrayField returns the string values of the TOML array stored under
// key in tbl. It returns nil when the field is absent or not a string array;
// non-string array elements are silently ignored.
func stringArrayField(tbl *ast.Table, key string) []string {
	node, ok := tbl.Fields[key]
	if !ok {
		return nil
	}
	kv, ok := node.(*ast.KeyValue)
	if !ok {
		return nil
	}
	ary, ok := kv.Value.(*ast.Array)
	if !ok {
		return nil
	}
	var out []string
	for _, elem := range ary.Value {
		if str, ok := elem.(*ast.String); ok {
			out = append(out, str.Value)
		}
	}
	return out
}

// tagFilterField parses the sub-table stored under key in tbl into one
// TagFilter per tag name. It returns nil when the field is absent or not a
// sub-table. A filter is appended even when its value is not a string array,
// matching the original inline behavior.
func tagFilterField(tbl *ast.Table, key string) []TagFilter {
	node, ok := tbl.Fields[key]
	if !ok {
		return nil
	}
	subtbl, ok := node.(*ast.Table)
	if !ok {
		return nil
	}
	var filters []TagFilter
	for name, val := range subtbl.Fields {
		kv, ok := val.(*ast.KeyValue)
		if !ok {
			continue
		}
		tf := TagFilter{Name: name}
		if ary, ok := kv.Value.(*ast.Array); ok {
			for _, elem := range ary.Value {
				if str, ok := elem.(*ast.String); ok {
					tf.Filter = append(tf.Filter, str.Value)
				}
			}
		}
		filters = append(filters, tf)
	}
	return filters
}

View File

@ -0,0 +1,118 @@
package config
import (
"testing"
"time"
"github.com/influxdb/telegraf/plugins"
"github.com/influxdb/telegraf/plugins/exec"
"github.com/influxdb/telegraf/plugins/memcached"
"github.com/influxdb/telegraf/plugins/procstat"
"github.com/stretchr/testify/assert"
)
// TestConfig_LoadSinglePlugin verifies that a single-plugin TOML file
// produces both the expected plugin struct and the expected agent-level
// PluginConfig metadata (pass/drop, tagpass/tagdrop, interval).
func TestConfig_LoadSinglePlugin(t *testing.T) {
	c := NewConfig()
	// The load error was previously ignored; check it like the
	// LoadDirectory test does.
	err := c.LoadConfig("./testdata/single_plugin.toml")
	if err != nil {
		t.Error(err)
	}

	memcached := plugins.Plugins["memcached"]().(*memcached.Memcached)
	memcached.Servers = []string{"localhost"}

	mConfig := &PluginConfig{
		Name: "memcached",
		Drop: []string{"other", "stuff"},
		Pass: []string{"some", "strings"},
		TagDrop: []TagFilter{
			{
				Name:   "badtag",
				Filter: []string{"othertag"},
			},
		},
		TagPass: []TagFilter{
			{
				Name:   "goodtag",
				Filter: []string{"mytag"},
			},
		},
		Interval: 5 * time.Second,
	}

	assert.Equal(t, memcached, c.Plugins[0].Plugin,
		"Testdata did not produce a correct memcached struct.")
	assert.Equal(t, mConfig, c.Plugins[0].Config,
		"Testdata did not produce correct memcached metadata.")
}
// TestConfig_LoadDirectory verifies that loading a directory of sub-configs
// on top of a base config appends plugins in order: the base memcached, then
// exec, a second memcached (different servers), and procstat. Composite
// literals use the gofmt -s form (element types elided).
func TestConfig_LoadDirectory(t *testing.T) {
	c := NewConfig()
	err := c.LoadConfig("./testdata/single_plugin.toml")
	if err != nil {
		t.Error(err)
	}
	err = c.LoadDirectory("./testdata/subconfig")
	if err != nil {
		t.Error(err)
	}

	memcached := plugins.Plugins["memcached"]().(*memcached.Memcached)
	memcached.Servers = []string{"localhost"}

	mConfig := &PluginConfig{
		Name: "memcached",
		Drop: []string{"other", "stuff"},
		Pass: []string{"some", "strings"},
		TagDrop: []TagFilter{
			{
				Name:   "badtag",
				Filter: []string{"othertag"},
			},
		},
		TagPass: []TagFilter{
			{
				Name:   "goodtag",
				Filter: []string{"mytag"},
			},
		},
		Interval: 5 * time.Second,
	}
	assert.Equal(t, memcached, c.Plugins[0].Plugin,
		"Testdata did not produce a correct memcached struct.")
	assert.Equal(t, mConfig, c.Plugins[0].Config,
		"Testdata did not produce correct memcached metadata.")

	ex := plugins.Plugins["exec"]().(*exec.Exec)
	ex.Commands = []*exec.Command{
		{
			Command: "/usr/bin/myothercollector --foo=bar",
			Name:    "myothercollector",
		},
	}
	eConfig := &PluginConfig{Name: "exec"}
	assert.Equal(t, ex, c.Plugins[1].Plugin,
		"Merged Testdata did not produce a correct exec struct.")
	assert.Equal(t, eConfig, c.Plugins[1].Config,
		"Merged Testdata did not produce correct exec metadata.")

	// The sub-config overrides the memcached servers list.
	memcached.Servers = []string{"192.168.1.1"}
	assert.Equal(t, memcached, c.Plugins[2].Plugin,
		"Testdata did not produce a correct memcached struct.")
	assert.Equal(t, mConfig, c.Plugins[2].Config,
		"Testdata did not produce correct memcached metadata.")

	pstat := plugins.Plugins["procstat"]().(*procstat.Procstat)
	pstat.Specifications = []*procstat.Specification{
		{
			PidFile: "/var/run/grafana-server.pid",
		},
		{
			PidFile: "/var/run/influxdb/influxd.pid",
		},
	}
	pConfig := &PluginConfig{Name: "procstat"}
	assert.Equal(t, pstat, c.Plugins[3].Plugin,
		"Merged Testdata did not produce a correct procstat struct.")
	assert.Equal(t, pConfig, c.Plugins[3].Config,
		"Merged Testdata did not produce correct procstat metadata.")
}

View File

@ -1,9 +1,9 @@
[memcached]
[[plugins.memcached]]
servers = ["localhost"]
pass = ["some", "strings"]
drop = ["other", "stuff"]
interval = "5s"
[memcached.tagpass]
[plugins.memcached.tagpass]
goodtag = ["mytag"]
[memcached.tagdrop]
[plugins.memcached.tagdrop]
badtag = ["othertag"]

View File

@ -1,6 +1,6 @@
[exec]
[[plugins.exec]]
# specify commands via an array of tables
[[exec.commands]]
[[plugins.exec.commands]]
# the command to run
command = "/usr/bin/myothercollector --foo=bar"

View File

@ -1,9 +1,9 @@
[memcached]
[[plugins.memcached]]
servers = ["192.168.1.1"]
pass = ["some", "strings"]
drop = ["other", "stuff"]
interval = "5s"
[memcached.tagpass]
[plugins.memcached.tagpass]
goodtag = ["mytag"]
[memcached.tagdrop]
[plugins.memcached.tagdrop]
badtag = ["othertag"]

View File

@ -0,0 +1,5 @@
[[plugins.procstat]]
[[plugins.procstat.specifications]]
pid_file = "/var/run/grafana-server.pid"
[[plugins.procstat.specifications]]
pid_file = "/var/run/influxdb/influxd.pid"

View File

@ -88,13 +88,15 @@
# PLUGINS #
###############################################################################
[plugins]
# Read Apache status information (mod_status)
[apache]
[[plugins.apache]]
# An array of Apache status URI to gather stats.
urls = ["http://localhost/server-status?auto"]
# Read metrics about cpu usage
[cpu]
[[plugins.cpu]]
# Whether to report per-cpu stats or not
percpu = true
# Whether to report total system cpu stats or not
@ -103,11 +105,11 @@ urls = ["http://localhost/server-status?auto"]
drop = ["cpu_time"]
# Read metrics about disk usage by mount point
[disk]
[[plugins.disk]]
# no configuration
# Read metrics from one or many disque servers
[disque]
[[plugins.disque]]
# An array of URI to gather stats about. Specify an ip or hostname
# with optional port and password. ie disque://localhost, disque://10.10.3.33:18832,
# 10.0.0.1:10000, etc.
@ -116,7 +118,7 @@ urls = ["http://localhost/server-status?auto"]
servers = ["localhost"]
# Read stats from one or more Elasticsearch servers or clusters
[elasticsearch]
[[plugins.elasticsearch]]
# specify a list of one or more Elasticsearch servers
servers = ["http://localhost:9200"]
@ -125,7 +127,7 @@ urls = ["http://localhost/server-status?auto"]
local = true
# Read flattened metrics from one or more commands that output JSON to stdout
[exec]
[[plugins.exec]]
# specify commands via an array of tables
[[exec.commands]]
# the command to run
@ -135,7 +137,7 @@ urls = ["http://localhost/server-status?auto"]
name = "mycollector"
# Read metrics of haproxy, via socket or csv stats page
[haproxy]
[[plugins.haproxy]]
# An array of address to gather stats about. Specify an ip on hostname
# with optional port. ie localhost, 10.10.3.33:1936, etc.
#
@ -145,7 +147,7 @@ urls = ["http://localhost/server-status?auto"]
# servers = ["socket:/run/haproxy/admin.sock"]
# Read flattened metrics from one or more JSON HTTP endpoints
[httpjson]
[[plugins.httpjson]]
# Specify services via an array of tables
[[httpjson.services]]
@ -167,11 +169,11 @@ urls = ["http://localhost/server-status?auto"]
threshold = "0.75"
# Read metrics about disk IO by device
[io]
[[plugins.io]]
# no configuration
# read metrics from a Kafka topic
[kafka_consumer]
[[plugins.kafka_consumer]]
# topic(s) to consume
topics = ["telegraf"]
# an array of Zookeeper connection strings
@ -184,7 +186,7 @@ urls = ["http://localhost/server-status?auto"]
offset = "oldest"
# Read metrics from a LeoFS Server via SNMP
[leofs]
[[plugins.leofs]]
# An array of URI to gather stats about LeoFS.
# Specify an ip or hostname with port. ie 127.0.0.1:4020
#
@ -192,7 +194,7 @@ urls = ["http://localhost/server-status?auto"]
servers = ["127.0.0.1:4021"]
# Read metrics from local Lustre service on OST, MDS
[lustre2]
[[plugins.lustre2]]
# An array of /proc globs to search for Lustre stats
# If not specified, the default will work on Lustre 2.5.x
#
@ -200,11 +202,11 @@ urls = ["http://localhost/server-status?auto"]
# mds_procfiles = ["/proc/fs/lustre/mdt/*/md_stats"]
# Read metrics about memory usage
[mem]
[[plugins.mem]]
# no configuration
# Read metrics from one or many memcached servers
[memcached]
[[plugins.memcached]]
# An array of address to gather stats about. Specify an ip on hostname
# with optional port. ie localhost, 10.0.0.1:11211, etc.
#
@ -212,7 +214,7 @@ urls = ["http://localhost/server-status?auto"]
servers = ["localhost"]
# Read metrics from one or many MongoDB servers
[mongodb]
[[plugins.mongodb]]
# An array of URI to gather stats about. Specify an ip or hostname
# with optional port add password. ie mongodb://user:auth_key@10.10.3.30:27017,
# mongodb://10.10.3.33:18832, 10.0.0.1:10000, etc.
@ -221,7 +223,7 @@ urls = ["http://localhost/server-status?auto"]
servers = ["127.0.0.1:27017"]
# Read metrics from one or many mysql servers
[mysql]
[[plugins.mysql]]
# specify servers via a url matching:
# [username[:password]@][protocol[(address)]]/[?tls=[true|false|skip-verify]]
# e.g.
@ -232,7 +234,7 @@ urls = ["http://localhost/server-status?auto"]
servers = ["localhost"]
# Read metrics about network interface usage
[net]
[[plugins.net]]
# By default, telegraf gathers stats from any up interface (excluding loopback)
# Setting interfaces will tell it to gather these explicit interfaces,
# regardless of status.
@ -240,12 +242,12 @@ urls = ["http://localhost/server-status?auto"]
# interfaces = ["eth0", ... ]
# Read Nginx's basic status information (ngx_http_stub_status_module)
[nginx]
[[plugins.nginx]]
# An array of Nginx stub_status URI to gather stats.
urls = ["http://localhost/status"]
# Ping given url(s) and return statistics
[ping]
[[plugins.ping]]
# urls to ping
urls = ["www.google.com"] # required
# number of pings to send (ping -c <COUNT>)
@ -258,7 +260,7 @@ urls = ["http://localhost/server-status?auto"]
interface = ""
# Read metrics from one or many postgresql servers
[postgresql]
[[plugins.postgresql]]
# specify servers via an array of tables
[[postgresql.servers]]
@ -288,12 +290,12 @@ urls = ["http://localhost/server-status?auto"]
# address = "influx@remoteserver"
# Read metrics from one or many prometheus clients
[prometheus]
[[plugins.prometheus]]
# An array of urls to scrape metrics from.
urls = ["http://localhost:9100/metrics"]
# Read metrics from one or many RabbitMQ servers via the management API
[rabbitmq]
[[plugins.rabbitmq]]
# Specify servers via an array of tables
[[rabbitmq.servers]]
# name = "rmq-server-1" # optional tag
@ -306,7 +308,7 @@ urls = ["http://localhost/server-status?auto"]
# nodes = ["rabbit@node1", "rabbit@node2"]
# Read metrics from one or many redis servers
[redis]
[[plugins.redis]]
# An array of URI to gather stats about. Specify an ip or hostname
# with optional port add password. ie redis://localhost, redis://10.10.3.33:18832,
# 10.0.0.1:10000, etc.
@ -315,7 +317,7 @@ urls = ["http://localhost/server-status?auto"]
servers = ["localhost"]
# Read metrics from one or many RethinkDB servers
[rethinkdb]
[[plugins.rethinkdb]]
# An array of URI to gather stats about. Specify an ip or hostname
# with optional port add password. ie rethinkdb://user:auth_key@10.10.3.30:28105,
# rethinkdb://10.10.3.33:18832, 10.0.0.1:10000, etc.
@ -324,9 +326,9 @@ urls = ["http://localhost/server-status?auto"]
servers = ["127.0.0.1:28015"]
# Read metrics about swap memory usage
[swap]
[[plugins.swap]]
# no configuration
# Read metrics about system load & uptime
[system]
[[plugins.system]]
# no configuration

View File

@ -16,7 +16,7 @@ import (
const sampleConfig = `
# specify commands via an array of tables
[[exec.commands]]
[[plugins.exec.commands]]
# the command to run
command = "/usr/bin/mycollector --foo=bar"

View File

@ -48,7 +48,7 @@ func (c RealHTTPClient) MakeRequest(req *http.Request) (*http.Response, error) {
var sampleConfig = `
# Specify services via an array of tables
[[httpjson.services]]
[[plugins.httpjson.services]]
# a name for the service being polled
name = "webserver_stats"
@ -69,7 +69,7 @@ var sampleConfig = `
# ]
# HTTP parameters (all values must be strings)
[httpjson.services.parameters]
[plugins.httpjson.services.parameters]
event_type = "cpu_spike"
threshold = "0.75"
`

View File

@ -55,7 +55,7 @@ func (j *Jolokia) SampleConfig() string {
group = "as"
# List of servers exposing jolokia read service
[[jolokia.servers]]
[[plugins.jolokia.servers]]
name = "stable"
host = "192.168.103.2"
port = "8180"
@ -63,20 +63,20 @@ func (j *Jolokia) SampleConfig() string {
# List of metrics collected on above servers
# Each metric consists in a name, a jmx path and either a pass or drop slice attributes
# This collect all heap memory usage metrics
[[jolokia.metrics]]
[[plugins.jolokia.metrics]]
name = "heap_memory_usage"
jmx = "/java.lang:type=Memory/HeapMemoryUsage"
# This drops the 'committed' value from Eden space measurement
[[jolokia.metrics]]
[[plugins.jolokia.metrics]]
name = "memory_eden"
jmx = "/java.lang:type=MemoryPool,name=PS Eden Space/Usage"
drop = [ "committed" ]
# This passes only DaemonThreadCount and ThreadCount
[[jolokia.metrics]]
[[plugins.jolokia.metrics]]
name = "heap_threads"
jmx = "/java.lang:type=Threading"
pass = [

View File

@ -25,7 +25,7 @@ var ignoredColumns = map[string]bool{"datid": true, "datname": true, "stats_rese
var sampleConfig = `
# specify servers via an array of tables
[[postgresql.servers]]
[[plugins.postgresql.servers]]
# specify address via a url matching:
# postgres://[pqgotest[:password]]@localhost[/dbname]?sslmode=[disable|verify-ca|verify-full]
@ -49,7 +49,7 @@ var sampleConfig = `
# databases = ["app_production", "blah_testing"]
# [[postgresql.servers]]
# [[plugins.postgresql.servers]]
# address = "influx@remoteserver"
`

View File

@ -30,7 +30,7 @@ func NewProcstat() *Procstat {
}
var sampleConfig = `
[[procstat.specifications]]
[[plugins.procstat.specifications]]
prefix = "" # optional string to prefix measurements
# Must specify one of: pid_file, exe, or pattern
# PID file to monitor process

View File

@ -100,7 +100,7 @@ var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues}
var sampleConfig = `
# Specify servers via an array of tables
[[rabbitmq.servers]]
[[plugins.rabbitmq.servers]]
# name = "rmq-server-1" # optional tag
# url = "http://localhost:15672"
# username = "guest"

View File

@ -22,7 +22,7 @@ type TwemproxyInstance struct {
}
var sampleConfig = `
[[twemproxy.instances]]
[[plugins.twemproxy.instances]]
# Twemproxy stats address and port (no scheme)
addr = "localhost:22222"
# Monitor pool name

17
testdata/influx.toml vendored
View File

@ -1,17 +0,0 @@
[agent]
interval = "5s"
http = ":11213"
debug = true
[outputs]
[outputs.influxdb]
url = "http://localhost:8086"
username = "root"
password = "root"
database = "telegraf"
[tags]
dc = "us-phx-1"
[redis]
address = ":6379"

View File

@ -1,5 +0,0 @@
[procstat]
[[procstat.specifications]]
pid_file = "/var/run/grafana-server.pid"
[[procstat.specifications]]
pid_file = "/var/run/influxdb/influxd.pid"