diff --git a/plugins/outputs/opentsdb/opentsdb.go b/plugins/outputs/opentsdb/opentsdb.go index 4675dfffe..c0eea6bdc 100644 --- a/plugins/outputs/opentsdb/opentsdb.go +++ b/plugins/outputs/opentsdb/opentsdb.go @@ -7,6 +7,11 @@ import ( "strconv" "strings" "time" + "encoding/json" + //"os" + "io" + "net/http" + "bytes" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/outputs" @@ -18,6 +23,8 @@ type OpenTSDB struct { Host string Port int + UseHttp bool + Debug bool } @@ -29,12 +36,15 @@ var sampleConfig = ` prefix = "my.specific.prefix." ## Telnet Mode ## - ## DNS name of the OpenTSDB server in telnet mode + ## DNS name of the OpenTSDB server host = "opentsdb.example.com" ## Port of the OpenTSDB server in telnet mode port = 4242 + ## Use Http PUT API + useHttp = false + ## Debug true - Prints OpenTSDB communication debug = false ` @@ -46,6 +56,13 @@ type MetricLine struct { Tags string } +type HttpMetric struct { + Metric string `json:"metric"` + Timestamp int64 `json:"timestamp"` + Value string `json:"value"` + Tags map[string]string `json:"tags"` +} + func (o *OpenTSDB) Connect() error { // Test Connection to OpenTSDB Server uri := fmt.Sprintf("%s:%d", o.Host, o.Port) @@ -65,6 +82,71 @@ func (o *OpenTSDB) Write(metrics []telegraf.Metric) error { if len(metrics) == 0 { return nil } + + if o.UseHttp { + return o.WriteHttp(metrics) + } else { + return o.WriteTelnet(metrics) + } +} + +func (o *OpenTSDB) WriteHttp(metrics []telegraf.Metric) error { + + b := new(bytes.Buffer) + + enc := json.NewEncoder(b) + + + + io.WriteString(b, "[") + + i := 0 + + for _, m := range metrics { + now := m.UnixNano() / 1000000000 + for fieldName, value := range m.Fields() { + metricValue, buildError := buildValue(value) + if buildError != nil { + fmt.Printf("OpenTSDB: %s\n", buildError.Error()) + continue + } + + if i > 0 { + io.WriteString(b, ",") + } + + metric := &HttpMetric{ + Metric: sanitizedChars.Replace(fmt.Sprintf("%s%s_%s", + o.Prefix, m.Name(), fieldName)), + Tags: m.Tags(), + Timestamp: now, + Value: metricValue, + } + + if err := enc.Encode(metric); err != nil { + return fmt.Errorf("Metric serialization error %s", err.Error()) + } + + i++; + } + } + + io.WriteString(b, "]") + + uri := fmt.Sprintf("http://%s:%d/api/put", o.Host, o.Port) + resp, err := http.Post(uri, "applicaton/json", b) + if err != nil { + return fmt.Errorf("Error when sending metrics: %s", err.Error()) + } + + if resp.StatusCode == 200 { + fmt.Println("Sent metrics !") + } + + return nil +} + +func (o *OpenTSDB) WriteTelnet(metrics []telegraf.Metric) error { now := time.Now() // Send Data with telnet / socket communication