From 158d82fdb95ca178dc4ac1efeb9d7d60fb2396fb Mon Sep 17 00:00:00 2001 From: James Lamb Date: Thu, 10 Dec 2015 13:07:55 +1100 Subject: [PATCH 1/8] add amazon kinesis as an output plugin --- outputs/all/all.go | 1 + outputs/kinesis/README.md | 43 ++++++++ outputs/kinesis/kinesis_output.go | 134 +++++++++++++++++++++++++ outputs/kinesis/kinesis_output_test.go | 30 ++++++ 4 files changed, 208 insertions(+) create mode 100644 outputs/kinesis/README.md create mode 100644 outputs/kinesis/kinesis_output.go create mode 100644 outputs/kinesis/kinesis_output_test.go diff --git a/outputs/all/all.go b/outputs/all/all.go index 2c00f43f9..08ebf2549 100644 --- a/outputs/all/all.go +++ b/outputs/all/all.go @@ -6,6 +6,7 @@ import ( _ "github.com/influxdb/telegraf/outputs/datadog" _ "github.com/influxdb/telegraf/outputs/influxdb" _ "github.com/influxdb/telegraf/outputs/kafka" + _ "github.com/influxdb/telegraf/outputs/kinesis" _ "github.com/influxdb/telegraf/outputs/librato" _ "github.com/influxdb/telegraf/outputs/mqtt" _ "github.com/influxdb/telegraf/outputs/nsq" diff --git a/outputs/kinesis/README.md b/outputs/kinesis/README.md new file mode 100644 index 000000000..8ffbf4445 --- /dev/null +++ b/outputs/kinesis/README.md @@ -0,0 +1,43 @@ +## Amazon Kinesis Output for Telegraf + +This is an experimental plugin that is still in the early stages of development. It will batch up all of the Points +in one Put request to Kinesis. This should save the number of API requests by a considerable level. + +## About Kinesis + +This is not the place to document all of the various Kinesis terms however it +maybe useful for users to review Amazons official docuementation which is available +[here](http://docs.aws.amazon.com/kinesis/latest/dev/key-concepts.html). + +## Config + +For this output plugin to function correctly the following variables must be configured. + +* region +* streamname +* partitionkey + +### region + +The region is the Amazon region that you wish to connect to. Examples include but are not limited to +* us-west-1 +* us-west-2 +* us-east-1 +* ap-southeast-1 +* ap-southeast-2 + +### streamname + +The streamname is used by the plugin to ensure that data is sent to the correct Kinesis stream. It is important to +note that the stream *MUST* be pre-configured for this plugin to function correctly. + +### partitionkey + +This is used to group data within a stream. Currently this plugin only supports a single partitionkey which means +that data will be entirely sent through a single Shard and Partition from a single host. If you have to scale out the +kinesis throughput using a different partition key on different hosts or host groups might be a workable solution. + + +## todo + +Check if the stream exists so that we have a graceful exit. \ No newline at end of file diff --git a/outputs/kinesis/kinesis_output.go b/outputs/kinesis/kinesis_output.go new file mode 100644 index 000000000..ca442bc97 --- /dev/null +++ b/outputs/kinesis/kinesis_output.go @@ -0,0 +1,134 @@ +package kinesis_output + +import ( + "errors" + "log" + "fmt" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/credentials/ec2rolecreds" + "github.com/aws/aws-sdk-go/aws/ec2metadata" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/kinesis" + + "github.com/influxdb/influxdb/client/v2" + "github.com/influxdb/telegraf/outputs" +) + +type KinesisOutput struct { + Region string `toml:"region"` + StreamName string `toml:"streamname"` + PartitionKey string `toml:"partitionkey"` + Format string `toml:"format"` + Debug bool `toml:"debug"` + svc *kinesis.Kinesis +} + +var sampleConfig = ` + # Amazon REGION of kinesis endpoint. + # Most AWS services have a region specific endpoint this will be used by + # telegraf to output data. + # Authentication is provided by an IAMS role, SharedCredentials or Environment + # Variables. + region = "ap-southeast-2" + # Kinesis StreamName must exist prior to starting telegraf. + streamname = "StreamName" + # PartitionKey as used for sharding data. + partitionkey = "PartitionKey" + # format of the Data payload in the kinesis PutRecord, supported + # String and Custom. + format = "string" + # debug will show upstream aws messages. + debug = false +` + +func (k *KinesisOutput) SampleConfig() string { + return sampleConfig +} + +func (k *KinesisOutput) Description() string { + return "Configuration for the AWS Kinesis output." +} + +func (k *KinesisOutput) Connect() error { + // We attempt first to create a session to Kinesis using an IAMS role, if that fails it will fall through to using + // environment variables, and then Shared Credentials. + if k.Debug { + log.Printf("Establishing a connection to Kinesis in %+v", k.Region) + } + Config := &aws.Config{ + Region: aws.String(k.Region), + Credentials: credentials.NewChainCredentials( + []credentials.Provider{ + &ec2rolecreds.EC2RoleProvider{Client: ec2metadata.New(session.New())}, + &credentials.EnvProvider{}, + &credentials.SharedCredentialsProvider{}, + }), + } + svc := kinesis.New(session.New(Config)) + + k.svc = svc + return nil +} + +func (k *KinesisOutput) Close() error { + return errors.New("Error") +} + +func formatmetric(k *KinesisOutput, point *client.Point) (string, error) { + if k.Format == "string" { + return point.String(), nil + } else { + m := fmt.Sprintf("%+v,%+v,%+v %+v", + point.Name(), + point.Tags(), + point.String(), + point.Time(), + ) + return m, nil + } +} + +func (k *KinesisOutput) Write(points []*client.Point) error { + if len(points) == 0 { + return nil + } + + r := []*kinesis.PutRecordsRequestEntry{} + + for _, p := range points { + metric, _ := formatmetric(k, p) + d := kinesis.PutRecordsRequestEntry{ + Data: []byte(metric), + PartitionKey: aws.String(k.PartitionKey), + } + r = append(r, &d) + } + + payload := &kinesis.PutRecordsInput{ + Records: r, + StreamName: aws.String(k.StreamName), + } + + if k.Debug { + resp, err := k.svc.PutRecords(payload) + if err != nil { + log.Printf("Unable to write to Kinesis : %+v \n", err.Error()) + } + log.Printf("%+v \n", resp) + } else { + _, err := k.svc.PutRecords(payload) + if err != nil { + log.Printf("Unable to write to Kinesis : %+v \n", err.Error()) + } + } + + return nil +} + +func init() { + outputs.Add("kinesis_output", func() outputs.Output { + return &KinesisOutput{} + }) +} diff --git a/outputs/kinesis/kinesis_output_test.go b/outputs/kinesis/kinesis_output_test.go new file mode 100644 index 000000000..1e6dd3727 --- /dev/null +++ b/outputs/kinesis/kinesis_output_test.go @@ -0,0 +1,30 @@ +package kinesis_output + +import ( + "testing" + + "github.com/influxdb/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +func TestConnectAndWrite(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + // Verify that we can connect kinesis endpoint. This test allows for a chain of credential + // so that various authentication methods can pass depending on the system that executes. + Config := &aws.Config{ + Region: aws.String(k.Region), + Credentials: credentials.NewChainCredentials( + []credentials.Provider{ + &ec2rolecreds.EC2RoleProvider{Client: ec2metadata.New(session.New())}, + &credentials.EnvProvider{}, + &credentials.SharedCredentialsProvider{}, + }), + } + err := kinesis.New(session.New(Config)) + + + require.NoError(t, err) +} From e559183a52486d34f9b81c6eca88d6b589264ebf Mon Sep 17 00:00:00 2001 From: James Lamb Date: Thu, 10 Dec 2015 13:17:30 +1100 Subject: [PATCH 2/8] gofmt kinesis output plugin --- outputs/kinesis/kinesis_output.go | 2 +- outputs/kinesis/kinesis_output_test.go | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/outputs/kinesis/kinesis_output.go b/outputs/kinesis/kinesis_output.go index ca442bc97..61c51fc51 100644 --- a/outputs/kinesis/kinesis_output.go +++ b/outputs/kinesis/kinesis_output.go @@ -2,8 +2,8 @@ package kinesis_output import ( "errors" - "log" "fmt" + "log" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" diff --git a/outputs/kinesis/kinesis_output_test.go b/outputs/kinesis/kinesis_output_test.go index 1e6dd3727..af8125941 100644 --- a/outputs/kinesis/kinesis_output_test.go +++ b/outputs/kinesis/kinesis_output_test.go @@ -25,6 +25,5 @@ func TestConnectAndWrite(t *testing.T) { } err := kinesis.New(session.New(Config)) - require.NoError(t, err) } From 6e3ad123b23e6a363b1f095c9efc3b4a398749f7 Mon Sep 17 00:00:00 2001 From: James Lamb Date: Thu, 10 Dec 2015 18:14:41 +1100 Subject: [PATCH 3/8] fix bug where many points would result in a kinesis alert, batching at 500 per PUT --- outputs/kinesis/README.md | 3 +- outputs/kinesis/kinesis_output.go | 53 +++++++++++++++++++++---------- 2 files changed, 39 insertions(+), 17 deletions(-) diff --git a/outputs/kinesis/README.md b/outputs/kinesis/README.md index 8ffbf4445..aa37cff4b 100644 --- a/outputs/kinesis/README.md +++ b/outputs/kinesis/README.md @@ -40,4 +40,5 @@ kinesis throughput using a different partition key on different hosts or host gr ## todo -Check if the stream exists so that we have a graceful exit. \ No newline at end of file +* Check if the stream exists so that we have a graceful exit. +* Better client smarts to ensure that the plugin operates within the Amazon service limits. \ No newline at end of file diff --git a/outputs/kinesis/kinesis_output.go b/outputs/kinesis/kinesis_output.go index 61c51fc51..6d0544290 100644 --- a/outputs/kinesis/kinesis_output.go +++ b/outputs/kinesis/kinesis_output.go @@ -4,6 +4,8 @@ import ( "errors" "fmt" "log" + "sync/atomic" + "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" @@ -90,22 +92,8 @@ func formatmetric(k *KinesisOutput, point *client.Point) (string, error) { } } -func (k *KinesisOutput) Write(points []*client.Point) error { - if len(points) == 0 { - return nil - } - - r := []*kinesis.PutRecordsRequestEntry{} - - for _, p := range points { - metric, _ := formatmetric(k, p) - d := kinesis.PutRecordsRequestEntry{ - Data: []byte(metric), - PartitionKey: aws.String(k.PartitionKey), - } - r = append(r, &d) - } - +func writekinesis(k *KinesisOutput, r []*kinesis.PutRecordsRequestEntry) time.Duration { + start := time.Now() payload := &kinesis.PutRecordsInput{ Records: r, StreamName: aws.String(k.StreamName), @@ -117,12 +105,45 @@ func (k *KinesisOutput) Write(points []*client.Point) error { log.Printf("Unable to write to Kinesis : %+v \n", err.Error()) } log.Printf("%+v \n", resp) + } else { _, err := k.svc.PutRecords(payload) if err != nil { log.Printf("Unable to write to Kinesis : %+v \n", err.Error()) } } + return time.Since(start) +} + +func (k *KinesisOutput) Write(points []*client.Point) error { + var sz uint32 = 0 + + if len(points) == 0 { + return nil + } + + r := []*kinesis.PutRecordsRequestEntry{} + + for _, p := range points { + atomic.AddUint32(&sz, 1) + + metric, _ := formatmetric(k, p) + d := kinesis.PutRecordsRequestEntry{ + Data: []byte(metric), + PartitionKey: aws.String(k.PartitionKey), + } + r = append(r, &d) + + if sz == 500 { + // Max Messages Per PutRecordRequest is 500 + elapsed := writekinesis(k, r) + log.Printf("Wrote a %+v point batch to Kinesis in %+v.\n", sz, elapsed) + atomic.StoreUint32(&sz, 0) + r = nil + } + } + + writekinesis(k, r) return nil } From 2cb66ad02cab7f077a13dd9665389c8ae2312b93 Mon Sep 17 00:00:00 2001 From: James Lamb Date: Thu, 10 Dec 2015 20:54:30 +1100 Subject: [PATCH 4/8] update tests to work properly --- outputs/kinesis/README.md | 1 - outputs/kinesis/kinesis_output_test.go | 18 +++++++++++++++--- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/outputs/kinesis/README.md b/outputs/kinesis/README.md index aa37cff4b..3ed4ac377 100644 --- a/outputs/kinesis/README.md +++ b/outputs/kinesis/README.md @@ -41,4 +41,3 @@ kinesis throughput using a different partition key on different hosts or host gr ## todo * Check if the stream exists so that we have a graceful exit. -* Better client smarts to ensure that the plugin operates within the Amazon service limits. \ No newline at end of file diff --git a/outputs/kinesis/kinesis_output_test.go b/outputs/kinesis/kinesis_output_test.go index af8125941..778865c36 100644 --- a/outputs/kinesis/kinesis_output_test.go +++ b/outputs/kinesis/kinesis_output_test.go @@ -1,9 +1,15 @@ package kinesis_output import ( + "fmt" "testing" - "github.com/influxdb/telegraf/testutil" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/credentials/ec2rolecreds" + "github.com/aws/aws-sdk-go/aws/ec2metadata" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/kinesis" "github.com/stretchr/testify/require" ) @@ -15,7 +21,7 @@ func TestConnectAndWrite(t *testing.T) { // Verify that we can connect kinesis endpoint. This test allows for a chain of credential // so that various authentication methods can pass depending on the system that executes. Config := &aws.Config{ - Region: aws.String(k.Region), + Region: aws.String("us-west-1"), Credentials: credentials.NewChainCredentials( []credentials.Provider{ &ec2rolecreds.EC2RoleProvider{Client: ec2metadata.New(session.New())}, @@ -23,7 +29,13 @@ func TestConnectAndWrite(t *testing.T) { &credentials.SharedCredentialsProvider{}, }), } - err := kinesis.New(session.New(Config)) + svc := kinesis.New(session.New(Config)) + + KinesisParams := &kinesis.ListStreamsInput{ + Limit: aws.Int64(1)} + resp, err := svc.ListStreams(KinesisParams) + + fmt.Println(resp) require.NoError(t, err) } From d2737fc5d00cab636b1c994cda6c1a9cade3fa07 Mon Sep 17 00:00:00 2001 From: James Lamb Date: Fri, 11 Dec 2015 09:04:28 +1100 Subject: [PATCH 5/8] adds functionality that will exit if the straem does not exist --- outputs/kinesis/kinesis_output.go | 44 ++++++++++++++++++++++++------- 1 file changed, 35 insertions(+), 9 deletions(-) diff --git a/outputs/kinesis/kinesis_output.go b/outputs/kinesis/kinesis_output.go index 6d0544290..739062f23 100644 --- a/outputs/kinesis/kinesis_output.go +++ b/outputs/kinesis/kinesis_output.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "log" + "os" "sync/atomic" "time" @@ -29,10 +30,6 @@ type KinesisOutput struct { var sampleConfig = ` # Amazon REGION of kinesis endpoint. - # Most AWS services have a region specific endpoint this will be used by - # telegraf to output data. - # Authentication is provided by an IAMS role, SharedCredentials or Environment - # Variables. region = "ap-southeast-2" # Kinesis StreamName must exist prior to starting telegraf. streamname = "StreamName" @@ -53,11 +50,21 @@ func (k *KinesisOutput) Description() string { return "Configuration for the AWS Kinesis output." } +func checkstream(l []*string, s string) bool { + // Check if the StreamName exists in the slice returned from the ListStreams API request. + for _, stream := range l { + if *stream == s { + return true + } + } + return false +} + func (k *KinesisOutput) Connect() error { // We attempt first to create a session to Kinesis using an IAMS role, if that fails it will fall through to using // environment variables, and then Shared Credentials. if k.Debug { - log.Printf("Establishing a connection to Kinesis in %+v", k.Region) + log.Printf("kinesis_output: Establishing a connection to Kinesis in %+v", k.Region) } Config := &aws.Config{ Region: aws.String(k.Region), @@ -70,8 +77,27 @@ func (k *KinesisOutput) Connect() error { } svc := kinesis.New(session.New(Config)) - k.svc = svc - return nil + KinesisParams := &kinesis.ListStreamsInput{ + Limit: aws.Int64(100), + } + + resp, err := svc.ListStreams(KinesisParams) + + if err != nil { + log.Printf("kinesis_output: Error in ListSteams API call : %+v \n", err) + } + + if checkstream(resp.StreamNames, k.StreamName) { + if k.Debug { + log.Printf("kinesis_output: Stream Exists") + } + k.svc = svc + return nil + } else { + log.Printf("kinesis_output : You have configured a StreamName %+v which does not exist. exiting.", k.StreamName) + os.Exit(1) + } + return err } func (k *KinesisOutput) Close() error { @@ -102,14 +128,14 @@ func writekinesis(k *KinesisOutput, r []*kinesis.PutRecordsRequestEntry) time.Du if k.Debug { resp, err := k.svc.PutRecords(payload) if err != nil { - log.Printf("Unable to write to Kinesis : %+v \n", err.Error()) + log.Printf("kinesis_output: Unable to write to Kinesis : %+v \n", err.Error()) } log.Printf("%+v \n", resp) } else { _, err := k.svc.PutRecords(payload) if err != nil { - log.Printf("Unable to write to Kinesis : %+v \n", err.Error()) + log.Printf("kinesis_output: Unable to write to Kinesis : %+v \n", err.Error()) } } return time.Since(start) From 6572e033eb6732cd8c466357e163014f883af2ba Mon Sep 17 00:00:00 2001 From: James Lamb Date: Fri, 11 Dec 2015 09:05:42 +1100 Subject: [PATCH 6/8] adds functionality that will exit if the kinesis stream does not exist --- .gitignore | 1 + outputs/kinesis/README.md | 25 +++++++++++++++---------- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/.gitignore b/.gitignore index d69f9330b..59b50782c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ tivan .vagrant telegraf +.idea diff --git a/outputs/kinesis/README.md b/outputs/kinesis/README.md index 3ed4ac377..6baad21f1 100644 --- a/outputs/kinesis/README.md +++ b/outputs/kinesis/README.md @@ -6,9 +6,18 @@ in one Put request to Kinesis. This should save the number of API requests by a ## About Kinesis This is not the place to document all of the various Kinesis terms however it -maybe useful for users to review Amazons official docuementation which is available +maybe useful for users to review Amazons official documentation which is available [here](http://docs.aws.amazon.com/kinesis/latest/dev/key-concepts.html). +## Amazon Authentication + +This plugin uses a credential chain for Authentication with the Kinesis API endpoint. In the following order the plugin +will attempt to authenticate. +1. [IAMS Role](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html) +2. [Environment Variables](https://github.com/aws/aws-sdk-go/wiki/configuring-sdk) +3. [Shared Credentials](https://github.com/aws/aws-sdk-go/wiki/configuring-sdk) + + ## Config For this output plugin to function correctly the following variables must be configured. @@ -29,15 +38,11 @@ The region is the Amazon region that you wish to connect to. Examples include bu ### streamname The streamname is used by the plugin to ensure that data is sent to the correct Kinesis stream. It is important to -note that the stream *MUST* be pre-configured for this plugin to function correctly. +note that the stream *MUST* be pre-configured for this plugin to function correctly. If the stream does not exist the +plugin will result in telegraf exiting with an exit code of 1. ### partitionkey -This is used to group data within a stream. Currently this plugin only supports a single partitionkey which means -that data will be entirely sent through a single Shard and Partition from a single host. If you have to scale out the -kinesis throughput using a different partition key on different hosts or host groups might be a workable solution. - - -## todo - -* Check if the stream exists so that we have a graceful exit. +This is used to group data within a stream. Currently this plugin only supports a single partitionkey. +Manually configuring different hosts, or groups of hosts with manually selected partitionkeys might be a workable +solution to scale out. \ No newline at end of file From d5d785654bcc0b8a6728bd4e77c8bbd103d065b3 Mon Sep 17 00:00:00 2001 From: James Lamb Date: Fri, 11 Dec 2015 11:45:35 +1100 Subject: [PATCH 7/8] added unit tests for FormatMetric --- outputs/kinesis/kinesis_output.go | 10 +++--- outputs/kinesis/kinesis_output_test.go | 47 +++++++++++++++++++++++--- 2 files changed, 46 insertions(+), 11 deletions(-) diff --git a/outputs/kinesis/kinesis_output.go b/outputs/kinesis/kinesis_output.go index 739062f23..18dceafc2 100644 --- a/outputs/kinesis/kinesis_output.go +++ b/outputs/kinesis/kinesis_output.go @@ -104,16 +104,14 @@ func (k *KinesisOutput) Close() error { return errors.New("Error") } -func formatmetric(k *KinesisOutput, point *client.Point) (string, error) { +func FormatMetric(k *KinesisOutput, point *client.Point) (string, error) { if k.Format == "string" { return point.String(), nil } else { - m := fmt.Sprintf("%+v,%+v,%+v %+v", + m := fmt.Sprintf("%+v,%+v,%+v", point.Name(), point.Tags(), - point.String(), - point.Time(), - ) + point.String()) return m, nil } } @@ -153,7 +151,7 @@ func (k *KinesisOutput) Write(points []*client.Point) error { for _, p := range points { atomic.AddUint32(&sz, 1) - metric, _ := formatmetric(k, p) + metric, _ := FormatMetric(k, p) d := kinesis.PutRecordsRequestEntry{ Data: []byte(metric), PartitionKey: aws.String(k.PartitionKey), diff --git a/outputs/kinesis/kinesis_output_test.go b/outputs/kinesis/kinesis_output_test.go index 778865c36..00abfb235 100644 --- a/outputs/kinesis/kinesis_output_test.go +++ b/outputs/kinesis/kinesis_output_test.go @@ -1,7 +1,6 @@ package kinesis_output import ( - "fmt" "testing" "github.com/aws/aws-sdk-go/aws" @@ -10,6 +9,7 @@ import ( "github.com/aws/aws-sdk-go/aws/ec2metadata" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/kinesis" + "github.com/influxdb/telegraf/testutil" "github.com/stretchr/testify/require" ) @@ -18,10 +18,14 @@ func TestConnectAndWrite(t *testing.T) { t.Skip("Skipping integration test in short mode") } + k := &KinesisOutput{ + Region: "us-west-2", + } + // Verify that we can connect kinesis endpoint. This test allows for a chain of credential // so that various authentication methods can pass depending on the system that executes. Config := &aws.Config{ - Region: aws.String("us-west-1"), + Region: aws.String(k.Region), Credentials: credentials.NewChainCredentials( []credentials.Provider{ &ec2rolecreds.EC2RoleProvider{Client: ec2metadata.New(session.New())}, @@ -33,9 +37,42 @@ func TestConnectAndWrite(t *testing.T) { KinesisParams := &kinesis.ListStreamsInput{ Limit: aws.Int64(1)} - resp, err := svc.ListStreams(KinesisParams) - - fmt.Println(resp) + _, err := svc.ListStreams(KinesisParams) + if err != nil { + t.Error("Unable to connect to Kinesis") + } + require.NoError(t, err) +} + +func TestFormatMetric(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + k := &KinesisOutput{ + Format: "string", + } + + p := testutil.MockBatchPoints().Points()[0] + + valid_string := "test1,tag1=value1 value=1 1257894000000000000" + func_string, err := FormatMetric(k, p) + + if func_string != valid_string { + t.Error("Expected ", valid_string) + } + require.NoError(t, err) + + k = &KinesisOutput{ + Format: "custom", + } + + valid_custom := "test1,map[tag1:value1],test1,tag1=value1 value=1 1257894000000000000" + func_custom, err := FormatMetric(k, p) + + if func_custom != valid_custom { + t.Error("Expected ", valid_custom) + } require.NoError(t, err) } From c23a261dbf4fa0b910bd916373069d7c2f3436d8 Mon Sep 17 00:00:00 2001 From: James Lamb Date: Fri, 11 Dec 2015 14:01:26 +1100 Subject: [PATCH 8/8] rename kinesis_output to kinesis --- outputs/kinesis/README.md | 15 +++- .../kinesis/{kinesis_output.go => kinesis.go} | 16 ++-- outputs/kinesis/kinesis_output_test.go | 78 ------------------- outputs/kinesis/kinesis_test.go | 40 ++++++++++ 4 files changed, 62 insertions(+), 87 deletions(-) rename outputs/kinesis/{kinesis_output.go => kinesis.go} (87%) delete mode 100644 outputs/kinesis/kinesis_output_test.go create mode 100644 outputs/kinesis/kinesis_test.go diff --git a/outputs/kinesis/README.md b/outputs/kinesis/README.md index 6baad21f1..3b75117ac 100644 --- a/outputs/kinesis/README.md +++ b/outputs/kinesis/README.md @@ -45,4 +45,17 @@ plugin will result in telegraf exiting with an exit code of 1. This is used to group data within a stream. Currently this plugin only supports a single partitionkey. Manually configuring different hosts, or groups of hosts with manually selected partitionkeys might be a workable -solution to scale out. \ No newline at end of file +solution to scale out. + +### format + +The format configuration value has been designated to allow people to change the format of the Point as written to +Kinesis. Right now there are two supported formats string and custom. + +#### string + +String is defined using the default Point.String() value and translated to []byte for the Kinesis stream. + +#### custom + +Custom is a string defined by a number of values in the FormatMetric() function. \ No newline at end of file diff --git a/outputs/kinesis/kinesis_output.go b/outputs/kinesis/kinesis.go similarity index 87% rename from outputs/kinesis/kinesis_output.go rename to outputs/kinesis/kinesis.go index 18dceafc2..144131707 100644 --- a/outputs/kinesis/kinesis_output.go +++ b/outputs/kinesis/kinesis.go @@ -1,4 +1,4 @@ -package kinesis_output +package kinesis import ( "errors" @@ -64,7 +64,7 @@ func (k *KinesisOutput) Connect() error { // We attempt first to create a session to Kinesis using an IAMS role, if that fails it will fall through to using // environment variables, and then Shared Credentials. if k.Debug { - log.Printf("kinesis_output: Establishing a connection to Kinesis in %+v", k.Region) + log.Printf("kinesis: Establishing a connection to Kinesis in %+v", k.Region) } Config := &aws.Config{ Region: aws.String(k.Region), @@ -84,17 +84,17 @@ func (k *KinesisOutput) Connect() error { resp, err := svc.ListStreams(KinesisParams) if err != nil { - log.Printf("kinesis_output: Error in ListSteams API call : %+v \n", err) + log.Printf("kinesis: Error in ListSteams API call : %+v \n", err) } if checkstream(resp.StreamNames, k.StreamName) { if k.Debug { - log.Printf("kinesis_output: Stream Exists") + log.Printf("kinesis: Stream Exists") } k.svc = svc return nil } else { - log.Printf("kinesis_output : You have configured a StreamName %+v which does not exist. exiting.", k.StreamName) + log.Printf("kinesis : You have configured a StreamName %+v which does not exist. exiting.", k.StreamName) os.Exit(1) } return err @@ -126,14 +126,14 @@ func writekinesis(k *KinesisOutput, r []*kinesis.PutRecordsRequestEntry) time.Du if k.Debug { resp, err := k.svc.PutRecords(payload) if err != nil { - log.Printf("kinesis_output: Unable to write to Kinesis : %+v \n", err.Error()) + log.Printf("kinesis: Unable to write to Kinesis : %+v \n", err.Error()) } log.Printf("%+v \n", resp) } else { _, err := k.svc.PutRecords(payload) if err != nil { - log.Printf("kinesis_output: Unable to write to Kinesis : %+v \n", err.Error()) + log.Printf("kinesis: Unable to write to Kinesis : %+v \n", err.Error()) } } return time.Since(start) @@ -173,7 +173,7 @@ func (k *KinesisOutput) Write(points []*client.Point) error { } func init() { - outputs.Add("kinesis_output", func() outputs.Output { + outputs.Add("kinesis", func() outputs.Output { return &KinesisOutput{} }) } diff --git a/outputs/kinesis/kinesis_output_test.go b/outputs/kinesis/kinesis_output_test.go deleted file mode 100644 index 00abfb235..000000000 --- a/outputs/kinesis/kinesis_output_test.go +++ /dev/null @@ -1,78 +0,0 @@ -package kinesis_output - -import ( - "testing" - - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/credentials" - "github.com/aws/aws-sdk-go/aws/credentials/ec2rolecreds" - "github.com/aws/aws-sdk-go/aws/ec2metadata" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/kinesis" - "github.com/influxdb/telegraf/testutil" - "github.com/stretchr/testify/require" -) - -func TestConnectAndWrite(t *testing.T) { - if testing.Short() { - t.Skip("Skipping integration test in short mode") - } - - k := &KinesisOutput{ - Region: "us-west-2", - } - - // Verify that we can connect kinesis endpoint. This test allows for a chain of credential - // so that various authentication methods can pass depending on the system that executes. - Config := &aws.Config{ - Region: aws.String(k.Region), - Credentials: credentials.NewChainCredentials( - []credentials.Provider{ - &ec2rolecreds.EC2RoleProvider{Client: ec2metadata.New(session.New())}, - &credentials.EnvProvider{}, - &credentials.SharedCredentialsProvider{}, - }), - } - svc := kinesis.New(session.New(Config)) - - KinesisParams := &kinesis.ListStreamsInput{ - Limit: aws.Int64(1)} - _, err := svc.ListStreams(KinesisParams) - - if err != nil { - t.Error("Unable to connect to Kinesis") - } - require.NoError(t, err) -} - -func TestFormatMetric(t *testing.T) { - if testing.Short() { - t.Skip("Skipping integration test in short mode") - } - - k := &KinesisOutput{ - Format: "string", - } - - p := testutil.MockBatchPoints().Points()[0] - - valid_string := "test1,tag1=value1 value=1 1257894000000000000" - func_string, err := FormatMetric(k, p) - - if func_string != valid_string { - t.Error("Expected ", valid_string) - } - require.NoError(t, err) - - k = &KinesisOutput{ - Format: "custom", - } - - valid_custom := "test1,map[tag1:value1],test1,tag1=value1 value=1 1257894000000000000" - func_custom, err := FormatMetric(k, p) - - if func_custom != valid_custom { - t.Error("Expected ", valid_custom) - } - require.NoError(t, err) -} diff --git a/outputs/kinesis/kinesis_test.go b/outputs/kinesis/kinesis_test.go new file mode 100644 index 000000000..76bf242d7 --- /dev/null +++ b/outputs/kinesis/kinesis_test.go @@ -0,0 +1,40 @@ +package kinesis + +import ( + "testing" + "github.com/influxdb/telegraf/testutil" + "github.com/stretchr/testify/require" +) + + +func TestFormatMetric(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + k := &KinesisOutput{ + Format: "string", + } + + p := testutil.MockBatchPoints().Points()[0] + + valid_string := "test1,tag1=value1 value=1 1257894000000000000" + func_string, err := FormatMetric(k, p) + + if func_string != valid_string { + t.Error("Expected ", valid_string) + } + require.NoError(t, err) + + k = &KinesisOutput{ + Format: "custom", + } + + valid_custom := "test1,map[tag1:value1],test1,tag1=value1 value=1 1257894000000000000" + func_custom, err := FormatMetric(k, p) + + if func_custom != valid_custom { + t.Error("Expected ", valid_custom) + } + require.NoError(t, err) +}