Add nginx_upstream_check input plugin (#4303)

This commit is contained in:
Dmitry Ilyin 2019-01-15 23:56:40 +03:00 committed by Daniel Nelson
parent 059ab5d16b
commit e404e5145b
5 changed files with 436 additions and 0 deletions

View File

@ -219,6 +219,7 @@ For documentation on the latest development code see the [documentation index][d
* [nginx_vts](./plugins/inputs/nginx_vts)
* [nsq_consumer](./plugins/inputs/nsq_consumer)
* [nginx_vts](./plugins/inputs/nginx_vts)
* [nginx_upstream_check](./plugins/inputs/nginx_upstream_check)
* [nsq](./plugins/inputs/nsq)
* [nstat](./plugins/inputs/nstat)
* [ntpq](./plugins/inputs/ntpq)

View File

@ -85,6 +85,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/nginx"
_ "github.com/influxdata/telegraf/plugins/inputs/nginx_plus"
_ "github.com/influxdata/telegraf/plugins/inputs/nginx_plus_api"
_ "github.com/influxdata/telegraf/plugins/inputs/nginx_upstream_check"
_ "github.com/influxdata/telegraf/plugins/inputs/nginx_vts"
_ "github.com/influxdata/telegraf/plugins/inputs/nsq"
_ "github.com/influxdata/telegraf/plugins/inputs/nsq_consumer"

View File

@ -0,0 +1,75 @@
# Telegraf Plugin: Nginx_upstream_check
Read the status output of the nginx_upstream_check (https://github.com/yaoweibin/nginx_upstream_check_module).
This module can periodically check the servers in the Nginx's upstream with configured request and interval to determine
if the server is still available. If checks are failed the server is marked as "down" and will not receive any requests
until the check will pass and a server will be marked as "up" again.
The status page displays the current status of all upstreams and servers as well as number of the failed and successful
checks. This information can be exported in JSON format and parsed by this input.
### Configuration:
```
## An URL where Nginx Upstream check module is enabled
## It should be set to return a JSON formatted response
url = "http://127.0.0.1/status?format=json"
## HTTP method
# method = "GET"
## Optional HTTP headers
# headers = {"X-Special-Header" = "Special-Value"}
## Override HTTP "Host" header
# host_header = "check.example.com"
## Timeout for HTTP requests
timeout = "5s"
## Optional HTTP Basic Auth credentials
# username = "username"
# password = "pa$$word"
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
```
### Measurements & Fields:
- Measurement
- fall (The number of failed server check attempts, counter)
- rise (The number of successful server check attempts, counter)
- status (The reporter server status as a string)
- status_code (The server status code. 1 - up, 2 - down, 0 - other)
The "status_code" field most likely will be the most useful one because it allows you to determine the current
state of every server and, possible, add some monitoring to watch over it. InfluxDB can use string values and the
"status" field can be used instead, but for most other monitoring solutions the integer code will be appropriate.
### Tags:
- All measurements have the following tags:
- name (The hostname or IP of the upstream server)
- port (The alternative check port, 0 if the default one is used)
- type (The check type, http/tcp)
- upstream (The name of the upstream block in the Nginx configuration)
- url (The status url used by telegraf)
### Example Output:
When run with:
```
./telegraf --config telegraf.conf --input-filter nginx_upstream_check --test
```
It produces:
```
* Plugin: nginx_upstream_check, Collection 1
> nginx_upstream_check,host=node1,name=192.168.0.1:8080,port=0,type=http,upstream=my_backends,url=http://127.0.0.1:80/status?format\=json fall=0i,rise=100i,status="up",status_code=1i 1529088524000000000
> nginx_upstream_check,host=node2,name=192.168.0.2:8080,port=0,type=http,upstream=my_backends,url=http://127.0.0.1:80/status?format\=json fall=100i,rise=0i,status="down",status_code=2i 1529088524000000000
```

View File

@ -0,0 +1,224 @@
package nginx_upstream_check
import (
"encoding/json"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/inputs"
"net/http"
"net/url"
"strconv"
"time"
)
const sampleConfig = `
## An URL where Nginx Upstream check module is enabled
## It should be set to return a JSON formatted response
url = "http://127.0.0.1/status?format=json"
## HTTP method
# method = "GET"
## Optional HTTP headers
# headers = {"X-Special-Header" = "Special-Value"}
## Override HTTP "Host" header
# host_header = "check.example.com"
## Timeout for HTTP requests
timeout = "5s"
## Optional HTTP Basic Auth credentials
# username = "username"
# password = "pa$$word"
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
`
const description = "Read nginx_upstream_check module status information (https://github.com/yaoweibin/nginx_upstream_check_module)"
type NginxUpstreamCheck struct {
URL string `toml:"uls"`
Username string `toml:"username"`
Password string `toml:"password"`
Method string `toml:"method"`
Headers map[string]string `toml:"headers"`
HostHeader string `toml:"host_header"`
Timeout internal.Duration `toml:"timeout"`
tls.ClientConfig
client *http.Client
}
func NewNginxUpstreamCheck() *NginxUpstreamCheck {
return &NginxUpstreamCheck{
URL: "http://127.0.0.1/status?format=json",
Method: "GET",
Headers: make(map[string]string),
HostHeader: "",
Timeout: internal.Duration{Duration: time.Second * 5},
}
}
func init() {
inputs.Add("nginx_upstream_check", func() telegraf.Input {
return NewNginxUpstreamCheck()
})
}
func (check *NginxUpstreamCheck) SampleConfig() string {
return sampleConfig
}
func (check *NginxUpstreamCheck) Description() string {
return description
}
type NginxUpstreamCheckData struct {
Servers struct {
Total uint64 `json:"total"`
Generation uint64 `json:"generation"`
Server []NginxUpstreamCheckServer `json:"server"`
} `json:"servers"`
}
type NginxUpstreamCheckServer struct {
Index uint64 `json:"index"`
Upstream string `json:"upstream"`
Name string `json:"name"`
Status string `json:"status"`
Rise uint64 `json:"rise"`
Fall uint64 `json:"fall"`
Type string `json:"type"`
Port uint16 `json:"port"`
}
// createHttpClient create a clients to access API
func (check *NginxUpstreamCheck) createHttpClient() (*http.Client, error) {
tlsConfig, err := check.ClientConfig.TLSConfig()
if err != nil {
return nil, err
}
client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsConfig,
},
Timeout: check.Timeout.Duration,
}
return client, nil
}
// gatherJsonData query the data source and parse the response JSON
func (check *NginxUpstreamCheck) gatherJsonData(url string, value interface{}) error {
var method string
if check.Method != "" {
method = check.Method
} else {
method = "GET"
}
request, err := http.NewRequest(method, url, nil)
if err != nil {
return err
}
if (check.Username != "") || (check.Password != "") {
request.SetBasicAuth(check.Username, check.Password)
}
for header, value := range check.Headers {
request.Header.Add(header, value)
}
if check.HostHeader != "" {
request.Host = check.HostHeader
}
response, err := check.client.Do(request)
if err != nil {
return err
}
defer response.Body.Close()
err = json.NewDecoder(response.Body).Decode(value)
if err != nil {
return err
}
return nil
}
func (check *NginxUpstreamCheck) Gather(accumulator telegraf.Accumulator) error {
if check.client == nil {
client, err := check.createHttpClient()
if err != nil {
return err
}
check.client = client
}
statusURL, err := url.Parse(check.URL)
if err != nil {
return err
}
err = check.gatherStatusData(statusURL.String(), accumulator)
if err != nil {
return err
}
return nil
}
func (check *NginxUpstreamCheck) gatherStatusData(url string, accumulator telegraf.Accumulator) error {
checkData := &NginxUpstreamCheckData{}
err := check.gatherJsonData(url, checkData)
if err != nil {
return err
}
for _, server := range checkData.Servers.Server {
tags := map[string]string{
"upstream": server.Upstream,
"type": server.Type,
"name": server.Name,
"port": strconv.Itoa(int(server.Port)),
"url": url,
}
fields := map[string]interface{}{
"status": server.Status,
"status_code": check.getStatusCode(server.Status),
"rise": server.Rise,
"fall": server.Fall,
}
accumulator.AddFields("nginx_upstream_check", fields, tags)
}
return nil
}
func (check *NginxUpstreamCheck) getStatusCode(status string) uint8 {
switch status {
case "up":
return 1
case "down":
return 2
default:
return 0
}
}

View File

@ -0,0 +1,135 @@
package nginx_upstream_check
import (
"fmt"
"net/http"
"net/http/httptest"
"testing"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
const sampleStatusResponse = `
{
"servers": {
"total": 2,
"generation": 1,
"server": [
{
"index": 0,
"upstream": "upstream-1",
"name": "127.0.0.1:8081",
"status": "up",
"rise": 1000,
"fall": 0,
"type": "http",
"port": 0
},
{
"index": 1,
"upstream": "upstream-2",
"name": "127.0.0.1:8082",
"status": "down",
"rise": 0,
"fall": 2000,
"type": "tcp",
"port": 8080
}
]
}
}
`
func TestNginxUpstreamCheckData(test *testing.T) {
testServer := httptest.NewServer(http.HandlerFunc(func(responseWriter http.ResponseWriter, request *http.Request) {
var response string
if request.URL.Path == "/status" {
response = sampleStatusResponse
responseWriter.Header()["Content-Type"] = []string{"application/json"}
} else {
panic("Cannot handle request")
}
fmt.Fprintln(responseWriter, response)
}))
defer testServer.Close()
check := NewNginxUpstreamCheck()
check.URL = fmt.Sprintf("%s/status", testServer.URL)
var accumulator testutil.Accumulator
checkError := check.Gather(&accumulator)
require.NoError(test, checkError)
accumulator.AssertContainsTaggedFields(
test,
"nginx_upstream_check",
map[string]interface{}{
"status": string("up"),
"status_code": uint8(1),
"rise": uint64(1000),
"fall": uint64(0),
},
map[string]string{
"upstream": string("upstream-1"),
"type": string("http"),
"name": string("127.0.0.1:8081"),
"port": string("0"),
"url": fmt.Sprintf("%s/status", testServer.URL),
})
accumulator.AssertContainsTaggedFields(
test,
"nginx_upstream_check",
map[string]interface{}{
"status": string("down"),
"status_code": uint8(2),
"rise": uint64(0),
"fall": uint64(2000),
},
map[string]string{
"upstream": string("upstream-2"),
"type": string("tcp"),
"name": string("127.0.0.1:8082"),
"port": string("8080"),
"url": fmt.Sprintf("%s/status", testServer.URL),
})
}
func TestNginxUpstreamCheckRequest(test *testing.T) {
testServer := httptest.NewServer(http.HandlerFunc(func(responseWriter http.ResponseWriter, request *http.Request) {
var response string
if request.URL.Path == "/status" {
response = sampleStatusResponse
responseWriter.Header()["Content-Type"] = []string{"application/json"}
} else {
panic("Cannot handle request")
}
fmt.Fprintln(responseWriter, response)
require.Equal(test, request.Method, "POST")
require.Equal(test, request.Header.Get("X-Test"), "test-value")
require.Equal(test, request.Header.Get("Authorization"), "Basic dXNlcjpwYXNzd29yZA==")
require.Equal(test, request.Host, "status.local")
}))
defer testServer.Close()
check := NewNginxUpstreamCheck()
check.URL = fmt.Sprintf("%s/status", testServer.URL)
check.Headers["X-test"] = "test-value"
check.HostHeader = "status.local"
check.Username = "user"
check.Password = "password"
check.Method = "POST"
var accumulator testutil.Accumulator
checkError := check.Gather(&accumulator)
require.NoError(test, checkError)
}