Add output plugin for Warp10 (#1923)
This commit is contained in:
parent
07b75c57fe
commit
ce02bebf30
|
@ -409,4 +409,5 @@ For documentation on the latest development code see the [documentation index][d
|
|||
* [syslog](./plugins/outputs/syslog)
|
||||
* [tcp](./plugins/outputs/socket_writer)
|
||||
* [udp](./plugins/outputs/socket_writer)
|
||||
* [warp10](./plugins/outputs/warp10)
|
||||
* [wavefront](./plugins/outputs/wavefront)
|
||||
|
|
|
@ -33,5 +33,6 @@ import (
|
|||
_ "github.com/influxdata/telegraf/plugins/outputs/socket_writer"
|
||||
_ "github.com/influxdata/telegraf/plugins/outputs/stackdriver"
|
||||
_ "github.com/influxdata/telegraf/plugins/outputs/syslog"
|
||||
_ "github.com/influxdata/telegraf/plugins/outputs/warp10"
|
||||
_ "github.com/influxdata/telegraf/plugins/outputs/wavefront"
|
||||
)
|
||||
|
|
|
@ -0,0 +1,30 @@
|
|||
# README #
|
||||
|
||||
Telegraph plugin to push metrics on Warp10
|
||||
|
||||
### Telegraph output for Warp10 ###
|
||||
|
||||
Execute a post http on Warp10 at every flush time configured in telegraph in order to push the metrics collected
|
||||
|
||||
### Config ###
|
||||
|
||||
Add following instruction in the config file (Output part)
|
||||
|
||||
```
|
||||
[[outputs.warp10]]
|
||||
warpUrl = "http://localhost:4242"
|
||||
token = "token"
|
||||
prefix = "telegraf."
|
||||
timeout = "15s"
|
||||
```
|
||||
|
||||
To get more details on Warp 10 errors occuring when pushing data with Telegraf, you can optionaly set:
|
||||
|
||||
```
|
||||
printErrorBody = true ## To print the full body of the HTTP Post instead of the request status
|
||||
maxStringErrorSize = 700 ## To update the maximal string size of the Warp 10 error body. By default it's set to 512.
|
||||
```
|
||||
|
||||
### Values format
|
||||
|
||||
The Warp 10 output support natively number, float and boolean values. String are send as URL encoded values as well as all Influx objects.
|
|
@ -0,0 +1,282 @@
|
|||
package warp10
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net/http"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
"github.com/influxdata/telegraf/internal/tls"
|
||||
"github.com/influxdata/telegraf/plugins/outputs"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultClientTimeout = 15 * time.Second
|
||||
)
|
||||
|
||||
// Warp10 output plugin
|
||||
type Warp10 struct {
|
||||
Prefix string
|
||||
WarpURL string
|
||||
Token string
|
||||
Timeout internal.Duration `toml:"timeout"`
|
||||
PrintErrorBody bool
|
||||
MaxStringErrorSize int
|
||||
client *http.Client
|
||||
tls.ClientConfig
|
||||
}
|
||||
|
||||
var sampleConfig = `
|
||||
# prefix for metrics class Name
|
||||
prefix = "telegraf."
|
||||
## POST HTTP(or HTTPS) ##
|
||||
# Url name of the Warp 10 server
|
||||
warp_url = "http://localhost:8080"
|
||||
# Token to access your app on warp 10
|
||||
token = "Token"
|
||||
# Warp 10 query timeout, by default 15s
|
||||
timeout = "15s"
|
||||
## Optional Print Warp 10 error body
|
||||
# print_error_body = false
|
||||
## Optional Max string error Size
|
||||
# max_string_error_size = 511
|
||||
## Optional TLS Config
|
||||
# tls_ca = "/etc/telegraf/ca.pem"
|
||||
# tls_cert = "/etc/telegraf/cert.pem"
|
||||
# tls_key = "/etc/telegraf/key.pem"
|
||||
`
|
||||
|
||||
// MetricLine Warp 10 metrics
|
||||
type MetricLine struct {
|
||||
Metric string
|
||||
Timestamp int64
|
||||
Value string
|
||||
Tags string
|
||||
}
|
||||
|
||||
func (w *Warp10) createClient() (*http.Client, error) {
|
||||
tlsCfg, err := w.ClientConfig.TLSConfig()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if w.Timeout.Duration == 0 {
|
||||
w.Timeout.Duration = defaultClientTimeout
|
||||
}
|
||||
|
||||
client := &http.Client{
|
||||
Transport: &http.Transport{
|
||||
TLSClientConfig: tlsCfg,
|
||||
Proxy: http.ProxyFromEnvironment,
|
||||
},
|
||||
Timeout: w.Timeout.Duration,
|
||||
}
|
||||
|
||||
return client, nil
|
||||
}
|
||||
|
||||
// Connect to warp10
|
||||
func (w *Warp10) Connect() error {
|
||||
client, err := w.createClient()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
w.client = client
|
||||
return nil
|
||||
}
|
||||
|
||||
// GenWarp10Payload compute Warp 10 metrics payload
|
||||
func (w *Warp10) GenWarp10Payload(metrics []telegraf.Metric, now time.Time) string {
|
||||
collectString := make([]string, 0)
|
||||
for _, mm := range metrics {
|
||||
|
||||
for _, field := range mm.FieldList() {
|
||||
|
||||
metric := &MetricLine{
|
||||
Metric: fmt.Sprintf("%s%s", w.Prefix, mm.Name()+"."+field.Key),
|
||||
Timestamp: now.UnixNano() / 1000,
|
||||
}
|
||||
|
||||
metricValue, err := buildValue(field.Value)
|
||||
if err != nil {
|
||||
log.Printf("E! [outputs.warp10] Could not encode value: %v", err)
|
||||
continue
|
||||
}
|
||||
metric.Value = metricValue
|
||||
|
||||
tagsSlice := buildTags(mm.TagList())
|
||||
metric.Tags = strings.Join(tagsSlice, ",")
|
||||
|
||||
messageLine := fmt.Sprintf("%d// %s{%s} %s\n", metric.Timestamp, metric.Metric, metric.Tags, metric.Value)
|
||||
|
||||
collectString = append(collectString, messageLine)
|
||||
}
|
||||
}
|
||||
return fmt.Sprint(strings.Join(collectString, ""))
|
||||
}
|
||||
|
||||
// Write metrics to Warp10
|
||||
func (w *Warp10) Write(metrics []telegraf.Metric) error {
|
||||
|
||||
var now = time.Now()
|
||||
payload := w.GenWarp10Payload(metrics, now)
|
||||
|
||||
if payload == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("POST", w.WarpURL+"/api/v0/update", bytes.NewBufferString(payload))
|
||||
req.Header.Set("X-Warp10-Token", w.Token)
|
||||
req.Header.Set("Content-Type", "text/plain")
|
||||
|
||||
resp, err := w.client.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
if w.PrintErrorBody {
|
||||
body, _ := ioutil.ReadAll(resp.Body)
|
||||
return fmt.Errorf(w.WarpURL + ": " + w.HandleError(string(body), w.MaxStringErrorSize))
|
||||
}
|
||||
|
||||
if len(resp.Status) < w.MaxStringErrorSize {
|
||||
return fmt.Errorf(w.WarpURL + ": " + resp.Status)
|
||||
}
|
||||
|
||||
return fmt.Errorf(w.WarpURL + ": " + resp.Status[0:w.MaxStringErrorSize])
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func buildTags(tags []*telegraf.Tag) []string {
|
||||
|
||||
tagsString := make([]string, len(tags)+1)
|
||||
indexSource := 0
|
||||
for index, tag := range tags {
|
||||
tagsString[index] = fmt.Sprintf("%s=%s", tag.Key, tag.Value)
|
||||
indexSource = index
|
||||
}
|
||||
indexSource++
|
||||
tagsString[indexSource] = fmt.Sprintf("source=telegraf")
|
||||
sort.Strings(tagsString)
|
||||
return tagsString
|
||||
}
|
||||
|
||||
func buildValue(v interface{}) (string, error) {
|
||||
var retv string
|
||||
switch p := v.(type) {
|
||||
case int64:
|
||||
retv = intToString(int64(p))
|
||||
case string:
|
||||
retv = fmt.Sprintf("'%s'", strings.Replace(p, "'", "\\'", -1))
|
||||
case bool:
|
||||
retv = boolToString(bool(p))
|
||||
case uint64:
|
||||
retv = uIntToString(uint64(p))
|
||||
case float64:
|
||||
retv = floatToString(float64(p))
|
||||
default:
|
||||
retv = "'" + strings.Replace(fmt.Sprintf("%s", p), "'", "\\'", -1) + "'"
|
||||
}
|
||||
return retv, nil
|
||||
}
|
||||
|
||||
func intToString(inputNum int64) string {
|
||||
return strconv.FormatInt(inputNum, 10)
|
||||
}
|
||||
|
||||
func boolToString(inputBool bool) string {
|
||||
return strconv.FormatBool(inputBool)
|
||||
}
|
||||
|
||||
func uIntToString(inputNum uint64) string {
|
||||
return strconv.FormatUint(inputNum, 10)
|
||||
}
|
||||
|
||||
func floatToString(inputNum float64) string {
|
||||
return strconv.FormatFloat(inputNum, 'f', 6, 64)
|
||||
}
|
||||
|
||||
// SampleConfig get config
|
||||
func (w *Warp10) SampleConfig() string {
|
||||
return sampleConfig
|
||||
}
|
||||
|
||||
// Description get description
|
||||
func (w *Warp10) Description() string {
|
||||
return "Configuration for Warp server to send metrics to"
|
||||
}
|
||||
|
||||
// Close close
|
||||
func (w *Warp10) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Init Warp10 struct
|
||||
func (w *Warp10) Init() error {
|
||||
if w.MaxStringErrorSize <= 0 {
|
||||
w.MaxStringErrorSize = 511
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
outputs.Add("warp10", func() telegraf.Output {
|
||||
return &Warp10{}
|
||||
})
|
||||
}
|
||||
|
||||
// HandleError read http error body and return a corresponding error
|
||||
func (w *Warp10) HandleError(body string, maxStringSize int) string {
|
||||
if body == "" {
|
||||
return "Empty return"
|
||||
}
|
||||
|
||||
if strings.Contains(body, "Invalid token") {
|
||||
return "Invalid token"
|
||||
}
|
||||
|
||||
if strings.Contains(body, "Write token missing") {
|
||||
return "Write token missing"
|
||||
}
|
||||
|
||||
if strings.Contains(body, "Token Expired") {
|
||||
return "Token Expired"
|
||||
}
|
||||
|
||||
if strings.Contains(body, "Token revoked") {
|
||||
return "Token revoked"
|
||||
}
|
||||
|
||||
if strings.Contains(body, "exceed your Monthly Active Data Streams limit") || strings.Contains(body, "exceed the Monthly Active Data Streams limit") {
|
||||
return "Exceeded Monthly Active Data Streams limit"
|
||||
}
|
||||
|
||||
if strings.Contains(body, "Daily Data Points limit being already exceeded") {
|
||||
return "Exceeded Daily Data Points limit"
|
||||
}
|
||||
|
||||
if strings.Contains(body, "Application suspended or closed") {
|
||||
return "Application suspended or closed"
|
||||
}
|
||||
|
||||
if strings.Contains(body, "broken pipe") {
|
||||
return "broken pipe"
|
||||
}
|
||||
|
||||
if len(body) < maxStringSize {
|
||||
return body
|
||||
}
|
||||
return body[0:maxStringSize]
|
||||
}
|
|
@ -0,0 +1,108 @@
|
|||
package warp10
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
)
|
||||
|
||||
type ErrorTest struct {
|
||||
Message string
|
||||
Expected string
|
||||
}
|
||||
|
||||
func TestWriteWarp10(t *testing.T) {
|
||||
w := Warp10{
|
||||
Prefix: "unit.test",
|
||||
WarpURL: "http://localhost:8090",
|
||||
Token: "WRITE",
|
||||
}
|
||||
|
||||
var now = time.Now()
|
||||
payload := w.GenWarp10Payload(testutil.MockMetrics(), now)
|
||||
require.Exactly(t, fmt.Sprintf("%d// unit.testtest1.value{source=telegraf,tag1=value1} 1.000000\n", now.UnixNano()/1000), payload)
|
||||
}
|
||||
|
||||
func TestHandleWarp10Error(t *testing.T) {
|
||||
w := Warp10{
|
||||
Prefix: "unit.test",
|
||||
WarpURL: "http://localhost:8090",
|
||||
Token: "WRITE",
|
||||
}
|
||||
tests := [...]*ErrorTest{
|
||||
{
|
||||
Message: `
|
||||
<html>
|
||||
<head>
|
||||
<meta http-equiv="Content-Type" content="text/html;charset=utf-8"/>
|
||||
<title>Error 500 io.warp10.script.WarpScriptException: Invalid token.</title>
|
||||
</head>
|
||||
<body><h2>HTTP ERROR 500</h2>
|
||||
<p>Problem accessing /api/v0/update. Reason:
|
||||
<pre> io.warp10.script.WarpScriptException: Invalid token.</pre></p>
|
||||
</body>
|
||||
</html>
|
||||
`,
|
||||
Expected: fmt.Sprintf("Invalid token"),
|
||||
},
|
||||
{
|
||||
Message: `
|
||||
<html>
|
||||
<head>
|
||||
<meta http-equiv="Content-Type" content="text/html;charset=utf-8"/>
|
||||
<title>Error 500 io.warp10.script.WarpScriptException: Token Expired.</title>
|
||||
</head>
|
||||
<body><h2>HTTP ERROR 500</h2>
|
||||
<p>Problem accessing /api/v0/update. Reason:
|
||||
<pre> io.warp10.script.WarpScriptException: Token Expired.</pre></p>
|
||||
</body>
|
||||
</html>
|
||||
`,
|
||||
Expected: fmt.Sprintf("Token Expired"),
|
||||
},
|
||||
{
|
||||
Message: `
|
||||
<html>
|
||||
<head>
|
||||
<meta http-equiv="Content-Type" content="text/html;charset=utf-8"/>
|
||||
<title>Error 500 io.warp10.script.WarpScriptException: Token revoked.</title>
|
||||
</head>
|
||||
<body><h2>HTTP ERROR 500</h2>
|
||||
<p>Problem accessing /api/v0/update. Reason:
|
||||
<pre> io.warp10.script.WarpScriptException: Token revoked.</pre></p>
|
||||
</body>
|
||||
</html>
|
||||
`,
|
||||
Expected: fmt.Sprintf("Token revoked"),
|
||||
},
|
||||
{
|
||||
Message: `
|
||||
<html>
|
||||
<head>
|
||||
<meta http-equiv="Content-Type" content="text/html;charset=utf-8"/>
|
||||
<title>Error 500 io.warp10.script.WarpScriptException: Write token missing.</title>
|
||||
</head>
|
||||
<body><h2>HTTP ERROR 500</h2>
|
||||
<p>Problem accessing /api/v0/update. Reason:
|
||||
<pre> io.warp10.script.WarpScriptException: Write token missing.</pre></p>
|
||||
</body>
|
||||
</html>
|
||||
`,
|
||||
Expected: "Write token missing",
|
||||
},
|
||||
{
|
||||
Message: `<title>Error 503: server unavailable</title>`,
|
||||
Expected: "<title>Error 503: server unavailable</title>",
|
||||
},
|
||||
}
|
||||
|
||||
for _, handledError := range tests {
|
||||
payload := w.HandleError(handledError.Message, 511)
|
||||
require.Exactly(t, handledError.Expected, payload)
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue