Add EC2, ELB, SQS, and RDS processors to annotate with AWS data.

This commit is contained in:
John Engelman 2016-11-11 09:26:09 -06:00
parent 393f5044bb
commit be1d93bb4e
No known key found for this signature in database
GPG Key ID: 6ED0F678B90EB06E
17 changed files with 1390 additions and 2 deletions

3
Godeps
View File

@ -2,7 +2,7 @@ github.com/Shopify/sarama 8aadb476e66ca998f2f6bb3c993e9a2daa3666b9
github.com/Sirupsen/logrus 219c8cb75c258c552e999735be6df753ffc7afdc
github.com/aerospike/aerospike-client-go 7f3a312c3b2a60ac083ec6da296091c52c795c63
github.com/amir/raidman 53c1b967405155bfc8758557863bf2e14f814687
github.com/aws/aws-sdk-go 13a12060f716145019378a10e2806c174356b857
github.com/aws/aws-sdk-go 1355b456f6ba4a8453249e0dc7743f16ccac362d
github.com/beorn7/perks 3ac7bf7a47d159a033b107610db8a1b6575507a4
github.com/cenkalti/backoff 4dc77674aceaabba2c7e3da25d4c823edfb73f99
github.com/couchbase/go-couchbase cb664315a324d87d19c879d9cc67fda6be8c2ac1
@ -41,7 +41,6 @@ github.com/naoina/go-stringutil 6b638e95a32d0c1131db0e7fe83775cbea4a0d0b
github.com/nats-io/nats ea8b4fd12ebb823073c0004b9f09ac8748f4f165
github.com/nats-io/nuid a5152d67cf63cbfb5d992a395458722a45194715
github.com/nsqio/go-nsq 0b80d6f05e15ca1930e0c5e1d540ed627e299980
github.com/opencontainers/runc 89ab7f2ccc1e45ddf6485eaa802c35dcf321dfc8
github.com/prometheus/client_golang 18acf9993a863f4c4b40612e19cdd243e7c86831
github.com/prometheus/client_model fa8ad6fec33561be4280a8f0514318c79d7f6cb6
github.com/prometheus/common e8eabff8812b05acf522b45fdcd725a785188e37

View File

@ -232,6 +232,11 @@ Telegraf can also collect metrics via the following service plugins:
## Processor Plugins
* [aws_metadata](./plugins/processors/aws)
* [aws_metadata_ec2](./plugins/processors/aws/ec2)
* [aws_metadata_elb](./plugins/processors/aws/elb)
* [aws_metadata_rds](./plugins/processors/aws/rds)
* [aws_metadata_sqs](./plugins/processors/aws/sqs)
* [printer](./plugins/processors/printer)
## Aggregator Plugins

View File

@ -1,5 +1,9 @@
package all
import (
_ "github.com/influxdata/telegraf/plugins/processors/aws/ec2"
_ "github.com/influxdata/telegraf/plugins/processors/aws/elb"
_ "github.com/influxdata/telegraf/plugins/processors/aws/rds"
_ "github.com/influxdata/telegraf/plugins/processors/aws/sqs"
_ "github.com/influxdata/telegraf/plugins/processors/printer"
)

View File

@ -0,0 +1,55 @@
# AWS Metadata Processor Plugins
A series of plugins that extract additional metadata from AWS to annotate metrics.
### Configuration:
Each processor is scoped to a single AWS service (e.g. EC2, ELB, etc).
The processor plugins each share a common configuration pattern for
configuring AWS credentials and basic processing information.
```toml
## Common AWS credential configuration
## Amazon Region (required)
region = "us-east-1"
## Amazon Credentials
## Credentials are loaded in the following order
## 1) Assumed credentials via STS if role_arn is specified
## 2) explicit credentials from 'access_key' and 'secret_key'
## 3) shared profile from 'profile'
## 4) environment variables
## 5) shared credentials file
## 6) EC2 Instance Profile
#access_key = ""
#secret_key = ""
#token = ""
#role_arn = ""
#profile = ""
#shared_credential_file = ""
## Common processing configuration
## Specify the TTL for metadata lookups
#cache_ttl = "1h"
## Specify the metric names to annotate with this processor
## By default the processor is configured to process the associated metric name from the Cloudwatch input plugin
#metric_names = [ "cloudwatch_aws_ec2" ]
## Specify the metric tag which contains the AWS resource ID
## By default the plugin is configured to find the resource's ID in the tag created by the Cloudwatch input plugin
#id = "instance_id"
## Plugin specific configuration
## Configure specific annotations available for this processor
```
### Processor Plugins:
* [aws_metadata_ec2](./ec2)
* [aws_metadata_elb](./elb)
* [aws_metadata_rds](./rds)
* [aws_metadata_sqs](./sqs)

View File

@ -0,0 +1,40 @@
# EC2 Metadata Processor Plugin
The EC2 Metadata processor plugin appends additional metadata from AWS to metrics associated with EC2 instances.
### Configuration:
```toml
## Annotate metrics from the cloudwatch plugin
[[processors.aws_metadata_ec2]]
## Specify the Amazon Region to operate in
region = "us-east-1"
## Specify the TTL for metadata lookups
#cache_ttl = "1h"
## Process metrics from a Cloudwatch input plugin configured for the AWS/EC2 namespace
## Default is "cloudwatch_aws_ec2"
#metric_names = ["cloudwatch_aws_ec2"]
## Metric tag that contains the EC2 Instance ID
#id = "instance_id"
## Annotate metrics with the EC2 Instance type
#instance_type = true
## Annotate metrics with the AMI ID
#ami_id = true
## Annotate metrics with EC2 Tags
#tags = [ "Name" ]
```
### Tags:
The plugin applies the following tags to metrics when configured:
* `instance_type` - the EC2 Instance Type for the instance
* `ami_id` - the AMI ID used by the instance
* Tags - for each configured tag name in the plugin, appends a tag if the EC2 Instance has a corresponding tag of that name

View File

@ -0,0 +1,213 @@
package ec2
import (
"fmt"
"log"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
internalaws "github.com/influxdata/telegraf/internal/config/aws"
"github.com/influxdata/telegraf/plugins/processors"
"github.com/influxdata/telegraf/plugins/processors/aws/utils"
)
type (
EC2 struct {
Region string `toml:"region"`
AccessKey string `toml:"access_key"`
SecretKey string `toml:"secret_key"`
RoleARN string `toml:"role_arn"`
Profile string `toml:"profile"`
Filename string `toml:"shared_credential_file"`
Token string `toml:"token"`
CacheTTL internal.Duration `toml:"cache_ttl"`
MetricNames []string `toml:"metric_names"`
Id string `toml:"id"`
InstanceType bool `toml:"instance_type"`
AmiId bool `toml:"ami_id"`
Tags []string `toml:"tags"`
client EC2Client
}
EC2Client interface {
DescribeInstances(input *ec2.DescribeInstancesInput) (*ec2.DescribeInstancesOutput, error)
}
CachingEC2Client struct {
client EC2Client
ttl time.Duration
fetched time.Time
data map[string]*ec2.DescribeInstancesOutput
}
)
func (e *CachingEC2Client) DescribeInstances(input *ec2.DescribeInstancesInput) (*ec2.DescribeInstancesOutput, error) {
id := *input.InstanceIds[0]
if e.data == nil {
e.data = map[string]*ec2.DescribeInstancesOutput{}
}
if e.fetched.IsZero() {
e.fetched = time.Now()
}
if time.Since(e.fetched) >= e.ttl {
e.data = map[string]*ec2.DescribeInstancesOutput{}
}
if _, ok := e.data[id]; !ok {
response, err := e.client.DescribeInstances(input)
if err != nil {
return nil, err
}
e.data[id] = response
}
return e.data[id], nil
}
var sampleConfig = `
## Amazon Region
region = "us-east-1"
## Amazon Credentials
## Credentials are loaded in the following order
## 1) Assumed credentials via STS if role_arn is specified
## 2) explicit credentials from 'access_key' and 'secret_key'
## 3) shared profile from 'profile'
## 4) environment variables
## 5) shared credentials file
## 6) EC2 Instance Profile
#access_key = ""
#secret_key = ""
#token = ""
#role_arn = ""
#profile = ""
#shared_credential_file = ""
## Specify the TTL for metadata lookups
#cache_ttl = "1h"
## Specify the metric names to annotate with EC2 metadata
## By default is configured for "cloudwatch_aws_ec2", the default output from the Cloudwatch input plugin
#metric_names = [ "cloudwatch_aws_ec2" ]
## Specify the metric tag which contains the EC2 Instance ID
## By default is configured for "instance_id", the default from Cloudwatch input plugin when using the InstanceId dimension
#id = "instance_id"
## Enable annotating metrics with the EC2 Instance Type
#instance_type = true
## Enable annotating metrics with the AMI ID
#ami_id = true
## Specify the EC2 Tags to append as metric tags
#tags = [ "Name" ]
`
func (e *EC2) SampleConfig() string {
return sampleConfig
}
func (e *EC2) Description() string {
return "Annotate metrics with AWS EC2 metadata"
}
func (e *EC2) Apply(in ...telegraf.Metric) []telegraf.Metric {
if e.client == nil {
e.initEc2Client()
}
for _, metric := range in {
if utils.IsSelected(metric, e.MetricNames) {
e.annotate(metric)
}
}
return in
}
func init() {
processors.Add("aws_metadata_ec2", func() telegraf.Processor {
return &EC2{
CacheTTL: internal.Duration{Duration: time.Duration(1 * time.Hour)},
MetricNames: []string{
"cloudwatch_aws_ec2",
},
Id: "instance_id",
InstanceType: true,
AmiId: true,
Tags: []string{"Name"},
}
})
}
func (e *EC2) annotate(metric telegraf.Metric) {
e.annotateWithInstanceMetadata(metric)
e.annotateWithTags(metric)
}
func (e *EC2) annotateWithInstanceMetadata(metric telegraf.Metric) {
instance, err := e.getInstanceForMetric(metric)
if err != nil {
log.Printf("E! %s", err)
return
}
if e.InstanceType {
metric.AddTag("instance_type", *instance.InstanceType)
}
if e.AmiId {
metric.AddTag("ami_id", *instance.ImageId)
}
}
func (e *EC2) annotateWithTags(metric telegraf.Metric) {
instance, err := e.getInstanceForMetric(metric)
if err != nil {
log.Printf("E! %s", err)
return
}
for _, tag := range e.Tags {
for _, it := range instance.Tags {
if tag == *it.Key {
metric.AddTag(tag, *it.Value)
break
}
}
}
}
func (e *EC2) getInstanceForMetric(metric telegraf.Metric) (*ec2.Instance, error) {
id := metric.Tags()[e.Id]
output, err := e.client.DescribeInstances(&ec2.DescribeInstancesInput{
InstanceIds: []*string{
aws.String(id),
},
})
if err != nil {
return nil, err
}
if len(output.Reservations) == 0 || len(output.Reservations[0].Instances) == 0 {
return nil, fmt.Errorf("Instance %s not found", id)
}
return output.Reservations[0].Instances[0], nil
}
func (e *EC2) initEc2Client() error {
credentialConfig := &internalaws.CredentialConfig{
Region: e.Region,
AccessKey: e.AccessKey,
SecretKey: e.SecretKey,
RoleARN: e.RoleARN,
Profile: e.Profile,
Filename: e.Filename,
Token: e.Token,
}
configProvider := credentialConfig.Credentials()
// e.client = ec2.New(configProvider)
e.client = &CachingEC2Client{
client: ec2.New(configProvider),
ttl: e.CacheTTL.Duration,
}
return nil
}

View File

@ -0,0 +1,139 @@
package ec2
import (
"testing"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/influxdata/telegraf/metric"
"github.com/stretchr/testify/assert"
)
type mockEc2Client struct {
InstanceId string
}
func (m *mockEc2Client) DescribeInstances(input *ec2.DescribeInstancesInput) (*ec2.DescribeInstancesOutput, error) {
reservations := []*ec2.Reservation{}
if *input.InstanceIds[0] == m.InstanceId {
reservations = append(reservations, &ec2.Reservation{
Instances: []*ec2.Instance{
&ec2.Instance{
InstanceType: aws.String("t2.micro"),
ImageId: aws.String("ami-12345"),
Tags: []*ec2.Tag{
&ec2.Tag{
Key: aws.String("Environment"),
Value: aws.String("acc-test"),
},
&ec2.Tag{
Key: aws.String("not-included"),
Value: aws.String("true"),
},
},
},
},
})
}
return &ec2.DescribeInstancesOutput{
Reservations: reservations,
}, nil
}
func cache(client EC2Client, ttl time.Duration) EC2Client {
return &CachingEC2Client{
client: client,
ttl: ttl,
}
}
func TestProcess_basic(t *testing.T) {
p := &EC2{
MetricNames: []string{
"cloudwatch_aws_ec2",
},
Id: "instance_id",
InstanceType: true,
AmiId: true,
Tags: []string{"Environment"},
client: cache(&mockEc2Client{
InstanceId: "i-123abc",
}, time.Duration(15*time.Minute)),
}
metric, _ := metric.New("cloudwatch_aws_ec2",
map[string]string{
"instance_id": "i-123abc",
},
map[string]interface{}{
"count": 1,
},
time.Now())
processedMetrics := p.Apply(metric)
assert.Equal(t, 1, len(processedMetrics))
pm := processedMetrics[0]
assert.Equal(t, "t2.micro", pm.Tags()["instance_type"])
assert.Equal(t, "ami-12345", pm.Tags()["ami_id"])
assert.Equal(t, "acc-test", pm.Tags()["Environment"])
}
func TestProcess_missingTag(t *testing.T) {
p := &EC2{
MetricNames: []string{
"cloudwatch_aws_ec2",
},
Id: "instance_id",
InstanceType: false,
AmiId: false,
Tags: []string{"Name"},
client: cache(&mockEc2Client{
InstanceId: "i-123abc",
}, time.Duration(15*time.Minute)),
}
metric, _ := metric.New("cloudwatch_aws_ec2",
map[string]string{
"instance_id": "i-123abc",
},
map[string]interface{}{
"count": 1,
},
time.Now())
processedMetrics := p.Apply(metric)
assert.Equal(t, 1, len(processedMetrics))
pm := processedMetrics[0]
assert.Equal(t, "", pm.Tags()["instance_type"])
assert.Equal(t, "", pm.Tags()["ami_id"])
assert.Equal(t, "", pm.Tags()["Name"])
}
func TestProcess_missingInstance(t *testing.T) {
p := &EC2{
MetricNames: []string{
"cloudwatch_aws_ec2",
},
Id: "instance_id",
InstanceType: true,
AmiId: true,
Tags: []string{"Environment"},
client: cache(&mockEc2Client{
InstanceId: "i-xyz987",
}, time.Duration(15*time.Minute)),
}
metric, _ := metric.New("cloudwatch_aws_ec2",
map[string]string{
"instance_id": "i-123abc",
},
map[string]interface{}{
"count": 1,
},
time.Now())
processedMetrics := p.Apply(metric)
assert.Equal(t, 1, len(processedMetrics))
pm := processedMetrics[0]
assert.Equal(t, "", pm.Tags()["instance_type"])
assert.Equal(t, "", pm.Tags()["ami_id"])
assert.Equal(t, "", pm.Tags()["Environment"])
}

View File

@ -0,0 +1,32 @@
# ELB Metadata Processor Plugin
The ELB Metadata processor plugin appends additional metadata from AWS to metrics associated with Elastic Load Balancers.
### Configuration:
```toml
## Annotate metrics from the cloudwatch plugin
[[processors.aws_metadata_elb]]
## Specify the Amazon Region to operate in
region = "us-east-1"
## Specify the TTL for metadata lookups
#cache_ttl = "1h"
## Process metrics from a Cloudwatch input plugin configured for the AWS/ELB namespace
## Default is "cloudwatch_aws_elb"
#metric_names = ["cloudwatch_aws_elb"]
## Metric tag that contains the Load Balancer Name
#id = "load_balancer_name"
## Annotate metrics with ELB Tags
#tags = [ "Name" ]
```
### Tags:
The plugin applies the following tags to metrics when configured:
* Tags - for each configured tag name in the plugin, appends a tag if the ELB has a corresponding tag of that name

View File

@ -0,0 +1,187 @@
package elb
import (
"fmt"
"log"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/elb"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
internalaws "github.com/influxdata/telegraf/internal/config/aws"
"github.com/influxdata/telegraf/plugins/processors"
"github.com/influxdata/telegraf/plugins/processors/aws/utils"
)
type (
ELB struct {
Region string `toml:"region"`
AccessKey string `toml:"access_key"`
SecretKey string `toml:"secret_key"`
RoleARN string `toml:"role_arn"`
Profile string `toml:"profile"`
Filename string `toml:"shared_credential_file"`
Token string `toml:"token"`
CacheTTL internal.Duration `toml:"cache_ttl"`
MetricNames []string `toml:"metric_names"`
Id string `toml:"id"`
Tags []string `toml:"tags"`
client ELBClient
}
ELBClient interface {
DescribeTags(input *elb.DescribeTagsInput) (*elb.DescribeTagsOutput, error)
}
CachingELBClient struct {
client ELBClient
ttl time.Duration
fetched time.Time
data map[string]*elb.DescribeTagsOutput
}
)
func (e *CachingELBClient) DescribeTags(input *elb.DescribeTagsInput) (*elb.DescribeTagsOutput, error) {
id := *input.LoadBalancerNames[0]
if e.data == nil {
e.data = map[string]*elb.DescribeTagsOutput{}
}
if e.fetched.IsZero() {
e.fetched = time.Now()
}
if time.Since(e.fetched) >= e.ttl {
e.data = map[string]*elb.DescribeTagsOutput{}
}
if _, ok := e.data[id]; !ok {
response, err := e.client.DescribeTags(input)
if err != nil {
return nil, err
}
e.data[id] = response
}
return e.data[id], nil
}
var sampleConfig = `
## Amazon Region
region = "us-east-1"
## Amazon Credentials
## Credentials are loaded in the following order
## 1) Assumed credentials via STS if role_arn is specified
## 2) explicit credentials from 'access_key' and 'secret_key'
## 3) shared profile from 'profile'
## 4) environment variables
## 5) shared credentials file
## 6) EC2 Instance Profile
#access_key = ""
#secret_key = ""
#token = ""
#role_arn = ""
#profile = ""
#shared_credential_file = ""
## Specify the TTL for metadata lookups
#cache_ttl = "1h"
## Specify the metric names to annotate with ELB metadata
## By default is configured for "cloudwatch_aws_elb", the default output from the Cloudwatch input plugin
#metric_names = [ "cloudwatch_aws_elb" ]
## Specify the metric tag which contains the ELB Name
## By default is configured for "load_balancer_name", the default from Cloudwatch input plugin when using the LoadBalancerName dimension
#id = "load_balancer_name"
## Specify the ELB Tags to append as metric tags
#tags = [ "Name" ]
`
func (e *ELB) SampleConfig() string {
return sampleConfig
}
func (e *ELB) Description() string {
return "Annotate metrics with AWS ELB metadata"
}
func (e *ELB) Apply(in ...telegraf.Metric) []telegraf.Metric {
if e.client == nil {
e.initElbClient()
}
for _, metric := range in {
if utils.IsSelected(metric, e.MetricNames) {
e.annotate(metric)
}
}
return in
}
func init() {
processors.Add("aws_metadata_elb", func() telegraf.Processor {
return &ELB{
CacheTTL: internal.Duration{Duration: time.Duration(1 * time.Hour)},
MetricNames: []string{
"cloudwatch_aws_elb",
},
Id: "load_balancer_name",
Tags: []string{"Name"},
}
})
}
func (e *ELB) annotate(metric telegraf.Metric) {
e.annotateWithTags(metric)
}
func (e *ELB) annotateWithTags(metric telegraf.Metric) {
tags, err := e.getTagsForLoadBalancer(metric)
if err != nil {
log.Printf("E! %s", err)
return
}
for _, tag := range e.Tags {
for _, it := range tags {
if tag == *it.Key {
metric.AddTag(tag, *it.Value)
break
}
}
}
}
func (e *ELB) getTagsForLoadBalancer(metric telegraf.Metric) ([]*elb.Tag, error) {
name := metric.Tags()[e.Id]
output, err := e.client.DescribeTags(&elb.DescribeTagsInput{
LoadBalancerNames: []*string{
aws.String(name),
},
})
if err != nil {
return nil, err
}
if len(output.TagDescriptions) == 0 {
return nil, fmt.Errorf("ELB %s not found", name)
}
return output.TagDescriptions[0].Tags, nil
}
func (e *ELB) initElbClient() error {
credentialConfig := &internalaws.CredentialConfig{
Region: e.Region,
AccessKey: e.AccessKey,
SecretKey: e.SecretKey,
RoleARN: e.RoleARN,
Profile: e.Profile,
Filename: e.Filename,
Token: e.Token,
}
configProvider := credentialConfig.Credentials()
e.client = &CachingELBClient{
client: elb.New(configProvider),
ttl: e.CacheTTL.Duration,
}
return nil
}

View File

@ -0,0 +1,118 @@
package elb
import (
"testing"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/elb"
"github.com/influxdata/telegraf/metric"
"github.com/stretchr/testify/assert"
)
type mockElbClient struct {
Name string
}
func (m *mockElbClient) DescribeTags(input *elb.DescribeTagsInput) (*elb.DescribeTagsOutput, error) {
descriptions := []*elb.TagDescription{}
if *input.LoadBalancerNames[0] == m.Name {
descriptions = append(descriptions, &elb.TagDescription{
LoadBalancerName: &m.Name,
Tags: []*elb.Tag{
&elb.Tag{
Key: aws.String("Environment"),
Value: aws.String("acc-test"),
},
},
})
}
return &elb.DescribeTagsOutput{
TagDescriptions: descriptions,
}, nil
}
func cache(client ELBClient, ttl time.Duration) ELBClient {
return &CachingELBClient{
client: client,
ttl: ttl,
}
}
func TestProcess_basic(t *testing.T) {
p := &ELB{
MetricNames: []string{
"cloudwatch_aws_elb",
},
Id: "load_balancer_name",
Tags: []string{"Environment"},
client: cache(&mockElbClient{
Name: "acc-test-lb",
}, time.Duration(15*time.Minute)),
}
metric, _ := metric.New("cloudwatch_aws_elb",
map[string]string{
"load_balancer_name": "acc-test-lb",
},
map[string]interface{}{
"count": 1,
},
time.Now())
processedMetrics := p.Apply(metric)
assert.Equal(t, 1, len(processedMetrics))
pm := processedMetrics[0]
assert.Equal(t, "acc-test", pm.Tags()["Environment"])
}
func TestProcess_missingTag(t *testing.T) {
p := &ELB{
MetricNames: []string{
"cloudwatch_aws_elb",
},
Id: "load_balancer_name",
Tags: []string{"Name"},
client: cache(&mockElbClient{
Name: "acc-test-lb",
}, time.Duration(15*time.Minute)),
}
metric, _ := metric.New("cloudwatch_aws_elb",
map[string]string{
"load_balancer_name": "acc-test-lb",
},
map[string]interface{}{
"count": 1,
},
time.Now())
processedMetrics := p.Apply(metric)
assert.Equal(t, 1, len(processedMetrics))
pm := processedMetrics[0]
assert.Equal(t, "", pm.Tags()["Name"])
}
func TestProcess_missingInstance(t *testing.T) {
p := &ELB{
MetricNames: []string{
"cloudwatch_aws_elb",
},
Id: "load_balancer_name",
Tags: []string{"Environment"},
client: cache(&mockElbClient{
Name: "acc-test-lb",
}, time.Duration(15*time.Minute)),
}
metric, _ := metric.New("cloudwatch_aws_elb",
map[string]string{
"load_balancer_name": "unknown-lb",
},
map[string]interface{}{
"count": 1,
},
time.Now())
processedMetrics := p.Apply(metric)
assert.Equal(t, 1, len(processedMetrics))
pm := processedMetrics[0]
assert.Equal(t, "", pm.Tags()["Environment"])
}

View File

@ -0,0 +1,40 @@
# RDS Metadata Processor Plugin
The RDS Metadata processor plugin appends additional metadata from AWS to metrics associated with RDS instances.
### Configuration:
```toml
## Annotate metrics from the cloudwatch plugin
[[processors.aws_metadata_rds]]
## Specify the Amazon Region to operate in
region = "us-east-1"
## Specify the TTL for metadata lookups
#cache_ttl = "1h"
## Process metrics from a Cloudwatch input plugin configured for the AWS/RDS namespace
## Default is "cloudwatch_aws_rds"
#metric_names = ["cloudwatch_aws_rds"]
## Metric tag that contains the RDS DB Instance Identifier
#id = "db_instance_identifier"
## Annotate metrics with the RDS engine type
#engine = true
## Annotate metrics with the engine version
#engine_version = true
## Annotate metrics with RDS Tags
#tags = [ "Name" ]
```
### Tags:
The plugin applies the following tags to metrics when configured:
* `engine` - the RDS Engine type for the DB instance
* `engine_Version` - the RDS engine version for the DB Instance
* Tags - for each configured tag name in the plugin, appends a tag if the RDS Instance has a corresponding tag of that name

View File

@ -0,0 +1,246 @@
package rds
import (
"fmt"
"log"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/rds"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
internalaws "github.com/influxdata/telegraf/internal/config/aws"
"github.com/influxdata/telegraf/plugins/processors"
"github.com/influxdata/telegraf/plugins/processors/aws/utils"
)
type (
RDS struct {
Region string `toml:"region"`
AccessKey string `toml:"access_key"`
SecretKey string `toml:"secret_key"`
RoleARN string `toml:"role_arn"`
Profile string `toml:"profile"`
Filename string `toml:"shared_credential_file"`
Token string `toml:"token"`
CacheTTL internal.Duration `toml:"cache_ttl"`
MetricNames []string `toml:"metric_names"`
Id string `toml:"id"`
InstanceType bool `toml:"instance_type"`
Engine bool `toml:"engine"`
EngineVersion bool `toml:"engine_version"`
Tags []string `toml:"tags"`
client RDSClient
}
RDSClient interface {
DescribeDBInstances(input *rds.DescribeDBInstancesInput) (*rds.DescribeDBInstancesOutput, error)
ListTagsForResource(input *rds.ListTagsForResourceInput) (*rds.ListTagsForResourceOutput, error)
}
CachingRDSClient struct {
client RDSClient
ttl time.Duration
fetched time.Time
instanceData map[string]*rds.DescribeDBInstancesOutput
tagData map[string]*rds.ListTagsForResourceOutput
}
)
func (e *CachingRDSClient) DescribeDBInstances(input *rds.DescribeDBInstancesInput) (*rds.DescribeDBInstancesOutput, error) {
id := *input.DBInstanceIdentifier
if e.instanceData == nil {
e.instanceData = map[string]*rds.DescribeDBInstancesOutput{}
}
if e.fetched.IsZero() {
e.fetched = time.Now()
}
if time.Since(e.fetched) >= e.ttl {
e.instanceData = map[string]*rds.DescribeDBInstancesOutput{}
}
if _, ok := e.instanceData[id]; !ok {
response, err := e.client.DescribeDBInstances(input)
if err != nil {
return nil, err
}
e.instanceData[id] = response
}
return e.instanceData[id], nil
}
func (e *CachingRDSClient) ListTagsForResource(input *rds.ListTagsForResourceInput) (*rds.ListTagsForResourceOutput, error) {
id := *input.ResourceName
if e.tagData == nil {
e.tagData = map[string]*rds.ListTagsForResourceOutput{}
}
if e.fetched.IsZero() {
e.fetched = time.Now()
}
if time.Since(e.fetched) >= e.ttl {
e.tagData = map[string]*rds.ListTagsForResourceOutput{}
}
if _, ok := e.tagData[id]; !ok {
response, err := e.client.ListTagsForResource(input)
if err != nil {
return nil, err
}
e.tagData[id] = response
}
return e.tagData[id], nil
}
var sampleConfig = `
## Amazon Region
region = "us-east-1"
## Amazon Credentials
## Credentials are loaded in the following order
## 1) Assumed credentials via STS if role_arn is specified
## 2) explicit credentials from 'access_key' and 'secret_key'
## 3) shared profile from 'profile'
## 4) environment variables
## 5) shared credentials file
## 6) EC2 Instance Profile
#access_key = ""
#secret_key = ""
#token = ""
#role_arn = ""
#profile = ""
#shared_credential_file = ""
## Specify the TTL for metadata lookups
#cache_ttl = "1h"
## Specify the metric names to annotate with RDS metadata
## By default is configured for "cloudwatch_aws_rds", the default output from the Cloudwatch input plugin
#metric_names = [ "cloudwatch_aws_rds" ]
## Specify the metric tag which contains the RDS DB Instance Identifier
## By default is configured for "db_instance_identifier", the default from Cloudwatch input plugin when using the RDS dimension
#id = "db_instance_identifier"
## Enable annotating with RDS DB Instance type
#instance_type = true
## Enable annotating with the RDS engine type
#engine = true
## Enable annotating with the RDS engine version
#engine_version = true
## Specify the RDS Tags to append as metric tags
#tags = [ "Name" ]
`
func (r *RDS) SampleConfig() string {
return sampleConfig
}
func (r *RDS) Description() string {
return "Annotate metrics with AWS RDS metadata"
}
func (r *RDS) Apply(in ...telegraf.Metric) []telegraf.Metric {
if r.client == nil {
r.initRdsClient()
}
for _, metric := range in {
if utils.IsSelected(metric, r.MetricNames) {
r.annotate(metric)
}
}
return in
}
func init() {
processors.Add("aws_metadata_rds", func() telegraf.Processor {
return &RDS{
CacheTTL: internal.Duration{Duration: time.Duration(1 * time.Hour)},
MetricNames: []string{
"cloudwatch_aws_rds",
},
Id: "db_instance_identifier",
Engine: true,
EngineVersion: true,
Tags: []string{"Name"},
}
})
}
func (r *RDS) annotate(metric telegraf.Metric) {
r.annotateWithInstanceData(metric)
r.annotateWithTags(metric)
}
func (r *RDS) annotateWithInstanceData(metric telegraf.Metric) {
name := metric.Tags()[r.Id]
instance, err := r.getDBInstance(name)
if err != nil {
log.Printf("E! %s", err)
return
}
if r.Engine {
metric.AddTag("engine", *instance.Engine)
}
if r.EngineVersion {
metric.AddTag("engine_version", *instance.EngineVersion)
}
}
func (r *RDS) annotateWithTags(metric telegraf.Metric) {
name := metric.Tags()[r.Id]
instance, err := r.getDBInstance(name)
if err != nil {
log.Printf("E! %s", err)
return
}
tags, err := r.client.ListTagsForResource(&rds.ListTagsForResourceInput{
ResourceName: instance.DBInstanceArn,
})
if err != nil {
log.Printf("E! %s", err)
return
}
for _, tag := range r.Tags {
for _, it := range tags.TagList {
if tag == *it.Key {
metric.AddTag(tag, *it.Value)
break
}
}
}
}
func (r *RDS) getDBInstance(identifier string) (*rds.DBInstance, error) {
output, err := r.client.DescribeDBInstances(&rds.DescribeDBInstancesInput{
DBInstanceIdentifier: aws.String(identifier),
})
if err != nil {
return nil, err
}
if len(output.DBInstances) == 0 {
return nil, fmt.Errorf("DB Instance %s not found", identifier)
}
return output.DBInstances[0], nil
}
func (r *RDS) initRdsClient() error {
credentialConfig := &internalaws.CredentialConfig{
Region: r.Region,
AccessKey: r.AccessKey,
SecretKey: r.SecretKey,
RoleARN: r.RoleARN,
Profile: r.Profile,
Filename: r.Filename,
Token: r.Token,
}
configProvider := credentialConfig.Credentials()
r.client = &CachingRDSClient{
client: rds.New(configProvider),
ttl: r.CacheTTL.Duration,
}
return nil
}

View File

@ -0,0 +1,144 @@
package rds
import (
"testing"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/rds"
"github.com/influxdata/telegraf/metric"
"github.com/stretchr/testify/assert"
)
type mockRdsClient struct {
DbIdentifier string
}
func (m *mockRdsClient) DescribeDBInstances(input *rds.DescribeDBInstancesInput) (*rds.DescribeDBInstancesOutput, error) {
instances := []*rds.DBInstance{}
if *input.DBInstanceIdentifier == m.DbIdentifier {
instances = append(instances, &rds.DBInstance{
DBInstanceIdentifier: input.DBInstanceIdentifier,
Engine: aws.String("mysql"),
EngineVersion: aws.String("5.6"),
DBInstanceArn: aws.String("arn:aws:rds:us-east-1:1111111:db:rds-test-instance"),
})
}
return &rds.DescribeDBInstancesOutput{
DBInstances: instances,
}, nil
}
func (m *mockRdsClient) ListTagsForResource(input *rds.ListTagsForResourceInput) (*rds.ListTagsForResourceOutput, error) {
tags := []*rds.Tag{}
if *input.ResourceName == "arn:aws:rds:us-east-1:1111111:db:rds-test-instance" {
tags = append(tags, &rds.Tag{
Key: aws.String("Environment"),
Value: aws.String("acc-test"),
})
tags = append(tags, &rds.Tag{
Key: aws.String("not-included"),
Value: aws.String("true"),
})
}
return &rds.ListTagsForResourceOutput{
TagList: tags,
}, nil
}
func cache(client RDSClient, ttl time.Duration) RDSClient {
return &CachingRDSClient{
client: client,
ttl: ttl,
}
}
func TestProcess_basic(t *testing.T) {
p := &RDS{
MetricNames: []string{
"cloudwatch_aws_rds",
},
Id: "db_instance_identifier",
Engine: true,
EngineVersion: true,
Tags: []string{"Environment"},
client: cache(&mockRdsClient{
DbIdentifier: "rds-test-instance",
}, time.Duration(15*time.Minute)),
}
metric, _ := metric.New("cloudwatch_aws_rds",
map[string]string{
"db_instance_identifier": "rds-test-instance",
},
map[string]interface{}{
"count": 1,
},
time.Now())
processedMetrics := p.Apply(metric)
assert.Equal(t, 1, len(processedMetrics))
pm := processedMetrics[0]
assert.Equal(t, "mysql", pm.Tags()["engine"])
assert.Equal(t, "5.6", pm.Tags()["engine_version"])
assert.Equal(t, "acc-test", pm.Tags()["Environment"])
}
func TestProcess_missingTag(t *testing.T) {
p := &RDS{
MetricNames: []string{
"cloudwatch_aws_rds",
},
Id: "db_instance_identifier",
Engine: false,
EngineVersion: false,
Tags: []string{"Name"},
client: cache(&mockRdsClient{
DbIdentifier: "rds-test-instance",
}, time.Duration(15*time.Minute)),
}
metric, _ := metric.New("cloudwatch_aws_rds",
map[string]string{
"db_instance_identifier": "rds-test-instance",
},
map[string]interface{}{
"count": 1,
},
time.Now())
processedMetrics := p.Apply(metric)
assert.Equal(t, 1, len(processedMetrics))
pm := processedMetrics[0]
assert.Equal(t, "", pm.Tags()["engine"])
assert.Equal(t, "", pm.Tags()["engine_version"])
assert.Equal(t, "", pm.Tags()["Name"])
}
func TestProcess_missingInstance(t *testing.T) {
p := &RDS{
MetricNames: []string{
"cloudwatch_aws_rds",
},
Id: "db_instance_identifier",
Engine: true,
EngineVersion: true,
Tags: []string{"Environment"},
client: cache(&mockRdsClient{
DbIdentifier: "rds-test-instance2",
}, time.Duration(15*time.Minute)),
}
metric, _ := metric.New("cloudwatch_aws_rds",
map[string]string{
"db_instance_identifier": "rds-test-instance",
},
map[string]interface{}{
"count": 1,
},
time.Now())
processedMetrics := p.Apply(metric)
assert.Equal(t, 1, len(processedMetrics))
pm := processedMetrics[0]
assert.Equal(t, "", pm.Tags()["engine"])
assert.Equal(t, "", pm.Tags()["engine_version"])
assert.Equal(t, "", pm.Tags()["Environment"])
}

View File

@ -0,0 +1,30 @@
# SQS Metadata Processor Plugin
The SQS Metadata processor plugin appends additional metadata from AWS to metrics associated with Simple Queue Service.
### Configuration:
```toml
## Annotate metrics from the cloudwatch plugin
[[processors.aws_metadata_sqs]]
## Specify the Amazon Region to operate in
region = "us-east-1"
## Specify the TTL for metadata lookups
#cache_ttl = "1h"
## Process metrics from a Cloudwatch input plugin configured for the AWS/SQS namespace
## Default is "cloudwatch_aws_sqs"
#metric_names = ["cloudwatch_aws_sqs"]
## Metric tag that contains the SQS queue name
#id = "queue_name"
```
### Tags:
The plugin applies the following tags to metrics when configured:
* (none) - there is currently no support metadata to extract for SQS queues.

View File

@ -0,0 +1,123 @@
package sqs
import (
"time"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
internalaws "github.com/influxdata/telegraf/internal/config/aws"
"github.com/influxdata/telegraf/plugins/processors"
"github.com/influxdata/telegraf/plugins/processors/aws/utils"
)
type (
SQS struct {
Region string `toml:"region"`
AccessKey string `toml:"access_key"`
SecretKey string `toml:"secret_key"`
RoleARN string `toml:"role_arn"`
Profile string `toml:"profile"`
Filename string `toml:"shared_credential_file"`
Token string `toml:"token"`
CacheTTL internal.Duration `toml:"cache_ttl"`
MetricNames []string `toml:"metric_names"`
Id string `toml:"id"`
client SQSClient
}
SQSClient interface {
}
CachingSQSClient struct {
client SQSClient
ttl time.Duration
fetched time.Time
}
)
var sampleConfig = `
## Amazon Region
region = "us-east-1"
## Amazon Credentials
## Credentials are loaded in the following order
## 1) Assumed credentials via STS if role_arn is specified
## 2) explicit credentials from 'access_key' and 'secret_key'
## 3) shared profile from 'profile'
## 4) environment variables
## 5) shared credentials file
## 6) EC2 Instance Profile
#access_key = ""
#secret_key = ""
#token = ""
#role_arn = ""
#profile = ""
#shared_credential_file = ""
## Specify the TTL for metadata lookups
#cache_ttl = "1h"
## Specify the metric names to annotate with SQS metadata
## By default is configured for "cloudwatch_aws_sqs", the default output from the Cloudwatch input plugin
#metric_names = [ "cloudwatch_aws_sqs" ]
## Specify the metric tag which contains the SQS queue name
## By default is configured for "queue_name", the default from Cloudwatch input plugin when using the QueueName dimension
#id = "queue_name"
`
func (s *SQS) SampleConfig() string {
return sampleConfig
}
func (s *SQS) Description() string {
return "Annotate metrics with AWS SQS metadata"
}
func (s *SQS) Apply(in ...telegraf.Metric) []telegraf.Metric {
if s.client == nil {
s.initSqsClient()
}
for _, metric := range in {
if utils.IsSelected(metric, s.MetricNames) {
s.annotate(metric)
}
}
return in
}
func init() {
processors.Add("aws_metadata_sqs", func() telegraf.Processor {
return &SQS{
CacheTTL: internal.Duration{Duration: time.Duration(1 * time.Hour)},
MetricNames: []string{
"cloudwatch_aws_sqs",
},
Id: "queue_name",
}
})
}
func (s *SQS) annotate(metric telegraf.Metric) {
}
func (s *SQS) initSqsClient() error {
credentialConfig := &internalaws.CredentialConfig{
Region: s.Region,
AccessKey: s.AccessKey,
SecretKey: s.SecretKey,
RoleARN: s.RoleARN,
Profile: s.Profile,
Filename: s.Filename,
Token: s.Token,
}
configProvider := credentialConfig.Credentials()
s.client = &CachingSQSClient{
client: sqs.New(configProvider),
ttl: s.CacheTTL.Duration,
}
return nil
}

View File

@ -0,0 +1 @@
package sqs

View File

@ -0,0 +1,12 @@
package utils
import "github.com/influxdata/telegraf"
func IsSelected(metric telegraf.Metric, configuredMetrics []string) bool {
for _, m := range configuredMetrics {
if m == metric.Name() {
return true
}
}
return false
}