From 0975bc1682cb57d6294b4944cc649d00cef22202 Mon Sep 17 00:00:00 2001 From: Roman Plessl Date: Thu, 10 Sep 2015 20:33:29 +0200 Subject: [PATCH 01/11] fixing link: warning: option -X main.Version v0.1.8-2-g8c5e1ff may not work in future releases; use -X main.Version=v0.1.8-2-g8c5e1ff --- Makefile | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Makefile b/Makefile index 89db1d5b0..69b701cd7 100644 --- a/Makefile +++ b/Makefile @@ -3,18 +3,18 @@ VERSION := $(shell sh -c 'git describe --always --tags') build: prepare $(GOPATH)/bin/godep go build -o telegraf -ldflags \ - "-X main.Version $(VERSION)" \ + "-X main.Version=$(VERSION)" \ ./cmd/telegraf/telegraf.go build-linux-bins: prepare GOARCH=amd64 GOOS=linux $(GOPATH)/bin/godep go build -o telegraf_linux_amd64 \ - -ldflags "-X main.Version $(VERSION)" \ + -ldflags "-X main.Version=$(VERSION)" \ ./cmd/telegraf/telegraf.go GOARCH=386 GOOS=linux $(GOPATH)/bin/godep go build -o telegraf_linux_386 \ - -ldflags "-X main.Version $(VERSION)" \ + -ldflags "-X main.Version=$(VERSION)" \ ./cmd/telegraf/telegraf.go GOARCH=arm GOOS=linux $(GOPATH)/bin/godep go build -o telegraf_linux_arm \ - -ldflags "-X main.Version $(VERSION)" \ + -ldflags "-X main.Version=$(VERSION)" \ ./cmd/telegraf/telegraf.go prepare: From 0b8e7cb27704264acbd94345e81dec0dbf3ba5c3 Mon Sep 17 00:00:00 2001 From: Roman Plessl Date: Thu, 10 Sep 2015 20:34:36 +0200 Subject: [PATCH 02/11] code improvements after running tests / compile step --- outputs/opentsdb/opentsdb.go | 151 +++++++++++++++++++++++++++++++++++ 1 file changed, 151 insertions(+) create mode 100644 outputs/opentsdb/opentsdb.go diff --git a/outputs/opentsdb/opentsdb.go b/outputs/opentsdb/opentsdb.go new file mode 100644 index 000000000..1c29e731d --- /dev/null +++ b/outputs/opentsdb/opentsdb.go @@ -0,0 +1,151 @@ +package opentsdb + +import ( + "fmt" + "net" + "sort" + "strconv" + "strings" + "time" + + "github.com/influxdb/influxdb/client" + "github.com/influxdb/telegraf/outputs" +) + +type OpenTSDB struct { + Prefix string + + Host string + Port int +} + +var sampleConfig = ` + # prefix for metrics keys + prefix = "my.specific.prefix." + + ## Telnet Mode ## + # DNS name of the OpenTSDB server in telnet mode + host = "opentsdb.example.com" + + # Port of the OpenTSDB server in telnet mode + port = 4242 +` + +type MetricLine struct { + Metric string + Timestamp int64 + Value string + Tags string +} + +func (o *OpenTSDB) Connect() error { + // Test Connection to OpenTSDB Server + uri := fmt.Sprintf("%s:%d", o.Host, o.Port) + tcpAddr, err := net.ResolveTCPAddr("tcp", uri) + if err != nil { + return fmt.Errorf("OpenTSDB: TCP address cannot be resolved") + } + connection, err := net.DialTCP("tcp", nil, tcpAddr) + defer connection.Close() + if err != nil { + return fmt.Errorf("OpenTSDB: Telnet connect fail") + } + return nil +} + +func (o *OpenTSDB) Write(bp client.BatchPoints) error { + if len(bp.Points) == 0 { + return nil + } + var timeNow = time.Now() + // Send Data with telnet / socket communication + uri := fmt.Sprintf("%s:%d", o.Host, o.Port) + tcpAddr, _ := net.ResolveTCPAddr("tcp", uri) + connection, err := net.DialTCP("tcp", nil, tcpAddr) + if err != nil { + return fmt.Errorf("OpenTSDB: Telnet connect fail") + } + for _, pt := range bp.Points { + metric := &MetricLine{ + Metric: fmt.Sprintf("%s%s", o.Prefix, pt.Measurement), + Timestamp: timeNow.Unix(), + } + if metricValue, err := buildValue(bp, pt); err == nil { + metric.Value = metricValue + } + + tagsSlice := buildTags(bp.Tags, pt.Tags) + metric.Tags = fmt.Sprint(strings.Join(tagsSlice, " ")) + + messageLine := fmt.Sprintf("put %s %v %s %s\n", metric.Metric, metric.Timestamp, metric.Value, metric.Tags) + fmt.Print(messageLine) + _, err := connection.Write([]byte(messageLine)) + if err != nil { + fmt.Errorf("OpenTSDB: Telnet writing error %s", err.Error()) + } + } + defer connection.Close() + + return nil +} + +func buildTags(bpTags map[string]string, ptTags map[string]string) []string { + tags := make([]string, (len(bpTags) + len(ptTags))) + index := 0 + for k, v := range bpTags { + tags[index] = fmt.Sprintf("%s=%s", k, v) + index += 1 + } + for k, v := range ptTags { + tags[index] = fmt.Sprintf("%s=%s", k, v) + index += 1 + } + sort.Strings(tags) + return tags +} + +func buildValue(bp client.BatchPoints, pt client.Point) (string, error) { + var retv string + var v = pt.Fields["value"] + switch p := v.(type) { + case int64: + retv = IntToString(int64(p)) + case uint64: + retv = UIntToString(uint64(p)) + case float64: + retv = FloatToString(float64(p)) + default: + return retv, fmt.Errorf("undeterminable type for telegraf") + } + return retv, nil +} + +func IntToString(input_num int64) string { + return strconv.FormatInt(input_num, 10) +} + +func UIntToString(input_num uint64) string { + return strconv.FormatUint(input_num, 10) +} + +func FloatToString(input_num float64) string { + return strconv.FormatFloat(input_num, 'f', 6, 64) +} + +func (o *OpenTSDB) SampleConfig() string { + return sampleConfig +} + +func (o *OpenTSDB) Description() string { + return "Configuration for OpenTSDB server to send metrics to" +} + +func (o *OpenTSDB) Close() error { + return nil +} + +func init() { + outputs.Add("opentsdb", func() outputs.Output { + return &OpenTSDB{} + }) +} From 9af525c975582f4550518ba747522ceb15469b82 Mon Sep 17 00:00:00 2001 From: Roman Plessl Date: Thu, 10 Sep 2015 20:35:12 +0200 Subject: [PATCH 03/11] change/fix expected test result --- outputs/opentsdb/opentsdb_test.go | 53 +++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 outputs/opentsdb/opentsdb_test.go diff --git a/outputs/opentsdb/opentsdb_test.go b/outputs/opentsdb/opentsdb_test.go new file mode 100644 index 000000000..bd1bd8f7e --- /dev/null +++ b/outputs/opentsdb/opentsdb_test.go @@ -0,0 +1,53 @@ +package opentsdb + +import ( + "reflect" + "testing" +) + +var ( + fakeHost = "metrics.example.com" + fakePort = 4242 +) + +func fakeOpenTSDB() *OpenTSDB { + var o OpenTSDB + o.Host = fakeHost + o.Port = fakePort + return &o +} + +func TestBuildTagsTelnet(t *testing.T) { + var tagtests = []struct { + bpIn map[string]string + ptIn map[string]string + outTags []string + }{ + { + map[string]string{"one": "two"}, + map[string]string{"three": "four"}, + []string{"one=two", "three=four"}, + }, + { + map[string]string{"aaa": "bbb"}, + map[string]string{}, + []string{"aaa=bbb"}, + }, + { + map[string]string{"one": "two"}, + map[string]string{"aaa": "bbb"}, + []string{"aaa=bbb", "one=two"}, + }, + { + map[string]string{}, + map[string]string{}, + []string{}, + }, + } + for _, tt := range tagtests { + tags := buildTags(tt.bpIn, tt.ptIn) + if !reflect.DeepEqual(tags, tt.outTags) { + t.Errorf("\nexpected %+v\ngot %+v\n", tt.outTags, tags) + } + } +} From e2e1e3114afd88fbf8633ad03a3129e97f75f1cb Mon Sep 17 00:00:00 2001 From: Roman Plessl Date: Thu, 10 Sep 2015 20:40:23 +0200 Subject: [PATCH 04/11] adds opentsdb telnet output plugin --- outputs/all/all.go | 1 + 1 file changed, 1 insertion(+) diff --git a/outputs/all/all.go b/outputs/all/all.go index 36d11ea61..8586174a5 100644 --- a/outputs/all/all.go +++ b/outputs/all/all.go @@ -4,4 +4,5 @@ import ( _ "github.com/influxdb/telegraf/outputs/datadog" _ "github.com/influxdb/telegraf/outputs/influxdb" _ "github.com/influxdb/telegraf/outputs/kafka" + _ "github.com/influxdb/telegraf/outputs/opentsdb" ) From 66681a7256bd355e25f4acf9a33f996186f9c1c8 Mon Sep 17 00:00:00 2001 From: Roman Plessl Date: Thu, 10 Sep 2015 20:42:53 +0200 Subject: [PATCH 05/11] added opentsdb as sink --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index a5c7c921d..0f4f74163 100644 --- a/README.md +++ b/README.md @@ -190,6 +190,7 @@ found by running `telegraf -sample-config` * influxdb * kafka * datadog +* opentsdb ## Contributing From e75b7d2e38efa4ed3b3dee179f4211e0cf6b08c2 Mon Sep 17 00:00:00 2001 From: Roman Plessl Date: Thu, 10 Sep 2015 21:25:07 +0200 Subject: [PATCH 06/11] added readme as suggested / whished in #177 --- outputs/opentsdb/README.md | 78 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) create mode 100644 outputs/opentsdb/README.md diff --git a/outputs/opentsdb/README.md b/outputs/opentsdb/README.md new file mode 100644 index 000000000..59a03d3fd --- /dev/null +++ b/outputs/opentsdb/README.md @@ -0,0 +1,78 @@ +# OpenTSDB Output Plugin + +This plugin writes to a OpenTSDB instance using the "telnet" mode + +## Transfer "Protocol" in the telnet mode + +The expected input from OpenTSDB is specified in the following way: + +``` +put +``` + +The telegraf output plugin adds an optional prefix to the metric keys so +that a subamount can be selected. + +``` +put <[prefix.]metric> +``` + +### Example + +``` +put nine.telegraf.system_load1 1441910356 0.430000 dc=homeoffice host=irimame scope=green +put nine.telegraf.system_load5 1441910356 0.580000 dc=homeoffice host=irimame scope=green +put nine.telegraf.system_load15 1441910356 0.730000 dc=homeoffice host=irimame scope=green +put nine.telegraf.system_uptime 1441910356 3655970.000000 dc=homeoffice host=irimame scope=green +put nine.telegraf.system_uptime_format 1441910356 dc=homeoffice host=irimame scope=green +put nine.telegraf.mem_total 1441910356 4145426432 dc=homeoffice host=irimame scope=green +... +put nine.telegraf.io_write_bytes 1441910366 0 dc=homeoffice host=irimame name=vda2 scope=green +put nine.telegraf.io_read_time 1441910366 0 dc=homeoffice host=irimame name=vda2 scope=green +put nine.telegraf.io_write_time 1441910366 0 dc=homeoffice host=irimame name=vda2 scope=green +put nine.telegraf.io_io_time 1441910366 0 dc=homeoffice host=irimame name=vda2 scope=green +put nine.telegraf.ping_packets_transmitted 1441910366 dc=homeoffice host=irimame scope=green url=www.google.com +put nine.telegraf.ping_packets_received 1441910366 dc=homeoffice host=irimame scope=green url=www.google.com +put nine.telegraf.ping_percent_packet_loss 1441910366 0.000000 dc=homeoffice host=irimame scope=green url=www.google.com +put nine.telegraf.ping_average_response_ms 1441910366 24.006000 dc=homeoffice host=irimame scope=green url=www.google.com +... +``` + +## + +The OpenTSDB interface can be simulated with this reader: + +``` +// opentsdb_telnet_mode_mock.go +package main + +import ( + "io" + "log" + "net" + "os" +) + +func main() { + l, err := net.Listen("tcp", "localhost:4242") + if err != nil { + log.Fatal(err) + } + defer l.Close() + for { + conn, err := l.Accept() + if err != nil { + log.Fatal(err) + } + go func(c net.Conn) { + defer c.Close() + io.Copy(os.Stdout, c) + }(conn) + } +} + +``` + +## Allowed values for metrics + +OpenTSDB allows `integers` and `floats` as input values \ No newline at end of file From fa037adbde95195f97c5a00d329c15602bfe0f92 Mon Sep 17 00:00:00 2001 From: Roman Plessl Date: Thu, 10 Sep 2015 21:27:34 +0200 Subject: [PATCH 07/11] fix spaces with gofmt --- outputs/opentsdb/opentsdb.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/outputs/opentsdb/opentsdb.go b/outputs/opentsdb/opentsdb.go index 1c29e731d..b547c106f 100644 --- a/outputs/opentsdb/opentsdb.go +++ b/outputs/opentsdb/opentsdb.go @@ -9,7 +9,7 @@ import ( "time" "github.com/influxdb/influxdb/client" - "github.com/influxdb/telegraf/outputs" + "github.com/influxdb/telegraf/outputs" ) type OpenTSDB struct { From 58a2e5eb3bec9b8d55958137788838a52d29554a Mon Sep 17 00:00:00 2001 From: Roman Plessl Date: Fri, 11 Sep 2015 22:24:53 +0200 Subject: [PATCH 08/11] added docker image unit test with OpenTSDB --- outputs/opentsdb/opentsdb_test.go | 31 ++++++++++++++++++++----------- scripts/docker-compose.yml | 6 ++++++ 2 files changed, 26 insertions(+), 11 deletions(-) diff --git a/outputs/opentsdb/opentsdb_test.go b/outputs/opentsdb/opentsdb_test.go index bd1bd8f7e..774c06953 100644 --- a/outputs/opentsdb/opentsdb_test.go +++ b/outputs/opentsdb/opentsdb_test.go @@ -3,20 +3,11 @@ package opentsdb import ( "reflect" "testing" -) -var ( - fakeHost = "metrics.example.com" - fakePort = 4242 + "github.com/influxdb/telegraf/testutil" + "github.com/stretchr/testify/require" ) -func fakeOpenTSDB() *OpenTSDB { - var o OpenTSDB - o.Host = fakeHost - o.Port = fakePort - return &o -} - func TestBuildTagsTelnet(t *testing.T) { var tagtests = []struct { bpIn map[string]string @@ -51,3 +42,21 @@ func TestBuildTagsTelnet(t *testing.T) { } } } +func TestWrite(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + o := &OpenTSDB{ + Host: testutil.GetLocalHost() , + Port: 24242, + } + + // Verify that we can connect to the OpenTSDB instance + err := o.Connect() + require.NoError(t, err) + + // Verify that we can successfully write data to OpenTSDB + err = o.Write(testutil.MockBatchPoints()) + require.NoError(t, err) +} diff --git a/scripts/docker-compose.yml b/scripts/docker-compose.yml index c51a0235b..a41cb67f4 100644 --- a/scripts/docker-compose.yml +++ b/scripts/docker-compose.yml @@ -25,3 +25,9 @@ kafka: environment: ADVERTISED_HOST: ADVERTISED_PORT: 9092 + +opentsdb: + image: lancope/opentsdb + ports: + - "24242:4242" + From 8a3b5633c4e147ca1cb7cd641d8af4bd87fc5910 Mon Sep 17 00:00:00 2001 From: Roman Plessl Date: Sun, 13 Sep 2015 22:35:38 +0200 Subject: [PATCH 09/11] added prefix settings of the module and rearrange go test code --- outputs/opentsdb/opentsdb_test.go | 31 ++++++++++++++++--------------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/outputs/opentsdb/opentsdb_test.go b/outputs/opentsdb/opentsdb_test.go index 774c06953..2db064243 100644 --- a/outputs/opentsdb/opentsdb_test.go +++ b/outputs/opentsdb/opentsdb_test.go @@ -4,8 +4,8 @@ import ( "reflect" "testing" - "github.com/influxdb/telegraf/testutil" - "github.com/stretchr/testify/require" + "github.com/influxdb/telegraf/testutil" + "github.com/stretchr/testify/require" ) func TestBuildTagsTelnet(t *testing.T) { @@ -43,20 +43,21 @@ func TestBuildTagsTelnet(t *testing.T) { } } func TestWrite(t *testing.T) { - if testing.Short() { - t.Skip("Skipping integration test in short mode") - } + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } - o := &OpenTSDB{ - Host: testutil.GetLocalHost() , - Port: 24242, - } + o := &OpenTSDB{ + Host: testutil.GetLocalHost(), + Port: 24242, + Prefix: "prefix.test.", + } - // Verify that we can connect to the OpenTSDB instance - err := o.Connect() - require.NoError(t, err) + // Verify that we can connect to the OpenTSDB instance + err := o.Connect() + require.NoError(t, err) - // Verify that we can successfully write data to OpenTSDB - err = o.Write(testutil.MockBatchPoints()) - require.NoError(t, err) + // Verify that we can successfully write data to OpenTSDB + err = o.Write(testutil.MockBatchPoints()) + require.NoError(t, err) } From b4eb7d3f920e91d6377d49795069f2b113c3fb1f Mon Sep 17 00:00:00 2001 From: Roman Plessl Date: Mon, 14 Sep 2015 12:28:10 +0200 Subject: [PATCH 10/11] added more UNIT test cases for covering all parts of the code added debug statement for debugging OpenTSDB communication --- outputs/opentsdb/opentsdb.go | 18 +++++++++++++---- outputs/opentsdb/opentsdb_test.go | 32 +++++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 4 deletions(-) diff --git a/outputs/opentsdb/opentsdb.go b/outputs/opentsdb/opentsdb.go index 1c29e731d..d44bb837c 100644 --- a/outputs/opentsdb/opentsdb.go +++ b/outputs/opentsdb/opentsdb.go @@ -17,6 +17,8 @@ type OpenTSDB struct { Host string Port int + + Debug bool } var sampleConfig = ` @@ -29,6 +31,9 @@ var sampleConfig = ` # Port of the OpenTSDB server in telnet mode port = 4242 + + # Debug true - Prints OpenTSDB communication + debug = false ` type MetricLine struct { @@ -70,15 +75,20 @@ func (o *OpenTSDB) Write(bp client.BatchPoints) error { Metric: fmt.Sprintf("%s%s", o.Prefix, pt.Measurement), Timestamp: timeNow.Unix(), } - if metricValue, err := buildValue(bp, pt); err == nil { - metric.Value = metricValue + metricValue, buildError := buildValue(bp, pt); + if buildError != nil { + fmt.Printf("OpenTSDB: %s\n", buildError.Error()) + continue } + metric.Value = metricValue tagsSlice := buildTags(bp.Tags, pt.Tags) metric.Tags = fmt.Sprint(strings.Join(tagsSlice, " ")) messageLine := fmt.Sprintf("put %s %v %s %s\n", metric.Metric, metric.Timestamp, metric.Value, metric.Tags) - fmt.Print(messageLine) + if (o.Debug) { + fmt.Print(messageLine) + } _, err := connection.Write([]byte(messageLine)) if err != nil { fmt.Errorf("OpenTSDB: Telnet writing error %s", err.Error()) @@ -115,7 +125,7 @@ func buildValue(bp client.BatchPoints, pt client.Point) (string, error) { case float64: retv = FloatToString(float64(p)) default: - return retv, fmt.Errorf("undeterminable type for telegraf") + return retv, fmt.Errorf("unexpected type %T with value %v for OpenTSDB", v, v) } return retv, nil } diff --git a/outputs/opentsdb/opentsdb_test.go b/outputs/opentsdb/opentsdb_test.go index 2db064243..e73b1ae2b 100644 --- a/outputs/opentsdb/opentsdb_test.go +++ b/outputs/opentsdb/opentsdb_test.go @@ -3,7 +3,9 @@ package opentsdb import ( "reflect" "testing" + "time" + "github.com/influxdb/influxdb/client" "github.com/influxdb/telegraf/testutil" "github.com/stretchr/testify/require" ) @@ -60,4 +62,34 @@ func TestWrite(t *testing.T) { // Verify that we can successfully write data to OpenTSDB err = o.Write(testutil.MockBatchPoints()) require.NoError(t, err) + + // Verify postive and negative test cases of writing data + var bp client.BatchPoints + bp.Time = time.Now() + bp.Tags = map[string]string{"testkey": "testvalue"} + bp.Points = []client.Point{ + { + Measurement: "justametric.float", + Fields: map[string]interface{}{"value": float64(1.0)}, + }, + { + Measurement: "justametric.int", + Fields: map[string]interface{}{"value": int64(123456789)}, + }, + { + Measurement: "justametric.uint", + Fields: map[string]interface{}{"value": uint64(123456789012345)}, + }, + { + Measurement: "justametric.string", + Fields: map[string]interface{}{"value": "Lorem Ipsum"}, + }, + { + Measurement: "justametric.anotherfloat", + Fields: map[string]interface{}{"value": float64(42.0)}, + }, + } + err = o.Write(bp) + require.NoError(t, err) + } From da3a590aab3c6ea2d0598832189db8b195dade04 Mon Sep 17 00:00:00 2001 From: Roman Plessl Date: Mon, 14 Sep 2015 12:34:34 +0200 Subject: [PATCH 11/11] enhance coding style gofmt --- outputs/opentsdb/opentsdb.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/outputs/opentsdb/opentsdb.go b/outputs/opentsdb/opentsdb.go index d44bb837c..0060da8d0 100644 --- a/outputs/opentsdb/opentsdb.go +++ b/outputs/opentsdb/opentsdb.go @@ -9,7 +9,7 @@ import ( "time" "github.com/influxdb/influxdb/client" - "github.com/influxdb/telegraf/outputs" + "github.com/influxdb/telegraf/outputs" ) type OpenTSDB struct { @@ -75,7 +75,7 @@ func (o *OpenTSDB) Write(bp client.BatchPoints) error { Metric: fmt.Sprintf("%s%s", o.Prefix, pt.Measurement), Timestamp: timeNow.Unix(), } - metricValue, buildError := buildValue(bp, pt); + metricValue, buildError := buildValue(bp, pt) if buildError != nil { fmt.Printf("OpenTSDB: %s\n", buildError.Error()) continue @@ -86,7 +86,7 @@ func (o *OpenTSDB) Write(bp client.BatchPoints) error { metric.Tags = fmt.Sprint(strings.Join(tagsSlice, " ")) messageLine := fmt.Sprintf("put %s %v %s %s\n", metric.Metric, metric.Timestamp, metric.Value, metric.Tags) - if (o.Debug) { + if o.Debug { fmt.Print(messageLine) } _, err := connection.Write([]byte(messageLine))