From 4ec7999186e674fb7a5e975a14082b4e934b904a Mon Sep 17 00:00:00 2001 From: Kelvin Wang Date: Mon, 25 Jun 2018 17:03:41 -0700 Subject: [PATCH] use semaphore model --- Gopkg.lock | 8 ++- plugins/inputs/jenkins/jenkins.go | 81 +++++++++++++++++++++---------- 2 files changed, 63 insertions(+), 26 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 243c15fac..15aeb9bf2 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -465,6 +465,12 @@ packages = ["."] revision = "95032a82bc518f77982ea72343cc1ade730072f0" +[[projects]] + name = "github.com/kelwang/gojenkins" + packages = ["."] + revision = "4ea2f2d0a3e1350cf32d33b31ad175a2521425de" + version = "v1.0.1" + [[projects]] branch = "master" name = "github.com/kr/logfmt" @@ -968,6 +974,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "ef9c9e12bdc7296af3d450d44966b8ae465e9b845c1409b8027e52998b3c01e0" + inputs-digest = "ef689441bcd85892bf7b2818fd5def10c2d59112ad13cdf44512efb1f2e48c6a" solver-name = "gps-cdcl" solver-version = 1 diff --git a/plugins/inputs/jenkins/jenkins.go b/plugins/inputs/jenkins/jenkins.go index e68e4715b..c83884db4 100644 --- a/plugins/inputs/jenkins/jenkins.go +++ b/plugins/inputs/jenkins/jenkins.go @@ -37,6 +37,8 @@ type Jenkins struct { NodeExclude []string `toml:"node_exclude"` nodeFilter filter.Filter + + semaphore chan struct{} } type byBuildNumber []gojenkins.JobBuild @@ -159,6 +161,8 @@ func (j *Jenkins) newInstance(client *http.Client) error { j.MaxSubJobPerLayer = 10 } + j.semaphore = make(chan struct{}, j.MaxConnections) + return nil } @@ -170,7 +174,7 @@ func (j *Jenkins) gatherNodeData(node *gojenkins.Node, url string, acc telegraf. // detect the parsing error, since gojenkins lib won't do it if info == nil || info.DisplayName == "" { - return fmt.Errorf("error empty node name[%s]: ", j.URL) + return fmt.Errorf("error empty node name[%s]: ", url) } tags["node_name"] = info.DisplayName @@ -181,7 +185,7 @@ func (j *Jenkins) gatherNodeData(node *gojenkins.Node, url string, acc telegraf. } if info.MonitorData.Hudson_NodeMonitors_ArchitectureMonitor == nil { - return fmt.Errorf("error empty monitor data[%s]: ", j.URL) + return fmt.Errorf("error empty monitor data[%s]: ", url) } tags["arch"], ok = info.MonitorData.Hudson_NodeMonitors_ArchitectureMonitor.(string) if !ok { @@ -246,12 +250,18 @@ func (j *Jenkins) gatherNodeData(node *gojenkins.Node, url string, acc telegraf. } func (j *Jenkins) gatherNodesData(acc telegraf.Accumulator) { - nodes, err := j.instance.GetAllNodes() + var nodes []*gojenkins.Node + var err error + err = j.doGet(func() error { + nodes, err = j.instance.GetAllNodes() + return err + }) + url := j.URL + "/computer/api/json" // since gojenkins lib will never return error // returns error for len(nodes) is 0 if err != nil || len(nodes) == 0 { - acc.AddError(fmt.Errorf("error retrieving nodes[%s]: %v", j.URL, err)) + acc.AddError(fmt.Errorf("error retrieving nodes[%s]: %v", url, err)) return } // get node data @@ -270,33 +280,42 @@ func (j *Jenkins) gatherJobs(acc telegraf.Accumulator) { acc.AddError(fmt.Errorf("error retrieving jobs[%s]: %v", j.URL, err)) return } - jobsC := make(chan jobRequest, j.MaxConnections) var wg sync.WaitGroup for _, job := range jobs { wg.Add(1) - go func(name string) { - jobsC <- jobRequest{ + go func(name string, wg *sync.WaitGroup, acc telegraf.Accumulator) { + defer wg.Done() + if te := j.getJobDetail(jobRequest{ name: name, parents: []string{}, layer: 0, + }, wg, acc); te != nil { + acc.AddError(te) } - }(job.Name) - } - - for i := 0; i < j.MaxConnections; i++ { - go func(jobsC chan jobRequest, acc telegraf.Accumulator, wg *sync.WaitGroup) { - for sj := range jobsC { - if te := j.getJobDetail(sj, jobsC, wg, acc); te != nil { - acc.AddError(te) - } - } - }(jobsC, acc, &wg) + }(job.Name, &wg, acc) } wg.Wait() } -func (j *Jenkins) getJobDetail(sj jobRequest, jobsC chan<- jobRequest, wg *sync.WaitGroup, acc telegraf.Accumulator) error { - defer wg.Done() +// wrap the tcp request with doGet +// block tcp request if buffered channel is full +func (j *Jenkins) doGet(tcp func() error) error { + j.semaphore <- struct{}{} + if err := tcp(); err != nil { + if err == gojenkins.ErrSessionExpired { + // ignore the error here, since config parsing should be finished. + client, _ := j.initClient() + // SessionExpired use a go routine to create a new session + go j.newInstance(client) + } + <-j.semaphore + return err + } + <-j.semaphore + return nil +} + +func (j *Jenkins) getJobDetail(sj jobRequest, wg *sync.WaitGroup, acc telegraf.Accumulator) error { if j.MaxSubJobDepth > 0 && sj.layer == j.MaxSubJobDepth { return nil } @@ -305,7 +324,12 @@ func (j *Jenkins) getJobDetail(sj jobRequest, jobsC chan<- jobRequest, wg *sync. return nil } url := j.URL + "/job/" + strings.Join(sj.combined(), "/job/") + "/api/json" - jobDetail, err := j.instance.GetJob(sj.name, sj.parents...) + var jobDetail *gojenkins.Job + var err error + err = j.doGet(func() error { + jobDetail, err = j.instance.GetJob(sj.name, sj.parents...) + return err + }) if err != nil { return fmt.Errorf("error retrieving inner jobs[%s]: ", url) } @@ -316,13 +340,16 @@ func (j *Jenkins) getJobDetail(sj jobRequest, jobsC chan<- jobRequest, wg *sync. } wg.Add(1) // schedule tcp fetch for inner jobs - go func(innerJob gojenkins.InnerJob, sj jobRequest) { - jobsC <- jobRequest{ + go func(innerJob gojenkins.InnerJob, sj jobRequest, wg *sync.WaitGroup, acc telegraf.Accumulator) { + defer wg.Done() + if te := j.getJobDetail(jobRequest{ name: innerJob.Name, parents: sj.combined(), layer: sj.layer + 1, + }, wg, acc); te != nil { + acc.AddError(te) } - }(innerJob, sj) + }(innerJob, sj, wg, acc) } // collect build info @@ -339,7 +366,11 @@ func (j *Jenkins) getJobDetail(sj jobRequest, jobsC chan<- jobRequest, wg *sync. Base: baseURL, Raw: new(gojenkins.BuildResponse), } - status, err := build.Poll() + var status int + err = j.doGet(func() error { + status, err = build.Poll() + return err + }) if err != nil || status != 200 { if err == nil && status != 200 { err = fmt.Errorf("status code %d", status)