diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 0dbbb613d..1a386d97c 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -20,6 +20,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/exec" _ "github.com/influxdata/telegraf/plugins/inputs/filestat" _ "github.com/influxdata/telegraf/plugins/inputs/github_webhooks" + _ "github.com/influxdata/telegraf/plugins/inputs/graylog" _ "github.com/influxdata/telegraf/plugins/inputs/haproxy" _ "github.com/influxdata/telegraf/plugins/inputs/http_response" _ "github.com/influxdata/telegraf/plugins/inputs/httpjson" diff --git a/plugins/inputs/graylog/README.md b/plugins/inputs/graylog/README.md new file mode 100644 index 000000000..9d033e20a --- /dev/null +++ b/plugins/inputs/graylog/README.md @@ -0,0 +1,46 @@ +# GrayLog plugin + +The Graylog plugin can collect data from remote Graylog service URLs. + +Plugin currently support two type of end points:- + +- multiple (Ex http://[graylog-server-ip]:12900/system/metrics/multiple) +- namespace (Ex http://[graylog-server-ip]:12900/system/metrics/namespace/{namespace}) + +End Point can be a mixe of one multiple end point and several namespaces end points + + +Note: if namespace end point specified metrics array will be ignored for that call. + +Sample configration +``` +[[inputs.graylog]] + ## API End Point, currently supported API: + ## - multiple (Ex http://[graylog-server-ip]:12900/system/metrics/multiple) + ## - namespace (Ex http://[graylog-server-ip]:12900/system/metrics/namespace/{namespace}) + ## Note if namespace end point specified metrics array will be ignored for that call. + ## End point can contain namespace and multiple type calls + ## Please check http://[graylog-server-ip]:12900/api-browser for full list end points + + servers = [ + "http://10.224.162.16:12900/system/metrics/multiple" + ] + + #Metrics define metric which will be pulled from GrayLog and reported to the defined Output + metrics = [ + "jvm.cl.loaded", + "jvm.memory.pools.Metaspace.committed" + ] + ## User name and password + username = "put-username-here" + password = "put-password-here" + + ## Optional SSL Config + # ssl_ca = "/etc/telegraf/ca.pem" + # ssl_cert = "/etc/telegraf/cert.pem" + # ssl_key = "/etc/telegraf/key.pem" + ## Use SSL but skip chain & host verification + # insecure_skip_verify = false +``` + +Please refer to GrayLog metrics api browser for full metric end points http://10.224.162.16:12900/api-browser diff --git a/plugins/inputs/graylog/graylog.go b/plugins/inputs/graylog/graylog.go new file mode 100644 index 000000000..b9b5ade25 --- /dev/null +++ b/plugins/inputs/graylog/graylog.go @@ -0,0 +1,307 @@ +package graylog + +import ( + "bytes" + "encoding/base64" + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "net" + "net/http" + "net/url" + "strings" + "sync" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/inputs" +) + +type ResponseMetrics struct { + total int + Metrics []Metric `json:"metrics"` +} + +type Metric struct { + FullName string `json:"full_name"` + Name string `json:"name"` + Type string `json:"type"` + Fields map[string]interface{} `json:"metric"` +} + +type GrayLog struct { + Servers []string + Metrics []string + Username string + Password string + + // Path to CA file + SSLCA string `toml:"ssl_ca"` + // Path to host cert file + SSLCert string `toml:"ssl_cert"` + // Path to cert key file + SSLKey string `toml:"ssl_key"` + // Use SSL but skip chain & host verification + InsecureSkipVerify bool + + client HTTPClient +} + +type HTTPClient interface { + // Returns the result of an http request + // + // Parameters: + // req: HTTP request object + // + // Returns: + // http.Response: HTTP respons object + // error : Any error that may have occurred + MakeRequest(req *http.Request) (*http.Response, error) + + SetHTTPClient(client *http.Client) + HTTPClient() *http.Client +} + +type Messagebody struct { + Metrics []string `json:"metrics"` +} + +type RealHTTPClient struct { + client *http.Client +} + +func (c *RealHTTPClient) MakeRequest(req *http.Request) (*http.Response, error) { + return c.client.Do(req) +} + +func (c *RealHTTPClient) SetHTTPClient(client *http.Client) { + c.client = client +} + +func (c *RealHTTPClient) HTTPClient() *http.Client { + return c.client +} + +var sampleConfig = ` + ## API End Point, currently supported API: + ## - multiple (Ex http://[graylog-server-ip]:12900/system/metrics/multiple) + ## - namespace (Ex http://[graylog-server-ip]:12900/system/metrics/namespace/{namespace}) + ## Note if namespace end point specified metrics array will be ignored for that call. + ## End point can contain namespace and multiple type calls + ## Please check http://[graylog-server-ip]:12900/api-browser for full list end points + servers = [ + "http://[graylog-server-ip]:12900/system/metrics/multiple", + ] + + ## metrics list + ## List of metrics can be found on Graylog webservice documentation + ## Or by hitting the the web service api http://[graylog-host]:12900/system/metrics + metrics = [ + "jvm.cl.loaded", + "jvm.memory.pools.Metaspace.committed" + ] + + ## User name and password + username = "put-username-here" + password = "put-password-here" + + ## Optional SSL Config + # ssl_ca = "/etc/telegraf/ca.pem" + # ssl_cert = "/etc/telegraf/cert.pem" + # ssl_key = "/etc/telegraf/key.pem" + ## Use SSL but skip chain & host verification + # insecure_skip_verify = false +` + +func (h *GrayLog) SampleConfig() string { + return sampleConfig +} + +func (h *GrayLog) Description() string { + return "Read flattened metrics from one or more GrayLog HTTP endpoints" +} + +// Gathers data for all servers. +func (h *GrayLog) Gather(acc telegraf.Accumulator) error { + var wg sync.WaitGroup + + if h.client.HTTPClient() == nil { + tlsCfg, err := internal.GetTLSConfig( + h.SSLCert, h.SSLKey, h.SSLCA, h.InsecureSkipVerify) + if err != nil { + return err + } + tr := &http.Transport{ + ResponseHeaderTimeout: time.Duration(3 * time.Second), + TLSClientConfig: tlsCfg, + } + client := &http.Client{ + Transport: tr, + Timeout: time.Duration(4 * time.Second), + } + h.client.SetHTTPClient(client) + } + + errorChannel := make(chan error, len(h.Servers)) + + for _, server := range h.Servers { + wg.Add(1) + go func(server string) { + defer wg.Done() + if err := h.gatherServer(acc, server); err != nil { + errorChannel <- err + } + }(server) + } + + wg.Wait() + close(errorChannel) + + // Get all errors and return them as one giant error + errorStrings := []string{} + for err := range errorChannel { + errorStrings = append(errorStrings, err.Error()) + } + + if len(errorStrings) == 0 { + return nil + } + return errors.New(strings.Join(errorStrings, "\n")) +} + +// Gathers data from a particular server +// Parameters: +// acc : The telegraf Accumulator to use +// serverURL: endpoint to send request to +// service : the service being queried +// +// Returns: +// error: Any error that may have occurred +func (h *GrayLog) gatherServer( + acc telegraf.Accumulator, + serverURL string, +) error { + resp, _, err := h.sendRequest(serverURL) + if err != nil { + return err + } + requestURL, err := url.Parse(serverURL) + host, port, _ := net.SplitHostPort(requestURL.Host) + var dat ResponseMetrics + if err != nil { + return err + } + if err := json.Unmarshal([]byte(resp), &dat); err != nil { + return err + } + for _, m_item := range dat.Metrics { + fields := make(map[string]interface{}) + tags := map[string]string{ + "server": host, + "port": port, + "name": m_item.Name, + "type": m_item.Type, + } + h.flatten(m_item.Fields, fields, "") + acc.AddFields(m_item.FullName, fields, tags) + } + return nil +} + +// Flatten JSON hierarchy to produce field name and field value +// Parameters: +// item: Item map to flatten +// fields: Map to store generated fields. +// id: Prefix for top level metric (empty string "") +// Returns: +// void +func (h *GrayLog) flatten(item map[string]interface{}, fields map[string]interface{}, id string) { + if id != "" { + id = id + "_" + } + for k, i := range item { + switch i.(type) { + case int: + fields[id+k] = i.(float64) + case float64: + fields[id+k] = i.(float64) + case map[string]interface{}: + h.flatten(i.(map[string]interface{}), fields, id+k) + default: + } + } +} + +// Sends an HTTP request to the server using the GrayLog object's HTTPClient. +// Parameters: +// serverURL: endpoint to send request to +// +// Returns: +// string: body of the response +// error : Any error that may have occurred +func (h *GrayLog) sendRequest(serverURL string) (string, float64, error) { + headers := map[string]string{ + "Content-Type": "application/json", + "Accept": "application/json", + } + method := "GET" + content := bytes.NewBufferString("") + headers["Authorization"] = "Basic " + base64.URLEncoding.EncodeToString([]byte(h.Username+":"+h.Password)) + // Prepare URL + requestURL, err := url.Parse(serverURL) + if err != nil { + return "", -1, fmt.Errorf("Invalid server URL \"%s\"", serverURL) + } + if strings.Contains(requestURL.String(), "multiple") { + m := &Messagebody{Metrics: h.Metrics} + http_body, err := json.Marshal(m) + if err != nil { + return "", -1, fmt.Errorf("Invalid list of Metrics %s", h.Metrics) + } + method = "POST" + content = bytes.NewBuffer(http_body) + } + req, err := http.NewRequest(method, requestURL.String(), content) + if err != nil { + return "", -1, err + } + // Add header parameters + for k, v := range headers { + req.Header.Add(k, v) + } + start := time.Now() + resp, err := h.client.MakeRequest(req) + if err != nil { + return "", -1, err + } + + defer resp.Body.Close() + responseTime := time.Since(start).Seconds() + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return string(body), responseTime, err + } + + // Process response + if resp.StatusCode != http.StatusOK { + err = fmt.Errorf("Response from url \"%s\" has status code %d (%s), expected %d (%s)", + requestURL.String(), + resp.StatusCode, + http.StatusText(resp.StatusCode), + http.StatusOK, + http.StatusText(http.StatusOK)) + return string(body), responseTime, err + } + return string(body), responseTime, err +} + +func init() { + inputs.Add("graylog", func() telegraf.Input { + return &GrayLog{ + client: &RealHTTPClient{}, + } + }) +} diff --git a/plugins/inputs/graylog/graylog_test.go b/plugins/inputs/graylog/graylog_test.go new file mode 100644 index 000000000..09bca454d --- /dev/null +++ b/plugins/inputs/graylog/graylog_test.go @@ -0,0 +1,199 @@ +package graylog + +import ( + "io/ioutil" + "net/http" + "strings" + "testing" + + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const validJSON = ` + { + "total": 3, + "metrics": [ + { + "full_name": "jvm.cl.loaded", + "metric": { + "value": 18910 + }, + "name": "loaded", + "type": "gauge" + }, + { + "full_name": "jvm.memory.pools.Metaspace.committed", + "metric": { + "value": 108040192 + }, + "name": "committed", + "type": "gauge" + }, + { + "full_name": "org.graylog2.shared.journal.KafkaJournal.writeTime", + "metric": { + "time": { + "min": 99 + }, + "rate": { + "total": 10, + "mean": 2 + }, + "duration_unit": "microseconds", + "rate_unit": "events/second" + }, + "name": "writeTime", + "type": "hdrtimer" + } + ] + }` + +var validTags = map[string]map[string]string{ + "jvm.cl.loaded": { + "name": "loaded", + "type": "gauge", + "port": "12900", + "server": "localhost", + }, + "jvm.memory.pools.Metaspace.committed": { + "name": "committed", + "type": "gauge", + "port": "12900", + "server": "localhost", + }, + "org.graylog2.shared.journal.KafkaJournal.writeTime": { + "name": "writeTime", + "type": "hdrtimer", + "port": "12900", + "server": "localhost", + }, +} + +var expectedFields = map[string]map[string]interface{}{ + "jvm.cl.loaded": { + "value": float64(18910), + }, + "jvm.memory.pools.Metaspace.committed": { + "value": float64(108040192), + }, + "org.graylog2.shared.journal.KafkaJournal.writeTime": { + "time_min": float64(99), + "rate_total": float64(10), + "rate_mean": float64(2), + }, +} + +const invalidJSON = "I don't think this is JSON" + +const empty = "" + +type mockHTTPClient struct { + responseBody string + statusCode int +} + +// Mock implementation of MakeRequest. Usually returns an http.Response with +// hard-coded responseBody and statusCode. However, if the request uses a +// nonstandard method, it uses status code 405 (method not allowed) +func (c *mockHTTPClient) MakeRequest(req *http.Request) (*http.Response, error) { + resp := http.Response{} + resp.StatusCode = c.statusCode + + // basic error checking on request method + allowedMethods := []string{"GET", "HEAD", "POST", "PUT", "DELETE", "TRACE", "CONNECT"} + methodValid := false + for _, method := range allowedMethods { + if req.Method == method { + methodValid = true + break + } + } + + if !methodValid { + resp.StatusCode = 405 // Method not allowed + } + + resp.Body = ioutil.NopCloser(strings.NewReader(c.responseBody)) + return &resp, nil +} + +func (c *mockHTTPClient) SetHTTPClient(_ *http.Client) { +} + +func (c *mockHTTPClient) HTTPClient() *http.Client { + return nil +} + +// Generates a pointer to an HttpJson object that uses a mock HTTP client. +// Parameters: +// response : Body of the response that the mock HTTP client should return +// statusCode: HTTP status code the mock HTTP client should return +// +// Returns: +// *HttpJson: Pointer to an HttpJson object that uses the generated mock HTTP client +func genMockGrayLog(response string, statusCode int) []*GrayLog { + return []*GrayLog{ + &GrayLog{ + client: &mockHTTPClient{responseBody: response, statusCode: statusCode}, + Servers: []string{ + "http://localhost:12900/system/metrics/multiple", + }, + Metrics: []string{ + "jvm.memory.pools.Metaspace.committed", + "jvm.cl.loaded", + "org.graylog2.shared.journal.KafkaJournal.writeTime", + }, + Username: "test", + Password: "test", + }, + } +} + +// Test that the proper values are ignored or collected +func TestNormalResponse(t *testing.T) { + graylog := genMockGrayLog(validJSON, 200) + + for _, service := range graylog { + var acc testutil.Accumulator + err := service.Gather(&acc) + require.NoError(t, err) + for k, v := range expectedFields { + acc.AssertContainsTaggedFields(t, k, v, validTags[k]) + } + } +} + +// Test response to HTTP 500 +func TestHttpJson500(t *testing.T) { + graylog := genMockGrayLog(validJSON, 500) + + var acc testutil.Accumulator + err := graylog[0].Gather(&acc) + + assert.NotNil(t, err) + assert.Equal(t, 0, acc.NFields()) +} + +// Test response to malformed JSON +func TestHttpJsonBadJson(t *testing.T) { + graylog := genMockGrayLog(invalidJSON, 200) + + var acc testutil.Accumulator + err := graylog[0].Gather(&acc) + + assert.NotNil(t, err) + assert.Equal(t, 0, acc.NFields()) +} + +// Test response to empty string as response objectgT +func TestHttpJsonEmptyResponse(t *testing.T) { + graylog := genMockGrayLog(empty, 200) + + var acc testutil.Accumulator + err := graylog[0].Gather(&acc) + + assert.NotNil(t, err) + assert.Equal(t, 0, acc.NFields()) +}