diff --git a/agent.go b/agent.go index c1e7bb434..e1292bffe 100644 --- a/agent.go +++ b/agent.go @@ -9,7 +9,6 @@ import ( "sync" "time" - "github.com/influxdb/telegraf/internal" "github.com/influxdb/telegraf/internal/config" "github.com/influxdb/telegraf/outputs" "github.com/influxdb/telegraf/plugins" @@ -19,77 +18,27 @@ import ( // Agent runs telegraf and collects data based on the given config type Agent struct { - - // Interval at which to gather information - Interval internal.Duration - - // RoundInterval rounds collection interval to 'interval'. - // ie, if Interval=10s then always collect on :00, :10, :20, etc. - RoundInterval bool - - // Interval at which to flush data - FlushInterval internal.Duration - - // FlushRetries is the number of times to retry each data flush - FlushRetries int - - // FlushJitter tells - FlushJitter internal.Duration - - // TODO(cam): Remove UTC and Precision parameters, they are no longer - // valid for the agent config. Leaving them here for now for backwards- - // compatability - - // Option for outputting data in UTC - UTC bool `toml:"utc"` - - // Precision to write data at - // Valid values for Precision are n, u, ms, s, m, and h - Precision string - - // Option for running in debug mode - Debug bool - Hostname string - - Tags map[string]string - Config *config.Config } // NewAgent returns an Agent struct based off the given Config func NewAgent(config *config.Config) (*Agent, error) { - agent := &Agent{ - Tags: make(map[string]string), - Config: config, - Interval: internal.Duration{10 * time.Second}, - RoundInterval: true, - FlushInterval: internal.Duration{10 * time.Second}, - FlushRetries: 2, - FlushJitter: internal.Duration{5 * time.Second}, + a := &Agent{ + Config: config, } - // Apply the toml table to the agent config, overriding defaults - err := config.ApplyAgent(agent) - if err != nil { - return nil, err - } - - if agent.Hostname == "" { + if a.Config.Agent.Hostname == "" { hostname, err := os.Hostname() if err != 
nil { return nil, err } - agent.Hostname = hostname + a.Config.Agent.Hostname = hostname } - if config.Tags == nil { - config.Tags = map[string]string{} - } + config.Tags["host"] = a.Config.Agent.Hostname - config.Tags["host"] = agent.Hostname - - return agent, nil + return a, nil } // Connect connects to all configured outputs @@ -104,7 +53,7 @@ func (a *Agent) Connect() error { } } - if a.Debug { + if a.Config.Agent.Debug { log.Printf("Attempting connection to output: %s\n", o.Name) } err := o.Output.Connect() @@ -116,7 +65,7 @@ func (a *Agent) Connect() error { return err } } - if a.Debug { + if a.Config.Agent.Debug { log.Printf("Successfully connected to output: %s\n", o.Name) } } @@ -154,9 +103,9 @@ func (a *Agent) gatherParallel(pointChan chan *client.Point) error { defer wg.Done() acc := NewAccumulator(plugin.Config, pointChan) - acc.SetDebug(a.Debug) + acc.SetDebug(a.Config.Agent.Debug) acc.SetPrefix(plugin.Name + "_") - acc.SetDefaultTags(a.Tags) + acc.SetDefaultTags(a.Config.Tags) if err := plugin.Plugin.Gather(acc); err != nil { log.Printf("Error in plugin [%s]: %s", plugin.Name, err) @@ -169,7 +118,7 @@ func (a *Agent) gatherParallel(pointChan chan *client.Point) error { elapsed := time.Since(start) log.Printf("Gathered metrics, (%s interval), from %d plugins in %s\n", - a.Interval, counter, elapsed) + a.Config.Agent.Interval, counter, elapsed) return nil } @@ -187,9 +136,9 @@ func (a *Agent) gatherSeparate( start := time.Now() acc := NewAccumulator(plugin.Config, pointChan) - acc.SetDebug(a.Debug) + acc.SetDebug(a.Config.Agent.Debug) acc.SetPrefix(plugin.Name + "_") - acc.SetDefaultTags(a.Tags) + acc.SetDefaultTags(a.Config.Tags) if err := plugin.Plugin.Gather(acc); err != nil { log.Printf("Error in plugin [%s]: %s", plugin.Name, err) @@ -273,7 +222,7 @@ func (a *Agent) writeOutput( return } retry := 0 - retries := a.FlushRetries + retries := a.Config.Agent.FlushRetries start := time.Now() for { @@ -299,8 +248,8 @@ func (a *Agent) writeOutput( } else 
if err != nil { // Sleep for a retry log.Printf("Error in output [%s]: %s, retrying in %s", - ro.Name, err.Error(), a.FlushInterval.Duration) - time.Sleep(a.FlushInterval.Duration) + ro.Name, err.Error(), a.Config.Agent.FlushInterval.Duration) + time.Sleep(a.Config.Agent.FlushInterval.Duration) } } @@ -330,7 +279,7 @@ func (a *Agent) flusher(shutdown chan struct{}, pointChan chan *client.Point) er // the flusher will flush after metrics are collected. time.Sleep(time.Millisecond * 100) - ticker := time.NewTicker(a.FlushInterval.Duration) + ticker := time.NewTicker(a.Config.Agent.FlushInterval.Duration) points := make([]*client.Point, 0) for { @@ -373,22 +322,23 @@ func jitterInterval(ininterval, injitter time.Duration) time.Duration { func (a *Agent) Run(shutdown chan struct{}) error { var wg sync.WaitGroup - a.FlushInterval.Duration = jitterInterval(a.FlushInterval.Duration, - a.FlushJitter.Duration) + a.Config.Agent.FlushInterval.Duration = jitterInterval(a.Config.Agent.FlushInterval.Duration, + a.Config.Agent.FlushJitter.Duration) log.Printf("Agent Config: Interval:%s, Debug:%#v, Hostname:%#v, "+ "Flush Interval:%s\n", - a.Interval, a.Debug, a.Hostname, a.FlushInterval) + a.Config.Agent.Interval, a.Config.Agent.Debug, + a.Config.Agent.Hostname, a.Config.Agent.FlushInterval) // channel shared between all plugin threads for accumulating points pointChan := make(chan *client.Point, 1000) // Round collection to nearest interval by sleeping - if a.RoundInterval { - i := int64(a.Interval.Duration) + if a.Config.Agent.RoundInterval { + i := int64(a.Config.Agent.Interval.Duration) time.Sleep(time.Duration(i - (time.Now().UnixNano() % i))) } - ticker := time.NewTicker(a.Interval.Duration) + ticker := time.NewTicker(a.Config.Agent.Interval.Duration) wg.Add(1) go func() { diff --git a/agent_test.go b/agent_test.go index 275ab96ee..7dd65ef26 100644 --- a/agent_test.go +++ b/agent_test.go @@ -5,7 +5,6 @@ import ( "testing" "time" - "github.com/influxdb/telegraf/internal" 
"github.com/influxdb/telegraf/internal/config" // needing to load the plugins @@ -85,12 +84,8 @@ func TestAgent_LoadOutput(t *testing.T) { } func TestAgent_ZeroJitter(t *testing.T) { - a := &Agent{ - FlushInterval: internal.Duration{10 * time.Second}, - FlushJitter: internal.Duration{0 * time.Second}, - } - flushinterval := jitterInterval(a.FlushInterval.Duration, - a.FlushJitter.Duration) + flushinterval := jitterInterval(time.Duration(10*time.Second), + time.Duration(0*time.Second)) actual := flushinterval.Nanoseconds() exp := time.Duration(10 * time.Second).Nanoseconds() @@ -105,13 +100,8 @@ func TestAgent_ZeroInterval(t *testing.T) { max := time.Duration(5 * time.Second).Nanoseconds() for i := 0; i < 1000; i++ { - a := &Agent{ - FlushInterval: internal.Duration{0 * time.Second}, - FlushJitter: internal.Duration{5 * time.Second}, - } - - flushinterval := jitterInterval(a.FlushInterval.Duration, - a.FlushJitter.Duration) + flushinterval := jitterInterval(time.Duration(0*time.Second), + time.Duration(5*time.Second)) actual := flushinterval.Nanoseconds() if actual > max { @@ -126,13 +116,8 @@ func TestAgent_ZeroInterval(t *testing.T) { } func TestAgent_ZeroBoth(t *testing.T) { - a := &Agent{ - FlushInterval: internal.Duration{0 * time.Second}, - FlushJitter: internal.Duration{0 * time.Second}, - } - - flushinterval := jitterInterval(a.FlushInterval.Duration, - a.FlushJitter.Duration) + flushinterval := jitterInterval(time.Duration(0*time.Second), + time.Duration(0*time.Second)) actual := flushinterval exp := time.Duration(500 * time.Millisecond) @@ -146,12 +131,8 @@ func TestAgent_JitterMax(t *testing.T) { max := time.Duration(32 * time.Second).Nanoseconds() for i := 0; i < 1000; i++ { - a := &Agent{ - FlushInterval: internal.Duration{30 * time.Second}, - FlushJitter: internal.Duration{2 * time.Second}, - } - flushinterval := jitterInterval(a.FlushInterval.Duration, - a.FlushJitter.Duration) + flushinterval := jitterInterval(time.Duration(30*time.Second), + 
time.Duration(2*time.Second)) actual := flushinterval.Nanoseconds() if actual > max { t.Errorf("Didn't expect interval %d to be > %d", actual, max) @@ -164,12 +145,8 @@ func TestAgent_JitterMin(t *testing.T) { min := time.Duration(30 * time.Second).Nanoseconds() for i := 0; i < 1000; i++ { - a := &Agent{ - FlushInterval: internal.Duration{30 * time.Second}, - FlushJitter: internal.Duration{2 * time.Second}, - } - flushinterval := jitterInterval(a.FlushInterval.Duration, - a.FlushJitter.Duration) + flushinterval := jitterInterval(time.Duration(30*time.Second), + time.Duration(2*time.Second)) actual := flushinterval.Nanoseconds() if actual < min { t.Errorf("Didn't expect interval %d to be < %d", actual, min) diff --git a/cmd/telegraf/telegraf.go b/cmd/telegraf/telegraf.go index f5f724ad1..8f9a628f9 100644 --- a/cmd/telegraf/telegraf.go +++ b/cmd/telegraf/telegraf.go @@ -108,7 +108,7 @@ func main() { } if *fDebug { - ag.Debug = true + ag.Config.Agent.Debug = true } if *fTest { diff --git a/internal/config/config.go b/internal/config/config.go index e0f6b673b..5bb77f2c6 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -10,6 +10,7 @@ import ( "strings" "time" + "github.com/influxdb/telegraf/internal" "github.com/influxdb/telegraf/outputs" "github.com/influxdb/telegraf/plugins" @@ -25,13 +26,22 @@ type Config struct { PluginFilters []string OutputFilters []string - Agent *ast.Table + Agent *AgentConfig Plugins []*RunningPlugin Outputs []*RunningOutput } func NewConfig() *Config { c := &Config{ + // Agent defaults: + Agent: &AgentConfig{ + Interval: internal.Duration{10 * time.Second}, + RoundInterval: true, + FlushInterval: internal.Duration{10 * time.Second}, + FlushRetries: 2, + FlushJitter: internal.Duration{5 * time.Second}, + }, + Tags: make(map[string]string), Plugins: make([]*RunningPlugin, 0), Outputs: make([]*RunningOutput, 0), @@ -41,6 +51,34 @@ func NewConfig() *Config { return c } +type AgentConfig struct { + // Interval at which to 
gather information + Interval internal.Duration + + // RoundInterval rounds collection interval to 'interval'. + // i.e., if Interval=10s then always collect on :00, :10, :20, etc. + RoundInterval bool + + // Interval at which to flush data + FlushInterval internal.Duration + + // FlushRetries is the number of times to retry each data flush + FlushRetries int + + // FlushJitter is the jitter duration applied to the flush interval + FlushJitter internal.Duration + + // TODO(cam): Remove UTC and Precision parameters, they are no longer + // valid for the agent config. Leaving them here for now for backwards- + // compatibility + UTC bool `toml:"utc"` + Precision string + + // Option for running in debug mode + Debug bool + Hostname string +} + // TagFilter is the name of a tag, and the values on which to filter type TagFilter struct { Name string @@ -146,16 +184,6 @@ func (c *Config) OutputNames() []string { return name } -// ApplyAgent loads the toml config into the given Agent object, overriding -// defaults (such as collection duration) with the values from the toml config. -func (c *Config) ApplyAgent(a interface{}) error { - if c.Agent != nil { - return toml.UnmarshalTable(c.Agent, a) - } - - return nil -} - // ListTags returns a string of tags specified in the config, // line-protocol style func (c *Config) ListTags() string { @@ -346,7 +374,7 @@ func (c *Config) LoadDirectory(path string) error { continue } name := entry.Name() - if name[len(name)-5:] != ".conf" { + if len(name) < 6 || name[len(name)-5:] != ".conf" { continue } err := c.LoadConfig(filepath.Join(path, name)) @@ -377,7 +405,10 @@ func (c *Config) LoadConfig(path string) error { switch name { case "agent": - c.Agent = subTable + if err = toml.UnmarshalTable(subTable, c.Agent); err != nil { + log.Printf("Could not parse [agent] config\n") + return err + } case "tags": if err = toml.UnmarshalTable(subTable, c.Tags); err != nil { log.Printf("Could not parse [tags] config\n")