diff --git a/README.md b/README.md index 4da35f744..aff4d2073 100644 --- a/README.md +++ b/README.md @@ -219,6 +219,7 @@ For documentation on the latest development code see the [documentation index][d * [nginx_vts](./plugins/inputs/nginx_vts) * [nsq_consumer](./plugins/inputs/nsq_consumer) * [nginx_vts](./plugins/inputs/nginx_vts) +* [nginx_upstream_check](./plugins/inputs/nginx_upstream_check) * [nsq](./plugins/inputs/nsq) * [nstat](./plugins/inputs/nstat) * [ntpq](./plugins/inputs/ntpq) diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 2b189e5ff..106c0118c 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -85,6 +85,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/nginx" _ "github.com/influxdata/telegraf/plugins/inputs/nginx_plus" _ "github.com/influxdata/telegraf/plugins/inputs/nginx_plus_api" + _ "github.com/influxdata/telegraf/plugins/inputs/nginx_upstream_check" _ "github.com/influxdata/telegraf/plugins/inputs/nginx_vts" _ "github.com/influxdata/telegraf/plugins/inputs/nsq" _ "github.com/influxdata/telegraf/plugins/inputs/nsq_consumer" diff --git a/plugins/inputs/nginx_upstream_check/README.md b/plugins/inputs/nginx_upstream_check/README.md new file mode 100644 index 000000000..4ff76889d --- /dev/null +++ b/plugins/inputs/nginx_upstream_check/README.md @@ -0,0 +1,75 @@ +# Telegraf Plugin: Nginx_upstream_check + +Read the status output of the nginx_upstream_check (https://github.com/yaoweibin/nginx_upstream_check_module). +This module can periodically check the servers in the Nginx's upstream with configured request and interval to determine +if the server is still available. If checks are failed the server is marked as "down" and will not receive any requests +until the check will pass and a server will be marked as "up" again. + +The status page displays the current status of all upstreams and servers as well as number of the failed and successful +checks. 
This information can be exported in JSON format and parsed by this input. + +### Configuration: + +``` + ## An URL where Nginx Upstream check module is enabled + ## It should be set to return a JSON formatted response + url = "http://127.0.0.1/status?format=json" + + ## HTTP method + # method = "GET" + + ## Optional HTTP headers + # headers = {"X-Special-Header" = "Special-Value"} + + ## Override HTTP "Host" header + # host_header = "check.example.com" + + ## Timeout for HTTP requests + timeout = "5s" + + ## Optional HTTP Basic Auth credentials + # username = "username" + # password = "pa$$word" + + ## Optional TLS Config + # tls_ca = "/etc/telegraf/ca.pem" + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false +``` + +### Measurements & Fields: + +- Measurement + - fall (The number of failed server check attempts, counter) + - rise (The number of successful server check attempts, counter) + - status (The reported server status as a string) + - status_code (The server status code. 1 - up, 2 - down, 0 - other) + +The "status_code" field most likely will be the most useful one because it allows you to determine the current +state of every server and, possibly, add some monitoring to watch over it. InfluxDB can use string values and the +"status" field can be used instead, but for most other monitoring solutions the integer code will be appropriate. 
+ +### Tags: + +- All measurements have the following tags: + - name (The hostname or IP of the upstream server) + - port (The alternative check port, 0 if the default one is used) + - type (The check type, http/tcp) + - upstream (The name of the upstream block in the Nginx configuration) + - url (The status url used by telegraf) + +### Example Output: + +When run with: +``` +./telegraf --config telegraf.conf --input-filter nginx_upstream_check --test +``` + +It produces: +``` +* Plugin: nginx_upstream_check, Collection 1 +> nginx_upstream_check,host=node1,name=192.168.0.1:8080,port=0,type=http,upstream=my_backends,url=http://127.0.0.1:80/status?format\=json fall=0i,rise=100i,status="up",status_code=1i 1529088524000000000 +> nginx_upstream_check,host=node2,name=192.168.0.2:8080,port=0,type=http,upstream=my_backends,url=http://127.0.0.1:80/status?format\=json fall=100i,rise=0i,status="down",status_code=2i 1529088524000000000 +``` diff --git a/plugins/inputs/nginx_upstream_check/nginx_upstream_check.go b/plugins/inputs/nginx_upstream_check/nginx_upstream_check.go new file mode 100644 index 000000000..e5a2e096d --- /dev/null +++ b/plugins/inputs/nginx_upstream_check/nginx_upstream_check.go @@ -0,0 +1,224 @@ +package nginx_upstream_check + +import ( + "encoding/json" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/internal/tls" + "github.com/influxdata/telegraf/plugins/inputs" + "net/http" + "net/url" + "strconv" + "time" +) + +const sampleConfig = ` + ## An URL where Nginx Upstream check module is enabled + ## It should be set to return a JSON formatted response + url = "http://127.0.0.1/status?format=json" + + ## HTTP method + # method = "GET" + + ## Optional HTTP headers + # headers = {"X-Special-Header" = "Special-Value"} + + ## Override HTTP "Host" header + # host_header = "check.example.com" + + ## Timeout for HTTP requests + timeout = "5s" + + ## Optional HTTP Basic Auth credentials + # username = 
"username" + # password = "pa$$word" + + ## Optional TLS Config + # tls_ca = "/etc/telegraf/ca.pem" + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false +` + +const description = "Read nginx_upstream_check module status information (https://github.com/yaoweibin/nginx_upstream_check_module)" + +type NginxUpstreamCheck struct { + URL string `toml:"uls"` + + Username string `toml:"username"` + Password string `toml:"password"` + Method string `toml:"method"` + Headers map[string]string `toml:"headers"` + HostHeader string `toml:"host_header"` + Timeout internal.Duration `toml:"timeout"` + + tls.ClientConfig + client *http.Client +} + +func NewNginxUpstreamCheck() *NginxUpstreamCheck { + return &NginxUpstreamCheck{ + URL: "http://127.0.0.1/status?format=json", + Method: "GET", + Headers: make(map[string]string), + HostHeader: "", + Timeout: internal.Duration{Duration: time.Second * 5}, + } +} + +func init() { + inputs.Add("nginx_upstream_check", func() telegraf.Input { + return NewNginxUpstreamCheck() + }) +} + +func (check *NginxUpstreamCheck) SampleConfig() string { + return sampleConfig +} + +func (check *NginxUpstreamCheck) Description() string { + return description +} + +type NginxUpstreamCheckData struct { + Servers struct { + Total uint64 `json:"total"` + Generation uint64 `json:"generation"` + Server []NginxUpstreamCheckServer `json:"server"` + } `json:"servers"` +} + +type NginxUpstreamCheckServer struct { + Index uint64 `json:"index"` + Upstream string `json:"upstream"` + Name string `json:"name"` + Status string `json:"status"` + Rise uint64 `json:"rise"` + Fall uint64 `json:"fall"` + Type string `json:"type"` + Port uint16 `json:"port"` +} + +// createHttpClient create a clients to access API +func (check *NginxUpstreamCheck) createHttpClient() (*http.Client, error) { + tlsConfig, err := check.ClientConfig.TLSConfig() + if err != nil { + return nil, 
err + } + + client := &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: tlsConfig, + }, + Timeout: check.Timeout.Duration, + } + + return client, nil +} + +// gatherJsonData query the data source and parse the response JSON +func (check *NginxUpstreamCheck) gatherJsonData(url string, value interface{}) error { + + var method string + if check.Method != "" { + method = check.Method + } else { + method = "GET" + } + + request, err := http.NewRequest(method, url, nil) + if err != nil { + return err + } + + if (check.Username != "") || (check.Password != "") { + request.SetBasicAuth(check.Username, check.Password) + } + for header, value := range check.Headers { + request.Header.Add(header, value) + } + if check.HostHeader != "" { + request.Host = check.HostHeader + } + + response, err := check.client.Do(request) + if err != nil { + return err + } + + defer response.Body.Close() + + err = json.NewDecoder(response.Body).Decode(value) + if err != nil { + return err + } + + return nil +} + +func (check *NginxUpstreamCheck) Gather(accumulator telegraf.Accumulator) error { + if check.client == nil { + client, err := check.createHttpClient() + + if err != nil { + return err + } + check.client = client + } + + statusURL, err := url.Parse(check.URL) + if err != nil { + return err + } + + err = check.gatherStatusData(statusURL.String(), accumulator) + if err != nil { + return err + } + + return nil + +} + +func (check *NginxUpstreamCheck) gatherStatusData(url string, accumulator telegraf.Accumulator) error { + checkData := &NginxUpstreamCheckData{} + + err := check.gatherJsonData(url, checkData) + if err != nil { + return err + } + + for _, server := range checkData.Servers.Server { + + tags := map[string]string{ + "upstream": server.Upstream, + "type": server.Type, + "name": server.Name, + "port": strconv.Itoa(int(server.Port)), + "url": url, + } + + fields := map[string]interface{}{ + "status": server.Status, + "status_code": check.getStatusCode(server.Status), + 
"rise": server.Rise, + "fall": server.Fall, + } + + accumulator.AddFields("nginx_upstream_check", fields, tags) + } + + return nil +} + +func (check *NginxUpstreamCheck) getStatusCode(status string) uint8 { + switch status { + case "up": + return 1 + case "down": + return 2 + default: + return 0 + } +} diff --git a/plugins/inputs/nginx_upstream_check/nginx_upstream_check_test.go b/plugins/inputs/nginx_upstream_check/nginx_upstream_check_test.go new file mode 100644 index 000000000..1b70770d0 --- /dev/null +++ b/plugins/inputs/nginx_upstream_check/nginx_upstream_check_test.go @@ -0,0 +1,135 @@ +package nginx_upstream_check + +import ( + "fmt" + "net/http" + "net/http/httptest" + "testing" + + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +const sampleStatusResponse = ` +{ + "servers": { + "total": 2, + "generation": 1, + "server": [ + { + "index": 0, + "upstream": "upstream-1", + "name": "127.0.0.1:8081", + "status": "up", + "rise": 1000, + "fall": 0, + "type": "http", + "port": 0 + }, + { + "index": 1, + "upstream": "upstream-2", + "name": "127.0.0.1:8082", + "status": "down", + "rise": 0, + "fall": 2000, + "type": "tcp", + "port": 8080 + } + ] + } +} +` + +func TestNginxUpstreamCheckData(test *testing.T) { + testServer := httptest.NewServer(http.HandlerFunc(func(responseWriter http.ResponseWriter, request *http.Request) { + var response string + + if request.URL.Path == "/status" { + response = sampleStatusResponse + responseWriter.Header()["Content-Type"] = []string{"application/json"} + } else { + panic("Cannot handle request") + } + + fmt.Fprintln(responseWriter, response) + })) + defer testServer.Close() + + check := NewNginxUpstreamCheck() + check.URL = fmt.Sprintf("%s/status", testServer.URL) + + var accumulator testutil.Accumulator + + checkError := check.Gather(&accumulator) + require.NoError(test, checkError) + + accumulator.AssertContainsTaggedFields( + test, + "nginx_upstream_check", + map[string]interface{}{ + 
"status": string("up"), + "status_code": uint8(1), + "rise": uint64(1000), + "fall": uint64(0), + }, + map[string]string{ + "upstream": string("upstream-1"), + "type": string("http"), + "name": string("127.0.0.1:8081"), + "port": string("0"), + "url": fmt.Sprintf("%s/status", testServer.URL), + }) + + accumulator.AssertContainsTaggedFields( + test, + "nginx_upstream_check", + map[string]interface{}{ + "status": string("down"), + "status_code": uint8(2), + "rise": uint64(0), + "fall": uint64(2000), + }, + map[string]string{ + "upstream": string("upstream-2"), + "type": string("tcp"), + "name": string("127.0.0.1:8082"), + "port": string("8080"), + "url": fmt.Sprintf("%s/status", testServer.URL), + }) +} + +func TestNginxUpstreamCheckRequest(test *testing.T) { + testServer := httptest.NewServer(http.HandlerFunc(func(responseWriter http.ResponseWriter, request *http.Request) { + var response string + + if request.URL.Path == "/status" { + response = sampleStatusResponse + responseWriter.Header()["Content-Type"] = []string{"application/json"} + } else { + panic("Cannot handle request") + } + + fmt.Fprintln(responseWriter, response) + + require.Equal(test, request.Method, "POST") + require.Equal(test, request.Header.Get("X-Test"), "test-value") + require.Equal(test, request.Header.Get("Authorization"), "Basic dXNlcjpwYXNzd29yZA==") + require.Equal(test, request.Host, "status.local") + + })) + defer testServer.Close() + + check := NewNginxUpstreamCheck() + check.URL = fmt.Sprintf("%s/status", testServer.URL) + check.Headers["X-test"] = "test-value" + check.HostHeader = "status.local" + check.Username = "user" + check.Password = "password" + check.Method = "POST" + + var accumulator testutil.Accumulator + + checkError := check.Gather(&accumulator) + require.NoError(test, checkError) +}