diff --git a/Godeps b/Godeps index 6a0a17df1..1fef5fd4b 100644 --- a/Godeps +++ b/Godeps @@ -2,7 +2,7 @@ github.com/Shopify/sarama 8aadb476e66ca998f2f6bb3c993e9a2daa3666b9 github.com/Sirupsen/logrus 219c8cb75c258c552e999735be6df753ffc7afdc github.com/aerospike/aerospike-client-go 7f3a312c3b2a60ac083ec6da296091c52c795c63 github.com/amir/raidman 53c1b967405155bfc8758557863bf2e14f814687 -github.com/aws/aws-sdk-go 13a12060f716145019378a10e2806c174356b857 +github.com/aws/aws-sdk-go 1355b456f6ba4a8453249e0dc7743f16ccac362d github.com/beorn7/perks 3ac7bf7a47d159a033b107610db8a1b6575507a4 github.com/cenkalti/backoff 4dc77674aceaabba2c7e3da25d4c823edfb73f99 github.com/couchbase/go-couchbase cb664315a324d87d19c879d9cc67fda6be8c2ac1 @@ -41,7 +41,6 @@ github.com/naoina/go-stringutil 6b638e95a32d0c1131db0e7fe83775cbea4a0d0b github.com/nats-io/nats ea8b4fd12ebb823073c0004b9f09ac8748f4f165 github.com/nats-io/nuid a5152d67cf63cbfb5d992a395458722a45194715 github.com/nsqio/go-nsq 0b80d6f05e15ca1930e0c5e1d540ed627e299980 -github.com/opencontainers/runc 89ab7f2ccc1e45ddf6485eaa802c35dcf321dfc8 github.com/prometheus/client_golang 18acf9993a863f4c4b40612e19cdd243e7c86831 github.com/prometheus/client_model fa8ad6fec33561be4280a8f0514318c79d7f6cb6 github.com/prometheus/common e8eabff8812b05acf522b45fdcd725a785188e37 diff --git a/README.md b/README.md index b11412065..8156b9250 100644 --- a/README.md +++ b/README.md @@ -232,6 +232,11 @@ Telegraf can also collect metrics via the following service plugins: ## Processor Plugins +* [aws_metadata](./plugins/processors/aws) + * [aws_metadata_ec2](./plugins/processors/aws/ec2) + * [aws_metadata_elb](./plugins/processors/aws/elb) + * [aws_metadata_rds](./plugins/processors/aws/rds) + * [aws_metadata_sqs](./plugins/processors/aws/sqs) * [printer](./plugins/processors/printer) ## Aggregator Plugins diff --git a/plugins/processors/all/all.go b/plugins/processors/all/all.go index 462298f6b..de14c7411 100644 --- a/plugins/processors/all/all.go +++ b/plugins/processors/all/all.go @@ -1,5 +1,9 @@ package all import ( + _ "github.com/influxdata/telegraf/plugins/processors/aws/ec2" + _ "github.com/influxdata/telegraf/plugins/processors/aws/elb" + _ "github.com/influxdata/telegraf/plugins/processors/aws/rds" + _ "github.com/influxdata/telegraf/plugins/processors/aws/sqs" _ "github.com/influxdata/telegraf/plugins/processors/printer" ) diff --git a/plugins/processors/aws/README.md b/plugins/processors/aws/README.md new file mode 100644 index 000000000..31629aaed --- /dev/null +++ b/plugins/processors/aws/README.md @@ -0,0 +1,55 @@ +# AWS Metadata Processor Plugins + +A series of plugins that extract additional metadata from AWS to annotate metrics. + +### Configuration: + +Each processor is scoped to a single AWS service (e.g. EC2, ELB, etc). + +The processor plugins each share a common configuration pattern for +configuring AWS credentials and basic processing information. + +```toml +## Common AWS credential configuration + +## Amazon Region (required) +region = "us-east-1" + +## Amazon Credentials +## Credentials are loaded in the following order +## 1) Assumed credentials via STS if role_arn is specified +## 2) explicit credentials from 'access_key' and 'secret_key' +## 3) shared profile from 'profile' +## 4) environment variables +## 5) shared credentials file +## 6) EC2 Instance Profile +#access_key = "" +#secret_key = "" +#token = "" +#role_arn = "" +#profile = "" +#shared_credential_file = "" + +## Common processing configuration + +## Specify the TTL for metadata lookups +#cache_ttl = "1h" + +## Specify the metric names to annotate with this processor +## By default the processor is configured to process the associated metric name from the Cloudwatch input plugin +#metric_names = [ "cloudwatch_aws_ec2" ] + +## Specify the metric tag which contains the AWS resource ID +## By default the plugin is configured to find the resource's ID in the tag created by the Cloudwatch input plugin +#id = "instance_id" + +## Plugin specific configuration +## Configure specific annotations available for this processor +``` + +### Processor Plugins: + +* [aws_metadata_ec2](./ec2) +* [aws_metadata_elb](./elb) +* [aws_metadata_rds](./rds) +* [aws_metadata_sqs](./sqs) diff --git a/plugins/processors/aws/ec2/README.md b/plugins/processors/aws/ec2/README.md new file mode 100644 index 000000000..f8ee84a64 --- /dev/null +++ b/plugins/processors/aws/ec2/README.md @@ -0,0 +1,40 @@ +# EC2 Metadata Processor Plugin + +The EC2 Metadata processor plugin appends additional metadata from AWS to metrics associated with EC2 instances. + +### Configuration: + +```toml +## Annotate metrics from the cloudwatch plugin +[[processors.aws_metadata_ec2]] + +## Specify the Amazon Region to operate in +region = "us-east-1" + +## Specify the TTL for metadata lookups +#cache_ttl = "1h" + +## Process metrics from a Cloudwatch input plugin configured for the AWS/EC2 namespace +## Default is "cloudwatch_aws_ec2" +#metric_names = ["cloudwatch_aws_ec2"] + +## Metric tag that contains the EC2 Instance ID +#id = "instance_id" + +## Annotate metrics with the EC2 Instance type +#instance_type = true + +## Annotate metrics with the AMI ID +#ami_id = true + +## Annotate metrics with EC2 Tags +#tags = [ "Name" ] +``` + +### Tags: + +The plugin applies the following tags to metrics when configured: + +* `instance_type` - the EC2 Instance Type for the instance +* `ami_id` - the AMI ID used by the instance +* Tags - for each configured tag name in the plugin, appends a tag if the EC2 Instance has a corresponding tag of that name diff --git a/plugins/processors/aws/ec2/ec2.go b/plugins/processors/aws/ec2/ec2.go new file mode 100644 index 000000000..c6a64e3de --- /dev/null +++ b/plugins/processors/aws/ec2/ec2.go @@ -0,0 +1,213 @@ +package ec2 + +import ( + "fmt" + "log" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/ec2" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + internalaws "github.com/influxdata/telegraf/internal/config/aws" + "github.com/influxdata/telegraf/plugins/processors" + "github.com/influxdata/telegraf/plugins/processors/aws/utils" +) + +type ( + EC2 struct { + Region string `toml:"region"` + AccessKey string `toml:"access_key"` + SecretKey string `toml:"secret_key"` + RoleARN string `toml:"role_arn"` + Profile string `toml:"profile"` + Filename string `toml:"shared_credential_file"` + Token string `toml:"token"` + + CacheTTL internal.Duration `toml:"cache_ttl"` + MetricNames []string `toml:"metric_names"` + Id string `toml:"id"` + InstanceType bool `toml:"instance_type"` + AmiId bool `toml:"ami_id"` + Tags []string `toml:"tags"` + + client EC2Client + } + + EC2Client interface { + DescribeInstances(input *ec2.DescribeInstancesInput) (*ec2.DescribeInstancesOutput, error) + } + + CachingEC2Client struct { + client EC2Client + ttl time.Duration + fetched time.Time + data map[string]*ec2.DescribeInstancesOutput + } +) + +func (e *CachingEC2Client) DescribeInstances(input *ec2.DescribeInstancesInput) (*ec2.DescribeInstancesOutput, error) { + id := *input.InstanceIds[0] + if e.data == nil { + e.data = map[string]*ec2.DescribeInstancesOutput{} + } + if e.fetched.IsZero() { + e.fetched = time.Now() + } + if time.Since(e.fetched) >= e.ttl { + e.data = map[string]*ec2.DescribeInstancesOutput{} + } + if _, ok := e.data[id]; !ok { + response, err := e.client.DescribeInstances(input) + if err != nil { + return nil, err + } + e.data[id] = response + } + return e.data[id], nil +} + +var sampleConfig = ` + ## Amazon Region + region = "us-east-1" + + ## Amazon Credentials + ## Credentials are loaded in the following order + ## 1) Assumed credentials via STS if role_arn is specified + ## 2) explicit credentials from 'access_key' and 'secret_key' + ## 3) shared profile from 'profile' + ## 4) environment variables + ## 5) shared credentials file + ## 6) EC2 Instance Profile + #access_key = "" + #secret_key = "" + #token = "" + #role_arn = "" + #profile = "" + #shared_credential_file = "" + + ## Specify the TTL for metadata lookups + #cache_ttl = "1h" + + ## Specify the metric names to annotate with EC2 metadata + ## By default is configured for "cloudwatch_aws_ec2", the default output from the Cloudwatch input plugin + #metric_names = [ "cloudwatch_aws_ec2" ] + + ## Specify the metric tag which contains the EC2 Instance ID + ## By default is configured for "instance_id", the default from Cloudwatch input plugin when using the InstanceId dimension + #id = "instance_id" + + ## Enable annotating metrics with the EC2 Instance Type + #instance_type = true + + ## Enable annotating metrics with the AMI ID + #ami_id = true + + ## Specify the EC2 Tags to append as metric tags + #tags = [ "Name" ] +` + +func (e *EC2) SampleConfig() string { + return sampleConfig +} + +func (e *EC2) Description() string { + return "Annotate metrics with AWS EC2 metadata" +} + +func (e *EC2) Apply(in ...telegraf.Metric) []telegraf.Metric { + if e.client == nil { + e.initEc2Client() + } + for _, metric := range in { + if utils.IsSelected(metric, e.MetricNames) { + e.annotate(metric) + } + } + return in +} + +func init() { + processors.Add("aws_metadata_ec2", func() telegraf.Processor { + return &EC2{ + CacheTTL: internal.Duration{Duration: time.Duration(1 * time.Hour)}, + MetricNames: []string{ + "cloudwatch_aws_ec2", + }, + Id: "instance_id", + InstanceType: true, + AmiId: true, + Tags: []string{"Name"}, + } + }) +} + +func (e *EC2) annotate(metric telegraf.Metric) { + e.annotateWithInstanceMetadata(metric) + e.annotateWithTags(metric) +} + +func (e *EC2) annotateWithInstanceMetadata(metric telegraf.Metric) { + instance, err := e.getInstanceForMetric(metric) + if err != nil { + log.Printf("E! %s", err) + return + } + if e.InstanceType { + metric.AddTag("instance_type", *instance.InstanceType) + } + if e.AmiId { + metric.AddTag("ami_id", *instance.ImageId) + } +} + +func (e *EC2) annotateWithTags(metric telegraf.Metric) { + instance, err := e.getInstanceForMetric(metric) + if err != nil { + log.Printf("E! %s", err) + return + } + for _, tag := range e.Tags { + for _, it := range instance.Tags { + if tag == *it.Key { + metric.AddTag(tag, *it.Value) + break + } + } + } +} + +func (e *EC2) getInstanceForMetric(metric telegraf.Metric) (*ec2.Instance, error) { + id := metric.Tags()[e.Id] + output, err := e.client.DescribeInstances(&ec2.DescribeInstancesInput{ + InstanceIds: []*string{ + aws.String(id), + }, + }) + if err != nil { + return nil, err + } + if len(output.Reservations) == 0 || len(output.Reservations[0].Instances) == 0 { + return nil, fmt.Errorf("Instance %s not found", id) + } + return output.Reservations[0].Instances[0], nil +} + +func (e *EC2) initEc2Client() error { + credentialConfig := &internalaws.CredentialConfig{ + Region: e.Region, + AccessKey: e.AccessKey, + SecretKey: e.SecretKey, + RoleARN: e.RoleARN, + Profile: e.Profile, + Filename: e.Filename, + Token: e.Token, + } + configProvider := credentialConfig.Credentials() + // e.client = ec2.New(configProvider) + e.client = &CachingEC2Client{ + client: ec2.New(configProvider), + ttl: e.CacheTTL.Duration, + } + return nil +} diff --git a/plugins/processors/aws/ec2/ec2_test.go b/plugins/processors/aws/ec2/ec2_test.go new file mode 100644 index 000000000..a0716dd71 --- /dev/null +++ b/plugins/processors/aws/ec2/ec2_test.go @@ -0,0 +1,139 @@ +package ec2 + +import ( + "testing" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/ec2" + "github.com/influxdata/telegraf/metric" + "github.com/stretchr/testify/assert" +) + +type mockEc2Client struct { + InstanceId string +} + +func (m *mockEc2Client) DescribeInstances(input *ec2.DescribeInstancesInput) (*ec2.DescribeInstancesOutput, error) { + reservations := []*ec2.Reservation{} + if *input.InstanceIds[0] == m.InstanceId { + reservations = append(reservations, &ec2.Reservation{ + Instances: []*ec2.Instance{ + &ec2.Instance{ + InstanceType: aws.String("t2.micro"), + ImageId: aws.String("ami-12345"), + Tags: []*ec2.Tag{ + &ec2.Tag{ + Key: aws.String("Environment"), + Value: aws.String("acc-test"), + }, + &ec2.Tag{ + Key: aws.String("not-included"), + Value: aws.String("true"), + }, + }, + }, + }, + }) + } + return &ec2.DescribeInstancesOutput{ + Reservations: reservations, + }, nil +} + +func cache(client EC2Client, ttl time.Duration) EC2Client { + return &CachingEC2Client{ + client: client, + ttl: ttl, + } +} + +func TestProcess_basic(t *testing.T) { + p := &EC2{ + MetricNames: []string{ + "cloudwatch_aws_ec2", + }, + Id: "instance_id", + InstanceType: true, + AmiId: true, + Tags: []string{"Environment"}, + client: cache(&mockEc2Client{ + InstanceId: "i-123abc", + }, time.Duration(15*time.Minute)), + } + metric, _ := metric.New("cloudwatch_aws_ec2", + map[string]string{ + "instance_id": "i-123abc", + }, + map[string]interface{}{ + "count": 1, + }, + time.Now()) + processedMetrics := p.Apply(metric) + assert.Equal(t, 1, len(processedMetrics)) + + pm := processedMetrics[0] + assert.Equal(t, "t2.micro", pm.Tags()["instance_type"]) + assert.Equal(t, "ami-12345", pm.Tags()["ami_id"]) + assert.Equal(t, "acc-test", pm.Tags()["Environment"]) +} + +func TestProcess_missingTag(t *testing.T) { + p := &EC2{ + MetricNames: []string{ + "cloudwatch_aws_ec2", + }, + Id: "instance_id", + InstanceType: false, + AmiId: false, + Tags: []string{"Name"}, + client: cache(&mockEc2Client{ + InstanceId: "i-123abc", + }, time.Duration(15*time.Minute)), + } + metric, _ := metric.New("cloudwatch_aws_ec2", + map[string]string{ + "instance_id": "i-123abc", + }, + map[string]interface{}{ + "count": 1, + }, + time.Now()) + processedMetrics := p.Apply(metric) + assert.Equal(t, 1, len(processedMetrics)) + + pm := processedMetrics[0] + assert.Equal(t, "", pm.Tags()["instance_type"]) + assert.Equal(t, "", pm.Tags()["ami_id"]) + assert.Equal(t, "", pm.Tags()["Name"]) +} + +func TestProcess_missingInstance(t *testing.T) { + p := &EC2{ + MetricNames: []string{ + "cloudwatch_aws_ec2", + }, + Id: "instance_id", + InstanceType: true, + AmiId: true, + Tags: []string{"Environment"}, + client: cache(&mockEc2Client{ + InstanceId: "i-xyz987", + }, time.Duration(15*time.Minute)), + } + metric, _ := metric.New("cloudwatch_aws_ec2", + map[string]string{ + "instance_id": "i-123abc", + }, + map[string]interface{}{ + "count": 1, + }, + time.Now()) + processedMetrics := p.Apply(metric) + assert.Equal(t, 1, len(processedMetrics)) + + pm := processedMetrics[0] + assert.Equal(t, "", pm.Tags()["instance_type"]) + assert.Equal(t, "", pm.Tags()["ami_id"]) + assert.Equal(t, "", pm.Tags()["Environment"]) +} diff --git a/plugins/processors/aws/elb/README.md b/plugins/processors/aws/elb/README.md new file mode 100644 index 000000000..dbab30b59 --- /dev/null +++ b/plugins/processors/aws/elb/README.md @@ -0,0 +1,32 @@ +# ELB Metadata Processor Plugin + +The ELB Metadata processor plugin appends additional metadata from AWS to metrics associated with Elastic Load Balancers. + +### Configuration: + +```toml +## Annotate metrics from the cloudwatch plugin +[[processors.aws_metadata_elb]] + +## Specify the Amazon Region to operate in +region = "us-east-1" + +## Specify the TTL for metadata lookups +#cache_ttl = "1h" + +## Process metrics from a Cloudwatch input plugin configured for the AWS/ELB namespace +## Default is "cloudwatch_aws_elb" +#metric_names = ["cloudwatch_aws_elb"] + +## Metric tag that contains the Load Balancer Name +#id = "load_balancer_name" + +## Annotate metrics with ELB Tags +#tags = [ "Name" ] +``` + +### Tags: + +The plugin applies the following tags to metrics when configured: + +* Tags - for each configured tag name in the plugin, appends a tag if the ELB has a corresponding tag of that name diff --git a/plugins/processors/aws/elb/elb.go b/plugins/processors/aws/elb/elb.go new file mode 100644 index 000000000..ad513c433 --- /dev/null +++ b/plugins/processors/aws/elb/elb.go @@ -0,0 +1,187 @@ +package elb + +import ( + "fmt" + "log" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/elb" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + internalaws "github.com/influxdata/telegraf/internal/config/aws" + "github.com/influxdata/telegraf/plugins/processors" + "github.com/influxdata/telegraf/plugins/processors/aws/utils" +) + +type ( + ELB struct { + Region string `toml:"region"` + AccessKey string `toml:"access_key"` + SecretKey string `toml:"secret_key"` + RoleARN string `toml:"role_arn"` + Profile string `toml:"profile"` + Filename string `toml:"shared_credential_file"` + Token string `toml:"token"` + + CacheTTL internal.Duration `toml:"cache_ttl"` + MetricNames []string `toml:"metric_names"` + Id string `toml:"id"` + Tags []string `toml:"tags"` + + client ELBClient + } + + ELBClient interface { + DescribeTags(input *elb.DescribeTagsInput) (*elb.DescribeTagsOutput, error) + } + + CachingELBClient struct { + client ELBClient + ttl time.Duration + fetched time.Time + data map[string]*elb.DescribeTagsOutput + } +) + +func (e *CachingELBClient) DescribeTags(input *elb.DescribeTagsInput) (*elb.DescribeTagsOutput, error) { + id := *input.LoadBalancerNames[0] + if e.data == nil { + e.data = map[string]*elb.DescribeTagsOutput{} + } + if e.fetched.IsZero() { + e.fetched = time.Now() + } + if time.Since(e.fetched) >= e.ttl { + e.data = map[string]*elb.DescribeTagsOutput{} + } + if _, ok := e.data[id]; !ok { + response, err := e.client.DescribeTags(input) + if err != nil { + return nil, err + } + e.data[id] = response + } + return e.data[id], nil +} + +var sampleConfig = ` + ## Amazon Region + region = "us-east-1" + + ## Amazon Credentials + ## Credentials are loaded in the following order + ## 1) Assumed credentials via STS if role_arn is specified + ## 2) explicit credentials from 'access_key' and 'secret_key' + ## 3) shared profile from 'profile' + ## 4) environment variables + ## 5) shared credentials file + ## 6) EC2 Instance Profile + #access_key = "" + #secret_key = "" + #token = "" + #role_arn = "" + #profile = "" + #shared_credential_file = "" + + ## Specify the TTL for metadata lookups + #cache_ttl = "1h" + + ## Specify the metric names to annotate with ELB metadata + ## By default is configured for "cloudwatch_aws_elb", the default output from the Cloudwatch input plugin + #metric_names = [ "cloudwatch_aws_elb" ] + + ## Specify the metric tag which contains the ELB Name + ## By default is configured for "load_balancer_name", the default from Cloudwatch input plugin when using the LoadBalancerName dimension + #id = "load_balancer_name" + + ## Specify the ELB Tags to append as metric tags + #tags = [ "Name" ] +` + +func (e *ELB) SampleConfig() string { + return sampleConfig +} + +func (e *ELB) Description() string { + return "Annotate metrics with AWS ELB metadata" +} + +func (e *ELB) Apply(in ...telegraf.Metric) []telegraf.Metric { + if e.client == nil { + e.initElbClient() + } + for _, metric := range in { + if utils.IsSelected(metric, e.MetricNames) { + e.annotate(metric) + } + } + return in +} + +func init() { + processors.Add("aws_metadata_elb", func() telegraf.Processor { + return &ELB{ + CacheTTL: internal.Duration{Duration: time.Duration(1 * time.Hour)}, + MetricNames: []string{ + "cloudwatch_aws_elb", + }, + Id: "load_balancer_name", + Tags: []string{"Name"}, + } + }) +} + +func (e *ELB) annotate(metric telegraf.Metric) { + e.annotateWithTags(metric) +} + +func (e *ELB) annotateWithTags(metric telegraf.Metric) { + tags, err := e.getTagsForLoadBalancer(metric) + if err != nil { + log.Printf("E! %s", err) + return + } + for _, tag := range e.Tags { + for _, it := range tags { + if tag == *it.Key { + metric.AddTag(tag, *it.Value) + break + } + } + } +} + +func (e *ELB) getTagsForLoadBalancer(metric telegraf.Metric) ([]*elb.Tag, error) { + name := metric.Tags()[e.Id] + output, err := e.client.DescribeTags(&elb.DescribeTagsInput{ + LoadBalancerNames: []*string{ + aws.String(name), + }, + }) + if err != nil { + return nil, err + } + if len(output.TagDescriptions) == 0 { + return nil, fmt.Errorf("ELB %s not found", name) + } + return output.TagDescriptions[0].Tags, nil +} + +func (e *ELB) initElbClient() error { + credentialConfig := &internalaws.CredentialConfig{ + Region: e.Region, + AccessKey: e.AccessKey, + SecretKey: e.SecretKey, + RoleARN: e.RoleARN, + Profile: e.Profile, + Filename: e.Filename, + Token: e.Token, + } + configProvider := credentialConfig.Credentials() + e.client = &CachingELBClient{ + client: elb.New(configProvider), + ttl: e.CacheTTL.Duration, + } + return nil +} diff --git a/plugins/processors/aws/elb/elb_test.go b/plugins/processors/aws/elb/elb_test.go new file mode 100644 index 000000000..8d8628b82 --- /dev/null +++ b/plugins/processors/aws/elb/elb_test.go @@ -0,0 +1,118 @@ +package elb + +import ( + "testing" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/elb" + "github.com/influxdata/telegraf/metric" + "github.com/stretchr/testify/assert" +) + +type mockElbClient struct { + Name string +} + +func (m *mockElbClient) DescribeTags(input *elb.DescribeTagsInput) (*elb.DescribeTagsOutput, error) { + descriptions := []*elb.TagDescription{} + if *input.LoadBalancerNames[0] == m.Name { + descriptions = append(descriptions, &elb.TagDescription{ + LoadBalancerName: &m.Name, + Tags: []*elb.Tag{ + &elb.Tag{ + Key: aws.String("Environment"), + Value: aws.String("acc-test"), + }, + }, + }) + } + return &elb.DescribeTagsOutput{ + TagDescriptions: descriptions, + }, nil +} + +func cache(client ELBClient, ttl time.Duration) ELBClient { + return &CachingELBClient{ + client: client, + ttl: ttl, + } +} + +func TestProcess_basic(t *testing.T) { + p := &ELB{ + MetricNames: []string{ + "cloudwatch_aws_elb", + }, + Id: "load_balancer_name", + Tags: []string{"Environment"}, + client: cache(&mockElbClient{ + Name: "acc-test-lb", + }, time.Duration(15*time.Minute)), + } + metric, _ := metric.New("cloudwatch_aws_elb", + map[string]string{ + "load_balancer_name": "acc-test-lb", + }, + map[string]interface{}{ + "count": 1, + }, + time.Now()) + processedMetrics := p.Apply(metric) + assert.Equal(t, 1, len(processedMetrics)) + + pm := processedMetrics[0] + assert.Equal(t, "acc-test", pm.Tags()["Environment"]) +} + +func TestProcess_missingTag(t *testing.T) { + p := &ELB{ + MetricNames: []string{ + "cloudwatch_aws_elb", + }, + Id: "load_balancer_name", + Tags: []string{"Name"}, + client: cache(&mockElbClient{ + Name: "acc-test-lb", + }, time.Duration(15*time.Minute)), + } + metric, _ := metric.New("cloudwatch_aws_elb", + map[string]string{ + "load_balancer_name": "acc-test-lb", + }, + map[string]interface{}{ + "count": 1, + }, + time.Now()) + processedMetrics := p.Apply(metric) + assert.Equal(t, 1, len(processedMetrics)) + + pm := processedMetrics[0] + assert.Equal(t, "", pm.Tags()["Name"]) +} + +func TestProcess_missingInstance(t *testing.T) { + p := &ELB{ + MetricNames: []string{ + "cloudwatch_aws_elb", + }, + Id: "load_balancer_name", + Tags: []string{"Environment"}, + client: cache(&mockElbClient{ + Name: "acc-test-lb", + }, time.Duration(15*time.Minute)), + } + metric, _ := metric.New("cloudwatch_aws_elb", + map[string]string{ + "load_balancer_name": "unknown-lb", + }, + map[string]interface{}{ + "count": 1, + }, + time.Now()) + processedMetrics := p.Apply(metric) + assert.Equal(t, 1, len(processedMetrics)) + + pm := processedMetrics[0] + assert.Equal(t, "", pm.Tags()["Environment"]) +} diff --git a/plugins/processors/aws/rds/README.md b/plugins/processors/aws/rds/README.md new file mode 100644 index 000000000..10ebfeb7a --- /dev/null +++ b/plugins/processors/aws/rds/README.md @@ -0,0 +1,40 @@ +# RDS Metadata Processor Plugin + +The RDS Metadata processor plugin appends additional metadata from AWS to metrics associated with RDS instances. + +### Configuration: + +```toml +## Annotate metrics from the cloudwatch plugin +[[processors.aws_metadata_rds]] + +## Specify the Amazon Region to operate in +region = "us-east-1" + +## Specify the TTL for metadata lookups +#cache_ttl = "1h" + +## Process metrics from a Cloudwatch input plugin configured for the AWS/RDS namespace +## Default is "cloudwatch_aws_rds" +#metric_names = ["cloudwatch_aws_rds"] + +## Metric tag that contains the RDS DB Instance Identifier +#id = "db_instance_identifier" + +## Annotate metrics with the RDS engine type +#engine = true + +## Annotate metrics with the engine version +#engine_version = true + +## Annotate metrics with RDS Tags +#tags = [ "Name" ] +``` + +### Tags: + +The plugin applies the following tags to metrics when configured: + +* `engine` - the RDS Engine type for the DB instance +* `engine_Version` - the RDS engine version for the DB Instance +* Tags - for each configured tag name in the plugin, appends a tag if the RDS Instance has a corresponding tag of that name diff --git a/plugins/processors/aws/rds/rds.go b/plugins/processors/aws/rds/rds.go new file mode 100644 index 000000000..9d0734a59 --- /dev/null +++ b/plugins/processors/aws/rds/rds.go @@ -0,0 +1,246 @@ +package rds + +import ( + "fmt" + "log" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/rds" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + internalaws "github.com/influxdata/telegraf/internal/config/aws" + "github.com/influxdata/telegraf/plugins/processors" + "github.com/influxdata/telegraf/plugins/processors/aws/utils" +) + +type ( + RDS struct { + Region string `toml:"region"` + AccessKey string `toml:"access_key"` + SecretKey string `toml:"secret_key"` + RoleARN string `toml:"role_arn"` + Profile string `toml:"profile"` + Filename string `toml:"shared_credential_file"` + Token string `toml:"token"` + + CacheTTL internal.Duration `toml:"cache_ttl"` + MetricNames []string `toml:"metric_names"` + Id string `toml:"id"` + InstanceType bool `toml:"instance_type"` + Engine bool `toml:"engine"` + EngineVersion bool `toml:"engine_version"` + Tags []string `toml:"tags"` + + client RDSClient + } + + RDSClient interface { + DescribeDBInstances(input *rds.DescribeDBInstancesInput) (*rds.DescribeDBInstancesOutput, error) + ListTagsForResource(input *rds.ListTagsForResourceInput) (*rds.ListTagsForResourceOutput, error) + } + + CachingRDSClient struct { + client RDSClient + ttl time.Duration + fetched time.Time + + instanceData map[string]*rds.DescribeDBInstancesOutput + tagData map[string]*rds.ListTagsForResourceOutput + } +) + +func (e *CachingRDSClient) DescribeDBInstances(input *rds.DescribeDBInstancesInput) (*rds.DescribeDBInstancesOutput, error) { + id := *input.DBInstanceIdentifier + if e.instanceData == nil { + e.instanceData = map[string]*rds.DescribeDBInstancesOutput{} + } + if e.fetched.IsZero() { + e.fetched = time.Now() + } + if time.Since(e.fetched) >= e.ttl { + e.instanceData = map[string]*rds.DescribeDBInstancesOutput{} + } + if _, ok := e.instanceData[id]; !ok { + response, err := e.client.DescribeDBInstances(input) + if err != nil { + return nil, err + } + e.instanceData[id] = response + } + return e.instanceData[id], nil +} + +func (e *CachingRDSClient) ListTagsForResource(input *rds.ListTagsForResourceInput) (*rds.ListTagsForResourceOutput, error) { + id := *input.ResourceName + if e.tagData == nil { + e.tagData = map[string]*rds.ListTagsForResourceOutput{} + } + if e.fetched.IsZero() { + e.fetched = time.Now() + } + if time.Since(e.fetched) >= e.ttl { + e.tagData = map[string]*rds.ListTagsForResourceOutput{} + } + if _, ok := e.tagData[id]; !ok { + response, err := e.client.ListTagsForResource(input) + if err != nil { + return nil, err + } + e.tagData[id] = response + } + return e.tagData[id], nil +} + +var sampleConfig = ` + ## Amazon Region + region = "us-east-1" + + ## Amazon Credentials + ## Credentials are loaded in the following order + ## 1) Assumed credentials via STS if role_arn is specified + ## 2) explicit credentials from 'access_key' and 'secret_key' + ## 3) shared profile from 'profile' + ## 4) environment variables + ## 5) shared credentials file + ## 6) EC2 Instance Profile + #access_key = "" + #secret_key = "" + #token = "" + #role_arn = "" + #profile = "" + #shared_credential_file = "" + + ## Specify the TTL for metadata lookups + #cache_ttl = "1h" + + ## Specify the metric names to annotate with RDS metadata + ## By default is configured for "cloudwatch_aws_rds", the default output from the Cloudwatch input plugin + #metric_names = [ "cloudwatch_aws_rds" ] + + ## Specify the metric tag which contains the RDS DB Instance Identifier + ## By default is configured for "db_instance_identifier", the default from Cloudwatch input plugin when using the RDS dimension + #id = "db_instance_identifier" + + ## Enable annotating with RDS DB Instance type + #instance_type = true + + ## Enable annotating with the RDS engine type + #engine = true + + ## Enable annotating with the RDS engine version + #engine_version = true + + ## Specify the RDS Tags to append as metric tags + #tags = [ "Name" ] +` + +func (r *RDS) SampleConfig() string { + return sampleConfig +} + +func (r *RDS) Description() string { + return "Annotate metrics with AWS RDS metadata" +} + +func (r *RDS) Apply(in ...telegraf.Metric) []telegraf.Metric { + if r.client == nil { + r.initRdsClient() + } + for _, metric := range in { + if utils.IsSelected(metric, r.MetricNames) { + r.annotate(metric) + } + } + return in +} + +func init() { + processors.Add("aws_metadata_rds", func() telegraf.Processor { + return &RDS{ + CacheTTL: internal.Duration{Duration: time.Duration(1 * time.Hour)}, + MetricNames: []string{ + "cloudwatch_aws_rds", + }, + Id: "db_instance_identifier", + Engine: true, + EngineVersion: true, + Tags: []string{"Name"}, + } + }) +} + +func (r *RDS) annotate(metric telegraf.Metric) { + r.annotateWithInstanceData(metric) + r.annotateWithTags(metric) +} + +func (r *RDS) annotateWithInstanceData(metric telegraf.Metric) { + name := metric.Tags()[r.Id] + instance, err := r.getDBInstance(name) + if err != nil { + log.Printf("E! %s", err) + return + } + if r.Engine { + metric.AddTag("engine", *instance.Engine) + } + if r.EngineVersion { + metric.AddTag("engine_version", *instance.EngineVersion) + } +} + +func (r *RDS) annotateWithTags(metric telegraf.Metric) { + name := metric.Tags()[r.Id] + instance, err := r.getDBInstance(name) + if err != nil { + log.Printf("E! %s", err) + return + } + tags, err := r.client.ListTagsForResource(&rds.ListTagsForResourceInput{ + ResourceName: instance.DBInstanceArn, + }) + if err != nil { + log.Printf("E! %s", err) + return + } + for _, tag := range r.Tags { + for _, it := range tags.TagList { + if tag == *it.Key { + metric.AddTag(tag, *it.Value) + break + } + } + } +} + +func (r *RDS) getDBInstance(identifier string) (*rds.DBInstance, error) { + output, err := r.client.DescribeDBInstances(&rds.DescribeDBInstancesInput{ + DBInstanceIdentifier: aws.String(identifier), + }) + if err != nil { + return nil, err + } + if len(output.DBInstances) == 0 { + return nil, fmt.Errorf("DB Instance %s not found", identifier) + } + return output.DBInstances[0], nil +} + +func (r *RDS) initRdsClient() error { + credentialConfig := &internalaws.CredentialConfig{ + Region: r.Region, + AccessKey: r.AccessKey, + SecretKey: r.SecretKey, + RoleARN: r.RoleARN, + Profile: r.Profile, + Filename: r.Filename, + Token: r.Token, + } + configProvider := credentialConfig.Credentials() + r.client = &CachingRDSClient{ + client: rds.New(configProvider), + ttl: r.CacheTTL.Duration, + } + return nil +} diff --git a/plugins/processors/aws/rds/rds_test.go b/plugins/processors/aws/rds/rds_test.go new file mode 100644 index 000000000..e076e30c7 --- /dev/null +++ b/plugins/processors/aws/rds/rds_test.go @@ -0,0 +1,144 @@ +package rds + +import ( + "testing" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/rds" + "github.com/influxdata/telegraf/metric" + "github.com/stretchr/testify/assert" +) + +type mockRdsClient struct { + DbIdentifier string +} + +func (m *mockRdsClient) DescribeDBInstances(input *rds.DescribeDBInstancesInput) (*rds.DescribeDBInstancesOutput, error) { + instances := []*rds.DBInstance{} + if *input.DBInstanceIdentifier == m.DbIdentifier { + instances = append(instances, &rds.DBInstance{ + DBInstanceIdentifier: input.DBInstanceIdentifier, + Engine: aws.String("mysql"), + EngineVersion: aws.String("5.6"), + DBInstanceArn: aws.String("arn:aws:rds:us-east-1:1111111:db:rds-test-instance"), + }) + } + return &rds.DescribeDBInstancesOutput{ + DBInstances: instances, + }, nil +} + +func (m *mockRdsClient) ListTagsForResource(input *rds.ListTagsForResourceInput) (*rds.ListTagsForResourceOutput, error) { + tags := []*rds.Tag{} + if *input.ResourceName == "arn:aws:rds:us-east-1:1111111:db:rds-test-instance" { + tags = append(tags, &rds.Tag{ + Key: aws.String("Environment"), + Value: aws.String("acc-test"), + }) + tags = append(tags, &rds.Tag{ + Key: aws.String("not-included"), + Value: aws.String("true"), + }) + } + return &rds.ListTagsForResourceOutput{ + TagList: tags, + }, nil +} + +func cache(client RDSClient, ttl time.Duration) RDSClient { + return &CachingRDSClient{ + client: client, + ttl: ttl, + } +} + +func TestProcess_basic(t *testing.T) { + p := &RDS{ + MetricNames: []string{ + "cloudwatch_aws_rds", + }, + Id: "db_instance_identifier", + Engine: true, + EngineVersion: true, + Tags: []string{"Environment"}, + client: cache(&mockRdsClient{ + DbIdentifier: "rds-test-instance", + }, time.Duration(15*time.Minute)), + } + metric, _ := metric.New("cloudwatch_aws_rds", + map[string]string{ + "db_instance_identifier": "rds-test-instance", + }, + map[string]interface{}{ + "count": 1, + }, + time.Now()) + processedMetrics := p.Apply(metric) + assert.Equal(t, 1, len(processedMetrics)) + + pm := processedMetrics[0] + assert.Equal(t, "mysql", pm.Tags()["engine"]) + assert.Equal(t, "5.6", pm.Tags()["engine_version"]) + assert.Equal(t, "acc-test", pm.Tags()["Environment"]) +} + +func TestProcess_missingTag(t *testing.T) { + p := &RDS{ + MetricNames: []string{ + "cloudwatch_aws_rds", + }, + Id: "db_instance_identifier", + Engine: false, + EngineVersion: false, + Tags: []string{"Name"}, + client: cache(&mockRdsClient{ + DbIdentifier: "rds-test-instance", + }, time.Duration(15*time.Minute)), + } + metric, _ := metric.New("cloudwatch_aws_rds", + map[string]string{ + "db_instance_identifier": "rds-test-instance", + }, + map[string]interface{}{ + "count": 1, + }, + time.Now()) + processedMetrics := p.Apply(metric) + assert.Equal(t, 1, len(processedMetrics)) + + pm := processedMetrics[0] + assert.Equal(t, "", pm.Tags()["engine"]) + assert.Equal(t, "", pm.Tags()["engine_version"]) + assert.Equal(t, "", pm.Tags()["Name"]) +} + +func TestProcess_missingInstance(t *testing.T) { + p := &RDS{ + MetricNames: []string{ + "cloudwatch_aws_rds", + }, + Id: "db_instance_identifier", + Engine: true, + EngineVersion: true, + Tags: []string{"Environment"}, + client: cache(&mockRdsClient{ + DbIdentifier: "rds-test-instance2", + }, time.Duration(15*time.Minute)), + } + metric, _ := metric.New("cloudwatch_aws_rds", + map[string]string{ + "db_instance_identifier": "rds-test-instance", + }, + map[string]interface{}{ + "count": 1, + }, + time.Now()) + processedMetrics := p.Apply(metric) + assert.Equal(t, 1, len(processedMetrics)) + + pm := processedMetrics[0] + assert.Equal(t, "", pm.Tags()["engine"]) + assert.Equal(t, "", pm.Tags()["engine_version"]) + assert.Equal(t, "", pm.Tags()["Environment"]) +} diff --git a/plugins/processors/aws/sqs/README.md b/plugins/processors/aws/sqs/README.md new file mode 100644 index 000000000..e613f72b8 --- /dev/null +++ b/plugins/processors/aws/sqs/README.md @@ -0,0 +1,30 @@ +# SQS Metadata Processor Plugin + +The SQS Metadata processor plugin appends additional metadata from AWS to metrics associated with Simple Queue Service. + +### Configuration: + +```toml +## Annotate metrics from the cloudwatch plugin +[[processors.aws_metadata_sqs]] + +## Specify the Amazon Region to operate in +region = "us-east-1" + +## Specify the TTL for metadata lookups +#cache_ttl = "1h" + +## Process metrics from a Cloudwatch input plugin configured for the AWS/SQS namespace +## Default is "cloudwatch_aws_sqs" +#metric_names = ["cloudwatch_aws_sqs"] + +## Metric tag that contains the SQS queue name +#id = "queue_name" + +``` + +### Tags: + +The plugin applies the following tags to metrics when configured: + +* (none) - there is currently no support metadata to extract for SQS queues. diff --git a/plugins/processors/aws/sqs/sqs.go b/plugins/processors/aws/sqs/sqs.go new file mode 100644 index 000000000..a19a51c80 --- /dev/null +++ b/plugins/processors/aws/sqs/sqs.go @@ -0,0 +1,123 @@ +package sqs + +import ( + "time" + + "github.com/aws/aws-sdk-go/service/sqs" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + internalaws "github.com/influxdata/telegraf/internal/config/aws" + "github.com/influxdata/telegraf/plugins/processors" + "github.com/influxdata/telegraf/plugins/processors/aws/utils" +) + +type ( + SQS struct { + Region string `toml:"region"` + AccessKey string `toml:"access_key"` + SecretKey string `toml:"secret_key"` + RoleARN string `toml:"role_arn"` + Profile string `toml:"profile"` + Filename string `toml:"shared_credential_file"` + Token string `toml:"token"` + + CacheTTL internal.Duration `toml:"cache_ttl"` + MetricNames []string `toml:"metric_names"` + Id string `toml:"id"` + + client SQSClient + } + + SQSClient interface { + } + + CachingSQSClient struct { + client SQSClient + ttl time.Duration + fetched time.Time + } +) + +var sampleConfig = ` + ## Amazon Region + region = "us-east-1" + + ## Amazon Credentials + ## Credentials are loaded in the following order + ## 1) Assumed credentials via STS if role_arn is specified + ## 2) explicit credentials from 'access_key' and 'secret_key' + ## 3) shared profile from 'profile' + ## 4) environment variables + ## 5) shared credentials file + ## 6) EC2 Instance Profile + #access_key = "" + #secret_key = "" + #token = "" + #role_arn = "" + #profile = "" + #shared_credential_file = "" + + ## Specify the TTL for metadata lookups + #cache_ttl = "1h" + + ## Specify the metric names to annotate with SQS metadata + ## By default is configured for "cloudwatch_aws_sqs", the default output from the Cloudwatch input plugin + #metric_names = [ "cloudwatch_aws_sqs" ] + + ## Specify the metric tag which contains the SQS queue name + ## By default is configured for "queue_name", the default from Cloudwatch input plugin when using the QueueName dimension + #id = "queue_name" +` + +func (s *SQS) SampleConfig() string { + return sampleConfig +} + +func (s *SQS) Description() string { + return "Annotate metrics with AWS SQS metadata" +} + +func (s *SQS) Apply(in ...telegraf.Metric) []telegraf.Metric { + if s.client == nil { + s.initSqsClient() + } + for _, metric := range in { + if utils.IsSelected(metric, s.MetricNames) { + s.annotate(metric) + } + } + return in +} + +func init() { + processors.Add("aws_metadata_sqs", func() telegraf.Processor { + return &SQS{ + CacheTTL: internal.Duration{Duration: time.Duration(1 * time.Hour)}, + MetricNames: []string{ + "cloudwatch_aws_sqs", + }, + Id: "queue_name", + } + }) +} + +func (s *SQS) annotate(metric telegraf.Metric) { +} + +func (s *SQS) initSqsClient() error { + credentialConfig := &internalaws.CredentialConfig{ + Region: s.Region, + AccessKey: s.AccessKey, + SecretKey: s.SecretKey, + RoleARN: s.RoleARN, + Profile: s.Profile, + Filename: s.Filename, + Token: s.Token, + } + configProvider := credentialConfig.Credentials() + s.client = &CachingSQSClient{ + client: sqs.New(configProvider), + ttl: s.CacheTTL.Duration, + } + return nil +} diff --git a/plugins/processors/aws/sqs/sqs_test.go b/plugins/processors/aws/sqs/sqs_test.go new file mode 100644 index 000000000..e63cf0e58 --- /dev/null +++ b/plugins/processors/aws/sqs/sqs_test.go @@ -0,0 +1 @@ +package sqs diff --git a/plugins/processors/aws/utils/utils.go b/plugins/processors/aws/utils/utils.go new file mode 100644 index 000000000..68f56d035 --- /dev/null +++ b/plugins/processors/aws/utils/utils.go @@ -0,0 +1,12 @@ +package utils + +import "github.com/influxdata/telegraf" + +func IsSelected(metric telegraf.Metric, configuredMetrics []string) bool { + for _, m := range configuredMetrics { + if m == metric.Name() { + return true + } + } + return false +}