From 95fe0e43f5514bbc1f485cd6e746eb8f84fe0b92 Mon Sep 17 00:00:00 2001 From: Nevins Date: Mon, 28 Aug 2017 19:24:38 -0400 Subject: [PATCH] Add support for sharding based on metric name (#3170) --- plugins/outputs/kinesis/README.md | 28 ++++++- plugins/outputs/kinesis/kinesis.go | 103 ++++++++++++++++++------ plugins/outputs/kinesis/kinesis_test.go | 77 ++++++++++++++++++ 3 files changed, 181 insertions(+), 27 deletions(-) create mode 100644 plugins/outputs/kinesis/kinesis_test.go diff --git a/plugins/outputs/kinesis/README.md b/plugins/outputs/kinesis/README.md index 52b80208a..809bb7790 100644 --- a/plugins/outputs/kinesis/README.md +++ b/plugins/outputs/kinesis/README.md @@ -27,7 +27,6 @@ For this output plugin to function correctly the following variables must be con * region * streamname -* partitionkey ### region @@ -44,17 +43,40 @@ The streamname is used by the plugin to ensure that data is sent to the correct note that the stream *MUST* be pre-configured for this plugin to function correctly. If the stream does not exist the plugin will result in telegraf exiting with an exit code of 1. -### partitionkey +### partitionkey [DEPRECATED] This is used to group data within a stream. Currently this plugin only supports a single partitionkey. Manually configuring different hosts, or groups of hosts with manually selected partitionkeys might be a workable solution to scale out. -### use_random_partitionkey +### use_random_partitionkey [DEPRECATED] When true a random UUID will be generated and used as the partitionkey when sending data to Kinesis. This allows data to evenly spread across multiple shards in the stream. Due to using a random paritionKey there can be no guarantee of ordering when consuming the data off the shards. If true then the partitionkey option will be ignored. +### partition + +This is used to group data within a stream. Currently four methods are supported: random, static, tag or measurement + +#### random + +This will generate a UUIDv4 for each metric to spread them across shards. +Any guarantee of ordering is lost with this method + +#### static + +This uses a static string as a partitionkey. +All metrics will be mapped to the same shard which may limit throughput. + +#### tag + +This will take the value of the specified tag from each metric as the paritionKey. +If the tag is not found an empty string will be used. + +#### measurement + +This will use the measurement's name as the partitionKey. + ### format The format configuration value has been designated to allow people to change the format of the Point as written to diff --git a/plugins/outputs/kinesis/kinesis.go b/plugins/outputs/kinesis/kinesis.go index 8c807ceed..d77ff08a5 100644 --- a/plugins/outputs/kinesis/kinesis.go +++ b/plugins/outputs/kinesis/kinesis.go @@ -15,23 +15,31 @@ import ( "github.com/influxdata/telegraf/plugins/serializers" ) -type KinesisOutput struct { - Region string `toml:"region"` - AccessKey string `toml:"access_key"` - SecretKey string `toml:"secret_key"` - RoleARN string `toml:"role_arn"` - Profile string `toml:"profile"` - Filename string `toml:"shared_credential_file"` - Token string `toml:"token"` +type ( + KinesisOutput struct { + Region string `toml:"region"` + AccessKey string `toml:"access_key"` + SecretKey string `toml:"secret_key"` + RoleARN string `toml:"role_arn"` + Profile string `toml:"profile"` + Filename string `toml:"shared_credential_file"` + Token string `toml:"token"` - StreamName string `toml:"streamname"` - PartitionKey string `toml:"partitionkey"` - RandomPartitionKey bool `toml:"use_random_partitionkey"` - Debug bool `toml:"debug"` - svc *kinesis.Kinesis + StreamName string `toml:"streamname"` + PartitionKey string `toml:"partitionkey"` + RandomPartitionKey bool `toml:"use_random_partitionkey"` + Partition *Partition `toml:"partition"` + Debug bool `toml:"debug"` + svc *kinesis.Kinesis - serializer serializers.Serializer -} + serializer serializers.Serializer + } + + Partition struct { + Method string `toml:"method"` + Key string `toml:"key"` + } +) var sampleConfig = ` ## Amazon REGION of kinesis endpoint. @@ -54,12 +62,32 @@ var sampleConfig = ` ## Kinesis StreamName must exist prior to starting telegraf. streamname = "StreamName" - ## PartitionKey as used for sharding data. + ## DEPRECATED: PartitionKey as used for sharding data. partitionkey = "PartitionKey" - ## If set the paritionKey will be a random UUID on every put. + ## DEPRECATED: If set the paritionKey will be a random UUID on every put. ## This allows for scaling across multiple shards in a stream. ## This will cause issues with ordering. use_random_partitionkey = false + ## The partition key can be calculated using one of several methods: + ## + ## Use a static value for all writes: + # [outputs.kinesis.partition] + # method = "static" + # key = "howdy" + # + ## Use a random partition key on each write: + # [outputs.kinesis.partition] + # method = "random" + # + ## Use the measurement name as the partition key: + # [outputs.kinesis.partition] + # method = "measurement" + # + ## Use the value of a tag for all writes, if the tag is not set the empty + ## string will be used: + # [outputs.kinesis.partition] + # method = "tag" + # key = "host" ## Data format to output. @@ -129,6 +157,9 @@ func (k *KinesisOutput) Connect() error { log.Printf("E! kinesis : You have configured a StreamName %+v which does not exist. exiting.", k.StreamName) os.Exit(1) } + if k.Partition == nil { + log.Print("E! kinesis : Deprecated paritionkey configuration in use, please consider using outputs.kinesis.partition") + } return err } @@ -163,6 +194,32 @@ func writekinesis(k *KinesisOutput, r []*kinesis.PutRecordsRequestEntry) time.Du return time.Since(start) } +func (k *KinesisOutput) getPartitionKey(metric telegraf.Metric) string { + if k.Partition != nil { + switch k.Partition.Method { + case "static": + return k.Partition.Key + case "random": + u := uuid.NewV4() + return u.String() + case "measurement": + return metric.Name() + case "tag": + if metric.HasTag(k.Partition.Key) { + return metric.Tags()[k.Partition.Key] + } + log.Printf("E! kinesis : You have configured a Partition using tag %+v which does not exist.", k.Partition.Key) + default: + log.Printf("E! kinesis : You have configured a Partition method of %+v which is not supported", k.Partition.Method) + } + } + if k.RandomPartitionKey { + u := uuid.NewV4() + return u.String() + } + return k.PartitionKey +} + func (k *KinesisOutput) Write(metrics []telegraf.Metric) error { var sz uint32 @@ -180,11 +237,7 @@ func (k *KinesisOutput) Write(metrics []telegraf.Metric) error { return err } - partitionKey := k.PartitionKey - if k.RandomPartitionKey { - u := uuid.NewV4() - partitionKey = u.String() - } + partitionKey := k.getPartitionKey(metric) d := kinesis.PutRecordsRequestEntry{ Data: values, @@ -202,8 +255,10 @@ func (k *KinesisOutput) Write(metrics []telegraf.Metric) error { } } - - writekinesis(k, r) + if sz > 0 { + elapsed := writekinesis(k, r) + log.Printf("E! Wrote a %+v point batch to Kinesis in %+v.\n", sz, elapsed) + } return nil } diff --git a/plugins/outputs/kinesis/kinesis_test.go b/plugins/outputs/kinesis/kinesis_test.go new file mode 100644 index 000000000..281dbecb5 --- /dev/null +++ b/plugins/outputs/kinesis/kinesis_test.go @@ -0,0 +1,77 @@ +package kinesis + +import ( + "testing" + + "github.com/influxdata/telegraf/testutil" + uuid "github.com/satori/go.uuid" + "github.com/stretchr/testify/assert" +) + +func TestPartitionKey(t *testing.T) { + + assert := assert.New(t) + testPoint := testutil.TestMetric(1) + + k := KinesisOutput{ + Partition: &Partition{ + Method: "static", + Key: "-", + }, + } + assert.Equal("-", k.getPartitionKey(testPoint), "PartitionKey should be '-'") + + k = KinesisOutput{ + Partition: &Partition{ + Method: "tag", + Key: "tag1", + }, + } + assert.Equal(testPoint.Tags()["tag1"], k.getPartitionKey(testPoint), "PartitionKey should be value of 'tag1'") + + k = KinesisOutput{ + Partition: &Partition{ + Method: "tag", + Key: "doesnotexist", + }, + } + assert.Equal("", k.getPartitionKey(testPoint), "PartitionKey should be value of ''") + + k = KinesisOutput{ + Partition: &Partition{ + Method: "not supported", + }, + } + assert.Equal("", k.getPartitionKey(testPoint), "PartitionKey should be value of ''") + + k = KinesisOutput{ + Partition: &Partition{ + Method: "measurement", + }, + } + assert.Equal(testPoint.Name(), k.getPartitionKey(testPoint), "PartitionKey should be value of measurement name") + + k = KinesisOutput{ + Partition: &Partition{ + Method: "random", + }, + } + partitionKey := k.getPartitionKey(testPoint) + u, err := uuid.FromString(partitionKey) + assert.Nil(err, "Issue parsing UUID") + assert.Equal(uint(4), u.Version(), "PartitionKey should be UUIDv4") + + k = KinesisOutput{ + PartitionKey: "-", + } + assert.Equal("-", k.getPartitionKey(testPoint), "PartitionKey should be '-'") + + k = KinesisOutput{ + RandomPartitionKey: true, + } + partitionKey = k.getPartitionKey(testPoint) + u, err = uuid.FromString(partitionKey) + assert.Nil(err, "Issue parsing UUID") + assert.Equal(uint(4), u.Version(), "PartitionKey should be UUIDv4") + +}