diff --git a/README.md b/README.md index 711903f51..e55e59b5b 100644 --- a/README.md +++ b/README.md @@ -49,6 +49,8 @@ Telegraf currently has support for collecting metrics from: * MySQL * PostgreSQL * Redis +* RethinkDB +* MongoDB We'll be adding support for many more over the coming months. Read on if you want to add support for another service or third-party API. diff --git a/etc/config.sample.toml b/etc/config.sample.toml index e9628e3fa..38cfeba68 100644 --- a/etc/config.sample.toml +++ b/etc/config.sample.toml @@ -99,11 +99,11 @@ servers = ["localhost"] # postgres://[pqgotest[:password]]@localhost?sslmode=[disable|verify-ca|verify-full] # or a simple string: # host=localhost user=pqotest password=... sslmode=... -# +# # All connection parameters are optional. By default, the host is localhost # and the user is the currently running user. For localhost, we default # to sslmode=disable as well. -# +# address = "sslmode=disable" @@ -124,6 +124,14 @@ address = "sslmode=disable" # If no servers are specified, then localhost is used as the host. servers = ["localhost"] +[mongodb] +# An array of URIs to gather stats about. Specify an ip or hostname +# with optional port and password. ie mongodb://user:auth_key@10.10.3.30:27017, +# mongodb://10.10.3.33:18832, 10.0.0.1:10000, etc. +# +# If no servers are specified, then 127.0.0.1 is used as the host and 27017 as the port. 
+servers = ["127.0.0.1:27017"]
+
 # Read metrics about swap memory usage
 [swap]
 # no configuration
diff --git a/plugins/all/all.go b/plugins/all/all.go
index 466a7166d..17a48b74e 100644
--- a/plugins/all/all.go
+++ b/plugins/all/all.go
@@ -3,6 +3,7 @@ package all
 import (
 	_ "github.com/influxdb/telegraf/plugins/kafka_consumer"
 	_ "github.com/influxdb/telegraf/plugins/memcached"
+	_ "github.com/influxdb/telegraf/plugins/mongodb"
 	_ "github.com/influxdb/telegraf/plugins/mysql"
 	_ "github.com/influxdb/telegraf/plugins/postgresql"
 	_ "github.com/influxdb/telegraf/plugins/redis"
diff --git a/plugins/mongodb/mongodb.go b/plugins/mongodb/mongodb.go
new file mode 100644
index 000000000..1b7e5c3a1
--- /dev/null
+++ b/plugins/mongodb/mongodb.go
@@ -0,0 +1,147 @@
+package mongodb
+
+import (
+	"fmt"
+	"net/url"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/influxdb/telegraf/plugins"
+	"gopkg.in/mgo.v2"
+)
+
+// MongoDB is the plugin that polls one or more MongoDB servers for stats.
+type MongoDB struct {
+	// Servers lists the addresses to poll: mongodb:// URLs or bare
+	// "host[:port]" strings. When empty, 127.0.0.1:27017 is polled.
+	Servers []string
+	// mongos caches one Server per host so sessions survive between gathers.
+	mongos map[string]*Server
+}
+
+var sampleConfig = `
+# An array of URIs to gather stats about. Specify an ip or hostname
+# with optional port and password. ie mongodb://user:auth_key@10.10.3.30:27017,
+# mongodb://10.10.3.33:18832, 10.0.0.1:10000, etc.
+#
+# If no servers are specified, then 127.0.0.1 is used as the host and 27017 as the port.
+servers = ["127.0.0.1:27017"]`
+
+// SampleConfig returns the sample configuration for this plugin.
+func (m *MongoDB) SampleConfig() string {
+	return sampleConfig
+}
+
+// Description returns a one-line description of this plugin.
+func (*MongoDB) Description() string {
+	return "Read metrics from one or many MongoDB servers"
+}
+
+// localhost is the server polled when none is configured.
+var localhost = &url.URL{Host: "127.0.0.1:27017"}
+
+// parseServer converts one configured address into a URL. An address without
+// "://" is taken as a bare "host[:port]" and never handed to url.Parse, which
+// reads "localhost:27017" as scheme "localhost" with an empty host and may
+// reject "10.0.0.1:10000" outright.
+func parseServer(serv string) (*url.URL, error) {
+	if !strings.Contains(serv, "://") {
+		return &url.URL{Scheme: "mongodb", Host: serv}, nil
+	}
+	u, err := url.Parse(serv)
+	if err != nil {
+		return nil, fmt.Errorf("Unable to parse to address '%s': %s", serv, err)
+	}
+	return u, nil
+}
+
+// Gather reads stats from every configured server, or from 127.0.0.1:27017
+// when none is configured, and adds them to acc. Servers are polled
+// concurrently. It returns the first error recorded, if any.
+func (m *MongoDB) Gather(acc plugins.Accumulator) error {
+	if len(m.Servers) == 0 {
+		return m.gatherServer(m.getMongoServer(localhost), acc)
+	}
+
+	// Resolve every address before starting any goroutine, so that a bad
+	// address fails before any poller runs and the m.mongos map is only
+	// touched from this goroutine (Go maps are not safe for concurrent writes).
+	servers := make([]*Server, 0, len(m.Servers))
+	for _, serv := range m.Servers {
+		u, err := parseServer(serv)
+		if err != nil {
+			return err
+		}
+		servers = append(servers, m.getMongoServer(u))
+	}
+
+	var (
+		wg     sync.WaitGroup
+		mu     sync.Mutex // guards outerr
+		outerr error
+	)
+	for _, server := range servers {
+		wg.Add(1)
+		// Pass server as an argument so every goroutine polls its own server
+		// instead of sharing the loop variable.
+		go func(server *Server) {
+			defer wg.Done()
+			if err := m.gatherServer(server, acc); err != nil {
+				mu.Lock()
+				if outerr == nil {
+					outerr = err
+				}
+				mu.Unlock()
+			}
+		}(server)
+	}
+	wg.Wait()
+
+	return outerr
+}
+
+// getMongoServer returns the cached Server for url.Host, creating and caching
+// it on first use. It writes to m.mongos, so it must not run concurrently.
+func (m *MongoDB) getMongoServer(url *url.URL) *Server {
+	server, ok := m.mongos[url.Host]
+	if !ok {
+		server = &Server{Url: url}
+		m.mongos[url.Host] = server
+	}
+	return server
+}
+
+// gatherServer dials server if it has no session yet, then gathers its stats
+// into acc.
+func (m *MongoDB) gatherServer(server *Server, acc plugins.Accumulator) error {
+	if server.Session == nil {
+		// Dial the bare host unless the URL carries credentials, in which case
+		// the full URL is needed to keep them.
+		dialAddr := server.Url.Host
+		if server.Url.User != nil {
+			dialAddr = server.Url.String()
+		}
+		dialInfo, err := mgo.ParseURL(dialAddr)
+		if err != nil {
+			return fmt.Errorf("Unable to parse URL (%s), %s\n", dialAddr, err.Error())
+		}
+		dialInfo.Direct = true
+		dialInfo.Timeout = time.Duration(10) * time.Second
+		sess, err := mgo.DialWithInfo(dialInfo)
+		if err != nil {
+			return fmt.Errorf("Unable to connect to MongoDB, %s\n", err.Error())
+		}
+		server.Session = sess
+	}
+	return server.gatherData(acc)
+}
+
+// init registers the plugin under the name "mongodb".
+func init() {
+	plugins.Add("mongodb", func() plugins.Plugin {
+		return &MongoDB{
+			mongos: make(map[string]*Server),
+		}
+	})
+}
diff --git a/plugins/mongodb/mongodb_data.go b/plugins/mongodb/mongodb_data.go
new file mode 100644
index 000000000..ba6cc8d95
--- /dev/null
+++ b/plugins/mongodb/mongodb_data.go
@@ -0,0 +1,100 @@
+package mongodb
+
+import (
+	"fmt"
+	"reflect"
+	"strconv"
+
+	"github.com/influxdb/telegraf/plugins"
+)
+
+type 
MongodbData struct {
+	StatLine *StatLine
+	Tags     map[string]string
+}
+
+func NewMongodbData(statLine *StatLine, tags map[string]string) *MongodbData {
+	if statLine.NodeType != "" && statLine.NodeType != "UNK" {
+		tags["state"] = statLine.NodeType
+	}
+	return &MongodbData{
+		StatLine: statLine,
+		Tags:     tags,
+	}
+}
+
+var DefaultStats = map[string]string{
+	"inserts_per_sec":    "Insert",
+	"queries_per_sec":    "Query",
+	"updates_per_sec":    "Update",
+	"deletes_per_sec":    "Delete",
+	"getmores_per_sec":   "GetMore",
+	"commands_per_sec":   "Command",
+	"flushes_per_sec":    "Flushes",
+	"vsize_megabytes":    "Virtual",
+	"resident_megabytes": "Resident",
+	"queued_reads":       "QueuedReaders",
+	"queued_writes":      "QueuedWriters",
+	"active_reads":       "ActiveReaders",
+	"active_writes":      "ActiveWriters",
+	"net_in_bytes":       "NetIn",
+	"net_out_bytes":      "NetOut",
+	"open_connections":   "NumConnections",
+}
+
+var DefaultReplStats = map[string]string{
+	"repl_inserts_per_sec":  "InsertR",
+	"repl_queries_per_sec":  "QueryR",
+	"repl_updates_per_sec":  "UpdateR",
+	"repl_deletes_per_sec":  "DeleteR",
+	"repl_getmores_per_sec": "GetMoreR",
+	"repl_commands_per_sec": "CommandR",
+	"member_status":         "NodeType",
+}
+
+var MmapStats = map[string]string{
+	"mapped_megabytes":     "Mapped",
+	"non-mapped_megabytes": "NonMapped",
+	"page_faults_per_sec":  "Faults",
+}
+
+var WiredTigerStats = map[string]string{
+	"percent_cache_dirty": "CacheDirtyPercent",
+	"percent_cache_used":  "CacheUsedPercent",
+}
+
+func (d *MongodbData) AddDefaultStats(acc plugins.Accumulator) {
+	statLine := reflect.ValueOf(d.StatLine).Elem()
+	d.addStat(acc, statLine, DefaultStats)
+	if d.StatLine.NodeType != "" {
+		d.addStat(acc, statLine, DefaultReplStats)
+	}
+	if d.StatLine.StorageEngine == "mmapv1" {
+		d.addStat(acc, statLine, MmapStats)
+	} else if d.StatLine.StorageEngine == "wiredTiger" {
+		for key, value := range WiredTigerStats {
+			val := statLine.FieldByName(value).Interface()
+			percentVal := fmt.Sprintf("%.1f", val.(float64)*100)
+			floatVal, _ := strconv.ParseFloat(percentVal, 64)
+			d.add(acc, key, floatVal)
+		}
+	}
+}
+
+func (d *MongodbData) addStat(acc plugins.Accumulator, statLine reflect.Value, stats map[string]string) {
+	for key, value := range stats {
+		val := statLine.FieldByName(value).Interface()
+		d.add(acc, key, val)
+	}
+}
+
+func (d *MongodbData) add(acc plugins.Accumulator, key string, val interface{}) {
+	acc.AddValuesWithTime(
+		key,
+		map[string]interface{}{
+			"value": val,
+		},
+		d.Tags,
+		d.StatLine.Time,
+	)
+}
diff --git a/plugins/mongodb/mongodb_data_test.go b/plugins/mongodb/mongodb_data_test.go
new file mode 100644
index 000000000..9ee3f9f48
--- /dev/null
+++ b/plugins/mongodb/mongodb_data_test.go
@@ -0,0 +1,110 @@
+package mongodb
+
+import (
+	"testing"
+	"time"
+
+	"github.com/influxdb/telegraf/testutil"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestAddNonReplStats(t *testing.T) {
+	d := NewMongodbData(
+		&StatLine{
+			StorageEngine:  "",
+			Time:           time.Now(),
+			Insert:         0,
+			Query:          0,
+			Update:         0,
+			Delete:         0,
+			GetMore:        0,
+			Command:        0,
+			Flushes:        0,
+			Virtual:        0,
+			Resident:       0,
+			QueuedReaders:  0,
+			QueuedWriters:  0,
+			ActiveReaders:  0,
+			ActiveWriters:  0,
+			NetIn:          0,
+			NetOut:         0,
+			NumConnections: 0,
+		},
+		// A fresh map per test: NewMongodbData may add a "state" tag to it.
+		make(map[string]string),
+	)
+	var acc testutil.Accumulator
+
+	d.AddDefaultStats(&acc)
+
+	for key := range DefaultStats {
+		assert.True(t, acc.HasIntValue(key))
+	}
+}
+
+func TestAddMmapStats(t *testing.T) {
+	d := NewMongodbData(
+		&StatLine{
+			StorageEngine: "mmapv1",
+			Mapped:        0,
+			NonMapped:     0,
+			Faults:        0,
+		},
+		make(map[string]string),
+	)
+
+	var acc testutil.Accumulator
+
+	d.AddDefaultStats(&acc)
+
+	for key := range MmapStats {
+		assert.True(t, acc.HasIntValue(key))
+	}
+}
+
+func TestAddWiredTigerStats(t *testing.T) {
+	d := NewMongodbData(
+		&StatLine{
+			StorageEngine:     "wiredTiger",
+			CacheDirtyPercent: 0,
+			CacheUsedPercent:  0,
+		},
+		make(map[string]string),
+	)
+
+	var acc testutil.Accumulator
+
+	d.AddDefaultStats(&acc)
+
+	for key := range WiredTigerStats {
+		assert.True(t, acc.HasFloatValue(key))
+	}
+}
+
+func TestStateTag(t *testing.T) {
+	d := NewMongodbData(
+		&StatLine{
+			StorageEngine: "",
+			Time:          time.Now(),
+			Insert:        0,
+			Query:         0,
+			NodeType:      "PRI",
+		},
+		make(map[string]string),
+	)
+
+	stats := []string{"inserts_per_sec", "queries_per_sec"}
+
+	stateTags := make(map[string]string)
+	stateTags["state"] = "PRI"
+
+	var acc testutil.Accumulator
+
+	d.AddDefaultStats(&acc)
+
+	for _, key := range stats {
+		err := acc.ValidateTaggedValue(key, int64(0), stateTags)
+		require.NoError(t, err)
+	}
+}
diff --git a/plugins/mongodb/mongodb_server.go b/plugins/mongodb/mongodb_server.go
new file mode 100644
index 000000000..f5283105d
--- /dev/null
+++ b/plugins/mongodb/mongodb_server.go
@@ -0,0 +1,60 @@
+package mongodb
+
+import (
+	"net/url"
+	"time"
+
+	"github.com/influxdb/telegraf/plugins"
+	"gopkg.in/mgo.v2"
+	"gopkg.in/mgo.v2/bson"
+)
+
+// Server is one monitored MongoDB instance: its address, its session and the
+// previous serverStatus sample.
+type Server struct {
+	Url        *url.URL
+	Session    *mgo.Session
+	lastResult *ServerStatus
+}
+
+// getDefaultTags returns a new tag map holding this server's host.
+func (s *Server) getDefaultTags() map[string]string {
+	tags := make(map[string]string)
+	tags["host"] = s.Url.Host
+	return tags
+}
+
+// gatherData runs serverStatus and, when a previous sample exists, adds the
+// stats derived from both samples to acc. The first call only stores a
+// baseline sample.
+func (s *Server) gatherData(acc plugins.Accumulator) error {
+	s.Session.SetMode(mgo.Eventual, true)
+	s.Session.SetSocketTimeout(0)
+	result := &ServerStatus{}
+	err := s.Session.DB("admin").Run(bson.D{{"serverStatus", 1}, {"recordStats", 0}}, result)
+	if err != nil {
+		return err
+	}
+	// From here on, this sample becomes the baseline for the next call.
+	defer func() {
+		s.lastResult = result
+	}()
+
+	result.SampleTime = time.Now()
+	// result was allocated above and is never nil, so only the previous
+	// sample needs checking.
+	if s.lastResult != nil {
+		duration := result.SampleTime.Sub(s.lastResult.SampleTime)
+		durationInSeconds := int64(duration.Seconds())
+		if durationInSeconds == 0 {
+			// NewStatLine divides by this value; never pass zero.
+			durationInSeconds = 1
+		}
+		data := NewMongodbData(
+			NewStatLine(*s.lastResult, *result, s.Url.Host, true, durationInSeconds),
+			s.getDefaultTags(),
+		)
+		data.AddDefaultStats(acc)
+	}
+	return nil
+}
diff --git 
a/plugins/mongodb/mongodb_server_test.go b/plugins/mongodb/mongodb_server_test.go
new file mode 100644
index 000000000..07058aa82
--- /dev/null
+++ b/plugins/mongodb/mongodb_server_test.go
@@ -0,0 +1,43 @@
+// +build integration
+
+package mongodb
+
+import (
+	"testing"
+	"time"
+
+	"github.com/influxdb/telegraf/testutil"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestGetDefaultTags(t *testing.T) {
+	var tagTests = []struct {
+		in  string
+		out string
+	}{
+		{"host", server.Url.Host},
+	}
+	defaultTags := server.getDefaultTags()
+	for _, tt := range tagTests {
+		if defaultTags[tt.in] != tt.out {
+			t.Errorf("expected %q, got %q", tt.out, defaultTags[tt.in])
+		}
+	}
+}
+
+func TestAddDefaultStats(t *testing.T) {
+	var acc testutil.Accumulator
+
+	err := server.gatherData(&acc)
+	require.NoError(t, err)
+
+	time.Sleep(time.Duration(1) * time.Second)
+	// need to call this twice so it can perform the diff
+	err = server.gatherData(&acc)
+	require.NoError(t, err)
+
+	for key := range DefaultStats {
+		assert.True(t, acc.HasIntValue(key))
+	}
+}
diff --git a/plugins/mongodb/mongodb_test.go b/plugins/mongodb/mongodb_test.go
new file mode 100644
index 000000000..174128d19
--- /dev/null
+++ b/plugins/mongodb/mongodb_test.go
@@ -0,0 +1,71 @@
+// +build integration
+
+package mongodb
+
+import (
+	"log"
+	"math/rand"
+	"net/url"
+	"os"
+	"testing"
+	"time"
+
+	"gopkg.in/mgo.v2"
+)
+
+var connect_url string
+var server *Server
+
+// init builds the Server under test from MONGODB_URL, falling back to
+// 127.0.0.1:27017 when the variable is unset.
+func init() {
+	connect_url = os.Getenv("MONGODB_URL")
+	if connect_url == "" {
+		connect_url = "127.0.0.1:27017"
+		server = &Server{Url: &url.URL{Host: connect_url}}
+	} else {
+		full_url, err := url.Parse(connect_url)
+		if err != nil {
+			// full_url is nil on error, so report the string being parsed.
+			log.Fatalf("Unable to parse URL (%s), %s\n", connect_url, err.Error())
+		}
+		server = &Server{Url: full_url}
+	}
+}
+
+// testSetup dials the server under test once and stores the session on it.
+func testSetup(m *testing.M) {
+	// Dial the bare host unless the URL carries credentials.
+	dialAddr := server.Url.Host
+	if server.Url.User != nil {
+		dialAddr = server.Url.String()
+	}
+	dialInfo, err := mgo.ParseURL(dialAddr)
+	if err != nil {
+		log.Fatalf("Unable to parse URL (%s), %s\n", dialAddr, err.Error())
+	}
+	dialInfo.Direct = true
+	dialInfo.Timeout = time.Duration(10) * time.Second
+	sess, err := mgo.DialWithInfo(dialInfo)
+	if err != nil {
+		log.Fatalf("Unable to connect to MongoDB, %s\n", err.Error())
+	}
+	server.Session = sess
+}
+
+// testTeardown closes the session opened by testSetup.
+func testTeardown(m *testing.M) {
+	server.Session.Close()
+}
+
+// TestMain runs the tests between session setup and teardown.
+func TestMain(m *testing.M) {
+	// seed randomness for use with tests
+	rand.Seed(time.Now().UTC().UnixNano())
+
+	testSetup(m)
+	res := m.Run()
+	testTeardown(m)
+
+	os.Exit(res)
+}
diff --git a/plugins/mongodb/mongostat.go b/plugins/mongodb/mongostat.go
new file mode 100644
index 000000000..ae39a3e3a
--- /dev/null
+++ b/plugins/mongodb/mongostat.go
@@ -0,0 +1,549 @@
+/***
+The code contained here came from https://github.com/mongodb/mongo-tools/blob/master/mongostat/stat_types.go
+and contains modifications so that no other dependency from that project is needed. Other modifications included
+removing unnecessary code specific to formatting the output and determining the current state of the database. It
+is licensed under Apache Version 2.0, http://www.apache.org/licenses/LICENSE-2.0.html
+***/
+
+package mongodb
+
+import (
+	"sort"
+	"strings"
+	"time"
+)
+
+const (
+	MongosProcess = "mongos"
+)
+
+// Flags to determine cases when to activate/deactivate columns for output.
+const ( + Always = 1 << iota // always activate the column + Discover // only active when mongostat is in discover mode + Repl // only active if one of the nodes being monitored is in a replset + Locks // only active if node is capable of calculating lock info + AllOnly // only active if mongostat was run with --all option + MMAPOnly // only active if node has mmap-specific fields + WTOnly // only active if node has wiredtiger-specific fields +) + +type ServerStatus struct { + SampleTime time.Time `bson:""` + Host string `bson:"host"` + Version string `bson:"version"` + Process string `bson:"process"` + Pid int64 `bson:"pid"` + Uptime int64 `bson:"uptime"` + UptimeMillis int64 `bson:"uptimeMillis"` + UptimeEstimate int64 `bson:"uptimeEstimate"` + LocalTime time.Time `bson:"localTime"` + Asserts map[string]int64 `bson:"asserts"` + BackgroundFlushing *FlushStats `bson:"backgroundFlushing"` + ExtraInfo *ExtraInfo `bson:"extra_info"` + Connections *ConnectionStats `bson:"connections"` + Dur *DurStats `bson:"dur"` + GlobalLock *GlobalLockStats `bson:"globalLock"` + Locks map[string]LockStats `bson:"locks,omitempty"` + Network *NetworkStats `bson:"network"` + Opcounters *OpcountStats `bson:"opcounters"` + OpcountersRepl *OpcountStats `bson:"opcountersRepl"` + RecordStats *DBRecordStats `bson:"recordStats"` + Mem *MemStats `bson:"mem"` + Repl *ReplStatus `bson:"repl"` + ShardCursorType map[string]interface{} `bson:"shardCursorType"` + StorageEngine map[string]string `bson:"storageEngine"` + WiredTiger *WiredTiger `bson:"wiredTiger"` +} + +// WiredTiger stores information related to the WiredTiger storage engine. 
+type WiredTiger struct { + Transaction TransactionStats `bson:"transaction"` + Concurrent ConcurrentTransactions `bson:"concurrentTransactions"` + Cache CacheStats `bson:"cache"` +} + +type ConcurrentTransactions struct { + Write ConcurrentTransStats `bson:"write"` + Read ConcurrentTransStats `bson:"read"` +} + +type ConcurrentTransStats struct { + Out int64 `bson:"out"` +} + +// CacheStats stores cache statistics for WiredTiger. +type CacheStats struct { + TrackedDirtyBytes int64 `bson:"tracked dirty bytes in the cache"` + CurrentCachedBytes int64 `bson:"bytes currently in the cache"` + MaxBytesConfigured int64 `bson:"maximum bytes configured"` +} + +// TransactionStats stores transaction checkpoints in WiredTiger. +type TransactionStats struct { + TransCheckpoints int64 `bson:"transaction checkpoints"` +} + +// ReplStatus stores data related to replica sets. +type ReplStatus struct { + SetName interface{} `bson:"setName"` + IsMaster interface{} `bson:"ismaster"` + Secondary interface{} `bson:"secondary"` + IsReplicaSet interface{} `bson:"isreplicaset"` + ArbiterOnly interface{} `bson:"arbiterOnly"` + Hosts []string `bson:"hosts"` + Passives []string `bson:"passives"` + Me string `bson:"me"` +} + +// DBRecordStats stores data related to memory operations across databases. +type DBRecordStats struct { + AccessesNotInMemory int64 `bson:"accessesNotInMemory"` + PageFaultExceptionsThrown int64 `bson:"pageFaultExceptionsThrown"` + DBRecordAccesses map[string]RecordAccesses `bson:",inline"` +} + +// RecordAccesses stores data related to memory operations scoped to a database. +type RecordAccesses struct { + AccessesNotInMemory int64 `bson:"accessesNotInMemory"` + PageFaultExceptionsThrown int64 `bson:"pageFaultExceptionsThrown"` +} + +// MemStats stores data related to memory statistics. 
+type MemStats struct { + Bits int64 `bson:"bits"` + Resident int64 `bson:"resident"` + Virtual int64 `bson:"virtual"` + Supported interface{} `bson:"supported"` + Mapped int64 `bson:"mapped"` + MappedWithJournal int64 `bson:"mappedWithJournal"` +} + +// FlushStats stores information about memory flushes. +type FlushStats struct { + Flushes int64 `bson:"flushes"` + TotalMs int64 `bson:"total_ms"` + AverageMs float64 `bson:"average_ms"` + LastMs int64 `bson:"last_ms"` + LastFinished time.Time `bson:"last_finished"` +} + +// ConnectionStats stores information related to incoming database connections. +type ConnectionStats struct { + Current int64 `bson:"current"` + Available int64 `bson:"available"` + TotalCreated int64 `bson:"totalCreated"` +} + +// DurTiming stores information related to journaling. +type DurTiming struct { + Dt int64 `bson:"dt"` + PrepLogBuffer int64 `bson:"prepLogBuffer"` + WriteToJournal int64 `bson:"writeToJournal"` + WriteToDataFiles int64 `bson:"writeToDataFiles"` + RemapPrivateView int64 `bson:"remapPrivateView"` +} + +// DurStats stores information related to journaling statistics. +type DurStats struct { + Commits int64 `bson:"commits"` + JournaledMB int64 `bson:"journaledMB"` + WriteToDataFilesMB int64 `bson:"writeToDataFilesMB"` + Compression int64 `bson:"compression"` + CommitsInWriteLock int64 `bson:"commitsInWriteLock"` + EarlyCommits int64 `bson:"earlyCommits"` + TimeMs DurTiming +} + +// QueueStats stores the number of queued read/write operations. +type QueueStats struct { + Total int64 `bson:"total"` + Readers int64 `bson:"readers"` + Writers int64 `bson:"writers"` +} + +// ClientStats stores the number of active read/write operations. +type ClientStats struct { + Total int64 `bson:"total"` + Readers int64 `bson:"readers"` + Writers int64 `bson:"writers"` +} + +// GlobalLockStats stores information related locks in the MMAP storage engine. 
+type GlobalLockStats struct { + TotalTime int64 `bson:"totalTime"` + LockTime int64 `bson:"lockTime"` + CurrentQueue *QueueStats `bson:"currentQueue"` + ActiveClients *ClientStats `bson:"activeClients"` +} + +// NetworkStats stores information related to network traffic. +type NetworkStats struct { + BytesIn int64 `bson:"bytesIn"` + BytesOut int64 `bson:"bytesOut"` + NumRequests int64 `bson:"numRequests"` +} + +// OpcountStats stores information related to comamnds and basic CRUD operations. +type OpcountStats struct { + Insert int64 `bson:"insert"` + Query int64 `bson:"query"` + Update int64 `bson:"update"` + Delete int64 `bson:"delete"` + GetMore int64 `bson:"getmore"` + Command int64 `bson:"command"` +} + +// ReadWriteLockTimes stores time spent holding read/write locks. +type ReadWriteLockTimes struct { + Read int64 `bson:"R"` + Write int64 `bson:"W"` + ReadLower int64 `bson:"r"` + WriteLower int64 `bson:"w"` +} + +// LockStats stores information related to time spent acquiring/holding locks +// for a given database. +type LockStats struct { + TimeLockedMicros ReadWriteLockTimes `bson:"timeLockedMicros"` + TimeAcquiringMicros ReadWriteLockTimes `bson:"timeAcquiringMicros"` + + // AcquireCount is a new field of the lock stats only populated on 3.0 or newer. + // Typed as a pointer so that if it is nil, mongostat can assume the field is not populated + // with real namespace data. + AcquireCount *ReadWriteLockTimes `bson:"acquireCount,omitempty"` +} + +// ExtraInfo stores additional platform specific information. +type ExtraInfo struct { + PageFaults *int64 `bson:"page_faults"` +} + +// StatHeader describes a single column for mongostat's terminal output, +// its formatting, and in which modes it should be displayed. 
+type StatHeader struct { + // The text to appear in the column's header cell + HeaderText string + + // Bitmask containing flags to determine if this header is active or not + ActivateFlags int +} + +// StatHeaders are the complete set of data metrics supported by mongostat. +var StatHeaders = []StatHeader{ + {"", Always}, // placeholder for hostname column (blank header text) + {"insert", Always}, + {"query", Always}, + {"update", Always}, + {"delete", Always}, + {"getmore", Always}, + {"command", Always}, + {"% dirty", WTOnly}, + {"% used", WTOnly}, + {"flushes", Always}, + {"mapped", MMAPOnly}, + {"vsize", Always}, + {"res", Always}, + {"non-mapped", MMAPOnly | AllOnly}, + {"faults", MMAPOnly}, + {" locked db", Locks}, + {"qr|qw", Always}, + {"ar|aw", Always}, + {"netIn", Always}, + {"netOut", Always}, + {"conn", Always}, + {"set", Repl}, + {"repl", Repl}, + {"time", Always}, +} + +// NamespacedLocks stores information on the LockStatus of namespaces. +type NamespacedLocks map[string]LockStatus + +// LockUsage stores information related to a namespace's lock usage. +type LockUsage struct { + Namespace string + Reads int64 + Writes int64 +} + +type lockUsages []LockUsage + +func percentageInt64(value, outOf int64) float64 { + if value == 0 || outOf == 0 { + return 0 + } + return 100 * (float64(value) / float64(outOf)) +} + +func (slice lockUsages) Len() int { + return len(slice) +} + +func (slice lockUsages) Less(i, j int) bool { + return slice[i].Reads+slice[i].Writes < slice[j].Reads+slice[j].Writes +} + +func (slice lockUsages) Swap(i, j int) { + slice[i], slice[j] = slice[j], slice[i] +} + +// LockStatus stores a database's lock statistics. +type LockStatus struct { + DBName string + Percentage float64 + Global bool +} + +// StatLine is a wrapper for all metrics reported by mongostat for monitored hosts. 
+type StatLine struct { + Key string + // What storage engine is being used for the node with this stat line + StorageEngine string + + Error error + IsMongos bool + Host string + + // The time at which this StatLine was generated. + Time time.Time + + // The last time at which this StatLine was printed to output. + LastPrinted time.Time + + // Opcounter fields + Insert, Query, Update, Delete, GetMore, Command int64 + + // Cache utilization (wiredtiger only) + CacheDirtyPercent float64 + CacheUsedPercent float64 + + // Replicated Opcounter fields + InsertR, QueryR, UpdateR, DeleteR, GetMoreR, CommandR int64 + Flushes int64 + Mapped, Virtual, Resident, NonMapped int64 + Faults int64 + HighestLocked *LockStatus + QueuedReaders, QueuedWriters int64 + ActiveReaders, ActiveWriters int64 + NetIn, NetOut int64 + NumConnections int64 + ReplSetName string + NodeType string +} + +func parseLocks(stat ServerStatus) map[string]LockUsage { + returnVal := map[string]LockUsage{} + for namespace, lockInfo := range stat.Locks { + returnVal[namespace] = LockUsage{ + namespace, + lockInfo.TimeLockedMicros.Read + lockInfo.TimeLockedMicros.ReadLower, + lockInfo.TimeLockedMicros.Write + lockInfo.TimeLockedMicros.WriteLower, + } + } + return returnVal +} + +func computeLockDiffs(prevLocks, curLocks map[string]LockUsage) []LockUsage { + lockUsages := lockUsages(make([]LockUsage, 0, len(curLocks))) + for namespace, curUsage := range curLocks { + prevUsage, hasKey := prevLocks[namespace] + if !hasKey { + // This namespace didn't appear in the previous batch of lock info, + // so we can't compute a diff for it - skip it. 
+ continue + } + // Calculate diff of lock usage for this namespace and add to the list + lockUsages = append(lockUsages, + LockUsage{ + namespace, + curUsage.Reads - prevUsage.Reads, + curUsage.Writes - prevUsage.Writes, + }) + } + // Sort the array in order of least to most locked + sort.Sort(lockUsages) + return lockUsages +} + +func diff(newVal, oldVal, sampleTime int64) int64 { + return (newVal - oldVal) / sampleTime +} + +// NewStatLine constructs a StatLine object from two ServerStatus objects. +func NewStatLine(oldStat, newStat ServerStatus, key string, all bool, sampleSecs int64) *StatLine { + returnVal := &StatLine{ + Key: key, + Host: newStat.Host, + Mapped: -1, + Virtual: -1, + Resident: -1, + NonMapped: -1, + Faults: -1, + } + + // set the storage engine appropriately + if newStat.StorageEngine != nil && newStat.StorageEngine["name"] != "" { + returnVal.StorageEngine = newStat.StorageEngine["name"] + } else { + returnVal.StorageEngine = "mmapv1" + } + + if newStat.Opcounters != nil && oldStat.Opcounters != nil { + returnVal.Insert = diff(newStat.Opcounters.Insert, oldStat.Opcounters.Insert, sampleSecs) + returnVal.Query = diff(newStat.Opcounters.Query, oldStat.Opcounters.Query, sampleSecs) + returnVal.Update = diff(newStat.Opcounters.Update, oldStat.Opcounters.Update, sampleSecs) + returnVal.Delete = diff(newStat.Opcounters.Delete, oldStat.Opcounters.Delete, sampleSecs) + returnVal.GetMore = diff(newStat.Opcounters.GetMore, oldStat.Opcounters.GetMore, sampleSecs) + returnVal.Command = diff(newStat.Opcounters.Command, oldStat.Opcounters.Command, sampleSecs) + } + + if newStat.OpcountersRepl != nil && oldStat.OpcountersRepl != nil { + returnVal.InsertR = diff(newStat.OpcountersRepl.Insert, oldStat.OpcountersRepl.Insert, sampleSecs) + returnVal.QueryR = diff(newStat.OpcountersRepl.Query, oldStat.OpcountersRepl.Query, sampleSecs) + returnVal.UpdateR = diff(newStat.OpcountersRepl.Update, oldStat.OpcountersRepl.Update, sampleSecs) + returnVal.DeleteR = 
diff(newStat.OpcountersRepl.Delete, oldStat.OpcountersRepl.Delete, sampleSecs) + returnVal.GetMoreR = diff(newStat.OpcountersRepl.GetMore, oldStat.OpcountersRepl.GetMore, sampleSecs) + returnVal.CommandR = diff(newStat.OpcountersRepl.Command, oldStat.OpcountersRepl.Command, sampleSecs) + } + + returnVal.CacheDirtyPercent = -1 + returnVal.CacheUsedPercent = -1 + if newStat.WiredTiger != nil && oldStat.WiredTiger != nil { + returnVal.Flushes = newStat.WiredTiger.Transaction.TransCheckpoints - oldStat.WiredTiger.Transaction.TransCheckpoints + returnVal.CacheDirtyPercent = float64(newStat.WiredTiger.Cache.TrackedDirtyBytes) / float64(newStat.WiredTiger.Cache.MaxBytesConfigured) + returnVal.CacheUsedPercent = float64(newStat.WiredTiger.Cache.CurrentCachedBytes) / float64(newStat.WiredTiger.Cache.MaxBytesConfigured) + } else if newStat.BackgroundFlushing != nil && oldStat.BackgroundFlushing != nil { + returnVal.Flushes = newStat.BackgroundFlushing.Flushes - oldStat.BackgroundFlushing.Flushes + } + + returnVal.Time = newStat.SampleTime + returnVal.IsMongos = + (newStat.ShardCursorType != nil || strings.HasPrefix(newStat.Process, MongosProcess)) + + // BEGIN code modification + if oldStat.Mem.Supported.(bool) { + // END code modification + if !returnVal.IsMongos { + returnVal.Mapped = newStat.Mem.Mapped + } + returnVal.Virtual = newStat.Mem.Virtual + returnVal.Resident = newStat.Mem.Resident + + if !returnVal.IsMongos && all { + returnVal.NonMapped = newStat.Mem.Virtual - newStat.Mem.Mapped + } + } + + if newStat.Repl != nil { + setName, isReplSet := newStat.Repl.SetName.(string) + if isReplSet { + returnVal.ReplSetName = setName + } + // BEGIN code modification + if newStat.Repl.IsMaster.(bool) { + returnVal.NodeType = "PRI" + } else if newStat.Repl.Secondary.(bool) { + returnVal.NodeType = "SEC" + } else { + returnVal.NodeType = "UNK" + } + // END code modification + } else if returnVal.IsMongos { + returnVal.NodeType = "RTR" + } + + if oldStat.ExtraInfo != nil && 
newStat.ExtraInfo != nil && + oldStat.ExtraInfo.PageFaults != nil && newStat.ExtraInfo.PageFaults != nil { + returnVal.Faults = diff(*(newStat.ExtraInfo.PageFaults), *(oldStat.ExtraInfo.PageFaults), sampleSecs) + } + if !returnVal.IsMongos && oldStat.Locks != nil && oldStat.Locks != nil { + globalCheck, hasGlobal := oldStat.Locks["Global"] + if hasGlobal && globalCheck.AcquireCount != nil { + // This appears to be a 3.0+ server so the data in these fields do *not* refer to + // actual namespaces and thus we can't compute lock %. + returnVal.HighestLocked = nil + } else { + prevLocks := parseLocks(oldStat) + curLocks := parseLocks(newStat) + lockdiffs := computeLockDiffs(prevLocks, curLocks) + if len(lockdiffs) == 0 { + if newStat.GlobalLock != nil { + returnVal.HighestLocked = &LockStatus{ + DBName: "", + Percentage: percentageInt64(newStat.GlobalLock.LockTime, newStat.GlobalLock.TotalTime), + Global: true, + } + } + } else { + // Get the entry with the highest lock + highestLocked := lockdiffs[len(lockdiffs)-1] + + var timeDiffMillis int64 + timeDiffMillis = newStat.UptimeMillis - oldStat.UptimeMillis + + lockToReport := highestLocked.Writes + + // if the highest locked namespace is not '.' + if highestLocked.Namespace != "." { + for _, namespaceLockInfo := range lockdiffs { + if namespaceLockInfo.Namespace == "." 
{ + lockToReport += namespaceLockInfo.Writes + } + } + } + + // lock data is in microseconds and uptime is in milliseconds - so + // divide by 1000 so that they units match + lockToReport /= 1000 + + returnVal.HighestLocked = &LockStatus{ + DBName: highestLocked.Namespace, + Percentage: percentageInt64(lockToReport, timeDiffMillis), + Global: false, + } + } + } + } else { + returnVal.HighestLocked = nil + } + + if newStat.GlobalLock != nil { + hasWT := (newStat.WiredTiger != nil && oldStat.WiredTiger != nil) + //If we have wiredtiger stats, use those instead + if newStat.GlobalLock.CurrentQueue != nil { + if hasWT { + returnVal.QueuedReaders = newStat.GlobalLock.CurrentQueue.Readers + newStat.GlobalLock.ActiveClients.Readers - newStat.WiredTiger.Concurrent.Read.Out + returnVal.QueuedWriters = newStat.GlobalLock.CurrentQueue.Writers + newStat.GlobalLock.ActiveClients.Writers - newStat.WiredTiger.Concurrent.Write.Out + if returnVal.QueuedReaders < 0 { + returnVal.QueuedReaders = 0 + } + if returnVal.QueuedWriters < 0 { + returnVal.QueuedWriters = 0 + } + } else { + returnVal.QueuedReaders = newStat.GlobalLock.CurrentQueue.Readers + returnVal.QueuedWriters = newStat.GlobalLock.CurrentQueue.Writers + } + } + + if hasWT { + returnVal.ActiveReaders = newStat.WiredTiger.Concurrent.Read.Out + returnVal.ActiveWriters = newStat.WiredTiger.Concurrent.Write.Out + } else if newStat.GlobalLock.ActiveClients != nil { + returnVal.ActiveReaders = newStat.GlobalLock.ActiveClients.Readers + returnVal.ActiveWriters = newStat.GlobalLock.ActiveClients.Writers + } + } + + if oldStat.Network != nil && newStat.Network != nil { + returnVal.NetIn = diff(newStat.Network.BytesIn, oldStat.Network.BytesIn, sampleSecs) + returnVal.NetOut = diff(newStat.Network.BytesOut, oldStat.Network.BytesOut, sampleSecs) + } + + if newStat.Connections != nil { + returnVal.NumConnections = newStat.Connections.Current + } + + return returnVal +} diff --git a/testutil/accumulator.go b/testutil/accumulator.go 
index 645366fd5..27129e105 100644 --- a/testutil/accumulator.go +++ b/testutil/accumulator.go @@ -7,7 +7,6 @@ import ( type Point struct { Measurement string - Value interface{} Tags map[string]string Values map[string]interface{} Time time.Time @@ -22,7 +21,7 @@ func (a *Accumulator) Add(measurement string, value interface{}, tags map[string a.Points, &Point{ Measurement: measurement, - Value: value, + Values: map[string]interface{}{"value": value}, Tags: tags, }, ) @@ -58,7 +57,7 @@ func (a *Accumulator) Get(measurement string) (*Point, bool) { func (a *Accumulator) CheckValue(measurement string, val interface{}) bool { for _, p := range a.Points { if p.Measurement == measurement { - return p.Value == val + return p.Values["value"] == val } } @@ -85,8 +84,8 @@ func (a *Accumulator) ValidateTaggedValue(measurement string, val interface{}, t } if found && p.Measurement == measurement { - if p.Value != val { - return fmt.Errorf("%v (%T) != %v (%T)", p.Value, p.Value, val, val) + if p.Values["value"] != val { + return fmt.Errorf("%v (%T) != %v (%T)", p.Values["value"], p.Values["value"], val, val) } return nil @@ -103,7 +102,7 @@ func (a *Accumulator) ValidateValue(measurement string, val interface{}) error { func (a *Accumulator) HasIntValue(measurement string) bool { for _, p := range a.Points { if p.Measurement == measurement { - _, ok := p.Value.(int64) + _, ok := p.Values["value"].(int64) return ok } } @@ -114,7 +113,7 @@ func (a *Accumulator) HasIntValue(measurement string) bool { func (a *Accumulator) HasFloatValue(measurement string) bool { for _, p := range a.Points { if p.Measurement == measurement { - _, ok := p.Value.(float64) + _, ok := p.Values["value"].(float64) return ok } }