From 51c1659de8b3b6317d3e4134c633df405113aff5 Mon Sep 17 00:00:00 2001 From: Greg <2653109+glinton@users.noreply.github.com> Date: Tue, 30 Jul 2019 17:31:03 -0600 Subject: [PATCH] Add uWSGI input plugin (#6179) --- plugins/inputs/all/all.go | 1 + plugins/inputs/uwsgi/README.md | 92 +++++++++ plugins/inputs/uwsgi/uwsgi.go | 295 +++++++++++++++++++++++++++++ plugins/inputs/uwsgi/uwsgi_test.go | 185 ++++++++++++++++++ 4 files changed, 573 insertions(+) create mode 100644 plugins/inputs/uwsgi/README.md create mode 100644 plugins/inputs/uwsgi/uwsgi.go create mode 100644 plugins/inputs/uwsgi/uwsgi_test.go diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 0352e552a..8d2144df3 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -149,6 +149,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/twemproxy" _ "github.com/influxdata/telegraf/plugins/inputs/udp_listener" _ "github.com/influxdata/telegraf/plugins/inputs/unbound" + _ "github.com/influxdata/telegraf/plugins/inputs/uwsgi" _ "github.com/influxdata/telegraf/plugins/inputs/varnish" _ "github.com/influxdata/telegraf/plugins/inputs/vsphere" _ "github.com/influxdata/telegraf/plugins/inputs/webhooks" diff --git a/plugins/inputs/uwsgi/README.md b/plugins/inputs/uwsgi/README.md new file mode 100644 index 000000000..8053676c0 --- /dev/null +++ b/plugins/inputs/uwsgi/README.md @@ -0,0 +1,92 @@ +# uWSGI + +The uWSGI input plugin gathers metrics about uWSGI using its [Stats Server](https://uwsgi-docs.readthedocs.io/en/latest/StatsServer.html). + +### Configuration + +```toml +[[inputs.uwsgi]] + ## List with urls of uWSGI Stats servers. Url must match pattern: + ## scheme://address[:port] + ## + ## For example: + ## servers = ["tcp://localhost:5050", "http://localhost:1717", "unix:///tmp/statsock"] + servers = ["tcp://127.0.0.1:1717"] + + ## General connection timout + # timeout = "5s" +``` + + +### Metrics: + + - uwsgi_overview + - tags: + - source + - uid + - gid + - version + - fields: + - listen_queue + - listen_queue_errors + - signal_queue + - load + - pid + ++ uwsgi_workers + - tags: + - worker_id + - source + - fields: + - requests + - accepting + - delta_request + - exceptions + - harakiri_count + - pid + - signals + - signal_queue + - status + - rss + - vsz + - running_time + - last_spawn + - respawn_count + - tx + - avg_rt + +- uwsgi_apps + - tags: + - app_id + - worker_id + - source + - fields: + - modifier1 + - requests + - startup_time + - exceptions + ++ uwsgi_cores + - tags: + - core_id + - worker_id + - source + - fields: + - requests + - static_requests + - routed_requests + - offloaded_requests + - write_errors + - read_errors + - in_request + + +### Example Output: + +``` +uwsgi_overview,gid=0,uid=0,source=172.17.0.2,version=2.0.18 listen_queue=0i,listen_queue_errors=0i,load=0i,pid=1i,signal_queue=0i 1564441407000000000 +uwsgi_workers,source=172.17.0.2,worker_id=1 accepting=1i,avg_rt=0i,delta_request=0i,exceptions=0i,harakiri_count=0i,last_spawn=1564441202i,pid=6i,requests=0i,respawn_count=1i,rss=0i,running_time=0i,signal_queue=0i,signals=0i,status="idle",tx=0i,vsz=0i 1564441407000000000 +uwsgi_apps,app_id=0,worker_id=1,source=172.17.0.2 exceptions=0i,modifier1=0i,requests=0i,startup_time=0i 1564441407000000000 +uwsgi_cores,core_id=0,worker_id=1,source=172.17.0.2 in_request=0i,offloaded_requests=0i,read_errors=0i,requests=0i,routed_requests=0i,static_requests=0i,write_errors=0i 1564441407000000000 +``` + diff --git a/plugins/inputs/uwsgi/uwsgi.go b/plugins/inputs/uwsgi/uwsgi.go new file mode 100644 index 000000000..15a9bbe22 --- /dev/null +++ b/plugins/inputs/uwsgi/uwsgi.go @@ -0,0 +1,295 @@ +// Package uwsgi implements a telegraf plugin for collecting uwsgi stats from +// the uwsgi stats server. +package uwsgi + +import ( + "encoding/json" + "fmt" + "io" + "net" + "net/http" + "net/url" + "os" + "strconv" + "sync" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/inputs" +) + +// Uwsgi server struct +type Uwsgi struct { + Servers []string `toml:"servers"` + Timeout internal.Duration `toml:"timeout"` + + client *http.Client +} + +// Description returns the plugin description +func (u *Uwsgi) Description() string { + return "Read uWSGI metrics." +} + +// SampleConfig returns the sample configuration +func (u *Uwsgi) SampleConfig() string { + return ` + ## List with urls of uWSGI Stats servers. URL must match pattern: + ## scheme://address[:port] + ## + ## For example: + ## servers = ["tcp://localhost:5050", "http://localhost:1717", "unix:///tmp/statsock"] + servers = ["tcp://127.0.0.1:1717"] + + ## General connection timout + # timeout = "5s" +` +} + +// Gather collect data from uWSGI Server +func (u *Uwsgi) Gather(acc telegraf.Accumulator) error { + if u.client == nil { + u.client = &http.Client{ + Timeout: u.Timeout.Duration, + } + } + wg := &sync.WaitGroup{} + + for _, s := range u.Servers { + wg.Add(1) + go func(s string) { + defer wg.Done() + n, err := url.Parse(s) + if err != nil { + acc.AddError(fmt.Errorf("could not parse uWSGI Stats Server url '%s': %s", s, err.Error())) + return + } + + if err := u.gatherServer(acc, n); err != nil { + acc.AddError(err) + return + } + }(s) + } + + wg.Wait() + + return nil +} + +func (u *Uwsgi) gatherServer(acc telegraf.Accumulator, url *url.URL) error { + var err error + var r io.ReadCloser + var s StatsServer + + switch url.Scheme { + case "tcp": + r, err = net.DialTimeout(url.Scheme, url.Host, u.Timeout.Duration) + if err != nil { + return err + } + s.source = url.Host + case "unix": + r, err = net.DialTimeout(url.Scheme, url.Host, u.Timeout.Duration) + if err != nil { + return err + } + s.source, err = os.Hostname() + if err != nil { + s.source = url.Host + } + case "http": + resp, err := u.client.Get(url.String()) + if err != nil { + return err + } + r = resp.Body + s.source = url.Host + default: + return fmt.Errorf("'%s' is not a supported scheme", url.Scheme) + } + + defer r.Close() + + if err := json.NewDecoder(r).Decode(&s); err != nil { + return fmt.Errorf("failed to decode json payload from '%s': %s", url.String(), err.Error()) + } + + u.gatherStatServer(acc, &s) + + return err +} + +func (u *Uwsgi) gatherStatServer(acc telegraf.Accumulator, s *StatsServer) { + fields := map[string]interface{}{ + "listen_queue": s.ListenQueue, + "listen_queue_errors": s.ListenQueueErrors, + "signal_queue": s.SignalQueue, + "load": s.Load, + "pid": s.PID, + } + + tags := map[string]string{ + "source": s.source, + "uid": strconv.Itoa(s.UID), + "gid": strconv.Itoa(s.GID), + "version": s.Version, + } + acc.AddFields("uwsgi_overview", fields, tags) + + u.gatherWorkers(acc, s) + u.gatherApps(acc, s) + u.gatherCores(acc, s) +} + +func (u *Uwsgi) gatherWorkers(acc telegraf.Accumulator, s *StatsServer) { + for _, w := range s.Workers { + fields := map[string]interface{}{ + "requests": w.Requests, + "accepting": w.Accepting, + "delta_request": w.DeltaRequests, + "exceptions": w.Exceptions, + "harakiri_count": w.HarakiriCount, + "pid": w.PID, + "signals": w.Signals, + "signal_queue": w.SignalQueue, + "status": w.Status, + "rss": w.Rss, + "vsz": w.Vsz, + "running_time": w.RunningTime, + "last_spawn": w.LastSpawn, + "respawn_count": w.RespawnCount, + "tx": w.Tx, + "avg_rt": w.AvgRt, + } + tags := map[string]string{ + "worker_id": strconv.Itoa(w.WorkerID), + "source": s.source, + } + + acc.AddFields("uwsgi_workers", fields, tags) + } +} + +func (u *Uwsgi) gatherApps(acc telegraf.Accumulator, s *StatsServer) { + for _, w := range s.Workers { + for _, a := range w.Apps { + fields := map[string]interface{}{ + "modifier1": a.Modifier1, + "requests": a.Requests, + "startup_time": a.StartupTime, + "exceptions": a.Exceptions, + } + tags := map[string]string{ + "app_id": strconv.Itoa(a.AppID), + "worker_id": strconv.Itoa(w.WorkerID), + "source": s.source, + } + acc.AddFields("uwsgi_apps", fields, tags) + } + } +} + +func (u *Uwsgi) gatherCores(acc telegraf.Accumulator, s *StatsServer) { + for _, w := range s.Workers { + for _, c := range w.Cores { + fields := map[string]interface{}{ + "requests": c.Requests, + "static_requests": c.StaticRequests, + "routed_requests": c.RoutedRequests, + "offloaded_requests": c.OffloadedRequests, + "write_errors": c.WriteErrors, + "read_errors": c.ReadErrors, + "in_request": c.InRequest, + } + tags := map[string]string{ + "core_id": strconv.Itoa(c.CoreID), + "worker_id": strconv.Itoa(w.WorkerID), + "source": s.source, + } + acc.AddFields("uwsgi_cores", fields, tags) + } + + } +} + +func init() { + inputs.Add("uwsgi", func() telegraf.Input { + return &Uwsgi{ + Timeout: internal.Duration{Duration: 5 * time.Second}, + } + }) +} + +// StatsServer defines the stats server structure. +type StatsServer struct { + // Tags + source string + PID int `json:"pid"` + UID int `json:"uid"` + GID int `json:"gid"` + Version string `json:"version"` + + // Fields + ListenQueue int `json:"listen_queue"` + ListenQueueErrors int `json:"listen_queue_errors"` + SignalQueue int `json:"signal_queue"` + Load int `json:"load"` + + Workers []*Worker `json:"workers"` +} + +// Worker defines the worker metric structure. +type Worker struct { + // Tags + WorkerID int `json:"id"` + PID int `json:"pid"` + + // Fields + Accepting int `json:"accepting"` + Requests int `json:"requests"` + DeltaRequests int `json:"delta_requests"` + Exceptions int `json:"exceptions"` + HarakiriCount int `json:"harakiri_count"` + Signals int `json:"signals"` + SignalQueue int `json:"signal_queue"` + Status string `json:"status"` + Rss int `json:"rss"` + Vsz int `json:"vsz"` + RunningTime int `json:"running_time"` + LastSpawn int `json:"last_spawn"` + RespawnCount int `json:"respawn_count"` + Tx int `json:"tx"` + AvgRt int `json:"avg_rt"` + + Apps []*App `json:"apps"` + Cores []*Core `json:"cores"` +} + +// App defines the app metric structure. +type App struct { + // Tags + AppID int `json:"id"` + + // Fields + Modifier1 int `json:"modifier1"` + Requests int `json:"requests"` + StartupTime int `json:"startup_time"` + Exceptions int `json:"exceptions"` +} + +// Core defines the core metric structure. +type Core struct { + // Tags + CoreID int `json:"id"` + + // Fields + Requests int `json:"requests"` + StaticRequests int `json:"static_requests"` + RoutedRequests int `json:"routed_requests"` + OffloadedRequests int `json:"offloaded_requests"` + WriteErrors int `json:"write_errors"` + ReadErrors int `json:"read_errors"` + InRequest int `json:"in_request"` +} diff --git a/plugins/inputs/uwsgi/uwsgi_test.go b/plugins/inputs/uwsgi/uwsgi_test.go new file mode 100644 index 000000000..34581791e --- /dev/null +++ b/plugins/inputs/uwsgi/uwsgi_test.go @@ -0,0 +1,185 @@ +package uwsgi_test + +import ( + "net/http" + "net/http/httptest" + "testing" + + "github.com/influxdata/telegraf/plugins/inputs/uwsgi" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +func TestBasic(t *testing.T) { + js := ` +{ + "version":"2.0.12", + "listen_queue":0, + "listen_queue_errors":0, + "signal_queue":0, + "load":0, + "pid":28372, + "uid":1000, + "gid":1000, + "cwd":"/opt/uwsgi", + "locks":[ + { + "user 0":0 + }, + { + "signal":0 + }, + { + "filemon":0 + }, + { + "timer":0 + }, + { + "rbtimer":0 + }, + { + "cron":0 + }, + { + "rpc":0 + }, + { + "snmp":0 + } + ], + "sockets":[ + { + "name":"127.0.0.1:47430", + "proto":"uwsgi", + "queue":0, + "max_queue":100, + "shared":0, + "can_offload":0 + } + ], + "workers":[ + { + "id":1, + "pid":28375, + "accepting":1, + "requests":0, + "delta_requests":0, + "exceptions":0, + "harakiri_count":0, + "signals":0, + "signal_queue":0, + "status":"idle", + "rss":0, + "vsz":0, + "running_time":0, + "last_spawn":1459942782, + "respawn_count":1, + "tx":0, + "avg_rt":0, + "apps":[ + { + "id":0, + "modifier1":0, + "mountpoint":"", + "startup_time":0, + "requests":0, + "exceptions":0, + "chdir":"" + } + ], + "cores":[ + { + "id":0, + "requests":0, + "static_requests":0, + "routed_requests":0, + "offloaded_requests":0, + "write_errors":0, + "read_errors":0, + "in_request":0, + "vars":[ + + ] + } + ] + } + ] +} +` + + fakeServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/" { + _, _ = w.Write([]byte(js)) + } else { + w.WriteHeader(http.StatusNotFound) + } + })) + + defer fakeServer.Close() + + plugin := &uwsgi.Uwsgi{ + Servers: []string{fakeServer.URL + "/"}, + } + var acc testutil.Accumulator + plugin.Gather(&acc) + require.Equal(t, 0, len(acc.Errors)) +} + +func TestInvalidJSON(t *testing.T) { + js := ` +{ + "version":"2.0.12", + "listen_queue":0, + "listen_queue_errors":0, + "signal_queue":0, + "load":0, + "pid:28372 + "uid":10 +} +` + + fakeServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/" { + _, _ = w.Write([]byte(js)) + } else { + w.WriteHeader(http.StatusNotFound) + } + })) + + defer fakeServer.Close() + + plugin := &uwsgi.Uwsgi{ + Servers: []string{fakeServer.URL + "/"}, + } + var acc testutil.Accumulator + plugin.Gather(&acc) + require.Equal(t, 1, len(acc.Errors)) +} + +func TestHttpError(t *testing.T) { + plugin := &uwsgi.Uwsgi{ + Servers: []string{"http://novalidurladress/"}, + } + var acc testutil.Accumulator + plugin.Gather(&acc) + require.Equal(t, 1, len(acc.Errors)) +} + +func TestTcpError(t *testing.T) { + plugin := &uwsgi.Uwsgi{ + Servers: []string{"tcp://novalidtcpadress/"}, + } + var acc testutil.Accumulator + plugin.Gather(&acc) + require.Equal(t, 1, len(acc.Errors)) +} + +func TestUnixSocketError(t *testing.T) { + plugin := &uwsgi.Uwsgi{ + Servers: []string{"unix:///novalidunixsocket"}, + } + var acc testutil.Accumulator + plugin.Gather(&acc) + require.Equal(t, 1, len(acc.Errors)) +}