From 1943d89147f055dcd7d3f9eef9e38820b38f79f4 Mon Sep 17 00:00:00 2001 From: JP Date: Sat, 4 Jul 2015 15:09:33 -0500 Subject: [PATCH] add RethinkDB plugin --- plugins/rethinkdb/rethinkdb.go | 92 ++++++++++ plugins/rethinkdb/rethinkdb_data.go | 110 ++++++++++++ plugins/rethinkdb/rethinkdb_data_test.go | 112 ++++++++++++ plugins/rethinkdb/rethinkdb_server.go | 193 +++++++++++++++++++++ plugins/rethinkdb/rethinkdb_server_test.go | 81 +++++++++ plugins/rethinkdb/rethinkdb_test.go | 59 +++++++ 6 files changed, 647 insertions(+) create mode 100644 plugins/rethinkdb/rethinkdb.go create mode 100644 plugins/rethinkdb/rethinkdb_data.go create mode 100644 plugins/rethinkdb/rethinkdb_data_test.go create mode 100644 plugins/rethinkdb/rethinkdb_server.go create mode 100644 plugins/rethinkdb/rethinkdb_server_test.go create mode 100644 plugins/rethinkdb/rethinkdb_test.go diff --git a/plugins/rethinkdb/rethinkdb.go b/plugins/rethinkdb/rethinkdb.go new file mode 100644 index 000000000..1c46a1f49 --- /dev/null +++ b/plugins/rethinkdb/rethinkdb.go @@ -0,0 +1,92 @@ +package rethinkdb + +import ( + "fmt" + "net/url" + "sync" + + "github.com/influxdb/telegraf/plugins" + + "gopkg.in/dancannon/gorethink.v1" +) + +type RethinkDB struct { + Servers []string +} + +var sampleConfig = ` +# An array of URI to gather stats about. Specify an ip or hostname +# with optional port add password. ie rethinkdb://user:auth_key@10.10.3.30:28105, +# rethinkdb://10.10.3.33:18832, 10.0.0.1:10000, etc. +# +# If no servers are specified, then 127.0.0.1 is used as the host and 28015 as the port. +servers = ["127.0.0.1:28015"]` + +func (r *RethinkDB) SampleConfig() string { + return sampleConfig +} + +func (r *RethinkDB) Description() string { + return "Read metrics from one or many RethinkDB servers" +} + +var localhost = &Server{Url: &url.URL{Host: "127.0.0.1:28015"}} + +// Reads stats from all configured servers accumulates stats. +// Returns one of the errors encountered while gather stats (if any). +func (r *RethinkDB) Gather(acc plugins.Accumulator) error { + if len(r.Servers) == 0 { + r.gatherServer(localhost, acc) + return nil + } + + var wg sync.WaitGroup + + var outerr error + + for _, serv := range r.Servers { + u, err := url.Parse(serv) + if err != nil { + return fmt.Errorf("Unable to parse to address '%s': %s", serv, err) + } else if u.Scheme == "" { + // fallback to simple string based address (i.e. "10.0.0.1:10000") + u.Host = serv + } + wg.Add(1) + go func(serv string) { + defer wg.Done() + outerr = r.gatherServer(&Server{Url: u}, acc) + }(serv) + } + + wg.Wait() + + return outerr +} + +func (r *RethinkDB) gatherServer(server *Server, acc plugins.Accumulator) error { + var err error + connectOpts := gorethink.ConnectOpts{ + Address: server.Url.Host, + DiscoverHosts: false, + } + if server.Url.User != nil { + pwd, set := server.Url.User.Password() + if set && pwd != "" { + connectOpts.AuthKey = pwd + } + } + server.session, err = gorethink.Connect(connectOpts) + if err != nil { + return fmt.Errorf("Unable to connect to RethinkDB, %s\n", err.Error()) + } + defer server.session.Close() + + return server.gatherData(acc) +} + +func init() { + plugins.Add("rethinkdb", func() plugins.Plugin { + return &RethinkDB{} + }) +} diff --git a/plugins/rethinkdb/rethinkdb_data.go b/plugins/rethinkdb/rethinkdb_data.go new file mode 100644 index 000000000..5fae28931 --- /dev/null +++ b/plugins/rethinkdb/rethinkdb_data.go @@ -0,0 +1,110 @@ +package rethinkdb + +import ( + "reflect" + "time" + + "github.com/influxdb/telegraf/plugins" +) + +type serverStatus struct { + Id string `gorethink:"id"` + Network struct { + Addresses []Address `gorethink:"canonical_addresses"` + Hostname string `gorethink:"hostname"` + DriverPort int `gorethink:"reql_port"` + } `gorethink:"network"` + Process struct { + Version string `gorethink:"version"` + RunningSince time.Time `gorethink:"time_started"` + } `gorethink:"process"` +} + +type Address struct { + Host string `gorethink:"host"` + Port int `gorethink:"port"` +} + +type stats struct { + Engine Engine `gorethink:"query_engine"` +} + +type Engine struct { + ClientConns int64 `gorethink:"client_connections,omitempty"` + ClientActive int64 `gorethink:"clients_active,omitempty"` + QueriesPerSec int64 `gorethink:"queries_per_sec,omitempty"` + TotalQueries int64 `gorethink:"queries_total,omitempty"` + ReadsPerSec int64 `gorethink:"read_docs_per_sec,omitempty"` + TotalReads int64 `gorethink:"read_docs_total,omitempty"` + WritesPerSec int64 `gorethink:"written_docs_per_sec,omitempty"` + TotalWrites int64 `gorethink:"written_docs_total,omitempty"` +} + +type tableStatus struct { + Id string `gorethink:"id"` + DB string `gorethink:"db"` + Name string `gorethink:"name"` +} + +type tableStats struct { + Engine Engine `gorethink:"query_engine"` + Storage Storage `gorethink:"storage_engine"` +} + +type Storage struct { + Cache Cache `gorethink:"cache"` + Disk Disk `gorethink:"disk"` +} + +type Cache struct { + BytesInUse int64 `gorethink:"in_use_bytes"` +} + +type Disk struct { + ReadBytesPerSec int64 `gorethink:"read_bytes_per_sec"` + ReadBytesTotal int64 `gorethink:"read_bytes_total"` + WriteBytesPerSec int64 `gorethik:"written_bytes_per_sec"` + WriteBytesTotal int64 `gorethink:"written_bytes_total"` + SpaceUsage SpaceUsage `gorethink:"space_usage"` +} + +type SpaceUsage struct { + Data int64 `gorethink:"data_bytes"` + Garbage int64 `gorethink:"garbage_bytes"` + Metadata int64 `gorethink:"metadata_bytes"` + Prealloc int64 `gorethink:"preallocated_bytes"` +} + +var engineStats = map[string]string{ + "active_clients": "ClientActive", + "clients": "ClientConns", + "queries_per_sec": "QueriesPerSec", + "total_queries": "TotalQueries", + "read_docs_per_sec": "ReadsPerSec", + "total_reads": "TotalReads", + "written_docs_per_sec": "WritesPerSec", + "total_writes": "TotalWrites", +} + +func (e *Engine) AddEngineStats(keys []string, acc plugins.Accumulator, tags map[string]string) { + engine := reflect.ValueOf(e).Elem() + for _, key := range keys { + acc.Add( + key, + engine.FieldByName(engineStats[key]).Interface(), + tags, + ) + } +} + +func (s *Storage) AddStats(acc plugins.Accumulator, tags map[string]string) { + acc.Add("cache_bytes_in_use", s.Cache.BytesInUse, tags) + acc.Add("disk_read_bytes_per_sec", s.Disk.ReadBytesPerSec, tags) + acc.Add("disk_read_bytes_total", s.Disk.ReadBytesTotal, tags) + acc.Add("disk_written_bytes_per_sec", s.Disk.WriteBytesPerSec, tags) + acc.Add("disk_written_bytes_total", s.Disk.WriteBytesTotal, tags) + acc.Add("disk_usage_data_bytes", s.Disk.SpaceUsage.Data, tags) + acc.Add("disk_usage_garbage_bytes", s.Disk.SpaceUsage.Garbage, tags) + acc.Add("disk_usage_metadata_bytes", s.Disk.SpaceUsage.Metadata, tags) + acc.Add("disk_usage_preallocated_bytes", s.Disk.SpaceUsage.Prealloc, tags) +} diff --git a/plugins/rethinkdb/rethinkdb_data_test.go b/plugins/rethinkdb/rethinkdb_data_test.go new file mode 100644 index 000000000..4c76b2340 --- /dev/null +++ b/plugins/rethinkdb/rethinkdb_data_test.go @@ -0,0 +1,112 @@ +package rethinkdb + +import ( + "testing" + + "github.com/influxdb/telegraf/testutil" + "github.com/stretchr/testify/assert" +) + +var tags = make(map[string]string) + +func TestAddEngineStats(t *testing.T) { + engine := &Engine{ + ClientConns: 0, + ClientActive: 0, + QueriesPerSec: 0, + TotalQueries: 0, + ReadsPerSec: 0, + TotalReads: 0, + WritesPerSec: 0, + TotalWrites: 0, + } + + var acc testutil.Accumulator + + keys := []string{ + "active_clients", + "clients", + "queries_per_sec", + "total_queries", + "read_docs_per_sec", + "total_reads", + "written_docs_per_sec", + "total_writes", + } + engine.AddEngineStats(keys, &acc, tags) + + for _, metric := range keys { + assert.True(t, acc.HasIntValue(metric)) + } +} + +func TestAddEngineStatsPartial(t *testing.T) { + engine := &Engine{ + ClientConns: 0, + ClientActive: 0, + QueriesPerSec: 0, + ReadsPerSec: 0, + WritesPerSec: 0, + } + + var acc testutil.Accumulator + + keys := []string{ + "active_clients", + "clients", + "queries_per_sec", + "read_docs_per_sec", + "written_docs_per_sec", + } + + missing_keys := []string{ + "total_queries", + "total_reads", + "total_writes", + } + engine.AddEngineStats(keys, &acc, tags) + + for _, metric := range missing_keys { + assert.False(t, acc.HasIntValue(metric)) + } +} + +func TestAddStorageStats(t *testing.T) { + storage := &Storage{ + Cache: Cache{ + BytesInUse: 0, + }, + Disk: Disk{ + ReadBytesPerSec: 0, + ReadBytesTotal: 0, + WriteBytesPerSec: 0, + WriteBytesTotal: 0, + SpaceUsage: SpaceUsage{ + Data: 0, + Garbage: 0, + Metadata: 0, + Prealloc: 0, + }, + }, + } + + var acc testutil.Accumulator + + keys := []string{ + "cache_bytes_in_use", + "disk_read_bytes_per_sec", + "disk_read_bytes_total", + "disk_written_bytes_per_sec", + "disk_written_bytes_total", + "disk_usage_data_bytes", + "disk_usage_garbage_bytes", + "disk_usage_metadata_bytes", + "disk_usage_preallocated_bytes", + } + + storage.AddStats(&acc, tags) + + for _, metric := range keys { + assert.True(t, acc.HasIntValue(metric)) + } +} diff --git a/plugins/rethinkdb/rethinkdb_server.go b/plugins/rethinkdb/rethinkdb_server.go new file mode 100644 index 000000000..43551fe25 --- /dev/null +++ b/plugins/rethinkdb/rethinkdb_server.go @@ -0,0 +1,193 @@ +package rethinkdb + +import ( + "errors" + "fmt" + "net" + "net/url" + "regexp" + "strconv" + "strings" + + "github.com/influxdb/telegraf/plugins" + + "gopkg.in/dancannon/gorethink.v1" +) + +type Server struct { + Url *url.URL + session *gorethink.Session + serverStatus serverStatus +} + +func (s *Server) gatherData(acc plugins.Accumulator) error { + if err := s.getServerStatus(); err != nil { + return fmt.Errorf("Failed to get server_status, %s\n", err) + } + + if err := s.validateVersion(); err != nil { + return fmt.Errorf("Failed version validation, %s\n", err.Error()) + } + + if err := s.addClusterStats(acc); err != nil { + fmt.Printf("error adding cluster stats, %s\n", err.Error()) + return fmt.Errorf("Error adding cluster stats, %s\n", err.Error()) + } + + if err := s.addMemberStats(acc); err != nil { + return fmt.Errorf("Error adding member stats, %s\n", err.Error()) + } + + if err := s.addTableStats(acc); err != nil { + return fmt.Errorf("Error adding table stats, %s\n", err.Error()) + } + + return nil +} + +func (s *Server) validateVersion() error { + if s.serverStatus.Process.Version == "" { + return errors.New("could not determine the RethinkDB server version: process.version key missing") + } + + versionRegexp := regexp.MustCompile("\\d.\\d.\\d") + versionString := versionRegexp.FindString(s.serverStatus.Process.Version) + if versionString == "" { + return fmt.Errorf("could not determine the RethinkDB server version: malformed version string (%v)", s.serverStatus.Process.Version) + } + + majorVersion, err := strconv.Atoi(strings.Split(versionString, "")[0]) + if err != nil || majorVersion < 2 { + return fmt.Errorf("unsupported major version %s\n", versionString) + } + return nil +} + +func (s *Server) getServerStatus() error { + cursor, err := gorethink.DB("rethinkdb").Table("server_status").Run(s.session) + if err != nil { + return err + } + + if cursor.IsNil() { + return errors.New("could not determine the RethinkDB server version: no rows returned from the server_status table") + } + defer cursor.Close() + var serverStatuses []serverStatus + err = cursor.All(&serverStatuses) + if err != nil { + return errors.New("could not parse server_status results") + } + host, port, err := net.SplitHostPort(s.Url.Host) + if err != nil { + return fmt.Errorf("unable to determine provided hostname from %s\n", s.Url.Host) + } + driverPort, _ := strconv.Atoi(port) + for _, ss := range serverStatuses { + for _, address := range ss.Network.Addresses { + if address.Host == host && ss.Network.DriverPort == driverPort { + s.serverStatus = ss + return nil + } + } + } + + return fmt.Errorf("unable to determine host id from server_status with %s", s.Url.Host) +} + +func (s *Server) getDefaultTags() map[string]string { + tags := make(map[string]string) + tags["host"] = s.Url.Host + tags["hostname"] = s.serverStatus.Network.Hostname + return tags +} + +var ClusterTracking = []string{ + "active_clients", + "clients", + "queries_per_sec", + "read_docs_per_sec", + "written_docs_per_sec", +} + +func (s *Server) addClusterStats(acc plugins.Accumulator) error { + cursor, err := gorethink.DB("rethinkdb").Table("stats").Get([]string{"cluster"}).Run(s.session) + if err != nil { + return fmt.Errorf("cluster stats query error, %s\n", err.Error()) + } + defer cursor.Close() + var clusterStats stats + if err := cursor.One(&clusterStats); err != nil { + return fmt.Errorf("failure to parse cluster stats, $s\n", err.Error()) + } + + tags := s.getDefaultTags() + tags["type"] = "cluster" + clusterStats.Engine.AddEngineStats(ClusterTracking, acc, tags) + return nil +} + +var MemberTracking = []string{ + "active_clients", + "clients", + "queries_per_sec", + "total_queries", + "read_docs_per_sec", + "total_reads", + "written_docs_per_sec", + "total_writes", +} + +func (s *Server) addMemberStats(acc plugins.Accumulator) error { + cursor, err := gorethink.DB("rethinkdb").Table("stats").Get([]string{"server", s.serverStatus.Id}).Run(s.session) + if err != nil { + return fmt.Errorf("member stats query error, %s\n", err.Error()) + } + defer cursor.Close() + var memberStats stats + if err := cursor.One(&memberStats); err != nil { + return fmt.Errorf("failure to parse member stats, $s\n", err.Error()) + } + + tags := s.getDefaultTags() + tags["type"] = "member" + memberStats.Engine.AddEngineStats(MemberTracking, acc, tags) + return nil +} + +var TableTracking = []string{ + "read_docs_per_sec", + "total_reads", + "written_docs_per_sec", + "total_writes", +} + +func (s *Server) addTableStats(acc plugins.Accumulator) error { + tablesCursor, err := gorethink.DB("rethinkdb").Table("table_status").Run(s.session) + defer tablesCursor.Close() + var tables []tableStatus + err = tablesCursor.All(&tables) + if err != nil { + return errors.New("could not parse table_status results") + } + for _, table := range tables { + cursor, err := gorethink.DB("rethinkdb").Table("stats"). + Get([]string{"table_server", table.Id, s.serverStatus.Id}). + Run(s.session) + if err != nil { + return fmt.Errorf("table stats query error, %s\n", err.Error()) + } + defer cursor.Close() + var ts tableStats + if err := cursor.One(&ts); err != nil { + return fmt.Errorf("failure to parse table stats, %s\n", err.Error()) + } + + tags := s.getDefaultTags() + tags["type"] = "data" + tags["ns"] = fmt.Sprintf("%s.%s", table.DB, table.Name) + ts.Engine.AddEngineStats(TableTracking, acc, tags) + ts.Storage.AddStats(acc, tags) + } + return nil +} diff --git a/plugins/rethinkdb/rethinkdb_server_test.go b/plugins/rethinkdb/rethinkdb_server_test.go new file mode 100644 index 000000000..21ab0dbbd --- /dev/null +++ b/plugins/rethinkdb/rethinkdb_server_test.go @@ -0,0 +1,81 @@ +// +build integration + +package rethinkdb + +import ( + "testing" + + "github.com/influxdb/telegraf/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestValidateVersion(t *testing.T) { + err := server.validateVersion() + require.NoError(t, err) +} + +func TestGetDefaultTags(t *testing.T) { + var tagTests = []struct { + in string + out string + }{ + {"host", server.Url.Host}, + {"hostname", server.serverStatus.Network.Hostname}, + } + defaultTags := server.getDefaultTags() + for _, tt := range tagTests { + if defaultTags[tt.in] != tt.out { + t.Errorf("expected %q, got %q", tt.out, defaultTags[tt.in]) + } + } +} + +func TestAddClusterStats(t *testing.T) { + var acc testutil.Accumulator + + err := server.addClusterStats(&acc) + require.NoError(t, err) + + for _, metric := range ClusterTracking { + assert.True(t, acc.HasIntValue(metric)) + } +} + +func TestAddMemberStats(t *testing.T) { + var acc testutil.Accumulator + + err := server.addMemberStats(&acc) + require.NoError(t, err) + + for _, metric := range MemberTracking { + assert.True(t, acc.HasIntValue(metric)) + } +} + +func TestAddTableStats(t *testing.T) { + var acc testutil.Accumulator + + err := server.addTableStats(&acc) + require.NoError(t, err) + + for _, metric := range TableTracking { + assert.True(t, acc.HasIntValue(metric)) + } + + keys := []string{ + "cache_bytes_in_use", + "disk_read_bytes_per_sec", + "disk_read_bytes_total", + "disk_written_bytes_per_sec", + "disk_written_bytes_total", + "disk_usage_data_bytes", + "disk_usage_garbage_bytes", + "disk_usage_metadata_bytes", + "disk_usage_preallocated_bytes", + } + + for _, metric := range keys { + assert.True(t, acc.HasIntValue(metric)) + } +} diff --git a/plugins/rethinkdb/rethinkdb_test.go b/plugins/rethinkdb/rethinkdb_test.go new file mode 100644 index 000000000..85c747f42 --- /dev/null +++ b/plugins/rethinkdb/rethinkdb_test.go @@ -0,0 +1,59 @@ +// +build integration + +package rethinkdb + +import ( + "log" + "math/rand" + "net/url" + "os" + "testing" + "time" + + "gopkg.in/dancannon/gorethink.v1" +) + +var connect_url, authKey string +var server *Server + +func init() { + connect_url = os.Getenv("RETHINKDB_URL") + if connect_url == "" { + connect_url = "127.0.0.1:28015" + } + authKey = os.Getenv("RETHINKDB_AUTHKEY") + +} + +func testSetup(m *testing.M) { + var err error + server = &Server{Url: &url.URL{Host: connect_url}} + server.session, _ = gorethink.Connect(gorethink.ConnectOpts{ + Address: server.Url.Host, + AuthKey: authKey, + DiscoverHosts: false, + }) + if err != nil { + log.Fatalln(err.Error()) + } + + err = server.getServerStatus() + if err != nil { + log.Fatalln(err.Error()) + } +} + +func testTeardown(m *testing.M) { + server.session.Close() +} + +func TestMain(m *testing.M) { + // seed randomness for use with tests + rand.Seed(time.Now().UTC().UnixNano()) + + testSetup(m) + res := m.Run() + testTeardown(m) + + os.Exit(res) +}