Breakout JSON flattening into internal package, exec & elasticsearch aggregation
This commit is contained in:
parent
59f804d77a
commit
3fc43df84e
|
@ -3,6 +3,7 @@ package internal
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
@ -27,6 +28,39 @@ func (d *Duration) UnmarshalTOML(b []byte) error {
|
||||||
|
|
||||||
var NotImplementedError = errors.New("not implemented yet")
|
var NotImplementedError = errors.New("not implemented yet")
|
||||||
|
|
||||||
|
type JSONFlattener struct {
|
||||||
|
Fields map[string]interface{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// FlattenJSON flattens nested maps/interfaces into a fields map
|
||||||
|
func (f *JSONFlattener) FlattenJSON(
|
||||||
|
fieldname string,
|
||||||
|
v interface{},
|
||||||
|
) error {
|
||||||
|
if f.Fields == nil {
|
||||||
|
f.Fields = make(map[string]interface{})
|
||||||
|
}
|
||||||
|
fieldname = strings.Trim(fieldname, "_")
|
||||||
|
switch t := v.(type) {
|
||||||
|
case map[string]interface{}:
|
||||||
|
for k, v := range t {
|
||||||
|
err := f.FlattenJSON(fieldname+"_"+k+"_", v)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case float64:
|
||||||
|
f.Fields[fieldname] = t
|
||||||
|
case bool, string, []interface{}:
|
||||||
|
// ignored types
|
||||||
|
return nil
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("JSON Flattener: got unexpected type %T with value %v (%s)",
|
||||||
|
t, t, fieldname)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// ReadLines reads contents from a file and splits them by new lines.
|
// ReadLines reads contents from a file and splits them by new lines.
|
||||||
// A convenience wrapper to ReadLinesOffsetN(filename, 0, -1).
|
// A convenience wrapper to ReadLinesOffsetN(filename, 0, -1).
|
||||||
func ReadLines(filename string) ([]string, error) {
|
func ReadLines(filename string) ([]string, error) {
|
||||||
|
|
|
@ -81,7 +81,9 @@ func (b *Bcache) gatherBcache(bdev string, acc plugins.Accumulator) error {
|
||||||
}
|
}
|
||||||
rawValue := strings.TrimSpace(string(file))
|
rawValue := strings.TrimSpace(string(file))
|
||||||
value := prettyToBytes(rawValue)
|
value := prettyToBytes(rawValue)
|
||||||
acc.Add("dirty_data", value, tags)
|
|
||||||
|
fields := make(map[string]interface{})
|
||||||
|
fields["dirty_data"] = value
|
||||||
|
|
||||||
for _, path := range metrics {
|
for _, path := range metrics {
|
||||||
key := filepath.Base(path)
|
key := filepath.Base(path)
|
||||||
|
@ -92,12 +94,13 @@ func (b *Bcache) gatherBcache(bdev string, acc plugins.Accumulator) error {
|
||||||
}
|
}
|
||||||
if key == "bypassed" {
|
if key == "bypassed" {
|
||||||
value := prettyToBytes(rawValue)
|
value := prettyToBytes(rawValue)
|
||||||
acc.Add(key, value, tags)
|
fields[key] = value
|
||||||
} else {
|
} else {
|
||||||
value, _ := strconv.ParseUint(rawValue, 10, 64)
|
value, _ := strconv.ParseUint(rawValue, 10, 64)
|
||||||
acc.Add(key, value, tags)
|
fields[key] = value
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
acc.AddFields("bcache", fields, tags)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -117,7 +120,7 @@ func (b *Bcache) Gather(acc plugins.Accumulator) error {
|
||||||
}
|
}
|
||||||
bdevs, _ := filepath.Glob(bcachePath + "/*/bdev*")
|
bdevs, _ := filepath.Glob(bcachePath + "/*/bdev*")
|
||||||
if len(bdevs) < 1 {
|
if len(bdevs) < 1 {
|
||||||
return errors.New("Can't found any bcache device")
|
return errors.New("Can't find any bcache device")
|
||||||
}
|
}
|
||||||
for _, bdev := range bdevs {
|
for _, bdev := range bdevs {
|
||||||
if restrictDevs {
|
if restrictDevs {
|
||||||
|
|
|
@ -155,6 +155,8 @@ func (g *Disque) gatherServer(addr *url.URL, acc plugins.Accumulator) error {
|
||||||
|
|
||||||
var read int
|
var read int
|
||||||
|
|
||||||
|
fields := make(map[string]interface{})
|
||||||
|
tags := map[string]string{"host": addr.String()}
|
||||||
for read < sz {
|
for read < sz {
|
||||||
line, err := r.ReadString('\n')
|
line, err := r.ReadString('\n')
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -176,12 +178,11 @@ func (g *Disque) gatherServer(addr *url.URL, acc plugins.Accumulator) error {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
tags := map[string]string{"host": addr.String()}
|
|
||||||
val := strings.TrimSpace(parts[1])
|
val := strings.TrimSpace(parts[1])
|
||||||
|
|
||||||
ival, err := strconv.ParseUint(val, 10, 64)
|
ival, err := strconv.ParseUint(val, 10, 64)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
acc.Add(metric, ival, tags)
|
fields[metric] = ival
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -190,9 +191,9 @@ func (g *Disque) gatherServer(addr *url.URL, acc plugins.Accumulator) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
acc.Add(metric, fval, tags)
|
fields[metric] = fval
|
||||||
}
|
}
|
||||||
|
acc.AddFields("disque", fields, tags)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -31,8 +31,9 @@ contains `status`, `timed_out`, `number_of_nodes`, `number_of_data_nodes`,
|
||||||
`initializing_shards`, `unassigned_shards` fields
|
`initializing_shards`, `unassigned_shards` fields
|
||||||
- elasticsearch_cluster_health
|
- elasticsearch_cluster_health
|
||||||
|
|
||||||
contains `status`, `number_of_shards`, `number_of_replicas`, `active_primary_shards`,
|
contains `status`, `number_of_shards`, `number_of_replicas`,
|
||||||
`active_shards`, `relocating_shards`, `initializing_shards`, `unassigned_shards` fields
|
`active_primary_shards`, `active_shards`, `relocating_shards`,
|
||||||
|
`initializing_shards`, `unassigned_shards` fields
|
||||||
- elasticsearch_indices
|
- elasticsearch_indices
|
||||||
|
|
||||||
#### node measurements:
|
#### node measurements:
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"net/http"
|
"net/http"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdb/telegraf/internal"
|
||||||
"github.com/influxdb/telegraf/plugins"
|
"github.com/influxdb/telegraf/plugins"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -141,10 +142,14 @@ func (e *Elasticsearch) gatherNodeStats(url string, acc plugins.Accumulator) err
|
||||||
"breakers": n.Breakers,
|
"breakers": n.Breakers,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
now := time.Now()
|
||||||
for p, s := range stats {
|
for p, s := range stats {
|
||||||
if err := e.parseInterface(acc, p, tags, s); err != nil {
|
f := internal.JSONFlattener{}
|
||||||
|
err := f.FlattenJSON("", s)
|
||||||
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
acc.AddFields("elasticsearch_"+p, f.Fields, tags, now)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -168,7 +173,7 @@ func (e *Elasticsearch) gatherClusterStats(url string, acc plugins.Accumulator)
|
||||||
"unassigned_shards": clusterStats.UnassignedShards,
|
"unassigned_shards": clusterStats.UnassignedShards,
|
||||||
}
|
}
|
||||||
acc.AddFields(
|
acc.AddFields(
|
||||||
"cluster_health",
|
"elasticsearch_cluster_health",
|
||||||
clusterFields,
|
clusterFields,
|
||||||
map[string]string{"name": clusterStats.ClusterName},
|
map[string]string{"name": clusterStats.ClusterName},
|
||||||
measurementTime,
|
measurementTime,
|
||||||
|
@ -186,7 +191,7 @@ func (e *Elasticsearch) gatherClusterStats(url string, acc plugins.Accumulator)
|
||||||
"unassigned_shards": health.UnassignedShards,
|
"unassigned_shards": health.UnassignedShards,
|
||||||
}
|
}
|
||||||
acc.AddFields(
|
acc.AddFields(
|
||||||
"indices",
|
"elasticsearch_indices",
|
||||||
indexFields,
|
indexFields,
|
||||||
map[string]string{"index": name},
|
map[string]string{"index": name},
|
||||||
measurementTime,
|
measurementTime,
|
||||||
|
@ -205,7 +210,8 @@ func (e *Elasticsearch) gatherData(url string, v interface{}) error {
|
||||||
// NOTE: we are not going to read/discard r.Body under the assumption we'd prefer
|
// NOTE: we are not going to read/discard r.Body under the assumption we'd prefer
|
||||||
// to let the underlying transport close the connection and re-establish a new one for
|
// to let the underlying transport close the connection and re-establish a new one for
|
||||||
// future calls.
|
// future calls.
|
||||||
return fmt.Errorf("elasticsearch: API responded with status-code %d, expected %d", r.StatusCode, http.StatusOK)
|
return fmt.Errorf("elasticsearch: API responded with status-code %d, expected %d",
|
||||||
|
r.StatusCode, http.StatusOK)
|
||||||
}
|
}
|
||||||
if err = json.NewDecoder(r.Body).Decode(v); err != nil {
|
if err = json.NewDecoder(r.Body).Decode(v); err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -213,25 +219,6 @@ func (e *Elasticsearch) gatherData(url string, v interface{}) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Elasticsearch) parseInterface(acc plugins.Accumulator, prefix string, tags map[string]string, v interface{}) error {
|
|
||||||
switch t := v.(type) {
|
|
||||||
case map[string]interface{}:
|
|
||||||
for k, v := range t {
|
|
||||||
if err := e.parseInterface(acc, prefix+"_"+k, tags, v); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
case float64:
|
|
||||||
acc.Add(prefix, t, tags)
|
|
||||||
case bool, string, []interface{}:
|
|
||||||
// ignored types
|
|
||||||
return nil
|
|
||||||
default:
|
|
||||||
return fmt.Errorf("elasticsearch: got unexpected type %T with value %v (%s)", t, t, prefix)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
plugins.Add("elasticsearch", func() plugins.Plugin {
|
plugins.Add("elasticsearch", func() plugins.Plugin {
|
||||||
return NewElasticsearch()
|
return NewElasticsearch()
|
||||||
|
|
|
@ -5,13 +5,16 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/gonuts/go-shellquote"
|
|
||||||
"github.com/influxdb/telegraf/plugins"
|
|
||||||
"math"
|
"math"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/gonuts/go-shellquote"
|
||||||
|
|
||||||
|
"github.com/influxdb/telegraf/internal"
|
||||||
|
"github.com/influxdb/telegraf/plugins"
|
||||||
)
|
)
|
||||||
|
|
||||||
const sampleConfig = `
|
const sampleConfig = `
|
||||||
|
@ -136,25 +139,27 @@ func (e *Exec) gatherCommand(c *Command, acc plugins.Accumulator) error {
|
||||||
var jsonOut interface{}
|
var jsonOut interface{}
|
||||||
err = json.Unmarshal(out, &jsonOut)
|
err = json.Unmarshal(out, &jsonOut)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("exec: unable to parse output of '%s' as JSON, %s", c.Command, err)
|
return fmt.Errorf("exec: unable to parse output of '%s' as JSON, %s",
|
||||||
|
c.Command, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
processResponse(acc, c.Name, map[string]string{}, jsonOut)
|
f := internal.JSONFlattener{}
|
||||||
|
err = f.FlattenJSON("", jsonOut)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var msrmnt_name string
|
||||||
|
if c.Name == "" {
|
||||||
|
msrmnt_name = "exec"
|
||||||
|
} else {
|
||||||
|
msrmnt_name = "exec_" + c.Name
|
||||||
|
}
|
||||||
|
acc.AddFields(msrmnt_name, f.Fields, nil)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func processResponse(acc plugins.Accumulator, prefix string, tags map[string]string, v interface{}) {
|
|
||||||
switch t := v.(type) {
|
|
||||||
case map[string]interface{}:
|
|
||||||
for k, v := range t {
|
|
||||||
processResponse(acc, prefix+"_"+k, tags, v)
|
|
||||||
}
|
|
||||||
case float64:
|
|
||||||
acc.Add(prefix, v, tags)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
plugins.Add("exec", func() plugins.Plugin {
|
plugins.Add("exec", func() plugins.Plugin {
|
||||||
return NewExec()
|
return NewExec()
|
||||||
|
|
|
@ -19,13 +19,6 @@ func (_ *SystemStats) Description() string {
|
||||||
|
|
||||||
func (_ *SystemStats) SampleConfig() string { return "" }
|
func (_ *SystemStats) SampleConfig() string { return "" }
|
||||||
|
|
||||||
func (_ *SystemStats) add(acc plugins.Accumulator,
|
|
||||||
name string, val float64, tags map[string]string) {
|
|
||||||
if val >= 0 {
|
|
||||||
acc.Add(name, val, tags)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (_ *SystemStats) Gather(acc plugins.Accumulator) error {
|
func (_ *SystemStats) Gather(acc plugins.Accumulator) error {
|
||||||
loadavg, err := load.LoadAvg()
|
loadavg, err := load.LoadAvg()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -41,7 +34,7 @@ func (_ *SystemStats) Gather(acc plugins.Accumulator) error {
|
||||||
"load1": loadavg.Load1,
|
"load1": loadavg.Load1,
|
||||||
"load5": loadavg.Load5,
|
"load5": loadavg.Load5,
|
||||||
"load15": loadavg.Load15,
|
"load15": loadavg.Load15,
|
||||||
"uptime": float64(hostinfo.Uptime),
|
"uptime": hostinfo.Uptime,
|
||||||
"uptime_format": format_uptime(hostinfo.Uptime),
|
"uptime_format": format_uptime(hostinfo.Uptime),
|
||||||
}
|
}
|
||||||
acc.AddFields("system", fields, nil)
|
acc.AddFields("system", fields, nil)
|
||||||
|
|
Loading…
Reference in New Issue