Start service plugins immediately, fix off-by-one bug
This commit is contained in:
parent
8b4af25c93
commit
716b375ab6
|
@ -322,6 +322,22 @@ func (a *Agent) Run(shutdown chan struct{}) error {
|
||||||
// channel shared between all input threads for accumulating metrics
|
// channel shared between all input threads for accumulating metrics
|
||||||
metricC := make(chan telegraf.Metric, 10000)
|
metricC := make(chan telegraf.Metric, 10000)
|
||||||
|
|
||||||
|
for _, input := range a.Config.Inputs {
|
||||||
|
// Start service of any ServicePlugins
|
||||||
|
switch p := input.Input.(type) {
|
||||||
|
case telegraf.ServiceInput:
|
||||||
|
acc := NewAccumulator(input.Config, metricC)
|
||||||
|
acc.SetDebug(a.Config.Agent.Debug)
|
||||||
|
acc.setDefaultTags(a.Config.Tags)
|
||||||
|
if err := p.Start(acc); err != nil {
|
||||||
|
log.Printf("Service for input %s failed to start, exiting\n%s\n",
|
||||||
|
input.Name, err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer p.Stop()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Round collection to nearest interval by sleeping
|
// Round collection to nearest interval by sleeping
|
||||||
if a.Config.Agent.RoundInterval {
|
if a.Config.Agent.RoundInterval {
|
||||||
i := int64(a.Config.Agent.Interval.Duration)
|
i := int64(a.Config.Agent.Interval.Duration)
|
||||||
|
@ -339,21 +355,6 @@ func (a *Agent) Run(shutdown chan struct{}) error {
|
||||||
}()
|
}()
|
||||||
|
|
||||||
for _, input := range a.Config.Inputs {
|
for _, input := range a.Config.Inputs {
|
||||||
|
|
||||||
// Start service of any ServicePlugins
|
|
||||||
switch p := input.Input.(type) {
|
|
||||||
case telegraf.ServiceInput:
|
|
||||||
acc := NewAccumulator(input.Config, metricC)
|
|
||||||
acc.SetDebug(a.Config.Agent.Debug)
|
|
||||||
acc.setDefaultTags(a.Config.Tags)
|
|
||||||
if err := p.Start(acc); err != nil {
|
|
||||||
log.Printf("Service for input %s failed to start, exiting\n%s\n",
|
|
||||||
input.Name, err.Error())
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer p.Stop()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Special handling for inputs that have their own collection interval
|
// Special handling for inputs that have their own collection interval
|
||||||
// configured. Default intervals are handled below with gatherParallel
|
// configured. Default intervals are handled below with gatherParallel
|
||||||
if input.Config.Interval != 0 {
|
if input.Config.Interval != 0 {
|
||||||
|
|
|
@ -59,10 +59,11 @@ func (ro *RunningOutput) AddMetric(metric telegraf.Metric) {
|
||||||
ro.Lock()
|
ro.Lock()
|
||||||
defer ro.Unlock()
|
defer ro.Unlock()
|
||||||
|
|
||||||
if len(ro.metrics) < ro.MetricBufferLimit {
|
if len(ro.metrics) < ro.MetricBufferLimit-1 {
|
||||||
ro.metrics = append(ro.metrics, metric)
|
ro.metrics = append(ro.metrics, metric)
|
||||||
} else {
|
} else {
|
||||||
if ro.FlushBufferWhenFull {
|
if ro.FlushBufferWhenFull {
|
||||||
|
ro.metrics = append(ro.metrics, metric)
|
||||||
tmpmetrics := make([]telegraf.Metric, len(ro.metrics))
|
tmpmetrics := make([]telegraf.Metric, len(ro.metrics))
|
||||||
copy(tmpmetrics, ro.metrics)
|
copy(tmpmetrics, ro.metrics)
|
||||||
ro.metrics = make([]telegraf.Metric, 0)
|
ro.metrics = make([]telegraf.Metric, 0)
|
||||||
|
|
|
@ -18,7 +18,7 @@ var sampleConfig = `
|
||||||
|
|
||||||
### MQTT outputs send metrics to this topic format
|
### MQTT outputs send metrics to this topic format
|
||||||
### "<topic_prefix>/<hostname>/<pluginname>/"
|
### "<topic_prefix>/<hostname>/<pluginname>/"
|
||||||
### ex: prefix/host/web01.example.com/mem
|
### ex: prefix/web01.example.com/mem
|
||||||
topic_prefix = "telegraf"
|
topic_prefix = "telegraf"
|
||||||
|
|
||||||
### username and password to connect MQTT server.
|
### username and password to connect MQTT server.
|
||||||
|
|
Loading…
Reference in New Issue