Adding replication lag metric

closes #1066
This commit is contained in:
Victor Garcia 2016-04-20 01:16:22 +02:00 committed by Cameron Sparr
parent f9c8ed0dc3
commit 194288c00e
6 changed files with 81 additions and 7 deletions

View File

@ -40,6 +40,7 @@ based on _prefix_ in addition to globs. This means that a filter like
- [#1035](https://github.com/influxdata/telegraf/issues/1035): Add `user`, `exe`, `pidfile` tags to procstat plugin. - [#1035](https://github.com/influxdata/telegraf/issues/1035): Add `user`, `exe`, `pidfile` tags to procstat plugin.
- [#1041](https://github.com/influxdata/telegraf/issues/1041): Add `n_cpus` field to the system plugin. - [#1041](https://github.com/influxdata/telegraf/issues/1041): Add `n_cpus` field to the system plugin.
- [#1072](https://github.com/influxdata/telegraf/pull/1072): New Input Plugin: filestat. - [#1072](https://github.com/influxdata/telegraf/pull/1072): New Input Plugin: filestat.
- [#1066](https://github.com/influxdata/telegraf/pull/1066): Replication lag metrics for MongoDB input plugin
### Bugfixes ### Bugfixes

View File

@ -50,3 +50,4 @@ and create a single measurement containing values e.g.
* vsize_megabytes * vsize_megabytes
* ttl_deletes_per_sec * ttl_deletes_per_sec
* ttl_passes_per_sec * ttl_passes_per_sec
* repl_lag

View File

@ -54,6 +54,7 @@ var DefaultReplStats = map[string]string{
"repl_getmores_per_sec": "GetMoreR", "repl_getmores_per_sec": "GetMoreR",
"repl_commands_per_sec": "CommandR", "repl_commands_per_sec": "CommandR",
"member_status": "NodeType", "member_status": "NodeType",
"repl_lag": "ReplLag",
} }
var MmapStats = map[string]string{ var MmapStats = map[string]string{

View File

@ -127,6 +127,7 @@ func TestStateTag(t *testing.T) {
"repl_inserts_per_sec": int64(0), "repl_inserts_per_sec": int64(0),
"repl_queries_per_sec": int64(0), "repl_queries_per_sec": int64(0),
"repl_updates_per_sec": int64(0), "repl_updates_per_sec": int64(0),
"repl_lag": int64(0),
"resident_megabytes": int64(0), "resident_megabytes": int64(0),
"updates_per_sec": int64(0), "updates_per_sec": int64(0),
"vsize_megabytes": int64(0), "vsize_megabytes": int64(0),

View File

@ -1,6 +1,7 @@
package mongodb package mongodb
import ( import (
"log"
"net/url" "net/url"
"time" "time"
@ -12,7 +13,7 @@ import (
type Server struct { type Server struct {
Url *url.URL Url *url.URL
Session *mgo.Session Session *mgo.Session
lastResult *ServerStatus lastResult *MongoStatus
} }
func (s *Server) getDefaultTags() map[string]string { func (s *Server) getDefaultTags() map[string]string {
@ -24,11 +25,22 @@ func (s *Server) getDefaultTags() map[string]string {
func (s *Server) gatherData(acc telegraf.Accumulator) error { func (s *Server) gatherData(acc telegraf.Accumulator) error {
s.Session.SetMode(mgo.Eventual, true) s.Session.SetMode(mgo.Eventual, true)
s.Session.SetSocketTimeout(0) s.Session.SetSocketTimeout(0)
result := &ServerStatus{} result_server := &ServerStatus{}
err := s.Session.DB("admin").Run(bson.D{{"serverStatus", 1}, {"recordStats", 0}}, result) err := s.Session.DB("admin").Run(bson.D{{"serverStatus", 1}, {"recordStats", 0}}, result_server)
if err != nil { if err != nil {
return err return err
} }
result_repl := &ReplSetStatus{}
err = s.Session.DB("admin").Run(bson.D{{"replSetGetStatus", 1}}, result_repl)
if err != nil {
log.Println("Not gathering replica set status, member not in replica set")
}
result := &MongoStatus{
ServerStatus: result_server,
ReplSetStatus: result_repl,
}
defer func() { defer func() {
s.lastResult = result s.lastResult = result
}() }()

View File

@ -11,6 +11,8 @@ import (
"sort" "sort"
"strings" "strings"
"time" "time"
"gopkg.in/mgo.v2/bson"
) )
const ( const (
@ -28,8 +30,13 @@ const (
WTOnly // only active if node has wiredtiger-specific fields WTOnly // only active if node has wiredtiger-specific fields
) )
type MongoStatus struct {
SampleTime time.Time
ServerStatus *ServerStatus
ReplSetStatus *ReplSetStatus
}
type ServerStatus struct { type ServerStatus struct {
SampleTime time.Time `bson:""`
Host string `bson:"host"` Host string `bson:"host"`
Version string `bson:"version"` Version string `bson:"version"`
Process string `bson:"process"` Process string `bson:"process"`
@ -57,6 +64,19 @@ type ServerStatus struct {
Metrics *MetricsStats `bson:"metrics"` Metrics *MetricsStats `bson:"metrics"`
} }
// ReplSetStatus stores information from replSetGetStatus
type ReplSetStatus struct {
Members []ReplSetMember `bson:"members"`
MyState int64 `bson:"myState"`
}
// ReplSetMember stores information related to a replica set member
type ReplSetMember struct {
Name string `bson:"name"`
State int64 `bson:"state"`
Optime *bson.MongoTimestamp `bson:"optime"`
}
// WiredTiger stores information related to the WiredTiger storage engine. // WiredTiger stores information related to the WiredTiger storage engine.
type WiredTiger struct { type WiredTiger struct {
Transaction TransactionStats `bson:"transaction"` Transaction TransactionStats `bson:"transaction"`
@ -356,6 +376,7 @@ type StatLine struct {
// Replicated Opcounter fields // Replicated Opcounter fields
InsertR, QueryR, UpdateR, DeleteR, GetMoreR, CommandR int64 InsertR, QueryR, UpdateR, DeleteR, GetMoreR, CommandR int64
ReplLag int64
Flushes int64 Flushes int64
Mapped, Virtual, Resident, NonMapped int64 Mapped, Virtual, Resident, NonMapped int64
Faults int64 Faults int64
@ -410,8 +431,11 @@ func diff(newVal, oldVal, sampleTime int64) int64 {
return d / sampleTime return d / sampleTime
} }
// NewStatLine constructs a StatLine object from two ServerStatus objects. // NewStatLine constructs a StatLine object from two MongoStatus objects.
func NewStatLine(oldStat, newStat ServerStatus, key string, all bool, sampleSecs int64) *StatLine { func NewStatLine(oldMongo, newMongo MongoStatus, key string, all bool, sampleSecs int64) *StatLine {
oldStat := *oldMongo.ServerStatus
newStat := *newMongo.ServerStatus
returnVal := &StatLine{ returnVal := &StatLine{
Key: key, Key: key,
Host: newStat.Host, Host: newStat.Host,
@ -462,7 +486,7 @@ func NewStatLine(oldStat, newStat ServerStatus, key string, all bool, sampleSecs
returnVal.Flushes = newStat.BackgroundFlushing.Flushes - oldStat.BackgroundFlushing.Flushes returnVal.Flushes = newStat.BackgroundFlushing.Flushes - oldStat.BackgroundFlushing.Flushes
} }
returnVal.Time = newStat.SampleTime returnVal.Time = newMongo.SampleTime
returnVal.IsMongos = returnVal.IsMongos =
(newStat.ShardCursorType != nil || strings.HasPrefix(newStat.Process, MongosProcess)) (newStat.ShardCursorType != nil || strings.HasPrefix(newStat.Process, MongosProcess))
@ -607,5 +631,39 @@ func NewStatLine(oldStat, newStat ServerStatus, key string, all bool, sampleSecs
returnVal.NumConnections = newStat.Connections.Current returnVal.NumConnections = newStat.Connections.Current
} }
newReplStat := *newMongo.ReplSetStatus
if newReplStat.Members != nil {
myName := newStat.Repl.Me
// Find the master and myself
master := ReplSetMember{}
me := ReplSetMember{}
for _, member := range newReplStat.Members {
if member.Name == myName {
if member.State == 1 {
// I'm the master
returnVal.ReplLag = 0
break
} else {
// I'm secondary
me = member
}
} else if member.State == 1 {
// Master found
master = member
}
}
if me.Optime != nil && master.Optime != nil && me.State == 2 {
// MongoTimestamp type is int64 where the first 32bits are the unix timestamp
lag := int64(*master.Optime>>32 - *me.Optime>>32)
if lag < 0 {
returnVal.ReplLag = 0
} else {
returnVal.ReplLag = lag
}
}
}
return returnVal return returnVal
} }