Adding replication lag metric
This commit is contained in:
parent
4bcf157d88
commit
76973fa60a
|
@ -37,6 +37,7 @@ based on _prefix_ in addition to globs. This means that a filter like
|
||||||
- [#889](https://github.com/influxdata/telegraf/pull/889): Improved MySQL plugin. Thanks @maksadbek!
|
- [#889](https://github.com/influxdata/telegraf/pull/889): Improved MySQL plugin. Thanks @maksadbek!
|
||||||
- [#1060](https://github.com/influxdata/telegraf/pull/1060): TTL metrics added to MongoDB input plugin
|
- [#1060](https://github.com/influxdata/telegraf/pull/1060): TTL metrics added to MongoDB input plugin
|
||||||
- [#1056](https://github.com/influxdata/telegraf/pull/1056): Don't allow inputs to overwrite host tags.
|
- [#1056](https://github.com/influxdata/telegraf/pull/1056): Don't allow inputs to overwrite host tags.
|
||||||
|
- [#1066](https://github.com/influxdata/telegraf/pull/1066): Replication lag metrics for MongoDB input plugin
|
||||||
|
|
||||||
### Bugfixes
|
### Bugfixes
|
||||||
|
|
||||||
|
|
|
@ -50,3 +50,4 @@ and create a single measurement containing values e.g.
|
||||||
* vsize_megabytes
|
* vsize_megabytes
|
||||||
* ttl_deletes_per_sec
|
* ttl_deletes_per_sec
|
||||||
* ttl_passes_per_sec
|
* ttl_passes_per_sec
|
||||||
|
* repl_lag
|
||||||
|
|
|
@ -54,6 +54,7 @@ var DefaultReplStats = map[string]string{
|
||||||
"repl_getmores_per_sec": "GetMoreR",
|
"repl_getmores_per_sec": "GetMoreR",
|
||||||
"repl_commands_per_sec": "CommandR",
|
"repl_commands_per_sec": "CommandR",
|
||||||
"member_status": "NodeType",
|
"member_status": "NodeType",
|
||||||
|
"repl_lag": "ReplLag",
|
||||||
}
|
}
|
||||||
|
|
||||||
var MmapStats = map[string]string{
|
var MmapStats = map[string]string{
|
||||||
|
|
|
@ -127,6 +127,7 @@ func TestStateTag(t *testing.T) {
|
||||||
"repl_inserts_per_sec": int64(0),
|
"repl_inserts_per_sec": int64(0),
|
||||||
"repl_queries_per_sec": int64(0),
|
"repl_queries_per_sec": int64(0),
|
||||||
"repl_updates_per_sec": int64(0),
|
"repl_updates_per_sec": int64(0),
|
||||||
|
"repl_lag": int64(0),
|
||||||
"resident_megabytes": int64(0),
|
"resident_megabytes": int64(0),
|
||||||
"updates_per_sec": int64(0),
|
"updates_per_sec": int64(0),
|
||||||
"vsize_megabytes": int64(0),
|
"vsize_megabytes": int64(0),
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package mongodb
|
package mongodb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"log"
|
||||||
"net/url"
|
"net/url"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -12,7 +13,7 @@ import (
|
||||||
type Server struct {
|
type Server struct {
|
||||||
Url *url.URL
|
Url *url.URL
|
||||||
Session *mgo.Session
|
Session *mgo.Session
|
||||||
lastResult *ServerStatus
|
lastResult *MongoStatus
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) getDefaultTags() map[string]string {
|
func (s *Server) getDefaultTags() map[string]string {
|
||||||
|
@ -24,11 +25,22 @@ func (s *Server) getDefaultTags() map[string]string {
|
||||||
func (s *Server) gatherData(acc telegraf.Accumulator) error {
|
func (s *Server) gatherData(acc telegraf.Accumulator) error {
|
||||||
s.Session.SetMode(mgo.Eventual, true)
|
s.Session.SetMode(mgo.Eventual, true)
|
||||||
s.Session.SetSocketTimeout(0)
|
s.Session.SetSocketTimeout(0)
|
||||||
result := &ServerStatus{}
|
result_server := &ServerStatus{}
|
||||||
err := s.Session.DB("admin").Run(bson.D{{"serverStatus", 1}, {"recordStats", 0}}, result)
|
err := s.Session.DB("admin").Run(bson.D{{"serverStatus", 1}, {"recordStats", 0}}, result_server)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
result_repl := &ReplSetStatus{}
|
||||||
|
err = s.Session.DB("admin").Run(bson.D{{"replSetGetStatus", 1}}, result_repl)
|
||||||
|
if err != nil {
|
||||||
|
log.Println("Not gathering replica set status, member not in replica set")
|
||||||
|
}
|
||||||
|
|
||||||
|
result := &MongoStatus{
|
||||||
|
ServerStatus: result_server,
|
||||||
|
ReplSetStatus: result_repl,
|
||||||
|
}
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
s.lastResult = result
|
s.lastResult = result
|
||||||
}()
|
}()
|
||||||
|
|
|
@ -11,6 +11,8 @@ import (
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"gopkg.in/mgo.v2/bson"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -28,8 +30,13 @@ const (
|
||||||
WTOnly // only active if node has wiredtiger-specific fields
|
WTOnly // only active if node has wiredtiger-specific fields
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type MongoStatus struct {
|
||||||
|
SampleTime time.Time
|
||||||
|
ServerStatus *ServerStatus
|
||||||
|
ReplSetStatus *ReplSetStatus
|
||||||
|
}
|
||||||
|
|
||||||
type ServerStatus struct {
|
type ServerStatus struct {
|
||||||
SampleTime time.Time `bson:""`
|
|
||||||
Host string `bson:"host"`
|
Host string `bson:"host"`
|
||||||
Version string `bson:"version"`
|
Version string `bson:"version"`
|
||||||
Process string `bson:"process"`
|
Process string `bson:"process"`
|
||||||
|
@ -57,6 +64,19 @@ type ServerStatus struct {
|
||||||
Metrics *MetricsStats `bson:"metrics"`
|
Metrics *MetricsStats `bson:"metrics"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ReplSetStatus stores information from replSetGetStatus
|
||||||
|
type ReplSetStatus struct {
|
||||||
|
Members []ReplSetMember `bson:"members"`
|
||||||
|
MyState int64 `bson:"myState"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReplSetMember stores information related to a replica set member
|
||||||
|
type ReplSetMember struct {
|
||||||
|
Name string `bson:"name"`
|
||||||
|
State int64 `bson:"state"`
|
||||||
|
Optime *bson.MongoTimestamp `bson:"optime"`
|
||||||
|
}
|
||||||
|
|
||||||
// WiredTiger stores information related to the WiredTiger storage engine.
|
// WiredTiger stores information related to the WiredTiger storage engine.
|
||||||
type WiredTiger struct {
|
type WiredTiger struct {
|
||||||
Transaction TransactionStats `bson:"transaction"`
|
Transaction TransactionStats `bson:"transaction"`
|
||||||
|
@ -356,6 +376,7 @@ type StatLine struct {
|
||||||
|
|
||||||
// Replicated Opcounter fields
|
// Replicated Opcounter fields
|
||||||
InsertR, QueryR, UpdateR, DeleteR, GetMoreR, CommandR int64
|
InsertR, QueryR, UpdateR, DeleteR, GetMoreR, CommandR int64
|
||||||
|
ReplLag int64
|
||||||
Flushes int64
|
Flushes int64
|
||||||
Mapped, Virtual, Resident, NonMapped int64
|
Mapped, Virtual, Resident, NonMapped int64
|
||||||
Faults int64
|
Faults int64
|
||||||
|
@ -410,8 +431,11 @@ func diff(newVal, oldVal, sampleTime int64) int64 {
|
||||||
return d / sampleTime
|
return d / sampleTime
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewStatLine constructs a StatLine object from two ServerStatus objects.
|
// NewStatLine constructs a StatLine object from two MongoStatus objects.
|
||||||
func NewStatLine(oldStat, newStat ServerStatus, key string, all bool, sampleSecs int64) *StatLine {
|
func NewStatLine(oldMongo, newMongo MongoStatus, key string, all bool, sampleSecs int64) *StatLine {
|
||||||
|
oldStat := *oldMongo.ServerStatus
|
||||||
|
newStat := *newMongo.ServerStatus
|
||||||
|
|
||||||
returnVal := &StatLine{
|
returnVal := &StatLine{
|
||||||
Key: key,
|
Key: key,
|
||||||
Host: newStat.Host,
|
Host: newStat.Host,
|
||||||
|
@ -462,7 +486,7 @@ func NewStatLine(oldStat, newStat ServerStatus, key string, all bool, sampleSecs
|
||||||
returnVal.Flushes = newStat.BackgroundFlushing.Flushes - oldStat.BackgroundFlushing.Flushes
|
returnVal.Flushes = newStat.BackgroundFlushing.Flushes - oldStat.BackgroundFlushing.Flushes
|
||||||
}
|
}
|
||||||
|
|
||||||
returnVal.Time = newStat.SampleTime
|
returnVal.Time = newMongo.SampleTime
|
||||||
returnVal.IsMongos =
|
returnVal.IsMongos =
|
||||||
(newStat.ShardCursorType != nil || strings.HasPrefix(newStat.Process, MongosProcess))
|
(newStat.ShardCursorType != nil || strings.HasPrefix(newStat.Process, MongosProcess))
|
||||||
|
|
||||||
|
@ -607,5 +631,39 @@ func NewStatLine(oldStat, newStat ServerStatus, key string, all bool, sampleSecs
|
||||||
returnVal.NumConnections = newStat.Connections.Current
|
returnVal.NumConnections = newStat.Connections.Current
|
||||||
}
|
}
|
||||||
|
|
||||||
|
newReplStat := *newMongo.ReplSetStatus
|
||||||
|
|
||||||
|
if newReplStat.Members != nil {
|
||||||
|
myName := newStat.Repl.Me
|
||||||
|
// Find the master and myself
|
||||||
|
master := ReplSetMember{}
|
||||||
|
me := ReplSetMember{}
|
||||||
|
for _, member := range newReplStat.Members {
|
||||||
|
if member.Name == myName {
|
||||||
|
if member.State == 1 {
|
||||||
|
// I'm the master
|
||||||
|
returnVal.ReplLag = 0
|
||||||
|
break
|
||||||
|
} else {
|
||||||
|
// I'm secondary
|
||||||
|
me = member
|
||||||
|
}
|
||||||
|
} else if member.State == 1 {
|
||||||
|
// Master found
|
||||||
|
master = member
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if me.Optime != nil && master.Optime != nil && me.State == 2 {
|
||||||
|
// MongoTimestamp type is int64 where the first 32bits are the unix timestamp
|
||||||
|
lag := int64(*master.Optime>>32 - *me.Optime>>32)
|
||||||
|
if lag < 0 {
|
||||||
|
returnVal.ReplLag = 0
|
||||||
|
} else {
|
||||||
|
returnVal.ReplLag = lag
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return returnVal
|
return returnVal
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue