Start service plugins immediately, fix off-by-one bug

This commit is contained in:
Cameron Sparr 2016-02-17 14:50:19 -07:00
parent 4860dc148c
commit d0734b105b
3 changed files with 19 additions and 17 deletions

View File

@ -322,6 +322,22 @@ func (a *Agent) Run(shutdown chan struct{}) error {
// channel shared between all input threads for accumulating metrics // channel shared between all input threads for accumulating metrics
metricC := make(chan telegraf.Metric, 10000) metricC := make(chan telegraf.Metric, 10000)
for _, input := range a.Config.Inputs {
// Start service of any ServicePlugins
switch p := input.Input.(type) {
case telegraf.ServiceInput:
acc := NewAccumulator(input.Config, metricC)
acc.SetDebug(a.Config.Agent.Debug)
acc.setDefaultTags(a.Config.Tags)
if err := p.Start(acc); err != nil {
log.Printf("Service for input %s failed to start, exiting\n%s\n",
input.Name, err.Error())
return err
}
defer p.Stop()
}
}
// Round collection to nearest interval by sleeping // Round collection to nearest interval by sleeping
if a.Config.Agent.RoundInterval { if a.Config.Agent.RoundInterval {
i := int64(a.Config.Agent.Interval.Duration) i := int64(a.Config.Agent.Interval.Duration)
@ -339,21 +355,6 @@ func (a *Agent) Run(shutdown chan struct{}) error {
}() }()
for _, input := range a.Config.Inputs { for _, input := range a.Config.Inputs {
// Start service of any ServicePlugins
switch p := input.Input.(type) {
case telegraf.ServiceInput:
acc := NewAccumulator(input.Config, metricC)
acc.SetDebug(a.Config.Agent.Debug)
acc.setDefaultTags(a.Config.Tags)
if err := p.Start(acc); err != nil {
log.Printf("Service for input %s failed to start, exiting\n%s\n",
input.Name, err.Error())
return err
}
defer p.Stop()
}
// Special handling for inputs that have their own collection interval // Special handling for inputs that have their own collection interval
// configured. Default intervals are handled below with gatherParallel // configured. Default intervals are handled below with gatherParallel
if input.Config.Interval != 0 { if input.Config.Interval != 0 {

View File

@ -59,10 +59,11 @@ func (ro *RunningOutput) AddMetric(metric telegraf.Metric) {
ro.Lock() ro.Lock()
defer ro.Unlock() defer ro.Unlock()
if len(ro.metrics) < ro.MetricBufferLimit { if len(ro.metrics) < ro.MetricBufferLimit-1 {
ro.metrics = append(ro.metrics, metric) ro.metrics = append(ro.metrics, metric)
} else { } else {
if ro.FlushBufferWhenFull { if ro.FlushBufferWhenFull {
ro.metrics = append(ro.metrics, metric)
tmpmetrics := make([]telegraf.Metric, len(ro.metrics)) tmpmetrics := make([]telegraf.Metric, len(ro.metrics))
copy(tmpmetrics, ro.metrics) copy(tmpmetrics, ro.metrics)
ro.metrics = make([]telegraf.Metric, 0) ro.metrics = make([]telegraf.Metric, 0)

View File

@ -18,7 +18,7 @@ var sampleConfig = `
### MQTT outputs send metrics to this topic format ### MQTT outputs send metrics to this topic format
### "<topic_prefix>/<hostname>/<pluginname>/" ### "<topic_prefix>/<hostname>/<pluginname>/"
### ex: prefix/host/web01.example.com/mem ### ex: prefix/web01.example.com/mem
topic_prefix = "telegraf" topic_prefix = "telegraf"
### username and password to connect MQTT server. ### username and password to connect MQTT server.