Compare commits

...

11 Commits

| Author | SHA1 | Message | Date |
| --- | --- | --- | --- |
| shubhamDX | aa2d76afb6 | Updating the config file description | 2018-02-06 11:51:51 -08:00 |
| shubhamDX | d193a9416d | normalizing metric names to fit better with snakecase | 2018-02-06 11:51:51 -08:00 |
| shubhamDX | dcf81d7dfe | Updating value receiver to pointer receiver | 2018-02-06 11:51:51 -08:00 |
| Shubham Srivastava | dd75a883ee | Edited variable names | 2018-02-06 11:51:51 -08:00 |
| shubhamDX | 42fd21c19d | Cleaning the code and correcting the Makefile | 2018-02-06 11:51:51 -08:00 |
| Shubham Srivastava | f4255d331f | Updating | 2018-02-06 11:50:22 -08:00 |
| Shubham Srivastava | 68c453c355 | Updating | 2018-02-06 11:50:22 -08:00 |
| shubhamDX | 7069ef46b2 | Formatting the source code | 2018-02-06 11:50:22 -08:00 |
| shubhamDX | 44034d1b73 | Updating CHANGELOG.md | 2018-02-06 11:50:22 -08:00 |
| Shubham Srivastava | a4cf3eb98b | Updating README.md | 2018-02-06 11:50:22 -08:00 |
| shubhamDX | 99f494f0d8 | Adding spark plugin | 2018-02-06 11:50:22 -08:00 |
5 changed files with 509 additions and 0 deletions

CHANGELOG.md

@@ -816,6 +816,7 @@ consistent with the behavior of `collection_jitter`.
- [#1543](https://github.com/influxdata/telegraf/pull/1543): Official Windows service.
- [#1414](https://github.com/influxdata/telegraf/pull/1414): Forking sensors command to remove C package dependency.
- [#1389](https://github.com/influxdata/telegraf/pull/1389): Add a new SNMP plugin.
- [#1611](https://github.com/influxdata/telegraf/pull/1611): Add Spark plugin.
### Bugfixes

plugins/inputs/all/all.go

@@ -88,6 +88,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/snmp_legacy"
_ "github.com/influxdata/telegraf/plugins/inputs/socket_listener"
_ "github.com/influxdata/telegraf/plugins/inputs/solr"
_ "github.com/influxdata/telegraf/plugins/inputs/spark"
_ "github.com/influxdata/telegraf/plugins/inputs/sqlserver"
_ "github.com/influxdata/telegraf/plugins/inputs/statsd"
_ "github.com/influxdata/telegraf/plugins/inputs/sysstat"

plugins/inputs/spark/README.md

@@ -0,0 +1,47 @@
# Telegraf plugin: Spark
#### Plugin arguments:
- **spark_servers** []string: List of Spark nodes exposing Jolokia, in the format ["host:port"] (optional)
- **yarn_server** string: Address of the Yarn Resource Manager, in the format "host:port" (optional)
#### Description
The Spark plugin collects metrics in two ways, both optional:
- Spark JVM metrics exposed as MBean attributes through the Jolokia REST endpoint (see https://jolokia.org/). Metrics are collected for each configured server.
- Spark application metrics for applications managed by the Yarn Resource Manager, collected through the Yarn REST API. Application measurements are only produced when a Spark job has been submitted; otherwise none are emitted.
# Measurements:
The Spark plugin produces one or more measurements depending on which of `spark_servers` and `yarn_server` are configured.
Given a configuration like:
```toml
[[inputs.spark]]
spark_servers = ["127.0.0.1:8778"]
yarn_server = "127.0.0.1:8088"
```
With both configured, the full set of collected measurements is:
```
spark_heap_memory_usage, spark_threading, spark_jolokia_metrics, spark_apps, spark_cluster_info, spark_cluster_metrics, spark_nodes
```
# Useful Metrics:
Here is a list of the metric domains that are fetched and may be useful for monitoring your Spark cluster.
#### Measurement domains collected through Jolokia
- "/metrics:*"
- "/java.lang:type=Memory/HeapMemoryUsage"
- "/java.lang:type=Threading"
#### Measurement domains collected through the Yarn server
- "/cluster"
- "/cluster/metrics"
- "/cluster/apps"
- "/cluster/nodes"

plugins/inputs/spark/spark.go

@@ -0,0 +1,437 @@
package spark
import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"log"
"net/http"
"net/url"
"strings"
"sync"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)
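// JolokiaClient is a thin wrapper around the HTTP client used to query the
// Jolokia read endpoint; keeping it behind an interface allows a fake client
// to be substituted in tests.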
type JolokiaClient interface {
MakeRequest(req *http.Request) (*http.Response, error)
}
type JolokiaClientImpl struct {
client *http.Client
}
func (c JolokiaClientImpl) MakeRequest(req *http.Request) (*http.Response, error) {
return c.client.Do(req)
}
type YarnClient interface {
MakeRequest(req *http.Request) (*http.Response, error)
}
type YarnClientImpl struct {
client *http.Client
}
func (c YarnClientImpl) MakeRequest(req *http.Request) (*http.Response, error) {
return c.client.Do(req)
}
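// Spark holds the plugin configuration: SparkServers lists the Jolokia
// endpoints of the Spark nodes (spark_servers in the config file) and
// YarnServer is the address of the Yarn Resource Manager (yarn_server).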
type Spark struct {
jClient JolokiaClient
SparkServers []string `toml:"spark_servers"`
YarnServer string `toml:"yarn_server"`
}
type javaMetric struct {
host string
metric string
acc telegraf.Accumulator
}
type sparkMetric struct {
host string
metric string
acc telegraf.Accumulator
}
type Yarn struct {
yClient YarnClient
serverAddress string
}
type yarnMetric struct {
host string
acc telegraf.Accumulator
}
type jmxMetric interface {
addTagsFields(out map[string]interface{})
}
func newJavaMetric(host string, metric string,
acc telegraf.Accumulator) *javaMetric {
return &javaMetric{host: host, metric: metric, acc: acc}
}
func newSparkMetric(host string, metric string,
acc telegraf.Accumulator) *sparkMetric {
return &sparkMetric{host: host, metric: metric, acc: acc}
}
func newYarnMetric(host string, acc telegraf.Accumulator) *yarnMetric {
return &yarnMetric{host: host, acc: acc}
}
func addValuesAsFields(values map[string]interface{}, fields map[string]interface{},
mname string) {
for k, v := range values {
if v != nil {
fields[mname+"_"+k] = v
}
}
}
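// parseJmxMetricRequest maps an MBean name such as "metrics:name=..." or
// "java.lang:type=Memory" to measurement tokens: the MBean domain selects the
// measurement class and the first key/value property is kept for tagging.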
func parseJmxMetricRequest(mbean string) map[string]string {
tokens := make(map[string]string)
classAndPairs := strings.Split(mbean, ":")
if classAndPairs[0] == "metrics" {
tokens["class"] = "spark_jolokia_metrics"
} else if classAndPairs[0] == "java.lang" {
tokens["class"] = "java"
} else {
return tokens
}
pair := strings.Split(classAndPairs[1], "=")
tokens[pair[0]] = pair[1]
return tokens
}
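// addTokensToTags copies the parsed MBean tokens into tags, renaming "name"
// to "mname" and skipping "class" and "type", which are folded into the
// measurement name instead.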
func addTokensToTags(tokens map[string]string, tags map[string]string) {
for k, v := range tokens {
if k == "name" {
tags["mname"] = v // name seems to a reserved word in influxdb
} else if k == "class" || k == "type" {
continue // class and type are used in the metric name
} else {
tags[k] = v
}
}
}
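// addJavaMetric emits one measurement for a JVM MBean; for the threading
// MBean only a fixed list of thread counters is kept, otherwise every
// non-nil attribute becomes a field.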
func addJavaMetric(class string, c *javaMetric,
values map[string]interface{}) {
tags := make(map[string]string)
fields := make(map[string]interface{})
tags["spark_host"] = c.host
tags["spark_class"] = class
if class == "spark_threading" {
list := []string{"PeakThreadCount", "CurrentThreadCpuTime", "DaemonThreadCount", "TotalStartedThreadCount", "CurrentThreadUserTime", "ThreadCount"}
for _, value := range list {
if values[value] != nil {
fields[value] = values[value]
}
}
} else {
for k, v := range values {
if v != nil {
fields[k] = v
}
}
}
c.acc.AddFields(class, fields, tags)
}
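// addTagsFields parses a Jolokia read response of the form
// {"request": {...}, "value": {...}} and dispatches on the MBean type
// (Memory or Threading) found in the request.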
func (j *javaMetric) addTagsFields(out map[string]interface{}) {
request := out["request"].(map[string]interface{})
var mbean = request["mbean"].(string)
var mbeansplit = strings.Split(mbean, "=")
var class = mbeansplit[1]
if valuesMap, ok := out["value"]; ok {
if class == "Memory" {
addJavaMetric("spark_heap_memory_usage", j, valuesMap.(map[string]interface{}))
} else if class == "Threading" {
addJavaMetric("spark_threading", j, valuesMap.(map[string]interface{}))
} else {
fmt.Printf("Unexpected MBean class '%s' in '%s' output response\n%v\n",
class, j.metric, out)
}
} else {
fmt.Printf("Missing key 'value' in '%s' output response\n%v\n",
j.metric, out)
}
}
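// addSparkMetric converts a single MBean entry from the "/metrics:*" read
// into fields and tags and adds it to the accumulator.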
func addSparkMetric(mbean string, c *sparkMetric,
values map[string]interface{}) {
tags := make(map[string]string)
fields := make(map[string]interface{})
tokens := parseJmxMetricRequest(mbean)
addTokensToTags(tokens, tags)
tags["spark_host"] = c.host
addValuesAsFields(values, fields, tags["mname"])
c.acc.AddFields(tokens["class"]+tokens["type"], fields, tags)
}
func (c *sparkMetric) addTagsFields(out map[string]interface{}) {
if valuesMap, ok := out["value"]; ok {
for k, v := range valuesMap.(map[string]interface{}) {
addSparkMetric(k, c, v.(map[string]interface{}))
}
} else {
fmt.Printf("Missing key 'value' in '%s' output response\n%v\n",
c.metric, out)
return
}
}
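// addYarnMetric flattens one Yarn REST API object into fields, tagged with
// the Yarn host, under the given measurement name.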
func addYarnMetric(c *yarnMetric, value map[string]interface{}, metrictype string) {
tags := make(map[string]string)
fields := make(map[string]interface{})
tags["yarn_host"] = c.host
for key, val := range value {
fields[key] = val
}
c.acc.AddFields(metrictype, fields, tags)
}
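// addTagsFields routes a Yarn REST response to the matching measurement based
// on its top-level key: clusterMetrics, clusterInfo, apps or nodes.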
func (c *yarnMetric) addTagsFields(out map[string]interface{}) {
if valuesMap, ok := out["clusterMetrics"]; ok {
addYarnMetric(c, valuesMap.(map[string]interface{}), "spark_cluster_metrics")
} else if valuesMap, ok := out["clusterInfo"]; ok {
addYarnMetric(c, valuesMap.(map[string]interface{}), "spark_cluster_info")
} else if valuesMap, ok := out["apps"]; ok {
for _, value := range valuesMap.(map[string]interface{}) {
for _, vv := range value.([]interface{}) {
addYarnMetric(c, vv.(map[string]interface{}), "spark_apps")
}
}
} else if valuesMap, ok := out["nodes"]; ok {
for _, value := range valuesMap.(map[string]interface{}) {
for _, vv := range value.([]interface{}) {
addYarnMetric(c, vv.(map[string]interface{}), "spark_nodes")
}
}
} else {
fmt.Printf("Missing the required key in output response\n%v\n", out)
return
}
}
func (j *Spark) SampleConfig() string {
return `
## Spark servers exposing jolokia read service
#spark_servers = ["127.0.0.1:8778"] #optional
## Server running Yarn Resource Manager
#yarn_server = "127.0.0.1:8088" #optional
`
}
func (j *Spark) Description() string {
return "Read Spark metrics through Jolokia and Yarn"
}
func (j *Spark) getAttr(requestUrl *url.URL) (map[string]interface{}, error) {
// Create + send request
req, err := http.NewRequest("GET", requestUrl.String(), nil)
if err != nil {
return nil, err
}
resp, err := j.jClient.MakeRequest(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
// Process response
if resp.StatusCode != http.StatusOK {
err = fmt.Errorf("Response from url \"%s\" has status code %d (%s), expected %d (%s)",
requestUrl,
resp.StatusCode,
http.StatusText(resp.StatusCode),
http.StatusOK,
http.StatusText(http.StatusOK))
return nil, err
}
// read body
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
// Unmarshal json
var jsonOut map[string]interface{}
if err = json.Unmarshal([]byte(body), &jsonOut); err != nil {
return nil, errors.New("Error decoding JSON response")
}
return jsonOut, nil
}
func (j *Yarn) getAttr(requestUrl *url.URL) (map[string]interface{}, error) {
req, err := http.NewRequest("GET", requestUrl.String(), nil)
if err != nil {
return nil, err
}
resp, err := j.yClient.MakeRequest(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
// Process response
if resp.StatusCode != http.StatusOK {
err = fmt.Errorf("Response from url \"%s\" has status code %d (%s), expected %d (%s)",
requestUrl,
resp.StatusCode,
http.StatusText(resp.StatusCode),
http.StatusOK,
http.StatusText(http.StatusOK))
return nil, err
}
// read body
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
// Unmarshal json
var jsonOut map[string]interface{}
if err = json.Unmarshal([]byte(body), &jsonOut); err != nil {
return nil, errors.New("Error decoding JSON response")
}
return jsonOut, nil
}
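// parseServerTokens splits a "host:port" server entry into its host and port;
// entries it cannot parse are reported and skipped by returning nil.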
func parseServerTokens(server string) map[string]string {
serverTokens := make(map[string]string)
hostAndUser := strings.Split(server, "@")
hostPort := ""
if len(hostAndUser) == 1 {
hostPort = hostAndUser[0]
} else {
log.Printf("Unsupported server info %s, skipping", server)
return nil
}
hostTokens := strings.Split(hostPort, ":")
if len(hostTokens) != 2 {
log.Printf("Invalid server address %s, expected host:port, skipping", server)
return nil
}
serverTokens["host"] = hostTokens[0]
serverTokens["port"] = hostTokens[1]
return serverTokens
}
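// GatherJolokia queries the Jolokia read endpoint of every configured Spark
// server for the JVM and Spark metric domains and adds the results to the
// accumulator. It is started as a goroutine from Gather, so it signals
// completion through the wait group.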
func (c *Spark) GatherJolokia(acc telegraf.Accumulator, wg *sync.WaitGroup) error {
defer wg.Done()
context := "/jolokia/read"
servers := c.SparkServers
metrics := [...]string{"/metrics:*", "/java.lang:type=Memory/HeapMemoryUsage", "/java.lang:type=Threading"}
if len(servers) == 0 {
return nil
}
for _, server := range servers {
serverTokens := parseServerTokens(server)
if serverTokens == nil {
continue
}
for _, metric := range metrics {
var m jmxMetric
if strings.HasPrefix(metric, "/java.lang:") {
m = newJavaMetric(serverTokens["host"], metric, acc)
} else if strings.HasPrefix(metric, "/metrics:") {
m = newSparkMetric(serverTokens["host"], metric, acc)
} else {
log.Printf("Unsupported Spark metric [%s], skipping",
metric)
continue
}
requestUrl, err := url.Parse("http://" + serverTokens["host"] + ":" +
serverTokens["port"] + context + metric)
if err != nil {
return err
}
out, err := c.getAttr(requestUrl)
if err != nil {
return err
}
if len(out) == 0 {
continue
}
m.addTagsFields(out)
}
}
return nil
}
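// GatherYarn queries the Yarn Resource Manager REST API (cluster info,
// cluster metrics, apps and nodes) and adds the results to the accumulator.
// Like GatherJolokia, it signals completion through the wait group.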
func (c *Yarn) GatherYarn(acc telegraf.Accumulator, wg *sync.WaitGroup) error {
defer wg.Done()
contexts := [...]string{"/ws/v1/cluster", "/ws/v1/cluster/metrics", "/ws/v1/cluster/apps", "/ws/v1/cluster/nodes"}
server := c.serverAddress
if server == "" {
return nil
}
serverTokens := parseServerTokens(server)
if serverTokens == nil {
return nil
}
for _, context := range contexts {
var m = newYarnMetric(server, acc)
requestUrl, err := url.Parse("http://" + serverTokens["host"] + ":" + serverTokens["port"] + context)
if err != nil {
return err
}
out, err := c.getAttr(requestUrl)
if err != nil {
return err
}
if len(out) == 0 {
continue
}
m.addTagsFields(out)
}
return nil
}
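// Gather runs the Yarn and Jolokia collectors concurrently and waits for both
// to finish; errors returned by the goroutines are currently discarded.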
func (c *Spark) Gather(acc telegraf.Accumulator) error {
yarn := Yarn{
yClient: &YarnClientImpl{client: &http.Client{}},
serverAddress: c.YarnServer,
}
wg := sync.WaitGroup{}
wg.Add(1)
go yarn.GatherYarn(acc, &wg)
spark := Spark{
jClient: &JolokiaClientImpl{client: &http.Client{}},
SparkServers: c.SparkServers,
}
wg.Add(1)
go spark.GatherJolokia(acc, &wg)
wg.Wait()
return nil
}
func init() {
inputs.Add("spark", func() telegraf.Input {
return &Spark{jClient: &JolokiaClientImpl{client: &http.Client{}}}
})
}

plugins/inputs/spark/spark_test.go

@@ -0,0 +1,23 @@
package spark
import (
"testing"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
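// TestSparkMeasurements is an integration test: it expects a Yarn Resource
// Manager to be reachable on the local test host at port 8088 and is skipped
// in short mode.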
func TestSparkMeasurements(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
a := &Spark{
yarn_server: testutil.GetLocalHost() + ":8088",
}
var acc testutil.Accumulator
err := a.Gather(&acc)
require.NoError(t, err)
}