0.3.0 outputs: opentsdb

This commit is contained in:
Cameron Sparr 2015-12-19 14:30:37 -07:00
parent 2611931f82
commit 40a3feaad0
1 changed files with 36 additions and 26 deletions

View File

@ -62,7 +62,8 @@ func (o *OpenTSDB) Write(points []*client.Point) error {
if len(points) == 0 { if len(points) == 0 {
return nil return nil
} }
var timeNow = time.Now() now := time.Now()
// Send Data with telnet / socket communication // Send Data with telnet / socket communication
uri := fmt.Sprintf("%s:%d", o.Host, o.Port) uri := fmt.Sprintf("%s:%d", o.Host, o.Port)
tcpAddr, _ := net.ResolveTCPAddr("tcp", uri) tcpAddr, _ := net.ResolveTCPAddr("tcp", uri)
@ -70,23 +71,12 @@ func (o *OpenTSDB) Write(points []*client.Point) error {
if err != nil { if err != nil {
return fmt.Errorf("OpenTSDB: Telnet connect fail") return fmt.Errorf("OpenTSDB: Telnet connect fail")
} }
defer connection.Close()
for _, pt := range points { for _, pt := range points {
metric := &MetricLine{ for _, metric := range buildMetrics(pt, now, o.Prefix) {
Metric: fmt.Sprintf("%s%s", o.Prefix, pt.Name()), messageLine := fmt.Sprintf("put %s %v %s %s\n",
Timestamp: timeNow.Unix(), metric.Metric, metric.Timestamp, metric.Value, metric.Tags)
}
metricValue, buildError := buildValue(pt)
if buildError != nil {
fmt.Printf("OpenTSDB: %s\n", buildError.Error())
continue
}
metric.Value = metricValue
tagsSlice := buildTags(pt.Tags())
metric.Tags = fmt.Sprint(strings.Join(tagsSlice, " "))
messageLine := fmt.Sprintf("put %s %v %s %s\n", metric.Metric, metric.Timestamp, metric.Value, metric.Tags)
if o.Debug { if o.Debug {
fmt.Print(messageLine) fmt.Print(messageLine)
} }
@ -95,7 +85,7 @@ func (o *OpenTSDB) Write(points []*client.Point) error {
return fmt.Errorf("OpenTSDB: Telnet writing error %s", err.Error()) return fmt.Errorf("OpenTSDB: Telnet writing error %s", err.Error())
} }
} }
defer connection.Close() }
return nil return nil
} }
@ -111,9 +101,29 @@ func buildTags(ptTags map[string]string) []string {
return tags return tags
} }
func buildValue(pt *client.Point) (string, error) { func buildMetrics(pt *client.Point, now time.Time, prefix string) []*MetricLine {
ret := []*MetricLine{}
for fieldName, value := range pt.Fields() {
metric := &MetricLine{
Metric: fmt.Sprintf("%s%s_%s", prefix, pt.Name(), fieldName),
Timestamp: now.Unix(),
}
metricValue, buildError := buildValue(value)
if buildError != nil {
fmt.Printf("OpenTSDB: %s\n", buildError.Error())
continue
}
metric.Value = metricValue
tagsSlice := buildTags(pt.Tags())
metric.Tags = fmt.Sprint(strings.Join(tagsSlice, " "))
ret = append(ret, metric)
}
return ret
}
func buildValue(v interface{}) (string, error) {
var retv string var retv string
var v = pt.Fields()["value"]
switch p := v.(type) { switch p := v.(type) {
case int64: case int64:
retv = IntToString(int64(p)) retv = IntToString(int64(p))