From 06ef2a72c5e16dfe56600f70802dc0a3739b52f3 Mon Sep 17 00:00:00 2001 From: Alvaro Morales Date: Tue, 4 Aug 2015 14:48:13 -0700 Subject: [PATCH] Add httpjson plugin --- plugins/all/all.go | 1 + plugins/httpjson/httpjson.go | 110 +++++++++++++++++++++++++++++++++++ 2 files changed, 111 insertions(+) create mode 100644 plugins/httpjson/httpjson.go diff --git a/plugins/all/all.go b/plugins/all/all.go index bc610c39c..8670c3d8b 100644 --- a/plugins/all/all.go +++ b/plugins/all/all.go @@ -5,6 +5,7 @@ import ( _ "github.com/influxdb/telegraf/plugins/elasticsearch" _ "github.com/influxdb/telegraf/plugins/exec" _ "github.com/influxdb/telegraf/plugins/haproxy" + _ "github.com/influxdb/telegraf/plugins/httpjson" _ "github.com/influxdb/telegraf/plugins/kafka_consumer" _ "github.com/influxdb/telegraf/plugins/leofs" _ "github.com/influxdb/telegraf/plugins/lustre2" diff --git a/plugins/httpjson/httpjson.go b/plugins/httpjson/httpjson.go new file mode 100644 index 000000000..90af0c1a7 --- /dev/null +++ b/plugins/httpjson/httpjson.go @@ -0,0 +1,110 @@ +package httpjson + +import ( + "encoding/json" + "fmt" + "github.com/bitly/go-simplejson" + "github.com/influxdb/telegraf/plugins" + "net/http" + "sync" +) + +type HttpJson struct { + Servers []string + Measurements map[string]string + Method string + Foo string + client *http.Client +} + +var sampleConfig = ` +# stats url endpoint +servers = ["http://localhost:5000"] + +# a name for server(s) +foo = "mycluster" + +# HTTP method (GET or POST) +method = "GET" + +# Map of key transforms # TODO describe +[httpjson.measurements] +stats_measurements_measurement = "my_measurement" +` + +func (h *HttpJson) SampleConfig() string { + return sampleConfig +} + +func (h *HttpJson) Description() string { + return "Read flattened metrics from one or more JSON HTTP endpoints" +} + +func (h *HttpJson) Gather(acc plugins.Accumulator) error { + var wg sync.WaitGroup + + var outerr error + + for _, server := range h.Servers { + wg.Add(1) + go func(server string) { + defer wg.Done() + outerr = h.gatherServer(server, acc) + }(server) + } + + wg.Wait() + + return outerr +} + +func (h *HttpJson) gatherServer(url string, acc plugins.Accumulator) error { + r, err := h.client.Get(url) + if err != nil { + return err + } + + if r.StatusCode != http.StatusOK { + return fmt.Errorf("httpjson: server '%s' responded with status-code %d, expected %d", r.StatusCode, http.StatusOK) + } + + response, err := simplejson.NewFromReader(r.Body) + + if err != nil { + return err + } + + tags := map[string]string{ + "server": url, + } + + return parseResponse(acc, h.Foo, tags, response.Interface(), h.Measurements) +} + +func parseResponse(acc plugins.Accumulator, prefix string, tags map[string]string, v interface{}, measurements map[string]string) error { + switch t := v.(type) { + case map[string]interface{}: + for k, v := range t { + if err := parseResponse(acc, prefix+"_"+k, tags, v, measurements); err != nil { + return err + } + } + case json.Number: + if transform, ok := measurements[prefix]; ok { + prefix = transform + } + acc.Add(prefix, t, tags) + case bool, string, []interface{}: + // ignored types + return nil + default: + return fmt.Errorf("httpjson: got unexpected type %T with value %v (%s)", t, v, prefix) + } + return nil +} + +func init() { + plugins.Add("httpjson", func() plugins.Plugin { + return &HttpJson{client: http.DefaultClient} + }) +}