From 55adf5f511d58cd84f36ba9397201f6706c75781 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien=20H=C3=A9bert?= Date: Mon, 4 Jul 2016 14:51:54 +0200 Subject: [PATCH] Add output plugin warp10 Change syntax and update code according to comments Delete unused import --- README.md | 1 + plugins/outputs/all/all.go | 1 + plugins/outputs/warp10/README.md | 24 +++++ plugins/outputs/warp10/warp10.go | 166 +++++++++++++++++++++++++++++++ 4 files changed, 192 insertions(+) create mode 100644 plugins/outputs/warp10/README.md create mode 100755 plugins/outputs/warp10/warp10.go diff --git a/README.md b/README.md index 53e672534..f51dcbcbc 100644 --- a/README.md +++ b/README.md @@ -246,6 +246,7 @@ want to add support for another service or third-party API. * [opentsdb](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/opentsdb) * [prometheus](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/prometheus_client) * [riemann](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/riemann) +* [warp10](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/warp10) ## Contributing diff --git a/plugins/outputs/all/all.go b/plugins/outputs/all/all.go index 27f8958fe..1c09d3a15 100644 --- a/plugins/outputs/all/all.go +++ b/plugins/outputs/all/all.go @@ -18,4 +18,5 @@ import ( _ "github.com/influxdata/telegraf/plugins/outputs/opentsdb" _ "github.com/influxdata/telegraf/plugins/outputs/prometheus_client" _ "github.com/influxdata/telegraf/plugins/outputs/riemann" + _ "github.com/influxdata/telegraf/plugins/outputs/warp10" ) diff --git a/plugins/outputs/warp10/README.md b/plugins/outputs/warp10/README.md new file mode 100644 index 000000000..0fa077d5e --- /dev/null +++ b/plugins/outputs/warp10/README.md @@ -0,0 +1,24 @@ +# README # + +Telegraph plugin to push metrics on Warp10 + +### Telegraph output for Warp10 ### + +* Execute a post http on Warp10 at every flush time configured in telegraph in order to push the metrics collected + +### Config ### + +* Add following instruction in the config file (Output part) + +``` +[[outputs.warp10]] +warpUrl = "https://warp1.cityzendata.net/api/v0/update" +token = "token" +prefix = "telegraf." +debug = false + +``` + +### Contact ### + +* contact@cityzendata.com diff --git a/plugins/outputs/warp10/warp10.go b/plugins/outputs/warp10/warp10.go new file mode 100755 index 000000000..83e32af20 --- /dev/null +++ b/plugins/outputs/warp10/warp10.go @@ -0,0 +1,166 @@ +package warp10 + +import ( + "bytes" + "fmt" + "io/ioutil" + "net/http" + "sort" + "strconv" + "strings" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/outputs" +) + +type Warp10 struct { + Prefix string + + WarpUrl string + + Token string +} + +var sampleConfig = ` + # prefix for metrics class Name + prefix = "telegraf." + ## POST HTTP Mode ## + # Url name of the Warp 10 server + warp_url = "localhost:4242/" + # Token to access your app on warp 10 + token = "Token" +` + +type MetricLine struct { + Metric string + Timestamp int64 + Value string + Tags string +} + +func (o *Warp10) Connect() error { + return nil +} + +func (o *Warp10) Write(metrics []telegraf.Metric) error { + if len(metrics) == 0 { + return nil + } + var timeNow = time.Now() + collectString := make([]string, 0) + index := 0 + for _, mm := range metrics { + + for k, v := range mm.Fields() { + + metric := &MetricLine{ + Metric: fmt.Sprintf("%s%s", o.Prefix, mm.Name()+"."+k), + Timestamp: timeNow.Unix() * 1000000, + } + + metricValue, buildError := buildValue(v) + if buildError != nil { + fmt.Printf("Warp: %s\n", buildError.Error()) + continue + } + metric.Value = metricValue + + tagsSlice := buildTags(mm.Tags()) + metric.Tags = fmt.Sprint(strings.Join(tagsSlice, ",")) + + messageLine := fmt.Sprintf("%v// %s{%s} %v \n", metric.Timestamp, metric.Metric, metric.Tags, metric.Value) + + collectString = append(collectString, messageLine) + index += 1 + } + } + payload := fmt.Sprint(strings.Join(collectString, "\n")) + //defer connection.Close() + req, err := http.NewRequest("POST", o.WarpUrl, bytes.NewBufferString(payload)) + req.Header.Set("X-Warp10-Token", o.Token) + req.Header.Set("Content-Type", "text/plain") + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + panic(err) + } + defer resp.Body.Close() + + fmt.Println("response Status:", resp.Status) + fmt.Println("response Headers:", resp.Header) + body, _ := ioutil.ReadAll(resp.Body) + fmt.Println("response Body:", string(body)) + + return nil +} + +func buildTags(ptTags map[string]string) []string { + sizeTags := len(ptTags) + sizeTags += 1 + tags := make([]string, sizeTags) + index := 0 + for k, v := range ptTags { + tags[index] = fmt.Sprintf("%s=%s", k, v) + index += 1 + } + tags[index] = fmt.Sprintf("source=telegraf") + sort.Strings(tags) + return tags +} + +func buildValue(v interface{}) (string, error) { + var retv string + switch p := v.(type) { + case int64: + retv = IntToString(int64(p)) + case string: + retv = fmt.Sprintf("'%s'", p) + case bool: + retv = BoolToString(bool(p)) + case uint64: + retv = UIntToString(uint64(p)) + case float64: + retv = FloatToString(float64(p)) + default: + retv = fmt.Sprintf("'%s'", p) + // return retv, fmt.Errorf("unexpected type %T with value %v for Warp", v, v) + } + return retv, nil +} + +func IntToString(input_num int64) string { + return strconv.FormatInt(input_num, 10) +} + +func BoolToString(input_bool bool) string { + return strconv.FormatBool(input_bool) +} + +func UIntToString(input_num uint64) string { + return strconv.FormatUint(input_num, 10) +} + +func FloatToString(input_num float64) string { + return strconv.FormatFloat(input_num, 'f', 6, 64) +} + +func (o *Warp10) SampleConfig() string { + return sampleConfig +} + +func (o *Warp10) Description() string { + return "Configuration for Warp server to send metrics to" +} + +func (o *Warp10) Close() error { + // Basically nothing to do for Warp10 here + return nil +} + +func init() { + outputs.Add("warp10", func() telegraf.Output { + return &Warp10{} + }) +}