Add repl_oplog_window_s metric to mongodb input (#3964)

This commit is contained in:
Matvey Kruglov 2018-04-07 02:34:47 +03:00 committed by Daniel Nelson
parent fb6390e7ab
commit 01ede2ea0b
5 changed files with 49 additions and 0 deletions

View File

@ -56,6 +56,7 @@ and create a single measurement containing values e.g.
* ttl_deletes_per_sec
* ttl_passes_per_sec
* repl_lag
* repl_oplog_window_s
* jumbo_chunks (only if mongos or mongo config)
If gather_db_stats is set to true, it will also collect per database stats exposed by db.stats()

View File

@ -60,6 +60,7 @@ var DefaultReplStats = map[string]string{
"member_status": "NodeType",
"state": "NodeState",
"repl_lag": "ReplLag",
"repl_oplog_window_s": "OplogTimeDiff",
}
var DefaultClusterStats = map[string]string{

View File

@ -162,6 +162,7 @@ func TestStateTag(t *testing.T) {
"repl_queries_per_sec": int64(0),
"repl_updates_per_sec": int64(0),
"repl_lag": int64(0),
"repl_oplog_window_s": int64(0),
"resident_megabytes": int64(0),
"updates_per_sec": int64(0),
"vsize_megabytes": int64(0),

View File

@ -22,6 +22,41 @@ func (s *Server) getDefaultTags() map[string]string {
return tags
}
type oplogEntry struct {
Timestamp bson.MongoTimestamp `bson:"ts"`
}
func (s *Server) gatherOplogStats() *OplogStats {
stats := &OplogStats{}
localdb := s.Session.DB("local")
op_first := oplogEntry{}
op_last := oplogEntry{}
query := bson.M{"ts": bson.M{"$exists": true}}
for _, collection_name := range []string{"oplog.rs", "oplog.$main"} {
if err := localdb.C(collection_name).Find(query).Sort("$natural").Limit(1).One(&op_first); err != nil {
if err == mgo.ErrNotFound {
continue
}
log.Println("E! Error getting first oplog entry (" + err.Error() + ")")
return stats
}
if err := localdb.C(collection_name).Find(query).Sort("-$natural").Limit(1).One(&op_last); err != nil {
if err == mgo.ErrNotFound {
continue
}
log.Println("E! Error getting last oplog entry (" + err.Error() + ")")
return stats
}
}
op_first_time := time.Unix(int64(op_first.Timestamp>>32), 0)
op_last_time := time.Unix(int64(op_last.Timestamp>>32), 0)
stats.TimeDiff = int64(op_last_time.Sub(op_first_time).Seconds())
return stats
}
func (s *Server) gatherData(acc telegraf.Accumulator, gatherDbStats bool) error {
s.Session.SetMode(mgo.Eventual, true)
s.Session.SetSocketTimeout(0)
@ -66,6 +101,8 @@ func (s *Server) gatherData(acc telegraf.Accumulator, gatherDbStats bool) error
log.Println("E! Error getting database shard stats (" + err.Error() + ")")
}
oplogStats := s.gatherOplogStats()
result_db_stats := &DbStats{}
if gatherDbStats == true {
names := []string{}
@ -99,6 +136,7 @@ func (s *Server) gatherData(acc telegraf.Accumulator, gatherDbStats bool) error
ClusterStatus: result_cluster,
DbStats: result_db_stats,
ShardStats: resultShards,
OplogStats: oplogStats,
}
defer func() {

View File

@ -35,6 +35,7 @@ type MongoStatus struct {
ClusterStatus *ClusterStatus
DbStats *DbStats
ShardStats *ShardStats
OplogStats *OplogStats
}
type ServerStatus struct {
@ -102,6 +103,11 @@ type ReplSetStatus struct {
MyState int64 `bson:"myState"`
}
// OplogStatus stores information from getReplicationInfo
type OplogStats struct {
TimeDiff int64
}
// ReplSetMember stores information related to a replica set member
type ReplSetMember struct {
Name string `bson:"name"`
@ -442,6 +448,7 @@ type StatLine struct {
// Replicated Opcounter fields
InsertR, QueryR, UpdateR, DeleteR, GetMoreR, CommandR int64
ReplLag int64
OplogTimeDiff int64
Flushes int64
Mapped, Virtual, Resident, NonMapped int64
Faults int64
@ -772,6 +779,7 @@ func NewStatLine(oldMongo, newMongo MongoStatus, key string, all bool, sampleSec
newClusterStat := *newMongo.ClusterStatus
returnVal.JumboChunksCount = newClusterStat.JumboChunksCount
returnVal.OplogTimeDiff = newMongo.OplogStats.TimeDiff
newDbStats := *newMongo.DbStats
for _, db := range newDbStats.Dbs {