Add support for sharding based on metric name (#3170)

Nevins 2017-08-28 19:24:38 -04:00 committed by Daniel Nelson
parent 914a8132b5
commit 77c6089634
3 changed files with 181 additions and 27 deletions


@@ -27,7 +27,6 @@ For this output plugin to function correctly the following variables must be configured
* region
* streamname
-* partitionkey

### region
@@ -44,17 +43,40 @@ The streamname is used by the plugin to ensure that data is sent to the correct
Note that the stream *MUST* be pre-configured for this plugin to function correctly. If the stream does not exist,
the plugin will cause telegraf to exit with an exit code of 1.
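For reference, a minimal output section covering just these required settings might look like the sketch below; the values are the placeholders used in the plugin's sample configuration.

```toml
[[outputs.kinesis]]
  ## Amazon REGION of kinesis endpoint.
  region = "ap-southeast-2"
  ## The stream must already exist; telegraf will not create it.
  streamname = "StreamName"
```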
### partitionkey [DEPRECATED]

This is used to group data within a stream. Currently this plugin only supports a single partitionkey.
Manually configuring different hosts, or groups of hosts, with manually selected partitionkeys might be a workable
solution to scale out.
### use_random_partitionkey [DEPRECATED]

When true, a random UUID will be generated and used as the partitionkey when sending data to Kinesis. This allows data to spread evenly across multiple shards in the stream. Because the partitionKey is random, there is no guarantee of ordering when consuming the data off the shards.
If true, the partitionkey option will be ignored.
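For reference, a sketch of the deprecated settings as they appear in the plugin's sample configuration; prefer the `partition` table described below.

```toml
## DEPRECATED: a single static partition key for all writes.
partitionkey = "PartitionKey"
## DEPRECATED: use a random UUID as the partition key on every put.
use_random_partitionkey = false
```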
### partition

This is used to group data within a stream. Currently four methods are supported: random, static, tag, or measurement.
#### random

This will generate a UUIDv4 for each metric to spread them across shards.
Any guarantee of ordering is lost with this method.
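A configuration sketch for this method, mirroring the plugin's sample config (no `key` is needed):

```toml
[outputs.kinesis.partition]
  method = "random"
```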
#### static

This uses a static string as the partitionkey.
All metrics will be mapped to the same shard, which may limit throughput.
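For example, to map every metric to the same shard with a fixed key (the key value here is just the sample config's placeholder):

```toml
[outputs.kinesis.partition]
  method = "static"
  key = "howdy"
```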
#### tag

This will take the value of the specified tag from each metric as the partitionKey.
If the tag is not found, an empty string will be used.
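For example, to partition by the value of each metric's `host` tag (any tag name can be used):

```toml
[outputs.kinesis.partition]
  method = "tag"
  key = "host"
```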
#### measurement

This will use the measurement's name as the partitionKey.
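A sketch of this method's configuration; like `random`, it takes no `key`:

```toml
[outputs.kinesis.partition]
  method = "measurement"
```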
### format

The format configuration value has been designated to allow people to change the format of the Point as written to The format configuration value has been designated to allow people to change the format of the Point as written to


@@ -15,7 +15,8 @@ import (
    "github.com/influxdata/telegraf/plugins/serializers"
)
type (
    KinesisOutput struct {
        Region    string `toml:"region"`
        AccessKey string `toml:"access_key"`
        SecretKey string `toml:"secret_key"`
@@ -27,12 +28,19 @@ type KinesisOutput struct {
        StreamName         string     `toml:"streamname"`
        PartitionKey       string     `toml:"partitionkey"`
        RandomPartitionKey bool       `toml:"use_random_partitionkey"`
        Partition          *Partition `toml:"partition"`
        Debug              bool       `toml:"debug"`
        svc                *kinesis.Kinesis
        serializer         serializers.Serializer
    }

    Partition struct {
        Method string `toml:"method"`
        Key    string `toml:"key"`
    }
)

var sampleConfig = `
## Amazon REGION of kinesis endpoint.
region = "ap-southeast-2"
@@ -54,12 +62,32 @@ var sampleConfig = `
## Kinesis StreamName must exist prior to starting telegraf.
streamname = "StreamName"

## DEPRECATED: PartitionKey as used for sharding data.
partitionkey = "PartitionKey"

## DEPRECATED: If set the partitionKey will be a random UUID on every put.
## This allows for scaling across multiple shards in a stream.
## This will cause issues with ordering.
use_random_partitionkey = false

## The partition key can be calculated using one of several methods:
##
## Use a static value for all writes:
# [outputs.kinesis.partition]
# method = "static"
# key = "howdy"
#
## Use a random partition key on each write:
# [outputs.kinesis.partition]
# method = "random"
#
## Use the measurement name as the partition key:
# [outputs.kinesis.partition]
# method = "measurement"
#
## Use the value of a tag for all writes; if the tag is not set, the empty
## string will be used:
# [outputs.kinesis.partition]
# method = "tag"
# key = "host"

## Data format to output.
@@ -129,6 +157,9 @@ func (k *KinesisOutput) Connect() error {
        log.Printf("E! kinesis : You have configured a StreamName %+v which does not exist. exiting.", k.StreamName)
        os.Exit(1)
    }
    if k.Partition == nil {
        log.Print("E! kinesis : Deprecated partitionkey configuration in use, please consider using outputs.kinesis.partition")
    }
    return err
}
@@ -163,6 +194,32 @@ func writekinesis(k *KinesisOutput, r []*kinesis.PutRecordsRequestEntry) time.Duration {
    return time.Since(start)
}
// getPartitionKey returns the Kinesis partition key for a metric based on the
// configured partition method.
func (k *KinesisOutput) getPartitionKey(metric telegraf.Metric) string {
    if k.Partition != nil {
        switch k.Partition.Method {
        case "static":
            return k.Partition.Key
        case "random":
            u := uuid.NewV4()
            return u.String()
        case "measurement":
            return metric.Name()
        case "tag":
            if metric.HasTag(k.Partition.Key) {
                return metric.Tags()[k.Partition.Key]
            }
            log.Printf("E! kinesis : You have configured a Partition using tag %+v which does not exist.", k.Partition.Key)
        default:
            log.Printf("E! kinesis : You have configured a Partition method of %+v which is not supported", k.Partition.Method)
        }
    }
    // Fall back to the deprecated partitionkey / use_random_partitionkey settings.
    if k.RandomPartitionKey {
        u := uuid.NewV4()
        return u.String()
    }
    return k.PartitionKey
}
func (k *KinesisOutput) Write(metrics []telegraf.Metric) error {
    var sz uint32
@@ -180,11 +237,7 @@ func (k *KinesisOutput) Write(metrics []telegraf.Metric) error {
            return err
        }

        partitionKey := k.getPartitionKey(metric)
-        if k.RandomPartitionKey {
-            u := uuid.NewV4()
-            partitionKey = u.String()
-        }

        d := kinesis.PutRecordsRequestEntry{
            Data: values,
@@ -202,8 +255,10 @@ func (k *KinesisOutput) Write(metrics []telegraf.Metric) error {
        }
    }

    if sz > 0 {
        elapsed := writekinesis(k, r)
        log.Printf("E! Wrote a %+v point batch to Kinesis in %+v.\n", sz, elapsed)
    }

    return nil
}


@@ -0,0 +1,77 @@
package kinesis

import (
    "testing"

    "github.com/influxdata/telegraf/testutil"
    uuid "github.com/satori/go.uuid"
    "github.com/stretchr/testify/assert"
)

func TestPartitionKey(t *testing.T) {
    assert := assert.New(t)
    testPoint := testutil.TestMetric(1)

    k := KinesisOutput{
        Partition: &Partition{
            Method: "static",
            Key:    "-",
        },
    }
    assert.Equal("-", k.getPartitionKey(testPoint), "PartitionKey should be '-'")

    k = KinesisOutput{
        Partition: &Partition{
            Method: "tag",
            Key:    "tag1",
        },
    }
    assert.Equal(testPoint.Tags()["tag1"], k.getPartitionKey(testPoint), "PartitionKey should be value of 'tag1'")

    k = KinesisOutput{
        Partition: &Partition{
            Method: "tag",
            Key:    "doesnotexist",
        },
    }
    assert.Equal("", k.getPartitionKey(testPoint), "PartitionKey should be ''")

    k = KinesisOutput{
        Partition: &Partition{
            Method: "not supported",
        },
    }
    assert.Equal("", k.getPartitionKey(testPoint), "PartitionKey should be ''")

    k = KinesisOutput{
        Partition: &Partition{
            Method: "measurement",
        },
    }
    assert.Equal(testPoint.Name(), k.getPartitionKey(testPoint), "PartitionKey should be the measurement name")

    k = KinesisOutput{
        Partition: &Partition{
            Method: "random",
        },
    }
    partitionKey := k.getPartitionKey(testPoint)
    u, err := uuid.FromString(partitionKey)
    assert.Nil(err, "Issue parsing UUID")
    assert.Equal(uint(4), u.Version(), "PartitionKey should be a UUIDv4")

    k = KinesisOutput{
        PartitionKey: "-",
    }
    assert.Equal("-", k.getPartitionKey(testPoint), "PartitionKey should be '-'")

    k = KinesisOutput{
        RandomPartitionKey: true,
    }
    partitionKey = k.getPartitionKey(testPoint)
    u, err = uuid.FromString(partitionKey)
    assert.Nil(err, "Issue parsing UUID")
    assert.Equal(uint(4), u.Version(), "PartitionKey should be a UUIDv4")
}