MongoDB input plugin: Adding per DB stats (#1466)

This commit is contained in:
Victor Garcia 2016-07-19 13:47:12 +02:00 committed by Cameron Sparr
parent 5f14ad9fa1
commit cbf5a55c7d
7 changed files with 159 additions and 7 deletions

View File

@ -49,6 +49,7 @@ should now look like:
- [#1500](https://github.com/influxdata/telegraf/pull/1500): Aerospike plugin refactored to use official client lib.
- [#1434](https://github.com/influxdata/telegraf/pull/1434): Add measurement name arg to logparser plugin.
- [#1479](https://github.com/influxdata/telegraf/pull/1479): logparser: change resp_code from a field to a tag.
- [#1466](https://github.com/influxdata/telegraf/pull/1466): MongoDB input plugin: adding per DB stats from db.stats()
### Bugfixes

View File

@ -10,6 +10,7 @@
## mongodb://10.10.3.33:18832,
## 10.0.0.1:10000, etc.
servers = ["127.0.0.1:27017"]
gather_perdb_stats = false
```
For authenticated mongodb istances use connection mongdb connection URI
@ -52,3 +53,15 @@ and create a single measurement containing values e.g.
* ttl_passes_per_sec
* repl_lag
* jumbo_chunks (only if mongos or mongo config)
If gather_db_stats is set to true, it will also collect per database stats exposed by db.stats()
creating another measurement called mongodb_db_stats and containing values:
* collections
* objects
* avg_obj_size
* data_size
* storage_size
* num_extents
* indexes
* index_size
* ok

View File

@ -15,9 +15,10 @@ import (
)
type MongoDB struct {
Servers []string
Ssl Ssl
mongos map[string]*Server
Servers []string
Ssl Ssl
mongos map[string]*Server
GatherPerdbStats bool
}
type Ssl struct {
@ -32,6 +33,7 @@ var sampleConfig = `
## mongodb://10.10.3.33:18832,
## 10.0.0.1:10000, etc.
servers = ["127.0.0.1:27017"]
gather_perdb_stats = false
`
func (m *MongoDB) SampleConfig() string {
@ -135,7 +137,7 @@ func (m *MongoDB) gatherServer(server *Server, acc telegraf.Accumulator) error {
}
server.Session = sess
}
return server.gatherData(acc)
return server.gatherData(acc, m.GatherPerdbStats)
}
func init() {

View File

@ -12,6 +12,12 @@ type MongodbData struct {
StatLine *StatLine
Fields map[string]interface{}
Tags map[string]string
DbData []DbData
}
type DbData struct {
Name string
Fields map[string]interface{}
}
func NewMongodbData(statLine *StatLine, tags map[string]string) *MongodbData {
@ -22,6 +28,7 @@ func NewMongodbData(statLine *StatLine, tags map[string]string) *MongodbData {
StatLine: statLine,
Tags: tags,
Fields: make(map[string]interface{}),
DbData: []DbData{},
}
}
@ -72,6 +79,34 @@ var WiredTigerStats = map[string]string{
"percent_cache_used": "CacheUsedPercent",
}
var DbDataStats = map[string]string{
"collections": "Collections",
"objects": "Objects",
"avg_obj_size": "AvgObjSize",
"data_size": "DataSize",
"storage_size": "StorageSize",
"num_extents": "NumExtents",
"indexes": "Indexes",
"index_size": "IndexSize",
"ok": "Ok",
}
func (d *MongodbData) AddDbStats() {
for _, dbstat := range d.StatLine.DbStatsLines {
dbStatLine := reflect.ValueOf(&dbstat).Elem()
newDbData := &DbData{
Name: dbstat.Name,
Fields: make(map[string]interface{}),
}
newDbData.Fields["type"] = "db_stat"
for key, value := range DbDataStats {
val := dbStatLine.FieldByName(value).Interface()
newDbData.Fields[key] = val
}
d.DbData = append(d.DbData, *newDbData)
}
}
func (d *MongodbData) AddDefaultStats() {
statLine := reflect.ValueOf(d.StatLine).Elem()
d.addStat(statLine, DefaultStats)
@ -113,4 +148,15 @@ func (d *MongodbData) flush(acc telegraf.Accumulator) {
d.StatLine.Time,
)
d.Fields = make(map[string]interface{})
for _, db := range d.DbData {
d.Tags["db_name"] = db.Name
acc.AddFields(
"mongodb_db_stats",
db.Fields,
d.Tags,
d.StatLine.Time,
)
db.Fields = make(map[string]interface{})
}
}

View File

@ -22,7 +22,7 @@ func (s *Server) getDefaultTags() map[string]string {
return tags
}
func (s *Server) gatherData(acc telegraf.Accumulator) error {
func (s *Server) gatherData(acc telegraf.Accumulator, gatherDbStats bool) error {
s.Session.SetMode(mgo.Eventual, true)
s.Session.SetSocketTimeout(0)
result_server := &ServerStatus{}
@ -42,10 +42,34 @@ func (s *Server) gatherData(acc telegraf.Accumulator) error {
JumboChunksCount: int64(jumbo_chunks),
}
result_db_stats := &DbStats{}
if gatherDbStats == true {
names := []string{}
names, err = s.Session.DatabaseNames()
if err != nil {
log.Println("Error getting database names (" + err.Error() + ")")
}
for _, db_name := range names {
db_stat_line := &DbStatsData{}
err = s.Session.DB(db_name).Run(bson.D{{"dbStats", 1}}, db_stat_line)
if err != nil {
log.Println("Error getting db stats from " + db_name + "(" + err.Error() + ")")
}
db := &Db{
Name: db_name,
DbStatsData: db_stat_line,
}
result_db_stats.Dbs = append(result_db_stats.Dbs, *db)
}
}
result := &MongoStatus{
ServerStatus: result_server,
ReplSetStatus: result_repl,
ClusterStatus: result_cluster,
DbStats: result_db_stats,
}
defer func() {
@ -64,6 +88,7 @@ func (s *Server) gatherData(acc telegraf.Accumulator) error {
s.getDefaultTags(),
)
data.AddDefaultStats()
data.AddDbStats()
data.flush(acc)
}
return nil

View File

@ -29,12 +29,12 @@ func TestGetDefaultTags(t *testing.T) {
func TestAddDefaultStats(t *testing.T) {
var acc testutil.Accumulator
err := server.gatherData(&acc)
err := server.gatherData(&acc, false)
require.NoError(t, err)
time.Sleep(time.Duration(1) * time.Second)
// need to call this twice so it can perform the diff
err = server.gatherData(&acc)
err = server.gatherData(&acc, false)
require.NoError(t, err)
for key, _ := range DefaultStats {

View File

@ -35,6 +35,7 @@ type MongoStatus struct {
ServerStatus *ServerStatus
ReplSetStatus *ReplSetStatus
ClusterStatus *ClusterStatus
DbStats *DbStats
}
type ServerStatus struct {
@ -65,6 +66,32 @@ type ServerStatus struct {
Metrics *MetricsStats `bson:"metrics"`
}
// DbStats stores stats from all dbs
type DbStats struct {
Dbs []Db
}
// Db represent a single DB
type Db struct {
Name string
DbStatsData *DbStatsData
}
// DbStatsData stores stats from a db
type DbStatsData struct {
Db string `bson:"db"`
Collections int64 `bson:"collections"`
Objects int64 `bson:"objects"`
AvgObjSize float64 `bson:"avgObjSize"`
DataSize int64 `bson:"dataSize"`
StorageSize int64 `bson:"storageSize"`
NumExtents int64 `bson:"numExtents"`
Indexes int64 `bson:"indexes"`
IndexSize int64 `bson:"indexSize"`
Ok int64 `bson:"ok"`
GleStats interface{} `bson:"gleStats"`
}
// ClusterStatus stores information related to the whole cluster
type ClusterStatus struct {
JumboChunksCount int64
@ -396,6 +423,22 @@ type StatLine struct {
// Cluster fields
JumboChunksCount int64
// DB stats field
DbStatsLines []DbStatLine
}
type DbStatLine struct {
Name string
Collections int64
Objects int64
AvgObjSize float64
DataSize int64
StorageSize int64
NumExtents int64
Indexes int64
IndexSize int64
Ok int64
}
func parseLocks(stat ServerStatus) map[string]LockUsage {
@ -677,5 +720,27 @@ func NewStatLine(oldMongo, newMongo MongoStatus, key string, all bool, sampleSec
newClusterStat := *newMongo.ClusterStatus
returnVal.JumboChunksCount = newClusterStat.JumboChunksCount
newDbStats := *newMongo.DbStats
for _, db := range newDbStats.Dbs {
dbStatsData := db.DbStatsData
// mongos doesn't have the db key, so setting the db name
if dbStatsData.Db == "" {
dbStatsData.Db = db.Name
}
dbStatLine := &DbStatLine{
Name: dbStatsData.Db,
Collections: dbStatsData.Collections,
Objects: dbStatsData.Objects,
AvgObjSize: dbStatsData.AvgObjSize,
DataSize: dbStatsData.DataSize,
StorageSize: dbStatsData.StorageSize,
NumExtents: dbStatsData.NumExtents,
Indexes: dbStatsData.Indexes,
IndexSize: dbStatsData.IndexSize,
Ok: dbStatsData.Ok,
}
returnVal.DbStatsLines = append(returnVal.DbStatsLines, *dbStatLine)
}
return returnVal
}