Add tengine input plugin (#4160)

This commit is contained in:
arterforyou 2018-05-24 02:19:50 +08:00 committed by Daniel Nelson
parent cecf9393b4
commit 51d2a3ea39
4 changed files with 518 additions and 0 deletions

View File

@ -102,6 +102,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/tail" _ "github.com/influxdata/telegraf/plugins/inputs/tail"
_ "github.com/influxdata/telegraf/plugins/inputs/tcp_listener" _ "github.com/influxdata/telegraf/plugins/inputs/tcp_listener"
_ "github.com/influxdata/telegraf/plugins/inputs/teamspeak" _ "github.com/influxdata/telegraf/plugins/inputs/teamspeak"
_ "github.com/influxdata/telegraf/plugins/inputs/tengine"
_ "github.com/influxdata/telegraf/plugins/inputs/tomcat" _ "github.com/influxdata/telegraf/plugins/inputs/tomcat"
_ "github.com/influxdata/telegraf/plugins/inputs/trig" _ "github.com/influxdata/telegraf/plugins/inputs/trig"
_ "github.com/influxdata/telegraf/plugins/inputs/twemproxy" _ "github.com/influxdata/telegraf/plugins/inputs/twemproxy"

View File

@ -0,0 +1,82 @@
# Telegraf Plugin: Tengine
### Configuration:
```
# Read Tengine's basic status information (ngx_http_reqstat_module)
[[inputs.tengine]]
## An array of Tengine reqstat module URI to gather stats.
urls = ["http://127.0.0.1/us"]
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
## HTTP response timeout (default: 5s)
response_timeout = "5s"
```
### Measurements & Fields:
- Measurement
- bytes_in total number of bytes received from client
- bytes_out total number of bytes sent to client
- conn_total total number of accepted connections
- req_total total number of processed requests
- http_2xx total number of 2xx requests
- http_3xx total number of 3xx requests
- http_4xx total number of 4xx requests
- http_5xx total number of 5xx requests
- http_other_status total number of other requests
- rt accumulation or rt
- ups_req total number of requests calling for upstream
- ups_rt accumulation or upstream rt
- ups_tries total number of times calling for upstream
- http_200 total number of 200 requests
- http_206 total number of 206 requests
- http_302 total number of 302 requests
- http_304 total number of 304 requests
- http_403 total number of 403 requests
- http_404 total number of 404 requests
- http_416 total number of 416 requests
- http_499 total number of 499 requests
- http_500 total number of 500 requests
- http_502 total number of 502 requests
- http_503 total number of 503 requests
- http_504 total number of 504 requests
- http_508 total number of 508 requests
- http_other_detail_status total number of requests of other status codes*http_ups_4xx total number of requests of upstream 4xx
- http_ups_5xx total number of requests of upstream 5xx
### Tags:
- All measurements have the following tags:
- port
- server
- server_name
### Example Output:
Using this configuration:
```
[[inputs.tengine]]
## An array of tengine req_status_show URI to gather stats.
urls = ["http://127.0.0.1/us"]
```
When run with:
```
./telegraf --config telegraf.conf --input-filter tengine --test
```
It produces:
```
* Plugin: tengine, Collection 1
> tengine,host=gcp-thz-api-5,port=80,server=localhost,server_name=localhost bytes_in=9129i,bytes_out=56334i,conn_total=14i,http_200=90i,http_206=0i,http_2xx=90i,http_302=0i,http_304=0i,http_3xx=0i,http_403=0i,http_404=0i,http_416=0i,http_499=0i,http_4xx=0i,http_500=0i,http_502=0i,http_503=0i,http_504=0i,http_508=0i,http_5xx=0i,http_other_detail_status=0i,http_other_status=0i,http_ups_4xx=0i,http_ups_5xx=0i,req_total=90i,rt=0i,ups_req=0i,ups_rt=0i,ups_tries=0i 1526546308000000000
tengine,host=gcp-thz-api-5,port=80,server=localhost,server_name=28.79.190.35.bc.googleusercontent.com bytes_in=1500i,bytes_out=3009i,conn_total=4i,http_200=1i,http_206=0i,http_2xx=1i,http_302=0i,http_304=0i,http_3xx=0i,http_403=0i,http_404=1i,http_416=0i,http_499=0i,http_4xx=3i,http_500=0i,http_502=0i,http_503=0i,http_504=0i,http_508=0i,http_5xx=0i,http_other_detail_status=0i,http_other_status=0i,http_ups_4xx=0i,http_ups_5xx=0i,req_total=4i,rt=0i,ups_req=0i,ups_rt=0i,ups_tries=0i 1526546308000000000
tengine,host=gcp-thz-api-5,port=80,server=localhost,server_name=www.google.com bytes_in=372i,bytes_out=786i,conn_total=1i,http_200=1i,http_206=0i,http_2xx=1i,http_302=0i,http_304=0i,http_3xx=0i,http_403=0i,http_404=0i,http_416=0i,http_499=0i,http_4xx=0i,http_500=0i,http_502=0i,http_503=0i,http_504=0i,http_508=0i,http_5xx=0i,http_other_detail_status=0i,http_other_status=0i,http_ups_4xx=0i,http_ups_5xx=0i,req_total=1i,rt=0i,ups_req=0i,ups_rt=0i,ups_tries=0i 1526546308000000000
tengine,host=gcp-thz-api-5,port=80,server=localhost,server_name=35.190.79.28 bytes_in=4433i,bytes_out=10259i,conn_total=5i,http_200=3i,http_206=0i,http_2xx=3i,http_302=0i,http_304=0i,http_3xx=0i,http_403=0i,http_404=11i,http_416=0i,http_499=0i,http_4xx=11i,http_500=0i,http_502=0i,http_503=0i,http_504=0i,http_508=0i,http_5xx=0i,http_other_detail_status=0i,http_other_status=0i,http_ups_4xx=0i,http_ups_5xx=0i,req_total=14i,rt=0i,ups_req=0i,ups_rt=0i,ups_tries=0i 1526546308000000000
tengine,host=gcp-thz-api-5,port=80,server=localhost,server_name=tenka-prod-api.txwy.tw bytes_in=3014397400i,bytes_out=14279992835i,conn_total=36844i,http_200=3177339i,http_206=0i,http_2xx=3177339i,http_302=0i,http_304=0i,http_3xx=0i,http_403=0i,http_404=123i,http_416=0i,http_499=0i,http_4xx=123i,http_500=17214i,http_502=4453i,http_503=80i,http_504=0i,http_508=0i,http_5xx=21747i,http_other_detail_status=0i,http_other_status=0i,http_ups_4xx=123i,http_ups_5xx=21747i,req_total=3199209i,rt=245874536i,ups_req=2685076i,ups_rt=245858217i,ups_tries=2685076i 1526546308000000000
```

View File

@ -0,0 +1,338 @@
package tengine
import (
"bufio"
"fmt"
"net"
"net/http"
"net/url"
"strconv"
"strings"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/inputs"
"io"
)
type Tengine struct {
Urls []string
ResponseTimeout internal.Duration
tls.ClientConfig
// HTTP client
client *http.Client
}
var sampleConfig = `
# An array of Tengine reqstat module URI to gather stats.
urls = ["http://127.0.0.1/us"]
## Optional TLS Config
tls_ca = "/etc/telegraf/ca.pem"
tls_cert = "/etc/telegraf/cert.cer"
tls_key = "/etc/telegraf/key.key"
## Use TLS but skip chain & host verification
insecure_skip_verify = false
# HTTP response timeout (default: 5s)
response_timeout = "5s"
`
func (n *Tengine) SampleConfig() string {
return sampleConfig
}
func (n *Tengine) Description() string {
return "Read Tengine's basic status information (ngx_http_reqstat_module)"
}
func (n *Tengine) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup
// Create an HTTP client that is re-used for each
// collection interval
if n.client == nil {
client, err := n.createHttpClient()
if err != nil {
return err
}
n.client = client
}
for _, u := range n.Urls {
addr, err := url.Parse(u)
if err != nil {
acc.AddError(fmt.Errorf("Unable to parse address '%s': %s", u, err))
continue
}
wg.Add(1)
go func(addr *url.URL) {
defer wg.Done()
acc.AddError(n.gatherUrl(addr, acc))
}(addr)
}
wg.Wait()
return nil
}
func (n *Tengine) createHttpClient() (*http.Client, error) {
tlsCfg, err := n.ClientConfig.TLSConfig()
if err != nil {
return nil, err
}
if n.ResponseTimeout.Duration < time.Second {
n.ResponseTimeout.Duration = time.Second * 5
}
client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsCfg,
},
Timeout: n.ResponseTimeout.Duration,
}
return client, nil
}
type TengineSatus struct {
host string
bytes_in uint64
bytes_out uint64
conn_total uint64
req_total uint64
http_2xx uint64
http_3xx uint64
http_4xx uint64
http_5xx uint64
http_other_status uint64
rt uint64
ups_req uint64
ups_rt uint64
ups_tries uint64
http_200 uint64
http_206 uint64
http_302 uint64
http_304 uint64
http_403 uint64
http_404 uint64
http_416 uint64
http_499 uint64
http_500 uint64
http_502 uint64
http_503 uint64
http_504 uint64
http_508 uint64
http_other_detail_status uint64
http_ups_4xx uint64
http_ups_5xx uint64
}
func (n *Tengine) gatherUrl(addr *url.URL, acc telegraf.Accumulator) error {
var tenginestatus TengineSatus
resp, err := n.client.Get(addr.String())
if err != nil {
return fmt.Errorf("error making HTTP request to %s: %s", addr.String(), err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("%s returned HTTP status %s", addr.String(), resp.Status)
}
r := bufio.NewReader(resp.Body)
for {
line, err := r.ReadString('\n')
if err != nil || io.EOF == err {
break
}
line_split := strings.Split(strings.TrimSpace(line), ",")
if len(line_split) != 30 {
continue
}
tenginestatus.host = line_split[0]
if err != nil {
return err
}
tenginestatus.bytes_in, err = strconv.ParseUint(line_split[1], 10, 64)
if err != nil {
return err
}
tenginestatus.bytes_out, err = strconv.ParseUint(line_split[2], 10, 64)
if err != nil {
return err
}
tenginestatus.conn_total, err = strconv.ParseUint(line_split[3], 10, 64)
if err != nil {
return err
}
tenginestatus.req_total, err = strconv.ParseUint(line_split[4], 10, 64)
if err != nil {
return err
}
tenginestatus.http_2xx, err = strconv.ParseUint(line_split[5], 10, 64)
if err != nil {
return err
}
tenginestatus.http_3xx, err = strconv.ParseUint(line_split[6], 10, 64)
if err != nil {
return err
}
tenginestatus.http_4xx, err = strconv.ParseUint(line_split[7], 10, 64)
if err != nil {
return err
}
tenginestatus.http_5xx, err = strconv.ParseUint(line_split[8], 10, 64)
if err != nil {
return err
}
tenginestatus.http_other_status, err = strconv.ParseUint(line_split[9], 10, 64)
if err != nil {
return err
}
tenginestatus.rt, err = strconv.ParseUint(line_split[10], 10, 64)
if err != nil {
return err
}
tenginestatus.ups_req, err = strconv.ParseUint(line_split[11], 10, 64)
if err != nil {
return err
}
tenginestatus.ups_rt, err = strconv.ParseUint(line_split[12], 10, 64)
if err != nil {
return err
}
tenginestatus.ups_tries, err = strconv.ParseUint(line_split[13], 10, 64)
if err != nil {
return err
}
tenginestatus.http_200, err = strconv.ParseUint(line_split[14], 10, 64)
if err != nil {
return err
}
tenginestatus.http_206, err = strconv.ParseUint(line_split[15], 10, 64)
if err != nil {
return err
}
tenginestatus.http_302, err = strconv.ParseUint(line_split[16], 10, 64)
if err != nil {
return err
}
tenginestatus.http_304, err = strconv.ParseUint(line_split[17], 10, 64)
if err != nil {
return err
}
tenginestatus.http_403, err = strconv.ParseUint(line_split[18], 10, 64)
if err != nil {
return err
}
tenginestatus.http_404, err = strconv.ParseUint(line_split[19], 10, 64)
if err != nil {
return err
}
tenginestatus.http_416, err = strconv.ParseUint(line_split[20], 10, 64)
if err != nil {
return err
}
tenginestatus.http_499, err = strconv.ParseUint(line_split[21], 10, 64)
if err != nil {
return err
}
tenginestatus.http_500, err = strconv.ParseUint(line_split[22], 10, 64)
if err != nil {
return err
}
tenginestatus.http_502, err = strconv.ParseUint(line_split[23], 10, 64)
if err != nil {
return err
}
tenginestatus.http_503, err = strconv.ParseUint(line_split[24], 10, 64)
if err != nil {
return err
}
tenginestatus.http_504, err = strconv.ParseUint(line_split[25], 10, 64)
if err != nil {
return err
}
tenginestatus.http_508, err = strconv.ParseUint(line_split[26], 10, 64)
if err != nil {
return err
}
tenginestatus.http_other_detail_status, err = strconv.ParseUint(line_split[27], 10, 64)
if err != nil {
return err
}
tenginestatus.http_ups_4xx, err = strconv.ParseUint(line_split[28], 10, 64)
if err != nil {
return err
}
tenginestatus.http_ups_5xx, err = strconv.ParseUint(line_split[29], 10, 64)
if err != nil {
return err
}
tags := getTags(addr, tenginestatus.host)
fields := map[string]interface{}{
"bytes_in": tenginestatus.bytes_in,
"bytes_out": tenginestatus.bytes_out,
"conn_total": tenginestatus.conn_total,
"req_total": tenginestatus.req_total,
"http_2xx": tenginestatus.http_2xx,
"http_3xx": tenginestatus.http_3xx,
"http_4xx": tenginestatus.http_4xx,
"http_5xx": tenginestatus.http_5xx,
"http_other_status": tenginestatus.http_other_status,
"rt": tenginestatus.rt,
"ups_req": tenginestatus.ups_req,
"ups_rt": tenginestatus.ups_rt,
"ups_tries": tenginestatus.ups_tries,
"http_200": tenginestatus.http_200,
"http_206": tenginestatus.http_206,
"http_302": tenginestatus.http_302,
"http_304": tenginestatus.http_304,
"http_403": tenginestatus.http_403,
"http_404": tenginestatus.http_404,
"http_416": tenginestatus.http_416,
"http_499": tenginestatus.http_499,
"http_500": tenginestatus.http_500,
"http_502": tenginestatus.http_502,
"http_503": tenginestatus.http_503,
"http_504": tenginestatus.http_504,
"http_508": tenginestatus.http_508,
"http_other_detail_status": tenginestatus.http_other_detail_status,
"http_ups_4xx": tenginestatus.http_ups_4xx,
"http_ups_5xx": tenginestatus.http_ups_5xx,
}
acc.AddFields("tengine", fields, tags)
}
return nil
}
// Get tag(s) for the tengine plugin
func getTags(addr *url.URL, server_name string) map[string]string {
h := addr.Host
host, port, err := net.SplitHostPort(h)
if err != nil {
host = addr.Host
if addr.Scheme == "http" {
port = "80"
} else if addr.Scheme == "https" {
port = "443"
} else {
port = ""
}
}
return map[string]string{"server": host, "port": port, "server_name": server_name}
}
func init() {
inputs.Add("tengine", func() telegraf.Input {
return &Tengine{}
})
}

View File

@ -0,0 +1,97 @@
package tengine
import (
"fmt"
"net"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const tengineSampleResponse = `127.0.0.1,784,1511,2,2,1,0,1,0,0,0,0,0,0,1,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0`
// Verify that tengine tags are properly parsed based on the server
func TestTengineTags(t *testing.T) {
urls := []string{"http://localhost/us", "http://localhost:80/us"}
var addr *url.URL
for _, url1 := range urls {
addr, _ = url.Parse(url1)
tagMap := getTags(addr, "127.0.0.1")
assert.Contains(t, tagMap["server"], "localhost")
}
}
func TestTengineGeneratesMetrics(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var rsp string
rsp = tengineSampleResponse
fmt.Fprintln(w, rsp)
}))
defer ts.Close()
n := &Tengine{
Urls: []string{fmt.Sprintf("%s/us", ts.URL)},
}
var acc_tengine testutil.Accumulator
err_tengine := acc_tengine.GatherError(n.Gather)
require.NoError(t, err_tengine)
fields_tengine := map[string]interface{}{
"bytes_in": uint64(784),
"bytes_out": uint64(1511),
"conn_total": uint64(2),
"req_total": uint64(2),
"http_2xx": uint64(1),
"http_3xx": uint64(0),
"http_4xx": uint64(1),
"http_5xx": uint64(0),
"http_other_status": uint64(0),
"rt": uint64(0),
"ups_req": uint64(0),
"ups_rt": uint64(0),
"ups_tries": uint64(0),
"http_200": uint64(1),
"http_206": uint64(0),
"http_302": uint64(0),
"http_304": uint64(0),
"http_403": uint64(0),
"http_404": uint64(1),
"http_416": uint64(0),
"http_499": uint64(0),
"http_500": uint64(0),
"http_502": uint64(0),
"http_503": uint64(0),
"http_504": uint64(0),
"http_508": uint64(0),
"http_other_detail_status": uint64(0),
"http_ups_4xx": uint64(0),
"http_ups_5xx": uint64(0),
}
addr, err := url.Parse(ts.URL)
if err != nil {
panic(err)
}
host, port, err := net.SplitHostPort(addr.Host)
if err != nil {
host = addr.Host
if addr.Scheme == "http" {
port = "80"
} else if addr.Scheme == "https" {
port = "443"
} else {
port = ""
}
}
tags := map[string]string{"server": host, "port": port, "server_name": "127.0.0.1"}
acc_tengine.AssertContainsTaggedFields(t, "tengine", fields_tengine, tags)
}