Add high resolution metrics support to CloudWatch output (#6689)

Marc Ruiz 2019-11-22 03:37:33 +01:00 committed by Daniel Nelson
parent a193f527f0
commit c7af10b159
3 changed files with 64 additions and 30 deletions

plugins/outputs/cloudwatch/README.md

@@ -46,3 +46,6 @@ also save AWS API cost. If enable this flag, this plugin would parse the require
 (count, min, max, and sum) and send them to CloudWatch. You could use `basicstats`
 aggregator to calculate those fields. If not all statistic fields are available,
 all fields would still be sent as raw metrics.
+
+## high_resolution_metrics
+Enable high resolution metrics (1 second precision) instead of standard ones (60 seconds precision)
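For context, enabling the new option in telegraf.conf would look like the minimal sketch below; `namespace`, `write_statistics`, and `high_resolution_metrics` are the toml tags visible in this diff, `region` is the plugin's existing AWS region option, and the values shown are illustrative. Note that, per AWS documentation, CloudWatch stores sub-minute data points as high-resolution custom metrics, queryable at 1-second granularity for roughly the first three hours before aggregation.

[[outputs.cloudwatch]]
  ## AWS region and metric namespace (illustrative values)
  region = "us-east-1"
  namespace = "InfluxData/Telegraf"

  ## Send 1-second resolution metrics instead of the default 60-second resolution
  high_resolution_metrics = true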

plugins/outputs/cloudwatch/cloudwatch.go

@@ -26,6 +26,7 @@ type CloudWatch struct {
     EndpointURL string `toml:"endpoint_url"`
     Namespace   string `toml:"namespace"` // CloudWatch Metrics Namespace
+    HighResolutionMetrics bool `toml:"high_resolution_metrics"`

     svc *cloudwatch.CloudWatch

     WriteStatistics bool `toml:"write_statistics"`
@@ -52,6 +53,7 @@ type statisticField struct {
     tags      map[string]string
     values    map[statisticType]float64
     timestamp time.Time
+    storageResolution int64
 }

 func (f *statisticField) addValue(sType statisticType, value float64) {
@@ -81,6 +83,7 @@ func (f *statisticField) buildDatum() []*cloudwatch.MetricDatum {
             Sum:         aws.Float64(sum),
             SampleCount: aws.Float64(count),
         },
+        StorageResolution: aws.Int64(f.storageResolution),
     }

     datums = append(datums, datum)
@@ -131,6 +134,7 @@ type valueField struct {
     tags      map[string]string
     value     float64
     timestamp time.Time
+    storageResolution int64
 }

 func (f *valueField) addValue(sType statisticType, value float64) {
@@ -147,6 +151,7 @@ func (f *valueField) buildDatum() []*cloudwatch.MetricDatum {
             Value:      aws.Float64(f.value),
             Dimensions: BuildDimensions(f.tags),
             Timestamp:  aws.Time(f.timestamp),
+            StorageResolution: aws.Int64(f.storageResolution),
         },
     }
 }
@@ -186,6 +191,9 @@ var sampleConfig = `
   ## You could use basicstats aggregator to calculate those fields. If not all statistic
   ## fields are available, all fields would still be sent as raw metrics.
   # write_statistics = false
+
+  ## Enable high resolution metrics of 1 second (standard resolution metrics are 60 seconds)
+  # high_resolution_metrics = false
 `

 func (c *CloudWatch) SampleConfig() string {
@@ -220,7 +228,7 @@ func (c *CloudWatch) Write(metrics []telegraf.Metric) error {
     var datums []*cloudwatch.MetricDatum
     for _, m := range metrics {
-        d := BuildMetricDatum(c.WriteStatistics, m)
+        d := BuildMetricDatum(c.WriteStatistics, c.HighResolutionMetrics, m)
         datums = append(datums, d...)
     }
@@ -278,10 +286,14 @@ func PartitionDatums(size int, datums []*cloudwatch.MetricDatum) [][]*cloudwatch.MetricDatum {
 }

 // Make a MetricDatum from telegraf.Metric. It would check if all required fields of
 // cloudwatch.StatisticSet are available. If so, it would build MetricDatum from statistic values.
 // Otherwise, fields would still been built independently.
-func BuildMetricDatum(buildStatistic bool, point telegraf.Metric) []*cloudwatch.MetricDatum {
+func BuildMetricDatum(buildStatistic bool, highResolutionMetrics bool, point telegraf.Metric) []*cloudwatch.MetricDatum {
     fields := make(map[string]cloudwatchField)
     tags := point.Tags()
+    storageResolution := int64(60)
+    if highResolutionMetrics {
+        storageResolution = 1
+    }

     for k, v := range point.Fields() {
@@ -302,6 +314,7 @@ func BuildMetricDatum(buildStatistic bool, point telegraf.Metric) []*cloudwatch.MetricDatum {
             tags:      tags,
             timestamp: point.Time(),
             value:     val,
+            storageResolution: storageResolution,
         }
         continue
     }
@@ -317,6 +330,7 @@ func BuildMetricDatum(buildStatistic bool, point telegraf.Metric) []*cloudwatch.MetricDatum {
             values: map[statisticType]float64{
                 sType: val,
             },
+            storageResolution: storageResolution,
         }
     } else {
         // Add new statistic value to this field
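For readers unfamiliar with the CloudWatch API: `StorageResolution` is a per-datum field of `PutMetricData`, where 1 requests one-second (high) resolution and 60, the default, requests standard resolution; that is why the plugin maps the boolean flag to exactly these two values. A minimal standalone sketch using the same aws-sdk-go calls as the plugin (the metric name and namespace are illustrative, and error handling is omitted):

package main

import (
    "time"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/cloudwatch"
)

func main() {
    svc := cloudwatch.New(session.Must(session.NewSession()))

    // A single high-resolution datum: StorageResolution 1 instead of the 60-second default.
    datum := &cloudwatch.MetricDatum{
        MetricName:        aws.String("example_metric"), // illustrative name
        Value:             aws.Float64(42),
        Timestamp:         aws.Time(time.Now()),
        StorageResolution: aws.Int64(1),
    }

    // Telegraf batches datums via PartitionDatums before making this call.
    svc.PutMetricData(&cloudwatch.PutMetricDataInput{
        Namespace:  aws.String("InfluxData/Telegraf"), // illustrative namespace
        MetricData: []*cloudwatch.MetricDatum{datum},
    })
}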

plugins/outputs/cloudwatch/cloudwatch_test.go

@@ -75,11 +75,11 @@ func TestBuildMetricDatums(t *testing.T) {
         testutil.TestMetric(float64(1.174272e+108)), // largest should be 1.174271e+108
     }
     for _, point := range validMetrics {
-        datums := BuildMetricDatum(false, point)
+        datums := BuildMetricDatum(false, false, point)
         assert.Equal(1, len(datums), fmt.Sprintf("Valid point should create a Datum {value: %v}", point))
     }

     for _, point := range invalidMetrics {
-        datums := BuildMetricDatum(false, point)
+        datums := BuildMetricDatum(false, false, point)
         assert.Equal(0, len(datums), fmt.Sprintf("Valid point should not create a Datum {value: %v}", point))
     }
@@ -89,7 +89,7 @@ func TestBuildMetricDatums(t *testing.T) {
         map[string]interface{}{"value_max": float64(10), "value_min": float64(0), "value_sum": float64(100), "value_count": float64(20)},
         time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
     )
-    datums := BuildMetricDatum(true, statisticMetric)
+    datums := BuildMetricDatum(true, false, statisticMetric)
     assert.Equal(1, len(datums), fmt.Sprintf("Valid point should create a Datum {value: %v}", statisticMetric))

     multiFieldsMetric, _ := metric.New(
@@ -98,7 +98,7 @@ func TestBuildMetricDatums(t *testing.T) {
         map[string]interface{}{"valueA": float64(10), "valueB": float64(0), "valueC": float64(100), "valueD": float64(20)},
         time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
     )
-    datums = BuildMetricDatum(true, multiFieldsMetric)
+    datums = BuildMetricDatum(true, false, multiFieldsMetric)
     assert.Equal(4, len(datums), fmt.Sprintf("Each field should create a Datum {value: %v}", multiFieldsMetric))

     multiStatisticMetric, _ := metric.New(
@@ -112,10 +112,27 @@ func TestBuildMetricDatums(t *testing.T) {
         },
         time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
     )
-    datums = BuildMetricDatum(true, multiStatisticMetric)
+    datums = BuildMetricDatum(true, false, multiStatisticMetric)
     assert.Equal(7, len(datums), fmt.Sprintf("Valid point should create a Datum {value: %v}", multiStatisticMetric))
 }

+func TestMetricDatumResolution(t *testing.T) {
+    const expectedStandardResolutionValue = int64(60)
+    const expectedHighResolutionValue = int64(1)
+
+    assert := assert.New(t)
+
+    metric := testutil.TestMetric(1)
+
+    standardResolutionDatum := BuildMetricDatum(false, false, metric)
+    actualStandardResolutionValue := *standardResolutionDatum[0].StorageResolution
+    assert.Equal(expectedStandardResolutionValue, actualStandardResolutionValue)
+
+    highResolutionDatum := BuildMetricDatum(false, true, metric)
+    actualHighResolutionValue := *highResolutionDatum[0].StorageResolution
+    assert.Equal(expectedHighResolutionValue, actualHighResolutionValue)
+}
+
 func TestBuildMetricDatums_SkipEmptyTags(t *testing.T) {
     input := testutil.MustMetric(
         "cpu",
@@ -129,7 +146,7 @@ func TestBuildMetricDatums_SkipEmptyTags(t *testing.T) {
         },
         time.Unix(0, 0),
     )

-    datums := BuildMetricDatum(true, input)
+    datums := BuildMetricDatum(true, false, input)
     require.Len(t, datums[0].Dimensions, 1)
 }