Set a timeout for calls to input.Gather

Changing the internal behavior around running plugins. Each plugin
will now have it's own goroutine with it's own ticker. This means that a
hung plugin will not block any other plugins. When a plugin is hung, we
will log an error message every interval, letting users know which
plugin is hung.

Currently the input interface does not have any methods for killing a
running Gather call, so there is nothing we can do but log an "ERROR"
and move on. This will give some visibility into the plugin that is
acting up.

closes #1230
fixes #479
This commit is contained in:
Cameron Sparr
2016-05-19 16:36:58 +01:00
parent 3c5c3b98df
commit 4f5d5926d9
3 changed files with 91 additions and 92 deletions

View File

@@ -102,71 +102,20 @@ func panicRecover(input *internal_models.RunningInput) {
}
}
// gatherParallel runs the inputs that are using the same reporting interval
// as the telegraf agent.
func (a *Agent) gatherParallel(metricC chan telegraf.Metric) error {
var wg sync.WaitGroup
start := time.Now()
counter := 0
jitter := a.Config.Agent.CollectionJitter.Duration.Nanoseconds()
for _, input := range a.Config.Inputs {
if input.Config.Interval != 0 {
continue
}
wg.Add(1)
counter++
go func(input *internal_models.RunningInput) {
defer panicRecover(input)
defer wg.Done()
acc := NewAccumulator(input.Config, metricC)
acc.SetDebug(a.Config.Agent.Debug)
acc.setDefaultTags(a.Config.Tags)
if jitter != 0 {
nanoSleep := rand.Int63n(jitter)
d, err := time.ParseDuration(fmt.Sprintf("%dns", nanoSleep))
if err != nil {
log.Printf("Jittering collection interval failed for plugin %s",
input.Name)
} else {
time.Sleep(d)
}
}
if err := input.Input.Gather(acc); err != nil {
log.Printf("Error in input [%s]: %s", input.Name, err)
}
}(input)
}
if counter == 0 {
return nil
}
wg.Wait()
elapsed := time.Since(start)
if !a.Config.Agent.Quiet {
log.Printf("Gathered metrics, (%s interval), from %d inputs in %s\n",
a.Config.Agent.Interval.Duration, counter, elapsed)
}
return nil
}
// gatherSeparate runs the inputs that have been configured with their own
// gatherer runs the inputs that have been configured with their own
// reporting interval.
func (a *Agent) gatherSeparate(
func (a *Agent) gatherer(
shutdown chan struct{},
input *internal_models.RunningInput,
interval time.Duration,
metricC chan telegraf.Metric,
) error {
defer panicRecover(input)
ticker := time.NewTicker(input.Config.Interval)
ticker := time.NewTicker(interval)
defer ticker.Stop()
jitter := a.Config.Agent.CollectionJitter.Duration.Nanoseconds()
for {
var outerr error
@@ -176,14 +125,23 @@ func (a *Agent) gatherSeparate(
acc.SetDebug(a.Config.Agent.Debug)
acc.setDefaultTags(a.Config.Tags)
if err := input.Input.Gather(acc); err != nil {
log.Printf("Error in input [%s]: %s", input.Name, err)
if jitter != 0 {
nanoSleep := rand.Int63n(jitter)
d, err := time.ParseDuration(fmt.Sprintf("%dns", nanoSleep))
if err != nil {
log.Printf("Jittering collection interval failed for plugin %s",
input.Name)
} else {
time.Sleep(d)
}
}
gatherWithTimeout(shutdown, input, acc, interval)
elapsed := time.Since(start)
if !a.Config.Agent.Quiet {
log.Printf("Gathered metrics, (separate %s interval), from %s in %s\n",
input.Config.Interval, input.Name, elapsed)
if a.Config.Agent.Debug {
log.Printf("Input [%s] gathered metrics, (%s interval) in %s\n",
input.Name, interval, elapsed)
}
if outerr != nil {
@@ -199,6 +157,42 @@ func (a *Agent) gatherSeparate(
}
}
// gatherWithTimeout gathers from the given input, with the given timeout.
// when the given timeout is reached, gatherWithTimeout logs an error message
// but continues waiting for it to return. This is to avoid leaving behind
// hung processes, and to prevent re-calling the same hung process over and
// over.
func gatherWithTimeout(
shutdown chan struct{},
input *internal_models.RunningInput,
acc *accumulator,
timeout time.Duration,
) {
ticker := time.NewTicker(timeout)
defer ticker.Stop()
done := make(chan error)
go func() {
done <- input.Input.Gather(acc)
}()
for {
select {
case err := <-done:
if err != nil {
log.Printf("ERROR in input [%s]: %s", input.Name, err)
}
return
case <-ticker.C:
log.Printf("ERROR: input [%s] took longer to collect than "+
"collection interval (%s)",
input.Name, timeout)
continue
case <-shutdown:
return
}
}
}
// Test verifies that we can 'Gather' from all inputs with their configured
// Config struct
func (a *Agent) Test() error {
@@ -220,7 +214,7 @@ func (a *Agent) Test() error {
for _, input := range a.Config.Inputs {
acc := NewAccumulator(input.Config, metricC)
acc.SetDebug(true)
acc.SetTrace(true)
acc.setDefaultTags(a.Config.Tags)
fmt.Printf("* Plugin: %s, Collection 1\n", input.Name)
@@ -348,7 +342,6 @@ func (a *Agent) Run(shutdown chan struct{}) error {
i := int64(a.Config.Agent.Interval.Duration)
time.Sleep(time.Duration(i - (time.Now().UnixNano() % i)))
}
ticker := time.NewTicker(a.Config.Agent.Interval.Duration)
wg.Add(1)
go func() {
@@ -359,32 +352,21 @@ func (a *Agent) Run(shutdown chan struct{}) error {
}
}()
wg.Add(len(a.Config.Inputs))
for _, input := range a.Config.Inputs {
// Special handling for inputs that have their own collection interval
// configured. Default intervals are handled below with gatherParallel
interval := a.Config.Agent.Interval.Duration
// overwrite global interval if this plugin has it's own.
if input.Config.Interval != 0 {
wg.Add(1)
go func(input *internal_models.RunningInput) {
defer wg.Done()
if err := a.gatherSeparate(shutdown, input, metricC); err != nil {
log.Printf(err.Error())
}
}(input)
interval = input.Config.Interval
}
go func(in *internal_models.RunningInput, interv time.Duration) {
defer wg.Done()
if err := a.gatherer(shutdown, in, interv, metricC); err != nil {
log.Printf(err.Error())
}
}(input, interval)
}
defer wg.Wait()
for {
if err := a.gatherParallel(metricC); err != nil {
log.Printf(err.Error())
}
select {
case <-shutdown:
return nil
case <-ticker.C:
continue
}
}
wg.Wait()
return nil
}