diff --git a/outputs/kinesis/README.md b/outputs/kinesis/README.md index 8ffbf4445..aa37cff4b 100644 --- a/outputs/kinesis/README.md +++ b/outputs/kinesis/README.md @@ -40,4 +40,5 @@ kinesis throughput using a different partition key on different hosts or host gr ## todo -Check if the stream exists so that we have a graceful exit. \ No newline at end of file +* Check if the stream exists so that we have a graceful exit. +* Better client smarts to ensure that the plugin operates within the Amazon service limits. \ No newline at end of file diff --git a/outputs/kinesis/kinesis_output.go b/outputs/kinesis/kinesis_output.go index 61c51fc51..6d0544290 100644 --- a/outputs/kinesis/kinesis_output.go +++ b/outputs/kinesis/kinesis_output.go @@ -4,6 +4,8 @@ import ( "errors" "fmt" "log" + "sync/atomic" + "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" @@ -90,22 +92,8 @@ func formatmetric(k *KinesisOutput, point *client.Point) (string, error) { } } -func (k *KinesisOutput) Write(points []*client.Point) error { - if len(points) == 0 { - return nil - } - - r := []*kinesis.PutRecordsRequestEntry{} - - for _, p := range points { - metric, _ := formatmetric(k, p) - d := kinesis.PutRecordsRequestEntry{ - Data: []byte(metric), - PartitionKey: aws.String(k.PartitionKey), - } - r = append(r, &d) - } - +func writekinesis(k *KinesisOutput, r []*kinesis.PutRecordsRequestEntry) time.Duration { + start := time.Now() payload := &kinesis.PutRecordsInput{ Records: r, StreamName: aws.String(k.StreamName), @@ -117,12 +105,45 @@ func (k *KinesisOutput) Write(points []*client.Point) error { log.Printf("Unable to write to Kinesis : %+v \n", err.Error()) } log.Printf("%+v \n", resp) + } else { _, err := k.svc.PutRecords(payload) if err != nil { log.Printf("Unable to write to Kinesis : %+v \n", err.Error()) } } + return time.Since(start) +} + +func (k *KinesisOutput) Write(points []*client.Point) error { + var sz uint32 = 0 + + if len(points) == 0 { + return nil + } + + r := []*kinesis.PutRecordsRequestEntry{} + + for _, p := range points { + atomic.AddUint32(&sz, 1) + + metric, _ := formatmetric(k, p) + d := kinesis.PutRecordsRequestEntry{ + Data: []byte(metric), + PartitionKey: aws.String(k.PartitionKey), + } + r = append(r, &d) + + if sz == 500 { + // Max Messages Per PutRecordRequest is 500 + elapsed := writekinesis(k, r) + log.Printf("Wrote a %+v point batch to Kinesis in %+v.\n", sz, elapsed) + atomic.StoreUint32(&sz, 0) + r = nil + } + } + + writekinesis(k, r) return nil }