Compare commits

..

4 Commits

Author SHA1 Message Date
Russ Savage
af5017d3dc reviving code from #1447 to a new branch 2018-02-08 15:54:33 -08:00
Daniel Nelson
32dd1b3725 Adjust time of nightly build 2018-02-07 18:37:33 -08:00
Daniel Nelson
1b0e87a8b0 Update changelog 2018-02-07 18:37:32 -08:00
Daniel Nelson
efa9095829 Add TLS support to the mesos input plugin (#3769) 2018-02-07 18:36:38 -08:00
15 changed files with 481 additions and 562 deletions

View File

@@ -42,7 +42,7 @@ workflows:
- 'build' - 'build'
triggers: triggers:
- schedule: - schedule:
cron: "0 0 * * *" cron: "0 18 * * *"
filters: filters:
branches: branches:
only: only:

View File

@@ -56,6 +56,7 @@
- [#3618](https://github.com/influxdata/telegraf/pull/3618): Add new sqlserver output data model. - [#3618](https://github.com/influxdata/telegraf/pull/3618): Add new sqlserver output data model.
- [#3559](https://github.com/influxdata/telegraf/pull/3559): Add native Go method for finding pids to procstat. - [#3559](https://github.com/influxdata/telegraf/pull/3559): Add native Go method for finding pids to procstat.
- [#3722](https://github.com/influxdata/telegraf/pull/3722): Add additional metrics and reverse metric names option to openldap. - [#3722](https://github.com/influxdata/telegraf/pull/3722): Add additional metrics and reverse metric names option to openldap.
- [#3769](https://github.com/influxdata/telegraf/pull/3769): Add TLS support to the mesos input plugin.
### Bugfixes ### Bugfixes
@@ -816,7 +817,6 @@ consistent with the behavior of `collection_jitter`.
- [#1543](https://github.com/influxdata/telegraf/pull/1543): Official Windows service. - [#1543](https://github.com/influxdata/telegraf/pull/1543): Official Windows service.
- [#1414](https://github.com/influxdata/telegraf/pull/1414): Forking sensors command to remove C package dependency. - [#1414](https://github.com/influxdata/telegraf/pull/1414): Forking sensors command to remove C package dependency.
- [#1389](https://github.com/influxdata/telegraf/pull/1389): Add a new SNMP plugin. - [#1389](https://github.com/influxdata/telegraf/pull/1389): Add a new SNMP plugin.
- [#1611](https://github.com/influxdata/telegraf/pull/1611): Adding Spark Plugin
### Bugfixes ### Bugfixes

View File

@@ -88,7 +88,6 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/snmp_legacy" _ "github.com/influxdata/telegraf/plugins/inputs/snmp_legacy"
_ "github.com/influxdata/telegraf/plugins/inputs/socket_listener" _ "github.com/influxdata/telegraf/plugins/inputs/socket_listener"
_ "github.com/influxdata/telegraf/plugins/inputs/solr" _ "github.com/influxdata/telegraf/plugins/inputs/solr"
_ "github.com/influxdata/telegraf/plugins/inputs/spark"
_ "github.com/influxdata/telegraf/plugins/inputs/sqlserver" _ "github.com/influxdata/telegraf/plugins/inputs/sqlserver"
_ "github.com/influxdata/telegraf/plugins/inputs/statsd" _ "github.com/influxdata/telegraf/plugins/inputs/statsd"
_ "github.com/influxdata/telegraf/plugins/inputs/sysstat" _ "github.com/influxdata/telegraf/plugins/inputs/sysstat"

View File

@@ -11,7 +11,7 @@ For more information, please check the [Mesos Observability Metrics](http://meso
## Timeout, in ms. ## Timeout, in ms.
timeout = 100 timeout = 100
## A list of Mesos masters. ## A list of Mesos masters.
masters = ["localhost:5050"] masters = ["http://localhost:5050"]
## Master metrics groups to be collected, by default, all enabled. ## Master metrics groups to be collected, by default, all enabled.
master_collections = [ master_collections = [
"resources", "resources",
@@ -35,6 +35,13 @@ For more information, please check the [Mesos Observability Metrics](http://meso
# "tasks", # "tasks",
# "messages", # "messages",
# ] # ]
## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
## Use SSL but skip chain & host verification
# insecure_skip_verify = false
``` ```
By default this plugin is not configured to gather metrics from mesos. Since a mesos cluster can be deployed in numerous ways it does not provide any default By default this plugin is not configured to gather metrics from mesos. Since a mesos cluster can be deployed in numerous ways it does not provide any default
@@ -235,7 +242,8 @@ Mesos slave metric groups
### Tags: ### Tags:
- All master/slave measurements have the following tags: - All master/slave measurements have the following tags:
- server - server (network location of server: `host:port`)
- url (URL origin of server: `scheme://host:port`)
- role (master/slave) - role (master/slave)
- All master measurements have the extra tags: - All master measurements have the extra tags:

View File

@@ -7,11 +7,14 @@ import (
"log" "log"
"net" "net"
"net/http" "net/http"
"net/url"
"strconv" "strconv"
"strings"
"sync" "sync"
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
jsonparser "github.com/influxdata/telegraf/plugins/parsers/json" jsonparser "github.com/influxdata/telegraf/plugins/parsers/json"
) )
@@ -30,6 +33,20 @@ type Mesos struct {
Slaves []string Slaves []string
SlaveCols []string `toml:"slave_collections"` SlaveCols []string `toml:"slave_collections"`
//SlaveTasks bool //SlaveTasks bool
// Path to CA file
SSLCA string `toml:"ssl_ca"`
// Path to host cert file
SSLCert string `toml:"ssl_cert"`
// Path to cert key file
SSLKey string `toml:"ssl_key"`
// Use SSL but skip chain & host verification
InsecureSkipVerify bool
initialized bool
client *http.Client
masterURLs []*url.URL
slaveURLs []*url.URL
} }
var allMetrics = map[Role][]string{ var allMetrics = map[Role][]string{
@@ -41,7 +58,7 @@ var sampleConfig = `
## Timeout, in ms. ## Timeout, in ms.
timeout = 100 timeout = 100
## A list of Mesos masters. ## A list of Mesos masters.
masters = ["localhost:5050"] masters = ["http://localhost:5050"]
## Master metrics groups to be collected, by default, all enabled. ## Master metrics groups to be collected, by default, all enabled.
master_collections = [ master_collections = [
"resources", "resources",
@@ -65,6 +82,13 @@ var sampleConfig = `
# "tasks", # "tasks",
# "messages", # "messages",
# ] # ]
## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
## Use SSL but skip chain & host verification
# insecure_skip_verify = false
` `
// SampleConfig returns a sample configuration block // SampleConfig returns a sample configuration block
@@ -77,7 +101,28 @@ func (m *Mesos) Description() string {
return "Telegraf plugin for gathering metrics from N Mesos masters" return "Telegraf plugin for gathering metrics from N Mesos masters"
} }
func (m *Mesos) SetDefaults() { func parseURL(s string, role Role) (*url.URL, error) {
if !strings.HasPrefix(s, "http://") && !strings.HasPrefix(s, "https://") {
host, port, err := net.SplitHostPort(s)
// no port specified
if err != nil {
host = s
switch role {
case MASTER:
port = "5050"
case SLAVE:
port = "5051"
}
}
s = "http://" + host + ":" + port
log.Printf("W! [inputs.mesos] Using %q as connection URL; please update your configuration to use an URL", s)
}
return url.Parse(s)
}
func (m *Mesos) initialize() error {
if len(m.MasterCols) == 0 { if len(m.MasterCols) == 0 {
m.MasterCols = allMetrics[MASTER] m.MasterCols = allMetrics[MASTER]
} }
@@ -87,33 +132,71 @@ func (m *Mesos) SetDefaults() {
} }
if m.Timeout == 0 { if m.Timeout == 0 {
log.Println("I! [mesos] Missing timeout value, setting default value (100ms)") log.Println("I! [inputs.mesos] Missing timeout value, setting default value (100ms)")
m.Timeout = 100 m.Timeout = 100
} }
rawQuery := "timeout=" + strconv.Itoa(m.Timeout) + "ms"
m.masterURLs = make([]*url.URL, 0, len(m.Masters))
for _, master := range m.Masters {
u, err := parseURL(master, MASTER)
if err != nil {
return err
}
u.RawQuery = rawQuery
m.masterURLs = append(m.masterURLs, u)
}
m.slaveURLs = make([]*url.URL, 0, len(m.Slaves))
for _, slave := range m.Slaves {
u, err := parseURL(slave, SLAVE)
if err != nil {
return err
}
u.RawQuery = rawQuery
m.slaveURLs = append(m.slaveURLs, u)
}
client, err := m.createHttpClient()
if err != nil {
return err
}
m.client = client
return nil
} }
// Gather() metrics from given list of Mesos Masters // Gather() metrics from given list of Mesos Masters
func (m *Mesos) Gather(acc telegraf.Accumulator) error { func (m *Mesos) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup if !m.initialized {
err := m.initialize()
m.SetDefaults() if err != nil {
return err
for _, v := range m.Masters { }
wg.Add(1) m.initialized = true
go func(c string) {
acc.AddError(m.gatherMainMetrics(c, ":5050", MASTER, acc))
wg.Done()
return
}(v)
} }
for _, v := range m.Slaves { var wg sync.WaitGroup
for _, master := range m.masterURLs {
wg.Add(1) wg.Add(1)
go func(c string) { go func(master *url.URL) {
acc.AddError(m.gatherMainMetrics(c, ":5051", SLAVE, acc)) acc.AddError(m.gatherMainMetrics(master, MASTER, acc))
wg.Done() wg.Done()
return return
}(v) }(master)
}
for _, slave := range m.slaveURLs {
wg.Add(1)
go func(slave *url.URL) {
acc.AddError(m.gatherMainMetrics(slave, SLAVE, acc))
wg.Done()
return
}(slave)
// if !m.SlaveTasks { // if !m.SlaveTasks {
// continue // continue
@@ -121,7 +204,7 @@ func (m *Mesos) Gather(acc telegraf.Accumulator) error {
// wg.Add(1) // wg.Add(1)
// go func(c string) { // go func(c string) {
// acc.AddError(m.gatherSlaveTaskMetrics(c, ":5051", acc)) // acc.AddError(m.gatherSlaveTaskMetrics(slave, acc))
// wg.Done() // wg.Done()
// return // return
// }(v) // }(v)
@@ -132,6 +215,24 @@ func (m *Mesos) Gather(acc telegraf.Accumulator) error {
return nil return nil
} }
func (m *Mesos) createHttpClient() (*http.Client, error) {
tlsCfg, err := internal.GetTLSConfig(
m.SSLCert, m.SSLKey, m.SSLCA, m.InsecureSkipVerify)
if err != nil {
return nil, err
}
client := &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
TLSClientConfig: tlsCfg,
},
Timeout: 4 * time.Second,
}
return client, nil
}
// metricsDiff() returns set names for removal // metricsDiff() returns set names for removal
func metricsDiff(role Role, w []string) []string { func metricsDiff(role Role, w []string) []string {
b := []string{} b := []string{}
@@ -393,15 +494,6 @@ func (m *Mesos) filterMetrics(role Role, metrics *map[string]interface{}) {
} }
} }
var tr = &http.Transport{
ResponseHeaderTimeout: time.Duration(3 * time.Second),
}
var client = &http.Client{
Transport: tr,
Timeout: time.Duration(4 * time.Second),
}
// TaskStats struct for JSON API output /monitor/statistics // TaskStats struct for JSON API output /monitor/statistics
type TaskStats struct { type TaskStats struct {
ExecutorID string `json:"executor_id"` ExecutorID string `json:"executor_id"`
@@ -409,22 +501,15 @@ type TaskStats struct {
Statistics map[string]interface{} `json:"statistics"` Statistics map[string]interface{} `json:"statistics"`
} }
func (m *Mesos) gatherSlaveTaskMetrics(address string, defaultPort string, acc telegraf.Accumulator) error { func (m *Mesos) gatherSlaveTaskMetrics(u *url.URL, acc telegraf.Accumulator) error {
var metrics []TaskStats var metrics []TaskStats
host, _, err := net.SplitHostPort(address)
if err != nil {
host = address
address = address + defaultPort
}
tags := map[string]string{ tags := map[string]string{
"server": host, "server": u.Hostname(),
"url": urlTag(u),
} }
ts := strconv.Itoa(m.Timeout) + "ms" resp, err := m.client.Get(withPath(u, "/monitor/statistics").String())
resp, err := client.Get("http://" + address + "/monitor/statistics?timeout=" + ts)
if err != nil { if err != nil {
return err return err
@@ -459,24 +544,31 @@ func (m *Mesos) gatherSlaveTaskMetrics(address string, defaultPort string, acc t
return nil return nil
} }
// This should not belong to the object func withPath(u *url.URL, path string) *url.URL {
func (m *Mesos) gatherMainMetrics(a string, defaultPort string, role Role, acc telegraf.Accumulator) error { c := *u
var jsonOut map[string]interface{} c.Path = path
return &c
host, _, err := net.SplitHostPort(a)
if err != nil {
host = a
a = a + defaultPort
} }
func urlTag(u *url.URL) string {
c := *u
c.Path = ""
c.User = nil
c.RawQuery = ""
return c.String()
}
// This should not belong to the object
func (m *Mesos) gatherMainMetrics(u *url.URL, role Role, acc telegraf.Accumulator) error {
var jsonOut map[string]interface{}
tags := map[string]string{ tags := map[string]string{
"server": host, "server": u.Hostname(),
"url": urlTag(u),
"role": string(role), "role": string(role),
} }
ts := strconv.Itoa(m.Timeout) + "ms" resp, err := m.client.Get(withPath(u, "/metrics/snapshot").String())
resp, err := client.Get("http://" + a + "/metrics/snapshot?timeout=" + ts)
if err != nil { if err != nil {
return err return err

View File

@@ -6,10 +6,12 @@ import (
"math/rand" "math/rand"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"net/url"
"os" "os"
"testing" "testing"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
) )
var masterMetrics map[string]interface{} var masterMetrics map[string]interface{}
@@ -378,3 +380,19 @@ func TestSlaveFilter(t *testing.T) {
} }
} }
} }
func TestWithPathDoesNotModify(t *testing.T) {
u, err := url.Parse("http://localhost:5051")
require.NoError(t, err)
v := withPath(u, "/xyzzy")
require.Equal(t, u.String(), "http://localhost:5051")
require.Equal(t, v.String(), "http://localhost:5051/xyzzy")
}
func TestURLTagDoesNotModify(t *testing.T) {
u, err := url.Parse("http://a:b@localhost:5051?timeout=1ms")
require.NoError(t, err)
v := urlTag(u)
require.Equal(t, u.String(), "http://a:b@localhost:5051?timeout=1ms")
require.Equal(t, v, "http://localhost:5051")
}

View File

@@ -1,47 +0,0 @@
# Telegraf plugin: Spark
#### Plugin arguments:
- **spark_servers** []string: List of spark nodes with the format ["host:port"] (optional)
- **yarn_server** string: Address of Yarn resource manager (optional)
#### Description
The Spark plugin collects metrics in 2 ways, both being optional:
- Spark-JVM metrics exposed as MBean attributes through the Jolokia REST endpoint. Metrics are collected for each configured server. See: https://jolokia.org/
- Spark application metrics, if the applications are managed by the Yarn resource manager. The plugin collects them through the Yarn API; results are produced only when at least one Spark job has been submitted.
# Measurements:
Spark plugin produces one or more measurements according to the SparkServer or YarnServer provided.
Given a configuration like:
```toml
[[inputs.spark]]
spark_servers = ["127.0.0.1:8778"]
yarn_server = "127.0.0.1:8088"
```
The maximum collected measurements will be:
```
spark_HeapMemoryUsage , spark_Threading , spark_apps , spark_clusterInfo , spark_clusterMetrics , spark_jolokiaMetrics , spark_jvmMetrics , spark_nodes
```
# Useful Metrics:
Here is a list of metrics that are fetched and might be useful to monitor your spark cluster.
#### Measurement domains collected through Jolokia
- "/metrics:*"
- "/java.lang:type=Memory/HeapMemoryUsage"
- "/java.lang:type=Threading"
#### Measurement domains collected through YarnServer
- "/cluster"
- "/cluster/metrics"
- "/cluster/apps"
- "/cluster/nodes"

View File

@@ -1,437 +0,0 @@
package spark
import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"log"
"net/http"
"net/url"
"strings"
"sync"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)
// JolokiaClient abstracts HTTP request execution against a Jolokia REST
// endpoint so the transport can be mocked in tests.
type JolokiaClient interface {
	MakeRequest(req *http.Request) (*http.Response, error)
}

// JolokiaClientImpl is the production JolokiaClient backed by net/http.
type JolokiaClientImpl struct {
	client *http.Client
}

// MakeRequest sends req using the wrapped *http.Client.
func (c JolokiaClientImpl) MakeRequest(req *http.Request) (*http.Response, error) {
	return c.client.Do(req)
}

// YarnClient abstracts HTTP request execution against the Yarn
// resource-manager REST API so the transport can be mocked in tests.
type YarnClient interface {
	MakeRequest(req *http.Request) (*http.Response, error)
}

// YarnClientImpl is the production YarnClient backed by net/http.
type YarnClientImpl struct {
	client *http.Client
}

// MakeRequest sends req using the wrapped *http.Client.
func (c YarnClientImpl) MakeRequest(req *http.Request) (*http.Response, error) {
	return c.client.Do(req)
}
// Spark is the plugin's configuration and state: a Jolokia HTTP client
// plus the configured spark servers and Yarn resource-manager address.
// NOTE(review): spark_servers and yarn_server are unexported; telegraf's
// TOML loader populates exported fields only — confirm these are actually
// filled from configuration.
type Spark struct {
	jClient       JolokiaClient
	spark_servers []string
	yarn_server   string
}

// javaMetric carries the context needed to record one java.lang mbean
// read: source host, requested metric path, and the output accumulator.
type javaMetric struct {
	host   string
	metric string
	acc    telegraf.Accumulator
}

// sparkMetric carries the context needed to record one "metrics:" domain
// Jolokia read.
type sparkMetric struct {
	host   string
	metric string
	acc    telegraf.Accumulator
}

// Yarn pairs an HTTP client with the Yarn resource-manager address.
type Yarn struct {
	yClient       YarnClient
	serverAddress string
}

// yarnMetric carries the context needed to record one Yarn REST response.
type yarnMetric struct {
	host string
	acc  telegraf.Accumulator
}

// jmxMetric is implemented by the per-domain recorders above; it turns a
// decoded Jolokia/Yarn JSON response into accumulator fields and tags.
type jmxMetric interface {
	addTagsFields(out map[string]interface{})
}
// newJavaMetric builds a javaMetric recorder for one host/metric pair.
func newJavaMetric(host string, metric string, acc telegraf.Accumulator) *javaMetric {
	m := javaMetric{host: host, metric: metric, acc: acc}
	return &m
}

// newSparkMetric builds a sparkMetric recorder for one host/metric pair.
func newSparkMetric(host string, metric string, acc telegraf.Accumulator) *sparkMetric {
	m := sparkMetric{host: host, metric: metric, acc: acc}
	return &m
}

// newYarnMetric builds a yarnMetric recorder for one Yarn host.
func newYarnMetric(host string, acc telegraf.Accumulator) *yarnMetric {
	m := yarnMetric{host: host, acc: acc}
	return &m
}
// addValuesAsFields copies every non-nil entry of values into fields,
// prefixing each key with mname and an underscore.
func addValuesAsFields(values map[string]interface{}, fields map[string]interface{}, mname string) {
	for key, val := range values {
		if val == nil {
			continue
		}
		fields[mname+"_"+key] = val
	}
}
// parseJmxMetricRequest extracts tag tokens from a JMX mbean name such as
// "metrics:name=foo" or "java.lang:type=Memory".  The domain before the
// colon selects the measurement class; the first key=value pair after it
// becomes an extra token.  Unknown domains yield an empty map.
func parseJmxMetricRequest(mbean string) map[string]string {
	tokens := make(map[string]string)
	classAndPairs := strings.Split(mbean, ":")
	if classAndPairs[0] == "metrics" {
		tokens["class"] = "spark_jolokia_metrics"
	} else if classAndPairs[0] == "java.lang" {
		tokens["class"] = "java"
	} else {
		return tokens
	}
	// Guard against mbeans with no key=value part (e.g. "metrics" or
	// "metrics:*"); the previous version indexed unconditionally and
	// panicked with index out of range.
	if len(classAndPairs) < 2 {
		return tokens
	}
	pair := strings.Split(classAndPairs[1], "=")
	if len(pair) < 2 {
		return tokens
	}
	tokens[pair[0]] = pair[1]
	return tokens
}
// addTokensToTags copies parsed JMX tokens into tags, renaming "name" to
// "mname" (name seems to be a reserved word in InfluxDB) and dropping
// "class"/"type", which are already encoded in the measurement name.
func addTokensToTags(tokens map[string]string, tags map[string]string) {
	for key, val := range tokens {
		switch key {
		case "name":
			tags["mname"] = val
		case "class", "type":
			// already part of the measurement name
		default:
			tags[key] = val
		}
	}
}
// addJavaMetric emits one measurement named class for a JVM mbean read.
// For spark_threading only a whitelist of thread counters is kept; for
// every other class all non-nil values are forwarded as fields.
func addJavaMetric(class string, c *javaMetric, values map[string]interface{}) {
	tags := map[string]string{
		"spark_host":  c.host,
		"spark_class": class,
	}
	fields := make(map[string]interface{})
	if class == "spark_threading" {
		wanted := []string{"PeakThreadCount", "CurrentThreadCpuTime", "DaemonThreadCount", "TotalStartedThreadCount", "CurrentThreadUserTime", "ThreadCount"}
		for _, name := range wanted {
			if v := values[name]; v != nil {
				fields[name] = v
			}
		}
	} else {
		for name, v := range values {
			if v != nil {
				fields[name] = v
			}
		}
	}
	c.acc.AddFields(class, fields, tags)
}
// addTagsFields records a decoded Jolokia response for a java.lang mbean.
// The mbean class is taken from the "request.mbean" string (the part after
// "=", e.g. "Memory" or "Threading") and dispatched to addJavaMetric with
// the matching measurement name; unrecognized classes are logged and dropped.
// NOTE(review): the unchecked type assertions and mbeansplit[1] access
// panic if the response lacks the expected request/mbean shape — confirm
// the Jolokia payload is always well-formed or add guards.
func (j *javaMetric) addTagsFields(out map[string]interface{}) {
	request := out["request"].(map[string]interface{})
	var mbean = request["mbean"].(string)
	var mbeansplit = strings.Split(mbean, "=")
	var class = mbeansplit[1]

	if valuesMap, ok := out["value"]; ok {
		if class == "Memory" {
			addJavaMetric("spark_heap_memory_usage", j, valuesMap.(map[string]interface{}))
		} else if class == "Threading" {
			addJavaMetric("spark_threading", j, valuesMap.(map[string]interface{}))
		} else {
			fmt.Printf("Missing key in '%s' output response\n%v\n",
				j.metric, out)
			return
		}
	}
}
// addSparkMetric records one "metrics:" domain mbean read: tags are derived
// from the mbean name via parseJmxMetricRequest/addTokensToTags, fields are
// the non-nil values prefixed with the mname tag, and the measurement name
// is the concatenation of the parsed class and type tokens.
func addSparkMetric(mbean string, c *sparkMetric,
	values map[string]interface{}) {
	tags := make(map[string]string)
	fields := make(map[string]interface{})

	tokens := parseJmxMetricRequest(mbean)
	addTokensToTags(tokens, tags)
	tags["spark_host"] = c.host
	addValuesAsFields(values, fields, tags["mname"])
	c.acc.AddFields(tokens["class"]+tokens["type"], fields, tags)
}
// addTagsFields walks the "value" map of a Jolokia wildcard read
// ("metrics:*") and records each contained mbean via addSparkMetric.
// A response without a "value" key is logged and dropped.
// NOTE(review): the map[string]interface{} type assertions panic on an
// unexpected payload shape — confirm upstream responses are well-formed.
func (c *sparkMetric) addTagsFields(out map[string]interface{}) {
	if valuesMap, ok := out["value"]; ok {
		for k, v := range valuesMap.(map[string]interface{}) {
			addSparkMetric(k, c, v.(map[string]interface{}))
		}
	} else {
		fmt.Printf("Missing key 'value' in '%s' output response\n%v\n",
			c.metric, out)
		return
	}
}
// addYarnMetric emits one measurement of the given metrictype, copying
// every entry of value into fields and tagging it with the Yarn host.
func addYarnMetric(c *yarnMetric, value map[string]interface{}, metrictype string) {
	fields := make(map[string]interface{}, len(value))
	for k, v := range value {
		fields[k] = v
	}
	c.acc.AddFields(metrictype, fields, map[string]string{"yarn_host": c.host})
}
// addTagsFields dispatches a decoded Yarn REST response to addYarnMetric
// based on which top-level key is present: clusterMetrics and clusterInfo
// hold a single object, while apps and nodes hold lists that are flattened
// into one measurement per entry.  Unknown payloads are logged and dropped.
// NOTE(review): the chained type assertions panic on an unexpected payload
// shape — confirm the Yarn API responses are always well-formed.
func (c *yarnMetric) addTagsFields(out map[string]interface{}) {
	if valuesMap, ok := out["clusterMetrics"]; ok {
		addYarnMetric(c, valuesMap.(map[string]interface{}), "spark_cluster_metrics")
	} else if valuesMap, ok := out["clusterInfo"]; ok {
		addYarnMetric(c, valuesMap.(map[string]interface{}), "spark_cluster_info")
	} else if valuesMap, ok := out["apps"]; ok {
		for _, value := range valuesMap.(map[string]interface{}) {
			for _, vv := range value.([]interface{}) {
				addYarnMetric(c, vv.(map[string]interface{}), "spark_apps")
			}
		}
	} else if valuesMap, ok := out["nodes"]; ok {
		for _, value := range valuesMap.(map[string]interface{}) {
			for _, vv := range value.([]interface{}) {
				addYarnMetric(c, vv.(map[string]interface{}), "spark_nodes")
			}
		}
	} else {
		fmt.Printf("Missing the required key in output response\n%v\n", out)
		return
	}
}
// SampleConfig returns the example configuration block shown by
// `telegraf config`; both settings are optional.
func (j *Spark) SampleConfig() string {
	return `
## Spark servers exposing jolokia read service
#spark_servers = ["127.0.0.1:8778"] #optional
## Server running Yarn Resource Manager
#yarn_server = "127.0.0.1:8088" #optional
`
}

// Description returns the one-line plugin description used in listings.
func (j *Spark) Description() string {
	return "Read Spark metrics through Jolokia and Yarn"
}
// getAttr performs a GET against a Jolokia read URL and decodes the JSON
// response body into a generic map.  It returns an error when the request
// cannot be built or sent, the server answers with a non-200 status, or
// the payload is not valid JSON.
func (j *Spark) getAttr(requestUrl *url.URL) (map[string]interface{}, error) {
	// Create + send request
	req, err := http.NewRequest("GET", requestUrl.String(), nil)
	if err != nil {
		return nil, err
	}

	resp, err := j.jClient.MakeRequest(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	// Process response
	if resp.StatusCode != http.StatusOK {
		err = fmt.Errorf("Response from url \"%s\" has status code %d (%s), expected %d (%s)",
			requestUrl,
			resp.StatusCode,
			http.StatusText(resp.StatusCode),
			http.StatusOK,
			http.StatusText(http.StatusOK))
		return nil, err
	}

	// read body
	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}

	// Unmarshal json; body is already a []byte, so pass it directly
	// (the previous []byte(body) conversion copied it needlessly) and
	// keep the underlying decode error for easier debugging.
	var jsonOut map[string]interface{}
	if err = json.Unmarshal(body, &jsonOut); err != nil {
		return nil, fmt.Errorf("Error decoding JSON response: %s", err)
	}

	return jsonOut, nil
}
// getAttr performs a GET against a Yarn REST URL and decodes the JSON
// response body into a generic map.  It returns an error when the request
// cannot be built or sent, the server answers with a non-200 status, or
// the payload is not valid JSON.
func (j *Yarn) getAttr(requestUrl *url.URL) (map[string]interface{}, error) {
	req, err := http.NewRequest("GET", requestUrl.String(), nil)
	if err != nil {
		return nil, err
	}

	resp, err := j.yClient.MakeRequest(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	// Process response
	if resp.StatusCode != http.StatusOK {
		err = fmt.Errorf("Response from url \"%s\" has status code %d (%s), expected %d (%s)",
			requestUrl,
			resp.StatusCode,
			http.StatusText(resp.StatusCode),
			http.StatusOK,
			http.StatusText(http.StatusOK))
		return nil, err
	}

	// read body
	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}

	// Unmarshal json; body is already a []byte, so pass it directly
	// (the previous []byte(body) conversion copied it needlessly) and
	// keep the underlying decode error for easier debugging.
	var jsonOut map[string]interface{}
	if err = json.Unmarshal(body, &jsonOut); err != nil {
		return nil, fmt.Errorf("Error decoding JSON response: %s", err)
	}

	return jsonOut, nil
}
// parseServerTokens splits a "host:port" server string into a map with
// "host" and "port" keys.  Entries containing a user part ("user@host")
// or missing a port are unsupported and yield nil so callers can skip
// them instead of crashing.
func parseServerTokens(server string) map[string]string {
	serverTokens := make(map[string]string)

	log.Printf("Parsing %s", server)
	hostAndUser := strings.Split(server, "@")
	hostPort := ""
	if len(hostAndUser) == 1 {
		hostPort = hostAndUser[0]
	} else {
		log.Printf("Unsupported Server info, skipping")
		return nil
	}
	log.Printf("%s \n", hostPort)
	hostTokens := strings.Split(hostPort, ":")
	// The previous version indexed hostTokens[1] unconditionally and
	// panicked with index out of range on entries without a port.
	if len(hostTokens) != 2 {
		log.Printf("Unsupported Server info %q, expected host:port, skipping", hostPort)
		return nil
	}
	serverTokens["host"] = hostTokens[0]
	serverTokens["port"] = hostTokens[1]
	return serverTokens
}
// GatherJolokia collects JVM and Spark metrics from every configured spark
// server via the Jolokia read endpoint and adds them to acc.  Per-server
// fetch failures are reported through acc.AddError so the remaining
// servers are still gathered.
func (c *Spark) GatherJolokia(acc telegraf.Accumulator, wg *sync.WaitGroup) error {
	// Release the wait group on every return path; the previous version
	// skipped wg.Done() on the url.Parse error return, which deadlocked
	// the caller's wg.Wait().
	defer wg.Done()

	context := "/jolokia/read"
	servers := c.spark_servers
	metrics := [...]string{"/metrics:*", "/java.lang:type=Memory/HeapMemoryUsage", "/java.lang:type=Threading"}

	if len(servers) == 0 {
		return nil
	}

	for _, server := range servers {
		// Hoisted out of the metric loop: the tokens do not depend on
		// the metric being fetched.
		serverTokens := parseServerTokens(server)
		if serverTokens == nil {
			// Unparseable entry; parseServerTokens already logged it.
			continue
		}
		for _, metric := range metrics {
			var m jmxMetric
			if strings.HasPrefix(metric, "/java.lang:") {
				m = newJavaMetric(serverTokens["host"], metric, acc)
			} else if strings.HasPrefix(metric, "/metrics:") {
				m = newSparkMetric(serverTokens["host"], metric, acc)
			} else {
				log.Printf("Unsupported Spark metric [%s], skipping",
					metric)
				continue
			}

			requestUrl, err := url.Parse("http://" + serverTokens["host"] + ":" +
				serverTokens["port"] + context + metric)
			if err != nil {
				return err
			}

			out, err := c.getAttr(requestUrl)
			if err != nil {
				// Previously this error was silently discarded.
				acc.AddError(err)
				continue
			}
			if len(out) == 0 {
				continue
			}
			m.addTagsFields(out)
		}
	}
	return nil
}
// GatherYarn collects cluster info, cluster metrics, applications and node
// reports from the configured Yarn resource manager and adds them to acc.
// Per-endpoint fetch failures are reported through acc.AddError so the
// remaining endpoints are still gathered.
func (c *Yarn) GatherYarn(acc telegraf.Accumulator, wg *sync.WaitGroup) error {
	// Release the wait group on every return path; the previous version
	// skipped wg.Done() on the url.Parse error return, which deadlocked
	// the caller's wg.Wait().
	defer wg.Done()

	contexts := [...]string{"/ws/v1/cluster", "/ws/v1/cluster/metrics", "/ws/v1/cluster/apps", "/ws/v1/cluster/nodes"}
	server := c.serverAddress
	if server == "" {
		return nil
	}

	serverTokens := parseServerTokens(server)
	if serverTokens == nil {
		// Unparseable address; parseServerTokens already logged it.
		return nil
	}

	for _, context := range contexts {
		m := newYarnMetric(server, acc)
		requestUrl, err := url.Parse("http://" + serverTokens["host"] + ":" + serverTokens["port"] + context)
		if err != nil {
			return err
		}

		out, err := c.getAttr(requestUrl)
		if err != nil {
			// Previously this error was silently discarded.
			acc.AddError(err)
			continue
		}
		if len(out) == 0 {
			continue
		}
		m.addTagsFields(out)
	}
	return nil
}
// Gather runs the Yarn and Jolokia collectors concurrently against fresh
// worker values built from this plugin's configuration, waits for both to
// finish, and always returns nil.
// NOTE(review): the error returned by each goroutine is discarded, and the
// http.Clients are created without a Timeout — a hung server can stall
// this gather indefinitely; confirm whether errors should be surfaced via
// acc.AddError and a client timeout set.
func (c *Spark) Gather(acc telegraf.Accumulator) error {
	log.Println("Config is ", c)
	yarn := Yarn{
		yClient:       &YarnClientImpl{client: &http.Client{}},
		serverAddress: c.yarn_server,
	}
	wg := sync.WaitGroup{}
	wg.Add(1)
	// GatherYarn is responsible for calling wg.Done().
	go yarn.GatherYarn(acc, &wg)

	spark := Spark{
		jClient:       &JolokiaClientImpl{client: &http.Client{}},
		spark_servers: c.spark_servers,
	}
	wg.Add(1)
	// GatherJolokia is responsible for calling wg.Done().
	go spark.GatherJolokia(acc, &wg)
	wg.Wait()
	return nil
}
// init registers the plugin under the name "spark" with a default Jolokia
// HTTP client; the server addresses are filled in from configuration.
func init() {
	inputs.Add("spark", func() telegraf.Input {
		return &Spark{jClient: &JolokiaClientImpl{client: &http.Client{}}}
	})
}

View File

@@ -1,23 +0,0 @@
package spark
import (
"testing"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
// TestSparkMeasurements is an integration test: it gathers from a Yarn
// resource manager expected at the docker host on port 8088 and only
// asserts that Gather itself returns no error.
func TestSparkMeasurements(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	a := &Spark{
		yarn_server: testutil.GetLocalHost() + ":8088",
	}

	var acc testutil.Accumulator
	err := a.Gather(&acc)
	require.NoError(t, err)
}

View File

@@ -0,0 +1,35 @@
# Hugepages Input Plugin
The hugepages plugin gathers hugepages metrics, including per-NUMA-node statistics.
### Configuration:
```toml
# Description
[[inputs.hugepages]]
## Path to a NUMA nodes
# numa_node_path = "/sys/devices/system/node"
## Path to a meminfo file
# meminfo_path = "/proc/meminfo"
```
### Measurements & Fields:
- hugepages
- free (int, kB)
- nr (int, kB)
- HugePages_Total (int, kB)
- HugePages_Free (int, kB)
### Tags:
- hugepages has the following tags:
- node
### Example Output:
```
$ ./telegraf -config telegraf.conf -input-filter hugepages -test
> hugepages,host=maxpc,node=node0 free=0i,nr=0i 1467618621000000000
> hugepages,host=maxpc,name=meminfo HugePages_Free=0i,HugePages_Total=0i 1467618621000000000
```

View File

@@ -0,0 +1,184 @@
package system
import (
"bytes"
"fmt"
"io/ioutil"
"strconv"
"strings"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)
// Byte constants reused by the parsers below to avoid re-allocating them
// on every call.
var (
	newlineByte     = []byte("\n")
	colonByte       = []byte(":")
	kbPrecisionByte = []byte("kB")
)

// default file paths
// NOTE(review): Go convention is MixedCaps for constants rather than
// ALL_CAPS; renaming would touch several call sites, so it is only flagged.
const (
	// the path where statistics are kept per NUMA nodes
	NUMA_NODE_PATH = "/sys/devices/system/node"
	// the path to the meminfo file which is produced by kernel
	MEMINFO_PATH = "/proc/meminfo"

	// hugepages stat field names on meminfo file
	HUGE_PAGES_TOTAL = "HugePages_Total"
	HUGE_PAGES_FREE  = "HugePages_Free"
)
// hugepagesSampleConfig is shown by `telegraf config`; both paths are
// optional and default to the standard kernel locations.
var hugepagesSampleConfig = `
## Path to a NUMA nodes
# numa_node_path = "/sys/devices/system/node"
## Path to a meminfo file
# meminfo_path = "/proc/meminfo"
`

// Hugepages gathers hugepages statistics from the kernel, both per NUMA
// node and system-wide from the meminfo file.  Empty paths are replaced
// with the kernel defaults by loadPaths before gathering.
type Hugepages struct {
	NUMANodePath string `toml:"numa_node_path"`
	MeminfoPath  string `toml:"meminfo_path"`
}
// Description returns the one-line plugin description used in listings.
func (mem *Hugepages) Description() string {
	return "Collects hugepages metrics from kernel and per NUMA node"
}

// SampleConfig returns the example configuration block for this plugin.
func (mem *Hugepages) SampleConfig() string {
	return hugepagesSampleConfig
}
// Gather collects hugepages statistics, first per NUMA node and then from
// the kernel-wide meminfo file.  Unset paths fall back to kernel defaults.
func (mem *Hugepages) Gather(acc telegraf.Accumulator) error {
	mem.loadPaths()

	if err := mem.GatherStatsPerNode(acc); err != nil {
		return err
	}
	return mem.GatherStatsFromMeminfo(acc)
}
// GatherStatsPerNode reports the free and nr hugepages counters for each
// NUMA node, tagging every measurement with the node name.
func (mem *Hugepages) GatherStatsPerNode(acc telegraf.Accumulator) error {
	perNode, err := statsPerNUMA(mem.NUMANodePath)
	if err != nil {
		return err
	}

	for node, stats := range perNode {
		fields := map[string]interface{}{
			"free": stats.Free,
			"nr":   stats.NR,
		}
		acc.AddFields("hugepages", fields, map[string]string{"node": node})
	}
	return nil
}
// GatherStatsFromMeminfo reports the kernel-wide hugepages counters parsed
// from the meminfo file, tagged with name=meminfo.
func (mem *Hugepages) GatherStatsFromMeminfo(acc telegraf.Accumulator) error {
	meminfoMetrics, err := statsFromMeminfo(mem.MeminfoPath)
	if err != nil {
		return err
	}

	fields := make(map[string]interface{}, len(meminfoMetrics))
	for name, value := range meminfoMetrics {
		fields[name] = value
	}
	acc.AddFields("hugepages", fields, map[string]string{"name": "meminfo"})
	return nil
}
// HugepagesNUMAStats holds the per-NUMA-node hugepages counters: Free is
// read from free_hugepages and NR from nr_hugepages.
type HugepagesNUMAStats struct {
	Free int
	NR   int
}
// statsPerNUMA gathers hugepages statistics for every NUMA node directory
// ("node*") found under path, keyed by node name.
// NOTE(review): only the hugepages-2048kB size directory is read — nodes
// using other page sizes (e.g. 1GB) are not reported; confirm intended.
func statsPerNUMA(path string) (map[string]HugepagesNUMAStats, error) {
	var hugepagesStats = make(map[string]HugepagesNUMAStats)

	dirs, err := ioutil.ReadDir(path)
	if err != nil {
		return hugepagesStats, err
	}

	for _, d := range dirs {
		// Only node* directories hold per-node statistics.
		if !(d.IsDir() && strings.HasPrefix(d.Name(), "node")) {
			continue
		}

		hugepagesFree := fmt.Sprintf("%s/%s/hugepages/hugepages-2048kB/free_hugepages", path, d.Name())
		hugepagesNR := fmt.Sprintf("%s/%s/hugepages/hugepages-2048kB/nr_hugepages", path, d.Name())

		free, err := ioutil.ReadFile(hugepagesFree)
		if err != nil {
			return hugepagesStats, err
		}
		nr, err := ioutil.ReadFile(hugepagesNR)
		if err != nil {
			return hugepagesStats, err
		}

		// Atoi errors are ignored here: a malformed counter file records
		// 0 rather than aborting the whole gather.
		f, _ := strconv.Atoi(string(bytes.TrimSuffix(free, newlineByte)))
		n, _ := strconv.Atoi(string(bytes.TrimSuffix(nr, newlineByte)))
		hugepagesStats[d.Name()] = HugepagesNUMAStats{Free: f, NR: n}
	}
	return hugepagesStats, nil
}
// statsFromMeminfo parses the kernel-wide hugepages counters
// (HugePages_Total and HugePages_Free) from the meminfo file at path.
// Lines carrying a "kB" unit are skipped: the counters of interest are
// plain page counts without a unit suffix.
func statsFromMeminfo(path string) (map[string]interface{}, error) {
	stats := map[string]interface{}{}
	meminfo, err := ioutil.ReadFile(path)
	if err != nil {
		return stats, err
	}

	lines := bytes.Split(meminfo, newlineByte)
	for _, l := range lines {
		if bytes.Contains(l, kbPrecisionByte) {
			continue
		}
		fields := bytes.Fields(l)
		if len(fields) < 2 {
			continue
		}
		fieldName := string(bytes.TrimSuffix(fields[0], colonByte))
		if fieldName == HUGE_PAGES_TOTAL || fieldName == HUGE_PAGES_FREE {
			// Skip values that fail to parse instead of silently
			// recording 0, which the previous version did by
			// discarding the Atoi error.
			val, err := strconv.Atoi(string(fields[1]))
			if err != nil {
				continue
			}
			stats[fieldName] = val
		}
	}
	return stats, nil
}
// loadPaths falls back to the kernel default locations for any path that
// was not set via configuration.
func (mem *Hugepages) loadPaths() {
	if len(mem.NUMANodePath) == 0 {
		mem.NUMANodePath = NUMA_NODE_PATH
	}
	if len(mem.MeminfoPath) == 0 {
		mem.MeminfoPath = MEMINFO_PATH
	}
}
// init registers the plugin as input "hugepages"; empty paths are
// normalized to kernel defaults at gather time by loadPaths.
func init() {
	inputs.Add("hugepages", func() telegraf.Input {
		return &Hugepages{}
	})
}

View File

@@ -0,0 +1,43 @@
package system
import (
"testing"
"github.com/influxdata/telegraf/testutil"
)
// hugepages is a shared fixture pointed at checked-in testdata instead of
// the real /sys and /proc locations.
var hugepages = Hugepages{
	NUMANodePath: "./testdata/node",
	MeminfoPath:  "./testdata/meminfo",
}

// init mirrors the production path-normalization step; it is a no-op here
// because both paths are already set above.
func init() {
	hugepages.loadPaths()
}

// TestHugepagesStatsFromMeminfo checks that the kernel-wide counters are
// parsed from testdata/meminfo (666 total / 999 free).
func TestHugepagesStatsFromMeminfo(t *testing.T) {
	acc := &testutil.Accumulator{}
	err := hugepages.GatherStatsFromMeminfo(acc)
	if err != nil {
		t.Fatal(err)
	}

	fields := map[string]interface{}{
		"HugePages_Total": int(666),
		"HugePages_Free":  int(999),
	}
	acc.AssertContainsFields(t, "hugepages", fields)
}

// TestHugepagesStatsPerNode checks the per-node counters read from the
// testdata/node directory tree.
func TestHugepagesStatsPerNode(t *testing.T) {
	acc := &testutil.Accumulator{}
	err := hugepages.GatherStatsPerNode(acc)
	if err != nil {
		t.Fatal(err)
	}

	fields := map[string]interface{}{
		"free": int(123),
		"nr":   int(456),
	}
	acc.AssertContainsFields(t, "hugepages", fields)
}

45
plugins/inputs/system/testdata/meminfo vendored Normal file
View File

@@ -0,0 +1,45 @@
MemTotal: 5961204 kB
MemFree: 5201764 kB
MemAvailable: 5761624 kB
Buffers: 120248 kB
Cached: 419660 kB
SwapCached: 0 kB
Active: 370656 kB
Inactive: 247364 kB
Active(anon): 79032 kB
Inactive(anon): 552 kB
Active(file): 291624 kB
Inactive(file): 246812 kB
Unevictable: 0 kB
Mlocked: 0 kB
SwapTotal: 0 kB
SwapFree: 0 kB
Dirty: 4 kB
Writeback: 0 kB
AnonPages: 78164 kB
Mapped: 114164 kB
Shmem: 1452 kB
Slab: 77180 kB
SReclaimable: 57676 kB
SUnreclaim: 19504 kB
KernelStack: 2832 kB
PageTables: 8692 kB
NFS_Unstable: 0 kB
Bounce: 0 kB
WritebackTmp: 0 kB
CommitLimit: 2980600 kB
Committed_AS: 555492 kB
VmallocTotal: 34359738367 kB
VmallocUsed: 0 kB
VmallocChunk: 0 kB
HardwareCorrupted: 0 kB
AnonHugePages: 0 kB
CmaTotal: 0 kB
CmaFree: 0 kB
HugePages_Total: 666
HugePages_Free: 999
HugePages_Rsvd: 0
HugePages_Surp: 0
Hugepagesize: 2048 kB
DirectMap4k: 84224 kB
DirectMap2M: 6055936 kB

View File

@@ -0,0 +1 @@
456