package warp10 import ( "bytes" "fmt" "io/ioutil" "log" "math" "net/http" "sort" "strconv" "strings" "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal/tls" "github.com/influxdata/telegraf/plugins/outputs" ) const ( defaultClientTimeout = 15 * time.Second ) // Warp10 output plugin type Warp10 struct { Prefix string `toml:"prefix"` WarpURL string `toml:"warp_url"` Token string `toml:"token"` Timeout internal.Duration `toml:"timeout"` PrintErrorBody bool `toml:"print_error_body"` MaxStringErrorSize int `toml:"max_string_error_size"` client *http.Client tls.ClientConfig } var sampleConfig = ` # Prefix to add to the measurement. prefix = "telegraf." # URL of the Warp 10 server warp_url = "http://localhost:8080" # Write token to access your app on warp 10 token = "Token" # Warp 10 query timeout # timeout = "15s" ## Print Warp 10 error body # print_error_body = false ## Max string error size # max_string_error_size = 511 ## Optional TLS Config # tls_ca = "/etc/telegraf/ca.pem" # tls_cert = "/etc/telegraf/cert.pem" # tls_key = "/etc/telegraf/key.pem" ## Use TLS but skip chain & host verification # insecure_skip_verify = false ` // MetricLine Warp 10 metrics type MetricLine struct { Metric string Timestamp int64 Value string Tags string } func (w *Warp10) createClient() (*http.Client, error) { tlsCfg, err := w.ClientConfig.TLSConfig() if err != nil { return nil, err } if w.Timeout.Duration == 0 { w.Timeout.Duration = defaultClientTimeout } client := &http.Client{ Transport: &http.Transport{ TLSClientConfig: tlsCfg, Proxy: http.ProxyFromEnvironment, }, Timeout: w.Timeout.Duration, } return client, nil } // Connect to warp10 func (w *Warp10) Connect() error { client, err := w.createClient() if err != nil { return err } w.client = client return nil } // GenWarp10Payload compute Warp 10 metrics payload func (w *Warp10) GenWarp10Payload(metrics []telegraf.Metric) string { collectString := make([]string, 0) for _, mm := range metrics { for _, field := range mm.FieldList() { metric := &MetricLine{ Metric: fmt.Sprintf("%s%s", w.Prefix, mm.Name()+"."+field.Key), Timestamp: mm.Time().UnixNano() / 1000, } metricValue, err := buildValue(field.Value) if err != nil { log.Printf("E! [outputs.warp10] Could not encode value: %v", err) continue } metric.Value = metricValue tagsSlice := buildTags(mm.TagList()) metric.Tags = strings.Join(tagsSlice, ",") messageLine := fmt.Sprintf("%d// %s{%s} %s\n", metric.Timestamp, metric.Metric, metric.Tags, metric.Value) collectString = append(collectString, messageLine) } } return fmt.Sprint(strings.Join(collectString, "")) } // Write metrics to Warp10 func (w *Warp10) Write(metrics []telegraf.Metric) error { payload := w.GenWarp10Payload(metrics) if payload == "" { return nil } req, err := http.NewRequest("POST", w.WarpURL+"/api/v0/update", bytes.NewBufferString(payload)) req.Header.Set("X-Warp10-Token", w.Token) req.Header.Set("Content-Type", "text/plain") resp, err := w.client.Do(req) if err != nil { return err } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { if w.PrintErrorBody { body, _ := ioutil.ReadAll(resp.Body) return fmt.Errorf(w.WarpURL + ": " + w.HandleError(string(body), w.MaxStringErrorSize)) } if len(resp.Status) < w.MaxStringErrorSize { return fmt.Errorf(w.WarpURL + ": " + resp.Status) } return fmt.Errorf(w.WarpURL + ": " + resp.Status[0:w.MaxStringErrorSize]) } return nil } func buildTags(tags []*telegraf.Tag) []string { tagsString := make([]string, len(tags)+1) indexSource := 0 for index, tag := range tags { tagsString[index] = fmt.Sprintf("%s=%s", tag.Key, tag.Value) indexSource = index } indexSource++ tagsString[indexSource] = fmt.Sprintf("source=telegraf") sort.Strings(tagsString) return tagsString } func buildValue(v interface{}) (string, error) { var retv string switch p := v.(type) { case int64: retv = intToString(p) case string: retv = fmt.Sprintf("'%s'", strings.Replace(p, "'", "\\'", -1)) case bool: retv = boolToString(p) case uint64: if p <= uint64(math.MaxInt64) { retv = strconv.FormatInt(int64(p), 10) } else { retv = strconv.FormatInt(math.MaxInt64, 10) } case float64: retv = floatToString(float64(p)) default: return "", fmt.Errorf("unsupported type: %T", v) } return retv, nil } func intToString(inputNum int64) string { return strconv.FormatInt(inputNum, 10) } func boolToString(inputBool bool) string { return strconv.FormatBool(inputBool) } func uIntToString(inputNum uint64) string { return strconv.FormatUint(inputNum, 10) } func floatToString(inputNum float64) string { return strconv.FormatFloat(inputNum, 'f', 6, 64) } // SampleConfig get config func (w *Warp10) SampleConfig() string { return sampleConfig } // Description get description func (w *Warp10) Description() string { return "Write metrics to Warp 10" } // Close close func (w *Warp10) Close() error { return nil } // Init Warp10 struct func (w *Warp10) Init() error { if w.MaxStringErrorSize <= 0 { w.MaxStringErrorSize = 511 } return nil } func init() { outputs.Add("warp10", func() telegraf.Output { return &Warp10{} }) } // HandleError read http error body and return a corresponding error func (w *Warp10) HandleError(body string, maxStringSize int) string { if body == "" { return "Empty return" } if strings.Contains(body, "Invalid token") { return "Invalid token" } if strings.Contains(body, "Write token missing") { return "Write token missing" } if strings.Contains(body, "Token Expired") { return "Token Expired" } if strings.Contains(body, "Token revoked") { return "Token revoked" } if strings.Contains(body, "exceed your Monthly Active Data Streams limit") || strings.Contains(body, "exceed the Monthly Active Data Streams limit") { return "Exceeded Monthly Active Data Streams limit" } if strings.Contains(body, "Daily Data Points limit being already exceeded") { return "Exceeded Daily Data Points limit" } if strings.Contains(body, "Application suspended or closed") { return "Application suspended or closed" } if strings.Contains(body, "broken pipe") { return "broken pipe" } if len(body) < maxStringSize { return body } return body[0:maxStringSize] }