From c66e2896c658cc2d3ccf9fdec3be8072e87162a2 Mon Sep 17 00:00:00 2001 From: Nevins Date: Wed, 26 Apr 2017 13:54:24 -0400 Subject: [PATCH] add option to randomize Kinesis partition key (#2705) --- CHANGELOG.md | 1 + Godeps | 1 + plugins/outputs/kinesis/README.md | 5 +++++ plugins/outputs/kinesis/kinesis.go | 23 ++++++++++++++++++----- 4 files changed, 25 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9fdfbc32d..b2bfd9a79 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -74,6 +74,7 @@ be deprecated eventually. - [#2038](https://github.com/influxdata/telegraf/issues/2038): Add papertrail support to webhooks - [#2253](https://github.com/influxdata/telegraf/pull/2253): Change jolokia plugin to use bulk requests. - [#2575](https://github.com/influxdata/telegraf/issues/2575) Add diskio input for Darwin +- [#2705](https://github.com/influxdata/telegraf/pull/2705): Kinesis output: add use_random_partitionkey option - [#2635](https://github.com/influxdata/telegraf/issues/2635): add tcp keep-alive to socket_listener & socket_writer ### Bugfixes diff --git a/Godeps b/Godeps index 9ffd7e1b8..aa9ace1ab 100644 --- a/Godeps +++ b/Godeps @@ -45,6 +45,7 @@ github.com/prometheus/common dd2f054febf4a6c00f2343686efb775948a8bff4 github.com/prometheus/procfs 1878d9fbb537119d24b21ca07effd591627cd160 github.com/rcrowley/go-metrics 1f30fe9094a513ce4c700b9a54458bbb0c96996c github.com/samuel/go-zookeeper 1d7be4effb13d2d908342d349d71a284a7542693 +github.com/satori/go.uuid 5bf94b69c6b68ee1b541973bb8e1144db23a194b github.com/shirou/gopsutil 70693b6a3da51a8a686d31f1b346077bbc066062 github.com/soniah/gosnmp 5ad50dc75ab389f8a1c9f8a67d3a1cd85f67ed15 github.com/streadway/amqp 63795daa9a446c920826655f26ba31c81c860fd6 diff --git a/plugins/outputs/kinesis/README.md b/plugins/outputs/kinesis/README.md index 115d14355..52b80208a 100644 --- a/plugins/outputs/kinesis/README.md +++ b/plugins/outputs/kinesis/README.md @@ -50,6 +50,11 @@ This is used to group data within a stream. 
Currently this plugin only supports Manually configuring different hosts, or groups of hosts with manually selected partitionkeys might be a workable solution to scale out. +### use_random_partitionkey + +When true, a random UUID will be generated and used as the partitionkey when sending data to Kinesis. This allows data to be spread evenly across multiple shards in the stream. Due to using a random partitionKey, there can be no guarantee of ordering when consuming the data off the shards. +If true, then the partitionkey option will be ignored. + ### format The format configuration value has been designated to allow people to change the format of the Point as written to diff --git a/plugins/outputs/kinesis/kinesis.go b/plugins/outputs/kinesis/kinesis.go index 8cbdea682..d69ae3954 100644 --- a/plugins/outputs/kinesis/kinesis.go +++ b/plugins/outputs/kinesis/kinesis.go @@ -7,6 +7,7 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/kinesis" + "github.com/satori/go.uuid" "github.com/influxdata/telegraf" internalaws "github.com/influxdata/telegraf/internal/config/aws" @@ -23,10 +24,11 @@ type KinesisOutput struct { Filename string `toml:"shared_credential_file"` Token string `toml:"token"` - StreamName string `toml:"streamname"` - PartitionKey string `toml:"partitionkey"` - Debug bool `toml:"debug"` - svc *kinesis.Kinesis + StreamName string `toml:"streamname"` + PartitionKey string `toml:"partitionkey"` + RandomPartitionKey bool `toml:"use_random_partitionkey"` + Debug bool `toml:"debug"` + svc *kinesis.Kinesis serializer serializers.Serializer } @@ -54,6 +56,11 @@ var sampleConfig = ` streamname = "StreamName" ## PartitionKey as used for sharding data. partitionkey = "PartitionKey" + ## If set the paritionKey will be a random UUID on every put. + ## This allows for scaling across multiple shards in a stream. + ## This will cause issues with ordering. + use_random_partitionkey = false + ## Data format to output. 
## Each data format has it's own unique set of configuration options, read @@ -173,9 +180,15 @@ func (k *KinesisOutput) Write(metrics []telegraf.Metric) error { return err } + partitionKey := k.PartitionKey + if k.RandomPartitionKey { + u := uuid.NewV4() + partitionKey = u.String() + } + d := kinesis.PutRecordsRequestEntry{ Data: values, - PartitionKey: aws.String(k.PartitionKey), + PartitionKey: aws.String(partitionKey), } r = append(r, &d)