0.3.0 outputs: opentsdb
This commit is contained in:
parent
a73b5257dc
commit
1e76e36df2
|
@ -62,7 +62,8 @@ func (o *OpenTSDB) Write(points []*client.Point) error {
|
|||
if len(points) == 0 {
|
||||
return nil
|
||||
}
|
||||
var timeNow = time.Now()
|
||||
now := time.Now()
|
||||
|
||||
// Send Data with telnet / socket communication
|
||||
uri := fmt.Sprintf("%s:%d", o.Host, o.Port)
|
||||
tcpAddr, _ := net.ResolveTCPAddr("tcp", uri)
|
||||
|
@ -70,32 +71,21 @@ func (o *OpenTSDB) Write(points []*client.Point) error {
|
|||
if err != nil {
|
||||
return fmt.Errorf("OpenTSDB: Telnet connect fail")
|
||||
}
|
||||
defer connection.Close()
|
||||
|
||||
for _, pt := range points {
|
||||
metric := &MetricLine{
|
||||
Metric: fmt.Sprintf("%s%s", o.Prefix, pt.Name()),
|
||||
Timestamp: timeNow.Unix(),
|
||||
}
|
||||
|
||||
metricValue, buildError := buildValue(pt)
|
||||
if buildError != nil {
|
||||
fmt.Printf("OpenTSDB: %s\n", buildError.Error())
|
||||
continue
|
||||
}
|
||||
metric.Value = metricValue
|
||||
|
||||
tagsSlice := buildTags(pt.Tags())
|
||||
metric.Tags = fmt.Sprint(strings.Join(tagsSlice, " "))
|
||||
|
||||
messageLine := fmt.Sprintf("put %s %v %s %s\n", metric.Metric, metric.Timestamp, metric.Value, metric.Tags)
|
||||
if o.Debug {
|
||||
fmt.Print(messageLine)
|
||||
}
|
||||
_, err := connection.Write([]byte(messageLine))
|
||||
if err != nil {
|
||||
return fmt.Errorf("OpenTSDB: Telnet writing error %s", err.Error())
|
||||
for _, metric := range buildMetrics(pt, now, o.Prefix) {
|
||||
messageLine := fmt.Sprintf("put %s %v %s %s\n",
|
||||
metric.Metric, metric.Timestamp, metric.Value, metric.Tags)
|
||||
if o.Debug {
|
||||
fmt.Print(messageLine)
|
||||
}
|
||||
_, err := connection.Write([]byte(messageLine))
|
||||
if err != nil {
|
||||
return fmt.Errorf("OpenTSDB: Telnet writing error %s", err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
defer connection.Close()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -111,9 +101,29 @@ func buildTags(ptTags map[string]string) []string {
|
|||
return tags
|
||||
}
|
||||
|
||||
func buildValue(pt *client.Point) (string, error) {
|
||||
func buildMetrics(pt *client.Point, now time.Time, prefix string) []*MetricLine {
|
||||
ret := []*MetricLine{}
|
||||
for fieldName, value := range pt.Fields() {
|
||||
metric := &MetricLine{
|
||||
Metric: fmt.Sprintf("%s%s_%s", prefix, pt.Name(), fieldName),
|
||||
Timestamp: now.Unix(),
|
||||
}
|
||||
|
||||
metricValue, buildError := buildValue(value)
|
||||
if buildError != nil {
|
||||
fmt.Printf("OpenTSDB: %s\n", buildError.Error())
|
||||
continue
|
||||
}
|
||||
metric.Value = metricValue
|
||||
tagsSlice := buildTags(pt.Tags())
|
||||
metric.Tags = fmt.Sprint(strings.Join(tagsSlice, " "))
|
||||
ret = append(ret, metric)
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func buildValue(v interface{}) (string, error) {
|
||||
var retv string
|
||||
var v = pt.Fields()["value"]
|
||||
switch p := v.(type) {
|
||||
case int64:
|
||||
retv = IntToString(int64(p))
|
||||
|
|
Loading…
Reference in New Issue