0.3.0: mongodb and jolokia

This commit is contained in:
Cameron Sparr 2015-12-19 13:31:22 -07:00
parent 64a832467e
commit 30d8ed411a
4 changed files with 38 additions and 84 deletions

View File

@ -7,7 +7,6 @@ import (
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"net/url" "net/url"
"strings"
"github.com/influxdb/telegraf/plugins" "github.com/influxdb/telegraf/plugins"
) )
@ -23,8 +22,6 @@ type Server struct {
type Metric struct { type Metric struct {
Name string Name string
Jmx string Jmx string
Pass []string
Drop []string
} }
type JolokiaClient interface { type JolokiaClient interface {
@ -44,7 +41,6 @@ type Jolokia struct {
Context string Context string
Servers []Server Servers []Server
Metrics []Metric Metrics []Metric
Tags map[string]string
} }
func (j *Jolokia) SampleConfig() string { func (j *Jolokia) SampleConfig() string {
@ -52,10 +48,6 @@ func (j *Jolokia) SampleConfig() string {
# This is the context root used to compose the jolokia url # This is the context root used to compose the jolokia url
context = "/jolokia/read" context = "/jolokia/read"
# Tags added to each measurements
[jolokia.tags]
group = "as"
# List of servers exposing jolokia read service # List of servers exposing jolokia read service
[[plugins.jolokia.servers]] [[plugins.jolokia.servers]]
name = "stable" name = "stable"
@ -76,17 +68,11 @@ func (j *Jolokia) SampleConfig() string {
[[plugins.jolokia.metrics]] [[plugins.jolokia.metrics]]
name = "memory_eden" name = "memory_eden"
jmx = "/java.lang:type=MemoryPool,name=PS Eden Space/Usage" jmx = "/java.lang:type=MemoryPool,name=PS Eden Space/Usage"
drop = [ "committed" ]
# This passes only DaemonThreadCount and ThreadCount # This passes only DaemonThreadCount and ThreadCount
[[plugins.jolokia.metrics]] [[plugins.jolokia.metrics]]
name = "heap_threads" name = "heap_threads"
jmx = "/java.lang:type=Threading" jmx = "/java.lang:type=Threading"
pass = [
"DaemonThreadCount",
"ThreadCount"
]
` `
} }
@ -100,12 +86,9 @@ func (j *Jolokia) getAttr(requestUrl *url.URL) (map[string]interface{}, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer req.Body.Close()
resp, err := j.jClient.MakeRequest(req) resp, err := j.jClient.MakeRequest(req)
if err != nil {
return nil, err
}
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -137,65 +120,22 @@ func (j *Jolokia) getAttr(requestUrl *url.URL) (map[string]interface{}, error) {
return jsonOut, nil return jsonOut, nil
} }
func (m *Metric) shouldPass(field string) bool {
if m.Pass != nil {
for _, pass := range m.Pass {
if strings.HasPrefix(field, pass) {
return true
}
}
return false
}
if m.Drop != nil {
for _, drop := range m.Drop {
if strings.HasPrefix(field, drop) {
return false
}
}
return true
}
return true
}
func (m *Metric) filterFields(fields map[string]interface{}) map[string]interface{} {
for field, _ := range fields {
if !m.shouldPass(field) {
delete(fields, field)
}
}
return fields
}
func (j *Jolokia) Gather(acc plugins.Accumulator) error { func (j *Jolokia) Gather(acc plugins.Accumulator) error {
context := j.Context //"/jolokia/read" context := j.Context //"/jolokia/read"
servers := j.Servers servers := j.Servers
metrics := j.Metrics metrics := j.Metrics
tags := j.Tags tags := make(map[string]string)
if tags == nil {
tags = map[string]string{}
}
for _, server := range servers { for _, server := range servers {
tags["server"] = server.Name
tags["port"] = server.Port
tags["host"] = server.Host
fields := make(map[string]interface{})
for _, metric := range metrics { for _, metric := range metrics {
measurement := metric.Name measurement := metric.Name
jmxPath := metric.Jmx jmxPath := metric.Jmx
tags["server"] = server.Name
tags["port"] = server.Port
tags["host"] = server.Host
// Prepare URL // Prepare URL
requestUrl, err := url.Parse("http://" + server.Host + ":" + requestUrl, err := url.Parse("http://" + server.Host + ":" +
server.Port + context + jmxPath) server.Port + context + jmxPath)
@ -209,16 +149,20 @@ func (j *Jolokia) Gather(acc plugins.Accumulator) error {
out, _ := j.getAttr(requestUrl) out, _ := j.getAttr(requestUrl)
if values, ok := out["value"]; ok { if values, ok := out["value"]; ok {
switch values.(type) { switch t := values.(type) {
case map[string]interface{}: case map[string]interface{}:
acc.AddFields(measurement, metric.filterFields(values.(map[string]interface{})), tags) for k, v := range t {
fields[measurement+"_"+k] = v
}
case interface{}: case interface{}:
acc.Add(measurement, values.(interface{}), tags) fields[measurement] = t
} }
} else { } else {
fmt.Printf("Missing key 'value' in '%s' output response\n", requestUrl.String()) fmt.Printf("Missing key 'value' in '%s' output response\n",
requestUrl.String())
} }
} }
acc.AddFields("jolokia", fields, tags)
} }
return nil return nil

View File

@ -98,7 +98,8 @@ func (m *MongoDB) gatherServer(server *Server, acc plugins.Accumulator) error {
} }
dialInfo, err := mgo.ParseURL(dialAddrs[0]) dialInfo, err := mgo.ParseURL(dialAddrs[0])
if err != nil { if err != nil {
return fmt.Errorf("Unable to parse URL (%s), %s\n", dialAddrs[0], err.Error()) return fmt.Errorf("Unable to parse URL (%s), %s\n",
dialAddrs[0], err.Error())
} }
dialInfo.Direct = true dialInfo.Direct = true
dialInfo.Timeout = time.Duration(10) * time.Second dialInfo.Timeout = time.Duration(10) * time.Second

View File

@ -10,6 +10,7 @@ import (
type MongodbData struct { type MongodbData struct {
StatLine *StatLine StatLine *StatLine
Fields map[string]interface{}
Tags map[string]string Tags map[string]string
} }
@ -20,6 +21,7 @@ func NewMongodbData(statLine *StatLine, tags map[string]string) *MongodbData {
return &MongodbData{ return &MongodbData{
StatLine: statLine, StatLine: statLine,
Tags: tags, Tags: tags,
Fields: make(map[string]interface{}),
} }
} }
@ -63,38 +65,44 @@ var WiredTigerStats = map[string]string{
"percent_cache_used": "CacheUsedPercent", "percent_cache_used": "CacheUsedPercent",
} }
func (d *MongodbData) AddDefaultStats(acc plugins.Accumulator) { func (d *MongodbData) AddDefaultStats() {
statLine := reflect.ValueOf(d.StatLine).Elem() statLine := reflect.ValueOf(d.StatLine).Elem()
d.addStat(acc, statLine, DefaultStats) d.addStat(statLine, DefaultStats)
if d.StatLine.NodeType != "" { if d.StatLine.NodeType != "" {
d.addStat(acc, statLine, DefaultReplStats) d.addStat(statLine, DefaultReplStats)
} }
if d.StatLine.StorageEngine == "mmapv1" { if d.StatLine.StorageEngine == "mmapv1" {
d.addStat(acc, statLine, MmapStats) d.addStat(statLine, MmapStats)
} else if d.StatLine.StorageEngine == "wiredTiger" { } else if d.StatLine.StorageEngine == "wiredTiger" {
for key, value := range WiredTigerStats { for key, value := range WiredTigerStats {
val := statLine.FieldByName(value).Interface() val := statLine.FieldByName(value).Interface()
percentVal := fmt.Sprintf("%.1f", val.(float64)*100) percentVal := fmt.Sprintf("%.1f", val.(float64)*100)
floatVal, _ := strconv.ParseFloat(percentVal, 64) floatVal, _ := strconv.ParseFloat(percentVal, 64)
d.add(acc, key, floatVal) d.add(key, floatVal)
} }
} }
} }
func (d *MongodbData) addStat(acc plugins.Accumulator, statLine reflect.Value, stats map[string]string) { func (d *MongodbData) addStat(
statLine reflect.Value,
stats map[string]string,
) {
for key, value := range stats { for key, value := range stats {
val := statLine.FieldByName(value).Interface() val := statLine.FieldByName(value).Interface()
d.add(acc, key, val) d.add(key, val)
} }
} }
func (d *MongodbData) add(acc plugins.Accumulator, key string, val interface{}) { func (d *MongodbData) add(key string, val interface{}) {
d.Fields[key] = val
}
func (d *MongodbData) flush(acc plugins.Accumulator) {
acc.AddFields( acc.AddFields(
key, "mongodb",
map[string]interface{}{ d.Fields,
"value": val,
},
d.Tags, d.Tags,
d.StatLine.Time, d.StatLine.Time,
) )
d.Fields = make(map[string]interface{})
} }

View File

@ -44,7 +44,8 @@ func (s *Server) gatherData(acc plugins.Accumulator) error {
NewStatLine(*s.lastResult, *result, s.Url.Host, true, durationInSeconds), NewStatLine(*s.lastResult, *result, s.Url.Host, true, durationInSeconds),
s.getDefaultTags(), s.getDefaultTags(),
) )
data.AddDefaultStats(acc) data.AddDefaultStats()
data.flush(acc)
} }
return nil return nil
} }