From 6bb34fcda384667ec5318af6f6dc6b1c0a97d25f Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Sat, 19 Dec 2015 13:31:22 -0700 Subject: [PATCH] 0.3.0: mongodb and jolokia --- plugins/jolokia/jolokia.go | 84 ++++++------------------------- plugins/mongodb/mongodb.go | 3 +- plugins/mongodb/mongodb_data.go | 32 +++++++----- plugins/mongodb/mongodb_server.go | 3 +- 4 files changed, 38 insertions(+), 84 deletions(-) diff --git a/plugins/jolokia/jolokia.go b/plugins/jolokia/jolokia.go index ee579433a..c3241a892 100644 --- a/plugins/jolokia/jolokia.go +++ b/plugins/jolokia/jolokia.go @@ -7,7 +7,6 @@ import ( "io/ioutil" "net/http" "net/url" - "strings" "github.com/influxdb/telegraf/plugins" ) @@ -23,8 +22,6 @@ type Server struct { type Metric struct { Name string Jmx string - Pass []string - Drop []string } type JolokiaClient interface { @@ -44,7 +41,6 @@ type Jolokia struct { Context string Servers []Server Metrics []Metric - Tags map[string]string } func (j *Jolokia) SampleConfig() string { @@ -52,10 +48,6 @@ func (j *Jolokia) SampleConfig() string { # This is the context root used to compose the jolokia url context = "/jolokia/read" - # Tags added to each measurements - [jolokia.tags] - group = "as" - # List of servers exposing jolokia read service [[plugins.jolokia.servers]] name = "stable" @@ -76,17 +68,11 @@ func (j *Jolokia) SampleConfig() string { [[plugins.jolokia.metrics]] name = "memory_eden" jmx = "/java.lang:type=MemoryPool,name=PS Eden Space/Usage" - drop = [ "committed" ] - # This passes only DaemonThreadCount and ThreadCount [[plugins.jolokia.metrics]] name = "heap_threads" jmx = "/java.lang:type=Threading" - pass = [ - "DaemonThreadCount", - "ThreadCount" - ] ` } @@ -100,12 +86,9 @@ func (j *Jolokia) getAttr(requestUrl *url.URL) (map[string]interface{}, error) { if err != nil { return nil, err } + defer req.Body.Close() resp, err := j.jClient.MakeRequest(req) - if err != nil { - return nil, err - } - if err != nil { return nil, err } @@ -137,65 +120,22 @@ func (j *Jolokia) getAttr(requestUrl *url.URL) (map[string]interface{}, error) { return jsonOut, nil } -func (m *Metric) shouldPass(field string) bool { - - if m.Pass != nil { - - for _, pass := range m.Pass { - if strings.HasPrefix(field, pass) { - return true - } - } - - return false - } - - if m.Drop != nil { - - for _, drop := range m.Drop { - if strings.HasPrefix(field, drop) { - return false - } - } - - return true - } - - return true -} - -func (m *Metric) filterFields(fields map[string]interface{}) map[string]interface{} { - - for field, _ := range fields { - if !m.shouldPass(field) { - delete(fields, field) - } - } - - return fields -} - func (j *Jolokia) Gather(acc plugins.Accumulator) error { - context := j.Context //"/jolokia/read" servers := j.Servers metrics := j.Metrics - tags := j.Tags - - if tags == nil { - tags = map[string]string{} - } + tags := make(map[string]string) for _, server := range servers { + tags["server"] = server.Name + tags["port"] = server.Port + tags["host"] = server.Host + fields := make(map[string]interface{}) for _, metric := range metrics { measurement := metric.Name jmxPath := metric.Jmx - tags["server"] = server.Name - tags["port"] = server.Port - tags["host"] = server.Host - // Prepare URL requestUrl, err := url.Parse("http://" + server.Host + ":" + server.Port + context + jmxPath) @@ -209,16 +149,20 @@ func (j *Jolokia) Gather(acc plugins.Accumulator) error { out, _ := j.getAttr(requestUrl) if values, ok := out["value"]; ok { - switch values.(type) { + switch t := values.(type) { case map[string]interface{}: - acc.AddFields(measurement, metric.filterFields(values.(map[string]interface{})), tags) + for k, v := range t { + fields[measurement+"_"+k] = v + } case interface{}: - acc.Add(measurement, values.(interface{}), tags) + fields[measurement] = t } } else { - fmt.Printf("Missing key 'value' in '%s' output response\n", requestUrl.String()) + fmt.Printf("Missing key 'value' in '%s' output response\n", + requestUrl.String()) } } + acc.AddFields("jolokia", fields, tags) } return nil diff --git a/plugins/mongodb/mongodb.go b/plugins/mongodb/mongodb.go index 87882a341..40c77931a 100644 --- a/plugins/mongodb/mongodb.go +++ b/plugins/mongodb/mongodb.go @@ -98,7 +98,8 @@ func (m *MongoDB) gatherServer(server *Server, acc plugins.Accumulator) error { } dialInfo, err := mgo.ParseURL(dialAddrs[0]) if err != nil { - return fmt.Errorf("Unable to parse URL (%s), %s\n", dialAddrs[0], err.Error()) + return fmt.Errorf("Unable to parse URL (%s), %s\n", + dialAddrs[0], err.Error()) } dialInfo.Direct = true dialInfo.Timeout = time.Duration(10) * time.Second diff --git a/plugins/mongodb/mongodb_data.go b/plugins/mongodb/mongodb_data.go index fda1843bb..1ebb76ced 100644 --- a/plugins/mongodb/mongodb_data.go +++ b/plugins/mongodb/mongodb_data.go @@ -10,6 +10,7 @@ import ( type MongodbData struct { StatLine *StatLine + Fields map[string]interface{} Tags map[string]string } @@ -20,6 +21,7 @@ func NewMongodbData(statLine *StatLine, tags map[string]string) *MongodbData { return &MongodbData{ StatLine: statLine, Tags: tags, + Fields: make(map[string]interface{}), } } @@ -63,38 +65,44 @@ var WiredTigerStats = map[string]string{ "percent_cache_used": "CacheUsedPercent", } -func (d *MongodbData) AddDefaultStats(acc plugins.Accumulator) { +func (d *MongodbData) AddDefaultStats() { statLine := reflect.ValueOf(d.StatLine).Elem() - d.addStat(acc, statLine, DefaultStats) + d.addStat(statLine, DefaultStats) if d.StatLine.NodeType != "" { - d.addStat(acc, statLine, DefaultReplStats) + d.addStat(statLine, DefaultReplStats) } if d.StatLine.StorageEngine == "mmapv1" { - d.addStat(acc, statLine, MmapStats) + d.addStat(statLine, MmapStats) } else if d.StatLine.StorageEngine == "wiredTiger" { for key, value := range WiredTigerStats { val := statLine.FieldByName(value).Interface() percentVal := fmt.Sprintf("%.1f", val.(float64)*100) floatVal, _ := strconv.ParseFloat(percentVal, 64) - d.add(acc, key, floatVal) + d.add(key, floatVal) } } } -func (d *MongodbData) addStat(acc plugins.Accumulator, statLine reflect.Value, stats map[string]string) { +func (d *MongodbData) addStat( + statLine reflect.Value, + stats map[string]string, +) { for key, value := range stats { val := statLine.FieldByName(value).Interface() - d.add(acc, key, val) + d.add(key, val) } } -func (d *MongodbData) add(acc plugins.Accumulator, key string, val interface{}) { +func (d *MongodbData) add(key string, val interface{}) { + d.Fields[key] = val +} + +func (d *MongodbData) flush(acc plugins.Accumulator) { acc.AddFields( - key, - map[string]interface{}{ - "value": val, - }, + "mongodb", + d.Fields, d.Tags, d.StatLine.Time, ) + d.Fields = make(map[string]interface{}) } diff --git a/plugins/mongodb/mongodb_server.go b/plugins/mongodb/mongodb_server.go index d9b0edaad..134be5bae 100644 --- a/plugins/mongodb/mongodb_server.go +++ b/plugins/mongodb/mongodb_server.go @@ -44,7 +44,8 @@ func (s *Server) gatherData(acc plugins.Accumulator) error { NewStatLine(*s.lastResult, *result, s.Url.Host, true, durationInSeconds), s.getDefaultTags(), ) - data.AddDefaultStats(acc) + data.AddDefaultStats() + data.flush(acc) } return nil }