diff --git a/.gitignore b/.gitignore
index a127b89f7..1ba60d55c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,5 @@
 pkg/
 tivan
 .vagrant
+telegraf
+telegraf.toml
diff --git a/etc/config.sample.toml b/etc/config.sample.toml
index e9628e3fa..4958f62cd 100644
--- a/etc/config.sample.toml
+++ b/etc/config.sample.toml
@@ -99,11 +99,11 @@
 servers = ["localhost"]
 
 #   postgres://[pqgotest[:password]]@localhost?sslmode=[disable|verify-ca|verify-full]
 # or a simple string:
 #   host=localhost user=pqotest password=... sslmode=...
-# 
+#
 # All connection parameters are optional. By default, the host is localhost
 # and the user is the currently running user. For localhost, we default
 # to sslmode=disable as well.
-# 
+#
 address = "sslmode=disable"
@@ -124,6 +124,16 @@
 # If no servers are specified, then localhost is used as the host.
 servers = ["localhost"]
 
+# Read metrics from one or many rethinkdb servers
+[rethinkdb]
+
+# An array of URI to gather stats about. Specify an ip or hostname
+# with optional port and password, e.g. rethinkdb://user:auth_key@localhost,
+# rethinkdb://10.10.3.33:18832, 10.0.0.1:10000, etc.
+#
+# If no servers are specified, then 127.0.0.1 is used as the host and 28015 as the port.
+servers = ["127.0.0.1:28015"]
+
 # Read metrics about swap memory usage
 [swap]
 # no configuration
diff --git a/plugins/all/all.go b/plugins/all/all.go
index 71381aa87..5f99c3dd3 100644
--- a/plugins/all/all.go
+++ b/plugins/all/all.go
@@ -1,9 +1,11 @@
 package all
 
 import (
+	_ "github.com/influxdb/telegraf/plugins/kafka_consumer"
 	_ "github.com/influxdb/telegraf/plugins/memcached"
 	_ "github.com/influxdb/telegraf/plugins/mysql"
 	_ "github.com/influxdb/telegraf/plugins/postgresql"
 	_ "github.com/influxdb/telegraf/plugins/redis"
+	_ "github.com/influxdb/telegraf/plugins/rethinkdb"
 	_ "github.com/influxdb/telegraf/plugins/system"
 )
diff --git a/plugins/kafka_consumer/kafka_consumer.go b/plugins/kafka_consumer/kafka_consumer.go
new file mode 100644
index 000000000..5e1b7fc5b
--- /dev/null
+++ b/plugins/kafka_consumer/kafka_consumer.go
@@ -0,0 +1,153 @@
+package kafka_consumer
+
+import (
+	"os"
+	"os/signal"
+	"time"
+
+	"github.com/influxdb/influxdb/tsdb"
+	"github.com/influxdb/telegraf/plugins"
+	"github.com/wvanbergen/kafka/consumergroup"
+	"gopkg.in/Shopify/sarama.v1"
+)
+
+type Kafka struct {
+	ConsumerGroupName string
+	Topic             string
+	ZookeeperPeers    []string
+	Consumer          *consumergroup.ConsumerGroup
+	BatchSize         int
+}
+
+var sampleConfig = `
+# topic to consume
+topic = "topic_with_metrics"
+
+# the name of the consumer group
+consumerGroupName = "telegraf_metrics_consumers"
+
+# an array of Zookeeper connection strings
+zookeeperPeers = ["localhost:2181"]
+
+# Batch size of points sent to InfluxDB
+batchSize = 1000`
+
+func (k *Kafka) SampleConfig() string {
+	return sampleConfig
+}
+
+func (k *Kafka) Description() string {
+	return "read metrics from a Kafka topic"
+}
+
+type Metric struct {
+	Measurement string                 `json:"measurement"`
+	Values      map[string]interface{} `json:"values"`
+	Tags        map[string]string      `json:"tags"`
+	Time        time.Time              `json:"time"`
+}
+
+func (k *Kafka) Gather(acc plugins.Accumulator) error {
+	var consumerErr error
+	metricQueue := make(chan []byte, 200)
+
+	if k.Consumer == nil {
+		k.Consumer, consumerErr = consumergroup.JoinConsumerGroup(
+			k.ConsumerGroupName,
+			[]string{k.Topic},
+			k.ZookeeperPeers,
+			nil,
+		)
+
+		if consumerErr != nil {
+			return consumerErr
+		}
+
+		c := make(chan os.Signal, 1)
+		halt := make(chan bool, 1)
+		signal.Notify(c, os.Interrupt)
+		go func() {
+			<-c
+			halt <- true
+			emitMetrics(k, acc, metricQueue)
+			k.Consumer.Close()
+		}()
+
+		go readFromKafka(k.Consumer.Messages(), metricQueue, k.BatchSize, k.Consumer.CommitUpto, halt)
+	}
+
+	return emitMetrics(k, acc, metricQueue)
+}
+
+func emitMetrics(k *Kafka, acc plugins.Accumulator, metricConsumer <-chan []byte) error {
+	timeout := time.After(1 * time.Second)
+
+	for {
+		select {
+		case batch := <-metricConsumer:
+			var points []tsdb.Point
+			var err error
+			if points, err = tsdb.ParsePoints(batch); err != nil {
+				return err
+			}
+
+			for _, point := range points {
+				acc.AddValuesWithTime(point.Name(), point.Fields(), point.Tags(), point.Time())
+			}
+		case <-timeout:
+			return nil
+		}
+	}
+}
+
+const millisecond = 1000000 * time.Nanosecond
+
+type ack func(*sarama.ConsumerMessage) error
+
+func readFromKafka(kafkaMsgs <-chan *sarama.ConsumerMessage, metricProducer chan<- []byte, maxBatchSize int, ackMsg ack, halt <-chan bool) {
+	batch := make([]byte, 0)
+	currentBatchSize := 0
+	timeout := time.After(500 * millisecond)
+	var msg *sarama.ConsumerMessage
+
+	for {
+		select {
+		case msg = <-kafkaMsgs:
+			if currentBatchSize != 0 {
+				batch = append(batch, '\n')
+			}
+
+			batch = append(batch, msg.Value...)
+			currentBatchSize++
+
+			if currentBatchSize == maxBatchSize {
+				metricProducer <- batch
+				currentBatchSize = 0
+				batch = make([]byte, 0)
+				ackMsg(msg)
+			}
+		case <-timeout:
+			if currentBatchSize != 0 {
+				metricProducer <- batch
+				currentBatchSize = 0
+				batch = make([]byte, 0)
+				ackMsg(msg)
+			}
+
+			timeout = time.After(500 * millisecond)
+		case <-halt:
+			if currentBatchSize != 0 {
+				metricProducer <- batch
+				ackMsg(msg)
+			}
+
+			return
+		}
+	}
+}
+
+func init() {
+	plugins.Add("kafka", func() plugins.Plugin {
+		return &Kafka{}
+	})
+}
diff --git a/plugins/kafka_consumer/kafka_consumer_integration_test.go b/plugins/kafka_consumer/kafka_consumer_integration_test.go
new file mode 100644
index 000000000..1541cb127
--- /dev/null
+++ b/plugins/kafka_consumer/kafka_consumer_integration_test.go
@@ -0,0 +1,62 @@
+package kafka_consumer
+
+import (
+	"fmt"
+	"os"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/Shopify/sarama"
+	"github.com/influxdb/telegraf/testutil"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestReadsMetricsFromKafka(t *testing.T) {
+	var zkPeers, brokerPeers []string
+
+	if len(os.Getenv("ZOOKEEPER_PEERS")) == 0 {
+		zkPeers = []string{"localhost:2181"}
+	} else {
+		zkPeers = strings.Split(os.Getenv("ZOOKEEPER_PEERS"), ",")
+	}
+
+	if len(os.Getenv("KAFKA_PEERS")) == 0 {
+		brokerPeers = []string{"localhost:9092"}
+	} else {
+		brokerPeers = strings.Split(os.Getenv("KAFKA_PEERS"), ",")
+	}
+
+	k := &Kafka{
+		ConsumerGroupName: "telegraf_test_consumers",
+		Topic:             fmt.Sprintf("telegraf_test_topic_%d", time.Now().Unix()),
+		ZookeeperPeers:    zkPeers,
+	}
+
+	msg := "cpu_load_short,direction=in,host=server01,region=us-west value=23422.0 1422568543702900257"
+	producer, err := sarama.NewSyncProducer(brokerPeers, nil)
+	require.NoError(t, err)
+	_, _, err = producer.SendMessage(&sarama.ProducerMessage{Topic: k.Topic, Value: sarama.StringEncoder(msg)})
+	producer.Close()
+
+	var acc testutil.Accumulator
+
+	// Sanity check
+	assert.Equal(t, 0, len(acc.Points), "there should not be any points")
+
+	err = k.Gather(&acc)
+	require.NoError(t, err)
+
+	assert.Equal(t, 1, len(acc.Points), "there should be a single point")
+
+	point := acc.Points[0]
+	assert.Equal(t, "cpu_load_short", point.Measurement)
+	assert.Equal(t, map[string]interface{}{"value": 23422.0}, point.Values)
+	assert.Equal(t, map[string]string{
+		"host":      "server01",
+		"direction": "in",
+		"region":    "us-west",
+	}, point.Tags)
+	assert.Equal(t, time.Unix(0, 1422568543702900257), point.Time)
+}
diff --git a/plugins/kafka_consumer/kafka_consumer_test.go b/plugins/kafka_consumer/kafka_consumer_test.go
new file mode 100644
index 000000000..fa6ad4a97
--- /dev/null
+++ b/plugins/kafka_consumer/kafka_consumer_test.go
@@ -0,0 +1,95 @@
+package kafka_consumer
+
+import (
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/influxdb/telegraf/testutil"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"gopkg.in/Shopify/sarama.v1"
+)
+
+const testMsg = "cpu_load_short,direction=in,host=server01,region=us-west value=23422.0 1422568543702900257"
+
+func TestReadFromKafkaBatchesMsgsOnBatchSize(t *testing.T) {
+	halt := make(chan bool, 1)
+	metricChan := make(chan []byte, 1)
+	kafkaChan := make(chan *sarama.ConsumerMessage, 10)
+	for i := 0; i < 10; i++ {
+		kafkaChan <- saramaMsg(testMsg)
+	}
+
+	expectedBatch := strings.Repeat(testMsg+"\n", 9) + testMsg
+	readFromKafka(kafkaChan, metricChan, 10, func(msg *sarama.ConsumerMessage) error {
+		batch := <-metricChan
+		assert.Equal(t, expectedBatch, string(batch))
+
+		halt <- true
+
+		return nil
+	}, halt)
+}
+
+func TestReadFromKafkaBatchesMsgsOnTimeout(t *testing.T) {
+	halt := make(chan bool, 1)
+	metricChan := make(chan []byte, 1)
+	kafkaChan := make(chan *sarama.ConsumerMessage, 10)
+	for i := 0; i < 3; i++ {
+		kafkaChan <- saramaMsg(testMsg)
+	}
+
+	expectedBatch := strings.Repeat(testMsg+"\n", 2) + testMsg
+	readFromKafka(kafkaChan, metricChan, 10, func(msg *sarama.ConsumerMessage) error {
+		batch := <-metricChan
+		assert.Equal(t, expectedBatch, string(batch))
+
+		halt <- true
+
+		return nil
+	}, halt)
+}
+
+func TestEmitMetricsSendMetricsToAcc(t *testing.T) {
+	k := &Kafka{}
+	var acc testutil.Accumulator
+	testChan := make(chan []byte, 1)
+	testChan <- []byte(testMsg)
+
+	err := emitMetrics(k, &acc, testChan)
+	require.NoError(t, err)
+
+	assert.Equal(t, 1, len(acc.Points), "there should be a single point")
+
+	point := acc.Points[0]
+	assert.Equal(t, "cpu_load_short", point.Measurement)
+	assert.Equal(t, map[string]interface{}{"value": 23422.0}, point.Values)
+	assert.Equal(t, map[string]string{
+		"host":      "server01",
+		"direction": "in",
+		"region":    "us-west",
+	}, point.Tags)
+
+	assert.Equal(t, time.Unix(0, 1422568543702900257), point.Time)
+}
+
+func TestEmitMetricsTimesOut(t *testing.T) {
+	k := &Kafka{}
+	var acc testutil.Accumulator
+	testChan := make(chan []byte)
+
+	err := emitMetrics(k, &acc, testChan)
+	require.NoError(t, err)
+
+	assert.Equal(t, 0, len(acc.Points), "there should not be any points")
+}
+
+func saramaMsg(val string) *sarama.ConsumerMessage {
+	return &sarama.ConsumerMessage{
+		Key:       nil,
+		Value:     []byte(val),
+		Offset:    0,
+		Partition: 0,
+	}
+}
diff --git a/plugins/rethinkdb/rethinkdb.go b/plugins/rethinkdb/rethinkdb.go
new file mode 100644
index 000000000..1c46a1f49
--- /dev/null
+++ b/plugins/rethinkdb/rethinkdb.go
@@ -0,0 +1,92 @@
+package rethinkdb
+
+import (
+	"fmt"
+	"net/url"
+	"sync"
+
+	"github.com/influxdb/telegraf/plugins"
+
+	"gopkg.in/dancannon/gorethink.v1"
+)
+
+type RethinkDB struct {
+	Servers []string
+}
+
+var sampleConfig = `
+# An array of URI to gather stats about. Specify an ip or hostname
+# with optional port and password, e.g. rethinkdb://user:auth_key@10.10.3.30:28105,
+# rethinkdb://10.10.3.33:18832, 10.0.0.1:10000, etc.
+#
+# If no servers are specified, then 127.0.0.1 is used as the host and 28015 as the port.
+servers = ["127.0.0.1:28015"]`
+
+func (r *RethinkDB) SampleConfig() string {
+	return sampleConfig
+}
+
+func (r *RethinkDB) Description() string {
+	return "Read metrics from one or many RethinkDB servers"
+}
+
+var localhost = &Server{Url: &url.URL{Host: "127.0.0.1:28015"}}
+
+// Reads stats from all configured servers and accumulates stats.
+// Returns one of the errors encountered while gathering stats (if any).
+func (r *RethinkDB) Gather(acc plugins.Accumulator) error {
+	if len(r.Servers) == 0 {
+		r.gatherServer(localhost, acc)
+		return nil
+	}
+
+	var wg sync.WaitGroup
+
+	var outerr error
+
+	for _, serv := range r.Servers {
+		u, err := url.Parse(serv)
+		if err != nil {
+			return fmt.Errorf("Unable to parse address '%s': %s", serv, err)
+		} else if u.Scheme == "" {
+			// fallback to simple string based address (i.e. "10.0.0.1:10000")
+			u.Host = serv
+		}
+		wg.Add(1)
+		go func(serv string) {
+			defer wg.Done()
+			outerr = r.gatherServer(&Server{Url: u}, acc)
+		}(serv)
+	}
+
+	wg.Wait()
+
+	return outerr
+}
+
+func (r *RethinkDB) gatherServer(server *Server, acc plugins.Accumulator) error {
+	var err error
+	connectOpts := gorethink.ConnectOpts{
+		Address:       server.Url.Host,
+		DiscoverHosts: false,
+	}
+	if server.Url.User != nil {
+		pwd, set := server.Url.User.Password()
+		if set && pwd != "" {
+			connectOpts.AuthKey = pwd
+		}
+	}
+	server.session, err = gorethink.Connect(connectOpts)
+	if err != nil {
+		return fmt.Errorf("Unable to connect to RethinkDB, %s\n", err.Error())
+	}
+	defer server.session.Close()
+
+	return server.gatherData(acc)
+}
+
+func init() {
+	plugins.Add("rethinkdb", func() plugins.Plugin {
+		return &RethinkDB{}
+	})
+}
diff --git a/plugins/rethinkdb/rethinkdb_data.go b/plugins/rethinkdb/rethinkdb_data.go
new file mode 100644
index 000000000..5fae28931
--- /dev/null
+++ b/plugins/rethinkdb/rethinkdb_data.go
@@ -0,0 +1,110 @@
+package rethinkdb
+
+import (
+	"reflect"
+	"time"
+
+	"github.com/influxdb/telegraf/plugins"
+)
+
+type serverStatus struct {
+	Id      string `gorethink:"id"`
+	Network struct {
+		Addresses  []Address `gorethink:"canonical_addresses"`
+		Hostname   string    `gorethink:"hostname"`
+		DriverPort int       `gorethink:"reql_port"`
+	} `gorethink:"network"`
+	Process struct {
+		Version      string    `gorethink:"version"`
+		RunningSince time.Time `gorethink:"time_started"`
+	} `gorethink:"process"`
+}
+
+type Address struct {
+	Host string `gorethink:"host"`
+	Port int    `gorethink:"port"`
+}
+
+type stats struct {
+	Engine Engine `gorethink:"query_engine"`
+}
+
+type Engine struct {
+	ClientConns   int64 `gorethink:"client_connections,omitempty"`
+	ClientActive  int64 `gorethink:"clients_active,omitempty"`
+	QueriesPerSec int64 `gorethink:"queries_per_sec,omitempty"`
+	TotalQueries  int64 `gorethink:"queries_total,omitempty"`
+	ReadsPerSec   int64 `gorethink:"read_docs_per_sec,omitempty"`
+	TotalReads    int64 `gorethink:"read_docs_total,omitempty"`
+	WritesPerSec  int64 `gorethink:"written_docs_per_sec,omitempty"`
+	TotalWrites   int64 `gorethink:"written_docs_total,omitempty"`
+}
+
+type tableStatus struct {
+	Id   string `gorethink:"id"`
+	DB   string `gorethink:"db"`
+	Name string `gorethink:"name"`
+}
+
+type tableStats struct {
+	Engine  Engine  `gorethink:"query_engine"`
+	Storage Storage `gorethink:"storage_engine"`
+}
+
+type Storage struct {
+	Cache Cache `gorethink:"cache"`
+	Disk  Disk  `gorethink:"disk"`
+}
+
+type Cache struct {
+	BytesInUse int64 `gorethink:"in_use_bytes"`
+}
+
+type Disk struct {
+	ReadBytesPerSec  int64      `gorethink:"read_bytes_per_sec"`
+	ReadBytesTotal   int64      `gorethink:"read_bytes_total"`
+	WriteBytesPerSec int64      `gorethink:"written_bytes_per_sec"`
+	WriteBytesTotal  int64      `gorethink:"written_bytes_total"`
+	SpaceUsage       SpaceUsage `gorethink:"space_usage"`
+}
+
+type SpaceUsage struct {
+	Data     int64 `gorethink:"data_bytes"`
+	Garbage  int64 `gorethink:"garbage_bytes"`
+	Metadata int64 `gorethink:"metadata_bytes"`
+	Prealloc int64 `gorethink:"preallocated_bytes"`
+}
+
+var engineStats = map[string]string{
+	"active_clients":       "ClientActive",
+	"clients":              "ClientConns",
+	"queries_per_sec":      "QueriesPerSec",
+	"total_queries":        "TotalQueries",
+	"read_docs_per_sec":    "ReadsPerSec",
+	"total_reads":          "TotalReads",
+	"written_docs_per_sec": "WritesPerSec",
+	"total_writes":         "TotalWrites",
+}
+
+func (e *Engine) AddEngineStats(keys []string, acc plugins.Accumulator, tags map[string]string) {
+	engine := reflect.ValueOf(e).Elem()
+	for _, key := range keys {
+		acc.Add(
+			key,
+			engine.FieldByName(engineStats[key]).Interface(),
+			tags,
+		)
+	}
+}
+
+func (s *Storage) AddStats(acc plugins.Accumulator, tags map[string]string) {
+	acc.Add("cache_bytes_in_use", s.Cache.BytesInUse, tags)
+	acc.Add("disk_read_bytes_per_sec", s.Disk.ReadBytesPerSec, tags)
+	acc.Add("disk_read_bytes_total", s.Disk.ReadBytesTotal, tags)
+	acc.Add("disk_written_bytes_per_sec", s.Disk.WriteBytesPerSec, tags)
+	acc.Add("disk_written_bytes_total", s.Disk.WriteBytesTotal, tags)
+	acc.Add("disk_usage_data_bytes", s.Disk.SpaceUsage.Data, tags)
+	acc.Add("disk_usage_garbage_bytes", s.Disk.SpaceUsage.Garbage, tags)
+	acc.Add("disk_usage_metadata_bytes", s.Disk.SpaceUsage.Metadata, tags)
+	acc.Add("disk_usage_preallocated_bytes", s.Disk.SpaceUsage.Prealloc, tags)
+}
diff --git a/plugins/rethinkdb/rethinkdb_data_test.go b/plugins/rethinkdb/rethinkdb_data_test.go
new file mode 100644
index 000000000..4c76b2340
--- /dev/null
+++ b/plugins/rethinkdb/rethinkdb_data_test.go
@@ -0,0 +1,112 @@
+package rethinkdb
+
+import (
+	"testing"
+
+	"github.com/influxdb/telegraf/testutil"
+	"github.com/stretchr/testify/assert"
+)
+
+var tags = make(map[string]string)
+
+func TestAddEngineStats(t *testing.T) {
+	engine := &Engine{
+		ClientConns:   0,
+		ClientActive:  0,
+		QueriesPerSec: 0,
+		TotalQueries:  0,
+		ReadsPerSec:   0,
+		TotalReads:    0,
+		WritesPerSec:  0,
+		TotalWrites:   0,
+	}
+
+	var acc testutil.Accumulator
+
+	keys := []string{
+		"active_clients",
+		"clients",
+		"queries_per_sec",
+		"total_queries",
+		"read_docs_per_sec",
+		"total_reads",
+		"written_docs_per_sec",
+		"total_writes",
+	}
+	engine.AddEngineStats(keys, &acc, tags)
+
+	for _, metric := range keys {
+		assert.True(t, acc.HasIntValue(metric))
+	}
+}
+
+func TestAddEngineStatsPartial(t *testing.T) {
+	engine := &Engine{
+		ClientConns:   0,
+		ClientActive:  0,
+		QueriesPerSec: 0,
+		ReadsPerSec:   0,
+		WritesPerSec:  0,
+	}
+
+	var acc testutil.Accumulator
+
+	keys := []string{
+		"active_clients",
+		"clients",
+		"queries_per_sec",
+		"read_docs_per_sec",
+		"written_docs_per_sec",
+	}
+
+	missing_keys := []string{
+		"total_queries",
+		"total_reads",
+		"total_writes",
+	}
+	engine.AddEngineStats(keys, &acc, tags)
+
+	for _, metric := range missing_keys {
+		assert.False(t, acc.HasIntValue(metric))
+	}
+}
+
+func TestAddStorageStats(t *testing.T) {
+	storage := &Storage{
+		Cache: Cache{
+			BytesInUse: 0,
+		},
+		Disk: Disk{
+			ReadBytesPerSec:  0,
+			ReadBytesTotal:   0,
+			WriteBytesPerSec: 0,
+			WriteBytesTotal:  0,
+			SpaceUsage: SpaceUsage{
+				Data:     0,
+				Garbage:  0,
+				Metadata: 0,
+				Prealloc: 0,
+			},
+		},
+	}
+
+	var acc testutil.Accumulator
+
+	keys := []string{
+		"cache_bytes_in_use",
+		"disk_read_bytes_per_sec",
+		"disk_read_bytes_total",
+		"disk_written_bytes_per_sec",
+		"disk_written_bytes_total",
+		"disk_usage_data_bytes",
+		"disk_usage_garbage_bytes",
+		"disk_usage_metadata_bytes",
+		"disk_usage_preallocated_bytes",
+	}
+
+	storage.AddStats(&acc, tags)
+
+	for _, metric := range keys {
+		assert.True(t, acc.HasIntValue(metric))
+	}
+}
diff --git a/plugins/rethinkdb/rethinkdb_server.go b/plugins/rethinkdb/rethinkdb_server.go
new file mode 100644
index 000000000..43551fe25
--- /dev/null
+++ b/plugins/rethinkdb/rethinkdb_server.go
@@ -0,0 +1,196 @@
+package rethinkdb
+
+import (
+	"errors"
+	"fmt"
+	"net"
+	"net/url"
+	"regexp"
+	"strconv"
+	"strings"
+
+	"github.com/influxdb/telegraf/plugins"
+
+	"gopkg.in/dancannon/gorethink.v1"
+)
+
+type Server struct {
+	Url          *url.URL
+	session      *gorethink.Session
+	serverStatus serverStatus
+}
+
+func (s *Server) gatherData(acc plugins.Accumulator) error {
+	if err := s.getServerStatus(); err != nil {
+		return fmt.Errorf("Failed to get server_status, %s\n", err)
+	}
+
+	if err := s.validateVersion(); err != nil {
+		return fmt.Errorf("Failed version validation, %s\n", err.Error())
+	}
+
+	if err := s.addClusterStats(acc); err != nil {
+		fmt.Printf("error adding cluster stats, %s\n", err.Error())
+		return fmt.Errorf("Error adding cluster stats, %s\n", err.Error())
+	}
+
+	if err := s.addMemberStats(acc); err != nil {
+		return fmt.Errorf("Error adding member stats, %s\n", err.Error())
+	}
+
+	if err := s.addTableStats(acc); err != nil {
+		return fmt.Errorf("Error adding table stats, %s\n", err.Error())
+	}
+
+	return nil
+}
+
+func (s *Server) validateVersion() error {
+	if s.serverStatus.Process.Version == "" {
+		return errors.New("could not determine the RethinkDB server version: process.version key missing")
+	}
+
+	versionRegexp := regexp.MustCompile("\\d+\\.\\d+\\.\\d+")
+	versionString := versionRegexp.FindString(s.serverStatus.Process.Version)
+	if versionString == "" {
+		return fmt.Errorf("could not determine the RethinkDB server version: malformed version string (%v)", s.serverStatus.Process.Version)
+	}
+
+	majorVersion, err := strconv.Atoi(strings.Split(versionString, ".")[0])
+	if err != nil || majorVersion < 2 {
+		return fmt.Errorf("unsupported major version %s\n", versionString)
+	}
+	return nil
+}
+
+func (s *Server) getServerStatus() error {
+	cursor, err := gorethink.DB("rethinkdb").Table("server_status").Run(s.session)
+	if err != nil {
+		return err
+	}
+	defer cursor.Close()
+
+	if cursor.IsNil() {
+		return errors.New("could not determine the RethinkDB server version: no rows returned from the server_status table")
+	}
+	var serverStatuses []serverStatus
+	err = cursor.All(&serverStatuses)
+	if err != nil {
+		return errors.New("could not parse server_status results")
+	}
+	host, port, err := net.SplitHostPort(s.Url.Host)
+	if err != nil {
+		return fmt.Errorf("unable to determine provided hostname from %s\n", s.Url.Host)
+	}
+	driverPort, _ := strconv.Atoi(port)
+	for _, ss := range serverStatuses {
+		for _, address := range ss.Network.Addresses {
+			if address.Host == host && ss.Network.DriverPort == driverPort {
+				s.serverStatus = ss
+				return nil
+			}
+		}
+	}
+
+	return fmt.Errorf("unable to determine host id from server_status with %s", s.Url.Host)
+}
+
+func (s *Server) getDefaultTags() map[string]string {
+	tags := make(map[string]string)
+	tags["host"] = s.Url.Host
+	tags["hostname"] = s.serverStatus.Network.Hostname
+	return tags
+}
+
+var ClusterTracking = []string{
+	"active_clients",
+	"clients",
+	"queries_per_sec",
+	"read_docs_per_sec",
+	"written_docs_per_sec",
+}
+
+func (s *Server) addClusterStats(acc plugins.Accumulator) error {
+	cursor, err := gorethink.DB("rethinkdb").Table("stats").Get([]string{"cluster"}).Run(s.session)
+	if err != nil {
+		return fmt.Errorf("cluster stats query error, %s\n", err.Error())
+	}
+	defer cursor.Close()
+	var clusterStats stats
+	if err := cursor.One(&clusterStats); err != nil {
+		return fmt.Errorf("failure to parse cluster stats, %s\n", err.Error())
+	}
+
+	tags := s.getDefaultTags()
+	tags["type"] = "cluster"
+	clusterStats.Engine.AddEngineStats(ClusterTracking, acc, tags)
+	return nil
+}
+
+var MemberTracking = []string{
+	"active_clients",
+	"clients",
+	"queries_per_sec",
+	"total_queries",
+	"read_docs_per_sec",
+	"total_reads",
+	"written_docs_per_sec",
+	"total_writes",
+}
+
+func (s *Server) addMemberStats(acc plugins.Accumulator) error {
+	cursor, err := gorethink.DB("rethinkdb").Table("stats").Get([]string{"server", s.serverStatus.Id}).Run(s.session)
+	if err != nil {
+		return fmt.Errorf("member stats query error, %s\n", err.Error())
+	}
+	defer cursor.Close()
+	var memberStats stats
+	if err := cursor.One(&memberStats); err != nil {
+		return fmt.Errorf("failure to parse member stats, %s\n", err.Error())
+	}
+
+	tags := s.getDefaultTags()
+	tags["type"] = "member"
+	memberStats.Engine.AddEngineStats(MemberTracking, acc, tags)
+	return nil
+}
+
+var TableTracking = []string{
+	"read_docs_per_sec",
+	"total_reads",
+	"written_docs_per_sec",
+	"total_writes",
+}
+
+func (s *Server) addTableStats(acc plugins.Accumulator) error {
+	tablesCursor, err := gorethink.DB("rethinkdb").Table("table_status").Run(s.session)
+	if err != nil {
+		return fmt.Errorf("table_status query error, %s\n", err.Error())
+	}
+	defer tablesCursor.Close()
+	var tables []tableStatus
+	err = tablesCursor.All(&tables)
+	if err != nil {
+		return errors.New("could not parse table_status results")
+	}
+	for _, table := range tables {
+		cursor, err := gorethink.DB("rethinkdb").Table("stats").
+			Get([]string{"table_server", table.Id, s.serverStatus.Id}).
+			Run(s.session)
+		if err != nil {
+			return fmt.Errorf("table stats query error, %s\n", err.Error())
+		}
+		defer cursor.Close()
+		var ts tableStats
+		if err := cursor.One(&ts); err != nil {
+			return fmt.Errorf("failure to parse table stats, %s\n", err.Error())
+		}
+
+		tags := s.getDefaultTags()
+		tags["type"] = "data"
+		tags["ns"] = fmt.Sprintf("%s.%s", table.DB, table.Name)
+		ts.Engine.AddEngineStats(TableTracking, acc, tags)
+		ts.Storage.AddStats(acc, tags)
+	}
+	return nil
+}
diff --git a/plugins/rethinkdb/rethinkdb_server_test.go b/plugins/rethinkdb/rethinkdb_server_test.go
new file mode 100644
index 000000000..21ab0dbbd
--- /dev/null
+++ b/plugins/rethinkdb/rethinkdb_server_test.go
@@ -0,0 +1,81 @@
+// +build integration
+
+package rethinkdb
+
+import (
+	"testing"
+
+	"github.com/influxdb/telegraf/testutil"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestValidateVersion(t *testing.T) {
+	err := server.validateVersion()
+	require.NoError(t, err)
+}
+
+func TestGetDefaultTags(t *testing.T) {
+	var tagTests = []struct {
+		in  string
+		out string
+	}{
+		{"host", server.Url.Host},
+		{"hostname", server.serverStatus.Network.Hostname},
+	}
+	defaultTags := server.getDefaultTags()
+	for _, tt := range tagTests {
+		if defaultTags[tt.in] != tt.out {
+			t.Errorf("expected %q, got %q", tt.out, defaultTags[tt.in])
+		}
+	}
+}
+
+func TestAddClusterStats(t *testing.T) {
+	var acc testutil.Accumulator
+
+	err := server.addClusterStats(&acc)
+	require.NoError(t, err)
+
+	for _, metric := range ClusterTracking {
+		assert.True(t, acc.HasIntValue(metric))
+	}
+}
+
+func TestAddMemberStats(t *testing.T) {
+	var acc testutil.Accumulator
+
+	err := server.addMemberStats(&acc)
+	require.NoError(t, err)
+
+	for _, metric := range MemberTracking {
+		assert.True(t, acc.HasIntValue(metric))
+	}
+}
+
+func TestAddTableStats(t *testing.T) {
+	var acc testutil.Accumulator
+
+	err := server.addTableStats(&acc)
+	require.NoError(t, err)
+
+	for _, metric := range TableTracking {
+		assert.True(t, acc.HasIntValue(metric))
+	}
+
+	keys := []string{
+		"cache_bytes_in_use",
+		"disk_read_bytes_per_sec",
+		"disk_read_bytes_total",
+		"disk_written_bytes_per_sec",
+		"disk_written_bytes_total",
+		"disk_usage_data_bytes",
+		"disk_usage_garbage_bytes",
+		"disk_usage_metadata_bytes",
+		"disk_usage_preallocated_bytes",
+	}
+
+	for _, metric := range keys {
+		assert.True(t, acc.HasIntValue(metric))
+	}
+}
diff --git a/plugins/rethinkdb/rethinkdb_test.go b/plugins/rethinkdb/rethinkdb_test.go
new file mode 100644
index 000000000..85c747f42
--- /dev/null
+++ b/plugins/rethinkdb/rethinkdb_test.go
@@ -0,0 +1,59 @@
+// +build integration
+
+package rethinkdb
+
+import (
+	"log"
+	"math/rand"
+	"net/url"
+	"os"
+	"testing"
+	"time"
+
+	"gopkg.in/dancannon/gorethink.v1"
+)
+
+var connect_url, authKey string
+var server *Server
+
+func init() {
+	connect_url = os.Getenv("RETHINKDB_URL")
+	if connect_url == "" {
+		connect_url = "127.0.0.1:28015"
+	}
+	authKey = os.Getenv("RETHINKDB_AUTHKEY")
+
+}
+
+func testSetup(m *testing.M) {
+	var err error
+	server = &Server{Url: &url.URL{Host: connect_url}}
+	server.session, err = gorethink.Connect(gorethink.ConnectOpts{
+		Address:       server.Url.Host,
+		AuthKey:       authKey,
+		DiscoverHosts: false,
+	})
+	if err != nil {
+		log.Fatalln(err.Error())
+	}
+
+	err = server.getServerStatus()
+	if err != nil {
+		log.Fatalln(err.Error())
+	}
+}
+
+func testTeardown(m *testing.M) {
+	server.session.Close()
+}
+
+func TestMain(m *testing.M) {
+	// seed randomness for use with tests
+	rand.Seed(time.Now().UTC().UnixNano())
+
+	testSetup(m)
+	res := m.Run()
+	testTeardown(m)
+
+	os.Exit(res)
+}
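
Not part of the patch above: a minimal, standalone sketch for readers tracing the Kafka consumer's data flow. It relies only on what the patch itself uses — tsdb.ParsePoints from the vendored influxdb package and the Name/Tags/Fields/Time accessors that emitMetrics hands to the accumulator. The second sample line and the throwaway main package are illustrative assumptions, not part of Telegraf.

package main

import (
	"fmt"
	"log"

	"github.com/influxdb/influxdb/tsdb"
)

func main() {
	// A batch as readFromKafka would hand it off: line-protocol points joined by '\n'.
	// The second line is invented for illustration.
	batch := []byte("cpu_load_short,direction=in,host=server01,region=us-west value=23422.0 1422568543702900257\n" +
		"cpu_load_short,direction=out,host=server02,region=us-west value=12.5 1422568543702900258")

	// The same parsing step emitMetrics performs before feeding the accumulator.
	points, err := tsdb.ParsePoints(batch)
	if err != nil {
		log.Fatal(err)
	}

	for _, point := range points {
		// These are the accessors the plugin passes to acc.AddValuesWithTime.
		fmt.Println(point.Name(), point.Tags(), point.Fields(), point.Time())
	}
}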