From 51d2a3ea39601b307906cf3897aa5f87dc1cc0ee Mon Sep 17 00:00:00 2001 From: arterforyou Date: Thu, 24 May 2018 02:19:50 +0800 Subject: [PATCH] Add tengine input plugin (#4160) --- plugins/inputs/all/all.go | 1 + plugins/inputs/tengine/README.md | 82 ++++++ plugins/inputs/tengine/tengine.go | 338 +++++++++++++++++++++++++ plugins/inputs/tengine/tengine_test.go | 97 +++++++ 4 files changed, 518 insertions(+) create mode 100644 plugins/inputs/tengine/README.md create mode 100644 plugins/inputs/tengine/tengine.go create mode 100644 plugins/inputs/tengine/tengine_test.go diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 239cf6e11..cba156f69 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -102,6 +102,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/tail" _ "github.com/influxdata/telegraf/plugins/inputs/tcp_listener" _ "github.com/influxdata/telegraf/plugins/inputs/teamspeak" + _ "github.com/influxdata/telegraf/plugins/inputs/tengine" _ "github.com/influxdata/telegraf/plugins/inputs/tomcat" _ "github.com/influxdata/telegraf/plugins/inputs/trig" _ "github.com/influxdata/telegraf/plugins/inputs/twemproxy" diff --git a/plugins/inputs/tengine/README.md b/plugins/inputs/tengine/README.md new file mode 100644 index 000000000..b83a0611b --- /dev/null +++ b/plugins/inputs/tengine/README.md @@ -0,0 +1,82 @@ +# Telegraf Plugin: Tengine + +### Configuration: + +``` +# Read Tengine's basic status information (ngx_http_reqstat_module) +[[inputs.tengine]] + ## An array of Tengine reqstat module URI to gather stats. + urls = ["http://127.0.0.1/us"] + + ## Optional TLS Config + # tls_ca = "/etc/telegraf/ca.pem" + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false + + ## HTTP response timeout (default: 5s) + response_timeout = "5s" +``` + +### Measurements & Fields: + +- Measurement + - bytes_in total number of bytes received from client + - bytes_out total number of bytes sent to client + - conn_total total number of accepted connections + - req_total total number of processed requests + - http_2xx total number of 2xx requests + - http_3xx total number of 3xx requests + - http_4xx total number of 4xx requests + - http_5xx total number of 5xx requests + - http_other_status total number of other requests + - rt accumulation or rt + - ups_req total number of requests calling for upstream + - ups_rt accumulation or upstream rt + - ups_tries total number of times calling for upstream + - http_200 total number of 200 requests + - http_206 total number of 206 requests + - http_302 total number of 302 requests + - http_304 total number of 304 requests + - http_403 total number of 403 requests + - http_404 total number of 404 requests + - http_416 total number of 416 requests + - http_499 total number of 499 requests + - http_500 total number of 500 requests + - http_502 total number of 502 requests + - http_503 total number of 503 requests + - http_504 total number of 504 requests + - http_508 total number of 508 requests + - http_other_detail_status total number of requests of other status codes*http_ups_4xx total number of requests of upstream 4xx + - http_ups_5xx total number of requests of upstream 5xx +### Tags: + +- All measurements have the following tags: + - port + - server + - server_name + +### Example Output: + +Using this configuration: +``` +[[inputs.tengine]] + ## An array of tengine req_status_show URI to gather stats. + urls = ["http://127.0.0.1/us"] +``` + +When run with: +``` +./telegraf --config telegraf.conf --input-filter tengine --test +``` + +It produces: +``` +* Plugin: tengine, Collection 1 +> tengine,host=gcp-thz-api-5,port=80,server=localhost,server_name=localhost bytes_in=9129i,bytes_out=56334i,conn_total=14i,http_200=90i,http_206=0i,http_2xx=90i,http_302=0i,http_304=0i,http_3xx=0i,http_403=0i,http_404=0i,http_416=0i,http_499=0i,http_4xx=0i,http_500=0i,http_502=0i,http_503=0i,http_504=0i,http_508=0i,http_5xx=0i,http_other_detail_status=0i,http_other_status=0i,http_ups_4xx=0i,http_ups_5xx=0i,req_total=90i,rt=0i,ups_req=0i,ups_rt=0i,ups_tries=0i 1526546308000000000 + tengine,host=gcp-thz-api-5,port=80,server=localhost,server_name=28.79.190.35.bc.googleusercontent.com bytes_in=1500i,bytes_out=3009i,conn_total=4i,http_200=1i,http_206=0i,http_2xx=1i,http_302=0i,http_304=0i,http_3xx=0i,http_403=0i,http_404=1i,http_416=0i,http_499=0i,http_4xx=3i,http_500=0i,http_502=0i,http_503=0i,http_504=0i,http_508=0i,http_5xx=0i,http_other_detail_status=0i,http_other_status=0i,http_ups_4xx=0i,http_ups_5xx=0i,req_total=4i,rt=0i,ups_req=0i,ups_rt=0i,ups_tries=0i 1526546308000000000 + tengine,host=gcp-thz-api-5,port=80,server=localhost,server_name=www.google.com bytes_in=372i,bytes_out=786i,conn_total=1i,http_200=1i,http_206=0i,http_2xx=1i,http_302=0i,http_304=0i,http_3xx=0i,http_403=0i,http_404=0i,http_416=0i,http_499=0i,http_4xx=0i,http_500=0i,http_502=0i,http_503=0i,http_504=0i,http_508=0i,http_5xx=0i,http_other_detail_status=0i,http_other_status=0i,http_ups_4xx=0i,http_ups_5xx=0i,req_total=1i,rt=0i,ups_req=0i,ups_rt=0i,ups_tries=0i 1526546308000000000 + tengine,host=gcp-thz-api-5,port=80,server=localhost,server_name=35.190.79.28 bytes_in=4433i,bytes_out=10259i,conn_total=5i,http_200=3i,http_206=0i,http_2xx=3i,http_302=0i,http_304=0i,http_3xx=0i,http_403=0i,http_404=11i,http_416=0i,http_499=0i,http_4xx=11i,http_500=0i,http_502=0i,http_503=0i,http_504=0i,http_508=0i,http_5xx=0i,http_other_detail_status=0i,http_other_status=0i,http_ups_4xx=0i,http_ups_5xx=0i,req_total=14i,rt=0i,ups_req=0i,ups_rt=0i,ups_tries=0i 1526546308000000000 + tengine,host=gcp-thz-api-5,port=80,server=localhost,server_name=tenka-prod-api.txwy.tw bytes_in=3014397400i,bytes_out=14279992835i,conn_total=36844i,http_200=3177339i,http_206=0i,http_2xx=3177339i,http_302=0i,http_304=0i,http_3xx=0i,http_403=0i,http_404=123i,http_416=0i,http_499=0i,http_4xx=123i,http_500=17214i,http_502=4453i,http_503=80i,http_504=0i,http_508=0i,http_5xx=21747i,http_other_detail_status=0i,http_other_status=0i,http_ups_4xx=123i,http_ups_5xx=21747i,req_total=3199209i,rt=245874536i,ups_req=2685076i,ups_rt=245858217i,ups_tries=2685076i 1526546308000000000 +``` diff --git a/plugins/inputs/tengine/tengine.go b/plugins/inputs/tengine/tengine.go new file mode 100644 index 000000000..a9a899825 --- /dev/null +++ b/plugins/inputs/tengine/tengine.go @@ -0,0 +1,338 @@ +package tengine + +import ( + "bufio" + "fmt" + "net" + "net/http" + "net/url" + "strconv" + "strings" + "sync" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/internal/tls" + "github.com/influxdata/telegraf/plugins/inputs" + "io" +) + +type Tengine struct { + Urls []string + ResponseTimeout internal.Duration + tls.ClientConfig + + // HTTP client + client *http.Client +} + +var sampleConfig = ` + # An array of Tengine reqstat module URI to gather stats. + urls = ["http://127.0.0.1/us"] + + ## Optional TLS Config + tls_ca = "/etc/telegraf/ca.pem" + tls_cert = "/etc/telegraf/cert.cer" + tls_key = "/etc/telegraf/key.key" + ## Use TLS but skip chain & host verification + insecure_skip_verify = false + + # HTTP response timeout (default: 5s) + response_timeout = "5s" +` + +func (n *Tengine) SampleConfig() string { + return sampleConfig +} + +func (n *Tengine) Description() string { + return "Read Tengine's basic status information (ngx_http_reqstat_module)" +} + +func (n *Tengine) Gather(acc telegraf.Accumulator) error { + var wg sync.WaitGroup + + // Create an HTTP client that is re-used for each + // collection interval + if n.client == nil { + client, err := n.createHttpClient() + if err != nil { + return err + } + n.client = client + } + + for _, u := range n.Urls { + addr, err := url.Parse(u) + if err != nil { + acc.AddError(fmt.Errorf("Unable to parse address '%s': %s", u, err)) + continue + } + + wg.Add(1) + go func(addr *url.URL) { + defer wg.Done() + acc.AddError(n.gatherUrl(addr, acc)) + }(addr) + } + + wg.Wait() + return nil +} + +func (n *Tengine) createHttpClient() (*http.Client, error) { + tlsCfg, err := n.ClientConfig.TLSConfig() + if err != nil { + return nil, err + } + + if n.ResponseTimeout.Duration < time.Second { + n.ResponseTimeout.Duration = time.Second * 5 + } + + client := &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: tlsCfg, + }, + Timeout: n.ResponseTimeout.Duration, + } + + return client, nil +} + +type TengineSatus struct { + host string + bytes_in uint64 + bytes_out uint64 + conn_total uint64 + req_total uint64 + http_2xx uint64 + http_3xx uint64 + http_4xx uint64 + http_5xx uint64 + http_other_status uint64 + rt uint64 + ups_req uint64 + ups_rt uint64 + ups_tries uint64 + http_200 uint64 + http_206 uint64 + http_302 uint64 + http_304 uint64 + http_403 uint64 + http_404 uint64 + http_416 uint64 + http_499 uint64 + http_500 uint64 + http_502 uint64 + http_503 uint64 + http_504 uint64 + http_508 uint64 + http_other_detail_status uint64 + http_ups_4xx uint64 + http_ups_5xx uint64 +} + +func (n *Tengine) gatherUrl(addr *url.URL, acc telegraf.Accumulator) error { + var tenginestatus TengineSatus + resp, err := n.client.Get(addr.String()) + if err != nil { + return fmt.Errorf("error making HTTP request to %s: %s", addr.String(), err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("%s returned HTTP status %s", addr.String(), resp.Status) + } + r := bufio.NewReader(resp.Body) + + for { + line, err := r.ReadString('\n') + + if err != nil || io.EOF == err { + break + } + line_split := strings.Split(strings.TrimSpace(line), ",") + if len(line_split) != 30 { + continue + } + tenginestatus.host = line_split[0] + if err != nil { + return err + } + tenginestatus.bytes_in, err = strconv.ParseUint(line_split[1], 10, 64) + if err != nil { + return err + } + tenginestatus.bytes_out, err = strconv.ParseUint(line_split[2], 10, 64) + if err != nil { + return err + } + tenginestatus.conn_total, err = strconv.ParseUint(line_split[3], 10, 64) + if err != nil { + return err + } + tenginestatus.req_total, err = strconv.ParseUint(line_split[4], 10, 64) + if err != nil { + return err + } + tenginestatus.http_2xx, err = strconv.ParseUint(line_split[5], 10, 64) + if err != nil { + return err + } + tenginestatus.http_3xx, err = strconv.ParseUint(line_split[6], 10, 64) + if err != nil { + return err + } + tenginestatus.http_4xx, err = strconv.ParseUint(line_split[7], 10, 64) + if err != nil { + return err + } + tenginestatus.http_5xx, err = strconv.ParseUint(line_split[8], 10, 64) + if err != nil { + return err + } + tenginestatus.http_other_status, err = strconv.ParseUint(line_split[9], 10, 64) + if err != nil { + return err + } + tenginestatus.rt, err = strconv.ParseUint(line_split[10], 10, 64) + if err != nil { + return err + } + tenginestatus.ups_req, err = strconv.ParseUint(line_split[11], 10, 64) + if err != nil { + return err + } + tenginestatus.ups_rt, err = strconv.ParseUint(line_split[12], 10, 64) + if err != nil { + return err + } + tenginestatus.ups_tries, err = strconv.ParseUint(line_split[13], 10, 64) + if err != nil { + return err + } + tenginestatus.http_200, err = strconv.ParseUint(line_split[14], 10, 64) + if err != nil { + return err + } + tenginestatus.http_206, err = strconv.ParseUint(line_split[15], 10, 64) + if err != nil { + return err + } + tenginestatus.http_302, err = strconv.ParseUint(line_split[16], 10, 64) + if err != nil { + return err + } + tenginestatus.http_304, err = strconv.ParseUint(line_split[17], 10, 64) + if err != nil { + return err + } + tenginestatus.http_403, err = strconv.ParseUint(line_split[18], 10, 64) + if err != nil { + return err + } + tenginestatus.http_404, err = strconv.ParseUint(line_split[19], 10, 64) + if err != nil { + return err + } + tenginestatus.http_416, err = strconv.ParseUint(line_split[20], 10, 64) + if err != nil { + return err + } + tenginestatus.http_499, err = strconv.ParseUint(line_split[21], 10, 64) + if err != nil { + return err + } + tenginestatus.http_500, err = strconv.ParseUint(line_split[22], 10, 64) + if err != nil { + return err + } + tenginestatus.http_502, err = strconv.ParseUint(line_split[23], 10, 64) + if err != nil { + return err + } + tenginestatus.http_503, err = strconv.ParseUint(line_split[24], 10, 64) + if err != nil { + return err + } + tenginestatus.http_504, err = strconv.ParseUint(line_split[25], 10, 64) + if err != nil { + return err + } + tenginestatus.http_508, err = strconv.ParseUint(line_split[26], 10, 64) + if err != nil { + return err + } + tenginestatus.http_other_detail_status, err = strconv.ParseUint(line_split[27], 10, 64) + if err != nil { + return err + } + tenginestatus.http_ups_4xx, err = strconv.ParseUint(line_split[28], 10, 64) + if err != nil { + return err + } + tenginestatus.http_ups_5xx, err = strconv.ParseUint(line_split[29], 10, 64) + if err != nil { + return err + } + tags := getTags(addr, tenginestatus.host) + fields := map[string]interface{}{ + "bytes_in": tenginestatus.bytes_in, + "bytes_out": tenginestatus.bytes_out, + "conn_total": tenginestatus.conn_total, + "req_total": tenginestatus.req_total, + "http_2xx": tenginestatus.http_2xx, + "http_3xx": tenginestatus.http_3xx, + "http_4xx": tenginestatus.http_4xx, + "http_5xx": tenginestatus.http_5xx, + "http_other_status": tenginestatus.http_other_status, + "rt": tenginestatus.rt, + "ups_req": tenginestatus.ups_req, + "ups_rt": tenginestatus.ups_rt, + "ups_tries": tenginestatus.ups_tries, + "http_200": tenginestatus.http_200, + "http_206": tenginestatus.http_206, + "http_302": tenginestatus.http_302, + "http_304": tenginestatus.http_304, + "http_403": tenginestatus.http_403, + "http_404": tenginestatus.http_404, + "http_416": tenginestatus.http_416, + "http_499": tenginestatus.http_499, + "http_500": tenginestatus.http_500, + "http_502": tenginestatus.http_502, + "http_503": tenginestatus.http_503, + "http_504": tenginestatus.http_504, + "http_508": tenginestatus.http_508, + "http_other_detail_status": tenginestatus.http_other_detail_status, + "http_ups_4xx": tenginestatus.http_ups_4xx, + "http_ups_5xx": tenginestatus.http_ups_5xx, + } + acc.AddFields("tengine", fields, tags) + } + + return nil +} + +// Get tag(s) for the tengine plugin +func getTags(addr *url.URL, server_name string) map[string]string { + h := addr.Host + host, port, err := net.SplitHostPort(h) + if err != nil { + host = addr.Host + if addr.Scheme == "http" { + port = "80" + } else if addr.Scheme == "https" { + port = "443" + } else { + port = "" + } + } + return map[string]string{"server": host, "port": port, "server_name": server_name} +} + +func init() { + inputs.Add("tengine", func() telegraf.Input { + return &Tengine{} + }) +} diff --git a/plugins/inputs/tengine/tengine_test.go b/plugins/inputs/tengine/tengine_test.go new file mode 100644 index 000000000..317820bb2 --- /dev/null +++ b/plugins/inputs/tengine/tengine_test.go @@ -0,0 +1,97 @@ +package tengine + +import ( + "fmt" + "net" + "net/http" + "net/http/httptest" + "net/url" + "testing" + + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const tengineSampleResponse = `127.0.0.1,784,1511,2,2,1,0,1,0,0,0,0,0,0,1,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0` + +// Verify that tengine tags are properly parsed based on the server +func TestTengineTags(t *testing.T) { + urls := []string{"http://localhost/us", "http://localhost:80/us"} + var addr *url.URL + for _, url1 := range urls { + addr, _ = url.Parse(url1) + tagMap := getTags(addr, "127.0.0.1") + assert.Contains(t, tagMap["server"], "localhost") + } +} + +func TestTengineGeneratesMetrics(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var rsp string + rsp = tengineSampleResponse + fmt.Fprintln(w, rsp) + })) + defer ts.Close() + + n := &Tengine{ + Urls: []string{fmt.Sprintf("%s/us", ts.URL)}, + } + + var acc_tengine testutil.Accumulator + + err_tengine := acc_tengine.GatherError(n.Gather) + + require.NoError(t, err_tengine) + + fields_tengine := map[string]interface{}{ + "bytes_in": uint64(784), + "bytes_out": uint64(1511), + "conn_total": uint64(2), + "req_total": uint64(2), + "http_2xx": uint64(1), + "http_3xx": uint64(0), + "http_4xx": uint64(1), + "http_5xx": uint64(0), + "http_other_status": uint64(0), + "rt": uint64(0), + "ups_req": uint64(0), + "ups_rt": uint64(0), + "ups_tries": uint64(0), + "http_200": uint64(1), + "http_206": uint64(0), + "http_302": uint64(0), + "http_304": uint64(0), + "http_403": uint64(0), + "http_404": uint64(1), + "http_416": uint64(0), + "http_499": uint64(0), + "http_500": uint64(0), + "http_502": uint64(0), + "http_503": uint64(0), + "http_504": uint64(0), + "http_508": uint64(0), + "http_other_detail_status": uint64(0), + "http_ups_4xx": uint64(0), + "http_ups_5xx": uint64(0), + } + + addr, err := url.Parse(ts.URL) + if err != nil { + panic(err) + } + + host, port, err := net.SplitHostPort(addr.Host) + if err != nil { + host = addr.Host + if addr.Scheme == "http" { + port = "80" + } else if addr.Scheme == "https" { + port = "443" + } else { + port = "" + } + } + tags := map[string]string{"server": host, "port": port, "server_name": "127.0.0.1"} + acc_tengine.AssertContainsTaggedFields(t, "tengine", fields_tengine, tags) +}