diff --git a/plugins/outputs/opentsdb/opentsdb.go b/plugins/outputs/opentsdb/opentsdb.go index 42d6c0f3e..a0039de27 100644 --- a/plugins/outputs/opentsdb/opentsdb.go +++ b/plugins/outputs/opentsdb/opentsdb.go @@ -3,10 +3,8 @@ package opentsdb import ( "fmt" "net" - "sort" "strconv" "strings" - "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/outputs" @@ -48,12 +46,22 @@ var sampleConfig = ` ## Debug true - Prints OpenTSDB communication debug = false ` +type TagSet map[string]string + +func (t TagSet) ToLineFormat() string { + var line string + for k, v := range t { + line += fmt.Sprintf(" %s=%s", k, v) + } + + return strings.TrimLeft(line, " ") +} type MetricLine struct { Metric string Timestamp int64 Value string - Tags string + Tags TagSet } func (o *OpenTSDB) Connect() error { @@ -93,6 +101,8 @@ func (o *OpenTSDB) WriteHttp(metrics []telegraf.Metric) error { for _, m := range metrics { now := m.UnixNano() / 1000000000 + tags := cleanTags(m.Tags()) + for fieldName, value := range m.Fields() { metricValue, buildError := buildValue(value) if buildError != nil { @@ -101,11 +111,11 @@ func (o *OpenTSDB) WriteHttp(metrics []telegraf.Metric) error { } metric := &HttpMetric{ - Metric: sanitizedChars.Replace(fmt.Sprintf("%s%s_%s", - o.Prefix, m.Name(), fieldName)), - Tags: m.Tags(), - Timestamp: now, - Value: metricValue, + Metric: sanitizedChars.Replace(fmt.Sprintf("%s%s_%s", + o.Prefix, m.Name(), fieldName)), + Tags: tags, + Timestamp: now, + Value: metricValue, } if err:= http.sendDataPoint(metric); err != nil { @@ -122,8 +132,6 @@ func (o *OpenTSDB) WriteHttp(metrics []telegraf.Metric) error { } func (o *OpenTSDB) WriteTelnet(metrics []telegraf.Metric) error { - now := time.Now() - // Send Data with telnet / socket communication uri := fmt.Sprintf("%s:%d", o.Host, o.Port) tcpAddr, _ := net.ResolveTCPAddr("tcp", uri) @@ -134,9 +142,20 @@ func (o *OpenTSDB) WriteTelnet(metrics []telegraf.Metric) error { defer connection.Close() for _, m := range metrics { - for _, metric := range buildMetrics(m, now, o.Prefix) { + now := m.UnixNano() / 1000000000 + tags := cleanTags(m.Tags()).ToLineFormat() + + for fieldName, value := range m.Fields() { + metricValue, buildError := buildValue(value) + if buildError != nil { + fmt.Printf("OpenTSDB: %s\n", buildError.Error()) + continue + } + messageLine := fmt.Sprintf("put %s %v %s %s\n", - metric.Metric, metric.Timestamp, metric.Value, metric.Tags) + sanitizedChars.Replace(fmt.Sprintf("%s%s_%s",o.Prefix, m.Name(), fieldName)), + now, metricValue, tags) + if o.Debug { fmt.Print(messageLine) } @@ -150,37 +169,12 @@ func (o *OpenTSDB) WriteTelnet(metrics []telegraf.Metric) error { return nil } -func buildTags(mTags map[string]string) []string { - tags := make([]string, len(mTags)) - index := 0 - for k, v := range mTags { - tags[index] = sanitizedChars.Replace(fmt.Sprintf("%s=%s", k, v)) - index++ +func cleanTags(tags map[string]string) TagSet { + tagSet := make(map[string]string, len(tags)) + for k, v := range tags { + tagSet[sanitizedChars.Replace(k)] = sanitizedChars.Replace(v) } - sort.Strings(tags) - return tags -} - -func buildMetrics(m telegraf.Metric, now time.Time, prefix string) []*MetricLine { - ret := []*MetricLine{} - for fieldName, value := range m.Fields() { - metric := &MetricLine{ - Metric: sanitizedChars.Replace(fmt.Sprintf("%s%s_%s", - prefix, m.Name(), fieldName)), - Timestamp: now.Unix(), - } - - metricValue, buildError := buildValue(value) - if buildError != nil { - fmt.Printf("OpenTSDB: %s\n", buildError.Error()) - continue - } - metric.Value = metricValue - tagsSlice := buildTags(m.Tags()) - metric.Tags = fmt.Sprint(strings.Join(tagsSlice, " ")) - ret = append(ret, metric) - } - return ret + return tagSet } func buildValue(v interface{}) (string, error) {