Compare commits

...

4 Commits

Author SHA1 Message Date
Russ Savage
af5017d3dc reviving code from #1447 to a new branch 2018-02-08 15:54:33 -08:00
Daniel Nelson
32dd1b3725 Adjust time of nightly build 2018-02-07 18:37:33 -08:00
Daniel Nelson
1b0e87a8b0 Update changelog 2018-02-07 18:37:32 -08:00
Daniel Nelson
efa9095829 Add TLS support to the mesos input plugin (#3769) 2018-02-07 18:36:38 -08:00
11 changed files with 481 additions and 53 deletions

View File

@@ -42,7 +42,7 @@ workflows:
- 'build'
triggers:
- schedule:
cron: "0 0 * * *"
cron: "0 18 * * *"
filters:
branches:
only:

View File

@@ -56,6 +56,7 @@
- [#3618](https://github.com/influxdata/telegraf/pull/3618): Add new sqlserver output data model.
- [#3559](https://github.com/influxdata/telegraf/pull/3559): Add native Go method for finding pids to procstat.
- [#3722](https://github.com/influxdata/telegraf/pull/3722): Add additional metrics and reverse metric names option to openldap.
- [#3769](https://github.com/influxdata/telegraf/pull/3769): Add TLS support to the mesos input plugin.
### Bugfixes

View File

@@ -11,7 +11,7 @@ For more information, please check the [Mesos Observability Metrics](http://meso
## Timeout, in ms.
timeout = 100
## A list of Mesos masters.
masters = ["localhost:5050"]
masters = ["http://localhost:5050"]
## Master metrics groups to be collected, by default, all enabled.
master_collections = [
"resources",
@@ -35,6 +35,13 @@ For more information, please check the [Mesos Observability Metrics](http://meso
# "tasks",
# "messages",
# ]
## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
## Use SSL but skip chain & host verification
# insecure_skip_verify = false
```
By default this plugin is not configured to gather metrics from mesos. Since a mesos cluster can be deployed in numerous ways it does not provide any default
@@ -235,7 +242,8 @@ Mesos slave metric groups
### Tags:
- All master/slave measurements have the following tags:
- server
- server (network location of server: `host:port`)
- url (URL origin of server: `scheme://host:port`)
- role (master/slave)
- All master measurements have the extra tags:

View File

@@ -7,11 +7,14 @@ import (
"log"
"net"
"net/http"
"net/url"
"strconv"
"strings"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
jsonparser "github.com/influxdata/telegraf/plugins/parsers/json"
)
@@ -30,6 +33,20 @@ type Mesos struct {
Slaves []string
SlaveCols []string `toml:"slave_collections"`
//SlaveTasks bool
// Path to CA file
SSLCA string `toml:"ssl_ca"`
// Path to host cert file
SSLCert string `toml:"ssl_cert"`
// Path to cert key file
SSLKey string `toml:"ssl_key"`
// Use SSL but skip chain & host verification
InsecureSkipVerify bool
initialized bool
client *http.Client
masterURLs []*url.URL
slaveURLs []*url.URL
}
var allMetrics = map[Role][]string{
@@ -41,7 +58,7 @@ var sampleConfig = `
## Timeout, in ms.
timeout = 100
## A list of Mesos masters.
masters = ["localhost:5050"]
masters = ["http://localhost:5050"]
## Master metrics groups to be collected, by default, all enabled.
master_collections = [
"resources",
@@ -65,6 +82,13 @@ var sampleConfig = `
# "tasks",
# "messages",
# ]
## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
## Use SSL but skip chain & host verification
# insecure_skip_verify = false
`
// SampleConfig returns a sample configuration block
@@ -77,7 +101,28 @@ func (m *Mesos) Description() string {
return "Telegraf plugin for gathering metrics from N Mesos masters"
}
func (m *Mesos) SetDefaults() {
// parseURL converts a configured server address into a *url.URL. Bare
// "host[:port]" values (no scheme) are upgraded to "http://host:port",
// filling in the default Mesos port for the given role when none is set.
func parseURL(s string, role Role) (*url.URL, error) {
	hasScheme := strings.HasPrefix(s, "http://") || strings.HasPrefix(s, "https://")
	if !hasScheme {
		host, port, err := net.SplitHostPort(s)
		// no port specified
		if err != nil {
			host = s
			if role == MASTER {
				port = "5050"
			} else if role == SLAVE {
				port = "5051"
			}
		}
		s = "http://" + host + ":" + port
		log.Printf("W! [inputs.mesos] Using %q as connection URL; please update your configuration to use an URL", s)
	}
	return url.Parse(s)
}
func (m *Mesos) initialize() error {
if len(m.MasterCols) == 0 {
m.MasterCols = allMetrics[MASTER]
}
@@ -87,33 +132,71 @@ func (m *Mesos) SetDefaults() {
}
if m.Timeout == 0 {
log.Println("I! [mesos] Missing timeout value, setting default value (100ms)")
log.Println("I! [inputs.mesos] Missing timeout value, setting default value (100ms)")
m.Timeout = 100
}
rawQuery := "timeout=" + strconv.Itoa(m.Timeout) + "ms"
m.masterURLs = make([]*url.URL, 0, len(m.Masters))
for _, master := range m.Masters {
u, err := parseURL(master, MASTER)
if err != nil {
return err
}
u.RawQuery = rawQuery
m.masterURLs = append(m.masterURLs, u)
}
m.slaveURLs = make([]*url.URL, 0, len(m.Slaves))
for _, slave := range m.Slaves {
u, err := parseURL(slave, SLAVE)
if err != nil {
return err
}
u.RawQuery = rawQuery
m.slaveURLs = append(m.slaveURLs, u)
}
client, err := m.createHttpClient()
if err != nil {
return err
}
m.client = client
return nil
}
// Gather() metrics from given list of Mesos Masters
func (m *Mesos) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup
m.SetDefaults()
for _, v := range m.Masters {
wg.Add(1)
go func(c string) {
acc.AddError(m.gatherMainMetrics(c, ":5050", MASTER, acc))
wg.Done()
return
}(v)
if !m.initialized {
err := m.initialize()
if err != nil {
return err
}
m.initialized = true
}
for _, v := range m.Slaves {
var wg sync.WaitGroup
for _, master := range m.masterURLs {
wg.Add(1)
go func(c string) {
acc.AddError(m.gatherMainMetrics(c, ":5051", SLAVE, acc))
go func(master *url.URL) {
acc.AddError(m.gatherMainMetrics(master, MASTER, acc))
wg.Done()
return
}(v)
}(master)
}
for _, slave := range m.slaveURLs {
wg.Add(1)
go func(slave *url.URL) {
acc.AddError(m.gatherMainMetrics(slave, SLAVE, acc))
wg.Done()
return
}(slave)
// if !m.SlaveTasks {
// continue
@@ -121,7 +204,7 @@ func (m *Mesos) Gather(acc telegraf.Accumulator) error {
// wg.Add(1)
// go func(c string) {
// acc.AddError(m.gatherSlaveTaskMetrics(c, ":5051", acc))
// acc.AddError(m.gatherSlaveTaskMetrics(slave, acc))
// wg.Done()
// return
// }(v)
@@ -132,6 +215,24 @@ func (m *Mesos) Gather(acc telegraf.Accumulator) error {
return nil
}
// createHttpClient builds the shared HTTP client used for all plugin
// requests, applying the configured TLS settings and a fixed 4s request
// timeout (the per-request Mesos timeout is carried in the URL query).
func (m *Mesos) createHttpClient() (*http.Client, error) {
	tlsCfg, err := internal.GetTLSConfig(m.SSLCert, m.SSLKey, m.SSLCA, m.InsecureSkipVerify)
	if err != nil {
		return nil, err
	}
	transport := &http.Transport{
		Proxy:           http.ProxyFromEnvironment,
		TLSClientConfig: tlsCfg,
	}
	return &http.Client{
		Transport: transport,
		Timeout:   4 * time.Second,
	}, nil
}
// metricsDiff() returns set names for removal
func metricsDiff(role Role, w []string) []string {
b := []string{}
@@ -393,15 +494,6 @@ func (m *Mesos) filterMetrics(role Role, metrics *map[string]interface{}) {
}
}
var tr = &http.Transport{
ResponseHeaderTimeout: time.Duration(3 * time.Second),
}
var client = &http.Client{
Transport: tr,
Timeout: time.Duration(4 * time.Second),
}
// TaskStats struct for JSON API output /monitor/statistics
type TaskStats struct {
ExecutorID string `json:"executor_id"`
@@ -409,22 +501,15 @@ type TaskStats struct {
Statistics map[string]interface{} `json:"statistics"`
}
func (m *Mesos) gatherSlaveTaskMetrics(address string, defaultPort string, acc telegraf.Accumulator) error {
func (m *Mesos) gatherSlaveTaskMetrics(u *url.URL, acc telegraf.Accumulator) error {
var metrics []TaskStats
host, _, err := net.SplitHostPort(address)
if err != nil {
host = address
address = address + defaultPort
}
tags := map[string]string{
"server": host,
"server": u.Hostname(),
"url": urlTag(u),
}
ts := strconv.Itoa(m.Timeout) + "ms"
resp, err := client.Get("http://" + address + "/monitor/statistics?timeout=" + ts)
resp, err := m.client.Get(withPath(u, "/monitor/statistics").String())
if err != nil {
return err
@@ -459,24 +544,31 @@ func (m *Mesos) gatherSlaveTaskMetrics(address string, defaultPort string, acc t
return nil
}
func withPath(u *url.URL, path string) *url.URL {
c := *u
c.Path = path
return &c
}
func urlTag(u *url.URL) string {
c := *u
c.Path = ""
c.User = nil
c.RawQuery = ""
return c.String()
}
// TODO: consider making this a standalone function instead of a method;
// it only needs the HTTP client and timeout from Mesos.
func (m *Mesos) gatherMainMetrics(a string, defaultPort string, role Role, acc telegraf.Accumulator) error {
func (m *Mesos) gatherMainMetrics(u *url.URL, role Role, acc telegraf.Accumulator) error {
var jsonOut map[string]interface{}
host, _, err := net.SplitHostPort(a)
if err != nil {
host = a
a = a + defaultPort
}
tags := map[string]string{
"server": host,
"server": u.Hostname(),
"url": urlTag(u),
"role": string(role),
}
ts := strconv.Itoa(m.Timeout) + "ms"
resp, err := client.Get("http://" + a + "/metrics/snapshot?timeout=" + ts)
resp, err := m.client.Get(withPath(u, "/metrics/snapshot").String())
if err != nil {
return err

View File

@@ -6,10 +6,12 @@ import (
"math/rand"
"net/http"
"net/http/httptest"
"net/url"
"os"
"testing"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
var masterMetrics map[string]interface{}
@@ -378,3 +380,19 @@ func TestSlaveFilter(t *testing.T) {
}
}
}
// TestWithPathDoesNotModify verifies that withPath returns a modified
// copy and leaves its input URL untouched.
func TestWithPathDoesNotModify(t *testing.T) {
	u, err := url.Parse("http://localhost:5051")
	require.NoError(t, err)
	v := withPath(u, "/xyzzy")
	// require.Equal takes (t, expected, actual); keeping that order makes
	// failure messages report the values the right way around.
	require.Equal(t, "http://localhost:5051", u.String())
	require.Equal(t, "http://localhost:5051/xyzzy", v.String())
}
// TestURLTagDoesNotModify verifies that urlTag strips credentials, query
// and path without mutating its input URL.
func TestURLTagDoesNotModify(t *testing.T) {
	u, err := url.Parse("http://a:b@localhost:5051?timeout=1ms")
	require.NoError(t, err)
	v := urlTag(u)
	// require.Equal takes (t, expected, actual); keeping that order makes
	// failure messages report the values the right way around.
	require.Equal(t, "http://a:b@localhost:5051?timeout=1ms", u.String())
	require.Equal(t, "http://localhost:5051", v)
}

View File

@@ -0,0 +1,35 @@
# Hugepages Input Plugin
The hugepages plugin gathers hugepages metrics, including per-NUMA-node statistics.
### Configuration:
```toml
# Description
[[inputs.hugepages]]
## Path to a NUMA nodes
# numa_node_path = "/sys/devices/system/node"
## Path to a meminfo file
# meminfo_path = "/proc/meminfo"
```
### Measurements & Fields:
- hugepages
- free (int, number of huge pages)
- nr (int, number of huge pages)
- HugePages_Total (int, number of huge pages)
- HugePages_Free (int, number of huge pages)
### Tags:
- hugepages has the following tags:
- node
### Example Output:
```
$ ./telegraf -config telegraf.conf -input-filter hugepages -test
> hugepages,host=maxpc,node=node0 free=0i,nr=0i 1467618621000000000
> hugepages,host=maxpc,name=meminfo HugePages_Free=0i,HugePages_Total=0i 1467618621000000000
```

View File

@@ -0,0 +1,184 @@
package system
import (
"bytes"
"fmt"
"io/ioutil"
"strconv"
"strings"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)
// Byte tokens reused while parsing the meminfo and per-node stat files.
var (
	newlineByte     = []byte("\n") // line separator in /proc and sysfs files
	colonByte       = []byte(":")  // trailing ":" on meminfo field names
	kbPrecisionByte = []byte("kB") // marks meminfo lines reported in kB (skipped)
)
// Default file paths and meminfo field names.
const (
	// the directory where per-NUMA-node statistics are kept
	NUMA_NODE_PATH = "/sys/devices/system/node"
	// the path to the meminfo file produced by the kernel
	MEMINFO_PATH = "/proc/meminfo"
	// hugepages stat field names in the meminfo file
	HUGE_PAGES_TOTAL = "HugePages_Total"
	HUGE_PAGES_FREE  = "HugePages_Free"
)
// hugepagesSampleConfig is the sample configuration shown by
// `telegraf config`; both paths are optional and default to the kernel
// locations declared above.
var hugepagesSampleConfig = `
## Path to a NUMA nodes
# numa_node_path = "/sys/devices/system/node"
## Path to a meminfo file
# meminfo_path = "/proc/meminfo"
`
// Hugepages gathers hugepages statistics from the kernel meminfo file and
// from per-NUMA-node sysfs entries. Empty paths are replaced with the
// kernel defaults by loadPaths.
type Hugepages struct {
	NUMANodePath string `toml:"numa_node_path"` // sysfs NUMA node directory
	MeminfoPath  string `toml:"meminfo_path"`   // kernel meminfo file
}
// Description returns a one-line summary of the plugin.
func (mem *Hugepages) Description() string {
	return "Collects hugepages metrics from kernel and per NUMA node"
}
// SampleConfig returns the sample configuration block for this plugin.
func (mem *Hugepages) SampleConfig() string {
	return hugepagesSampleConfig
}
// Gather collects per-NUMA-node hugepages stats followed by the global
// meminfo counters, resolving default paths first.
func (mem *Hugepages) Gather(acc telegraf.Accumulator) error {
	mem.loadPaths()
	if err := mem.GatherStatsPerNode(acc); err != nil {
		return err
	}
	return mem.GatherStatsFromMeminfo(acc)
}
// GatherStatsPerNode reads the hugepages counters for every NUMA node and
// emits one "hugepages" measurement per node, tagged with the node name.
func (mem *Hugepages) GatherStatsPerNode(acc telegraf.Accumulator) error {
	perNode, err := statsPerNUMA(mem.NUMANodePath)
	if err != nil {
		return err
	}
	for node, stats := range perNode {
		fields := map[string]interface{}{
			"free": stats.Free,
			"nr":   stats.NR,
		}
		acc.AddFields("hugepages", fields, map[string]string{"node": node})
	}
	return nil
}
// GatherStatsFromMeminfo emits the kernel-wide hugepages counters from the
// meminfo file as a single "hugepages" measurement tagged name=meminfo.
func (mem *Hugepages) GatherStatsFromMeminfo(acc telegraf.Accumulator) error {
	meminfoMetrics, err := statsFromMeminfo(mem.MeminfoPath)
	if err != nil {
		return err
	}
	fields := make(map[string]interface{}, len(meminfoMetrics))
	for name, value := range meminfoMetrics {
		fields[name] = value
	}
	acc.AddFields("hugepages", fields, map[string]string{"name": "meminfo"})
	return nil
}
// HugepagesNUMAStats holds the hugepages counters read for one NUMA node.
type HugepagesNUMAStats struct {
	Free int // free_hugepages: currently unallocated huge pages
	NR   int // nr_hugepages: total huge pages in the pool
}
// statsPerNUMA reads the 2048kB hugepages counters (free_hugepages and
// nr_hugepages) for every "node*" directory under path and returns them
// keyed by node name (e.g. "node0").
func statsPerNUMA(path string) (map[string]HugepagesNUMAStats, error) {
	hugepagesStats := make(map[string]HugepagesNUMAStats)
	dirs, err := ioutil.ReadDir(path)
	if err != nil {
		return hugepagesStats, err
	}
	for _, d := range dirs {
		if !(d.IsDir() && strings.HasPrefix(d.Name(), "node")) {
			continue
		}
		// NOTE(review): only the default 2048kB hugepage size is read;
		// other sizes (e.g. 1GB pages) are not reported.
		hugepagesFree := fmt.Sprintf("%s/%s/hugepages/hugepages-2048kB/free_hugepages", path, d.Name())
		hugepagesNR := fmt.Sprintf("%s/%s/hugepages/hugepages-2048kB/nr_hugepages", path, d.Name())
		free, err := ioutil.ReadFile(hugepagesFree)
		if err != nil {
			return hugepagesStats, err
		}
		nr, err := ioutil.ReadFile(hugepagesNR)
		if err != nil {
			return hugepagesStats, err
		}
		// Report malformed counter files instead of silently recording 0
		// (the previous code discarded the Atoi errors).
		f, err := strconv.Atoi(string(bytes.TrimSpace(free)))
		if err != nil {
			return hugepagesStats, fmt.Errorf("parsing %s: %s", hugepagesFree, err)
		}
		n, err := strconv.Atoi(string(bytes.TrimSpace(nr)))
		if err != nil {
			return hugepagesStats, fmt.Errorf("parsing %s: %s", hugepagesNR, err)
		}
		hugepagesStats[d.Name()] = HugepagesNUMAStats{Free: f, NR: n}
	}
	return hugepagesStats, nil
}
// statsFromMeminfo extracts the global hugepages counters
// (HugePages_Total, HugePages_Free) from the kernel meminfo file.
func statsFromMeminfo(path string) (map[string]interface{}, error) {
	stats := map[string]interface{}{}
	meminfo, err := ioutil.ReadFile(path)
	if err != nil {
		return stats, err
	}
	for _, line := range bytes.Split(meminfo, newlineByte) {
		// The hugepages counters are plain page counts; meminfo lines with
		// a "kB" unit are other memory stats and can be skipped outright.
		if bytes.Contains(line, kbPrecisionByte) {
			continue
		}
		fields := bytes.Fields(line)
		if len(fields) < 2 {
			continue
		}
		fieldName := string(bytes.TrimSuffix(fields[0], colonByte))
		if fieldName != HUGE_PAGES_TOTAL && fieldName != HUGE_PAGES_FREE {
			continue
		}
		// Report malformed values instead of silently recording 0
		// (the previous code discarded the Atoi error).
		val, err := strconv.Atoi(string(fields[1]))
		if err != nil {
			return stats, fmt.Errorf("parsing %s in %s: %s", fieldName, path, err)
		}
		stats[fieldName] = val
	}
	return stats, nil
}
// loadPaths fills in any unset path with its kernel default; values from
// the config file take precedence when present.
func (mem *Hugepages) loadPaths() {
	if len(mem.NUMANodePath) == 0 {
		mem.NUMANodePath = NUMA_NODE_PATH
	}
	if len(mem.MeminfoPath) == 0 {
		mem.MeminfoPath = MEMINFO_PATH
	}
}
// Register the plugin under the "hugepages" name at package load time.
func init() {
	inputs.Add("hugepages", func() telegraf.Input {
		return &Hugepages{}
	})
}

View File

@@ -0,0 +1,43 @@
package system
import (
"testing"
"github.com/influxdata/telegraf/testutil"
)
// Shared fixture pointing at the test data shipped with the plugin.
var hugepages = Hugepages{
	NUMANodePath: "./testdata/node",
	MeminfoPath:  "./testdata/meminfo",
}

// loadPaths is a no-op here (both paths are set) but mirrors the setup
// that Gather performs in production.
func init() {
	hugepages.loadPaths()
}
// TestHugepagesStatsFromMeminfo checks the global counters parsed from
// the meminfo testdata fixture.
func TestHugepagesStatsFromMeminfo(t *testing.T) {
	var acc testutil.Accumulator
	if err := hugepages.GatherStatsFromMeminfo(&acc); err != nil {
		t.Fatal(err)
	}
	expected := map[string]interface{}{
		"HugePages_Total": int(666),
		"HugePages_Free":  int(999),
	}
	acc.AssertContainsFields(t, "hugepages", expected)
}
// TestHugepagesStatsPerNode checks the per-node counters parsed from the
// sysfs testdata fixture.
func TestHugepagesStatsPerNode(t *testing.T) {
	var acc testutil.Accumulator
	if err := hugepages.GatherStatsPerNode(&acc); err != nil {
		t.Fatal(err)
	}
	expected := map[string]interface{}{
		"free": int(123),
		"nr":   int(456),
	}
	acc.AssertContainsFields(t, "hugepages", expected)
}

45
plugins/inputs/system/testdata/meminfo vendored Normal file
View File

@@ -0,0 +1,45 @@
MemTotal: 5961204 kB
MemFree: 5201764 kB
MemAvailable: 5761624 kB
Buffers: 120248 kB
Cached: 419660 kB
SwapCached: 0 kB
Active: 370656 kB
Inactive: 247364 kB
Active(anon): 79032 kB
Inactive(anon): 552 kB
Active(file): 291624 kB
Inactive(file): 246812 kB
Unevictable: 0 kB
Mlocked: 0 kB
SwapTotal: 0 kB
SwapFree: 0 kB
Dirty: 4 kB
Writeback: 0 kB
AnonPages: 78164 kB
Mapped: 114164 kB
Shmem: 1452 kB
Slab: 77180 kB
SReclaimable: 57676 kB
SUnreclaim: 19504 kB
KernelStack: 2832 kB
PageTables: 8692 kB
NFS_Unstable: 0 kB
Bounce: 0 kB
WritebackTmp: 0 kB
CommitLimit: 2980600 kB
Committed_AS: 555492 kB
VmallocTotal: 34359738367 kB
VmallocUsed: 0 kB
VmallocChunk: 0 kB
HardwareCorrupted: 0 kB
AnonHugePages: 0 kB
CmaTotal: 0 kB
CmaFree: 0 kB
HugePages_Total: 666
HugePages_Free: 999
HugePages_Rsvd: 0
HugePages_Surp: 0
Hugepagesize: 2048 kB
DirectMap4k: 84224 kB
DirectMap2M: 6055936 kB

View File

@@ -0,0 +1 @@
456