diff --git a/outputs/kinesis/README.md b/outputs/kinesis/README.md index 6baad21f1..3b75117ac 100644 --- a/outputs/kinesis/README.md +++ b/outputs/kinesis/README.md @@ -45,4 +45,17 @@ plugin will result in telegraf exiting with an exit code of 1. This is used to group data within a stream. Currently this plugin only supports a single partitionkey. Manually configuring different hosts, or groups of hosts with manually selected partitionkeys might be a workable -solution to scale out. \ No newline at end of file +solution to scale out. + +### format + +The format configuration value has been designated to allow people to change the format of the Point as written to +Kinesis. Right now there are two supported formats string and custom. + +#### string + +String is defined using the default Point.String() value and translated to []byte for the Kinesis stream. + +#### custom + +Custom is a string defined by a number of values in the FormatMetric() function. \ No newline at end of file diff --git a/outputs/kinesis/kinesis_output.go b/outputs/kinesis/kinesis.go similarity index 87% rename from outputs/kinesis/kinesis_output.go rename to outputs/kinesis/kinesis.go index 18dceafc2..144131707 100644 --- a/outputs/kinesis/kinesis_output.go +++ b/outputs/kinesis/kinesis.go @@ -1,4 +1,4 @@ -package kinesis_output +package kinesis import ( "errors" @@ -64,7 +64,7 @@ func (k *KinesisOutput) Connect() error { // We attempt first to create a session to Kinesis using an IAMS role, if that fails it will fall through to using // environment variables, and then Shared Credentials. if k.Debug { - log.Printf("kinesis_output: Establishing a connection to Kinesis in %+v", k.Region) + log.Printf("kinesis: Establishing a connection to Kinesis in %+v", k.Region) } Config := &aws.Config{ Region: aws.String(k.Region), @@ -84,17 +84,17 @@ func (k *KinesisOutput) Connect() error { resp, err := svc.ListStreams(KinesisParams) if err != nil { - log.Printf("kinesis_output: Error in ListSteams API call : %+v \n", err) + log.Printf("kinesis: Error in ListSteams API call : %+v \n", err) } if checkstream(resp.StreamNames, k.StreamName) { if k.Debug { - log.Printf("kinesis_output: Stream Exists") + log.Printf("kinesis: Stream Exists") } k.svc = svc return nil } else { - log.Printf("kinesis_output : You have configured a StreamName %+v which does not exist. exiting.", k.StreamName) + log.Printf("kinesis : You have configured a StreamName %+v which does not exist. exiting.", k.StreamName) os.Exit(1) } return err @@ -126,14 +126,14 @@ func writekinesis(k *KinesisOutput, r []*kinesis.PutRecordsRequestEntry) time.Du if k.Debug { resp, err := k.svc.PutRecords(payload) if err != nil { - log.Printf("kinesis_output: Unable to write to Kinesis : %+v \n", err.Error()) + log.Printf("kinesis: Unable to write to Kinesis : %+v \n", err.Error()) } log.Printf("%+v \n", resp) } else { _, err := k.svc.PutRecords(payload) if err != nil { - log.Printf("kinesis_output: Unable to write to Kinesis : %+v \n", err.Error()) + log.Printf("kinesis: Unable to write to Kinesis : %+v \n", err.Error()) } } return time.Since(start) @@ -173,7 +173,7 @@ func (k *KinesisOutput) Write(points []*client.Point) error { } func init() { - outputs.Add("kinesis_output", func() outputs.Output { + outputs.Add("kinesis", func() outputs.Output { return &KinesisOutput{} }) } diff --git a/outputs/kinesis/kinesis_output_test.go b/outputs/kinesis/kinesis_output_test.go deleted file mode 100644 index 00abfb235..000000000 --- a/outputs/kinesis/kinesis_output_test.go +++ /dev/null @@ -1,78 +0,0 @@ -package kinesis_output - -import ( - "testing" - - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/credentials" - "github.com/aws/aws-sdk-go/aws/credentials/ec2rolecreds" - "github.com/aws/aws-sdk-go/aws/ec2metadata" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/kinesis" - "github.com/influxdb/telegraf/testutil" - "github.com/stretchr/testify/require" -) - -func TestConnectAndWrite(t *testing.T) { - if testing.Short() { - t.Skip("Skipping integration test in short mode") - } - - k := &KinesisOutput{ - Region: "us-west-2", - } - - // Verify that we can connect kinesis endpoint. This test allows for a chain of credential - // so that various authentication methods can pass depending on the system that executes. - Config := &aws.Config{ - Region: aws.String(k.Region), - Credentials: credentials.NewChainCredentials( - []credentials.Provider{ - &ec2rolecreds.EC2RoleProvider{Client: ec2metadata.New(session.New())}, - &credentials.EnvProvider{}, - &credentials.SharedCredentialsProvider{}, - }), - } - svc := kinesis.New(session.New(Config)) - - KinesisParams := &kinesis.ListStreamsInput{ - Limit: aws.Int64(1)} - _, err := svc.ListStreams(KinesisParams) - - if err != nil { - t.Error("Unable to connect to Kinesis") - } - require.NoError(t, err) -} - -func TestFormatMetric(t *testing.T) { - if testing.Short() { - t.Skip("Skipping integration test in short mode") - } - - k := &KinesisOutput{ - Format: "string", - } - - p := testutil.MockBatchPoints().Points()[0] - - valid_string := "test1,tag1=value1 value=1 1257894000000000000" - func_string, err := FormatMetric(k, p) - - if func_string != valid_string { - t.Error("Expected ", valid_string) - } - require.NoError(t, err) - - k = &KinesisOutput{ - Format: "custom", - } - - valid_custom := "test1,map[tag1:value1],test1,tag1=value1 value=1 1257894000000000000" - func_custom, err := FormatMetric(k, p) - - if func_custom != valid_custom { - t.Error("Expected ", valid_custom) - } - require.NoError(t, err) -} diff --git a/outputs/kinesis/kinesis_test.go b/outputs/kinesis/kinesis_test.go new file mode 100644 index 000000000..76bf242d7 --- /dev/null +++ b/outputs/kinesis/kinesis_test.go @@ -0,0 +1,40 @@ +package kinesis + +import ( + "testing" + "github.com/influxdb/telegraf/testutil" + "github.com/stretchr/testify/require" +) + + +func TestFormatMetric(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + k := &KinesisOutput{ + Format: "string", + } + + p := testutil.MockBatchPoints().Points()[0] + + valid_string := "test1,tag1=value1 value=1 1257894000000000000" + func_string, err := FormatMetric(k, p) + + if func_string != valid_string { + t.Error("Expected ", valid_string) + } + require.NoError(t, err) + + k = &KinesisOutput{ + Format: "custom", + } + + valid_custom := "test1,map[tag1:value1],test1,tag1=value1 value=1 1257894000000000000" + func_custom, err := FormatMetric(k, p) + + if func_custom != valid_custom { + t.Error("Expected ", valid_custom) + } + require.NoError(t, err) +}