Move exec WaitGroup from Exec instance level to Gather.

If Gather is run concurently the shared WaitGroup variable never finishes.

closes #1463
closes #1464
This commit is contained in:
Tim Allen 2016-07-11 08:58:00 -05:00 committed by Cameron Sparr
parent 2d6c8767f7
commit 1d9745ee98
2 changed files with 7 additions and 7 deletions

View File

@ -59,6 +59,7 @@ should now look like:
- [#1412](https://github.com/influxdata/telegraf/pull/1412): Instrumental output has better reconnect behavior - [#1412](https://github.com/influxdata/telegraf/pull/1412): Instrumental output has better reconnect behavior
- [#1460](https://github.com/influxdata/telegraf/issues/1460): Remove PID from procstat plugin to fix cardinality issues. - [#1460](https://github.com/influxdata/telegraf/issues/1460): Remove PID from procstat plugin to fix cardinality issues.
- [#1427](https://github.com/influxdata/telegraf/issues/1427): Cassandra input: version 2.x "column family" fix. - [#1427](https://github.com/influxdata/telegraf/issues/1427): Cassandra input: version 2.x "column family" fix.
- [#1463](https://github.com/influxdata/telegraf/issues/1463): Shared WaitGroup in Exec plugin
## v1.0 beta 2 [2016-06-21] ## v1.0 beta 2 [2016-06-21]

View File

@ -48,8 +48,6 @@ type Exec struct {
parser parsers.Parser parser parsers.Parser
wg sync.WaitGroup
runner Runner runner Runner
errChan chan error errChan chan error
} }
@ -119,8 +117,8 @@ func (c CommandRunner) Run(
return out.Bytes(), nil return out.Bytes(), nil
} }
func (e *Exec) ProcessCommand(command string, acc telegraf.Accumulator) { func (e *Exec) ProcessCommand(command string, acc telegraf.Accumulator, wg *sync.WaitGroup) {
defer e.wg.Done() defer wg.Done()
out, err := e.runner.Run(e, command, acc) out, err := e.runner.Run(e, command, acc)
if err != nil { if err != nil {
@ -151,6 +149,7 @@ func (e *Exec) SetParser(parser parsers.Parser) {
} }
func (e *Exec) Gather(acc telegraf.Accumulator) error { func (e *Exec) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup
// Legacy single command support // Legacy single command support
if e.Command != "" { if e.Command != "" {
e.Commands = append(e.Commands, e.Command) e.Commands = append(e.Commands, e.Command)
@ -190,11 +189,11 @@ func (e *Exec) Gather(acc telegraf.Accumulator) error {
errChan := errchan.New(len(commands)) errChan := errchan.New(len(commands))
e.errChan = errChan.C e.errChan = errChan.C
e.wg.Add(len(commands)) wg.Add(len(commands))
for _, command := range commands { for _, command := range commands {
go e.ProcessCommand(command, acc) go e.ProcessCommand(command, acc, &wg)
} }
e.wg.Wait() wg.Wait()
return errChan.Error() return errChan.Error()
} }