diff --git a/CHANGELOG.md b/CHANGELOG.md index 665add6ca..f40abf279 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ - [#1513](https://github.com/influxdata/telegraf/issues/1513): Add Ceph Cluster Performance Statistics - [#1650](https://github.com/influxdata/telegraf/issues/1650): Ability to configure response_timeout in httpjson input. - [#1685](https://github.com/influxdata/telegraf/issues/1685): Add additional redis metrics. +- [#1539](https://github.com/influxdata/telegraf/pull/1539): Added capability to send metrics through Http API for OpenTSDB. ### Bugfixes diff --git a/plugins/outputs/opentsdb/README.md b/plugins/outputs/opentsdb/README.md index 59a03d3fd..2fd0bd2d8 100644 --- a/plugins/outputs/opentsdb/README.md +++ b/plugins/outputs/opentsdb/README.md @@ -1,6 +1,12 @@ # OpenTSDB Output Plugin -This plugin writes to a OpenTSDB instance using the "telnet" mode +This plugin writes to an OpenTSDB instance using either the "telnet" or Http mode. + +Using the Http API is the recommended way of writing metrics since OpenTSDB 2.0 +To use Http mode, set useHttp to true in config. You can also control how many +metrics is sent in each http request by setting batchSize in config. + +See http://opentsdb.net/docs/build/html/api_http/put.html for details. ## Transfer "Protocol" in the telnet mode @@ -10,14 +16,14 @@ The expected input from OpenTSDB is specified in the following way: put ``` -The telegraf output plugin adds an optional prefix to the metric keys so +The telegraf output plugin adds an optional prefix to the metric keys so that a subamount can be selected. ``` put <[prefix.]metric> ``` -### Example +### Example ``` put nine.telegraf.system_load1 1441910356 0.430000 dc=homeoffice host=irimame scope=green @@ -38,12 +44,12 @@ put nine.telegraf.ping_average_response_ms 1441910366 24.006000 dc=homeoffice ho ... ``` -## +## -The OpenTSDB interface can be simulated with this reader: +The OpenTSDB telnet interface can be simulated with this reader: ``` -// opentsdb_telnet_mode_mock.go +// opentsdb_telnet_mode_mock.go package main import ( @@ -75,4 +81,4 @@ func main() { ## Allowed values for metrics -OpenTSDB allows `integers` and `floats` as input values \ No newline at end of file +OpenTSDB allows `integers` and `floats` as input values diff --git a/plugins/outputs/opentsdb/opentsdb.go b/plugins/outputs/opentsdb/opentsdb.go index 4675dfffe..d7b3eb915 100644 --- a/plugins/outputs/opentsdb/opentsdb.go +++ b/plugins/outputs/opentsdb/opentsdb.go @@ -3,10 +3,10 @@ package opentsdb import ( "fmt" "net" + "net/url" "sort" "strconv" "strings" - "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/outputs" @@ -18,6 +18,8 @@ type OpenTSDB struct { Host string Port int + HttpBatchSize int + Debug bool } @@ -28,27 +30,41 @@ var sampleConfig = ` ## prefix for metrics keys prefix = "my.specific.prefix." - ## Telnet Mode ## - ## DNS name of the OpenTSDB server in telnet mode + ## DNS name of the OpenTSDB server + ## Using "opentsdb.example.com" or "tcp://opentsdb.example.com" will use the + ## telnet API. "http://opentsdb.example.com" will use the Http API. host = "opentsdb.example.com" - ## Port of the OpenTSDB server in telnet mode + ## Port of the OpenTSDB server port = 4242 + ## Number of data points to send to OpenTSDB in Http requests. + ## Not used with telnet API. + httpBatchSize = 50 + ## Debug true - Prints OpenTSDB communication debug = false ` -type MetricLine struct { - Metric string - Timestamp int64 - Value string - Tags string +func ToLineFormat(tags map[string]string) string { + tagsArray := make([]string, len(tags)) + index := 0 + for k, v := range tags { + tagsArray[index] = fmt.Sprintf("%s=%s", k, v) + index++ + } + sort.Strings(tagsArray) + return strings.Join(tagsArray, " ") } func (o *OpenTSDB) Connect() error { // Test Connection to OpenTSDB Server - uri := fmt.Sprintf("%s:%d", o.Host, o.Port) + u, err := url.Parse(o.Host) + if err != nil { + return fmt.Errorf("Error in parsing host url: %s", err.Error()) + } + + uri := fmt.Sprintf("%s:%d", u.Host, o.Port) tcpAddr, err := net.ResolveTCPAddr("tcp", uri) if err != nil { return fmt.Errorf("OpenTSDB: TCP address cannot be resolved") @@ -65,10 +81,64 @@ func (o *OpenTSDB) Write(metrics []telegraf.Metric) error { if len(metrics) == 0 { return nil } - now := time.Now() + u, err := url.Parse(o.Host) + if err != nil { + return fmt.Errorf("Error in parsing host url: %s", err.Error()) + } + + if u.Scheme == "" || u.Scheme == "tcp" { + return o.WriteTelnet(metrics, u) + } else if u.Scheme == "http" { + return o.WriteHttp(metrics, u) + } else { + return fmt.Errorf("Unknown scheme in host parameter.") + } +} + +func (o *OpenTSDB) WriteHttp(metrics []telegraf.Metric, u *url.URL) error { + http := openTSDBHttp{ + Host: u.Host, + Port: o.Port, + BatchSize: o.HttpBatchSize, + Debug: o.Debug, + } + + for _, m := range metrics { + now := m.UnixNano() / 1000000000 + tags := cleanTags(m.Tags()) + + for fieldName, value := range m.Fields() { + metricValue, buildError := buildValue(value) + if buildError != nil { + fmt.Printf("OpenTSDB: %s\n", buildError.Error()) + continue + } + + metric := &HttpMetric{ + Metric: sanitizedChars.Replace(fmt.Sprintf("%s%s_%s", + o.Prefix, m.Name(), fieldName)), + Tags: tags, + Timestamp: now, + Value: metricValue, + } + + if err := http.sendDataPoint(metric); err != nil { + return err + } + } + } + + if err := http.flush(); err != nil { + return err + } + + return nil +} + +func (o *OpenTSDB) WriteTelnet(metrics []telegraf.Metric, u *url.URL) error { // Send Data with telnet / socket communication - uri := fmt.Sprintf("%s:%d", o.Host, o.Port) + uri := fmt.Sprintf("%s:%d", u.Host, o.Port) tcpAddr, _ := net.ResolveTCPAddr("tcp", uri) connection, err := net.DialTCP("tcp", nil, tcpAddr) if err != nil { @@ -77,9 +147,20 @@ func (o *OpenTSDB) Write(metrics []telegraf.Metric) error { defer connection.Close() for _, m := range metrics { - for _, metric := range buildMetrics(m, now, o.Prefix) { + now := m.UnixNano() / 1000000000 + tags := ToLineFormat(cleanTags(m.Tags())) + + for fieldName, value := range m.Fields() { + metricValue, buildError := buildValue(value) + if buildError != nil { + fmt.Printf("OpenTSDB: %s\n", buildError.Error()) + continue + } + messageLine := fmt.Sprintf("put %s %v %s %s\n", - metric.Metric, metric.Timestamp, metric.Value, metric.Tags) + sanitizedChars.Replace(fmt.Sprintf("%s%s_%s", o.Prefix, m.Name(), fieldName)), + now, metricValue, tags) + if o.Debug { fmt.Print(messageLine) } @@ -93,37 +174,12 @@ func (o *OpenTSDB) Write(metrics []telegraf.Metric) error { return nil } -func buildTags(mTags map[string]string) []string { - tags := make([]string, len(mTags)) - index := 0 - for k, v := range mTags { - tags[index] = sanitizedChars.Replace(fmt.Sprintf("%s=%s", k, v)) - index++ +func cleanTags(tags map[string]string) map[string]string { + tagSet := make(map[string]string, len(tags)) + for k, v := range tags { + tagSet[sanitizedChars.Replace(k)] = sanitizedChars.Replace(v) } - sort.Strings(tags) - return tags -} - -func buildMetrics(m telegraf.Metric, now time.Time, prefix string) []*MetricLine { - ret := []*MetricLine{} - for fieldName, value := range m.Fields() { - metric := &MetricLine{ - Metric: sanitizedChars.Replace(fmt.Sprintf("%s%s_%s", - prefix, m.Name(), fieldName)), - Timestamp: now.Unix(), - } - - metricValue, buildError := buildValue(value) - if buildError != nil { - fmt.Printf("OpenTSDB: %s\n", buildError.Error()) - continue - } - metric.Value = metricValue - tagsSlice := buildTags(m.Tags()) - metric.Tags = fmt.Sprint(strings.Join(tagsSlice, " ")) - ret = append(ret, metric) - } - return ret + return tagSet } func buildValue(v interface{}) (string, error) { diff --git a/plugins/outputs/opentsdb/opentsdb_http.go b/plugins/outputs/opentsdb/opentsdb_http.go new file mode 100644 index 000000000..27e4afdda --- /dev/null +++ b/plugins/outputs/opentsdb/opentsdb_http.go @@ -0,0 +1,174 @@ +package opentsdb + +import ( + "bytes" + "compress/gzip" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "log" + "net/http" + "net/http/httputil" + "net/url" +) + +type HttpMetric struct { + Metric string `json:"metric"` + Timestamp int64 `json:"timestamp"` + Value string `json:"value"` + Tags map[string]string `json:"tags"` +} + +type openTSDBHttp struct { + Host string + Port int + BatchSize int + Debug bool + + metricCounter int + body requestBody +} + +type requestBody struct { + b bytes.Buffer + g *gzip.Writer + + dbgB bytes.Buffer + + w io.Writer + enc *json.Encoder + + empty bool +} + +func (r *requestBody) reset(debug bool) { + r.b.Reset() + r.dbgB.Reset() + + if r.g == nil { + r.g = gzip.NewWriter(&r.b) + } else { + r.g.Reset(&r.b) + } + + if debug { + r.w = io.MultiWriter(r.g, &r.dbgB) + } else { + r.w = r.g + } + + r.enc = json.NewEncoder(r.w) + + io.WriteString(r.w, "[") + + r.empty = true +} + +func (r *requestBody) addMetric(metric *HttpMetric) error { + if !r.empty { + io.WriteString(r.w, ",") + } + + if err := r.enc.Encode(metric); err != nil { + return fmt.Errorf("Metric serialization error %s", err.Error()) + } + + r.empty = false + + return nil +} + +func (r *requestBody) close() error { + io.WriteString(r.w, "]") + + if err := r.g.Close(); err != nil { + return fmt.Errorf("Error when closing gzip writer: %s", err.Error()) + } + + return nil +} + +func (o *openTSDBHttp) sendDataPoint(metric *HttpMetric) error { + if o.metricCounter == 0 { + o.body.reset(o.Debug) + } + + if err := o.body.addMetric(metric); err != nil { + return err + } + + o.metricCounter++ + if o.metricCounter == o.BatchSize { + if err := o.flush(); err != nil { + return err + } + + o.metricCounter = 0 + } + + return nil +} + +func (o *openTSDBHttp) flush() error { + if o.metricCounter == 0 { + return nil + } + + o.body.close() + + u := url.URL{ + Scheme: "http", + Host: fmt.Sprintf("%s:%d", o.Host, o.Port), + Path: "/api/put", + } + + if o.Debug { + u.RawQuery = "details" + } + + req, err := http.NewRequest("POST", u.String(), &o.body.b) + if err != nil { + return fmt.Errorf("Error when building request: %s", err.Error()) + } + req.Header.Set("Content-Type", "applicaton/json") + req.Header.Set("Content-Encoding", "gzip") + + if o.Debug { + dump, err := httputil.DumpRequestOut(req, false) + if err != nil { + return fmt.Errorf("Error when dumping request: %s", err.Error()) + } + + fmt.Printf("Sending metrics:\n%s", dump) + fmt.Printf("Body:\n%s\n\n", o.body.dbgB.String()) + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return fmt.Errorf("Error when sending metrics: %s", err.Error()) + } + defer resp.Body.Close() + + if o.Debug { + dump, err := httputil.DumpResponse(resp, true) + if err != nil { + return fmt.Errorf("Error when dumping response: %s", err.Error()) + } + + fmt.Printf("Received response\n%s\n\n", dump) + } else { + // Important so http client reuse connection for next request if need be. + io.Copy(ioutil.Discard, resp.Body) + } + + if resp.StatusCode/100 != 2 { + if resp.StatusCode/100 == 4 { + log.Printf("WARNING: Received %d status code. Dropping metrics to avoid overflowing buffer.", resp.StatusCode) + } else { + return fmt.Errorf("Error when sending metrics.Received status %d", resp.StatusCode) + } + } + + return nil +} diff --git a/plugins/outputs/opentsdb/opentsdb_test.go b/plugins/outputs/opentsdb/opentsdb_test.go index 6c141d463..669ab5303 100644 --- a/plugins/outputs/opentsdb/opentsdb_test.go +++ b/plugins/outputs/opentsdb/opentsdb_test.go @@ -1,46 +1,119 @@ package opentsdb import ( + "fmt" + "net" + "net/http" + "net/http/httptest" + "net/url" "reflect" + "strconv" "testing" - // "github.com/influxdata/telegraf/testutil" - // "github.com/stretchr/testify/require" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/testutil" + //"github.com/stretchr/testify/require" ) +func TestCleanTags(t *testing.T) { + var tagtests = []struct { + ptIn map[string]string + outTags map[string]string + }{ + { + map[string]string{"one": "two", "three": "four"}, + map[string]string{"one": "two", "three": "four"}, + }, + { + map[string]string{"aaa": "bbb"}, + map[string]string{"aaa": "bbb"}, + }, + { + map[string]string{"Sp%ci@l Chars": "g$t repl#ced"}, + map[string]string{"Sp-ci-l_Chars": "g-t_repl-ced"}, + }, + { + map[string]string{}, + map[string]string{}, + }, + } + for _, tt := range tagtests { + tags := cleanTags(tt.ptIn) + if !reflect.DeepEqual(tags, tt.outTags) { + t.Errorf("\nexpected %+v\ngot %+v\n", tt.outTags, tags) + } + } +} + func TestBuildTagsTelnet(t *testing.T) { var tagtests = []struct { ptIn map[string]string - outTags []string + outTags string }{ { map[string]string{"one": "two", "three": "four"}, - []string{"one=two", "three=four"}, + "one=two three=four", }, { map[string]string{"aaa": "bbb"}, - []string{"aaa=bbb"}, + "aaa=bbb", }, { map[string]string{"one": "two", "aaa": "bbb"}, - []string{"aaa=bbb", "one=two"}, - }, - { - map[string]string{"Sp%ci@l Chars": "g$t repl#ced"}, - []string{"Sp-ci-l_Chars=g-t_repl-ced"}, + "aaa=bbb one=two", }, { map[string]string{}, - []string{}, + "", }, } for _, tt := range tagtests { - tags := buildTags(tt.ptIn) + tags := ToLineFormat(tt.ptIn) if !reflect.DeepEqual(tags, tt.outTags) { t.Errorf("\nexpected %+v\ngot %+v\n", tt.outTags, tags) } } } +func BenchmarkHttpSend(b *testing.B) { + const BatchSize = 50 + const MetricsCount = 4 * BatchSize + metrics := make([]telegraf.Metric, MetricsCount) + for i := 0; i < MetricsCount; i++ { + metrics[i] = testutil.TestMetric(1.0) + } + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + fmt.Fprintln(w, "{}") + })) + defer ts.Close() + + u, err := url.Parse(ts.URL) + if err != nil { + panic(err) + } + + _, p, _ := net.SplitHostPort(u.Host) + + port, err := strconv.Atoi(p) + if err != nil { + panic(err) + } + + o := &OpenTSDB{ + Host: ts.URL, + Port: port, + Prefix: "", + HttpBatchSize: BatchSize, + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + o.Write(metrics) + } +} + // func TestWrite(t *testing.T) { // if testing.Short() { // t.Skip("Skipping integration test in short mode")