move tags to influxdb struct, update all sample configs
This commit is contained in:
parent
16c424de2a
commit
53969ae054
|
@ -1,3 +1,4 @@
|
||||||
pkg/
|
pkg/
|
||||||
tivan
|
tivan
|
||||||
.vagrant
|
.vagrant
|
||||||
|
telegraf
|
||||||
|
|
79
agent.go
79
agent.go
|
@ -10,10 +10,15 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/influxdb/influxdb/client"
|
"github.com/influxdb/telegraf/outputs"
|
||||||
"github.com/influxdb/telegraf/plugins"
|
"github.com/influxdb/telegraf/plugins"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type runningOutput struct {
|
||||||
|
name string
|
||||||
|
output outputs.Output
|
||||||
|
}
|
||||||
|
|
||||||
type runningPlugin struct {
|
type runningPlugin struct {
|
||||||
name string
|
name string
|
||||||
plugin plugins.Plugin
|
plugin plugins.Plugin
|
||||||
|
@ -32,9 +37,8 @@ type Agent struct {
|
||||||
|
|
||||||
Config *Config
|
Config *Config
|
||||||
|
|
||||||
|
outputs []*runningOutput
|
||||||
plugins []*runningPlugin
|
plugins []*runningPlugin
|
||||||
|
|
||||||
conn *client.Client
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewAgent returns an Agent struct based off the given Config
|
// NewAgent returns an Agent struct based off the given Config
|
||||||
|
@ -66,27 +70,37 @@ func NewAgent(config *Config) (*Agent, error) {
|
||||||
|
|
||||||
// Connect connects to the agent's config URL
|
// Connect connects to the agent's config URL
|
||||||
func (a *Agent) Connect() error {
|
func (a *Agent) Connect() error {
|
||||||
config := a.Config
|
for _, o := range a.outputs {
|
||||||
|
err := o.output.Connect(a.Hostname)
|
||||||
u, err := url.Parse(config.URL)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
c, err := client.NewClient(client.Config{
|
func (a *Agent) LoadOutputs() ([]string, error) {
|
||||||
URL: *u,
|
var names []string
|
||||||
Username: config.Username,
|
|
||||||
Password: config.Password,
|
|
||||||
UserAgent: config.UserAgent,
|
|
||||||
Timeout: config.Timeout.Duration,
|
|
||||||
})
|
|
||||||
|
|
||||||
|
for _, name := range a.Config.OutputsDeclared() {
|
||||||
|
creator, ok := outputs.Outputs[name]
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("Undefined but requested output: %s", name)
|
||||||
|
}
|
||||||
|
|
||||||
|
output := creator()
|
||||||
|
|
||||||
|
err := a.Config.ApplyOutput(name, output)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
a.outputs = append(a.outputs, &runningOutput{name, output})
|
||||||
|
names = append(names, name)
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = c.Query(client.Query{
|
_, err = c.Query(client.Query{
|
||||||
Command: fmt.Sprintf("CREATE DATABASE %s", config.Database),
|
Command: fmt.Sprintf("CREATE DATABASE telegraf"),
|
||||||
})
|
})
|
||||||
|
|
||||||
if err != nil && !strings.Contains(err.Error(), "database already exists") {
|
if err != nil && !strings.Contains(err.Error(), "database already exists") {
|
||||||
|
@ -95,7 +109,7 @@ func (a *Agent) Connect() error {
|
||||||
|
|
||||||
a.conn = c
|
a.conn = c
|
||||||
|
|
||||||
return nil
|
return names, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// LoadPlugins loads the agent's plugins
|
// LoadPlugins loads the agent's plugins
|
||||||
|
@ -114,6 +128,8 @@ func (a *Agent) LoadPlugins(pluginsFilter string) ([]string, error) {
|
||||||
return nil, fmt.Errorf("Undefined but requested plugin: %s", name)
|
return nil, fmt.Errorf("Undefined but requested plugin: %s", name)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
plugin := creator()
|
||||||
|
|
||||||
isPluginEnabled := false
|
isPluginEnabled := false
|
||||||
if len(filters) > 0 {
|
if len(filters) > 0 {
|
||||||
for _, runeValue := range filters {
|
for _, runeValue := range filters {
|
||||||
|
@ -205,8 +221,7 @@ func (a *Agent) crank() error {
|
||||||
acc.Time = time.Now()
|
acc.Time = time.Now()
|
||||||
acc.Database = a.Config.Database
|
acc.Database = a.Config.Database
|
||||||
|
|
||||||
_, err := a.conn.Write(acc.BatchPoints)
|
return a.flush(&acc)
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) error {
|
func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) error {
|
||||||
|
@ -228,7 +243,10 @@ func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) err
|
||||||
acc.Time = time.Now()
|
acc.Time = time.Now()
|
||||||
acc.Database = a.Config.Database
|
acc.Database = a.Config.Database
|
||||||
|
|
||||||
a.conn.Write(acc.BatchPoints)
|
err = a.flush(&acc)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-shutdown:
|
case <-shutdown:
|
||||||
|
@ -239,6 +257,22 @@ func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *Agent) flush(bp *BatchPoints) error {
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
var outerr error
|
||||||
|
for _, o := range a.outputs {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(ro *runningOutput) {
|
||||||
|
defer wg.Done()
|
||||||
|
outerr = ro.output.Write(bp.BatchPoints)
|
||||||
|
}(o)
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
return outerr
|
||||||
|
}
|
||||||
|
|
||||||
// TestAllPlugins verifies that we can 'Gather' from all plugins with the
|
// TestAllPlugins verifies that we can 'Gather' from all plugins with the
|
||||||
// default configuration
|
// default configuration
|
||||||
func (a *Agent) TestAllPlugins() error {
|
func (a *Agent) TestAllPlugins() error {
|
||||||
|
@ -297,13 +331,6 @@ func (a *Agent) Test() error {
|
||||||
|
|
||||||
// Run runs the agent daemon, gathering every Interval
|
// Run runs the agent daemon, gathering every Interval
|
||||||
func (a *Agent) Run(shutdown chan struct{}) error {
|
func (a *Agent) Run(shutdown chan struct{}) error {
|
||||||
if a.conn == nil {
|
|
||||||
err := a.Connect()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
for _, plugin := range a.plugins {
|
for _, plugin := range a.plugins {
|
||||||
|
|
|
@ -59,6 +59,11 @@ func main() {
|
||||||
ag.Debug = true
|
ag.Debug = true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
outputs, err := ag.LoadOutputs()
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
plugins, err := ag.LoadPlugins(*fPLuginsFilter)
|
plugins, err := ag.LoadPlugins(*fPLuginsFilter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
|
@ -99,6 +104,7 @@ func main() {
|
||||||
}()
|
}()
|
||||||
|
|
||||||
log.Printf("Starting Telegraf (version %s)\n", Version)
|
log.Printf("Starting Telegraf (version %s)\n", Version)
|
||||||
|
log.Printf("Loaded outputs: %s", strings.Join(outputs, " "))
|
||||||
log.Printf("Loaded plugins: %s", strings.Join(plugins, " "))
|
log.Printf("Loaded plugins: %s", strings.Join(plugins, " "))
|
||||||
if ag.Debug {
|
if ag.Debug {
|
||||||
log.Printf("Debug: enabled")
|
log.Printf("Debug: enabled")
|
||||||
|
@ -106,11 +112,6 @@ func main() {
|
||||||
ag.Interval, ag.Debug, ag.Hostname)
|
ag.Interval, ag.Debug, ag.Hostname)
|
||||||
}
|
}
|
||||||
|
|
||||||
if config.URL != "" {
|
|
||||||
log.Printf("Sending metrics to: %s", config.URL)
|
|
||||||
log.Printf("Tags enabled: %v", config.ListTags())
|
|
||||||
}
|
|
||||||
|
|
||||||
if *fPidfile != "" {
|
if *fPidfile != "" {
|
||||||
f, err := os.Create(*fPidfile)
|
f, err := os.Create(*fPidfile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
59
config.go
59
config.go
|
@ -34,22 +34,24 @@ func (d *Duration) UnmarshalTOML(b []byte) error {
|
||||||
// will be logging to, as well as all the plugins that the user has
|
// will be logging to, as well as all the plugins that the user has
|
||||||
// specified
|
// specified
|
||||||
type Config struct {
|
type Config struct {
|
||||||
URL string
|
|
||||||
Username string
|
|
||||||
Password string
|
|
||||||
Database string
|
|
||||||
UserAgent string
|
|
||||||
Timeout Duration
|
|
||||||
Tags map[string]string
|
|
||||||
|
|
||||||
agent *ast.Table
|
agent *ast.Table
|
||||||
plugins map[string]*ast.Table
|
plugins map[string]*ast.Table
|
||||||
|
outputs map[string]*ast.Table
|
||||||
}
|
}
|
||||||
|
|
||||||
// Plugins returns the configured plugins as a map of name -> plugin toml
|
// Plugins returns the configured plugins as a map of name -> plugin toml
|
||||||
func (c *Config) Plugins() map[string]*ast.Table {
|
func (c *Config) Plugins() map[string]*ast.Table {
|
||||||
return c.plugins
|
return c.plugins
|
||||||
}
|
}
|
||||||
|
type TagFilter struct {
|
||||||
|
Name string
|
||||||
|
Filter []string
|
||||||
|
}
|
||||||
|
|
||||||
|
// Outputs returns the configured outputs as a map of name -> output toml
|
||||||
|
func (c *Config) Outputs() map[string]*ast.Table {
|
||||||
|
return c.outputs
|
||||||
|
}
|
||||||
|
|
||||||
// The name of a tag, and the values on which to filter
|
// The name of a tag, and the values on which to filter
|
||||||
type TagFilter struct {
|
type TagFilter struct {
|
||||||
|
@ -64,6 +66,9 @@ type ConfiguredPlugin struct {
|
||||||
|
|
||||||
Drop []string
|
Drop []string
|
||||||
Pass []string
|
Pass []string
|
||||||
|
TagDrop []TagFilter
|
||||||
|
|
||||||
|
TagPass []TagFilter
|
||||||
|
|
||||||
TagDrop []TagFilter
|
TagDrop []TagFilter
|
||||||
TagPass []TagFilter
|
TagPass []TagFilter
|
||||||
|
@ -122,6 +127,13 @@ func (cp *ConfiguredPlugin) ShouldPass(measurement string, tags map[string]strin
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ApplyOutput loads the toml config into the given interface
|
||||||
|
func (c *Config) ApplyOutput(name string, v interface{}) error {
|
||||||
|
if c.outputs[name] != nil {
|
||||||
|
return toml.UnmarshalTable(c.outputs[name], v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ApplyAgent loads the toml config into the given interface
|
// ApplyAgent loads the toml config into the given interface
|
||||||
func (c *Config) ApplyAgent(v interface{}) error {
|
func (c *Config) ApplyAgent(v interface{}) error {
|
||||||
if c.agent != nil {
|
if c.agent != nil {
|
||||||
|
@ -225,6 +237,15 @@ func (c *Config) ApplyPlugin(name string, v interface{}) (*ConfiguredPlugin, err
|
||||||
|
|
||||||
// PluginsDeclared returns the name of all plugins declared in the config.
|
// PluginsDeclared returns the name of all plugins declared in the config.
|
||||||
func (c *Config) PluginsDeclared() []string {
|
func (c *Config) PluginsDeclared() []string {
|
||||||
|
return declared(c.plugins)
|
||||||
|
}
|
||||||
|
|
||||||
|
// OutputsDeclared returns the name of all outputs declared in the config.
|
||||||
|
func (c *Config) OutputsDeclared() []string {
|
||||||
|
return declared(c.outputs)
|
||||||
|
}
|
||||||
|
|
||||||
|
func declared(endpoints map[string]*ast.Table) []string {
|
||||||
var plugins []string
|
var plugins []string
|
||||||
|
|
||||||
for name := range c.plugins {
|
for name := range c.plugins {
|
||||||
|
@ -257,6 +278,7 @@ func LoadConfig(path string) (*Config, error) {
|
||||||
|
|
||||||
c := &Config{
|
c := &Config{
|
||||||
plugins: make(map[string]*ast.Table),
|
plugins: make(map[string]*ast.Table),
|
||||||
|
outputs: make(map[string]*ast.Table),
|
||||||
}
|
}
|
||||||
|
|
||||||
for name, val := range tbl.Fields {
|
for name, val := range tbl.Fields {
|
||||||
|
@ -266,13 +288,16 @@ func LoadConfig(path string) (*Config, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
switch name {
|
switch name {
|
||||||
case "influxdb":
|
|
||||||
err := toml.UnmarshalTable(subtbl, c)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
case "agent":
|
case "agent":
|
||||||
c.agent = subtbl
|
c.agent = subtbl
|
||||||
|
case "outputs":
|
||||||
|
for outputName, outputVal := range subtbl.Fields {
|
||||||
|
outputSubtbl, ok := outputVal.(*ast.Table)
|
||||||
|
if !ok {
|
||||||
|
return nil, errInvalidConfig
|
||||||
|
}
|
||||||
|
c.outputs[outputName] = outputSubtbl
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
c.plugins[name] = subtbl
|
c.plugins[name] = subtbl
|
||||||
}
|
}
|
||||||
|
@ -327,8 +352,11 @@ var header = `# Telegraf configuration
|
||||||
# NOTE: The configuration has a few required parameters. They are marked
|
# NOTE: The configuration has a few required parameters. They are marked
|
||||||
# with 'required'. Be sure to edit those to make this configuration work.
|
# with 'required'. Be sure to edit those to make this configuration work.
|
||||||
|
|
||||||
|
# OUTPUTS
|
||||||
|
[outputs]
|
||||||
|
|
||||||
# Configuration for influxdb server to send metrics to
|
# Configuration for influxdb server to send metrics to
|
||||||
[influxdb]
|
[outputs.influxdb]
|
||||||
# The full HTTP endpoint URL for your InfluxDB instance
|
# The full HTTP endpoint URL for your InfluxDB instance
|
||||||
url = "http://localhost:8086" # required.
|
url = "http://localhost:8086" # required.
|
||||||
|
|
||||||
|
@ -345,12 +373,11 @@ database = "telegraf" # required.
|
||||||
|
|
||||||
# Set the user agent for the POSTs (can be useful for log differentiation)
|
# Set the user agent for the POSTs (can be useful for log differentiation)
|
||||||
# user_agent = "telegraf"
|
# user_agent = "telegraf"
|
||||||
# tags = { "dc": "us-east-1" }
|
|
||||||
|
|
||||||
# Tags can also be specified via a normal map, but only one form at a time:
|
# Tags can also be specified via a normal map, but only one form at a time:
|
||||||
|
|
||||||
# [influxdb.tags]
|
# [influxdb.tags]
|
||||||
# dc = "us-east-1"
|
# tags = { "dc" = "us-east-1" }
|
||||||
|
|
||||||
# Configuration for telegraf itself
|
# Configuration for telegraf itself
|
||||||
# [agent]
|
# [agent]
|
||||||
|
|
|
@ -35,12 +35,11 @@ database = "telegraf" # required.
|
||||||
|
|
||||||
# Set the user agent for the POSTs (can be useful for log differentiation)
|
# Set the user agent for the POSTs (can be useful for log differentiation)
|
||||||
# user_agent = "telegraf"
|
# user_agent = "telegraf"
|
||||||
# tags = { "dc": "us-east-1" }
|
|
||||||
|
|
||||||
# Tags can also be specified via a normal map, but only one form at a time:
|
# Tags can also be specified via a normal map, but only one form at a time:
|
||||||
|
|
||||||
# [influxdb.tags]
|
# [influxdb.tags]
|
||||||
# dc = "us-east-1"
|
# tags = { "dc" = "us-east-1" }
|
||||||
|
|
||||||
# Configuration for telegraf itself
|
# Configuration for telegraf itself
|
||||||
# [agent]
|
# [agent]
|
||||||
|
|
|
@ -3,12 +3,15 @@ interval = "5s"
|
||||||
http = ":11213"
|
http = ":11213"
|
||||||
debug = true
|
debug = true
|
||||||
|
|
||||||
[influxdb]
|
[outputs]
|
||||||
|
[outputs.influxdb]
|
||||||
url = "http://localhost:8086"
|
url = "http://localhost:8086"
|
||||||
username = "root"
|
username = "root"
|
||||||
password = "root"
|
password = "root"
|
||||||
database = "telegraf"
|
database = "telegraf"
|
||||||
tags = { dc = "us-phx-1" }
|
|
||||||
|
[tags.influxdb]
|
||||||
|
tags = { "dc" = "us-phx-1" }
|
||||||
|
|
||||||
[redis]
|
[redis]
|
||||||
address = ":6379"
|
address = ":6379"
|
||||||
|
|
Loading…
Reference in New Issue