From 158d82fdb95ca178dc4ac1efeb9d7d60fb2396fb Mon Sep 17 00:00:00 2001 From: James Lamb Date: Thu, 10 Dec 2015 13:07:55 +1100 Subject: [PATCH] add amazon kinesis as an output plugin --- outputs/all/all.go | 1 + outputs/kinesis/README.md | 43 ++++++++ outputs/kinesis/kinesis_output.go | 134 +++++++++++++++++++++++++ outputs/kinesis/kinesis_output_test.go | 30 ++++++ 4 files changed, 208 insertions(+) create mode 100644 outputs/kinesis/README.md create mode 100644 outputs/kinesis/kinesis_output.go create mode 100644 outputs/kinesis/kinesis_output_test.go diff --git a/outputs/all/all.go b/outputs/all/all.go index 2c00f43f9..08ebf2549 100644 --- a/outputs/all/all.go +++ b/outputs/all/all.go @@ -6,6 +6,7 @@ import ( _ "github.com/influxdb/telegraf/outputs/datadog" _ "github.com/influxdb/telegraf/outputs/influxdb" _ "github.com/influxdb/telegraf/outputs/kafka" + _ "github.com/influxdb/telegraf/outputs/kinesis" _ "github.com/influxdb/telegraf/outputs/librato" _ "github.com/influxdb/telegraf/outputs/mqtt" _ "github.com/influxdb/telegraf/outputs/nsq" diff --git a/outputs/kinesis/README.md b/outputs/kinesis/README.md new file mode 100644 index 000000000..8ffbf4445 --- /dev/null +++ b/outputs/kinesis/README.md @@ -0,0 +1,43 @@ +## Amazon Kinesis Output for Telegraf + +This is an experimental plugin that is still in the early stages of development. It will batch up all of the Points +in one Put request to Kinesis. This should reduce the number of API requests considerably. + +## About Kinesis + +This is not the place to document all of the various Kinesis terms; however, it +may be useful for users to review Amazon's official documentation, which is available +[here](http://docs.aws.amazon.com/kinesis/latest/dev/key-concepts.html). + +## Config + +For this output plugin to function correctly the following variables must be configured. + +* region +* streamname +* partitionkey + +### region + +The region is the Amazon region that you wish to connect to. 
Examples include but are not limited to
+* us-west-1
+* us-west-2
+* us-east-1
+* ap-southeast-1
+* ap-southeast-2
+
+### streamname
+
+The streamname is used by the plugin to ensure that data is sent to the correct Kinesis stream. It is important to
+note that the stream *MUST* be pre-configured for this plugin to function correctly.
+
+### partitionkey
+
+This is used to group data within a stream. Currently this plugin only supports a single partitionkey which means
+that data will be entirely sent through a single Shard and Partition from a single host. If you need to scale out the
+Kinesis throughput, using a different partition key on different hosts or host groups might be a workable solution.
+
+
+## todo
+
+Check if the stream exists so that we have a graceful exit.
\ No newline at end of file
diff --git a/outputs/kinesis/kinesis_output.go b/outputs/kinesis/kinesis_output.go
new file mode 100644
index 000000000..ca442bc97
--- /dev/null
+++ b/outputs/kinesis/kinesis_output.go
@@ -0,0 +1,180 @@
+// Package kinesis_output implements a Telegraf output plugin that sends
+// points to an Amazon Kinesis stream.
+package kinesis_output
+
+import (
+	"errors"
+	"fmt"
+	"log"
+
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/aws/credentials"
+	"github.com/aws/aws-sdk-go/aws/credentials/ec2rolecreds"
+	"github.com/aws/aws-sdk-go/aws/ec2metadata"
+	"github.com/aws/aws-sdk-go/aws/session"
+	"github.com/aws/aws-sdk-go/service/kinesis"
+
+	"github.com/influxdb/influxdb/client/v2"
+	"github.com/influxdb/telegraf/outputs"
+)
+
+// maxRecordsPerRequest is the largest number of records Kinesis accepts in a
+// single PutRecords call.
+const maxRecordsPerRequest = 500
+
+// KinesisOutput holds the plugin settings read from the TOML configuration and
+// the Kinesis client created by Connect.
+type KinesisOutput struct {
+	Region       string `toml:"region"`
+	StreamName   string `toml:"streamname"`
+	PartitionKey string `toml:"partitionkey"`
+	Format       string `toml:"format"`
+	Debug        bool   `toml:"debug"`
+	svc          *kinesis.Kinesis
+}
+
+var sampleConfig = `
+  # Amazon REGION of kinesis endpoint.
+  # Most AWS services have a region specific endpoint this will be used by
+  # telegraf to output data.
+  # Authentication is provided by an IAMS role, SharedCredentials or Environment
+  # Variables.
+  region = "ap-southeast-2"
+  # Kinesis StreamName must exist prior to starting telegraf.
+  streamname = "StreamName"
+  # PartitionKey as used for sharding data.
+  partitionkey = "PartitionKey"
+  # format of the Data payload in the kinesis PutRecord, supported
+  # String and Custom.
+  format = "string"
+  # debug will show upstream aws messages.
+  debug = false
+`
+
+// SampleConfig returns the example configuration for this plugin.
+func (k *KinesisOutput) SampleConfig() string {
+	return sampleConfig
+}
+
+// Description returns a one-line summary of this plugin.
+func (k *KinesisOutput) Description() string {
+	return "Configuration for the AWS Kinesis output."
+}
+
+// Connect checks the required settings and creates the Kinesis client.
+//
+// Credentials come from a provider chain that tries the EC2 instance (IAM)
+// role first, then environment variables, then the shared credentials file.
+// Connect does not check that the stream exists.
+func (k *KinesisOutput) Connect() error {
+	if k.Region == "" {
+		return errors.New("kinesis output: region must be set")
+	}
+	if k.StreamName == "" {
+		return errors.New("kinesis output: streamname must be set")
+	}
+	if k.PartitionKey == "" {
+		return errors.New("kinesis output: partitionkey must be set")
+	}
+
+	if k.Debug {
+		log.Printf("Establishing a connection to Kinesis in %+v", k.Region)
+	}
+
+	config := &aws.Config{
+		Region: aws.String(k.Region),
+		Credentials: credentials.NewChainCredentials(
+			[]credentials.Provider{
+				&ec2rolecreds.EC2RoleProvider{Client: ec2metadata.New(session.New())},
+				&credentials.EnvProvider{},
+				&credentials.SharedCredentialsProvider{},
+			}),
+	}
+	k.svc = kinesis.New(session.New(config))
+	return nil
+}
+
+// Close has nothing to release, so it always reports success.
+func (k *KinesisOutput) Close() error {
+	return nil
+}
+
+// formatmetric renders a point as the payload of one Kinesis record. With
+// format "string" the payload is point.String(); any other format value
+// produces "name,tags,point.String() time".
+func formatmetric(k *KinesisOutput, point *client.Point) (string, error) {
+	if k.Format == "string" {
+		return point.String(), nil
+	}
+
+	m := fmt.Sprintf("%+v,%+v,%+v %+v",
+		point.Name(),
+		point.Tags(),
+		point.String(),
+		point.Time(),
+	)
+	return m, nil
+}
+
+// Write sends points to the configured stream, one Kinesis record per point,
+// in requests of at most maxRecordsPerRequest records. It returns the first
+// error encountered; records sent by earlier requests are not rolled back, so
+// a caller that retries the whole slice may deliver some points twice.
+func (k *KinesisOutput) Write(points []*client.Point) error {
+	if len(points) == 0 {
+		return nil
+	}
+	if k.svc == nil {
+		return errors.New("kinesis output: Write called before Connect")
+	}
+
+	records := make([]*kinesis.PutRecordsRequestEntry, 0, len(points))
+	for _, p := range points {
+		metric, err := formatmetric(k, p)
+		if err != nil {
+			return fmt.Errorf("unable to format point for Kinesis: %v", err)
+		}
+		records = append(records, &kinesis.PutRecordsRequestEntry{
+			Data:         []byte(metric),
+			PartitionKey: aws.String(k.PartitionKey),
+		})
+	}
+
+	for len(records) > 0 {
+		n := len(records)
+		if n > maxRecordsPerRequest {
+			n = maxRecordsPerRequest
+		}
+		if err := k.putRecords(records[:n]); err != nil {
+			return err
+		}
+		records = records[n:]
+	}
+	return nil
+}
+
+// putRecords issues one PutRecords request and turns both a request error
+// and a non-zero FailedRecordCount into an error.
+func (k *KinesisOutput) putRecords(records []*kinesis.PutRecordsRequestEntry) error {
+	resp, err := k.svc.PutRecords(&kinesis.PutRecordsInput{
+		Records:    records,
+		StreamName: aws.String(k.StreamName),
+	})
+	if k.Debug {
+		log.Printf("%+v \n", resp)
+	}
+	if err != nil {
+		return fmt.Errorf("unable to write to Kinesis: %v", err)
+	}
+	if resp.FailedRecordCount != nil && *resp.FailedRecordCount > 0 {
+		return fmt.Errorf("kinesis rejected %d of %d records", *resp.FailedRecordCount, len(records))
+	}
+	return nil
+}
+
+// init registers the plugin under the name "kinesis_output".
+func init() {
+	outputs.Add("kinesis_output", func() outputs.Output {
+		return &KinesisOutput{}
+	})
+}
diff --git a/outputs/kinesis/kinesis_output_test.go b/outputs/kinesis/kinesis_output_test.go
new file mode 100644
index 000000000..1e6dd3727
--- /dev/null
+++ b/outputs/kinesis/kinesis_output_test.go
@@ -0,0 +1,40 @@
+package kinesis_output
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestConnectAndWrite(t *testing.T) {
+	if testing.Short() {
+		t.Skip("Skipping integration test in short mode")
+	}
+
+	// Verify that we can build a client for the kinesis endpoint. Connect uses a
+	// chain of credential providers so that various authentication methods can
+	// pass depending on the system that executes.
+	k := &KinesisOutput{
+		Region:       "us-west-2",
+		StreamName:   "telegraf-test",
+		PartitionKey: "telegraf-test",
+		Format:       "string",
+	}
+
+	err := k.Connect()
+	require.NoError(t, err)
+}
+
+func TestConnectRequiresConfig(t *testing.T) {
+	k := &KinesisOutput{}
+
+	err := k.Connect()
+	require.Error(t, err)
+}
+
+func TestCloseReturnsNil(t *testing.T) {
+	k := &KinesisOutput{}
+
+	err := k.Close()
+	require.NoError(t, err)
+}