diff --git a/outputs/opentsdb/opentsdb.go b/outputs/opentsdb/opentsdb.go index b547c106f..0060da8d0 100644 --- a/outputs/opentsdb/opentsdb.go +++ b/outputs/opentsdb/opentsdb.go @@ -17,6 +17,8 @@ type OpenTSDB struct { Host string Port int + + Debug bool } var sampleConfig = ` @@ -29,6 +31,9 @@ var sampleConfig = ` # Port of the OpenTSDB server in telnet mode port = 4242 + + # Debug true - Prints OpenTSDB communication + debug = false ` type MetricLine struct { @@ -70,15 +75,20 @@ func (o *OpenTSDB) Write(bp client.BatchPoints) error { Metric: fmt.Sprintf("%s%s", o.Prefix, pt.Measurement), Timestamp: timeNow.Unix(), } - if metricValue, err := buildValue(bp, pt); err == nil { - metric.Value = metricValue + metricValue, buildError := buildValue(bp, pt) + if buildError != nil { + fmt.Printf("OpenTSDB: %s\n", buildError.Error()) + continue } + metric.Value = metricValue tagsSlice := buildTags(bp.Tags, pt.Tags) metric.Tags = fmt.Sprint(strings.Join(tagsSlice, " ")) messageLine := fmt.Sprintf("put %s %v %s %s\n", metric.Metric, metric.Timestamp, metric.Value, metric.Tags) - fmt.Print(messageLine) + if o.Debug { + fmt.Print(messageLine) + } _, err := connection.Write([]byte(messageLine)) if err != nil { fmt.Errorf("OpenTSDB: Telnet writing error %s", err.Error()) @@ -115,7 +125,7 @@ func buildValue(bp client.BatchPoints, pt client.Point) (string, error) { case float64: retv = FloatToString(float64(p)) default: - return retv, fmt.Errorf("undeterminable type for telegraf") + return retv, fmt.Errorf("unexpected type %T with value %v for OpenTSDB", v, v) } return retv, nil } diff --git a/outputs/opentsdb/opentsdb_test.go b/outputs/opentsdb/opentsdb_test.go index 774c06953..e73b1ae2b 100644 --- a/outputs/opentsdb/opentsdb_test.go +++ b/outputs/opentsdb/opentsdb_test.go @@ -3,9 +3,11 @@ package opentsdb import ( "reflect" "testing" + "time" - "github.com/influxdb/telegraf/testutil" - "github.com/stretchr/testify/require" + "github.com/influxdb/influxdb/client" + "github.com/influxdb/telegraf/testutil" + "github.com/stretchr/testify/require" ) func TestBuildTagsTelnet(t *testing.T) { @@ -43,20 +45,51 @@ func TestBuildTagsTelnet(t *testing.T) { } } func TestWrite(t *testing.T) { - if testing.Short() { - t.Skip("Skipping integration test in short mode") - } + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } - o := &OpenTSDB{ - Host: testutil.GetLocalHost() , - Port: 24242, - } + o := &OpenTSDB{ + Host: testutil.GetLocalHost(), + Port: 24242, + Prefix: "prefix.test.", + } - // Verify that we can connect to the OpenTSDB instance - err := o.Connect() - require.NoError(t, err) + // Verify that we can connect to the OpenTSDB instance + err := o.Connect() + require.NoError(t, err) + + // Verify that we can successfully write data to OpenTSDB + err = o.Write(testutil.MockBatchPoints()) + require.NoError(t, err) + + // Verify postive and negative test cases of writing data + var bp client.BatchPoints + bp.Time = time.Now() + bp.Tags = map[string]string{"testkey": "testvalue"} + bp.Points = []client.Point{ + { + Measurement: "justametric.float", + Fields: map[string]interface{}{"value": float64(1.0)}, + }, + { + Measurement: "justametric.int", + Fields: map[string]interface{}{"value": int64(123456789)}, + }, + { + Measurement: "justametric.uint", + Fields: map[string]interface{}{"value": uint64(123456789012345)}, + }, + { + Measurement: "justametric.string", + Fields: map[string]interface{}{"value": "Lorem Ipsum"}, + }, + { + Measurement: "justametric.anotherfloat", + Fields: map[string]interface{}{"value": float64(42.0)}, + }, + } + err = o.Write(bp) + require.NoError(t, err) - // Verify that we can successfully write data to OpenTSDB - err = o.Write(testutil.MockBatchPoints()) - require.NoError(t, err) }