From 9c85c05fcba73886732846cf8cd5b14b91577502 Mon Sep 17 00:00:00 2001 From: Kelvin Wang Date: Wed, 13 Jun 2018 23:23:10 -0700 Subject: [PATCH 1/6] add jenkins lib --- Godeps | 0 plugins/inputs/all/all.go | 1 + plugins/inputs/jenkins/README.md | 79 ++++ plugins/inputs/jenkins/jenkins.go | 483 +++++++++++++++++++++++++ plugins/inputs/jenkins/jenkins_test.go | 92 +++++ 5 files changed, 655 insertions(+) create mode 100644 Godeps create mode 100644 plugins/inputs/jenkins/README.md create mode 100644 plugins/inputs/jenkins/jenkins.go create mode 100644 plugins/inputs/jenkins/jenkins_test.go diff --git a/Godeps b/Godeps new file mode 100644 index 000000000..e69de29bb diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index b2be2be5a..51c242e8a 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -42,6 +42,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/ipmi_sensor" _ "github.com/influxdata/telegraf/plugins/inputs/ipset" _ "github.com/influxdata/telegraf/plugins/inputs/iptables" + _ "github.com/influxdata/telegraf/plugins/inputs/jenkins" _ "github.com/influxdata/telegraf/plugins/inputs/jolokia" _ "github.com/influxdata/telegraf/plugins/inputs/jolokia2" _ "github.com/influxdata/telegraf/plugins/inputs/jti_openconfig_telemetry" diff --git a/plugins/inputs/jenkins/README.md b/plugins/inputs/jenkins/README.md new file mode 100644 index 000000000..d1362b7ed --- /dev/null +++ b/plugins/inputs/jenkins/README.md @@ -0,0 +1,79 @@ +# Jenkins Plugin + +The jenkins plugin gathers information about the nodes and jobs running in a jenkins instance. + +This plugin does not require a plugin on jenkins and it makes use of Jenkins API to retrieve all the information needed. + +### Configuration: + +```toml + url = "http://my-jenkins-instance:8080" + username = "admin" + password = "admin" + ## Set response_timeout + response_timeout = "5s" + + ## Optional SSL Config + # ssl_ca = /path/to/cafile + # ssl_cert = /path/to/certfile + # ssl_key = /path/to/keyfile + ## Use SSL but skip chain & host verification + # insecure_skip_verify = false + + ## Job & build filter + # max_build_age = "1h" + ## jenkins can have unlimited layer of sub jobs + ## this config will limit the layers of pull, default value 0 means + ## unlimited pulling until no more sub jobs + # max_sub_jobs_layer = 0 + ## in workflow-multibranch-plugin, each branch will be created as a sub job + ## this config will limit to call only the lasted branches + ## sub jobs fetch in each layer + # empty will use default value 10 + # newest_sub_jobs_each_layer = 10 + # job_exclude = [ "MyJob", "MyOtherJob" ] + + ## Node filter + # node_exlude = [ "node1", "node2" ] + + ## Woker pool for jenkins plugin only + # empty this field will use default value 30 + # max_tcp_concurrent_connections = 30 +``` + +### Measurements & Fields: + +- jenkins_node + - disk_available + - temp_available + - memory_available + - memory_total + - swap_available + - swap_total + - response_time + +- jenkins_job + - duration + - result_code (0 = SUCCESS, 1 = FAILURE, 2 = NOT_BUILD, 3 = UNSTABLE, 4 = ABORTED) + +### Tags: + +- jenkins_node + - arch + - disk_path + - temp_path + - node_name + - status ("online", "offline") + +- jenkins_job + - job_name + - result + +### Example Output: + +``` +$ ./telegraf --config telegraf.conf --input-filter jenkins --test +jenkins_node,arch=Linux\ (amd64),disk_path=/var/jenkins_home,temp_path=/tmp,host=myhost,node_name=master swap_total=4294963200,memory_available=586711040,memory_total=6089498624,status=online,response_time=1000i,disk_available=152392036352,temp_available=152392036352,swap_available=3503263744 1516031535000000000 +jenkins_job,host=myhost,job_name=JOB1,result=SUCCESS duration=2831i,result_code=0i 1516026630000000000 +jenkins_job,host=myhost,job_name=JOB2,result=SUCCESS duration=2285i,result_code=0i 1516027230000000000 +``` \ No newline at end of file diff --git a/plugins/inputs/jenkins/jenkins.go b/plugins/inputs/jenkins/jenkins.go new file mode 100644 index 000000000..a93810ff3 --- /dev/null +++ b/plugins/inputs/jenkins/jenkins.go @@ -0,0 +1,483 @@ +package jenkins + +import ( + "fmt" + "log" + "net/http" + "reflect" + "strconv" + "strings" + "sync" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/internal/tls" + "github.com/influxdata/telegraf/plugins/inputs" + "github.com/kelwang/gojenkins" +) + +// Jenkins plugin gathers information about the nodes and jobs running in a jenkins instance. +type Jenkins struct { + URL string + Username string + Password string + // HTTP Timeout specified as a string - 3s, 1m, 1h + ResponseTimeout internal.Duration + + tls.ClientConfig + instance *gojenkins.Jenkins + + MaxTCPConcurrentConnections int `toml:"max_tcp_concurrent_connections"` + MaxBuildAge internal.Duration `toml:"max_build_age"` + MaxSubJobsLayer int `toml:"max_sub_jobs_layer"` + NewestSubJobsEachLayer int `toml:"newest_sub_jobs_each_layer"` + JobExclude []string `toml:"job_exclude"` + jobFilter map[string]bool + + NodeExclude []string `toml:"node_exclude"` + nodeFilter map[string]bool +} + +type byBuildNumber []gojenkins.JobBuild + +const sampleConfig = ` +url = "http://my-jenkins-instance:8080" +username = "admin" +password = "admin" +## Set response_timeout +response_timeout = "5s" + +## Optional SSL Config +# ssl_ca = /path/to/cafile +# ssl_cert = /path/to/certfile +# ssl_key = /path/to/keyfile +## Use SSL but skip chain & host verification +# insecure_skip_verify = false + +## Job & build filter +# max_build_age = "1h" +## jenkins can have unlimited layer of sub jobs +## this config will limit the layers of pull, default value 0 means +## unlimited pulling until no more sub jobs +# max_sub_jobs_layer = 0 +## in workflow-multibranch-plugin, each branch will be created as a sub job +## this config will limit to call only the lasted branches +## sub jobs fetch in each layer +# empty will use default value 10 +# newest_sub_jobs_each_layer = 10 +# job_exclude = [ "MyJob", "MyOtherJob" ] + +## Node filter +# node_exlude = [ "node1", "node2" ] + +## Woker pool for jenkins plugin only +# empty this field will use default value 30 +# max_tcp_concurrent_connections = 30 +` + +// measurement +const ( + measurementNode = "jenkins_node" + measurementJob = "jenkins_job" +) + +type typedErr struct { + level int + err error + reference string + url string +} + +// const of the error level, default 0 to be the errLevel +const ( + errLevel int = iota + continueLevel + infoLevel +) + +func wrapErr(e typedErr, err error, ref string) *typedErr { + return &typedErr{ + level: e.level, + err: err, + reference: ref, + url: e.url, + } +} + +func (e *typedErr) Error() string { + if e == nil { + return "" + } + return fmt.Sprintf("error "+e.reference+"[%s]: %v", e.url, e.err) +} + +func badFormatErr(te typedErr, field interface{}, want string, fieldName string) *typedErr { + return &typedErr{ + level: te.level, + err: fmt.Errorf("fieldName: %s, want %s, got %s", fieldName, want, reflect.TypeOf(field).String()), + reference: errBadFormat, + url: te.url, + } +} + +// err references +const ( + errParseConfig = "parse jenkins config" + errConnectJenkins = "connect jenkins instance" + errInitJenkins = "init jenkins instance" + errRetrieveNode = "retrieving nodes" + errRetrieveJobs = "retrieving jobs" + errReadNodeInfo = "reading node info" + errEmptyMonitorData = "empty monitor data" + errBadFormat = "bad format" + errRetrieveInnerJobs = "retrieving inner jobs" + errRetrieveLatestBuild = "retrieving latest build" +) + +// SampleConfig implements telegraf.Input interface +func (j *Jenkins) SampleConfig() string { + return sampleConfig +} + +// Description implements telegraf.Input interface +func (j *Jenkins) Description() string { + return "Read jobs and cluster metrics from Jenkins instances" +} + +// Gather implements telegraf.Input interface +func (j *Jenkins) Gather(acc telegraf.Accumulator) error { + var err error + te := typedErr{ + url: j.URL, + } + if j.instance == nil { + if tErr := j.initJenkins(te); tErr != nil { + return tErr + } + } + + nodes, err := j.instance.GetAllNodes() + if err != nil { + return wrapErr(te, err, errRetrieveNode) + } + + jobs, err := j.instance.GetAllJobNames() + if err != nil { + return wrapErr(te, err, errRetrieveJobs) + } + + j.gatherNodesData(nodes, acc) + j.gatherJobs(jobs, acc) + + return nil +} + +func (j *Jenkins) initJenkins(te typedErr) *typedErr { + // create instance + tlsCfg, err := j.ClientConfig.TLSConfig() + if err != nil { + return wrapErr(te, err, errParseConfig) + } + + client := &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: tlsCfg, + }, + Timeout: j.ResponseTimeout.Duration, + } + + j.instance, err = gojenkins.CreateJenkins(client, j.URL, j.Username, j.Password).Init() + if err != nil { + return wrapErr(te, err, errConnectJenkins) + } + _, err = j.instance.Init() + if err != nil { + return wrapErr(te, err, errConnectJenkins) + } + // init job filter + j.jobFilter = make(map[string]bool) + for _, name := range j.JobExclude { + j.jobFilter[name] = false + } + + // init node filter + j.nodeFilter = make(map[string]bool) + for _, name := range j.NodeExclude { + j.nodeFilter[name] = false + } + + // init tcp pool with default value + if j.MaxTCPConcurrentConnections <= 0 { + j.MaxTCPConcurrentConnections = 30 + } + + // default sub jobs can be acquired + if j.NewestSubJobsEachLayer <= 0 { + j.NewestSubJobsEachLayer = 10 + } + + return nil +} + +func (j *Jenkins) gatherNodeData(node *gojenkins.Node, te typedErr, fields map[string]interface{}, tags map[string]string) *typedErr { + tags["node_name"] = node.Raw.DisplayName + var ok bool + if _, ok = j.nodeFilter[tags["node_name"]]; ok { + (&te).level = continueLevel + return &te + } + + info := node.Raw + if info.MonitorData.Hudson_NodeMonitors_ArchitectureMonitor == nil { + return wrapErr(te, fmt.Errorf("check your permission"), errEmptyMonitorData) + } + tags["arch"], ok = info.MonitorData.Hudson_NodeMonitors_ArchitectureMonitor.(string) + if !ok { + return badFormatErr(te, info.MonitorData.Hudson_NodeMonitors_ArchitectureMonitor, "string", "hudson.node_monitors.ArchitectureMonitor") + } + + tags["status"] = "online" + if node.Raw.Offline { + tags["status"] = "offline" + } + + fields["response_time"] = info.MonitorData.Hudson_NodeMonitors_ResponseTimeMonitor.Average + if diskSpaceMonitor := info.MonitorData.Hudson_NodeMonitors_DiskSpaceMonitor; diskSpaceMonitor != nil { + diskSpaceMonitorRoute := "hudson.node_monitors.DiskSpaceMonitor" + var diskSpace map[string]interface{} + if diskSpace, ok = diskSpaceMonitor.(map[string]interface{}); !ok { + return badFormatErr(te, diskSpaceMonitor, "map[string]interface{}", diskSpaceMonitorRoute) + } + if tags["disk_path"], ok = diskSpace["path"].(string); !ok { + return badFormatErr(te, diskSpace["path"], "string", diskSpaceMonitorRoute+".path") + } + if fields["disk_available"], ok = diskSpace["size"].(float64); !ok { + return badFormatErr(te, diskSpace["size"], "float64", diskSpaceMonitorRoute+".size") + } + } + + if tempSpaceMonitor := info.MonitorData.Hudson_NodeMonitors_TemporarySpaceMonitor; tempSpaceMonitor != nil { + tempSpaceMonitorRoute := "hudson.node_monitors.TemporarySpaceMonitor" + var tempSpace map[string]interface{} + if tempSpace, ok = tempSpaceMonitor.(map[string]interface{}); !ok { + return badFormatErr(te, tempSpaceMonitor, "map[string]interface{}", tempSpaceMonitorRoute) + } + if tags["temp_path"], ok = tempSpace["path"].(string); !ok { + return badFormatErr(te, tempSpace["path"], "string", tempSpaceMonitorRoute+".path") + } + if fields["temp_available"], ok = tempSpace["size"].(float64); !ok { + return badFormatErr(te, tempSpace["size"], "float64", tempSpaceMonitorRoute+".size") + } + } + + if swapSpaceMonitor := info.MonitorData.Hudson_NodeMonitors_SwapSpaceMonitor; swapSpaceMonitor != nil { + swapSpaceMonitorRouter := "hudson.node_monitors.SwapSpaceMonitor" + var swapSpace map[string]interface{} + if swapSpace, ok = swapSpaceMonitor.(map[string]interface{}); !ok { + return badFormatErr(te, swapSpaceMonitor, "map[string]interface{}", swapSpaceMonitorRouter) + } + if fields["swap_available"], ok = swapSpace["availableSwapSpace"].(float64); !ok { + return badFormatErr(te, swapSpace["availableSwapSpace"], "float64", swapSpaceMonitorRouter+".availableSwapSpace") + } + if fields["swap_total"], ok = swapSpace["totalSwapSpace"].(float64); !ok { + return badFormatErr(te, swapSpace["totalSwapSpace"], "float64", swapSpaceMonitorRouter+".totalSwapSpace") + } + if fields["memory_available"], ok = swapSpace["availablePhysicalMemory"].(float64); !ok { + return badFormatErr(te, swapSpace["availablePhysicalMemory"], "float64", swapSpaceMonitorRouter+".availablePhysicalMemory") + } + if fields["memory_total"], ok = swapSpace["totalPhysicalMemory"].(float64); !ok { + return badFormatErr(te, swapSpace["totalPhysicalMemory"], "float64", swapSpaceMonitorRouter+".totalPhysicalMemory") + } + } + return nil +} + +func (j *Jenkins) gatherNodesData(nodes []*gojenkins.Node, acc telegraf.Accumulator) { + + tags := map[string]string{} + fields := make(map[string]interface{}) + baseTe := typedErr{ + url: j.URL + "/computer/api/json", + } + + // get node data + for _, node := range nodes { + te := j.gatherNodeData(node, baseTe, fields, tags) + if te == nil { + acc.AddFields(measurementNode, fields, tags) + continue + } + switch te.level { + case continueLevel: + continue + default: + acc.AddError(te) + } + } +} + +func (j *Jenkins) gatherJobs(jobNames []gojenkins.InnerJob, acc telegraf.Accumulator) { + jobsC := make(chan srcJob, j.MaxTCPConcurrentConnections) + errC := make(chan *typedErr) + var wg sync.WaitGroup + for _, job := range jobNames { + wg.Add(1) + go func(job gojenkins.InnerJob) { + jobsC <- srcJob{ + name: job.Name, + parents: []string{}, + layer: 0, + } + }(job) + } + + for i := 0; i < j.MaxTCPConcurrentConnections; i++ { + go j.getJobDetail(jobsC, errC, &wg, acc) + } + + go func() { + wg.Wait() + close(errC) + }() + + select { + case te := <-errC: + if te != nil { + acc.AddError(te) + } + } +} + +func (j *Jenkins) getJobDetail(jobsC chan srcJob, errC chan<- *typedErr, wg *sync.WaitGroup, acc telegraf.Accumulator) { + for sj := range jobsC { + if j.MaxSubJobsLayer > 0 && sj.layer == j.MaxSubJobsLayer { + wg.Done() + continue + } + // exclude filter + if _, ok := j.jobFilter[sj.name]; ok { + wg.Done() + continue + } + te := &typedErr{ + url: j.URL + "/job/" + strings.Join(sj.combined(), "/job/") + "/api/json", + } + jobDetail, err := j.instance.GetJob(sj.name, sj.parents...) + if err != nil { + go func(te typedErr, err error) { + errC <- wrapErr(te, err, errRetrieveInnerJobs) + }(*te, err) + return + } + + for k, innerJob := range jobDetail.Raw.Jobs { + if k < len(jobDetail.Raw.Jobs)-j.NewestSubJobsEachLayer-1 { + continue + } + wg.Add(1) + // schedule tcp fetch for inner jobs + go func(innerJob gojenkins.InnerJob, sj srcJob) { + jobsC <- srcJob{ + name: innerJob.Name, + parents: sj.combined(), + layer: sj.layer + 1, + } + }(innerJob, sj) + } + + // collect build info + number := jobDetail.Raw.LastBuild.Number + if number < 1 { + // no build info + wg.Done() + continue + } + baseURL := "/job/" + strings.Join(sj.combined(), "/job/") + "/" + strconv.Itoa(int(number)) + // jobDetail.GetBuild is not working, doing poll directly + build := &gojenkins.Build{ + Jenkins: j.instance, + Depth: 1, + Base: baseURL, + Raw: new(gojenkins.BuildResponse), + } + status, err := build.Poll() + if err != nil || status != 200 { + if err == nil && status != 200 { + err = fmt.Errorf("status code %d", status) + } + te.url = j.URL + baseURL + "/api/json" + go func(te typedErr, err error) { + errC <- wrapErr(te, err, errRetrieveLatestBuild) + }(*te, err) + return + } + + if build.Raw.Building { + log.Printf("D! Ignore running build on %s, build %v", sj.name, number) + wg.Done() + continue + } + + // stop if build is too old + + if (j.MaxBuildAge != internal.Duration{Duration: 0}) { + buildSecAgo := time.Now().Sub(build.GetTimestamp()).Seconds() + if time.Now().Sub(build.GetTimestamp()).Seconds() > j.MaxBuildAge.Duration.Seconds() { + log.Printf("D! Job %s build %v too old (%v seconds ago), skipping to next job", sj.name, number, buildSecAgo) + wg.Done() + continue + } + } + + gatherJobBuild(sj, build, acc) + wg.Done() + + } +} + +type srcJob struct { + name string + parents []string + layer int +} + +func (sj srcJob) combined() []string { + return append(sj.parents, sj.name) +} + +func (sj srcJob) hierarchyName() string { + return strings.Join(sj.combined(), "/") +} + +func gatherJobBuild(sj srcJob, build *gojenkins.Build, acc telegraf.Accumulator) { + tags := map[string]string{"job_name": sj.hierarchyName(), "result": build.GetResult()} + fields := make(map[string]interface{}) + fields["duration"] = build.GetDuration() + fields["result_code"] = mapResultCode(build.GetResult()) + + acc.AddFields(measurementJob, fields, tags, build.GetTimestamp()) +} + +// perform status mapping +func mapResultCode(s string) int { + switch strings.ToLower(s) { + case "success": + return 0 + case "failure": + return 1 + case "not_built": + return 2 + case "unstable": + return 3 + case "aborted": + return 4 + } + return -1 +} + +func init() { + inputs.Add("jenkins", func() telegraf.Input { + return &Jenkins{} + }) +} diff --git a/plugins/inputs/jenkins/jenkins_test.go b/plugins/inputs/jenkins/jenkins_test.go new file mode 100644 index 000000000..35bad3a13 --- /dev/null +++ b/plugins/inputs/jenkins/jenkins_test.go @@ -0,0 +1,92 @@ +package jenkins + +import ( + "errors" + "testing" +) + +func TestErr(t *testing.T) { + tests := []struct { + err *typedErr + output string + }{ + { + nil, + "", + }, + { + &typedErr{ + reference: errConnectJenkins, + url: "http://badurl.com", + err: errors.New("unknown error"), + }, + "error connect jenkins instance[http://badurl.com]: unknown error", + }, + { + wrapErr(typedErr{ + reference: errConnectJenkins, + url: "http://badurl.com", + err: errors.New("unknown error"), + }, errors.New("2"), errEmptyMonitorData), + "error empty monitor data[http://badurl.com]: 2", + }, + { + badFormatErr(typedErr{ + reference: errConnectJenkins, + url: "http://badurl.com", + err: errors.New("unknown error"), + }, "20", "float64", "arch"), + "error bad format[http://badurl.com]: fieldName: arch, want float64, got string", + }, + } + for _, test := range tests { + output := test.err.Error() + if output != test.output { + t.Errorf("Expected %s, got %s\n", test.output, output) + } + } +} + +func TestSrcJob(t *testing.T) { + tests := []struct { + input srcJob + output string + }{ + { + srcJob{}, + "", + }, + { + srcJob{ + name: "1", + parents: []string{"3", "2"}, + }, + "3/2/1", + }, + } + for _, test := range tests { + output := test.input.hierarchyName() + if output != test.output { + t.Errorf("Expected %s, got %s\n", test.output, output) + } + } +} + +func TestResultCode(t *testing.T) { + tests := []struct { + input string + output int + }{ + {"SUCCESS", 0}, + {"Failure", 1}, + {"NOT_BUILT", 2}, + {"UNSTABLE", 3}, + {"ABORTED", 4}, + } + for _, test := range tests { + output := mapResultCode(test.input) + if output != test.output { + t.Errorf("Expected %d, got %d\n", test.output, output) + } + } +} From 73eaa057d170f0e8f6dc89b4b24af056c4777dac Mon Sep 17 00:00:00 2001 From: Kelvin Wang Date: Fri, 22 Jun 2018 09:20:26 -0700 Subject: [PATCH 2/6] add requested changes --- plugins/inputs/jenkins/README.md | 56 +-- plugins/inputs/jenkins/jenkins.go | 431 ++++++++-------- plugins/inputs/jenkins/jenkins_test.go | 652 ++++++++++++++++++++++++- 3 files changed, 864 insertions(+), 275 deletions(-) diff --git a/plugins/inputs/jenkins/README.md b/plugins/inputs/jenkins/README.md index d1362b7ed..efd1c26cb 100644 --- a/plugins/inputs/jenkins/README.md +++ b/plugins/inputs/jenkins/README.md @@ -7,38 +7,38 @@ This plugin does not require a plugin on jenkins and it makes use of Jenkins API ### Configuration: ```toml - url = "http://my-jenkins-instance:8080" - username = "admin" - password = "admin" - ## Set response_timeout - response_timeout = "5s" +url = "http://my-jenkins-instance:8080" +# username = "admin" +# password = "admin" +## Set response_timeout +response_timeout = "5s" - ## Optional SSL Config - # ssl_ca = /path/to/cafile - # ssl_cert = /path/to/certfile - # ssl_key = /path/to/keyfile - ## Use SSL but skip chain & host verification - # insecure_skip_verify = false +## Optional SSL Config +# ssl_ca = /path/to/cafile +# ssl_cert = /path/to/certfile +# ssl_key = /path/to/keyfile +## Use SSL but skip chain & host verification +# insecure_skip_verify = false - ## Job & build filter - # max_build_age = "1h" - ## jenkins can have unlimited layer of sub jobs - ## this config will limit the layers of pull, default value 0 means - ## unlimited pulling until no more sub jobs - # max_sub_jobs_layer = 0 - ## in workflow-multibranch-plugin, each branch will be created as a sub job - ## this config will limit to call only the lasted branches - ## sub jobs fetch in each layer - # empty will use default value 10 - # newest_sub_jobs_each_layer = 10 - # job_exclude = [ "MyJob", "MyOtherJob" ] +## Job & build filter +# max_build_age = "1h" +## jenkins can have unlimited layer of sub jobs +## this config will limit the layers of pull, default value 0 means +## unlimited pulling until no more sub jobs +# max_subjob_depth = 0 +## in workflow-multibranch-plugin, each branch will be created as a sub job +## this config will limit to call only the lasted branches +## sub jobs fetch in each layer +# empty will use default value 10 +# max_subjob_per_layer = 10 +# job_exclude = [ "job1", "job2/subjob1/subjob2", "job3/*"] - ## Node filter - # node_exlude = [ "node1", "node2" ] +## Node filter +# node_exclude = [ "node1", "node2" ] - ## Woker pool for jenkins plugin only - # empty this field will use default value 30 - # max_tcp_concurrent_connections = 30 +## Woker pool for jenkins plugin only +# empty this field will use default value 30 +# max_connections = 30 ``` ### Measurements & Fields: diff --git a/plugins/inputs/jenkins/jenkins.go b/plugins/inputs/jenkins/jenkins.go index a93810ff3..db6e0f10c 100644 --- a/plugins/inputs/jenkins/jenkins.go +++ b/plugins/inputs/jenkins/jenkins.go @@ -4,13 +4,13 @@ import ( "fmt" "log" "net/http" - "reflect" "strconv" "strings" "sync" "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/filter" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal/tls" "github.com/influxdata/telegraf/plugins/inputs" @@ -28,52 +28,52 @@ type Jenkins struct { tls.ClientConfig instance *gojenkins.Jenkins - MaxTCPConcurrentConnections int `toml:"max_tcp_concurrent_connections"` - MaxBuildAge internal.Duration `toml:"max_build_age"` - MaxSubJobsLayer int `toml:"max_sub_jobs_layer"` - NewestSubJobsEachLayer int `toml:"newest_sub_jobs_each_layer"` - JobExclude []string `toml:"job_exclude"` - jobFilter map[string]bool + MaxConnections int `toml:"max_connections"` + MaxBuildAge internal.Duration `toml:"max_build_age"` + MaxSubJobDepth int `toml:"max_subjob_depth"` + MaxSubJobPerLayer int `toml:"max_subjob_per_layer"` + JobExclude []string `toml:"job_exclude"` + jobFilter filter.Filter NodeExclude []string `toml:"node_exclude"` - nodeFilter map[string]bool + nodeFilter filter.Filter } type byBuildNumber []gojenkins.JobBuild const sampleConfig = ` url = "http://my-jenkins-instance:8080" -username = "admin" -password = "admin" +# username = "admin" +# password = "admin" ## Set response_timeout response_timeout = "5s" ## Optional SSL Config -# ssl_ca = /path/to/cafile -# ssl_cert = /path/to/certfile -# ssl_key = /path/to/keyfile +# ssl_ca = /path/to/cafile +# ssl_cert = /path/to/certfile +# ssl_key = /path/to/keyfile ## Use SSL but skip chain & host verification -# insecure_skip_verify = false +# insecure_skip_verify = false ## Job & build filter -# max_build_age = "1h" +# max_build_age = "1h" ## jenkins can have unlimited layer of sub jobs ## this config will limit the layers of pull, default value 0 means ## unlimited pulling until no more sub jobs -# max_sub_jobs_layer = 0 +# max_subjob_depth = 0 ## in workflow-multibranch-plugin, each branch will be created as a sub job ## this config will limit to call only the lasted branches ## sub jobs fetch in each layer -# empty will use default value 10 -# newest_sub_jobs_each_layer = 10 -# job_exclude = [ "MyJob", "MyOtherJob" ] +# empty will use default value 10 +# max_subjob_per_layer = 10 +# job_exclude = [ "job1", "job2/subjob1/subjob2", "job3/*"] ## Node filter -# node_exlude = [ "node1", "node2" ] +# node_exclude = [ "node1", "node2" ] ## Woker pool for jenkins plugin only -# empty this field will use default value 30 -# max_tcp_concurrent_connections = 30 +# empty this field will use default value 30 +# max_connections = 30 ` // measurement @@ -82,53 +82,46 @@ const ( measurementJob = "jenkins_job" ) -type typedErr struct { - level int +// Error base type of error. +type Error struct { err error reference string url string } -// const of the error level, default 0 to be the errLevel -const ( - errLevel int = iota - continueLevel - infoLevel -) - -func wrapErr(e typedErr, err error, ref string) *typedErr { - return &typedErr{ - level: e.level, +func newError(err error, ref, url string) *Error { + return &Error{ err: err, reference: ref, - url: e.url, + url: url, } } -func (e *typedErr) Error() string { +func (e *Error) Error() string { if e == nil { return "" } - return fmt.Sprintf("error "+e.reference+"[%s]: %v", e.url, e.err) + return fmt.Sprintf("error %s[%s]: %v", e.reference, e.url, e.err) } -func badFormatErr(te typedErr, field interface{}, want string, fieldName string) *typedErr { - return &typedErr{ - level: te.level, - err: fmt.Errorf("fieldName: %s, want %s, got %s", fieldName, want, reflect.TypeOf(field).String()), +func badFormatErr(url string, field interface{}, want string, fieldName string) *Error { + return &Error{ + err: fmt.Errorf("fieldName: %s, want %s, got %T", fieldName, want, field), reference: errBadFormat, - url: te.url, + url: url, } } // err references const ( errParseConfig = "parse jenkins config" + errJobFilterCompile = "compile job filters" + errNodeFilterCompile = "compile node filters" errConnectJenkins = "connect jenkins instance" errInitJenkins = "init jenkins instance" errRetrieveNode = "retrieving nodes" errRetrieveJobs = "retrieving jobs" - errReadNodeInfo = "reading node info" + errEmptyNodeName = "empty node name" errEmptyMonitorData = "empty monitor data" errBadFormat = "bad format" errRetrieveInnerJobs = "retrieving inner jobs" @@ -147,113 +140,111 @@ func (j *Jenkins) Description() string { // Gather implements telegraf.Input interface func (j *Jenkins) Gather(acc telegraf.Accumulator) error { - var err error - te := typedErr{ - url: j.URL, - } if j.instance == nil { - if tErr := j.initJenkins(te); tErr != nil { - return tErr + client, te := j.initClient() + if te != nil { + return te + } + if te = j.newInstance(client); te != nil { + return te } } - nodes, err := j.instance.GetAllNodes() - if err != nil { - return wrapErr(te, err, errRetrieveNode) - } - - jobs, err := j.instance.GetAllJobNames() - if err != nil { - return wrapErr(te, err, errRetrieveJobs) - } - - j.gatherNodesData(nodes, acc) - j.gatherJobs(jobs, acc) + j.gatherNodesData(acc) + j.gatherJobs(acc) return nil } -func (j *Jenkins) initJenkins(te typedErr) *typedErr { - // create instance +func (j *Jenkins) initClient() (*http.Client, *Error) { tlsCfg, err := j.ClientConfig.TLSConfig() if err != nil { - return wrapErr(te, err, errParseConfig) + return nil, newError(err, errParseConfig, j.URL) } - - client := &http.Client{ + return &http.Client{ Transport: &http.Transport{ TLSClientConfig: tlsCfg, }, Timeout: j.ResponseTimeout.Duration, - } + }, nil +} +// seperate the client as dependency to use httptest Client for mocking +func (j *Jenkins) newInstance(client *http.Client) *Error { + // create instance + var err error j.instance, err = gojenkins.CreateJenkins(client, j.URL, j.Username, j.Password).Init() if err != nil { - return wrapErr(te, err, errConnectJenkins) - } - _, err = j.instance.Init() - if err != nil { - return wrapErr(te, err, errConnectJenkins) + return newError(err, errConnectJenkins, j.URL) } + // init job filter - j.jobFilter = make(map[string]bool) - for _, name := range j.JobExclude { - j.jobFilter[name] = false + j.jobFilter, err = filter.Compile(j.JobExclude) + if err != nil { + return newError(err, errJobFilterCompile, j.URL) } // init node filter - j.nodeFilter = make(map[string]bool) - for _, name := range j.NodeExclude { - j.nodeFilter[name] = false + j.nodeFilter, err = filter.Compile(j.NodeExclude) + if err != nil { + return newError(err, errNodeFilterCompile, j.URL) } // init tcp pool with default value - if j.MaxTCPConcurrentConnections <= 0 { - j.MaxTCPConcurrentConnections = 30 + if j.MaxConnections <= 0 { + j.MaxConnections = 30 } // default sub jobs can be acquired - if j.NewestSubJobsEachLayer <= 0 { - j.NewestSubJobsEachLayer = 10 + if j.MaxSubJobPerLayer <= 0 { + j.MaxSubJobPerLayer = 10 } return nil } -func (j *Jenkins) gatherNodeData(node *gojenkins.Node, te typedErr, fields map[string]interface{}, tags map[string]string) *typedErr { - tags["node_name"] = node.Raw.DisplayName - var ok bool - if _, ok = j.nodeFilter[tags["node_name"]]; ok { - (&te).level = continueLevel - return &te - } +func (j *Jenkins) gatherNodeData(node *gojenkins.Node, url string, acc telegraf.Accumulator) *Error { + tags := map[string]string{} + fields := make(map[string]interface{}) info := node.Raw + + // detect the parsing error, since gojenkins lib won't do it + if info == nil || info.DisplayName == "" { + return newError(nil, errEmptyNodeName, url) + } + + tags["node_name"] = info.DisplayName + var ok bool + // filter out excluded node_name + if j.nodeFilter != nil && j.nodeFilter.Match(tags["node_name"]) { + return nil + } + if info.MonitorData.Hudson_NodeMonitors_ArchitectureMonitor == nil { - return wrapErr(te, fmt.Errorf("check your permission"), errEmptyMonitorData) + return newError(fmt.Errorf("maybe check your permission"), errEmptyMonitorData, url) } tags["arch"], ok = info.MonitorData.Hudson_NodeMonitors_ArchitectureMonitor.(string) if !ok { - return badFormatErr(te, info.MonitorData.Hudson_NodeMonitors_ArchitectureMonitor, "string", "hudson.node_monitors.ArchitectureMonitor") + return badFormatErr(url, info.MonitorData.Hudson_NodeMonitors_ArchitectureMonitor, "string", "hudson.node_monitors.ArchitectureMonitor") } tags["status"] = "online" - if node.Raw.Offline { + if info.Offline { tags["status"] = "offline" } - fields["response_time"] = info.MonitorData.Hudson_NodeMonitors_ResponseTimeMonitor.Average if diskSpaceMonitor := info.MonitorData.Hudson_NodeMonitors_DiskSpaceMonitor; diskSpaceMonitor != nil { diskSpaceMonitorRoute := "hudson.node_monitors.DiskSpaceMonitor" var diskSpace map[string]interface{} if diskSpace, ok = diskSpaceMonitor.(map[string]interface{}); !ok { - return badFormatErr(te, diskSpaceMonitor, "map[string]interface{}", diskSpaceMonitorRoute) + return badFormatErr(url, diskSpaceMonitor, "map[string]interface{}", diskSpaceMonitorRoute) } if tags["disk_path"], ok = diskSpace["path"].(string); !ok { - return badFormatErr(te, diskSpace["path"], "string", diskSpaceMonitorRoute+".path") + return badFormatErr(url, diskSpace["path"], "string", diskSpaceMonitorRoute+".path") } if fields["disk_available"], ok = diskSpace["size"].(float64); !ok { - return badFormatErr(te, diskSpace["size"], "float64", diskSpaceMonitorRoute+".size") + return badFormatErr(url, diskSpace["size"], "float64", diskSpaceMonitorRoute+".size") } } @@ -261,13 +252,13 @@ func (j *Jenkins) gatherNodeData(node *gojenkins.Node, te typedErr, fields map[s tempSpaceMonitorRoute := "hudson.node_monitors.TemporarySpaceMonitor" var tempSpace map[string]interface{} if tempSpace, ok = tempSpaceMonitor.(map[string]interface{}); !ok { - return badFormatErr(te, tempSpaceMonitor, "map[string]interface{}", tempSpaceMonitorRoute) + return badFormatErr(url, tempSpaceMonitor, "map[string]interface{}", tempSpaceMonitorRoute) } if tags["temp_path"], ok = tempSpace["path"].(string); !ok { - return badFormatErr(te, tempSpace["path"], "string", tempSpaceMonitorRoute+".path") + return badFormatErr(url, tempSpace["path"], "string", tempSpaceMonitorRoute+".path") } if fields["temp_available"], ok = tempSpace["size"].(float64); !ok { - return badFormatErr(te, tempSpace["size"], "float64", tempSpaceMonitorRoute+".size") + return badFormatErr(url, tempSpace["size"], "float64", tempSpaceMonitorRoute+".size") } } @@ -275,182 +266,162 @@ func (j *Jenkins) gatherNodeData(node *gojenkins.Node, te typedErr, fields map[s swapSpaceMonitorRouter := "hudson.node_monitors.SwapSpaceMonitor" var swapSpace map[string]interface{} if swapSpace, ok = swapSpaceMonitor.(map[string]interface{}); !ok { - return badFormatErr(te, swapSpaceMonitor, "map[string]interface{}", swapSpaceMonitorRouter) + return badFormatErr(url, swapSpaceMonitor, "map[string]interface{}", swapSpaceMonitorRouter) } if fields["swap_available"], ok = swapSpace["availableSwapSpace"].(float64); !ok { - return badFormatErr(te, swapSpace["availableSwapSpace"], "float64", swapSpaceMonitorRouter+".availableSwapSpace") + return badFormatErr(url, swapSpace["availableSwapSpace"], "float64", swapSpaceMonitorRouter+".availableSwapSpace") } if fields["swap_total"], ok = swapSpace["totalSwapSpace"].(float64); !ok { - return badFormatErr(te, swapSpace["totalSwapSpace"], "float64", swapSpaceMonitorRouter+".totalSwapSpace") + return badFormatErr(url, swapSpace["totalSwapSpace"], "float64", swapSpaceMonitorRouter+".totalSwapSpace") } if fields["memory_available"], ok = swapSpace["availablePhysicalMemory"].(float64); !ok { - return badFormatErr(te, swapSpace["availablePhysicalMemory"], "float64", swapSpaceMonitorRouter+".availablePhysicalMemory") + return badFormatErr(url, swapSpace["availablePhysicalMemory"], "float64", swapSpaceMonitorRouter+".availablePhysicalMemory") } if fields["memory_total"], ok = swapSpace["totalPhysicalMemory"].(float64); !ok { - return badFormatErr(te, swapSpace["totalPhysicalMemory"], "float64", swapSpaceMonitorRouter+".totalPhysicalMemory") + return badFormatErr(url, swapSpace["totalPhysicalMemory"], "float64", swapSpaceMonitorRouter+".totalPhysicalMemory") } } + acc.AddFields(measurementNode, fields, tags) + return nil } -func (j *Jenkins) gatherNodesData(nodes []*gojenkins.Node, acc telegraf.Accumulator) { - - tags := map[string]string{} - fields := make(map[string]interface{}) - baseTe := typedErr{ - url: j.URL + "/computer/api/json", +func (j *Jenkins) gatherNodesData(acc telegraf.Accumulator) { + nodes, err := j.instance.GetAllNodes() + url := j.URL + "/computer/api/json" + // since gojenkins lib will never return error + // returns error for len(nodes) is 0 + if err != nil || len(nodes) == 0 { + acc.AddError(newError(err, errRetrieveNode, url)) + return } - // get node data for _, node := range nodes { - te := j.gatherNodeData(node, baseTe, fields, tags) + te := j.gatherNodeData(node, url, acc) if te == nil { - acc.AddFields(measurementNode, fields, tags) continue } - switch te.level { - case continueLevel: - continue - default: - acc.AddError(te) - } + acc.AddError(te) } } -func (j *Jenkins) gatherJobs(jobNames []gojenkins.InnerJob, acc telegraf.Accumulator) { - jobsC := make(chan srcJob, j.MaxTCPConcurrentConnections) - errC := make(chan *typedErr) +func (j *Jenkins) gatherJobs(acc telegraf.Accumulator) { + jobs, err := j.instance.GetAllJobNames() + if err != nil { + acc.AddError(newError(err, errRetrieveJobs, j.URL)) + return + } + jobsC := make(chan jobRequest, j.MaxConnections) var wg sync.WaitGroup - for _, job := range jobNames { + for _, job := range jobs { wg.Add(1) - go func(job gojenkins.InnerJob) { - jobsC <- srcJob{ - name: job.Name, + go func(name string) { + jobsC <- jobRequest{ + name: name, parents: []string{}, layer: 0, } - }(job) + }(job.Name) } - for i := 0; i < j.MaxTCPConcurrentConnections; i++ { - go j.getJobDetail(jobsC, errC, &wg, acc) - } - - go func() { - wg.Wait() - close(errC) - }() - - select { - case te := <-errC: - if te != nil { - acc.AddError(te) - } - } -} - -func (j *Jenkins) getJobDetail(jobsC chan srcJob, errC chan<- *typedErr, wg *sync.WaitGroup, acc telegraf.Accumulator) { - for sj := range jobsC { - if j.MaxSubJobsLayer > 0 && sj.layer == j.MaxSubJobsLayer { - wg.Done() - continue - } - // exclude filter - if _, ok := j.jobFilter[sj.name]; ok { - wg.Done() - continue - } - te := &typedErr{ - url: j.URL + "/job/" + strings.Join(sj.combined(), "/job/") + "/api/json", - } - jobDetail, err := j.instance.GetJob(sj.name, sj.parents...) - if err != nil { - go func(te typedErr, err error) { - errC <- wrapErr(te, err, errRetrieveInnerJobs) - }(*te, err) - return - } - - for k, innerJob := range jobDetail.Raw.Jobs { - if k < len(jobDetail.Raw.Jobs)-j.NewestSubJobsEachLayer-1 { - continue - } - wg.Add(1) - // schedule tcp fetch for inner jobs - go func(innerJob gojenkins.InnerJob, sj srcJob) { - jobsC <- srcJob{ - name: innerJob.Name, - parents: sj.combined(), - layer: sj.layer + 1, + for i := 0; i < j.MaxConnections; i++ { + go func(jobsC chan jobRequest, acc telegraf.Accumulator, wg *sync.WaitGroup) { + for sj := range jobsC { + if te := j.getJobDetail(sj, jobsC, wg, acc); te != nil { + acc.AddError(te) } - }(innerJob, sj) - } - - // collect build info - number := jobDetail.Raw.LastBuild.Number - if number < 1 { - // no build info - wg.Done() - continue - } - baseURL := "/job/" + strings.Join(sj.combined(), "/job/") + "/" + strconv.Itoa(int(number)) - // jobDetail.GetBuild is not working, doing poll directly - build := &gojenkins.Build{ - Jenkins: j.instance, - Depth: 1, - Base: baseURL, - Raw: new(gojenkins.BuildResponse), - } - status, err := build.Poll() - if err != nil || status != 200 { - if err == nil && status != 200 { - err = fmt.Errorf("status code %d", status) } - te.url = j.URL + baseURL + "/api/json" - go func(te typedErr, err error) { - errC <- wrapErr(te, err, errRetrieveLatestBuild) - }(*te, err) - return - } - - if build.Raw.Building { - log.Printf("D! Ignore running build on %s, build %v", sj.name, number) - wg.Done() - continue - } - - // stop if build is too old - - if (j.MaxBuildAge != internal.Duration{Duration: 0}) { - buildSecAgo := time.Now().Sub(build.GetTimestamp()).Seconds() - if time.Now().Sub(build.GetTimestamp()).Seconds() > j.MaxBuildAge.Duration.Seconds() { - log.Printf("D! Job %s build %v too old (%v seconds ago), skipping to next job", sj.name, number, buildSecAgo) - wg.Done() - continue - } - } - - gatherJobBuild(sj, build, acc) - wg.Done() - + }(jobsC, acc, &wg) } + wg.Wait() } -type srcJob struct { +func (j *Jenkins) getJobDetail(sj jobRequest, jobsC chan<- jobRequest, wg *sync.WaitGroup, acc telegraf.Accumulator) *Error { + defer wg.Done() + if j.MaxSubJobDepth > 0 && sj.layer == j.MaxSubJobDepth { + return nil + } + // filter out excluded job. + if j.jobFilter != nil && j.jobFilter.Match(sj.hierarchyName()) { + return nil + } + url := j.URL + "/job/" + strings.Join(sj.combined(), "/job/") + "/api/json" + jobDetail, err := j.instance.GetJob(sj.name, sj.parents...) + if err != nil { + return newError(err, errRetrieveInnerJobs, url) + } + + for k, innerJob := range jobDetail.Raw.Jobs { + if k < len(jobDetail.Raw.Jobs)-j.MaxSubJobPerLayer-1 { + continue + } + wg.Add(1) + // schedule tcp fetch for inner jobs + go func(innerJob gojenkins.InnerJob, sj jobRequest) { + jobsC <- jobRequest{ + name: innerJob.Name, + parents: sj.combined(), + layer: sj.layer + 1, + } + }(innerJob, sj) + } + + // collect build info + number := jobDetail.Raw.LastBuild.Number + if number < 1 { + // no build info + return nil + } + baseURL := "/job/" + strings.Join(sj.combined(), "/job/") + "/" + strconv.Itoa(int(number)) + // jobDetail.GetBuild is not working, doing poll directly + build := &gojenkins.Build{ + Jenkins: j.instance, + Depth: 1, + Base: baseURL, + Raw: new(gojenkins.BuildResponse), + } + status, err := build.Poll() + if err != nil || status != 200 { + if err == nil && status != 200 { + err = fmt.Errorf("status code %d", status) + } + return newError(err, errRetrieveLatestBuild, j.URL+baseURL+"/api/json") + } + + if build.Raw.Building { + log.Printf("D! Ignore running build on %s, build %v", sj.name, number) + return nil + } + + // stop if build is too old + + if (j.MaxBuildAge != internal.Duration{Duration: 0}) { + buildAgo := time.Now().Sub(build.GetTimestamp()) + if buildAgo.Seconds() > j.MaxBuildAge.Duration.Seconds() { + log.Printf("D! Job %s build %v too old (%s ago), skipping to next job", sj.name, number, buildAgo) + return nil + } + } + + gatherJobBuild(sj, build, acc) + return nil +} + +type jobRequest struct { name string parents []string layer int } -func (sj srcJob) combined() []string { +func (sj jobRequest) combined() []string { return append(sj.parents, sj.name) } -func (sj srcJob) hierarchyName() string { +func (sj jobRequest) hierarchyName() string { return strings.Join(sj.combined(), "/") } -func gatherJobBuild(sj srcJob, build *gojenkins.Build, acc telegraf.Accumulator) { +func gatherJobBuild(sj jobRequest, build *gojenkins.Build, acc telegraf.Accumulator) { tags := map[string]string{"job_name": sj.hierarchyName(), "result": build.GetResult()} fields := make(map[string]interface{}) fields["duration"] = build.GetDuration() diff --git a/plugins/inputs/jenkins/jenkins_test.go b/plugins/inputs/jenkins/jenkins_test.go index 35bad3a13..70b350a90 100644 --- a/plugins/inputs/jenkins/jenkins_test.go +++ b/plugins/inputs/jenkins/jenkins_test.go @@ -1,13 +1,23 @@ package jenkins import ( + "encoding/json" "errors" + "net/http" + "net/http/httptest" + "sort" + "strings" "testing" + "time" + + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/testutil" + "github.com/kelwang/gojenkins" ) func TestErr(t *testing.T) { tests := []struct { - err *typedErr + err *Error output string }{ { @@ -15,7 +25,7 @@ func TestErr(t *testing.T) { "", }, { - &typedErr{ + &Error{ reference: errConnectJenkins, url: "http://badurl.com", err: errors.New("unknown error"), @@ -23,20 +33,12 @@ func TestErr(t *testing.T) { "error connect jenkins instance[http://badurl.com]: unknown error", }, { - wrapErr(typedErr{ - reference: errConnectJenkins, - url: "http://badurl.com", - err: errors.New("unknown error"), - }, errors.New("2"), errEmptyMonitorData), + newError(errors.New("2"), errEmptyMonitorData, "http://badurl.com"), "error empty monitor data[http://badurl.com]: 2", }, { - badFormatErr(typedErr{ - reference: errConnectJenkins, - url: "http://badurl.com", - err: errors.New("unknown error"), - }, "20", "float64", "arch"), - "error bad format[http://badurl.com]: fieldName: arch, want float64, got string", + badFormatErr("http://badurl.com", 20.12, "string", "arch"), + "error bad format[http://badurl.com]: fieldName: arch, want string, got float64", }, } for _, test := range tests { @@ -47,17 +49,17 @@ func TestErr(t *testing.T) { } } -func TestSrcJob(t *testing.T) { +func TestJobRequest(t *testing.T) { tests := []struct { - input srcJob + input jobRequest output string }{ { - srcJob{}, + jobRequest{}, "", }, { - srcJob{ + jobRequest{ name: "1", parents: []string{"3", "2"}, }, @@ -90,3 +92,619 @@ func TestResultCode(t *testing.T) { } } } + +type mockHandler struct { + // responseMap is the path to repsonse interface + // we will ouput the serialized response in json when serving http + // example '/computer/api/json': *gojenkins. + responseMap map[string]interface{} +} + +func (h mockHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + o, ok := h.responseMap[r.URL.Path] + if !ok { + w.WriteHeader(http.StatusNotFound) + return + } + + b, err := json.Marshal(o) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + w.Write(b) +} + +// copied the embed struct from gojenkins lib +type monitorData struct { + Hudson_NodeMonitors_ArchitectureMonitor interface{} `json:"hudson.node_monitors.ArchitectureMonitor"` + Hudson_NodeMonitors_ClockMonitor interface{} `json:"hudson.node_monitors.ClockMonitor"` + Hudson_NodeMonitors_DiskSpaceMonitor interface{} `json:"hudson.node_monitors.DiskSpaceMonitor"` + Hudson_NodeMonitors_ResponseTimeMonitor struct { + Average int64 `json:"average"` + } `json:"hudson.node_monitors.ResponseTimeMonitor"` + Hudson_NodeMonitors_SwapSpaceMonitor interface{} `json:"hudson.node_monitors.SwapSpaceMonitor"` + Hudson_NodeMonitors_TemporarySpaceMonitor interface{} `json:"hudson.node_monitors.TemporarySpaceMonitor"` +} + +func TestGatherNodeData(t *testing.T) { + tests := []struct { + name string + input mockHandler + output *testutil.Accumulator + oe *Error + }{ + { + name: "bad endpoint", + input: mockHandler{ + responseMap: map[string]interface{}{ + "/api/json": struct{}{}, + "/computer/api/json": nil, + }, + }, + oe: &Error{ + reference: errRetrieveNode, + }, + }, + { + name: "bad node data", + input: mockHandler{ + responseMap: map[string]interface{}{ + "/api/json": struct{}{}, + "/computer/api/json": gojenkins.Computers{ + Computers: []*gojenkins.NodeResponse{ + {}, + {}, + {}, + }, + }, + }, + }, + oe: &Error{ + reference: errEmptyNodeName, + }, + }, + { + name: "bad empty monitor data", + input: mockHandler{ + responseMap: map[string]interface{}{ + "/api/json": struct{}{}, + "/computer/api/json": gojenkins.Computers{ + Computers: []*gojenkins.NodeResponse{ + {DisplayName: "master"}, + {DisplayName: "node1"}, + }, + }, + }, + }, + oe: &Error{ + reference: errEmptyMonitorData, + }, + }, + { + name: "bad monitor data format", + input: mockHandler{ + responseMap: map[string]interface{}{ + "/api/json": struct{}{}, + "/computer/api/json": gojenkins.Computers{ + Computers: []*gojenkins.NodeResponse{ + {DisplayName: "master", MonitorData: monitorData{ + Hudson_NodeMonitors_ArchitectureMonitor: 1, + }}, + }, + }, + }, + }, + oe: &Error{ + reference: errBadFormat, + }, + }, + { + name: "filtered nodes", + input: mockHandler{ + responseMap: map[string]interface{}{ + "/api/json": struct{}{}, + "/computer/api/json": gojenkins.Computers{ + Computers: []*gojenkins.NodeResponse{ + {DisplayName: "ignore-1"}, + {DisplayName: "ignore-2"}, + }, + }, + }, + }, + }, + { + name: "normal data collection", + input: mockHandler{ + responseMap: map[string]interface{}{ + "/api/json": struct{}{}, + "/computer/api/json": gojenkins.Computers{ + Computers: []*gojenkins.NodeResponse{ + { + DisplayName: "master", + MonitorData: monitorData{ + Hudson_NodeMonitors_ArchitectureMonitor: "linux", + Hudson_NodeMonitors_ResponseTimeMonitor: struct { + Average int64 `json:"average"` + }{ + Average: 10032, + }, + Hudson_NodeMonitors_DiskSpaceMonitor: map[string]interface{}{ + "path": "/path/1", + "size": 123, + }, + Hudson_NodeMonitors_TemporarySpaceMonitor: map[string]interface{}{ + "path": "/path/2", + "size": 245, + }, + Hudson_NodeMonitors_SwapSpaceMonitor: map[string]interface{}{ + "availableSwapSpace": 212, + "totalSwapSpace": 500, + "availablePhysicalMemory": 101, + "totalPhysicalMemory": 500, + }, + }, + Offline: false, + }, + }, + }, + }, + }, + output: &testutil.Accumulator{ + Metrics: []*testutil.Metric{ + { + Tags: map[string]string{ + "node_name": "master", + "arch": "linux", + "status": "online", + "disk_path": "/path/1", + "temp_path": "/path/2", + }, + Fields: map[string]interface{}{ + "response_time": int64(10032), + "disk_available": float64(123), + "temp_available": float64(245), + "swap_available": float64(212), + "swap_total": float64(500), + "memory_available": float64(101), + "memory_total": float64(500), + }, + }, + }, + }, + }, + } + for _, test := range tests { + ts := httptest.NewServer(test.input) + defer ts.Close() + j := &Jenkins{ + URL: ts.URL, + ResponseTimeout: internal.Duration{Duration: time.Microsecond}, + NodeExclude: []string{"ignore-1", "ignore-2"}, + } + te := j.newInstance(ts.Client()) + acc := new(testutil.Accumulator) + j.gatherNodesData(acc) + if err := acc.FirstError(); err != nil { + te = err.(*Error) + } + + if test.oe == nil && te != nil { + t.Fatalf("%s: failed %s, expected to be nil", test.name, te.Error()) + } else if test.oe != nil { + test.oe.url = ts.URL + "/computer/api/json" + if te == nil { + t.Fatalf("%s: want err: %s, got nil", test.name, test.oe.Error()) + } + if test.oe.reference != te.reference { + t.Fatalf("%s: bad error msg Expected %s, got %s\n", test.name, test.oe.reference, te.reference) + } + if test.oe.url != te.url { + t.Fatalf("%s: bad error url Expected %s, got %s\n", test.name, test.oe.url, te.url) + } + } + if test.output == nil && len(acc.Metrics) > 0 { + t.Fatalf("%s: collected extra data", test.name) + } else if test.output != nil && len(test.output.Metrics) > 0 { + for k, m := range test.output.Metrics[0].Tags { + if acc.Metrics[0].Tags[k] != m { + t.Fatalf("%s: tag %s metrics unmatch Expected %s, got %s\n", test.name, k, m, acc.Metrics[0].Tags[k]) + } + } + for k, m := range test.output.Metrics[0].Fields { + if acc.Metrics[0].Fields[k] != m { + t.Fatalf("%s: field %s metrics unmatch Expected %v(%T), got %v(%T)\n", test.name, k, m, m, acc.Metrics[0].Fields[k], acc.Metrics[0].Fields[k]) + } + } + } + } +} + +func TestNewInstance(t *testing.T) { + mh := mockHandler{ + responseMap: map[string]interface{}{ + "/api/json": struct{}{}, + }, + } + ts := httptest.NewServer(mh) + defer ts.Close() + mockClient := ts.Client() + tests := []struct { + // name of the test + name string + input *Jenkins + output *Jenkins + oe *Error + }{ + { + name: "bad jenkins config", + input: &Jenkins{ + URL: "http://a bad url", + ResponseTimeout: internal.Duration{Duration: time.Microsecond}, + }, + oe: &Error{ + url: "http://a bad url", + reference: errConnectJenkins, + }, + }, + { + name: "has filter", + input: &Jenkins{ + URL: ts.URL, + ResponseTimeout: internal.Duration{Duration: time.Microsecond}, + JobExclude: []string{"job1", "job2"}, + NodeExclude: []string{"node1", "node2"}, + }, + }, + { + name: "default config", + input: &Jenkins{ + URL: ts.URL, + ResponseTimeout: internal.Duration{Duration: time.Microsecond}, + }, + output: &Jenkins{ + MaxConnections: 30, + MaxSubJobPerLayer: 10, + }, + }, + } + for _, test := range tests { + te := test.input.newInstance(mockClient) + if test.oe == nil && te != nil { + t.Fatalf("%s: failed %s, expected to be nil", test.name, te.Error()) + } else if test.oe != nil { + if test.oe.reference != te.reference { + t.Fatalf("%s: bad error msg Expected %s, got %s\n", test.name, test.oe.reference, te.reference) + } + if test.oe.url != te.url { + t.Fatalf("%s: bad error url Expected %s, got %s\n", test.name, test.oe.url, te.url) + } + } + if test.output != nil { + if test.input.instance == nil { + t.Fatalf("%s: failed %s, jenkins instance shouldn't be nil", test.name, te.Error()) + } + if test.input.MaxConnections != test.output.MaxConnections { + t.Fatalf("%s: different MaxConnections Expected %d, got %d\n", test.name, test.output.MaxConnections, test.input.MaxConnections) + } + } + + } +} + +func TestGatherJobs(t *testing.T) { + tests := []struct { + name string + input mockHandler + output *testutil.Accumulator + oe *Error + }{ + { + name: "empty job", + input: mockHandler{}, + }, + { + name: "bad inner jobs", + input: mockHandler{ + responseMap: map[string]interface{}{ + "/api/json": &gojenkins.JobResponse{ + Jobs: []gojenkins.InnerJob{ + {Name: "job1"}, + }, + }, + }, + }, + oe: &Error{ + reference: errRetrieveInnerJobs, + url: "/job/job1/api/json", + }, + }, + { + name: "jobs has no build", + input: mockHandler{ + responseMap: map[string]interface{}{ + "/api/json": &gojenkins.JobResponse{ + Jobs: []gojenkins.InnerJob{ + {Name: "job1"}, + }, + }, + "/job/job1/api/json": &gojenkins.JobResponse{}, + }, + }, + }, + { + name: "bad build info", + input: mockHandler{ + responseMap: map[string]interface{}{ + "/api/json": &gojenkins.JobResponse{ + Jobs: []gojenkins.InnerJob{ + {Name: "job1"}, + }, + }, + "/job/job1/api/json": &gojenkins.JobResponse{ + LastBuild: gojenkins.JobBuild{ + Number: 1, + }, + }, + }, + }, + oe: &Error{ + url: "/job/job1/1/api/json", + reference: errRetrieveLatestBuild, + }, + }, + { + name: "ignore building job", + input: mockHandler{ + responseMap: map[string]interface{}{ + "/api/json": &gojenkins.JobResponse{ + Jobs: []gojenkins.InnerJob{ + {Name: "job1"}, + }, + }, + "/job/job1/api/json": &gojenkins.JobResponse{ + LastBuild: gojenkins.JobBuild{ + Number: 1, + }, + }, + "/job/job1/1/api/json": &gojenkins.BuildResponse{ + Building: true, + }, + }, + }, + }, + { + name: "ignore old build", + input: mockHandler{ + responseMap: map[string]interface{}{ + "/api/json": &gojenkins.JobResponse{ + Jobs: []gojenkins.InnerJob{ + {Name: "job1"}, + }, + }, + "/job/job1/api/json": &gojenkins.JobResponse{ + LastBuild: gojenkins.JobBuild{ + Number: 2, + }, + }, + "/job/job1/2/api/json": &gojenkins.BuildResponse{ + Building: false, + Timestamp: 100, + }, + }, + }, + }, + { + name: "gather metrics", + input: mockHandler{ + responseMap: map[string]interface{}{ + "/api/json": &gojenkins.JobResponse{ + Jobs: []gojenkins.InnerJob{ + {Name: "job1"}, + {Name: "job2"}, + }, + }, + "/job/job1/api/json": &gojenkins.JobResponse{ + LastBuild: gojenkins.JobBuild{ + Number: 3, + }, + }, + "/job/job2/api/json": &gojenkins.JobResponse{ + LastBuild: gojenkins.JobBuild{ + Number: 1, + }, + }, + "/job/job1/3/api/json": &gojenkins.BuildResponse{ + Building: false, + Result: "SUCCESS", + Duration: 25558, + Timestamp: (time.Now().Unix() - int64(time.Minute.Seconds())) * 1000, + }, + "/job/job2/1/api/json": &gojenkins.BuildResponse{ + Building: false, + Result: "FAILURE", + Duration: 1558, + Timestamp: (time.Now().Unix() - int64(time.Minute.Seconds())) * 1000, + }, + }, + }, + output: &testutil.Accumulator{ + Metrics: []*testutil.Metric{ + { + Tags: map[string]string{ + "job_name": "job1", + "result": "SUCCESS", + }, + Fields: map[string]interface{}{ + "duration": int64(25558), + "result_code": 0, + }, + }, + { + Tags: map[string]string{ + "job_name": "job2", + "result": "FAILURE", + }, + Fields: map[string]interface{}{ + "duration": int64(1558), + "result_code": 1, + }, + }, + }, + }, + }, + { + name: "gather sub jobs, jobs filter", + input: mockHandler{ + responseMap: map[string]interface{}{ + "/api/json": &gojenkins.JobResponse{ + Jobs: []gojenkins.InnerJob{ + {Name: "apps"}, + {Name: "ignore-1"}, + }, + }, + "/job/apps/api/json": &gojenkins.JobResponse{ + Jobs: []gojenkins.InnerJob{ + {Name: "k8s-cloud"}, + {Name: "chronograf"}, + {Name: "ignore-all"}, + }, + }, + "/job/apps/job/ignore-all/api/json": &gojenkins.JobResponse{ + Jobs: []gojenkins.InnerJob{ + {Name: "1"}, + {Name: "2"}, + }, + }, + "/job/apps/job/chronograf/api/json": &gojenkins.JobResponse{ + LastBuild: gojenkins.JobBuild{ + Number: 1, + }, + }, + "/job/apps/job/k8s-cloud/api/json": &gojenkins.JobResponse{ + Jobs: []gojenkins.InnerJob{ + {Name: "PR-100"}, + {Name: "PR-101"}, + {Name: "PR-ignore2"}, + }, + }, + "/job/apps/job/k8s-cloud/job/PR-100/api/json": &gojenkins.JobResponse{ + LastBuild: gojenkins.JobBuild{ + Number: 1, + }, + }, + "/job/apps/job/k8s-cloud/job/PR-101/api/json": &gojenkins.JobResponse{ + LastBuild: gojenkins.JobBuild{ + Number: 4, + }, + }, + "/job/apps/job/chronograf/1/api/json": &gojenkins.BuildResponse{ + Building: false, + Result: "FAILURE", + Duration: 1558, + Timestamp: (time.Now().Unix() - int64(time.Minute.Seconds())) * 1000, + }, + "/job/apps/job/k8s-cloud/job/PR-101/4/api/json": &gojenkins.BuildResponse{ + Building: false, + Result: "SUCCESS", + Duration: 76558, + Timestamp: (time.Now().Unix() - int64(time.Minute.Seconds())) * 1000, + }, + "/job/apps/job/k8s-cloud/job/PR-100/1/api/json": &gojenkins.BuildResponse{ + Building: false, + Result: "SUCCESS", + Duration: 91558, + Timestamp: (time.Now().Unix() - int64(time.Minute.Seconds())) * 1000, + }, + }, + }, + output: &testutil.Accumulator{ + Metrics: []*testutil.Metric{ + { + Tags: map[string]string{ + "job_name": "apps/chronograf", + "result": "FAILURE", + }, + Fields: map[string]interface{}{ + "duration": int64(1558), + "result_code": 1, + }, + }, + { + Tags: map[string]string{ + "job_name": "apps/k8s-cloud/PR-100", + "result": "SUCCESS", + }, + Fields: map[string]interface{}{ + "duration": int64(91558), + "result_code": 0, + }, + }, + { + Tags: map[string]string{ + "job_name": "apps/k8s-cloud/PR-101", + "result": "SUCCESS", + }, + Fields: map[string]interface{}{ + "duration": int64(76558), + "result_code": 0, + }, + }, + }, + }, + }, + } + for _, test := range tests { + ts := httptest.NewServer(test.input) + defer ts.Close() + j := &Jenkins{ + URL: ts.URL, + MaxBuildAge: internal.Duration{Duration: time.Hour}, + ResponseTimeout: internal.Duration{Duration: time.Microsecond}, + JobExclude: []string{ + "ignore-1", + "apps/ignore-all/*", + "apps/k8s-cloud/PR-ignore2", + }, + } + te := j.newInstance(ts.Client()) + acc := new(testutil.Accumulator) + j.gatherJobs(acc) + if err := acc.FirstError(); err != nil { + te = err.(*Error) + } + if test.oe == nil && te != nil { + t.Fatalf("%s: failed %s, expected to be nil", test.name, te.Error()) + } else if test.oe != nil { + test.oe.url = ts.URL + test.oe.url + if te == nil { + t.Fatalf("%s: want err: %s, got nil", test.name, test.oe.Error()) + } + if test.oe.reference != te.reference { + t.Fatalf("%s: bad error msg Expected %s, got %s\n", test.name, test.oe.reference, te.reference) + } + if test.oe.url != te.url { + t.Fatalf("%s: bad error url Expected %s, got %s\n", test.name, test.oe.url, te.url) + } + } + if test.output != nil && len(test.output.Metrics) > 0 { + // sort metrics + sort.Slice(acc.Metrics, func(i, j int) bool { + return strings.Compare(acc.Metrics[i].Tags["job_name"], acc.Metrics[j].Tags["job_name"]) < 0 + }) + for i := range test.output.Metrics { + for k, m := range test.output.Metrics[i].Tags { + if acc.Metrics[i].Tags[k] != m { + t.Fatalf("%s: tag %s metrics unmatch Expected %s, got %s\n", test.name, k, m, acc.Metrics[i].Tags[k]) + } + } + for k, m := range test.output.Metrics[i].Fields { + if acc.Metrics[i].Fields[k] != m { + t.Fatalf("%s: field %s metrics unmatch Expected %v(%T), got %v(%T)\n", test.name, k, m, m, acc.Metrics[i].Fields[k], acc.Metrics[0].Fields[k]) + } + } + } + + } + } +} From cdc15205d85b8f0cb7940c99e957ae9199bf432e Mon Sep 17 00:00:00 2001 From: Kelvin Wang Date: Fri, 22 Jun 2018 10:13:58 -0700 Subject: [PATCH 3/6] fix go 1.8 compatibility --- plugins/inputs/jenkins/jenkins_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/plugins/inputs/jenkins/jenkins_test.go b/plugins/inputs/jenkins/jenkins_test.go index 70b350a90..881d5c61d 100644 --- a/plugins/inputs/jenkins/jenkins_test.go +++ b/plugins/inputs/jenkins/jenkins_test.go @@ -282,7 +282,7 @@ func TestGatherNodeData(t *testing.T) { ResponseTimeout: internal.Duration{Duration: time.Microsecond}, NodeExclude: []string{"ignore-1", "ignore-2"}, } - te := j.newInstance(ts.Client()) + te := j.newInstance(&http.Client{Transport: &http.Transport{}}) acc := new(testutil.Accumulator) j.gatherNodesData(acc) if err := acc.FirstError(); err != nil { @@ -328,7 +328,7 @@ func TestNewInstance(t *testing.T) { } ts := httptest.NewServer(mh) defer ts.Close() - mockClient := ts.Client() + mockClient := &http.Client{Transport: &http.Transport{}} tests := []struct { // name of the test name string @@ -667,7 +667,7 @@ func TestGatherJobs(t *testing.T) { "apps/k8s-cloud/PR-ignore2", }, } - te := j.newInstance(ts.Client()) + te := j.newInstance(&http.Client{Transport: &http.Transport{}}) acc := new(testutil.Accumulator) j.gatherJobs(acc) if err := acc.FirstError(); err != nil { From e7ff7d506bbd1a7a249ab64fad93451a053619c6 Mon Sep 17 00:00:00 2001 From: Kelvin Wang Date: Fri, 22 Jun 2018 10:48:18 -0700 Subject: [PATCH 4/6] fix conflicts --- Gopkg.lock | 2 +- Gopkg.toml | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/Gopkg.lock b/Gopkg.lock index 194bb61e6..243c15fac 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -968,6 +968,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "024194b983d91b9500fe97e0aa0ddb5fe725030cb51ddfb034e386cae1098370" + inputs-digest = "ef9c9e12bdc7296af3d450d44966b8ae465e9b845c1409b8027e52998b3c01e0" solver-name = "gps-cdcl" solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml index 78d3749a9..64c1762a5 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -108,6 +108,10 @@ name = "github.com/kballard/go-shellquote" branch = "master" +[[constraint]] + name = "github.com/kelwang/gojenkins" + version = "=v1.0.1" + [[constraint]] name = "github.com/matttproud/golang_protobuf_extensions" version = "1.0.1" From 3457c98eb1dc93e60c46465e7673ed0e4f156ff6 Mon Sep 17 00:00:00 2001 From: Kelvin Wang Date: Mon, 25 Jun 2018 15:43:10 -0700 Subject: [PATCH 5/6] remove err struct --- plugins/inputs/jenkins/jenkins.go | 75 ++++---------- plugins/inputs/jenkins/jenkins_test.go | 136 ++++++------------------- 2 files changed, 48 insertions(+), 163 deletions(-) diff --git a/plugins/inputs/jenkins/jenkins.go b/plugins/inputs/jenkins/jenkins.go index db6e0f10c..e68e4715b 100644 --- a/plugins/inputs/jenkins/jenkins.go +++ b/plugins/inputs/jenkins/jenkins.go @@ -82,52 +82,10 @@ const ( measurementJob = "jenkins_job" ) -// Error base type of error. -type Error struct { - err error - reference string - url string +func badFormatErr(url string, field interface{}, want string, fieldName string) error { + return fmt.Errorf("error bad format[%s]: fieldName: %s, want %s, got %T", url, fieldName, want, field) } -func newError(err error, ref, url string) *Error { - return &Error{ - err: err, - reference: ref, - url: url, - } -} - -func (e *Error) Error() string { - if e == nil { - return "" - } - return fmt.Sprintf("error %s[%s]: %v", e.reference, e.url, e.err) -} - -func badFormatErr(url string, field interface{}, want string, fieldName string) *Error { - return &Error{ - err: fmt.Errorf("fieldName: %s, want %s, got %T", fieldName, want, field), - reference: errBadFormat, - url: url, - } -} - -// err references -const ( - errParseConfig = "parse jenkins config" - errJobFilterCompile = "compile job filters" - errNodeFilterCompile = "compile node filters" - errConnectJenkins = "connect jenkins instance" - errInitJenkins = "init jenkins instance" - errRetrieveNode = "retrieving nodes" - errRetrieveJobs = "retrieving jobs" - errEmptyNodeName = "empty node name" - errEmptyMonitorData = "empty monitor data" - errBadFormat = "bad format" - errRetrieveInnerJobs = "retrieving inner jobs" - errRetrieveLatestBuild = "retrieving latest build" -) - // SampleConfig implements telegraf.Input interface func (j *Jenkins) SampleConfig() string { return sampleConfig @@ -156,38 +114,39 @@ func (j *Jenkins) Gather(acc telegraf.Accumulator) error { return nil } -func (j *Jenkins) initClient() (*http.Client, *Error) { +func (j *Jenkins) initClient() (*http.Client, error) { tlsCfg, err := j.ClientConfig.TLSConfig() if err != nil { - return nil, newError(err, errParseConfig, j.URL) + return nil, fmt.Errorf("error parse jenkins config[%s]: %v", j.URL, err) } return &http.Client{ Transport: &http.Transport{ TLSClientConfig: tlsCfg, + MaxIdleConns: j.MaxConnections, }, Timeout: j.ResponseTimeout.Duration, }, nil } // seperate the client as dependency to use httptest Client for mocking -func (j *Jenkins) newInstance(client *http.Client) *Error { +func (j *Jenkins) newInstance(client *http.Client) error { // create instance var err error j.instance, err = gojenkins.CreateJenkins(client, j.URL, j.Username, j.Password).Init() if err != nil { - return newError(err, errConnectJenkins, j.URL) + return fmt.Errorf("error connect jenkins instance[%s]: %v", j.URL, err) } // init job filter j.jobFilter, err = filter.Compile(j.JobExclude) if err != nil { - return newError(err, errJobFilterCompile, j.URL) + return fmt.Errorf("error compile job filters[%s]: %v", j.URL, err) } // init node filter j.nodeFilter, err = filter.Compile(j.NodeExclude) if err != nil { - return newError(err, errNodeFilterCompile, j.URL) + return fmt.Errorf("error compile node filters[%s]: %v", j.URL, err) } // init tcp pool with default value @@ -203,7 +162,7 @@ func (j *Jenkins) newInstance(client *http.Client) *Error { return nil } -func (j *Jenkins) gatherNodeData(node *gojenkins.Node, url string, acc telegraf.Accumulator) *Error { +func (j *Jenkins) gatherNodeData(node *gojenkins.Node, url string, acc telegraf.Accumulator) error { tags := map[string]string{} fields := make(map[string]interface{}) @@ -211,7 +170,7 @@ func (j *Jenkins) gatherNodeData(node *gojenkins.Node, url string, acc telegraf. // detect the parsing error, since gojenkins lib won't do it if info == nil || info.DisplayName == "" { - return newError(nil, errEmptyNodeName, url) + return fmt.Errorf("error empty node name[%s]: ", j.URL) } tags["node_name"] = info.DisplayName @@ -222,7 +181,7 @@ func (j *Jenkins) gatherNodeData(node *gojenkins.Node, url string, acc telegraf. } if info.MonitorData.Hudson_NodeMonitors_ArchitectureMonitor == nil { - return newError(fmt.Errorf("maybe check your permission"), errEmptyMonitorData, url) + return fmt.Errorf("error empty monitor data[%s]: ", j.URL) } tags["arch"], ok = info.MonitorData.Hudson_NodeMonitors_ArchitectureMonitor.(string) if !ok { @@ -292,7 +251,7 @@ func (j *Jenkins) gatherNodesData(acc telegraf.Accumulator) { // since gojenkins lib will never return error // returns error for len(nodes) is 0 if err != nil || len(nodes) == 0 { - acc.AddError(newError(err, errRetrieveNode, url)) + acc.AddError(fmt.Errorf("error retrieving nodes[%s]: %v", j.URL, err)) return } // get node data @@ -308,7 +267,7 @@ func (j *Jenkins) gatherNodesData(acc telegraf.Accumulator) { func (j *Jenkins) gatherJobs(acc telegraf.Accumulator) { jobs, err := j.instance.GetAllJobNames() if err != nil { - acc.AddError(newError(err, errRetrieveJobs, j.URL)) + acc.AddError(fmt.Errorf("error retrieving jobs[%s]: %v", j.URL, err)) return } jobsC := make(chan jobRequest, j.MaxConnections) @@ -336,7 +295,7 @@ func (j *Jenkins) gatherJobs(acc telegraf.Accumulator) { wg.Wait() } -func (j *Jenkins) getJobDetail(sj jobRequest, jobsC chan<- jobRequest, wg *sync.WaitGroup, acc telegraf.Accumulator) *Error { +func (j *Jenkins) getJobDetail(sj jobRequest, jobsC chan<- jobRequest, wg *sync.WaitGroup, acc telegraf.Accumulator) error { defer wg.Done() if j.MaxSubJobDepth > 0 && sj.layer == j.MaxSubJobDepth { return nil @@ -348,7 +307,7 @@ func (j *Jenkins) getJobDetail(sj jobRequest, jobsC chan<- jobRequest, wg *sync. url := j.URL + "/job/" + strings.Join(sj.combined(), "/job/") + "/api/json" jobDetail, err := j.instance.GetJob(sj.name, sj.parents...) if err != nil { - return newError(err, errRetrieveInnerJobs, url) + return fmt.Errorf("error retrieving inner jobs[%s]: ", url) } for k, innerJob := range jobDetail.Raw.Jobs { @@ -385,7 +344,7 @@ func (j *Jenkins) getJobDetail(sj jobRequest, jobsC chan<- jobRequest, wg *sync. if err == nil && status != 200 { err = fmt.Errorf("status code %d", status) } - return newError(err, errRetrieveLatestBuild, j.URL+baseURL+"/api/json") + return fmt.Errorf("error retrieving inner jobs[%s]: %v", j.URL+baseURL+"/api/json", err) } if build.Raw.Building { diff --git a/plugins/inputs/jenkins/jenkins_test.go b/plugins/inputs/jenkins/jenkins_test.go index 881d5c61d..d71cf10b9 100644 --- a/plugins/inputs/jenkins/jenkins_test.go +++ b/plugins/inputs/jenkins/jenkins_test.go @@ -2,7 +2,6 @@ package jenkins import ( "encoding/json" - "errors" "net/http" "net/http/httptest" "sort" @@ -15,40 +14,6 @@ import ( "github.com/kelwang/gojenkins" ) -func TestErr(t *testing.T) { - tests := []struct { - err *Error - output string - }{ - { - nil, - "", - }, - { - &Error{ - reference: errConnectJenkins, - url: "http://badurl.com", - err: errors.New("unknown error"), - }, - "error connect jenkins instance[http://badurl.com]: unknown error", - }, - { - newError(errors.New("2"), errEmptyMonitorData, "http://badurl.com"), - "error empty monitor data[http://badurl.com]: 2", - }, - { - badFormatErr("http://badurl.com", 20.12, "string", "arch"), - "error bad format[http://badurl.com]: fieldName: arch, want string, got float64", - }, - } - for _, test := range tests { - output := test.err.Error() - if output != test.output { - t.Errorf("Expected %s, got %s\n", test.output, output) - } - } -} - func TestJobRequest(t *testing.T) { tests := []struct { input jobRequest @@ -129,10 +94,10 @@ type monitorData struct { func TestGatherNodeData(t *testing.T) { tests := []struct { - name string - input mockHandler - output *testutil.Accumulator - oe *Error + name string + input mockHandler + output *testutil.Accumulator + wantErr bool }{ { name: "bad endpoint", @@ -142,9 +107,7 @@ func TestGatherNodeData(t *testing.T) { "/computer/api/json": nil, }, }, - oe: &Error{ - reference: errRetrieveNode, - }, + wantErr: true, }, { name: "bad node data", @@ -160,9 +123,7 @@ func TestGatherNodeData(t *testing.T) { }, }, }, - oe: &Error{ - reference: errEmptyNodeName, - }, + wantErr: true, }, { name: "bad empty monitor data", @@ -177,9 +138,7 @@ func TestGatherNodeData(t *testing.T) { }, }, }, - oe: &Error{ - reference: errEmptyMonitorData, - }, + wantErr: true, }, { name: "bad monitor data format", @@ -195,9 +154,7 @@ func TestGatherNodeData(t *testing.T) { }, }, }, - oe: &Error{ - reference: errBadFormat, - }, + wantErr: true, }, { name: "filtered nodes", @@ -286,22 +243,13 @@ func TestGatherNodeData(t *testing.T) { acc := new(testutil.Accumulator) j.gatherNodesData(acc) if err := acc.FirstError(); err != nil { - te = err.(*Error) + te = err } - if test.oe == nil && te != nil { + if !test.wantErr && te != nil { t.Fatalf("%s: failed %s, expected to be nil", test.name, te.Error()) - } else if test.oe != nil { - test.oe.url = ts.URL + "/computer/api/json" - if te == nil { - t.Fatalf("%s: want err: %s, got nil", test.name, test.oe.Error()) - } - if test.oe.reference != te.reference { - t.Fatalf("%s: bad error msg Expected %s, got %s\n", test.name, test.oe.reference, te.reference) - } - if test.oe.url != te.url { - t.Fatalf("%s: bad error url Expected %s, got %s\n", test.name, test.oe.url, te.url) - } + } else if test.wantErr && te == nil { + t.Fatalf("%s: expected err, got nil", test.name) } if test.output == nil && len(acc.Metrics) > 0 { t.Fatalf("%s: collected extra data", test.name) @@ -331,10 +279,10 @@ func TestNewInstance(t *testing.T) { mockClient := &http.Client{Transport: &http.Transport{}} tests := []struct { // name of the test - name string - input *Jenkins - output *Jenkins - oe *Error + name string + input *Jenkins + output *Jenkins + wantErr bool }{ { name: "bad jenkins config", @@ -342,10 +290,7 @@ func TestNewInstance(t *testing.T) { URL: "http://a bad url", ResponseTimeout: internal.Duration{Duration: time.Microsecond}, }, - oe: &Error{ - url: "http://a bad url", - reference: errConnectJenkins, - }, + wantErr: true, }, { name: "has filter", @@ -370,15 +315,10 @@ func TestNewInstance(t *testing.T) { } for _, test := range tests { te := test.input.newInstance(mockClient) - if test.oe == nil && te != nil { + if !test.wantErr && te != nil { t.Fatalf("%s: failed %s, expected to be nil", test.name, te.Error()) - } else if test.oe != nil { - if test.oe.reference != te.reference { - t.Fatalf("%s: bad error msg Expected %s, got %s\n", test.name, test.oe.reference, te.reference) - } - if test.oe.url != te.url { - t.Fatalf("%s: bad error url Expected %s, got %s\n", test.name, test.oe.url, te.url) - } + } else if test.wantErr && te == nil { + t.Fatalf("%s: expected err, got nil", test.name) } if test.output != nil { if test.input.instance == nil { @@ -394,10 +334,10 @@ func TestNewInstance(t *testing.T) { func TestGatherJobs(t *testing.T) { tests := []struct { - name string - input mockHandler - output *testutil.Accumulator - oe *Error + name string + input mockHandler + output *testutil.Accumulator + wantErr bool }{ { name: "empty job", @@ -414,10 +354,7 @@ func TestGatherJobs(t *testing.T) { }, }, }, - oe: &Error{ - reference: errRetrieveInnerJobs, - url: "/job/job1/api/json", - }, + wantErr: true, }, { name: "jobs has no build", @@ -448,10 +385,7 @@ func TestGatherJobs(t *testing.T) { }, }, }, - oe: &Error{ - url: "/job/job1/1/api/json", - reference: errRetrieveLatestBuild, - }, + wantErr: true, }, { name: "ignore building job", @@ -671,22 +605,14 @@ func TestGatherJobs(t *testing.T) { acc := new(testutil.Accumulator) j.gatherJobs(acc) if err := acc.FirstError(); err != nil { - te = err.(*Error) + te = err } - if test.oe == nil && te != nil { + if !test.wantErr && te != nil { t.Fatalf("%s: failed %s, expected to be nil", test.name, te.Error()) - } else if test.oe != nil { - test.oe.url = ts.URL + test.oe.url - if te == nil { - t.Fatalf("%s: want err: %s, got nil", test.name, test.oe.Error()) - } - if test.oe.reference != te.reference { - t.Fatalf("%s: bad error msg Expected %s, got %s\n", test.name, test.oe.reference, te.reference) - } - if test.oe.url != te.url { - t.Fatalf("%s: bad error url Expected %s, got %s\n", test.name, test.oe.url, te.url) - } + } else if test.wantErr && te == nil { + t.Fatalf("%s: expected err, got nil", test.name) } + if test.output != nil && len(test.output.Metrics) > 0 { // sort metrics sort.Slice(acc.Metrics, func(i, j int) bool { From 4ec7999186e674fb7a5e975a14082b4e934b904a Mon Sep 17 00:00:00 2001 From: Kelvin Wang Date: Mon, 25 Jun 2018 17:03:41 -0700 Subject: [PATCH 6/6] use semaphore model --- Gopkg.lock | 8 ++- plugins/inputs/jenkins/jenkins.go | 81 +++++++++++++++++++++---------- 2 files changed, 63 insertions(+), 26 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 243c15fac..15aeb9bf2 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -465,6 +465,12 @@ packages = ["."] revision = "95032a82bc518f77982ea72343cc1ade730072f0" +[[projects]] + name = "github.com/kelwang/gojenkins" + packages = ["."] + revision = "4ea2f2d0a3e1350cf32d33b31ad175a2521425de" + version = "v1.0.1" + [[projects]] branch = "master" name = "github.com/kr/logfmt" @@ -968,6 +974,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "ef9c9e12bdc7296af3d450d44966b8ae465e9b845c1409b8027e52998b3c01e0" + inputs-digest = "ef689441bcd85892bf7b2818fd5def10c2d59112ad13cdf44512efb1f2e48c6a" solver-name = "gps-cdcl" solver-version = 1 diff --git a/plugins/inputs/jenkins/jenkins.go b/plugins/inputs/jenkins/jenkins.go index e68e4715b..c83884db4 100644 --- a/plugins/inputs/jenkins/jenkins.go +++ b/plugins/inputs/jenkins/jenkins.go @@ -37,6 +37,8 @@ type Jenkins struct { NodeExclude []string `toml:"node_exclude"` nodeFilter filter.Filter + + semaphore chan struct{} } type byBuildNumber []gojenkins.JobBuild @@ -159,6 +161,8 @@ func (j *Jenkins) newInstance(client *http.Client) error { j.MaxSubJobPerLayer = 10 } + j.semaphore = make(chan struct{}, j.MaxConnections) + return nil } @@ -170,7 +174,7 @@ func (j *Jenkins) gatherNodeData(node *gojenkins.Node, url string, acc telegraf. // detect the parsing error, since gojenkins lib won't do it if info == nil || info.DisplayName == "" { - return fmt.Errorf("error empty node name[%s]: ", j.URL) + return fmt.Errorf("error empty node name[%s]: ", url) } tags["node_name"] = info.DisplayName @@ -181,7 +185,7 @@ func (j *Jenkins) gatherNodeData(node *gojenkins.Node, url string, acc telegraf. } if info.MonitorData.Hudson_NodeMonitors_ArchitectureMonitor == nil { - return fmt.Errorf("error empty monitor data[%s]: ", j.URL) + return fmt.Errorf("error empty monitor data[%s]: ", url) } tags["arch"], ok = info.MonitorData.Hudson_NodeMonitors_ArchitectureMonitor.(string) if !ok { @@ -246,12 +250,18 @@ func (j *Jenkins) gatherNodeData(node *gojenkins.Node, url string, acc telegraf. } func (j *Jenkins) gatherNodesData(acc telegraf.Accumulator) { - nodes, err := j.instance.GetAllNodes() + var nodes []*gojenkins.Node + var err error + err = j.doGet(func() error { + nodes, err = j.instance.GetAllNodes() + return err + }) + url := j.URL + "/computer/api/json" // since gojenkins lib will never return error // returns error for len(nodes) is 0 if err != nil || len(nodes) == 0 { - acc.AddError(fmt.Errorf("error retrieving nodes[%s]: %v", j.URL, err)) + acc.AddError(fmt.Errorf("error retrieving nodes[%s]: %v", url, err)) return } // get node data @@ -270,33 +280,42 @@ func (j *Jenkins) gatherJobs(acc telegraf.Accumulator) { acc.AddError(fmt.Errorf("error retrieving jobs[%s]: %v", j.URL, err)) return } - jobsC := make(chan jobRequest, j.MaxConnections) var wg sync.WaitGroup for _, job := range jobs { wg.Add(1) - go func(name string) { - jobsC <- jobRequest{ + go func(name string, wg *sync.WaitGroup, acc telegraf.Accumulator) { + defer wg.Done() + if te := j.getJobDetail(jobRequest{ name: name, parents: []string{}, layer: 0, + }, wg, acc); te != nil { + acc.AddError(te) } - }(job.Name) - } - - for i := 0; i < j.MaxConnections; i++ { - go func(jobsC chan jobRequest, acc telegraf.Accumulator, wg *sync.WaitGroup) { - for sj := range jobsC { - if te := j.getJobDetail(sj, jobsC, wg, acc); te != nil { - acc.AddError(te) - } - } - }(jobsC, acc, &wg) + }(job.Name, &wg, acc) } wg.Wait() } -func (j *Jenkins) getJobDetail(sj jobRequest, jobsC chan<- jobRequest, wg *sync.WaitGroup, acc telegraf.Accumulator) error { - defer wg.Done() +// wrap the tcp request with doGet +// block tcp request if buffered channel is full +func (j *Jenkins) doGet(tcp func() error) error { + j.semaphore <- struct{}{} + if err := tcp(); err != nil { + if err == gojenkins.ErrSessionExpired { + // ignore the error here, since config parsing should be finished. + client, _ := j.initClient() + // SessionExpired use a go routine to create a new session + go j.newInstance(client) + } + <-j.semaphore + return err + } + <-j.semaphore + return nil +} + +func (j *Jenkins) getJobDetail(sj jobRequest, wg *sync.WaitGroup, acc telegraf.Accumulator) error { if j.MaxSubJobDepth > 0 && sj.layer == j.MaxSubJobDepth { return nil } @@ -305,7 +324,12 @@ func (j *Jenkins) getJobDetail(sj jobRequest, jobsC chan<- jobRequest, wg *sync. return nil } url := j.URL + "/job/" + strings.Join(sj.combined(), "/job/") + "/api/json" - jobDetail, err := j.instance.GetJob(sj.name, sj.parents...) + var jobDetail *gojenkins.Job + var err error + err = j.doGet(func() error { + jobDetail, err = j.instance.GetJob(sj.name, sj.parents...) + return err + }) if err != nil { return fmt.Errorf("error retrieving inner jobs[%s]: ", url) } @@ -316,13 +340,16 @@ func (j *Jenkins) getJobDetail(sj jobRequest, jobsC chan<- jobRequest, wg *sync. } wg.Add(1) // schedule tcp fetch for inner jobs - go func(innerJob gojenkins.InnerJob, sj jobRequest) { - jobsC <- jobRequest{ + go func(innerJob gojenkins.InnerJob, sj jobRequest, wg *sync.WaitGroup, acc telegraf.Accumulator) { + defer wg.Done() + if te := j.getJobDetail(jobRequest{ name: innerJob.Name, parents: sj.combined(), layer: sj.layer + 1, + }, wg, acc); te != nil { + acc.AddError(te) } - }(innerJob, sj) + }(innerJob, sj, wg, acc) } // collect build info @@ -339,7 +366,11 @@ func (j *Jenkins) getJobDetail(sj jobRequest, jobsC chan<- jobRequest, wg *sync. Base: baseURL, Raw: new(gojenkins.BuildResponse), } - status, err := build.Poll() + var status int + err = j.doGet(func() error { + status, err = build.Poll() + return err + }) if err != nil || status != 200 { if err == nil && status != 200 { err = fmt.Errorf("status code %d", status)