Merge pull request #54 from jipperinbham/mongodb-plugin
add MongoDB plugin
This commit is contained in:
commit
b4e032d9c9
|
@ -53,6 +53,7 @@ Telegraf currently has support for collecting metrics from:
|
||||||
* Elasticsearch
|
* Elasticsearch
|
||||||
* RethinkDB
|
* RethinkDB
|
||||||
* Kafka
|
* Kafka
|
||||||
|
* MongoDB
|
||||||
|
|
||||||
We'll be adding support for many more over the coming months. Read on if you want to add support for another service or third-party API.
|
We'll be adding support for many more over the coming months. Read on if you want to add support for another service or third-party API.
|
||||||
|
|
||||||
|
|
|
@ -124,6 +124,14 @@ address = "sslmode=disable"
|
||||||
# If no servers are specified, then localhost is used as the host.
|
# If no servers are specified, then localhost is used as the host.
|
||||||
servers = ["localhost"]
|
servers = ["localhost"]
|
||||||
|
|
||||||
|
[mongodb]
|
||||||
|
# An array of URI to gather stats about. Specify an ip or hostname
|
||||||
|
# with optional port add password. ie mongodb://user:auth_key@10.10.3.30:27017,
|
||||||
|
# mongodb://10.10.3.33:18832, 10.0.0.1:10000, etc.
|
||||||
|
#
|
||||||
|
# If no servers are specified, then 127.0.0.1 is used as the host and 27107 as the port.
|
||||||
|
servers = ["127.0.0.1:27017"]
|
||||||
|
|
||||||
# Read metrics about swap memory usage
|
# Read metrics about swap memory usage
|
||||||
[swap]
|
[swap]
|
||||||
# no configuration
|
# no configuration
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
_ "github.com/influxdb/telegraf/plugins/elasticsearch"
|
_ "github.com/influxdb/telegraf/plugins/elasticsearch"
|
||||||
_ "github.com/influxdb/telegraf/plugins/kafka_consumer"
|
_ "github.com/influxdb/telegraf/plugins/kafka_consumer"
|
||||||
_ "github.com/influxdb/telegraf/plugins/memcached"
|
_ "github.com/influxdb/telegraf/plugins/memcached"
|
||||||
|
_ "github.com/influxdb/telegraf/plugins/mongodb"
|
||||||
_ "github.com/influxdb/telegraf/plugins/mysql"
|
_ "github.com/influxdb/telegraf/plugins/mysql"
|
||||||
_ "github.com/influxdb/telegraf/plugins/postgresql"
|
_ "github.com/influxdb/telegraf/plugins/postgresql"
|
||||||
_ "github.com/influxdb/telegraf/plugins/prometheus"
|
_ "github.com/influxdb/telegraf/plugins/prometheus"
|
||||||
|
|
|
@ -0,0 +1,144 @@
|
||||||
|
package mongodb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"crypto/tls"
|
||||||
|
"crypto/x509"
|
||||||
|
"fmt"
|
||||||
|
"net"
|
||||||
|
"net/url"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdb/telegraf/plugins"
|
||||||
|
"gopkg.in/mgo.v2"
|
||||||
|
)
|
||||||
|
|
||||||
|
type MongoDB struct {
|
||||||
|
Servers []string
|
||||||
|
Ssl Ssl
|
||||||
|
mongos map[string]*Server
|
||||||
|
}
|
||||||
|
|
||||||
|
type Ssl struct {
|
||||||
|
Enabled bool
|
||||||
|
CaCerts []string `toml:"cacerts"`
|
||||||
|
}
|
||||||
|
|
||||||
|
var sampleConfig = `
|
||||||
|
# An array of URI to gather stats about. Specify an ip or hostname
|
||||||
|
# with optional port add password. ie mongodb://user:auth_key@10.10.3.30:27017,
|
||||||
|
# mongodb://10.10.3.33:18832, 10.0.0.1:10000, etc.
|
||||||
|
#
|
||||||
|
# If no servers are specified, then 127.0.0.1 is used as the host and 27107 as the port.
|
||||||
|
servers = ["127.0.0.1:27017"]`
|
||||||
|
|
||||||
|
func (m *MongoDB) SampleConfig() string {
|
||||||
|
return sampleConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
func (*MongoDB) Description() string {
|
||||||
|
return "Read metrics from one or many MongoDB servers"
|
||||||
|
}
|
||||||
|
|
||||||
|
var localhost = &url.URL{Host: "127.0.0.1:27017"}
|
||||||
|
|
||||||
|
// Reads stats from all configured servers accumulates stats.
|
||||||
|
// Returns one of the errors encountered while gather stats (if any).
|
||||||
|
func (m *MongoDB) Gather(acc plugins.Accumulator) error {
|
||||||
|
if len(m.Servers) == 0 {
|
||||||
|
m.gatherServer(m.getMongoServer(localhost), acc)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
var outerr error
|
||||||
|
|
||||||
|
for _, serv := range m.Servers {
|
||||||
|
u, err := url.Parse(serv)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Unable to parse to address '%s': %s", serv, err)
|
||||||
|
} else if u.Scheme == "" {
|
||||||
|
u.Scheme = "mongodb"
|
||||||
|
// fallback to simple string based address (i.e. "10.0.0.1:10000")
|
||||||
|
u.Host = serv
|
||||||
|
if u.Path == u.Host {
|
||||||
|
u.Path = ""
|
||||||
|
}
|
||||||
|
}
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
outerr = m.gatherServer(m.getMongoServer(u), acc)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
return outerr
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MongoDB) getMongoServer(url *url.URL) *Server {
|
||||||
|
if _, ok := m.mongos[url.Host]; !ok {
|
||||||
|
m.mongos[url.Host] = &Server{
|
||||||
|
Url: url,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return m.mongos[url.Host]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MongoDB) gatherServer(server *Server, acc plugins.Accumulator) error {
|
||||||
|
if server.Session == nil {
|
||||||
|
var dialAddrs []string
|
||||||
|
if server.Url.User != nil {
|
||||||
|
dialAddrs = []string{server.Url.String()}
|
||||||
|
} else {
|
||||||
|
dialAddrs = []string{server.Url.Host}
|
||||||
|
}
|
||||||
|
dialInfo, err := mgo.ParseURL(dialAddrs[0])
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Unable to parse URL (%s), %s\n", dialAddrs[0], err.Error())
|
||||||
|
}
|
||||||
|
dialInfo.Direct = true
|
||||||
|
dialInfo.Timeout = time.Duration(10) * time.Second
|
||||||
|
|
||||||
|
if m.Ssl.Enabled {
|
||||||
|
tlsConfig := &tls.Config{}
|
||||||
|
if len(m.Ssl.CaCerts) > 0 {
|
||||||
|
roots := x509.NewCertPool()
|
||||||
|
for _, caCert := range m.Ssl.CaCerts {
|
||||||
|
ok := roots.AppendCertsFromPEM([]byte(caCert))
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("failed to parse root certificate")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tlsConfig.RootCAs = roots
|
||||||
|
} else {
|
||||||
|
tlsConfig.InsecureSkipVerify = true
|
||||||
|
}
|
||||||
|
dialInfo.DialServer = func(addr *mgo.ServerAddr) (net.Conn, error) {
|
||||||
|
conn, err := tls.Dial("tcp", addr.String(), tlsConfig)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("error in Dial, %s\n", err.Error())
|
||||||
|
}
|
||||||
|
return conn, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sess, err := mgo.DialWithInfo(dialInfo)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("error dialing over ssl, %s\n", err.Error())
|
||||||
|
return fmt.Errorf("Unable to connect to MongoDB, %s\n", err.Error())
|
||||||
|
}
|
||||||
|
server.Session = sess
|
||||||
|
}
|
||||||
|
return server.gatherData(acc)
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
plugins.Add("mongodb", func() plugins.Plugin {
|
||||||
|
return &MongoDB{
|
||||||
|
mongos: make(map[string]*Server),
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
|
@ -0,0 +1,100 @@
|
||||||
|
package mongodb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"reflect"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
|
"github.com/influxdb/telegraf/plugins"
|
||||||
|
)
|
||||||
|
|
||||||
|
type MongodbData struct {
|
||||||
|
StatLine *StatLine
|
||||||
|
Tags map[string]string
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewMongodbData(statLine *StatLine, tags map[string]string) *MongodbData {
|
||||||
|
if statLine.NodeType != "" && statLine.NodeType != "UNK" {
|
||||||
|
tags["state"] = statLine.NodeType
|
||||||
|
}
|
||||||
|
return &MongodbData{
|
||||||
|
StatLine: statLine,
|
||||||
|
Tags: tags,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var DefaultStats = map[string]string{
|
||||||
|
"inserts_per_sec": "Insert",
|
||||||
|
"queries_per_sec": "Query",
|
||||||
|
"updates_per_sec": "Update",
|
||||||
|
"deletes_per_sec": "Delete",
|
||||||
|
"getmores_per_sec": "GetMore",
|
||||||
|
"commands_per_sec": "Command",
|
||||||
|
"flushes_per_sec": "Flushes",
|
||||||
|
"vsize_megabytes": "Virtual",
|
||||||
|
"resident_megabytes": "Resident",
|
||||||
|
"queued_reads": "QueuedReaders",
|
||||||
|
"queued_writes": "QueuedWriters",
|
||||||
|
"active_reads": "ActiveReaders",
|
||||||
|
"active_writes": "ActiveWriters",
|
||||||
|
"net_in_bytes": "NetIn",
|
||||||
|
"net_out_bytes": "NetOut",
|
||||||
|
"open_connections": "NumConnections",
|
||||||
|
}
|
||||||
|
|
||||||
|
var DefaultReplStats = map[string]string{
|
||||||
|
"repl_inserts_per_sec": "InsertR",
|
||||||
|
"repl_queries_per_sec": "QueryR",
|
||||||
|
"repl_updates_per_sec": "UpdateR",
|
||||||
|
"repl_deletes_per_sec": "DeleteR",
|
||||||
|
"repl_getmores_per_sec": "GetMoreR",
|
||||||
|
"repl_commands_per_sec": "CommandR",
|
||||||
|
"member_status": "NodeType",
|
||||||
|
}
|
||||||
|
|
||||||
|
var MmapStats = map[string]string{
|
||||||
|
"mapped_megabytes": "Mapped",
|
||||||
|
"non-mapped_megabytes": "NonMapped",
|
||||||
|
"page_faults_per_sec": "Faults",
|
||||||
|
}
|
||||||
|
|
||||||
|
var WiredTigerStats = map[string]string{
|
||||||
|
"percent_cache_dirty": "CacheDirtyPercent",
|
||||||
|
"percent_cache_used": "CacheUsedPercent",
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *MongodbData) AddDefaultStats(acc plugins.Accumulator) {
|
||||||
|
statLine := reflect.ValueOf(d.StatLine).Elem()
|
||||||
|
d.addStat(acc, statLine, DefaultStats)
|
||||||
|
if d.StatLine.NodeType != "" {
|
||||||
|
d.addStat(acc, statLine, DefaultReplStats)
|
||||||
|
}
|
||||||
|
if d.StatLine.StorageEngine == "mmapv1" {
|
||||||
|
d.addStat(acc, statLine, MmapStats)
|
||||||
|
} else if d.StatLine.StorageEngine == "wiredTiger" {
|
||||||
|
for key, value := range WiredTigerStats {
|
||||||
|
val := statLine.FieldByName(value).Interface()
|
||||||
|
percentVal := fmt.Sprintf("%.1f", val.(float64)*100)
|
||||||
|
floatVal, _ := strconv.ParseFloat(percentVal, 64)
|
||||||
|
d.add(acc, key, floatVal)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *MongodbData) addStat(acc plugins.Accumulator, statLine reflect.Value, stats map[string]string) {
|
||||||
|
for key, value := range stats {
|
||||||
|
val := statLine.FieldByName(value).Interface()
|
||||||
|
d.add(acc, key, val)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *MongodbData) add(acc plugins.Accumulator, key string, val interface{}) {
|
||||||
|
acc.AddValuesWithTime(
|
||||||
|
key,
|
||||||
|
map[string]interface{}{
|
||||||
|
"value": val,
|
||||||
|
},
|
||||||
|
d.Tags,
|
||||||
|
d.StatLine.Time,
|
||||||
|
)
|
||||||
|
}
|
|
@ -0,0 +1,111 @@
|
||||||
|
package mongodb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdb/telegraf/testutil"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
var tags = make(map[string]string)
|
||||||
|
|
||||||
|
func TestAddNonReplStats(t *testing.T) {
|
||||||
|
d := NewMongodbData(
|
||||||
|
&StatLine{
|
||||||
|
StorageEngine: "",
|
||||||
|
Time: time.Now(),
|
||||||
|
Insert: 0,
|
||||||
|
Query: 0,
|
||||||
|
Update: 0,
|
||||||
|
Delete: 0,
|
||||||
|
GetMore: 0,
|
||||||
|
Command: 0,
|
||||||
|
Flushes: 0,
|
||||||
|
Virtual: 0,
|
||||||
|
Resident: 0,
|
||||||
|
QueuedReaders: 0,
|
||||||
|
QueuedWriters: 0,
|
||||||
|
ActiveReaders: 0,
|
||||||
|
ActiveWriters: 0,
|
||||||
|
NetIn: 0,
|
||||||
|
NetOut: 0,
|
||||||
|
NumConnections: 0,
|
||||||
|
},
|
||||||
|
tags,
|
||||||
|
)
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
|
||||||
|
d.AddDefaultStats(&acc)
|
||||||
|
|
||||||
|
for key, _ := range DefaultStats {
|
||||||
|
assert.True(t, acc.HasIntValue(key))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAddReplStats(t *testing.T) {
|
||||||
|
d := NewMongodbData(
|
||||||
|
&StatLine{
|
||||||
|
StorageEngine: "mmapv1",
|
||||||
|
Mapped: 0,
|
||||||
|
NonMapped: 0,
|
||||||
|
Faults: 0,
|
||||||
|
},
|
||||||
|
tags,
|
||||||
|
)
|
||||||
|
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
|
||||||
|
d.AddDefaultStats(&acc)
|
||||||
|
|
||||||
|
for key, _ := range MmapStats {
|
||||||
|
assert.True(t, acc.HasIntValue(key))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAddWiredTigerStats(t *testing.T) {
|
||||||
|
d := NewMongodbData(
|
||||||
|
&StatLine{
|
||||||
|
StorageEngine: "wiredTiger",
|
||||||
|
CacheDirtyPercent: 0,
|
||||||
|
CacheUsedPercent: 0,
|
||||||
|
},
|
||||||
|
tags,
|
||||||
|
)
|
||||||
|
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
|
||||||
|
d.AddDefaultStats(&acc)
|
||||||
|
|
||||||
|
for key, _ := range WiredTigerStats {
|
||||||
|
assert.True(t, acc.HasFloatValue(key))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStateTag(t *testing.T) {
|
||||||
|
d := NewMongodbData(
|
||||||
|
&StatLine{
|
||||||
|
StorageEngine: "",
|
||||||
|
Time: time.Now(),
|
||||||
|
Insert: 0,
|
||||||
|
Query: 0,
|
||||||
|
NodeType: "PRI",
|
||||||
|
},
|
||||||
|
tags,
|
||||||
|
)
|
||||||
|
|
||||||
|
stats := []string{"inserts_per_sec", "queries_per_sec"}
|
||||||
|
|
||||||
|
stateTags := make(map[string]string)
|
||||||
|
stateTags["state"] = "PRI"
|
||||||
|
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
|
||||||
|
d.AddDefaultStats(&acc)
|
||||||
|
|
||||||
|
for _, key := range stats {
|
||||||
|
err := acc.ValidateTaggedValue(key, int64(0), stateTags)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,50 @@
|
||||||
|
package mongodb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net/url"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdb/telegraf/plugins"
|
||||||
|
"gopkg.in/mgo.v2"
|
||||||
|
"gopkg.in/mgo.v2/bson"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Server struct {
|
||||||
|
Url *url.URL
|
||||||
|
Session *mgo.Session
|
||||||
|
lastResult *ServerStatus
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) getDefaultTags() map[string]string {
|
||||||
|
tags := make(map[string]string)
|
||||||
|
tags["hostname"] = s.Url.Host
|
||||||
|
return tags
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) gatherData(acc plugins.Accumulator) error {
|
||||||
|
s.Session.SetMode(mgo.Eventual, true)
|
||||||
|
s.Session.SetSocketTimeout(0)
|
||||||
|
result := &ServerStatus{}
|
||||||
|
err := s.Session.DB("admin").Run(bson.D{{"serverStatus", 1}, {"recordStats", 0}}, result)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
s.lastResult = result
|
||||||
|
}()
|
||||||
|
|
||||||
|
result.SampleTime = time.Now()
|
||||||
|
if s.lastResult != nil && result != nil {
|
||||||
|
duration := result.SampleTime.Sub(s.lastResult.SampleTime)
|
||||||
|
durationInSeconds := int64(duration.Seconds())
|
||||||
|
if durationInSeconds == 0 {
|
||||||
|
durationInSeconds = 1
|
||||||
|
}
|
||||||
|
data := NewMongodbData(
|
||||||
|
NewStatLine(*s.lastResult, *result, s.Url.Host, true, durationInSeconds),
|
||||||
|
s.getDefaultTags(),
|
||||||
|
)
|
||||||
|
data.AddDefaultStats(acc)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -0,0 +1,43 @@
|
||||||
|
// +build integration
|
||||||
|
|
||||||
|
package mongodb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdb/telegraf/testutil"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestGetDefaultTags(t *testing.T) {
|
||||||
|
var tagTests = []struct {
|
||||||
|
in string
|
||||||
|
out string
|
||||||
|
}{
|
||||||
|
{"hostname", server.Url.Host},
|
||||||
|
}
|
||||||
|
defaultTags := server.getDefaultTags()
|
||||||
|
for _, tt := range tagTests {
|
||||||
|
if defaultTags[tt.in] != tt.out {
|
||||||
|
t.Errorf("expected %q, got %q", tt.out, defaultTags[tt.in])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAddDefaultStats(t *testing.T) {
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
|
||||||
|
err := server.gatherData(&acc)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
time.Sleep(time.Duration(1) * time.Second)
|
||||||
|
// need to call this twice so it can perform the diff
|
||||||
|
err = server.gatherData(&acc)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
for key, _ := range DefaultStats {
|
||||||
|
assert.True(t, acc.HasIntValue(key))
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,71 @@
|
||||||
|
// +build integration
|
||||||
|
|
||||||
|
package mongodb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"log"
|
||||||
|
"math/rand"
|
||||||
|
"net/url"
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"gopkg.in/mgo.v2"
|
||||||
|
)
|
||||||
|
|
||||||
|
var connect_url string
|
||||||
|
var server *Server
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
connect_url = os.Getenv("MONGODB_URL")
|
||||||
|
if connect_url == "" {
|
||||||
|
connect_url = "127.0.0.1:27017"
|
||||||
|
server = &Server{Url: &url.URL{Host: connect_url}}
|
||||||
|
} else {
|
||||||
|
full_url, err := url.Parse(connect_url)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Unable to parse URL (%s), %s\n", full_url, err.Error())
|
||||||
|
}
|
||||||
|
server = &Server{Url: full_url}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testSetup(m *testing.M) {
|
||||||
|
var err error
|
||||||
|
var dialAddrs []string
|
||||||
|
if server.Url.User != nil {
|
||||||
|
dialAddrs = []string{server.Url.String()}
|
||||||
|
} else {
|
||||||
|
dialAddrs = []string{server.Url.Host}
|
||||||
|
}
|
||||||
|
dialInfo, err := mgo.ParseURL(dialAddrs[0])
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Unable to parse URL (%s), %s\n", dialAddrs[0], err.Error())
|
||||||
|
}
|
||||||
|
dialInfo.Direct = true
|
||||||
|
dialInfo.Timeout = time.Duration(10) * time.Second
|
||||||
|
sess, err := mgo.DialWithInfo(dialInfo)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Unable to connect to MongoDB, %s\n", err.Error())
|
||||||
|
}
|
||||||
|
server.Session = sess
|
||||||
|
server.Session, _ = mgo.Dial(server.Url.Host)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalln(err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testTeardown(m *testing.M) {
|
||||||
|
server.Session.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMain(m *testing.M) {
|
||||||
|
// seed randomness for use with tests
|
||||||
|
rand.Seed(time.Now().UTC().UnixNano())
|
||||||
|
|
||||||
|
testSetup(m)
|
||||||
|
res := m.Run()
|
||||||
|
testTeardown(m)
|
||||||
|
|
||||||
|
os.Exit(res)
|
||||||
|
}
|
|
@ -0,0 +1,549 @@
|
||||||
|
/***
|
||||||
|
The code contained here came from https://github.com/mongodb/mongo-tools/blob/master/mongostat/stat_types.go
|
||||||
|
and contains modifications so that no other dependency from that project is needed. Other modifications included
|
||||||
|
removing uneccessary code specific to formatting the output and determine the current state of the database. It
|
||||||
|
is licensed under Apache Version 2.0, http://www.apache.org/licenses/LICENSE-2.0.html
|
||||||
|
***/
|
||||||
|
|
||||||
|
package mongodb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sort"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
MongosProcess = "mongos"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Flags to determine cases when to activate/deactivate columns for output.
|
||||||
|
const (
|
||||||
|
Always = 1 << iota // always activate the column
|
||||||
|
Discover // only active when mongostat is in discover mode
|
||||||
|
Repl // only active if one of the nodes being monitored is in a replset
|
||||||
|
Locks // only active if node is capable of calculating lock info
|
||||||
|
AllOnly // only active if mongostat was run with --all option
|
||||||
|
MMAPOnly // only active if node has mmap-specific fields
|
||||||
|
WTOnly // only active if node has wiredtiger-specific fields
|
||||||
|
)
|
||||||
|
|
||||||
|
type ServerStatus struct {
|
||||||
|
SampleTime time.Time `bson:""`
|
||||||
|
Host string `bson:"host"`
|
||||||
|
Version string `bson:"version"`
|
||||||
|
Process string `bson:"process"`
|
||||||
|
Pid int64 `bson:"pid"`
|
||||||
|
Uptime int64 `bson:"uptime"`
|
||||||
|
UptimeMillis int64 `bson:"uptimeMillis"`
|
||||||
|
UptimeEstimate int64 `bson:"uptimeEstimate"`
|
||||||
|
LocalTime time.Time `bson:"localTime"`
|
||||||
|
Asserts map[string]int64 `bson:"asserts"`
|
||||||
|
BackgroundFlushing *FlushStats `bson:"backgroundFlushing"`
|
||||||
|
ExtraInfo *ExtraInfo `bson:"extra_info"`
|
||||||
|
Connections *ConnectionStats `bson:"connections"`
|
||||||
|
Dur *DurStats `bson:"dur"`
|
||||||
|
GlobalLock *GlobalLockStats `bson:"globalLock"`
|
||||||
|
Locks map[string]LockStats `bson:"locks,omitempty"`
|
||||||
|
Network *NetworkStats `bson:"network"`
|
||||||
|
Opcounters *OpcountStats `bson:"opcounters"`
|
||||||
|
OpcountersRepl *OpcountStats `bson:"opcountersRepl"`
|
||||||
|
RecordStats *DBRecordStats `bson:"recordStats"`
|
||||||
|
Mem *MemStats `bson:"mem"`
|
||||||
|
Repl *ReplStatus `bson:"repl"`
|
||||||
|
ShardCursorType map[string]interface{} `bson:"shardCursorType"`
|
||||||
|
StorageEngine map[string]string `bson:"storageEngine"`
|
||||||
|
WiredTiger *WiredTiger `bson:"wiredTiger"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// WiredTiger stores information related to the WiredTiger storage engine.
|
||||||
|
type WiredTiger struct {
|
||||||
|
Transaction TransactionStats `bson:"transaction"`
|
||||||
|
Concurrent ConcurrentTransactions `bson:"concurrentTransactions"`
|
||||||
|
Cache CacheStats `bson:"cache"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type ConcurrentTransactions struct {
|
||||||
|
Write ConcurrentTransStats `bson:"write"`
|
||||||
|
Read ConcurrentTransStats `bson:"read"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type ConcurrentTransStats struct {
|
||||||
|
Out int64 `bson:"out"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// CacheStats stores cache statistics for WiredTiger.
|
||||||
|
type CacheStats struct {
|
||||||
|
TrackedDirtyBytes int64 `bson:"tracked dirty bytes in the cache"`
|
||||||
|
CurrentCachedBytes int64 `bson:"bytes currently in the cache"`
|
||||||
|
MaxBytesConfigured int64 `bson:"maximum bytes configured"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// TransactionStats stores transaction checkpoints in WiredTiger.
|
||||||
|
type TransactionStats struct {
|
||||||
|
TransCheckpoints int64 `bson:"transaction checkpoints"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReplStatus stores data related to replica sets.
|
||||||
|
type ReplStatus struct {
|
||||||
|
SetName interface{} `bson:"setName"`
|
||||||
|
IsMaster interface{} `bson:"ismaster"`
|
||||||
|
Secondary interface{} `bson:"secondary"`
|
||||||
|
IsReplicaSet interface{} `bson:"isreplicaset"`
|
||||||
|
ArbiterOnly interface{} `bson:"arbiterOnly"`
|
||||||
|
Hosts []string `bson:"hosts"`
|
||||||
|
Passives []string `bson:"passives"`
|
||||||
|
Me string `bson:"me"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// DBRecordStats stores data related to memory operations across databases.
|
||||||
|
type DBRecordStats struct {
|
||||||
|
AccessesNotInMemory int64 `bson:"accessesNotInMemory"`
|
||||||
|
PageFaultExceptionsThrown int64 `bson:"pageFaultExceptionsThrown"`
|
||||||
|
DBRecordAccesses map[string]RecordAccesses `bson:",inline"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// RecordAccesses stores data related to memory operations scoped to a database.
|
||||||
|
type RecordAccesses struct {
|
||||||
|
AccessesNotInMemory int64 `bson:"accessesNotInMemory"`
|
||||||
|
PageFaultExceptionsThrown int64 `bson:"pageFaultExceptionsThrown"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// MemStats stores data related to memory statistics.
|
||||||
|
type MemStats struct {
|
||||||
|
Bits int64 `bson:"bits"`
|
||||||
|
Resident int64 `bson:"resident"`
|
||||||
|
Virtual int64 `bson:"virtual"`
|
||||||
|
Supported interface{} `bson:"supported"`
|
||||||
|
Mapped int64 `bson:"mapped"`
|
||||||
|
MappedWithJournal int64 `bson:"mappedWithJournal"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// FlushStats stores information about memory flushes.
|
||||||
|
type FlushStats struct {
|
||||||
|
Flushes int64 `bson:"flushes"`
|
||||||
|
TotalMs int64 `bson:"total_ms"`
|
||||||
|
AverageMs float64 `bson:"average_ms"`
|
||||||
|
LastMs int64 `bson:"last_ms"`
|
||||||
|
LastFinished time.Time `bson:"last_finished"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// ConnectionStats stores information related to incoming database connections.
|
||||||
|
type ConnectionStats struct {
|
||||||
|
Current int64 `bson:"current"`
|
||||||
|
Available int64 `bson:"available"`
|
||||||
|
TotalCreated int64 `bson:"totalCreated"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// DurTiming stores information related to journaling.
|
||||||
|
type DurTiming struct {
|
||||||
|
Dt int64 `bson:"dt"`
|
||||||
|
PrepLogBuffer int64 `bson:"prepLogBuffer"`
|
||||||
|
WriteToJournal int64 `bson:"writeToJournal"`
|
||||||
|
WriteToDataFiles int64 `bson:"writeToDataFiles"`
|
||||||
|
RemapPrivateView int64 `bson:"remapPrivateView"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// DurStats stores information related to journaling statistics.
|
||||||
|
type DurStats struct {
|
||||||
|
Commits int64 `bson:"commits"`
|
||||||
|
JournaledMB int64 `bson:"journaledMB"`
|
||||||
|
WriteToDataFilesMB int64 `bson:"writeToDataFilesMB"`
|
||||||
|
Compression int64 `bson:"compression"`
|
||||||
|
CommitsInWriteLock int64 `bson:"commitsInWriteLock"`
|
||||||
|
EarlyCommits int64 `bson:"earlyCommits"`
|
||||||
|
TimeMs DurTiming
|
||||||
|
}
|
||||||
|
|
||||||
|
// QueueStats stores the number of queued read/write operations.
|
||||||
|
type QueueStats struct {
|
||||||
|
Total int64 `bson:"total"`
|
||||||
|
Readers int64 `bson:"readers"`
|
||||||
|
Writers int64 `bson:"writers"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// ClientStats stores the number of active read/write operations.
|
||||||
|
type ClientStats struct {
|
||||||
|
Total int64 `bson:"total"`
|
||||||
|
Readers int64 `bson:"readers"`
|
||||||
|
Writers int64 `bson:"writers"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// GlobalLockStats stores information related locks in the MMAP storage engine.
|
||||||
|
type GlobalLockStats struct {
|
||||||
|
TotalTime int64 `bson:"totalTime"`
|
||||||
|
LockTime int64 `bson:"lockTime"`
|
||||||
|
CurrentQueue *QueueStats `bson:"currentQueue"`
|
||||||
|
ActiveClients *ClientStats `bson:"activeClients"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// NetworkStats stores information related to network traffic.
|
||||||
|
type NetworkStats struct {
|
||||||
|
BytesIn int64 `bson:"bytesIn"`
|
||||||
|
BytesOut int64 `bson:"bytesOut"`
|
||||||
|
NumRequests int64 `bson:"numRequests"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// OpcountStats stores information related to comamnds and basic CRUD operations.
|
||||||
|
type OpcountStats struct {
|
||||||
|
Insert int64 `bson:"insert"`
|
||||||
|
Query int64 `bson:"query"`
|
||||||
|
Update int64 `bson:"update"`
|
||||||
|
Delete int64 `bson:"delete"`
|
||||||
|
GetMore int64 `bson:"getmore"`
|
||||||
|
Command int64 `bson:"command"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReadWriteLockTimes stores time spent holding read/write locks.
|
||||||
|
type ReadWriteLockTimes struct {
|
||||||
|
Read int64 `bson:"R"`
|
||||||
|
Write int64 `bson:"W"`
|
||||||
|
ReadLower int64 `bson:"r"`
|
||||||
|
WriteLower int64 `bson:"w"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// LockStats stores information related to time spent acquiring/holding locks
|
||||||
|
// for a given database.
|
||||||
|
type LockStats struct {
|
||||||
|
TimeLockedMicros ReadWriteLockTimes `bson:"timeLockedMicros"`
|
||||||
|
TimeAcquiringMicros ReadWriteLockTimes `bson:"timeAcquiringMicros"`
|
||||||
|
|
||||||
|
// AcquireCount is a new field of the lock stats only populated on 3.0 or newer.
|
||||||
|
// Typed as a pointer so that if it is nil, mongostat can assume the field is not populated
|
||||||
|
// with real namespace data.
|
||||||
|
AcquireCount *ReadWriteLockTimes `bson:"acquireCount,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// ExtraInfo stores additional platform specific information.
|
||||||
|
type ExtraInfo struct {
|
||||||
|
PageFaults *int64 `bson:"page_faults"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// StatHeader describes a single column for mongostat's terminal output,
|
||||||
|
// its formatting, and in which modes it should be displayed.
|
||||||
|
type StatHeader struct {
|
||||||
|
// The text to appear in the column's header cell
|
||||||
|
HeaderText string
|
||||||
|
|
||||||
|
// Bitmask containing flags to determine if this header is active or not
|
||||||
|
ActivateFlags int
|
||||||
|
}
|
||||||
|
|
||||||
|
// StatHeaders are the complete set of data metrics supported by mongostat.
|
||||||
|
var StatHeaders = []StatHeader{
|
||||||
|
{"", Always}, // placeholder for hostname column (blank header text)
|
||||||
|
{"insert", Always},
|
||||||
|
{"query", Always},
|
||||||
|
{"update", Always},
|
||||||
|
{"delete", Always},
|
||||||
|
{"getmore", Always},
|
||||||
|
{"command", Always},
|
||||||
|
{"% dirty", WTOnly},
|
||||||
|
{"% used", WTOnly},
|
||||||
|
{"flushes", Always},
|
||||||
|
{"mapped", MMAPOnly},
|
||||||
|
{"vsize", Always},
|
||||||
|
{"res", Always},
|
||||||
|
{"non-mapped", MMAPOnly | AllOnly},
|
||||||
|
{"faults", MMAPOnly},
|
||||||
|
{" locked db", Locks},
|
||||||
|
{"qr|qw", Always},
|
||||||
|
{"ar|aw", Always},
|
||||||
|
{"netIn", Always},
|
||||||
|
{"netOut", Always},
|
||||||
|
{"conn", Always},
|
||||||
|
{"set", Repl},
|
||||||
|
{"repl", Repl},
|
||||||
|
{"time", Always},
|
||||||
|
}
|
||||||
|
|
||||||
|
// NamespacedLocks stores information on the LockStatus of namespaces.
|
||||||
|
type NamespacedLocks map[string]LockStatus
|
||||||
|
|
||||||
|
// LockUsage stores information related to a namespace's lock usage.
|
||||||
|
type LockUsage struct {
|
||||||
|
Namespace string
|
||||||
|
Reads int64
|
||||||
|
Writes int64
|
||||||
|
}
|
||||||
|
|
||||||
|
type lockUsages []LockUsage
|
||||||
|
|
||||||
|
func percentageInt64(value, outOf int64) float64 {
|
||||||
|
if value == 0 || outOf == 0 {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
return 100 * (float64(value) / float64(outOf))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (slice lockUsages) Len() int {
|
||||||
|
return len(slice)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (slice lockUsages) Less(i, j int) bool {
|
||||||
|
return slice[i].Reads+slice[i].Writes < slice[j].Reads+slice[j].Writes
|
||||||
|
}
|
||||||
|
|
||||||
|
func (slice lockUsages) Swap(i, j int) {
|
||||||
|
slice[i], slice[j] = slice[j], slice[i]
|
||||||
|
}
|
||||||
|
|
||||||
|
// LockStatus stores a database's lock statistics.
|
||||||
|
type LockStatus struct {
|
||||||
|
DBName string
|
||||||
|
Percentage float64
|
||||||
|
Global bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// StatLine is a wrapper for all metrics reported by mongostat for monitored hosts.
|
||||||
|
type StatLine struct {
|
||||||
|
Key string
|
||||||
|
// What storage engine is being used for the node with this stat line
|
||||||
|
StorageEngine string
|
||||||
|
|
||||||
|
Error error
|
||||||
|
IsMongos bool
|
||||||
|
Host string
|
||||||
|
|
||||||
|
// The time at which this StatLine was generated.
|
||||||
|
Time time.Time
|
||||||
|
|
||||||
|
// The last time at which this StatLine was printed to output.
|
||||||
|
LastPrinted time.Time
|
||||||
|
|
||||||
|
// Opcounter fields
|
||||||
|
Insert, Query, Update, Delete, GetMore, Command int64
|
||||||
|
|
||||||
|
// Cache utilization (wiredtiger only)
|
||||||
|
CacheDirtyPercent float64
|
||||||
|
CacheUsedPercent float64
|
||||||
|
|
||||||
|
// Replicated Opcounter fields
|
||||||
|
InsertR, QueryR, UpdateR, DeleteR, GetMoreR, CommandR int64
|
||||||
|
Flushes int64
|
||||||
|
Mapped, Virtual, Resident, NonMapped int64
|
||||||
|
Faults int64
|
||||||
|
HighestLocked *LockStatus
|
||||||
|
QueuedReaders, QueuedWriters int64
|
||||||
|
ActiveReaders, ActiveWriters int64
|
||||||
|
NetIn, NetOut int64
|
||||||
|
NumConnections int64
|
||||||
|
ReplSetName string
|
||||||
|
NodeType string
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseLocks(stat ServerStatus) map[string]LockUsage {
|
||||||
|
returnVal := map[string]LockUsage{}
|
||||||
|
for namespace, lockInfo := range stat.Locks {
|
||||||
|
returnVal[namespace] = LockUsage{
|
||||||
|
namespace,
|
||||||
|
lockInfo.TimeLockedMicros.Read + lockInfo.TimeLockedMicros.ReadLower,
|
||||||
|
lockInfo.TimeLockedMicros.Write + lockInfo.TimeLockedMicros.WriteLower,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return returnVal
|
||||||
|
}
|
||||||
|
|
||||||
|
func computeLockDiffs(prevLocks, curLocks map[string]LockUsage) []LockUsage {
|
||||||
|
lockUsages := lockUsages(make([]LockUsage, 0, len(curLocks)))
|
||||||
|
for namespace, curUsage := range curLocks {
|
||||||
|
prevUsage, hasKey := prevLocks[namespace]
|
||||||
|
if !hasKey {
|
||||||
|
// This namespace didn't appear in the previous batch of lock info,
|
||||||
|
// so we can't compute a diff for it - skip it.
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// Calculate diff of lock usage for this namespace and add to the list
|
||||||
|
lockUsages = append(lockUsages,
|
||||||
|
LockUsage{
|
||||||
|
namespace,
|
||||||
|
curUsage.Reads - prevUsage.Reads,
|
||||||
|
curUsage.Writes - prevUsage.Writes,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
// Sort the array in order of least to most locked
|
||||||
|
sort.Sort(lockUsages)
|
||||||
|
return lockUsages
|
||||||
|
}
|
||||||
|
|
||||||
|
func diff(newVal, oldVal, sampleTime int64) int64 {
|
||||||
|
return (newVal - oldVal) / sampleTime
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewStatLine constructs a StatLine object from two ServerStatus objects.
|
||||||
|
func NewStatLine(oldStat, newStat ServerStatus, key string, all bool, sampleSecs int64) *StatLine {
|
||||||
|
returnVal := &StatLine{
|
||||||
|
Key: key,
|
||||||
|
Host: newStat.Host,
|
||||||
|
Mapped: -1,
|
||||||
|
Virtual: -1,
|
||||||
|
Resident: -1,
|
||||||
|
NonMapped: -1,
|
||||||
|
Faults: -1,
|
||||||
|
}
|
||||||
|
|
||||||
|
// set the storage engine appropriately
|
||||||
|
if newStat.StorageEngine != nil && newStat.StorageEngine["name"] != "" {
|
||||||
|
returnVal.StorageEngine = newStat.StorageEngine["name"]
|
||||||
|
} else {
|
||||||
|
returnVal.StorageEngine = "mmapv1"
|
||||||
|
}
|
||||||
|
|
||||||
|
if newStat.Opcounters != nil && oldStat.Opcounters != nil {
|
||||||
|
returnVal.Insert = diff(newStat.Opcounters.Insert, oldStat.Opcounters.Insert, sampleSecs)
|
||||||
|
returnVal.Query = diff(newStat.Opcounters.Query, oldStat.Opcounters.Query, sampleSecs)
|
||||||
|
returnVal.Update = diff(newStat.Opcounters.Update, oldStat.Opcounters.Update, sampleSecs)
|
||||||
|
returnVal.Delete = diff(newStat.Opcounters.Delete, oldStat.Opcounters.Delete, sampleSecs)
|
||||||
|
returnVal.GetMore = diff(newStat.Opcounters.GetMore, oldStat.Opcounters.GetMore, sampleSecs)
|
||||||
|
returnVal.Command = diff(newStat.Opcounters.Command, oldStat.Opcounters.Command, sampleSecs)
|
||||||
|
}
|
||||||
|
|
||||||
|
if newStat.OpcountersRepl != nil && oldStat.OpcountersRepl != nil {
|
||||||
|
returnVal.InsertR = diff(newStat.OpcountersRepl.Insert, oldStat.OpcountersRepl.Insert, sampleSecs)
|
||||||
|
returnVal.QueryR = diff(newStat.OpcountersRepl.Query, oldStat.OpcountersRepl.Query, sampleSecs)
|
||||||
|
returnVal.UpdateR = diff(newStat.OpcountersRepl.Update, oldStat.OpcountersRepl.Update, sampleSecs)
|
||||||
|
returnVal.DeleteR = diff(newStat.OpcountersRepl.Delete, oldStat.OpcountersRepl.Delete, sampleSecs)
|
||||||
|
returnVal.GetMoreR = diff(newStat.OpcountersRepl.GetMore, oldStat.OpcountersRepl.GetMore, sampleSecs)
|
||||||
|
returnVal.CommandR = diff(newStat.OpcountersRepl.Command, oldStat.OpcountersRepl.Command, sampleSecs)
|
||||||
|
}
|
||||||
|
|
||||||
|
returnVal.CacheDirtyPercent = -1
|
||||||
|
returnVal.CacheUsedPercent = -1
|
||||||
|
if newStat.WiredTiger != nil && oldStat.WiredTiger != nil {
|
||||||
|
returnVal.Flushes = newStat.WiredTiger.Transaction.TransCheckpoints - oldStat.WiredTiger.Transaction.TransCheckpoints
|
||||||
|
returnVal.CacheDirtyPercent = float64(newStat.WiredTiger.Cache.TrackedDirtyBytes) / float64(newStat.WiredTiger.Cache.MaxBytesConfigured)
|
||||||
|
returnVal.CacheUsedPercent = float64(newStat.WiredTiger.Cache.CurrentCachedBytes) / float64(newStat.WiredTiger.Cache.MaxBytesConfigured)
|
||||||
|
} else if newStat.BackgroundFlushing != nil && oldStat.BackgroundFlushing != nil {
|
||||||
|
returnVal.Flushes = newStat.BackgroundFlushing.Flushes - oldStat.BackgroundFlushing.Flushes
|
||||||
|
}
|
||||||
|
|
||||||
|
returnVal.Time = newStat.SampleTime
|
||||||
|
returnVal.IsMongos =
|
||||||
|
(newStat.ShardCursorType != nil || strings.HasPrefix(newStat.Process, MongosProcess))
|
||||||
|
|
||||||
|
// BEGIN code modification
|
||||||
|
if oldStat.Mem.Supported.(bool) {
|
||||||
|
// END code modification
|
||||||
|
if !returnVal.IsMongos {
|
||||||
|
returnVal.Mapped = newStat.Mem.Mapped
|
||||||
|
}
|
||||||
|
returnVal.Virtual = newStat.Mem.Virtual
|
||||||
|
returnVal.Resident = newStat.Mem.Resident
|
||||||
|
|
||||||
|
if !returnVal.IsMongos && all {
|
||||||
|
returnVal.NonMapped = newStat.Mem.Virtual - newStat.Mem.Mapped
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if newStat.Repl != nil {
|
||||||
|
setName, isReplSet := newStat.Repl.SetName.(string)
|
||||||
|
if isReplSet {
|
||||||
|
returnVal.ReplSetName = setName
|
||||||
|
}
|
||||||
|
// BEGIN code modification
|
||||||
|
if newStat.Repl.IsMaster.(bool) {
|
||||||
|
returnVal.NodeType = "PRI"
|
||||||
|
} else if newStat.Repl.Secondary.(bool) {
|
||||||
|
returnVal.NodeType = "SEC"
|
||||||
|
} else {
|
||||||
|
returnVal.NodeType = "UNK"
|
||||||
|
}
|
||||||
|
// END code modification
|
||||||
|
} else if returnVal.IsMongos {
|
||||||
|
returnVal.NodeType = "RTR"
|
||||||
|
}
|
||||||
|
|
||||||
|
if oldStat.ExtraInfo != nil && newStat.ExtraInfo != nil &&
|
||||||
|
oldStat.ExtraInfo.PageFaults != nil && newStat.ExtraInfo.PageFaults != nil {
|
||||||
|
returnVal.Faults = diff(*(newStat.ExtraInfo.PageFaults), *(oldStat.ExtraInfo.PageFaults), sampleSecs)
|
||||||
|
}
|
||||||
|
if !returnVal.IsMongos && oldStat.Locks != nil && oldStat.Locks != nil {
|
||||||
|
globalCheck, hasGlobal := oldStat.Locks["Global"]
|
||||||
|
if hasGlobal && globalCheck.AcquireCount != nil {
|
||||||
|
// This appears to be a 3.0+ server so the data in these fields do *not* refer to
|
||||||
|
// actual namespaces and thus we can't compute lock %.
|
||||||
|
returnVal.HighestLocked = nil
|
||||||
|
} else {
|
||||||
|
prevLocks := parseLocks(oldStat)
|
||||||
|
curLocks := parseLocks(newStat)
|
||||||
|
lockdiffs := computeLockDiffs(prevLocks, curLocks)
|
||||||
|
if len(lockdiffs) == 0 {
|
||||||
|
if newStat.GlobalLock != nil {
|
||||||
|
returnVal.HighestLocked = &LockStatus{
|
||||||
|
DBName: "",
|
||||||
|
Percentage: percentageInt64(newStat.GlobalLock.LockTime, newStat.GlobalLock.TotalTime),
|
||||||
|
Global: true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Get the entry with the highest lock
|
||||||
|
highestLocked := lockdiffs[len(lockdiffs)-1]
|
||||||
|
|
||||||
|
var timeDiffMillis int64
|
||||||
|
timeDiffMillis = newStat.UptimeMillis - oldStat.UptimeMillis
|
||||||
|
|
||||||
|
lockToReport := highestLocked.Writes
|
||||||
|
|
||||||
|
// if the highest locked namespace is not '.'
|
||||||
|
if highestLocked.Namespace != "." {
|
||||||
|
for _, namespaceLockInfo := range lockdiffs {
|
||||||
|
if namespaceLockInfo.Namespace == "." {
|
||||||
|
lockToReport += namespaceLockInfo.Writes
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// lock data is in microseconds and uptime is in milliseconds - so
|
||||||
|
// divide by 1000 so that they units match
|
||||||
|
lockToReport /= 1000
|
||||||
|
|
||||||
|
returnVal.HighestLocked = &LockStatus{
|
||||||
|
DBName: highestLocked.Namespace,
|
||||||
|
Percentage: percentageInt64(lockToReport, timeDiffMillis),
|
||||||
|
Global: false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
returnVal.HighestLocked = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if newStat.GlobalLock != nil {
|
||||||
|
hasWT := (newStat.WiredTiger != nil && oldStat.WiredTiger != nil)
|
||||||
|
//If we have wiredtiger stats, use those instead
|
||||||
|
if newStat.GlobalLock.CurrentQueue != nil {
|
||||||
|
if hasWT {
|
||||||
|
returnVal.QueuedReaders = newStat.GlobalLock.CurrentQueue.Readers + newStat.GlobalLock.ActiveClients.Readers - newStat.WiredTiger.Concurrent.Read.Out
|
||||||
|
returnVal.QueuedWriters = newStat.GlobalLock.CurrentQueue.Writers + newStat.GlobalLock.ActiveClients.Writers - newStat.WiredTiger.Concurrent.Write.Out
|
||||||
|
if returnVal.QueuedReaders < 0 {
|
||||||
|
returnVal.QueuedReaders = 0
|
||||||
|
}
|
||||||
|
if returnVal.QueuedWriters < 0 {
|
||||||
|
returnVal.QueuedWriters = 0
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
returnVal.QueuedReaders = newStat.GlobalLock.CurrentQueue.Readers
|
||||||
|
returnVal.QueuedWriters = newStat.GlobalLock.CurrentQueue.Writers
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if hasWT {
|
||||||
|
returnVal.ActiveReaders = newStat.WiredTiger.Concurrent.Read.Out
|
||||||
|
returnVal.ActiveWriters = newStat.WiredTiger.Concurrent.Write.Out
|
||||||
|
} else if newStat.GlobalLock.ActiveClients != nil {
|
||||||
|
returnVal.ActiveReaders = newStat.GlobalLock.ActiveClients.Readers
|
||||||
|
returnVal.ActiveWriters = newStat.GlobalLock.ActiveClients.Writers
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if oldStat.Network != nil && newStat.Network != nil {
|
||||||
|
returnVal.NetIn = diff(newStat.Network.BytesIn, oldStat.Network.BytesIn, sampleSecs)
|
||||||
|
returnVal.NetOut = diff(newStat.Network.BytesOut, oldStat.Network.BytesOut, sampleSecs)
|
||||||
|
}
|
||||||
|
|
||||||
|
if newStat.Connections != nil {
|
||||||
|
returnVal.NumConnections = newStat.Connections.Current
|
||||||
|
}
|
||||||
|
|
||||||
|
return returnVal
|
||||||
|
}
|
|
@ -8,7 +8,6 @@ import (
|
||||||
|
|
||||||
type Point struct {
|
type Point struct {
|
||||||
Measurement string
|
Measurement string
|
||||||
Value interface{}
|
|
||||||
Tags map[string]string
|
Tags map[string]string
|
||||||
Values map[string]interface{}
|
Values map[string]interface{}
|
||||||
Time time.Time
|
Time time.Time
|
||||||
|
@ -26,7 +25,7 @@ func (a *Accumulator) Add(measurement string, value interface{}, tags map[string
|
||||||
a.Points,
|
a.Points,
|
||||||
&Point{
|
&Point{
|
||||||
Measurement: measurement,
|
Measurement: measurement,
|
||||||
Value: value,
|
Values: map[string]interface{}{"value": value},
|
||||||
Tags: tags,
|
Tags: tags,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
@ -62,7 +61,7 @@ func (a *Accumulator) Get(measurement string) (*Point, bool) {
|
||||||
func (a *Accumulator) CheckValue(measurement string, val interface{}) bool {
|
func (a *Accumulator) CheckValue(measurement string, val interface{}) bool {
|
||||||
for _, p := range a.Points {
|
for _, p := range a.Points {
|
||||||
if p.Measurement == measurement {
|
if p.Measurement == measurement {
|
||||||
return p.Value == val
|
return p.Values["value"] == val
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -83,8 +82,8 @@ func (a *Accumulator) ValidateTaggedValue(measurement string, val interface{}, t
|
||||||
}
|
}
|
||||||
|
|
||||||
if p.Measurement == measurement {
|
if p.Measurement == measurement {
|
||||||
if p.Value != val {
|
if p.Values["value"] != val {
|
||||||
return fmt.Errorf("%v (%T) != %v (%T)", p.Value, p.Value, val, val)
|
return fmt.Errorf("%v (%T) != %v (%T)", p.Values["value"], p.Values["value"], val, val)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -100,7 +99,7 @@ func (a *Accumulator) ValidateValue(measurement string, val interface{}) error {
|
||||||
func (a *Accumulator) HasIntValue(measurement string) bool {
|
func (a *Accumulator) HasIntValue(measurement string) bool {
|
||||||
for _, p := range a.Points {
|
for _, p := range a.Points {
|
||||||
if p.Measurement == measurement {
|
if p.Measurement == measurement {
|
||||||
_, ok := p.Value.(int64)
|
_, ok := p.Values["value"].(int64)
|
||||||
return ok
|
return ok
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -111,7 +110,7 @@ func (a *Accumulator) HasIntValue(measurement string) bool {
|
||||||
func (a *Accumulator) HasFloatValue(measurement string) bool {
|
func (a *Accumulator) HasFloatValue(measurement string) bool {
|
||||||
for _, p := range a.Points {
|
for _, p := range a.Points {
|
||||||
if p.Measurement == measurement {
|
if p.Measurement == measurement {
|
||||||
_, ok := p.Value.(float64)
|
_, ok := p.Values["value"].(float64)
|
||||||
return ok
|
return ok
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue