diff --git a/outputs/opentsdb/opentsdb.go b/outputs/opentsdb/opentsdb.go new file mode 100644 index 000000000..1c29e731d --- /dev/null +++ b/outputs/opentsdb/opentsdb.go @@ -0,0 +1,151 @@ +package opentsdb + +import ( + "fmt" + "net" + "sort" + "strconv" + "strings" + "time" + + "github.com/influxdb/influxdb/client" + "github.com/influxdb/telegraf/outputs" +) + +type OpenTSDB struct { + Prefix string + + Host string + Port int +} + +var sampleConfig = ` + # prefix for metrics keys + prefix = "my.specific.prefix." + + ## Telnet Mode ## + # DNS name of the OpenTSDB server in telnet mode + host = "opentsdb.example.com" + + # Port of the OpenTSDB server in telnet mode + port = 4242 +` + +type MetricLine struct { + Metric string + Timestamp int64 + Value string + Tags string +} + +func (o *OpenTSDB) Connect() error { + // Test Connection to OpenTSDB Server + uri := fmt.Sprintf("%s:%d", o.Host, o.Port) + tcpAddr, err := net.ResolveTCPAddr("tcp", uri) + if err != nil { + return fmt.Errorf("OpenTSDB: TCP address cannot be resolved") + } + connection, err := net.DialTCP("tcp", nil, tcpAddr) + defer connection.Close() + if err != nil { + return fmt.Errorf("OpenTSDB: Telnet connect fail") + } + return nil +} + +func (o *OpenTSDB) Write(bp client.BatchPoints) error { + if len(bp.Points) == 0 { + return nil + } + var timeNow = time.Now() + // Send Data with telnet / socket communication + uri := fmt.Sprintf("%s:%d", o.Host, o.Port) + tcpAddr, _ := net.ResolveTCPAddr("tcp", uri) + connection, err := net.DialTCP("tcp", nil, tcpAddr) + if err != nil { + return fmt.Errorf("OpenTSDB: Telnet connect fail") + } + for _, pt := range bp.Points { + metric := &MetricLine{ + Metric: fmt.Sprintf("%s%s", o.Prefix, pt.Measurement), + Timestamp: timeNow.Unix(), + } + if metricValue, err := buildValue(bp, pt); err == nil { + metric.Value = metricValue + } + + tagsSlice := buildTags(bp.Tags, pt.Tags) + metric.Tags = fmt.Sprint(strings.Join(tagsSlice, " ")) + + messageLine := fmt.Sprintf("put %s %v %s %s\n", metric.Metric, metric.Timestamp, metric.Value, metric.Tags) + fmt.Print(messageLine) + _, err := connection.Write([]byte(messageLine)) + if err != nil { + fmt.Errorf("OpenTSDB: Telnet writing error %s", err.Error()) + } + } + defer connection.Close() + + return nil +} + +func buildTags(bpTags map[string]string, ptTags map[string]string) []string { + tags := make([]string, (len(bpTags) + len(ptTags))) + index := 0 + for k, v := range bpTags { + tags[index] = fmt.Sprintf("%s=%s", k, v) + index += 1 + } + for k, v := range ptTags { + tags[index] = fmt.Sprintf("%s=%s", k, v) + index += 1 + } + sort.Strings(tags) + return tags +} + +func buildValue(bp client.BatchPoints, pt client.Point) (string, error) { + var retv string + var v = pt.Fields["value"] + switch p := v.(type) { + case int64: + retv = IntToString(int64(p)) + case uint64: + retv = UIntToString(uint64(p)) + case float64: + retv = FloatToString(float64(p)) + default: + return retv, fmt.Errorf("undeterminable type for telegraf") + } + return retv, nil +} + +func IntToString(input_num int64) string { + return strconv.FormatInt(input_num, 10) +} + +func UIntToString(input_num uint64) string { + return strconv.FormatUint(input_num, 10) +} + +func FloatToString(input_num float64) string { + return strconv.FormatFloat(input_num, 'f', 6, 64) +} + +func (o *OpenTSDB) SampleConfig() string { + return sampleConfig +} + +func (o *OpenTSDB) Description() string { + return "Configuration for OpenTSDB server to send metrics to" +} + +func (o *OpenTSDB) Close() error { + return nil +} + +func init() { + outputs.Add("opentsdb", func() outputs.Output { + return &OpenTSDB{} + }) +}