add option to randomize Kinesis partition key (#2705)

This commit is contained in:
Nevins 2017-04-26 13:54:24 -04:00 committed by Daniel Nelson
parent b95ade7ec4
commit 0514b3cfa7
4 changed files with 25 additions and 5 deletions

View File

@ -74,6 +74,7 @@ be deprecated eventually.
- [#2038](https://github.com/influxdata/telegraf/issues/2038): Add papertrail support to webhooks - [#2038](https://github.com/influxdata/telegraf/issues/2038): Add papertrail support to webhooks
- [#2253](https://github.com/influxdata/telegraf/pull/2253): Change jolokia plugin to use bulk requests. - [#2253](https://github.com/influxdata/telegraf/pull/2253): Change jolokia plugin to use bulk requests.
- [#2575](https://github.com/influxdata/telegraf/issues/2575) Add diskio input for Darwin - [#2575](https://github.com/influxdata/telegraf/issues/2575) Add diskio input for Darwin
- [#2705](https://github.com/influxdata/telegraf/pull/2705): Kinesis output: add use_random_partitionkey option
- [#2635](https://github.com/influxdata/telegraf/issues/2635): add tcp keep-alive to socket_listener & socket_writer - [#2635](https://github.com/influxdata/telegraf/issues/2635): add tcp keep-alive to socket_listener & socket_writer
### Bugfixes ### Bugfixes

1
Godeps
View File

@ -45,6 +45,7 @@ github.com/prometheus/common dd2f054febf4a6c00f2343686efb775948a8bff4
github.com/prometheus/procfs 1878d9fbb537119d24b21ca07effd591627cd160 github.com/prometheus/procfs 1878d9fbb537119d24b21ca07effd591627cd160
github.com/rcrowley/go-metrics 1f30fe9094a513ce4c700b9a54458bbb0c96996c github.com/rcrowley/go-metrics 1f30fe9094a513ce4c700b9a54458bbb0c96996c
github.com/samuel/go-zookeeper 1d7be4effb13d2d908342d349d71a284a7542693 github.com/samuel/go-zookeeper 1d7be4effb13d2d908342d349d71a284a7542693
github.com/satori/go.uuid 5bf94b69c6b68ee1b541973bb8e1144db23a194b
github.com/shirou/gopsutil 70693b6a3da51a8a686d31f1b346077bbc066062 github.com/shirou/gopsutil 70693b6a3da51a8a686d31f1b346077bbc066062
github.com/soniah/gosnmp 5ad50dc75ab389f8a1c9f8a67d3a1cd85f67ed15 github.com/soniah/gosnmp 5ad50dc75ab389f8a1c9f8a67d3a1cd85f67ed15
github.com/streadway/amqp 63795daa9a446c920826655f26ba31c81c860fd6 github.com/streadway/amqp 63795daa9a446c920826655f26ba31c81c860fd6

View File

@ -50,6 +50,11 @@ This is used to group data within a stream. Currently this plugin only supports
Manually configuring different hosts, or groups of hosts with manually selected partitionkeys might be a workable Manually configuring different hosts, or groups of hosts with manually selected partitionkeys might be a workable
solution to scale out. solution to scale out.
### use_random_partitionkey
When true a random UUID will be generated and used as the partitionkey when sending data to Kinesis. This allows data to evenly spread across multiple shards in the stream. Due to using a random paritionKey there can be no guarantee of ordering when consuming the data off the shards.
If true then the partitionkey option will be ignored.
### format ### format
The format configuration value has been designated to allow people to change the format of the Point as written to The format configuration value has been designated to allow people to change the format of the Point as written to

View File

@ -7,6 +7,7 @@ import (
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis" "github.com/aws/aws-sdk-go/service/kinesis"
"github.com/satori/go.uuid"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
internalaws "github.com/influxdata/telegraf/internal/config/aws" internalaws "github.com/influxdata/telegraf/internal/config/aws"
@ -23,10 +24,11 @@ type KinesisOutput struct {
Filename string `toml:"shared_credential_file"` Filename string `toml:"shared_credential_file"`
Token string `toml:"token"` Token string `toml:"token"`
StreamName string `toml:"streamname"` StreamName string `toml:"streamname"`
PartitionKey string `toml:"partitionkey"` PartitionKey string `toml:"partitionkey"`
Debug bool `toml:"debug"` RandomPartitionKey bool `toml:"use_random_partitionkey"`
svc *kinesis.Kinesis Debug bool `toml:"debug"`
svc *kinesis.Kinesis
serializer serializers.Serializer serializer serializers.Serializer
} }
@ -54,6 +56,11 @@ var sampleConfig = `
streamname = "StreamName" streamname = "StreamName"
## PartitionKey as used for sharding data. ## PartitionKey as used for sharding data.
partitionkey = "PartitionKey" partitionkey = "PartitionKey"
## If set the paritionKey will be a random UUID on every put.
## This allows for scaling across multiple shards in a stream.
## This will cause issues with ordering.
use_random_partitionkey = false
## Data format to output. ## Data format to output.
## Each data format has it's own unique set of configuration options, read ## Each data format has it's own unique set of configuration options, read
@ -173,9 +180,15 @@ func (k *KinesisOutput) Write(metrics []telegraf.Metric) error {
return err return err
} }
partitionKey := k.PartitionKey
if k.RandomPartitionKey {
u := uuid.NewV4()
partitionKey = u.String()
}
d := kinesis.PutRecordsRequestEntry{ d := kinesis.PutRecordsRequestEntry{
Data: values, Data: values,
PartitionKey: aws.String(k.PartitionKey), PartitionKey: aws.String(partitionKey),
} }
r = append(r, &d) r = append(r, &d)