telegraf/plugins/inputs/rethinkdb/rethinkdb.go

112 lines
2.8 KiB
Go
Raw Permalink Normal View History

2015-07-04 20:09:33 +00:00
package rethinkdb
import (
	"fmt"
	"net/url"
	"strings"
	"sync"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/plugins/inputs"

	"gopkg.in/gorethink/gorethink.v3"
)
// RethinkDB is the input plugin configuration: the list of server addresses
// that Gather collects stats from.
type RethinkDB struct {
	// Servers holds one address per server, either a full URI or a plain
	// "host:port" string. When it is empty, Gather queries the default
	// local address.
	Servers []string
}
// sampleConfig is the example configuration text returned by SampleConfig.
// It must contain only configuration lines and "#" comments, because it is
// emitted verbatim.
var sampleConfig = `
## An array of URI to gather stats about. Specify an ip or hostname
## with optional port add password. ie,
## rethinkdb://user:auth_key@10.10.3.30:28105,
## rethinkdb://10.10.3.33:18832,
## 10.0.0.1:10000, etc.
servers = ["127.0.0.1:28015"]
##
## If you use actual rethinkdb of > 2.3.0 with username/password authorization,
## protocol have to be named "rethinkdb2" - it will use 1_0 H.
# servers = ["rethinkdb2://username:password@127.0.0.1:28015"]
##
## If you use older versions of rethinkdb (<2.2) with auth_key, protocol
## have to be named "rethinkdb".
# servers = ["rethinkdb://username:auth_key@127.0.0.1:28015"]
`
2015-07-04 20:09:33 +00:00
// SampleConfig returns the example configuration text for this plugin.
func (r *RethinkDB) SampleConfig() string {
	return sampleConfig
}
// Description returns a one-line summary of what this plugin does.
func (r *RethinkDB) Description() string {
	return "Read metrics from one or many RethinkDB servers"
}
// localhost is the server Gather queries when no servers are configured.
var localhost = &Server{Url: &url.URL{Host: "127.0.0.1:28015"}}
// Reads stats from all configured servers accumulates stats.
// Returns one of the errors encountered while gather stats (if any).
func (r *RethinkDB) Gather(acc telegraf.Accumulator) error {
2015-07-04 20:09:33 +00:00
if len(r.Servers) == 0 {
r.gatherServer(localhost, acc)
return nil
}
var wg sync.WaitGroup
for _, serv := range r.Servers {
u, err := url.Parse(serv)
if err != nil {
2017-04-24 18:13:26 +00:00
acc.AddError(fmt.Errorf("Unable to parse to address '%s': %s", serv, err))
continue
2015-07-04 20:09:33 +00:00
} else if u.Scheme == "" {
// fallback to simple string based address (i.e. "10.0.0.1:10000")
u.Host = serv
}
wg.Add(1)
go func(serv string) {
defer wg.Done()
2017-04-24 18:13:26 +00:00
acc.AddError(r.gatherServer(&Server{Url: u}, acc))
2015-07-04 20:09:33 +00:00
}(serv)
}
wg.Wait()
2017-04-24 18:13:26 +00:00
return nil
2015-07-04 20:09:33 +00:00
}
func (r *RethinkDB) gatherServer(server *Server, acc telegraf.Accumulator) error {
2015-07-04 20:09:33 +00:00
var err error
connectOpts := gorethink.ConnectOpts{
Address: server.Url.Host,
DiscoverHosts: false,
}
if server.Url.User != nil {
pwd, set := server.Url.User.Password()
if set && pwd != "" {
connectOpts.AuthKey = pwd
connectOpts.HandshakeVersion = gorethink.HandshakeV0_4
2015-07-04 20:09:33 +00:00
}
}
if server.Url.Scheme == "rethinkdb2" && server.Url.User != nil {
pwd, set := server.Url.User.Password()
if set && pwd != "" {
connectOpts.Username = server.Url.User.Username()
connectOpts.Password = pwd
connectOpts.HandshakeVersion = gorethink.HandshakeV1_0
}
}
2015-07-04 20:09:33 +00:00
server.session, err = gorethink.Connect(connectOpts)
if err != nil {
return fmt.Errorf("Unable to connect to RethinkDB, %s\n", err.Error())
}
defer server.session.Close()
return server.gatherData(acc)
}
func init() {
inputs.Add("rethinkdb", func() telegraf.Input {
2015-07-04 20:09:33 +00:00
return &RethinkDB{}
})
}