diff --git a/CHANGELOG.md b/CHANGELOG.md index 85730cafd..f7fcae123 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,17 +1,36 @@ +## v0.1.4 [2015-07-09] + +### Features +- [#56](https://github.com/influxdb/telegraf/pull/56): Update README for Kafka plugin. Thanks @EmilS! + +### Bugfixes +- [#50](https://github.com/influxdb/telegraf/pull/50): Fix init.sh script to use telegraf directory. Thanks @jseriff! +- [#52](https://github.com/influxdb/telegraf/pull/52): Update CHANGELOG to reference updated directory. Thanks @benfb! + +## v0.1.3 [2015-07-05] + +### Features +- [#35](https://github.com/influxdb/telegraf/pull/35): Add Kafka plugin. Thanks @EmilS! +- [#47](https://github.com/influxdb/telegraf/pull/47): Add RethinkDB plugin. Thanks @jipperinbham! + +### Bugfixes +- [#45](https://github.com/influxdb/telegraf/pull/45): Skip disk tags that don't have a value. Thanks @jhofeditz! +- [#43](https://github.com/influxdb/telegraf/pull/43): Fix bug in MySQL plugin. Thanks @marcosnils! + ## v0.1.2 [2015-07-01] ### Features -- [#12](https://github.com/influxdb/influxdb/pull/12): Add Linux/ARM to the list of built binaries. Thanks @voxxit! -- [#14](https://github.com/influxdb/influxdb/pull/14): Clarify the S3 buckets that Telegraf is pushed to. -- [#16](https://github.com/influxdb/influxdb/pull/16): Convert Redis to use URI, support Redis AUTH. Thanks @jipperinbham! -- [#21](https://github.com/influxdb/influxdb/pull/21): Add memcached plugiun. Thanks @Yukki! +- [#12](https://github.com/influxdb/telegraf/pull/12): Add Linux/ARM to the list of built binaries. Thanks @voxxit! +- [#14](https://github.com/influxdb/telegraf/pull/14): Clarify the S3 buckets that Telegraf is pushed to. +- [#16](https://github.com/influxdb/telegraf/pull/16): Convert Redis to use URI, support Redis AUTH. Thanks @jipperinbham! +- [#21](https://github.com/influxdb/telegraf/pull/21): Add memcached plugin. Thanks @Yukki! ### Bugfixes -- [#13](https://github.com/influxdb/influxdb/pull/13): Fix the packaging script. -- [#19](https://github.com/influxdb/influxdb/pull/19): Add host name to metric tags. Thanks @sherifzain! -- [#20](https://github.com/influxdb/influxdb/pull/20): Fix race condition with accumulator mutex. Thanks @nkatsaros! -- [#23](https://github.com/influxdb/influxdb/pull/23): Change name of folder for packages. Thanks @colinrymer! -- [#32](https://github.com/influxdb/influxdb/pull/32): Fix spelling of memoory -> memory. Thanks @tylernisonoff! +- [#13](https://github.com/influxdb/telegraf/pull/13): Fix the packaging script. +- [#19](https://github.com/influxdb/telegraf/pull/19): Add host name to metric tags. Thanks @sherifzain! +- [#20](https://github.com/influxdb/telegraf/pull/20): Fix race condition with accumulator mutex. Thanks @nkatsaros! +- [#23](https://github.com/influxdb/telegraf/pull/23): Change name of folder for packages. Thanks @colinrymer! +- [#32](https://github.com/influxdb/telegraf/pull/32): Fix spelling of memoory -> memory. Thanks @tylernisonoff! ## v0.1.1 [2015-06-19] diff --git a/README.md b/README.md index e55e59b5b..24a1b59e7 100644 --- a/README.md +++ b/README.md @@ -13,8 +13,8 @@ We'll eagerly accept pull requests for new plugins and will manage the set of pl ### Linux packages for Debian/Ubuntu and RHEL/CentOS: ``` -http://get.influxdb.org/telegraf/telegraf_0.1.2_amd64.deb -http://get.influxdb.org/telegraf/telegraf-0.1.2-1.x86_64.rpm +http://get.influxdb.org/telegraf/telegraf_0.1.4_amd64.deb +http://get.influxdb.org/telegraf/telegraf-0.1.4-1.x86_64.rpm ``` ### OSX via Homebrew: @@ -47,9 +47,11 @@ Telegraf currently has support for collecting metrics from: * System (memory, CPU, network, etc.) * Docker * MySQL +* Prometheus (client libraries and exporters) * PostgreSQL * Redis * RethinkDB +* Kafka * MongoDB We'll be adding support for many more over the coming months. Read on if you want to add support for another service or third-party API. @@ -143,6 +145,7 @@ func Gather(acc plugins.Accumulator) error { ### Example ```go +package simple // simple.go @@ -171,7 +174,7 @@ func (s *Simple) Gather(acc plugins.Accumulator) error { } func init() { - plugins.Add("simple", func() plugins.Plugin { &Simple{} }) + plugins.Add("simple", func() plugins.Plugin { return &Simple{} }) } ``` diff --git a/package.sh b/package.sh index cea28db1e..c20e566cb 100755 --- a/package.sh +++ b/package.sh @@ -32,9 +32,9 @@ AWS_FILE=~/aws.conf -INSTALL_ROOT_DIR=/opt/influxdb -TELEGRAF_LOG_DIR=/var/log/influxdb -CONFIG_ROOT_DIR=/etc/opt/influxdb +INSTALL_ROOT_DIR=/opt/telegraf +TELEGRAF_LOG_DIR=/var/log/telegraf +CONFIG_ROOT_DIR=/etc/opt/telegraf SAMPLE_CONFIGURATION=etc/config.sample.toml INITD_SCRIPT=scripts/init.sh diff --git a/plugins/all/all.go b/plugins/all/all.go index 17a48b74e..4b2b4ed1c 100644 --- a/plugins/all/all.go +++ b/plugins/all/all.go @@ -6,6 +6,7 @@ import ( _ "github.com/influxdb/telegraf/plugins/mongodb" _ "github.com/influxdb/telegraf/plugins/mysql" _ "github.com/influxdb/telegraf/plugins/postgresql" + _ "github.com/influxdb/telegraf/plugins/prometheus" _ "github.com/influxdb/telegraf/plugins/redis" _ "github.com/influxdb/telegraf/plugins/system" ) diff --git a/plugins/kafka_consumer/README.md b/plugins/kafka_consumer/README.md new file mode 100644 index 000000000..15e404215 --- /dev/null +++ b/plugins/kafka_consumer/README.md @@ -0,0 +1,24 @@ +# Kafka Consumer + +The [Kafka](http://kafka.apache.org/) consumer plugin polls a specified Kafka +topic and adds messages to InfluxDB. The plugin assumes messages follow the +line protocol. [Consumer Group](http://godoc.org/github.com/wvanbergen/kafka/consumergroup) +is used to talk to the Kafka cluster so multiple instances of telegraf can read +from the same topic in parallel. + +## Testing + +Running integration tests requires running Zookeeper & Kafka. The following +commands assume you're on OS X & using [boot2docker](http://boot2docker.io/). + +To start Kafka & Zookeeper: + +``` +docker run -d -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=`boot2docker ip` --env ADVERTISED_PORT=9092 spotify/kafka +``` + +To run tests: + +``` +ZOOKEEPER_PEERS=$(boot2docker ip):2181 KAFKA_PEERS=$(boot2docker ip):9092 go test +``` diff --git a/plugins/mysql/mysql.go b/plugins/mysql/mysql.go index 15b1af8c9..2e9e2cd22 100644 --- a/plugins/mysql/mysql.go +++ b/plugins/mysql/mysql.go @@ -91,7 +91,7 @@ func (m *Mysql) gatherServer(serv string, acc plugins.Accumulator) error { rows, err := db.Query(`SHOW /*!50002 GLOBAL */ STATUS`) if err != nil { - return nil + return err } for rows.Next() { diff --git a/plugins/mysql/mysql_test.go b/plugins/mysql/mysql_test.go index 33643861a..76cb28b0d 100644 --- a/plugins/mysql/mysql_test.go +++ b/plugins/mysql/mysql_test.go @@ -39,7 +39,7 @@ func TestMysqlGeneratesMetrics(t *testing.T) { var count int for _, p := range acc.Points { - if strings.HasPrefix(p.Name, prefix.prefix) { + if strings.HasPrefix(p.Measurement, prefix.prefix) { count++ } } diff --git a/plugins/postgresql/postgresql_test.go b/plugins/postgresql/postgresql_test.go index b11200e9f..91776d619 100644 --- a/plugins/postgresql/postgresql_test.go +++ b/plugins/postgresql/postgresql_test.go @@ -91,7 +91,7 @@ func TestPostgresqlDefaultsToAllDatabases(t *testing.T) { var found bool for _, pnt := range acc.Points { - if pnt.Name == "xact_commit" { + if pnt.Measurement == "xact_commit" { if pnt.Tags["db"] == "postgres" { found = true break diff --git a/plugins/prometheus/prometheus.go b/plugins/prometheus/prometheus.go new file mode 100644 index 000000000..4029e9932 --- /dev/null +++ b/plugins/prometheus/prometheus.go @@ -0,0 +1,105 @@ +package prometheus + +import ( + "errors" + "fmt" + "net/http" + "sync" + "time" + + "github.com/influxdb/telegraf/plugins" + "github.com/prometheus/client_golang/extraction" + "github.com/prometheus/client_golang/model" +) + +type Prometheus struct { + Urls []string +} + +var sampleConfig = ` +# An array of urls to scrape metrics from. +urls = ["http://localhost:9100/metrics"]` + +func (r *Prometheus) SampleConfig() string { + return sampleConfig +} + +func (r *Prometheus) Description() string { + return "Read metrics from one or many prometheus clients" +} + +var ErrProtocolError = errors.New("prometheus protocol error") + +// Reads stats from all configured servers accumulates stats. +// Returns one of the errors encountered while gather stats (if any). +func (g *Prometheus) Gather(acc plugins.Accumulator) error { + var wg sync.WaitGroup + + var outerr error + + for _, serv := range g.Urls { + wg.Add(1) + go func(serv string) { + defer wg.Done() + outerr = g.gatherURL(serv, acc) + }(serv) + } + + wg.Wait() + + return outerr +} + +func (g *Prometheus) gatherURL(url string, acc plugins.Accumulator) error { + resp, err := http.Get(url) + if err != nil { + return fmt.Errorf("error making HTTP request to %s: %s", url, err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("%s returned HTTP status %s", url, resp.Status) + } + processor, err := extraction.ProcessorForRequestHeader(resp.Header) + if err != nil { + return fmt.Errorf("error getting extractor for %s: %s", url, err) + } + + ingestor := &Ingester{ + acc: acc, + } + + options := &extraction.ProcessOptions{ + Timestamp: model.TimestampFromTime(time.Now()), + } + + err = processor.ProcessSingle(resp.Body, ingestor, options) + if err != nil { + return fmt.Errorf("error getting processing samples for %s: %s", url, err) + } + return nil +} + +type Ingester struct { + acc plugins.Accumulator +} + +// Ingest implements an extraction.Ingester. +func (i *Ingester) Ingest(samples model.Samples) error { + for _, sample := range samples { + tags := map[string]string{} + for key, value := range sample.Metric { + if key == model.MetricNameLabel { + continue + } + tags[string(key)] = string(value) + } + i.acc.Add(string(sample.Metric[model.MetricNameLabel]), float64(sample.Value), tags) + } + return nil +} + +func init() { + plugins.Add("prometheus", func() plugins.Plugin { + return &Prometheus{} + }) +} diff --git a/plugins/prometheus/prometheus_test.go b/plugins/prometheus/prometheus_test.go new file mode 100644 index 000000000..be2b190e7 --- /dev/null +++ b/plugins/prometheus/prometheus_test.go @@ -0,0 +1,56 @@ +package prometheus + +import ( + "fmt" + "net/http" + "net/http/httptest" + "testing" + + "github.com/influxdb/telegraf/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + + +const sampleTextFormat = `# HELP go_gc_duration_seconds A summary of the GC invocation durations. +# TYPE go_gc_duration_seconds summary +go_gc_duration_seconds{quantile="0"} 0.00010425500000000001 +go_gc_duration_seconds{quantile="0.25"} 0.000139108 +go_gc_duration_seconds{quantile="0.5"} 0.00015749400000000002 +go_gc_duration_seconds{quantile="0.75"} 0.000331463 +go_gc_duration_seconds{quantile="1"} 0.000667154 +go_gc_duration_seconds_sum 0.0018183950000000002 +go_gc_duration_seconds_count 7 +# HELP go_goroutines Number of goroutines that currently exist. +# TYPE go_goroutines gauge +go_goroutines 15 +` + +func TestPrometheusGeneratesMetrics(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, sampleTextFormat) + })) + defer ts.Close() + + p := &Prometheus{ + Urls: []string{ts.URL}, + } + + var acc testutil.Accumulator + + err := p.Gather(&acc) + require.NoError(t, err) + + expected := []struct { + name string + value float64 + tags map[string]string + }{ + {"go_gc_duration_seconds_count", 7, map[string]string{}}, + {"go_goroutines", 15, map[string]string{}}, + } + + for _, e := range expected { + assert.NoError(t, acc.ValidateValue(e.name, e.value)) + } +} diff --git a/plugins/rethinkdb/rethinkdb.go b/plugins/rethinkdb/rethinkdb.go new file mode 100644 index 000000000..1c46a1f49 --- /dev/null +++ b/plugins/rethinkdb/rethinkdb.go @@ -0,0 +1,92 @@ +package rethinkdb + +import ( + "fmt" + "net/url" + "sync" + + "github.com/influxdb/telegraf/plugins" + + "gopkg.in/dancannon/gorethink.v1" +) + +type RethinkDB struct { + Servers []string +} + +var sampleConfig = ` +# An array of URI to gather stats about. Specify an ip or hostname +# with optional port add password. ie rethinkdb://user:auth_key@10.10.3.30:28105, +# rethinkdb://10.10.3.33:18832, 10.0.0.1:10000, etc. +# +# If no servers are specified, then 127.0.0.1 is used as the host and 28015 as the port. +servers = ["127.0.0.1:28015"]` + +func (r *RethinkDB) SampleConfig() string { + return sampleConfig +} + +func (r *RethinkDB) Description() string { + return "Read metrics from one or many RethinkDB servers" +} + +var localhost = &Server{Url: &url.URL{Host: "127.0.0.1:28015"}} + +// Reads stats from all configured servers accumulates stats. +// Returns one of the errors encountered while gather stats (if any). +func (r *RethinkDB) Gather(acc plugins.Accumulator) error { + if len(r.Servers) == 0 { + r.gatherServer(localhost, acc) + return nil + } + + var wg sync.WaitGroup + + var outerr error + + for _, serv := range r.Servers { + u, err := url.Parse(serv) + if err != nil { + return fmt.Errorf("Unable to parse to address '%s': %s", serv, err) + } else if u.Scheme == "" { + // fallback to simple string based address (i.e. "10.0.0.1:10000") + u.Host = serv + } + wg.Add(1) + go func(serv string) { + defer wg.Done() + outerr = r.gatherServer(&Server{Url: u}, acc) + }(serv) + } + + wg.Wait() + + return outerr +} + +func (r *RethinkDB) gatherServer(server *Server, acc plugins.Accumulator) error { + var err error + connectOpts := gorethink.ConnectOpts{ + Address: server.Url.Host, + DiscoverHosts: false, + } + if server.Url.User != nil { + pwd, set := server.Url.User.Password() + if set && pwd != "" { + connectOpts.AuthKey = pwd + } + } + server.session, err = gorethink.Connect(connectOpts) + if err != nil { + return fmt.Errorf("Unable to connect to RethinkDB, %s\n", err.Error()) + } + defer server.session.Close() + + return server.gatherData(acc) +} + +func init() { + plugins.Add("rethinkdb", func() plugins.Plugin { + return &RethinkDB{} + }) +} diff --git a/plugins/rethinkdb/rethinkdb_data.go b/plugins/rethinkdb/rethinkdb_data.go new file mode 100644 index 000000000..5fae28931 --- /dev/null +++ b/plugins/rethinkdb/rethinkdb_data.go @@ -0,0 +1,110 @@ +package rethinkdb + +import ( + "reflect" + "time" + + "github.com/influxdb/telegraf/plugins" +) + +type serverStatus struct { + Id string `gorethink:"id"` + Network struct { + Addresses []Address `gorethink:"canonical_addresses"` + Hostname string `gorethink:"hostname"` + DriverPort int `gorethink:"reql_port"` + } `gorethink:"network"` + Process struct { + Version string `gorethink:"version"` + RunningSince time.Time `gorethink:"time_started"` + } `gorethink:"process"` +} + +type Address struct { + Host string `gorethink:"host"` + Port int `gorethink:"port"` +} + +type stats struct { + Engine Engine `gorethink:"query_engine"` +} + +type Engine struct { + ClientConns int64 `gorethink:"client_connections,omitempty"` + ClientActive int64 `gorethink:"clients_active,omitempty"` + QueriesPerSec int64 `gorethink:"queries_per_sec,omitempty"` + TotalQueries int64 `gorethink:"queries_total,omitempty"` + ReadsPerSec int64 `gorethink:"read_docs_per_sec,omitempty"` + TotalReads int64 `gorethink:"read_docs_total,omitempty"` + WritesPerSec int64 `gorethink:"written_docs_per_sec,omitempty"` + TotalWrites int64 `gorethink:"written_docs_total,omitempty"` +} + +type tableStatus struct { + Id string `gorethink:"id"` + DB string `gorethink:"db"` + Name string `gorethink:"name"` +} + +type tableStats struct { + Engine Engine `gorethink:"query_engine"` + Storage Storage `gorethink:"storage_engine"` +} + +type Storage struct { + Cache Cache `gorethink:"cache"` + Disk Disk `gorethink:"disk"` +} + +type Cache struct { + BytesInUse int64 `gorethink:"in_use_bytes"` +} + +type Disk struct { + ReadBytesPerSec int64 `gorethink:"read_bytes_per_sec"` + ReadBytesTotal int64 `gorethink:"read_bytes_total"` + WriteBytesPerSec int64 `gorethik:"written_bytes_per_sec"` + WriteBytesTotal int64 `gorethink:"written_bytes_total"` + SpaceUsage SpaceUsage `gorethink:"space_usage"` +} + +type SpaceUsage struct { + Data int64 `gorethink:"data_bytes"` + Garbage int64 `gorethink:"garbage_bytes"` + Metadata int64 `gorethink:"metadata_bytes"` + Prealloc int64 `gorethink:"preallocated_bytes"` +} + +var engineStats = map[string]string{ + "active_clients": "ClientActive", + "clients": "ClientConns", + "queries_per_sec": "QueriesPerSec", + "total_queries": "TotalQueries", + "read_docs_per_sec": "ReadsPerSec", + "total_reads": "TotalReads", + "written_docs_per_sec": "WritesPerSec", + "total_writes": "TotalWrites", +} + +func (e *Engine) AddEngineStats(keys []string, acc plugins.Accumulator, tags map[string]string) { + engine := reflect.ValueOf(e).Elem() + for _, key := range keys { + acc.Add( + key, + engine.FieldByName(engineStats[key]).Interface(), + tags, + ) + } +} + +func (s *Storage) AddStats(acc plugins.Accumulator, tags map[string]string) { + acc.Add("cache_bytes_in_use", s.Cache.BytesInUse, tags) + acc.Add("disk_read_bytes_per_sec", s.Disk.ReadBytesPerSec, tags) + acc.Add("disk_read_bytes_total", s.Disk.ReadBytesTotal, tags) + acc.Add("disk_written_bytes_per_sec", s.Disk.WriteBytesPerSec, tags) + acc.Add("disk_written_bytes_total", s.Disk.WriteBytesTotal, tags) + acc.Add("disk_usage_data_bytes", s.Disk.SpaceUsage.Data, tags) + acc.Add("disk_usage_garbage_bytes", s.Disk.SpaceUsage.Garbage, tags) + acc.Add("disk_usage_metadata_bytes", s.Disk.SpaceUsage.Metadata, tags) + acc.Add("disk_usage_preallocated_bytes", s.Disk.SpaceUsage.Prealloc, tags) +} diff --git a/plugins/rethinkdb/rethinkdb_data_test.go b/plugins/rethinkdb/rethinkdb_data_test.go new file mode 100644 index 000000000..4c76b2340 --- /dev/null +++ b/plugins/rethinkdb/rethinkdb_data_test.go @@ -0,0 +1,112 @@ +package rethinkdb + +import ( + "testing" + + "github.com/influxdb/telegraf/testutil" + "github.com/stretchr/testify/assert" +) + +var tags = make(map[string]string) + +func TestAddEngineStats(t *testing.T) { + engine := &Engine{ + ClientConns: 0, + ClientActive: 0, + QueriesPerSec: 0, + TotalQueries: 0, + ReadsPerSec: 0, + TotalReads: 0, + WritesPerSec: 0, + TotalWrites: 0, + } + + var acc testutil.Accumulator + + keys := []string{ + "active_clients", + "clients", + "queries_per_sec", + "total_queries", + "read_docs_per_sec", + "total_reads", + "written_docs_per_sec", + "total_writes", + } + engine.AddEngineStats(keys, &acc, tags) + + for _, metric := range keys { + assert.True(t, acc.HasIntValue(metric)) + } +} + +func TestAddEngineStatsPartial(t *testing.T) { + engine := &Engine{ + ClientConns: 0, + ClientActive: 0, + QueriesPerSec: 0, + ReadsPerSec: 0, + WritesPerSec: 0, + } + + var acc testutil.Accumulator + + keys := []string{ + "active_clients", + "clients", + "queries_per_sec", + "read_docs_per_sec", + "written_docs_per_sec", + } + + missing_keys := []string{ + "total_queries", + "total_reads", + "total_writes", + } + engine.AddEngineStats(keys, &acc, tags) + + for _, metric := range missing_keys { + assert.False(t, acc.HasIntValue(metric)) + } +} + +func TestAddStorageStats(t *testing.T) { + storage := &Storage{ + Cache: Cache{ + BytesInUse: 0, + }, + Disk: Disk{ + ReadBytesPerSec: 0, + ReadBytesTotal: 0, + WriteBytesPerSec: 0, + WriteBytesTotal: 0, + SpaceUsage: SpaceUsage{ + Data: 0, + Garbage: 0, + Metadata: 0, + Prealloc: 0, + }, + }, + } + + var acc testutil.Accumulator + + keys := []string{ + "cache_bytes_in_use", + "disk_read_bytes_per_sec", + "disk_read_bytes_total", + "disk_written_bytes_per_sec", + "disk_written_bytes_total", + "disk_usage_data_bytes", + "disk_usage_garbage_bytes", + "disk_usage_metadata_bytes", + "disk_usage_preallocated_bytes", + } + + storage.AddStats(&acc, tags) + + for _, metric := range keys { + assert.True(t, acc.HasIntValue(metric)) + } +} diff --git a/plugins/rethinkdb/rethinkdb_server.go b/plugins/rethinkdb/rethinkdb_server.go new file mode 100644 index 000000000..43551fe25 --- /dev/null +++ b/plugins/rethinkdb/rethinkdb_server.go @@ -0,0 +1,193 @@ +package rethinkdb + +import ( + "errors" + "fmt" + "net" + "net/url" + "regexp" + "strconv" + "strings" + + "github.com/influxdb/telegraf/plugins" + + "gopkg.in/dancannon/gorethink.v1" +) + +type Server struct { + Url *url.URL + session *gorethink.Session + serverStatus serverStatus +} + +func (s *Server) gatherData(acc plugins.Accumulator) error { + if err := s.getServerStatus(); err != nil { + return fmt.Errorf("Failed to get server_status, %s\n", err) + } + + if err := s.validateVersion(); err != nil { + return fmt.Errorf("Failed version validation, %s\n", err.Error()) + } + + if err := s.addClusterStats(acc); err != nil { + fmt.Printf("error adding cluster stats, %s\n", err.Error()) + return fmt.Errorf("Error adding cluster stats, %s\n", err.Error()) + } + + if err := s.addMemberStats(acc); err != nil { + return fmt.Errorf("Error adding member stats, %s\n", err.Error()) + } + + if err := s.addTableStats(acc); err != nil { + return fmt.Errorf("Error adding table stats, %s\n", err.Error()) + } + + return nil +} + +func (s *Server) validateVersion() error { + if s.serverStatus.Process.Version == "" { + return errors.New("could not determine the RethinkDB server version: process.version key missing") + } + + versionRegexp := regexp.MustCompile("\\d.\\d.\\d") + versionString := versionRegexp.FindString(s.serverStatus.Process.Version) + if versionString == "" { + return fmt.Errorf("could not determine the RethinkDB server version: malformed version string (%v)", s.serverStatus.Process.Version) + } + + majorVersion, err := strconv.Atoi(strings.Split(versionString, "")[0]) + if err != nil || majorVersion < 2 { + return fmt.Errorf("unsupported major version %s\n", versionString) + } + return nil +} + +func (s *Server) getServerStatus() error { + cursor, err := gorethink.DB("rethinkdb").Table("server_status").Run(s.session) + if err != nil { + return err + } + + if cursor.IsNil() { + return errors.New("could not determine the RethinkDB server version: no rows returned from the server_status table") + } + defer cursor.Close() + var serverStatuses []serverStatus + err = cursor.All(&serverStatuses) + if err != nil { + return errors.New("could not parse server_status results") + } + host, port, err := net.SplitHostPort(s.Url.Host) + if err != nil { + return fmt.Errorf("unable to determine provided hostname from %s\n", s.Url.Host) + } + driverPort, _ := strconv.Atoi(port) + for _, ss := range serverStatuses { + for _, address := range ss.Network.Addresses { + if address.Host == host && ss.Network.DriverPort == driverPort { + s.serverStatus = ss + return nil + } + } + } + + return fmt.Errorf("unable to determine host id from server_status with %s", s.Url.Host) +} + +func (s *Server) getDefaultTags() map[string]string { + tags := make(map[string]string) + tags["host"] = s.Url.Host + tags["hostname"] = s.serverStatus.Network.Hostname + return tags +} + +var ClusterTracking = []string{ + "active_clients", + "clients", + "queries_per_sec", + "read_docs_per_sec", + "written_docs_per_sec", +} + +func (s *Server) addClusterStats(acc plugins.Accumulator) error { + cursor, err := gorethink.DB("rethinkdb").Table("stats").Get([]string{"cluster"}).Run(s.session) + if err != nil { + return fmt.Errorf("cluster stats query error, %s\n", err.Error()) + } + defer cursor.Close() + var clusterStats stats + if err := cursor.One(&clusterStats); err != nil { + return fmt.Errorf("failure to parse cluster stats, $s\n", err.Error()) + } + + tags := s.getDefaultTags() + tags["type"] = "cluster" + clusterStats.Engine.AddEngineStats(ClusterTracking, acc, tags) + return nil +} + +var MemberTracking = []string{ + "active_clients", + "clients", + "queries_per_sec", + "total_queries", + "read_docs_per_sec", + "total_reads", + "written_docs_per_sec", + "total_writes", +} + +func (s *Server) addMemberStats(acc plugins.Accumulator) error { + cursor, err := gorethink.DB("rethinkdb").Table("stats").Get([]string{"server", s.serverStatus.Id}).Run(s.session) + if err != nil { + return fmt.Errorf("member stats query error, %s\n", err.Error()) + } + defer cursor.Close() + var memberStats stats + if err := cursor.One(&memberStats); err != nil { + return fmt.Errorf("failure to parse member stats, $s\n", err.Error()) + } + + tags := s.getDefaultTags() + tags["type"] = "member" + memberStats.Engine.AddEngineStats(MemberTracking, acc, tags) + return nil +} + +var TableTracking = []string{ + "read_docs_per_sec", + "total_reads", + "written_docs_per_sec", + "total_writes", +} + +func (s *Server) addTableStats(acc plugins.Accumulator) error { + tablesCursor, err := gorethink.DB("rethinkdb").Table("table_status").Run(s.session) + defer tablesCursor.Close() + var tables []tableStatus + err = tablesCursor.All(&tables) + if err != nil { + return errors.New("could not parse table_status results") + } + for _, table := range tables { + cursor, err := gorethink.DB("rethinkdb").Table("stats"). + Get([]string{"table_server", table.Id, s.serverStatus.Id}). + Run(s.session) + if err != nil { + return fmt.Errorf("table stats query error, %s\n", err.Error()) + } + defer cursor.Close() + var ts tableStats + if err := cursor.One(&ts); err != nil { + return fmt.Errorf("failure to parse table stats, %s\n", err.Error()) + } + + tags := s.getDefaultTags() + tags["type"] = "data" + tags["ns"] = fmt.Sprintf("%s.%s", table.DB, table.Name) + ts.Engine.AddEngineStats(TableTracking, acc, tags) + ts.Storage.AddStats(acc, tags) + } + return nil +} diff --git a/plugins/rethinkdb/rethinkdb_server_test.go b/plugins/rethinkdb/rethinkdb_server_test.go new file mode 100644 index 000000000..21ab0dbbd --- /dev/null +++ b/plugins/rethinkdb/rethinkdb_server_test.go @@ -0,0 +1,81 @@ +// +build integration + +package rethinkdb + +import ( + "testing" + + "github.com/influxdb/telegraf/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestValidateVersion(t *testing.T) { + err := server.validateVersion() + require.NoError(t, err) +} + +func TestGetDefaultTags(t *testing.T) { + var tagTests = []struct { + in string + out string + }{ + {"host", server.Url.Host}, + {"hostname", server.serverStatus.Network.Hostname}, + } + defaultTags := server.getDefaultTags() + for _, tt := range tagTests { + if defaultTags[tt.in] != tt.out { + t.Errorf("expected %q, got %q", tt.out, defaultTags[tt.in]) + } + } +} + +func TestAddClusterStats(t *testing.T) { + var acc testutil.Accumulator + + err := server.addClusterStats(&acc) + require.NoError(t, err) + + for _, metric := range ClusterTracking { + assert.True(t, acc.HasIntValue(metric)) + } +} + +func TestAddMemberStats(t *testing.T) { + var acc testutil.Accumulator + + err := server.addMemberStats(&acc) + require.NoError(t, err) + + for _, metric := range MemberTracking { + assert.True(t, acc.HasIntValue(metric)) + } +} + +func TestAddTableStats(t *testing.T) { + var acc testutil.Accumulator + + err := server.addTableStats(&acc) + require.NoError(t, err) + + for _, metric := range TableTracking { + assert.True(t, acc.HasIntValue(metric)) + } + + keys := []string{ + "cache_bytes_in_use", + "disk_read_bytes_per_sec", + "disk_read_bytes_total", + "disk_written_bytes_per_sec", + "disk_written_bytes_total", + "disk_usage_data_bytes", + "disk_usage_garbage_bytes", + "disk_usage_metadata_bytes", + "disk_usage_preallocated_bytes", + } + + for _, metric := range keys { + assert.True(t, acc.HasIntValue(metric)) + } +} diff --git a/plugins/rethinkdb/rethinkdb_test.go b/plugins/rethinkdb/rethinkdb_test.go new file mode 100644 index 000000000..85c747f42 --- /dev/null +++ b/plugins/rethinkdb/rethinkdb_test.go @@ -0,0 +1,59 @@ +// +build integration + +package rethinkdb + +import ( + "log" + "math/rand" + "net/url" + "os" + "testing" + "time" + + "gopkg.in/dancannon/gorethink.v1" +) + +var connect_url, authKey string +var server *Server + +func init() { + connect_url = os.Getenv("RETHINKDB_URL") + if connect_url == "" { + connect_url = "127.0.0.1:28015" + } + authKey = os.Getenv("RETHINKDB_AUTHKEY") + +} + +func testSetup(m *testing.M) { + var err error + server = &Server{Url: &url.URL{Host: connect_url}} + server.session, _ = gorethink.Connect(gorethink.ConnectOpts{ + Address: server.Url.Host, + AuthKey: authKey, + DiscoverHosts: false, + }) + if err != nil { + log.Fatalln(err.Error()) + } + + err = server.getServerStatus() + if err != nil { + log.Fatalln(err.Error()) + } +} + +func testTeardown(m *testing.M) { + server.session.Close() +} + +func TestMain(m *testing.M) { + // seed randomness for use with tests + rand.Seed(time.Now().UTC().UnixNano()) + + testSetup(m) + res := m.Run() + testTeardown(m) + + os.Exit(res) +} diff --git a/plugins/system/disk.go b/plugins/system/disk.go index cd5475962..1c5bdaef6 100644 --- a/plugins/system/disk.go +++ b/plugins/system/disk.go @@ -55,9 +55,12 @@ func (s *DiskIOStats) Gather(acc plugins.Accumulator) error { } for _, io := range diskio { - tags := map[string]string{ - "name": io.Name, - "serial": io.SerialNumber, + tags := map[string]string{} + if len(io.Name) != 0 { + tags["name"] = io.Name + } + if len(io.SerialNumber) != 0 { + tags["serial"] = io.SerialNumber } acc.Add("reads", io.ReadCount, tags) diff --git a/plugins/system/system_test.go b/plugins/system/system_test.go index b8e0e169c..1c40b7f97 100644 --- a/plugins/system/system_test.go +++ b/plugins/system/system_test.go @@ -272,7 +272,9 @@ func TestSystemStats_GenerateStats(t *testing.T) { require.NoError(t, err) dockertags := map[string]string{ - "id": "blah", + "name": "blah", + "id": "", + "command": "", } assert.True(t, acc.CheckTaggedValue("user", 3.1, dockertags)) diff --git a/scripts/init.sh b/scripts/init.sh index 3e5b239b0..b9339e407 100755 --- a/scripts/init.sh +++ b/scripts/init.sh @@ -42,7 +42,7 @@ if [ ! -f "$STDOUT" ]; then fi if [ -z "$STDERR" ]; then - STDERR=/var/log/influxdb/telegraf.log + STDERR=/var/log/telegraf/telegraf.log fi if [ ! -f "$STDERR" ]; then mkdir -p `dirname $STDERR` @@ -92,10 +92,10 @@ function log_success_msg() { name=telegraf # Daemon name, where is the actual executable -daemon=/opt/influxdb/telegraf +daemon=/opt/telegraf/telegraf # pid file for the daemon -pidfile=/var/run/influxdb/telegraf.pid +pidfile=/var/run/telegraf/telegraf.pid piddir=`dirname $pidfile` if [ ! -d "$piddir" ]; then @@ -104,7 +104,7 @@ if [ ! -d "$piddir" ]; then fi # Configuration file -config=/etc/opt/influxdb/telegraf.conf +config=/etc/opt/telegraf/telegraf.conf # If the daemon is not there, then exit. [ -x $daemon ] || exit 5 diff --git a/testutil/accumulator.go b/testutil/accumulator.go index 27129e105..56657e711 100644 --- a/testutil/accumulator.go +++ b/testutil/accumulator.go @@ -2,6 +2,7 @@ package testutil import ( "fmt" + "reflect" "time" ) @@ -17,6 +18,9 @@ type Accumulator struct { } func (a *Accumulator) Add(measurement string, value interface{}, tags map[string]string) { + if tags == nil { + tags = map[string]string{} + } a.Points = append( a.Points, &Point{ @@ -69,30 +73,23 @@ func (a *Accumulator) CheckTaggedValue(measurement string, val interface{}, tags } func (a *Accumulator) ValidateTaggedValue(measurement string, val interface{}, tags map[string]string) error { + if tags == nil { + tags = map[string]string{} + } for _, p := range a.Points { - var found bool - - if p.Tags == nil && tags == nil { - found = true - } else { - for k, v := range p.Tags { - if tags[k] == v { - found = true - break - } - } + if !reflect.DeepEqual(tags, p.Tags) { + continue } - if found && p.Measurement == measurement { + if p.Measurement == measurement { if p.Values["value"] != val { return fmt.Errorf("%v (%T) != %v (%T)", p.Values["value"], p.Values["value"], val, val) } - return nil } } - return fmt.Errorf("unknown value %s with tags %v", measurement, tags) + return fmt.Errorf("unknown measurement %s with tags %v", measurement, tags) } func (a *Accumulator) ValidateValue(measurement string, val interface{}) error {