add MongoDB plugin

This commit is contained in:
JP 2015-07-06 20:20:11 -05:00
parent e9ad786578
commit 86145d5eb5
11 changed files with 1053 additions and 9 deletions

View File

@ -49,6 +49,8 @@ Telegraf currently has support for collecting metrics from:
* MySQL
* PostgreSQL
* Redis
* RethinkDB
* MongoDB
We'll be adding support for many more over the coming months. Read on if you want to add support for another service or third-party API.

View File

@ -124,6 +124,14 @@ address = "sslmode=disable"
# If no servers are specified, then localhost is used as the host.
servers = ["localhost"]
[mongodb]
# An array of URI to gather stats about. Specify an ip or hostname
# with optional port add password. ie mongodb://user:auth_key@10.10.3.30:27017,
# mongodb://10.10.3.33:18832, 10.0.0.1:10000, etc.
#
# If no servers are specified, then 127.0.0.1 is used as the host and 27107 as the port.
servers = ["127.0.0.1:27017"]
# Read metrics about swap memory usage
[swap]
# no configuration

View File

@ -3,6 +3,7 @@ package all
import (
_ "github.com/influxdb/telegraf/plugins/kafka_consumer"
_ "github.com/influxdb/telegraf/plugins/memcached"
_ "github.com/influxdb/telegraf/plugins/mongodb"
_ "github.com/influxdb/telegraf/plugins/mysql"
_ "github.com/influxdb/telegraf/plugins/postgresql"
_ "github.com/influxdb/telegraf/plugins/redis"

110
plugins/mongodb/mongodb.go Normal file
View File

@ -0,0 +1,110 @@
package mongodb
import (
"fmt"
"net/url"
"sync"
"time"
"github.com/influxdb/telegraf/plugins"
"gopkg.in/mgo.v2"
)
type MongoDB struct {
Servers []string
mongos map[string]*Server
}
var sampleConfig = `
# An array of URI to gather stats about. Specify an ip or hostname
# with optional port add password. ie mongodb://user:auth_key@10.10.3.30:27017,
# mongodb://10.10.3.33:18832, 10.0.0.1:10000, etc.
#
# If no servers are specified, then 127.0.0.1 is used as the host and 27107 as the port.
servers = ["127.0.0.1:27017"]`
func (m *MongoDB) SampleConfig() string {
return sampleConfig
}
func (*MongoDB) Description() string {
return "Read metrics from one or many MongoDB servers"
}
var localhost = &url.URL{Host: "127.0.0.1:27017"}
// Reads stats from all configured servers accumulates stats.
// Returns one of the errors encountered while gather stats (if any).
func (m *MongoDB) Gather(acc plugins.Accumulator) error {
if len(m.Servers) == 0 {
m.gatherServer(m.getMongoServer(localhost), acc)
return nil
}
var wg sync.WaitGroup
var outerr error
for _, serv := range m.Servers {
u, err := url.Parse(serv)
if err != nil {
return fmt.Errorf("Unable to parse to address '%s': %s", serv, err)
} else if u.Scheme == "" {
u.Scheme = "mongodb"
// fallback to simple string based address (i.e. "10.0.0.1:10000")
u.Host = serv
if u.Path == u.Host {
u.Path = ""
}
}
wg.Add(1)
go func() {
defer wg.Done()
outerr = m.gatherServer(m.getMongoServer(u), acc)
}()
}
wg.Wait()
return outerr
}
func (m *MongoDB) getMongoServer(url *url.URL) *Server {
if _, ok := m.mongos[url.Host]; !ok {
m.mongos[url.Host] = &Server{
Url: url,
}
}
return m.mongos[url.Host]
}
func (m *MongoDB) gatherServer(server *Server, acc plugins.Accumulator) error {
if server.Session == nil {
var dialAddrs []string
if server.Url.User != nil {
dialAddrs = []string{server.Url.String()}
} else {
dialAddrs = []string{server.Url.Host}
}
dialInfo, err := mgo.ParseURL(dialAddrs[0])
if err != nil {
return fmt.Errorf("Unable to parse URL (%s), %s\n", dialAddrs[0], err.Error())
}
dialInfo.Direct = true
dialInfo.Timeout = time.Duration(10) * time.Second
sess, err := mgo.DialWithInfo(dialInfo)
if err != nil {
return fmt.Errorf("Unable to connect to MongoDB, %s\n", err.Error())
}
server.Session = sess
}
return server.gatherData(acc)
}
func init() {
plugins.Add("mongodb", func() plugins.Plugin {
return &MongoDB{
mongos: make(map[string]*Server),
}
})
}

View File

@ -0,0 +1,100 @@
package mongodb
import (
"fmt"
"reflect"
"strconv"
"github.com/influxdb/telegraf/plugins"
)
type MongodbData struct {
StatLine *StatLine
Tags map[string]string
}
func NewMongodbData(statLine *StatLine, tags map[string]string) *MongodbData {
if statLine.NodeType != "" && statLine.NodeType != "UNK" {
tags["state"] = statLine.NodeType
}
return &MongodbData{
StatLine: statLine,
Tags: tags,
}
}
var DefaultStats = map[string]string{
"inserts_per_sec": "Insert",
"queries_per_sec": "Query",
"updates_per_sec": "Update",
"deletes_per_sec": "Delete",
"getmores_per_sec": "GetMore",
"commands_per_sec": "Command",
"flushes_per_sec": "Flushes",
"vsize_megabytes": "Virtual",
"resident_megabytes": "Resident",
"queued_reads": "QueuedReaders",
"queued_writes": "QueuedWriters",
"active_reads": "ActiveReaders",
"active_writes": "ActiveWriters",
"net_in_bytes": "NetIn",
"net_out_bytes": "NetOut",
"open_connections": "NumConnections",
}
var DefaultReplStats = map[string]string{
"repl_inserts_per_sec": "InsertR",
"repl_queries_per_sec": "QueryR",
"repl_updates_per_sec": "UpdateR",
"repl_deletes_per_sec": "DeleteR",
"repl_getmores_per_sec": "GetMoreR",
"repl_commands_per_sec": "CommandR",
"member_status": "NodeType",
}
var MmapStats = map[string]string{
"mapped_megabytes": "Mapped",
"non-mapped_megabytes": "NonMapped",
"page_faults_per_sec": "Faults",
}
var WiredTigerStats = map[string]string{
"percent_cache_dirty": "CacheDirtyPercent",
"percent_cache_used": "CacheUsedPercent",
}
func (d *MongodbData) AddDefaultStats(acc plugins.Accumulator) {
statLine := reflect.ValueOf(d.StatLine).Elem()
d.addStat(acc, statLine, DefaultStats)
if d.StatLine.NodeType != "" {
d.addStat(acc, statLine, DefaultReplStats)
}
if d.StatLine.StorageEngine == "mmapv1" {
d.addStat(acc, statLine, MmapStats)
} else if d.StatLine.StorageEngine == "wiredTiger" {
for key, value := range WiredTigerStats {
val := statLine.FieldByName(value).Interface()
percentVal := fmt.Sprintf("%.1f", val.(float64)*100)
floatVal, _ := strconv.ParseFloat(percentVal, 64)
d.add(acc, key, floatVal)
}
}
}
func (d *MongodbData) addStat(acc plugins.Accumulator, statLine reflect.Value, stats map[string]string) {
for key, value := range stats {
val := statLine.FieldByName(value).Interface()
d.add(acc, key, val)
}
}
func (d *MongodbData) add(acc plugins.Accumulator, key string, val interface{}) {
acc.AddValuesWithTime(
key,
map[string]interface{}{
"value": val,
},
d.Tags,
d.StatLine.Time,
)
}

View File

@ -0,0 +1,111 @@
package mongodb
import (
"testing"
"time"
"github.com/influxdb/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
var tags = make(map[string]string)
func TestAddNonReplStats(t *testing.T) {
d := NewMongodbData(
&StatLine{
StorageEngine: "",
Time: time.Now(),
Insert: 0,
Query: 0,
Update: 0,
Delete: 0,
GetMore: 0,
Command: 0,
Flushes: 0,
Virtual: 0,
Resident: 0,
QueuedReaders: 0,
QueuedWriters: 0,
ActiveReaders: 0,
ActiveWriters: 0,
NetIn: 0,
NetOut: 0,
NumConnections: 0,
},
tags,
)
var acc testutil.Accumulator
d.AddDefaultStats(&acc)
for key, _ := range DefaultStats {
assert.True(t, acc.HasIntValue(key))
}
}
func TestAddReplStats(t *testing.T) {
d := NewMongodbData(
&StatLine{
StorageEngine: "mmapv1",
Mapped: 0,
NonMapped: 0,
Faults: 0,
},
tags,
)
var acc testutil.Accumulator
d.AddDefaultStats(&acc)
for key, _ := range MmapStats {
assert.True(t, acc.HasIntValue(key))
}
}
func TestAddWiredTigerStats(t *testing.T) {
d := NewMongodbData(
&StatLine{
StorageEngine: "wiredTiger",
CacheDirtyPercent: 0,
CacheUsedPercent: 0,
},
tags,
)
var acc testutil.Accumulator
d.AddDefaultStats(&acc)
for key, _ := range WiredTigerStats {
assert.True(t, acc.HasFloatValue(key))
}
}
func TestStateTag(t *testing.T) {
d := NewMongodbData(
&StatLine{
StorageEngine: "",
Time: time.Now(),
Insert: 0,
Query: 0,
NodeType: "PRI",
},
tags,
)
stats := []string{"inserts_per_sec", "queries_per_sec"}
stateTags := make(map[string]string)
stateTags["state"] = "PRI"
var acc testutil.Accumulator
d.AddDefaultStats(&acc)
for _, key := range stats {
err := acc.ValidateTaggedValue(key, int64(0), stateTags)
require.NoError(t, err)
}
}

View File

@ -0,0 +1,50 @@
package mongodb
import (
"net/url"
"time"
"github.com/influxdb/telegraf/plugins"
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
)
type Server struct {
Url *url.URL
Session *mgo.Session
lastResult *ServerStatus
}
func (s *Server) getDefaultTags() map[string]string {
tags := make(map[string]string)
tags["host"] = s.Url.Host
return tags
}
func (s *Server) gatherData(acc plugins.Accumulator) error {
s.Session.SetMode(mgo.Eventual, true)
s.Session.SetSocketTimeout(0)
result := &ServerStatus{}
err := s.Session.DB("admin").Run(bson.D{{"serverStatus", 1}, {"recordStats", 0}}, result)
if err != nil {
return err
}
defer func() {
s.lastResult = result
}()
result.SampleTime = time.Now()
if s.lastResult != nil && result != nil {
duration := result.SampleTime.Sub(s.lastResult.SampleTime)
durationInSeconds := int64(duration.Seconds())
if durationInSeconds == 0 {
durationInSeconds = 1
}
data := NewMongodbData(
NewStatLine(*s.lastResult, *result, s.Url.Host, true, durationInSeconds),
s.getDefaultTags(),
)
data.AddDefaultStats(acc)
}
return nil
}

View File

@ -0,0 +1,43 @@
// +build integration
package mongodb
import (
"testing"
"time"
"github.com/influxdb/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestGetDefaultTags(t *testing.T) {
var tagTests = []struct {
in string
out string
}{
{"host", server.Url.Host},
}
defaultTags := server.getDefaultTags()
for _, tt := range tagTests {
if defaultTags[tt.in] != tt.out {
t.Errorf("expected %q, got %q", tt.out, defaultTags[tt.in])
}
}
}
func TestAddDefaultStats(t *testing.T) {
var acc testutil.Accumulator
err := server.gatherData(&acc)
require.NoError(t, err)
time.Sleep(time.Duration(1) * time.Second)
// need to call this twice so it can perform the diff
err = server.gatherData(&acc)
require.NoError(t, err)
for key, _ := range DefaultStats {
assert.True(t, acc.HasIntValue(key))
}
}

View File

@ -0,0 +1,71 @@
// +build integration
package mongodb
import (
"log"
"math/rand"
"net/url"
"os"
"testing"
"time"
"gopkg.in/mgo.v2"
)
var connect_url string
var server *Server
func init() {
connect_url = os.Getenv("MONGODB_URL")
if connect_url == "" {
connect_url = "127.0.0.1:27017"
server = &Server{Url: &url.URL{Host: connect_url}}
} else {
full_url, err := url.Parse(connect_url)
if err != nil {
log.Fatalf("Unable to parse URL (%s), %s\n", full_url, err.Error())
}
server = &Server{Url: full_url}
}
}
func testSetup(m *testing.M) {
var err error
var dialAddrs []string
if server.Url.User != nil {
dialAddrs = []string{server.Url.String()}
} else {
dialAddrs = []string{server.Url.Host}
}
dialInfo, err := mgo.ParseURL(dialAddrs[0])
if err != nil {
log.Fatalf("Unable to parse URL (%s), %s\n", dialAddrs[0], err.Error())
}
dialInfo.Direct = true
dialInfo.Timeout = time.Duration(10) * time.Second
sess, err := mgo.DialWithInfo(dialInfo)
if err != nil {
log.Fatalf("Unable to connect to MongoDB, %s\n", err.Error())
}
server.Session = sess
server.Session, _ = mgo.Dial(server.Url.Host)
if err != nil {
log.Fatalln(err.Error())
}
}
func testTeardown(m *testing.M) {
server.Session.Close()
}
func TestMain(m *testing.M) {
// seed randomness for use with tests
rand.Seed(time.Now().UTC().UnixNano())
testSetup(m)
res := m.Run()
testTeardown(m)
os.Exit(res)
}

View File

@ -0,0 +1,549 @@
/***
The code contained here came from https://github.com/mongodb/mongo-tools/blob/master/mongostat/stat_types.go
and contains modifications so that no other dependency from that project is needed. Other modifications included
removing uneccessary code specific to formatting the output and determine the current state of the database. It
is licensed under Apache Version 2.0, http://www.apache.org/licenses/LICENSE-2.0.html
***/
package mongodb
import (
"sort"
"strings"
"time"
)
const (
MongosProcess = "mongos"
)
// Flags to determine cases when to activate/deactivate columns for output.
const (
Always = 1 << iota // always activate the column
Discover // only active when mongostat is in discover mode
Repl // only active if one of the nodes being monitored is in a replset
Locks // only active if node is capable of calculating lock info
AllOnly // only active if mongostat was run with --all option
MMAPOnly // only active if node has mmap-specific fields
WTOnly // only active if node has wiredtiger-specific fields
)
type ServerStatus struct {
SampleTime time.Time `bson:""`
Host string `bson:"host"`
Version string `bson:"version"`
Process string `bson:"process"`
Pid int64 `bson:"pid"`
Uptime int64 `bson:"uptime"`
UptimeMillis int64 `bson:"uptimeMillis"`
UptimeEstimate int64 `bson:"uptimeEstimate"`
LocalTime time.Time `bson:"localTime"`
Asserts map[string]int64 `bson:"asserts"`
BackgroundFlushing *FlushStats `bson:"backgroundFlushing"`
ExtraInfo *ExtraInfo `bson:"extra_info"`
Connections *ConnectionStats `bson:"connections"`
Dur *DurStats `bson:"dur"`
GlobalLock *GlobalLockStats `bson:"globalLock"`
Locks map[string]LockStats `bson:"locks,omitempty"`
Network *NetworkStats `bson:"network"`
Opcounters *OpcountStats `bson:"opcounters"`
OpcountersRepl *OpcountStats `bson:"opcountersRepl"`
RecordStats *DBRecordStats `bson:"recordStats"`
Mem *MemStats `bson:"mem"`
Repl *ReplStatus `bson:"repl"`
ShardCursorType map[string]interface{} `bson:"shardCursorType"`
StorageEngine map[string]string `bson:"storageEngine"`
WiredTiger *WiredTiger `bson:"wiredTiger"`
}
// WiredTiger stores information related to the WiredTiger storage engine.
type WiredTiger struct {
Transaction TransactionStats `bson:"transaction"`
Concurrent ConcurrentTransactions `bson:"concurrentTransactions"`
Cache CacheStats `bson:"cache"`
}
type ConcurrentTransactions struct {
Write ConcurrentTransStats `bson:"write"`
Read ConcurrentTransStats `bson:"read"`
}
type ConcurrentTransStats struct {
Out int64 `bson:"out"`
}
// CacheStats stores cache statistics for WiredTiger.
type CacheStats struct {
TrackedDirtyBytes int64 `bson:"tracked dirty bytes in the cache"`
CurrentCachedBytes int64 `bson:"bytes currently in the cache"`
MaxBytesConfigured int64 `bson:"maximum bytes configured"`
}
// TransactionStats stores transaction checkpoints in WiredTiger.
type TransactionStats struct {
TransCheckpoints int64 `bson:"transaction checkpoints"`
}
// ReplStatus stores data related to replica sets.
type ReplStatus struct {
SetName interface{} `bson:"setName"`
IsMaster interface{} `bson:"ismaster"`
Secondary interface{} `bson:"secondary"`
IsReplicaSet interface{} `bson:"isreplicaset"`
ArbiterOnly interface{} `bson:"arbiterOnly"`
Hosts []string `bson:"hosts"`
Passives []string `bson:"passives"`
Me string `bson:"me"`
}
// DBRecordStats stores data related to memory operations across databases.
type DBRecordStats struct {
AccessesNotInMemory int64 `bson:"accessesNotInMemory"`
PageFaultExceptionsThrown int64 `bson:"pageFaultExceptionsThrown"`
DBRecordAccesses map[string]RecordAccesses `bson:",inline"`
}
// RecordAccesses stores data related to memory operations scoped to a database.
type RecordAccesses struct {
AccessesNotInMemory int64 `bson:"accessesNotInMemory"`
PageFaultExceptionsThrown int64 `bson:"pageFaultExceptionsThrown"`
}
// MemStats stores data related to memory statistics.
type MemStats struct {
Bits int64 `bson:"bits"`
Resident int64 `bson:"resident"`
Virtual int64 `bson:"virtual"`
Supported interface{} `bson:"supported"`
Mapped int64 `bson:"mapped"`
MappedWithJournal int64 `bson:"mappedWithJournal"`
}
// FlushStats stores information about memory flushes.
type FlushStats struct {
Flushes int64 `bson:"flushes"`
TotalMs int64 `bson:"total_ms"`
AverageMs float64 `bson:"average_ms"`
LastMs int64 `bson:"last_ms"`
LastFinished time.Time `bson:"last_finished"`
}
// ConnectionStats stores information related to incoming database connections.
type ConnectionStats struct {
Current int64 `bson:"current"`
Available int64 `bson:"available"`
TotalCreated int64 `bson:"totalCreated"`
}
// DurTiming stores information related to journaling.
type DurTiming struct {
Dt int64 `bson:"dt"`
PrepLogBuffer int64 `bson:"prepLogBuffer"`
WriteToJournal int64 `bson:"writeToJournal"`
WriteToDataFiles int64 `bson:"writeToDataFiles"`
RemapPrivateView int64 `bson:"remapPrivateView"`
}
// DurStats stores information related to journaling statistics.
type DurStats struct {
Commits int64 `bson:"commits"`
JournaledMB int64 `bson:"journaledMB"`
WriteToDataFilesMB int64 `bson:"writeToDataFilesMB"`
Compression int64 `bson:"compression"`
CommitsInWriteLock int64 `bson:"commitsInWriteLock"`
EarlyCommits int64 `bson:"earlyCommits"`
TimeMs DurTiming
}
// QueueStats stores the number of queued read/write operations.
type QueueStats struct {
Total int64 `bson:"total"`
Readers int64 `bson:"readers"`
Writers int64 `bson:"writers"`
}
// ClientStats stores the number of active read/write operations.
type ClientStats struct {
Total int64 `bson:"total"`
Readers int64 `bson:"readers"`
Writers int64 `bson:"writers"`
}
// GlobalLockStats stores information related locks in the MMAP storage engine.
type GlobalLockStats struct {
TotalTime int64 `bson:"totalTime"`
LockTime int64 `bson:"lockTime"`
CurrentQueue *QueueStats `bson:"currentQueue"`
ActiveClients *ClientStats `bson:"activeClients"`
}
// NetworkStats stores information related to network traffic.
type NetworkStats struct {
BytesIn int64 `bson:"bytesIn"`
BytesOut int64 `bson:"bytesOut"`
NumRequests int64 `bson:"numRequests"`
}
// OpcountStats stores information related to comamnds and basic CRUD operations.
type OpcountStats struct {
Insert int64 `bson:"insert"`
Query int64 `bson:"query"`
Update int64 `bson:"update"`
Delete int64 `bson:"delete"`
GetMore int64 `bson:"getmore"`
Command int64 `bson:"command"`
}
// ReadWriteLockTimes stores time spent holding read/write locks.
type ReadWriteLockTimes struct {
Read int64 `bson:"R"`
Write int64 `bson:"W"`
ReadLower int64 `bson:"r"`
WriteLower int64 `bson:"w"`
}
// LockStats stores information related to time spent acquiring/holding locks
// for a given database.
type LockStats struct {
TimeLockedMicros ReadWriteLockTimes `bson:"timeLockedMicros"`
TimeAcquiringMicros ReadWriteLockTimes `bson:"timeAcquiringMicros"`
// AcquireCount is a new field of the lock stats only populated on 3.0 or newer.
// Typed as a pointer so that if it is nil, mongostat can assume the field is not populated
// with real namespace data.
AcquireCount *ReadWriteLockTimes `bson:"acquireCount,omitempty"`
}
// ExtraInfo stores additional platform specific information.
type ExtraInfo struct {
PageFaults *int64 `bson:"page_faults"`
}
// StatHeader describes a single column for mongostat's terminal output,
// its formatting, and in which modes it should be displayed.
type StatHeader struct {
// The text to appear in the column's header cell
HeaderText string
// Bitmask containing flags to determine if this header is active or not
ActivateFlags int
}
// StatHeaders are the complete set of data metrics supported by mongostat.
var StatHeaders = []StatHeader{
{"", Always}, // placeholder for hostname column (blank header text)
{"insert", Always},
{"query", Always},
{"update", Always},
{"delete", Always},
{"getmore", Always},
{"command", Always},
{"% dirty", WTOnly},
{"% used", WTOnly},
{"flushes", Always},
{"mapped", MMAPOnly},
{"vsize", Always},
{"res", Always},
{"non-mapped", MMAPOnly | AllOnly},
{"faults", MMAPOnly},
{" locked db", Locks},
{"qr|qw", Always},
{"ar|aw", Always},
{"netIn", Always},
{"netOut", Always},
{"conn", Always},
{"set", Repl},
{"repl", Repl},
{"time", Always},
}
// NamespacedLocks stores information on the LockStatus of namespaces.
type NamespacedLocks map[string]LockStatus
// LockUsage stores information related to a namespace's lock usage.
type LockUsage struct {
Namespace string
Reads int64
Writes int64
}
type lockUsages []LockUsage
func percentageInt64(value, outOf int64) float64 {
if value == 0 || outOf == 0 {
return 0
}
return 100 * (float64(value) / float64(outOf))
}
func (slice lockUsages) Len() int {
return len(slice)
}
func (slice lockUsages) Less(i, j int) bool {
return slice[i].Reads+slice[i].Writes < slice[j].Reads+slice[j].Writes
}
func (slice lockUsages) Swap(i, j int) {
slice[i], slice[j] = slice[j], slice[i]
}
// LockStatus stores a database's lock statistics.
type LockStatus struct {
DBName string
Percentage float64
Global bool
}
// StatLine is a wrapper for all metrics reported by mongostat for monitored hosts.
type StatLine struct {
Key string
// What storage engine is being used for the node with this stat line
StorageEngine string
Error error
IsMongos bool
Host string
// The time at which this StatLine was generated.
Time time.Time
// The last time at which this StatLine was printed to output.
LastPrinted time.Time
// Opcounter fields
Insert, Query, Update, Delete, GetMore, Command int64
// Cache utilization (wiredtiger only)
CacheDirtyPercent float64
CacheUsedPercent float64
// Replicated Opcounter fields
InsertR, QueryR, UpdateR, DeleteR, GetMoreR, CommandR int64
Flushes int64
Mapped, Virtual, Resident, NonMapped int64
Faults int64
HighestLocked *LockStatus
QueuedReaders, QueuedWriters int64
ActiveReaders, ActiveWriters int64
NetIn, NetOut int64
NumConnections int64
ReplSetName string
NodeType string
}
func parseLocks(stat ServerStatus) map[string]LockUsage {
returnVal := map[string]LockUsage{}
for namespace, lockInfo := range stat.Locks {
returnVal[namespace] = LockUsage{
namespace,
lockInfo.TimeLockedMicros.Read + lockInfo.TimeLockedMicros.ReadLower,
lockInfo.TimeLockedMicros.Write + lockInfo.TimeLockedMicros.WriteLower,
}
}
return returnVal
}
func computeLockDiffs(prevLocks, curLocks map[string]LockUsage) []LockUsage {
lockUsages := lockUsages(make([]LockUsage, 0, len(curLocks)))
for namespace, curUsage := range curLocks {
prevUsage, hasKey := prevLocks[namespace]
if !hasKey {
// This namespace didn't appear in the previous batch of lock info,
// so we can't compute a diff for it - skip it.
continue
}
// Calculate diff of lock usage for this namespace and add to the list
lockUsages = append(lockUsages,
LockUsage{
namespace,
curUsage.Reads - prevUsage.Reads,
curUsage.Writes - prevUsage.Writes,
})
}
// Sort the array in order of least to most locked
sort.Sort(lockUsages)
return lockUsages
}
func diff(newVal, oldVal, sampleTime int64) int64 {
return (newVal - oldVal) / sampleTime
}
// NewStatLine constructs a StatLine object from two ServerStatus objects.
func NewStatLine(oldStat, newStat ServerStatus, key string, all bool, sampleSecs int64) *StatLine {
returnVal := &StatLine{
Key: key,
Host: newStat.Host,
Mapped: -1,
Virtual: -1,
Resident: -1,
NonMapped: -1,
Faults: -1,
}
// set the storage engine appropriately
if newStat.StorageEngine != nil && newStat.StorageEngine["name"] != "" {
returnVal.StorageEngine = newStat.StorageEngine["name"]
} else {
returnVal.StorageEngine = "mmapv1"
}
if newStat.Opcounters != nil && oldStat.Opcounters != nil {
returnVal.Insert = diff(newStat.Opcounters.Insert, oldStat.Opcounters.Insert, sampleSecs)
returnVal.Query = diff(newStat.Opcounters.Query, oldStat.Opcounters.Query, sampleSecs)
returnVal.Update = diff(newStat.Opcounters.Update, oldStat.Opcounters.Update, sampleSecs)
returnVal.Delete = diff(newStat.Opcounters.Delete, oldStat.Opcounters.Delete, sampleSecs)
returnVal.GetMore = diff(newStat.Opcounters.GetMore, oldStat.Opcounters.GetMore, sampleSecs)
returnVal.Command = diff(newStat.Opcounters.Command, oldStat.Opcounters.Command, sampleSecs)
}
if newStat.OpcountersRepl != nil && oldStat.OpcountersRepl != nil {
returnVal.InsertR = diff(newStat.OpcountersRepl.Insert, oldStat.OpcountersRepl.Insert, sampleSecs)
returnVal.QueryR = diff(newStat.OpcountersRepl.Query, oldStat.OpcountersRepl.Query, sampleSecs)
returnVal.UpdateR = diff(newStat.OpcountersRepl.Update, oldStat.OpcountersRepl.Update, sampleSecs)
returnVal.DeleteR = diff(newStat.OpcountersRepl.Delete, oldStat.OpcountersRepl.Delete, sampleSecs)
returnVal.GetMoreR = diff(newStat.OpcountersRepl.GetMore, oldStat.OpcountersRepl.GetMore, sampleSecs)
returnVal.CommandR = diff(newStat.OpcountersRepl.Command, oldStat.OpcountersRepl.Command, sampleSecs)
}
returnVal.CacheDirtyPercent = -1
returnVal.CacheUsedPercent = -1
if newStat.WiredTiger != nil && oldStat.WiredTiger != nil {
returnVal.Flushes = newStat.WiredTiger.Transaction.TransCheckpoints - oldStat.WiredTiger.Transaction.TransCheckpoints
returnVal.CacheDirtyPercent = float64(newStat.WiredTiger.Cache.TrackedDirtyBytes) / float64(newStat.WiredTiger.Cache.MaxBytesConfigured)
returnVal.CacheUsedPercent = float64(newStat.WiredTiger.Cache.CurrentCachedBytes) / float64(newStat.WiredTiger.Cache.MaxBytesConfigured)
} else if newStat.BackgroundFlushing != nil && oldStat.BackgroundFlushing != nil {
returnVal.Flushes = newStat.BackgroundFlushing.Flushes - oldStat.BackgroundFlushing.Flushes
}
returnVal.Time = newStat.SampleTime
returnVal.IsMongos =
(newStat.ShardCursorType != nil || strings.HasPrefix(newStat.Process, MongosProcess))
// BEGIN code modification
if oldStat.Mem.Supported.(bool) {
// END code modification
if !returnVal.IsMongos {
returnVal.Mapped = newStat.Mem.Mapped
}
returnVal.Virtual = newStat.Mem.Virtual
returnVal.Resident = newStat.Mem.Resident
if !returnVal.IsMongos && all {
returnVal.NonMapped = newStat.Mem.Virtual - newStat.Mem.Mapped
}
}
if newStat.Repl != nil {
setName, isReplSet := newStat.Repl.SetName.(string)
if isReplSet {
returnVal.ReplSetName = setName
}
// BEGIN code modification
if newStat.Repl.IsMaster.(bool) {
returnVal.NodeType = "PRI"
} else if newStat.Repl.Secondary.(bool) {
returnVal.NodeType = "SEC"
} else {
returnVal.NodeType = "UNK"
}
// END code modification
} else if returnVal.IsMongos {
returnVal.NodeType = "RTR"
}
if oldStat.ExtraInfo != nil && newStat.ExtraInfo != nil &&
oldStat.ExtraInfo.PageFaults != nil && newStat.ExtraInfo.PageFaults != nil {
returnVal.Faults = diff(*(newStat.ExtraInfo.PageFaults), *(oldStat.ExtraInfo.PageFaults), sampleSecs)
}
if !returnVal.IsMongos && oldStat.Locks != nil && oldStat.Locks != nil {
globalCheck, hasGlobal := oldStat.Locks["Global"]
if hasGlobal && globalCheck.AcquireCount != nil {
// This appears to be a 3.0+ server so the data in these fields do *not* refer to
// actual namespaces and thus we can't compute lock %.
returnVal.HighestLocked = nil
} else {
prevLocks := parseLocks(oldStat)
curLocks := parseLocks(newStat)
lockdiffs := computeLockDiffs(prevLocks, curLocks)
if len(lockdiffs) == 0 {
if newStat.GlobalLock != nil {
returnVal.HighestLocked = &LockStatus{
DBName: "",
Percentage: percentageInt64(newStat.GlobalLock.LockTime, newStat.GlobalLock.TotalTime),
Global: true,
}
}
} else {
// Get the entry with the highest lock
highestLocked := lockdiffs[len(lockdiffs)-1]
var timeDiffMillis int64
timeDiffMillis = newStat.UptimeMillis - oldStat.UptimeMillis
lockToReport := highestLocked.Writes
// if the highest locked namespace is not '.'
if highestLocked.Namespace != "." {
for _, namespaceLockInfo := range lockdiffs {
if namespaceLockInfo.Namespace == "." {
lockToReport += namespaceLockInfo.Writes
}
}
}
// lock data is in microseconds and uptime is in milliseconds - so
// divide by 1000 so that they units match
lockToReport /= 1000
returnVal.HighestLocked = &LockStatus{
DBName: highestLocked.Namespace,
Percentage: percentageInt64(lockToReport, timeDiffMillis),
Global: false,
}
}
}
} else {
returnVal.HighestLocked = nil
}
if newStat.GlobalLock != nil {
hasWT := (newStat.WiredTiger != nil && oldStat.WiredTiger != nil)
//If we have wiredtiger stats, use those instead
if newStat.GlobalLock.CurrentQueue != nil {
if hasWT {
returnVal.QueuedReaders = newStat.GlobalLock.CurrentQueue.Readers + newStat.GlobalLock.ActiveClients.Readers - newStat.WiredTiger.Concurrent.Read.Out
returnVal.QueuedWriters = newStat.GlobalLock.CurrentQueue.Writers + newStat.GlobalLock.ActiveClients.Writers - newStat.WiredTiger.Concurrent.Write.Out
if returnVal.QueuedReaders < 0 {
returnVal.QueuedReaders = 0
}
if returnVal.QueuedWriters < 0 {
returnVal.QueuedWriters = 0
}
} else {
returnVal.QueuedReaders = newStat.GlobalLock.CurrentQueue.Readers
returnVal.QueuedWriters = newStat.GlobalLock.CurrentQueue.Writers
}
}
if hasWT {
returnVal.ActiveReaders = newStat.WiredTiger.Concurrent.Read.Out
returnVal.ActiveWriters = newStat.WiredTiger.Concurrent.Write.Out
} else if newStat.GlobalLock.ActiveClients != nil {
returnVal.ActiveReaders = newStat.GlobalLock.ActiveClients.Readers
returnVal.ActiveWriters = newStat.GlobalLock.ActiveClients.Writers
}
}
if oldStat.Network != nil && newStat.Network != nil {
returnVal.NetIn = diff(newStat.Network.BytesIn, oldStat.Network.BytesIn, sampleSecs)
returnVal.NetOut = diff(newStat.Network.BytesOut, oldStat.Network.BytesOut, sampleSecs)
}
if newStat.Connections != nil {
returnVal.NumConnections = newStat.Connections.Current
}
return returnVal
}

View File

@ -7,7 +7,6 @@ import (
type Point struct {
Measurement string
Value interface{}
Tags map[string]string
Values map[string]interface{}
Time time.Time
@ -22,7 +21,7 @@ func (a *Accumulator) Add(measurement string, value interface{}, tags map[string
a.Points,
&Point{
Measurement: measurement,
Value: value,
Values: map[string]interface{}{"value": value},
Tags: tags,
},
)
@ -58,7 +57,7 @@ func (a *Accumulator) Get(measurement string) (*Point, bool) {
func (a *Accumulator) CheckValue(measurement string, val interface{}) bool {
for _, p := range a.Points {
if p.Measurement == measurement {
return p.Value == val
return p.Values["value"] == val
}
}
@ -85,8 +84,8 @@ func (a *Accumulator) ValidateTaggedValue(measurement string, val interface{}, t
}
if found && p.Measurement == measurement {
if p.Value != val {
return fmt.Errorf("%v (%T) != %v (%T)", p.Value, p.Value, val, val)
if p.Values["value"] != val {
return fmt.Errorf("%v (%T) != %v (%T)", p.Values["value"], p.Values["value"], val, val)
}
return nil
@ -103,7 +102,7 @@ func (a *Accumulator) ValidateValue(measurement string, val interface{}) error {
func (a *Accumulator) HasIntValue(measurement string) bool {
for _, p := range a.Points {
if p.Measurement == measurement {
_, ok := p.Value.(int64)
_, ok := p.Values["value"].(int64)
return ok
}
}
@ -114,7 +113,7 @@ func (a *Accumulator) HasIntValue(measurement string) bool {
func (a *Accumulator) HasFloatValue(measurement string) bool {
for _, p := range a.Points {
if p.Measurement == measurement {
_, ok := p.Value.(float64)
_, ok := p.Values["value"].(float64)
return ok
}
}