Cleaning the code and correcting the Makefile

This commit is contained in:
shubhamDX 2016-08-09 20:02:37 +05:30 committed by Daniel Nelson
parent f4255d331f
commit 42fd21c19d
No known key found for this signature in database
GPG Key ID: CAAD59C9444F6155
2 changed files with 10 additions and 14 deletions

View File

@ -40,9 +40,9 @@ func (c YarnClientImpl) MakeRequest(req *http.Request) (*http.Response, error) {
} }
type Spark struct { type Spark struct {
jClient JolokiaClient jClient JolokiaClient
SparkServer []string spark_servers []string
YarnServer string yarn_server string
} }
type javaMetric struct { type javaMetric struct {
@ -150,7 +150,6 @@ func addJavaMetric(class string, c javaMetric,
} }
func (j javaMetric) addTagsFields(out map[string]interface{}) { func (j javaMetric) addTagsFields(out map[string]interface{}) {
fmt.Println(out["request"])
request := out["request"].(map[string]interface{}) request := out["request"].(map[string]interface{})
var mbean = request["mbean"].(string) var mbean = request["mbean"].(string)
var mbeansplit = strings.Split(mbean, "=") var mbeansplit = strings.Split(mbean, "=")
@ -237,9 +236,9 @@ func (c yarnMetric) addTagsFields(out map[string]interface{}) {
func (j *Spark) SampleConfig() string { func (j *Spark) SampleConfig() string {
return ` return `
## Spark server exposing jolokia read service ## Spark server exposing jolokia read service
SparkServer = ["127.0.0.1:8778"] #optional spark_servers = ["127.0.0.1:8778"] #optional
## Server running Yarn Resource Manager ## Server running Yarn Resource Manager
YarnServer = "127.0.0.1:8088" #optional yarn_server = "127.0.0.1:8088" #optional
` `
} }
@ -344,7 +343,7 @@ func parseServerTokens(server string) map[string]string {
func (c *Spark) GatherJolokia(acc telegraf.Accumulator, wg *sync.WaitGroup) error { func (c *Spark) GatherJolokia(acc telegraf.Accumulator, wg *sync.WaitGroup) error {
context := "/jolokia/read" context := "/jolokia/read"
servers := c.SparkServer servers := c.spark_servers
metrics := [...]string{"/metrics:*", "/java.lang:type=Memory/HeapMemoryUsage", "/java.lang:type=Threading"} metrics := [...]string{"/metrics:*", "/java.lang:type=Memory/HeapMemoryUsage", "/java.lang:type=Threading"}
if len(servers) == 0 { if len(servers) == 0 {
wg.Done() wg.Done()
@ -370,7 +369,6 @@ func (c *Spark) GatherJolokia(acc telegraf.Accumulator, wg *sync.WaitGroup) erro
if err != nil { if err != nil {
return err return err
} }
fmt.Println("Request url is ", requestUrl)
out, err := c.getAttr(requestUrl) out, err := c.getAttr(requestUrl)
if len(out) == 0 { if len(out) == 0 {
@ -393,8 +391,6 @@ func (c *Yarn) GatherYarn(acc telegraf.Accumulator, wg *sync.WaitGroup) error {
return nil return nil
} }
fmt.Println("Going to collect data of server ", server)
serverTokens := parseServerTokens(server) serverTokens := parseServerTokens(server)
for _, context := range contexts { for _, context := range contexts {
var m = newYarnMetric(server, acc) var m = newYarnMetric(server, acc)
@ -419,14 +415,14 @@ func (c *Spark) Gather(acc telegraf.Accumulator) error {
log.Println("Config is ", c) log.Println("Config is ", c)
yarn := Yarn{ yarn := Yarn{
yClient: &YarnClientImpl{client: &http.Client{}}, yClient: &YarnClientImpl{client: &http.Client{}},
serverAddress: c.YarnServer, serverAddress: c.yarn_server,
} }
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
wg.Add(1) wg.Add(1)
go yarn.GatherYarn(acc, &wg) go yarn.GatherYarn(acc, &wg)
spark := Spark{ spark := Spark{
jClient: &JolokiaClientImpl{client: &http.Client{}}, jClient: &JolokiaClientImpl{client: &http.Client{}},
SparkServer: c.SparkServer, spark_servers: c.spark_servers,
} }
wg.Add(1) wg.Add(1)
go spark.GatherJolokia(acc, &wg) go spark.GatherJolokia(acc, &wg)

View File

@ -13,7 +13,7 @@ func TestSparkMeasurements(t *testing.T) {
} }
a := &Spark{ a := &Spark{
YarnServer: testutil.GetLocalHost() + ":8088", yarn_server: testutil.GetLocalHost() + ":8088",
} }
var acc testutil.Accumulator var acc testutil.Accumulator