diff --git a/README.md b/README.md index 21a3445ea..c38890350 100644 --- a/README.md +++ b/README.md @@ -182,6 +182,7 @@ Currently implemented sources: * prometheus * puppetagent * rabbitmq +* raindrops * redis * rethinkdb * sql server (microsoft) diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 335d41a32..639afbe09 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -35,6 +35,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/prometheus" _ "github.com/influxdata/telegraf/plugins/inputs/puppetagent" _ "github.com/influxdata/telegraf/plugins/inputs/rabbitmq" + _ "github.com/influxdata/telegraf/plugins/inputs/raindrops" _ "github.com/influxdata/telegraf/plugins/inputs/redis" _ "github.com/influxdata/telegraf/plugins/inputs/rethinkdb" _ "github.com/influxdata/telegraf/plugins/inputs/sensors" diff --git a/plugins/inputs/raindrops/README.md b/plugins/inputs/raindrops/README.md new file mode 100644 index 000000000..8dc4e51a0 --- /dev/null +++ b/plugins/inputs/raindrops/README.md @@ -0,0 +1,15 @@ +# Raindrops Input Plugin + +The [raindrops](http://raindrops.bogomips.org/) plugin reads from +specified raindops middleware URI and adds stats to InfluxDB. +### Configuration: + +```toml +# Read raindrops stats +[[inputs.raindrops]] + urls = ["http://localhost/_raindrops"] +``` + +### Tags: + +- Multiple listeners are tagged with IP:Port/Socket, ie `0.0.0.0:8080` or `/tmp/unicorn` diff --git a/plugins/inputs/raindrops/raindrops.go b/plugins/inputs/raindrops/raindrops.go new file mode 100644 index 000000000..00c711cb2 --- /dev/null +++ b/plugins/inputs/raindrops/raindrops.go @@ -0,0 +1,185 @@ +package raindrops + +import ( + "bufio" + "fmt" + "net" + "net/http" + "net/url" + "strconv" + "strings" + "sync" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" +) + +type Raindrops struct { + Urls []string + http_client *http.Client +} + +var sampleConfig = ` + ### An array of raindrops middleware URI to gather stats. + urls = ["http://localhost/_raindrops"] +` + +func (r *Raindrops) SampleConfig() string { + return sampleConfig +} + +func (r *Raindrops) Description() string { + return "Read raindrops stats (raindrops - real-time stats for preforking Rack servers)" +} + +func (r *Raindrops) Gather(acc telegraf.Accumulator) error { + var wg sync.WaitGroup + var outerr error + + for _, u := range r.Urls { + addr, err := url.Parse(u) + if err != nil { + return fmt.Errorf("Unable to parse address '%s': %s", u, err) + } + + wg.Add(1) + go func(addr *url.URL) { + defer wg.Done() + outerr = r.gatherUrl(addr, acc) + }(addr) + } + + wg.Wait() + + return outerr +} + +func (r *Raindrops) gatherUrl(addr *url.URL, acc telegraf.Accumulator) error { + resp, err := r.http_client.Get(addr.String()) + if err != nil { + return fmt.Errorf("error making HTTP request to %s: %s", addr.String(), err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("%s returned HTTP status %s", addr.String(), resp.Status) + } + buf := bufio.NewReader(resp.Body) + + // Calling + _, err = buf.ReadString(':') + if err != nil { + return err + } + line, err := buf.ReadString('\n') + if err != nil { + return err + } + calling, err := strconv.ParseUint(strings.TrimSpace(line), 10, 64) + if err != nil { + return err + } + + // Writing + _, err = buf.ReadString(':') + if err != nil { + return err + } + line, err = buf.ReadString('\n') + if err != nil { + return err + } + writing, err := strconv.ParseUint(strings.TrimSpace(line), 10, 64) + if err != nil { + return err + } + tags := r.getTags(addr) + fields := map[string]interface{}{ + "calling": calling, + "writing": writing, + } + acc.AddFields("raindrops", fields, tags) + + iterate := true + var queued_line_str string + var active_line_str string + var active_err error + var queued_err error + + for iterate { + // Listen + var tags map[string]string + + lis := map[string]interface{}{ + "active": 0, + "queued": 0, + } + active_line_str, active_err = buf.ReadString('\n') + if active_err != nil { + iterate = false + break + } + if strings.Compare(active_line_str, "\n") == 0{ + break + } + queued_line_str, queued_err = buf.ReadString('\n') + if queued_err != nil { + iterate = false + } + active_line := strings.Split(active_line_str, " ") + listen_name := active_line[0] + + active, err := strconv.ParseUint(strings.TrimSpace(active_line[2]), 10, 64) + if err != nil { + active = 0 + } + lis["active"] = active + + queued_line := strings.Split(queued_line_str, " ") + queued, err := strconv.ParseUint(strings.TrimSpace(queued_line[2]), 10, 64) + if err != nil { + queued = 0 + } + lis["queued"] = queued + if strings.Contains(listen_name, ":") { + listener := strings.Split(listen_name, ":") + tags = map[string]string{ + "ip": listener[0], + "port": listener[1], + } + + } else { + tags = map[string]string{ + "socket": listen_name, + } + } + fmt.Println("raindropssock", lis, tags) + acc.AddFields("raindropssock", lis, tags) + } + return nil +} + +// Get tag(s) for the raindrops calling/writing plugin +func (r *Raindrops) getTags(addr *url.URL) map[string]string { + h := addr.Host + host, port, err := net.SplitHostPort(h) + if err != nil { + host = addr.Host + if addr.Scheme == "http" { + port = "80" + } else if addr.Scheme == "https" { + port = "443" + } else { + port = "" + } + } + return map[string]string{"server": host, "port": port} +} + +func init() { + inputs.Add("raindrops", func() telegraf.Input { + return &Raindrops{http_client: &http.Client{Transport: &http.Transport{ + ResponseHeaderTimeout: time.Duration(3 * time.Second), + }}} + }) +} diff --git a/plugins/inputs/raindrops/raindrops_test.go b/plugins/inputs/raindrops/raindrops_test.go new file mode 100644 index 000000000..d4767b88a --- /dev/null +++ b/plugins/inputs/raindrops/raindrops_test.go @@ -0,0 +1,108 @@ +package raindrops + +import ( + "fmt" + "net" + "net/http" + "net/http/httptest" + "net/url" + "testing" + + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "time" +) + +const sampleResponse = ` +calling: 100 +writing: 200 +0.0.0.0:8080 active: 1 +0.0.0.0:8080 queued: 2 +0.0.0.0:8081 active: 3 +0.0.0.0:8081 queued: 4 +127.0.0.1:8082 active: 5 +127.0.0.1:8082 queued: 6 +0.0.0.0:8083 active: 7 +0.0.0.0:8083 queued: 8 +0.0.0.0:8084 active: 9 +0.0.0.0:8084 queued: 10 +0.0.0.0:3000 active: 11 +0.0.0.0:3000 queued: 12 +/tmp/listen.me active: 13 +/tmp/listen.me queued: 14 +` + +// Verify that raindrops tags are properly parsed based on the server +func TestRaindropsTags(t *testing.T) { + urls := []string{"http://localhost/_raindrops", "http://localhost:80/_raindrops"} + var addr *url.URL + r := &Raindrops{} + for _, url1 := range urls { + addr, _ = url.Parse(url1) + tagMap := r.getTags(addr) + assert.Contains(t, tagMap["server"], "localhost") + } +} + +func TestRaindropsGeneratesMetrics(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var rsp string + + if r.URL.Path == "/_raindrops" { + rsp = sampleResponse + } else { + panic("Cannot handle request") + } + + fmt.Fprintln(w, rsp) + })) + defer ts.Close() + + n := &Raindrops{ + Urls: []string{fmt.Sprintf("%s/_raindrops", ts.URL)}, + http_client: &http.Client{Transport: &http.Transport{ + ResponseHeaderTimeout: time.Duration(3 * time.Second), + }}, + } + + var acc testutil.Accumulator + + err := n.Gather(&acc) + require.NoError(t, err) + + fields := map[string]interface{}{ + "calling": uint64(100), + "writing": uint64(200), + } + addr, err := url.Parse(ts.URL) + if err != nil { + panic(err) + } + + host, port, err := net.SplitHostPort(addr.Host) + if err != nil { + host = addr.Host + if addr.Scheme == "http" { + port = "80" + } else if addr.Scheme == "https" { + port = "443" + } else { + port = "" + } + } + + tags := map[string]string{"server": host, "port": port} + acc.AssertContainsTaggedFields(t, "raindrops", fields, tags) + + tags = map[string]string{ + "port": "8081", + "ip": "0.0.0.0", + } + fields = map[string]interface {} { + "active": uint64(3), + "queued": uint64(4), + } + fmt.Println("raindropssock_test", fields, tags) + acc.AssertContainsTaggedFields(t, "raindropssock", fields, tags) +}