From ce02bebf3037be3c6043652fa61e5dc5adcc135a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien=20H=C3=89BERT?= Date: Thu, 9 Jan 2020 20:29:16 +0100 Subject: [PATCH] Add output plugin for Warp10 (#1923) --- README.md | 1 + plugins/outputs/all/all.go | 1 + plugins/outputs/warp10/README.md | 30 +++ plugins/outputs/warp10/warp10.go | 282 ++++++++++++++++++++++++++ plugins/outputs/warp10/warp10_test.go | 108 ++++++++++ 5 files changed, 422 insertions(+) create mode 100644 plugins/outputs/warp10/README.md create mode 100644 plugins/outputs/warp10/warp10.go create mode 100644 plugins/outputs/warp10/warp10_test.go diff --git a/README.md b/README.md index 4150ec17a..3276f33bf 100644 --- a/README.md +++ b/README.md @@ -409,4 +409,5 @@ For documentation on the latest development code see the [documentation index][d * [syslog](./plugins/outputs/syslog) * [tcp](./plugins/outputs/socket_writer) * [udp](./plugins/outputs/socket_writer) +* [warp10](./plugins/outputs/warp10) * [wavefront](./plugins/outputs/wavefront) diff --git a/plugins/outputs/all/all.go b/plugins/outputs/all/all.go index e40230993..35e0393de 100644 --- a/plugins/outputs/all/all.go +++ b/plugins/outputs/all/all.go @@ -33,5 +33,6 @@ import ( _ "github.com/influxdata/telegraf/plugins/outputs/socket_writer" _ "github.com/influxdata/telegraf/plugins/outputs/stackdriver" _ "github.com/influxdata/telegraf/plugins/outputs/syslog" + _ "github.com/influxdata/telegraf/plugins/outputs/warp10" _ "github.com/influxdata/telegraf/plugins/outputs/wavefront" ) diff --git a/plugins/outputs/warp10/README.md b/plugins/outputs/warp10/README.md new file mode 100644 index 000000000..df56e6816 --- /dev/null +++ b/plugins/outputs/warp10/README.md @@ -0,0 +1,30 @@ +# README # + +Telegraph plugin to push metrics on Warp10 + +### Telegraph output for Warp10 ### + +Execute a post http on Warp10 at every flush time configured in telegraph in order to push the metrics collected + +### Config ### + +Add following instruction in the config file (Output part) + +``` +[[outputs.warp10]] +warpUrl = "http://localhost:4242" +token = "token" +prefix = "telegraf." +timeout = "15s" +``` + +To get more details on Warp 10 errors occuring when pushing data with Telegraf, you can optionaly set: + +``` +printErrorBody = true ## To print the full body of the HTTP Post instead of the request status +maxStringErrorSize = 700  ## To update the maximal string size of the Warp 10 error body. By default it's set to 512. +``` + +### Values format + +The Warp 10 output support natively number, float and boolean values. String are send as URL encoded values as well as all Influx objects. \ No newline at end of file diff --git a/plugins/outputs/warp10/warp10.go b/plugins/outputs/warp10/warp10.go new file mode 100644 index 000000000..deaefc6fc --- /dev/null +++ b/plugins/outputs/warp10/warp10.go @@ -0,0 +1,282 @@ +package warp10 + +import ( + "bytes" + "fmt" + "io/ioutil" + "log" + "net/http" + "sort" + "strconv" + "strings" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/internal/tls" + "github.com/influxdata/telegraf/plugins/outputs" +) + +const ( + defaultClientTimeout = 15 * time.Second +) + +// Warp10 output plugin +type Warp10 struct { + Prefix string + WarpURL string + Token string + Timeout internal.Duration `toml:"timeout"` + PrintErrorBody bool + MaxStringErrorSize int + client *http.Client + tls.ClientConfig +} + +var sampleConfig = ` + # prefix for metrics class Name + prefix = "telegraf." + ## POST HTTP(or HTTPS) ## + # Url name of the Warp 10 server + warp_url = "http://localhost:8080" + # Token to access your app on warp 10 + token = "Token" + # Warp 10 query timeout, by default 15s + timeout = "15s" + ## Optional Print Warp 10 error body + # print_error_body = false + ## Optional Max string error Size + # max_string_error_size = 511 + ## Optional TLS Config + # tls_ca = "/etc/telegraf/ca.pem" + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" +` + +// MetricLine Warp 10 metrics +type MetricLine struct { + Metric string + Timestamp int64 + Value string + Tags string +} + +func (w *Warp10) createClient() (*http.Client, error) { + tlsCfg, err := w.ClientConfig.TLSConfig() + if err != nil { + return nil, err + } + + if w.Timeout.Duration == 0 { + w.Timeout.Duration = defaultClientTimeout + } + + client := &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: tlsCfg, + Proxy: http.ProxyFromEnvironment, + }, + Timeout: w.Timeout.Duration, + } + + return client, nil +} + +// Connect to warp10 +func (w *Warp10) Connect() error { + client, err := w.createClient() + if err != nil { + return err + } + + w.client = client + return nil +} + +// GenWarp10Payload compute Warp 10 metrics payload +func (w *Warp10) GenWarp10Payload(metrics []telegraf.Metric, now time.Time) string { + collectString := make([]string, 0) + for _, mm := range metrics { + + for _, field := range mm.FieldList() { + + metric := &MetricLine{ + Metric: fmt.Sprintf("%s%s", w.Prefix, mm.Name()+"."+field.Key), + Timestamp: now.UnixNano() / 1000, + } + + metricValue, err := buildValue(field.Value) + if err != nil { + log.Printf("E! [outputs.warp10] Could not encode value: %v", err) + continue + } + metric.Value = metricValue + + tagsSlice := buildTags(mm.TagList()) + metric.Tags = strings.Join(tagsSlice, ",") + + messageLine := fmt.Sprintf("%d// %s{%s} %s\n", metric.Timestamp, metric.Metric, metric.Tags, metric.Value) + + collectString = append(collectString, messageLine) + } + } + return fmt.Sprint(strings.Join(collectString, "")) +} + +// Write metrics to Warp10 +func (w *Warp10) Write(metrics []telegraf.Metric) error { + + var now = time.Now() + payload := w.GenWarp10Payload(metrics, now) + + if payload == "" { + return nil + } + + req, err := http.NewRequest("POST", w.WarpURL+"/api/v0/update", bytes.NewBufferString(payload)) + req.Header.Set("X-Warp10-Token", w.Token) + req.Header.Set("Content-Type", "text/plain") + + resp, err := w.client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + if w.PrintErrorBody { + body, _ := ioutil.ReadAll(resp.Body) + return fmt.Errorf(w.WarpURL + ": " + w.HandleError(string(body), w.MaxStringErrorSize)) + } + + if len(resp.Status) < w.MaxStringErrorSize { + return fmt.Errorf(w.WarpURL + ": " + resp.Status) + } + + return fmt.Errorf(w.WarpURL + ": " + resp.Status[0:w.MaxStringErrorSize]) + } + + return nil +} + +func buildTags(tags []*telegraf.Tag) []string { + + tagsString := make([]string, len(tags)+1) + indexSource := 0 + for index, tag := range tags { + tagsString[index] = fmt.Sprintf("%s=%s", tag.Key, tag.Value) + indexSource = index + } + indexSource++ + tagsString[indexSource] = fmt.Sprintf("source=telegraf") + sort.Strings(tagsString) + return tagsString +} + +func buildValue(v interface{}) (string, error) { + var retv string + switch p := v.(type) { + case int64: + retv = intToString(int64(p)) + case string: + retv = fmt.Sprintf("'%s'", strings.Replace(p, "'", "\\'", -1)) + case bool: + retv = boolToString(bool(p)) + case uint64: + retv = uIntToString(uint64(p)) + case float64: + retv = floatToString(float64(p)) + default: + retv = "'" + strings.Replace(fmt.Sprintf("%s", p), "'", "\\'", -1) + "'" + } + return retv, nil +} + +func intToString(inputNum int64) string { + return strconv.FormatInt(inputNum, 10) +} + +func boolToString(inputBool bool) string { + return strconv.FormatBool(inputBool) +} + +func uIntToString(inputNum uint64) string { + return strconv.FormatUint(inputNum, 10) +} + +func floatToString(inputNum float64) string { + return strconv.FormatFloat(inputNum, 'f', 6, 64) +} + +// SampleConfig get config +func (w *Warp10) SampleConfig() string { + return sampleConfig +} + +// Description get description +func (w *Warp10) Description() string { + return "Configuration for Warp server to send metrics to" +} + +// Close close +func (w *Warp10) Close() error { + return nil +} + +// Init Warp10 struct +func (w *Warp10) Init() error { + if w.MaxStringErrorSize <= 0 { + w.MaxStringErrorSize = 511 + } + return nil +} + +func init() { + outputs.Add("warp10", func() telegraf.Output { + return &Warp10{} + }) +} + +// HandleError read http error body and return a corresponding error +func (w *Warp10) HandleError(body string, maxStringSize int) string { + if body == "" { + return "Empty return" + } + + if strings.Contains(body, "Invalid token") { + return "Invalid token" + } + + if strings.Contains(body, "Write token missing") { + return "Write token missing" + } + + if strings.Contains(body, "Token Expired") { + return "Token Expired" + } + + if strings.Contains(body, "Token revoked") { + return "Token revoked" + } + + if strings.Contains(body, "exceed your Monthly Active Data Streams limit") || strings.Contains(body, "exceed the Monthly Active Data Streams limit") { + return "Exceeded Monthly Active Data Streams limit" + } + + if strings.Contains(body, "Daily Data Points limit being already exceeded") { + return "Exceeded Daily Data Points limit" + } + + if strings.Contains(body, "Application suspended or closed") { + return "Application suspended or closed" + } + + if strings.Contains(body, "broken pipe") { + return "broken pipe" + } + + if len(body) < maxStringSize { + return body + } + return body[0:maxStringSize] +} diff --git a/plugins/outputs/warp10/warp10_test.go b/plugins/outputs/warp10/warp10_test.go new file mode 100644 index 000000000..e222b7d93 --- /dev/null +++ b/plugins/outputs/warp10/warp10_test.go @@ -0,0 +1,108 @@ +package warp10 + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/influxdata/telegraf/testutil" +) + +type ErrorTest struct { + Message string + Expected string +} + +func TestWriteWarp10(t *testing.T) { + w := Warp10{ + Prefix: "unit.test", + WarpURL: "http://localhost:8090", + Token: "WRITE", + } + + var now = time.Now() + payload := w.GenWarp10Payload(testutil.MockMetrics(), now) + require.Exactly(t, fmt.Sprintf("%d// unit.testtest1.value{source=telegraf,tag1=value1} 1.000000\n", now.UnixNano()/1000), payload) +} + +func TestHandleWarp10Error(t *testing.T) { + w := Warp10{ + Prefix: "unit.test", + WarpURL: "http://localhost:8090", + Token: "WRITE", + } + tests := [...]*ErrorTest{ + { + Message: ` + + + + Error 500 io.warp10.script.WarpScriptException: Invalid token. + +

HTTP ERROR 500

+

Problem accessing /api/v0/update. Reason: +

    io.warp10.script.WarpScriptException: Invalid token.

+ + + `, + Expected: fmt.Sprintf("Invalid token"), + }, + { + Message: ` + + + + Error 500 io.warp10.script.WarpScriptException: Token Expired. + +

HTTP ERROR 500

+

Problem accessing /api/v0/update. Reason: +

    io.warp10.script.WarpScriptException: Token Expired.

+ + + `, + Expected: fmt.Sprintf("Token Expired"), + }, + { + Message: ` + + + + Error 500 io.warp10.script.WarpScriptException: Token revoked. + +

HTTP ERROR 500

+

Problem accessing /api/v0/update. Reason: +

    io.warp10.script.WarpScriptException: Token revoked.

+ + + `, + Expected: fmt.Sprintf("Token revoked"), + }, + { + Message: ` + + + + Error 500 io.warp10.script.WarpScriptException: Write token missing. + +

HTTP ERROR 500

+

Problem accessing /api/v0/update. Reason: +

    io.warp10.script.WarpScriptException: Write token missing.

+ + + `, + Expected: "Write token missing", + }, + { + Message: `Error 503: server unavailable`, + Expected: "Error 503: server unavailable", + }, + } + + for _, handledError := range tests { + payload := w.HandleError(handledError.Message, 511) + require.Exactly(t, handledError.Expected, payload) + } + +}