Compare commits
4 Commits
ShubhamDX-
...
hugepages-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
af5017d3dc | ||
|
|
32dd1b3725 | ||
|
|
1b0e87a8b0 | ||
|
|
efa9095829 |
@@ -42,7 +42,7 @@ workflows:
|
|||||||
- 'build'
|
- 'build'
|
||||||
triggers:
|
triggers:
|
||||||
- schedule:
|
- schedule:
|
||||||
cron: "0 0 * * *"
|
cron: "0 18 * * *"
|
||||||
filters:
|
filters:
|
||||||
branches:
|
branches:
|
||||||
only:
|
only:
|
||||||
|
|||||||
@@ -56,6 +56,7 @@
|
|||||||
- [#3618](https://github.com/influxdata/telegraf/pull/3618): Add new sqlserver output data model.
|
- [#3618](https://github.com/influxdata/telegraf/pull/3618): Add new sqlserver output data model.
|
||||||
- [#3559](https://github.com/influxdata/telegraf/pull/3559): Add native Go method for finding pids to procstat.
|
- [#3559](https://github.com/influxdata/telegraf/pull/3559): Add native Go method for finding pids to procstat.
|
||||||
- [#3722](https://github.com/influxdata/telegraf/pull/3722): Add additional metrics and reverse metric names option to openldap.
|
- [#3722](https://github.com/influxdata/telegraf/pull/3722): Add additional metrics and reverse metric names option to openldap.
|
||||||
|
- [#3769](https://github.com/influxdata/telegraf/pull/3769): Add TLS support to the mesos input plugin.
|
||||||
|
|
||||||
### Bugfixes
|
### Bugfixes
|
||||||
|
|
||||||
@@ -816,7 +817,6 @@ consistent with the behavior of `collection_jitter`.
|
|||||||
- [#1543](https://github.com/influxdata/telegraf/pull/1543): Official Windows service.
|
- [#1543](https://github.com/influxdata/telegraf/pull/1543): Official Windows service.
|
||||||
- [#1414](https://github.com/influxdata/telegraf/pull/1414): Forking sensors command to remove C package dependency.
|
- [#1414](https://github.com/influxdata/telegraf/pull/1414): Forking sensors command to remove C package dependency.
|
||||||
- [#1389](https://github.com/influxdata/telegraf/pull/1389): Add a new SNMP plugin.
|
- [#1389](https://github.com/influxdata/telegraf/pull/1389): Add a new SNMP plugin.
|
||||||
- [#1611](https://github.com/influxdata/telegraf/pull/1611): Adding Spark Plugin
|
|
||||||
|
|
||||||
### Bugfixes
|
### Bugfixes
|
||||||
|
|
||||||
|
|||||||
@@ -88,7 +88,6 @@ import (
|
|||||||
_ "github.com/influxdata/telegraf/plugins/inputs/snmp_legacy"
|
_ "github.com/influxdata/telegraf/plugins/inputs/snmp_legacy"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/socket_listener"
|
_ "github.com/influxdata/telegraf/plugins/inputs/socket_listener"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/solr"
|
_ "github.com/influxdata/telegraf/plugins/inputs/solr"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/spark"
|
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/sqlserver"
|
_ "github.com/influxdata/telegraf/plugins/inputs/sqlserver"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/statsd"
|
_ "github.com/influxdata/telegraf/plugins/inputs/statsd"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/sysstat"
|
_ "github.com/influxdata/telegraf/plugins/inputs/sysstat"
|
||||||
|
|||||||
@@ -11,7 +11,7 @@ For more information, please check the [Mesos Observability Metrics](http://meso
|
|||||||
## Timeout, in ms.
|
## Timeout, in ms.
|
||||||
timeout = 100
|
timeout = 100
|
||||||
## A list of Mesos masters.
|
## A list of Mesos masters.
|
||||||
masters = ["localhost:5050"]
|
masters = ["http://localhost:5050"]
|
||||||
## Master metrics groups to be collected, by default, all enabled.
|
## Master metrics groups to be collected, by default, all enabled.
|
||||||
master_collections = [
|
master_collections = [
|
||||||
"resources",
|
"resources",
|
||||||
@@ -35,6 +35,13 @@ For more information, please check the [Mesos Observability Metrics](http://meso
|
|||||||
# "tasks",
|
# "tasks",
|
||||||
# "messages",
|
# "messages",
|
||||||
# ]
|
# ]
|
||||||
|
|
||||||
|
## Optional SSL Config
|
||||||
|
# ssl_ca = "/etc/telegraf/ca.pem"
|
||||||
|
# ssl_cert = "/etc/telegraf/cert.pem"
|
||||||
|
# ssl_key = "/etc/telegraf/key.pem"
|
||||||
|
## Use SSL but skip chain & host verification
|
||||||
|
# insecure_skip_verify = false
|
||||||
```
|
```
|
||||||
|
|
||||||
By default this plugin is not configured to gather metrics from mesos. Since a mesos cluster can be deployed in numerous ways it does not provide any default
|
By default this plugin is not configured to gather metrics from mesos. Since a mesos cluster can be deployed in numerous ways it does not provide any default
|
||||||
@@ -235,7 +242,8 @@ Mesos slave metric groups
|
|||||||
### Tags:
|
### Tags:
|
||||||
|
|
||||||
- All master/slave measurements have the following tags:
|
- All master/slave measurements have the following tags:
|
||||||
- server
|
- server (network location of server: `host:port`)
|
||||||
|
- url (URL origin of server: `scheme://host:port`)
|
||||||
- role (master/slave)
|
- role (master/slave)
|
||||||
|
|
||||||
- All master measurements have the extra tags:
|
- All master measurements have the extra tags:
|
||||||
|
|||||||
@@ -7,11 +7,14 @@ import (
|
|||||||
"log"
|
"log"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"net/url"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/internal"
|
||||||
"github.com/influxdata/telegraf/plugins/inputs"
|
"github.com/influxdata/telegraf/plugins/inputs"
|
||||||
jsonparser "github.com/influxdata/telegraf/plugins/parsers/json"
|
jsonparser "github.com/influxdata/telegraf/plugins/parsers/json"
|
||||||
)
|
)
|
||||||
@@ -30,6 +33,20 @@ type Mesos struct {
|
|||||||
Slaves []string
|
Slaves []string
|
||||||
SlaveCols []string `toml:"slave_collections"`
|
SlaveCols []string `toml:"slave_collections"`
|
||||||
//SlaveTasks bool
|
//SlaveTasks bool
|
||||||
|
|
||||||
|
// Path to CA file
|
||||||
|
SSLCA string `toml:"ssl_ca"`
|
||||||
|
// Path to host cert file
|
||||||
|
SSLCert string `toml:"ssl_cert"`
|
||||||
|
// Path to cert key file
|
||||||
|
SSLKey string `toml:"ssl_key"`
|
||||||
|
// Use SSL but skip chain & host verification
|
||||||
|
InsecureSkipVerify bool
|
||||||
|
|
||||||
|
initialized bool
|
||||||
|
client *http.Client
|
||||||
|
masterURLs []*url.URL
|
||||||
|
slaveURLs []*url.URL
|
||||||
}
|
}
|
||||||
|
|
||||||
var allMetrics = map[Role][]string{
|
var allMetrics = map[Role][]string{
|
||||||
@@ -41,7 +58,7 @@ var sampleConfig = `
|
|||||||
## Timeout, in ms.
|
## Timeout, in ms.
|
||||||
timeout = 100
|
timeout = 100
|
||||||
## A list of Mesos masters.
|
## A list of Mesos masters.
|
||||||
masters = ["localhost:5050"]
|
masters = ["http://localhost:5050"]
|
||||||
## Master metrics groups to be collected, by default, all enabled.
|
## Master metrics groups to be collected, by default, all enabled.
|
||||||
master_collections = [
|
master_collections = [
|
||||||
"resources",
|
"resources",
|
||||||
@@ -65,6 +82,13 @@ var sampleConfig = `
|
|||||||
# "tasks",
|
# "tasks",
|
||||||
# "messages",
|
# "messages",
|
||||||
# ]
|
# ]
|
||||||
|
|
||||||
|
## Optional SSL Config
|
||||||
|
# ssl_ca = "/etc/telegraf/ca.pem"
|
||||||
|
# ssl_cert = "/etc/telegraf/cert.pem"
|
||||||
|
# ssl_key = "/etc/telegraf/key.pem"
|
||||||
|
## Use SSL but skip chain & host verification
|
||||||
|
# insecure_skip_verify = false
|
||||||
`
|
`
|
||||||
|
|
||||||
// SampleConfig returns a sample configuration block
|
// SampleConfig returns a sample configuration block
|
||||||
@@ -77,7 +101,28 @@ func (m *Mesos) Description() string {
|
|||||||
return "Telegraf plugin for gathering metrics from N Mesos masters"
|
return "Telegraf plugin for gathering metrics from N Mesos masters"
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Mesos) SetDefaults() {
|
func parseURL(s string, role Role) (*url.URL, error) {
|
||||||
|
if !strings.HasPrefix(s, "http://") && !strings.HasPrefix(s, "https://") {
|
||||||
|
host, port, err := net.SplitHostPort(s)
|
||||||
|
// no port specified
|
||||||
|
if err != nil {
|
||||||
|
host = s
|
||||||
|
switch role {
|
||||||
|
case MASTER:
|
||||||
|
port = "5050"
|
||||||
|
case SLAVE:
|
||||||
|
port = "5051"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
s = "http://" + host + ":" + port
|
||||||
|
log.Printf("W! [inputs.mesos] Using %q as connection URL; please update your configuration to use an URL", s)
|
||||||
|
}
|
||||||
|
|
||||||
|
return url.Parse(s)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Mesos) initialize() error {
|
||||||
if len(m.MasterCols) == 0 {
|
if len(m.MasterCols) == 0 {
|
||||||
m.MasterCols = allMetrics[MASTER]
|
m.MasterCols = allMetrics[MASTER]
|
||||||
}
|
}
|
||||||
@@ -87,33 +132,71 @@ func (m *Mesos) SetDefaults() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if m.Timeout == 0 {
|
if m.Timeout == 0 {
|
||||||
log.Println("I! [mesos] Missing timeout value, setting default value (100ms)")
|
log.Println("I! [inputs.mesos] Missing timeout value, setting default value (100ms)")
|
||||||
m.Timeout = 100
|
m.Timeout = 100
|
||||||
}
|
}
|
||||||
|
|
||||||
|
rawQuery := "timeout=" + strconv.Itoa(m.Timeout) + "ms"
|
||||||
|
|
||||||
|
m.masterURLs = make([]*url.URL, 0, len(m.Masters))
|
||||||
|
for _, master := range m.Masters {
|
||||||
|
u, err := parseURL(master, MASTER)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
u.RawQuery = rawQuery
|
||||||
|
m.masterURLs = append(m.masterURLs, u)
|
||||||
|
}
|
||||||
|
|
||||||
|
m.slaveURLs = make([]*url.URL, 0, len(m.Slaves))
|
||||||
|
for _, slave := range m.Slaves {
|
||||||
|
u, err := parseURL(slave, SLAVE)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
u.RawQuery = rawQuery
|
||||||
|
m.slaveURLs = append(m.slaveURLs, u)
|
||||||
|
}
|
||||||
|
|
||||||
|
client, err := m.createHttpClient()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
m.client = client
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Gather() metrics from given list of Mesos Masters
|
// Gather() metrics from given list of Mesos Masters
|
||||||
func (m *Mesos) Gather(acc telegraf.Accumulator) error {
|
func (m *Mesos) Gather(acc telegraf.Accumulator) error {
|
||||||
var wg sync.WaitGroup
|
if !m.initialized {
|
||||||
|
err := m.initialize()
|
||||||
m.SetDefaults()
|
if err != nil {
|
||||||
|
return err
|
||||||
for _, v := range m.Masters {
|
}
|
||||||
wg.Add(1)
|
m.initialized = true
|
||||||
go func(c string) {
|
|
||||||
acc.AddError(m.gatherMainMetrics(c, ":5050", MASTER, acc))
|
|
||||||
wg.Done()
|
|
||||||
return
|
|
||||||
}(v)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, v := range m.Slaves {
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
for _, master := range m.masterURLs {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func(c string) {
|
go func(master *url.URL) {
|
||||||
acc.AddError(m.gatherMainMetrics(c, ":5051", SLAVE, acc))
|
acc.AddError(m.gatherMainMetrics(master, MASTER, acc))
|
||||||
wg.Done()
|
wg.Done()
|
||||||
return
|
return
|
||||||
}(v)
|
}(master)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, slave := range m.slaveURLs {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(slave *url.URL) {
|
||||||
|
acc.AddError(m.gatherMainMetrics(slave, SLAVE, acc))
|
||||||
|
wg.Done()
|
||||||
|
return
|
||||||
|
}(slave)
|
||||||
|
|
||||||
// if !m.SlaveTasks {
|
// if !m.SlaveTasks {
|
||||||
// continue
|
// continue
|
||||||
@@ -121,7 +204,7 @@ func (m *Mesos) Gather(acc telegraf.Accumulator) error {
|
|||||||
|
|
||||||
// wg.Add(1)
|
// wg.Add(1)
|
||||||
// go func(c string) {
|
// go func(c string) {
|
||||||
// acc.AddError(m.gatherSlaveTaskMetrics(c, ":5051", acc))
|
// acc.AddError(m.gatherSlaveTaskMetrics(slave, acc))
|
||||||
// wg.Done()
|
// wg.Done()
|
||||||
// return
|
// return
|
||||||
// }(v)
|
// }(v)
|
||||||
@@ -132,6 +215,24 @@ func (m *Mesos) Gather(acc telegraf.Accumulator) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *Mesos) createHttpClient() (*http.Client, error) {
|
||||||
|
tlsCfg, err := internal.GetTLSConfig(
|
||||||
|
m.SSLCert, m.SSLKey, m.SSLCA, m.InsecureSkipVerify)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
client := &http.Client{
|
||||||
|
Transport: &http.Transport{
|
||||||
|
Proxy: http.ProxyFromEnvironment,
|
||||||
|
TLSClientConfig: tlsCfg,
|
||||||
|
},
|
||||||
|
Timeout: 4 * time.Second,
|
||||||
|
}
|
||||||
|
|
||||||
|
return client, nil
|
||||||
|
}
|
||||||
|
|
||||||
// metricsDiff() returns set names for removal
|
// metricsDiff() returns set names for removal
|
||||||
func metricsDiff(role Role, w []string) []string {
|
func metricsDiff(role Role, w []string) []string {
|
||||||
b := []string{}
|
b := []string{}
|
||||||
@@ -393,15 +494,6 @@ func (m *Mesos) filterMetrics(role Role, metrics *map[string]interface{}) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var tr = &http.Transport{
|
|
||||||
ResponseHeaderTimeout: time.Duration(3 * time.Second),
|
|
||||||
}
|
|
||||||
|
|
||||||
var client = &http.Client{
|
|
||||||
Transport: tr,
|
|
||||||
Timeout: time.Duration(4 * time.Second),
|
|
||||||
}
|
|
||||||
|
|
||||||
// TaskStats struct for JSON API output /monitor/statistics
|
// TaskStats struct for JSON API output /monitor/statistics
|
||||||
type TaskStats struct {
|
type TaskStats struct {
|
||||||
ExecutorID string `json:"executor_id"`
|
ExecutorID string `json:"executor_id"`
|
||||||
@@ -409,22 +501,15 @@ type TaskStats struct {
|
|||||||
Statistics map[string]interface{} `json:"statistics"`
|
Statistics map[string]interface{} `json:"statistics"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Mesos) gatherSlaveTaskMetrics(address string, defaultPort string, acc telegraf.Accumulator) error {
|
func (m *Mesos) gatherSlaveTaskMetrics(u *url.URL, acc telegraf.Accumulator) error {
|
||||||
var metrics []TaskStats
|
var metrics []TaskStats
|
||||||
|
|
||||||
host, _, err := net.SplitHostPort(address)
|
|
||||||
if err != nil {
|
|
||||||
host = address
|
|
||||||
address = address + defaultPort
|
|
||||||
}
|
|
||||||
|
|
||||||
tags := map[string]string{
|
tags := map[string]string{
|
||||||
"server": host,
|
"server": u.Hostname(),
|
||||||
|
"url": urlTag(u),
|
||||||
}
|
}
|
||||||
|
|
||||||
ts := strconv.Itoa(m.Timeout) + "ms"
|
resp, err := m.client.Get(withPath(u, "/monitor/statistics").String())
|
||||||
|
|
||||||
resp, err := client.Get("http://" + address + "/monitor/statistics?timeout=" + ts)
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -459,24 +544,31 @@ func (m *Mesos) gatherSlaveTaskMetrics(address string, defaultPort string, acc t
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// This should not belong to the object
|
func withPath(u *url.URL, path string) *url.URL {
|
||||||
func (m *Mesos) gatherMainMetrics(a string, defaultPort string, role Role, acc telegraf.Accumulator) error {
|
c := *u
|
||||||
var jsonOut map[string]interface{}
|
c.Path = path
|
||||||
|
return &c
|
||||||
host, _, err := net.SplitHostPort(a)
|
|
||||||
if err != nil {
|
|
||||||
host = a
|
|
||||||
a = a + defaultPort
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func urlTag(u *url.URL) string {
|
||||||
|
c := *u
|
||||||
|
c.Path = ""
|
||||||
|
c.User = nil
|
||||||
|
c.RawQuery = ""
|
||||||
|
return c.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
// This should not belong to the object
|
||||||
|
func (m *Mesos) gatherMainMetrics(u *url.URL, role Role, acc telegraf.Accumulator) error {
|
||||||
|
var jsonOut map[string]interface{}
|
||||||
|
|
||||||
tags := map[string]string{
|
tags := map[string]string{
|
||||||
"server": host,
|
"server": u.Hostname(),
|
||||||
|
"url": urlTag(u),
|
||||||
"role": string(role),
|
"role": string(role),
|
||||||
}
|
}
|
||||||
|
|
||||||
ts := strconv.Itoa(m.Timeout) + "ms"
|
resp, err := m.client.Get(withPath(u, "/metrics/snapshot").String())
|
||||||
|
|
||||||
resp, err := client.Get("http://" + a + "/metrics/snapshot?timeout=" + ts)
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|||||||
@@ -6,10 +6,12 @@ import (
|
|||||||
"math/rand"
|
"math/rand"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
var masterMetrics map[string]interface{}
|
var masterMetrics map[string]interface{}
|
||||||
@@ -378,3 +380,19 @@ func TestSlaveFilter(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestWithPathDoesNotModify(t *testing.T) {
|
||||||
|
u, err := url.Parse("http://localhost:5051")
|
||||||
|
require.NoError(t, err)
|
||||||
|
v := withPath(u, "/xyzzy")
|
||||||
|
require.Equal(t, u.String(), "http://localhost:5051")
|
||||||
|
require.Equal(t, v.String(), "http://localhost:5051/xyzzy")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestURLTagDoesNotModify(t *testing.T) {
|
||||||
|
u, err := url.Parse("http://a:b@localhost:5051?timeout=1ms")
|
||||||
|
require.NoError(t, err)
|
||||||
|
v := urlTag(u)
|
||||||
|
require.Equal(t, u.String(), "http://a:b@localhost:5051?timeout=1ms")
|
||||||
|
require.Equal(t, v, "http://localhost:5051")
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,47 +0,0 @@
|
|||||||
# Telegraf plugin: Spark
|
|
||||||
|
|
||||||
#### Plugin arguments:
|
|
||||||
- **spark_servers** []string: List of spark nodes with the format ["host:port"] (optional)
|
|
||||||
- **yarn_server** string: Address of Yarn resource manager (optional)
|
|
||||||
|
|
||||||
#### Description
|
|
||||||
|
|
||||||
The Spark plugin collects metrics in 2 ways, both being optional:
|
|
||||||
- Spark-JVM metrics exposed as MBean attributes through the Jolokia REST endpoint. Metrics are collected for each configured server. See: https://jolokia.org/
|
|
||||||
- Spark application metrics, if the applications are managed by the Yarn Resource Manager. The plugin collects these through the Yarn API. Application metrics are produced only when at least one Spark job has been submitted; otherwise no Spark application results are emitted.
|
|
||||||
|
|
||||||
# Measurements:
|
|
||||||
Spark plugin produces one or more measurements according to the SparkServer or YarnServer provided.
|
|
||||||
|
|
||||||
Given a configuration like:
|
|
||||||
|
|
||||||
```toml
|
|
||||||
[[inputs.spark]]
|
|
||||||
spark_servers = ["127.0.0.1:8778"]
|
|
||||||
yarn_server = "127.0.0.1:8088"
|
|
||||||
```
|
|
||||||
|
|
||||||
The maximum collected measurements will be:
|
|
||||||
|
|
||||||
```
|
|
||||||
spark_HeapMemoryUsage , spark_Threading , spark_apps , spark_clusterInfo , spark_clusterMetrics , spark_jolokiaMetrics , spark_jvmMetrics , spark_nodes
|
|
||||||
```
|
|
||||||
|
|
||||||
# Useful Metrics:
|
|
||||||
|
|
||||||
Here is a list of metrics that are fetched and might be useful to monitor your spark cluster.
|
|
||||||
|
|
||||||
#### Measurement domains collected through Jolokia
|
|
||||||
|
|
||||||
- "/metrics:*"
|
|
||||||
- "/java.lang:type=Memory/HeapMemoryUsage"
|
|
||||||
- "/java.lang:type=Threading"
|
|
||||||
|
|
||||||
#### Measurement domains collected through YarnServer
|
|
||||||
- "/cluster"
|
|
||||||
- "/cluster/metrics"
|
|
||||||
- "/cluster/apps"
|
|
||||||
- "/cluster/nodes"
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@@ -1,437 +0,0 @@
|
|||||||
package spark
|
|
||||||
|
|
||||||
import (
|
|
||||||
"encoding/json"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"io/ioutil"
|
|
||||||
"log"
|
|
||||||
"net/http"
|
|
||||||
"net/url"
|
|
||||||
"strings"
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
|
||||||
"github.com/influxdata/telegraf/plugins/inputs"
|
|
||||||
)
|
|
||||||
|
|
||||||
type JolokiaClient interface {
|
|
||||||
MakeRequest(req *http.Request) (*http.Response, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
type JolokiaClientImpl struct {
|
|
||||||
client *http.Client
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c JolokiaClientImpl) MakeRequest(req *http.Request) (*http.Response, error) {
|
|
||||||
return c.client.Do(req)
|
|
||||||
}
|
|
||||||
|
|
||||||
type YarnClient interface {
|
|
||||||
MakeRequest(req *http.Request) (*http.Response, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
type YarnClientImpl struct {
|
|
||||||
client *http.Client
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c YarnClientImpl) MakeRequest(req *http.Request) (*http.Response, error) {
|
|
||||||
return c.client.Do(req)
|
|
||||||
}
|
|
||||||
|
|
||||||
type Spark struct {
|
|
||||||
jClient JolokiaClient
|
|
||||||
spark_servers []string
|
|
||||||
yarn_server string
|
|
||||||
}
|
|
||||||
|
|
||||||
type javaMetric struct {
|
|
||||||
host string
|
|
||||||
metric string
|
|
||||||
acc telegraf.Accumulator
|
|
||||||
}
|
|
||||||
|
|
||||||
type sparkMetric struct {
|
|
||||||
host string
|
|
||||||
metric string
|
|
||||||
acc telegraf.Accumulator
|
|
||||||
}
|
|
||||||
|
|
||||||
type Yarn struct {
|
|
||||||
yClient YarnClient
|
|
||||||
serverAddress string
|
|
||||||
}
|
|
||||||
|
|
||||||
type yarnMetric struct {
|
|
||||||
host string
|
|
||||||
acc telegraf.Accumulator
|
|
||||||
}
|
|
||||||
|
|
||||||
type jmxMetric interface {
|
|
||||||
addTagsFields(out map[string]interface{})
|
|
||||||
}
|
|
||||||
|
|
||||||
func newJavaMetric(host string, metric string,
|
|
||||||
acc telegraf.Accumulator) *javaMetric {
|
|
||||||
return &javaMetric{host: host, metric: metric, acc: acc}
|
|
||||||
}
|
|
||||||
|
|
||||||
func newSparkMetric(host string, metric string,
|
|
||||||
acc telegraf.Accumulator) *sparkMetric {
|
|
||||||
return &sparkMetric{host: host, metric: metric, acc: acc}
|
|
||||||
}
|
|
||||||
|
|
||||||
func newYarnMetric(host string, acc telegraf.Accumulator) *yarnMetric {
|
|
||||||
return &yarnMetric{host: host, acc: acc}
|
|
||||||
}
|
|
||||||
|
|
||||||
func addValuesAsFields(values map[string]interface{}, fields map[string]interface{},
|
|
||||||
mname string) {
|
|
||||||
for k, v := range values {
|
|
||||||
if v != nil {
|
|
||||||
fields[mname+"_"+k] = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func parseJmxMetricRequest(mbean string) map[string]string {
|
|
||||||
tokens := make(map[string]string)
|
|
||||||
classAndPairs := strings.Split(mbean, ":")
|
|
||||||
if classAndPairs[0] == "metrics" {
|
|
||||||
tokens["class"] = "spark_jolokia_metrics"
|
|
||||||
} else if classAndPairs[0] == "java.lang" {
|
|
||||||
tokens["class"] = "java"
|
|
||||||
} else {
|
|
||||||
return tokens
|
|
||||||
}
|
|
||||||
|
|
||||||
pair := strings.Split(classAndPairs[1], "=")
|
|
||||||
tokens[pair[0]] = pair[1]
|
|
||||||
|
|
||||||
return tokens
|
|
||||||
}
|
|
||||||
|
|
||||||
func addTokensToTags(tokens map[string]string, tags map[string]string) {
|
|
||||||
for k, v := range tokens {
|
|
||||||
if k == "name" {
|
|
||||||
tags["mname"] = v // name seems to a reserved word in influxdb
|
|
||||||
} else if k == "class" || k == "type" {
|
|
||||||
continue // class and type are used in the metric name
|
|
||||||
} else {
|
|
||||||
tags[k] = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func addJavaMetric(class string, c *javaMetric,
|
|
||||||
values map[string]interface{}) {
|
|
||||||
|
|
||||||
tags := make(map[string]string)
|
|
||||||
fields := make(map[string]interface{})
|
|
||||||
tags["spark_host"] = c.host
|
|
||||||
tags["spark_class"] = class
|
|
||||||
|
|
||||||
if class == "spark_threading" {
|
|
||||||
list := []string{"PeakThreadCount", "CurrentThreadCpuTime", "DaemonThreadCount", "TotalStartedThreadCount", "CurrentThreadUserTime", "ThreadCount"}
|
|
||||||
for _, value := range list {
|
|
||||||
if values[value] != nil {
|
|
||||||
fields[value] = values[value]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
for k, v := range values {
|
|
||||||
if v != nil {
|
|
||||||
fields[k] = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
c.acc.AddFields(class, fields, tags)
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
func (j *javaMetric) addTagsFields(out map[string]interface{}) {
|
|
||||||
request := out["request"].(map[string]interface{})
|
|
||||||
var mbean = request["mbean"].(string)
|
|
||||||
var mbeansplit = strings.Split(mbean, "=")
|
|
||||||
var class = mbeansplit[1]
|
|
||||||
|
|
||||||
if valuesMap, ok := out["value"]; ok {
|
|
||||||
if class == "Memory" {
|
|
||||||
addJavaMetric("spark_heap_memory_usage", j, valuesMap.(map[string]interface{}))
|
|
||||||
} else if class == "Threading" {
|
|
||||||
addJavaMetric("spark_threading", j, valuesMap.(map[string]interface{}))
|
|
||||||
} else {
|
|
||||||
fmt.Printf("Missing key in '%s' output response\n%v\n",
|
|
||||||
j.metric, out)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func addSparkMetric(mbean string, c *sparkMetric,
|
|
||||||
values map[string]interface{}) {
|
|
||||||
|
|
||||||
tags := make(map[string]string)
|
|
||||||
fields := make(map[string]interface{})
|
|
||||||
|
|
||||||
tokens := parseJmxMetricRequest(mbean)
|
|
||||||
addTokensToTags(tokens, tags)
|
|
||||||
tags["spark_host"] = c.host
|
|
||||||
|
|
||||||
addValuesAsFields(values, fields, tags["mname"])
|
|
||||||
c.acc.AddFields(tokens["class"]+tokens["type"], fields, tags)
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *sparkMetric) addTagsFields(out map[string]interface{}) {
|
|
||||||
if valuesMap, ok := out["value"]; ok {
|
|
||||||
for k, v := range valuesMap.(map[string]interface{}) {
|
|
||||||
addSparkMetric(k, c, v.(map[string]interface{}))
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
fmt.Printf("Missing key 'value' in '%s' output response\n%v\n",
|
|
||||||
c.metric, out)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
func addYarnMetric(c *yarnMetric, value map[string]interface{}, metrictype string) {
|
|
||||||
|
|
||||||
tags := make(map[string]string)
|
|
||||||
fields := make(map[string]interface{})
|
|
||||||
tags["yarn_host"] = c.host
|
|
||||||
for key, val := range value {
|
|
||||||
fields[key] = val
|
|
||||||
}
|
|
||||||
c.acc.AddFields(metrictype, fields, tags)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *yarnMetric) addTagsFields(out map[string]interface{}) {
|
|
||||||
|
|
||||||
if valuesMap, ok := out["clusterMetrics"]; ok {
|
|
||||||
addYarnMetric(c, valuesMap.(map[string]interface{}), "spark_cluster_metrics")
|
|
||||||
} else if valuesMap, ok := out["clusterInfo"]; ok {
|
|
||||||
addYarnMetric(c, valuesMap.(map[string]interface{}), "spark_cluster_info")
|
|
||||||
} else if valuesMap, ok := out["apps"]; ok {
|
|
||||||
for _, value := range valuesMap.(map[string]interface{}) {
|
|
||||||
for _, vv := range value.([]interface{}) {
|
|
||||||
addYarnMetric(c, vv.(map[string]interface{}), "spark_apps")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else if valuesMap, ok := out["nodes"]; ok {
|
|
||||||
for _, value := range valuesMap.(map[string]interface{}) {
|
|
||||||
for _, vv := range value.([]interface{}) {
|
|
||||||
addYarnMetric(c, vv.(map[string]interface{}), "spark_nodes")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
fmt.Printf("Missing the required key in output response\n%v\n", out)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
func (j *Spark) SampleConfig() string {
|
|
||||||
return `
|
|
||||||
## Spark servers exposing jolokia read service
|
|
||||||
#spark_servers = ["127.0.0.1:8778"] #optional
|
|
||||||
## Server running Yarn Resource Manager
|
|
||||||
#yarn_server = "127.0.0.1:8088" #optional
|
|
||||||
`
|
|
||||||
}
|
|
||||||
|
|
||||||
func (j *Spark) Description() string {
|
|
||||||
return "Read Spark metrics through Jolokia and Yarn"
|
|
||||||
}
|
|
||||||
|
|
||||||
func (j *Spark) getAttr(requestUrl *url.URL) (map[string]interface{}, error) {
|
|
||||||
// Create + send request
|
|
||||||
req, err := http.NewRequest("GET", requestUrl.String(), nil)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
resp, err := j.jClient.MakeRequest(req)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer resp.Body.Close()
|
|
||||||
|
|
||||||
// Process response
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
|
||||||
err = fmt.Errorf("Response from url \"%s\" has status code %d (%s), expected %d (%s)",
|
|
||||||
requestUrl,
|
|
||||||
resp.StatusCode,
|
|
||||||
http.StatusText(resp.StatusCode),
|
|
||||||
http.StatusOK,
|
|
||||||
http.StatusText(http.StatusOK))
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// read body
|
|
||||||
body, err := ioutil.ReadAll(resp.Body)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
// Unmarshal json
|
|
||||||
var jsonOut map[string]interface{}
|
|
||||||
if err = json.Unmarshal([]byte(body), &jsonOut); err != nil {
|
|
||||||
return nil, errors.New("Error decoding JSON response")
|
|
||||||
}
|
|
||||||
|
|
||||||
return jsonOut, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (j *Yarn) getAttr(requestUrl *url.URL) (map[string]interface{}, error) {
|
|
||||||
req, err := http.NewRequest("GET", requestUrl.String(), nil)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
resp, err := j.yClient.MakeRequest(req)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer resp.Body.Close()
|
|
||||||
|
|
||||||
// Process response
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
|
||||||
err = fmt.Errorf("Response from url \"%s\" has status code %d (%s), expected %d (%s)",
|
|
||||||
requestUrl,
|
|
||||||
resp.StatusCode,
|
|
||||||
http.StatusText(resp.StatusCode),
|
|
||||||
http.StatusOK,
|
|
||||||
http.StatusText(http.StatusOK))
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// read body
|
|
||||||
body, err := ioutil.ReadAll(resp.Body)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Unmarshal json
|
|
||||||
var jsonOut map[string]interface{}
|
|
||||||
if err = json.Unmarshal([]byte(body), &jsonOut); err != nil {
|
|
||||||
return nil, errors.New("Error decoding JSON response")
|
|
||||||
}
|
|
||||||
|
|
||||||
return jsonOut, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func parseServerTokens(server string) map[string]string {
|
|
||||||
serverTokens := make(map[string]string)
|
|
||||||
log.Printf("Parsing %s", server)
|
|
||||||
hostAndUser := strings.Split(server, "@")
|
|
||||||
hostPort := ""
|
|
||||||
|
|
||||||
if len(hostAndUser) == 1 {
|
|
||||||
hostPort = hostAndUser[0]
|
|
||||||
} else {
|
|
||||||
log.Printf("Unsupported Server info, skipping")
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
log.Printf("%s \n", hostPort)
|
|
||||||
hostTokens := strings.Split(hostPort, ":")
|
|
||||||
serverTokens["host"] = hostTokens[0]
|
|
||||||
serverTokens["port"] = hostTokens[1]
|
|
||||||
return serverTokens
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Spark) GatherJolokia(acc telegraf.Accumulator, wg *sync.WaitGroup) error {
|
|
||||||
context := "/jolokia/read"
|
|
||||||
servers := c.spark_servers
|
|
||||||
metrics := [...]string{"/metrics:*", "/java.lang:type=Memory/HeapMemoryUsage", "/java.lang:type=Threading"}
|
|
||||||
if len(servers) == 0 {
|
|
||||||
wg.Done()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
for _, server := range servers {
|
|
||||||
for _, metric := range metrics {
|
|
||||||
serverTokens := parseServerTokens(server)
|
|
||||||
var m jmxMetric
|
|
||||||
if strings.HasPrefix(metric, "/java.lang:") {
|
|
||||||
m = newJavaMetric(serverTokens["host"], metric, acc)
|
|
||||||
} else if strings.HasPrefix(metric, "/metrics:") {
|
|
||||||
m = newSparkMetric(serverTokens["host"], metric, acc)
|
|
||||||
} else {
|
|
||||||
log.Printf("Unsupported Spark metric [%s], skipping",
|
|
||||||
metric)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
requestUrl, err := url.Parse("http://" + serverTokens["host"] + ":" +
|
|
||||||
serverTokens["port"] + context + metric)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
out, err := c.getAttr(requestUrl)
|
|
||||||
if len(out) == 0 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
m.addTagsFields(out)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
wg.Done()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Yarn) GatherYarn(acc telegraf.Accumulator, wg *sync.WaitGroup) error {
|
|
||||||
contexts := [...]string{"/ws/v1/cluster", "/ws/v1/cluster/metrics", "/ws/v1/cluster/apps", "/ws/v1/cluster/nodes"}
|
|
||||||
server := c.serverAddress
|
|
||||||
|
|
||||||
if server == "" {
|
|
||||||
wg.Done()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
serverTokens := parseServerTokens(server)
|
|
||||||
for _, context := range contexts {
|
|
||||||
var m = newYarnMetric(server, acc)
|
|
||||||
requestUrl, err := url.Parse("http://" + serverTokens["host"] + ":" + serverTokens["port"] + context)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
out, err := c.getAttr(requestUrl)
|
|
||||||
if len(out) == 0 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
m.addTagsFields(out)
|
|
||||||
|
|
||||||
}
|
|
||||||
wg.Done()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Spark) Gather(acc telegraf.Accumulator) error {
|
|
||||||
|
|
||||||
log.Println("Config is ", c)
|
|
||||||
yarn := Yarn{
|
|
||||||
yClient: &YarnClientImpl{client: &http.Client{}},
|
|
||||||
serverAddress: c.yarn_server,
|
|
||||||
}
|
|
||||||
wg := sync.WaitGroup{}
|
|
||||||
wg.Add(1)
|
|
||||||
go yarn.GatherYarn(acc, &wg)
|
|
||||||
spark := Spark{
|
|
||||||
jClient: &JolokiaClientImpl{client: &http.Client{}},
|
|
||||||
spark_servers: c.spark_servers,
|
|
||||||
}
|
|
||||||
wg.Add(1)
|
|
||||||
go spark.GatherJolokia(acc, &wg)
|
|
||||||
wg.Wait()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func init() {
|
|
||||||
inputs.Add("spark", func() telegraf.Input {
|
|
||||||
return &Spark{jClient: &JolokiaClientImpl{client: &http.Client{}}}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
@@ -1,23 +0,0 @@
|
|||||||
package spark
|
|
||||||
|
|
||||||
import (
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/influxdata/telegraf/testutil"
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestSparkMeasurements(t *testing.T) {
|
|
||||||
if testing.Short() {
|
|
||||||
t.Skip("Skipping integration test in short mode")
|
|
||||||
}
|
|
||||||
|
|
||||||
a := &Spark{
|
|
||||||
yarn_server: testutil.GetLocalHost() + ":8088",
|
|
||||||
}
|
|
||||||
|
|
||||||
var acc testutil.Accumulator
|
|
||||||
|
|
||||||
err := a.Gather(&acc)
|
|
||||||
require.NoError(t, err)
|
|
||||||
}
|
|
||||||
35
plugins/inputs/system/HUGEPAGES.md
Normal file
35
plugins/inputs/system/HUGEPAGES.md
Normal file
@@ -0,0 +1,35 @@
|
|||||||
|
# Hugepages Input Plugin
|
||||||
|
|
||||||
|
The hugepages plugin gathers hugepages metrics, both system-wide (from the meminfo file) and per NUMA node.
|
||||||
|
|
||||||
|
### Configuration:
|
||||||
|
|
||||||
|
```toml
|
||||||
|
# Collects hugepages metrics from kernel and per NUMA node
|
||||||
|
[[inputs.hugepages]]
|
||||||
|
## Path to a NUMA nodes
|
||||||
|
# numa_node_path = "/sys/devices/system/node"
|
||||||
|
## Path to a meminfo file
|
||||||
|
# meminfo_path = "/proc/meminfo"
|
||||||
|
```
|
||||||
|
|
||||||
|
### Measurements & Fields:
|
||||||
|
|
||||||
|
- hugepages
|
||||||
|
    - free (int, number of pages)
    - nr (int, number of pages)
    - HugePages_Total (int, number of pages)
    - HugePages_Free (int, number of pages)
|
||||||
|
|
||||||
|
### Tags:
|
||||||
|
|
||||||
|
- hugepages has the following tags:
|
||||||
|
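    - node (NUMA node directory name, e.g. `node0`; set on the per-node series)
    - name (set to `meminfo` on the system-wide series read from the meminfo file)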
|
||||||
|
|
||||||
|
### Example Output:
|
||||||
|
|
||||||
|
```
|
||||||
|
$ ./telegraf -config telegraf.conf -input-filter hugepages -test
|
||||||
|
> hugepages,host=maxpc,node=node0 free=0i,nr=0i 1467618621000000000
|
||||||
|
> hugepages,host=maxpc,name=meminfo HugePages_Free=0i,HugePages_Total=0i 1467618621000000000
|
||||||
|
```
|
||||||
184
plugins/inputs/system/hugepages.go
Normal file
184
plugins/inputs/system/hugepages.go
Normal file
@@ -0,0 +1,184 @@
|
|||||||
|
package system
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/plugins/inputs"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Byte-slice tokens used while parsing the kernel files.

// newlineByte separates meminfo lines and is trimmed from the per-node
// counter files.
var newlineByte = []byte("\n")

// colonByte is the suffix trimmed from meminfo field names.
var colonByte = []byte(":")

// kbPrecisionByte marks meminfo lines that carry a kB unit.
var kbPrecisionByte = []byte("kB")
|
||||||
|
|
||||||
|
// Default locations of the files the plugin reads.
const (
	// NUMA_NODE_PATH is the directory where statistics are kept per NUMA node.
	NUMA_NODE_PATH = "/sys/devices/system/node"
	// MEMINFO_PATH is the meminfo file produced by the kernel.
	MEMINFO_PATH = "/proc/meminfo"
)

// Names of the hugepages fields read from the meminfo file.
const (
	HUGE_PAGES_TOTAL = "HugePages_Total"
	HUGE_PAGES_FREE  = "HugePages_Free"
)
|
||||||
|
|
||||||
|
// hugepagesSampleConfig is the sample configuration returned by SampleConfig.
// Both options are shown commented out; the values shown match the
// NUMA_NODE_PATH and MEMINFO_PATH defaults.
var hugepagesSampleConfig = `
|
||||||
|
## Path to a NUMA nodes
|
||||||
|
# numa_node_path = "/sys/devices/system/node"
|
||||||
|
## Path to a meminfo file
|
||||||
|
# meminfo_path = "/proc/meminfo"
|
||||||
|
`
|
||||||
|
|
||||||
|
// Hugepages is the input plugin configuration. Both paths are read from the
// TOML keys named in the struct tags; loadPaths replaces an empty value with
// the corresponding default.
type Hugepages struct {
	// NUMANodePath is the directory scanned for per-NUMA-node statistics.
	NUMANodePath string `toml:"numa_node_path"`

	// MeminfoPath is the meminfo file the system-wide counters are read from.
	MeminfoPath string `toml:"meminfo_path"`
}
|
||||||
|
|
||||||
|
func (mem *Hugepages) Description() string {
|
||||||
|
return "Collects hugepages metrics from kernel and per NUMA node"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mem *Hugepages) SampleConfig() string {
|
||||||
|
return hugepagesSampleConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mem *Hugepages) Gather(acc telegraf.Accumulator) error {
|
||||||
|
mem.loadPaths()
|
||||||
|
err := mem.GatherStatsPerNode(acc)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = mem.GatherStatsFromMeminfo(acc)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GatherHugepagesStatsPerNode collects hugepages stats per NUMA nodes
|
||||||
|
func (mem *Hugepages) GatherStatsPerNode(acc telegraf.Accumulator) error {
|
||||||
|
numaNodeMetrics, err := statsPerNUMA(mem.NUMANodePath)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for k, v := range numaNodeMetrics {
|
||||||
|
metrics := make(map[string]interface{})
|
||||||
|
tags := map[string]string{
|
||||||
|
"node": k,
|
||||||
|
}
|
||||||
|
metrics["free"] = v.Free
|
||||||
|
metrics["nr"] = v.NR
|
||||||
|
acc.AddFields("hugepages", metrics, tags)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GatherHugepagesStatsFromMeminfo collects hugepages statistics from meminfo file
|
||||||
|
func (mem *Hugepages) GatherStatsFromMeminfo(acc telegraf.Accumulator) error {
|
||||||
|
tags := map[string]string{
|
||||||
|
"name": "meminfo",
|
||||||
|
}
|
||||||
|
metrics := make(map[string]interface{})
|
||||||
|
meminfoMetrics, err := statsFromMeminfo(mem.MeminfoPath)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for k, v := range meminfoMetrics {
|
||||||
|
metrics[k] = v
|
||||||
|
}
|
||||||
|
acc.AddFields("hugepages", metrics, tags)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// HugepagesNUMAStats holds the two counters read for one NUMA node.
type HugepagesNUMAStats struct {
	// Free is the value read from the node's free_hugepages file.
	Free int

	// NR is the value read from the node's nr_hugepages file.
	NR int
}
|
||||||
|
|
||||||
|
// statsPerNUMA gathers hugepages statistics from each NUMA node
|
||||||
|
func statsPerNUMA(path string) (map[string]HugepagesNUMAStats, error) {
|
||||||
|
var hugepagesStats = make(map[string]HugepagesNUMAStats)
|
||||||
|
dirs, err := ioutil.ReadDir(path)
|
||||||
|
if err != nil {
|
||||||
|
return hugepagesStats, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, d := range dirs {
|
||||||
|
if !(d.IsDir() && strings.HasPrefix(d.Name(), "node")) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
hugepagesFree := fmt.Sprintf("%s/%s/hugepages/hugepages-2048kB/free_hugepages", path, d.Name())
|
||||||
|
hugepagesNR := fmt.Sprintf("%s/%s/hugepages/hugepages-2048kB/nr_hugepages", path, d.Name())
|
||||||
|
|
||||||
|
free, err := ioutil.ReadFile(hugepagesFree)
|
||||||
|
if err != nil {
|
||||||
|
return hugepagesStats, err
|
||||||
|
}
|
||||||
|
|
||||||
|
nr, err := ioutil.ReadFile(hugepagesNR)
|
||||||
|
if err != nil {
|
||||||
|
return hugepagesStats, err
|
||||||
|
}
|
||||||
|
|
||||||
|
f, _ := strconv.Atoi(string(bytes.TrimSuffix(free, newlineByte)))
|
||||||
|
n, _ := strconv.Atoi(string(bytes.TrimSuffix(nr, newlineByte)))
|
||||||
|
|
||||||
|
hugepagesStats[d.Name()] = HugepagesNUMAStats{Free: f, NR: n}
|
||||||
|
|
||||||
|
}
|
||||||
|
return hugepagesStats, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// statsFromMeminfo gathers hugepages statistics from kernel
|
||||||
|
func statsFromMeminfo(path string) (map[string]interface{}, error) {
|
||||||
|
stats := map[string]interface{}{}
|
||||||
|
meminfo, err := ioutil.ReadFile(path)
|
||||||
|
if err != nil {
|
||||||
|
return stats, err
|
||||||
|
}
|
||||||
|
lines := bytes.Split(meminfo, newlineByte)
|
||||||
|
for _, l := range lines {
|
||||||
|
if bytes.Contains(l, kbPrecisionByte) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
fields := bytes.Fields(l)
|
||||||
|
if len(fields) < 2 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
fieldName := string(bytes.TrimSuffix(fields[0], colonByte))
|
||||||
|
if fieldName == HUGE_PAGES_TOTAL || fieldName == HUGE_PAGES_FREE {
|
||||||
|
val, _ := strconv.Atoi(string(fields[1]))
|
||||||
|
stats[fieldName] = val
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return stats, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// loadPaths can be used to read paths firstly from config
|
||||||
|
// if it is empty then try read from env variables
|
||||||
|
func (mem *Hugepages) loadPaths() {
|
||||||
|
if mem.NUMANodePath == "" {
|
||||||
|
mem.NUMANodePath = NUMA_NODE_PATH
|
||||||
|
}
|
||||||
|
if mem.MeminfoPath == "" {
|
||||||
|
mem.MeminfoPath = MEMINFO_PATH
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
inputs.Add("hugepages", func() telegraf.Input {
|
||||||
|
return &Hugepages{}
|
||||||
|
})
|
||||||
|
}
|
||||||
43
plugins/inputs/system/hugepages_test.go
Normal file
43
plugins/inputs/system/hugepages_test.go
Normal file
@@ -0,0 +1,43 @@
|
|||||||
|
package system
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf/testutil"
|
||||||
|
)
|
||||||
|
|
||||||
|
var hugepages = Hugepages{
|
||||||
|
NUMANodePath: "./testdata/node",
|
||||||
|
MeminfoPath: "./testdata/meminfo",
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
hugepages.loadPaths()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHugepagesStatsFromMeminfo(t *testing.T) {
|
||||||
|
acc := &testutil.Accumulator{}
|
||||||
|
err := hugepages.GatherStatsFromMeminfo(acc)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
fields := map[string]interface{}{
|
||||||
|
"HugePages_Total": int(666),
|
||||||
|
"HugePages_Free": int(999),
|
||||||
|
}
|
||||||
|
acc.AssertContainsFields(t, "hugepages", fields)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHugepagesStatsPerNode(t *testing.T) {
|
||||||
|
acc := &testutil.Accumulator{}
|
||||||
|
err := hugepages.GatherStatsPerNode(acc)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
fields := map[string]interface{}{
|
||||||
|
"free": int(123),
|
||||||
|
"nr": int(456),
|
||||||
|
}
|
||||||
|
acc.AssertContainsFields(t, "hugepages", fields)
|
||||||
|
}
|
||||||
45
plugins/inputs/system/testdata/meminfo
vendored
Normal file
45
plugins/inputs/system/testdata/meminfo
vendored
Normal file
@@ -0,0 +1,45 @@
|
|||||||
|
MemTotal: 5961204 kB
|
||||||
|
MemFree: 5201764 kB
|
||||||
|
MemAvailable: 5761624 kB
|
||||||
|
Buffers: 120248 kB
|
||||||
|
Cached: 419660 kB
|
||||||
|
SwapCached: 0 kB
|
||||||
|
Active: 370656 kB
|
||||||
|
Inactive: 247364 kB
|
||||||
|
Active(anon): 79032 kB
|
||||||
|
Inactive(anon): 552 kB
|
||||||
|
Active(file): 291624 kB
|
||||||
|
Inactive(file): 246812 kB
|
||||||
|
Unevictable: 0 kB
|
||||||
|
Mlocked: 0 kB
|
||||||
|
SwapTotal: 0 kB
|
||||||
|
SwapFree: 0 kB
|
||||||
|
Dirty: 4 kB
|
||||||
|
Writeback: 0 kB
|
||||||
|
AnonPages: 78164 kB
|
||||||
|
Mapped: 114164 kB
|
||||||
|
Shmem: 1452 kB
|
||||||
|
Slab: 77180 kB
|
||||||
|
SReclaimable: 57676 kB
|
||||||
|
SUnreclaim: 19504 kB
|
||||||
|
KernelStack: 2832 kB
|
||||||
|
PageTables: 8692 kB
|
||||||
|
NFS_Unstable: 0 kB
|
||||||
|
Bounce: 0 kB
|
||||||
|
WritebackTmp: 0 kB
|
||||||
|
CommitLimit: 2980600 kB
|
||||||
|
Committed_AS: 555492 kB
|
||||||
|
VmallocTotal: 34359738367 kB
|
||||||
|
VmallocUsed: 0 kB
|
||||||
|
VmallocChunk: 0 kB
|
||||||
|
HardwareCorrupted: 0 kB
|
||||||
|
AnonHugePages: 0 kB
|
||||||
|
CmaTotal: 0 kB
|
||||||
|
CmaFree: 0 kB
|
||||||
|
HugePages_Total: 666
|
||||||
|
HugePages_Free: 999
|
||||||
|
HugePages_Rsvd: 0
|
||||||
|
HugePages_Surp: 0
|
||||||
|
Hugepagesize: 2048 kB
|
||||||
|
DirectMap4k: 84224 kB
|
||||||
|
DirectMap2M: 6055936 kB
|
||||||
1
plugins/inputs/system/testdata/node/node0/hugepages/hugepages-2048kB/free_hugepages
vendored
Normal file
1
plugins/inputs/system/testdata/node/node0/hugepages/hugepages-2048kB/free_hugepages
vendored
Normal file
@@ -0,0 +1 @@
|
|||||||
|
123
|
||||||
1
plugins/inputs/system/testdata/node/node0/hugepages/hugepages-2048kB/nr_hugepages
vendored
Normal file
1
plugins/inputs/system/testdata/node/node0/hugepages/hugepages-2048kB/nr_hugepages
vendored
Normal file
@@ -0,0 +1 @@
|
|||||||
|
456
|
||||||
Reference in New Issue
Block a user