Compare commits
11 Commits
master
...
ShubhamDX-
Author | SHA1 | Date |
---|---|---|
shubhamDX | aa2d76afb6 | |
shubhamDX | d193a9416d | |
shubhamDX | dcf81d7dfe | |
Shubham Srivastava | dd75a883ee | |
shubhamDX | 42fd21c19d | |
Shubham Srivastava | f4255d331f | |
Shubham Srivastava | 68c453c355 | |
shubhamDX | 7069ef46b2 | |
shubhamDX | 44034d1b73 | |
Shubham Srivastava | a4cf3eb98b | |
shubhamDX | 99f494f0d8 |
|
@ -816,6 +816,7 @@ consistent with the behavior of `collection_jitter`.
|
||||||
- [#1543](https://github.com/influxdata/telegraf/pull/1543): Official Windows service.
|
- [#1543](https://github.com/influxdata/telegraf/pull/1543): Official Windows service.
|
||||||
- [#1414](https://github.com/influxdata/telegraf/pull/1414): Forking sensors command to remove C package dependency.
|
- [#1414](https://github.com/influxdata/telegraf/pull/1414): Forking sensors command to remove C package dependency.
|
||||||
- [#1389](https://github.com/influxdata/telegraf/pull/1389): Add a new SNMP plugin.
|
- [#1389](https://github.com/influxdata/telegraf/pull/1389): Add a new SNMP plugin.
|
||||||
|
- [#1611](https://github.com/influxdata/telegraf/pull/1611): Adding Spark Plugin
|
||||||
|
|
||||||
### Bugfixes
|
### Bugfixes
|
||||||
|
|
||||||
|
|
|
@ -88,6 +88,7 @@ import (
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/snmp_legacy"
|
_ "github.com/influxdata/telegraf/plugins/inputs/snmp_legacy"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/socket_listener"
|
_ "github.com/influxdata/telegraf/plugins/inputs/socket_listener"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/solr"
|
_ "github.com/influxdata/telegraf/plugins/inputs/solr"
|
||||||
|
_ "github.com/influxdata/telegraf/plugins/inputs/spark"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/sqlserver"
|
_ "github.com/influxdata/telegraf/plugins/inputs/sqlserver"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/statsd"
|
_ "github.com/influxdata/telegraf/plugins/inputs/statsd"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/sysstat"
|
_ "github.com/influxdata/telegraf/plugins/inputs/sysstat"
|
||||||
|
|
|
@ -0,0 +1,47 @@
|
||||||
|
# Telegraf plugin: Spark
|
||||||
|
|
||||||
|
#### Plugin arguments:
|
||||||
|
- **spark_servers** []string: List of spark nodes with the format ["host:port"] (optional)
|
||||||
|
- **yarn_server** string: Address of Yarn resource manager (optional)
|
||||||
|
|
||||||
|
#### Description
|
||||||
|
|
||||||
|
The Spark plugin collects metrics in 2 ways, both being optional:
|
||||||
|
- Spark-JVM metrics exposed as MBean's attributes through jolokia REST endpoint. Metrics are collected for each server configured. See:https://jolokia.org/
|
||||||
|
- Spark application metrics if managed by Yarn Resource manager. The plugin collects through the Yarn API. If some spark job has been submitted then only it will fetch else it will not produce any spark application result.
|
||||||
|
|
||||||
|
# Measurements:
|
||||||
|
Spark plugin produces one or more measurements according to the SparkServer or YarnServer provided.
|
||||||
|
|
||||||
|
Given a configuration like:
|
||||||
|
|
||||||
|
```toml
|
||||||
|
[[inputs.spark]]
|
||||||
|
spark_servers = ["127.0.0.1:8778"]
|
||||||
|
yarn_server = "127.0.0.1:8088"
|
||||||
|
```
|
||||||
|
|
||||||
|
The maximum collected measurements will be:
|
||||||
|
|
||||||
|
```
|
||||||
|
spark_HeapMemoryUsage , spark_Threading , spark_apps , spark_clusterInfo , spark_clusterMetrics , spark_jolokiaMetrics , spark_jvmMetrics , spark_nodes
|
||||||
|
```
|
||||||
|
|
||||||
|
# Useful Metrics:
|
||||||
|
|
||||||
|
Here is a list of metrics that are fetched and might be useful to monitor your spark cluster.
|
||||||
|
|
||||||
|
####measurement domains collected through Jolokia
|
||||||
|
|
||||||
|
- "/metrics:*"
|
||||||
|
- "/java.lang:type=Memory/HeapMemoryUsage"
|
||||||
|
- "/java.lang:type=Threading
|
||||||
|
|
||||||
|
####measurement domains collected through YarnServer
|
||||||
|
- "/cluster"
|
||||||
|
- "/cluster/metrics"
|
||||||
|
- "/cluster/apps"
|
||||||
|
- "/cluster/nodes"
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,437 @@
|
||||||
|
package spark
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"log"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/plugins/inputs"
|
||||||
|
)
|
||||||
|
|
||||||
|
type JolokiaClient interface {
|
||||||
|
MakeRequest(req *http.Request) (*http.Response, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type JolokiaClientImpl struct {
|
||||||
|
client *http.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c JolokiaClientImpl) MakeRequest(req *http.Request) (*http.Response, error) {
|
||||||
|
return c.client.Do(req)
|
||||||
|
}
|
||||||
|
|
||||||
|
type YarnClient interface {
|
||||||
|
MakeRequest(req *http.Request) (*http.Response, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type YarnClientImpl struct {
|
||||||
|
client *http.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c YarnClientImpl) MakeRequest(req *http.Request) (*http.Response, error) {
|
||||||
|
return c.client.Do(req)
|
||||||
|
}
|
||||||
|
|
||||||
|
type Spark struct {
|
||||||
|
jClient JolokiaClient
|
||||||
|
spark_servers []string
|
||||||
|
yarn_server string
|
||||||
|
}
|
||||||
|
|
||||||
|
type javaMetric struct {
|
||||||
|
host string
|
||||||
|
metric string
|
||||||
|
acc telegraf.Accumulator
|
||||||
|
}
|
||||||
|
|
||||||
|
type sparkMetric struct {
|
||||||
|
host string
|
||||||
|
metric string
|
||||||
|
acc telegraf.Accumulator
|
||||||
|
}
|
||||||
|
|
||||||
|
type Yarn struct {
|
||||||
|
yClient YarnClient
|
||||||
|
serverAddress string
|
||||||
|
}
|
||||||
|
|
||||||
|
type yarnMetric struct {
|
||||||
|
host string
|
||||||
|
acc telegraf.Accumulator
|
||||||
|
}
|
||||||
|
|
||||||
|
type jmxMetric interface {
|
||||||
|
addTagsFields(out map[string]interface{})
|
||||||
|
}
|
||||||
|
|
||||||
|
func newJavaMetric(host string, metric string,
|
||||||
|
acc telegraf.Accumulator) *javaMetric {
|
||||||
|
return &javaMetric{host: host, metric: metric, acc: acc}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newSparkMetric(host string, metric string,
|
||||||
|
acc telegraf.Accumulator) *sparkMetric {
|
||||||
|
return &sparkMetric{host: host, metric: metric, acc: acc}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newYarnMetric(host string, acc telegraf.Accumulator) *yarnMetric {
|
||||||
|
return &yarnMetric{host: host, acc: acc}
|
||||||
|
}
|
||||||
|
|
||||||
|
func addValuesAsFields(values map[string]interface{}, fields map[string]interface{},
|
||||||
|
mname string) {
|
||||||
|
for k, v := range values {
|
||||||
|
if v != nil {
|
||||||
|
fields[mname+"_"+k] = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseJmxMetricRequest(mbean string) map[string]string {
|
||||||
|
tokens := make(map[string]string)
|
||||||
|
classAndPairs := strings.Split(mbean, ":")
|
||||||
|
if classAndPairs[0] == "metrics" {
|
||||||
|
tokens["class"] = "spark_jolokia_metrics"
|
||||||
|
} else if classAndPairs[0] == "java.lang" {
|
||||||
|
tokens["class"] = "java"
|
||||||
|
} else {
|
||||||
|
return tokens
|
||||||
|
}
|
||||||
|
|
||||||
|
pair := strings.Split(classAndPairs[1], "=")
|
||||||
|
tokens[pair[0]] = pair[1]
|
||||||
|
|
||||||
|
return tokens
|
||||||
|
}
|
||||||
|
|
||||||
|
func addTokensToTags(tokens map[string]string, tags map[string]string) {
|
||||||
|
for k, v := range tokens {
|
||||||
|
if k == "name" {
|
||||||
|
tags["mname"] = v // name seems to a reserved word in influxdb
|
||||||
|
} else if k == "class" || k == "type" {
|
||||||
|
continue // class and type are used in the metric name
|
||||||
|
} else {
|
||||||
|
tags[k] = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func addJavaMetric(class string, c *javaMetric,
|
||||||
|
values map[string]interface{}) {
|
||||||
|
|
||||||
|
tags := make(map[string]string)
|
||||||
|
fields := make(map[string]interface{})
|
||||||
|
tags["spark_host"] = c.host
|
||||||
|
tags["spark_class"] = class
|
||||||
|
|
||||||
|
if class == "spark_threading" {
|
||||||
|
list := []string{"PeakThreadCount", "CurrentThreadCpuTime", "DaemonThreadCount", "TotalStartedThreadCount", "CurrentThreadUserTime", "ThreadCount"}
|
||||||
|
for _, value := range list {
|
||||||
|
if values[value] != nil {
|
||||||
|
fields[value] = values[value]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
for k, v := range values {
|
||||||
|
if v != nil {
|
||||||
|
fields[k] = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
c.acc.AddFields(class, fields, tags)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (j *javaMetric) addTagsFields(out map[string]interface{}) {
|
||||||
|
request := out["request"].(map[string]interface{})
|
||||||
|
var mbean = request["mbean"].(string)
|
||||||
|
var mbeansplit = strings.Split(mbean, "=")
|
||||||
|
var class = mbeansplit[1]
|
||||||
|
|
||||||
|
if valuesMap, ok := out["value"]; ok {
|
||||||
|
if class == "Memory" {
|
||||||
|
addJavaMetric("spark_heap_memory_usage", j, valuesMap.(map[string]interface{}))
|
||||||
|
} else if class == "Threading" {
|
||||||
|
addJavaMetric("spark_threading", j, valuesMap.(map[string]interface{}))
|
||||||
|
} else {
|
||||||
|
fmt.Printf("Missing key in '%s' output response\n%v\n",
|
||||||
|
j.metric, out)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func addSparkMetric(mbean string, c *sparkMetric,
|
||||||
|
values map[string]interface{}) {
|
||||||
|
|
||||||
|
tags := make(map[string]string)
|
||||||
|
fields := make(map[string]interface{})
|
||||||
|
|
||||||
|
tokens := parseJmxMetricRequest(mbean)
|
||||||
|
addTokensToTags(tokens, tags)
|
||||||
|
tags["spark_host"] = c.host
|
||||||
|
|
||||||
|
addValuesAsFields(values, fields, tags["mname"])
|
||||||
|
c.acc.AddFields(tokens["class"]+tokens["type"], fields, tags)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *sparkMetric) addTagsFields(out map[string]interface{}) {
|
||||||
|
if valuesMap, ok := out["value"]; ok {
|
||||||
|
for k, v := range valuesMap.(map[string]interface{}) {
|
||||||
|
addSparkMetric(k, c, v.(map[string]interface{}))
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
fmt.Printf("Missing key 'value' in '%s' output response\n%v\n",
|
||||||
|
c.metric, out)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func addYarnMetric(c *yarnMetric, value map[string]interface{}, metrictype string) {
|
||||||
|
|
||||||
|
tags := make(map[string]string)
|
||||||
|
fields := make(map[string]interface{})
|
||||||
|
tags["yarn_host"] = c.host
|
||||||
|
for key, val := range value {
|
||||||
|
fields[key] = val
|
||||||
|
}
|
||||||
|
c.acc.AddFields(metrictype, fields, tags)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *yarnMetric) addTagsFields(out map[string]interface{}) {
|
||||||
|
|
||||||
|
if valuesMap, ok := out["clusterMetrics"]; ok {
|
||||||
|
addYarnMetric(c, valuesMap.(map[string]interface{}), "spark_cluster_metrics")
|
||||||
|
} else if valuesMap, ok := out["clusterInfo"]; ok {
|
||||||
|
addYarnMetric(c, valuesMap.(map[string]interface{}), "spark_cluster_info")
|
||||||
|
} else if valuesMap, ok := out["apps"]; ok {
|
||||||
|
for _, value := range valuesMap.(map[string]interface{}) {
|
||||||
|
for _, vv := range value.([]interface{}) {
|
||||||
|
addYarnMetric(c, vv.(map[string]interface{}), "spark_apps")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if valuesMap, ok := out["nodes"]; ok {
|
||||||
|
for _, value := range valuesMap.(map[string]interface{}) {
|
||||||
|
for _, vv := range value.([]interface{}) {
|
||||||
|
addYarnMetric(c, vv.(map[string]interface{}), "spark_nodes")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
fmt.Printf("Missing the required key in output response\n%v\n", out)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (j *Spark) SampleConfig() string {
|
||||||
|
return `
|
||||||
|
## Spark servers exposing jolokia read service
|
||||||
|
#spark_servers = ["127.0.0.1:8778"] #optional
|
||||||
|
## Server running Yarn Resource Manager
|
||||||
|
#yarn_server = "127.0.0.1:8088" #optional
|
||||||
|
`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (j *Spark) Description() string {
|
||||||
|
return "Read Spark metrics through Jolokia and Yarn"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (j *Spark) getAttr(requestUrl *url.URL) (map[string]interface{}, error) {
|
||||||
|
// Create + send request
|
||||||
|
req, err := http.NewRequest("GET", requestUrl.String(), nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := j.jClient.MakeRequest(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
// Process response
|
||||||
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
err = fmt.Errorf("Response from url \"%s\" has status code %d (%s), expected %d (%s)",
|
||||||
|
requestUrl,
|
||||||
|
resp.StatusCode,
|
||||||
|
http.StatusText(resp.StatusCode),
|
||||||
|
http.StatusOK,
|
||||||
|
http.StatusText(http.StatusOK))
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// read body
|
||||||
|
body, err := ioutil.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
// Unmarshal json
|
||||||
|
var jsonOut map[string]interface{}
|
||||||
|
if err = json.Unmarshal([]byte(body), &jsonOut); err != nil {
|
||||||
|
return nil, errors.New("Error decoding JSON response")
|
||||||
|
}
|
||||||
|
|
||||||
|
return jsonOut, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (j *Yarn) getAttr(requestUrl *url.URL) (map[string]interface{}, error) {
|
||||||
|
req, err := http.NewRequest("GET", requestUrl.String(), nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := j.yClient.MakeRequest(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
// Process response
|
||||||
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
err = fmt.Errorf("Response from url \"%s\" has status code %d (%s), expected %d (%s)",
|
||||||
|
requestUrl,
|
||||||
|
resp.StatusCode,
|
||||||
|
http.StatusText(resp.StatusCode),
|
||||||
|
http.StatusOK,
|
||||||
|
http.StatusText(http.StatusOK))
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// read body
|
||||||
|
body, err := ioutil.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unmarshal json
|
||||||
|
var jsonOut map[string]interface{}
|
||||||
|
if err = json.Unmarshal([]byte(body), &jsonOut); err != nil {
|
||||||
|
return nil, errors.New("Error decoding JSON response")
|
||||||
|
}
|
||||||
|
|
||||||
|
return jsonOut, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseServerTokens(server string) map[string]string {
|
||||||
|
serverTokens := make(map[string]string)
|
||||||
|
log.Printf("Parsing %s", server)
|
||||||
|
hostAndUser := strings.Split(server, "@")
|
||||||
|
hostPort := ""
|
||||||
|
|
||||||
|
if len(hostAndUser) == 1 {
|
||||||
|
hostPort = hostAndUser[0]
|
||||||
|
} else {
|
||||||
|
log.Printf("Unsupported Server info, skipping")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
log.Printf("%s \n", hostPort)
|
||||||
|
hostTokens := strings.Split(hostPort, ":")
|
||||||
|
serverTokens["host"] = hostTokens[0]
|
||||||
|
serverTokens["port"] = hostTokens[1]
|
||||||
|
return serverTokens
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Spark) GatherJolokia(acc telegraf.Accumulator, wg *sync.WaitGroup) error {
|
||||||
|
context := "/jolokia/read"
|
||||||
|
servers := c.spark_servers
|
||||||
|
metrics := [...]string{"/metrics:*", "/java.lang:type=Memory/HeapMemoryUsage", "/java.lang:type=Threading"}
|
||||||
|
if len(servers) == 0 {
|
||||||
|
wg.Done()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
for _, server := range servers {
|
||||||
|
for _, metric := range metrics {
|
||||||
|
serverTokens := parseServerTokens(server)
|
||||||
|
var m jmxMetric
|
||||||
|
if strings.HasPrefix(metric, "/java.lang:") {
|
||||||
|
m = newJavaMetric(serverTokens["host"], metric, acc)
|
||||||
|
} else if strings.HasPrefix(metric, "/metrics:") {
|
||||||
|
m = newSparkMetric(serverTokens["host"], metric, acc)
|
||||||
|
} else {
|
||||||
|
log.Printf("Unsupported Spark metric [%s], skipping",
|
||||||
|
metric)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
requestUrl, err := url.Parse("http://" + serverTokens["host"] + ":" +
|
||||||
|
serverTokens["port"] + context + metric)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
out, err := c.getAttr(requestUrl)
|
||||||
|
if len(out) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
m.addTagsFields(out)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Done()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Yarn) GatherYarn(acc telegraf.Accumulator, wg *sync.WaitGroup) error {
|
||||||
|
contexts := [...]string{"/ws/v1/cluster", "/ws/v1/cluster/metrics", "/ws/v1/cluster/apps", "/ws/v1/cluster/nodes"}
|
||||||
|
server := c.serverAddress
|
||||||
|
|
||||||
|
if server == "" {
|
||||||
|
wg.Done()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
serverTokens := parseServerTokens(server)
|
||||||
|
for _, context := range contexts {
|
||||||
|
var m = newYarnMetric(server, acc)
|
||||||
|
requestUrl, err := url.Parse("http://" + serverTokens["host"] + ":" + serverTokens["port"] + context)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
out, err := c.getAttr(requestUrl)
|
||||||
|
if len(out) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
m.addTagsFields(out)
|
||||||
|
|
||||||
|
}
|
||||||
|
wg.Done()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Spark) Gather(acc telegraf.Accumulator) error {
|
||||||
|
|
||||||
|
log.Println("Config is ", c)
|
||||||
|
yarn := Yarn{
|
||||||
|
yClient: &YarnClientImpl{client: &http.Client{}},
|
||||||
|
serverAddress: c.yarn_server,
|
||||||
|
}
|
||||||
|
wg := sync.WaitGroup{}
|
||||||
|
wg.Add(1)
|
||||||
|
go yarn.GatherYarn(acc, &wg)
|
||||||
|
spark := Spark{
|
||||||
|
jClient: &JolokiaClientImpl{client: &http.Client{}},
|
||||||
|
spark_servers: c.spark_servers,
|
||||||
|
}
|
||||||
|
wg.Add(1)
|
||||||
|
go spark.GatherJolokia(acc, &wg)
|
||||||
|
wg.Wait()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
inputs.Add("spark", func() telegraf.Input {
|
||||||
|
return &Spark{jClient: &JolokiaClientImpl{client: &http.Client{}}}
|
||||||
|
})
|
||||||
|
}
|
|
@ -0,0 +1,23 @@
|
||||||
|
package spark
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf/testutil"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestSparkMeasurements(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("Skipping integration test in short mode")
|
||||||
|
}
|
||||||
|
|
||||||
|
a := &Spark{
|
||||||
|
yarn_server: testutil.GetLocalHost() + ":8088",
|
||||||
|
}
|
||||||
|
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
|
||||||
|
err := a.Gather(&acc)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
Loading…
Reference in New Issue