From 93d44e1a6af1a0e838ca15ffafe434a1da3ed4ca Mon Sep 17 00:00:00 2001 From: Eric Date: Wed, 20 Jul 2016 12:51:47 -0400 Subject: [PATCH] Refactored code to separate http handling from opentsdb module. Added batching support. --- plugins/outputs/opentsdb/opentsdb.go | 57 ++----- plugins/outputs/opentsdb/opentsdb_http.go | 179 ++++++++++++++++++++++ 2 files changed, 195 insertions(+), 41 deletions(-) create mode 100644 plugins/outputs/opentsdb/opentsdb_http.go diff --git a/plugins/outputs/opentsdb/opentsdb.go b/plugins/outputs/opentsdb/opentsdb.go index c0eea6bdc..42d6c0f3e 100644 --- a/plugins/outputs/opentsdb/opentsdb.go +++ b/plugins/outputs/opentsdb/opentsdb.go @@ -7,11 +7,6 @@ import ( "strconv" "strings" "time" - "encoding/json" - //"os" - "io" - "net/http" - "bytes" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/outputs" @@ -24,6 +19,7 @@ type OpenTSDB struct { Port int UseHttp bool + BatchSize int Debug bool } @@ -45,6 +41,10 @@ var sampleConfig = ` ## Use Http PUT API useHttp = false + ## Number of data points to send to OpenTSDB in Http requests. + ## Not used when useHttp is false. + batchSize = 50 + ## Debug true - Prints OpenTSDB communication debug = false ` @@ -56,13 +56,6 @@ type MetricLine struct { Tags string } -type HttpMetric struct { - Metric string `json:"metric"` - Timestamp int64 `json:"timestamp"` - Value string `json:"value"` - Tags map[string]string `json:"tags"` -} - func (o *OpenTSDB) Connect() error { // Test Connection to OpenTSDB Server uri := fmt.Sprintf("%s:%d", o.Host, o.Port) @@ -91,16 +84,12 @@ func (o *OpenTSDB) Write(metrics []telegraf.Metric) error { } func (o *OpenTSDB) WriteHttp(metrics []telegraf.Metric) error { - - b := new(bytes.Buffer) - - enc := json.NewEncoder(b) - - - - io.WriteString(b, "[") - - i := 0 + http := openTSDBHttp{ + Host: o.Host, + Port: o.Port, + BatchSize: o.BatchSize, + Debug: o.Debug, + } for _, m := range metrics { now := m.UnixNano() / 1000000000 @@ -111,10 +100,6 @@ func (o *OpenTSDB) WriteHttp(metrics []telegraf.Metric) error { continue } - if i > 0 { - io.WriteString(b, ",") - } - metric := &HttpMetric{ Metric: sanitizedChars.Replace(fmt.Sprintf("%s%s_%s", o.Prefix, m.Name(), fieldName)), @@ -123,24 +108,14 @@ func (o *OpenTSDB) WriteHttp(metrics []telegraf.Metric) error { Value: metricValue, } - if err := enc.Encode(metric); err != nil { - return fmt.Errorf("Metric serialization error %s", err.Error()) - } - - i++; + if err:= http.sendDataPoint(metric); err != nil { + return err + } } } - io.WriteString(b, "]") - - uri := fmt.Sprintf("http://%s:%d/api/put", o.Host, o.Port) - resp, err := http.Post(uri, "applicaton/json", b) - if err != nil { - return fmt.Errorf("Error when sending metrics: %s", err.Error()) - } - - if resp.StatusCode == 200 { - fmt.Println("Sent metrics !") + if err:= http.flush(); err != nil { + return err } return nil diff --git a/plugins/outputs/opentsdb/opentsdb_http.go b/plugins/outputs/opentsdb/opentsdb_http.go new file mode 100644 index 000000000..8556a4000 --- /dev/null +++ b/plugins/outputs/opentsdb/opentsdb_http.go @@ -0,0 +1,179 @@ +package opentsdb + +import ( + "fmt" + "encoding/json" + //"os" + "io" + //"io/ioutil" + "net/http" + "net/http/httputil" + "net/url" + "bytes" + "compress/gzip" + "log" +) + +type HttpMetric struct { + Metric string `json:"metric"` + Timestamp int64 `json:"timestamp"` + Value string `json:"value"` + Tags map[string]string `json:"tags"` +} + + +type openTSDBHttp struct { + Host string + Port int + BatchSize int + Debug bool + + metricCounter int + body requestBody +} + +type requestBody struct { + b bytes.Buffer + g *gzip.Writer + + dbgB bytes.Buffer + + w io.Writer + enc *json.Encoder + + empty bool +} + +func (r *requestBody) reset(debug bool) { + r.b.Reset() + r.dbgB.Reset() + + if r.g == nil { + r.g = gzip.NewWriter(&r.b) + } else { + r.g.Reset(&r.b) + } + + if debug { + r.w = io.MultiWriter(r.g, &r.dbgB) + } else { + r.w = r.g + } + + r.enc = json.NewEncoder(r.w) + + io.WriteString(r.w, "[") + + r.empty = true +} + +func (r *requestBody) addMetric(metric *HttpMetric) error { + if !r.empty { + io.WriteString(r.w, ",") + } + + if err := r.enc.Encode(metric); err != nil { + return fmt.Errorf("Metric serialization error %s", err.Error()) + } + + r.empty = false + + return nil +} + +func (r *requestBody) close() error { + io.WriteString(r.w, "]") + + if err := r.g.Close(); err != nil { + return fmt.Errorf("Error when closing gzip writer: %s", err.Error()) + } + + return nil +} + +func (o *openTSDBHttp) startNewRequest() error { + o.body.reset(o.Debug) + + return nil +} + +func (o *openTSDBHttp) sendDataPoint(metric *HttpMetric) error { + if o.metricCounter == 0 { + o.startNewRequest() + } + + if err := o.body.addMetric(metric); err != nil { + return err + } + + o.metricCounter++ + if o.metricCounter == o.BatchSize { + if err := o.flush(); err != nil { + return err + } + + o.metricCounter = 0 + } + + return nil +} + +func (o *openTSDBHttp) flush() error { + if o.metricCounter == 0 { + return nil + } + + o.body.close() + + u := url.URL { + Scheme: "http", + Host: fmt.Sprintf("%s:%d", o.Host, o.Port), + Path: "/api/put", + } + + if (o.Debug) { + u.RawQuery = "details" + } + + req, err := http.NewRequest("POST", u.String(), &o.body.b) + if err != nil { + return fmt.Errorf("Error when building request: %s", err.Error()) + } + req.Header.Add("Content-Type", "applicaton/json") + req.Header.Set("Content-Encoding", "gzip") + + if (o.Debug) { + dump, err := httputil.DumpRequestOut(req, false) + if err != nil { + return fmt.Errorf("Error when dumping request: %s", err.Error()) + } + + fmt.Printf("Sending metrics:\n%s", dump) + fmt.Printf("Body:\n%s\n\n", o.body.dbgB.String()) + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return fmt.Errorf("Error when sending metrics: %s", err.Error()) + } + defer resp.Body.Close() + + if o.Debug { + dump, err := httputil.DumpResponse(resp, true) + if err != nil { + return fmt.Errorf("Error when dumping response: %s", err.Error()) + } + + fmt.Printf("Received response\n%s\n\n", dump) + } + + if resp.StatusCode/100 != 2 { + if resp.StatusCode/100 == 4 { + log.Printf("WARNING: Received %d status code. Dropping metrics to avoid overflowing buffer.", resp.StatusCode) + } else { + return fmt.Errorf("Error when sending metrics.Received status %d", resp.StatusCode) + } + } + + return nil +}