use semaphore model

This commit is contained in:
Kelvin Wang 2018-06-25 17:03:41 -07:00
parent 3457c98eb1
commit 4ec7999186
2 changed files with 63 additions and 26 deletions

8
Gopkg.lock generated
View File

@ -465,6 +465,12 @@
packages = ["."] packages = ["."]
revision = "95032a82bc518f77982ea72343cc1ade730072f0" revision = "95032a82bc518f77982ea72343cc1ade730072f0"
[[projects]]
name = "github.com/kelwang/gojenkins"
packages = ["."]
revision = "4ea2f2d0a3e1350cf32d33b31ad175a2521425de"
version = "v1.0.1"
[[projects]] [[projects]]
branch = "master" branch = "master"
name = "github.com/kr/logfmt" name = "github.com/kr/logfmt"
@ -968,6 +974,6 @@
[solve-meta] [solve-meta]
analyzer-name = "dep" analyzer-name = "dep"
analyzer-version = 1 analyzer-version = 1
inputs-digest = "ef9c9e12bdc7296af3d450d44966b8ae465e9b845c1409b8027e52998b3c01e0" inputs-digest = "ef689441bcd85892bf7b2818fd5def10c2d59112ad13cdf44512efb1f2e48c6a"
solver-name = "gps-cdcl" solver-name = "gps-cdcl"
solver-version = 1 solver-version = 1

View File

@ -37,6 +37,8 @@ type Jenkins struct {
NodeExclude []string `toml:"node_exclude"` NodeExclude []string `toml:"node_exclude"`
nodeFilter filter.Filter nodeFilter filter.Filter
semaphore chan struct{}
} }
type byBuildNumber []gojenkins.JobBuild type byBuildNumber []gojenkins.JobBuild
@ -159,6 +161,8 @@ func (j *Jenkins) newInstance(client *http.Client) error {
j.MaxSubJobPerLayer = 10 j.MaxSubJobPerLayer = 10
} }
j.semaphore = make(chan struct{}, j.MaxConnections)
return nil return nil
} }
@ -170,7 +174,7 @@ func (j *Jenkins) gatherNodeData(node *gojenkins.Node, url string, acc telegraf.
// detect the parsing error, since gojenkins lib won't do it // detect the parsing error, since gojenkins lib won't do it
if info == nil || info.DisplayName == "" { if info == nil || info.DisplayName == "" {
return fmt.Errorf("error empty node name[%s]: ", j.URL) return fmt.Errorf("error empty node name[%s]: ", url)
} }
tags["node_name"] = info.DisplayName tags["node_name"] = info.DisplayName
@ -181,7 +185,7 @@ func (j *Jenkins) gatherNodeData(node *gojenkins.Node, url string, acc telegraf.
} }
if info.MonitorData.Hudson_NodeMonitors_ArchitectureMonitor == nil { if info.MonitorData.Hudson_NodeMonitors_ArchitectureMonitor == nil {
return fmt.Errorf("error empty monitor data[%s]: ", j.URL) return fmt.Errorf("error empty monitor data[%s]: ", url)
} }
tags["arch"], ok = info.MonitorData.Hudson_NodeMonitors_ArchitectureMonitor.(string) tags["arch"], ok = info.MonitorData.Hudson_NodeMonitors_ArchitectureMonitor.(string)
if !ok { if !ok {
@ -246,12 +250,18 @@ func (j *Jenkins) gatherNodeData(node *gojenkins.Node, url string, acc telegraf.
} }
func (j *Jenkins) gatherNodesData(acc telegraf.Accumulator) { func (j *Jenkins) gatherNodesData(acc telegraf.Accumulator) {
nodes, err := j.instance.GetAllNodes() var nodes []*gojenkins.Node
var err error
err = j.doGet(func() error {
nodes, err = j.instance.GetAllNodes()
return err
})
url := j.URL + "/computer/api/json" url := j.URL + "/computer/api/json"
// since gojenkins lib will never return error // since gojenkins lib will never return error
// returns error for len(nodes) is 0 // returns error for len(nodes) is 0
if err != nil || len(nodes) == 0 { if err != nil || len(nodes) == 0 {
acc.AddError(fmt.Errorf("error retrieving nodes[%s]: %v", j.URL, err)) acc.AddError(fmt.Errorf("error retrieving nodes[%s]: %v", url, err))
return return
} }
// get node data // get node data
@ -270,33 +280,42 @@ func (j *Jenkins) gatherJobs(acc telegraf.Accumulator) {
acc.AddError(fmt.Errorf("error retrieving jobs[%s]: %v", j.URL, err)) acc.AddError(fmt.Errorf("error retrieving jobs[%s]: %v", j.URL, err))
return return
} }
jobsC := make(chan jobRequest, j.MaxConnections)
var wg sync.WaitGroup var wg sync.WaitGroup
for _, job := range jobs { for _, job := range jobs {
wg.Add(1) wg.Add(1)
go func(name string) { go func(name string, wg *sync.WaitGroup, acc telegraf.Accumulator) {
jobsC <- jobRequest{ defer wg.Done()
if te := j.getJobDetail(jobRequest{
name: name, name: name,
parents: []string{}, parents: []string{},
layer: 0, layer: 0,
} }, wg, acc); te != nil {
}(job.Name)
}
for i := 0; i < j.MaxConnections; i++ {
go func(jobsC chan jobRequest, acc telegraf.Accumulator, wg *sync.WaitGroup) {
for sj := range jobsC {
if te := j.getJobDetail(sj, jobsC, wg, acc); te != nil {
acc.AddError(te) acc.AddError(te)
} }
} }(job.Name, &wg, acc)
}(jobsC, acc, &wg)
} }
wg.Wait() wg.Wait()
} }
func (j *Jenkins) getJobDetail(sj jobRequest, jobsC chan<- jobRequest, wg *sync.WaitGroup, acc telegraf.Accumulator) error { // wrap the tcp request with doGet
defer wg.Done() // block tcp request if buffered channel is full
func (j *Jenkins) doGet(tcp func() error) error {
j.semaphore <- struct{}{}
if err := tcp(); err != nil {
if err == gojenkins.ErrSessionExpired {
// ignore the error here, since config parsing should be finished.
client, _ := j.initClient()
// SessionExpired use a go routine to create a new session
go j.newInstance(client)
}
<-j.semaphore
return err
}
<-j.semaphore
return nil
}
func (j *Jenkins) getJobDetail(sj jobRequest, wg *sync.WaitGroup, acc telegraf.Accumulator) error {
if j.MaxSubJobDepth > 0 && sj.layer == j.MaxSubJobDepth { if j.MaxSubJobDepth > 0 && sj.layer == j.MaxSubJobDepth {
return nil return nil
} }
@ -305,7 +324,12 @@ func (j *Jenkins) getJobDetail(sj jobRequest, jobsC chan<- jobRequest, wg *sync.
return nil return nil
} }
url := j.URL + "/job/" + strings.Join(sj.combined(), "/job/") + "/api/json" url := j.URL + "/job/" + strings.Join(sj.combined(), "/job/") + "/api/json"
jobDetail, err := j.instance.GetJob(sj.name, sj.parents...) var jobDetail *gojenkins.Job
var err error
err = j.doGet(func() error {
jobDetail, err = j.instance.GetJob(sj.name, sj.parents...)
return err
})
if err != nil { if err != nil {
return fmt.Errorf("error retrieving inner jobs[%s]: ", url) return fmt.Errorf("error retrieving inner jobs[%s]: ", url)
} }
@ -316,13 +340,16 @@ func (j *Jenkins) getJobDetail(sj jobRequest, jobsC chan<- jobRequest, wg *sync.
} }
wg.Add(1) wg.Add(1)
// schedule tcp fetch for inner jobs // schedule tcp fetch for inner jobs
go func(innerJob gojenkins.InnerJob, sj jobRequest) { go func(innerJob gojenkins.InnerJob, sj jobRequest, wg *sync.WaitGroup, acc telegraf.Accumulator) {
jobsC <- jobRequest{ defer wg.Done()
if te := j.getJobDetail(jobRequest{
name: innerJob.Name, name: innerJob.Name,
parents: sj.combined(), parents: sj.combined(),
layer: sj.layer + 1, layer: sj.layer + 1,
}, wg, acc); te != nil {
acc.AddError(te)
} }
}(innerJob, sj) }(innerJob, sj, wg, acc)
} }
// collect build info // collect build info
@ -339,7 +366,11 @@ func (j *Jenkins) getJobDetail(sj jobRequest, jobsC chan<- jobRequest, wg *sync.
Base: baseURL, Base: baseURL,
Raw: new(gojenkins.BuildResponse), Raw: new(gojenkins.BuildResponse),
} }
status, err := build.Poll() var status int
err = j.doGet(func() error {
status, err = build.Poll()
return err
})
if err != nil || status != 200 { if err != nil || status != 200 {
if err == nil && status != 200 { if err == nil && status != 200 {
err = fmt.Errorf("status code %d", status) err = fmt.Errorf("status code %d", status)