Move exec WaitGroup from Exec instance level to Gather function level. If Gather is run concurently the shared WaitGroup variable never finishes.

This commit is contained in:
Tim Allen 2016-07-11 08:58:00 -05:00
parent 207c5498e7
commit 033d583f41
2 changed files with 7 additions and 7 deletions

View File

@ -50,6 +50,7 @@ should now look like:
- [#1432](https://github.com/influxdata/telegraf/issues/1432): Panic fix for multiple graphite outputs under very high load. - [#1432](https://github.com/influxdata/telegraf/issues/1432): Panic fix for multiple graphite outputs under very high load.
- [#1412](https://github.com/influxdata/telegraf/pull/1412): Instrumental output has better reconnect behavior - [#1412](https://github.com/influxdata/telegraf/pull/1412): Instrumental output has better reconnect behavior
- [#1460](https://github.com/influxdata/telegraf/issues/1460): Remove PID from procstat plugin to fix cardinality issues. - [#1460](https://github.com/influxdata/telegraf/issues/1460): Remove PID from procstat plugin to fix cardinality issues.
- [#1463](https://github.com/influxdata/telegraf/issues/1463): Shared WaitGroup in Exec plugin
## v1.0 beta 2 [2016-06-21] ## v1.0 beta 2 [2016-06-21]

View File

@ -48,8 +48,6 @@ type Exec struct {
parser parsers.Parser parser parsers.Parser
wg sync.WaitGroup
runner Runner runner Runner
errChan chan error errChan chan error
} }
@ -119,8 +117,8 @@ func (c CommandRunner) Run(
return out.Bytes(), nil return out.Bytes(), nil
} }
func (e *Exec) ProcessCommand(command string, acc telegraf.Accumulator) { func (e *Exec) ProcessCommand(command string, acc telegraf.Accumulator, wg *sync.WaitGroup) {
defer e.wg.Done() defer wg.Done()
out, err := e.runner.Run(e, command, acc) out, err := e.runner.Run(e, command, acc)
if err != nil { if err != nil {
@ -151,6 +149,7 @@ func (e *Exec) SetParser(parser parsers.Parser) {
} }
func (e *Exec) Gather(acc telegraf.Accumulator) error { func (e *Exec) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup
// Legacy single command support // Legacy single command support
if e.Command != "" { if e.Command != "" {
e.Commands = append(e.Commands, e.Command) e.Commands = append(e.Commands, e.Command)
@ -190,11 +189,11 @@ func (e *Exec) Gather(acc telegraf.Accumulator) error {
errChan := errchan.New(len(commands)) errChan := errchan.New(len(commands))
e.errChan = errChan.C e.errChan = errChan.C
e.wg.Add(len(commands)) wg.Add(len(commands))
for _, command := range commands { for _, command := range commands {
go e.ProcessCommand(command, acc) go e.ProcessCommand(command, acc, &wg)
} }
e.wg.Wait() wg.Wait()
return errChan.Error() return errChan.Error()
} }