package vsphere

import (
	"context"
	"log"
	"sync"
)

// Callback signatures used to plug work into a WorkerPool.
type (
	// WorkerFunc is the function that performs the actual work of the
	// WorkerPool. It resembles the "map" step of map/reduce: it receives a
	// single input value, processes it and returns a single result.
	WorkerFunc func(context.Context, interface{}) interface{}

	// PushFunc is handed to a FillerFunc and is called to push a work item
	// onto the input channel. It wraps the logic needed for graceful
	// shutdowns and reports whether the item was accepted.
	PushFunc func(context.Context, interface{}) bool

	// DrainerFunc is used to "drain" the WorkerPool, i.e. to pull out and
	// process the results generated by the workers. It is called once per
	// result produced. Returning false aborts the draining of the pool.
	DrainerFunc func(context.Context, interface{}) bool

	// FillerFunc fills the WorkerPool with jobs. It is called once and is
	// responsible for submitting every job through the supplied PushFunc.
	FillerFunc func(context.Context, PushFunc)
)

// WorkerPool implements a simple work pooling mechanism. It runs a predefined
// number of goroutines to process jobs. Jobs are inserted using the Fill call
// and results are retrieved through the Drain function.
type WorkerPool struct {
	// wg tracks the background goroutines started on behalf of the pool.
	wg sync.WaitGroup
	// In carries jobs waiting to be picked up by a worker.
	In chan interface{}
	// Out carries the results produced by the workers.
	Out chan interface{}
}

// NewWorkerPool creates a worker pool whose In and Out channels are each
// buffered to hold bufsize items.
func NewWorkerPool(bufsize int) *WorkerPool {
	pool := new(WorkerPool)
	pool.In = make(chan interface{}, bufsize)
	pool.Out = make(chan interface{}, bufsize)
	return pool
}

// push places job on the In channel, blocking until there is room or ctx is
// done. It returns true if the job was queued and false if ctx ended first.
func (w *WorkerPool) push(ctx context.Context, job interface{}) bool {
	queued := true
	select {
	case w.In <- job:
	case <-ctx.Done():
		queued = false
	}
	return queued
}

// pushOut places result on the Out channel, blocking until there is room or
// ctx is done. It returns true if the result was queued and false if ctx
// ended first.
func (w *WorkerPool) pushOut(ctx context.Context, result interface{}) bool {
	delivered := true
	select {
	case w.Out <- result:
	case <-ctx.Done():
		delivered = false
	}
	return delivered
}

// Run takes a WorkerFunc and runs it in 'n' goroutines.
func (w *WorkerPool) Run(ctx context.Context, f WorkerFunc, n int) bool { w.wg.Add(1) go func() { defer w.wg.Done() var localWg sync.WaitGroup localWg.Add(n) for i := 0; i < n; i++ { go func() { defer localWg.Done() for { select { case job, ok := <-w.In: if !ok { return } w.pushOut(ctx, f(ctx, job)) case <-ctx.Done(): log.Printf("D! [input.vsphere]: Stop requested for worker pool. Exiting.") return } } }() } localWg.Wait() close(w.Out) }() return ctx.Err() == nil } // Fill runs a FillerFunc responsible for supplying work to the pool. You may only // call Fill once. Calling it twice will panic. func (w *WorkerPool) Fill(ctx context.Context, f FillerFunc) bool { w.wg.Add(1) go func() { defer w.wg.Done() f(ctx, w.push) close(w.In) }() return true } // Drain runs a DrainerFunc for each result generated by the workers. func (w *WorkerPool) Drain(ctx context.Context, f DrainerFunc) bool { w.wg.Add(1) go func() { defer w.wg.Done() for result := range w.Out { if !f(ctx, result) { break } } }() w.wg.Wait() return ctx.Err() != nil }