add option to randomize Kinesis partition key (#2705)
This commit is contained in:
		
							parent
							
								
									9b874dff8d
								
							
						
					
					
						commit
						c66e2896c6
					
				|  | @ -74,6 +74,7 @@ be deprecated eventually. | ||||||
| - [#2038](https://github.com/influxdata/telegraf/issues/2038): Add papertrail support to webhooks | - [#2038](https://github.com/influxdata/telegraf/issues/2038): Add papertrail support to webhooks | ||||||
| - [#2253](https://github.com/influxdata/telegraf/pull/2253): Change jolokia plugin to use bulk requests. | - [#2253](https://github.com/influxdata/telegraf/pull/2253): Change jolokia plugin to use bulk requests. | ||||||
| - [#2575](https://github.com/influxdata/telegraf/issues/2575) Add diskio input for Darwin | - [#2575](https://github.com/influxdata/telegraf/issues/2575) Add diskio input for Darwin | ||||||
|  | - [#2705](https://github.com/influxdata/telegraf/pull/2705): Kinesis output: add use_random_partitionkey option | ||||||
| - [#2635](https://github.com/influxdata/telegraf/issues/2635): add tcp keep-alive to socket_listener & socket_writer | - [#2635](https://github.com/influxdata/telegraf/issues/2635): add tcp keep-alive to socket_listener & socket_writer | ||||||
| 
 | 
 | ||||||
| ### Bugfixes | ### Bugfixes | ||||||
|  |  | ||||||
							
								
								
									
										1
									
								
								Godeps
								
								
								
								
							
							
						
						
									
										1
									
								
								Godeps
								
								
								
								
							|  | @ -45,6 +45,7 @@ github.com/prometheus/common dd2f054febf4a6c00f2343686efb775948a8bff4 | ||||||
| github.com/prometheus/procfs 1878d9fbb537119d24b21ca07effd591627cd160 | github.com/prometheus/procfs 1878d9fbb537119d24b21ca07effd591627cd160 | ||||||
| github.com/rcrowley/go-metrics 1f30fe9094a513ce4c700b9a54458bbb0c96996c | github.com/rcrowley/go-metrics 1f30fe9094a513ce4c700b9a54458bbb0c96996c | ||||||
| github.com/samuel/go-zookeeper 1d7be4effb13d2d908342d349d71a284a7542693 | github.com/samuel/go-zookeeper 1d7be4effb13d2d908342d349d71a284a7542693 | ||||||
|  | github.com/satori/go.uuid 5bf94b69c6b68ee1b541973bb8e1144db23a194b | ||||||
| github.com/shirou/gopsutil 70693b6a3da51a8a686d31f1b346077bbc066062 | github.com/shirou/gopsutil 70693b6a3da51a8a686d31f1b346077bbc066062 | ||||||
| github.com/soniah/gosnmp 5ad50dc75ab389f8a1c9f8a67d3a1cd85f67ed15 | github.com/soniah/gosnmp 5ad50dc75ab389f8a1c9f8a67d3a1cd85f67ed15 | ||||||
| github.com/streadway/amqp 63795daa9a446c920826655f26ba31c81c860fd6 | github.com/streadway/amqp 63795daa9a446c920826655f26ba31c81c860fd6 | ||||||
|  |  | ||||||
|  | @ -50,6 +50,11 @@ This is used to group data within a stream. Currently this plugin only supports | ||||||
| Manually configuring different hosts, or groups of hosts with manually selected partitionkeys might be a workable | Manually configuring different hosts, or groups of hosts with manually selected partitionkeys might be a workable | ||||||
| solution to scale out. | solution to scale out. | ||||||
| 
 | 
 | ||||||
|  | ### use_random_partitionkey | ||||||
|  | 
 | ||||||
|  | When true a random UUID will be generated and used as the partitionkey when sending data to Kinesis. This allows data to evenly spread across multiple shards in the stream. Due to using a random paritionKey there can be no guarantee of ordering when consuming the data off the shards. | ||||||
|  | If true then the partitionkey option will be ignored. | ||||||
|  | 
 | ||||||
| ### format | ### format | ||||||
| 
 | 
 | ||||||
| The format configuration value has been designated to allow people to change the format of the Point as written to | The format configuration value has been designated to allow people to change the format of the Point as written to | ||||||
|  |  | ||||||
|  | @ -7,6 +7,7 @@ import ( | ||||||
| 
 | 
 | ||||||
| 	"github.com/aws/aws-sdk-go/aws" | 	"github.com/aws/aws-sdk-go/aws" | ||||||
| 	"github.com/aws/aws-sdk-go/service/kinesis" | 	"github.com/aws/aws-sdk-go/service/kinesis" | ||||||
|  | 	"github.com/satori/go.uuid" | ||||||
| 
 | 
 | ||||||
| 	"github.com/influxdata/telegraf" | 	"github.com/influxdata/telegraf" | ||||||
| 	internalaws "github.com/influxdata/telegraf/internal/config/aws" | 	internalaws "github.com/influxdata/telegraf/internal/config/aws" | ||||||
|  | @ -23,10 +24,11 @@ type KinesisOutput struct { | ||||||
| 	Filename  string `toml:"shared_credential_file"` | 	Filename  string `toml:"shared_credential_file"` | ||||||
| 	Token     string `toml:"token"` | 	Token     string `toml:"token"` | ||||||
| 
 | 
 | ||||||
| 	StreamName   string `toml:"streamname"` | 	StreamName         string `toml:"streamname"` | ||||||
| 	PartitionKey string `toml:"partitionkey"` | 	PartitionKey       string `toml:"partitionkey"` | ||||||
| 	Debug        bool   `toml:"debug"` | 	RandomPartitionKey bool   `toml:"use_random_partitionkey"` | ||||||
| 	svc          *kinesis.Kinesis | 	Debug              bool   `toml:"debug"` | ||||||
|  | 	svc                *kinesis.Kinesis | ||||||
| 
 | 
 | ||||||
| 	serializer serializers.Serializer | 	serializer serializers.Serializer | ||||||
| } | } | ||||||
|  | @ -54,6 +56,11 @@ var sampleConfig = ` | ||||||
|   streamname = "StreamName" |   streamname = "StreamName" | ||||||
|   ## PartitionKey as used for sharding data. |   ## PartitionKey as used for sharding data. | ||||||
|   partitionkey = "PartitionKey" |   partitionkey = "PartitionKey" | ||||||
|  |   ## If set the paritionKey will be a random UUID on every put. | ||||||
|  |   ## This allows for scaling across multiple shards in a stream. | ||||||
|  |   ## This will cause issues with ordering. | ||||||
|  |   use_random_partitionkey = false | ||||||
|  | 
 | ||||||
| 
 | 
 | ||||||
|   ## Data format to output. |   ## Data format to output. | ||||||
|   ## Each data format has it's own unique set of configuration options, read |   ## Each data format has it's own unique set of configuration options, read | ||||||
|  | @ -173,9 +180,15 @@ func (k *KinesisOutput) Write(metrics []telegraf.Metric) error { | ||||||
| 			return err | 			return err | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
|  | 		partitionKey := k.PartitionKey | ||||||
|  | 		if k.RandomPartitionKey { | ||||||
|  | 			u := uuid.NewV4() | ||||||
|  | 			partitionKey = u.String() | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
| 		d := kinesis.PutRecordsRequestEntry{ | 		d := kinesis.PutRecordsRequestEntry{ | ||||||
| 			Data:         values, | 			Data:         values, | ||||||
| 			PartitionKey: aws.String(k.PartitionKey), | 			PartitionKey: aws.String(partitionKey), | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		r = append(r, &d) | 		r = append(r, &d) | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue