Add support for sharding based on metric name (#3170)
This commit is contained in:
@@ -15,23 +15,31 @@ import (
|
||||
"github.com/influxdata/telegraf/plugins/serializers"
|
||||
)
|
||||
|
||||
type KinesisOutput struct {
|
||||
Region string `toml:"region"`
|
||||
AccessKey string `toml:"access_key"`
|
||||
SecretKey string `toml:"secret_key"`
|
||||
RoleARN string `toml:"role_arn"`
|
||||
Profile string `toml:"profile"`
|
||||
Filename string `toml:"shared_credential_file"`
|
||||
Token string `toml:"token"`
|
||||
type (
|
||||
KinesisOutput struct {
|
||||
Region string `toml:"region"`
|
||||
AccessKey string `toml:"access_key"`
|
||||
SecretKey string `toml:"secret_key"`
|
||||
RoleARN string `toml:"role_arn"`
|
||||
Profile string `toml:"profile"`
|
||||
Filename string `toml:"shared_credential_file"`
|
||||
Token string `toml:"token"`
|
||||
|
||||
StreamName string `toml:"streamname"`
|
||||
PartitionKey string `toml:"partitionkey"`
|
||||
RandomPartitionKey bool `toml:"use_random_partitionkey"`
|
||||
Debug bool `toml:"debug"`
|
||||
svc *kinesis.Kinesis
|
||||
StreamName string `toml:"streamname"`
|
||||
PartitionKey string `toml:"partitionkey"`
|
||||
RandomPartitionKey bool `toml:"use_random_partitionkey"`
|
||||
Partition *Partition `toml:"partition"`
|
||||
Debug bool `toml:"debug"`
|
||||
svc *kinesis.Kinesis
|
||||
|
||||
serializer serializers.Serializer
|
||||
}
|
||||
serializer serializers.Serializer
|
||||
}
|
||||
|
||||
Partition struct {
|
||||
Method string `toml:"method"`
|
||||
Key string `toml:"key"`
|
||||
}
|
||||
)
|
||||
|
||||
var sampleConfig = `
|
||||
## Amazon REGION of kinesis endpoint.
|
||||
@@ -54,12 +62,32 @@ var sampleConfig = `
|
||||
|
||||
## Kinesis StreamName must exist prior to starting telegraf.
|
||||
streamname = "StreamName"
|
||||
## PartitionKey as used for sharding data.
|
||||
## DEPRECATED: PartitionKey as used for sharding data.
|
||||
partitionkey = "PartitionKey"
|
||||
## If set the paritionKey will be a random UUID on every put.
|
||||
## DEPRECATED: If set the paritionKey will be a random UUID on every put.
|
||||
## This allows for scaling across multiple shards in a stream.
|
||||
## This will cause issues with ordering.
|
||||
use_random_partitionkey = false
|
||||
## The partition key can be calculated using one of several methods:
|
||||
##
|
||||
## Use a static value for all writes:
|
||||
# [outputs.kinesis.partition]
|
||||
# method = "static"
|
||||
# key = "howdy"
|
||||
#
|
||||
## Use a random partition key on each write:
|
||||
# [outputs.kinesis.partition]
|
||||
# method = "random"
|
||||
#
|
||||
## Use the measurement name as the partition key:
|
||||
# [outputs.kinesis.partition]
|
||||
# method = "measurement"
|
||||
#
|
||||
## Use the value of a tag for all writes, if the tag is not set the empty
|
||||
## string will be used:
|
||||
# [outputs.kinesis.partition]
|
||||
# method = "tag"
|
||||
# key = "host"
|
||||
|
||||
|
||||
## Data format to output.
|
||||
@@ -129,6 +157,9 @@ func (k *KinesisOutput) Connect() error {
|
||||
log.Printf("E! kinesis : You have configured a StreamName %+v which does not exist. exiting.", k.StreamName)
|
||||
os.Exit(1)
|
||||
}
|
||||
if k.Partition == nil {
|
||||
log.Print("E! kinesis : Deprecated paritionkey configuration in use, please consider using outputs.kinesis.partition")
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -163,6 +194,32 @@ func writekinesis(k *KinesisOutput, r []*kinesis.PutRecordsRequestEntry) time.Du
|
||||
return time.Since(start)
|
||||
}
|
||||
|
||||
func (k *KinesisOutput) getPartitionKey(metric telegraf.Metric) string {
|
||||
if k.Partition != nil {
|
||||
switch k.Partition.Method {
|
||||
case "static":
|
||||
return k.Partition.Key
|
||||
case "random":
|
||||
u := uuid.NewV4()
|
||||
return u.String()
|
||||
case "measurement":
|
||||
return metric.Name()
|
||||
case "tag":
|
||||
if metric.HasTag(k.Partition.Key) {
|
||||
return metric.Tags()[k.Partition.Key]
|
||||
}
|
||||
log.Printf("E! kinesis : You have configured a Partition using tag %+v which does not exist.", k.Partition.Key)
|
||||
default:
|
||||
log.Printf("E! kinesis : You have configured a Partition method of %+v which is not supported", k.Partition.Method)
|
||||
}
|
||||
}
|
||||
if k.RandomPartitionKey {
|
||||
u := uuid.NewV4()
|
||||
return u.String()
|
||||
}
|
||||
return k.PartitionKey
|
||||
}
|
||||
|
||||
func (k *KinesisOutput) Write(metrics []telegraf.Metric) error {
|
||||
var sz uint32
|
||||
|
||||
@@ -180,11 +237,7 @@ func (k *KinesisOutput) Write(metrics []telegraf.Metric) error {
|
||||
return err
|
||||
}
|
||||
|
||||
partitionKey := k.PartitionKey
|
||||
if k.RandomPartitionKey {
|
||||
u := uuid.NewV4()
|
||||
partitionKey = u.String()
|
||||
}
|
||||
partitionKey := k.getPartitionKey(metric)
|
||||
|
||||
d := kinesis.PutRecordsRequestEntry{
|
||||
Data: values,
|
||||
@@ -202,8 +255,10 @@ func (k *KinesisOutput) Write(metrics []telegraf.Metric) error {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
writekinesis(k, r)
|
||||
if sz > 0 {
|
||||
elapsed := writekinesis(k, r)
|
||||
log.Printf("E! Wrote a %+v point batch to Kinesis in %+v.\n", sz, elapsed)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user