Add TLS support to the mesos input plugin (#3769)

This commit is contained in:
Daniel Nelson 2018-02-07 18:36:38 -08:00 committed by GitHub
parent 89974d96d7
commit efa9095829
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 170 additions and 52 deletions

View File

@ -11,7 +11,7 @@ For more information, please check the [Mesos Observability Metrics](http://meso
## Timeout, in ms. ## Timeout, in ms.
timeout = 100 timeout = 100
## A list of Mesos masters. ## A list of Mesos masters.
masters = ["localhost:5050"] masters = ["http://localhost:5050"]
## Master metrics groups to be collected, by default, all enabled. ## Master metrics groups to be collected, by default, all enabled.
master_collections = [ master_collections = [
"resources", "resources",
@ -35,6 +35,13 @@ For more information, please check the [Mesos Observability Metrics](http://meso
# "tasks", # "tasks",
# "messages", # "messages",
# ] # ]
## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
## Use SSL but skip chain & host verification
# insecure_skip_verify = false
``` ```
By default this plugin is not configured to gather metrics from mesos. Since a mesos cluster can be deployed in numerous ways it does not provide any default By default this plugin is not configured to gather metrics from mesos. Since a mesos cluster can be deployed in numerous ways it does not provide any default
@ -235,7 +242,8 @@ Mesos slave metric groups
### Tags: ### Tags:
- All master/slave measurements have the following tags: - All master/slave measurements have the following tags:
- server - server (network location of server: `host:port`)
- url (URL origin of server: `scheme://host:port`)
- role (master/slave) - role (master/slave)
- All master measurements have the extra tags: - All master measurements have the extra tags:

View File

@ -7,11 +7,14 @@ import (
"log" "log"
"net" "net"
"net/http" "net/http"
"net/url"
"strconv" "strconv"
"strings"
"sync" "sync"
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
jsonparser "github.com/influxdata/telegraf/plugins/parsers/json" jsonparser "github.com/influxdata/telegraf/plugins/parsers/json"
) )
@ -30,6 +33,20 @@ type Mesos struct {
Slaves []string Slaves []string
SlaveCols []string `toml:"slave_collections"` SlaveCols []string `toml:"slave_collections"`
//SlaveTasks bool //SlaveTasks bool
// Path to CA file
SSLCA string `toml:"ssl_ca"`
// Path to host cert file
SSLCert string `toml:"ssl_cert"`
// Path to cert key file
SSLKey string `toml:"ssl_key"`
// Use SSL but skip chain & host verification
InsecureSkipVerify bool
initialized bool
client *http.Client
masterURLs []*url.URL
slaveURLs []*url.URL
} }
var allMetrics = map[Role][]string{ var allMetrics = map[Role][]string{
@ -41,7 +58,7 @@ var sampleConfig = `
## Timeout, in ms. ## Timeout, in ms.
timeout = 100 timeout = 100
## A list of Mesos masters. ## A list of Mesos masters.
masters = ["localhost:5050"] masters = ["http://localhost:5050"]
## Master metrics groups to be collected, by default, all enabled. ## Master metrics groups to be collected, by default, all enabled.
master_collections = [ master_collections = [
"resources", "resources",
@ -65,6 +82,13 @@ var sampleConfig = `
# "tasks", # "tasks",
# "messages", # "messages",
# ] # ]
## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
## Use SSL but skip chain & host verification
# insecure_skip_verify = false
` `
// SampleConfig returns a sample configuration block // SampleConfig returns a sample configuration block
@ -77,7 +101,28 @@ func (m *Mesos) Description() string {
return "Telegraf plugin for gathering metrics from N Mesos masters" return "Telegraf plugin for gathering metrics from N Mesos masters"
} }
func (m *Mesos) SetDefaults() { func parseURL(s string, role Role) (*url.URL, error) {
if !strings.HasPrefix(s, "http://") && !strings.HasPrefix(s, "https://") {
host, port, err := net.SplitHostPort(s)
// no port specified
if err != nil {
host = s
switch role {
case MASTER:
port = "5050"
case SLAVE:
port = "5051"
}
}
s = "http://" + host + ":" + port
log.Printf("W! [inputs.mesos] Using %q as connection URL; please update your configuration to use an URL", s)
}
return url.Parse(s)
}
func (m *Mesos) initialize() error {
if len(m.MasterCols) == 0 { if len(m.MasterCols) == 0 {
m.MasterCols = allMetrics[MASTER] m.MasterCols = allMetrics[MASTER]
} }
@ -87,33 +132,71 @@ func (m *Mesos) SetDefaults() {
} }
if m.Timeout == 0 { if m.Timeout == 0 {
log.Println("I! [mesos] Missing timeout value, setting default value (100ms)") log.Println("I! [inputs.mesos] Missing timeout value, setting default value (100ms)")
m.Timeout = 100 m.Timeout = 100
} }
rawQuery := "timeout=" + strconv.Itoa(m.Timeout) + "ms"
m.masterURLs = make([]*url.URL, 0, len(m.Masters))
for _, master := range m.Masters {
u, err := parseURL(master, MASTER)
if err != nil {
return err
}
u.RawQuery = rawQuery
m.masterURLs = append(m.masterURLs, u)
}
m.slaveURLs = make([]*url.URL, 0, len(m.Slaves))
for _, slave := range m.Slaves {
u, err := parseURL(slave, SLAVE)
if err != nil {
return err
}
u.RawQuery = rawQuery
m.slaveURLs = append(m.slaveURLs, u)
}
client, err := m.createHttpClient()
if err != nil {
return err
}
m.client = client
return nil
} }
// Gather() metrics from given list of Mesos Masters // Gather() metrics from given list of Mesos Masters
func (m *Mesos) Gather(acc telegraf.Accumulator) error { func (m *Mesos) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup if !m.initialized {
err := m.initialize()
m.SetDefaults() if err != nil {
return err
for _, v := range m.Masters { }
wg.Add(1) m.initialized = true
go func(c string) {
acc.AddError(m.gatherMainMetrics(c, ":5050", MASTER, acc))
wg.Done()
return
}(v)
} }
for _, v := range m.Slaves { var wg sync.WaitGroup
for _, master := range m.masterURLs {
wg.Add(1) wg.Add(1)
go func(c string) { go func(master *url.URL) {
acc.AddError(m.gatherMainMetrics(c, ":5051", SLAVE, acc)) acc.AddError(m.gatherMainMetrics(master, MASTER, acc))
wg.Done() wg.Done()
return return
}(v) }(master)
}
for _, slave := range m.slaveURLs {
wg.Add(1)
go func(slave *url.URL) {
acc.AddError(m.gatherMainMetrics(slave, SLAVE, acc))
wg.Done()
return
}(slave)
// if !m.SlaveTasks { // if !m.SlaveTasks {
// continue // continue
@ -121,7 +204,7 @@ func (m *Mesos) Gather(acc telegraf.Accumulator) error {
// wg.Add(1) // wg.Add(1)
// go func(c string) { // go func(c string) {
// acc.AddError(m.gatherSlaveTaskMetrics(c, ":5051", acc)) // acc.AddError(m.gatherSlaveTaskMetrics(slave, acc))
// wg.Done() // wg.Done()
// return // return
// }(v) // }(v)
@ -132,6 +215,24 @@ func (m *Mesos) Gather(acc telegraf.Accumulator) error {
return nil return nil
} }
func (m *Mesos) createHttpClient() (*http.Client, error) {
tlsCfg, err := internal.GetTLSConfig(
m.SSLCert, m.SSLKey, m.SSLCA, m.InsecureSkipVerify)
if err != nil {
return nil, err
}
client := &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
TLSClientConfig: tlsCfg,
},
Timeout: 4 * time.Second,
}
return client, nil
}
// metricsDiff() returns set names for removal // metricsDiff() returns set names for removal
func metricsDiff(role Role, w []string) []string { func metricsDiff(role Role, w []string) []string {
b := []string{} b := []string{}
@ -393,15 +494,6 @@ func (m *Mesos) filterMetrics(role Role, metrics *map[string]interface{}) {
} }
} }
var tr = &http.Transport{
ResponseHeaderTimeout: time.Duration(3 * time.Second),
}
var client = &http.Client{
Transport: tr,
Timeout: time.Duration(4 * time.Second),
}
// TaskStats struct for JSON API output /monitor/statistics // TaskStats struct for JSON API output /monitor/statistics
type TaskStats struct { type TaskStats struct {
ExecutorID string `json:"executor_id"` ExecutorID string `json:"executor_id"`
@ -409,22 +501,15 @@ type TaskStats struct {
Statistics map[string]interface{} `json:"statistics"` Statistics map[string]interface{} `json:"statistics"`
} }
func (m *Mesos) gatherSlaveTaskMetrics(address string, defaultPort string, acc telegraf.Accumulator) error { func (m *Mesos) gatherSlaveTaskMetrics(u *url.URL, acc telegraf.Accumulator) error {
var metrics []TaskStats var metrics []TaskStats
host, _, err := net.SplitHostPort(address)
if err != nil {
host = address
address = address + defaultPort
}
tags := map[string]string{ tags := map[string]string{
"server": host, "server": u.Hostname(),
"url": urlTag(u),
} }
ts := strconv.Itoa(m.Timeout) + "ms" resp, err := m.client.Get(withPath(u, "/monitor/statistics").String())
resp, err := client.Get("http://" + address + "/monitor/statistics?timeout=" + ts)
if err != nil { if err != nil {
return err return err
@ -459,24 +544,31 @@ func (m *Mesos) gatherSlaveTaskMetrics(address string, defaultPort string, acc t
return nil return nil
} }
func withPath(u *url.URL, path string) *url.URL {
c := *u
c.Path = path
return &c
}
func urlTag(u *url.URL) string {
c := *u
c.Path = ""
c.User = nil
c.RawQuery = ""
return c.String()
}
// This should not belong to the object // This should not belong to the object
func (m *Mesos) gatherMainMetrics(a string, defaultPort string, role Role, acc telegraf.Accumulator) error { func (m *Mesos) gatherMainMetrics(u *url.URL, role Role, acc telegraf.Accumulator) error {
var jsonOut map[string]interface{} var jsonOut map[string]interface{}
host, _, err := net.SplitHostPort(a)
if err != nil {
host = a
a = a + defaultPort
}
tags := map[string]string{ tags := map[string]string{
"server": host, "server": u.Hostname(),
"url": urlTag(u),
"role": string(role), "role": string(role),
} }
ts := strconv.Itoa(m.Timeout) + "ms" resp, err := m.client.Get(withPath(u, "/metrics/snapshot").String())
resp, err := client.Get("http://" + a + "/metrics/snapshot?timeout=" + ts)
if err != nil { if err != nil {
return err return err

View File

@ -6,10 +6,12 @@ import (
"math/rand" "math/rand"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"net/url"
"os" "os"
"testing" "testing"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
) )
var masterMetrics map[string]interface{} var masterMetrics map[string]interface{}
@ -378,3 +380,19 @@ func TestSlaveFilter(t *testing.T) {
} }
} }
} }
func TestWithPathDoesNotModify(t *testing.T) {
u, err := url.Parse("http://localhost:5051")
require.NoError(t, err)
v := withPath(u, "/xyzzy")
require.Equal(t, u.String(), "http://localhost:5051")
require.Equal(t, v.String(), "http://localhost:5051/xyzzy")
}
func TestURLTagDoesNotModify(t *testing.T) {
u, err := url.Parse("http://a:b@localhost:5051?timeout=1ms")
require.NoError(t, err)
v := urlTag(u)
require.Equal(t, u.String(), "http://a:b@localhost:5051?timeout=1ms")
require.Equal(t, v, "http://localhost:5051")
}