diff --git a/plugins/outputs/cloudwatch/cloudwatch.go b/plugins/outputs/cloudwatch/cloudwatch.go index cb8375d09..9c72af7db 100644 --- a/plugins/outputs/cloudwatch/cloudwatch.go +++ b/plugins/outputs/cloudwatch/cloudwatch.go @@ -16,8 +16,6 @@ import ( "github.com/influxdb/influxdb/client/v2" "github.com/influxdb/telegraf/plugins/outputs" - - "github.com/meirf/gopart" ) type CloudWatch struct { @@ -89,12 +87,12 @@ func (c *CloudWatch) Write(points []*client.Point) error { // is equal to one MetricDatum. There is a limit on how many MetricDatums a // request can have so we process one Point at a time. func (c *CloudWatch) WriteSinglePoint(point *client.Point) error { - datums := buildMetricDatum(point) + datums := BuildMetricDatum(point) const maxDatumsPerCall = 20 // PutMetricData only supports up to 20 data points per call - for idxRange := range gopart.Partition(len(datums), maxDatumsPerCall) { - err := c.WriteToCloudWatch(datums[idxRange.Low:idxRange.High]) + for _, partition := range PartitionDatums(maxDatumsPerCall, datums) { + err := c.WriteToCloudWatch(partition) if err != nil { return err @@ -119,9 +117,33 @@ func (c *CloudWatch) WriteToCloudWatch(datums []*cloudwatch.MetricDatum) error { return err } +// Partition the MetricDatums into smaller slices of a max size so that are under the limit +// for the AWS API calls. +func PartitionDatums(size int, datums []*cloudwatch.MetricDatum) [][]*cloudwatch.MetricDatum { + + numberOfPartitions := len(datums) / size + if len(datums)%size != 0 { + numberOfPartitions += 1 + } + + partitions := make([][]*cloudwatch.MetricDatum, numberOfPartitions) + + for i := 0; i < numberOfPartitions; i++ { + start := size * i + end := size * (i + 1) + if end > len(datums) { + end = len(datums) + } + + partitions[i] = datums[start:end] + } + + return partitions +} + // Make a MetricDatum for each field in a Point. Only fields with values that can be // converted to float64 are supported. Non-supported fields are skipped. -func buildMetricDatum(point *client.Point) []*cloudwatch.MetricDatum { +func BuildMetricDatum(point *client.Point) []*cloudwatch.MetricDatum { datums := make([]*cloudwatch.MetricDatum, len(point.Fields())) i := 0 @@ -154,7 +176,7 @@ func buildMetricDatum(point *client.Point) []*cloudwatch.MetricDatum { datums[i] = &cloudwatch.MetricDatum{ MetricName: aws.String(strings.Join([]string{point.Name(), k}, "_")), Value: aws.Float64(value), - Dimensions: buildDimensions(point.Tags()), + Dimensions: BuildDimensions(point.Tags()), Timestamp: aws.Time(point.Time()), } @@ -167,7 +189,7 @@ func buildMetricDatum(point *client.Point) []*cloudwatch.MetricDatum { // Make a list of Dimensions by using a Point's tags. CloudWatch supports up to // 10 dimensions per metric so we only keep up to the first 10 alphabetically. // This always includes the "host" tag if it exists. -func buildDimensions(ptTags map[string]string) []*cloudwatch.Dimension { +func BuildDimensions(ptTags map[string]string) []*cloudwatch.Dimension { const MaxDimensions = 10 dimensions := make([]*cloudwatch.Dimension, int(math.Min(float64(len(ptTags)), MaxDimensions))) @@ -180,18 +202,19 @@ func buildDimensions(ptTags map[string]string) []*cloudwatch.Dimension { Name: aws.String("host"), Value: aws.String(host), } - delete(ptTags, "host") i += 1 } var keys []string for k := range ptTags { - keys = append(keys, k) + if k != "host" { + keys = append(keys, k) + } } sort.Strings(keys) for _, k := range keys { - if i <= MaxDimensions { + if i >= MaxDimensions { break } @@ -199,6 +222,8 @@ func buildDimensions(ptTags map[string]string) []*cloudwatch.Dimension { Name: aws.String(k), Value: aws.String(ptTags[k]), } + + i += 1 } return dimensions diff --git a/plugins/outputs/cloudwatch/cloudwatch_test.go b/plugins/outputs/cloudwatch/cloudwatch_test.go new file mode 100644 index 000000000..3d1f5c4db --- /dev/null +++ b/plugins/outputs/cloudwatch/cloudwatch_test.go @@ -0,0 +1,88 @@ +package cloudwatch + +import ( + "sort" + "testing" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/cloudwatch" + + "github.com/influxdata/telegraf/testutil" + "github.com/influxdb/influxdb/client/v2" + + "github.com/stretchr/testify/assert" +) + +// Test that each tag becomes one dimension +func TestBuildDimensions(t *testing.T) { + const MaxDimensions = 10 + + assert := assert.New(t) + + testPoint := testutil.TestPoint(1) + dimensions := BuildDimensions(testPoint.Tags()) + + tagKeys := make([]string, len(testPoint.Tags())) + i := 0 + for k, _ := range testPoint.Tags() { + tagKeys[i] = k + i += 1 + } + + sort.Strings(tagKeys) + + if len(testPoint.Tags()) >= MaxDimensions { + assert.Equal(MaxDimensions, len(dimensions), "Number of dimensions should be less than MaxDimensions") + } else { + assert.Equal(len(testPoint.Tags()), len(dimensions), "Number of dimensions should be equal to number of tags") + } + + for i, key := range tagKeys { + if i >= 10 { + break + } + assert.Equal(key, *dimensions[i].Name, "Key should be equal") + assert.Equal(testPoint.Tags()[key], *dimensions[i].Value, "Value should be equal") + } +} + +// Test that points with valid values have a MetricDatum created where as non valid do not. +// Skips "time.Time" type as something is converting the value to string. +func TestBuildMetricDatums(t *testing.T) { + assert := assert.New(t) + + validPoints := []*client.Point{ + testutil.TestPoint(1), + testutil.TestPoint(int32(1)), + testutil.TestPoint(int64(1)), + testutil.TestPoint(float64(1)), + testutil.TestPoint(true), + } + + for _, point := range validPoints { + datums := BuildMetricDatum(point) + assert.Equal(1, len(datums), "Valid type should create a Datum") + } + + nonValidPoint := testutil.TestPoint("Foo") + + assert.Equal(0, len(BuildMetricDatum(nonValidPoint)), "Invalid type should not create a Datum") +} + +func TestPartitionDatums(t *testing.T) { + + assert := assert.New(t) + + testDatum := cloudwatch.MetricDatum{ + MetricName: aws.String("Foo"), + Value: aws.Float64(1), + } + + oneDatum := []*cloudwatch.MetricDatum{&testDatum} + twoDatum := []*cloudwatch.MetricDatum{&testDatum, &testDatum} + threeDatum := []*cloudwatch.MetricDatum{&testDatum, &testDatum, &testDatum} + + assert.Equal([][]*cloudwatch.MetricDatum{oneDatum}, PartitionDatums(2, oneDatum)) + assert.Equal([][]*cloudwatch.MetricDatum{twoDatum}, PartitionDatums(2, twoDatum)) + assert.Equal([][]*cloudwatch.MetricDatum{twoDatum, oneDatum}, PartitionDatums(2, threeDatum)) +}