Breakout JSON flattening into internal package, exec & elasticsearch aggregation

This commit is contained in:
Cameron Sparr 2015-12-14 16:15:51 -06:00
parent 97a66b73cf
commit 3be111a160
7 changed files with 81 additions and 57 deletions

View File

@ -3,6 +3,7 @@ package internal
import ( import (
"bufio" "bufio"
"errors" "errors"
"fmt"
"os" "os"
"strings" "strings"
"time" "time"
@ -27,6 +28,39 @@ func (d *Duration) UnmarshalTOML(b []byte) error {
var NotImplementedError = errors.New("not implemented yet") var NotImplementedError = errors.New("not implemented yet")
type JSONFlattener struct {
Fields map[string]interface{}
}
// FlattenJSON flattens nested maps/interfaces into a fields map
func (f *JSONFlattener) FlattenJSON(
fieldname string,
v interface{},
) error {
if f.Fields == nil {
f.Fields = make(map[string]interface{})
}
fieldname = strings.Trim(fieldname, "_")
switch t := v.(type) {
case map[string]interface{}:
for k, v := range t {
err := f.FlattenJSON(fieldname+"_"+k+"_", v)
if err != nil {
return err
}
}
case float64:
f.Fields[fieldname] = t
case bool, string, []interface{}:
// ignored types
return nil
default:
return fmt.Errorf("JSON Flattener: got unexpected type %T with value %v (%s)",
t, t, fieldname)
}
return nil
}
// ReadLines reads contents from a file and splits them by new lines. // ReadLines reads contents from a file and splits them by new lines.
// A convenience wrapper to ReadLinesOffsetN(filename, 0, -1). // A convenience wrapper to ReadLinesOffsetN(filename, 0, -1).
func ReadLines(filename string) ([]string, error) { func ReadLines(filename string) ([]string, error) {

View File

@ -81,7 +81,9 @@ func (b *Bcache) gatherBcache(bdev string, acc plugins.Accumulator) error {
} }
rawValue := strings.TrimSpace(string(file)) rawValue := strings.TrimSpace(string(file))
value := prettyToBytes(rawValue) value := prettyToBytes(rawValue)
acc.Add("dirty_data", value, tags)
fields := make(map[string]interface{})
fields["dirty_data"] = value
for _, path := range metrics { for _, path := range metrics {
key := filepath.Base(path) key := filepath.Base(path)
@ -92,12 +94,13 @@ func (b *Bcache) gatherBcache(bdev string, acc plugins.Accumulator) error {
} }
if key == "bypassed" { if key == "bypassed" {
value := prettyToBytes(rawValue) value := prettyToBytes(rawValue)
acc.Add(key, value, tags) fields[key] = value
} else { } else {
value, _ := strconv.ParseUint(rawValue, 10, 64) value, _ := strconv.ParseUint(rawValue, 10, 64)
acc.Add(key, value, tags) fields[key] = value
} }
} }
acc.AddFields("bcache", fields, tags)
return nil return nil
} }
@ -117,7 +120,7 @@ func (b *Bcache) Gather(acc plugins.Accumulator) error {
} }
bdevs, _ := filepath.Glob(bcachePath + "/*/bdev*") bdevs, _ := filepath.Glob(bcachePath + "/*/bdev*")
if len(bdevs) < 1 { if len(bdevs) < 1 {
return errors.New("Can't found any bcache device") return errors.New("Can't find any bcache device")
} }
for _, bdev := range bdevs { for _, bdev := range bdevs {
if restrictDevs { if restrictDevs {

View File

@ -155,6 +155,8 @@ func (g *Disque) gatherServer(addr *url.URL, acc plugins.Accumulator) error {
var read int var read int
fields := make(map[string]interface{})
tags := map[string]string{"host": addr.String()}
for read < sz { for read < sz {
line, err := r.ReadString('\n') line, err := r.ReadString('\n')
if err != nil { if err != nil {
@ -176,12 +178,11 @@ func (g *Disque) gatherServer(addr *url.URL, acc plugins.Accumulator) error {
continue continue
} }
tags := map[string]string{"host": addr.String()}
val := strings.TrimSpace(parts[1]) val := strings.TrimSpace(parts[1])
ival, err := strconv.ParseUint(val, 10, 64) ival, err := strconv.ParseUint(val, 10, 64)
if err == nil { if err == nil {
acc.Add(metric, ival, tags) fields[metric] = ival
continue continue
} }
@ -190,9 +191,9 @@ func (g *Disque) gatherServer(addr *url.URL, acc plugins.Accumulator) error {
return err return err
} }
acc.Add(metric, fval, tags) fields[metric] = fval
} }
acc.AddFields("disque", fields, tags)
return nil return nil
} }

View File

@ -31,8 +31,9 @@ contains `status`, `timed_out`, `number_of_nodes`, `number_of_data_nodes`,
`initializing_shards`, `unassigned_shards` fields `initializing_shards`, `unassigned_shards` fields
- elasticsearch_cluster_health - elasticsearch_cluster_health
contains `status`, `number_of_shards`, `number_of_replicas`, `active_primary_shards`, contains `status`, `number_of_shards`, `number_of_replicas`,
`active_shards`, `relocating_shards`, `initializing_shards`, `unassigned_shards` fields `active_primary_shards`, `active_shards`, `relocating_shards`,
`initializing_shards`, `unassigned_shards` fields
- elasticsearch_indices - elasticsearch_indices
#### node measurements: #### node measurements:

View File

@ -6,6 +6,7 @@ import (
"net/http" "net/http"
"time" "time"
"github.com/influxdb/telegraf/internal"
"github.com/influxdb/telegraf/plugins" "github.com/influxdb/telegraf/plugins"
) )
@ -141,10 +142,14 @@ func (e *Elasticsearch) gatherNodeStats(url string, acc plugins.Accumulator) err
"breakers": n.Breakers, "breakers": n.Breakers,
} }
now := time.Now()
for p, s := range stats { for p, s := range stats {
if err := e.parseInterface(acc, p, tags, s); err != nil { f := internal.JSONFlattener{}
err := f.FlattenJSON("", s)
if err != nil {
return err return err
} }
acc.AddFields("elasticsearch_"+p, f.Fields, tags, now)
} }
} }
return nil return nil
@ -168,7 +173,7 @@ func (e *Elasticsearch) gatherClusterStats(url string, acc plugins.Accumulator)
"unassigned_shards": clusterStats.UnassignedShards, "unassigned_shards": clusterStats.UnassignedShards,
} }
acc.AddFields( acc.AddFields(
"cluster_health", "elasticsearch_cluster_health",
clusterFields, clusterFields,
map[string]string{"name": clusterStats.ClusterName}, map[string]string{"name": clusterStats.ClusterName},
measurementTime, measurementTime,
@ -186,7 +191,7 @@ func (e *Elasticsearch) gatherClusterStats(url string, acc plugins.Accumulator)
"unassigned_shards": health.UnassignedShards, "unassigned_shards": health.UnassignedShards,
} }
acc.AddFields( acc.AddFields(
"indices", "elasticsearch_indices",
indexFields, indexFields,
map[string]string{"index": name}, map[string]string{"index": name},
measurementTime, measurementTime,
@ -205,7 +210,8 @@ func (e *Elasticsearch) gatherData(url string, v interface{}) error {
// NOTE: we are not going to read/discard r.Body under the assumption we'd prefer // NOTE: we are not going to read/discard r.Body under the assumption we'd prefer
// to let the underlying transport close the connection and re-establish a new one for // to let the underlying transport close the connection and re-establish a new one for
// future calls. // future calls.
return fmt.Errorf("elasticsearch: API responded with status-code %d, expected %d", r.StatusCode, http.StatusOK) return fmt.Errorf("elasticsearch: API responded with status-code %d, expected %d",
r.StatusCode, http.StatusOK)
} }
if err = json.NewDecoder(r.Body).Decode(v); err != nil { if err = json.NewDecoder(r.Body).Decode(v); err != nil {
return err return err
@ -213,25 +219,6 @@ func (e *Elasticsearch) gatherData(url string, v interface{}) error {
return nil return nil
} }
func (e *Elasticsearch) parseInterface(acc plugins.Accumulator, prefix string, tags map[string]string, v interface{}) error {
switch t := v.(type) {
case map[string]interface{}:
for k, v := range t {
if err := e.parseInterface(acc, prefix+"_"+k, tags, v); err != nil {
return err
}
}
case float64:
acc.Add(prefix, t, tags)
case bool, string, []interface{}:
// ignored types
return nil
default:
return fmt.Errorf("elasticsearch: got unexpected type %T with value %v (%s)", t, t, prefix)
}
return nil
}
func init() { func init() {
plugins.Add("elasticsearch", func() plugins.Plugin { plugins.Add("elasticsearch", func() plugins.Plugin {
return NewElasticsearch() return NewElasticsearch()

View File

@ -5,13 +5,16 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"github.com/gonuts/go-shellquote"
"github.com/influxdb/telegraf/plugins"
"math" "math"
"os/exec" "os/exec"
"strings" "strings"
"sync" "sync"
"time" "time"
"github.com/gonuts/go-shellquote"
"github.com/influxdb/telegraf/internal"
"github.com/influxdb/telegraf/plugins"
) )
const sampleConfig = ` const sampleConfig = `
@ -136,25 +139,27 @@ func (e *Exec) gatherCommand(c *Command, acc plugins.Accumulator) error {
var jsonOut interface{} var jsonOut interface{}
err = json.Unmarshal(out, &jsonOut) err = json.Unmarshal(out, &jsonOut)
if err != nil { if err != nil {
return fmt.Errorf("exec: unable to parse output of '%s' as JSON, %s", c.Command, err) return fmt.Errorf("exec: unable to parse output of '%s' as JSON, %s",
c.Command, err)
} }
processResponse(acc, c.Name, map[string]string{}, jsonOut) f := internal.JSONFlattener{}
err = f.FlattenJSON("", jsonOut)
if err != nil {
return err
}
var msrmnt_name string
if c.Name == "" {
msrmnt_name = "exec"
} else {
msrmnt_name = "exec_" + c.Name
}
acc.AddFields(msrmnt_name, f.Fields, nil)
} }
return nil return nil
} }
func processResponse(acc plugins.Accumulator, prefix string, tags map[string]string, v interface{}) {
switch t := v.(type) {
case map[string]interface{}:
for k, v := range t {
processResponse(acc, prefix+"_"+k, tags, v)
}
case float64:
acc.Add(prefix, v, tags)
}
}
func init() { func init() {
plugins.Add("exec", func() plugins.Plugin { plugins.Add("exec", func() plugins.Plugin {
return NewExec() return NewExec()

View File

@ -19,13 +19,6 @@ func (_ *SystemStats) Description() string {
func (_ *SystemStats) SampleConfig() string { return "" } func (_ *SystemStats) SampleConfig() string { return "" }
func (_ *SystemStats) add(acc plugins.Accumulator,
name string, val float64, tags map[string]string) {
if val >= 0 {
acc.Add(name, val, tags)
}
}
func (_ *SystemStats) Gather(acc plugins.Accumulator) error { func (_ *SystemStats) Gather(acc plugins.Accumulator) error {
loadavg, err := load.LoadAvg() loadavg, err := load.LoadAvg()
if err != nil { if err != nil {
@ -41,7 +34,7 @@ func (_ *SystemStats) Gather(acc plugins.Accumulator) error {
"load1": loadavg.Load1, "load1": loadavg.Load1,
"load5": loadavg.Load5, "load5": loadavg.Load5,
"load15": loadavg.Load15, "load15": loadavg.Load15,
"uptime": float64(hostinfo.Uptime), "uptime": hostinfo.Uptime,
"uptime_format": format_uptime(hostinfo.Uptime), "uptime_format": format_uptime(hostinfo.Uptime),
} }
acc.AddFields("system", fields, nil) acc.AddFields("system", fields, nil)