diff --git a/outputs/kinesis/kinesis_output.go b/outputs/kinesis/kinesis_output.go index 739062f23..18dceafc2 100644 --- a/outputs/kinesis/kinesis_output.go +++ b/outputs/kinesis/kinesis_output.go @@ -104,16 +104,14 @@ func (k *KinesisOutput) Close() error { return errors.New("Error") } -func formatmetric(k *KinesisOutput, point *client.Point) (string, error) { +func FormatMetric(k *KinesisOutput, point *client.Point) (string, error) { if k.Format == "string" { return point.String(), nil } else { - m := fmt.Sprintf("%+v,%+v,%+v %+v", + m := fmt.Sprintf("%+v,%+v,%+v", point.Name(), point.Tags(), - point.String(), - point.Time(), - ) + point.String()) return m, nil } } @@ -153,7 +151,7 @@ func (k *KinesisOutput) Write(points []*client.Point) error { for _, p := range points { atomic.AddUint32(&sz, 1) - metric, _ := formatmetric(k, p) + metric, _ := FormatMetric(k, p) d := kinesis.PutRecordsRequestEntry{ Data: []byte(metric), PartitionKey: aws.String(k.PartitionKey), diff --git a/outputs/kinesis/kinesis_output_test.go b/outputs/kinesis/kinesis_output_test.go index 778865c36..00abfb235 100644 --- a/outputs/kinesis/kinesis_output_test.go +++ b/outputs/kinesis/kinesis_output_test.go @@ -1,7 +1,6 @@ package kinesis_output import ( - "fmt" "testing" "github.com/aws/aws-sdk-go/aws" @@ -10,6 +9,7 @@ import ( "github.com/aws/aws-sdk-go/aws/ec2metadata" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/kinesis" + "github.com/influxdb/telegraf/testutil" "github.com/stretchr/testify/require" ) @@ -18,10 +18,14 @@ func TestConnectAndWrite(t *testing.T) { t.Skip("Skipping integration test in short mode") } + k := &KinesisOutput{ + Region: "us-west-2", + } + // Verify that we can connect kinesis endpoint. This test allows for a chain of credential // so that various authentication methods can pass depending on the system that executes. Config := &aws.Config{ - Region: aws.String("us-west-1"), + Region: aws.String(k.Region), Credentials: credentials.NewChainCredentials( []credentials.Provider{ &ec2rolecreds.EC2RoleProvider{Client: ec2metadata.New(session.New())}, @@ -33,9 +37,42 @@ func TestConnectAndWrite(t *testing.T) { KinesisParams := &kinesis.ListStreamsInput{ Limit: aws.Int64(1)} - resp, err := svc.ListStreams(KinesisParams) - - fmt.Println(resp) + _, err := svc.ListStreams(KinesisParams) + if err != nil { + t.Error("Unable to connect to Kinesis") + } + require.NoError(t, err) +} + +func TestFormatMetric(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + k := &KinesisOutput{ + Format: "string", + } + + p := testutil.MockBatchPoints().Points()[0] + + valid_string := "test1,tag1=value1 value=1 1257894000000000000" + func_string, err := FormatMetric(k, p) + + if func_string != valid_string { + t.Error("Expected ", valid_string) + } + require.NoError(t, err) + + k = &KinesisOutput{ + Format: "custom", + } + + valid_custom := "test1,map[tag1:value1],test1,tag1=value1 value=1 1257894000000000000" + func_custom, err := FormatMetric(k, p) + + if func_custom != valid_custom { + t.Error("Expected ", valid_custom) + } require.NoError(t, err) }