Use default partition key when tag does not exist in kinesis output (#4904)
This commit is contained in:
parent
170cddc956
commit
742a74dcf0
|
@ -71,7 +71,7 @@ All metrics will be mapped to the same shard which may limit throughput.
|
||||||
#### tag
|
#### tag
|
||||||
|
|
||||||
This will take the value of the specified tag from each metric as the paritionKey.
|
This will take the value of the specified tag from each metric as the paritionKey.
|
||||||
If the tag is not found an empty string will be used.
|
If the tag is not found the `default` value will be used or `telegraf` if unspecified
|
||||||
|
|
||||||
#### measurement
|
#### measurement
|
||||||
|
|
||||||
|
|
|
@ -36,8 +36,9 @@ type (
|
||||||
}
|
}
|
||||||
|
|
||||||
Partition struct {
|
Partition struct {
|
||||||
Method string `toml:"method"`
|
Method string `toml:"method"`
|
||||||
Key string `toml:"key"`
|
Key string `toml:"key"`
|
||||||
|
Default string `toml:"default"`
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -90,10 +91,11 @@ var sampleConfig = `
|
||||||
# method = "measurement"
|
# method = "measurement"
|
||||||
#
|
#
|
||||||
## Use the value of a tag for all writes, if the tag is not set the empty
|
## Use the value of a tag for all writes, if the tag is not set the empty
|
||||||
## string will be used:
|
## default option will be used. When no default, defaults to "telegraf"
|
||||||
# [outputs.kinesis.partition]
|
# [outputs.kinesis.partition]
|
||||||
# method = "tag"
|
# method = "tag"
|
||||||
# key = "host"
|
# key = "host"
|
||||||
|
# default = "mykey"
|
||||||
|
|
||||||
|
|
||||||
## Data format to output.
|
## Data format to output.
|
||||||
|
@ -187,10 +189,13 @@ func (k *KinesisOutput) getPartitionKey(metric telegraf.Metric) string {
|
||||||
case "measurement":
|
case "measurement":
|
||||||
return metric.Name()
|
return metric.Name()
|
||||||
case "tag":
|
case "tag":
|
||||||
if metric.HasTag(k.Partition.Key) {
|
if t, ok := metric.GetTag(k.Partition.Key); ok {
|
||||||
return metric.Tags()[k.Partition.Key]
|
return t
|
||||||
|
} else if len(k.Partition.Default) > 0 {
|
||||||
|
return k.Partition.Default
|
||||||
}
|
}
|
||||||
log.Printf("E! kinesis : You have configured a Partition using tag %+v which does not exist.", k.Partition.Key)
|
// Default partition name if default is not set
|
||||||
|
return "telegraf"
|
||||||
default:
|
default:
|
||||||
log.Printf("E! kinesis : You have configured a Partition method of %+v which is not supported", k.Partition.Method)
|
log.Printf("E! kinesis : You have configured a Partition method of %+v which is not supported", k.Partition.Method)
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,13 +29,22 @@ func TestPartitionKey(t *testing.T) {
|
||||||
}
|
}
|
||||||
assert.Equal(testPoint.Tags()["tag1"], k.getPartitionKey(testPoint), "PartitionKey should be value of 'tag1'")
|
assert.Equal(testPoint.Tags()["tag1"], k.getPartitionKey(testPoint), "PartitionKey should be value of 'tag1'")
|
||||||
|
|
||||||
|
k = KinesisOutput{
|
||||||
|
Partition: &Partition{
|
||||||
|
Method: "tag",
|
||||||
|
Key: "doesnotexist",
|
||||||
|
Default: "somedefault",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
assert.Equal("somedefault", k.getPartitionKey(testPoint), "PartitionKey should use default")
|
||||||
|
|
||||||
k = KinesisOutput{
|
k = KinesisOutput{
|
||||||
Partition: &Partition{
|
Partition: &Partition{
|
||||||
Method: "tag",
|
Method: "tag",
|
||||||
Key: "doesnotexist",
|
Key: "doesnotexist",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
assert.Equal("", k.getPartitionKey(testPoint), "PartitionKey should be value of ''")
|
assert.Equal("telegraf", k.getPartitionKey(testPoint), "PartitionKey should be telegraf")
|
||||||
|
|
||||||
k = KinesisOutput{
|
k = KinesisOutput{
|
||||||
Partition: &Partition{
|
Partition: &Partition{
|
||||||
|
|
Loading…
Reference in New Issue