From d0734b105bd6ecc9686fe665a10494b8529b3339 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Wed, 17 Feb 2016 14:50:19 -0700 Subject: [PATCH] Start service plugins immediately, fix off-by-one bug --- agent/agent.go | 31 ++++++++++++++++--------------- internal/models/running_output.go | 3 ++- plugins/outputs/mqtt/mqtt.go | 2 +- 3 files changed, 19 insertions(+), 17 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 5a70097fc..42ade45f2 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -322,6 +322,22 @@ func (a *Agent) Run(shutdown chan struct{}) error { // channel shared between all input threads for accumulating metrics metricC := make(chan telegraf.Metric, 10000) + for _, input := range a.Config.Inputs { + // Start service of any ServicePlugins + switch p := input.Input.(type) { + case telegraf.ServiceInput: + acc := NewAccumulator(input.Config, metricC) + acc.SetDebug(a.Config.Agent.Debug) + acc.setDefaultTags(a.Config.Tags) + if err := p.Start(acc); err != nil { + log.Printf("Service for input %s failed to start, exiting\n%s\n", + input.Name, err.Error()) + return err + } + defer p.Stop() + } + } + // Round collection to nearest interval by sleeping if a.Config.Agent.RoundInterval { i := int64(a.Config.Agent.Interval.Duration) @@ -339,21 +355,6 @@ func (a *Agent) Run(shutdown chan struct{}) error { }() for _, input := range a.Config.Inputs { - - // Start service of any ServicePlugins - switch p := input.Input.(type) { - case telegraf.ServiceInput: - acc := NewAccumulator(input.Config, metricC) - acc.SetDebug(a.Config.Agent.Debug) - acc.setDefaultTags(a.Config.Tags) - if err := p.Start(acc); err != nil { - log.Printf("Service for input %s failed to start, exiting\n%s\n", - input.Name, err.Error()) - return err - } - defer p.Stop() - } - // Special handling for inputs that have their own collection interval // configured. Default intervals are handled below with gatherParallel if input.Config.Interval != 0 { diff --git a/internal/models/running_output.go b/internal/models/running_output.go index 1b27f66de..de7c8ab21 100644 --- a/internal/models/running_output.go +++ b/internal/models/running_output.go @@ -59,10 +59,11 @@ func (ro *RunningOutput) AddMetric(metric telegraf.Metric) { ro.Lock() defer ro.Unlock() - if len(ro.metrics) < ro.MetricBufferLimit { + if len(ro.metrics) < ro.MetricBufferLimit-1 { ro.metrics = append(ro.metrics, metric) } else { if ro.FlushBufferWhenFull { + ro.metrics = append(ro.metrics, metric) tmpmetrics := make([]telegraf.Metric, len(ro.metrics)) copy(tmpmetrics, ro.metrics) ro.metrics = make([]telegraf.Metric, 0) diff --git a/plugins/outputs/mqtt/mqtt.go b/plugins/outputs/mqtt/mqtt.go index d28a04d72..48046878b 100644 --- a/plugins/outputs/mqtt/mqtt.go +++ b/plugins/outputs/mqtt/mqtt.go @@ -18,7 +18,7 @@ var sampleConfig = ` ### MQTT outputs send metrics to this topic format ### "///" - ### ex: prefix/host/web01.example.com/mem + ### ex: prefix/web01.example.com/mem topic_prefix = "telegraf" ### username and password to connect MQTT server.