diff --git a/plugins/all/all.go b/plugins/all/all.go index 6b2ffbc1b..ab7dfcbbf 100644 --- a/plugins/all/all.go +++ b/plugins/all/all.go @@ -3,6 +3,7 @@ package all import ( _ "github.com/influxdb/telegraf/plugins/disque" _ "github.com/influxdb/telegraf/plugins/elasticsearch" + _ "github.com/influxdb/telegraf/plugins/haproxy" _ "github.com/influxdb/telegraf/plugins/kafka_consumer" _ "github.com/influxdb/telegraf/plugins/lustre2" _ "github.com/influxdb/telegraf/plugins/memcached" diff --git a/plugins/haproxy/haproxy.go b/plugins/haproxy/haproxy.go new file mode 100644 index 000000000..e09bfe5be --- /dev/null +++ b/plugins/haproxy/haproxy.go @@ -0,0 +1,310 @@ +package haproxy + +import ( + "encoding/csv" + "fmt" + "github.com/influxdb/telegraf/plugins" + "io" + "net/http" + "net/url" + "strconv" + "sync" +) + +//CSV format: https://cbonte.github.io/haproxy-dconv/configuration-1.5.html#9.1 +const ( + HF_PXNAME = 0 // 0. pxname [LFBS]: proxy name + HF_SVNAME = 1 // 1. svname [LFBS]: service name (FRONTEND for frontend, BACKEND for backend, any name for server/listener) + HF_QCUR = 2 //2. qcur [..BS]: current queued requests. For the backend this reports the number queued without a server assigned. + HF_QMAX = 3 //3. qmax [..BS]: max value of qcur + HF_SCUR = 4 // 4. scur [LFBS]: current sessions + HF_SMAX = 5 //5. smax [LFBS]: max sessions + HF_SLIM = 6 //6. slim [LFBS]: configured session limit + HF_STOT = 7 //7. stot [LFBS]: cumulative number of connections + HF_BIN = 8 //8. bin [LFBS]: bytes in + HF_BOUT = 9 //9. bout [LFBS]: bytes out + HF_DREQ = 10 //10. dreq [LFB.]: requests denied because of security concerns. + HF_DRESP = 11 //11. dresp [LFBS]: responses denied because of security concerns. + HF_EREQ = 12 //12. ereq [LF..]: request errors. Some of the possible causes are: + HF_ECON = 13 //13. econ [..BS]: number of requests that encountered an error trying to + HF_ERESP = 14 //14. eresp [..BS]: response errors. srv_abrt will be counted here also. Some other errors are: - write error on the client socket (won't be counted for the server stat) - failure applying filters to the response. + HF_WRETR = 15 //15. wretr [..BS]: number of times a connection to a server was retried. + HF_WREDIS = 16 //16. wredis [..BS]: number of times a request was redispatched to another server. The server value counts the number of times that server was switched away from. + HF_STATUS = 17 //17. status [LFBS]: status (UP/DOWN/NOLB/MAINT/MAINT(via)...) + HF_WEIGHT = 18 //18. weight [..BS]: total weight (backend), server weight (server) + HF_ACT = 19 //19. act [..BS]: number of active servers (backend), server is active (server) + HF_BCK = 20 //20. bck [..BS]: number of backup servers (backend), server is backup (server) + HF_CHKFAIL = 21 //21. chkfail [...S]: number of failed checks. (Only counts checks failed when the server is up.) + HF_CHKDOWN = 22 //22. chkdown [..BS]: number of UP->DOWN transitions. The backend counter counts transitions to the whole backend being down, rather than the sum of the counters for each server. + HF_LASTCHG = 23 //23. lastchg [..BS]: number of seconds since the last UP<->DOWN transition + HF_DOWNTIME = 24 //24. downtime [..BS]: total downtime (in seconds). The value for the backend is the downtime for the whole backend, not the sum of the server downtime. + HF_QLIMIT = 25 //25. qlimit [...S]: configured maxqueue for the server, or nothing in the value is 0 (default, meaning no limit) + HF_PID = 26 //26. pid [LFBS]: process id (0 for first instance, 1 for second, ...) 
+	HF_IID              = 27 //27. iid [LFBS]: unique proxy id
+	HF_SID              = 28 //28. sid [L..S]: server id (unique inside a proxy)
+	HF_THROTTLE         = 29 //29. throttle [...S]: current throttle percentage for the server, when slowstart is active, or no value if not in slowstart.
+	HF_LBTOT            = 30 //30. lbtot [..BS]: total number of times a server was selected, either for new sessions, or when re-dispatching. The server counter is the number of times that server was selected.
+	HF_TRACKED          = 31 //31. tracked [...S]: id of proxy/server if tracking is enabled.
+	HF_TYPE             = 32 //32. type [LFBS]: (0 = frontend, 1 = backend, 2 = server, 3 = socket/listener)
+	HF_RATE             = 33 //33. rate [.FBS]: number of sessions per second over last elapsed second
+	HF_RATE_LIM         = 34 //34. rate_lim [.F..]: configured limit on new sessions per second
+	HF_RATE_MAX         = 35 //35. rate_max [.FBS]: max number of new sessions per second
+	HF_CHECK_STATUS     = 36 //36. check_status [...S]: status of last health check, one of:
+	HF_CHECK_CODE       = 37 //37. check_code [...S]: layer5-7 code, if available
+	HF_CHECK_DURATION   = 38 //38. check_duration [...S]: time in ms took to finish last health check
+	HF_HRSP_1xx         = 39 //39. hrsp_1xx [.FBS]: http responses with 1xx code
+	HF_HRSP_2xx         = 40 //40. hrsp_2xx [.FBS]: http responses with 2xx code
+	HF_HRSP_3xx         = 41 //41. hrsp_3xx [.FBS]: http responses with 3xx code
+	HF_HRSP_4xx         = 42 //42. hrsp_4xx [.FBS]: http responses with 4xx code
+	HF_HRSP_5xx         = 43 //43. hrsp_5xx [.FBS]: http responses with 5xx code
+	HF_HRSP_OTHER       = 44 //44. hrsp_other [.FBS]: http responses with other codes (protocol error)
+	HF_HANAFAIL         = 45 //45. hanafail [...S]: failed health checks details
+	HF_REQ_RATE         = 46 //46. req_rate [.F..]: HTTP requests per second over last elapsed second
+	HF_REQ_RATE_MAX     = 47 //47. req_rate_max [.F..]: max number of HTTP requests per second observed
+	HF_REQ_TOT          = 48 //48. req_tot [.F..]: total number of HTTP requests received
+	HF_CLI_ABRT         = 49 //49. cli_abrt [..BS]: number of data transfers aborted by the client
+	HF_SRV_ABRT         = 50 //50. srv_abrt [..BS]: number of data transfers aborted by the server (inc. in eresp)
+	HF_COMP_IN          = 51 //51. comp_in [.FB.]: number of HTTP response bytes fed to the compressor
+	HF_COMP_OUT         = 52 //52. comp_out [.FB.]: number of HTTP response bytes emitted by the compressor
+	HF_COMP_BYP         = 53 //53. comp_byp [.FB.]: number of bytes that bypassed the HTTP compressor (CPU/BW limit)
+	HF_COMP_RSP         = 54 //54. comp_rsp [.FB.]: number of HTTP responses that were compressed
+	HF_LASTSESS         = 55 //55. lastsess [..BS]: number of seconds since last session assigned to server/backend
+	HF_LAST_CHK         = 56 //56. last_chk [...S]: last health check contents or textual error
+	HF_LAST_AGT         = 57 //57. last_agt [...S]: last agent check contents or textual error
+	HF_QTIME            = 58 //58. qtime [..BS]:
+	HF_CTIME            = 59 //59. ctime [..BS]:
+	HF_RTIME            = 60 //60. rtime [..BS]: (0 for TCP)
+	HF_TTIME            = 61 //61. ttime [..BS]: the average total session time in ms over the 1024 last requests
+)
+
+type haproxy struct {
+	Servers []string
+
+	client *http.Client
+}
+
+var sampleConfig = `
+# An array of addresses to gather stats about. Specify an ip or hostname
+# with optional port, e.g. localhost, 10.10.3.33:1936, etc.
+#
+# If no servers are specified, then default to 127.0.0.1:1936
+servers = ["http://myhaproxy.com:1936", "http://anotherhaproxy.com:1936"]
+# Or you can also use a local socket (not supported yet)
+# servers = ["socket:/run/haproxy/admin.sock"]
+`
+
+func (r *haproxy) SampleConfig() string {
+	return sampleConfig
+}
+
+func (r *haproxy) Description() string {
+	return "Read metrics of haproxy, via socket or csv stats page"
+}
+
+// Gather reads stats from all configured servers and accumulates them.
+// It returns one of the errors encountered while gathering stats (if any).
+func (g *haproxy) Gather(acc plugins.Accumulator) error {
+	if len(g.Servers) == 0 {
+		return g.gatherServer("http://127.0.0.1:1936", acc)
+	}
+
+	var wg sync.WaitGroup
+	var mu sync.Mutex
+	var outerr error
+
+	for _, serv := range g.Servers {
+		wg.Add(1)
+		go func(serv string) {
+			defer wg.Done()
+			// Servers are gathered concurrently, so guard the shared error
+			// variable with a mutex and only record actual failures.
+			if err := g.gatherServer(serv, acc); err != nil {
+				mu.Lock()
+				outerr = err
+				mu.Unlock()
+			}
+		}(serv)
+	}
+
+	wg.Wait()
+
+	return outerr
+}
+
+func (g *haproxy) gatherServer(addr string, acc plugins.Accumulator) error {
+	if g.client == nil {
+		g.client = &http.Client{}
+	}
+
+	u, err := url.Parse(addr)
+	if err != nil {
+		return fmt.Errorf("Unable to parse server address '%s': %s", addr, err)
+	}
+
+	req, err := http.NewRequest("GET", fmt.Sprintf("%s://%s%s/;csv", u.Scheme, u.Host, u.Path), nil)
+	if err != nil {
+		return fmt.Errorf("Unable to create request for '%s': %s", addr, err)
+	}
+	if u.User != nil {
+		p, _ := u.User.Password()
+		req.SetBasicAuth(u.User.Username(), p)
+	}
+
+	res, err := g.client.Do(req)
+	if err != nil {
+		return fmt.Errorf("Unable to connect to haproxy server '%s': %s", addr, err)
+	}
+	defer res.Body.Close()
+
+	if res.StatusCode != 200 {
+		return fmt.Errorf("Unable to get valid stat result from '%s', status: %d", addr, res.StatusCode)
+	}
+
+	_, err = importCsvResult(res.Body, acc, u.Host)
+	return err
+}
+
+func importCsvResult(r io.Reader, acc plugins.Accumulator, host string) ([][]string, error) {
+	reader := csv.NewReader(r)
+	result, err := reader.ReadAll()
+
+	for _, row := range result {
+		// Tags only depend on the row, so build them once per row.
+		tags := map[string]string{
+			"host":  host,
+			"proxy": row[HF_PXNAME],
+			"sv":    row[HF_SVNAME],
+		}
+		for field, v := range row {
+			switch field {
+			case HF_QCUR:
+				ival, err := strconv.ParseUint(v, 10, 64)
+				if err == nil {
+					acc.Add("qcur", ival, tags)
+				}
+			case HF_QMAX:
+				ival, err := strconv.ParseUint(v, 10, 64)
+				if err == nil {
+					acc.Add("qmax", ival, tags)
+				}
+			case HF_SCUR:
+				ival, err := strconv.ParseUint(v, 10, 64)
+				if err == nil {
+					acc.Add("scur", ival, tags)
+				}
+			case HF_SMAX:
+				ival, err := strconv.ParseUint(v, 10, 64)
+				if err == nil {
+					acc.Add("smax", ival, tags)
+				}
+			case HF_BIN:
+				ival, err := strconv.ParseUint(v, 10, 64)
+				if err == nil {
+					acc.Add("bin", ival, tags)
+				}
+			case HF_BOUT:
+				ival, err := strconv.ParseUint(v, 10, 64)
+				if err == nil {
+					acc.Add("bout", ival, tags)
+				}
+			case HF_DREQ:
+				ival, err := strconv.ParseUint(v, 10, 64)
+				if err == nil {
+					acc.Add("dreq", ival, tags)
+				}
+			case HF_DRESP:
+				ival, err := strconv.ParseUint(v, 10, 64)
+				if err == nil {
+					acc.Add("dresp", ival, tags)
+				}
+			case HF_RATE:
+				ival, err := strconv.ParseUint(v, 10, 64)
+				if err == nil {
+					acc.Add("rate", ival, tags)
+				}
+			case HF_RATE_MAX:
+				ival, err := strconv.ParseUint(v, 10, 64)
+				if err == nil {
+					acc.Add("rate_max", ival, tags)
+				}
+			case HF_STOT:
+				ival, err := strconv.ParseUint(v, 10, 64)
+				if err == nil {
+					acc.Add("stot", ival, tags)
+				}
+			case HF_HRSP_1xx:
+				ival, err := strconv.ParseUint(v, 10, 64)
+				if err == nil {
+					acc.Add("http_response.1xx", ival, tags)
+				}
+			case HF_HRSP_2xx:
+				ival, err := strconv.ParseUint(v, 10, 64)
+				if err == nil {
+					acc.Add("http_response.2xx", ival, tags)
+				}
+			case HF_HRSP_3xx:
+				ival, err := strconv.ParseUint(v, 10, 64)
+				if err == nil {
+					acc.Add("http_response.3xx", ival, tags)
+				}
+			case HF_HRSP_4xx:
+				ival, err := strconv.ParseUint(v, 10, 64)
+				if err == nil {
+					acc.Add("http_response.4xx", ival, tags)
+				}
+			case HF_HRSP_5xx:
+				ival, err := strconv.ParseUint(v, 10, 64)
+				if err == nil {
+					acc.Add("http_response.5xx", ival, tags)
+				}
+			case HF_EREQ:
+				ival, err := strconv.ParseUint(v, 10, 64)
+				if err == nil {
+					acc.Add("ereq", ival, tags)
+				}
+			case HF_ERESP:
+				ival, err := strconv.ParseUint(v, 10, 64)
+				if err == nil {
+					acc.Add("eresp", ival, tags)
+				}
+			case HF_ECON:
+				ival, err := strconv.ParseUint(v, 10, 64)
+				if err == nil {
+					acc.Add("econ", ival, tags)
+				}
+			case HF_WRETR:
+				ival, err := strconv.ParseUint(v, 10, 64)
+				if err == nil {
+					acc.Add("wretr", ival, tags)
+				}
+			case HF_WREDIS:
+				ival, err := strconv.ParseUint(v, 10, 64)
+				if err == nil {
+					acc.Add("wredis", ival, tags)
+				}
+			case HF_REQ_RATE:
+				ival, err := strconv.ParseUint(v, 10, 64)
+				if err == nil {
+					acc.Add("req_rate", ival, tags)
+				}
+			case HF_REQ_RATE_MAX:
+				ival, err := strconv.ParseUint(v, 10, 64)
+				if err == nil {
+					acc.Add("req_rate_max", ival, tags)
+				}
+			case HF_REQ_TOT:
+				ival, err := strconv.ParseUint(v, 10, 64)
+				if err == nil {
+					acc.Add("req_tot", ival, tags)
+				}
+			case HF_THROTTLE:
+				ival, err := strconv.ParseUint(v, 10, 64)
+				if err == nil {
+					acc.Add("throttle", ival, tags)
+				}
+			case HF_LBTOT:
+				ival, err := strconv.ParseUint(v, 10, 64)
+				if err == nil {
+					acc.Add("lbtot", ival, tags)
+				}
+			}
+		}
+	}
+	return result, err
+}
+
+func init() {
+	plugins.Add("haproxy", func() plugins.Plugin {
+		return &haproxy{}
+	})
+}
diff --git a/plugins/haproxy/haproxy_test.go b/plugins/haproxy/haproxy_test.go
new file mode 100644
index 000000000..b54f516c9
--- /dev/null
+++ b/plugins/haproxy/haproxy_test.go
@@ -0,0 +1,152 @@
+package haproxy
+
+import (
+	"fmt"
+	"net/http"
+	"net/http/httptest"
+	"strings"
+	"testing"
+
+	"github.com/influxdb/telegraf/testutil"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestHaproxyGeneratesMetricsWithAuthentication(t *testing.T) {
+	//We create a fake server to return test data
+	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		username, password, ok := r.BasicAuth()
+		if !ok {
+			w.WriteHeader(http.StatusUnauthorized)
+			fmt.Fprint(w, "Unauthorized")
+			return
+		}
+
+		if username == "user" && password == "password" {
+			fmt.Fprint(w, csvOutputSample)
+		} else {
+			w.WriteHeader(http.StatusUnauthorized)
+			fmt.Fprint(w, "Unauthorized")
+		}
+	}))
+	defer ts.Close()
+
+	//Now we test against the above server, passing our authentication data
+	r := &haproxy{
+		Servers: []string{strings.Replace(ts.URL, "http://", "http://user:password@", 1)},
+	}
+
+	var acc testutil.Accumulator
+
+	err := r.Gather(&acc)
+	require.NoError(t, err)
+
+	tags := map[string]string{
+		"host":  ts.Listener.Addr().String(),
+		"proxy": "be_app",
+		"sv":    "host0",
+	}
+
+	assert.NoError(t, acc.ValidateTaggedValue("stot", uint64(171014), tags))
+
+	checkInt := []struct {
+		name  string
+		value uint64
+	}{
+		{"bin", 5557055817},
+		{"scur", 288},
+		{"qmax", 81},
+		{"http_response.1xx", 0},
+		{"http_response.2xx", 1314093},
+		{"http_response.3xx", 537036},
+		{"http_response.4xx", 123452},
+		{"dreq", 1102},
+		{"dresp", 80},
+		{"wretr", 17},
+		{"wredis", 19},
+		{"ereq", 95740},
+		{"econ", 0},
+		{"eresp", 0},
+		{"req_rate", 35},
+		{"req_rate_max", 140},
+		{"req_tot", 1987928},
+		{"bout", 24096715169},
+		{"rate", 18},
+		{"rate_max", 102},
+		{"throttle", 13},
+		{"lbtot", 114},
+	}
+
+	for _, c := range checkInt 
{ + assert.Equal(t, true, acc.CheckValue(c.name, c.value)) + } + + //Here, we should get error because we don't pass authentication data + r = &haproxy{ + Servers: []string{ts.URL}, + } + + err = r.Gather(&acc) + require.Error(t, err) +} + +func TestHaproxyGeneratesMetricsWithoutAuthentication(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprint(w, csvOutputSample) + })) + defer ts.Close() + + r := &haproxy{ + Servers: []string{ts.URL}, + } + + var acc testutil.Accumulator + + err := r.Gather(&acc) + require.NoError(t, err) + + tags := map[string]string{ + "proxy": "be_app", + "host": ts.Listener.Addr().String(), + "sv": "host0", + } + + assert.NoError(t, acc.ValidateTaggedValue("stot", uint64(171014), tags)) + assert.NoError(t, acc.ValidateTaggedValue("scur", uint64(1), tags)) + assert.NoError(t, acc.ValidateTaggedValue("rate", uint64(3), tags)) + assert.Equal(t, true, acc.CheckValue("bin", uint64(5557055817))) +} + +//When not passing server config, we default to localhost +//We just want to make sure we did request stat from localhost +func TestHaproxyDefaultGetFromLocalhost(t *testing.T) { + r := &haproxy{} + + var acc testutil.Accumulator + + err := r.Gather(&acc) + require.Error(t, err) + assert.Contains(t, err.Error(), "127.0.0.1:1936/;csv") +} + +const csvOutputSample = ` +# pxname,svname,qcur,qmax,scur,smax,slim,stot,bin,bout,dreq,dresp,ereq,econ,eresp,wretr,wredis,status,weight,act,bck,chkfail,chkdown,lastchg,downtime,qlimit,pid,iid,sid,throttle,lbtot,tracked,type,rate,rate_lim,rate_max,check_status,check_code,check_duration,hrsp_1xx,hrsp_2xx,hrsp_3xx,hrsp_4xx,hrsp_5xx,hrsp_other,hanafail,req_rate,req_rate_max,req_tot,cli_abrt,srv_abrt,comp_in,comp_out,comp_byp,comp_rsp,lastsess,last_chk,last_agt,qtime,ctime,rtime,ttime, +fe_app,FRONTEND,,81,288,713,2000,1094063,5557055817,24096715169,1102,80,95740,,,17,19,OPEN,,,,,,,,,2,16,113,13,114,,0,18,0,102,,,,0,1314093,537036,123452,11966,1360,,35,140,1987928,,,0,0,0,0,,,,,,,, +be_static,host0,0,0,0,3,,3209,1141294,17389596,,0,,0,0,0,0,no check,1,1,0,,,,,,2,17,1,,3209,,2,0,,7,,,,0,218,1497,1494,0,0,0,,,,0,0,,,,,2,,,0,2,23,545, +be_static,BACKEND,0,0,0,3,200,3209,1141294,17389596,0,0,,0,0,0,0,UP,1,1,0,,0,70698,0,,2,17,0,,3209,,1,0,,7,,,,0,218,1497,1494,0,0,,,,,0,0,0,0,0,0,2,,,0,2,23,545, +be_static,host0,0,0,0,1,,28,17313,466003,,0,,0,0,0,0,UP,1,1,0,0,0,70698,0,,2,18,1,,28,,2,0,,1,L4OK,,1,0,17,6,5,0,0,0,,,,0,0,,,,,2103,,,0,1,1,36, +be_static,host4,0,0,0,1,,28,15358,1281073,,0,,0,0,0,0,UP,1,1,0,0,0,70698,0,,2,18,2,,28,,2,0,,1,L4OK,,1,0,20,5,3,0,0,0,,,,0,0,,,,,2076,,,0,1,1,54, +be_static,host5,0,0,0,1,,28,17547,1970404,,0,,0,0,0,0,UP,1,1,0,0,0,70698,0,,2,18,3,,28,,2,0,,1,L4OK,,0,0,20,5,3,0,0,0,,,,0,0,,,,,1495,,,0,1,1,53, +be_static,host6,0,0,0,1,,28,14105,1328679,,0,,0,0,0,0,UP,1,1,0,0,0,70698,0,,2,18,4,,28,,2,0,,1,L4OK,,0,0,18,8,2,0,0,0,,,,0,0,,,,,1418,,,0,0,1,49, +be_static,host7,0,0,0,1,,28,15258,1965185,,0,,0,0,0,0,UP,1,1,0,0,0,70698,0,,2,18,5,,28,,2,0,,1,L4OK,,0,0,17,8,3,0,0,0,,,,0,0,,,,,935,,,0,0,1,28, +be_static,host8,0,0,0,1,,28,12934,1034779,,0,,0,0,0,0,UP,1,1,0,0,0,70698,0,,2,18,6,,28,,2,0,,1,L4OK,,0,0,17,9,2,0,0,0,,,,0,0,,,,,582,,,0,1,1,66, +be_static,host9,0,0,0,1,,28,13434,134063,,0,,0,0,0,0,UP,1,1,0,0,0,70698,0,,2,18,7,,28,,2,0,,1,L4OK,,0,0,17,8,3,0,0,0,,,,0,0,,,,,539,,,0,0,1,80, +be_static,host1,0,0,0,1,,28,7873,1209688,,0,,0,0,0,0,UP,1,1,0,0,0,70698,0,,2,18,8,,28,,2,0,,1,L4OK,,0,0,22,6,0,0,0,0,,,,0,0,,,,,487,,,0,0,1,36, 
+be_static,host2,0,0,0,1,,28,13830,1085929,,0,,0,0,0,0,UP,1,1,0,0,0,70698,0,,2,18,9,,28,,2,0,,1,L4OK,,0,0,19,6,3,0,0,0,,,,0,0,,,,,338,,,0,1,1,38, +be_static,host3,0,0,0,1,,28,17959,1259760,,0,,0,0,0,0,UP,1,1,0,0,0,70698,0,,2,18,10,,28,,2,0,,1,L4OK,,1,0,20,6,2,0,0,0,,,,0,0,,,,,92,,,0,1,1,17, +be_static,BACKEND,0,0,0,2,200,307,160276,13322728,0,0,,0,0,0,0,UP,11,11,0,,0,70698,0,,2,18,0,,307,,1,0,,4,,,,0,205,73,29,0,0,,,,,0,0,0,0,0,0,92,,,0,1,3,381, +be_app,host0,0,0,1,32,,171014,510913516,2193856571,,0,,0,1,1,0,UP,100,1,0,1,0,70698,0,,2,19,1,,171013,,2,3,,12,L7OK,301,10,0,119534,48051,2345,1056,0,0,,,,73,1,,,,,0,Moved Permanently,,0,2,312,2341, +be_app,host4,0,0,2,29,,171013,499318742,2195595896,12,34,,0,2,0,0,UP,100,1,0,2,0,70698,0,,2,19,2,,171013,,2,3,,12,L7OK,301,12,0,119572,47882,2441,1088,0,0,,,,84,2,,,,,0,Moved Permanently,,0,2,316,2355, +`