228 lines
5.6 KiB
Go
228 lines
5.6 KiB
Go
package mongodb
|
|
|
|
import (
|
|
"log"
|
|
"net/url"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/influxdata/telegraf"
|
|
"gopkg.in/mgo.v2"
|
|
"gopkg.in/mgo.v2/bson"
|
|
)
|
|
|
|
type Server struct {
|
|
Url *url.URL
|
|
Session *mgo.Session
|
|
lastResult *MongoStatus
|
|
}
|
|
|
|
func (s *Server) getDefaultTags() map[string]string {
|
|
tags := make(map[string]string)
|
|
tags["hostname"] = s.Url.Host
|
|
return tags
|
|
}
|
|
|
|
type oplogEntry struct {
|
|
Timestamp bson.MongoTimestamp `bson:"ts"`
|
|
}
|
|
|
|
// IsAuthorization reports whether err looks like a MongoDB authorization
// failure, i.e. whether its message contains "not authorized".
//
// A nil error is never an authorization failure, so nil yields false
// instead of panicking on err.Error().
func IsAuthorization(err error) bool {
	if err == nil {
		return false
	}
	return strings.Contains(err.Error(), "not authorized")
}
|
|
|
|
func (s *Server) gatherOplogStats() *OplogStats {
|
|
stats := &OplogStats{}
|
|
localdb := s.Session.DB("local")
|
|
|
|
op_first := oplogEntry{}
|
|
op_last := oplogEntry{}
|
|
query := bson.M{"ts": bson.M{"$exists": true}}
|
|
|
|
for _, collection_name := range []string{"oplog.rs", "oplog.$main"} {
|
|
if err := localdb.C(collection_name).Find(query).Sort("$natural").Limit(1).One(&op_first); err != nil {
|
|
if err == mgo.ErrNotFound {
|
|
continue
|
|
}
|
|
if IsAuthorization(err) {
|
|
log.Println("D! Error getting first oplog entry (" + err.Error() + ")")
|
|
} else {
|
|
log.Println("E! Error getting first oplog entry (" + err.Error() + ")")
|
|
}
|
|
return stats
|
|
}
|
|
if err := localdb.C(collection_name).Find(query).Sort("-$natural").Limit(1).One(&op_last); err != nil {
|
|
if err == mgo.ErrNotFound || IsAuthorization(err) {
|
|
continue
|
|
}
|
|
if IsAuthorization(err) {
|
|
log.Println("D! Error getting first oplog entry (" + err.Error() + ")")
|
|
} else {
|
|
log.Println("E! Error getting first oplog entry (" + err.Error() + ")")
|
|
}
|
|
return stats
|
|
}
|
|
}
|
|
|
|
op_first_time := time.Unix(int64(op_first.Timestamp>>32), 0)
|
|
op_last_time := time.Unix(int64(op_last.Timestamp>>32), 0)
|
|
stats.TimeDiff = int64(op_last_time.Sub(op_first_time).Seconds())
|
|
return stats
|
|
}
|
|
|
|
func (s *Server) gatherData(acc telegraf.Accumulator, gatherDbStats bool, gatherColStats bool, colStatsDbs []string) error {
|
|
s.Session.SetMode(mgo.Eventual, true)
|
|
s.Session.SetSocketTimeout(0)
|
|
result_server := &ServerStatus{}
|
|
err := s.Session.DB("admin").Run(bson.D{
|
|
{
|
|
Name: "serverStatus",
|
|
Value: 1,
|
|
},
|
|
{
|
|
Name: "recordStats",
|
|
Value: 0,
|
|
},
|
|
}, result_server)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
result_repl := &ReplSetStatus{}
|
|
// ignore error because it simply indicates that the db is not a member
|
|
// in a replica set, which is fine.
|
|
_ = s.Session.DB("admin").Run(bson.D{
|
|
{
|
|
Name: "replSetGetStatus",
|
|
Value: 1,
|
|
},
|
|
}, result_repl)
|
|
|
|
jumbo_chunks, _ := s.Session.DB("config").C("chunks").Find(bson.M{"jumbo": true}).Count()
|
|
|
|
result_cluster := &ClusterStatus{
|
|
JumboChunksCount: int64(jumbo_chunks),
|
|
}
|
|
|
|
resultShards := &ShardStats{}
|
|
err = s.Session.DB("admin").Run(bson.D{
|
|
{
|
|
Name: "shardConnPoolStats",
|
|
Value: 1,
|
|
},
|
|
}, &resultShards)
|
|
if err != nil {
|
|
if IsAuthorization(err) {
|
|
log.Println("D! Error getting database shard stats (" + err.Error() + ")")
|
|
} else {
|
|
log.Println("E! Error getting database shard stats (" + err.Error() + ")")
|
|
}
|
|
}
|
|
|
|
oplogStats := s.gatherOplogStats()
|
|
|
|
result_db_stats := &DbStats{}
|
|
if gatherDbStats == true {
|
|
names := []string{}
|
|
names, err = s.Session.DatabaseNames()
|
|
if err != nil {
|
|
log.Println("E! Error getting database names (" + err.Error() + ")")
|
|
}
|
|
for _, db_name := range names {
|
|
db_stat_line := &DbStatsData{}
|
|
err = s.Session.DB(db_name).Run(bson.D{
|
|
{
|
|
Name: "dbStats",
|
|
Value: 1,
|
|
},
|
|
}, db_stat_line)
|
|
if err != nil {
|
|
log.Println("E! Error getting db stats from " + db_name + "(" + err.Error() + ")")
|
|
}
|
|
db := &Db{
|
|
Name: db_name,
|
|
DbStatsData: db_stat_line,
|
|
}
|
|
|
|
result_db_stats.Dbs = append(result_db_stats.Dbs, *db)
|
|
}
|
|
}
|
|
|
|
result_col_stats := &ColStats{}
|
|
if gatherColStats == true {
|
|
names := []string{}
|
|
names, err = s.Session.DatabaseNames()
|
|
if err != nil {
|
|
log.Println("E! Error getting database names (" + err.Error() + ")")
|
|
}
|
|
for _, db_name := range names {
|
|
if stringInSlice(db_name, colStatsDbs) || len(colStatsDbs) == 0 {
|
|
var colls []string
|
|
colls, err = s.Session.DB(db_name).CollectionNames()
|
|
if err != nil {
|
|
log.Println("E! Error getting collection names (" + err.Error() + ")")
|
|
}
|
|
for _, col_name := range colls {
|
|
col_stat_line := &ColStatsData{}
|
|
err = s.Session.DB(db_name).Run(bson.D{
|
|
{
|
|
Name: "collStats",
|
|
Value: col_name,
|
|
},
|
|
}, col_stat_line)
|
|
if err != nil {
|
|
log.Println("E! Error getting col stats from " + col_name + "(" + err.Error() + ")")
|
|
}
|
|
collection := &Collection{
|
|
Name: col_name,
|
|
DbName: db_name,
|
|
ColStatsData: col_stat_line,
|
|
}
|
|
result_col_stats.Collections = append(result_col_stats.Collections, *collection)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
result := &MongoStatus{
|
|
ServerStatus: result_server,
|
|
ReplSetStatus: result_repl,
|
|
ClusterStatus: result_cluster,
|
|
DbStats: result_db_stats,
|
|
ColStats: result_col_stats,
|
|
ShardStats: resultShards,
|
|
OplogStats: oplogStats,
|
|
}
|
|
|
|
defer func() {
|
|
s.lastResult = result
|
|
}()
|
|
|
|
result.SampleTime = time.Now()
|
|
if s.lastResult != nil && result != nil {
|
|
duration := result.SampleTime.Sub(s.lastResult.SampleTime)
|
|
durationInSeconds := int64(duration.Seconds())
|
|
if durationInSeconds == 0 {
|
|
durationInSeconds = 1
|
|
}
|
|
data := NewMongodbData(
|
|
NewStatLine(*s.lastResult, *result, s.Url.Host, true, durationInSeconds),
|
|
s.getDefaultTags(),
|
|
)
|
|
data.AddDefaultStats()
|
|
data.AddDbStats()
|
|
data.AddColStats()
|
|
data.AddShardHostStats()
|
|
data.flush(acc)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// stringInSlice reports whether a is exactly equal to at least one
// element of list. A nil or empty list never contains anything.
func stringInSlice(a string, list []string) bool {
	for i := 0; i < len(list); i++ {
		if list[i] == a {
			return true
		}
	}
	return false
}
|