Query oplog only when connected to a replica set (#6307)
This commit is contained in:
parent
701339b024
commit
2d2e793c90
|
@ -210,7 +210,7 @@ func (a *Agent) Test(ctx context.Context, waitDuration time.Duration) error {
|
||||||
// Special instructions for some inputs. cpu, for example, needs to be
|
// Special instructions for some inputs. cpu, for example, needs to be
|
||||||
// run twice in order to return cpu usage percentages.
|
// run twice in order to return cpu usage percentages.
|
||||||
switch input.Config.Name {
|
switch input.Config.Name {
|
||||||
case "inputs.cpu", "inputs.mongodb", "inputs.procstat":
|
case "cpu", "mongodb", "procstat":
|
||||||
nulAcc := NewAccumulator(input, nulC)
|
nulAcc := NewAccumulator(input, nulC)
|
||||||
nulAcc.SetPrecision(a.Precision())
|
nulAcc.SetPrecision(a.Precision())
|
||||||
if err := input.Input.Gather(nulAcc); err != nil {
|
if err := input.Input.Gather(nulAcc); err != nil {
|
||||||
|
|
|
@ -101,7 +101,6 @@ var DefaultReplStats = map[string]string{
|
||||||
"member_status": "NodeType",
|
"member_status": "NodeType",
|
||||||
"state": "NodeState",
|
"state": "NodeState",
|
||||||
"repl_lag": "ReplLag",
|
"repl_lag": "ReplLag",
|
||||||
"repl_oplog_window_sec": "OplogTimeDiff",
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var DefaultClusterStats = map[string]string{
|
var DefaultClusterStats = map[string]string{
|
||||||
|
@ -230,6 +229,11 @@ func (d *MongodbData) AddDefaultStats() {
|
||||||
if d.StatLine.NodeType != "" {
|
if d.StatLine.NodeType != "" {
|
||||||
d.addStat(statLine, DefaultReplStats)
|
d.addStat(statLine, DefaultReplStats)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if d.StatLine.OplogStats != nil {
|
||||||
|
d.add("repl_oplog_window_sec", d.StatLine.OplogStats.TimeDiff)
|
||||||
|
}
|
||||||
|
|
||||||
d.addStat(statLine, DefaultClusterStats)
|
d.addStat(statLine, DefaultClusterStats)
|
||||||
d.addStat(statLine, DefaultShardStats)
|
d.addStat(statLine, DefaultShardStats)
|
||||||
if d.StatLine.StorageEngine == "mmapv1" || d.StatLine.StorageEngine == "rocksdb" {
|
if d.StatLine.StorageEngine == "mmapv1" || d.StatLine.StorageEngine == "rocksdb" {
|
||||||
|
|
|
@ -232,7 +232,6 @@ func TestStateTag(t *testing.T) {
|
||||||
"repl_updates": int64(0),
|
"repl_updates": int64(0),
|
||||||
"repl_updates_per_sec": int64(0),
|
"repl_updates_per_sec": int64(0),
|
||||||
"repl_lag": int64(0),
|
"repl_lag": int64(0),
|
||||||
"repl_oplog_window_sec": int64(0),
|
|
||||||
"resident_megabytes": int64(0),
|
"resident_megabytes": int64(0),
|
||||||
"updates": int64(0),
|
"updates": int64(0),
|
||||||
"updates_per_sec": int64(0),
|
"updates_per_sec": int64(0),
|
||||||
|
|
|
@ -39,35 +39,116 @@ func authLogLevel(err error) string {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) gatherOplogStats() *OplogStats {
|
func (s *Server) gatherServerStatus() (*ServerStatus, error) {
|
||||||
stats := &OplogStats{}
|
serverStatus := &ServerStatus{}
|
||||||
localdb := s.Session.DB("local")
|
err := s.Session.DB("admin").Run(bson.D{
|
||||||
|
{
|
||||||
|
Name: "serverStatus",
|
||||||
|
Value: 1,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "recordStats",
|
||||||
|
Value: 0,
|
||||||
|
},
|
||||||
|
}, serverStatus)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return serverStatus, nil
|
||||||
|
}
|
||||||
|
|
||||||
op_first := oplogEntry{}
|
func (s *Server) gatherReplSetStatus() (*ReplSetStatus, error) {
|
||||||
op_last := oplogEntry{}
|
replSetStatus := &ReplSetStatus{}
|
||||||
query := bson.M{"ts": bson.M{"$exists": true}}
|
err := s.Session.DB("admin").Run(bson.D{
|
||||||
|
{
|
||||||
|
Name: "replSetGetStatus",
|
||||||
|
Value: 1,
|
||||||
|
},
|
||||||
|
}, replSetStatus)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return replSetStatus, nil
|
||||||
|
}
|
||||||
|
|
||||||
for _, collection_name := range []string{"oplog.rs", "oplog.$main"} {
|
func (s *Server) gatherClusterStatus() (*ClusterStatus, error) {
|
||||||
if err := localdb.C(collection_name).Find(query).Sort("$natural").Limit(1).One(&op_first); err != nil {
|
chunkCount, err := s.Session.DB("config").C("chunks").Find(bson.M{"jumbo": true}).Count()
|
||||||
if err == mgo.ErrNotFound {
|
if err != nil {
|
||||||
continue
|
return nil, err
|
||||||
}
|
|
||||||
log.Printf("%s [inputs.mongodb] Error getting first oplog entry: %v", authLogLevel(err), err)
|
|
||||||
return stats
|
|
||||||
}
|
|
||||||
if err := localdb.C(collection_name).Find(query).Sort("-$natural").Limit(1).One(&op_last); err != nil {
|
|
||||||
if err == mgo.ErrNotFound || IsAuthorization(err) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
log.Printf("%s [inputs.mongodb] Error getting first oplog entry: %v", authLogLevel(err), err)
|
|
||||||
return stats
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
op_first_time := time.Unix(int64(op_first.Timestamp>>32), 0)
|
return &ClusterStatus{
|
||||||
op_last_time := time.Unix(int64(op_last.Timestamp>>32), 0)
|
JumboChunksCount: int64(chunkCount),
|
||||||
stats.TimeDiff = int64(op_last_time.Sub(op_first_time).Seconds())
|
}, nil
|
||||||
return stats
|
}
|
||||||
|
|
||||||
|
func (s *Server) gatherShardConnPoolStats() (*ShardStats, error) {
|
||||||
|
shardStats := &ShardStats{}
|
||||||
|
err := s.Session.DB("admin").Run(bson.D{
|
||||||
|
{
|
||||||
|
Name: "shardConnPoolStats",
|
||||||
|
Value: 1,
|
||||||
|
},
|
||||||
|
}, &shardStats)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return shardStats, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) gatherDBStats(name string) (*Db, error) {
|
||||||
|
stats := &DbStatsData{}
|
||||||
|
err := s.Session.DB(name).Run(bson.D{
|
||||||
|
{
|
||||||
|
Name: "dbStats",
|
||||||
|
Value: 1,
|
||||||
|
},
|
||||||
|
}, stats)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &Db{
|
||||||
|
Name: name,
|
||||||
|
DbStatsData: stats,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) getOplogReplLag(collection string) (*OplogStats, error) {
|
||||||
|
query := bson.M{"ts": bson.M{"$exists": true}}
|
||||||
|
|
||||||
|
var first oplogEntry
|
||||||
|
err := s.Session.DB("local").C(collection).Find(query).Sort("$natural").Limit(1).One(&first)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var last oplogEntry
|
||||||
|
err = s.Session.DB("local").C(collection).Find(query).Sort("-$natural").Limit(1).One(&last)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
firstTime := time.Unix(int64(first.Timestamp>>32), 0)
|
||||||
|
lastTime := time.Unix(int64(last.Timestamp>>32), 0)
|
||||||
|
stats := &OplogStats{
|
||||||
|
TimeDiff: int64(lastTime.Sub(firstTime).Seconds()),
|
||||||
|
}
|
||||||
|
return stats, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// The "oplog.rs" collection is stored on all replica set members.
|
||||||
|
//
|
||||||
|
// The "oplog.$main" collection is created on the master node of a
|
||||||
|
// master-slave replicated deployment. As of MongoDB 3.2, master-slave
|
||||||
|
// replication has been deprecated.
|
||||||
|
func (s *Server) gatherOplogStats() (*OplogStats, error) {
|
||||||
|
stats, err := s.getOplogReplLag("oplog.rs")
|
||||||
|
if err == nil {
|
||||||
|
return stats, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return s.getOplogReplLag("oplog.$main")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) gatherCollectionStats(colStatsDbs []string) (*ColStats, error) {
|
func (s *Server) gatherCollectionStats(colStatsDbs []string) (*ColStats, error) {
|
||||||
|
@ -112,99 +193,71 @@ func (s *Server) gatherCollectionStats(colStatsDbs []string) (*ColStats, error)
|
||||||
func (s *Server) gatherData(acc telegraf.Accumulator, gatherDbStats bool, gatherColStats bool, colStatsDbs []string) error {
|
func (s *Server) gatherData(acc telegraf.Accumulator, gatherDbStats bool, gatherColStats bool, colStatsDbs []string) error {
|
||||||
s.Session.SetMode(mgo.Eventual, true)
|
s.Session.SetMode(mgo.Eventual, true)
|
||||||
s.Session.SetSocketTimeout(0)
|
s.Session.SetSocketTimeout(0)
|
||||||
result_server := &ServerStatus{}
|
|
||||||
err := s.Session.DB("admin").Run(bson.D{
|
serverStatus, err := s.gatherServerStatus()
|
||||||
{
|
|
||||||
Name: "serverStatus",
|
|
||||||
Value: 1,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Name: "recordStats",
|
|
||||||
Value: 0,
|
|
||||||
},
|
|
||||||
}, result_server)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
result_repl := &ReplSetStatus{}
|
|
||||||
// ignore error because it simply indicates that the db is not a member
|
|
||||||
// in a replica set, which is fine.
|
|
||||||
_ = s.Session.DB("admin").Run(bson.D{
|
|
||||||
{
|
|
||||||
Name: "replSetGetStatus",
|
|
||||||
Value: 1,
|
|
||||||
},
|
|
||||||
}, result_repl)
|
|
||||||
|
|
||||||
jumbo_chunks, _ := s.Session.DB("config").C("chunks").Find(bson.M{"jumbo": true}).Count()
|
// Get replica set status, an error indicates that the server is not a
|
||||||
|
// member of a replica set.
|
||||||
result_cluster := &ClusterStatus{
|
replSetStatus, err := s.gatherReplSetStatus()
|
||||||
JumboChunksCount: int64(jumbo_chunks),
|
|
||||||
}
|
|
||||||
|
|
||||||
resultShards := &ShardStats{}
|
|
||||||
err = s.Session.DB("admin").Run(bson.D{
|
|
||||||
{
|
|
||||||
Name: "shardConnPoolStats",
|
|
||||||
Value: 1,
|
|
||||||
},
|
|
||||||
}, &resultShards)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if IsAuthorization(err) {
|
log.Printf("D! [inputs.mongodb] Unable to gather replica set status: %v", err)
|
||||||
log.Printf("D! [inputs.mongodb] Error getting database shard stats: %v", err)
|
|
||||||
} else {
|
|
||||||
log.Printf("E! [inputs.mongodb] Error getting database shard stats: %v", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
oplogStats := s.gatherOplogStats()
|
// Gather the oplog if we are a member of a replica set. Non-replica set
|
||||||
|
// members do not have the oplog collections.
|
||||||
result_db_stats := &DbStats{}
|
var oplogStats *OplogStats
|
||||||
if gatherDbStats == true {
|
if replSetStatus != nil {
|
||||||
names := []string{}
|
oplogStats, err = s.gatherOplogStats()
|
||||||
names, err = s.Session.DatabaseNames()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("E! [inputs.mongodb] Error getting database names: %v", err)
|
return err
|
||||||
}
|
|
||||||
for _, db_name := range names {
|
|
||||||
db_stat_line := &DbStatsData{}
|
|
||||||
err = s.Session.DB(db_name).Run(bson.D{
|
|
||||||
{
|
|
||||||
Name: "dbStats",
|
|
||||||
Value: 1,
|
|
||||||
},
|
|
||||||
}, db_stat_line)
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("E! [inputs.mongodb] Error getting db stats from %q: %v", db_name, err)
|
|
||||||
}
|
|
||||||
db := &Db{
|
|
||||||
Name: db_name,
|
|
||||||
DbStatsData: db_stat_line,
|
|
||||||
}
|
|
||||||
|
|
||||||
result_db_stats.Dbs = append(result_db_stats.Dbs, *db)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
result_col_stats, err := s.gatherCollectionStats(colStatsDbs)
|
clusterStatus, err := s.gatherClusterStatus()
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("D! [inputs.mongodb] Unable to gather cluster status: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
shardStats, err := s.gatherShardConnPoolStats()
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("%s [inputs.mongodb] Unable to gather shard connection pool stats: %v",
|
||||||
|
authLogLevel(err), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
collectionStats, err := s.gatherCollectionStats(colStatsDbs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
dbStats := &DbStats{}
|
||||||
|
if gatherDbStats {
|
||||||
|
names, err := s.Session.DatabaseNames()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, name := range names {
|
||||||
|
db, err := s.gatherDBStats(name)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("D! [inputs.mongodb] Error getting db stats from %q: %v", name, err)
|
||||||
|
}
|
||||||
|
dbStats.Dbs = append(dbStats.Dbs, *db)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
result := &MongoStatus{
|
result := &MongoStatus{
|
||||||
ServerStatus: result_server,
|
ServerStatus: serverStatus,
|
||||||
ReplSetStatus: result_repl,
|
ReplSetStatus: replSetStatus,
|
||||||
ClusterStatus: result_cluster,
|
ClusterStatus: clusterStatus,
|
||||||
DbStats: result_db_stats,
|
DbStats: dbStats,
|
||||||
ColStats: result_col_stats,
|
ColStats: collectionStats,
|
||||||
ShardStats: resultShards,
|
ShardStats: shardStats,
|
||||||
OplogStats: oplogStats,
|
OplogStats: oplogStats,
|
||||||
}
|
}
|
||||||
|
|
||||||
defer func() {
|
|
||||||
s.lastResult = result
|
|
||||||
}()
|
|
||||||
|
|
||||||
result.SampleTime = time.Now()
|
result.SampleTime = time.Now()
|
||||||
if s.lastResult != nil && result != nil {
|
if s.lastResult != nil && result != nil {
|
||||||
duration := result.SampleTime.Sub(s.lastResult.SampleTime)
|
duration := result.SampleTime.Sub(s.lastResult.SampleTime)
|
||||||
|
@ -222,6 +275,8 @@ func (s *Server) gatherData(acc telegraf.Accumulator, gatherDbStats bool, gather
|
||||||
data.AddShardHostStats()
|
data.AddShardHostStats()
|
||||||
data.flush(acc)
|
data.flush(acc)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.lastResult = result
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -541,7 +541,7 @@ type StatLine struct {
|
||||||
GetMoreR, GetMoreRCnt int64
|
GetMoreR, GetMoreRCnt int64
|
||||||
CommandR, CommandRCnt int64
|
CommandR, CommandRCnt int64
|
||||||
ReplLag int64
|
ReplLag int64
|
||||||
OplogTimeDiff int64
|
OplogStats *OplogStats
|
||||||
Flushes, FlushesCnt int64
|
Flushes, FlushesCnt int64
|
||||||
FlushesTotalTime int64
|
FlushesTotalTime int64
|
||||||
Mapped, Virtual, Resident, NonMapped int64
|
Mapped, Virtual, Resident, NonMapped int64
|
||||||
|
@ -890,66 +890,75 @@ func NewStatLine(oldMongo, newMongo MongoStatus, key string, all bool, sampleSec
|
||||||
returnVal.NumConnections = newStat.Connections.Current
|
returnVal.NumConnections = newStat.Connections.Current
|
||||||
}
|
}
|
||||||
|
|
||||||
newReplStat := *newMongo.ReplSetStatus
|
if newMongo.ReplSetStatus != nil {
|
||||||
|
newReplStat := *newMongo.ReplSetStatus
|
||||||
|
|
||||||
if newReplStat.Members != nil {
|
if newReplStat.Members != nil {
|
||||||
myName := newStat.Repl.Me
|
myName := newStat.Repl.Me
|
||||||
// Find the master and myself
|
// Find the master and myself
|
||||||
master := ReplSetMember{}
|
master := ReplSetMember{}
|
||||||
me := ReplSetMember{}
|
me := ReplSetMember{}
|
||||||
for _, member := range newReplStat.Members {
|
for _, member := range newReplStat.Members {
|
||||||
if member.Name == myName {
|
if member.Name == myName {
|
||||||
// Store my state string
|
// Store my state string
|
||||||
returnVal.NodeState = member.StateStr
|
returnVal.NodeState = member.StateStr
|
||||||
if member.State == 1 {
|
if member.State == 1 {
|
||||||
// I'm the master
|
// I'm the master
|
||||||
returnVal.ReplLag = 0
|
returnVal.ReplLag = 0
|
||||||
break
|
break
|
||||||
} else {
|
} else {
|
||||||
// I'm secondary
|
// I'm secondary
|
||||||
me = member
|
me = member
|
||||||
|
}
|
||||||
|
} else if member.State == 1 {
|
||||||
|
// Master found
|
||||||
|
master = member
|
||||||
}
|
}
|
||||||
} else if member.State == 1 {
|
|
||||||
// Master found
|
|
||||||
master = member
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
if me.State == 2 {
|
if me.State == 2 {
|
||||||
// OptimeDate.Unix() type is int64
|
// OptimeDate.Unix() type is int64
|
||||||
lag := master.OptimeDate.Unix() - me.OptimeDate.Unix()
|
lag := master.OptimeDate.Unix() - me.OptimeDate.Unix()
|
||||||
if lag < 0 {
|
if lag < 0 {
|
||||||
returnVal.ReplLag = 0
|
returnVal.ReplLag = 0
|
||||||
} else {
|
} else {
|
||||||
returnVal.ReplLag = lag
|
returnVal.ReplLag = lag
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
newClusterStat := *newMongo.ClusterStatus
|
if newMongo.ClusterStatus != nil {
|
||||||
returnVal.JumboChunksCount = newClusterStat.JumboChunksCount
|
newClusterStat := *newMongo.ClusterStatus
|
||||||
returnVal.OplogTimeDiff = newMongo.OplogStats.TimeDiff
|
returnVal.JumboChunksCount = newClusterStat.JumboChunksCount
|
||||||
|
}
|
||||||
|
|
||||||
newDbStats := *newMongo.DbStats
|
if newMongo.OplogStats != nil {
|
||||||
for _, db := range newDbStats.Dbs {
|
returnVal.OplogStats = newMongo.OplogStats
|
||||||
dbStatsData := db.DbStatsData
|
}
|
||||||
// mongos doesn't have the db key, so setting the db name
|
|
||||||
if dbStatsData.Db == "" {
|
if newMongo.DbStats != nil {
|
||||||
dbStatsData.Db = db.Name
|
newDbStats := *newMongo.DbStats
|
||||||
|
for _, db := range newDbStats.Dbs {
|
||||||
|
dbStatsData := db.DbStatsData
|
||||||
|
// mongos doesn't have the db key, so setting the db name
|
||||||
|
if dbStatsData.Db == "" {
|
||||||
|
dbStatsData.Db = db.Name
|
||||||
|
}
|
||||||
|
dbStatLine := &DbStatLine{
|
||||||
|
Name: dbStatsData.Db,
|
||||||
|
Collections: dbStatsData.Collections,
|
||||||
|
Objects: dbStatsData.Objects,
|
||||||
|
AvgObjSize: dbStatsData.AvgObjSize,
|
||||||
|
DataSize: dbStatsData.DataSize,
|
||||||
|
StorageSize: dbStatsData.StorageSize,
|
||||||
|
NumExtents: dbStatsData.NumExtents,
|
||||||
|
Indexes: dbStatsData.Indexes,
|
||||||
|
IndexSize: dbStatsData.IndexSize,
|
||||||
|
Ok: dbStatsData.Ok,
|
||||||
|
}
|
||||||
|
returnVal.DbStatsLines = append(returnVal.DbStatsLines, *dbStatLine)
|
||||||
}
|
}
|
||||||
dbStatLine := &DbStatLine{
|
|
||||||
Name: dbStatsData.Db,
|
|
||||||
Collections: dbStatsData.Collections,
|
|
||||||
Objects: dbStatsData.Objects,
|
|
||||||
AvgObjSize: dbStatsData.AvgObjSize,
|
|
||||||
DataSize: dbStatsData.DataSize,
|
|
||||||
StorageSize: dbStatsData.StorageSize,
|
|
||||||
NumExtents: dbStatsData.NumExtents,
|
|
||||||
Indexes: dbStatsData.Indexes,
|
|
||||||
IndexSize: dbStatsData.IndexSize,
|
|
||||||
Ok: dbStatsData.Ok,
|
|
||||||
}
|
|
||||||
returnVal.DbStatsLines = append(returnVal.DbStatsLines, *dbStatLine)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
newColStats := *newMongo.ColStats
|
newColStats := *newMongo.ColStats
|
||||||
|
|
Loading…
Reference in New Issue