diff --git a/CHANGELOG.md b/CHANGELOG.md index 64cdb7852..9efc05a34 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,10 @@ renamed to `sqlserver_azure_db_resource_stats` due to an issue where numeric metrics were previously being reported incorrectly as strings. +#### New Outputs + +- [warp10](/plugins/outputs/warp10/README.md) - Contributed by @aurrelhebert + #### Features - [#6730](https://github.com/influxdata/telegraf/pull/6730): Add page_faults for mongodb wired tiger. diff --git a/plugins/outputs/warp10/README.md b/plugins/outputs/warp10/README.md index df56e6816..07e6cd25b 100644 --- a/plugins/outputs/warp10/README.md +++ b/plugins/outputs/warp10/README.md @@ -1,30 +1,50 @@ -# README # +# Warp10 Output Plugin -Telegraph plugin to push metrics on Warp10 +The `warp10` output plugin writes metrics to [Warp 10][]. -### Telegraph output for Warp10 ### +### Configuration -Execute a post http on Warp10 at every flush time configured in telegraph in order to push the metrics collected - -### Config ### - -Add following instruction in the config file (Output part) - -``` +```toml [[outputs.warp10]] -warpUrl = "http://localhost:4242" -token = "token" -prefix = "telegraf." -timeout = "15s" + # Prefix to add to the measurement. + prefix = "telegraf." + + # URL of the Warp 10 server + warp_url = "http://localhost:8080" + + # Write token to access your app on warp 10 + token = "Token" + + # Warp 10 query timeout + # timeout = "15s" + + ## Print Warp 10 error body + # print_error_body = false + + ## Max string error size + # max_string_error_size = 511 + + ## Optional TLS Config + # tls_ca = "/etc/telegraf/ca.pem" + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false ``` -To get more details on Warp 10 errors occuring when pushing data with Telegraf, you can optionaly set: +### Output Format -``` -printErrorBody = true ## To print the full body of the HTTP Post instead of the request status -maxStringErrorSize = 700  ## To update the maximal string size of the Warp 10 error body. By default it's set to 512. -``` +Metrics are converted and sent using the [Geo Time Series][] (GTS) input format. -### Values format +The class name of the reading is produced by combining the value of the +`prefix` option, the measurement name, and the field key. A dot (`.`) +character is used as the joining character. -The Warp 10 output support natively number, float and boolean values. String are send as URL encoded values as well as all Influx objects. \ No newline at end of file +The GTS form provides support for the Telegraf integer, float, boolean, and +string types directly. Unsigned integer fields will be capped to the largest +64-bit integer (2^63-1) in case of overflow. + +Timestamps are sent in microsecond precision. + +[Warp 10]: https://www.warp10.io +[Geo Time Series]: https://www.warp10.io/content/03_Documentation/03_Interacting_with_Warp_10/03_Ingesting_data/02_GTS_input_format diff --git a/plugins/outputs/warp10/warp10.go b/plugins/outputs/warp10/warp10.go index deaefc6fc..eead153e0 100644 --- a/plugins/outputs/warp10/warp10.go +++ b/plugins/outputs/warp10/warp10.go @@ -5,6 +5,7 @@ import ( "fmt" "io/ioutil" "log" + "math" "net/http" "sort" "strconv" @@ -23,34 +24,41 @@ const ( // Warp10 output plugin type Warp10 struct { - Prefix string - WarpURL string - Token string + Prefix string `toml:"prefix"` + WarpURL string `toml:"warp_url"` + Token string `toml:"token"` Timeout internal.Duration `toml:"timeout"` - PrintErrorBody bool - MaxStringErrorSize int + PrintErrorBody bool `toml:"print_error_body"` + MaxStringErrorSize int `toml:"max_string_error_size"` client *http.Client tls.ClientConfig } var sampleConfig = ` - # prefix for metrics class Name + # Prefix to add to the measurement. prefix = "telegraf." - ## POST HTTP(or HTTPS) ## - # Url name of the Warp 10 server + + # URL of the Warp 10 server warp_url = "http://localhost:8080" - # Token to access your app on warp 10 + + # Write token to access your app on warp 10 token = "Token" - # Warp 10 query timeout, by default 15s - timeout = "15s" - ## Optional Print Warp 10 error body + + # Warp 10 query timeout + # timeout = "15s" + + ## Print Warp 10 error body # print_error_body = false - ## Optional Max string error Size + + ## Max string error size # max_string_error_size = 511 + ## Optional TLS Config # tls_ca = "/etc/telegraf/ca.pem" # tls_cert = "/etc/telegraf/cert.pem" # tls_key = "/etc/telegraf/key.pem" + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false ` // MetricLine Warp 10 metrics @@ -94,7 +102,7 @@ func (w *Warp10) Connect() error { } // GenWarp10Payload compute Warp 10 metrics payload -func (w *Warp10) GenWarp10Payload(metrics []telegraf.Metric, now time.Time) string { +func (w *Warp10) GenWarp10Payload(metrics []telegraf.Metric) string { collectString := make([]string, 0) for _, mm := range metrics { @@ -102,7 +110,7 @@ func (w *Warp10) GenWarp10Payload(metrics []telegraf.Metric, now time.Time) stri metric := &MetricLine{ Metric: fmt.Sprintf("%s%s", w.Prefix, mm.Name()+"."+field.Key), - Timestamp: now.UnixNano() / 1000, + Timestamp: mm.Time().UnixNano() / 1000, } metricValue, err := buildValue(field.Value) @@ -125,10 +133,7 @@ func (w *Warp10) GenWarp10Payload(metrics []telegraf.Metric, now time.Time) stri // Write metrics to Warp10 func (w *Warp10) Write(metrics []telegraf.Metric) error { - - var now = time.Now() - payload := w.GenWarp10Payload(metrics, now) - + payload := w.GenWarp10Payload(metrics) if payload == "" { return nil } @@ -177,17 +182,21 @@ func buildValue(v interface{}) (string, error) { var retv string switch p := v.(type) { case int64: - retv = intToString(int64(p)) + retv = intToString(p) case string: retv = fmt.Sprintf("'%s'", strings.Replace(p, "'", "\\'", -1)) case bool: - retv = boolToString(bool(p)) + retv = boolToString(p) case uint64: - retv = uIntToString(uint64(p)) + if p <= uint64(math.MaxInt64) { + retv = strconv.FormatInt(int64(p), 10) + } else { + retv = strconv.FormatInt(math.MaxInt64, 10) + } case float64: retv = floatToString(float64(p)) default: - retv = "'" + strings.Replace(fmt.Sprintf("%s", p), "'", "\\'", -1) + "'" + return "", fmt.Errorf("unsupported type: %T", v) } return retv, nil } @@ -215,7 +224,7 @@ func (w *Warp10) SampleConfig() string { // Description get description func (w *Warp10) Description() string { - return "Configuration for Warp server to send metrics to" + return "Write metrics to Warp 10" } // Close close diff --git a/plugins/outputs/warp10/warp10_test.go b/plugins/outputs/warp10/warp10_test.go index e222b7d93..5b543b34c 100644 --- a/plugins/outputs/warp10/warp10_test.go +++ b/plugins/outputs/warp10/warp10_test.go @@ -3,11 +3,9 @@ package warp10 import ( "fmt" "testing" - "time" - - "github.com/stretchr/testify/require" "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" ) type ErrorTest struct { @@ -22,9 +20,8 @@ func TestWriteWarp10(t *testing.T) { Token: "WRITE", } - var now = time.Now() - payload := w.GenWarp10Payload(testutil.MockMetrics(), now) - require.Exactly(t, fmt.Sprintf("%d// unit.testtest1.value{source=telegraf,tag1=value1} 1.000000\n", now.UnixNano()/1000), payload) + payload := w.GenWarp10Payload(testutil.MockMetrics()) + require.Exactly(t, "1257894000000000// unit.testtest1.value{source=telegraf,tag1=value1} 1.000000\n", payload) } func TestHandleWarp10Error(t *testing.T) {