Add TLS support to the mesos input plugin (#3769)

This commit is contained in:
Daniel Nelson 2018-02-07 18:36:38 -08:00 committed by GitHub
parent d467a20b2c
commit 2950a3bdeb
3 changed files with 170 additions and 52 deletions

View File

@ -11,7 +11,7 @@ For more information, please check the [Mesos Observability Metrics](http://meso
## Timeout, in ms.
timeout = 100
## A list of Mesos masters.
masters = ["localhost:5050"]
masters = ["http://localhost:5050"]
## Master metrics groups to be collected, by default, all enabled.
master_collections = [
"resources",
@ -35,6 +35,13 @@ For more information, please check the [Mesos Observability Metrics](http://meso
# "tasks",
# "messages",
# ]
## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
## Use SSL but skip chain & host verification
# insecure_skip_verify = false
```
By default this plugin is not configured to gather metrics from mesos. Since a mesos cluster can be deployed in numerous ways it does not provide any default
@ -235,7 +242,8 @@ Mesos slave metric groups
### Tags:
- All master/slave measurements have the following tags:
- server
- server (network location of server: `host:port`)
- url (URL origin of server: `scheme://host:port`)
- role (master/slave)
- All master measurements have the extra tags:

View File

@ -7,11 +7,14 @@ import (
"log"
"net"
"net/http"
"net/url"
"strconv"
"strings"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
jsonparser "github.com/influxdata/telegraf/plugins/parsers/json"
)
@ -30,6 +33,20 @@ type Mesos struct {
Slaves []string
SlaveCols []string `toml:"slave_collections"`
//SlaveTasks bool
// Path to CA file
SSLCA string `toml:"ssl_ca"`
// Path to host cert file
SSLCert string `toml:"ssl_cert"`
// Path to cert key file
SSLKey string `toml:"ssl_key"`
// Use SSL but skip chain & host verification
InsecureSkipVerify bool
initialized bool
client *http.Client
masterURLs []*url.URL
slaveURLs []*url.URL
}
var allMetrics = map[Role][]string{
@ -41,7 +58,7 @@ var sampleConfig = `
## Timeout, in ms.
timeout = 100
## A list of Mesos masters.
masters = ["localhost:5050"]
masters = ["http://localhost:5050"]
## Master metrics groups to be collected, by default, all enabled.
master_collections = [
"resources",
@ -65,6 +82,13 @@ var sampleConfig = `
# "tasks",
# "messages",
# ]
## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
## Use SSL but skip chain & host verification
# insecure_skip_verify = false
`
// SampleConfig returns a sample configuration block
@ -77,7 +101,28 @@ func (m *Mesos) Description() string {
return "Telegraf plugin for gathering metrics from N Mesos masters"
}
func (m *Mesos) SetDefaults() {
func parseURL(s string, role Role) (*url.URL, error) {
if !strings.HasPrefix(s, "http://") && !strings.HasPrefix(s, "https://") {
host, port, err := net.SplitHostPort(s)
// no port specified
if err != nil {
host = s
switch role {
case MASTER:
port = "5050"
case SLAVE:
port = "5051"
}
}
s = "http://" + host + ":" + port
log.Printf("W! [inputs.mesos] Using %q as connection URL; please update your configuration to use an URL", s)
}
return url.Parse(s)
}
func (m *Mesos) initialize() error {
if len(m.MasterCols) == 0 {
m.MasterCols = allMetrics[MASTER]
}
@ -87,33 +132,71 @@ func (m *Mesos) SetDefaults() {
}
if m.Timeout == 0 {
log.Println("I! [mesos] Missing timeout value, setting default value (100ms)")
log.Println("I! [inputs.mesos] Missing timeout value, setting default value (100ms)")
m.Timeout = 100
}
rawQuery := "timeout=" + strconv.Itoa(m.Timeout) + "ms"
m.masterURLs = make([]*url.URL, 0, len(m.Masters))
for _, master := range m.Masters {
u, err := parseURL(master, MASTER)
if err != nil {
return err
}
u.RawQuery = rawQuery
m.masterURLs = append(m.masterURLs, u)
}
m.slaveURLs = make([]*url.URL, 0, len(m.Slaves))
for _, slave := range m.Slaves {
u, err := parseURL(slave, SLAVE)
if err != nil {
return err
}
u.RawQuery = rawQuery
m.slaveURLs = append(m.slaveURLs, u)
}
client, err := m.createHttpClient()
if err != nil {
return err
}
m.client = client
return nil
}
// Gather() metrics from given list of Mesos Masters
func (m *Mesos) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup
m.SetDefaults()
for _, v := range m.Masters {
wg.Add(1)
go func(c string) {
acc.AddError(m.gatherMainMetrics(c, ":5050", MASTER, acc))
wg.Done()
return
}(v)
if !m.initialized {
err := m.initialize()
if err != nil {
return err
}
m.initialized = true
}
for _, v := range m.Slaves {
var wg sync.WaitGroup
for _, master := range m.masterURLs {
wg.Add(1)
go func(c string) {
acc.AddError(m.gatherMainMetrics(c, ":5051", SLAVE, acc))
go func(master *url.URL) {
acc.AddError(m.gatherMainMetrics(master, MASTER, acc))
wg.Done()
return
}(v)
}(master)
}
for _, slave := range m.slaveURLs {
wg.Add(1)
go func(slave *url.URL) {
acc.AddError(m.gatherMainMetrics(slave, SLAVE, acc))
wg.Done()
return
}(slave)
// if !m.SlaveTasks {
// continue
@ -121,7 +204,7 @@ func (m *Mesos) Gather(acc telegraf.Accumulator) error {
// wg.Add(1)
// go func(c string) {
// acc.AddError(m.gatherSlaveTaskMetrics(c, ":5051", acc))
// acc.AddError(m.gatherSlaveTaskMetrics(slave, acc))
// wg.Done()
// return
// }(v)
@ -132,6 +215,24 @@ func (m *Mesos) Gather(acc telegraf.Accumulator) error {
return nil
}
func (m *Mesos) createHttpClient() (*http.Client, error) {
tlsCfg, err := internal.GetTLSConfig(
m.SSLCert, m.SSLKey, m.SSLCA, m.InsecureSkipVerify)
if err != nil {
return nil, err
}
client := &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
TLSClientConfig: tlsCfg,
},
Timeout: 4 * time.Second,
}
return client, nil
}
// metricsDiff() returns set names for removal
func metricsDiff(role Role, w []string) []string {
b := []string{}
@ -393,15 +494,6 @@ func (m *Mesos) filterMetrics(role Role, metrics *map[string]interface{}) {
}
}
var tr = &http.Transport{
ResponseHeaderTimeout: time.Duration(3 * time.Second),
}
var client = &http.Client{
Transport: tr,
Timeout: time.Duration(4 * time.Second),
}
// TaskStats struct for JSON API output /monitor/statistics
type TaskStats struct {
ExecutorID string `json:"executor_id"`
@ -409,22 +501,15 @@ type TaskStats struct {
Statistics map[string]interface{} `json:"statistics"`
}
func (m *Mesos) gatherSlaveTaskMetrics(address string, defaultPort string, acc telegraf.Accumulator) error {
func (m *Mesos) gatherSlaveTaskMetrics(u *url.URL, acc telegraf.Accumulator) error {
var metrics []TaskStats
host, _, err := net.SplitHostPort(address)
if err != nil {
host = address
address = address + defaultPort
}
tags := map[string]string{
"server": host,
"server": u.Hostname(),
"url": urlTag(u),
}
ts := strconv.Itoa(m.Timeout) + "ms"
resp, err := client.Get("http://" + address + "/monitor/statistics?timeout=" + ts)
resp, err := m.client.Get(withPath(u, "/monitor/statistics").String())
if err != nil {
return err
@ -459,24 +544,31 @@ func (m *Mesos) gatherSlaveTaskMetrics(address string, defaultPort string, acc t
return nil
}
// This should not belong to the object
func (m *Mesos) gatherMainMetrics(a string, defaultPort string, role Role, acc telegraf.Accumulator) error {
var jsonOut map[string]interface{}
host, _, err := net.SplitHostPort(a)
if err != nil {
host = a
a = a + defaultPort
func withPath(u *url.URL, path string) *url.URL {
c := *u
c.Path = path
return &c
}
func urlTag(u *url.URL) string {
c := *u
c.Path = ""
c.User = nil
c.RawQuery = ""
return c.String()
}
// This should not belong to the object
func (m *Mesos) gatherMainMetrics(u *url.URL, role Role, acc telegraf.Accumulator) error {
var jsonOut map[string]interface{}
tags := map[string]string{
"server": host,
"server": u.Hostname(),
"url": urlTag(u),
"role": string(role),
}
ts := strconv.Itoa(m.Timeout) + "ms"
resp, err := client.Get("http://" + a + "/metrics/snapshot?timeout=" + ts)
resp, err := m.client.Get(withPath(u, "/metrics/snapshot").String())
if err != nil {
return err

View File

@ -6,10 +6,12 @@ import (
"math/rand"
"net/http"
"net/http/httptest"
"net/url"
"os"
"testing"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
var masterMetrics map[string]interface{}
@ -378,3 +380,19 @@ func TestSlaveFilter(t *testing.T) {
}
}
}
func TestWithPathDoesNotModify(t *testing.T) {
u, err := url.Parse("http://localhost:5051")
require.NoError(t, err)
v := withPath(u, "/xyzzy")
require.Equal(t, u.String(), "http://localhost:5051")
require.Equal(t, v.String(), "http://localhost:5051/xyzzy")
}
func TestURLTagDoesNotModify(t *testing.T) {
u, err := url.Parse("http://a:b@localhost:5051?timeout=1ms")
require.NoError(t, err)
v := urlTag(u)
require.Equal(t, u.String(), "http://a:b@localhost:5051?timeout=1ms")
require.Equal(t, v, "http://localhost:5051")
}