Use own partitioning and add unit tests

This commit is contained in:
Stephen Kwong 2016-01-20 17:09:46 -08:00
parent 881af1f078
commit 02ee480396
2 changed files with 124 additions and 11 deletions

View File

@ -16,8 +16,6 @@ import (
"github.com/influxdb/influxdb/client/v2"
"github.com/influxdb/telegraf/plugins/outputs"
"github.com/meirf/gopart"
)
type CloudWatch struct {
@ -89,12 +87,12 @@ func (c *CloudWatch) Write(points []*client.Point) error {
// is equal to one MetricDatum. There is a limit on how many MetricDatums a
// request can have so we process one Point at a time.
func (c *CloudWatch) WriteSinglePoint(point *client.Point) error {
datums := buildMetricDatum(point)
datums := BuildMetricDatum(point)
const maxDatumsPerCall = 20 // PutMetricData only supports up to 20 data points per call
for idxRange := range gopart.Partition(len(datums), maxDatumsPerCall) {
err := c.WriteToCloudWatch(datums[idxRange.Low:idxRange.High])
for _, partition := range PartitionDatums(maxDatumsPerCall, datums) {
err := c.WriteToCloudWatch(partition)
if err != nil {
return err
@ -119,9 +117,33 @@ func (c *CloudWatch) WriteToCloudWatch(datums []*cloudwatch.MetricDatum) error {
return err
}
// Partition the MetricDatums into smaller slices of a max size so that are under the limit
// for the AWS API calls.
func PartitionDatums(size int, datums []*cloudwatch.MetricDatum) [][]*cloudwatch.MetricDatum {
numberOfPartitions := len(datums) / size
if len(datums)%size != 0 {
numberOfPartitions += 1
}
partitions := make([][]*cloudwatch.MetricDatum, numberOfPartitions)
for i := 0; i < numberOfPartitions; i++ {
start := size * i
end := size * (i + 1)
if end > len(datums) {
end = len(datums)
}
partitions[i] = datums[start:end]
}
return partitions
}
// Make a MetricDatum for each field in a Point. Only fields with values that can be
// converted to float64 are supported. Non-supported fields are skipped.
func buildMetricDatum(point *client.Point) []*cloudwatch.MetricDatum {
func BuildMetricDatum(point *client.Point) []*cloudwatch.MetricDatum {
datums := make([]*cloudwatch.MetricDatum, len(point.Fields()))
i := 0
@ -154,7 +176,7 @@ func buildMetricDatum(point *client.Point) []*cloudwatch.MetricDatum {
datums[i] = &cloudwatch.MetricDatum{
MetricName: aws.String(strings.Join([]string{point.Name(), k}, "_")),
Value: aws.Float64(value),
Dimensions: buildDimensions(point.Tags()),
Dimensions: BuildDimensions(point.Tags()),
Timestamp: aws.Time(point.Time()),
}
@ -167,7 +189,7 @@ func buildMetricDatum(point *client.Point) []*cloudwatch.MetricDatum {
// Make a list of Dimensions by using a Point's tags. CloudWatch supports up to
// 10 dimensions per metric so we only keep up to the first 10 alphabetically.
// This always includes the "host" tag if it exists.
func buildDimensions(ptTags map[string]string) []*cloudwatch.Dimension {
func BuildDimensions(ptTags map[string]string) []*cloudwatch.Dimension {
const MaxDimensions = 10
dimensions := make([]*cloudwatch.Dimension, int(math.Min(float64(len(ptTags)), MaxDimensions)))
@ -180,18 +202,19 @@ func buildDimensions(ptTags map[string]string) []*cloudwatch.Dimension {
Name: aws.String("host"),
Value: aws.String(host),
}
delete(ptTags, "host")
i += 1
}
var keys []string
for k := range ptTags {
keys = append(keys, k)
if k != "host" {
keys = append(keys, k)
}
}
sort.Strings(keys)
for _, k := range keys {
if i <= MaxDimensions {
if i >= MaxDimensions {
break
}
@ -199,6 +222,8 @@ func buildDimensions(ptTags map[string]string) []*cloudwatch.Dimension {
Name: aws.String(k),
Value: aws.String(ptTags[k]),
}
i += 1
}
return dimensions

View File

@ -0,0 +1,88 @@
package cloudwatch
import (
"sort"
"testing"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/influxdata/telegraf/testutil"
"github.com/influxdb/influxdb/client/v2"
"github.com/stretchr/testify/assert"
)
// Test that each tag becomes one dimension
func TestBuildDimensions(t *testing.T) {
const MaxDimensions = 10
assert := assert.New(t)
testPoint := testutil.TestPoint(1)
dimensions := BuildDimensions(testPoint.Tags())
tagKeys := make([]string, len(testPoint.Tags()))
i := 0
for k, _ := range testPoint.Tags() {
tagKeys[i] = k
i += 1
}
sort.Strings(tagKeys)
if len(testPoint.Tags()) >= MaxDimensions {
assert.Equal(MaxDimensions, len(dimensions), "Number of dimensions should be less than MaxDimensions")
} else {
assert.Equal(len(testPoint.Tags()), len(dimensions), "Number of dimensions should be equal to number of tags")
}
for i, key := range tagKeys {
if i >= 10 {
break
}
assert.Equal(key, *dimensions[i].Name, "Key should be equal")
assert.Equal(testPoint.Tags()[key], *dimensions[i].Value, "Value should be equal")
}
}
// Test that points with valid values have a MetricDatum created where as non valid do not.
// Skips "time.Time" type as something is converting the value to string.
func TestBuildMetricDatums(t *testing.T) {
assert := assert.New(t)
validPoints := []*client.Point{
testutil.TestPoint(1),
testutil.TestPoint(int32(1)),
testutil.TestPoint(int64(1)),
testutil.TestPoint(float64(1)),
testutil.TestPoint(true),
}
for _, point := range validPoints {
datums := BuildMetricDatum(point)
assert.Equal(1, len(datums), "Valid type should create a Datum")
}
nonValidPoint := testutil.TestPoint("Foo")
assert.Equal(0, len(BuildMetricDatum(nonValidPoint)), "Invalid type should not create a Datum")
}
func TestPartitionDatums(t *testing.T) {
assert := assert.New(t)
testDatum := cloudwatch.MetricDatum{
MetricName: aws.String("Foo"),
Value: aws.Float64(1),
}
oneDatum := []*cloudwatch.MetricDatum{&testDatum}
twoDatum := []*cloudwatch.MetricDatum{&testDatum, &testDatum}
threeDatum := []*cloudwatch.MetricDatum{&testDatum, &testDatum, &testDatum}
assert.Equal([][]*cloudwatch.MetricDatum{oneDatum}, PartitionDatums(2, oneDatum))
assert.Equal([][]*cloudwatch.MetricDatum{twoDatum}, PartitionDatums(2, twoDatum))
assert.Equal([][]*cloudwatch.MetricDatum{twoDatum, oneDatum}, PartitionDatums(2, threeDatum))
}