Merge pull request #2 from rplessl/opentsdb_output_plugin_telnet_unittest

Opentsdb output plugin telnet unittest
This commit is contained in:
Roman Plessl 2015-09-14 12:36:42 +02:00
commit 9e76c5dcbf
2 changed files with 62 additions and 19 deletions

View File

@ -17,6 +17,8 @@ type OpenTSDB struct {
Host string
Port int
Debug bool
}
var sampleConfig = `
@ -29,6 +31,9 @@ var sampleConfig = `
# Port of the OpenTSDB server in telnet mode
port = 4242
# Debug true - Prints OpenTSDB communication
debug = false
`
type MetricLine struct {
@ -70,15 +75,20 @@ func (o *OpenTSDB) Write(bp client.BatchPoints) error {
Metric: fmt.Sprintf("%s%s", o.Prefix, pt.Measurement),
Timestamp: timeNow.Unix(),
}
if metricValue, err := buildValue(bp, pt); err == nil {
metric.Value = metricValue
metricValue, buildError := buildValue(bp, pt)
if buildError != nil {
fmt.Printf("OpenTSDB: %s\n", buildError.Error())
continue
}
metric.Value = metricValue
tagsSlice := buildTags(bp.Tags, pt.Tags)
metric.Tags = fmt.Sprint(strings.Join(tagsSlice, " "))
messageLine := fmt.Sprintf("put %s %v %s %s\n", metric.Metric, metric.Timestamp, metric.Value, metric.Tags)
fmt.Print(messageLine)
if o.Debug {
fmt.Print(messageLine)
}
_, err := connection.Write([]byte(messageLine))
if err != nil {
fmt.Errorf("OpenTSDB: Telnet writing error %s", err.Error())
@ -115,7 +125,7 @@ func buildValue(bp client.BatchPoints, pt client.Point) (string, error) {
case float64:
retv = FloatToString(float64(p))
default:
return retv, fmt.Errorf("undeterminable type for telegraf")
return retv, fmt.Errorf("unexpected type %T with value %v for OpenTSDB", v, v)
}
return retv, nil
}

View File

@ -3,9 +3,11 @@ package opentsdb
import (
"reflect"
"testing"
"time"
"github.com/influxdb/telegraf/testutil"
"github.com/stretchr/testify/require"
"github.com/influxdb/influxdb/client"
"github.com/influxdb/telegraf/testutil"
"github.com/stretchr/testify/require"
)
func TestBuildTagsTelnet(t *testing.T) {
@ -43,20 +45,51 @@ func TestBuildTagsTelnet(t *testing.T) {
}
}
func TestWrite(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
o := &OpenTSDB{
Host: testutil.GetLocalHost() ,
Port: 24242,
}
o := &OpenTSDB{
Host: testutil.GetLocalHost(),
Port: 24242,
Prefix: "prefix.test.",
}
// Verify that we can connect to the OpenTSDB instance
err := o.Connect()
require.NoError(t, err)
// Verify that we can connect to the OpenTSDB instance
err := o.Connect()
require.NoError(t, err)
// Verify that we can successfully write data to OpenTSDB
err = o.Write(testutil.MockBatchPoints())
require.NoError(t, err)
// Verify postive and negative test cases of writing data
var bp client.BatchPoints
bp.Time = time.Now()
bp.Tags = map[string]string{"testkey": "testvalue"}
bp.Points = []client.Point{
{
Measurement: "justametric.float",
Fields: map[string]interface{}{"value": float64(1.0)},
},
{
Measurement: "justametric.int",
Fields: map[string]interface{}{"value": int64(123456789)},
},
{
Measurement: "justametric.uint",
Fields: map[string]interface{}{"value": uint64(123456789012345)},
},
{
Measurement: "justametric.string",
Fields: map[string]interface{}{"value": "Lorem Ipsum"},
},
{
Measurement: "justametric.anotherfloat",
Fields: map[string]interface{}{"value": float64(42.0)},
},
}
err = o.Write(bp)
require.NoError(t, err)
// Verify that we can successfully write data to OpenTSDB
err = o.Write(testutil.MockBatchPoints())
require.NoError(t, err)
}