Support StatisticValues in cloudwatch output plugin (#4364)

This commit is contained in:
david7482 2018-08-02 06:43:17 +08:00 committed by Greg
parent 66528354a5
commit 199841a820
3 changed files with 286 additions and 55 deletions

View File

@ -36,3 +36,13 @@ Examples include but are not limited to:
### namespace ### namespace
The namespace used for AWS CloudWatch metrics. The namespace used for AWS CloudWatch metrics.
### write_statistics
If you have a large amount of metrics, you should consider to send statistic
values instead of raw metrics which could not only improve performance but
also save AWS API cost. If enable this flag, this plugin would parse the required
[CloudWatch statistic fields](https://docs.aws.amazon.com/sdk-for-go/api/service/cloudwatch/#StatisticSet)
(count, min, max, and sum) and send them to CloudWatch. You could use `basicstats`
aggregator to calculate those fields. If not all statistic fields are available,
all fields would still be sent as raw metrics.

View File

@ -28,6 +28,128 @@ type CloudWatch struct {
Namespace string `toml:"namespace"` // CloudWatch Metrics Namespace Namespace string `toml:"namespace"` // CloudWatch Metrics Namespace
svc *cloudwatch.CloudWatch svc *cloudwatch.CloudWatch
WriteStatistics bool `toml:"write_statistics"`
}
type statisticType int
const (
statisticTypeNone statisticType = iota
statisticTypeMax
statisticTypeMin
statisticTypeSum
statisticTypeCount
)
type cloudwatchField interface {
addValue(sType statisticType, value float64)
buildDatum() []*cloudwatch.MetricDatum
}
type statisticField struct {
metricName string
fieldName string
tags map[string]string
values map[statisticType]float64
timestamp time.Time
}
func (f *statisticField) addValue(sType statisticType, value float64) {
if sType != statisticTypeNone {
f.values[sType] = value
}
}
func (f *statisticField) buildDatum() []*cloudwatch.MetricDatum {
var datums []*cloudwatch.MetricDatum
if f.hasAllFields() {
// If we have all required fields, we build datum with StatisticValues
min, _ := f.values[statisticTypeMin]
max, _ := f.values[statisticTypeMax]
sum, _ := f.values[statisticTypeSum]
count, _ := f.values[statisticTypeCount]
datum := &cloudwatch.MetricDatum{
MetricName: aws.String(strings.Join([]string{f.metricName, f.fieldName}, "_")),
Dimensions: BuildDimensions(f.tags),
Timestamp: aws.Time(f.timestamp),
StatisticValues: &cloudwatch.StatisticSet{
Minimum: aws.Float64(min),
Maximum: aws.Float64(max),
Sum: aws.Float64(sum),
SampleCount: aws.Float64(count),
},
}
datums = append(datums, datum)
} else {
// If we don't have all required fields, we build each field as independent datum
for sType, value := range f.values {
datum := &cloudwatch.MetricDatum{
Value: aws.Float64(value),
Dimensions: BuildDimensions(f.tags),
Timestamp: aws.Time(f.timestamp),
}
switch sType {
case statisticTypeMin:
datum.MetricName = aws.String(strings.Join([]string{f.metricName, f.fieldName, "min"}, "_"))
case statisticTypeMax:
datum.MetricName = aws.String(strings.Join([]string{f.metricName, f.fieldName, "max"}, "_"))
case statisticTypeSum:
datum.MetricName = aws.String(strings.Join([]string{f.metricName, f.fieldName, "sum"}, "_"))
case statisticTypeCount:
datum.MetricName = aws.String(strings.Join([]string{f.metricName, f.fieldName, "count"}, "_"))
default:
// should not be here
continue
}
datums = append(datums, datum)
}
}
return datums
}
func (f *statisticField) hasAllFields() bool {
_, hasMin := f.values[statisticTypeMin]
_, hasMax := f.values[statisticTypeMax]
_, hasSum := f.values[statisticTypeSum]
_, hasCount := f.values[statisticTypeCount]
return hasMin && hasMax && hasSum && hasCount
}
type valueField struct {
metricName string
fieldName string
tags map[string]string
value float64
timestamp time.Time
}
func (f *valueField) addValue(sType statisticType, value float64) {
if sType == statisticTypeNone {
f.value = value
}
}
func (f *valueField) buildDatum() []*cloudwatch.MetricDatum {
return []*cloudwatch.MetricDatum{
{
MetricName: aws.String(strings.Join([]string{f.metricName, f.fieldName}, "_")),
Value: aws.Float64(f.value),
Dimensions: BuildDimensions(f.tags),
Timestamp: aws.Time(f.timestamp),
},
}
} }
var sampleConfig = ` var sampleConfig = `
@ -57,6 +179,14 @@ var sampleConfig = `
## Namespace for the CloudWatch MetricDatums ## Namespace for the CloudWatch MetricDatums
namespace = "InfluxData/Telegraf" namespace = "InfluxData/Telegraf"
## If you have a large amount of metrics, you should consider to send statistic
## values instead of raw metrics which could not only improve performance but
## also save AWS API cost. If enable this flag, this plugin would parse the required
## CloudWatch statistic fields (count, min, max, and sum) and send them to CloudWatch.
## You could use basicstats aggregator to calculate those fields. If not all statistic
## fields are available, all fields would still be sent as raw metrics.
# write_statistics = false
` `
func (c *CloudWatch) SampleConfig() string { func (c *CloudWatch) SampleConfig() string {
@ -104,7 +234,7 @@ func (c *CloudWatch) Write(metrics []telegraf.Metric) error {
var datums []*cloudwatch.MetricDatum var datums []*cloudwatch.MetricDatum
for _, m := range metrics { for _, m := range metrics {
d := BuildMetricDatum(m) d := BuildMetricDatum(c.WriteStatistics, m)
datums = append(datums, d...) datums = append(datums, d...)
} }
@ -159,67 +289,58 @@ func PartitionDatums(size int, datums []*cloudwatch.MetricDatum) [][]*cloudwatch
return partitions return partitions
} }
// Make a MetricDatum for each field in a Point. Only fields with values that can be // Make a MetricDatum from telegraf.Metric. It would check if all required fields of
// converted to float64 are supported. Non-supported fields are skipped. // cloudwatch.StatisticSet are available. If so, it would build MetricDatum from statistic values.
func BuildMetricDatum(point telegraf.Metric) []*cloudwatch.MetricDatum { // Otherwise, fields would still been built independently.
datums := make([]*cloudwatch.MetricDatum, len(point.Fields())) func BuildMetricDatum(buildStatistic bool, point telegraf.Metric) []*cloudwatch.MetricDatum {
i := 0
var value float64 fields := make(map[string]cloudwatchField)
for k, v := range point.Fields() { for k, v := range point.Fields() {
switch t := v.(type) {
case int: val, ok := convert(v)
value = float64(t) if !ok {
case int32: // Only fields with values that can be converted to float64 (and within CloudWatch boundary) are supported.
value = float64(t) // Non-supported fields are skipped.
case int64: continue
value = float64(t) }
case uint64:
value = float64(t) sType, fieldName := getStatisticType(k)
case float64:
value = t // If statistic metric is not enabled or non-statistic type, just take current field as a value field.
case bool: if !buildStatistic || sType == statisticTypeNone {
if t { fields[k] = &valueField{
value = 1 metricName: point.Name(),
fieldName: k,
tags: point.Tags(),
timestamp: point.Time(),
value: val,
}
continue
}
// Otherwise, it shall be a statistic field.
if _, ok := fields[fieldName]; !ok {
// Hit an uncached field, create statisticField for first time
fields[fieldName] = &statisticField{
metricName: point.Name(),
fieldName: fieldName,
tags: point.Tags(),
timestamp: point.Time(),
values: map[statisticType]float64{
sType: val,
},
}
} else { } else {
value = 0 // Add new statistic value to this field
fields[fieldName].addValue(sType, val)
} }
case time.Time:
value = float64(t.Unix())
default:
// Skip unsupported type.
datums = datums[:len(datums)-1]
continue
} }
// Do CloudWatch boundary checking var datums []*cloudwatch.MetricDatum
// Constraints at: http://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_MetricDatum.html for _, f := range fields {
if math.IsNaN(value) { d := f.buildDatum()
datums = datums[:len(datums)-1] datums = append(datums, d...)
continue
}
if math.IsInf(value, 0) {
datums = datums[:len(datums)-1]
continue
}
if value > 0 && value < float64(8.515920e-109) {
datums = datums[:len(datums)-1]
continue
}
if value > float64(1.174271e+108) {
datums = datums[:len(datums)-1]
continue
}
datums[i] = &cloudwatch.MetricDatum{
MetricName: aws.String(strings.Join([]string{point.Name(), k}, "_")),
Value: aws.Float64(value),
Dimensions: BuildDimensions(point.Tags()),
Timestamp: aws.Time(point.Time()),
}
i += 1
} }
return datums return datums
@ -268,6 +389,72 @@ func BuildDimensions(mTags map[string]string) []*cloudwatch.Dimension {
return dimensions return dimensions
} }
func getStatisticType(name string) (sType statisticType, fieldName string) {
switch {
case strings.HasSuffix(name, "_max"):
sType = statisticTypeMax
fieldName = strings.TrimSuffix(name, "_max")
case strings.HasSuffix(name, "_min"):
sType = statisticTypeMin
fieldName = strings.TrimSuffix(name, "_min")
case strings.HasSuffix(name, "_sum"):
sType = statisticTypeSum
fieldName = strings.TrimSuffix(name, "_sum")
case strings.HasSuffix(name, "_count"):
sType = statisticTypeCount
fieldName = strings.TrimSuffix(name, "_count")
default:
sType = statisticTypeNone
fieldName = name
}
return
}
func convert(v interface{}) (value float64, ok bool) {
ok = true
switch t := v.(type) {
case int:
value = float64(t)
case int32:
value = float64(t)
case int64:
value = float64(t)
case uint64:
value = float64(t)
case float64:
value = t
case bool:
if t {
value = 1
} else {
value = 0
}
case time.Time:
value = float64(t.Unix())
default:
// Skip unsupported type.
ok = false
return
}
// Do CloudWatch boundary checking
// Constraints at: http://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_MetricDatum.html
switch {
case math.IsNaN(value):
return 0, false
case math.IsInf(value, 0):
return 0, false
case value > 0 && value < float64(8.515920e-109):
return 0, false
case value > float64(1.174271e+108):
return 0, false
}
return
}
func init() { func init() {
outputs.Add("cloudwatch", func() telegraf.Output { outputs.Add("cloudwatch", func() telegraf.Output {
return &CloudWatch{} return &CloudWatch{}

View File

@ -5,11 +5,13 @@ import (
"math" "math"
"sort" "sort"
"testing" "testing"
"time"
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/cloudwatch" "github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
@ -72,13 +74,45 @@ func TestBuildMetricDatums(t *testing.T) {
testutil.TestMetric(float64(1.174272e+108)), // largest should be 1.174271e+108 testutil.TestMetric(float64(1.174272e+108)), // largest should be 1.174271e+108
} }
for _, point := range validMetrics { for _, point := range validMetrics {
datums := BuildMetricDatum(point) datums := BuildMetricDatum(false, point)
assert.Equal(1, len(datums), fmt.Sprintf("Valid point should create a Datum {value: %v}", point)) assert.Equal(1, len(datums), fmt.Sprintf("Valid point should create a Datum {value: %v}", point))
} }
for _, point := range invalidMetrics { for _, point := range invalidMetrics {
datums := BuildMetricDatum(point) datums := BuildMetricDatum(false, point)
assert.Equal(0, len(datums), fmt.Sprintf("Valid point should not create a Datum {value: %v}", point)) assert.Equal(0, len(datums), fmt.Sprintf("Valid point should not create a Datum {value: %v}", point))
} }
statisticMetric, _ := metric.New(
"test1",
map[string]string{"tag1": "value1"},
map[string]interface{}{"value_max": float64(10), "value_min": float64(0), "value_sum": float64(100), "value_count": float64(20)},
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
)
datums := BuildMetricDatum(true, statisticMetric)
assert.Equal(1, len(datums), fmt.Sprintf("Valid point should create a Datum {value: %v}", statisticMetric))
multiFieldsMetric, _ := metric.New(
"test1",
map[string]string{"tag1": "value1"},
map[string]interface{}{"valueA": float64(10), "valueB": float64(0), "valueC": float64(100), "valueD": float64(20)},
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
)
datums = BuildMetricDatum(true, multiFieldsMetric)
assert.Equal(4, len(datums), fmt.Sprintf("Each field should create a Datum {value: %v}", multiFieldsMetric))
multiStatisticMetric, _ := metric.New(
"test1",
map[string]string{"tag1": "value1"},
map[string]interface{}{
"valueA_max": float64(10), "valueA_min": float64(0), "valueA_sum": float64(100), "valueA_count": float64(20),
"valueB_max": float64(10), "valueB_min": float64(0), "valueB_sum": float64(100), "valueB_count": float64(20),
"valueC_max": float64(10), "valueC_min": float64(0), "valueC_sum": float64(100),
"valueD": float64(10), "valueE": float64(0),
},
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
)
datums = BuildMetricDatum(true, multiStatisticMetric)
assert.Equal(7, len(datums), fmt.Sprintf("Valid point should create a Datum {value: %v}", multiStatisticMetric))
} }
func TestPartitionDatums(t *testing.T) { func TestPartitionDatums(t *testing.T) {