diff --git a/agent/agent.go b/agent/agent.go index 700bccb05..e2ef79b84 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -210,7 +210,7 @@ func (a *Agent) Test(ctx context.Context, waitDuration time.Duration) error { // Special instructions for some inputs. cpu, for example, needs to be // run twice in order to return cpu usage percentages. switch input.Config.Name { - case "inputs.cpu", "inputs.mongodb", "inputs.procstat": + case "cpu", "mongodb", "procstat": nulAcc := NewAccumulator(input, nulC) nulAcc.SetPrecision(a.Precision()) if err := input.Input.Gather(nulAcc); err != nil { diff --git a/plugins/inputs/mongodb/mongodb_data.go b/plugins/inputs/mongodb/mongodb_data.go index c218fd3ad..6f999cbd7 100644 --- a/plugins/inputs/mongodb/mongodb_data.go +++ b/plugins/inputs/mongodb/mongodb_data.go @@ -101,7 +101,6 @@ var DefaultReplStats = map[string]string{ "member_status": "NodeType", "state": "NodeState", "repl_lag": "ReplLag", - "repl_oplog_window_sec": "OplogTimeDiff", } var DefaultClusterStats = map[string]string{ @@ -230,6 +229,11 @@ func (d *MongodbData) AddDefaultStats() { if d.StatLine.NodeType != "" { d.addStat(statLine, DefaultReplStats) } + + if d.StatLine.OplogStats != nil { + d.add("repl_oplog_window_sec", d.StatLine.OplogStats.TimeDiff) + } + d.addStat(statLine, DefaultClusterStats) d.addStat(statLine, DefaultShardStats) if d.StatLine.StorageEngine == "mmapv1" || d.StatLine.StorageEngine == "rocksdb" { diff --git a/plugins/inputs/mongodb/mongodb_data_test.go b/plugins/inputs/mongodb/mongodb_data_test.go index da50bdc9e..527e7ab93 100644 --- a/plugins/inputs/mongodb/mongodb_data_test.go +++ b/plugins/inputs/mongodb/mongodb_data_test.go @@ -232,7 +232,6 @@ func TestStateTag(t *testing.T) { "repl_updates": int64(0), "repl_updates_per_sec": int64(0), "repl_lag": int64(0), - "repl_oplog_window_sec": int64(0), "resident_megabytes": int64(0), "updates": int64(0), "updates_per_sec": int64(0), diff --git a/plugins/inputs/mongodb/mongodb_server.go 
b/plugins/inputs/mongodb/mongodb_server.go index 4df14c014..404fa8143 100644 --- a/plugins/inputs/mongodb/mongodb_server.go +++ b/plugins/inputs/mongodb/mongodb_server.go @@ -39,35 +39,116 @@ func authLogLevel(err error) string { } } -func (s *Server) gatherOplogStats() *OplogStats { - stats := &OplogStats{} - localdb := s.Session.DB("local") +func (s *Server) gatherServerStatus() (*ServerStatus, error) { + serverStatus := &ServerStatus{} + err := s.Session.DB("admin").Run(bson.D{ + { + Name: "serverStatus", + Value: 1, + }, + { + Name: "recordStats", + Value: 0, + }, + }, serverStatus) + if err != nil { + return nil, err + } + return serverStatus, nil +} - op_first := oplogEntry{} - op_last := oplogEntry{} - query := bson.M{"ts": bson.M{"$exists": true}} +func (s *Server) gatherReplSetStatus() (*ReplSetStatus, error) { + replSetStatus := &ReplSetStatus{} + err := s.Session.DB("admin").Run(bson.D{ + { + Name: "replSetGetStatus", + Value: 1, + }, + }, replSetStatus) + if err != nil { + return nil, err + } + return replSetStatus, nil +} - for _, collection_name := range []string{"oplog.rs", "oplog.$main"} { - if err := localdb.C(collection_name).Find(query).Sort("$natural").Limit(1).One(&op_first); err != nil { - if err == mgo.ErrNotFound { - continue - } - log.Printf("%s [inputs.mongodb] Error getting first oplog entry: %v", authLogLevel(err), err) - return stats - } - if err := localdb.C(collection_name).Find(query).Sort("-$natural").Limit(1).One(&op_last); err != nil { - if err == mgo.ErrNotFound || IsAuthorization(err) { - continue - } - log.Printf("%s [inputs.mongodb] Error getting first oplog entry: %v", authLogLevel(err), err) - return stats - } +func (s *Server) gatherClusterStatus() (*ClusterStatus, error) { + chunkCount, err := s.Session.DB("config").C("chunks").Find(bson.M{"jumbo": true}).Count() + if err != nil { + return nil, err } - op_first_time := time.Unix(int64(op_first.Timestamp>>32), 0) - op_last_time := time.Unix(int64(op_last.Timestamp>>32), 0) 
- stats.TimeDiff = int64(op_last_time.Sub(op_first_time).Seconds()) - return stats + return &ClusterStatus{ + JumboChunksCount: int64(chunkCount), + }, nil +} + +func (s *Server) gatherShardConnPoolStats() (*ShardStats, error) { + shardStats := &ShardStats{} + err := s.Session.DB("admin").Run(bson.D{ + { + Name: "shardConnPoolStats", + Value: 1, + }, + }, &shardStats) + if err != nil { + return nil, err + } + return shardStats, nil +} + +func (s *Server) gatherDBStats(name string) (*Db, error) { + stats := &DbStatsData{} + err := s.Session.DB(name).Run(bson.D{ + { + Name: "dbStats", + Value: 1, + }, + }, stats) + if err != nil { + return nil, err + } + + return &Db{ + Name: name, + DbStatsData: stats, + }, nil +} + +func (s *Server) getOplogReplLag(collection string) (*OplogStats, error) { + query := bson.M{"ts": bson.M{"$exists": true}} + + var first oplogEntry + err := s.Session.DB("local").C(collection).Find(query).Sort("$natural").Limit(1).One(&first) + if err != nil { + return nil, err + } + + var last oplogEntry + err = s.Session.DB("local").C(collection).Find(query).Sort("-$natural").Limit(1).One(&last) + if err != nil { + return nil, err + } + + firstTime := time.Unix(int64(first.Timestamp>>32), 0) + lastTime := time.Unix(int64(last.Timestamp>>32), 0) + stats := &OplogStats{ + TimeDiff: int64(lastTime.Sub(firstTime).Seconds()), + } + return stats, nil +} + +// The "oplog.rs" collection is stored on all replica set members. +// +// The "oplog.$main" collection is created on the master node of a +// master-slave replicated deployment. As of MongoDB 3.2, master-slave +// replication has been deprecated. 
+func (s *Server) gatherOplogStats() (*OplogStats, error) { + stats, err := s.getOplogReplLag("oplog.rs") + if err == nil { + return stats, nil + } + + return s.getOplogReplLag("oplog.$main") } func (s *Server) gatherCollectionStats(colStatsDbs []string) (*ColStats, error) { @@ -112,99 +193,74 @@ func (s *Server) gatherCollectionStats(colStatsDbs []string) (*ColStats, error) func (s *Server) gatherData(acc telegraf.Accumulator, gatherDbStats bool, gatherColStats bool, colStatsDbs []string) error { s.Session.SetMode(mgo.Eventual, true) s.Session.SetSocketTimeout(0) - result_server := &ServerStatus{} - err := s.Session.DB("admin").Run(bson.D{ - { - Name: "serverStatus", - Value: 1, - }, - { - Name: "recordStats", - Value: 0, - }, - }, result_server) + + serverStatus, err := s.gatherServerStatus() if err != nil { return err } - result_repl := &ReplSetStatus{} - // ignore error because it simply indicates that the db is not a member - // in a replica set, which is fine. - _ = s.Session.DB("admin").Run(bson.D{ - { - Name: "replSetGetStatus", - Value: 1, - }, - }, result_repl) - jumbo_chunks, _ := s.Session.DB("config").C("chunks").Find(bson.M{"jumbo": true}).Count() - - result_cluster := &ClusterStatus{ - JumboChunksCount: int64(jumbo_chunks), - } - - resultShards := &ShardStats{} - err = s.Session.DB("admin").Run(bson.D{ - { - Name: "shardConnPoolStats", - Value: 1, - }, - }, &resultShards) + // Get replica set status, an error indicates that the server is not a + // member of a replica set. + replSetStatus, err := s.gatherReplSetStatus() if err != nil { - if IsAuthorization(err) { - log.Printf("D! [inputs.mongodb] Error getting database shard stats: %v", err) - } else { - log.Printf("E! [inputs.mongodb] Error getting database shard stats: %v", err) - } + log.Printf("D! 
[inputs.mongodb] Unable to gather replica set status: %v", err) } - oplogStats := s.gatherOplogStats() - - result_db_stats := &DbStats{} - if gatherDbStats == true { - names := []string{} - names, err = s.Session.DatabaseNames() + // Gather the oplog if we are a member of a replica set. Non-replica set + // members do not have the oplog collections. + var oplogStats *OplogStats + if replSetStatus != nil { + oplogStats, err = s.gatherOplogStats() if err != nil { - log.Printf("E! [inputs.mongodb] Error getting database names: %v", err) - } - for _, db_name := range names { - db_stat_line := &DbStatsData{} - err = s.Session.DB(db_name).Run(bson.D{ - { - Name: "dbStats", - Value: 1, - }, - }, db_stat_line) - if err != nil { - log.Printf("E! [inputs.mongodb] Error getting db stats from %q: %v", db_name, err) - } - db := &Db{ - Name: db_name, - DbStatsData: db_stat_line, - } - - result_db_stats.Dbs = append(result_db_stats.Dbs, *db) + // Oplog stats are optional (consumers nil-check OplogStats), so log and keep gathering instead of aborting. + log.Printf("%s [inputs.mongodb] Unable to gather oplog stats: %v", authLogLevel(err), err) } } - result_col_stats, err := s.gatherCollectionStats(colStatsDbs) + clusterStatus, err := s.gatherClusterStatus() + if err != nil { + log.Printf("D! [inputs.mongodb] Unable to gather cluster status: %v", err) + } + + shardStats, err := s.gatherShardConnPoolStats() + if err != nil { + log.Printf("%s [inputs.mongodb] Unable to gather shard connection pool stats: %v", + authLogLevel(err), err) + } + + collectionStats, err := s.gatherCollectionStats(colStatsDbs) if err != nil { return err } + dbStats := &DbStats{} + if gatherDbStats { + names, err := s.Session.DatabaseNames() + if err != nil { + return err + } + + for _, name := range names { + db, err := s.gatherDBStats(name) + if err != nil { + log.Printf("D! 
[inputs.mongodb] Error getting db stats from %q: %v", name, err) + // db is nil when gatherDBStats fails; skip this database instead of dereferencing nil below. + continue + } + dbStats.Dbs = append(dbStats.Dbs, *db) + } + } + result := &MongoStatus{ - ServerStatus: result_server, - ReplSetStatus: result_repl, - ClusterStatus: result_cluster, - DbStats: result_db_stats, - ColStats: result_col_stats, - ShardStats: resultShards, + ServerStatus: serverStatus, + ReplSetStatus: replSetStatus, + ClusterStatus: clusterStatus, + DbStats: dbStats, + ColStats: collectionStats, + ShardStats: shardStats, OplogStats: oplogStats, } - defer func() { - s.lastResult = result - }() - result.SampleTime = time.Now() if s.lastResult != nil && result != nil { duration := result.SampleTime.Sub(s.lastResult.SampleTime) @@ -222,6 +278,8 @@ func (s *Server) gatherData(acc telegraf.Accumulator, gatherDbStats bool, gather data.AddShardHostStats() data.flush(acc) } + + s.lastResult = result return nil } diff --git a/plugins/inputs/mongodb/mongostat.go b/plugins/inputs/mongodb/mongostat.go index 709c074d7..d82100974 100644 --- a/plugins/inputs/mongodb/mongostat.go +++ b/plugins/inputs/mongodb/mongostat.go @@ -541,7 +541,7 @@ type StatLine struct { GetMoreR, GetMoreRCnt int64 CommandR, CommandRCnt int64 ReplLag int64 - OplogTimeDiff int64 + OplogStats *OplogStats Flushes, FlushesCnt int64 FlushesTotalTime int64 Mapped, Virtual, Resident, NonMapped int64 @@ -890,66 +890,75 @@ func NewStatLine(oldMongo, newMongo MongoStatus, key string, all bool, sampleSec returnVal.NumConnections = newStat.Connections.Current } - newReplStat := *newMongo.ReplSetStatus + if newMongo.ReplSetStatus != nil { + newReplStat := *newMongo.ReplSetStatus - if newReplStat.Members != nil { - myName := newStat.Repl.Me - // Find the master and myself - master := ReplSetMember{} - me := ReplSetMember{} - for _, member := range newReplStat.Members { - if member.Name == myName { - // Store my state string - returnVal.NodeState = member.StateStr - if member.State == 1 { - // I'm the master - returnVal.ReplLag = 0 - break - } else { - 
// I'm secondary - me = member + if newReplStat.Members != nil { + myName := newStat.Repl.Me + // Find the master and myself + master := ReplSetMember{} + me := ReplSetMember{} + for _, member := range newReplStat.Members { + if member.Name == myName { + // Store my state string + returnVal.NodeState = member.StateStr + if member.State == 1 { + // I'm the master + returnVal.ReplLag = 0 + break + } else { + // I'm secondary + me = member + } + } else if member.State == 1 { + // Master found + master = member } - } else if member.State == 1 { - // Master found - master = member } - } - if me.State == 2 { - // OptimeDate.Unix() type is int64 - lag := master.OptimeDate.Unix() - me.OptimeDate.Unix() - if lag < 0 { - returnVal.ReplLag = 0 - } else { - returnVal.ReplLag = lag + if me.State == 2 { + // OptimeDate.Unix() type is int64 + lag := master.OptimeDate.Unix() - me.OptimeDate.Unix() + if lag < 0 { + returnVal.ReplLag = 0 + } else { + returnVal.ReplLag = lag + } } } } - newClusterStat := *newMongo.ClusterStatus - returnVal.JumboChunksCount = newClusterStat.JumboChunksCount - returnVal.OplogTimeDiff = newMongo.OplogStats.TimeDiff + if newMongo.ClusterStatus != nil { + newClusterStat := *newMongo.ClusterStatus + returnVal.JumboChunksCount = newClusterStat.JumboChunksCount + } - newDbStats := *newMongo.DbStats - for _, db := range newDbStats.Dbs { - dbStatsData := db.DbStatsData - // mongos doesn't have the db key, so setting the db name - if dbStatsData.Db == "" { - dbStatsData.Db = db.Name + if newMongo.OplogStats != nil { + returnVal.OplogStats = newMongo.OplogStats + } + + if newMongo.DbStats != nil { + newDbStats := *newMongo.DbStats + for _, db := range newDbStats.Dbs { + dbStatsData := db.DbStatsData + // mongos doesn't have the db key, so setting the db name + if dbStatsData.Db == "" { + dbStatsData.Db = db.Name + } + dbStatLine := &DbStatLine{ + Name: dbStatsData.Db, + Collections: dbStatsData.Collections, + Objects: dbStatsData.Objects, + AvgObjSize: 
dbStatsData.AvgObjSize, + DataSize: dbStatsData.DataSize, + StorageSize: dbStatsData.StorageSize, + NumExtents: dbStatsData.NumExtents, + Indexes: dbStatsData.Indexes, + IndexSize: dbStatsData.IndexSize, + Ok: dbStatsData.Ok, + } + returnVal.DbStatsLines = append(returnVal.DbStatsLines, *dbStatLine) } - dbStatLine := &DbStatLine{ - Name: dbStatsData.Db, - Collections: dbStatsData.Collections, - Objects: dbStatsData.Objects, - AvgObjSize: dbStatsData.AvgObjSize, - DataSize: dbStatsData.DataSize, - StorageSize: dbStatsData.StorageSize, - NumExtents: dbStatsData.NumExtents, - Indexes: dbStatsData.Indexes, - IndexSize: dbStatsData.IndexSize, - Ok: dbStatsData.Ok, - } - returnVal.DbStatsLines = append(returnVal.DbStatsLines, *dbStatLine) } newColStats := *newMongo.ColStats