From 3be111a160e32399d9997c8eb073e101dcc7d844 Mon Sep 17 00:00:00 2001
From: Cameron Sparr
Date: Mon, 14 Dec 2015 16:15:51 -0600
Subject: [PATCH] Breakout JSON flattening into internal package, exec &
 elasticsearch aggregation

---
 internal/internal.go                   | 34 +++++++++++++++++++++++++
 plugins/bcache/bcache.go               | 11 +++++---
 plugins/disque/disque.go               |  9 ++++---
 plugins/elasticsearch/README.md        |  7 +++---
 plugins/elasticsearch/elasticsearch.go | 33 ++++++++----------------
 plugins/exec/exec.go                   | 35 +++++++++++++++-----------
 plugins/system/system.go               |  9 +------
 7 files changed, 81 insertions(+), 57 deletions(-)

diff --git a/internal/internal.go b/internal/internal.go
index 45164682b..93c467808 100644
--- a/internal/internal.go
+++ b/internal/internal.go
@@ -3,6 +3,7 @@ package internal
 import (
 	"bufio"
 	"errors"
+	"fmt"
 	"os"
 	"strings"
 	"time"
@@ -27,6 +28,39 @@ func (d *Duration) UnmarshalTOML(b []byte) error {
 
 var NotImplementedError = errors.New("not implemented yet")
 
+type JSONFlattener struct {
+	Fields map[string]interface{}
+}
+
+// FlattenJSON flattens nested maps/interfaces into a fields map
+func (f *JSONFlattener) FlattenJSON(
+	fieldname string,
+	v interface{},
+) error {
+	if f.Fields == nil {
+		f.Fields = make(map[string]interface{})
+	}
+	fieldname = strings.Trim(fieldname, "_")
+	switch t := v.(type) {
+	case map[string]interface{}:
+		for k, v := range t {
+			err := f.FlattenJSON(fieldname+"_"+k+"_", v)
+			if err != nil {
+				return err
+			}
+		}
+	case float64:
+		f.Fields[fieldname] = t
+	case bool, string, []interface{}:
+		// ignored types
+		return nil
+	default:
+		return fmt.Errorf("JSON Flattener: got unexpected type %T with value %v (%s)",
+			t, t, fieldname)
+	}
+	return nil
+}
+
 // ReadLines reads contents from a file and splits them by new lines.
 // A convenience wrapper to ReadLinesOffsetN(filename, 0, -1).
 func ReadLines(filename string) ([]string, error) {
diff --git a/plugins/bcache/bcache.go b/plugins/bcache/bcache.go
index 76e638ea4..92cea3d63 100644
--- a/plugins/bcache/bcache.go
+++ b/plugins/bcache/bcache.go
@@ -81,7 +81,9 @@ func (b *Bcache) gatherBcache(bdev string, acc plugins.Accumulator) error {
 	}
 	rawValue := strings.TrimSpace(string(file))
 	value := prettyToBytes(rawValue)
-	acc.Add("dirty_data", value, tags)
+
+	fields := make(map[string]interface{})
+	fields["dirty_data"] = value
 
 	for _, path := range metrics {
 		key := filepath.Base(path)
@@ -92,12 +94,13 @@ func (b *Bcache) gatherBcache(bdev string, acc plugins.Accumulator) error {
 		}
 		if key == "bypassed" {
 			value := prettyToBytes(rawValue)
-			acc.Add(key, value, tags)
+			fields[key] = value
 		} else {
 			value, _ := strconv.ParseUint(rawValue, 10, 64)
-			acc.Add(key, value, tags)
+			fields[key] = value
 		}
 	}
+	acc.AddFields("bcache", fields, tags)
 	return nil
 }
 
@@ -117,7 +120,7 @@ func (b *Bcache) Gather(acc plugins.Accumulator) error {
 	}
 	bdevs, _ := filepath.Glob(bcachePath + "/*/bdev*")
 	if len(bdevs) < 1 {
-		return errors.New("Can't found any bcache device")
+		return errors.New("Can't find any bcache device")
 	}
 	for _, bdev := range bdevs {
 		if restrictDevs {
diff --git a/plugins/disque/disque.go b/plugins/disque/disque.go
index 004aa3c0f..b7b7dd5c1 100644
--- a/plugins/disque/disque.go
+++ b/plugins/disque/disque.go
@@ -155,6 +155,8 @@ func (g *Disque) gatherServer(addr *url.URL, acc plugins.Accumulator) error {
 
 	var read int
 
+	fields := make(map[string]interface{})
+	tags := map[string]string{"host": addr.String()}
 	for read < sz {
 		line, err := r.ReadString('\n')
 		if err != nil {
@@ -176,12 +178,11 @@ func (g *Disque) gatherServer(addr *url.URL, acc plugins.Accumulator) error {
 			continue
 		}
 
-		tags := map[string]string{"host": addr.String()}
 		val := strings.TrimSpace(parts[1])
 
 		ival, err := strconv.ParseUint(val, 10, 64)
 		if err == nil {
-			acc.Add(metric, ival, tags)
+			fields[metric] = ival
 			continue
 		}
 
@@ -190,9 +191,9 @@ func (g *Disque) gatherServer(addr *url.URL, acc plugins.Accumulator) error {
 			return err
 		}
 
-		acc.Add(metric, fval, tags)
+		fields[metric] = fval
 	}
-
+	acc.AddFields("disque", fields, tags)
 	return nil
 }
 
diff --git a/plugins/elasticsearch/README.md b/plugins/elasticsearch/README.md
index dbc9a3587..03acad034 100644
--- a/plugins/elasticsearch/README.md
+++ b/plugins/elasticsearch/README.md
@@ -31,8 +31,9 @@ contains `status`, `timed_out`, `number_of_nodes`, `number_of_data_nodes`,
 `initializing_shards`, `unassigned_shards` fields
 - elasticsearch_cluster_health
 
-contains `status`, `number_of_shards`, `number_of_replicas`, `active_primary_shards`,
-`active_shards`, `relocating_shards`, `initializing_shards`, `unassigned_shards` fields
+contains `status`, `number_of_shards`, `number_of_replicas`,
+`active_primary_shards`, `active_shards`, `relocating_shards`,
+`initializing_shards`, `unassigned_shards` fields
 - elasticsearch_indices
 
 #### node measurements:
@@ -316,4 +317,4 @@ Transport statistics about sent and received bytes in cluster communication meas
 - elasticsearch_transport_rx_count value=6
 - elasticsearch_transport_rx_size_in_bytes value=1380
 - elasticsearch_transport_tx_count value=6
-- elasticsearch_transport_tx_size_in_bytes value=1380
\ No newline at end of file
+- elasticsearch_transport_tx_size_in_bytes value=1380
diff --git a/plugins/elasticsearch/elasticsearch.go b/plugins/elasticsearch/elasticsearch.go
index bfe6f20bb..2266f2243 100644
--- a/plugins/elasticsearch/elasticsearch.go
+++ b/plugins/elasticsearch/elasticsearch.go
@@ -6,6 +6,7 @@ import (
 	"net/http"
 	"time"
 
+	"github.com/influxdb/telegraf/internal"
 	"github.com/influxdb/telegraf/plugins"
 )
 
@@ -141,10 +142,14 @@ func (e *Elasticsearch) gatherNodeStats(url string, acc plugins.Accumulator) err
 			"breakers":    n.Breakers,
 		}
+		now := time.Now()
 		for p, s := range stats {
-			if err := e.parseInterface(acc, p, tags, s); err != nil {
+			f := internal.JSONFlattener{}
+			err := f.FlattenJSON("", s)
+			if err != nil {
 				return err
 			}
+			acc.AddFields("elasticsearch_"+p, f.Fields, tags, now)
 		}
 	}
 	return nil
 }
@@ -168,7 +173,7 @@ func (e *Elasticsearch) gatherClusterStats(url string, acc plugins.Accumulator)
 		"unassigned_shards":     clusterStats.UnassignedShards,
 	}
 	acc.AddFields(
-		"cluster_health",
+		"elasticsearch_cluster_health",
 		clusterFields,
 		map[string]string{"name": clusterStats.ClusterName},
 		measurementTime,
@@ -186,7 +191,7 @@ func (e *Elasticsearch) gatherClusterStats(url string, acc plugins.Accumulator)
 			"unassigned_shards":    health.UnassignedShards,
 		}
 		acc.AddFields(
-			"indices",
+			"elasticsearch_indices",
 			indexFields,
 			map[string]string{"index": name},
 			measurementTime,
@@ -205,7 +210,8 @@ func (e *Elasticsearch) gatherData(url string, v interface{}) error {
 		// NOTE: we are not going to read/discard r.Body under the assumption we'd prefer
 		// to let the underlying transport close the connection and re-establish a new one for
 		// future calls.
-		return fmt.Errorf("elasticsearch: API responded with status-code %d, expected %d", r.StatusCode, http.StatusOK)
+		return fmt.Errorf("elasticsearch: API responded with status-code %d, expected %d",
+			r.StatusCode, http.StatusOK)
 	}
 	if err = json.NewDecoder(r.Body).Decode(v); err != nil {
 		return err
@@ -213,25 +219,6 @@ func (e *Elasticsearch) gatherData(url string, v interface{}) error {
 	return nil
 }
 
-func (e *Elasticsearch) parseInterface(acc plugins.Accumulator, prefix string, tags map[string]string, v interface{}) error {
-	switch t := v.(type) {
-	case map[string]interface{}:
-		for k, v := range t {
-			if err := e.parseInterface(acc, prefix+"_"+k, tags, v); err != nil {
-				return err
-			}
-		}
-	case float64:
-		acc.Add(prefix, t, tags)
-	case bool, string, []interface{}:
-		// ignored types
-		return nil
-	default:
-		return fmt.Errorf("elasticsearch: got unexpected type %T with value %v (%s)", t, t, prefix)
-	}
-	return nil
-}
-
 func init() {
 	plugins.Add("elasticsearch", func() plugins.Plugin {
 		return NewElasticsearch()
diff --git a/plugins/exec/exec.go b/plugins/exec/exec.go
index d4a42b6c4..1571b6bf4 100644
--- a/plugins/exec/exec.go
+++ b/plugins/exec/exec.go
@@ -5,13 +5,16 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
-	"github.com/gonuts/go-shellquote"
-	"github.com/influxdb/telegraf/plugins"
 	"math"
 	"os/exec"
 	"strings"
 	"sync"
 	"time"
+
+	"github.com/gonuts/go-shellquote"
+
+	"github.com/influxdb/telegraf/internal"
+	"github.com/influxdb/telegraf/plugins"
 )
 
 const sampleConfig = `
@@ -136,25 +139,27 @@ func (e *Exec) gatherCommand(c *Command, acc plugins.Accumulator) error {
 		var jsonOut interface{}
 		err = json.Unmarshal(out, &jsonOut)
 		if err != nil {
-			return fmt.Errorf("exec: unable to parse output of '%s' as JSON, %s", c.Command, err)
+			return fmt.Errorf("exec: unable to parse output of '%s' as JSON, %s",
+				c.Command, err)
 		}
 
-		processResponse(acc, c.Name, map[string]string{}, jsonOut)
+		f := internal.JSONFlattener{}
+		err = f.FlattenJSON("", jsonOut)
+		if err != nil {
+			return err
+		}
+
+		var msrmnt_name string
+		if c.Name == "" {
+			msrmnt_name = "exec"
+		} else {
+			msrmnt_name = "exec_" + c.Name
+		}
+		acc.AddFields(msrmnt_name, f.Fields, nil)
 	}
 	return nil
 }
 
-func processResponse(acc plugins.Accumulator, prefix string, tags map[string]string, v interface{}) {
-	switch t := v.(type) {
-	case map[string]interface{}:
-		for k, v := range t {
-			processResponse(acc, prefix+"_"+k, tags, v)
-		}
-	case float64:
-		acc.Add(prefix, v, tags)
-	}
-}
-
 func init() {
 	plugins.Add("exec", func() plugins.Plugin {
 		return NewExec()
diff --git a/plugins/system/system.go b/plugins/system/system.go
index 82d4f4f24..1adf6c051 100644
--- a/plugins/system/system.go
+++ b/plugins/system/system.go
@@ -19,13 +19,6 @@ func (_ *SystemStats) Description() string {
 
 func (_ *SystemStats) SampleConfig() string { return "" }
 
-func (_ *SystemStats) add(acc plugins.Accumulator,
-	name string, val float64, tags map[string]string) {
-	if val >= 0 {
-		acc.Add(name, val, tags)
-	}
-}
-
 func (_ *SystemStats) Gather(acc plugins.Accumulator) error {
 	loadavg, err := load.LoadAvg()
 	if err != nil {
@@ -41,7 +34,7 @@ func (_ *SystemStats) Gather(acc plugins.Accumulator) error {
 		"load1":         loadavg.Load1,
 		"load5":         loadavg.Load5,
 		"load15":        loadavg.Load15,
-		"uptime":        float64(hostinfo.Uptime),
+		"uptime":        hostinfo.Uptime,
 		"uptime_format": format_uptime(hostinfo.Uptime),
 	}
 	acc.AddFields("system", fields, nil)
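
Usage note (outside the patch): below is a minimal, self-contained sketch of how the new flattener is meant to be used by the exec and elasticsearch plugins. It copies the JSONFlattener added to internal/internal.go above so it can run standalone; the sample JSON and the printed field names are illustrative only and are not taken from the plugins.

package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// JSONFlattener mirrors the type this patch adds to internal/internal.go:
// it walks nested JSON objects and collects numeric leaves into a flat
// Fields map keyed by the underscore-joined path.
type JSONFlattener struct {
	Fields map[string]interface{}
}

// FlattenJSON flattens nested maps/interfaces into a fields map.
// Bools, strings, and arrays are ignored; unexpected types are an error.
func (f *JSONFlattener) FlattenJSON(fieldname string, v interface{}) error {
	if f.Fields == nil {
		f.Fields = make(map[string]interface{})
	}
	fieldname = strings.Trim(fieldname, "_")
	switch t := v.(type) {
	case map[string]interface{}:
		for k, v := range t {
			if err := f.FlattenJSON(fieldname+"_"+k+"_", v); err != nil {
				return err
			}
		}
	case float64:
		f.Fields[fieldname] = t
	case bool, string, []interface{}:
		return nil
	default:
		return fmt.Errorf("JSON Flattener: got unexpected type %T with value %v (%s)",
			t, t, fieldname)
	}
	return nil
}

func main() {
	// Hypothetical exec-plugin output; any nested JSON with numeric leaves works.
	raw := []byte(`{"gc": {"pause_ms": 1.2, "runs": 42}, "status": "ok"}`)

	var parsed interface{}
	if err := json.Unmarshal(raw, &parsed); err != nil {
		panic(err)
	}

	// In the patch, gatherCommand and gatherNodeStats do this same flatten
	// step and then pass f.Fields to acc.AddFields; here we just print it.
	f := JSONFlattener{}
	if err := f.FlattenJSON("", parsed); err != nil {
		panic(err)
	}

	// Prints gc_pause_ms=1.2 and gc_runs=42; the string field "status"
	// is dropped by FlattenJSON.
	for k, v := range f.Fields {
		fmt.Printf("%s=%v\n", k, v)
	}
}

Bools, strings, and arrays are skipped, presumably because these plugins only emit numeric metric fields; anything else surfaces as an error from FlattenJSON rather than being silently dropped.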