diff --git a/plugins/outputs/opentsdb/opentsdb.go b/plugins/outputs/opentsdb/opentsdb.go index 1d0a38e9b..bb6d150e3 100644 --- a/plugins/outputs/opentsdb/opentsdb.go +++ b/plugins/outputs/opentsdb/opentsdb.go @@ -17,7 +17,7 @@ type OpenTSDB struct { Host string Port int - UseHttp bool + UseHttp bool BatchSize int Debug bool @@ -47,6 +47,7 @@ var sampleConfig = ` ## Debug true - Prints OpenTSDB communication debug = false ` + type TagSet map[string]string func (t TagSet) ToLineFormat() string { @@ -89,10 +90,10 @@ func (o *OpenTSDB) Write(metrics []telegraf.Metric) error { func (o *OpenTSDB) WriteHttp(metrics []telegraf.Metric) error { http := openTSDBHttp{ - Host: o.Host, - Port: o.Port, + Host: o.Host, + Port: o.Port, BatchSize: o.BatchSize, - Debug: o.Debug, + Debug: o.Debug, } for _, m := range metrics { @@ -106,21 +107,21 @@ func (o *OpenTSDB) WriteHttp(metrics []telegraf.Metric) error { continue } - metric := &HttpMetric{ - Metric: sanitizedChars.Replace(fmt.Sprintf("%s%s_%s", - o.Prefix, m.Name(), fieldName)), - Tags: tags, + metric := &HttpMetric{ + Metric: sanitizedChars.Replace(fmt.Sprintf("%s%s_%s", + o.Prefix, m.Name(), fieldName)), + Tags: tags, Timestamp: now, - Value: metricValue, - } + Value: metricValue, + } - if err:= http.sendDataPoint(metric); err != nil { + if err := http.sendDataPoint(metric); err != nil { return err } } } - if err:= http.flush(); err != nil { + if err := http.flush(); err != nil { return err } @@ -149,7 +150,7 @@ func (o *OpenTSDB) WriteTelnet(metrics []telegraf.Metric) error { } messageLine := fmt.Sprintf("put %s %v %s %s\n", - sanitizedChars.Replace(fmt.Sprintf("%s%s_%s",o.Prefix, m.Name(), fieldName)), + sanitizedChars.Replace(fmt.Sprintf("%s%s_%s", o.Prefix, m.Name(), fieldName)), now, metricValue, tags) if o.Debug { diff --git a/plugins/outputs/opentsdb/opentsdb_http.go b/plugins/outputs/opentsdb/opentsdb_http.go index 0f391abcf..27e4afdda 100644 --- a/plugins/outputs/opentsdb/opentsdb_http.go +++ b/plugins/outputs/opentsdb/opentsdb_http.go @@ -1,130 +1,129 @@ package opentsdb import ( - "fmt" - "encoding/json" + "bytes" + "compress/gzip" + "encoding/json" + "fmt" "io" "io/ioutil" + "log" "net/http" "net/http/httputil" "net/url" - "bytes" - "compress/gzip" - "log" ) type HttpMetric struct { - Metric string `json:"metric"` - Timestamp int64 `json:"timestamp"` - Value string `json:"value"` + Metric string `json:"metric"` + Timestamp int64 `json:"timestamp"` + Value string `json:"value"` Tags map[string]string `json:"tags"` } - type openTSDBHttp struct { - Host string - Port int - BatchSize int - Debug bool + Host string + Port int + BatchSize int + Debug bool - metricCounter int - body requestBody + metricCounter int + body requestBody } type requestBody struct { - b bytes.Buffer - g *gzip.Writer + b bytes.Buffer + g *gzip.Writer - dbgB bytes.Buffer + dbgB bytes.Buffer - w io.Writer - enc *json.Encoder + w io.Writer + enc *json.Encoder - empty bool + empty bool } func (r *requestBody) reset(debug bool) { - r.b.Reset() - r.dbgB.Reset() + r.b.Reset() + r.dbgB.Reset() - if r.g == nil { - r.g = gzip.NewWriter(&r.b) - } else { - r.g.Reset(&r.b) - } + if r.g == nil { + r.g = gzip.NewWriter(&r.b) + } else { + r.g.Reset(&r.b) + } - if debug { - r.w = io.MultiWriter(r.g, &r.dbgB) - } else { - r.w = r.g - } + if debug { + r.w = io.MultiWriter(r.g, &r.dbgB) + } else { + r.w = r.g + } - r.enc = json.NewEncoder(r.w) + r.enc = json.NewEncoder(r.w) - io.WriteString(r.w, "[") + io.WriteString(r.w, "[") - r.empty = true + r.empty = true } func (r *requestBody) addMetric(metric *HttpMetric) error { - if !r.empty { - io.WriteString(r.w, ",") - } + if !r.empty { + io.WriteString(r.w, ",") + } - if err := r.enc.Encode(metric); err != nil { - return fmt.Errorf("Metric serialization error %s", err.Error()) - } + if err := r.enc.Encode(metric); err != nil { + return fmt.Errorf("Metric serialization error %s", err.Error()) + } - r.empty = false + r.empty = false - return nil + return nil } func (r *requestBody) close() error { - io.WriteString(r.w, "]") + io.WriteString(r.w, "]") - if err := r.g.Close(); err != nil { - return fmt.Errorf("Error when closing gzip writer: %s", err.Error()) - } + if err := r.g.Close(); err != nil { + return fmt.Errorf("Error when closing gzip writer: %s", err.Error()) + } - return nil + return nil } func (o *openTSDBHttp) sendDataPoint(metric *HttpMetric) error { - if o.metricCounter == 0 { - o.body.reset(o.Debug) - } + if o.metricCounter == 0 { + o.body.reset(o.Debug) + } - if err := o.body.addMetric(metric); err != nil { - return err - } + if err := o.body.addMetric(metric); err != nil { + return err + } - o.metricCounter++ - if o.metricCounter == o.BatchSize { - if err := o.flush(); err != nil { - return err - } + o.metricCounter++ + if o.metricCounter == o.BatchSize { + if err := o.flush(); err != nil { + return err + } - o.metricCounter = 0 - } + o.metricCounter = 0 + } - return nil + return nil } func (o *openTSDBHttp) flush() error { - if o.metricCounter == 0 { - return nil - } + if o.metricCounter == 0 { + return nil + } - o.body.close() + o.body.close() - u := url.URL { + u := url.URL{ Scheme: "http", Host: fmt.Sprintf("%s:%d", o.Host, o.Port), Path: "/api/put", } - if (o.Debug) { + if o.Debug { u.RawQuery = "details" } @@ -135,7 +134,7 @@ func (o *openTSDBHttp) flush() error { req.Header.Set("Content-Type", "applicaton/json") req.Header.Set("Content-Encoding", "gzip") - if (o.Debug) { + if o.Debug { dump, err := httputil.DumpRequestOut(req, false) if err != nil { return fmt.Errorf("Error when dumping request: %s", err.Error()) @@ -145,23 +144,23 @@ func (o *openTSDBHttp) flush() error { fmt.Printf("Body:\n%s\n\n", o.body.dbgB.String()) } - resp, err := http.DefaultClient.Do(req) + resp, err := http.DefaultClient.Do(req) if err != nil { return fmt.Errorf("Error when sending metrics: %s", err.Error()) } defer resp.Body.Close() - if o.Debug { + if o.Debug { dump, err := httputil.DumpResponse(resp, true) if err != nil { return fmt.Errorf("Error when dumping response: %s", err.Error()) - } + } fmt.Printf("Received response\n%s\n\n", dump) } else { - // Important so http client reuse connection for next request if need be. - io.Copy(ioutil.Discard, resp.Body) - } + // Important so http client reuse connection for next request if need be. + io.Copy(ioutil.Discard, resp.Body) + } if resp.StatusCode/100 != 2 { if resp.StatusCode/100 == 4 { diff --git a/plugins/outputs/opentsdb/opentsdb_test.go b/plugins/outputs/opentsdb/opentsdb_test.go index 700cec007..2871b7bdb 100644 --- a/plugins/outputs/opentsdb/opentsdb_test.go +++ b/plugins/outputs/opentsdb/opentsdb_test.go @@ -1,14 +1,14 @@ package opentsdb import ( - "reflect" - "testing" "fmt" "net" "net/http" "net/http/httptest" "net/url" + "reflect" "strconv" + "testing" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/testutil" @@ -77,18 +77,17 @@ func TestBuildTagsTelnet(t *testing.T) { func BenchmarkHttpSend(b *testing.B) { const BatchSize = 50 - const MetricsCount = 4*BatchSize + const MetricsCount = 4 * BatchSize metrics := make([]telegraf.Metric, MetricsCount) - for i:=0; i