telegraf/plugins/inputs/spark/spark.go

438 lines
10 KiB
Go

package spark
import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"log"
"net/http"
"net/url"
"strings"
"sync"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)
// JolokiaClient is the HTTP transport used to query a Jolokia endpoint.
// Keeping it behind an interface lets a different implementation be
// supplied in place of JolokiaClientImpl.
type JolokiaClient interface {
// MakeRequest sends req and returns the HTTP response or an error.
MakeRequest(req *http.Request) (*http.Response, error)
}
// JolokiaClientImpl is the JolokiaClient implementation backed by a
// standard *http.Client.
type JolokiaClientImpl struct {
// client is the HTTP client every request is sent through.
client *http.Client
}
// MakeRequest executes req with the wrapped HTTP client and hands back the
// client's response and error unchanged.
func (c JolokiaClientImpl) MakeRequest(req *http.Request) (*http.Response, error) {
	resp, err := c.client.Do(req)
	return resp, err
}
// YarnClient is the HTTP transport used to query the Yarn server.
// Keeping it behind an interface lets a different implementation be
// supplied in place of YarnClientImpl.
type YarnClient interface {
// MakeRequest sends req and returns the HTTP response or an error.
MakeRequest(req *http.Request) (*http.Response, error)
}
// YarnClientImpl is the YarnClient implementation backed by a standard
// *http.Client.
type YarnClientImpl struct {
// client is the HTTP client every request is sent through.
client *http.Client
}
// MakeRequest executes req with the wrapped HTTP client and hands back the
// client's response and error unchanged.
func (c YarnClientImpl) MakeRequest(req *http.Request) (*http.Response, error) {
	resp, err := c.client.Do(req)
	return resp, err
}
// Spark is the input plugin. It reads Spark metrics from Jolokia endpoints
// and cluster information from a Yarn server.
//
// NOTE(review): spark_servers and yarn_server are unexported; confirm that
// the configuration loader is actually able to populate them.
type Spark struct {
// jClient performs the HTTP requests against the Jolokia endpoints.
jClient JolokiaClient
// spark_servers lists the "host:port" addresses of the Jolokia endpoints.
spark_servers []string
// yarn_server is the "host:port" address of the Yarn server; empty
// disables Yarn collection.
yarn_server string
}
// javaMetric is the jmxMetric implementation for java.lang MBeans
// (memory and threading).
type javaMetric struct {
// host is reported as the "spark_host" tag.
host string
// metric is the request path that was queried; used in diagnostics.
metric string
// acc receives the generated measurements.
acc telegraf.Accumulator
}
// sparkMetric is the jmxMetric implementation for MBeans in the "metrics"
// domain.
type sparkMetric struct {
// host is reported as the "spark_host" tag.
host string
// metric is the request path that was queried; used in diagnostics.
metric string
// acc receives the generated measurements.
acc telegraf.Accumulator
}
// Yarn gathers cluster information from a single Yarn server.
type Yarn struct {
// yClient performs the HTTP requests against the Yarn server.
yClient YarnClient
// serverAddress is the "host:port" address of the Yarn server; empty
// disables collection.
serverAddress string
}
// yarnMetric is the jmxMetric implementation for Yarn REST responses.
type yarnMetric struct {
// host is reported as the "yarn_host" tag.
host string
// acc receives the generated measurements.
acc telegraf.Accumulator
}
// jmxMetric converts one decoded JSON response into measurements.
type jmxMetric interface {
// addTagsFields extracts tags and fields from the decoded response out
// and records them.
addTagsFields(out map[string]interface{})
}
// newJavaMetric builds a javaMetric for the given host and request path
// that records into acc.
func newJavaMetric(host string, metric string,
	acc telegraf.Accumulator) *javaMetric {
	jm := new(javaMetric)
	jm.host = host
	jm.metric = metric
	jm.acc = acc
	return jm
}
// newSparkMetric builds a sparkMetric for the given host and request path
// that records into acc.
func newSparkMetric(host string, metric string,
	acc telegraf.Accumulator) *sparkMetric {
	sm := new(sparkMetric)
	sm.host = host
	sm.metric = metric
	sm.acc = acc
	return sm
}
// newYarnMetric builds a yarnMetric for the given host that records into acc.
func newYarnMetric(host string, acc telegraf.Accumulator) *yarnMetric {
	ym := new(yarnMetric)
	ym.host = host
	ym.acc = acc
	return ym
}
// addValuesAsFields copies every non-nil entry of values into fields,
// storing it under the key "<mname>_<original key>". Entries whose value is
// nil are left out.
func addValuesAsFields(values map[string]interface{}, fields map[string]interface{},
	mname string) {
	prefix := mname + "_"
	for key, val := range values {
		if val == nil {
			continue
		}
		fields[prefix+key] = val
	}
}
// parseJmxMetricRequest breaks an MBean name of the form
// "domain:key=value[,key=value...]" into tokens.
//
// The domain is translated into a "class" token: "metrics" becomes
// "spark_jolokiaMetrics" and "java.lang" becomes "java". Any other domain
// yields an empty map. Every well-formed key=value property is then added
// as its own token.
//
// Malformed input never panics: a name without ":" yields only the "class"
// token, and properties without "=" or with an empty key are ignored.
func parseJmxMetricRequest(mbean string) map[string]string {
	tokens := make(map[string]string)

	// Split on the first ":" only so that values containing ":" survive.
	domainAndProps := strings.SplitN(mbean, ":", 2)
	switch domainAndProps[0] {
	case "metrics":
		tokens["class"] = "spark_jolokiaMetrics"
	case "java.lang":
		tokens["class"] = "java"
	default:
		return tokens
	}
	if len(domainAndProps) < 2 {
		return tokens
	}

	for _, prop := range strings.Split(domainAndProps[1], ",") {
		// Split on the first "=" only so that values containing "=" survive.
		kv := strings.SplitN(prop, "=", 2)
		if len(kv) != 2 || kv[0] == "" {
			continue
		}
		tokens[kv[0]] = kv[1]
	}
	return tokens
}
// addTokensToTags copies parsed MBean tokens into tags. The "name" token is
// stored under the tag key "mname", "class" and "type" are skipped, and
// every other token keeps its own key.
func addTokensToTags(tokens map[string]string, tags map[string]string) {
	for key, val := range tokens {
		switch key {
		case "class", "type":
			// Already part of the measurement name; not repeated as tags.
		case "name":
			// "name" appears to be reserved in InfluxDB, hence "mname".
			tags["mname"] = val
		default:
			tags[key] = val
		}
	}
}
// addJavaMetric records one measurement named class for the java.lang
// values of c's host. The measurement is tagged with "spark_host" and
// "spark_class".
//
// For class "spark_Threading" only a fixed list of thread counters is
// reported; for every other class all entries of values are reported.
// Nil values are skipped in both cases.
func addJavaMetric(class string, c *javaMetric,
	values map[string]interface{}) {
	tags := map[string]string{
		"spark_host":  c.host,
		"spark_class": class,
	}
	fields := make(map[string]interface{})

	if class != "spark_Threading" {
		for name, val := range values {
			if val != nil {
				fields[name] = val
			}
		}
		c.acc.AddFields(class, fields, tags)
		return
	}

	threadingKeys := []string{"PeakThreadCount", "CurrentThreadCpuTime", "DaemonThreadCount", "TotalStartedThreadCount", "CurrentThreadUserTime", "ThreadCount"}
	for _, name := range threadingKeys {
		if val := values[name]; val != nil {
			fields[name] = val
		}
	}
	c.acc.AddFields(class, fields, tags)
}
// addTagsFields turns a decoded response for a java.lang MBean into a
// measurement.
//
// The MBean class is taken from the text after the first "=" in
// out["request"]["mbean"]. "Memory" is reported as "spark_HeapMemoryUsage"
// and "Threading" as "spark_Threading"; any other class is logged and
// ignored. A response whose shape is not as expected (missing or mistyped
// "request", "mbean" or "value") is logged and ignored instead of causing a
// panic.
func (j *javaMetric) addTagsFields(out map[string]interface{}) {
	request, ok := out["request"].(map[string]interface{})
	if !ok {
		log.Printf("Missing key 'request' in '%s' output response: %v", j.metric, out)
		return
	}
	mbean, ok := request["mbean"].(string)
	if !ok {
		log.Printf("Missing key 'mbean' in '%s' output response: %v", j.metric, out)
		return
	}
	mbeanParts := strings.Split(mbean, "=")
	if len(mbeanParts) < 2 {
		log.Printf("Unexpected mbean '%s' in '%s' output response", mbean, j.metric)
		return
	}
	class := mbeanParts[1]

	rawValue, ok := out["value"]
	if !ok {
		log.Printf("Missing key 'value' in '%s' output response: %v", j.metric, out)
		return
	}
	values, ok := rawValue.(map[string]interface{})
	if !ok {
		log.Printf("Unexpected 'value' in '%s' output response: %v", j.metric, out)
		return
	}

	switch class {
	case "Memory":
		addJavaMetric("spark_HeapMemoryUsage", j, values)
	case "Threading":
		addJavaMetric("spark_Threading", j, values)
	default:
		log.Printf("Unsupported class '%s' in '%s' output response: %v", class, j.metric, out)
	}
}
// addSparkMetric records the attribute values of one MBean as a
// measurement. Tags come from the parsed MBean name plus "spark_host";
// field keys are prefixed with the "mname" tag; the measurement name is the
// "class" token followed by the "type" token.
func addSparkMetric(mbean string, c *sparkMetric,
	values map[string]interface{}) {
	tokens := parseJmxMetricRequest(mbean)

	tags := make(map[string]string)
	addTokensToTags(tokens, tags)
	// Set after the tokens so the host tag always wins.
	tags["spark_host"] = c.host

	fields := make(map[string]interface{})
	addValuesAsFields(values, fields, tags["mname"])

	measurement := tokens["class"] + tokens["type"]
	c.acc.AddFields(measurement, fields, tags)
}
// addTagsFields turns a decoded response for the "metrics" domain into
// measurements, one per MBean found under out["value"].
//
// out["value"] is expected to map MBean names to attribute maps. A missing
// or mistyped "value" is logged and ignored; an MBean whose attributes are
// not an object is logged and skipped, so one odd entry does not discard
// the rest and never causes a panic.
func (c *sparkMetric) addTagsFields(out map[string]interface{}) {
	rawValue, ok := out["value"]
	if !ok {
		log.Printf("Missing key 'value' in '%s' output response: %v", c.metric, out)
		return
	}
	mbeans, ok := rawValue.(map[string]interface{})
	if !ok {
		log.Printf("Unexpected 'value' in '%s' output response: %v", c.metric, out)
		return
	}
	for mbean, rawAttrs := range mbeans {
		attrs, ok := rawAttrs.(map[string]interface{})
		if !ok {
			log.Printf("Unexpected attributes for '%s' in '%s' output response, skipping", mbean, c.metric)
			continue
		}
		addSparkMetric(mbean, c, attrs)
	}
}
// addYarnMetric records every entry of value as a field of the measurement
// metrictype, tagged with "yarn_host".
func addYarnMetric(c *yarnMetric, value map[string]interface{}, metrictype string) {
	tags := map[string]string{"yarn_host": c.host}

	fields := make(map[string]interface{})
	for name := range value {
		fields[name] = value[name]
	}

	c.acc.AddFields(metrictype, fields, tags)
}
// addTagsFields turns one decoded Yarn REST response into measurements.
//
// The first of these top-level keys that is present decides the handling:
//   - "clusterMetrics": one "spark_clusterMetrics" measurement
//   - "clusterInfo":    one "spark_clusterInfo" measurement
//   - "apps":           one "spark_apps" measurement per listed entry
//   - "nodes":          one "spark_nodes" measurement per listed entry
//
// If none is present the response is logged and ignored. Payloads that do
// not have the expected shape are skipped instead of causing a panic.
func (c *yarnMetric) addTagsFields(out map[string]interface{}) {
	// addObject reports a single JSON object as one measurement.
	addObject := func(raw interface{}, measurement string) {
		obj, ok := raw.(map[string]interface{})
		if !ok {
			log.Printf("Unexpected %s payload in output response, skipping: %v", measurement, raw)
			return
		}
		addYarnMetric(c, obj, measurement)
	}
	// addList reports every object of every list found inside raw.
	addList := func(raw interface{}, measurement string) {
		wrapper, ok := raw.(map[string]interface{})
		if !ok {
			// Typically a null payload (presumably an empty collection):
			// there is nothing to report.
			return
		}
		for _, rawItems := range wrapper {
			items, ok := rawItems.([]interface{})
			if !ok {
				continue
			}
			for _, item := range items {
				addObject(item, measurement)
			}
		}
	}

	if raw, ok := out["clusterMetrics"]; ok {
		addObject(raw, "spark_clusterMetrics")
	} else if raw, ok := out["clusterInfo"]; ok {
		addObject(raw, "spark_clusterInfo")
	} else if raw, ok := out["apps"]; ok {
		addList(raw, "spark_apps")
	} else if raw, ok := out["nodes"]; ok {
		addList(raw, "spark_nodes")
	} else {
		log.Printf("Missing the required key in output response: %v", out)
	}
}
// SampleConfig returns the example configuration snippet for the plugin.
func (j *Spark) SampleConfig() string {
	const sampleConfig = `
## Spark server exposing jolokia read service
spark_servers = ["127.0.0.1:8778"] #optional
## Server running Yarn Resource Manager
yarn_server = "127.0.0.1:8088" #optional
`
	return sampleConfig
}
// Description returns a one-line summary of what the plugin collects.
func (j *Spark) Description() string {
	const description = "Read Spark metrics through Jolokia and Yarn"
	return description
}
// getAttr performs a GET on requestUrl through the Jolokia client and
// returns the response body decoded as a JSON object.
//
// An error is returned when the request cannot be built or sent, when the
// status code is not 200 OK, when the body cannot be read, or when the body
// is not valid JSON. The decode error includes the underlying cause.
func (j *Spark) getAttr(requestUrl *url.URL) (map[string]interface{}, error) {
	// Create + send request
	req, err := http.NewRequest("GET", requestUrl.String(), nil)
	if err != nil {
		return nil, err
	}
	resp, err := j.jClient.MakeRequest(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	// Process response
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("Response from url \"%s\" has status code %d (%s), expected %d (%s)",
			requestUrl,
			resp.StatusCode,
			http.StatusText(resp.StatusCode),
			http.StatusOK,
			http.StatusText(http.StatusOK))
	}

	// read body
	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}

	// Unmarshal json
	var jsonOut map[string]interface{}
	if err = json.Unmarshal(body, &jsonOut); err != nil {
		// Keep the cause so a bad payload can be diagnosed.
		return nil, errors.New("Error decoding JSON response: " + err.Error())
	}
	return jsonOut, nil
}
// getAttr performs a GET on requestUrl through the Yarn client and returns
// the response body decoded as a JSON object.
//
// An error is returned when the request cannot be built or sent, when the
// status code is not 200 OK, when the body cannot be read, or when the body
// is not valid JSON. The decode error includes the underlying cause.
func (j *Yarn) getAttr(requestUrl *url.URL) (map[string]interface{}, error) {
	// Create + send request
	req, err := http.NewRequest("GET", requestUrl.String(), nil)
	if err != nil {
		return nil, err
	}
	resp, err := j.yClient.MakeRequest(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	// Process response
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("Response from url \"%s\" has status code %d (%s), expected %d (%s)",
			requestUrl,
			resp.StatusCode,
			http.StatusText(resp.StatusCode),
			http.StatusOK,
			http.StatusText(http.StatusOK))
	}

	// read body
	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}

	// Unmarshal json
	var jsonOut map[string]interface{}
	if err = json.Unmarshal(body, &jsonOut); err != nil {
		// Keep the cause so a bad payload can be diagnosed.
		return nil, errors.New("Error decoding JSON response: " + err.Error())
	}
	return jsonOut, nil
}
// parseServerTokens splits a "host:port" server address into a map with the
// keys "host" and "port".
//
// It returns nil, after logging the reason, when the address contains "@"
// (user info is not supported) or has no ":port" part. Callers may index the
// nil result safely; every lookup yields "".
func parseServerTokens(server string) map[string]string {
	log.Printf("Parsing %s", server)
	if strings.Contains(server, "@") {
		log.Printf("Unsupported Server info, skipping")
		return nil
	}

	hostTokens := strings.Split(server, ":")
	if len(hostTokens) < 2 {
		// Without this guard hostTokens[1] would panic.
		log.Printf("Server %q has no port, skipping", server)
		return nil
	}

	return map[string]string{
		"host": hostTokens[0],
		"port": hostTokens[1],
	}
}
// GatherJolokia queries every configured Spark server through Jolokia and
// records the results in acc.
//
// For each server it reads all MBeans of the "metrics" domain plus the
// java.lang heap memory usage and threading MBeans. Servers whose address
// cannot be parsed are skipped, and a failed request is logged and skipped
// so the remaining metrics are still collected.
//
// wg.Done is called exactly once on every return path. The returned error
// is non-nil only when a request URL cannot be built.
func (c *Spark) GatherJolokia(acc telegraf.Accumulator, wg *sync.WaitGroup) error {
	// Deferred so that the error return below cannot leave the caller
	// blocked in wg.Wait.
	defer wg.Done()

	const context = "/jolokia/read"
	metrics := [...]string{"/metrics:*", "/java.lang:type=Memory/HeapMemoryUsage", "/java.lang:type=Threading"}

	for _, server := range c.spark_servers {
		serverTokens := parseServerTokens(server)
		if serverTokens == nil {
			continue
		}
		for _, metric := range metrics {
			var m jmxMetric
			switch {
			case strings.HasPrefix(metric, "/java.lang:"):
				m = newJavaMetric(serverTokens["host"], metric, acc)
			case strings.HasPrefix(metric, "/metrics:"):
				m = newSparkMetric(serverTokens["host"], metric, acc)
			default:
				log.Printf("Unsupported Spark metric [%s], skipping",
					metric)
				continue
			}

			requestUrl, err := url.Parse("http://" + serverTokens["host"] + ":" +
				serverTokens["port"] + context + metric)
			if err != nil {
				return err
			}

			out, err := c.getAttr(requestUrl)
			if err != nil {
				log.Printf("Error reading Spark metric [%s] from %s: %v", metric, server, err)
				continue
			}
			if len(out) == 0 {
				continue
			}
			m.addTagsFields(out)
		}
	}
	return nil
}
// GatherYarn queries the Yarn server's cluster info, cluster metrics, apps
// and nodes REST endpoints and records the results in acc.
//
// Nothing is collected when no server address is configured or when the
// address cannot be parsed. A failed request is logged and skipped so the
// remaining endpoints are still collected.
//
// wg.Done is called exactly once on every return path. The returned error
// is non-nil only when a request URL cannot be built.
func (c *Yarn) GatherYarn(acc telegraf.Accumulator, wg *sync.WaitGroup) error {
	// Deferred so that the error return below cannot leave the caller
	// blocked in wg.Wait.
	defer wg.Done()

	server := c.serverAddress
	if server == "" {
		return nil
	}
	serverTokens := parseServerTokens(server)
	if serverTokens == nil {
		return nil
	}

	contexts := [...]string{"/ws/v1/cluster", "/ws/v1/cluster/metrics", "/ws/v1/cluster/apps", "/ws/v1/cluster/nodes"}
	// The host tag carries the full configured address, port included.
	m := newYarnMetric(server, acc)

	for _, context := range contexts {
		requestUrl, err := url.Parse("http://" + serverTokens["host"] + ":" + serverTokens["port"] + context)
		if err != nil {
			return err
		}
		out, err := c.getAttr(requestUrl)
		if err != nil {
			log.Printf("Error reading Yarn endpoint [%s] from %s: %v", context, server, err)
			continue
		}
		if len(out) == 0 {
			continue
		}
		m.addTagsFields(out)
	}
	return nil
}
// Gather collects Yarn and Jolokia metrics concurrently and records them in
// acc.
//
// It returns nil when both collectors succeed. Otherwise it returns an
// error whose message joins the collectors' error messages with "; ".
func (c *Spark) Gather(acc telegraf.Accumulator) error {
	yarn := Yarn{
		yClient:       &YarnClientImpl{client: &http.Client{}},
		serverAddress: c.yarn_server,
	}

	// Use the client the plugin was configured with; fall back to a default
	// one when none was set.
	jClient := c.jClient
	if jClient == nil {
		jClient = &JolokiaClientImpl{client: &http.Client{}}
	}
	spark := Spark{
		jClient:       jClient,
		spark_servers: c.spark_servers,
	}

	// The collectors require a WaitGroup and call Done on it. Completion is
	// observed through the result channel instead, so their errors are not
	// lost and a collector that returns without calling Done cannot block
	// Gather.
	var wg sync.WaitGroup
	wg.Add(2)
	results := make(chan error, 2)
	go func() { results <- yarn.GatherYarn(acc, &wg) }()
	go func() { results <- spark.GatherJolokia(acc, &wg) }()

	var messages []string
	for i := 0; i < 2; i++ {
		if err := <-results; err != nil {
			messages = append(messages, err.Error())
		}
	}
	if len(messages) > 0 {
		return errors.New(strings.Join(messages, "; "))
	}
	return nil
}
func init() {
inputs.Add("spark", func() telegraf.Input {
return &Spark{jClient: &JolokiaClientImpl{client: &http.Client{}}}
})
}