0.3.0 outputs: opentsdb
This commit is contained in:
parent
bdbba81d46
commit
11254e8468
|
@ -62,7 +62,8 @@ func (o *OpenTSDB) Write(points []*client.Point) error {
|
||||||
if len(points) == 0 {
|
if len(points) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
var timeNow = time.Now()
|
now := time.Now()
|
||||||
|
|
||||||
// Send Data with telnet / socket communication
|
// Send Data with telnet / socket communication
|
||||||
uri := fmt.Sprintf("%s:%d", o.Host, o.Port)
|
uri := fmt.Sprintf("%s:%d", o.Host, o.Port)
|
||||||
tcpAddr, _ := net.ResolveTCPAddr("tcp", uri)
|
tcpAddr, _ := net.ResolveTCPAddr("tcp", uri)
|
||||||
|
@ -70,32 +71,21 @@ func (o *OpenTSDB) Write(points []*client.Point) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("OpenTSDB: Telnet connect fail")
|
return fmt.Errorf("OpenTSDB: Telnet connect fail")
|
||||||
}
|
}
|
||||||
|
defer connection.Close()
|
||||||
|
|
||||||
for _, pt := range points {
|
for _, pt := range points {
|
||||||
metric := &MetricLine{
|
for _, metric := range buildMetrics(pt, now, o.Prefix) {
|
||||||
Metric: fmt.Sprintf("%s%s", o.Prefix, pt.Name()),
|
messageLine := fmt.Sprintf("put %s %v %s %s\n",
|
||||||
Timestamp: timeNow.Unix(),
|
metric.Metric, metric.Timestamp, metric.Value, metric.Tags)
|
||||||
}
|
if o.Debug {
|
||||||
|
fmt.Print(messageLine)
|
||||||
metricValue, buildError := buildValue(pt)
|
}
|
||||||
if buildError != nil {
|
_, err := connection.Write([]byte(messageLine))
|
||||||
fmt.Printf("OpenTSDB: %s\n", buildError.Error())
|
if err != nil {
|
||||||
continue
|
return fmt.Errorf("OpenTSDB: Telnet writing error %s", err.Error())
|
||||||
}
|
}
|
||||||
metric.Value = metricValue
|
|
||||||
|
|
||||||
tagsSlice := buildTags(pt.Tags())
|
|
||||||
metric.Tags = fmt.Sprint(strings.Join(tagsSlice, " "))
|
|
||||||
|
|
||||||
messageLine := fmt.Sprintf("put %s %v %s %s\n", metric.Metric, metric.Timestamp, metric.Value, metric.Tags)
|
|
||||||
if o.Debug {
|
|
||||||
fmt.Print(messageLine)
|
|
||||||
}
|
|
||||||
_, err := connection.Write([]byte(messageLine))
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("OpenTSDB: Telnet writing error %s", err.Error())
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
defer connection.Close()
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -111,9 +101,29 @@ func buildTags(ptTags map[string]string) []string {
|
||||||
return tags
|
return tags
|
||||||
}
|
}
|
||||||
|
|
||||||
func buildValue(pt *client.Point) (string, error) {
|
func buildMetrics(pt *client.Point, now time.Time, prefix string) []*MetricLine {
|
||||||
|
ret := []*MetricLine{}
|
||||||
|
for fieldName, value := range pt.Fields() {
|
||||||
|
metric := &MetricLine{
|
||||||
|
Metric: fmt.Sprintf("%s%s_%s", prefix, pt.Name(), fieldName),
|
||||||
|
Timestamp: now.Unix(),
|
||||||
|
}
|
||||||
|
|
||||||
|
metricValue, buildError := buildValue(value)
|
||||||
|
if buildError != nil {
|
||||||
|
fmt.Printf("OpenTSDB: %s\n", buildError.Error())
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
metric.Value = metricValue
|
||||||
|
tagsSlice := buildTags(pt.Tags())
|
||||||
|
metric.Tags = fmt.Sprint(strings.Join(tagsSlice, " "))
|
||||||
|
ret = append(ret, metric)
|
||||||
|
}
|
||||||
|
return ret
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildValue(v interface{}) (string, error) {
|
||||||
var retv string
|
var retv string
|
||||||
var v = pt.Fields()["value"]
|
|
||||||
switch p := v.(type) {
|
switch p := v.(type) {
|
||||||
case int64:
|
case int64:
|
||||||
retv = IntToString(int64(p))
|
retv = IntToString(int64(p))
|
||||||
|
|
Loading…
Reference in New Issue