From e0dc1ef5bd1c424aadbcb0c58029fe67df32cb33 Mon Sep 17 00:00:00 2001 From: Stephen Kwong Date: Mon, 18 Jan 2016 11:39:14 -0800 Subject: [PATCH] Add Cloudwatch output closes #553 --- CHANGELOG.md | 1 + README.md | 3 +- plugins/outputs/all/all.go | 1 + plugins/outputs/cloudwatch/README.md | 33 +++ plugins/outputs/cloudwatch/cloudwatch.go | 236 ++++++++++++++++++ plugins/outputs/cloudwatch/cloudwatch_test.go | 88 +++++++ 6 files changed, 361 insertions(+), 1 deletion(-) create mode 100644 plugins/outputs/cloudwatch/README.md create mode 100644 plugins/outputs/cloudwatch/cloudwatch.go create mode 100644 plugins/outputs/cloudwatch/cloudwatch_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index eaf32638f..2cc1308b9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ - [#551](https://github.com/influxdata/telegraf/pull/551): Statsd UDP read packet size now defaults to 1500 bytes, and is configurable. - [#552](https://github.com/influxdata/telegraf/pull/552): Support for collection interval jittering. - [#484](https://github.com/influxdata/telegraf/issues/484): Include usage percent with procstat metrics. +- [#553](https://github.com/influxdata/telegraf/pull/553): Amazon CloudWatch output. thanks @skwong2! ### Bugfixes - [#506](https://github.com/influxdata/telegraf/pull/506): Ping input doesn't return response time metric when timeout. Thanks @titilambert! diff --git a/README.md b/README.md index 5a997d363..89d26faad 100644 --- a/README.md +++ b/README.md @@ -189,10 +189,11 @@ want to add support for another service or third-party API. * influxdb * amon * amqp +* aws kinesis +* aws cloudwatch * datadog * graphite * kafka -* amazon kinesis * librato * mqtt * nsq diff --git a/plugins/outputs/all/all.go b/plugins/outputs/all/all.go index e05d53acc..ac8357c90 100644 --- a/plugins/outputs/all/all.go +++ b/plugins/outputs/all/all.go @@ -3,6 +3,7 @@ package all import ( _ "github.com/influxdata/telegraf/plugins/outputs/amon" _ "github.com/influxdata/telegraf/plugins/outputs/amqp" + _ "github.com/influxdata/telegraf/plugins/outputs/cloudwatch" _ "github.com/influxdata/telegraf/plugins/outputs/datadog" _ "github.com/influxdata/telegraf/plugins/outputs/graphite" _ "github.com/influxdata/telegraf/plugins/outputs/influxdb" diff --git a/plugins/outputs/cloudwatch/README.md b/plugins/outputs/cloudwatch/README.md new file mode 100644 index 000000000..853d038c3 --- /dev/null +++ b/plugins/outputs/cloudwatch/README.md @@ -0,0 +1,33 @@ +## Amazon CloudWatch Output for Telegraf + +This plugin will send points to Amazon CloudWatch. + +## Amazon Authentication + +This plugin uses a credential chain for Authentication with the CloudWatch +API endpoint. In the following order the plugin will attempt to authenticate. +1. [IAMS Role](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html) +2. [Environment Variables](https://github.com/aws/aws-sdk-go/wiki/configuring-sdk) +3. [Shared Credentials](https://github.com/aws/aws-sdk-go/wiki/configuring-sdk) + +## Config + +For this output plugin to function correctly the following variables +must be configured. + +* region +* namespace + +### region + +The region is the Amazon region that you wish to connect to. +Examples include but are not limited to: +* us-west-1 +* us-west-2 +* us-east-1 +* ap-southeast-1 +* ap-southeast-2 + +### namespace + +The namespace used for AWS CloudWatch metrics. diff --git a/plugins/outputs/cloudwatch/cloudwatch.go b/plugins/outputs/cloudwatch/cloudwatch.go new file mode 100644 index 000000000..1e20836da --- /dev/null +++ b/plugins/outputs/cloudwatch/cloudwatch.go @@ -0,0 +1,236 @@ +package cloudwatch + +import ( + "log" + "math" + "sort" + "strings" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/credentials/ec2rolecreds" + "github.com/aws/aws-sdk-go/aws/ec2metadata" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/cloudwatch" + + "github.com/influxdata/influxdb/client/v2" + "github.com/influxdata/telegraf/plugins/outputs" +) + +type CloudWatch struct { + Region string // AWS Region + Namespace string // CloudWatch Metrics Namespace + svc *cloudwatch.CloudWatch +} + +var sampleConfig = ` + # Amazon REGION + region = 'us-east-1' + + # Namespace for the CloudWatch MetricDatums + namespace = 'InfluxData/Telegraf' +` + +func (c *CloudWatch) SampleConfig() string { + return sampleConfig +} + +func (c *CloudWatch) Description() string { + return "Configuration for AWS CloudWatch output." +} + +func (c *CloudWatch) Connect() error { + Config := &aws.Config{ + Region: aws.String(c.Region), + Credentials: credentials.NewChainCredentials( + []credentials.Provider{ + &ec2rolecreds.EC2RoleProvider{Client: ec2metadata.New(session.New())}, + &credentials.EnvProvider{}, + &credentials.SharedCredentialsProvider{}, + }), + } + + svc := cloudwatch.New(session.New(Config)) + + params := &cloudwatch.ListMetricsInput{ + Namespace: aws.String(c.Namespace), + } + + _, err := svc.ListMetrics(params) // Try a read-only call to test connection. + + if err != nil { + log.Printf("cloudwatch: Error in ListMetrics API call : %+v \n", err.Error()) + } + + c.svc = svc + + return err +} + +func (c *CloudWatch) Close() error { + return nil +} + +func (c *CloudWatch) Write(points []*client.Point) error { + for _, pt := range points { + err := c.WriteSinglePoint(pt) + if err != nil { + return err + } + } + + return nil +} + +// Write data for a single point. A point can have many fields and one field +// is equal to one MetricDatum. There is a limit on how many MetricDatums a +// request can have so we process one Point at a time. +func (c *CloudWatch) WriteSinglePoint(point *client.Point) error { + datums := BuildMetricDatum(point) + + const maxDatumsPerCall = 20 // PutMetricData only supports up to 20 data points per call + + for _, partition := range PartitionDatums(maxDatumsPerCall, datums) { + err := c.WriteToCloudWatch(partition) + + if err != nil { + return err + } + } + + return nil +} + +func (c *CloudWatch) WriteToCloudWatch(datums []*cloudwatch.MetricDatum) error { + params := &cloudwatch.PutMetricDataInput{ + MetricData: datums, + Namespace: aws.String(c.Namespace), + } + + _, err := c.svc.PutMetricData(params) + + if err != nil { + log.Printf("CloudWatch: Unable to write to CloudWatch : %+v \n", err.Error()) + } + + return err +} + +// Partition the MetricDatums into smaller slices of a max size so that are under the limit +// for the AWS API calls. +func PartitionDatums(size int, datums []*cloudwatch.MetricDatum) [][]*cloudwatch.MetricDatum { + + numberOfPartitions := len(datums) / size + if len(datums)%size != 0 { + numberOfPartitions += 1 + } + + partitions := make([][]*cloudwatch.MetricDatum, numberOfPartitions) + + for i := 0; i < numberOfPartitions; i++ { + start := size * i + end := size * (i + 1) + if end > len(datums) { + end = len(datums) + } + + partitions[i] = datums[start:end] + } + + return partitions +} + +// Make a MetricDatum for each field in a Point. Only fields with values that can be +// converted to float64 are supported. Non-supported fields are skipped. +func BuildMetricDatum(point *client.Point) []*cloudwatch.MetricDatum { + datums := make([]*cloudwatch.MetricDatum, len(point.Fields())) + i := 0 + + var value float64 + + for k, v := range point.Fields() { + switch t := v.(type) { + case int: + value = float64(t) + case int32: + value = float64(t) + case int64: + value = float64(t) + case float64: + value = t + case bool: + if t { + value = 1 + } else { + value = 0 + } + case time.Time: + value = float64(t.Unix()) + default: + // Skip unsupported type. + datums = datums[:len(datums)-1] + continue + } + + datums[i] = &cloudwatch.MetricDatum{ + MetricName: aws.String(strings.Join([]string{point.Name(), k}, "_")), + Value: aws.Float64(value), + Dimensions: BuildDimensions(point.Tags()), + Timestamp: aws.Time(point.Time()), + } + + i += 1 + } + + return datums +} + +// Make a list of Dimensions by using a Point's tags. CloudWatch supports up to +// 10 dimensions per metric so we only keep up to the first 10 alphabetically. +// This always includes the "host" tag if it exists. +func BuildDimensions(ptTags map[string]string) []*cloudwatch.Dimension { + + const MaxDimensions = 10 + dimensions := make([]*cloudwatch.Dimension, int(math.Min(float64(len(ptTags)), MaxDimensions))) + + i := 0 + + // This is pretty ugly but we always want to include the "host" tag if it exists. + if host, ok := ptTags["host"]; ok { + dimensions[i] = &cloudwatch.Dimension{ + Name: aws.String("host"), + Value: aws.String(host), + } + i += 1 + } + + var keys []string + for k := range ptTags { + if k != "host" { + keys = append(keys, k) + } + } + sort.Strings(keys) + + for _, k := range keys { + if i >= MaxDimensions { + break + } + + dimensions[i] = &cloudwatch.Dimension{ + Name: aws.String(k), + Value: aws.String(ptTags[k]), + } + + i += 1 + } + + return dimensions +} + +func init() { + outputs.Add("cloudwatch", func() outputs.Output { + return &CloudWatch{} + }) +} diff --git a/plugins/outputs/cloudwatch/cloudwatch_test.go b/plugins/outputs/cloudwatch/cloudwatch_test.go new file mode 100644 index 000000000..2041e14fd --- /dev/null +++ b/plugins/outputs/cloudwatch/cloudwatch_test.go @@ -0,0 +1,88 @@ +package cloudwatch + +import ( + "sort" + "testing" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/cloudwatch" + + "github.com/influxdata/influxdb/client/v2" + "github.com/influxdata/telegraf/testutil" + + "github.com/stretchr/testify/assert" +) + +// Test that each tag becomes one dimension +func TestBuildDimensions(t *testing.T) { + const MaxDimensions = 10 + + assert := assert.New(t) + + testPoint := testutil.TestPoint(1) + dimensions := BuildDimensions(testPoint.Tags()) + + tagKeys := make([]string, len(testPoint.Tags())) + i := 0 + for k, _ := range testPoint.Tags() { + tagKeys[i] = k + i += 1 + } + + sort.Strings(tagKeys) + + if len(testPoint.Tags()) >= MaxDimensions { + assert.Equal(MaxDimensions, len(dimensions), "Number of dimensions should be less than MaxDimensions") + } else { + assert.Equal(len(testPoint.Tags()), len(dimensions), "Number of dimensions should be equal to number of tags") + } + + for i, key := range tagKeys { + if i >= 10 { + break + } + assert.Equal(key, *dimensions[i].Name, "Key should be equal") + assert.Equal(testPoint.Tags()[key], *dimensions[i].Value, "Value should be equal") + } +} + +// Test that points with valid values have a MetricDatum created where as non valid do not. +// Skips "time.Time" type as something is converting the value to string. +func TestBuildMetricDatums(t *testing.T) { + assert := assert.New(t) + + validPoints := []*client.Point{ + testutil.TestPoint(1), + testutil.TestPoint(int32(1)), + testutil.TestPoint(int64(1)), + testutil.TestPoint(float64(1)), + testutil.TestPoint(true), + } + + for _, point := range validPoints { + datums := BuildMetricDatum(point) + assert.Equal(1, len(datums), "Valid type should create a Datum") + } + + nonValidPoint := testutil.TestPoint("Foo") + + assert.Equal(0, len(BuildMetricDatum(nonValidPoint)), "Invalid type should not create a Datum") +} + +func TestPartitionDatums(t *testing.T) { + + assert := assert.New(t) + + testDatum := cloudwatch.MetricDatum{ + MetricName: aws.String("Foo"), + Value: aws.Float64(1), + } + + oneDatum := []*cloudwatch.MetricDatum{&testDatum} + twoDatum := []*cloudwatch.MetricDatum{&testDatum, &testDatum} + threeDatum := []*cloudwatch.MetricDatum{&testDatum, &testDatum, &testDatum} + + assert.Equal([][]*cloudwatch.MetricDatum{oneDatum}, PartitionDatums(2, oneDatum)) + assert.Equal([][]*cloudwatch.MetricDatum{twoDatum}, PartitionDatums(2, twoDatum)) + assert.Equal([][]*cloudwatch.MetricDatum{twoDatum, oneDatum}, PartitionDatums(2, threeDatum)) +}