Compare commits


11 Commits

Author SHA1 Message Date
shubhamDX
aa2d76afb6 Updating the config file description 2018-02-06 11:51:51 -08:00
shubhamDX
d193a9416d normalizing metric names to fit better with snakecase 2018-02-06 11:51:51 -08:00
shubhamDX
dcf81d7dfe Updating value receiver to pointer receiver 2018-02-06 11:51:51 -08:00
Shubham Srivastava
dd75a883ee Edited variable names 2018-02-06 11:51:51 -08:00
shubhamDX
42fd21c19d Cleaning the code and correcting the Makefile 2018-02-06 11:51:51 -08:00
Shubham Srivastava
f4255d331f Updating 2018-02-06 11:50:22 -08:00
Shubham Srivastava
68c453c355 Updating 2018-02-06 11:50:22 -08:00
shubhamDX
7069ef46b2 Formatting the source code 2018-02-06 11:50:22 -08:00
shubhamDX
44034d1b73 Updating CHANGELOG.md 2018-02-06 11:50:22 -08:00
Shubham Srivastava
a4cf3eb98b Updating README.md 2018-02-06 11:50:22 -08:00
shubhamDX
99f494f0d8 Adding spark plugin 2018-02-06 11:50:22 -08:00
15 changed files with 558 additions and 477 deletions

View File

@@ -42,7 +42,7 @@ workflows:
- 'build'
triggers:
- schedule:
cron: "0 18 * * *"
cron: "0 0 * * *"
filters:
branches:
only:

View File

@@ -56,7 +56,6 @@
- [#3618](https://github.com/influxdata/telegraf/pull/3618): Add new sqlserver output data model.
- [#3559](https://github.com/influxdata/telegraf/pull/3559): Add native Go method for finding pids to procstat.
- [#3722](https://github.com/influxdata/telegraf/pull/3722): Add additional metrics and reverse metric names option to openldap.
- [#3769](https://github.com/influxdata/telegraf/pull/3769): Add TLS support to the mesos input plugin.
### Bugfixes
@@ -817,6 +816,7 @@ consistent with the behavior of `collection_jitter`.
- [#1543](https://github.com/influxdata/telegraf/pull/1543): Official Windows service.
- [#1414](https://github.com/influxdata/telegraf/pull/1414): Forking sensors command to remove C package dependency.
- [#1389](https://github.com/influxdata/telegraf/pull/1389): Add a new SNMP plugin.
- [#1611](https://github.com/influxdata/telegraf/pull/1611): Adding Spark Plugin
### Bugfixes

View File

@@ -88,6 +88,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/snmp_legacy"
_ "github.com/influxdata/telegraf/plugins/inputs/socket_listener"
_ "github.com/influxdata/telegraf/plugins/inputs/solr"
_ "github.com/influxdata/telegraf/plugins/inputs/spark"
_ "github.com/influxdata/telegraf/plugins/inputs/sqlserver"
_ "github.com/influxdata/telegraf/plugins/inputs/statsd"
_ "github.com/influxdata/telegraf/plugins/inputs/sysstat"

View File

@@ -11,7 +11,7 @@ For more information, please check the [Mesos Observability Metrics](http://meso
## Timeout, in ms.
timeout = 100
## A list of Mesos masters.
masters = ["http://localhost:5050"]
masters = ["localhost:5050"]
## Master metrics groups to be collected, by default, all enabled.
master_collections = [
"resources",
@@ -35,13 +35,6 @@ For more information, please check the [Mesos Observability Metrics](http://meso
# "tasks",
# "messages",
# ]
## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
## Use SSL but skip chain & host verification
# insecure_skip_verify = false
```
By default this plugin is not configured to gather metrics from mesos. Since a mesos cluster can be deployed in numerous ways it does not provide any default
@@ -242,8 +235,7 @@ Mesos slave metric groups
### Tags:
- All master/slave measurements have the following tags:
- server (network location of server: `host:port`)
- url (URL origin of server: `scheme://host:port`)
- server
- role (master/slave)
- All master measurements have the extra tags:

View File

@@ -7,14 +7,11 @@ import (
"log"
"net"
"net/http"
"net/url"
"strconv"
"strings"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
jsonparser "github.com/influxdata/telegraf/plugins/parsers/json"
)
@@ -33,20 +30,6 @@ type Mesos struct {
Slaves []string
SlaveCols []string `toml:"slave_collections"`
//SlaveTasks bool
// Path to CA file
SSLCA string `toml:"ssl_ca"`
// Path to host cert file
SSLCert string `toml:"ssl_cert"`
// Path to cert key file
SSLKey string `toml:"ssl_key"`
// Use SSL but skip chain & host verification
InsecureSkipVerify bool
initialized bool
client *http.Client
masterURLs []*url.URL
slaveURLs []*url.URL
}
var allMetrics = map[Role][]string{
@@ -58,7 +41,7 @@ var sampleConfig = `
## Timeout, in ms.
timeout = 100
## A list of Mesos masters.
masters = ["http://localhost:5050"]
masters = ["localhost:5050"]
## Master metrics groups to be collected, by default, all enabled.
master_collections = [
"resources",
@@ -82,13 +65,6 @@ var sampleConfig = `
# "tasks",
# "messages",
# ]
## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
## Use SSL but skip chain & host verification
# insecure_skip_verify = false
`
// SampleConfig returns a sample configuration block
@@ -101,28 +77,7 @@ func (m *Mesos) Description() string {
return "Telegraf plugin for gathering metrics from N Mesos masters"
}
func parseURL(s string, role Role) (*url.URL, error) {
if !strings.HasPrefix(s, "http://") && !strings.HasPrefix(s, "https://") {
host, port, err := net.SplitHostPort(s)
// no port specified
if err != nil {
host = s
switch role {
case MASTER:
port = "5050"
case SLAVE:
port = "5051"
}
}
s = "http://" + host + ":" + port
log.Printf("W! [inputs.mesos] Using %q as connection URL; please update your configuration to use an URL", s)
}
return url.Parse(s)
}
func (m *Mesos) initialize() error {
func (m *Mesos) SetDefaults() {
if len(m.MasterCols) == 0 {
m.MasterCols = allMetrics[MASTER]
}
@@ -132,71 +87,33 @@ func (m *Mesos) initialize() error {
}
if m.Timeout == 0 {
log.Println("I! [inputs.mesos] Missing timeout value, setting default value (100ms)")
log.Println("I! [mesos] Missing timeout value, setting default value (100ms)")
m.Timeout = 100
}
rawQuery := "timeout=" + strconv.Itoa(m.Timeout) + "ms"
m.masterURLs = make([]*url.URL, 0, len(m.Masters))
for _, master := range m.Masters {
u, err := parseURL(master, MASTER)
if err != nil {
return err
}
u.RawQuery = rawQuery
m.masterURLs = append(m.masterURLs, u)
}
m.slaveURLs = make([]*url.URL, 0, len(m.Slaves))
for _, slave := range m.Slaves {
u, err := parseURL(slave, SLAVE)
if err != nil {
return err
}
u.RawQuery = rawQuery
m.slaveURLs = append(m.slaveURLs, u)
}
client, err := m.createHttpClient()
if err != nil {
return err
}
m.client = client
return nil
}
// Gather() metrics from given list of Mesos Masters
func (m *Mesos) Gather(acc telegraf.Accumulator) error {
if !m.initialized {
err := m.initialize()
if err != nil {
return err
}
m.initialized = true
}
var wg sync.WaitGroup
for _, master := range m.masterURLs {
m.SetDefaults()
for _, v := range m.Masters {
wg.Add(1)
go func(master *url.URL) {
acc.AddError(m.gatherMainMetrics(master, MASTER, acc))
go func(c string) {
acc.AddError(m.gatherMainMetrics(c, ":5050", MASTER, acc))
wg.Done()
return
}(master)
}(v)
}
for _, slave := range m.slaveURLs {
for _, v := range m.Slaves {
wg.Add(1)
go func(slave *url.URL) {
acc.AddError(m.gatherMainMetrics(slave, SLAVE, acc))
go func(c string) {
acc.AddError(m.gatherMainMetrics(c, ":5051", SLAVE, acc))
wg.Done()
return
}(slave)
}(v)
// if !m.SlaveTasks {
// continue
@@ -204,7 +121,7 @@ func (m *Mesos) Gather(acc telegraf.Accumulator) error {
// wg.Add(1)
// go func(c string) {
// acc.AddError(m.gatherSlaveTaskMetrics(slave, acc))
// acc.AddError(m.gatherSlaveTaskMetrics(c, ":5051", acc))
// wg.Done()
// return
// }(v)
@@ -215,24 +132,6 @@ func (m *Mesos) Gather(acc telegraf.Accumulator) error {
return nil
}
func (m *Mesos) createHttpClient() (*http.Client, error) {
tlsCfg, err := internal.GetTLSConfig(
m.SSLCert, m.SSLKey, m.SSLCA, m.InsecureSkipVerify)
if err != nil {
return nil, err
}
client := &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
TLSClientConfig: tlsCfg,
},
Timeout: 4 * time.Second,
}
return client, nil
}
// metricsDiff() returns set names for removal
func metricsDiff(role Role, w []string) []string {
b := []string{}
@@ -494,6 +393,15 @@ func (m *Mesos) filterMetrics(role Role, metrics *map[string]interface{}) {
}
}
var tr = &http.Transport{
ResponseHeaderTimeout: time.Duration(3 * time.Second),
}
var client = &http.Client{
Transport: tr,
Timeout: time.Duration(4 * time.Second),
}
// TaskStats struct for JSON API output /monitor/statistics
type TaskStats struct {
ExecutorID string `json:"executor_id"`
@@ -501,15 +409,22 @@ type TaskStats struct {
Statistics map[string]interface{} `json:"statistics"`
}
func (m *Mesos) gatherSlaveTaskMetrics(u *url.URL, acc telegraf.Accumulator) error {
func (m *Mesos) gatherSlaveTaskMetrics(address string, defaultPort string, acc telegraf.Accumulator) error {
var metrics []TaskStats
tags := map[string]string{
"server": u.Hostname(),
"url": urlTag(u),
host, _, err := net.SplitHostPort(address)
if err != nil {
host = address
address = address + defaultPort
}
resp, err := m.client.Get(withPath(u, "/monitor/statistics").String())
tags := map[string]string{
"server": host,
}
ts := strconv.Itoa(m.Timeout) + "ms"
resp, err := client.Get("http://" + address + "/monitor/statistics?timeout=" + ts)
if err != nil {
return err
@@ -544,31 +459,24 @@ func (m *Mesos) gatherSlaveTaskMetrics(u *url.URL, acc telegraf.Accumulator) err
return nil
}
func withPath(u *url.URL, path string) *url.URL {
c := *u
c.Path = path
return &c
}
func urlTag(u *url.URL) string {
c := *u
c.Path = ""
c.User = nil
c.RawQuery = ""
return c.String()
}
// This should not belong to the object
func (m *Mesos) gatherMainMetrics(u *url.URL, role Role, acc telegraf.Accumulator) error {
func (m *Mesos) gatherMainMetrics(a string, defaultPort string, role Role, acc telegraf.Accumulator) error {
var jsonOut map[string]interface{}
host, _, err := net.SplitHostPort(a)
if err != nil {
host = a
a = a + defaultPort
}
tags := map[string]string{
"server": u.Hostname(),
"url": urlTag(u),
"server": host,
"role": string(role),
}
resp, err := m.client.Get(withPath(u, "/metrics/snapshot").String())
ts := strconv.Itoa(m.Timeout) + "ms"
resp, err := client.Get("http://" + a + "/metrics/snapshot?timeout=" + ts)
if err != nil {
return err

View File

@@ -6,12 +6,10 @@ import (
"math/rand"
"net/http"
"net/http/httptest"
"net/url"
"os"
"testing"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
var masterMetrics map[string]interface{}
@@ -380,19 +378,3 @@ func TestSlaveFilter(t *testing.T) {
}
}
}
func TestWithPathDoesNotModify(t *testing.T) {
u, err := url.Parse("http://localhost:5051")
require.NoError(t, err)
v := withPath(u, "/xyzzy")
require.Equal(t, u.String(), "http://localhost:5051")
require.Equal(t, v.String(), "http://localhost:5051/xyzzy")
}
func TestURLTagDoesNotModify(t *testing.T) {
u, err := url.Parse("http://a:b@localhost:5051?timeout=1ms")
require.NoError(t, err)
v := urlTag(u)
require.Equal(t, u.String(), "http://a:b@localhost:5051?timeout=1ms")
require.Equal(t, v, "http://localhost:5051")
}

View File

@@ -0,0 +1,47 @@
# Telegraf plugin: Spark
#### Plugin arguments:
- **spark_servers** []string: List of Spark nodes in the format ["host:port"] (optional)
- **yarn_server** string: Address of the Yarn resource manager (optional)
#### Description
The Spark plugin collects metrics in two ways, both optional:
- Spark JVM metrics exposed as MBean attributes through the Jolokia REST endpoint. Metrics are collected for each configured server; see the sketch after this list. For more on Jolokia, see https://jolokia.org/
- Spark application metrics, when applications are managed by the Yarn resource manager. The plugin collects these through the Yarn API; application measurements are produced only if at least one Spark job has been submitted.
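For illustration, here is a minimal sketch of the kind of Jolokia read the plugin performs, assuming a Jolokia agent listening on 127.0.0.1:8778; the host, port, and MBean path are placeholders:

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

func main() {
	// Jolokia exposes MBeans over HTTP; the plugin builds requests of the
	// form http://<host>:<port>/jolokia/read/<mbean pattern>.
	url := "http://127.0.0.1:8778/jolokia/read/java.lang:type=Memory/HeapMemoryUsage"

	resp, err := http.Get(url)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var out map[string]interface{}
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		panic(err)
	}

	// A successful read returns the attribute under the "value" key,
	// e.g. the heap's init/used/committed/max figures.
	fmt.Println(out["value"])
}
```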
# Measurements:
The Spark plugin produces one or more measurements according to the spark_servers and yarn_server values provided.
Given a configuration like:
```toml
[[inputs.spark]]
spark_servers = ["127.0.0.1:8778"]
yarn_server = "127.0.0.1:8088"
```
At most, the collected measurements will be:
```
spark_HeapMemoryUsage, spark_Threading, spark_apps, spark_clusterInfo, spark_clusterMetrics, spark_jolokiaMetrics, spark_jvmMetrics, spark_nodes
```
# Useful Metrics:
Here is a list of metrics that are fetched and may be useful for monitoring your Spark cluster.
#### Measurement domains collected through Jolokia
- "/metrics:*"
- "/java.lang:type=Memory/HeapMemoryUsage"
- "/java.lang:type=Threading"
#### Measurement domains collected through YarnServer
- "/cluster"
- "/cluster/metrics"
- "/cluster/apps"
- "/cluster/nodes"

View File

@@ -0,0 +1,437 @@
package spark
import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"log"
"net/http"
"net/url"
"strings"
"sync"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)
type JolokiaClient interface {
MakeRequest(req *http.Request) (*http.Response, error)
}
type JolokiaClientImpl struct {
client *http.Client
}
func (c JolokiaClientImpl) MakeRequest(req *http.Request) (*http.Response, error) {
return c.client.Do(req)
}
type YarnClient interface {
MakeRequest(req *http.Request) (*http.Response, error)
}
type YarnClientImpl struct {
client *http.Client
}
func (c YarnClientImpl) MakeRequest(req *http.Request) (*http.Response, error) {
return c.client.Do(req)
}
type Spark struct {
jClient JolokiaClient
spark_servers []string
yarn_server string
}
type javaMetric struct {
host string
metric string
acc telegraf.Accumulator
}
type sparkMetric struct {
host string
metric string
acc telegraf.Accumulator
}
type Yarn struct {
yClient YarnClient
serverAddress string
}
type yarnMetric struct {
host string
acc telegraf.Accumulator
}
type jmxMetric interface {
addTagsFields(out map[string]interface{})
}
func newJavaMetric(host string, metric string,
acc telegraf.Accumulator) *javaMetric {
return &javaMetric{host: host, metric: metric, acc: acc}
}
func newSparkMetric(host string, metric string,
acc telegraf.Accumulator) *sparkMetric {
return &sparkMetric{host: host, metric: metric, acc: acc}
}
func newYarnMetric(host string, acc telegraf.Accumulator) *yarnMetric {
return &yarnMetric{host: host, acc: acc}
}
func addValuesAsFields(values map[string]interface{}, fields map[string]interface{},
mname string) {
for k, v := range values {
if v != nil {
fields[mname+"_"+k] = v
}
}
}
func parseJmxMetricRequest(mbean string) map[string]string {
tokens := make(map[string]string)
classAndPairs := strings.Split(mbean, ":")
if classAndPairs[0] == "metrics" {
tokens["class"] = "spark_jolokia_metrics"
} else if classAndPairs[0] == "java.lang" {
tokens["class"] = "java"
} else {
return tokens
}
pair := strings.Split(classAndPairs[1], "=")
tokens[pair[0]] = pair[1]
return tokens
}
func addTokensToTags(tokens map[string]string, tags map[string]string) {
for k, v := range tokens {
if k == "name" {
tags["mname"] = v // name seems to a reserved word in influxdb
} else if k == "class" || k == "type" {
continue // class and type are used in the metric name
} else {
tags[k] = v
}
}
}
func addJavaMetric(class string, c *javaMetric,
values map[string]interface{}) {
tags := make(map[string]string)
fields := make(map[string]interface{})
tags["spark_host"] = c.host
tags["spark_class"] = class
if class == "spark_threading" {
list := []string{"PeakThreadCount", "CurrentThreadCpuTime", "DaemonThreadCount", "TotalStartedThreadCount", "CurrentThreadUserTime", "ThreadCount"}
for _, value := range list {
if values[value] != nil {
fields[value] = values[value]
}
}
} else {
for k, v := range values {
if v != nil {
fields[k] = v
}
}
}
c.acc.AddFields(class, fields, tags)
}
func (j *javaMetric) addTagsFields(out map[string]interface{}) {
request := out["request"].(map[string]interface{})
var mbean = request["mbean"].(string)
var mbeansplit = strings.Split(mbean, "=")
var class = mbeansplit[1]
if valuesMap, ok := out["value"]; ok {
if class == "Memory" {
addJavaMetric("spark_heap_memory_usage", j, valuesMap.(map[string]interface{}))
} else if class == "Threading" {
addJavaMetric("spark_threading", j, valuesMap.(map[string]interface{}))
} else {
fmt.Printf("Missing key in '%s' output response\n%v\n",
j.metric, out)
return
}
}
}
func addSparkMetric(mbean string, c *sparkMetric,
values map[string]interface{}) {
tags := make(map[string]string)
fields := make(map[string]interface{})
tokens := parseJmxMetricRequest(mbean)
addTokensToTags(tokens, tags)
tags["spark_host"] = c.host
addValuesAsFields(values, fields, tags["mname"])
c.acc.AddFields(tokens["class"]+tokens["type"], fields, tags)
}
func (c *sparkMetric) addTagsFields(out map[string]interface{}) {
if valuesMap, ok := out["value"]; ok {
for k, v := range valuesMap.(map[string]interface{}) {
addSparkMetric(k, c, v.(map[string]interface{}))
}
} else {
fmt.Printf("Missing key 'value' in '%s' output response\n%v\n",
c.metric, out)
return
}
}
func addYarnMetric(c *yarnMetric, value map[string]interface{}, metrictype string) {
tags := make(map[string]string)
fields := make(map[string]interface{})
tags["yarn_host"] = c.host
for key, val := range value {
fields[key] = val
}
c.acc.AddFields(metrictype, fields, tags)
}
func (c *yarnMetric) addTagsFields(out map[string]interface{}) {
if valuesMap, ok := out["clusterMetrics"]; ok {
addYarnMetric(c, valuesMap.(map[string]interface{}), "spark_cluster_metrics")
} else if valuesMap, ok := out["clusterInfo"]; ok {
addYarnMetric(c, valuesMap.(map[string]interface{}), "spark_cluster_info")
} else if valuesMap, ok := out["apps"]; ok {
for _, value := range valuesMap.(map[string]interface{}) {
for _, vv := range value.([]interface{}) {
addYarnMetric(c, vv.(map[string]interface{}), "spark_apps")
}
}
} else if valuesMap, ok := out["nodes"]; ok {
for _, value := range valuesMap.(map[string]interface{}) {
for _, vv := range value.([]interface{}) {
addYarnMetric(c, vv.(map[string]interface{}), "spark_nodes")
}
}
} else {
fmt.Printf("Missing the required key in output response\n%v\n", out)
return
}
}
func (j *Spark) SampleConfig() string {
return `
## Spark servers exposing jolokia read service
#spark_servers = ["127.0.0.1:8778"] #optional
## Server running Yarn Resource Manager
#yarn_server = "127.0.0.1:8088" #optional
`
}
func (j *Spark) Description() string {
return "Read Spark metrics through Jolokia and Yarn"
}
func (j *Spark) getAttr(requestUrl *url.URL) (map[string]interface{}, error) {
// Create + send request
req, err := http.NewRequest("GET", requestUrl.String(), nil)
if err != nil {
return nil, err
}
resp, err := j.jClient.MakeRequest(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
// Process response
if resp.StatusCode != http.StatusOK {
err = fmt.Errorf("Response from url \"%s\" has status code %d (%s), expected %d (%s)",
requestUrl,
resp.StatusCode,
http.StatusText(resp.StatusCode),
http.StatusOK,
http.StatusText(http.StatusOK))
return nil, err
}
// read body
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
// Unmarshal json
var jsonOut map[string]interface{}
if err = json.Unmarshal([]byte(body), &jsonOut); err != nil {
return nil, errors.New("Error decoding JSON response")
}
return jsonOut, nil
}
func (j *Yarn) getAttr(requestUrl *url.URL) (map[string]interface{}, error) {
req, err := http.NewRequest("GET", requestUrl.String(), nil)
if err != nil {
return nil, err
}
resp, err := j.yClient.MakeRequest(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
// Process response
if resp.StatusCode != http.StatusOK {
err = fmt.Errorf("Response from url \"%s\" has status code %d (%s), expected %d (%s)",
requestUrl,
resp.StatusCode,
http.StatusText(resp.StatusCode),
http.StatusOK,
http.StatusText(http.StatusOK))
return nil, err
}
// read body
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
// Unmarshal json
var jsonOut map[string]interface{}
if err = json.Unmarshal([]byte(body), &jsonOut); err != nil {
return nil, errors.New("Error decoding JSON response")
}
return jsonOut, nil
}
func parseServerTokens(server string) map[string]string {
serverTokens := make(map[string]string)
log.Printf("Parsing %s", server)
hostAndUser := strings.Split(server, "@")
hostPort := ""
if len(hostAndUser) == 1 {
hostPort = hostAndUser[0]
} else {
log.Printf("Unsupported Server info, skipping")
return nil
}
log.Printf("%s \n", hostPort)
hostTokens := strings.Split(hostPort, ":")
serverTokens["host"] = hostTokens[0]
serverTokens["port"] = hostTokens[1]
return serverTokens
}
func (c *Spark) GatherJolokia(acc telegraf.Accumulator, wg *sync.WaitGroup) error {
context := "/jolokia/read"
servers := c.spark_servers
metrics := [...]string{"/metrics:*", "/java.lang:type=Memory/HeapMemoryUsage", "/java.lang:type=Threading"}
if len(servers) == 0 {
wg.Done()
return nil
}
for _, server := range servers {
for _, metric := range metrics {
serverTokens := parseServerTokens(server)
var m jmxMetric
if strings.HasPrefix(metric, "/java.lang:") {
m = newJavaMetric(serverTokens["host"], metric, acc)
} else if strings.HasPrefix(metric, "/metrics:") {
m = newSparkMetric(serverTokens["host"], metric, acc)
} else {
log.Printf("Unsupported Spark metric [%s], skipping",
metric)
continue
}
requestUrl, err := url.Parse("http://" + serverTokens["host"] + ":" +
serverTokens["port"] + context + metric)
if err != nil {
return err
}
out, err := c.getAttr(requestUrl)
if len(out) == 0 {
continue
}
m.addTagsFields(out)
}
}
wg.Done()
return nil
}
func (c *Yarn) GatherYarn(acc telegraf.Accumulator, wg *sync.WaitGroup) error {
contexts := [...]string{"/ws/v1/cluster", "/ws/v1/cluster/metrics", "/ws/v1/cluster/apps", "/ws/v1/cluster/nodes"}
server := c.serverAddress
if server == "" {
wg.Done()
return nil
}
serverTokens := parseServerTokens(server)
for _, context := range contexts {
var m = newYarnMetric(server, acc)
requestUrl, err := url.Parse("http://" + serverTokens["host"] + ":" + serverTokens["port"] + context)
if err != nil {
return err
}
out, err := c.getAttr(requestUrl)
if len(out) == 0 {
continue
}
m.addTagsFields(out)
}
wg.Done()
return nil
}
func (c *Spark) Gather(acc telegraf.Accumulator) error {
log.Println("Config is ", c)
yarn := Yarn{
yClient: &YarnClientImpl{client: &http.Client{}},
serverAddress: c.yarn_server,
}
wg := sync.WaitGroup{}
wg.Add(1)
go yarn.GatherYarn(acc, &wg)
spark := Spark{
jClient: &JolokiaClientImpl{client: &http.Client{}},
spark_servers: c.spark_servers,
}
wg.Add(1)
go spark.GatherJolokia(acc, &wg)
wg.Wait()
return nil
}
func init() {
inputs.Add("spark", func() telegraf.Input {
return &Spark{jClient: &JolokiaClientImpl{client: &http.Client{}}}
})
}

View File

@@ -0,0 +1,23 @@
package spark
import (
"testing"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
func TestSparkMeasurements(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
a := &Spark{
yarn_server: testutil.GetLocalHost() + ":8088",
}
var acc testutil.Accumulator
err := a.Gather(&acc)
require.NoError(t, err)
}

View File

@@ -1,35 +0,0 @@
# Hugepages Input Plugin
The hugepages plugin gathers hugepages metrics, including per-NUMA-node statistics.
### Configuration:
```toml
# Description
[[inputs.hugepages]]
## Path to a NUMA nodes
# numa_node_path = "/sys/devices/system/node"
## Path to a meminfo file
# meminfo_path = "/proc/meminfo"
```
### Measurements & Fields:
- hugepages
- free (int, kB)
- nr (int, kB)
- HugePages_Total (int, kB)
- HugePages_Free (int, kB)
### Tags:
- hugepages has the following tags:
- node
### Example Output:
```
$ ./telegraf -config telegraf.conf -input-filter hugepages -test
> hugepages,host=maxpc,node=node0 free=0i,nr=0i 1467618621000000000
> hugepages,host=maxpc,name=meminfo HugePages_Free=0i,HugePages_Total=0i 1467618621000000000
```

View File

@@ -1,184 +0,0 @@
package system
import (
"bytes"
"fmt"
"io/ioutil"
"strconv"
"strings"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)
var (
newlineByte = []byte("\n")
colonByte = []byte(":")
kbPrecisionByte = []byte("kB")
)
// default file paths
const (
// the path where statistics are kept per NUMA nodes
NUMA_NODE_PATH = "/sys/devices/system/node"
// the path to the meminfo file which is produced by kernel
MEMINFO_PATH = "/proc/meminfo"
// hugepages stat field names on meminfo file
HUGE_PAGES_TOTAL = "HugePages_Total"
HUGE_PAGES_FREE = "HugePages_Free"
)
var hugepagesSampleConfig = `
## Path to a NUMA nodes
# numa_node_path = "/sys/devices/system/node"
## Path to a meminfo file
# meminfo_path = "/proc/meminfo"
`
// Hugepages holds the configurable paths used to gather hugepages statistics
type Hugepages struct {
NUMANodePath string `toml:"numa_node_path"`
MeminfoPath string `toml:"meminfo_path"`
}
func (mem *Hugepages) Description() string {
return "Collects hugepages metrics from kernel and per NUMA node"
}
func (mem *Hugepages) SampleConfig() string {
return hugepagesSampleConfig
}
func (mem *Hugepages) Gather(acc telegraf.Accumulator) error {
mem.loadPaths()
err := mem.GatherStatsPerNode(acc)
if err != nil {
return err
}
err = mem.GatherStatsFromMeminfo(acc)
if err != nil {
return err
}
return nil
}
// GatherHugepagesStatsPerNode collects hugepages stats per NUMA nodes
func (mem *Hugepages) GatherStatsPerNode(acc telegraf.Accumulator) error {
numaNodeMetrics, err := statsPerNUMA(mem.NUMANodePath)
if err != nil {
return err
}
for k, v := range numaNodeMetrics {
metrics := make(map[string]interface{})
tags := map[string]string{
"node": k,
}
metrics["free"] = v.Free
metrics["nr"] = v.NR
acc.AddFields("hugepages", metrics, tags)
}
return nil
}
// GatherHugepagesStatsFromMeminfo collects hugepages statistics from meminfo file
func (mem *Hugepages) GatherStatsFromMeminfo(acc telegraf.Accumulator) error {
tags := map[string]string{
"name": "meminfo",
}
metrics := make(map[string]interface{})
meminfoMetrics, err := statsFromMeminfo(mem.MeminfoPath)
if err != nil {
return err
}
for k, v := range meminfoMetrics {
metrics[k] = v
}
acc.AddFields("hugepages", metrics, tags)
return nil
}
type HugepagesNUMAStats struct {
Free int
NR int
}
// statsPerNUMA gathers hugepages statistics from each NUMA node
func statsPerNUMA(path string) (map[string]HugepagesNUMAStats, error) {
var hugepagesStats = make(map[string]HugepagesNUMAStats)
dirs, err := ioutil.ReadDir(path)
if err != nil {
return hugepagesStats, err
}
for _, d := range dirs {
if !(d.IsDir() && strings.HasPrefix(d.Name(), "node")) {
continue
}
hugepagesFree := fmt.Sprintf("%s/%s/hugepages/hugepages-2048kB/free_hugepages", path, d.Name())
hugepagesNR := fmt.Sprintf("%s/%s/hugepages/hugepages-2048kB/nr_hugepages", path, d.Name())
free, err := ioutil.ReadFile(hugepagesFree)
if err != nil {
return hugepagesStats, err
}
nr, err := ioutil.ReadFile(hugepagesNR)
if err != nil {
return hugepagesStats, err
}
f, _ := strconv.Atoi(string(bytes.TrimSuffix(free, newlineByte)))
n, _ := strconv.Atoi(string(bytes.TrimSuffix(nr, newlineByte)))
hugepagesStats[d.Name()] = HugepagesNUMAStats{Free: f, NR: n}
}
return hugepagesStats, nil
}
// statsFromMeminfo gathers hugepages statistics from kernel
func statsFromMeminfo(path string) (map[string]interface{}, error) {
stats := map[string]interface{}{}
meminfo, err := ioutil.ReadFile(path)
if err != nil {
return stats, err
}
lines := bytes.Split(meminfo, newlineByte)
for _, l := range lines {
if bytes.Contains(l, kbPrecisionByte) {
continue
}
fields := bytes.Fields(l)
if len(fields) < 2 {
continue
}
fieldName := string(bytes.TrimSuffix(fields[0], colonByte))
if fieldName == HUGE_PAGES_TOTAL || fieldName == HUGE_PAGES_FREE {
val, _ := strconv.Atoi(string(fields[1]))
stats[fieldName] = val
}
}
return stats, nil
}
// loadPaths can be used to read paths firstly from config
// if it is empty then try read from env variables
func (mem *Hugepages) loadPaths() {
if mem.NUMANodePath == "" {
mem.NUMANodePath = NUMA_NODE_PATH
}
if mem.MeminfoPath == "" {
mem.MeminfoPath = MEMINFO_PATH
}
}
func init() {
inputs.Add("hugepages", func() telegraf.Input {
return &Hugepages{}
})
}

View File

@@ -1,43 +0,0 @@
package system
import (
"testing"
"github.com/influxdata/telegraf/testutil"
)
var hugepages = Hugepages{
NUMANodePath: "./testdata/node",
MeminfoPath: "./testdata/meminfo",
}
func init() {
hugepages.loadPaths()
}
func TestHugepagesStatsFromMeminfo(t *testing.T) {
acc := &testutil.Accumulator{}
err := hugepages.GatherStatsFromMeminfo(acc)
if err != nil {
t.Fatal(err)
}
fields := map[string]interface{}{
"HugePages_Total": int(666),
"HugePages_Free": int(999),
}
acc.AssertContainsFields(t, "hugepages", fields)
}
func TestHugepagesStatsPerNode(t *testing.T) {
acc := &testutil.Accumulator{}
err := hugepages.GatherStatsPerNode(acc)
if err != nil {
t.Fatal(err)
}
fields := map[string]interface{}{
"free": int(123),
"nr": int(456),
}
acc.AssertContainsFields(t, "hugepages", fields)
}

View File

@@ -1,45 +0,0 @@
MemTotal: 5961204 kB
MemFree: 5201764 kB
MemAvailable: 5761624 kB
Buffers: 120248 kB
Cached: 419660 kB
SwapCached: 0 kB
Active: 370656 kB
Inactive: 247364 kB
Active(anon): 79032 kB
Inactive(anon): 552 kB
Active(file): 291624 kB
Inactive(file): 246812 kB
Unevictable: 0 kB
Mlocked: 0 kB
SwapTotal: 0 kB
SwapFree: 0 kB
Dirty: 4 kB
Writeback: 0 kB
AnonPages: 78164 kB
Mapped: 114164 kB
Shmem: 1452 kB
Slab: 77180 kB
SReclaimable: 57676 kB
SUnreclaim: 19504 kB
KernelStack: 2832 kB
PageTables: 8692 kB
NFS_Unstable: 0 kB
Bounce: 0 kB
WritebackTmp: 0 kB
CommitLimit: 2980600 kB
Committed_AS: 555492 kB
VmallocTotal: 34359738367 kB
VmallocUsed: 0 kB
VmallocChunk: 0 kB
HardwareCorrupted: 0 kB
AnonHugePages: 0 kB
CmaTotal: 0 kB
CmaFree: 0 kB
HugePages_Total: 666
HugePages_Free: 999
HugePages_Rsvd: 0
HugePages_Surp: 0
Hugepagesize: 2048 kB
DirectMap4k: 84224 kB
DirectMap2M: 6055936 kB