diff --git a/plugins/inputs/mesos/README.md b/plugins/inputs/mesos/README.md index 67590c3ff..46df267aa 100644 --- a/plugins/inputs/mesos/README.md +++ b/plugins/inputs/mesos/README.md @@ -11,7 +11,7 @@ For more information, please check the [Mesos Observability Metrics](http://meso ## Timeout, in ms. timeout = 100 ## A list of Mesos masters. - masters = ["localhost:5050"] + masters = ["http://localhost:5050"] ## Master metrics groups to be collected, by default, all enabled. master_collections = [ "resources", @@ -35,6 +35,13 @@ For more information, please check the [Mesos Observability Metrics](http://meso # "tasks", # "messages", # ] + + ## Optional SSL Config + # ssl_ca = "/etc/telegraf/ca.pem" + # ssl_cert = "/etc/telegraf/cert.pem" + # ssl_key = "/etc/telegraf/key.pem" + ## Use SSL but skip chain & host verification + # insecure_skip_verify = false ``` By default this plugin is not configured to gather metrics from mesos. Since a mesos cluster can be deployed in numerous ways it does not provide any default @@ -235,7 +242,8 @@ Mesos slave metric groups ### Tags: - All master/slave measurements have the following tags: - - server + - server (network location of server: `host:port`) + - url (URL origin of server: `scheme://host:port`) - role (master/slave) - All master measurements have the extra tags: diff --git a/plugins/inputs/mesos/mesos.go b/plugins/inputs/mesos/mesos.go index e37eabf5d..5b0697cab 100644 --- a/plugins/inputs/mesos/mesos.go +++ b/plugins/inputs/mesos/mesos.go @@ -7,11 +7,14 @@ import ( "log" "net" "net/http" + "net/url" "strconv" + "strings" "sync" "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" jsonparser "github.com/influxdata/telegraf/plugins/parsers/json" ) @@ -30,6 +33,20 @@ type Mesos struct { Slaves []string SlaveCols []string `toml:"slave_collections"` //SlaveTasks bool + + // Path to CA file + SSLCA string `toml:"ssl_ca"` + // Path to host cert file + SSLCert string `toml:"ssl_cert"` + // Path to cert key file + SSLKey string `toml:"ssl_key"` + // Use SSL but skip chain & host verification + InsecureSkipVerify bool + + initialized bool + client *http.Client + masterURLs []*url.URL + slaveURLs []*url.URL } var allMetrics = map[Role][]string{ @@ -41,7 +58,7 @@ var sampleConfig = ` ## Timeout, in ms. timeout = 100 ## A list of Mesos masters. - masters = ["localhost:5050"] + masters = ["http://localhost:5050"] ## Master metrics groups to be collected, by default, all enabled. master_collections = [ "resources", @@ -65,6 +82,13 @@ var sampleConfig = ` # "tasks", # "messages", # ] + + ## Optional SSL Config + # ssl_ca = "/etc/telegraf/ca.pem" + # ssl_cert = "/etc/telegraf/cert.pem" + # ssl_key = "/etc/telegraf/key.pem" + ## Use SSL but skip chain & host verification + # insecure_skip_verify = false ` // SampleConfig returns a sample configuration block @@ -77,7 +101,28 @@ func (m *Mesos) Description() string { return "Telegraf plugin for gathering metrics from N Mesos masters" } -func (m *Mesos) SetDefaults() { +func parseURL(s string, role Role) (*url.URL, error) { + if !strings.HasPrefix(s, "http://") && !strings.HasPrefix(s, "https://") { + host, port, err := net.SplitHostPort(s) + // no port specified + if err != nil { + host = s + switch role { + case MASTER: + port = "5050" + case SLAVE: + port = "5051" + } + } + + s = "http://" + host + ":" + port + log.Printf("W! [inputs.mesos] Using %q as connection URL; please update your configuration to use an URL", s) + } + + return url.Parse(s) +} + +func (m *Mesos) initialize() error { if len(m.MasterCols) == 0 { m.MasterCols = allMetrics[MASTER] } @@ -87,33 +132,71 @@ func (m *Mesos) SetDefaults() { } if m.Timeout == 0 { - log.Println("I! [mesos] Missing timeout value, setting default value (100ms)") + log.Println("I! [inputs.mesos] Missing timeout value, setting default value (100ms)") m.Timeout = 100 } + + rawQuery := "timeout=" + strconv.Itoa(m.Timeout) + "ms" + + m.masterURLs = make([]*url.URL, 0, len(m.Masters)) + for _, master := range m.Masters { + u, err := parseURL(master, MASTER) + if err != nil { + return err + } + + u.RawQuery = rawQuery + m.masterURLs = append(m.masterURLs, u) + } + + m.slaveURLs = make([]*url.URL, 0, len(m.Slaves)) + for _, slave := range m.Slaves { + u, err := parseURL(slave, SLAVE) + if err != nil { + return err + } + + u.RawQuery = rawQuery + m.slaveURLs = append(m.slaveURLs, u) + } + + client, err := m.createHttpClient() + if err != nil { + return err + } + m.client = client + + return nil } // Gather() metrics from given list of Mesos Masters func (m *Mesos) Gather(acc telegraf.Accumulator) error { - var wg sync.WaitGroup - - m.SetDefaults() - - for _, v := range m.Masters { - wg.Add(1) - go func(c string) { - acc.AddError(m.gatherMainMetrics(c, ":5050", MASTER, acc)) - wg.Done() - return - }(v) + if !m.initialized { + err := m.initialize() + if err != nil { + return err + } + m.initialized = true } - for _, v := range m.Slaves { + var wg sync.WaitGroup + + for _, master := range m.masterURLs { wg.Add(1) - go func(c string) { - acc.AddError(m.gatherMainMetrics(c, ":5051", SLAVE, acc)) + go func(master *url.URL) { + acc.AddError(m.gatherMainMetrics(master, MASTER, acc)) wg.Done() return - }(v) + }(master) + } + + for _, slave := range m.slaveURLs { + wg.Add(1) + go func(slave *url.URL) { + acc.AddError(m.gatherMainMetrics(slave, SLAVE, acc)) + wg.Done() + return + }(slave) // if !m.SlaveTasks { // continue @@ -121,7 +204,7 @@ func (m *Mesos) Gather(acc telegraf.Accumulator) error { // wg.Add(1) // go func(c string) { - // acc.AddError(m.gatherSlaveTaskMetrics(c, ":5051", acc)) + // acc.AddError(m.gatherSlaveTaskMetrics(slave, acc)) // wg.Done() // return // }(v) @@ -132,6 +215,24 @@ func (m *Mesos) Gather(acc telegraf.Accumulator) error { return nil } +func (m *Mesos) createHttpClient() (*http.Client, error) { + tlsCfg, err := internal.GetTLSConfig( + m.SSLCert, m.SSLKey, m.SSLCA, m.InsecureSkipVerify) + if err != nil { + return nil, err + } + + client := &http.Client{ + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + TLSClientConfig: tlsCfg, + }, + Timeout: 4 * time.Second, + } + + return client, nil +} + // metricsDiff() returns set names for removal func metricsDiff(role Role, w []string) []string { b := []string{} @@ -393,15 +494,6 @@ func (m *Mesos) filterMetrics(role Role, metrics *map[string]interface{}) { } } -var tr = &http.Transport{ - ResponseHeaderTimeout: time.Duration(3 * time.Second), -} - -var client = &http.Client{ - Transport: tr, - Timeout: time.Duration(4 * time.Second), -} - // TaskStats struct for JSON API output /monitor/statistics type TaskStats struct { ExecutorID string `json:"executor_id"` @@ -409,22 +501,15 @@ type TaskStats struct { Statistics map[string]interface{} `json:"statistics"` } -func (m *Mesos) gatherSlaveTaskMetrics(address string, defaultPort string, acc telegraf.Accumulator) error { +func (m *Mesos) gatherSlaveTaskMetrics(u *url.URL, acc telegraf.Accumulator) error { var metrics []TaskStats - host, _, err := net.SplitHostPort(address) - if err != nil { - host = address - address = address + defaultPort - } - tags := map[string]string{ - "server": host, + "server": u.Hostname(), + "url": urlTag(u), } - ts := strconv.Itoa(m.Timeout) + "ms" - - resp, err := client.Get("http://" + address + "/monitor/statistics?timeout=" + ts) + resp, err := m.client.Get(withPath(u, "/monitor/statistics").String()) if err != nil { return err @@ -459,24 +544,31 @@ func (m *Mesos) gatherSlaveTaskMetrics(address string, defaultPort string, acc t return nil } +func withPath(u *url.URL, path string) *url.URL { + c := *u + c.Path = path + return &c +} + +func urlTag(u *url.URL) string { + c := *u + c.Path = "" + c.User = nil + c.RawQuery = "" + return c.String() +} + // This should not belong to the object -func (m *Mesos) gatherMainMetrics(a string, defaultPort string, role Role, acc telegraf.Accumulator) error { +func (m *Mesos) gatherMainMetrics(u *url.URL, role Role, acc telegraf.Accumulator) error { var jsonOut map[string]interface{} - host, _, err := net.SplitHostPort(a) - if err != nil { - host = a - a = a + defaultPort - } - tags := map[string]string{ - "server": host, + "server": u.Hostname(), + "url": urlTag(u), "role": string(role), } - ts := strconv.Itoa(m.Timeout) + "ms" - - resp, err := client.Get("http://" + a + "/metrics/snapshot?timeout=" + ts) + resp, err := m.client.Get(withPath(u, "/metrics/snapshot").String()) if err != nil { return err diff --git a/plugins/inputs/mesos/mesos_test.go b/plugins/inputs/mesos/mesos_test.go index a7705d11e..905adb6e3 100644 --- a/plugins/inputs/mesos/mesos_test.go +++ b/plugins/inputs/mesos/mesos_test.go @@ -6,10 +6,12 @@ import ( "math/rand" "net/http" "net/http/httptest" + "net/url" "os" "testing" "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" ) var masterMetrics map[string]interface{} @@ -378,3 +380,19 @@ func TestSlaveFilter(t *testing.T) { } } } + +func TestWithPathDoesNotModify(t *testing.T) { + u, err := url.Parse("http://localhost:5051") + require.NoError(t, err) + v := withPath(u, "/xyzzy") + require.Equal(t, u.String(), "http://localhost:5051") + require.Equal(t, v.String(), "http://localhost:5051/xyzzy") +} + +func TestURLTagDoesNotModify(t *testing.T) { + u, err := url.Parse("http://a:b@localhost:5051?timeout=1ms") + require.NoError(t, err) + v := urlTag(u) + require.Equal(t, u.String(), "http://a:b@localhost:5051?timeout=1ms") + require.Equal(t, v, "http://localhost:5051") +}