From 99f494f0d83a5607fc1a9c195b5d863dc44f3951 Mon Sep 17 00:00:00 2001 From: shubhamDX Date: Tue, 9 Aug 2016 15:42:52 +0530 Subject: [PATCH] Adding spark plugin --- plugins/inputs/all/all.go | 1 + plugins/inputs/spark/README.md | 0 plugins/inputs/spark/spark.go | 441 +++++++++++++++++++++++++++++ plugins/inputs/spark/spark_test.go | 23 ++ 4 files changed, 465 insertions(+) create mode 100644 plugins/inputs/spark/README.md create mode 100644 plugins/inputs/spark/spark.go create mode 100644 plugins/inputs/spark/spark_test.go diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index a13bd6cd3..defe90d6b 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -107,4 +107,5 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/zfs" _ "github.com/influxdata/telegraf/plugins/inputs/zipkin" _ "github.com/influxdata/telegraf/plugins/inputs/zookeeper" + _ "github.com/influxdata/telegraf/plugins/inputs/spark" ) diff --git a/plugins/inputs/spark/README.md b/plugins/inputs/spark/README.md new file mode 100644 index 000000000..e69de29bb diff --git a/plugins/inputs/spark/spark.go b/plugins/inputs/spark/spark.go new file mode 100644 index 000000000..fce11efe9 --- /dev/null +++ b/plugins/inputs/spark/spark.go @@ -0,0 +1,441 @@ +package spark + +import ( + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "log" + "net/http" + "net/url" + "strings" + "sync" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" +) + +type JolokiaClient interface { + MakeRequest(req *http.Request) (*http.Response, error) +} + +type JolokiaClientImpl struct { + client *http.Client +} + +func (c JolokiaClientImpl) MakeRequest(req *http.Request) (*http.Response, error) { + return c.client.Do(req) +} + +type YarnClient interface { + MakeRequest(req *http.Request) (*http.Response, error) +} + +type YarnClientImpl struct { + client *http.Client +} + +func (c YarnClientImpl) MakeRequest(req *http.Request) (*http.Response, error) { + return c.client.Do(req) +} + +type Spark struct { + jClient JolokiaClient + SparkServer []string + YarnServer string +} + +type javaMetric struct { + host string + metric string + acc telegraf.Accumulator +} + +type sparkMetric struct { + host string + metric string + acc telegraf.Accumulator +} + +type Yarn struct { + yClient YarnClient + serverAddress string +} + +type yarnMetric struct { + host string + acc telegraf.Accumulator +} + +type jmxMetric interface { + addTagsFields(out map[string]interface{}) +} + +func newJavaMetric(host string, metric string, + acc telegraf.Accumulator) *javaMetric { + return &javaMetric{host: host, metric: metric, acc: acc} +} + +func newSparkMetric(host string, metric string, + acc telegraf.Accumulator) *sparkMetric { + return &sparkMetric{host: host, metric: metric, acc: acc} +} + +func newYarnMetric(host string, acc telegraf.Accumulator) *yarnMetric { + return &yarnMetric{host: host, acc: acc} +} + +func addValuesAsFields(values map[string]interface{}, fields map[string]interface{}, + mname string) { + for k, v := range values { + if v != nil { + fields[mname+"_"+k] = v + } + } +} + +func parseJmxMetricRequest(mbean string) map[string]string { + tokens := make(map[string]string) + classAndPairs := strings.Split(mbean, ":") + if classAndPairs[0] == "metrics" { + tokens["class"] = "spark_jolokiaMetrics" + } else if classAndPairs[0] == "java.lang" { + tokens["class"] = "java" + } else { + return tokens + } + + pair := strings.Split(classAndPairs[1], "=") + tokens[pair[0]] = pair[1] + + return tokens +} + +func addTokensToTags(tokens map[string]string, tags map[string]string) { + for k, v := range tokens { + if k == "name" { + tags["mname"] = v // name seems to a reserved word in influxdb + } else if k == "class" || k == "type" { + continue // class and type are used in the metric name + } else { + tags[k] = v + } + } +} + +func addJavaMetric(class string, c javaMetric, + values map[string]interface{}) { + + tags := make(map[string]string) + fields := make(map[string]interface{}) + tags["spark_host"] = c.host + tags["spark_class"] = class + + if class == "spark_Threading" { + list := []string{"PeakThreadCount", "CurrentThreadCpuTime", "DaemonThreadCount", "TotalStartedThreadCount", "CurrentThreadUserTime", "ThreadCount"} + for _, value := range list { + if values[value] != nil { + fields[value] = values[value] + } + } + } else { + for k, v := range values { + if v != nil { + fields[k] = v + } + } + } + c.acc.AddFields(class, fields, tags) + +} + +func (j javaMetric) addTagsFields(out map[string]interface{}) { + fmt.Println(out["request"]) + request := out["request"].(map[string]interface{}) + var mbean = request["mbean"].(string) + var mbeansplit = strings.Split(mbean, "=") + var class = mbeansplit[1] + + if valuesMap, ok := out["value"]; ok { + if class == "Memory" { + addJavaMetric("spark_HeapMemoryUsage", j, valuesMap.(map[string]interface{})) + } else if class == "Threading" { + addJavaMetric("spark_Threading", j, valuesMap.(map[string]interface{})) + } else { + fmt.Printf("Missing key in '%s' output response\n%v\n", + j.metric, out) + return + } + + } +} + +func addSparkMetric(mbean string, c sparkMetric, + values map[string]interface{}) { + + tags := make(map[string]string) + fields := make(map[string]interface{}) + + tokens := parseJmxMetricRequest(mbean) + addTokensToTags(tokens, tags) + tags["spark_host"] = c.host + + addValuesAsFields(values, fields, tags["mname"]) + c.acc.AddFields(tokens["class"]+tokens["type"], fields, tags) + +} + +func (c sparkMetric) addTagsFields(out map[string]interface{}) { + if valuesMap, ok := out["value"]; ok { + for k, v := range valuesMap.(map[string]interface{}) { + addSparkMetric(k, c, v.(map[string]interface{})) + } + } else { + fmt.Printf("Missing key 'value' in '%s' output response\n%v\n", + c.metric, out) + return + } + +} + +func addYarnMetric(c yarnMetric, value map[string]interface{}, metrictype string) { + + tags := make(map[string]string) + fields := make(map[string]interface{}) + tags["yarn_host"] = c.host + for key, val := range value { + fields[key] = val + } + c.acc.AddFields(metrictype, fields, tags) +} + +func (c yarnMetric) addTagsFields(out map[string]interface{}) { + + if valuesMap, ok := out["clusterMetrics"]; ok { + addYarnMetric(c, valuesMap.(map[string]interface{}), "spark_clusterMetrics") + } else if valuesMap, ok := out["clusterInfo"]; ok { + addYarnMetric(c, valuesMap.(map[string]interface{}), "spark_clusterInfo") + } else if valuesMap, ok := out["apps"]; ok { + for _, value := range valuesMap.(map[string]interface{}) { + for _, vv := range value.([]interface{}) { + addYarnMetric(c, vv.(map[string]interface{}), "spark_apps") + } + } + } else if valuesMap, ok := out["nodes"]; ok { + for _, value := range valuesMap.(map[string]interface{}) { + for _, vv := range value.([]interface{}) { + addYarnMetric(c, vv.(map[string]interface{}), "spark_nodes") + } + } + } else { + fmt.Printf("Missing the required key in output response\n%v\n", out) + return + } + +} + +func (j *Spark) SampleConfig() string { + return ` + ## Spark server exposing jolokia read service + SparkServer = ["127.0.0.1:8778"] #optional + ## Server running Yarn Resource Manager + YarnServer = "127.0.0.1:8088" #optional +` +} + +func (j *Spark) Description() string { + return "Read Spark metrics through Jolokia and Yarn" +} + +func (j *Spark) getAttr(requestUrl *url.URL) (map[string]interface{}, error) { + // Create + send request + req, err := http.NewRequest("GET", requestUrl.String(), nil) + if err != nil { + return nil, err + } + + resp, err := j.jClient.MakeRequest(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + // Process response + if resp.StatusCode != http.StatusOK { + err = fmt.Errorf("Response from url \"%s\" has status code %d (%s), expected %d (%s)", + requestUrl, + resp.StatusCode, + http.StatusText(resp.StatusCode), + http.StatusOK, + http.StatusText(http.StatusOK)) + return nil, err + } + + // read body + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + // Unmarshal json + var jsonOut map[string]interface{} + if err = json.Unmarshal([]byte(body), &jsonOut); err != nil { + return nil, errors.New("Error decoding JSON response") + } + + return jsonOut, nil +} + +func (j *Yarn) getAttr(requestUrl *url.URL) (map[string]interface{}, error) { + req, err := http.NewRequest("GET", requestUrl.String(), nil) + if err != nil { + return nil, err + } + + resp, err := j.yClient.MakeRequest(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + // Process response + if resp.StatusCode != http.StatusOK { + err = fmt.Errorf("Response from url \"%s\" has status code %d (%s), expected %d (%s)", + requestUrl, + resp.StatusCode, + http.StatusText(resp.StatusCode), + http.StatusOK, + http.StatusText(http.StatusOK)) + return nil, err + } + + // read body + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + // Unmarshal json + var jsonOut map[string]interface{} + if err = json.Unmarshal([]byte(body), &jsonOut); err != nil { + return nil, errors.New("Error decoding JSON response") + } + + return jsonOut, nil +} + +func parseServerTokens(server string) map[string]string { + serverTokens := make(map[string]string) + log.Printf("Parsing %s", server) + hostAndUser := strings.Split(server, "@") + hostPort := "" + + if len(hostAndUser) == 1 { + hostPort = hostAndUser[0] + } else { + log.Printf("Unsupported Server info, skipping") + return nil + } + log.Printf("%s \n", hostPort) + hostTokens := strings.Split(hostPort, ":") + serverTokens["host"] = hostTokens[0] + serverTokens["port"] = hostTokens[1] + return serverTokens +} + +func (c *Spark) GatherJolokia(acc telegraf.Accumulator, wg *sync.WaitGroup) error { + context := "/jolokia/read" + servers := c.SparkServer + metrics := [...]string{"/metrics:*", "/java.lang:type=Memory/HeapMemoryUsage", "/java.lang:type=Threading"} + if len(servers) == 0 { + wg.Done() + return nil + } + for _, server := range servers { + for _, metric := range metrics { + serverTokens := parseServerTokens(server) + var m jmxMetric + if strings.HasPrefix(metric, "/java.lang:") { + m = newJavaMetric(serverTokens["host"], metric, acc) + } else if strings.HasPrefix(metric, "/metrics:") { + m = newSparkMetric(serverTokens["host"], metric, acc) + } else { + log.Printf("Unsupported Spark metric [%s], skipping", + metric) + continue + } + + requestUrl, err := url.Parse("http://" + serverTokens["host"] + ":" + + serverTokens["port"] + context + metric) + + if err != nil { + return err + } + fmt.Println("Request url is ", requestUrl) + + out, err := c.getAttr(requestUrl) + if len(out) == 0 { + continue + } + m.addTagsFields(out) + } + } + + wg.Done() + return nil +} + +func (c *Yarn) GatherYarn(acc telegraf.Accumulator, wg *sync.WaitGroup) error { + contexts := [...]string{"/ws/v1/cluster", "/ws/v1/cluster/metrics", "/ws/v1/cluster/apps", "/ws/v1/cluster/nodes"} + server := c.serverAddress + + if server == "" { + wg.Done() + return nil + } + + fmt.Println("Going to collect data of server ", server) + + serverTokens := parseServerTokens(server) + for _, context := range contexts { + var m = newYarnMetric(server, acc) + requestUrl, err := url.Parse("http://" + serverTokens["host"] + ":" + serverTokens["port"] + context) + if err != nil { + return err + } + + out, err := c.getAttr(requestUrl) + if len(out) == 0 { + continue + } + m.addTagsFields(out) + + } + wg.Done() + return nil +} + +func (c *Spark) Gather(acc telegraf.Accumulator) error { + + log.Println("Config is ", c) + yarn := Yarn{ + yClient: &YarnClientImpl{client: &http.Client{}}, + serverAddress: c.YarnServer, + } + wg := sync.WaitGroup{} + wg.Add(1) + go yarn.GatherYarn(acc, &wg) + spark := Spark{ + jClient: &JolokiaClientImpl{client: &http.Client{}}, + SparkServer: c.SparkServer, + } + wg.Add(1) + go spark.GatherJolokia(acc, &wg) + wg.Wait() + return nil +} + +func init() { + inputs.Add("spark", func() telegraf.Input { + return &Spark{jClient: &JolokiaClientImpl{client: &http.Client{}}} + }) +} diff --git a/plugins/inputs/spark/spark_test.go b/plugins/inputs/spark/spark_test.go new file mode 100644 index 000000000..8d4d221aa --- /dev/null +++ b/plugins/inputs/spark/spark_test.go @@ -0,0 +1,23 @@ +package spark + +import ( + "testing" + + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +func TestSparkMeasurements(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + a := &Spark{ + YarnServer: testutil.GetLocalHost() + ":8088", + } + + var acc testutil.Accumulator + + err := a.Gather(&acc) + require.NoError(t, err) +}